Crate repo_stream


A robust CAR file -> MST walker for atproto

Small CARs have their blocks buffered in memory. If a configurable memory limit is reached while reading blocks, CAR reading is suspended, and can be continued by providing disk storage to buffer the CAR blocks instead.

A process function can be provided for tasks where records are transformed into a smaller representation, to save memory (and disk) during block reading.

Once blocks are loaded, the MST is walked and emitted as chunks of (rkey, processed_block) pairs, in order (depth-first, left to right).
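To illustrate the traversal order described above, here is a minimal sketch of a depth-first, left-to-right walk that yields chunks of (rkey, value) pairs. The `Node` type and the `walk` and `chunks` functions are hypothetical stand-ins and not part of this crate's API. For brevity the sketch collects all pairs before chunking, whereas the crate emits chunks incrementally.

```rust
/// Simplified stand-in for an MST node (hypothetical, not the crate's type).
/// `children` holds one more slot than `entries`: child `i` sits to the
/// left of entry `i`, and the last child sits to the right of all entries.
struct Node {
    entries: Vec<(String, u64)>, // (rkey, processed value)
    children: Vec<Option<Node>>,
}

/// Depth-first, left-to-right walk: visit the subtree to the left of each
/// entry, then the entry itself, and finally the rightmost subtree.
fn walk(node: &Node, out: &mut Vec<(String, u64)>) {
    for (i, entry) in node.entries.iter().enumerate() {
        if let Some(Some(child)) = node.children.get(i) {
            walk(child, out);
        }
        out.push(entry.clone());
    }
    if let Some(Some(child)) = node.children.get(node.entries.len()) {
        walk(child, out);
    }
}

/// Split the ordered pairs into chunks of at most `n` items.
fn chunks(root: &Node, n: usize) -> Vec<Vec<(String, u64)>> {
    let mut flat = Vec::new();
    walk(root, &mut flat);
    flat.chunks(n.max(1)).map(|c| c.to_vec()).collect()
}
```

Because every subtree is visited between the entries that bound it, the pairs come out sorted by key whenever the tree itself is well-formed.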

Some MST validations are applied:

  • Keys must appear in order
  • Keys must be at the correct MST tree depth

iroh_car additionally applies a block size limit of 2MiB.
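The two MST checks listed above can be sketched roughly as follows. This is an illustration under assumptions, not the crate's implementation: it assumes the atproto MST convention that a key's depth is the number of leading zero bits in the SHA-256 digest of the key, divided by two, and it takes the digest as an input so that the example needs no hashing dependency. Both function names are hypothetical.

```rust
/// Depth of a key given the hash digest of that key (assumed: SHA-256).
/// Counts leading zero bits and uses two bits per tree layer.
fn depth_from_digest(digest: &[u8]) -> u32 {
    let mut zeros = 0;
    for byte in digest {
        if *byte == 0 {
            zeros += 8;
        } else {
            zeros += byte.leading_zeros();
            break;
        }
    }
    zeros / 2
}

/// Keys must appear in strictly ascending order, with no duplicates.
fn keys_strictly_ascending(keys: &[&str]) -> bool {
    keys.windows(2).all(|w| w[0] < w[1])
}
```

A walker could compare `depth_from_digest` for each key against the depth of the node that holds it, and reject the CAR on a mismatch or on any out-of-order key.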

use repo_stream::{Driver, DriverBuilder, DiskBuilder};

let mut total_size = 0;

match DriverBuilder::new()
    .with_mem_limit_mb(10)
    .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
    .load_car(reader) // `reader`: an AsyncRead over the CAR bytes
    .await?
{

    // if all blocks fit within memory
    Driver::Memory(_commit, mut driver) => {
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, size) in chunk {
                total_size += size;
            }
        }
    },

    // if the CAR was too big for in-memory processing
    Driver::Disk(paused) => {
        // set up a disk store we can spill to
        let store = DiskBuilder::new().open("some/path.db".into()).await?;
        // do the spilling, get back a (similar) driver
        let (_commit, mut driver) = paused.finish_loading(store).await?;

        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, size) in chunk {
                total_size += size;
            }
        }

        // clean up the disk store (drop tables etc)
        driver.reset_store().await?;
    }
};
println!("sum of size of all records: {total_size}");

Disk spilling suspends and returns a Driver::Disk(paused) instead of going ahead and eagerly using disk I/O. This means you have to write a bit more code to handle both cases, but it allows you to have finer control over resource usage. For example, you can drive a number of parallel memory CAR workers, and separately have a different number of disk workers picking up suspended disk tasks from a queue.
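The split between memory workers and disk workers might look roughly like the sketch below. It uses standard-library threads and a channel so that it stays dependency-free; in an async program the same shape would more likely use tasks and an async channel. `Attempt`, `try_in_memory`, and `run` are hypothetical names that only stand in for the `Driver::Memory` / `Driver::Disk(paused)` split, and byte lengths stand in for real processing.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical outcome of trying to handle a CAR in memory.
enum Attempt {
    Done(usize),        // finished in memory; value is a per-CAR result
    NeedsDisk(Vec<u8>), // over the limit; paused work for a disk worker
}

/// Stand-in for loading a CAR under a memory limit.
fn try_in_memory(car: Vec<u8>, mem_limit: usize) -> Attempt {
    if car.len() <= mem_limit {
        Attempt::Done(car.len())
    } else {
        Attempt::NeedsDisk(car)
    }
}

/// Returns (total handled in memory, total handled by the disk worker).
fn run(cars: Vec<Vec<u8>>, mem_limit: usize) -> (usize, usize) {
    let (disk_tx, disk_rx) = mpsc::channel::<Vec<u8>>();

    // A single disk worker drains the queue of paused tasks.
    let disk_worker = thread::spawn(move || {
        let mut total = 0;
        for paused in disk_rx {
            total += paused.len();
        }
        total
    });

    // One memory worker per CAR; oversized CARs are queued, not spilled here.
    let mem_workers: Vec<_> = cars
        .into_iter()
        .map(|car| {
            let tx = disk_tx.clone();
            thread::spawn(move || match try_in_memory(car, mem_limit) {
                Attempt::Done(n) => n,
                Attempt::NeedsDisk(paused) => {
                    tx.send(paused).expect("disk worker is still running");
                    0
                }
            })
        })
        .collect();
    drop(disk_tx); // the queue closes once every memory worker has finished

    let mem_total: usize = mem_workers.into_iter().map(|h| h.join().unwrap()).sum();
    let disk_total = disk_worker.join().unwrap();
    (mem_total, disk_total)
}
```

Keeping the disk queue separate means the number of disk workers can be tuned independently of the number of memory workers.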

Find more examples in the repo.

Re-exports

pub use disk::DiskBuilder;
pub use disk::DiskError;
pub use disk::DiskStore;
pub use drive::DriveError;
pub use drive::Driver;
pub use drive::DriverBuilder;
pub use drive::NeedDisk;
pub use mst::Commit;
pub use process::Processable;

Modules

disk
Disk storage for CAR blocks
drive
Consume a CAR from an AsyncRead, producing an ordered stream of records
mst
Low-level types for parsing raw atproto MST CARs
process
Record processor function output trait