/*!
A robust CAR file -> MST walker for atproto.

Small CARs have their blocks buffered in memory. If a configurable memory limit
is reached while reading blocks, CAR reading is suspended, and can be resumed
by providing disk storage to buffer the CAR blocks instead.

A `process` function can be provided to transform records into a smaller
representation as they are read, saving memory (and disk) during block loading.

Once blocks are loaded, the MST is walked and emitted as chunks of
`(rkey, processed_block)` pairs, in order (depth-first, left-to-right).

Some MST validations are applied:

- Keys must appear in order
- Keys must be at the correct MST tree depth

`iroh_car` additionally applies a block size limit of `2MiB`.
```
use repo_stream::{Driver, DriverBuilder, DiskBuilder};
# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
let mut total_size = 0;

match DriverBuilder::new()
    .with_mem_limit_mb(10)
    // block processing: just extract the raw record size
    .with_block_processor(|rec| rec.len().to_ne_bytes().to_vec())
    .load_car(reader)
    .await?
{
    // if all blocks fit within memory
    Driver::Memory(_commit, mut driver) => {
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, bytes) in chunk {
                let size = usize::from_ne_bytes(bytes.try_into().unwrap());
                total_size += size;
            }
        }
    }
    // if the CAR was too big for in-memory processing
    Driver::Disk(paused) => {
        // set up a disk store we can spill to
        let store = DiskBuilder::new().open("some/path.db".into()).await?;
        // do the spilling, get back a (similar) driver
        let (_commit, mut driver) = paused.finish_loading(store).await?;
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, bytes) in chunk {
                let size = usize::from_ne_bytes(bytes.try_into().unwrap());
                total_size += size;
            }
        }
    }
}

println!("sum of size of all records: {total_size}");
# Ok(())
# }
```
When the memory limit is hit, loading suspends and returns a
`Driver::Disk(paused)` rather than eagerly falling back to disk I/O. This means
you have to write a bit more code to handle both cases, but it gives you finer
control over resource usage. For example, you can drive a number of parallel
in-memory CAR workers, and separately have a different number of disk workers
picking up suspended tasks from a queue.
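
That fan-out can be sketched with plain `std` primitives. `PausedTask` and
`run_pipeline` below are illustrative stand-ins, not part of this crate's API;
in real code the producers would call `load_car` and only enqueue when they get
back `Driver::Disk(paused)`:

```
use std::sync::mpsc;
use std::thread;

// Placeholder for a suspended task — stands in for `Driver::Disk(paused)`.
struct PausedTask(usize);

// Spawn `n` "memory" workers that each hand off one suspended task, plus a
// single "disk" worker that drains the queue; returns how many it finished.
fn run_pipeline(n: usize) -> usize {
    let (tx, rx) = mpsc::channel::<PausedTask>();

    let producers: Vec<_> = (0..n)
        .map(|id| {
            let tx = tx.clone();
            // A real worker would only send when its CAR overflowed memory.
            thread::spawn(move || tx.send(PausedTask(id)).unwrap())
        })
        .collect();
    drop(tx); // channel closes once every producer clone is dropped

    // The disk worker picks up suspended tasks at its own pace; here it
    // would call `paused.finish_loading(store)` and walk the chunks.
    let disk_worker = thread::spawn(move || rx.into_iter().count());

    for p in producers {
        p.join().unwrap();
    }
    disk_worker.join().unwrap()
}

fn main() {
    println!("disk tasks completed: {}", run_pipeline(4));
}
```

Bounding the number of disk workers independently of the memory workers keeps
disk I/O contention predictable even when many CARs overflow at once.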
Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).
*/
pub use ;
pub use ;
pub use Commit;
pub type Bytes = ;
pub use HashMap;