repo_stream/
lib.rs

/*!
A robust CAR file -> MST walker for atproto

Small CARs have their blocks buffered in memory. If a configurable memory limit
is reached while reading blocks, CAR reading is suspended, and can be continued
by providing disk storage to buffer the CAR blocks instead.

A `process` function can be provided for tasks where records are transformed
into a smaller representation, to save memory (and disk) during block reading.

Once blocks are loaded, the MST is walked and emitted as chunks of
`(rkey, processed_block)` pairs, in order (depth first, left-to-right).

Some MST validations are applied:
- Keys must appear in order
- Keys must be at the correct MST tree depth

`iroh_car` additionally applies a block size limit of `2MiB`.
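
As a rough illustration of the two MST checks listed above (a sketch, not this
crate's actual implementation): keys are compared as byte strings and must be
strictly ascending, and a key's depth is assumed to follow the atproto MST
layout, where the leading zero bits of the key's SHA-256 digest are counted in
2-bit chunks. The function names `keys_strictly_ascending` and
`depth_from_digest` are hypothetical, and the digest is passed in precomputed
so the example needs no hashing dependency.

```rust
/// Returns true if every key is strictly greater than the one before it.
/// `str` comparison in Rust is byte-wise lexicographic.
fn keys_strictly_ascending(keys: &[&str]) -> bool {
    keys.windows(2).all(|pair| pair[0] < pair[1])
}

/// Assumed MST depth rule: count the leading zero bits of the key's
/// (precomputed) SHA-256 digest, then divide by two (2-bit chunks).
fn depth_from_digest(digest: &[u8]) -> u32 {
    let mut zero_bits = 0;
    for byte in digest {
        zero_bits += byte.leading_zeros();
        if *byte != 0 {
            // the first non-zero byte ends the run of leading zeros
            break;
        }
    }
    zero_bits / 2
}
```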

```
use repo_stream::{Driver, DriverBuilder, DiskBuilder};

# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
let mut total_size = 0;

match DriverBuilder::new()
    .with_mem_limit_mb(10)
    .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
    .load_car(reader)
    .await?
{

    // if all blocks fit within memory
    Driver::Memory(_commit, mut driver) => {
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, size) in chunk {
                total_size += size;
            }
        }
    },

    // if the CAR was too big for in-memory processing
    Driver::Disk(paused) => {
        // set up a disk store we can spill to
        let store = DiskBuilder::new().open("some/path.db".into()).await?;
        // do the spilling, get back a (similar) driver
        let (_commit, mut driver) = paused.finish_loading(store).await?;

        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, size) in chunk {
                total_size += size;
            }
        }

        // clean up the disk store (drop tables etc)
        driver.reset_store().await?;
    }
};
println!("sum of size of all records: {total_size}");
# Ok(())
# }
```

When the memory limit is reached, loading suspends and returns a
`Driver::Disk(paused)` instead of going ahead and eagerly using disk I/O. This
means you have to write a bit more code to handle both cases, but it gives you
finer control over resource usage. For example, you can run one pool of
parallel in-memory CAR workers and a separate, differently sized pool of disk
workers that pick up suspended tasks from a queue.
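
The two-pool arrangement described above might look roughly like the sketch
below. It is only an illustration under loud assumptions: it uses plain
`std` threads and an `mpsc` channel rather than this crate's async API, and
`Loaded`, `load`, and `run` are hypothetical stand-ins (odd job ids simply
pretend to exceed the memory limit).

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the outcome of loading one CAR.
enum Loaded {
    Memory,        // everything fit in memory
    Paused(usize), // loading suspended; carries the job id to resume on disk
}

// Hypothetical loader: pretend jobs with an odd id exceed the memory limit.
fn load(job: usize) -> Loaded {
    if job % 2 == 1 { Loaded::Paused(job) } else { Loaded::Memory }
}

/// Runs `jobs` on `mem_workers` threads and hands paused jobs to
/// `disk_workers` threads through a queue.
/// Returns (jobs finished in memory, jobs finished by disk workers).
fn run(jobs: Vec<usize>, mem_workers: usize, disk_workers: usize) -> (usize, usize) {
    let jobs = Arc::new(Mutex::new(jobs));
    let (paused_tx, paused_rx) = mpsc::channel::<usize>();
    let paused_rx = Arc::new(Mutex::new(paused_rx));

    let mem_handles: Vec<_> = (0..mem_workers)
        .map(|_| {
            let jobs = Arc::clone(&jobs);
            let paused_tx = paused_tx.clone();
            thread::spawn(move || {
                let mut done = 0usize;
                loop {
                    // take the next job; the lock is released right away
                    let job = jobs.lock().unwrap().pop();
                    let Some(job) = job else { break };
                    match load(job) {
                        Loaded::Memory => done += 1,
                        // too big: queue it for the disk workers
                        Loaded::Paused(id) => paused_tx.send(id).unwrap(),
                    }
                }
                done
            })
        })
        .collect();
    // drop the original sender so the queue closes once memory workers finish
    drop(paused_tx);

    let disk_handles: Vec<_> = (0..disk_workers)
        .map(|_| {
            let paused_rx = Arc::clone(&paused_rx);
            thread::spawn(move || {
                let mut done = 0usize;
                loop {
                    let next = paused_rx.lock().unwrap().recv();
                    match next {
                        Ok(_id) => done += 1, // a real worker would spill to disk here
                        Err(_) => break,      // queue closed and drained
                    }
                }
                done
            })
        })
        .collect();

    let mem_done: usize = mem_handles.into_iter().map(|h| h.join().unwrap()).sum();
    let disk_done: usize = disk_handles.into_iter().map(|h| h.join().unwrap()).sum();
    (mem_done, disk_done)
}
```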

Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).

*/

pub mod mst;
mod walk;

pub mod disk;
pub mod drive;
pub mod process;

pub use disk::{DiskBuilder, DiskError, DiskStore};
pub use drive::{DriveError, Driver, DriverBuilder};
pub use mst::Commit;
pub use process::Processable;