iroh_blobs/store/bao_file.rs

//! An implementation of a bao file, meaning some data blob with an associated
//! outboard.
//!
//! Compared to just a pair of (data, outboard), this implementation also works
//! when both the data and the outboard are incomplete, and not even the size
//! is fully known.
//!
//! There is a fully in-memory implementation, and an implementation that uses
//! the file system for the data, outboard, and sizes files. There is also a
//! combined implementation that starts in memory and switches to files when
//! the memory limit is reached.
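//!
//! # Example
//!
//! A minimal sketch of the intended flow, assuming a directory `dir`, a blob
//! `hash`, and a verified `batch` of content items are already at hand: create
//! a handle that starts out in memory, feed it batches via its writer, and
//! read the (unvalidated) data back.
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // entries larger than 16 KiB are moved from memory to files in `dir`
//! let config = Arc::new(BaoFileConfig::new(Arc::new(dir), 1024 * 16, None));
//! let handle = BaoFileHandle::incomplete_mem(config, hash);
//!
//! // write verified chunks as they arrive from the network
//! let mut writer = handle.writer();
//! writer.write_batch(size, batch).await?;
//!
//! // read back whatever has been written so far
//! let mut reader = handle.data_reader();
//! let bytes = reader.read_at(0, 1024).await?;
//! ```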
use std::{
    fs::{File, OpenOptions},
    io,
    ops::{Deref, DerefMut},
    path::{Path, PathBuf},
    sync::{Arc, RwLock, Weak},
};

use bao_tree::{
    io::{
        fsm::BaoContentItem,
        outboard::PreOrderOutboard,
        sync::{ReadAt, WriteAt},
    },
    BaoTree,
};
use bytes::{Bytes, BytesMut};
use derive_more::Debug;
use iroh_io::AsyncSliceReader;

use super::mutable_mem_storage::{MutableMemStorage, SizeInfo};
use crate::{
    store::BaoBatchWriter,
    util::{get_limited_slice, MemOrFile, SparseMemFile},
    Hash, IROH_BLOCK_SIZE,
};

/// Data is stored in 3 files: the data file, the outboard file, and a sizes
/// file. The sizes file contains the size that the remote side told us when
/// writing each data block.
///
/// For complete data files, the sizes file is not needed, since you can just
/// use the size of the data file.
///
/// For files below the chunk group size, the outboard file is not needed,
/// since there is only one leaf, and the outboard file is empty.
struct DataPaths {
    /// The data file. Size is determined by the chunk with the highest offset
    /// that has been written.
    ///
    /// Gaps will be filled with zeros.
    data: PathBuf,
    /// The outboard file. This is *without* the size header, since that is not
    /// known for partial files.
    ///
    /// The size of the outboard file is therefore a multiple of a hash pair
    /// (64 bytes).
    ///
    /// The naming convention is to use obao for pre-order traversal and oboa
    /// for post-order traversal. The log2 of the chunk group size is appended,
    /// so for iroh's default chunk group size (log2 = 4), the file extension
    /// is .obao4.
    outboard: PathBuf,
    /// The sizes file. This is a file with an 8 byte size for each chunk group.
    /// The naming convention is to append the log2 of the chunk group size,
    /// so for iroh's default chunk group size (log2 = 4), the file extension
    /// is .sizes4.
    ///
    /// The traversal order is not relevant for the sizes file, since it is
    /// about the data chunks, not the hash pairs.
    sizes: PathBuf,
}

/// Storage for complete blobs. There is no longer any uncertainty about the
/// size, so we don't need a sizes file.
///
/// Writing is not possible, but also not needed, since the file is complete.
/// This covers all combinations of data and outboard being in memory or on
/// disk.
///
/// For the memory variant, reading is zero-copy, since the storage is
/// already a `Bytes`.
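///
/// A sketch of constructing the in-memory variant (values are illustrative;
/// a 1 KiB blob fits in a single leaf, so its outboard is empty):
///
/// ```ignore
/// let storage = CompleteStorage {
///     data: MemOrFile::Mem(Bytes::from(vec![0u8; 1024])),
///     outboard: MemOrFile::Mem(Bytes::new()),
/// };
/// assert_eq!(storage.data_size(), 1024);
/// assert_eq!(storage.outboard_size(), 0);
/// ```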
#[derive(Default, derive_more::Debug)]
pub struct CompleteStorage {
    /// data part, which can be in memory or on disk.
    #[debug("{:?}", data.as_ref().map_mem(|x| x.len()))]
    pub data: MemOrFile<Bytes, (File, u64)>,
    /// outboard part, which can be in memory or on disk.
    #[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()))]
    pub outboard: MemOrFile<Bytes, (File, u64)>,
}

impl CompleteStorage {
    /// Read from the data file at the given offset, until end of file or max bytes.
    pub fn read_data_at(&self, offset: u64, len: usize) -> Bytes {
        match &self.data {
            MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
            MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
        }
    }

    /// Read from the outboard file at the given offset, until end of file or max bytes.
    pub fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes {
        match &self.outboard {
            MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
            MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
        }
    }

    /// The size of the data file.
    pub fn data_size(&self) -> u64 {
        match &self.data {
            MemOrFile::Mem(mem) => mem.len() as u64,
            MemOrFile::File((_file, size)) => *size,
        }
    }

    /// The size of the outboard file.
    pub fn outboard_size(&self) -> u64 {
        match &self.outboard {
            MemOrFile::Mem(mem) => mem.len() as u64,
            MemOrFile::File((_file, size)) => *size,
        }
    }
}

/// Create a file for reading and writing, but *without* truncating the existing
/// file.
fn create_read_write(path: impl AsRef<Path>) -> io::Result<File> {
    OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(path)
}

/// Read from the given file at the given offset, until end of file or max bytes.
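///
/// A sketch of the semantics (`file` can be anything implementing `ReadAt`;
/// a short file simply yields fewer than `max` bytes):
///
/// ```ignore
/// let file = std::fs::File::open("some.data")?;
/// // returns at most 16 bytes, fewer if the file ends before offset 8 + 16
/// let bytes = read_to_end(&file, 8, 16)?;
/// assert!(bytes.len() <= 16);
/// ```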
fn read_to_end(file: impl ReadAt, mut offset: u64, max: usize) -> io::Result<Bytes> {
    let mut res = BytesMut::new();
    let mut buf = [0u8; 4096];
    let mut remaining = max;
    while remaining > 0 {
        let end = buf.len().min(remaining);
        let read = file.read_at(offset, &mut buf[..end])?;
        if read == 0 {
            // eof
            break;
        }
        res.extend_from_slice(&buf[..read]);
        offset += read as u64;
        remaining -= read;
    }
    Ok(res.freeze())
}

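/// The highest end offset (offset + data length) of any leaf in the batch, or
/// 0 if the batch contains no leaves. This is what decides when a memory
/// entry has to spill to disk. A sketch of the behavior (`sixteen_kib` stands
/// for 16 KiB of leaf data; the `Leaf` literal assumes bao-tree's fsm field
/// names):
///
/// ```ignore
/// use bao_tree::io::fsm::Leaf;
/// let batch = [BaoContentItem::Leaf(Leaf { offset: 1024, data: sixteen_kib })];
/// assert_eq!(max_offset(&batch), 1024 + 16 * 1024);
/// ```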
fn max_offset(batch: &[BaoContentItem]) -> u64 {
    batch
        .iter()
        .filter_map(|item| match item {
            BaoContentItem::Leaf(leaf) => {
                let len = leaf.data.len().try_into().unwrap();
                let end = leaf
                    .offset
                    .checked_add(len)
                    .expect("u64 overflow for leaf end");
                Some(end)
            }
            _ => None,
        })
        .max()
        .unwrap_or(0)
}

/// A file storage for an incomplete bao file.
#[derive(Debug)]
pub struct FileStorage {
    data: std::fs::File,
    outboard: std::fs::File,
    sizes: std::fs::File,
}

impl FileStorage {
    /// Split into data, outboard and sizes files.
    pub fn into_parts(self) -> (File, File, File) {
        (self.data, self.outboard, self.sizes)
    }

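    /// The size of the data, as last told to us by the remote side.
    ///
    /// This reads the last full u64 slot of the sizes file; a sizes file
    /// shorter than 8 bytes means no size is known yet, so 0 is returned.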
    fn current_size(&self) -> io::Result<u64> {
        let len = self.sizes.metadata()?.len();
        if len < 8 {
            Ok(0)
        } else {
            // todo: use the last full u64 in case the sizes file is not a multiple of 8
            // bytes. Not sure how that would happen, but we should handle it.
            let mut buf = [0u8; 8];
            self.sizes.read_exact_at(len - 8, &mut buf)?;
            Ok(u64::from_le_bytes(buf))
        }
    }

    fn write_batch(&mut self, size: u64, batch: &[BaoContentItem]) -> io::Result<()> {
        let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
        for item in batch {
            match item {
                BaoContentItem::Parent(parent) => {
                    if let Some(offset) = tree.pre_order_offset(parent.node) {
                        let o0 = offset * 64;
                        self.outboard
                            .write_all_at(o0, parent.pair.0.as_bytes().as_slice())?;
                        self.outboard
                            .write_all_at(o0 + 32, parent.pair.1.as_bytes().as_slice())?;
                    }
                }
                BaoContentItem::Leaf(leaf) => {
                    let o0 = leaf.offset;
                    // divide by the chunk group size in bytes, then multiply by
                    // 8 to get the byte offset of this group's size slot
                    let index = (leaf.offset >> (tree.block_size().chunk_log() + 10)) << 3;
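                    // worked example, assuming the default chunk group size of
                    // 16 KiB (chunk_log = 4, so 2^(4 + 10) bytes per group):
                    // a leaf at byte offset 49152 lies in chunk group 3, so its
                    // size slot starts at byte (49152 >> 14) << 3 = 3 * 8 = 24.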
                    tracing::trace!(
                        "write_batch f={:?} o={} l={}",
                        self.data,
                        o0,
                        leaf.data.len()
                    );
                    self.data.write_all_at(o0, leaf.data.as_ref())?;
                    let size = tree.size();
                    self.sizes.write_all_at(index, &size.to_le_bytes())?;
                }
            }
        }
        Ok(())
    }

    fn read_data_at(&self, offset: u64, len: usize) -> io::Result<Bytes> {
        read_to_end(&self.data, offset, len)
    }

    fn read_outboard_at(&self, offset: u64, len: usize) -> io::Result<Bytes> {
        read_to_end(&self.outboard, offset, len)
    }
}

/// The storage for a bao file. This can be either in memory or on disk.
#[derive(Debug)]
pub(crate) enum BaoFileStorage {
    /// The entry is incomplete and in memory.
    ///
    /// Since it is incomplete, it must be writable.
    ///
    /// This is used mostly for tiny entries, <= 16 KiB. But in principle it
    /// can be used for larger sizes.
    ///
    /// Incomplete mem entries are *not* persisted at all. So if the store
    /// crashes they will be gone.
    IncompleteMem(MutableMemStorage),
    /// The entry is incomplete and on disk.
    IncompleteFile(FileStorage),
    /// The entry is complete. Outboard and data can come from different sources
    /// (memory or file).
    ///
    /// Writing to this is a no-op, since it is already complete.
    Complete(CompleteStorage),
}

impl Default for BaoFileStorage {
    fn default() -> Self {
        BaoFileStorage::Complete(Default::default())
    }
}

impl BaoFileStorage {
    /// Take the storage out, leaving an empty storage in its place.
    ///
    /// Be careful to put something back in its place, or you will lose data.
    #[cfg(feature = "fs-store")]
    pub fn take(&mut self) -> Self {
        std::mem::take(self)
    }

    /// Create a new mutable mem storage.
    pub fn incomplete_mem() -> Self {
        Self::IncompleteMem(Default::default())
    }

    /// Call sync_all on all the files.
    fn sync_all(&self) -> io::Result<()> {
        match self {
            Self::Complete(_) => Ok(()),
            Self::IncompleteMem(_) => Ok(()),
            Self::IncompleteFile(file) => {
                file.data.sync_all()?;
                file.outboard.sync_all()?;
                file.sizes.sync_all()?;
                Ok(())
            }
        }
    }

    /// True if the storage is in memory.
    pub fn is_mem(&self) -> bool {
        match self {
            Self::IncompleteMem(_) => true,
            Self::IncompleteFile(_) => false,
            Self::Complete(c) => c.data.is_mem() && c.outboard.is_mem(),
        }
    }
}

/// A weak reference to a bao file handle.
#[derive(Debug, Clone)]
pub struct BaoFileHandleWeak(Weak<BaoFileHandleInner>);

impl BaoFileHandleWeak {
    /// Upgrade to a strong reference if possible.
    pub fn upgrade(&self) -> Option<BaoFileHandle> {
        self.0.upgrade().map(BaoFileHandle)
    }

    /// True if the handle is still live (has strong references).
    pub fn is_live(&self) -> bool {
        self.0.strong_count() > 0
    }
}

/// The inner part of a bao file handle.
#[derive(Debug)]
pub struct BaoFileHandleInner {
    pub(crate) storage: RwLock<BaoFileStorage>,
    config: Arc<BaoFileConfig>,
    hash: Hash,
}

/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);

pub(crate) type CreateCb = Arc<dyn Fn(&Hash) -> io::Result<()> + Send + Sync>;

/// Configuration for the deferred batch writer. It will start writing to memory,
/// and then switch to a file when the memory limit is reached.
#[derive(derive_more::Debug, Clone)]
pub struct BaoFileConfig {
    /// Directory to store files in. Only used when the memory limit is reached.
    dir: Arc<PathBuf>,
    /// Maximum data size (inclusive) before switching to file mode.
    max_mem: usize,
    /// Callback to call when we switch to file mode.
    ///
    /// Todo: make this async.
    #[debug("{:?}", on_file_create.as_ref().map(|_| ()))]
    on_file_create: Option<CreateCb>,
}

impl BaoFileConfig {
    /// Create a new deferred batch writer configuration.
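    ///
    /// A sketch of typical usage (the directory and limit are illustrative):
    ///
    /// ```ignore
    /// let config = BaoFileConfig::new(
    ///     Arc::new("/tmp/blobs".into()), // where .data/.obao4/.sizes4 files go
    ///     1024 * 16,                     // keep entries up to 16 KiB in memory
    ///     None,                          // no on_file_create callback
    /// );
    /// ```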
    pub fn new(dir: Arc<PathBuf>, max_mem: usize, on_file_create: Option<CreateCb>) -> Self {
        Self {
            dir,
            max_mem,
            on_file_create,
        }
    }

    /// Get the paths for a hash.
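    ///
    /// For a hash with hex representation `<hex>`, this yields `<hex>.data`,
    /// `<hex>.obao4` and `<hex>.sizes4` inside `self.dir`, the `4` being the
    /// log2 of the default chunk group size.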
    fn paths(&self, hash: &Hash) -> DataPaths {
        DataPaths {
            data: self.dir.join(format!("{}.data", hash.to_hex())),
            outboard: self.dir.join(format!("{}.obao4", hash.to_hex())),
            sizes: self.dir.join(format!("{}.sizes4", hash.to_hex())),
        }
    }
}

/// A reader for a bao file, reading just the data.
#[derive(Debug)]
pub struct DataReader(Option<BaoFileHandle>);

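/// Run `f` against the storage of the handle in `opt`, taking the handle out
/// for the duration of the call.
///
/// If the storage lock is immediately available and `no_io` confirms that `f`
/// will not block (e.g. everything is in memory), `f` runs inline; otherwise
/// the handle is moved into a blocking task.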
async fn with_storage<T, P, F>(opt: &mut Option<BaoFileHandle>, no_io: P, f: F) -> io::Result<T>
where
    P: Fn(&BaoFileStorage) -> bool + Send + 'static,
    F: FnOnce(&BaoFileStorage) -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    let handle = opt
        .take()
        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "deferred batch busy"))?;
    // if we can get the lock immediately, and we are in memory mode, we can
    // avoid spawning a task.
    if let Ok(storage) = handle.storage.try_read() {
        if no_io(&storage) {
            let res = f(&storage);
            // clone because for some reason even when we drop storage, the
            // borrow checker still thinks handle is borrowed.
            *opt = Some(handle.clone());
            return res;
        }
    };
    // otherwise, we have to spawn a task.
    let (handle, res) = tokio::task::spawn_blocking(move || {
        let storage = handle.storage.read().unwrap();
        let res = f(storage.deref());
        drop(storage);
        (handle, res)
    })
    .await
    .expect("spawn_blocking failed");
    *opt = Some(handle);
    res
}

impl AsyncSliceReader for DataReader {
    async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
        with_storage(
            &mut self.0,
            BaoFileStorage::is_mem,
            move |storage| match storage {
                BaoFileStorage::Complete(mem) => Ok(mem.read_data_at(offset, len)),
                BaoFileStorage::IncompleteMem(mem) => Ok(mem.read_data_at(offset, len)),
                BaoFileStorage::IncompleteFile(file) => file.read_data_at(offset, len),
            },
        )
        .await
    }

    async fn size(&mut self) -> io::Result<u64> {
        with_storage(
            &mut self.0,
            BaoFileStorage::is_mem,
            move |storage| match storage {
                BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
                BaoFileStorage::IncompleteMem(mem) => Ok(mem.data.len() as u64),
                BaoFileStorage::IncompleteFile(file) => file.data.metadata().map(|m| m.len()),
            },
        )
        .await
    }
}

/// A reader for the outboard part of a bao file.
#[derive(Debug)]
pub struct OutboardReader(Option<BaoFileHandle>);

impl AsyncSliceReader for OutboardReader {
    async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
        with_storage(
            &mut self.0,
            BaoFileStorage::is_mem,
            move |storage| match storage {
                BaoFileStorage::Complete(mem) => Ok(mem.read_outboard_at(offset, len)),
                BaoFileStorage::IncompleteMem(mem) => Ok(mem.read_outboard_at(offset, len)),
                BaoFileStorage::IncompleteFile(file) => file.read_outboard_at(offset, len),
            },
        )
        .await
    }

    async fn size(&mut self) -> io::Result<u64> {
        with_storage(
            &mut self.0,
            BaoFileStorage::is_mem,
            move |storage| match storage {
                BaoFileStorage::Complete(mem) => Ok(mem.outboard_size()),
                BaoFileStorage::IncompleteMem(mem) => Ok(mem.outboard.len() as u64),
                BaoFileStorage::IncompleteFile(file) => file.outboard.metadata().map(|m| m.len()),
            },
        )
        .await
    }
}

enum HandleChange {
    None,
    MemToFile,
    // later: size verified
}

impl BaoFileHandle {
    /// Create a new bao file handle.
    ///
    /// This will create a new file handle with an empty memory storage.
    /// Since there are very likely to be many of these, we use an
    /// `Arc<RwLock<_>>` around the inner state.
    pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
        let storage = BaoFileStorage::incomplete_mem();
        Self(Arc::new(BaoFileHandleInner {
            storage: RwLock::new(storage),
            config,
            hash,
        }))
    }

    /// Create a new bao file handle with a partial file.
    pub fn incomplete_file(config: Arc<BaoFileConfig>, hash: Hash) -> io::Result<Self> {
        let paths = config.paths(&hash);
        let storage = BaoFileStorage::IncompleteFile(FileStorage {
            data: create_read_write(&paths.data)?,
            outboard: create_read_write(&paths.outboard)?,
            sizes: create_read_write(&paths.sizes)?,
        });
        Ok(Self(Arc::new(BaoFileHandleInner {
            storage: RwLock::new(storage),
            config,
            hash,
        })))
    }

    /// Create a new complete bao file handle.
    pub fn new_complete(
        config: Arc<BaoFileConfig>,
        hash: Hash,
        data: MemOrFile<Bytes, (File, u64)>,
        outboard: MemOrFile<Bytes, (File, u64)>,
    ) -> Self {
        let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
        Self(Arc::new(BaoFileHandleInner {
            storage: RwLock::new(storage),
            config,
            hash,
        }))
    }

    /// Transform the storage in place. If the transform fails, the storage will
    /// be an immutable empty storage.
    #[cfg(feature = "fs-store")]
    pub(crate) fn transform(
        &self,
        f: impl FnOnce(BaoFileStorage) -> io::Result<BaoFileStorage>,
    ) -> io::Result<()> {
        let mut lock = self.storage.write().unwrap();
        let storage = lock.take();
        *lock = f(storage)?;
        Ok(())
    }

    /// True if the file is complete.
    pub fn is_complete(&self) -> bool {
        matches!(
            self.storage.read().unwrap().deref(),
            BaoFileStorage::Complete(_)
        )
    }

    /// An AsyncSliceReader for the data file.
    ///
    /// Caution: this is a reader for the unvalidated data file. Reading this
    /// can produce data that does not match the hash.
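    ///
    /// ```ignore
    /// let mut reader = handle.data_reader();
    /// // may return bytes that have not (yet) been verified against the hash
    /// let chunk = reader.read_at(0, 1024).await?;
    /// ```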
    pub fn data_reader(&self) -> DataReader {
        DataReader(Some(self.clone()))
    }

    /// An AsyncSliceReader for the outboard file.
    ///
    /// The outboard file is used to validate the data file. It is not guaranteed
    /// to be complete.
    pub fn outboard_reader(&self) -> OutboardReader {
        OutboardReader(Some(self.clone()))
    }

    /// The most precise known total size of the data file.
    pub fn current_size(&self) -> io::Result<u64> {
        match self.storage.read().unwrap().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
            BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
            BaoFileStorage::IncompleteFile(file) => file.current_size(),
        }
    }

    /// The outboard for the file.
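    ///
    /// Combined with [`Self::data_reader`], this can be used to re-encode
    /// verified ranges, e.g. (a sketch, mirroring the tests below):
    ///
    /// ```ignore
    /// let ob = handle.outboard()?;
    /// let mut encoded = Vec::new();
    /// encoded.write_all(&ob.tree.size().to_le_bytes())?;
    /// bao_tree::io::fsm::encode_ranges_validated(
    ///     handle.data_reader(), ob, &ChunkRanges::all(), encoded,
    /// ).await?;
    /// ```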
    pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
        let root = self.hash.into();
        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
        let outboard = self.outboard_reader();
        Ok(PreOrderOutboard {
            root,
            tree,
            data: outboard,
        })
    }

    /// The hash of the file.
    pub fn hash(&self) -> Hash {
        self.hash
    }

    /// Create a new writer from the handle.
    pub fn writer(&self) -> BaoFileWriter {
        BaoFileWriter(Some(self.clone()))
    }

    /// This is the synchronous impl for writing a batch.
    fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
        let mut storage = self.storage.write().unwrap();
        match storage.deref_mut() {
            BaoFileStorage::IncompleteMem(mem) => {
                // check if we need to switch to file mode, otherwise write to memory
                if max_offset(batch) <= self.config.max_mem as u64 {
                    mem.write_batch(size, batch)?;
                    Ok(HandleChange::None)
                } else {
                    // create the paths. This allocates 3 pathbufs, so we do it
                    // only when we need to.
                    let paths = self.config.paths(&self.hash);
                    // *first* switch to file mode, *then* write the batch.
                    //
                    // otherwise we might allocate a lot of memory if we get
                    // a write at the end of a very large file.
                    let mut file_batch = mem.persist(paths)?;
                    file_batch.write_batch(size, batch)?;
                    *storage = BaoFileStorage::IncompleteFile(file_batch);
                    Ok(HandleChange::MemToFile)
                }
            }
            BaoFileStorage::IncompleteFile(file) => {
                // already in file mode, just write the batch
                file.write_batch(size, batch)?;
                Ok(HandleChange::None)
            }
            BaoFileStorage::Complete(_) => {
                // we are complete, so just ignore the write
                // unless there is a bug, this would just write the exact same data
                Ok(HandleChange::None)
            }
        }
    }

    /// Downgrade to a weak reference.
    pub fn downgrade(&self) -> BaoFileHandleWeak {
        BaoFileHandleWeak(Arc::downgrade(&self.0))
    }
}

impl SizeInfo {
    /// Persist into a file where each chunk group has its own 8 byte slot.
    pub fn persist(&self, mut target: impl WriteAt) -> io::Result<()> {
        // the slot layout must match `FileStorage::write_batch`: the byte
        // offset is divided by the chunk group size in bytes (chunk_log + 10
        // bits), then multiplied by 8.
        let size_offset = (self.offset >> (IROH_BLOCK_SIZE.chunk_log() + 10)) << 3;
        target.write_all_at(size_offset, self.size.to_le_bytes().as_slice())?;
        Ok(())
    }

    /// Convert to a vec in slot format.
    pub fn to_vec(&self) -> Vec<u8> {
        let mut res = Vec::new();
        self.persist(&mut res).expect("io error writing to vec");
        res
    }
}

impl MutableMemStorage {
    /// Persist the batch to disk, creating a `FileStorage`.
    fn persist(&self, paths: DataPaths) -> io::Result<FileStorage> {
        let mut data = create_read_write(&paths.data)?;
        let mut outboard = create_read_write(&paths.outboard)?;
        let mut sizes = create_read_write(&paths.sizes)?;
        self.data.persist(&mut data)?;
        self.outboard.persist(&mut outboard)?;
        self.sizes.persist(&mut sizes)?;
        data.sync_all()?;
        outboard.sync_all()?;
        sizes.sync_all()?;
        Ok(FileStorage {
            data,
            outboard,
            sizes,
        })
    }

    /// Get the parts: data, outboard and sizes.
    pub fn into_parts(self) -> (SparseMemFile, SparseMemFile, SizeInfo) {
        (self.data, self.outboard, self.sizes)
    }
}

/// This is finally the thing for which we can implement [`BaoBatchWriter`].
///
/// It is a BaoFileHandle wrapped in an Option, so that we can take the handle
/// out while an operation is in flight.
#[derive(Debug)]
pub struct BaoFileWriter(Option<BaoFileHandle>);

impl BaoBatchWriter for BaoFileWriter {
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> std::io::Result<()> {
        let Some(handle) = self.0.take() else {
            return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
        };
        let (handle, change) = tokio::task::spawn_blocking(move || {
            let change = handle.write_batch(size, &batch);
            (handle, change)
        })
        .await
        .expect("spawn_blocking failed");
        match change? {
            HandleChange::None => {}
            HandleChange::MemToFile => {
                if let Some(cb) = handle.config.on_file_create.as_ref() {
                    cb(&handle.hash)?;
                }
            }
        }
        self.0 = Some(handle);
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        let Some(handle) = self.0.take() else {
            return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
        };
        let (handle, res) = tokio::task::spawn_blocking(move || {
            let res = handle.storage.write().unwrap().sync_all();
            (handle, res)
        })
        .await
        .expect("spawn_blocking failed");
        self.0 = Some(handle);
        res
    }
}

#[cfg(test)]
pub mod test_support {
    use std::{future::Future, io::Cursor, ops::Range};

    use bao_tree::{
        io::{
            fsm::{ResponseDecoder, ResponseDecoderNext},
            outboard::PostOrderMemOutboard,
            round_up_to_chunks,
            sync::encode_ranges_validated,
        },
        BlockSize, ChunkRanges,
    };
    use futures_lite::{Stream, StreamExt};
    use iroh_io::AsyncStreamReader;
    use rand::RngCore;
    use range_collections::RangeSet2;

    use super::*;
    use crate::util::limited_range;

    pub const IROH_BLOCK_SIZE: BlockSize = BlockSize::from_chunk_log(4);

    /// Decode a response into a batch file writer.
    pub async fn decode_response_into_batch<R, W>(
        root: Hash,
        block_size: BlockSize,
        ranges: ChunkRanges,
        mut encoded: R,
        mut target: W,
    ) -> io::Result<()>
    where
        R: AsyncStreamReader,
        W: BaoBatchWriter,
    {
        let size = encoded.read::<8>().await?;
        let size = u64::from_le_bytes(size);
        let mut reading =
            ResponseDecoder::new(root.into(), ranges, BaoTree::new(size, block_size), encoded);
        let mut stack = Vec::new();
        loop {
            let item = match reading.next().await {
                ResponseDecoderNext::Done(_reader) => break,
                ResponseDecoderNext::More((next, item)) => {
                    reading = next;
                    item?
                }
            };
            match item {
                BaoContentItem::Parent(_) => {
                    stack.push(item);
                }
                BaoContentItem::Leaf(_) => {
                    // write a batch every time we see a leaf
                    // the last item will be a leaf.
                    stack.push(item);
                    target.write_batch(size, std::mem::take(&mut stack)).await?;
                }
            }
        }
        assert!(stack.is_empty(), "last item should be a leaf");
        Ok(())
    }

    pub fn random_test_data(size: usize) -> Vec<u8> {
        let mut rand = rand::thread_rng();
        let mut res = vec![0u8; size];
        rand.fill_bytes(&mut res);
        res
    }

    /// Take some data and encode it.
    pub fn simulate_remote(data: &[u8]) -> (Hash, Cursor<Bytes>) {
        let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
        let size = data.len() as u64;
        let mut encoded = size.to_le_bytes().to_vec();
        bao_tree::io::sync::encode_ranges_validated(
            data,
            &outboard,
            &ChunkRanges::all(),
            &mut encoded,
        )
        .unwrap();
        let hash = outboard.root;
        (hash.into(), Cursor::new(encoded.into()))
    }

    pub fn to_ranges(ranges: &[Range<u64>]) -> RangeSet2<u64> {
        let mut range_set = RangeSet2::empty();
        for range in ranges.as_ref().iter().cloned() {
            range_set |= RangeSet2::from(range);
        }
        range_set
    }

    /// Simulate the send side, when asked to send bao encoded data for the given ranges.
    pub fn make_wire_data(
        data: &[u8],
        ranges: impl AsRef<[Range<u64>]>,
    ) -> (Hash, ChunkRanges, Vec<u8>) {
        // compute a range set from the given ranges
        let range_set = to_ranges(ranges.as_ref());
        // round up to chunks
        let chunk_ranges = round_up_to_chunks(&range_set);
        // compute the outboard
        let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE).flip();
        let size = data.len() as u64;
        let mut encoded = size.to_le_bytes().to_vec();
        encode_ranges_validated(data, &outboard, &chunk_ranges, &mut encoded).unwrap();
        (outboard.root.into(), chunk_ranges, encoded)
    }

    pub async fn validate(handle: &BaoFileHandle, original: &[u8], ranges: &[Range<u64>]) {
        let mut r = handle.data_reader();
        for range in ranges {
            let start = range.start;
            let len = (range.end - range.start).try_into().unwrap();
            let data = &original[limited_range(start, len, original.len())];
            let read = r.read_at(start, len).await.unwrap();
            assert_eq!(data.len(), read.as_ref().len());
            assert_eq!(data, read.as_ref());
        }
    }

    /// Helper to simulate a slow request.
    pub fn trickle(
        data: &[u8],
        mtu: usize,
        delay: std::time::Duration,
    ) -> impl Stream<Item = Bytes> {
        let parts = data
            .chunks(mtu)
            .map(Bytes::copy_from_slice)
            .collect::<Vec<_>>();
        futures_lite::stream::iter(parts).then(move |part| async move {
            tokio::time::sleep(delay).await;
            part
        })
    }

    pub async fn local<F>(f: F) -> F::Output
    where
        F: Future,
    {
        tokio::task::LocalSet::new().run_until(f).await
    }
}

#[cfg(test)]
mod tests {
    use std::io::Write;

    use bao_tree::{blake3, ChunkNum, ChunkRanges};
    use futures_lite::StreamExt;
    use iroh_io::TokioStreamReader;
    use tests::test_support::{
        decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,
    };
    use tokio::task::JoinSet;

    use super::*;
    use crate::util::local_pool::LocalPool;

    #[tokio::test]
    async fn partial_downloads() {
        local(async move {
            let n = 1024 * 64u64;
            let test_data = random_test_data(n as usize);
            let temp_dir = tempfile::tempdir().unwrap();
            let hash = blake3::hash(&test_data);
            let handle = BaoFileHandle::incomplete_mem(
                Arc::new(BaoFileConfig::new(
                    Arc::new(temp_dir.as_ref().to_owned()),
                    1024 * 16,
                    None,
                )),
                hash.into(),
            );
            let mut tasks = JoinSet::new();
            for i in 1..3 {
                let file = handle.writer();
                let range = (i * (n / 4))..((i + 1) * (n / 4));
                println!("range: {:?}", range);
                let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &[range]);
                let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
                    .map(io::Result::Ok)
                    .boxed();
                let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
                let _task = tasks.spawn_local(async move {
                    decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file)
                        .await
                });
            }
            while let Some(res) = tasks.join_next().await {
                res.unwrap().unwrap();
            }
            println!(
                "len {:?} {:?}",
                handle,
                handle.data_reader().size().await.unwrap()
            );
            #[allow(clippy::single_range_in_vec_init)]
            let ranges = [1024 * 16..1024 * 48];
            validate(&handle, &test_data, &ranges).await;

            // let ranges =
            // let full_chunks = bao_tree::io::full_chunk_groups();
            let mut encoded = Vec::new();
            let ob = handle.outboard().unwrap();
            encoded
                .write_all(ob.tree.size().to_le_bytes().as_slice())
                .unwrap();
            bao_tree::io::fsm::encode_ranges_validated(
                handle.data_reader(),
                ob,
                &ChunkRanges::from(ChunkNum(16)..ChunkNum(48)),
                encoded,
            )
            .await
            .unwrap();
        })
        .await;
    }

    #[tokio::test]
    async fn concurrent_downloads() {
        let n = 1024 * 32u64;
        let test_data = random_test_data(n as usize);
        let temp_dir = tempfile::tempdir().unwrap();
        let hash = blake3::hash(&test_data);
        let handle = BaoFileHandle::incomplete_mem(
            Arc::new(BaoFileConfig::new(
                Arc::new(temp_dir.as_ref().to_owned()),
                1024 * 16,
                None,
            )),
            hash.into(),
        );
        let local = LocalPool::default();
        let mut tasks = Vec::new();
        for i in 0..4 {
            let file = handle.writer();
            let range = (i * (n / 4))..((i + 1) * (n / 4));
            println!("range: {:?}", range);
            let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &[range]);
            let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
                .map(io::Result::Ok)
                .boxed();
            let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
            let task = local.spawn(move || async move {
                decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file).await
            });
            tasks.push(task);
        }
        for task in tasks {
            task.await.unwrap().unwrap();
        }
        println!(
            "len {:?} {:?}",
            handle,
            handle.data_reader().size().await.unwrap()
        );
        #[allow(clippy::single_range_in_vec_init)]
        let ranges = [0..n];
        validate(&handle, &test_data, &ranges).await;

        let mut encoded = Vec::new();
        let ob = handle.outboard().unwrap();
        encoded
            .write_all(ob.tree.size().to_le_bytes().as_slice())
            .unwrap();
        bao_tree::io::fsm::encode_ranges_validated(
            handle.data_reader(),
            ob,
            &ChunkRanges::all(),
            encoded,
        )
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn stay_in_mem() {
        let test_data = random_test_data(1024 * 17);
        #[allow(clippy::single_range_in_vec_init)]
        let ranges = [0..test_data.len().try_into().unwrap()];
        let (hash, chunk_ranges, wire_data) = make_wire_data(&test_data, &ranges);
        println!("file len is {:?}", chunk_ranges);
        let temp_dir = tempfile::tempdir().unwrap();
        let handle = BaoFileHandle::incomplete_mem(
            Arc::new(BaoFileConfig::new(
                Arc::new(temp_dir.as_ref().to_owned()),
                1024 * 16,
                None,
            )),
            hash,
        );
        decode_response_into_batch(
            hash,
            IROH_BLOCK_SIZE,
            chunk_ranges,
            wire_data.as_slice(),
            handle.writer(),
        )
        .await
        .unwrap();
        validate(&handle, &test_data, &ranges).await;

        let mut encoded = Vec::new();
        let ob = handle.outboard().unwrap();
        encoded
            .write_all(ob.tree.size().to_le_bytes().as_slice())
            .unwrap();
        bao_tree::io::fsm::encode_ranges_validated(
            handle.data_reader(),
            ob,
            &ChunkRanges::all(),
            encoded,
        )
        .await
        .unwrap();
        println!("{:?}", handle);
    }
}
1042}