iroh_blobs/rpc/client/blobs/
batch.rs

1use std::{
2    io,
3    path::PathBuf,
4    sync::{Arc, Mutex},
5};
6
7use anyhow::{anyhow, Context, Result};
8use bytes::Bytes;
9use futures_buffered::BufferedStreamExt;
10use futures_lite::StreamExt;
11use futures_util::{sink::Buffer, FutureExt, SinkExt, Stream};
12use quic_rpc::{client::UpdateSink, Connector, RpcClient};
13use tokio::io::AsyncRead;
14use tokio_util::io::ReaderStream;
15use tracing::{debug, warn};
16
17use super::WrapOption;
18use crate::{
19    format::collection::Collection,
20    net_protocol::BatchId,
21    provider::BatchAddPathProgress,
22    rpc::proto::{
23        blobs::{
24            BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse,
25            BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchUpdate,
26        },
27        tags::{self, SyncMode},
28        RpcService,
29    },
30    store::ImportMode,
31    util::{SetTagOption, TagDrop},
32    BlobFormat, HashAndFormat, Tag, TempTag,
33};
34
/// A scope in which blobs can be added.
///
/// This is the shared state behind a [`Batch`] handle. It also implements
/// [`TagDrop`], so temp tags created through the batch can report being dropped.
#[derive(derive_more::Debug)]
struct BatchInner<C>
where
    C: Connector<RpcService>,
{
    /// The id of the scope.
    batch: BatchId,
    /// The rpc client.
    rpc: RpcClient<RpcService, C>,
    /// Buffered sink used to send [`BatchUpdate`]s, such as drop notifications.
    ///
    /// Kept behind a mutex because it is written to through `&self`.
    #[debug(skip)]
    updates: Mutex<Buffer<UpdateSink<C, BatchUpdate>, BatchUpdate>>,
}
49
/// A batch for write operations.
///
/// This serves mostly as a scope for temporary tags.
///
/// It is not a transaction, so things in a batch are not atomic. Also, there is
/// no isolation between batches.
///
/// The handle is a thin wrapper around a reference-counted `BatchInner`.
#[derive(derive_more::Debug)]
pub struct Batch<C>(Arc<BatchInner<C>>)
where
    C: Connector<RpcService>;
60
61impl<C> TagDrop for BatchInner<C>
62where
63    C: Connector<RpcService>,
64{
65    fn on_drop(&self, content: &HashAndFormat) {
66        let mut updates = self.updates.lock().unwrap();
67        // make a spirited attempt to notify the server that we are dropping the content
68        //
69        // this will occasionally fail, but that's acceptable. The temp tags for the batch
70        // will be cleaned up as soon as the entire batch is dropped.
71        //
72        // E.g. a typical scenario is that you create a large array of temp tags, and then
73        // store them in a hash sequence and then drop the array. You will get many drops
74        // at the same time, and might get a send failure here.
75        //
76        // But that just means that the server will clean up the temp tags when the batch is
77        // dropped.
78        updates.feed(BatchUpdate::Drop(*content)).now_or_never();
79        updates.flush().now_or_never();
80    }
81}
82
/// Options for adding a file as a blob.
///
/// Used by [`Batch::add_file_with_opts`].
#[derive(Debug, Clone, Copy, Default)]
pub struct AddFileOpts {
    /// The import mode
    pub import_mode: ImportMode,
    /// The format of the blob
    pub format: BlobFormat,
}
91
/// Options for adding a directory as a collection.
///
/// Used by [`Batch::add_dir_with_opts`].
#[derive(Debug, Clone)]
pub struct AddDirOpts {
    /// The import mode
    pub import_mode: ImportMode,
    /// Whether to preserve the directory name
    pub wrap: WrapOption,
    /// Io parallelism: how many file imports may be in flight at the same time
    pub io_parallelism: usize,
}
102
103impl Default for AddDirOpts {
104    fn default() -> Self {
105        Self {
106            import_mode: ImportMode::TryReference,
107            wrap: WrapOption::NoWrap,
108            io_parallelism: 4,
109        }
110    }
111}
112
/// Options for adding the contents of an async reader as a blob.
///
/// Used by [`Batch::add_reader_with_opts`].
#[derive(Debug, Clone)]
pub struct AddReaderOpts {
    /// The format of the blob
    pub format: BlobFormat,
    /// Size of the chunks to send, in bytes
    pub chunk_size: usize,
}
121
122impl Default for AddReaderOpts {
123    fn default() -> Self {
124        Self {
125            format: BlobFormat::Raw,
126            chunk_size: 1024 * 64,
127        }
128    }
129}
130
131impl<C> Batch<C>
132where
133    C: Connector<RpcService>,
134{
135    pub(super) fn new(
136        batch: BatchId,
137        rpc: RpcClient<RpcService, C>,
138        updates: UpdateSink<C, BatchUpdate>,
139        buffer_size: usize,
140    ) -> Self {
141        let updates = updates.buffer(buffer_size);
142        Self(Arc::new(BatchInner {
143            batch,
144            rpc,
145            updates: updates.into(),
146        }))
147    }
148
149    /// Write a blob by passing bytes.
150    pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> Result<TempTag> {
151        self.add_bytes_with_opts(bytes, Default::default()).await
152    }
153
154    /// Import a blob from a filesystem path, using the default options.
155    ///
156    /// For more control, use [`Self::add_file_with_opts`].
157    pub async fn add_file(&self, path: PathBuf) -> Result<(TempTag, u64)> {
158        self.add_file_with_opts(path, AddFileOpts::default()).await
159    }
160
161    /// Add a directory as a hashseq in iroh collection format
162    pub async fn add_dir(&self, root: PathBuf) -> Result<TempTag> {
163        self.add_dir_with_opts(root, Default::default()).await
164    }
165
166    /// Write a blob by passing an async reader.
167    ///
168    /// This will consume the stream in 64KB chunks, and use a format of [BlobFormat::Raw].
169    ///
170    /// For more options, see [`Self::add_reader_with_opts`].
171    pub async fn add_reader(
172        &self,
173        reader: impl AsyncRead + Unpin + Send + 'static,
174    ) -> anyhow::Result<TempTag> {
175        self.add_reader_with_opts(reader, Default::default()).await
176    }
177
178    /// Write a blob by passing a stream of bytes.
179    pub async fn add_stream(
180        &self,
181        input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
182    ) -> Result<TempTag> {
183        self.add_stream_with_opts(input, Default::default()).await
184    }
185
186    /// Creates a temp tag to protect some content (blob or hashseq) from being deleted.
187    ///
188    /// This is a lower-level API. The other functions in [`Batch`] already create [`TempTag`]s automatically.
189    ///
190    /// [`TempTag`]s allow you to protect some data from deletion while a download is ongoing,
191    /// even if you don't want to protect it permanently.
192    pub async fn temp_tag(&self, content: HashAndFormat) -> Result<TempTag> {
193        // Notify the server that we want one temp tag for the given content
194        self.0
195            .rpc
196            .rpc(BatchCreateTempTagRequest {
197                batch: self.0.batch,
198                content,
199            })
200            .await??;
201        // Only after success of the above call, we can create the corresponding local temp tag
202        Ok(self.local_temp_tag(content, None))
203    }
204
205    /// Write a blob by passing an async reader.
206    ///
207    /// This consumes the stream in chunks using `opts.chunk_size`. A good default is 64KB.
208    pub async fn add_reader_with_opts(
209        &self,
210        reader: impl AsyncRead + Unpin + Send + 'static,
211        opts: AddReaderOpts,
212    ) -> anyhow::Result<TempTag> {
213        let AddReaderOpts { format, chunk_size } = opts;
214        let input = ReaderStream::with_capacity(reader, chunk_size);
215        self.add_stream_with_opts(input, format).await
216    }
217
218    /// Write a blob by passing bytes.
219    pub async fn add_bytes_with_opts(
220        &self,
221        bytes: impl Into<Bytes>,
222        format: BlobFormat,
223    ) -> Result<TempTag> {
224        let input = futures_lite::stream::once(Ok(bytes.into()));
225        self.add_stream_with_opts(input, format).await
226    }
227
228    /// Import a blob from a filesystem path.
229    ///
230    /// `path` should be an absolute path valid for the file system on which
231    /// the node runs, which refers to a file.
232    ///
233    /// If you use [`ImportMode::TryReference`], Iroh will assume that the data will not
234    /// change and will share it in place without copying to the Iroh data directory
235    /// if appropriate. However, for tiny files, Iroh will copy the data.
236    ///
237    /// If you use [`ImportMode::Copy`], Iroh will always copy the data.
238    ///
239    /// Will return a temp tag for the added blob, as well as the size of the file.
240    pub async fn add_file_with_opts(
241        &self,
242        path: PathBuf,
243        opts: AddFileOpts,
244    ) -> Result<(TempTag, u64)> {
245        let AddFileOpts {
246            import_mode,
247            format,
248        } = opts;
249        anyhow::ensure!(
250            path.is_absolute(),
251            "Path must be absolute, but got: {:?}",
252            path
253        );
254        anyhow::ensure!(path.is_file(), "Path does not refer to a file: {:?}", path);
255        let mut stream = self
256            .0
257            .rpc
258            .server_streaming(BatchAddPathRequest {
259                path,
260                import_mode,
261                format,
262                batch: self.0.batch,
263            })
264            .await?;
265        let mut res_hash = None;
266        let mut res_size = None;
267        while let Some(item) = stream.next().await {
268            match item?.0 {
269                BatchAddPathProgress::Abort(cause) => {
270                    Err(cause)?;
271                }
272                BatchAddPathProgress::Done { hash } => {
273                    res_hash = Some(hash);
274                }
275                BatchAddPathProgress::Found { size } => {
276                    res_size = Some(size);
277                }
278                _ => {}
279            }
280        }
281        let hash = res_hash.context("Missing hash")?;
282        let size = res_size.context("Missing size")?;
283        Ok((
284            self.local_temp_tag(HashAndFormat { hash, format }, Some(size)),
285            size,
286        ))
287    }
288
289    /// Add a directory as a hashseq in iroh collection format
290    ///
291    /// This can also be used to add a single file as a collection, if
292    /// wrap is set to [WrapOption::Wrap].
293    ///
294    /// However, if you want to add a single file as a raw blob, use add_file instead.
295    pub async fn add_dir_with_opts(&self, root: PathBuf, opts: AddDirOpts) -> Result<TempTag> {
296        let AddDirOpts {
297            import_mode,
298            wrap,
299            io_parallelism,
300        } = opts;
301        anyhow::ensure!(root.is_absolute(), "Path must be absolute");
302
303        // let (send, recv) = flume::bounded(32);
304        // let import_progress = FlumeProgressSender::new(send);
305
306        // import all files below root recursively
307        let data_sources = crate::util::fs::scan_path(root, wrap)?;
308        let opts = AddFileOpts {
309            import_mode,
310            format: BlobFormat::Raw,
311        };
312        let result: Vec<_> = futures_lite::stream::iter(data_sources)
313            .map(|source| {
314                // let import_progress = import_progress.clone();
315                async move {
316                    let name = source.name().to_string();
317                    let (tag, size) = self
318                        .add_file_with_opts(source.path().to_owned(), opts)
319                        .await?;
320                    let hash = *tag.hash();
321                    anyhow::Ok((name, hash, size, tag))
322                }
323            })
324            .buffered_ordered(io_parallelism)
325            .try_collect()
326            .await?;
327
328        // create a collection
329        let (collection, child_tags): (Collection, Vec<_>) = result
330            .into_iter()
331            .map(|(name, hash, _, tag)| ((name, hash), tag))
332            .unzip();
333
334        let tag = self.add_collection(collection).await?;
335        drop(child_tags);
336        Ok(tag)
337    }
338
339    /// Write a blob by passing a stream of bytes.
340    ///
341    /// For convenient interop with common sources of data, this function takes a stream of `io::Result<Bytes>`.
342    /// If you have raw bytes, you need to wrap them in `io::Result::Ok`.
343    pub async fn add_stream_with_opts(
344        &self,
345        mut input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
346        format: BlobFormat,
347    ) -> Result<TempTag> {
348        let (mut sink, mut stream) = self
349            .0
350            .rpc
351            .bidi(BatchAddStreamRequest {
352                batch: self.0.batch,
353                format,
354            })
355            .await?;
356        let mut size = 0u64;
357        while let Some(item) = input.next().await {
358            match item {
359                Ok(chunk) => {
360                    size += chunk.len() as u64;
361                    sink.send(BatchAddStreamUpdate::Chunk(chunk))
362                        .await
363                        .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?;
364                }
365                Err(err) => {
366                    warn!("Abort send, reason: failed to read from source stream: {err:?}");
367                    sink.send(BatchAddStreamUpdate::Abort)
368                        .await
369                        .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?;
370                    break;
371                }
372            }
373        }
374        // this is needed for the remote to notice that the stream is closed
375        drop(sink);
376        let mut res = None;
377        while let Some(item) = stream.next().await {
378            match item? {
379                BatchAddStreamResponse::Abort(cause) => {
380                    Err(cause)?;
381                }
382                BatchAddStreamResponse::Result { hash } => {
383                    res = Some(hash);
384                }
385                _ => {}
386            }
387        }
388        let hash = res.context("Missing answer")?;
389        Ok(self.local_temp_tag(HashAndFormat { hash, format }, Some(size)))
390    }
391
392    /// Add a collection.
393    ///
394    /// This is a convenience function that converts the collection into two blobs
395    /// (the metadata and the hash sequence) and adds them, returning a temp tag for
396    /// the hash sequence.
397    ///
398    /// Note that this does not guarantee that the data that the collection refers to
399    /// actually exists. It will just create 2 blobs, the metadata and the hash sequence
400    /// itself.
401    pub async fn add_collection(&self, collection: Collection) -> Result<TempTag> {
402        self.add_blob_seq(collection.to_blobs()).await
403    }
404
405    /// Add a sequence of blobs, where the last is a hash sequence.
406    ///
407    /// It is a common pattern in iroh to have a hash sequence with one or more
408    /// blobs of metadata, and the remaining blobs being the actual data. E.g.
409    /// a collection is a hash sequence where the first child is the metadata.
410    pub async fn add_blob_seq(&self, iter: impl Iterator<Item = Bytes>) -> Result<TempTag> {
411        let mut blobs = iter.peekable();
412        // put the tags somewhere
413        let mut tags = vec![];
414        loop {
415            let blob = blobs.next().context("Failed to get next blob")?;
416            if blobs.peek().is_none() {
417                return self.add_bytes_with_opts(blob, BlobFormat::HashSeq).await;
418            } else {
419                tags.push(self.add_bytes(blob).await?);
420            }
421        }
422    }
423
424    /// Upgrades a temp tag to a persistent tag.
425    pub async fn persist(&self, tt: TempTag) -> Result<Tag> {
426        let tag = self
427            .0
428            .rpc
429            .rpc(tags::CreateRequest {
430                value: tt.hash_and_format(),
431                batch: Some(self.0.batch),
432                sync: SyncMode::Full,
433            })
434            .await??;
435        Ok(tag)
436    }
437
438    /// Upgrades a temp tag to a persistent tag with a specific name.
439    pub async fn persist_to(&self, tt: TempTag, tag: Tag) -> Result<()> {
440        self.0
441            .rpc
442            .rpc(tags::SetRequest {
443                name: tag,
444                value: tt.hash_and_format(),
445                batch: Some(self.0.batch),
446                sync: SyncMode::Full,
447            })
448            .await??;
449        Ok(())
450    }
451
452    /// Upgrades a temp tag to a persistent tag with either a specific name or
453    /// an automatically generated name.
454    pub async fn persist_with_opts(&self, tt: TempTag, opts: SetTagOption) -> Result<Tag> {
455        match opts {
456            SetTagOption::Auto => self.persist(tt).await,
457            SetTagOption::Named(tag) => {
458                self.persist_to(tt, tag.clone()).await?;
459                Ok(tag)
460            }
461        }
462    }
463
464    /// Creates a temp tag for the given hash and format, without notifying the server.
465    ///
466    /// Caution: only do this for data for which you know the server side has created a temp tag.
467    fn local_temp_tag(&self, inner: HashAndFormat, _size: Option<u64>) -> TempTag {
468        let on_drop: Arc<dyn TagDrop> = self.0.clone();
469        let on_drop = Some(Arc::downgrade(&on_drop));
470        TempTag::new(inner, on_drop)
471    }
472}