weezl/encode_into_async.rs
1use crate::encode::IntoAsync;
2use crate::error::LzwStatus;
3use crate::error::StreamResult;
4use crate::StreamBuf;
5use std::io;
6
7impl<'d, W: futures::io::AsyncWrite + core::marker::Unpin> IntoAsync<'d, W> {
8 /// Encode data from a reader.
9 ///
10 /// This will drain the supplied reader. It will not encode an end marker after all data has
11 /// been processed.
12 pub async fn encode(&mut self, read: impl futures::io::AsyncBufRead) -> StreamResult {
13 self.encode_part(read, false).await
14 }
15
16 /// Encode data from a reader and an end marker.
17 pub async fn encode_all(mut self, read: impl futures::io::AsyncBufRead) -> StreamResult {
18 self.encode_part(read, true).await
19 }
20
21 /// Set the size of the intermediate decode buffer.
22 ///
23 /// A buffer of this size is allocated to hold one part of the decoded stream when no buffer is
24 /// available and any decoding method is called. No buffer is allocated if `set_buffer` has
25 /// been called. The buffer is reused.
26 ///
27 /// # Panics
28 /// This method panics if `size` is `0`.
29 pub fn set_buffer_size(&mut self, size: usize) {
30 assert_ne!(size, 0, "Attempted to set empty buffer");
31 self.default_size = size;
32 }
33
34 /// Use a particular buffer as an intermediate decode buffer.
35 ///
36 /// Calling this sets or replaces the buffer. When a buffer has been set then it is used
37 /// instead of dynamically allocating a buffer. Note that the size of the buffer is critical
38 /// for efficient decoding. Some optimization techniques require the buffer to hold one or more
39 /// previous decoded words. There is also additional overhead from `write` calls each time the
40 /// buffer has been filled.
41 ///
42 /// # Panics
43 /// This method panics if the `buffer` is empty.
44 pub fn set_buffer(&mut self, buffer: &'d mut [u8]) {
45 assert_ne!(buffer.len(), 0, "Attempted to set empty buffer");
46 self.buffer = Some(StreamBuf::Borrowed(buffer));
47 }
48
49 async fn encode_part(
50 &mut self,
51 read: impl futures::io::AsyncBufRead,
52 finish: bool,
53 ) -> StreamResult {
54 use futures::io::AsyncBufReadExt;
55 use futures::io::AsyncWriteExt;
56
57 let IntoAsync {
58 encoder,
59 writer,
60 buffer,
61 default_size,
62 } = self;
63
64 futures::pin_mut!(read);
65 let mut read: core::pin::Pin<_> = read;
66
67 let mut bytes_read = 0;
68 let mut bytes_written = 0;
69
70 // Converting to mutable refs to move into the `once` closure.
71 let read_bytes = &mut bytes_read;
72 let write_bytes = &mut bytes_written;
73
74 let outbuf: &mut [u8] =
75 match { buffer.get_or_insert_with(|| StreamBuf::Owned(vec![0u8; *default_size])) } {
76 StreamBuf::Borrowed(slice) => &mut *slice,
77 StreamBuf::Owned(vec) => &mut *vec,
78 };
79 assert!(!outbuf.is_empty());
80
81 let status = loop {
82 // Try to grab one buffer of input data.
83 let mut filler = read.as_mut();
84 let data = match filler.fill_buf().await {
85 Ok(buf) => buf,
86 Err(err) => break Err(err),
87 };
88
89 if data.is_empty() {
90 if finish {
91 encoder.finish();
92 } else {
93 break Ok(());
94 }
95 }
96
97 // Decode as much of the buffer as fits.
98 let result = encoder.encode_bytes(data, &mut outbuf[..]);
99 // Do the bookkeeping and consume the buffer.
100 *read_bytes += result.consumed_in;
101 *write_bytes += result.consumed_out;
102 read.as_mut().consume(result.consumed_in);
103
104 // Handle an error status in the result.
105 let done = match result.status {
106 Ok(ok) => ok,
107 Err(err) => {
108 break Err(io::Error::new(
109 io::ErrorKind::InvalidData,
110 &*format!("{:?}", err),
111 ));
112 }
113 };
114
115 if let LzwStatus::Done = done {
116 break writer.write_all(&outbuf[..result.consumed_out]).await;
117 }
118
119 if let LzwStatus::NoProgress = done {
120 break Err(io::Error::new(
121 io::ErrorKind::UnexpectedEof,
122 "No more data but no end marker detected",
123 ));
124 }
125
126 // And finish by writing our result.
127 // TODO: we may lose data on error (also on status error above) which we might want to
128 // deterministically handle so that we don't need to restart everything from scratch as
129 // the only recovery strategy. Any changes welcome.
130 match writer.write_all(&outbuf[..result.consumed_out]).await {
131 Ok(_) => {}
132 Err(err) => break Err(err),
133 }
134 };
135
136 StreamResult {
137 bytes_read,
138 bytes_written,
139 status,
140 }
141 }
142}