1use std::ops::{Bound, RangeBounds};
2
3use bao_tree::{io::round_up_to_chunks, ChunkNum, ChunkRanges};
4use range_collections::{range_set::RangeSetEntry, RangeSet2};
5
6pub mod channel;
7pub(crate) mod temp_tag;
8pub mod serde {
9 pub mod io_error_serde {
11 use std::{fmt, io};
12
13 use serde::{
14 de::{self, Visitor},
15 Deserializer, Serializer,
16 };
17
18 pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
19 where
20 S: Serializer,
21 {
22 serializer.serialize_str(&format!("{:?}:{}", error.kind(), error))
24 }
25
26 pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
27 where
28 D: Deserializer<'de>,
29 {
30 struct IoErrorVisitor;
31
32 impl<'de> Visitor<'de> for IoErrorVisitor {
33 type Value = io::Error;
34
35 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
36 formatter.write_str("an io::Error string representation")
37 }
38
39 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
40 where
41 E: de::Error,
42 {
43 Ok(io::Error::other(value))
46 }
47 }
48
49 deserializer.deserialize_str(IoErrorVisitor)
50 }
51 }
52}
53
54pub trait ChunkRangesExt {
55 fn last_chunk() -> Self;
56 fn chunk(offset: u64) -> Self;
57 fn bytes(ranges: impl RangeBounds<u64>) -> Self;
58 fn chunks(ranges: impl RangeBounds<u64>) -> Self;
59 fn offset(offset: u64) -> Self;
60}
61
62impl ChunkRangesExt for ChunkRanges {
63 fn last_chunk() -> Self {
64 ChunkRanges::from(ChunkNum(u64::MAX)..)
65 }
66
67 fn chunk(offset: u64) -> Self {
69 ChunkRanges::from(ChunkNum(offset)..ChunkNum(offset + 1))
70 }
71
72 fn bytes(ranges: impl RangeBounds<u64>) -> Self {
75 round_up_to_chunks(&bounds_from_range(ranges, |v| v))
76 }
77
78 fn chunks(ranges: impl RangeBounds<u64>) -> Self {
82 bounds_from_range(ranges, ChunkNum)
83 }
84
85 fn offset(offset: u64) -> Self {
87 Self::bytes(offset..offset + 1)
88 }
89}
90
91pub(crate) fn bounds_from_range<R, T, F>(range: R, f: F) -> RangeSet2<T>
93where
94 R: RangeBounds<u64>,
95 T: RangeSetEntry,
96 F: Fn(u64) -> T,
97{
98 let from = match range.start_bound() {
99 Bound::Included(start) => Some(*start),
100 Bound::Excluded(start) => {
101 let Some(start) = start.checked_add(1) else {
102 return RangeSet2::empty();
103 };
104 Some(start)
105 }
106 Bound::Unbounded => None,
107 };
108 let to = match range.end_bound() {
109 Bound::Included(end) => end.checked_add(1),
110 Bound::Excluded(end) => Some(*end),
111 Bound::Unbounded => None,
112 };
113 match (from, to) {
114 (Some(from), Some(to)) => RangeSet2::from(f(from)..f(to)),
115 (Some(from), None) => RangeSet2::from(f(from)..),
116 (None, Some(to)) => RangeSet2::from(..f(to)),
117 (None, None) => RangeSet2::all(),
118 }
119}
120
121pub mod outboard_with_progress {
122 use std::io::{self, BufReader, Read};
123
124 use bao_tree::{
125 blake3,
126 io::{
127 outboard::PreOrderOutboard,
128 sync::{OutboardMut, WriteAt},
129 },
130 iter::BaoChunk,
131 BaoTree, ChunkNum,
132 };
133 use smallvec::SmallVec;
134
135 use super::sink::Sink;
136
137 fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
138 use blake3::hazmat::{ChainingValue, HasherExt};
139 if is_root {
140 debug_assert!(start_chunk == 0);
141 blake3::hash(data)
142 } else {
143 let mut hasher = blake3::Hasher::new();
144 hasher.set_input_offset(start_chunk * 1024);
145 hasher.update(data);
146 let non_root_hash: ChainingValue = hasher.finalize_non_root();
147 blake3::Hash::from(non_root_hash)
148 }
149 }
150
151 fn parent_cv(
152 left_child: &blake3::Hash,
153 right_child: &blake3::Hash,
154 is_root: bool,
155 ) -> blake3::Hash {
156 use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};
157 let left_child: ChainingValue = *left_child.as_bytes();
158 let right_child: ChainingValue = *right_child.as_bytes();
159 if is_root {
160 merge_subtrees_root(&left_child, &right_child, Mode::Hash)
161 } else {
162 blake3::Hash::from(merge_subtrees_non_root(
163 &left_child,
164 &right_child,
165 Mode::Hash,
166 ))
167 }
168 }
169
170 pub async fn init_outboard<R, W, P>(
171 data: R,
172 outboard: &mut PreOrderOutboard<W>,
173 progress: &mut P,
174 ) -> std::io::Result<std::result::Result<(), P::Error>>
175 where
176 W: WriteAt,
177 R: Read,
178 P: Sink<ChunkNum>,
179 {
180 let size = usize::try_from(outboard.tree.size()).unwrap_or(usize::MAX);
183 let read_buf_size = size.min(1024 * 1024);
184 let chunk_buf_size = size.min(outboard.tree.block_size().bytes());
185 let reader = BufReader::with_capacity(read_buf_size, data);
186 let mut buffer = SmallVec::<[u8; 128]>::from_elem(0u8, chunk_buf_size);
187 let res = init_impl(outboard.tree, reader, outboard, &mut buffer, progress).await?;
188 Ok(res)
189 }
190
191 async fn init_impl<W, P>(
192 tree: BaoTree,
193 mut data: impl Read,
194 outboard: &mut PreOrderOutboard<W>,
195 buffer: &mut [u8],
196 progress: &mut P,
197 ) -> io::Result<std::result::Result<(), P::Error>>
198 where
199 W: WriteAt,
200 P: Sink<ChunkNum>,
201 {
202 let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
204 for item in tree.post_order_chunks_iter() {
206 match item {
207 BaoChunk::Parent { is_root, node, .. } => {
208 let right_hash = stack.pop().unwrap();
209 let left_hash = stack.pop().unwrap();
210 outboard.save(node, &(left_hash, right_hash))?;
211 let parent = parent_cv(&left_hash, &right_hash, is_root);
212 stack.push(parent);
213 }
214 BaoChunk::Leaf {
215 size,
216 is_root,
217 start_chunk,
218 ..
219 } => {
220 if let Err(err) = progress.send(start_chunk).await {
221 return Ok(Err(err));
222 }
223 let buf = &mut buffer[..size];
224 data.read_exact(buf)?;
225 let hash = hash_subtree(start_chunk.0, buf, is_root);
226 stack.push(hash);
227 }
228 }
229 }
230 debug_assert_eq!(stack.len(), 1);
231 outboard.root = stack.pop().unwrap();
232 Ok(Ok(()))
233 }
234
235 #[cfg(test)]
236 mod tests {
237 use bao_tree::{
238 blake3,
239 io::{outboard::PreOrderOutboard, sync::CreateOutboard},
240 BaoTree,
241 };
242 use testresult::TestResult;
243
244 use crate::{
245 store::{fs::tests::test_data, IROH_BLOCK_SIZE},
246 util::{outboard_with_progress::init_outboard, sink::Drain},
247 };
248
249 #[tokio::test]
250 async fn init_outboard_with_progress() -> TestResult<()> {
251 for size in [1024 * 18 + 1] {
252 let data = test_data(size);
253 let mut o1 = PreOrderOutboard::<Vec<u8>> {
254 tree: BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE),
255 ..Default::default()
256 };
257 let mut o2 = o1.clone();
258 o1.init_from(data.as_ref())?;
259 init_outboard(data.as_ref(), &mut o2, &mut Drain).await??;
260 assert_eq!(o1.root, blake3::hash(&data));
261 assert_eq!(o1.root, o2.root);
262 assert_eq!(o1.data, o2.data);
263 }
264 Ok(())
265 }
266 }
267}
268
269pub mod sink {
270 use std::{future::Future, io};
271
272 use irpc::RpcMessage;
273
274 pub trait Sink<Item> {
276 type Error;
277 fn send(
278 &mut self,
279 value: Item,
280 ) -> impl Future<Output = std::result::Result<(), Self::Error>>;
281
282 fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
283 where
284 Self: Sized,
285 F: Fn(Self::Error) -> U + Send + 'static,
286 {
287 WithMapErr { inner: self, f }
288 }
289
290 fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
291 where
292 Self: Sized,
293 F: Fn(U) -> Item + Send + 'static,
294 {
295 WithMap { inner: self, f }
296 }
297 }
298
299 impl<I, T> Sink<T> for &mut I
300 where
301 I: Sink<T>,
302 {
303 type Error = I::Error;
304
305 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
306 (*self).send(value).await
307 }
308 }
309
310 pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
311
312 impl<T> Sink<T> for IrpcSenderSink<T>
313 where
314 T: RpcMessage,
315 {
316 type Error = irpc::channel::SendError;
317
318 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
319 self.0.send(value).await
320 }
321 }
322
323 pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);
324
325 impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
326 where
327 T: RpcMessage,
328 {
329 type Error = irpc::channel::SendError;
330
331 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
332 self.0.send(value).await
333 }
334 }
335
336 pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);
337
338 impl<T> Sink<T> for TokioMpscSenderSink<T> {
339 type Error = tokio::sync::mpsc::error::SendError<T>;
340
341 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
342 self.0.send(value).await
343 }
344 }
345
346 pub struct WithMapErr<P, F> {
347 inner: P,
348 f: F,
349 }
350
351 impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
352 where
353 P: Sink<U>,
354 F: Fn(P::Error) -> E + Send + 'static,
355 {
356 type Error = E;
357
358 async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
359 match self.inner.send(value).await {
360 Ok(()) => Ok(()),
361 Err(err) => {
362 let err = (self.f)(err);
363 Err(err)
364 }
365 }
366 }
367 }
368
369 pub struct WithMap<P, F> {
370 inner: P,
371 f: F,
372 }
373
374 impl<P, F, T, U> Sink<T> for WithMap<P, F>
375 where
376 P: Sink<U>,
377 F: Fn(T) -> U + Send + 'static,
378 {
379 type Error = P::Error;
380
381 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
382 self.inner.send((self.f)(value)).await
383 }
384 }
385
386 pub struct Drain;
387
388 impl<T> Sink<T> for Drain {
389 type Error = io::Error;
390
391 async fn send(&mut self, _offset: T) -> std::result::Result<(), Self::Error> {
392 io::Result::Ok(())
393 }
394 }
395}