iroh_blobs/
util.rs

1//! Utilities
2pub(crate) mod channel;
3pub mod connection_pool;
4pub(crate) mod temp_tag;
5
6pub(crate) mod serde {
7    // Module that handles io::Error serialization/deserialization
8    pub mod io_error_serde {
9        use std::{fmt, io};
10
11        use serde::{
12            de::{self, SeqAccess, Visitor},
13            ser::SerializeTuple,
14            Deserializer, Serializer,
15        };
16
17        fn error_kind_to_u8(kind: io::ErrorKind) -> u8 {
18            match kind {
19                io::ErrorKind::AddrInUse => 0,
20                io::ErrorKind::AddrNotAvailable => 1,
21                io::ErrorKind::AlreadyExists => 2,
22                io::ErrorKind::ArgumentListTooLong => 3,
23                io::ErrorKind::BrokenPipe => 4,
24                io::ErrorKind::ConnectionAborted => 5,
25                io::ErrorKind::ConnectionRefused => 6,
26                io::ErrorKind::ConnectionReset => 7,
27                io::ErrorKind::CrossesDevices => 8,
28                io::ErrorKind::Deadlock => 9,
29                io::ErrorKind::DirectoryNotEmpty => 10,
30                io::ErrorKind::ExecutableFileBusy => 11,
31                io::ErrorKind::FileTooLarge => 12,
32                io::ErrorKind::HostUnreachable => 13,
33                io::ErrorKind::Interrupted => 14,
34                io::ErrorKind::InvalidData => 15,
35                io::ErrorKind::InvalidInput => 17,
36                io::ErrorKind::IsADirectory => 18,
37                io::ErrorKind::NetworkDown => 19,
38                io::ErrorKind::NetworkUnreachable => 20,
39                io::ErrorKind::NotADirectory => 21,
40                io::ErrorKind::NotConnected => 22,
41                io::ErrorKind::NotFound => 23,
42                io::ErrorKind::NotSeekable => 24,
43                io::ErrorKind::Other => 25,
44                io::ErrorKind::OutOfMemory => 26,
45                io::ErrorKind::PermissionDenied => 27,
46                io::ErrorKind::QuotaExceeded => 28,
47                io::ErrorKind::ReadOnlyFilesystem => 29,
48                io::ErrorKind::ResourceBusy => 30,
49                io::ErrorKind::StaleNetworkFileHandle => 31,
50                io::ErrorKind::StorageFull => 32,
51                io::ErrorKind::TimedOut => 33,
52                io::ErrorKind::TooManyLinks => 34,
53                io::ErrorKind::UnexpectedEof => 35,
54                io::ErrorKind::Unsupported => 36,
55                io::ErrorKind::WouldBlock => 37,
56                io::ErrorKind::WriteZero => 38,
57                _ => 25,
58            }
59        }
60
61        fn u8_to_error_kind(num: u8) -> io::ErrorKind {
62            match num {
63                0 => io::ErrorKind::AddrInUse,
64                1 => io::ErrorKind::AddrNotAvailable,
65                2 => io::ErrorKind::AlreadyExists,
66                3 => io::ErrorKind::ArgumentListTooLong,
67                4 => io::ErrorKind::BrokenPipe,
68                5 => io::ErrorKind::ConnectionAborted,
69                6 => io::ErrorKind::ConnectionRefused,
70                7 => io::ErrorKind::ConnectionReset,
71                8 => io::ErrorKind::CrossesDevices,
72                9 => io::ErrorKind::Deadlock,
73                10 => io::ErrorKind::DirectoryNotEmpty,
74                11 => io::ErrorKind::ExecutableFileBusy,
75                12 => io::ErrorKind::FileTooLarge,
76                13 => io::ErrorKind::HostUnreachable,
77                14 => io::ErrorKind::Interrupted,
78                15 => io::ErrorKind::InvalidData,
79                // 16 => io::ErrorKind::InvalidFilename,
80                17 => io::ErrorKind::InvalidInput,
81                18 => io::ErrorKind::IsADirectory,
82                19 => io::ErrorKind::NetworkDown,
83                20 => io::ErrorKind::NetworkUnreachable,
84                21 => io::ErrorKind::NotADirectory,
85                22 => io::ErrorKind::NotConnected,
86                23 => io::ErrorKind::NotFound,
87                24 => io::ErrorKind::NotSeekable,
88                25 => io::ErrorKind::Other,
89                26 => io::ErrorKind::OutOfMemory,
90                27 => io::ErrorKind::PermissionDenied,
91                28 => io::ErrorKind::QuotaExceeded,
92                29 => io::ErrorKind::ReadOnlyFilesystem,
93                30 => io::ErrorKind::ResourceBusy,
94                31 => io::ErrorKind::StaleNetworkFileHandle,
95                32 => io::ErrorKind::StorageFull,
96                33 => io::ErrorKind::TimedOut,
97                34 => io::ErrorKind::TooManyLinks,
98                35 => io::ErrorKind::UnexpectedEof,
99                36 => io::ErrorKind::Unsupported,
100                37 => io::ErrorKind::WouldBlock,
101                38 => io::ErrorKind::WriteZero,
102                _ => io::ErrorKind::Other,
103            }
104        }
105
106        pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
107        where
108            S: Serializer,
109        {
110            let mut tup = serializer.serialize_tuple(2)?;
111            tup.serialize_element(&error_kind_to_u8(error.kind()))?;
112            tup.serialize_element(&error.to_string())?;
113            tup.end()
114        }
115
116        pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
117        where
118            D: Deserializer<'de>,
119        {
120            struct IoErrorVisitor;
121
122            impl<'de> Visitor<'de> for IoErrorVisitor {
123                type Value = io::Error;
124
125                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
126                    formatter.write_str("a tuple of (u32, String) representing io::Error")
127                }
128
129                fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
130                where
131                    A: SeqAccess<'de>,
132                {
133                    let num: u8 = seq
134                        .next_element()?
135                        .ok_or_else(|| de::Error::invalid_length(0, &self))?;
136                    let message: String = seq
137                        .next_element()?
138                        .ok_or_else(|| de::Error::invalid_length(1, &self))?;
139                    let kind = u8_to_error_kind(num);
140                    Ok(io::Error::new(kind, message))
141                }
142            }
143
144            deserializer.deserialize_tuple(2, IoErrorVisitor)
145        }
146    }
147
148    #[cfg(test)]
149    mod tests {
150        use std::io::{self, ErrorKind};
151
152        use postcard;
153        use serde::{Deserialize, Serialize};
154
155        use super::io_error_serde;
156
157        #[derive(Serialize, Deserialize)]
158        struct TestError(#[serde(with = "io_error_serde")] io::Error);
159
160        #[test]
161        fn test_roundtrip_error_kinds() {
162            let message = "test error";
163            let kinds = [
164                ErrorKind::AddrInUse,
165                ErrorKind::AddrNotAvailable,
166                ErrorKind::AlreadyExists,
167                ErrorKind::ArgumentListTooLong,
168                ErrorKind::BrokenPipe,
169                ErrorKind::ConnectionAborted,
170                ErrorKind::ConnectionRefused,
171                ErrorKind::ConnectionReset,
172                ErrorKind::CrossesDevices,
173                ErrorKind::Deadlock,
174                ErrorKind::DirectoryNotEmpty,
175                ErrorKind::ExecutableFileBusy,
176                ErrorKind::FileTooLarge,
177                ErrorKind::HostUnreachable,
178                ErrorKind::Interrupted,
179                ErrorKind::InvalidData,
180                // ErrorKind::InvalidFilename,
181                ErrorKind::InvalidInput,
182                ErrorKind::IsADirectory,
183                ErrorKind::NetworkDown,
184                ErrorKind::NetworkUnreachable,
185                ErrorKind::NotADirectory,
186                ErrorKind::NotConnected,
187                ErrorKind::NotFound,
188                ErrorKind::NotSeekable,
189                ErrorKind::Other,
190                ErrorKind::OutOfMemory,
191                ErrorKind::PermissionDenied,
192                ErrorKind::QuotaExceeded,
193                ErrorKind::ReadOnlyFilesystem,
194                ErrorKind::ResourceBusy,
195                ErrorKind::StaleNetworkFileHandle,
196                ErrorKind::StorageFull,
197                ErrorKind::TimedOut,
198                ErrorKind::TooManyLinks,
199                ErrorKind::UnexpectedEof,
200                ErrorKind::Unsupported,
201                ErrorKind::WouldBlock,
202                ErrorKind::WriteZero,
203            ];
204
205            for kind in kinds {
206                let err = TestError(io::Error::new(kind, message));
207                let serialized = postcard::to_allocvec(&err).unwrap();
208                let deserialized: TestError = postcard::from_bytes(&serialized).unwrap();
209
210                assert_eq!(err.0.kind(), deserialized.0.kind());
211                assert_eq!(err.0.to_string(), deserialized.0.to_string());
212            }
213        }
214    }
215}
216
#[cfg(feature = "fs-store")]
pub(crate) mod outboard_with_progress {
    //! Computing a pre-order bao outboard while reporting progress through a [`Sink`].

    use std::io::{self, BufReader, Read};

    use bao_tree::{
        blake3,
        io::{
            outboard::PreOrderOutboard,
            sync::{OutboardMut, WriteAt},
        },
        iter::BaoChunk,
        BaoTree, ChunkNum,
    };
    use smallvec::SmallVec;

    use super::sink::Sink;

    /// Hashes one leaf (chunk group) of the tree.
    ///
    /// `start_chunk` is the index of the leaf's first 1024-byte chunk. For a non-root leaf
    /// it is used to compute the BLAKE3 input offset, and the non-root chaining value is
    /// returned. A root leaf is hashed with plain `blake3::hash`, which is only valid when
    /// the leaf starts at chunk 0.
    fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
        use blake3::hazmat::{ChainingValue, HasherExt};
        if is_root {
            debug_assert_eq!(start_chunk, 0, "a root leaf must start at chunk 0");
            blake3::hash(data)
        } else {
            let mut hasher = blake3::Hasher::new();
            hasher.set_input_offset(start_chunk * 1024);
            hasher.update(data);
            let non_root_hash: ChainingValue = hasher.finalize_non_root();
            blake3::Hash::from(non_root_hash)
        }
    }

    /// Combines two child hashes into their parent's hash.
    ///
    /// If `is_root` is set, the result is the final root hash. Otherwise it is a non-root
    /// chaining value.
    fn parent_cv(
        left_child: &blake3::Hash,
        right_child: &blake3::Hash,
        is_root: bool,
    ) -> blake3::Hash {
        use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};
        let left_child: ChainingValue = *left_child.as_bytes();
        let right_child: ChainingValue = *right_child.as_bytes();
        if is_root {
            merge_subtrees_root(&left_child, &right_child, Mode::Hash)
        } else {
            blake3::Hash::from(merge_subtrees_non_root(
                &left_child,
                &right_child,
                Mode::Hash,
            ))
        }
    }

    /// Computes the pre-order outboard for `data` into `outboard`, reporting progress.
    ///
    /// The tree geometry (size and block size) is taken from `outboard.tree`, and `data`
    /// must supply that many bytes. Before each leaf is read, its start chunk is sent to
    /// `progress`. Parent hash pairs are written with `outboard.save`. On success,
    /// `outboard.root` is set to the root hash.
    ///
    /// # Errors
    ///
    /// The outer `io::Result` is `Err` if reading `data` (including hitting EOF early) or
    /// saving to the outboard fails. The inner result is `Err` if `progress` rejects an
    /// update. Computation stops there: `outboard.root` is not updated, but the outboard
    /// may already contain some saved parent hashes.
    pub async fn init_outboard<R, W, P>(
        data: R,
        outboard: &mut PreOrderOutboard<W>,
        progress: &mut P,
    ) -> std::io::Result<std::result::Result<(), P::Error>>
    where
        W: WriteAt,
        R: Read,
        P: Sink<ChunkNum>,
    {
        // Read through a large buffer to reduce the number of io ops. There is no point in
        // making it bigger than the data itself.
        let size = usize::try_from(outboard.tree.size()).unwrap_or(usize::MAX);
        let read_buf_size = size.min(1024 * 1024);
        // A leaf holds at most one block. Tiny blobs fit in the inline SmallVec storage.
        let chunk_buf_size = size.min(outboard.tree.block_size().bytes());
        let reader = BufReader::with_capacity(read_buf_size, data);
        let mut buffer = SmallVec::<[u8; 128]>::from_elem(0u8, chunk_buf_size);
        init_impl(outboard.tree, reader, outboard, &mut buffer, progress).await
    }

    /// Walks `tree` in post-order and computes the outboard.
    ///
    /// Each leaf is read from `data` into `buffer` and hashed. Pairs of child hashes are
    /// combined on a stack into parent hashes. `buffer` must be at least as long as the
    /// largest leaf of `tree`.
    async fn init_impl<W, P>(
        tree: BaoTree,
        mut data: impl Read,
        outboard: &mut PreOrderOutboard<W>,
        buffer: &mut [u8],
        progress: &mut P,
    ) -> io::Result<std::result::Result<(), P::Error>>
    where
        W: WriteAt,
        P: Sink<ChunkNum>,
    {
        // Hashes of finished subtrees that are waiting for their parent. The inline
        // capacity avoids a heap allocation for small trees.
        let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
        for item in tree.post_order_chunks_iter() {
            match item {
                BaoChunk::Parent { is_root, node, .. } => {
                    // In post-order both children are emitted before their parent.
                    let right_hash = stack
                        .pop()
                        .expect("post-order yields the right child before its parent");
                    let left_hash = stack
                        .pop()
                        .expect("post-order yields the left child before its parent");
                    outboard.save(node, &(left_hash, right_hash))?;
                    stack.push(parent_cv(&left_hash, &right_hash, is_root));
                }
                BaoChunk::Leaf {
                    size,
                    is_root,
                    start_chunk,
                    ..
                } => {
                    if let Err(err) = progress.send(start_chunk).await {
                        return Ok(Err(err));
                    }
                    let buf = &mut buffer[..size];
                    data.read_exact(buf)?;
                    stack.push(hash_subtree(start_chunk.0, buf, is_root));
                }
            }
        }
        debug_assert_eq!(stack.len(), 1);
        outboard.root = stack
            .pop()
            .expect("the traversal leaves the root hash on the stack");
        Ok(Ok(()))
    }

    #[cfg(test)]
    mod tests {
        use bao_tree::{
            blake3,
            io::{outboard::PreOrderOutboard, sync::CreateOutboard},
            BaoTree,
        };
        use testresult::TestResult;

        use crate::{
            store::{fs::tests::test_data, IROH_BLOCK_SIZE},
            util::{outboard_with_progress::init_outboard, sink::Drain},
        };

        #[tokio::test]
        async fn init_outboard_with_progress() -> TestResult<()> {
            // Sizes around chunk (1 KiB) and block boundaries, plus odd multi-block sizes.
            for size in [
                1,
                1024,
                1024 + 1,
                1024 * 16,
                1024 * 16 + 1,
                1024 * 18 + 1,
                1024 * 100 + 13,
            ] {
                let data = test_data(size);
                let mut o1 = PreOrderOutboard::<Vec<u8>> {
                    tree: BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE),
                    ..Default::default()
                };
                let mut o2 = o1.clone();
                o1.init_from(data.as_ref())?;
                init_outboard(data.as_ref(), &mut o2, &mut Drain).await??;
                assert_eq!(o1.root, blake3::hash(&data));
                assert_eq!(o1.root, o2.root);
                assert_eq!(o1.data, o2.data);
            }
            Ok(())
        }
    }
}
365
366pub(crate) mod sink {
367    use std::future::Future;
368
369    use irpc::RpcMessage;
370
371    /// Our version of a sink, that can be mapped etc.
372    pub trait Sink<Item> {
373        type Error;
374        fn send(
375            &mut self,
376            value: Item,
377        ) -> impl Future<Output = std::result::Result<(), Self::Error>>;
378
379        fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
380        where
381            Self: Sized,
382            F: Fn(Self::Error) -> U + Send + 'static,
383        {
384            WithMapErr { inner: self, f }
385        }
386
387        fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
388        where
389            Self: Sized,
390            F: Fn(U) -> Item + Send + 'static,
391        {
392            WithMap { inner: self, f }
393        }
394    }
395
396    impl<I, T> Sink<T> for &mut I
397    where
398        I: Sink<T>,
399    {
400        type Error = I::Error;
401
402        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
403            (*self).send(value).await
404        }
405    }
406
407    #[allow(dead_code)]
408    pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
409
410    impl<T> Sink<T> for IrpcSenderSink<T>
411    where
412        T: RpcMessage,
413    {
414        type Error = irpc::channel::SendError;
415
416        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
417            self.0.send(value).await
418        }
419    }
420
421    pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);
422
423    impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
424    where
425        T: RpcMessage,
426    {
427        type Error = irpc::channel::SendError;
428
429        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
430            self.0.send(value).await
431        }
432    }
433
434    pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);
435
436    impl<T> Sink<T> for TokioMpscSenderSink<T> {
437        type Error = irpc::channel::SendError;
438
439        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
440            self.0
441                .send(value)
442                .await
443                .map_err(|_| irpc::channel::SendError::ReceiverClosed)
444        }
445    }
446
447    pub struct WithMapErr<P, F> {
448        inner: P,
449        f: F,
450    }
451
452    impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
453    where
454        P: Sink<U>,
455        F: Fn(P::Error) -> E + Send + 'static,
456    {
457        type Error = E;
458
459        async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
460            match self.inner.send(value).await {
461                Ok(()) => Ok(()),
462                Err(err) => {
463                    let err = (self.f)(err);
464                    Err(err)
465                }
466            }
467        }
468    }
469
470    pub struct WithMap<P, F> {
471        inner: P,
472        f: F,
473    }
474
475    impl<P, F, T, U> Sink<T> for WithMap<P, F>
476    where
477        P: Sink<U>,
478        F: Fn(T) -> U + Send + 'static,
479    {
480        type Error = P::Error;
481
482        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
483            self.inner.send((self.f)(value)).await
484        }
485    }
486
487    pub struct Drain;
488
489    impl<T> Sink<T> for Drain {
490        type Error = irpc::channel::SendError;
491
492        async fn send(&mut self, _offset: T) -> std::result::Result<(), Self::Error> {
493            Ok(())
494        }
495    }
496}