Skip to main content

couchbase_core/httpx/
decoder.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::httpx::error;
20use crate::httpx::error::Result as HttpxResult;
21use crate::httpx::scanner::{ScanState, Scanner};
22use bytes::Bytes;
23use futures_core::Stream;
24use std::pin::Pin;
25use tokio_stream::StreamExt;
26
27pub type DecoderStream = dyn Stream<Item = error::Result<Bytes>> + Send + Unpin;
28
29pub struct Decoder {
30    r: Pin<Box<DecoderStream>>,
31    buf: Vec<u8>,
32    scanp: usize,
33    scanned: usize,
34    scan: Scanner,
35    err: Option<error::Error>,
36    token_state: TokenState,
37    token_stack: Vec<TokenState>,
38}
39
40impl Decoder {
41    pub fn new<R>(r: R) -> Self
42    where
43        R: Stream<Item = error::Result<Bytes>> + Send + 'static + Unpin,
44    {
45        Decoder {
46            r: Box::pin(r),
47            buf: Vec::new(),
48            scanp: 0,
49            scanned: 0,
50            scan: Scanner::new(),
51            err: None,
52            token_state: TokenState::TopValue,
53            token_stack: Vec::new(),
54        }
55    }
56
57    pub async fn decode(&mut self) -> HttpxResult<Vec<u8>> {
58        if let Some(err) = &self.err {
59            return Err(err.clone());
60        }
61
62        self.token_prepare_for_decode().await?;
63
64        if !self.token_value_allowed() {
65            return Err(error::Error::new_message_error("not at beginning of value"));
66        }
67
68        let n = self.read_value().await?;
69        let val = self.buf[self.scanp..self.scanp + n].trim_ascii().to_vec();
70        self.scanp += n;
71
72        self.token_value_end();
73
74        Ok(val)
75    }
76
77    fn buffered(&self) -> &[u8] {
78        &self.buf[self.scanp..]
79    }
80
81    async fn read_value(&mut self) -> HttpxResult<usize> {
82        self.scan.reset();
83        let mut scanp = self.scanp;
84        let mut res: Option<HttpxResult<()>> = None;
85
86        loop {
87            while scanp < self.buf.len() {
88                let c = self.buf[scanp];
89                self.scan.incr_bytes(1);
90                match self.scan.step(c) {
91                    ScanState::End => {
92                        self.scan.incr_bytes(-1);
93                        return Ok(scanp - self.scanp);
94                    }
95                    ScanState::EndObject | ScanState::EndArray => {
96                        if self.scan.step(b' ') == ScanState::End {
97                            scanp += 1;
98                            return Ok(scanp - self.scanp);
99                        }
100                    }
101                    ScanState::Error => {
102                        let scan_err = self.scan.err().expect("scan state error but no error set");
103                        self.err = Some(scan_err.clone());
104                        return Err(scan_err.clone());
105                    }
106                    _ => {}
107                }
108                scanp += 1;
109            }
110
111            // Did the last read have an error?
112            // Delayed until now to allow buffer scan.
113            if let Some(Err(e)) = res {
114                self.err = Some(e.clone());
115                return Err(e);
116            }
117
118            let n = scanp - self.scanp;
119            res = self.refill().await;
120            scanp = self.scanp + n;
121
122            if res.is_none() {
123                if self.scan.step(b' ') == ScanState::End {
124                    return Ok(scanp - self.scanp);
125                }
126
127                if self.buf.iter().any(|&b| !b.is_ascii_whitespace()) {
128                    self.err = Some(error::Error::new_message_error("unexpected EOF"));
129                }
130
131                return match self.err {
132                    Some(ref e) => Err(e.clone()),
133                    None => Ok(scanp - self.scanp),
134                };
135            }
136        }
137    }
138
139    async fn refill(&mut self) -> Option<HttpxResult<()>> {
140        // Make room to read more into the buffer.
141        // First slide down data already consumed.
142        if self.scanp > 0 {
143            self.scanned += self.scanp;
144            let n = self.buf.len() - self.scanp;
145            self.buf.copy_within(self.scanp.., 0);
146            self.buf.truncate(n);
147            self.scanp = 0;
148        }
149
150        if let Some(r) = self.r.next().await {
151            return match r {
152                Ok(buf) => {
153                    self.buf.extend_from_slice(&buf[..]);
154                    Some(Ok(()))
155                }
156                Err(e) => Some(Err(e)),
157            };
158        };
159
160        None
161    }
162
163    async fn token_prepare_for_decode(&mut self) -> HttpxResult<()> {
164        match self.token_state {
165            TokenState::ArrayComma => {
166                let c = match self.peek().await {
167                    Some(Ok(c)) => c,
168                    Some(Err(e)) => return Err(e),
169                    None => return Err(error::Error::new_message_error("unexpected EOF")),
170                };
171                if c != b',' {
172                    return Err(error::Error::new_message_error(
173                        "expected comma after array element",
174                    ));
175                }
176                self.scanp += 1;
177                self.token_state = TokenState::ArrayValue;
178            }
179            TokenState::ObjectColon => {
180                let c = match self.peek().await {
181                    Some(Ok(c)) => c,
182                    Some(Err(e)) => return Err(e),
183                    None => return Err(error::Error::new_message_error("unexpected EOF")),
184                };
185                if c != b':' {
186                    return Err(error::Error::new_message_error(
187                        "expected colon after object key",
188                    ));
189                }
190                self.scanp += 1;
191                self.token_state = TokenState::ObjectValue;
192            }
193            _ => {}
194        }
195        Ok(())
196    }
197
198    fn token_value_allowed(&self) -> bool {
199        matches!(
200            self.token_state,
201            TokenState::TopValue
202                | TokenState::ArrayStart
203                | TokenState::ArrayValue
204                | TokenState::ObjectValue
205        )
206    }
207
208    fn token_value_end(&mut self) {
209        match self.token_state {
210            TokenState::ArrayStart | TokenState::ArrayValue => {
211                self.token_state = TokenState::ArrayComma;
212            }
213            TokenState::ObjectValue => {
214                self.token_state = TokenState::ObjectComma;
215            }
216            _ => {}
217        }
218    }
219
220    async fn peek(&mut self) -> Option<HttpxResult<u8>> {
221        let mut res = None;
222        loop {
223            for i in self.scanp..self.buf.len() {
224                let c = self.buf[i];
225                if c.is_ascii_whitespace() {
226                    continue;
227                }
228                self.scanp = i;
229                return Some(Ok(c));
230            }
231            if let Some(r) = res {
232                match r {
233                    Ok(_) => {}
234                    Err(e) => {
235                        return Some(Err(e));
236                    }
237                }
238            }
239
240            res = match self.refill().await {
241                Some(r) => Some(r),
242                None => {
243                    return None;
244                }
245            };
246        }
247    }
248
249    fn input_offset(&self) -> usize {
250        self.scanned + self.scanp
251    }
252
253    pub async fn token(&mut self) -> HttpxResult<Token> {
254        loop {
255            let c = match self.peek().await {
256                Some(Ok(c)) => c,
257                Some(Err(e)) => return Err(e),
258                None => return Err(error::Error::new_message_error("unexpected EOF")),
259            };
260            match c {
261                b'[' => {
262                    if !self.token_value_allowed() {
263                        return self.token_error(c);
264                    }
265                    self.scanp += 1;
266                    self.token_stack.push(self.token_state);
267                    self.token_state = TokenState::ArrayStart;
268                    return Ok(Token::Delim('['));
269                }
270                b']' => {
271                    if self.token_state != TokenState::ArrayStart
272                        && self.token_state != TokenState::ArrayComma
273                    {
274                        return self.token_error(c);
275                    }
276                    self.scanp += 1;
277                    self.token_state = self.token_stack.pop().unwrap();
278                    self.token_value_end();
279                    return Ok(Token::Delim(']'));
280                }
281                b'{' => {
282                    if !self.token_value_allowed() {
283                        return self.token_error(c);
284                    }
285                    self.scanp += 1;
286                    self.token_stack.push(self.token_state);
287                    self.token_state = TokenState::ObjectStart;
288                    return Ok(Token::Delim('{'));
289                }
290                b'}' => {
291                    if self.token_state != TokenState::ObjectStart
292                        && self.token_state != TokenState::ObjectComma
293                    {
294                        return self.token_error(c);
295                    }
296                    self.scanp += 1;
297                    self.token_state = self.token_stack.pop().unwrap();
298                    self.token_value_end();
299                    return Ok(Token::Delim('}'));
300                }
301                b':' => {
302                    if self.token_state != TokenState::ObjectColon {
303                        return self.token_error(c);
304                    }
305                    self.scanp += 1;
306                    self.token_state = TokenState::ObjectValue;
307                    continue;
308                }
309                b',' => {
310                    if self.token_state == TokenState::ArrayComma {
311                        self.scanp += 1;
312                        self.token_state = TokenState::ArrayValue;
313                        continue;
314                    }
315                    if self.token_state == TokenState::ObjectComma {
316                        self.scanp += 1;
317                        self.token_state = TokenState::ObjectKey;
318                        continue;
319                    }
320                    return self.token_error(c);
321                }
322                b'"' => {
323                    if self.token_state == TokenState::ObjectStart
324                        || self.token_state == TokenState::ObjectKey
325                    {
326                        let old = self.token_state;
327                        self.token_state = TokenState::TopValue;
328                        let decoded = self.decode().await?;
329                        let x = serde_json::from_slice(&decoded)
330                            .map_err(|e| error::Error::new_message_error(format!("{e}")))?;
331                        self.token_state = old;
332                        self.token_state = TokenState::ObjectColon;
333                        return Ok(Token::String(x));
334                    }
335
336                    if !self.token_value_allowed() {
337                        return self.token_error(c);
338                    }
339
340                    let decoded = self.decode().await?;
341                    return Ok(Token::Value(decoded));
342                }
343                _ => {
344                    if !self.token_value_allowed() {
345                        return self.token_error(c);
346                    }
347
348                    let decoded = self.decode().await?;
349                    return Ok(Token::Value(decoded));
350                }
351            }
352        }
353    }
354
355    fn token_error(&self, c: u8) -> HttpxResult<Token> {
356        let context = match self.token_state {
357            TokenState::TopValue => " looking for beginning of value",
358            TokenState::ArrayStart | TokenState::ArrayValue | TokenState::ObjectValue => {
359                " looking for beginning of value"
360            }
361            TokenState::ArrayComma => " after array element",
362            TokenState::ObjectKey => " looking for beginning of object key string",
363            TokenState::ObjectColon => " after object key",
364            TokenState::ObjectComma => " after object key:value pair",
365            _ => "",
366        };
367        Err(error::Error::new_message_error(format!(
368            "invalid character {}{}",
369            Scanner::quote_char(c),
370            context
371        )))
372    }
373
374    pub async fn more(&mut self) -> bool {
375        let c = self.peek().await;
376        match c {
377            Some(Ok(c)) => c != b']' && c != b'}',
378            Some(Err(_)) => false,
379            None => false,
380        }
381    }
382}
383
/// Position of a [`Decoder`] within the JSON token grammar, used to validate
/// the order of tokens returned by `Decoder::token`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TokenState {
    /// Expecting a top-level value.
    TopValue,
    /// Just read `[`; expecting the first element or `]`.
    ArrayStart,
    /// Read `,` inside an array; expecting the next element.
    ArrayValue,
    /// Read an array element; expecting `,` or `]`.
    ArrayComma,
    /// Just read `{`; expecting the first key or `}`.
    ObjectStart,
    /// Read `,` inside an object; expecting the next key.
    ObjectKey,
    /// Read an object key; expecting `:`.
    ObjectColon,
    /// Read `:`; expecting the member's value.
    ObjectValue,
    /// Read a member's value; expecting `,` or `}`.
    ObjectComma,
}
396
/// A single item produced by `Decoder::token`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Token {
    /// A structural delimiter: one of `[`, `]`, `{` or `}`.
    Delim(char),
    /// An object key, decoded (unescaped) into a Rust string.
    String(String),
    /// The raw JSON bytes of a non-container value (a string including its
    /// quotes, a number, `true`, `false` or `null`), with surrounding ASCII
    /// whitespace trimmed.
    Value(Vec<u8>),
}
403
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::collections::VecDeque;
    use std::task::{Context, Poll};

    /// Stream that hands out its queued chunks in order, then reports the end.
    struct TestStream {
        pending: VecDeque<Bytes>,
    }

    impl TestStream {
        fn new(data: Vec<Bytes>) -> Self {
            Self {
                pending: VecDeque::from(data),
            }
        }
    }

    impl Stream for TestStream {
        type Item = error::Result<Bytes>;

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Ready(self.pending.pop_front().map(Ok))
        }
    }

    /// Builds a decoder over a single in-memory chunk.
    fn decoder_over(input: &'static [u8]) -> Decoder {
        Decoder::new(TestStream::new(vec![Bytes::from_static(input)]))
    }

    /// Decodes one value from `input` and parses it with serde_json.
    async fn decode_as_json(input: &'static [u8]) -> serde_json::Value {
        let raw = decoder_over(input).decode().await.unwrap();
        serde_json::from_slice(&raw).unwrap()
    }

    /// Reads tokens from `input` and checks them against `expected`, in order.
    async fn expect_tokens(input: &'static [u8], expected: &[Token]) {
        let mut decoder = decoder_over(input);
        for want in expected {
            let got = decoder.token().await.unwrap();
            assert_eq!(&got, want);
        }
    }

    #[tokio::test]
    async fn test_decode_object() {
        let value = decode_as_json(br#"{"key":"value"}"#).await;
        assert_eq!(value, serde_json::json!({"key": "value"}));
    }

    #[tokio::test]
    async fn test_decode_array() {
        let value = decode_as_json(b"[1, 2, 3]").await;
        assert_eq!(value, serde_json::json!([1, 2, 3]));
    }

    #[tokio::test]
    async fn test_decode_string() {
        let value = decode_as_json(br#""hello""#).await;
        assert_eq!(value, serde_json::json!("hello"));
    }

    #[tokio::test]
    async fn test_decode_number() {
        let value = decode_as_json(b"123").await;
        assert_eq!(value, serde_json::json!(123));
    }

    #[tokio::test]
    async fn test_decode_boolean() {
        let value = decode_as_json(b"true").await;
        assert_eq!(value, serde_json::json!(true));
    }

    #[tokio::test]
    async fn test_decode_null() {
        let value = decode_as_json(b"null").await;
        assert_eq!(value, serde_json::json!(null));
    }

    #[tokio::test]
    async fn test_token_object_start() {
        expect_tokens(br#"{"key":"value"}"#, &[Token::Delim('{')]).await;
    }

    #[tokio::test]
    async fn test_token_object_end() {
        expect_tokens(
            br#"{"key":"value", "key2":"value2"}"#,
            &[
                Token::Delim('{'),
                Token::String("key".into()),
                Token::Value(br#""value""#.to_vec()),
                Token::String("key2".into()),
                Token::Value(br#""value2""#.to_vec()),
                Token::Delim('}'),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_token_array_start() {
        expect_tokens(b"[1, 2, 3]", &[Token::Delim('[')]).await;
    }

    #[tokio::test]
    async fn test_token_array_end() {
        expect_tokens(
            b"[1, 2, 3]",
            &[
                Token::Delim('['),
                Token::Value(b"1".to_vec()),
                Token::Value(b"2".to_vec()),
                Token::Value(b"3".to_vec()),
                Token::Delim(']'),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_token_string() {
        expect_tokens(br#""hello""#, &[Token::Value(br#""hello""#.to_vec())]).await;
    }

    #[tokio::test]
    async fn test_token_number() {
        expect_tokens(b"123", &[Token::Value(b"123".to_vec())]).await;
    }

    #[tokio::test]
    async fn test_token_boolean() {
        expect_tokens(b"true", &[Token::Value(b"true".to_vec())]).await;
    }

    #[tokio::test]
    async fn test_token_null() {
        expect_tokens(b"null", &[Token::Value(b"null".to_vec())]).await;
    }
}