1use std::collections::VecDeque;
40use std::net::SocketAddr;
41use std::sync::Arc;
42use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43use std::time::{Duration, Instant};
44
45use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
46
47use crate::client::{ClientConfig, LanceClient};
48use crate::error::{ClientError, Result};
49use crate::tls::TlsClientConfig;
50
51#[derive(Debug, Clone)]
54pub struct ConnectionPoolConfig {
55 pub max_connections: usize,
57 pub min_idle: usize,
59 pub connect_timeout: Duration,
61 pub acquire_timeout: Duration,
63 pub health_check_interval: Duration,
65 pub max_lifetime: Duration,
67 pub idle_timeout: Duration,
69 pub auto_reconnect: bool,
71 pub max_reconnect_attempts: u32,
73 pub reconnect_base_delay: Duration,
75 pub reconnect_max_delay: Duration,
77 pub tls_config: Option<TlsClientConfig>,
79}
80
81impl Default for ConnectionPoolConfig {
82 fn default() -> Self {
83 Self {
84 max_connections: 10,
85 min_idle: 1,
86 connect_timeout: Duration::from_secs(30),
87 acquire_timeout: Duration::from_secs(30),
88 health_check_interval: Duration::from_secs(30),
89 max_lifetime: Duration::from_secs(3600), idle_timeout: Duration::from_secs(300), auto_reconnect: true,
92 max_reconnect_attempts: 5,
93 reconnect_base_delay: Duration::from_millis(100),
94 reconnect_max_delay: Duration::from_secs(30),
95 tls_config: None,
96 }
97 }
98}
99
100impl ConnectionPoolConfig {
101 pub fn new() -> Self {
103 Self::default()
104 }
105
106 pub fn with_max_connections(mut self, n: usize) -> Self {
108 self.max_connections = n;
109 self
110 }
111
112 pub fn with_min_idle(mut self, n: usize) -> Self {
114 self.min_idle = n;
115 self
116 }
117
118 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
120 self.connect_timeout = timeout;
121 self
122 }
123
124 pub fn with_acquire_timeout(mut self, timeout: Duration) -> Self {
126 self.acquire_timeout = timeout;
127 self
128 }
129
130 pub fn with_health_check_interval(mut self, secs: u64) -> Self {
132 self.health_check_interval = Duration::from_secs(secs);
133 self
134 }
135
136 pub fn with_max_lifetime(mut self, lifetime: Duration) -> Self {
138 self.max_lifetime = lifetime;
139 self
140 }
141
142 pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
144 self.idle_timeout = timeout;
145 self
146 }
147
148 pub fn with_auto_reconnect(mut self, enabled: bool) -> Self {
150 self.auto_reconnect = enabled;
151 self
152 }
153
154 pub fn with_max_reconnect_attempts(mut self, attempts: u32) -> Self {
156 self.max_reconnect_attempts = attempts;
157 self
158 }
159
160 pub fn with_tls(mut self, tls_config: TlsClientConfig) -> Self {
162 self.tls_config = Some(tls_config);
163 self
164 }
165}
166
167#[derive(Debug, Clone, Default)]
170pub struct PoolStats {
171 pub connections_created: u64,
173 pub connections_closed: u64,
175 pub active_connections: u64,
177 pub idle_connections: u64,
179 pub acquire_attempts: u64,
181 pub acquire_successes: u64,
183 pub acquire_failures: u64,
185 pub health_check_failures: u64,
187 pub reconnect_attempts: u64,
189}
190
191#[derive(Debug, Default)]
194struct PoolMetrics {
195 connections_created: AtomicU64,
196 connections_closed: AtomicU64,
197 active_connections: AtomicU64,
198 idle_connections: AtomicU64,
199 acquire_attempts: AtomicU64,
200 acquire_successes: AtomicU64,
201 acquire_failures: AtomicU64,
202 health_check_failures: AtomicU64,
203 reconnect_attempts: AtomicU64,
204}
205
206impl PoolMetrics {
207 fn snapshot(&self) -> PoolStats {
208 PoolStats {
209 connections_created: self.connections_created.load(Ordering::Relaxed),
210 connections_closed: self.connections_closed.load(Ordering::Relaxed),
211 active_connections: self.active_connections.load(Ordering::Relaxed),
212 idle_connections: self.idle_connections.load(Ordering::Relaxed),
213 acquire_attempts: self.acquire_attempts.load(Ordering::Relaxed),
214 acquire_successes: self.acquire_successes.load(Ordering::Relaxed),
215 acquire_failures: self.acquire_failures.load(Ordering::Relaxed),
216 health_check_failures: self.health_check_failures.load(Ordering::Relaxed),
217 reconnect_attempts: self.reconnect_attempts.load(Ordering::Relaxed),
218 }
219 }
220}
221
222struct PooledConnection {
225 client: LanceClient,
226 created_at: Instant,
227 last_used: Instant,
228}
229
230impl PooledConnection {
231 fn new(client: LanceClient) -> Self {
232 let now = Instant::now();
233 Self {
234 client,
235 created_at: now,
236 last_used: now,
237 }
238 }
239
240 fn is_expired(&self, max_lifetime: Duration) -> bool {
241 if max_lifetime.is_zero() {
242 return false;
243 }
244 self.created_at.elapsed() > max_lifetime
245 }
246
247 fn is_idle_too_long(&self, idle_timeout: Duration) -> bool {
248 if idle_timeout.is_zero() {
249 return false;
250 }
251 self.last_used.elapsed() > idle_timeout
252 }
253}
254
255pub struct ConnectionPool {
261 addr: String,
262 config: ConnectionPoolConfig,
263 connections: Arc<Mutex<VecDeque<PooledConnection>>>,
264 semaphore: Arc<Semaphore>,
265 metrics: Arc<PoolMetrics>,
266 running: Arc<AtomicBool>,
267}
268
269impl ConnectionPool {
270 pub async fn new(addr: &str, config: ConnectionPoolConfig) -> Result<Self> {
284 let pool = Self {
285 addr: addr.to_string(),
286 config: config.clone(),
287 connections: Arc::new(Mutex::new(VecDeque::new())),
288 semaphore: Arc::new(Semaphore::new(config.max_connections)),
289 metrics: Arc::new(PoolMetrics::default()),
290 running: Arc::new(AtomicBool::new(true)),
291 };
292
293 for _ in 0..config.min_idle {
295 if let Ok(conn) = pool.create_connection().await {
296 let mut connections = pool.connections.lock().await;
297 connections.push_back(conn);
298 pool.metrics
299 .idle_connections
300 .fetch_add(1, Ordering::Relaxed);
301 }
302 }
303
304 if !config.health_check_interval.is_zero() {
306 let pool_clone = ConnectionPool {
307 addr: pool.addr.clone(),
308 config: pool.config.clone(),
309 connections: pool.connections.clone(),
310 semaphore: pool.semaphore.clone(),
311 metrics: pool.metrics.clone(),
312 running: pool.running.clone(),
313 };
314 tokio::spawn(async move {
315 pool_clone.health_check_task().await;
316 });
317 }
318
319 Ok(pool)
320 }
321
322 pub async fn get(&self) -> Result<PooledClient> {
332 self.metrics
333 .acquire_attempts
334 .fetch_add(1, Ordering::Relaxed);
335
336 let permit = tokio::time::timeout(
338 self.config.acquire_timeout,
339 self.semaphore.clone().acquire_owned(),
340 )
341 .await
342 .map_err(|_| {
343 self.metrics
344 .acquire_failures
345 .fetch_add(1, Ordering::Relaxed);
346 ClientError::Timeout
347 })?
348 .map_err(|_| {
349 self.metrics
350 .acquire_failures
351 .fetch_add(1, Ordering::Relaxed);
352 ClientError::ConnectionClosed
353 })?;
354
355 let conn = {
357 let mut connections = self.connections.lock().await;
358 loop {
359 match connections.pop_front() {
360 Some(conn) => {
361 self.metrics
362 .idle_connections
363 .fetch_sub(1, Ordering::Relaxed);
364
365 if conn.is_expired(self.config.max_lifetime)
367 || conn.is_idle_too_long(self.config.idle_timeout)
368 {
369 self.metrics
370 .connections_closed
371 .fetch_add(1, Ordering::Relaxed);
372 continue;
373 }
374 break Some(conn);
375 },
376 None => break None,
377 }
378 }
379 };
380
381 let conn = match conn {
382 Some(mut c) => {
383 c.last_used = Instant::now();
384 c
385 },
386 None => {
387 self.create_connection().await?
389 },
390 };
391
392 self.metrics
393 .active_connections
394 .fetch_add(1, Ordering::Relaxed);
395 self.metrics
396 .acquire_successes
397 .fetch_add(1, Ordering::Relaxed);
398
399 Ok(PooledClient {
400 conn: Some(conn),
401 pool: self.connections.clone(),
402 metrics: self.metrics.clone(),
403 permit: Some(permit),
404 config: self.config.clone(),
405 })
406 }
407
408 async fn create_connection(&self) -> Result<PooledConnection> {
417 let mut client_config = ClientConfig::new(&self.addr);
418 client_config.connect_timeout = self.config.connect_timeout;
419
420 let client = match &self.config.tls_config {
421 Some(tls_config) => LanceClient::connect_tls(client_config, tls_config.clone()).await?,
422 None => LanceClient::connect(client_config).await?,
423 };
424 self.metrics
425 .connections_created
426 .fetch_add(1, Ordering::Relaxed);
427
428 Ok(PooledConnection::new(client))
429 }
430
431 pub fn stats(&self) -> PoolStats {
433 self.metrics.snapshot()
434 }
435
436 pub async fn close(&self) {
439 self.running.store(false, Ordering::Relaxed);
440
441 let mut connections = self.connections.lock().await;
442 let count = connections.len() as u64;
443 connections.clear();
444 self.metrics
445 .connections_closed
446 .fetch_add(count, Ordering::Relaxed);
447 self.metrics.idle_connections.store(0, Ordering::Relaxed);
448 }
449
450 async fn health_check_task(&self) {
452 let mut interval = tokio::time::interval(self.config.health_check_interval);
453
454 while self.running.load(Ordering::Relaxed) {
455 interval.tick().await;
456
457 let mut to_check = {
459 let mut connections = self.connections.lock().await;
460 std::mem::take(&mut *connections)
461 };
462
463 let mut healthy = VecDeque::new();
464 let _initial_count = to_check.len();
465
466 for mut conn in to_check.drain(..) {
467 if conn.is_expired(self.config.max_lifetime) {
469 self.metrics
470 .connections_closed
471 .fetch_add(1, Ordering::Relaxed);
472 continue;
473 }
474
475 match conn.client.ping().await {
477 Ok(_) => {
478 conn.last_used = Instant::now();
479 healthy.push_back(conn);
480 },
481 Err(_) => {
482 self.metrics
483 .health_check_failures
484 .fetch_add(1, Ordering::Relaxed);
485 self.metrics
486 .connections_closed
487 .fetch_add(1, Ordering::Relaxed);
488 },
489 }
490 }
491
492 {
494 let mut connections = self.connections.lock().await;
495 connections.extend(healthy);
496 self.metrics
497 .idle_connections
498 .store(connections.len() as u64, Ordering::Relaxed);
499 }
500 }
501 }
502}
503
504pub struct PooledClient {
507 conn: Option<PooledConnection>,
508 pool: Arc<Mutex<VecDeque<PooledConnection>>>,
509 metrics: Arc<PoolMetrics>,
510 #[allow(dead_code)]
511 permit: Option<OwnedSemaphorePermit>,
512 #[allow(dead_code)]
513 config: ConnectionPoolConfig,
514}
515
516impl PooledClient {
517 pub fn client(&mut self) -> Result<&mut LanceClient> {
519 match self.conn.as_mut() {
520 Some(conn) => Ok(&mut conn.client),
521 None => Err(ClientError::ConnectionClosed),
522 }
523 }
524
525 pub async fn ping(&mut self) -> Result<Duration> {
527 if let Some(ref mut conn) = self.conn {
528 conn.client.ping().await
529 } else {
530 Err(ClientError::ConnectionClosed)
531 }
532 }
533
534 pub fn mark_unhealthy(&mut self) {
536 self.conn = None;
537 self.metrics
538 .connections_closed
539 .fetch_add(1, Ordering::Relaxed);
540 }
541}
542
543impl Drop for PooledClient {
544 fn drop(&mut self) {
545 if let Some(mut conn) = self.conn.take() {
546 conn.last_used = Instant::now();
547
548 let pool = self.pool.clone();
550 let metrics = self.metrics.clone();
551
552 tokio::spawn(async move {
553 let mut connections = pool.lock().await;
554 connections.push_back(conn);
555 metrics.active_connections.fetch_sub(1, Ordering::Relaxed);
556 metrics.idle_connections.fetch_add(1, Ordering::Relaxed);
557 });
558 } else {
559 self.metrics
560 .active_connections
561 .fetch_sub(1, Ordering::Relaxed);
562 }
563
564 }
566}
567
568pub struct ReconnectingClient {
571 addr: String,
572 config: ClientConfig,
573 tls_config: Option<TlsClientConfig>,
574 client: Option<LanceClient>,
575 reconnect_attempts: u32,
576 max_attempts: u32,
577 base_delay: Duration,
578 max_delay: Duration,
579 leader_addr: Option<SocketAddr>,
581 follow_leader: bool,
583}
584
585impl ReconnectingClient {
586 pub async fn connect(addr: &str) -> Result<Self> {
591 let config = ClientConfig::new(addr);
592 let client = LanceClient::connect(config.clone()).await?;
593
594 Ok(Self {
595 addr: addr.to_string(),
596 config,
597 tls_config: None,
598 client: Some(client),
599 reconnect_attempts: 0,
600 max_attempts: 5,
601 base_delay: Duration::from_millis(100),
602 max_delay: Duration::from_secs(30),
603 leader_addr: None,
604 follow_leader: true,
605 })
606 }
607
608 pub fn from_existing(client: LanceClient, addr: &str) -> Self {
613 let config = client.config().clone();
614 Self {
615 addr: addr.to_string(),
616 config,
617 tls_config: None,
618 client: Some(client),
619 reconnect_attempts: 0,
620 max_attempts: 0, base_delay: Duration::from_millis(500),
622 max_delay: Duration::from_secs(30),
623 leader_addr: None,
624 follow_leader: true,
625 }
626 }
627
628 pub async fn connect_tls(addr: &str, tls_config: TlsClientConfig) -> Result<Self> {
633 let config = ClientConfig::new(addr);
634 let client = LanceClient::connect_tls(config.clone(), tls_config.clone()).await?;
635
636 Ok(Self {
637 addr: addr.to_string(),
638 config,
639 tls_config: Some(tls_config),
640 client: Some(client),
641 reconnect_attempts: 0,
642 max_attempts: 5,
643 base_delay: Duration::from_millis(100),
644 max_delay: Duration::from_secs(30),
645 leader_addr: None,
646 follow_leader: true,
647 })
648 }
649
650 pub fn with_max_attempts(mut self, attempts: u32) -> Self {
652 self.max_attempts = attempts;
653 self
654 }
655
656 pub fn with_unlimited_retries(mut self) -> Self {
658 self.max_attempts = 0;
659 self
660 }
661
662 pub fn with_base_delay(mut self, delay: Duration) -> Self {
664 self.base_delay = delay;
665 self
666 }
667
668 pub fn with_max_delay(mut self, delay: Duration) -> Self {
670 self.max_delay = delay;
671 self
672 }
673
674 pub fn with_follow_leader(mut self, follow: bool) -> Self {
676 self.follow_leader = follow;
677 self
678 }
679
680 pub fn original_addr(&self) -> &str {
682 &self.addr
683 }
684
685 pub fn leader_addr(&self) -> Option<SocketAddr> {
687 self.leader_addr
688 }
689
690 pub fn set_leader_addr(&mut self, addr: SocketAddr) {
692 self.leader_addr = Some(addr);
693 if self.follow_leader {
694 self.config.addr = addr.to_string();
696 }
697 }
698
699 pub fn reconnect_attempts(&self) -> u32 {
701 self.reconnect_attempts
702 }
703
704 pub async fn client(&mut self) -> Result<&mut LanceClient> {
706 if self.client.is_none() {
707 self.reconnect().await?;
708 }
709 self.client.as_mut().ok_or(ClientError::ConnectionClosed)
710 }
711
712 pub async fn reconnect(&mut self) -> Result<()> {
715 let mut attempts = 0;
716
717 loop {
718 attempts += 1;
719 self.reconnect_attempts += 1;
720
721 let mut config = self.config.clone();
725 config.addr = self.addr.clone();
726
727 let result = match &self.tls_config {
728 Some(tls) => LanceClient::connect_tls(config, tls.clone()).await,
729 None => LanceClient::connect(config).await,
730 };
731
732 match result {
733 Ok(client) => {
734 self.client = Some(client);
735 return Ok(());
736 },
737 Err(e) => {
738 if self.max_attempts > 0 && attempts >= self.max_attempts {
739 return Err(e);
740 }
741
742 let delay = self.base_delay * 2u32.saturating_pow(attempts - 1);
744 let delay = delay.min(self.max_delay);
745
746 tokio::time::sleep(delay).await;
747 },
748 }
749 }
750 }
751
752 pub async fn execute<F, T>(&mut self, op: F) -> Result<T>
756 where
757 F: Fn(
758 &mut LanceClient,
759 )
760 -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send + '_>>,
761 {
762 let mut attempts = 0u32;
763 loop {
764 let client = self.client().await?;
765
766 match op(client).await {
767 Ok(result) => return Ok(result),
768 Err(e) if e.is_retryable() => {
769 attempts += 1;
770 if self.max_attempts > 0 && attempts >= self.max_attempts {
771 return Err(e);
772 }
773 self.client = None;
775 let delay = self.base_delay * 2u32.saturating_pow(attempts.saturating_sub(1));
777 let delay = delay.min(self.max_delay);
778 tokio::time::sleep(delay).await;
779 },
780 Err(e) => return Err(e),
781 }
782 }
783 }
784
785 pub fn mark_failed(&mut self) {
787 self.client = None;
788 }
789}
790
791pub struct ClusterClient {
794 nodes: Vec<SocketAddr>,
796 primary: Option<SocketAddr>,
798 config: ClientConfig,
800 tls_config: Option<TlsClientConfig>,
802 client: Option<LanceClient>,
804 last_discovery: Option<Instant>,
806 discovery_interval: Duration,
808}
809
810impl ClusterClient {
811 pub async fn connect(seed_addrs: &[&str]) -> Result<Self> {
826 let nodes: Vec<SocketAddr> = seed_addrs.iter().filter_map(|s| s.parse().ok()).collect();
827
828 if nodes.is_empty() {
829 return Err(ClientError::ProtocolError(
830 "No valid seed addresses".to_string(),
831 ));
832 }
833
834 let config = ClientConfig::new(nodes[0].to_string());
835 let mut cluster = Self {
836 nodes,
837 primary: None,
838 config,
839 tls_config: None,
840 client: None,
841 last_discovery: None,
842 discovery_interval: Duration::from_secs(60),
843 };
844
845 cluster.discover_cluster().await?;
846 Ok(cluster)
847 }
848
849 pub async fn connect_tls(seed_addrs: &[&str], tls_config: TlsClientConfig) -> Result<Self> {
863 let nodes: Vec<SocketAddr> = seed_addrs.iter().filter_map(|s| s.parse().ok()).collect();
864
865 if nodes.is_empty() {
866 return Err(ClientError::ProtocolError(
867 "No valid seed addresses".to_string(),
868 ));
869 }
870
871 let config = ClientConfig::new(nodes[0].to_string()).with_tls(tls_config.clone());
872 let mut cluster = Self {
873 nodes,
874 primary: None,
875 config,
876 tls_config: Some(tls_config),
877 client: None,
878 last_discovery: None,
879 discovery_interval: Duration::from_secs(60),
880 };
881
882 cluster.discover_cluster().await?;
883 Ok(cluster)
884 }
885
886 pub fn with_discovery_interval(mut self, interval: Duration) -> Self {
888 self.discovery_interval = interval;
889 self
890 }
891
892 async fn discover_cluster(&mut self) -> Result<()> {
894 for &node in &self.nodes.clone() {
895 let mut config = self.config.clone();
896 config.addr = node.to_string();
897
898 match LanceClient::connect(config).await {
899 Ok(mut client) => {
900 match client.get_cluster_status().await {
901 Ok(status) => {
902 self.primary = status.leader_id.map(|id| {
903 status
905 .peer_states
906 .get(&id)
907 .and_then(|s| s.parse().ok())
908 .unwrap_or(node)
909 });
910 self.last_discovery = Some(Instant::now());
911
912 if let Some(primary_addr) = self.primary {
914 self.config.addr = primary_addr.to_string();
915 self.client =
916 Some(LanceClient::connect(self.config.clone()).await?);
917 } else {
918 self.client = Some(client);
919 }
920 return Ok(());
921 },
922 Err(_) => {
923 self.client = Some(client);
925 self.primary = Some(node);
926 self.last_discovery = Some(Instant::now());
927 return Ok(());
928 },
929 }
930 },
931 Err(_) => continue,
932 }
933 }
934
935 Err(ClientError::ConnectionFailed(std::io::Error::new(
936 std::io::ErrorKind::NotConnected,
937 "Could not connect to any cluster node",
938 )))
939 }
940
941 pub async fn client(&mut self) -> Result<&mut LanceClient> {
943 let needs_refresh = self
945 .last_discovery
946 .map(|t| t.elapsed() > self.discovery_interval)
947 .unwrap_or(true);
948
949 if needs_refresh || self.client.is_none() {
950 self.discover_cluster().await?;
951 }
952
953 self.client.as_mut().ok_or(ClientError::ConnectionClosed)
954 }
955
956 pub fn primary(&self) -> Option<SocketAddr> {
958 self.primary
959 }
960
961 pub fn nodes(&self) -> &[SocketAddr] {
963 &self.nodes
964 }
965
966 pub fn tls_config(&self) -> Option<&TlsClientConfig> {
968 self.tls_config.as_ref()
969 }
970
971 pub fn is_tls_enabled(&self) -> bool {
973 self.tls_config.is_some()
974 }
975
976 pub async fn refresh(&mut self) -> Result<()> {
978 self.discover_cluster().await
979 }
980}
981
982#[cfg(test)]
983#[allow(clippy::unwrap_used)]
984mod tests {
985 use super::*;
986
987 #[test]
988 fn test_pool_config_defaults() {
989 let config = ConnectionPoolConfig::new();
990
991 assert_eq!(config.max_connections, 10);
992 assert_eq!(config.min_idle, 1);
993 assert!(config.auto_reconnect);
994 }
995
996 #[test]
997 fn test_pool_config_builder() {
998 let config = ConnectionPoolConfig::new()
999 .with_max_connections(20)
1000 .with_min_idle(5)
1001 .with_health_check_interval(60)
1002 .with_auto_reconnect(false);
1003
1004 assert_eq!(config.max_connections, 20);
1005 assert_eq!(config.min_idle, 5);
1006 assert_eq!(config.health_check_interval, Duration::from_secs(60));
1007 assert!(!config.auto_reconnect);
1008 }
1009
1010 #[test]
1011 fn test_pool_stats_default() {
1012 let stats = PoolStats::default();
1013
1014 assert_eq!(stats.connections_created, 0);
1015 assert_eq!(stats.active_connections, 0);
1016 }
1017
1018 #[test]
1019 fn test_pooled_connection_expiry() {
1020 use std::thread::sleep;
1021
1022 let max_lifetime = Duration::from_millis(10);
1024 let created_at = Instant::now();
1025
1026 sleep(Duration::from_millis(20));
1027
1028 assert!(created_at.elapsed() > max_lifetime);
1029 }
1030
1031 #[test]
1032 fn test_reconnecting_client_leader_addr() {
1033 let addr: SocketAddr = "127.0.0.1:1992".parse().unwrap();
1035 let leader: SocketAddr = "127.0.0.1:1993".parse().unwrap();
1036
1037 let follow_leader = true;
1039 let mut config_addr = addr;
1040
1041 let leader_addr: Option<SocketAddr> = Some(leader);
1043 if follow_leader {
1044 config_addr = leader;
1045 }
1046
1047 assert_eq!(leader_addr, Some(leader));
1048 assert_eq!(config_addr, leader);
1049 }
1050
1051 #[test]
1052 fn test_connection_pool_config_auto_reconnect() {
1053 let config = ConnectionPoolConfig::new()
1054 .with_auto_reconnect(true)
1055 .with_max_reconnect_attempts(10);
1056
1057 assert!(config.auto_reconnect);
1058 assert_eq!(config.max_reconnect_attempts, 10);
1059 }
1060}