tower_http_cache/
streaming.rs

//! Smart streaming and large file handling for tower-http-cache.
//!
//! This module provides intelligent body size detection and content-type based
//! filtering to prevent large files (PDFs, videos, archives) from overwhelming
//! the cache. Large bodies are automatically streamed through without buffering,
//! preserving memory and cache efficiency.
7
8use std::collections::HashSet;
9
10use crate::range::RangeHandling;
11
/// Decision on how to handle a response body based on size and content type.
///
/// Produced by [`should_stream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingDecision {
    /// Buffer the body and cache it (small enough, appropriate type).
    ///
    /// This is also the answer for every response when the policy is disabled.
    Buffer,

    /// Skip caching entirely (too large or excluded content type).
    SkipCache,

    /// Stream the body without buffering (not implemented yet).
    ///
    /// Reserved for future use: no code path in this module returns it today.
    StreamThrough,

    /// Try to stream if possible, fallback to buffer (unknown size).
    ///
    /// Returned when neither the body's size hint nor `Content-Length`
    /// provides a size.
    StreamIfPossible,
}
27
/// Configuration for smart streaming and large file handling.
///
/// The streaming policy allows you to configure how the cache handles
/// large response bodies and specific content types. This prevents memory
/// exhaustion and cache pollution from large files.
///
/// # Examples
///
/// ```
/// use tower_http_cache::streaming::StreamingPolicy;
/// use tower_http_cache::range::RangeHandling;
/// use std::collections::HashSet;
///
/// let policy = StreamingPolicy {
///     enabled: true,
///     max_cacheable_size: Some(1024 * 1024), // 1MB
///     excluded_content_types: HashSet::from([
///         "application/pdf".to_string(),
///         "video/*".to_string(),
///     ]),
///     range_handling: RangeHandling::PassThrough,
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct StreamingPolicy {
    /// Enable smart streaming (default: true).
    ///
    /// When `false`, [`should_stream`] always answers
    /// [`StreamingDecision::Buffer`] and none of the other fields are consulted
    /// by it.
    pub enabled: bool,

    /// Skip caching bodies larger than this (default: 1MB).
    /// Set to `None` to disable size-based filtering.
    ///
    /// The comparison is strict: a body of exactly this size is still cacheable.
    pub max_cacheable_size: Option<usize>,

    /// Content types to never cache (PDFs, videos, archives, etc.).
    ///
    /// Matching is case-insensitive. Patterns ending in `/*` (like `"video/*"`)
    /// are wildcards; any other pattern matches when it occurs anywhere inside
    /// the Content-Type value, so parameters such as `; charset=utf-8` do not
    /// prevent a match.
    pub excluded_content_types: HashSet<String>,

    /// Content types flagged as "force cache" (e.g. API responses).
    /// Uses the same pattern syntax as `excluded_content_types`.
    ///
    /// Note: [`should_stream`] checks `excluded_content_types` first and still
    /// applies `max_cacheable_size` to these types, so membership in this set
    /// does not currently override either filter.
    pub force_cache_content_types: HashSet<String>,

    /// Use streaming for bodies above this size (default: 512KB).
    /// Actual streaming is not yet implemented.
    ///
    // NOTE(review): `should_stream` in this file never reads this field —
    // confirm where (or whether) it is consumed elsewhere in the crate.
    pub stream_threshold: usize,

    /// How to handle HTTP Range requests (default: PassThrough)
    pub range_handling: RangeHandling,

    /// Enable chunk caching for large files (default: false, opt-in)
    ///
    /// When enabled, large files are split into chunks and cached separately,
    /// allowing efficient range request handling without caching the entire file.
    pub enable_chunk_cache: bool,

    /// Chunk size for large files (default: 1MB)
    ///
    /// Files are split into chunks of this size when chunk caching is enabled.
    /// Larger chunks use less memory overhead but may waste bandwidth for small
    /// range requests. Smaller chunks are more granular but have more overhead.
    pub chunk_size: usize,

    /// Minimum file size for chunking (default: 10MB)
    ///
    /// Files smaller than this will not be chunked even if chunk caching is enabled.
    /// This prevents overhead for files that don't benefit from chunking.
    pub min_chunk_file_size: u64,
}
95
96impl Default for StreamingPolicy {
97    fn default() -> Self {
98        Self {
99            enabled: true,
100            max_cacheable_size: Some(1024 * 1024), // 1MB
101            excluded_content_types: HashSet::from([
102                "application/pdf".to_string(),
103                "video/*".to_string(),
104                "audio/*".to_string(),
105                "application/zip".to_string(),
106                "application/x-rar".to_string(),
107                "application/x-tar".to_string(),
108                "application/gzip".to_string(),
109                "application/x-7z-compressed".to_string(),
110                "application/octet-stream".to_string(),
111            ]),
112            force_cache_content_types: HashSet::from([
113                "application/json".to_string(),
114                "application/xml".to_string(),
115                "text/*".to_string(),
116            ]),
117            stream_threshold: 512 * 1024, // 512KB
118            range_handling: RangeHandling::default(),
119            enable_chunk_cache: false,             // Opt-in feature
120            chunk_size: 1024 * 1024,               // 1MB chunks
121            min_chunk_file_size: 10 * 1024 * 1024, // 10MB minimum
122        }
123    }
124}
125
126impl StreamingPolicy {
127    /// Creates a new streaming policy with all features disabled.
128    /// Useful for gradually migrating existing code.
129    pub fn disabled() -> Self {
130        Self {
131            enabled: false,
132            max_cacheable_size: None,
133            excluded_content_types: HashSet::new(),
134            force_cache_content_types: HashSet::new(),
135            stream_threshold: usize::MAX,
136            range_handling: RangeHandling::PassThrough,
137            enable_chunk_cache: false,
138            chunk_size: 1024 * 1024,
139            min_chunk_file_size: 0,
140        }
141    }
142
143    /// Creates a streaming policy that only filters by size.
144    pub fn size_only(max_size: usize) -> Self {
145        Self {
146            enabled: true,
147            max_cacheable_size: Some(max_size),
148            excluded_content_types: HashSet::new(),
149            force_cache_content_types: HashSet::new(),
150            stream_threshold: max_size,
151            range_handling: RangeHandling::PassThrough,
152            enable_chunk_cache: false,
153            chunk_size: 1024 * 1024,
154            min_chunk_file_size: 0,
155        }
156    }
157
158    /// Creates a streaming policy that only filters by content type.
159    pub fn content_type_only(excluded: HashSet<String>) -> Self {
160        Self {
161            enabled: true,
162            max_cacheable_size: None,
163            excluded_content_types: excluded,
164            force_cache_content_types: HashSet::new(),
165            stream_threshold: usize::MAX,
166            range_handling: RangeHandling::PassThrough,
167            enable_chunk_cache: false,
168            chunk_size: 1024 * 1024,
169            min_chunk_file_size: 0,
170        }
171    }
172}
173
174/// Determines how to handle a response body based on size hints and content type.
175///
176/// This function implements the core streaming decision logic by examining:
177/// 1. Whether streaming is enabled in the policy
178/// 2. Content-Type header (fast path for exclusions/forced caching)
179/// 3. Size hints from the body
180/// 4. Content-Length header as fallback
181///
182/// # Arguments
183///
184/// * `policy` - The streaming policy configuration
185/// * `size_hint` - Size hint from the response body
186/// * `content_type` - Content-Type header value, if present
187/// * `content_length` - Content-Length header value, if present
188///
189/// # Returns
190///
191/// A [`StreamingDecision`] indicating how to handle the body.
192pub fn should_stream(
193    policy: &StreamingPolicy,
194    size_hint: &http_body::SizeHint,
195    content_type: Option<&str>,
196    content_length: Option<u64>,
197) -> StreamingDecision {
198    // Check if streaming is disabled
199    if !policy.enabled {
200        return StreamingDecision::Buffer;
201    }
202
203    // Check content type first (fast path)
204    let is_forced = if let Some(ct) = content_type {
205        // Check if content type is excluded
206        if is_excluded_content_type(ct, &policy.excluded_content_types) {
207            return StreamingDecision::SkipCache;
208        }
209
210        // Check if content type is forced to cache
211        is_forced_content_type(ct, &policy.force_cache_content_types)
212    } else {
213        false
214    };
215
216    // Check size hints - size limits apply even to forced types
217    if let Some(exact_size) = size_hint.exact() {
218        return decide_by_size(exact_size as usize, policy, is_forced);
219    }
220
221    if let Some(upper_bound) = size_hint.upper() {
222        return decide_by_size(upper_bound as usize, policy, is_forced);
223    }
224
225    // Fallback to Content-Length header
226    if let Some(content_len) = content_length {
227        return decide_by_size(content_len as usize, policy, is_forced);
228    }
229
230    // Unknown size - use conservative approach
231    // If we have no size information, buffer it (existing behavior)
232    StreamingDecision::StreamIfPossible
233}
234
235/// Decides caching strategy based on body size.
236///
237/// The `is_forced` parameter indicates if the content-type is in force_cache list,
238/// but size limits still apply regardless.
239fn decide_by_size(size: usize, policy: &StreamingPolicy, _is_forced: bool) -> StreamingDecision {
240    if let Some(max_size) = policy.max_cacheable_size {
241        if size > max_size {
242            return StreamingDecision::SkipCache;
243        }
244    }
245
246    // For now, always buffer if size is acceptable
247    // Future: return StreamingDecision::StreamThrough for large-but-cacheable bodies
248    StreamingDecision::Buffer
249}
250
251/// Checks if a content type matches any excluded patterns.
252///
253/// Supports exact matches and wildcard patterns like "video/*".
254fn is_excluded_content_type(content_type: &str, excluded: &HashSet<String>) -> bool {
255    let normalized = content_type.to_lowercase();
256
257    for pattern in excluded {
258        if matches_pattern(&normalized, pattern) {
259            return true;
260        }
261    }
262
263    false
264}
265
266/// Checks if a content type matches any forced cache patterns.
267fn is_forced_content_type(content_type: &str, forced: &HashSet<String>) -> bool {
268    let normalized = content_type.to_lowercase();
269
270    for pattern in forced {
271        if matches_pattern(&normalized, pattern) {
272            return true;
273        }
274    }
275
276    false
277}
278
/// Matches a content type against a pattern (supports wildcards).
///
/// The pattern is lower-cased here; `content_type` is compared as given, so
/// callers are expected to pass it already lower-cased.
///
/// * A pattern ending in `/*` (e.g. "video/*") matches any content type whose
///   top-level type is exactly the part before the slash: "video/mp4" matches,
///   "videogame/x" does not.
/// * Any other pattern matches when it appears anywhere inside `content_type`,
///   which lets "application/json" match "application/json; charset=utf-8".
fn matches_pattern(content_type: &str, pattern: &str) -> bool {
    let pattern = pattern.to_lowercase();

    match pattern.strip_suffix('*') {
        // Keep the trailing '/' in the prefix so the whole type segment must match,
        // not merely its first few characters.
        Some(type_prefix) if type_prefix.ends_with('/') => content_type.starts_with(type_prefix),
        // Exact or substring match.
        _ => content_type.contains(pattern.as_str()),
    }
}
292
293/// Extracts size information from various sources for logging/metrics.
294pub fn extract_size_info(
295    size_hint: &http_body::SizeHint,
296    content_length: Option<u64>,
297) -> Option<u64> {
298    size_hint
299        .exact()
300        .or_else(|| size_hint.upper())
301        .or(content_length)
302}
303
#[cfg(test)]
mod tests {
    use super::*;
    use http_body::SizeHint;

    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * KIB;

    /// Size hint reporting an exact body size.
    fn exact(bytes: u64) -> SizeHint {
        SizeHint::with_exact(bytes)
    }

    /// Size hint that only has its upper bound set.
    fn at_most(bytes: u64) -> SizeHint {
        let mut hint = SizeHint::default();
        hint.set_upper(bytes);
        hint
    }

    #[test]
    fn test_default_policy_excludes_pdf() {
        let decision = should_stream(
            &StreamingPolicy::default(),
            &exact(5 * MIB),
            Some("application/pdf"),
            Some(5 * MIB),
        );

        assert_eq!(decision, StreamingDecision::SkipCache);
    }

    #[test]
    fn test_default_policy_excludes_video() {
        let decision = should_stream(
            &StreamingPolicy::default(),
            &exact(10 * MIB),
            Some("video/mp4"),
            None,
        );

        assert_eq!(decision, StreamingDecision::SkipCache);
    }

    #[test]
    fn test_small_json_gets_buffered() {
        let decision = should_stream(
            &StreamingPolicy::default(),
            &exact(KIB),
            Some("application/json"),
            Some(KIB),
        );

        assert_eq!(decision, StreamingDecision::Buffer);
    }

    #[test]
    fn test_large_json_skipped_by_size() {
        let decision = should_stream(
            &StreamingPolicy::default(),
            &exact(2 * MIB),
            Some("application/json"),
            Some(2 * MIB),
        );

        assert_eq!(decision, StreamingDecision::SkipCache);
    }

    #[test]
    fn test_force_cache_respects_size_limits() {
        // Being on the force-cache list does not exempt a type from the size limit.
        let mut policy = StreamingPolicy::default();
        policy
            .force_cache_content_types
            .insert("application/important".to_string());

        // 5MB is over the default 1MB limit, so the body is skipped.
        let oversized = should_stream(
            &policy,
            &exact(5 * MIB),
            Some("application/important"),
            Some(5 * MIB),
        );
        assert_eq!(oversized, StreamingDecision::SkipCache);

        // 500KB is under the limit, so the body is buffered.
        let within_limit = should_stream(
            &policy,
            &exact(500 * KIB),
            Some("application/important"),
            Some(500 * KIB),
        );
        assert_eq!(within_limit, StreamingDecision::Buffer);
    }

    #[test]
    fn test_disabled_policy_always_buffers() {
        let decision = should_stream(
            &StreamingPolicy::disabled(),
            &exact(10 * MIB),
            Some("application/pdf"),
            Some(10 * MIB),
        );

        assert_eq!(decision, StreamingDecision::Buffer);
    }

    #[test]
    fn test_wildcard_pattern_matching() {
        for (content_type, pattern) in &[
            ("video/mp4", "video/*"),
            ("video/mpeg", "video/*"),
            ("audio/mp3", "audio/*"),
        ] {
            assert!(matches_pattern(content_type, pattern));
        }

        assert!(!matches_pattern("application/json", "video/*"));
    }

    #[test]
    fn test_exact_pattern_matching() {
        assert!(matches_pattern("application/pdf", "application/pdf"));
        // Non-wildcard patterns match as substrings.
        assert!(matches_pattern("application/pdf", "pdf"));
        assert!(!matches_pattern("text/plain", "application/pdf"));
    }

    #[test]
    fn test_size_hint_exact() {
        let decision = should_stream(&StreamingPolicy::default(), &exact(500 * KIB), None, None);

        assert_eq!(decision, StreamingDecision::Buffer);
    }

    #[test]
    fn test_size_hint_upper_bound() {
        let decision = should_stream(&StreamingPolicy::default(), &at_most(500 * KIB), None, None);

        assert_eq!(decision, StreamingDecision::Buffer);
    }

    #[test]
    fn test_content_length_fallback() {
        // The hint carries no size, so the Content-Length value is used.
        let no_hint = SizeHint::default();

        let decision = should_stream(&StreamingPolicy::default(), &no_hint, None, Some(500 * KIB));

        assert_eq!(decision, StreamingDecision::Buffer);
    }

    #[test]
    fn test_unknown_size_conservative() {
        let no_hint = SizeHint::default();

        let decision = should_stream(&StreamingPolicy::default(), &no_hint, None, None);

        assert_eq!(decision, StreamingDecision::StreamIfPossible);
    }

    #[test]
    fn test_size_only_policy() {
        let policy = StreamingPolicy::size_only(512 * 1024);

        // No content-type filtering here, so the PDF is judged purely by size:
        // 1MB exceeds the 512KB limit.
        let too_big = should_stream(&policy, &exact(MIB), Some("application/pdf"), None);
        assert_eq!(too_big, StreamingDecision::SkipCache);

        // 256KB fits under the limit.
        let small_enough = should_stream(&policy, &exact(256 * KIB), Some("application/pdf"), None);
        assert_eq!(small_enough, StreamingDecision::Buffer);
    }

    #[test]
    fn test_content_type_only_policy() {
        let policy =
            StreamingPolicy::content_type_only(HashSet::from(["application/pdf".to_string()]));
        let ten_mb = exact(10 * MIB);

        // No size filtering: a large non-PDF body is still buffered.
        let json = should_stream(&policy, &ten_mb, Some("application/json"), None);
        assert_eq!(json, StreamingDecision::Buffer);

        // The excluded type is skipped.
        let pdf = should_stream(&policy, &ten_mb, Some("application/pdf"), None);
        assert_eq!(pdf, StreamingDecision::SkipCache);
    }

    #[test]
    fn test_extract_size_info() {
        assert_eq!(extract_size_info(&exact(1024), None), Some(1024));
        assert_eq!(extract_size_info(&at_most(2048), None), Some(2048));

        let no_hint = SizeHint::default();
        assert_eq!(extract_size_info(&no_hint, Some(4096)), Some(4096));
        assert_eq!(extract_size_info(&no_hint, None), None);
    }

    #[test]
    fn test_case_insensitive_content_type() {
        let policy = StreamingPolicy::default();
        let hint = exact(1024);

        // Every casing of an excluded type must still be excluded.
        for content_type in &["Application/PDF", "APPLICATION/PDF", "Video/MP4"] {
            assert_eq!(
                should_stream(&policy, &hint, Some(*content_type), None),
                StreamingDecision::SkipCache
            );
        }
    }

    #[test]
    fn test_content_type_with_charset() {
        // A charset parameter must not prevent the content type from matching.
        let decision = should_stream(
            &StreamingPolicy::default(),
            &exact(1024),
            Some("application/json; charset=utf-8"),
            None,
        );

        assert_eq!(decision, StreamingDecision::Buffer);
    }
}