1pub(crate) mod channel;
3pub mod connection_pool;
4mod stream;
5pub(crate) mod temp_tag;
6pub use stream::{
7 AsyncReadRecvStream, AsyncReadRecvStreamExtra, AsyncWriteSendStream, AsyncWriteSendStreamExtra,
8 RecvStream, RecvStreamAsyncStreamReader, SendStream,
9};
10pub(crate) use stream::{RecvStreamExt, SendStreamExt};
11
12pub(crate) mod serde {
13 pub mod io_error_serde {
15 use std::{fmt, io};
16
17 use serde::{
18 de::{self, SeqAccess, Visitor},
19 ser::SerializeTuple,
20 Deserializer, Serializer,
21 };
22
/// Maps an [`io::ErrorKind`] to the one-byte code written by `serialize`.
///
/// Kinds without an entry in the table below are reported with the same code
/// as [`io::ErrorKind::Other`] (25). Code 16 is not assigned to any kind.
fn error_kind_to_u8(kind: io::ErrorKind) -> u8 {
    type Kind = io::ErrorKind;

    // Code used for every kind that has no entry in `CODES`.
    const FALLBACK: u8 = 25;

    // (kind, wire code) pairs; the codes must stay in sync with `u8_to_error_kind`.
    const CODES: [(Kind, u8); 38] = [
        (Kind::AddrInUse, 0),
        (Kind::AddrNotAvailable, 1),
        (Kind::AlreadyExists, 2),
        (Kind::ArgumentListTooLong, 3),
        (Kind::BrokenPipe, 4),
        (Kind::ConnectionAborted, 5),
        (Kind::ConnectionRefused, 6),
        (Kind::ConnectionReset, 7),
        (Kind::CrossesDevices, 8),
        (Kind::Deadlock, 9),
        (Kind::DirectoryNotEmpty, 10),
        (Kind::ExecutableFileBusy, 11),
        (Kind::FileTooLarge, 12),
        (Kind::HostUnreachable, 13),
        (Kind::Interrupted, 14),
        (Kind::InvalidData, 15),
        (Kind::InvalidInput, 17),
        (Kind::IsADirectory, 18),
        (Kind::NetworkDown, 19),
        (Kind::NetworkUnreachable, 20),
        (Kind::NotADirectory, 21),
        (Kind::NotConnected, 22),
        (Kind::NotFound, 23),
        (Kind::NotSeekable, 24),
        (Kind::Other, 25),
        (Kind::OutOfMemory, 26),
        (Kind::PermissionDenied, 27),
        (Kind::QuotaExceeded, 28),
        (Kind::ReadOnlyFilesystem, 29),
        (Kind::ResourceBusy, 30),
        (Kind::StaleNetworkFileHandle, 31),
        (Kind::StorageFull, 32),
        (Kind::TimedOut, 33),
        (Kind::TooManyLinks, 34),
        (Kind::UnexpectedEof, 35),
        (Kind::Unsupported, 36),
        (Kind::WouldBlock, 37),
        (Kind::WriteZero, 38),
    ];

    CODES
        .iter()
        .find(|(candidate, _)| *candidate == kind)
        .map_or(FALLBACK, |&(_, code)| code)
}
66
/// Maps a one-byte code back to the [`io::ErrorKind`] it stands for.
///
/// This is the inverse of `error_kind_to_u8`. Code 16 and every code past the
/// end of the table decode as [`io::ErrorKind::Other`].
fn u8_to_error_kind(num: u8) -> io::ErrorKind {
    type Kind = io::ErrorKind;

    // Indexed by wire code; the array length pins the highest assigned code (38).
    const KINDS: [Kind; 39] = [
        Kind::AddrInUse,              // 0
        Kind::AddrNotAvailable,       // 1
        Kind::AlreadyExists,          // 2
        Kind::ArgumentListTooLong,    // 3
        Kind::BrokenPipe,             // 4
        Kind::ConnectionAborted,      // 5
        Kind::ConnectionRefused,      // 6
        Kind::ConnectionReset,        // 7
        Kind::CrossesDevices,         // 8
        Kind::Deadlock,               // 9
        Kind::DirectoryNotEmpty,      // 10
        Kind::ExecutableFileBusy,     // 11
        Kind::FileTooLarge,           // 12
        Kind::HostUnreachable,        // 13
        Kind::Interrupted,            // 14
        Kind::InvalidData,            // 15
        Kind::Other,                  // 16 is unassigned and decodes as `Other`
        Kind::InvalidInput,           // 17
        Kind::IsADirectory,           // 18
        Kind::NetworkDown,            // 19
        Kind::NetworkUnreachable,     // 20
        Kind::NotADirectory,          // 21
        Kind::NotConnected,           // 22
        Kind::NotFound,               // 23
        Kind::NotSeekable,            // 24
        Kind::Other,                  // 25
        Kind::OutOfMemory,            // 26
        Kind::PermissionDenied,       // 27
        Kind::QuotaExceeded,          // 28
        Kind::ReadOnlyFilesystem,     // 29
        Kind::ResourceBusy,           // 30
        Kind::StaleNetworkFileHandle, // 31
        Kind::StorageFull,            // 32
        Kind::TimedOut,               // 33
        Kind::TooManyLinks,           // 34
        Kind::UnexpectedEof,          // 35
        Kind::Unsupported,            // 36
        Kind::WouldBlock,             // 37
        Kind::WriteZero,              // 38
    ];

    KINDS
        .get(usize::from(num))
        .copied()
        .unwrap_or(Kind::Other)
}
111
112 pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
113 where
114 S: Serializer,
115 {
116 let mut tup = serializer.serialize_tuple(2)?;
117 tup.serialize_element(&error_kind_to_u8(error.kind()))?;
118 tup.serialize_element(&error.to_string())?;
119 tup.end()
120 }
121
122 pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
123 where
124 D: Deserializer<'de>,
125 {
126 struct IoErrorVisitor;
127
128 impl<'de> Visitor<'de> for IoErrorVisitor {
129 type Value = io::Error;
130
131 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
132 formatter.write_str("a tuple of (u32, String) representing io::Error")
133 }
134
135 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
136 where
137 A: SeqAccess<'de>,
138 {
139 let num: u8 = seq
140 .next_element()?
141 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
142 let message: String = seq
143 .next_element()?
144 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
145 let kind = u8_to_error_kind(num);
146 Ok(io::Error::new(kind, message))
147 }
148 }
149
150 deserializer.deserialize_tuple(2, IoErrorVisitor)
151 }
152 }
153
#[cfg(test)]
mod tests {
    use std::io::{self, ErrorKind};

    use postcard;
    use serde::{Deserialize, Serialize};

    use super::io_error_serde;

    /// Newtype that routes an `io::Error` through `io_error_serde`.
    #[derive(Serialize, Deserialize)]
    struct TestError(#[serde(with = "io_error_serde")] io::Error);

    /// Every explicitly mapped kind must survive a postcard round trip with
    /// both its kind and its message intact.
    #[test]
    fn test_roundtrip_error_kinds() {
        const MESSAGE: &str = "test error";

        let mapped_kinds = [
            ErrorKind::AddrInUse,
            ErrorKind::AddrNotAvailable,
            ErrorKind::AlreadyExists,
            ErrorKind::ArgumentListTooLong,
            ErrorKind::BrokenPipe,
            ErrorKind::ConnectionAborted,
            ErrorKind::ConnectionRefused,
            ErrorKind::ConnectionReset,
            ErrorKind::CrossesDevices,
            ErrorKind::Deadlock,
            ErrorKind::DirectoryNotEmpty,
            ErrorKind::ExecutableFileBusy,
            ErrorKind::FileTooLarge,
            ErrorKind::HostUnreachable,
            ErrorKind::Interrupted,
            ErrorKind::InvalidData,
            ErrorKind::InvalidInput,
            ErrorKind::IsADirectory,
            ErrorKind::NetworkDown,
            ErrorKind::NetworkUnreachable,
            ErrorKind::NotADirectory,
            ErrorKind::NotConnected,
            ErrorKind::NotFound,
            ErrorKind::NotSeekable,
            ErrorKind::Other,
            ErrorKind::OutOfMemory,
            ErrorKind::PermissionDenied,
            ErrorKind::QuotaExceeded,
            ErrorKind::ReadOnlyFilesystem,
            ErrorKind::ResourceBusy,
            ErrorKind::StaleNetworkFileHandle,
            ErrorKind::StorageFull,
            ErrorKind::TimedOut,
            ErrorKind::TooManyLinks,
            ErrorKind::UnexpectedEof,
            ErrorKind::Unsupported,
            ErrorKind::WouldBlock,
            ErrorKind::WriteZero,
        ];

        for kind in mapped_kinds {
            let original = TestError(io::Error::new(kind, MESSAGE));
            let bytes = postcard::to_allocvec(&original).unwrap();
            let decoded: TestError = postcard::from_bytes(&bytes).unwrap();

            assert_eq!(original.0.kind(), decoded.0.kind());
            assert_eq!(original.0.to_string(), decoded.0.to_string());
        }
    }
}
221}
222
223#[cfg(feature = "fs-store")]
224pub(crate) mod outboard_with_progress {
225 use std::io::{self, BufReader, Read};
226
227 use bao_tree::{
228 blake3,
229 io::{
230 outboard::PreOrderOutboard,
231 sync::{OutboardMut, WriteAt},
232 },
233 iter::BaoChunk,
234 BaoTree, ChunkNum,
235 };
236 use smallvec::SmallVec;
237
238 use super::sink::Sink;
239
240 fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
241 use blake3::hazmat::{ChainingValue, HasherExt};
242 if is_root {
243 debug_assert!(start_chunk == 0);
244 blake3::hash(data)
245 } else {
246 let mut hasher = blake3::Hasher::new();
247 hasher.set_input_offset(start_chunk * 1024);
248 hasher.update(data);
249 let non_root_hash: ChainingValue = hasher.finalize_non_root();
250 blake3::Hash::from(non_root_hash)
251 }
252 }
253
254 fn parent_cv(
255 left_child: &blake3::Hash,
256 right_child: &blake3::Hash,
257 is_root: bool,
258 ) -> blake3::Hash {
259 use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};
260 let left_child: ChainingValue = *left_child.as_bytes();
261 let right_child: ChainingValue = *right_child.as_bytes();
262 if is_root {
263 merge_subtrees_root(&left_child, &right_child, Mode::Hash)
264 } else {
265 blake3::Hash::from(merge_subtrees_non_root(
266 &left_child,
267 &right_child,
268 Mode::Hash,
269 ))
270 }
271 }
272
273 pub async fn init_outboard<R, W, P>(
274 data: R,
275 outboard: &mut PreOrderOutboard<W>,
276 progress: &mut P,
277 ) -> std::io::Result<std::result::Result<(), P::Error>>
278 where
279 W: WriteAt,
280 R: Read,
281 P: Sink<ChunkNum>,
282 {
283 let size = usize::try_from(outboard.tree.size()).unwrap_or(usize::MAX);
286 let read_buf_size = size.min(1024 * 1024);
287 let chunk_buf_size = size.min(outboard.tree.block_size().bytes());
288 let reader = BufReader::with_capacity(read_buf_size, data);
289 let mut buffer = SmallVec::<[u8; 128]>::from_elem(0u8, chunk_buf_size);
290 let res = init_impl(outboard.tree, reader, outboard, &mut buffer, progress).await?;
291 Ok(res)
292 }
293
294 async fn init_impl<W, P>(
295 tree: BaoTree,
296 mut data: impl Read,
297 outboard: &mut PreOrderOutboard<W>,
298 buffer: &mut [u8],
299 progress: &mut P,
300 ) -> io::Result<std::result::Result<(), P::Error>>
301 where
302 W: WriteAt,
303 P: Sink<ChunkNum>,
304 {
305 let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
307 for item in tree.post_order_chunks_iter() {
309 match item {
310 BaoChunk::Parent { is_root, node, .. } => {
311 let right_hash = stack.pop().unwrap();
312 let left_hash = stack.pop().unwrap();
313 outboard.save(node, &(left_hash, right_hash))?;
314 let parent = parent_cv(&left_hash, &right_hash, is_root);
315 stack.push(parent);
316 }
317 BaoChunk::Leaf {
318 size,
319 is_root,
320 start_chunk,
321 ..
322 } => {
323 if let Err(err) = progress.send(start_chunk).await {
324 return Ok(Err(err));
325 }
326 let buf = &mut buffer[..size];
327 data.read_exact(buf)?;
328 let hash = hash_subtree(start_chunk.0, buf, is_root);
329 stack.push(hash);
330 }
331 }
332 }
333 debug_assert_eq!(stack.len(), 1);
334 outboard.root = stack.pop().unwrap();
335 Ok(Ok(()))
336 }
337
#[cfg(test)]
mod tests {
    use bao_tree::{
        blake3,
        io::{outboard::PreOrderOutboard, sync::CreateOutboard},
        BaoTree,
    };
    use testresult::TestResult;

    use crate::{
        store::{fs::tests::test_data, IROH_BLOCK_SIZE},
        util::{outboard_with_progress::init_outboard, sink::Drain},
    };

    /// `init_outboard` must produce the same root and outboard data as
    /// `init_from`, and that root must equal the plain BLAKE3 hash of the data.
    #[tokio::test]
    async fn init_outboard_with_progress() -> TestResult<()> {
        for size in [1024 * 18 + 1] {
            let data = test_data(size);
            let tree = BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE);
            let mut reference = PreOrderOutboard::<Vec<u8>> {
                tree,
                ..Default::default()
            };
            let mut with_progress = reference.clone();

            reference.init_from(data.as_ref())?;
            init_outboard(data.as_ref(), &mut with_progress, &mut Drain).await??;

            assert_eq!(reference.root, blake3::hash(&data));
            assert_eq!(reference.root, with_progress.root);
            assert_eq!(reference.data, with_progress.data);
        }
        Ok(())
    }
}
370}
371
372pub(crate) mod sink {
373 use std::future::Future;
374
375 use irpc::RpcMessage;
376
377 pub trait Sink<Item> {
379 type Error;
380 fn send(
381 &mut self,
382 value: Item,
383 ) -> impl Future<Output = std::result::Result<(), Self::Error>>;
384
385 fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
386 where
387 Self: Sized,
388 F: Fn(Self::Error) -> U + Send + 'static,
389 {
390 WithMapErr { inner: self, f }
391 }
392
393 fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
394 where
395 Self: Sized,
396 F: Fn(U) -> Item + Send + 'static,
397 {
398 WithMap { inner: self, f }
399 }
400 }
401
402 impl<I, T> Sink<T> for &mut I
403 where
404 I: Sink<T>,
405 {
406 type Error = I::Error;
407
408 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
409 (*self).send(value).await
410 }
411 }
412
413 #[allow(dead_code)]
414 pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
415
416 impl<T> Sink<T> for IrpcSenderSink<T>
417 where
418 T: RpcMessage,
419 {
420 type Error = irpc::channel::SendError;
421
422 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
423 self.0.send(value).await
424 }
425 }
426
427 pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);
428
429 impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
430 where
431 T: RpcMessage,
432 {
433 type Error = irpc::channel::SendError;
434
435 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
436 self.0.send(value).await
437 }
438 }
439
440 pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);
441
442 impl<T> Sink<T> for TokioMpscSenderSink<T> {
443 type Error = irpc::channel::SendError;
444
445 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
446 self.0
447 .send(value)
448 .await
449 .map_err(|_| irpc::channel::SendError::ReceiverClosed)
450 }
451 }
452
453 pub struct WithMapErr<P, F> {
454 inner: P,
455 f: F,
456 }
457
458 impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
459 where
460 P: Sink<U>,
461 F: Fn(P::Error) -> E + Send + 'static,
462 {
463 type Error = E;
464
465 async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
466 match self.inner.send(value).await {
467 Ok(()) => Ok(()),
468 Err(err) => {
469 let err = (self.f)(err);
470 Err(err)
471 }
472 }
473 }
474 }
475
476 pub struct WithMap<P, F> {
477 inner: P,
478 f: F,
479 }
480
481 impl<P, F, T, U> Sink<T> for WithMap<P, F>
482 where
483 P: Sink<U>,
484 F: Fn(T) -> U + Send + 'static,
485 {
486 type Error = P::Error;
487
488 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
489 self.inner.send((self.f)(value)).await
490 }
491 }
492
493 pub struct Drain;
494
495 impl<T> Sink<T> for Drain {
496 type Error = irpc::channel::SendError;
497
498 async fn send(&mut self, _offset: T) -> std::result::Result<(), Self::Error> {
499 Ok(())
500 }
501 }
502}