1pub mod async_io;
16pub mod network;
17
18use std::{io::Read, pin::Pin, task::Poll};
19
20use futures::AsyncRead;
21
/// Implemented by values that can report a size estimate up front.
///
/// NOTE(review): judging by the names, the figure is presumably the approximate
/// number of bytes the serialized form occupies — confirm with the implementors.
pub trait Serializable {
    /// Returns this value's size estimate.
    fn estimated_size(&self) -> usize;
}
25
/// A [`Read`] adapter over a borrowed byte slice.
///
/// The reader remembers how many bytes have already been handed out, so
/// successive `read` calls walk through the slice from start to end.
pub struct BytesReader<'a> {
    /// The complete backing slice; it is never modified.
    data: &'a [u8],
    /// Offset into `data` of the next byte to hand out.
    pos: usize,
}

impl BytesReader<'_> {
    /// Creates a reader positioned at the first byte of `data`.
    pub fn new(data: &[u8]) -> BytesReader<'_> {
        BytesReader { data, pos: 0 }
    }
}

impl Read for BytesReader<'_> {
    /// Copies as many unread bytes as fit into `buf` and advances past them.
    ///
    /// Returns the number of bytes copied, which is `0` once the slice is
    /// exhausted or when `buf` is empty. This implementation never fails.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // `get` instead of indexing: an out-of-range offset reads as "nothing
        // left" rather than panicking.
        let remaining = self.data.get(self.pos..).unwrap_or_default();
        let count = remaining.len().min(buf.len());
        // One bulk copy instead of a per-byte, bounds-checked loop.
        buf[..count].copy_from_slice(&remaining[..count]);
        self.pos += count;
        Ok(count)
    }
}
48
/// A [`Read`] implementation that concatenates a list of boxed readers.
///
/// Unlike a statically typed chain of two readers, the list is held in a
/// `Vec`, so its length can be decided at run time and grown with
/// [`DynamicChain::push`].
pub struct DynamicChain<'a> {
    /// The sources, in the order they are read.
    readers: Vec<Box<dyn Read + Send + 'a>>,
}

impl<'a> DynamicChain<'a> {
    /// Builds a chain that reads from `readers` front to back.
    pub fn new(readers: Vec<Box<dyn Read + Send + 'a>>) -> DynamicChain<'a> {
        DynamicChain { readers }
    }

    /// Appends `reader` to the end of the chain.
    ///
    /// The receiver is a plain `&mut self`. Borrowing it as `&'a mut self`
    /// would hold the chain mutably borrowed for its whole lifetime parameter,
    /// leaving it unusable after the first call.
    ///
    /// The `'b` parameter is kept so existing explicit instantiations of this
    /// method continue to compile; together the bounds only require that
    /// `reader` outlives `'a`.
    pub fn push<'b, T>(&mut self, reader: T)
    where
        T: Read + Send + 'b,
        'b: 'a,
    {
        self.readers.push(Box::new(reader));
    }
}

impl Read for DynamicChain<'_> {
    /// Fills `buf` from the chained readers, in order, until `buf` is full or
    /// every reader has returned `0`.
    ///
    /// Each call walks the list from the first reader again, so readers that
    /// already reported end-of-input are asked once more per call.
    ///
    /// # Errors
    ///
    /// The first error from an underlying reader is returned immediately. The
    /// count of bytes copied into `buf` earlier in the same call is not
    /// reported in that case.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut filled = 0;
        for reader in &mut self.readers {
            // Stop asking this reader once the buffer is full, so no
            // zero-length read is ever issued.
            while filled < buf.len() {
                let count = reader.read(&mut buf[filled..])?;
                if count == 0 {
                    // This reader has nothing more right now; try the next one.
                    break;
                }
                filled += count;
            }
            if filled == buf.len() {
                break;
            }
        }
        Ok(filled)
    }
}
85
/// Pairs a reader with a running byte counter and a progress callback.
pub struct ProgressReportingReader<T> {
    /// The wrapped reader.
    inner: T,
    /// Running byte total; starts at zero.
    read: usize,
    /// Progress hook; boxed so the struct is generic only over the reader type.
    callback: Box<dyn Fn(usize) + Send + Sync>,
}

impl<T> ProgressReportingReader<T> {
    /// Wraps `reader`, starting the byte counter at zero and storing
    /// `callback` as the progress hook.
    pub fn new<C>(reader: T, callback: C) -> Self
    where
        C: Fn(usize) + Send + Sync + 'static,
    {
        let callback: Box<dyn Fn(usize) + Send + Sync> = Box::new(callback);
        Self {
            inner: reader,
            read: 0,
            callback,
        }
    }
}
104
105impl<T> AsyncRead for ProgressReportingReader<T>
106where
107 T: AsyncRead + Send + Unpin,
108{
109 fn poll_read(
110 self: Pin<&mut Self>,
111 cx: &mut std::task::Context<'_>,
112 buf: &mut [u8],
113 ) -> Poll<std::io::Result<usize>> {
114 let r = self.get_mut();
115 match Pin::new(&mut r.inner).poll_read(cx, buf) {
116 Poll::Ready(Ok(size)) => {
117 r.read += size;
118 (r.callback)(r.read);
119 Poll::Ready(Ok(size))
120 }
121
122 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
123
124 Poll::Pending => Poll::Pending,
125 }
126 }
127}