1use std::{
8 collections::BTreeMap,
9 future::{Future, IntoFuture},
10 io,
11 num::NonZeroU64,
12 path::{Path, PathBuf},
13 pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18 io::{
19 fsm::{ResponseDecoder, ResponseDecoderNext},
20 BaoContentItem, Leaf,
21 },
22 BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::{AsyncStreamReader, TokioStreamReader};
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use quinn::SendStream;
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use tokio::io::AsyncWriteExt;
33use tracing::trace;
34
35pub use super::proto::{
40 AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
41 ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
42 ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
43 ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
44};
45use super::{
46 proto::{
47 BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
48 ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
49 ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
50 },
51 remote::HashSeqChunk,
52 tags::TagInfo,
53 ApiClient, RequestResult, Tags,
54};
55use crate::{
56 api::proto::{BatchRequest, ImportByteStreamUpdate},
57 provider::StreamContext,
58 store::IROH_BLOCK_SIZE,
59 util::temp_tag::TempTag,
60 BlobFormat, Hash, HashAndFormat,
61};
62
63#[derive(Debug)]
65pub struct AddBytesOptions {
66 pub data: Bytes,
67 pub format: BlobFormat,
68}
69
70impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
71 fn from(item: (T, BlobFormat)) -> Self {
72 let (data, format) = item;
73 Self {
74 data: data.into(),
75 format,
76 }
77 }
78}
79
80#[derive(Debug, Clone, ref_cast::RefCast)]
82#[repr(transparent)]
83pub struct Blobs {
84 client: ApiClient,
85}
86
87impl Blobs {
88 pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
89 Self::ref_cast(sender)
90 }
91
92 pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
93 let msg = BatchRequest;
94 trace!("{msg:?}");
95 let (tx, rx) = self.client.client_streaming(msg, 32).await?;
96 let scope = rx.await?;
97
98 Ok(Batch {
99 scope,
100 blobs: self,
101 _tx: tx,
102 })
103 }
104
105 pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
113 trace!("{options:?}");
114 self.client.rpc(options).await??;
115 Ok(())
116 }
117
118 pub(crate) async fn delete(
120 &self,
121 hashes: impl IntoIterator<Item = impl Into<Hash>>,
122 ) -> RequestResult<()> {
123 self.delete_with_opts(DeleteOptions {
124 hashes: hashes.into_iter().map(Into::into).collect(),
125 force: false,
126 })
127 .await
128 }
129
130 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
131 let options = ImportBytesRequest {
132 data: Bytes::copy_from_slice(data.as_ref()),
133 format: crate::BlobFormat::Raw,
134 scope: Scope::GLOBAL,
135 };
136 self.add_bytes_impl(options)
137 }
138
139 pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
140 let options = ImportBytesRequest {
141 data: data.into(),
142 format: crate::BlobFormat::Raw,
143 scope: Scope::GLOBAL,
144 };
145 self.add_bytes_impl(options)
146 }
147
148 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
149 let options = options.into();
150 let request = ImportBytesRequest {
151 data: options.data,
152 format: options.format,
153 scope: Scope::GLOBAL,
154 };
155 self.add_bytes_impl(request)
156 }
157
158 fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
159 trace!("{options:?}");
160 let this = self.clone();
161 let stream = Gen::new(|co| async move {
162 let mut receiver = match this.client.server_streaming(options, 32).await {
163 Ok(receiver) => receiver,
164 Err(cause) => {
165 co.yield_(AddProgressItem::Error(cause.into())).await;
166 return;
167 }
168 };
169 loop {
170 match receiver.recv().await {
171 Ok(Some(item)) => co.yield_(item).await,
172 Err(cause) => {
173 co.yield_(AddProgressItem::Error(cause.into())).await;
174 break;
175 }
176 Ok(None) => break,
177 }
178 }
179 });
180 AddProgress::new(self, stream)
181 }
182
183 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
184 let options = options.into();
185 self.add_path_with_opts_impl(ImportPathRequest {
186 path: options.path,
187 mode: options.mode,
188 format: options.format,
189 scope: Scope::GLOBAL,
190 })
191 }
192
193 fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
194 trace!("{:?}", options);
195 let client = self.client.clone();
196 let stream = Gen::new(|co| async move {
197 let mut receiver = match client.server_streaming(options, 32).await {
198 Ok(receiver) => receiver,
199 Err(cause) => {
200 co.yield_(AddProgressItem::Error(cause.into())).await;
201 return;
202 }
203 };
204 loop {
205 match receiver.recv().await {
206 Ok(Some(item)) => co.yield_(item).await,
207 Err(cause) => {
208 co.yield_(AddProgressItem::Error(cause.into())).await;
209 break;
210 }
211 Ok(None) => break,
212 }
213 }
214 });
215 AddProgress::new(self, stream)
216 }
217
218 pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
219 self.add_path_with_opts(AddPathOptions {
220 path: path.as_ref().to_owned(),
221 mode: ImportMode::Copy,
222 format: BlobFormat::Raw,
223 })
224 }
225
226 pub async fn add_stream(
227 &self,
228 data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
229 ) -> AddProgress<'_> {
230 let inner = ImportByteStreamRequest {
231 format: crate::BlobFormat::Raw,
232 scope: Scope::default(),
233 };
234 let client = self.client.clone();
235 let stream = Gen::new(|co| async move {
236 let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
237 Ok(x) => x,
238 Err(cause) => {
239 co.yield_(AddProgressItem::Error(cause.into())).await;
240 return;
241 }
242 };
243 let recv = async {
244 loop {
245 match receiver.recv().await {
246 Ok(Some(item)) => co.yield_(item).await,
247 Err(cause) => {
248 co.yield_(AddProgressItem::Error(cause.into())).await;
249 break;
250 }
251 Ok(None) => break,
252 }
253 }
254 };
255 let send = async {
256 tokio::pin!(data);
257 while let Some(item) = data.next().await {
258 sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
259 }
260 sender.send(ImportByteStreamUpdate::Done).await?;
261 anyhow::Ok(())
262 };
263 let _ = tokio::join!(send, recv);
264 });
265 AddProgress::new(self, stream)
266 }
267
268 pub fn export_ranges(
269 &self,
270 hash: impl Into<Hash>,
271 ranges: impl Into<RangeSet2<u64>>,
272 ) -> ExportRangesProgress {
273 self.export_ranges_with_opts(ExportRangesOptions {
274 hash: hash.into(),
275 ranges: ranges.into(),
276 })
277 }
278
279 pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
280 trace!("{options:?}");
281 ExportRangesProgress::new(
282 options.ranges.clone(),
283 self.client.server_streaming(options, 32),
284 )
285 }
286
287 pub fn export_bao_with_opts(
288 &self,
289 options: ExportBaoOptions,
290 local_update_cap: usize,
291 ) -> ExportBaoProgress {
292 trace!("{options:?}");
293 ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
294 }
295
296 pub fn export_bao(
297 &self,
298 hash: impl Into<Hash>,
299 ranges: impl Into<ChunkRanges>,
300 ) -> ExportBaoProgress {
301 self.export_bao_with_opts(
302 ExportBaoRequest {
303 hash: hash.into(),
304 ranges: ranges.into(),
305 },
306 32,
307 )
308 }
309
310 pub async fn export_chunk(
312 &self,
313 hash: impl Into<Hash>,
314 offset: u64,
315 ) -> super::ExportBaoResult<Leaf> {
316 let base = ChunkNum::full_chunks(offset);
317 let ranges = ChunkRanges::from(base..base + 1);
318 let mut stream = self.export_bao(hash, ranges).stream();
319 while let Some(item) = stream.next().await {
320 match item {
321 EncodedItem::Leaf(leaf) => return Ok(leaf),
322 EncodedItem::Parent(_) => {}
323 EncodedItem::Size(_) => {}
324 EncodedItem::Done => break,
325 EncodedItem::Error(cause) => return Err(cause.into()),
326 }
327 }
328 Err(io::Error::other("unexpected end of stream").into())
329 }
330
331 pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
335 self.export_bao(hash.into(), ChunkRanges::all())
336 .data_to_bytes()
337 .await
338 }
339
340 pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
342 self.observe_with_opts(ObserveOptions { hash: hash.into() })
343 }
344
345 pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
346 trace!("{:?}", options);
347 if options.hash == Hash::EMPTY {
348 return ObserveProgress::new(async move {
349 let (tx, rx) = mpsc::channel(1);
350 tx.send(Bitfield::complete(0)).await.ok();
351 Ok(rx)
352 });
353 }
354 ObserveProgress::new(self.client.server_streaming(options, 32))
355 }
356
357 pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
358 trace!("{:?}", options);
359 ExportProgress::new(self.client.server_streaming(options, 32))
360 }
361
362 pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
363 let options = ExportOptions {
364 hash: hash.into(),
365 mode: ExportMode::Copy,
366 target: target.as_ref().to_owned(),
367 };
368 self.export_with_opts(options)
369 }
370
371 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
375 pub async fn import_bao(
376 &self,
377 hash: impl Into<Hash>,
378 size: NonZeroU64,
379 local_update_cap: usize,
380 ) -> irpc::Result<ImportBaoHandle> {
381 let options = ImportBaoRequest {
382 hash: hash.into(),
383 size,
384 };
385 self.import_bao_with_opts(options, local_update_cap).await
386 }
387
388 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
389 pub async fn import_bao_with_opts(
390 &self,
391 options: ImportBaoOptions,
392 local_update_cap: usize,
393 ) -> irpc::Result<ImportBaoHandle> {
394 trace!("{:?}", options);
395 ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
396 }
397
398 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
399 async fn import_bao_reader<R: AsyncStreamReader>(
400 &self,
401 hash: Hash,
402 ranges: ChunkRanges,
403 mut reader: R,
404 ) -> RequestResult<R> {
405 let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
406 let Some(size) = NonZeroU64::new(size) else {
407 return if hash == Hash::EMPTY {
408 Ok(reader)
409 } else {
410 Err(super::Error::other("invalid size for hash").into())
411 };
412 };
413 let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
414 let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
415 let options = ImportBaoOptions { hash, size };
416 let handle = self.import_bao_with_opts(options, 32).await?;
417 let driver = async move {
418 let reader = loop {
419 match decoder.next().await {
420 ResponseDecoderNext::More((rest, item)) => {
421 handle.tx.send(item?).await?;
422 decoder = rest;
423 }
424 ResponseDecoderNext::Done(reader) => break reader,
425 };
426 };
427 drop(handle.tx);
428 io::Result::Ok(reader)
429 };
430 let fut = async move { handle.rx.await.map_err(io::Error::other)? };
431 let (reader, res) = tokio::join!(driver, fut);
432 res?;
433 Ok(reader?)
434 }
435
436 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
437 pub async fn import_bao_quinn(
438 &self,
439 hash: Hash,
440 ranges: ChunkRanges,
441 stream: &mut iroh::endpoint::RecvStream,
442 ) -> RequestResult<()> {
443 let reader = TokioStreamReader::new(stream);
444 self.import_bao_reader(hash, ranges, reader).await?;
445 Ok(())
446 }
447
448 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
449 pub async fn import_bao_bytes(
450 &self,
451 hash: Hash,
452 ranges: ChunkRanges,
453 data: impl Into<Bytes>,
454 ) -> RequestResult<()> {
455 self.import_bao_reader(hash, ranges, data.into()).await?;
456 Ok(())
457 }
458
459 pub fn list(&self) -> BlobsListProgress {
460 let msg = ListRequest;
461 let client = self.client.clone();
462 BlobsListProgress::new(client.server_streaming(msg, 32))
463 }
464
465 pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
466 let hash = hash.into();
467 let msg = BlobStatusRequest { hash };
468 self.client.rpc(msg).await
469 }
470
471 pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
472 match self.status(hash).await? {
473 BlobStatus::Complete { .. } => Ok(true),
474 _ => Ok(false),
475 }
476 }
477
478 pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
479 let msg = ClearProtectedRequest;
480 self.client.rpc(msg).await??;
481 Ok(())
482 }
483}
484
485pub struct BatchAddProgress<'a>(AddProgress<'a>);
487
488impl<'a> IntoFuture for BatchAddProgress<'a> {
489 type Output = RequestResult<TempTag>;
490
491 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
492
493 fn into_future(self) -> Self::IntoFuture {
494 Box::pin(self.temp_tag())
495 }
496}
497
498impl<'a> BatchAddProgress<'a> {
499 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
500 self.0.with_named_tag(name).await
501 }
502
503 pub async fn with_tag(self) -> RequestResult<TagInfo> {
504 self.0.with_tag().await
505 }
506
507 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
508 self.0.stream().await
509 }
510
511 pub async fn temp_tag(self) -> RequestResult<TempTag> {
512 self.0.temp_tag().await
513 }
514}
515
516pub struct Batch<'a> {
518 scope: Scope,
519 blobs: &'a Blobs,
520 _tx: mpsc::Sender<BatchResponse>,
521}
522
523impl<'a> Batch<'a> {
524 pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
525 let options = ImportBytesRequest {
526 data: data.into(),
527 format: crate::BlobFormat::Raw,
528 scope: self.scope,
529 };
530 BatchAddProgress(self.blobs.add_bytes_impl(options))
531 }
532
533 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
534 let options = options.into();
535 BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
536 data: options.data,
537 format: options.format,
538 scope: self.scope,
539 }))
540 }
541
542 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
543 let options = ImportBytesRequest {
544 data: Bytes::copy_from_slice(data.as_ref()),
545 format: crate::BlobFormat::Raw,
546 scope: self.scope,
547 };
548 BatchAddProgress(self.blobs.add_bytes_impl(options))
549 }
550
551 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
552 let options = options.into();
553 BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
554 path: options.path,
555 mode: options.mode,
556 format: options.format,
557 scope: self.scope,
558 }))
559 }
560
561 pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
562 let value = value.into();
563 let msg = CreateTempTagRequest {
564 scope: self.scope,
565 value,
566 };
567 self.blobs.client.rpc(msg).await
568 }
569}
570
571#[derive(Debug)]
573pub struct AddPathOptions {
574 pub path: PathBuf,
575 pub format: BlobFormat,
576 pub mode: ImportMode,
577}
578
579pub struct AddProgress<'a> {
590 blobs: &'a Blobs,
591 inner: stream::Boxed<AddProgressItem>,
592}
593
594impl<'a> IntoFuture for AddProgress<'a> {
595 type Output = RequestResult<TagInfo>;
596
597 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
598
599 fn into_future(self) -> Self::IntoFuture {
600 Box::pin(self.with_tag())
601 }
602}
603
604impl<'a> AddProgress<'a> {
605 fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
606 Self {
607 blobs,
608 inner: Box::pin(stream),
609 }
610 }
611
612 pub async fn temp_tag(self) -> RequestResult<TempTag> {
613 let mut stream = self.inner;
614 while let Some(item) = stream.next().await {
615 match item {
616 AddProgressItem::Done(tt) => return Ok(tt),
617 AddProgressItem::Error(e) => return Err(e.into()),
618 _ => {}
619 }
620 }
621 Err(super::Error::other("unexpected end of stream").into())
622 }
623
624 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
625 let blobs = self.blobs.clone();
626 let tt = self.temp_tag().await?;
627 let haf = *tt.hash_and_format();
628 let tags = Tags::ref_from_sender(&blobs.client);
629 tags.set(name, *tt.hash_and_format()).await?;
630 drop(tt);
631 Ok(haf)
632 }
633
634 pub async fn with_tag(self) -> RequestResult<TagInfo> {
635 let blobs = self.blobs.clone();
636 let tt = self.temp_tag().await?;
637 let hash = *tt.hash();
638 let format = tt.format();
639 let tags = Tags::ref_from_sender(&blobs.client);
640 let name = tags.create(*tt.hash_and_format()).await?;
641 drop(tt);
642 Ok(TagInfo { name, hash, format })
643 }
644
645 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
646 self.inner
647 }
648}
649
650pub struct ObserveProgress {
655 inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
656}
657
658impl IntoFuture for ObserveProgress {
659 type Output = RequestResult<Bitfield>;
660
661 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
662
663 fn into_future(self) -> Self::IntoFuture {
664 Box::pin(async move {
665 let mut rx = self.inner.await?;
666 match rx.recv().await? {
667 Some(bitfield) => Ok(bitfield),
668 None => Err(super::Error::other("unexpected end of stream").into()),
669 }
670 })
671 }
672}
673
674impl ObserveProgress {
675 fn new(
676 fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
677 ) -> Self {
678 Self {
679 inner: Box::pin(fut),
680 }
681 }
682
683 pub async fn await_completion(self) -> RequestResult<Bitfield> {
684 let mut stream = self.stream().await?;
685 while let Some(item) = stream.next().await {
686 if item.is_complete() {
687 return Ok(item);
688 }
689 }
690 Err(super::Error::other("unexpected end of stream").into())
691 }
692
693 pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
698 let mut rx = self.inner.await?;
699 Ok(Gen::new(|co| async move {
700 while let Ok(Some(item)) = rx.recv().await {
701 co.yield_(item).await;
702 }
703 }))
704 }
705}
706
707pub struct ExportProgress {
718 inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
719}
720
721impl IntoFuture for ExportProgress {
722 type Output = RequestResult<u64>;
723
724 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
725
726 fn into_future(self) -> Self::IntoFuture {
727 Box::pin(self.finish())
728 }
729}
730
731impl ExportProgress {
732 fn new(
733 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
734 ) -> Self {
735 Self {
736 inner: Box::pin(fut),
737 }
738 }
739
740 pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
741 Gen::new(|co| async move {
742 let mut rx = match self.inner.await {
743 Ok(rx) => rx,
744 Err(e) => {
745 co.yield_(ExportProgressItem::Error(e.into())).await;
746 return;
747 }
748 };
749 while let Ok(Some(item)) = rx.recv().await {
750 co.yield_(item).await;
751 }
752 })
753 }
754
755 pub async fn finish(self) -> RequestResult<u64> {
756 let mut rx = self.inner.await?;
757 let mut size = None;
758 loop {
759 match rx.recv().await? {
760 Some(ExportProgressItem::Done) => break,
761 Some(ExportProgressItem::Size(s)) => size = Some(s),
762 Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
763 _ => {}
764 }
765 }
766 if let Some(size) = size {
767 Ok(size)
768 } else {
769 Err(super::Error::other("unexpected end of stream").into())
770 }
771 }
772}
773
774pub struct ImportBaoHandle {
776 pub tx: mpsc::Sender<BaoContentItem>,
777 pub rx: oneshot::Receiver<super::Result<()>>,
778}
779
780impl ImportBaoHandle {
781 pub(crate) async fn new(
782 fut: impl Future<
783 Output = irpc::Result<(
784 mpsc::Sender<BaoContentItem>,
785 oneshot::Receiver<super::Result<()>>,
786 )>,
787 > + Send
788 + 'static,
789 ) -> irpc::Result<Self> {
790 let (tx, rx) = fut.await?;
791 Ok(Self { tx, rx })
792 }
793}
794
795pub struct BlobsListProgress {
797 inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
798}
799
800impl BlobsListProgress {
801 fn new(
802 fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
803 ) -> Self {
804 Self {
805 inner: Box::pin(fut),
806 }
807 }
808
809 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
810 let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
811 let mut hashes = Vec::new();
812 while let Some(item) = rx.recv().await? {
813 hashes.push(item?);
814 }
815 Ok(hashes)
816 }
817
818 pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
819 let mut rx = self.inner.await?;
820 Ok(Gen::new(|co| async move {
821 while let Ok(Some(item)) = rx.recv().await {
822 co.yield_(item).await;
823 }
824 }))
825 }
826}
827
828pub struct ExportRangesProgress {
836 ranges: RangeSet2<u64>,
837 inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
838}
839
840impl ExportRangesProgress {
841 fn new(
842 ranges: RangeSet2<u64>,
843 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
844 ) -> Self {
845 Self {
846 ranges,
847 inner: Box::pin(fut),
848 }
849 }
850}
851
852impl ExportRangesProgress {
853 pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
860 Gen::new(|co| async move {
861 let mut rx = match self.inner.await {
862 Ok(rx) => rx,
863 Err(e) => {
864 co.yield_(ExportRangesItem::Error(e.into())).await;
865 return;
866 }
867 };
868 while let Ok(Some(item)) = rx.recv().await {
869 co.yield_(item).await;
870 }
871 })
872 }
873
874 pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
876 let mut rx = self.inner.await?;
877 let mut data = BTreeMap::new();
878 while let Some(item) = rx.recv().await? {
879 match item {
880 ExportRangesItem::Size(_) => {}
881 ExportRangesItem::Data(leaf) => {
882 data.insert(leaf.offset, leaf.data);
883 }
884 ExportRangesItem::Error(cause) => return Err(cause.into()),
885 }
886 }
887 let mut res = Vec::new();
888 for range in self.ranges.iter() {
889 let (start, end) = match range {
890 RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
891 RangeSetRange::Range(range) => (*range.start, *range.end),
892 };
893 for (offset, data) in data.iter() {
894 let cstart = *offset;
895 let cend = *offset + (data.len() as u64);
896 if cstart >= end || cend <= start {
897 continue;
898 }
899 let start = start.max(cstart);
900 let end = end.min(cend);
901 let data = &data[(start - cstart) as usize..(end - cstart) as usize];
902 res.extend_from_slice(data);
903 }
904 }
905 Ok(res)
906 }
907}
908
909pub struct ExportBaoProgress {
917 inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
918}
919
920impl ExportBaoProgress {
921 fn new(
922 fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
923 ) -> Self {
924 Self {
925 inner: Box::pin(fut),
926 }
927 }
928
929 pub fn hashes_with_index(
936 self,
937 ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
938 let mut stream = self.stream();
939 Gen::new(|co| async move {
940 while let Some(item) = stream.next().await {
941 let leaf = match item {
942 EncodedItem::Leaf(leaf) => leaf,
943 EncodedItem::Error(e) => {
944 co.yield_(Err(e.into())).await;
945 continue;
946 }
947 _ => continue,
948 };
949 let slice = match HashSeqChunk::try_from(leaf) {
950 Ok(slice) => slice,
951 Err(e) => {
952 co.yield_(Err(e)).await;
953 continue;
954 }
955 };
956 let offset = slice.base();
957 for (o, hash) in slice.into_iter().enumerate() {
958 co.yield_(Ok((offset + o as u64, hash))).await;
959 }
960 }
961 })
962 }
963
964 pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
966 self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
967 }
968
969 pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
970 let mut data = Vec::new();
971 let mut stream = self.into_byte_stream();
972 while let Some(item) = stream.next().await {
973 data.extend_from_slice(&item?);
974 }
975 Ok(data)
976 }
977
978 pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
979 let mut rx = self.inner.await?;
980 let mut data = Vec::new();
981 while let Some(item) = rx.recv().await? {
982 match item {
983 EncodedItem::Leaf(leaf) => {
984 data.push(leaf.data);
985 }
986 EncodedItem::Parent(_) => {}
987 EncodedItem::Size(_) => {}
988 EncodedItem::Done => break,
989 EncodedItem::Error(cause) => return Err(cause.into()),
990 }
991 }
992 if data.len() == 1 {
993 Ok(data.pop().unwrap())
994 } else {
995 let mut out = Vec::new();
996 for item in data {
997 out.extend_from_slice(&item);
998 }
999 Ok(out.into())
1000 }
1001 }
1002
1003 pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1004 let mut rx = self.inner.await?;
1005 let mut data = Vec::new();
1006 while let Some(item) = rx.recv().await? {
1007 match item {
1008 EncodedItem::Leaf(leaf) => {
1009 data.extend_from_slice(&leaf.data);
1010 }
1011 EncodedItem::Parent(_) => {}
1012 EncodedItem::Size(_) => {}
1013 EncodedItem::Done => break,
1014 EncodedItem::Error(cause) => return Err(cause.into()),
1015 }
1016 }
1017 Ok(data)
1018 }
1019
1020 pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
1021 let mut rx = self.inner.await?;
1022 while let Some(item) = rx.recv().await? {
1023 match item {
1024 EncodedItem::Size(size) => {
1025 target.write_u64_le(size).await?;
1026 }
1027 EncodedItem::Parent(parent) => {
1028 let mut data = vec![0u8; 64];
1029 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1030 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1031 target.write_all(&data).await.map_err(io::Error::from)?;
1032 }
1033 EncodedItem::Leaf(leaf) => {
1034 target
1035 .write_chunk(leaf.data)
1036 .await
1037 .map_err(io::Error::from)?;
1038 }
1039 EncodedItem::Done => break,
1040 EncodedItem::Error(cause) => return Err(cause.into()),
1041 }
1042 }
1043 Ok(())
1044 }
1045
1046 pub(crate) async fn write_quinn_with_progress(
1048 self,
1049 writer: &mut SendStream,
1050 progress: &mut impl WriteProgress,
1051 hash: &Hash,
1052 index: u64,
1053 ) -> super::ExportBaoResult<()> {
1054 let mut rx = self.inner.await?;
1055 while let Some(item) = rx.recv().await? {
1056 match item {
1057 EncodedItem::Size(size) => {
1058 progress.send_transfer_started(index, hash, size).await;
1059 writer.write_u64_le(size).await?;
1060 progress.log_other_write(8);
1061 }
1062 EncodedItem::Parent(parent) => {
1063 let mut data = vec![0u8; 64];
1064 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1065 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1066 writer.write_all(&data).await.map_err(io::Error::from)?;
1067 progress.log_other_write(64);
1068 }
1069 EncodedItem::Leaf(leaf) => {
1070 let len = leaf.data.len();
1071 writer
1072 .write_chunk(leaf.data)
1073 .await
1074 .map_err(io::Error::from)?;
1075 progress.notify_payload_write(index, leaf.offset, len).await;
1076 }
1077 EncodedItem::Done => break,
1078 EncodedItem::Error(cause) => return Err(cause.into()),
1079 }
1080 }
1081 Ok(())
1082 }
1083
1084 pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1085 self.stream().filter_map(|item| match item {
1086 EncodedItem::Size(size) => {
1087 let size = size.to_le_bytes().to_vec().into();
1088 Some(Ok(size))
1089 }
1090 EncodedItem::Parent(parent) => {
1091 let mut data = vec![0u8; 64];
1092 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1093 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1094 Some(Ok(data.into()))
1095 }
1096 EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1097 EncodedItem::Done => None,
1098 EncodedItem::Error(cause) => Some(Err(cause.into())),
1099 })
1100 }
1101
1102 pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1103 Gen::new(|co| async move {
1104 let mut rx = match self.inner.await {
1105 Ok(rx) => rx,
1106 Err(cause) => {
1107 co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1108 .await;
1109 return;
1110 }
1111 };
1112 while let Ok(Some(item)) = rx.recv().await {
1113 co.yield_(item).await;
1114 }
1115 })
1116 }
1117}
1118
1119pub(crate) trait WriteProgress {
1120 async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize);
1122
1123 fn log_other_write(&mut self, len: usize);
1125
1126 async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1128}
1129
1130impl WriteProgress for StreamContext {
1131 async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) {
1132 StreamContext::notify_payload_write(self, index, offset, len);
1133 }
1134
1135 fn log_other_write(&mut self, len: usize) {
1136 StreamContext::log_other_write(self, len);
1137 }
1138
1139 async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64) {
1140 StreamContext::send_transfer_started(self, index, hash, size).await
1141 }
1142}