1pub(crate) mod channel;
3pub mod connection_pool;
4pub(crate) mod temp_tag;
5
6pub(crate) mod serde {
7 pub mod io_error_serde {
9 use std::{fmt, io};
10
11 use serde::{
12 de::{self, SeqAccess, Visitor},
13 ser::SerializeTuple,
14 Deserializer, Serializer,
15 };
16
17 fn error_kind_to_u8(kind: io::ErrorKind) -> u8 {
18 match kind {
19 io::ErrorKind::AddrInUse => 0,
20 io::ErrorKind::AddrNotAvailable => 1,
21 io::ErrorKind::AlreadyExists => 2,
22 io::ErrorKind::ArgumentListTooLong => 3,
23 io::ErrorKind::BrokenPipe => 4,
24 io::ErrorKind::ConnectionAborted => 5,
25 io::ErrorKind::ConnectionRefused => 6,
26 io::ErrorKind::ConnectionReset => 7,
27 io::ErrorKind::CrossesDevices => 8,
28 io::ErrorKind::Deadlock => 9,
29 io::ErrorKind::DirectoryNotEmpty => 10,
30 io::ErrorKind::ExecutableFileBusy => 11,
31 io::ErrorKind::FileTooLarge => 12,
32 io::ErrorKind::HostUnreachable => 13,
33 io::ErrorKind::Interrupted => 14,
34 io::ErrorKind::InvalidData => 15,
35 io::ErrorKind::InvalidInput => 17,
36 io::ErrorKind::IsADirectory => 18,
37 io::ErrorKind::NetworkDown => 19,
38 io::ErrorKind::NetworkUnreachable => 20,
39 io::ErrorKind::NotADirectory => 21,
40 io::ErrorKind::NotConnected => 22,
41 io::ErrorKind::NotFound => 23,
42 io::ErrorKind::NotSeekable => 24,
43 io::ErrorKind::Other => 25,
44 io::ErrorKind::OutOfMemory => 26,
45 io::ErrorKind::PermissionDenied => 27,
46 io::ErrorKind::QuotaExceeded => 28,
47 io::ErrorKind::ReadOnlyFilesystem => 29,
48 io::ErrorKind::ResourceBusy => 30,
49 io::ErrorKind::StaleNetworkFileHandle => 31,
50 io::ErrorKind::StorageFull => 32,
51 io::ErrorKind::TimedOut => 33,
52 io::ErrorKind::TooManyLinks => 34,
53 io::ErrorKind::UnexpectedEof => 35,
54 io::ErrorKind::Unsupported => 36,
55 io::ErrorKind::WouldBlock => 37,
56 io::ErrorKind::WriteZero => 38,
57 _ => 25,
58 }
59 }
60
61 fn u8_to_error_kind(num: u8) -> io::ErrorKind {
62 match num {
63 0 => io::ErrorKind::AddrInUse,
64 1 => io::ErrorKind::AddrNotAvailable,
65 2 => io::ErrorKind::AlreadyExists,
66 3 => io::ErrorKind::ArgumentListTooLong,
67 4 => io::ErrorKind::BrokenPipe,
68 5 => io::ErrorKind::ConnectionAborted,
69 6 => io::ErrorKind::ConnectionRefused,
70 7 => io::ErrorKind::ConnectionReset,
71 8 => io::ErrorKind::CrossesDevices,
72 9 => io::ErrorKind::Deadlock,
73 10 => io::ErrorKind::DirectoryNotEmpty,
74 11 => io::ErrorKind::ExecutableFileBusy,
75 12 => io::ErrorKind::FileTooLarge,
76 13 => io::ErrorKind::HostUnreachable,
77 14 => io::ErrorKind::Interrupted,
78 15 => io::ErrorKind::InvalidData,
79 17 => io::ErrorKind::InvalidInput,
81 18 => io::ErrorKind::IsADirectory,
82 19 => io::ErrorKind::NetworkDown,
83 20 => io::ErrorKind::NetworkUnreachable,
84 21 => io::ErrorKind::NotADirectory,
85 22 => io::ErrorKind::NotConnected,
86 23 => io::ErrorKind::NotFound,
87 24 => io::ErrorKind::NotSeekable,
88 25 => io::ErrorKind::Other,
89 26 => io::ErrorKind::OutOfMemory,
90 27 => io::ErrorKind::PermissionDenied,
91 28 => io::ErrorKind::QuotaExceeded,
92 29 => io::ErrorKind::ReadOnlyFilesystem,
93 30 => io::ErrorKind::ResourceBusy,
94 31 => io::ErrorKind::StaleNetworkFileHandle,
95 32 => io::ErrorKind::StorageFull,
96 33 => io::ErrorKind::TimedOut,
97 34 => io::ErrorKind::TooManyLinks,
98 35 => io::ErrorKind::UnexpectedEof,
99 36 => io::ErrorKind::Unsupported,
100 37 => io::ErrorKind::WouldBlock,
101 38 => io::ErrorKind::WriteZero,
102 _ => io::ErrorKind::Other,
103 }
104 }
105
106 pub fn serialize<S>(error: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
107 where
108 S: Serializer,
109 {
110 let mut tup = serializer.serialize_tuple(2)?;
111 tup.serialize_element(&error_kind_to_u8(error.kind()))?;
112 tup.serialize_element(&error.to_string())?;
113 tup.end()
114 }
115
116 pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
117 where
118 D: Deserializer<'de>,
119 {
120 struct IoErrorVisitor;
121
122 impl<'de> Visitor<'de> for IoErrorVisitor {
123 type Value = io::Error;
124
125 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
126 formatter.write_str("a tuple of (u32, String) representing io::Error")
127 }
128
129 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
130 where
131 A: SeqAccess<'de>,
132 {
133 let num: u8 = seq
134 .next_element()?
135 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
136 let message: String = seq
137 .next_element()?
138 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
139 let kind = u8_to_error_kind(num);
140 Ok(io::Error::new(kind, message))
141 }
142 }
143
144 deserializer.deserialize_tuple(2, IoErrorVisitor)
145 }
146 }
147
148 #[cfg(test)]
149 mod tests {
150 use std::io::{self, ErrorKind};
151
152 use postcard;
153 use serde::{Deserialize, Serialize};
154
155 use super::io_error_serde;
156
157 #[derive(Serialize, Deserialize)]
158 struct TestError(#[serde(with = "io_error_serde")] io::Error);
159
160 #[test]
161 fn test_roundtrip_error_kinds() {
162 let message = "test error";
163 let kinds = [
164 ErrorKind::AddrInUse,
165 ErrorKind::AddrNotAvailable,
166 ErrorKind::AlreadyExists,
167 ErrorKind::ArgumentListTooLong,
168 ErrorKind::BrokenPipe,
169 ErrorKind::ConnectionAborted,
170 ErrorKind::ConnectionRefused,
171 ErrorKind::ConnectionReset,
172 ErrorKind::CrossesDevices,
173 ErrorKind::Deadlock,
174 ErrorKind::DirectoryNotEmpty,
175 ErrorKind::ExecutableFileBusy,
176 ErrorKind::FileTooLarge,
177 ErrorKind::HostUnreachable,
178 ErrorKind::Interrupted,
179 ErrorKind::InvalidData,
180 ErrorKind::InvalidInput,
182 ErrorKind::IsADirectory,
183 ErrorKind::NetworkDown,
184 ErrorKind::NetworkUnreachable,
185 ErrorKind::NotADirectory,
186 ErrorKind::NotConnected,
187 ErrorKind::NotFound,
188 ErrorKind::NotSeekable,
189 ErrorKind::Other,
190 ErrorKind::OutOfMemory,
191 ErrorKind::PermissionDenied,
192 ErrorKind::QuotaExceeded,
193 ErrorKind::ReadOnlyFilesystem,
194 ErrorKind::ResourceBusy,
195 ErrorKind::StaleNetworkFileHandle,
196 ErrorKind::StorageFull,
197 ErrorKind::TimedOut,
198 ErrorKind::TooManyLinks,
199 ErrorKind::UnexpectedEof,
200 ErrorKind::Unsupported,
201 ErrorKind::WouldBlock,
202 ErrorKind::WriteZero,
203 ];
204
205 for kind in kinds {
206 let err = TestError(io::Error::new(kind, message));
207 let serialized = postcard::to_allocvec(&err).unwrap();
208 let deserialized: TestError = postcard::from_bytes(&serialized).unwrap();
209
210 assert_eq!(err.0.kind(), deserialized.0.kind());
211 assert_eq!(err.0.to_string(), deserialized.0.to_string());
212 }
213 }
214 }
215}
216
217#[cfg(feature = "fs-store")]
218pub(crate) mod outboard_with_progress {
219 use std::io::{self, BufReader, Read};
220
221 use bao_tree::{
222 blake3,
223 io::{
224 outboard::PreOrderOutboard,
225 sync::{OutboardMut, WriteAt},
226 },
227 iter::BaoChunk,
228 BaoTree, ChunkNum,
229 };
230 use smallvec::SmallVec;
231
232 use super::sink::Sink;
233
234 fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash {
235 use blake3::hazmat::{ChainingValue, HasherExt};
236 if is_root {
237 debug_assert!(start_chunk == 0);
238 blake3::hash(data)
239 } else {
240 let mut hasher = blake3::Hasher::new();
241 hasher.set_input_offset(start_chunk * 1024);
242 hasher.update(data);
243 let non_root_hash: ChainingValue = hasher.finalize_non_root();
244 blake3::Hash::from(non_root_hash)
245 }
246 }
247
248 fn parent_cv(
249 left_child: &blake3::Hash,
250 right_child: &blake3::Hash,
251 is_root: bool,
252 ) -> blake3::Hash {
253 use blake3::hazmat::{merge_subtrees_non_root, merge_subtrees_root, ChainingValue, Mode};
254 let left_child: ChainingValue = *left_child.as_bytes();
255 let right_child: ChainingValue = *right_child.as_bytes();
256 if is_root {
257 merge_subtrees_root(&left_child, &right_child, Mode::Hash)
258 } else {
259 blake3::Hash::from(merge_subtrees_non_root(
260 &left_child,
261 &right_child,
262 Mode::Hash,
263 ))
264 }
265 }
266
267 pub async fn init_outboard<R, W, P>(
268 data: R,
269 outboard: &mut PreOrderOutboard<W>,
270 progress: &mut P,
271 ) -> std::io::Result<std::result::Result<(), P::Error>>
272 where
273 W: WriteAt,
274 R: Read,
275 P: Sink<ChunkNum>,
276 {
277 let size = usize::try_from(outboard.tree.size()).unwrap_or(usize::MAX);
280 let read_buf_size = size.min(1024 * 1024);
281 let chunk_buf_size = size.min(outboard.tree.block_size().bytes());
282 let reader = BufReader::with_capacity(read_buf_size, data);
283 let mut buffer = SmallVec::<[u8; 128]>::from_elem(0u8, chunk_buf_size);
284 let res = init_impl(outboard.tree, reader, outboard, &mut buffer, progress).await?;
285 Ok(res)
286 }
287
288 async fn init_impl<W, P>(
289 tree: BaoTree,
290 mut data: impl Read,
291 outboard: &mut PreOrderOutboard<W>,
292 buffer: &mut [u8],
293 progress: &mut P,
294 ) -> io::Result<std::result::Result<(), P::Error>>
295 where
296 W: WriteAt,
297 P: Sink<ChunkNum>,
298 {
299 let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
301 for item in tree.post_order_chunks_iter() {
303 match item {
304 BaoChunk::Parent { is_root, node, .. } => {
305 let right_hash = stack.pop().unwrap();
306 let left_hash = stack.pop().unwrap();
307 outboard.save(node, &(left_hash, right_hash))?;
308 let parent = parent_cv(&left_hash, &right_hash, is_root);
309 stack.push(parent);
310 }
311 BaoChunk::Leaf {
312 size,
313 is_root,
314 start_chunk,
315 ..
316 } => {
317 if let Err(err) = progress.send(start_chunk).await {
318 return Ok(Err(err));
319 }
320 let buf = &mut buffer[..size];
321 data.read_exact(buf)?;
322 let hash = hash_subtree(start_chunk.0, buf, is_root);
323 stack.push(hash);
324 }
325 }
326 }
327 debug_assert_eq!(stack.len(), 1);
328 outboard.root = stack.pop().unwrap();
329 Ok(Ok(()))
330 }
331
332 #[cfg(test)]
333 mod tests {
334 use bao_tree::{
335 blake3,
336 io::{outboard::PreOrderOutboard, sync::CreateOutboard},
337 BaoTree,
338 };
339 use testresult::TestResult;
340
341 use crate::{
342 store::{fs::tests::test_data, IROH_BLOCK_SIZE},
343 util::{outboard_with_progress::init_outboard, sink::Drain},
344 };
345
346 #[tokio::test]
347 async fn init_outboard_with_progress() -> TestResult<()> {
348 for size in [1024 * 18 + 1] {
349 let data = test_data(size);
350 let mut o1 = PreOrderOutboard::<Vec<u8>> {
351 tree: BaoTree::new(data.len() as u64, IROH_BLOCK_SIZE),
352 ..Default::default()
353 };
354 let mut o2 = o1.clone();
355 o1.init_from(data.as_ref())?;
356 init_outboard(data.as_ref(), &mut o2, &mut Drain).await??;
357 assert_eq!(o1.root, blake3::hash(&data));
358 assert_eq!(o1.root, o2.root);
359 assert_eq!(o1.data, o2.data);
360 }
361 Ok(())
362 }
363 }
364}
365
366pub(crate) mod sink {
367 use std::future::Future;
368
369 use irpc::RpcMessage;
370
    /// An asynchronous consumer of values of type `Item`.
    ///
    /// Implementors accept one value at a time through [`Sink::send`] and report
    /// failures through the associated [`Sink::Error`] type.
    pub trait Sink<Item> {
        /// Error returned when a value cannot be sent.
        type Error;

        /// Sends one value into the sink.
        ///
        /// The returned future resolves to `Ok(())` on success or to
        /// `Err(Self::Error)` on failure.
        fn send(
            &mut self,
            value: Item,
        ) -> impl Future<Output = std::result::Result<(), Self::Error>>;

        /// Wraps this sink in a [`WithMapErr`] that stores `f` alongside it.
        ///
        /// `f` takes this sink's error type and returns a `U`.
        fn with_map_err<F, U>(self, f: F) -> WithMapErr<Self, F>
        where
            Self: Sized,
            F: Fn(Self::Error) -> U + Send + 'static,
        {
            WithMapErr { inner: self, f }
        }

        /// Wraps this sink in a [`WithMap`] that stores `f` alongside it.
        ///
        /// `f` takes a `U` and returns an `Item`, the type this sink accepts.
        fn with_map<F, U>(self, f: F) -> WithMap<Self, F>
        where
            Self: Sized,
            F: Fn(U) -> Item + Send + 'static,
        {
            WithMap { inner: self, f }
        }
    }
395
396 impl<I, T> Sink<T> for &mut I
397 where
398 I: Sink<T>,
399 {
400 type Error = I::Error;
401
402 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
403 (*self).send(value).await
404 }
405 }
406
407 #[allow(dead_code)]
408 pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
409
410 impl<T> Sink<T> for IrpcSenderSink<T>
411 where
412 T: RpcMessage,
413 {
414 type Error = irpc::channel::SendError;
415
416 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
417 self.0.send(value).await
418 }
419 }
420
421 pub struct IrpcSenderRefSink<'a, T>(pub &'a mut irpc::channel::mpsc::Sender<T>);
422
423 impl<'a, T> Sink<T> for IrpcSenderRefSink<'a, T>
424 where
425 T: RpcMessage,
426 {
427 type Error = irpc::channel::SendError;
428
429 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
430 self.0.send(value).await
431 }
432 }
433
434 pub struct TokioMpscSenderSink<T>(pub tokio::sync::mpsc::Sender<T>);
435
436 impl<T> Sink<T> for TokioMpscSenderSink<T> {
437 type Error = irpc::channel::SendError;
438
439 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
440 self.0
441 .send(value)
442 .await
443 .map_err(|_| irpc::channel::SendError::ReceiverClosed)
444 }
445 }
446
447 pub struct WithMapErr<P, F> {
448 inner: P,
449 f: F,
450 }
451
452 impl<P, F, E, U> Sink<U> for WithMapErr<P, F>
453 where
454 P: Sink<U>,
455 F: Fn(P::Error) -> E + Send + 'static,
456 {
457 type Error = E;
458
459 async fn send(&mut self, value: U) -> std::result::Result<(), Self::Error> {
460 match self.inner.send(value).await {
461 Ok(()) => Ok(()),
462 Err(err) => {
463 let err = (self.f)(err);
464 Err(err)
465 }
466 }
467 }
468 }
469
470 pub struct WithMap<P, F> {
471 inner: P,
472 f: F,
473 }
474
475 impl<P, F, T, U> Sink<T> for WithMap<P, F>
476 where
477 P: Sink<U>,
478 F: Fn(T) -> U + Send + 'static,
479 {
480 type Error = P::Error;
481
482 async fn send(&mut self, value: T) -> std::result::Result<(), Self::Error> {
483 self.inner.send((self.f)(value)).await
484 }
485 }
486
487 pub struct Drain;
488
489 impl<T> Sink<T> for Drain {
490 type Error = irpc::channel::SendError;
491
492 async fn send(&mut self, _offset: T) -> std::result::Result<(), Self::Error> {
493 Ok(())
494 }
495 }
496}