1use std::{
8 collections::BTreeMap,
9 future::{Future, IntoFuture},
10 io,
11 num::NonZeroU64,
12 path::{Path, PathBuf},
13 pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18 io::{
19 fsm::{ResponseDecoder, ResponseDecoderNext},
20 BaoContentItem, Leaf,
21 },
22 BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::{AsyncStreamReader, TokioStreamReader};
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use quinn::SendStream;
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use serde::{Deserialize, Serialize};
33use tokio::io::AsyncWriteExt;
34use tracing::trace;
35mod reader;
36pub use reader::BlobReader;
37
38pub use super::proto::{
43 AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
44 ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
45 ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
46 ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
47};
48use super::{
49 proto::{
50 BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
51 ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
52 ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
53 },
54 remote::HashSeqChunk,
55 tags::TagInfo,
56 ApiClient, RequestResult, Tags,
57};
58use crate::{
59 api::proto::{BatchRequest, ImportByteStreamUpdate},
60 provider::events::ClientResult,
61 store::IROH_BLOCK_SIZE,
62 util::temp_tag::TempTag,
63 BlobFormat, Hash, HashAndFormat,
64};
65
/// Options for adding a blob from an in-memory byte buffer.
#[derive(Debug)]
pub struct AddBytesOptions {
    /// The bytes to import.
    pub data: Bytes,
    /// The blob format passed along with the import request.
    pub format: BlobFormat,
}
72
73impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
74 fn from(item: (T, BlobFormat)) -> Self {
75 let (data, format) = item;
76 Self {
77 data: data.into(),
78 format,
79 }
80 }
81}
82
/// Blob-related view over an [`ApiClient`].
///
/// The struct is `#[repr(transparent)]` and derives `RefCast`, which is what
/// lets [`Blobs::ref_from_sender`] turn a `&ApiClient` into a `&Blobs`.
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Blobs {
    /// The client all requests of this API are sent through.
    client: ApiClient,
}
89
90impl Blobs {
91 pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
92 Self::ref_cast(sender)
93 }
94
95 pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
96 let msg = BatchRequest;
97 trace!("{msg:?}");
98 let (tx, rx) = self.client.client_streaming(msg, 32).await?;
99 let scope = rx.await?;
100
101 Ok(Batch {
102 scope,
103 blobs: self,
104 _tx: tx,
105 })
106 }
107
108 pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
129 self.reader_with_opts(ReaderOptions { hash: hash.into() })
130 }
131
132 pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
137 BlobReader::new(self.clone(), options)
138 }
139
140 #[cfg(feature = "fs-store")]
148 pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
149 trace!("{options:?}");
150 self.client.rpc(options).await??;
151 Ok(())
152 }
153
154 #[cfg(feature = "fs-store")]
156 pub(crate) async fn delete(
157 &self,
158 hashes: impl IntoIterator<Item = impl Into<Hash>>,
159 ) -> RequestResult<()> {
160 self.delete_with_opts(DeleteOptions {
161 hashes: hashes.into_iter().map(Into::into).collect(),
162 force: false,
163 })
164 .await
165 }
166
167 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
168 let options = ImportBytesRequest {
169 data: Bytes::copy_from_slice(data.as_ref()),
170 format: crate::BlobFormat::Raw,
171 scope: Scope::GLOBAL,
172 };
173 self.add_bytes_impl(options)
174 }
175
176 pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
177 let options = ImportBytesRequest {
178 data: data.into(),
179 format: crate::BlobFormat::Raw,
180 scope: Scope::GLOBAL,
181 };
182 self.add_bytes_impl(options)
183 }
184
185 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
186 let options = options.into();
187 let request = ImportBytesRequest {
188 data: options.data,
189 format: options.format,
190 scope: Scope::GLOBAL,
191 };
192 self.add_bytes_impl(request)
193 }
194
195 fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
196 trace!("{options:?}");
197 let this = self.clone();
198 let stream = Gen::new(|co| async move {
199 let mut receiver = match this.client.server_streaming(options, 32).await {
200 Ok(receiver) => receiver,
201 Err(cause) => {
202 co.yield_(AddProgressItem::Error(cause.into())).await;
203 return;
204 }
205 };
206 loop {
207 match receiver.recv().await {
208 Ok(Some(item)) => co.yield_(item).await,
209 Err(cause) => {
210 co.yield_(AddProgressItem::Error(cause.into())).await;
211 break;
212 }
213 Ok(None) => break,
214 }
215 }
216 });
217 AddProgress::new(self, stream)
218 }
219
220 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
221 let options = options.into();
222 self.add_path_with_opts_impl(ImportPathRequest {
223 path: options.path,
224 mode: options.mode,
225 format: options.format,
226 scope: Scope::GLOBAL,
227 })
228 }
229
230 fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
231 trace!("{:?}", options);
232 let client = self.client.clone();
233 let stream = Gen::new(|co| async move {
234 let mut receiver = match client.server_streaming(options, 32).await {
235 Ok(receiver) => receiver,
236 Err(cause) => {
237 co.yield_(AddProgressItem::Error(cause.into())).await;
238 return;
239 }
240 };
241 loop {
242 match receiver.recv().await {
243 Ok(Some(item)) => co.yield_(item).await,
244 Err(cause) => {
245 co.yield_(AddProgressItem::Error(cause.into())).await;
246 break;
247 }
248 Ok(None) => break,
249 }
250 }
251 });
252 AddProgress::new(self, stream)
253 }
254
255 pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
256 self.add_path_with_opts(AddPathOptions {
257 path: path.as_ref().to_owned(),
258 mode: ImportMode::Copy,
259 format: BlobFormat::Raw,
260 })
261 }
262
    /// Adds a raw blob from a stream of byte chunks.
    ///
    /// A bidirectional call is opened with [`BlobFormat::Raw`] and
    /// `Scope::default()`. Chunks from `data` are forwarded as
    /// `ImportByteStreamUpdate::Bytes`, followed by a final
    /// `ImportByteStreamUpdate::Done`, while progress items coming back are
    /// yielded on the returned [`AddProgress`]. Sending and receiving run
    /// concurrently inside the progress stream.
    ///
    /// A failure to open the call, or a failure while receiving, is reported
    /// as an `AddProgressItem::Error` item.
    ///
    /// NOTE(review): an error item from `data`, or a failed send, ends the
    /// send half early and its result is discarded by `let _ = tokio::join!`;
    /// it is not turned into an `AddProgressItem::Error`. `sender` is also
    /// still alive at that point. Confirm that the receive half terminates in
    /// that case.
    ///
    /// NOTE(review): sibling methods use `Scope::GLOBAL` where this one uses
    /// `Scope::default()`; presumably equivalent, verify.
    pub async fn add_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    ) -> AddProgress<'_> {
        let inner = ImportByteStreamRequest {
            format: crate::BlobFormat::Raw,
            scope: Scope::default(),
        };
        let client = self.client.clone();
        let stream = Gen::new(|co| async move {
            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
                Ok(x) => x,
                Err(cause) => {
                    co.yield_(AddProgressItem::Error(cause.into())).await;
                    return;
                }
            };
            // Receive half: forward progress items until the response ends or fails.
            let recv = async {
                loop {
                    match receiver.recv().await {
                        Ok(Some(item)) => co.yield_(item).await,
                        Err(cause) => {
                            co.yield_(AddProgressItem::Error(cause.into())).await;
                            break;
                        }
                        Ok(None) => break,
                    }
                }
            };
            // Send half: push all chunks, then signal completion.
            let send = async {
                tokio::pin!(data);
                while let Some(item) = data.next().await {
                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
                }
                sender.send(ImportByteStreamUpdate::Done).await?;
                anyhow::Ok(())
            };
            // Both halves are driven to completion; the send result is ignored.
            let _ = tokio::join!(send, recv);
        });
        AddProgress::new(self, stream)
    }
304
305 pub fn export_ranges(
306 &self,
307 hash: impl Into<Hash>,
308 ranges: impl Into<RangeSet2<u64>>,
309 ) -> ExportRangesProgress {
310 self.export_ranges_with_opts(ExportRangesOptions {
311 hash: hash.into(),
312 ranges: ranges.into(),
313 })
314 }
315
316 pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
317 trace!("{options:?}");
318 ExportRangesProgress::new(
319 options.ranges.clone(),
320 self.client.server_streaming(options, 32),
321 )
322 }
323
324 pub fn export_bao_with_opts(
325 &self,
326 options: ExportBaoOptions,
327 local_update_cap: usize,
328 ) -> ExportBaoProgress {
329 trace!("{options:?}");
330 ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
331 }
332
333 pub fn export_bao(
334 &self,
335 hash: impl Into<Hash>,
336 ranges: impl Into<ChunkRanges>,
337 ) -> ExportBaoProgress {
338 self.export_bao_with_opts(
339 ExportBaoRequest {
340 hash: hash.into(),
341 ranges: ranges.into(),
342 },
343 32,
344 )
345 }
346
347 pub async fn export_chunk(
349 &self,
350 hash: impl Into<Hash>,
351 offset: u64,
352 ) -> super::ExportBaoResult<Leaf> {
353 let base = ChunkNum::full_chunks(offset);
354 let ranges = ChunkRanges::from(base..base + 1);
355 let mut stream = self.export_bao(hash, ranges).stream();
356 while let Some(item) = stream.next().await {
357 match item {
358 EncodedItem::Leaf(leaf) => return Ok(leaf),
359 EncodedItem::Parent(_) => {}
360 EncodedItem::Size(_) => {}
361 EncodedItem::Done => break,
362 EncodedItem::Error(cause) => return Err(cause.into()),
363 }
364 }
365 Err(io::Error::other("unexpected end of stream").into())
366 }
367
368 pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
372 self.export_bao(hash.into(), ChunkRanges::all())
373 .data_to_bytes()
374 .await
375 }
376
377 pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
379 self.observe_with_opts(ObserveOptions { hash: hash.into() })
380 }
381
382 pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
383 trace!("{:?}", options);
384 if options.hash == Hash::EMPTY {
385 return ObserveProgress::new(async move {
386 let (tx, rx) = mpsc::channel(1);
387 tx.send(Bitfield::complete(0)).await.ok();
388 Ok(rx)
389 });
390 }
391 ObserveProgress::new(self.client.server_streaming(options, 32))
392 }
393
394 pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
395 trace!("{:?}", options);
396 ExportProgress::new(self.client.server_streaming(options, 32))
397 }
398
399 pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
400 let options = ExportOptions {
401 hash: hash.into(),
402 mode: ExportMode::Copy,
403 target: target.as_ref().to_owned(),
404 };
405 self.export_with_opts(options)
406 }
407
408 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
412 pub async fn import_bao(
413 &self,
414 hash: impl Into<Hash>,
415 size: NonZeroU64,
416 local_update_cap: usize,
417 ) -> irpc::Result<ImportBaoHandle> {
418 let options = ImportBaoRequest {
419 hash: hash.into(),
420 size,
421 };
422 self.import_bao_with_opts(options, local_update_cap).await
423 }
424
425 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
426 pub async fn import_bao_with_opts(
427 &self,
428 options: ImportBaoOptions,
429 local_update_cap: usize,
430 ) -> irpc::Result<ImportBaoHandle> {
431 trace!("{:?}", options);
432 ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
433 }
434
    /// Imports bao-encoded data for `hash` from `reader` and hands the reader
    /// back once the requested `ranges` have been decoded.
    ///
    /// The input is expected to start with the blob size as an 8-byte
    /// little-endian integer. A size of zero is accepted only for
    /// [`Hash::EMPTY`], in which case the reader is returned right away and
    /// no import is started; for any other hash it is an error.
    ///
    /// # Errors
    ///
    /// Fails if the size prefix cannot be read, if the size is zero for a
    /// non-empty hash, if decoding or forwarding an item fails, or if the
    /// import reports an error.
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    async fn import_bao_reader<R: AsyncStreamReader>(
        &self,
        hash: Hash,
        ranges: ChunkRanges,
        mut reader: R,
    ) -> RequestResult<R> {
        let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
        let Some(size) = NonZeroU64::new(size) else {
            return if hash == Hash::EMPTY {
                Ok(reader)
            } else {
                Err(super::Error::other("invalid size for hash").into())
            };
        };
        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
        let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
        let options = ImportBaoOptions { hash, size };
        let handle = self.import_bao_with_opts(options, 32).await?;
        // Feeds decoded items into the import until the decoder is done.
        let driver = async move {
            let reader = loop {
                match decoder.next().await {
                    ResponseDecoderNext::More((rest, item)) => {
                        handle.tx.send(item?).await?;
                        decoder = rest;
                    }
                    ResponseDecoderNext::Done(reader) => break reader,
                };
            };
            // Close the sending half before the driver finishes.
            drop(handle.tx);
            io::Result::Ok(reader)
        };
        // Waits for the final result of the import.
        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
        let (reader, res) = tokio::join!(driver, fut);
        // The import result is checked before the driver result.
        res?;
        Ok(reader?)
    }
472
473 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
474 pub async fn import_bao_quinn(
475 &self,
476 hash: Hash,
477 ranges: ChunkRanges,
478 stream: &mut iroh::endpoint::RecvStream,
479 ) -> RequestResult<()> {
480 let reader = TokioStreamReader::new(stream);
481 self.import_bao_reader(hash, ranges, reader).await?;
482 Ok(())
483 }
484
485 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
486 pub async fn import_bao_bytes(
487 &self,
488 hash: Hash,
489 ranges: ChunkRanges,
490 data: impl Into<Bytes>,
491 ) -> RequestResult<()> {
492 self.import_bao_reader(hash, ranges, data.into()).await?;
493 Ok(())
494 }
495
496 pub fn list(&self) -> BlobsListProgress {
497 let msg = ListRequest;
498 let client = self.client.clone();
499 BlobsListProgress::new(client.server_streaming(msg, 32))
500 }
501
502 pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
503 let hash = hash.into();
504 let msg = BlobStatusRequest { hash };
505 self.client.rpc(msg).await
506 }
507
508 pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
509 match self.status(hash).await? {
510 BlobStatus::Complete { .. } => Ok(true),
511 _ => Ok(false),
512 }
513 }
514
515 #[allow(dead_code)]
516 pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
517 let msg = ClearProtectedRequest;
518 self.client.rpc(msg).await??;
519 Ok(())
520 }
521}
522
/// Progress handle for an add operation started from a [`Batch`].
///
/// Wraps an [`AddProgress`]; awaiting it directly resolves to a [`TempTag`].
pub struct BatchAddProgress<'a>(AddProgress<'a>);
525
526impl<'a> IntoFuture for BatchAddProgress<'a> {
527 type Output = RequestResult<TempTag>;
528
529 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
530
531 fn into_future(self) -> Self::IntoFuture {
532 Box::pin(self.temp_tag())
533 }
534}
535
536impl<'a> BatchAddProgress<'a> {
537 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
538 self.0.with_named_tag(name).await
539 }
540
541 pub async fn with_tag(self) -> RequestResult<TagInfo> {
542 self.0.with_tag().await
543 }
544
545 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
546 self.0.stream().await
547 }
548
549 pub async fn temp_tag(self) -> RequestResult<TempTag> {
550 self.0.temp_tag().await
551 }
552}
553
/// A batch of add operations that share one scope.
///
/// Created by [`Blobs::batch`].
pub struct Batch<'a> {
    /// Scope received when the batch was opened; attached to every request
    /// issued through this batch.
    scope: Scope,
    /// The API handle the batch was created from.
    blobs: &'a Blobs,
    /// Sending half of the batch call. Never used for sending in this file,
    /// only held so that it lives as long as the batch.
    _tx: mpsc::Sender<BatchResponse>,
}
560
561impl<'a> Batch<'a> {
562 pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
563 let options = ImportBytesRequest {
564 data: data.into(),
565 format: crate::BlobFormat::Raw,
566 scope: self.scope,
567 };
568 BatchAddProgress(self.blobs.add_bytes_impl(options))
569 }
570
571 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
572 let options = options.into();
573 BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
574 data: options.data,
575 format: options.format,
576 scope: self.scope,
577 }))
578 }
579
580 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
581 let options = ImportBytesRequest {
582 data: Bytes::copy_from_slice(data.as_ref()),
583 format: crate::BlobFormat::Raw,
584 scope: self.scope,
585 };
586 BatchAddProgress(self.blobs.add_bytes_impl(options))
587 }
588
589 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
590 let options = options.into();
591 BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
592 path: options.path,
593 mode: options.mode,
594 format: options.format,
595 scope: self.scope,
596 }))
597 }
598
599 pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
600 let value = value.into();
601 let msg = CreateTempTagRequest {
602 scope: self.scope,
603 value,
604 };
605 self.blobs.client.rpc(msg).await
606 }
607}
608
/// Options for adding a blob from a file path.
#[derive(Debug)]
pub struct AddPathOptions {
    /// Path of the file to import.
    pub path: PathBuf,
    /// The blob format passed along with the import request.
    pub format: BlobFormat,
    /// The import mode passed along with the import request.
    pub mode: ImportMode,
}
616
/// Progress handle for an add operation.
///
/// Awaiting it directly is the same as calling `with_tag`; the other methods
/// give access to the temp tag or to the raw progress items.
pub struct AddProgress<'a> {
    /// The API handle the operation was started from; used to reach the tags
    /// API once the data has been added.
    blobs: &'a Blobs,
    /// The stream of progress items of the operation.
    inner: stream::Boxed<AddProgressItem>,
}
631
632impl<'a> IntoFuture for AddProgress<'a> {
633 type Output = RequestResult<TagInfo>;
634
635 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
636
637 fn into_future(self) -> Self::IntoFuture {
638 Box::pin(self.with_tag())
639 }
640}
641
642impl<'a> AddProgress<'a> {
643 fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
644 Self {
645 blobs,
646 inner: Box::pin(stream),
647 }
648 }
649
650 pub async fn temp_tag(self) -> RequestResult<TempTag> {
651 let mut stream = self.inner;
652 while let Some(item) = stream.next().await {
653 match item {
654 AddProgressItem::Done(tt) => return Ok(tt),
655 AddProgressItem::Error(e) => return Err(e.into()),
656 _ => {}
657 }
658 }
659 Err(super::Error::other("unexpected end of stream").into())
660 }
661
662 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
663 let blobs = self.blobs.clone();
664 let tt = self.temp_tag().await?;
665 let haf = *tt.hash_and_format();
666 let tags = Tags::ref_from_sender(&blobs.client);
667 tags.set(name, *tt.hash_and_format()).await?;
668 drop(tt);
669 Ok(haf)
670 }
671
672 pub async fn with_tag(self) -> RequestResult<TagInfo> {
673 let blobs = self.blobs.clone();
674 let tt = self.temp_tag().await?;
675 let hash = *tt.hash();
676 let format = tt.format();
677 let tags = Tags::ref_from_sender(&blobs.client);
678 let name = tags.create(*tt.hash_and_format()).await?;
679 drop(tt);
680 Ok(TagInfo { name, hash, format })
681 }
682
683 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
684 self.inner
685 }
686}
687
/// Options for creating a [`BlobReader`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
    /// Hash of the blob to read.
    pub hash: Hash,
}
693
/// Progress handle for an observe operation.
///
/// Awaiting it directly resolves to the first [`Bitfield`] received.
pub struct ObserveProgress {
    /// Future resolving to the receiver of bitfield updates.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
}
701
702impl IntoFuture for ObserveProgress {
703 type Output = RequestResult<Bitfield>;
704
705 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
706
707 fn into_future(self) -> Self::IntoFuture {
708 Box::pin(async move {
709 let mut rx = self.inner.await?;
710 match rx.recv().await? {
711 Some(bitfield) => Ok(bitfield),
712 None => Err(super::Error::other("unexpected end of stream").into()),
713 }
714 })
715 }
716}
717
718impl ObserveProgress {
719 fn new(
720 fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
721 ) -> Self {
722 Self {
723 inner: Box::pin(fut),
724 }
725 }
726
727 pub async fn await_completion(self) -> RequestResult<Bitfield> {
728 let mut stream = self.stream().await?;
729 while let Some(item) = stream.next().await {
730 if item.is_complete() {
731 return Ok(item);
732 }
733 }
734 Err(super::Error::other("unexpected end of stream").into())
735 }
736
737 pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
742 let mut rx = self.inner.await?;
743 Ok(Gen::new(|co| async move {
744 while let Ok(Some(item)) = rx.recv().await {
745 co.yield_(item).await;
746 }
747 }))
748 }
749}
750
/// Progress handle for an export to the file system.
///
/// Awaiting it directly is the same as calling `finish`.
pub struct ExportProgress {
    /// Future resolving to the receiver of export progress items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
}
764
765impl IntoFuture for ExportProgress {
766 type Output = RequestResult<u64>;
767
768 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
769
770 fn into_future(self) -> Self::IntoFuture {
771 Box::pin(self.finish())
772 }
773}
774
775impl ExportProgress {
776 fn new(
777 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
778 ) -> Self {
779 Self {
780 inner: Box::pin(fut),
781 }
782 }
783
784 pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
785 Gen::new(|co| async move {
786 let mut rx = match self.inner.await {
787 Ok(rx) => rx,
788 Err(e) => {
789 co.yield_(ExportProgressItem::Error(e.into())).await;
790 return;
791 }
792 };
793 while let Ok(Some(item)) = rx.recv().await {
794 co.yield_(item).await;
795 }
796 })
797 }
798
799 pub async fn finish(self) -> RequestResult<u64> {
800 let mut rx = self.inner.await?;
801 let mut size = None;
802 loop {
803 match rx.recv().await? {
804 Some(ExportProgressItem::Done) => break,
805 Some(ExportProgressItem::Size(s)) => size = Some(s),
806 Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
807 _ => {}
808 }
809 }
810 if let Some(size) = size {
811 Ok(size)
812 } else {
813 Err(super::Error::other("unexpected end of stream").into())
814 }
815 }
816}
817
/// The two channel halves of a running bao import.
pub struct ImportBaoHandle {
    /// Sender for the decoded bao content items to import.
    pub tx: mpsc::Sender<BaoContentItem>,
    /// Receiver for the final result of the import.
    pub rx: oneshot::Receiver<super::Result<()>>,
}
823
824impl ImportBaoHandle {
825 pub(crate) async fn new(
826 fut: impl Future<
827 Output = irpc::Result<(
828 mpsc::Sender<BaoContentItem>,
829 oneshot::Receiver<super::Result<()>>,
830 )>,
831 > + Send
832 + 'static,
833 ) -> irpc::Result<Self> {
834 let (tx, rx) = fut.await?;
835 Ok(Self { tx, rx })
836 }
837}
838
/// Progress handle for listing blobs.
pub struct BlobsListProgress {
    /// Future resolving to the receiver of the listed hashes; each item may
    /// itself be an error.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
}
843
844impl BlobsListProgress {
845 fn new(
846 fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
847 ) -> Self {
848 Self {
849 inner: Box::pin(fut),
850 }
851 }
852
853 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
854 let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
855 let mut hashes = Vec::new();
856 while let Some(item) = rx.recv().await? {
857 hashes.push(item?);
858 }
859 Ok(hashes)
860 }
861
862 pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
863 let mut rx = self.inner.await?;
864 Ok(Gen::new(|co| async move {
865 while let Ok(Some(item)) = rx.recv().await {
866 co.yield_(item).await;
867 }
868 }))
869 }
870}
871
/// Progress handle for an export of byte ranges of a blob.
pub struct ExportRangesProgress {
    /// The byte ranges that were requested; used by `concatenate` to cut the
    /// received data to exactly these ranges.
    ranges: RangeSet2<u64>,
    /// Future resolving to the receiver of exported items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
}
883
884impl ExportRangesProgress {
885 fn new(
886 ranges: RangeSet2<u64>,
887 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
888 ) -> Self {
889 Self {
890 ranges,
891 inner: Box::pin(fut),
892 }
893 }
894}
895
896impl ExportRangesProgress {
897 pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
904 Gen::new(|co| async move {
905 let mut rx = match self.inner.await {
906 Ok(rx) => rx,
907 Err(e) => {
908 co.yield_(ExportRangesItem::Error(e.into())).await;
909 return;
910 }
911 };
912 while let Ok(Some(item)) = rx.recv().await {
913 co.yield_(item).await;
914 }
915 })
916 }
917
918 pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
920 let mut rx = self.inner.await?;
921 let mut data = BTreeMap::new();
922 while let Some(item) = rx.recv().await? {
923 match item {
924 ExportRangesItem::Size(_) => {}
925 ExportRangesItem::Data(leaf) => {
926 data.insert(leaf.offset, leaf.data);
927 }
928 ExportRangesItem::Error(cause) => return Err(cause.into()),
929 }
930 }
931 let mut res = Vec::new();
932 for range in self.ranges.iter() {
933 let (start, end) = match range {
934 RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
935 RangeSetRange::Range(range) => (*range.start, *range.end),
936 };
937 for (offset, data) in data.iter() {
938 let cstart = *offset;
939 let cend = *offset + (data.len() as u64);
940 if cstart >= end || cend <= start {
941 continue;
942 }
943 let start = start.max(cstart);
944 let end = end.min(cend);
945 let data = &data[(start - cstart) as usize..(end - cstart) as usize];
946 res.extend_from_slice(data);
947 }
948 }
949 Ok(res)
950 }
951}
952
/// Progress handle for a bao export.
pub struct ExportBaoProgress {
    /// Future resolving to the receiver of encoded items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
}
963
964impl ExportBaoProgress {
965 fn new(
966 fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
967 ) -> Self {
968 Self {
969 inner: Box::pin(fut),
970 }
971 }
972
973 pub fn hashes_with_index(
980 self,
981 ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
982 let mut stream = self.stream();
983 Gen::new(|co| async move {
984 while let Some(item) = stream.next().await {
985 let leaf = match item {
986 EncodedItem::Leaf(leaf) => leaf,
987 EncodedItem::Error(e) => {
988 co.yield_(Err(e.into())).await;
989 continue;
990 }
991 _ => continue,
992 };
993 let slice = match HashSeqChunk::try_from(leaf) {
994 Ok(slice) => slice,
995 Err(e) => {
996 co.yield_(Err(e)).await;
997 continue;
998 }
999 };
1000 let offset = slice.base();
1001 for (o, hash) in slice.into_iter().enumerate() {
1002 co.yield_(Ok((offset + o as u64, hash))).await;
1003 }
1004 }
1005 })
1006 }
1007
1008 pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
1010 self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1011 }
1012
1013 pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1014 let mut data = Vec::new();
1015 let mut stream = self.into_byte_stream();
1016 while let Some(item) = stream.next().await {
1017 data.extend_from_slice(&item?);
1018 }
1019 Ok(data)
1020 }
1021
1022 pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1023 let mut rx = self.inner.await?;
1024 let mut data = Vec::new();
1025 while let Some(item) = rx.recv().await? {
1026 match item {
1027 EncodedItem::Leaf(leaf) => {
1028 data.push(leaf.data);
1029 }
1030 EncodedItem::Parent(_) => {}
1031 EncodedItem::Size(_) => {}
1032 EncodedItem::Done => break,
1033 EncodedItem::Error(cause) => return Err(cause.into()),
1034 }
1035 }
1036 if data.len() == 1 {
1037 Ok(data.pop().unwrap())
1038 } else {
1039 let mut out = Vec::new();
1040 for item in data {
1041 out.extend_from_slice(&item);
1042 }
1043 Ok(out.into())
1044 }
1045 }
1046
1047 pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1048 let mut rx = self.inner.await?;
1049 let mut data = Vec::new();
1050 while let Some(item) = rx.recv().await? {
1051 match item {
1052 EncodedItem::Leaf(leaf) => {
1053 data.extend_from_slice(&leaf.data);
1054 }
1055 EncodedItem::Parent(_) => {}
1056 EncodedItem::Size(_) => {}
1057 EncodedItem::Done => break,
1058 EncodedItem::Error(cause) => return Err(cause.into()),
1059 }
1060 }
1061 Ok(data)
1062 }
1063
1064 pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
1065 let mut rx = self.inner.await?;
1066 while let Some(item) = rx.recv().await? {
1067 match item {
1068 EncodedItem::Size(size) => {
1069 target.write_u64_le(size).await?;
1070 }
1071 EncodedItem::Parent(parent) => {
1072 let mut data = vec![0u8; 64];
1073 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1074 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1075 target.write_all(&data).await.map_err(io::Error::from)?;
1076 }
1077 EncodedItem::Leaf(leaf) => {
1078 target
1079 .write_chunk(leaf.data)
1080 .await
1081 .map_err(io::Error::from)?;
1082 }
1083 EncodedItem::Done => break,
1084 EncodedItem::Error(cause) => return Err(cause.into()),
1085 }
1086 }
1087 Ok(())
1088 }
1089
1090 pub(crate) async fn write_quinn_with_progress(
1092 self,
1093 writer: &mut SendStream,
1094 progress: &mut impl WriteProgress,
1095 hash: &Hash,
1096 index: u64,
1097 ) -> super::ExportBaoResult<()> {
1098 let mut rx = self.inner.await?;
1099 while let Some(item) = rx.recv().await? {
1100 match item {
1101 EncodedItem::Size(size) => {
1102 progress.send_transfer_started(index, hash, size).await;
1103 writer.write_u64_le(size).await?;
1104 progress.log_other_write(8);
1105 }
1106 EncodedItem::Parent(parent) => {
1107 let mut data = vec![0u8; 64];
1108 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1109 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1110 writer.write_all(&data).await.map_err(io::Error::from)?;
1111 progress.log_other_write(64);
1112 }
1113 EncodedItem::Leaf(leaf) => {
1114 let len = leaf.data.len();
1115 writer
1116 .write_chunk(leaf.data)
1117 .await
1118 .map_err(io::Error::from)?;
1119 progress
1120 .notify_payload_write(index, leaf.offset, len)
1121 .await?;
1122 }
1123 EncodedItem::Done => break,
1124 EncodedItem::Error(cause) => return Err(cause.into()),
1125 }
1126 }
1127 Ok(())
1128 }
1129
1130 pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1131 self.stream().filter_map(|item| match item {
1132 EncodedItem::Size(size) => {
1133 let size = size.to_le_bytes().to_vec().into();
1134 Some(Ok(size))
1135 }
1136 EncodedItem::Parent(parent) => {
1137 let mut data = vec![0u8; 64];
1138 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1139 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1140 Some(Ok(data.into()))
1141 }
1142 EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1143 EncodedItem::Done => None,
1144 EncodedItem::Error(cause) => Some(Err(cause.into())),
1145 })
1146 }
1147
1148 pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1149 Gen::new(|co| async move {
1150 let mut rx = match self.inner.await {
1151 Ok(rx) => rx,
1152 Err(cause) => {
1153 co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1154 .await;
1155 return;
1156 }
1157 };
1158 while let Ok(Some(item)) = rx.recv().await {
1159 co.yield_(item).await;
1160 }
1161 })
1162 }
1163}
1164
/// Callbacks invoked by `ExportBaoProgress::write_quinn_with_progress` while
/// it writes an export to a stream.
pub(crate) trait WriteProgress {
    /// Called after a leaf of `len` bytes at `offset` has been written for
    /// the transfer identified by `index`. An error aborts the write.
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;

    /// Called after `len` bytes that are not leaf data (the size prefix or a
    /// parent hash pair) have been written.
    fn log_other_write(&mut self, len: usize);

    /// Called once the size of the blob is known, before the size prefix is
    /// written.
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
}