1use std::sync::atomic::{AtomicU64, Ordering};
66use std::time::{Duration, Instant};
67
68#[repr(u8)]
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
78pub enum ConnectionState {
79 #[default]
81 Idle = 0,
82 Connected = 1,
84 Reading = 2,
86 Processing = 3,
88 Writing = 4,
90 Closing = 5,
92 Closed = 6,
94 Error = 7,
96}
97
98impl ConnectionState {
99 pub const COUNT: usize = 8;
101
102 #[inline(always)]
104 pub const fn from_u8(v: u8) -> Self {
105 match v {
107 0 => Self::Idle,
108 1 => Self::Connected,
109 2 => Self::Reading,
110 3 => Self::Processing,
111 4 => Self::Writing,
112 5 => Self::Closing,
113 6 => Self::Closed,
114 7 => Self::Error,
115 _ => Self::Error,
116 }
117 }
118
119 #[inline(always)]
121 pub const fn as_u8(self) -> u8 {
122 self as u8
123 }
124
125 #[inline(always)]
127 pub const fn is_terminal(self) -> bool {
128 matches!(self, Self::Closed | Self::Error)
129 }
130
131 #[inline(always)]
133 pub const fn can_accept_request(self) -> bool {
134 matches!(self, Self::Connected | Self::Idle)
135 }
136
137 #[inline(always)]
139 pub const fn is_active(self) -> bool {
140 matches!(self, Self::Reading | Self::Processing | Self::Writing)
141 }
142}
143
144#[repr(u8)]
150#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
151pub enum ConnectionEvent {
152 Accept = 0,
154 DataReady = 1,
156 RequestComplete = 2,
158 ResponseReady = 3,
160 WriteComplete = 4,
162 KeepAlive = 5,
164 Timeout = 6,
166 Error = 7,
168 Close = 8,
170}
171
172impl ConnectionEvent {
173 pub const COUNT: usize = 9;
175
176 #[inline(always)]
178 pub const fn as_u8(self) -> u8 {
179 self as u8
180 }
181}
182
183#[repr(u8)]
189#[derive(Debug, Clone, Copy, PartialEq, Eq)]
190pub enum TransitionAction {
191 None = 0,
193 StartRead = 1,
195 ContinueRead = 2,
197 Dispatch = 3,
199 StartWrite = 4,
201 ContinueWrite = 5,
203 Reset = 6,
205 InitiateClose = 7,
207 ForceClose = 8,
209 LogError = 9,
211}
212
213const TRANSITION_TABLE: [[TransitionEntry; ConnectionEvent::COUNT]; ConnectionState::COUNT] = {
224 use ConnectionState as S;
225 use TransitionAction as A;
226
227 const fn e(state: S, action: A) -> TransitionEntry {
229 TransitionEntry {
230 next_state: state.as_u8(),
231 action: action as u8,
232 }
233 }
234
235 const NOOP_IDLE: TransitionEntry = e(S::Idle, A::None);
237 const NOOP_CONN: TransitionEntry = e(S::Connected, A::None);
238 const NOOP_READ: TransitionEntry = e(S::Reading, A::None);
239 const NOOP_PROC: TransitionEntry = e(S::Processing, A::None);
240 const NOOP_WRIT: TransitionEntry = e(S::Writing, A::None);
241 const NOOP_CLOS: TransitionEntry = e(S::Closing, A::None);
242 const NOOP_CLSD: TransitionEntry = e(S::Closed, A::None);
243 const NOOP_ERR: TransitionEntry = e(S::Error, A::None);
244
245 const TO_ERROR: TransitionEntry = e(S::Error, A::ForceClose);
247 const TO_CLOSE: TransitionEntry = e(S::Closing, A::InitiateClose);
248 const TO_CLOSED: TransitionEntry = e(S::Closed, A::None);
249
250 [
251 [
254 e(S::Connected, A::StartRead), NOOP_IDLE, NOOP_IDLE, NOOP_IDLE, NOOP_IDLE, NOOP_IDLE, NOOP_IDLE, NOOP_IDLE, NOOP_IDLE, ],
264 [
266 NOOP_CONN, e(S::Reading, A::ContinueRead), NOOP_CONN, NOOP_CONN, NOOP_CONN, NOOP_CONN, TO_CLOSE, TO_ERROR, TO_CLOSE, ],
276 [
278 NOOP_READ, e(S::Reading, A::ContinueRead), e(S::Processing, A::Dispatch), NOOP_READ, NOOP_READ, NOOP_READ, TO_CLOSE, TO_ERROR, TO_CLOSE, ],
288 [
290 NOOP_PROC, NOOP_PROC, NOOP_PROC, e(S::Writing, A::StartWrite), NOOP_PROC, NOOP_PROC, TO_CLOSE, TO_ERROR, TO_CLOSE, ],
300 [
302 NOOP_WRIT, NOOP_WRIT, NOOP_WRIT, NOOP_WRIT, e(S::Closing, A::InitiateClose), e(S::Connected, A::Reset), TO_CLOSE, TO_ERROR, TO_CLOSE, ],
312 [
314 NOOP_CLOS, NOOP_CLOS, NOOP_CLOS, NOOP_CLOS, TO_CLOSED, NOOP_CLOS, TO_CLOSED, TO_CLOSED, TO_CLOSED, ],
324 [
326 NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD, NOOP_CLSD,
327 NOOP_CLSD,
328 ],
329 [
331 NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR, NOOP_ERR,
332 NOOP_ERR,
333 ],
334 ]
335};
336
337#[derive(Clone, Copy)]
339struct TransitionEntry {
340 next_state: u8,
341 action: u8,
342}
343
344impl TransitionEntry {
345 #[inline(always)]
346 fn state(self) -> ConnectionState {
347 ConnectionState::from_u8(self.next_state)
348 }
349
350 #[inline(always)]
351 fn action(self) -> TransitionAction {
352 unsafe { std::mem::transmute(self.action) }
354 }
355}
356
357#[derive(Debug)]
365pub struct Connection {
366 state: ConnectionState,
368 keep_alive: bool,
370 request_count: u32,
372 connected_at: Option<Instant>,
374 last_activity: Option<Instant>,
376 id: u64,
378}
379
380impl Connection {
381 #[inline]
383 pub fn new() -> Self {
384 Self {
385 state: ConnectionState::Idle,
386 keep_alive: true,
387 request_count: 0,
388 connected_at: None,
389 last_activity: None,
390 id: CONNECTION_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
391 }
392 }
393
394 #[inline]
396 pub fn with_id(id: u64) -> Self {
397 Self {
398 state: ConnectionState::Idle,
399 keep_alive: true,
400 request_count: 0,
401 connected_at: None,
402 last_activity: None,
403 id,
404 }
405 }
406
407 #[inline(always)]
409 pub fn state(&self) -> ConnectionState {
410 self.state
411 }
412
413 #[inline(always)]
415 pub fn id(&self) -> u64 {
416 self.id
417 }
418
419 #[inline(always)]
421 pub fn request_count(&self) -> u32 {
422 self.request_count
423 }
424
425 #[inline(always)]
427 pub fn keep_alive(&self) -> bool {
428 self.keep_alive
429 }
430
431 #[inline(always)]
433 pub fn set_keep_alive(&mut self, enabled: bool) {
434 self.keep_alive = enabled;
435 }
436
437 #[inline]
439 pub fn age(&self) -> Option<Duration> {
440 self.connected_at.map(|t| t.elapsed())
441 }
442
443 #[inline]
445 pub fn idle_time(&self) -> Option<Duration> {
446 self.last_activity.map(|t| t.elapsed())
447 }
448
449 #[inline]
453 pub fn handle_event(&mut self, event: ConnectionEvent) -> TransitionAction {
454 let entry = TRANSITION_TABLE[self.state.as_u8() as usize][event.as_u8() as usize];
456
457 let new_state = entry.state();
459 let action = entry.action();
460
461 if new_state != self.state {
463 self.on_state_change(new_state);
464 }
465
466 self.state = new_state;
467 self.last_activity = Some(Instant::now());
468
469 action
470 }
471
472 #[inline]
476 pub fn try_transition(
477 &mut self,
478 event: ConnectionEvent,
479 ) -> Result<TransitionAction, TransitionError> {
480 let entry = TRANSITION_TABLE[self.state.as_u8() as usize][event.as_u8() as usize];
481 let new_state = entry.state();
482 let action = entry.action();
483
484 if new_state == self.state && action == TransitionAction::None {
486 return Err(TransitionError::InvalidTransition {
487 from: self.state,
488 event,
489 });
490 }
491
492 if new_state != self.state {
493 self.on_state_change(new_state);
494 }
495
496 self.state = new_state;
497 self.last_activity = Some(Instant::now());
498
499 Ok(action)
500 }
501
502 #[inline]
504 fn on_state_change(&mut self, new_state: ConnectionState) {
505 match new_state {
506 ConnectionState::Connected => {
507 self.connected_at = Some(Instant::now());
508 CONNECTION_STATS.record_connected();
509 }
510 ConnectionState::Reading => {
511 }
513 ConnectionState::Processing => {
514 self.request_count += 1;
515 }
516 ConnectionState::Closed => {
517 CONNECTION_STATS.record_closed();
518 }
519 ConnectionState::Error => {
520 CONNECTION_STATS.record_error();
521 }
522 _ => {}
523 }
524 }
525
526 #[inline]
528 pub fn reset(&mut self) {
529 self.state = ConnectionState::Connected;
530 self.last_activity = Some(Instant::now());
531 CONNECTION_STATS.record_reuse();
532 }
533
534 #[inline]
536 pub fn force_close(&mut self) {
537 if !self.state.is_terminal() {
538 self.state = ConnectionState::Closed;
539 CONNECTION_STATS.record_closed();
540 }
541 }
542
543 #[inline]
545 pub fn should_close(&self, idle_timeout: Duration) -> bool {
546 if self.state.is_terminal() {
547 return true;
548 }
549
550 if let Some(idle_time) = self.idle_time() {
551 if idle_time > idle_timeout {
552 return true;
553 }
554 }
555
556 false
557 }
558}
559
560impl Default for Connection {
561 fn default() -> Self {
562 Self::new()
563 }
564}
565
566pub trait Recyclable {
575 fn reset(&mut self);
580
581 fn is_clean(&self) -> bool;
583
584 fn generation(&self) -> u64;
586
587 fn increment_generation(&mut self);
589}
590
591#[derive(Debug)]
597pub struct RecyclableConnection {
598 inner: Connection,
600 generation: u64,
602 recycle_count: u64,
604 read_buffer_capacity: usize,
606 write_buffer_capacity: usize,
608 user_data: Option<Box<dyn std::any::Any + Send + Sync>>,
610}
611
612impl RecyclableConnection {
613 pub fn new() -> Self {
615 Self {
616 inner: Connection::new(),
617 generation: 0,
618 recycle_count: 0,
619 read_buffer_capacity: 0,
620 write_buffer_capacity: 0,
621 user_data: None,
622 }
623 }
624
625 pub fn with_id(id: u64) -> Self {
627 Self {
628 inner: Connection::with_id(id),
629 generation: 0,
630 recycle_count: 0,
631 read_buffer_capacity: 0,
632 write_buffer_capacity: 0,
633 user_data: None,
634 }
635 }
636
637 pub fn with_capacities(read_capacity: usize, write_capacity: usize) -> Self {
639 Self {
640 inner: Connection::new(),
641 generation: 0,
642 recycle_count: 0,
643 read_buffer_capacity: read_capacity,
644 write_buffer_capacity: write_capacity,
645 user_data: None,
646 }
647 }
648
649 #[inline(always)]
651 pub fn inner(&self) -> &Connection {
652 &self.inner
653 }
654
655 #[inline(always)]
657 pub fn inner_mut(&mut self) -> &mut Connection {
658 &mut self.inner
659 }
660
661 #[inline(always)]
663 pub fn recycle_count(&self) -> u64 {
664 self.recycle_count
665 }
666
667 #[inline(always)]
669 pub fn read_buffer_capacity(&self) -> usize {
670 self.read_buffer_capacity
671 }
672
673 #[inline(always)]
675 pub fn write_buffer_capacity(&self) -> usize {
676 self.write_buffer_capacity
677 }
678
679 #[inline]
681 pub fn set_buffer_capacities(&mut self, read: usize, write: usize) {
682 self.read_buffer_capacity = read;
683 self.write_buffer_capacity = write;
684 }
685
686 pub fn set_user_data<T: std::any::Any + Send + Sync + 'static>(&mut self, data: T) {
688 self.user_data = Some(Box::new(data));
689 }
690
691 pub fn user_data<T: std::any::Any + Send + Sync + 'static>(&self) -> Option<&T> {
693 self.user_data.as_ref()?.downcast_ref::<T>()
694 }
695
696 pub fn take_user_data<T: std::any::Any + Send + Sync + 'static>(&mut self) -> Option<T> {
698 let boxed = self.user_data.take()?;
699 boxed.downcast::<T>().ok().map(|b| *b)
700 }
701
702 #[inline]
704 pub fn handle_event(&mut self, event: ConnectionEvent) -> TransitionAction {
705 self.inner.handle_event(event)
706 }
707
708 #[inline(always)]
710 pub fn state(&self) -> ConnectionState {
711 self.inner.state()
712 }
713
714 #[inline(always)]
716 pub fn id(&self) -> u64 {
717 self.inner.id()
718 }
719
720 pub fn prepare_for_recycle(&mut self) {
722 self.user_data = None;
724 self.inner.state = ConnectionState::Idle;
726 self.inner.keep_alive = true;
727 self.inner.request_count = 0;
728 self.inner.connected_at = None;
729 self.inner.last_activity = None;
730 self.generation += 1;
732 self.recycle_count += 1;
733 RECYCLE_STATS.record_recycle();
735 }
736}
737
738impl Default for RecyclableConnection {
739 fn default() -> Self {
740 Self::new()
741 }
742}
743
744impl Recyclable for RecyclableConnection {
745 fn reset(&mut self) {
746 self.prepare_for_recycle();
747 }
748
749 fn is_clean(&self) -> bool {
750 self.inner.state() == ConnectionState::Idle && self.user_data.is_none()
751 }
752
753 fn generation(&self) -> u64 {
754 self.generation
755 }
756
757 fn increment_generation(&mut self) {
758 self.generation += 1;
759 }
760}
761
762#[derive(Debug)]
771pub struct RecyclePool<T: Recyclable + Default> {
772 objects: Vec<T>,
774 free_indices: Vec<usize>,
776 capacity: usize,
778 high_water_mark: usize,
780 #[allow(dead_code)] config: RecyclePoolConfig,
783}
784
785impl<T: Recyclable + Default> RecyclePool<T> {
786 pub fn new(capacity: usize) -> Self {
788 Self::with_config(capacity, RecyclePoolConfig::default())
789 }
790
791 pub fn with_config(capacity: usize, config: RecyclePoolConfig) -> Self {
793 let mut objects = Vec::with_capacity(capacity);
794 let mut free_indices = Vec::with_capacity(capacity);
795
796 for i in 0..capacity {
797 objects.push(T::default());
798 free_indices.push(i);
799 }
800
801 Self {
802 objects,
803 free_indices,
804 capacity,
805 high_water_mark: 0,
806 config,
807 }
808 }
809
810 #[inline]
814 pub fn acquire(&mut self) -> Option<PoolHandle<'_, T>> {
815 let index = self.free_indices.pop()?;
816 let obj = &mut self.objects[index];
817
818 if !obj.is_clean() {
820 obj.reset();
821 }
822
823 let active = self.capacity - self.free_indices.len();
825 if active > self.high_water_mark {
826 self.high_water_mark = active;
827 }
828
829 RECYCLE_STATS.record_acquire();
830
831 Some(PoolHandle {
832 pool: self,
833 index,
834 released: false,
835 _marker: std::marker::PhantomData,
836 })
837 }
838
839 #[inline]
841 pub fn try_acquire(&mut self) -> Option<PoolHandle<'_, T>> {
842 self.acquire()
843 }
844
845 #[inline]
847 fn release(&mut self, index: usize) {
848 if index < self.capacity {
849 self.objects[index].reset();
851 self.free_indices.push(index);
852 RECYCLE_STATS.record_release();
853 }
854 }
855
856 #[inline]
858 pub fn get(&self, index: usize) -> Option<&T> {
859 self.objects.get(index)
860 }
861
862 #[inline]
864 pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
865 self.objects.get_mut(index)
866 }
867
868 #[inline]
870 pub fn capacity(&self) -> usize {
871 self.capacity
872 }
873
874 #[inline]
876 pub fn available(&self) -> usize {
877 self.free_indices.len()
878 }
879
880 #[inline]
882 pub fn active(&self) -> usize {
883 self.capacity - self.free_indices.len()
884 }
885
886 #[inline]
888 pub fn high_water_mark(&self) -> usize {
889 self.high_water_mark
890 }
891
892 #[inline]
894 pub fn reset_high_water_mark(&mut self) {
895 self.high_water_mark = self.active();
896 }
897
898 #[inline]
900 pub fn is_exhausted(&self) -> bool {
901 self.free_indices.is_empty()
902 }
903
904 pub fn shrink(&mut self, min_capacity: usize) {
906 let target = min_capacity.max(self.active());
907 if target < self.capacity {
908 while self.capacity > target && !self.free_indices.is_empty() {
910 if let Some(index) = self.free_indices.pop() {
911 self.objects[index].reset();
913 }
914 self.capacity -= 1;
915 }
916 }
917 }
918
919 pub fn grow(&mut self, additional: usize) {
921 let new_capacity = self.capacity + additional;
922 self.objects.reserve(additional);
923
924 for _ in 0..additional {
925 let index = self.objects.len();
926 self.objects.push(T::default());
927 self.free_indices.push(index);
928 }
929
930 self.capacity = new_capacity;
931 }
932}
933
934#[derive(Debug)]
936pub struct PoolHandle<'a, T: Recyclable + Default> {
937 pool: *mut RecyclePool<T>,
938 index: usize,
939 released: bool,
940 _marker: std::marker::PhantomData<&'a mut T>,
941}
942
943impl<'a, T: Recyclable + Default> PoolHandle<'a, T> {
944 #[inline]
946 pub fn get(&self) -> &T {
947 unsafe { &(&(*self.pool).objects)[self.index] }
948 }
949
950 #[inline]
952 pub fn get_mut(&mut self) -> &mut T {
953 unsafe { &mut (&mut (*self.pool).objects)[self.index] }
954 }
955
956 #[inline]
958 pub fn index(&self) -> usize {
959 self.index
960 }
961
962 #[inline]
964 pub fn generation(&self) -> u64 {
965 self.get().generation()
966 }
967
968 #[inline]
970 pub fn release(mut self) {
971 if !self.released {
972 unsafe { (*self.pool).release(self.index) };
973 self.released = true;
974 }
975 }
976}
977
978impl<'a, T: Recyclable + Default> Drop for PoolHandle<'a, T> {
979 fn drop(&mut self) {
980 if !self.released {
981 unsafe { (*self.pool).release(self.index) };
982 }
983 }
984}
985
986impl<'a, T: Recyclable + Default> std::ops::Deref for PoolHandle<'a, T> {
987 type Target = T;
988
989 #[inline]
990 fn deref(&self) -> &Self::Target {
991 self.get()
992 }
993}
994
995impl<'a, T: Recyclable + Default> std::ops::DerefMut for PoolHandle<'a, T> {
996 #[inline]
997 fn deref_mut(&mut self) -> &mut Self::Target {
998 self.get_mut()
999 }
1000}
1001
1002#[derive(Debug, Clone)]
1008pub struct RecyclePoolConfig {
1009 pub initial_capacity: usize,
1011 pub max_capacity: usize,
1013 pub grow_by: usize,
1015 pub shrink_threshold: f32,
1017 pub min_capacity: usize,
1019}
1020
1021impl Default for RecyclePoolConfig {
1022 fn default() -> Self {
1023 Self {
1024 initial_capacity: 100,
1025 max_capacity: 10000,
1026 grow_by: 50,
1027 shrink_threshold: 0.25,
1028 min_capacity: 10,
1029 }
1030 }
1031}
1032
1033impl RecyclePoolConfig {
1034 pub fn new() -> Self {
1036 Self::default()
1037 }
1038
1039 pub fn initial_capacity(mut self, capacity: usize) -> Self {
1041 self.initial_capacity = capacity;
1042 self
1043 }
1044
1045 pub fn max_capacity(mut self, max: usize) -> Self {
1047 self.max_capacity = max;
1048 self
1049 }
1050
1051 pub fn grow_by(mut self, amount: usize) -> Self {
1053 self.grow_by = amount;
1054 self
1055 }
1056
1057 pub fn shrink_threshold(mut self, threshold: f32) -> Self {
1059 self.shrink_threshold = threshold;
1060 self
1061 }
1062
1063 pub fn min_capacity(mut self, min: usize) -> Self {
1065 self.min_capacity = min;
1066 self
1067 }
1068}
1069
1070#[derive(Debug, Default)]
1076pub struct RecycleStats {
1077 acquires: AtomicU64,
1079 releases: AtomicU64,
1081 recycles: AtomicU64,
1083 allocations: AtomicU64,
1085}
1086
1087impl RecycleStats {
1088 pub fn new() -> Self {
1090 Self::default()
1091 }
1092
1093 #[inline]
1094 fn record_acquire(&self) {
1095 self.acquires.fetch_add(1, Ordering::Relaxed);
1096 }
1097
1098 #[inline]
1099 fn record_release(&self) {
1100 self.releases.fetch_add(1, Ordering::Relaxed);
1101 }
1102
1103 #[inline]
1104 fn record_recycle(&self) {
1105 self.recycles.fetch_add(1, Ordering::Relaxed);
1106 }
1107
1108 #[inline]
1109 #[allow(dead_code)] fn record_allocation(&self) {
1111 self.allocations.fetch_add(1, Ordering::Relaxed);
1112 }
1113
1114 pub fn acquires(&self) -> u64 {
1116 self.acquires.load(Ordering::Relaxed)
1117 }
1118
1119 pub fn releases(&self) -> u64 {
1121 self.releases.load(Ordering::Relaxed)
1122 }
1123
1124 pub fn recycles(&self) -> u64 {
1126 self.recycles.load(Ordering::Relaxed)
1127 }
1128
1129 pub fn allocations(&self) -> u64 {
1131 self.allocations.load(Ordering::Relaxed)
1132 }
1133
1134 pub fn recycle_ratio(&self) -> f64 {
1136 let acquires = self.acquires() as f64;
1137 if acquires > 0.0 {
1138 self.recycles() as f64 / acquires
1139 } else {
1140 0.0
1141 }
1142 }
1143
1144 pub fn hit_ratio(&self) -> f64 {
1146 let acquires = self.acquires() as f64;
1147 if acquires > 0.0 {
1148 1.0 - (self.allocations() as f64 / acquires)
1149 } else {
1150 1.0
1151 }
1152 }
1153}
1154
1155static RECYCLE_STATS: RecycleStats = RecycleStats {
1157 acquires: AtomicU64::new(0),
1158 releases: AtomicU64::new(0),
1159 recycles: AtomicU64::new(0),
1160 allocations: AtomicU64::new(0),
1161};
1162
1163pub fn recycle_stats() -> &'static RecycleStats {
1165 &RECYCLE_STATS
1166}
1167
1168pub struct ConnectionRecycler {
1174 pool: RecyclePool<RecyclableConnection>,
1176 config: ConnectionConfig,
1178}
1179
1180impl ConnectionRecycler {
1181 pub fn new(capacity: usize) -> Self {
1183 Self::with_config(capacity, ConnectionConfig::default())
1184 }
1185
1186 pub fn with_config(capacity: usize, config: ConnectionConfig) -> Self {
1188 let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(capacity);
1189
1190 for i in 0..capacity {
1192 if let Some(conn) = pool.get_mut(i) {
1193 conn.inner_mut().set_keep_alive(config.keep_alive);
1194 }
1195 }
1196
1197 Self { pool, config }
1198 }
1199
1200 #[inline]
1202 pub fn acquire(&mut self) -> Option<PoolHandle<'_, RecyclableConnection>> {
1203 let handle = self.pool.acquire()?;
1204 Some(handle)
1205 }
1206
1207 pub fn stats(&self) -> RecyclerStats {
1209 RecyclerStats {
1210 capacity: self.pool.capacity(),
1211 available: self.pool.available(),
1212 active: self.pool.active(),
1213 high_water_mark: self.pool.high_water_mark(),
1214 }
1215 }
1216
1217 pub fn config(&self) -> &ConnectionConfig {
1219 &self.config
1220 }
1221
1222 pub fn should_recycle(&self, conn: &RecyclableConnection) -> bool {
1224 if conn.inner().request_count() >= self.config.max_requests {
1226 return false;
1227 }
1228
1229 if !conn.inner().keep_alive() {
1231 return false;
1232 }
1233
1234 if let Some(age) = conn.inner().age() {
1236 if age > self.config.idle_timeout * 10 {
1237 return false;
1238 }
1239 }
1240
1241 true
1242 }
1243}
1244
1245#[derive(Debug, Clone, Copy)]
1247pub struct RecyclerStats {
1248 pub capacity: usize,
1250 pub available: usize,
1252 pub active: usize,
1254 pub high_water_mark: usize,
1256}
1257
1258#[derive(Debug, Clone)]
1264pub enum TransitionError {
1265 InvalidTransition {
1267 from: ConnectionState,
1268 event: ConnectionEvent,
1269 },
1270 AlreadyClosed,
1272}
1273
1274impl std::fmt::Display for TransitionError {
1275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1276 match self {
1277 Self::InvalidTransition { from, event } => {
1278 write!(f, "Invalid transition: {:?} + {:?}", from, event)
1279 }
1280 Self::AlreadyClosed => write!(f, "Connection already closed"),
1281 }
1282 }
1283}
1284
1285impl std::error::Error for TransitionError {}
1286
1287#[derive(Debug)]
1293pub struct ConnectionPool {
1294 connections: Vec<Connection>,
1296 free_indices: Vec<usize>,
1298 capacity: usize,
1300}
1301
1302impl ConnectionPool {
1303 pub fn new(capacity: usize) -> Self {
1305 let mut connections = Vec::with_capacity(capacity);
1306 let mut free_indices = Vec::with_capacity(capacity);
1307
1308 for i in 0..capacity {
1309 connections.push(Connection::with_id(i as u64));
1310 free_indices.push(i);
1311 }
1312
1313 Self {
1314 connections,
1315 free_indices,
1316 capacity,
1317 }
1318 }
1319
1320 #[inline]
1322 pub fn acquire(&mut self) -> Option<&mut Connection> {
1323 let index = self.free_indices.pop()?;
1324 let conn = &mut self.connections[index];
1325 conn.state = ConnectionState::Idle;
1326 CONNECTION_STATS.record_pool_acquire();
1327 Some(conn)
1328 }
1329
1330 #[inline]
1332 pub fn release(&mut self, id: u64) {
1333 let index = id as usize;
1334 if index < self.capacity {
1335 self.connections[index].force_close();
1336 self.free_indices.push(index);
1337 CONNECTION_STATS.record_pool_release();
1338 }
1339 }
1340
1341 #[inline]
1343 pub fn get(&self, id: u64) -> Option<&Connection> {
1344 let index = id as usize;
1345 if index < self.capacity {
1346 Some(&self.connections[index])
1347 } else {
1348 None
1349 }
1350 }
1351
1352 #[inline]
1354 pub fn get_mut(&mut self, id: u64) -> Option<&mut Connection> {
1355 let index = id as usize;
1356 if index < self.capacity {
1357 Some(&mut self.connections[index])
1358 } else {
1359 None
1360 }
1361 }
1362
1363 #[inline]
1365 pub fn capacity(&self) -> usize {
1366 self.capacity
1367 }
1368
1369 #[inline]
1371 pub fn available(&self) -> usize {
1372 self.free_indices.len()
1373 }
1374
1375 #[inline]
1377 pub fn active(&self) -> usize {
1378 self.capacity - self.free_indices.len()
1379 }
1380}
1381
1382#[derive(Debug, Clone)]
1388pub struct ConnectionConfig {
1389 pub idle_timeout: Duration,
1391 pub max_requests: u32,
1393 pub keep_alive: bool,
1395 pub read_timeout: Duration,
1397 pub write_timeout: Duration,
1399}
1400
1401impl Default for ConnectionConfig {
1402 fn default() -> Self {
1403 Self {
1404 idle_timeout: Duration::from_secs(60),
1405 max_requests: 1000,
1406 keep_alive: true,
1407 read_timeout: Duration::from_secs(30),
1408 write_timeout: Duration::from_secs(30),
1409 }
1410 }
1411}
1412
1413impl ConnectionConfig {
1414 pub fn new() -> Self {
1416 Self::default()
1417 }
1418
1419 pub fn idle_timeout(mut self, timeout: Duration) -> Self {
1421 self.idle_timeout = timeout;
1422 self
1423 }
1424
1425 pub fn max_requests(mut self, max: u32) -> Self {
1427 self.max_requests = max;
1428 self
1429 }
1430
1431 pub fn keep_alive(mut self, enabled: bool) -> Self {
1433 self.keep_alive = enabled;
1434 self
1435 }
1436
1437 pub fn read_timeout(mut self, timeout: Duration) -> Self {
1439 self.read_timeout = timeout;
1440 self
1441 }
1442
1443 pub fn write_timeout(mut self, timeout: Duration) -> Self {
1445 self.write_timeout = timeout;
1446 self
1447 }
1448}
1449
1450#[derive(Debug, Default)]
1456pub struct ConnectionStats {
1457 connected: AtomicU64,
1459 closed: AtomicU64,
1461 errors: AtomicU64,
1463 reuses: AtomicU64,
1465 pool_acquires: AtomicU64,
1467 pool_releases: AtomicU64,
1469 #[allow(dead_code)] transitions: AtomicU64,
1472}
1473
1474impl ConnectionStats {
1475 pub fn new() -> Self {
1477 Self::default()
1478 }
1479
1480 #[inline]
1481 fn record_connected(&self) {
1482 self.connected.fetch_add(1, Ordering::Relaxed);
1483 }
1484
1485 #[inline]
1486 fn record_closed(&self) {
1487 self.closed.fetch_add(1, Ordering::Relaxed);
1488 }
1489
1490 #[inline]
1491 fn record_error(&self) {
1492 self.errors.fetch_add(1, Ordering::Relaxed);
1493 }
1494
1495 #[inline]
1496 fn record_reuse(&self) {
1497 self.reuses.fetch_add(1, Ordering::Relaxed);
1498 }
1499
1500 #[inline]
1501 fn record_pool_acquire(&self) {
1502 self.pool_acquires.fetch_add(1, Ordering::Relaxed);
1503 }
1504
1505 #[inline]
1506 fn record_pool_release(&self) {
1507 self.pool_releases.fetch_add(1, Ordering::Relaxed);
1508 }
1509
1510 pub fn connected(&self) -> u64 {
1512 self.connected.load(Ordering::Relaxed)
1513 }
1514
1515 pub fn closed(&self) -> u64 {
1517 self.closed.load(Ordering::Relaxed)
1518 }
1519
1520 pub fn errors(&self) -> u64 {
1522 self.errors.load(Ordering::Relaxed)
1523 }
1524
1525 pub fn reuses(&self) -> u64 {
1527 self.reuses.load(Ordering::Relaxed)
1528 }
1529
1530 pub fn active(&self) -> u64 {
1532 let connected = self.connected.load(Ordering::Relaxed);
1533 let closed = self.closed.load(Ordering::Relaxed);
1534 connected.saturating_sub(closed)
1535 }
1536
1537 pub fn pool_acquires(&self) -> u64 {
1539 self.pool_acquires.load(Ordering::Relaxed)
1540 }
1541
1542 pub fn pool_releases(&self) -> u64 {
1544 self.pool_releases.load(Ordering::Relaxed)
1545 }
1546}
1547
1548static CONNECTION_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
1550
1551static CONNECTION_STATS: ConnectionStats = ConnectionStats {
1553 connected: AtomicU64::new(0),
1554 closed: AtomicU64::new(0),
1555 errors: AtomicU64::new(0),
1556 reuses: AtomicU64::new(0),
1557 pool_acquires: AtomicU64::new(0),
1558 pool_releases: AtomicU64::new(0),
1559 transitions: AtomicU64::new(0),
1560};
1561
1562pub fn connection_stats() -> &'static ConnectionStats {
1564 &CONNECTION_STATS
1565}
1566
1567pub struct StateMachineExecutor {
1575 events: Vec<(u64, ConnectionEvent)>,
1577 #[allow(dead_code)] batch_size: usize,
1580}
1581
1582impl StateMachineExecutor {
1583 pub fn new(batch_size: usize) -> Self {
1585 Self {
1586 events: Vec::with_capacity(batch_size),
1587 batch_size,
1588 }
1589 }
1590
1591 #[inline]
1593 pub fn queue(&mut self, conn_id: u64, event: ConnectionEvent) {
1594 self.events.push((conn_id, event));
1595 }
1596
1597 #[inline]
1599 pub fn process(&mut self, pool: &mut ConnectionPool) -> Vec<(u64, TransitionAction)> {
1600 let mut results = Vec::with_capacity(self.events.len());
1601
1602 for (conn_id, event) in self.events.drain(..) {
1603 if let Some(conn) = pool.get_mut(conn_id) {
1604 let action = conn.handle_event(event);
1605 results.push((conn_id, action));
1606 }
1607 }
1608
1609 results
1610 }
1611
1612 #[inline]
1614 pub fn process_with<F>(&mut self, pool: &mut ConnectionPool, mut callback: F)
1615 where
1616 F: FnMut(u64, TransitionAction),
1617 {
1618 for (conn_id, event) in self.events.drain(..) {
1619 if let Some(conn) = pool.get_mut(conn_id) {
1620 let action = conn.handle_event(event);
1621 callback(conn_id, action);
1622 }
1623 }
1624 }
1625
1626 #[inline]
1628 pub fn has_pending(&self) -> bool {
1629 !self.events.is_empty()
1630 }
1631
1632 #[inline]
1634 pub fn pending_count(&self) -> usize {
1635 self.events.len()
1636 }
1637
1638 #[inline]
1640 pub fn clear(&mut self) {
1641 self.events.clear();
1642 }
1643}
1644
1645impl Default for StateMachineExecutor {
1646 fn default() -> Self {
1647 Self::new(64)
1648 }
1649}
1650
1651#[cfg(test)]
1656mod tests {
1657 use super::*;
1658
1659 #[test]
1660 fn test_connection_state_size() {
1661 assert_eq!(std::mem::size_of::<ConnectionState>(), 1);
1663 }
1664
1665 #[test]
1666 fn test_connection_event_size() {
1667 assert_eq!(std::mem::size_of::<ConnectionEvent>(), 1);
1669 }
1670
1671 #[test]
1672 fn test_transition_entry_size() {
1673 assert_eq!(std::mem::size_of::<TransitionEntry>(), 2);
1675 }
1676
1677 #[test]
1678 fn test_basic_state_transitions() {
1679 let mut conn = Connection::new();
1680 assert_eq!(conn.state(), ConnectionState::Idle);
1681
1682 let action = conn.handle_event(ConnectionEvent::Accept);
1684 assert_eq!(conn.state(), ConnectionState::Connected);
1685 assert_eq!(action, TransitionAction::StartRead);
1686
1687 let action = conn.handle_event(ConnectionEvent::DataReady);
1689 assert_eq!(conn.state(), ConnectionState::Reading);
1690 assert_eq!(action, TransitionAction::ContinueRead);
1691
1692 let action = conn.handle_event(ConnectionEvent::RequestComplete);
1694 assert_eq!(conn.state(), ConnectionState::Processing);
1695 assert_eq!(action, TransitionAction::Dispatch);
1696
1697 let action = conn.handle_event(ConnectionEvent::ResponseReady);
1699 assert_eq!(conn.state(), ConnectionState::Writing);
1700 assert_eq!(action, TransitionAction::StartWrite);
1701
1702 let action = conn.handle_event(ConnectionEvent::WriteComplete);
1704 assert_eq!(conn.state(), ConnectionState::Closing);
1705 assert_eq!(action, TransitionAction::InitiateClose);
1706 }
1707
1708 #[test]
1709 fn test_keep_alive_transition() {
1710 let mut conn = Connection::new();
1711 conn.handle_event(ConnectionEvent::Accept);
1712 conn.handle_event(ConnectionEvent::DataReady);
1713 conn.handle_event(ConnectionEvent::RequestComplete);
1714 conn.handle_event(ConnectionEvent::ResponseReady);
1715
1716 let action = conn.handle_event(ConnectionEvent::KeepAlive);
1718 assert_eq!(conn.state(), ConnectionState::Connected);
1719 assert_eq!(action, TransitionAction::Reset);
1720 }
1721
1722 #[test]
1723 fn test_error_transition() {
1724 let mut conn = Connection::new();
1725 conn.handle_event(ConnectionEvent::Accept);
1726
1727 let action = conn.handle_event(ConnectionEvent::Error);
1729 assert_eq!(conn.state(), ConnectionState::Error);
1730 assert_eq!(action, TransitionAction::ForceClose);
1731 assert!(conn.state().is_terminal());
1732 }
1733
1734 #[test]
1735 fn test_timeout_transition() {
1736 let mut conn = Connection::new();
1737 conn.handle_event(ConnectionEvent::Accept);
1738
1739 let action = conn.handle_event(ConnectionEvent::Timeout);
1741 assert_eq!(conn.state(), ConnectionState::Closing);
1742 assert_eq!(action, TransitionAction::InitiateClose);
1743 }
1744
1745 #[test]
1746 fn test_connection_pool() {
1747 let mut pool = ConnectionPool::new(10);
1748
1749 assert_eq!(pool.capacity(), 10);
1750 assert_eq!(pool.available(), 10);
1751 assert_eq!(pool.active(), 0);
1752
1753 let conn1 = pool.acquire().unwrap();
1755 let id1 = conn1.id();
1756 assert_eq!(pool.available(), 9);
1757 assert_eq!(pool.active(), 1);
1758
1759 let conn2 = pool.acquire().unwrap();
1760 let id2 = conn2.id();
1761 assert_eq!(pool.available(), 8);
1762
1763 pool.release(id1);
1765 assert_eq!(pool.available(), 9);
1766
1767 pool.release(id2);
1768 assert_eq!(pool.available(), 10);
1769 }
1770
1771 #[test]
1772 fn test_connection_config() {
1773 let config = ConnectionConfig::new()
1774 .idle_timeout(Duration::from_secs(120))
1775 .max_requests(500)
1776 .keep_alive(false)
1777 .read_timeout(Duration::from_secs(10))
1778 .write_timeout(Duration::from_secs(10));
1779
1780 assert_eq!(config.idle_timeout, Duration::from_secs(120));
1781 assert_eq!(config.max_requests, 500);
1782 assert!(!config.keep_alive);
1783 }
1784
1785 #[test]
1786 fn test_state_machine_executor() {
1787 let mut pool = ConnectionPool::new(5);
1788 let mut executor = StateMachineExecutor::new(10);
1789
1790 let conn = pool.acquire().unwrap();
1792 let conn_id = conn.id();
1793
1794 executor.queue(conn_id, ConnectionEvent::Accept);
1796 executor.queue(conn_id, ConnectionEvent::DataReady);
1797 executor.queue(conn_id, ConnectionEvent::RequestComplete);
1798
1799 assert!(executor.has_pending());
1800 assert_eq!(executor.pending_count(), 3);
1801
1802 let results = executor.process(&mut pool);
1804 assert_eq!(results.len(), 3);
1805 assert!(!executor.has_pending());
1806
1807 let conn = pool.get(conn_id).unwrap();
1809 assert_eq!(conn.state(), ConnectionState::Processing);
1810 }
1811
1812 #[test]
1813 fn test_request_count() {
1814 let mut conn = Connection::new();
1815 assert_eq!(conn.request_count(), 0);
1816
1817 conn.handle_event(ConnectionEvent::Accept);
1819 conn.handle_event(ConnectionEvent::DataReady);
1820 conn.handle_event(ConnectionEvent::RequestComplete); assert_eq!(conn.request_count(), 1);
1822
1823 conn.handle_event(ConnectionEvent::ResponseReady);
1824 conn.handle_event(ConnectionEvent::KeepAlive);
1825
1826 conn.handle_event(ConnectionEvent::DataReady);
1828 conn.handle_event(ConnectionEvent::RequestComplete);
1829 assert_eq!(conn.request_count(), 2);
1830 }
1831
1832 #[test]
1833 fn test_try_transition_valid() {
1834 let mut conn = Connection::new();
1835 let result = conn.try_transition(ConnectionEvent::Accept);
1836 assert!(result.is_ok());
1837 assert_eq!(result.unwrap(), TransitionAction::StartRead);
1838 }
1839
1840 #[test]
1841 fn test_try_transition_invalid() {
1842 let mut conn = Connection::new();
1843 let result = conn.try_transition(ConnectionEvent::DataReady);
1845 assert!(result.is_err());
1846 }
1847
1848 #[test]
1849 fn test_connection_stats() {
1850 let stats = connection_stats();
1851 let _ = stats.connected();
1852 let _ = stats.closed();
1853 let _ = stats.errors();
1854 let _ = stats.active();
1855 let _ = stats.reuses();
1856 }
1857
1858 #[test]
1859 fn test_state_is_terminal() {
1860 assert!(ConnectionState::Closed.is_terminal());
1861 assert!(ConnectionState::Error.is_terminal());
1862 assert!(!ConnectionState::Connected.is_terminal());
1863 assert!(!ConnectionState::Processing.is_terminal());
1864 }
1865
1866 #[test]
1867 fn test_state_is_active() {
1868 assert!(ConnectionState::Reading.is_active());
1869 assert!(ConnectionState::Processing.is_active());
1870 assert!(ConnectionState::Writing.is_active());
1871 assert!(!ConnectionState::Idle.is_active());
1872 assert!(!ConnectionState::Connected.is_active());
1873 }
1874
1875 #[test]
1878 fn test_recyclable_connection_basic() {
1879 let mut conn = RecyclableConnection::new();
1880 assert_eq!(conn.generation(), 0);
1881 assert_eq!(conn.recycle_count(), 0);
1882 assert!(conn.is_clean());
1883
1884 conn.handle_event(ConnectionEvent::Accept);
1886 conn.handle_event(ConnectionEvent::DataReady);
1887
1888 conn.prepare_for_recycle();
1890 assert_eq!(conn.generation(), 1);
1891 assert_eq!(conn.recycle_count(), 1);
1892 assert!(conn.is_clean());
1893 assert_eq!(conn.state(), ConnectionState::Idle);
1894 }
1895
1896 #[test]
1897 fn test_recyclable_connection_user_data() {
1898 let mut conn = RecyclableConnection::new();
1899
1900 conn.set_user_data(42u32);
1902 assert_eq!(conn.user_data::<u32>(), Some(&42));
1903
1904 let data = conn.take_user_data::<u32>();
1906 assert_eq!(data, Some(42));
1907 assert!(conn.user_data::<u32>().is_none());
1908 }
1909
1910 #[test]
1911 fn test_recyclable_connection_buffer_capacities() {
1912 let conn = RecyclableConnection::with_capacities(4096, 8192);
1913 assert_eq!(conn.read_buffer_capacity(), 4096);
1914 assert_eq!(conn.write_buffer_capacity(), 8192);
1915 }
1916
1917 #[test]
1918 fn test_recycle_pool_basic() {
1919 let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(5);
1920
1921 assert_eq!(pool.capacity(), 5);
1922 assert_eq!(pool.available(), 5);
1923 assert_eq!(pool.active(), 0);
1924 assert!(!pool.is_exhausted());
1925
1926 {
1928 let mut handle = pool.acquire().unwrap();
1929 handle.handle_event(ConnectionEvent::Accept);
1930 assert_eq!(handle.state(), ConnectionState::Connected);
1931 }
1933
1934 assert_eq!(pool.available(), 5);
1936 assert_eq!(pool.active(), 0);
1937 }
1938
1939 #[test]
1940 fn test_recycle_pool_exhaustion() {
1941 let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(2);
1942
1943 assert_eq!(pool.capacity(), 2);
1945 assert_eq!(pool.available(), 2);
1946 assert!(!pool.is_exhausted());
1947
1948 {
1950 let h = pool.acquire().unwrap();
1951 assert_eq!(h.index(), 1); }
1953 assert_eq!(pool.available(), 2);
1954
1955 {
1957 let h = pool.acquire().unwrap();
1958 assert_eq!(h.index(), 1);
1959 }
1960
1961 assert!(!pool.is_exhausted());
1963 }
1964
1965 #[test]
1966 fn test_recycle_pool_generation_tracking() {
1967 let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(1);
1968
1969 let gen1 = {
1970 let handle = pool.acquire().unwrap();
1971 handle.generation()
1972 };
1973
1974 let gen2 = {
1975 let handle = pool.acquire().unwrap();
1976 handle.generation()
1977 };
1978
1979 assert!(gen2 > gen1);
1981 }
1982
1983 #[test]
1984 fn test_recycle_pool_high_water_mark() {
1985 let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(5);
1986
1987 {
1989 let _ = pool.acquire().unwrap();
1990 }
1991 assert!(pool.high_water_mark() >= 1);
1992
1993 let hwm_before = pool.high_water_mark();
1995
1996 pool.reset_high_water_mark();
1998 assert!(pool.high_water_mark() <= hwm_before);
1999 }
2000
2001 #[test]
2002 fn test_recycle_pool_grow() {
2003 let mut pool: RecyclePool<RecyclableConnection> = RecyclePool::new(2);
2004
2005 assert_eq!(pool.capacity(), 2);
2006
2007 pool.grow(3);
2008
2009 assert_eq!(pool.capacity(), 5);
2010 assert_eq!(pool.available(), 5);
2011 }
2012
2013 #[test]
2014 fn test_recycle_pool_config() {
2015 let config = RecyclePoolConfig::new()
2016 .initial_capacity(50)
2017 .max_capacity(500)
2018 .grow_by(25)
2019 .shrink_threshold(0.1)
2020 .min_capacity(5);
2021
2022 assert_eq!(config.initial_capacity, 50);
2023 assert_eq!(config.max_capacity, 500);
2024 assert_eq!(config.grow_by, 25);
2025 assert!((config.shrink_threshold - 0.1).abs() < 0.001);
2026 assert_eq!(config.min_capacity, 5);
2027 }
2028
2029 #[test]
2030 fn test_connection_recycler() {
2031 let mut recycler = ConnectionRecycler::new(10);
2032
2033 let stats = recycler.stats();
2034 assert_eq!(stats.capacity, 10);
2035 assert_eq!(stats.available, 10);
2036 assert_eq!(stats.active, 0);
2037
2038 {
2040 let mut handle = recycler.acquire().unwrap();
2041 handle.handle_event(ConnectionEvent::Accept);
2042 }
2044
2045 let stats = recycler.stats();
2047 assert_eq!(stats.active, 0);
2048 assert_eq!(stats.available, 10);
2049 }
2050
2051 #[test]
2052 fn test_recycle_stats() {
2053 let stats = recycle_stats();
2054 let _ = stats.acquires();
2055 let _ = stats.releases();
2056 let _ = stats.recycles();
2057 let _ = stats.allocations();
2058 let _ = stats.recycle_ratio();
2059 let _ = stats.hit_ratio();
2060 }
2061
2062 #[test]
2063 fn test_recyclable_trait() {
2064 let mut conn = RecyclableConnection::new();
2065
2066 assert!(conn.is_clean());
2068 assert_eq!(conn.generation(), 0);
2069
2070 conn.handle_event(ConnectionEvent::Accept);
2071 conn.increment_generation();
2072 assert_eq!(conn.generation(), 1);
2073
2074 conn.reset();
2075 assert!(conn.is_clean());
2076 assert_eq!(conn.generation(), 2); }
2078}