1use std::{
60 future::Future,
61 io,
62 path::PathBuf,
63 pin::Pin,
64 sync::Arc,
65 task::{Context, Poll},
66};
67
68use anyhow::{anyhow, Context as _, Result};
69use bytes::Bytes;
70use futures_lite::{Stream, StreamExt};
71use futures_util::SinkExt;
72use genawaiter::sync::{Co, Gen};
73use iroh::NodeAddr;
74use portable_atomic::{AtomicU64, Ordering};
75use quic_rpc::{
76 client::{BoxStreamSync, BoxedConnector},
77 Connector, RpcClient,
78};
79use serde::{Deserialize, Serialize};
80use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
81use tokio_util::io::{ReaderStream, StreamReader};
82use tracing::warn;
83
84pub use crate::net_protocol::DownloadMode;
85use crate::{
86 export::ExportProgress as BytesExportProgress,
87 format::collection::{Collection, SimpleStore},
88 get::db::DownloadProgress as BytesDownloadProgress,
89 net_protocol::BlobDownloadRequest,
90 rpc::proto::RpcService,
91 store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
92 util::SetTagOption,
93 BlobFormat, Hash, Tag,
94};
95
96mod batch;
97pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};
98
99use super::{flatten, tags};
100use crate::rpc::proto::blobs::{
101 AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
102 BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
103 DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest,
104 ReadAtResponse, ValidateRequest,
105};
106
107#[derive(Debug, Clone)]
109#[repr(transparent)]
110pub struct Client<C = BoxedConnector<RpcService>> {
111 pub(super) rpc: RpcClient<RpcService, C>,
112}
113
114pub type MemClient = Client<crate::rpc::MemConnector>;
116
117impl<C> Client<C>
118where
119 C: Connector<RpcService>,
120{
121 pub fn new(rpc: RpcClient<RpcService, C>) -> Self {
123 Self { rpc }
124 }
125
126 pub fn tags(&self) -> tags::Client<C> {
128 tags::Client::new(self.rpc.clone())
129 }
130
131 pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
136 let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
137 Ok(status.0)
138 }
139
140 pub async fn has(&self, hash: Hash) -> Result<bool> {
144 match self.status(hash).await {
145 Ok(BlobStatus::Complete { .. }) => Ok(true),
146 Ok(_) => Ok(false),
147 Err(err) => Err(err),
148 }
149 }
150
151 pub async fn batch(&self) -> Result<Batch<C>> {
157 let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
158 let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
159 let rpc = self.rpc.clone();
160 Ok(Batch::new(batch, rpc, updates, 1024))
161 }
162
163 pub async fn read(&self, hash: Hash) -> Result<Reader> {
167 Reader::from_rpc_read(&self.rpc, hash).await
168 }
169
170 pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Reader> {
174 Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await
175 }
176
177 pub async fn read_to_bytes(&self, hash: Hash) -> Result<Bytes> {
183 Reader::from_rpc_read(&self.rpc, hash)
184 .await?
185 .read_to_bytes()
186 .await
187 }
188
189 pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Bytes> {
193 Reader::from_rpc_read_at(&self.rpc, hash, offset, len)
194 .await?
195 .read_to_bytes()
196 .await
197 }
198
199 pub async fn add_from_path(
206 &self,
207 path: PathBuf,
208 in_place: bool,
209 tag: SetTagOption,
210 wrap: WrapOption,
211 ) -> Result<AddProgress> {
212 let stream = self
213 .rpc
214 .server_streaming(AddPathRequest {
215 path,
216 in_place,
217 tag,
218 wrap,
219 })
220 .await?;
221 Ok(AddProgress::new(stream))
222 }
223
224 pub async fn create_collection(
229 &self,
230 collection: Collection,
231 tag: SetTagOption,
232 tags_to_delete: Vec<Tag>,
233 ) -> anyhow::Result<(Hash, Tag)> {
234 let CreateCollectionResponse { hash, tag } = self
235 .rpc
236 .rpc(CreateCollectionRequest {
237 collection,
238 tag,
239 tags_to_delete,
240 })
241 .await??;
242 Ok((hash, tag))
243 }
244
245 pub async fn add_reader(
247 &self,
248 reader: impl AsyncRead + Unpin + Send + 'static,
249 tag: SetTagOption,
250 ) -> anyhow::Result<AddProgress> {
251 const CAP: usize = 1024 * 64; let input = ReaderStream::with_capacity(reader, CAP);
253 self.add_stream(input, tag).await
254 }
255
256 pub async fn add_stream(
258 &self,
259 input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
260 tag: SetTagOption,
261 ) -> anyhow::Result<AddProgress> {
262 let (mut sink, progress) = self.rpc.bidi(AddStreamRequest { tag }).await?;
263 let mut input = input.map(|chunk| match chunk {
264 Ok(chunk) => Ok(AddStreamUpdate::Chunk(chunk)),
265 Err(err) => {
266 warn!("Abort send, reason: failed to read from source stream: {err:?}");
267 Ok(AddStreamUpdate::Abort)
268 }
269 });
270 tokio::spawn(async move {
271 if let Err(err) = sink.send_all(&mut input).await {
272 sink.send(AddStreamUpdate::Abort).await.ok();
275 warn!("Failed to send input stream to remote: {err:?}");
276 }
277 });
278
279 Ok(AddProgress::new(progress))
280 }
281
282 pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
284 let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
285 self.add_stream(input, SetTagOption::Auto).await?.await
286 }
287
288 pub async fn add_bytes_named(
290 &self,
291 bytes: impl Into<Bytes>,
292 name: impl Into<Tag>,
293 ) -> anyhow::Result<AddOutcome> {
294 let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
295 self.add_stream(input, SetTagOption::Named(name.into()))
296 .await?
297 .await
298 }
299
300 pub async fn validate(
304 &self,
305 repair: bool,
306 ) -> Result<impl Stream<Item = Result<ValidateProgress>>> {
307 let stream = self
308 .rpc
309 .server_streaming(ValidateRequest { repair })
310 .await?;
311 Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
312 }
313
314 pub async fn consistency_check(
318 &self,
319 repair: bool,
320 ) -> Result<impl Stream<Item = Result<ConsistencyCheckProgress>>> {
321 let stream = self
322 .rpc
323 .server_streaming(ConsistencyCheckRequest { repair })
324 .await?;
325 Ok(stream.map(|r| r.map_err(anyhow::Error::from)))
326 }
327
328 pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
330 self.download_with_opts(
331 hash,
332 DownloadOptions {
333 format: BlobFormat::Raw,
334 nodes: vec![node],
335 tag: SetTagOption::Auto,
336 mode: DownloadMode::Queued,
337 },
338 )
339 .await
340 }
341
342 pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
344 self.download_with_opts(
345 hash,
346 DownloadOptions {
347 format: BlobFormat::HashSeq,
348 nodes: vec![node],
349 tag: SetTagOption::Auto,
350 mode: DownloadMode::Queued,
351 },
352 )
353 .await
354 }
355
356 pub async fn download_with_opts(
358 &self,
359 hash: Hash,
360 opts: DownloadOptions,
361 ) -> Result<DownloadProgress> {
362 let DownloadOptions {
363 format,
364 nodes,
365 tag,
366 mode,
367 } = opts;
368 let stream = self
369 .rpc
370 .server_streaming(BlobDownloadRequest {
371 hash,
372 format,
373 nodes,
374 tag,
375 mode,
376 })
377 .await?;
378 Ok(DownloadProgress::new(
379 stream.map(|res| res.map_err(anyhow::Error::from)),
380 ))
381 }
382
383 pub async fn export(
393 &self,
394 hash: Hash,
395 destination: PathBuf,
396 format: ExportFormat,
397 mode: ExportMode,
398 ) -> Result<ExportProgress> {
399 let req = ExportRequest {
400 hash,
401 path: destination,
402 format,
403 mode,
404 };
405 let stream = self.rpc.server_streaming(req).await?;
406 Ok(ExportProgress::new(
407 stream.map(|r| r.map_err(anyhow::Error::from)),
408 ))
409 }
410
411 pub async fn list(&self) -> Result<impl Stream<Item = Result<BlobInfo>>> {
413 let stream = self.rpc.server_streaming(ListRequest).await?;
414 Ok(flatten(stream))
415 }
416
417 pub async fn list_incomplete(&self) -> Result<impl Stream<Item = Result<IncompleteBlobInfo>>> {
419 let stream = self.rpc.server_streaming(ListIncompleteRequest).await?;
420 Ok(flatten(stream))
421 }
422
423 pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
425 Collection::load(hash, self).await
426 }
427
428 pub fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
430 let this = self.clone();
431 Ok(Gen::new(|co| async move {
432 if let Err(cause) = this.list_collections_impl(&co).await {
433 co.yield_(Err(cause)).await;
434 }
435 }))
436 }
437
438 async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
439 let tags = self.tags_client();
440 let mut tags = tags.list_hash_seq().await?;
441 while let Some(tag) = tags.next().await {
442 let tag = tag?;
443 if let Ok(collection) = self.get_collection(tag.hash).await {
444 let info = CollectionInfo {
445 tag: tag.name,
446 hash: tag.hash,
447 total_blobs_count: Some(collection.len() as u64 + 1),
448 total_blobs_size: Some(0),
449 };
450 co.yield_(Ok(info)).await;
451 }
452 }
453 Ok(())
454 }
455
456 pub async fn delete_blob(&self, hash: Hash) -> Result<()> {
462 self.rpc.rpc(DeleteRequest { hash }).await??;
463 Ok(())
464 }
465
466 fn tags_client(&self) -> tags::Client<C> {
467 tags::Client::new(self.rpc.clone())
468 }
469}
470
471impl<C> SimpleStore for Client<C>
472where
473 C: Connector<RpcService>,
474{
475 async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
476 self.read_to_bytes(hash).await
477 }
478}
479
480#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
482pub enum ReadAtLen {
483 #[default]
485 All,
486 Exact(u64),
488 AtMost(u64),
490}
491
492impl ReadAtLen {
493 pub fn as_result_len(&self, size_remaining: u64) -> u64 {
495 match self {
496 ReadAtLen::All => size_remaining,
497 ReadAtLen::Exact(len) => *len,
498 ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining),
499 }
500 }
501}
502
503#[derive(Debug, Serialize, Deserialize, Default, Clone)]
505pub enum WrapOption {
506 #[default]
508 NoWrap,
509 Wrap {
511 name: Option<String>,
513 },
514}
515
516#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
518pub enum BlobStatus {
519 NotFound,
521 Partial {
523 size: BaoBlobSize,
525 },
526 Complete {
528 size: u64,
530 },
531}
532
533#[derive(Debug, Clone)]
535pub struct AddOutcome {
536 pub hash: Hash,
538 pub format: BlobFormat,
540 pub size: u64,
542 pub tag: Tag,
544}
545
546#[derive(Debug, Serialize, Deserialize)]
548pub struct CollectionInfo {
549 pub tag: Tag,
551
552 pub hash: Hash,
554 pub total_blobs_count: Option<u64>,
558 pub total_blobs_size: Option<u64>,
562}
563
564#[derive(Debug, Serialize, Deserialize)]
566pub struct BlobInfo {
567 pub path: String,
569 pub hash: Hash,
571 pub size: u64,
573}
574
575#[derive(Debug, Serialize, Deserialize)]
577pub struct IncompleteBlobInfo {
578 pub size: u64,
580 pub expected_size: u64,
582 pub hash: Hash,
584}
585
586#[derive(derive_more::Debug)]
588pub struct AddProgress {
589 #[debug(skip)]
590 stream:
591 Pin<Box<dyn Stream<Item = Result<crate::provider::AddProgress>> + Send + Unpin + 'static>>,
592 current_total_size: Arc<AtomicU64>,
593}
594
595impl AddProgress {
596 fn new(
597 stream: (impl Stream<
598 Item = Result<impl Into<crate::provider::AddProgress>, impl Into<anyhow::Error>>,
599 > + Send
600 + Unpin
601 + 'static),
602 ) -> Self {
603 let current_total_size = Arc::new(AtomicU64::new(0));
604 let total_size = current_total_size.clone();
605 let stream = stream.map(move |item| match item {
606 Ok(item) => {
607 let item = item.into();
608 if let crate::provider::AddProgress::Found { size, .. } = &item {
609 total_size.fetch_add(*size, Ordering::Relaxed);
610 }
611 Ok(item)
612 }
613 Err(err) => Err(err.into()),
614 });
615 Self {
616 stream: Box::pin(stream),
617 current_total_size,
618 }
619 }
620 pub async fn finish(self) -> Result<AddOutcome> {
628 self.await
629 }
630}
631
632impl Stream for AddProgress {
633 type Item = Result<crate::provider::AddProgress>;
634 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
635 Pin::new(&mut self.stream).poll_next(cx)
636 }
637}
638
639impl Future for AddProgress {
640 type Output = Result<AddOutcome>;
641
642 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
643 loop {
644 match Pin::new(&mut self.stream).poll_next(cx) {
645 Poll::Pending => return Poll::Pending,
646 Poll::Ready(None) => {
647 return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
648 }
649 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
650 Poll::Ready(Some(Ok(msg))) => match msg {
651 crate::provider::AddProgress::AllDone { hash, format, tag } => {
652 let outcome = AddOutcome {
653 hash,
654 format,
655 tag,
656 size: self.current_total_size.load(Ordering::Relaxed),
657 };
658 return Poll::Ready(Ok(outcome));
659 }
660 crate::provider::AddProgress::Abort(err) => {
661 return Poll::Ready(Err(err.into()));
662 }
663 _ => {}
664 },
665 }
666 }
667 }
668}
669
670#[derive(Debug, Clone)]
672pub struct DownloadOutcome {
673 pub local_size: u64,
675 pub downloaded_size: u64,
677 pub stats: crate::get::Stats,
679}
680
681#[derive(derive_more::Debug)]
683pub struct DownloadProgress {
684 #[debug(skip)]
685 stream: Pin<Box<dyn Stream<Item = Result<BytesDownloadProgress>> + Send + Unpin + 'static>>,
686 current_local_size: Arc<AtomicU64>,
687 current_network_size: Arc<AtomicU64>,
688}
689
690impl DownloadProgress {
691 pub fn new(
693 stream: (impl Stream<Item = Result<impl Into<BytesDownloadProgress>, impl Into<anyhow::Error>>>
694 + Send
695 + Unpin
696 + 'static),
697 ) -> Self {
698 let current_local_size = Arc::new(AtomicU64::new(0));
699 let current_network_size = Arc::new(AtomicU64::new(0));
700
701 let local_size = current_local_size.clone();
702 let network_size = current_network_size.clone();
703
704 let stream = stream.map(move |item| match item {
705 Ok(item) => {
706 let item = item.into();
707 match &item {
708 BytesDownloadProgress::FoundLocal { size, .. } => {
709 local_size.fetch_add(size.value(), Ordering::Relaxed);
710 }
711 BytesDownloadProgress::Found { size, .. } => {
712 network_size.fetch_add(*size, Ordering::Relaxed);
713 }
714 _ => {}
715 }
716
717 Ok(item)
718 }
719 Err(err) => Err(err.into()),
720 });
721 Self {
722 stream: Box::pin(stream),
723 current_local_size,
724 current_network_size,
725 }
726 }
727
728 pub async fn finish(self) -> Result<DownloadOutcome> {
734 self.await
735 }
736}
737
738impl Stream for DownloadProgress {
739 type Item = Result<BytesDownloadProgress>;
740 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
741 Pin::new(&mut self.stream).poll_next(cx)
742 }
743}
744
745impl Future for DownloadProgress {
746 type Output = Result<DownloadOutcome>;
747
748 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
749 loop {
750 match Pin::new(&mut self.stream).poll_next(cx) {
751 Poll::Pending => return Poll::Pending,
752 Poll::Ready(None) => {
753 return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
754 }
755 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
756 Poll::Ready(Some(Ok(msg))) => match msg {
757 BytesDownloadProgress::AllDone(stats) => {
758 let outcome = DownloadOutcome {
759 local_size: self.current_local_size.load(Ordering::Relaxed),
760 downloaded_size: self.current_network_size.load(Ordering::Relaxed),
761 stats,
762 };
763 return Poll::Ready(Ok(outcome));
764 }
765 BytesDownloadProgress::Abort(err) => {
766 return Poll::Ready(Err(err.into()));
767 }
768 _ => {}
769 },
770 }
771 }
772 }
773}
774
775#[derive(Debug, Clone, PartialEq, Eq)]
777pub struct ExportOutcome {
778 total_size: u64,
780}
781
782#[derive(derive_more::Debug)]
784pub struct ExportProgress {
785 #[debug(skip)]
786 stream: Pin<Box<dyn Stream<Item = Result<BytesExportProgress>> + Send + Unpin + 'static>>,
787 current_total_size: Arc<AtomicU64>,
788}
789
790impl ExportProgress {
791 pub fn new(
794 stream: (impl Stream<Item = Result<impl Into<BytesExportProgress>, impl Into<anyhow::Error>>>
795 + Send
796 + Unpin
797 + 'static),
798 ) -> Self {
799 let current_total_size = Arc::new(AtomicU64::new(0));
800 let total_size = current_total_size.clone();
801 let stream = stream.map(move |item| match item {
802 Ok(item) => {
803 let item = item.into();
804 if let BytesExportProgress::Found { size, .. } = &item {
805 let size = size.value();
806 total_size.fetch_add(size, Ordering::Relaxed);
807 }
808
809 Ok(item)
810 }
811 Err(err) => Err(err.into()),
812 });
813 Self {
814 stream: Box::pin(stream),
815 current_total_size,
816 }
817 }
818
819 pub async fn finish(self) -> Result<ExportOutcome> {
823 self.await
824 }
825}
826
827impl Stream for ExportProgress {
828 type Item = Result<BytesExportProgress>;
829 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
830 Pin::new(&mut self.stream).poll_next(cx)
831 }
832}
833
834impl Future for ExportProgress {
835 type Output = Result<ExportOutcome>;
836
837 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
838 loop {
839 match Pin::new(&mut self.stream).poll_next(cx) {
840 Poll::Pending => return Poll::Pending,
841 Poll::Ready(None) => {
842 return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
843 }
844 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
845 Poll::Ready(Some(Ok(msg))) => match msg {
846 BytesExportProgress::AllDone => {
847 let outcome = ExportOutcome {
848 total_size: self.current_total_size.load(Ordering::Relaxed),
849 };
850 return Poll::Ready(Ok(outcome));
851 }
852 BytesExportProgress::Abort(err) => {
853 return Poll::Ready(Err(err.into()));
854 }
855 _ => {}
856 },
857 }
858 }
859 }
860}
861
862#[derive(derive_more::Debug)]
866pub struct Reader {
867 size: u64,
868 response_size: u64,
869 is_complete: bool,
870 #[debug("StreamReader")]
871 stream: tokio_util::io::StreamReader<BoxStreamSync<'static, io::Result<Bytes>>, Bytes>,
872}
873
874impl Reader {
875 fn new(
876 size: u64,
877 response_size: u64,
878 is_complete: bool,
879 stream: BoxStreamSync<'static, io::Result<Bytes>>,
880 ) -> Self {
881 Self {
882 size,
883 response_size,
884 is_complete,
885 stream: StreamReader::new(stream),
886 }
887 }
888
889 pub async fn from_rpc_read<C>(
891 rpc: &RpcClient<RpcService, C>,
892 hash: Hash,
893 ) -> anyhow::Result<Self>
894 where
895 C: Connector<RpcService>,
896 {
897 Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await
898 }
899
900 async fn from_rpc_read_at<C>(
901 rpc: &RpcClient<RpcService, C>,
902 hash: Hash,
903 offset: u64,
904 len: ReadAtLen,
905 ) -> anyhow::Result<Self>
906 where
907 C: Connector<RpcService>,
908 {
909 let stream = rpc
910 .server_streaming(ReadAtRequest { hash, offset, len })
911 .await?;
912 let mut stream = flatten(stream);
913
914 let (size, is_complete) = match stream.next().await {
915 Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
916 Some(Err(err)) => return Err(err),
917 Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
918 None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
919 };
920
921 let stream = stream.map(|item| match item {
922 Ok(ReadAtResponse::Data { chunk }) => Ok(chunk),
923 Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
924 Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
925 });
926 let len = len.as_result_len(size.value() - offset);
927 Ok(Self::new(size.value(), len, is_complete, Box::pin(stream)))
928 }
929
930 pub fn size(&self) -> u64 {
932 self.size
933 }
934
935 pub fn is_complete(&self) -> bool {
939 self.is_complete
940 }
941
942 pub async fn read_to_bytes(&mut self) -> anyhow::Result<Bytes> {
944 let mut buf = Vec::with_capacity(self.response_size as usize);
945 self.read_to_end(&mut buf).await?;
946 Ok(buf.into())
947 }
948}
949
950impl AsyncRead for Reader {
951 fn poll_read(
952 mut self: Pin<&mut Self>,
953 cx: &mut Context<'_>,
954 buf: &mut ReadBuf<'_>,
955 ) -> Poll<io::Result<()>> {
956 Pin::new(&mut self.stream).poll_read(cx, buf)
957 }
958}
959
960impl Stream for Reader {
961 type Item = io::Result<Bytes>;
962
963 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
964 Pin::new(&mut self.stream).get_pin_mut().poll_next(cx)
965 }
966
967 fn size_hint(&self) -> (usize, Option<usize>) {
968 self.stream.get_ref().size_hint()
969 }
970}
971
972#[derive(Debug, Clone, Serialize, Deserialize)]
974pub struct DownloadOptions {
975 pub format: BlobFormat,
977 pub nodes: Vec<NodeAddr>,
984 pub tag: SetTagOption,
986 pub mode: DownloadMode,
988}
989
990fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream<Item = Bytes> {
991 futures_lite::stream::iter(std::iter::from_fn(move || {
992 Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty())
993 }))
994}
995
996#[cfg(test)]
997mod tests {
998 use std::{path::Path, time::Duration};
999
1000 use iroh::{test_utils::DnsPkarrServer, NodeId, RelayMode, SecretKey};
1001 use node::Node;
1002 use rand::RngCore;
1003 use testresult::TestResult;
1004 use tokio::{io::AsyncWriteExt, sync::mpsc};
1005 use tracing_test::traced_test;
1006
1007 use super::*;
1008 use crate::{hashseq::HashSeq, ticket::BlobTicket};
1009
1010 mod node {
1011 use std::path::Path;
1013
1014 use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
1015 use tokio_util::task::AbortOnDropHandle;
1016
1017 use super::RpcService;
1018 use crate::{
1019 net_protocol::Blobs,
1020 provider::{CustomEventSender, EventSender},
1021 rpc::client::{blobs, tags},
1022 };
1023
1024 type RpcClient = quic_rpc::RpcClient<RpcService>;
1025
1026 #[derive(Debug)]
1028 pub struct Node {
1029 router: iroh::protocol::Router,
1030 client: RpcClient,
1031 _rpc_task: AbortOnDropHandle<()>,
1032 }
1033
1034 #[derive(Debug)]
1036 pub struct Builder<S> {
1037 store: S,
1038 events: EventSender,
1039 endpoint: Option<iroh::endpoint::Builder>,
1040 }
1041
1042 impl<S: crate::store::Store> Builder<S> {
1043 pub fn blobs_events(self, events: impl CustomEventSender) -> Self {
1045 Self {
1046 events: events.into(),
1047 ..self
1048 }
1049 }
1050
1051 pub fn endpoint(self, endpoint: iroh::endpoint::Builder) -> Self {
1053 Self {
1054 endpoint: Some(endpoint),
1055 ..self
1056 }
1057 }
1058
1059 pub async fn spawn(self) -> anyhow::Result<Node> {
1061 let store = self.store;
1062 let events = self.events;
1063 let endpoint = self
1064 .endpoint
1065 .unwrap_or_else(|| Endpoint::builder().discovery_n0())
1066 .bind()
1067 .await?;
1068 let mut router = Router::builder(endpoint.clone());
1069
1070 let blobs = Blobs::builder(store.clone())
1072 .events(events)
1073 .build(&endpoint);
1074 router = router.accept(crate::ALPN, blobs.clone());
1075
1076 let router = router.spawn().await?;
1078
1079 let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32);
1081 let internal_rpc = quic_rpc::RpcServer::new(internal_rpc).boxed();
1082 let _rpc_task = internal_rpc.spawn_accept_loop(move |msg, chan| {
1083 blobs.clone().handle_rpc_request(msg, chan)
1084 });
1085 let client = quic_rpc::RpcClient::new(controller).boxed();
1086 Ok(Node {
1087 router,
1088 client,
1089 _rpc_task,
1090 })
1091 }
1092 }
1093
1094 impl Node {
1095 pub fn memory() -> Builder<crate::store::mem::Store> {
1097 Builder {
1098 store: crate::store::mem::Store::new(),
1099 events: Default::default(),
1100 endpoint: None,
1101 }
1102 }
1103
1104 pub async fn persistent(
1106 path: impl AsRef<Path>,
1107 ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
1108 Ok(Builder {
1109 store: crate::store::fs::Store::load(path).await?,
1110 events: Default::default(),
1111 endpoint: None,
1112 })
1113 }
1114
1115 pub fn node_id(&self) -> NodeId {
1117 self.router.endpoint().node_id()
1118 }
1119
1120 pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
1122 self.router.endpoint().node_addr().await
1123 }
1124
1125 pub async fn shutdown(self) -> anyhow::Result<()> {
1127 self.router.shutdown().await
1128 }
1129
1130 pub fn blobs(&self) -> blobs::Client {
1132 blobs::Client::new(self.client.clone())
1133 }
1134
1135 pub fn tags(&self) -> tags::Client {
1137 tags::Client::new(self.client.clone())
1138 }
1139 }
1140 }
1141
1142 #[tokio::test]
1143 #[traced_test]
1144 async fn test_blob_create_collection() -> Result<()> {
1145 let node = node::Node::memory().spawn().await?;
1146
1147 let temp_dir = tempfile::tempdir().context("tempdir")?;
1149
1150 let in_root = temp_dir.path().join("in");
1151 tokio::fs::create_dir_all(in_root.clone())
1152 .await
1153 .context("create dir all")?;
1154
1155 let mut paths = Vec::new();
1156 for i in 0..5 {
1157 let path = in_root.join(format!("test-{i}"));
1158 let size = 100;
1159 let mut buf = vec![0u8; size];
1160 rand::thread_rng().fill_bytes(&mut buf);
1161 let mut file = tokio::fs::File::create(path.clone())
1162 .await
1163 .context("create file")?;
1164 file.write_all(&buf.clone()).await.context("write_all")?;
1165 file.flush().await.context("flush")?;
1166 paths.push(path);
1167 }
1168
1169 let blobs = node.blobs();
1170
1171 let mut collection = Collection::default();
1172 let mut tags = Vec::new();
1173 for path in &paths {
1175 let import_outcome = blobs
1176 .add_from_path(
1177 path.to_path_buf(),
1178 false,
1179 SetTagOption::Auto,
1180 WrapOption::NoWrap,
1181 )
1182 .await
1183 .context("import file")?
1184 .finish()
1185 .await
1186 .context("import finish")?;
1187
1188 collection.push(
1189 path.file_name().unwrap().to_str().unwrap().to_string(),
1190 import_outcome.hash,
1191 );
1192 tags.push(import_outcome.tag);
1193 }
1194
1195 let (hash, tag) = blobs
1196 .create_collection(collection, SetTagOption::Auto, tags)
1197 .await?;
1198
1199 let collections: Vec<_> = blobs.list_collections()?.try_collect().await?;
1200
1201 assert_eq!(collections.len(), 1);
1202 {
1203 let CollectionInfo {
1204 tag,
1205 hash,
1206 total_blobs_count,
1207 ..
1208 } = &collections[0];
1209 assert_eq!(tag, tag);
1210 assert_eq!(hash, hash);
1211 assert_eq!(total_blobs_count, &Some(5 + 1));
1213 }
1214
1215 let tags: Vec<_> = node.tags().list().await?.try_collect().await?;
1217 assert_eq!(tags.len(), 1);
1218 assert_eq!(tags[0].hash, hash);
1219 assert_eq!(tags[0].name, tag);
1220 assert_eq!(tags[0].format, BlobFormat::HashSeq);
1221
1222 Ok(())
1223 }
1224
1225 #[tokio::test]
1226 #[traced_test]
1227 async fn test_blob_read_at() -> Result<()> {
1228 let node = node::Node::memory().spawn().await?;
1229
1230 let temp_dir = tempfile::tempdir().context("tempdir")?;
1232
1233 let in_root = temp_dir.path().join("in");
1234 tokio::fs::create_dir_all(in_root.clone())
1235 .await
1236 .context("create dir all")?;
1237
1238 let path = in_root.join("test-blob");
1239 let size = 1024 * 128;
1240 let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
1241 let mut file = tokio::fs::File::create(path.clone())
1242 .await
1243 .context("create file")?;
1244 file.write_all(&buf.clone()).await.context("write_all")?;
1245 file.flush().await.context("flush")?;
1246
1247 let blobs = node.blobs();
1248
1249 let import_outcome = blobs
1250 .add_from_path(
1251 path.to_path_buf(),
1252 false,
1253 SetTagOption::Auto,
1254 WrapOption::NoWrap,
1255 )
1256 .await
1257 .context("import file")?
1258 .finish()
1259 .await
1260 .context("import finish")?;
1261
1262 let hash = import_outcome.hash;
1263
1264 let res = blobs.read_to_bytes(hash).await?;
1266 assert_eq!(&res, &buf[..]);
1267
1268 let res = blobs
1270 .read_at_to_bytes(hash, 0, ReadAtLen::Exact(100))
1271 .await?;
1272 assert_eq!(res.len(), 100);
1273 assert_eq!(&res[..], &buf[0..100]);
1274
1275 let res = blobs
1276 .read_at_to_bytes(hash, 20, ReadAtLen::Exact(120))
1277 .await?;
1278 assert_eq!(res.len(), 120);
1279 assert_eq!(&res[..], &buf[20..140]);
1280
1281 let res = blobs
1283 .read_at_to_bytes(hash, 0, ReadAtLen::Exact(1024 * 64))
1284 .await?;
1285 assert_eq!(res.len(), 1024 * 64);
1286 assert_eq!(&res[..], &buf[0..1024 * 64]);
1287
1288 let res = blobs
1289 .read_at_to_bytes(hash, 20, ReadAtLen::Exact(1024 * 64))
1290 .await?;
1291 assert_eq!(res.len(), 1024 * 64);
1292 assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]);
1293
1294 let res = blobs
1296 .read_at_to_bytes(hash, 0, ReadAtLen::Exact(10 + 1024 * 64))
1297 .await?;
1298 assert_eq!(res.len(), 10 + 1024 * 64);
1299 assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]);
1300
1301 let res = blobs
1302 .read_at_to_bytes(hash, 20, ReadAtLen::Exact(10 + 1024 * 64))
1303 .await?;
1304 assert_eq!(res.len(), 10 + 1024 * 64);
1305 assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]);
1306
1307 let res = blobs.read_at_to_bytes(hash, 20, ReadAtLen::All).await?;
1309 assert_eq!(res.len(), 1024 * 128 - 20);
1310 assert_eq!(&res[..], &buf[20..]);
1311
1312 let reader = blobs.read_at(hash, 0, ReadAtLen::Exact(20)).await?;
1314 assert_eq!(reader.size(), 1024 * 128);
1315 assert_eq!(reader.response_size, 20);
1316
1317 let res = blobs
1319 .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024))
1320 .await?;
1321 assert_eq!(res.len(), 1024);
1322 assert_eq!(res, &buf[1024 * 127..]);
1323
1324 let res = blobs
1326 .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All)
1327 .await?;
1328 assert_eq!(res.len(), 1024);
1329 assert_eq!(res, &buf[1024 * 127..]);
1330
1331 let mut res = blobs
1333 .read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048))
1334 .await?;
1335 assert_eq!(res.size, 1024 * 128);
1336 assert_eq!(res.response_size, 1024);
1337 let res = res.read_to_bytes().await?;
1338 assert_eq!(res.len(), 1024);
1339 assert_eq!(res, &buf[1024 * 127..]);
1340
1341 let res = blobs
1343 .read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1))
1344 .await;
1345 let err = res.unwrap_err();
1346 assert!(err.to_string().contains("out of bound"));
1347
1348 let res = blobs.read_at(hash, 1024 * 128 + 1, ReadAtLen::All).await;
1350 let err = res.unwrap_err();
1351 assert!(err.to_string().contains("out of range"));
1352
1353 let res = blobs
1355 .read_at(hash, 1024 * 127, ReadAtLen::Exact(1025))
1356 .await;
1357 let err = res.unwrap_err();
1358 assert!(err.to_string().contains("out of bound"));
1359
1360 Ok(())
1361 }
1362
1363 #[tokio::test]
1364 #[traced_test]
1365 async fn test_blob_get_collection() -> Result<()> {
1366 let node = node::Node::memory().spawn().await?;
1367
1368 let temp_dir = tempfile::tempdir().context("tempdir")?;
1370
1371 let in_root = temp_dir.path().join("in");
1372 tokio::fs::create_dir_all(in_root.clone())
1373 .await
1374 .context("create dir all")?;
1375
1376 let mut paths = Vec::new();
1377 for i in 0..5 {
1378 let path = in_root.join(format!("test-{i}"));
1379 let size = 100;
1380 let mut buf = vec![0u8; size];
1381 rand::thread_rng().fill_bytes(&mut buf);
1382 let mut file = tokio::fs::File::create(path.clone())
1383 .await
1384 .context("create file")?;
1385 file.write_all(&buf.clone()).await.context("write_all")?;
1386 file.flush().await.context("flush")?;
1387 paths.push(path);
1388 }
1389
1390 let blobs = node.blobs();
1391
1392 let mut collection = Collection::default();
1393 let mut tags = Vec::new();
1394 for path in &paths {
1396 let import_outcome = blobs
1397 .add_from_path(
1398 path.to_path_buf(),
1399 false,
1400 SetTagOption::Auto,
1401 WrapOption::NoWrap,
1402 )
1403 .await
1404 .context("import file")?
1405 .finish()
1406 .await
1407 .context("import finish")?;
1408
1409 collection.push(
1410 path.file_name().unwrap().to_str().unwrap().to_string(),
1411 import_outcome.hash,
1412 );
1413 tags.push(import_outcome.tag);
1414 }
1415
1416 let (hash, _tag) = blobs
1417 .create_collection(collection, SetTagOption::Auto, tags)
1418 .await?;
1419
1420 let collection = blobs.get_collection(hash).await?;
1421
1422 assert_eq!(collection.len(), 5);
1424
1425 Ok(())
1426 }
1427
1428 #[tokio::test]
1429 #[traced_test]
1430 async fn test_blob_share() -> Result<()> {
1431 let node = node::Node::memory().spawn().await?;
1432
1433 let temp_dir = tempfile::tempdir().context("tempdir")?;
1435
1436 let in_root = temp_dir.path().join("in");
1437 tokio::fs::create_dir_all(in_root.clone())
1438 .await
1439 .context("create dir all")?;
1440
1441 let path = in_root.join("test-blob");
1442 let size = 1024 * 128;
1443 let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
1444 let mut file = tokio::fs::File::create(path.clone())
1445 .await
1446 .context("create file")?;
1447 file.write_all(&buf.clone()).await.context("write_all")?;
1448 file.flush().await.context("flush")?;
1449
1450 let blobs = node.blobs();
1451
1452 let import_outcome = blobs
1453 .add_from_path(
1454 path.to_path_buf(),
1455 false,
1456 SetTagOption::Auto,
1457 WrapOption::NoWrap,
1458 )
1459 .await
1460 .context("import file")?
1461 .finish()
1462 .await
1463 .context("import finish")?;
1464
1465 let status = blobs.status(import_outcome.hash).await?;
1471 assert_eq!(status, BlobStatus::Complete { size });
1472
1473 Ok(())
1474 }
1475
1476 #[derive(Debug, Clone)]
1477 struct BlobEvents {
1478 sender: mpsc::Sender<crate::provider::Event>,
1479 }
1480
1481 impl BlobEvents {
1482 fn new(cap: usize) -> (Self, mpsc::Receiver<crate::provider::Event>) {
1483 let (s, r) = mpsc::channel(cap);
1484 (Self { sender: s }, r)
1485 }
1486 }
1487
1488 impl crate::provider::CustomEventSender for BlobEvents {
1489 fn send(&self, event: crate::provider::Event) -> futures_lite::future::Boxed<()> {
1490 let sender = self.sender.clone();
1491 Box::pin(async move {
1492 sender.send(event).await.ok();
1493 })
1494 }
1495
1496 fn try_send(&self, event: crate::provider::Event) {
1497 self.sender.try_send(event).ok();
1498 }
1499 }
1500
1501 #[tokio::test]
1502 #[traced_test]
1503 async fn test_blob_provide_events() -> Result<()> {
1504 let (node1_events, mut node1_events_r) = BlobEvents::new(16);
1505 let node1 = node::Node::memory()
1506 .blobs_events(node1_events)
1507 .spawn()
1508 .await?;
1509
1510 let (node2_events, mut node2_events_r) = BlobEvents::new(16);
1511 let node2 = node::Node::memory()
1512 .blobs_events(node2_events)
1513 .spawn()
1514 .await?;
1515
1516 let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?;
1517
1518 let node1_addr = node1.node_addr().await?;
1520 let res = node2
1521 .blobs()
1522 .download(import_outcome.hash, node1_addr)
1523 .await?
1524 .await?;
1525 dbg!(&res);
1526 assert_eq!(res.local_size, 0);
1527 assert_eq!(res.downloaded_size, 11);
1528
1529 node1.shutdown().await?;
1530 node2.shutdown().await?;
1531
1532 let mut ev1 = Vec::new();
1533 while let Some(ev) = node1_events_r.recv().await {
1534 ev1.push(ev);
1535 }
1536 assert!(matches!(
1538 ev1[0],
1539 crate::provider::Event::ClientConnected { .. }
1540 ));
1541 assert!(matches!(
1542 ev1[1],
1543 crate::provider::Event::GetRequestReceived { .. }
1544 ));
1545 assert!(matches!(
1546 ev1[2],
1547 crate::provider::Event::TransferProgress { .. }
1548 ));
1549 assert!(matches!(
1550 ev1[3],
1551 crate::provider::Event::TransferCompleted { .. }
1552 ));
1553 dbg!(&ev1);
1554
1555 let mut ev2 = Vec::new();
1556 while let Some(ev) = node2_events_r.recv().await {
1557 ev2.push(ev);
1558 }
1559
1560 assert!(ev2.is_empty());
1562 Ok(())
1563 }
1564 #[tokio::test]
1566 #[traced_test]
1567 async fn test_blob_get_self_existing() -> TestResult<()> {
1568 let node = node::Node::memory().spawn().await?;
1569 let node_id = node.node_id();
1570 let blobs = node.blobs();
1571
1572 let AddOutcome { hash, size, .. } = blobs.add_bytes("foo").await?;
1573
1574 let res = blobs
1576 .download_with_opts(
1577 hash,
1578 DownloadOptions {
1579 format: BlobFormat::Raw,
1580 nodes: vec![node_id.into()],
1581 tag: SetTagOption::Auto,
1582 mode: DownloadMode::Direct,
1583 },
1584 )
1585 .await?
1586 .await?;
1587
1588 assert_eq!(res.local_size, size);
1589 assert_eq!(res.downloaded_size, 0);
1590
1591 let res = blobs
1593 .download_with_opts(
1594 hash,
1595 DownloadOptions {
1596 format: BlobFormat::Raw,
1597 nodes: vec![node_id.into()],
1598 tag: SetTagOption::Auto,
1599 mode: DownloadMode::Queued,
1600 },
1601 )
1602 .await?
1603 .await?;
1604
1605 assert_eq!(res.local_size, size);
1606 assert_eq!(res.downloaded_size, 0);
1607
1608 Ok(())
1609 }
1610
1611 #[tokio::test]
1613 #[traced_test]
1614 async fn test_blob_get_self_missing() -> TestResult<()> {
1615 let node = node::Node::memory().spawn().await?;
1616 let node_id = node.node_id();
1617 let blobs = node.blobs();
1618
1619 let hash = Hash::from_bytes([0u8; 32]);
1620
1621 let res = blobs
1623 .download_with_opts(
1624 hash,
1625 DownloadOptions {
1626 format: BlobFormat::Raw,
1627 nodes: vec![node_id.into()],
1628 tag: SetTagOption::Auto,
1629 mode: DownloadMode::Direct,
1630 },
1631 )
1632 .await?
1633 .await;
1634 assert!(res.is_err());
1635 assert_eq!(
1636 res.err().unwrap().to_string().as_str(),
1637 "No nodes to download from provided"
1638 );
1639
1640 let res = blobs
1642 .download_with_opts(
1643 hash,
1644 DownloadOptions {
1645 format: BlobFormat::Raw,
1646 nodes: vec![node_id.into()],
1647 tag: SetTagOption::Auto,
1648 mode: DownloadMode::Queued,
1649 },
1650 )
1651 .await?
1652 .await;
1653 assert!(res.is_err());
1654 assert_eq!(
1655 res.err().unwrap().to_string().as_str(),
1656 "No provider nodes found"
1657 );
1658
1659 Ok(())
1660 }
1661
1662 #[tokio::test]
1664 #[traced_test]
1665 async fn test_blob_get_existing_collection() -> TestResult<()> {
1666 let node = node::Node::memory().spawn().await?;
1667 let node_id = NodeId::from_bytes(&[0u8; 32])?;
1670 let blobs = node.blobs();
1671
1672 let mut collection = Collection::default();
1673 let mut tags = Vec::new();
1674 let mut size = 0;
1675 for value in ["iroh", "is", "cool"] {
1676 let import_outcome = blobs.add_bytes(value).await.context("add bytes")?;
1677 collection.push(value.to_string(), import_outcome.hash);
1678 tags.push(import_outcome.tag);
1679 size += import_outcome.size;
1680 }
1681
1682 let (hash, _tag) = blobs
1683 .create_collection(collection, SetTagOption::Auto, tags)
1684 .await?;
1685
1686 let hashseq_bytes = blobs.read_to_bytes(hash).await?;
1688 size += hashseq_bytes.len() as u64;
1689 let hashseq = HashSeq::try_from(hashseq_bytes)?;
1690 let collection_header_bytes = blobs
1691 .read_to_bytes(hashseq.into_iter().next().expect("header to exist"))
1692 .await?;
1693 size += collection_header_bytes.len() as u64;
1694
1695 let res = blobs
1697 .download_with_opts(
1698 hash,
1699 DownloadOptions {
1700 format: BlobFormat::HashSeq,
1701 nodes: vec![node_id.into()],
1702 tag: SetTagOption::Auto,
1703 mode: DownloadMode::Direct,
1704 },
1705 )
1706 .await?
1707 .await
1708 .context("direct (download)")?;
1709
1710 assert_eq!(res.local_size, size);
1711 assert_eq!(res.downloaded_size, 0);
1712
1713 let res = blobs
1715 .download_with_opts(
1716 hash,
1717 DownloadOptions {
1718 format: BlobFormat::HashSeq,
1719 nodes: vec![node_id.into()],
1720 tag: SetTagOption::Auto,
1721 mode: DownloadMode::Queued,
1722 },
1723 )
1724 .await?
1725 .await
1726 .context("queued")?;
1727
1728 assert_eq!(res.local_size, size);
1729 assert_eq!(res.downloaded_size, 0);
1730
1731 Ok(())
1732 }
1733
1734 #[tokio::test]
1735 #[traced_test]
1736 #[cfg_attr(target_os = "windows", ignore = "flaky")]
1737 async fn test_blob_delete_mem() -> Result<()> {
1738 let node = node::Node::memory().spawn().await?;
1739
1740 let res = node.blobs().add_bytes(&b"hello world"[..]).await?;
1741
1742 let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
1743 assert_eq!(hashes.len(), 1);
1744 assert_eq!(hashes[0].hash, res.hash);
1745
1746 node.blobs().delete_blob(res.hash).await?;
1748
1749 let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
1750 assert!(hashes.is_empty());
1751
1752 Ok(())
1753 }
1754
1755 #[tokio::test]
1756 #[traced_test]
1757 async fn test_blob_delete_fs() -> Result<()> {
1758 let dir = tempfile::tempdir()?;
1759 let node = node::Node::persistent(dir.path()).await?.spawn().await?;
1760
1761 let res = node.blobs().add_bytes(&b"hello world"[..]).await?;
1762
1763 let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
1764 assert_eq!(hashes.len(), 1);
1765 assert_eq!(hashes[0].hash, res.hash);
1766
1767 node.blobs().delete_blob(res.hash).await?;
1769
1770 let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
1771 assert!(hashes.is_empty());
1772
1773 Ok(())
1774 }
1775
1776 #[tokio::test]
1777 #[traced_test]
1778 async fn test_ticket_multiple_addrs() -> TestResult<()> {
1779 let node = Node::memory().spawn().await?;
1780 let hash = node
1781 .blobs()
1782 .add_bytes(Bytes::from_static(b"hello"))
1783 .await?
1784 .hash;
1785
1786 let addr = node.node_addr().await?;
1787 let ticket = BlobTicket::new(addr, hash, BlobFormat::Raw)?;
1788 println!("addrs: {:?}", ticket.node_addr());
1789 assert!(!ticket.node_addr().direct_addresses.is_empty());
1790 Ok(())
1791 }
1792
1793 #[tokio::test]
1794 #[traced_test]
1795 async fn test_node_add_blob_stream() -> Result<()> {
1796 use std::io::Cursor;
1797 let node = Node::memory().spawn().await?;
1798
1799 let blobs = node.blobs();
1800 let input = vec![2u8; 1024 * 256]; let reader = Cursor::new(input.clone());
1802 let progress = blobs.add_reader(reader, SetTagOption::Auto).await?;
1803 let outcome = progress.finish().await?;
1804 let hash = outcome.hash;
1805 let output = blobs.read_to_bytes(hash).await?;
1806 assert_eq!(input, output.to_vec());
1807 Ok(())
1808 }
1809
1810 #[tokio::test]
1811 #[traced_test]
1812 async fn test_node_add_tagged_blob_event() -> Result<()> {
1813 let node = Node::memory().spawn().await?;
1814
1815 let _got_hash = tokio::time::timeout(Duration::from_secs(10), async move {
1816 let mut stream = node
1817 .blobs()
1818 .add_from_path(
1819 Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
1820 false,
1821 SetTagOption::Auto,
1822 WrapOption::NoWrap,
1823 )
1824 .await?;
1825
1826 while let Some(progress) = stream.next().await {
1827 match progress? {
1828 crate::provider::AddProgress::AllDone { hash, .. } => {
1829 return Ok(hash);
1830 }
1831 crate::provider::AddProgress::Abort(e) => {
1832 anyhow::bail!("Error while adding data: {e}");
1833 }
1834 _ => {}
1835 }
1836 }
1837 anyhow::bail!("stream ended without providing data");
1838 })
1839 .await
1840 .context("timeout")?
1841 .context("get failed")?;
1842
1843 Ok(())
1844 }
1845
1846 #[tokio::test]
1847 #[traced_test]
1848 async fn test_download_via_relay() -> Result<()> {
1849 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await?;
1850
1851 let endpoint1 = iroh::Endpoint::builder()
1852 .relay_mode(RelayMode::Custom(relay_map.clone()))
1853 .insecure_skip_relay_cert_verify(true);
1854 let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
1855 let endpoint2 = iroh::Endpoint::builder()
1856 .relay_mode(RelayMode::Custom(relay_map.clone()))
1857 .insecure_skip_relay_cert_verify(true);
1858 let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
1859 let AddOutcome { hash, .. } = node1.blobs().add_bytes(b"foo".to_vec()).await?;
1860
1861 let addr = NodeAddr::new(node1.node_id()).with_relay_url(relay_url);
1863 node2.blobs().download(hash, addr).await?.await?;
1864 assert_eq!(
1865 node2
1866 .blobs()
1867 .read_to_bytes(hash)
1868 .await
1869 .context("get")?
1870 .as_ref(),
1871 b"foo"
1872 );
1873 Ok(())
1874 }
1875
1876 #[tokio::test]
1877 #[traced_test]
1878 #[ignore = "flaky"]
1879 async fn test_download_via_relay_with_discovery() -> Result<()> {
1880 let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?;
1881 let dns_pkarr_server = DnsPkarrServer::run().await?;
1882
1883 let mut rng = rand::thread_rng();
1884
1885 let secret1 = SecretKey::generate(&mut rng);
1886 let endpoint1 = iroh::Endpoint::builder()
1887 .relay_mode(RelayMode::Custom(relay_map.clone()))
1888 .insecure_skip_relay_cert_verify(true)
1889 .dns_resolver(dns_pkarr_server.dns_resolver())
1890 .secret_key(secret1.clone())
1891 .discovery(dns_pkarr_server.discovery(secret1));
1892 let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
1893 let secret2 = SecretKey::generate(&mut rng);
1894 let endpoint2 = iroh::Endpoint::builder()
1895 .relay_mode(RelayMode::Custom(relay_map.clone()))
1896 .insecure_skip_relay_cert_verify(true)
1897 .dns_resolver(dns_pkarr_server.dns_resolver())
1898 .secret_key(secret2.clone())
1899 .discovery(dns_pkarr_server.discovery(secret2));
1900 let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
1901 let hash = node1.blobs().add_bytes(b"foo".to_vec()).await?.hash;
1902
1903 let addr = NodeAddr::new(node1.node_id());
1905 node2.blobs().download(hash, addr).await?.await?;
1906 assert_eq!(
1907 node2
1908 .blobs()
1909 .read_to_bytes(hash)
1910 .await
1911 .context("get")?
1912 .as_ref(),
1913 b"foo"
1914 );
1915 Ok(())
1916 }
1917}