Skip to main content

vectorizer_sdk/rpc/
pool.rs

//! Minimal RPC connection pool.
//!
//! A bounded pool of [`RpcClient`]s. `acquire()` returns an idle
//! client (or builds a new one if none are available and the pool
//! isn't at capacity); the returned guard returns the client to the
//! pool on `Drop`.
//!
//! This is intentionally NOT `bb8` / `deadpool` — those bring async
//! traits and heavyweight reconnect logic that the v1 SDK doesn't
//! need. If a future workload requires fancier pooling (e.g.
//! per-connection health checks, idle eviction), swap to a real pool
//! crate at that point.
13
14use std::sync::Arc;
15
16use parking_lot::Mutex;
17use tokio::sync::Semaphore;
18
19use super::client::{HelloPayload, RpcClient, RpcClientError};
20
21/// Configuration for [`RpcPool`].
22#[derive(Debug, Clone)]
23pub struct RpcPoolConfig {
24    /// Server address (`host:port`) every connection in the pool
25    /// dials.
26    pub address: String,
27    /// Maximum number of concurrent open connections. Calls block on
28    /// `acquire()` once this many are checked out.
29    pub max_connections: usize,
30    /// HELLO payload sent on every newly-built connection.
31    pub hello: HelloPayload,
32}
33
34/// A minimal connection pool.
35pub struct RpcPool {
36    config: RpcPoolConfig,
37    /// Semaphore limits the total number of live + checked-out
38    /// connections to `max_connections`.
39    permits: Arc<Semaphore>,
40    /// Idle clients available for reuse. `None` is also a valid pool
41    /// state (the slot is just empty); callers build a fresh client
42    /// in that case.
43    idle: Arc<Mutex<Vec<RpcClient>>>,
44}
45
46impl RpcPool {
47    /// Build a new pool. Does NOT open any connections eagerly; the
48    /// first `acquire()` call dials the first connection.
49    pub fn new(config: RpcPoolConfig) -> Self {
50        let max = config.max_connections.max(1);
51        Self {
52            permits: Arc::new(Semaphore::new(max)),
53            idle: Arc::new(Mutex::new(Vec::with_capacity(max))),
54            config,
55        }
56    }
57
58    /// Acquire a client from the pool. Blocks (asynchronously) when
59    /// the pool is at capacity until a slot frees. The returned
60    /// [`PooledClient`] returns the client to the pool on `Drop`.
61    pub async fn acquire(&self) -> Result<PooledClient, RpcClientError> {
62        let permit = self
63            .permits
64            .clone()
65            .acquire_owned()
66            .await
67            .expect("semaphore not closed");
68
69        // Try idle list first; on miss, dial + handshake.
70        let client = {
71            let mut idle = self.idle.lock();
72            idle.pop()
73        };
74        let client = match client {
75            Some(c) => c,
76            None => {
77                let c = RpcClient::connect(&self.config.address).await?;
78                let _ = c.hello(self.config.hello.clone()).await?;
79                c
80            }
81        };
82
83        Ok(PooledClient {
84            inner: Some(client),
85            idle: Arc::clone(&self.idle),
86            _permit: permit,
87        })
88    }
89
90    /// Number of idle clients currently sitting in the pool. Useful
91    /// for diagnostics and testing — production code should not
92    /// branch on this.
93    pub fn idle_count(&self) -> usize {
94        self.idle.lock().len()
95    }
96}
97
98/// RAII guard returned by [`RpcPool::acquire`]. Returns the client to
99/// the pool on `Drop` so subsequent acquires reuse the connection.
100pub struct PooledClient {
101    inner: Option<RpcClient>,
102    idle: Arc<Mutex<Vec<RpcClient>>>,
103    _permit: tokio::sync::OwnedSemaphorePermit,
104}
105
106impl PooledClient {
107    /// Borrow the underlying client.
108    pub fn client(&self) -> &RpcClient {
109        self.inner
110            .as_ref()
111            .expect("PooledClient inner is only None during Drop")
112    }
113}
114
115impl Drop for PooledClient {
116    fn drop(&mut self) {
117        // Move the client back into the idle list. If the inner
118        // RpcClient was already torn (reader task dead) the next
119        // acquire will see a stale client; for the v1 pool we rely on
120        // the call-time `ConnectionClosed` error to surface the
121        // problem rather than pre-validating on return. A future
122        // version can add a health check here.
123        if let Some(client) = self.inner.take() {
124            let mut idle = self.idle.lock();
125            idle.push(client);
126        }
127    }
128}
129
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Building a pool never dials, so its shape can be checked
    /// without a real server. The dial path lives in `acquire()` and
    /// is covered by the integration suite.
    #[test]
    fn config_round_trip() {
        let pool = RpcPool::new(RpcPoolConfig {
            address: "localhost:15503".into(),
            max_connections: 4,
            hello: HelloPayload::new("test"),
        });

        // A freshly built pool has nothing parked on its idle list.
        assert_eq!(pool.idle_count(), 0);
    }
}