1use std::collections::HashSet;
9
10use crate::range::RangeHandling;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum StreamingDecision {
15 Buffer,
17
18 SkipCache,
20
21 StreamThrough,
23
24 StreamIfPossible,
26}
27
28#[derive(Debug, Clone)]
53pub struct StreamingPolicy {
54 pub enabled: bool,
56
57 pub max_cacheable_size: Option<usize>,
60
61 pub excluded_content_types: HashSet<String>,
64
65 pub force_cache_content_types: HashSet<String>,
68
69 pub stream_threshold: usize,
72
73 pub range_handling: RangeHandling,
75
76 pub enable_chunk_cache: bool,
81
82 pub chunk_size: usize,
88
89 pub min_chunk_file_size: u64,
94}
95
96impl Default for StreamingPolicy {
97 fn default() -> Self {
98 Self {
99 enabled: true,
100 max_cacheable_size: Some(1024 * 1024), excluded_content_types: HashSet::from([
102 "application/pdf".to_string(),
103 "video/*".to_string(),
104 "audio/*".to_string(),
105 "application/zip".to_string(),
106 "application/x-rar".to_string(),
107 "application/x-tar".to_string(),
108 "application/gzip".to_string(),
109 "application/x-7z-compressed".to_string(),
110 "application/octet-stream".to_string(),
111 ]),
112 force_cache_content_types: HashSet::from([
113 "application/json".to_string(),
114 "application/xml".to_string(),
115 "text/*".to_string(),
116 ]),
117 stream_threshold: 512 * 1024, range_handling: RangeHandling::default(),
119 enable_chunk_cache: false, chunk_size: 1024 * 1024, min_chunk_file_size: 10 * 1024 * 1024, }
123 }
124}
125
126impl StreamingPolicy {
127 pub fn disabled() -> Self {
130 Self {
131 enabled: false,
132 max_cacheable_size: None,
133 excluded_content_types: HashSet::new(),
134 force_cache_content_types: HashSet::new(),
135 stream_threshold: usize::MAX,
136 range_handling: RangeHandling::PassThrough,
137 enable_chunk_cache: false,
138 chunk_size: 1024 * 1024,
139 min_chunk_file_size: 0,
140 }
141 }
142
143 pub fn size_only(max_size: usize) -> Self {
145 Self {
146 enabled: true,
147 max_cacheable_size: Some(max_size),
148 excluded_content_types: HashSet::new(),
149 force_cache_content_types: HashSet::new(),
150 stream_threshold: max_size,
151 range_handling: RangeHandling::PassThrough,
152 enable_chunk_cache: false,
153 chunk_size: 1024 * 1024,
154 min_chunk_file_size: 0,
155 }
156 }
157
158 pub fn content_type_only(excluded: HashSet<String>) -> Self {
160 Self {
161 enabled: true,
162 max_cacheable_size: None,
163 excluded_content_types: excluded,
164 force_cache_content_types: HashSet::new(),
165 stream_threshold: usize::MAX,
166 range_handling: RangeHandling::PassThrough,
167 enable_chunk_cache: false,
168 chunk_size: 1024 * 1024,
169 min_chunk_file_size: 0,
170 }
171 }
172}
173
174pub fn should_stream(
193 policy: &StreamingPolicy,
194 size_hint: &http_body::SizeHint,
195 content_type: Option<&str>,
196 content_length: Option<u64>,
197) -> StreamingDecision {
198 if !policy.enabled {
200 return StreamingDecision::Buffer;
201 }
202
203 let is_forced = if let Some(ct) = content_type {
205 if is_excluded_content_type(ct, &policy.excluded_content_types) {
207 return StreamingDecision::SkipCache;
208 }
209
210 is_forced_content_type(ct, &policy.force_cache_content_types)
212 } else {
213 false
214 };
215
216 if let Some(exact_size) = size_hint.exact() {
218 return decide_by_size(exact_size as usize, policy, is_forced);
219 }
220
221 if let Some(upper_bound) = size_hint.upper() {
222 return decide_by_size(upper_bound as usize, policy, is_forced);
223 }
224
225 if let Some(content_len) = content_length {
227 return decide_by_size(content_len as usize, policy, is_forced);
228 }
229
230 StreamingDecision::StreamIfPossible
233}
234
235fn decide_by_size(size: usize, policy: &StreamingPolicy, _is_forced: bool) -> StreamingDecision {
240 if let Some(max_size) = policy.max_cacheable_size {
241 if size > max_size {
242 return StreamingDecision::SkipCache;
243 }
244 }
245
246 StreamingDecision::Buffer
249}
250
251fn is_excluded_content_type(content_type: &str, excluded: &HashSet<String>) -> bool {
255 let normalized = content_type.to_lowercase();
256
257 for pattern in excluded {
258 if matches_pattern(&normalized, pattern) {
259 return true;
260 }
261 }
262
263 false
264}
265
266fn is_forced_content_type(content_type: &str, forced: &HashSet<String>) -> bool {
268 let normalized = content_type.to_lowercase();
269
270 for pattern in forced {
271 if matches_pattern(&normalized, pattern) {
272 return true;
273 }
274 }
275
276 false
277}
278
279fn matches_pattern(content_type: &str, pattern: &str) -> bool {
281 let pattern_lower = pattern.to_lowercase();
282
283 if pattern_lower.ends_with("/*") {
284 let prefix = &pattern_lower[..pattern_lower.len() - 2];
286 content_type.starts_with(prefix)
287 } else {
288 content_type.contains(&pattern_lower)
290 }
291}
292
293pub fn extract_size_info(
295 size_hint: &http_body::SizeHint,
296 content_length: Option<u64>,
297) -> Option<u64> {
298 size_hint
299 .exact()
300 .or_else(|| size_hint.upper())
301 .or(content_length)
302}
303
304#[cfg(test)]
305mod tests {
306 use super::*;
307 use http_body::SizeHint;
308
309 #[test]
310 fn test_default_policy_excludes_pdf() {
311 let policy = StreamingPolicy::default();
312 let size_hint = SizeHint::with_exact(5 * 1024 * 1024); let decision = should_stream(
315 &policy,
316 &size_hint,
317 Some("application/pdf"),
318 Some(5 * 1024 * 1024),
319 );
320
321 assert_eq!(decision, StreamingDecision::SkipCache);
322 }
323
324 #[test]
325 fn test_default_policy_excludes_video() {
326 let policy = StreamingPolicy::default();
327 let size_hint = SizeHint::with_exact(10 * 1024 * 1024);
328
329 let decision = should_stream(&policy, &size_hint, Some("video/mp4"), None);
330
331 assert_eq!(decision, StreamingDecision::SkipCache);
332 }
333
334 #[test]
335 fn test_small_json_gets_buffered() {
336 let policy = StreamingPolicy::default();
337 let size_hint = SizeHint::with_exact(1024); let decision = should_stream(&policy, &size_hint, Some("application/json"), Some(1024));
340
341 assert_eq!(decision, StreamingDecision::Buffer);
342 }
343
344 #[test]
345 fn test_large_json_skipped_by_size() {
346 let policy = StreamingPolicy::default();
347 let size_hint = SizeHint::with_exact(2 * 1024 * 1024); let decision = should_stream(
350 &policy,
351 &size_hint,
352 Some("application/json"),
353 Some(2 * 1024 * 1024),
354 );
355
356 assert_eq!(decision, StreamingDecision::SkipCache);
357 }
358
359 #[test]
360 fn test_force_cache_respects_size_limits() {
361 let mut policy = StreamingPolicy::default();
363 policy
364 .force_cache_content_types
365 .insert("application/important".to_string());
366
367 let size_hint = SizeHint::with_exact(5 * 1024 * 1024); let decision = should_stream(
370 &policy,
371 &size_hint,
372 Some("application/important"),
373 Some(5 * 1024 * 1024),
374 );
375
376 assert_eq!(decision, StreamingDecision::SkipCache);
378
379 let small_hint = SizeHint::with_exact(500 * 1024); let decision_small = should_stream(
382 &policy,
383 &small_hint,
384 Some("application/important"),
385 Some(500 * 1024),
386 );
387 assert_eq!(decision_small, StreamingDecision::Buffer);
388 }
389
390 #[test]
391 fn test_disabled_policy_always_buffers() {
392 let policy = StreamingPolicy::disabled();
393 let size_hint = SizeHint::with_exact(10 * 1024 * 1024);
394
395 let decision = should_stream(
396 &policy,
397 &size_hint,
398 Some("application/pdf"),
399 Some(10 * 1024 * 1024),
400 );
401
402 assert_eq!(decision, StreamingDecision::Buffer);
403 }
404
405 #[test]
406 fn test_wildcard_pattern_matching() {
407 assert!(matches_pattern("video/mp4", "video/*"));
408 assert!(matches_pattern("video/mpeg", "video/*"));
409 assert!(matches_pattern("audio/mp3", "audio/*"));
410 assert!(!matches_pattern("application/json", "video/*"));
411 }
412
413 #[test]
414 fn test_exact_pattern_matching() {
415 assert!(matches_pattern("application/pdf", "application/pdf"));
416 assert!(matches_pattern("application/pdf", "pdf")); assert!(!matches_pattern("text/plain", "application/pdf"));
418 }
419
420 #[test]
421 fn test_size_hint_exact() {
422 let policy = StreamingPolicy::default();
423 let size_hint = SizeHint::with_exact(500 * 1024); let decision = should_stream(&policy, &size_hint, None, None);
426 assert_eq!(decision, StreamingDecision::Buffer);
427 }
428
429 #[test]
430 fn test_size_hint_upper_bound() {
431 let policy = StreamingPolicy::default();
432 let mut size_hint = SizeHint::default();
433 size_hint.set_upper(500 * 1024);
434
435 let decision = should_stream(&policy, &size_hint, None, None);
436 assert_eq!(decision, StreamingDecision::Buffer);
437 }
438
439 #[test]
440 fn test_content_length_fallback() {
441 let policy = StreamingPolicy::default();
442 let size_hint = SizeHint::default(); let decision = should_stream(&policy, &size_hint, None, Some(500 * 1024));
445 assert_eq!(decision, StreamingDecision::Buffer);
446 }
447
448 #[test]
449 fn test_unknown_size_conservative() {
450 let policy = StreamingPolicy::default();
451 let size_hint = SizeHint::default();
452
453 let decision = should_stream(&policy, &size_hint, None, None);
454 assert_eq!(decision, StreamingDecision::StreamIfPossible);
455 }
456
457 #[test]
458 fn test_size_only_policy() {
459 let policy = StreamingPolicy::size_only(512 * 1024);
460 let size_hint = SizeHint::with_exact(1024 * 1024);
461
462 let decision = should_stream(&policy, &size_hint, Some("application/pdf"), None);
464 assert_eq!(decision, StreamingDecision::SkipCache); let size_hint_small = SizeHint::with_exact(256 * 1024);
467 let decision_small =
468 should_stream(&policy, &size_hint_small, Some("application/pdf"), None);
469 assert_eq!(decision_small, StreamingDecision::Buffer); }
471
472 #[test]
473 fn test_content_type_only_policy() {
474 let mut excluded = HashSet::new();
475 excluded.insert("application/pdf".to_string());
476 let policy = StreamingPolicy::content_type_only(excluded);
477
478 let size_hint = SizeHint::with_exact(10 * 1024 * 1024); let decision = should_stream(&policy, &size_hint, Some("application/json"), None);
482 assert_eq!(decision, StreamingDecision::Buffer);
483
484 let decision_pdf = should_stream(&policy, &size_hint, Some("application/pdf"), None);
486 assert_eq!(decision_pdf, StreamingDecision::SkipCache);
487 }
488
489 #[test]
490 fn test_extract_size_info() {
491 let size_hint = SizeHint::with_exact(1024);
492 assert_eq!(extract_size_info(&size_hint, None), Some(1024));
493
494 let mut size_hint_upper = SizeHint::default();
495 size_hint_upper.set_upper(2048);
496 assert_eq!(extract_size_info(&size_hint_upper, None), Some(2048));
497
498 let size_hint_none = SizeHint::default();
499 assert_eq!(extract_size_info(&size_hint_none, Some(4096)), Some(4096));
500
501 assert_eq!(extract_size_info(&size_hint_none, None), None);
502 }
503
504 #[test]
505 fn test_case_insensitive_content_type() {
506 let policy = StreamingPolicy::default();
507 let size_hint = SizeHint::with_exact(1024);
508
509 assert_eq!(
511 should_stream(&policy, &size_hint, Some("Application/PDF"), None),
512 StreamingDecision::SkipCache
513 );
514 assert_eq!(
515 should_stream(&policy, &size_hint, Some("APPLICATION/PDF"), None),
516 StreamingDecision::SkipCache
517 );
518 assert_eq!(
519 should_stream(&policy, &size_hint, Some("Video/MP4"), None),
520 StreamingDecision::SkipCache
521 );
522 }
523
524 #[test]
525 fn test_content_type_with_charset() {
526 let policy = StreamingPolicy::default();
527 let size_hint = SizeHint::with_exact(1024);
528
529 let decision = should_stream(
531 &policy,
532 &size_hint,
533 Some("application/json; charset=utf-8"),
534 None,
535 );
536 assert_eq!(decision, StreamingDecision::Buffer);
537 }
538}