1use std::{
8 collections::BTreeMap,
9 future::{Future, IntoFuture},
10 io,
11 num::NonZeroU64,
12 path::{Path, PathBuf},
13 pin::Pin,
14};
15
16pub use bao_tree::io::mixed::EncodedItem;
17use bao_tree::{
18 io::{
19 fsm::{ResponseDecoder, ResponseDecoderNext},
20 BaoContentItem, Leaf,
21 },
22 BaoTree, ChunkNum, ChunkRanges,
23};
24use bytes::Bytes;
25use genawaiter::sync::Gen;
26use iroh_io::AsyncStreamWriter;
27use irpc::channel::{mpsc, oneshot};
28use n0_future::{future, stream, Stream, StreamExt};
29use range_collections::{range_set::RangeSetRange, RangeSet2};
30use ref_cast::RefCast;
31use serde::{Deserialize, Serialize};
32use tracing::trace;
33mod reader;
34pub use reader::BlobReader;
35
36pub use super::proto::{
41 AddProgressItem, Bitfield, BlobDeleteRequest as DeleteOptions, BlobStatus,
42 ExportBaoRequest as ExportBaoOptions, ExportMode, ExportPathRequest as ExportOptions,
43 ExportProgressItem, ExportRangesRequest as ExportRangesOptions,
44 ImportBaoRequest as ImportBaoOptions, ImportMode, ObserveRequest as ObserveOptions,
45};
46use super::{
47 proto::{
48 BatchResponse, BlobStatusRequest, ClearProtectedRequest, CreateTempTagRequest,
49 ExportBaoRequest, ExportRangesItem, ImportBaoRequest, ImportByteStreamRequest,
50 ImportBytesRequest, ImportPathRequest, ListRequest, Scope,
51 },
52 remote::HashSeqChunk,
53 tags::TagInfo,
54 ApiClient, RequestResult, Tags,
55};
56use crate::{
57 api::proto::{BatchRequest, ImportByteStreamUpdate},
58 provider::events::ClientResult,
59 store::IROH_BLOCK_SIZE,
60 util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
61 BlobFormat, Hash, HashAndFormat,
62};
63
/// Options for adding in-memory bytes as a blob.
#[derive(Debug)]
pub struct AddBytesOptions {
    /// The bytes to add.
    pub data: Bytes,
    /// The blob format the data is imported as.
    pub format: BlobFormat,
}
70
71impl<T: Into<Bytes>> From<(T, BlobFormat)> for AddBytesOptions {
72 fn from(item: (T, BlobFormat)) -> Self {
73 let (data, format) = item;
74 Self {
75 data: data.into(),
76 format,
77 }
78 }
79}
80
/// Blobs API: a thin wrapper around an [`ApiClient`].
///
/// The `#[repr(transparent)]` layout together with the `RefCast` derive is
/// what allows `ref_from_sender` to turn a `&ApiClient` into a `&Blobs`.
#[derive(Debug, Clone, ref_cast::RefCast)]
#[repr(transparent)]
pub struct Blobs {
    /// The client all requests of this API are sent through.
    client: ApiClient,
}
87
88impl Blobs {
89 pub(crate) fn ref_from_sender(sender: &ApiClient) -> &Self {
90 Self::ref_cast(sender)
91 }
92
93 pub async fn batch(&self) -> irpc::Result<Batch<'_>> {
94 let msg = BatchRequest;
95 trace!("{msg:?}");
96 let (tx, rx) = self.client.client_streaming(msg, 32).await?;
97 let scope = rx.await?;
98
99 Ok(Batch {
100 scope,
101 blobs: self,
102 _tx: tx,
103 })
104 }
105
106 pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
127 self.reader_with_opts(ReaderOptions { hash: hash.into() })
128 }
129
130 pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
135 BlobReader::new(self.clone(), options)
136 }
137
138 pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
146 trace!("{options:?}");
147 self.client.rpc(options).await??;
148 Ok(())
149 }
150
151 pub(crate) async fn delete(
153 &self,
154 hashes: impl IntoIterator<Item = impl Into<Hash>>,
155 ) -> RequestResult<()> {
156 self.delete_with_opts(DeleteOptions {
157 hashes: hashes.into_iter().map(Into::into).collect(),
158 force: false,
159 })
160 .await
161 }
162
163 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
164 let options = ImportBytesRequest {
165 data: Bytes::copy_from_slice(data.as_ref()),
166 format: crate::BlobFormat::Raw,
167 scope: Scope::GLOBAL,
168 };
169 self.add_bytes_impl(options)
170 }
171
172 pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
173 let options = ImportBytesRequest {
174 data: data.into(),
175 format: crate::BlobFormat::Raw,
176 scope: Scope::GLOBAL,
177 };
178 self.add_bytes_impl(options)
179 }
180
181 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
182 let options = options.into();
183 let request = ImportBytesRequest {
184 data: options.data,
185 format: options.format,
186 scope: Scope::GLOBAL,
187 };
188 self.add_bytes_impl(request)
189 }
190
191 fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
192 trace!("{options:?}");
193 let this = self.clone();
194 let stream = Gen::new(|co| async move {
195 let mut receiver = match this.client.server_streaming(options, 32).await {
196 Ok(receiver) => receiver,
197 Err(cause) => {
198 co.yield_(AddProgressItem::Error(cause.into())).await;
199 return;
200 }
201 };
202 loop {
203 match receiver.recv().await {
204 Ok(Some(item)) => co.yield_(item).await,
205 Err(cause) => {
206 co.yield_(AddProgressItem::Error(cause.into())).await;
207 break;
208 }
209 Ok(None) => break,
210 }
211 }
212 });
213 AddProgress::new(self, stream)
214 }
215
216 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
217 let options = options.into();
218 self.add_path_with_opts_impl(ImportPathRequest {
219 path: options.path,
220 mode: options.mode,
221 format: options.format,
222 scope: Scope::GLOBAL,
223 })
224 }
225
226 fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
227 trace!("{:?}", options);
228 let client = self.client.clone();
229 let stream = Gen::new(|co| async move {
230 let mut receiver = match client.server_streaming(options, 32).await {
231 Ok(receiver) => receiver,
232 Err(cause) => {
233 co.yield_(AddProgressItem::Error(cause.into())).await;
234 return;
235 }
236 };
237 loop {
238 match receiver.recv().await {
239 Ok(Some(item)) => co.yield_(item).await,
240 Err(cause) => {
241 co.yield_(AddProgressItem::Error(cause.into())).await;
242 break;
243 }
244 Ok(None) => break,
245 }
246 }
247 });
248 AddProgress::new(self, stream)
249 }
250
251 pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
252 self.add_path_with_opts(AddPathOptions {
253 path: path.as_ref().to_owned(),
254 mode: ImportMode::Copy,
255 format: BlobFormat::Raw,
256 })
257 }
258
    /// Adds a blob from a stream of byte chunks.
    ///
    /// Opens a bidirectional `ImportByteStreamRequest` (raw format, default
    /// scope). Each chunk of `data` is forwarded as an
    /// `ImportByteStreamUpdate::Bytes`, followed by a final
    /// `ImportByteStreamUpdate::Done`. Concurrently, the progress items
    /// received in response are relayed into the returned [`AddProgress`].
    ///
    /// The request is issued inside the generator that backs the returned
    /// progress, not by this function body.
    pub async fn add_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    ) -> AddProgress<'_> {
        let inner = ImportByteStreamRequest {
            format: crate::BlobFormat::Raw,
            scope: Scope::default(),
        };
        let client = self.client.clone();
        let stream = Gen::new(|co| async move {
            let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
                Ok(x) => x,
                Err(cause) => {
                    // Opening the request failed: report it as the only item.
                    co.yield_(AddProgressItem::Error(cause.into())).await;
                    return;
                }
            };
            // Relays progress items until the channel ends or a receive fails.
            let recv = async {
                loop {
                    match receiver.recv().await {
                        Ok(Some(item)) => co.yield_(item).await,
                        Err(cause) => {
                            co.yield_(AddProgressItem::Error(cause.into())).await;
                            break;
                        }
                        Ok(None) => break,
                    }
                }
            };
            // Feeds the input stream into the request; stops at the first
            // error item in `data` or at the first failed send.
            let send = async {
                tokio::pin!(data);
                while let Some(item) = data.next().await {
                    sender.send(ImportByteStreamUpdate::Bytes(item?)).await?;
                }
                sender.send(ImportByteStreamUpdate::Done).await?;
                anyhow::Ok(())
            };
            // NOTE(review): the result of `send` is discarded here, so an error
            // from `data` or from sending is not turned into an
            // `AddProgressItem::Error` by this code.
            let _ = tokio::join!(send, recv);
        });
        AddProgress::new(self, stream)
    }
300
301 pub fn export_ranges(
302 &self,
303 hash: impl Into<Hash>,
304 ranges: impl Into<RangeSet2<u64>>,
305 ) -> ExportRangesProgress {
306 self.export_ranges_with_opts(ExportRangesOptions {
307 hash: hash.into(),
308 ranges: ranges.into(),
309 })
310 }
311
312 pub fn export_ranges_with_opts(&self, options: ExportRangesOptions) -> ExportRangesProgress {
313 trace!("{options:?}");
314 ExportRangesProgress::new(
315 options.ranges.clone(),
316 self.client.server_streaming(options, 32),
317 )
318 }
319
320 pub fn export_bao_with_opts(
321 &self,
322 options: ExportBaoOptions,
323 local_update_cap: usize,
324 ) -> ExportBaoProgress {
325 trace!("{options:?}");
326 ExportBaoProgress::new(self.client.server_streaming(options, local_update_cap))
327 }
328
329 pub fn export_bao(
330 &self,
331 hash: impl Into<Hash>,
332 ranges: impl Into<ChunkRanges>,
333 ) -> ExportBaoProgress {
334 self.export_bao_with_opts(
335 ExportBaoRequest {
336 hash: hash.into(),
337 ranges: ranges.into(),
338 },
339 32,
340 )
341 }
342
343 pub async fn export_chunk(
345 &self,
346 hash: impl Into<Hash>,
347 offset: u64,
348 ) -> super::ExportBaoResult<Leaf> {
349 let base = ChunkNum::full_chunks(offset);
350 let ranges = ChunkRanges::from(base..base + 1);
351 let mut stream = self.export_bao(hash, ranges).stream();
352 while let Some(item) = stream.next().await {
353 match item {
354 EncodedItem::Leaf(leaf) => return Ok(leaf),
355 EncodedItem::Parent(_) => {}
356 EncodedItem::Size(_) => {}
357 EncodedItem::Done => break,
358 EncodedItem::Error(cause) => return Err(cause.into()),
359 }
360 }
361 Err(io::Error::other("unexpected end of stream").into())
362 }
363
364 pub async fn get_bytes(&self, hash: impl Into<Hash>) -> super::ExportBaoResult<Bytes> {
368 self.export_bao(hash.into(), ChunkRanges::all())
369 .data_to_bytes()
370 .await
371 }
372
373 pub fn observe(&self, hash: impl Into<Hash>) -> ObserveProgress {
375 self.observe_with_opts(ObserveOptions { hash: hash.into() })
376 }
377
378 pub fn observe_with_opts(&self, options: ObserveOptions) -> ObserveProgress {
379 trace!("{:?}", options);
380 if options.hash == Hash::EMPTY {
381 return ObserveProgress::new(async move {
382 let (tx, rx) = mpsc::channel(1);
383 tx.send(Bitfield::complete(0)).await.ok();
384 Ok(rx)
385 });
386 }
387 ObserveProgress::new(self.client.server_streaming(options, 32))
388 }
389
390 pub fn export_with_opts(&self, options: ExportOptions) -> ExportProgress {
391 trace!("{:?}", options);
392 ExportProgress::new(self.client.server_streaming(options, 32))
393 }
394
395 pub fn export(&self, hash: impl Into<Hash>, target: impl AsRef<Path>) -> ExportProgress {
396 let options = ExportOptions {
397 hash: hash.into(),
398 mode: ExportMode::Copy,
399 target: target.as_ref().to_owned(),
400 };
401 self.export_with_opts(options)
402 }
403
404 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
408 pub async fn import_bao(
409 &self,
410 hash: impl Into<Hash>,
411 size: NonZeroU64,
412 local_update_cap: usize,
413 ) -> irpc::Result<ImportBaoHandle> {
414 let options = ImportBaoRequest {
415 hash: hash.into(),
416 size,
417 };
418 self.import_bao_with_opts(options, local_update_cap).await
419 }
420
421 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
422 pub async fn import_bao_with_opts(
423 &self,
424 options: ImportBaoOptions,
425 local_update_cap: usize,
426 ) -> irpc::Result<ImportBaoHandle> {
427 trace!("{:?}", options);
428 ImportBaoHandle::new(self.client.client_streaming(options, local_update_cap)).await
429 }
430
    /// Imports bao-encoded data for `hash` from a reader and returns the
    /// reader when done.
    ///
    /// The input is expected to start with an 8 byte little-endian size,
    /// followed by the encoded content for `ranges`. Decoded items are
    /// forwarded to an import request opened via
    /// [`Self::import_bao_with_opts`].
    ///
    /// # Errors
    ///
    /// Fails if the size prefix cannot be read, if the size is zero for a hash
    /// other than `Hash::EMPTY`, if decoding or forwarding an item fails, or
    /// if the import request reports an error.
    #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
    pub async fn import_bao_reader<R: crate::util::RecvStream>(
        &self,
        hash: Hash,
        ranges: ChunkRanges,
        mut reader: R,
    ) -> RequestResult<R> {
        // Read the 8 byte little-endian size prefix.
        let mut size = [0; 8];
        reader
            .recv_exact(&mut size)
            .await
            .map_err(super::Error::other)?;
        let size = u64::from_le_bytes(size);
        // A zero size is only accepted for the empty hash; in that case nothing
        // is imported and the reader is handed back right away.
        let Some(size) = NonZeroU64::new(size) else {
            return if hash == Hash::EMPTY {
                Ok(reader)
            } else {
                Err(super::Error::other("invalid size for hash").into())
            };
        };
        let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
        let mut decoder = ResponseDecoder::new(
            hash.into(),
            ranges,
            tree,
            RecvStreamAsyncStreamReader::new(reader),
        );
        let options = ImportBaoOptions { hash, size };
        let handle = self.import_bao_with_opts(options, 32).await?;
        // Decodes items and forwards them to the import request until the
        // decoder is done, then yields the underlying reader.
        let driver = async move {
            let reader = loop {
                match decoder.next().await {
                    ResponseDecoderNext::More((rest, item)) => {
                        handle.tx.send(item?).await?;
                        decoder = rest;
                    }
                    ResponseDecoderNext::Done(reader) => break reader,
                };
            };
            // Drop the sender explicitly once all items have been forwarded.
            drop(handle.tx);
            io::Result::Ok(reader)
        };
        // Waits for the result of the import request.
        let fut = async move { handle.rx.await.map_err(io::Error::other)? };
        let (reader, res) = tokio::join!(driver, fut);
        // The import result is checked first, then the driver result.
        res?;
        Ok(reader?.into_inner())
    }
478
479 #[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
480 pub async fn import_bao_bytes(
481 &self,
482 hash: Hash,
483 ranges: ChunkRanges,
484 data: impl Into<Bytes>,
485 ) -> RequestResult<()> {
486 self.import_bao_reader(hash, ranges, data.into()).await?;
487 Ok(())
488 }
489
490 pub fn list(&self) -> BlobsListProgress {
491 let msg = ListRequest;
492 let client = self.client.clone();
493 BlobsListProgress::new(client.server_streaming(msg, 32))
494 }
495
496 pub async fn status(&self, hash: impl Into<Hash>) -> irpc::Result<BlobStatus> {
497 let hash = hash.into();
498 let msg = BlobStatusRequest { hash };
499 self.client.rpc(msg).await
500 }
501
502 pub async fn has(&self, hash: impl Into<Hash>) -> irpc::Result<bool> {
503 match self.status(hash).await? {
504 BlobStatus::Complete { .. } => Ok(true),
505 _ => Ok(false),
506 }
507 }
508
509 #[allow(dead_code)]
510 pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
511 let msg = ClearProtectedRequest;
512 self.client.rpc(msg).await??;
513 Ok(())
514 }
515}
516
/// Progress handle for an add operation started from a [`Batch`].
///
/// Wraps an [`AddProgress`]; awaiting it directly yields a [`TempTag`].
pub struct BatchAddProgress<'a>(AddProgress<'a>);
519
520impl<'a> IntoFuture for BatchAddProgress<'a> {
521 type Output = RequestResult<TempTag>;
522
523 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
524
525 fn into_future(self) -> Self::IntoFuture {
526 Box::pin(self.temp_tag())
527 }
528}
529
530impl<'a> BatchAddProgress<'a> {
531 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
532 self.0.with_named_tag(name).await
533 }
534
535 pub async fn with_tag(self) -> RequestResult<TagInfo> {
536 self.0.with_tag().await
537 }
538
539 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
540 self.0.stream().await
541 }
542
543 pub async fn temp_tag(self) -> RequestResult<TempTag> {
544 self.0.temp_tag().await
545 }
546}
547
/// Handle for a batch created by [`Blobs::batch`].
///
/// Add operations started through this handle use the batch's scope instead
/// of the global scope.
pub struct Batch<'a> {
    /// Scope received in response to the batch request.
    scope: Scope,
    /// The API handle the batch was created from.
    blobs: &'a Blobs,
    /// Sender half of the batch request; never used, only kept alive for as
    /// long as the batch exists.
    _tx: mpsc::Sender<BatchResponse>,
}
554
555impl<'a> Batch<'a> {
556 pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
557 let options = ImportBytesRequest {
558 data: data.into(),
559 format: crate::BlobFormat::Raw,
560 scope: self.scope,
561 };
562 BatchAddProgress(self.blobs.add_bytes_impl(options))
563 }
564
565 pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
566 let options = options.into();
567 BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
568 data: options.data,
569 format: options.format,
570 scope: self.scope,
571 }))
572 }
573
574 pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
575 let options = ImportBytesRequest {
576 data: Bytes::copy_from_slice(data.as_ref()),
577 format: crate::BlobFormat::Raw,
578 scope: self.scope,
579 };
580 BatchAddProgress(self.blobs.add_bytes_impl(options))
581 }
582
583 pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
584 let options = options.into();
585 BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
586 path: options.path,
587 mode: options.mode,
588 format: options.format,
589 scope: self.scope,
590 }))
591 }
592
593 pub async fn temp_tag(&self, value: impl Into<HashAndFormat>) -> irpc::Result<TempTag> {
594 let value = value.into();
595 let msg = CreateTempTagRequest {
596 scope: self.scope,
597 value,
598 };
599 self.blobs.client.rpc(msg).await
600 }
601}
602
/// Options for adding the file at a path as a blob.
#[derive(Debug)]
pub struct AddPathOptions {
    /// Path of the file to import.
    pub path: PathBuf,
    /// The blob format the data is imported as.
    pub format: BlobFormat,
    /// The import mode passed on in the import request.
    pub mode: ImportMode,
}
610
/// Progress handle for an add operation.
///
/// Can be awaited directly (equivalent to `with_tag`), converted into a
/// temp tag or named tag, or consumed as a stream of [`AddProgressItem`]s.
pub struct AddProgress<'a> {
    /// The API handle used to create tags once the import is done.
    blobs: &'a Blobs,
    /// The underlying stream of progress items.
    inner: stream::Boxed<AddProgressItem>,
}
625
626impl<'a> IntoFuture for AddProgress<'a> {
627 type Output = RequestResult<TagInfo>;
628
629 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
630
631 fn into_future(self) -> Self::IntoFuture {
632 Box::pin(self.with_tag())
633 }
634}
635
636impl<'a> AddProgress<'a> {
637 fn new(blobs: &'a Blobs, stream: impl Stream<Item = AddProgressItem> + Send + 'static) -> Self {
638 Self {
639 blobs,
640 inner: Box::pin(stream),
641 }
642 }
643
644 pub async fn temp_tag(self) -> RequestResult<TempTag> {
645 let mut stream = self.inner;
646 while let Some(item) = stream.next().await {
647 match item {
648 AddProgressItem::Done(tt) => return Ok(tt),
649 AddProgressItem::Error(e) => return Err(e.into()),
650 _ => {}
651 }
652 }
653 Err(super::Error::other("unexpected end of stream").into())
654 }
655
656 pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
657 let blobs = self.blobs.clone();
658 let tt = self.temp_tag().await?;
659 let haf = tt.hash_and_format();
660 let tags = Tags::ref_from_sender(&blobs.client);
661 tags.set(name, haf).await?;
662 drop(tt);
663 Ok(haf)
664 }
665
666 pub async fn with_tag(self) -> RequestResult<TagInfo> {
667 let blobs = self.blobs.clone();
668 let tt = self.temp_tag().await?;
669 let hash = tt.hash();
670 let format = tt.format();
671 let tags = Tags::ref_from_sender(&blobs.client);
672 let name = tags.create(tt.hash_and_format()).await?;
673 drop(tt);
674 Ok(TagInfo { name, hash, format })
675 }
676
677 pub async fn stream(self) -> impl Stream<Item = AddProgressItem> {
678 self.inner
679 }
680}
681
/// Options for creating a [`BlobReader`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
    /// Hash of the blob to read.
    pub hash: Hash,
}
687
/// Progress handle for an observe operation.
///
/// Awaiting it directly yields the first [`Bitfield`] received.
pub struct ObserveProgress {
    /// Future resolving to the receiver of bitfield updates.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
}
695
696impl IntoFuture for ObserveProgress {
697 type Output = RequestResult<Bitfield>;
698
699 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
700
701 fn into_future(self) -> Self::IntoFuture {
702 Box::pin(async move {
703 let mut rx = self.inner.await?;
704 match rx.recv().await? {
705 Some(bitfield) => Ok(bitfield),
706 None => Err(super::Error::other("unexpected end of stream").into()),
707 }
708 })
709 }
710}
711
712impl ObserveProgress {
713 fn new(
714 fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
715 ) -> Self {
716 Self {
717 inner: Box::pin(fut),
718 }
719 }
720
721 pub async fn await_completion(self) -> RequestResult<Bitfield> {
722 let mut stream = self.stream().await?;
723 while let Some(item) = stream.next().await {
724 if item.is_complete() {
725 return Ok(item);
726 }
727 }
728 Err(super::Error::other("unexpected end of stream").into())
729 }
730
731 pub async fn stream(self) -> irpc::Result<impl Stream<Item = Bitfield>> {
736 let mut rx = self.inner.await?;
737 Ok(Gen::new(|co| async move {
738 while let Ok(Some(item)) = rx.recv().await {
739 co.yield_(item).await;
740 }
741 }))
742 }
743}
744
/// Progress handle for an export to a path.
///
/// Awaiting it directly is equivalent to calling `finish`.
pub struct ExportProgress {
    /// Future resolving to the receiver of export progress items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
}
758
759impl IntoFuture for ExportProgress {
760 type Output = RequestResult<u64>;
761
762 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
763
764 fn into_future(self) -> Self::IntoFuture {
765 Box::pin(self.finish())
766 }
767}
768
769impl ExportProgress {
770 fn new(
771 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
772 ) -> Self {
773 Self {
774 inner: Box::pin(fut),
775 }
776 }
777
778 pub async fn stream(self) -> impl Stream<Item = ExportProgressItem> {
779 Gen::new(|co| async move {
780 let mut rx = match self.inner.await {
781 Ok(rx) => rx,
782 Err(e) => {
783 co.yield_(ExportProgressItem::Error(e.into())).await;
784 return;
785 }
786 };
787 while let Ok(Some(item)) = rx.recv().await {
788 co.yield_(item).await;
789 }
790 })
791 }
792
793 pub async fn finish(self) -> RequestResult<u64> {
794 let mut rx = self.inner.await?;
795 let mut size = None;
796 loop {
797 match rx.recv().await? {
798 Some(ExportProgressItem::Done) => break,
799 Some(ExportProgressItem::Size(s)) => size = Some(s),
800 Some(ExportProgressItem::Error(cause)) => return Err(cause.into()),
801 _ => {}
802 }
803 }
804 if let Some(size) = size {
805 Ok(size)
806 } else {
807 Err(super::Error::other("unexpected end of stream").into())
808 }
809 }
810}
811
/// The two halves of a running bao import request.
pub struct ImportBaoHandle {
    /// Sender for the bao content items to import.
    pub tx: mpsc::Sender<BaoContentItem>,
    /// Receiver for the final result of the import.
    pub rx: oneshot::Receiver<super::Result<()>>,
}
817
818impl ImportBaoHandle {
819 pub(crate) async fn new(
820 fut: impl Future<
821 Output = irpc::Result<(
822 mpsc::Sender<BaoContentItem>,
823 oneshot::Receiver<super::Result<()>>,
824 )>,
825 > + Send
826 + 'static,
827 ) -> irpc::Result<Self> {
828 let (tx, rx) = fut.await?;
829 Ok(Self { tx, rx })
830 }
831}
832
/// Progress handle for listing blobs.
pub struct BlobsListProgress {
    /// Future resolving to the receiver of listed hashes (or errors).
    inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
}
837
838impl BlobsListProgress {
839 fn new(
840 fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
841 ) -> Self {
842 Self {
843 inner: Box::pin(fut),
844 }
845 }
846
847 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
848 let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
849 let mut hashes = Vec::new();
850 while let Some(item) = rx.recv().await? {
851 hashes.push(item?);
852 }
853 Ok(hashes)
854 }
855
856 pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
857 let mut rx = self.inner.await?;
858 Ok(Gen::new(|co| async move {
859 while let Ok(Some(item)) = rx.recv().await {
860 co.yield_(item).await;
861 }
862 }))
863 }
864}
865
/// Progress handle for an export of byte ranges.
pub struct ExportRangesProgress {
    /// The byte ranges that were requested; used by `concatenate` to cut the
    /// received leaves down to exactly these ranges.
    ranges: RangeSet2<u64>,
    /// Future resolving to the receiver of export items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
}
877
878impl ExportRangesProgress {
879 fn new(
880 ranges: RangeSet2<u64>,
881 fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
882 ) -> Self {
883 Self {
884 ranges,
885 inner: Box::pin(fut),
886 }
887 }
888}
889
890impl ExportRangesProgress {
891 pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
898 Gen::new(|co| async move {
899 let mut rx = match self.inner.await {
900 Ok(rx) => rx,
901 Err(e) => {
902 co.yield_(ExportRangesItem::Error(e.into())).await;
903 return;
904 }
905 };
906 while let Ok(Some(item)) = rx.recv().await {
907 co.yield_(item).await;
908 }
909 })
910 }
911
912 pub async fn concatenate(self) -> RequestResult<Vec<u8>> {
914 let mut rx = self.inner.await?;
915 let mut data = BTreeMap::new();
916 while let Some(item) = rx.recv().await? {
917 match item {
918 ExportRangesItem::Size(_) => {}
919 ExportRangesItem::Data(leaf) => {
920 data.insert(leaf.offset, leaf.data);
921 }
922 ExportRangesItem::Error(cause) => return Err(cause.into()),
923 }
924 }
925 let mut res = Vec::new();
926 for range in self.ranges.iter() {
927 let (start, end) = match range {
928 RangeSetRange::RangeFrom(range) => (*range.start, u64::MAX),
929 RangeSetRange::Range(range) => (*range.start, *range.end),
930 };
931 for (offset, data) in data.iter() {
932 let cstart = *offset;
933 let cend = *offset + (data.len() as u64);
934 if cstart >= end || cend <= start {
935 continue;
936 }
937 let start = start.max(cstart);
938 let end = end.min(cend);
939 let data = &data[(start - cstart) as usize..(end - cstart) as usize];
940 res.extend_from_slice(data);
941 }
942 }
943 Ok(res)
944 }
945}
946
/// Progress handle for a bao export.
pub struct ExportBaoProgress {
    /// Future resolving to the receiver of encoded items.
    inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
}
957
958impl ExportBaoProgress {
959 fn new(
960 fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
961 ) -> Self {
962 Self {
963 inner: Box::pin(fut),
964 }
965 }
966
967 pub fn hashes_with_index(
974 self,
975 ) -> impl Stream<Item = std::result::Result<(u64, Hash), anyhow::Error>> {
976 let mut stream = self.stream();
977 Gen::new(|co| async move {
978 while let Some(item) = stream.next().await {
979 let leaf = match item {
980 EncodedItem::Leaf(leaf) => leaf,
981 EncodedItem::Error(e) => {
982 co.yield_(Err(e.into())).await;
983 continue;
984 }
985 _ => continue,
986 };
987 let slice = match HashSeqChunk::try_from(leaf) {
988 Ok(slice) => slice,
989 Err(e) => {
990 co.yield_(Err(e)).await;
991 continue;
992 }
993 };
994 let offset = slice.base();
995 for (o, hash) in slice.into_iter().enumerate() {
996 co.yield_(Ok((offset + o as u64, hash))).await;
997 }
998 }
999 })
1000 }
1001
1002 pub fn hashes(self) -> impl Stream<Item = std::result::Result<Hash, anyhow::Error>> {
1004 self.hashes_with_index().map(|x| x.map(|(_, hash)| hash))
1005 }
1006
1007 pub async fn bao_to_vec(self) -> RequestResult<Vec<u8>> {
1008 let mut data = Vec::new();
1009 let mut stream = self.into_byte_stream();
1010 while let Some(item) = stream.next().await {
1011 data.extend_from_slice(&item?);
1012 }
1013 Ok(data)
1014 }
1015
1016 pub async fn data_to_bytes(self) -> super::ExportBaoResult<Bytes> {
1017 let mut rx = self.inner.await?;
1018 let mut data = Vec::new();
1019 while let Some(item) = rx.recv().await? {
1020 match item {
1021 EncodedItem::Leaf(leaf) => {
1022 data.push(leaf.data);
1023 }
1024 EncodedItem::Parent(_) => {}
1025 EncodedItem::Size(_) => {}
1026 EncodedItem::Done => break,
1027 EncodedItem::Error(cause) => return Err(cause.into()),
1028 }
1029 }
1030 if data.len() == 1 {
1031 Ok(data.pop().unwrap())
1032 } else {
1033 let mut out = Vec::new();
1034 for item in data {
1035 out.extend_from_slice(&item);
1036 }
1037 Ok(out.into())
1038 }
1039 }
1040
1041 pub async fn data_to_vec(self) -> super::ExportBaoResult<Vec<u8>> {
1042 let mut rx = self.inner.await?;
1043 let mut data = Vec::new();
1044 while let Some(item) = rx.recv().await? {
1045 match item {
1046 EncodedItem::Leaf(leaf) => {
1047 data.extend_from_slice(&leaf.data);
1048 }
1049 EncodedItem::Parent(_) => {}
1050 EncodedItem::Size(_) => {}
1051 EncodedItem::Done => break,
1052 EncodedItem::Error(cause) => return Err(cause.into()),
1053 }
1054 }
1055 Ok(data)
1056 }
1057
1058 pub async fn write<W: AsyncStreamWriter>(self, target: &mut W) -> super::ExportBaoResult<()> {
1059 let mut rx = self.inner.await?;
1060 while let Some(item) = rx.recv().await? {
1061 match item {
1062 EncodedItem::Size(size) => {
1063 target.write(&size.to_le_bytes()).await?;
1064 }
1065 EncodedItem::Parent(parent) => {
1066 let mut data = vec![0u8; 64];
1067 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1068 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1069 target.write(&data).await?;
1070 }
1071 EncodedItem::Leaf(leaf) => {
1072 target.write_bytes(leaf.data).await?;
1073 }
1074 EncodedItem::Done => break,
1075 EncodedItem::Error(cause) => return Err(cause.into()),
1076 }
1077 }
1078 Ok(())
1079 }
1080
    /// Writes the encoded export to `writer`, reporting to `progress` along
    /// the way.
    ///
    /// The bytes sent are the same as in `write`: the size as 8 little-endian
    /// bytes, each parent as 64 bytes, each leaf as its data. `hash` and
    /// `index` are only passed through to the progress callbacks.
    ///
    /// # Errors
    ///
    /// Returns the first receive error, send error, `EncodedItem::Error`, or
    /// error returned by `notify_payload_write`.
    pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
        self,
        writer: &mut W,
        progress: &mut impl WriteProgress,
        hash: &Hash,
        index: u64,
    ) -> super::ExportBaoResult<()> {
        let mut rx = self.inner.await?;
        while let Some(item) = rx.recv().await? {
            match item {
                EncodedItem::Size(size) => {
                    // The transfer start is announced before the size is sent.
                    progress.send_transfer_started(index, hash, size).await;
                    writer.send(&size.to_le_bytes()).await?;
                    progress.log_other_write(8);
                }
                EncodedItem::Parent(parent) => {
                    let mut data = [0u8; 64];
                    data[..32].copy_from_slice(parent.pair.0.as_bytes());
                    data[32..].copy_from_slice(parent.pair.1.as_bytes());
                    writer.send(&data).await?;
                    progress.log_other_write(64);
                }
                EncodedItem::Leaf(leaf) => {
                    // Remember the length before the data is moved into the
                    // writer.
                    let len = leaf.data.len();
                    writer.send_bytes(leaf.data).await?;
                    progress
                        .notify_payload_write(index, leaf.offset, len)
                        .await?;
                }
                EncodedItem::Done => break,
                EncodedItem::Error(cause) => return Err(cause.into()),
            }
        }
        Ok(())
    }
1117
1118 pub fn into_byte_stream(self) -> impl Stream<Item = super::Result<Bytes>> {
1119 self.stream().filter_map(|item| match item {
1120 EncodedItem::Size(size) => {
1121 let size = size.to_le_bytes().to_vec().into();
1122 Some(Ok(size))
1123 }
1124 EncodedItem::Parent(parent) => {
1125 let mut data = vec![0u8; 64];
1126 data[..32].copy_from_slice(parent.pair.0.as_bytes());
1127 data[32..].copy_from_slice(parent.pair.1.as_bytes());
1128 Some(Ok(data.into()))
1129 }
1130 EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
1131 EncodedItem::Done => None,
1132 EncodedItem::Error(cause) => Some(Err(cause.into())),
1133 })
1134 }
1135
1136 pub fn stream(self) -> impl Stream<Item = EncodedItem> {
1137 Gen::new(|co| async move {
1138 let mut rx = match self.inner.await {
1139 Ok(rx) => rx,
1140 Err(cause) => {
1141 co.yield_(EncodedItem::Error(io::Error::other(cause).into()))
1142 .await;
1143 return;
1144 }
1145 };
1146 while let Ok(Some(item)) = rx.recv().await {
1147 co.yield_(item).await;
1148 }
1149 })
1150 }
1151}
1152
/// Callbacks invoked by `ExportBaoProgress::write_with_progress` while it
/// sends an encoded export.
pub(crate) trait WriteProgress {
    /// Called after the data of a leaf has been sent, with the leaf's offset
    /// and data length. An error returned here aborts the write.
    async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) -> ClientResult;

    /// Called after non-leaf bytes have been sent (8 for the size, 64 for a
    /// parent).
    fn log_other_write(&mut self, len: usize);

    /// Called when the size item arrives, before the size is sent.
    async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
}