1use std::{cmp::max, fmt::Display, time::Duration};
2
3use tokio::net::ToSocketAddrs;
4use tracing::debug;
5
6use crate::{
7 client::Connection,
8 codec::{consts::TransactionIsolationLevel, request::Id},
9 errors::Error,
10 transport::Dispatcher,
11};
12
13const DEFAULT_DISPATCHER_INTERNAL_QUEUE_SIZE: usize = 500;
14const DEFAULT_SQL_STATEMENT_CACHE_CAPACITY: usize = 500;
15
16#[derive(Clone, Debug, PartialEq)]
18pub enum ReconnectInterval {
19 Fixed(Duration),
20 ExponentialBackoff {
21 min: Duration,
22 max: Duration,
23 randomization_factor: f64,
24 multiplier: f64,
25 },
26}
27
28impl Default for ReconnectInterval {
29 fn default() -> Self {
30 Self::exponential_backoff(Duration::from_millis(1), Duration::from_secs(1), 0.5, 5.0)
31 }
32}
33
34impl ReconnectInterval {
35 pub fn fixed(interval: Duration) -> Self {
37 Self::Fixed(interval)
38 }
39
40 pub fn exponential_backoff(
45 min_interval: Duration,
46 max_interval: Duration,
47 randomization_factor: f64,
48 multiplier: f64,
49 ) -> Self {
50 Self::ExponentialBackoff {
51 min: max(min_interval, Duration::from_micros(1)),
52 max: max_interval,
53 randomization_factor,
54 multiplier,
55 }
56 }
57}
58
59#[derive(Debug)]
61pub struct ConnectionBuilder {
62 user: Option<String>,
63 password: Option<String>,
64 timeout: Option<Duration>,
65 transaction_timeout: Option<Duration>,
66 transaction_isolation_level: TransactionIsolationLevel,
67 connect_timeout: Option<Duration>,
68 reconnect_interval: Option<ReconnectInterval>,
69 sql_statement_cache_capacity: usize,
70 internal_simultaneous_requests_threshold: usize,
71}
72
73impl Default for ConnectionBuilder {
74 fn default() -> Self {
75 Self {
76 user: None,
77 password: None,
78 timeout: None,
79 transaction_timeout: None,
80 transaction_isolation_level: Default::default(),
81 connect_timeout: None,
82 reconnect_interval: Some(ReconnectInterval::default()),
83 sql_statement_cache_capacity: DEFAULT_SQL_STATEMENT_CACHE_CAPACITY,
84 internal_simultaneous_requests_threshold: DEFAULT_DISPATCHER_INTERNAL_QUEUE_SIZE,
85 }
86 }
87}
88
89impl ConnectionBuilder {
90 pub async fn build<A>(&self, addr: A) -> Result<Connection, Error>
92 where
93 A: ToSocketAddrs + Display + Clone + Send + Sync + 'static,
94 {
95 let (dispatcher_fut, disaptcher_sender) = Dispatcher::prepare(
96 addr,
97 self.user.as_deref(),
98 self.password.as_deref(),
99 self.timeout,
100 self.reconnect_interval.clone(),
101 self.internal_simultaneous_requests_threshold,
102 )
103 .await?;
104
105 tokio::spawn(dispatcher_fut);
107 let conn = Connection::new(
108 disaptcher_sender,
109 self.timeout,
110 self.transaction_timeout,
111 self.transaction_isolation_level,
112 self.sql_statement_cache_capacity,
113 );
114
115 let features = Id::default();
117 debug!(
118 "Setting supported features: VERSION - {}, STREAMS - {}, TRANSACTIONS - {}, ERROR_EXTENSION - {}, WATCHERS = {}",
119 features.protocol_version,
120 features.streams,
121 features.transactions,
122 features.error_extension,
123 features.watchers
124 );
125 conn.id(features).await?;
126
127 Ok(conn)
128 }
129
130 pub fn auth<'a>(&mut self, user: &str, password: impl Into<Option<&'a str>>) -> &mut Self {
134 self.user = Some(user.into());
135 self.password = password.into().map(Into::into);
136 self
137 }
138
139 pub fn timeout(&mut self, timeout: impl Into<Option<Duration>>) -> &mut Self {
143 self.timeout = timeout.into();
144 self
145 }
146
147 pub fn transaction_timeout(
151 &mut self,
152 transaction_timeout: impl Into<Option<Duration>>,
153 ) -> &mut Self {
154 self.transaction_timeout = transaction_timeout.into();
155 self
156 }
157
158 pub fn transaction_isolation_level(
162 &mut self,
163 transaction_isolation_level: TransactionIsolationLevel,
164 ) -> &mut Self {
165 self.transaction_isolation_level = transaction_isolation_level;
166 self
167 }
168
169 pub fn connect_timeout(&mut self, connect_timeout: impl Into<Option<Duration>>) -> &mut Self {
173 self.connect_timeout = connect_timeout.into();
174 self
175 }
176
177 pub fn reconnect_interval(
183 &mut self,
184 reconnect_interval: impl Into<Option<ReconnectInterval>>,
185 ) -> &mut Self {
186 self.reconnect_interval = reconnect_interval.into();
187 self
188 }
189
190 pub fn sql_statement_cache_capacity(&mut self, capacity: usize) -> &mut Self {
194 self.sql_statement_cache_capacity = capacity;
195 self
196 }
197
198 pub fn internal_simultaneous_requests_threshold(&mut self, value: usize) -> &mut Self {
211 self.internal_simultaneous_requests_threshold = value;
212 self
213 }
214}