iroh_blobs/api/
blobs.rs

1//! API to interact with a local blob store
2//!
3//! This API is for local interactions with the blob store, such as importing
4//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
5//!
6//! The main entry point is the [`Blobs`] struct.
7use std::{
8    collections::BTreeMap,
9    future::{Future, IntoFuture},
10    io,
11    num::NonZeroU64,
12    path::{Path, PathBuf},
13    pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18    io::{
19        fsm::{ResponseDecoder, ResponseDecoderNext},
20        BaoContentItem, Leaf,
21    },
22    BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::{AsyncStreamReader, TokioStreamReader};
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use quinn::SendStream;
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use tokio::io::AsyncWriteExt;
33use tracing::trace;
34
35// Public reexports from the proto module.
36//
37// Due to the fact that the proto module is hidden from docs by default,
38// these will appear in the docs as if they were declared here.
39pub use super::proto::{
40    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
41    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
42    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
43    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
44};
45use super::{
46    proto::{
47        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
48        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
49        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
50    },
51    remote::HashSeqChunk,
52    tags::TagInfo,
53    ApiClient, RequestResult, Tags,
54};
55use crate::{
56    api::proto::{BatchRequest, ImportByteStreamUpdate},
57    provider::StreamContext,
58    store::IROH_BLOCK_SIZE,
59    util::temp_tag::TempTag,
60    BlobFormat, Hash, HashAndFormat,
61};
62
63/// Options for adding bytes.
64#[derive(Debug)]
65pub struct AddBytesOptions {
66    pub data: Bytes,
67    pub format: BlobFormat,
68}
69
70impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
71    fn from(item: (T, BlobFormat)) -> Self {
72        let (data, format) = item;
73        Self {
74            data: data.into(),
75            format,
76        }
77    }
78}
79
80/// Blobs API
81#[derive(Debug, Clone, ref_cast::RefCast)]
82#[repr(transparent)]
83pub struct Blobs {
84    client: ApiClient,
85}
86
87impl Blobs {
88    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
89        Self::ref_cast(sender)
90    }
91
92    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
93        let msg = BatchRequest;
94        trace!("{msg:?}");
95        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
96        let scope = rx.await?;
97
98        Ok(Batch {
99            scope,
100            blobs: self,
101            _tx: tx,
102        })
103    }
104
105    pub async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
106        trace!("{options:?}");
107        self.client.rpc(options).await??;
108        Ok(())
109    }
110
111    pub async fn delete(
112        &self,
113        hashes: impl IntoIterator<Item = impl Into<Hash>>,
114    ) -> RequestResult<()> {
115        self.delete_with_opts(DeleteOptions {
116            hashes: hashes.into_iter().map(Into::into).collect(),
117            force: false,
118        })
119        .await
120    }
121
122    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress {
123        let options = ImportBytesRequest {
124            data: Bytes::copy_from_slice(data.as_ref()),
125            format: crate::BlobFormat::Raw,
126            scope: Scope::GLOBAL,
127        };
128        self.add_bytes_impl(options)
129    }
130
131    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress {
132        let options = ImportBytesRequest {
133            data: data.into(),
134            format: crate::BlobFormat::Raw,
135            scope: Scope::GLOBAL,
136        };
137        self.add_bytes_impl(options)
138    }
139
140    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress {
141        let options = options.into();
142        let request = ImportBytesRequest {
143            data: options.data,
144            format: options.format,
145            scope: Scope::GLOBAL,
146        };
147        self.add_bytes_impl(request)
148    }
149
150    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress {
151        trace!("{options:?}");
152        let this = self.clone();
153        let stream = Gen::new(|co| async move {
154            let mut receiver = match this.client.server_streaming(options, 32).await {
155                Ok(receiver) => receiver,
156                Err(cause) => {
157                    co.yield_(AddProgressItem::Error(cause.into())).await;
158                    return;
159                }
160            };
161            loop {
162                match receiver.recv().await {
163                    Ok(Some(item)) => co.yield_(item).await,
164                    Err(cause) => {
165                        co.yield_(AddProgressItem::Error(cause.into())).await;
166                        break;
167                    }
168                    Ok(None) => break,
169                }
170            }
171        });
172        AddProgress::new(self, stream)
173    }
174
175    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress {
176        let options = options.into();
177        self.add_path_with_opts_impl(ImportPathRequest {
178            path: options.path,
179            mode: options.mode,
180            format: options.format,
181            scope: Scope::GLOBAL,
182        })
183    }
184
185    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress {
186        trace!("{:?}", options);
187        let client = self.client.clone();
188        let stream = Gen::new(|co| async move {
189            let mut receiver = match client.server_streaming(options, 32).await {
190                Ok(receiver) => receiver,
191                Err(cause) => {
192                    co.yield_(AddProgressItem::Error(cause.into())).await;
193                    return;
194                }
195            };
196            loop {
197                match receiver.recv().await {
198                    Ok(Some(item)) => co.yield_(item).await,
199                    Err(cause) => {
200                        co.yield_(AddProgressItem::Error(cause.into())).await;
201                        break;
202                    }
203                    Ok(None) => break,
204                }
205            }
206        });
207        AddProgress::new(self, stream)
208    }
209
210    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress {
211        self.add_path_with_opts(AddPathOptions {
212            path: path.as_ref().to_owned(),
213            mode: ImportMode::Copy,
214            format: BlobFormat::Raw,
215        })
216    }
217
218    pub async fn add_stream(
219        &self,
220        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
221    ) -> AddProgress {
222        let inner = ImportByteStreamRequest {
223            format: crate::BlobFormat::Raw,
224            scope: Scope::default(),
225        };
226        let client = self.client.clone();
227        let stream = Gen::new(|co| async move {
228            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
229                Ok(x) => x,
230                Err(cause) => {
231                    co.yield_(AddProgressItem::Error(cause.into())).await;
232                    return;
233                }
234            };
235            let recv = async {
236                loop {
237                    match receiver.recv().await {
238                        Ok(Some(item)) => co.yield_(item).await,
239                        Err(cause) => {
240                            co.yield_(AddProgressItem::Error(cause.into())).await;
241                            break;
242                        }
243                        Ok(None) => break,
244                    }
245                }
246            };
247            let send = async {
248                tokio::pin!(data);
249                while let Some(item) = data.next().await {
250                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
251                }
252                sender.send(ImportByteStreamUpdate::Done).await?;
253                anyhow::Ok(())
254            };
255            let _ = tokio::join!(send, recv);
256        });
257        AddProgress::new(self, stream)
258    }
259
260    pub fn export_ranges(
261        &self,
262        hash: impl Into<Hash>,
263        ranges: impl Into<RangeSet2<u64>>,
264    ) -> ExportRangesProgress {
265        self.export_ranges_with_opts(ExportRangesOptions {
266            hash: hash.into(),
267            ranges: ranges.into(),
268        })
269    }
270
271    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
272        trace!("{options:?}");
273        ExportRangesProgress::new(
274            options.ranges.clone(),
275            self.client.server_streaming(options, 32),
276        )
277    }
278
279    pub fn export_bao_with_opts(
280        &self,
281        options: ExportBaoOptions,
282        local_update_cap: usize,
283    ) -> ExportBaoProgress {
284        trace!("{options:?}");
285        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
286    }
287
288    pub fn export_bao(
289        &self,
290        hash: impl Into<Hash>,
291        ranges: impl Into<ChunkRanges>,
292    ) -> ExportBaoProgress {
293        self.export_bao_with_opts(
294            ExportBaoRequest {
295                hash: hash.into(),
296                ranges: ranges.into(),
297            },
298            32,
299        )
300    }
301
302    /// Export a single chunk from the given hash, at the given offset.
303    pub async fn export_chunk(
304        &self,
305        hash: impl Into<Hash>,
306        offset: u64,
307    ) -> super::ExportBaoResult<Leaf> {
308        let base = ChunkNum::full_chunks(offset);
309        let ranges = ChunkRanges::from(base..base + 1);
310        let mut stream = self.export_bao(hash, ranges).stream();
311        while let Some(item) = stream.next().await {
312            match item {
313                EncodedItem::Leaf(leaf) => return Ok(leaf),
314                EncodedItem::Parent(_) => {}
315                EncodedItem::Size(_) => {}
316                EncodedItem::Done => break,
317                EncodedItem::Error(cause) => return Err(cause.into()),
318            }
319        }
320        Err(io::Error::other("unexpected end of stream").into())
321    }
322
323    /// Get the entire blob into a Bytes
324    ///
325    /// This will run out of memory when called for very large blobs, so be careful!
326    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
327        self.export_bao(hash.into(), ChunkRanges::all())
328            .data_to_bytes()
329            .await
330    }
331
332    /// Observe the bitfield of the given hash.
333    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
334        self.observe_with_opts(ObserveOptions { hash: hash.into() })
335    }
336
337    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
338        trace!("{:?}", options);
339        if options.hash == Hash::EMPTY {
340            return ObserveProgress::new(async move {
341                let (tx, rx) = mpsc::channel(1);
342                tx.send(Bitfield::complete(0)).await.ok();
343                Ok(rx)
344            });
345        }
346        ObserveProgress::new(self.client.server_streaming(options, 32))
347    }
348
349    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
350        trace!("{:?}", options);
351        ExportProgress::new(self.client.server_streaming(options, 32))
352    }
353
354    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
355        let options = ExportOptions {
356            hash: hash.into(),
357            mode: ExportMode::Copy,
358            target: target.as_ref().to_owned(),
359        };
360        self.export_with_opts(options)
361    }
362
363    /// Import BaoContentItems from a stream.
364    ///
365    /// The store assumes that these are already verified and in the correct order.
366    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
367    pub async fn import_bao(
368        &self,
369        hash: impl Into<Hash>,
370        size: NonZeroU64,
371        local_update_cap: usize,
372    ) -> irpc::Result<ImportBaoHandle> {
373        let options = ImportBaoRequest {
374            hash: hash.into(),
375            size,
376        };
377        self.import_bao_with_opts(options, local_update_cap).await
378    }
379
380    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
381    pub async fn import_bao_with_opts(
382        &self,
383        options: ImportBaoOptions,
384        local_update_cap: usize,
385    ) -> irpc::Result<ImportBaoHandle> {
386        trace!("{:?}", options);
387        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
388    }
389
390    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
391    async fn import_bao_reader<R: AsyncStreamReader>(
392        &self,
393        hash: Hash,
394        ranges: ChunkRanges,
395        mut reader: R,
396    ) -> RequestResult<R> {
397        let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
398        let Some(size) = NonZeroU64::new(size) else {
399            return if hash == Hash::EMPTY {
400                Ok(reader)
401            } else {
402                Err(super::Error::other("invalid size for hash").into())
403            };
404        };
405        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
406        let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
407        let options = ImportBaoOptions { hash, size };
408        let handle = self.import_bao_with_opts(options, 32).await?;
409        let driver = async move {
410            let reader = loop {
411                match decoder.next().await {
412                    ResponseDecoderNext::More((rest, item)) => {
413                        handle.tx.send(item?).await?;
414                        decoder = rest;
415                    }
416                    ResponseDecoderNext::Done(reader) => break reader,
417                };
418            };
419            drop(handle.tx);
420            io::Result::Ok(reader)
421        };
422        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
423        let (reader, res) = tokio::join!(driver, fut);
424        res?;
425        Ok(reader?)
426    }
427
428    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
429    pub async fn import_bao_quinn(
430        &self,
431        hash: Hash,
432        ranges: ChunkRanges,
433        stream: &mut iroh::endpoint::RecvStream,
434    ) -> RequestResult<()> {
435        let reader = TokioStreamReader::new(stream);
436        self.import_bao_reader(hash, ranges, reader).await?;
437        Ok(())
438    }
439
440    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
441    pub async fn import_bao_bytes(
442        &self,
443        hash: Hash,
444        ranges: ChunkRanges,
445        data: impl Into<Bytes>,
446    ) -> RequestResult<()> {
447        self.import_bao_reader(hash, ranges, data.into()).await?;
448        Ok(())
449    }
450
451    pub fn list(&self) -> BlobsListProgress {
452        let msg = ListRequest;
453        let client = self.client.clone();
454        BlobsListProgress::new(client.server_streaming(msg, 32))
455    }
456
457    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
458        let hash = hash.into();
459        let msg = BlobStatusRequest { hash };
460        self.client.rpc(msg).await
461    }
462
463    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
464        match self.status(hash).await? {
465            BlobStatus::Complete { .. } => Ok(true),
466            _ => Ok(false),
467        }
468    }
469
470    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
471        let msg = ClearProtectedRequest;
472        self.client.rpc(msg).await??;
473        Ok(())
474    }
475}
476
477/// A progress handle for a batch scoped add operation.
478pub struct BatchAddProgress<'a>(AddProgress<'a>);
479
480impl<'a> IntoFuture for BatchAddProgress<'a> {
481    type Output = RequestResult<TempTag>;
482
483    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
484
485    fn into_future(self) -> Self::IntoFuture {
486        Box::pin(self.temp_tag())
487    }
488}
489
490impl<'a> BatchAddProgress<'a> {
491    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
492        self.0.with_named_tag(name).await
493    }
494
495    pub async fn with_tag(self) -> RequestResult<TagInfo> {
496        self.0.with_tag().await
497    }
498
499    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
500        self.0.stream().await
501    }
502
503    pub async fn temp_tag(self) -> RequestResult<TempTag> {
504        self.0.temp_tag().await
505    }
506}
507
508/// A batch of operations that modify the blob store.
509pub struct Batch<'a> {
510    scope: Scope,
511    blobs: &'a Blobs,
512    _tx: mpsc::Sender<BatchResponse>,
513}
514
515impl<'a> Batch<'a> {
516    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress {
517        let options = ImportBytesRequest {
518            data: data.into(),
519            format: crate::BlobFormat::Raw,
520            scope: self.scope,
521        };
522        BatchAddProgress(self.blobs.add_bytes_impl(options))
523    }
524
525    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress {
526        let options = options.into();
527        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
528            data: options.data,
529            format: options.format,
530            scope: self.scope,
531        }))
532    }
533
534    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress {
535        let options = ImportBytesRequest {
536            data: Bytes::copy_from_slice(data.as_ref()),
537            format: crate::BlobFormat::Raw,
538            scope: self.scope,
539        };
540        BatchAddProgress(self.blobs.add_bytes_impl(options))
541    }
542
543    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress {
544        let options = options.into();
545        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
546            path: options.path,
547            mode: options.mode,
548            format: options.format,
549            scope: self.scope,
550        }))
551    }
552
553    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
554        let value = value.into();
555        let msg = CreateTempTagRequest {
556            scope: self.scope,
557            value,
558        };
559        self.blobs.client.rpc(msg).await
560    }
561}
562
563/// Options for adding data from a file system path.
564#[derive(Debug)]
565pub struct AddPathOptions {
566    pub path: PathBuf,
567    pub format: BlobFormat,
568    pub mode: ImportMode,
569}
570
571/// A progress handle for an import operation.
572///
573/// Internally this is a stream of [`AddProgressItem`] items. Working with this
574/// stream directly can be inconvenient, so this struct provides some convenience
575/// methods to work with the result.
576///
577/// It also implements [`IntoFuture`], so you can await it to get the [`TempTag`] that
578/// contains the hash of the added content and also protects the content.
579///
580/// If you want access to the stream, you can use the [`AddProgress::stream`] method.
581pub struct AddProgress<'a> {
582    blobs: &'a Blobs,
583    inner: stream::Boxed<AddProgressItem>,
584}
585
586impl<'a> IntoFuture for AddProgress<'a> {
587    type Output = RequestResult<TagInfo>;
588
589    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
590
591    fn into_future(self) -> Self::IntoFuture {
592        Box::pin(self.with_tag())
593    }
594}
595
596impl<'a> AddProgress<'a> {
597    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
598        Self {
599            blobs,
600            inner: Box::pin(stream),
601        }
602    }
603
604    pub async fn temp_tag(self) -> RequestResult<TempTag> {
605        let mut stream = self.inner;
606        while let Some(item) = stream.next().await {
607            match item {
608                AddProgressItem::Done(tt) => return Ok(tt),
609                AddProgressItem::Error(e) => return Err(e.into()),
610                _ => {}
611            }
612        }
613        Err(super::Error::other("unexpected end of stream").into())
614    }
615
616    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
617        let blobs = self.blobs.clone();
618        let tt = self.temp_tag().await?;
619        let haf = *tt.hash_and_format();
620        let tags = Tags::ref_from_sender(&blobs.client);
621        tags.set(name, *tt.hash_and_format()).await?;
622        drop(tt);
623        Ok(haf)
624    }
625
626    pub async fn with_tag(self) -> RequestResult<TagInfo> {
627        let blobs = self.blobs.clone();
628        let tt = self.temp_tag().await?;
629        let hash = *tt.hash();
630        let format = tt.format();
631        let tags = Tags::ref_from_sender(&blobs.client);
632        let name = tags.create(*tt.hash_and_format()).await?;
633        drop(tt);
634        Ok(TagInfo { name, hash, format })
635    }
636
637    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
638        self.inner
639    }
640}
641
642/// An observe result. Awaiting this will return the current state.
643///
644/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
645/// the first item is the current state and subsequent items are updates.
646pub struct ObserveProgress {
647    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
648}
649
650impl IntoFuture for ObserveProgress {
651    type Output = RequestResult<Bitfield>;
652
653    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
654
655    fn into_future(self) -> Self::IntoFuture {
656        Box::pin(async move {
657            let mut rx = self.inner.await?;
658            match rx.recv().await? {
659                Some(bitfield) => Ok(bitfield),
660                None => Err(super::Error::other("unexpected end of stream").into()),
661            }
662        })
663    }
664}
665
666impl ObserveProgress {
667    fn new(
668        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
669    ) -> Self {
670        Self {
671            inner: Box::pin(fut),
672        }
673    }
674
675    pub async fn await_completion(self) -> RequestResult<Bitfield> {
676        let mut stream = self.stream().await?;
677        while let Some(item) = stream.next().await {
678            if item.is_complete() {
679                return Ok(item);
680            }
681        }
682        Err(super::Error::other("unexpected end of stream").into())
683    }
684
685    /// Returns an infinite stream of bitfields. The first bitfield is the
686    /// current state, and the following bitfields are updates.
687    ///
688    /// Once a blob is complete, there will be no more updates.
689    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
690        let mut rx = self.inner.await?;
691        Ok(Gen::new(|co| async move {
692            while let Ok(Some(item)) = rx.recv().await {
693                co.yield_(item).await;
694            }
695        }))
696    }
697}
698
699/// A progress handle for an export operation.
700///
701/// Internally this is a stream of [`ExportProgress`] items. Working with this
702/// stream directly can be inconvenient, so this struct provides some convenience
703/// methods to work with the result.
704///
705/// To get the underlying stream, use the [`ExportProgress::stream`] method.
706///
707/// It also implements [`IntoFuture`], so you can await it to get the size of the
708/// exported blob.
709pub struct ExportProgress {
710    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
711}
712
713impl IntoFuture for ExportProgress {
714    type Output = RequestResult<u64>;
715
716    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
717
718    fn into_future(self) -> Self::IntoFuture {
719        Box::pin(self.finish())
720    }
721}
722
723impl ExportProgress {
724    fn new(
725        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
726    ) -> Self {
727        Self {
728            inner: Box::pin(fut),
729        }
730    }
731
732    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
733        Gen::new(|co| async move {
734            let mut rx = match self.inner.await {
735                Ok(rx) => rx,
736                Err(e) => {
737                    co.yield_(ExportProgressItem::Error(e.into())).await;
738                    return;
739                }
740            };
741            while let Ok(Some(item)) = rx.recv().await {
742                co.yield_(item).await;
743            }
744        })
745    }
746
747    pub async fn finish(self) -> RequestResult<u64> {
748        let mut rx = self.inner.await?;
749        let mut size = None;
750        loop {
751            match rx.recv().await? {
752                Some(ExportProgressItem::Done) => break,
753                Some(ExportProgressItem::Size(s)) => size = Some(s),
754                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
755                _ => {}
756            }
757        }
758        if let Some(size) = size {
759            Ok(size)
760        } else {
761            Err(super::Error::other("unexpected end of stream").into())
762        }
763    }
764}
765
766/// A handle for an ongoing bao import operation.
767pub struct ImportBaoHandle {
768    pub tx: mpsc::Sender<BaoContentItem>,
769    pub rx: oneshot::Receiver<super::Result<()>>,
770}
771
772impl ImportBaoHandle {
773    pub(crate) async fn new(
774        fut: impl Future<
775                Output = irpc::Result<(
776                    mpsc::Sender<BaoContentItem>,
777                    oneshot::Receiver<super::Result<()>>,
778                )>,
779            > + Send
780            + 'static,
781    ) -> irpc::Result<Self> {
782        let (tx, rx) = fut.await?;
783        Ok(Self { tx, rx })
784    }
785}
786
787/// A progress handle for a blobs list operation.
788pub struct BlobsListProgress {
789    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
790}
791
792impl BlobsListProgress {
793    fn new(
794        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
795    ) -> Self {
796        Self {
797            inner: Box::pin(fut),
798        }
799    }
800
801    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
802        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
803        let mut hashes = Vec::new();
804        while let Some(item) = rx.recv().await? {
805            hashes.push(item?);
806        }
807        Ok(hashes)
808    }
809
810    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
811        let mut rx = self.inner.await?;
812        Ok(Gen::new(|co| async move {
813            while let Ok(Some(item)) = rx.recv().await {
814                co.yield_(item).await;
815            }
816        }))
817    }
818}
819
820/// A progress handle for a bao export operation.
821///
822/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
823/// is often inconvenient, so there are a number of higher level methods to
824/// process the stream.
825///
826/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
827pub struct ExportRangesProgress {
828    ranges: RangeSet2<u64>,
829    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
830}
831
832impl ExportRangesProgress {
833    fn new(
834        ranges: RangeSet2<u64>,
835        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
836    ) -> Self {
837        Self {
838            ranges,
839            inner: Box::pin(fut),
840        }
841    }
842}
843
844impl ExportRangesProgress {
845    /// A raw stream of [`ExportRangesItem`]s.
846    ///
847    /// Ranges will be rounded up to chunk boundaries. So if you request a
848    /// range of 0..100, you will get the entire first chunk, 0..1024.
849    ///
850    /// It is up to the caller to clip the ranges to the requested ranges.
851    pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
852        Gen::new(|co| async move {
853            let mut rx = match self.inner.await {
854                Ok(rx) => rx,
855                Err(e) => {
856                    co.yield_(ExportRangesItem::Error(e.into())).await;
857                    return;
858                }
859            };
860            while let Ok(Some(item)) = rx.recv().await {
861                co.yield_(item).await;
862            }
863        })
864    }
865
866    /// Concatenate all the data into a single `Bytes`.
867    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
868        let mut rx = self.inner.await?;
869        let mut data = BTreeMap::new();
870        while let Some(item) = rx.recv().await? {
871            match item {
872                ExportRangesItem::Size(_) => {}
873                ExportRangesItem::Data(leaf) => {
874                    data.insert(leaf.offset, leaf.data);
875                }
876                ExportRangesItem::Error(cause) => return Err(cause.into()),
877            }
878        }
879        let mut res = Vec::new();
880        for range in self.ranges.iter() {
881            let (start, end) = match range {
882                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
883                RangeSetRange::Range(range) => (*range.start, *range.end),
884            };
885            for (offset, data) in data.iter() {
886                let cstart = *offset;
887                let cend = *offset + (data.len() as u64);
888                if cstart >= end || cend <= start {
889                    continue;
890                }
891                let start = start.max(cstart);
892                let end = end.min(cend);
893                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
894                res.extend_from_slice(data);
895            }
896        }
897        Ok(res)
898    }
899}
900
901/// A progress handle for a bao export operation.
902///
903/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
904/// is often inconvenient, so there are a number of higher level methods to
905/// process the stream.
906///
907/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
908pub struct ExportBaoProgress {
909    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
910}
911
912impl ExportBaoProgress {
913    fn new(
914        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
915    ) -> Self {
916        Self {
917            inner: Box::pin(fut),
918        }
919    }
920
921    /// Interprets this blob as a hash sequence and returns a stream of hashes.
922    ///
923    /// Errors will be reported, but the iterator will nevertheless continue.
924    /// If you get an error despite having asked for ranges that should be present,
925    /// this means that the data is corrupted. It can still make sense to continue
926    /// to get all non-corrupted sections.
927    pub fn hashes_with_index(
928        self,
929    ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
930        let mut stream = self.stream();
931        Gen::new(|co| async move {
932            while let Some(item) = stream.next().await {
933                let leaf = match item {
934                    EncodedItem::Leaf(leaf) => leaf,
935                    EncodedItem::Error(e) => {
936                        co.yield_(Err(e.into())).await;
937                        continue;
938                    }
939                    _ => continue,
940                };
941                let slice = match HashSeqChunk::try_from(leaf) {
942                    Ok(slice) => slice,
943                    Err(e) => {
944                        co.yield_(Err(e)).await;
945                        continue;
946                    }
947                };
948                let offset = slice.base();
949                for (o, hash) in slice.into_iter().enumerate() {
950                    co.yield_(Ok((offset + o as u64, hash))).await;
951                }
952            }
953        })
954    }
955
956    /// Same as [`Self::hashes_with_index`], but without the indexes.
957    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
958        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
959    }
960
961    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
962        let mut data = Vec::new();
963        let mut stream = self.into_byte_stream();
964        while let Some(item) = stream.next().await {
965            println!("item: {item:?}");
966            data.extend_from_slice(&item?);
967        }
968        Ok(data)
969    }
970
971    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
972        let mut rx = self.inner.await?;
973        let mut data = Vec::new();
974        while let Some(item) = rx.recv().await? {
975            match item {
976                EncodedItem::Leaf(leaf) => {
977                    data.push(leaf.data);
978                }
979                EncodedItem::Parent(_) => {}
980                EncodedItem::Size(_) => {}
981                EncodedItem::Done => break,
982                EncodedItem::Error(cause) => return Err(cause.into()),
983            }
984        }
985        if data.len() == 1 {
986            Ok(data.pop().unwrap())
987        } else {
988            let mut out = Vec::new();
989            for item in data {
990                out.extend_from_slice(&item);
991            }
992            Ok(out.into())
993        }
994    }
995
996    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
997        let mut rx = self.inner.await?;
998        let mut data = Vec::new();
999        while let Some(item) = rx.recv().await? {
1000            match item {
1001                EncodedItem::Leaf(leaf) => {
1002                    data.extend_from_slice(&leaf.data);
1003                }
1004                EncodedItem::Parent(_) => {}
1005                EncodedItem::Size(_) => {}
1006                EncodedItem::Done => break,
1007                EncodedItem::Error(cause) => return Err(cause.into()),
1008            }
1009        }
1010        Ok(data)
1011    }
1012
1013    pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
1014        let mut rx = self.inner.await?;
1015        while let Some(item) = rx.recv().await? {
1016            match item {
1017                EncodedItem::Size(size) => {
1018                    target.write_u64_le(size).await?;
1019                }
1020                EncodedItem::Parent(parent) => {
1021                    let mut data = vec![0u8; 64];
1022                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1023                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1024                    target.write_all(&data).await.map_err(io::Error::from)?;
1025                }
1026                EncodedItem::Leaf(leaf) => {
1027                    target
1028                        .write_chunk(leaf.data)
1029                        .await
1030                        .map_err(io::Error::from)?;
1031                }
1032                EncodedItem::Done => break,
1033                EncodedItem::Error(cause) => return Err(cause.into()),
1034            }
1035        }
1036        Ok(())
1037    }
1038
1039    /// Write quinn variant that also feeds a progress writer.
1040    pub(crate) async fn write_quinn_with_progress(
1041        self,
1042        writer: &mut SendStream,
1043        progress: &mut impl WriteProgress,
1044        hash: &Hash,
1045        index: u64,
1046    ) -> super::ExportBaoResult<()> {
1047        let mut rx = self.inner.await?;
1048        while let Some(item) = rx.recv().await? {
1049            match item {
1050                EncodedItem::Size(size) => {
1051                    progress.send_transfer_started(index, hash, size).await;
1052                    writer.write_u64_le(size).await?;
1053                    progress.log_other_write(8);
1054                }
1055                EncodedItem::Parent(parent) => {
1056                    let mut data = vec![0u8; 64];
1057                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1058                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1059                    writer.write_all(&data).await.map_err(io::Error::from)?;
1060                    progress.log_other_write(64);
1061                }
1062                EncodedItem::Leaf(leaf) => {
1063                    let len = leaf.data.len();
1064                    writer
1065                        .write_chunk(leaf.data)
1066                        .await
1067                        .map_err(io::Error::from)?;
1068                    progress.notify_payload_write(index, leaf.offset, len).await;
1069                }
1070                EncodedItem::Done => break,
1071                EncodedItem::Error(cause) => return Err(cause.into()),
1072            }
1073        }
1074        Ok(())
1075    }
1076
1077    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1078        self.stream().filter_map(|item| match item {
1079            EncodedItem::Size(size) => {
1080                let size = size.to_le_bytes().to_vec().into();
1081                Some(Ok(size))
1082            }
1083            EncodedItem::Parent(parent) => {
1084                let mut data = vec![0u8; 64];
1085                data[..32].copy_from_slice(parent.pair.0.as_bytes());
1086                data[32..].copy_from_slice(parent.pair.1.as_bytes());
1087                Some(Ok(data.into()))
1088            }
1089            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1090            EncodedItem::Done => None,
1091            EncodedItem::Error(cause) => Some(Err(super::Error::other(cause))),
1092        })
1093    }
1094
1095    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1096        Gen::new(|co| async move {
1097            let mut rx = match self.inner.await {
1098                Ok(rx) => rx,
1099                Err(cause) => {
1100                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1101                        .await;
1102                    return;
1103                }
1104            };
1105            while let Ok(Some(item)) = rx.recv().await {
1106                co.yield_(item).await;
1107            }
1108        })
1109    }
1110}
1111
1112pub(crate) trait WriteProgress {
1113    /// Notify the progress writer that a payload write has happened.
1114    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize);
1115
1116    /// Log a write of some other data.
1117    fn log_other_write(&mut self, len: usize);
1118
1119    /// Notify the progress writer that a transfer has started.
1120    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1121}
1122
1123impl WriteProgress for StreamContext {
1124    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) {
1125        StreamContext::notify_payload_write(self, index, offset, len);
1126    }
1127
1128    fn log_other_write(&mut self, len: usize) {
1129        StreamContext::log_other_write(self, len);
1130    }
1131
1132    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64) {
1133        StreamContext::send_transfer_started(self, index, hash, size).await
1134    }
1135}