1use std::pin::Pin;
2
3use futures::stream::{self, Stream};
4
5use crate::codec::{decode_tree_node, is_tree_node};
6use crate::crypto::{decrypt_chk, EncryptionKey};
7use crate::store::Store;
8use crate::types::{to_hex, Cid, Hash};
9
10use super::{HashTree, HashTreeError};
11
12impl<S: Store> HashTree<S> {
13 pub fn get_stream(
18 &self,
19 cid: &Cid,
20 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
21 let hash = cid.hash;
22 let key = cid.key;
23
24 if let Some(k) = key {
25 Box::pin(self.read_file_stream_encrypted(hash, k))
27 } else {
28 self.read_file_stream(hash)
30 }
31 }
32
33 fn read_file_stream_encrypted(
35 &self,
36 hash: Hash,
37 key: EncryptionKey,
38 ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
39 stream::unfold(
40 EncryptedStreamState::Init {
41 hash,
42 key,
43 tree: self,
44 },
45 |state| async move {
46 match state {
47 EncryptedStreamState::Init { hash, key, tree } => {
48 let data = match tree.store.get(&hash).await {
49 Ok(Some(d)) => d,
50 Ok(None) => return None,
51 Err(e) => {
52 return Some((
53 Err(HashTreeError::Store(e.to_string())),
54 EncryptedStreamState::Done,
55 ))
56 }
57 };
58
59 let decrypted = match decrypt_chk(&data, &key) {
61 Ok(d) => d,
62 Err(e) => {
63 return Some((
64 Err(HashTreeError::Decryption(e.to_string())),
65 EncryptedStreamState::Done,
66 ))
67 }
68 };
69
70 if !is_tree_node(&decrypted) {
71 return Some((Ok(decrypted), EncryptedStreamState::Done));
73 }
74
75 let node = match decode_tree_node(&decrypted) {
77 Ok(n) => n,
78 Err(e) => {
79 return Some((
80 Err(HashTreeError::Codec(e)),
81 EncryptedStreamState::Done,
82 ))
83 }
84 };
85
86 let mut stack: Vec<EncryptedStackItem> = Vec::new();
87 for link in node.links.into_iter().rev() {
88 stack.push(EncryptedStackItem {
89 hash: link.hash,
90 key: link.key,
91 });
92 }
93
94 tree.process_encrypted_stream_stack(&mut stack).await
95 }
96 EncryptedStreamState::Processing { mut stack, tree } => {
97 tree.process_encrypted_stream_stack(&mut stack).await
98 }
99 EncryptedStreamState::Done => None,
100 }
101 },
102 )
103 }
104
105 async fn process_encrypted_stream_stack<'a>(
106 &'a self,
107 stack: &mut Vec<EncryptedStackItem>,
108 ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
109 while let Some(item) = stack.pop() {
110 let data = match self.store.get(&item.hash).await {
111 Ok(Some(d)) => d,
112 Ok(None) => {
113 return Some((
114 Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
115 EncryptedStreamState::Done,
116 ))
117 }
118 Err(e) => {
119 return Some((
120 Err(HashTreeError::Store(e.to_string())),
121 EncryptedStreamState::Done,
122 ))
123 }
124 };
125
126 let decrypted = if let Some(key) = item.key {
128 match decrypt_chk(&data, &key) {
129 Ok(d) => d,
130 Err(e) => {
131 return Some((
132 Err(HashTreeError::Decryption(e.to_string())),
133 EncryptedStreamState::Done,
134 ))
135 }
136 }
137 } else {
138 data
139 };
140
141 if is_tree_node(&decrypted) {
142 let node = match decode_tree_node(&decrypted) {
144 Ok(n) => n,
145 Err(e) => {
146 return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done))
147 }
148 };
149 for link in node.links.into_iter().rev() {
150 stack.push(EncryptedStackItem {
151 hash: link.hash,
152 key: link.key,
153 });
154 }
155 } else {
156 return Some((
158 Ok(decrypted),
159 EncryptedStreamState::Processing {
160 stack: std::mem::take(stack),
161 tree: self,
162 },
163 ));
164 }
165 }
166 None
167 }
168
169 pub fn read_file_stream(
172 &self,
173 hash: Hash,
174 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
175 Box::pin(stream::unfold(
176 ReadStreamState::Init { hash, tree: self },
177 |state| async move {
178 match state {
179 ReadStreamState::Init { hash, tree } => {
180 let data = match tree.store.get(&hash).await {
181 Ok(Some(d)) => d,
182 Ok(None) => return None,
183 Err(e) => {
184 return Some((
185 Err(HashTreeError::Store(e.to_string())),
186 ReadStreamState::Done,
187 ))
188 }
189 };
190
191 if !is_tree_node(&data) {
192 return Some((Ok(data), ReadStreamState::Done));
194 }
195
196 let node = match decode_tree_node(&data) {
198 Ok(n) => n,
199 Err(e) => {
200 return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
201 }
202 };
203
204 let mut stack: Vec<StreamStackItem> = Vec::new();
206 for link in node.links.into_iter().rev() {
207 stack.push(StreamStackItem::Hash(link.hash));
208 }
209
210 tree.process_stream_stack(&mut stack).await
212 }
213 ReadStreamState::Processing { mut stack, tree } => {
214 tree.process_stream_stack(&mut stack).await
215 }
216 ReadStreamState::Done => None,
217 }
218 },
219 ))
220 }
221
222 async fn process_stream_stack<'a>(
223 &'a self,
224 stack: &mut Vec<StreamStackItem>,
225 ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
226 while let Some(item) = stack.pop() {
227 match item {
228 StreamStackItem::Hash(hash) => {
229 let data = match self.store.get(&hash).await {
230 Ok(Some(d)) => d,
231 Ok(None) => {
232 return Some((
233 Err(HashTreeError::MissingChunk(to_hex(&hash))),
234 ReadStreamState::Done,
235 ))
236 }
237 Err(e) => {
238 return Some((
239 Err(HashTreeError::Store(e.to_string())),
240 ReadStreamState::Done,
241 ))
242 }
243 };
244
245 if is_tree_node(&data) {
246 let node = match decode_tree_node(&data) {
248 Ok(n) => n,
249 Err(e) => {
250 return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
251 }
252 };
253 for link in node.links.into_iter().rev() {
254 stack.push(StreamStackItem::Hash(link.hash));
255 }
256 } else {
257 return Some((
259 Ok(data),
260 ReadStreamState::Processing {
261 stack: std::mem::take(stack),
262 tree: self,
263 },
264 ));
265 }
266 }
267 }
268 }
269 None
270 }
271}
272
273enum StreamStackItem {
276 Hash(Hash),
277}
278
279enum ReadStreamState<'a, S: Store> {
280 Init {
281 hash: Hash,
282 tree: &'a HashTree<S>,
283 },
284 Processing {
285 stack: Vec<StreamStackItem>,
286 tree: &'a HashTree<S>,
287 },
288 Done,
289}
290
291struct EncryptedStackItem {
293 hash: Hash,
294 key: Option<[u8; 32]>,
295}
296
297enum EncryptedStreamState<'a, S: Store> {
298 Init {
299 hash: Hash,
300 key: [u8; 32],
301 tree: &'a HashTree<S>,
302 },
303 Processing {
304 stack: Vec<EncryptedStackItem>,
305 tree: &'a HashTree<S>,
306 },
307 Done,
308}