1use futures::{Stream, TryStreamExt};
2use ipld_core::cid::Cid;
3use std::collections::HashMap;
4use std::error::Error;
5
6use crate::mst::{Commit, Node};
7use crate::walk::{Step, Trip, Walker};
8
9#[derive(Debug, thiserror::Error)]
10pub enum DriveError<E: Error> {
11 #[error("Failed to initialize CarReader: {0}")]
12 CarReader(#[from] iroh_car::Error),
13 #[error("CAR file requires a root to be present")]
14 MissingRoot,
15 #[error("Car block stream error: {0}")]
16 CarBlockError(Box<dyn Error>),
17 #[error("Failed to decode commit block: {0}")]
18 BadCommit(Box<dyn Error>),
19 #[error("Failed to decode record block: {0}")]
20 BadRecord(Box<dyn Error>),
21 #[error("The Commit block reference by the root was not found")]
22 MissingCommit,
23 #[error("The MST block {0} could not be found")]
24 MissingBlock(Cid),
25 #[error("Failed to walk the mst tree: {0}")]
26 Tripped(#[from] Trip<E>),
27 #[error("Not finished walking, but no more blocks are available to continue")]
28 Dnf,
29}
30
/// One item of a CAR block stream: a block's CID and its raw bytes, or a stream error `E`.
type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
32
/// A record key, returned alongside each record by [`Vehicle::next_record`].
///
/// Derives the standard value traits so callers can clone keys, compare and sort them
/// (byte-wise, via `String`'s ordering), and use them as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Rkey(pub String);
35
/// A block held in [`Vehicle`]'s buffer, either as raw bytes or as the outcome of
/// running the caller's processing function on it.
#[derive(Debug)]
pub enum MaybeProcessedBlock<T, E> {
    /// The block's bytes, stored as-is; `Vehicle` picks this when `Node::could_be`
    /// accepts the bytes.
    Raw(Vec<u8>),
    /// The result (success or failure) of the processing function applied to the
    /// block's bytes.
    Processed(Result<T, E>),
}
64
/// Result returned by the caller-supplied block-processing function (`P` in [`Vehicle`]).
pub type ProcRes<T, E> = Result<T, E>;
67
/// Walks the MST referenced by a commit while lazily reading its blocks from a CAR
/// block stream.
///
/// Construct with [`Vehicle::init`], then pull records with [`Vehicle::next_record`] or
/// [`Vehicle::stream`].
pub struct Vehicle<SE, S, T, P, PE>
where
    S: Stream<Item = CarBlock<SE>>,
    P: Fn(&[u8]) -> ProcRes<T, PE>,
    PE: Error,
{
    /// Source of `(cid, bytes)` blocks, read only as far as the walk requires.
    block_stream: S,
    /// Blocks already read from the stream, keyed by CID.
    blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
    /// MST walker, created from the commit's `data` CID.
    walker: Walker,
    /// Caller-supplied function applied to buffered blocks that are not kept raw.
    process: P,
}
79
80impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
81where
82 SE: Error + 'static,
83 S: Stream<Item = CarBlock<SE>> + Unpin,
84 P: Fn(&[u8]) -> ProcRes<T, PE>,
85 PE: Error,
86{
87 pub async fn init(
88 root: Cid,
89 mut block_stream: S,
90 process: P,
91 ) -> Result<(Commit, Self), DriveError<PE>> {
92 let mut blocks = HashMap::new();
93
94 let mut commit = None;
95
96 while let Some((cid, data)) = block_stream
97 .try_next()
98 .await
99 .map_err(|e| DriveError::CarBlockError(e.into()))?
100 {
101 if cid == root {
102 let c: Commit = serde_ipld_dagcbor::from_slice(&data)
103 .map_err(|e| DriveError::BadCommit(e.into()))?;
104 commit = Some(c);
105 break; } else {
107 blocks.insert(
108 cid,
109 if Node::could_be(&data) {
110 MaybeProcessedBlock::Raw(data)
111 } else {
112 MaybeProcessedBlock::Processed(process(&data))
113 },
114 );
115 }
116 }
117
118 let commit = commit.ok_or(DriveError::MissingCommit)?;
120
121 let walker = Walker::new(commit.data);
122
123 let me = Self {
124 block_stream,
125 blocks,
126 walker,
127 process,
128 };
129 Ok((commit, me))
130 }
131
132 async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> {
133 while let Some((cid, data)) = self
134 .block_stream
135 .try_next()
136 .await
137 .map_err(|e| DriveError::CarBlockError(e.into()))?
138 {
139 self.blocks.insert(
140 cid,
141 if Node::could_be(&data) {
142 MaybeProcessedBlock::Raw(data)
143 } else {
144 MaybeProcessedBlock::Processed((self.process)(&data))
145 },
146 );
147 if cid == cid_needed {
148 return Ok(());
149 }
150 }
151
152 Err(DriveError::MissingBlock(cid_needed))
154 }
155
156 pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError<PE>> {
157 loop {
158 let cid_needed = match self.walker.walk(&mut self.blocks, &self.process)? {
160 Step::Rest(cid) => cid,
161 Step::Finish => return Ok(None),
162 Step::Step { rkey, data } => return Ok(Some((Rkey(rkey), data))),
163 };
164
165 self.drive_until(cid_needed).await?;
167 }
168 }
169
170 pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError<PE>>> {
171 futures::stream::try_unfold(self, |mut this| async move {
172 let maybe_record = this.next_record().await?;
173 Ok(maybe_record.map(|b| (b, this)))
174 })
175 }
176}