1use std::{
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use bytes::{Buf, BytesMut};
15use regex::bytes::Regex;
16use tokio::io::AsyncRead;
17use tokio_stream::Stream;
18use tokio_util::codec::{Decoder, FramedRead};
19
20use crate::{Adapter, MatchDisposition, RcErr};
21
22struct ByteDecoder {
23 fence: Regex,
24 match_dispo: MatchDisposition,
25 scan_offset: usize,
26}
27
28impl Decoder for ByteDecoder {
29 type Item = Vec<u8>;
30 type Error = RcErr;
31
32 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
33 let (start, end) = match self.fence.find_at(src.as_ref(), self.scan_offset) {
34 Some(m) => (m.start(), m.end()),
35 None => return Ok(None),
36 };
37 let length = end - start;
38
39 let new_buff = match self.match_dispo {
40 MatchDisposition::Drop => {
41 let new_buff: Vec<u8> = src.split_to(start).into();
42 src.advance(length);
43 new_buff
44 }
45 MatchDisposition::Append => src.split_to(end).into(),
46 MatchDisposition::Prepend => {
47 self.scan_offset = length;
48 src.split_to(start).into()
49 }
50 };
51
52 Ok(Some(new_buff))
53 }
54
55 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
56 if let Some(v) = self.decode(src)? {
57 Ok(Some(v))
58 } else if src.is_empty() {
59 Ok(None)
60 } else {
61 Ok(Some(src.split().into()))
62 }
63 }
64}
65
66pub struct ByteChunker<R: AsyncRead> {
79 freader: FramedRead<R, ByteDecoder>,
80}
81
82impl<R: AsyncRead> ByteChunker<R> {
83 pub fn new(source: R, pattern: &str) -> Result<Self, RcErr> {
87 let fence = Regex::new(pattern)?;
88 let decoder = ByteDecoder {
89 fence,
90 match_dispo: MatchDisposition::default(),
92 scan_offset: 0,
93 };
94
95 let freader = FramedRead::new(source, decoder);
96 Ok(Self { freader })
97 }
98
99 pub fn with_adapter<A>(self, adapter: A) -> CustomChunker<R, A> {
100 CustomChunker {
101 chunker: self,
102 adapter,
103 }
104 }
105
106 pub fn with_match(mut self, behavior: MatchDisposition) -> Self {
109 let d = self.freader.decoder_mut();
110 d.match_dispo = behavior;
111 if matches!(behavior, MatchDisposition::Drop | MatchDisposition::Append) {
112 d.scan_offset = 0;
113 }
114 self
115 }
116}
117
118impl<A: AsyncRead + Unpin> Stream for ByteChunker<A> {
119 type Item = Result<Vec<u8>, RcErr>;
120
121 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122 Pin::new(&mut self.freader).poll_next(cx)
123 }
124}
125
126pub struct CustomChunker<R: AsyncRead, A> {
161 chunker: ByteChunker<R>,
162 adapter: A,
163}
164
165impl<R: AsyncRead, A> CustomChunker<R, A> {
166 pub fn into_innards(self) -> (ByteChunker<R>, A) {
169 (self.chunker, self.adapter)
170 }
171
172 pub fn get_adapter(&self) -> &A { &self.adapter }
174
175 pub fn get_adapter_mut(&mut self) -> &mut A { &mut self.adapter }
177}
178
179impl<R: AsyncRead, A> Unpin for CustomChunker<R, A> {}
180
181impl<R, A> Stream for CustomChunker<R, A>
182where
183 R: AsyncRead + Unpin,
184 A: Adapter
185{
186 type Item = A::Item;
187
188 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
189 let p = Pin::new(&mut self.chunker).poll_next(cx);
190 match p {
191 Poll::Pending => Poll::Pending,
192 Poll::Ready(x) => Poll::Ready(self.adapter.adapt(x)),
193 }
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    // Shared fixtures from the crate-level test module. Not all of them are
    // used by the tests in this module, hence the `allow`.
    #[allow(unused_imports)]
    use crate::tests::{
        chunk_vec, ref_slice_cmp, HTTP_PATT, HTTP_URL, PASSWD_PATH, PASSWD_PATT, TEST_PATH,
        TEST_PATT,
    };

    use std::process::Stdio;

    use tokio::{fs::File, process::Command};
    use tokio_stream::StreamExt;

    // Helper binary spawned by `slow_async` with `SOURCE_ARGS`.
    // NOTE(review): presumably it writes TEST_PATH to stdout in delayed
    // pieces; the meaning of "0.0" and "0.1" is not visible here, so confirm.
    static SOURCE: &str = "target/debug/slowsource";
    static SOURCE_ARGS: &[&str] = &[TEST_PATH, "0.0", "0.1"];

    /// Chunking a file asynchronously must match the synchronous reference
    /// `chunk_vec` with `MatchDisposition::Drop`. The chunker uses the
    /// default disposition, so this also relies on that default being `Drop`.
    #[tokio::test]
    async fn basic_async() {
        let byte_vec = std::fs::read(TEST_PATH).unwrap();
        let re = Regex::new(TEST_PATT).unwrap();
        let slice_vec = chunk_vec(&re, &byte_vec, MatchDisposition::Drop);

        let f = File::open(TEST_PATH).await.unwrap();
        let chunker = ByteChunker::new(f, TEST_PATT).unwrap();
        let vec_vec: Vec<Vec<u8>> = chunker.map(|res| res.unwrap()).collect().await;

        ref_slice_cmp(&vec_vec, &slice_vec);
    }

    /// Same comparison as `basic_async`, but the input comes from a child
    /// process's stdout rather than a file.
    #[tokio::test]
    async fn slow_async() {
        let byte_vec = std::fs::read(TEST_PATH).unwrap();
        let re = Regex::new(TEST_PATT).unwrap();
        let slice_vec = chunk_vec(&re, &byte_vec, MatchDisposition::Drop);

        let mut child = Command::new(SOURCE)
            .args(SOURCE_ARGS)
            .stdout(Stdio::piped())
            .spawn()
            .unwrap();
        let stdout = child.stdout.take().unwrap();
        let chunker = ByteChunker::new(stdout, TEST_PATT).unwrap();
        let vec_vec: Vec<Vec<u8>> = chunker.map(|res| res.unwrap()).collect().await;
        let status = child.wait().await.unwrap();
        // If the helper failed part-way, say so directly instead of letting it
        // show up later as a confusing chunk mismatch.
        assert!(status.success(), "{} exited with {}", SOURCE, status);

        ref_slice_cmp(&vec_vec, &slice_vec);
    }
}