streamson_tokio/
decoder.rs1use bytes::{Bytes, BytesMut};
6use std::{
7 io,
8 sync::{Arc, Mutex},
9};
10use streamson_lib::{
11 error, handler, matcher,
12 strategy::{self, Strategy},
13};
14use tokio_util::codec::Decoder;
15
16pub struct Extractor {
40 trigger: strategy::Trigger,
41 handler: Arc<Mutex<handler::Buffer>>,
42}
43
44impl Extractor {
45 pub fn new(matcher: impl matcher::Matcher + 'static, include_path: bool) -> Self {
51 let handler = Arc::new(Mutex::new(
53 handler::Buffer::new().set_use_path(include_path),
54 ));
55 let mut trigger = strategy::Trigger::new();
56 trigger.add_matcher(Box::new(matcher), handler.clone());
57 Self { trigger, handler }
58 }
59}
60
61impl Decoder for Extractor {
62 type Item = (Option<String>, Bytes);
63 type Error = error::General;
64
65 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
66 loop {
67 {
68 let mut handler = self.handler.lock().unwrap();
70 if let Some((path, bytes)) = handler.pop() {
71 return Ok(Some((path, Bytes::from(bytes))));
72 }
73 }
75 if buf.is_empty() {
76 return Ok(None);
78 }
79 let data = buf.split_to(buf.len());
80 self.trigger.process(&data[..])?;
81 }
82 }
83
84 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
85 self.trigger.terminate()?;
86 match self.decode(buf)? {
87 Some(frame) => Ok(Some(frame)),
88 None => {
89 if buf.is_empty() {
90 Ok(None)
91 } else {
92 Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into())
93 }
94 }
95 }
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use super::Extractor;
102 use bytes::Bytes;
103 use std::io::Cursor;
104 use streamson_lib::matcher;
105 use tokio::stream::StreamExt;
106 use tokio_util::codec::FramedRead;
107
108 #[tokio::test]
109 async fn with_included_path() {
110 let cursor =
111 Cursor::new(br#"{"users": ["mike","john"], "groups": ["admin", "staff"]}"#.to_vec());
112 let matcher = matcher::Combinator::new(matcher::Simple::new(r#"{"users"}[]"#).unwrap())
113 | matcher::Combinator::new(matcher::Simple::new(r#"{"groups"}[]"#).unwrap());
114 let extractor = Extractor::new(matcher, true);
115 let mut output = FramedRead::new(cursor, extractor);
116
117 assert_eq!(
118 output.next().await.unwrap().unwrap(),
119 (
120 Some(r#"{"users"}[0]"#.to_string()),
121 Bytes::from_static(br#""mike""#)
122 )
123 );
124
125 assert_eq!(
126 output.next().await.unwrap().unwrap(),
127 (
128 Some(r#"{"users"}[1]"#.to_string()),
129 Bytes::from_static(br#""john""#)
130 )
131 );
132
133 assert_eq!(
134 output.next().await.unwrap().unwrap(),
135 (
136 Some(r#"{"groups"}[0]"#.to_string()),
137 Bytes::from_static(br#""admin""#)
138 )
139 );
140
141 assert_eq!(
142 output.next().await.unwrap().unwrap(),
143 (
144 Some(r#"{"groups"}[1]"#.to_string()),
145 Bytes::from_static(br#""staff""#)
146 )
147 );
148
149 assert!(output.next().await.is_none());
150 }
151
152 #[tokio::test]
153 async fn without_included_path() {
154 let cursor =
155 Cursor::new(br#"{"users": ["mike","john"], "groups": ["admin", "staff"]}"#.to_vec());
156 let matcher = matcher::Combinator::new(matcher::Simple::new(r#"{"users"}[]"#).unwrap())
157 | matcher::Combinator::new(matcher::Simple::new(r#"{"groups"}[]"#).unwrap());
158 let extractor = Extractor::new(matcher, false);
159 let mut output = FramedRead::new(cursor, extractor);
160
161 assert_eq!(
162 output.next().await.unwrap().unwrap(),
163 (None, Bytes::from_static(br#""mike""#))
164 );
165
166 assert_eq!(
167 output.next().await.unwrap().unwrap(),
168 (None, Bytes::from_static(br#""john""#))
169 );
170
171 assert_eq!(
172 output.next().await.unwrap().unwrap(),
173 (None, Bytes::from_static(br#""admin""#))
174 );
175
176 assert_eq!(
177 output.next().await.unwrap().unwrap(),
178 (None, Bytes::from_static(br#""staff""#))
179 );
180
181 assert!(output.next().await.is_none());
182 }
183
184 #[tokio::test]
185 async fn multiple_json_input() {
186 let cursor = Cursor::new(
187 br#"{"users": ["user1","user2", "user3"]} {"users": ["user4","user5"]}"#.to_vec(),
188 );
189 let matcher = matcher::Simple::new(r#"{"users"}[]"#).unwrap();
190 let extractor = Extractor::new(matcher, true);
191
192 let mut output = FramedRead::new(cursor, extractor);
193
194 assert_eq!(
195 output.next().await.unwrap().unwrap(),
196 (
197 Some(r#"{"users"}[0]"#.to_string()),
198 Bytes::from_static(br#""user1""#)
199 )
200 );
201 assert_eq!(
202 output.next().await.unwrap().unwrap(),
203 (
204 Some(r#"{"users"}[1]"#.to_string()),
205 Bytes::from_static(br#""user2""#)
206 )
207 );
208 assert_eq!(
209 output.next().await.unwrap().unwrap(),
210 (
211 Some(r#"{"users"}[2]"#.to_string()),
212 Bytes::from_static(br#""user3""#)
213 )
214 );
215 assert_eq!(
216 output.next().await.unwrap().unwrap(),
217 (
218 Some(r#"{"users"}[0]"#.to_string()),
219 Bytes::from_static(br#""user4""#)
220 )
221 );
222 assert_eq!(
223 output.next().await.unwrap().unwrap(),
224 (
225 Some(r#"{"users"}[1]"#.to_string()),
226 Bytes::from_static(br#""user5""#)
227 )
228 );
229
230 assert!(output.next().await.is_none());
231 }
232}