// lnc_client/connection.rs
//! Connection Management and Resilience
//!
//! Provides connection pooling, automatic reconnection, and resilience features
//! for LANCE client connections.
//!
//! # Features
//!
//! - **Connection Pool**: Manage multiple connections to a LANCE server
//! - **Auto-Reconnect**: Automatically reconnect on connection failures
//! - **Health Checking**: Periodic health checks with ping/pong
//! - **Backoff**: Exponential backoff for reconnection attempts
//! - **Circuit Breaker**: Prevent cascading failures
//!
//! # Example
//!
//! ```rust,no_run
//! use lnc_client::{ConnectionPool, ConnectionPoolConfig};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let pool = ConnectionPool::new(
//!         "127.0.0.1:1992",
//!         ConnectionPoolConfig::new()
//!             .with_max_connections(10)
//!             .with_health_check_interval(30),
//!     ).await?;
//!
//!     // Get a connection from the pool
//!     let mut conn = pool.get().await?;
//!
//!     // Use the connection
//!     conn.ping().await?;
//!
//!     // Connection is returned to pool when dropped
//!     Ok(())
//! }
//! ```
39use std::collections::VecDeque;
40use std::net::SocketAddr;
41use std::sync::Arc;
42use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43use std::time::{Duration, Instant};
44
45use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
46
47use crate::client::{ClientConfig, LanceClient};
48use crate::error::{ClientError, Result};
49use crate::tls::TlsClientConfig;
50
/// Configuration knobs for the LANCE client connection pool, mirroring the
/// resilience patterns documented in Architecture §§14 and 20.
///
/// Built fluently via [`ConnectionPoolConfig::new`] and the `with_*` methods.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Maximum number of connections in the pool (also the number of
    /// checkout permits handed to the pool's semaphore)
    pub max_connections: usize,
    /// Minimum number of idle connections to maintain (seeded best-effort at
    /// pool construction)
    pub min_idle: usize,
    /// Connection timeout applied to each dial attempt
    pub connect_timeout: Duration,
    /// Maximum time to wait for a connection from the pool
    pub acquire_timeout: Duration,
    /// Health check interval (0 = disabled; no background task is spawned)
    pub health_check_interval: Duration,
    /// Maximum connection lifetime (0 = unlimited)
    pub max_lifetime: Duration,
    /// Idle timeout before closing a connection (0 = never expire idle)
    pub idle_timeout: Duration,
    /// Enable automatic reconnection
    pub auto_reconnect: bool,
    /// Maximum reconnection attempts (0 = unlimited)
    pub max_reconnect_attempts: u32,
    /// Base delay for exponential backoff
    pub reconnect_base_delay: Duration,
    /// Maximum delay for exponential backoff (caps the doubling)
    pub reconnect_max_delay: Duration,
    /// TLS configuration (None = plain TCP)
    pub tls_config: Option<TlsClientConfig>,
}
80
81impl Default for ConnectionPoolConfig {
82    fn default() -> Self {
83        Self {
84            max_connections: 10,
85            min_idle: 1,
86            connect_timeout: Duration::from_secs(30),
87            acquire_timeout: Duration::from_secs(30),
88            health_check_interval: Duration::from_secs(30),
89            max_lifetime: Duration::from_secs(3600), // 1 hour
90            idle_timeout: Duration::from_secs(300),  // 5 minutes
91            auto_reconnect: true,
92            max_reconnect_attempts: 5,
93            reconnect_base_delay: Duration::from_millis(100),
94            reconnect_max_delay: Duration::from_secs(30),
95            tls_config: None,
96        }
97    }
98}
99
100impl ConnectionPoolConfig {
101    /// Create a new connection pool configuration with defaults
102    pub fn new() -> Self {
103        Self::default()
104    }
105
106    /// Set maximum connections
107    pub fn with_max_connections(mut self, n: usize) -> Self {
108        self.max_connections = n;
109        self
110    }
111
112    /// Set minimum idle connections
113    pub fn with_min_idle(mut self, n: usize) -> Self {
114        self.min_idle = n;
115        self
116    }
117
118    /// Set connection timeout
119    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
120        self.connect_timeout = timeout;
121        self
122    }
123
124    /// Set acquire timeout
125    pub fn with_acquire_timeout(mut self, timeout: Duration) -> Self {
126        self.acquire_timeout = timeout;
127        self
128    }
129
130    /// Set health check interval (seconds)
131    pub fn with_health_check_interval(mut self, secs: u64) -> Self {
132        self.health_check_interval = Duration::from_secs(secs);
133        self
134    }
135
136    /// Set maximum connection lifetime
137    pub fn with_max_lifetime(mut self, lifetime: Duration) -> Self {
138        self.max_lifetime = lifetime;
139        self
140    }
141
142    /// Set idle timeout
143    pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
144        self.idle_timeout = timeout;
145        self
146    }
147
148    /// Enable or disable auto-reconnect
149    pub fn with_auto_reconnect(mut self, enabled: bool) -> Self {
150        self.auto_reconnect = enabled;
151        self
152    }
153
154    /// Set maximum reconnection attempts
155    pub fn with_max_reconnect_attempts(mut self, attempts: u32) -> Self {
156        self.max_reconnect_attempts = attempts;
157        self
158    }
159
160    /// Set TLS configuration for encrypted connections
161    pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
162        self.tls_config = Some(tls_config);
163        self
164    }
165}
166
/// Snapshot of connection-pool statistics exposed for observability and sized
/// to match Architecture §27's metrics guidance.
///
/// All `*_created`/`*_closed`/`*_attempts` fields are cumulative since pool
/// creation; `active_connections`/`idle_connections` are point-in-time gauges.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Total connections created
    pub connections_created: u64,
    /// Total connections closed
    pub connections_closed: u64,
    /// Current active connections (in use)
    pub active_connections: u64,
    /// Current idle connections (available)
    pub idle_connections: u64,
    /// Total acquire attempts
    pub acquire_attempts: u64,
    /// Successful acquires
    pub acquire_successes: u64,
    /// Failed acquires (timeout, error)
    pub acquire_failures: u64,
    /// Health check failures
    pub health_check_failures: u64,
    /// Reconnection attempts
    pub reconnect_attempts: u64,
}
190
/// Internal pool metrics using atomics (not exported outside the crate) so we
/// can keep Architecture §27 counters non-blocking.
///
/// Field-for-field mirror of [`PoolStats`]; converted via `snapshot()`.
#[derive(Debug, Default)]
struct PoolMetrics {
    connections_created: AtomicU64,
    connections_closed: AtomicU64,
    active_connections: AtomicU64,
    idle_connections: AtomicU64,
    acquire_attempts: AtomicU64,
    acquire_successes: AtomicU64,
    acquire_failures: AtomicU64,
    health_check_failures: AtomicU64,
    reconnect_attempts: AtomicU64,
}
205
206impl PoolMetrics {
207    fn snapshot(&self) -> PoolStats {
208        PoolStats {
209            connections_created: self.connections_created.load(Ordering::Relaxed),
210            connections_closed: self.connections_closed.load(Ordering::Relaxed),
211            active_connections: self.active_connections.load(Ordering::Relaxed),
212            idle_connections: self.idle_connections.load(Ordering::Relaxed),
213            acquire_attempts: self.acquire_attempts.load(Ordering::Relaxed),
214            acquire_successes: self.acquire_successes.load(Ordering::Relaxed),
215            acquire_failures: self.acquire_failures.load(Ordering::Relaxed),
216            health_check_failures: self.health_check_failures.load(Ordering::Relaxed),
217            reconnect_attempts: self.reconnect_attempts.load(Ordering::Relaxed),
218        }
219    }
220}
221
/// Internal tracked connection paired with lifecycle metadata so we can enforce
/// Architecture §14 lifetime/idle policies.
struct PooledConnection {
    /// Underlying protocol client owned by this pool slot.
    client: LanceClient,
    /// Set once at creation; drives `max_lifetime` expiry.
    created_at: Instant,
    /// Refreshed on checkout, on return to the pool, and on successful health
    /// checks; drives `idle_timeout` expiry.
    last_used: Instant,
}
229
230impl PooledConnection {
231    fn new(client: LanceClient) -> Self {
232        let now = Instant::now();
233        Self {
234            client,
235            created_at: now,
236            last_used: now,
237        }
238    }
239
240    fn is_expired(&self, max_lifetime: Duration) -> bool {
241        if max_lifetime.is_zero() {
242            return false;
243        }
244        self.created_at.elapsed() > max_lifetime
245    }
246
247    fn is_idle_too_long(&self, idle_timeout: Duration) -> bool {
248        if idle_timeout.is_zero() {
249            return false;
250        }
251        self.last_used.elapsed() > idle_timeout
252    }
253}
254
/// Connection pool for managing LANCE client connections, ensuring we reuse
/// TCP/TLS sessions per Architecture §14 instead of spawning new ones per
/// request. The pool keeps pre-warmed connections, enforces max/idle caps, and
/// wires up background health checks so ingestion/consumer threads avoid
/// blocking on connect storms.
pub struct ConnectionPool {
    /// Target endpoint (`"host:port"`) every pooled client dials.
    addr: String,
    /// Capacity/timeout/TLS settings captured at construction time.
    config: ConnectionPoolConfig,
    /// Idle connections available for checkout (FIFO queue).
    connections: Arc<Mutex<VecDeque<PooledConnection>>>,
    /// Caps simultaneous checkouts at `config.max_connections` permits.
    semaphore: Arc<Semaphore>,
    /// Shared atomic counters, snapshot via [`ConnectionPool::stats`].
    metrics: Arc<PoolMetrics>,
    /// Cleared by `close()`; the health-check loop exits once false.
    running: Arc<AtomicBool>,
}
268
impl ConnectionPool {
    /// Creates a new connection pool bound to the provided address in keeping
    /// with Architecture §14's pinning and reuse strategy.
    ///
    /// # Arguments
    /// * `addr` - Target endpoint (`"host:port"`) shared by pooled clients.
    /// * `config` - Pool configuration describing capacity, timeouts, and TLS.
    ///
    /// # Returns
    /// * `Result<Self>` - Ready-to-use pool with up to `min_idle` connections
    ///   opened and a health-check task running (if enabled).
    ///
    /// # Errors
    /// None today: warm-up dials are best-effort (`if let Ok`), so seeding
    /// failures are skipped and the pool is returned regardless. The `Result`
    /// return is kept for interface stability.
    pub async fn new(addr: &str, config: ConnectionPoolConfig) -> Result<Self> {
        let pool = Self {
            addr: addr.to_string(),
            config: config.clone(),
            connections: Arc::new(Mutex::new(VecDeque::new())),
            semaphore: Arc::new(Semaphore::new(config.max_connections)),
            metrics: Arc::new(PoolMetrics::default()),
            running: Arc::new(AtomicBool::new(true)),
        };

        // Pre-populate with minimum idle connections. Best-effort: a server
        // that is down at construction time does not fail pool creation.
        for _ in 0..config.min_idle {
            if let Ok(conn) = pool.create_connection().await {
                let mut connections = pool.connections.lock().await;
                connections.push_back(conn);
                pool.metrics
                    .idle_connections
                    .fetch_add(1, Ordering::Relaxed);
            }
        }

        // Start health check task if enabled. The hand-rolled "clone" below
        // shares every Arc with `pool`, so the spawned task operates on the
        // same state — it is a handle, not a second pool.
        if !config.health_check_interval.is_zero() {
            let pool_clone = ConnectionPool {
                addr: pool.addr.clone(),
                config: pool.config.clone(),
                connections: pool.connections.clone(),
                semaphore: pool.semaphore.clone(),
                metrics: pool.metrics.clone(),
                running: pool.running.clone(),
            };
            tokio::spawn(async move {
                pool_clone.health_check_task().await;
            });
        }

        Ok(pool)
    }

    /// Acquires a pooled connection, respecting the configured acquire timeout
    /// and returning the RAII guard used throughout the client APIs.
    ///
    /// # Returns
    /// * `Result<PooledClient>` - Guard wrapping the loaned connection.
    ///
    /// # Errors
    /// * [`ClientError::Timeout`] if acquisition exceeds `acquire_timeout`.
    /// * [`ClientError::ConnectionClosed`] if the semaphore is closed.
    /// * Connect errors when no idle connection is usable and a fresh dial
    ///   fails.
    pub async fn get(&self) -> Result<PooledClient> {
        self.metrics
            .acquire_attempts
            .fetch_add(1, Ordering::Relaxed);

        // Acquire permit with timeout. The outer `map_err` handles the elapsed
        // timeout; the inner one handles a closed semaphore.
        let permit = tokio::time::timeout(
            self.config.acquire_timeout,
            self.semaphore.clone().acquire_owned(),
        )
        .await
        .map_err(|_| {
            self.metrics
                .acquire_failures
                .fetch_add(1, Ordering::Relaxed);
            ClientError::Timeout
        })?
        .map_err(|_| {
            self.metrics
                .acquire_failures
                .fetch_add(1, Ordering::Relaxed);
            ClientError::ConnectionClosed
        })?;

        // Try to get an existing connection, discarding (by drop) any that
        // exceeded their lifetime or sat idle too long.
        let conn = {
            let mut connections = self.connections.lock().await;
            loop {
                match connections.pop_front() {
                    Some(conn) => {
                        self.metrics
                            .idle_connections
                            .fetch_sub(1, Ordering::Relaxed);

                        // Check if connection is still valid
                        if conn.is_expired(self.config.max_lifetime)
                            || conn.is_idle_too_long(self.config.idle_timeout)
                        {
                            self.metrics
                                .connections_closed
                                .fetch_add(1, Ordering::Relaxed);
                            continue;
                        }
                        break Some(conn);
                    },
                    None => break None,
                }
            }
        };

        let conn = match conn {
            Some(mut c) => {
                c.last_used = Instant::now();
                c
            },
            None => {
                // Create a new connection.
                // NOTE(review): a dial failure here propagates without bumping
                // `acquire_failures` (only timeout/closed paths do) — confirm
                // whether that counter should also cover connect errors.
                self.create_connection().await?
            },
        };

        self.metrics
            .active_connections
            .fetch_add(1, Ordering::Relaxed);
        self.metrics
            .acquire_successes
            .fetch_add(1, Ordering::Relaxed);

        // The guard owns the permit; dropping the guard frees the pool slot.
        Ok(PooledClient {
            conn: Some(conn),
            pool: self.connections.clone(),
            metrics: self.metrics.clone(),
            permit: Some(permit),
            config: self.config.clone(),
        })
    }

    /// Creates a brand-new `LanceClient` based on the pool's configuration.
    ///
    /// # Returns
    /// * `Result<PooledConnection>` - Fresh connection wrapped with lifecycle
    ///   metadata for expiry/idle enforcement.
    ///
    /// # Errors
    /// Propagates underlying `LanceClient::connect`/`connect_tls` failures.
    async fn create_connection(&self) -> Result<PooledConnection> {
        let mut client_config = ClientConfig::new(&self.addr);
        client_config.connect_timeout = self.config.connect_timeout;

        // TLS vs plain TCP is decided once per pool, at configuration time.
        let client = match &self.config.tls_config {
            Some(tls_config) => LanceClient::connect_tls(client_config, tls_config.clone()).await?,
            None => LanceClient::connect(client_config).await?,
        };
        self.metrics
            .connections_created
            .fetch_add(1, Ordering::Relaxed);

        Ok(PooledConnection::new(client))
    }

    /// Get a point-in-time snapshot of the pool statistics.
    pub fn stats(&self) -> PoolStats {
        self.metrics.snapshot()
    }

    /// Gracefully shuts down the pool, clearing idle connections and marking
    /// future `get` calls as invalid per Architecture §14 teardown guidance.
    ///
    /// Connections currently checked out are unaffected; the health-check task
    /// observes the cleared `running` flag at its next tick and exits.
    pub async fn close(&self) {
        self.running.store(false, Ordering::Relaxed);

        let mut connections = self.connections.lock().await;
        let count = connections.len() as u64;
        connections.clear();
        self.metrics
            .connections_closed
            .fetch_add(count, Ordering::Relaxed);
        self.metrics.idle_connections.store(0, Ordering::Relaxed);
    }

    /// Background loop that, every `health_check_interval`, pings each idle
    /// connection and drops expired or unresponsive ones.
    async fn health_check_task(&self) {
        let mut interval = tokio::time::interval(self.config.health_check_interval);

        while self.running.load(Ordering::Relaxed) {
            interval.tick().await;

            // Take the whole idle set out of the pool for checking.
            // NOTE(review): while the check runs, `get()` sees an empty queue
            // and may dial fresh connections even though healthy ones exist —
            // a short connect burst each interval; confirm this is acceptable.
            let mut to_check = {
                let mut connections = self.connections.lock().await;
                std::mem::take(&mut *connections)
            };

            let mut healthy = VecDeque::new();
            let _initial_count = to_check.len();

            for mut conn in to_check.drain(..) {
                // Check expiry
                if conn.is_expired(self.config.max_lifetime) {
                    self.metrics
                        .connections_closed
                        .fetch_add(1, Ordering::Relaxed);
                    continue;
                }

                // Ping to check health; a failed ping drops the connection.
                match conn.client.ping().await {
                    Ok(_) => {
                        conn.last_used = Instant::now();
                        healthy.push_back(conn);
                    },
                    Err(_) => {
                        self.metrics
                            .health_check_failures
                            .fetch_add(1, Ordering::Relaxed);
                        self.metrics
                            .connections_closed
                            .fetch_add(1, Ordering::Relaxed);
                    },
                }
            }

            // Return healthy connections.
            // NOTE(review): this `store` overwrites concurrent
            // `idle_connections` updates from `get()`/guard drops, so the
            // gauge is approximate under load.
            {
                let mut connections = self.connections.lock().await;
                connections.extend(healthy);
                self.metrics
                    .idle_connections
                    .store(connections.len() as u64, Ordering::Relaxed);
            }
        }
    }
}
503
/// RAII guard that returns the connection to the pool on drop, keeping the
/// Architecture §14 resource caps accurate.
pub struct PooledClient {
    /// Loaned connection; `None` after `mark_unhealthy` discards it.
    conn: Option<PooledConnection>,
    /// Shared handle to the pool's idle queue used for the on-drop return.
    pool: Arc<Mutex<VecDeque<PooledConnection>>>,
    /// Shared pool counters updated as the guard is used and dropped.
    metrics: Arc<PoolMetrics>,
    /// Owned semaphore permit; dropping it frees a checkout slot.
    #[allow(dead_code)]
    permit: Option<OwnedSemaphorePermit>,
    /// Pool configuration snapshot (currently unused by the guard itself).
    #[allow(dead_code)]
    config: ConnectionPoolConfig,
}
515
516impl PooledClient {
517    /// Get a reference to the underlying client
518    pub fn client(&mut self) -> Result<&mut LanceClient> {
519        match self.conn.as_mut() {
520            Some(conn) => Ok(&mut conn.client),
521            None => Err(ClientError::ConnectionClosed),
522        }
523    }
524
525    /// Ping the server
526    pub async fn ping(&mut self) -> Result<Duration> {
527        if let Some(ref mut conn) = self.conn {
528            conn.client.ping().await
529        } else {
530            Err(ClientError::ConnectionClosed)
531        }
532    }
533
534    /// Mark the connection as unhealthy (don't return to pool)
535    pub fn mark_unhealthy(&mut self) {
536        self.conn = None;
537        self.metrics
538            .connections_closed
539            .fetch_add(1, Ordering::Relaxed);
540    }
541}
542
543impl Drop for PooledClient {
544    fn drop(&mut self) {
545        if let Some(mut conn) = self.conn.take() {
546            conn.last_used = Instant::now();
547
548            // Return to pool
549            let pool = self.pool.clone();
550            let metrics = self.metrics.clone();
551
552            tokio::spawn(async move {
553                let mut connections = pool.lock().await;
554                connections.push_back(conn);
555                metrics.active_connections.fetch_sub(1, Ordering::Relaxed);
556                metrics.idle_connections.fetch_add(1, Ordering::Relaxed);
557            });
558        } else {
559            self.metrics
560                .active_connections
561                .fetch_sub(1, Ordering::Relaxed);
562        }
563
564        // Permit is released when dropped
565    }
566}
567
/// Reconnecting client wrapper offering automatic reconnection and leader
/// redirection support per Architecture §21.
pub struct ReconnectingClient {
    /// Original endpoint string; re-used (and thus DNS re-resolved) on every
    /// reconnect.
    addr: String,
    /// Client configuration used to dial; its `addr` may be rewritten when
    /// following a leader redirect.
    config: ClientConfig,
    /// TLS settings (None = plain TCP), reused for every reconnect.
    tls_config: Option<TlsClientConfig>,
    /// Active connection; `None` means the next use must reconnect.
    client: Option<LanceClient>,
    /// Cumulative reconnect attempts over the client's lifetime (never reset).
    reconnect_attempts: u32,
    /// Per-operation attempt limit (0 = unlimited).
    max_attempts: u32,
    /// Base delay for exponential backoff
    base_delay: Duration,
    /// Maximum delay for exponential backoff
    max_delay: Duration,
    /// Current leader address (for redirection)
    leader_addr: Option<SocketAddr>,
    /// Whether to follow leader redirects
    follow_leader: bool,
}
584
impl ReconnectingClient {
    /// Create a new reconnecting client
    ///
    /// The address can be either an IP:port (e.g., "127.0.0.1:1992") or
    /// a hostname:port (e.g., "lance.example.com:1992").
    ///
    /// # Errors
    /// Propagates the initial `LanceClient::connect` failure (no retry is
    /// performed for the very first connection).
    pub async fn connect(addr: &str) -> Result<Self> {
        let config = ClientConfig::new(addr);
        let client = LanceClient::connect(config.clone()).await?;

        Ok(Self {
            addr: addr.to_string(),
            config,
            tls_config: None,
            client: Some(client),
            reconnect_attempts: 0,
            max_attempts: 5,
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            leader_addr: None,
            follow_leader: true,
        })
    }

    /// Wrap an existing LanceClient with auto-reconnect support.
    ///
    /// The `addr` is used for DNS re-resolution on reconnect (important for
    /// load-balanced endpoints). By default, retries are unlimited.
    pub fn from_existing(client: LanceClient, addr: &str) -> Self {
        let config = client.config().clone();
        Self {
            addr: addr.to_string(),
            config,
            tls_config: None,
            client: Some(client),
            reconnect_attempts: 0,
            max_attempts: 0, // unlimited by default
            base_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            leader_addr: None,
            follow_leader: true,
        }
    }

    /// Create a new reconnecting client with TLS
    ///
    /// The address can be either an IP:port (e.g., "127.0.0.1:1992") or
    /// a hostname:port (e.g., "lance.example.com:1992").
    ///
    /// # Errors
    /// Propagates the initial `LanceClient::connect_tls` failure.
    pub async fn connect_tls(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
        let config = ClientConfig::new(addr);
        let client = LanceClient::connect_tls(config.clone(), tls_config.clone()).await?;

        Ok(Self {
            addr: addr.to_string(),
            config,
            tls_config: Some(tls_config),
            client: Some(client),
            reconnect_attempts: 0,
            max_attempts: 5,
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            leader_addr: None,
            follow_leader: true,
        })
    }

    /// Set maximum reconnection attempts (0 = unlimited)
    pub fn with_max_attempts(mut self, attempts: u32) -> Self {
        self.max_attempts = attempts;
        self
    }

    /// Configure for unlimited reconnection attempts (never give up)
    pub fn with_unlimited_retries(mut self) -> Self {
        self.max_attempts = 0;
        self
    }

    /// Set base delay for exponential backoff
    pub fn with_base_delay(mut self, delay: Duration) -> Self {
        self.base_delay = delay;
        self
    }

    /// Set maximum delay for exponential backoff
    pub fn with_max_delay(mut self, delay: Duration) -> Self {
        self.max_delay = delay;
        self
    }

    /// Enable or disable automatic leader following
    pub fn with_follow_leader(mut self, follow: bool) -> Self {
        self.follow_leader = follow;
        self
    }

    /// Get the original connection address
    pub fn original_addr(&self) -> &str {
        &self.addr
    }

    /// Get the current leader address if known
    pub fn leader_addr(&self) -> Option<SocketAddr> {
        self.leader_addr
    }

    /// Update the known leader address (called when redirect received).
    ///
    /// When `follow_leader` is enabled this rewrites `config.addr` so a later
    /// dial targets the leader; the current connection is left untouched.
    /// NOTE(review): `reconnect()` overwrites `config.addr` with the original
    /// address before dialing, so this redirect currently never takes effect
    /// on that path — confirm intended behavior.
    pub fn set_leader_addr(&mut self, addr: SocketAddr) {
        self.leader_addr = Some(addr);
        if self.follow_leader {
            // Update config to connect to leader on next reconnect
            self.config.addr = addr.to_string();
        }
    }

    /// Get total reconnection attempts made (cumulative over the client's
    /// lifetime; never reset).
    pub fn reconnect_attempts(&self) -> u32 {
        self.reconnect_attempts
    }

    /// Get a reference to the underlying client, reconnecting if needed
    pub async fn client(&mut self) -> Result<&mut LanceClient> {
        if self.client.is_none() {
            self.reconnect().await?;
        }
        self.client.as_mut().ok_or(ClientError::ConnectionClosed)
    }

    /// Attempt to reconnect with exponential backoff and DNS re-resolution.
    ///
    /// Backoff doubles per attempt (`base_delay * 2^(n-1)`, saturating, capped
    /// at `max_delay`). Only the per-call attempt count is bounded by
    /// `max_attempts` (0 = retry forever); the cumulative
    /// [`Self::reconnect_attempts`] counter keeps its running total across
    /// calls and is never reset.
    pub async fn reconnect(&mut self) -> Result<()> {
        let mut attempts = 0;

        loop {
            attempts += 1;
            self.reconnect_attempts += 1;

            // Re-resolve DNS by connecting with the original address.
            // This is critical for LB environments where DNS round-robin
            // may route us to a different (healthy) node.
            let mut config = self.config.clone();
            config.addr = self.addr.clone();

            let result = match &self.tls_config {
                Some(tls) => LanceClient::connect_tls(config, tls.clone()).await,
                None => LanceClient::connect(config).await,
            };

            match result {
                Ok(client) => {
                    self.client = Some(client);
                    return Ok(());
                },
                Err(e) => {
                    // Give up with the last error once the per-call budget is
                    // exhausted (max_attempts == 0 means retry forever).
                    if self.max_attempts > 0 && attempts >= self.max_attempts {
                        return Err(e);
                    }

                    // Calculate backoff delay
                    let delay = self.base_delay * 2u32.saturating_pow(attempts - 1);
                    let delay = delay.min(self.max_delay);

                    tokio::time::sleep(delay).await;
                },
            }
        }
    }

    /// Execute an operation with automatic reconnection on failure.
    /// Retries on all retryable errors (connection failures, FORWARD_FAILED,
    /// timeouts, backpressure, NOT_LEADER) with exponential backoff.
    pub async fn execute<F, T>(&mut self, op: F) -> Result<T>
    where
        F: Fn(
            &mut LanceClient,
        )
            -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send + '_>>,
    {
        let mut attempts = 0u32;
        loop {
            // Reconnects first when a previous iteration dropped the client.
            let client = self.client().await?;

            match op(client).await {
                Ok(result) => return Ok(result),
                Err(e) if e.is_retryable() => {
                    attempts += 1;
                    if self.max_attempts > 0 && attempts >= self.max_attempts {
                        return Err(e);
                    }
                    // Mark connection as failed so next client() call reconnects
                    self.client = None;
                    // Backoff before retry
                    let delay = self.base_delay * 2u32.saturating_pow(attempts.saturating_sub(1));
                    let delay = delay.min(self.max_delay);
                    tokio::time::sleep(delay).await;
                },
                Err(e) => return Err(e),
            }
        }
    }

    /// Mark connection as failed so the next `client()` call reconnects.
    pub fn mark_failed(&mut self) {
        self.client = None;
    }
}
790
/// Cluster-aware client with automatic node discovery. It discovers nodes and
/// maintains healthy connections for read availability per Architecture §21.
pub struct ClusterClient {
    /// Known cluster nodes (seeded at construction)
    nodes: Vec<SocketAddr>,
    /// Primary node for this client (for connection affinity, not write routing)
    primary: Option<SocketAddr>,
    /// Client configuration; its `addr` is repointed during discovery
    config: ClientConfig,
    /// TLS configuration (None = plain TCP)
    tls_config: Option<TlsClientConfig>,
    /// Active client connection, established once discovery succeeds
    client: Option<LanceClient>,
    /// Last successful discovery time
    last_discovery: Option<Instant>,
    /// Discovery refresh interval
    discovery_interval: Duration,
}
809
810impl ClusterClient {
811    /// Creates a cluster client using the provided seed nodes, aligning with
812    /// Architecture §21's discovery process.
813    ///
814    /// # Arguments
815    /// * `seed_addrs` - List of seed endpoints (`"host:port"`) used to fetch
816    ///   the cluster topology.
817    ///
818    /// # Returns
819    /// * `Result<Self>` - Initialized client after a successful discovery
820    ///   round-trip.
821    ///
822    /// # Errors
823    /// * [`ClientError::ProtocolError`] when no seed nodes resolve.
824    /// * Propagates underlying connect/discovery errors when all nodes fail.
    pub async fn connect(seed_addrs: &[&str]) -> Result<Self> {
        // `SocketAddr` parsing accepts only IP literals ("ip:port");
        // hostname seeds fail to parse and are silently dropped here.
        let nodes: Vec<SocketAddr> = seed_addrs.iter().filter_map(|s| s.parse().ok()).collect();

        if nodes.is_empty() {
            return Err(ClientError::ProtocolError(
                "No valid seed addresses".to_string(),
            ));
        }

        // Seed the config with the first node; discovery may repoint it.
        let config = ClientConfig::new(nodes[0].to_string());
        let mut cluster = Self {
            nodes,
            primary: None,
            config,
            tls_config: None,
            client: None,
            last_discovery: None,
            discovery_interval: Duration::from_secs(60),
        };

        // The initial discovery round-trip must succeed for construction.
        cluster.discover_cluster().await?;
        Ok(cluster)
    }
848
849    /// Creates a TLS-enabled cluster client using the provided seed nodes so we
850    /// can satisfy Architecture §14's transport security guidance.
851    ///
852    /// # Arguments
853    /// * `seed_addrs` - List of seed endpoints for discovery.
854    /// * `tls_config` - Certificates and trust roots forwarded to
855    ///   `LanceClient`.
856    ///
857    /// # Returns
858    /// * `Result<Self>` - Initialized TLS client once discovery succeeds.
859    ///
860    /// # Errors
861    /// Mirrors [`ClusterClient::connect`], plus TLS handshake failures.
    pub async fn connect_tls(seed_addrs: &[&str], tls_config: TlsClientConfig) -> Result<Self> {
        // `SocketAddr` parsing accepts only IP literals ("ip:port");
        // hostname seeds fail to parse and are silently dropped here.
        let nodes: Vec<SocketAddr> = seed_addrs.iter().filter_map(|s| s.parse().ok()).collect();

        if nodes.is_empty() {
            return Err(ClientError::ProtocolError(
                "No valid seed addresses".to_string(),
            ));
        }

        // Seed the config with the first node and attach the TLS settings.
        let config = ClientConfig::new(nodes[0].to_string()).with_tls(tls_config.clone());
        let mut cluster = Self {
            nodes,
            primary: None,
            config,
            tls_config: Some(tls_config),
            client: None,
            last_discovery: None,
            discovery_interval: Duration::from_secs(60),
        };

        // The initial discovery round-trip must succeed for construction.
        cluster.discover_cluster().await?;
        Ok(cluster)
    }
885
886    /// Set the discovery refresh interval
887    pub fn with_discovery_interval(mut self, interval: Duration) -> Self {
888        self.discovery_interval = interval;
889        self
890    }
891
892    /// Discover cluster topology from any available node
893    async fn discover_cluster(&mut self) -> Result<()> {
894        for &node in &self.nodes.clone() {
895            let mut config = self.config.clone();
896            config.addr = node.to_string();
897
898            match LanceClient::connect(config).await {
899                Ok(mut client) => {
900                    match client.get_cluster_status().await {
901                        Ok(status) => {
902                            self.primary = status.leader_id.map(|id| {
903                                // Try to find node in peer_states or use first node
904                                status
905                                    .peer_states
906                                    .get(&id)
907                                    .and_then(|s| s.parse().ok())
908                                    .unwrap_or(node)
909                            });
910                            self.last_discovery = Some(Instant::now());
911
912                            // Connect to primary if found
913                            if let Some(primary_addr) = self.primary {
914                                self.config.addr = primary_addr.to_string();
915                                self.client =
916                                    Some(LanceClient::connect(self.config.clone()).await?);
917                            } else {
918                                self.client = Some(client);
919                            }
920                            return Ok(());
921                        },
922                        Err(_) => {
923                            // Single-node mode or cluster not available
924                            self.client = Some(client);
925                            self.primary = Some(node);
926                            self.last_discovery = Some(Instant::now());
927                            return Ok(());
928                        },
929                    }
930                },
931                Err(_) => continue,
932            }
933        }
934
935        Err(ClientError::ConnectionFailed(std::io::Error::new(
936            std::io::ErrorKind::NotConnected,
937            "Could not connect to any cluster node",
938        )))
939    }
940
941    /// Get a client connection, refreshing discovery if needed
942    pub async fn client(&mut self) -> Result<&mut LanceClient> {
943        // Check if discovery refresh is needed
944        let needs_refresh = self
945            .last_discovery
946            .map(|t| t.elapsed() > self.discovery_interval)
947            .unwrap_or(true);
948
949        if needs_refresh || self.client.is_none() {
950            self.discover_cluster().await?;
951        }
952
953        self.client.as_mut().ok_or(ClientError::ConnectionClosed)
954    }
955
956    /// Get the current primary node address
957    pub fn primary(&self) -> Option<SocketAddr> {
958        self.primary
959    }
960
961    /// Get all known cluster nodes
962    pub fn nodes(&self) -> &[SocketAddr] {
963        &self.nodes
964    }
965
966    /// Get the TLS configuration if set
967    pub fn tls_config(&self) -> Option<&TlsClientConfig> {
968        self.tls_config.as_ref()
969    }
970
971    /// Check if TLS is enabled
972    pub fn is_tls_enabled(&self) -> bool {
973        self.tls_config.is_some()
974    }
975
976    /// Force a cluster discovery refresh
977    pub async fn refresh(&mut self) -> Result<()> {
978        self.discover_cluster().await
979    }
980}
981
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Defaults from `ConnectionPoolConfig::new` match the documented values.
    #[test]
    fn test_pool_config_defaults() {
        let cfg = ConnectionPoolConfig::new();

        assert_eq!(cfg.max_connections, 10);
        assert_eq!(cfg.min_idle, 1);
        assert!(cfg.auto_reconnect);
    }

    /// Builder setters each take effect independently.
    #[test]
    fn test_pool_config_builder() {
        let cfg = ConnectionPoolConfig::new()
            .with_max_connections(20)
            .with_min_idle(5)
            .with_health_check_interval(60)
            .with_auto_reconnect(false);

        assert_eq!(cfg.max_connections, 20);
        assert_eq!(cfg.min_idle, 5);
        assert_eq!(cfg.health_check_interval, Duration::from_secs(60));
        assert!(!cfg.auto_reconnect);
    }

    /// A default `PoolStats` starts with zeroed counters.
    #[test]
    fn test_pool_stats_default() {
        let stats = PoolStats::default();

        assert_eq!(stats.connections_created, 0);
        assert_eq!(stats.active_connections, 0);
    }

    /// Without a live connection we can only exercise the expiry arithmetic:
    /// an `Instant` older than `max_lifetime` must register as expired.
    #[test]
    fn test_pooled_connection_expiry() {
        let max_lifetime = Duration::from_millis(10);
        let born = Instant::now();

        std::thread::sleep(Duration::from_millis(20));

        assert!(born.elapsed() > max_lifetime);
    }

    /// Mirrors `set_leader_addr`: with follow-leader enabled, the configured
    /// address tracks the reported leader (no real connection involved).
    #[test]
    fn test_reconnecting_client_leader_addr() {
        let base: SocketAddr = "127.0.0.1:1992".parse().unwrap();
        let leader: SocketAddr = "127.0.0.1:1993".parse().unwrap();

        let follow_leader = true;
        let mut target = base;

        let tracked: Option<SocketAddr> = Some(leader);
        if follow_leader {
            target = leader;
        }

        assert_eq!(tracked, Some(leader));
        assert_eq!(target, leader);
    }

    /// Auto-reconnect flag and attempt cap round-trip through the builder.
    #[test]
    fn test_connection_pool_config_auto_reconnect() {
        let cfg = ConnectionPoolConfig::new()
            .with_auto_reconnect(true)
            .with_max_reconnect_attempts(10);

        assert!(cfg.auto_reconnect);
        assert_eq!(cfg.max_reconnect_attempts, 10);
    }
}