1use std::collections::HashSet;
9
10use crate::range::RangeHandling;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14#[allow(clippy::manual_non_exhaustive)]
15pub enum StreamingDecision {
16 Buffer,
18
19 SkipCache,
21
22 #[doc(hidden)]
24 StreamThrough,
25
26 StreamIfPossible,
28}
29
30#[derive(Debug, Clone)]
55pub struct StreamingPolicy {
56 pub enabled: bool,
58
59 pub max_cacheable_size: Option<usize>,
62
63 pub excluded_content_types: HashSet<String>,
66
67 pub force_cache_content_types: HashSet<String>,
70
71 pub stream_threshold: usize,
74
75 pub range_handling: RangeHandling,
77
78 pub enable_chunk_cache: bool,
83
84 pub chunk_size: usize,
90
91 pub min_chunk_file_size: u64,
96}
97
98impl Default for StreamingPolicy {
99 fn default() -> Self {
100 Self {
101 enabled: true,
102 max_cacheable_size: Some(1024 * 1024), excluded_content_types: HashSet::from([
104 "application/pdf".to_string(),
105 "video/*".to_string(),
106 "audio/*".to_string(),
107 "application/zip".to_string(),
108 "application/x-rar".to_string(),
109 "application/x-tar".to_string(),
110 "application/gzip".to_string(),
111 "application/x-7z-compressed".to_string(),
112 "application/octet-stream".to_string(),
113 ]),
114 force_cache_content_types: HashSet::from([
115 "application/json".to_string(),
116 "application/xml".to_string(),
117 "text/*".to_string(),
118 ]),
119 stream_threshold: 512 * 1024, range_handling: RangeHandling::default(),
121 enable_chunk_cache: false, chunk_size: 1024 * 1024, min_chunk_file_size: 10 * 1024 * 1024, }
125 }
126}
127
128impl StreamingPolicy {
129 pub fn disabled() -> Self {
132 Self {
133 enabled: false,
134 max_cacheable_size: None,
135 excluded_content_types: HashSet::new(),
136 force_cache_content_types: HashSet::new(),
137 stream_threshold: usize::MAX,
138 range_handling: RangeHandling::PassThrough,
139 enable_chunk_cache: false,
140 chunk_size: 1024 * 1024,
141 min_chunk_file_size: 0,
142 }
143 }
144
145 pub fn size_only(max_size: usize) -> Self {
147 Self {
148 enabled: true,
149 max_cacheable_size: Some(max_size),
150 excluded_content_types: HashSet::new(),
151 force_cache_content_types: HashSet::new(),
152 stream_threshold: max_size,
153 range_handling: RangeHandling::PassThrough,
154 enable_chunk_cache: false,
155 chunk_size: 1024 * 1024,
156 min_chunk_file_size: 0,
157 }
158 }
159
160 pub fn content_type_only(excluded: HashSet<String>) -> Self {
162 Self {
163 enabled: true,
164 max_cacheable_size: None,
165 excluded_content_types: excluded,
166 force_cache_content_types: HashSet::new(),
167 stream_threshold: usize::MAX,
168 range_handling: RangeHandling::PassThrough,
169 enable_chunk_cache: false,
170 chunk_size: 1024 * 1024,
171 min_chunk_file_size: 0,
172 }
173 }
174}
175
176pub fn should_stream(
195 policy: &StreamingPolicy,
196 size_hint: &http_body::SizeHint,
197 content_type: Option<&str>,
198 content_length: Option<u64>,
199) -> StreamingDecision {
200 if !policy.enabled {
202 return StreamingDecision::Buffer;
203 }
204
205 let is_forced = if let Some(ct) = content_type {
207 if is_excluded_content_type(ct, &policy.excluded_content_types) {
209 return StreamingDecision::SkipCache;
210 }
211
212 is_forced_content_type(ct, &policy.force_cache_content_types)
214 } else {
215 false
216 };
217
218 if let Some(exact_size) = size_hint.exact() {
220 return decide_by_size(exact_size as usize, policy, is_forced);
221 }
222
223 if let Some(upper_bound) = size_hint.upper() {
224 return decide_by_size(upper_bound as usize, policy, is_forced);
225 }
226
227 if let Some(content_len) = content_length {
229 return decide_by_size(content_len as usize, policy, is_forced);
230 }
231
232 StreamingDecision::StreamIfPossible
235}
236
237fn decide_by_size(size: usize, policy: &StreamingPolicy, _is_forced: bool) -> StreamingDecision {
242 if let Some(max_size) = policy.max_cacheable_size {
243 if size > max_size {
244 return StreamingDecision::SkipCache;
245 }
246 }
247
248 StreamingDecision::Buffer
251}
252
253fn is_excluded_content_type(content_type: &str, excluded: &HashSet<String>) -> bool {
257 let normalized = content_type.to_lowercase();
258
259 for pattern in excluded {
260 if matches_pattern(&normalized, pattern) {
261 return true;
262 }
263 }
264
265 false
266}
267
268fn is_forced_content_type(content_type: &str, forced: &HashSet<String>) -> bool {
270 let normalized = content_type.to_lowercase();
271
272 for pattern in forced {
273 if matches_pattern(&normalized, pattern) {
274 return true;
275 }
276 }
277
278 false
279}
280
281fn matches_pattern(content_type: &str, pattern: &str) -> bool {
286 let pattern_lower = pattern.to_lowercase();
287
288 if pattern_lower.ends_with("/*") {
289 let prefix = &pattern_lower[..pattern_lower.len() - 2];
291 content_type.starts_with(prefix)
292 } else {
293 content_type == pattern_lower
295 || content_type.starts_with(&format!("{};", pattern_lower))
296 }
297}
298
299pub fn extract_size_info(
301 size_hint: &http_body::SizeHint,
302 content_length: Option<u64>,
303) -> Option<u64> {
304 size_hint
305 .exact()
306 .or_else(|| size_hint.upper())
307 .or(content_length)
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313 use http_body::SizeHint;
314
315 #[test]
316 fn test_default_policy_excludes_pdf() {
317 let policy = StreamingPolicy::default();
318 let size_hint = SizeHint::with_exact(5 * 1024 * 1024); let decision = should_stream(
321 &policy,
322 &size_hint,
323 Some("application/pdf"),
324 Some(5 * 1024 * 1024),
325 );
326
327 assert_eq!(decision, StreamingDecision::SkipCache);
328 }
329
330 #[test]
331 fn test_default_policy_excludes_video() {
332 let policy = StreamingPolicy::default();
333 let size_hint = SizeHint::with_exact(10 * 1024 * 1024);
334
335 let decision = should_stream(&policy, &size_hint, Some("video/mp4"), None);
336
337 assert_eq!(decision, StreamingDecision::SkipCache);
338 }
339
340 #[test]
341 fn test_small_json_gets_buffered() {
342 let policy = StreamingPolicy::default();
343 let size_hint = SizeHint::with_exact(1024); let decision = should_stream(&policy, &size_hint, Some("application/json"), Some(1024));
346
347 assert_eq!(decision, StreamingDecision::Buffer);
348 }
349
350 #[test]
351 fn test_large_json_skipped_by_size() {
352 let policy = StreamingPolicy::default();
353 let size_hint = SizeHint::with_exact(2 * 1024 * 1024); let decision = should_stream(
356 &policy,
357 &size_hint,
358 Some("application/json"),
359 Some(2 * 1024 * 1024),
360 );
361
362 assert_eq!(decision, StreamingDecision::SkipCache);
363 }
364
365 #[test]
366 fn test_force_cache_respects_size_limits() {
367 let mut policy = StreamingPolicy::default();
369 policy
370 .force_cache_content_types
371 .insert("application/important".to_string());
372
373 let size_hint = SizeHint::with_exact(5 * 1024 * 1024); let decision = should_stream(
376 &policy,
377 &size_hint,
378 Some("application/important"),
379 Some(5 * 1024 * 1024),
380 );
381
382 assert_eq!(decision, StreamingDecision::SkipCache);
384
385 let small_hint = SizeHint::with_exact(500 * 1024); let decision_small = should_stream(
388 &policy,
389 &small_hint,
390 Some("application/important"),
391 Some(500 * 1024),
392 );
393 assert_eq!(decision_small, StreamingDecision::Buffer);
394 }
395
396 #[test]
397 fn test_disabled_policy_always_buffers() {
398 let policy = StreamingPolicy::disabled();
399 let size_hint = SizeHint::with_exact(10 * 1024 * 1024);
400
401 let decision = should_stream(
402 &policy,
403 &size_hint,
404 Some("application/pdf"),
405 Some(10 * 1024 * 1024),
406 );
407
408 assert_eq!(decision, StreamingDecision::Buffer);
409 }
410
411 #[test]
412 fn test_wildcard_pattern_matching() {
413 assert!(matches_pattern("video/mp4", "video/*"));
414 assert!(matches_pattern("video/mpeg", "video/*"));
415 assert!(matches_pattern("audio/mp3", "audio/*"));
416 assert!(!matches_pattern("application/json", "video/*"));
417 }
418
419 #[test]
420 fn test_exact_pattern_matching() {
421 assert!(matches_pattern("application/pdf", "application/pdf"));
422 assert!(!matches_pattern("application/pdf", "pdf")); assert!(!matches_pattern("text/plain", "application/pdf"));
424 assert!(matches_pattern(
426 "application/json; charset=utf-8",
427 "application/json"
428 ));
429 }
430
431 #[test]
432 fn test_size_hint_exact() {
433 let policy = StreamingPolicy::default();
434 let size_hint = SizeHint::with_exact(500 * 1024); let decision = should_stream(&policy, &size_hint, None, None);
437 assert_eq!(decision, StreamingDecision::Buffer);
438 }
439
440 #[test]
441 fn test_size_hint_upper_bound() {
442 let policy = StreamingPolicy::default();
443 let mut size_hint = SizeHint::default();
444 size_hint.set_upper(500 * 1024);
445
446 let decision = should_stream(&policy, &size_hint, None, None);
447 assert_eq!(decision, StreamingDecision::Buffer);
448 }
449
450 #[test]
451 fn test_content_length_fallback() {
452 let policy = StreamingPolicy::default();
453 let size_hint = SizeHint::default(); let decision = should_stream(&policy, &size_hint, None, Some(500 * 1024));
456 assert_eq!(decision, StreamingDecision::Buffer);
457 }
458
459 #[test]
460 fn test_unknown_size_conservative() {
461 let policy = StreamingPolicy::default();
462 let size_hint = SizeHint::default();
463
464 let decision = should_stream(&policy, &size_hint, None, None);
465 assert_eq!(decision, StreamingDecision::StreamIfPossible);
466 }
467
468 #[test]
469 fn test_size_only_policy() {
470 let policy = StreamingPolicy::size_only(512 * 1024);
471 let size_hint = SizeHint::with_exact(1024 * 1024);
472
473 let decision = should_stream(&policy, &size_hint, Some("application/pdf"), None);
475 assert_eq!(decision, StreamingDecision::SkipCache); let size_hint_small = SizeHint::with_exact(256 * 1024);
478 let decision_small =
479 should_stream(&policy, &size_hint_small, Some("application/pdf"), None);
480 assert_eq!(decision_small, StreamingDecision::Buffer); }
482
483 #[test]
484 fn test_content_type_only_policy() {
485 let mut excluded = HashSet::new();
486 excluded.insert("application/pdf".to_string());
487 let policy = StreamingPolicy::content_type_only(excluded);
488
489 let size_hint = SizeHint::with_exact(10 * 1024 * 1024); let decision = should_stream(&policy, &size_hint, Some("application/json"), None);
493 assert_eq!(decision, StreamingDecision::Buffer);
494
495 let decision_pdf = should_stream(&policy, &size_hint, Some("application/pdf"), None);
497 assert_eq!(decision_pdf, StreamingDecision::SkipCache);
498 }
499
500 #[test]
501 fn test_extract_size_info() {
502 let size_hint = SizeHint::with_exact(1024);
503 assert_eq!(extract_size_info(&size_hint, None), Some(1024));
504
505 let mut size_hint_upper = SizeHint::default();
506 size_hint_upper.set_upper(2048);
507 assert_eq!(extract_size_info(&size_hint_upper, None), Some(2048));
508
509 let size_hint_none = SizeHint::default();
510 assert_eq!(extract_size_info(&size_hint_none, Some(4096)), Some(4096));
511
512 assert_eq!(extract_size_info(&size_hint_none, None), None);
513 }
514
515 #[test]
516 fn test_case_insensitive_content_type() {
517 let policy = StreamingPolicy::default();
518 let size_hint = SizeHint::with_exact(1024);
519
520 assert_eq!(
522 should_stream(&policy, &size_hint, Some("Application/PDF"), None),
523 StreamingDecision::SkipCache
524 );
525 assert_eq!(
526 should_stream(&policy, &size_hint, Some("APPLICATION/PDF"), None),
527 StreamingDecision::SkipCache
528 );
529 assert_eq!(
530 should_stream(&policy, &size_hint, Some("Video/MP4"), None),
531 StreamingDecision::SkipCache
532 );
533 }
534
535 #[test]
536 fn test_content_type_with_charset() {
537 let policy = StreamingPolicy::default();
538 let size_hint = SizeHint::with_exact(1024);
539
540 let decision = should_stream(
542 &policy,
543 &size_hint,
544 Some("application/json; charset=utf-8"),
545 None,
546 );
547 assert_eq!(decision, StreamingDecision::Buffer);
548 }
549}