repo_stream/
process.rs

1/*!
2Record processor function output trait
3
4The return type must satisfy the `Processable` trait, which requires:
5
6- `Clone` because two rkeys can refer to the same record by CID, which may
7  only appear once in the CAR file.
8- `Serialize + DeserializeOwned` so it can be spilled to disk.
9
10One required function must be implemented, `get_size()`: this should return the
11approximate total off-stack size of the type. (the on-stack size will be added
12automatically via `std::mem::size_of`).
13
14Note that it is **not guaranteed** that the `process` function will run on a
15block before storing it in memory or on disk: it's not possible to know if a
16block is a record without actually walking the MST, so the best we can do is
17apply `process` to any block that we know *cannot* be an MST node, and otherwise
18store the raw block bytes.
19
20Here's a silly processing function that just collects 'eyy's found in the raw
21record bytes
22
23```
24# use repo_stream::Processable;
25# use serde::{Serialize, Deserialize};
26#[derive(Debug, Clone, Serialize, Deserialize)]
27struct Eyy(usize, String);
28
29impl Processable for Eyy {
30    fn get_size(&self) -> usize {
31        // don't need to compute the usize, it's on the stack
32        self.1.capacity() // in-mem size from the string's capacity, in bytes
33    }
34}
35
36fn process(raw: Vec<u8>) -> Vec<Eyy> {
37    let mut out = Vec::new();
38    let to_find = "eyy".as_bytes();
39    for (i, window) in raw.windows(to_find.len()).enumerate() {
40        if window == to_find {
41            out.push(Eyy(i, "eyy".to_string()));
42        }
43    }
44    out
45}
46```
47
48The memory sizing stuff is a little sketch but probably at least approximately
49works.
50*/
51
52use serde::{Serialize, de::DeserializeOwned};
53
54/// Output trait for record processing
55pub trait Processable: Clone + Serialize + DeserializeOwned {
56    /// Any additional in-memory size taken by the processed type
57    ///
58    /// Do not include stack size (`std::mem::size_of`)
59    fn get_size(&self) -> usize;
60}
61
/// Identity processor: hands the raw block back untouched
///
/// Takes ownership of `block` and returns that very same `Vec<u8>`.
#[inline]
pub fn noop(block: Vec<u8>) -> Vec<u8> {
    block
}
67
68impl Processable for u8 {
69    fn get_size(&self) -> usize {
70        0
71    }
72}
73
74impl Processable for usize {
75    fn get_size(&self) -> usize {
76        0 // no additional space taken, just its stack size (newtype is free)
77    }
78}
79
80impl<Item: Sized + Processable> Processable for Vec<Item> {
81    fn get_size(&self) -> usize {
82        let slot_size = std::mem::size_of::<Item>();
83        let direct_size = slot_size * self.capacity();
84        let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
85        direct_size + items_referenced_size
86    }
87}