armature_core/
connection.rs

1//! Optimized Connection State Machine
2//!
3//! This module provides a high-performance connection state machine (FSM) with
4//! minimized branching for HTTP connection handling. Key optimizations:
5//!
6//! - Branchless state transitions using lookup tables
7//! - Compact state representation (single byte)
8//! - Pre-computed action tables for state/event combinations
9//! - Cache-friendly state layout
10//!
11//! ## State Machine
12//!
13//! ```text
14//!                 ┌─────────────┐
15//!                 │    Idle     │◄───────────────┐
16//!                 └──────┬──────┘                │
17//!                        │ accept               │
18//!                        ▼                      │
19//!                 ┌─────────────┐               │
20//!             ┌───│  Connected  │───┐           │
21//!             │   └──────┬──────┘   │           │
22//!        error│          │ data     │ timeout   │
23//!             │          ▼          │           │
24//!             │   ┌─────────────┐   │           │
25//!             │   │   Reading   │───┤           │
26//!             │   └──────┬──────┘   │           │
27//!             │          │ complete │           │
28//!             │          ▼          │           │
29//!             │   ┌─────────────┐   │           │
30//!             │   │ Processing  │───┤           │
31//!             │   └──────┬──────┘   │           │
32//!             │          │ ready    │           │
33//!             │          ▼          │           │
34//!             │   ┌─────────────┐   │           │
35//!             │   │   Writing   │───┤           │
36//!             │   └──────┬──────┘   │           │
37//!             │          │ done     │ keep-alive│
38//!             │          ▼          └───────────┤
39//!             │   ┌─────────────┐               │
40//!             └──►│   Closing   │───────────────┘
41//!                 └─────────────┘
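//! ```
//!
//! The diagram is simplified. In the transition table a `KeepAlive` event moves
//! a `Writing` connection back to `Connected` (not `Idle`), `Closing` advances
//! to the terminal `Closed` state, and `Error` events in the non-terminal states
//! lead to the terminal `Error` state; neither terminal state is drawn above.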
43//!
44//! ## Performance Characteristics
45//!
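//! - State transition: O(1) table lookup; the only remaining branch is the
//!   "did the state change?" check used for bookkeeping
//! - Event dispatch: Single array index, predicted by CPU
//! - Memory: the state itself is 1 byte; a `Connection` additionally stores its
//!   ID, request counter, keep-alive flag and two optional timestamps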
49//!
50//! ## Usage
51//!
52//! ```rust,ignore
53//! use armature_core::connection::{Connection, ConnectionEvent};
54//!
55//! let mut conn = Connection::new();
56//!
57//! // Process events - branchless transitions
58//! conn.handle_event(ConnectionEvent::Accept);
59//! conn.handle_event(ConnectionEvent::DataReady);
60//! conn.handle_event(ConnectionEvent::RequestComplete);
61//! conn.handle_event(ConnectionEvent::ResponseReady);
62//! conn.handle_event(ConnectionEvent::WriteComplete);
63//! ```
64
65use std::sync::atomic::{AtomicU64, Ordering};
66use std::time::{Duration, Instant};
67
68// ============================================================================
69// Connection State (Compact Representation)
70// ============================================================================
71
/// Lifecycle state of a connection, stored in a single byte.
///
/// `#[repr(u8)]` fixes the size at one byte, and the explicit discriminants
/// below let a state be cast to an integer and used as a table index.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ConnectionState {
    /// Slot is idle/available (the default state).
    #[default]
    Idle = 0,
    /// TCP connection established, waiting for the first request.
    Connected = 1,
    /// Request headers/body are being read.
    Reading = 2,
    /// Request is being processed (handler executing).
    Processing = 3,
    /// Response is being written.
    Writing = 4,
    /// Graceful close in progress.
    Closing = 5,
    /// Connection is closed.
    Closed = 6,
    /// Connection is in the error state.
    Error = 7,
}

impl ConnectionState {
    /// Number of states (for table sizing); one past the highest discriminant.
    pub const COUNT: usize = Self::Error as usize + 1;

    /// Convert a raw byte into a state.
    ///
    /// Bytes `0..=7` map to the variant with that discriminant; every other
    /// value is clamped to [`ConnectionState::Error`].
    #[inline(always)]
    pub const fn from_u8(v: u8) -> Self {
        // Position `i` holds the state whose discriminant is `i`.
        const BY_DISCRIMINANT: [ConnectionState; ConnectionState::COUNT] = [
            ConnectionState::Idle,
            ConnectionState::Connected,
            ConnectionState::Reading,
            ConnectionState::Processing,
            ConnectionState::Writing,
            ConnectionState::Closing,
            ConnectionState::Closed,
            ConnectionState::Error,
        ];

        let slot = v as usize;
        if slot < ConnectionState::COUNT {
            BY_DISCRIMINANT[slot]
        } else {
            Self::Error
        }
    }

    /// Convert to the raw discriminant byte.
    #[inline(always)]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Returns `true` for the terminal states `Closed` and `Error`.
    #[inline(always)]
    pub const fn is_terminal(self) -> bool {
        match self {
            Self::Closed | Self::Error => true,
            Self::Idle
            | Self::Connected
            | Self::Reading
            | Self::Processing
            | Self::Writing
            | Self::Closing => false,
        }
    }

    /// Returns `true` when a new request may start (`Idle` or `Connected`).
    #[inline(always)]
    pub const fn can_accept_request(self) -> bool {
        match self {
            Self::Idle | Self::Connected => true,
            Self::Reading
            | Self::Processing
            | Self::Writing
            | Self::Closing
            | Self::Closed
            | Self::Error => false,
        }
    }

    /// Returns `true` while a request is in flight
    /// (`Reading`, `Processing` or `Writing`).
    #[inline(always)]
    pub const fn is_active(self) -> bool {
        match self {
            Self::Reading | Self::Processing | Self::Writing => true,
            Self::Idle | Self::Connected | Self::Closing | Self::Closed | Self::Error => false,
        }
    }
}
143
144// ============================================================================
145// Connection Events
146// ============================================================================
147
/// Events fed into the connection state machine.
///
/// The explicit `u8` discriminants double as column indices of the
/// transition table.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionEvent {
    /// A new connection was accepted.
    Accept = 0,
    /// Data is available for reading.
    DataReady = 1,
    /// The request has been fully read.
    RequestComplete = 2,
    /// The response is ready to be sent.
    ResponseReady = 3,
    /// The write finished.
    WriteComplete = 4,
    /// Keep-alive: the connection is ready for the next request.
    KeepAlive = 5,
    /// A timeout occurred.
    Timeout = 6,
    /// An error occurred.
    Error = 7,
    /// A close was requested.
    Close = 8,
}

impl ConnectionEvent {
    /// Number of events (for table sizing); one past the highest discriminant.
    pub const COUNT: usize = Self::Close as usize + 1;

    /// Convert to the raw discriminant byte.
    #[inline(always)]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }
}
182
183// ============================================================================
184// Transition Action
185// ============================================================================
186
/// Action the caller should perform as a result of a state transition.
///
/// Stored in the transition table as its `u8` discriminant, so the explicit
/// values below must stay in sync with any code that decodes that byte.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransitionAction {
    /// No action needed
    None = 0,
    /// Start reading a request
    StartRead = 1,
    /// Continue reading the current request
    ContinueRead = 2,
    /// Dispatch the request to its handler
    Dispatch = 3,
    /// Start writing the response
    StartWrite = 4,
    /// Continue writing the response
    ContinueWrite = 5,
    /// Reset the connection for keep-alive reuse
    Reset = 6,
    /// Initiate a graceful close
    InitiateClose = 7,
    /// Force close (error)
    ForceClose = 8,
    /// Log an error
    LogError = 9,
}
212
213// ============================================================================
214// Transition Table (Branchless Lookup)
215// ============================================================================
216
/// Pre-computed state transition table.
///
/// Layout: `TRANSITION_TABLE[current_state][event] = (next_state, action)`
///
/// Rows are ordered by `ConnectionState` discriminant and columns by
/// `ConnectionEvent` discriminant, so a transition is a plain array index
/// instead of a `match`. Events that are meaningless in a given state map back
/// to that same state with `TransitionAction::None`. The table is `const` and
/// is evaluated at compile time.
const TRANSITION_TABLE: [[TransitionEntry; ConnectionEvent::COUNT]; ConnectionState::COUNT] = {
    use ConnectionState as S;
    use TransitionAction as A;

    // Helper: pack a (next state, action) pair into its byte representation.
    const fn e(state: S, action: A) -> TransitionEntry {
        TransitionEntry {
            next_state: state.as_u8(),
            action: action as u8,
        }
    }

    // "Ignore" entries: stay in the given state and perform no action.
    const NOOP_IDLE: TransitionEntry = e(S::Idle, A::None);
    const NOOP_CONN: TransitionEntry = e(S::Connected, A::None);
    const NOOP_READ: TransitionEntry = e(S::Reading, A::None);
    const NOOP_PROC: TransitionEntry = e(S::Processing, A::None);
    const NOOP_WRIT: TransitionEntry = e(S::Writing, A::None);
    const NOOP_CLOS: TransitionEntry = e(S::Closing, A::None);
    const NOOP_CLSD: TransitionEntry = e(S::Closed, A::None);
    const NOOP_ERR: TransitionEntry = e(S::Error, A::None);

    // Shared exits: fail into Error, start a graceful close, or finish closing.
    const TO_ERROR: TransitionEntry = e(S::Error, A::ForceClose);
    const TO_CLOSE: TransitionEntry = e(S::Closing, A::InitiateClose);
    const TO_CLOSED: TransitionEntry = e(S::Closed, A::None);

    [
        // Idle state: waiting for connection
        // Column order for every row:
        // Accept, DataReady, ReqComplete, RespReady, WriteComplete, KeepAlive, Timeout, Error, Close
        [
            e(S::Connected, A::StartRead), // Accept -> Connected
            NOOP_IDLE,                     // DataReady (ignore)
            NOOP_IDLE,                     // RequestComplete (ignore)
            NOOP_IDLE,                     // ResponseReady (ignore)
            NOOP_IDLE,                     // WriteComplete (ignore)
            NOOP_IDLE,                     // KeepAlive (ignore)
            NOOP_IDLE,                     // Timeout (ignore)
            NOOP_IDLE,                     // Error (ignore)
            NOOP_IDLE,                     // Close (ignore)
        ],
        // Connected state: connection established
        [
            NOOP_CONN,                      // Accept (ignore)
            e(S::Reading, A::ContinueRead), // DataReady -> Reading
            NOOP_CONN,                      // RequestComplete (ignore)
            NOOP_CONN,                      // ResponseReady (ignore)
            NOOP_CONN,                      // WriteComplete (ignore)
            NOOP_CONN,                      // KeepAlive (ignore)
            TO_CLOSE,                       // Timeout -> Closing
            TO_ERROR,                       // Error -> Error
            TO_CLOSE,                       // Close -> Closing
        ],
        // Reading state: reading request
        [
            NOOP_READ,                      // Accept (ignore)
            e(S::Reading, A::ContinueRead), // DataReady -> stay in Reading, keep reading
            e(S::Processing, A::Dispatch),  // RequestComplete -> Processing
            NOOP_READ,                      // ResponseReady (ignore)
            NOOP_READ,                      // WriteComplete (ignore)
            NOOP_READ,                      // KeepAlive (ignore)
            TO_CLOSE,                       // Timeout -> Closing
            TO_ERROR,                       // Error -> Error
            TO_CLOSE,                       // Close -> Closing
        ],
        // Processing state: executing handler
        [
            NOOP_PROC,                    // Accept (ignore)
            NOOP_PROC,                    // DataReady (ignore)
            NOOP_PROC,                    // RequestComplete (ignore)
            e(S::Writing, A::StartWrite), // ResponseReady -> Writing
            NOOP_PROC,                    // WriteComplete (ignore)
            NOOP_PROC,                    // KeepAlive (ignore)
            TO_CLOSE,                     // Timeout -> Closing
            TO_ERROR,                     // Error -> Error
            TO_CLOSE,                     // Close -> Closing
        ],
        // Writing state: writing response
        [
            NOOP_WRIT,                       // Accept (ignore)
            NOOP_WRIT,                       // DataReady (ignore)
            NOOP_WRIT,                       // RequestComplete (ignore)
            NOOP_WRIT,                       // ResponseReady (ignore)
            e(S::Closing, A::InitiateClose), // WriteComplete -> Closing (send KeepAlive instead to reuse)
            e(S::Connected, A::Reset),       // KeepAlive -> Connected
            TO_CLOSE,                        // Timeout -> Closing
            TO_ERROR,                        // Error -> Error
            TO_CLOSE,                        // Close -> Closing
        ],
        // Closing state: graceful shutdown
        [
            NOOP_CLOS, // Accept (ignore)
            NOOP_CLOS, // DataReady (ignore)
            NOOP_CLOS, // RequestComplete (ignore)
            NOOP_CLOS, // ResponseReady (ignore)
            TO_CLOSED, // WriteComplete -> Closed
            NOOP_CLOS, // KeepAlive (ignore)
            TO_CLOSED, // Timeout -> Closed
            TO_CLOSED, // Error -> Closed
            TO_CLOSED, // Close -> Closed
        ],
        // Closed state: terminal, every event is ignored
        [
            NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD,
            NOOP_CLSD,
        ],
        // Error state: terminal, every event is ignored
        [
            NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR,
            NOOP_ERR,
        ],
    ]
};
336
337/// Packed transition entry (2 bytes).
338#[derive(Clone, Copy)]
339struct TransitionEntry {
340    next_state: u8,
341    action: u8,
342}
343
344impl TransitionEntry {
345    #[inline(always)]
346    fn state(self) -> ConnectionState {
347        ConnectionState::from_u8(self.next_state)
348    }
349
350    #[inline(always)]
351    fn action(self) -> TransitionAction {
352        // SAFETY: action values are always valid TransitionAction discriminants
353        unsafe { std::mem::transmute(self.action) }
354    }
355}
356
357// ============================================================================
358// Connection State Machine
359// ============================================================================
360
/// High-performance connection state machine.
///
/// Pairs the compact [`ConnectionState`] with the per-connection bookkeeping
/// (timestamps, request counter, ID) that is updated as events are handled.
#[derive(Debug)]
pub struct Connection {
    /// Current state (1 byte)
    state: ConnectionState,
    /// Keep-alive enabled (constructors default this to `true`)
    keep_alive: bool,
    /// Number of requests that have reached `Processing` on this connection
    request_count: u32,
    /// Time the connection was established; `None` until the state machine has
    /// entered `Connected`
    connected_at: Option<Instant>,
    /// Time of the last handled event or reset; `None` if there was none yet
    last_activity: Option<Instant>,
    /// Connection ID (for logging/debugging)
    id: u64,
}
379
380impl Connection {
381    /// Create a new connection in idle state.
382    #[inline]
383    pub fn new() -> Self {
384        Self {
385            state: ConnectionState::Idle,
386            keep_alive: true,
387            request_count: 0,
388            connected_at: None,
389            last_activity: None,
390            id: CONNECTION_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
391        }
392    }
393
394    /// Create with specific ID.
395    #[inline]
396    pub fn with_id(id: u64) -> Self {
397        Self {
398            state: ConnectionState::Idle,
399            keep_alive: true,
400            request_count: 0,
401            connected_at: None,
402            last_activity: None,
403            id,
404        }
405    }
406
407    /// Get current state.
408    #[inline(always)]
409    pub fn state(&self) -> ConnectionState {
410        self.state
411    }
412
413    /// Get connection ID.
414    #[inline(always)]
415    pub fn id(&self) -> u64 {
416        self.id
417    }
418
419    /// Get request count.
420    #[inline(always)]
421    pub fn request_count(&self) -> u32 {
422        self.request_count
423    }
424
425    /// Check if keep-alive is enabled.
426    #[inline(always)]
427    pub fn keep_alive(&self) -> bool {
428        self.keep_alive
429    }
430
431    /// Set keep-alive.
432    #[inline(always)]
433    pub fn set_keep_alive(&mut self, enabled: bool) {
434        self.keep_alive = enabled;
435    }
436
437    /// Get time since connection established.
438    #[inline]
439    pub fn age(&self) -> Option<Duration> {
440        self.connected_at.map(|t| t.elapsed())
441    }
442
443    /// Get time since last activity.
444    #[inline]
445    pub fn idle_time(&self) -> Option<Duration> {
446        self.last_activity.map(|t| t.elapsed())
447    }
448
449    /// Handle an event with branchless state transition.
450    ///
451    /// Returns the action to perform.
452    #[inline]
453    pub fn handle_event(&mut self, event: ConnectionEvent) -> TransitionAction {
454        // Branchless table lookup
455        let entry = TRANSITION_TABLE[self.state.as_u8() as usize][event.as_u8() as usize];
456
457        // Update state
458        let new_state = entry.state();
459        let action = entry.action();
460
461        // Track state changes
462        if new_state != self.state {
463            self.on_state_change(new_state);
464        }
465
466        self.state = new_state;
467        self.last_activity = Some(Instant::now());
468
469        action
470    }
471
472    /// Try transition with validation.
473    ///
474    /// Returns `Ok(action)` if transition is valid, `Err` otherwise.
475    #[inline]
476    pub fn try_transition(
477        &mut self,
478        event: ConnectionEvent,
479    ) -> Result<TransitionAction, TransitionError> {
480        let entry = TRANSITION_TABLE[self.state.as_u8() as usize][event.as_u8() as usize];
481        let new_state = entry.state();
482        let action = entry.action();
483
484        // Check if this is a meaningful transition
485        if new_state == self.state && action == TransitionAction::None {
486            return Err(TransitionError::InvalidTransition {
487                from: self.state,
488                event,
489            });
490        }
491
492        if new_state != self.state {
493            self.on_state_change(new_state);
494        }
495
496        self.state = new_state;
497        self.last_activity = Some(Instant::now());
498
499        Ok(action)
500    }
501
502    /// Called on state change.
503    #[inline]
504    fn on_state_change(&mut self, new_state: ConnectionState) {
505        match new_state {
506            ConnectionState::Connected => {
507                self.connected_at = Some(Instant::now());
508                CONNECTION_STATS.record_connected();
509            }
510            ConnectionState::Reading => {
511                // New request starting
512            }
513            ConnectionState::Processing => {
514                self.request_count += 1;
515            }
516            ConnectionState::Closed => {
517                CONNECTION_STATS.record_closed();
518            }
519            ConnectionState::Error => {
520                CONNECTION_STATS.record_error();
521            }
522            _ => {}
523        }
524    }
525
526    /// Reset connection for reuse (keep-alive).
527    #[inline]
528    pub fn reset(&mut self) {
529        self.state = ConnectionState::Connected;
530        self.last_activity = Some(Instant::now());
531        CONNECTION_STATS.record_reuse();
532    }
533
534    /// Force close the connection.
535    #[inline]
536    pub fn force_close(&mut self) {
537        if !self.state.is_terminal() {
538            self.state = ConnectionState::Closed;
539            CONNECTION_STATS.record_closed();
540        }
541    }
542
543    /// Check if connection should be closed (idle timeout).
544    #[inline]
545    pub fn should_close(&self, idle_timeout: Duration) -> bool {
546        if self.state.is_terminal() {
547            return true;
548        }
549
550        if let Some(idle_time) = self.idle_time() {
551            if idle_time > idle_timeout {
552                return true;
553            }
554        }
555
556        false
557    }
558}
559
560impl Default for Connection {
561    fn default() -> Self {
562        Self::new()
563    }
564}
565
566// ============================================================================
567// Recyclable Trait
568// ============================================================================
569
/// Trait for objects that can be reset and reused.
///
/// Implementing this trait allows objects to be efficiently recycled
/// in a pool, avoiding allocation overhead.
pub trait Recyclable {
    /// Reset the object to its initial state for reuse.
    ///
    /// This should clear all request-specific data while preserving
    /// pooled resources like buffers.
    fn reset(&mut self);

    /// Check if the object is clean and ready for reuse.
    ///
    /// Returning `false` tells the caller that `reset` is needed first.
    fn is_clean(&self) -> bool;

    /// Get the generation counter (for use-after-recycle detection).
    fn generation(&self) -> u64;

    /// Increment the generation counter.
    fn increment_generation(&mut self);
}
590
591// ============================================================================
592// Recyclable Connection
593// ============================================================================
594
/// Extended connection with full recycling support.
///
/// Wraps a [`Connection`] and adds the bookkeeping needed to hand the same
/// slot out repeatedly: a generation counter, a recycle counter, buffer
/// capacity hints and an optional type-erased user data slot.
#[derive(Debug)]
pub struct RecyclableConnection {
    /// Base connection state machine
    inner: Connection,
    /// Generation counter for use-after-recycle detection
    generation: u64,
    /// Total recycle count for this slot
    recycle_count: u64,
    /// Read buffer capacity hint (stored only; nothing is allocated from it here)
    read_buffer_capacity: usize,
    /// Write buffer capacity hint (stored only; nothing is allocated from it here)
    write_buffer_capacity: usize,
    /// Custom user data slot (type-erased)
    user_data: Option<Box<dyn std::any::Any + Send + Sync>>,
}
611
612impl RecyclableConnection {
613    /// Create a new recyclable connection.
614    pub fn new() -> Self {
615        Self {
616            inner: Connection::new(),
617            generation: 0,
618            recycle_count: 0,
619            read_buffer_capacity: 0,
620            write_buffer_capacity: 0,
621            user_data: None,
622        }
623    }
624
625    /// Create with specific ID.
626    pub fn with_id(id: u64) -> Self {
627        Self {
628            inner: Connection::with_id(id),
629            generation: 0,
630            recycle_count: 0,
631            read_buffer_capacity: 0,
632            write_buffer_capacity: 0,
633            user_data: None,
634        }
635    }
636
637    /// Create with buffer capacity hints.
638    pub fn with_capacities(read_capacity: usize, write_capacity: usize) -> Self {
639        Self {
640            inner: Connection::new(),
641            generation: 0,
642            recycle_count: 0,
643            read_buffer_capacity: read_capacity,
644            write_buffer_capacity: write_capacity,
645            user_data: None,
646        }
647    }
648
649    /// Get the inner connection.
650    #[inline(always)]
651    pub fn inner(&self) -> &Connection {
652        &self.inner
653    }
654
655    /// Get mutable inner connection.
656    #[inline(always)]
657    pub fn inner_mut(&mut self) -> &mut Connection {
658        &mut self.inner
659    }
660
661    /// Get total recycle count.
662    #[inline(always)]
663    pub fn recycle_count(&self) -> u64 {
664        self.recycle_count
665    }
666
667    /// Get read buffer capacity hint.
668    #[inline(always)]
669    pub fn read_buffer_capacity(&self) -> usize {
670        self.read_buffer_capacity
671    }
672
673    /// Get write buffer capacity hint.
674    #[inline(always)]
675    pub fn write_buffer_capacity(&self) -> usize {
676        self.write_buffer_capacity
677    }
678
679    /// Set buffer capacity hints for next use.
680    #[inline]
681    pub fn set_buffer_capacities(&mut self, read: usize, write: usize) {
682        self.read_buffer_capacity = read;
683        self.write_buffer_capacity = write;
684    }
685
686    /// Set user data.
687    pub fn set_user_data<T: std::any::Any + Send + Sync + 'static>(&mut self, data: T) {
688        self.user_data = Some(Box::new(data));
689    }
690
691    /// Get user data.
692    pub fn user_data<T: std::any::Any + Send + Sync + 'static>(&self) -> Option<&T> {
693        self.user_data.as_ref()?.downcast_ref::<T>()
694    }
695
696    /// Take user data.
697    pub fn take_user_data<T: std::any::Any + Send + Sync + 'static>(&mut self) -> Option<T> {
698        let boxed = self.user_data.take()?;
699        boxed.downcast::<T>().ok().map(|b| *b)
700    }
701
702    /// Handle event (delegates to inner).
703    #[inline]
704    pub fn handle_event(&mut self, event: ConnectionEvent) -> TransitionAction {
705        self.inner.handle_event(event)
706    }
707
708    /// Get state (delegates to inner).
709    #[inline(always)]
710    pub fn state(&self) -> ConnectionState {
711        self.inner.state()
712    }
713
714    /// Get connection ID (delegates to inner).
715    #[inline(always)]
716    pub fn id(&self) -> u64 {
717        self.inner.id()
718    }
719
720    /// Prepare for recycling - clean up and return to pool.
721    pub fn prepare_for_recycle(&mut self) {
722        // Clear user data
723        self.user_data = None;
724        // Reset inner connection
725        self.inner.state = ConnectionState::Idle;
726        self.inner.keep_alive = true;
727        self.inner.request_count = 0;
728        self.inner.connected_at = None;
729        self.inner.last_activity = None;
730        // Increment counters
731        self.generation += 1;
732        self.recycle_count += 1;
733        // Record stats
734        RECYCLE_STATS.record_recycle();
735    }
736}
737
738impl Default for RecyclableConnection {
739    fn default() -> Self {
740        Self::new()
741    }
742}
743
744impl Recyclable for RecyclableConnection {
745    fn reset(&mut self) {
746        self.prepare_for_recycle();
747    }
748
749    fn is_clean(&self) -> bool {
750        self.inner.state() == ConnectionState::Idle && self.user_data.is_none()
751    }
752
753    fn generation(&self) -> u64 {
754        self.generation
755    }
756
757    fn increment_generation(&mut self) {
758        self.generation += 1;
759    }
760}
761
762// ============================================================================
763// Generic Recycle Pool
764// ============================================================================
765
/// Generic object pool with recycling support.
///
/// This pool maintains a set of pre-allocated objects that can be
/// acquired, used, and returned for reuse. Objects are addressed by their
/// index in the backing vector.
#[derive(Debug)]
pub struct RecyclePool<T: Recyclable + Default> {
    /// Pooled objects (both free and in use)
    objects: Vec<T>,
    /// Free list (indices of available objects), used as a stack
    free_indices: Vec<usize>,
    /// Pool capacity, as reported by `capacity()`
    capacity: usize,
    /// High water mark (max concurrent usage)
    high_water_mark: usize,
    /// Configuration
    #[allow(dead_code)] // Reserved for future dynamic pool management
    config: RecyclePoolConfig,
}
784
785impl<T: Recyclable + Default> RecyclePool<T> {
786    /// Create a new pool with capacity.
787    pub fn new(capacity: usize) -> Self {
788        Self::with_config(capacity, RecyclePoolConfig::default())
789    }
790
791    /// Create with custom configuration.
792    pub fn with_config(capacity: usize, config: RecyclePoolConfig) -> Self {
793        let mut objects = Vec::with_capacity(capacity);
794        let mut free_indices = Vec::with_capacity(capacity);
795
796        for i in 0..capacity {
797            objects.push(T::default());
798            free_indices.push(i);
799        }
800
801        Self {
802            objects,
803            free_indices,
804            capacity,
805            high_water_mark: 0,
806            config,
807        }
808    }
809
810    /// Acquire an object from the pool.
811    ///
812    /// Returns a handle that automatically returns the object when dropped.
813    #[inline]
814    pub fn acquire(&mut self) -> Option<PoolHandle<'_, T>> {
815        let index = self.free_indices.pop()?;
816        let obj = &mut self.objects[index];
817
818        // Ensure object is clean
819        if !obj.is_clean() {
820            obj.reset();
821        }
822
823        // Track high water mark
824        let active = self.capacity - self.free_indices.len();
825        if active > self.high_water_mark {
826            self.high_water_mark = active;
827        }
828
829        RECYCLE_STATS.record_acquire();
830
831        Some(PoolHandle {
832            pool: self,
833            index,
834            released: false,
835            _marker: std::marker::PhantomData,
836        })
837    }
838
839    /// Try to acquire without blocking.
840    #[inline]
841    pub fn try_acquire(&mut self) -> Option<PoolHandle<'_, T>> {
842        self.acquire()
843    }
844
845    /// Release an object back to the pool.
846    #[inline]
847    fn release(&mut self, index: usize) {
848        if index < self.capacity {
849            // Reset object for reuse
850            self.objects[index].reset();
851            self.free_indices.push(index);
852            RECYCLE_STATS.record_release();
853        }
854    }
855
856    /// Get object by index (for internal use).
857    #[inline]
858    pub fn get(&self, index: usize) -> Option<&T> {
859        self.objects.get(index)
860    }
861
862    /// Get mutable object by index.
863    #[inline]
864    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
865        self.objects.get_mut(index)
866    }
867
868    /// Get pool capacity.
869    #[inline]
870    pub fn capacity(&self) -> usize {
871        self.capacity
872    }
873
874    /// Get available count.
875    #[inline]
876    pub fn available(&self) -> usize {
877        self.free_indices.len()
878    }
879
880    /// Get active count.
881    #[inline]
882    pub fn active(&self) -> usize {
883        self.capacity - self.free_indices.len()
884    }
885
886    /// Get high water mark.
887    #[inline]
888    pub fn high_water_mark(&self) -> usize {
889        self.high_water_mark
890    }
891
892    /// Reset high water mark.
893    #[inline]
894    pub fn reset_high_water_mark(&mut self) {
895        self.high_water_mark = self.active();
896    }
897
898    /// Check if pool is exhausted.
899    #[inline]
900    pub fn is_exhausted(&self) -> bool {
901        self.free_indices.is_empty()
902    }
903
904    /// Shrink pool to minimum size.
905    pub fn shrink(&mut self, min_capacity: usize) {
906        let target = min_capacity.max(self.active());
907        if target < self.capacity {
908            // Only shrink if we have excess free slots
909            while self.capacity > target && !self.free_indices.is_empty() {
910                if let Some(index) = self.free_indices.pop() {
911                    // Mark as removed (but keep in vec to avoid reindexing)
912                    self.objects[index].reset();
913                }
914                self.capacity -= 1;
915            }
916        }
917    }
918
919    /// Grow pool capacity.
920    pub fn grow(&mut self, additional: usize) {
921        let new_capacity = self.capacity + additional;
922        self.objects.reserve(additional);
923
924        for _ in 0..additional {
925            let index = self.objects.len();
926            self.objects.push(T::default());
927            self.free_indices.push(index);
928        }
929
930        self.capacity = new_capacity;
931    }
932}
933
/// Pool handle - RAII wrapper for acquired objects.
///
/// Returns its slot to the pool when dropped, unless it was already released.
#[derive(Debug)]
pub struct PoolHandle<'a, T: Recyclable + Default> {
    /// Raw pointer to the owning pool
    pool: *mut RecyclePool<T>,
    /// Index of the acquired object inside the pool
    index: usize,
    /// Set once the slot has been handed back, so `Drop` does not release it again
    released: bool,
    /// Ties the handle to the lifetime `'a`
    _marker: std::marker::PhantomData<&'a mut T>,
}
942
943impl<'a, T: Recyclable + Default> PoolHandle<'a, T> {
944    /// Get reference to the object.
945    #[inline]
946    pub fn get(&self) -> &T {
947        unsafe { &(&(*self.pool).objects)[self.index] }
948    }
949
950    /// Get mutable reference.
951    #[inline]
952    pub fn get_mut(&mut self) -> &mut T {
953        unsafe { &mut (&mut (*self.pool).objects)[self.index] }
954    }
955
956    /// Get the pool index.
957    #[inline]
958    pub fn index(&self) -> usize {
959        self.index
960    }
961
962    /// Get the generation of the object.
963    #[inline]
964    pub fn generation(&self) -> u64 {
965        self.get().generation()
966    }
967
968    /// Release back to pool explicitly.
969    #[inline]
970    pub fn release(mut self) {
971        if !self.released {
972            unsafe { (*self.pool).release(self.index) };
973            self.released = true;
974        }
975    }
976}
977
978impl<'a, T: Recyclable + Default> Drop for PoolHandle<'a, T> {
979    fn drop(&mut self) {
980        if !self.released {
981            unsafe { (*self.pool).release(self.index) };
982        }
983    }
984}
985
986impl<'a, T: Recyclable + Default> std::ops::Deref for PoolHandle<'a, T> {
987    type Target = T;
988
989    #[inline]
990    fn deref(&self) -> &Self::Target {
991        self.get()
992    }
993}
994
995impl<'a, T: Recyclable + Default> std::ops::DerefMut for PoolHandle<'a, T> {
996    #[inline]
997    fn deref_mut(&mut self) -> &mut Self::Target {
998        self.get_mut()
999    }
1000}
1001
1002// ============================================================================
1003// Recycle Pool Configuration
1004// ============================================================================
1005
/// Configuration for recycle pools.
///
/// Built fluently: start from [`RecyclePoolConfig::new`] (identical to
/// `Default`) and chain the setter named after each field.
#[derive(Debug, Clone)]
pub struct RecyclePoolConfig {
    /// Initial capacity
    pub initial_capacity: usize,
    /// Maximum capacity (0 = unlimited)
    pub max_capacity: usize,
    /// Grow by this amount when exhausted
    pub grow_by: usize,
    /// Shrink when usage drops below this fraction
    pub shrink_threshold: f32,
    /// Minimum capacity to maintain
    pub min_capacity: usize,
}

impl Default for RecyclePoolConfig {
    /// Defaults: 100 initial slots, at most 10000, grow by 50, shrink
    /// threshold 0.25, minimum capacity 10.
    fn default() -> Self {
        RecyclePoolConfig {
            initial_capacity: 100,
            max_capacity: 10_000,
            grow_by: 50,
            shrink_threshold: 0.25,
            min_capacity: 10,
        }
    }
}

impl RecyclePoolConfig {
    /// Create a configuration holding the default values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Return the configuration with `initial_capacity` replaced.
    pub fn initial_capacity(self, capacity: usize) -> Self {
        Self {
            initial_capacity: capacity,
            ..self
        }
    }

    /// Return the configuration with `max_capacity` replaced.
    pub fn max_capacity(self, max: usize) -> Self {
        Self {
            max_capacity: max,
            ..self
        }
    }

    /// Return the configuration with `grow_by` replaced.
    pub fn grow_by(self, amount: usize) -> Self {
        Self {
            grow_by: amount,
            ..self
        }
    }

    /// Return the configuration with `shrink_threshold` replaced.
    pub fn shrink_threshold(self, threshold: f32) -> Self {
        Self {
            shrink_threshold: threshold,
            ..self
        }
    }

    /// Return the configuration with `min_capacity` replaced.
    pub fn min_capacity(self, min: usize) -> Self {
        Self {
            min_capacity: min,
            ..self
        }
    }
}
1069
1070// ============================================================================
1071// Recycling Statistics
1072// ============================================================================
1073
/// Statistics for object recycling.
///
/// Four independent monotonically increasing counters. Every update and
/// read uses `Ordering::Relaxed`, so values read together are not a
/// consistent snapshot of each other.
#[derive(Debug, Default)]
pub struct RecycleStats {
    /// Total acquires
    acquires: AtomicU64,
    /// Total releases
    releases: AtomicU64,
    /// Total recycles (resets)
    recycles: AtomicU64,
    /// Total allocations (when pool exhausted)
    allocations: AtomicU64,
}

impl RecycleStats {
    /// Create a stats block with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Add one to `counter`.
    #[inline]
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Read the current value of `counter`.
    #[inline]
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    #[inline]
    fn record_acquire(&self) {
        Self::bump(&self.acquires);
    }

    #[inline]
    fn record_release(&self) {
        Self::bump(&self.releases);
    }

    #[inline]
    fn record_recycle(&self) {
        Self::bump(&self.recycles);
    }

    #[inline]
    #[allow(dead_code)] // Reserved for future pool allocation tracking
    fn record_allocation(&self) {
        Self::bump(&self.allocations);
    }

    /// Number of acquires recorded so far.
    pub fn acquires(&self) -> u64 {
        Self::read(&self.acquires)
    }

    /// Number of releases recorded so far.
    pub fn releases(&self) -> u64 {
        Self::read(&self.releases)
    }

    /// Number of recycles recorded so far.
    pub fn recycles(&self) -> u64 {
        Self::read(&self.recycles)
    }

    /// Number of allocations recorded so far.
    pub fn allocations(&self) -> u64 {
        Self::read(&self.allocations)
    }

    /// Get recycle ratio (recycles / acquires).
    ///
    /// Returns `0.0` while no acquire has been recorded.
    pub fn recycle_ratio(&self) -> f64 {
        match self.acquires() {
            0 => 0.0,
            total => self.recycles() as f64 / total as f64,
        }
    }

    /// Get hit ratio (1 - allocations/acquires).
    ///
    /// Returns `1.0` while no acquire has been recorded.
    pub fn hit_ratio(&self) -> f64 {
        match self.acquires() {
            0 => 1.0,
            total => 1.0 - (self.allocations() as f64 / total as f64),
        }
    }
}
1154
/// Global recycle statistics.
///
/// Spelled out field by field (rather than through `RecycleStats::new`)
/// because a `static` initializer must be a constant expression.
static RECYCLE_STATS: RecycleStats = RecycleStats {
    acquires: AtomicU64::new(0),
    releases: AtomicU64::new(0),
    recycles: AtomicU64::new(0),
    allocations: AtomicU64::new(0),
};

/// Get global recycle statistics.
///
/// Returns a shared reference to the process-wide [`RecycleStats`] static.
pub fn recycle_stats() -> &'static RecycleStats {
    &RECYCLE_STATS
}
1167
1168// ============================================================================
1169// Connection Recycler
1170// ============================================================================
1171
1172/// Specialized connection recycler with optimizations.
1173pub struct ConnectionRecycler {
1174    /// Pool of recyclable connections
1175    pool: RecyclePool<RecyclableConnection>,
1176    /// Configuration
1177    config: ConnectionConfig,
1178}
1179
1180impl ConnectionRecycler {
1181    /// Create a new connection recycler.
1182    pub fn new(capacity: usize) -> Self {
1183        Self::with_config(capacity, ConnectionConfig::default())
1184    }
1185
1186    /// Create with custom configuration.
1187    pub fn with_config(capacity: usize, config: ConnectionConfig) -> Self {
1188        let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(capacity);
1189
1190        // Pre-configure all connections
1191        for i in 0..capacity {
1192            if let Some(conn) = pool.get_mut(i) {
1193                conn.inner_mut().set_keep_alive(config.keep_alive);
1194            }
1195        }
1196
1197        Self { pool, config }
1198    }
1199
1200    /// Acquire a connection.
1201    #[inline]
1202    pub fn acquire(&mut self) -> Option<PoolHandle<'_, RecyclableConnection>> {
1203        let handle = self.pool.acquire()?;
1204        Some(handle)
1205    }
1206
1207    /// Get pool statistics.
1208    pub fn stats(&self) -> RecyclerStats {
1209        RecyclerStats {
1210            capacity: self.pool.capacity(),
1211            available: self.pool.available(),
1212            active: self.pool.active(),
1213            high_water_mark: self.pool.high_water_mark(),
1214        }
1215    }
1216
1217    /// Get configuration.
1218    pub fn config(&self) -> &ConnectionConfig {
1219        &self.config
1220    }
1221
1222    /// Check if a connection should be recycled based on limits.
1223    pub fn should_recycle(&self, conn: &RecyclableConnection) -> bool {
1224        // Check max requests
1225        if conn.inner().request_count() >= self.config.max_requests {
1226            return false;
1227        }
1228
1229        // Check keep-alive
1230        if !conn.inner().keep_alive() {
1231            return false;
1232        }
1233
1234        // Check age
1235        if let Some(age) = conn.inner().age() {
1236            if age > self.config.idle_timeout * 10 {
1237                return false;
1238            }
1239        }
1240
1241        true
1242    }
1243}
1244
/// Connection recycler statistics.
///
/// Plain `Copy` value filled in by `ConnectionRecycler::stats` from the
/// pool's accessors of the same names; it does not track the pool after it
/// has been returned.
#[derive(Debug, Clone, Copy)]
pub struct RecyclerStats {
    /// Pool capacity
    pub capacity: usize,
    /// Available connections
    pub available: usize,
    /// Active connections
    pub active: usize,
    /// High water mark
    pub high_water_mark: usize,
}
1257
1258// ============================================================================
1259// Transition Error
1260// ============================================================================
1261
1262/// Error during state transition.
1263#[derive(Debug, Clone)]
1264pub enum TransitionError {
1265    /// Invalid transition for current state
1266    InvalidTransition {
1267        from: ConnectionState,
1268        event: ConnectionEvent,
1269    },
1270    /// Connection already closed
1271    AlreadyClosed,
1272}
1273
1274impl std::fmt::Display for TransitionError {
1275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1276        match self {
1277            Self::InvalidTransition { from, event } => {
1278                write!(f, "Invalid transition: {:?} + {:?}", from, event)
1279            }
1280            Self::AlreadyClosed => write!(f, "Connection already closed"),
1281        }
1282    }
1283}
1284
1285impl std::error::Error for TransitionError {}
1286
1287// ============================================================================
1288// Connection Pool
1289// ============================================================================
1290
1291/// Pre-allocated connection pool for reduced allocation overhead.
1292#[derive(Debug)]
1293pub struct ConnectionPool {
1294    /// Pooled connections
1295    connections: Vec<Connection>,
1296    /// Free list (indices)
1297    free_indices: Vec<usize>,
1298    /// Pool capacity
1299    capacity: usize,
1300}
1301
1302impl ConnectionPool {
1303    /// Create a new pool with capacity.
1304    pub fn new(capacity: usize) -> Self {
1305        let mut connections = Vec::with_capacity(capacity);
1306        let mut free_indices = Vec::with_capacity(capacity);
1307
1308        for i in 0..capacity {
1309            connections.push(Connection::with_id(i as u64));
1310            free_indices.push(i);
1311        }
1312
1313        Self {
1314            connections,
1315            free_indices,
1316            capacity,
1317        }
1318    }
1319
1320    /// Acquire a connection from the pool.
1321    #[inline]
1322    pub fn acquire(&mut self) -> Option<&mut Connection> {
1323        let index = self.free_indices.pop()?;
1324        let conn = &mut self.connections[index];
1325        conn.state = ConnectionState::Idle;
1326        CONNECTION_STATS.record_pool_acquire();
1327        Some(conn)
1328    }
1329
1330    /// Release a connection back to the pool.
1331    #[inline]
1332    pub fn release(&mut self, id: u64) {
1333        let index = id as usize;
1334        if index < self.capacity {
1335            self.connections[index].force_close();
1336            self.free_indices.push(index);
1337            CONNECTION_STATS.record_pool_release();
1338        }
1339    }
1340
1341    /// Get connection by ID.
1342    #[inline]
1343    pub fn get(&self, id: u64) -> Option<&Connection> {
1344        let index = id as usize;
1345        if index < self.capacity {
1346            Some(&self.connections[index])
1347        } else {
1348            None
1349        }
1350    }
1351
1352    /// Get mutable connection by ID.
1353    #[inline]
1354    pub fn get_mut(&mut self, id: u64) -> Option<&mut Connection> {
1355        let index = id as usize;
1356        if index < self.capacity {
1357            Some(&mut self.connections[index])
1358        } else {
1359            None
1360        }
1361    }
1362
1363    /// Get pool capacity.
1364    #[inline]
1365    pub fn capacity(&self) -> usize {
1366        self.capacity
1367    }
1368
1369    /// Get number of available connections.
1370    #[inline]
1371    pub fn available(&self) -> usize {
1372        self.free_indices.len()
1373    }
1374
1375    /// Get number of active connections.
1376    #[inline]
1377    pub fn active(&self) -> usize {
1378        self.capacity - self.free_indices.len()
1379    }
1380}
1381
1382// ============================================================================
1383// Connection Configuration
1384// ============================================================================
1385
/// Configuration for connection handling.
///
/// Built fluently: start from [`ConnectionConfig::new`] (identical to
/// `Default`) and chain the setter named after each field.
#[derive(Debug, Clone)]
pub struct ConnectionConfig {
    /// Idle timeout for keep-alive connections
    pub idle_timeout: Duration,
    /// Maximum requests per connection
    pub max_requests: u32,
    /// Keep-alive enabled by default
    pub keep_alive: bool,
    /// Read timeout
    pub read_timeout: Duration,
    /// Write timeout
    pub write_timeout: Duration,
}

impl Default for ConnectionConfig {
    /// Defaults: 60 s idle timeout, 1000 requests per connection, keep-alive
    /// on, 30 s read and write timeouts.
    fn default() -> Self {
        let io_timeout = Duration::from_secs(30);
        ConnectionConfig {
            idle_timeout: Duration::from_secs(60),
            max_requests: 1000,
            keep_alive: true,
            read_timeout: io_timeout,
            write_timeout: io_timeout,
        }
    }
}

impl ConnectionConfig {
    /// Create a configuration holding the default values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Return the configuration with `idle_timeout` replaced.
    pub fn idle_timeout(self, timeout: Duration) -> Self {
        Self {
            idle_timeout: timeout,
            ..self
        }
    }

    /// Return the configuration with `max_requests` replaced.
    pub fn max_requests(self, max: u32) -> Self {
        Self {
            max_requests: max,
            ..self
        }
    }

    /// Return the configuration with keep-alive switched on or off.
    pub fn keep_alive(self, enabled: bool) -> Self {
        Self {
            keep_alive: enabled,
            ..self
        }
    }

    /// Return the configuration with `read_timeout` replaced.
    pub fn read_timeout(self, timeout: Duration) -> Self {
        Self {
            read_timeout: timeout,
            ..self
        }
    }

    /// Return the configuration with `write_timeout` replaced.
    pub fn write_timeout(self, timeout: Duration) -> Self {
        Self {
            write_timeout: timeout,
            ..self
        }
    }
}
1449
1450// ============================================================================
1451// Statistics
1452// ============================================================================
1453
/// Connection statistics.
///
/// A set of independent counters. Every update and read uses
/// `Ordering::Relaxed`, so values read together are not a consistent
/// snapshot of each other.
#[derive(Debug, Default)]
pub struct ConnectionStats {
    /// Total connections established
    connected: AtomicU64,
    /// Total connections closed
    closed: AtomicU64,
    /// Total connection errors
    errors: AtomicU64,
    /// Keep-alive reuses
    reuses: AtomicU64,
    /// Pool acquires
    pool_acquires: AtomicU64,
    /// Pool releases
    pool_releases: AtomicU64,
    /// State transitions
    #[allow(dead_code)] // Reserved for future state machine telemetry
    transitions: AtomicU64,
}

impl ConnectionStats {
    /// Create a stats block with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Add one to `counter`.
    #[inline]
    fn incr(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Read the current value of `counter`.
    #[inline]
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    #[inline]
    fn record_connected(&self) {
        Self::incr(&self.connected);
    }

    #[inline]
    fn record_closed(&self) {
        Self::incr(&self.closed);
    }

    #[inline]
    fn record_error(&self) {
        Self::incr(&self.errors);
    }

    #[inline]
    fn record_reuse(&self) {
        Self::incr(&self.reuses);
    }

    #[inline]
    fn record_pool_acquire(&self) {
        Self::incr(&self.pool_acquires);
    }

    #[inline]
    fn record_pool_release(&self) {
        Self::incr(&self.pool_releases);
    }

    /// Total number of connections established so far.
    pub fn connected(&self) -> u64 {
        Self::snapshot(&self.connected)
    }

    /// Total number of connections closed so far.
    pub fn closed(&self) -> u64 {
        Self::snapshot(&self.closed)
    }

    /// Total number of connection errors so far.
    pub fn errors(&self) -> u64 {
        Self::snapshot(&self.errors)
    }

    /// Total number of keep-alive reuses so far.
    pub fn reuses(&self) -> u64 {
        Self::snapshot(&self.reuses)
    }

    /// Get currently active connections.
    ///
    /// Computed as established minus closed, clamped at zero. The two
    /// counters are read one after the other, so the result is approximate
    /// while other threads are updating them.
    pub fn active(&self) -> u64 {
        let opened = self.connected();
        let finished = self.closed();
        opened.saturating_sub(finished)
    }

    /// Total number of pool acquires so far.
    pub fn pool_acquires(&self) -> u64 {
        Self::snapshot(&self.pool_acquires)
    }

    /// Total number of pool releases so far.
    pub fn pool_releases(&self) -> u64 {
        Self::snapshot(&self.pool_releases)
    }
}
1547
/// Global connection ID counter.
///
/// Starts at zero.
// NOTE(review): presumably incremented when a connection is created without
// an explicit id — the user is outside this part of the file; confirm.
static CONNECTION_ID_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Global connection statistics.
///
/// Spelled out field by field (rather than through `ConnectionStats::new`)
/// because a `static` initializer must be a constant expression.
static CONNECTION_STATS: ConnectionStats = ConnectionStats {
    connected: AtomicU64::new(0),
    closed: AtomicU64::new(0),
    errors: AtomicU64::new(0),
    reuses: AtomicU64::new(0),
    pool_acquires: AtomicU64::new(0),
    pool_releases: AtomicU64::new(0),
    transitions: AtomicU64::new(0),
};

/// Get global connection statistics.
///
/// Returns a shared reference to the process-wide [`ConnectionStats`] static.
pub fn connection_stats() -> &'static ConnectionStats {
    &CONNECTION_STATS
}
1566
1567// ============================================================================
1568// State Machine Executor
1569// ============================================================================
1570
1571/// State machine executor with event batching.
1572///
1573/// Processes multiple events efficiently with minimal overhead.
1574pub struct StateMachineExecutor {
1575    /// Pending events
1576    events: Vec<(u64, ConnectionEvent)>,
1577    /// Batch size
1578    #[allow(dead_code)] // Reserved for batch processing limits
1579    batch_size: usize,
1580}
1581
1582impl StateMachineExecutor {
1583    /// Create new executor.
1584    pub fn new(batch_size: usize) -> Self {
1585        Self {
1586            events: Vec::with_capacity(batch_size),
1587            batch_size,
1588        }
1589    }
1590
1591    /// Queue an event for processing.
1592    #[inline]
1593    pub fn queue(&mut self, conn_id: u64, event: ConnectionEvent) {
1594        self.events.push((conn_id, event));
1595    }
1596
1597    /// Process all queued events against a connection pool.
1598    #[inline]
1599    pub fn process(&mut self, pool: &mut ConnectionPool) -> Vec<(u64, TransitionAction)> {
1600        let mut results = Vec::with_capacity(self.events.len());
1601
1602        for (conn_id, event) in self.events.drain(..) {
1603            if let Some(conn) = pool.get_mut(conn_id) {
1604                let action = conn.handle_event(event);
1605                results.push((conn_id, action));
1606            }
1607        }
1608
1609        results
1610    }
1611
1612    /// Process events with a callback for each action.
1613    #[inline]
1614    pub fn process_with<F>(&mut self, pool: &mut ConnectionPool, mut callback: F)
1615    where
1616        F: FnMut(u64, TransitionAction),
1617    {
1618        for (conn_id, event) in self.events.drain(..) {
1619            if let Some(conn) = pool.get_mut(conn_id) {
1620                let action = conn.handle_event(event);
1621                callback(conn_id, action);
1622            }
1623        }
1624    }
1625
1626    /// Check if executor has pending events.
1627    #[inline]
1628    pub fn has_pending(&self) -> bool {
1629        !self.events.is_empty()
1630    }
1631
1632    /// Get pending event count.
1633    #[inline]
1634    pub fn pending_count(&self) -> usize {
1635        self.events.len()
1636    }
1637
1638    /// Clear pending events.
1639    #[inline]
1640    pub fn clear(&mut self) {
1641        self.events.clear();
1642    }
1643}
1644
1645impl Default for StateMachineExecutor {
1646    fn default() -> Self {
1647        Self::new(64)
1648    }
1649}
1650
1651// ============================================================================
1652// Tests
1653// ============================================================================
1654
1655#[cfg(test)]
1656mod tests {
1657    use super::*;
1658
1659    #[test]
1660    fn test_connection_state_size() {
1661        // Verify state is 1 byte
1662        assert_eq!(std::mem::size_of::<ConnectionState>(), 1);
1663    }
1664
1665    #[test]
1666    fn test_connection_event_size() {
1667        // Verify event is 1 byte
1668        assert_eq!(std::mem::size_of::<ConnectionEvent>(), 1);
1669    }
1670
1671    #[test]
1672    fn test_transition_entry_size() {
1673        // Verify entry is 2 bytes
1674        assert_eq!(std::mem::size_of::<TransitionEntry>(), 2);
1675    }
1676
1677    #[test]
1678    fn test_basic_state_transitions() {
1679        let mut conn = Connection::new();
1680        assert_eq!(conn.state(), ConnectionState::Idle);
1681
1682        // Accept -> Connected
1683        let action = conn.handle_event(ConnectionEvent::Accept);
1684        assert_eq!(conn.state(), ConnectionState::Connected);
1685        assert_eq!(action, TransitionAction::StartRead);
1686
1687        // DataReady -> Reading
1688        let action = conn.handle_event(ConnectionEvent::DataReady);
1689        assert_eq!(conn.state(), ConnectionState::Reading);
1690        assert_eq!(action, TransitionAction::ContinueRead);
1691
1692        // RequestComplete -> Processing
1693        let action = conn.handle_event(ConnectionEvent::RequestComplete);
1694        assert_eq!(conn.state(), ConnectionState::Processing);
1695        assert_eq!(action, TransitionAction::Dispatch);
1696
1697        // ResponseReady -> Writing
1698        let action = conn.handle_event(ConnectionEvent::ResponseReady);
1699        assert_eq!(conn.state(), ConnectionState::Writing);
1700        assert_eq!(action, TransitionAction::StartWrite);
1701
1702        // WriteComplete -> Closing
1703        let action = conn.handle_event(ConnectionEvent::WriteComplete);
1704        assert_eq!(conn.state(), ConnectionState::Closing);
1705        assert_eq!(action, TransitionAction::InitiateClose);
1706    }
1707
1708    #[test]
1709    fn test_keep_alive_transition() {
1710        let mut conn = Connection::new();
1711        conn.handle_event(ConnectionEvent::Accept);
1712        conn.handle_event(ConnectionEvent::DataReady);
1713        conn.handle_event(ConnectionEvent::RequestComplete);
1714        conn.handle_event(ConnectionEvent::ResponseReady);
1715
1716        // KeepAlive -> back to Connected
1717        let action = conn.handle_event(ConnectionEvent::KeepAlive);
1718        assert_eq!(conn.state(), ConnectionState::Connected);
1719        assert_eq!(action, TransitionAction::Reset);
1720    }
1721
1722    #[test]
1723    fn test_error_transition() {
1724        let mut conn = Connection::new();
1725        conn.handle_event(ConnectionEvent::Accept);
1726
1727        // Error -> Error state
1728        let action = conn.handle_event(ConnectionEvent::Error);
1729        assert_eq!(conn.state(), ConnectionState::Error);
1730        assert_eq!(action, TransitionAction::ForceClose);
1731        assert!(conn.state().is_terminal());
1732    }
1733
1734    #[test]
1735    fn test_timeout_transition() {
1736        let mut conn = Connection::new();
1737        conn.handle_event(ConnectionEvent::Accept);
1738
1739        // Timeout -> Closing
1740        let action = conn.handle_event(ConnectionEvent::Timeout);
1741        assert_eq!(conn.state(), ConnectionState::Closing);
1742        assert_eq!(action, TransitionAction::InitiateClose);
1743    }
1744
1745    #[test]
1746    fn test_connection_pool() {
1747        let mut pool = ConnectionPool::new(10);
1748
1749        assert_eq!(pool.capacity(), 10);
1750        assert_eq!(pool.available(), 10);
1751        assert_eq!(pool.active(), 0);
1752
1753        // Acquire connections
1754        let conn1 = pool.acquire().unwrap();
1755        let id1 = conn1.id();
1756        assert_eq!(pool.available(), 9);
1757        assert_eq!(pool.active(), 1);
1758
1759        let conn2 = pool.acquire().unwrap();
1760        let id2 = conn2.id();
1761        assert_eq!(pool.available(), 8);
1762
1763        // Release
1764        pool.release(id1);
1765        assert_eq!(pool.available(), 9);
1766
1767        pool.release(id2);
1768        assert_eq!(pool.available(), 10);
1769    }
1770
1771    #[test]
1772    fn test_connection_config() {
1773        let config = ConnectionConfig::new()
1774            .idle_timeout(Duration::from_secs(120))
1775            .max_requests(500)
1776            .keep_alive(false)
1777            .read_timeout(Duration::from_secs(10))
1778            .write_timeout(Duration::from_secs(10));
1779
1780        assert_eq!(config.idle_timeout, Duration::from_secs(120));
1781        assert_eq!(config.max_requests, 500);
1782        assert!(!config.keep_alive);
1783    }
1784
1785    #[test]
1786    fn test_state_machine_executor() {
1787        let mut pool = ConnectionPool::new(5);
1788        let mut executor = StateMachineExecutor::new(10);
1789
1790        // Acquire a connection
1791        let conn = pool.acquire().unwrap();
1792        let conn_id = conn.id();
1793
1794        // Queue events
1795        executor.queue(conn_id, ConnectionEvent::Accept);
1796        executor.queue(conn_id, ConnectionEvent::DataReady);
1797        executor.queue(conn_id, ConnectionEvent::RequestComplete);
1798
1799        assert!(executor.has_pending());
1800        assert_eq!(executor.pending_count(), 3);
1801
1802        // Process
1803        let results = executor.process(&mut pool);
1804        assert_eq!(results.len(), 3);
1805        assert!(!executor.has_pending());
1806
1807        // Verify final state
1808        let conn = pool.get(conn_id).unwrap();
1809        assert_eq!(conn.state(), ConnectionState::Processing);
1810    }
1811
1812    #[test]
1813    fn test_request_count() {
1814        let mut conn = Connection::new();
1815        assert_eq!(conn.request_count(), 0);
1816
1817        // Process a request
1818        conn.handle_event(ConnectionEvent::Accept);
1819        conn.handle_event(ConnectionEvent::DataReady);
1820        conn.handle_event(ConnectionEvent::RequestComplete); // Increments here
1821        assert_eq!(conn.request_count(), 1);
1822
1823        conn.handle_event(ConnectionEvent::ResponseReady);
1824        conn.handle_event(ConnectionEvent::KeepAlive);
1825
1826        // Second request
1827        conn.handle_event(ConnectionEvent::DataReady);
1828        conn.handle_event(ConnectionEvent::RequestComplete);
1829        assert_eq!(conn.request_count(), 2);
1830    }
1831
1832    #[test]
1833    fn test_try_transition_valid() {
1834        let mut conn = Connection::new();
1835        let result = conn.try_transition(ConnectionEvent::Accept);
1836        assert!(result.is_ok());
1837        assert_eq!(result.unwrap(), TransitionAction::StartRead);
1838    }
1839
1840    #[test]
1841    fn test_try_transition_invalid() {
1842        let mut conn = Connection::new();
1843        // DataReady in Idle state is a no-op
1844        let result = conn.try_transition(ConnectionEvent::DataReady);
1845        assert!(result.is_err());
1846    }
1847
1848    #[test]
1849    fn test_connection_stats() {
1850        let stats = connection_stats();
1851        let _ = stats.connected();
1852        let _ = stats.closed();
1853        let _ = stats.errors();
1854        let _ = stats.active();
1855        let _ = stats.reuses();
1856    }
1857
1858    #[test]
1859    fn test_state_is_terminal() {
1860        assert!(ConnectionState::Closed.is_terminal());
1861        assert!(ConnectionState::Error.is_terminal());
1862        assert!(!ConnectionState::Connected.is_terminal());
1863        assert!(!ConnectionState::Processing.is_terminal());
1864    }
1865
1866    #[test]
1867    fn test_state_is_active() {
1868        assert!(ConnectionState::Reading.is_active());
1869        assert!(ConnectionState::Processing.is_active());
1870        assert!(ConnectionState::Writing.is_active());
1871        assert!(!ConnectionState::Idle.is_active());
1872        assert!(!ConnectionState::Connected.is_active());
1873    }
1874
1875    // Recycling tests
1876
1877    #[test]
1878    fn test_recyclable_connection_basic() {
1879        let mut conn = RecyclableConnection::new();
1880        assert_eq!(conn.generation(), 0);
1881        assert_eq!(conn.recycle_count(), 0);
1882        assert!(conn.is_clean());
1883
1884        // Use the connection
1885        conn.handle_event(ConnectionEvent::Accept);
1886        conn.handle_event(ConnectionEvent::DataReady);
1887
1888        // Prepare for recycle
1889        conn.prepare_for_recycle();
1890        assert_eq!(conn.generation(), 1);
1891        assert_eq!(conn.recycle_count(), 1);
1892        assert!(conn.is_clean());
1893        assert_eq!(conn.state(), ConnectionState::Idle);
1894    }
1895
1896    #[test]
1897    fn test_recyclable_connection_user_data() {
1898        let mut conn = RecyclableConnection::new();
1899
1900        // Set user data
1901        conn.set_user_data(42u32);
1902        assert_eq!(conn.user_data::<u32>(), Some(&42));
1903
1904        // Take user data
1905        let data = conn.take_user_data::<u32>();
1906        assert_eq!(data, Some(42));
1907        assert!(conn.user_data::<u32>().is_none());
1908    }
1909
1910    #[test]
1911    fn test_recyclable_connection_buffer_capacities() {
1912        let conn = RecyclableConnection::with_capacities(4096, 8192);
1913        assert_eq!(conn.read_buffer_capacity(), 4096);
1914        assert_eq!(conn.write_buffer_capacity(), 8192);
1915    }
1916
1917    #[test]
1918    fn test_recycle_pool_basic() {
1919        let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(5);
1920
1921        assert_eq!(pool.capacity(), 5);
1922        assert_eq!(pool.available(), 5);
1923        assert_eq!(pool.active(), 0);
1924        assert!(!pool.is_exhausted());
1925
1926        // Acquire and use
1927        {
1928            let mut handle = pool.acquire().unwrap();
1929            handle.handle_event(ConnectionEvent::Accept);
1930            assert_eq!(handle.state(), ConnectionState::Connected);
1931            // Note: cannot check pool state while handle is borrowed
1932        }
1933
1934        // Auto-released when handle dropped
1935        assert_eq!(pool.available(), 5);
1936        assert_eq!(pool.active(), 0);
1937    }
1938
/// Acquires and releases a slot twice on a two-slot pool and checks that the
/// same index is handed out both times and the pool never reports itself
/// exhausted. Only one handle is alive at a time in this test.
#[test]
fn test_recycle_pool_exhaustion() {
    let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(2);

    // Starting point: two slots, both free.
    assert_eq!(pool.capacity(), 2);
    assert_eq!(pool.available(), 2);
    assert!(!pool.is_exhausted());

    // First round trip: the slot handed out is expected to be the last
    // index pushed.
    let first = pool.acquire().unwrap();
    assert_eq!(first.index(), 1);
    drop(first);
    assert_eq!(pool.available(), 2);

    // Second round trip: the same slot should come back (LIFO reuse).
    let second = pool.acquire().unwrap();
    assert_eq!(second.index(), 1);
    drop(second);

    // With every handle released, the pool is not exhausted.
    assert!(!pool.is_exhausted());
}
1964
/// Acquires the only slot of a one-slot pool twice and checks that the
/// generation observed on the second acquisition is strictly greater.
#[test]
fn test_recycle_pool_generation_tracking() {
    let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(1);

    // Each temporary handle is released at the end of its statement, so the
    // second acquisition sees the slot after one release.
    let first_generation = pool.acquire().unwrap().generation();
    let second_generation = pool.acquire().unwrap().generation();

    // The generation is expected to advance when a slot is recycled.
    assert!(second_generation > first_generation);
}
1982
/// Exercises high-water-mark tracking on a five-slot pool: after some
/// acquire/release traffic the mark must be at least 1, and resetting it
/// must never raise it.
#[test]
fn test_recycle_pool_high_water_mark() {
    let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(5);

    // Acquire and release 3 connections one at a time. Each handle is dropped
    // at the end of its iteration, before the next acquire.
    for _ in 0..3 {
        let _handle = pool.acquire().unwrap();
    }
    assert!(pool.high_water_mark() >= 1);

    // Remember the mark reached so far so it can be compared after the reset.
    let hwm_before = pool.high_water_mark();

    // Resetting must not push the mark above its previous value.
    pool.reset_high_water_mark();
    assert!(pool.high_water_mark() <= hwm_before);
}
2000
/// Grows a two-slot pool by three and checks that both the capacity and the
/// number of available slots become five.
#[test]
fn test_recycle_pool_grow() {
    let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(2);
    assert_eq!(pool.capacity(), 2);

    pool.grow(3);

    // (capacity, available) after growth.
    let after_growth = (pool.capacity(), pool.available());
    assert_eq!(after_growth, (5, 5));
}
2012
/// Builds a `RecyclePoolConfig` through its builder methods and checks that
/// every setting lands in the corresponding field.
#[test]
fn test_recycle_pool_config() {
    // Apply the settings step by step, in the same order as a single chain.
    let config = RecyclePoolConfig::new();
    let config = config.initial_capacity(50).max_capacity(500);
    let config = config.grow_by(25).shrink_threshold(0.1).min_capacity(5);

    assert_eq!(config.initial_capacity, 50);
    assert_eq!(config.max_capacity, 500);
    assert_eq!(config.grow_by, 25);

    // Floating-point field: compare with a tolerance rather than exactly.
    let threshold_error = (config.shrink_threshold - 0.1).abs();
    assert!(threshold_error < 0.001);

    assert_eq!(config.min_capacity, 5);
}
2028
/// Checks `ConnectionRecycler` statistics before and after one connection is
/// acquired, used, and released.
#[test]
fn test_connection_recycler() {
    let mut recycler = ConnectionRecycler::new(10);

    // Before any acquisition: ten slots, all free.
    let initial = recycler.stats();
    assert_eq!(initial.capacity, 10);
    assert_eq!(initial.available, 10);
    assert_eq!(initial.active, 0);

    // Use one connection. Stats cannot be read while the handle is alive
    // because the handle borrows the recycler.
    let mut handle = recycler.acquire().unwrap();
    handle.handle_event(ConnectionEvent::Accept);
    drop(handle);

    // After the handle is dropped the slot is back in the pool.
    let released = recycler.stats();
    assert_eq!(released.active, 0);
    assert_eq!(released.available, 10);
}
2050
/// Smoke test: every accessor on the value returned by `recycle_stats()` can
/// be called. The returned values are discarded, not checked.
#[test]
fn test_recycle_stats() {
    let stats = recycle_stats();

    // Evaluated left to right, in the same order as individual calls.
    let _ = (
        stats.acquires(),
        stats.releases(),
        stats.recycles(),
        stats.allocations(),
        stats.recycle_ratio(),
        stats.hit_ratio(),
    );
}
2061
/// Exercises the recycling methods on `RecyclableConnection`: cleanliness,
/// generation counting, and `reset`.
#[test]
fn test_recyclable_trait() {
    let mut connection = RecyclableConnection::new();

    // A new connection is clean and starts at generation 0.
    assert!(connection.is_clean());
    assert_eq!(connection.generation(), 0);

    // Use the connection, then bump the generation by hand.
    connection.handle_event(ConnectionEvent::Accept);
    connection.increment_generation();
    assert_eq!(connection.generation(), 1);

    // After reset the connection is clean again, and the test expects the
    // generation to have advanced once more.
    connection.reset();
    assert!(connection.is_clean());
    assert_eq!(connection.generation(), 2);
}
2078}