1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
47use std::time::{Duration, Instant};
48
49pub const TINY_BUFFER: usize = 256;
55
56pub const SMALL_BUFFER: usize = 4096;
58
59pub const MEDIUM_BUFFER: usize = 16384;
61
62pub const LARGE_BUFFER: usize = 65536;
64
65pub const HUGE_BUFFER: usize = 262144;
67
68pub const DEFAULT_INITIAL_BUFFER: usize = SMALL_BUFFER;
70
71pub const MIN_BUFFER: usize = 256;
73
74pub const MAX_BUFFER: usize = 1024 * 1024; #[derive(Debug, Clone)]
83pub struct ReadBufferConfig {
84 pub initial_size: usize,
86 pub min_size: usize,
88 pub max_size: usize,
90 pub growth_factor: f32,
92 pub shrink_threshold: f32,
94 pub adaptive: bool,
96 pub recalculate_interval: usize,
98 pub content_type_hints: bool,
100}
101
102impl Default for ReadBufferConfig {
103 fn default() -> Self {
104 Self {
105 initial_size: DEFAULT_INITIAL_BUFFER,
106 min_size: MIN_BUFFER,
107 max_size: MAX_BUFFER,
108 growth_factor: 1.5,
109 shrink_threshold: 0.25,
110 adaptive: true,
111 recalculate_interval: 1000,
112 content_type_hints: true,
113 }
114 }
115}
116
117impl ReadBufferConfig {
118 pub fn high_throughput() -> Self {
120 Self {
121 initial_size: MEDIUM_BUFFER,
122 min_size: SMALL_BUFFER,
123 max_size: MAX_BUFFER,
124 growth_factor: 2.0,
125 shrink_threshold: 0.1,
126 adaptive: true,
127 recalculate_interval: 500,
128 content_type_hints: true,
129 }
130 }
131
132 pub fn low_memory() -> Self {
134 Self {
135 initial_size: TINY_BUFFER,
136 min_size: MIN_BUFFER,
137 max_size: LARGE_BUFFER,
138 growth_factor: 1.25,
139 shrink_threshold: 0.5,
140 adaptive: true,
141 recalculate_interval: 100,
142 content_type_hints: true,
143 }
144 }
145
146 pub fn api_server() -> Self {
148 Self {
149 initial_size: SMALL_BUFFER,
150 min_size: TINY_BUFFER,
151 max_size: MEDIUM_BUFFER,
152 growth_factor: 1.5,
153 shrink_threshold: 0.25,
154 adaptive: true,
155 recalculate_interval: 1000,
156 content_type_hints: true,
157 }
158 }
159
160 pub fn file_upload() -> Self {
162 Self {
163 initial_size: LARGE_BUFFER,
164 min_size: MEDIUM_BUFFER,
165 max_size: MAX_BUFFER,
166 growth_factor: 2.0,
167 shrink_threshold: 0.1,
168 adaptive: false, recalculate_interval: 0,
170 content_type_hints: false,
171 }
172 }
173
174 #[inline]
176 pub fn grow_size(&self, current: usize) -> usize {
177 let grown = (current as f32 * self.growth_factor) as usize;
178 grown.min(self.max_size).max(self.min_size)
179 }
180
181 #[inline]
183 pub fn shrink_size(&self, current: usize, used: usize) -> usize {
184 let usage_ratio = used as f32 / current as f32;
185 if usage_ratio < self.shrink_threshold && current > self.min_size {
186 let target = used.next_power_of_two();
188 target.max(self.min_size)
189 } else {
190 current
191 }
192 }
193
194 #[inline]
196 pub fn size_for_content_type(&self, content_type: &str) -> usize {
197 if !self.content_type_hints {
198 return self.initial_size;
199 }
200
201 if content_type.contains("application/json") {
202 SMALL_BUFFER } else if content_type.contains("text/html") {
204 MEDIUM_BUFFER } else if content_type.contains("multipart/form-data")
206 || content_type.contains("application/octet-stream")
207 {
208 LARGE_BUFFER } else if content_type.contains("text/plain") {
210 SMALL_BUFFER
211 } else if content_type.contains("application/x-www-form-urlencoded") {
212 TINY_BUFFER } else {
214 self.initial_size
215 }
216 }
217}
218
219#[derive(Debug)]
225pub struct PayloadTracker {
226 count: AtomicU64,
228 total_bytes: AtomicU64,
230 sum_squares: AtomicU64,
232 min_size: AtomicUsize,
234 max_size: AtomicUsize,
236 histogram: [AtomicU64; 24], last_recalc: AtomicU64, cached_size: AtomicUsize,
242}
243
244impl Default for PayloadTracker {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250impl PayloadTracker {
251 pub fn new() -> Self {
253 Self {
254 count: AtomicU64::new(0),
255 total_bytes: AtomicU64::new(0),
256 sum_squares: AtomicU64::new(0),
257 min_size: AtomicUsize::new(usize::MAX),
258 max_size: AtomicUsize::new(0),
259 histogram: std::array::from_fn(|_| AtomicU64::new(0)),
260 last_recalc: AtomicU64::new(0),
261 cached_size: AtomicUsize::new(DEFAULT_INITIAL_BUFFER),
262 }
263 }
264
265 #[inline]
267 pub fn record(&self, size: usize) {
268 self.count.fetch_add(1, Ordering::Relaxed);
269 self.total_bytes.fetch_add(size as u64, Ordering::Relaxed);
270
271 let sq = (size as u64).saturating_mul(size as u64);
273 self.sum_squares
274 .fetch_add(sq.min(u64::MAX / 2), Ordering::Relaxed);
275
276 self.min_size.fetch_min(size, Ordering::Relaxed);
278 self.max_size.fetch_max(size, Ordering::Relaxed);
279
280 let bucket = if size == 0 {
282 0
283 } else {
284 (64 - (size as u64).leading_zeros()).min(23) as usize
285 };
286 self.histogram[bucket].fetch_add(1, Ordering::Relaxed);
287 }
288
289 #[inline]
291 pub fn count(&self) -> u64 {
292 self.count.load(Ordering::Relaxed)
293 }
294
295 #[inline]
297 pub fn total_bytes(&self) -> u64 {
298 self.total_bytes.load(Ordering::Relaxed)
299 }
300
301 #[inline]
303 pub fn average_size(&self) -> f64 {
304 let count = self.count();
305 if count == 0 {
306 return DEFAULT_INITIAL_BUFFER as f64;
307 }
308 self.total_bytes() as f64 / count as f64
309 }
310
311 #[inline]
313 pub fn min_size(&self) -> usize {
314 let min = self.min_size.load(Ordering::Relaxed);
315 if min == usize::MAX { 0 } else { min }
316 }
317
318 #[inline]
320 pub fn max_size(&self) -> usize {
321 self.max_size.load(Ordering::Relaxed)
322 }
323
324 pub fn std_deviation(&self) -> f64 {
326 let count = self.count();
327 if count < 2 {
328 return 0.0;
329 }
330
331 let mean = self.average_size();
332 let sum_sq = self.sum_squares.load(Ordering::Relaxed) as f64;
333 let variance = (sum_sq / count as f64) - (mean * mean);
334 variance.max(0.0).sqrt()
335 }
336
337 pub fn percentile(&self, p: f64) -> usize {
339 let count = self.count();
340 if count == 0 {
341 return DEFAULT_INITIAL_BUFFER;
342 }
343
344 let target = (count as f64 * p / 100.0) as u64;
345 let mut cumulative = 0u64;
346
347 for (bucket, counter) in self.histogram.iter().enumerate() {
348 cumulative += counter.load(Ordering::Relaxed);
349 if cumulative >= target {
350 return 1usize << bucket;
351 }
352 }
353
354 self.max_size()
355 }
356
357 pub fn recommended_buffer_size(&self) -> usize {
362 let count = self.count();
363 if count < 10 {
364 return DEFAULT_INITIAL_BUFFER;
366 }
367
368 let p90 = self.percentile(90.0);
370
371 if p90 <= TINY_BUFFER {
373 TINY_BUFFER
374 } else if p90 <= SMALL_BUFFER {
375 SMALL_BUFFER
376 } else if p90 <= MEDIUM_BUFFER {
377 MEDIUM_BUFFER
378 } else if p90 <= LARGE_BUFFER {
379 LARGE_BUFFER
380 } else if p90 <= HUGE_BUFFER {
381 HUGE_BUFFER
382 } else {
383 MAX_BUFFER
385 }
386 }
387
388 pub fn update_cached_size(&self) {
390 let size = self.recommended_buffer_size();
391 self.cached_size.store(size, Ordering::Relaxed);
392 self.last_recalc.store(
393 std::time::SystemTime::now()
394 .duration_since(std::time::UNIX_EPOCH)
395 .unwrap_or_default()
396 .as_millis() as u64,
397 Ordering::Relaxed,
398 );
399 }
400
401 #[inline]
403 pub fn cached_recommended_size(&self) -> usize {
404 self.cached_size.load(Ordering::Relaxed)
405 }
406
407 pub fn needs_recalc(&self, interval: Duration) -> bool {
409 let now = std::time::SystemTime::now()
410 .duration_since(std::time::UNIX_EPOCH)
411 .unwrap_or_default()
412 .as_millis() as u64;
413 let last = self.last_recalc.load(Ordering::Relaxed);
414 now.saturating_sub(last) > interval.as_millis() as u64
415 }
416
417 pub fn histogram(&self) -> Vec<(usize, u64)> {
419 self.histogram
420 .iter()
421 .enumerate()
422 .map(|(i, c)| (1usize << i, c.load(Ordering::Relaxed)))
423 .filter(|(_, c)| *c > 0)
424 .collect()
425 }
426
427 pub fn reset(&self) {
429 self.count.store(0, Ordering::Relaxed);
430 self.total_bytes.store(0, Ordering::Relaxed);
431 self.sum_squares.store(0, Ordering::Relaxed);
432 self.min_size.store(usize::MAX, Ordering::Relaxed);
433 self.max_size.store(0, Ordering::Relaxed);
434 for counter in &self.histogram {
435 counter.store(0, Ordering::Relaxed);
436 }
437 self.cached_size
438 .store(DEFAULT_INITIAL_BUFFER, Ordering::Relaxed);
439 }
440}
441
442#[derive(Debug, Clone, Copy, PartialEq, Eq)]
448pub enum ContentCategory {
449 Api,
451 Html,
453 Form,
455 Multipart,
457 Binary,
459 Streaming,
461 Unknown,
463}
464
465impl ContentCategory {
466 pub fn from_content_type(content_type: &str) -> Self {
468 let ct = content_type.to_lowercase();
469
470 if ct.contains("application/json")
471 || ct.contains("application/xml")
472 || ct.contains("text/xml")
473 {
474 Self::Api
475 } else if ct.contains("text/html") {
476 Self::Html
477 } else if ct.contains("application/x-www-form-urlencoded") {
478 Self::Form
479 } else if ct.contains("multipart/form-data") {
480 Self::Multipart
481 } else if ct.contains("application/octet-stream")
482 || ct.contains("image/")
483 || ct.contains("video/")
484 || ct.contains("audio/")
485 {
486 Self::Binary
487 } else if ct.contains("text/event-stream") || ct.contains("application/grpc") {
488 Self::Streaming
489 } else {
490 Self::Unknown
491 }
492 }
493
494 #[inline]
496 pub const fn recommended_buffer_size(self) -> usize {
497 match self {
498 Self::Api => SMALL_BUFFER, Self::Html => MEDIUM_BUFFER, Self::Form => TINY_BUFFER, Self::Multipart => LARGE_BUFFER, Self::Binary => LARGE_BUFFER, Self::Streaming => MEDIUM_BUFFER, Self::Unknown => SMALL_BUFFER, }
506 }
507
508 #[inline]
510 pub const fn streaming_threshold(self) -> usize {
511 match self {
512 Self::Api => LARGE_BUFFER, Self::Html => HUGE_BUFFER, Self::Form => MEDIUM_BUFFER, Self::Multipart => MAX_BUFFER, Self::Binary => MAX_BUFFER, Self::Streaming => MEDIUM_BUFFER, Self::Unknown => LARGE_BUFFER, }
520 }
521}
522
523#[derive(Debug)]
529pub struct AdaptiveBufferSizer {
530 config: ReadBufferConfig,
532 request_tracker: PayloadTracker,
534 response_tracker: PayloadTracker,
536 api_tracker: PayloadTracker,
538 html_tracker: PayloadTracker,
539 multipart_tracker: PayloadTracker,
540 requests_since_recalc: AtomicUsize,
542 created: Instant,
544}
545
546impl Default for AdaptiveBufferSizer {
547 fn default() -> Self {
548 Self::new(ReadBufferConfig::default())
549 }
550}
551
552impl AdaptiveBufferSizer {
553 pub fn new(config: ReadBufferConfig) -> Self {
555 Self {
556 config,
557 request_tracker: PayloadTracker::new(),
558 response_tracker: PayloadTracker::new(),
559 api_tracker: PayloadTracker::new(),
560 html_tracker: PayloadTracker::new(),
561 multipart_tracker: PayloadTracker::new(),
562 requests_since_recalc: AtomicUsize::new(0),
563 created: Instant::now(),
564 }
565 }
566
567 #[inline]
569 pub fn record_request(&self, size: usize, content_type: Option<&str>) {
570 self.request_tracker.record(size);
571
572 if let Some(ct) = content_type {
573 match ContentCategory::from_content_type(ct) {
574 ContentCategory::Api => self.api_tracker.record(size),
575 ContentCategory::Html => self.html_tracker.record(size),
576 ContentCategory::Multipart => self.multipart_tracker.record(size),
577 _ => {}
578 }
579 }
580
581 self.maybe_recalculate();
582 }
583
584 #[inline]
586 pub fn record_response(&self, size: usize, content_type: Option<&str>) {
587 self.response_tracker.record(size);
588
589 if let Some(ct) = content_type {
590 match ContentCategory::from_content_type(ct) {
591 ContentCategory::Api => self.api_tracker.record(size),
592 ContentCategory::Html => self.html_tracker.record(size),
593 _ => {}
594 }
595 }
596 }
597
598 fn maybe_recalculate(&self) {
600 if !self.config.adaptive {
601 return;
602 }
603
604 let count = self.requests_since_recalc.fetch_add(1, Ordering::Relaxed);
605 if count >= self.config.recalculate_interval {
606 self.requests_since_recalc.store(0, Ordering::Relaxed);
607 self.request_tracker.update_cached_size();
608 self.response_tracker.update_cached_size();
609 self.api_tracker.update_cached_size();
610 self.html_tracker.update_cached_size();
611 self.multipart_tracker.update_cached_size();
612 }
613 }
614
615 #[inline]
617 pub fn request_buffer_size(&self) -> usize {
618 if self.config.adaptive {
619 self.request_tracker.cached_recommended_size()
620 } else {
621 self.config.initial_size
622 }
623 }
624
625 #[inline]
627 pub fn buffer_size_for_content_type(&self, content_type: &str) -> usize {
628 if !self.config.adaptive {
629 return self.config.size_for_content_type(content_type);
630 }
631
632 match ContentCategory::from_content_type(content_type) {
633 ContentCategory::Api => self.api_tracker.cached_recommended_size(),
634 ContentCategory::Html => self.html_tracker.cached_recommended_size(),
635 ContentCategory::Multipart => self.multipart_tracker.cached_recommended_size(),
636 category => category.recommended_buffer_size(),
637 }
638 }
639
640 #[inline]
642 pub fn config(&self) -> &ReadBufferConfig {
643 &self.config
644 }
645
646 #[inline]
648 pub fn request_stats(&self) -> &PayloadTracker {
649 &self.request_tracker
650 }
651
652 #[inline]
654 pub fn response_stats(&self) -> &PayloadTracker {
655 &self.response_tracker
656 }
657
658 #[inline]
660 pub fn uptime(&self) -> Duration {
661 self.created.elapsed()
662 }
663
664 pub fn reset(&self) {
666 self.request_tracker.reset();
667 self.response_tracker.reset();
668 self.api_tracker.reset();
669 self.html_tracker.reset();
670 self.multipart_tracker.reset();
671 self.requests_since_recalc.store(0, Ordering::Relaxed);
672 }
673}
674
675#[derive(Debug, Default)]
681pub struct BufferSizingStats {
682 allocations: AtomicU64,
684 reallocations: AtomicU64,
686 bytes_wasted: AtomicU64,
688 perfect_fits: AtomicU64,
690}
691
692impl BufferSizingStats {
693 pub fn new() -> Self {
695 Self::default()
696 }
697
698 #[inline]
700 pub fn record_allocation(&self, allocated: usize, used: usize) {
701 self.allocations.fetch_add(1, Ordering::Relaxed);
702
703 if used == allocated || (used as f64 / allocated as f64) > 0.75 {
704 self.perfect_fits.fetch_add(1, Ordering::Relaxed);
705 } else {
706 let wasted = allocated.saturating_sub(used);
707 self.bytes_wasted
708 .fetch_add(wasted as u64, Ordering::Relaxed);
709 }
710 }
711
712 #[inline]
714 pub fn record_reallocation(&self) {
715 self.reallocations.fetch_add(1, Ordering::Relaxed);
716 }
717
718 pub fn allocations(&self) -> u64 {
720 self.allocations.load(Ordering::Relaxed)
721 }
722
723 pub fn reallocations(&self) -> u64 {
725 self.reallocations.load(Ordering::Relaxed)
726 }
727
728 pub fn bytes_wasted(&self) -> u64 {
730 self.bytes_wasted.load(Ordering::Relaxed)
731 }
732
733 pub fn perfect_fits(&self) -> u64 {
735 self.perfect_fits.load(Ordering::Relaxed)
736 }
737
738 pub fn perfect_fit_ratio(&self) -> f64 {
740 let total = self.allocations();
741 let perfect = self.perfect_fits();
742 if total > 0 {
743 (perfect as f64 / total as f64) * 100.0
744 } else {
745 0.0
746 }
747 }
748
749 pub fn reallocation_ratio(&self) -> f64 {
751 let total = self.allocations();
752 let reallocs = self.reallocations();
753 if total > 0 {
754 (reallocs as f64 / total as f64) * 100.0
755 } else {
756 0.0
757 }
758 }
759}
760
761static BUFFER_SIZING_STATS: BufferSizingStats = BufferSizingStats {
763 allocations: AtomicU64::new(0),
764 reallocations: AtomicU64::new(0),
765 bytes_wasted: AtomicU64::new(0),
766 perfect_fits: AtomicU64::new(0),
767};
768
769pub fn buffer_sizing_stats() -> &'static BufferSizingStats {
771 &BUFFER_SIZING_STATS
772}
773
774#[cfg(test)]
779mod tests {
780 use super::*;
781
782 #[test]
783 fn test_read_buffer_config_default() {
784 let config = ReadBufferConfig::default();
785 assert_eq!(config.initial_size, SMALL_BUFFER);
786 assert!(config.adaptive);
787 }
788
789 #[test]
790 fn test_read_buffer_config_presets() {
791 let high = ReadBufferConfig::high_throughput();
792 assert_eq!(high.initial_size, MEDIUM_BUFFER);
793
794 let low = ReadBufferConfig::low_memory();
795 assert_eq!(low.initial_size, TINY_BUFFER);
796
797 let api = ReadBufferConfig::api_server();
798 assert_eq!(api.initial_size, SMALL_BUFFER);
799
800 let upload = ReadBufferConfig::file_upload();
801 assert_eq!(upload.initial_size, LARGE_BUFFER);
802 assert!(!upload.adaptive);
803 }
804
805 #[test]
806 fn test_grow_size() {
807 let config = ReadBufferConfig::default();
808 assert_eq!(config.grow_size(4096), 6144); assert_eq!(config.grow_size(MAX_BUFFER), MAX_BUFFER); }
811
812 #[test]
813 fn test_shrink_size() {
814 let config = ReadBufferConfig::default();
815 let shrunk = config.shrink_size(16384, 1000);
817 assert!(shrunk < 16384);
818 assert!(shrunk >= MIN_BUFFER);
819
820 let not_shrunk = config.shrink_size(16384, 10000);
822 assert_eq!(not_shrunk, 16384);
823 }
824
825 #[test]
826 fn test_size_for_content_type() {
827 let config = ReadBufferConfig::default();
828 assert_eq!(
829 config.size_for_content_type("application/json"),
830 SMALL_BUFFER
831 );
832 assert_eq!(
833 config.size_for_content_type("multipart/form-data"),
834 LARGE_BUFFER
835 );
836 assert_eq!(config.size_for_content_type("text/html"), MEDIUM_BUFFER);
837 }
838
839 #[test]
840 fn test_payload_tracker_basic() {
841 let tracker = PayloadTracker::new();
842
843 tracker.record(100);
844 tracker.record(200);
845 tracker.record(300);
846
847 assert_eq!(tracker.count(), 3);
848 assert_eq!(tracker.total_bytes(), 600);
849 assert_eq!(tracker.min_size(), 100);
850 assert_eq!(tracker.max_size(), 300);
851 assert!((tracker.average_size() - 200.0).abs() < 0.1);
852 }
853
854 #[test]
855 fn test_payload_tracker_percentile() {
856 let tracker = PayloadTracker::new();
857
858 for _ in 0..100 {
860 tracker.record(100); }
862 for _ in 0..100 {
863 tracker.record(1000); }
865 for _ in 0..10 {
866 tracker.record(100000); }
868
869 let p50 = tracker.percentile(50.0);
870 let p90 = tracker.percentile(90.0);
871 let p99 = tracker.percentile(99.0);
872
873 assert!(p50 <= p90);
874 assert!(p90 <= p99);
875 }
876
877 #[test]
878 fn test_payload_tracker_recommended_size() {
879 let tracker = PayloadTracker::new();
880
881 for i in 0..1000 {
883 let size = 500 + (i * 17 % 1000);
885 tracker.record(size);
886 }
887
888 let recommended = tracker.recommended_buffer_size();
889 assert!((TINY_BUFFER..=LARGE_BUFFER).contains(&recommended));
891 }
892
893 #[test]
894 fn test_content_category_detection() {
895 assert_eq!(
896 ContentCategory::from_content_type("application/json"),
897 ContentCategory::Api
898 );
899 assert_eq!(
900 ContentCategory::from_content_type("text/html; charset=utf-8"),
901 ContentCategory::Html
902 );
903 assert_eq!(
904 ContentCategory::from_content_type("multipart/form-data; boundary=---"),
905 ContentCategory::Multipart
906 );
907 assert_eq!(
908 ContentCategory::from_content_type("image/png"),
909 ContentCategory::Binary
910 );
911 }
912
913 #[test]
914 fn test_content_category_buffer_sizes() {
915 assert_eq!(ContentCategory::Api.recommended_buffer_size(), SMALL_BUFFER);
916 assert_eq!(
917 ContentCategory::Html.recommended_buffer_size(),
918 MEDIUM_BUFFER
919 );
920 assert_eq!(
921 ContentCategory::Multipart.recommended_buffer_size(),
922 LARGE_BUFFER
923 );
924 }
925
926 #[test]
927 fn test_adaptive_buffer_sizer() {
928 let sizer = AdaptiveBufferSizer::new(ReadBufferConfig::default());
929
930 sizer.record_request(1024, Some("application/json"));
932 sizer.record_request(2048, Some("application/json"));
933 sizer.record_request(512, Some("text/html"));
934
935 let request_size = sizer.request_buffer_size();
936 assert!(request_size >= MIN_BUFFER);
937 }
938
939 #[test]
940 fn test_buffer_sizing_stats() {
941 let stats = buffer_sizing_stats();
942 stats.record_allocation(4096, 3000);
943 stats.record_allocation(4096, 4000); assert!(stats.allocations() >= 2);
946 }
947
948 #[test]
949 fn test_histogram() {
950 let tracker = PayloadTracker::new();
951
952 tracker.record(100); tracker.record(1000); tracker.record(10000); let hist = tracker.histogram();
957 assert!(!hist.is_empty());
958 }
959}