vectorizer_sdk/rpc/pool.rs
1//! Minimal RPC connection pool.
2//!
3//! A bounded pool of [`RpcClient`]s. `acquire()` returns an idle
4//! client (or builds a new one if none are available and the pool
5//! isn't at capacity); the returned guard returns the client to the
6//! pool on `Drop`.
7//!
8//! This is intentionally NOT `bb8` / `deadpool` — those bring async
9//! traits and heavyweight reconnect logic that the v1 SDK doesn't
10//! need. If a future workload requires fancier pooling (e.g.
11//! per-connection health checks, idle eviction), swap to a real pool
12//! crate at that point.
13
14use std::sync::Arc;
15
16use parking_lot::Mutex;
17use tokio::sync::Semaphore;
18
19use super::client::{HelloPayload, RpcClient, RpcClientError};
20
/// Configuration for [`RpcPool`].
///
/// The pool keeps its own copy of this value and consults it every time
/// it has to build a new connection.
#[derive(Debug, Clone)]
pub struct RpcPoolConfig {
    /// Server address (`host:port`) every connection in the pool
    /// dials.
    pub address: String,
    /// Maximum number of clients that may be checked out at the same
    /// time. Once this many are checked out, further `acquire()`
    /// calls wait until one is returned. [`RpcPool::new`] treats `0`
    /// as `1`, so a pool can always hand out at least one client.
    pub max_connections: usize,
    /// HELLO payload sent on every newly-built connection. It is
    /// cloned for each handshake.
    pub hello: HelloPayload,
}
33
/// A minimal connection pool.
///
/// Hands out [`PooledClient`] guards via `acquire()`, reusing idle
/// clients when possible and dialing new ones otherwise.
pub struct RpcPool {
    /// Address and HELLO payload used whenever a new connection has to
    /// be built.
    config: RpcPoolConfig,
    /// One permit is held per checked-out client, so at most
    /// `max_connections` (minimum 1) clients are checked out at any
    /// time.
    permits: Arc<Semaphore>,
    /// Idle clients available for reuse. An empty list is a valid pool
    /// state; `acquire()` builds a fresh client in that case. The list
    /// is shared (via `Arc`) with every outstanding [`PooledClient`]
    /// so the guard can push its client back when it is dropped.
    idle: Arc<Mutex<Vec<RpcClient>>>,
}
45
46impl RpcPool {
47 /// Build a new pool. Does NOT open any connections eagerly; the
48 /// first `acquire()` call dials the first connection.
49 pub fn new(config: RpcPoolConfig) -> Self {
50 let max = config.max_connections.max(1);
51 Self {
52 permits: Arc::new(Semaphore::new(max)),
53 idle: Arc::new(Mutex::new(Vec::with_capacity(max))),
54 config,
55 }
56 }
57
58 /// Acquire a client from the pool. Blocks (asynchronously) when
59 /// the pool is at capacity until a slot frees. The returned
60 /// [`PooledClient`] returns the client to the pool on `Drop`.
61 pub async fn acquire(&self) -> Result<PooledClient, RpcClientError> {
62 let permit = self
63 .permits
64 .clone()
65 .acquire_owned()
66 .await
67 .expect("semaphore not closed");
68
69 // Try idle list first; on miss, dial + handshake.
70 let client = {
71 let mut idle = self.idle.lock();
72 idle.pop()
73 };
74 let client = match client {
75 Some(c) => c,
76 None => {
77 let c = RpcClient::connect(&self.config.address).await?;
78 let _ = c.hello(self.config.hello.clone()).await?;
79 c
80 }
81 };
82
83 Ok(PooledClient {
84 inner: Some(client),
85 idle: Arc::clone(&self.idle),
86 _permit: permit,
87 })
88 }
89
90 /// Number of idle clients currently sitting in the pool. Useful
91 /// for diagnostics and testing — production code should not
92 /// branch on this.
93 pub fn idle_count(&self) -> usize {
94 self.idle.lock().len()
95 }
96}
97
/// RAII guard returned by [`RpcPool::acquire`]. Returns the client to
/// the pool on `Drop` so subsequent acquires reuse the connection.
pub struct PooledClient {
    /// The checked-out client. Populated with `Some` by
    /// `RpcPool::acquire`; it is taken (leaving `None`) only inside
    /// `Drop`.
    inner: Option<RpcClient>,
    /// Handle to the owning pool's idle list; `Drop` pushes the
    /// client back here.
    idle: Arc<Mutex<Vec<RpcClient>>>,
    /// Capacity permit for this checkout. Never read; it is held so
    /// that it is released when the guard goes away. Fields are
    /// dropped after `Drop::drop` has run, so the client is already
    /// back in the idle list by the time the permit frees up.
    _permit: tokio::sync::OwnedSemaphorePermit,
}
105
106impl PooledClient {
107 /// Borrow the underlying client.
108 pub fn client(&self) -> &RpcClient {
109 self.inner
110 .as_ref()
111 .expect("PooledClient inner is only None during Drop")
112 }
113}
114
115impl Drop for PooledClient {
116 fn drop(&mut self) {
117 // Move the client back into the idle list. If the inner
118 // RpcClient was already torn (reader task dead) the next
119 // acquire will see a stale client; for the v1 pool we rely on
120 // the call-time `ConnectionClosed` error to surface the
121 // problem rather than pre-validating on return. A future
122 // version can add a health check here.
123 if let Some(client) = self.inner.take() {
124 let mut idle = self.idle.lock();
125 idle.push(client);
126 }
127 }
128}
129
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// A freshly constructed pool holds no idle clients.
    #[test]
    fn config_round_trip() {
        // Building the pool never dials, so no server is required
        // here. Connections are only opened inside `acquire()`, which
        // needs a live server and is not covered by this unit test.
        let pool = RpcPool::new(RpcPoolConfig {
            address: "localhost:15503".into(),
            max_connections: 4,
            hello: HelloPayload::new("test"),
        });

        assert_eq!(pool.idle_count(), 0);
    }
}
149}