Skip to main content

hashtree_core/hashtree/
stream.rs

1use std::pin::Pin;
2
3use futures::stream::{self, Stream};
4
5use crate::codec::{decode_tree_node, is_tree_node};
6use crate::crypto::{decrypt_chk, EncryptionKey};
7use crate::store::Store;
8use crate::types::{to_hex, Cid, Hash};
9
10use super::{HashTree, HashTreeError};
11
12impl<S: Store> HashTree<S> {
13    /// Read content as a stream of chunks by Cid (handles decryption automatically)
14    ///
15    /// Returns an async stream that yields chunks as they are read.
16    /// Useful for large files or when you want to process data incrementally.
17    pub fn get_stream(
18        &self,
19        cid: &Cid,
20    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
21        let hash = cid.hash;
22        let key = cid.key;
23
24        if let Some(k) = key {
25            // Encrypted stream
26            Box::pin(self.read_file_stream_encrypted(hash, k))
27        } else {
28            // Unencrypted stream
29            self.read_file_stream(hash)
30        }
31    }
32
33    /// Read encrypted file as stream (internal)
34    fn read_file_stream_encrypted(
35        &self,
36        hash: Hash,
37        key: EncryptionKey,
38    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
39        stream::unfold(
40            EncryptedStreamState::Init {
41                hash,
42                key,
43                tree: self,
44            },
45            |state| async move {
46                match state {
47                    EncryptedStreamState::Init { hash, key, tree } => {
48                        let data = match tree.store.get(&hash).await {
49                            Ok(Some(d)) => d,
50                            Ok(None) => return None,
51                            Err(e) => {
52                                return Some((
53                                    Err(HashTreeError::Store(e.to_string())),
54                                    EncryptedStreamState::Done,
55                                ))
56                            }
57                        };
58
59                        // Try to decrypt
60                        let decrypted = match decrypt_chk(&data, &key) {
61                            Ok(d) => d,
62                            Err(e) => {
63                                return Some((
64                                    Err(HashTreeError::Decryption(e.to_string())),
65                                    EncryptedStreamState::Done,
66                                ))
67                            }
68                        };
69
70                        if !is_tree_node(&decrypted) {
71                            // Single blob - yield decrypted data
72                            return Some((Ok(decrypted), EncryptedStreamState::Done));
73                        }
74
75                        // Tree node - parse and traverse
76                        let node = match decode_tree_node(&decrypted) {
77                            Ok(n) => n,
78                            Err(e) => {
79                                return Some((
80                                    Err(HashTreeError::Codec(e)),
81                                    EncryptedStreamState::Done,
82                                ))
83                            }
84                        };
85
86                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
87                        for link in node.links.into_iter().rev() {
88                            stack.push(EncryptedStackItem {
89                                hash: link.hash,
90                                key: link.key,
91                            });
92                        }
93
94                        tree.process_encrypted_stream_stack(&mut stack).await
95                    }
96                    EncryptedStreamState::Processing { mut stack, tree } => {
97                        tree.process_encrypted_stream_stack(&mut stack).await
98                    }
99                    EncryptedStreamState::Done => None,
100                }
101            },
102        )
103    }
104
105    async fn process_encrypted_stream_stack<'a>(
106        &'a self,
107        stack: &mut Vec<EncryptedStackItem>,
108    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
109        while let Some(item) = stack.pop() {
110            let data = match self.store.get(&item.hash).await {
111                Ok(Some(d)) => d,
112                Ok(None) => {
113                    return Some((
114                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
115                        EncryptedStreamState::Done,
116                    ))
117                }
118                Err(e) => {
119                    return Some((
120                        Err(HashTreeError::Store(e.to_string())),
121                        EncryptedStreamState::Done,
122                    ))
123                }
124            };
125
126            // Decrypt if we have a key
127            let decrypted = if let Some(key) = item.key {
128                match decrypt_chk(&data, &key) {
129                    Ok(d) => d,
130                    Err(e) => {
131                        return Some((
132                            Err(HashTreeError::Decryption(e.to_string())),
133                            EncryptedStreamState::Done,
134                        ))
135                    }
136                }
137            } else {
138                data
139            };
140
141            if is_tree_node(&decrypted) {
142                // Nested tree node - add children to stack
143                let node = match decode_tree_node(&decrypted) {
144                    Ok(n) => n,
145                    Err(e) => {
146                        return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done))
147                    }
148                };
149                for link in node.links.into_iter().rev() {
150                    stack.push(EncryptedStackItem {
151                        hash: link.hash,
152                        key: link.key,
153                    });
154                }
155            } else {
156                // Leaf chunk - yield decrypted data
157                return Some((
158                    Ok(decrypted),
159                    EncryptedStreamState::Processing {
160                        stack: std::mem::take(stack),
161                        tree: self,
162                    },
163                ));
164            }
165        }
166        None
167    }
168
169    /// Read a file as stream of chunks
170    /// Returns an async stream that yields chunks as they are read
171    pub fn read_file_stream(
172        &self,
173        hash: Hash,
174    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
175        Box::pin(stream::unfold(
176            ReadStreamState::Init { hash, tree: self },
177            |state| async move {
178                match state {
179                    ReadStreamState::Init { hash, tree } => {
180                        let data = match tree.store.get(&hash).await {
181                            Ok(Some(d)) => d,
182                            Ok(None) => return None,
183                            Err(e) => {
184                                return Some((
185                                    Err(HashTreeError::Store(e.to_string())),
186                                    ReadStreamState::Done,
187                                ))
188                            }
189                        };
190
191                        if !is_tree_node(&data) {
192                            // Single blob - yield it and finish
193                            return Some((Ok(data), ReadStreamState::Done));
194                        }
195
196                        // Tree node - start streaming chunks
197                        let node = match decode_tree_node(&data) {
198                            Ok(n) => n,
199                            Err(e) => {
200                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
201                            }
202                        };
203
204                        // Create stack with all links to process
205                        let mut stack: Vec<StreamStackItem> = Vec::new();
206                        for link in node.links.into_iter().rev() {
207                            stack.push(StreamStackItem::Hash(link.hash));
208                        }
209
210                        // Process first item
211                        tree.process_stream_stack(&mut stack).await
212                    }
213                    ReadStreamState::Processing { mut stack, tree } => {
214                        tree.process_stream_stack(&mut stack).await
215                    }
216                    ReadStreamState::Done => None,
217                }
218            },
219        ))
220    }
221
222    async fn process_stream_stack<'a>(
223        &'a self,
224        stack: &mut Vec<StreamStackItem>,
225    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
226        while let Some(item) = stack.pop() {
227            match item {
228                StreamStackItem::Hash(hash) => {
229                    let data = match self.store.get(&hash).await {
230                        Ok(Some(d)) => d,
231                        Ok(None) => {
232                            return Some((
233                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
234                                ReadStreamState::Done,
235                            ))
236                        }
237                        Err(e) => {
238                            return Some((
239                                Err(HashTreeError::Store(e.to_string())),
240                                ReadStreamState::Done,
241                            ))
242                        }
243                    };
244
245                    if is_tree_node(&data) {
246                        // Nested tree - push its children to stack
247                        let node = match decode_tree_node(&data) {
248                            Ok(n) => n,
249                            Err(e) => {
250                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
251                            }
252                        };
253                        for link in node.links.into_iter().rev() {
254                            stack.push(StreamStackItem::Hash(link.hash));
255                        }
256                    } else {
257                        // Leaf blob - yield it
258                        return Some((
259                            Ok(data),
260                            ReadStreamState::Processing {
261                                stack: std::mem::take(stack),
262                                tree: self,
263                            },
264                        ));
265                    }
266                }
267            }
268        }
269        None
270    }
271}
272
273// Internal state types for streaming
274
275enum StreamStackItem {
276    Hash(Hash),
277}
278
279enum ReadStreamState<'a, S: Store> {
280    Init {
281        hash: Hash,
282        tree: &'a HashTree<S>,
283    },
284    Processing {
285        stack: Vec<StreamStackItem>,
286        tree: &'a HashTree<S>,
287    },
288    Done,
289}
290
291// Encrypted stream state types
292struct EncryptedStackItem {
293    hash: Hash,
294    key: Option<[u8; 32]>,
295}
296
297enum EncryptedStreamState<'a, S: Store> {
298    Init {
299        hash: Hash,
300        key: [u8; 32],
301        tree: &'a HashTree<S>,
302    },
303    Processing {
304        stack: Vec<EncryptedStackItem>,
305        tree: &'a HashTree<S>,
306    },
307    Done,
308}