Skip to main content

pingora_cache/
put.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache Put module
16
17use crate::max_file_size::ERR_RESPONSE_TOO_LARGE;
18use crate::*;
19use bytes::Bytes;
20use http::header;
21use log::warn;
22use pingora_core::protocols::http::{
23    v1::common::header_value_content_length, HttpTask, ServerSession,
24};
25use pingora_error::Error;
26
27/// The interface to define cache put behavior
28pub trait CachePut {
29    /// Return whether to cache the asset according to the given response header.
30    fn cacheable(&self, response: ResponseHeader) -> RespCacheable {
31        let cc = cache_control::CacheControl::from_resp_headers(&response);
32        filters::resp_cacheable(cc.as_ref(), response, false, Self::cache_defaults())
33    }
34
35    /// Return the [CacheMetaDefaults]
36    fn cache_defaults() -> &'static CacheMetaDefaults;
37
38    /// Put interesting things in the span given the parsed response header.
39    fn trace_header(&mut self, _response: &ResponseHeader) {}
40}
41
42use parse_response::ResponseParse;
43
/// The cache put context
///
/// Holds the state needed to parse one response payload and write it into the
/// cache storage.
pub struct CachePutCtx<C: CachePut> {
    cache_put: C, // the user defined cache put behavior
    // the cache key under which the asset is written
    key: CacheKey,
    storage: &'static (dyn storage::Storage + Sync), // static for now
    // optional eviction manager, informed once the asset is fully written
    eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
    // set once a cacheable header was handed to the storage, taken on finish
    miss_handler: Option<MissHandler>,
    // set via set_max_file_size_bytes(), limits how many body bytes may be written
    max_file_size_tracker: Option<MaxFileSizeTracker>,
    // the meta of the asset being put, set together with miss_handler
    meta: Option<CacheMeta>,
    // turns the injected raw bytes into header and body tasks
    parser: ResponseParse,
    // FIXME: cache put doesn't have cache lock but some storage cannot handle concurrent put
    // to the same asset.
    trace: trace::Span,
}
58
59impl<C: CachePut> CachePutCtx<C> {
60    /// Create a new [CachePutCtx]
61    pub fn new(
62        cache_put: C,
63        key: CacheKey,
64        storage: &'static (dyn storage::Storage + Sync),
65        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
66        trace: trace::Span,
67    ) -> Self {
68        CachePutCtx {
69            cache_put,
70            key,
71            storage,
72            eviction,
73            miss_handler: None,
74            max_file_size_tracker: None,
75            meta: None,
76            parser: ResponseParse::new(),
77            trace,
78        }
79    }
80
81    /// Set the max cacheable size limit
82    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
83        self.max_file_size_tracker = Some(MaxFileSizeTracker::new(max_file_size_bytes));
84    }
85
86    async fn put_header(&mut self, meta: CacheMeta) -> Result<()> {
87        let mut trace = self.trace.child("cache put header", |o| o.start());
88        let miss_handler = self
89            .storage
90            .get_miss_handler(&self.key, &meta, &trace.handle())
91            .await?;
92        trace::tag_span_with_meta(&mut trace, &meta);
93        self.miss_handler = Some(miss_handler);
94        self.meta = Some(meta);
95        Ok(())
96    }
97
98    async fn put_body(&mut self, data: Bytes, eof: bool) -> Result<()> {
99        // fail if writing the body would exceed the max_file_size_bytes
100        if let Some(size_tracker) = self.max_file_size_tracker.as_mut() {
101            let body_size_allowed = size_tracker.add_body_bytes(data.len());
102            if !body_size_allowed {
103                return Error::e_explain(
104                    ERR_RESPONSE_TOO_LARGE,
105                    format!(
106                        "writing data of size {} bytes would exceed max file size of {} bytes",
107                        data.len(),
108                        size_tracker.max_file_size_bytes(),
109                    ),
110                );
111            }
112        }
113
114        let miss_handler = self.miss_handler.as_mut().unwrap();
115        miss_handler.write_body(data, eof).await
116    }
117
118    async fn finish(&mut self) -> Result<()> {
119        let Some(miss_handler) = self.miss_handler.take() else {
120            // no miss_handler, uncacheable
121            return Ok(());
122        };
123        let finish = miss_handler.finish().await?;
124        if let Some(eviction) = self.eviction.as_ref() {
125            let cache_key = self.key.to_compact();
126            let meta = self.meta.as_ref().unwrap();
127            let evicted = match finish {
128                MissFinishType::Appended(delta, max_size) => {
129                    eviction.increment_weight(&cache_key, delta, max_size)
130                }
131                MissFinishType::Created(size) => {
132                    eviction.admit(cache_key, size, meta.0.internal.fresh_until)
133                }
134            };
135            // actual eviction can be done async
136            let trace = self
137                .trace
138                .child("cache put eviction", |o| o.start())
139                .handle();
140            let storage = self.storage;
141            tokio::task::spawn(async move {
142                for item in evicted {
143                    if let Err(e) = storage.purge(&item, PurgeType::Eviction, &trace).await {
144                        warn!("Failed to purge {item} during eviction for cache put: {e}");
145                    }
146                }
147            });
148        }
149
150        Ok(())
151    }
152
153    fn trace_header(&mut self, header: &ResponseHeader) {
154        self.trace.set_tag(|| {
155            Tag::new(
156                "cache-control",
157                header
158                    .headers
159                    .get_all(http::header::CACHE_CONTROL)
160                    .into_iter()
161                    .map(|v| String::from_utf8_lossy(v.as_bytes()).to_string())
162                    .collect::<Vec<_>>()
163                    .join(","),
164            )
165        });
166    }
167
168    async fn do_cache_put(&mut self, data: &[u8]) -> Result<Option<NoCacheReason>> {
169        let tasks = self.parser.inject_data(data)?;
170        for task in tasks {
171            match task {
172                HttpTask::Header(header, _eos) => {
173                    self.trace_header(&header);
174                    match self.cache_put.cacheable(*header) {
175                        RespCacheable::Cacheable(meta) => {
176                            if let Some(max_file_size_tracker) = &self.max_file_size_tracker {
177                                let content_length_hdr = meta.headers().get(header::CONTENT_LENGTH);
178                                if let Some(content_length) =
179                                    header_value_content_length(content_length_hdr)
180                                {
181                                    if content_length > max_file_size_tracker.max_file_size_bytes()
182                                    {
183                                        return Ok(Some(NoCacheReason::ResponseTooLarge));
184                                    }
185                                }
186                            }
187
188                            self.put_header(meta).await?;
189                        }
190                        RespCacheable::Uncacheable(reason) => {
191                            return Ok(Some(reason));
192                        }
193                    }
194                }
195                HttpTask::Body(data, eos) => {
196                    if let Some(data) = data {
197                        self.put_body(data, eos).await?;
198                    }
199                }
200                _ => {
201                    panic!("unexpected HttpTask during cache put {task:?}");
202                }
203            }
204        }
205        Ok(None)
206    }
207
208    /// Start the cache put logic for the given request
209    ///
210    /// This function will start to read the request body to put into cache.
211    /// Return:
212    /// - `Ok(None)` when the payload will be cache.
213    /// - `Ok(Some(reason))` when the payload is not cacheable
214    pub async fn cache_put(
215        &mut self,
216        session: &mut ServerSession,
217    ) -> Result<Option<NoCacheReason>> {
218        let mut no_cache_reason = None;
219        while let Some(data) = session.read_request_body().await? {
220            if no_cache_reason.is_some() {
221                // even uncacheable, the entire body needs to be drains for 1. downstream
222                // not throwing errors 2. connection reuse
223                continue;
224            }
225            no_cache_reason = self.do_cache_put(&data).await?
226        }
227        self.parser.finish()?;
228        self.finish().await?;
229
230        if let Some(reason) = no_cache_reason {
231            self.trace
232                .set_tag(|| Tag::new("uncacheable_reason", reason.as_str()));
233        }
234
235        Ok(no_cache_reason)
236    }
237}
238
#[cfg(test)]
mod test {
    use super::*;
    use cf_rustracing::span::Span;
    use once_cell::sync::Lazy;

    /// A [CachePut] that relies on the default `cacheable()` implementation.
    struct TestCachePut();
    impl CachePut for TestCachePut {
        fn cache_defaults() -> &'static CacheMetaDefaults {
            const DEFAULT: CacheMetaDefaults =
                CacheMetaDefaults::new(|_| Some(Duration::from_secs(1)), 1, 1);
            &DEFAULT
        }
    }

    type TestCachePutCtx = CachePutCtx<TestCachePut>;
    static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new);

    /// The `Date` header value carried by every test payload.
    const PAYLOAD_DATE: &str = "Thu, 26 Apr 2018 05:42:05 GMT";

    /// Build a context writing to the shared in-memory backend, without eviction.
    fn new_ctx(key: &CacheKey) -> TestCachePutCtx {
        TestCachePutCtx::new(
            TestCachePut(),
            key.clone(),
            &*CACHE_BACKEND,
            None,
            Span::inactive(),
        )
    }

    /// Feed every chunk to the context, expecting each to be cacheable,
    /// then finish the parser and the put.
    async fn put_cacheable(ctx: &mut TestCachePutCtx, chunks: &[&[u8]]) {
        for chunk in chunks {
            // here we skip mocking a real http session for simplicity
            let res = ctx.do_cache_put(chunk).await.unwrap();
            assert!(res.is_none()); // cacheable
        }
        ctx.parser.finish().unwrap();
        ctx.finish().await.unwrap();
    }

    /// Look up `key` in the backend, check the cached `Date` header and
    /// return the first body chunk.
    async fn cached_body(key: &CacheKey) -> Bytes {
        let span = Span::inactive();
        let (meta, mut hit) = CACHE_BACKEND
            .lookup(key, &span.handle())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(meta.headers().get("date").unwrap(), PAYLOAD_DATE);
        hit.read_body().await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn test_cache_put() {
        let key = CacheKey::new("", "a", "1");
        let mut ctx = new_ctx(&key);
        let payload: &[u8] = b"HTTP/1.1 200 OK\r\n\
        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
        Content-Type: text/html; charset=utf-8\r\n\
        Connection: keep-alive\r\n\
        X-Frame-Options: SAMEORIGIN\r\n\
        Cache-Control: public, max-age=1\r\n\
        Server: origin-server\r\n\
        Content-Length: 4\r\n\r\nrust";
        put_cacheable(&mut ctx, &[payload]).await;

        let data = cached_body(&key).await;
        assert_eq!(data, "rust");
    }

    #[tokio::test]
    async fn test_cache_put_uncacheable() {
        let key = CacheKey::new("", "a", "1");
        let mut ctx = new_ctx(&key);
        let payload: &[u8] = b"HTTP/1.1 200 OK\r\n\
        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
        Content-Type: text/html; charset=utf-8\r\n\
        Connection: keep-alive\r\n\
        X-Frame-Options: SAMEORIGIN\r\n\
        Cache-Control: no-store\r\n\
        Server: origin-server\r\n\
        Content-Length: 4\r\n\r\nrust";
        // here we skip mocking a real http session for simplicity
        let no_cache = ctx.do_cache_put(payload).await.unwrap().unwrap();
        assert_eq!(no_cache, NoCacheReason::OriginNotCache);
        ctx.parser.finish().unwrap();
        ctx.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_cache_put_204_invalid_body() {
        let key = CacheKey::new("", "b", "1");
        let mut ctx = new_ctx(&key);
        let payload: &[u8] = b"HTTP/1.1 204 OK\r\n\
        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
        Content-Type: text/html; charset=utf-8\r\n\
        Connection: keep-alive\r\n\
        X-Frame-Options: SAMEORIGIN\r\n\
        Cache-Control: public, max-age=1\r\n\
        Server: origin-server\r\n\
        Content-Length: 4\r\n\r\n";
        // 204 should not have body, invalid client input may try to pass one
        put_cacheable(&mut ctx, &[payload, &b"rust"[..]]).await;

        // just treated as empty body
        // (TODO: should we reset content-length/transfer-encoding
        // headers on 204/304?)
        let data = cached_body(&key).await;
        assert!(data.is_empty());
    }

    #[tokio::test]
    async fn test_cache_put_extra_body() {
        let key = CacheKey::new("", "c", "1");
        let mut ctx = new_ctx(&key);
        let payload: &[u8] = b"HTTP/1.1 200 OK\r\n\
        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
        Content-Type: text/html; charset=utf-8\r\n\
        Connection: keep-alive\r\n\
        X-Frame-Options: SAMEORIGIN\r\n\
        Cache-Control: public, max-age=1\r\n\
        Server: origin-server\r\n\
        Content-Length: 4\r\n\r\n";
        // pass in more extra request body that needs to be drained
        put_cacheable(&mut ctx, &[payload, &b"rustab"[..], &b"cdef"[..]]).await;

        let data = cached_body(&key).await;
        // body only contains specified content-length bounds
        assert_eq!(data, "rust");
    }
}
392
393// maybe this can simplify some logic in pingora::h1
394
395mod parse_response {
396    use super::*;
397    use bstr::ByteSlice;
398    use bytes::BytesMut;
399    use httparse::Status;
400    use pingora_error::{
401        Error,
402        ErrorType::{self, *},
403    };
404
405    pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody");
406
407    const MAX_HEADERS: usize = 256;
408    const INIT_HEADER_BUF_SIZE: usize = 4096;
409
    /// The phase the response parser is in
    #[derive(Debug, Clone, Copy, PartialEq)]
    enum ParseState {
        /// The state of a new parser, the header is expected next
        Init,
        /// Some header bytes were seen but the header is not complete yet
        PartialHeader,
        /// Reading a body bounded by Content-Length: (total body bytes, body bytes seen so far)
        PartialBodyContentLength(usize, usize),
        /// Reading a body without a usable Content-Length: (body bytes seen so far)
        PartialBody(usize),
        /// The response is complete: (body bytes seen)
        Done(usize),
        /// The header failed to parse with the given error
        Invalid(httparse::Error),
    }
419
420    impl ParseState {
421        fn is_done(&self) -> bool {
422            matches!(self, Self::Done(_))
423        }
424        fn read_header(&self) -> bool {
425            matches!(self, Self::Init | Self::PartialHeader)
426        }
427        fn read_body(&self) -> bool {
428            matches!(
429                self,
430                Self::PartialBodyContentLength(..) | Self::PartialBody(_)
431            )
432        }
433    }
434
    /// An incremental parser that turns raw response bytes into [HttpTask]s
    pub(super) struct ResponseParse {
        // the phase the parser is currently in
        state: ParseState,
        // the injected bytes that have not been consumed yet
        buf: BytesMut,
        // the raw bytes of the parsed header, split off `buf` by parse_header()
        header_bytes: Bytes,
    }
440
441    impl ResponseParse {
442        pub fn new() -> Self {
443            ResponseParse {
444                state: ParseState::Init,
445                buf: BytesMut::with_capacity(INIT_HEADER_BUF_SIZE),
446                header_bytes: Bytes::new(),
447            }
448        }
449
450        pub fn inject_data(&mut self, data: &[u8]) -> Result<Vec<HttpTask>> {
451            if self.state.is_done() {
452                // just ignore extra response body after parser is done
453                // could be invalid body appended to a no-content status
454                // or invalid body after content-length
455                // TODO: consider propagating an error to the client
456                return Ok(vec![]);
457            }
458
459            self.put_data(data);
460
461            let mut tasks = vec![];
462            while !self.state.is_done() {
463                if self.state.read_header() {
464                    let header = self.parse_header()?;
465                    let Some(header) = header else {
466                        break;
467                    };
468                    tasks.push(HttpTask::Header(Box::new(header), self.state.is_done()));
469                } else if self.state.read_body() {
470                    let body = self.parse_body()?;
471                    let Some(body) = body else {
472                        break;
473                    };
474                    tasks.push(HttpTask::Body(Some(body), self.state.is_done()));
475                } else {
476                    break;
477                }
478            }
479            Ok(tasks)
480        }
481
482        fn put_data(&mut self, data: &[u8]) {
483            use ParseState::*;
484            if matches!(self.state, Done(_) | Invalid(_)) {
485                panic!("Wrong phase {:?}", self.state);
486            }
487            self.buf.extend_from_slice(data);
488        }
489
490        fn parse_header(&mut self) -> Result<Option<ResponseHeader>> {
491            let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
492            let mut resp = httparse::Response::new(&mut headers);
493            let mut parser = httparse::ParserConfig::default();
494            parser.allow_spaces_after_header_name_in_responses(true);
495            parser.allow_obsolete_multiline_headers_in_responses(true);
496
497            let res = parser.parse_response(&mut resp, &self.buf);
498            let res = match res {
499                Ok(res) => res,
500                Err(e) => {
501                    self.state = ParseState::Invalid(e);
502                    return Error::e_because(
503                        InvalidHTTPHeader,
504                        format!("buf: {:?}", self.buf.as_bstr()),
505                        e,
506                    );
507                }
508            };
509
510            let split_to = match res {
511                Status::Complete(s) => s,
512                Status::Partial => {
513                    self.state = ParseState::PartialHeader;
514                    return Ok(None);
515                }
516            };
517            // safe to unwrap, valid response always has code set.
518            let mut response =
519                ResponseHeader::build(resp.code.unwrap(), Some(resp.headers.len())).unwrap();
520            for header in resp.headers {
521                // TODO: consider hold a Bytes and all header values can be Bytes referencing the
522                // original buffer without reallocation
523                response.append_header(header.name.to_owned(), header.value.to_owned())?;
524            }
525            // TODO: see above, we can make header value `Bytes` referencing header_bytes
526            let header_bytes = self.buf.split_to(split_to).freeze();
527            self.header_bytes = header_bytes;
528            self.state = body_type(&response);
529
530            Ok(Some(response))
531        }
532
533        fn parse_body(&mut self) -> Result<Option<Bytes>> {
534            use ParseState::*;
535            if self.buf.is_empty() {
536                return Ok(None);
537            }
538            match self.state {
539                Init | PartialHeader | Invalid(_) => {
540                    panic!("Wrong phase {:?}", self.state);
541                }
542                Done(_) => Ok(None),
543                PartialBodyContentLength(total, mut seen) => {
544                    let end = if total < self.buf.len() + seen {
545                        // TODO: warn! more data than expected
546                        total - seen
547                    } else {
548                        self.buf.len()
549                    };
550                    seen += end;
551                    if seen >= total {
552                        self.state = Done(seen);
553                    } else {
554                        self.state = PartialBodyContentLength(total, seen);
555                    }
556                    Ok(Some(self.buf.split_to(end).freeze()))
557                }
558                PartialBody(seen) => {
559                    self.state = PartialBody(seen + self.buf.len());
560                    Ok(Some(self.buf.split().freeze()))
561                }
562            }
563        }
564
565        pub fn finish(&mut self) -> Result<()> {
566            if let ParseState::PartialBody(seen) = self.state {
567                self.state = ParseState::Done(seen);
568            }
569            if !self.state.is_done() {
570                Error::e_explain(INCOMPLETE_BODY, format!("{:?}", self.state))
571            } else {
572                Ok(())
573            }
574        }
575    }
576
577    fn body_type(resp: &ResponseHeader) -> ParseState {
578        use http::StatusCode;
579
580        if matches!(
581            resp.status,
582            StatusCode::NO_CONTENT | StatusCode::NOT_MODIFIED
583        ) {
584            // these status codes cannot have body by definition
585            return ParseState::Done(0);
586        }
587        if let Some(cl) = resp.headers.get(http::header::CONTENT_LENGTH) {
588            // ignore invalid header value
589            if let Some(cl) = std::str::from_utf8(cl.as_bytes())
590                .ok()
591                .and_then(|cl| cl.parse::<usize>().ok())
592            {
593                return if cl == 0 {
594                    ParseState::Done(0)
595                } else {
596                    ParseState::PartialBodyContentLength(cl, 0)
597                };
598            }
599        }
600        // HTTP/1.0 and chunked encoding are both treated as PartialBody
601        // The response body payload should _not_ be chunked encoded
602        // even if the Transfer-Encoding: chunked header is added
603        ParseState::PartialBody(0)
604    }
605
606    #[cfg(test)]
607    mod test {
608        use super::*;
609
610        #[test]
611        fn test_basic_response() {
612            let input = b"HTTP/1.1 200 OK\r\n\r\n";
613            let mut parser = ResponseParse::new();
614            let output = parser.inject_data(input).unwrap();
615            assert_eq!(output.len(), 1);
616            let HttpTask::Header(header, eos) = &output[0] else {
617                panic!("{:?}", output);
618            };
619            assert_eq!(header.status, 200);
620            assert!(!eos);
621
622            let body = b"abc";
623            let output = parser.inject_data(body).unwrap();
624            assert_eq!(output.len(), 1);
625            let HttpTask::Body(data, _eos) = &output[0] else {
626                panic!("{:?}", output);
627            };
628            assert_eq!(data.as_ref().unwrap(), &body[..]);
629            parser.finish().unwrap();
630        }
631
632        #[test]
633        fn test_partial_response_headers() {
634            let input = b"HTTP/1.1 200 OK\r\n";
635            let mut parser = ResponseParse::new();
636            let output = parser.inject_data(input).unwrap();
637            // header is not complete
638            assert_eq!(output.len(), 0);
639
640            let output = parser
641                .inject_data("Server: pingora\r\n\r\n".as_bytes())
642                .unwrap();
643            assert_eq!(output.len(), 1);
644            let HttpTask::Header(header, eos) = &output[0] else {
645                panic!("{:?}", output);
646            };
647            assert_eq!(header.status, 200);
648            assert_eq!(header.headers.get("Server").unwrap(), "pingora");
649            assert!(!eos);
650        }
651
652        #[test]
653        fn test_invalid_headers() {
654            let input = b"HTP/1.1 200 OK\r\nServer: pingora\r\n\r\n";
655            let mut parser = ResponseParse::new();
656            let output = parser.inject_data(input);
657            // header is not complete
658            assert!(output.is_err());
659            match parser.state {
660                ParseState::Invalid(httparse::Error::Version) => {}
661                _ => panic!("should have failed to parse"),
662            }
663        }
664
665        #[test]
666        fn test_body_content_length() {
667            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
668            let mut parser = ResponseParse::new();
669            let output = parser.inject_data(input).unwrap();
670
671            assert_eq!(output.len(), 2);
672            let HttpTask::Header(header, _eos) = &output[0] else {
673                panic!("{:?}", output);
674            };
675            assert_eq!(header.status, 200);
676
677            let HttpTask::Body(data, eos) = &output[1] else {
678                panic!("{:?}", output);
679            };
680            assert_eq!(data.as_ref().unwrap(), "abc");
681            assert!(!eos);
682
683            let output = parser.inject_data(b"def").unwrap();
684            assert_eq!(output.len(), 1);
685            let HttpTask::Body(data, eos) = &output[0] else {
686                panic!("{:?}", output);
687            };
688            assert_eq!(data.as_ref().unwrap(), "def");
689            assert!(eos);
690
691            parser.finish().unwrap();
692        }
693
694        #[test]
695        fn test_body_chunked() {
696            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nrust";
697            let mut parser = ResponseParse::new();
698            let output = parser.inject_data(input).unwrap();
699
700            assert_eq!(output.len(), 2);
701            let HttpTask::Header(header, _eos) = &output[0] else {
702                panic!("{:?}", output);
703            };
704            assert_eq!(header.status, 200);
705
706            let HttpTask::Body(data, eos) = &output[1] else {
707                panic!("{:?}", output);
708            };
709            assert_eq!(data.as_ref().unwrap(), "rust");
710            assert!(!eos);
711
712            parser.finish().unwrap();
713        }
714
715        #[test]
716        fn test_body_content_length_early() {
717            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
718            let mut parser = ResponseParse::new();
719            let output = parser.inject_data(input).unwrap();
720
721            assert_eq!(output.len(), 2);
722            let HttpTask::Header(header, _eos) = &output[0] else {
723                panic!("{:?}", output);
724            };
725            assert_eq!(header.status, 200);
726
727            let HttpTask::Body(data, eos) = &output[1] else {
728                panic!("{:?}", output);
729            };
730            assert_eq!(data.as_ref().unwrap(), "abc");
731            assert!(!eos);
732
733            parser.finish().unwrap_err();
734        }
735
736        #[test]
737        fn test_body_content_length_more_data() {
738            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nabc";
739            let mut parser = ResponseParse::new();
740            let output = parser.inject_data(input).unwrap();
741
742            assert_eq!(output.len(), 2);
743            let HttpTask::Header(header, _eos) = &output[0] else {
744                panic!("{:?}", output);
745            };
746            assert_eq!(header.status, 200);
747
748            let HttpTask::Body(data, eos) = &output[1] else {
749                panic!("{:?}", output);
750            };
751            assert_eq!(data.as_ref().unwrap(), "ab");
752            assert!(eos);
753
754            // extra data is dropped without error
755            parser.finish().unwrap();
756        }
757
758        #[test]
759        fn test_body_chunked_partial_chunk() {
760            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nru";
761            let mut parser = ResponseParse::new();
762            let output = parser.inject_data(input).unwrap();
763
764            assert_eq!(output.len(), 2);
765            let HttpTask::Header(header, _eos) = &output[0] else {
766                panic!("{:?}", output);
767            };
768            assert_eq!(header.status, 200);
769
770            let HttpTask::Body(data, eos) = &output[1] else {
771                panic!("{:?}", output);
772            };
773            assert_eq!(data.as_ref().unwrap(), "ru");
774            assert!(!eos);
775
776            let output = parser.inject_data(b"st\r\n").unwrap();
777            assert_eq!(output.len(), 1);
778            let HttpTask::Body(data, eos) = &output[0] else {
779                panic!("{:?}", output);
780            };
781            assert_eq!(data.as_ref().unwrap(), "st\r\n");
782            assert!(!eos);
783        }
784
785        #[test]
786        fn test_no_body_content_length() {
787            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
788            let mut parser = ResponseParse::new();
789            let output = parser.inject_data(input).unwrap();
790
791            assert_eq!(output.len(), 1);
792            let HttpTask::Header(header, eos) = &output[0] else {
793                panic!("{:?}", output);
794            };
795            assert_eq!(header.status, 200);
796            assert!(eos);
797
798            parser.finish().unwrap();
799        }
800
801        #[test]
802        fn test_no_body_304_no_content_length() {
803            let input = b"HTTP/1.1 304 Not Modified\r\nCache-Control: public, max-age=10\r\n\r\n";
804            let mut parser = ResponseParse::new();
805            let output = parser.inject_data(input).unwrap();
806
807            assert_eq!(output.len(), 1);
808            let HttpTask::Header(header, eos) = &output[0] else {
809                panic!("{:?}", output);
810            };
811            assert_eq!(header.status, 304);
812            assert!(eos);
813
814            parser.finish().unwrap();
815        }
816
817        #[test]
818        fn test_204_with_chunked_body() {
819            let input = b"HTTP/1.1 204 No Content\r\nCache-Control: public, max-age=10\r\nTransfer-Encoding: chunked\r\n\r\n";
820            let mut parser = ResponseParse::new();
821            let output = parser.inject_data(input).unwrap();
822
823            assert_eq!(output.len(), 1);
824            let HttpTask::Header(header, eos) = &output[0] else {
825                panic!("{:?}", output);
826            };
827            assert_eq!(header.status, 204);
828            assert!(eos);
829
830            // 204 should not have a body, parser ignores bad input
831            let output = parser.inject_data(b"4\r\nrust\r\n0\r\n\r\n").unwrap();
832            assert!(output.is_empty());
833            parser.finish().unwrap();
834        }
835
836        #[test]
837        fn test_204_with_content_length() {
838            let input = b"HTTP/1.1 204 No Content\r\nCache-Control: public, max-age=10\r\nContent-Length: 4\r\n\r\n";
839            let mut parser = ResponseParse::new();
840            let output = parser.inject_data(input).unwrap();
841
842            assert_eq!(output.len(), 1);
843            let HttpTask::Header(header, eos) = &output[0] else {
844                panic!("{:?}", output);
845            };
846            assert_eq!(header.status, 204);
847            assert!(eos);
848
849            // 204 should not have a body, parser ignores bad input
850            let output = parser.inject_data(b"rust").unwrap();
851            assert!(output.is_empty());
852            parser.finish().unwrap();
853        }
854
855        #[test]
856        fn test_200_with_zero_content_length_more_data() {
857            let input = b"HTTP/1.1 200 OK\r\nCache-Control: public, max-age=10\r\nContent-Length: 0\r\n\r\n";
858            let mut parser = ResponseParse::new();
859            let output = parser.inject_data(input).unwrap();
860
861            assert_eq!(output.len(), 1);
862            let HttpTask::Header(header, eos) = &output[0] else {
863                panic!("{:?}", output);
864            };
865            assert_eq!(header.status, 200);
866            assert!(eos);
867
868            let output = parser.inject_data(b"rust").unwrap();
869            assert!(output.is_empty());
870            parser.finish().unwrap();
871        }
872    }
873}