// pingora_proxy/proxy_cache.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use http::{Method, StatusCode};
17use pingora_cache::key::CacheHashKey;
18use pingora_cache::lock::LockStatus;
19use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
20use pingora_cache::{ForcedInvalidationKind, HitStatus, RespCacheable::*};
21use pingora_core::protocols::http::conditional_filter::to_304;
22use pingora_core::protocols::http::v1::common::header_value_content_length;
23use pingora_core::ErrorType;
24use range_filter::RangeBodyFilter;
25use std::time::SystemTime;
26
27impl<SV> HttpProxy<SV> {
    /// Request-phase cache logic: decide cacheability, compute the cache key,
    /// short-circuit PURGE requests, and run the cache lookup loop.
    ///
    /// Returns `None` to continue proxying to upstream, or `Some((reuse, error))`
    /// when the request was fully handled here (served from cache or purged);
    /// `reuse` indicates whether the downstream server_session can be reused.
    pub(crate) async fn proxy_cache(
        self: &Arc<Self>,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> Option<(bool, Option<Box<Error>>)>
    // None: continue to proxy, Some: return
    where
        SV: ProxyHttp + Send + Sync + 'static,
        SV::CTX: Send + Sync,
    {
        // Cache logic request phase: ask the user filter whether to consider
        // cache for this request at all.
        if let Err(e) = self.inner.request_cache_filter(session, ctx) {
            // TODO: handle this error
            warn!(
                "Fail to request_cache_filter: {e}, {}",
                self.inner.request_summary(session, ctx)
            );
        }

        // cache key logic, should this be part of request_cache_filter?
        if session.cache.enabled() {
            match self.inner.cache_key_callback(session, ctx) {
                Ok(key) => {
                    session.cache.set_cache_key(key);
                }
                Err(e) => {
                    // TODO: handle this error
                    // without a usable key the cache cannot serve this request
                    session.cache.disable(NoCacheReason::StorageError);
                    warn!(
                        "Fail to cache_key_callback: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                }
            }
        }

        // cache purge logic: PURGE short-circuits rest of request
        if self.inner.is_purge(session, ctx) {
            return self.proxy_purge(session, ctx).await;
        }

        // bypass cache lookup if we predict to be uncacheable
        if session.cache.enabled() && !session.cache.cacheable_prediction() {
            session.cache.bypass();
        }

        if !session.cache.enabled() {
            return None;
        }

        // cache lookup logic
        // The loop re-runs the lookup after a Vary cache key update or after
        // waiting on a cache lock held by another request.
        loop {
            // for cache lock, TODO: cap the max number of loops
            match session.cache.cache_lookup().await {
                Ok(res) => {
                    // None here means treat as a miss; Some(status) describes the found asset
                    let mut hit_status_opt = None;
                    if let Some((mut meta, handler)) = res {
                        // Vary logic
                        // Because this branch can be called multiple times in a loop, and we only
                        // need to update the vary once, check if variance is already set to
                        // prevent unnecessary vary lookups.
                        let cache_key = session.cache.cache_key();
                        if let Some(variance) = cache_key.variance_bin() {
                            // We've looked up a secondary slot.
                            // Adhoc double check that the variance found is the variance we want.
                            if Some(variance) != meta.variance() {
                                warn!("Cache variance mismatch, {variance:?}, {cache_key:?}");
                                session.cache.disable(NoCacheReason::InternalError);
                                break None;
                            }
                        } else {
                            // Basic cache key; either variance is off, or this is the primary slot.
                            let req_header = session.req_header();
                            let variance = self.inner.cache_vary_filter(&meta, ctx, req_header);
                            if let Some(variance) = variance {
                                // Variance is on. This is the primary slot.
                                if !session.cache.cache_vary_lookup(variance, &meta) {
                                    // This wasn't the desired variant. Update the cache key variance and
                                    // loop to look up the desired variant, which would be in a secondary slot.
                                    continue;
                                }
                            } // else: vary is not in use
                        }

                        // Either no variance, or the current handler targets the correct variant.

                        // hit
                        // TODO: maybe round and/or cache now()
                        let is_fresh = meta.is_fresh(SystemTime::now());
                        // check if we should force expire or force miss
                        let hit_status = match self
                            .inner
                            .cache_hit_filter(session, &meta, is_fresh, ctx)
                            .await
                        {
                            Err(e) => {
                                error!(
                                    "Failed to filter cache hit: {e}, {}",
                                    self.inner.request_summary(session, ctx)
                                );
                                // this return value will cause us to fetch from upstream
                                HitStatus::FailedHitFilter
                            }
                            Ok(None) => {
                                if is_fresh {
                                    HitStatus::Fresh
                                } else {
                                    HitStatus::Expired
                                }
                            }
                            Ok(Some(ForcedInvalidationKind::ForceExpired)) => {
                                // a force expired asset should not be served as stale
                                // because force expire is usually used to remove data
                                meta.disable_serve_stale();
                                HitStatus::ForceExpired
                            }
                            Ok(Some(ForcedInvalidationKind::ForceMiss)) => HitStatus::ForceMiss,
                        };

                        hit_status_opt = Some(hit_status);

                        // init cache for hit / stale
                        session.cache.cache_found(meta, handler, hit_status);
                    }

                    if hit_status_opt.map_or(true, HitStatus::is_treated_as_miss) {
                        // cache miss
                        if session.cache.is_cache_locked() {
                            // Another request is filling the cache; try waiting til that's done and retry.
                            let lock_status = session.cache.cache_lock_wait().await;
                            if self.handle_lock_status(session, ctx, lock_status) {
                                continue;
                            } else {
                                break None;
                            }
                        } else {
                            self.inner.cache_miss(session, ctx);
                            break None;
                        }
                    }

                    // Safe because an empty hit status would have broken out
                    // in the block above
                    let hit_status = hit_status_opt.expect("None case handled as miss");

                    if !hit_status.is_fresh() {
                        // expired or force expired asset
                        if session.cache.is_cache_locked() {
                            // first check if this is the subrequest for the background cache update
                            if let Some(write_lock) = session
                                .subrequest_ctx
                                .as_mut()
                                .and_then(|ctx| ctx.take_write_lock())
                            {
                                // Put the write lock in the request
                                session.cache.set_write_lock(write_lock);
                                session.cache.tag_as_subrequest();
                                // and then let it go to upstream
                                break None;
                            }
                            let will_serve_stale = session.cache.can_serve_stale_updating()
                                && self.inner.should_serve_stale(session, ctx, None);
                            if !will_serve_stale {
                                // wait for the lock holder to finish and retry the lookup
                                let lock_status = session.cache.cache_lock_wait().await;
                                if self.handle_lock_status(session, ctx, lock_status) {
                                    continue;
                                } else {
                                    break None;
                                }
                            }
                            // else continue to serve stale
                            session.cache.set_stale_updating();
                        } else if session.cache.is_cache_lock_writer() {
                            // stale while revalidate logic for the writer
                            let will_serve_stale = session.cache.can_serve_stale_updating()
                                && self.inner.should_serve_stale(session, ctx, None);
                            if will_serve_stale {
                                // create a background task to do the actual update
                                let subrequest =
                                    Box::new(crate::subrequest::create_dummy_session(session));
                                let new_app = self.clone(); // Clone the Arc
                                // hand the write lock over to the background subrequest
                                let (permit, cache_lock) = session.cache.take_write_lock();
                                let sub_req_ctx = Box::new(SubReqCtx::with_write_lock(
                                    cache_lock,
                                    session.cache.cache_key().clone(),
                                    permit,
                                ));
                                tokio::spawn(async move {
                                    new_app.process_subrequest(subrequest, sub_req_ctx).await;
                                });
                                // continue to serve stale for this request
                                session.cache.set_stale_updating();
                            } else {
                                // return to fetch from upstream
                                break None;
                            }
                        } else {
                            // return to fetch from upstream
                            break None;
                        }
                    }

                    let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
                    if let Some(e) = err.as_ref() {
                        error!(
                            "Fail to serve cache: {e}, {}",
                            self.inner.request_summary(session, ctx)
                        );
                    }
                    // response is served from cache, exit
                    break Some((reuse, err));
                }
                Err(e) => {
                    // Allow cache miss to fill cache even if cache lookup errors
                    // this is mostly to support backward incompatible metadata update
                    // TODO: check error types
                    // session.cache.disable();
                    self.inner.cache_miss(session, ctx);
                    warn!(
                        "Fail to cache lookup: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                    break None;
                }
            }
        }
    }
256
    /// Serve a cache hit (fresh or stale) to downstream.
    ///
    /// Runs the response filters, applies conditional (304) and Range handling,
    /// then streams the cached body. Returns whether the downstream
    /// server_session can be reused, plus any error encountered while serving.
    pub(crate) async fn proxy_cache_hit(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        use range_filter::*;

        // whether the hit handler supports seeking into the body (needed for Range)
        let seekable = session.cache.hit_handler().can_seek();
        let mut header = cache_hit_header(&session.cache);

        let req = session.req_header();

        let not_modified = match self.inner.cache_not_modified_filter(session, &header, ctx) {
            Ok(not_modified) => not_modified,
            Err(e) => {
                // fail open if cache_not_modified_filter errors,
                // just return the whole original response
                warn!(
                    "Failed to run cache not modified filter: {e}, {}",
                    self.inner.request_summary(session, ctx)
                );
                false
            }
        };
        if not_modified {
            to_304(&mut header);
        }
        // a 304 or a HEAD request has no response body
        let header_only = not_modified || req.method == http::method::Method::HEAD;

        // process range header if the cache storage supports seek
        let range_type = if seekable && !session.ignore_downstream_range {
            self.inner.range_header_filter(req, &mut header, ctx)
        } else {
            RangeType::None
        };

        // return a 416 with an empty body for simplicity
        let header_only = header_only || matches!(range_type, RangeType::Invalid);

        // TODO: use ProxyUseCache to replace the logic below
        match self.inner.response_filter(session, &mut header, ctx).await {
            Ok(_) => {
                if let Err(e) = session
                    .downstream_modules_ctx
                    .response_header_filter(&mut header, header_only)
                    .await
                {
                    error!(
                        "Failed to run downstream modules response header filter in hit: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                    session
                        .as_mut()
                        .respond_error(500)
                        .await
                        .unwrap_or_else(|e| {
                            error!("failed to send error response to downstream: {e}");
                        });
                    // we have not written anything dirty to downstream, it is still reusable
                    return (true, Some(e));
                }

                if let Err(e) = session
                    .as_mut()
                    .write_response_header(header)
                    .await
                    .map_err(|e| e.into_down())
                {
                    // downstream connection is bad already
                    return (false, Some(e));
                }
            }
            Err(e) => {
                error!(
                    "Failed to run response filter in hit: {e}, {}",
                    self.inner.request_summary(session, ctx)
                );
                session
                    .as_mut()
                    .respond_error(500)
                    .await
                    .unwrap_or_else(|e| {
                        error!("failed to send error response to downstream: {e}");
                    });
                // we have not written anything dirty to downstream, it is still reusable
                return (true, Some(e));
            }
        }
        debug!("finished sending cached header to downstream");

        if !header_only {
            // seek into the cached body for a single-range response
            if let RangeType::Single(r) = range_type {
                if let Err(e) = session.cache.hit_handler().seek(r.start, Some(r.end)) {
                    return (false, Some(e));
                }
            }
            // stream the cached body to downstream chunk by chunk
            loop {
                match session.cache.hit_handler().read_body().await {
                    Ok(mut body) => {
                        // None from the hit handler signals end of body
                        let end = body.is_none();
                        match self
                            .inner
                            .response_body_filter(session, &mut body, end, ctx)
                        {
                            Ok(Some(duration)) => {
                                trace!("delaying response for {duration:?}");
                                time::sleep(duration).await;
                            }
                            Ok(None) => { /* continue */ }
                            Err(e) => {
                                // body is being sent, don't treat downstream as reusable
                                return (false, Some(e));
                            }
                        }

                        if let Err(e) = session
                            .downstream_modules_ctx
                            .response_body_filter(&mut body, end)
                        {
                            // body is being sent, don't treat downstream as reusable
                            return (false, Some(e));
                        }

                        if !end && body.as_ref().map_or(true, |b| b.is_empty()) {
                            // Don't write empty body which will end session,
                            // still more hit handler bytes to read
                            continue;
                        }

                        // write to downstream
                        let b = body.unwrap_or_default();
                        if let Err(e) = session
                            .as_mut()
                            .write_response_body(b, end)
                            .await
                            .map_err(|e| e.into_down())
                        {
                            return (false, Some(e));
                        }
                        if end {
                            break;
                        }
                    }
                    Err(e) => return (false, Some(e)),
                }
            }
        }

        // finish the hit handler regardless of how serving went; a failure here
        // is only logged, it doesn't affect the downstream response
        if let Err(e) = session.cache.finish_hit_handler().await {
            warn!("Error during finish_hit_handler: {}", e);
        }

        match session.as_mut().finish_body().await {
            Ok(_) => {
                debug!("finished sending cached body to downstream");
                (true, None)
            }
            Err(e) => (false, Some(e)),
        }
    }
422
423    /* Downstream revalidation, only needed when cache is on because otherwise origin
424     * will handle it */
425    pub(crate) fn downstream_response_conditional_filter(
426        &self,
427        use_cache: &mut ServeFromCache,
428        session: &Session,
429        resp: &mut ResponseHeader,
430        ctx: &mut SV::CTX,
431    ) where
432        SV: ProxyHttp,
433    {
434        // TODO: range
435        let req = session.req_header();
436
437        let not_modified = match self.inner.cache_not_modified_filter(session, resp, ctx) {
438            Ok(not_modified) => not_modified,
439            Err(e) => {
440                // fail open if cache_not_modified_filter errors,
441                // just return the whole original response
442                warn!(
443                    "Failed to run cache not modified filter: {e}, {}",
444                    self.inner.request_summary(session, ctx)
445                );
446                false
447            }
448        };
449
450        if not_modified {
451            to_304(resp);
452        }
453        let header_only = not_modified || req.method == http::method::Method::HEAD;
454        if header_only {
455            if use_cache.is_on() {
456                // tell cache to stop after yielding header
457                use_cache.enable_header_only();
458            } else {
459                // headers only during cache miss, upstream should continue send
460                // body to cache, `session` will ignore body automatically because
461                // of the signature of `header` (304)
462                // TODO: we should drop body before/within this filter so that body
463                // filter only runs on data downstream sees
464            }
465        }
466    }
467
468    // TODO: cache upstream header filter to add/remove headers
469
    /// Feed one upstream `HttpTask` through the cache admission / fill logic.
    ///
    /// On a response header, decides cacheability (possibly re-enabling a
    /// bypassed cache) and sets up the miss handler; on body / done tasks,
    /// writes the data into the miss handler. No-op when the cache is neither
    /// enabled nor bypassing.
    pub(crate) async fn cache_http_task(
        &self,
        session: &mut Session,
        task: &HttpTask,
        ctx: &mut SV::CTX,
        serve_from_cache: &mut ServeFromCache,
    ) -> Result<()>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        if !session.cache.enabled() && !session.cache.bypassing() {
            return Ok(());
        }

        match task {
            HttpTask::Header(header, end_stream) => {
                // decide if cacheable and create cache meta
                // for now, skip 1xxs (should not affect response cache decisions)
                // However 101 is an exception because it is the final response header
                if header.status.is_informational()
                    && header.status != StatusCode::SWITCHING_PROTOCOLS
                {
                    return Ok(());
                }
                match self.inner.response_cache_filter(session, header, ctx)? {
                    Cacheable(meta) => {
                        let mut fill_cache = true;
                        if session.cache.bypassing() {
                            // The cache might have been bypassed because the response exceeded the
                            // maximum cacheable asset size. If that looks like the case (there
                            // is a maximum file size configured and we don't know the content
                            // length up front), attempting to re-enable the cache now would cause
                            // the request to fail when the chunked response exceeds the maximum
                            // file size again.
                            if session.cache.max_file_size_bytes().is_some()
                                && !meta.headers().contains_key(header::CONTENT_LENGTH)
                            {
                                session.cache.disable(NoCacheReason::ResponseTooLarge);
                                return Ok(());
                            }

                            session.cache.response_became_cacheable();

                            if session.req_header().method == Method::GET
                                && meta.response_header().status == StatusCode::OK
                            {
                                self.inner.cache_miss(session, ctx);
                            } else {
                                // we've allowed caching on the next request,
                                // but do not cache _this_ request if bypassed and not 200
                                // (We didn't run upstream request cache filters to strip range or condition headers,
                                // so this could be an uncacheable response e.g. 206 or 304 or HEAD.
                                // Exclude all non-200/GET for simplicity, may expand allowable codes in the future.)
                                fill_cache = false;
                                session.cache.disable(NoCacheReason::Deferred);
                            }
                        }

                        // If the Content-Length is known, and a maximum asset size has been configured
                        // on the cache, validate that the response does not exceed the maximum asset size.
                        if session.cache.enabled() {
                            if let Some(max_file_size) = session.cache.max_file_size_bytes() {
                                let content_length_hdr = meta.headers().get(header::CONTENT_LENGTH);
                                if let Some(content_length) =
                                    header_value_content_length(content_length_hdr)
                                {
                                    if content_length > max_file_size {
                                        fill_cache = false;
                                        session.cache.response_became_uncacheable(
                                            NoCacheReason::ResponseTooLarge,
                                        );
                                        session.cache.disable(NoCacheReason::ResponseTooLarge);
                                    }
                                }
                                // if the content-length header is not specified, the miss handler
                                // will count the response size on the fly, aborting the request
                                // mid-transfer if the max file size is exceeded
                            }
                        }
                        if fill_cache {
                            let req_header = session.req_header();
                            // Update the variance in the meta via the same callback,
                            // cache_vary_filter(), used in cache lookup for consistency.
                            // Future cache lookups need a matching variance in the meta
                            // with the cache key to pick up the correct variance
                            let variance = self.inner.cache_vary_filter(&meta, ctx, req_header);
                            session.cache.set_cache_meta(meta);
                            session.cache.update_variance(variance);
                            // this sends the meta and header
                            session.cache.set_miss_handler().await?;
                            if session.cache.miss_body_reader().is_some() {
                                serve_from_cache.enable_miss();
                            }
                            if *end_stream {
                                // response has no body: write the empty end and
                                // finalize the miss handler right away
                                session
                                    .cache
                                    .miss_handler()
                                    .unwrap() // safe, it is set above
                                    .write_body(Bytes::new(), true)
                                    .await?;
                                session.cache.finish_miss_handler().await?;
                            }
                        }
                    }
                    Uncacheable(reason) => {
                        if !session.cache.bypassing() {
                            // mark as uncacheable, so we bypass cache next time
                            session.cache.response_became_uncacheable(reason);
                        }
                        session.cache.disable(reason);
                    }
                }
            }
            HttpTask::Body(data, end_stream) => match data {
                Some(d) => {
                    if session.cache.enabled() {
                        // this will panic if more data is sent after we see end_stream
                        // but should be impossible in real world
                        let miss_handler = session.cache.miss_handler().unwrap();
                        // TODO: do this async
                        let res = miss_handler.write_body(d.clone(), *end_stream).await;
                        if let Err(err) = res {
                            if err.etype == ERR_RESPONSE_TOO_LARGE {
                                debug!("chunked response exceeded max cache size, remembering that it is uncacheable");
                                session
                                    .cache
                                    .response_became_uncacheable(NoCacheReason::ResponseTooLarge);
                            }

                            return Err(err);
                        }
                        if *end_stream {
                            session.cache.finish_miss_handler().await?;
                        }
                    }
                }
                None => {
                    // no data in this task; finalize if the stream has ended
                    if session.cache.enabled() && *end_stream {
                        session.cache.finish_miss_handler().await?;
                    }
                }
            },
            HttpTask::Trailer(_) => {} // h1 trailer is not supported yet
            HttpTask::Done => {
                if session.cache.enabled() {
                    session.cache.finish_miss_handler().await?;
                }
            }
            HttpTask::Failed(_) => {
                // TODO: handle this failure: delete the temp files?
            }
        }
        Ok(())
    }
625
    // Decide if local cache can be used according to upstream http header
    // 1. when upstream returns 304, the local cache is refreshed and served fresh
    // 2. when upstream returns certain HTTP error status, the local cache is served stale
    // Return true if local cache should be used, false otherwise
    //
    // Only `HttpTask::Header` tasks are inspected; any other task kind returns false.
    pub(crate) async fn revalidate_or_stale(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> bool
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        if !session.cache.enabled() {
            return false;
        }

        match task {
            HttpTask::Header(resp, _eos) => {
                if resp.status == StatusCode::NOT_MODIFIED {
                    if session.cache.maybe_cache_meta().is_some() {
                        // run upstream response filters on upstream 304 first
                        if let Err(err) = self.inner.upstream_response_filter(session, resp, ctx) {
                            error!("upstream response filter error on 304: {err:?}");
                            // filter failure: don't update the cached meta, but the
                            // stored asset was still validated by this 304
                            session.cache.revalidate_uncacheable(
                                *resp.clone(),
                                NoCacheReason::InternalError,
                            );
                            // always serve from cache after receiving the 304
                            return true;
                        }
                        // 304 doesn't contain all the headers, merge 304 into cached 200 header
                        // in order for response_cache_filter to run correctly
                        let merged_header = session.cache.revalidate_merge_header(resp);
                        match self
                            .inner
                            .response_cache_filter(session, &merged_header, ctx)
                        {
                            Ok(Cacheable(mut meta)) => {
                                // For simplicity, ignore changes to variance over 304 for now.
                                // Note this means upstream can only update variance via 2xx
                                // (expired response).
                                //
                                // TODO: if we choose to respect changing Vary / variance over 304,
                                // then there are a few cases to consider. See `update_variance` in
                                // the `pingora-cache` module.
                                let old_meta = session.cache.maybe_cache_meta().unwrap(); // safe, checked above
                                if let Some(old_variance) = old_meta.variance() {
                                    meta.set_variance(old_variance);
                                }
                                if let Err(e) = session.cache.revalidate_cache_meta(meta).await {
                                    // Fail open: we can continue use the revalidated response even
                                    // if the meta failed to write to storage
                                    warn!("revalidate_cache_meta failed {e:?}");
                                }
                            }
                            Ok(Uncacheable(reason)) => {
                                // This response was once cacheable, and upstream tells us it has not changed
                                // but now we decided it is uncacheable!
                                // RFC 9111: still allowed to reuse stored response this time because
                                // it was "successfully validated"
                                // https://www.rfc-editor.org/rfc/rfc9111#constructing.responses.from.caches
                                // Serve the response, but do not update cache

                                // We also want to avoid poisoning downstream's cache with an unsolicited 304
                                // if we did not receive a conditional request from downstream
                                // (downstream may have a different cacheability assessment and could cache the 304)

                                //TODO: log more
                                warn!("Uncacheable {reason:?} 304 received");
                                session.cache.response_became_uncacheable(reason);
                                session.cache.revalidate_uncacheable(merged_header, reason);
                            }
                            Err(e) => {
                                // Error during revalidation, similarly to the reasons above
                                // (avoid poisoning downstream cache with passthrough 304),
                                // allow serving the stored response without updating cache
                                warn!("Error {e:?} response_cache_filter during revalidation");
                                session.cache.revalidate_uncacheable(
                                    merged_header,
                                    NoCacheReason::InternalError,
                                );
                                // Assume the next 304 may succeed, so don't mark uncacheable
                            }
                        }
                        // always serve from cache after receiving the 304
                        true
                    } else {
                        //TODO: log more
                        warn!("304 received without cached asset, disable caching");
                        let reason = NoCacheReason::Custom("304 on miss");
                        session.cache.response_became_uncacheable(reason);
                        session.cache.disable(reason);
                        false
                    }
                } else if resp.status.is_server_error() {
                    // stale if error logic, 5xx only for now

                    // this is response header filter, response_written should always be None?
                    if !session.cache.can_serve_stale_error()
                        || session.response_written().is_some()
                    {
                        return false;
                    }

                    // create an error to encode the http status code
                    let http_status_error = Error::create(
                        ErrorType::HTTPStatus(resp.status.as_u16()),
                        ErrorSource::Upstream,
                        None,
                        None,
                    );
                    // let the user logic decide whether this 5xx warrants stale
                    if self
                        .inner
                        .should_serve_stale(session, ctx, Some(&http_status_error))
                    {
                        // no more need to keep the write lock
                        session
                            .cache
                            .release_write_lock(NoCacheReason::UpstreamError);
                        true
                    } else {
                        false
                    }
                } else {
                    false // not 304, not stale if error status code
                }
            }
            _ => false, // not header
        }
    }
758
759    // None: no staled asset is used, Some(_): staled asset is sent to downstream
760    // bool: can the downstream connection be reused
761    pub(crate) async fn handle_stale_if_error(
762        &self,
763        session: &mut Session,
764        ctx: &mut SV::CTX,
765        error: &Error,
766    ) -> Option<(bool, Option<Box<Error>>)>
767    where
768        SV: ProxyHttp + Send + Sync,
769        SV::CTX: Send + Sync,
770    {
771        // the caller might already checked this as an optimization
772        if !session.cache.can_serve_stale_error() {
773            return None;
774        }
775
776        // the error happen halfway through a regular response to downstream
777        // can't resend the response
778        if session.response_written().is_some() {
779            return None;
780        }
781
782        // check error types
783        if !self.inner.should_serve_stale(session, ctx, Some(error)) {
784            return None;
785        }
786
787        // log the original error
788        warn!(
789            "Fail to proxy: {}, serving stale, {}",
790            error,
791            self.inner.request_summary(session, ctx)
792        );
793
794        // no more need to hang onto the cache lock
795        session
796            .cache
797            .release_write_lock(NoCacheReason::UpstreamError);
798
799        Some(self.proxy_cache_hit(session, ctx).await)
800    }
801
802    // helper function to check when to continue to retry lock (true) or give up (false)
803    fn handle_lock_status(
804        &self,
805        session: &mut Session,
806        ctx: &SV::CTX,
807        lock_status: LockStatus,
808    ) -> bool
809    where
810        SV: ProxyHttp,
811    {
812        debug!("cache unlocked {lock_status:?}");
813        match lock_status {
814            // should lookup the cached asset again
815            LockStatus::Done => true,
816            // should compete to be a new writer
817            LockStatus::TransientError => true,
818            // the request is uncacheable, go ahead to fetch from the origin
819            LockStatus::GiveUp => {
820                // TODO: It will be nice for the writer to propagate the real reason
821                session.cache.disable(NoCacheReason::CacheLockGiveUp);
822                // not cacheable, just go to the origin.
823                false
824            }
825            // treat this the same as TransientError
826            LockStatus::Dangling => {
827                // software bug, but request can recover from this
828                warn!(
829                    "Dangling cache lock, {}",
830                    self.inner.request_summary(session, ctx)
831                );
832                true
833            }
834            /* We have 3 options when a lock is held too long
835             * 1. release the lock and let every request complete for it again
836             * 2. let every request cache miss
837             * 3. let every request through while disabling cache
838             * #1 could repeat the situation but protect the origin from load
839             * #2 could amplify disk writes and storage for temp file
840             * #3 is the simplest option for now */
841            LockStatus::Timeout => {
842                warn!(
843                    "Cache lock timeout, {}",
844                    self.inner.request_summary(session, ctx)
845                );
846                session.cache.disable(NoCacheReason::CacheLockTimeout);
847                // not cacheable, just go to the origin.
848                false
849            }
850            // software bug, this status should be impossible to reach
851            LockStatus::Waiting => panic!("impossible LockStatus::Waiting"),
852        }
853    }
854}
855
856fn cache_hit_header(cache: &HttpCache) -> Box<ResponseHeader> {
857    let mut header = Box::new(cache.cache_meta().response_header_copy());
858    // convert cache response
859
860    // these status codes / method cannot have body, so no need to add chunked encoding
861    let no_body = matches!(header.status.as_u16(), 204 | 304);
862
863    // https://www.rfc-editor.org/rfc/rfc9111#section-4:
864    // When a stored response is used to satisfy a request without validation, a cache
865    // MUST generate an Age header field
866    if !cache.upstream_used() {
867        let age = cache.cache_meta().age().as_secs();
868        header.insert_header(http::header::AGE, age).unwrap();
869    }
870
871    /* Add chunked header to tell downstream to use chunked encoding
872     * during the absent of content-length in h2 */
873    if !no_body
874        && !header.status.is_informational()
875        && header.headers.get(http::header::CONTENT_LENGTH).is_none()
876    {
877        header
878            .insert_header(http::header::TRANSFER_ENCODING, "chunked")
879            .unwrap();
880    }
881    header
882}
883
884// https://datatracker.ietf.org/doc/html/rfc7233#section-3
885pub mod range_filter {
886    use super::*;
887    use http::header::*;
888    use std::ops::Range;
889
890    // parse bytes into usize, ignores specific error
891    fn parse_number(input: &[u8]) -> Option<usize> {
892        str::from_utf8(input).ok()?.parse().ok()
893    }
894
895    fn parse_range_header(range: &[u8], content_length: usize) -> RangeType {
896        use regex::Regex;
897
898        // single byte range only for now
899        // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1
900        // https://datatracker.ietf.org/doc/html/rfc7233#appendix-C: case-insensitive
901        static RE_SINGLE_RANGE: Lazy<Regex> =
902            Lazy::new(|| Regex::new(r"(?i)bytes=(?P<start>\d*)-(?P<end>\d*)").unwrap());
903
904        // ignore invalid range header
905        let Ok(range_str) = str::from_utf8(range) else {
906            return RangeType::None;
907        };
908
909        let Some(captured) = RE_SINGLE_RANGE.captures(range_str) else {
910            return RangeType::None;
911        };
912        let maybe_start = captured
913            .name("start")
914            .and_then(|s| s.as_str().parse::<usize>().ok());
915        let end = captured
916            .name("end")
917            .and_then(|s| s.as_str().parse::<usize>().ok());
918
919        if let Some(start) = maybe_start {
920            if start >= content_length {
921                RangeType::Invalid
922            } else {
923                // open-ended range should end at the last byte
924                // over sized end is allow but ignored
925                // range end is inclusive
926                let end = std::cmp::min(end.unwrap_or(content_length - 1), content_length - 1) + 1;
927                if end <= start {
928                    RangeType::Invalid
929                } else {
930                    RangeType::new_single(start, end)
931                }
932            }
933        } else {
934            // start is empty, this changes the meaning of the value of `end`
935            // Now it means to read the last `end` bytes
936            if let Some(end) = end {
937                if content_length >= end {
938                    RangeType::new_single(content_length - end, content_length)
939                } else {
940                    // over sized end is allow but ignored
941                    RangeType::new_single(0, content_length)
942                }
943            } else {
944                // both empty/invalid
945                RangeType::Invalid
946            }
947        }
948    }
    #[test]
    fn test_parse_range() {
        // header end is inclusive, RangeType end is exclusive: 0-1 => 0..2
        assert_eq!(
            parse_range_header(b"bytes=0-1", 10),
            RangeType::new_single(0, 2)
        );
        // unit is matched case-insensitively
        assert_eq!(
            parse_range_header(b"bYTes=0-9", 10),
            RangeType::new_single(0, 10)
        );
        // oversized end is clamped to the last byte
        assert_eq!(
            parse_range_header(b"bytes=0-12", 10),
            RangeType::new_single(0, 10)
        );
        // open-ended range runs to the end of the asset
        assert_eq!(
            parse_range_header(b"bytes=0-", 10),
            RangeType::new_single(0, 10)
        );
        // inverted or fully out-of-bounds ranges are unsatisfiable
        assert_eq!(parse_range_header(b"bytes=2-1", 10), RangeType::Invalid);
        assert_eq!(parse_range_header(b"bytes=10-11", 10), RangeType::Invalid);
        // suffix range: last N bytes
        assert_eq!(
            parse_range_header(b"bytes=-2", 10),
            RangeType::new_single(8, 10)
        );
        // oversized suffix is clamped to the whole asset
        assert_eq!(
            parse_range_header(b"bytes=-12", 10),
            RangeType::new_single(0, 10)
        );
        // both parts empty: invalid; no digits and no dash: not a range header
        assert_eq!(parse_range_header(b"bytes=-", 10), RangeType::Invalid);
        assert_eq!(parse_range_header(b"bytes=", 10), RangeType::None);
    }
980
981    #[derive(Debug, Eq, PartialEq, Clone)]
982    pub enum RangeType {
983        None,
984        Single(Range<usize>),
985        // TODO: multi-range
986        Invalid,
987    }
988
989    impl RangeType {
990        fn new_single(start: usize, end: usize) -> Self {
991            RangeType::Single(Range { start, end })
992        }
993    }
994
995    // TODO: if-range
996
    // single range for now
    /// Evaluate the request's `Range`/`If-Range` headers against `resp` and
    /// rewrite `resp` in place (status, Content-Length, Content-Range).
    /// Returns the parsed range so the body filter can slice the payload.
    pub fn range_header_filter(req: &RequestHeader, resp: &mut ResponseHeader) -> RangeType {
        // The Range header field is evaluated after evaluating the precondition
        // header fields defined in [RFC7232], and only if the result in absence
        // of the Range header field would be a 200 (OK) response
        if resp.status != StatusCode::OK {
            return RangeType::None;
        }

        // "A server MUST ignore a Range header field received with a request method other than GET."
        if req.method != http::Method::GET && req.method != http::Method::HEAD {
            return RangeType::None;
        }

        let Some(range_header) = req.headers.get(RANGE) else {
            return RangeType::None;
        };

        // Content-Length is not required by RFC but it is what nginx does and easier to implement
        // with this header present.
        let Some(content_length_bytes) = resp.headers.get(CONTENT_LENGTH) else {
            return RangeType::None;
        };
        // bail on invalid content length
        let Some(content_length) = parse_number(content_length_bytes.as_bytes()) else {
            return RangeType::None;
        };

        // if-range wants to understand if the Last-Modified / ETag value matches exactly for use
        // with resumable downloads.
        // https://datatracker.ietf.org/doc/html/rfc9110#name-if-range
        // Note that the RFC wants strong validation, and suggests that
        // "A valid entity-tag can be distinguished from a valid HTTP-date
        // by examining the first three characters for a DQUOTE,"
        // but this current etag matching behavior most closely mirrors nginx.
        if let Some(if_range) = req.headers.get(IF_RANGE) {
            let ir = if_range.as_bytes();
            // a trailing DQUOTE means the validator is an entity-tag; otherwise
            // compare against Last-Modified as an HTTP-date
            let matches = if ir.len() >= 2 && ir.last() == Some(&b'"') {
                resp.headers.get(ETAG).is_some_and(|etag| etag == if_range)
            } else if let Some(last_modified) = resp.headers.get(LAST_MODIFIED) {
                last_modified == if_range
            } else {
                false
            };
            // validator mismatch: serve the full 200 response instead of a range
            if !matches {
                return RangeType::None;
            }
        }

        // TODO: we can also check Accept-Range header from resp. Nginx gives the option,
        // see proxy_force_ranges

        let range_type = parse_range_header(range_header.as_bytes(), content_length);

        match &range_type {
            RangeType::None => { /* nothing to do*/ }
            RangeType::Single(r) => {
                // 206 response
                resp.set_status(StatusCode::PARTIAL_CONTENT).unwrap();
                resp.insert_header(&CONTENT_LENGTH, r.end - r.start)
                    .unwrap();
                resp.insert_header(
                    &CONTENT_RANGE,
                    format!("bytes {}-{}/{content_length}", r.start, r.end - 1), // range end is inclusive
                )
                .unwrap()
            }
            RangeType::Invalid => {
                // 416 response
                resp.set_status(StatusCode::RANGE_NOT_SATISFIABLE).unwrap();
                // empty body for simplicity
                resp.insert_header(&CONTENT_LENGTH, HeaderValue::from_static("0"))
                    .unwrap();
                // TODO: remove other headers like content-encoding
                resp.remove_header(&CONTENT_TYPE);
                resp.insert_header(&CONTENT_RANGE, format!("bytes */{content_length}"))
                    .unwrap()
            }
        }

        range_type
    }
1079
    #[test]
    fn test_range_filter() {
        // fresh GET request / 200 response with a 10-byte body for each case
        fn gen_req() -> RequestHeader {
            RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap()
        }
        fn gen_resp() -> ResponseHeader {
            let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
            resp.append_header("Content-Length", "10").unwrap();
            resp
        }

        // no range: response passes through untouched
        let req = gen_req();
        let mut resp = gen_resp();
        assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
        assert_eq!(resp.status.as_u16(), 200);

        // regular range: rewritten to 206 with adjusted length and Content-Range
        let mut req = gen_req();
        req.insert_header("Range", "bytes=0-1").unwrap();
        let mut resp = gen_resp();
        assert_eq!(
            RangeType::new_single(0, 2),
            range_header_filter(&req, &mut resp)
        );
        assert_eq!(resp.status.as_u16(), 206);
        assert_eq!(resp.headers.get("content-length").unwrap().as_bytes(), b"2");
        assert_eq!(
            resp.headers.get("content-range").unwrap().as_bytes(),
            b"bytes 0-1/10"
        );

        // bad range: rewritten to 416 with empty body
        let mut req = gen_req();
        req.insert_header("Range", "bytes=1-0").unwrap();
        let mut resp = gen_resp();
        assert_eq!(RangeType::Invalid, range_header_filter(&req, &mut resp));
        assert_eq!(resp.status.as_u16(), 416);
        assert_eq!(resp.headers.get("content-length").unwrap().as_bytes(), b"0");
        assert_eq!(
            resp.headers.get("content-range").unwrap().as_bytes(),
            b"bytes */10"
        );
    }
1124
    // If-Range: the range applies only when the validator (ETag or
    // Last-Modified date) matches the stored response exactly.
    #[test]
    fn test_if_range() {
        const DATE: &str = "Fri, 07 Jul 2023 22:03:29 GMT";
        const ETAG: &str = "\"1234\"";

        fn gen_req() -> RequestHeader {
            let mut req = RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap();
            req.append_header("Range", "bytes=0-1").unwrap();
            req
        }
        fn gen_resp() -> ResponseHeader {
            let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
            resp.append_header("Content-Length", "10").unwrap();
            resp.append_header("Last-Modified", DATE).unwrap();
            resp.append_header("ETag", ETAG).unwrap();
            resp
        }

        // matching Last-Modified date
        let mut req = gen_req();
        req.insert_header("If-Range", DATE).unwrap();
        let mut resp = gen_resp();
        assert_eq!(
            RangeType::new_single(0, 2),
            range_header_filter(&req, &mut resp)
        );

        // non-matching date
        let mut req = gen_req();
        req.insert_header("If-Range", "Fri, 07 Jul 2023 22:03:25 GMT")
            .unwrap();
        let mut resp = gen_resp();
        assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));

        // match ETag
        let mut req = gen_req();
        req.insert_header("If-Range", ETAG).unwrap();
        let mut resp = gen_resp();
        assert_eq!(
            RangeType::new_single(0, 2),
            range_header_filter(&req, &mut resp)
        );

        // non-matching ETags do not result in range
        let mut req = gen_req();
        req.insert_header("If-Range", "\"4567\"").unwrap();
        let mut resp = gen_resp();
        assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));

        // unquoted validator is not treated as an entity-tag
        let mut req = gen_req();
        req.insert_header("If-Range", "1234").unwrap();
        let mut resp = gen_resp();
        assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
    }
1179
    /// Streaming body filter that trims response body frames down to the
    /// requested byte range (single range only).
    pub struct RangeBodyFilter {
        // the range to enforce; `RangeType::None` passes everything through
        pub range: RangeType,
        // absolute offset of the next incoming byte, accumulated across calls
        current: usize,
    }
1184
1185    impl RangeBodyFilter {
1186        pub fn new() -> Self {
1187            RangeBodyFilter {
1188                range: RangeType::None,
1189                current: 0,
1190            }
1191        }
1192
1193        pub fn set(&mut self, range: RangeType) {
1194            self.range = range;
1195        }
1196
1197        pub fn filter_body(&mut self, data: Option<Bytes>) -> Option<Bytes> {
1198            match &self.range {
1199                RangeType::None => data,
1200                RangeType::Invalid => None,
1201                RangeType::Single(r) => {
1202                    let current = self.current;
1203                    self.current += data.as_ref().map_or(0, |d| d.len());
1204                    data.and_then(|d| Self::filter_range_data(r.start, r.end, current, d))
1205                }
1206            }
1207        }
1208
1209        fn filter_range_data(
1210            start: usize,
1211            end: usize,
1212            current: usize,
1213            data: Bytes,
1214        ) -> Option<Bytes> {
1215            if current + data.len() < start || current >= end {
1216                // if the current data is out side the desired range, just drop the data
1217                None
1218            } else if current >= start && current + data.len() <= end {
1219                // all data is within the slice
1220                Some(data)
1221            } else {
1222                // data:  current........current+data.len()
1223                // range: start...........end
1224                let slice_start = start.saturating_sub(current);
1225                let slice_end = std::cmp::min(data.len(), end - current);
1226                Some(data.slice(slice_start..slice_end))
1227            }
1228        }
1229    }
1230
    #[test]
    fn test_range_body_filter() {
        // no range set: frames pass through untouched
        let mut body_filter = RangeBodyFilter::new();
        assert_eq!(body_filter.filter_body(Some("123".into())).unwrap(), "123");

        // invalid range: the whole body is dropped
        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(RangeType::Invalid);
        assert!(body_filter.filter_body(Some("123".into())).is_none());

        // range contained in the first frame only
        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(RangeType::new_single(0, 1));
        assert_eq!(body_filter.filter_body(Some("012".into())).unwrap(), "0");
        assert!(body_filter.filter_body(Some("345".into())).is_none());

        // range fully inside the middle frame
        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(RangeType::new_single(4, 6));
        assert!(body_filter.filter_body(Some("012".into())).is_none());
        assert_eq!(body_filter.filter_body(Some("345".into())).unwrap(), "45");
        assert!(body_filter.filter_body(Some("678".into())).is_none());

        // range spanning all three frames, trimmed at both ends
        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(RangeType::new_single(1, 7));
        assert_eq!(body_filter.filter_body(Some("012".into())).unwrap(), "12");
        assert_eq!(body_filter.filter_body(Some("345".into())).unwrap(), "345");
        assert_eq!(body_filter.filter_body(Some("678".into())).unwrap(), "6");
    }
1257}
1258
// a state machine for proxy logic to tell when to use cache in the case of
// miss/revalidation/error.
#[derive(Debug)]
pub(crate) enum ServeFromCache {
    Off,                 // not using cache
    CacheHeader,         // should serve cache header
    CacheHeaderOnly,     // should serve cache header only; stream ends after the header
    CacheBody(bool), // should serve cache body with a bool to indicate if it has already called seek on the hit handler
    CacheHeaderMiss, // should serve cache header but upstream response should be admitted to cache
    CacheBodyMiss(bool), // should serve cache body but upstream response should be admitted to cache, bool to indicate seek status
    Done,                // nothing left to serve; only emits HttpTask::Done
}
1271
impl ServeFromCache {
    /// Start in the `Off` state (not serving from cache).
    pub fn new() -> Self {
        Self::Off
    }

    /// Whether the state machine is engaged (anything but `Off`).
    pub fn is_on(&self) -> bool {
        !matches!(self, Self::Off)
    }

    /// Whether we are serving from cache while the upstream response is still
    /// being admitted to cache (miss mode).
    pub fn is_miss(&self) -> bool {
        matches!(self, Self::CacheHeaderMiss | Self::CacheBodyMiss(_))
    }

    pub fn is_miss_header(&self) -> bool {
        matches!(self, Self::CacheHeaderMiss)
    }

    pub fn is_miss_body(&self) -> bool {
        matches!(self, Self::CacheBodyMiss(_))
    }

    // When serving a pure hit/stale (not a miss), the upstream response is not needed.
    pub fn should_discard_upstream(&self) -> bool {
        self.is_on() && !self.is_miss()
    }

    // Upstream tasks go directly to downstream only when this machine is off.
    pub fn should_send_to_downstream(&self) -> bool {
        !self.is_on()
    }

    /// Begin serving a cache hit, header first.
    pub fn enable(&mut self) {
        *self = Self::CacheHeader;
    }

    /// Begin serving a miss from the partially admitted asset, unless the
    /// machine is already engaged in another mode.
    pub fn enable_miss(&mut self) {
        if !self.is_on() {
            *self = Self::CacheHeaderMiss;
        }
    }

    /// Switch to header-only serving (no body tasks will follow).
    pub fn enable_header_only(&mut self) {
        match self {
            Self::CacheBody(_) | Self::CacheBodyMiss(_) => *self = Self::Done, // TODO: make sure no body is read yet
            _ => *self = Self::CacheHeaderOnly,
        }
    }

    // This function is (best effort) cancel-safe to be used in select
    /// Produce the next [`HttpTask`] from cache and advance the state machine:
    /// header -> body frames -> `HttpTask::Done`.
    pub async fn next_http_task(
        &mut self,
        cache: &mut HttpCache,
        range: &mut RangeBodyFilter,
    ) -> Result<HttpTask> {
        if !cache.enabled() {
            // Cache is disabled due to internal error
            // TODO: if nothing is sent to eyeball yet, figure out a way to recover by
            // fetching from upstream
            return Error::e_explain(InternalError, "Cache disabled");
        }
        match self {
            Self::Off => panic!("ProxyUseCache not enabled"),
            Self::CacheHeader => {
                *self = Self::CacheBody(true); // true: seek still pending
                Ok(HttpTask::Header(cache_hit_header(cache), false)) // false for now
            }
            Self::CacheHeaderMiss => {
                *self = Self::CacheBodyMiss(true); // true: seek still pending
                Ok(HttpTask::Header(cache_hit_header(cache), false)) // false for now
            }
            Self::CacheHeaderOnly => {
                *self = Self::Done;
                Ok(HttpTask::Header(cache_hit_header(cache), true))
            }
            Self::CacheBody(should_seek) => {
                if *should_seek {
                    self.maybe_seek_hit_handler(cache, range)?;
                }
                if let Some(b) = cache.hit_handler().read_body().await? {
                    Ok(HttpTask::Body(Some(b), false)) // false for now
                } else {
                    *self = Self::Done;
                    Ok(HttpTask::Done)
                }
            }
            Self::CacheBodyMiss(should_seek) => {
                if *should_seek {
                    self.maybe_seek_miss_handler(cache, range)?;
                }
                // safety: the caller of enable_miss() calls it only if the async_body_reader exists
                if let Some(b) = cache.miss_body_reader().unwrap().read_body().await? {
                    Ok(HttpTask::Body(Some(b), false)) // false for now
                } else {
                    *self = Self::Done;
                    Ok(HttpTask::Done)
                }
            }
            Self::Done => Ok(HttpTask::Done),
        }
    }

    // Seek the miss body reader to the requested range (if it supports seeking),
    // then mark the seek as done by moving to `CacheBodyMiss(false)`.
    fn maybe_seek_miss_handler(
        &mut self,
        cache: &mut HttpCache,
        range_filter: &mut RangeBodyFilter,
    ) -> Result<()> {
        if let RangeType::Single(range) = &range_filter.range {
            // safety: called only if the async_body_reader exists
            if cache.miss_body_reader().unwrap().can_seek() {
                cache
                    .miss_body_reader()
                    // safety: called only if the async_body_reader exists
                    .unwrap()
                    .seek(range.start, Some(range.end))
                    .or_err(InternalError, "cannot seek miss handler")?;
                // Because the miss body reader is seeking, we no longer need the
                // RangeBodyFilter's help to return the requested byte range.
                range_filter.range = RangeType::None;
            }
        }
        *self = Self::CacheBodyMiss(false);
        Ok(())
    }

    // Seek the hit handler to the requested range (if it supports seeking),
    // then mark the seek as done by moving to `CacheBody(false)`.
    fn maybe_seek_hit_handler(
        &mut self,
        cache: &mut HttpCache,
        range_filter: &mut RangeBodyFilter,
    ) -> Result<()> {
        if let RangeType::Single(range) = &range_filter.range {
            if cache.hit_handler().can_seek() {
                cache
                    .hit_handler()
                    .seek(range.start, Some(range.end))
                    .or_err(InternalError, "cannot seek hit handler")?;
                // Because the hit handler is seeking, we no longer need the
                // RangeBodyFilter's help to return the requested byte range.
                range_filter.range = RangeType::None;
            }
        }
        *self = Self::CacheBody(false);
        Ok(())
    }
}