repo_stream/
walk.rs

1//! Depth-first MST traversal
2
3use crate::drive::{MaybeProcessedBlock, ProcRes};
4use crate::mst::Node;
5use ipld_core::cid::Cid;
6use std::collections::HashMap;
7use std::error::Error;
8
9#[derive(Debug, thiserror::Error)]
10pub enum Trip<E: Error> {
11    #[error("empty mst nodes are not allowed")]
12    NodeEmpty,
13    #[error("Failed to decode commit block: {0}")]
14    BadCommit(Box<dyn std::error::Error>),
15    #[error("Failed to process record: {0}")]
16    RecordFailedProcessing(Box<dyn Error>),
17    #[error("Action node error: {0}")]
18    ActionNode(#[from] ActionNodeError),
19    #[error("Process failed: {0}")]
20    ProcessFailed(E),
21}
22
23#[derive(Debug, thiserror::Error)]
24pub enum ActionNodeError {
25    #[error("Failed to compute an rkey due to invalid prefix_len")]
26    EntryPrefixOutOfbounds,
27    #[error("RKey was not utf-8")]
28    EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
29}
30
31#[derive(Debug)]
32pub enum Step<T> {
33    Rest(Cid),
34    Finish,
35    Step { rkey: String, data: T },
36}
37
38#[derive(Debug, Clone, PartialEq)]
39enum Need {
40    Node(Cid),
41    Record { rkey: String, cid: Cid },
42}
43
44fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), ActionNodeError> {
45    let mut entries = Vec::with_capacity(node.entries.len());
46
47    let mut prefix = vec![];
48    for entry in &node.entries {
49        let mut rkey = vec![];
50        let pre_checked = prefix
51            .get(..entry.prefix_len)
52            .ok_or(ActionNodeError::EntryPrefixOutOfbounds)?;
53        rkey.extend_from_slice(pre_checked);
54        rkey.extend_from_slice(&entry.keysuffix);
55        prefix = rkey.clone();
56
57        entries.push(Need::Record {
58            rkey: String::from_utf8(rkey)?,
59            cid: entry.value,
60        });
61        if let Some(ref tree) = entry.tree {
62            entries.push(Need::Node(*tree));
63        }
64    }
65
66    entries.reverse();
67    stack.append(&mut entries);
68
69    if let Some(tree) = node.left {
70        stack.push(Need::Node(tree));
71    }
72    Ok(())
73}
74
75#[derive(Debug)]
76pub struct Walker {
77    stack: Vec<Need>,
78}
79
80impl Walker {
81    pub fn new(tree_root_cid: Cid) -> Self {
82        Self {
83            stack: vec![Need::Node(tree_root_cid)],
84        }
85    }
86
87    pub fn walk<T: Clone, E: Error>(
88        &mut self,
89        blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>,
90        process: impl Fn(&[u8]) -> ProcRes<T, E>,
91    ) -> Result<Step<T>, Trip<E>> {
92        loop {
93            let Some(mut need) = self.stack.last() else {
94                log::trace!("tried to walk but we're actually done.");
95                return Ok(Step::Finish);
96            };
97
98            match &mut need {
99                Need::Node(cid) => {
100                    log::trace!("need node {cid:?}");
101                    let Some(block) = blocks.remove(cid) else {
102                        log::trace!("node not found, resting");
103                        return Ok(Step::Rest(*cid));
104                    };
105
106                    let MaybeProcessedBlock::Raw(data) = block else {
107                        return Err(Trip::BadCommit("failed commit fingerprint".into()));
108                    };
109                    let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
110                        .map_err(|e| Trip::BadCommit(e.into()))?;
111
112                    // found node, make sure we remember
113                    self.stack.pop();
114
115                    // queue up work on the found node next
116                    push_from_node(&mut self.stack, &node)?;
117                }
118                Need::Record { rkey, cid } => {
119                    log::trace!("need record {cid:?}");
120                    let Some(data) = blocks.get_mut(cid) else {
121                        log::trace!("record block not found, resting");
122                        return Ok(Step::Rest(*cid));
123                    };
124                    let rkey = rkey.clone();
125                    let data = match data {
126                        MaybeProcessedBlock::Raw(data) => process(data),
127                        MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()),
128                        bad => {
129                            // big hack to pull the error out -- this corrupts
130                            // a block, so we should not continue trying to work
131                            let mut steal = MaybeProcessedBlock::Raw(vec![]);
132                            std::mem::swap(&mut steal, bad);
133                            let MaybeProcessedBlock::Processed(Err(e)) = steal else {
134                                unreachable!();
135                            };
136                            return Err(Trip::ProcessFailed(e));
137                        }
138                    };
139
140                    // found node, make sure we remember
141                    self.stack.pop();
142
143                    log::trace!("emitting a block as a step. depth={}", self.stack.len());
144                    let data = data.map_err(Trip::ProcessFailed)?;
145                    return Ok(Step::Step { rkey, data });
146                }
147            }
148        }
149    }
150}
151
#[cfg(test)]
mod test {
    use super::*;
    use crate::mst::Entry;

    fn parse_cid(s: &str) -> Cid {
        s.parse().unwrap()
    }

    fn cid1() -> Cid {
        parse_cid("bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m")
    }
    fn cid2() -> Cid {
        parse_cid("QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU")
    }
    fn cid3() -> Cid {
        parse_cid("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
    }
    fn cid4() -> Cid {
        parse_cid("QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR")
    }
    fn cid5() -> Cid {
        parse_cid("QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D")
    }
    fn cid6() -> Cid {
        parse_cid("QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm")
    }
    fn cid7() -> Cid {
        parse_cid("bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze")
    }
    fn cid8() -> Cid {
        parse_cid("bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi")
    }
    fn cid9() -> Cid {
        parse_cid("bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq")
    }

    fn entry(keysuffix: &str, prefix_len: usize, value: Cid, tree: Option<Cid>) -> Entry {
        Entry {
            keysuffix: keysuffix.into(),
            prefix_len,
            value,
            tree,
        }
    }

    fn record(rkey: &str, cid: Cid) -> Need {
        Need::Record {
            rkey: rkey.into(),
            cid,
        }
    }

    /// Run `push_from_node` on an empty stack and return the needs in the
    /// order the walker will pop them (top of stack first).
    fn pop_order(node: &Node) -> Vec<Need> {
        let mut stack = vec![];
        push_from_node(&mut stack, node).unwrap();
        stack.reverse();
        stack
    }

    #[test]
    fn test_next_from_node_empty() {
        let node = Node {
            left: None,
            entries: vec![],
        };
        let mut stack = vec![];
        push_from_node(&mut stack, &node).unwrap();
        assert_eq!(stack.last(), None);
    }

    #[test]
    fn test_needs_from_node_just_left() {
        let node = Node {
            left: Some(cid1()),
            entries: vec![],
        };
        let mut stack = vec![];
        push_from_node(&mut stack, &node).unwrap();
        assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref());
    }

    #[test]
    fn test_needs_from_node_just_one_record() {
        let node = Node {
            left: None,
            entries: vec![entry("asdf", 0, cid1(), None)],
        };
        assert_eq!(pop_order(&node), vec![record("asdf", cid1())]);
    }

    #[test]
    fn test_needs_from_node_two_records() {
        let node = Node {
            left: None,
            entries: vec![entry("asdf", 0, cid1(), None), entry("gh", 2, cid2(), None)],
        };
        assert_eq!(
            pop_order(&node),
            vec![record("asdf", cid1()), record("asgh", cid2())]
        );
    }

    #[test]
    fn test_needs_from_node_with_both() {
        let node = Node {
            left: None,
            entries: vec![entry("asdf", 0, cid1(), Some(cid2()))],
        };
        assert_eq!(
            pop_order(&node),
            vec![record("asdf", cid1()), Need::Node(cid2())]
        );
    }

    #[test]
    fn test_needs_from_node_left_and_record() {
        let node = Node {
            left: Some(cid1()),
            entries: vec![entry("asdf", 0, cid2(), None)],
        };
        assert_eq!(
            pop_order(&node),
            vec![Need::Node(cid1()), record("asdf", cid2())]
        );
    }

    #[test]
    fn test_needs_from_full_node() {
        let node = Node {
            left: Some(cid1()),
            entries: vec![
                entry("asdf", 0, cid2(), Some(cid3())),
                entry("ghi", 1, cid4(), Some(cid5())),
                entry("jkl", 2, cid6(), Some(cid7())),
                entry("mno", 4, cid8(), Some(cid9())),
            ],
        };
        assert_eq!(
            pop_order(&node),
            vec![
                Need::Node(cid1()),
                record("asdf", cid2()),
                Need::Node(cid3()),
                record("aghi", cid4()),
                Need::Node(cid5()),
                record("agjkl", cid6()),
                Need::Node(cid7()),
                record("agjkmno", cid8()),
                Need::Node(cid9()),
            ]
        );
    }

    #[test]
    fn test_push_keeps_existing_stack_below() {
        let node = Node {
            left: Some(cid2()),
            entries: vec![],
        };
        let mut stack = vec![Need::Node(cid1())];
        push_from_node(&mut stack, &node).unwrap();
        assert_eq!(stack, vec![Need::Node(cid1()), Need::Node(cid2())]);
    }

    #[test]
    fn test_prefix_out_of_bounds() {
        // the first entry has no previous key, so any non-zero prefix is invalid
        let node = Node {
            left: None,
            entries: vec![entry("asdf", 1, cid1(), None)],
        };
        let mut stack = vec![];
        let res = push_from_node(&mut stack, &node);
        assert!(matches!(res, Err(ActionNodeError::EntryPrefixOutOfbounds)));
        assert!(stack.is_empty());
    }

    #[test]
    fn test_rkey_not_utf8() {
        let node = Node {
            left: None,
            entries: vec![Entry {
                keysuffix: vec![0xff_u8].into(),
                prefix_len: 0,
                value: cid1(),
                tree: None,
            }],
        };
        let mut stack = vec![];
        let res = push_from_node(&mut stack, &node);
        assert!(matches!(res, Err(ActionNodeError::EntryRkeyNotUtf8(_))));
        assert!(stack.is_empty());
    }
}