Skip to main content

middleware_core/
session.rs

1use std::collections::HashMap;
2
3use core_types::{SessionId, Timestamp, TransportDomain};
4
5#[derive(Clone, Copy, Debug, Eq, PartialEq)]
6pub struct ReconnectBackoffPolicy {
7    pub base_delay_ms: u64,
8    pub max_delay_ms: u64,
9}
10
11impl Default for ReconnectBackoffPolicy {
12    fn default() -> Self {
13        Self {
14            base_delay_ms: 200,
15            max_delay_ms: 5_000,
16        }
17    }
18}
19
20#[derive(Clone, Copy, Debug, Eq, PartialEq)]
21pub enum SessionLifecycle {
22    Connecting,
23    Connected,
24    Reconnecting,
25    Closed,
26}
27
28#[derive(Clone, Debug, Eq, PartialEq)]
29pub struct SessionRecord {
30    pub id: SessionId,
31    pub domain: TransportDomain,
32    pub target: String,
33    pub opened_at: Timestamp,
34    pub lifecycle: SessionLifecycle,
35    pub reconnect_attempts: u32,
36    pub reconnect_backoff_ms: u64,
37    pub next_retry_at: Option<Timestamp>,
38    pub last_error: Option<String>,
39}
40
41#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
42pub struct SessionLifecycleCounts {
43    pub connecting: usize,
44    pub connected: usize,
45    pub reconnecting: usize,
46    pub closed: usize,
47}
48
49pub trait SessionManager {
50    fn open_session(&mut self, domain: TransportDomain, target: impl Into<String>) -> SessionId;
51    fn get_session(&self, id: SessionId) -> Option<&SessionRecord>;
52    fn set_backoff_policy(&mut self, policy: ReconnectBackoffPolicy);
53    fn mark_connected(&mut self, id: SessionId);
54    fn mark_reconnecting(&mut self, id: SessionId, reason: impl Into<String>);
55    fn mark_closed(&mut self, id: SessionId);
56}
57
58#[derive(Default)]
59pub struct SimpleSessionManager {
60    next_id: u64,
61    policy: ReconnectBackoffPolicy,
62    sessions: HashMap<SessionId, SessionRecord>,
63}
64
65impl SimpleSessionManager {
66    pub fn lifecycle_counts(&self) -> SessionLifecycleCounts {
67        let mut out = SessionLifecycleCounts::default();
68        for record in self.sessions.values() {
69            match record.lifecycle {
70                SessionLifecycle::Connecting => out.connecting += 1,
71                SessionLifecycle::Connected => out.connected += 1,
72                SessionLifecycle::Reconnecting => out.reconnecting += 1,
73                SessionLifecycle::Closed => out.closed += 1,
74            }
75        }
76        out
77    }
78}
79
80impl SessionManager for SimpleSessionManager {
81    fn open_session(&mut self, domain: TransportDomain, target: impl Into<String>) -> SessionId {
82        self.next_id += 1;
83        let id = SessionId::new(self.next_id);
84        self.sessions.insert(
85            id,
86            SessionRecord {
87                id,
88                domain,
89                target: target.into(),
90                opened_at: Timestamp::now(),
91                lifecycle: SessionLifecycle::Connecting,
92                reconnect_attempts: 0,
93                reconnect_backoff_ms: 0,
94                next_retry_at: None,
95                last_error: None,
96            },
97        );
98        id
99    }
100
101    fn get_session(&self, id: SessionId) -> Option<&SessionRecord> {
102        self.sessions.get(&id)
103    }
104
105    fn set_backoff_policy(&mut self, policy: ReconnectBackoffPolicy) {
106        self.policy = policy;
107    }
108
109    fn mark_connected(&mut self, id: SessionId) {
110        if let Some(record) = self.sessions.get_mut(&id) {
111            record.lifecycle = SessionLifecycle::Connected;
112            record.reconnect_backoff_ms = 0;
113            record.next_retry_at = None;
114            record.last_error = None;
115        }
116    }
117
118    fn mark_reconnecting(&mut self, id: SessionId, reason: impl Into<String>) {
119        let policy = self.policy;
120        if let Some(record) = self.sessions.get_mut(&id) {
121            record.lifecycle = SessionLifecycle::Reconnecting;
122            record.reconnect_attempts += 1;
123            let shift = record.reconnect_attempts.saturating_sub(1).min(16);
124            let factor = 1u64 << shift;
125            let backoff = policy
126                .base_delay_ms
127                .saturating_mul(factor)
128                .min(policy.max_delay_ms);
129            record.reconnect_backoff_ms = backoff;
130            record.next_retry_at = Some(Timestamp(
131                Timestamp::now().0 + (backoff as u128) * 1_000_000,
132            ));
133            record.last_error = Some(reason.into());
134        }
135    }
136
137    fn mark_closed(&mut self, id: SessionId) {
138        if let Some(record) = self.sessions.get_mut(&id) {
139            record.lifecycle = SessionLifecycle::Closed;
140        }
141    }
142}