Skip to main content

tower_http_cache/
streaming.rs

1//! Smart streaming and large file handling for tower-http-cache.
2//!
3//! This module provides intelligent body size detection and content-type based
4//! filtering to prevent large files (PDFs, videos, archives) from overwhelming
5//! the cache. Large bodies are automatically streamed through without buffering,
6//! preserving memory and cache efficiency.
7
8use std::collections::HashSet;
9
10use crate::range::RangeHandling;
11
/// Decision on how to handle a response body based on size and content type.
///
/// Produced by [`should_stream`]. The variants describe what the caching layer
/// should do with the body; this module only computes the decision and does not
/// touch the body itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// NOTE(review): presumably silenced because the single `#[doc(hidden)]` variant
// below is a reserved placeholder rather than a hand-rolled `#[non_exhaustive]`
// marker — confirm before removing.
#[allow(clippy::manual_non_exhaustive)]
pub enum StreamingDecision {
    /// Buffer the body and cache it (small enough, appropriate type).
    ///
    /// Also returned unconditionally when the policy is disabled.
    Buffer,

    /// Skip caching entirely (too large or excluded content type).
    SkipCache,

    /// Stream the body without buffering (reserved for future use).
    ///
    /// No function in this module currently returns this variant.
    #[doc(hidden)]
    StreamThrough,

    /// Try to stream if possible, fallback to buffer (unknown size).
    ///
    /// Returned when neither the body's size hint nor a `Content-Length`
    /// value provides a size.
    StreamIfPossible,
}
29
/// Configuration for smart streaming and large file handling.
///
/// The streaming policy allows you to configure how the cache handles
/// large response bodies and specific content types. This prevents memory
/// exhaustion and cache pollution from large files.
///
/// Ready-made configurations are available through [`Default`],
/// [`StreamingPolicy::disabled`], [`StreamingPolicy::size_only`] and
/// [`StreamingPolicy::content_type_only`].
///
/// # Examples
///
/// ```
/// use tower_http_cache::streaming::StreamingPolicy;
/// use tower_http_cache::range::RangeHandling;
/// use std::collections::HashSet;
///
/// let policy = StreamingPolicy {
///     enabled: true,
///     max_cacheable_size: Some(1024 * 1024), // 1MB
///     excluded_content_types: HashSet::from([
///         "application/pdf".to_string(),
///         "video/*".to_string(),
///     ]),
///     range_handling: RangeHandling::PassThrough,
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct StreamingPolicy {
    /// Enable smart streaming (default: true).
    ///
    /// When `false`, [`should_stream`] returns [`StreamingDecision::Buffer`]
    /// without looking at any other field.
    pub enabled: bool,

    /// Skip caching bodies larger than this many bytes (default: 1MB).
    /// Set to None to disable size-based filtering.
    ///
    /// The comparison is strict: a body exactly this size is still cacheable.
    pub max_cacheable_size: Option<usize>,

    /// Content types to never cache (PDFs, videos, archives, etc.)
    /// Supports wildcards like "video/*".
    ///
    /// Patterns are compared case-insensitively.
    pub excluded_content_types: HashSet<String>,

    /// Bypass content-type exclusions for these types (size limits still apply).
    /// Useful for API responses that should be cached even if they match an exclusion pattern.
    pub force_cache_content_types: HashSet<String>,

    /// Use streaming for bodies above this size in bytes (default: 512KB).
    /// Currently used for decision making; actual streaming not yet implemented.
    ///
    /// NOTE(review): no function in this module reads this field; any use
    /// must live elsewhere in the crate — confirm.
    pub stream_threshold: usize,

    /// How to handle HTTP Range requests (default: PassThrough)
    pub range_handling: RangeHandling,

    /// Enable chunk caching for large files (default: false, opt-in)
    ///
    /// When enabled, large files are split into chunks and cached separately,
    /// allowing efficient range request handling without caching the entire file.
    pub enable_chunk_cache: bool,

    /// Chunk size in bytes for large files (default: 1MB)
    ///
    /// Files are split into chunks of this size when chunk caching is enabled.
    /// Larger chunks use less memory overhead but may waste bandwidth for small
    /// range requests. Smaller chunks are more granular but have more overhead.
    pub chunk_size: usize,

    /// Minimum file size in bytes for chunking (default: 10MB)
    ///
    /// Files smaller than this will not be chunked even if chunk caching is enabled.
    /// This prevents overhead for files that don't benefit from chunking.
    pub min_chunk_file_size: u64,
}
97
98impl Default for StreamingPolicy {
99    fn default() -> Self {
100        Self {
101            enabled: true,
102            max_cacheable_size: Some(1024 * 1024), // 1MB
103            excluded_content_types: HashSet::from([
104                "application/pdf".to_string(),
105                "video/*".to_string(),
106                "audio/*".to_string(),
107                "application/zip".to_string(),
108                "application/x-rar".to_string(),
109                "application/x-tar".to_string(),
110                "application/gzip".to_string(),
111                "application/x-7z-compressed".to_string(),
112                "application/octet-stream".to_string(),
113            ]),
114            force_cache_content_types: HashSet::from([
115                "application/json".to_string(),
116                "application/xml".to_string(),
117                "text/*".to_string(),
118            ]),
119            stream_threshold: 512 * 1024, // 512KB
120            range_handling: RangeHandling::default(),
121            enable_chunk_cache: false,             // Opt-in feature
122            chunk_size: 1024 * 1024,               // 1MB chunks
123            min_chunk_file_size: 10 * 1024 * 1024, // 10MB minimum
124        }
125    }
126}
127
128impl StreamingPolicy {
129    /// Creates a new streaming policy with all features disabled.
130    /// Useful for gradually migrating existing code.
131    pub fn disabled() -> Self {
132        Self {
133            enabled: false,
134            max_cacheable_size: None,
135            excluded_content_types: HashSet::new(),
136            force_cache_content_types: HashSet::new(),
137            stream_threshold: usize::MAX,
138            range_handling: RangeHandling::PassThrough,
139            enable_chunk_cache: false,
140            chunk_size: 1024 * 1024,
141            min_chunk_file_size: 0,
142        }
143    }
144
145    /// Creates a streaming policy that only filters by size.
146    pub fn size_only(max_size: usize) -> Self {
147        Self {
148            enabled: true,
149            max_cacheable_size: Some(max_size),
150            excluded_content_types: HashSet::new(),
151            force_cache_content_types: HashSet::new(),
152            stream_threshold: max_size,
153            range_handling: RangeHandling::PassThrough,
154            enable_chunk_cache: false,
155            chunk_size: 1024 * 1024,
156            min_chunk_file_size: 0,
157        }
158    }
159
160    /// Creates a streaming policy that only filters by content type.
161    pub fn content_type_only(excluded: HashSet<String>) -> Self {
162        Self {
163            enabled: true,
164            max_cacheable_size: None,
165            excluded_content_types: excluded,
166            force_cache_content_types: HashSet::new(),
167            stream_threshold: usize::MAX,
168            range_handling: RangeHandling::PassThrough,
169            enable_chunk_cache: false,
170            chunk_size: 1024 * 1024,
171            min_chunk_file_size: 0,
172        }
173    }
174}
175
176/// Determines how to handle a response body based on size hints and content type.
177///
178/// This function implements the core streaming decision logic by examining:
179/// 1. Whether streaming is enabled in the policy
180/// 2. Content-Type header (fast path for exclusions/forced caching)
181/// 3. Size hints from the body
182/// 4. Content-Length header as fallback
183///
184/// # Arguments
185///
186/// * `policy` - The streaming policy configuration
187/// * `size_hint` - Size hint from the response body
188/// * `content_type` - Content-Type header value, if present
189/// * `content_length` - Content-Length header value, if present
190///
191/// # Returns
192///
193/// A [`StreamingDecision`] indicating how to handle the body.
194pub fn should_stream(
195    policy: &StreamingPolicy,
196    size_hint: &http_body::SizeHint,
197    content_type: Option<&str>,
198    content_length: Option<u64>,
199) -> StreamingDecision {
200    // Check if streaming is disabled
201    if !policy.enabled {
202        return StreamingDecision::Buffer;
203    }
204
205    // Check content type first (fast path)
206    let is_forced = if let Some(ct) = content_type {
207        // Check if content type is excluded
208        if is_excluded_content_type(ct, &policy.excluded_content_types) {
209            return StreamingDecision::SkipCache;
210        }
211
212        // Check if content type is forced to cache
213        is_forced_content_type(ct, &policy.force_cache_content_types)
214    } else {
215        false
216    };
217
218    // Check size hints - size limits apply even to forced types
219    if let Some(exact_size) = size_hint.exact() {
220        return decide_by_size(exact_size as usize, policy, is_forced);
221    }
222
223    if let Some(upper_bound) = size_hint.upper() {
224        return decide_by_size(upper_bound as usize, policy, is_forced);
225    }
226
227    // Fallback to Content-Length header
228    if let Some(content_len) = content_length {
229        return decide_by_size(content_len as usize, policy, is_forced);
230    }
231
232    // Unknown size - use conservative approach
233    // If we have no size information, buffer it (existing behavior)
234    StreamingDecision::StreamIfPossible
235}
236
237/// Decides caching strategy based on body size.
238///
239/// The `is_forced` parameter indicates if the content-type is in force_cache list,
240/// but size limits still apply regardless.
241fn decide_by_size(size: usize, policy: &StreamingPolicy, _is_forced: bool) -> StreamingDecision {
242    if let Some(max_size) = policy.max_cacheable_size {
243        if size > max_size {
244            return StreamingDecision::SkipCache;
245        }
246    }
247
248    // For now, always buffer if size is acceptable
249    // Future: return StreamingDecision::StreamThrough for large-but-cacheable bodies
250    StreamingDecision::Buffer
251}
252
253/// Checks if a content type matches any excluded patterns.
254///
255/// Supports exact matches and wildcard patterns like "video/*".
256fn is_excluded_content_type(content_type: &str, excluded: &HashSet<String>) -> bool {
257    let normalized = content_type.to_lowercase();
258
259    for pattern in excluded {
260        if matches_pattern(&normalized, pattern) {
261            return true;
262        }
263    }
264
265    false
266}
267
268/// Checks if a content type matches any forced cache patterns.
269fn is_forced_content_type(content_type: &str, forced: &HashSet<String>) -> bool {
270    let normalized = content_type.to_lowercase();
271
272    for pattern in forced {
273        if matches_pattern(&normalized, pattern) {
274            return true;
275        }
276    }
277
278    false
279}
280
/// Matches a content type against a pattern (supports wildcards).
///
/// `content_type` is expected to be lowercased already by the caller; `pattern`
/// is lowercased here.
///
/// * Wildcard patterns (`"video/*"`) match any content type whose top-level
///   type is exactly the part before the slash, so `"video/mp4"` matches but
///   `"videogame/rom"` does not.
/// * Any other pattern matches if the content type equals it, optionally
///   followed by whitespace and/or a `;`-introduced parameter list (to handle
///   values like `application/json; charset=utf-8`).
fn matches_pattern(content_type: &str, pattern: &str) -> bool {
    let pattern_lower = pattern.to_lowercase();

    if let Some(top_level) = pattern_lower.strip_suffix("/*") {
        // Require the '/' right after the top-level type so that the wildcard
        // cannot match a longer, unrelated type sharing the same prefix.
        return content_type
            .strip_prefix(top_level)
            .map_or(false, |rest| rest.starts_with('/'));
    }

    match content_type.strip_prefix(pattern_lower.as_str()) {
        Some(rest) => {
            // Whatever follows the media type must be nothing at all or a
            // parameter list; optional whitespace may precede the ';'.
            let rest = rest.trim_start();
            rest.is_empty() || rest.starts_with(';')
        }
        None => false,
    }
}
298
299/// Extracts size information from various sources for logging/metrics.
300pub fn extract_size_info(
301    size_hint: &http_body::SizeHint,
302    content_length: Option<u64>,
303) -> Option<u64> {
304    size_hint
305        .exact()
306        .or_else(|| size_hint.upper())
307        .or(content_length)
308}
309
// Unit tests for the streaming decision logic and its pattern-matching helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use http_body::SizeHint;

    // The default exclusion list contains "application/pdf".
    #[test]
    fn test_default_policy_excludes_pdf() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::with_exact(5 * 1024 * 1024); // 5MB

        let decision = should_stream(
            &policy,
            &size_hint,
            Some("application/pdf"),
            Some(5 * 1024 * 1024),
        );

        assert_eq!(decision, StreamingDecision::SkipCache);
    }

    // "video/mp4" is excluded through the default "video/*" wildcard.
    #[test]
    fn test_default_policy_excludes_video() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::with_exact(10 * 1024 * 1024);

        let decision = should_stream(&policy, &size_hint, Some("video/mp4"), None);

        assert_eq!(decision, StreamingDecision::SkipCache);
    }

    // A non-excluded type under the default 1MB limit is buffered.
    #[test]
    fn test_small_json_gets_buffered() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::with_exact(1024); // 1KB

        let decision = should_stream(&policy, &size_hint, Some("application/json"), Some(1024));

        assert_eq!(decision, StreamingDecision::Buffer);
    }

    // The same type above the default 1MB limit is skipped.
    #[test]
    fn test_large_json_skipped_by_size() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::with_exact(2 * 1024 * 1024); // 2MB

        let decision = should_stream(
            &policy,
            &size_hint,
            Some("application/json"),
            Some(2 * 1024 * 1024),
        );

        assert_eq!(decision, StreamingDecision::SkipCache);
    }

    #[test]
    fn test_force_cache_respects_size_limits() {
        // Being on the force-cache list does not exempt a type from the size limit
        let mut policy = StreamingPolicy::default();
        policy
            .force_cache_content_types
            .insert("application/important".to_string());

        let size_hint = SizeHint::with_exact(5 * 1024 * 1024); // 5MB (over 1MB limit)

        let decision = should_stream(
            &policy,
            &size_hint,
            Some("application/important"),
            Some(5 * 1024 * 1024),
        );

        // Even though it's forced to cache, size limit still applies
        assert_eq!(decision, StreamingDecision::SkipCache);

        // But under the size limit it should cache
        let small_hint = SizeHint::with_exact(500 * 1024); // 500KB
        let decision_small = should_stream(
            &policy,
            &small_hint,
            Some("application/important"),
            Some(500 * 1024),
        );
        assert_eq!(decision_small, StreamingDecision::Buffer);
    }

    // A disabled policy buffers regardless of size or content type.
    #[test]
    fn test_disabled_policy_always_buffers() {
        let policy = StreamingPolicy::disabled();
        let size_hint = SizeHint::with_exact(10 * 1024 * 1024);

        let decision = should_stream(
            &policy,
            &size_hint,
            Some("application/pdf"),
            Some(10 * 1024 * 1024),
        );

        assert_eq!(decision, StreamingDecision::Buffer);
    }

    #[test]
    fn test_wildcard_pattern_matching() {
        assert!(matches_pattern("video/mp4", "video/*"));
        assert!(matches_pattern("video/mpeg", "video/*"));
        assert!(matches_pattern("audio/mp3", "audio/*"));
        assert!(!matches_pattern("application/json", "video/*"));
    }

    #[test]
    fn test_exact_pattern_matching() {
        assert!(matches_pattern("application/pdf", "application/pdf"));
        assert!(!matches_pattern("application/pdf", "pdf")); // no substring match
        assert!(!matches_pattern("text/plain", "application/pdf"));
        // Matches with parameter suffix
        assert!(matches_pattern(
            "application/json; charset=utf-8",
            "application/json"
        ));
    }

    // An exact size hint alone is enough to reach a size-based decision.
    #[test]
    fn test_size_hint_exact() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::with_exact(500 * 1024); // 500KB

        let decision = should_stream(&policy, &size_hint, None, None);
        assert_eq!(decision, StreamingDecision::Buffer);
    }

    // With no exact size, the hint's upper bound is used instead.
    #[test]
    fn test_size_hint_upper_bound() {
        let policy = StreamingPolicy::default();
        let mut size_hint = SizeHint::default();
        size_hint.set_upper(500 * 1024);

        let decision = should_stream(&policy, &size_hint, None, None);
        assert_eq!(decision, StreamingDecision::Buffer);
    }

    // With an empty size hint, the Content-Length value is used.
    #[test]
    fn test_content_length_fallback() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::default(); // No hints

        let decision = should_stream(&policy, &size_hint, None, Some(500 * 1024));
        assert_eq!(decision, StreamingDecision::Buffer);
    }

    // No size information from any source yields StreamIfPossible.
    #[test]
    fn test_unknown_size_conservative() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::default();

        let decision = should_stream(&policy, &size_hint, None, None);
        assert_eq!(decision, StreamingDecision::StreamIfPossible);
    }

    #[test]
    fn test_size_only_policy() {
        let policy = StreamingPolicy::size_only(512 * 1024);
        let size_hint = SizeHint::with_exact(1024 * 1024);

        // PDF is not excluded by type here, but 1MB exceeds the 512KB limit
        let decision = should_stream(&policy, &size_hint, Some("application/pdf"), None);
        assert_eq!(decision, StreamingDecision::SkipCache); // Due to size

        let size_hint_small = SizeHint::with_exact(256 * 1024);
        let decision_small =
            should_stream(&policy, &size_hint_small, Some("application/pdf"), None);
        assert_eq!(decision_small, StreamingDecision::Buffer); // Small enough
    }

    #[test]
    fn test_content_type_only_policy() {
        let mut excluded = HashSet::new();
        excluded.insert("application/pdf".to_string());
        let policy = StreamingPolicy::content_type_only(excluded);

        let size_hint = SizeHint::with_exact(10 * 1024 * 1024); // 10MB

        // Large non-PDF should be cached (no size filtering)
        let decision = should_stream(&policy, &size_hint, Some("application/json"), None);
        assert_eq!(decision, StreamingDecision::Buffer);

        // PDF should be skipped
        let decision_pdf = should_stream(&policy, &size_hint, Some("application/pdf"), None);
        assert_eq!(decision_pdf, StreamingDecision::SkipCache);
    }

    // Covers each source in priority order: exact, upper bound, Content-Length, none.
    #[test]
    fn test_extract_size_info() {
        let size_hint = SizeHint::with_exact(1024);
        assert_eq!(extract_size_info(&size_hint, None), Some(1024));

        let mut size_hint_upper = SizeHint::default();
        size_hint_upper.set_upper(2048);
        assert_eq!(extract_size_info(&size_hint_upper, None), Some(2048));

        let size_hint_none = SizeHint::default();
        assert_eq!(extract_size_info(&size_hint_none, Some(4096)), Some(4096));

        assert_eq!(extract_size_info(&size_hint_none, None), None);
    }

    #[test]
    fn test_case_insensitive_content_type() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::with_exact(1024);

        // All variations should be excluded
        assert_eq!(
            should_stream(&policy, &size_hint, Some("Application/PDF"), None),
            StreamingDecision::SkipCache
        );
        assert_eq!(
            should_stream(&policy, &size_hint, Some("APPLICATION/PDF"), None),
            StreamingDecision::SkipCache
        );
        assert_eq!(
            should_stream(&policy, &size_hint, Some("Video/MP4"), None),
            StreamingDecision::SkipCache
        );
    }

    #[test]
    fn test_content_type_with_charset() {
        let policy = StreamingPolicy::default();
        let size_hint = SizeHint::with_exact(1024);

        // Should match even with charset parameter
        let decision = should_stream(
            &policy,
            &size_hint,
            Some("application/json; charset=utf-8"),
            None,
        );
        assert_eq!(decision, StreamingDecision::Buffer);
    }
}
545            None,
546        );
547        assert_eq!(decision, StreamingDecision::Buffer);
548    }
549}