pingora_cache/
put.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache Put module
16
17use crate::*;
18use bytes::Bytes;
19use http::header;
20use log::warn;
21use pingora_core::protocols::http::{
22    v1::common::header_value_content_length, HttpTask, ServerSession,
23};
24
25/// The interface to define cache put behavior
26pub trait CachePut {
27    /// Return whether to cache the asset according to the given response header.
28    fn cacheable(&self, response: ResponseHeader) -> RespCacheable {
29        let cc = cache_control::CacheControl::from_resp_headers(&response);
30        filters::resp_cacheable(cc.as_ref(), response, false, Self::cache_defaults())
31    }
32
33    /// Return the [CacheMetaDefaults]
34    fn cache_defaults() -> &'static CacheMetaDefaults;
35}
36
37use parse_response::ResponseParse;
38
39/// The cache put context
40pub struct CachePutCtx<C: CachePut> {
41    cache_put: C, // the user defined cache put behavior
42    key: CacheKey,
43    storage: &'static (dyn storage::Storage + Sync), // static for now
44    eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
45    miss_handler: Option<MissHandler>,
46    max_file_size_bytes: Option<usize>,
47    meta: Option<CacheMeta>,
48    parser: ResponseParse,
49    // FIXME: cache put doesn't have cache lock but some storage cannot handle concurrent put
50    // to the same asset.
51    trace: trace::Span,
52}
53
54impl<C: CachePut> CachePutCtx<C> {
55    /// Create a new [CachePutCtx]
56    pub fn new(
57        cache_put: C,
58        key: CacheKey,
59        storage: &'static (dyn storage::Storage + Sync),
60        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
61        trace: trace::Span,
62    ) -> Self {
63        CachePutCtx {
64            cache_put,
65            key,
66            storage,
67            eviction,
68            miss_handler: None,
69            max_file_size_bytes: None,
70            meta: None,
71            parser: ResponseParse::new(),
72            trace,
73        }
74    }
75
76    /// Set the max cacheable size limit
77    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
78        self.max_file_size_bytes = Some(max_file_size_bytes);
79    }
80
81    async fn put_header(&mut self, meta: CacheMeta) -> Result<()> {
82        let trace = self.trace.child("cache put header", |o| o.start()).handle();
83        let miss_handler = self
84            .storage
85            .get_miss_handler(&self.key, &meta, &trace)
86            .await?;
87        self.miss_handler = Some(
88            if let Some(max_file_size_bytes) = self.max_file_size_bytes {
89                Box::new(MaxFileSizeMissHandler::new(
90                    miss_handler,
91                    max_file_size_bytes,
92                ))
93            } else {
94                miss_handler
95            },
96        );
97        self.meta = Some(meta);
98        Ok(())
99    }
100
101    async fn put_body(&mut self, data: Bytes, eof: bool) -> Result<()> {
102        let miss_handler = self.miss_handler.as_mut().unwrap();
103        miss_handler.write_body(data, eof).await
104    }
105
106    async fn finish(&mut self) -> Result<()> {
107        let Some(miss_handler) = self.miss_handler.take() else {
108            // no miss_handler, uncacheable
109            return Ok(());
110        };
111        let finish = miss_handler.finish().await?;
112        if let Some(eviction) = self.eviction.as_ref() {
113            let cache_key = self.key.to_compact();
114            let meta = self.meta.as_ref().unwrap();
115            let evicted = match finish {
116                MissFinishType::Appended(delta) => eviction.increment_weight(cache_key, delta),
117                MissFinishType::Created(size) => {
118                    eviction.admit(cache_key, size, meta.0.internal.fresh_until)
119                }
120            };
121            // actual eviction can be done async
122            let trace = self
123                .trace
124                .child("cache put eviction", |o| o.start())
125                .handle();
126            let storage = self.storage;
127            tokio::task::spawn(async move {
128                for item in evicted {
129                    if let Err(e) = storage.purge(&item, PurgeType::Eviction, &trace).await {
130                        warn!("Failed to purge {item} during eviction for cache put: {e}");
131                    }
132                }
133            });
134        }
135
136        Ok(())
137    }
138
139    async fn do_cache_put(&mut self, data: &[u8]) -> Result<Option<NoCacheReason>> {
140        let tasks = self.parser.inject_data(data)?;
141        for task in tasks {
142            match task {
143                HttpTask::Header(header, _eos) => match self.cache_put.cacheable(*header) {
144                    RespCacheable::Cacheable(meta) => {
145                        if let Some(max_file_size_bytes) = self.max_file_size_bytes {
146                            let content_length_hdr = meta.headers().get(header::CONTENT_LENGTH);
147                            if let Some(content_length) =
148                                header_value_content_length(content_length_hdr)
149                            {
150                                if content_length > max_file_size_bytes {
151                                    return Ok(Some(NoCacheReason::ResponseTooLarge));
152                                }
153                            }
154                        }
155
156                        self.put_header(meta).await?;
157                    }
158                    RespCacheable::Uncacheable(reason) => {
159                        return Ok(Some(reason));
160                    }
161                },
162                HttpTask::Body(data, eos) => {
163                    if let Some(data) = data {
164                        self.put_body(data, eos).await?;
165                    }
166                }
167                _ => {
168                    panic!("unexpected HttpTask during cache put {task:?}");
169                }
170            }
171        }
172        Ok(None)
173    }
174
175    /// Start the cache put logic for the given request
176    ///
177    /// This function will start to read the request body to put into cache.
178    /// Return:
179    /// - `Ok(None)` when the payload will be cache.
180    /// - `Ok(Some(reason))` when the payload is not cacheable
181    pub async fn cache_put(
182        &mut self,
183        session: &mut ServerSession,
184    ) -> Result<Option<NoCacheReason>> {
185        let mut no_cache_reason = None;
186        while let Some(data) = session.read_request_body().await? {
187            if no_cache_reason.is_some() {
188                // even uncacheable, the entire body needs to be drains for 1. downstream
189                // not throwing errors 2. connection reuse
190                continue;
191            }
192            no_cache_reason = self.do_cache_put(&data).await?
193        }
194        self.parser.finish()?;
195        self.finish().await?;
196        Ok(no_cache_reason)
197    }
198}
199
200#[cfg(test)]
201mod test {
202    use super::*;
203    use cf_rustracing::span::Span;
204    use once_cell::sync::Lazy;
205
206    struct TestCachePut();
207    impl CachePut for TestCachePut {
208        fn cache_defaults() -> &'static CacheMetaDefaults {
209            const DEFAULT: CacheMetaDefaults = CacheMetaDefaults::new(|_| Some(1), 1, 1);
210            &DEFAULT
211        }
212    }
213
214    type TestCachePutCtx = CachePutCtx<TestCachePut>;
215    static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new);
216
217    #[tokio::test]
218    async fn test_cache_put() {
219        let key = CacheKey::new("", "a", "1");
220        let span = Span::inactive();
221        let put = TestCachePut();
222        let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
223        let payload = b"HTTP/1.1 200 OK\r\n\
224        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
225        Content-Type: text/html; charset=utf-8\r\n\
226        Connection: keep-alive\r\n\
227        X-Frame-Options: SAMEORIGIN\r\n\
228        Cache-Control: public, max-age=1\r\n\
229        Server: origin-server\r\n\
230        Content-Length: 4\r\n\r\nrust";
231        // here we skip mocking a real http session for simplicity
232        let res = ctx.do_cache_put(payload).await.unwrap();
233        assert!(res.is_none()); // cacheable
234        ctx.parser.finish().unwrap();
235        ctx.finish().await.unwrap();
236
237        let span = Span::inactive();
238        let (meta, mut hit) = CACHE_BACKEND
239            .lookup(&key, &span.handle())
240            .await
241            .unwrap()
242            .unwrap();
243        assert_eq!(
244            meta.headers().get("date").unwrap(),
245            "Thu, 26 Apr 2018 05:42:05 GMT"
246        );
247        let data = hit.read_body().await.unwrap().unwrap();
248        assert_eq!(data, "rust");
249    }
250
251    #[tokio::test]
252    async fn test_cache_put_uncacheable() {
253        let key = CacheKey::new("", "a", "1");
254        let span = Span::inactive();
255        let put = TestCachePut();
256        let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
257        let payload = b"HTTP/1.1 200 OK\r\n\
258        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
259        Content-Type: text/html; charset=utf-8\r\n\
260        Connection: keep-alive\r\n\
261        X-Frame-Options: SAMEORIGIN\r\n\
262        Cache-Control: no-store\r\n\
263        Server: origin-server\r\n\
264        Content-Length: 4\r\n\r\nrust";
265        // here we skip mocking a real http session for simplicity
266        let no_cache = ctx.do_cache_put(payload).await.unwrap().unwrap();
267        assert_eq!(no_cache, NoCacheReason::OriginNotCache);
268        ctx.parser.finish().unwrap();
269        ctx.finish().await.unwrap();
270    }
271
272    #[tokio::test]
273    async fn test_cache_put_204_invalid_body() {
274        let key = CacheKey::new("", "b", "1");
275        let span = Span::inactive();
276        let put = TestCachePut();
277        let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
278        let payload = b"HTTP/1.1 204 OK\r\n\
279        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
280        Content-Type: text/html; charset=utf-8\r\n\
281        Connection: keep-alive\r\n\
282        X-Frame-Options: SAMEORIGIN\r\n\
283        Cache-Control: public, max-age=1\r\n\
284        Server: origin-server\r\n\
285        Content-Length: 4\r\n\r\n";
286        // here we skip mocking a real http session for simplicity
287        let res = ctx.do_cache_put(payload).await.unwrap();
288        assert!(res.is_none()); // cacheable
289                                // 204 should not have body, invalid client input may try to pass one
290        let res = ctx.do_cache_put(b"rust").await.unwrap();
291        assert!(res.is_none()); // still cacheable
292        ctx.parser.finish().unwrap();
293        ctx.finish().await.unwrap();
294
295        let span = Span::inactive();
296        let (meta, mut hit) = CACHE_BACKEND
297            .lookup(&key, &span.handle())
298            .await
299            .unwrap()
300            .unwrap();
301        assert_eq!(
302            meta.headers().get("date").unwrap(),
303            "Thu, 26 Apr 2018 05:42:05 GMT"
304        );
305        // just treated as empty body
306        // (TODO: should we reset content-length/transfer-encoding
307        // headers on 204/304?)
308        let data = hit.read_body().await.unwrap().unwrap();
309        assert!(data.is_empty());
310    }
311
312    #[tokio::test]
313    async fn test_cache_put_extra_body() {
314        let key = CacheKey::new("", "c", "1");
315        let span = Span::inactive();
316        let put = TestCachePut();
317        let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
318        let payload = b"HTTP/1.1 200 OK\r\n\
319        Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
320        Content-Type: text/html; charset=utf-8\r\n\
321        Connection: keep-alive\r\n\
322        X-Frame-Options: SAMEORIGIN\r\n\
323        Cache-Control: public, max-age=1\r\n\
324        Server: origin-server\r\n\
325        Content-Length: 4\r\n\r\n";
326        // here we skip mocking a real http session for simplicity
327        let res = ctx.do_cache_put(payload).await.unwrap();
328        assert!(res.is_none()); // cacheable
329                                // pass in more extra request body that needs to be drained
330        let res = ctx.do_cache_put(b"rustab").await.unwrap();
331        assert!(res.is_none()); // still cacheable
332        let res = ctx.do_cache_put(b"cdef").await.unwrap();
333        assert!(res.is_none()); // still cacheable
334        ctx.parser.finish().unwrap();
335        ctx.finish().await.unwrap();
336
337        let span = Span::inactive();
338        let (meta, mut hit) = CACHE_BACKEND
339            .lookup(&key, &span.handle())
340            .await
341            .unwrap()
342            .unwrap();
343        assert_eq!(
344            meta.headers().get("date").unwrap(),
345            "Thu, 26 Apr 2018 05:42:05 GMT"
346        );
347        let data = hit.read_body().await.unwrap().unwrap();
348        // body only contains specified content-length bounds
349        assert_eq!(data, "rust");
350    }
351}
352
353// maybe this can simplify some logic in pingora::h1
354
355mod parse_response {
356    use super::*;
357    use bytes::BytesMut;
358    use httparse::Status;
359    use pingora_error::{
360        Error,
361        ErrorType::{self, *},
362    };
363
364    pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody");
365
366    const MAX_HEADERS: usize = 256;
367    const INIT_HEADER_BUF_SIZE: usize = 4096;
368
369    #[derive(Debug, Clone, Copy, PartialEq)]
370    enum ParseState {
371        Init,
372        PartialHeader,
373        PartialBodyContentLength(usize, usize),
374        PartialBody(usize),
375        Done(usize),
376        Invalid(httparse::Error),
377    }
378
379    impl ParseState {
380        fn is_done(&self) -> bool {
381            matches!(self, Self::Done(_))
382        }
383        fn read_header(&self) -> bool {
384            matches!(self, Self::Init | Self::PartialHeader)
385        }
386        fn read_body(&self) -> bool {
387            matches!(
388                self,
389                Self::PartialBodyContentLength(..) | Self::PartialBody(_)
390            )
391        }
392    }
393
394    pub(super) struct ResponseParse {
395        state: ParseState,
396        buf: BytesMut,
397        header_bytes: Bytes,
398    }
399
400    impl ResponseParse {
401        pub fn new() -> Self {
402            ResponseParse {
403                state: ParseState::Init,
404                buf: BytesMut::with_capacity(INIT_HEADER_BUF_SIZE),
405                header_bytes: Bytes::new(),
406            }
407        }
408
409        pub fn inject_data(&mut self, data: &[u8]) -> Result<Vec<HttpTask>> {
410            if self.state.is_done() {
411                // just ignore extra response body after parser is done
412                // could be invalid body appended to a no-content status
413                // or invalid body after content-length
414                // TODO: consider propagating an error to the client
415                return Ok(vec![]);
416            }
417
418            self.put_data(data);
419
420            let mut tasks = vec![];
421            while !self.state.is_done() {
422                if self.state.read_header() {
423                    let header = self.parse_header()?;
424                    let Some(header) = header else {
425                        break;
426                    };
427                    tasks.push(HttpTask::Header(Box::new(header), self.state.is_done()));
428                } else if self.state.read_body() {
429                    let body = self.parse_body()?;
430                    let Some(body) = body else {
431                        break;
432                    };
433                    tasks.push(HttpTask::Body(Some(body), self.state.is_done()));
434                } else {
435                    break;
436                }
437            }
438            Ok(tasks)
439        }
440
441        fn put_data(&mut self, data: &[u8]) {
442            use ParseState::*;
443            if matches!(self.state, Done(_) | Invalid(_)) {
444                panic!("Wrong phase {:?}", self.state);
445            }
446            self.buf.extend_from_slice(data);
447        }
448
449        fn parse_header(&mut self) -> Result<Option<ResponseHeader>> {
450            let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
451            let mut resp = httparse::Response::new(&mut headers);
452            let mut parser = httparse::ParserConfig::default();
453            parser.allow_spaces_after_header_name_in_responses(true);
454            parser.allow_obsolete_multiline_headers_in_responses(true);
455
456            let res = parser.parse_response(&mut resp, &self.buf);
457            let res = match res {
458                Ok(res) => res,
459                Err(e) => {
460                    self.state = ParseState::Invalid(e);
461                    return Error::e_because(
462                        InvalidHTTPHeader,
463                        format!("buf: {:?}", String::from_utf8_lossy(&self.buf)),
464                        e,
465                    );
466                }
467            };
468
469            let split_to = match res {
470                Status::Complete(s) => s,
471                Status::Partial => {
472                    self.state = ParseState::PartialHeader;
473                    return Ok(None);
474                }
475            };
476            // safe to unwrap, valid response always has code set.
477            let mut response =
478                ResponseHeader::build(resp.code.unwrap(), Some(resp.headers.len())).unwrap();
479            for header in resp.headers {
480                // TODO: consider hold a Bytes and all header values can be Bytes referencing the
481                // original buffer without reallocation
482                response.append_header(header.name.to_owned(), header.value.to_owned())?;
483            }
484            // TODO: see above, we can make header value `Bytes` referencing header_bytes
485            let header_bytes = self.buf.split_to(split_to).freeze();
486            self.header_bytes = header_bytes;
487            self.state = body_type(&response);
488
489            Ok(Some(response))
490        }
491
492        fn parse_body(&mut self) -> Result<Option<Bytes>> {
493            use ParseState::*;
494            if self.buf.is_empty() {
495                return Ok(None);
496            }
497            match self.state {
498                Init | PartialHeader | Invalid(_) => {
499                    panic!("Wrong phase {:?}", self.state);
500                }
501                Done(_) => Ok(None),
502                PartialBodyContentLength(total, mut seen) => {
503                    let end = if total < self.buf.len() + seen {
504                        // TODO: warn! more data than expected
505                        total - seen
506                    } else {
507                        self.buf.len()
508                    };
509                    seen += end;
510                    if seen >= total {
511                        self.state = Done(seen);
512                    } else {
513                        self.state = PartialBodyContentLength(total, seen);
514                    }
515                    Ok(Some(self.buf.split_to(end).freeze()))
516                }
517                PartialBody(seen) => {
518                    self.state = PartialBody(seen + self.buf.len());
519                    Ok(Some(self.buf.split().freeze()))
520                }
521            }
522        }
523
524        pub fn finish(&mut self) -> Result<()> {
525            if let ParseState::PartialBody(seen) = self.state {
526                self.state = ParseState::Done(seen);
527            }
528            if !self.state.is_done() {
529                Error::e_explain(INCOMPLETE_BODY, format!("{:?}", self.state))
530            } else {
531                Ok(())
532            }
533        }
534    }
535
536    fn body_type(resp: &ResponseHeader) -> ParseState {
537        use http::StatusCode;
538
539        if matches!(
540            resp.status,
541            StatusCode::NO_CONTENT | StatusCode::NOT_MODIFIED
542        ) {
543            // these status codes cannot have body by definition
544            return ParseState::Done(0);
545        }
546        if let Some(cl) = resp.headers.get(http::header::CONTENT_LENGTH) {
547            // ignore invalid header value
548            if let Some(cl) = std::str::from_utf8(cl.as_bytes())
549                .ok()
550                .and_then(|cl| cl.parse::<usize>().ok())
551            {
552                return if cl == 0 {
553                    ParseState::Done(0)
554                } else {
555                    ParseState::PartialBodyContentLength(cl, 0)
556                };
557            }
558        }
559        // HTTP/1.0 and chunked encoding are both treated as PartialBody
560        // The response body payload should _not_ be chunked encoded
561        // even if the Transfer-Encoding: chunked header is added
562        ParseState::PartialBody(0)
563    }
564
565    #[cfg(test)]
566    mod test {
567        use super::*;
568
569        #[test]
570        fn test_basic_response() {
571            let input = b"HTTP/1.1 200 OK\r\n\r\n";
572            let mut parser = ResponseParse::new();
573            let output = parser.inject_data(input).unwrap();
574            assert_eq!(output.len(), 1);
575            let HttpTask::Header(header, eos) = &output[0] else {
576                panic!("{:?}", output);
577            };
578            assert_eq!(header.status, 200);
579            assert!(!eos);
580
581            let body = b"abc";
582            let output = parser.inject_data(body).unwrap();
583            assert_eq!(output.len(), 1);
584            let HttpTask::Body(data, _eos) = &output[0] else {
585                panic!("{:?}", output);
586            };
587            assert_eq!(data.as_ref().unwrap(), &body[..]);
588            parser.finish().unwrap();
589        }
590
591        #[test]
592        fn test_partial_response_headers() {
593            let input = b"HTTP/1.1 200 OK\r\n";
594            let mut parser = ResponseParse::new();
595            let output = parser.inject_data(input).unwrap();
596            // header is not complete
597            assert_eq!(output.len(), 0);
598
599            let output = parser
600                .inject_data("Server: pingora\r\n\r\n".as_bytes())
601                .unwrap();
602            assert_eq!(output.len(), 1);
603            let HttpTask::Header(header, eos) = &output[0] else {
604                panic!("{:?}", output);
605            };
606            assert_eq!(header.status, 200);
607            assert_eq!(header.headers.get("Server").unwrap(), "pingora");
608            assert!(!eos);
609        }
610
611        #[test]
612        fn test_invalid_headers() {
613            let input = b"HTP/1.1 200 OK\r\nServer: pingora\r\n\r\n";
614            let mut parser = ResponseParse::new();
615            let output = parser.inject_data(input);
616            // header is not complete
617            assert!(output.is_err());
618            match parser.state {
619                ParseState::Invalid(httparse::Error::Version) => {}
620                _ => panic!("should have failed to parse"),
621            }
622        }
623
624        #[test]
625        fn test_body_content_length() {
626            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
627            let mut parser = ResponseParse::new();
628            let output = parser.inject_data(input).unwrap();
629
630            assert_eq!(output.len(), 2);
631            let HttpTask::Header(header, _eos) = &output[0] else {
632                panic!("{:?}", output);
633            };
634            assert_eq!(header.status, 200);
635
636            let HttpTask::Body(data, eos) = &output[1] else {
637                panic!("{:?}", output);
638            };
639            assert_eq!(data.as_ref().unwrap(), "abc");
640            assert!(!eos);
641
642            let output = parser.inject_data(b"def").unwrap();
643            assert_eq!(output.len(), 1);
644            let HttpTask::Body(data, eos) = &output[0] else {
645                panic!("{:?}", output);
646            };
647            assert_eq!(data.as_ref().unwrap(), "def");
648            assert!(eos);
649
650            parser.finish().unwrap();
651        }
652
653        #[test]
654        fn test_body_chunked() {
655            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nrust";
656            let mut parser = ResponseParse::new();
657            let output = parser.inject_data(input).unwrap();
658
659            assert_eq!(output.len(), 2);
660            let HttpTask::Header(header, _eos) = &output[0] else {
661                panic!("{:?}", output);
662            };
663            assert_eq!(header.status, 200);
664
665            let HttpTask::Body(data, eos) = &output[1] else {
666                panic!("{:?}", output);
667            };
668            assert_eq!(data.as_ref().unwrap(), "rust");
669            assert!(!eos);
670
671            parser.finish().unwrap();
672        }
673
674        #[test]
675        fn test_body_content_length_early() {
676            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
677            let mut parser = ResponseParse::new();
678            let output = parser.inject_data(input).unwrap();
679
680            assert_eq!(output.len(), 2);
681            let HttpTask::Header(header, _eos) = &output[0] else {
682                panic!("{:?}", output);
683            };
684            assert_eq!(header.status, 200);
685
686            let HttpTask::Body(data, eos) = &output[1] else {
687                panic!("{:?}", output);
688            };
689            assert_eq!(data.as_ref().unwrap(), "abc");
690            assert!(!eos);
691
692            parser.finish().unwrap_err();
693        }
694
695        #[test]
696        fn test_body_content_length_more_data() {
697            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nabc";
698            let mut parser = ResponseParse::new();
699            let output = parser.inject_data(input).unwrap();
700
701            assert_eq!(output.len(), 2);
702            let HttpTask::Header(header, _eos) = &output[0] else {
703                panic!("{:?}", output);
704            };
705            assert_eq!(header.status, 200);
706
707            let HttpTask::Body(data, eos) = &output[1] else {
708                panic!("{:?}", output);
709            };
710            assert_eq!(data.as_ref().unwrap(), "ab");
711            assert!(eos);
712
713            // extra data is dropped without error
714            parser.finish().unwrap();
715        }
716
717        #[test]
718        fn test_body_chunked_partial_chunk() {
719            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nru";
720            let mut parser = ResponseParse::new();
721            let output = parser.inject_data(input).unwrap();
722
723            assert_eq!(output.len(), 2);
724            let HttpTask::Header(header, _eos) = &output[0] else {
725                panic!("{:?}", output);
726            };
727            assert_eq!(header.status, 200);
728
729            let HttpTask::Body(data, eos) = &output[1] else {
730                panic!("{:?}", output);
731            };
732            assert_eq!(data.as_ref().unwrap(), "ru");
733            assert!(!eos);
734
735            let output = parser.inject_data(b"st\r\n").unwrap();
736            assert_eq!(output.len(), 1);
737            let HttpTask::Body(data, eos) = &output[0] else {
738                panic!("{:?}", output);
739            };
740            assert_eq!(data.as_ref().unwrap(), "st\r\n");
741            assert!(!eos);
742        }
743
744        #[test]
745        fn test_no_body_content_length() {
746            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
747            let mut parser = ResponseParse::new();
748            let output = parser.inject_data(input).unwrap();
749
750            assert_eq!(output.len(), 1);
751            let HttpTask::Header(header, eos) = &output[0] else {
752                panic!("{:?}", output);
753            };
754            assert_eq!(header.status, 200);
755            assert!(eos);
756
757            parser.finish().unwrap();
758        }
759
760        #[test]
761        fn test_no_body_304_no_content_length() {
762            let input = b"HTTP/1.1 304 Not Modified\r\nCache-Control: public, max-age=10\r\n\r\n";
763            let mut parser = ResponseParse::new();
764            let output = parser.inject_data(input).unwrap();
765
766            assert_eq!(output.len(), 1);
767            let HttpTask::Header(header, eos) = &output[0] else {
768                panic!("{:?}", output);
769            };
770            assert_eq!(header.status, 304);
771            assert!(eos);
772
773            parser.finish().unwrap();
774        }
775
776        #[test]
777        fn test_204_with_chunked_body() {
778            let input = b"HTTP/1.1 204 No Content\r\nCache-Control: public, max-age=10\r\nTransfer-Encoding: chunked\r\n\r\n";
779            let mut parser = ResponseParse::new();
780            let output = parser.inject_data(input).unwrap();
781
782            assert_eq!(output.len(), 1);
783            let HttpTask::Header(header, eos) = &output[0] else {
784                panic!("{:?}", output);
785            };
786            assert_eq!(header.status, 204);
787            assert!(eos);
788
789            // 204 should not have a body, parser ignores bad input
790            let output = parser.inject_data(b"4\r\nrust\r\n0\r\n\r\n").unwrap();
791            assert!(output.is_empty());
792            parser.finish().unwrap();
793        }
794
795        #[test]
796        fn test_204_with_content_length() {
797            let input = b"HTTP/1.1 204 No Content\r\nCache-Control: public, max-age=10\r\nContent-Length: 4\r\n\r\n";
798            let mut parser = ResponseParse::new();
799            let output = parser.inject_data(input).unwrap();
800
801            assert_eq!(output.len(), 1);
802            let HttpTask::Header(header, eos) = &output[0] else {
803                panic!("{:?}", output);
804            };
805            assert_eq!(header.status, 204);
806            assert!(eos);
807
808            // 204 should not have a body, parser ignores bad input
809            let output = parser.inject_data(b"rust").unwrap();
810            assert!(output.is_empty());
811            parser.finish().unwrap();
812        }
813
814        #[test]
815        fn test_200_with_zero_content_length_more_data() {
816            let input = b"HTTP/1.1 200 OK\r\nCache-Control: public, max-age=10\r\nContent-Length: 0\r\n\r\n";
817            let mut parser = ResponseParse::new();
818            let output = parser.inject_data(input).unwrap();
819
820            assert_eq!(output.len(), 1);
821            let HttpTask::Header(header, eos) = &output[0] else {
822                panic!("{:?}", output);
823            };
824            assert_eq!(header.status, 200);
825            assert!(eos);
826
827            let output = parser.inject_data(b"rust").unwrap();
828            assert!(output.is_empty());
829            parser.finish().unwrap();
830        }
831    }
832}