1use std::{
8 collections::BTreeMap,
9 future::{Future, IntoFuture},
10 io,
11 num::NonZeroU64,
12 path::{Path, PathBuf},
13 pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18 io::{
19 fsm::{ResponseDecoder, ResponseDecoderNext},
20 BaoContentItem, Leaf,
21 },
22 BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::{AsyncStreamReader, TokioStreamReader};
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use quinn::SendStream;
30use range_collections::{range_set::RangeSetRange, RangeSet2};
31use ref_cast::RefCast;
32use tokio::io::AsyncWriteExt;
33use tracing::trace;
34
35pub use super::proto::{
40 AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
41 ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
42 ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
43 ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
44};
45use super::{
46 proto::{
47 BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
48 ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
49 ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
50 },
51 remote::HashSeqChunk,
52 tags::TagInfo,
53 ApiClient, RequestResult, Tags,
54};
55use crate::{
56 api::proto::{BatchRequest, ImportByteStreamUpdate},
57 provider::StreamContext,
58 store::IROH_BLOCK_SIZE,
59 util::temp_tag::TempTag,
60 BlobFormat, Hash, HashAndFormat,
61};
62
/// Options for adding in-memory bytes to the store.
#[derive(Debug)]
pub struct AddBytesOptions {
    /// The bytes to import.
    pub data: Bytes,
    /// The blob format the imported data is tagged with.
    pub format: BlobFormat,
}
69
70impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
71 fn from(item: (T, BlobFormat)) -> Self {
72 let (data, format) = item;
73 Self {
74 data: data.into(),
75 format,
76 }
77 }
78}
79
/// Client-side handle exposing the blob operations defined in this module.
///
/// This is a thin wrapper around an [`ApiClient`]. The `#[repr(transparent)]` layout together
/// with the `RefCast` derive is what allows `ref_from_sender` to view a `&ApiClient` as a
/// `&Blobs` without copying.
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Blobs {
    // The underlying client all requests are sent through.
    client: ApiClient,
}
86
87impl Blobs {
88 pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
89 Self::ref_cast(sender)
90 }
91
92 pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
93 let msg = BatchRequest;
94 trace!("{msg:?}");
95 let (tx, rx) = self.client.client_streaming(msg, 32).await?;
96 let scope = rx.await?;
97
98 Ok(Batch {
99 scope,
100 blobs: self,
101 _tx: tx,
102 })
103 }
104
105 pub async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
106 trace!("{options:?}");
107 self.client.rpc(options).await??;
108 Ok(())
109 }
110
111 pub async fn delete(
112 &self,
113 hashes: impl IntoIterator<Item = impl Into<Hash>>,
114 ) -> RequestResult<()> {
115 self.delete_with_opts(DeleteOptions {
116 hashes: hashes.into_iter().map(Into::into).collect(),
117 force: false,
118 })
119 .await
120 }
121
122 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress {
123 let options = ImportBytesRequest {
124 data: Bytes::copy_from_slice(data.as_ref()),
125 format: crate::BlobFormat::Raw,
126 scope: Scope::GLOBAL,
127 };
128 self.add_bytes_impl(options)
129 }
130
131 pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress {
132 let options = ImportBytesRequest {
133 data: data.into(),
134 format: crate::BlobFormat::Raw,
135 scope: Scope::GLOBAL,
136 };
137 self.add_bytes_impl(options)
138 }
139
140 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress {
141 let options = options.into();
142 let request = ImportBytesRequest {
143 data: options.data,
144 format: options.format,
145 scope: Scope::GLOBAL,
146 };
147 self.add_bytes_impl(request)
148 }
149
150 fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress {
151 trace!("{options:?}");
152 let this = self.clone();
153 let stream = Gen::new(|co| async move {
154 let mut receiver = match this.client.server_streaming(options, 32).await {
155 Ok(receiver) => receiver,
156 Err(cause) => {
157 co.yield_(AddProgressItem::Error(cause.into())).await;
158 return;
159 }
160 };
161 loop {
162 match receiver.recv().await {
163 Ok(Some(item)) => co.yield_(item).await,
164 Err(cause) => {
165 co.yield_(AddProgressItem::Error(cause.into())).await;
166 break;
167 }
168 Ok(None) => break,
169 }
170 }
171 });
172 AddProgress::new(self, stream)
173 }
174
175 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress {
176 let options = options.into();
177 self.add_path_with_opts_impl(ImportPathRequest {
178 path: options.path,
179 mode: options.mode,
180 format: options.format,
181 scope: Scope::GLOBAL,
182 })
183 }
184
185 fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress {
186 trace!("{:?}", options);
187 let client = self.client.clone();
188 let stream = Gen::new(|co| async move {
189 let mut receiver = match client.server_streaming(options, 32).await {
190 Ok(receiver) => receiver,
191 Err(cause) => {
192 co.yield_(AddProgressItem::Error(cause.into())).await;
193 return;
194 }
195 };
196 loop {
197 match receiver.recv().await {
198 Ok(Some(item)) => co.yield_(item).await,
199 Err(cause) => {
200 co.yield_(AddProgressItem::Error(cause.into())).await;
201 break;
202 }
203 Ok(None) => break,
204 }
205 }
206 });
207 AddProgress::new(self, stream)
208 }
209
210 pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress {
211 self.add_path_with_opts(AddPathOptions {
212 path: path.as_ref().to_owned(),
213 mode: ImportMode::Copy,
214 format: BlobFormat::Raw,
215 })
216 }
217
    /// Adds the contents of a stream of byte chunks as a raw blob.
    ///
    /// Opens a bidirectional streaming call (both capacities 32). Chunks from `data` are sent
    /// as `ImportByteStreamUpdate::Bytes`, followed by `ImportByteStreamUpdate::Done`, while
    /// progress items received from the other side are forwarded into the returned
    /// [`AddProgress`].
    ///
    /// The function is `async`, but its body does not await anything before returning; all
    /// work happens when the returned progress is driven.
    pub async fn add_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    ) -> AddProgress {
        // Note: this request uses `Scope::default()` where the other imports in this impl use
        // `Scope::GLOBAL`.
        let inner = ImportByteStreamRequest {
            format: crate::BlobFormat::Raw,
            scope: Scope::default(),
        };
        let client = self.client.clone();
        let stream = Gen::new(|co| async move {
            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
                Ok(x) => x,
                Err(cause) => {
                    co.yield_(AddProgressItem::Error(cause.into())).await;
                    return;
                }
            };
            // Receiving half: forward progress items until the channel ends or fails.
            let recv = async {
                loop {
                    match receiver.recv().await {
                        Ok(Some(item)) => co.yield_(item).await,
                        Err(cause) => {
                            co.yield_(AddProgressItem::Error(cause.into())).await;
                            break;
                        }
                        Ok(None) => break,
                    }
                }
            };
            // Sending half: push every chunk, then signal completion. Stops at the first
            // error from either the input stream or the channel.
            let send = async {
                tokio::pin!(data);
                while let Some(item) = data.next().await {
                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
                }
                sender.send(ImportByteStreamUpdate::Done).await?;
                anyhow::Ok(())
            };
            // NOTE(review): the result of `send` is discarded here, so an error from the
            // input stream or from sending is not reported through the progress stream.
            // Confirm that this best-effort behavior is intended.
            let _ = tokio::join!(send, recv);
        });
        AddProgress::new(self, stream)
    }
259
260 pub fn export_ranges(
261 &self,
262 hash: impl Into<Hash>,
263 ranges: impl Into<RangeSet2<u64>>,
264 ) -> ExportRangesProgress {
265 self.export_ranges_with_opts(ExportRangesOptions {
266 hash: hash.into(),
267 ranges: ranges.into(),
268 })
269 }
270
271 pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
272 trace!("{options:?}");
273 ExportRangesProgress::new(
274 options.ranges.clone(),
275 self.client.server_streaming(options, 32),
276 )
277 }
278
279 pub fn export_bao_with_opts(
280 &self,
281 options: ExportBaoOptions,
282 local_update_cap: usize,
283 ) -> ExportBaoProgress {
284 trace!("{options:?}");
285 ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
286 }
287
288 pub fn export_bao(
289 &self,
290 hash: impl Into<Hash>,
291 ranges: impl Into<ChunkRanges>,
292 ) -> ExportBaoProgress {
293 self.export_bao_with_opts(
294 ExportBaoRequest {
295 hash: hash.into(),
296 ranges: ranges.into(),
297 },
298 32,
299 )
300 }
301
302 pub async fn export_chunk(
304 &self,
305 hash: impl Into<Hash>,
306 offset: u64,
307 ) -> super::ExportBaoResult<Leaf> {
308 let base = ChunkNum::full_chunks(offset);
309 let ranges = ChunkRanges::from(base..base + 1);
310 let mut stream = self.export_bao(hash, ranges).stream();
311 while let Some(item) = stream.next().await {
312 match item {
313 EncodedItem::Leaf(leaf) => return Ok(leaf),
314 EncodedItem::Parent(_) => {}
315 EncodedItem::Size(_) => {}
316 EncodedItem::Done => break,
317 EncodedItem::Error(cause) => return Err(cause.into()),
318 }
319 }
320 Err(io::Error::other("unexpected end of stream").into())
321 }
322
323 pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
327 self.export_bao(hash.into(), ChunkRanges::all())
328 .data_to_bytes()
329 .await
330 }
331
332 pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
334 self.observe_with_opts(ObserveOptions { hash: hash.into() })
335 }
336
337 pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
338 trace!("{:?}", options);
339 if options.hash == Hash::EMPTY {
340 return ObserveProgress::new(async move {
341 let (tx, rx) = mpsc::channel(1);
342 tx.send(Bitfield::complete(0)).await.ok();
343 Ok(rx)
344 });
345 }
346 ObserveProgress::new(self.client.server_streaming(options, 32))
347 }
348
349 pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
350 trace!("{:?}", options);
351 ExportProgress::new(self.client.server_streaming(options, 32))
352 }
353
354 pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
355 let options = ExportOptions {
356 hash: hash.into(),
357 mode: ExportMode::Copy,
358 target: target.as_ref().to_owned(),
359 };
360 self.export_with_opts(options)
361 }
362
363 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
367 pub async fn import_bao(
368 &self,
369 hash: impl Into<Hash>,
370 size: NonZeroU64,
371 local_update_cap: usize,
372 ) -> irpc::Result<ImportBaoHandle> {
373 let options = ImportBaoRequest {
374 hash: hash.into(),
375 size,
376 };
377 self.import_bao_with_opts(options, local_update_cap).await
378 }
379
380 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
381 pub async fn import_bao_with_opts(
382 &self,
383 options: ImportBaoOptions,
384 local_update_cap: usize,
385 ) -> irpc::Result<ImportBaoHandle> {
386 trace!("{:?}", options);
387 ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
388 }
389
    /// Imports bao-encoded data for `hash` from `reader` and returns the reader afterwards.
    ///
    /// The input is expected to start with the blob size as 8 little-endian bytes, followed
    /// by the encoded content for `ranges`.
    ///
    /// # Errors
    ///
    /// Fails if the size prefix cannot be read, if the size is zero for a hash other than
    /// `Hash::EMPTY`, if decoding or forwarding an item fails, or if the import itself
    /// reports an error.
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    async fn import_bao_reader<R: AsyncStreamReader>(
        &self,
        hash: Hash,
        ranges: ChunkRanges,
        mut reader: R,
    ) -> RequestResult<R> {
        // Size prefix: 8 bytes, little endian.
        let size = u64::from_le_bytes(reader.read::<8>().await.map_err(super::Error::other)?);
        // A zero size is only accepted for the empty hash; in that case nothing is imported.
        let Some(size) = NonZeroU64::new(size) else {
            return if hash == Hash::EMPTY {
                Ok(reader)
            } else {
                Err(super::Error::other("invalid size for hash").into())
            };
        };
        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
        let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
        let options = ImportBaoOptions { hash, size };
        let handle = self.import_bao_with_opts(options, 32).await?;
        // Driver: feed every decoded item into the import until the decoder is done, then
        // hand the reader back.
        let driver = async move {
            let reader = loop {
                match decoder.next().await {
                    ResponseDecoderNext::More((rest, item)) => {
                        handle.tx.send(item?).await?;
                        decoder = rest;
                    }
                    ResponseDecoderNext::Done(reader) => break reader,
                };
            };
            // Drop the sender explicitly once all items have been sent.
            drop(handle.tx);
            io::Result::Ok(reader)
        };
        // Completion: the result reported for the import.
        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
        // Both futures are driven concurrently; the import result is checked before the
        // driver result.
        let (reader, res) = tokio::join!(driver, fut);
        res?;
        Ok(reader?)
    }
427
428 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
429 pub async fn import_bao_quinn(
430 &self,
431 hash: Hash,
432 ranges: ChunkRanges,
433 stream: &mut iroh::endpoint::RecvStream,
434 ) -> RequestResult<()> {
435 let reader = TokioStreamReader::new(stream);
436 self.import_bao_reader(hash, ranges, reader).await?;
437 Ok(())
438 }
439
440 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
441 pub async fn import_bao_bytes(
442 &self,
443 hash: Hash,
444 ranges: ChunkRanges,
445 data: impl Into<Bytes>,
446 ) -> RequestResult<()> {
447 self.import_bao_reader(hash, ranges, data.into()).await?;
448 Ok(())
449 }
450
451 pub fn list(&self) -> BlobsListProgress {
452 let msg = ListRequest;
453 let client = self.client.clone();
454 BlobsListProgress::new(client.server_streaming(msg, 32))
455 }
456
457 pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
458 let hash = hash.into();
459 let msg = BlobStatusRequest { hash };
460 self.client.rpc(msg).await
461 }
462
463 pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
464 match self.status(hash).await? {
465 BlobStatus::Complete { .. } => Ok(true),
466 _ => Ok(false),
467 }
468 }
469
470 pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
471 let msg = ClearProtectedRequest;
472 self.client.rpc(msg).await??;
473 Ok(())
474 }
475}
476
/// Progress handle for an add operation started from a [`Batch`].
///
/// Wraps an [`AddProgress`]. Awaiting it through `IntoFuture` resolves to a [`TempTag`],
/// whereas awaiting a plain [`AddProgress`] resolves to a [`TagInfo`].
pub struct BatchAddProgress<'a>(AddProgress<'a>);
479
480impl<'a> IntoFuture for BatchAddProgress<'a> {
481 type Output = RequestResult<TempTag>;
482
483 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
484
485 fn into_future(self) -> Self::IntoFuture {
486 Box::pin(self.temp_tag())
487 }
488}
489
490impl<'a> BatchAddProgress<'a> {
491 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
492 self.0.with_named_tag(name).await
493 }
494
495 pub async fn with_tag(self) -> RequestResult<TagInfo> {
496 self.0.with_tag().await
497 }
498
499 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
500 self.0.stream().await
501 }
502
503 pub async fn temp_tag(self) -> RequestResult<TempTag> {
504 self.0.temp_tag().await
505 }
506}
507
/// A batch of add operations that share one scope.
///
/// Created by [`Blobs::batch`]. All imports started through a batch use the batch scope
/// instead of the global scope.
pub struct Batch<'a> {
    // Scope received in response to the batch request.
    scope: Scope,
    // The client the batch operations are sent through.
    blobs: &'a Blobs,
    // Sending half of the batch request; it is never used to send, only held for as long as
    // the batch exists.
    _tx: mpsc::Sender<BatchResponse>,
}
514
515impl<'a> Batch<'a> {
516 pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress {
517 let options = ImportBytesRequest {
518 data: data.into(),
519 format: crate::BlobFormat::Raw,
520 scope: self.scope,
521 };
522 BatchAddProgress(self.blobs.add_bytes_impl(options))
523 }
524
525 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress {
526 let options = options.into();
527 BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
528 data: options.data,
529 format: options.format,
530 scope: self.scope,
531 }))
532 }
533
534 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress {
535 let options = ImportBytesRequest {
536 data: Bytes::copy_from_slice(data.as_ref()),
537 format: crate::BlobFormat::Raw,
538 scope: self.scope,
539 };
540 BatchAddProgress(self.blobs.add_bytes_impl(options))
541 }
542
543 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress {
544 let options = options.into();
545 BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
546 path: options.path,
547 mode: options.mode,
548 format: options.format,
549 scope: self.scope,
550 }))
551 }
552
553 pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
554 let value = value.into();
555 let msg = CreateTempTagRequest {
556 scope: self.scope,
557 value,
558 };
559 self.blobs.client.rpc(msg).await
560 }
561}
562
/// Options for adding a file from the file system.
#[derive(Debug)]
pub struct AddPathOptions {
    /// Path of the file to import.
    pub path: PathBuf,
    /// The blob format the imported data is tagged with.
    pub format: BlobFormat,
    /// The import mode passed on in the import request.
    pub mode: ImportMode,
}
570
/// Progress handle for an add operation.
///
/// Holds the boxed stream of [`AddProgressItem`]s produced by the operation. It can be
/// awaited through `IntoFuture` (equivalent to `with_tag`), or consumed through `temp_tag`,
/// `with_named_tag`, `with_tag` or `stream`.
pub struct AddProgress<'a> {
    // Client used to create tags once the import is done.
    blobs: &'a Blobs,
    // The progress items of the running import.
    inner: stream::Boxed<AddProgressItem>,
}
585
586impl<'a> IntoFuture for AddProgress<'a> {
587 type Output = RequestResult<TagInfo>;
588
589 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
590
591 fn into_future(self) -> Self::IntoFuture {
592 Box::pin(self.with_tag())
593 }
594}
595
596impl<'a> AddProgress<'a> {
597 fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
598 Self {
599 blobs,
600 inner: Box::pin(stream),
601 }
602 }
603
604 pub async fn temp_tag(self) -> RequestResult<TempTag> {
605 let mut stream = self.inner;
606 while let Some(item) = stream.next().await {
607 match item {
608 AddProgressItem::Done(tt) => return Ok(tt),
609 AddProgressItem::Error(e) => return Err(e.into()),
610 _ => {}
611 }
612 }
613 Err(super::Error::other("unexpected end of stream").into())
614 }
615
616 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
617 let blobs = self.blobs.clone();
618 let tt = self.temp_tag().await?;
619 let haf = *tt.hash_and_format();
620 let tags = Tags::ref_from_sender(&blobs.client);
621 tags.set(name, *tt.hash_and_format()).await?;
622 drop(tt);
623 Ok(haf)
624 }
625
626 pub async fn with_tag(self) -> RequestResult<TagInfo> {
627 let blobs = self.blobs.clone();
628 let tt = self.temp_tag().await?;
629 let hash = *tt.hash();
630 let format = tt.format();
631 let tags = Tags::ref_from_sender(&blobs.client);
632 let name = tags.create(*tt.hash_and_format()).await?;
633 drop(tt);
634 Ok(TagInfo { name, hash, format })
635 }
636
637 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
638 self.inner
639 }
640}
641
/// Progress handle for an observe operation.
///
/// Awaiting it through `IntoFuture` yields the first [`Bitfield`] received; `stream` yields
/// all of them and `await_completion` waits for the first complete one.
pub struct ObserveProgress {
    // Future resolving to the receiver of bitfield updates.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
}
649
650impl IntoFuture for ObserveProgress {
651 type Output = RequestResult<Bitfield>;
652
653 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
654
655 fn into_future(self) -> Self::IntoFuture {
656 Box::pin(async move {
657 let mut rx = self.inner.await?;
658 match rx.recv().await? {
659 Some(bitfield) => Ok(bitfield),
660 None => Err(super::Error::other("unexpected end of stream").into()),
661 }
662 })
663 }
664}
665
666impl ObserveProgress {
667 fn new(
668 fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
669 ) -> Self {
670 Self {
671 inner: Box::pin(fut),
672 }
673 }
674
675 pub async fn await_completion(self) -> RequestResult<Bitfield> {
676 let mut stream = self.stream().await?;
677 while let Some(item) = stream.next().await {
678 if item.is_complete() {
679 return Ok(item);
680 }
681 }
682 Err(super::Error::other("unexpected end of stream").into())
683 }
684
685 pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
690 let mut rx = self.inner.await?;
691 Ok(Gen::new(|co| async move {
692 while let Ok(Some(item)) = rx.recv().await {
693 co.yield_(item).await;
694 }
695 }))
696 }
697}
698
/// Progress handle for a path export.
///
/// Awaiting it through `IntoFuture` is equivalent to calling `finish`; `stream` gives access
/// to the individual [`ExportProgressItem`]s.
pub struct ExportProgress {
    // Future resolving to the receiver of export progress items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
}
712
713impl IntoFuture for ExportProgress {
714 type Output = RequestResult<u64>;
715
716 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
717
718 fn into_future(self) -> Self::IntoFuture {
719 Box::pin(self.finish())
720 }
721}
722
723impl ExportProgress {
724 fn new(
725 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
726 ) -> Self {
727 Self {
728 inner: Box::pin(fut),
729 }
730 }
731
732 pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
733 Gen::new(|co| async move {
734 let mut rx = match self.inner.await {
735 Ok(rx) => rx,
736 Err(e) => {
737 co.yield_(ExportProgressItem::Error(e.into())).await;
738 return;
739 }
740 };
741 while let Ok(Some(item)) = rx.recv().await {
742 co.yield_(item).await;
743 }
744 })
745 }
746
747 pub async fn finish(self) -> RequestResult<u64> {
748 let mut rx = self.inner.await?;
749 let mut size = None;
750 loop {
751 match rx.recv().await? {
752 Some(ExportProgressItem::Done) => break,
753 Some(ExportProgressItem::Size(s)) => size = Some(s),
754 Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
755 _ => {}
756 }
757 }
758 if let Some(size) = size {
759 Ok(size)
760 } else {
761 Err(super::Error::other("unexpected end of stream").into())
762 }
763 }
764}
765
/// The two halves of a running bao import.
pub struct ImportBaoHandle {
    /// Sender for the decoded bao content items that make up the import.
    pub tx: mpsc::Sender<BaoContentItem>,
    /// Receiver for the final result of the import.
    pub rx: oneshot::Receiver<super::Result<()>>,
}
771
772impl ImportBaoHandle {
773 pub(crate) async fn new(
774 fut: impl Future<
775 Output = irpc::Result<(
776 mpsc::Sender<BaoContentItem>,
777 oneshot::Receiver<super::Result<()>>,
778 )>,
779 > + Send
780 + 'static,
781 ) -> irpc::Result<Self> {
782 let (tx, rx) = fut.await?;
783 Ok(Self { tx, rx })
784 }
785}
786
/// Progress handle for listing blobs.
///
/// `hashes` collects every hash into a vector, `stream` yields them one by one.
pub struct BlobsListProgress {
    // Future resolving to the receiver of the listed hashes (or per-item errors).
    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
}
791
792impl BlobsListProgress {
793 fn new(
794 fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
795 ) -> Self {
796 Self {
797 inner: Box::pin(fut),
798 }
799 }
800
801 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
802 let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
803 let mut hashes = Vec::new();
804 while let Some(item) = rx.recv().await? {
805 hashes.push(item?);
806 }
807 Ok(hashes)
808 }
809
810 pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
811 let mut rx = self.inner.await?;
812 Ok(Gen::new(|co| async move {
813 while let Ok(Some(item)) = rx.recv().await {
814 co.yield_(item).await;
815 }
816 }))
817 }
818}
819
/// Progress handle for a ranges export.
///
/// `stream` yields the raw [`ExportRangesItem`]s; `concatenate` assembles the bytes of the
/// requested ranges into one buffer.
pub struct ExportRangesProgress {
    // The byte ranges that were requested; used by `concatenate` to cut the received data.
    ranges: RangeSet2<u64>,
    // Future resolving to the receiver of export items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
}
831
832impl ExportRangesProgress {
833 fn new(
834 ranges: RangeSet2<u64>,
835 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
836 ) -> Self {
837 Self {
838 ranges,
839 inner: Box::pin(fut),
840 }
841 }
842}
843
844impl ExportRangesProgress {
845 pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
852 Gen::new(|co| async move {
853 let mut rx = match self.inner.await {
854 Ok(rx) => rx,
855 Err(e) => {
856 co.yield_(ExportRangesItem::Error(e.into())).await;
857 return;
858 }
859 };
860 while let Ok(Some(item)) = rx.recv().await {
861 co.yield_(item).await;
862 }
863 })
864 }
865
866 pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
868 let mut rx = self.inner.await?;
869 let mut data = BTreeMap::new();
870 while let Some(item) = rx.recv().await? {
871 match item {
872 ExportRangesItem::Size(_) => {}
873 ExportRangesItem::Data(leaf) => {
874 data.insert(leaf.offset, leaf.data);
875 }
876 ExportRangesItem::Error(cause) => return Err(cause.into()),
877 }
878 }
879 let mut res = Vec::new();
880 for range in self.ranges.iter() {
881 let (start, end) = match range {
882 RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
883 RangeSetRange::Range(range) => (*range.start, *range.end),
884 };
885 for (offset, data) in data.iter() {
886 let cstart = *offset;
887 let cend = *offset + (data.len() as u64);
888 if cstart >= end || cend <= start {
889 continue;
890 }
891 let start = start.max(cstart);
892 let end = end.min(cend);
893 let data = &data[(start - cstart) as usize..(end - cstart) as usize];
894 res.extend_from_slice(data);
895 }
896 }
897 Ok(res)
898 }
899}
900
/// Progress handle for a bao export.
///
/// The exported [`EncodedItem`]s can be consumed as a stream, collected into memory, or
/// written to a send stream.
pub struct ExportBaoProgress {
    // Future resolving to the receiver of encoded items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
}
911
912impl ExportBaoProgress {
913 fn new(
914 fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
915 ) -> Self {
916 Self {
917 inner: Box::pin(fut),
918 }
919 }
920
921 pub fn hashes_with_index(
928 self,
929 ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
930 let mut stream = self.stream();
931 Gen::new(|co| async move {
932 while let Some(item) = stream.next().await {
933 let leaf = match item {
934 EncodedItem::Leaf(leaf) => leaf,
935 EncodedItem::Error(e) => {
936 co.yield_(Err(e.into())).await;
937 continue;
938 }
939 _ => continue,
940 };
941 let slice = match HashSeqChunk::try_from(leaf) {
942 Ok(slice) => slice,
943 Err(e) => {
944 co.yield_(Err(e)).await;
945 continue;
946 }
947 };
948 let offset = slice.base();
949 for (o, hash) in slice.into_iter().enumerate() {
950 co.yield_(Ok((offset + o as u64, hash))).await;
951 }
952 }
953 })
954 }
955
956 pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
958 self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
959 }
960
961 pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
962 let mut data = Vec::new();
963 let mut stream = self.into_byte_stream();
964 while let Some(item) = stream.next().await {
965 println!("item: {item:?}");
966 data.extend_from_slice(&item?);
967 }
968 Ok(data)
969 }
970
971 pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
972 let mut rx = self.inner.await?;
973 let mut data = Vec::new();
974 while let Some(item) = rx.recv().await? {
975 match item {
976 EncodedItem::Leaf(leaf) => {
977 data.push(leaf.data);
978 }
979 EncodedItem::Parent(_) => {}
980 EncodedItem::Size(_) => {}
981 EncodedItem::Done => break,
982 EncodedItem::Error(cause) => return Err(cause.into()),
983 }
984 }
985 if data.len() == 1 {
986 Ok(data.pop().unwrap())
987 } else {
988 let mut out = Vec::new();
989 for item in data {
990 out.extend_from_slice(&item);
991 }
992 Ok(out.into())
993 }
994 }
995
996 pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
997 let mut rx = self.inner.await?;
998 let mut data = Vec::new();
999 while let Some(item) = rx.recv().await? {
1000 match item {
1001 EncodedItem::Leaf(leaf) => {
1002 data.extend_from_slice(&leaf.data);
1003 }
1004 EncodedItem::Parent(_) => {}
1005 EncodedItem::Size(_) => {}
1006 EncodedItem::Done => break,
1007 EncodedItem::Error(cause) => return Err(cause.into()),
1008 }
1009 }
1010 Ok(data)
1011 }
1012
1013 pub async fn write_quinn(self, target: &mut quinn::SendStream) -> super::ExportBaoResult<()> {
1014 let mut rx = self.inner.await?;
1015 while let Some(item) = rx.recv().await? {
1016 match item {
1017 EncodedItem::Size(size) => {
1018 target.write_u64_le(size).await?;
1019 }
1020 EncodedItem::Parent(parent) => {
1021 let mut data = vec![0u8; 64];
1022 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1023 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1024 target.write_all(&data).await.map_err(io::Error::from)?;
1025 }
1026 EncodedItem::Leaf(leaf) => {
1027 target
1028 .write_chunk(leaf.data)
1029 .await
1030 .map_err(io::Error::from)?;
1031 }
1032 EncodedItem::Done => break,
1033 EncodedItem::Error(cause) => return Err(cause.into()),
1034 }
1035 }
1036 Ok(())
1037 }
1038
1039 pub(crate) async fn write_quinn_with_progress(
1041 self,
1042 writer: &mut SendStream,
1043 progress: &mut impl WriteProgress,
1044 hash: &Hash,
1045 index: u64,
1046 ) -> super::ExportBaoResult<()> {
1047 let mut rx = self.inner.await?;
1048 while let Some(item) = rx.recv().await? {
1049 match item {
1050 EncodedItem::Size(size) => {
1051 progress.send_transfer_started(index, hash, size).await;
1052 writer.write_u64_le(size).await?;
1053 progress.log_other_write(8);
1054 }
1055 EncodedItem::Parent(parent) => {
1056 let mut data = vec![0u8; 64];
1057 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1058 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1059 writer.write_all(&data).await.map_err(io::Error::from)?;
1060 progress.log_other_write(64);
1061 }
1062 EncodedItem::Leaf(leaf) => {
1063 let len = leaf.data.len();
1064 writer
1065 .write_chunk(leaf.data)
1066 .await
1067 .map_err(io::Error::from)?;
1068 progress.notify_payload_write(index, leaf.offset, len).await;
1069 }
1070 EncodedItem::Done => break,
1071 EncodedItem::Error(cause) => return Err(cause.into()),
1072 }
1073 }
1074 Ok(())
1075 }
1076
1077 pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1078 self.stream().filter_map(|item| match item {
1079 EncodedItem::Size(size) => {
1080 let size = size.to_le_bytes().to_vec().into();
1081 Some(Ok(size))
1082 }
1083 EncodedItem::Parent(parent) => {
1084 let mut data = vec![0u8; 64];
1085 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1086 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1087 Some(Ok(data.into()))
1088 }
1089 EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1090 EncodedItem::Done => None,
1091 EncodedItem::Error(cause) => Some(Err(super::Error::other(cause))),
1092 })
1093 }
1094
1095 pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1096 Gen::new(|co| async move {
1097 let mut rx = match self.inner.await {
1098 Ok(rx) => rx,
1099 Err(cause) => {
1100 co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1101 .await;
1102 return;
1103 }
1104 };
1105 while let Ok(Some(item)) = rx.recv().await {
1106 co.yield_(item).await;
1107 }
1108 })
1109 }
1110}
1111
/// Progress callbacks used by `ExportBaoProgress::write_quinn_with_progress` while an export
/// is written to a send stream.
pub(crate) trait WriteProgress {
    /// Called after a leaf of `len` bytes with the given `offset` has been written.
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize);

    /// Called after `len` bytes that are not leaf data have been written (8 for the size
    /// prefix, 64 for a parent).
    fn log_other_write(&mut self, len: usize);

    /// Called with the size of the blob before the size prefix is written.
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
}
1122
/// Forwards every [`WriteProgress`] callback to the method of the same name on
/// [`StreamContext`].
///
/// The calls are written in path form (`StreamContext::method(self, ..)`).
/// NOTE(review): this presumably selects the inherent methods of `StreamContext` rather
/// than recursing into this trait impl — confirm against the `StreamContext` definition.
impl WriteProgress for StreamContext {
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) {
        // The result of this call is not awaited, unlike `send_transfer_started` below.
        StreamContext::notify_payload_write(self, index, offset, len);
    }

    fn log_other_write(&mut self, len: usize) {
        StreamContext::log_other_write(self, len);
    }

    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64) {
        StreamContext::send_transfer_started(self, index, hash, size).await
    }
}