pingora_cache/
put.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache Put module
16
17use crate::max_file_size::ERR_RESPONSE_TOO_LARGE;
18use crate::*;
19use bytes::Bytes;
20use http::header;
21use log::warn;
22use pingora_core::protocols::http::{
23    v1::common::header_value_content_length, HttpTask, ServerSession,
24};
25use pingora_error::Error;
26
27/// The interface to define cache put behavior
28pub trait CachePut {
29    /// Return whether to cache the asset according to the given response header.
30    fn cacheable(&self, response: ResponseHeader) -> RespCacheable {
31        let cc = cache_control::CacheControl::from_resp_headers(&response);
32        filters::resp_cacheable(cc.as_ref(), response, false, Self::cache_defaults())
33    }
34
35    /// Return the [CacheMetaDefaults]
36    fn cache_defaults() -> &'static CacheMetaDefaults;
37}
38
39use parse_response::ResponseParse;
40
41/// The cache put context
42pub struct CachePutCtx<C: CachePut> {
43    cache_put: C, // the user defined cache put behavior
44    key: CacheKey,
45    storage: &'static (dyn storage::Storage + Sync), // static for now
46    eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
47    miss_handler: Option<MissHandler>,
48    max_file_size_tracker: Option<MaxFileSizeTracker>,
49    meta: Option<CacheMeta>,
50    parser: ResponseParse,
51    // FIXME: cache put doesn't have cache lock but some storage cannot handle concurrent put
52    // to the same asset.
53    trace: trace::Span,
54}
55
56impl<C: CachePut> CachePutCtx<C> {
57    /// Create a new [CachePutCtx]
58    pub fn new(
59        cache_put: C,
60        key: CacheKey,
61        storage: &'static (dyn storage::Storage + Sync),
62        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
63        trace: trace::Span,
64    ) -> Self {
65        CachePutCtx {
66            cache_put,
67            key,
68            storage,
69            eviction,
70            miss_handler: None,
71            max_file_size_tracker: None,
72            meta: None,
73            parser: ResponseParse::new(),
74            trace,
75        }
76    }
77
78    /// Set the max cacheable size limit
79    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
80        self.max_file_size_tracker = Some(MaxFileSizeTracker::new(max_file_size_bytes));
81    }
82
83    async fn put_header(&mut self, meta: CacheMeta) -> Result<()> {
84        let trace = self.trace.child("cache put header", |o| o.start()).handle();
85        let miss_handler = self
86            .storage
87            .get_miss_handler(&self.key, &meta, &trace)
88            .await?;
89        self.miss_handler = Some(miss_handler);
90        self.meta = Some(meta);
91        Ok(())
92    }
93
94    async fn put_body(&mut self, data: Bytes, eof: bool) -> Result<()> {
95        // fail if writing the body would exceed the max_file_size_bytes
96        if let Some(size_tracker) = self.max_file_size_tracker.as_mut() {
97            let body_size_allowed = size_tracker.add_body_bytes(data.len());
98            if !body_size_allowed {
99                return Error::e_explain(
100                    ERR_RESPONSE_TOO_LARGE,
101                    format!(
102                        "writing data of size {} bytes would exceed max file size of {} bytes",
103                        data.len(),
104                        size_tracker.max_file_size_bytes(),
105                    ),
106                );
107            }
108        }
109
110        let miss_handler = self.miss_handler.as_mut().unwrap();
111        miss_handler.write_body(data, eof).await
112    }
113
114    async fn finish(&mut self) -> Result<()> {
115        let Some(miss_handler) = self.miss_handler.take() else {
116            // no miss_handler, uncacheable
117            return Ok(());
118        };
119        let finish = miss_handler.finish().await?;
120        if let Some(eviction) = self.eviction.as_ref() {
121            let cache_key = self.key.to_compact();
122            let meta = self.meta.as_ref().unwrap();
123            let evicted = match finish {
124                MissFinishType::Appended(delta) => eviction.increment_weight(cache_key, delta),
125                MissFinishType::Created(size) => {
126                    eviction.admit(cache_key, size, meta.0.internal.fresh_until)
127                }
128            };
129            // actual eviction can be done async
130            let trace = self
131                .trace
132                .child("cache put eviction", |o| o.start())
133                .handle();
134            let storage = self.storage;
135            tokio::task::spawn(async move {
136                for item in evicted {
137                    if let Err(e) = storage.purge(&item, PurgeType::Eviction, &trace).await {
138                        warn!("Failed to purge {item} during eviction for cache put: {e}");
139                    }
140                }
141            });
142        }
143
144        Ok(())
145    }
146
147    async fn do_cache_put(&mut self, data: &[u8]) -> Result<Option<NoCacheReason>> {
148        let tasks = self.parser.inject_data(data)?;
149        for task in tasks {
150            match task {
151                HttpTask::Header(header, _eos) => match self.cache_put.cacheable(*header) {
152                    RespCacheable::Cacheable(meta) => {
153                        if let Some(max_file_size_tracker) = &self.max_file_size_tracker {
154                            let content_length_hdr = meta.headers().get(header::CONTENT_LENGTH);
155                            if let Some(content_length) =
156                                header_value_content_length(content_length_hdr)
157                            {
158                                if content_length > max_file_size_tracker.max_file_size_bytes() {
159                                    return Ok(Some(NoCacheReason::ResponseTooLarge));
160                                }
161                            }
162                        }
163
164                        self.put_header(meta).await?;
165                    }
166                    RespCacheable::Uncacheable(reason) => {
167                        return Ok(Some(reason));
168                    }
169                },
170                HttpTask::Body(data, eos) => {
171                    if let Some(data) = data {
172                        self.put_body(data, eos).await?;
173                    }
174                }
175                _ => {
176                    panic!("unexpected HttpTask during cache put {task:?}");
177                }
178            }
179        }
180        Ok(None)
181    }
182
183    /// Start the cache put logic for the given request
184    ///
185    /// This function will start to read the request body to put into cache.
186    /// Return:
187    /// - `Ok(None)` when the payload will be cache.
188    /// - `Ok(Some(reason))` when the payload is not cacheable
189    pub async fn cache_put(
190        &mut self,
191        session: &mut ServerSession,
192    ) -> Result<Option<NoCacheReason>> {
193        let mut no_cache_reason = None;
194        while let Some(data) = session.read_request_body().await? {
195            if no_cache_reason.is_some() {
196                // even uncacheable, the entire body needs to be drains for 1. downstream
197                // not throwing errors 2. connection reuse
198                continue;
199            }
200            no_cache_reason = self.do_cache_put(&data).await?
201        }
202        self.parser.finish()?;
203        self.finish().await?;
204        Ok(no_cache_reason)
205    }
206}
207
208#[cfg(test)]
209mod test {
210    use super::*;
211    use cf_rustracing::span::Span;
212    use once_cell::sync::Lazy;
213
214    struct TestCachePut();
215    impl CachePut for TestCachePut {
216        fn cache_defaults() -> &'static CacheMetaDefaults {
217            const DEFAULT: CacheMetaDefaults =
218                CacheMetaDefaults::new(|_| Some(Duration::from_secs(1)), 1, 1);
219            &DEFAULT
220        }
221    }
222
223    type TestCachePutCtx = CachePutCtx<TestCachePut>;
224    static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new);
225
226    #[tokio::test]
227    async fn test_cache_put() {
228        let key = CacheKey::new("", "a", "1");
229        let span = Span::inactive();
230        let put = TestCachePut();
231        let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
232        let payload = b"HTTP/1.1 200 OK\r\n\
233        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
234        Content-Type: text/html; charset=utf-8\r\n\
235        Connection: keep-alive\r\n\
236        X-Frame-Options: SAMEORIGIN\r\n\
237        Cache-Control: public, max-age=1\r\n\
238        Server: origin-server\r\n\
239        Content-Length: 4\r\n\r\nrust";
240        // here we skip mocking a real http session for simplicity
241        let res = ctx.do_cache_put(payload).await.unwrap();
242        assert!(res.is_none()); // cacheable
243        ctx.parser.finish().unwrap();
244        ctx.finish().await.unwrap();
245
246        let span = Span::inactive();
247        let (meta, mut hit) = CACHE_BACKEND
248            .lookup(&key, &span.handle())
249            .await
250            .unwrap()
251            .unwrap();
252        assert_eq!(
253            meta.headers().get("date").unwrap(),
254            "Thu, 26 Apr 2018 05:42:05 GMT"
255        );
256        let data = hit.read_body().await.unwrap().unwrap();
257        assert_eq!(data, "rust");
258    }
259
260    #[tokio::test]
261    async fn test_cache_put_uncacheable() {
262        let key = CacheKey::new("", "a", "1");
263        let span = Span::inactive();
264        let put = TestCachePut();
265        let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
266        let payload = b"HTTP/1.1 200 OK\r\n\
267        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
268        Content-Type: text/html; charset=utf-8\r\n\
269        Connection: keep-alive\r\n\
270        X-Frame-Options: SAMEORIGIN\r\n\
271        Cache-Control: no-store\r\n\
272        Server: origin-server\r\n\
273        Content-Length: 4\r\n\r\nrust";
274        // here we skip mocking a real http session for simplicity
275        let no_cache = ctx.do_cache_put(payload).await.unwrap().unwrap();
276        assert_eq!(no_cache, NoCacheReason::OriginNotCache);
277        ctx.parser.finish().unwrap();
278        ctx.finish().await.unwrap();
279    }
280
281    #[tokio::test]
282    async fn test_cache_put_204_invalid_body() {
283        let key = CacheKey::new("", "b", "1");
284        let span = Span::inactive();
285        let put = TestCachePut();
286        let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
287        let payload = b"HTTP/1.1 204 OK\r\n\
288        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
289        Content-Type: text/html; charset=utf-8\r\n\
290        Connection: keep-alive\r\n\
291        X-Frame-Options: SAMEORIGIN\r\n\
292        Cache-Control: public, max-age=1\r\n\
293        Server: origin-server\r\n\
294        Content-Length: 4\r\n\r\n";
295        // here we skip mocking a real http session for simplicity
296        let res = ctx.do_cache_put(payload).await.unwrap();
297        assert!(res.is_none()); // cacheable
298                                // 204 should not have body, invalid client input may try to pass one
299        let res = ctx.do_cache_put(b"rust").await.unwrap();
300        assert!(res.is_none()); // still cacheable
301        ctx.parser.finish().unwrap();
302        ctx.finish().await.unwrap();
303
304        let span = Span::inactive();
305        let (meta, mut hit) = CACHE_BACKEND
306            .lookup(&key, &span.handle())
307            .await
308            .unwrap()
309            .unwrap();
310        assert_eq!(
311            meta.headers().get("date").unwrap(),
312            "Thu, 26 Apr 2018 05:42:05 GMT"
313        );
314        // just treated as empty body
315        // (TODO: should we reset content-length/transfer-encoding
316        // headers on 204/304?)
317        let data = hit.read_body().await.unwrap().unwrap();
318        assert!(data.is_empty());
319    }
320
321    #[tokio::test]
322    async fn test_cache_put_extra_body() {
323        let key = CacheKey::new("", "c", "1");
324        let span = Span::inactive();
325        let put = TestCachePut();
326        let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
327        let payload = b"HTTP/1.1 200 OK\r\n\
328        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
329        Content-Type: text/html; charset=utf-8\r\n\
330        Connection: keep-alive\r\n\
331        X-Frame-Options: SAMEORIGIN\r\n\
332        Cache-Control: public, max-age=1\r\n\
333        Server: origin-server\r\n\
334        Content-Length: 4\r\n\r\n";
335        // here we skip mocking a real http session for simplicity
336        let res = ctx.do_cache_put(payload).await.unwrap();
337        assert!(res.is_none()); // cacheable
338                                // pass in more extra request body that needs to be drained
339        let res = ctx.do_cache_put(b"rustab").await.unwrap();
340        assert!(res.is_none()); // still cacheable
341        let res = ctx.do_cache_put(b"cdef").await.unwrap();
342        assert!(res.is_none()); // still cacheable
343        ctx.parser.finish().unwrap();
344        ctx.finish().await.unwrap();
345
346        let span = Span::inactive();
347        let (meta, mut hit) = CACHE_BACKEND
348            .lookup(&key, &span.handle())
349            .await
350            .unwrap()
351            .unwrap();
352        assert_eq!(
353            meta.headers().get("date").unwrap(),
354            "Thu, 26 Apr 2018 05:42:05 GMT"
355        );
356        let data = hit.read_body().await.unwrap().unwrap();
357        // body only contains specified content-length bounds
358        assert_eq!(data, "rust");
359    }
360}
361
362// maybe this can simplify some logic in pingora::h1
363
364mod parse_response {
365    use super::*;
366    use bytes::BytesMut;
367    use httparse::Status;
368    use pingora_error::{
369        Error,
370        ErrorType::{self, *},
371    };
372
373    pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody");
374
375    const MAX_HEADERS: usize = 256;
376    const INIT_HEADER_BUF_SIZE: usize = 4096;
377
378    #[derive(Debug, Clone, Copy, PartialEq)]
379    enum ParseState {
380        Init,
381        PartialHeader,
382        PartialBodyContentLength(usize, usize),
383        PartialBody(usize),
384        Done(usize),
385        Invalid(httparse::Error),
386    }
387
388    impl ParseState {
389        fn is_done(&self) -> bool {
390            matches!(self, Self::Done(_))
391        }
392        fn read_header(&self) -> bool {
393            matches!(self, Self::Init | Self::PartialHeader)
394        }
395        fn read_body(&self) -> bool {
396            matches!(
397                self,
398                Self::PartialBodyContentLength(..) | Self::PartialBody(_)
399            )
400        }
401    }
402
403    pub(super) struct ResponseParse {
404        state: ParseState,
405        buf: BytesMut,
406        header_bytes: Bytes,
407    }
408
409    impl ResponseParse {
410        pub fn new() -> Self {
411            ResponseParse {
412                state: ParseState::Init,
413                buf: BytesMut::with_capacity(INIT_HEADER_BUF_SIZE),
414                header_bytes: Bytes::new(),
415            }
416        }
417
418        pub fn inject_data(&mut self, data: &[u8]) -> Result<Vec<HttpTask>> {
419            if self.state.is_done() {
420                // just ignore extra response body after parser is done
421                // could be invalid body appended to a no-content status
422                // or invalid body after content-length
423                // TODO: consider propagating an error to the client
424                return Ok(vec![]);
425            }
426
427            self.put_data(data);
428
429            let mut tasks = vec![];
430            while !self.state.is_done() {
431                if self.state.read_header() {
432                    let header = self.parse_header()?;
433                    let Some(header) = header else {
434                        break;
435                    };
436                    tasks.push(HttpTask::Header(Box::new(header), self.state.is_done()));
437                } else if self.state.read_body() {
438                    let body = self.parse_body()?;
439                    let Some(body) = body else {
440                        break;
441                    };
442                    tasks.push(HttpTask::Body(Some(body), self.state.is_done()));
443                } else {
444                    break;
445                }
446            }
447            Ok(tasks)
448        }
449
450        fn put_data(&mut self, data: &[u8]) {
451            use ParseState::*;
452            if matches!(self.state, Done(_) | Invalid(_)) {
453                panic!("Wrong phase {:?}", self.state);
454            }
455            self.buf.extend_from_slice(data);
456        }
457
458        fn parse_header(&mut self) -> Result<Option<ResponseHeader>> {
459            let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
460            let mut resp = httparse::Response::new(&mut headers);
461            let mut parser = httparse::ParserConfig::default();
462            parser.allow_spaces_after_header_name_in_responses(true);
463            parser.allow_obsolete_multiline_headers_in_responses(true);
464
465            let res = parser.parse_response(&mut resp, &self.buf);
466            let res = match res {
467                Ok(res) => res,
468                Err(e) => {
469                    self.state = ParseState::Invalid(e);
470                    return Error::e_because(
471                        InvalidHTTPHeader,
472                        format!("buf: {:?}", String::from_utf8_lossy(&self.buf)),
473                        e,
474                    );
475                }
476            };
477
478            let split_to = match res {
479                Status::Complete(s) => s,
480                Status::Partial => {
481                    self.state = ParseState::PartialHeader;
482                    return Ok(None);
483                }
484            };
485            // safe to unwrap, valid response always has code set.
486            let mut response =
487                ResponseHeader::build(resp.code.unwrap(), Some(resp.headers.len())).unwrap();
488            for header in resp.headers {
489                // TODO: consider hold a Bytes and all header values can be Bytes referencing the
490                // original buffer without reallocation
491                response.append_header(header.name.to_owned(), header.value.to_owned())?;
492            }
493            // TODO: see above, we can make header value `Bytes` referencing header_bytes
494            let header_bytes = self.buf.split_to(split_to).freeze();
495            self.header_bytes = header_bytes;
496            self.state = body_type(&response);
497
498            Ok(Some(response))
499        }
500
501        fn parse_body(&mut self) -> Result<Option<Bytes>> {
502            use ParseState::*;
503            if self.buf.is_empty() {
504                return Ok(None);
505            }
506            match self.state {
507                Init | PartialHeader | Invalid(_) => {
508                    panic!("Wrong phase {:?}", self.state);
509                }
510                Done(_) => Ok(None),
511                PartialBodyContentLength(total, mut seen) => {
512                    let end = if total < self.buf.len() + seen {
513                        // TODO: warn! more data than expected
514                        total - seen
515                    } else {
516                        self.buf.len()
517                    };
518                    seen += end;
519                    if seen >= total {
520                        self.state = Done(seen);
521                    } else {
522                        self.state = PartialBodyContentLength(total, seen);
523                    }
524                    Ok(Some(self.buf.split_to(end).freeze()))
525                }
526                PartialBody(seen) => {
527                    self.state = PartialBody(seen + self.buf.len());
528                    Ok(Some(self.buf.split().freeze()))
529                }
530            }
531        }
532
533        pub fn finish(&mut self) -> Result<()> {
534            if let ParseState::PartialBody(seen) = self.state {
535                self.state = ParseState::Done(seen);
536            }
537            if !self.state.is_done() {
538                Error::e_explain(INCOMPLETE_BODY, format!("{:?}", self.state))
539            } else {
540                Ok(())
541            }
542        }
543    }
544
545    fn body_type(resp: &ResponseHeader) -> ParseState {
546        use http::StatusCode;
547
548        if matches!(
549            resp.status,
550            StatusCode::NO_CONTENT | StatusCode::NOT_MODIFIED
551        ) {
552            // these status codes cannot have body by definition
553            return ParseState::Done(0);
554        }
555        if let Some(cl) = resp.headers.get(http::header::CONTENT_LENGTH) {
556            // ignore invalid header value
557            if let Some(cl) = std::str::from_utf8(cl.as_bytes())
558                .ok()
559                .and_then(|cl| cl.parse::<usize>().ok())
560            {
561                return if cl == 0 {
562                    ParseState::Done(0)
563                } else {
564                    ParseState::PartialBodyContentLength(cl, 0)
565                };
566            }
567        }
568        // HTTP/1.0 and chunked encoding are both treated as PartialBody
569        // The response body payload should _not_ be chunked encoded
570        // even if the Transfer-Encoding: chunked header is added
571        ParseState::PartialBody(0)
572    }
573
574    #[cfg(test)]
575    mod test {
576        use super::*;
577
578        #[test]
579        fn test_basic_response() {
580            let input = b"HTTP/1.1 200 OK\r\n\r\n";
581            let mut parser = ResponseParse::new();
582            let output = parser.inject_data(input).unwrap();
583            assert_eq!(output.len(), 1);
584            let HttpTask::Header(header, eos) = &output[0] else {
585                panic!("{:?}", output);
586            };
587            assert_eq!(header.status, 200);
588            assert!(!eos);
589
590            let body = b"abc";
591            let output = parser.inject_data(body).unwrap();
592            assert_eq!(output.len(), 1);
593            let HttpTask::Body(data, _eos) = &output[0] else {
594                panic!("{:?}", output);
595            };
596            assert_eq!(data.as_ref().unwrap(), &body[..]);
597            parser.finish().unwrap();
598        }
599
600        #[test]
601        fn test_partial_response_headers() {
602            let input = b"HTTP/1.1 200 OK\r\n";
603            let mut parser = ResponseParse::new();
604            let output = parser.inject_data(input).unwrap();
605            // header is not complete
606            assert_eq!(output.len(), 0);
607
608            let output = parser
609                .inject_data("Server: pingora\r\n\r\n".as_bytes())
610                .unwrap();
611            assert_eq!(output.len(), 1);
612            let HttpTask::Header(header, eos) = &output[0] else {
613                panic!("{:?}", output);
614            };
615            assert_eq!(header.status, 200);
616            assert_eq!(header.headers.get("Server").unwrap(), "pingora");
617            assert!(!eos);
618        }
619
620        #[test]
621        fn test_invalid_headers() {
622            let input = b"HTP/1.1 200 OK\r\nServer: pingora\r\n\r\n";
623            let mut parser = ResponseParse::new();
624            let output = parser.inject_data(input);
625            // header is not complete
626            assert!(output.is_err());
627            match parser.state {
628                ParseState::Invalid(httparse::Error::Version) => {}
629                _ => panic!("should have failed to parse"),
630            }
631        }
632
633        #[test]
634        fn test_body_content_length() {
635            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
636            let mut parser = ResponseParse::new();
637            let output = parser.inject_data(input).unwrap();
638
639            assert_eq!(output.len(), 2);
640            let HttpTask::Header(header, _eos) = &output[0] else {
641                panic!("{:?}", output);
642            };
643            assert_eq!(header.status, 200);
644
645            let HttpTask::Body(data, eos) = &output[1] else {
646                panic!("{:?}", output);
647            };
648            assert_eq!(data.as_ref().unwrap(), "abc");
649            assert!(!eos);
650
651            let output = parser.inject_data(b"def").unwrap();
652            assert_eq!(output.len(), 1);
653            let HttpTask::Body(data, eos) = &output[0] else {
654                panic!("{:?}", output);
655            };
656            assert_eq!(data.as_ref().unwrap(), "def");
657            assert!(eos);
658
659            parser.finish().unwrap();
660        }
661
662        #[test]
663        fn test_body_chunked() {
664            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nrust";
665            let mut parser = ResponseParse::new();
666            let output = parser.inject_data(input).unwrap();
667
668            assert_eq!(output.len(), 2);
669            let HttpTask::Header(header, _eos) = &output[0] else {
670                panic!("{:?}", output);
671            };
672            assert_eq!(header.status, 200);
673
674            let HttpTask::Body(data, eos) = &output[1] else {
675                panic!("{:?}", output);
676            };
677            assert_eq!(data.as_ref().unwrap(), "rust");
678            assert!(!eos);
679
680            parser.finish().unwrap();
681        }
682
683        #[test]
684        fn test_body_content_length_early() {
685            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
686            let mut parser = ResponseParse::new();
687            let output = parser.inject_data(input).unwrap();
688
689            assert_eq!(output.len(), 2);
690            let HttpTask::Header(header, _eos) = &output[0] else {
691                panic!("{:?}", output);
692            };
693            assert_eq!(header.status, 200);
694
695            let HttpTask::Body(data, eos) = &output[1] else {
696                panic!("{:?}", output);
697            };
698            assert_eq!(data.as_ref().unwrap(), "abc");
699            assert!(!eos);
700
701            parser.finish().unwrap_err();
702        }
703
704        #[test]
705        fn test_body_content_length_more_data() {
706            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nabc";
707            let mut parser = ResponseParse::new();
708            let output = parser.inject_data(input).unwrap();
709
710            assert_eq!(output.len(), 2);
711            let HttpTask::Header(header, _eos) = &output[0] else {
712                panic!("{:?}", output);
713            };
714            assert_eq!(header.status, 200);
715
716            let HttpTask::Body(data, eos) = &output[1] else {
717                panic!("{:?}", output);
718            };
719            assert_eq!(data.as_ref().unwrap(), "ab");
720            assert!(eos);
721
722            // extra data is dropped without error
723            parser.finish().unwrap();
724        }
725
726        #[test]
727        fn test_body_chunked_partial_chunk() {
728            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nru";
729            let mut parser = ResponseParse::new();
730            let output = parser.inject_data(input).unwrap();
731
732            assert_eq!(output.len(), 2);
733            let HttpTask::Header(header, _eos) = &output[0] else {
734                panic!("{:?}", output);
735            };
736            assert_eq!(header.status, 200);
737
738            let HttpTask::Body(data, eos) = &output[1] else {
739                panic!("{:?}", output);
740            };
741            assert_eq!(data.as_ref().unwrap(), "ru");
742            assert!(!eos);
743
744            let output = parser.inject_data(b"st\r\n").unwrap();
745            assert_eq!(output.len(), 1);
746            let HttpTask::Body(data, eos) = &output[0] else {
747                panic!("{:?}", output);
748            };
749            assert_eq!(data.as_ref().unwrap(), "st\r\n");
750            assert!(!eos);
751        }
752
753        #[test]
754        fn test_no_body_content_length() {
755            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
756            let mut parser = ResponseParse::new();
757            let output = parser.inject_data(input).unwrap();
758
759            assert_eq!(output.len(), 1);
760            let HttpTask::Header(header, eos) = &output[0] else {
761                panic!("{:?}", output);
762            };
763            assert_eq!(header.status, 200);
764            assert!(eos);
765
766            parser.finish().unwrap();
767        }
768
769        #[test]
770        fn test_no_body_304_no_content_length() {
771            let input = b"HTTP/1.1 304 Not Modified\r\nCache-Control: public, max-age=10\r\n\r\n";
772            let mut parser = ResponseParse::new();
773            let output = parser.inject_data(input).unwrap();
774
775            assert_eq!(output.len(), 1);
776            let HttpTask::Header(header, eos) = &output[0] else {
777                panic!("{:?}", output);
778            };
779            assert_eq!(header.status, 304);
780            assert!(eos);
781
782            parser.finish().unwrap();
783        }
784
785        #[test]
786        fn test_204_with_chunked_body() {
787            let input = b"HTTP/1.1 204 No Content\r\nCache-Control: public, max-age=10\r\nTransfer-Encoding: chunked\r\n\r\n";
788            let mut parser = ResponseParse::new();
789            let output = parser.inject_data(input).unwrap();
790
791            assert_eq!(output.len(), 1);
792            let HttpTask::Header(header, eos) = &output[0] else {
793                panic!("{:?}", output);
794            };
795            assert_eq!(header.status, 204);
796            assert!(eos);
797
798            // 204 should not have a body, parser ignores bad input
799            let output = parser.inject_data(b"4\r\nrust\r\n0\r\n\r\n").unwrap();
800            assert!(output.is_empty());
801            parser.finish().unwrap();
802        }
803
804        #[test]
805        fn test_204_with_content_length() {
806            let input = b"HTTP/1.1 204 No Content\r\nCache-Control: public, max-age=10\r\nContent-Length: 4\r\n\r\n";
807            let mut parser = ResponseParse::new();
808            let output = parser.inject_data(input).unwrap();
809
810            assert_eq!(output.len(), 1);
811            let HttpTask::Header(header, eos) = &output[0] else {
812                panic!("{:?}", output);
813            };
814            assert_eq!(header.status, 204);
815            assert!(eos);
816
817            // 204 should not have a body, parser ignores bad input
818            let output = parser.inject_data(b"rust").unwrap();
819            assert!(output.is_empty());
820            parser.finish().unwrap();
821        }
822
823        #[test]
824        fn test_200_with_zero_content_length_more_data() {
825            let input = b"HTTP/1.1 200 OK\r\nCache-Control: public, max-age=10\r\nContent-Length: 0\r\n\r\n";
826            let mut parser = ResponseParse::new();
827            let output = parser.inject_data(input).unwrap();
828
829            assert_eq!(output.len(), 1);
830            let HttpTask::Header(header, eos) = &output[0] else {
831                panic!("{:?}", output);
832            };
833            assert_eq!(header.status, 200);
834            assert!(eos);
835
836            let output = parser.inject_data(b"rust").unwrap();
837            assert!(output.is_empty());
838            parser.finish().unwrap();
839        }
840    }
841}