repo_stream/
drive.rs

1use futures::{Stream, TryStreamExt};
2use ipld_core::cid::Cid;
3use std::collections::HashMap;
4use std::error::Error;
5
6use crate::mst::{Commit, Node};
7use crate::walk::{Step, Trip, Walker};
8
9#[derive(Debug, thiserror::Error)]
10pub enum DriveError<E: Error> {
11    #[error("Failed to initialize CarReader: {0}")]
12    CarReader(#[from] iroh_car::Error),
13    #[error("CAR file requires a root to be present")]
14    MissingRoot,
15    #[error("Car block stream error: {0}")]
16    CarBlockError(Box<dyn Error>),
17    #[error("Failed to decode commit block: {0}")]
18    BadCommit(Box<dyn Error>),
19    #[error("Failed to decode record block: {0}")]
20    BadRecord(Box<dyn Error>),
21    #[error("The Commit block reference by the root was not found")]
22    MissingCommit,
23    #[error("The MST block {0} could not be found")]
24    MissingBlock(Cid),
25    #[error("Failed to walk the mst tree: {0}")]
26    Tripped(#[from] Trip<E>),
27    #[error("Not finished walking, but no more blocks are available to continue")]
28    Dnf,
29}
30
31type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
32
/// A record key, yielded together with each processed record while walking.
///
/// Thin newtype over the key string. It derives the common value traits so
/// callers can clone, compare, and use keys in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Rkey(pub String);
35
/// A buffered CAR block, held either as raw bytes or as the outcome of eagerly
/// running the record-processing function on it.
///
/// `T` is the processed record type and `E` is the processing function's
/// error type.
#[derive(Debug)]
pub enum MaybeProcessedBlock<T, E> {
    /// A block that's *probably* a Node (but we can't know yet)
    ///
    /// It *can be* a record that suspiciously looks a lot like a node, so we
    /// cannot eagerly turn it into a Node. We only know for sure what it is
    /// when we actually walk down the MST.
    Raw(Vec<u8>),
    /// A processed record from a block that was definitely not a Node
    ///
    /// Processing has to be fallible because the CAR can have totally-unused
    /// blocks, which can just be garbage. Since we're eagerly trying to process
    /// record blocks without knowing for sure that they *are* records, we
    /// keep the error of any definitely-not-node that fails processing in the
    /// buffer in place of the record. If we later try to retrieve such a block
    /// as a record, then we can surface the error.
    ///
    /// If we _never_ needed this block, then we may have wasted a bit of effort
    /// trying to process it. Oh well.
    ///
    /// The error is stored with the processing function's own error type `E`,
    /// so it can be surfaced without boxing.
    ///
    /// There's an alternative here, which would be to kick unprocessable blocks
    /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
    /// surface the typed error later if needed by trying to reprocess.
    Processed(Result<T, E>),
}
64
/// Result of running the caller-supplied processing function on a block:
/// the processed value `T`, or the function's own error `E`.
///
/// TODO: generic error not box dyn nonsense.
/// NOTE(review): this alias is already generic over `E`, so the TODO above
/// presumably refers to the `Box<dyn Error>` payloads still used by
/// `DriveError` -- confirm before removing it.
pub type ProcRes<T, E> = Result<T, E>;
67
68pub struct Vehicle<SE, S, T, P, PE>
69where
70    S: Stream<Item = CarBlock<SE>>,
71    P: Fn(&[u8]) -> ProcRes<T, PE>,
72    PE: Error,
73{
74    block_stream: S,
75    blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
76    walker: Walker,
77    process: P,
78}
79
80impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
81where
82    SE: Error + 'static,
83    S: Stream<Item = CarBlock<SE>> + Unpin,
84    P: Fn(&[u8]) -> ProcRes<T, PE>,
85    PE: Error,
86{
87    pub async fn init(
88        root: Cid,
89        mut block_stream: S,
90        process: P,
91    ) -> Result<(Commit, Self), DriveError<PE>> {
92        let mut blocks = HashMap::new();
93
94        let mut commit = None;
95
96        while let Some((cid, data)) = block_stream
97            .try_next()
98            .await
99            .map_err(|e| DriveError::CarBlockError(e.into()))?
100        {
101            if cid == root {
102                let c: Commit = serde_ipld_dagcbor::from_slice(&data)
103                    .map_err(|e| DriveError::BadCommit(e.into()))?;
104                commit = Some(c);
105                break; // inner while
106            } else {
107                blocks.insert(
108                    cid,
109                    if Node::could_be(&data) {
110                        MaybeProcessedBlock::Raw(data)
111                    } else {
112                        MaybeProcessedBlock::Processed(process(&data))
113                    },
114                );
115            }
116        }
117
118        // we either broke out or read all the blocks without finding the commit...
119        let commit = commit.ok_or(DriveError::MissingCommit)?;
120
121        let walker = Walker::new(commit.data);
122
123        let me = Self {
124            block_stream,
125            blocks,
126            walker,
127            process,
128        };
129        Ok((commit, me))
130    }
131
132    async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> {
133        while let Some((cid, data)) = self
134            .block_stream
135            .try_next()
136            .await
137            .map_err(|e| DriveError::CarBlockError(e.into()))?
138        {
139            self.blocks.insert(
140                cid,
141                if Node::could_be(&data) {
142                    MaybeProcessedBlock::Raw(data)
143                } else {
144                    MaybeProcessedBlock::Processed((self.process)(&data))
145                },
146            );
147            if cid == cid_needed {
148                return Ok(());
149            }
150        }
151
152        // if we never found the block
153        Err(DriveError::MissingBlock(cid_needed))
154    }
155
156    pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError<PE>> {
157        loop {
158            // walk as far as we can until we run out of blocks or find a record
159            let cid_needed = match self.walker.walk(&mut self.blocks, &self.process)? {
160                Step::Rest(cid) => cid,
161                Step::Finish => return Ok(None),
162                Step::Step { rkey, data } => return Ok(Some((Rkey(rkey), data))),
163            };
164
165            // load blocks until we reach that cid
166            self.drive_until(cid_needed).await?;
167        }
168    }
169
170    pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError<PE>>> {
171        futures::stream::try_unfold(self, |mut this| async move {
172            let maybe_record = this.next_record().await?;
173            Ok(maybe_record.map(|b| (b, this)))
174        })
175    }
176}