repo-stream 0.3.0

Fast and robust atproto CAR file processing
Documentation
//! Depth-first MST traversal

use crate::mst::{Depth, MstNode, NodeThing, ThingKind};
use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock};
use cid::Cid;
use std::convert::Infallible;

/// Everything that can go wrong while traversing an MST with a [`Walker`].
///
/// Storage, decoding and structural-validation failures all surface through
/// this single type.
#[derive(Debug, thiserror::Error)]
pub enum WalkError {
    /// In this module: a block reached through a tree link was not available
    /// as raw (unprocessed) bytes, so it could not be decoded as an MST node.
    #[error("Failed to fingerprint commit block")]
    BadCommitFingerprint,
    /// DAG-CBOR decoding of a block failed (in this module: an MST node block).
    #[error("Failed to decode commit block: {0}")]
    BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
    /// The tree broke one of the structural rules listed in [`MstError`].
    #[error("Action node error: {0}")]
    MstError(#[from] MstError),
    /// The `fjall`-backed block store reported a failure.
    #[error("storage error: {0}")]
    StorageError(#[from] fjall::Error),
    /// A CID referenced by the tree was not present in the block store.
    #[error("block not found: {0}")]
    MissingBlock(Cid),
}

/// Structural violations detected while walking an MST.
///
/// These cover malformed nodes and inconsistent node depths as well as
/// out-of-order record keys.
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum MstError {
    /// A child node decoded as empty (per `MstNode::is_empty`); only an entirely
    /// empty MST may consist of an empty node.
    #[error("Nodes cannot be empty (except for an entirely empty MST)")]
    EmptyNode,
    /// A child node declared a depth other than one below its parent.
    #[error("Expected node to be at depth {expected}, but it was at {depth}")]
    WrongDepth {
        /// Depth declared by the decoded node.
        depth: Depth,
        /// Depth the walker expected (one below the parent node).
        expected: Depth,
    },
    /// A node at depth 0 linked to a child subtree, which would sit below depth 0.
    #[error("MST depth underflow: depth-0 node with child trees")]
    DepthUnderflow,
    /// Record keys were not strictly increasing in traversal order.
    #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")]
    RkeyOutOfOrder {
        /// The most recently accepted key.
        prev: String,
        /// The offending key, which is not greater than `prev`.
        rkey: String,
    },
}

/// A single record produced by [`Walker`], yielded in increasing key order.
#[derive(Debug, PartialEq)]
pub struct Output {
    /// The record's MST key; ordering is checked with plain `String` comparison.
    pub rkey: String,
    /// CID of the record's block.
    pub cid: Cid,
    /// The record payload: raw blocks are passed through the caller's
    /// `process` function, and already-processed blocks are returned as stored.
    pub data: Bytes,
}

/// Depth-first, left-to-right traverser of an atproto MST.
///
/// Instead of recursing, the walker keeps an explicit stack: each level holds
/// the not-yet-visited things of one node, with the innermost node last.
#[derive(Debug)]
pub struct Walker {
    /// Key of the last record yielded; later keys must compare strictly greater.
    prev_rkey: String,
    /// Depth of the root node; stack level `i` sits at depth `root_depth - i`.
    root_depth: Depth,
    /// Pending things, one `Vec` per node currently being visited.
    todo: Vec<Vec<NodeThing>>,
}

impl Walker {
    /// Begin a walk rooted at `root_node`.
    ///
    /// A root with no depth is an empty MST and is treated as depth 0.
    pub fn new(root_node: MstNode) -> Self {
        Self {
            prev_rkey: String::new(),
            root_depth: root_node.depth.unwrap_or(0), // empty root node = empty mst
            todo: vec![root_node.things],
        }
    }

    /// Handle one thing whose block has already been fetched.
    ///
    /// * `Value`: checks that `rkey` is strictly greater than the previous key,
    ///   then yields the record. Raw blocks go through `process` first, and
    ///   processed blocks are returned as stored.
    /// * `Tree`: decodes the block as an MST node, checks that it is non-empty
    ///   and at the expected depth, and pushes its things onto the stack. It
    ///   yields nothing (`Ok(None)`).
    fn mpb_step(
        &mut self,
        kind: ThingKind,
        cid: Cid,
        mpb: &MaybeProcessedBlock,
        process: impl Fn(Bytes) -> Bytes,
    ) -> Result<Option<Output>, WalkError> {
        match kind {
            ThingKind::Value { rkey } => {
                // Validate ordering first so `process` is never run on a record
                // that is about to be rejected.
                if rkey <= self.prev_rkey {
                    return Err(WalkError::MstError(MstError::RkeyOutOfOrder {
                        rkey,
                        prev: self.prev_rkey.clone(),
                    }));
                }

                let data = match mpb {
                    MaybeProcessedBlock::Raw(data) => process(data.clone()),
                    MaybeProcessedBlock::Processed(t) => t.clone(),
                };

                // `clone_from` reuses `prev_rkey`'s existing allocation.
                self.prev_rkey.clone_from(&rkey);

                log::trace!("val @ {rkey}");
                Ok(Some(Output { rkey, cid, data }))
            }
            ThingKind::Tree => {
                // A node block must still be raw bytes to be decodable as a node.
                let MaybeProcessedBlock::Raw(data) = mpb else {
                    return Err(WalkError::BadCommitFingerprint);
                };

                let node: MstNode =
                    serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?;

                if node.is_empty() {
                    return Err(WalkError::MstError(MstError::EmptyNode));
                }

                // The thing being handled was popped from the top stack level,
                // which sits `todo.len() - 1` levels below the root. Each push
                // below goes exactly one level deeper and only after the
                // `checked_sub` succeeds, so `todo.len() - 1 <= root_depth`
                // always holds and this subtraction cannot underflow.
                let current_depth = self.root_depth - (self.todo.len() - 1) as u32;
                let next_depth = current_depth
                    .checked_sub(1)
                    .ok_or(MstError::DepthUnderflow)?;
                if let Some(d) = node.depth
                    && d != next_depth
                {
                    return Err(WalkError::MstError(MstError::WrongDepth {
                        depth: d,
                        expected: next_depth,
                    }));
                }

                log::trace!("node into depth {next_depth}");
                self.todo.push(node.things);
                Ok(None)
            }
        }
    }

    /// Pop the next pending thing, discarding exhausted stack levels on the way.
    ///
    /// Things are taken from the *end* of each level's `Vec`.
    /// NOTE(review): left-to-right order therefore presumably relies on
    /// `MstNode::things` being stored in reverse; confirm in `crate::mst`.
    fn next_todo(&mut self) -> Option<NodeThing> {
        while let Some(level) = self.todo.last_mut() {
            if let Some(thing) = level.pop() {
                return Some(thing);
            }
            self.todo.pop();
        }
        None
    }

    /// Advance through the tree until the next record is found, reading
    /// blocks from the in-memory `blocks` map.
    ///
    /// Returns `Ok(Some(output))` for the next record in key order, or
    /// `Ok(None)` once the whole tree has been walked. `blocks` is only read
    /// here.
    ///
    /// # Errors
    ///
    /// Returns [`WalkError::MissingBlock`] when a referenced CID is absent from
    /// `blocks`, and propagates decode/validation errors from visited blocks.
    pub fn step(
        &mut self,
        blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
        process: impl Fn(Bytes) -> Bytes,
    ) -> Result<Option<Output>, WalkError> {
        while let Some(NodeThing { cid, kind }) = self.next_todo() {
            let Some(mpb) = blocks.get(&cid) else {
                return Err(WalkError::MissingBlock(cid));
            };
            if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? {
                return Ok(Some(out));
            }
        }
        Ok(None)
    }

    /// Same as [`Walker::step`], but reads blocks from a [`DiskStore`] keyed
    /// by the CID's byte encoding.
    ///
    /// This call is blocking. NOTE(review): async callers presumably need to
    /// run it off the executor (e.g. in a blocking task); confirm at call sites.
    ///
    /// # Errors
    ///
    /// Storage failures from `blocks`, plus everything [`Walker::step`] can return.
    pub fn disk_step(
        &mut self,
        blocks: &mut DiskStore,
        process: impl Fn(Bytes) -> Bytes,
    ) -> Result<Option<Output>, WalkError> {
        while let Some(NodeThing { cid, kind }) = self.next_todo() {
            let Some(block_slice) = blocks.get(&cid.to_bytes())? else {
                return Err(WalkError::MissingBlock(cid));
            };
            let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec());
            if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? {
                return Ok(Some(out));
            }
        }
        Ok(None)
    }
}