1use crate::httpx::error;
20use crate::httpx::error::Result as HttpxResult;
21use crate::httpx::scanner::{ScanState, Scanner};
22use bytes::Bytes;
23use futures_core::Stream;
24use std::pin::Pin;
25use tokio_stream::StreamExt;
26
27pub type DecoderStream = dyn Stream<Item = error::Result<Bytes>> + Send + Unpin;
28
29pub struct Decoder {
30 r: Pin<Box<DecoderStream>>,
31 buf: Vec<u8>,
32 scanp: usize,
33 scanned: usize,
34 scan: Scanner,
35 err: Option<error::Error>,
36 token_state: TokenState,
37 token_stack: Vec<TokenState>,
38}
39
40impl Decoder {
41 pub fn new<R>(r: R) -> Self
42 where
43 R: Stream<Item = error::Result<Bytes>> + Send + 'static + Unpin,
44 {
45 Decoder {
46 r: Box::pin(r),
47 buf: Vec::new(),
48 scanp: 0,
49 scanned: 0,
50 scan: Scanner::new(),
51 err: None,
52 token_state: TokenState::TopValue,
53 token_stack: Vec::new(),
54 }
55 }
56
57 pub async fn decode(&mut self) -> HttpxResult<Vec<u8>> {
58 if let Some(err) = &self.err {
59 return Err(err.clone());
60 }
61
62 self.token_prepare_for_decode().await?;
63
64 if !self.token_value_allowed() {
65 return Err(error::Error::new_message_error("not at beginning of value"));
66 }
67
68 let n = self.read_value().await?;
69 let val = self.buf[self.scanp..self.scanp + n].trim_ascii().to_vec();
70 self.scanp += n;
71
72 self.token_value_end();
73
74 Ok(val)
75 }
76
77 fn buffered(&self) -> &[u8] {
78 &self.buf[self.scanp..]
79 }
80
81 async fn read_value(&mut self) -> HttpxResult<usize> {
82 self.scan.reset();
83 let mut scanp = self.scanp;
84 let mut res: Option<HttpxResult<()>> = None;
85
86 loop {
87 while scanp < self.buf.len() {
88 let c = self.buf[scanp];
89 self.scan.incr_bytes(1);
90 match self.scan.step(c) {
91 ScanState::End => {
92 self.scan.incr_bytes(-1);
93 return Ok(scanp - self.scanp);
94 }
95 ScanState::EndObject | ScanState::EndArray => {
96 if self.scan.step(b' ') == ScanState::End {
97 scanp += 1;
98 return Ok(scanp - self.scanp);
99 }
100 }
101 ScanState::Error => {
102 let scan_err = self.scan.err().expect("scan state error but no error set");
103 self.err = Some(scan_err.clone());
104 return Err(scan_err.clone());
105 }
106 _ => {}
107 }
108 scanp += 1;
109 }
110
111 if let Some(Err(e)) = res {
114 self.err = Some(e.clone());
115 return Err(e);
116 }
117
118 let n = scanp - self.scanp;
119 res = self.refill().await;
120 scanp = self.scanp + n;
121
122 if res.is_none() {
123 if self.scan.step(b' ') == ScanState::End {
124 return Ok(scanp - self.scanp);
125 }
126
127 if self.buf.iter().any(|&b| !b.is_ascii_whitespace()) {
128 self.err = Some(error::Error::new_message_error("unexpected EOF"));
129 }
130
131 return match self.err {
132 Some(ref e) => Err(e.clone()),
133 None => Ok(scanp - self.scanp),
134 };
135 }
136 }
137 }
138
139 async fn refill(&mut self) -> Option<HttpxResult<()>> {
140 if self.scanp > 0 {
143 self.scanned += self.scanp;
144 let n = self.buf.len() - self.scanp;
145 self.buf.copy_within(self.scanp.., 0);
146 self.buf.truncate(n);
147 self.scanp = 0;
148 }
149
150 if let Some(r) = self.r.next().await {
151 return match r {
152 Ok(buf) => {
153 self.buf.extend_from_slice(&buf[..]);
154 Some(Ok(()))
155 }
156 Err(e) => Some(Err(e)),
157 };
158 };
159
160 None
161 }
162
163 async fn token_prepare_for_decode(&mut self) -> HttpxResult<()> {
164 match self.token_state {
165 TokenState::ArrayComma => {
166 let c = match self.peek().await {
167 Some(Ok(c)) => c,
168 Some(Err(e)) => return Err(e),
169 None => return Err(error::Error::new_message_error("unexpected EOF")),
170 };
171 if c != b',' {
172 return Err(error::Error::new_message_error(
173 "expected comma after array element",
174 ));
175 }
176 self.scanp += 1;
177 self.token_state = TokenState::ArrayValue;
178 }
179 TokenState::ObjectColon => {
180 let c = match self.peek().await {
181 Some(Ok(c)) => c,
182 Some(Err(e)) => return Err(e),
183 None => return Err(error::Error::new_message_error("unexpected EOF")),
184 };
185 if c != b':' {
186 return Err(error::Error::new_message_error(
187 "expected colon after object key",
188 ));
189 }
190 self.scanp += 1;
191 self.token_state = TokenState::ObjectValue;
192 }
193 _ => {}
194 }
195 Ok(())
196 }
197
198 fn token_value_allowed(&self) -> bool {
199 matches!(
200 self.token_state,
201 TokenState::TopValue
202 | TokenState::ArrayStart
203 | TokenState::ArrayValue
204 | TokenState::ObjectValue
205 )
206 }
207
208 fn token_value_end(&mut self) {
209 match self.token_state {
210 TokenState::ArrayStart | TokenState::ArrayValue => {
211 self.token_state = TokenState::ArrayComma;
212 }
213 TokenState::ObjectValue => {
214 self.token_state = TokenState::ObjectComma;
215 }
216 _ => {}
217 }
218 }
219
220 async fn peek(&mut self) -> Option<HttpxResult<u8>> {
221 let mut res = None;
222 loop {
223 for i in self.scanp..self.buf.len() {
224 let c = self.buf[i];
225 if c.is_ascii_whitespace() {
226 continue;
227 }
228 self.scanp = i;
229 return Some(Ok(c));
230 }
231 if let Some(r) = res {
232 match r {
233 Ok(_) => {}
234 Err(e) => {
235 return Some(Err(e));
236 }
237 }
238 }
239
240 res = match self.refill().await {
241 Some(r) => Some(r),
242 None => {
243 return None;
244 }
245 };
246 }
247 }
248
249 fn input_offset(&self) -> usize {
250 self.scanned + self.scanp
251 }
252
253 pub async fn token(&mut self) -> HttpxResult<Token> {
254 loop {
255 let c = match self.peek().await {
256 Some(Ok(c)) => c,
257 Some(Err(e)) => return Err(e),
258 None => return Err(error::Error::new_message_error("unexpected EOF")),
259 };
260 match c {
261 b'[' => {
262 if !self.token_value_allowed() {
263 return self.token_error(c);
264 }
265 self.scanp += 1;
266 self.token_stack.push(self.token_state);
267 self.token_state = TokenState::ArrayStart;
268 return Ok(Token::Delim('['));
269 }
270 b']' => {
271 if self.token_state != TokenState::ArrayStart
272 && self.token_state != TokenState::ArrayComma
273 {
274 return self.token_error(c);
275 }
276 self.scanp += 1;
277 self.token_state = self.token_stack.pop().unwrap();
278 self.token_value_end();
279 return Ok(Token::Delim(']'));
280 }
281 b'{' => {
282 if !self.token_value_allowed() {
283 return self.token_error(c);
284 }
285 self.scanp += 1;
286 self.token_stack.push(self.token_state);
287 self.token_state = TokenState::ObjectStart;
288 return Ok(Token::Delim('{'));
289 }
290 b'}' => {
291 if self.token_state != TokenState::ObjectStart
292 && self.token_state != TokenState::ObjectComma
293 {
294 return self.token_error(c);
295 }
296 self.scanp += 1;
297 self.token_state = self.token_stack.pop().unwrap();
298 self.token_value_end();
299 return Ok(Token::Delim('}'));
300 }
301 b':' => {
302 if self.token_state != TokenState::ObjectColon {
303 return self.token_error(c);
304 }
305 self.scanp += 1;
306 self.token_state = TokenState::ObjectValue;
307 continue;
308 }
309 b',' => {
310 if self.token_state == TokenState::ArrayComma {
311 self.scanp += 1;
312 self.token_state = TokenState::ArrayValue;
313 continue;
314 }
315 if self.token_state == TokenState::ObjectComma {
316 self.scanp += 1;
317 self.token_state = TokenState::ObjectKey;
318 continue;
319 }
320 return self.token_error(c);
321 }
322 b'"' => {
323 if self.token_state == TokenState::ObjectStart
324 || self.token_state == TokenState::ObjectKey
325 {
326 let old = self.token_state;
327 self.token_state = TokenState::TopValue;
328 let decoded = self.decode().await?;
329 let x = serde_json::from_slice(&decoded)
330 .map_err(|e| error::Error::new_message_error(format!("{e}")))?;
331 self.token_state = old;
332 self.token_state = TokenState::ObjectColon;
333 return Ok(Token::String(x));
334 }
335
336 if !self.token_value_allowed() {
337 return self.token_error(c);
338 }
339
340 let decoded = self.decode().await?;
341 return Ok(Token::Value(decoded));
342 }
343 _ => {
344 if !self.token_value_allowed() {
345 return self.token_error(c);
346 }
347
348 let decoded = self.decode().await?;
349 return Ok(Token::Value(decoded));
350 }
351 }
352 }
353 }
354
355 fn token_error(&self, c: u8) -> HttpxResult<Token> {
356 let context = match self.token_state {
357 TokenState::TopValue => " looking for beginning of value",
358 TokenState::ArrayStart | TokenState::ArrayValue | TokenState::ObjectValue => {
359 " looking for beginning of value"
360 }
361 TokenState::ArrayComma => " after array element",
362 TokenState::ObjectKey => " looking for beginning of object key string",
363 TokenState::ObjectColon => " after object key",
364 TokenState::ObjectComma => " after object key:value pair",
365 _ => "",
366 };
367 Err(error::Error::new_message_error(format!(
368 "invalid character {}{}",
369 Scanner::quote_char(c),
370 context
371 )))
372 }
373
374 pub async fn more(&mut self) -> bool {
375 let c = self.peek().await;
376 match c {
377 Some(Ok(c)) => c != b']' && c != b'}',
378 Some(Err(_)) => false,
379 None => false,
380 }
381 }
382}
383
/// Position of a [`Decoder`] within the JSON grammar while streaming tokens.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TokenState {
    /// Expecting a top-level value.
    TopValue,
    /// Just after `[`: expecting the first element or `]`.
    ArrayStart,
    /// After a `,` inside an array: expecting the next element.
    ArrayValue,
    /// After an array element: expecting `,` or `]`.
    ArrayComma,
    /// Just after `{`: expecting the first key or `}`.
    ObjectStart,
    /// After a `,` inside an object: expecting the next key.
    ObjectKey,
    /// After an object key: expecting `:`.
    ObjectColon,
    /// After `:`: expecting the member's value.
    ObjectValue,
    /// After a member's value: expecting `,` or `}`.
    ObjectComma,
}
396
/// A single item produced by `Decoder::token`.
#[derive(Debug, Clone, PartialEq)]
pub enum Token {
    /// One of the structural delimiters `[`, `]`, `{` or `}`.
    Delim(char),
    /// An object key, already unescaped into a Rust string.
    String(String),
    /// The raw JSON bytes of a scalar value (string values keep their quotes).
    Value(Vec<u8>),
}
403
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Minimal in-memory stream: yields each queued chunk in order, then ends.
    struct TestStream {
        data: Vec<Bytes>,
    }

    impl TestStream {
        fn new(data: Vec<Bytes>) -> Self {
            Self { data }
        }
    }

    impl Unpin for TestStream {}

    impl Stream for TestStream {
        type Item = error::Result<Bytes>;

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            let next = if self.data.is_empty() {
                None
            } else {
                Some(Ok(self.data.remove(0)))
            };
            std::task::Poll::Ready(next)
        }
    }

    /// Builds a decoder that reads `input` as a single chunk.
    fn decoder_for(input: &'static [u8]) -> Decoder {
        Decoder::new(TestStream::new(vec![Bytes::from_static(input)]))
    }

    /// Decodes the first value of `input` and re-parses it with serde_json.
    async fn decode_as_json(input: &'static [u8]) -> serde_json::Value {
        let mut decoder = decoder_for(input);
        let raw = decoder.decode().await.unwrap();
        serde_json::from_slice(&raw).unwrap()
    }

    /// Pulls the first `count` tokens out of `input`, panicking on any error.
    async fn first_tokens(input: &'static [u8], count: usize) -> Vec<Token> {
        let mut decoder = decoder_for(input);
        let mut tokens = Vec::with_capacity(count);
        for _ in 0..count {
            tokens.push(decoder.token().await.unwrap());
        }
        tokens
    }

    #[tokio::test]
    async fn test_decode_object() {
        assert_eq!(
            decode_as_json(b"{\"key\":\"value\"}").await,
            serde_json::json!({"key": "value"})
        );
    }

    #[tokio::test]
    async fn test_decode_array() {
        assert_eq!(decode_as_json(b"[1, 2, 3]").await, serde_json::json!([1, 2, 3]));
    }

    #[tokio::test]
    async fn test_decode_string() {
        assert_eq!(decode_as_json(b"\"hello\"").await, serde_json::json!("hello"));
    }

    #[tokio::test]
    async fn test_decode_number() {
        assert_eq!(decode_as_json(b"123").await, serde_json::json!(123));
    }

    #[tokio::test]
    async fn test_decode_boolean() {
        assert_eq!(decode_as_json(b"true").await, serde_json::json!(true));
    }

    #[tokio::test]
    async fn test_decode_null() {
        assert_eq!(decode_as_json(b"null").await, serde_json::json!(null));
    }

    #[tokio::test]
    async fn test_token_object_start() {
        let tokens = first_tokens(b"{\"key\":\"value\"}", 1).await;
        assert_eq!(tokens, vec![Token::Delim('{')]);
    }

    #[tokio::test]
    async fn test_token_object_end() {
        let tokens = first_tokens(b"{\"key\":\"value\", \"key2\":\"value2\"}", 6).await;
        let expected = vec![
            Token::Delim('{'),
            Token::String("key".to_string()),
            Token::Value(Vec::from(r#""value""#)),
            Token::String("key2".to_string()),
            Token::Value(Vec::from(r#""value2""#)),
            Token::Delim('}'),
        ];
        assert_eq!(tokens, expected);
    }

    #[tokio::test]
    async fn test_token_array_start() {
        let tokens = first_tokens(b"[1, 2, 3]", 1).await;
        assert_eq!(tokens, vec![Token::Delim('[')]);
    }

    #[tokio::test]
    async fn test_token_array_end() {
        let tokens = first_tokens(b"[1, 2, 3]", 5).await;
        let expected = vec![
            Token::Delim('['),
            Token::Value(b"1".to_vec()),
            Token::Value(b"2".to_vec()),
            Token::Value(b"3".to_vec()),
            Token::Delim(']'),
        ];
        assert_eq!(tokens, expected);
    }

    #[tokio::test]
    async fn test_token_string() {
        let tokens = first_tokens(b"\"hello\"", 1).await;
        assert_eq!(tokens, vec![Token::Value(Vec::from(r#""hello""#))]);
    }

    #[tokio::test]
    async fn test_token_number() {
        let tokens = first_tokens(b"123", 1).await;
        assert_eq!(tokens, vec![Token::Value(b"123".to_vec())]);
    }

    #[tokio::test]
    async fn test_token_boolean() {
        let tokens = first_tokens(b"true", 1).await;
        assert_eq!(tokens, vec![Token::Value(b"true".to_vec())]);
    }

    #[tokio::test]
    async fn test_token_null() {
        let tokens = first_tokens(b"null", 1).await;
        assert_eq!(tokens, vec![Token::Value(b"null".to_vec())]);
    }
}