iroh_blobs/
util.rs

1//! Utilities
2pub(crate) mod channel;
3pub mod connection_pool;
4mod stream;
5pub(crate) mod temp_tag;
6pub use stream::{
7    AsyncReadRecvStream, AsyncReadRecvStreamExtra, AsyncWriteSendStream, AsyncWriteSendStreamExtra,
8    RecvStream, RecvStreamAsyncStreamReader, SendStream,
9};
10pub(crate) use stream::{RecvStreamExt, SendStreamExt};
11
12pub(crate) mod serde {
13    // Module that handles io::Error serialization/deserialization
14    pub mod io_error_serde {
15        use std::{fmt, io};
16
17        use serde::{
18            de::{self, SeqAccess, Visitor},
19            ser::SerializeTuple,
20            Deserializer, Serializer,
21        };
22
23        fn error_kind_to_u8(kind: io::ErrorKind) -> u8 {
24            match kind {
25                io::ErrorKind::AddrInUse => 0,
26                io::ErrorKind::AddrNotAvailable => 1,
27                io::ErrorKind::AlreadyExists => 2,
28                io::ErrorKind::ArgumentListTooLong => 3,
29                io::ErrorKind::BrokenPipe => 4,
30                io::ErrorKind::ConnectionAborted => 5,
31                io::ErrorKind::ConnectionRefused => 6,
32                io::ErrorKind::ConnectionReset => 7,
33                io::ErrorKind::CrossesDevices => 8,
34                io::ErrorKind::Deadlock => 9,
35                io::ErrorKind::DirectoryNotEmpty => 10,
36                io::ErrorKind::ExecutableFileBusy => 11,
37                io::ErrorKind::FileTooLarge => 12,
38                io::ErrorKind::HostUnreachable => 13,
39                io::ErrorKind::Interrupted => 14,
40                io::ErrorKind::InvalidData => 15,
41                io::ErrorKind::InvalidInput => 17,
42                io::ErrorKind::IsADirectory => 18,
43                io::ErrorKind::NetworkDown => 19,
44                io::ErrorKind::NetworkUnreachable => 20,
45                io::ErrorKind::NotADirectory => 21,
46                io::ErrorKind::NotConnected => 22,
47                io::ErrorKind::NotFound => 23,
48                io::ErrorKind::NotSeekable => 24,
49                io::ErrorKind::Other => 25,
50                io::ErrorKind::OutOfMemory => 26,
51                io::ErrorKind::PermissionDenied => 27,
52                io::ErrorKind::QuotaExceeded => 28,
53                io::ErrorKind::ReadOnlyFilesystem => 29,
54                io::ErrorKind::ResourceBusy => 30,
55                io::ErrorKind::StaleNetworkFileHandle => 31,
56                io::ErrorKind::StorageFull => 32,
57                io::ErrorKind::TimedOut => 33,
58                io::ErrorKind::TooManyLinks => 34,
59                io::ErrorKind::UnexpectedEof => 35,
60                io::ErrorKind::Unsupported => 36,
61                io::ErrorKind::WouldBlock => 37,
62                io::ErrorKind::WriteZero => 38,
63                _ => 25,
64            }
65        }
66
67        fn u8_to_error_kind(num: u8) -> io::ErrorKind {
68            match num {
69                0 => io::ErrorKind::AddrInUse,
70                1 => io::ErrorKind::AddrNotAvailable,
71                2 => io::ErrorKind::AlreadyExists,
72                3 => io::ErrorKind::ArgumentListTooLong,
73                4 => io::ErrorKind::BrokenPipe,
74                5 => io::ErrorKind::ConnectionAborted,
75                6 => io::ErrorKind::ConnectionRefused,
76                7 => io::ErrorKind::ConnectionReset,
77                8 => io::ErrorKind::CrossesDevices,
78                9 => io::ErrorKind::Deadlock,
79                10 => io::ErrorKind::DirectoryNotEmpty,
80                11 => io::ErrorKind::ExecutableFileBusy,
81                12 => io::ErrorKind::FileTooLarge,
82                13 => io::ErrorKind::HostUnreachable,
83                14 => io::ErrorKind::Interrupted,
84                15 => io::ErrorKind::InvalidData,
85                // 16 => io::ErrorKind::InvalidFilename,
86                17 => io::ErrorKind::InvalidInput,
87                18 => io::ErrorKind::IsADirectory,
88                19 => io::ErrorKind::NetworkDown,
89                20 => io::ErrorKind::NetworkUnreachable,
90                21 => io::ErrorKind::NotADirectory,
91                22 => io::ErrorKind::NotConnected,
92                23 => io::ErrorKind::NotFound,
93                24 => io::ErrorKind::NotSeekable,
94                25 => io::ErrorKind::Other,
95                26 => io::ErrorKind::OutOfMemory,
96                27 => io::ErrorKind::PermissionDenied,
97                28 => io::ErrorKind::QuotaExceeded,
98                29 => io::ErrorKind::ReadOnlyFilesystem,
99                30 => io::ErrorKind::ResourceBusy,
100                31 => io::ErrorKind::StaleNetworkFileHandle,
101                32 => io::ErrorKind::StorageFull,
102                33 => io::ErrorKind::TimedOut,
103                34 => io::ErrorKind::TooManyLinks,
104                35 => io::ErrorKind::UnexpectedEof,
105                36 => io::ErrorKind::Unsupported,
106                37 => io::ErrorKind::WouldBlock,
107                38 => io::ErrorKind::WriteZero,
108                _ => io::ErrorKind::Other,
109            }
110        }
111
112        pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
113        where
114            S: Serializer,
115        {
116            let mut tup = serializer.serialize_tuple(2)?;
117            tup.serialize_element(&error_kind_to_u8(error.kind()))?;
118            tup.serialize_element(&error.to_string())?;
119            tup.end()
120        }
121
122        pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
123        where
124            D: Deserializer<'de>,
125        {
126            struct IoErrorVisitor;
127
128            impl<'de> Visitor<'de> for IoErrorVisitor {
129                type Value = io::Error;
130
131                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
132                    formatter.write_str("a tuple of (u32, String) representing io::Error")
133                }
134
135                fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
136                where
137                    A: SeqAccess<'de>,
138                {
139                    let num: u8 = seq
140                        .next_element()?
141                        .ok_or_else(|| de::Error::invalid_length(0, &self))?;
142                    let message: String = seq
143                        .next_element()?
144                        .ok_or_else(|| de::Error::invalid_length(1, &self))?;
145                    let kind = u8_to_error_kind(num);
146                    Ok(io::Error::new(kind, message))
147                }
148            }
149
150            deserializer.deserialize_tuple(2, IoErrorVisitor)
151        }
152    }
153
154    #[cfg(test)]
155    mod tests {
156        use std::io::{self, ErrorKind};
157
158        use postcard;
159        use serde::{Deserialize, Serialize};
160
161        use super::io_error_serde;
162
163        #[derive(Serialize, Deserialize)]
164        struct TestError(#[serde(with = "io_error_serde")] io::Error);
165
166        #[test]
167        fn test_roundtrip_error_kinds() {
168            let message = "test error";
169            let kinds = [
170                ErrorKind::AddrInUse,
171                ErrorKind::AddrNotAvailable,
172                ErrorKind::AlreadyExists,
173                ErrorKind::ArgumentListTooLong,
174                ErrorKind::BrokenPipe,
175                ErrorKind::ConnectionAborted,
176                ErrorKind::ConnectionRefused,
177                ErrorKind::ConnectionReset,
178                ErrorKind::CrossesDevices,
179                ErrorKind::Deadlock,
180                ErrorKind::DirectoryNotEmpty,
181                ErrorKind::ExecutableFileBusy,
182                ErrorKind::FileTooLarge,
183                ErrorKind::HostUnreachable,
184                ErrorKind::Interrupted,
185                ErrorKind::InvalidData,
186                // ErrorKind::InvalidFilename,
187                ErrorKind::InvalidInput,
188                ErrorKind::IsADirectory,
189                ErrorKind::NetworkDown,
190                ErrorKind::NetworkUnreachable,
191                ErrorKind::NotADirectory,
192                ErrorKind::NotConnected,
193                ErrorKind::NotFound,
194                ErrorKind::NotSeekable,
195                ErrorKind::Other,
196                ErrorKind::OutOfMemory,
197                ErrorKind::PermissionDenied,
198                ErrorKind::QuotaExceeded,
199                ErrorKind::ReadOnlyFilesystem,
200                ErrorKind::ResourceBusy,
201                ErrorKind::StaleNetworkFileHandle,
202                ErrorKind::StorageFull,
203                ErrorKind::TimedOut,
204                ErrorKind::TooManyLinks,
205                ErrorKind::UnexpectedEof,
206                ErrorKind::Unsupported,
207                ErrorKind::WouldBlock,
208                ErrorKind::WriteZero,
209            ];
210
211            for kind in kinds {
212                let err = TestError(io::Error::new(kind, message));
213                let serialized = postcard::to_allocvec(&err).unwrap();
214                let deserialized: TestError = postcard::from_bytes(&serialized).unwrap();
215
216                assert_eq!(err.0.kind(), deserialized.0.kind());
217                assert_eq!(err.0.to_string(), deserialized.0.to_string());
218            }
219        }
220    }
221}
222
#[cfg(feature = "fs-store")]
pub(crate) mod outboard_with_progress {
    //! Computing a pre-order bao outboard while reporting per-leaf progress.
    use std::io::{self, BufReader, Read};

    use bao_tree::{
        blake3,
        io::{
            outboard::PreOrderOutboard,
            sync::{OutboardMut, WriteAt},
        },
        iter::BaoChunk,
        BaoTree, ChunkNum,
    };
    use smallvec::SmallVec;

    use super::sink::Sink;

    /// Hashes one leaf (`data`) that starts at chunk `start_chunk`.
    ///
    /// If `is_root` is set, the leaf is the whole tree and the regular blake3 hash is
    /// returned. Otherwise the non-root chaining value is computed at the leaf's
    /// byte offset (`start_chunk * 1024`).
    fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
        use blake3::hazmat::{ChainingValue, HasherExt};
        if is_root {
            debug_assert!(start_chunk == 0);
            blake3::hash(data)
        } else {
            let mut hasher = blake3::Hasher::new();
            hasher.set_input_offset(start_chunk * 1024);
            hasher.update(data);
            let non_root_hash: ChainingValue = hasher.finalize_non_root();
            blake3::Hash::from(non_root_hash)
        }
    }

    /// Combines two child chaining values into their parent's.
    ///
    /// When `is_root` is set, the result is the final root hash of the tree.
    fn parent_cv(
        left_child: &blake3::Hash,
        right_child: &blake3::Hash,
        is_root: bool,
    ) -> blake3::Hash {
        use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};
        let left_child: ChainingValue = *left_child.as_bytes();
        let right_child: ChainingValue = *right_child.as_bytes();
        if is_root {
            merge_subtrees_root(&left_child, &right_child, Mode::Hash)
        } else {
            blake3::Hash::from(merge_subtrees_non_root(
                &left_child,
                &right_child,
                Mode::Hash,
            ))
        }
    }

    /// Hashes `data` and writes the resulting pre-order outboard into `outboard`.
    ///
    /// The start chunk of every leaf is sent to `progress` before that leaf is read.
    /// `outboard.tree` must already describe the size of `data`. On success,
    /// `outboard.root` holds the root hash.
    ///
    /// Note: although this is `async`, reading `data` and writing the outboard are
    /// blocking operations. Only the progress sends are awaited.
    ///
    /// # Errors
    ///
    /// The outer `io::Result` is `Err` if reading `data` or writing the outboard fails.
    /// This includes `data` being shorter than `outboard.tree.size()`.
    /// The inner `Result` is `Err` if `progress` rejected an item; hashing stops at
    /// that point and `outboard.root` is left untouched.
    pub async fn init_outboard<R, W, P>(
        data: R,
        outboard: &mut PreOrderOutboard<W>,
        progress: &mut P,
    ) -> io::Result<std::result::Result<(), P::Error>>
    where
        W: WriteAt,
        R: Read,
        P: Sink<ChunkNum>,
    {
        let size = usize::try_from(outboard.tree.size()).unwrap_or(usize::MAX);
        // Wrap the reader in a buffered reader so we read in large chunks (up to
        // 1 MiB), which reduces the number of io ops; small data gets a small buffer.
        let read_buf_size = size.min(1024 * 1024);
        // Each leaf is read into this buffer in one go, so it must hold one block.
        let chunk_buf_size = size.min(outboard.tree.block_size().bytes());
        let reader = BufReader::with_capacity(read_buf_size, data);
        let mut buffer = SmallVec::<[u8; 128]>::from_elem(0u8, chunk_buf_size);
        init_impl(outboard.tree, reader, outboard, &mut buffer, progress).await
    }

    /// Walks `tree` in post order. Leaves read from `data` are hashed, and each
    /// pair of child hashes is saved into `outboard` and merged into the parent hash.
    ///
    /// `buffer` must be at least as large as the largest leaf of `tree`.
    async fn init_impl<W, P>(
        tree: BaoTree,
        mut data: impl Read,
        outboard: &mut PreOrderOutboard<W>,
        buffer: &mut [u8],
        progress: &mut P,
    ) -> io::Result<std::result::Result<(), P::Error>>
    where
        W: WriteAt,
        P: Sink<ChunkNum>,
    {
        // Hashes of subtrees whose parent has not been visited yet.
        // The inline capacity avoids allocating for small trees.
        let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
        for item in tree.post_order_chunks_iter() {
            match item {
                BaoChunk::Parent { is_root, node, .. } => {
                    // Post order yields both children before their parent.
                    let right_hash = stack.pop().expect("right child before parent");
                    let left_hash = stack.pop().expect("left child before parent");
                    outboard.save(node, &(left_hash, right_hash))?;
                    stack.push(parent_cv(&left_hash, &right_hash, is_root));
                }
                BaoChunk::Leaf {
                    size,
                    is_root,
                    start_chunk,
                    ..
                } => {
                    if let Err(err) = progress.send(start_chunk).await {
                        return Ok(Err(err));
                    }
                    let buf = &mut buffer[..size];
                    data.read_exact(buf)?;
                    stack.push(hash_subtree(start_chunk.0, buf, is_root));
                }
            }
        }
        debug_assert_eq!(stack.len(), 1);
        outboard.root = stack.pop().expect("post order ends with the root");
        Ok(Ok(()))
    }

    #[cfg(test)]
    mod tests {
        use bao_tree::{
            blake3,
            io::{outboard::PreOrderOutboard, sync::CreateOutboard},
            BaoTree,
        };
        use testresult::TestResult;

        use crate::{
            store::{fs::tests::test_data, IROH_BLOCK_SIZE},
            util::{outboard_with_progress::init_outboard, sink::Drain},
        };

        /// `init_outboard` must produce the same root and outboard bytes as
        /// bao_tree's own `init_from`.
        #[tokio::test]
        async fn init_outboard_with_progress() -> TestResult<()> {
            // Sub-chunk, chunk-aligned and multi-chunk sizes, with and without a
            // partial last chunk (1 chunk = 1024 bytes).
            for size in [1, 1024, 1024 * 16, 1024 * 16 + 1, 1024 * 18 + 1, 1024 * 40 + 7] {
                let data = test_data(size);
                let mut o1 = PreOrderOutboard::<Vec<u8>> {
                    tree: BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE),
                    ..Default::default()
                };
                let mut o2 = o1.clone();
                o1.init_from(data.as_ref())?;
                init_outboard(data.as_ref(), &mut o2, &mut Drain).await??;
                assert_eq!(o1.root, blake3::hash(&data));
                assert_eq!(o1.root, o2.root);
                assert_eq!(o1.data, o2.data);
            }
            Ok(())
        }
    }
}
371
372pub(crate) mod sink {
373    use std::future::Future;
374
375    use irpc::RpcMessage;
376
377    /// Our version of a sink, that can be mapped etc.
378    pub trait Sink<Item> {
379        type Error;
380        fn send(
381            &mut self,
382            value: Item,
383        ) -> impl Future<Output = std::result::Result<(), Self::Error>>;
384
385        fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
386        where
387            Self: Sized,
388            F: Fn(Self::Error) -> U + Send + 'static,
389        {
390            WithMapErr { inner: self, f }
391        }
392
393        fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
394        where
395            Self: Sized,
396            F: Fn(U) -> Item + Send + 'static,
397        {
398            WithMap { inner: self, f }
399        }
400    }
401
402    impl<I, T> Sink<T> for &mut I
403    where
404        I: Sink<T>,
405    {
406        type Error = I::Error;
407
408        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
409            (*self).send(value).await
410        }
411    }
412
413    #[allow(dead_code)]
414    pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
415
416    impl<T> Sink<T> for IrpcSenderSink<T>
417    where
418        T: RpcMessage,
419    {
420        type Error = irpc::channel::SendError;
421
422        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
423            self.0.send(value).await
424        }
425    }
426
427    pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);
428
429    impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
430    where
431        T: RpcMessage,
432    {
433        type Error = irpc::channel::SendError;
434
435        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
436            self.0.send(value).await
437        }
438    }
439
440    pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);
441
442    impl<T> Sink<T> for TokioMpscSenderSink<T> {
443        type Error = irpc::channel::SendError;
444
445        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
446            self.0
447                .send(value)
448                .await
449                .map_err(|_| irpc::channel::SendError::ReceiverClosed)
450        }
451    }
452
453    pub struct WithMapErr<P, F> {
454        inner: P,
455        f: F,
456    }
457
458    impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
459    where
460        P: Sink<U>,
461        F: Fn(P::Error) -> E + Send + 'static,
462    {
463        type Error = E;
464
465        async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
466            match self.inner.send(value).await {
467                Ok(()) => Ok(()),
468                Err(err) => {
469                    let err = (self.f)(err);
470                    Err(err)
471                }
472            }
473        }
474    }
475
476    pub struct WithMap<P, F> {
477        inner: P,
478        f: F,
479    }
480
481    impl<P, F, T, U> Sink<T> for WithMap<P, F>
482    where
483        P: Sink<U>,
484        F: Fn(T) -> U + Send + 'static,
485    {
486        type Error = P::Error;
487
488        async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
489            self.inner.send((self.f)(value)).await
490        }
491    }
492
493    pub struct Drain;
494
495    impl<T> Sink<T> for Drain {
496        type Error = irpc::channel::SendError;
497
498        async fn send(&mut self, _offset: T) -> std::result::Result<(), Self::Error> {
499            Ok(())
500        }
501    }
502}