sqlx_core/
connection.rs

1use crate::database::{Database, HasStatementCache};
2use crate::error::Error;
3
4use crate::config;
5use crate::sql_str::SqlSafeStr;
6use crate::transaction::{Transaction, TransactionManager};
7use futures_core::future::BoxFuture;
8use log::LevelFilter;
9use std::fmt::Debug;
10use std::future::Future;
11use std::str::FromStr;
12use std::time::Duration;
13use url::Url;
14
15/// Represents a single database connection.
16pub trait Connection: Send {
17    type Database: Database<Connection = Self>;
18
19    type Options: ConnectOptions<Connection = Self>;
20
21    /// Explicitly close this database connection.
22    ///
23    /// This notifies the database server that the connection is closing so that it can
24    /// free up any server-side resources in use.
25    ///
26    /// While connections can simply be dropped to clean up local resources,
27    /// the `Drop` handler itself cannot notify the server that the connection is being closed
28    /// because that may require I/O to send a termination message. That can result in a delay
29    /// before the server learns that the connection is gone, usually from a TCP keepalive timeout.
30    ///
31    /// Creating and dropping many connections in short order without calling `.close()` may
32    /// lead to errors from the database server because those senescent connections will still
33    /// count against any connection limit or quota that is configured.
34    ///
35    /// Therefore it is recommended to call `.close()` on a connection when you are done using it
36    /// and to `.await` the result to ensure the termination message is sent.
37    fn close(self) -> impl Future<Output = Result<(), Error>> + Send + 'static;
38
39    /// Immediately close the connection without sending a graceful shutdown.
40    ///
41    /// This should still at least send a TCP `FIN` frame to let the server know we're dying.
42    #[doc(hidden)]
43    fn close_hard(self) -> impl Future<Output = Result<(), Error>> + Send + 'static;
44
45    /// Checks if a connection to the database is still valid.
46    fn ping(&mut self) -> impl Future<Output = Result<(), Error>> + Send + '_;
47
48    /// Begin a new transaction or establish a savepoint within the active transaction.
49    ///
50    /// Returns a [`Transaction`] for controlling and tracking the new transaction.
51    fn begin(
52        &mut self,
53    ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_;
54
55    /// Begin a new transaction with a custom statement.
56    ///
57    /// Returns a [`Transaction`] for controlling and tracking the new transaction.
58    ///
59    /// Returns an error if the connection is already in a transaction or if
60    /// `statement` does not put the connection into a transaction.
61    fn begin_with(
62        &mut self,
63        statement: impl SqlSafeStr,
64    ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_
65    where
66        Self: Sized,
67    {
68        Transaction::begin(self, Some(statement.into_sql_str()))
69    }
70
71    /// Returns `true` if the connection is currently in a transaction.
72    ///
73    /// # Note: Automatic Rollbacks May Not Be Counted
74    /// Certain database errors (such as a serializable isolation failure)
75    /// can cause automatic rollbacks of a transaction
76    /// which may not be indicated in the return value of this method.
77    #[inline]
78    fn is_in_transaction(&self) -> bool {
79        <Self::Database as Database>::TransactionManager::get_transaction_depth(self) != 0
80    }
81
82    /// Execute the function inside a transaction.
83    ///
84    /// If the function returns an error, the transaction will be rolled back. If it does not
85    /// return an error, the transaction will be committed.
86    ///
87    /// # Example
88    ///
89    /// ```rust
90    /// use sqlx::postgres::{PgConnection, PgRow};
91    /// use sqlx::Connection;
92    ///
93    /// # pub async fn _f(conn: &mut PgConnection) -> sqlx::Result<Vec<PgRow>> {
94    /// conn.transaction(|txn| Box::pin(async move {
95    ///     sqlx::query("select * from ..").fetch_all(&mut **txn).await
96    /// })).await
97    /// # }
98    /// ```
99    fn transaction<'a, F, R, E>(
100        &'a mut self,
101        callback: F,
102    ) -> impl Future<Output = Result<R, E>> + Send + 'a
103    where
104        for<'c> F: FnOnce(&'c mut Transaction<'_, Self::Database>) -> BoxFuture<'c, Result<R, E>>
105            + 'a
106            + Send
107            + Sync,
108        Self: Sized,
109        R: Send,
110        E: From<Error> + Send,
111    {
112        async move {
113            let mut transaction = self.begin().await?;
114            let ret = callback(&mut transaction).await;
115
116            match ret {
117                Ok(ret) => {
118                    transaction.commit().await?;
119
120                    Ok(ret)
121                }
122                Err(err) => {
123                    transaction.rollback().await?;
124
125                    Err(err)
126                }
127            }
128        }
129    }
130
131    /// The number of statements currently cached in the connection.
132    fn cached_statements_size(&self) -> usize
133    where
134        Self::Database: HasStatementCache,
135    {
136        0
137    }
138
139    /// Removes all statements from the cache, closing them on the server if
140    /// needed.
141    fn clear_cached_statements(&mut self) -> impl Future<Output = Result<(), Error>> + Send + '_
142    where
143        Self::Database: HasStatementCache,
144    {
145        async move { Ok(()) }
146    }
147
148    /// Restore any buffers in the connection to their default capacity, if possible.
149    ///
150    /// Sending a large query or receiving a resultset with many columns can cause the connection
151    /// to allocate additional buffer space to fit the data which is retained afterwards in
152    /// case it's needed again. This can give the outward appearance of a memory leak, but is
153    /// in fact the intended behavior.
154    ///
155    /// Calling this method tells the connection to release that excess memory if it can,
156    /// though be aware that calling this too often can cause unnecessary thrashing or
157    /// fragmentation in the global allocator. If there's still data in the connection buffers
158    /// (unlikely if the last query was run to completion) then it may need to be moved to
159    /// allow the buffers to shrink.
160    fn shrink_buffers(&mut self);
161
162    #[doc(hidden)]
163    fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send + '_;
164
165    #[doc(hidden)]
166    fn should_flush(&self) -> bool;
167
168    /// Establish a new database connection.
169    ///
170    /// A value of [`Options`][Self::Options] is parsed from the provided connection string. This parsing
171    /// is database-specific.
172    #[inline]
173    fn connect(url: &str) -> impl Future<Output = Result<Self, Error>> + Send + 'static
174    where
175        Self: Sized,
176    {
177        let options = url.parse();
178
179        async move { Self::connect_with(&options?).await }
180    }
181
182    /// Establish a new database connection with the provided options.
183    fn connect_with(
184        options: &Self::Options,
185    ) -> impl Future<Output = Result<Self, Error>> + Send + '_
186    where
187        Self: Sized,
188    {
189        options.connect()
190    }
191}
192
/// Settings describing how statements are logged.
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it with a struct
/// literal; start from `LogSettings::default()` instead.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct LogSettings {
    /// Log level used for statements.
    pub statements_level: LevelFilter,
    /// Log level used for slow statements.
    pub slow_statements_level: LevelFilter,
    /// Duration paired with `slow_statements_level`; presumably the threshold above which
    /// a statement counts as slow (TODO confirm against the logging code).
    pub slow_statements_duration: Duration,
}
200
201impl Default for LogSettings {
202    fn default() -> Self {
203        LogSettings {
204            statements_level: LevelFilter::Debug,
205            slow_statements_level: LevelFilter::Warn,
206            slow_statements_duration: Duration::from_secs(1),
207        }
208    }
209}
210
211impl LogSettings {
212    pub fn log_statements(&mut self, level: LevelFilter) {
213        self.statements_level = level;
214    }
215    pub fn log_slow_statements(&mut self, level: LevelFilter, duration: Duration) {
216        self.slow_statements_level = level;
217        self.slow_statements_duration = duration;
218    }
219}
220
221pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {
222    type Connection: Connection<Options = Self> + ?Sized;
223
224    /// Parse the `ConnectOptions` from a URL.
225    fn from_url(url: &Url) -> Result<Self, Error>;
226
227    /// Get a connection URL that may be used to connect to the same database as this `ConnectOptions`.
228    ///
229    /// ### Note: Lossy
230    /// Any flags or settings which do not have a representation in the URL format will be lost.
231    /// They will fall back to their default settings when the URL is parsed.
232    ///
233    /// The only settings guaranteed to be preserved are:
234    /// * Username
235    /// * Password
236    /// * Hostname
237    /// * Port
238    /// * Database name
239    /// * Unix socket or SQLite database file path
240    /// * SSL mode (if applicable)
241    /// * SSL CA certificate path
242    /// * SSL client certificate path
243    /// * SSL client key path
244    ///
245    /// Additional settings are driver-specific. Refer to the source of a given implementation
246    /// to see which options are preserved in the URL.
247    ///
248    /// ### Panics
249    /// This defaults to `unimplemented!()`.
250    ///
251    /// Individual drivers should override this to implement the intended behavior.
252    fn to_url_lossy(&self) -> Url {
253        unimplemented!()
254    }
255
256    /// Establish a new database connection with the options specified by `self`.
257    fn connect(&self) -> impl Future<Output = Result<Self::Connection, Error>> + Send + '_
258    where
259        Self::Connection: Sized;
260
261    /// Log executed statements with the specified `level`
262    fn log_statements(self, level: LevelFilter) -> Self;
263
264    /// Log executed statements with a duration above the specified `duration`
265    /// at the specified `level`.
266    fn log_slow_statements(self, level: LevelFilter, duration: Duration) -> Self;
267
268    /// Entirely disables statement logging (both slow and regular).
269    fn disable_statement_logging(self) -> Self {
270        self.log_statements(LevelFilter::Off)
271            .log_slow_statements(LevelFilter::Off, Duration::default())
272    }
273
274    #[doc(hidden)]
275    fn __unstable_apply_driver_config(
276        self,
277        _config: &config::drivers::Config,
278    ) -> crate::Result<Self> {
279        Ok(self)
280    }
281}