1#![allow(missing_docs)]
4
5use std::{
6 future::Future,
7 net::SocketAddr,
8 path::Path,
9 pin::Pin,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc,
13 },
14 task::{ready, Poll},
15};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use iroh::NodeAddr;
20use iroh_blobs::{
21 api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress},
22 Hash,
23};
24use irpc::rpc::Handler;
25use n0_future::{
26 task::{self, AbortOnDropHandle},
27 FutureExt, Stream, StreamExt,
28};
29
30use self::{
31 actor::RpcActor,
32 protocol::{
33 AddrInfoOptions, AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest,
34 AuthorGetDefaultRequest, AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest,
35 CloseRequest, CreateRequest, DelRequest, DocsProtocol, DropRequest,
36 GetDownloadPolicyRequest, GetExactRequest, GetManyRequest, GetSyncPeersRequest,
37 ImportRequest, LeaveRequest, ListRequest, OpenRequest, SetDownloadPolicyRequest,
38 SetHashRequest, SetRequest, ShareMode, ShareRequest, StartSyncRequest, StatusRequest,
39 SubscribeRequest,
40 },
41};
42use crate::{
43 actor::OpenState,
44 engine::{Engine, LiveEvent},
45 store::{DownloadPolicy, Query},
46 Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
47};
48
49pub(crate) mod actor;
50pub mod protocol;
51
52pub type RpcError = serde_error::Error;
53pub type RpcResult<T> = std::result::Result<T, RpcError>;
54
55type Client = irpc::Client<DocsProtocol>;
56
57#[derive(Debug, Clone)]
59pub struct DocsApi {
60 pub(crate) inner: Client,
61}
62
63impl DocsApi {
64 pub fn spawn(engine: Arc<Engine>) -> Self {
66 RpcActor::spawn(engine)
67 }
68
69 pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Result<DocsApi> {
71 Ok(DocsApi {
72 inner: Client::quinn(endpoint, addr),
73 })
74 }
75
76 pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
78 let local = self
79 .inner
80 .as_local()
81 .context("cannot listen on remote API")?;
82 let handler: Handler<DocsProtocol> = Arc::new(move |msg, _rx, tx| {
83 let local = local.clone();
84 Box::pin(async move {
85 match msg {
86 DocsProtocol::Open(msg) => local.send((msg, tx)).await,
87 DocsProtocol::Close(msg) => local.send((msg, tx)).await,
88 DocsProtocol::Status(msg) => local.send((msg, tx)).await,
89 DocsProtocol::List(msg) => local.send((msg, tx)).await,
90 DocsProtocol::Create(msg) => local.send((msg, tx)).await,
91 DocsProtocol::Drop(msg) => local.send((msg, tx)).await,
92 DocsProtocol::Import(msg) => local.send((msg, tx)).await,
93 DocsProtocol::Set(msg) => local.send((msg, tx)).await,
94 DocsProtocol::SetHash(msg) => local.send((msg, tx)).await,
95 DocsProtocol::Get(msg) => local.send((msg, tx)).await,
96 DocsProtocol::GetExact(msg) => local.send((msg, tx)).await,
97 DocsProtocol::Del(msg) => local.send((msg, tx)).await,
100 DocsProtocol::StartSync(msg) => local.send((msg, tx)).await,
101 DocsProtocol::Leave(msg) => local.send((msg, tx)).await,
102 DocsProtocol::Share(msg) => local.send((msg, tx)).await,
103 DocsProtocol::Subscribe(msg) => local.send((msg, tx)).await,
104 DocsProtocol::GetDownloadPolicy(msg) => local.send((msg, tx)).await,
105 DocsProtocol::SetDownloadPolicy(msg) => local.send((msg, tx)).await,
106 DocsProtocol::GetSyncPeers(msg) => local.send((msg, tx)).await,
107 DocsProtocol::AuthorList(msg) => local.send((msg, tx)).await,
108 DocsProtocol::AuthorCreate(msg) => local.send((msg, tx)).await,
109 DocsProtocol::AuthorGetDefault(msg) => local.send((msg, tx)).await,
110 DocsProtocol::AuthorSetDefault(msg) => local.send((msg, tx)).await,
111 DocsProtocol::AuthorImport(msg) => local.send((msg, tx)).await,
112 DocsProtocol::AuthorExport(msg) => local.send((msg, tx)).await,
113 DocsProtocol::AuthorDelete(msg) => local.send((msg, tx)).await,
114 }
115 })
116 });
117 let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler));
118 Ok(AbortOnDropHandle::new(join_handle))
119 }
120
121 pub async fn author_create(&self) -> Result<AuthorId> {
128 let response = self.inner.rpc(AuthorCreateRequest).await??;
129 Ok(response.author_id)
130 }
131
132 pub async fn author_default(&self) -> Result<AuthorId> {
139 let response = self.inner.rpc(AuthorGetDefaultRequest).await??;
140 Ok(response.author_id)
141 }
142
143 pub async fn author_set_default(&self, author_id: AuthorId) -> Result<()> {
150 self.inner
151 .rpc(AuthorSetDefaultRequest { author_id })
152 .await??;
153 Ok(())
154 }
155
156 pub async fn author_list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
160 let stream = self.inner.server_streaming(AuthorListRequest, 64).await?;
161 Ok(stream.into_stream().map(|res| match res {
162 Err(err) => Err(err.into()),
163 Ok(Err(err)) => Err(err.into()),
164 Ok(Ok(res)) => Ok(res.author_id),
165 }))
166 }
167
168 pub async fn author_export(&self, author: AuthorId) -> Result<Option<Author>> {
172 let response = self.inner.rpc(AuthorExportRequest { author }).await??;
173 Ok(response.author)
174 }
175
176 pub async fn author_import(&self, author: Author) -> Result<()> {
180 self.inner.rpc(AuthorImportRequest { author }).await??;
181 Ok(())
182 }
183
184 pub async fn author_delete(&self, author: AuthorId) -> Result<()> {
190 self.inner.rpc(AuthorDeleteRequest { author }).await??;
191 Ok(())
192 }
193
194 pub async fn create(&self) -> Result<Doc> {
196 let response = self.inner.rpc(CreateRequest).await??;
197 Ok(Doc::new(self.inner.clone(), response.id))
198 }
199
200 pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> {
206 self.inner.rpc(DropRequest { doc_id }).await??;
207 Ok(())
208 }
209
210 pub async fn import_namespace(&self, capability: Capability) -> Result<Doc> {
214 let response = self.inner.rpc(ImportRequest { capability }).await??;
215 Ok(Doc::new(self.inner.clone(), response.doc_id))
216 }
217
218 pub async fn import(&self, ticket: DocTicket) -> Result<Doc> {
220 let DocTicket { capability, nodes } = ticket;
221 let doc = self.import_namespace(capability).await?;
222 doc.start_sync(nodes).await?;
223 Ok(doc)
224 }
225
226 pub async fn import_and_subscribe(
233 &self,
234 ticket: DocTicket,
235 ) -> Result<(Doc, impl Stream<Item = Result<LiveEvent>>)> {
236 let DocTicket { capability, nodes } = ticket;
237 let response = self.inner.rpc(ImportRequest { capability }).await??;
238 let doc = Doc::new(self.inner.clone(), response.doc_id);
239 let events = doc.subscribe().await?;
240 doc.start_sync(nodes).await?;
241 Ok((doc, events))
242 }
243
244 pub async fn list(
246 &self,
247 ) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>> + Unpin + Send + 'static>
248 {
249 let stream = self.inner.server_streaming(ListRequest, 64).await?;
250 let stream = Box::pin(stream.into_stream());
251 Ok(stream.map(|res| match res {
252 Err(err) => Err(err.into()),
253 Ok(Err(err)) => Err(err.into()),
254 Ok(Ok(res)) => Ok((res.id, res.capability)),
255 }))
256 }
257
258 pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc>> {
262 self.inner.rpc(OpenRequest { doc_id: id }).await??;
263 Ok(Some(Doc::new(self.inner.clone(), id)))
264 }
265}
266
267#[derive(Debug, Clone)]
269pub struct Doc {
270 inner: Client,
271 namespace_id: NamespaceId,
272 closed: Arc<AtomicBool>,
273}
274
275impl Doc {
276 fn new(inner: Client, namespace_id: NamespaceId) -> Self {
277 Self {
278 inner,
279 namespace_id,
280 closed: Default::default(),
281 }
282 }
283
284 pub fn id(&self) -> NamespaceId {
286 self.namespace_id
287 }
288
289 pub async fn close(&self) -> Result<()> {
291 self.closed.store(true, Ordering::Relaxed);
292 self.inner
293 .rpc(CloseRequest {
294 doc_id: self.namespace_id,
295 })
296 .await??;
297 Ok(())
298 }
299
300 fn ensure_open(&self) -> Result<()> {
301 if self.closed.load(Ordering::Relaxed) {
302 Err(anyhow::anyhow!("document is closed"))
303 } else {
304 Ok(())
305 }
306 }
307
308 pub async fn set_bytes(
310 &self,
311 author_id: AuthorId,
312 key: impl Into<Bytes>,
313 value: impl Into<Bytes>,
314 ) -> Result<Hash> {
315 self.ensure_open()?;
316 let response = self
317 .inner
318 .rpc(SetRequest {
319 doc_id: self.namespace_id,
320 author_id,
321 key: key.into(),
322 value: value.into(),
323 })
324 .await??;
325 Ok(response.entry.content_hash())
326 }
327
328 pub async fn set_hash(
330 &self,
331 author_id: AuthorId,
332 key: impl Into<Bytes>,
333 hash: Hash,
334 size: u64,
335 ) -> Result<()> {
336 self.ensure_open()?;
337 self.inner
338 .rpc(SetHashRequest {
339 doc_id: self.namespace_id,
340 author_id,
341 key: key.into(),
342 hash,
343 size,
344 })
345 .await??;
346 Ok(())
347 }
348
349 pub async fn del(&self, author_id: AuthorId, prefix: impl Into<Bytes>) -> Result<usize> {
356 self.ensure_open()?;
357 let response = self
358 .inner
359 .rpc(DelRequest {
360 doc_id: self.namespace_id,
361 author_id,
362 prefix: prefix.into(),
363 })
364 .await??;
365 Ok(response.removed)
366 }
367
368 pub async fn get_exact(
372 &self,
373 author: AuthorId,
374 key: impl AsRef<[u8]>,
375 include_empty: bool,
376 ) -> Result<Option<Entry>> {
377 self.ensure_open()?;
378 let response = self
379 .inner
380 .rpc(GetExactRequest {
381 author,
382 key: key.as_ref().to_vec().into(),
383 doc_id: self.namespace_id,
384 include_empty,
385 })
386 .await??;
387 Ok(response.entry.map(|entry| entry.into()))
388 }
389
390 pub async fn get_many(
392 &self,
393 query: impl Into<Query>,
394 ) -> Result<impl Stream<Item = Result<Entry>>> {
395 self.ensure_open()?;
396 let stream = self
397 .inner
398 .server_streaming(
399 GetManyRequest {
400 doc_id: self.namespace_id,
401 query: query.into(),
402 },
403 64,
404 )
405 .await?;
406 Ok(stream.into_stream().map(|res| match res {
407 Err(err) => Err(err.into()),
408 Ok(Err(err)) => Err(err.into()),
409 Ok(Ok(res)) => Ok(res.into()),
410 }))
411 }
412
413 pub async fn get_one(&self, query: impl Into<Query>) -> Result<Option<Entry>> {
415 self.ensure_open()?;
416 let stream = self.get_many(query).await?;
417 tokio::pin!(stream);
418 futures_lite::StreamExt::next(&mut stream).await.transpose()
419 }
420
421 pub async fn share(&self, mode: ShareMode, addr_options: AddrInfoOptions) -> Result<DocTicket> {
423 self.ensure_open()?;
424 let response = self
425 .inner
426 .rpc(ShareRequest {
427 doc_id: self.namespace_id,
428 mode,
429 addr_options,
430 })
431 .await??;
432 Ok(response.0)
433 }
434
435 pub async fn start_sync(&self, peers: Vec<NodeAddr>) -> Result<()> {
437 self.ensure_open()?;
438 self.inner
439 .rpc(StartSyncRequest {
440 doc_id: self.namespace_id,
441 peers,
442 })
443 .await??;
444 Ok(())
445 }
446
447 pub async fn leave(&self) -> Result<()> {
449 self.ensure_open()?;
450 self.inner
451 .rpc(LeaveRequest {
452 doc_id: self.namespace_id,
453 })
454 .await??;
455 Ok(())
456 }
457
458 pub async fn subscribe(
460 &self,
461 ) -> Result<impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static> {
462 self.ensure_open()?;
463 let stream = self
464 .inner
465 .server_streaming(
466 SubscribeRequest {
467 doc_id: self.namespace_id,
468 },
469 64,
470 )
471 .await?;
472 Ok(Box::pin(stream.into_stream().map(|res| match res {
473 Err(err) => Err(err.into()),
474 Ok(Err(err)) => Err(err.into()),
475 Ok(Ok(res)) => Ok(res.event),
476 })))
477 }
478
479 pub async fn status(&self) -> Result<OpenState> {
481 self.ensure_open()?;
482 let response = self
483 .inner
484 .rpc(StatusRequest {
485 doc_id: self.namespace_id,
486 })
487 .await??;
488 Ok(response.status)
489 }
490
491 pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> {
493 self.ensure_open()?;
494 self.inner
495 .rpc(SetDownloadPolicyRequest {
496 doc_id: self.namespace_id,
497 policy,
498 })
499 .await??;
500 Ok(())
501 }
502
503 pub async fn get_download_policy(&self) -> Result<DownloadPolicy> {
505 self.ensure_open()?;
506 let response = self
507 .inner
508 .rpc(GetDownloadPolicyRequest {
509 doc_id: self.namespace_id,
510 })
511 .await??;
512 Ok(response.policy)
513 }
514
515 pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
517 self.ensure_open()?;
518 let response = self
519 .inner
520 .rpc(GetSyncPeersRequest {
521 doc_id: self.namespace_id,
522 })
523 .await??;
524 Ok(response.peers)
525 }
526
527 pub async fn import_file(
529 &self,
530 blobs: &iroh_blobs::api::Store,
531 author: AuthorId,
532 key: Bytes,
533 path: impl AsRef<Path>,
534 import_mode: iroh_blobs::api::blobs::ImportMode,
535 ) -> Result<ImportFileProgress> {
536 self.ensure_open()?;
537 let progress = blobs.add_path_with_opts(AddPathOptions {
538 path: path.as_ref().to_owned(),
539 format: iroh_blobs::BlobFormat::Raw,
540 mode: import_mode,
541 });
542 let stream = progress.stream().await;
543 let doc = self.clone();
544 let ctx = EntryContext {
545 doc,
546 author,
547 key,
548 size: None,
549 };
550 Ok(ImportFileProgress(ImportInner::Blobs(
551 Box::pin(stream),
552 Some(ctx),
553 )))
554 }
555
556 pub async fn export_file(
558 &self,
559 blobs: &iroh_blobs::api::Store,
560 entry: Entry,
561 path: impl AsRef<Path>,
562 mode: ExportMode,
563 ) -> Result<ExportProgress> {
564 self.ensure_open()?;
565 let hash = entry.content_hash();
566 let progress = blobs.export_with_opts(ExportOptions {
567 hash,
568 mode,
569 target: path.as_ref().to_path_buf(),
570 });
571 Ok(progress)
572 }
573}
574
575#[derive(Debug)]
576pub enum ImportFileProgressItem {
577 Error(anyhow::Error),
578 Blobs(AddProgressItem),
579 Done(ImportFileOutcome),
580}
581
582#[derive(Debug)]
583pub struct ImportFileProgress(ImportInner);
584
585#[derive(derive_more::Debug)]
586enum ImportInner {
587 #[debug("Blobs")]
588 Blobs(
589 n0_future::boxed::BoxStream<AddProgressItem>,
590 Option<EntryContext>,
591 ),
592 #[debug("Entry")]
593 Entry(n0_future::boxed::BoxFuture<Result<ImportFileOutcome>>),
594 Done,
595}
596
597struct EntryContext {
598 doc: Doc,
599 author: AuthorId,
600 key: Bytes,
601 size: Option<u64>,
602}
603
604impl Stream for ImportFileProgress {
605 type Item = ImportFileProgressItem;
606
607 fn poll_next(
608 self: Pin<&mut Self>,
609 cx: &mut std::task::Context<'_>,
610 ) -> Poll<Option<Self::Item>> {
611 let this = self.get_mut();
612 match this.0 {
613 ImportInner::Blobs(ref mut progress, ref mut context) => {
614 match ready!(progress.poll_next(cx)) {
615 Some(item) => match item {
616 AddProgressItem::Size(size) => {
617 context
618 .as_mut()
619 .expect("Size must be emitted before done")
620 .size = Some(size);
621 Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Size(
622 size,
623 ))))
624 }
625 AddProgressItem::Error(err) => {
626 *this = Self(ImportInner::Done);
627 Poll::Ready(Some(ImportFileProgressItem::Error(err.into())))
628 }
629 AddProgressItem::Done(tag) => {
630 let EntryContext {
631 doc,
632 author,
633 key,
634 size,
635 } = context
636 .take()
637 .expect("AddProgressItem::Done may be emitted only once");
638 let size = size.expect("Size must be emitted before done");
639 let hash = tag.hash();
640 *this = Self(ImportInner::Entry(Box::pin(async move {
641 doc.set_hash(author, key.clone(), hash, size).await?;
642 Ok(ImportFileOutcome { hash, size, key })
643 })));
644 Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Done(
645 tag,
646 ))))
647 }
648 item => Poll::Ready(Some(ImportFileProgressItem::Blobs(item))),
649 },
650 None => todo!(),
651 }
652 }
653 ImportInner::Entry(ref mut fut) => {
654 let res = ready!(fut.poll(cx));
655 *this = Self(ImportInner::Done);
656 match res {
657 Ok(outcome) => Poll::Ready(Some(ImportFileProgressItem::Done(outcome))),
658 Err(err) => Poll::Ready(Some(ImportFileProgressItem::Error(err))),
659 }
660 }
661 ImportInner::Done => Poll::Ready(None),
662 }
663 }
664}
665
666impl Future for ImportFileProgress {
667 type Output = Result<ImportFileOutcome>;
668 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
669 loop {
670 match self.as_mut().poll_next(cx) {
671 Poll::Ready(Some(item)) => match item {
672 ImportFileProgressItem::Error(error) => return Poll::Ready(Err(error)),
673 ImportFileProgressItem::Blobs(_add_progress_item) => continue,
674 ImportFileProgressItem::Done(outcome) => return Poll::Ready(Ok(outcome)),
675 },
676 Poll::Ready(None) => {
677 return Poll::Ready(Err(anyhow::anyhow!(
678 "ImportFileProgress polled after completion"
679 )))
680 }
681 Poll::Pending => return Poll::Pending,
682 }
683 }
684 }
685}
686
687#[derive(Debug, Clone, PartialEq, Eq)]
689pub struct ImportFileOutcome {
690 pub hash: Hash,
692 pub size: u64,
694 pub key: Bytes,
696}