iroh_blobs/rpc/client/blobs.rs

//! API for blobs management.
//!
//! The main entry point is the [`Client`].
//!
//! ## Interacting with the local blob store
//!
//! ### Importing data
//!
//! There are several ways to import data into the local blob store:
//!
//! - [`add_bytes`](Client::add_bytes)
//!   imports in-memory data.
//! - [`add_stream`](Client::add_stream)
//!   imports data from a stream of bytes.
//! - [`add_reader`](Client::add_reader)
//!   imports data from an [async reader](tokio::io::AsyncRead).
//! - [`add_from_path`](Client::add_from_path)
//!   imports data from a file.
//!
//! The last method, which imports directly from a file on the local
//! filesystem, is the most efficient way to import large amounts of data.
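//!
//! For instance, a minimal sketch of importing in-memory data (assuming an
//! already-constructed [`Client`] named `client`):
//!
//! ```no_run
//! # async fn example(client: &iroh_blobs::rpc::client::blobs::MemClient) -> anyhow::Result<()> {
//! // Import in-memory data; the outcome carries the BLAKE3 hash, size and tag.
//! let outcome = client.add_bytes(&b"hello world"[..]).await?;
//! println!("imported blob {} ({} bytes)", outcome.hash, outcome.size);
//! # Ok(())
//! # }
//! ```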
//!
//! ### Exporting data
//!
//! There are several ways to export data from the local blob store:
//!
//! - [`read_to_bytes`](Client::read_to_bytes) reads data into memory.
//! - [`read`](Client::read) creates a [reader](Reader) to read data from.
//! - [`export`](Client::export) exports data to a file on the local filesystem.
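//!
//! A minimal sketch of reading a small blob back into memory (assuming
//! `client` is a connected [`Client`] and `hash` is a known [`Hash`]):
//!
//! ```no_run
//! # async fn example(client: &iroh_blobs::rpc::client::blobs::MemClient, hash: iroh_blobs::Hash) -> anyhow::Result<()> {
//! // Check the size via a `Reader` before buffering the whole blob in memory.
//! let mut reader = client.read(hash).await?;
//! if reader.size() < 1024 * 1024 {
//!     let bytes = reader.read_to_bytes().await?;
//!     println!("read {} bytes", bytes.len());
//! }
//! # Ok(())
//! # }
//! ```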
//!
//! ## Interacting with remote nodes
//!
//! - [`download`](Client::download) downloads data from a remote node.
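//!
//! A minimal sketch (assuming `client` is a connected [`Client`] and `addr`
//! is the [`NodeAddr`](iroh::NodeAddr) of a node that provides the blob):
//!
//! ```no_run
//! # async fn example(client: &iroh_blobs::rpc::client::blobs::MemClient, hash: iroh_blobs::Hash, addr: iroh::NodeAddr) -> anyhow::Result<()> {
//! // `download` returns a progress stream; `finish` awaits the final outcome.
//! let outcome = client.download(hash, addr).await?.finish().await?;
//! println!("downloaded {} new bytes", outcome.downloaded_size);
//! # Ok(())
//! # }
//! ```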
//!
//! ## Interacting with the blob store itself
//!
//! These are more advanced operations that are usually not needed in normal
//! operation.
//!
//! - [`consistency_check`](Client::consistency_check) checks the internal
//!   consistency of the local blob store.
//! - [`validate`](Client::validate) validates the locally stored data against
//!   their BLAKE3 hashes.
//! - [`delete_blob`](Client::delete_blob) deletes a blob from the local store.
//!
//! ### Batch operations
//!
//! For complex update operations, there is a [`batch`](Client::batch) API that
//! allows you to add multiple blobs in a single logical batch.
//!
//! Operations in a batch return [temporary tags](crate::util::TempTag) that
//! protect the added data from garbage collection as long as the batch is
//! alive.
//!
//! To store the data permanently, a temp tag needs to be upgraded to a
//! permanent tag using [`persist`](crate::rpc::client::blobs::Batch::persist) or
//! [`persist_to`](crate::rpc::client::blobs::Batch::persist_to).
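//!
//! A minimal sketch of a batch (assuming `client` is a connected [`Client`];
//! see [`Batch`] for the full API):
//!
//! ```no_run
//! # async fn example(client: &iroh_blobs::rpc::client::blobs::MemClient) -> anyhow::Result<()> {
//! let batch = client.batch().await?;
//! // The temp tag protects the data only while `batch` is alive.
//! let temp_tag = batch.add_bytes(&b"important data"[..]).await?;
//! // Upgrade to a permanent tag so the data survives the batch being dropped.
//! let tag = batch.persist(temp_tag).await?;
//! println!("persisted under tag {:?}", tag);
//! # Ok(())
//! # }
//! ```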
use std::{
    future::Future,
    io,
    path::PathBuf,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
use genawaiter::sync::{Co, Gen};
use iroh::NodeAddr;
use portable_atomic::{AtomicU64, Ordering};
use quic_rpc::{
    client::{BoxStreamSync, BoxedConnector},
    transport::boxed::BoxableConnector,
    Connector, RpcClient,
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;

pub use crate::net_protocol::DownloadMode;
use crate::{
    export::ExportProgress as BytesExportProgress,
    format::collection::{Collection, SimpleStore},
    get::db::DownloadProgress as BytesDownloadProgress,
    net_protocol::BlobDownloadRequest,
    rpc::proto::{Request, Response, RpcService},
    store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
    util::SetTagOption,
    BlobFormat, Hash, Tag,
};

mod batch;
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};

use super::{flatten, tags};
use crate::rpc::proto::blobs::{
    AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
    BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
    DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest,
    ReadAtResponse, ValidateRequest,
};

/// Iroh blobs client.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
    pub(crate) rpc: RpcClient<RpcService, C>,
}

/// Type alias for a memory-backed client.
pub type MemClient = Client<crate::rpc::MemConnector>;

impl<C> Client<C>
where
    C: Connector<RpcService>,
{
    /// Create a new client
    pub fn new(rpc: RpcClient<RpcService, C>) -> Self {
        Self { rpc }
    }

    /// Box the client to avoid having to provide the connector type.
    pub fn boxed(&self) -> Client<BoxedConnector<RpcService>>
    where
        C: BoxableConnector<Response, Request>,
    {
        Client::new(self.rpc.clone().boxed())
    }

    /// Get a tags client.
    pub fn tags(&self) -> tags::Client<C> {
        tags::Client::new(self.rpc.clone())
    }

    /// Get the status of a blob.
    ///
    /// Returns whether the blob is not stored at all, only partially stored,
    /// or completely stored on the node.
    pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
        let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
        Ok(status.0)
    }

    /// Check if a blob is completely stored on the node.
    ///
    /// This is just a convenience wrapper around `status` that returns a boolean.
    pub async fn has(&self, hash: Hash) -> Result<bool> {
        match self.status(hash).await {
            Ok(BlobStatus::Complete { .. }) => Ok(true),
            Ok(_) => Ok(false),
            Err(err) => Err(err),
        }
    }

    /// Create a new batch for adding data.
    ///
    /// A batch is a context in which temp tags are created and data is added to the node. Temp tags
    /// are automatically deleted when the batch is dropped, leading to the data being garbage collected
    /// unless a permanent tag is created for it.
    pub async fn batch(&self) -> Result<Batch<C>> {
        let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
        let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
        let rpc = self.rpc.clone();
        Ok(Batch::new(batch, rpc, updates, 1024))
    }

    /// Stream the contents of a single blob.
    ///
    /// Returns a [`Reader`], which can report the size of the blob before reading it.
    pub async fn read(&self, hash: Hash) -> Result<Reader> {
        Reader::from_rpc_read(&self.rpc, hash).await
    }

    /// Read offset + len from a single blob.
    ///
    /// If `len` is [`ReadAtLen::All`], the blob is read from `offset` to the end.
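    ///
    /// A minimal sketch (assuming `client` is a connected [`Client`] and `hash`
    /// refers to a blob of at least 100 bytes):
    ///
    /// ```no_run
    /// # async fn example(client: &iroh_blobs::rpc::client::blobs::MemClient, hash: iroh_blobs::Hash) -> anyhow::Result<()> {
    /// use iroh_blobs::rpc::client::blobs::ReadAtLen;
    /// // Read exactly 80 bytes starting at offset 20.
    /// let mut reader = client.read_at(hash, 20, ReadAtLen::Exact(80)).await?;
    /// let bytes = reader.read_to_bytes().await?;
    /// assert_eq!(bytes.len(), 80);
    /// # Ok(())
    /// # }
    /// ```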
    pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Reader> {
        Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await
    }

    /// Read all bytes of a single blob.
    ///
    /// This allocates a buffer for the full blob. Use only if you know that the blob you're
    /// reading is small. If not sure, use [`Self::read`] and check the size with
    /// [`Reader::size`] before calling [`Reader::read_to_bytes`].
    pub async fn read_to_bytes(&self, hash: Hash) -> Result<Bytes> {
        Reader::from_rpc_read(&self.rpc, hash)
            .await?
            .read_to_bytes()
            .await
    }

    /// Read all bytes of a single blob at `offset` for length `len`.
    ///
    /// This allocates a buffer for the full length.
    pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Bytes> {
        Reader::from_rpc_read_at(&self.rpc, hash, offset, len)
            .await?
            .read_to_bytes()
            .await
    }

    /// Import a blob from a filesystem path.
    ///
    /// `path` should be an absolute path valid for the file system on which
    /// the node runs.
    /// If `in_place` is true, Iroh will assume that the data will not change and will share it in
    /// place without copying to the Iroh data directory.
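    ///
    /// A minimal sketch (assuming `client` is a connected [`Client`] and
    /// `/tmp/data.bin` exists on the node's filesystem):
    ///
    /// ```no_run
    /// # async fn example(client: &iroh_blobs::rpc::client::blobs::MemClient) -> anyhow::Result<()> {
    /// use iroh_blobs::{rpc::client::blobs::WrapOption, util::SetTagOption};
    /// let outcome = client
    ///     .add_from_path("/tmp/data.bin".into(), false, SetTagOption::Auto, WrapOption::NoWrap)
    ///     .await?
    ///     .finish()
    ///     .await?;
    /// println!("imported {} bytes as {}", outcome.size, outcome.hash);
    /// # Ok(())
    /// # }
    /// ```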
    pub async fn add_from_path(
        &self,
        path: PathBuf,
        in_place: bool,
        tag: SetTagOption,
        wrap: WrapOption,
    ) -> Result<AddProgress> {
        let stream = self
            .rpc
            .server_streaming(AddPathRequest {
                path,
                in_place,
                tag,
                wrap,
            })
            .await?;
        Ok(AddProgress::new(stream))
    }

    /// Create a collection from already existing blobs.
    ///
    /// To automatically clear the tags for the passed-in blobs, set
    /// `tags_to_delete` to those tags; they will be deleted once the collection is created.
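    ///
    /// A minimal sketch (assuming `client` is a connected [`Client`] and
    /// `hash_a`/`hash_b` are hashes of blobs already in the store):
    ///
    /// ```no_run
    /// # async fn example(client: &iroh_blobs::rpc::client::blobs::MemClient, hash_a: iroh_blobs::Hash, hash_b: iroh_blobs::Hash) -> anyhow::Result<()> {
    /// use iroh_blobs::{format::collection::Collection, util::SetTagOption};
    /// let mut collection = Collection::default();
    /// collection.push("a".to_string(), hash_a);
    /// collection.push("b".to_string(), hash_b);
    /// let (hash, tag) = client
    ///     .create_collection(collection, SetTagOption::Auto, vec![])
    ///     .await?;
    /// println!("collection {} tagged as {:?}", hash, tag);
    /// # Ok(())
    /// # }
    /// ```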
    pub async fn create_collection(
        &self,
        collection: Collection,
        tag: SetTagOption,
        tags_to_delete: Vec<Tag>,
    ) -> anyhow::Result<(Hash, Tag)> {
        let CreateCollectionResponse { hash, tag } = self
            .rpc
            .rpc(CreateCollectionRequest {
                collection,
                tag,
                tags_to_delete,
            })
            .await??;
        Ok((hash, tag))
    }

    /// Write a blob by passing an async reader.
    pub async fn add_reader(
        &self,
        reader: impl AsyncRead + Unpin + Send + 'static,
        tag: SetTagOption,
    ) -> anyhow::Result<AddProgress> {
        const CAP: usize = 1024 * 64; // send 64KB per request by default
        let input = ReaderStream::with_capacity(reader, CAP);
        self.add_stream(input, tag).await
    }

    /// Write a blob by passing a stream of bytes.
    pub async fn add_stream(
        &self,
        input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        tag: SetTagOption,
    ) -> anyhow::Result<AddProgress> {
        let (mut sink, progress) = self.rpc.bidi(AddStreamRequest { tag }).await?;
        let mut input = input.map(|chunk| match chunk {
            Ok(chunk) => Ok(AddStreamUpdate::Chunk(chunk)),
            Err(err) => {
                warn!("Abort send, reason: failed to read from source stream: {err:?}");
                Ok(AddStreamUpdate::Abort)
            }
        });
        tokio::spawn(async move {
            if let Err(err) = sink.send_all(&mut input).await {
                // if we get an error in send_all due to the connection being closed, this will just fail again.
                // if we get an error due to something else (serialization or size limit), tell the remote to abort.
                sink.send(AddStreamUpdate::Abort).await.ok();
                warn!("Failed to send input stream to remote: {err:?}");
            }
        });

        Ok(AddProgress::new(progress))
    }

    /// Write a blob by passing bytes.
    pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
        let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
        self.add_stream(input, SetTagOption::Auto).await?.await
    }

    /// Write a blob by passing bytes, setting an explicit tag name.
    pub async fn add_bytes_named(
        &self,
        bytes: impl Into<Bytes>,
        name: impl Into<Tag>,
    ) -> anyhow::Result<AddOutcome> {
        let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
        self.add_stream(input, SetTagOption::Named(name.into()))
            .await?
            .await
    }

    /// Validate hashes on the running node.
    ///
    /// If `repair` is true, repair the store by removing invalid data.
    pub async fn validate(
        &self,
        repair: bool,
    ) -> Result<impl Stream<Item = Result<ValidateProgress>>> {
        let stream = self
            .rpc
            .server_streaming(ValidateRequest { repair })
            .await?;
        Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
    }

    /// Check the internal consistency of the blob store on the running node.
    ///
    /// If `repair` is true, repair the store by removing invalid data.
    pub async fn consistency_check(
        &self,
        repair: bool,
    ) -> Result<impl Stream<Item = Result<ConsistencyCheckProgress>>> {
        let stream = self
            .rpc
            .server_streaming(ConsistencyCheckRequest { repair })
            .await?;
        Ok(stream.map(|r| r.map_err(anyhow::Error::from)))
    }

    /// Download a blob from another node and add it to the local database.
    pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
        self.download_with_opts(
            hash,
            DownloadOptions {
                format: BlobFormat::Raw,
                nodes: vec![node],
                tag: SetTagOption::Auto,
                mode: DownloadMode::Queued,
            },
        )
        .await
    }

    /// Download a hash sequence from another node and add it to the local database.
    pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
        self.download_with_opts(
            hash,
            DownloadOptions {
                format: BlobFormat::HashSeq,
                nodes: vec![node],
                tag: SetTagOption::Auto,
                mode: DownloadMode::Queued,
            },
        )
        .await
    }

    /// Download a blob, with additional options.
    pub async fn download_with_opts(
        &self,
        hash: Hash,
        opts: DownloadOptions,
    ) -> Result<DownloadProgress> {
        let DownloadOptions {
            format,
            nodes,
            tag,
            mode,
        } = opts;
        let stream = self
            .rpc
            .server_streaming(BlobDownloadRequest {
                hash,
                format,
                nodes,
                tag,
                mode,
            })
            .await?;
        Ok(DownloadProgress::new(
            stream.map(|res| res.map_err(anyhow::Error::from)),
        ))
    }

    /// Export a blob from the internal blob store to a path on the node's filesystem.
    ///
    /// `destination` should be a writable, absolute path on the local node's filesystem.
    ///
    /// If `format` is set to [`ExportFormat::Collection`], and the `hash` refers to a collection,
    /// all children of the collection will be exported. See [`ExportFormat`] for details.
    ///
    /// The `mode` argument defines if the blob should be copied to the target location or moved out of
    /// the internal store into the target location. See [`ExportMode`] for details.
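    ///
    /// A minimal sketch (assuming `client` is a connected [`Client`], `hash`
    /// refers to a raw blob, and `/tmp/out.bin` is a writable destination):
    ///
    /// ```no_run
    /// # async fn example(client: &iroh_blobs::rpc::client::blobs::MemClient, hash: iroh_blobs::Hash) -> anyhow::Result<()> {
    /// use iroh_blobs::store::{ExportFormat, ExportMode};
    /// client
    ///     .export(hash, "/tmp/out.bin".into(), ExportFormat::Blob, ExportMode::Copy)
    ///     .await?
    ///     .finish()
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```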
    pub async fn export(
        &self,
        hash: Hash,
        destination: PathBuf,
        format: ExportFormat,
        mode: ExportMode,
    ) -> Result<ExportProgress> {
        let req = ExportRequest {
            hash,
            path: destination,
            format,
            mode,
        };
        let stream = self.rpc.server_streaming(req).await?;
        Ok(ExportProgress::new(
            stream.map(|r| r.map_err(anyhow::Error::from)),
        ))
    }

    /// List all complete blobs.
    pub async fn list(&self) -> Result<impl Stream<Item = Result<BlobInfo>>> {
        let stream = self.rpc.server_streaming(ListRequest).await?;
        Ok(flatten(stream))
    }

    /// List all incomplete (partial) blobs.
    pub async fn list_incomplete(&self) -> Result<impl Stream<Item = Result<IncompleteBlobInfo>>> {
        let stream = self.rpc.server_streaming(ListIncompleteRequest).await?;
        Ok(flatten(stream))
    }

    /// Read the content of a collection.
    pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
        Collection::load(hash, self).await
    }

    /// List all collections.
    pub fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
        let this = self.clone();
        Ok(Gen::new(|co| async move {
            if let Err(cause) = this.list_collections_impl(&co).await {
                co.yield_(Err(cause)).await;
            }
        }))
    }

    async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
        let tags = self.tags_client();
        let mut tags = tags.list_hash_seq().await?;
        while let Some(tag) = tags.next().await {
            let tag = tag?;
            if let Ok(collection) = self.get_collection(tag.hash).await {
                let info = CollectionInfo {
                    tag: tag.name,
                    hash: tag.hash,
                    total_blobs_count: Some(collection.len() as u64 + 1),
                    total_blobs_size: Some(0),
                };
                co.yield_(Ok(info)).await;
            }
        }
        Ok(())
    }

    /// Delete a blob.
    ///
    /// **Warning**: this operation deletes the blob from the local store even
    /// if it is tagged. You should usually not do this manually, but rely on the
    /// node to remove data that is not tagged.
    pub async fn delete_blob(&self, hash: Hash) -> Result<()> {
        self.rpc.rpc(DeleteRequest { hash }).await??;
        Ok(())
    }

    fn tags_client(&self) -> tags::Client<C> {
        tags::Client::new(self.rpc.clone())
    }
}

impl<C> SimpleStore for Client<C>
where
    C: Connector<RpcService>,
{
    async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
        self.read_to_bytes(hash).await
    }
}

/// Defines the way to read bytes.
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
pub enum ReadAtLen {
    /// Reads all available bytes.
    #[default]
    All,
    /// Reads exactly this many bytes, erroring out on larger or smaller.
    Exact(u64),
    /// Reads at most this many bytes.
    AtMost(u64),
}

impl ReadAtLen {
    /// todo make private again
    pub fn as_result_len(&self, size_remaining: u64) -> u64 {
        match self {
            ReadAtLen::All => size_remaining,
            ReadAtLen::Exact(len) => *len,
            ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining),
        }
    }
}

/// Whether to wrap the added data in a collection.
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub enum WrapOption {
    /// Do not wrap the file or directory.
    #[default]
    NoWrap,
    /// Wrap the file or directory in a collection.
    Wrap {
        /// Override the filename in the wrapping collection.
        name: Option<String>,
    },
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
    /// The blob is not stored at all.
    NotFound,
    /// The blob is only stored partially.
    Partial {
        /// The size of the currently stored partial blob.
        size: BaoBlobSize,
    },
    /// The blob is stored completely.
    Complete {
        /// The size of the blob.
        size: u64,
    },
}

/// Outcome of a blob add operation.
#[derive(Debug, Clone)]
pub struct AddOutcome {
    /// The hash of the blob
    pub hash: Hash,
    /// The format of the blob
    pub format: BlobFormat,
    /// The size of the blob
    pub size: u64,
    /// The tag of the blob
    pub tag: Tag,
}

/// Information about a stored collection.
#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionInfo {
    /// Tag of the collection
    pub tag: Tag,
    /// Hash of the collection
    pub hash: Hash,
    /// Number of children in the collection
    ///
    /// This is an optional field, because the data is not always available.
    pub total_blobs_count: Option<u64>,
    /// Total size of the raw data referred to by all links
    ///
    /// This is an optional field, because the data is not always available.
    pub total_blobs_size: Option<u64>,
}

/// Information about a complete blob.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobInfo {
    /// Location of the blob
    pub path: String,
    /// The hash of the blob
    pub hash: Hash,
    /// The size of the blob
    pub size: u64,
}

/// Information about an incomplete blob.
#[derive(Debug, Serialize, Deserialize)]
pub struct IncompleteBlobInfo {
    /// The size we got
    pub size: u64,
    /// The size we expect
    pub expected_size: u64,
    /// The hash of the blob
    pub hash: Hash,
}

/// Progress stream for blob add operations.
#[derive(derive_more::Debug)]
pub struct AddProgress {
    #[debug(skip)]
    stream:
        Pin<Box<dyn Stream<Item = Result<crate::provider::AddProgress>> + Send + Unpin + 'static>>,
    current_total_size: Arc<AtomicU64>,
}

impl AddProgress {
    fn new(
        stream: (impl Stream<
            Item = Result<impl Into<crate::provider::AddProgress>, impl Into<anyhow::Error>>,
        > + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_total_size = Arc::new(AtomicU64::new(0));
        let total_size = current_total_size.clone();
        let stream = stream.map(move |item| match item {
            Ok(item) => {
                let item = item.into();
                if let crate::provider::AddProgress::Found { size, .. } = &item {
                    total_size.fetch_add(*size, Ordering::Relaxed);
                }
                Ok(item)
            }
            Err(err) => Err(err.into()),
        });
        Self {
            stream: Box::pin(stream),
            current_total_size,
        }
    }

    /// Finish writing the stream, ignoring all intermediate progress events.
    ///
    /// Returns an [`AddOutcome`] which contains a tag, format, hash and a size.
    /// When importing a single blob, this is the hash and size of that blob.
    /// When importing a collection, the hash is the hash of the collection and the size
    /// is the total size of all imported blobs (but excluding the size of the collection blob
    /// itself).
    pub async fn finish(self) -> Result<AddOutcome> {
        self.await
    }
}

impl Stream for AddProgress {
    type Item = Result<crate::provider::AddProgress>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}

impl Future for AddProgress {
    type Output = Result<AddOutcome>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => {
                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                Poll::Ready(Some(Ok(msg))) => match msg {
                    crate::provider::AddProgress::AllDone { hash, format, tag } => {
                        let outcome = AddOutcome {
                            hash,
                            format,
                            tag,
                            size: self.current_total_size.load(Ordering::Relaxed),
                        };
                        return Poll::Ready(Ok(outcome));
                    }
                    crate::provider::AddProgress::Abort(err) => {
                        return Poll::Ready(Err(err.into()));
                    }
                    _ => {}
                },
            }
        }
    }
}

/// Outcome of a blob download operation.
#[derive(Debug, Clone)]
pub struct DownloadOutcome {
    /// The size of the data we already had locally
    pub local_size: u64,
    /// The size of the data we downloaded from the network
    pub downloaded_size: u64,
    /// Statistics about the download
    pub stats: crate::get::Stats,
}

/// Progress stream for blob download operations.
#[derive(derive_more::Debug)]
pub struct DownloadProgress {
    #[debug(skip)]
    stream: Pin<Box<dyn Stream<Item = Result<BytesDownloadProgress>> + Send + Unpin + 'static>>,
    current_local_size: Arc<AtomicU64>,
    current_network_size: Arc<AtomicU64>,
}

impl DownloadProgress {
    /// Create a [`DownloadProgress`] that can help you easily poll the [`BytesDownloadProgress`] stream from your download until it is finished or errors.
    pub fn new(
        stream: (impl Stream<Item = Result<impl Into<BytesDownloadProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_local_size = Arc::new(AtomicU64::new(0));
        let current_network_size = Arc::new(AtomicU64::new(0));

        let local_size = current_local_size.clone();
        let network_size = current_network_size.clone();

        let stream = stream.map(move |item| match item {
            Ok(item) => {
                let item = item.into();
                match &item {
                    BytesDownloadProgress::FoundLocal { size, .. } => {
                        local_size.fetch_add(size.value(), Ordering::Relaxed);
                    }
                    BytesDownloadProgress::Found { size, .. } => {
                        network_size.fetch_add(*size, Ordering::Relaxed);
                    }
                    _ => {}
                }

                Ok(item)
            }
            Err(err) => Err(err.into()),
        });
        Self {
            stream: Box::pin(stream),
            current_local_size,
            current_network_size,
        }
    }

    /// Finish writing the stream, ignoring all intermediate progress events.
    ///
    /// Returns a [`DownloadOutcome`] which contains the size of the content we downloaded and the size of the content we already had locally.
    /// When importing a single blob, this is the size of that blob.
    /// When importing a collection, this is the total size of all imported blobs (but excluding the size of the collection blob itself).
    pub async fn finish(self) -> Result<DownloadOutcome> {
        self.await
    }
}

impl Stream for DownloadProgress {
    type Item = Result<BytesDownloadProgress>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}

impl Future for DownloadProgress {
    type Output = Result<DownloadOutcome>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => {
                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                Poll::Ready(Some(Ok(msg))) => match msg {
                    BytesDownloadProgress::AllDone(stats) => {
                        let outcome = DownloadOutcome {
                            local_size: self.current_local_size.load(Ordering::Relaxed),
                            downloaded_size: self.current_network_size.load(Ordering::Relaxed),
                            stats,
                        };
                        return Poll::Ready(Ok(outcome));
                    }
                    BytesDownloadProgress::Abort(err) => {
                        return Poll::Ready(Err(err.into()));
                    }
                    _ => {}
                },
            }
        }
    }
}

/// Outcome of a blob export operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportOutcome {
    /// The total size of the exported data.
    total_size: u64,
}

/// Progress stream for blob export operations.
#[derive(derive_more::Debug)]
pub struct ExportProgress {
    #[debug(skip)]
    stream: Pin<Box<dyn Stream<Item = Result<BytesExportProgress>> + Send + Unpin + 'static>>,
    current_total_size: Arc<AtomicU64>,
}

impl ExportProgress {
    /// Create an [`ExportProgress`] that can help you easily poll the [`BytesExportProgress`] stream from your
    /// download until it is finished or errors.
    pub fn new(
        stream: (impl Stream<Item = Result<impl Into<BytesExportProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let current_total_size = Arc::new(AtomicU64::new(0));
        let total_size = current_total_size.clone();
        let stream = stream.map(move |item| match item {
            Ok(item) => {
                let item = item.into();
                if let BytesExportProgress::Found { size, .. } = &item {
                    let size = size.value();
                    total_size.fetch_add(size, Ordering::Relaxed);
                }

                Ok(item)
            }
            Err(err) => Err(err.into()),
        });
        Self {
            stream: Box::pin(stream),
            current_total_size,
        }
    }

    /// Finish writing the stream, ignoring all intermediate progress events.
    ///
    /// Returns an [`ExportOutcome`] which contains the size of the content we exported.
    pub async fn finish(self) -> Result<ExportOutcome> {
        self.await
    }
}

impl Stream for ExportProgress {
    type Item = Result<BytesExportProgress>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}

impl Future for ExportProgress {
    type Output = Result<ExportOutcome>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => {
                    return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                Poll::Ready(Some(Ok(msg))) => match msg {
                    BytesExportProgress::AllDone => {
                        let outcome = ExportOutcome {
                            total_size: self.current_total_size.load(Ordering::Relaxed),
                        };
                        return Poll::Ready(Ok(outcome));
                    }
                    BytesExportProgress::Abort(err) => {
                        return Poll::Ready(Err(err.into()));
                    }
                    _ => {}
                },
            }
        }
    }
}

/// Data reader for a single blob.
///
/// Implements [`AsyncRead`].
#[derive(derive_more::Debug)]
pub struct Reader {
    size: u64,
    response_size: u64,
    is_complete: bool,
    #[debug("StreamReader")]
    stream: tokio_util::io::StreamReader<BoxStreamSync<'static, io::Result<Bytes>>, Bytes>,
}

impl Reader {
    fn new(
        size: u64,
        response_size: u64,
        is_complete: bool,
        stream: BoxStreamSync<'static, io::Result<Bytes>>,
    ) -> Self {
        Self {
            size,
            response_size,
            is_complete,
            stream: StreamReader::new(stream),
        }
    }

    /// todo make private again
    pub async fn from_rpc_read<C>(
        rpc: &RpcClient<RpcService, C>,
        hash: Hash,
    ) -> anyhow::Result<Self>
    where
        C: Connector<RpcService>,
    {
        Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await
    }

    async fn from_rpc_read_at<C>(
        rpc: &RpcClient<RpcService, C>,
        hash: Hash,
        offset: u64,
        len: ReadAtLen,
    ) -> anyhow::Result<Self>
    where
        C: Connector<RpcService>,
    {
        let stream = rpc
            .server_streaming(ReadAtRequest { hash, offset, len })
            .await?;
        let mut stream = flatten(stream);

        let (size, is_complete) = match stream.next().await {
            Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
            Some(Err(err)) => return Err(err),
            Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
            None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
        };

        let stream = stream.map(|item| match item {
            Ok(ReadAtResponse::Data { chunk }) => Ok(chunk),
            Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
            Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
        });
        let len = len.as_result_len(size.value() - offset);
        Ok(Self::new(size.value(), len, is_complete, Box::pin(stream)))
    }

    /// Total size of this blob.
    pub fn size(&self) -> u64 {
        self.size
    }

    /// Whether this blob has been downloaded completely.
    ///
    /// Returns false for partial blobs for which some chunks are missing.
    pub fn is_complete(&self) -> bool {
        self.is_complete
    }

    /// Read all bytes of the blob.
    pub async fn read_to_bytes(&mut self) -> anyhow::Result<Bytes> {
        let mut buf = Vec::with_capacity(self.response_size as usize);
        self.read_to_end(&mut buf).await?;
        Ok(buf.into())
    }
}

impl AsyncRead for Reader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        Pin::new(&mut self.stream).poll_read(cx, buf)
    }
}

impl Stream for Reader {
    type Item = io::Result<Bytes>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).get_pin_mut().poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.get_ref().size_hint()
    }
}

/// Options to configure a download request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadOptions {
    /// The format of the data to download.
    pub format: BlobFormat,
    /// Source nodes to download from.
    ///
    /// If set to more than a single node, they will all be tried. If `mode` is set to
    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
    /// if the concurrency limits permit.
    pub nodes: Vec<NodeAddr>,
    /// Optional tag to tag the data with.
    pub tag: SetTagOption,
    /// Whether to directly start the download or add it to the download queue.
    pub mode: DownloadMode,
}

/// Split `b` into a stream of chunks of at most `c` bytes each.
fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream<Item = Bytes> {
    futures_lite::stream::iter(std::iter::from_fn(move || {
        Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty())
    }))
}

#[cfg(test)]
1006mod tests {
1007    use std::{path::Path, time::Duration};
1008
1009    use iroh::{test_utils::DnsPkarrServer, NodeId, RelayMode, SecretKey};
1010    use node::Node;
1011    use rand::RngCore;
1012    use testresult::TestResult;
1013    use tokio::{io::AsyncWriteExt, sync::mpsc};
1014    use tracing_test::traced_test;
1015
1016    use super::*;
1017    use crate::{hashseq::HashSeq, ticket::BlobTicket};
1018
1019    mod node {
1020        //! An iroh node that just has the blobs transport
1021        use std::path::Path;
1022
1023        use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
1024        use tokio_util::task::AbortOnDropHandle;
1025
1026        use super::RpcService;
1027        use crate::{
1028            net_protocol::Blobs,
1029            provider::{CustomEventSender, EventSender},
1030            rpc::client::{blobs, tags},
1031        };
1032
1033        type RpcClient = quic_rpc::RpcClient<RpcService>;
1034
1035        /// An iroh node that just has the blobs transport
1036        #[derive(Debug)]
1037        pub struct Node {
1038            router: iroh::protocol::Router,
1039            client: RpcClient,
1040            _rpc_task: AbortOnDropHandle<()>,
1041        }
1042
1043        /// An iroh node builder
1044        #[derive(Debug)]
1045        pub struct Builder<S> {
1046            store: S,
1047            events: EventSender,
1048            endpoint: Option<iroh::endpoint::Builder>,
1049        }
1050
1051        impl<S: crate::store::Store> Builder<S> {
1052            /// Sets the event sender
1053            pub fn blobs_events(self, events: impl CustomEventSender) -> Self {
1054                Self {
1055                    events: events.into(),
1056                    ..self
1057                }
1058            }
1059
1060            /// Set an endpoint builder
1061            pub fn endpoint(self, endpoint: iroh::endpoint::Builder) -> Self {
1062                Self {
1063                    endpoint: Some(endpoint),
1064                    ..self
1065                }
1066            }
1067
1068            /// Spawns the node
1069            pub async fn spawn(self) -> anyhow::Result<Node> {
1070                let store = self.store;
1071                let events = self.events;
1072                let endpoint = self
1073                    .endpoint
1074                    .unwrap_or_else(|| Endpoint::builder().discovery_n0())
1075                    .bind()
1076                    .await?;
1077                let mut router = Router::builder(endpoint.clone());
1078
1079                // Setup blobs
1080                let blobs = Blobs::builder(store.clone())
1081                    .events(events)
1082                    .build(&endpoint);
1083                router = router.accept(crate::ALPN, blobs.clone());
1084
1085                // Build the router
1086                let router = router.spawn();
1087
1088                // Setup RPC
1089                let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32);
1090                let internal_rpc = quic_rpc::RpcServer::new(internal_rpc).boxed();
1091                let _rpc_task = internal_rpc.spawn_accept_loop(move |msg, chan| {
1092                    blobs.clone().handle_rpc_request(msg, chan)
1093                });
1094                let client = quic_rpc::RpcClient::new(controller).boxed();
1095                Ok(Node {
1096                    router,
1097                    client,
1098                    _rpc_task,
1099                })
1100            }
1101        }
1102
1103        impl Node {
1104            /// Creates a new node with memory storage
1105            pub fn memory() -> Builder<crate::store::mem::Store> {
1106                Builder {
1107                    store: crate::store::mem::Store::new(),
1108                    events: Default::default(),
1109                    endpoint: None,
1110                }
1111            }
1112
1113            /// Creates a new node with persistent storage
1114            pub async fn persistent(
1115                path: impl AsRef<Path>,
1116            ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
1117                Ok(Builder {
1118                    store: crate::store::fs::Store::load(path).await?,
1119                    events: Default::default(),
1120                    endpoint: None,
1121                })
1122            }
1123
1124            /// Returns the node id
1125            pub fn node_id(&self) -> NodeId {
1126                self.router.endpoint().node_id()
1127            }
1128
1129            /// Returns the node address
1130            pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
1131                self.router.endpoint().node_addr().await
1132            }
1133
1134            /// Shuts down the node
1135            pub async fn shutdown(self) -> anyhow::Result<()> {
1136                self.router.shutdown().await
1137            }
1138
1139            /// Returns an in-memory blobs client
1140            pub fn blobs(&self) -> blobs::Client {
1141                blobs::Client::new(self.client.clone())
1142            }
1143
1144            /// Returns an in-memory tags client
1145            pub fn tags(&self) -> tags::Client {
1146                tags::Client::new(self.client.clone())
1147            }
1148        }
1149    }
1150
1151    #[tokio::test]
1152    #[traced_test]
1153    async fn test_blob_create_collection() -> Result<()> {
1154        let node = node::Node::memory().spawn().await?;
1155
1156        // create temp file
1157        let temp_dir = tempfile::tempdir().context("tempdir")?;
1158
1159        let in_root = temp_dir.path().join("in");
1160        tokio::fs::create_dir_all(in_root.clone())
1161            .await
1162            .context("create dir all")?;
1163
1164        let mut paths = Vec::new();
1165        for i in 0..5 {
1166            let path = in_root.join(format!("test-{i}"));
1167            let size = 100;
1168            let mut buf = vec![0u8; size];
1169            rand::thread_rng().fill_bytes(&mut buf);
1170            let mut file = tokio::fs::File::create(path.clone())
1171                .await
1172                .context("create file")?;
1173            file.write_all(&buf.clone()).await.context("write_all")?;
1174            file.flush().await.context("flush")?;
1175            paths.push(path);
1176        }
1177
1178        let blobs = node.blobs();
1179
1180        let mut collection = Collection::default();
1181        let mut tags = Vec::new();
1182        // import files
1183        for path in &paths {
1184            let import_outcome = blobs
1185                .add_from_path(
1186                    path.to_path_buf(),
1187                    false,
1188                    SetTagOption::Auto,
1189                    WrapOption::NoWrap,
1190                )
1191                .await
1192                .context("import file")?
1193                .finish()
1194                .await
1195                .context("import finish")?;
1196
1197            collection.push(
1198                path.file_name().unwrap().to_str().unwrap().to_string(),
1199                import_outcome.hash,
1200            );
1201            tags.push(import_outcome.tag);
1202        }
1203
1204        let (hash, tag) = blobs
1205            .create_collection(collection, SetTagOption::Auto, tags)
1206            .await?;
1207
1208        let collections: Vec<_> = blobs.list_collections()?.try_collect().await?;
1209
1210        assert_eq!(collections.len(), 1);
1211        {
1212            let CollectionInfo {
1213                tag,
1214                hash,
1215                total_blobs_count,
1216                ..
1217            } = &collections[0];
1218            assert_eq!(tag, tag);
1219            assert_eq!(hash, hash);
1220            // 5 blobs + 1 meta
1221            assert_eq!(total_blobs_count, &Some(5 + 1));
1222        }
1223
1224        // check that "temp" tags have been deleted
1225        let tags: Vec<_> = node.tags().list().await?.try_collect().await?;
1226        assert_eq!(tags.len(), 1);
1227        assert_eq!(tags[0].hash, hash);
1228        assert_eq!(tags[0].name, tag);
1229        assert_eq!(tags[0].format, BlobFormat::HashSeq);
1230
1231        Ok(())
1232    }
1233
1234    #[tokio::test]
1235    #[traced_test]
1236    async fn test_blob_read_at() -> Result<()> {
1237        let node = node::Node::memory().spawn().await?;
1238
1239        // create temp file
1240        let temp_dir = tempfile::tempdir().context("tempdir")?;
1241
1242        let in_root = temp_dir.path().join("in");
1243        tokio::fs::create_dir_all(in_root.clone())
1244            .await
1245            .context("create dir all")?;
1246
1247        let path = in_root.join("test-blob");
1248        let size = 1024 * 128;
1249        let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
1250        let mut file = tokio::fs::File::create(path.clone())
1251            .await
1252            .context("create file")?;
1253        file.write_all(&buf.clone()).await.context("write_all")?;
1254        file.flush().await.context("flush")?;
1255
1256        let blobs = node.blobs();
1257
1258        let import_outcome = blobs
1259            .add_from_path(
1260                path.to_path_buf(),
1261                false,
1262                SetTagOption::Auto,
1263                WrapOption::NoWrap,
1264            )
1265            .await
1266            .context("import file")?
1267            .finish()
1268            .await
1269            .context("import finish")?;
1270
1271        let hash = import_outcome.hash;
1272
1273        // Read everything
1274        let res = blobs.read_to_bytes(hash).await?;
1275        assert_eq!(&res, &buf[..]);
1276
1277        // Read at smaller than blob_get_chunk_size
1278        let res = blobs
1279            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(100))
1280            .await?;
1281        assert_eq!(res.len(), 100);
1282        assert_eq!(&res[..], &buf[0..100]);
1283
1284        let res = blobs
1285            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(120))
1286            .await?;
1287        assert_eq!(res.len(), 120);
1288        assert_eq!(&res[..], &buf[20..140]);
1289
1290        // Read at equal to blob_get_chunk_size
1291        let res = blobs
1292            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(1024 * 64))
1293            .await?;
1294        assert_eq!(res.len(), 1024 * 64);
1295        assert_eq!(&res[..], &buf[0..1024 * 64]);
1296
1297        let res = blobs
1298            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(1024 * 64))
1299            .await?;
1300        assert_eq!(res.len(), 1024 * 64);
1301        assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]);
1302
1303        // Read at larger than blob_get_chunk_size
1304        let res = blobs
1305            .read_at_to_bytes(hash, 0, ReadAtLen::Exact(10 + 1024 * 64))
1306            .await?;
1307        assert_eq!(res.len(), 10 + 1024 * 64);
1308        assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]);
1309
1310        let res = blobs
1311            .read_at_to_bytes(hash, 20, ReadAtLen::Exact(10 + 1024 * 64))
1312            .await?;
1313        assert_eq!(res.len(), 10 + 1024 * 64);
1314        assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]);
1315
1316        // full length
1317        let res = blobs.read_at_to_bytes(hash, 20, ReadAtLen::All).await?;
1318        assert_eq!(res.len(), 1024 * 128 - 20);
1319        assert_eq!(&res[..], &buf[20..]);
1320
1321        // size should be total
1322        let reader = blobs.read_at(hash, 0, ReadAtLen::Exact(20)).await?;
1323        assert_eq!(reader.size(), 1024 * 128);
1324        assert_eq!(reader.response_size, 20);
1325
1326        // last chunk - exact
1327        let res = blobs
1328            .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024))
1329            .await?;
1330        assert_eq!(res.len(), 1024);
1331        assert_eq!(res, &buf[1024 * 127..]);
1332
1333        // last chunk - open
1334        let res = blobs
1335            .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All)
1336            .await?;
1337        assert_eq!(res.len(), 1024);
1338        assert_eq!(res, &buf[1024 * 127..]);
1339
1340        // last chunk - larger
1341        let mut res = blobs
1342            .read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048))
1343            .await?;
1344        assert_eq!(res.size, 1024 * 128);
1345        assert_eq!(res.response_size, 1024);
1346        let res = res.read_to_bytes().await?;
1347        assert_eq!(res.len(), 1024);
1348        assert_eq!(res, &buf[1024 * 127..]);
1349
1350        // out of bounds - too long
1351        let res = blobs
1352            .read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1))
1353            .await;
1354        let err = res.unwrap_err();
1355        assert!(err.to_string().contains("out of bound"));
1356
1357        // out of bounds - offset larger than blob
1358        let res = blobs.read_at(hash, 1024 * 128 + 1, ReadAtLen::All).await;
1359        let err = res.unwrap_err();
1360        assert!(err.to_string().contains("out of range"));
1361
1362        // out of bounds - offset + length too large
1363        let res = blobs
1364            .read_at(hash, 1024 * 127, ReadAtLen::Exact(1025))
1365            .await;
1366        let err = res.unwrap_err();
1367        assert!(err.to_string().contains("out of bound"));
1368
1369        Ok(())
1370    }
1371
1372    #[tokio::test]
1373    #[traced_test]
1374    async fn test_blob_get_collection() -> Result<()> {
1375        let node = node::Node::memory().spawn().await?;
1376
1377        // create temp file
1378        let temp_dir = tempfile::tempdir().context("tempdir")?;
1379
1380        let in_root = temp_dir.path().join("in");
1381        tokio::fs::create_dir_all(in_root.clone())
1382            .await
1383            .context("create dir all")?;
1384
1385        let mut paths = Vec::new();
1386        for i in 0..5 {
1387            let path = in_root.join(format!("test-{i}"));
1388            let size = 100;
1389            let mut buf = vec![0u8; size];
1390            rand::thread_rng().fill_bytes(&mut buf);
1391            let mut file = tokio::fs::File::create(path.clone())
1392                .await
1393                .context("create file")?;
1394            file.write_all(&buf.clone()).await.context("write_all")?;
1395            file.flush().await.context("flush")?;
1396            paths.push(path);
1397        }
1398
1399        let blobs = node.blobs();
1400
1401        let mut collection = Collection::default();
1402        let mut tags = Vec::new();
1403        // import files
1404        for path in &paths {
1405            let import_outcome = blobs
1406                .add_from_path(
1407                    path.to_path_buf(),
1408                    false,
1409                    SetTagOption::Auto,
1410                    WrapOption::NoWrap,
1411                )
1412                .await
1413                .context("import file")?
1414                .finish()
1415                .await
1416                .context("import finish")?;
1417
1418            collection.push(
1419                path.file_name().unwrap().to_str().unwrap().to_string(),
1420                import_outcome.hash,
1421            );
1422            tags.push(import_outcome.tag);
1423        }
1424
1425        let (hash, _tag) = blobs
1426            .create_collection(collection, SetTagOption::Auto, tags)
1427            .await?;
1428
1429        let collection = blobs.get_collection(hash).await?;
1430
1431        // 5 blobs
1432        assert_eq!(collection.len(), 5);
1433
1434        Ok(())
1435    }
1436
1437    #[tokio::test]
1438    #[traced_test]
1439    async fn test_blob_share() -> Result<()> {
1440        let node = node::Node::memory().spawn().await?;
1441
1442        // create temp file
1443        let temp_dir = tempfile::tempdir().context("tempdir")?;
1444
1445        let in_root = temp_dir.path().join("in");
1446        tokio::fs::create_dir_all(in_root.clone())
1447            .await
1448            .context("create dir all")?;
1449
1450        let path = in_root.join("test-blob");
1451        let size = 1024 * 128;
1452        let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
1453        let mut file = tokio::fs::File::create(path.clone())
1454            .await
1455            .context("create file")?;
1456        file.write_all(&buf.clone()).await.context("write_all")?;
1457        file.flush().await.context("flush")?;
1458
1459        let blobs = node.blobs();
1460
1461        let import_outcome = blobs
1462            .add_from_path(
1463                path.to_path_buf(),
1464                false,
1465                SetTagOption::Auto,
1466                WrapOption::NoWrap,
1467            )
1468            .await
1469            .context("import file")?
1470            .finish()
1471            .await
1472            .context("import finish")?;
1473
1474        // let ticket = blobs
1475        //     .share(import_outcome.hash, BlobFormat::Raw, Default::default())
1476        //     .await?;
1477        // assert_eq!(ticket.hash(), import_outcome.hash);
1478
1479        let status = blobs.status(import_outcome.hash).await?;
1480        assert_eq!(status, BlobStatus::Complete { size });
1481
1482        Ok(())
1483    }
1484
1485    #[derive(Debug, Clone)]
1486    struct BlobEvents {
1487        sender: mpsc::Sender<crate::provider::Event>,
1488    }
1489
1490    impl BlobEvents {
1491        fn new(cap: usize) -> (Self, mpsc::Receiver<crate::provider::Event>) {
1492            let (s, r) = mpsc::channel(cap);
1493            (Self { sender: s }, r)
1494        }
1495    }
1496
1497    impl crate::provider::CustomEventSender for BlobEvents {
1498        fn send(&self, event: crate::provider::Event) -> futures_lite::future::Boxed<()> {
1499            let sender = self.sender.clone();
1500            Box::pin(async move {
1501                sender.send(event).await.ok();
1502            })
1503        }
1504
1505        fn try_send(&self, event: crate::provider::Event) {
1506            self.sender.try_send(event).ok();
1507        }
1508    }
1509
    #[tokio::test]
    #[traced_test]
    async fn test_blob_provide_events() -> Result<()> {
        let (node1_events, mut node1_events_r) = BlobEvents::new(16);
        let node1 = node::Node::memory()
            .blobs_events(node1_events)
            .spawn()
            .await?;

        let (node2_events, mut node2_events_r) = BlobEvents::new(16);
        let node2 = node::Node::memory()
            .blobs_events(node2_events)
            .spawn()
            .await?;

        let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?;

        // Download in node2
        let node1_addr = node1.node_addr().await?;
        let res = node2
            .blobs()
            .download(import_outcome.hash, node1_addr)
            .await?
            .await?;
        dbg!(&res);
        assert_eq!(res.local_size, 0);
        assert_eq!(res.downloaded_size, 11);

        node1.shutdown().await?;
        node2.shutdown().await?;

        let mut ev1 = Vec::new();
        while let Some(ev) = node1_events_r.recv().await {
            ev1.push(ev);
        }
        // assert_eq!(ev1.len(), 3);
        assert!(matches!(
            ev1[0],
            crate::provider::Event::ClientConnected { .. }
        ));
        assert!(matches!(
            ev1[1],
            crate::provider::Event::GetRequestReceived { .. }
        ));
        assert!(matches!(
            ev1[2],
            crate::provider::Event::TransferProgress { .. }
        ));
        assert!(matches!(
            ev1[3],
            crate::provider::Event::TransferCompleted { .. }
        ));
        dbg!(&ev1);

        let mut ev2 = Vec::new();
        while let Some(ev) = node2_events_r.recv().await {
            ev2.push(ev);
        }

        // Node 2 did not provide anything
        assert!(ev2.is_empty());
        Ok(())
    }

    /// Download an existing blob from oneself
    #[tokio::test]
    #[traced_test]
    async fn test_blob_get_self_existing() -> TestResult<()> {
        let node = node::Node::memory().spawn().await?;
        let node_id = node.node_id();
        let blobs = node.blobs();

        let AddOutcome { hash, size, .. } = blobs.add_bytes("foo").await?;

        // Direct
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);

        // Queued
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);

        Ok(())
    }

    /// Download a missing blob from oneself
    #[tokio::test]
    #[traced_test]
    async fn test_blob_get_self_missing() -> TestResult<()> {
        let node = node::Node::memory().spawn().await?;
        let node_id = node.node_id();
        let blobs = node.blobs();

        let hash = Hash::from_bytes([0u8; 32]);

        // Direct
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.err().unwrap().to_string().as_str(),
            "No nodes to download from provided"
        );

        // Queued
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::Raw,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.err().unwrap().to_string().as_str(),
            "No provider nodes found"
        );

        Ok(())
    }

    /// Download an existing collection. Check that things succeed and no download is performed.
    #[tokio::test]
    #[traced_test]
    async fn test_blob_get_existing_collection() -> TestResult<()> {
        let node = node::Node::memory().spawn().await?;
        // We use a non-existent node id because we just want to check that this succeeds
        // without hitting the network.
        let node_id = NodeId::from_bytes(&[0u8; 32])?;
        let blobs = node.blobs();

        let mut collection = Collection::default();
        let mut tags = Vec::new();
        let mut size = 0;
        for value in ["iroh", "is", "cool"] {
            let import_outcome = blobs.add_bytes(value).await.context("add bytes")?;
            collection.push(value.to_string(), import_outcome.hash);
            tags.push(import_outcome.tag);
            size += import_outcome.size;
        }

        let (hash, _tag) = blobs
            .create_collection(collection, SetTagOption::Auto, tags)
            .await?;

        // Load the hashseq and the collection header manually to calculate the expected size.
        let hashseq_bytes = blobs.read_to_bytes(hash).await?;
        size += hashseq_bytes.len() as u64;
        let hashseq = HashSeq::try_from(hashseq_bytes)?;
        let collection_header_bytes = blobs
            .read_to_bytes(hashseq.into_iter().next().expect("header to exist"))
            .await?;
        size += collection_header_bytes.len() as u64;

        // Direct
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::HashSeq,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Direct,
                },
            )
            .await?
            .await
            .context("direct (download)")?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);

        // Queued
        let res = blobs
            .download_with_opts(
                hash,
                DownloadOptions {
                    format: BlobFormat::HashSeq,
                    nodes: vec![node_id.into()],
                    tag: SetTagOption::Auto,
                    mode: DownloadMode::Queued,
                },
            )
            .await?
            .await
            .context("queued")?;

        assert_eq!(res.local_size, size);
        assert_eq!(res.downloaded_size, 0);

        Ok(())
    }

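    /// Deleting a blob from an in-memory store removes it from the blob list.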
    #[tokio::test]
    #[traced_test]
    #[cfg_attr(target_os = "windows", ignore = "flaky")]
    async fn test_blob_delete_mem() -> Result<()> {
        let node = node::Node::memory().spawn().await?;

        let res = node.blobs().add_bytes(&b"hello world"[..]).await?;

        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert_eq!(hashes.len(), 1);
        assert_eq!(hashes[0].hash, res.hash);

        // delete
        node.blobs().delete_blob(res.hash).await?;

        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert!(hashes.is_empty());

        Ok(())
    }

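    /// Same as `test_blob_delete_mem`, but against a persistent on-disk store.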
    #[tokio::test]
    #[traced_test]
    async fn test_blob_delete_fs() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let node = node::Node::persistent(dir.path()).await?.spawn().await?;

        let res = node.blobs().add_bytes(&b"hello world"[..]).await?;

        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert_eq!(hashes.len(), 1);
        assert_eq!(hashes[0].hash, res.hash);

        // delete
        node.blobs().delete_blob(res.hash).await?;

        let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?;
        assert!(hashes.is_empty());

        Ok(())
    }

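    /// A ticket built from a freshly spawned node's address should carry at
    /// least one direct address.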
    #[tokio::test]
    #[traced_test]
    async fn test_ticket_multiple_addrs() -> TestResult<()> {
        let node = Node::memory().spawn().await?;
        let hash = node
            .blobs()
            .add_bytes(Bytes::from_static(b"hello"))
            .await?
            .hash;

        let addr = node.node_addr().await?;
        let ticket = BlobTicket::new(addr, hash, BlobFormat::Raw)?;
        println!("addrs: {:?}", ticket.node_addr());
        assert!(!ticket.node_addr().direct_addresses.is_empty());
        Ok(())
    }

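    /// Round-trips 256 KiB through `add_reader` and `read_to_bytes`.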
    #[tokio::test]
    #[traced_test]
    async fn test_node_add_blob_stream() -> Result<()> {
        use std::io::Cursor;
        let node = Node::memory().spawn().await?;

        let blobs = node.blobs();
        let input = vec![2u8; 1024 * 256]; // 256 KiB, so the data is actually streamed (chunk size is 64 KiB)
        let reader = Cursor::new(input.clone());
        let progress = blobs.add_reader(reader, SetTagOption::Auto).await?;
        let outcome = progress.finish().await?;
        let hash = outcome.hash;
        let output = blobs.read_to_bytes(hash).await?;
        assert_eq!(input, output.to_vec());
        Ok(())
    }

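    /// Adding a file via `add_from_path` must yield an `AllDone` progress
    /// event carrying the resulting hash, within a 10 second timeout.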
    #[tokio::test]
    #[traced_test]
    async fn test_node_add_tagged_blob_event() -> Result<()> {
        let node = Node::memory().spawn().await?;

        let _got_hash = tokio::time::timeout(Duration::from_secs(10), async move {
            let mut stream = node
                .blobs()
                .add_from_path(
                    Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
                    false,
                    SetTagOption::Auto,
                    WrapOption::NoWrap,
                )
                .await?;

            while let Some(progress) = stream.next().await {
                match progress? {
                    crate::provider::AddProgress::AllDone { hash, .. } => {
                        return Ok(hash);
                    }
                    crate::provider::AddProgress::Abort(e) => {
                        anyhow::bail!("Error while adding data: {e}");
                    }
                    _ => {}
                }
            }
            anyhow::bail!("stream ended without providing data");
        })
        .await
        .context("timeout")?
        .context("add failed")?;

        Ok(())
    }

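    /// Downloads a blob using a node address that contains only a relay URL
    /// and no direct addresses, so connection establishment has to go via the
    /// relay server.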
    #[tokio::test]
    #[traced_test]
    async fn test_download_via_relay() -> Result<()> {
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await?;

        let endpoint1 = iroh::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true);
        let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
        let endpoint2 = iroh::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true);
        let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
        let AddOutcome { hash, .. } = node1.blobs().add_bytes(b"foo".to_vec()).await?;

        // create a node addr with only a relay URL, no direct addresses
        let addr = NodeAddr::new(node1.node_id()).with_relay_url(relay_url);
        node2.blobs().download(hash, addr).await?.await?;
        assert_eq!(
            node2
                .blobs()
                .read_to_bytes(hash)
                .await
                .context("get")?
                .as_ref(),
            b"foo"
        );
        Ok(())
    }

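    /// Like `test_download_via_relay`, but the downloader only knows the
    /// provider's node id and relies on DNS/pkarr discovery to find it.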
    #[tokio::test]
    #[traced_test]
    #[ignore = "flaky"]
    async fn test_download_via_relay_with_discovery() -> Result<()> {
        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?;
        let dns_pkarr_server = DnsPkarrServer::run().await?;

        let mut rng = rand::thread_rng();

        let secret1 = SecretKey::generate(&mut rng);
        let endpoint1 = iroh::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .dns_resolver(dns_pkarr_server.dns_resolver())
            .secret_key(secret1.clone())
            .discovery(dns_pkarr_server.discovery(secret1));
        let node1 = Node::memory().endpoint(endpoint1).spawn().await?;
        let secret2 = SecretKey::generate(&mut rng);
        let endpoint2 = iroh::Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .dns_resolver(dns_pkarr_server.dns_resolver())
            .secret_key(secret2.clone())
            .discovery(dns_pkarr_server.discovery(secret2));
        let node2 = Node::memory().endpoint(endpoint2).spawn().await?;
        let hash = node1.blobs().add_bytes(b"foo".to_vec()).await?.hash;

        // create a node addr with node id only
        let addr = NodeAddr::new(node1.node_id());
        node2.blobs().download(hash, addr).await?.await?;
        assert_eq!(
            node2
                .blobs()
                .read_to_bytes(hash)
                .await
                .context("get")?
                .as_ref(),
            b"foo"
        );
        Ok(())
    }
}