repo_stream/
drive.rs

1//! Consume an MST block stream, producing an ordered stream of records
2
3use futures::{Stream, TryStreamExt};
4use ipld_core::cid::Cid;
5use std::collections::HashMap;
6use std::error::Error;
7
8use crate::mst::{Commit, Node};
9use crate::walk::{Step, Trip, Walker};
10
11/// Errors that can happen while consuming and emitting blocks and records
12#[derive(Debug, thiserror::Error)]
13pub enum DriveError<E: Error> {
14    #[error("Failed to initialize CarReader: {0}")]
15    CarReader(#[from] iroh_car::Error),
16    #[error("Car block stream error: {0}")]
17    CarBlockError(Box<dyn Error>),
18    #[error("Failed to decode commit block: {0}")]
19    BadCommit(Box<dyn Error>),
20    #[error("The Commit block reference by the root was not found")]
21    MissingCommit,
22    #[error("The MST block {0} could not be found")]
23    MissingBlock(Cid),
24    #[error("Failed to walk the mst tree: {0}")]
25    Tripped(#[from] Trip<E>),
26}
27
28type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
29
/// Record key newtype, so record keys cannot be confused with other strings
///
/// Besides `Debug`, the common value-type traits are derived so that keys can
/// be cloned, compared, sorted, and used in hashed or ordered collections.
/// Equality, ordering, and hashing all defer to the wrapped `String`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Rkey(pub String);
33
/// A buffered block: either kept as raw bytes or already run through the
/// record processor.
#[derive(Debug)]
pub enum MaybeProcessedBlock<T, E> {
    /// Bytes of a block that *might* be a Node.
    ///
    /// A record can happen to look a lot like a node, so such a block cannot
    /// be eagerly decoded as a Node. What it really is only becomes known
    /// once the MST walk actually reaches it.
    Raw(Vec<u8>),
    /// Outcome of processing a block that was definitely not a Node.
    ///
    /// The outcome is a `Result` because a CAR may contain totally-unused
    /// blocks, and those can be plain garbage. Blocks are processed eagerly,
    /// before it is known whether they really are records, so when processing
    /// a definitely-not-a-node block fails, the bytes are discarded and the
    /// error is kept in the buffer in their place. If the block is later
    /// retrieved as a record, that error can be surfaced.
    ///
    /// If the block is _never_ needed, a bit of effort was wasted trying to
    /// process it. Oh well.
    ///
    /// An alternative design would kick unprocessable blocks back to `Raw`
    /// (or to a new `RawUnprocessable` variant), so that the typed error
    /// could be surfaced later by reprocessing on demand.
    Processed(Result<T, E>),
}
59
60/// The core driver between the block stream and MST walker
61pub struct Vehicle<SE, S, T, P, PE>
62where
63    S: Stream<Item = CarBlock<SE>>,
64    P: Fn(&[u8]) -> Result<T, PE>,
65    PE: Error,
66{
67    block_stream: S,
68    blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
69    walker: Walker,
70    process: P,
71}
72
73impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
74where
75    SE: Error + 'static,
76    S: Stream<Item = CarBlock<SE>> + Unpin,
77    P: Fn(&[u8]) -> Result<T, PE>,
78    PE: Error,
79{
80    /// Set up the stream
81    ///
82    /// This will eagerly consume blocks until the `Commit` object is found.
83    /// *Usually* the it's the first block, but there is no guarantee.
84    ///
85    /// ### Parameters
86    ///
87    /// `root`: CID of the commit object that is the root of the MST
88    ///
89    /// `block_stream`: Input stream of raw CAR blocks
90    ///
91    /// `process`: record-transforming callback:
92    ///
93    /// For tasks where records can be quickly processed into a *smaller*
94    /// useful representation, you can do that eagerly as blocks come in by
95    /// passing the processor as a callback here. This can reduce overall
96    /// memory usage.
97    pub async fn init(
98        root: Cid,
99        mut block_stream: S,
100        process: P,
101    ) -> Result<(Commit, Self), DriveError<PE>> {
102        let mut blocks = HashMap::new();
103
104        let mut commit = None;
105
106        while let Some((cid, data)) = block_stream
107            .try_next()
108            .await
109            .map_err(|e| DriveError::CarBlockError(e.into()))?
110        {
111            if cid == root {
112                let c: Commit = serde_ipld_dagcbor::from_slice(&data)
113                    .map_err(|e| DriveError::BadCommit(e.into()))?;
114                commit = Some(c);
115                break;
116            } else {
117                blocks.insert(
118                    cid,
119                    if Node::could_be(&data) {
120                        MaybeProcessedBlock::Raw(data)
121                    } else {
122                        MaybeProcessedBlock::Processed(process(&data))
123                    },
124                );
125            }
126        }
127
128        // we either broke out or read all the blocks without finding the commit...
129        let commit = commit.ok_or(DriveError::MissingCommit)?;
130
131        let walker = Walker::new(commit.data);
132
133        let me = Self {
134            block_stream,
135            blocks,
136            walker,
137            process,
138        };
139        Ok((commit, me))
140    }
141
142    async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> {
143        while let Some((cid, data)) = self
144            .block_stream
145            .try_next()
146            .await
147            .map_err(|e| DriveError::CarBlockError(e.into()))?
148        {
149            self.blocks.insert(
150                cid,
151                if Node::could_be(&data) {
152                    MaybeProcessedBlock::Raw(data)
153                } else {
154                    MaybeProcessedBlock::Processed((self.process)(&data))
155                },
156            );
157            if cid == cid_needed {
158                return Ok(());
159            }
160        }
161
162        // if we never found the block
163        Err(DriveError::MissingBlock(cid_needed))
164    }
165
166    /// Manually step through the record outputs
167    pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> {
168        loop {
169            // walk as far as we can until we run out of blocks or find a record
170            let cid_needed = match self.walker.step(&mut self.blocks, &self.process)? {
171                Step::Rest(cid) => cid,
172                Step::Finish => return Ok(None),
173                Step::Step { rkey, data } => return Ok(Some((rkey, data))),
174            };
175
176            // load blocks until we reach that cid
177            self.drive_until(cid_needed).await?;
178        }
179    }
180
181    /// Convert to a futures::stream of record outputs
182    pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> {
183        futures::stream::try_unfold(self, |mut this| async move {
184            let maybe_record = this.next_record().await?;
185            Ok(maybe_record.map(|b| (b, this)))
186        })
187    }
188}