iroh_docs/rpc/client/docs.rs

//! API for document management.
//!
//! The main entry point is the [`Client`].
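//!
//! # Example
//!
//! A minimal sketch of typical usage (not compiled as a doctest). It assumes `rpc` is a
//! `quic_rpc::RpcClient` for the docs [`RpcService`], `author` is an [`AuthorId`] obtained
//! from the authors client, and that the usual [`Entry`] accessors are available:
//!
//! ```ignore
//! let docs = Client::new(rpc);
//! // Create a new document and write one entry.
//! let doc = docs.create().await?;
//! let hash = doc.set_bytes(author, "greeting", "hello world").await?;
//! // Read the entry back; `false` skips empty entries (deletion markers).
//! let entry = doc.get_exact(author, "greeting", false).await?;
//! assert_eq!(entry.map(|e| e.content_hash()), Some(hash));
//! ```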
use std::{
    path::{Path, PathBuf},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use derive_more::{Display, FromStr};
use futures_lite::{Stream, StreamExt};
use iroh::NodeAddr;
use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash};
use portable_atomic::{AtomicBool, Ordering};
use quic_rpc::{
    client::BoxedConnector, message::RpcMsg, transport::flume::FlumeConnector, Connector,
};
use serde::{Deserialize, Serialize};

use super::{authors, flatten};
use crate::{
    actor::OpenState,
    rpc::{
        proto::{
            CloseRequest, CreateRequest, DelRequest, DelResponse, DocListRequest,
            DocSubscribeRequest, DropRequest, ExportFileRequest, GetDownloadPolicyRequest,
            GetExactRequest, GetManyRequest, GetSyncPeersRequest, ImportFileRequest, ImportRequest,
            LeaveRequest, OpenRequest, RpcService, SetDownloadPolicyRequest, SetHashRequest,
            SetRequest, ShareRequest, StartSyncRequest, StatusRequest,
        },
        AddrInfoOptions,
    },
    store::{DownloadPolicy, Query},
    AuthorId, Capability, CapabilityKind, DocTicket, NamespaceId, PeerIdBytes,
};
#[doc(inline)]
pub use crate::{
    engine::{LiveEvent, Origin, SyncEvent, SyncReason},
    Entry,
};

/// Type alias for a memory-backed client.
pub type MemClient =
    Client<FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>>;

/// Iroh docs client.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
    pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
}

impl<C: Connector<RpcService>> Client<C> {
    /// Creates a new docs client.
    pub fn new(rpc: quic_rpc::RpcClient<RpcService, C>) -> Self {
        Self { rpc }
    }

    /// Returns an authors client.
    pub fn authors(&self) -> authors::Client<C> {
        authors::Client::new(self.rpc.clone())
    }

    /// Creates a new document.
    pub async fn create(&self) -> Result<Doc<C>> {
        let res = self.rpc.rpc(CreateRequest {}).await??;
        let doc = Doc::new(self.rpc.clone(), res.id);
        Ok(doc)
    }

    /// Deletes a document from the local node.
    ///
    /// This is a destructive operation. Both the document secret key and all entries in the
    /// document will be permanently deleted from the node's storage. Content blobs will be deleted
    /// through garbage collection unless they are referenced from another document or tag.
    pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> {
        self.rpc.rpc(DropRequest { doc_id }).await??;
        Ok(())
    }

    /// Imports a document from a namespace capability.
    ///
    /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync.
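    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest); it assumes `docs` is a [`Client`] and that
    /// `capability` and `peers` were received out of band:
    ///
    /// ```ignore
    /// let doc = docs.import_namespace(capability).await?;
    /// // Sync must be started explicitly.
    /// doc.start_sync(peers).await?;
    /// ```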
    pub async fn import_namespace(&self, capability: Capability) -> Result<Doc<C>> {
        let res = self.rpc.rpc(ImportRequest { capability }).await??;
        let doc = Doc::new(self.rpc.clone(), res.doc_id);
        Ok(doc)
    }

    /// Imports a document from a ticket and joins all peers in the ticket.
    pub async fn import(&self, ticket: DocTicket) -> Result<Doc<C>> {
        let DocTicket { capability, nodes } = ticket;
        let doc = self.import_namespace(capability).await?;
        doc.start_sync(nodes).await?;
        Ok(doc)
    }

    /// Imports a document from a ticket, creates a subscription stream and joins all peers in the ticket.
    ///
    /// Returns the [`Doc`] and a [`Stream`] of [`LiveEvent`]s.
    ///
    /// The subscription stream is created before the sync is started, so the first call to this
    /// method after starting the node is guaranteed to not miss any sync events.
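    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest); it assumes `docs` is a [`Client`], `ticket` is a
    /// [`DocTicket`] received out of band, and `futures_lite::StreamExt` is in scope:
    ///
    /// ```ignore
    /// let (doc, mut events) = docs.import_and_subscribe(ticket).await?;
    /// while let Some(event) = events.next().await {
    ///     println!("sync event: {:?}", event?);
    /// }
    /// ```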
    pub async fn import_and_subscribe(
        &self,
        ticket: DocTicket,
    ) -> Result<(Doc<C>, impl Stream<Item = anyhow::Result<LiveEvent>>)> {
        let DocTicket { capability, nodes } = ticket;
        let res = self.rpc.rpc(ImportRequest { capability }).await??;
        let doc = Doc::new(self.rpc.clone(), res.doc_id);
        let events = doc.subscribe().await?;
        doc.start_sync(nodes).await?;
        Ok((doc, events))
    }

    /// Lists all documents.
    pub async fn list(&self) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>>> {
        let stream = self.rpc.server_streaming(DocListRequest {}).await?;
        Ok(flatten(stream).map(|res| res.map(|res| (res.id, res.capability))))
    }

    /// Returns a [`Doc`] client for a single document.
    ///
    /// Returns None if the document cannot be found.
    pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc<C>>> {
        self.rpc.rpc(OpenRequest { doc_id: id }).await??;
        let doc = Doc::new(self.rpc.clone(), id);
        Ok(Some(doc))
    }
}

/// Document handle
#[derive(Debug, Clone)]
pub struct Doc<C: Connector<RpcService> = BoxedConnector<RpcService>>(Arc<DocInner<C>>)
where
    C: quic_rpc::Connector<RpcService>;

impl<C: Connector<RpcService>> PartialEq for Doc<C> {
    fn eq(&self, other: &Self) -> bool {
        self.0.id == other.0.id
    }
}

impl<C: Connector<RpcService>> Eq for Doc<C> {}

#[derive(Debug)]
struct DocInner<C: Connector<RpcService> = BoxedConnector<RpcService>> {
    id: NamespaceId,
    rpc: quic_rpc::RpcClient<RpcService, C>,
    closed: AtomicBool,
    rt: tokio::runtime::Handle,
}

impl<C> Drop for DocInner<C>
where
    C: quic_rpc::Connector<RpcService>,
{
    fn drop(&mut self) {
        let doc_id = self.id;
        let rpc = self.rpc.clone();
        if !self.closed.swap(true, Ordering::Relaxed) {
            self.rt.spawn(async move {
                rpc.rpc(CloseRequest { doc_id }).await.ok();
            });
        }
    }
}

impl<C: Connector<RpcService>> Doc<C> {
    fn new(rpc: quic_rpc::RpcClient<RpcService, C>, id: NamespaceId) -> Self {
        Self(Arc::new(DocInner {
            rpc,
            id,
            closed: AtomicBool::new(false),
            rt: tokio::runtime::Handle::current(),
        }))
    }

    async fn rpc<M>(&self, msg: M) -> Result<M::Response>
    where
        M: RpcMsg<RpcService>,
    {
        let res = self.0.rpc.rpc(msg).await?;
        Ok(res)
    }

    /// Returns the document id of this doc.
    pub fn id(&self) -> NamespaceId {
        self.0.id
    }

    /// Closes the document.
    pub async fn close(&self) -> Result<()> {
        if !self.0.closed.swap(true, Ordering::Relaxed) {
            self.rpc(CloseRequest { doc_id: self.id() }).await??;
        }
        Ok(())
    }

    fn ensure_open(&self) -> Result<()> {
        if self.0.closed.load(Ordering::Relaxed) {
            Err(anyhow!("document is closed"))
        } else {
            Ok(())
        }
    }

    /// Sets the content of a key to a byte array.
    pub async fn set_bytes(
        &self,
        author_id: AuthorId,
        key: impl Into<Bytes>,
        value: impl Into<Bytes>,
    ) -> Result<Hash> {
        self.ensure_open()?;
        let res = self
            .rpc(SetRequest {
                doc_id: self.id(),
                author_id,
                key: key.into(),
                value: value.into(),
            })
            .await??;
        Ok(res.entry.content_hash())
    }

    /// Sets an entry on the doc via its key, hash, and size.
    pub async fn set_hash(
        &self,
        author_id: AuthorId,
        key: impl Into<Bytes>,
        hash: Hash,
        size: u64,
    ) -> Result<()> {
        self.ensure_open()?;
        self.rpc(SetHashRequest {
            doc_id: self.id(),
            author_id,
            key: key.into(),
            hash,
            size,
        })
        .await??;
        Ok(())
    }

    /// Adds an entry from an absolute file path
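    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest); it assumes `doc`, `author`, and an absolute file
    /// `path` are available:
    ///
    /// ```ignore
    /// let progress = doc
    ///     .import_file(author, Bytes::from("my-file"), &path, false)
    ///     .await?;
    /// // Wait for the import to complete, ignoring intermediate progress events.
    /// let outcome = progress.finish().await?;
    /// println!("imported {} bytes as {}", outcome.size, outcome.hash);
    /// ```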
    pub async fn import_file(
        &self,
        author: AuthorId,
        key: Bytes,
        path: impl AsRef<Path>,
        in_place: bool,
    ) -> Result<ImportFileProgress> {
        self.ensure_open()?;
        let stream = self
            .0
            .rpc
            .server_streaming(ImportFileRequest {
                doc_id: self.id(),
                author_id: author,
                path: path.as_ref().into(),
                key,
                in_place,
            })
            .await?;
        Ok(ImportFileProgress::new(stream))
    }

    /// Exports an entry as a file to a given absolute path.
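    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest); it assumes `doc`, `author`, and an absolute
    /// destination `path` are available, and that `ExportMode::Copy` (which leaves the blob in
    /// the store) is the desired mode:
    ///
    /// ```ignore
    /// let entry = doc
    ///     .get_exact(author, "my-file", false)
    ///     .await?
    ///     .context("entry not found")?;
    /// let outcome = doc.export_file(entry, &path, ExportMode::Copy).await?.finish().await?;
    /// println!("wrote {} bytes to {}", outcome.size, outcome.path.display());
    /// ```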
    pub async fn export_file(
        &self,
        entry: Entry,
        path: impl AsRef<Path>,
        mode: ExportMode,
    ) -> Result<ExportFileProgress> {
        self.ensure_open()?;
        let stream = self
            .0
            .rpc
            .server_streaming(ExportFileRequest {
                entry,
                path: path.as_ref().into(),
                mode,
            })
            .await?;
        Ok(ExportFileProgress::new(stream))
    }

    /// Deletes entries that match the given `author` and key `prefix`.
    ///
    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
    /// entries whose key starts with or is equal to the given `prefix`.
    ///
    /// Returns the number of entries deleted.
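    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest); it deletes every entry of `author` whose key starts
    /// with `tmp/`:
    ///
    /// ```ignore
    /// let removed = doc.del(author, "tmp/").await?;
    /// println!("removed {removed} entries");
    /// ```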
    pub async fn del(&self, author_id: AuthorId, prefix: impl Into<Bytes>) -> Result<usize> {
        self.ensure_open()?;
        let res = self
            .rpc(DelRequest {
                doc_id: self.id(),
                author_id,
                prefix: prefix.into(),
            })
            .await??;
        let DelResponse { removed } = res;
        Ok(removed)
    }

    /// Returns the entry for the given key and author.
    ///
    /// If `include_empty` is false, empty entries (i.e. deletion markers) are not returned.
    pub async fn get_exact(
        &self,
        author: AuthorId,
        key: impl AsRef<[u8]>,
        include_empty: bool,
    ) -> Result<Option<Entry>> {
        self.ensure_open()?;
        let res = self
            .rpc(GetExactRequest {
                author,
                key: key.as_ref().to_vec().into(),
                doc_id: self.id(),
                include_empty,
            })
            .await??;
        Ok(res.entry.map(|entry| entry.into()))
    }

    /// Returns all entries matching the query.
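    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest); `Query::key_prefix` is an assumption about the
    /// [`Query`] builder API, and `futures_lite::StreamExt` is assumed to be in scope:
    ///
    /// ```ignore
    /// let mut entries = doc.get_many(Query::key_prefix("posts/")).await?;
    /// while let Some(entry) = entries.next().await {
    ///     let entry = entry?;
    ///     println!("{:?} -> {}", entry.key(), entry.content_hash());
    /// }
    /// ```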
    pub async fn get_many(
        &self,
        query: impl Into<Query>,
    ) -> Result<impl Stream<Item = Result<Entry>>> {
        self.ensure_open()?;
        let stream = self
            .0
            .rpc
            .server_streaming(GetManyRequest {
                doc_id: self.id(),
                query: query.into(),
            })
            .await?;
        Ok(flatten(stream).map(|res| res.map(|res| res.entry.into())))
    }

    /// Returns a single entry.
    pub async fn get_one(&self, query: impl Into<Query>) -> Result<Option<Entry>> {
        self.get_many(query).await?.next().await.transpose()
    }

    /// Shares this document with peers over a ticket.
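    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest); `AddrInfoOptions::RelayAndAddresses` is assumed to
    /// be one of the [`AddrInfoOptions`] variants controlling how much addressing information the
    /// ticket embeds:
    ///
    /// ```ignore
    /// let ticket = doc.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses).await?;
    /// // Send the ticket out of band; the receiver can call `Client::import(ticket)`.
    /// println!("{ticket}");
    /// ```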
    pub async fn share(
        &self,
        mode: ShareMode,
        addr_options: AddrInfoOptions,
    ) -> anyhow::Result<DocTicket> {
        self.ensure_open()?;
        let res = self
            .rpc(ShareRequest {
                doc_id: self.id(),
                mode,
                addr_options,
            })
            .await??;
        Ok(res.0)
    }

    /// Starts to sync this document with a list of peers.
    pub async fn start_sync(&self, peers: Vec<NodeAddr>) -> Result<()> {
        self.ensure_open()?;
        let _res = self
            .rpc(StartSyncRequest {
                doc_id: self.id(),
                peers,
            })
            .await??;
        Ok(())
    }

    /// Stops the live sync for this document.
    pub async fn leave(&self) -> Result<()> {
        self.ensure_open()?;
        let _res = self.rpc(LeaveRequest { doc_id: self.id() }).await??;
        Ok(())
    }

    /// Subscribes to events for this document.
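    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest) that logs remote insertions; it assumes the
    /// [`LiveEvent::InsertRemote`] variant and that `futures_lite::StreamExt` is in scope:
    ///
    /// ```ignore
    /// let mut events = doc.subscribe().await?;
    /// while let Some(event) = events.next().await {
    ///     if let LiveEvent::InsertRemote { entry, .. } = event? {
    ///         println!("remote entry: {:?}", entry.key());
    ///     }
    /// }
    /// ```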
    pub async fn subscribe(&self) -> anyhow::Result<impl Stream<Item = anyhow::Result<LiveEvent>>> {
        self.ensure_open()?;
        let stream = self
            .0
            .rpc
            .try_server_streaming(DocSubscribeRequest { doc_id: self.id() })
            .await?;
        Ok(stream.map(|res| match res {
            Ok(res) => Ok(res.event),
            Err(err) => Err(err.into()),
        }))
    }

    /// Returns status info for this document
    pub async fn status(&self) -> anyhow::Result<OpenState> {
        self.ensure_open()?;
        let res = self.rpc(StatusRequest { doc_id: self.id() }).await??;
        Ok(res.status)
    }

    /// Sets the download policy for this document
    pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> {
        self.rpc(SetDownloadPolicyRequest {
            doc_id: self.id(),
            policy,
        })
        .await??;
        Ok(())
    }

    /// Returns the download policy for this document
    pub async fn get_download_policy(&self) -> Result<DownloadPolicy> {
        let res = self
            .rpc(GetDownloadPolicyRequest { doc_id: self.id() })
            .await??;
        Ok(res.policy)
    }

    /// Returns sync peers for this document
    pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
        let res = self
            .rpc(GetSyncPeersRequest { doc_id: self.id() })
            .await??;
        Ok(res.peers)
    }
}

impl<'a, C> From<&'a Doc<C>> for &'a quic_rpc::RpcClient<RpcService, C>
where
    C: quic_rpc::Connector<RpcService>,
{
    fn from(doc: &'a Doc<C>) -> &'a quic_rpc::RpcClient<RpcService, C> {
        &doc.0.rpc
    }
}

/// Progress messages for a doc import operation.
///
/// An import operation involves computing the outboard of a file, copying or moving the file
/// into the database, and then setting the author, hash, size, and tag of that file as an entry
/// in the doc.
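///
/// # Example
///
/// A sketch (not compiled as a doctest) of consuming these messages directly from an
/// [`ImportFileProgress`] stream instead of calling [`ImportFileProgress::finish`]; it assumes
/// `doc`, `author`, `key`, and `path` are available:
///
/// ```ignore
/// let mut progress = doc.import_file(author, key, &path, false).await?;
/// while let Some(msg) = progress.next().await {
///     match msg? {
///         ImportProgress::Found { name, size, .. } => println!("found {name} ({size} bytes)"),
///         ImportProgress::Progress { offset, .. } => println!("  ingested {offset} bytes"),
///         ImportProgress::IngestDone { hash, .. } => println!("  stored as {hash}"),
///         ImportProgress::AllDone { key } => println!("  entry set for key {key:?}"),
///         ImportProgress::Abort(err) => return Err(err.into()),
///     }
/// }
/// ```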
#[derive(Debug, Serialize, Deserialize)]
pub enum ImportProgress {
    /// An item was found with name `name`, from now on referred to via `id`.
    Found {
        /// A new unique id for this entry.
        id: u64,
        /// The name of the entry.
        name: String,
        /// The size of the entry in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    Progress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done adding `id` to the data store and the hash is `hash`.
    IngestDone {
        /// The unique id of the entry.
        id: u64,
        /// The hash of the entry.
        hash: Hash,
    },
    /// We are done setting the entry to the doc.
    AllDone {
        /// The key of the entry
        key: Bytes,
    },
    /// We got an error and need to abort.
    ///
    /// This will be the last message in the stream.
    Abort(serde_error::Error),
}

/// Intended capability for document share tickets
#[derive(Serialize, Deserialize, Debug, Clone, Display, FromStr)]
pub enum ShareMode {
    /// Read-only access
    Read,
    /// Write access
    Write,
}
/// Progress stream for [`Doc::import_file`].
#[derive(derive_more::Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct ImportFileProgress {
    #[debug(skip)]
    stream: Pin<Box<dyn Stream<Item = Result<ImportProgress>> + Send + Unpin + 'static>>,
}

impl ImportFileProgress {
    fn new(
        stream: (impl Stream<Item = Result<impl Into<ImportProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let stream = stream.map(|item| match item {
            Ok(item) => Ok(item.into()),
            Err(err) => Err(err.into()),
        });
        Self {
            stream: Box::pin(stream),
        }
    }

    /// Finishes writing the stream, ignoring all intermediate progress events.
    ///
    /// Returns an [`ImportFileOutcome`] which contains the key, the hash, and the size of the
    /// content.
    pub async fn finish(mut self) -> Result<ImportFileOutcome> {
        let mut entry_size = 0;
        let mut entry_hash = None;
        while let Some(msg) = self.next().await {
            match msg? {
                ImportProgress::Found { size, .. } => {
                    entry_size = size;
                }
                ImportProgress::AllDone { key } => {
                    let hash = entry_hash
                        .context("expected DocImportProgress::IngestDone event to occur")?;
                    let outcome = ImportFileOutcome {
                        hash,
                        key,
                        size: entry_size,
                    };
                    return Ok(outcome);
                }
                ImportProgress::Abort(err) => return Err(err.into()),
                ImportProgress::Progress { .. } => {}
                ImportProgress::IngestDone { hash, .. } => {
                    entry_hash = Some(hash);
                }
            }
        }
        Err(anyhow!("Response stream ended prematurely"))
    }
}

/// Outcome of a [`Doc::import_file`] operation
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportFileOutcome {
    /// The hash of the entry's content
    pub hash: Hash,
    /// The size of the entry
    pub size: u64,
    /// The key of the entry
    pub key: Bytes,
}

impl Stream for ImportFileProgress {
    type Item = Result<ImportProgress>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}

/// Progress stream for [`Doc::export_file`].
#[derive(derive_more::Debug)]
pub struct ExportFileProgress {
    #[debug(skip)]
    stream: Pin<Box<dyn Stream<Item = Result<ExportProgress>> + Send + Unpin + 'static>>,
}
impl ExportFileProgress {
    fn new(
        stream: (impl Stream<Item = Result<impl Into<ExportProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let stream = stream.map(|item| match item {
            Ok(item) => Ok(item.into()),
            Err(err) => Err(err.into()),
        });
        Self {
            stream: Box::pin(stream),
        }
    }

    /// Iterates through the export progress stream, returning when the stream has completed.
    ///
    /// Returns an [`ExportFileOutcome`] which contains the file path the data was written to and
    /// the size of the content.
    pub async fn finish(mut self) -> Result<ExportFileOutcome> {
        let mut total_size = 0;
        let mut path = None;
        while let Some(msg) = self.next().await {
            match msg? {
                ExportProgress::Found { size, outpath, .. } => {
                    total_size = size.value();
                    path = Some(outpath);
                }
                ExportProgress::AllDone => {
                    let path = path.context("expected ExportProgress::Found event to occur")?;
                    let outcome = ExportFileOutcome {
                        size: total_size,
                        path,
                    };
                    return Ok(outcome);
                }
                ExportProgress::Done { .. } => {}
                ExportProgress::Abort(err) => return Err(anyhow!(err)),
                ExportProgress::Progress { .. } => {}
            }
        }
        Err(anyhow!("Response stream ended prematurely"))
    }
}

/// Outcome of a [`Doc::export_file`] operation
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportFileOutcome {
    /// The size of the entry
    pub size: u64,
    /// The path to which the entry was saved
    pub path: PathBuf,
}

impl Stream for ExportFileProgress {
    type Item = Result<ExportProgress>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}