1use std::{
8 collections::BTreeMap,
9 future::{Future, IntoFuture},
10 io,
11 num::NonZeroU64,
12 path::{Path, PathBuf},
13 pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18 io::{
19 fsm::{ResponseDecoder, ResponseDecoderNext},
20 BaoContentItem, Leaf,
21 },
22 BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::{AsyncStreamReader, TokioStreamReader};
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use quinn::SendStream;
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use serde::{Deserialize, Serialize};
33use tokio::io::AsyncWriteExt;
34use tracing::trace;
35mod reader;
36pub use reader::BlobReader;
37
38pub use super::proto::{
43 AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
44 ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
45 ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
46 ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
47};
48use super::{
49 proto::{
50 BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
51 ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
52 ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
53 },
54 remote::HashSeqChunk,
55 tags::TagInfo,
56 ApiClient, RequestResult, Tags,
57};
58use crate::{
59 api::proto::{BatchRequest, ImportByteStreamUpdate},
60 provider::StreamContext,
61 store::IROH_BLOCK_SIZE,
62 util::temp_tag::TempTag,
63 BlobFormat, Hash, HashAndFormat,
64};
65
66#[derive(Debug)]
68pub struct AddBytesOptions {
69 pub data: Bytes,
70 pub format: BlobFormat,
71}
72
73impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
74 fn from(item: (T, BlobFormat)) -> Self {
75 let (data, format) = item;
76 Self {
77 data: data.into(),
78 format,
79 }
80 }
81}
82
83#[derive(Debug, Clone, ref_cast::RefCast)]
85#[repr(transparent)]
86pub struct Blobs {
87 client: ApiClient,
88}
89
90impl Blobs {
91 pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
92 Self::ref_cast(sender)
93 }
94
95 pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
96 let msg = BatchRequest;
97 trace!("{msg:?}");
98 let (tx, rx) = self.client.client_streaming(msg, 32).await?;
99 let scope = rx.await?;
100
101 Ok(Batch {
102 scope,
103 blobs: self,
104 _tx: tx,
105 })
106 }
107
108 pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
129 self.reader_with_opts(ReaderOptions { hash: hash.into() })
130 }
131
132 pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
137 BlobReader::new(self.clone(), options)
138 }
139
140 pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
148 trace!("{options:?}");
149 self.client.rpc(options).await??;
150 Ok(())
151 }
152
153 pub(crate) async fn delete(
155 &self,
156 hashes: impl IntoIterator<Item = impl Into<Hash>>,
157 ) -> RequestResult<()> {
158 self.delete_with_opts(DeleteOptions {
159 hashes: hashes.into_iter().map(Into::into).collect(),
160 force: false,
161 })
162 .await
163 }
164
165 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
166 let options = ImportBytesRequest {
167 data: Bytes::copy_from_slice(data.as_ref()),
168 format: crate::BlobFormat::Raw,
169 scope: Scope::GLOBAL,
170 };
171 self.add_bytes_impl(options)
172 }
173
174 pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
175 let options = ImportBytesRequest {
176 data: data.into(),
177 format: crate::BlobFormat::Raw,
178 scope: Scope::GLOBAL,
179 };
180 self.add_bytes_impl(options)
181 }
182
183 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
184 let options = options.into();
185 let request = ImportBytesRequest {
186 data: options.data,
187 format: options.format,
188 scope: Scope::GLOBAL,
189 };
190 self.add_bytes_impl(request)
191 }
192
193 fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
194 trace!("{options:?}");
195 let this = self.clone();
196 let stream = Gen::new(|co| async move {
197 let mut receiver = match this.client.server_streaming(options, 32).await {
198 Ok(receiver) => receiver,
199 Err(cause) => {
200 co.yield_(AddProgressItem::Error(cause.into())).await;
201 return;
202 }
203 };
204 loop {
205 match receiver.recv().await {
206 Ok(Some(item)) => co.yield_(item).await,
207 Err(cause) => {
208 co.yield_(AddProgressItem::Error(cause.into())).await;
209 break;
210 }
211 Ok(None) => break,
212 }
213 }
214 });
215 AddProgress::new(self, stream)
216 }
217
218 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
219 let options = options.into();
220 self.add_path_with_opts_impl(ImportPathRequest {
221 path: options.path,
222 mode: options.mode,
223 format: options.format,
224 scope: Scope::GLOBAL,
225 })
226 }
227
228 fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
229 trace!("{:?}", options);
230 let client = self.client.clone();
231 let stream = Gen::new(|co| async move {
232 let mut receiver = match client.server_streaming(options, 32).await {
233 Ok(receiver) => receiver,
234 Err(cause) => {
235 co.yield_(AddProgressItem::Error(cause.into())).await;
236 return;
237 }
238 };
239 loop {
240 match receiver.recv().await {
241 Ok(Some(item)) => co.yield_(item).await,
242 Err(cause) => {
243 co.yield_(AddProgressItem::Error(cause.into())).await;
244 break;
245 }
246 Ok(None) => break,
247 }
248 }
249 });
250 AddProgress::new(self, stream)
251 }
252
253 pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
254 self.add_path_with_opts(AddPathOptions {
255 path: path.as_ref().to_owned(),
256 mode: ImportMode::Copy,
257 format: BlobFormat::Raw,
258 })
259 }
260
261 pub async fn add_stream(
262 &self,
263 data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
264 ) -> AddProgress<'_> {
265 let inner = ImportByteStreamRequest {
266 format: crate::BlobFormat::Raw,
267 scope: Scope::default(),
268 };
269 let client = self.client.clone();
270 let stream = Gen::new(|co| async move {
271 let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
272 Ok(x) => x,
273 Err(cause) => {
274 co.yield_(AddProgressItem::Error(cause.into())).await;
275 return;
276 }
277 };
278 let recv = async {
279 loop {
280 match receiver.recv().await {
281 Ok(Some(item)) => co.yield_(item).await,
282 Err(cause) => {
283 co.yield_(AddProgressItem::Error(cause.into())).await;
284 break;
285 }
286 Ok(None) => break,
287 }
288 }
289 };
290 let send = async {
291 tokio::pin!(data);
292 while let Some(item) = data.next().await {
293 sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
294 }
295 sender.send(ImportByteStreamUpdate::Done).await?;
296 anyhow::Ok(())
297 };
298 let _ = tokio::join!(send, recv);
299 });
300 AddProgress::new(self, stream)
301 }
302
303 pub fn export_ranges(
304 &self,
305 hash: impl Into<Hash>,
306 ranges: impl Into<RangeSet2<u64>>,
307 ) -> ExportRangesProgress {
308 self.export_ranges_with_opts(ExportRangesOptions {
309 hash: hash.into(),
310 ranges: ranges.into(),
311 })
312 }
313
314 pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
315 trace!("{options:?}");
316 ExportRangesProgress::new(
317 options.ranges.clone(),
318 self.client.server_streaming(options, 32),
319 )
320 }
321
322 pub fn export_bao_with_opts(
323 &self,
324 options: ExportBaoOptions,
325 local_update_cap: usize,
326 ) -> ExportBaoProgress {
327 trace!("{options:?}");
328 ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
329 }
330
331 pub fn export_bao(
332 &self,
333 hash: impl Into<Hash>,
334 ranges: impl Into<ChunkRanges>,
335 ) -> ExportBaoProgress {
336 self.export_bao_with_opts(
337 ExportBaoRequest {
338 hash: hash.into(),
339 ranges: ranges.into(),
340 },
341 32,
342 )
343 }
344
345 pub async fn export_chunk(
347 &self,
348 hash: impl Into<Hash>,
349 offset: u64,
350 ) -> super::ExportBaoResult<Leaf> {
351 let base = ChunkNum::full_chunks(offset);
352 let ranges = ChunkRanges::from(base..base + 1);
353 let mut stream = self.export_bao(hash, ranges).stream();
354 while let Some(item) = stream.next().await {
355 match item {
356 EncodedItem::Leaf(leaf) => return Ok(leaf),
357 EncodedItem::Parent(_) => {}
358 EncodedItem::Size(_) => {}
359 EncodedItem::Done => break,
360 EncodedItem::Error(cause) => return Err(cause.into()),
361 }
362 }
363 Err(io::Error::other("unexpected end of stream").into())
364 }
365
366 pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
370 self.export_bao(hash.into(), ChunkRanges::all())
371 .data_to_bytes()
372 .await
373 }
374
375 pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
377 self.observe_with_opts(ObserveOptions { hash: hash.into() })
378 }
379
380 pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
381 trace!("{:?}", options);
382 if options.hash == Hash::EMPTY {
383 return ObserveProgress::new(async move {
384 let (tx, rx) = mpsc::channel(1);
385 tx.send(Bitfield::complete(0)).await.ok();
386 Ok(rx)
387 });
388 }
389 ObserveProgress::new(self.client.server_streaming(options, 32))
390 }
391
392 pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
393 trace!("{:?}", options);
394 ExportProgress::new(self.client.server_streaming(options, 32))
395 }
396
397 pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
398 let options = ExportOptions {
399 hash: hash.into(),
400 mode: ExportMode::Copy,
401 target: target.as_ref().to_owned(),
402 };
403 self.export_with_opts(options)
404 }
405
406 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
410 pub async fn import_bao(
411 &self,
412 hash: impl Into<Hash>,
413 size: NonZeroU64,
414 local_update_cap: usize,
415 ) -> irpc::Result<ImportBaoHandle> {
416 let options = ImportBaoRequest {
417 hash: hash.into(),
418 size,
419 };
420 self.import_bao_with_opts(options, local_update_cap).await
421 }
422
423 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
424 pub async fn import_bao_with_opts(
425 &self,
426 options: ImportBaoOptions,
427 local_update_cap: usize,
428 ) -> irpc::Result<ImportBaoHandle> {
429 trace!("{:?}", options);
430 ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
431 }
432
433 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
434 async fn import_bao_reader<R: AsyncStreamReader>(
435 &self,
436 hash: Hash,
437 ranges: ChunkRanges,
438 mut reader: R,
439 ) -> RequestResult<R> {
440 let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
441 let Some(size) = NonZeroU64::new(size) else {
442 return if hash == Hash::EMPTY {
443 Ok(reader)
444 } else {
445 Err(super::Error::other("invalid size for hash").into())
446 };
447 };
448 let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
449 let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
450 let options = ImportBaoOptions { hash, size };
451 let handle = self.import_bao_with_opts(options, 32).await?;
452 let driver = async move {
453 let reader = loop {
454 match decoder.next().await {
455 ResponseDecoderNext::More((rest, item)) => {
456 handle.tx.send(item?).await?;
457 decoder = rest;
458 }
459 ResponseDecoderNext::Done(reader) => break reader,
460 };
461 };
462 drop(handle.tx);
463 io::Result::Ok(reader)
464 };
465 let fut = async move { handle.rx.await.map_err(io::Error::other)? };
466 let (reader, res) = tokio::join!(driver, fut);
467 res?;
468 Ok(reader?)
469 }
470
471 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
472 pub async fn import_bao_quinn(
473 &self,
474 hash: Hash,
475 ranges: ChunkRanges,
476 stream: &mut iroh::endpoint::RecvStream,
477 ) -> RequestResult<()> {
478 let reader = TokioStreamReader::new(stream);
479 self.import_bao_reader(hash, ranges, reader).await?;
480 Ok(())
481 }
482
483 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
484 pub async fn import_bao_bytes(
485 &self,
486 hash: Hash,
487 ranges: ChunkRanges,
488 data: impl Into<Bytes>,
489 ) -> RequestResult<()> {
490 self.import_bao_reader(hash, ranges, data.into()).await?;
491 Ok(())
492 }
493
494 pub fn list(&self) -> BlobsListProgress {
495 let msg = ListRequest;
496 let client = self.client.clone();
497 BlobsListProgress::new(client.server_streaming(msg, 32))
498 }
499
500 pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
501 let hash = hash.into();
502 let msg = BlobStatusRequest { hash };
503 self.client.rpc(msg).await
504 }
505
506 pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
507 match self.status(hash).await? {
508 BlobStatus::Complete { .. } => Ok(true),
509 _ => Ok(false),
510 }
511 }
512
513 pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
514 let msg = ClearProtectedRequest;
515 self.client.rpc(msg).await??;
516 Ok(())
517 }
518}
519
520pub struct BatchAddProgress<'a>(AddProgress<'a>);
522
523impl<'a> IntoFuture for BatchAddProgress<'a> {
524 type Output = RequestResult<TempTag>;
525
526 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
527
528 fn into_future(self) -> Self::IntoFuture {
529 Box::pin(self.temp_tag())
530 }
531}
532
533impl<'a> BatchAddProgress<'a> {
534 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
535 self.0.with_named_tag(name).await
536 }
537
538 pub async fn with_tag(self) -> RequestResult<TagInfo> {
539 self.0.with_tag().await
540 }
541
542 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
543 self.0.stream().await
544 }
545
546 pub async fn temp_tag(self) -> RequestResult<TempTag> {
547 self.0.temp_tag().await
548 }
549}
550
551pub struct Batch<'a> {
553 scope: Scope,
554 blobs: &'a Blobs,
555 _tx: mpsc::Sender<BatchResponse>,
556}
557
558impl<'a> Batch<'a> {
559 pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
560 let options = ImportBytesRequest {
561 data: data.into(),
562 format: crate::BlobFormat::Raw,
563 scope: self.scope,
564 };
565 BatchAddProgress(self.blobs.add_bytes_impl(options))
566 }
567
568 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
569 let options = options.into();
570 BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
571 data: options.data,
572 format: options.format,
573 scope: self.scope,
574 }))
575 }
576
577 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
578 let options = ImportBytesRequest {
579 data: Bytes::copy_from_slice(data.as_ref()),
580 format: crate::BlobFormat::Raw,
581 scope: self.scope,
582 };
583 BatchAddProgress(self.blobs.add_bytes_impl(options))
584 }
585
586 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
587 let options = options.into();
588 BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
589 path: options.path,
590 mode: options.mode,
591 format: options.format,
592 scope: self.scope,
593 }))
594 }
595
596 pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
597 let value = value.into();
598 let msg = CreateTempTagRequest {
599 scope: self.scope,
600 value,
601 };
602 self.blobs.client.rpc(msg).await
603 }
604}
605
606#[derive(Debug)]
608pub struct AddPathOptions {
609 pub path: PathBuf,
610 pub format: BlobFormat,
611 pub mode: ImportMode,
612}
613
614pub struct AddProgress<'a> {
625 blobs: &'a Blobs,
626 inner: stream::Boxed<AddProgressItem>,
627}
628
629impl<'a> IntoFuture for AddProgress<'a> {
630 type Output = RequestResult<TagInfo>;
631
632 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
633
634 fn into_future(self) -> Self::IntoFuture {
635 Box::pin(self.with_tag())
636 }
637}
638
639impl<'a> AddProgress<'a> {
640 fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
641 Self {
642 blobs,
643 inner: Box::pin(stream),
644 }
645 }
646
647 pub async fn temp_tag(self) -> RequestResult<TempTag> {
648 let mut stream = self.inner;
649 while let Some(item) = stream.next().await {
650 match item {
651 AddProgressItem::Done(tt) => return Ok(tt),
652 AddProgressItem::Error(e) => return Err(e.into()),
653 _ => {}
654 }
655 }
656 Err(super::Error::other("unexpected end of stream").into())
657 }
658
659 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
660 let blobs = self.blobs.clone();
661 let tt = self.temp_tag().await?;
662 let haf = *tt.hash_and_format();
663 let tags = Tags::ref_from_sender(&blobs.client);
664 tags.set(name, *tt.hash_and_format()).await?;
665 drop(tt);
666 Ok(haf)
667 }
668
669 pub async fn with_tag(self) -> RequestResult<TagInfo> {
670 let blobs = self.blobs.clone();
671 let tt = self.temp_tag().await?;
672 let hash = *tt.hash();
673 let format = tt.format();
674 let tags = Tags::ref_from_sender(&blobs.client);
675 let name = tags.create(*tt.hash_and_format()).await?;
676 drop(tt);
677 Ok(TagInfo { name, hash, format })
678 }
679
680 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
681 self.inner
682 }
683}
684
685#[derive(Debug, Clone, Serialize, Deserialize)]
687pub struct ReaderOptions {
688 pub hash: Hash,
689}
690
691pub struct ObserveProgress {
696 inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
697}
698
699impl IntoFuture for ObserveProgress {
700 type Output = RequestResult<Bitfield>;
701
702 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
703
704 fn into_future(self) -> Self::IntoFuture {
705 Box::pin(async move {
706 let mut rx = self.inner.await?;
707 match rx.recv().await? {
708 Some(bitfield) => Ok(bitfield),
709 None => Err(super::Error::other("unexpected end of stream").into()),
710 }
711 })
712 }
713}
714
715impl ObserveProgress {
716 fn new(
717 fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
718 ) -> Self {
719 Self {
720 inner: Box::pin(fut),
721 }
722 }
723
724 pub async fn await_completion(self) -> RequestResult<Bitfield> {
725 let mut stream = self.stream().await?;
726 while let Some(item) = stream.next().await {
727 if item.is_complete() {
728 return Ok(item);
729 }
730 }
731 Err(super::Error::other("unexpected end of stream").into())
732 }
733
734 pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
739 let mut rx = self.inner.await?;
740 Ok(Gen::new(|co| async move {
741 while let Ok(Some(item)) = rx.recv().await {
742 co.yield_(item).await;
743 }
744 }))
745 }
746}
747
748pub struct ExportProgress {
759 inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
760}
761
762impl IntoFuture for ExportProgress {
763 type Output = RequestResult<u64>;
764
765 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
766
767 fn into_future(self) -> Self::IntoFuture {
768 Box::pin(self.finish())
769 }
770}
771
772impl ExportProgress {
773 fn new(
774 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
775 ) -> Self {
776 Self {
777 inner: Box::pin(fut),
778 }
779 }
780
781 pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
782 Gen::new(|co| async move {
783 let mut rx = match self.inner.await {
784 Ok(rx) => rx,
785 Err(e) => {
786 co.yield_(ExportProgressItem::Error(e.into())).await;
787 return;
788 }
789 };
790 while let Ok(Some(item)) = rx.recv().await {
791 co.yield_(item).await;
792 }
793 })
794 }
795
796 pub async fn finish(self) -> RequestResult<u64> {
797 let mut rx = self.inner.await?;
798 let mut size = None;
799 loop {
800 match rx.recv().await? {
801 Some(ExportProgressItem::Done) => break,
802 Some(ExportProgressItem::Size(s)) => size = Some(s),
803 Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
804 _ => {}
805 }
806 }
807 if let Some(size) = size {
808 Ok(size)
809 } else {
810 Err(super::Error::other("unexpected end of stream").into())
811 }
812 }
813}
814
815pub struct ImportBaoHandle {
817 pub tx: mpsc::Sender<BaoContentItem>,
818 pub rx: oneshot::Receiver<super::Result<()>>,
819}
820
821impl ImportBaoHandle {
822 pub(crate) async fn new(
823 fut: impl Future<
824 Output = irpc::Result<(
825 mpsc::Sender<BaoContentItem>,
826 oneshot::Receiver<super::Result<()>>,
827 )>,
828 > + Send
829 + 'static,
830 ) -> irpc::Result<Self> {
831 let (tx, rx) = fut.await?;
832 Ok(Self { tx, rx })
833 }
834}
835
836pub struct BlobsListProgress {
838 inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
839}
840
841impl BlobsListProgress {
842 fn new(
843 fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
844 ) -> Self {
845 Self {
846 inner: Box::pin(fut),
847 }
848 }
849
850 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
851 let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
852 let mut hashes = Vec::new();
853 while let Some(item) = rx.recv().await? {
854 hashes.push(item?);
855 }
856 Ok(hashes)
857 }
858
859 pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
860 let mut rx = self.inner.await?;
861 Ok(Gen::new(|co| async move {
862 while let Ok(Some(item)) = rx.recv().await {
863 co.yield_(item).await;
864 }
865 }))
866 }
867}
868
869pub struct ExportRangesProgress {
877 ranges: RangeSet2<u64>,
878 inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
879}
880
881impl ExportRangesProgress {
882 fn new(
883 ranges: RangeSet2<u64>,
884 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
885 ) -> Self {
886 Self {
887 ranges,
888 inner: Box::pin(fut),
889 }
890 }
891}
892
893impl ExportRangesProgress {
894 pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
901 Gen::new(|co| async move {
902 let mut rx = match self.inner.await {
903 Ok(rx) => rx,
904 Err(e) => {
905 co.yield_(ExportRangesItem::Error(e.into())).await;
906 return;
907 }
908 };
909 while let Ok(Some(item)) = rx.recv().await {
910 co.yield_(item).await;
911 }
912 })
913 }
914
915 pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
917 let mut rx = self.inner.await?;
918 let mut data = BTreeMap::new();
919 while let Some(item) = rx.recv().await? {
920 match item {
921 ExportRangesItem::Size(_) => {}
922 ExportRangesItem::Data(leaf) => {
923 data.insert(leaf.offset, leaf.data);
924 }
925 ExportRangesItem::Error(cause) => return Err(cause.into()),
926 }
927 }
928 let mut res = Vec::new();
929 for range in self.ranges.iter() {
930 let (start, end) = match range {
931 RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
932 RangeSetRange::Range(range) => (*range.start, *range.end),
933 };
934 for (offset, data) in data.iter() {
935 let cstart = *offset;
936 let cend = *offset + (data.len() as u64);
937 if cstart >= end || cend <= start {
938 continue;
939 }
940 let start = start.max(cstart);
941 let end = end.min(cend);
942 let data = &data[(start - cstart) as usize..(end - cstart) as usize];
943 res.extend_from_slice(data);
944 }
945 }
946 Ok(res)
947 }
948}
949
950pub struct ExportBaoProgress {
958 inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
959}
960
961impl ExportBaoProgress {
962 fn new(
963 fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
964 ) -> Self {
965 Self {
966 inner: Box::pin(fut),
967 }
968 }
969
970 pub fn hashes_with_index(
977 self,
978 ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
979 let mut stream = self.stream();
980 Gen::new(|co| async move {
981 while let Some(item) = stream.next().await {
982 let leaf = match item {
983 EncodedItem::Leaf(leaf) => leaf,
984 EncodedItem::Error(e) => {
985 co.yield_(Err(e.into())).await;
986 continue;
987 }
988 _ => continue,
989 };
990 let slice = match HashSeqChunk::try_from(leaf) {
991 Ok(slice) => slice,
992 Err(e) => {
993 co.yield_(Err(e)).await;
994 continue;
995 }
996 };
997 let offset = slice.base();
998 for (o, hash) in slice.into_iter().enumerate() {
999 co.yield_(Ok((offset + o as u64, hash))).await;
1000 }
1001 }
1002 })
1003 }
1004
1005 pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
1007 self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1008 }
1009
1010 pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1011 let mut data = Vec::new();
1012 let mut stream = self.into_byte_stream();
1013 while let Some(item) = stream.next().await {
1014 data.extend_from_slice(&item?);
1015 }
1016 Ok(data)
1017 }
1018
1019 pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1020 let mut rx = self.inner.await?;
1021 let mut data = Vec::new();
1022 while let Some(item) = rx.recv().await? {
1023 match item {
1024 EncodedItem::Leaf(leaf) => {
1025 data.push(leaf.data);
1026 }
1027 EncodedItem::Parent(_) => {}
1028 EncodedItem::Size(_) => {}
1029 EncodedItem::Done => break,
1030 EncodedItem::Error(cause) => return Err(cause.into()),
1031 }
1032 }
1033 if data.len() == 1 {
1034 Ok(data.pop().unwrap())
1035 } else {
1036 let mut out = Vec::new();
1037 for item in data {
1038 out.extend_from_slice(&item);
1039 }
1040 Ok(out.into())
1041 }
1042 }
1043
1044 pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1045 let mut rx = self.inner.await?;
1046 let mut data = Vec::new();
1047 while let Some(item) = rx.recv().await? {
1048 match item {
1049 EncodedItem::Leaf(leaf) => {
1050 data.extend_from_slice(&leaf.data);
1051 }
1052 EncodedItem::Parent(_) => {}
1053 EncodedItem::Size(_) => {}
1054 EncodedItem::Done => break,
1055 EncodedItem::Error(cause) => return Err(cause.into()),
1056 }
1057 }
1058 Ok(data)
1059 }
1060
1061 pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
1062 let mut rx = self.inner.await?;
1063 while let Some(item) = rx.recv().await? {
1064 match item {
1065 EncodedItem::Size(size) => {
1066 target.write_u64_le(size).await?;
1067 }
1068 EncodedItem::Parent(parent) => {
1069 let mut data = vec![0u8; 64];
1070 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1071 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1072 target.write_all(&data).await.map_err(io::Error::from)?;
1073 }
1074 EncodedItem::Leaf(leaf) => {
1075 target
1076 .write_chunk(leaf.data)
1077 .await
1078 .map_err(io::Error::from)?;
1079 }
1080 EncodedItem::Done => break,
1081 EncodedItem::Error(cause) => return Err(cause.into()),
1082 }
1083 }
1084 Ok(())
1085 }
1086
1087 pub(crate) async fn write_quinn_with_progress(
1089 self,
1090 writer: &mut SendStream,
1091 progress: &mut impl WriteProgress,
1092 hash: &Hash,
1093 index: u64,
1094 ) -> super::ExportBaoResult<()> {
1095 let mut rx = self.inner.await?;
1096 while let Some(item) = rx.recv().await? {
1097 match item {
1098 EncodedItem::Size(size) => {
1099 progress.send_transfer_started(index, hash, size).await;
1100 writer.write_u64_le(size).await?;
1101 progress.log_other_write(8);
1102 }
1103 EncodedItem::Parent(parent) => {
1104 let mut data = vec![0u8; 64];
1105 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1106 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1107 writer.write_all(&data).await.map_err(io::Error::from)?;
1108 progress.log_other_write(64);
1109 }
1110 EncodedItem::Leaf(leaf) => {
1111 let len = leaf.data.len();
1112 writer
1113 .write_chunk(leaf.data)
1114 .await
1115 .map_err(io::Error::from)?;
1116 progress.notify_payload_write(index, leaf.offset, len).await;
1117 }
1118 EncodedItem::Done => break,
1119 EncodedItem::Error(cause) => return Err(cause.into()),
1120 }
1121 }
1122 Ok(())
1123 }
1124
1125 pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1126 self.stream().filter_map(|item| match item {
1127 EncodedItem::Size(size) => {
1128 let size = size.to_le_bytes().to_vec().into();
1129 Some(Ok(size))
1130 }
1131 EncodedItem::Parent(parent) => {
1132 let mut data = vec![0u8; 64];
1133 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1134 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1135 Some(Ok(data.into()))
1136 }
1137 EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1138 EncodedItem::Done => None,
1139 EncodedItem::Error(cause) => Some(Err(cause.into())),
1140 })
1141 }
1142
1143 pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1144 Gen::new(|co| async move {
1145 let mut rx = match self.inner.await {
1146 Ok(rx) => rx,
1147 Err(cause) => {
1148 co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1149 .await;
1150 return;
1151 }
1152 };
1153 while let Ok(Some(item)) = rx.recv().await {
1154 co.yield_(item).await;
1155 }
1156 })
1157 }
1158}
1159
1160pub(crate) trait WriteProgress {
1161 async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize);
1163
1164 fn log_other_write(&mut self, len: usize);
1166
1167 async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
1169}
1170
1171impl WriteProgress for StreamContext {
1172 async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) {
1173 StreamContext::notify_payload_write(self, index, offset, len);
1174 }
1175
1176 fn log_other_write(&mut self, len: usize) {
1177 StreamContext::log_other_write(self, len);
1178 }
1179
1180 async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64) {
1181 StreamContext::send_transfer_started(self, index, hash, size).await
1182 }
1183}