tarantool_rs/
builder.rs

1use std::{cmp::max, fmt::Display, time::Duration};
2
3use tokio::net::ToSocketAddrs;
4use tracing::debug;
5
6use crate::{
7    client::Connection,
8    codec::{consts::TransactionIsolationLevel, request::Id},
9    errors::Error,
10    transport::Dispatcher,
11};
12
13const DEFAULT_DISPATCHER_INTERNAL_QUEUE_SIZE: usize = 500;
14const DEFAULT_SQL_STATEMENT_CACHE_CAPACITY: usize = 500;
15
16/// Interval parameters for background reconnection.
17#[derive(Clone, Debug, PartialEq)]
18pub enum ReconnectInterval {
19    Fixed(Duration),
20    ExponentialBackoff {
21        min: Duration,
22        max: Duration,
23        randomization_factor: f64,
24        multiplier: f64,
25    },
26}
27
28impl Default for ReconnectInterval {
29    fn default() -> Self {
30        Self::exponential_backoff(Duration::from_millis(1), Duration::from_secs(1), 0.5, 5.0)
31    }
32}
33
34impl ReconnectInterval {
35    /// Fixed interval between reconnection attempts.
36    pub fn fixed(interval: Duration) -> Self {
37        Self::Fixed(interval)
38    }
39
40    /// Interval between reconnection attempts calculated as
41    /// exponentially growing period.
42    ///
43    /// For details on this values check [`backoff::ExponentialBackoff`] docs.
44    pub fn exponential_backoff(
45        min_interval: Duration,
46        max_interval: Duration,
47        randomization_factor: f64,
48        multiplier: f64,
49    ) -> Self {
50        Self::ExponentialBackoff {
51            min: max(min_interval, Duration::from_micros(1)),
52            max: max_interval,
53            randomization_factor,
54            multiplier,
55        }
56    }
57}
58
59/// Build connection to Tarantool.
60#[derive(Debug)]
61pub struct ConnectionBuilder {
62    user: Option<String>,
63    password: Option<String>,
64    timeout: Option<Duration>,
65    transaction_timeout: Option<Duration>,
66    transaction_isolation_level: TransactionIsolationLevel,
67    connect_timeout: Option<Duration>,
68    reconnect_interval: Option<ReconnectInterval>,
69    sql_statement_cache_capacity: usize,
70    internal_simultaneous_requests_threshold: usize,
71}
72
73impl Default for ConnectionBuilder {
74    fn default() -> Self {
75        Self {
76            user: None,
77            password: None,
78            timeout: None,
79            transaction_timeout: None,
80            transaction_isolation_level: Default::default(),
81            connect_timeout: None,
82            reconnect_interval: Some(ReconnectInterval::default()),
83            sql_statement_cache_capacity: DEFAULT_SQL_STATEMENT_CACHE_CAPACITY,
84            internal_simultaneous_requests_threshold: DEFAULT_DISPATCHER_INTERNAL_QUEUE_SIZE,
85        }
86    }
87}
88
89impl ConnectionBuilder {
90    /// Create connection to Tarantool using provided address.
91    pub async fn build<A>(&self, addr: A) -> Result<Connection, Error>
92    where
93        A: ToSocketAddrs + Display + Clone + Send + Sync + 'static,
94    {
95        let (dispatcher_fut, disaptcher_sender) = Dispatcher::prepare(
96            addr,
97            self.user.as_deref(),
98            self.password.as_deref(),
99            self.timeout,
100            self.reconnect_interval.clone(),
101            self.internal_simultaneous_requests_threshold,
102        )
103        .await?;
104
105        // TODO: support setting custom executor
106        tokio::spawn(dispatcher_fut);
107        let conn = Connection::new(
108            disaptcher_sender,
109            self.timeout,
110            self.transaction_timeout,
111            self.transaction_isolation_level,
112            self.sql_statement_cache_capacity,
113        );
114
115        // TODO: add option to disable pre 2.10 features (ID request, streams, watchers)
116        let features = Id::default();
117        debug!(
118            "Setting supported features: VERSION - {}, STREAMS - {}, TRANSACTIONS - {}, ERROR_EXTENSION - {}, WATCHERS = {}",
119            features.protocol_version,
120            features.streams,
121            features.transactions,
122            features.error_extension,
123            features.watchers
124        );
125        conn.id(features).await?;
126
127        Ok(conn)
128    }
129
130    /// Sets user login and, optionally, password, used for this connection.
131    ///
132    /// AUTH message sent upon connecting to server.
133    pub fn auth<'a>(&mut self, user: &str, password: impl Into<Option<&'a str>>) -> &mut Self {
134        self.user = Some(user.into());
135        self.password = password.into().map(Into::into);
136        self
137    }
138
139    /// Sets timeout for requests.
140    ///
141    /// By default disabled.
142    pub fn timeout(&mut self, timeout: impl Into<Option<Duration>>) -> &mut Self {
143        self.timeout = timeout.into();
144        self
145    }
146
147    /// Sets default timeout for transactions.
148    ///
149    /// By default disabled.
150    pub fn transaction_timeout(
151        &mut self,
152        transaction_timeout: impl Into<Option<Duration>>,
153    ) -> &mut Self {
154        self.transaction_timeout = transaction_timeout.into();
155        self
156    }
157
158    /// Sets default transaction isolation level.
159    ///
160    /// By default `TransactionIsolationLevel::Default` (i.e. use box.cfg default value).
161    pub fn transaction_isolation_level(
162        &mut self,
163        transaction_isolation_level: TransactionIsolationLevel,
164    ) -> &mut Self {
165        self.transaction_isolation_level = transaction_isolation_level;
166        self
167    }
168
169    /// Sets timeout for connect.
170    ///
171    /// By default disabled.
172    pub fn connect_timeout(&mut self, connect_timeout: impl Into<Option<Duration>>) -> &mut Self {
173        self.connect_timeout = connect_timeout.into();
174        self
175    }
176
177    /// Sets interval between reconnection attempts.
178    ///
179    /// If disabled, next attempt wil lbe started as soon as last one finished.
180    ///
181    /// By default set to `ReconnectInterval::exponential_backoff(Duration::from_millis(1), Duration::from_secs(1), 0.5, 5.0)`.
182    pub fn reconnect_interval(
183        &mut self,
184        reconnect_interval: impl Into<Option<ReconnectInterval>>,
185    ) -> &mut Self {
186        self.reconnect_interval = reconnect_interval.into();
187        self
188    }
189
190    /// Sets capacity of SQL statment cache.
191    ///
192    /// Setting 0 disables cache. By default set to 100.
193    pub fn sql_statement_cache_capacity(&mut self, capacity: usize) -> &mut Self {
194        self.sql_statement_cache_capacity = capacity;
195        self
196    }
197
198    /// Prepare `Connection` to process `value` number of simultaneously created requests.
199    ///
200    /// It is not hard limit, but making more simultaneous requests than this value
201    /// will result in degradation in performance, so try to increase this value,
202    /// if you unsatisfied with performance.
203    ///
204    /// Internally connection have multiple bounded channels, and this parameter mostly
205    /// affect size of this channels. Increasing this value can help if you have a lot of simultaneously
206    /// created requests, however this will increase memory consumption.
207    ///
208    /// By default set to 500, which should be reasonable compromise between memory
209    /// (about 100 KB) and performance.
210    pub fn internal_simultaneous_requests_threshold(&mut self, value: usize) -> &mut Self {
211        self.internal_simultaneous_requests_threshold = value;
212        self
213    }
214}