iroh_fake_store/
lib.rs

1//! fake iroh-blobs store for testing.
2//!
//! Generates blob data on the fly and stores none of it. Serves deterministic data
//! (all zeros, all ones, or seeded pseudo-random bytes) of the requested lengths, for
//! testing large blob transfers.
5//!
6//! # Examples
7//!
8//! ```
9//! use iroh_fake_store::FakeStore;
10//!
11//! # tokio_test::block_on(async {
12//! let store = FakeStore::builder()
13//!     .with_blob(1024)           // 1KB blob
14//!     .with_blob(1024 * 1024)    // 1MB blob
15//!     .build();
16//!
17//! let hashes = store.blobs().list().hashes().await.unwrap();
18//! assert_eq!(hashes.len(), 2);
19//! # });
20//! ```
21//!
//! Blob content is generated on the fly rather than stored, hashes are deterministic,
//! the byte pattern is configurable, and a size limit guards against accidentally
//! creating huge blobs.
24
25use std::{
26    collections::{BTreeMap, HashMap},
27    io::{self, Write},
28    path::PathBuf,
29    sync::{Arc, Mutex},
30};
31
32use bao_tree::{
33    BaoTree, ChunkRanges,
34    io::{
35        Leaf,
36        mixed::{EncodedItem, ReadBytesAt, traverse_ranges_validated},
37        outboard::PreOrderMemOutboard,
38        sync::ReadAt,
39    },
40};
41use bytes::Bytes;
42use iroh_blobs::{
43    BlobFormat, Hash, HashAndFormat,
44    api::{
45        self, Store, TempTag,
46        blobs::{Bitfield, ExportProgressItem},
47        proto::{
48            self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
49            ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
50            ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ObserveMsg, ObserveRequest,
51            WaitIdleMsg,
52        },
53    },
54    protocol::ChunkRangesExt,
55    store::IROH_BLOCK_SIZE,
56};
57use irpc::channel::mpsc;
58use range_collections::range_set::RangeSetRange;
59use ref_cast::RefCast;
60use tokio::task::{JoinError, JoinSet};
61
/// Data generation strategy for fake blobs.
///
/// Selects the byte pattern that generated blob content is filled with.
/// The default is [`DataStrategy::Zeros`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum DataStrategy {
    /// all zeros (default, most efficient)
    #[default]
    Zeros,
    /// all ones
    Ones,
    /// deterministic pseudo-random based on seed
    PseudoRandom {
        /// seed that selects the pseudo-random byte sequence
        seed: u64,
    },
}
78
79#[derive(Debug, Clone)]
80pub struct FakeStoreConfig {
81    pub strategy: DataStrategy,
82    /// max blob size (prevents accidents)
83    pub max_blob_size: Option<u64>,
84}
85
86impl Default for FakeStoreConfig {
87    fn default() -> Self {
88        Self {
89            strategy: DataStrategy::Zeros,
90            // 10GB limit to prevent accidents
91            max_blob_size: Some(10 * 1024 * 1024 * 1024),
92        }
93    }
94}
95
96#[derive(Debug, Clone, Default)]
97pub struct FakeStoreBuilder {
98    config: FakeStoreConfig,
99    sizes: Vec<u64>,
100}
101
102impl FakeStoreBuilder {
103    pub fn new() -> Self {
104        Self::default()
105    }
106
107    pub fn strategy(mut self, strategy: DataStrategy) -> Self {
108        self.config.strategy = strategy;
109        self
110    }
111
112    /// None for unlimited
113    pub fn max_blob_size(mut self, max: Option<u64>) -> Self {
114        self.config.max_blob_size = max;
115        self
116    }
117
118    pub fn with_blob(mut self, size: u64) -> Self {
119        self.sizes.push(size);
120        self
121    }
122
123    pub fn with_blobs(mut self, sizes: impl IntoIterator<Item = u64>) -> Self {
124        self.sizes.extend(sizes);
125        self
126    }
127
128    /// # Panics
129    /// panics if any blob size exceeds the configured max
130    pub fn build(self) -> FakeStore {
131        if let Some(max) = self.config.max_blob_size {
132            for &size in &self.sizes {
133                assert!(size <= max, "Blob size {} exceeds maximum {}", size, max);
134            }
135        }
136
137        FakeStore::new_with_config(self.sizes, self.config)
138    }
139}
140
141/// fake iroh-blobs store for testing
142///
143/// generates data on-the-fly, stores nothing. for testing big blobs when you don't
144/// care about content.
145///
146/// # Examples
147///
148/// ```
149/// use iroh_fake_store::{FakeStore, DataStrategy};
150///
151/// # tokio_test::block_on(async {
152/// let store = FakeStore::new([1024, 2048]);
153///
154/// let store = FakeStore::builder()
155///     .strategy(DataStrategy::Zeros)
156///     .max_blob_size(Some(1024 * 1024 * 100)) // 100MB max
157///     .with_blob(1024)
158///     .build();
159/// # });
160/// ```
161#[derive(Debug, Clone)]
162pub struct FakeStore {
163    store: Store,
164}
165
166impl std::ops::Deref for FakeStore {
167    type Target = Store;
168
169    fn deref(&self) -> &Self::Target {
170        &self.store
171    }
172}
173
174/// wrapper around mpsc::Sender<EncodedItem> that impls bao_tree::io::mixed::Sender
175#[derive(RefCast)]
176#[repr(transparent)]
177struct BaoTreeSender(mpsc::Sender<EncodedItem>);
178
179impl bao_tree::io::mixed::Sender for BaoTreeSender {
180    type Error = irpc::channel::SendError;
181    async fn send(&mut self, item: EncodedItem) -> std::result::Result<(), Self::Error> {
182        self.0.send(item).await
183    }
184}
185
/// Bookkeeping kept for each known blob; the blob's bytes themselves are not kept.
#[derive(Debug, Clone)]
struct BlobMetadata {
    /// blob length in bytes
    size: u64,
    /// outboard bytes, taken from the `data` field of a `PreOrderMemOutboard`
    outboard: Bytes,
    /// pattern used to regenerate the blob's bytes when it is exported
    strategy: DataStrategy,
}
192
/// Actor that owns the store state and serves `proto::Command`s received on a channel.
struct Actor {
    /// incoming commands
    commands: tokio::sync::mpsc::Receiver<proto::Command>,
    /// background tasks spawned while handling commands
    tasks: JoinSet<()>,
    /// `WaitIdle` callers to notify once `tasks` becomes empty
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    /// metadata of known blobs keyed by hash; shared with spawned import tasks
    blobs: Arc<Mutex<HashMap<Hash, BlobMetadata>>>,
    /// strategy recorded in the metadata of blobs added through imports
    strategy: DataStrategy,
    /// named tags and the content they point at
    tags: BTreeMap<api::Tag, HashAndFormat>,
}
201
202impl Actor {
203    fn new(
204        commands: tokio::sync::mpsc::Receiver<proto::Command>,
205        blobs: HashMap<Hash, BlobMetadata>,
206        strategy: DataStrategy,
207    ) -> Self {
208        Self {
209            blobs: Arc::new(Mutex::new(blobs)),
210            commands,
211            tasks: JoinSet::new(),
212            idle_waiters: Vec::new(),
213            strategy,
214            tags: BTreeMap::new(),
215        }
216    }
217
    /// Handles a single command.
    ///
    /// Returns `Some(tx)` only for `Command::Shutdown`, handing the reply sender back
    /// to the caller; every other command returns `None`. Replies are sent best-effort:
    /// a failed send (e.g. the requester went away) is ignored.
    async fn handle_command(&mut self, cmd: Command) -> Option<irpc::channel::oneshot::Sender<()>> {
        match cmd {
            Command::ImportBao(msg) => {
                self.handle_import_bao(msg).await;
            }
            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
                // reply right away when nothing is running, otherwise park the
                // sender until the task set drains
                if self.tasks.is_empty() {
                    tx.send(()).await.ok();
                } else {
                    self.idle_waiters.push(tx);
                }
            }
            Command::ImportBytes(msg) => {
                self.handle_import_bytes(msg).await;
            }
            Command::ImportByteStream(msg) => {
                self.handle_import_byte_stream(msg).await;
            }
            Command::ImportPath(msg) => {
                // importing from a path is not implemented; always answers with an error
                msg.tx
                    .send(io::Error::other("import path not supported").into())
                    .await
                    .ok();
            }
            Command::Observe(ObserveMsg {
                inner: ObserveRequest { hash },
                tx,
                ..
            }) => {
                // look up the size before spawning so the lock is not held by the task
                let size = self.blobs.lock().unwrap().get(&hash).map(|x| x.size);
                self.tasks.spawn(async move {
                    // known blobs are reported as complete, unknown ones as empty
                    if let Some(size) = size {
                        tx.send(Bitfield::complete(size)).await.ok();
                    } else {
                        tx.send(Bitfield::empty()).await.ok();
                    };
                });
            }
            Command::ExportBao(ExportBaoMsg {
                inner: ExportBaoRequest { hash, ranges, .. },
                tx,
                ..
            }) => {
                let metadata = self.blobs.lock().unwrap().get(&hash).cloned();
                self.tasks.spawn(export_bao(hash, metadata, ranges, tx));
            }
            Command::ExportPath(ExportPathMsg {
                inner: ExportPathRequest { hash, target, .. },
                tx,
                ..
            }) => {
                let metadata = self.blobs.lock().unwrap().get(&hash).cloned();
                self.tasks.spawn(export_path(metadata, target, tx));
            }
            // batch commands are dropped here without sending any reply
            Command::Batch(_cmd) => {}
            Command::ClearProtected(cmd) => {
                // nothing to clear; acknowledge only
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::CreateTag(cmd) => {
                use api::proto::CreateTagRequest;
                use std::time::SystemTime;

                let CreateTagRequest { value } = cmd.inner;
                // the closure tells `Tag::auto` which candidate names are already taken
                let tag = api::Tag::auto(SystemTime::now(), |t| self.tags.contains_key(t));
                self.tags.insert(tag.clone(), value);
                cmd.tx.send(Ok(tag)).await.ok();
            }
            Command::CreateTempTag(cmd) => {
                // the temp tag is returned to the caller but not recorded by the actor
                cmd.tx.send(TempTag::new(cmd.inner.value, None)).await.ok();
            }
            Command::RenameTag(cmd) => {
                use api::proto::RenameTagRequest;
                let RenameTagRequest { from, to } = cmd.inner;

                // move the value to the new name; any existing value at `to` is replaced
                if let Some(value) = self.tags.remove(&from) {
                    self.tags.insert(to, value);
                    cmd.tx.send(Ok(())).await.ok();
                } else {
                    cmd.tx
                        .send(Err(io::Error::new(io::ErrorKind::NotFound, "tag not found").into()))
                        .await
                        .ok();
                }
            }
            Command::DeleteTags(cmd) => {
                use api::proto::DeleteTagsRequest;
                use std::ops::Bound;
                let DeleteTagsRequest { from, to } = cmd.inner;

                // delete all tags in the range [from, to)
                // a missing bound leaves that side of the range open
                let start: Bound<&api::Tag> = from.as_ref().map_or(Bound::Unbounded, |t| Bound::Included(t));
                let end: Bound<&api::Tag> = to.as_ref().map_or(Bound::Unbounded, |t| Bound::Excluded(t));

                // collect the keys first: the map cannot be mutated while it is being ranged over
                let to_delete: Vec<_> = self
                    .tags
                    .range::<api::Tag, _>((start, end))
                    .map(|(k, _)| k.clone())
                    .collect();

                for tag in to_delete {
                    self.tags.remove(&tag);
                }
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::DeleteBlobs(cmd) => {
                // deleting blobs is not implemented; always answers with an error
                cmd.tx
                    .send(Err(io::Error::other("delete blobs not supported").into()))
                    .await
                    .ok();
            }
            Command::ListBlobs(cmd) => {
                // snapshot the hashes so the lock is released before the task runs
                let hashes: Vec<Hash> = self.blobs.lock().unwrap().keys().cloned().collect();
                self.tasks.spawn(async move {
                    for hash in hashes {
                        cmd.tx.send(Ok(hash)).await.ok();
                    }
                });
            }
            Command::BlobStatus(cmd) => {
                let hash = cmd.inner.hash;
                let metadata = self.blobs.lock().unwrap().get(&hash).cloned();
                // a blob is either fully present or unknown; there is no partial state
                let status = if let Some(metadata) = metadata {
                    BlobStatus::Complete {
                        size: metadata.size,
                    }
                } else {
                    BlobStatus::NotFound
                };
                cmd.tx.send(status).await.ok();
            }
            Command::ListTags(cmd) => {
                use api::proto::TagInfo;
                // replies with every tag; the request's contents are not consulted
                let tags: Vec<_> = self
                    .tags
                    .iter()
                    .map(|(name, value)| Ok(TagInfo {
                        name: name.clone(),
                        hash: value.hash,
                        format: value.format,
                    }))
                    .collect();
                cmd.tx.send(tags).await.ok();
            }
            Command::SetTag(cmd) => {
                use api::proto::SetTagRequest;
                let SetTagRequest { name, value } = cmd.inner;

                self.tags.insert(name, value);
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::ListTempTags(cmd) => {
                // temp tags are not tracked, so the list is always empty
                cmd.tx.send(Vec::new()).await.ok();
            }
            Command::SyncDb(cmd) => {
                // nothing to sync; acknowledge only
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::Shutdown(cmd) => {
                // the caller sends the acknowledgement and stops the actor loop
                return Some(cmd.tx);
            }
            Command::ExportRanges(cmd) => {
                let metadata = self.blobs.lock().unwrap().get(&cmd.inner.hash).cloned();
                self.tasks.spawn(export_ranges(cmd, metadata));
            }
        }
        None
    }
384
385    fn log_unit_task(&self, res: Result<(), JoinError>) {
386        if let Err(e) = res {
387            eprintln!("task failed: {e}");
388        }
389    }
390
    /// Runs the actor loop until a shutdown command arrives or there is nothing
    /// left to wait for.
    ///
    /// Each iteration either handles one incoming command or reaps one finished
    /// background task.
    async fn run(mut self) {
        loop {
            tokio::select! {
                Some(cmd) = self.commands.recv() => {
                    // `handle_command` hands back the reply sender only for shutdown:
                    // acknowledge it and stop
                    if let Some(shutdown) = self.handle_command(cmd).await {
                        shutdown.send(()).await.ok();
                        break;
                    }
                },
                // only poll the task set while it has tasks in it
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    self.log_unit_task(res);
                    // the last task just finished: wake everyone waiting for idle
                    if self.tasks.is_empty() {
                        for tx in self.idle_waiters.drain(..) {
                            tx.send(()).await.ok();
                        }
                    }
                },
                // neither branch can make progress any more
                else => break,
            }
        }
    }
412
413    async fn handle_import_bytes(&mut self, msg: ImportBytesMsg) {
414        use bao_tree::io::outboard::PreOrderMemOutboard;
415        use iroh_blobs::api::blobs::AddProgressItem;
416
417        let ImportBytesMsg {
418            inner: proto::ImportBytesRequest { data, .. },
419            tx,
420            ..
421        } = msg;
422
423        let size = data.len() as u64;
424
425        if tx.send(AddProgressItem::Size(size)).await.is_err() {
426            return;
427        }
428        if tx.send(AddProgressItem::CopyDone).await.is_err() {
429            return;
430        }
431
432        // compute hash from the actual data
433        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
434        let hash = Hash::from(outboard.root);
435
436        // store metadata (we don't store the actual data, just remember the size)
437        self.blobs.lock().unwrap().insert(
438            hash,
439            BlobMetadata {
440                size,
441                outboard: outboard.data.into(),
442                strategy: self.strategy,
443            },
444        );
445
446        // send completion with the hash
447        let temp_tag = api::TempTag::new(
448            HashAndFormat {
449                hash,
450                format: BlobFormat::Raw,
451            },
452            None,
453        );
454        if tx.send(AddProgressItem::Done(temp_tag)).await.is_err() {}
455    }
456
457    async fn handle_import_byte_stream(&mut self, msg: ImportByteStreamMsg) {
458        use bao_tree::io::outboard::PreOrderMemOutboard;
459        use iroh_blobs::api::blobs::AddProgressItem;
460        use proto::ImportByteStreamUpdate;
461
462        let ImportByteStreamMsg { tx, mut rx, .. } = msg;
463
464        // collect all bytes to compute hash
465        let mut data = Vec::new();
466        loop {
467            match rx.recv().await {
468                Ok(Some(ImportByteStreamUpdate::Bytes(chunk))) => {
469                    data.extend_from_slice(&chunk);
470                    if tx
471                        .send(AddProgressItem::CopyProgress(data.len() as u64))
472                        .await
473                        .is_err()
474                    {
475                        return;
476                    }
477                }
478                Ok(Some(ImportByteStreamUpdate::Done)) => {
479                    break;
480                }
481                Ok(None) | Err(_) => {
482                    tx.send(AddProgressItem::Error(io::Error::other(
483                        "stream ended unexpectedly",
484                    )))
485                    .await
486                    .ok();
487                    return;
488                }
489            }
490        }
491
492        let size = data.len() as u64;
493
494        if tx.send(AddProgressItem::CopyDone).await.is_err() {
495            return;
496        }
497
498        // compute hash
499        let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
500        let hash = Hash::from(outboard.root);
501
502        // store metadata
503        self.blobs.lock().unwrap().insert(
504            hash,
505            BlobMetadata {
506                size,
507                outboard: outboard.data.into(),
508                strategy: self.strategy,
509            },
510        );
511
512        // send completion
513        let temp_tag = api::TempTag::new(
514            HashAndFormat {
515                hash,
516                format: BlobFormat::Raw,
517            },
518            None,
519        );
520        if tx.send(AddProgressItem::Done(temp_tag)).await.is_err() {}
521    }
522
523    async fn handle_import_bao(&mut self, msg: ImportBaoMsg) {
524        use bao_tree::io::outboard::PreOrderMemOutboard;
525        use proto::ImportBaoRequest;
526
527        let ImportBaoMsg {
528            inner: ImportBaoRequest { hash, size },
529            tx,
530            mut rx,
531            ..
532        } = msg;
533
534        let size_u64 = size.get();
535        let strategy = self.strategy;
536        let blobs = self.blobs.clone();
537
538        self.tasks.spawn(async move {
539            // consume all incoming chunks
540            while let Ok(Some(_item)) = rx.recv().await {
541                // just drain them, don't store
542            }
543
544            // once all chunks consumed, generate fake data to compute outboard
545            let data = generate_data_for_strategy(size_u64, strategy);
546            let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
547
548            // store metadata
549            blobs.lock().unwrap().insert(
550                hash,
551                BlobMetadata {
552                    size: size_u64,
553                    outboard: outboard.data.into(),
554                    strategy,
555                },
556            );
557
558            tx.send(Ok(())).await.ok();
559        });
560    }
561}
562
563/// generates data on-the-fly based on strategy and offset
564#[derive(Clone)]
565struct DataReader {
566    strategy: DataStrategy,
567}
568
569impl DataReader {
570    fn new(strategy: DataStrategy) -> Self {
571        Self { strategy }
572    }
573
574    fn byte_at(&self, offset: u64) -> u8 {
575        match self.strategy {
576            DataStrategy::Zeros => 0,
577            DataStrategy::Ones => 0xFF,
578            DataStrategy::PseudoRandom { seed } => {
579                // offset-based hashing for independent byte generation
580                // simpler than LCG jump-ahead, still deterministic
581                // simple mixing function (SplitMix64-inspired)
582                let mut x = seed.wrapping_add(offset);
583                x = (x ^ (x >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
584                x = (x ^ (x >> 27)).wrapping_mul(0x94d049bb133111eb);
585                x = x ^ (x >> 31);
586                (x >> 24) as u8
587            }
588        }
589    }
590}
591
592impl ReadAt for DataReader {
593    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
594        for (i, byte) in buf.iter_mut().enumerate() {
595            *byte = self.byte_at(offset + i as u64);
596        }
597        Ok(buf.len())
598    }
599}
600
601impl ReadBytesAt for DataReader {
602    fn read_bytes_at(&self, offset: u64, size: usize) -> io::Result<Bytes> {
603        let mut data = vec![0u8; size];
604        self.read_at(offset, &mut data)?;
605        Ok(Bytes::from(data))
606    }
607}
608
609fn generate_data_for_strategy(size: u64, strategy: DataStrategy) -> Vec<u8> {
610    let reader = DataReader::new(strategy);
611    let mut data = vec![0u8; size as usize];
612    reader.read_at(0, &mut data).expect("read should succeed");
613    data
614}
615
616fn compute_hash_for_strategy(size: u64, strategy: DataStrategy) -> Hash {
617    let data = generate_data_for_strategy(size, strategy);
618    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
619    Hash::from(outboard.root)
620}
621
622async fn export_bao(
623    hash: Hash,
624    metadata: Option<BlobMetadata>,
625    ranges: ChunkRanges,
626    mut sender: mpsc::Sender<EncodedItem>,
627) {
628    let metadata = match metadata {
629        Some(metadata) => metadata,
630        None => {
631            sender
632                .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(
633                    io::Error::new(
634                        io::ErrorKind::UnexpectedEof,
635                        "export task ended unexpectedly",
636                    ),
637                )))
638                .await
639                .ok();
640            return;
641        }
642    };
643
644    let size = metadata.size;
645    let data = DataReader::new(metadata.strategy);
646
647    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
648    let outboard = PreOrderMemOutboard {
649        root: hash.into(),
650        tree,
651        data: metadata.outboard,
652    };
653
654    let sender = BaoTreeSender::ref_cast_mut(&mut sender);
655    traverse_ranges_validated(data, outboard, &ranges, sender)
656        .await
657        .ok();
658}
659
660async fn export_ranges(mut cmd: ExportRangesMsg, metadata: Option<BlobMetadata>) {
661    let Some(metadata) = metadata else {
662        cmd.tx
663            .send(ExportRangesItem::Error(api::Error::io(
664                io::ErrorKind::NotFound,
665                "hash not found",
666            )))
667            .await
668            .ok();
669        return;
670    };
671    if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, metadata).await {
672        cmd.tx
673            .send(ExportRangesItem::Error(cause.into()))
674            .await
675            .ok();
676    }
677}
678
679async fn export_ranges_impl(
680    cmd: ExportRangesRequest,
681    tx: &mut mpsc::Sender<ExportRangesItem>,
682    metadata: BlobMetadata,
683) -> io::Result<()> {
684    let ExportRangesRequest { ranges, .. } = cmd;
685    let size = metadata.size;
686    let bitfield = Bitfield::complete(size);
687
688    for range in ranges.iter() {
689        let range = match range {
690            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
691            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
692        };
693        let requested = ChunkRanges::bytes(range.start..range.end);
694        if !bitfield.ranges.is_superset(&requested) {
695            return Err(io::Error::other(format!(
696                "missing range: {requested:?}, present: {bitfield:?}",
697            )));
698        }
699        let reader = DataReader::new(metadata.strategy);
700        let bs = 1024;
701        let mut offset = range.start;
702        loop {
703            let end: u64 = (offset + bs).min(range.end);
704            let chunk_size = (end - offset) as usize;
705            let data = reader.read_bytes_at(offset, chunk_size)?;
706            tx.send(Leaf { offset, data }.into()).await?;
707            offset = end;
708            if offset >= range.end {
709                break;
710            }
711        }
712    }
713    Ok(())
714}
715
716impl FakeStore {
717    /// uses zeros strategy. for more control use [`FakeStore::builder()`]
718    pub fn new(sizes: impl IntoIterator<Item = u64>) -> Self {
719        Self::builder().with_blobs(sizes).build()
720    }
721
722    pub fn builder() -> FakeStoreBuilder {
723        FakeStoreBuilder::new()
724    }
725
726    fn new_with_config(sizes: impl IntoIterator<Item = u64>, config: FakeStoreConfig) -> Self {
727        let mut blobs = HashMap::new();
728        for size in sizes {
729            let hash = compute_hash_for_strategy(size, config.strategy);
730            let data = generate_data_for_strategy(size, config.strategy);
731            let outboard_data = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
732            blobs.insert(
733                hash,
734                BlobMetadata {
735                    size,
736                    outboard: outboard_data.data.into(),
737                    strategy: config.strategy,
738                },
739            );
740        }
741
742        let (sender, receiver) = tokio::sync::mpsc::channel(1);
743        let actor = Actor::new(receiver, blobs, config.strategy);
744        tokio::spawn(actor.run());
745
746        // Store is #[repr(transparent)] so we can use RefCast
747        let local = irpc::LocalSender::from(sender);
748        let client = local.into();
749
750        Self {
751            store: Store::ref_cast(&client).clone(),
752        }
753    }
754
755    pub fn store(&self) -> &Store {
756        &self.store
757    }
758}
759
760async fn export_path(
761    metadata: Option<BlobMetadata>,
762    target: PathBuf,
763    mut tx: mpsc::Sender<ExportProgressItem>,
764) {
765    let Some(metadata) = metadata else {
766        tx.send(api::Error::io(io::ErrorKind::NotFound, "hash not found").into())
767            .await
768            .ok();
769        return;
770    };
771    match export_path_impl(metadata, target, &mut tx).await {
772        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
773        Err(cause) => tx.send(api::Error::from(cause).into()).await.ok(),
774    };
775}
776
777async fn export_path_impl(
778    metadata: BlobMetadata,
779    target: PathBuf,
780    tx: &mut mpsc::Sender<ExportProgressItem>,
781) -> io::Result<()> {
782    let size = metadata.size;
783    let mut file = std::fs::File::create(&target)?;
784    tx.send(ExportProgressItem::Size(size)).await?;
785
786    let buf = [0u8; 1024 * 64];
787    for offset in (0..size).step_by(1024 * 64) {
788        let len = std::cmp::min(size - offset, 1024 * 64) as usize;
789        let buf = &buf[..len];
790        file.write_all(buf)?;
791        tx.try_send(ExportProgressItem::CopyProgress(offset))
792            .await?;
793    }
794    Ok(())
795}
796
797#[cfg(test)]
798mod tests;