iroh_blobs/api/
blobs.rs

1//! API to interact with a local blob store
2//!
3//! This API is for local interactions with the blob store, such as importing
4//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
5//!
6//! The main entry point is the [`Blobs`] struct.
7use std::{
8    collections::BTreeMap,
9    future::{Future, IntoFuture},
10    io,
11    num::NonZeroU64,
12    path::{Path, PathBuf},
13    pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18    io::{
19        fsm::{ResponseDecoder, ResponseDecoderNext},
20        BaoContentItem, Leaf,
21    },
22    BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::{AsyncStreamReader, TokioStreamReader};
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use quinn::SendStream;
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use tokio::io::AsyncWriteExt;
33use tracing::trace;
34
35// Public reexports from the proto module.
36//
37// Due to the fact that the proto module is hidden from docs by default,
38// these will appear in the docs as if they were declared here.
39pub use super::proto::{
40    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
41    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
42    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
43    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
44};
45use super::{
46    proto::{
47        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
48        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
49        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
50    },
51    remote::HashSeqChunk,
52    tags::TagInfo,
53    ApiClient, RequestResult, Tags,
54};
55use crate::{
56    api::proto::{BatchRequest, ImportByteStreamUpdate},
57    provider::StreamContext,
58    store::IROH_BLOCK_SIZE,
59    util::temp_tag::TempTag,
60    BlobFormat, Hash, HashAndFormat,
61};
62
63/// Options for adding bytes.
64#[derive(Debug)]
65pub struct AddBytesOptions {
66    pub data: Bytes,
67    pub format: BlobFormat,
68}
69
70impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
71    fn from(item: (T, BlobFormat)) -> Self {
72        let (data, format) = item;
73        Self {
74            data: data.into(),
75            format,
76        }
77    }
78}
79
80/// Blobs API
81#[derive(Debug, Clone, ref_cast::RefCast)]
82#[repr(transparent)]
83pub struct Blobs {
84    client: ApiClient,
85}
86
87impl Blobs {
88    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
89        Self::ref_cast(sender)
90    }
91
92    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
93        let msg = BatchRequest;
94        trace!("{msg:?}");
95        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
96        let scope = rx.await?;
97
98        Ok(Batch {
99            scope,
100            blobs: self,
101            _tx: tx,
102        })
103    }
104
105    /// Delete a blob.
106    ///
107    /// This function is not public, because it does not work as expected when called manually,
108    /// because blobs are protected from deletion. This is only called from the gc task, which
109    /// clears the protections before.
110    ///
111    /// Users should rely only on garbage collection for blob deletion.
112    pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
113        trace!("{options:?}");
114        self.client.rpc(options).await??;
115        Ok(())
116    }
117
118    /// See [`Self::delete_with_opts`].
119    pub(crate) async fn delete(
120        &self,
121        hashes: impl IntoIterator<Item = impl Into<Hash>>,
122    ) -> RequestResult<()> {
123        self.delete_with_opts(DeleteOptions {
124            hashes: hashes.into_iter().map(Into::into).collect(),
125            force: false,
126        })
127        .await
128    }
129
130    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
131        let options = ImportBytesRequest {
132            data: Bytes::copy_from_slice(data.as_ref()),
133            format: crate::BlobFormat::Raw,
134            scope: Scope::GLOBAL,
135        };
136        self.add_bytes_impl(options)
137    }
138
139    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
140        let options = ImportBytesRequest {
141            data: data.into(),
142            format: crate::BlobFormat::Raw,
143            scope: Scope::GLOBAL,
144        };
145        self.add_bytes_impl(options)
146    }
147
148    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
149        let options = options.into();
150        let request = ImportBytesRequest {
151            data: options.data,
152            format: options.format,
153            scope: Scope::GLOBAL,
154        };
155        self.add_bytes_impl(request)
156    }
157
158    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
159        trace!("{options:?}");
160        let this = self.clone();
161        let stream = Gen::new(|co| async move {
162            let mut receiver = match this.client.server_streaming(options, 32).await {
163                Ok(receiver) => receiver,
164                Err(cause) => {
165                    co.yield_(AddProgressItem::Error(cause.into())).await;
166                    return;
167                }
168            };
169            loop {
170                match receiver.recv().await {
171                    Ok(Some(item)) => co.yield_(item).await,
172                    Err(cause) => {
173                        co.yield_(AddProgressItem::Error(cause.into())).await;
174                        break;
175                    }
176                    Ok(None) => break,
177                }
178            }
179        });
180        AddProgress::new(self, stream)
181    }
182
183    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
184        let options = options.into();
185        self.add_path_with_opts_impl(ImportPathRequest {
186            path: options.path,
187            mode: options.mode,
188            format: options.format,
189            scope: Scope::GLOBAL,
190        })
191    }
192
193    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
194        trace!("{:?}", options);
195        let client = self.client.clone();
196        let stream = Gen::new(|co| async move {
197            let mut receiver = match client.server_streaming(options, 32).await {
198                Ok(receiver) => receiver,
199                Err(cause) => {
200                    co.yield_(AddProgressItem::Error(cause.into())).await;
201                    return;
202                }
203            };
204            loop {
205                match receiver.recv().await {
206                    Ok(Some(item)) => co.yield_(item).await,
207                    Err(cause) => {
208                        co.yield_(AddProgressItem::Error(cause.into())).await;
209                        break;
210                    }
211                    Ok(None) => break,
212                }
213            }
214        });
215        AddProgress::new(self, stream)
216    }
217
218    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
219        self.add_path_with_opts(AddPathOptions {
220            path: path.as_ref().to_owned(),
221            mode: ImportMode::Copy,
222            format: BlobFormat::Raw,
223        })
224    }
225
226    pub async fn add_stream(
227        &self,
228        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
229    ) -> AddProgress<'_> {
230        let inner = ImportByteStreamRequest {
231            format: crate::BlobFormat::Raw,
232            scope: Scope::default(),
233        };
234        let client = self.client.clone();
235        let stream = Gen::new(|co| async move {
236            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
237                Ok(x) => x,
238                Err(cause) => {
239                    co.yield_(AddProgressItem::Error(cause.into())).await;
240                    return;
241                }
242            };
243            let recv = async {
244                loop {
245                    match receiver.recv().await {
246                        Ok(Some(item)) => co.yield_(item).await,
247                        Err(cause) => {
248                            co.yield_(AddProgressItem::Error(cause.into())).await;
249                            break;
250                        }
251                        Ok(None) => break,
252                    }
253                }
254            };
255            let send = async {
256                tokio::pin!(data);
257                while let Some(item) = data.next().await {
258                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
259                }
260                sender.send(ImportByteStreamUpdate::Done).await?;
261                anyhow::Ok(())
262            };
263            let _ = tokio::join!(send, recv);
264        });
265        AddProgress::new(self, stream)
266    }
267
268    pub fn export_ranges(
269        &self,
270        hash: impl Into<Hash>,
271        ranges: impl Into<RangeSet2<u64>>,
272    ) -> ExportRangesProgress {
273        self.export_ranges_with_opts(ExportRangesOptions {
274            hash: hash.into(),
275            ranges: ranges.into(),
276        })
277    }
278
279    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
280        trace!("{options:?}");
281        ExportRangesProgress::new(
282            options.ranges.clone(),
283            self.client.server_streaming(options, 32),
284        )
285    }
286
287    pub fn export_bao_with_opts(
288        &self,
289        options: ExportBaoOptions,
290        local_update_cap: usize,
291    ) -> ExportBaoProgress {
292        trace!("{options:?}");
293        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
294    }
295
296    pub fn export_bao(
297        &self,
298        hash: impl Into<Hash>,
299        ranges: impl Into<ChunkRanges>,
300    ) -> ExportBaoProgress {
301        self.export_bao_with_opts(
302            ExportBaoRequest {
303                hash: hash.into(),
304                ranges: ranges.into(),
305            },
306            32,
307        )
308    }
309
310    /// Export a single chunk from the given hash, at the given offset.
311    pub async fn export_chunk(
312        &self,
313        hash: impl Into<Hash>,
314        offset: u64,
315    ) -> super::ExportBaoResult<Leaf> {
316        let base = ChunkNum::full_chunks(offset);
317        let ranges = ChunkRanges::from(base..base + 1);
318        let mut stream = self.export_bao(hash, ranges).stream();
319        while let Some(item) = stream.next().await {
320            match item {
321                EncodedItem::Leaf(leaf) => return Ok(leaf),
322                EncodedItem::Parent(_) => {}
323                EncodedItem::Size(_) => {}
324                EncodedItem::Done => break,
325                EncodedItem::Error(cause) => return Err(cause.into()),
326            }
327        }
328        Err(io::Error::other("unexpected end of stream").into())
329    }
330
331    /// Get the entire blob into a Bytes
332    ///
333    /// This will run out of memory when called for very large blobs, so be careful!
334    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
335        self.export_bao(hash.into(), ChunkRanges::all())
336            .data_to_bytes()
337            .await
338    }
339
340    /// Observe the bitfield of the given hash.
341    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
342        self.observe_with_opts(ObserveOptions { hash: hash.into() })
343    }
344
345    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
346        trace!("{:?}", options);
347        if options.hash == Hash::EMPTY {
348            return ObserveProgress::new(async move {
349                let (tx, rx) = mpsc::channel(1);
350                tx.send(Bitfield::complete(0)).await.ok();
351                Ok(rx)
352            });
353        }
354        ObserveProgress::new(self.client.server_streaming(options, 32))
355    }
356
357    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
358        trace!("{:?}", options);
359        ExportProgress::new(self.client.server_streaming(options, 32))
360    }
361
362    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
363        let options = ExportOptions {
364            hash: hash.into(),
365            mode: ExportMode::Copy,
366            target: target.as_ref().to_owned(),
367        };
368        self.export_with_opts(options)
369    }
370
371    /// Import BaoContentItems from a stream.
372    ///
373    /// The store assumes that these are already verified and in the correct order.
374    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
375    pub async fn import_bao(
376        &self,
377        hash: impl Into<Hash>,
378        size: NonZeroU64,
379        local_update_cap: usize,
380    ) -> irpc::Result<ImportBaoHandle> {
381        let options = ImportBaoRequest {
382            hash: hash.into(),
383            size,
384        };
385        self.import_bao_with_opts(options, local_update_cap).await
386    }
387
388    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
389    pub async fn import_bao_with_opts(
390        &self,
391        options: ImportBaoOptions,
392        local_update_cap: usize,
393    ) -> irpc::Result<ImportBaoHandle> {
394        trace!("{:?}", options);
395        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
396    }
397
398    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
399    async fn import_bao_reader<R: AsyncStreamReader>(
400        &self,
401        hash: Hash,
402        ranges: ChunkRanges,
403        mut reader: R,
404    ) -> RequestResult<R> {
405        let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
406        let Some(size) = NonZeroU64::new(size) else {
407            return if hash == Hash::EMPTY {
408                Ok(reader)
409            } else {
410                Err(super::Error::other("invalid size for hash").into())
411            };
412        };
413        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
414        let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
415        let options = ImportBaoOptions { hash, size };
416        let handle = self.import_bao_with_opts(options, 32).await?;
417        let driver = async move {
418            let reader = loop {
419                match decoder.next().await {
420                    ResponseDecoderNext::More((rest, item)) => {
421                        handle.tx.send(item?).await?;
422                        decoder = rest;
423                    }
424                    ResponseDecoderNext::Done(reader) => break reader,
425                };
426            };
427            drop(handle.tx);
428            io::Result::Ok(reader)
429        };
430        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
431        let (reader, res) = tokio::join!(driver, fut);
432        res?;
433        Ok(reader?)
434    }
435
436    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
437    pub async fn import_bao_quinn(
438        &self,
439        hash: Hash,
440        ranges: ChunkRanges,
441        stream: &mut iroh::endpoint::RecvStream,
442    ) -> RequestResult<()> {
443        let reader = TokioStreamReader::new(stream);
444        self.import_bao_reader(hash, ranges, reader).await?;
445        Ok(())
446    }
447
448    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
449    pub async fn import_bao_bytes(
450        &self,
451        hash: Hash,
452        ranges: ChunkRanges,
453        data: impl Into<Bytes>,
454    ) -> RequestResult<()> {
455        self.import_bao_reader(hash, ranges, data.into()).await?;
456        Ok(())
457    }
458
459    pub fn list(&self) -> BlobsListProgress {
460        let msg = ListRequest;
461        let client = self.client.clone();
462        BlobsListProgress::new(client.server_streaming(msg, 32))
463    }
464
465    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
466        let hash = hash.into();
467        let msg = BlobStatusRequest { hash };
468        self.client.rpc(msg).await
469    }
470
471    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
472        match self.status(hash).await? {
473            BlobStatus::Complete { .. } => Ok(true),
474            _ => Ok(false),
475        }
476    }
477
478    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
479        let msg = ClearProtectedRequest;
480        self.client.rpc(msg).await??;
481        Ok(())
482    }
483}
484
485/// A progress handle for a batch scoped add operation.
486pub struct BatchAddProgress<'a>(AddProgress<'a>);
487
488impl<'a> IntoFuture for BatchAddProgress<'a> {
489    type Output = RequestResult<TempTag>;
490
491    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
492
493    fn into_future(self) -> Self::IntoFuture {
494        Box::pin(self.temp_tag())
495    }
496}
497
498impl<'a> BatchAddProgress<'a> {
499    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
500        self.0.with_named_tag(name).await
501    }
502
503    pub async fn with_tag(self) -> RequestResult<TagInfo> {
504        self.0.with_tag().await
505    }
506
507    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
508        self.0.stream().await
509    }
510
511    pub async fn temp_tag(self) -> RequestResult<TempTag> {
512        self.0.temp_tag().await
513    }
514}
515
516/// A batch of operations that modify the blob store.
517pub struct Batch<'a> {
518    scope: Scope,
519    blobs: &'a Blobs,
520    _tx: mpsc::Sender<BatchResponse>,
521}
522
523impl<'a> Batch<'a> {
524    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
525        let options = ImportBytesRequest {
526            data: data.into(),
527            format: crate::BlobFormat::Raw,
528            scope: self.scope,
529        };
530        BatchAddProgress(self.blobs.add_bytes_impl(options))
531    }
532
533    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
534        let options = options.into();
535        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
536            data: options.data,
537            format: options.format,
538            scope: self.scope,
539        }))
540    }
541
542    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
543        let options = ImportBytesRequest {
544            data: Bytes::copy_from_slice(data.as_ref()),
545            format: crate::BlobFormat::Raw,
546            scope: self.scope,
547        };
548        BatchAddProgress(self.blobs.add_bytes_impl(options))
549    }
550
551    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
552        let options = options.into();
553        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
554            path: options.path,
555            mode: options.mode,
556            format: options.format,
557            scope: self.scope,
558        }))
559    }
560
561    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
562        let value = value.into();
563        let msg = CreateTempTagRequest {
564            scope: self.scope,
565            value,
566        };
567        self.blobs.client.rpc(msg).await
568    }
569}
570
571/// Options for adding data from a file system path.
572#[derive(Debug)]
573pub struct AddPathOptions {
574    pub path: PathBuf,
575    pub format: BlobFormat,
576    pub mode: ImportMode,
577}
578
579/// A progress handle for an import operation.
580///
581/// Internally this is a stream of [`AddProgressItem`] items. Working with this
582/// stream directly can be inconvenient, so this struct provides some convenience
583/// methods to work with the result.
584///
585/// It also implements [`IntoFuture`], so you can await it to get the [`TempTag`] that
586/// contains the hash of the added content and also protects the content.
587///
588/// If you want access to the stream, you can use the [`AddProgress::stream`] method.
589pub struct AddProgress<'a> {
590    blobs: &'a Blobs,
591    inner: stream::Boxed<AddProgressItem>,
592}
593
594impl<'a> IntoFuture for AddProgress<'a> {
595    type Output = RequestResult<TagInfo>;
596
597    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
598
599    fn into_future(self) -> Self::IntoFuture {
600        Box::pin(self.with_tag())
601    }
602}
603
604impl<'a> AddProgress<'a> {
605    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
606        Self {
607            blobs,
608            inner: Box::pin(stream),
609        }
610    }
611
612    pub async fn temp_tag(self) -> RequestResult<TempTag> {
613        let mut stream = self.inner;
614        while let Some(item) = stream.next().await {
615            match item {
616                AddProgressItem::Done(tt) => return Ok(tt),
617                AddProgressItem::Error(e) => return Err(e.into()),
618                _ => {}
619            }
620        }
621        Err(super::Error::other("unexpected end of stream").into())
622    }
623
624    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
625        let blobs = self.blobs.clone();
626        let tt = self.temp_tag().await?;
627        let haf = *tt.hash_and_format();
628        let tags = Tags::ref_from_sender(&blobs.client);
629        tags.set(name, *tt.hash_and_format()).await?;
630        drop(tt);
631        Ok(haf)
632    }
633
634    pub async fn with_tag(self) -> RequestResult<TagInfo> {
635        let blobs = self.blobs.clone();
636        let tt = self.temp_tag().await?;
637        let hash = *tt.hash();
638        let format = tt.format();
639        let tags = Tags::ref_from_sender(&blobs.client);
640        let name = tags.create(*tt.hash_and_format()).await?;
641        drop(tt);
642        Ok(TagInfo { name, hash, format })
643    }
644
645    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
646        self.inner
647    }
648}
649
650/// An observe result. Awaiting this will return the current state.
651///
652/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
653/// the first item is the current state and subsequent items are updates.
654pub struct ObserveProgress {
655    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
656}
657
658impl IntoFuture for ObserveProgress {
659    type Output = RequestResult<Bitfield>;
660
661    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
662
663    fn into_future(self) -> Self::IntoFuture {
664        Box::pin(async move {
665            let mut rx = self.inner.await?;
666            match rx.recv().await? {
667                Some(bitfield) => Ok(bitfield),
668                None => Err(super::Error::other("unexpected end of stream").into()),
669            }
670        })
671    }
672}
673
674impl ObserveProgress {
675    fn new(
676        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
677    ) -> Self {
678        Self {
679            inner: Box::pin(fut),
680        }
681    }
682
683    pub async fn await_completion(self) -> RequestResult<Bitfield> {
684        let mut stream = self.stream().await?;
685        while let Some(item) = stream.next().await {
686            if item.is_complete() {
687                return Ok(item);
688            }
689        }
690        Err(super::Error::other("unexpected end of stream").into())
691    }
692
693    /// Returns an infinite stream of bitfields. The first bitfield is the
694    /// current state, and the following bitfields are updates.
695    ///
696    /// Once a blob is complete, there will be no more updates.
697    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
698        let mut rx = self.inner.await?;
699        Ok(Gen::new(|co| async move {
700            while let Ok(Some(item)) = rx.recv().await {
701                co.yield_(item).await;
702            }
703        }))
704    }
705}
706
707/// A progress handle for an export operation.
708///
709/// Internally this is a stream of [`ExportProgress`] items. Working with this
710/// stream directly can be inconvenient, so this struct provides some convenience
711/// methods to work with the result.
712///
713/// To get the underlying stream, use the [`ExportProgress::stream`] method.
714///
715/// It also implements [`IntoFuture`], so you can await it to get the size of the
716/// exported blob.
717pub struct ExportProgress {
718    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
719}
720
721impl IntoFuture for ExportProgress {
722    type Output = RequestResult<u64>;
723
724    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
725
726    fn into_future(self) -> Self::IntoFuture {
727        Box::pin(self.finish())
728    }
729}
730
731impl ExportProgress {
732    fn new(
733        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
734    ) -> Self {
735        Self {
736            inner: Box::pin(fut),
737        }
738    }
739
740    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
741        Gen::new(|co| async move {
742            let mut rx = match self.inner.await {
743                Ok(rx) => rx,
744                Err(e) => {
745                    co.yield_(ExportProgressItem::Error(e.into())).await;
746                    return;
747                }
748            };
749            while let Ok(Some(item)) = rx.recv().await {
750                co.yield_(item).await;
751            }
752        })
753    }
754
755    pub async fn finish(self) -> RequestResult<u64> {
756        let mut rx = self.inner.await?;
757        let mut size = None;
758        loop {
759            match rx.recv().await? {
760                Some(ExportProgressItem::Done) => break,
761                Some(ExportProgressItem::Size(s)) => size = Some(s),
762                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
763                _ => {}
764            }
765        }
766        if let Some(size) = size {
767            Ok(size)
768        } else {
769            Err(super::Error::other("unexpected end of stream").into())
770        }
771    }
772}
773
774/// A handle for an ongoing bao import operation.
775pub struct ImportBaoHandle {
776    pub tx: mpsc::Sender<BaoContentItem>,
777    pub rx: oneshot::Receiver<super::Result<()>>,
778}
779
780impl ImportBaoHandle {
781    pub(crate) async fn new(
782        fut: impl Future<
783                Output = irpc::Result<(
784                    mpsc::Sender<BaoContentItem>,
785                    oneshot::Receiver<super::Result<()>>,
786                )>,
787            > + Send
788            + 'static,
789    ) -> irpc::Result<Self> {
790        let (tx, rx) = fut.await?;
791        Ok(Self { tx, rx })
792    }
793}
794
795/// A progress handle for a blobs list operation.
796pub struct BlobsListProgress {
797    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
798}
799
800impl BlobsListProgress {
801    fn new(
802        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
803    ) -> Self {
804        Self {
805            inner: Box::pin(fut),
806        }
807    }
808
809    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
810        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
811        let mut hashes = Vec::new();
812        while let Some(item) = rx.recv().await? {
813            hashes.push(item?);
814        }
815        Ok(hashes)
816    }
817
818    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
819        let mut rx = self.inner.await?;
820        Ok(Gen::new(|co| async move {
821            while let Ok(Some(item)) = rx.recv().await {
822                co.yield_(item).await;
823            }
824        }))
825    }
826}
827
828/// A progress handle for a bao export operation.
829///
830/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
831/// is often inconvenient, so there are a number of higher level methods to
832/// process the stream.
833///
834/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
835pub struct ExportRangesProgress {
836    ranges: RangeSet2<u64>,
837    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
838}
839
840impl ExportRangesProgress {
841    fn new(
842        ranges: RangeSet2<u64>,
843        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
844    ) -> Self {
845        Self {
846            ranges,
847            inner: Box::pin(fut),
848        }
849    }
850}
851
852impl ExportRangesProgress {
853    /// A raw stream of [`ExportRangesItem`]s.
854    ///
855    /// Ranges will be rounded up to chunk boundaries. So if you request a
856    /// range of 0..100, you will get the entire first chunk, 0..1024.
857    ///
858    /// It is up to the caller to clip the ranges to the requested ranges.
859    pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
860        Gen::new(|co| async move {
861            let mut rx = match self.inner.await {
862                Ok(rx) => rx,
863                Err(e) => {
864                    co.yield_(ExportRangesItem::Error(e.into())).await;
865                    return;
866                }
867            };
868            while let Ok(Some(item)) = rx.recv().await {
869                co.yield_(item).await;
870            }
871        })
872    }
873
874    /// Concatenate all the data into a single `Bytes`.
875    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
876        let mut rx = self.inner.await?;
877        let mut data = BTreeMap::new();
878        while let Some(item) = rx.recv().await? {
879            match item {
880                ExportRangesItem::Size(_) => {}
881                ExportRangesItem::Data(leaf) => {
882                    data.insert(leaf.offset, leaf.data);
883                }
884                ExportRangesItem::Error(cause) => return Err(cause.into()),
885            }
886        }
887        let mut res = Vec::new();
888        for range in self.ranges.iter() {
889            let (start, end) = match range {
890                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
891                RangeSetRange::Range(range) => (*range.start, *range.end),
892            };
893            for (offset, data) in data.iter() {
894                let cstart = *offset;
895                let cend = *offset + (data.len() as u64);
896                if cstart >= end || cend <= start {
897                    continue;
898                }
899                let start = start.max(cstart);
900                let end = end.min(cend);
901                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
902                res.extend_from_slice(data);
903            }
904        }
905        Ok(res)
906    }
907}
908
909/// A progress handle for a bao export operation.
910///
911/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
912/// is often inconvenient, so there are a number of higher level methods to
913/// process the stream.
914///
915/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
916pub struct ExportBaoProgress {
917    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
918}
919
920impl ExportBaoProgress {
921    fn new(
922        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
923    ) -> Self {
924        Self {
925            inner: Box::pin(fut),
926        }
927    }
928
929    /// Interprets this blob as a hash sequence and returns a stream of hashes.
930    ///
931    /// Errors will be reported, but the iterator will nevertheless continue.
932    /// If you get an error despite having asked for ranges that should be present,
933    /// this means that the data is corrupted. It can still make sense to continue
934    /// to get all non-corrupted sections.
935    pub fn hashes_with_index(
936        self,
937    ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
938        let mut stream = self.stream();
939        Gen::new(|co| async move {
940            while let Some(item) = stream.next().await {
941                let leaf = match item {
942                    EncodedItem::Leaf(leaf) => leaf,
943                    EncodedItem::Error(e) => {
944                        co.yield_(Err(e.into())).await;
945                        continue;
946                    }
947                    _ => continue,
948                };
949                let slice = match HashSeqChunk::try_from(leaf) {
950                    Ok(slice) => slice,
951                    Err(e) => {
952                        co.yield_(Err(e)).await;
953                        continue;
954                    }
955                };
956                let offset = slice.base();
957                for (o, hash) in slice.into_iter().enumerate() {
958                    co.yield_(Ok((offset + o as u64, hash))).await;
959                }
960            }
961        })
962    }
963
964    /// Same as [`Self::hashes_with_index`], but without the indexes.
965    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
966        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
967    }
968
969    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
970        let mut data = Vec::new();
971        let mut stream = self.into_byte_stream();
972        while let Some(item) = stream.next().await {
973            data.extend_from_slice(&item?);
974        }
975        Ok(data)
976    }
977
978    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
979        let mut rx = self.inner.await?;
980        let mut data = Vec::new();
981        while let Some(item) = rx.recv().await? {
982            match item {
983                EncodedItem::Leaf(leaf) => {
984                    data.push(leaf.data);
985                }
986                EncodedItem::Parent(_) => {}
987                EncodedItem::Size(_) => {}
988                EncodedItem::Done => break,
989                EncodedItem::Error(cause) => return Err(cause.into()),
990            }
991        }
992        if data.len() == 1 {
993            Ok(data.pop().unwrap())
994        } else {
995            let mut out = Vec::new();
996            for item in data {
997                out.extend_from_slice(&item);
998            }
999            Ok(out.into())
1000        }
1001    }
1002
1003    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1004        let mut rx = self.inner.await?;
1005        let mut data = Vec::new();
1006        while let Some(item) = rx.recv().await? {
1007            match item {
1008                EncodedItem::Leaf(leaf) => {
1009                    data.extend_from_slice(&leaf.data);
1010                }
1011                EncodedItem::Parent(_) => {}
1012                EncodedItem::Size(_) => {}
1013                EncodedItem::Done => break,
1014                EncodedItem::Error(cause) => return Err(cause.into()),
1015            }
1016        }
1017        Ok(data)
1018    }
1019
1020    pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
1021        let mut rx = self.inner.await?;
1022        while let Some(item) = rx.recv().await? {
1023            match item {
1024                EncodedItem::Size(size) => {
1025                    target.write_u64_le(size).await?;
1026                }
1027                EncodedItem::Parent(parent) => {
1028                    let mut data = vec![0u8; 64];
1029                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1030                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1031                    target.write_all(&data).await.map_err(io::Error::from)?;
1032                }
1033                EncodedItem::Leaf(leaf) => {
1034                    target
1035                        .write_chunk(leaf.data)
1036                        .await
1037                        .map_err(io::Error::from)?;
1038                }
1039                EncodedItem::Done => break,
1040                EncodedItem::Error(cause) => return Err(cause.into()),
1041            }
1042        }
1043        Ok(())
1044    }
1045
1046    /// Write quinn variant that also feeds a progress writer.
1047    pub(crate) async fn write_quinn_with_progress(
1048        self,
1049        writer: &mut SendStream,
1050        progress: &mut impl WriteProgress,
1051        hash: &Hash,
1052        index: u64,
1053    ) -> super::ExportBaoResult<()> {
1054        let mut rx = self.inner.await?;
1055        while let Some(item) = rx.recv().await? {
1056            match item {
1057                EncodedItem::Size(size) => {
1058                    progress.send_transfer_started(index, hash, size).await;
1059                    writer.write_u64_le(size).await?;
1060                    progress.log_other_write(8);
1061                }
1062                EncodedItem::Parent(parent) => {
1063                    let mut data = vec![0u8; 64];
1064                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1065                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1066                    writer.write_all(&data).await.map_err(io::Error::from)?;
1067                    progress.log_other_write(64);
1068                }
1069                EncodedItem::Leaf(leaf) => {
1070                    let len = leaf.data.len();
1071                    writer
1072                        .write_chunk(leaf.data)
1073                        .await
1074                        .map_err(io::Error::from)?;
1075                    progress.notify_payload_write(index, leaf.offset, len).await;
1076                }
1077                EncodedItem::Done => break,
1078                EncodedItem::Error(cause) => return Err(cause.into()),
1079            }
1080        }
1081        Ok(())
1082    }
1083
1084    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1085        self.stream().filter_map(|item| match item {
1086            EncodedItem::Size(size) => {
1087                let size = size.to_le_bytes().to_vec().into();
1088                Some(Ok(size))
1089            }
1090            EncodedItem::Parent(parent) => {
1091                let mut data = vec![0u8; 64];
1092                data[..32].copy_from_slice(parent.pair.0.as_bytes());
1093                data[32..].copy_from_slice(parent.pair.1.as_bytes());
1094                Some(Ok(data.into()))
1095            }
1096            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1097            EncodedItem::Done => None,
1098            EncodedItem::Error(cause) => Some(Err(cause.into())),
1099        })
1100    }
1101
1102    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1103        Gen::new(|co| async move {
1104            let mut rx = match self.inner.await {
1105                Ok(rx) => rx,
1106                Err(cause) => {
1107                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1108                        .await;
1109                    return;
1110                }
1111            };
1112            while let Ok(Some(item)) = rx.recv().await {
1113                co.yield_(item).await;
1114            }
1115        })
1116    }
1117}
1118
1119pub(crate) trait WriteProgress {
1120    /// Notify the progress writer that a payload write has happened.
1121    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize);
1122
1123    /// Log a write of some other data.
1124    fn log_other_write(&mut self, len: usize);
1125
1126    /// Notify the progress writer that a transfer has started.
1127    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1128}
1129
1130impl WriteProgress for StreamContext {
1131    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) {
1132        StreamContext::notify_payload_write(self, index, offset, len);
1133    }
1134
1135    fn log_other_write(&mut self, len: usize) {
1136        StreamContext::log_other_write(self, len);
1137    }
1138
1139    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64) {
1140        StreamContext::send_transfer_started(self, index, hash, size).await
1141    }
1142}