1use std::ops::{Bound, RangeBounds};
2
3use bao_tree::{io::round_up_to_chunks, ChunkNum, ChunkRanges};
4use range_collections::{range_set::RangeSetEntry, RangeSet2};
5
6pub mod channel;
7pub(crate) mod temp_tag;
8pub mod serde {
9 pub mod io_error_serde {
11 use std::{fmt, io};
12
13 use serde::{
14 de::{self, SeqAccess, Visitor},
15 ser::SerializeTuple,
16 Deserializer, Serializer,
17 };
18
/// Encodes an [`io::ErrorKind`] as the compact `u8` wire code used by
/// `serialize`.
///
/// Every kind listed below has a fixed code. Code 16 is not assigned by this
/// mapping. Any kind not listed (the enum is non-exhaustive) is sent as 25,
/// the same code as `Other`.
fn error_kind_to_u8(kind: io::ErrorKind) -> u8 {
    use std::io::ErrorKind as K;
    match kind {
        K::AddrInUse => 0,
        K::AddrNotAvailable => 1,
        K::AlreadyExists => 2,
        K::ArgumentListTooLong => 3,
        K::BrokenPipe => 4,
        K::ConnectionAborted => 5,
        K::ConnectionRefused => 6,
        K::ConnectionReset => 7,
        K::CrossesDevices => 8,
        K::Deadlock => 9,
        K::DirectoryNotEmpty => 10,
        K::ExecutableFileBusy => 11,
        K::FileTooLarge => 12,
        K::HostUnreachable => 13,
        K::Interrupted => 14,
        K::InvalidData => 15,
        // 16 is skipped: no kind is encoded with it.
        K::InvalidInput => 17,
        K::IsADirectory => 18,
        K::NetworkDown => 19,
        K::NetworkUnreachable => 20,
        K::NotADirectory => 21,
        K::NotConnected => 22,
        K::NotFound => 23,
        K::NotSeekable => 24,
        K::Other => 25,
        K::OutOfMemory => 26,
        K::PermissionDenied => 27,
        K::QuotaExceeded => 28,
        K::ReadOnlyFilesystem => 29,
        K::ResourceBusy => 30,
        K::StaleNetworkFileHandle => 31,
        K::StorageFull => 32,
        K::TimedOut => 33,
        K::TooManyLinks => 34,
        K::UnexpectedEof => 35,
        K::Unsupported => 36,
        K::WouldBlock => 37,
        K::WriteZero => 38,
        // Unlisted kinds degrade to the code for `Other`.
        _ => 25,
    }
}
62
/// Decodes a wire code produced by `error_kind_to_u8` back into an
/// [`io::ErrorKind`].
///
/// Codes that name no kind decode as [`io::ErrorKind::Other`]. These are the
/// unassigned code 16 and anything above 38.
fn u8_to_error_kind(num: u8) -> io::ErrorKind {
    use std::io::ErrorKind as K;
    // Position in this table is the wire code.
    const BY_CODE: [K; 39] = [
        K::AddrInUse,              // 0
        K::AddrNotAvailable,       // 1
        K::AlreadyExists,          // 2
        K::ArgumentListTooLong,    // 3
        K::BrokenPipe,             // 4
        K::ConnectionAborted,      // 5
        K::ConnectionRefused,      // 6
        K::ConnectionReset,        // 7
        K::CrossesDevices,         // 8
        K::Deadlock,               // 9
        K::DirectoryNotEmpty,      // 10
        K::ExecutableFileBusy,     // 11
        K::FileTooLarge,           // 12
        K::HostUnreachable,        // 13
        K::Interrupted,            // 14
        K::InvalidData,            // 15
        K::Other,                  // 16 (unassigned)
        K::InvalidInput,           // 17
        K::IsADirectory,           // 18
        K::NetworkDown,            // 19
        K::NetworkUnreachable,     // 20
        K::NotADirectory,          // 21
        K::NotConnected,           // 22
        K::NotFound,               // 23
        K::NotSeekable,            // 24
        K::Other,                  // 25
        K::OutOfMemory,            // 26
        K::PermissionDenied,       // 27
        K::QuotaExceeded,          // 28
        K::ReadOnlyFilesystem,     // 29
        K::ResourceBusy,           // 30
        K::StaleNetworkFileHandle, // 31
        K::StorageFull,            // 32
        K::TimedOut,               // 33
        K::TooManyLinks,           // 34
        K::UnexpectedEof,          // 35
        K::Unsupported,            // 36
        K::WouldBlock,             // 37
        K::WriteZero,              // 38
    ];
    BY_CODE
        .get(usize::from(num))
        .copied()
        .unwrap_or(K::Other)
}
107
108 pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
109 where
110 S: Serializer,
111 {
112 let mut tup = serializer.serialize_tuple(2)?;
113 tup.serialize_element(&error_kind_to_u8(error.kind()))?;
114 tup.serialize_element(&error.to_string())?;
115 tup.end()
116 }
117
118 pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
119 where
120 D: Deserializer<'de>,
121 {
122 struct IoErrorVisitor;
123
124 impl<'de> Visitor<'de> for IoErrorVisitor {
125 type Value = io::Error;
126
127 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
128 formatter.write_str("a tuple of (u32, String) representing io::Error")
129 }
130
131 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
132 where
133 A: SeqAccess<'de>,
134 {
135 let num: u8 = seq
136 .next_element()?
137 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
138 let message: String = seq
139 .next_element()?
140 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
141 let kind = u8_to_error_kind(num);
142 Ok(io::Error::new(kind, message))
143 }
144 }
145
146 deserializer.deserialize_tuple(2, IoErrorVisitor)
147 }
148 }
149
#[cfg(test)]
mod tests {
    use std::io::{self, ErrorKind};

    use serde::{Deserialize, Serialize};

    use super::io_error_serde;

    /// Newtype that routes its `io::Error` through `io_error_serde`.
    #[derive(Serialize, Deserialize)]
    struct TestError(#[serde(with = "io_error_serde")] io::Error);

    /// Every kind with an assigned wire code must survive a postcard
    /// round-trip with both its kind and its message intact.
    #[test]
    fn test_roundtrip_error_kinds() {
        let message = "test error";
        let kinds = [
            ErrorKind::AddrInUse,
            ErrorKind::AddrNotAvailable,
            ErrorKind::AlreadyExists,
            ErrorKind::ArgumentListTooLong,
            ErrorKind::BrokenPipe,
            ErrorKind::ConnectionAborted,
            ErrorKind::ConnectionRefused,
            ErrorKind::ConnectionReset,
            ErrorKind::CrossesDevices,
            ErrorKind::Deadlock,
            ErrorKind::DirectoryNotEmpty,
            ErrorKind::ExecutableFileBusy,
            ErrorKind::FileTooLarge,
            ErrorKind::HostUnreachable,
            ErrorKind::Interrupted,
            ErrorKind::InvalidData,
            ErrorKind::InvalidInput,
            ErrorKind::IsADirectory,
            ErrorKind::NetworkDown,
            ErrorKind::NetworkUnreachable,
            ErrorKind::NotADirectory,
            ErrorKind::NotConnected,
            ErrorKind::NotFound,
            ErrorKind::NotSeekable,
            ErrorKind::Other,
            ErrorKind::OutOfMemory,
            ErrorKind::PermissionDenied,
            ErrorKind::QuotaExceeded,
            ErrorKind::ReadOnlyFilesystem,
            ErrorKind::ResourceBusy,
            ErrorKind::StaleNetworkFileHandle,
            ErrorKind::StorageFull,
            ErrorKind::TimedOut,
            ErrorKind::TooManyLinks,
            ErrorKind::UnexpectedEof,
            ErrorKind::Unsupported,
            ErrorKind::WouldBlock,
            ErrorKind::WriteZero,
        ];

        for kind in kinds {
            let err = TestError(io::Error::new(kind, message));
            let serialized = postcard::to_allocvec(&err).unwrap();
            let deserialized: TestError = postcard::from_bytes(&serialized).unwrap();

            assert_eq!(err.0.kind(), deserialized.0.kind());
            assert_eq!(err.0.to_string(), deserialized.0.to_string());
        }
    }

    /// Codes that name no kind (the gap at 16, and anything above 38) must
    /// decode as `ErrorKind::Other` instead of failing.
    #[test]
    fn test_unknown_code_decodes_as_other() {
        for code in [16u8, 39, u8::MAX] {
            // postcard writes a `u8` as one raw byte and a string as a varint
            // length prefix followed by its UTF-8 bytes.
            let bytes = [code, 3, b'b', b'a', b'd'];
            let decoded: TestError = postcard::from_bytes(&bytes).unwrap();
            assert_eq!(decoded.0.kind(), ErrorKind::Other);
            assert_eq!(decoded.0.to_string(), "bad");
        }
    }
}
217}
218
/// Convenience constructors for chunk range sets such as [`ChunkRanges`].
pub trait ChunkRangesExt {
    /// The set covering `ranges`, where the bounds are chunk numbers.
    fn chunks(ranges: impl RangeBounds<u64>) -> Self;

    /// The set containing only the chunk with number `offset`.
    fn chunk(offset: u64) -> Self;

    /// The chunks covering the byte range `ranges`, as computed by
    /// `bao_tree::io::round_up_to_chunks`.
    fn bytes(ranges: impl RangeBounds<u64>) -> Self;

    /// The chunk(s) covering the single byte at `offset`.
    fn offset(offset: u64) -> Self;

    /// The set starting at the largest representable chunk number.
    fn last_chunk() -> Self;
}
226
227impl ChunkRangesExt for ChunkRanges {
228 fn last_chunk() -> Self {
229 ChunkRanges::from(ChunkNum(u64::MAX)..)
230 }
231
232 fn chunk(offset: u64) -> Self {
234 ChunkRanges::from(ChunkNum(offset)..ChunkNum(offset + 1))
235 }
236
237 fn bytes(ranges: impl RangeBounds<u64>) -> Self {
240 round_up_to_chunks(&bounds_from_range(ranges, |v| v))
241 }
242
243 fn chunks(ranges: impl RangeBounds<u64>) -> Self {
247 bounds_from_range(ranges, ChunkNum)
248 }
249
250 fn offset(offset: u64) -> Self {
252 Self::bytes(offset..offset + 1)
253 }
254}
255
256pub(crate) fn bounds_from_range<R, T, F>(range: R, f: F) -> RangeSet2<T>
258where
259 R: RangeBounds<u64>,
260 T: RangeSetEntry,
261 F: Fn(u64) -> T,
262{
263 let from = match range.start_bound() {
264 Bound::Included(start) => Some(*start),
265 Bound::Excluded(start) => {
266 let Some(start) = start.checked_add(1) else {
267 return RangeSet2::empty();
268 };
269 Some(start)
270 }
271 Bound::Unbounded => None,
272 };
273 let to = match range.end_bound() {
274 Bound::Included(end) => end.checked_add(1),
275 Bound::Excluded(end) => Some(*end),
276 Bound::Unbounded => None,
277 };
278 match (from, to) {
279 (Some(from), Some(to)) => RangeSet2::from(f(from)..f(to)),
280 (Some(from), None) => RangeSet2::from(f(from)..),
281 (None, Some(to)) => RangeSet2::from(..f(to)),
282 (None, None) => RangeSet2::all(),
283 }
284}
285
286pub mod outboard_with_progress {
287 use std::io::{self, BufReader, Read};
288
289 use bao_tree::{
290 blake3,
291 io::{
292 outboard::PreOrderOutboard,
293 sync::{OutboardMut, WriteAt},
294 },
295 iter::BaoChunk,
296 BaoTree, ChunkNum,
297 };
298 use smallvec::SmallVec;
299
300 use super::sink::Sink;
301
302 fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
303 use blake3::hazmat::{ChainingValue, HasherExt};
304 if is_root {
305 debug_assert!(start_chunk == 0);
306 blake3::hash(data)
307 } else {
308 let mut hasher = blake3::Hasher::new();
309 hasher.set_input_offset(start_chunk * 1024);
310 hasher.update(data);
311 let non_root_hash: ChainingValue = hasher.finalize_non_root();
312 blake3::Hash::from(non_root_hash)
313 }
314 }
315
316 fn parent_cv(
317 left_child: &blake3::Hash,
318 right_child: &blake3::Hash,
319 is_root: bool,
320 ) -> blake3::Hash {
321 use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};
322 let left_child: ChainingValue = *left_child.as_bytes();
323 let right_child: ChainingValue = *right_child.as_bytes();
324 if is_root {
325 merge_subtrees_root(&left_child, &right_child, Mode::Hash)
326 } else {
327 blake3::Hash::from(merge_subtrees_non_root(
328 &left_child,
329 &right_child,
330 Mode::Hash,
331 ))
332 }
333 }
334
335 pub async fn init_outboard<R, W, P>(
336 data: R,
337 outboard: &mut PreOrderOutboard<W>,
338 progress: &mut P,
339 ) -> std::io::Result<std::result::Result<(), P::Error>>
340 where
341 W: WriteAt,
342 R: Read,
343 P: Sink<ChunkNum>,
344 {
345 let size = usize::try_from(outboard.tree.size()).unwrap_or(usize::MAX);
348 let read_buf_size = size.min(1024 * 1024);
349 let chunk_buf_size = size.min(outboard.tree.block_size().bytes());
350 let reader = BufReader::with_capacity(read_buf_size, data);
351 let mut buffer = SmallVec::<[u8; 128]>::from_elem(0u8, chunk_buf_size);
352 let res = init_impl(outboard.tree, reader, outboard, &mut buffer, progress).await?;
353 Ok(res)
354 }
355
356 async fn init_impl<W, P>(
357 tree: BaoTree,
358 mut data: impl Read,
359 outboard: &mut PreOrderOutboard<W>,
360 buffer: &mut [u8],
361 progress: &mut P,
362 ) -> io::Result<std::result::Result<(), P::Error>>
363 where
364 W: WriteAt,
365 P: Sink<ChunkNum>,
366 {
367 let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
369 for item in tree.post_order_chunks_iter() {
371 match item {
372 BaoChunk::Parent { is_root, node, .. } => {
373 let right_hash = stack.pop().unwrap();
374 let left_hash = stack.pop().unwrap();
375 outboard.save(node, &(left_hash, right_hash))?;
376 let parent = parent_cv(&left_hash, &right_hash, is_root);
377 stack.push(parent);
378 }
379 BaoChunk::Leaf {
380 size,
381 is_root,
382 start_chunk,
383 ..
384 } => {
385 if let Err(err) = progress.send(start_chunk).await {
386 return Ok(Err(err));
387 }
388 let buf = &mut buffer[..size];
389 data.read_exact(buf)?;
390 let hash = hash_subtree(start_chunk.0, buf, is_root);
391 stack.push(hash);
392 }
393 }
394 }
395 debug_assert_eq!(stack.len(), 1);
396 outboard.root = stack.pop().unwrap();
397 Ok(Ok(()))
398 }
399
#[cfg(test)]
mod tests {
    use bao_tree::{
        blake3,
        io::{outboard::PreOrderOutboard, sync::CreateOutboard},
        BaoTree,
    };
    use testresult::TestResult;

    use crate::{
        store::{fs::tests::test_data, IROH_BLOCK_SIZE},
        util::{outboard_with_progress::init_outboard, sink::Drain},
    };

    /// The progress-reporting builder must produce the same root and the same
    /// outboard bytes as bao_tree's own `init_from`.
    #[tokio::test]
    async fn init_outboard_with_progress() -> TestResult<()> {
        for size in [1024 * 18 + 1] {
            let data = test_data(size);
            let blank = PreOrderOutboard::<Vec<u8>> {
                tree: BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE),
                ..Default::default()
            };
            let mut expected = blank.clone();
            let mut actual = blank;
            expected.init_from(data.as_ref())?;
            init_outboard(data.as_ref(), &mut actual, &mut Drain).await??;
            assert_eq!(expected.root, blake3::hash(&data));
            assert_eq!(expected.root, actual.root);
            assert_eq!(expected.data, actual.data);
        }
        Ok(())
    }
}
432}
433
434pub mod sink {
435 use std::{future::Future, io};
436
437 use irpc::RpcMessage;
438
439 pub trait Sink<Item> {
441 type Error;
442 fn send(
443 &mut self,
444 value: Item,
445 ) -> impl Future<Output = std::result::Result<(), Self::Error>>;
446
447 fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
448 where
449 Self: Sized,
450 F: Fn(Self::Error) -> U + Send + 'static,
451 {
452 WithMapErr { inner: self, f }
453 }
454
455 fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
456 where
457 Self: Sized,
458 F: Fn(U) -> Item + Send + 'static,
459 {
460 WithMap { inner: self, f }
461 }
462 }
463
464 impl<I, T> Sink<T> for &mut I
465 where
466 I: Sink<T>,
467 {
468 type Error = I::Error;
469
470 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
471 (*self).send(value).await
472 }
473 }
474
475 pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
476
477 impl<T> Sink<T> for IrpcSenderSink<T>
478 where
479 T: RpcMessage,
480 {
481 type Error = irpc::channel::SendError;
482
483 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
484 self.0.send(value).await
485 }
486 }
487
488 pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);
489
490 impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
491 where
492 T: RpcMessage,
493 {
494 type Error = irpc::channel::SendError;
495
496 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
497 self.0.send(value).await
498 }
499 }
500
501 pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);
502
503 impl<T> Sink<T> for TokioMpscSenderSink<T> {
504 type Error = tokio::sync::mpsc::error::SendError<T>;
505
506 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
507 self.0.send(value).await
508 }
509 }
510
511 pub struct WithMapErr<P, F> {
512 inner: P,
513 f: F,
514 }
515
516 impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
517 where
518 P: Sink<U>,
519 F: Fn(P::Error) -> E + Send + 'static,
520 {
521 type Error = E;
522
523 async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
524 match self.inner.send(value).await {
525 Ok(()) => Ok(()),
526 Err(err) => {
527 let err = (self.f)(err);
528 Err(err)
529 }
530 }
531 }
532 }
533
534 pub struct WithMap<P, F> {
535 inner: P,
536 f: F,
537 }
538
539 impl<P, F, T, U> Sink<T> for WithMap<P, F>
540 where
541 P: Sink<U>,
542 F: Fn(T) -> U + Send + 'static,
543 {
544 type Error = P::Error;
545
546 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
547 self.inner.send((self.f)(value)).await
548 }
549 }
550
551 pub struct Drain;
552
553 impl<T> Sink<T> for Drain {
554 type Error = io::Error;
555
556 async fn send(&mut self, _offset: T) -> std::result::Result<(), Self::Error> {
557 io::Result::Ok(())
558 }
559 }
560}