1use std::{
5 path::{Path, PathBuf},
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use anyhow::{anyhow, Context as _, Result};
12use bytes::Bytes;
13use derive_more::{Display, FromStr};
14use futures_lite::{Stream, StreamExt};
15use iroh::NodeAddr;
16use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash};
17use portable_atomic::{AtomicBool, Ordering};
18use quic_rpc::{
19 client::BoxedConnector, message::RpcMsg, transport::flume::FlumeConnector, Connector,
20};
21use serde::{Deserialize, Serialize};
22
23use super::{authors, flatten};
24use crate::{
25 actor::OpenState,
26 rpc::{
27 proto::{
28 CloseRequest, CreateRequest, DelRequest, DelResponse, DocListRequest,
29 DocSubscribeRequest, DropRequest, ExportFileRequest, GetDownloadPolicyRequest,
30 GetExactRequest, GetManyRequest, GetSyncPeersRequest, ImportFileRequest, ImportRequest,
31 LeaveRequest, OpenRequest, RpcService, SetDownloadPolicyRequest, SetHashRequest,
32 SetRequest, ShareRequest, StartSyncRequest, StatusRequest,
33 },
34 AddrInfoOptions,
35 },
36 store::{DownloadPolicy, Query},
37 AuthorId, Capability, CapabilityKind, DocTicket, NamespaceId, PeerIdBytes,
38};
39#[doc(inline)]
40pub use crate::{
41 engine::{LiveEvent, Origin, SyncEvent, SyncReason},
42 Entry,
43};
44
45pub type MemClient =
47 Client<FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>>;
48
49#[derive(Debug, Clone)]
51#[repr(transparent)]
52pub struct Client<C = BoxedConnector<RpcService>> {
53 pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
54}
55
56impl<C: Connector<RpcService>> Client<C> {
57 pub fn new(rpc: quic_rpc::RpcClient<RpcService, C>) -> Self {
59 Self { rpc }
60 }
61
62 pub fn authors(&self) -> authors::Client<C> {
64 authors::Client::new(self.rpc.clone())
65 }
66
67 pub async fn create(&self) -> Result<Doc<C>> {
69 let res = self.rpc.rpc(CreateRequest {}).await??;
70 let doc = Doc::new(self.rpc.clone(), res.id);
71 Ok(doc)
72 }
73
74 pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> {
80 self.rpc.rpc(DropRequest { doc_id }).await??;
81 Ok(())
82 }
83
84 pub async fn import_namespace(&self, capability: Capability) -> Result<Doc<C>> {
88 let res = self.rpc.rpc(ImportRequest { capability }).await??;
89 let doc = Doc::new(self.rpc.clone(), res.doc_id);
90 Ok(doc)
91 }
92
93 pub async fn import(&self, ticket: DocTicket) -> Result<Doc<C>> {
95 let DocTicket { capability, nodes } = ticket;
96 let doc = self.import_namespace(capability).await?;
97 doc.start_sync(nodes).await?;
98 Ok(doc)
99 }
100
101 pub async fn import_and_subscribe(
108 &self,
109 ticket: DocTicket,
110 ) -> Result<(Doc<C>, impl Stream<Item = anyhow::Result<LiveEvent>>)> {
111 let DocTicket { capability, nodes } = ticket;
112 let res = self.rpc.rpc(ImportRequest { capability }).await??;
113 let doc = Doc::new(self.rpc.clone(), res.doc_id);
114 let events = doc.subscribe().await?;
115 doc.start_sync(nodes).await?;
116 Ok((doc, events))
117 }
118
119 pub async fn list(&self) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>>> {
121 let stream = self.rpc.server_streaming(DocListRequest {}).await?;
122 Ok(flatten(stream).map(|res| res.map(|res| (res.id, res.capability))))
123 }
124
125 pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc<C>>> {
129 self.rpc.rpc(OpenRequest { doc_id: id }).await??;
130 let doc = Doc::new(self.rpc.clone(), id);
131 Ok(Some(doc))
132 }
133}
134
135#[derive(Debug, Clone)]
137pub struct Doc<C: Connector<RpcService> = BoxedConnector<RpcService>>(Arc<DocInner<C>>)
138where
139 C: quic_rpc::Connector<RpcService>;
140
141impl<C: Connector<RpcService>> PartialEq for Doc<C> {
142 fn eq(&self, other: &Self) -> bool {
143 self.0.id == other.0.id
144 }
145}
146
147impl<C: Connector<RpcService>> Eq for Doc<C> {}
148
149#[derive(Debug)]
150struct DocInner<C: Connector<RpcService> = BoxedConnector<RpcService>> {
151 id: NamespaceId,
152 rpc: quic_rpc::RpcClient<RpcService, C>,
153 closed: AtomicBool,
154 rt: tokio::runtime::Handle,
155}
156
157impl<C> Drop for DocInner<C>
158where
159 C: quic_rpc::Connector<RpcService>,
160{
161 fn drop(&mut self) {
162 let doc_id = self.id;
163 let rpc = self.rpc.clone();
164 if !self.closed.swap(true, Ordering::Relaxed) {
165 self.rt.spawn(async move {
166 rpc.rpc(CloseRequest { doc_id }).await.ok();
167 });
168 }
169 }
170}
171
172impl<C: Connector<RpcService>> Doc<C> {
173 fn new(rpc: quic_rpc::RpcClient<RpcService, C>, id: NamespaceId) -> Self {
174 Self(Arc::new(DocInner {
175 rpc,
176 id,
177 closed: AtomicBool::new(false),
178 rt: tokio::runtime::Handle::current(),
179 }))
180 }
181
182 async fn rpc<M>(&self, msg: M) -> Result<M::Response>
183 where
184 M: RpcMsg<RpcService>,
185 {
186 let res = self.0.rpc.rpc(msg).await?;
187 Ok(res)
188 }
189
190 pub fn id(&self) -> NamespaceId {
192 self.0.id
193 }
194
195 pub async fn close(&self) -> Result<()> {
197 if !self.0.closed.swap(true, Ordering::Relaxed) {
198 self.rpc(CloseRequest { doc_id: self.id() }).await??;
199 }
200 Ok(())
201 }
202
203 fn ensure_open(&self) -> Result<()> {
204 if self.0.closed.load(Ordering::Relaxed) {
205 Err(anyhow!("document is closed"))
206 } else {
207 Ok(())
208 }
209 }
210
211 pub async fn set_bytes(
213 &self,
214 author_id: AuthorId,
215 key: impl Into<Bytes>,
216 value: impl Into<Bytes>,
217 ) -> Result<Hash> {
218 self.ensure_open()?;
219 let res = self
220 .rpc(SetRequest {
221 doc_id: self.id(),
222 author_id,
223 key: key.into(),
224 value: value.into(),
225 })
226 .await??;
227 Ok(res.entry.content_hash())
228 }
229
230 pub async fn set_hash(
232 &self,
233 author_id: AuthorId,
234 key: impl Into<Bytes>,
235 hash: Hash,
236 size: u64,
237 ) -> Result<()> {
238 self.ensure_open()?;
239 self.rpc(SetHashRequest {
240 doc_id: self.id(),
241 author_id,
242 key: key.into(),
243 hash,
244 size,
245 })
246 .await??;
247 Ok(())
248 }
249
250 pub async fn import_file(
252 &self,
253 author: AuthorId,
254 key: Bytes,
255 path: impl AsRef<Path>,
256 in_place: bool,
257 ) -> Result<ImportFileProgress> {
258 self.ensure_open()?;
259 let stream = self
260 .0
261 .rpc
262 .server_streaming(ImportFileRequest {
263 doc_id: self.id(),
264 author_id: author,
265 path: path.as_ref().into(),
266 key,
267 in_place,
268 })
269 .await?;
270 Ok(ImportFileProgress::new(stream))
271 }
272
273 pub async fn export_file(
275 &self,
276 entry: Entry,
277 path: impl AsRef<Path>,
278 mode: ExportMode,
279 ) -> Result<ExportFileProgress> {
280 self.ensure_open()?;
281 let stream = self
282 .0
283 .rpc
284 .server_streaming(ExportFileRequest {
285 entry,
286 path: path.as_ref().into(),
287 mode,
288 })
289 .await?;
290 Ok(ExportFileProgress::new(stream))
291 }
292
293 pub async fn del(&self, author_id: AuthorId, prefix: impl Into<Bytes>) -> Result<usize> {
300 self.ensure_open()?;
301 let res = self
302 .rpc(DelRequest {
303 doc_id: self.id(),
304 author_id,
305 prefix: prefix.into(),
306 })
307 .await??;
308 let DelResponse { removed } = res;
309 Ok(removed)
310 }
311
312 pub async fn get_exact(
316 &self,
317 author: AuthorId,
318 key: impl AsRef<[u8]>,
319 include_empty: bool,
320 ) -> Result<Option<Entry>> {
321 self.ensure_open()?;
322 let res = self
323 .rpc(GetExactRequest {
324 author,
325 key: key.as_ref().to_vec().into(),
326 doc_id: self.id(),
327 include_empty,
328 })
329 .await??;
330 Ok(res.entry.map(|entry| entry.into()))
331 }
332
333 pub async fn get_many(
335 &self,
336 query: impl Into<Query>,
337 ) -> Result<impl Stream<Item = Result<Entry>>> {
338 self.ensure_open()?;
339 let stream = self
340 .0
341 .rpc
342 .server_streaming(GetManyRequest {
343 doc_id: self.id(),
344 query: query.into(),
345 })
346 .await?;
347 Ok(flatten(stream).map(|res| res.map(|res| res.entry.into())))
348 }
349
350 pub async fn get_one(&self, query: impl Into<Query>) -> Result<Option<Entry>> {
352 self.get_many(query).await?.next().await.transpose()
353 }
354
355 pub async fn share(
357 &self,
358 mode: ShareMode,
359 addr_options: AddrInfoOptions,
360 ) -> anyhow::Result<DocTicket> {
361 self.ensure_open()?;
362 let res = self
363 .rpc(ShareRequest {
364 doc_id: self.id(),
365 mode,
366 addr_options,
367 })
368 .await??;
369 Ok(res.0)
370 }
371
372 pub async fn start_sync(&self, peers: Vec<NodeAddr>) -> Result<()> {
374 self.ensure_open()?;
375 let _res = self
376 .rpc(StartSyncRequest {
377 doc_id: self.id(),
378 peers,
379 })
380 .await??;
381 Ok(())
382 }
383
384 pub async fn leave(&self) -> Result<()> {
386 self.ensure_open()?;
387 let _res = self.rpc(LeaveRequest { doc_id: self.id() }).await??;
388 Ok(())
389 }
390
391 pub async fn subscribe(&self) -> anyhow::Result<impl Stream<Item = anyhow::Result<LiveEvent>>> {
393 self.ensure_open()?;
394 let stream = self
395 .0
396 .rpc
397 .try_server_streaming(DocSubscribeRequest { doc_id: self.id() })
398 .await?;
399 Ok(stream.map(|res| match res {
400 Ok(res) => Ok(res.event),
401 Err(err) => Err(err.into()),
402 }))
403 }
404
405 pub async fn status(&self) -> anyhow::Result<OpenState> {
407 self.ensure_open()?;
408 let res = self.rpc(StatusRequest { doc_id: self.id() }).await??;
409 Ok(res.status)
410 }
411
412 pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> {
414 self.rpc(SetDownloadPolicyRequest {
415 doc_id: self.id(),
416 policy,
417 })
418 .await??;
419 Ok(())
420 }
421
422 pub async fn get_download_policy(&self) -> Result<DownloadPolicy> {
424 let res = self
425 .rpc(GetDownloadPolicyRequest { doc_id: self.id() })
426 .await??;
427 Ok(res.policy)
428 }
429
430 pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
432 let res = self
433 .rpc(GetSyncPeersRequest { doc_id: self.id() })
434 .await??;
435 Ok(res.peers)
436 }
437}
438
439impl<'a, C> From<&'a Doc<C>> for &'a quic_rpc::RpcClient<RpcService, C>
440where
441 C: quic_rpc::Connector<RpcService>,
442{
443 fn from(doc: &'a Doc<C>) -> &'a quic_rpc::RpcClient<RpcService, C> {
444 &doc.0.rpc
445 }
446}
447
448#[derive(Debug, Serialize, Deserialize)]
454pub enum ImportProgress {
455 Found {
457 id: u64,
459 name: String,
461 size: u64,
463 },
464 Progress {
466 id: u64,
468 offset: u64,
470 },
471 IngestDone {
473 id: u64,
475 hash: Hash,
477 },
478 AllDone {
480 key: Bytes,
482 },
483 Abort(serde_error::Error),
487}
488
489#[derive(Serialize, Deserialize, Debug, Clone, Display, FromStr)]
491pub enum ShareMode {
492 Read,
494 Write,
496}
497#[derive(derive_more::Debug)]
499#[must_use = "streams do nothing unless polled"]
500pub struct ImportFileProgress {
501 #[debug(skip)]
502 stream: Pin<Box<dyn Stream<Item = Result<ImportProgress>> + Send + Unpin + 'static>>,
503}
504
505impl ImportFileProgress {
506 fn new(
507 stream: (impl Stream<Item = Result<impl Into<ImportProgress>, impl Into<anyhow::Error>>>
508 + Send
509 + Unpin
510 + 'static),
511 ) -> Self {
512 let stream = stream.map(|item| match item {
513 Ok(item) => Ok(item.into()),
514 Err(err) => Err(err.into()),
515 });
516 Self {
517 stream: Box::pin(stream),
518 }
519 }
520
521 pub async fn finish(mut self) -> Result<ImportFileOutcome> {
526 let mut entry_size = 0;
527 let mut entry_hash = None;
528 while let Some(msg) = self.next().await {
529 match msg? {
530 ImportProgress::Found { size, .. } => {
531 entry_size = size;
532 }
533 ImportProgress::AllDone { key } => {
534 let hash = entry_hash
535 .context("expected DocImportProgress::IngestDone event to occur")?;
536 let outcome = ImportFileOutcome {
537 hash,
538 key,
539 size: entry_size,
540 };
541 return Ok(outcome);
542 }
543 ImportProgress::Abort(err) => return Err(err.into()),
544 ImportProgress::Progress { .. } => {}
545 ImportProgress::IngestDone { hash, .. } => {
546 entry_hash = Some(hash);
547 }
548 }
549 }
550 Err(anyhow!("Response stream ended prematurely"))
551 }
552}
553
554#[derive(Debug, Clone, PartialEq, Eq)]
556pub struct ImportFileOutcome {
557 pub hash: Hash,
559 pub size: u64,
561 pub key: Bytes,
563}
564
565impl Stream for ImportFileProgress {
566 type Item = Result<ImportProgress>;
567 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
568 Pin::new(&mut self.stream).poll_next(cx)
569 }
570}
571
572#[derive(derive_more::Debug)]
574pub struct ExportFileProgress {
575 #[debug(skip)]
576 stream: Pin<Box<dyn Stream<Item = Result<ExportProgress>> + Send + Unpin + 'static>>,
577}
578impl ExportFileProgress {
579 fn new(
580 stream: (impl Stream<Item = Result<impl Into<ExportProgress>, impl Into<anyhow::Error>>>
581 + Send
582 + Unpin
583 + 'static),
584 ) -> Self {
585 let stream = stream.map(|item| match item {
586 Ok(item) => Ok(item.into()),
587 Err(err) => Err(err.into()),
588 });
589 Self {
590 stream: Box::pin(stream),
591 }
592 }
593
594 pub async fn finish(mut self) -> Result<ExportFileOutcome> {
598 let mut total_size = 0;
599 let mut path = None;
600 while let Some(msg) = self.next().await {
601 match msg? {
602 ExportProgress::Found { size, outpath, .. } => {
603 total_size = size.value();
604 path = Some(outpath);
605 }
606 ExportProgress::AllDone => {
607 let path = path.context("expected ExportProgress::Found event to occur")?;
608 let outcome = ExportFileOutcome {
609 size: total_size,
610 path,
611 };
612 return Ok(outcome);
613 }
614 ExportProgress::Done { .. } => {}
615 ExportProgress::Abort(err) => return Err(anyhow!(err)),
616 ExportProgress::Progress { .. } => {}
617 }
618 }
619 Err(anyhow!("Response stream ended prematurely"))
620 }
621}
622
/// Result of a completed file export, as produced by [`ExportFileProgress::finish`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportFileOutcome {
    /// Size taken from the `Found` progress event.
    pub size: u64,
    /// Output path taken from the `Found` progress event.
    pub path: PathBuf,
}
631
632impl Stream for ExportFileProgress {
633 type Item = Result<ExportProgress>;
634
635 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
636 Pin::new(&mut self.stream).poll_next(cx)
637 }
638}