// middleware_core/session.rs
use std::collections::HashMap;
2
3use core_types::{SessionId, Timestamp, TransportDomain};
4
/// Bounds for the delay inserted between reconnect attempts.
///
/// Both values are expressed in milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReconnectBackoffPolicy {
    /// Delay applied to the first reconnect attempt; later attempts scale it
    /// by a power of two.
    pub base_delay_ms: u64,
    /// Upper limit for the computed delay, regardless of the attempt count.
    pub max_delay_ms: u64,
}
10
11impl Default for ReconnectBackoffPolicy {
12 fn default() -> Self {
13 Self {
14 base_delay_ms: 200,
15 max_delay_ms: 5_000,
16 }
17 }
18}
19
/// Coarse connection state tracked for every session record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionLifecycle {
    /// State assigned to a record when `open_session` creates it.
    Connecting,
    /// State assigned by `mark_connected`.
    Connected,
    /// State assigned by `mark_reconnecting`, alongside a scheduled retry.
    Reconnecting,
    /// State assigned by `mark_closed`.
    Closed,
}
27
28#[derive(Clone, Debug, Eq, PartialEq)]
29pub struct SessionRecord {
30 pub id: SessionId,
31 pub domain: TransportDomain,
32 pub target: String,
33 pub opened_at: Timestamp,
34 pub lifecycle: SessionLifecycle,
35 pub reconnect_attempts: u32,
36 pub reconnect_backoff_ms: u64,
37 pub next_retry_at: Option<Timestamp>,
38 pub last_error: Option<String>,
39}
40
/// Number of sessions currently in each [`SessionLifecycle`] state.
///
/// The `Default` value has every counter at zero.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct SessionLifecycleCounts {
    /// Sessions in the `Connecting` state.
    pub connecting: usize,
    /// Sessions in the `Connected` state.
    pub connected: usize,
    /// Sessions in the `Reconnecting` state.
    pub reconnecting: usize,
    /// Sessions in the `Closed` state.
    pub closed: usize,
}
48
49pub trait SessionManager {
50 fn open_session(&mut self, domain: TransportDomain, target: impl Into<String>) -> SessionId;
51 fn get_session(&self, id: SessionId) -> Option<&SessionRecord>;
52 fn set_backoff_policy(&mut self, policy: ReconnectBackoffPolicy);
53 fn mark_connected(&mut self, id: SessionId);
54 fn mark_reconnecting(&mut self, id: SessionId, reason: impl Into<String>);
55 fn mark_closed(&mut self, id: SessionId);
56}
57
58#[derive(Default)]
59pub struct SimpleSessionManager {
60 next_id: u64,
61 policy: ReconnectBackoffPolicy,
62 sessions: HashMap<SessionId, SessionRecord>,
63}
64
65impl SimpleSessionManager {
66 pub fn lifecycle_counts(&self) -> SessionLifecycleCounts {
67 let mut out = SessionLifecycleCounts::default();
68 for record in self.sessions.values() {
69 match record.lifecycle {
70 SessionLifecycle::Connecting => out.connecting += 1,
71 SessionLifecycle::Connected => out.connected += 1,
72 SessionLifecycle::Reconnecting => out.reconnecting += 1,
73 SessionLifecycle::Closed => out.closed += 1,
74 }
75 }
76 out
77 }
78}
79
80impl SessionManager for SimpleSessionManager {
81 fn open_session(&mut self, domain: TransportDomain, target: impl Into<String>) -> SessionId {
82 self.next_id += 1;
83 let id = SessionId::new(self.next_id);
84 self.sessions.insert(
85 id,
86 SessionRecord {
87 id,
88 domain,
89 target: target.into(),
90 opened_at: Timestamp::now(),
91 lifecycle: SessionLifecycle::Connecting,
92 reconnect_attempts: 0,
93 reconnect_backoff_ms: 0,
94 next_retry_at: None,
95 last_error: None,
96 },
97 );
98 id
99 }
100
101 fn get_session(&self, id: SessionId) -> Option<&SessionRecord> {
102 self.sessions.get(&id)
103 }
104
105 fn set_backoff_policy(&mut self, policy: ReconnectBackoffPolicy) {
106 self.policy = policy;
107 }
108
109 fn mark_connected(&mut self, id: SessionId) {
110 if let Some(record) = self.sessions.get_mut(&id) {
111 record.lifecycle = SessionLifecycle::Connected;
112 record.reconnect_backoff_ms = 0;
113 record.next_retry_at = None;
114 record.last_error = None;
115 }
116 }
117
118 fn mark_reconnecting(&mut self, id: SessionId, reason: impl Into<String>) {
119 let policy = self.policy;
120 if let Some(record) = self.sessions.get_mut(&id) {
121 record.lifecycle = SessionLifecycle::Reconnecting;
122 record.reconnect_attempts += 1;
123 let shift = record.reconnect_attempts.saturating_sub(1).min(16);
124 let factor = 1u64 << shift;
125 let backoff = policy
126 .base_delay_ms
127 .saturating_mul(factor)
128 .min(policy.max_delay_ms);
129 record.reconnect_backoff_ms = backoff;
130 record.next_retry_at = Some(Timestamp(
131 Timestamp::now().0 + (backoff as u128) * 1_000_000,
132 ));
133 record.last_error = Some(reason.into());
134 }
135 }
136
137 fn mark_closed(&mut self, id: SessionId) {
138 if let Some(record) = self.sessions.get_mut(&id) {
139 record.lifecycle = SessionLifecycle::Closed;
140 }
141 }
142}