weezl/
encode_into_async.rs

1use crate::encode::IntoAsync;
2use crate::error::LzwStatus;
3use crate::error::StreamResult;
4use crate::StreamBuf;
5use std::io;
6
7impl<'d, W: futures::io::AsyncWrite + core::marker::Unpin> IntoAsync<'d, W> {
8    /// Encode data from a reader.
9    ///
10    /// This will drain the supplied reader. It will not encode an end marker after all data has
11    /// been processed.
12    pub async fn encode(&mut self, read: impl futures::io::AsyncBufRead) -> StreamResult {
13        self.encode_part(read, false).await
14    }
15
16    /// Encode data from a reader and an end marker.
17    pub async fn encode_all(mut self, read: impl futures::io::AsyncBufRead) -> StreamResult {
18        self.encode_part(read, true).await
19    }
20
21    /// Set the size of the intermediate decode buffer.
22    ///
23    /// A buffer of this size is allocated to hold one part of the decoded stream when no buffer is
24    /// available and any decoding method is called. No buffer is allocated if `set_buffer` has
25    /// been called. The buffer is reused.
26    ///
27    /// # Panics
28    /// This method panics if `size` is `0`.
29    pub fn set_buffer_size(&mut self, size: usize) {
30        assert_ne!(size, 0, "Attempted to set empty buffer");
31        self.default_size = size;
32    }
33
34    /// Use a particular buffer as an intermediate decode buffer.
35    ///
36    /// Calling this sets or replaces the buffer. When a buffer has been set then it is used
37    /// instead of dynamically allocating a buffer. Note that the size of the buffer is critical
38    /// for efficient decoding. Some optimization techniques require the buffer to hold one or more
39    /// previous decoded words. There is also additional overhead from `write` calls each time the
40    /// buffer has been filled.
41    ///
42    /// # Panics
43    /// This method panics if the `buffer` is empty.
44    pub fn set_buffer(&mut self, buffer: &'d mut [u8]) {
45        assert_ne!(buffer.len(), 0, "Attempted to set empty buffer");
46        self.buffer = Some(StreamBuf::Borrowed(buffer));
47    }
48
49    async fn encode_part(
50        &mut self,
51        read: impl futures::io::AsyncBufRead,
52        finish: bool,
53    ) -> StreamResult {
54        use futures::io::AsyncBufReadExt;
55        use futures::io::AsyncWriteExt;
56
57        let IntoAsync {
58            encoder,
59            writer,
60            buffer,
61            default_size,
62        } = self;
63
64        futures::pin_mut!(read);
65        let mut read: core::pin::Pin<_> = read;
66
67        let mut bytes_read = 0;
68        let mut bytes_written = 0;
69
70        // Converting to mutable refs to move into the `once` closure.
71        let read_bytes = &mut bytes_read;
72        let write_bytes = &mut bytes_written;
73
74        let outbuf: &mut [u8] =
75            match { buffer.get_or_insert_with(|| StreamBuf::Owned(vec![0u8; *default_size])) } {
76                StreamBuf::Borrowed(slice) => &mut *slice,
77                StreamBuf::Owned(vec) => &mut *vec,
78            };
79        assert!(!outbuf.is_empty());
80
81        let status = loop {
82            // Try to grab one buffer of input data.
83            let mut filler = read.as_mut();
84            let data = match filler.fill_buf().await {
85                Ok(buf) => buf,
86                Err(err) => break Err(err),
87            };
88
89            if data.is_empty() {
90                if finish {
91                    encoder.finish();
92                } else {
93                    break Ok(());
94                }
95            }
96
97            // Decode as much of the buffer as fits.
98            let result = encoder.encode_bytes(data, &mut outbuf[..]);
99            // Do the bookkeeping and consume the buffer.
100            *read_bytes += result.consumed_in;
101            *write_bytes += result.consumed_out;
102            read.as_mut().consume(result.consumed_in);
103
104            // Handle an error status in the result.
105            let done = match result.status {
106                Ok(ok) => ok,
107                Err(err) => {
108                    break Err(io::Error::new(
109                        io::ErrorKind::InvalidData,
110                        &*format!("{:?}", err),
111                    ));
112                }
113            };
114
115            if let LzwStatus::Done = done {
116                break writer.write_all(&outbuf[..result.consumed_out]).await;
117            }
118
119            if let LzwStatus::NoProgress = done {
120                break Err(io::Error::new(
121                    io::ErrorKind::UnexpectedEof,
122                    "No more data but no end marker detected",
123                ));
124            }
125
126            // And finish by writing our result.
127            // TODO: we may lose data on error (also on status error above) which we might want to
128            // deterministically handle so that we don't need to restart everything from scratch as
129            // the only recovery strategy. Any changes welcome.
130            match writer.write_all(&outbuf[..result.consumed_out]).await {
131                Ok(_) => {}
132                Err(err) => break Err(err),
133            }
134        };
135
136        StreamResult {
137            bytes_read,
138            bytes_written,
139            status,
140        }
141    }
142}