iroh_blobs/
util.rs

1use std::ops::{Bound, RangeBounds};
2
3use bao_tree::{io::round_up_to_chunks, ChunkNum, ChunkRanges};
4use range_collections::{range_set::RangeSetEntry, RangeSet2};
5
6pub mod channel;
7pub(crate) mod temp_tag;
8pub mod serde {
9    // Module that handles io::Error serialization/deserialization
10    pub mod io_error_serde {
11        use std::{fmt, io};
12
13        use serde::{
14            de::{self, SeqAccess, Visitor},
15            ser::SerializeTuple,
16            Deserializer, Serializer,
17        };
18
19        fn error_kind_to_u8(kind: io::ErrorKind) -> u8 {
20            match kind {
21                io::ErrorKind::AddrInUse => 0,
22                io::ErrorKind::AddrNotAvailable => 1,
23                io::ErrorKind::AlreadyExists => 2,
24                io::ErrorKind::ArgumentListTooLong => 3,
25                io::ErrorKind::BrokenPipe => 4,
26                io::ErrorKind::ConnectionAborted => 5,
27                io::ErrorKind::ConnectionRefused => 6,
28                io::ErrorKind::ConnectionReset => 7,
29                io::ErrorKind::CrossesDevices => 8,
30                io::ErrorKind::Deadlock => 9,
31                io::ErrorKind::DirectoryNotEmpty => 10,
32                io::ErrorKind::ExecutableFileBusy => 11,
33                io::ErrorKind::FileTooLarge => 12,
34                io::ErrorKind::HostUnreachable => 13,
35                io::ErrorKind::Interrupted => 14,
36                io::ErrorKind::InvalidData => 15,
37                io::ErrorKind::InvalidInput => 17,
38                io::ErrorKind::IsADirectory => 18,
39                io::ErrorKind::NetworkDown => 19,
40                io::ErrorKind::NetworkUnreachable => 20,
41                io::ErrorKind::NotADirectory => 21,
42                io::ErrorKind::NotConnected => 22,
43                io::ErrorKind::NotFound => 23,
44                io::ErrorKind::NotSeekable => 24,
45                io::ErrorKind::Other => 25,
46                io::ErrorKind::OutOfMemory => 26,
47                io::ErrorKind::PermissionDenied => 27,
48                io::ErrorKind::QuotaExceeded => 28,
49                io::ErrorKind::ReadOnlyFilesystem => 29,
50                io::ErrorKind::ResourceBusy => 30,
51                io::ErrorKind::StaleNetworkFileHandle => 31,
52                io::ErrorKind::StorageFull => 32,
53                io::ErrorKind::TimedOut => 33,
54                io::ErrorKind::TooManyLinks => 34,
55                io::ErrorKind::UnexpectedEof => 35,
56                io::ErrorKind::Unsupported => 36,
57                io::ErrorKind::WouldBlock => 37,
58                io::ErrorKind::WriteZero => 38,
59                _ => 25,
60            }
61        }
62
63        fn u8_to_error_kind(num: u8) -> io::ErrorKind {
64            match num {
65                0 => io::ErrorKind::AddrInUse,
66                1 => io::ErrorKind::AddrNotAvailable,
67                2 => io::ErrorKind::AlreadyExists,
68                3 => io::ErrorKind::ArgumentListTooLong,
69                4 => io::ErrorKind::BrokenPipe,
70                5 => io::ErrorKind::ConnectionAborted,
71                6 => io::ErrorKind::ConnectionRefused,
72                7 => io::ErrorKind::ConnectionReset,
73                8 => io::ErrorKind::CrossesDevices,
74                9 => io::ErrorKind::Deadlock,
75                10 => io::ErrorKind::DirectoryNotEmpty,
76                11 => io::ErrorKind::ExecutableFileBusy,
77                12 => io::ErrorKind::FileTooLarge,
78                13 => io::ErrorKind::HostUnreachable,
79                14 => io::ErrorKind::Interrupted,
80                15 => io::ErrorKind::InvalidData,
81                // 16 => io::ErrorKind::InvalidFilename,
82                17 => io::ErrorKind::InvalidInput,
83                18 => io::ErrorKind::IsADirectory,
84                19 => io::ErrorKind::NetworkDown,
85                20 => io::ErrorKind::NetworkUnreachable,
86                21 => io::ErrorKind::NotADirectory,
87                22 => io::ErrorKind::NotConnected,
88                23 => io::ErrorKind::NotFound,
89                24 => io::ErrorKind::NotSeekable,
90                25 => io::ErrorKind::Other,
91                26 => io::ErrorKind::OutOfMemory,
92                27 => io::ErrorKind::PermissionDenied,
93                28 => io::ErrorKind::QuotaExceeded,
94                29 => io::ErrorKind::ReadOnlyFilesystem,
95                30 => io::ErrorKind::ResourceBusy,
96                31 => io::ErrorKind::StaleNetworkFileHandle,
97                32 => io::ErrorKind::StorageFull,
98                33 => io::ErrorKind::TimedOut,
99                34 => io::ErrorKind::TooManyLinks,
100                35 => io::ErrorKind::UnexpectedEof,
101                36 => io::ErrorKind::Unsupported,
102                37 => io::ErrorKind::WouldBlock,
103                38 => io::ErrorKind::WriteZero,
104                _ => io::ErrorKind::Other,
105            }
106        }
107
108        pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
109        where
110            S: Serializer,
111        {
112            let mut tup = serializer.serialize_tuple(2)?;
113            tup.serialize_element(&error_kind_to_u8(error.kind()))?;
114            tup.serialize_element(&error.to_string())?;
115            tup.end()
116        }
117
118        pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
119        where
120            D: Deserializer<'de>,
121        {
122            struct IoErrorVisitor;
123
124            impl<'de> Visitor<'de> for IoErrorVisitor {
125                type Value = io::Error;
126
127                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
128                    formatter.write_str("a tuple of (u32, String) representing io::Error")
129                }
130
131                fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
132                where
133                    A: SeqAccess<'de>,
134                {
135                    let num: u8 = seq
136                        .next_element()?
137                        .ok_or_else(|| de::Error::invalid_length(0, &self))?;
138                    let message: String = seq
139                        .next_element()?
140                        .ok_or_else(|| de::Error::invalid_length(1, &self))?;
141                    let kind = u8_to_error_kind(num);
142                    Ok(io::Error::new(kind, message))
143                }
144            }
145
146            deserializer.deserialize_tuple(2, IoErrorVisitor)
147        }
148    }
149
150    #[cfg(test)]
151    mod tests {
152        use std::io::{self, ErrorKind};
153
154        use postcard;
155        use serde::{Deserialize, Serialize};
156
157        use super::io_error_serde;
158
159        #[derive(Serialize, Deserialize)]
160        struct TestError(#[serde(with = "io_error_serde")] io::Error);
161
162        #[test]
163        fn test_roundtrip_error_kinds() {
164            let message = "test error";
165            let kinds = [
166                ErrorKind::AddrInUse,
167                ErrorKind::AddrNotAvailable,
168                ErrorKind::AlreadyExists,
169                ErrorKind::ArgumentListTooLong,
170                ErrorKind::BrokenPipe,
171                ErrorKind::ConnectionAborted,
172                ErrorKind::ConnectionRefused,
173                ErrorKind::ConnectionReset,
174                ErrorKind::CrossesDevices,
175                ErrorKind::Deadlock,
176                ErrorKind::DirectoryNotEmpty,
177                ErrorKind::ExecutableFileBusy,
178                ErrorKind::FileTooLarge,
179                ErrorKind::HostUnreachable,
180                ErrorKind::Interrupted,
181                ErrorKind::InvalidData,
182                // ErrorKind::InvalidFilename,
183                ErrorKind::InvalidInput,
184                ErrorKind::IsADirectory,
185                ErrorKind::NetworkDown,
186                ErrorKind::NetworkUnreachable,
187                ErrorKind::NotADirectory,
188                ErrorKind::NotConnected,
189                ErrorKind::NotFound,
190                ErrorKind::NotSeekable,
191                ErrorKind::Other,
192                ErrorKind::OutOfMemory,
193                ErrorKind::PermissionDenied,
194                ErrorKind::QuotaExceeded,
195                ErrorKind::ReadOnlyFilesystem,
196                ErrorKind::ResourceBusy,
197                ErrorKind::StaleNetworkFileHandle,
198                ErrorKind::StorageFull,
199                ErrorKind::TimedOut,
200                ErrorKind::TooManyLinks,
201                ErrorKind::UnexpectedEof,
202                ErrorKind::Unsupported,
203                ErrorKind::WouldBlock,
204                ErrorKind::WriteZero,
205            ];
206
207            for kind in kinds {
208                let err = TestError(io::Error::new(kind, message));
209                let serialized = postcard::to_allocvec(&err).unwrap();
210                let deserialized: TestError = postcard::from_bytes(&serialized).unwrap();
211
212                assert_eq!(err.0.kind(), deserialized.0.kind());
213                assert_eq!(err.0.to_string(), deserialized.0.to_string());
214            }
215        }
216    }
217}
218
/// Convenience constructors for chunk range sets, taking plain `u64` values.
pub trait ChunkRangesExt {
    /// Create a range that starts at chunk number `u64::MAX` and has no upper bound.
    fn last_chunk() -> Self;
    /// Create a range that contains the single chunk with number `offset`.
    fn chunk(offset: u64) -> Self;
    /// Create a range of chunks that contains the given byte range.
    fn bytes(ranges: impl RangeBounds<u64>) -> Self;
    /// Create a range of chunks from bounds given as plain `u64` chunk numbers.
    fn chunks(ranges: impl RangeBounds<u64>) -> Self;
    /// Create a range of chunks that contains the single byte at `offset`.
    fn offset(offset: u64) -> Self;
}
226
227impl ChunkRangesExt for ChunkRanges {
228    fn last_chunk() -> Self {
229        ChunkRanges::from(ChunkNum(u64::MAX)..)
230    }
231
232    /// Create a chunk range that contains a single chunk.
233    fn chunk(offset: u64) -> Self {
234        ChunkRanges::from(ChunkNum(offset)..ChunkNum(offset + 1))
235    }
236
237    /// Create a range of chunks that contains the given byte ranges.
238    /// The byte ranges are rounded up to the nearest chunk size.
239    fn bytes(ranges: impl RangeBounds<u64>) -> Self {
240        round_up_to_chunks(&bounds_from_range(ranges, |v| v))
241    }
242
243    /// Create a range of chunks from u64 chunk bounds.
244    ///
245    /// This is equivalent but more convenient than using the ChunkNum newtype.
246    fn chunks(ranges: impl RangeBounds<u64>) -> Self {
247        bounds_from_range(ranges, ChunkNum)
248    }
249
250    /// Create a chunk range that contains a single byte offset.
251    fn offset(offset: u64) -> Self {
252        Self::bytes(offset..offset + 1)
253    }
254}
255
256// todo: move to range_collections
257pub(crate) fn bounds_from_range<R, T, F>(range: R, f: F) -> RangeSet2<T>
258where
259    R: RangeBounds<u64>,
260    T: RangeSetEntry,
261    F: Fn(u64) -> T,
262{
263    let from = match range.start_bound() {
264        Bound::Included(start) => Some(*start),
265        Bound::Excluded(start) => {
266            let Some(start) = start.checked_add(1) else {
267                return RangeSet2::empty();
268            };
269            Some(start)
270        }
271        Bound::Unbounded => None,
272    };
273    let to = match range.end_bound() {
274        Bound::Included(end) => end.checked_add(1),
275        Bound::Excluded(end) => Some(*end),
276        Bound::Unbounded => None,
277    };
278    match (from, to) {
279        (Some(from), Some(to)) => RangeSet2::from(f(from)..f(to)),
280        (Some(from), None) => RangeSet2::from(f(from)..),
281        (None, Some(to)) => RangeSet2::from(..f(to)),
282        (None, None) => RangeSet2::all(),
283    }
284}
285
286pub mod outboard_with_progress {
287    use std::io::{self, BufReader, Read};
288
289    use bao_tree::{
290        blake3,
291        io::{
292            outboard::PreOrderOutboard,
293            sync::{OutboardMut, WriteAt},
294        },
295        iter::BaoChunk,
296        BaoTree, ChunkNum,
297    };
298    use smallvec::SmallVec;
299
300    use super::sink::Sink;
301
302    fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
303        use blake3::hazmat::{ChainingValue, HasherExt};
304        if is_root {
305            debug_assert!(start_chunk == 0);
306            blake3::hash(data)
307        } else {
308            let mut hasher = blake3::Hasher::new();
309            hasher.set_input_offset(start_chunk * 1024);
310            hasher.update(data);
311            let non_root_hash: ChainingValue = hasher.finalize_non_root();
312            blake3::Hash::from(non_root_hash)
313        }
314    }
315
316    fn parent_cv(
317        left_child: &blake3::Hash,
318        right_child: &blake3::Hash,
319        is_root: bool,
320    ) -> blake3::Hash {
321        use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};
322        let left_child: ChainingValue = *left_child.as_bytes();
323        let right_child: ChainingValue = *right_child.as_bytes();
324        if is_root {
325            merge_subtrees_root(&left_child, &right_child, Mode::Hash)
326        } else {
327            blake3::Hash::from(merge_subtrees_non_root(
328                &left_child,
329                &right_child,
330                Mode::Hash,
331            ))
332        }
333    }
334
335    pub async fn init_outboard<R, W, P>(
336        data: R,
337        outboard: &mut PreOrderOutboard<W>,
338        progress: &mut P,
339    ) -> std::io::Result<std::result::Result<(), P::Error>>
340    where
341        W: WriteAt,
342        R: Read,
343        P: Sink<ChunkNum>,
344    {
345        // wrap the reader in a buffered reader, so we read in large chunks
346        // this reduces the number of io ops
347        let size = usize::try_from(outboard.tree.size()).unwrap_or(usize::MAX);
348        let read_buf_size = size.min(1024 * 1024);
349        let chunk_buf_size = size.min(outboard.tree.block_size().bytes());
350        let reader = BufReader::with_capacity(read_buf_size, data);
351        let mut buffer = SmallVec::<[u8; 128]>::from_elem(0u8, chunk_buf_size);
352        let res = init_impl(outboard.tree, reader, outboard, &mut buffer, progress).await?;
353        Ok(res)
354    }
355
356    async fn init_impl<W, P>(
357        tree: BaoTree,
358        mut data: impl Read,
359        outboard: &mut PreOrderOutboard<W>,
360        buffer: &mut [u8],
361        progress: &mut P,
362    ) -> io::Result<std::result::Result<(), P::Error>>
363    where
364        W: WriteAt,
365        P: Sink<ChunkNum>,
366    {
367        // do not allocate for small trees
368        let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
369        // debug_assert!(buffer.len() == tree.chunk_group_bytes());
370        for item in tree.post_order_chunks_iter() {
371            match item {
372                BaoChunk::Parent { is_root, node, .. } => {
373                    let right_hash = stack.pop().unwrap();
374                    let left_hash = stack.pop().unwrap();
375                    outboard.save(node, &(left_hash, right_hash))?;
376                    let parent = parent_cv(&left_hash, &right_hash, is_root);
377                    stack.push(parent);
378                }
379                BaoChunk::Leaf {
380                    size,
381                    is_root,
382                    start_chunk,
383                    ..
384                } => {
385                    if let Err(err) = progress.send(start_chunk).await {
386                        return Ok(Err(err));
387                    }
388                    let buf = &mut buffer[..size];
389                    data.read_exact(buf)?;
390                    let hash = hash_subtree(start_chunk.0, buf, is_root);
391                    stack.push(hash);
392                }
393            }
394        }
395        debug_assert_eq!(stack.len(), 1);
396        outboard.root = stack.pop().unwrap();
397        Ok(Ok(()))
398    }
399
400    #[cfg(test)]
401    mod tests {
402        use bao_tree::{
403            blake3,
404            io::{outboard::PreOrderOutboard, sync::CreateOutboard},
405            BaoTree,
406        };
407        use testresult::TestResult;
408
409        use crate::{
410            store::{fs::tests::test_data, IROH_BLOCK_SIZE},
411            util::{outboard_with_progress::init_outboard, sink::Drain},
412        };
413
414        #[tokio::test]
415        async fn init_outboard_with_progress() -> TestResult<()> {
416            for size in [1024 * 18 + 1] {
417                let data = test_data(size);
418                let mut o1 = PreOrderOutboard::<Vec<u8>> {
419                    tree: BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE),
420                    ..Default::default()
421                };
422                let mut o2 = o1.clone();
423                o1.init_from(data.as_ref())?;
424                init_outboard(data.as_ref(), &mut o2, &mut Drain).await??;
425                assert_eq!(o1.root, blake3::hash(&data));
426                assert_eq!(o1.root, o2.root);
427                assert_eq!(o1.data, o2.data);
428            }
429            Ok(())
430        }
431    }
432}
433
434pub mod sink {
435    use std::{future::Future, io};
436
437    use irpc::RpcMessage;
438
    /// Our version of a sink, that can be mapped etc.
    ///
    /// A sink accepts values of type `Item` one at a time through
    /// [`Sink::send`] and can be wrapped with the adapter methods below.
    pub trait Sink<Item> {
        /// Error returned when a value could not be sent.
        type Error;

        /// Send a single value into the sink.
        fn send(
            &mut self,
            value: Item,
        ) -> impl Future<Output = std::result::Result<(), Self::Error>>;

        /// Wrap this sink so that every error it returns is converted with `f`.
        fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
        where
            Self: Sized,
            F: Fn(Self::Error) -> U + Send + 'static,
        {
            WithMapErr { inner: self, f }
        }

        /// Wrap this sink so that it accepts values of type `U`, converting
        /// each one to `Item` with `f` before it is sent.
        fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
        where
            Self: Sized,
            F: Fn(U) -> Item + Send + 'static,
        {
            WithMap { inner: self, f }
        }
    }
463
464    impl<I, T> Sink<T> for &mut I
465    where
466        I: Sink<T>,
467    {
468        type Error = I::Error;
469
470        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
471            (*self).send(value).await
472        }
473    }
474
    /// Sink that owns an irpc mpsc sender and forwards every value to it.
    pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);

    impl<T> Sink<T> for IrpcSenderSink<T>
    where
        T: RpcMessage,
    {
        type Error = irpc::channel::SendError;

        /// Sends `value` through the wrapped sender and returns its result.
        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
            self.0.send(value).await
        }
    }
487
    /// Sink that mutably borrows an irpc mpsc sender instead of owning it,
    /// and forwards every value to it.
    pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);

    impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
    where
        T: RpcMessage,
    {
        type Error = irpc::channel::SendError;

        /// Sends `value` through the borrowed sender and returns its result.
        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
            self.0.send(value).await
        }
    }
500
    /// Sink that owns a tokio mpsc sender and forwards every value to it.
    pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);

    impl<T> Sink<T> for TokioMpscSenderSink<T> {
        type Error = tokio::sync::mpsc::error::SendError<T>;

        /// Sends `value` through the wrapped sender and returns its result.
        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
            self.0.send(value).await
        }
    }
510
511    pub struct WithMapErr<P, F> {
512        inner: P,
513        f: F,
514    }
515
516    impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
517    where
518        P: Sink<U>,
519        F: Fn(P::Error) -> E + Send + 'static,
520    {
521        type Error = E;
522
523        async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
524            match self.inner.send(value).await {
525                Ok(()) => Ok(()),
526                Err(err) => {
527                    let err = (self.f)(err);
528                    Err(err)
529                }
530            }
531        }
532    }
533
534    pub struct WithMap<P, F> {
535        inner: P,
536        f: F,
537    }
538
539    impl<P, F, T, U> Sink<T> for WithMap<P, F>
540    where
541        P: Sink<U>,
542        F: Fn(T) -> U + Send + 'static,
543    {
544        type Error = P::Error;
545
546        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
547            self.inner.send((self.f)(value)).await
548        }
549    }
550
551    pub struct Drain;
552
553    impl<T> Sink<T> for Drain {
554        type Error = io::Error;
555
556        async fn send(&mut self, _offset: T) -> std::result::Result<(), Self::Error> {
557            io::Result::Ok(())
558        }
559    }
560}