ffsend_api/pipe/
mod.rs

1//! Provides IO stream pipe functionality.
2//!
3//! This module provides piping functionality for IO streams implementing `Read` or `Write`.
4//! A pipe can be used to wrap an existing reader or writer, monitoring or transforming data read
5//! from or written to it.
6//!
7//! You may use a pipe for archiving (zipping) or to encrypt/decrypt bytes flowing through the
8//! pipe reader or writer.
9
10use bytes::BytesMut;
11use std::cmp::min;
12use std::io::{self, Read, Write};
13
14pub mod crypto;
15pub mod progress;
16mod traits;
17
18// Re-export modules
19pub use progress::{ProgressPipe, ProgressReader, ProgressReporter, ProgressWriter};
20pub use traits::{Pipe, PipeLen, PipeRead, PipeWrite, ReadLen, WriteLen};
21
22/// Prelude for common pipe traits.
23pub mod prelude {
24    pub use super::{crypto::prelude::*, Pipe, PipeLen, PipeRead, PipeWrite, ReadLen, WriteLen};
25}
26
/// Default size of byte buffers, in bytes (8 KiB).
const DEFAULT_BUF_SIZE: usize = 8192;
29
30/// A simple generic reader implementation for a pipe.
31///
32/// This type may be used as simple reader implementation for any pipe.
33/// For better performance it is better to implement a custom reader in some situations.
34///
35/// It is recommended to expose a custom reader type definition for each of your pipes like this:
36///
37/// ```ignore
38/// pub type MyReader = PipeReader<MyPipe>;
39/// ```
40pub struct PipeReader<P>
41where
42    P: Pipe,
43{
44    pipe: P,
45    inner: Box<dyn Read>,
46    buf: BytesMut,
47}
48
49/// A simple generic writer implementation for a pipe.
50///
51/// This type may be used as simple writer implementation for any pipe.
52/// For better performance it is better to implement a custom writer in some situations.
53///
54/// It is recommended to expose a custom writer type definition for each of your pipes like this:
55///
56/// ```ignore
57/// pub type MyWriter = PipeWriter<MyPipe>;
58/// ```
59pub struct PipeWriter<P>
60where
61    P: Pipe,
62{
63    pipe: P,
64    inner: Box<dyn Write>,
65}
66
67impl<P> PipeRead<P> for PipeReader<P>
68where
69    P: Pipe,
70{
71    fn new(pipe: P, inner: Box<dyn Read>) -> Self {
72        Self {
73            pipe,
74            inner,
75            buf: BytesMut::with_capacity(DEFAULT_BUF_SIZE),
76        }
77    }
78}
79
80impl<P> PipeWrite<P> for PipeWriter<P>
81where
82    P: Pipe,
83{
84    fn new(pipe: P, inner: Box<dyn Write>) -> Self {
85        Self { pipe, inner }
86    }
87}
88
89impl<P> Read for PipeReader<P>
90where
91    P: Pipe,
92{
93    fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
94        // Attempt to fill input buffer if has capacity upto default buffer size and output length
95        let capacity = min(DEFAULT_BUF_SIZE, buf.len()) - self.buf.len();
96        if capacity > 0 {
97            // Read from inner to input buffer
98            let mut inner_buf = vec![0u8; capacity];
99            let read = self.inner.read(&mut inner_buf)?;
100            self.buf.extend_from_slice(&inner_buf[..read]);
101
102            // If nothing is read, return the same
103            if read == 0 {
104                return Ok(0);
105            }
106        }
107
108        // Move input buffer into the pipe
109        let (read, out) = self.pipe.pipe(&self.buf);
110        let _ = self.buf.split_to(read);
111
112        // Number of bytes written to given buffer
113        let mut total = 0;
114
115        // Write any pipe output to given buffer and remaining to output buffer
116        if let Some(out) = out {
117            // Copy as much data as possible from pipe output to read buffer
118            let write = min(out.len(), buf.len());
119            total += write;
120            buf[..write].copy_from_slice(&out[..write]);
121
122            // Assert there are no unwritten output bytes
123            assert_eq!(
124                write,
125                out.len(),
126                "failed to write all pipe output bytes to output buffer"
127            );
128
129            // Return if given buffer is full, or slice to unwritten buffer
130            if write == buf.len() {
131                return Ok(total);
132            }
133            buf = &mut buf[write..];
134        }
135
136        // Try again with remaining given buffer
137        self.read(buf).map(|n| n + total)
138    }
139}
140
141impl<P> Write for PipeWriter<P>
142where
143    P: Pipe,
144{
145    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
146        // Transform input data through crypter, write result to inner writer
147        let (read, data) = self.pipe.pipe(buf);
148        if let Some(data) = data {
149            self.inner.write_all(&data)?;
150        }
151
152        Ok(read)
153    }
154
155    fn flush(&mut self) -> io::Result<()> {
156        self.inner.flush()
157    }
158}