Skip to main content

nautilus_connector/
pool_options.rs

1//! Shared connection-pool and statement-cache overrides for connector executors.
2
3use std::time::Duration;
4
5use sqlx::{
6    mysql::MySqlConnectOptions, pool::PoolOptions as SqlxPoolOptions, postgres::PgConnectOptions,
7    sqlite::SqliteConnectOptions, Database,
8};
9
10/// Optional overrides for the sqlx connection pool and statement cache used by
11/// Nautilus executors.
12///
13/// Any field left unset preserves the backend-specific defaults used by
14/// [`crate::PgExecutor::new`], [`crate::MysqlExecutor::new`], or
15/// [`crate::SqliteExecutor::new`].
16#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
17pub struct ConnectorPoolOptions {
18    max_connections: Option<u32>,
19    min_connections: Option<u32>,
20    acquire_timeout: Option<Duration>,
21    idle_timeout: Option<Option<Duration>>,
22    test_before_acquire: Option<bool>,
23    statement_cache_capacity: Option<usize>,
24}
25
26impl ConnectorPoolOptions {
27    /// Create an empty set of pool overrides.
28    pub const fn new() -> Self {
29        Self {
30            max_connections: None,
31            min_connections: None,
32            acquire_timeout: None,
33            idle_timeout: None,
34            test_before_acquire: None,
35            statement_cache_capacity: None,
36        }
37    }
38
39    /// Override the maximum number of pooled connections.
40    pub fn max_connections(mut self, max_connections: u32) -> Self {
41        self.max_connections = Some(max_connections);
42        self
43    }
44
45    /// Override the minimum number of pooled connections kept warm.
46    pub fn min_connections(mut self, min_connections: u32) -> Self {
47        self.min_connections = Some(min_connections);
48        self
49    }
50
51    /// Override the maximum time spent waiting for a pooled connection.
52    pub fn acquire_timeout(mut self, acquire_timeout: Duration) -> Self {
53        self.acquire_timeout = Some(acquire_timeout);
54        self
55    }
56
57    /// Override the maximum idle duration for pooled connections.
58    ///
59    /// Pass `None` to disable idle reaping entirely.
60    pub fn idle_timeout(mut self, idle_timeout: impl Into<Option<Duration>>) -> Self {
61        self.idle_timeout = Some(idle_timeout.into());
62        self
63    }
64
65    /// Override whether sqlx pings a connection before returning it from the pool.
66    pub fn test_before_acquire(mut self, test_before_acquire: bool) -> Self {
67        self.test_before_acquire = Some(test_before_acquire);
68        self
69    }
70
71    /// Override the per-connection statement cache capacity used by sqlx.
72    ///
73    /// Set this to `0` to disable statement caching entirely.
74    pub fn statement_cache_capacity(mut self, statement_cache_capacity: usize) -> Self {
75        self.statement_cache_capacity = Some(statement_cache_capacity);
76        self
77    }
78
79    /// Return the configured maximum-connection override, if any.
80    pub const fn get_max_connections(&self) -> Option<u32> {
81        self.max_connections
82    }
83
84    /// Return the configured minimum-connection override, if any.
85    pub const fn get_min_connections(&self) -> Option<u32> {
86        self.min_connections
87    }
88
89    /// Return the configured acquire-timeout override, if any.
90    pub const fn get_acquire_timeout(&self) -> Option<Duration> {
91        self.acquire_timeout
92    }
93
94    /// Return the configured idle-timeout override, if any.
95    ///
96    /// `None` means "use the executor default". `Some(None)` means "disable
97    /// idle timeout". `Some(Some(duration))` sets a custom timeout.
98    pub const fn get_idle_timeout(&self) -> Option<Option<Duration>> {
99        self.idle_timeout
100    }
101
102    /// Return the configured `test_before_acquire` override, if any.
103    pub const fn get_test_before_acquire(&self) -> Option<bool> {
104        self.test_before_acquire
105    }
106
107    /// Return the configured statement-cache-capacity override, if any.
108    pub const fn get_statement_cache_capacity(&self) -> Option<usize> {
109        self.statement_cache_capacity
110    }
111
112    pub(crate) fn apply_to<DB: Database>(
113        &self,
114        mut options: SqlxPoolOptions<DB>,
115    ) -> SqlxPoolOptions<DB> {
116        if let Some(max_connections) = self.max_connections {
117            options = options.max_connections(max_connections);
118        }
119        if let Some(min_connections) = self.min_connections {
120            options = options.min_connections(min_connections);
121        }
122        if let Some(acquire_timeout) = self.acquire_timeout {
123            options = options.acquire_timeout(acquire_timeout);
124        }
125        if let Some(idle_timeout) = self.idle_timeout {
126            options = options.idle_timeout(idle_timeout);
127        }
128        if let Some(test_before_acquire) = self.test_before_acquire {
129            options = options.test_before_acquire(test_before_acquire);
130        }
131        options
132    }
133
134    pub(crate) fn apply_to_postgres_connect_options(
135        &self,
136        mut options: PgConnectOptions,
137    ) -> PgConnectOptions {
138        if let Some(statement_cache_capacity) = self.statement_cache_capacity {
139            options = options.statement_cache_capacity(statement_cache_capacity);
140        }
141        options
142    }
143
144    pub(crate) fn apply_to_mysql_connect_options(
145        &self,
146        mut options: MySqlConnectOptions,
147    ) -> MySqlConnectOptions {
148        if let Some(statement_cache_capacity) = self.statement_cache_capacity {
149            options = options.statement_cache_capacity(statement_cache_capacity);
150        }
151        options
152    }
153
154    pub(crate) fn apply_to_sqlite_connect_options(
155        &self,
156        mut options: SqliteConnectOptions,
157    ) -> SqliteConnectOptions {
158        if let Some(statement_cache_capacity) = self.statement_cache_capacity {
159            options = options.statement_cache_capacity(statement_cache_capacity);
160        }
161        options
162    }
163}
164
165#[cfg(test)]
166mod tests {
167    use std::time::Duration;
168
169    use sqlx::{
170        mysql::MySqlConnectOptions,
171        postgres::{PgConnectOptions, PgPoolOptions},
172        sqlite::SqliteConnectOptions,
173        ConnectOptions,
174    };
175
176    use super::ConnectorPoolOptions;
177
178    #[test]
179    fn apply_to_preserves_unspecified_backend_defaults() {
180        let base = PgPoolOptions::new()
181            .max_connections(10)
182            .min_connections(1)
183            .acquire_timeout(Duration::from_secs(10))
184            .idle_timeout(Duration::from_secs(300))
185            .test_before_acquire(true);
186
187        let applied = ConnectorPoolOptions::new()
188            .max_connections(24)
189            .apply_to(base);
190
191        assert_eq!(applied.get_max_connections(), 24);
192        assert_eq!(applied.get_min_connections(), 1);
193        assert_eq!(applied.get_acquire_timeout(), Duration::from_secs(10));
194        assert_eq!(applied.get_idle_timeout(), Some(Duration::from_secs(300)));
195        assert!(applied.get_test_before_acquire());
196    }
197
198    #[test]
199    fn apply_to_can_disable_idle_timeout() {
200        let base = PgPoolOptions::new().idle_timeout(Duration::from_secs(300));
201
202        let applied = ConnectorPoolOptions::new()
203            .idle_timeout(None)
204            .apply_to(base);
205
206        assert_eq!(applied.get_idle_timeout(), None);
207    }
208
209    #[test]
210    fn apply_to_postgres_connect_options_can_override_statement_cache_capacity() {
211        let applied = ConnectorPoolOptions::new()
212            .statement_cache_capacity(7)
213            .apply_to_postgres_connect_options(
214                "postgres://localhost/nautilus"
215                    .parse::<PgConnectOptions>()
216                    .expect("postgres url should parse"),
217            );
218
219        let query = applied.to_url_lossy();
220        assert_eq!(
221            query
222                .query_pairs()
223                .find(|(key, _)| key == "statement-cache-capacity")
224                .map(|(_, value)| value.into_owned())
225                .as_deref(),
226            Some("7")
227        );
228    }
229
230    #[test]
231    fn apply_to_mysql_connect_options_can_override_statement_cache_capacity() {
232        let applied = ConnectorPoolOptions::new()
233            .statement_cache_capacity(9)
234            .apply_to_mysql_connect_options(
235                "mysql://root:password@localhost/nautilus"
236                    .parse::<MySqlConnectOptions>()
237                    .expect("mysql url should parse"),
238            );
239
240        let query = applied.to_url_lossy();
241        assert_eq!(
242            query
243                .query_pairs()
244                .find(|(key, _)| key == "statement-cache-capacity")
245                .map(|(_, value)| value.into_owned())
246                .as_deref(),
247            Some("9")
248        );
249    }
250
251    #[test]
252    fn apply_to_sqlite_connect_options_can_override_statement_cache_capacity() {
253        let applied = ConnectorPoolOptions::new()
254            .statement_cache_capacity(0)
255            .apply_to_sqlite_connect_options(
256                "sqlite://nautilus.db"
257                    .parse::<SqliteConnectOptions>()
258                    .expect("sqlite url should parse"),
259            );
260
261        assert!(
262            format!("{applied:?}").contains("statement_cache_capacity: 0"),
263            "sqlite connect options should reflect the overridden statement cache capacity: {applied:?}"
264        );
265    }
266}