iroh_blobs/api/
blobs.rs

1//! API to interact with a local blob store
2//!
3//! This API is for local interactions with the blob store, such as importing
4//! and exporting blobs, observing the bitfield of a blob, and deleting blobs.
5//!
6//! The main entry point is the [`Blobs`] struct.
7use std::{
8    collections::BTreeMap,
9    future::{Future, IntoFuture},
10    io,
11    num::NonZeroU64,
12    path::{Path, PathBuf},
13    pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18    io::{
19        fsm::{ResponseDecoder, ResponseDecoderNext},
20        BaoContentItem, Leaf,
21    },
22    BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::AsyncStreamWriter;
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use range_collections::{range_set::RangeSetRange, RangeSet2};
30use ref_cast::RefCast;
31use serde::{Deserialize, Serialize};
32use tracing::trace;
33mod reader;
34pub use reader::BlobReader;
35
36// Public reexports from the proto module.
37//
38// Due to the fact that the proto module is hidden from docs by default,
39// these will appear in the docs as if they were declared here.
40pub use super::proto::{
41    AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
42    ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
43    ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
44    ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
45};
46use super::{
47    proto::{
48        BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
49        ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
50        ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
51    },
52    remote::HashSeqChunk,
53    tags::TagInfo,
54    ApiClient, RequestResult, Tags,
55};
56use crate::{
57    api::proto::{BatchRequest, ImportByteStreamUpdate},
58    provider::events::ClientResult,
59    store::IROH_BLOCK_SIZE,
60    util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
61    BlobFormat, Hash, HashAndFormat,
62};
63
/// Options for adding bytes.
///
/// Can also be created from a `(data, format)` tuple through the [`From`]
/// implementation below.
#[derive(Debug)]
pub struct AddBytesOptions {
    /// The bytes to add.
    pub data: Bytes,
    /// The format the data is added as.
    pub format: BlobFormat,
}
70
71impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
72    fn from(item: (T, BlobFormat)) -> Self {
73        let (data, format) = item;
74        Self {
75            data: data.into(),
76            format,
77        }
78    }
79}
80
/// Blobs API
///
/// A wrapper around an [`ApiClient`]. The struct is `#[repr(transparent)]`
/// and derives [`ref_cast::RefCast`], so a `&ApiClient` can be viewed as a
/// `&Blobs` (see `Blobs::ref_from_sender`).
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Blobs {
    /// The client used to send requests to the store.
    client: ApiClient,
}
87
88impl Blobs {
89    pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
90        Self::ref_cast(sender)
91    }
92
93    pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
94        let msg = BatchRequest;
95        trace!("{msg:?}");
96        let (tx, rx) = self.client.client_streaming(msg, 32).await?;
97        let scope = rx.await?;
98
99        Ok(Batch {
100            scope,
101            blobs: self,
102            _tx: tx,
103        })
104    }
105
106    /// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
107    /// and therefore can be used to read the blob's content.
108    ///
109    /// Any access to parts of the blob that are not present will result in an error.
110    ///
111    /// Example:
112    /// ```rust
113    /// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
114    /// use tokio::io::AsyncReadExt;
115    ///
116    /// # async fn example() -> anyhow::Result<()> {
117    /// let store = MemStore::new();
118    /// let tag = store.add_slice(b"Hello, world!").await?;
119    /// let mut reader = store.reader(tag.hash);
120    /// let mut buf = String::new();
121    /// reader.read_to_string(&mut buf).await?;
122    /// assert_eq!(buf, "Hello, world!");
123    /// # Ok(())
124    /// }
125    /// ```
126    pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
127        self.reader_with_opts(ReaderOptions { hash: hash.into() })
128    }
129
130    /// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
131    /// and therefore can be used to read the blob's content.
132    ///
133    /// Any access to parts of the blob that are not present will result in an error.
134    pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
135        BlobReader::new(self.clone(), options)
136    }
137
138    /// Delete a blob.
139    ///
140    /// This function is not public, because it does not work as expected when called manually,
141    /// because blobs are protected from deletion. This is only called from the gc task, which
142    /// clears the protections before.
143    ///
144    /// Users should rely only on garbage collection for blob deletion.
145    pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
146        trace!("{options:?}");
147        self.client.rpc(options).await??;
148        Ok(())
149    }
150
151    /// See [`Self::delete_with_opts`].
152    pub(crate) async fn delete(
153        &self,
154        hashes: impl IntoIterator<Item = impl Into<Hash>>,
155    ) -> RequestResult<()> {
156        self.delete_with_opts(DeleteOptions {
157            hashes: hashes.into_iter().map(Into::into).collect(),
158            force: false,
159        })
160        .await
161    }
162
163    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
164        let options = ImportBytesRequest {
165            data: Bytes::copy_from_slice(data.as_ref()),
166            format: crate::BlobFormat::Raw,
167            scope: Scope::GLOBAL,
168        };
169        self.add_bytes_impl(options)
170    }
171
172    pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
173        let options = ImportBytesRequest {
174            data: data.into(),
175            format: crate::BlobFormat::Raw,
176            scope: Scope::GLOBAL,
177        };
178        self.add_bytes_impl(options)
179    }
180
181    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
182        let options = options.into();
183        let request = ImportBytesRequest {
184            data: options.data,
185            format: options.format,
186            scope: Scope::GLOBAL,
187        };
188        self.add_bytes_impl(request)
189    }
190
191    fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
192        trace!("{options:?}");
193        let this = self.clone();
194        let stream = Gen::new(|co| async move {
195            let mut receiver = match this.client.server_streaming(options, 32).await {
196                Ok(receiver) => receiver,
197                Err(cause) => {
198                    co.yield_(AddProgressItem::Error(cause.into())).await;
199                    return;
200                }
201            };
202            loop {
203                match receiver.recv().await {
204                    Ok(Some(item)) => co.yield_(item).await,
205                    Err(cause) => {
206                        co.yield_(AddProgressItem::Error(cause.into())).await;
207                        break;
208                    }
209                    Ok(None) => break,
210                }
211            }
212        });
213        AddProgress::new(self, stream)
214    }
215
216    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
217        let options = options.into();
218        self.add_path_with_opts_impl(ImportPathRequest {
219            path: options.path,
220            mode: options.mode,
221            format: options.format,
222            scope: Scope::GLOBAL,
223        })
224    }
225
226    fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
227        trace!("{:?}", options);
228        let client = self.client.clone();
229        let stream = Gen::new(|co| async move {
230            let mut receiver = match client.server_streaming(options, 32).await {
231                Ok(receiver) => receiver,
232                Err(cause) => {
233                    co.yield_(AddProgressItem::Error(cause.into())).await;
234                    return;
235                }
236            };
237            loop {
238                match receiver.recv().await {
239                    Ok(Some(item)) => co.yield_(item).await,
240                    Err(cause) => {
241                        co.yield_(AddProgressItem::Error(cause.into())).await;
242                        break;
243                    }
244                    Ok(None) => break,
245                }
246            }
247        });
248        AddProgress::new(self, stream)
249    }
250
251    pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
252        self.add_path_with_opts(AddPathOptions {
253            path: path.as_ref().to_owned(),
254            mode: ImportMode::Copy,
255            format: BlobFormat::Raw,
256        })
257    }
258
259    pub async fn add_stream(
260        &self,
261        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
262    ) -> AddProgress<'_> {
263        let inner = ImportByteStreamRequest {
264            format: crate::BlobFormat::Raw,
265            scope: Scope::default(),
266        };
267        let client = self.client.clone();
268        let stream = Gen::new(|co| async move {
269            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
270                Ok(x) => x,
271                Err(cause) => {
272                    co.yield_(AddProgressItem::Error(cause.into())).await;
273                    return;
274                }
275            };
276            let recv = async {
277                loop {
278                    match receiver.recv().await {
279                        Ok(Some(item)) => co.yield_(item).await,
280                        Err(cause) => {
281                            co.yield_(AddProgressItem::Error(cause.into())).await;
282                            break;
283                        }
284                        Ok(None) => break,
285                    }
286                }
287            };
288            let send = async {
289                tokio::pin!(data);
290                while let Some(item) = data.next().await {
291                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
292                }
293                sender.send(ImportByteStreamUpdate::Done).await?;
294                anyhow::Ok(())
295            };
296            let _ = tokio::join!(send, recv);
297        });
298        AddProgress::new(self, stream)
299    }
300
301    pub fn export_ranges(
302        &self,
303        hash: impl Into<Hash>,
304        ranges: impl Into<RangeSet2<u64>>,
305    ) -> ExportRangesProgress {
306        self.export_ranges_with_opts(ExportRangesOptions {
307            hash: hash.into(),
308            ranges: ranges.into(),
309        })
310    }
311
312    pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
313        trace!("{options:?}");
314        ExportRangesProgress::new(
315            options.ranges.clone(),
316            self.client.server_streaming(options, 32),
317        )
318    }
319
320    pub fn export_bao_with_opts(
321        &self,
322        options: ExportBaoOptions,
323        local_update_cap: usize,
324    ) -> ExportBaoProgress {
325        trace!("{options:?}");
326        ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
327    }
328
329    pub fn export_bao(
330        &self,
331        hash: impl Into<Hash>,
332        ranges: impl Into<ChunkRanges>,
333    ) -> ExportBaoProgress {
334        self.export_bao_with_opts(
335            ExportBaoRequest {
336                hash: hash.into(),
337                ranges: ranges.into(),
338            },
339            32,
340        )
341    }
342
343    /// Export a single chunk from the given hash, at the given offset.
344    pub async fn export_chunk(
345        &self,
346        hash: impl Into<Hash>,
347        offset: u64,
348    ) -> super::ExportBaoResult<Leaf> {
349        let base = ChunkNum::full_chunks(offset);
350        let ranges = ChunkRanges::from(base..base + 1);
351        let mut stream = self.export_bao(hash, ranges).stream();
352        while let Some(item) = stream.next().await {
353            match item {
354                EncodedItem::Leaf(leaf) => return Ok(leaf),
355                EncodedItem::Parent(_) => {}
356                EncodedItem::Size(_) => {}
357                EncodedItem::Done => break,
358                EncodedItem::Error(cause) => return Err(cause.into()),
359            }
360        }
361        Err(io::Error::other("unexpected end of stream").into())
362    }
363
364    /// Get the entire blob into a Bytes
365    ///
366    /// This will run out of memory when called for very large blobs, so be careful!
367    pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
368        self.export_bao(hash.into(), ChunkRanges::all())
369            .data_to_bytes()
370            .await
371    }
372
373    /// Observe the bitfield of the given hash.
374    pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
375        self.observe_with_opts(ObserveOptions { hash: hash.into() })
376    }
377
378    pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
379        trace!("{:?}", options);
380        if options.hash == Hash::EMPTY {
381            return ObserveProgress::new(async move {
382                let (tx, rx) = mpsc::channel(1);
383                tx.send(Bitfield::complete(0)).await.ok();
384                Ok(rx)
385            });
386        }
387        ObserveProgress::new(self.client.server_streaming(options, 32))
388    }
389
390    pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
391        trace!("{:?}", options);
392        ExportProgress::new(self.client.server_streaming(options, 32))
393    }
394
395    pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
396        let options = ExportOptions {
397            hash: hash.into(),
398            mode: ExportMode::Copy,
399            target: target.as_ref().to_owned(),
400        };
401        self.export_with_opts(options)
402    }
403
404    /// Import BaoContentItems from a stream.
405    ///
406    /// The store assumes that these are already verified and in the correct order.
407    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
408    pub async fn import_bao(
409        &self,
410        hash: impl Into<Hash>,
411        size: NonZeroU64,
412        local_update_cap: usize,
413    ) -> irpc::Result<ImportBaoHandle> {
414        let options = ImportBaoRequest {
415            hash: hash.into(),
416            size,
417        };
418        self.import_bao_with_opts(options, local_update_cap).await
419    }
420
421    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
422    pub async fn import_bao_with_opts(
423        &self,
424        options: ImportBaoOptions,
425        local_update_cap: usize,
426    ) -> irpc::Result<ImportBaoHandle> {
427        trace!("{:?}", options);
428        ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
429    }
430
431    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
432    pub async fn import_bao_reader<R: crate::util::RecvStream>(
433        &self,
434        hash: Hash,
435        ranges: ChunkRanges,
436        mut reader: R,
437    ) -> RequestResult<R> {
438        let mut size = [0; 8];
439        reader
440            .recv_exact(&mut size)
441            .await
442            .map_err(super::Error::other)?;
443        let size = u64::from_le_bytes(size);
444        let Some(size) = NonZeroU64::new(size) else {
445            return if hash == Hash::EMPTY {
446                Ok(reader)
447            } else {
448                Err(super::Error::other("invalid size for hash").into())
449            };
450        };
451        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
452        let mut decoder = ResponseDecoder::new(
453            hash.into(),
454            ranges,
455            tree,
456            RecvStreamAsyncStreamReader::new(reader),
457        );
458        let options = ImportBaoOptions { hash, size };
459        let handle = self.import_bao_with_opts(options, 32).await?;
460        let driver = async move {
461            let reader = loop {
462                match decoder.next().await {
463                    ResponseDecoderNext::More((rest, item)) => {
464                        handle.tx.send(item?).await?;
465                        decoder = rest;
466                    }
467                    ResponseDecoderNext::Done(reader) => break reader,
468                };
469            };
470            drop(handle.tx);
471            io::Result::Ok(reader)
472        };
473        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
474        let (reader, res) = tokio::join!(driver, fut);
475        res?;
476        Ok(reader?.into_inner())
477    }
478
479    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
480    pub async fn import_bao_bytes(
481        &self,
482        hash: Hash,
483        ranges: ChunkRanges,
484        data: impl Into<Bytes>,
485    ) -> RequestResult<()> {
486        self.import_bao_reader(hash, ranges, data.into()).await?;
487        Ok(())
488    }
489
490    pub fn list(&self) -> BlobsListProgress {
491        let msg = ListRequest;
492        let client = self.client.clone();
493        BlobsListProgress::new(client.server_streaming(msg, 32))
494    }
495
496    pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
497        let hash = hash.into();
498        let msg = BlobStatusRequest { hash };
499        self.client.rpc(msg).await
500    }
501
502    pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
503        match self.status(hash).await? {
504            BlobStatus::Complete { .. } => Ok(true),
505            _ => Ok(false),
506        }
507    }
508
509    #[allow(dead_code)]
510    pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
511        let msg = ClearProtectedRequest;
512        self.client.rpc(msg).await??;
513        Ok(())
514    }
515}
516
/// A progress handle for a batch scoped add operation.
///
/// This is a wrapper around an [`AddProgress`]. Unlike [`AddProgress`],
/// awaiting it yields a [`TempTag`] instead of a [`TagInfo`].
pub struct BatchAddProgress<'a>(AddProgress<'a>);
519
520impl<'a> IntoFuture for BatchAddProgress<'a> {
521    type Output = RequestResult<TempTag>;
522
523    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
524
525    fn into_future(self) -> Self::IntoFuture {
526        Box::pin(self.temp_tag())
527    }
528}
529
530impl<'a> BatchAddProgress<'a> {
531    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
532        self.0.with_named_tag(name).await
533    }
534
535    pub async fn with_tag(self) -> RequestResult<TagInfo> {
536        self.0.with_tag().await
537    }
538
539    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
540        self.0.stream().await
541    }
542
543    pub async fn temp_tag(self) -> RequestResult<TempTag> {
544        self.0.temp_tag().await
545    }
546}
547
/// A batch of operations that modify the blob store.
///
/// Created by [`Blobs::batch`]. Requests made through a batch carry the
/// batch's [`Scope`] instead of [`Scope::GLOBAL`].
pub struct Batch<'a> {
    /// The scope received from the store when the batch was created.
    scope: Scope,
    /// The [`Blobs`] handle the batch was created from.
    blobs: &'a Blobs,
    /// Sending half of the batch request; never used directly, only kept
    /// for as long as the batch exists.
    _tx: mpsc::Sender<BatchResponse>,
}
554
555impl<'a> Batch<'a> {
556    pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
557        let options = ImportBytesRequest {
558            data: data.into(),
559            format: crate::BlobFormat::Raw,
560            scope: self.scope,
561        };
562        BatchAddProgress(self.blobs.add_bytes_impl(options))
563    }
564
565    pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
566        let options = options.into();
567        BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
568            data: options.data,
569            format: options.format,
570            scope: self.scope,
571        }))
572    }
573
574    pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
575        let options = ImportBytesRequest {
576            data: Bytes::copy_from_slice(data.as_ref()),
577            format: crate::BlobFormat::Raw,
578            scope: self.scope,
579        };
580        BatchAddProgress(self.blobs.add_bytes_impl(options))
581    }
582
583    pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
584        let options = options.into();
585        BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
586            path: options.path,
587            mode: options.mode,
588            format: options.format,
589            scope: self.scope,
590        }))
591    }
592
593    pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
594        let value = value.into();
595        let msg = CreateTempTagRequest {
596            scope: self.scope,
597            value,
598        };
599        self.blobs.client.rpc(msg).await
600    }
601}
602
/// Options for adding data from a file system path.
#[derive(Debug)]
pub struct AddPathOptions {
    /// The path to import the data from.
    pub path: PathBuf,
    /// The format the data is added as.
    pub format: BlobFormat,
    /// The import mode, e.g. [`ImportMode::Copy`].
    pub mode: ImportMode,
}
610
/// A progress handle for an import operation.
///
/// Internally this is a stream of [`AddProgressItem`] items. Working with this
/// stream directly can be inconvenient, so this struct provides some convenience
/// methods to work with the result.
///
/// It also implements [`IntoFuture`], so you can await it to get the [`TagInfo`] that
/// contains the hash of the added content and also protects the content.
///
/// If you want access to the stream, you can use the [`AddProgress::stream`] method.
pub struct AddProgress<'a> {
    /// The [`Blobs`] handle the operation was started from; used to set or
    /// create tags once the import is done.
    blobs: &'a Blobs,
    /// The underlying stream of progress items.
    inner: stream::Boxed<AddProgressItem>,
}
625
626impl<'a> IntoFuture for AddProgress<'a> {
627    type Output = RequestResult<TagInfo>;
628
629    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
630
631    fn into_future(self) -> Self::IntoFuture {
632        Box::pin(self.with_tag())
633    }
634}
635
636impl<'a> AddProgress<'a> {
637    fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
638        Self {
639            blobs,
640            inner: Box::pin(stream),
641        }
642    }
643
644    pub async fn temp_tag(self) -> RequestResult<TempTag> {
645        let mut stream = self.inner;
646        while let Some(item) = stream.next().await {
647            match item {
648                AddProgressItem::Done(tt) => return Ok(tt),
649                AddProgressItem::Error(e) => return Err(e.into()),
650                _ => {}
651            }
652        }
653        Err(super::Error::other("unexpected end of stream").into())
654    }
655
656    pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
657        let blobs = self.blobs.clone();
658        let tt = self.temp_tag().await?;
659        let haf = tt.hash_and_format();
660        let tags = Tags::ref_from_sender(&blobs.client);
661        tags.set(name, haf).await?;
662        drop(tt);
663        Ok(haf)
664    }
665
666    pub async fn with_tag(self) -> RequestResult<TagInfo> {
667        let blobs = self.blobs.clone();
668        let tt = self.temp_tag().await?;
669        let hash = tt.hash();
670        let format = tt.format();
671        let tags = Tags::ref_from_sender(&blobs.client);
672        let name = tags.create(tt.hash_and_format()).await?;
673        drop(tt);
674        Ok(TagInfo { name, hash, format })
675    }
676
677    pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
678        self.inner
679    }
680}
681
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
    /// The hash of the blob to read.
    pub hash: Hash,
}
687
/// An observe result. Awaiting this will return the current state.
///
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
/// the first item is the current state and subsequent items are updates.
pub struct ObserveProgress {
    /// Future resolving to the receiver of [`Bitfield`] items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
}
695
696impl IntoFuture for ObserveProgress {
697    type Output = RequestResult<Bitfield>;
698
699    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
700
701    fn into_future(self) -> Self::IntoFuture {
702        Box::pin(async move {
703            let mut rx = self.inner.await?;
704            match rx.recv().await? {
705                Some(bitfield) => Ok(bitfield),
706                None => Err(super::Error::other("unexpected end of stream").into()),
707            }
708        })
709    }
710}
711
712impl ObserveProgress {
713    fn new(
714        fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
715    ) -> Self {
716        Self {
717            inner: Box::pin(fut),
718        }
719    }
720
721    pub async fn await_completion(self) -> RequestResult<Bitfield> {
722        let mut stream = self.stream().await?;
723        while let Some(item) = stream.next().await {
724            if item.is_complete() {
725                return Ok(item);
726            }
727        }
728        Err(super::Error::other("unexpected end of stream").into())
729    }
730
731    /// Returns an infinite stream of bitfields. The first bitfield is the
732    /// current state, and the following bitfields are updates.
733    ///
734    /// Once a blob is complete, there will be no more updates.
735    pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
736        let mut rx = self.inner.await?;
737        Ok(Gen::new(|co| async move {
738            while let Ok(Some(item)) = rx.recv().await {
739                co.yield_(item).await;
740            }
741        }))
742    }
743}
744
/// A progress handle for an export operation.
///
/// Internally this is a stream of [`ExportProgressItem`] items. Working with this
/// stream directly can be inconvenient, so this struct provides some convenience
/// methods to work with the result.
///
/// To get the underlying stream, use the [`ExportProgress::stream`] method.
///
/// It also implements [`IntoFuture`], so you can await it to get the size of the
/// exported blob.
pub struct ExportProgress {
    /// Future resolving to the receiver of [`ExportProgressItem`]s.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
}
758
759impl IntoFuture for ExportProgress {
760    type Output = RequestResult<u64>;
761
762    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
763
764    fn into_future(self) -> Self::IntoFuture {
765        Box::pin(self.finish())
766    }
767}
768
769impl ExportProgress {
770    fn new(
771        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
772    ) -> Self {
773        Self {
774            inner: Box::pin(fut),
775        }
776    }
777
778    pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
779        Gen::new(|co| async move {
780            let mut rx = match self.inner.await {
781                Ok(rx) => rx,
782                Err(e) => {
783                    co.yield_(ExportProgressItem::Error(e.into())).await;
784                    return;
785                }
786            };
787            while let Ok(Some(item)) = rx.recv().await {
788                co.yield_(item).await;
789            }
790        })
791    }
792
793    pub async fn finish(self) -> RequestResult<u64> {
794        let mut rx = self.inner.await?;
795        let mut size = None;
796        loop {
797            match rx.recv().await? {
798                Some(ExportProgressItem::Done) => break,
799                Some(ExportProgressItem::Size(s)) => size = Some(s),
800                Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
801                _ => {}
802            }
803        }
804        if let Some(size) = size {
805            Ok(size)
806        } else {
807            Err(super::Error::other("unexpected end of stream").into())
808        }
809    }
810}
811
/// A handle for an ongoing bao import operation.
pub struct ImportBaoHandle {
    /// Sender for the [`BaoContentItem`]s to import.
    pub tx: mpsc::Sender<BaoContentItem>,
    /// Receiver for the result of the import.
    pub rx: oneshot::Receiver<super::Result<()>>,
}
817
818impl ImportBaoHandle {
819    pub(crate) async fn new(
820        fut: impl Future<
821                Output = irpc::Result<(
822                    mpsc::Sender<BaoContentItem>,
823                    oneshot::Receiver<super::Result<()>>,
824                )>,
825            > + Send
826            + 'static,
827    ) -> irpc::Result<Self> {
828        let (tx, rx) = fut.await?;
829        Ok(Self { tx, rx })
830    }
831}
832
/// A progress handle for a blobs list operation.
///
/// Use [`BlobsListProgress::hashes`] to collect all hashes, or
/// [`BlobsListProgress::stream`] to process them one by one.
pub struct BlobsListProgress {
    /// Future resolving to the receiver of the listed hashes.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
}
837
838impl BlobsListProgress {
839    fn new(
840        fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
841    ) -> Self {
842        Self {
843            inner: Box::pin(fut),
844        }
845    }
846
847    pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
848        let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
849        let mut hashes = Vec::new();
850        while let Some(item) = rx.recv().await? {
851            hashes.push(item?);
852        }
853        Ok(hashes)
854    }
855
856    pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
857        let mut rx = self.inner.await?;
858        Ok(Gen::new(|co| async move {
859            while let Ok(Some(item)) = rx.recv().await {
860                co.yield_(item).await;
861            }
862        }))
863    }
864}
865
/// A progress handle for a ranges export operation.
///
/// Internally, this is a stream of [`ExportRangesItem`]s. Using this stream directly
/// is often inconvenient, so there are a number of higher level methods to
/// process the stream.
///
/// You can get access to the underlying stream using the [`ExportRangesProgress::stream`] method.
pub struct ExportRangesProgress {
    /// The byte ranges that were requested.
    ranges: RangeSet2<u64>,
    /// Future resolving to the receiver of [`ExportRangesItem`]s.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
}
877
878impl ExportRangesProgress {
879    fn new(
880        ranges: RangeSet2<u64>,
881        fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
882    ) -> Self {
883        Self {
884            ranges,
885            inner: Box::pin(fut),
886        }
887    }
888}
889
890impl ExportRangesProgress {
891    /// A raw stream of [`ExportRangesItem`]s.
892    ///
893    /// Ranges will be rounded up to chunk boundaries. So if you request a
894    /// range of 0..100, you will get the entire first chunk, 0..1024.
895    ///
896    /// It is up to the caller to clip the ranges to the requested ranges.
897    pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
898        Gen::new(|co| async move {
899            let mut rx = match self.inner.await {
900                Ok(rx) => rx,
901                Err(e) => {
902                    co.yield_(ExportRangesItem::Error(e.into())).await;
903                    return;
904                }
905            };
906            while let Ok(Some(item)) = rx.recv().await {
907                co.yield_(item).await;
908            }
909        })
910    }
911
912    /// Concatenate all the data into a single `Bytes`.
913    pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
914        let mut rx = self.inner.await?;
915        let mut data = BTreeMap::new();
916        while let Some(item) = rx.recv().await? {
917            match item {
918                ExportRangesItem::Size(_) => {}
919                ExportRangesItem::Data(leaf) => {
920                    data.insert(leaf.offset, leaf.data);
921                }
922                ExportRangesItem::Error(cause) => return Err(cause.into()),
923            }
924        }
925        let mut res = Vec::new();
926        for range in self.ranges.iter() {
927            let (start, end) = match range {
928                RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
929                RangeSetRange::Range(range) => (*range.start, *range.end),
930            };
931            for (offset, data) in data.iter() {
932                let cstart = *offset;
933                let cend = *offset + (data.len() as u64);
934                if cstart >= end || cend <= start {
935                    continue;
936                }
937                let start = start.max(cstart);
938                let end = end.min(cend);
939                let data = &data[(start - cstart) as usize..(end - cstart) as usize];
940                res.extend_from_slice(data);
941            }
942        }
943        Ok(res)
944    }
945}
946
/// A progress handle for a bao export operation.
///
/// Internally, this is a stream of [`EncodedItem`]s. Using this stream directly
/// is often inconvenient, so there are a number of higher level methods to
/// process the stream, for example [`ExportBaoProgress::data_to_bytes`],
/// [`ExportBaoProgress::bao_to_vec`] and [`ExportBaoProgress::hashes`].
///
/// You can get access to the underlying stream using the [`ExportBaoProgress::stream`] method.
pub struct ExportBaoProgress {
    /// Future that resolves to the receiver of the encoded items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
}
957
958impl ExportBaoProgress {
959    fn new(
960        fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
961    ) -> Self {
962        Self {
963            inner: Box::pin(fut),
964        }
965    }
966
967    /// Interprets this blob as a hash sequence and returns a stream of hashes.
968    ///
969    /// Errors will be reported, but the iterator will nevertheless continue.
970    /// If you get an error despite having asked for ranges that should be present,
971    /// this means that the data is corrupted. It can still make sense to continue
972    /// to get all non-corrupted sections.
973    pub fn hashes_with_index(
974        self,
975    ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
976        let mut stream = self.stream();
977        Gen::new(|co| async move {
978            while let Some(item) = stream.next().await {
979                let leaf = match item {
980                    EncodedItem::Leaf(leaf) => leaf,
981                    EncodedItem::Error(e) => {
982                        co.yield_(Err(e.into())).await;
983                        continue;
984                    }
985                    _ => continue,
986                };
987                let slice = match HashSeqChunk::try_from(leaf) {
988                    Ok(slice) => slice,
989                    Err(e) => {
990                        co.yield_(Err(e)).await;
991                        continue;
992                    }
993                };
994                let offset = slice.base();
995                for (o, hash) in slice.into_iter().enumerate() {
996                    co.yield_(Ok((offset + o as u64, hash))).await;
997                }
998            }
999        })
1000    }
1001
1002    /// Same as [`Self::hashes_with_index`], but without the indexes.
1003    pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
1004        self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1005    }
1006
1007    pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1008        let mut data = Vec::new();
1009        let mut stream = self.into_byte_stream();
1010        while let Some(item) = stream.next().await {
1011            data.extend_from_slice(&item?);
1012        }
1013        Ok(data)
1014    }
1015
1016    pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1017        let mut rx = self.inner.await?;
1018        let mut data = Vec::new();
1019        while let Some(item) = rx.recv().await? {
1020            match item {
1021                EncodedItem::Leaf(leaf) => {
1022                    data.push(leaf.data);
1023                }
1024                EncodedItem::Parent(_) => {}
1025                EncodedItem::Size(_) => {}
1026                EncodedItem::Done => break,
1027                EncodedItem::Error(cause) => return Err(cause.into()),
1028            }
1029        }
1030        if data.len() == 1 {
1031            Ok(data.pop().unwrap())
1032        } else {
1033            let mut out = Vec::new();
1034            for item in data {
1035                out.extend_from_slice(&item);
1036            }
1037            Ok(out.into())
1038        }
1039    }
1040
1041    pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1042        let mut rx = self.inner.await?;
1043        let mut data = Vec::new();
1044        while let Some(item) = rx.recv().await? {
1045            match item {
1046                EncodedItem::Leaf(leaf) => {
1047                    data.extend_from_slice(&leaf.data);
1048                }
1049                EncodedItem::Parent(_) => {}
1050                EncodedItem::Size(_) => {}
1051                EncodedItem::Done => break,
1052                EncodedItem::Error(cause) => return Err(cause.into()),
1053            }
1054        }
1055        Ok(data)
1056    }
1057
1058    pub async fn write<W: AsyncStreamWriter>(self, target: &mut W) -> super::ExportBaoResult<()> {
1059        let mut rx = self.inner.await?;
1060        while let Some(item) = rx.recv().await? {
1061            match item {
1062                EncodedItem::Size(size) => {
1063                    target.write(&size.to_le_bytes()).await?;
1064                }
1065                EncodedItem::Parent(parent) => {
1066                    let mut data = vec![0u8; 64];
1067                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1068                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1069                    target.write(&data).await?;
1070                }
1071                EncodedItem::Leaf(leaf) => {
1072                    target.write_bytes(leaf.data).await?;
1073                }
1074                EncodedItem::Done => break,
1075                EncodedItem::Error(cause) => return Err(cause.into()),
1076            }
1077        }
1078        Ok(())
1079    }
1080
1081    /// Write quinn variant that also feeds a progress writer.
1082    pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
1083        self,
1084        writer: &mut W,
1085        progress: &mut impl WriteProgress,
1086        hash: &Hash,
1087        index: u64,
1088    ) -> super::ExportBaoResult<()> {
1089        let mut rx = self.inner.await?;
1090        while let Some(item) = rx.recv().await? {
1091            match item {
1092                EncodedItem::Size(size) => {
1093                    progress.send_transfer_started(index, hash, size).await;
1094                    writer.send(&size.to_le_bytes()).await?;
1095                    progress.log_other_write(8);
1096                }
1097                EncodedItem::Parent(parent) => {
1098                    let mut data = [0u8; 64];
1099                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
1100                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
1101                    writer.send(&data).await?;
1102                    progress.log_other_write(64);
1103                }
1104                EncodedItem::Leaf(leaf) => {
1105                    let len = leaf.data.len();
1106                    writer.send_bytes(leaf.data).await?;
1107                    progress
1108                        .notify_payload_write(index, leaf.offset, len)
1109                        .await?;
1110                }
1111                EncodedItem::Done => break,
1112                EncodedItem::Error(cause) => return Err(cause.into()),
1113            }
1114        }
1115        Ok(())
1116    }
1117
1118    pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1119        self.stream().filter_map(|item| match item {
1120            EncodedItem::Size(size) => {
1121                let size = size.to_le_bytes().to_vec().into();
1122                Some(Ok(size))
1123            }
1124            EncodedItem::Parent(parent) => {
1125                let mut data = vec![0u8; 64];
1126                data[..32].copy_from_slice(parent.pair.0.as_bytes());
1127                data[32..].copy_from_slice(parent.pair.1.as_bytes());
1128                Some(Ok(data.into()))
1129            }
1130            EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1131            EncodedItem::Done => None,
1132            EncodedItem::Error(cause) => Some(Err(cause.into())),
1133        })
1134    }
1135
1136    pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1137        Gen::new(|co| async move {
1138            let mut rx = match self.inner.await {
1139                Ok(rx) => rx,
1140                Err(cause) => {
1141                    co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1142                        .await;
1143                    return;
1144                }
1145            };
1146            while let Ok(Some(item)) = rx.recv().await {
1147                co.yield_(item).await;
1148            }
1149        })
1150    }
1151}
1152
/// Receiver of progress information about the writes of a transfer.
///
/// `ExportBaoProgress::write_with_progress` reports every write it performs
/// through this trait.
pub(crate) trait WriteProgress {
    /// Notify the progress writer that a payload write has happened.
    ///
    /// `write_with_progress` calls this after sending a leaf, with the `index`
    /// it was given, the `offset` of the leaf and the length `len` of the leaf
    /// data in bytes. An error returned from here aborts the write.
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;

    /// Log a write of some other data.
    ///
    /// `write_with_progress` calls this with `len` 8 after sending the size
    /// header and with `len` 64 after sending a parent hash pair.
    fn log_other_write(&mut self, len: usize);

    /// Notify the progress writer that a transfer has started.
    ///
    /// `write_with_progress` calls this when the size item arrives, before the
    /// size is sent, passing the `index` and `hash` it was given and the
    /// received `size`.
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
}