iroh_blobs/rpc/client/blobs.rs

1//! API for blobs management.
2//!
3//! The main entry point is the [`Client`].
4//!
5//! ## Interacting with the local blob store
6//!
7//! ### Importing data
8//!
9//! There are several ways to import data into the local blob store:
10//!
11//! - [`add_bytes`](Client::add_bytes)
12//!   imports in memory data.
13//! - [`add_stream`](Client::add_stream)
14//!   imports data from a stream of bytes.
15//! - [`add_reader`](Client::add_reader)
16//!   imports data from an [async reader](tokio::io::AsyncRead).
17//! - [`add_from_path`](Client::add_from_path)
18//!   imports data from a file.
19//!
//! The last of these, [`add_from_path`](Client::add_from_path), imports data
//! directly from the local filesystem and is the most efficient way to import
//! large amounts of data; see the sketch below.
22//!
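//! A minimal import sketch (hypothetical `client` of type [`Client`]; data and
//! paths are placeholders):
//!
//! ```ignore
//! // Add in-memory data; waits for completion and returns hash, format, size and tag.
//! let outcome = client.add_bytes(b"hello iroh".to_vec()).await?;
//! println!("added blob {}", outcome.hash);
//!
//! // Import a file in place (no copy into the data directory).
//! let outcome = client
//!     .add_from_path("/data/big-file".into(), true, SetTagOption::Auto, WrapOption::NoWrap)
//!     .await?
//!     .finish()
//!     .await?;
//! ```
//!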
23//! ### Exporting data
24//!
25//! There are several ways to export data from the local blob store:
26//!
27//! - [`read_to_bytes`](Client::read_to_bytes) reads data into memory.
28//! - [`read`](Client::read) creates a [reader](Reader) to read data from.
//! - [`export`](Client::export) exports data to a file on the local filesystem.
30//!
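//! A minimal read sketch (hypothetical `client` and `hash` values):
//!
//! ```ignore
//! // Check the size before reading the whole blob into memory.
//! let mut reader = client.read(hash).await?;
//! if reader.size() < 1024 * 1024 {
//!     let bytes = reader.read_to_bytes().await?;
//! }
//! ```
//!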
31//! ## Interacting with remote nodes
32//!
//! - [`download`](Client::download) downloads data from a remote node; see the
//!   sketch below.
35//!
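//! A minimal download sketch (hypothetical `client`, `hash`, and `node_addr` values):
//!
//! ```ignore
//! // Fetch a blob from a remote node and wait for the final outcome.
//! let outcome = client.download(hash, node_addr).await?.finish().await?;
//! println!("downloaded {} new bytes", outcome.downloaded_size);
//! ```
//!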
36//! ## Interacting with the blob store itself
37//!
//! These are more advanced operations that are usually not needed during
//! normal use.
40//!
41//! - [`consistency_check`](Client::consistency_check) checks the internal
42//!   consistency of the local blob store.
43//! - [`validate`](Client::validate) validates the locally stored data against
44//!   their BLAKE3 hashes.
45//! - [`delete_blob`](Client::delete_blob) deletes a blob from the local store.
46//!
47//! ### Batch operations
48//!
49//! For complex update operations, there is a [`batch`](Client::batch) API that
50//! allows you to add multiple blobs in a single logical batch.
51//!
52//! Operations in a batch return [temporary tags](crate::util::TempTag) that
53//! protect the added data from garbage collection as long as the batch is
54//! alive.
55//!
56//! To store the data permanently, a temp tag needs to be upgraded to a
57//! permanent tag using [`persist`](crate::rpc::client::blobs::Batch::persist) or
58//! [`persist_to`](crate::rpc::client::blobs::Batch::persist_to).
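//!
//! A minimal batch sketch (hypothetical `client` value; assumes a
//! `Batch::add_bytes` method and uses `persist` as described above):
//!
//! ```ignore
//! let batch = client.batch().await?;
//! // The temp tag protects the data while the batch is alive.
//! let temp_tag = batch.add_bytes(b"in a batch".to_vec()).await?;
//! // Upgrade to a permanent tag so the data outlives the batch.
//! let tag = batch.persist(temp_tag).await?;
//! ```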
59use std::{
60    future::Future,
61    io,
62    path::PathBuf,
63    pin::Pin,
64    sync::Arc,
65    task::{Context, Poll},
66};
67
68use anyhow::{anyhow, Context as _, Result};
69use bytes::Bytes;
70use futures_lite::{Stream, StreamExt};
71use futures_util::SinkExt;
72use genawaiter::sync::{Co, Gen};
73use iroh::NodeAddr;
74use portable_atomic::{AtomicU64, Ordering};
75use quic_rpc::{
76    client::{BoxStreamSync, BoxedConnector},
77    Connector, RpcClient,
78};
79use serde::{Deserialize, Serialize};
80use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
81use tokio_util::io::{ReaderStream, StreamReader};
82use tracing::warn;
83
84pub use crate::net_protocol::DownloadMode;
85use crate::{
86    export::ExportProgress as BytesExportProgress,
87    format::collection::{Collection, SimpleStore},
88    get::db::DownloadProgress as BytesDownloadProgress,
89    net_protocol::BlobDownloadRequest,
90    rpc::proto::RpcService,
91    store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
92    util::SetTagOption,
93    BlobFormat, Hash, Tag,
94};
95
96mod batch;
97pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};
98
99use super::{flatten, tags};
100use crate::rpc::proto::blobs::{
101    AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
102    BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
103    DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest,
104    ReadAtResponse, ValidateRequest,
105};
106
107/// Iroh blobs client.
108#[derive(Debug, Clone)]
109#[repr(transparent)]
110pub struct Client<C = BoxedConnector<RpcService>> {
111    pub(super) rpc: RpcClient<RpcService, C>,
112}
113
114/// Type alias for a memory-backed client.
115pub type MemClient = Client<crate::rpc::MemConnector>;
116
117impl<C> Client<C>
118where
119    C: Connector<RpcService>,
120{
121    /// Create a new client
122    pub fn new(rpc: RpcClient<RpcService, C>) -> Self {
123        Self { rpc }
124    }
125
126    /// Get a tags client.
127    pub fn tags(&self) -> tags::Client<C> {
128        tags::Client::new(self.rpc.clone())
129    }
130
    /// Get the status of a blob on the node.
    ///
    /// Returns a [`BlobStatus`] reporting whether the blob is absent, only
    /// partially stored, or completely stored.
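    ///
    /// A sketch of checking for a complete blob (hypothetical `client` and `hash` values):
    ///
    /// ```ignore
    /// match client.status(hash).await? {
    ///     BlobStatus::Complete { size } => println!("complete, {size} bytes"),
    ///     BlobStatus::Partial { size } => println!("partial, {} bytes stored", size.value()),
    ///     BlobStatus::NotFound => println!("not stored"),
    /// }
    /// ```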
135    pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
136        let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
137        Ok(status.0)
138    }
139
140    /// Check if a blob is completely stored on the node.
141    ///
142    /// This is just a convenience wrapper around `status` that returns a boolean.
143    pub async fn has(&self, hash: Hash) -> Result<bool> {
144        match self.status(hash).await {
145            Ok(BlobStatus::Complete { .. }) => Ok(true),
146            Ok(_) => Ok(false),
147            Err(err) => Err(err),
148        }
149    }
150
151    /// Create a new batch for adding data.
152    ///
153    /// A batch is a context in which temp tags are created and data is added to the node. Temp tags
154    /// are automatically deleted when the batch is dropped, leading to the data being garbage collected
155    /// unless a permanent tag is created for it.
156    pub async fn batch(&self) -> Result<Batch<C>> {
157        let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
158        let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
159        let rpc = self.rpc.clone();
160        Ok(Batch::new(batch, rpc, updates, 1024))
161    }
162
    /// Stream the contents of a single blob.
164    ///
165    /// Returns a [`Reader`], which can report the size of the blob before reading it.
166    pub async fn read(&self, hash: Hash) -> Result<Reader> {
167        Reader::from_rpc_read(&self.rpc, hash).await
168    }
169
    /// Read a range of a single blob, starting at `offset`.
    ///
    /// If `len` is [`ReadAtLen::All`], the rest of the blob is read.
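    ///
    /// A sketch reading the first 16 bytes of a blob (hypothetical `hash` value):
    ///
    /// ```ignore
    /// let mut reader = client.read_at(hash, 0, ReadAtLen::Exact(16)).await?;
    /// let bytes = reader.read_to_bytes().await?;
    /// assert_eq!(bytes.len(), 16);
    /// ```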
173    pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Reader> {
174        Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await
175    }
176
    /// Read all bytes of a single blob.
178    ///
179    /// This allocates a buffer for the full blob. Use only if you know that the blob you're
180    /// reading is small. If not sure, use [`Self::read`] and check the size with
181    /// [`Reader::size`] before calling [`Reader::read_to_bytes`].
182    pub async fn read_to_bytes(&self, hash: Hash) -> Result<Bytes> {
183        Reader::from_rpc_read(&self.rpc, hash)
184            .await?
185            .read_to_bytes()
186            .await
187    }
188
    /// Read all bytes of a single blob at `offset` for length `len`.
190    ///
191    /// This allocates a buffer for the full length.
192    pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Bytes> {
193        Reader::from_rpc_read_at(&self.rpc, hash, offset, len)
194            .await?
195            .read_to_bytes()
196            .await
197    }
198
199    /// Import a blob from a filesystem path.
200    ///
201    /// `path` should be an absolute path valid for the file system on which
202    /// the node runs.
203    /// If `in_place` is true, Iroh will assume that the data will not change and will share it in
204    /// place without copying to the Iroh data directory.
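    ///
    /// A sketch importing a file by copying it into the store (hypothetical values):
    ///
    /// ```ignore
    /// let outcome = client
    ///     .add_from_path("/data/video.mp4".into(), false, SetTagOption::Auto, WrapOption::NoWrap)
    ///     .await?
    ///     .finish()
    ///     .await?;
    /// println!("imported {} bytes as {}", outcome.size, outcome.hash);
    /// ```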
205    pub async fn add_from_path(
206        &self,
207        path: PathBuf,
208        in_place: bool,
209        tag: SetTagOption,
210        wrap: WrapOption,
211    ) -> Result<AddProgress> {
212        let stream = self
213            .rpc
214            .server_streaming(AddPathRequest {
215                path,
216                in_place,
217                tag,
218                wrap,
219            })
220            .await?;
221        Ok(AddProgress::new(stream))
222    }
223
224    /// Create a collection from already existing blobs.
225    ///
    /// To automatically clear the tags for the passed-in blobs, set
    /// `tags_to_delete` to those tags; they will be deleted once the collection
    /// is created.
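    ///
    /// A sketch (hypothetical hashes `h1` and `h2` of previously added blobs):
    ///
    /// ```ignore
    /// let mut collection = Collection::default();
    /// collection.push("a.txt".to_string(), h1);
    /// collection.push("b.txt".to_string(), h2);
    /// let (hash, tag) = client
    ///     .create_collection(collection, SetTagOption::Auto, vec![])
    ///     .await?;
    /// ```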
228    pub async fn create_collection(
229        &self,
230        collection: Collection,
231        tag: SetTagOption,
232        tags_to_delete: Vec<Tag>,
233    ) -> anyhow::Result<(Hash, Tag)> {
234        let CreateCollectionResponse { hash, tag } = self
235            .rpc
236            .rpc(CreateCollectionRequest {
237                collection,
238                tag,
239                tags_to_delete,
240            })
241            .await??;
242        Ok((hash, tag))
243    }
244
245    /// Write a blob by passing an async reader.
246    pub async fn add_reader(
247        &self,
248        reader: impl AsyncRead + Unpin + Send + 'static,
249        tag: SetTagOption,
250    ) -> anyhow::Result<AddProgress> {
251        const CAP: usize = 1024 * 64; // send 64KB per request by default
252        let input = ReaderStream::with_capacity(reader, CAP);
253        self.add_stream(input, tag).await
254    }
255
256    /// Write a blob by passing a stream of bytes.
257    pub async fn add_stream(
258        &self,
259        input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
260        tag: SetTagOption,
261    ) -> anyhow::Result<AddProgress> {
262        let (mut sink, progress) = self.rpc.bidi(AddStreamRequest { tag }).await?;
263        let mut input = input.map(|chunk| match chunk {
264            Ok(chunk) => Ok(AddStreamUpdate::Chunk(chunk)),
265            Err(err) => {
266                warn!("Abort send, reason: failed to read from source stream: {err:?}");
267                Ok(AddStreamUpdate::Abort)
268            }
269        });
270        tokio::spawn(async move {
271            if let Err(err) = sink.send_all(&mut input).await {
272                // if we get an error in send_all due to the connection being closed, this will just fail again.
273                // if we get an error due to something else (serialization or size limit), tell the remote to abort.
274                sink.send(AddStreamUpdate::Abort).await.ok();
275                warn!("Failed to send input stream to remote: {err:?}");
276            }
277        });
278
279        Ok(AddProgress::new(progress))
280    }
281
282    /// Write a blob by passing bytes.
283    pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
284        let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
285        self.add_stream(input, SetTagOption::Auto).await?.await
286    }
287
288    /// Write a blob by passing bytes, setting an explicit tag name.
289    pub async fn add_bytes_named(
290        &self,
291        bytes: impl Into<Bytes>,
292        name: impl Into<Tag>,
293    ) -> anyhow::Result<AddOutcome> {
294        let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
295        self.add_stream(input, SetTagOption::Named(name.into()))
296            .await?
297            .await
298    }
299
300    /// Validate hashes on the running node.
301    ///
302    /// If `repair` is true, repair the store by removing invalid data.
303    pub async fn validate(
304        &self,
305        repair: bool,
306    ) -> Result<impl Stream<Item = Result<ValidateProgress>>> {
307        let stream = self
308            .rpc
309            .server_streaming(ValidateRequest { repair })
310            .await?;
311        Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
312    }
313
    /// Check the internal consistency of the blob store on the running node.
315    ///
316    /// If `repair` is true, repair the store by removing invalid data.
317    pub async fn consistency_check(
318        &self,
319        repair: bool,
320    ) -> Result<impl Stream<Item = Result<ConsistencyCheckProgress>>> {
321        let stream = self
322            .rpc
323            .server_streaming(ConsistencyCheckRequest { repair })
324            .await?;
325        Ok(stream.map(|r| r.map_err(anyhow::Error::from)))
326    }
327
328    /// Download a blob from another node and add it to the local database.
329    pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
330        self.download_with_opts(
331            hash,
332            DownloadOptions {
333                format: BlobFormat::Raw,
334                nodes: vec![node],
335                tag: SetTagOption::Auto,
336                mode: DownloadMode::Queued,
337            },
338        )
339        .await
340    }
341
342    /// Download a hash sequence from another node and add it to the local database.
343    pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
344        self.download_with_opts(
345            hash,
346            DownloadOptions {
347                format: BlobFormat::HashSeq,
348                nodes: vec![node],
349                tag: SetTagOption::Auto,
350                mode: DownloadMode::Queued,
351            },
352        )
353        .await
354    }
355
356    /// Download a blob, with additional options.
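    ///
    /// A sketch of a direct download (hypothetical `hash` and `node_addr` values):
    ///
    /// ```ignore
    /// let outcome = client
    ///     .download_with_opts(
    ///         hash,
    ///         DownloadOptions {
    ///             format: BlobFormat::Raw,
    ///             nodes: vec![node_addr],
    ///             tag: SetTagOption::Auto,
    ///             mode: DownloadMode::Direct,
    ///         },
    ///     )
    ///     .await?
    ///     .finish()
    ///     .await?;
    /// ```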
357    pub async fn download_with_opts(
358        &self,
359        hash: Hash,
360        opts: DownloadOptions,
361    ) -> Result<DownloadProgress> {
362        let DownloadOptions {
363            format,
364            nodes,
365            tag,
366            mode,
367        } = opts;
368        let stream = self
369            .rpc
370            .server_streaming(BlobDownloadRequest {
371                hash,
372                format,
373                nodes,
374                tag,
375                mode,
376            })
377            .await?;
378        Ok(DownloadProgress::new(
379            stream.map(|res| res.map_err(anyhow::Error::from)),
380        ))
381    }
382
383    /// Export a blob from the internal blob store to a path on the node's filesystem.
384    ///
    /// `destination` should be a writeable, absolute path on the local node's filesystem.
386    ///
387    /// If `format` is set to [`ExportFormat::Collection`], and the `hash` refers to a collection,
388    /// all children of the collection will be exported. See [`ExportFormat`] for details.
389    ///
390    /// The `mode` argument defines if the blob should be copied to the target location or moved out of
391    /// the internal store into the target location. See [`ExportMode`] for details.
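    ///
    /// A sketch exporting a single blob by copying it out of the store
    /// (hypothetical values; assumes an `ExportFormat::Blob` variant for
    /// single blobs):
    ///
    /// ```ignore
    /// let outcome = client
    ///     .export(hash, "/tmp/exported.bin".into(), ExportFormat::Blob, ExportMode::Copy)
    ///     .await?
    ///     .finish()
    ///     .await?;
    /// ```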
392    pub async fn export(
393        &self,
394        hash: Hash,
395        destination: PathBuf,
396        format: ExportFormat,
397        mode: ExportMode,
398    ) -> Result<ExportProgress> {
399        let req = ExportRequest {
400            hash,
401            path: destination,
402            format,
403            mode,
404        };
405        let stream = self.rpc.server_streaming(req).await?;
406        Ok(ExportProgress::new(
407            stream.map(|r| r.map_err(anyhow::Error::from)),
408        ))
409    }
410
411    /// List all complete blobs.
412    pub async fn list(&self) -> Result<impl Stream<Item = Result<BlobInfo>>> {
413        let stream = self.rpc.server_streaming(ListRequest).await?;
414        Ok(flatten(stream))
415    }
416
417    /// List all incomplete (partial) blobs.
418    pub async fn list_incomplete(&self) -> Result<impl Stream<Item = Result<IncompleteBlobInfo>>> {
419        let stream = self.rpc.server_streaming(ListIncompleteRequest).await?;
420        Ok(flatten(stream))
421    }
422
423    /// Read the content of a collection.
424    pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
425        Collection::load(hash, self).await
426    }
427
428    /// List all collections.
429    pub fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
430        let this = self.clone();
431        Ok(Gen::new(|co| async move {
432            if let Err(cause) = this.list_collections_impl(&co).await {
433                co.yield_(Err(cause)).await;
434            }
435        }))
436    }
437
438    async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
439        let tags = self.tags_client();
440        let mut tags = tags.list_hash_seq().await?;
441        while let Some(tag) = tags.next().await {
442            let tag = tag?;
443            if let Ok(collection) = self.get_collection(tag.hash).await {
444                let info = CollectionInfo {
445                    tag: tag.name,
446                    hash: tag.hash,
447                    total_blobs_count: Some(collection.len() as u64 + 1),
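                    // the total size is not computed here; report 0 as a placeholder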
448                    total_blobs_size: Some(0),
449                };
450                co.yield_(Ok(info)).await;
451            }
452        }
453        Ok(())
454    }
455
456    /// Delete a blob.
457    ///
458    /// **Warning**: this operation deletes the blob from the local store even
459    /// if it is tagged. You should usually not do this manually, but rely on the
460    /// node to remove data that is not tagged.
461    pub async fn delete_blob(&self, hash: Hash) -> Result<()> {
462        self.rpc.rpc(DeleteRequest { hash }).await??;
463        Ok(())
464    }
465
466    fn tags_client(&self) -> tags::Client<C> {
467        tags::Client::new(self.rpc.clone())
468    }
469}
470
471impl<C> SimpleStore for Client<C>
472where
473    C: Connector<RpcService>,
474{
475    async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
476        self.read_to_bytes(hash).await
477    }
478}
479
480/// Defines the way to read bytes.
481#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
482pub enum ReadAtLen {
483    /// Reads all available bytes.
484    #[default]
485    All,
486    /// Reads exactly this many bytes, erroring out on larger or smaller.
487    Exact(u64),
488    /// Reads at most this many bytes.
489    AtMost(u64),
490}
491
492impl ReadAtLen {
493    /// todo make private again
494    pub fn as_result_len(&self, size_remaining: u64) -> u64 {
495        match self {
496            ReadAtLen::All => size_remaining,
497            ReadAtLen::Exact(len) => *len,
498            ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining),
499        }
500    }
501}
502
503/// Whether to wrap the added data in a collection.
504#[derive(Debug, Serialize, Deserialize, Default, Clone)]
505pub enum WrapOption {
506    /// Do not wrap the file or directory.
507    #[default]
508    NoWrap,
509    /// Wrap the file or directory in a collection.
510    Wrap {
511        /// Override the filename in the wrapping collection.
512        name: Option<String>,
513    },
514}
515
516/// Status information about a blob.
517#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
518pub enum BlobStatus {
519    /// The blob is not stored at all.
520    NotFound,
521    /// The blob is only stored partially.
522    Partial {
523        /// The size of the currently stored partial blob.
524        size: BaoBlobSize,
525    },
526    /// The blob is stored completely.
527    Complete {
528        /// The size of the blob.
529        size: u64,
530    },
531}
532
533/// Outcome of a blob add operation.
534#[derive(Debug, Clone)]
535pub struct AddOutcome {
536    /// The hash of the blob
537    pub hash: Hash,
538    /// The format the blob
539    pub format: BlobFormat,
540    /// The size of the blob
541    pub size: u64,
542    /// The tag of the blob
543    pub tag: Tag,
544}
545
546/// Information about a stored collection.
547#[derive(Debug, Serialize, Deserialize)]
548pub struct CollectionInfo {
549    /// Tag of the collection
550    pub tag: Tag,
551
552    /// Hash of the collection
553    pub hash: Hash,
554    /// Number of children in the collection
555    ///
556    /// This is an optional field, because the data is not always available.
557    pub total_blobs_count: Option<u64>,
558    /// Total size of the raw data referred to by all links
559    ///
560    /// This is an optional field, because the data is not always available.
561    pub total_blobs_size: Option<u64>,
562}
563
564/// Information about a complete blob.
565#[derive(Debug, Serialize, Deserialize)]
566pub struct BlobInfo {
567    /// Location of the blob
568    pub path: String,
569    /// The hash of the blob
570    pub hash: Hash,
571    /// The size of the blob
572    pub size: u64,
573}
574
575/// Information about an incomplete blob.
576#[derive(Debug, Serialize, Deserialize)]
577pub struct IncompleteBlobInfo {
578    /// The size we got
579    pub size: u64,
580    /// The size we expect
581    pub expected_size: u64,
582    /// The hash of the blob
583    pub hash: Hash,
584}
585
586/// Progress stream for blob add operations.
587#[derive(derive_more::Debug)]
588pub struct AddProgress {
589    #[debug(skip)]
590    stream:
591        Pin<Box<dyn Stream<Item = Result<crate::provider::AddProgress>> + Send + Unpin + 'static>>,
592    current_total_size: Arc<AtomicU64>,
593}
594
595impl AddProgress {
596    fn new(
597        stream: (impl Stream<
598            Item = Result<impl Into<crate::provider::AddProgress>, impl Into<anyhow::Error>>,
599        > + Send
600             + Unpin
601             + 'static),
602    ) -> Self {
603        let current_total_size = Arc::new(AtomicU64::new(0));
604        let total_size = current_total_size.clone();
605        let stream = stream.map(move |item| match item {
606            Ok(item) => {
607                let item = item.into();
608                if let crate::provider::AddProgress::Found { size, .. } = &item {
609                    total_size.fetch_add(*size, Ordering::Relaxed);
610                }
611                Ok(item)
612            }
613            Err(err) => Err(err.into()),
614        });
615        Self {
616            stream: Box::pin(stream),
617            current_total_size,
618        }
619    }
620    /// Finish writing the stream, ignoring all intermediate progress events.
621    ///
    /// Returns an [`AddOutcome`] which contains a tag, format, hash and a size.
623    /// When importing a single blob, this is the hash and size of that blob.
624    /// When importing a collection, the hash is the hash of the collection and the size
625    /// is the total size of all imported blobs (but excluding the size of the collection blob
626    /// itself).
627    pub async fn finish(self) -> Result<AddOutcome> {
628        self.await
629    }
630}
631
632impl Stream for AddProgress {
633    type Item = Result<crate::provider::AddProgress>;
634    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
635        Pin::new(&mut self.stream).poll_next(cx)
636    }
637}
638
639impl Future for AddProgress {
640    type Output = Result<AddOutcome>;
641
642    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
643        loop {
644            match Pin::new(&mut self.stream).poll_next(cx) {
645                Poll::Pending => return Poll::Pending,
646                Poll::Ready(None) => {
647                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
648                }
649                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
650                Poll::Ready(Some(Ok(msg))) => match msg {
651                    crate::provider::AddProgress::AllDone { hash, format, tag } => {
652                        let outcome = AddOutcome {
653                            hash,
654                            format,
655                            tag,
656                            size: self.current_total_size.load(Ordering::Relaxed),
657                        };
658                        return Poll::Ready(Ok(outcome));
659                    }
660                    crate::provider::AddProgress::Abort(err) => {
661                        return Poll::Ready(Err(err.into()));
662                    }
663                    _ => {}
664                },
665            }
666        }
667    }
668}
669
670/// Outcome of a blob download operation.
671#[derive(Debug, Clone)]
672pub struct DownloadOutcome {
673    /// The size of the data we already had locally
674    pub local_size: u64,
675    /// The size of the data we downloaded from the network
676    pub downloaded_size: u64,
677    /// Statistics about the download
678    pub stats: crate::get::Stats,
679}
680
681/// Progress stream for blob download operations.
682#[derive(derive_more::Debug)]
683pub struct DownloadProgress {
684    #[debug(skip)]
685    stream: Pin<Box<dyn Stream<Item = Result<BytesDownloadProgress>> + Send + Unpin + 'static>>,
686    current_local_size: Arc<AtomicU64>,
687    current_network_size: Arc<AtomicU64>,
688}
689
690impl DownloadProgress {
691    /// Create a [`DownloadProgress`] that can help you easily poll the [`BytesDownloadProgress`] stream from your download until it is finished or errors.
692    pub fn new(
693        stream: (impl Stream<Item = Result<impl Into<BytesDownloadProgress>, impl Into<anyhow::Error>>>
694             + Send
695             + Unpin
696             + 'static),
697    ) -> Self {
698        let current_local_size = Arc::new(AtomicU64::new(0));
699        let current_network_size = Arc::new(AtomicU64::new(0));
700
701        let local_size = current_local_size.clone();
702        let network_size = current_network_size.clone();
703
704        let stream = stream.map(move |item| match item {
705            Ok(item) => {
706                let item = item.into();
707                match &item {
708                    BytesDownloadProgress::FoundLocal { size, .. } => {
709                        local_size.fetch_add(size.value(), Ordering::Relaxed);
710                    }
711                    BytesDownloadProgress::Found { size, .. } => {
712                        network_size.fetch_add(*size, Ordering::Relaxed);
713                    }
714                    _ => {}
715                }
716
717                Ok(item)
718            }
719            Err(err) => Err(err.into()),
720        });
721        Self {
722            stream: Box::pin(stream),
723            current_local_size,
724            current_network_size,
725        }
726    }
727
    /// Finish the download, ignoring all intermediate progress events.
    ///
    /// Returns a [`DownloadOutcome`] which contains the size of the content we downloaded and the size of the content we already had locally.
    /// When downloading a single blob, this is the size of that blob.
    /// When downloading a collection, this is the total size of all downloaded blobs (but excluding the size of the collection blob itself).
733    pub async fn finish(self) -> Result<DownloadOutcome> {
734        self.await
735    }
736}
737
738impl Stream for DownloadProgress {
739    type Item = Result<BytesDownloadProgress>;
740    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
741        Pin::new(&mut self.stream).poll_next(cx)
742    }
743}
744
745impl Future for DownloadProgress {
746    type Output = Result<DownloadOutcome>;
747
748    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
749        loop {
750            match Pin::new(&mut self.stream).poll_next(cx) {
751                Poll::Pending => return Poll::Pending,
752                Poll::Ready(None) => {
753                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
754                }
755                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
756                Poll::Ready(Some(Ok(msg))) => match msg {
757                    BytesDownloadProgress::AllDone(stats) => {
758                        let outcome = DownloadOutcome {
759                            local_size: self.current_local_size.load(Ordering::Relaxed),
760                            downloaded_size: self.current_network_size.load(Ordering::Relaxed),
761                            stats,
762                        };
763                        return Poll::Ready(Ok(outcome));
764                    }
765                    BytesDownloadProgress::Abort(err) => {
766                        return Poll::Ready(Err(err.into()));
767                    }
768                    _ => {}
769                },
770            }
771        }
772    }
773}
774
775/// Outcome of a blob export operation.
776#[derive(Debug, Clone, PartialEq, Eq)]
777pub struct ExportOutcome {
778    /// The total size of the exported data.
779    total_size: u64,
780}
781
782/// Progress stream for blob export operations.
783#[derive(derive_more::Debug)]
784pub struct ExportProgress {
785    #[debug(skip)]
786    stream: Pin<Box<dyn Stream<Item = Result<BytesExportProgress>> + Send + Unpin + 'static>>,
787    current_total_size: Arc<AtomicU64>,
788}
789
790impl ExportProgress {
    /// Create an [`ExportProgress`] that can help you easily poll the [`BytesExportProgress`] stream from your
    /// export until it is finished or errors.
793    pub fn new(
794        stream: (impl Stream<Item = Result<impl Into<BytesExportProgress>, impl Into<anyhow::Error>>>
795             + Send
796             + Unpin
797             + 'static),
798    ) -> Self {
799        let current_total_size = Arc::new(AtomicU64::new(0));
800        let total_size = current_total_size.clone();
801        let stream = stream.map(move |item| match item {
802            Ok(item) => {
803                let item = item.into();
804                if let BytesExportProgress::Found { size, .. } = &item {
805                    let size = size.value();
806                    total_size.fetch_add(size, Ordering::Relaxed);
807                }
808
809                Ok(item)
810            }
811            Err(err) => Err(err.into()),
812        });
813        Self {
814            stream: Box::pin(stream),
815            current_total_size,
816        }
817    }
818
    /// Finish the export, ignoring all intermediate progress events.
    ///
    /// Returns an [`ExportOutcome`] which contains the size of the content we exported.
822    pub async fn finish(self) -> Result<ExportOutcome> {
823        self.await
824    }
825}
826
827impl Stream for ExportProgress {
828    type Item = Result<BytesExportProgress>;
829    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
830        Pin::new(&mut self.stream).poll_next(cx)
831    }
832}
833
834impl Future for ExportProgress {
835    type Output = Result<ExportOutcome>;
836
837    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
838        loop {
839            match Pin::new(&mut self.stream).poll_next(cx) {
840                Poll::Pending => return Poll::Pending,
841                Poll::Ready(None) => {
842                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
843                }
844                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
845                Poll::Ready(Some(Ok(msg))) => match msg {
846                    BytesExportProgress::AllDone => {
847                        let outcome = ExportOutcome {
848                            total_size: self.current_total_size.load(Ordering::Relaxed),
849                        };
850                        return Poll::Ready(Ok(outcome));
851                    }
852                    BytesExportProgress::Abort(err) => {
853                        return Poll::Ready(Err(err.into()));
854                    }
855                    _ => {}
856                },
857            }
858        }
859    }
860}
861
862/// Data reader for a single blob.
863///
864/// Implements [`AsyncRead`].
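///
/// A sketch streaming a blob to a file via [`AsyncRead`] (hypothetical `client` and `hash` values):
///
/// ```ignore
/// let mut reader = client.read(hash).await?;
/// let mut file = tokio::fs::File::create("/tmp/blob.bin").await?;
/// tokio::io::copy(&mut reader, &mut file).await?;
/// ```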
865#[derive(derive_more::Debug)]
866pub struct Reader {
867    size: u64,
868    response_size: u64,
869    is_complete: bool,
870    #[debug("StreamReader")]
871    stream: tokio_util::io::StreamReader<BoxStreamSync<'static, io::Result<Bytes>>, Bytes>,
872}
873
874impl Reader {
875    fn new(
876        size: u64,
877        response_size: u64,
878        is_complete: bool,
879        stream: BoxStreamSync<'static, io::Result<Bytes>>,
880    ) -> Self {
881        Self {
882            size,
883            response_size,
884            is_complete,
885            stream: StreamReader::new(stream),
886        }
887    }
888
889    /// todo make private again
890    pub async fn from_rpc_read<C>(
891        rpc: &RpcClient<RpcService, C>,
892        hash: Hash,
893    ) -> anyhow::Result<Self>
894    where
895        C: Connector<RpcService>,
896    {
897        Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await
898    }
899
900    async fn from_rpc_read_at<C>(
901        rpc: &RpcClient<RpcService, C>,
902        hash: Hash,
903        offset: u64,
904        len: ReadAtLen,
905    ) -> anyhow::Result<Self>
906    where
907        C: Connector<RpcService>,
908    {
909        let stream = rpc
910            .server_streaming(ReadAtRequest { hash, offset, len })
911            .await?;
912        let mut stream = flatten(stream);
913
914        let (size, is_complete) = match stream.next().await {
915            Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
916            Some(Err(err)) => return Err(err),
917            Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
918            None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
919        };
920
921        let stream = stream.map(|item| match item {
922            Ok(ReadAtResponse::Data { chunk }) => Ok(chunk),
923            Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
924            Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
925        });
926        let len = len.as_result_len(size.value() - offset);
927        Ok(Self::new(size.value(), len, is_complete, Box::pin(stream)))
928    }
929
930    /// Total size of this blob.
931    pub fn size(&self) -> u64 {
932        self.size
933    }
934
935    /// Whether this blob has been downloaded completely.
936    ///
937    /// Returns false for partial blobs for which some chunks are missing.
938    pub fn is_complete(&self) -> bool {
939        self.is_complete
940    }
941
942    /// Read all bytes of the blob.
943    pub async fn read_to_bytes(&mut self) -> anyhow::Result<Bytes> {
944        let mut buf = Vec::with_capacity(self.response_size as usize);
945        self.read_to_end(&mut buf).await?;
946        Ok(buf.into())
947    }
948}
949
950impl AsyncRead for Reader {
951    fn poll_read(
952        mut self: Pin<&mut Self>,
953        cx: &mut Context<'_>,
954        buf: &mut ReadBuf<'_>,
955    ) -> Poll<io::Result<()>> {
956        Pin::new(&mut self.stream).poll_read(cx, buf)
957    }
958}
959
960impl Stream for Reader {
961    type Item = io::Result<Bytes>;
962
963    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
964        Pin::new(&mut self.stream).get_pin_mut().poll_next(cx)
965    }
966
967    fn size_hint(&self) -> (usize, Option<usize>) {
968        self.stream.get_ref().size_hint()
969    }
970}
971
972/// Options to configure a download request.
973#[derive(Debug, Clone, Serialize, Deserialize)]
974pub struct DownloadOptions {
975    /// The format of the data to download.
976    pub format: BlobFormat,
977    /// Source nodes to download from.
978    ///
979    /// If set to more than a single node, they will all be tried. If `mode` is set to
980    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
981    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
982    /// if the concurrency limits permit.
983    pub nodes: Vec<NodeAddr>,
984    /// Optional tag to tag the data with.
985    pub tag: SetTagOption,
986    /// Whether to directly start the download or add it to the download queue.
987    pub mode: DownloadMode,
988}
989
990fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream<Item = Bytes> {
991    futures_lite::stream::iter(std::iter::from_fn(move || {
992        Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty())
993    }))
994}
995
996#[cfg(test)]
997mod tests {
998    use std::{path::Path, time::Duration};
999
1000    use iroh::{test_utils::DnsPkarrServer, NodeId, RelayMode, SecretKey};
1001    use node::Node;
1002    use rand::RngCore;
1003    use testresult::TestResult;
1004    use tokio::{io::AsyncWriteExt, sync::mpsc};
1005    use tracing_test::traced_test;
1006
1007    use super::*;
1008    use crate::{hashseq::HashSeq, ticket::BlobTicket};
1009
1010    mod node {
1011        //! An iroh node that just has the blobs transport
1012        use std::path::Path;
1013
1014        use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
1015        use tokio_util::task::AbortOnDropHandle;
1016
1017        use super::RpcService;
1018        use crate::{
1019            net_protocol::Blobs,
1020            provider::{CustomEventSender, EventSender},
1021            rpc::client::{blobs, tags},
1022        };
1023
1024        type RpcClient = quic_rpc::RpcClient<RpcService>;
1025
1026        /// An iroh node that just has the blobs transport
1027        #[derive(Debug)]
1028        pub struct Node {
1029            router: iroh::protocol::Router,
1030            client: RpcClient,
1031            _rpc_task: AbortOnDropHandle<()>,
1032        }
1033
1034        /// An iroh node builder
1035        #[derive(Debug)]
1036        pub struct Builder<S> {
1037            store: S,
1038            events: EventSender,
1039            endpoint: Option<iroh::endpoint::Builder>,
1040        }
1041
1042        impl<S: crate::store::Store> Builder<S> {
1043            /// Sets the event sender
1044            pub fn blobs_events(self, events: impl CustomEventSender) -> Self {
1045                Self {
1046                    events: events.into(),
1047                    ..self
1048                }
1049            }
1050
1051            /// Set an endpoint builder
1052            pub fn endpoint(self, endpoint: iroh::endpoint::Builder) -> Self {
1053                Self {
1054                    endpoint: Some(endpoint),
1055                    ..self
1056                }
1057            }
1058
1059            /// Spawns the node
1060            pub async fn spawn(self) -> anyhow::Result<Node> {
1061                let store = self.store;
1062                let events = self.events;
1063                let endpoint = self
1064                    .endpoint
1065                    .unwrap_or_else(|| Endpoint::builder().discovery_n0())
1066                    .bind()
1067                    .await?;
1068                let mut router = Router::builder(endpoint.clone());
1069
1070                // Setup blobs
1071                let blobs = Blobs::builder(store.clone())
1072                    .events(events)
1073                    .build(&endpoint);
1074                router = router.accept(crate::ALPN, blobs.clone());
1075
1076                // Build the router
1077                let router = router.spawn().await?;
1078
1079                // Setup RPC
1080                let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32);
1081                let internal_rpc = quic_rpc::RpcServer::new(internal_rpc).boxed();
1082                let _rpc_task = internal_rpc.spawn_accept_loop(move |msg, chan| {
1083                    blobs.clone().handle_rpc_request(msg, chan)
1084                });
1085                let client = quic_rpc::RpcClient::new(controller).boxed();
1086                Ok(Node {
1087                    router,
1088                    client,
1089                    _rpc_task,
1090                })
1091            }
1092        }
1093
1094        impl Node {
1095            /// Creates a new node with memory storage
1096            pub fn memory() -> Builder<crate::store::mem::Store> {
1097                Builder {
1098                    store: crate::store::mem::Store::new(),
1099                    events: Default::default(),
1100                    endpoint: None,
1101                }
1102            }
1103
1104            /// Creates a new node with persistent storage
1105            pub async fn persistent(
1106                path: impl AsRef<Path>,
1107            ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
1108                Ok(Builder {
1109                    store: crate::store::fs::Store::load(path).await?,
1110                    events: Default::default(),
1111                    endpoint: None,
1112                })
1113            }
1114
1115            /// Returns the node id
1116            pub fn node_id(&self) -> NodeId {
1117                self.router.endpoint().node_id()
1118            }
1119
1120            /// Returns the node address
1121            pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
1122                self.router.endpoint().node_addr().await
1123            }
1124
1125            /// Shuts down the node
1126            pub async fn shutdown(self) -> anyhow::Result<()> {
1127                self.router.shutdown().await
1128            }
1129
1130            /// Returns an in-memory blobs client
1131            pub fn blobs(&self) -> blobs::Client {
1132                blobs::Client::new(self.client.clone())
1133            }
1134
1135            /// Returns an in-memory tags client
1136            pub fn tags(&self) -> tags::Client {
1137                tags::Client::new(self.client.clone())
1138            }
1139        }
1140    }
1141
1142    #[tokio::test]
1143    #[traced_test]
1144    async fn test_blob_create_collection() -> Result<()> {
1145        let node = node::Node::memory().spawn().await?;
1146
1147        // create temp file
1148        let temp_dir = tempfile::tempdir().context("tempdir")?;
1149
1150        let in_root = temp_dir.path().join("in");
1151        tokio::fs::create_dir_all(in_root.clone())
1152            .await
1153            .context("create dir all")?;
1154
1155        let mut paths = Vec::new();
1156        for i in 0..5 {
1157            let path = in_root.join(format!("test-{i}"));
1158            let size = 100;
1159            let mut buf = vec![0u8; size];
1160            rand::thread_rng().fill_bytes(&mut buf);
1161            let mut file = tokio::fs::File::create(path.clone())
1162                .await
1163                .context("create file")?;
1164            file.write_all(&buf.clone()).await.context("write_all")?;
1165            file.flush().await.context("flush")?;
1166            paths.push(path);
1167        }
1168
1169        let blobs = node.blobs();
1170
1171        let mut collection = Collection::default();
1172        let mut tags = Vec::new();
1173        // import files
1174        for path in &paths {
1175            let import_outcome = blobs
1176                .add_from_path(
1177                    path.to_path_buf(),
1178                    false,
1179                    SetTagOption::Auto,
1180                    WrapOption::NoWrap,
1181                )
1182                .await
1183                .context("import file")?
1184                .finish()
1185                .await
1186                .context("import finish")?;
1187
1188            collection.push(
1189                path.file_name().unwrap().to_str().unwrap().to_string(),
1190                import_outcome.hash,
1191            );
1192            tags.push(import_outcome.tag);
1193        }
1194
1195        let (hash, tag) = blobs
1196            .create_collection(collection, SetTagOption::Auto, tags)
1197            .await?;
1198
1199        let collections: Vec<_> = blobs.list_collections()?.try_collect().await?;
1200
1201        assert_eq!(collections.len(), 1);
1202        {
            let CollectionInfo {
                tag: c_tag,
                hash: c_hash,
                total_blobs_count,
                ..
            } = &collections[0];
            assert_eq!(c_tag, &tag);
            assert_eq!(c_hash, &hash);
1211            // 5 blobs + 1 meta
1212            assert_eq!(total_blobs_count, &Some(5 + 1));
1213        }
1214
1215        // check that "temp" tags have been deleted
1216        let tags: Vec<_> = node.tags().list().await?.try_collect().await?;
1217        assert_eq!(tags.len(), 1);
1218        assert_eq!(tags[0].hash, hash);
1219        assert_eq!(tags[0].name, tag);
1220        assert_eq!(tags[0].format, BlobFormat::HashSeq);
1221
1222        Ok(())
1223    }
1224
1225    #[tokio::test]
1226    #[traced_test]
1227    async fn test_blob_read_at() -> Result<()> {
1228        let node = node::Node::memory().spawn().await?;
1229
1230        // create temp file
1231        let temp_dir = tempfile::tempdir().context("tempdir")?;
1232
1233        let in_root = temp_dir.path().join("in");
1234        tokio::fs::create_dir_all(in_root.clone())
1235            .await
1236            .context("create dir all")?;
1237
1238        let path = in_root.join("test-blob");
1239        let size = 1024 * 128;
1240        let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
1241        let mut file = tokio::fs::File::create(path.clone())
1242            .await
1243            .context("create file")?;
1244        file.write_all(&buf.clone()).await.context("write_all")?;
1245        file.flush().await.context("flush")?;
1246
1247        let blobs = node.blobs();
1248
1249        let import_outcome = blobs
1250            .add_from_path(
1251                path.to_path_buf(),
1252                false,
1253                SetTagOption::Auto,
1254                WrapOption::NoWrap,
1255            )
1256            .await
1257            .context("import file")?
1258            .finish()
1259            .await
1260            .context("import finish")?;
1261
1262        let hash = import_outcome.hash;
1263
1264        // Read everything
1265        let res = blobs.read_to_bytes(hash).await?;
1266        assert_eq!(&res, &buf[..]);
1267
1268        // Read at smaller than blob_get_chunk_size
1269        let res = blobs
1270            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(100))
1271            .await?;
1272        assert_eq!(res.len(), 100);
1273        assert_eq!(&res[..], &buf[0..100]);
1274
1275        let res = blobs
1276            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(120))
1277            .await?;
1278        assert_eq!(res.len(), 120);
1279        assert_eq!(&res[..], &buf[20..140]);
1280
1281        // Read at equal to blob_get_chunk_size
1282        let res = blobs
1283            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(1024 * 64))
1284            .await?;
1285        assert_eq!(res.len(), 1024 * 64);
1286        assert_eq!(&res[..], &buf[0..1024 * 64]);
1287
1288        let res = blobs
1289            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(1024 * 64))
1290            .await?;
1291        assert_eq!(res.len(), 1024 * 64);
1292        assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]);
1293
1294        // Read at larger than blob_get_chunk_size
1295        let res = blobs
1296            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(10 + 1024 * 64))
1297            .await?;
1298        assert_eq!(res.len(), 10 + 1024 * 64);
1299        assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]);
1300
1301        let res = blobs
1302            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(10 + 1024 * 64))
1303            .await?;
1304        assert_eq!(res.len(), 10 + 1024 * 64);
1305        assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]);
1306
1307        // full length
1308        let res = blobs.read_at_to_bytes(hash, 20, ReadAtLen::All).await?;
1309        assert_eq!(res.len(), 1024 * 128 - 20);
1310        assert_eq!(&res[..], &buf[20..]);
1311
1312        // size should be total
1313        let reader = blobs.read_at(hash, 0, ReadAtLen::Exact(20)).await?;
1314        assert_eq!(reader.size(), 1024 * 128);
1315        assert_eq!(reader.response_size, 20);
1316
1317        // last chunk - exact
1318        let res = blobs
1319            .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024))
1320            .await?;
1321        assert_eq!(res.len(), 1024);
1322        assert_eq!(res, &buf[1024 * 127..]);
1323
1324        // last chunk - open
1325        let res = blobs
1326            .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All)
1327            .await?;
1328        assert_eq!(res.len(), 1024);
1329        assert_eq!(res, &buf[1024 * 127..]);
1330
1331        // last chunk - larger
1332        let mut res = blobs
1333            .read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048))
1334            .await?;
1335        assert_eq!(res.size, 1024 * 128);
1336        assert_eq!(res.response_size, 1024);
1337        let res = res.read_to_bytes().await?;
1338        assert_eq!(res.len(), 1024);
1339        assert_eq!(res, &buf[1024 * 127..]);
1340
1341        // out of bounds - too long
1342        let res = blobs
1343            .read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1))
1344            .await;
1345        let err = res.unwrap_err();
1346        assert!(err.to_string().contains("out of bound"));
1347
1348        // out of bounds - offset larger than blob
1349        let res = blobs.read_at(hash, 1024 * 128 + 1, ReadAtLen::All).await;
1350        let err = res.unwrap_err();
1351        assert!(err.to_string().contains("out of range"));
1352
1353        // out of bounds - offset + length too large
1354        let res = blobs
1355            .read_at(hash, 1024 * 127, ReadAtLen::Exact(1025))
1356            .await;
1357        let err = res.unwrap_err();
1358        assert!(err.to_string().contains("out of bound"));
1359
1360        Ok(())
1361    }
1362
1363    #[tokio::test]
1364    #[traced_test]
1365    async fn test_blob_get_collection() -> Result<()> {
1366        let node = node::Node::memory().spawn().await?;
1367
1368        // create temp file
1369        let temp_dir = tempfile::tempdir().context("tempdir")?;
1370
1371        let in_root = temp_dir.path().join("in");
1372        tokio::fs::create_dir_all(in_root.clone())
1373            .await
1374            .context("create dir all")?;
1375
1376        let mut paths = Vec::new();
1377        for i in 0..5 {
1378            let path = in_root.join(format!("test-{i}"));
1379            let size = 100;
1380            let mut buf = vec![0u8; size];
1381            rand::thread_rng().fill_bytes(&mut buf);
1382            let mut file = tokio::fs::File::create(path.clone())
1383                .await
1384                .context("create file")?;
1385            file.write_all(&buf.clone()).await.context("write_all")?;
1386            file.flush().await.context("flush")?;
1387            paths.push(path);
1388        }
1389
1390        let blobs = node.blobs();
1391
1392        let mut collection = Collection::default();
1393        let mut tags = Vec::new();
1394        // import files
1395        for path in &paths {
1396            let import_outcome = blobs
1397                .add_from_path(
1398                    path.to_path_buf(),
1399                    false,
1400                    SetTagOption::Auto,
1401                    WrapOption::NoWrap,
1402                )
1403                .await
1404                .context("import file")?
1405                .finish()
1406                .await
1407                .context("import finish")?;
1408
1409            collection.push(
1410                path.file_name().unwrap().to_str().unwrap().to_string(),
1411                import_outcome.hash,
1412            );
1413            tags.push(import_outcome.tag);
1414        }
1415
1416        let (hash, _tag) = blobs
1417            .create_collection(collection, SetTagOption::Auto, tags)
1418            .await?;
1419
1420        let collection = blobs.get_collection(hash).await?;
1421
1422        // 5 blobs
1423        assert_eq!(collection.len(), 5);
1424
1425        Ok(())
1426    }
1427
1428    #[tokio::test]
1429    #[traced_test]
1430    async fn test_blob_share() -> Result<()> {
1431        let node = node::Node::memory().spawn().await?;
1432
1433        // create temp file
1434        let temp_dir = tempfile::tempdir().context("tempdir")?;
1435
1436        let in_root = temp_dir.path().join("in");
1437        tokio::fs::create_dir_all(in_root.clone())
1438            .await
1439            .context("create dir all")?;
1440
1441        let path = in_root.join("test-blob");
1442        let size = 1024 * 128;
1443        let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
1444        let mut file = tokio::fs::File::create(path.clone())
1445            .await
1446            .context("create file")?;
1447        file.write_all(&buf.clone()).await.context("write_all")?;
1448        file.flush().await.context("flush")?;
1449
1450        let blobs = node.blobs();
1451
1452        let import_outcome = blobs
1453            .add_from_path(
1454                path.to_path_buf(),
1455                false,
1456                SetTagOption::Auto,
1457                WrapOption::NoWrap,
1458            )
1459            .await
1460            .context("import file")?
1461            .finish()
1462            .await
1463            .context("import finish")?;
1464
1465        // let ticket = blobs
1466        //     .share(import_outcome.hash, BlobFormat::Raw, Default::default())
1467        //     .await?;
1468        // assert_eq!(ticket.hash(), import_outcome.hash);
1469
1470        let status = blobs.status(import_outcome.hash).await?;
1471        assert_eq!(status, BlobStatus::Complete { size });
1472
1473        Ok(())
1474    }
1475
1476    #[derive(Debug, Clone)]
1477    struct BlobEvents {
1478        sender: mpsc::Sender<crate::provider::Event>,
1479    }
1480
1481    impl BlobEvents {
1482        fn new(cap: usize) -> (Self, mpsc::Receiver<crate::provider::Event>) {
1483            let (s, r) = mpsc::channel(cap);
1484            (Self { sender: s }, r)
1485        }
1486    }
1487
1488    impl crate::provider::CustomEventSender for BlobEvents {
1489        fn send(&self, event: crate::provider::Event) -> futures_lite::future::Boxed<()> {
1490            let sender = self.sender.clone();
1491            Box::pin(async move {
1492                sender.send(event).await.ok();
1493            })
1494        }
1495
1496        fn try_send(&self, event: crate::provider::Event) {
1497            self.sender.try_send(event).ok();
1498        }
1499    }
1500
    #[tokio::test]
    #[traced_test]
    async fn test_blob_provide_events() -> Result<()> {
        let (node1_events, mut node1_events_r) = BlobEvents::new(16);
        let node1 = node::Node::memory()
            .blobs_events(node1_events)
            .spawn()
            .await?;

        let (node2_events, mut node2_events_r) = BlobEvents::new(16);
        let node2 = node::Node::memory()
            .blobs_events(node2_events)
            .spawn()
            .await?;

        let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?;

        // Download in node2
        let node1_addr = node1.node_addr().await?;
        let res = node2
            .blobs()
            .download(import_outcome.hash, node1_addr)
            .await?
            .await?;
        dbg!(&res);
        assert_eq!(res.local_size, 0);
        assert_eq!(res.downloaded_size, 11);

        node1.shutdown().await?;
        node2.shutdown().await?;

        let mut ev1 = Vec::new();
        while let Some(ev) = node1_events_r.recv().await {
            ev1.push(ev);
        }
        // The exact number of events can vary with transfer progress granularity,
        // so we assert on the event sequence rather than the count.
        assert!(matches!(
            ev1[0],
            crate::provider::Event::ClientConnected { .. }
        ));
        assert!(matches!(
            ev1[1],
            crate::provider::Event::GetRequestReceived { .. }
        ));
        assert!(matches!(
            ev1[2],
            crate::provider::Event::TransferProgress { .. }
        ));
        assert!(matches!(
            ev1[3],
            crate::provider::Event::TransferCompleted { .. }
        ));
        dbg!(&ev1);

        let mut ev2 = Vec::new();
        while let Some(ev) = node2_events_r.recv().await {
            ev2.push(ev);
        }

        // Node 2 did not provide anything
        assert!(ev2.is_empty());
        Ok(())
    }

    /// Download an existing blob from oneself
    #[tokio::test]
    #[traced_test]
    async fn test_blob_get_self_existing() -> TestResult<()> {
        let node = node::Node::memory().spawn().await?;
        let node_id = node.node_id();
        let blobs = node.blobs();

        let AddOutcome { hash, size, .. } = blobs.add_bytes("foo").await?;

        // Direct: the blob is already local, so nothing should be downloaded.
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);

        // Queued
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);

        Ok(())
    }

    /// Download a missing blob from oneself
    #[tokio::test]
    #[traced_test]
    async fn test_blob_get_self_missing() -> TestResult<()> {
        let node = node::Node::memory().spawn().await?;
        let node_id = node.node_id();
        let blobs = node.blobs();

        let hash = Hash::from_bytes([0u8; 32]);

        // Direct: our own node id is the only candidate, so there is nobody to
        // download the missing blob from and the request must fail.
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.err().unwrap().to_string().as_str(),
            "No nodes to download from provided"
        );

        // Queued
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.err().unwrap().to_string().as_str(),
            "No provider nodes found"
        );

        Ok(())
    }

    /// Download an existing collection. Check that things succeed and no download is performed.
    #[tokio::test]
    #[traced_test]
    async fn test_blob_get_existing_collection() -> TestResult<()> {
        let node = node::Node::memory().spawn().await?;
        // We use a nonexistent node id because we just want to check that this succeeds without
        // hitting the network.
        let node_id = NodeId::from_bytes(&[0u8; 32])?;
        let blobs = node.blobs();

        let mut collection = Collection::default();
        let mut tags = Vec::new();
        let mut size = 0;
        for value in ["iroh", "is", "cool"] {
            let import_outcome = blobs.add_bytes(value).await.context("add bytes")?;
            collection.push(value.to_string(), import_outcome.hash);
            tags.push(import_outcome.tag);
            size += import_outcome.size;
        }

        let (hash, _tag) = blobs
            .create_collection(collection, SetTagOption::Auto, tags)
            .await?;

        // load the hashseq and collection header manually to calculate our expected size
        let hashseq_bytes = blobs.read_to_bytes(hash).await?;
        size += hashseq_bytes.len() as u64;
        let hashseq = HashSeq::try_from(hashseq_bytes)?;
        let collection_header_bytes = blobs
            .read_to_bytes(hashseq.into_iter().next().expect("header to exist"))
            .await?;
        size += collection_header_bytes.len() as u64;

        // Direct
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::HashSeq,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await
            .context("direct (download)")?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);

        // Queued
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::HashSeq,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await
            .context("queued")?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);

        Ok(())
    }

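    /// Add a blob to an in-memory store, then delete it and check that it is gone.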
    #[tokio::test]
    #[traced_test]
    #[cfg_attr(target_os = "windows", ignore = "flaky")]
    async fn test_blob_delete_mem() -> Result<()> {
        let node = node::Node::memory().spawn().await?;

        let res = node.blobs().add_bytes(&b"hello world"[..]).await?;

        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert_eq!(hashes.len(), 1);
        assert_eq!(hashes[0].hash, res.hash);

        // delete
        node.blobs().delete_blob(res.hash).await?;

        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert!(hashes.is_empty());

        Ok(())
    }

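    /// Same as the in-memory variant, but against a persistent (filesystem) store.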
    #[tokio::test]
    #[traced_test]
    async fn test_blob_delete_fs() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let node = node::Node::persistent(dir.path()).await?.spawn().await?;

        let res = node.blobs().add_bytes(&b"hello world"[..]).await?;

        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert_eq!(hashes.len(), 1);
        assert_eq!(hashes[0].hash, res.hash);

        // delete
        node.blobs().delete_blob(res.hash).await?;

        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert!(hashes.is_empty());

        Ok(())
    }

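    /// Checks that a ticket built from a freshly spawned node's address carries
    /// direct addresses.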
    #[tokio::test]
    #[traced_test]
    async fn test_ticket_multiple_addrs() -> TestResult<()> {
        let node = Node::memory().spawn().await?;
        let hash = node
            .blobs()
            .add_bytes(Bytes::from_static(b"hello"))
            .await?
            .hash;

        let addr = node.node_addr().await?;
        let ticket = BlobTicket::new(addr, hash, BlobFormat::Raw)?;
        println!("addrs: {:?}", ticket.node_addr());
        assert!(!ticket.node_addr().direct_addresses.is_empty());
        Ok(())
    }

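    /// Round-trips data through `add_reader`: the input is larger than one chunk,
    /// so it is actually streamed rather than sent in a single message.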
    #[tokio::test]
    #[traced_test]
    async fn test_node_add_blob_stream() -> Result<()> {
        use std::io::Cursor;
        let node = Node::memory().spawn().await?;

        let blobs = node.blobs();
        let input = vec![2u8; 1024 * 256]; // 256 KiB, so the data is actually streamed (chunk size is 64 KiB)
        let reader = Cursor::new(input.clone());
        let progress = blobs.add_reader(reader, SetTagOption::Auto).await?;
        let outcome = progress.finish().await?;
        let hash = outcome.hash;
        let output = blobs.read_to_bytes(hash).await?;
        assert_eq!(input, output.to_vec());
        Ok(())
    }

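    /// Adds a file via `add_from_path` and waits for the `AllDone` progress event,
    /// failing if the stream aborts, ends early, or times out.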
    #[tokio::test]
    #[traced_test]
    async fn test_node_add_tagged_blob_event() -> Result<()> {
        let node = Node::memory().spawn().await?;

        let _got_hash = tokio::time::timeout(Duration::from_secs(10), async move {
            let mut stream = node
                .blobs()
                .add_from_path(
                    Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
                    false,
                    SetTagOption::Auto,
                    WrapOption::NoWrap,
                )
                .await?;

            while let Some(progress) = stream.next().await {
                match progress? {
                    crate::provider::AddProgress::AllDone { hash, .. } => {
                        return Ok(hash);
                    }
                    crate::provider::AddProgress::Abort(e) => {
                        anyhow::bail!("Error while adding data: {e}");
                    }
                    _ => {}
                }
            }
            anyhow::bail!("stream ended without providing data");
        })
        .await
        .context("timeout")?
        .context("add failed")?;

        Ok(())
    }

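    /// Downloads between two nodes whose address contains only a relay URL, so the
    /// connection has to be established via the relay server.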
    #[tokio::test]
    #[traced_test]
    async fn test_download_via_relay() -> Result<()> {
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await?;

        let endpoint1 = iroh::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true);
        let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
        let endpoint2 = iroh::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true);
        let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
        let AddOutcome { hash, .. } = node1.blobs().add_bytes(b"foo".to_vec()).await?;

        // create a node addr with only a relay URL, no direct addresses
        let addr = NodeAddr::new(node1.node_id()).with_relay_url(relay_url);
        node2.blobs().download(hash, addr).await?.await?;
        assert_eq!(
            node2
                .blobs()
                .read_to_bytes(hash)
                .await
                .context("get")?
                .as_ref(),
            b"foo"
        );
        Ok(())
    }

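    /// Like `test_download_via_relay`, but the node address contains only the node
    /// id, so the dialing side has to resolve it via DNS/pkarr discovery first.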
    #[tokio::test]
    #[traced_test]
    #[ignore = "flaky"]
    async fn test_download_via_relay_with_discovery() -> Result<()> {
        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?;
        let dns_pkarr_server = DnsPkarrServer::run().await?;

        let mut rng = rand::thread_rng();

        let secret1 = SecretKey::generate(&mut rng);
        let endpoint1 = iroh::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .dns_resolver(dns_pkarr_server.dns_resolver())
            .secret_key(secret1.clone())
            .discovery(dns_pkarr_server.discovery(secret1));
        let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
        let secret2 = SecretKey::generate(&mut rng);
        let endpoint2 = iroh::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .dns_resolver(dns_pkarr_server.dns_resolver())
            .secret_key(secret2.clone())
            .discovery(dns_pkarr_server.discovery(secret2));
        let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
        let hash = node1.blobs().add_bytes(b"foo".to_vec()).await?.hash;

        // create a node addr with node id only
        let addr = NodeAddr::new(node1.node_id());
        node2.blobs().download(hash, addr).await?.await?;
        assert_eq!(
            node2
                .blobs()
                .read_to_bytes(hash)
                .await
                .context("get")?
                .as_ref(),
            b"foo"
        );
        Ok(())
    }
}