1use bytes::{Bytes, BytesMut};
55use std::io::IoSlice;
56use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
57use std::time::{Duration, Instant};
58
/// Default initial capacity (bytes) pre-allocated for a coalescing buffer.
pub const DEFAULT_COALESCE_CAPACITY: usize = 4096;

/// Default buffered size (bytes) at which a flush is recommended.
pub const DEFAULT_FLUSH_THRESHOLD: usize = 16384;

/// Minimum coalesce size.
/// NOTE(review): not referenced anywhere in this file — presumably a lower
/// bound consumed by callers elsewhere; confirm before removing.
pub const MIN_COALESCE_SIZE: usize = 512;

/// Hard cap (bytes) on a coalescing buffer; reaching it forces a flush.
pub const MAX_COALESCE_BUFFER: usize = 1024 * 1024;

/// Default maximum age (microseconds) of buffered data before a
/// time-based flush is signalled.
pub const DEFAULT_FLUSH_TIMEOUT_US: u64 = 100;
77
/// Tuning parameters for write coalescing.
#[derive(Debug, Clone)]
pub struct CoalesceConfig {
    /// Initial capacity (bytes) of the backing buffer.
    pub initial_capacity: usize,
    /// Buffered size (bytes) at which `should_flush` reports true.
    pub flush_threshold: usize,
    /// Hard cap (bytes); reaching it makes `must_flush` report true.
    pub max_buffer_size: usize,
    /// Writes of at least this many bytes skip the buffer entirely
    /// and are returned as `WriteResult::Bypass`.
    pub bypass_threshold: usize,
    /// Maximum age (microseconds) of buffered data before a time-based
    /// flush is signalled; 0 disables the timeout check.
    pub flush_timeout_us: u64,
    /// Whether TCP_CORK-style batching is desired.
    /// NOTE(review): not consulted anywhere in this file — presumably
    /// read by the transport layer; confirm against callers.
    pub use_tcp_cork: bool,
    /// Whether to record into the process-wide coalescing statistics.
    pub collect_stats: bool,
}
100
101impl Default for CoalesceConfig {
102 fn default() -> Self {
103 Self {
104 initial_capacity: DEFAULT_COALESCE_CAPACITY,
105 flush_threshold: DEFAULT_FLUSH_THRESHOLD,
106 max_buffer_size: MAX_COALESCE_BUFFER,
107 bypass_threshold: 65536, flush_timeout_us: DEFAULT_FLUSH_TIMEOUT_US,
109 use_tcp_cork: true,
110 collect_stats: true,
111 }
112 }
113}
114
115impl CoalesceConfig {
116 pub fn high_throughput() -> Self {
118 Self {
119 initial_capacity: 8192,
120 flush_threshold: 32768,
121 max_buffer_size: MAX_COALESCE_BUFFER,
122 bypass_threshold: 131072, flush_timeout_us: 500, use_tcp_cork: true,
125 collect_stats: false,
126 }
127 }
128
129 pub fn low_latency() -> Self {
131 Self {
132 initial_capacity: 2048,
133 flush_threshold: 4096,
134 max_buffer_size: 65536,
135 bypass_threshold: 16384, flush_timeout_us: 10, use_tcp_cork: false,
138 collect_stats: false,
139 }
140 }
141
142 pub fn memory_efficient() -> Self {
144 Self {
145 initial_capacity: 1024,
146 flush_threshold: 8192,
147 max_buffer_size: 65536,
148 bypass_threshold: 32768,
149 flush_timeout_us: 200,
150 use_tcp_cork: true,
151 collect_stats: true,
152 }
153 }
154}
155
/// Accumulates many small writes into one buffer so they can be flushed
/// with a single larger write.
#[derive(Debug)]
pub struct WriteCoalescer {
    /// Backing buffer holding the coalesced bytes.
    buffer: BytesMut,
    /// Thresholds and flags governing flush/bypass decisions.
    config: CoalesceConfig,
    /// Writes absorbed since the last `take`/`take_mut`/`clear`.
    writes_coalesced: usize,
    /// Time of the first write in the current batch; `None` when empty.
    /// Drives the `flush_timeout_us` check in `should_flush`.
    first_write_time: Option<Instant>,
    /// Cumulative bytes buffered since construction or `reset`
    /// (not cleared by `take`).
    total_bytes: usize,
}
174
175impl WriteCoalescer {
176 pub fn new(config: CoalesceConfig) -> Self {
178 Self {
179 buffer: BytesMut::with_capacity(config.initial_capacity),
180 config,
181 writes_coalesced: 0,
182 first_write_time: None,
183 total_bytes: 0,
184 }
185 }
186
187 pub fn default_config() -> Self {
189 Self::new(CoalesceConfig::default())
190 }
191
192 #[inline]
196 pub fn write(&mut self, data: &[u8]) -> WriteResult {
197 if data.is_empty() {
198 return WriteResult::Buffered;
199 }
200
201 if data.len() >= self.config.bypass_threshold {
203 COALESCE_STATS.record_bypass(data.len());
204 return WriteResult::Bypass(Bytes::copy_from_slice(data));
205 }
206
207 if self.first_write_time.is_none() {
209 self.first_write_time = Some(Instant::now());
210 }
211
212 self.buffer.extend_from_slice(data);
214 self.writes_coalesced += 1;
215 self.total_bytes += data.len();
216
217 if self.config.collect_stats {
218 COALESCE_STATS.record_coalesce(data.len());
219 }
220
221 if self.should_flush() {
223 WriteResult::ShouldFlush
224 } else {
225 WriteResult::Buffered
226 }
227 }
228
229 #[inline]
231 pub fn write_bytes(&mut self, data: Bytes) -> WriteResult {
232 if data.is_empty() {
233 return WriteResult::Buffered;
234 }
235
236 if data.len() >= self.config.bypass_threshold {
238 COALESCE_STATS.record_bypass(data.len());
239 return WriteResult::Bypass(data);
240 }
241
242 if self.first_write_time.is_none() {
244 self.first_write_time = Some(Instant::now());
245 }
246
247 self.buffer.extend_from_slice(&data);
249 self.writes_coalesced += 1;
250 self.total_bytes += data.len();
251
252 if self.config.collect_stats {
253 COALESCE_STATS.record_coalesce(data.len());
254 }
255
256 if self.should_flush() {
258 WriteResult::ShouldFlush
259 } else {
260 WriteResult::Buffered
261 }
262 }
263
264 #[inline]
266 pub fn should_flush(&self) -> bool {
267 if self.buffer.len() >= self.config.flush_threshold {
269 return true;
270 }
271
272 if self.buffer.len() >= self.config.max_buffer_size {
274 return true;
275 }
276
277 if self.config.flush_timeout_us > 0 {
279 if let Some(first_time) = self.first_write_time {
280 let elapsed_us = first_time.elapsed().as_micros() as u64;
281 if elapsed_us >= self.config.flush_timeout_us {
282 return true;
283 }
284 }
285 }
286
287 false
288 }
289
290 #[inline]
292 pub fn must_flush(&self) -> bool {
293 self.buffer.len() >= self.config.max_buffer_size
294 }
295
296 #[inline]
298 pub fn len(&self) -> usize {
299 self.buffer.len()
300 }
301
302 #[inline]
304 pub fn is_empty(&self) -> bool {
305 self.buffer.is_empty()
306 }
307
308 #[inline]
310 pub fn writes_coalesced(&self) -> usize {
311 self.writes_coalesced
312 }
313
314 #[inline]
316 pub fn total_bytes(&self) -> usize {
317 self.total_bytes
318 }
319
320 #[inline]
322 pub fn remaining_capacity(&self) -> usize {
323 self.config
324 .flush_threshold
325 .saturating_sub(self.buffer.len())
326 }
327
328 #[inline]
330 pub fn take(&mut self) -> Bytes {
331 let writes = self.writes_coalesced;
332 let bytes = self.buffer.len();
333
334 let data = self.buffer.split().freeze();
335
336 self.writes_coalesced = 0;
338 self.first_write_time = None;
339
340 if self.config.collect_stats && bytes > 0 {
341 COALESCE_STATS.record_flush(writes, bytes);
342 }
343
344 data
345 }
346
347 #[inline]
349 pub fn take_mut(&mut self) -> BytesMut {
350 let writes = self.writes_coalesced;
351 let bytes = self.buffer.len();
352
353 let data = self.buffer.split();
354
355 self.writes_coalesced = 0;
357 self.first_write_time = None;
358
359 if self.config.collect_stats && bytes > 0 {
360 COALESCE_STATS.record_flush(writes, bytes);
361 }
362
363 data
364 }
365
366 #[inline]
368 pub fn peek(&self) -> &[u8] {
369 &self.buffer
370 }
371
372 #[inline]
374 pub fn clear(&mut self) {
375 self.buffer.clear();
376 self.writes_coalesced = 0;
377 self.first_write_time = None;
378 }
379
380 pub fn reset(&mut self) {
382 self.buffer.clear();
383 self.writes_coalesced = 0;
384 self.first_write_time = None;
385 self.total_bytes = 0;
386 }
387
388 #[inline]
390 pub fn config(&self) -> &CoalesceConfig {
391 &self.config
392 }
393
394 #[inline]
396 pub fn reserve(&mut self, additional: usize) {
397 self.buffer.reserve(additional);
398 }
399
400 pub fn time_since_first_write(&self) -> Option<Duration> {
402 self.first_write_time.map(|t| t.elapsed())
403 }
404}
405
/// Outcome of offering a write to a coalescer.
#[derive(Debug)]
pub enum WriteResult {
    /// The write was absorbed into the buffer; nothing to do yet.
    Buffered,
    /// The write was absorbed and the buffer now wants flushing.
    ShouldFlush,
    /// The write was too large to buffer; send the payload directly.
    Bypass(Bytes),
}
416
417impl WriteResult {
418 #[inline]
420 pub fn is_buffered(&self) -> bool {
421 matches!(self, Self::Buffered)
422 }
423
424 #[inline]
426 pub fn should_flush(&self) -> bool {
427 matches!(self, Self::ShouldFlush)
428 }
429
430 #[inline]
432 pub fn is_bypass(&self) -> bool {
433 matches!(self, Self::Bypass(_))
434 }
435
436 #[inline]
438 pub fn take_bypass(self) -> Option<Bytes> {
439 match self {
440 Self::Bypass(data) => Some(data),
441 _ => None,
442 }
443 }
444}
445
/// Coalesces an HTTP-style response in three ordered sections
/// (headers, body, trailers), each with its own buffer, so they can be
/// flushed together or exposed as vectored-I/O slices.
#[derive(Debug)]
pub struct MultiBufferCoalescer {
    /// Coalescer for header bytes (small, tight thresholds).
    headers: WriteCoalescer,
    /// Coalescer for body bytes (uses the caller-supplied config).
    body: WriteCoalescer,
    /// Coalescer for trailer bytes (smallest thresholds).
    trailers: WriteCoalescer,
}
460
461impl MultiBufferCoalescer {
462 pub fn new(config: CoalesceConfig) -> Self {
464 Self {
465 headers: WriteCoalescer::new(CoalesceConfig {
466 initial_capacity: 1024,
467 flush_threshold: 4096,
468 max_buffer_size: 16384,
469 bypass_threshold: 8192,
470 ..config.clone()
471 }),
472 body: WriteCoalescer::new(config.clone()),
473 trailers: WriteCoalescer::new(CoalesceConfig {
474 initial_capacity: 256,
475 flush_threshold: 1024,
476 max_buffer_size: 4096,
477 bypass_threshold: 2048,
478 ..config
479 }),
480 }
481 }
482
483 #[inline]
485 pub fn write_header(&mut self, data: &[u8]) -> WriteResult {
486 self.headers.write(data)
487 }
488
489 #[inline]
491 pub fn write_header_line(&mut self, name: &str, value: &str) {
492 self.headers.buffer.extend_from_slice(name.as_bytes());
493 self.headers.buffer.extend_from_slice(b": ");
494 self.headers.buffer.extend_from_slice(value.as_bytes());
495 self.headers.buffer.extend_from_slice(b"\r\n");
496 self.headers.writes_coalesced += 1;
497 }
498
499 #[inline]
501 pub fn write_body(&mut self, data: &[u8]) -> WriteResult {
502 self.body.write(data)
503 }
504
505 #[inline]
507 pub fn write_trailer(&mut self, data: &[u8]) -> WriteResult {
508 self.trailers.write(data)
509 }
510
511 #[inline]
513 pub fn should_flush(&self) -> bool {
514 self.headers.should_flush() || self.body.should_flush() || self.trailers.should_flush()
515 }
516
517 #[inline]
519 pub fn total_len(&self) -> usize {
520 self.headers.len() + self.body.len() + self.trailers.len()
521 }
522
523 pub fn take_combined(&mut self) -> Bytes {
525 let total = self.total_len();
526 if total == 0 {
527 return Bytes::new();
528 }
529
530 let mut combined = BytesMut::with_capacity(total);
531 combined.extend_from_slice(self.headers.peek());
532 combined.extend_from_slice(self.body.peek());
533 combined.extend_from_slice(self.trailers.peek());
534
535 self.headers.clear();
536 self.body.clear();
537 self.trailers.clear();
538
539 combined.freeze()
540 }
541
542 pub fn as_io_slices(&self) -> Vec<IoSlice<'_>> {
544 let mut slices = Vec::with_capacity(3);
545 if !self.headers.is_empty() {
546 slices.push(IoSlice::new(self.headers.peek()));
547 }
548 if !self.body.is_empty() {
549 slices.push(IoSlice::new(self.body.peek()));
550 }
551 if !self.trailers.is_empty() {
552 slices.push(IoSlice::new(self.trailers.peek()));
553 }
554 slices
555 }
556
557 pub fn reset(&mut self) {
559 self.headers.reset();
560 self.body.reset();
561 self.trailers.reset();
562 }
563}
564
/// Per-connection write staging: small writes are coalesced, large
/// writes are queued separately as zero-copy `Bytes` for direct send.
#[derive(Debug)]
pub struct ConnectionWriteBuffer {
    /// Coalescer for small writes.
    coalescer: WriteCoalescer,
    /// Large (bypassed) payloads awaiting flush, in arrival order.
    pending_large: Vec<Bytes>,
    // Kept for diagnostics; not read anywhere in this file.
    #[allow(dead_code)]
    connection_id: u64,
    /// Number of `take_all` calls performed on this buffer.
    flushes: usize,
}
582
583impl ConnectionWriteBuffer {
584 pub fn new(connection_id: u64, config: CoalesceConfig) -> Self {
586 Self {
587 coalescer: WriteCoalescer::new(config),
588 pending_large: Vec::new(),
589 connection_id,
590 flushes: 0,
591 }
592 }
593
594 #[inline]
596 pub fn write(&mut self, data: &[u8]) {
597 if let WriteResult::Bypass(bytes) = self.coalescer.write(data) {
598 self.pending_large.push(bytes);
599 }
600 }
601
602 #[inline]
604 pub fn write_bytes(&mut self, data: Bytes) {
605 if let WriteResult::Bypass(bytes) = self.coalescer.write_bytes(data) {
606 self.pending_large.push(bytes);
607 }
608 }
609
610 #[inline]
612 pub fn should_flush(&self) -> bool {
613 !self.pending_large.is_empty() || self.coalescer.should_flush()
614 }
615
616 pub fn take_all(&mut self) -> Vec<Bytes> {
618 let mut result = Vec::with_capacity(1 + self.pending_large.len());
619
620 if !self.coalescer.is_empty() {
622 result.push(self.coalescer.take());
623 }
624
625 result.append(&mut self.pending_large);
627
628 self.flushes += 1;
629 result
630 }
631
632 pub fn as_io_slices(&self) -> Vec<IoSlice<'_>> {
634 let mut slices = Vec::with_capacity(1 + self.pending_large.len());
635
636 if !self.coalescer.is_empty() {
637 slices.push(IoSlice::new(self.coalescer.peek()));
638 }
639
640 for large in &self.pending_large {
641 slices.push(IoSlice::new(large));
642 }
643
644 slices
645 }
646
647 #[inline]
649 pub fn pending_bytes(&self) -> usize {
650 let large_bytes: usize = self.pending_large.iter().map(|b| b.len()).sum();
651 self.coalescer.len() + large_bytes
652 }
653
654 #[inline]
656 pub fn flushes(&self) -> usize {
657 self.flushes
658 }
659
660 pub fn reset(&mut self) {
662 self.coalescer.reset();
663 self.pending_large.clear();
664 }
665}
666
/// Lock-free counters describing coalescing effectiveness; safe to
/// update from many threads via relaxed atomics.
#[derive(Debug, Default)]
pub struct CoalesceStats {
    /// Writes absorbed into a coalescing buffer.
    coalesced: AtomicU64,
    /// Total bytes absorbed into coalescing buffers.
    bytes_coalesced: AtomicU64,
    /// Writes that skipped buffering (at/above the bypass threshold).
    bypassed: AtomicU64,
    /// Total bytes of bypassed writes.
    bytes_bypassed: AtomicU64,
    /// Flush operations recorded.
    flushes: AtomicU64,
    /// Sum over flushes of the writes each one drained (for averaging).
    writes_per_flush_sum: AtomicU64,
    /// Largest number of writes drained by a single flush.
    max_writes_per_flush: AtomicUsize,
}
689
690impl CoalesceStats {
691 pub fn new() -> Self {
693 Self::default()
694 }
695
696 #[inline]
698 pub fn record_coalesce(&self, bytes: usize) {
699 self.coalesced.fetch_add(1, Ordering::Relaxed);
700 self.bytes_coalesced
701 .fetch_add(bytes as u64, Ordering::Relaxed);
702 }
703
704 #[inline]
706 pub fn record_bypass(&self, bytes: usize) {
707 self.bypassed.fetch_add(1, Ordering::Relaxed);
708 self.bytes_bypassed
709 .fetch_add(bytes as u64, Ordering::Relaxed);
710 }
711
712 #[inline]
714 pub fn record_flush(&self, writes: usize, _bytes: usize) {
715 self.flushes.fetch_add(1, Ordering::Relaxed);
716 self.writes_per_flush_sum
717 .fetch_add(writes as u64, Ordering::Relaxed);
718 self.max_writes_per_flush
719 .fetch_max(writes, Ordering::Relaxed);
720 }
721
722 pub fn coalesced(&self) -> u64 {
724 self.coalesced.load(Ordering::Relaxed)
725 }
726
727 pub fn bytes_coalesced(&self) -> u64 {
729 self.bytes_coalesced.load(Ordering::Relaxed)
730 }
731
732 pub fn bypassed(&self) -> u64 {
734 self.bypassed.load(Ordering::Relaxed)
735 }
736
737 pub fn bytes_bypassed(&self) -> u64 {
739 self.bytes_bypassed.load(Ordering::Relaxed)
740 }
741
742 pub fn flushes(&self) -> u64 {
744 self.flushes.load(Ordering::Relaxed)
745 }
746
747 pub fn avg_writes_per_flush(&self) -> f64 {
749 let flushes = self.flushes();
750 let sum = self.writes_per_flush_sum.load(Ordering::Relaxed);
751 if flushes > 0 {
752 sum as f64 / flushes as f64
753 } else {
754 0.0
755 }
756 }
757
758 pub fn max_writes_per_flush(&self) -> usize {
760 self.max_writes_per_flush.load(Ordering::Relaxed)
761 }
762
763 pub fn coalesce_ratio(&self) -> f64 {
765 let coalesced = self.coalesced();
766 let bypassed = self.bypassed();
767 let total = coalesced + bypassed;
768 if total > 0 {
769 (coalesced as f64 / total as f64) * 100.0
770 } else {
771 0.0
772 }
773 }
774
775 pub fn syscall_reduction_ratio(&self) -> f64 {
780 let writes = self.coalesced();
781 let flushes = self.flushes();
782 if writes > 0 {
783 let saved = writes.saturating_sub(flushes);
784 (saved as f64 / writes as f64) * 100.0
785 } else {
786 0.0
787 }
788 }
789}
790
// Process-wide statistics instance shared by every coalescer in this
// module. Const-initialized field-by-field (AtomicU64::new is const),
// so no lazy-init machinery is needed.
static COALESCE_STATS: CoalesceStats = CoalesceStats {
    coalesced: AtomicU64::new(0),
    bytes_coalesced: AtomicU64::new(0),
    bypassed: AtomicU64::new(0),
    bytes_bypassed: AtomicU64::new(0),
    flushes: AtomicU64::new(0),
    writes_per_flush_sum: AtomicU64::new(0),
    max_writes_per_flush: AtomicUsize::new(0),
};
801
/// Read-only access to the process-wide coalescing statistics.
pub fn coalesce_stats() -> &'static CoalesceStats {
    &COALESCE_STATS
}
806
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_coalesce_config_default() {
        let config = CoalesceConfig::default();
        assert_eq!(config.initial_capacity, DEFAULT_COALESCE_CAPACITY);
        assert_eq!(config.flush_threshold, DEFAULT_FLUSH_THRESHOLD);
    }

    #[test]
    fn test_coalesce_config_presets() {
        let high = CoalesceConfig::high_throughput();
        assert!(high.flush_threshold > DEFAULT_FLUSH_THRESHOLD);

        let low = CoalesceConfig::low_latency();
        assert!(low.flush_threshold < DEFAULT_FLUSH_THRESHOLD);
    }

    #[test]
    fn test_write_coalescer_basic() {
        let mut coalescer = WriteCoalescer::new(CoalesceConfig::default());

        let result = coalescer.write(b"Hello");
        assert!(!result.is_bypass());
        assert_eq!(coalescer.len(), 5);

        coalescer.write(b", World!");
        assert_eq!(coalescer.len(), 13);
        assert_eq!(coalescer.writes_coalesced(), 2);

        let data = coalescer.take();
        assert_eq!(&data[..], b"Hello, World!");
        assert!(coalescer.is_empty());
    }

    #[test]
    fn test_write_coalescer_bypass() {
        let config = CoalesceConfig {
            bypass_threshold: 10,
            ..Default::default()
        };
        let mut coalescer = WriteCoalescer::new(config);

        let result = coalescer.write(b"This is a large write that exceeds threshold");
        assert!(result.is_bypass());
        // Bypassed data never touches the internal buffer.
        assert!(coalescer.is_empty());
    }

    #[test]
    fn test_write_coalescer_flush_threshold() {
        // Fix: disable the time-based flush. The config previously
        // inherited the default 100us timeout via `..Default::default()`,
        // which made the `!should_flush()` assertions below fail on any
        // run slower than 100us between writes (flaky test).
        let config = CoalesceConfig {
            flush_threshold: 20,
            flush_timeout_us: 0,
            ..Default::default()
        };
        let mut coalescer = WriteCoalescer::new(config);

        coalescer.write(b"12345");
        assert!(!coalescer.should_flush());

        coalescer.write(b"1234567890");
        assert!(!coalescer.should_flush());

        coalescer.write(b"12345");
        assert!(coalescer.should_flush());
    }

    #[test]
    fn test_write_result_methods() {
        let buffered = WriteResult::Buffered;
        assert!(buffered.is_buffered());
        assert!(!buffered.should_flush());
        assert!(!buffered.is_bypass());

        let should_flush = WriteResult::ShouldFlush;
        assert!(!should_flush.is_buffered());
        assert!(should_flush.should_flush());

        let bypass = WriteResult::Bypass(Bytes::from_static(b"test"));
        assert!(bypass.is_bypass());
        if let Some(data) = bypass.take_bypass() {
            assert_eq!(&data[..], b"test");
        }
    }

    #[test]
    fn test_multi_buffer_coalescer() {
        let mut coalescer = MultiBufferCoalescer::new(CoalesceConfig::default());

        coalescer.write_header(b"HTTP/1.1 200 OK\r\n");
        coalescer.write_header_line("Content-Type", "text/plain");
        coalescer.write_header(b"\r\n");
        coalescer.write_body(b"Hello, World!");

        assert!(coalescer.total_len() > 0);

        // Headers and body are buffered; trailers are empty.
        let slices = coalescer.as_io_slices();
        assert_eq!(slices.len(), 2);

        let combined = coalescer.take_combined();
        assert!(!combined.is_empty());
    }

    #[test]
    fn test_connection_write_buffer() {
        let mut buffer = ConnectionWriteBuffer::new(1, CoalesceConfig::default());

        buffer.write(b"Small write 1");
        buffer.write(b"Small write 2");
        buffer.write(b"Small write 3");

        assert!(buffer.pending_bytes() > 0);

        let data = buffer.take_all();
        assert!(!data.is_empty());
        assert_eq!(buffer.flushes(), 1);
    }

    #[test]
    fn test_connection_write_buffer_large_bypass() {
        let config = CoalesceConfig {
            bypass_threshold: 10,
            ..Default::default()
        };
        let mut buffer = ConnectionWriteBuffer::new(1, config);

        buffer.write(b"Small");
        buffer.write(b"This is a large write that will be bypassed");

        // One slice for the coalesced chunk, one for the bypassed write.
        let slices = buffer.as_io_slices();
        assert_eq!(slices.len(), 2);
    }

    #[test]
    fn test_coalesce_stats() {
        // Stats are global, so only exercise the accessors for panics —
        // asserting exact values would race with other tests.
        let stats = coalesce_stats();
        let _ = stats.coalesced();
        let _ = stats.bypassed();
        let _ = stats.flushes();
        let _ = stats.coalesce_ratio();
        let _ = stats.syscall_reduction_ratio();
    }

    #[test]
    fn test_take_mut() {
        let mut coalescer = WriteCoalescer::new(CoalesceConfig::default());
        coalescer.write(b"Hello");

        let mut buf = coalescer.take_mut();
        buf.extend_from_slice(b", World!");

        assert_eq!(&buf[..], b"Hello, World!");
        assert!(coalescer.is_empty());
    }

    #[test]
    fn test_reserve() {
        let mut coalescer = WriteCoalescer::new(CoalesceConfig::default());
        coalescer.reserve(10000);
        coalescer.write(b"Now we have plenty of space");
        assert!(coalescer.len() < 10000);
    }
}
981}