iroh_blobs/api/
blobs.rs

1//! API to interact with a local blob store
2//!
3//! This API is for local interactions with the blob store, such as importing
4//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
5//!
6//! The main entry point is the [`Blobs`] struct.
7use std::{
8    collections::BTreeMap,
9    future::{Future, IntoFuture},
10    io,
11    num::NonZeroU64,
12    path::{Path, PathBuf},
13    pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18    io::{
19        fsm::{ResponseDecoder, ResponseDecoderNext},
20        BaoContentItem, Leaf,
21    },
22    BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::{AsyncStreamReader, TokioStreamReader};
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use quinn::SendStream;
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use serde::{Deserialize, Serialize};
33use tokio::io::AsyncWriteExt;
34use tracing::trace;
35mod reader;
36pub use reader::BlobReader;
37
38// Public reexports from the proto module.
39//
40// Due to the fact that the proto module is hidden from docs by default,
41// these will appear in the docs as if they were declared here.
42pub use super::proto::{
43    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
44    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
45    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
46    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
47};
48use super::{
49    proto::{
50        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
51        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
52        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
53    },
54    remote::HashSeqChunk,
55    tags::TagInfo,
56    ApiClient, RequestResult, Tags,
57};
58use crate::{
59    api::proto::{BatchRequest, ImportByteStreamUpdate},
60    provider::StreamContext,
61    store::IROH_BLOCK_SIZE,
62    util::temp_tag::TempTag,
63    BlobFormat, Hash, HashAndFormat,
64};
65
66/// Options for adding bytes.
67#[derive(Debug)]
68pub struct AddBytesOptions {
69    pub data: Bytes,
70    pub format: BlobFormat,
71}
72
73impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
74    fn from(item: (T, BlobFormat)) -> Self {
75        let (data, format) = item;
76        Self {
77            data: data.into(),
78            format,
79        }
80    }
81}
82
83/// Blobs API
84#[derive(Debug, Clone, ref_cast::RefCast)]
85#[repr(transparent)]
86pub struct Blobs {
87    client: ApiClient,
88}
89
90impl Blobs {
91    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
92        Self::ref_cast(sender)
93    }
94
95    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
96        let msg = BatchRequest;
97        trace!("{msg:?}");
98        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
99        let scope = rx.await?;
100
101        Ok(Batch {
102            scope,
103            blobs: self,
104            _tx: tx,
105        })
106    }
107
108    /// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
109    /// and therefore can be used to read the blob's content.
110    ///
111    /// Any access to parts of the blob that are not present will result in an error.
112    ///
113    /// Example:
114    /// ```rust
115    /// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
116    /// use tokio::io::AsyncReadExt;
117    ///
118    /// # async fn example() -> anyhow::Result<()> {
119    /// let store = MemStore::new();
120    /// let tag = store.add_slice(b"Hello, world!").await?;
121    /// let mut reader = store.reader(tag.hash);
122    /// let mut buf = String::new();
123    /// reader.read_to_string(&mut buf).await?;
124    /// assert_eq!(buf, "Hello, world!");
125    /// # Ok(())
126    /// }
127    /// ```
128    pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
129        self.reader_with_opts(ReaderOptions { hash: hash.into() })
130    }
131
132    /// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
133    /// and therefore can be used to read the blob's content.
134    ///
135    /// Any access to parts of the blob that are not present will result in an error.
136    pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
137        BlobReader::new(self.clone(), options)
138    }
139
140    /// Delete a blob.
141    ///
142    /// This function is not public, because it does not work as expected when called manually,
143    /// because blobs are protected from deletion. This is only called from the gc task, which
144    /// clears the protections before.
145    ///
146    /// Users should rely only on garbage collection for blob deletion.
147    pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
148        trace!("{options:?}");
149        self.client.rpc(options).await??;
150        Ok(())
151    }
152
153    /// See [`Self::delete_with_opts`].
154    pub(crate) async fn delete(
155        &self,
156        hashes: impl IntoIterator<Item = impl Into<Hash>>,
157    ) -> RequestResult<()> {
158        self.delete_with_opts(DeleteOptions {
159            hashes: hashes.into_iter().map(Into::into).collect(),
160            force: false,
161        })
162        .await
163    }
164
165    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
166        let options = ImportBytesRequest {
167            data: Bytes::copy_from_slice(data.as_ref()),
168            format: crate::BlobFormat::Raw,
169            scope: Scope::GLOBAL,
170        };
171        self.add_bytes_impl(options)
172    }
173
174    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
175        let options = ImportBytesRequest {
176            data: data.into(),
177            format: crate::BlobFormat::Raw,
178            scope: Scope::GLOBAL,
179        };
180        self.add_bytes_impl(options)
181    }
182
183    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
184        let options = options.into();
185        let request = ImportBytesRequest {
186            data: options.data,
187            format: options.format,
188            scope: Scope::GLOBAL,
189        };
190        self.add_bytes_impl(request)
191    }
192
193    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
194        trace!("{options:?}");
195        let this = self.clone();
196        let stream = Gen::new(|co| async move {
197            let mut receiver = match this.client.server_streaming(options, 32).await {
198                Ok(receiver) => receiver,
199                Err(cause) => {
200                    co.yield_(AddProgressItem::Error(cause.into())).await;
201                    return;
202                }
203            };
204            loop {
205                match receiver.recv().await {
206                    Ok(Some(item)) => co.yield_(item).await,
207                    Err(cause) => {
208                        co.yield_(AddProgressItem::Error(cause.into())).await;
209                        break;
210                    }
211                    Ok(None) => break,
212                }
213            }
214        });
215        AddProgress::new(self, stream)
216    }
217
218    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
219        let options = options.into();
220        self.add_path_with_opts_impl(ImportPathRequest {
221            path: options.path,
222            mode: options.mode,
223            format: options.format,
224            scope: Scope::GLOBAL,
225        })
226    }
227
228    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
229        trace!("{:?}", options);
230        let client = self.client.clone();
231        let stream = Gen::new(|co| async move {
232            let mut receiver = match client.server_streaming(options, 32).await {
233                Ok(receiver) => receiver,
234                Err(cause) => {
235                    co.yield_(AddProgressItem::Error(cause.into())).await;
236                    return;
237                }
238            };
239            loop {
240                match receiver.recv().await {
241                    Ok(Some(item)) => co.yield_(item).await,
242                    Err(cause) => {
243                        co.yield_(AddProgressItem::Error(cause.into())).await;
244                        break;
245                    }
246                    Ok(None) => break,
247                }
248            }
249        });
250        AddProgress::new(self, stream)
251    }
252
253    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
254        self.add_path_with_opts(AddPathOptions {
255            path: path.as_ref().to_owned(),
256            mode: ImportMode::Copy,
257            format: BlobFormat::Raw,
258        })
259    }
260
261    pub async fn add_stream(
262        &self,
263        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
264    ) -> AddProgress<'_> {
265        let inner = ImportByteStreamRequest {
266            format: crate::BlobFormat::Raw,
267            scope: Scope::default(),
268        };
269        let client = self.client.clone();
270        let stream = Gen::new(|co| async move {
271            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
272                Ok(x) => x,
273                Err(cause) => {
274                    co.yield_(AddProgressItem::Error(cause.into())).await;
275                    return;
276                }
277            };
278            let recv = async {
279                loop {
280                    match receiver.recv().await {
281                        Ok(Some(item)) => co.yield_(item).await,
282                        Err(cause) => {
283                            co.yield_(AddProgressItem::Error(cause.into())).await;
284                            break;
285                        }
286                        Ok(None) => break,
287                    }
288                }
289            };
290            let send = async {
291                tokio::pin!(data);
292                while let Some(item) = data.next().await {
293                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
294                }
295                sender.send(ImportByteStreamUpdate::Done).await?;
296                anyhow::Ok(())
297            };
298            let _ = tokio::join!(send, recv);
299        });
300        AddProgress::new(self, stream)
301    }
302
303    pub fn export_ranges(
304        &self,
305        hash: impl Into<Hash>,
306        ranges: impl Into<RangeSet2<u64>>,
307    ) -> ExportRangesProgress {
308        self.export_ranges_with_opts(ExportRangesOptions {
309            hash: hash.into(),
310            ranges: ranges.into(),
311        })
312    }
313
314    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
315        trace!("{options:?}");
316        ExportRangesProgress::new(
317            options.ranges.clone(),
318            self.client.server_streaming(options, 32),
319        )
320    }
321
322    pub fn export_bao_with_opts(
323        &self,
324        options: ExportBaoOptions,
325        local_update_cap: usize,
326    ) -> ExportBaoProgress {
327        trace!("{options:?}");
328        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
329    }
330
331    pub fn export_bao(
332        &self,
333        hash: impl Into<Hash>,
334        ranges: impl Into<ChunkRanges>,
335    ) -> ExportBaoProgress {
336        self.export_bao_with_opts(
337            ExportBaoRequest {
338                hash: hash.into(),
339                ranges: ranges.into(),
340            },
341            32,
342        )
343    }
344
345    /// Export a single chunk from the given hash, at the given offset.
346    pub async fn export_chunk(
347        &self,
348        hash: impl Into<Hash>,
349        offset: u64,
350    ) -> super::ExportBaoResult<Leaf> {
351        let base = ChunkNum::full_chunks(offset);
352        let ranges = ChunkRanges::from(base..base + 1);
353        let mut stream = self.export_bao(hash, ranges).stream();
354        while let Some(item) = stream.next().await {
355            match item {
356                EncodedItem::Leaf(leaf) => return Ok(leaf),
357                EncodedItem::Parent(_) => {}
358                EncodedItem::Size(_) => {}
359                EncodedItem::Done => break,
360                EncodedItem::Error(cause) => return Err(cause.into()),
361            }
362        }
363        Err(io::Error::other("unexpected end of stream").into())
364    }
365
366    /// Get the entire blob into a Bytes
367    ///
368    /// This will run out of memory when called for very large blobs, so be careful!
369    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
370        self.export_bao(hash.into(), ChunkRanges::all())
371            .data_to_bytes()
372            .await
373    }
374
375    /// Observe the bitfield of the given hash.
376    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
377        self.observe_with_opts(ObserveOptions { hash: hash.into() })
378    }
379
380    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
381        trace!("{:?}", options);
382        if options.hash == Hash::EMPTY {
383            return ObserveProgress::new(async move {
384                let (tx, rx) = mpsc::channel(1);
385                tx.send(Bitfield::complete(0)).await.ok();
386                Ok(rx)
387            });
388        }
389        ObserveProgress::new(self.client.server_streaming(options, 32))
390    }
391
392    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
393        trace!("{:?}", options);
394        ExportProgress::new(self.client.server_streaming(options, 32))
395    }
396
397    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
398        let options = ExportOptions {
399            hash: hash.into(),
400            mode: ExportMode::Copy,
401            target: target.as_ref().to_owned(),
402        };
403        self.export_with_opts(options)
404    }
405
406    /// Import BaoContentItems from a stream.
407    ///
408    /// The store assumes that these are already verified and in the correct order.
409    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
410    pub async fn import_bao(
411        &self,
412        hash: impl Into<Hash>,
413        size: NonZeroU64,
414        local_update_cap: usize,
415    ) -> irpc::Result<ImportBaoHandle> {
416        let options = ImportBaoRequest {
417            hash: hash.into(),
418            size,
419        };
420        self.import_bao_with_opts(options, local_update_cap).await
421    }
422
423    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
424    pub async fn import_bao_with_opts(
425        &self,
426        options: ImportBaoOptions,
427        local_update_cap: usize,
428    ) -> irpc::Result<ImportBaoHandle> {
429        trace!("{:?}", options);
430        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
431    }
432
433    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
434    async fn import_bao_reader<R: AsyncStreamReader>(
435        &self,
436        hash: Hash,
437        ranges: ChunkRanges,
438        mut reader: R,
439    ) -> RequestResult<R> {
440        let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
441        let Some(size) = NonZeroU64::new(size) else {
442            return if hash == Hash::EMPTY {
443                Ok(reader)
444            } else {
445                Err(super::Error::other("invalid size for hash").into())
446            };
447        };
448        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
449        let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
450        let options = ImportBaoOptions { hash, size };
451        let handle = self.import_bao_with_opts(options, 32).await?;
452        let driver = async move {
453            let reader = loop {
454                match decoder.next().await {
455                    ResponseDecoderNext::More((rest, item)) => {
456                        handle.tx.send(item?).await?;
457                        decoder = rest;
458                    }
459                    ResponseDecoderNext::Done(reader) => break reader,
460                };
461            };
462            drop(handle.tx);
463            io::Result::Ok(reader)
464        };
465        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
466        let (reader, res) = tokio::join!(driver, fut);
467        res?;
468        Ok(reader?)
469    }
470
471    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
472    pub async fn import_bao_quinn(
473        &self,
474        hash: Hash,
475        ranges: ChunkRanges,
476        stream: &mut iroh::endpoint::RecvStream,
477    ) -> RequestResult<()> {
478        let reader = TokioStreamReader::new(stream);
479        self.import_bao_reader(hash, ranges, reader).await?;
480        Ok(())
481    }
482
483    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
484    pub async fn import_bao_bytes(
485        &self,
486        hash: Hash,
487        ranges: ChunkRanges,
488        data: impl Into<Bytes>,
489    ) -> RequestResult<()> {
490        self.import_bao_reader(hash, ranges, data.into()).await?;
491        Ok(())
492    }
493
494    pub fn list(&self) -> BlobsListProgress {
495        let msg = ListRequest;
496        let client = self.client.clone();
497        BlobsListProgress::new(client.server_streaming(msg, 32))
498    }
499
500    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
501        let hash = hash.into();
502        let msg = BlobStatusRequest { hash };
503        self.client.rpc(msg).await
504    }
505
506    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
507        match self.status(hash).await? {
508            BlobStatus::Complete { .. } => Ok(true),
509            _ => Ok(false),
510        }
511    }
512
513    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
514        let msg = ClearProtectedRequest;
515        self.client.rpc(msg).await??;
516        Ok(())
517    }
518}
519
520/// A progress handle for a batch scoped add operation.
521pub struct BatchAddProgress<'a>(AddProgress<'a>);
522
523impl<'a> IntoFuture for BatchAddProgress<'a> {
524    type Output = RequestResult<TempTag>;
525
526    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
527
528    fn into_future(self) -> Self::IntoFuture {
529        Box::pin(self.temp_tag())
530    }
531}
532
533impl<'a> BatchAddProgress<'a> {
534    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
535        self.0.with_named_tag(name).await
536    }
537
538    pub async fn with_tag(self) -> RequestResult<TagInfo> {
539        self.0.with_tag().await
540    }
541
542    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
543        self.0.stream().await
544    }
545
546    pub async fn temp_tag(self) -> RequestResult<TempTag> {
547        self.0.temp_tag().await
548    }
549}
550
551/// A batch of operations that modify the blob store.
552pub struct Batch<'a> {
553    scope: Scope,
554    blobs: &'a Blobs,
555    _tx: mpsc::Sender<BatchResponse>,
556}
557
558impl<'a> Batch<'a> {
559    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
560        let options = ImportBytesRequest {
561            data: data.into(),
562            format: crate::BlobFormat::Raw,
563            scope: self.scope,
564        };
565        BatchAddProgress(self.blobs.add_bytes_impl(options))
566    }
567
568    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
569        let options = options.into();
570        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
571            data: options.data,
572            format: options.format,
573            scope: self.scope,
574        }))
575    }
576
577    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
578        let options = ImportBytesRequest {
579            data: Bytes::copy_from_slice(data.as_ref()),
580            format: crate::BlobFormat::Raw,
581            scope: self.scope,
582        };
583        BatchAddProgress(self.blobs.add_bytes_impl(options))
584    }
585
586    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
587        let options = options.into();
588        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
589            path: options.path,
590            mode: options.mode,
591            format: options.format,
592            scope: self.scope,
593        }))
594    }
595
596    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
597        let value = value.into();
598        let msg = CreateTempTagRequest {
599            scope: self.scope,
600            value,
601        };
602        self.blobs.client.rpc(msg).await
603    }
604}
605
606/// Options for adding data from a file system path.
607#[derive(Debug)]
608pub struct AddPathOptions {
609    pub path: PathBuf,
610    pub format: BlobFormat,
611    pub mode: ImportMode,
612}
613
614/// A progress handle for an import operation.
615///
616/// Internally this is a stream of [`AddProgressItem`] items. Working with this
617/// stream directly can be inconvenient, so this struct provides some convenience
618/// methods to work with the result.
619///
620/// It also implements [`IntoFuture`], so you can await it to get the [`TempTag`] that
621/// contains the hash of the added content and also protects the content.
622///
623/// If you want access to the stream, you can use the [`AddProgress::stream`] method.
624pub struct AddProgress<'a> {
625    blobs: &'a Blobs,
626    inner: stream::Boxed<AddProgressItem>,
627}
628
629impl<'a> IntoFuture for AddProgress<'a> {
630    type Output = RequestResult<TagInfo>;
631
632    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
633
634    fn into_future(self) -> Self::IntoFuture {
635        Box::pin(self.with_tag())
636    }
637}
638
639impl<'a> AddProgress<'a> {
640    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
641        Self {
642            blobs,
643            inner: Box::pin(stream),
644        }
645    }
646
647    pub async fn temp_tag(self) -> RequestResult<TempTag> {
648        let mut stream = self.inner;
649        while let Some(item) = stream.next().await {
650            match item {
651                AddProgressItem::Done(tt) => return Ok(tt),
652                AddProgressItem::Error(e) => return Err(e.into()),
653                _ => {}
654            }
655        }
656        Err(super::Error::other("unexpected end of stream").into())
657    }
658
659    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
660        let blobs = self.blobs.clone();
661        let tt = self.temp_tag().await?;
662        let haf = *tt.hash_and_format();
663        let tags = Tags::ref_from_sender(&blobs.client);
664        tags.set(name, *tt.hash_and_format()).await?;
665        drop(tt);
666        Ok(haf)
667    }
668
669    pub async fn with_tag(self) -> RequestResult<TagInfo> {
670        let blobs = self.blobs.clone();
671        let tt = self.temp_tag().await?;
672        let hash = *tt.hash();
673        let format = tt.format();
674        let tags = Tags::ref_from_sender(&blobs.client);
675        let name = tags.create(*tt.hash_and_format()).await?;
676        drop(tt);
677        Ok(TagInfo { name, hash, format })
678    }
679
680    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
681        self.inner
682    }
683}
684
685/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
686#[derive(Debug, Clone, Serialize, Deserialize)]
687pub struct ReaderOptions {
688    pub hash: Hash,
689}
690
691/// An observe result. Awaiting this will return the current state.
692///
693/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
694/// the first item is the current state and subsequent items are updates.
695pub struct ObserveProgress {
696    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
697}
698
699impl IntoFuture for ObserveProgress {
700    type Output = RequestResult<Bitfield>;
701
702    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
703
704    fn into_future(self) -> Self::IntoFuture {
705        Box::pin(async move {
706            let mut rx = self.inner.await?;
707            match rx.recv().await? {
708                Some(bitfield) => Ok(bitfield),
709                None => Err(super::Error::other("unexpected end of stream").into()),
710            }
711        })
712    }
713}
714
715impl ObserveProgress {
716    fn new(
717        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
718    ) -> Self {
719        Self {
720            inner: Box::pin(fut),
721        }
722    }
723
724    pub async fn await_completion(self) -> RequestResult<Bitfield> {
725        let mut stream = self.stream().await?;
726        while let Some(item) = stream.next().await {
727            if item.is_complete() {
728                return Ok(item);
729            }
730        }
731        Err(super::Error::other("unexpected end of stream").into())
732    }
733
734    /// Returns an infinite stream of bitfields. The first bitfield is the
735    /// current state, and the following bitfields are updates.
736    ///
737    /// Once a blob is complete, there will be no more updates.
738    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
739        let mut rx = self.inner.await?;
740        Ok(Gen::new(|co| async move {
741            while let Ok(Some(item)) = rx.recv().await {
742                co.yield_(item).await;
743            }
744        }))
745    }
746}
747
748/// A progress handle for an export operation.
749///
750/// Internally this is a stream of [`ExportProgress`] items. Working with this
751/// stream directly can be inconvenient, so this struct provides some convenience
752/// methods to work with the result.
753///
754/// To get the underlying stream, use the [`ExportProgress::stream`] method.
755///
756/// It also implements [`IntoFuture`], so you can await it to get the size of the
757/// exported blob.
758pub struct ExportProgress {
759    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
760}
761
762impl IntoFuture for ExportProgress {
763    type Output = RequestResult<u64>;
764
765    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
766
767    fn into_future(self) -> Self::IntoFuture {
768        Box::pin(self.finish())
769    }
770}
771
772impl ExportProgress {
773    fn new(
774        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
775    ) -> Self {
776        Self {
777            inner: Box::pin(fut),
778        }
779    }
780
781    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
782        Gen::new(|co| async move {
783            let mut rx = match self.inner.await {
784                Ok(rx) => rx,
785                Err(e) => {
786                    co.yield_(ExportProgressItem::Error(e.into())).await;
787                    return;
788                }
789            };
790            while let Ok(Some(item)) = rx.recv().await {
791                co.yield_(item).await;
792            }
793        })
794    }
795
796    pub async fn finish(self) -> RequestResult<u64> {
797        let mut rx = self.inner.await?;
798        let mut size = None;
799        loop {
800            match rx.recv().await? {
801                Some(ExportProgressItem::Done) => break,
802                Some(ExportProgressItem::Size(s)) => size = Some(s),
803                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
804                _ => {}
805            }
806        }
807        if let Some(size) = size {
808            Ok(size)
809        } else {
810            Err(super::Error::other("unexpected end of stream").into())
811        }
812    }
813}
814
815/// A handle for an ongoing bao import operation.
816pub struct ImportBaoHandle {
817    pub tx: mpsc::Sender<BaoContentItem>,
818    pub rx: oneshot::Receiver<super::Result<()>>,
819}
820
821impl ImportBaoHandle {
822    pub(crate) async fn new(
823        fut: impl Future<
824                Output = irpc::Result<(
825                    mpsc::Sender<BaoContentItem>,
826                    oneshot::Receiver<super::Result<()>>,
827                )>,
828            > + Send
829            + 'static,
830    ) -> irpc::Result<Self> {
831        let (tx, rx) = fut.await?;
832        Ok(Self { tx, rx })
833    }
834}
835
836/// A progress handle for a blobs list operation.
837pub struct BlobsListProgress {
838    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
839}
840
841impl BlobsListProgress {
842    fn new(
843        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
844    ) -> Self {
845        Self {
846            inner: Box::pin(fut),
847        }
848    }
849
850    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
851        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
852        let mut hashes = Vec::new();
853        while let Some(item) = rx.recv().await? {
854            hashes.push(item?);
855        }
856        Ok(hashes)
857    }
858
859    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
860        let mut rx = self.inner.await?;
861        Ok(Gen::new(|co| async move {
862            while let Ok(Some(item)) = rx.recv().await {
863                co.yield_(item).await;
864            }
865        }))
866    }
867}
868
869/// A progress handle for a bao export operation.
870///
871/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
872/// is often inconvenient, so there are a number of higher level methods to
873/// process the stream.
874///
875/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
876pub struct ExportRangesProgress {
877    ranges: RangeSet2<u64>,
878    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
879}
880
881impl ExportRangesProgress {
882    fn new(
883        ranges: RangeSet2<u64>,
884        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
885    ) -> Self {
886        Self {
887            ranges,
888            inner: Box::pin(fut),
889        }
890    }
891}
892
893impl ExportRangesProgress {
894    /// A raw stream of [`ExportRangesItem`]s.
895    ///
896    /// Ranges will be rounded up to chunk boundaries. So if you request a
897    /// range of 0..100, you will get the entire first chunk, 0..1024.
898    ///
899    /// It is up to the caller to clip the ranges to the requested ranges.
900    pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
901        Gen::new(|co| async move {
902            let mut rx = match self.inner.await {
903                Ok(rx) => rx,
904                Err(e) => {
905                    co.yield_(ExportRangesItem::Error(e.into())).await;
906                    return;
907                }
908            };
909            while let Ok(Some(item)) = rx.recv().await {
910                co.yield_(item).await;
911            }
912        })
913    }
914
915    /// Concatenate all the data into a single `Bytes`.
916    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
917        let mut rx = self.inner.await?;
918        let mut data = BTreeMap::new();
919        while let Some(item) = rx.recv().await? {
920            match item {
921                ExportRangesItem::Size(_) => {}
922                ExportRangesItem::Data(leaf) => {
923                    data.insert(leaf.offset, leaf.data);
924                }
925                ExportRangesItem::Error(cause) => return Err(cause.into()),
926            }
927        }
928        let mut res = Vec::new();
929        for range in self.ranges.iter() {
930            let (start, end) = match range {
931                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
932                RangeSetRange::Range(range) => (*range.start, *range.end),
933            };
934            for (offset, data) in data.iter() {
935                let cstart = *offset;
936                let cend = *offset + (data.len() as u64);
937                if cstart >= end || cend <= start {
938                    continue;
939                }
940                let start = start.max(cstart);
941                let end = end.min(cend);
942                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
943                res.extend_from_slice(data);
944            }
945        }
946        Ok(res)
947    }
948}
949
950/// A progress handle for a bao export operation.
951///
952/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
953/// is often inconvenient, so there are a number of higher level methods to
954/// process the stream.
955///
956/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
957pub struct ExportBaoProgress {
958    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
959}
960
961impl ExportBaoProgress {
962    fn new(
963        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
964    ) -> Self {
965        Self {
966            inner: Box::pin(fut),
967        }
968    }
969
970    /// Interprets this blob as a hash sequence and returns a stream of hashes.
971    ///
972    /// Errors will be reported, but the iterator will nevertheless continue.
973    /// If you get an error despite having asked for ranges that should be present,
974    /// this means that the data is corrupted. It can still make sense to continue
975    /// to get all non-corrupted sections.
976    pub fn hashes_with_index(
977        self,
978    ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
979        let mut stream = self.stream();
980        Gen::new(|co| async move {
981            while let Some(item) = stream.next().await {
982                let leaf = match item {
983                    EncodedItem::Leaf(leaf) => leaf,
984                    EncodedItem::Error(e) => {
985                        co.yield_(Err(e.into())).await;
986                        continue;
987                    }
988                    _ => continue,
989                };
990                let slice = match HashSeqChunk::try_from(leaf) {
991                    Ok(slice) => slice,
992                    Err(e) => {
993                        co.yield_(Err(e)).await;
994                        continue;
995                    }
996                };
997                let offset = slice.base();
998                for (o, hash) in slice.into_iter().enumerate() {
999                    co.yield_(Ok((offset + o as u64, hash))).await;
1000                }
1001            }
1002        })
1003    }
1004
1005    /// Same as [`Self::hashes_with_index`], but without the indexes.
1006    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
1007        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1008    }
1009
1010    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1011        let mut data = Vec::new();
1012        let mut stream = self.into_byte_stream();
1013        while let Some(item) = stream.next().await {
1014            data.extend_from_slice(&item?);
1015        }
1016        Ok(data)
1017    }
1018
1019    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1020        let mut rx = self.inner.await?;
1021        let mut data = Vec::new();
1022        while let Some(item) = rx.recv().await? {
1023            match item {
1024                EncodedItem::Leaf(leaf) => {
1025                    data.push(leaf.data);
1026                }
1027                EncodedItem::Parent(_) => {}
1028                EncodedItem::Size(_) => {}
1029                EncodedItem::Done => break,
1030                EncodedItem::Error(cause) => return Err(cause.into()),
1031            }
1032        }
1033        if data.len() == 1 {
1034            Ok(data.pop().unwrap())
1035        } else {
1036            let mut out = Vec::new();
1037            for item in data {
1038                out.extend_from_slice(&item);
1039            }
1040            Ok(out.into())
1041        }
1042    }
1043
1044    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1045        let mut rx = self.inner.await?;
1046        let mut data = Vec::new();
1047        while let Some(item) = rx.recv().await? {
1048            match item {
1049                EncodedItem::Leaf(leaf) => {
1050                    data.extend_from_slice(&leaf.data);
1051                }
1052                EncodedItem::Parent(_) => {}
1053                EncodedItem::Size(_) => {}
1054                EncodedItem::Done => break,
1055                EncodedItem::Error(cause) => return Err(cause.into()),
1056            }
1057        }
1058        Ok(data)
1059    }
1060
1061    pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
1062        let mut rx = self.inner.await?;
1063        while let Some(item) = rx.recv().await? {
1064            match item {
1065                EncodedItem::Size(size) => {
1066                    target.write_u64_le(size).await?;
1067                }
1068                EncodedItem::Parent(parent) => {
1069                    let mut data = vec![0u8; 64];
1070                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1071                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1072                    target.write_all(&data).await.map_err(io::Error::from)?;
1073                }
1074                EncodedItem::Leaf(leaf) => {
1075                    target
1076                        .write_chunk(leaf.data)
1077                        .await
1078                        .map_err(io::Error::from)?;
1079                }
1080                EncodedItem::Done => break,
1081                EncodedItem::Error(cause) => return Err(cause.into()),
1082            }
1083        }
1084        Ok(())
1085    }
1086
1087    /// Write quinn variant that also feeds a progress writer.
1088    pub(crate) async fn write_quinn_with_progress(
1089        self,
1090        writer: &mut SendStream,
1091        progress: &mut impl WriteProgress,
1092        hash: &Hash,
1093        index: u64,
1094    ) -> super::ExportBaoResult<()> {
1095        let mut rx = self.inner.await?;
1096        while let Some(item) = rx.recv().await? {
1097            match item {
1098                EncodedItem::Size(size) => {
1099                    progress.send_transfer_started(index, hash, size).await;
1100                    writer.write_u64_le(size).await?;
1101                    progress.log_other_write(8);
1102                }
1103                EncodedItem::Parent(parent) => {
1104                    let mut data = vec![0u8; 64];
1105                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1106                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1107                    writer.write_all(&data).await.map_err(io::Error::from)?;
1108                    progress.log_other_write(64);
1109                }
1110                EncodedItem::Leaf(leaf) => {
1111                    let len = leaf.data.len();
1112                    writer
1113                        .write_chunk(leaf.data)
1114                        .await
1115                        .map_err(io::Error::from)?;
1116                    progress.notify_payload_write(index, leaf.offset, len).await;
1117                }
1118                EncodedItem::Done => break,
1119                EncodedItem::Error(cause) => return Err(cause.into()),
1120            }
1121        }
1122        Ok(())
1123    }
1124
1125    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1126        self.stream().filter_map(|item| match item {
1127            EncodedItem::Size(size) => {
1128                let size = size.to_le_bytes().to_vec().into();
1129                Some(Ok(size))
1130            }
1131            EncodedItem::Parent(parent) => {
1132                let mut data = vec![0u8; 64];
1133                data[..32].copy_from_slice(parent.pair.0.as_bytes());
1134                data[32..].copy_from_slice(parent.pair.1.as_bytes());
1135                Some(Ok(data.into()))
1136            }
1137            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1138            EncodedItem::Done => None,
1139            EncodedItem::Error(cause) => Some(Err(cause.into())),
1140        })
1141    }
1142
1143    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1144        Gen::new(|co| async move {
1145            let mut rx = match self.inner.await {
1146                Ok(rx) => rx,
1147                Err(cause) => {
1148                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1149                        .await;
1150                    return;
1151                }
1152            };
1153            while let Ok(Some(item)) = rx.recv().await {
1154                co.yield_(item).await;
1155            }
1156        })
1157    }
1158}
1159
1160pub(crate) trait WriteProgress {
1161    /// Notify the progress writer that a payload write has happened.
1162    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize);
1163
1164    /// Log a write of some other data.
1165    fn log_other_write(&mut self, len: usize);
1166
1167    /// Notify the progress writer that a transfer has started.
1168    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1169}
1170
1171impl WriteProgress for StreamContext {
1172    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) {
1173        StreamContext::notify_payload_write(self, index, offset, len);
1174    }
1175
1176    fn log_other_write(&mut self, len: usize) {
1177        StreamContext::log_other_write(self, len);
1178    }
1179
1180    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64) {
1181        StreamContext::send_transfer_started(self, index, hash, size).await
1182    }
1183}