armature_core/
write_coalesce.rs

1//! Write Buffer Coalescing Module
2//!
3//! This module provides write buffer coalescing to combine multiple small
4//! writes into a single larger write, reducing syscall overhead and improving
5//! TCP efficiency.
6//!
7//! ## The Problem
8//!
9//! Without coalescing, each small write triggers:
10//! 1. A syscall (expensive context switch)
11//! 2. Potential TCP small packet issues (Nagle's algorithm)
12//! 3. Multiple kernel copies
13//!
14//! ```text
15//! write("HTTP/1.1 200 OK\r\n")     → syscall
16//! write("Content-Type: text/plain\r\n") → syscall
17//! write("Content-Length: 5\r\n")   → syscall
18//! write("\r\n")                     → syscall
19//! write("Hello")                    → syscall
20//! Total: 5 syscalls
21//! ```
22//!
23//! ## The Solution
24//!
25//! With coalescing:
26//! ```text
27//! coalesce("HTTP/1.1 200 OK\r\n")
28//! coalesce("Content-Type: text/plain\r\n")
29//! coalesce("Content-Length: 5\r\n")
30//! coalesce("\r\n")
31//! coalesce("Hello")
32//! flush() → single syscall with entire response
33//! Total: 1 syscall
34//! ```
35//!
36//! ## Usage
37//!
38//! ```rust,ignore
39//! use armature_core::write_coalesce::{WriteCoalescer, CoalesceConfig};
40//!
41//! let mut coalescer = WriteCoalescer::new(CoalesceConfig::default());
42//!
43//! // Small writes are buffered
44//! coalescer.write(b"HTTP/1.1 200 OK\r\n");
45//! coalescer.write(b"Content-Type: application/json\r\n");
46//! coalescer.write(b"\r\n");
47//! coalescer.write(b"{\"status\":\"ok\"}");
48//!
49//! // Flush when ready (or auto-flush on threshold)
50//! let data = coalescer.take();
51//! socket.write_all(&data).await?;
52//! ```
53
54use bytes::{Bytes, BytesMut};
55use std::io::IoSlice;
56use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
57use std::time::{Duration, Instant};
58
59// ============================================================================
60// Constants
61// ============================================================================
62
63/// Default initial buffer capacity (4KB)
64pub const DEFAULT_COALESCE_CAPACITY: usize = 4096;
65
66/// Default flush threshold (16KB)
67pub const DEFAULT_FLUSH_THRESHOLD: usize = 16384;
68
69/// Minimum write size before coalescing (below this, always coalesce)
70pub const MIN_COALESCE_SIZE: usize = 512;
71
72/// Maximum coalesce buffer size (1MB)
73pub const MAX_COALESCE_BUFFER: usize = 1024 * 1024;
74
75/// Default flush timeout (100 microseconds)
76pub const DEFAULT_FLUSH_TIMEOUT_US: u64 = 100;
77
78// ============================================================================
79// Configuration
80// ============================================================================
81
82/// Configuration for write buffer coalescing.
83#[derive(Debug, Clone)]
84pub struct CoalesceConfig {
85    /// Initial buffer capacity
86    pub initial_capacity: usize,
87    /// Flush when buffer exceeds this size
88    pub flush_threshold: usize,
89    /// Maximum buffer size before forcing flush
90    pub max_buffer_size: usize,
91    /// Minimum write size to bypass coalescing (write directly)
92    pub bypass_threshold: usize,
93    /// Auto-flush timeout (microseconds, 0 = disabled)
94    pub flush_timeout_us: u64,
95    /// Enable TCP_CORK during coalescing (Linux only)
96    pub use_tcp_cork: bool,
97    /// Collect statistics
98    pub collect_stats: bool,
99}
100
101impl Default for CoalesceConfig {
102    fn default() -> Self {
103        Self {
104            initial_capacity: DEFAULT_COALESCE_CAPACITY,
105            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
106            max_buffer_size: MAX_COALESCE_BUFFER,
107            bypass_threshold: 65536, // 64KB writes go direct
108            flush_timeout_us: DEFAULT_FLUSH_TIMEOUT_US,
109            use_tcp_cork: true,
110            collect_stats: true,
111        }
112    }
113}
114
115impl CoalesceConfig {
116    /// Create configuration for high throughput.
117    pub fn high_throughput() -> Self {
118        Self {
119            initial_capacity: 8192,
120            flush_threshold: 32768,
121            max_buffer_size: MAX_COALESCE_BUFFER,
122            bypass_threshold: 131072, // 128KB
123            flush_timeout_us: 500,    // Longer timeout for more batching
124            use_tcp_cork: true,
125            collect_stats: false,
126        }
127    }
128
129    /// Create configuration for low latency.
130    pub fn low_latency() -> Self {
131        Self {
132            initial_capacity: 2048,
133            flush_threshold: 4096,
134            max_buffer_size: 65536,
135            bypass_threshold: 16384, // 16KB
136            flush_timeout_us: 10,    // Very short timeout
137            use_tcp_cork: false,
138            collect_stats: false,
139        }
140    }
141
142    /// Create configuration for memory efficiency.
143    pub fn memory_efficient() -> Self {
144        Self {
145            initial_capacity: 1024,
146            flush_threshold: 8192,
147            max_buffer_size: 65536,
148            bypass_threshold: 32768,
149            flush_timeout_us: 200,
150            use_tcp_cork: true,
151            collect_stats: true,
152        }
153    }
154}
155
156// ============================================================================
157// Write Coalescer
158// ============================================================================
159
160/// A write buffer that coalesces small writes into larger batches.
161#[derive(Debug)]
162pub struct WriteCoalescer {
163    /// Internal buffer
164    buffer: BytesMut,
165    /// Configuration
166    config: CoalesceConfig,
167    /// Number of writes coalesced
168    writes_coalesced: usize,
169    /// First write timestamp (for timeout)
170    first_write_time: Option<Instant>,
171    /// Total bytes written to this coalescer
172    total_bytes: usize,
173}
174
175impl WriteCoalescer {
176    /// Create a new write coalescer with default configuration.
177    pub fn new(config: CoalesceConfig) -> Self {
178        Self {
179            buffer: BytesMut::with_capacity(config.initial_capacity),
180            config,
181            writes_coalesced: 0,
182            first_write_time: None,
183            total_bytes: 0,
184        }
185    }
186
187    /// Create with default configuration.
188    pub fn default_config() -> Self {
189        Self::new(CoalesceConfig::default())
190    }
191
192    /// Write data to the coalesce buffer.
193    ///
194    /// Returns `WriteResult` indicating what action should be taken.
195    #[inline]
196    pub fn write(&mut self, data: &[u8]) -> WriteResult {
197        if data.is_empty() {
198            return WriteResult::Buffered;
199        }
200
201        // Large writes bypass coalescing
202        if data.len() >= self.config.bypass_threshold {
203            COALESCE_STATS.record_bypass(data.len());
204            return WriteResult::Bypass(Bytes::copy_from_slice(data));
205        }
206
207        // Track first write time for timeout
208        if self.first_write_time.is_none() {
209            self.first_write_time = Some(Instant::now());
210        }
211
212        // Append to buffer
213        self.buffer.extend_from_slice(data);
214        self.writes_coalesced += 1;
215        self.total_bytes += data.len();
216
217        if self.config.collect_stats {
218            COALESCE_STATS.record_coalesce(data.len());
219        }
220
221        // Check if we should flush
222        if self.should_flush() {
223            WriteResult::ShouldFlush
224        } else {
225            WriteResult::Buffered
226        }
227    }
228
229    /// Write bytes directly (owned).
230    #[inline]
231    pub fn write_bytes(&mut self, data: Bytes) -> WriteResult {
232        if data.is_empty() {
233            return WriteResult::Buffered;
234        }
235
236        // Large writes bypass coalescing
237        if data.len() >= self.config.bypass_threshold {
238            COALESCE_STATS.record_bypass(data.len());
239            return WriteResult::Bypass(data);
240        }
241
242        // Track first write time for timeout
243        if self.first_write_time.is_none() {
244            self.first_write_time = Some(Instant::now());
245        }
246
247        // Append to buffer
248        self.buffer.extend_from_slice(&data);
249        self.writes_coalesced += 1;
250        self.total_bytes += data.len();
251
252        if self.config.collect_stats {
253            COALESCE_STATS.record_coalesce(data.len());
254        }
255
256        // Check if we should flush
257        if self.should_flush() {
258            WriteResult::ShouldFlush
259        } else {
260            WriteResult::Buffered
261        }
262    }
263
264    /// Check if buffer should be flushed.
265    #[inline]
266    pub fn should_flush(&self) -> bool {
267        // Size threshold
268        if self.buffer.len() >= self.config.flush_threshold {
269            return true;
270        }
271
272        // Max buffer size
273        if self.buffer.len() >= self.config.max_buffer_size {
274            return true;
275        }
276
277        // Timeout (if enabled)
278        if self.config.flush_timeout_us > 0 {
279            if let Some(first_time) = self.first_write_time {
280                let elapsed_us = first_time.elapsed().as_micros() as u64;
281                if elapsed_us >= self.config.flush_timeout_us {
282                    return true;
283                }
284            }
285        }
286
287        false
288    }
289
290    /// Check if buffer must be flushed (hard limits reached).
291    #[inline]
292    pub fn must_flush(&self) -> bool {
293        self.buffer.len() >= self.config.max_buffer_size
294    }
295
296    /// Get current buffer length.
297    #[inline]
298    pub fn len(&self) -> usize {
299        self.buffer.len()
300    }
301
302    /// Check if buffer is empty.
303    #[inline]
304    pub fn is_empty(&self) -> bool {
305        self.buffer.is_empty()
306    }
307
308    /// Get number of writes coalesced.
309    #[inline]
310    pub fn writes_coalesced(&self) -> usize {
311        self.writes_coalesced
312    }
313
314    /// Get total bytes written.
315    #[inline]
316    pub fn total_bytes(&self) -> usize {
317        self.total_bytes
318    }
319
320    /// Get remaining capacity before flush threshold.
321    #[inline]
322    pub fn remaining_capacity(&self) -> usize {
323        self.config
324            .flush_threshold
325            .saturating_sub(self.buffer.len())
326    }
327
328    /// Take the buffered data, resetting the coalescer.
329    #[inline]
330    pub fn take(&mut self) -> Bytes {
331        let writes = self.writes_coalesced;
332        let bytes = self.buffer.len();
333
334        let data = self.buffer.split().freeze();
335
336        // Reset state
337        self.writes_coalesced = 0;
338        self.first_write_time = None;
339
340        if self.config.collect_stats && bytes > 0 {
341            COALESCE_STATS.record_flush(writes, bytes);
342        }
343
344        data
345    }
346
347    /// Take the buffered data as BytesMut (for further modification).
348    #[inline]
349    pub fn take_mut(&mut self) -> BytesMut {
350        let writes = self.writes_coalesced;
351        let bytes = self.buffer.len();
352
353        let data = self.buffer.split();
354
355        // Reset state
356        self.writes_coalesced = 0;
357        self.first_write_time = None;
358
359        if self.config.collect_stats && bytes > 0 {
360            COALESCE_STATS.record_flush(writes, bytes);
361        }
362
363        data
364    }
365
366    /// Peek at the buffered data without taking it.
367    #[inline]
368    pub fn peek(&self) -> &[u8] {
369        &self.buffer
370    }
371
372    /// Clear the buffer without returning data.
373    #[inline]
374    pub fn clear(&mut self) {
375        self.buffer.clear();
376        self.writes_coalesced = 0;
377        self.first_write_time = None;
378    }
379
380    /// Reset the coalescer completely.
381    pub fn reset(&mut self) {
382        self.buffer.clear();
383        self.writes_coalesced = 0;
384        self.first_write_time = None;
385        self.total_bytes = 0;
386    }
387
388    /// Get configuration.
389    #[inline]
390    pub fn config(&self) -> &CoalesceConfig {
391        &self.config
392    }
393
394    /// Reserve additional capacity.
395    #[inline]
396    pub fn reserve(&mut self, additional: usize) {
397        self.buffer.reserve(additional);
398    }
399
400    /// Get time since first write (for timeout checking).
401    pub fn time_since_first_write(&self) -> Option<Duration> {
402        self.first_write_time.map(|t| t.elapsed())
403    }
404}
405
406/// Result of a write operation.
407#[derive(Debug)]
408pub enum WriteResult {
409    /// Data was buffered, no action needed
410    Buffered,
411    /// Buffer should be flushed (threshold reached)
412    ShouldFlush,
413    /// Data bypassed coalescing (too large), write directly
414    Bypass(Bytes),
415}
416
417impl WriteResult {
418    /// Check if write was buffered.
419    #[inline]
420    pub fn is_buffered(&self) -> bool {
421        matches!(self, Self::Buffered)
422    }
423
424    /// Check if flush is needed.
425    #[inline]
426    pub fn should_flush(&self) -> bool {
427        matches!(self, Self::ShouldFlush)
428    }
429
430    /// Check if write was bypassed.
431    #[inline]
432    pub fn is_bypass(&self) -> bool {
433        matches!(self, Self::Bypass(_))
434    }
435
436    /// Take bypass data if present.
437    #[inline]
438    pub fn take_bypass(self) -> Option<Bytes> {
439        match self {
440            Self::Bypass(data) => Some(data),
441            _ => None,
442        }
443    }
444}
445
446// ============================================================================
447// Multi-Buffer Coalescer
448// ============================================================================
449
450/// Coalescer that manages multiple buffers for different purposes.
451#[derive(Debug)]
452pub struct MultiBufferCoalescer {
453    /// Headers buffer
454    headers: WriteCoalescer,
455    /// Body buffer
456    body: WriteCoalescer,
457    /// Trailer buffer (for chunked encoding)
458    trailers: WriteCoalescer,
459}
460
461impl MultiBufferCoalescer {
462    /// Create a new multi-buffer coalescer.
463    pub fn new(config: CoalesceConfig) -> Self {
464        Self {
465            headers: WriteCoalescer::new(CoalesceConfig {
466                initial_capacity: 1024,
467                flush_threshold: 4096,
468                max_buffer_size: 16384,
469                bypass_threshold: 8192,
470                ..config.clone()
471            }),
472            body: WriteCoalescer::new(config.clone()),
473            trailers: WriteCoalescer::new(CoalesceConfig {
474                initial_capacity: 256,
475                flush_threshold: 1024,
476                max_buffer_size: 4096,
477                bypass_threshold: 2048,
478                ..config
479            }),
480        }
481    }
482
483    /// Write to headers buffer.
484    #[inline]
485    pub fn write_header(&mut self, data: &[u8]) -> WriteResult {
486        self.headers.write(data)
487    }
488
489    /// Write a header line (name: value\r\n).
490    #[inline]
491    pub fn write_header_line(&mut self, name: &str, value: &str) {
492        self.headers.buffer.extend_from_slice(name.as_bytes());
493        self.headers.buffer.extend_from_slice(b": ");
494        self.headers.buffer.extend_from_slice(value.as_bytes());
495        self.headers.buffer.extend_from_slice(b"\r\n");
496        self.headers.writes_coalesced += 1;
497    }
498
499    /// Write to body buffer.
500    #[inline]
501    pub fn write_body(&mut self, data: &[u8]) -> WriteResult {
502        self.body.write(data)
503    }
504
505    /// Write to trailers buffer.
506    #[inline]
507    pub fn write_trailer(&mut self, data: &[u8]) -> WriteResult {
508        self.trailers.write(data)
509    }
510
511    /// Check if any buffer should be flushed.
512    #[inline]
513    pub fn should_flush(&self) -> bool {
514        self.headers.should_flush() || self.body.should_flush() || self.trailers.should_flush()
515    }
516
517    /// Get total buffered size.
518    #[inline]
519    pub fn total_len(&self) -> usize {
520        self.headers.len() + self.body.len() + self.trailers.len()
521    }
522
523    /// Take all buffers and combine into a single Bytes.
524    pub fn take_combined(&mut self) -> Bytes {
525        let total = self.total_len();
526        if total == 0 {
527            return Bytes::new();
528        }
529
530        let mut combined = BytesMut::with_capacity(total);
531        combined.extend_from_slice(self.headers.peek());
532        combined.extend_from_slice(self.body.peek());
533        combined.extend_from_slice(self.trailers.peek());
534
535        self.headers.clear();
536        self.body.clear();
537        self.trailers.clear();
538
539        combined.freeze()
540    }
541
542    /// Get IoSlices for vectored I/O.
543    pub fn as_io_slices(&self) -> Vec<IoSlice<'_>> {
544        let mut slices = Vec::with_capacity(3);
545        if !self.headers.is_empty() {
546            slices.push(IoSlice::new(self.headers.peek()));
547        }
548        if !self.body.is_empty() {
549            slices.push(IoSlice::new(self.body.peek()));
550        }
551        if !self.trailers.is_empty() {
552            slices.push(IoSlice::new(self.trailers.peek()));
553        }
554        slices
555    }
556
557    /// Reset all buffers.
558    pub fn reset(&mut self) {
559        self.headers.reset();
560        self.body.reset();
561        self.trailers.reset();
562    }
563}
564
565// ============================================================================
566// Connection Write Buffer
567// ============================================================================
568
569/// Per-connection write buffer with coalescing and flush management.
570#[derive(Debug)]
571pub struct ConnectionWriteBuffer {
572    /// Main coalescer
573    coalescer: WriteCoalescer,
574    /// Pending large writes (bypassed coalescing)
575    pending_large: Vec<Bytes>,
576    /// Connection ID (for logging)
577    #[allow(dead_code)]
578    connection_id: u64,
579    /// Total flushes
580    flushes: usize,
581}
582
583impl ConnectionWriteBuffer {
584    /// Create a new connection write buffer.
585    pub fn new(connection_id: u64, config: CoalesceConfig) -> Self {
586        Self {
587            coalescer: WriteCoalescer::new(config),
588            pending_large: Vec::new(),
589            connection_id,
590            flushes: 0,
591        }
592    }
593
594    /// Write data to the buffer.
595    #[inline]
596    pub fn write(&mut self, data: &[u8]) {
597        if let WriteResult::Bypass(bytes) = self.coalescer.write(data) {
598            self.pending_large.push(bytes);
599        }
600    }
601
602    /// Write owned bytes.
603    #[inline]
604    pub fn write_bytes(&mut self, data: Bytes) {
605        if let WriteResult::Bypass(bytes) = self.coalescer.write_bytes(data) {
606            self.pending_large.push(bytes);
607        }
608    }
609
610    /// Check if ready to flush.
611    #[inline]
612    pub fn should_flush(&self) -> bool {
613        !self.pending_large.is_empty() || self.coalescer.should_flush()
614    }
615
616    /// Get all data ready for writing.
617    pub fn take_all(&mut self) -> Vec<Bytes> {
618        let mut result = Vec::with_capacity(1 + self.pending_large.len());
619
620        // Coalesced data first
621        if !self.coalescer.is_empty() {
622            result.push(self.coalescer.take());
623        }
624
625        // Then large writes
626        result.append(&mut self.pending_large);
627
628        self.flushes += 1;
629        result
630    }
631
632    /// Get IoSlices for all pending data.
633    pub fn as_io_slices(&self) -> Vec<IoSlice<'_>> {
634        let mut slices = Vec::with_capacity(1 + self.pending_large.len());
635
636        if !self.coalescer.is_empty() {
637            slices.push(IoSlice::new(self.coalescer.peek()));
638        }
639
640        for large in &self.pending_large {
641            slices.push(IoSlice::new(large));
642        }
643
644        slices
645    }
646
647    /// Get total pending bytes.
648    #[inline]
649    pub fn pending_bytes(&self) -> usize {
650        let large_bytes: usize = self.pending_large.iter().map(|b| b.len()).sum();
651        self.coalescer.len() + large_bytes
652    }
653
654    /// Get number of flushes.
655    #[inline]
656    pub fn flushes(&self) -> usize {
657        self.flushes
658    }
659
660    /// Reset the buffer.
661    pub fn reset(&mut self) {
662        self.coalescer.reset();
663        self.pending_large.clear();
664    }
665}
666
667// ============================================================================
668// Statistics
669// ============================================================================
670
671/// Statistics for write coalescing.
672#[derive(Debug, Default)]
673pub struct CoalesceStats {
674    /// Total writes coalesced
675    coalesced: AtomicU64,
676    /// Total bytes coalesced
677    bytes_coalesced: AtomicU64,
678    /// Total writes bypassed
679    bypassed: AtomicU64,
680    /// Total bytes bypassed
681    bytes_bypassed: AtomicU64,
682    /// Total flushes
683    flushes: AtomicU64,
684    /// Total writes per flush (sum)
685    writes_per_flush_sum: AtomicU64,
686    /// Maximum writes per flush
687    max_writes_per_flush: AtomicUsize,
688}
689
690impl CoalesceStats {
691    /// Create new stats.
692    pub fn new() -> Self {
693        Self::default()
694    }
695
696    /// Record a coalesced write.
697    #[inline]
698    pub fn record_coalesce(&self, bytes: usize) {
699        self.coalesced.fetch_add(1, Ordering::Relaxed);
700        self.bytes_coalesced
701            .fetch_add(bytes as u64, Ordering::Relaxed);
702    }
703
704    /// Record a bypassed write.
705    #[inline]
706    pub fn record_bypass(&self, bytes: usize) {
707        self.bypassed.fetch_add(1, Ordering::Relaxed);
708        self.bytes_bypassed
709            .fetch_add(bytes as u64, Ordering::Relaxed);
710    }
711
712    /// Record a flush.
713    #[inline]
714    pub fn record_flush(&self, writes: usize, _bytes: usize) {
715        self.flushes.fetch_add(1, Ordering::Relaxed);
716        self.writes_per_flush_sum
717            .fetch_add(writes as u64, Ordering::Relaxed);
718        self.max_writes_per_flush
719            .fetch_max(writes, Ordering::Relaxed);
720    }
721
722    /// Get total coalesced writes.
723    pub fn coalesced(&self) -> u64 {
724        self.coalesced.load(Ordering::Relaxed)
725    }
726
727    /// Get total bytes coalesced.
728    pub fn bytes_coalesced(&self) -> u64 {
729        self.bytes_coalesced.load(Ordering::Relaxed)
730    }
731
732    /// Get total bypassed writes.
733    pub fn bypassed(&self) -> u64 {
734        self.bypassed.load(Ordering::Relaxed)
735    }
736
737    /// Get total bytes bypassed.
738    pub fn bytes_bypassed(&self) -> u64 {
739        self.bytes_bypassed.load(Ordering::Relaxed)
740    }
741
742    /// Get total flushes.
743    pub fn flushes(&self) -> u64 {
744        self.flushes.load(Ordering::Relaxed)
745    }
746
747    /// Get average writes per flush.
748    pub fn avg_writes_per_flush(&self) -> f64 {
749        let flushes = self.flushes();
750        let sum = self.writes_per_flush_sum.load(Ordering::Relaxed);
751        if flushes > 0 {
752            sum as f64 / flushes as f64
753        } else {
754            0.0
755        }
756    }
757
758    /// Get max writes per flush.
759    pub fn max_writes_per_flush(&self) -> usize {
760        self.max_writes_per_flush.load(Ordering::Relaxed)
761    }
762
763    /// Get coalesce ratio (higher is better).
764    pub fn coalesce_ratio(&self) -> f64 {
765        let coalesced = self.coalesced();
766        let bypassed = self.bypassed();
767        let total = coalesced + bypassed;
768        if total > 0 {
769            (coalesced as f64 / total as f64) * 100.0
770        } else {
771            0.0
772        }
773    }
774
775    /// Get syscall reduction ratio.
776    ///
777    /// This estimates how many syscalls were saved by coalescing.
778    /// Formula: (writes_coalesced - flushes) / writes_coalesced * 100
779    pub fn syscall_reduction_ratio(&self) -> f64 {
780        let writes = self.coalesced();
781        let flushes = self.flushes();
782        if writes > 0 {
783            let saved = writes.saturating_sub(flushes);
784            (saved as f64 / writes as f64) * 100.0
785        } else {
786            0.0
787        }
788    }
789}
790
791/// Global coalesce statistics.
792static COALESCE_STATS: CoalesceStats = CoalesceStats {
793    coalesced: AtomicU64::new(0),
794    bytes_coalesced: AtomicU64::new(0),
795    bypassed: AtomicU64::new(0),
796    bytes_bypassed: AtomicU64::new(0),
797    flushes: AtomicU64::new(0),
798    writes_per_flush_sum: AtomicU64::new(0),
799    max_writes_per_flush: AtomicUsize::new(0),
800};
801
802/// Get global coalesce statistics.
803pub fn coalesce_stats() -> &'static CoalesceStats {
804    &COALESCE_STATS
805}
806
807// ============================================================================
808// Tests
809// ============================================================================
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814
815    #[test]
816    fn test_coalesce_config_default() {
817        let config = CoalesceConfig::default();
818        assert_eq!(config.initial_capacity, DEFAULT_COALESCE_CAPACITY);
819        assert_eq!(config.flush_threshold, DEFAULT_FLUSH_THRESHOLD);
820    }
821
822    #[test]
823    fn test_coalesce_config_presets() {
824        let high = CoalesceConfig::high_throughput();
825        assert!(high.flush_threshold > DEFAULT_FLUSH_THRESHOLD);
826
827        let low = CoalesceConfig::low_latency();
828        assert!(low.flush_threshold < DEFAULT_FLUSH_THRESHOLD);
829    }
830
831    #[test]
832    fn test_write_coalescer_basic() {
833        let mut coalescer = WriteCoalescer::new(CoalesceConfig::default());
834
835        // Small write should be buffered
836        let result = coalescer.write(b"Hello");
837        // Result can be Buffered or ShouldFlush depending on timing
838        assert!(!result.is_bypass());
839        assert_eq!(coalescer.len(), 5);
840
841        // Another small write
842        coalescer.write(b", World!");
843        assert_eq!(coalescer.len(), 13);
844        assert_eq!(coalescer.writes_coalesced(), 2);
845
846        // Take data
847        let data = coalescer.take();
848        assert_eq!(&data[..], b"Hello, World!");
849        assert!(coalescer.is_empty());
850    }
851
852    #[test]
853    fn test_write_coalescer_bypass() {
854        let config = CoalesceConfig {
855            bypass_threshold: 10,
856            ..Default::default()
857        };
858        let mut coalescer = WriteCoalescer::new(config);
859
860        // Large write should bypass
861        let result = coalescer.write(b"This is a large write that exceeds threshold");
862        assert!(result.is_bypass());
863        assert!(coalescer.is_empty()); // Not buffered
864    }
865
866    #[test]
867    fn test_write_coalescer_flush_threshold() {
868        let config = CoalesceConfig {
869            flush_threshold: 20,
870            ..Default::default()
871        };
872        let mut coalescer = WriteCoalescer::new(config);
873
874        coalescer.write(b"12345");
875        assert!(!coalescer.should_flush());
876
877        coalescer.write(b"1234567890");
878        assert!(!coalescer.should_flush()); // 15 bytes
879
880        coalescer.write(b"12345");
881        assert!(coalescer.should_flush()); // 20 bytes = threshold
882    }
883
884    #[test]
885    fn test_write_result_methods() {
886        let buffered = WriteResult::Buffered;
887        assert!(buffered.is_buffered());
888        assert!(!buffered.should_flush());
889        assert!(!buffered.is_bypass());
890
891        let should_flush = WriteResult::ShouldFlush;
892        assert!(!should_flush.is_buffered());
893        assert!(should_flush.should_flush());
894
895        let bypass = WriteResult::Bypass(Bytes::from_static(b"test"));
896        assert!(bypass.is_bypass());
897        if let Some(data) = bypass.take_bypass() {
898            assert_eq!(&data[..], b"test");
899        }
900    }
901
902    #[test]
903    fn test_multi_buffer_coalescer() {
904        let mut coalescer = MultiBufferCoalescer::new(CoalesceConfig::default());
905
906        coalescer.write_header(b"HTTP/1.1 200 OK\r\n");
907        coalescer.write_header_line("Content-Type", "text/plain");
908        coalescer.write_header(b"\r\n");
909        coalescer.write_body(b"Hello, World!");
910
911        assert!(coalescer.total_len() > 0);
912
913        let slices = coalescer.as_io_slices();
914        assert_eq!(slices.len(), 2); // headers + body
915
916        let combined = coalescer.take_combined();
917        assert!(!combined.is_empty());
918    }
919
920    #[test]
921    fn test_connection_write_buffer() {
922        let mut buffer = ConnectionWriteBuffer::new(1, CoalesceConfig::default());
923
924        buffer.write(b"Small write 1");
925        buffer.write(b"Small write 2");
926        buffer.write(b"Small write 3");
927
928        assert!(buffer.pending_bytes() > 0);
929
930        let data = buffer.take_all();
931        assert!(!data.is_empty());
932        assert_eq!(buffer.flushes(), 1);
933    }
934
935    #[test]
936    fn test_connection_write_buffer_large_bypass() {
937        let config = CoalesceConfig {
938            bypass_threshold: 10,
939            ..Default::default()
940        };
941        let mut buffer = ConnectionWriteBuffer::new(1, config);
942
943        buffer.write(b"Small");
944        buffer.write(b"This is a large write that will be bypassed");
945
946        // Should have coalesced + pending large
947        let slices = buffer.as_io_slices();
948        assert_eq!(slices.len(), 2);
949    }
950
951    #[test]
952    fn test_coalesce_stats() {
953        let stats = coalesce_stats();
954        // Just verify we can read stats
955        let _ = stats.coalesced();
956        let _ = stats.bypassed();
957        let _ = stats.flushes();
958        let _ = stats.coalesce_ratio();
959        let _ = stats.syscall_reduction_ratio();
960    }
961
962    #[test]
963    fn test_take_mut() {
964        let mut coalescer = WriteCoalescer::new(CoalesceConfig::default());
965        coalescer.write(b"Hello");
966
967        let mut buf = coalescer.take_mut();
968        buf.extend_from_slice(b", World!");
969
970        assert_eq!(&buf[..], b"Hello, World!");
971        assert!(coalescer.is_empty());
972    }
973
974    #[test]
975    fn test_reserve() {
976        let mut coalescer = WriteCoalescer::new(CoalesceConfig::default());
977        coalescer.reserve(10000);
978        coalescer.write(b"Now we have plenty of space");
979        assert!(coalescer.len() < 10000);
980    }
981}