sqlx_core/connection.rs
1use crate::database::{Database, HasStatementCache};
2use crate::error::Error;
3
4use crate::transaction::{Transaction, TransactionManager};
5use futures_core::future::BoxFuture;
6use log::LevelFilter;
7use std::borrow::Cow;
8use std::fmt::Debug;
9use std::str::FromStr;
10use std::time::Duration;
11use url::Url;
12
13/// Represents a single database connection.
14pub trait Connection: Send {
15 type Database: Database<Connection = Self>;
16
17 type Options: ConnectOptions<Connection = Self>;
18
19 /// Explicitly close this database connection.
20 ///
21 /// This notifies the database server that the connection is closing so that it can
22 /// free up any server-side resources in use.
23 ///
24 /// While connections can simply be dropped to clean up local resources,
25 /// the `Drop` handler itself cannot notify the server that the connection is being closed
26 /// because that may require I/O to send a termination message. That can result in a delay
27 /// before the server learns that the connection is gone, usually from a TCP keepalive timeout.
28 ///
29 /// Creating and dropping many connections in short order without calling `.close()` may
30 /// lead to errors from the database server because those senescent connections will still
31 /// count against any connection limit or quota that is configured.
32 ///
33 /// Therefore it is recommended to call `.close()` on a connection when you are done using it
34 /// and to `.await` the result to ensure the termination message is sent.
35 fn close(self) -> BoxFuture<'static, Result<(), Error>>;
36
37 /// Immediately close the connection without sending a graceful shutdown.
38 ///
39 /// This should still at least send a TCP `FIN` frame to let the server know we're dying.
40 #[doc(hidden)]
41 fn close_hard(self) -> BoxFuture<'static, Result<(), Error>>;
42
43 /// Checks if a connection to the database is still valid.
44 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>>;
45
46 /// Begin a new transaction or establish a savepoint within the active transaction.
47 ///
48 /// Returns a [`Transaction`] for controlling and tracking the new transaction.
49 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
50 where
51 Self: Sized;
52
53 /// Begin a new transaction with a custom statement.
54 ///
55 /// Returns a [`Transaction`] for controlling and tracking the new transaction.
56 ///
57 /// Returns an error if the connection is already in a transaction or if
58 /// `statement` does not put the connection into a transaction.
59 fn begin_with(
60 &mut self,
61 statement: impl Into<Cow<'static, str>>,
62 ) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
63 where
64 Self: Sized,
65 {
66 Transaction::begin(self, Some(statement.into()))
67 }
68
69 /// Returns `true` if the connection is currently in a transaction.
70 ///
71 /// # Note: Automatic Rollbacks May Not Be Counted
72 /// Certain database errors (such as a serializable isolation failure)
73 /// can cause automatic rollbacks of a transaction
74 /// which may not be indicated in the return value of this method.
75 #[inline]
76 fn is_in_transaction(&self) -> bool {
77 <Self::Database as Database>::TransactionManager::get_transaction_depth(self) != 0
78 }
79
80 /// Execute the function inside a transaction.
81 ///
82 /// If the function returns an error, the transaction will be rolled back. If it does not
83 /// return an error, the transaction will be committed.
84 ///
85 /// # Example
86 ///
87 /// ```rust
88 /// use sqlx::postgres::{PgConnection, PgRow};
89 /// use sqlx::Connection;
90 ///
91 /// # pub async fn _f(conn: &mut PgConnection) -> sqlx::Result<Vec<PgRow>> {
92 /// conn.transaction(|txn| Box::pin(async move {
93 /// sqlx::query("select * from ..").fetch_all(&mut **txn).await
94 /// })).await
95 /// # }
96 /// ```
97 fn transaction<'a, F, R, E>(&'a mut self, callback: F) -> BoxFuture<'a, Result<R, E>>
98 where
99 for<'c> F: FnOnce(&'c mut Transaction<'_, Self::Database>) -> BoxFuture<'c, Result<R, E>>
100 + 'a
101 + Send
102 + Sync,
103 Self: Sized,
104 R: Send,
105 E: From<Error> + Send,
106 {
107 Box::pin(async move {
108 let mut transaction = self.begin().await?;
109 let ret = callback(&mut transaction).await;
110
111 match ret {
112 Ok(ret) => {
113 transaction.commit().await?;
114
115 Ok(ret)
116 }
117 Err(err) => {
118 transaction.rollback().await?;
119
120 Err(err)
121 }
122 }
123 })
124 }
125
126 /// The number of statements currently cached in the connection.
127 fn cached_statements_size(&self) -> usize
128 where
129 Self::Database: HasStatementCache,
130 {
131 0
132 }
133
134 /// Removes all statements from the cache, closing them on the server if
135 /// needed.
136 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>>
137 where
138 Self::Database: HasStatementCache,
139 {
140 Box::pin(async move { Ok(()) })
141 }
142
143 /// Restore any buffers in the connection to their default capacity, if possible.
144 ///
145 /// Sending a large query or receiving a resultset with many columns can cause the connection
146 /// to allocate additional buffer space to fit the data which is retained afterwards in
147 /// case it's needed again. This can give the outward appearance of a memory leak, but is
148 /// in fact the intended behavior.
149 ///
150 /// Calling this method tells the connection to release that excess memory if it can,
151 /// though be aware that calling this too often can cause unnecessary thrashing or
152 /// fragmentation in the global allocator. If there's still data in the connection buffers
153 /// (unlikely if the last query was run to completion) then it may need to be moved to
154 /// allow the buffers to shrink.
155 fn shrink_buffers(&mut self);
156
157 #[doc(hidden)]
158 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>;
159
160 #[doc(hidden)]
161 fn should_flush(&self) -> bool;
162
163 /// Establish a new database connection.
164 ///
165 /// A value of [`Options`][Self::Options] is parsed from the provided connection string. This parsing
166 /// is database-specific.
167 #[inline]
168 fn connect(url: &str) -> BoxFuture<'static, Result<Self, Error>>
169 where
170 Self: Sized,
171 {
172 let options = url.parse();
173
174 Box::pin(async move { Self::connect_with(&options?).await })
175 }
176
177 /// Establish a new database connection with the provided options.
178 fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result<Self, Error>>
179 where
180 Self: Sized,
181 {
182 options.connect()
183 }
184}
185
186#[derive(Clone, Debug)]
187#[non_exhaustive]
188pub struct LogSettings {
189 pub statements_level: LevelFilter,
190 pub slow_statements_level: LevelFilter,
191 pub slow_statements_duration: Duration,
192}
193
194impl Default for LogSettings {
195 fn default() -> Self {
196 LogSettings {
197 statements_level: LevelFilter::Debug,
198 slow_statements_level: LevelFilter::Warn,
199 slow_statements_duration: Duration::from_secs(1),
200 }
201 }
202}
203
204impl LogSettings {
205 pub fn log_statements(&mut self, level: LevelFilter) {
206 self.statements_level = level;
207 }
208 pub fn log_slow_statements(&mut self, level: LevelFilter, duration: Duration) {
209 self.slow_statements_level = level;
210 self.slow_statements_duration = duration;
211 }
212}
213
214pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {
215 type Connection: Connection<Options = Self> + ?Sized;
216
217 /// Parse the `ConnectOptions` from a URL.
218 fn from_url(url: &Url) -> Result<Self, Error>;
219
220 /// Get a connection URL that may be used to connect to the same database as this `ConnectOptions`.
221 ///
222 /// ### Note: Lossy
223 /// Any flags or settings which do not have a representation in the URL format will be lost.
224 /// They will fall back to their default settings when the URL is parsed.
225 ///
226 /// The only settings guaranteed to be preserved are:
227 /// * Username
228 /// * Password
229 /// * Hostname
230 /// * Port
231 /// * Database name
232 /// * Unix socket or SQLite database file path
233 /// * SSL mode (if applicable)
234 /// * SSL CA certificate path
235 /// * SSL client certificate path
236 /// * SSL client key path
237 ///
238 /// Additional settings are driver-specific. Refer to the source of a given implementation
239 /// to see which options are preserved in the URL.
240 ///
241 /// ### Panics
242 /// This defaults to `unimplemented!()`.
243 ///
244 /// Individual drivers should override this to implement the intended behavior.
245 fn to_url_lossy(&self) -> Url {
246 unimplemented!()
247 }
248
249 /// Establish a new database connection with the options specified by `self`.
250 fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, Error>>
251 where
252 Self::Connection: Sized;
253
254 /// Log executed statements with the specified `level`
255 fn log_statements(self, level: LevelFilter) -> Self;
256
257 /// Log executed statements with a duration above the specified `duration`
258 /// at the specified `level`.
259 fn log_slow_statements(self, level: LevelFilter, duration: Duration) -> Self;
260
261 /// Entirely disables statement logging (both slow and regular).
262 fn disable_statement_logging(self) -> Self {
263 self.log_statements(LevelFilter::Off)
264 .log_slow_statements(LevelFilter::Off, Duration::default())
265 }
266}