1use std::{
60 future::Future,
61 io,
62 path::PathBuf,
63 pin::Pin,
64 sync::Arc,
65 task::{Context, Poll},
66};
67
68use anyhow::{anyhow, Context as _, Result};
69use bytes::Bytes;
70use futures_lite::{Stream, StreamExt};
71use futures_util::SinkExt;
72use genawaiter::sync::{Co, Gen};
73use iroh::NodeAddr;
74use portable_atomic::{AtomicU64, Ordering};
75use quic_rpc::{
76 client::{BoxStreamSync, BoxedConnector},
77 transport::boxed::BoxableConnector,
78 Connector, RpcClient,
79};
80use serde::{Deserialize, Serialize};
81use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
82use tokio_util::io::{ReaderStream, StreamReader};
83use tracing::warn;
84
85pub use crate::net_protocol::DownloadMode;
86use crate::{
87 export::ExportProgress as BytesExportProgress,
88 format::collection::{Collection, SimpleStore},
89 get::db::DownloadProgress as BytesDownloadProgress,
90 net_protocol::BlobDownloadRequest,
91 rpc::proto::{Request, Response, RpcService},
92 store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
93 util::SetTagOption,
94 BlobFormat, Hash, Tag,
95};
96
97mod batch;
98pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};
99
100use super::{flatten, tags};
101use crate::rpc::proto::blobs::{
102 AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
103 BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
104 DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest,
105 ReadAtResponse, ValidateRequest,
106};
107
/// Client for the blobs RPC API.
///
/// A thin wrapper around an [`RpcClient`] for [`RpcService`]. The connector type
/// defaults to [`BoxedConnector`].
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
    /// The underlying RPC client every request of this client is sent through.
    pub(crate) rpc: RpcClient<RpcService, C>,
}
114
/// A [`Client`] that uses the crate's [`crate::rpc::MemConnector`] as its connector.
pub type MemClient = Client<crate::rpc::MemConnector>;
117
118impl<C> Client<C>
119where
120 C: Connector<RpcService>,
121{
122 pub fn new(rpc: RpcClient<RpcService, C>) -> Self {
124 Self { rpc }
125 }
126
127 pub fn boxed(&self) -> Client<BoxedConnector<RpcService>>
129 where
130 C: BoxableConnector<Response, Request>,
131 {
132 Client::new(self.rpc.clone().boxed())
133 }
134
135 pub fn tags(&self) -> tags::Client<C> {
137 tags::Client::new(self.rpc.clone())
138 }
139
140 pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
145 let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
146 Ok(status.0)
147 }
148
149 pub async fn has(&self, hash: Hash) -> Result<bool> {
153 match self.status(hash).await {
154 Ok(BlobStatus::Complete { .. }) => Ok(true),
155 Ok(_) => Ok(false),
156 Err(err) => Err(err),
157 }
158 }
159
160 pub async fn batch(&self) -> Result<Batch<C>> {
166 let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
167 let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
168 let rpc = self.rpc.clone();
169 Ok(Batch::new(batch, rpc, updates, 1024))
170 }
171
172 pub async fn read(&self, hash: Hash) -> Result<Reader> {
176 Reader::from_rpc_read(&self.rpc, hash).await
177 }
178
179 pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Reader> {
183 Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await
184 }
185
186 pub async fn read_to_bytes(&self, hash: Hash) -> Result<Bytes> {
192 Reader::from_rpc_read(&self.rpc, hash)
193 .await?
194 .read_to_bytes()
195 .await
196 }
197
198 pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Bytes> {
202 Reader::from_rpc_read_at(&self.rpc, hash, offset, len)
203 .await?
204 .read_to_bytes()
205 .await
206 }
207
208 pub async fn add_from_path(
215 &self,
216 path: PathBuf,
217 in_place: bool,
218 tag: SetTagOption,
219 wrap: WrapOption,
220 ) -> Result<AddProgress> {
221 let stream = self
222 .rpc
223 .server_streaming(AddPathRequest {
224 path,
225 in_place,
226 tag,
227 wrap,
228 })
229 .await?;
230 Ok(AddProgress::new(stream))
231 }
232
233 pub async fn create_collection(
238 &self,
239 collection: Collection,
240 tag: SetTagOption,
241 tags_to_delete: Vec<Tag>,
242 ) -> anyhow::Result<(Hash, Tag)> {
243 let CreateCollectionResponse { hash, tag } = self
244 .rpc
245 .rpc(CreateCollectionRequest {
246 collection,
247 tag,
248 tags_to_delete,
249 })
250 .await??;
251 Ok((hash, tag))
252 }
253
254 pub async fn add_reader(
256 &self,
257 reader: impl AsyncRead + Unpin + Send + 'static,
258 tag: SetTagOption,
259 ) -> anyhow::Result<AddProgress> {
260 const CAP: usize = 1024 * 64; let input = ReaderStream::with_capacity(reader, CAP);
262 self.add_stream(input, tag).await
263 }
264
265 pub async fn add_stream(
267 &self,
268 input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
269 tag: SetTagOption,
270 ) -> anyhow::Result<AddProgress> {
271 let (mut sink, progress) = self.rpc.bidi(AddStreamRequest { tag }).await?;
272 let mut input = input.map(|chunk| match chunk {
273 Ok(chunk) => Ok(AddStreamUpdate::Chunk(chunk)),
274 Err(err) => {
275 warn!("Abort send, reason: failed to read from source stream: {err:?}");
276 Ok(AddStreamUpdate::Abort)
277 }
278 });
279 tokio::spawn(async move {
280 if let Err(err) = sink.send_all(&mut input).await {
281 sink.send(AddStreamUpdate::Abort).await.ok();
284 warn!("Failed to send input stream to remote: {err:?}");
285 }
286 });
287
288 Ok(AddProgress::new(progress))
289 }
290
291 pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
293 let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
294 self.add_stream(input, SetTagOption::Auto).await?.await
295 }
296
297 pub async fn add_bytes_named(
299 &self,
300 bytes: impl Into<Bytes>,
301 name: impl Into<Tag>,
302 ) -> anyhow::Result<AddOutcome> {
303 let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
304 self.add_stream(input, SetTagOption::Named(name.into()))
305 .await?
306 .await
307 }
308
309 pub async fn validate(
313 &self,
314 repair: bool,
315 ) -> Result<impl Stream<Item = Result<ValidateProgress>>> {
316 let stream = self
317 .rpc
318 .server_streaming(ValidateRequest { repair })
319 .await?;
320 Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
321 }
322
323 pub async fn consistency_check(
327 &self,
328 repair: bool,
329 ) -> Result<impl Stream<Item = Result<ConsistencyCheckProgress>>> {
330 let stream = self
331 .rpc
332 .server_streaming(ConsistencyCheckRequest { repair })
333 .await?;
334 Ok(stream.map(|r| r.map_err(anyhow::Error::from)))
335 }
336
337 pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
339 self.download_with_opts(
340 hash,
341 DownloadOptions {
342 format: BlobFormat::Raw,
343 nodes: vec![node],
344 tag: SetTagOption::Auto,
345 mode: DownloadMode::Queued,
346 },
347 )
348 .await
349 }
350
351 pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
353 self.download_with_opts(
354 hash,
355 DownloadOptions {
356 format: BlobFormat::HashSeq,
357 nodes: vec![node],
358 tag: SetTagOption::Auto,
359 mode: DownloadMode::Queued,
360 },
361 )
362 .await
363 }
364
365 pub async fn download_with_opts(
367 &self,
368 hash: Hash,
369 opts: DownloadOptions,
370 ) -> Result<DownloadProgress> {
371 let DownloadOptions {
372 format,
373 nodes,
374 tag,
375 mode,
376 } = opts;
377 let stream = self
378 .rpc
379 .server_streaming(BlobDownloadRequest {
380 hash,
381 format,
382 nodes,
383 tag,
384 mode,
385 })
386 .await?;
387 Ok(DownloadProgress::new(
388 stream.map(|res| res.map_err(anyhow::Error::from)),
389 ))
390 }
391
392 pub async fn export(
402 &self,
403 hash: Hash,
404 destination: PathBuf,
405 format: ExportFormat,
406 mode: ExportMode,
407 ) -> Result<ExportProgress> {
408 let req = ExportRequest {
409 hash,
410 path: destination,
411 format,
412 mode,
413 };
414 let stream = self.rpc.server_streaming(req).await?;
415 Ok(ExportProgress::new(
416 stream.map(|r| r.map_err(anyhow::Error::from)),
417 ))
418 }
419
420 pub async fn list(&self) -> Result<impl Stream<Item = Result<BlobInfo>>> {
422 let stream = self.rpc.server_streaming(ListRequest).await?;
423 Ok(flatten(stream))
424 }
425
426 pub async fn list_incomplete(&self) -> Result<impl Stream<Item = Result<IncompleteBlobInfo>>> {
428 let stream = self.rpc.server_streaming(ListIncompleteRequest).await?;
429 Ok(flatten(stream))
430 }
431
432 pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
434 Collection::load(hash, self).await
435 }
436
437 pub fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
439 let this = self.clone();
440 Ok(Gen::new(|co| async move {
441 if let Err(cause) = this.list_collections_impl(&co).await {
442 co.yield_(Err(cause)).await;
443 }
444 }))
445 }
446
447 async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
448 let tags = self.tags_client();
449 let mut tags = tags.list_hash_seq().await?;
450 while let Some(tag) = tags.next().await {
451 let tag = tag?;
452 if let Ok(collection) = self.get_collection(tag.hash).await {
453 let info = CollectionInfo {
454 tag: tag.name,
455 hash: tag.hash,
456 total_blobs_count: Some(collection.len() as u64 + 1),
457 total_blobs_size: Some(0),
458 };
459 co.yield_(Ok(info)).await;
460 }
461 }
462 Ok(())
463 }
464
465 pub async fn delete_blob(&self, hash: Hash) -> Result<()> {
471 self.rpc.rpc(DeleteRequest { hash }).await??;
472 Ok(())
473 }
474
475 fn tags_client(&self) -> tags::Client<C> {
476 tags::Client::new(self.rpc.clone())
477 }
478}
479
480impl<C> SimpleStore for Client<C>
481where
482 C: Connector<RpcService>,
483{
484 async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
485 self.read_to_bytes(hash).await
486 }
487}
488
/// Describes how many bytes a ranged read should return.
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
pub enum ReadAtLen {
    /// Everything that remains after the offset (the default).
    #[default]
    All,
    /// Exactly this many bytes, regardless of how many remain.
    Exact(u64),
    /// This many bytes, or fewer if fewer remain.
    AtMost(u64),
}
500
501impl ReadAtLen {
502 pub fn as_result_len(&self, size_remaining: u64) -> u64 {
504 match self {
505 ReadAtLen::All => size_remaining,
506 ReadAtLen::Exact(len) => *len,
507 ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining),
508 }
509 }
510}
511
/// Wrapping behaviour passed along with an `AddPathRequest`.
// NOTE(review): presumably controls whether an added path is wrapped in a
// collection — confirm against the request handler.
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub enum WrapOption {
    /// Do not wrap (the default).
    #[default]
    NoWrap,
    /// Wrap the added data.
    Wrap {
        /// Optional name to use when wrapping.
        name: Option<String>,
    },
}
524
/// Status of a blob as reported by a `BlobStatusRequest`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
    /// The blob is not known.
    NotFound,
    /// The blob is only partially present.
    Partial {
        /// Size information for the partial blob.
        size: BaoBlobSize,
    },
    /// The blob is fully present.
    Complete {
        /// Size of the blob.
        size: u64,
    },
}
541
/// Final result of an add operation, produced by awaiting an [`AddProgress`].
#[derive(Debug, Clone)]
pub struct AddOutcome {
    /// Hash reported by the final `AllDone` event.
    pub hash: Hash,
    /// Format reported by the final `AllDone` event.
    pub format: BlobFormat,
    /// Sum of the sizes of all `Found` events seen before completion.
    pub size: u64,
    /// Tag reported by the final `AllDone` event.
    pub tag: Tag,
}
554
/// Summary of a collection, as yielded by `Client::list_collections`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionInfo {
    /// Name of the tag pointing at the collection.
    pub tag: Tag,

    /// Hash the tag points at.
    pub hash: Hash,
    /// Number of blobs in the collection, if known.
    pub total_blobs_count: Option<u64>,
    /// Total size of the blobs in the collection, if known.
    pub total_blobs_size: Option<u64>,
}
572
/// Entry of the stream returned by `Client::list`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobInfo {
    /// Path associated with the blob.
    // NOTE(review): the exact meaning of this path is defined by the list handler — confirm.
    pub path: String,
    /// Hash of the blob.
    pub hash: Hash,
    /// Size of the blob.
    pub size: u64,
}
583
/// Entry of the stream returned by `Client::list_incomplete`.
#[derive(Debug, Serialize, Deserialize)]
pub struct IncompleteBlobInfo {
    /// Size of the data currently available.
    // NOTE(review): presumably the size of the partial data on disk — confirm.
    pub size: u64,
    /// Size the blob is expected to have once complete.
    pub expected_size: u64,
    /// Hash of the blob.
    pub hash: Hash,
}
594
/// Progress of an add operation.
///
/// Can be consumed as a stream of progress events or awaited as a future that
/// resolves to the final [`AddOutcome`].
#[derive(derive_more::Debug)]
pub struct AddProgress {
    /// Underlying event stream; omitted from the `Debug` output.
    #[debug(skip)]
    stream:
        Pin<Box<dyn Stream<Item = Result<crate::provider::AddProgress>> + Send + Unpin + 'static>>,
    /// Running sum of the sizes carried by `Found` events, shared with the stream adapter.
    current_total_size: Arc<AtomicU64>,
}
603
604impl AddProgress {
605 fn new(
606 stream: (impl Stream<
607 Item = Result<impl Into<crate::provider::AddProgress>, impl Into<anyhow::Error>>,
608 > + Send
609 + Unpin
610 + 'static),
611 ) -> Self {
612 let current_total_size = Arc::new(AtomicU64::new(0));
613 let total_size = current_total_size.clone();
614 let stream = stream.map(move |item| match item {
615 Ok(item) => {
616 let item = item.into();
617 if let crate::provider::AddProgress::Found { size, .. } = &item {
618 total_size.fetch_add(*size, Ordering::Relaxed);
619 }
620 Ok(item)
621 }
622 Err(err) => Err(err.into()),
623 });
624 Self {
625 stream: Box::pin(stream),
626 current_total_size,
627 }
628 }
629 pub async fn finish(self) -> Result<AddOutcome> {
637 self.await
638 }
639}
640
641impl Stream for AddProgress {
642 type Item = Result<crate::provider::AddProgress>;
643 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
644 Pin::new(&mut self.stream).poll_next(cx)
645 }
646}
647
648impl Future for AddProgress {
649 type Output = Result<AddOutcome>;
650
651 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
652 loop {
653 match Pin::new(&mut self.stream).poll_next(cx) {
654 Poll::Pending => return Poll::Pending,
655 Poll::Ready(None) => {
656 return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
657 }
658 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
659 Poll::Ready(Some(Ok(msg))) => match msg {
660 crate::provider::AddProgress::AllDone { hash, format, tag } => {
661 let outcome = AddOutcome {
662 hash,
663 format,
664 tag,
665 size: self.current_total_size.load(Ordering::Relaxed),
666 };
667 return Poll::Ready(Ok(outcome));
668 }
669 crate::provider::AddProgress::Abort(err) => {
670 return Poll::Ready(Err(err.into()));
671 }
672 _ => {}
673 },
674 }
675 }
676 }
677}
678
/// Final result of a download, produced by awaiting a [`DownloadProgress`].
#[derive(Debug, Clone)]
pub struct DownloadOutcome {
    /// Sum of the sizes of all `FoundLocal` events seen before completion.
    pub local_size: u64,
    /// Sum of the sizes of all `Found` events seen before completion.
    pub downloaded_size: u64,
    /// Statistics carried by the final `AllDone` event.
    pub stats: crate::get::Stats,
}
689
/// Progress of a download.
///
/// Can be consumed as a stream of progress events or awaited as a future that
/// resolves to the final [`DownloadOutcome`].
#[derive(derive_more::Debug)]
pub struct DownloadProgress {
    /// Underlying event stream; omitted from the `Debug` output.
    #[debug(skip)]
    stream: Pin<Box<dyn Stream<Item = Result<BytesDownloadProgress>> + Send + Unpin + 'static>>,
    /// Running sum of the sizes carried by `FoundLocal` events.
    current_local_size: Arc<AtomicU64>,
    /// Running sum of the sizes carried by `Found` events.
    current_network_size: Arc<AtomicU64>,
}
698
699impl DownloadProgress {
700 pub fn new(
702 stream: (impl Stream<Item = Result<impl Into<BytesDownloadProgress>, impl Into<anyhow::Error>>>
703 + Send
704 + Unpin
705 + 'static),
706 ) -> Self {
707 let current_local_size = Arc::new(AtomicU64::new(0));
708 let current_network_size = Arc::new(AtomicU64::new(0));
709
710 let local_size = current_local_size.clone();
711 let network_size = current_network_size.clone();
712
713 let stream = stream.map(move |item| match item {
714 Ok(item) => {
715 let item = item.into();
716 match &item {
717 BytesDownloadProgress::FoundLocal { size, .. } => {
718 local_size.fetch_add(size.value(), Ordering::Relaxed);
719 }
720 BytesDownloadProgress::Found { size, .. } => {
721 network_size.fetch_add(*size, Ordering::Relaxed);
722 }
723 _ => {}
724 }
725
726 Ok(item)
727 }
728 Err(err) => Err(err.into()),
729 });
730 Self {
731 stream: Box::pin(stream),
732 current_local_size,
733 current_network_size,
734 }
735 }
736
737 pub async fn finish(self) -> Result<DownloadOutcome> {
743 self.await
744 }
745}
746
747impl Stream for DownloadProgress {
748 type Item = Result<BytesDownloadProgress>;
749 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
750 Pin::new(&mut self.stream).poll_next(cx)
751 }
752}
753
754impl Future for DownloadProgress {
755 type Output = Result<DownloadOutcome>;
756
757 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
758 loop {
759 match Pin::new(&mut self.stream).poll_next(cx) {
760 Poll::Pending => return Poll::Pending,
761 Poll::Ready(None) => {
762 return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
763 }
764 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
765 Poll::Ready(Some(Ok(msg))) => match msg {
766 BytesDownloadProgress::AllDone(stats) => {
767 let outcome = DownloadOutcome {
768 local_size: self.current_local_size.load(Ordering::Relaxed),
769 downloaded_size: self.current_network_size.load(Ordering::Relaxed),
770 stats,
771 };
772 return Poll::Ready(Ok(outcome));
773 }
774 BytesDownloadProgress::Abort(err) => {
775 return Poll::Ready(Err(err.into()));
776 }
777 _ => {}
778 },
779 }
780 }
781 }
782}
783
/// Final result of an export, produced by awaiting an [`ExportProgress`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportOutcome {
    /// Sum of the sizes of all `Found` events seen before completion.
    // NOTE(review): this field is private and no accessor is visible in this part of
    // the file.
    total_size: u64,
}
790
/// Progress of an export.
///
/// Can be consumed as a stream of progress events or awaited as a future that
/// resolves to the final [`ExportOutcome`].
#[derive(derive_more::Debug)]
pub struct ExportProgress {
    /// Underlying event stream; omitted from the `Debug` output.
    #[debug(skip)]
    stream: Pin<Box<dyn Stream<Item = Result<BytesExportProgress>> + Send + Unpin + 'static>>,
    /// Running sum of the sizes carried by `Found` events.
    current_total_size: Arc<AtomicU64>,
}
798
799impl ExportProgress {
800 pub fn new(
803 stream: (impl Stream<Item = Result<impl Into<BytesExportProgress>, impl Into<anyhow::Error>>>
804 + Send
805 + Unpin
806 + 'static),
807 ) -> Self {
808 let current_total_size = Arc::new(AtomicU64::new(0));
809 let total_size = current_total_size.clone();
810 let stream = stream.map(move |item| match item {
811 Ok(item) => {
812 let item = item.into();
813 if let BytesExportProgress::Found { size, .. } = &item {
814 let size = size.value();
815 total_size.fetch_add(size, Ordering::Relaxed);
816 }
817
818 Ok(item)
819 }
820 Err(err) => Err(err.into()),
821 });
822 Self {
823 stream: Box::pin(stream),
824 current_total_size,
825 }
826 }
827
828 pub async fn finish(self) -> Result<ExportOutcome> {
832 self.await
833 }
834}
835
836impl Stream for ExportProgress {
837 type Item = Result<BytesExportProgress>;
838 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
839 Pin::new(&mut self.stream).poll_next(cx)
840 }
841}
842
843impl Future for ExportProgress {
844 type Output = Result<ExportOutcome>;
845
846 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
847 loop {
848 match Pin::new(&mut self.stream).poll_next(cx) {
849 Poll::Pending => return Poll::Pending,
850 Poll::Ready(None) => {
851 return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
852 }
853 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
854 Poll::Ready(Some(Ok(msg))) => match msg {
855 BytesExportProgress::AllDone => {
856 let outcome = ExportOutcome {
857 total_size: self.current_total_size.load(Ordering::Relaxed),
858 };
859 return Poll::Ready(Ok(outcome));
860 }
861 BytesExportProgress::Abort(err) => {
862 return Poll::Ready(Err(err.into()));
863 }
864 _ => {}
865 },
866 }
867 }
868 }
869}
870
/// Reader over the data frames of a blob read request.
///
/// Usable both as an [`AsyncRead`] and as a [`Stream`] of byte chunks.
#[derive(derive_more::Debug)]
pub struct Reader {
    /// Size of the blob as reported by the header frame.
    size: u64,
    /// Number of bytes the response is expected to contain, derived from the
    /// requested length and the bytes remaining after the offset.
    response_size: u64,
    /// Completeness flag as reported by the header frame.
    is_complete: bool,
    /// Data frames adapted into an [`AsyncRead`]; printed as `StreamReader` in `Debug`.
    #[debug("StreamReader")]
    stream: tokio_util::io::StreamReader<BoxStreamSync<'static, io::Result<Bytes>>, Bytes>,
}
882
883impl Reader {
884 fn new(
885 size: u64,
886 response_size: u64,
887 is_complete: bool,
888 stream: BoxStreamSync<'static, io::Result<Bytes>>,
889 ) -> Self {
890 Self {
891 size,
892 response_size,
893 is_complete,
894 stream: StreamReader::new(stream),
895 }
896 }
897
898 pub async fn from_rpc_read<C>(
900 rpc: &RpcClient<RpcService, C>,
901 hash: Hash,
902 ) -> anyhow::Result<Self>
903 where
904 C: Connector<RpcService>,
905 {
906 Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await
907 }
908
909 async fn from_rpc_read_at<C>(
910 rpc: &RpcClient<RpcService, C>,
911 hash: Hash,
912 offset: u64,
913 len: ReadAtLen,
914 ) -> anyhow::Result<Self>
915 where
916 C: Connector<RpcService>,
917 {
918 let stream = rpc
919 .server_streaming(ReadAtRequest { hash, offset, len })
920 .await?;
921 let mut stream = flatten(stream);
922
923 let (size, is_complete) = match stream.next().await {
924 Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
925 Some(Err(err)) => return Err(err),
926 Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
927 None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
928 };
929
930 let stream = stream.map(|item| match item {
931 Ok(ReadAtResponse::Data { chunk }) => Ok(chunk),
932 Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
933 Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
934 });
935 let len = len.as_result_len(size.value() - offset);
936 Ok(Self::new(size.value(), len, is_complete, Box::pin(stream)))
937 }
938
939 pub fn size(&self) -> u64 {
941 self.size
942 }
943
944 pub fn is_complete(&self) -> bool {
948 self.is_complete
949 }
950
951 pub async fn read_to_bytes(&mut self) -> anyhow::Result<Bytes> {
953 let mut buf = Vec::with_capacity(self.response_size as usize);
954 self.read_to_end(&mut buf).await?;
955 Ok(buf.into())
956 }
957}
958
959impl AsyncRead for Reader {
960 fn poll_read(
961 mut self: Pin<&mut Self>,
962 cx: &mut Context<'_>,
963 buf: &mut ReadBuf<'_>,
964 ) -> Poll<io::Result<()>> {
965 Pin::new(&mut self.stream).poll_read(cx, buf)
966 }
967}
968
969impl Stream for Reader {
970 type Item = io::Result<Bytes>;
971
972 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
973 Pin::new(&mut self.stream).get_pin_mut().poll_next(cx)
974 }
975
976 fn size_hint(&self) -> (usize, Option<usize>) {
977 self.stream.get_ref().size_hint()
978 }
979}
980
/// Options for `Client::download_with_opts`; each field is copied into the
/// download request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadOptions {
    /// Format of the data to download.
    pub format: BlobFormat,
    /// Nodes passed to the download request.
    pub nodes: Vec<NodeAddr>,
    /// Tag option passed to the download request.
    pub tag: SetTagOption,
    /// Download mode passed to the download request.
    pub mode: DownloadMode,
}
998
999fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream<Item = Bytes> {
1000 futures_lite::stream::iter(std::iter::from_fn(move || {
1001 Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty())
1002 }))
1003}
1004
1005#[cfg(test)]
1006mod tests {
1007 use std::{path::Path, time::Duration};
1008
1009 use iroh::{test_utils::DnsPkarrServer, NodeId, RelayMode, SecretKey};
1010 use node::Node;
1011 use rand::RngCore;
1012 use testresult::TestResult;
1013 use tokio::{io::AsyncWriteExt, sync::mpsc};
1014 use tracing_test::traced_test;
1015
1016 use super::*;
1017 use crate::{hashseq::HashSeq, ticket::BlobTicket};
1018
1019 mod node {
1020 use std::path::Path;
1022
1023 use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
1024 use tokio_util::task::AbortOnDropHandle;
1025
1026 use super::RpcService;
1027 use crate::{
1028 net_protocol::Blobs,
1029 provider::{CustomEventSender, EventSender},
1030 rpc::client::{blobs, tags},
1031 };
1032
1033 type RpcClient = quic_rpc::RpcClient<RpcService>;
1034
1035 #[derive(Debug)]
1037 pub struct Node {
1038 router: iroh::protocol::Router,
1039 client: RpcClient,
1040 _rpc_task: AbortOnDropHandle<()>,
1041 }
1042
1043 #[derive(Debug)]
1045 pub struct Builder<S> {
1046 store: S,
1047 events: EventSender,
1048 endpoint: Option<iroh::endpoint::Builder>,
1049 }
1050
1051 impl<S: crate::store::Store> Builder<S> {
1052 pub fn blobs_events(self, events: impl CustomEventSender) -> Self {
1054 Self {
1055 events: events.into(),
1056 ..self
1057 }
1058 }
1059
1060 pub fn endpoint(self, endpoint: iroh::endpoint::Builder) -> Self {
1062 Self {
1063 endpoint: Some(endpoint),
1064 ..self
1065 }
1066 }
1067
1068 pub async fn spawn(self) -> anyhow::Result<Node> {
1070 let store = self.store;
1071 let events = self.events;
1072 let endpoint = self
1073 .endpoint
1074 .unwrap_or_else(|| Endpoint::builder().discovery_n0())
1075 .bind()
1076 .await?;
1077 let mut router = Router::builder(endpoint.clone());
1078
1079 let blobs = Blobs::builder(store.clone())
1081 .events(events)
1082 .build(&endpoint);
1083 router = router.accept(crate::ALPN, blobs.clone());
1084
1085 let router = router.spawn();
1087
1088 let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32);
1090 let internal_rpc = quic_rpc::RpcServer::new(internal_rpc).boxed();
1091 let _rpc_task = internal_rpc.spawn_accept_loop(move |msg, chan| {
1092 blobs.clone().handle_rpc_request(msg, chan)
1093 });
1094 let client = quic_rpc::RpcClient::new(controller).boxed();
1095 Ok(Node {
1096 router,
1097 client,
1098 _rpc_task,
1099 })
1100 }
1101 }
1102
1103 impl Node {
1104 pub fn memory() -> Builder<crate::store::mem::Store> {
1106 Builder {
1107 store: crate::store::mem::Store::new(),
1108 events: Default::default(),
1109 endpoint: None,
1110 }
1111 }
1112
1113 pub async fn persistent(
1115 path: impl AsRef<Path>,
1116 ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
1117 Ok(Builder {
1118 store: crate::store::fs::Store::load(path).await?,
1119 events: Default::default(),
1120 endpoint: None,
1121 })
1122 }
1123
1124 pub fn node_id(&self) -> NodeId {
1126 self.router.endpoint().node_id()
1127 }
1128
1129 pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
1131 self.router.endpoint().node_addr().await
1132 }
1133
1134 pub async fn shutdown(self) -> anyhow::Result<()> {
1136 self.router.shutdown().await
1137 }
1138
1139 pub fn blobs(&self) -> blobs::Client {
1141 blobs::Client::new(self.client.clone())
1142 }
1143
1144 pub fn tags(&self) -> tags::Client {
1146 tags::Client::new(self.client.clone())
1147 }
1148 }
1149 }
1150
1151 #[tokio::test]
1152 #[traced_test]
1153 async fn test_blob_create_collection() -> Result<()> {
1154 let node = node::Node::memory().spawn().await?;
1155
1156 let temp_dir = tempfile::tempdir().context("tempdir")?;
1158
1159 let in_root = temp_dir.path().join("in");
1160 tokio::fs::create_dir_all(in_root.clone())
1161 .await
1162 .context("create dir all")?;
1163
1164 let mut paths = Vec::new();
1165 for i in 0..5 {
1166 let path = in_root.join(format!("test-{i}"));
1167 let size = 100;
1168 let mut buf = vec![0u8; size];
1169 rand::thread_rng().fill_bytes(&mut buf);
1170 let mut file = tokio::fs::File::create(path.clone())
1171 .await
1172 .context("create file")?;
1173 file.write_all(&buf.clone()).await.context("write_all")?;
1174 file.flush().await.context("flush")?;
1175 paths.push(path);
1176 }
1177
1178 let blobs = node.blobs();
1179
1180 let mut collection = Collection::default();
1181 let mut tags = Vec::new();
1182 for path in &paths {
1184 let import_outcome = blobs
1185 .add_from_path(
1186 path.to_path_buf(),
1187 false,
1188 SetTagOption::Auto,
1189 WrapOption::NoWrap,
1190 )
1191 .await
1192 .context("import file")?
1193 .finish()
1194 .await
1195 .context("import finish")?;
1196
1197 collection.push(
1198 path.file_name().unwrap().to_str().unwrap().to_string(),
1199 import_outcome.hash,
1200 );
1201 tags.push(import_outcome.tag);
1202 }
1203
1204 let (hash, tag) = blobs
1205 .create_collection(collection, SetTagOption::Auto, tags)
1206 .await?;
1207
1208 let collections: Vec<_> = blobs.list_collections()?.try_collect().await?;
1209
1210 assert_eq!(collections.len(), 1);
1211 {
1212 let CollectionInfo {
1213 tag,
1214 hash,
1215 total_blobs_count,
1216 ..
1217 } = &collections[0];
1218 assert_eq!(tag, tag);
1219 assert_eq!(hash, hash);
1220 assert_eq!(total_blobs_count, &Some(5 + 1));
1222 }
1223
1224 let tags: Vec<_> = node.tags().list().await?.try_collect().await?;
1226 assert_eq!(tags.len(), 1);
1227 assert_eq!(tags[0].hash, hash);
1228 assert_eq!(tags[0].name, tag);
1229 assert_eq!(tags[0].format, BlobFormat::HashSeq);
1230
1231 Ok(())
1232 }
1233
1234 #[tokio::test]
1235 #[traced_test]
1236 async fn test_blob_read_at() -> Result<()> {
1237 let node = node::Node::memory().spawn().await?;
1238
1239 let temp_dir = tempfile::tempdir().context("tempdir")?;
1241
1242 let in_root = temp_dir.path().join("in");
1243 tokio::fs::create_dir_all(in_root.clone())
1244 .await
1245 .context("create dir all")?;
1246
1247 let path = in_root.join("test-blob");
1248 let size = 1024 * 128;
1249 let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
1250 let mut file = tokio::fs::File::create(path.clone())
1251 .await
1252 .context("create file")?;
1253 file.write_all(&buf.clone()).await.context("write_all")?;
1254 file.flush().await.context("flush")?;
1255
1256 let blobs = node.blobs();
1257
1258 let import_outcome = blobs
1259 .add_from_path(
1260 path.to_path_buf(),
1261 false,
1262 SetTagOption::Auto,
1263 WrapOption::NoWrap,
1264 )
1265 .await
1266 .context("import file")?
1267 .finish()
1268 .await
1269 .context("import finish")?;
1270
1271 let hash = import_outcome.hash;
1272
1273 let res = blobs.read_to_bytes(hash).await?;
1275 assert_eq!(&res, &buf[..]);
1276
1277 let res = blobs
1279 .read_at_to_bytes(hash, 0, ReadAtLen::Exact(100))
1280 .await?;
1281 assert_eq!(res.len(), 100);
1282 assert_eq!(&res[..], &buf[0..100]);
1283
1284 let res = blobs
1285 .read_at_to_bytes(hash, 20, ReadAtLen::Exact(120))
1286 .await?;
1287 assert_eq!(res.len(), 120);
1288 assert_eq!(&res[..], &buf[20..140]);
1289
1290 let res = blobs
1292 .read_at_to_bytes(hash, 0, ReadAtLen::Exact(1024 * 64))
1293 .await?;
1294 assert_eq!(res.len(), 1024 * 64);
1295 assert_eq!(&res[..], &buf[0..1024 * 64]);
1296
1297 let res = blobs
1298 .read_at_to_bytes(hash, 20, ReadAtLen::Exact(1024 * 64))
1299 .await?;
1300 assert_eq!(res.len(), 1024 * 64);
1301 assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]);
1302
1303 let res = blobs
1305 .read_at_to_bytes(hash, 0, ReadAtLen::Exact(10 + 1024 * 64))
1306 .await?;
1307 assert_eq!(res.len(), 10 + 1024 * 64);
1308 assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]);
1309
1310 let res = blobs
1311 .read_at_to_bytes(hash, 20, ReadAtLen::Exact(10 + 1024 * 64))
1312 .await?;
1313 assert_eq!(res.len(), 10 + 1024 * 64);
1314 assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]);
1315
1316 let res = blobs.read_at_to_bytes(hash, 20, ReadAtLen::All).await?;
1318 assert_eq!(res.len(), 1024 * 128 - 20);
1319 assert_eq!(&res[..], &buf[20..]);
1320
1321 let reader = blobs.read_at(hash, 0, ReadAtLen::Exact(20)).await?;
1323 assert_eq!(reader.size(), 1024 * 128);
1324 assert_eq!(reader.response_size, 20);
1325
1326 let res = blobs
1328 .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024))
1329 .await?;
1330 assert_eq!(res.len(), 1024);
1331 assert_eq!(res, &buf[1024 * 127..]);
1332
1333 let res = blobs
1335 .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All)
1336 .await?;
1337 assert_eq!(res.len(), 1024);
1338 assert_eq!(res, &buf[1024 * 127..]);
1339
1340 let mut res = blobs
1342 .read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048))
1343 .await?;
1344 assert_eq!(res.size, 1024 * 128);
1345 assert_eq!(res.response_size, 1024);
1346 let res = res.read_to_bytes().await?;
1347 assert_eq!(res.len(), 1024);
1348 assert_eq!(res, &buf[1024 * 127..]);
1349
1350 let res = blobs
1352 .read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1))
1353 .await;
1354 let err = res.unwrap_err();
1355 assert!(err.to_string().contains("out of bound"));
1356
1357 let res = blobs.read_at(hash, 1024 * 128 + 1, ReadAtLen::All).await;
1359 let err = res.unwrap_err();
1360 assert!(err.to_string().contains("out of range"));
1361
1362 let res = blobs
1364 .read_at(hash, 1024 * 127, ReadAtLen::Exact(1025))
1365 .await;
1366 let err = res.unwrap_err();
1367 assert!(err.to_string().contains("out of bound"));
1368
1369 Ok(())
1370 }
1371
1372 #[tokio::test]
1373 #[traced_test]
1374 async fn test_blob_get_collection() -> Result<()> {
1375 let node = node::Node::memory().spawn().await?;
1376
1377 let temp_dir = tempfile::tempdir().context("tempdir")?;
1379
1380 let in_root = temp_dir.path().join("in");
1381 tokio::fs::create_dir_all(in_root.clone())
1382 .await
1383 .context("create dir all")?;
1384
1385 let mut paths = Vec::new();
1386 for i in 0..5 {
1387 let path = in_root.join(format!("test-{i}"));
1388 let size = 100;
1389 let mut buf = vec![0u8; size];
1390 rand::thread_rng().fill_bytes(&mut buf);
1391 let mut file = tokio::fs::File::create(path.clone())
1392 .await
1393 .context("create file")?;
1394 file.write_all(&buf.clone()).await.context("write_all")?;
1395 file.flush().await.context("flush")?;
1396 paths.push(path);
1397 }
1398
1399 let blobs = node.blobs();
1400
1401 let mut collection = Collection::default();
1402 let mut tags = Vec::new();
1403 for path in &paths {
1405 let import_outcome = blobs
1406 .add_from_path(
1407 path.to_path_buf(),
1408 false,
1409 SetTagOption::Auto,
1410 WrapOption::NoWrap,
1411 )
1412 .await
1413 .context("import file")?
1414 .finish()
1415 .await
1416 .context("import finish")?;
1417
1418 collection.push(
1419 path.file_name().unwrap().to_str().unwrap().to_string(),
1420 import_outcome.hash,
1421 );
1422 tags.push(import_outcome.tag);
1423 }
1424
1425 let (hash, _tag) = blobs
1426 .create_collection(collection, SetTagOption::Auto, tags)
1427 .await?;
1428
1429 let collection = blobs.get_collection(hash).await?;
1430
1431 assert_eq!(collection.len(), 5);
1433
1434 Ok(())
1435 }
1436
1437 #[tokio::test]
1438 #[traced_test]
1439 async fn test_blob_share() -> Result<()> {
1440 let node = node::Node::memory().spawn().await?;
1441
1442 let temp_dir = tempfile::tempdir().context("tempdir")?;
1444
1445 let in_root = temp_dir.path().join("in");
1446 tokio::fs::create_dir_all(in_root.clone())
1447 .await
1448 .context("create dir all")?;
1449
1450 let path = in_root.join("test-blob");
1451 let size = 1024 * 128;
1452 let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
1453 let mut file = tokio::fs::File::create(path.clone())
1454 .await
1455 .context("create file")?;
1456 file.write_all(&buf.clone()).await.context("write_all")?;
1457 file.flush().await.context("flush")?;
1458
1459 let blobs = node.blobs();
1460
1461 let import_outcome = blobs
1462 .add_from_path(
1463 path.to_path_buf(),
1464 false,
1465 SetTagOption::Auto,
1466 WrapOption::NoWrap,
1467 )
1468 .await
1469 .context("import file")?
1470 .finish()
1471 .await
1472 .context("import finish")?;
1473
1474 let status = blobs.status(import_outcome.hash).await?;
1480 assert_eq!(status, BlobStatus::Complete { size });
1481
1482 Ok(())
1483 }
1484
1485 #[derive(Debug, Clone)]
1486 struct BlobEvents {
1487 sender: mpsc::Sender<crate::provider::Event>,
1488 }
1489
1490 impl BlobEvents {
1491 fn new(cap: usize) -> (Self, mpsc::Receiver<crate::provider::Event>) {
1492 let (s, r) = mpsc::channel(cap);
1493 (Self { sender: s }, r)
1494 }
1495 }
1496
1497 impl crate::provider::CustomEventSender for BlobEvents {
1498 fn send(&self, event: crate::provider::Event) -> futures_lite::future::Boxed<()> {
1499 let sender = self.sender.clone();
1500 Box::pin(async move {
1501 sender.send(event).await.ok();
1502 })
1503 }
1504
1505 fn try_send(&self, event: crate::provider::Event) {
1506 self.sender.try_send(event).ok();
1507 }
1508 }
1509
1510 #[tokio::test]
1511 #[traced_test]
1512 async fn test_blob_provide_events() -> Result<()> {
1513 let (node1_events, mut node1_events_r) = BlobEvents::new(16);
1514 let node1 = node::Node::memory()
1515 .blobs_events(node1_events)
1516 .spawn()
1517 .await?;
1518
1519 let (node2_events, mut node2_events_r) = BlobEvents::new(16);
1520 let node2 = node::Node::memory()
1521 .blobs_events(node2_events)
1522 .spawn()
1523 .await?;
1524
1525 let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?;
1526
1527 let node1_addr = node1.node_addr().await?;
1529 let res = node2
1530 .blobs()
1531 .download(import_outcome.hash, node1_addr)
1532 .await?
1533 .await?;
1534 dbg!(&res);
1535 assert_eq!(res.local_size, 0);
1536 assert_eq!(res.downloaded_size, 11);
1537
1538 node1.shutdown().await?;
1539 node2.shutdown().await?;
1540
1541 let mut ev1 = Vec::new();
1542 while let Some(ev) = node1_events_r.recv().await {
1543 ev1.push(ev);
1544 }
1545 assert!(matches!(
1547 ev1[0],
1548 crate::provider::Event::ClientConnected { .. }
1549 ));
1550 assert!(matches!(
1551 ev1[1],
1552 crate::provider::Event::GetRequestReceived { .. }
1553 ));
1554 assert!(matches!(
1555 ev1[2],
1556 crate::provider::Event::TransferProgress { .. }
1557 ));
1558 assert!(matches!(
1559 ev1[3],
1560 crate::provider::Event::TransferCompleted { .. }
1561 ));
1562 dbg!(&ev1);
1563
1564 let mut ev2 = Vec::new();
1565 while let Some(ev) = node2_events_r.recv().await {
1566 ev2.push(ev);
1567 }
1568
1569 assert!(ev2.is_empty());
1571 Ok(())
1572 }
1573 #[tokio::test]
1575 #[traced_test]
1576 async fn test_blob_get_self_existing() -> TestResult<()> {
1577 let node = node::Node::memory().spawn().await?;
1578 let node_id = node.node_id();
1579 let blobs = node.blobs();
1580
1581 let AddOutcome { hash, size, .. } = blobs.add_bytes("foo").await?;
1582
1583 let res = blobs
1585 .download_with_opts(
1586 hash,
1587 DownloadOptions {
1588 format: BlobFormat::Raw,
1589 nodes: vec![node_id.into()],
1590 tag: SetTagOption::Auto,
1591 mode: DownloadMode::Direct,
1592 },
1593 )
1594 .await?
1595 .await?;
1596
1597 assert_eq!(res.local_size, size);
1598 assert_eq!(res.downloaded_size, 0);
1599
1600 let res = blobs
1602 .download_with_opts(
1603 hash,
1604 DownloadOptions {
1605 format: BlobFormat::Raw,
1606 nodes: vec![node_id.into()],
1607 tag: SetTagOption::Auto,
1608 mode: DownloadMode::Queued,
1609 },
1610 )
1611 .await?
1612 .await?;
1613
1614 assert_eq!(res.local_size, size);
1615 assert_eq!(res.downloaded_size, 0);
1616
1617 Ok(())
1618 }
1619
1620 #[tokio::test]
1622 #[traced_test]
1623 async fn test_blob_get_self_missing() -> TestResult<()> {
1624 let node = node::Node::memory().spawn().await?;
1625 let node_id = node.node_id();
1626 let blobs = node.blobs();
1627
1628 let hash = Hash::from_bytes([0u8; 32]);
1629
1630 let res = blobs
1632 .download_with_opts(
1633 hash,
1634 DownloadOptions {
1635 format: BlobFormat::Raw,
1636 nodes: vec![node_id.into()],
1637 tag: SetTagOption::Auto,
1638 mode: DownloadMode::Direct,
1639 },
1640 )
1641 .await?
1642 .await;
1643 assert!(res.is_err());
1644 assert_eq!(
1645 res.err().unwrap().to_string().as_str(),
1646 "No nodes to download from provided"
1647 );
1648
1649 let res = blobs
1651 .download_with_opts(
1652 hash,
1653 DownloadOptions {
1654 format: BlobFormat::Raw,
1655 nodes: vec![node_id.into()],
1656 tag: SetTagOption::Auto,
1657 mode: DownloadMode::Queued,
1658 },
1659 )
1660 .await?
1661 .await;
1662 assert!(res.is_err());
1663 assert_eq!(
1664 res.err().unwrap().to_string().as_str(),
1665 "No provider nodes found"
1666 );
1667
1668 Ok(())
1669 }
1670
1671 #[tokio::test]
1673 #[traced_test]
1674 async fn test_blob_get_existing_collection() -> TestResult<()> {
1675 let node = node::Node::memory().spawn().await?;
1676 let node_id = NodeId::from_bytes(&[0u8; 32])?;
1679 let blobs = node.blobs();
1680
1681 let mut collection = Collection::default();
1682 let mut tags = Vec::new();
1683 let mut size = 0;
1684 for value in ["iroh", "is", "cool"] {
1685 let import_outcome = blobs.add_bytes(value).await.context("add bytes")?;
1686 collection.push(value.to_string(), import_outcome.hash);
1687 tags.push(import_outcome.tag);
1688 size += import_outcome.size;
1689 }
1690
1691 let (hash, _tag) = blobs
1692 .create_collection(collection, SetTagOption::Auto, tags)
1693 .await?;
1694
1695 let hashseq_bytes = blobs.read_to_bytes(hash).await?;
1697 size += hashseq_bytes.len() as u64;
1698 let hashseq = HashSeq::try_from(hashseq_bytes)?;
1699 let collection_header_bytes = blobs
1700 .read_to_bytes(hashseq.into_iter().next().expect("header to exist"))
1701 .await?;
1702 size += collection_header_bytes.len() as u64;
1703
1704 let res = blobs
1706 .download_with_opts(
1707 hash,
1708 DownloadOptions {
1709 format: BlobFormat::HashSeq,
1710 nodes: vec![node_id.into()],
1711 tag: SetTagOption::Auto,
1712 mode: DownloadMode::Direct,
1713 },
1714 )
1715 .await?
1716 .await
1717 .context("direct (download)")?;
1718
1719 assert_eq!(res.local_size, size);
1720 assert_eq!(res.downloaded_size, 0);
1721
1722 let res = blobs
1724 .download_with_opts(
1725 hash,
1726 DownloadOptions {
1727 format: BlobFormat::HashSeq,
1728 nodes: vec![node_id.into()],
1729 tag: SetTagOption::Auto,
1730 mode: DownloadMode::Queued,
1731 },
1732 )
1733 .await?
1734 .await
1735 .context("queued")?;
1736
1737 assert_eq!(res.local_size, size);
1738 assert_eq!(res.downloaded_size, 0);
1739
1740 Ok(())
1741 }
1742
1743 #[tokio::test]
1744 #[traced_test]
1745 #[cfg_attr(target_os = "windows", ignore = "flaky")]
1746 async fn test_blob_delete_mem() -> Result<()> {
1747 let node = node::Node::memory().spawn().await?;
1748
1749 let res = node.blobs().add_bytes(&b"hello world"[..]).await?;
1750
1751 let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
1752 assert_eq!(hashes.len(), 1);
1753 assert_eq!(hashes[0].hash, res.hash);
1754
1755 node.blobs().delete_blob(res.hash).await?;
1757
1758 let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
1759 assert!(hashes.is_empty());
1760
1761 Ok(())
1762 }
1763
1764 #[tokio::test]
1765 #[traced_test]
1766 async fn test_blob_delete_fs() -> Result<()> {
1767 let dir = tempfile::tempdir()?;
1768 let node = node::Node::persistent(dir.path()).await?.spawn().await?;
1769
1770 let res = node.blobs().add_bytes(&b"hello world"[..]).await?;
1771
1772 let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
1773 assert_eq!(hashes.len(), 1);
1774 assert_eq!(hashes[0].hash, res.hash);
1775
1776 node.blobs().delete_blob(res.hash).await?;
1778
1779 let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
1780 assert!(hashes.is_empty());
1781
1782 Ok(())
1783 }
1784
1785 #[tokio::test]
1786 #[traced_test]
1787 async fn test_ticket_multiple_addrs() -> TestResult<()> {
1788 let node = Node::memory().spawn().await?;
1789 let hash = node
1790 .blobs()
1791 .add_bytes(Bytes::from_static(b"hello"))
1792 .await?
1793 .hash;
1794
1795 let addr = node.node_addr().await?;
1796 let ticket = BlobTicket::new(addr, hash, BlobFormat::Raw)?;
1797 println!("addrs: {:?}", ticket.node_addr());
1798 assert!(!ticket.node_addr().direct_addresses.is_empty());
1799 Ok(())
1800 }
1801
1802 #[tokio::test]
1803 #[traced_test]
1804 async fn test_node_add_blob_stream() -> Result<()> {
1805 use std::io::Cursor;
1806 let node = Node::memory().spawn().await?;
1807
1808 let blobs = node.blobs();
1809 let input = vec![2u8; 1024 * 256]; let reader = Cursor::new(input.clone());
1811 let progress = blobs.add_reader(reader, SetTagOption::Auto).await?;
1812 let outcome = progress.finish().await?;
1813 let hash = outcome.hash;
1814 let output = blobs.read_to_bytes(hash).await?;
1815 assert_eq!(input, output.to_vec());
1816 Ok(())
1817 }
1818
1819 #[tokio::test]
1820 #[traced_test]
1821 async fn test_node_add_tagged_blob_event() -> Result<()> {
1822 let node = Node::memory().spawn().await?;
1823
1824 let _got_hash = tokio::time::timeout(Duration::from_secs(10), async move {
1825 let mut stream = node
1826 .blobs()
1827 .add_from_path(
1828 Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
1829 false,
1830 SetTagOption::Auto,
1831 WrapOption::NoWrap,
1832 )
1833 .await?;
1834
1835 while let Some(progress) = stream.next().await {
1836 match progress? {
1837 crate::provider::AddProgress::AllDone { hash, .. } => {
1838 return Ok(hash);
1839 }
1840 crate::provider::AddProgress::Abort(e) => {
1841 anyhow::bail!("Error while adding data: {e}");
1842 }
1843 _ => {}
1844 }
1845 }
1846 anyhow::bail!("stream ended without providing data");
1847 })
1848 .await
1849 .context("timeout")?
1850 .context("get failed")?;
1851
1852 Ok(())
1853 }
1854
1855 #[tokio::test]
1856 #[traced_test]
1857 async fn test_download_via_relay() -> Result<()> {
1858 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await?;
1859
1860 let endpoint1 = iroh::Endpoint::builder()
1861 .relay_mode(RelayMode::Custom(relay_map.clone()))
1862 .insecure_skip_relay_cert_verify(true);
1863 let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
1864 let endpoint2 = iroh::Endpoint::builder()
1865 .relay_mode(RelayMode::Custom(relay_map.clone()))
1866 .insecure_skip_relay_cert_verify(true);
1867 let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
1868 let AddOutcome { hash, .. } = node1.blobs().add_bytes(b"foo".to_vec()).await?;
1869
1870 let addr = NodeAddr::new(node1.node_id()).with_relay_url(relay_url);
1872 node2.blobs().download(hash, addr).await?.await?;
1873 assert_eq!(
1874 node2
1875 .blobs()
1876 .read_to_bytes(hash)
1877 .await
1878 .context("get")?
1879 .as_ref(),
1880 b"foo"
1881 );
1882 Ok(())
1883 }
1884
1885 #[tokio::test]
1886 #[traced_test]
1887 #[ignore = "flaky"]
1888 async fn test_download_via_relay_with_discovery() -> Result<()> {
1889 let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?;
1890 let dns_pkarr_server = DnsPkarrServer::run().await?;
1891
1892 let mut rng = rand::thread_rng();
1893
1894 let secret1 = SecretKey::generate(&mut rng);
1895 let endpoint1 = iroh::Endpoint::builder()
1896 .relay_mode(RelayMode::Custom(relay_map.clone()))
1897 .insecure_skip_relay_cert_verify(true)
1898 .dns_resolver(dns_pkarr_server.dns_resolver())
1899 .secret_key(secret1.clone())
1900 .discovery(dns_pkarr_server.discovery(secret1));
1901 let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
1902 let secret2 = SecretKey::generate(&mut rng);
1903 let endpoint2 = iroh::Endpoint::builder()
1904 .relay_mode(RelayMode::Custom(relay_map.clone()))
1905 .insecure_skip_relay_cert_verify(true)
1906 .dns_resolver(dns_pkarr_server.dns_resolver())
1907 .secret_key(secret2.clone())
1908 .discovery(dns_pkarr_server.discovery(secret2));
1909 let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
1910 let hash = node1.blobs().add_bytes(b"foo".to_vec()).await?.hash;
1911
1912 let addr = NodeAddr::new(node1.node_id());
1914 node2.blobs().download(hash, addr).await?.await?;
1915 assert_eq!(
1916 node2
1917 .blobs()
1918 .read_to_bytes(hash)
1919 .await
1920 .context("get")?
1921 .as_ref(),
1922 b"foo"
1923 );
1924 Ok(())
1925 }
1926}