Skip to main content

liminal_sdk/connection/
pool.rs

1use alloc::format;
2use alloc::vec::Vec;
3
4use crate::SdkError;
5
6use super::{
7    ConnectionEvent, ConnectionLifecycle, ReconnectConfig, ResumeRequest, SubscriptionId,
8    SubscriptionRecovery,
9};
10
11/// Caller-supplied connection pool sizing and resource configuration.
12///
13/// This type deliberately has no [`Default`] implementation: pool sizing must be
14/// supplied by the caller, builder, or runtime so the SDK never bakes in a hidden
15/// connection-count default.
16#[derive(Clone, Copy, Debug, PartialEq, Eq)]
17pub struct ConnectionPoolConfig {
18    /// Maximum number of remote connections managed by this pool.
19    pub max_connections: usize,
20    /// Per-connection operation timeout, in milliseconds.
21    pub timeout_millis: u64,
22    /// Per-connection inbound buffer size.
23    pub buffer_size: usize,
24}
25
26impl ConnectionPoolConfig {
27    /// Creates pool configuration from caller-supplied values.
28    #[must_use]
29    pub const fn new(max_connections: usize, timeout_millis: u64, buffer_size: usize) -> Self {
30        Self {
31            max_connections,
32            timeout_millis,
33            buffer_size,
34        }
35    }
36
37    /// Validates caller-supplied pool configuration.
38    ///
39    /// # Errors
40    ///
41    /// Returns [`SdkError`] when no connection can be allocated.
42    pub fn validate(self) -> Result<Self, SdkError> {
43        if self.max_connections == 0 {
44            return Err(SdkError::Connection {
45                description: "connection pool max_connections must be greater than zero".into(),
46            });
47        }
48
49        Ok(self)
50    }
51}
52
/// Identifier naming one connection managed inside a pool.
///
/// The wrapped value is the numeric slot of the connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PoolConnectionId(usize);

impl PoolConnectionId {
    /// Wraps a slot number as a pooled connection identifier.
    #[must_use]
    pub const fn new(value: usize) -> Self {
        PoolConnectionId(value)
    }

    /// Unwraps the identifier back into its numeric slot.
    #[must_use]
    pub const fn get(self) -> usize {
        let Self(slot) = self;
        slot
    }
}
70
71/// Assignment returned when a subscription is placed on a pooled connection.
72#[derive(Clone, Copy, Debug, PartialEq, Eq)]
73pub struct SubscriptionAssignment {
74    /// Subscription assigned to the pool.
75    pub subscription_id: SubscriptionId,
76    /// Connection that owns the subscription.
77    pub connection_id: PoolConnectionId,
78}
79
80#[derive(Debug)]
81struct PoolConnection {
82    id: PoolConnectionId,
83    subscription_count: usize,
84    lifecycle: ConnectionLifecycle,
85    recovery: SubscriptionRecovery,
86}
87
88impl PoolConnection {
89    fn new(id: PoolConnectionId, reconnect_config: ReconnectConfig) -> Self {
90        Self {
91            id,
92            subscription_count: 0,
93            lifecycle: ConnectionLifecycle::new(reconnect_config),
94            recovery: SubscriptionRecovery::new(),
95        }
96    }
97}
98
99/// Configurable pool for remote SDK connections.
100#[derive(Debug)]
101pub struct ConnectionPool {
102    config: ConnectionPoolConfig,
103    connections: Vec<PoolConnection>,
104}
105
106impl ConnectionPool {
107    /// Creates a pool with exactly the caller-supplied connection count.
108    ///
109    /// # Errors
110    ///
111    /// Returns [`SdkError`] if the supplied configuration is invalid.
112    pub fn new(
113        config: ConnectionPoolConfig,
114        reconnect_config: ReconnectConfig,
115    ) -> Result<Self, SdkError> {
116        let config = config.validate()?;
117        let mut connections = Vec::with_capacity(config.max_connections);
118
119        for slot in 0..config.max_connections {
120            connections.push(PoolConnection::new(
121                PoolConnectionId::new(slot),
122                reconnect_config,
123            ));
124        }
125
126        Ok(Self {
127            config,
128            connections,
129        })
130    }
131
132    /// Returns the caller-supplied pool configuration.
133    #[must_use]
134    pub const fn config(&self) -> ConnectionPoolConfig {
135        self.config
136    }
137
138    /// Returns the caller-supplied maximum connection count.
139    #[must_use]
140    pub const fn max_connections(&self) -> usize {
141        self.config.max_connections
142    }
143
144    /// Returns the number of managed connection slots.
145    #[must_use]
146    pub fn connection_count(&self) -> usize {
147        self.connections.len()
148    }
149
150    /// Assigns a subscription to the least-loaded pooled connection.
151    ///
152    /// # Errors
153    ///
154    /// Returns [`SdkError`] if the pool has no available connection entries.
155    pub fn assign_subscription(
156        &mut self,
157        subscription_id: SubscriptionId,
158    ) -> Result<SubscriptionAssignment, SdkError> {
159        if let Some(existing) = self.connection_for_subscription(subscription_id) {
160            return Ok(SubscriptionAssignment {
161                subscription_id,
162                connection_id: existing,
163            });
164        }
165
166        let connection = self
167            .connections
168            .iter_mut()
169            .min_by_key(|connection| (connection.subscription_count, connection.id))
170            .ok_or_else(|| SdkError::Connection {
171                description: "connection pool has no connections".into(),
172            })?;
173
174        connection.subscription_count = connection.subscription_count.saturating_add(1);
175        connection.recovery.track_subscription(subscription_id);
176
177        Ok(SubscriptionAssignment {
178            subscription_id,
179            connection_id: connection.id,
180        })
181    }
182
183    /// Records an acknowledged sequence for the connection that owns a subscription.
184    ///
185    /// # Errors
186    ///
187    /// Returns [`SdkError`] when the subscription is not active in this pool.
188    pub fn acknowledge(
189        &mut self,
190        subscription_id: SubscriptionId,
191        sequence: u64,
192    ) -> Result<(), SdkError> {
193        let connection = self.connection_for_subscription_mut(subscription_id)?;
194        connection.recovery.acknowledge(subscription_id, sequence);
195        Ok(())
196    }
197
198    /// Removes a subscription assignment and its recovery state.
199    ///
200    /// # Errors
201    ///
202    /// Returns [`SdkError`] when the subscription is not active in this pool.
203    pub fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<(), SdkError> {
204        let connection = self.connection_for_subscription_mut(subscription_id)?;
205        connection.recovery.unsubscribe(subscription_id);
206        connection.subscription_count = connection.subscription_count.saturating_sub(1);
207        Ok(())
208    }
209
210    /// Builds subscription resume requests for every pooled connection on reconnect.
211    ///
212    /// # Errors
213    ///
214    /// Returns [`SdkError`] if any active subscription cannot compute a resume sequence.
215    pub fn resume_requests_for_transition(
216        &self,
217        event: &ConnectionEvent,
218    ) -> Result<Vec<ResumeRequest>, SdkError> {
219        let mut requests = Vec::new();
220        for connection in &self.connections {
221            requests.extend(connection.recovery.resume_requests_for_transition(event)?);
222        }
223        Ok(requests)
224    }
225
226    /// Returns the connection assigned to a subscription, if it is active.
227    #[must_use]
228    pub fn connection_for_subscription(
229        &self,
230        subscription_id: SubscriptionId,
231    ) -> Option<PoolConnectionId> {
232        self.connections
233            .iter()
234            .find(|connection| connection.recovery.is_active(subscription_id))
235            .map(|connection| connection.id)
236    }
237
238    /// Returns the number of active subscriptions on a connection.
239    ///
240    /// # Errors
241    ///
242    /// Returns [`SdkError`] if the connection identifier is not part of this pool.
243    pub fn subscription_count(&self, connection_id: PoolConnectionId) -> Result<usize, SdkError> {
244        self.connection(connection_id)
245            .map(|connection| connection.subscription_count)
246    }
247
248    /// Returns recovery state for a pooled connection.
249    ///
250    /// # Errors
251    ///
252    /// Returns [`SdkError`] if the connection identifier is not part of this pool.
253    pub fn recovery(
254        &self,
255        connection_id: PoolConnectionId,
256    ) -> Result<&SubscriptionRecovery, SdkError> {
257        self.connection(connection_id)
258            .map(|connection| &connection.recovery)
259    }
260
261    /// Returns lifecycle state for a pooled connection.
262    ///
263    /// # Errors
264    ///
265    /// Returns [`SdkError`] if the connection identifier is not part of this pool.
266    pub fn lifecycle(
267        &self,
268        connection_id: PoolConnectionId,
269    ) -> Result<&ConnectionLifecycle, SdkError> {
270        self.connection(connection_id)
271            .map(|connection| &connection.lifecycle)
272    }
273
274    fn connection(&self, connection_id: PoolConnectionId) -> Result<&PoolConnection, SdkError> {
275        self.connections
276            .iter()
277            .find(|connection| connection.id == connection_id)
278            .ok_or_else(|| SdkError::Connection {
279                description: format!("unknown pooled connection {}", connection_id.get()),
280            })
281    }
282
283    fn connection_for_subscription_mut(
284        &mut self,
285        subscription_id: SubscriptionId,
286    ) -> Result<&mut PoolConnection, SdkError> {
287        self.connections
288            .iter_mut()
289            .find(|connection| connection.recovery.is_active(subscription_id))
290            .ok_or_else(|| SdkError::Connection {
291                description: format!("unknown subscription {}", subscription_id.get()),
292            })
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection::{ConnectionState, DisconnectReason};

    /// Builds a pool from the given sizing using the default reconnect configuration.
    fn build_pool(
        max_connections: usize,
        timeout_millis: u64,
        buffer_size: usize,
    ) -> Result<ConnectionPool, SdkError> {
        let sizing = ConnectionPoolConfig::new(max_connections, timeout_millis, buffer_size);
        ConnectionPool::new(sizing, ReconnectConfig::default())
    }

    #[test]
    fn invalid_pool_size_is_rejected() {
        // A pool sized at zero connections must be refused at construction.
        assert!(build_pool(0, 10, 16).is_err());
    }

    #[test]
    fn subscriptions_are_distributed_across_connections() -> Result<(), SdkError> {
        let mut pool = build_pool(2, 10, 16)?;

        let placed_first = pool.assign_subscription(SubscriptionId::new(1))?;
        let placed_second = pool.assign_subscription(SubscriptionId::new(2))?;
        let placed_third = pool.assign_subscription(SubscriptionId::new(3))?;

        // The first two land on different connections; the third goes back to the first one.
        assert_ne!(placed_first.connection_id, placed_second.connection_id);
        assert_eq!(placed_third.connection_id, placed_first.connection_id);
        assert_eq!(pool.subscription_count(placed_first.connection_id)?, 2);
        assert_eq!(pool.subscription_count(placed_second.connection_id)?, 1);
        Ok(())
    }

    #[test]
    fn multiple_subscriptions_share_configured_connections() -> Result<(), SdkError> {
        let mut pool = build_pool(1, 20, 32)?;

        let placed_first = pool.assign_subscription(SubscriptionId::new(10))?;
        let placed_second = pool.assign_subscription(SubscriptionId::new(11))?;

        // With a single slot, both subscriptions must share it.
        assert_eq!(placed_first.connection_id, placed_second.connection_id);
        assert_eq!(pool.max_connections(), 1);
        assert_eq!(pool.subscription_count(placed_first.connection_id)?, 2);
        Ok(())
    }

    #[test]
    fn pooled_recovery_builds_resume_requests_on_reconnect() -> Result<(), SdkError> {
        let mut pool = build_pool(2, 10, 16)?;
        let acknowledged = SubscriptionId::new(21);
        let untouched = SubscriptionId::new(22);
        let reconnected = ConnectionEvent::new(
            ConnectionState::Reconnecting { attempt: 0 },
            ConnectionState::Connected,
        );

        pool.assign_subscription(acknowledged)?;
        pool.assign_subscription(untouched)?;
        pool.acknowledge(acknowledged, 4)?;

        let resumed = pool.resume_requests_for_transition(&reconnected)?;

        assert_eq!(
            resumed,
            vec![
                ResumeRequest::new(acknowledged, 5),
                ResumeRequest::new(untouched, 0)
            ]
        );
        Ok(())
    }

    #[test]
    fn unsubscribe_removes_assignment() -> Result<(), SdkError> {
        let mut pool = build_pool(2, 10, 16)?;
        let target = SubscriptionId::new(31);
        let placement = pool.assign_subscription(target)?;

        pool.unsubscribe(target)?;

        assert_eq!(pool.connection_for_subscription(target), None);
        assert_eq!(pool.subscription_count(placement.connection_id)?, 0);
        // A second removal has nothing left to remove and must fail.
        assert!(pool.unsubscribe(target).is_err());
        Ok(())
    }

    #[test]
    fn non_reconnect_transition_does_not_resume() -> Result<(), SdkError> {
        let mut pool = build_pool(2, 10, 16)?;
        let disconnected = ConnectionEvent::new(
            ConnectionState::Connected,
            ConnectionState::Disconnected {
                reason: DisconnectReason::Normal,
            },
        );

        pool.assign_subscription(SubscriptionId::new(41))?;

        let resumed = pool.resume_requests_for_transition(&disconnected)?;
        assert!(resumed.is_empty());
        Ok(())
    }
}