iroh_blobs/api/
blobs.rs

1//! API to interact with a local blob store
2//!
3//! This API is for local interactions with the blob store, such as importing
4//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
5//!
6//! The main entry point is the [`Blobs`] struct.
7use std::{
8    collections::BTreeMap,
9    future::{Future, IntoFuture},
10    io,
11    num::NonZeroU64,
12    path::{Path, PathBuf},
13    pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18    io::{
19        fsm::{ResponseDecoder, ResponseDecoderNext},
20        BaoContentItem, Leaf,
21    },
22    BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::{AsyncStreamReader, TokioStreamReader};
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use quinn::SendStream;
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use serde::{Deserialize, Serialize};
33use tokio::io::AsyncWriteExt;
34use tracing::trace;
35mod reader;
36pub use reader::BlobReader;
37
38// Public reexports from the proto module.
39//
40// Due to the fact that the proto module is hidden from docs by default,
41// these will appear in the docs as if they were declared here.
42pub use super::proto::{
43    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
44    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
45    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
46    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
47};
48use super::{
49    proto::{
50        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
51        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
52        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
53    },
54    remote::HashSeqChunk,
55    tags::TagInfo,
56    ApiClient, RequestResult, Tags,
57};
58use crate::{
59    api::proto::{BatchRequest, ImportByteStreamUpdate},
60    provider::events::ClientResult,
61    store::IROH_BLOCK_SIZE,
62    util::temp_tag::TempTag,
63    BlobFormat, Hash, HashAndFormat,
64};
65
/// Options for adding bytes.
///
/// Can be built from a `(data, format)` tuple via the [`From`] impl below.
#[derive(Debug)]
pub struct AddBytesOptions {
    /// The bytes to import.
    pub data: Bytes,
    /// The format passed along with the import request.
    pub format: BlobFormat,
}
72
73impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
74    fn from(item: (T, BlobFormat)) -> Self {
75        let (data, format) = item;
76        Self {
77            data: data.into(),
78            format,
79        }
80    }
81}
82
/// Blobs API
///
/// A thin wrapper around an [`ApiClient`]. The type is `#[repr(transparent)]`
/// and derives [`RefCast`], which is what allows a `&ApiClient` to be viewed
/// as a `&Blobs`.
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Blobs {
    // The client used to send every request issued through this API.
    client: ApiClient,
}
89
90impl Blobs {
91    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
92        Self::ref_cast(sender)
93    }
94
95    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
96        let msg = BatchRequest;
97        trace!("{msg:?}");
98        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
99        let scope = rx.await?;
100
101        Ok(Batch {
102            scope,
103            blobs: self,
104            _tx: tx,
105        })
106    }
107
108    /// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
109    /// and therefore can be used to read the blob's content.
110    ///
111    /// Any access to parts of the blob that are not present will result in an error.
112    ///
113    /// Example:
114    /// ```rust
115    /// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
116    /// use tokio::io::AsyncReadExt;
117    ///
118    /// # async fn example() -> anyhow::Result<()> {
119    /// let store = MemStore::new();
120    /// let tag = store.add_slice(b"Hello, world!").await?;
121    /// let mut reader = store.reader(tag.hash);
122    /// let mut buf = String::new();
123    /// reader.read_to_string(&mut buf).await?;
124    /// assert_eq!(buf, "Hello, world!");
125    /// # Ok(())
126    /// }
127    /// ```
128    pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
129        self.reader_with_opts(ReaderOptions { hash: hash.into() })
130    }
131
132    /// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
133    /// and therefore can be used to read the blob's content.
134    ///
135    /// Any access to parts of the blob that are not present will result in an error.
136    pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
137        BlobReader::new(self.clone(), options)
138    }
139
140    /// Delete a blob.
141    ///
142    /// This function is not public, because it does not work as expected when called manually,
143    /// because blobs are protected from deletion. This is only called from the gc task, which
144    /// clears the protections before.
145    ///
146    /// Users should rely only on garbage collection for blob deletion.
147    #[cfg(feature = "fs-store")]
148    pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
149        trace!("{options:?}");
150        self.client.rpc(options).await??;
151        Ok(())
152    }
153
154    /// See [`Self::delete_with_opts`].
155    #[cfg(feature = "fs-store")]
156    pub(crate) async fn delete(
157        &self,
158        hashes: impl IntoIterator<Item = impl Into<Hash>>,
159    ) -> RequestResult<()> {
160        self.delete_with_opts(DeleteOptions {
161            hashes: hashes.into_iter().map(Into::into).collect(),
162            force: false,
163        })
164        .await
165    }
166
167    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
168        let options = ImportBytesRequest {
169            data: Bytes::copy_from_slice(data.as_ref()),
170            format: crate::BlobFormat::Raw,
171            scope: Scope::GLOBAL,
172        };
173        self.add_bytes_impl(options)
174    }
175
176    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
177        let options = ImportBytesRequest {
178            data: data.into(),
179            format: crate::BlobFormat::Raw,
180            scope: Scope::GLOBAL,
181        };
182        self.add_bytes_impl(options)
183    }
184
185    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
186        let options = options.into();
187        let request = ImportBytesRequest {
188            data: options.data,
189            format: options.format,
190            scope: Scope::GLOBAL,
191        };
192        self.add_bytes_impl(request)
193    }
194
195    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
196        trace!("{options:?}");
197        let this = self.clone();
198        let stream = Gen::new(|co| async move {
199            let mut receiver = match this.client.server_streaming(options, 32).await {
200                Ok(receiver) => receiver,
201                Err(cause) => {
202                    co.yield_(AddProgressItem::Error(cause.into())).await;
203                    return;
204                }
205            };
206            loop {
207                match receiver.recv().await {
208                    Ok(Some(item)) => co.yield_(item).await,
209                    Err(cause) => {
210                        co.yield_(AddProgressItem::Error(cause.into())).await;
211                        break;
212                    }
213                    Ok(None) => break,
214                }
215            }
216        });
217        AddProgress::new(self, stream)
218    }
219
220    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
221        let options = options.into();
222        self.add_path_with_opts_impl(ImportPathRequest {
223            path: options.path,
224            mode: options.mode,
225            format: options.format,
226            scope: Scope::GLOBAL,
227        })
228    }
229
230    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
231        trace!("{:?}", options);
232        let client = self.client.clone();
233        let stream = Gen::new(|co| async move {
234            let mut receiver = match client.server_streaming(options, 32).await {
235                Ok(receiver) => receiver,
236                Err(cause) => {
237                    co.yield_(AddProgressItem::Error(cause.into())).await;
238                    return;
239                }
240            };
241            loop {
242                match receiver.recv().await {
243                    Ok(Some(item)) => co.yield_(item).await,
244                    Err(cause) => {
245                        co.yield_(AddProgressItem::Error(cause.into())).await;
246                        break;
247                    }
248                    Ok(None) => break,
249                }
250            }
251        });
252        AddProgress::new(self, stream)
253    }
254
255    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
256        self.add_path_with_opts(AddPathOptions {
257            path: path.as_ref().to_owned(),
258            mode: ImportMode::Copy,
259            format: BlobFormat::Raw,
260        })
261    }
262
    /// Add a raw blob from a stream of byte chunks.
    ///
    /// Each chunk of `data` is forwarded to the store as an
    /// [`ImportByteStreamUpdate::Bytes`], followed by a final
    /// [`ImportByteStreamUpdate::Done`]. Progress items coming back from the
    /// store are yielded through the returned [`AddProgress`].
    ///
    /// The blob is imported as [`BlobFormat::Raw`] with `Scope::default()`.
    ///
    /// Although this is an `async fn`, the function itself awaits nothing; the
    /// request is issued from inside the returned progress stream.
    pub async fn add_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    ) -> AddProgress<'_> {
        // NOTE(review): the other add_* methods use `Scope::GLOBAL` explicitly;
        // presumably `Scope::default()` is the same scope - confirm.
        let inner = ImportByteStreamRequest {
            format: crate::BlobFormat::Raw,
            scope: Scope::default(),
        };
        let client = self.client.clone();
        let stream = Gen::new(|co| async move {
            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
                Ok(x) => x,
                Err(cause) => {
                    co.yield_(AddProgressItem::Error(cause.into())).await;
                    return;
                }
            };
            // Receiving half: forward progress items until the channel closes or fails.
            let recv = async {
                loop {
                    match receiver.recv().await {
                        Ok(Some(item)) => co.yield_(item).await,
                        Err(cause) => {
                            co.yield_(AddProgressItem::Error(cause.into())).await;
                            break;
                        }
                        Ok(None) => break,
                    }
                }
            };
            // Sending half: push all chunks, then signal completion. Stops at the
            // first failing chunk or failed send.
            let send = async {
                tokio::pin!(data);
                while let Some(item) = data.next().await {
                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
                }
                sender.send(ImportByteStreamUpdate::Done).await?;
                anyhow::Ok(())
            };
            // Both halves run concurrently. The result of `send` is discarded, so an
            // error from the input stream is not turned into an `AddProgressItem::Error`.
            // NOTE(review): `sender` is only borrowed by `send` and therefore stays
            // alive until both halves are done - confirm that `recv` still terminates
            // when `send` exits early with an error.
            let _ = tokio::join!(send, recv);
        });
        AddProgress::new(self, stream)
    }
304
305    pub fn export_ranges(
306        &self,
307        hash: impl Into<Hash>,
308        ranges: impl Into<RangeSet2<u64>>,
309    ) -> ExportRangesProgress {
310        self.export_ranges_with_opts(ExportRangesOptions {
311            hash: hash.into(),
312            ranges: ranges.into(),
313        })
314    }
315
316    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
317        trace!("{options:?}");
318        ExportRangesProgress::new(
319            options.ranges.clone(),
320            self.client.server_streaming(options, 32),
321        )
322    }
323
324    pub fn export_bao_with_opts(
325        &self,
326        options: ExportBaoOptions,
327        local_update_cap: usize,
328    ) -> ExportBaoProgress {
329        trace!("{options:?}");
330        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
331    }
332
333    pub fn export_bao(
334        &self,
335        hash: impl Into<Hash>,
336        ranges: impl Into<ChunkRanges>,
337    ) -> ExportBaoProgress {
338        self.export_bao_with_opts(
339            ExportBaoRequest {
340                hash: hash.into(),
341                ranges: ranges.into(),
342            },
343            32,
344        )
345    }
346
347    /// Export a single chunk from the given hash, at the given offset.
348    pub async fn export_chunk(
349        &self,
350        hash: impl Into<Hash>,
351        offset: u64,
352    ) -> super::ExportBaoResult<Leaf> {
353        let base = ChunkNum::full_chunks(offset);
354        let ranges = ChunkRanges::from(base..base + 1);
355        let mut stream = self.export_bao(hash, ranges).stream();
356        while let Some(item) = stream.next().await {
357            match item {
358                EncodedItem::Leaf(leaf) => return Ok(leaf),
359                EncodedItem::Parent(_) => {}
360                EncodedItem::Size(_) => {}
361                EncodedItem::Done => break,
362                EncodedItem::Error(cause) => return Err(cause.into()),
363            }
364        }
365        Err(io::Error::other("unexpected end of stream").into())
366    }
367
368    /// Get the entire blob into a Bytes
369    ///
370    /// This will run out of memory when called for very large blobs, so be careful!
371    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
372        self.export_bao(hash.into(), ChunkRanges::all())
373            .data_to_bytes()
374            .await
375    }
376
377    /// Observe the bitfield of the given hash.
378    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
379        self.observe_with_opts(ObserveOptions { hash: hash.into() })
380    }
381
382    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
383        trace!("{:?}", options);
384        if options.hash == Hash::EMPTY {
385            return ObserveProgress::new(async move {
386                let (tx, rx) = mpsc::channel(1);
387                tx.send(Bitfield::complete(0)).await.ok();
388                Ok(rx)
389            });
390        }
391        ObserveProgress::new(self.client.server_streaming(options, 32))
392    }
393
394    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
395        trace!("{:?}", options);
396        ExportProgress::new(self.client.server_streaming(options, 32))
397    }
398
399    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
400        let options = ExportOptions {
401            hash: hash.into(),
402            mode: ExportMode::Copy,
403            target: target.as_ref().to_owned(),
404        };
405        self.export_with_opts(options)
406    }
407
408    /// Import BaoContentItems from a stream.
409    ///
410    /// The store assumes that these are already verified and in the correct order.
411    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
412    pub async fn import_bao(
413        &self,
414        hash: impl Into<Hash>,
415        size: NonZeroU64,
416        local_update_cap: usize,
417    ) -> irpc::Result<ImportBaoHandle> {
418        let options = ImportBaoRequest {
419            hash: hash.into(),
420            size,
421        };
422        self.import_bao_with_opts(options, local_update_cap).await
423    }
424
425    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
426    pub async fn import_bao_with_opts(
427        &self,
428        options: ImportBaoOptions,
429        local_update_cap: usize,
430    ) -> irpc::Result<ImportBaoHandle> {
431        trace!("{:?}", options);
432        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
433    }
434
    /// Import bao encoded data for `hash` from `reader` into the store.
    ///
    /// The input must start with an 8 byte little-endian size, followed by the
    /// encoded content for `ranges`. Decoded items are forwarded to the store
    /// through [`Self::import_bao_with_opts`].
    ///
    /// A size of zero is only accepted for [`Hash::EMPTY`]; in that case nothing
    /// is imported and the reader is returned right away. For any other hash a
    /// zero size yields an "invalid size for hash" error.
    ///
    /// On success the reader is handed back to the caller.
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    async fn import_bao_reader<R: AsyncStreamReader>(
        &self,
        hash: Hash,
        ranges: ChunkRanges,
        mut reader: R,
    ) -> RequestResult<R> {
        // Size prefix: 8 bytes, little endian.
        let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
        let Some(size) = NonZeroU64::new(size) else {
            return if hash == Hash::EMPTY {
                Ok(reader)
            } else {
                Err(super::Error::other("invalid size for hash").into())
            };
        };
        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
        let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
        let options = ImportBaoOptions { hash, size };
        let handle = self.import_bao_with_opts(options, 32).await?;
        // Decodes the response item by item and forwards each item to the store.
        let driver = async move {
            let reader = loop {
                match decoder.next().await {
                    ResponseDecoderNext::More((rest, item)) => {
                        handle.tx.send(item?).await?;
                        decoder = rest;
                    }
                    // The decoder gives the underlying reader back once it is done.
                    ResponseDecoderNext::Done(reader) => break reader,
                };
            };
            // Drop the sender explicitly once all items are sent.
            // NOTE(review): presumably this is what signals end of input to the
            // store - confirm.
            drop(handle.tx);
            io::Result::Ok(reader)
        };
        // Waits for the store's final result of the import.
        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
        let (reader, res) = tokio::join!(driver, fut);
        // The store's result is checked first, so its error takes precedence
        // over an error from the driver.
        res?;
        Ok(reader?)
    }
472
473    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
474    pub async fn import_bao_quinn(
475        &self,
476        hash: Hash,
477        ranges: ChunkRanges,
478        stream: &mut iroh::endpoint::RecvStream,
479    ) -> RequestResult<()> {
480        let reader = TokioStreamReader::new(stream);
481        self.import_bao_reader(hash, ranges, reader).await?;
482        Ok(())
483    }
484
485    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
486    pub async fn import_bao_bytes(
487        &self,
488        hash: Hash,
489        ranges: ChunkRanges,
490        data: impl Into<Bytes>,
491    ) -> RequestResult<()> {
492        self.import_bao_reader(hash, ranges, data.into()).await?;
493        Ok(())
494    }
495
496    pub fn list(&self) -> BlobsListProgress {
497        let msg = ListRequest;
498        let client = self.client.clone();
499        BlobsListProgress::new(client.server_streaming(msg, 32))
500    }
501
502    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
503        let hash = hash.into();
504        let msg = BlobStatusRequest { hash };
505        self.client.rpc(msg).await
506    }
507
508    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
509        match self.status(hash).await? {
510            BlobStatus::Complete { .. } => Ok(true),
511            _ => Ok(false),
512        }
513    }
514
515    #[allow(dead_code)]
516    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
517        let msg = ClearProtectedRequest;
518        self.client.rpc(msg).await??;
519        Ok(())
520    }
521}
522
/// A progress handle for a batch scoped add operation.
///
/// This is a newtype around [`AddProgress`]. Unlike [`AddProgress`], awaiting
/// it yields the [`TempTag`] of the added content.
pub struct BatchAddProgress<'a>(AddProgress<'a>);
525
526impl<'a> IntoFuture for BatchAddProgress<'a> {
527    type Output = RequestResult<TempTag>;
528
529    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
530
531    fn into_future(self) -> Self::IntoFuture {
532        Box::pin(self.temp_tag())
533    }
534}
535
536impl<'a> BatchAddProgress<'a> {
537    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
538        self.0.with_named_tag(name).await
539    }
540
541    pub async fn with_tag(self) -> RequestResult<TagInfo> {
542        self.0.with_tag().await
543    }
544
545    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
546        self.0.stream().await
547    }
548
549    pub async fn temp_tag(self) -> RequestResult<TempTag> {
550        self.0.temp_tag().await
551    }
552}
553
/// A batch of operations that modify the blob store.
///
/// Created by [`Blobs::batch`]. Every request issued through a batch carries
/// the batch's [`Scope`].
pub struct Batch<'a> {
    // Scope of this batch; attached to every request made through it.
    scope: Scope,
    // The API handle used to issue the requests.
    blobs: &'a Blobs,
    // Sender half of the batch request. It is never used directly, only kept alive.
    // NOTE(review): presumably dropping it ends the batch - confirm.
    _tx: mpsc::Sender<BatchResponse>,
}
560
561impl<'a> Batch<'a> {
562    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
563        let options = ImportBytesRequest {
564            data: data.into(),
565            format: crate::BlobFormat::Raw,
566            scope: self.scope,
567        };
568        BatchAddProgress(self.blobs.add_bytes_impl(options))
569    }
570
571    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
572        let options = options.into();
573        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
574            data: options.data,
575            format: options.format,
576            scope: self.scope,
577        }))
578    }
579
580    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
581        let options = ImportBytesRequest {
582            data: Bytes::copy_from_slice(data.as_ref()),
583            format: crate::BlobFormat::Raw,
584            scope: self.scope,
585        };
586        BatchAddProgress(self.blobs.add_bytes_impl(options))
587    }
588
589    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
590        let options = options.into();
591        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
592            path: options.path,
593            mode: options.mode,
594            format: options.format,
595            scope: self.scope,
596        }))
597    }
598
599    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
600        let value = value.into();
601        let msg = CreateTempTagRequest {
602            scope: self.scope,
603            value,
604        };
605        self.blobs.client.rpc(msg).await
606    }
607}
608
/// Options for adding data from a file system path.
#[derive(Debug)]
pub struct AddPathOptions {
    /// The path to import from.
    pub path: PathBuf,
    /// The format passed along with the import request.
    pub format: BlobFormat,
    /// The import mode passed along with the import request
    /// ([`Blobs::add_path`] uses [`ImportMode::Copy`]).
    pub mode: ImportMode,
}
616
/// A progress handle for an import operation.
///
/// Internally this is a stream of [`AddProgressItem`] items. Working with this
/// stream directly can be inconvenient, so this struct provides some convenience
/// methods to work with the result.
///
/// It also implements [`IntoFuture`], so you can await it. Awaiting is the same
/// as calling [`AddProgress::with_tag`]: it creates a tag for the added content
/// and returns the resulting [`TagInfo`]. Use [`AddProgress::temp_tag`] if you
/// want the [`TempTag`] instead.
///
/// If you want access to the stream, you can use the [`AddProgress::stream`] method.
pub struct AddProgress<'a> {
    // Handle used to create tags once the import is done.
    blobs: &'a Blobs,
    // The underlying stream of progress items.
    inner: stream::Boxed<AddProgressItem>,
}
631
632impl<'a> IntoFuture for AddProgress<'a> {
633    type Output = RequestResult<TagInfo>;
634
635    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
636
637    fn into_future(self) -> Self::IntoFuture {
638        Box::pin(self.with_tag())
639    }
640}
641
642impl<'a> AddProgress<'a> {
643    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
644        Self {
645            blobs,
646            inner: Box::pin(stream),
647        }
648    }
649
650    pub async fn temp_tag(self) -> RequestResult<TempTag> {
651        let mut stream = self.inner;
652        while let Some(item) = stream.next().await {
653            match item {
654                AddProgressItem::Done(tt) => return Ok(tt),
655                AddProgressItem::Error(e) => return Err(e.into()),
656                _ => {}
657            }
658        }
659        Err(super::Error::other("unexpected end of stream").into())
660    }
661
662    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
663        let blobs = self.blobs.clone();
664        let tt = self.temp_tag().await?;
665        let haf = *tt.hash_and_format();
666        let tags = Tags::ref_from_sender(&blobs.client);
667        tags.set(name, *tt.hash_and_format()).await?;
668        drop(tt);
669        Ok(haf)
670    }
671
672    pub async fn with_tag(self) -> RequestResult<TagInfo> {
673        let blobs = self.blobs.clone();
674        let tt = self.temp_tag().await?;
675        let hash = *tt.hash();
676        let format = tt.format();
677        let tags = Tags::ref_from_sender(&blobs.client);
678        let name = tags.create(*tt.hash_and_format()).await?;
679        drop(tt);
680        Ok(TagInfo { name, hash, format })
681    }
682
683    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
684        self.inner
685    }
686}
687
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
    /// The hash of the blob to read.
    pub hash: Hash,
}
693
/// An observe result. Awaiting this will return the current state.
///
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
/// the first item is the current state and subsequent items are updates.
pub struct ObserveProgress {
    // Pending request that resolves to the receiver of bitfield updates.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
}
701
702impl IntoFuture for ObserveProgress {
703    type Output = RequestResult<Bitfield>;
704
705    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
706
707    fn into_future(self) -> Self::IntoFuture {
708        Box::pin(async move {
709            let mut rx = self.inner.await?;
710            match rx.recv().await? {
711                Some(bitfield) => Ok(bitfield),
712                None => Err(super::Error::other("unexpected end of stream").into()),
713            }
714        })
715    }
716}
717
718impl ObserveProgress {
719    fn new(
720        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
721    ) -> Self {
722        Self {
723            inner: Box::pin(fut),
724        }
725    }
726
727    pub async fn await_completion(self) -> RequestResult<Bitfield> {
728        let mut stream = self.stream().await?;
729        while let Some(item) = stream.next().await {
730            if item.is_complete() {
731                return Ok(item);
732            }
733        }
734        Err(super::Error::other("unexpected end of stream").into())
735    }
736
737    /// Returns an infinite stream of bitfields. The first bitfield is the
738    /// current state, and the following bitfields are updates.
739    ///
740    /// Once a blob is complete, there will be no more updates.
741    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
742        let mut rx = self.inner.await?;
743        Ok(Gen::new(|co| async move {
744            while let Ok(Some(item)) = rx.recv().await {
745                co.yield_(item).await;
746            }
747        }))
748    }
749}
750
/// A progress handle for an export operation.
///
/// Internally this is a stream of [`ExportProgressItem`] items. Working with this
/// stream directly can be inconvenient, so this struct provides some convenience
/// methods to work with the result.
///
/// To get the underlying stream, use the [`ExportProgress::stream`] method.
///
/// It also implements [`IntoFuture`], so you can await it to get the size of the
/// exported blob.
pub struct ExportProgress {
    // Pending request that resolves to the receiver of progress items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
}
764
765impl IntoFuture for ExportProgress {
766    type Output = RequestResult<u64>;
767
768    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
769
770    fn into_future(self) -> Self::IntoFuture {
771        Box::pin(self.finish())
772    }
773}
774
775impl ExportProgress {
776    fn new(
777        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
778    ) -> Self {
779        Self {
780            inner: Box::pin(fut),
781        }
782    }
783
784    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
785        Gen::new(|co| async move {
786            let mut rx = match self.inner.await {
787                Ok(rx) => rx,
788                Err(e) => {
789                    co.yield_(ExportProgressItem::Error(e.into())).await;
790                    return;
791                }
792            };
793            while let Ok(Some(item)) = rx.recv().await {
794                co.yield_(item).await;
795            }
796        })
797    }
798
799    pub async fn finish(self) -> RequestResult<u64> {
800        let mut rx = self.inner.await?;
801        let mut size = None;
802        loop {
803            match rx.recv().await? {
804                Some(ExportProgressItem::Done) => break,
805                Some(ExportProgressItem::Size(s)) => size = Some(s),
806                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
807                _ => {}
808            }
809        }
810        if let Some(size) = size {
811            Ok(size)
812        } else {
813            Err(super::Error::other("unexpected end of stream").into())
814        }
815    }
816}
817
/// A handle for an ongoing bao import operation.
pub struct ImportBaoHandle {
    /// Sender for the [`BaoContentItem`]s that make up the import.
    pub tx: mpsc::Sender<BaoContentItem>,
    /// Receiver for the final result of the import.
    pub rx: oneshot::Receiver<super::Result<()>>,
}
823
824impl ImportBaoHandle {
825    pub(crate) async fn new(
826        fut: impl Future<
827                Output = irpc::Result<(
828                    mpsc::Sender<BaoContentItem>,
829                    oneshot::Receiver<super::Result<()>>,
830                )>,
831            > + Send
832            + 'static,
833    ) -> irpc::Result<Self> {
834        let (tx, rx) = fut.await?;
835        Ok(Self { tx, rx })
836    }
837}
838
/// A progress handle for a blobs list operation.
///
/// Use [`BlobsListProgress::hashes`] to collect all hashes, or
/// [`BlobsListProgress::stream`] to process them one by one.
pub struct BlobsListProgress {
    // Pending request that resolves to the receiver of listed hashes.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
}
843
844impl BlobsListProgress {
845    fn new(
846        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
847    ) -> Self {
848        Self {
849            inner: Box::pin(fut),
850        }
851    }
852
853    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
854        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
855        let mut hashes = Vec::new();
856        while let Some(item) = rx.recv().await? {
857            hashes.push(item?);
858        }
859        Ok(hashes)
860    }
861
862    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
863        let mut rx = self.inner.await?;
864        Ok(Gen::new(|co| async move {
865            while let Ok(Some(item)) = rx.recv().await {
866                co.yield_(item).await;
867            }
868        }))
869    }
870}
871
/// A progress handle for a ranges export operation.
///
/// Internally, this is a stream of [`ExportRangesItem`]s. Using this stream
/// directly is often inconvenient, so [`ExportRangesProgress::concatenate`]
/// collects the requested ranges into a single buffer.
///
/// You can get access to the underlying stream using the [`ExportRangesProgress::stream`] method.
pub struct ExportRangesProgress {
    // The byte ranges that were requested; used to clip the returned data.
    ranges: RangeSet2<u64>,
    // Pending request that resolves to the receiver of export items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
}
883
884impl ExportRangesProgress {
885    fn new(
886        ranges: RangeSet2<u64>,
887        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
888    ) -> Self {
889        Self {
890            ranges,
891            inner: Box::pin(fut),
892        }
893    }
894}
895
896impl ExportRangesProgress {
897    /// A raw stream of [`ExportRangesItem`]s.
898    ///
899    /// Ranges will be rounded up to chunk boundaries. So if you request a
900    /// range of 0..100, you will get the entire first chunk, 0..1024.
901    ///
902    /// It is up to the caller to clip the ranges to the requested ranges.
903    pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
904        Gen::new(|co| async move {
905            let mut rx = match self.inner.await {
906                Ok(rx) => rx,
907                Err(e) => {
908                    co.yield_(ExportRangesItem::Error(e.into())).await;
909                    return;
910                }
911            };
912            while let Ok(Some(item)) = rx.recv().await {
913                co.yield_(item).await;
914            }
915        })
916    }
917
918    /// Concatenate all the data into a single `Bytes`.
919    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
920        let mut rx = self.inner.await?;
921        let mut data = BTreeMap::new();
922        while let Some(item) = rx.recv().await? {
923            match item {
924                ExportRangesItem::Size(_) => {}
925                ExportRangesItem::Data(leaf) => {
926                    data.insert(leaf.offset, leaf.data);
927                }
928                ExportRangesItem::Error(cause) => return Err(cause.into()),
929            }
930        }
931        let mut res = Vec::new();
932        for range in self.ranges.iter() {
933            let (start, end) = match range {
934                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
935                RangeSetRange::Range(range) => (*range.start, *range.end),
936            };
937            for (offset, data) in data.iter() {
938                let cstart = *offset;
939                let cend = *offset + (data.len() as u64);
940                if cstart >= end || cend <= start {
941                    continue;
942                }
943                let start = start.max(cstart);
944                let end = end.min(cend);
945                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
946                res.extend_from_slice(data);
947            }
948        }
949        Ok(res)
950    }
951}
952
/// A progress handle for a bao export operation.
///
/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
/// is often inconvenient, so there are a number of higher level methods to
/// process the stream.
///
/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
pub struct ExportBaoProgress {
    /// Future that resolves to the receiver of encoded items, or to the rpc error.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
}
963
964impl ExportBaoProgress {
965    fn new(
966        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
967    ) -> Self {
968        Self {
969            inner: Box::pin(fut),
970        }
971    }
972
973    /// Interprets this blob as a hash sequence and returns a stream of hashes.
974    ///
975    /// Errors will be reported, but the iterator will nevertheless continue.
976    /// If you get an error despite having asked for ranges that should be present,
977    /// this means that the data is corrupted. It can still make sense to continue
978    /// to get all non-corrupted sections.
979    pub fn hashes_with_index(
980        self,
981    ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
982        let mut stream = self.stream();
983        Gen::new(|co| async move {
984            while let Some(item) = stream.next().await {
985                let leaf = match item {
986                    EncodedItem::Leaf(leaf) => leaf,
987                    EncodedItem::Error(e) => {
988                        co.yield_(Err(e.into())).await;
989                        continue;
990                    }
991                    _ => continue,
992                };
993                let slice = match HashSeqChunk::try_from(leaf) {
994                    Ok(slice) => slice,
995                    Err(e) => {
996                        co.yield_(Err(e)).await;
997                        continue;
998                    }
999                };
1000                let offset = slice.base();
1001                for (o, hash) in slice.into_iter().enumerate() {
1002                    co.yield_(Ok((offset + o as u64, hash))).await;
1003                }
1004            }
1005        })
1006    }
1007
1008    /// Same as [`Self::hashes_with_index`], but without the indexes.
1009    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
1010        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1011    }
1012
1013    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1014        let mut data = Vec::new();
1015        let mut stream = self.into_byte_stream();
1016        while let Some(item) = stream.next().await {
1017            data.extend_from_slice(&item?);
1018        }
1019        Ok(data)
1020    }
1021
1022    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1023        let mut rx = self.inner.await?;
1024        let mut data = Vec::new();
1025        while let Some(item) = rx.recv().await? {
1026            match item {
1027                EncodedItem::Leaf(leaf) => {
1028                    data.push(leaf.data);
1029                }
1030                EncodedItem::Parent(_) => {}
1031                EncodedItem::Size(_) => {}
1032                EncodedItem::Done => break,
1033                EncodedItem::Error(cause) => return Err(cause.into()),
1034            }
1035        }
1036        if data.len() == 1 {
1037            Ok(data.pop().unwrap())
1038        } else {
1039            let mut out = Vec::new();
1040            for item in data {
1041                out.extend_from_slice(&item);
1042            }
1043            Ok(out.into())
1044        }
1045    }
1046
1047    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1048        let mut rx = self.inner.await?;
1049        let mut data = Vec::new();
1050        while let Some(item) = rx.recv().await? {
1051            match item {
1052                EncodedItem::Leaf(leaf) => {
1053                    data.extend_from_slice(&leaf.data);
1054                }
1055                EncodedItem::Parent(_) => {}
1056                EncodedItem::Size(_) => {}
1057                EncodedItem::Done => break,
1058                EncodedItem::Error(cause) => return Err(cause.into()),
1059            }
1060        }
1061        Ok(data)
1062    }
1063
1064    pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
1065        let mut rx = self.inner.await?;
1066        while let Some(item) = rx.recv().await? {
1067            match item {
1068                EncodedItem::Size(size) => {
1069                    target.write_u64_le(size).await?;
1070                }
1071                EncodedItem::Parent(parent) => {
1072                    let mut data = vec![0u8; 64];
1073                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1074                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1075                    target.write_all(&data).await.map_err(io::Error::from)?;
1076                }
1077                EncodedItem::Leaf(leaf) => {
1078                    target
1079                        .write_chunk(leaf.data)
1080                        .await
1081                        .map_err(io::Error::from)?;
1082                }
1083                EncodedItem::Done => break,
1084                EncodedItem::Error(cause) => return Err(cause.into()),
1085            }
1086        }
1087        Ok(())
1088    }
1089
1090    /// Write quinn variant that also feeds a progress writer.
1091    pub(crate) async fn write_quinn_with_progress(
1092        self,
1093        writer: &mut SendStream,
1094        progress: &mut impl WriteProgress,
1095        hash: &Hash,
1096        index: u64,
1097    ) -> super::ExportBaoResult<()> {
1098        let mut rx = self.inner.await?;
1099        while let Some(item) = rx.recv().await? {
1100            match item {
1101                EncodedItem::Size(size) => {
1102                    progress.send_transfer_started(index, hash, size).await;
1103                    writer.write_u64_le(size).await?;
1104                    progress.log_other_write(8);
1105                }
1106                EncodedItem::Parent(parent) => {
1107                    let mut data = vec![0u8; 64];
1108                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1109                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1110                    writer.write_all(&data).await.map_err(io::Error::from)?;
1111                    progress.log_other_write(64);
1112                }
1113                EncodedItem::Leaf(leaf) => {
1114                    let len = leaf.data.len();
1115                    writer
1116                        .write_chunk(leaf.data)
1117                        .await
1118                        .map_err(io::Error::from)?;
1119                    progress
1120                        .notify_payload_write(index, leaf.offset, len)
1121                        .await?;
1122                }
1123                EncodedItem::Done => break,
1124                EncodedItem::Error(cause) => return Err(cause.into()),
1125            }
1126        }
1127        Ok(())
1128    }
1129
1130    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1131        self.stream().filter_map(|item| match item {
1132            EncodedItem::Size(size) => {
1133                let size = size.to_le_bytes().to_vec().into();
1134                Some(Ok(size))
1135            }
1136            EncodedItem::Parent(parent) => {
1137                let mut data = vec![0u8; 64];
1138                data[..32].copy_from_slice(parent.pair.0.as_bytes());
1139                data[32..].copy_from_slice(parent.pair.1.as_bytes());
1140                Some(Ok(data.into()))
1141            }
1142            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1143            EncodedItem::Done => None,
1144            EncodedItem::Error(cause) => Some(Err(cause.into())),
1145        })
1146    }
1147
1148    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1149        Gen::new(|co| async move {
1150            let mut rx = match self.inner.await {
1151                Ok(rx) => rx,
1152                Err(cause) => {
1153                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1154                        .await;
1155                    return;
1156                }
1157            };
1158            while let Ok(Some(item)) = rx.recv().await {
1159                co.yield_(item).await;
1160            }
1161        })
1162    }
1163}
1164
/// Hooks for reporting progress while exported items are written out.
///
/// `ExportBaoProgress::write_quinn_with_progress` drives these hooks.
pub(crate) trait WriteProgress {
    /// Notify the progress writer that a payload write has happened.
    ///
    /// Called after a leaf has been written, with the caller-supplied
    /// `index`, the `offset` of the leaf, and the `len` of its data in bytes.
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;

    /// Log a write of some other data.
    ///
    /// `len` is the number of non-payload bytes written: 8 for the size and
    /// 64 for a parent hash pair.
    fn log_other_write(&mut self, len: usize);

    /// Notify the progress writer that a transfer has started.
    ///
    /// Called when the size item arrives, before the size is written, with
    /// the caller-supplied `index` and `hash` and the received `size`.
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
}