1use futures::{Stream, TryStreamExt};
4use ipld_core::cid::Cid;
5use std::collections::HashMap;
6use std::error::Error;
7
8use crate::mst::{Commit, Node};
9use crate::walk::{Step, Trip, Walker};
10
11#[derive(Debug, thiserror::Error)]
13pub enum DriveError<E: Error> {
14 #[error("Failed to initialize CarReader: {0}")]
15 CarReader(#[from] iroh_car::Error),
16 #[error("Car block stream error: {0}")]
17 CarBlockError(Box<dyn Error>),
18 #[error("Failed to decode commit block: {0}")]
19 BadCommit(Box<dyn Error>),
20 #[error("The Commit block reference by the root was not found")]
21 MissingCommit,
22 #[error("The MST block {0} could not be found")]
23 MissingBlock(Cid),
24 #[error("Failed to walk the mst tree: {0}")]
25 Tripped(#[from] Trip<E>),
26}
27
28type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
29
/// Newtype wrapper around a `String`.
///
/// NOTE(review): presumably a record key, going by the name; nothing visible
/// in this file constructs it (`next_record` yields plain `String` keys), so
/// confirm the intended use against callers.
#[derive(Debug)]
pub struct Rkey(pub String);
33
/// A buffered block, stored either as untouched bytes or as the result of
/// running the processing callback on it.
#[derive(Debug)]
pub enum MaybeProcessedBlock<T, E> {
    /// The block's bytes, kept as-is. Used for blocks that `Node::could_be`
    /// accepts, so they can still be examined as tree nodes later.
    Raw(Vec<u8>),
    /// The outcome of the processing callback for a block that
    /// `Node::could_be` rejected. The callback's error is stored rather than
    /// raised at the time the block is read.
    Processed(Result<T, E>),
}
59
60pub struct Vehicle<SE, S, T, P, PE>
62where
63 S: Stream<Item = CarBlock<SE>>,
64 P: Fn(&[u8]) -> Result<T, PE>,
65 PE: Error,
66{
67 block_stream: S,
68 blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
69 walker: Walker,
70 process: P,
71}
72
73impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
74where
75 SE: Error + 'static,
76 S: Stream<Item = CarBlock<SE>> + Unpin,
77 P: Fn(&[u8]) -> Result<T, PE>,
78 PE: Error,
79{
80 pub async fn init(
98 root: Cid,
99 mut block_stream: S,
100 process: P,
101 ) -> Result<(Commit, Self), DriveError<PE>> {
102 let mut blocks = HashMap::new();
103
104 let mut commit = None;
105
106 while let Some((cid, data)) = block_stream
107 .try_next()
108 .await
109 .map_err(|e| DriveError::CarBlockError(e.into()))?
110 {
111 if cid == root {
112 let c: Commit = serde_ipld_dagcbor::from_slice(&data)
113 .map_err(|e| DriveError::BadCommit(e.into()))?;
114 commit = Some(c);
115 break;
116 } else {
117 blocks.insert(
118 cid,
119 if Node::could_be(&data) {
120 MaybeProcessedBlock::Raw(data)
121 } else {
122 MaybeProcessedBlock::Processed(process(&data))
123 },
124 );
125 }
126 }
127
128 let commit = commit.ok_or(DriveError::MissingCommit)?;
130
131 let walker = Walker::new(commit.data);
132
133 let me = Self {
134 block_stream,
135 blocks,
136 walker,
137 process,
138 };
139 Ok((commit, me))
140 }
141
142 async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> {
143 while let Some((cid, data)) = self
144 .block_stream
145 .try_next()
146 .await
147 .map_err(|e| DriveError::CarBlockError(e.into()))?
148 {
149 self.blocks.insert(
150 cid,
151 if Node::could_be(&data) {
152 MaybeProcessedBlock::Raw(data)
153 } else {
154 MaybeProcessedBlock::Processed((self.process)(&data))
155 },
156 );
157 if cid == cid_needed {
158 return Ok(());
159 }
160 }
161
162 Err(DriveError::MissingBlock(cid_needed))
164 }
165
166 pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> {
168 loop {
169 let cid_needed = match self.walker.step(&mut self.blocks, &self.process)? {
171 Step::Rest(cid) => cid,
172 Step::Finish => return Ok(None),
173 Step::Step { rkey, data } => return Ok(Some((rkey, data))),
174 };
175
176 self.drive_until(cid_needed).await?;
178 }
179 }
180
181 pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> {
183 futures::stream::try_unfold(self, |mut this| async move {
184 let maybe_record = this.next_record().await?;
185 Ok(maybe_record.map(|b| (b, this)))
186 })
187 }
188}