streamson_tokio/
decoder.rs

//! Decoders which implement `tokio_util::codec::Decoder`
//! and are able to extract `(path, bytes)` items from an `AsyncRead` source.
//!
4
5use bytes::{Bytes, BytesMut};
6use std::{
7    io,
8    sync::{Arc, Mutex},
9};
10use streamson_lib::{
11    error, handler, matcher,
12    strategy::{self, Strategy},
13};
14use tokio_util::codec::Decoder;
15
16/// This struct uses `streamson_lib::matcher` to decode data.
17///
18/// # Examples
19/// ```
20/// use std::io;
21/// use streamson_lib::{error, matcher};
22/// use streamson_tokio::decoder::Extractor;
23/// use tokio::{fs, stream::StreamExt};
24/// use tokio_util::codec::FramedRead;
25///
26/// async fn process() -> Result<(), error::General> {
27///     let mut file = fs::File::open("/tmp/large.json").await?;
28///     let matcher = matcher::Combinator::new(matcher::Simple::new(r#"{"users"}[]"#).unwrap())
29///         | matcher::Combinator::new(matcher::Simple::new(r#"{"groups"}[]"#).unwrap());
30///     let extractor = Extractor::new(matcher, true);
31///     let mut output = FramedRead::new(file, extractor);
32///     while let Some(item) = output.next().await {
33///         let (path, data) = item?;
34///         // Do something with extracted data
35///     }
36///     Ok(())
37/// }
38/// ```
39pub struct Extractor {
40    trigger: strategy::Trigger,
41    handler: Arc<Mutex<handler::Buffer>>,
42}
43
44impl Extractor {
45    /// Creates a new `Extractor`
46    ///
47    /// # Arguments
48    /// * `matcher` - matcher to be used for extractions (see `streamson_lib::matcher`)
49    /// * `include_path` - will path be included in output
50    pub fn new(matcher: impl matcher::Matcher + 'static, include_path: bool) -> Self {
51        // TODO limit max length and fail when reached
52        let handler = Arc::new(Mutex::new(
53            handler::Buffer::new().set_use_path(include_path),
54        ));
55        let mut trigger = strategy::Trigger::new();
56        trigger.add_matcher(Box::new(matcher), handler.clone());
57        Self { trigger, handler }
58    }
59}
60
61impl Decoder for Extractor {
62    type Item = (Option<String>, Bytes);
63    type Error = error::General;
64
65    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
66        loop {
67            {
68                // pop if necessary
69                let mut handler = self.handler.lock().unwrap();
70                if let Some((path, bytes)) = handler.pop() {
71                    return Ok(Some((path, Bytes::from(bytes))));
72                }
73                // handler is unlocked here so it can be used later withing `process` method
74            }
75            if buf.is_empty() {
76                // end has been reached
77                return Ok(None);
78            }
79            let data = buf.split_to(buf.len());
80            self.trigger.process(&data[..])?;
81        }
82    }
83
84    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
85        self.trigger.terminate()?;
86        match self.decode(buf)? {
87            Some(frame) => Ok(Some(frame)),
88            None => {
89                if buf.is_empty() {
90                    Ok(None)
91                } else {
92                    Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into())
93                }
94            }
95        }
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::Extractor;
    use bytes::Bytes;
    use std::io::Cursor;
    use streamson_lib::matcher;
    use tokio::stream::StreamExt;
    use tokio_util::codec::FramedRead;

    /// Single JSON document used by the tests which match users and groups.
    const USERS_AND_GROUPS: &[u8] =
        br#"{"users": ["mike","john"], "groups": ["admin", "staff"]}"#;

    /// Matcher accepting every element of the `users` or the `groups` array.
    fn users_or_groups() -> impl matcher::Matcher + 'static {
        matcher::Combinator::new(matcher::Simple::new(r#"{"users"}[]"#).unwrap())
            | matcher::Combinator::new(matcher::Simple::new(r#"{"groups"}[]"#).unwrap())
    }

    /// Decodes `input` with an `Extractor` and checks that exactly the
    /// `expected` frames are produced, in order, followed by the end of stream.
    async fn assert_frames(
        input: &[u8],
        matcher: impl matcher::Matcher + 'static,
        include_path: bool,
        expected: &[(Option<&str>, &'static [u8])],
    ) {
        let reader = Cursor::new(input.to_vec());
        let mut output = FramedRead::new(reader, Extractor::new(matcher, include_path));

        for (idx, &(path, data)) in expected.iter().enumerate() {
            let frame = output.next().await.unwrap().unwrap();
            assert_eq!(
                frame,
                (path.map(|p| p.to_string()), Bytes::from_static(data)),
                "unexpected frame at position {}",
                idx
            );
        }

        assert!(output.next().await.is_none());
    }

    #[tokio::test]
    async fn with_included_path() {
        assert_frames(
            USERS_AND_GROUPS,
            users_or_groups(),
            true,
            &[
                (Some(r#"{"users"}[0]"#), br#""mike""#),
                (Some(r#"{"users"}[1]"#), br#""john""#),
                (Some(r#"{"groups"}[0]"#), br#""admin""#),
                (Some(r#"{"groups"}[1]"#), br#""staff""#),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn without_included_path() {
        assert_frames(
            USERS_AND_GROUPS,
            users_or_groups(),
            false,
            &[
                (None, br#""mike""#),
                (None, br#""john""#),
                (None, br#""admin""#),
                (None, br#""staff""#),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn multiple_json_input() {
        // Two whitespace-separated documents; array indices restart for the second one.
        assert_frames(
            br#"{"users": ["user1","user2", "user3"]} {"users": ["user4","user5"]}"#,
            matcher::Simple::new(r#"{"users"}[]"#).unwrap(),
            true,
            &[
                (Some(r#"{"users"}[0]"#), br#""user1""#),
                (Some(r#"{"users"}[1]"#), br#""user2""#),
                (Some(r#"{"users"}[2]"#), br#""user3""#),
                (Some(r#"{"users"}[0]"#), br#""user4""#),
                (Some(r#"{"users"}[1]"#), br#""user5""#),
            ],
        )
        .await;
    }
}