ffsend_api/pipe/mod.rs
1//! Provides IO stream pipe functionality.
2//!
3//! This module provides piping functionality for IO streams implementing `Read` or `Write`.
4//! A pipe can be used to wrap an existing reader or writer, monitoring or transforming data read
5//! from or written to it.
6//!
7//! You may use a pipe for archiving (zipping) or to encrypt/decrypt bytes flowing through the
8//! pipe reader or writer.
9
10use bytes::BytesMut;
11use std::cmp::min;
12use std::io::{self, Read, Write};
13
14pub mod crypto;
15pub mod progress;
16mod traits;
17
18// Re-export modules
19pub use progress::{ProgressPipe, ProgressReader, ProgressReporter, ProgressWriter};
20pub use traits::{Pipe, PipeLen, PipeRead, PipeWrite, ReadLen, WriteLen};
21
22/// Prelude for common pipe traits.
23pub mod prelude {
24 pub use super::{crypto::prelude::*, Pipe, PipeLen, PipeRead, PipeWrite, ReadLen, WriteLen};
25}
26
/// Default size of byte buffers, in bytes (8 KiB).
const DEFAULT_BUF_SIZE: usize = 8 << 10;
29
30/// A simple generic reader implementation for a pipe.
31///
32/// This type may be used as simple reader implementation for any pipe.
33/// For better performance it is better to implement a custom reader in some situations.
34///
35/// It is recommended to expose a custom reader type definition for each of your pipes like this:
36///
37/// ```ignore
38/// pub type MyReader = PipeReader<MyPipe>;
39/// ```
40pub struct PipeReader<P>
41where
42 P: Pipe,
43{
44 pipe: P,
45 inner: Box<dyn Read>,
46 buf: BytesMut,
47}
48
49/// A simple generic writer implementation for a pipe.
50///
51/// This type may be used as simple writer implementation for any pipe.
52/// For better performance it is better to implement a custom writer in some situations.
53///
54/// It is recommended to expose a custom writer type definition for each of your pipes like this:
55///
56/// ```ignore
57/// pub type MyWriter = PipeWriter<MyPipe>;
58/// ```
59pub struct PipeWriter<P>
60where
61 P: Pipe,
62{
63 pipe: P,
64 inner: Box<dyn Write>,
65}
66
67impl<P> PipeRead<P> for PipeReader<P>
68where
69 P: Pipe,
70{
71 fn new(pipe: P, inner: Box<dyn Read>) -> Self {
72 Self {
73 pipe,
74 inner,
75 buf: BytesMut::with_capacity(DEFAULT_BUF_SIZE),
76 }
77 }
78}
79
80impl<P> PipeWrite<P> for PipeWriter<P>
81where
82 P: Pipe,
83{
84 fn new(pipe: P, inner: Box<dyn Write>) -> Self {
85 Self { pipe, inner }
86 }
87}
88
89impl<P> Read for PipeReader<P>
90where
91 P: Pipe,
92{
93 fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
94 // Attempt to fill input buffer if has capacity upto default buffer size and output length
95 let capacity = min(DEFAULT_BUF_SIZE, buf.len()) - self.buf.len();
96 if capacity > 0 {
97 // Read from inner to input buffer
98 let mut inner_buf = vec![0u8; capacity];
99 let read = self.inner.read(&mut inner_buf)?;
100 self.buf.extend_from_slice(&inner_buf[..read]);
101
102 // If nothing is read, return the same
103 if read == 0 {
104 return Ok(0);
105 }
106 }
107
108 // Move input buffer into the pipe
109 let (read, out) = self.pipe.pipe(&self.buf);
110 let _ = self.buf.split_to(read);
111
112 // Number of bytes written to given buffer
113 let mut total = 0;
114
115 // Write any pipe output to given buffer and remaining to output buffer
116 if let Some(out) = out {
117 // Copy as much data as possible from pipe output to read buffer
118 let write = min(out.len(), buf.len());
119 total += write;
120 buf[..write].copy_from_slice(&out[..write]);
121
122 // Assert there are no unwritten output bytes
123 assert_eq!(
124 write,
125 out.len(),
126 "failed to write all pipe output bytes to output buffer"
127 );
128
129 // Return if given buffer is full, or slice to unwritten buffer
130 if write == buf.len() {
131 return Ok(total);
132 }
133 buf = &mut buf[write..];
134 }
135
136 // Try again with remaining given buffer
137 self.read(buf).map(|n| n + total)
138 }
139}
140
141impl<P> Write for PipeWriter<P>
142where
143 P: Pipe,
144{
145 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
146 // Transform input data through crypter, write result to inner writer
147 let (read, data) = self.pipe.pipe(buf);
148 if let Some(data) = data {
149 self.inner.write_all(&data)?;
150 }
151
152 Ok(read)
153 }
154
155 fn flush(&mut self) -> io::Result<()> {
156 self.inner.flush()
157 }
158}