iroh_docs/
api.rs

1//! irpc-based RPC implementation for docs.
2
3#![allow(missing_docs)]
4
5use std::{
6    future::Future,
7    net::SocketAddr,
8    path::Path,
9    pin::Pin,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc,
13    },
14    task::{ready, Poll},
15};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use iroh::NodeAddr;
20use iroh_blobs::{
21    api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress},
22    Hash,
23};
24use irpc::rpc::Handler;
25use n0_future::{
26    task::{self, AbortOnDropHandle},
27    FutureExt, Stream, StreamExt,
28};
29
30use self::{
31    actor::RpcActor,
32    protocol::{
33        AddrInfoOptions, AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest,
34        AuthorGetDefaultRequest, AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest,
35        CloseRequest, CreateRequest, DelRequest, DocsProtocol, DropRequest,
36        GetDownloadPolicyRequest, GetExactRequest, GetManyRequest, GetSyncPeersRequest,
37        ImportRequest, LeaveRequest, ListRequest, OpenRequest, SetDownloadPolicyRequest,
38        SetHashRequest, SetRequest, ShareMode, ShareRequest, StartSyncRequest, StatusRequest,
39        SubscribeRequest,
40    },
41};
42use crate::{
43    actor::OpenState,
44    engine::{Engine, LiveEvent},
45    store::{DownloadPolicy, Query},
46    Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
47};
48
pub(crate) mod actor;
pub mod protocol;

/// Error type of [`RpcResult`]; an alias for `serde_error::Error`.
pub type RpcError = serde_error::Error;
/// Result alias whose error type is [`RpcError`].
pub type RpcResult<T> = std::result::Result<T, RpcError>;

/// The irpc client type specialised to [`DocsProtocol`], used by [`DocsApi`] and [`Doc`].
type Client = irpc::Client<DocsProtocol>;
56
/// API wrapper for the docs service
///
/// A cloneable handle around an irpc client for [`DocsProtocol`]. Obtain one with
/// [`DocsApi::spawn`] or [`DocsApi::connect`].
#[derive(Debug, Clone)]
pub struct DocsApi {
    /// The irpc client every request of this API is sent through.
    pub(crate) inner: Client,
}
62
63impl DocsApi {
64    /// Create a new docs API from an engine
65    pub fn spawn(engine: Arc<Engine>) -> Self {
66        RpcActor::spawn(engine)
67    }
68
69    /// Connect to a remote docs service
70    pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Result<DocsApi> {
71        Ok(DocsApi {
72            inner: Client::quinn(endpoint, addr),
73        })
74    }
75
76    /// Listen for incoming RPC connections
77    pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
78        let local = self
79            .inner
80            .as_local()
81            .context("cannot listen on remote API")?;
82        let handler: Handler<DocsProtocol> = Arc::new(move |msg, _rx, tx| {
83            let local = local.clone();
84            Box::pin(async move {
85                match msg {
86                    DocsProtocol::Open(msg) => local.send((msg, tx)).await,
87                    DocsProtocol::Close(msg) => local.send((msg, tx)).await,
88                    DocsProtocol::Status(msg) => local.send((msg, tx)).await,
89                    DocsProtocol::List(msg) => local.send((msg, tx)).await,
90                    DocsProtocol::Create(msg) => local.send((msg, tx)).await,
91                    DocsProtocol::Drop(msg) => local.send((msg, tx)).await,
92                    DocsProtocol::Import(msg) => local.send((msg, tx)).await,
93                    DocsProtocol::Set(msg) => local.send((msg, tx)).await,
94                    DocsProtocol::SetHash(msg) => local.send((msg, tx)).await,
95                    DocsProtocol::Get(msg) => local.send((msg, tx)).await,
96                    DocsProtocol::GetExact(msg) => local.send((msg, tx)).await,
97                    // DocsProtocol::ImportFile(msg) => local.send((msg, tx)).await,
98                    // DocsProtocol::ExportFile(msg) => local.send((msg, tx)).await,
99                    DocsProtocol::Del(msg) => local.send((msg, tx)).await,
100                    DocsProtocol::StartSync(msg) => local.send((msg, tx)).await,
101                    DocsProtocol::Leave(msg) => local.send((msg, tx)).await,
102                    DocsProtocol::Share(msg) => local.send((msg, tx)).await,
103                    DocsProtocol::Subscribe(msg) => local.send((msg, tx)).await,
104                    DocsProtocol::GetDownloadPolicy(msg) => local.send((msg, tx)).await,
105                    DocsProtocol::SetDownloadPolicy(msg) => local.send((msg, tx)).await,
106                    DocsProtocol::GetSyncPeers(msg) => local.send((msg, tx)).await,
107                    DocsProtocol::AuthorList(msg) => local.send((msg, tx)).await,
108                    DocsProtocol::AuthorCreate(msg) => local.send((msg, tx)).await,
109                    DocsProtocol::AuthorGetDefault(msg) => local.send((msg, tx)).await,
110                    DocsProtocol::AuthorSetDefault(msg) => local.send((msg, tx)).await,
111                    DocsProtocol::AuthorImport(msg) => local.send((msg, tx)).await,
112                    DocsProtocol::AuthorExport(msg) => local.send((msg, tx)).await,
113                    DocsProtocol::AuthorDelete(msg) => local.send((msg, tx)).await,
114                }
115            })
116        });
117        let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler));
118        Ok(AbortOnDropHandle::new(join_handle))
119    }
120
121    /// Creates a new document author.
122    ///
123    /// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author
124    /// again.
125    ///
126    /// If you need only a single author, use [`Self::author_default`].
127    pub async fn author_create(&self) -> Result<AuthorId> {
128        let response = self.inner.rpc(AuthorCreateRequest).await??;
129        Ok(response.author_id)
130    }
131
132    /// Returns the default document author of this node.
133    ///
134    /// On persistent nodes, the author is created on first start and its public key is saved
135    /// in the data directory.
136    ///
137    /// The default author can be set with [`Self::author_set_default`].
138    pub async fn author_default(&self) -> Result<AuthorId> {
139        let response = self.inner.rpc(AuthorGetDefaultRequest).await??;
140        Ok(response.author_id)
141    }
142
143    /// Sets the node-wide default author.
144    ///
145    /// If the author does not exist, an error is returned.
146    ///
147    /// On a persistent node, the author id will be saved to a file in the data directory and
148    /// reloaded after a restart.
149    pub async fn author_set_default(&self, author_id: AuthorId) -> Result<()> {
150        self.inner
151            .rpc(AuthorSetDefaultRequest { author_id })
152            .await??;
153        Ok(())
154    }
155
156    /// Lists document authors for which we have a secret key.
157    ///
158    /// It's only possible to create writes from authors that we have the secret key of.
159    pub async fn author_list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
160        let stream = self.inner.server_streaming(AuthorListRequest, 64).await?;
161        Ok(stream.into_stream().map(|res| match res {
162            Err(err) => Err(err.into()),
163            Ok(Err(err)) => Err(err.into()),
164            Ok(Ok(res)) => Ok(res.author_id),
165        }))
166    }
167
168    /// Exports the given author.
169    ///
170    /// Warning: The [`Author`] struct contains sensitive data.
171    pub async fn author_export(&self, author: AuthorId) -> Result<Option<Author>> {
172        let response = self.inner.rpc(AuthorExportRequest { author }).await??;
173        Ok(response.author)
174    }
175
176    /// Imports the given author.
177    ///
178    /// Warning: The [`Author`] struct contains sensitive data.
179    pub async fn author_import(&self, author: Author) -> Result<()> {
180        self.inner.rpc(AuthorImportRequest { author }).await??;
181        Ok(())
182    }
183
184    /// Deletes the given author by id.
185    ///
186    /// Warning: This permanently removes this author.
187    ///
188    /// Returns an error if attempting to delete the default author.
189    pub async fn author_delete(&self, author: AuthorId) -> Result<()> {
190        self.inner.rpc(AuthorDeleteRequest { author }).await??;
191        Ok(())
192    }
193
194    /// Creates a new document.
195    pub async fn create(&self) -> Result<Doc> {
196        let response = self.inner.rpc(CreateRequest).await??;
197        Ok(Doc::new(self.inner.clone(), response.id))
198    }
199
200    /// Deletes a document from the local node.
201    ///
202    /// This is a destructive operation. Both the document secret key and all entries in the
203    /// document will be permanently deleted from the node's storage. Content blobs will be deleted
204    /// through garbage collection unless they are referenced from another document or tag.
205    pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> {
206        self.inner.rpc(DropRequest { doc_id }).await??;
207        Ok(())
208    }
209
210    /// Imports a document from a namespace capability.
211    ///
212    /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync.
213    pub async fn import_namespace(&self, capability: Capability) -> Result<Doc> {
214        let response = self.inner.rpc(ImportRequest { capability }).await??;
215        Ok(Doc::new(self.inner.clone(), response.doc_id))
216    }
217
218    /// Imports a document from a ticket and joins all peers in the ticket.
219    pub async fn import(&self, ticket: DocTicket) -> Result<Doc> {
220        let DocTicket { capability, nodes } = ticket;
221        let doc = self.import_namespace(capability).await?;
222        doc.start_sync(nodes).await?;
223        Ok(doc)
224    }
225
226    /// Imports a document from a ticket, creates a subscription stream and joins all peers in the ticket.
227    ///
228    /// Returns the [`Doc`] and a [`Stream`] of [`LiveEvent`]s.
229    ///
230    /// The subscription stream is created before the sync is started, so the first call to this
231    /// method after starting the node is guaranteed to not miss any sync events.
232    pub async fn import_and_subscribe(
233        &self,
234        ticket: DocTicket,
235    ) -> Result<(Doc, impl Stream<Item = Result<LiveEvent>>)> {
236        let DocTicket { capability, nodes } = ticket;
237        let response = self.inner.rpc(ImportRequest { capability }).await??;
238        let doc = Doc::new(self.inner.clone(), response.doc_id);
239        let events = doc.subscribe().await?;
240        doc.start_sync(nodes).await?;
241        Ok((doc, events))
242    }
243
244    /// Lists all documents.
245    pub async fn list(
246        &self,
247    ) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>> + Unpin + Send + 'static>
248    {
249        let stream = self.inner.server_streaming(ListRequest, 64).await?;
250        let stream = Box::pin(stream.into_stream());
251        Ok(stream.map(|res| match res {
252            Err(err) => Err(err.into()),
253            Ok(Err(err)) => Err(err.into()),
254            Ok(Ok(res)) => Ok((res.id, res.capability)),
255        }))
256    }
257
258    /// Returns a [`Doc`] client for a single document.
259    ///
260    /// Returns None if the document cannot be found.
261    pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc>> {
262        self.inner.rpc(OpenRequest { doc_id: id }).await??;
263        Ok(Some(Doc::new(self.inner.clone(), id)))
264    }
265}
266
/// Document handle
///
/// A cloneable client for a single document. Clones share the same `closed` flag, so closing
/// one clone makes the `ensure_open` check fail for all of them.
#[derive(Debug, Clone)]
pub struct Doc {
    /// Client the requests for this document are sent through.
    inner: Client,
    /// Id of the document this handle refers to.
    namespace_id: NamespaceId,
    /// Set to `true` by `close`; checked by `ensure_open` before other operations run.
    closed: Arc<AtomicBool>,
}
274
275impl Doc {
276    fn new(inner: Client, namespace_id: NamespaceId) -> Self {
277        Self {
278            inner,
279            namespace_id,
280            closed: Default::default(),
281        }
282    }
283
284    /// Returns the document id of this doc.
285    pub fn id(&self) -> NamespaceId {
286        self.namespace_id
287    }
288
289    /// Closes the document.
290    pub async fn close(&self) -> Result<()> {
291        self.closed.store(true, Ordering::Relaxed);
292        self.inner
293            .rpc(CloseRequest {
294                doc_id: self.namespace_id,
295            })
296            .await??;
297        Ok(())
298    }
299
300    fn ensure_open(&self) -> Result<()> {
301        if self.closed.load(Ordering::Relaxed) {
302            Err(anyhow::anyhow!("document is closed"))
303        } else {
304            Ok(())
305        }
306    }
307
308    /// Sets the content of a key to a byte array.
309    pub async fn set_bytes(
310        &self,
311        author_id: AuthorId,
312        key: impl Into<Bytes>,
313        value: impl Into<Bytes>,
314    ) -> Result<Hash> {
315        self.ensure_open()?;
316        let response = self
317            .inner
318            .rpc(SetRequest {
319                doc_id: self.namespace_id,
320                author_id,
321                key: key.into(),
322                value: value.into(),
323            })
324            .await??;
325        Ok(response.entry.content_hash())
326    }
327
328    /// Sets an entry on the doc via its key, hash, and size.
329    pub async fn set_hash(
330        &self,
331        author_id: AuthorId,
332        key: impl Into<Bytes>,
333        hash: Hash,
334        size: u64,
335    ) -> Result<()> {
336        self.ensure_open()?;
337        self.inner
338            .rpc(SetHashRequest {
339                doc_id: self.namespace_id,
340                author_id,
341                key: key.into(),
342                hash,
343                size,
344            })
345            .await??;
346        Ok(())
347    }
348
349    /// Deletes entries that match the given `author` and key `prefix`.
350    ///
351    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
352    /// entries whose key starts with or is equal to the given `prefix`.
353    ///
354    /// Returns the number of entries deleted.
355    pub async fn del(&self, author_id: AuthorId, prefix: impl Into<Bytes>) -> Result<usize> {
356        self.ensure_open()?;
357        let response = self
358            .inner
359            .rpc(DelRequest {
360                doc_id: self.namespace_id,
361                author_id,
362                prefix: prefix.into(),
363            })
364            .await??;
365        Ok(response.removed)
366    }
367
368    /// Returns an entry for a key and author.
369    ///
370    /// Optionally also returns the entry unless it is empty (i.e. a deletion marker).
371    pub async fn get_exact(
372        &self,
373        author: AuthorId,
374        key: impl AsRef<[u8]>,
375        include_empty: bool,
376    ) -> Result<Option<Entry>> {
377        self.ensure_open()?;
378        let response = self
379            .inner
380            .rpc(GetExactRequest {
381                author,
382                key: key.as_ref().to_vec().into(),
383                doc_id: self.namespace_id,
384                include_empty,
385            })
386            .await??;
387        Ok(response.entry.map(|entry| entry.into()))
388    }
389
390    /// Returns all entries matching the query.
391    pub async fn get_many(
392        &self,
393        query: impl Into<Query>,
394    ) -> Result<impl Stream<Item = Result<Entry>>> {
395        self.ensure_open()?;
396        let stream = self
397            .inner
398            .server_streaming(
399                GetManyRequest {
400                    doc_id: self.namespace_id,
401                    query: query.into(),
402                },
403                64,
404            )
405            .await?;
406        Ok(stream.into_stream().map(|res| match res {
407            Err(err) => Err(err.into()),
408            Ok(Err(err)) => Err(err.into()),
409            Ok(Ok(res)) => Ok(res.into()),
410        }))
411    }
412
413    /// Returns a single entry.
414    pub async fn get_one(&self, query: impl Into<Query>) -> Result<Option<Entry>> {
415        self.ensure_open()?;
416        let stream = self.get_many(query).await?;
417        tokio::pin!(stream);
418        futures_lite::StreamExt::next(&mut stream).await.transpose()
419    }
420
421    /// Shares this document with peers over a ticket.
422    pub async fn share(&self, mode: ShareMode, addr_options: AddrInfoOptions) -> Result<DocTicket> {
423        self.ensure_open()?;
424        let response = self
425            .inner
426            .rpc(ShareRequest {
427                doc_id: self.namespace_id,
428                mode,
429                addr_options,
430            })
431            .await??;
432        Ok(response.0)
433    }
434
435    /// Starts to sync this document with a list of peers.
436    pub async fn start_sync(&self, peers: Vec<NodeAddr>) -> Result<()> {
437        self.ensure_open()?;
438        self.inner
439            .rpc(StartSyncRequest {
440                doc_id: self.namespace_id,
441                peers,
442            })
443            .await??;
444        Ok(())
445    }
446
447    /// Stops the live sync for this document.
448    pub async fn leave(&self) -> Result<()> {
449        self.ensure_open()?;
450        self.inner
451            .rpc(LeaveRequest {
452                doc_id: self.namespace_id,
453            })
454            .await??;
455        Ok(())
456    }
457
458    /// Subscribes to events for this document.
459    pub async fn subscribe(
460        &self,
461    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static> {
462        self.ensure_open()?;
463        let stream = self
464            .inner
465            .server_streaming(
466                SubscribeRequest {
467                    doc_id: self.namespace_id,
468                },
469                64,
470            )
471            .await?;
472        Ok(Box::pin(stream.into_stream().map(|res| match res {
473            Err(err) => Err(err.into()),
474            Ok(Err(err)) => Err(err.into()),
475            Ok(Ok(res)) => Ok(res.event),
476        })))
477    }
478
479    /// Returns status info for this document
480    pub async fn status(&self) -> Result<OpenState> {
481        self.ensure_open()?;
482        let response = self
483            .inner
484            .rpc(StatusRequest {
485                doc_id: self.namespace_id,
486            })
487            .await??;
488        Ok(response.status)
489    }
490
491    /// Sets the download policy for this document
492    pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> {
493        self.ensure_open()?;
494        self.inner
495            .rpc(SetDownloadPolicyRequest {
496                doc_id: self.namespace_id,
497                policy,
498            })
499            .await??;
500        Ok(())
501    }
502
503    /// Returns the download policy for this document
504    pub async fn get_download_policy(&self) -> Result<DownloadPolicy> {
505        self.ensure_open()?;
506        let response = self
507            .inner
508            .rpc(GetDownloadPolicyRequest {
509                doc_id: self.namespace_id,
510            })
511            .await??;
512        Ok(response.policy)
513    }
514
515    /// Returns sync peers for this document
516    pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
517        self.ensure_open()?;
518        let response = self
519            .inner
520            .rpc(GetSyncPeersRequest {
521                doc_id: self.namespace_id,
522            })
523            .await??;
524        Ok(response.peers)
525    }
526
527    /// Adds an entry from an absolute file path
528    pub async fn import_file(
529        &self,
530        blobs: &iroh_blobs::api::Store,
531        author: AuthorId,
532        key: Bytes,
533        path: impl AsRef<Path>,
534        import_mode: iroh_blobs::api::blobs::ImportMode,
535    ) -> Result<ImportFileProgress> {
536        self.ensure_open()?;
537        let progress = blobs.add_path_with_opts(AddPathOptions {
538            path: path.as_ref().to_owned(),
539            format: iroh_blobs::BlobFormat::Raw,
540            mode: import_mode,
541        });
542        let stream = progress.stream().await;
543        let doc = self.clone();
544        let ctx = EntryContext {
545            doc,
546            author,
547            key,
548            size: None,
549        };
550        Ok(ImportFileProgress(ImportInner::Blobs(
551            Box::pin(stream),
552            Some(ctx),
553        )))
554    }
555
556    /// Exports an entry as a file to a given absolute path.
557    pub async fn export_file(
558        &self,
559        blobs: &iroh_blobs::api::Store,
560        entry: Entry,
561        path: impl AsRef<Path>,
562        mode: ExportMode,
563    ) -> Result<ExportProgress> {
564        self.ensure_open()?;
565        let hash = entry.content_hash();
566        let progress = blobs.export_with_opts(ExportOptions {
567            hash,
568            mode,
569            target: path.as_ref().to_path_buf(),
570        });
571        Ok(progress)
572    }
573}
574
/// A single item yielded by [`ImportFileProgress`] when it is used as a stream.
#[derive(Debug)]
pub enum ImportFileProgressItem {
    /// The import failed; no further items follow.
    Error(anyhow::Error),
    /// A progress item forwarded from adding the file to the blobs store.
    Blobs(AddProgressItem),
    /// The entry was set in the document; this is the final item of a successful import.
    Done(ImportFileOutcome),
}
581
/// Progress of a [`Doc::import_file`] operation.
///
/// Can be consumed as a [`Stream`] of [`ImportFileProgressItem`]s, or awaited as a [`Future`]
/// that resolves to the final [`ImportFileOutcome`].
#[derive(Debug)]
pub struct ImportFileProgress(ImportInner);
584
/// Internal state machine behind [`ImportFileProgress`].
#[derive(derive_more::Debug)]
enum ImportInner {
    /// The file is being added to the blobs store.
    ///
    /// Holds the blob progress stream and the context needed to set the entry afterwards.
    /// The context is taken out when the blob import reports that it is done.
    #[debug("Blobs")]
    Blobs(
        n0_future::boxed::BoxStream<AddProgressItem>,
        Option<EntryContext>,
    ),
    /// The blob was added; the future sets the entry in the document and yields the outcome.
    #[debug("Entry")]
    Entry(n0_future::boxed::BoxFuture<Result<ImportFileOutcome>>),
    /// Terminal state: the stream yields `None` from here on.
    Done,
}
596
/// Everything needed to set the document entry once the blob import has finished.
struct EntryContext {
    /// Handle of the document the entry is written to.
    doc: Doc,
    /// Author the entry is written as.
    author: AuthorId,
    /// Key of the entry.
    key: Bytes,
    /// Size of the blob; `None` until the blob progress has reported it.
    size: Option<u64>,
}
603
604impl Stream for ImportFileProgress {
605    type Item = ImportFileProgressItem;
606
607    fn poll_next(
608        self: Pin<&mut Self>,
609        cx: &mut std::task::Context<'_>,
610    ) -> Poll<Option<Self::Item>> {
611        let this = self.get_mut();
612        match this.0 {
613            ImportInner::Blobs(ref mut progress, ref mut context) => {
614                match ready!(progress.poll_next(cx)) {
615                    Some(item) => match item {
616                        AddProgressItem::Size(size) => {
617                            context
618                                .as_mut()
619                                .expect("Size must be emitted before done")
620                                .size = Some(size);
621                            Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Size(
622                                size,
623                            ))))
624                        }
625                        AddProgressItem::Error(err) => {
626                            *this = Self(ImportInner::Done);
627                            Poll::Ready(Some(ImportFileProgressItem::Error(err.into())))
628                        }
629                        AddProgressItem::Done(tag) => {
630                            let EntryContext {
631                                doc,
632                                author,
633                                key,
634                                size,
635                            } = context
636                                .take()
637                                .expect("AddProgressItem::Done may be emitted only once");
638                            let size = size.expect("Size must be emitted before done");
639                            let hash = tag.hash();
640                            *this = Self(ImportInner::Entry(Box::pin(async move {
641                                doc.set_hash(author, key.clone(), hash, size).await?;
642                                Ok(ImportFileOutcome { hash, size, key })
643                            })));
644                            Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Done(
645                                tag,
646                            ))))
647                        }
648                        item => Poll::Ready(Some(ImportFileProgressItem::Blobs(item))),
649                    },
650                    None => todo!(),
651                }
652            }
653            ImportInner::Entry(ref mut fut) => {
654                let res = ready!(fut.poll(cx));
655                *this = Self(ImportInner::Done);
656                match res {
657                    Ok(outcome) => Poll::Ready(Some(ImportFileProgressItem::Done(outcome))),
658                    Err(err) => Poll::Ready(Some(ImportFileProgressItem::Error(err))),
659                }
660            }
661            ImportInner::Done => Poll::Ready(None),
662        }
663    }
664}
665
666impl Future for ImportFileProgress {
667    type Output = Result<ImportFileOutcome>;
668    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
669        loop {
670            match self.as_mut().poll_next(cx) {
671                Poll::Ready(Some(item)) => match item {
672                    ImportFileProgressItem::Error(error) => return Poll::Ready(Err(error)),
673                    ImportFileProgressItem::Blobs(_add_progress_item) => continue,
674                    ImportFileProgressItem::Done(outcome) => return Poll::Ready(Ok(outcome)),
675                },
676                Poll::Ready(None) => {
677                    return Poll::Ready(Err(anyhow::anyhow!(
678                        "ImportFileProgress polled after completion"
679                    )))
680                }
681                Poll::Pending => return Poll::Pending,
682            }
683        }
684    }
685}
686
/// Outcome of a [`Doc::import_file`] operation
///
/// Produced once the imported blob has been set as an entry in the document.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportFileOutcome {
    /// The hash of the entry's content
    pub hash: Hash,
    /// The size of the entry
    pub size: u64,
    /// The key of the entry
    pub key: Bytes,
}