1use alloc::format;
2use alloc::vec::Vec;
3
4use crate::SdkError;
5
6use super::{
7 ConnectionEvent, ConnectionLifecycle, ReconnectConfig, ResumeRequest, SubscriptionId,
8 SubscriptionRecovery,
9};
10
/// Sizing and timing parameters a [`ConnectionPool`] is built from.
///
/// Only `max_connections` is interpreted in this module; the other two fields
/// are stored and handed back unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConnectionPoolConfig {
    /// Number of connection slots the pool creates. Must be non-zero to pass
    /// [`ConnectionPoolConfig::validate`].
    pub max_connections: usize,
    /// Timeout in milliseconds. Not read in this module, so what it bounds is
    /// decided by whoever consumes the config (TODO confirm).
    pub timeout_millis: u64,
    /// Buffer size. Not read in this module, so its unit and owner are decided
    /// by whoever consumes the config (TODO confirm).
    pub buffer_size: usize,
}
25
26impl ConnectionPoolConfig {
27 #[must_use]
29 pub const fn new(max_connections: usize, timeout_millis: u64, buffer_size: usize) -> Self {
30 Self {
31 max_connections,
32 timeout_millis,
33 buffer_size,
34 }
35 }
36
37 pub fn validate(self) -> Result<Self, SdkError> {
43 if self.max_connections == 0 {
44 return Err(SdkError::Connection {
45 description: "connection pool max_connections must be greater than zero".into(),
46 });
47 }
48
49 Ok(self)
50 }
51}
52
/// Identifier of one connection slot inside a [`ConnectionPool`].
///
/// A thin wrapper around a `usize`; equality, ordering and hashing are those
/// of the wrapped value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PoolConnectionId(usize);
56
57impl PoolConnectionId {
58 #[must_use]
60 pub const fn new(value: usize) -> Self {
61 Self(value)
62 }
63
64 #[must_use]
66 pub const fn get(self) -> usize {
67 self.0
68 }
69}
70
71#[derive(Clone, Copy, Debug, PartialEq, Eq)]
73pub struct SubscriptionAssignment {
74 pub subscription_id: SubscriptionId,
76 pub connection_id: PoolConnectionId,
78}
79
80#[derive(Debug)]
81struct PoolConnection {
82 id: PoolConnectionId,
83 subscription_count: usize,
84 lifecycle: ConnectionLifecycle,
85 recovery: SubscriptionRecovery,
86}
87
88impl PoolConnection {
89 fn new(id: PoolConnectionId, reconnect_config: ReconnectConfig) -> Self {
90 Self {
91 id,
92 subscription_count: 0,
93 lifecycle: ConnectionLifecycle::new(reconnect_config),
94 recovery: SubscriptionRecovery::new(),
95 }
96 }
97}
98
99#[derive(Debug)]
101pub struct ConnectionPool {
102 config: ConnectionPoolConfig,
103 connections: Vec<PoolConnection>,
104}
105
106impl ConnectionPool {
107 pub fn new(
113 config: ConnectionPoolConfig,
114 reconnect_config: ReconnectConfig,
115 ) -> Result<Self, SdkError> {
116 let config = config.validate()?;
117 let mut connections = Vec::with_capacity(config.max_connections);
118
119 for slot in 0..config.max_connections {
120 connections.push(PoolConnection::new(
121 PoolConnectionId::new(slot),
122 reconnect_config,
123 ));
124 }
125
126 Ok(Self {
127 config,
128 connections,
129 })
130 }
131
132 #[must_use]
134 pub const fn config(&self) -> ConnectionPoolConfig {
135 self.config
136 }
137
138 #[must_use]
140 pub const fn max_connections(&self) -> usize {
141 self.config.max_connections
142 }
143
144 #[must_use]
146 pub fn connection_count(&self) -> usize {
147 self.connections.len()
148 }
149
150 pub fn assign_subscription(
156 &mut self,
157 subscription_id: SubscriptionId,
158 ) -> Result<SubscriptionAssignment, SdkError> {
159 if let Some(existing) = self.connection_for_subscription(subscription_id) {
160 return Ok(SubscriptionAssignment {
161 subscription_id,
162 connection_id: existing,
163 });
164 }
165
166 let connection = self
167 .connections
168 .iter_mut()
169 .min_by_key(|connection| (connection.subscription_count, connection.id))
170 .ok_or_else(|| SdkError::Connection {
171 description: "connection pool has no connections".into(),
172 })?;
173
174 connection.subscription_count = connection.subscription_count.saturating_add(1);
175 connection.recovery.track_subscription(subscription_id);
176
177 Ok(SubscriptionAssignment {
178 subscription_id,
179 connection_id: connection.id,
180 })
181 }
182
183 pub fn acknowledge(
189 &mut self,
190 subscription_id: SubscriptionId,
191 sequence: u64,
192 ) -> Result<(), SdkError> {
193 let connection = self.connection_for_subscription_mut(subscription_id)?;
194 connection.recovery.acknowledge(subscription_id, sequence);
195 Ok(())
196 }
197
198 pub fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<(), SdkError> {
204 let connection = self.connection_for_subscription_mut(subscription_id)?;
205 connection.recovery.unsubscribe(subscription_id);
206 connection.subscription_count = connection.subscription_count.saturating_sub(1);
207 Ok(())
208 }
209
210 pub fn resume_requests_for_transition(
216 &self,
217 event: &ConnectionEvent,
218 ) -> Result<Vec<ResumeRequest>, SdkError> {
219 let mut requests = Vec::new();
220 for connection in &self.connections {
221 requests.extend(connection.recovery.resume_requests_for_transition(event)?);
222 }
223 Ok(requests)
224 }
225
226 #[must_use]
228 pub fn connection_for_subscription(
229 &self,
230 subscription_id: SubscriptionId,
231 ) -> Option<PoolConnectionId> {
232 self.connections
233 .iter()
234 .find(|connection| connection.recovery.is_active(subscription_id))
235 .map(|connection| connection.id)
236 }
237
238 pub fn subscription_count(&self, connection_id: PoolConnectionId) -> Result<usize, SdkError> {
244 self.connection(connection_id)
245 .map(|connection| connection.subscription_count)
246 }
247
248 pub fn recovery(
254 &self,
255 connection_id: PoolConnectionId,
256 ) -> Result<&SubscriptionRecovery, SdkError> {
257 self.connection(connection_id)
258 .map(|connection| &connection.recovery)
259 }
260
261 pub fn lifecycle(
267 &self,
268 connection_id: PoolConnectionId,
269 ) -> Result<&ConnectionLifecycle, SdkError> {
270 self.connection(connection_id)
271 .map(|connection| &connection.lifecycle)
272 }
273
274 fn connection(&self, connection_id: PoolConnectionId) -> Result<&PoolConnection, SdkError> {
275 self.connections
276 .iter()
277 .find(|connection| connection.id == connection_id)
278 .ok_or_else(|| SdkError::Connection {
279 description: format!("unknown pooled connection {}", connection_id.get()),
280 })
281 }
282
283 fn connection_for_subscription_mut(
284 &mut self,
285 subscription_id: SubscriptionId,
286 ) -> Result<&mut PoolConnection, SdkError> {
287 self.connections
288 .iter_mut()
289 .find(|connection| connection.recovery.is_active(subscription_id))
290 .ok_or_else(|| SdkError::Connection {
291 description: format!("unknown subscription {}", subscription_id.get()),
292 })
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection::{ConnectionState, DisconnectReason};

    /// Builds a pool from the given config values and the default reconnect
    /// configuration.
    fn build_pool(
        max_connections: usize,
        timeout_millis: u64,
        buffer_size: usize,
    ) -> Result<ConnectionPool, SdkError> {
        ConnectionPool::new(
            ConnectionPoolConfig::new(max_connections, timeout_millis, buffer_size),
            ReconnectConfig::default(),
        )
    }

    /// A pool configured with zero connections is refused at construction.
    #[test]
    fn invalid_pool_size_is_rejected() {
        let outcome = build_pool(0, 10, 16);

        assert!(outcome.is_err());
    }

    /// With two slots, the third subscription lands back on the first slot.
    #[test]
    fn subscriptions_are_distributed_across_connections() -> Result<(), SdkError> {
        let mut pool = build_pool(2, 10, 16)?;

        let a = pool.assign_subscription(SubscriptionId::new(1))?;
        let b = pool.assign_subscription(SubscriptionId::new(2))?;
        let c = pool.assign_subscription(SubscriptionId::new(3))?;

        assert_ne!(a.connection_id, b.connection_id);
        assert_eq!(c.connection_id, a.connection_id);
        assert_eq!(pool.subscription_count(a.connection_id)?, 2);
        assert_eq!(pool.subscription_count(b.connection_id)?, 1);
        Ok(())
    }

    /// With a single slot, every subscription shares it.
    #[test]
    fn multiple_subscriptions_share_configured_connections() -> Result<(), SdkError> {
        let mut pool = build_pool(1, 20, 32)?;

        let a = pool.assign_subscription(SubscriptionId::new(10))?;
        let b = pool.assign_subscription(SubscriptionId::new(11))?;

        assert_eq!(a.connection_id, b.connection_id);
        assert_eq!(pool.max_connections(), 1);
        assert_eq!(pool.subscription_count(a.connection_id)?, 2);
        Ok(())
    }

    /// After a reconnect, an acknowledged subscription resumes one past its
    /// acknowledged sequence and an unacknowledged one resumes from zero.
    #[test]
    fn pooled_recovery_builds_resume_requests_on_reconnect() -> Result<(), SdkError> {
        let mut pool = build_pool(2, 10, 16)?;
        let acked = SubscriptionId::new(21);
        let untouched = SubscriptionId::new(22);
        let reconnected = ConnectionEvent::new(
            ConnectionState::Reconnecting { attempt: 0 },
            ConnectionState::Connected,
        );

        pool.assign_subscription(acked)?;
        pool.assign_subscription(untouched)?;
        pool.acknowledge(acked, 4)?;

        let resumed = pool.resume_requests_for_transition(&reconnected)?;

        let expected = vec![
            ResumeRequest::new(acked, 5),
            ResumeRequest::new(untouched, 0),
        ];
        assert_eq!(resumed, expected);
        Ok(())
    }

    /// Unsubscribing clears the assignment, zeroes the slot's count, and a
    /// second unsubscribe fails.
    #[test]
    fn unsubscribe_removes_assignment() -> Result<(), SdkError> {
        let mut pool = build_pool(2, 10, 16)?;
        let target = SubscriptionId::new(31);
        let placed = pool.assign_subscription(target)?;

        pool.unsubscribe(target)?;

        assert_eq!(pool.connection_for_subscription(target), None);
        assert_eq!(pool.subscription_count(placed.connection_id)?, 0);
        assert!(pool.unsubscribe(target).is_err());
        Ok(())
    }

    /// A connected-to-disconnected transition yields no resume requests.
    #[test]
    fn non_reconnect_transition_does_not_resume() -> Result<(), SdkError> {
        let mut pool = build_pool(2, 10, 16)?;
        let disconnected = ConnectionEvent::new(
            ConnectionState::Connected,
            ConnectionState::Disconnected {
                reason: DisconnectReason::Normal,
            },
        );

        pool.assign_subscription(SubscriptionId::new(41))?;

        let resumed = pool.resume_requests_for_transition(&disconnected)?;
        assert!(resumed.is_empty());
        Ok(())
    }
}