repo_stream/walk.rs
1//! Depth-first MST traversal
2
3use crate::drive::{MaybeProcessedBlock, ProcRes};
4use crate::mst::Node;
5use ipld_core::cid::Cid;
6use std::collections::HashMap;
7use std::error::Error;
8
9#[derive(Debug, thiserror::Error)]
10pub enum Trip<E: Error> {
11 #[error("empty mst nodes are not allowed")]
12 NodeEmpty,
13 #[error("Failed to decode commit block: {0}")]
14 BadCommit(Box<dyn std::error::Error>),
15 #[error("Failed to process record: {0}")]
16 RecordFailedProcessing(Box<dyn Error>),
17 #[error("Action node error: {0}")]
18 ActionNode(#[from] ActionNodeError),
19 #[error("Process failed: {0}")]
20 ProcessFailed(E),
21}
22
23#[derive(Debug, thiserror::Error)]
24pub enum ActionNodeError {
25 #[error("Failed to compute an rkey due to invalid prefix_len")]
26 EntryPrefixOutOfbounds,
27 #[error("RKey was not utf-8")]
28 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
29}
30
31#[derive(Debug)]
32pub enum Step<T> {
33 Rest(Cid),
34 Finish,
35 Step { rkey: String, data: T },
36}
37
38#[derive(Debug, Clone, PartialEq)]
39enum Need {
40 Node(Cid),
41 Record { rkey: String, cid: Cid },
42}
43
44fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), ActionNodeError> {
45 let mut entries = Vec::with_capacity(node.entries.len());
46
47 let mut prefix = vec![];
48 for entry in &node.entries {
49 let mut rkey = vec![];
50 let pre_checked = prefix
51 .get(..entry.prefix_len)
52 .ok_or(ActionNodeError::EntryPrefixOutOfbounds)?;
53 rkey.extend_from_slice(pre_checked);
54 rkey.extend_from_slice(&entry.keysuffix);
55 prefix = rkey.clone();
56
57 entries.push(Need::Record {
58 rkey: String::from_utf8(rkey)?,
59 cid: entry.value,
60 });
61 if let Some(ref tree) = entry.tree {
62 entries.push(Need::Node(*tree));
63 }
64 }
65
66 entries.reverse();
67 stack.append(&mut entries);
68
69 if let Some(tree) = node.left {
70 stack.push(Need::Node(tree));
71 }
72 Ok(())
73}
74
75#[derive(Debug)]
76pub struct Walker {
77 stack: Vec<Need>,
78}
79
80impl Walker {
81 pub fn new(tree_root_cid: Cid) -> Self {
82 Self {
83 stack: vec![Need::Node(tree_root_cid)],
84 }
85 }
86
87 pub fn walk<T: Clone, E: Error>(
88 &mut self,
89 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>,
90 process: impl Fn(&[u8]) -> ProcRes<T, E>,
91 ) -> Result<Step<T>, Trip<E>> {
92 loop {
93 let Some(mut need) = self.stack.last() else {
94 log::trace!("tried to walk but we're actually done.");
95 return Ok(Step::Finish);
96 };
97
98 match &mut need {
99 Need::Node(cid) => {
100 log::trace!("need node {cid:?}");
101 let Some(block) = blocks.remove(cid) else {
102 log::trace!("node not found, resting");
103 return Ok(Step::Rest(*cid));
104 };
105
106 let MaybeProcessedBlock::Raw(data) = block else {
107 return Err(Trip::BadCommit("failed commit fingerprint".into()));
108 };
109 let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
110 .map_err(|e| Trip::BadCommit(e.into()))?;
111
112 // found node, make sure we remember
113 self.stack.pop();
114
115 // queue up work on the found node next
116 push_from_node(&mut self.stack, &node)?;
117 }
118 Need::Record { rkey, cid } => {
119 log::trace!("need record {cid:?}");
120 let Some(data) = blocks.get_mut(cid) else {
121 log::trace!("record block not found, resting");
122 return Ok(Step::Rest(*cid));
123 };
124 let rkey = rkey.clone();
125 let data = match data {
126 MaybeProcessedBlock::Raw(data) => process(data),
127 MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()),
128 bad => {
129 // big hack to pull the error out -- this corrupts
130 // a block, so we should not continue trying to work
131 let mut steal = MaybeProcessedBlock::Raw(vec![]);
132 std::mem::swap(&mut steal, bad);
133 let MaybeProcessedBlock::Processed(Err(e)) = steal else {
134 unreachable!();
135 };
136 return Err(Trip::ProcessFailed(e));
137 }
138 };
139
140 // found node, make sure we remember
141 self.stack.pop();
142
143 log::trace!("emitting a block as a step. depth={}", self.stack.len());
144 let data = data.map_err(Trip::ProcessFailed)?;
145 return Ok(Step::Step { rkey, data });
146 }
147 }
148 }
149 }
150}
151
#[cfg(test)]
mod test {
    use super::*;
    use crate::mst::Entry;

    fn cid1() -> Cid {
        "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
            .parse()
            .unwrap()
    }
    fn cid2() -> Cid {
        "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
            .parse()
            .unwrap()
    }
    fn cid3() -> Cid {
        "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
            .parse()
            .unwrap()
    }
    fn cid4() -> Cid {
        "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
            .parse()
            .unwrap()
    }
    fn cid5() -> Cid {
        "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
            .parse()
            .unwrap()
    }
    fn cid6() -> Cid {
        "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
            .parse()
            .unwrap()
    }
    fn cid7() -> Cid {
        "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
            .parse()
            .unwrap()
    }
    fn cid8() -> Cid {
        "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
            .parse()
            .unwrap()
    }
    fn cid9() -> Cid {
        "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
            .parse()
            .unwrap()
    }

    /// Expands `node` onto an empty stack and returns the needs in the order
    /// they would be popped (top of the stack first).
    fn pop_order(node: &Node) -> Vec<Need> {
        let mut stack = vec![];
        push_from_node(&mut stack, node).unwrap();
        stack.reverse();
        stack
    }

    #[test]
    fn test_next_from_node_empty() {
        let node = Node {
            left: None,
            entries: vec![],
        };
        let mut stack = vec![];
        push_from_node(&mut stack, &node).unwrap();
        assert_eq!(stack.last(), None);
    }

    #[test]
    fn test_needs_from_node_just_left() {
        let node = Node {
            left: Some(cid1()),
            entries: vec![],
        };
        let mut stack = vec![];
        push_from_node(&mut stack, &node).unwrap();
        assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref());
    }

    #[test]
    fn test_needs_from_node_just_one_record() {
        let node = Node {
            left: None,
            entries: vec![Entry {
                keysuffix: "asdf".into(),
                prefix_len: 0,
                value: cid1(),
                tree: None,
            }],
        };
        assert_eq!(
            pop_order(&node),
            vec![Need::Record {
                rkey: "asdf".into(),
                cid: cid1(),
            },]
        );
    }

    #[test]
    fn test_needs_from_node_two_records() {
        let node = Node {
            left: None,
            entries: vec![
                Entry {
                    keysuffix: "asdf".into(),
                    prefix_len: 0,
                    value: cid1(),
                    tree: None,
                },
                Entry {
                    keysuffix: "gh".into(),
                    prefix_len: 2,
                    value: cid2(),
                    tree: None,
                },
            ],
        };
        assert_eq!(
            pop_order(&node),
            vec![
                Need::Record {
                    rkey: "asdf".into(),
                    cid: cid1(),
                },
                Need::Record {
                    rkey: "asgh".into(),
                    cid: cid2(),
                },
            ]
        );
    }

    #[test]
    fn test_needs_from_node_with_both() {
        let node = Node {
            left: None,
            entries: vec![Entry {
                keysuffix: "asdf".into(),
                prefix_len: 0,
                value: cid1(),
                tree: Some(cid2()),
            }],
        };
        assert_eq!(
            pop_order(&node),
            vec![
                Need::Record {
                    rkey: "asdf".into(),
                    cid: cid1(),
                },
                Need::Node(cid2()),
            ]
        );
    }

    #[test]
    fn test_needs_from_node_left_and_record() {
        let node = Node {
            left: Some(cid1()),
            entries: vec![Entry {
                keysuffix: "asdf".into(),
                prefix_len: 0,
                value: cid2(),
                tree: None,
            }],
        };
        assert_eq!(
            pop_order(&node),
            vec![
                Need::Node(cid1()),
                Need::Record {
                    rkey: "asdf".into(),
                    cid: cid2(),
                },
            ]
        );
    }

    #[test]
    fn test_needs_from_full_node() {
        let node = Node {
            left: Some(cid1()),
            entries: vec![
                Entry {
                    keysuffix: "asdf".into(),
                    prefix_len: 0,
                    value: cid2(),
                    tree: Some(cid3()),
                },
                Entry {
                    keysuffix: "ghi".into(),
                    prefix_len: 1,
                    value: cid4(),
                    tree: Some(cid5()),
                },
                Entry {
                    keysuffix: "jkl".into(),
                    prefix_len: 2,
                    value: cid6(),
                    tree: Some(cid7()),
                },
                Entry {
                    keysuffix: "mno".into(),
                    prefix_len: 4,
                    value: cid8(),
                    tree: Some(cid9()),
                },
            ],
        };
        assert_eq!(
            pop_order(&node),
            vec![
                Need::Node(cid1()),
                Need::Record {
                    rkey: "asdf".into(),
                    cid: cid2(),
                },
                Need::Node(cid3()),
                Need::Record {
                    rkey: "aghi".into(),
                    cid: cid4(),
                },
                Need::Node(cid5()),
                Need::Record {
                    rkey: "agjkl".into(),
                    cid: cid6(),
                },
                Need::Node(cid7()),
                Need::Record {
                    rkey: "agjkmno".into(),
                    cid: cid8(),
                },
                Need::Node(cid9()),
            ]
        );
    }

    #[test]
    fn test_needs_from_node_prefix_out_of_bounds() {
        // The first entry has no previous key to borrow a prefix from.
        let node = Node {
            left: Some(cid1()),
            entries: vec![Entry {
                keysuffix: "asdf".into(),
                prefix_len: 1,
                value: cid2(),
                tree: None,
            }],
        };
        let mut stack = vec![Need::Node(cid3())];
        let res = push_from_node(&mut stack, &node);
        assert!(matches!(
            res,
            Err(ActionNodeError::EntryPrefixOutOfbounds)
        ));
        // A failed expansion must leave the existing work untouched.
        assert_eq!(stack, vec![Need::Node(cid3())]);
    }
}