iroh_blobs/util.rs

use std::ops::{Bound, RangeBounds};

use bao_tree::{io::round_up_to_chunks, ChunkNum, ChunkRanges};
use range_collections::{range_set::RangeSetEntry, RangeSet2};

pub mod channel;
pub(crate) mod temp_tag;
pub mod serde {
    /// Serialization and deserialization of [`std::io::Error`] as a string.
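    ///
    /// A minimal usage sketch (illustrative; `FetchResult` and the module
    /// path are hypothetical and depend on where this file lives in the
    /// crate):
    ///
    /// ```ignore
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Serialize, Deserialize)]
    /// struct FetchResult {
    ///     #[serde(with = "crate::util::serde::io_error_serde")]
    ///     error: std::io::Error,
    /// }
    /// ```
    ///
    /// Note that deserialization only restores the error message; the
    /// original [`std::io::ErrorKind`] is not recovered.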
    pub mod io_error_serde {
        use std::{fmt, io};

        use serde::{
            de::{self, Visitor},
            Deserializer, Serializer,
        };

        pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            // Serialize the error kind and message
            serializer.serialize_str(&format!("{:?}:{}", error.kind(), error))
        }

        pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
        where
            D: Deserializer<'de>,
        {
            struct IoErrorVisitor;

            impl<'de> Visitor<'de> for IoErrorVisitor {
                type Value = io::Error;

                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                    formatter.write_str("an io::Error string representation")
                }

                fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
                where
                    E: de::Error,
                {
                    // For simplicity, create a generic error
                    // In a real app, you might want to parse the kind from the string
                    Ok(io::Error::other(value))
                }
            }

            deserializer.deserialize_str(IoErrorVisitor)
        }
    }
}

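/// Extension methods to build [`ChunkRanges`] from byte or chunk bounds.
///
/// A small usage sketch (illustrative; the concrete chunk numbers assume the
/// BLAKE3 chunk size of 1024 bytes):
///
/// ```ignore
/// // With `ChunkRangesExt` (and `ChunkRanges`) in scope:
///
/// // All chunks overlapping the byte range 0..3000, i.e. chunks 0, 1 and 2.
/// let byte_ranges = ChunkRanges::bytes(0..3000);
/// // The single chunk containing byte offset 5000 (chunk 4).
/// let offset_range = ChunkRanges::offset(5000);
/// // Chunks 10 and up, given directly as chunk numbers.
/// let tail = ChunkRanges::chunks(10..);
/// ```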
pub trait ChunkRangesExt {
    fn last_chunk() -> Self;
    fn chunk(offset: u64) -> Self;
    fn bytes(ranges: impl RangeBounds<u64>) -> Self;
    fn chunks(ranges: impl RangeBounds<u64>) -> Self;
    fn offset(offset: u64) -> Self;
}

impl ChunkRangesExt for ChunkRanges {
    /// Create a chunk range that contains only the last chunk.
    fn last_chunk() -> Self {
        ChunkRanges::from(ChunkNum(u64::MAX)..)
    }

    /// Create a chunk range that contains a single chunk.
    fn chunk(offset: u64) -> Self {
        ChunkRanges::from(ChunkNum(offset)..ChunkNum(offset + 1))
    }

    /// Create a range of chunks that contains the given byte ranges.
    /// The byte ranges are rounded up to the nearest chunk size.
    fn bytes(ranges: impl RangeBounds<u64>) -> Self {
        round_up_to_chunks(&bounds_from_range(ranges, |v| v))
    }

    /// Create a range of chunks from u64 chunk bounds.
    ///
    /// This is equivalent to, but more convenient than, using the `ChunkNum` newtype.
    fn chunks(ranges: impl RangeBounds<u64>) -> Self {
        bounds_from_range(ranges, ChunkNum)
    }

    /// Create a chunk range that contains a single byte offset.
    fn offset(offset: u64) -> Self {
        Self::bytes(offset..offset + 1)
    }
}

// todo: move to range_collections
/// Convert arbitrary `RangeBounds<u64>` into a `RangeSet2<T>`, mapping the
/// bounds into the target type with `f`.
pub(crate) fn bounds_from_range<R, T, F>(range: R, f: F) -> RangeSet2<T>
where
    R: RangeBounds<u64>,
    T: RangeSetEntry,
    F: Fn(u64) -> T,
{
    let from = match range.start_bound() {
        Bound::Included(start) => Some(*start),
        Bound::Excluded(start) => {
            let Some(start) = start.checked_add(1) else {
                return RangeSet2::empty();
            };
            Some(start)
        }
        Bound::Unbounded => None,
    };
    let to = match range.end_bound() {
        Bound::Included(end) => end.checked_add(1),
        Bound::Excluded(end) => Some(*end),
        Bound::Unbounded => None,
    };
    match (from, to) {
        (Some(from), Some(to)) => RangeSet2::from(f(from)..f(to)),
        (Some(from), None) => RangeSet2::from(f(from)..),
        (None, Some(to)) => RangeSet2::from(..f(to)),
        (None, None) => RangeSet2::all(),
    }
}

/// Compute an outboard from data while reporting progress for every chunk
/// group that is hashed.
pub mod outboard_with_progress {
    use std::io::{self, BufReader, Read};

    use bao_tree::{
        blake3,
        io::{
            outboard::PreOrderOutboard,
            sync::{OutboardMut, WriteAt},
        },
        iter::BaoChunk,
        BaoTree, ChunkNum,
    };
    use smallvec::SmallVec;

    use super::sink::Sink;

    /// Hash a subtree of the data starting at `start_chunk`, using the blake3
    /// hazmat API. For the root subtree this is just the plain blake3 hash.
    fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
        use blake3::hazmat::{ChainingValue, HasherExt};
        if is_root {
            debug_assert!(start_chunk == 0);
            blake3::hash(data)
        } else {
            let mut hasher = blake3::Hasher::new();
            hasher.set_input_offset(start_chunk * 1024);
            hasher.update(data);
            let non_root_hash: ChainingValue = hasher.finalize_non_root();
            blake3::Hash::from(non_root_hash)
        }
    }

    /// Combine two child hashes into the hash of their parent node,
    /// finalizing as the root hash if `is_root` is true.
    fn parent_cv(
        left_child: &blake3::Hash,
        right_child: &blake3::Hash,
        is_root: bool,
    ) -> blake3::Hash {
        use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};
        let left_child: ChainingValue = *left_child.as_bytes();
        let right_child: ChainingValue = *right_child.as_bytes();
        if is_root {
            merge_subtrees_root(&left_child, &right_child, Mode::Hash)
        } else {
            blake3::Hash::from(merge_subtrees_non_root(
                &left_child,
                &right_child,
                Mode::Hash,
            ))
        }
    }

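    /// Initialize `outboard` from `data`, sending the start chunk of each
    /// leaf to `progress` as it is processed.
    ///
    /// The outer result carries IO errors, the inner one errors from the
    /// progress sink. A minimal wiring sketch (illustrative only; `data`,
    /// `outboard` and the channel capacity are placeholders):
    ///
    /// ```ignore
    /// let (tx, mut rx) = tokio::sync::mpsc::channel::<ChunkNum>(16);
    /// // Drain `rx` in another task to observe progress.
    /// let mut progress = TokioMpscSenderSink(tx)
    ///     .with_map_err(|_| io::Error::other("progress receiver dropped"));
    /// init_outboard(&data[..], &mut outboard, &mut progress).await??;
    /// ```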
    pub async fn init_outboard<R, W, P>(
        data: R,
        outboard: &mut PreOrderOutboard<W>,
        progress: &mut P,
    ) -> std::io::Result<std::result::Result<(), P::Error>>
    where
        W: WriteAt,
        R: Read,
        P: Sink<ChunkNum>,
    {
        // wrap the reader in a buffered reader, so we read in large chunks
        // this reduces the number of io ops
        let size = usize::try_from(outboard.tree.size()).unwrap_or(usize::MAX);
        let read_buf_size = size.min(1024 * 1024);
        let chunk_buf_size = size.min(outboard.tree.block_size().bytes());
        let reader = BufReader::with_capacity(read_buf_size, data);
        let mut buffer = SmallVec::<[u8; 128]>::from_elem(0u8, chunk_buf_size);
        let res = init_impl(outboard.tree, reader, outboard, &mut buffer, progress).await?;
        Ok(res)
    }

    async fn init_impl<W, P>(
        tree: BaoTree,
        mut data: impl Read,
        outboard: &mut PreOrderOutboard<W>,
        buffer: &mut [u8],
        progress: &mut P,
    ) -> io::Result<std::result::Result<(), P::Error>>
    where
        W: WriteAt,
        P: Sink<ChunkNum>,
    {
        // do not allocate for small trees
        let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
        // debug_assert!(buffer.len() == tree.chunk_group_bytes());
        for item in tree.post_order_chunks_iter() {
            match item {
                BaoChunk::Parent { is_root, node, .. } => {
                    let right_hash = stack.pop().unwrap();
                    let left_hash = stack.pop().unwrap();
                    outboard.save(node, &(left_hash, right_hash))?;
                    let parent = parent_cv(&left_hash, &right_hash, is_root);
                    stack.push(parent);
                }
                BaoChunk::Leaf {
                    size,
                    is_root,
                    start_chunk,
                    ..
                } => {
                    if let Err(err) = progress.send(start_chunk).await {
                        return Ok(Err(err));
                    }
                    let buf = &mut buffer[..size];
                    data.read_exact(buf)?;
                    let hash = hash_subtree(start_chunk.0, buf, is_root);
                    stack.push(hash);
                }
            }
        }
        debug_assert_eq!(stack.len(), 1);
        outboard.root = stack.pop().unwrap();
        Ok(Ok(()))
    }

    #[cfg(test)]
    mod tests {
        use bao_tree::{
            blake3,
            io::{outboard::PreOrderOutboard, sync::CreateOutboard},
            BaoTree,
        };
        use testresult::TestResult;

        use crate::{
            store::{fs::tests::test_data, IROH_BLOCK_SIZE},
            util::{outboard_with_progress::init_outboard, sink::Drain},
        };

        #[tokio::test]
        async fn init_outboard_with_progress() -> TestResult<()> {
            for size in [1024 * 18 + 1] {
                let data = test_data(size);
                let mut o1 = PreOrderOutboard::<Vec<u8>> {
                    tree: BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE),
                    ..Default::default()
                };
                let mut o2 = o1.clone();
                o1.init_from(data.as_ref())?;
                init_outboard(data.as_ref(), &mut o2, &mut Drain).await??;
                assert_eq!(o1.root, blake3::hash(&data));
                assert_eq!(o1.root, o2.root);
                assert_eq!(o1.data, o2.data);
            }
            Ok(())
        }
    }
}

pub mod sink {
    use std::{future::Future, io};

    use irpc::RpcMessage;

    /// Our version of a sink that can be mapped, etc.
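    ///
    /// A small composition sketch (illustrative only; the item and error
    /// types are arbitrary examples):
    ///
    /// ```ignore
    /// let (tx, _rx) = tokio::sync::mpsc::channel::<u64>(8);
    /// // Accept `u32` items and surface send failures as `io::Error`.
    /// let mut sink = TokioMpscSenderSink(tx)
    ///     .with_map(|n: u32| n as u64)
    ///     .with_map_err(|_| io::Error::other("receiver dropped"));
    /// sink.send(42u32).await?;
    /// ```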
    pub trait Sink<Item> {
        type Error;
        fn send(
            &mut self,
            value: Item,
        ) -> impl Future<Output = std::result::Result<(), Self::Error>>;

        fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
        where
            Self: Sized,
            F: Fn(Self::Error) -> U + Send + 'static,
        {
            WithMapErr { inner: self, f }
        }

        fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
        where
            Self: Sized,
            F: Fn(U) -> Item + Send + 'static,
        {
            WithMap { inner: self, f }
        }
    }

    impl<I, T> Sink<T> for &mut I
    where
        I: Sink<T>,
    {
        type Error = I::Error;

        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
            (*self).send(value).await
        }
    }

    /// A sink backed by an irpc mpsc sender.
    pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);

    impl<T> Sink<T> for IrpcSenderSink<T>
    where
        T: RpcMessage,
    {
        type Error = irpc::channel::SendError;

        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
            self.0.send(value).await
        }
    }

    /// A sink backed by a mutably borrowed irpc mpsc sender.
    pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);

    impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
    where
        T: RpcMessage,
    {
        type Error = irpc::channel::SendError;

        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
            self.0.send(value).await
        }
    }

    /// A sink backed by a tokio mpsc sender.
    pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);

    impl<T> Sink<T> for TokioMpscSenderSink<T> {
        type Error = tokio::sync::mpsc::error::SendError<T>;

        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
            self.0.send(value).await
        }
    }

    /// Adapter that maps the error type of a sink, see [`Sink::with_map_err`].
    pub struct WithMapErr<P, F> {
        inner: P,
        f: F,
    }

    impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
    where
        P: Sink<U>,
        F: Fn(P::Error) -> E + Send + 'static,
    {
        type Error = E;

        async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
            match self.inner.send(value).await {
                Ok(()) => Ok(()),
                Err(err) => {
                    let err = (self.f)(err);
                    Err(err)
                }
            }
        }
    }

    /// Adapter that maps the item type of a sink, see [`Sink::with_map`].
    pub struct WithMap<P, F> {
        inner: P,
        f: F,
    }

    impl<P, F, T, U> Sink<T> for WithMap<P, F>
    where
        P: Sink<U>,
        F: Fn(T) -> U + Send + 'static,
    {
        type Error = P::Error;

        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
            self.inner.send((self.f)(value)).await
        }
    }

    /// A sink that accepts and discards all items.
    pub struct Drain;

    impl<T> Sink<T> for Drain {
        type Error = io::Error;

        async fn send(&mut self, _value: T) -> std::result::Result<(), Self::Error> {
            io::Result::Ok(())
        }
    }
}