libsql/connection.rs
1use std::collections::VecDeque;
2use std::fmt;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::params::{IntoParams, Params};
8use crate::rows::Rows;
9use crate::statement::Statement;
10use crate::transaction::Transaction;
11use crate::{Result, TransactionBehavior};
12
13#[async_trait::async_trait]
14pub(crate) trait Conn {
15 async fn execute(&self, sql: &str, params: Params) -> Result<u64>;
16
17 async fn execute_batch(&self, sql: &str) -> Result<BatchRows>;
18
19 async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows>;
20
21 async fn prepare(&self, sql: &str) -> Result<Statement>;
22
23 async fn transaction(&self, tx_behavior: TransactionBehavior) -> Result<Transaction>;
24
25 fn interrupt(&self) -> Result<()>;
26
27 fn busy_timeout(&self, timeout: Duration) -> Result<()>;
28
29 fn is_autocommit(&self) -> bool;
30
31 fn changes(&self) -> u64;
32
33 fn total_changes(&self) -> u64;
34
35 fn last_insert_rowid(&self) -> i64;
36
37 async fn reset(&self);
38
39 fn enable_load_extension(&self, _onoff: bool) -> Result<()> {
40 Err(crate::Error::LoadExtensionNotSupported)
41 }
42
43 fn load_extension(&self, _dylib_path: &Path, _entry_point: Option<&str>) -> Result<()> {
44 Err(crate::Error::LoadExtensionNotSupported)
45 }
46}
47
48/// A set of rows returned from `execute_batch`/`execute_transactional_batch`. It is essentially
49/// rows of rows for each statement in the batch call.
50///
51/// # Note
52///
53/// All rows will be materialized in memory, if you would like to stream them then use `query`
54/// instead as this is optimized better for memory usage.
55pub struct BatchRows {
56 inner: VecDeque<Option<Rows>>,
57 skip_last_amt: usize,
58}
59
60impl BatchRows {
61 #[allow(unused)]
62 pub(crate) fn empty() -> Self {
63 Self {
64 inner: VecDeque::new(),
65 skip_last_amt: 0,
66 }
67 }
68
69 #[cfg(any(feature = "hrana", feature = "core"))]
70 pub(crate) fn new(rows: Vec<Option<Rows>>) -> Self {
71 Self {
72 inner: rows.into(),
73 skip_last_amt: 0,
74 }
75 }
76
77 #[cfg(feature = "hrana")]
78 pub(crate) fn new_skip_last(rows: Vec<Option<Rows>>, skip_last_amt: usize) -> Self {
79 Self {
80 inner: rows.into(),
81 skip_last_amt,
82 }
83 }
84
85 /// Get the next set of rows, it is wrapped in two options, if the first option returns `None`
86 /// then the set of batch statement results has ended. If the inner option returns `None` then
87 /// the statement was never executed (potentially due to a conditional).
88 pub fn next_stmt_row(&mut self) -> Option<Option<Rows>> {
89 if self.inner.len() <= self.skip_last_amt {
90 return None;
91 }
92
93 self.inner.pop_front()
94 }
95}
96
97impl fmt::Debug for BatchRows {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 f.debug_struct("BatchRows").finish()
100 }
101}
102
103/// A connection to some libsql database, this can be a remote one or a local one.
104#[derive(Clone)]
105pub struct Connection {
106 pub(crate) conn: Arc<dyn Conn + Send + Sync>,
107}
108
109impl Connection {
110 /// Execute sql query provided some type that implements [`IntoParams`] returning
111 /// on success the number of rows that were changed.
112 ///
113 /// # Example
114 ///
115 /// ```rust,no_run
116 /// # async fn run(conn: &libsql::Connection) {
117 /// # use libsql::params;
118 /// conn.execute("INSERT INTO foo (id) VALUES (?1)", [42]).await.unwrap();
119 /// conn.execute("INSERT INTO foo (id, name) VALUES (?1, ?2)", params![42, "baz"]).await.unwrap();
120 /// # }
121 /// ```
122 ///
123 /// For more info on how to pass params check [`IntoParams`]'s docs.
124 pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
125 tracing::trace!("executing `{}`", sql);
126 self.conn.execute(sql, params.into_params()?).await
127 }
128
129 /// Execute a batch set of statements.
130 ///
131 /// # Return
132 ///
133 /// This returns a `BatchRows` currently only the `remote` and `local` connection supports this feature and
134 /// all other connection types will return an empty set always.
135 pub async fn execute_batch(&self, sql: &str) -> Result<BatchRows> {
136 tracing::trace!("executing batch `{}`", sql);
137 self.conn.execute_batch(sql).await
138 }
139
140 /// Execute a batch set of statements atomically in a transaction.
141 ///
142 /// # Return
143 ///
144 /// This returns a `BatchRows` currently only the `remote` and `local` connection supports this feature and
145 /// all other connection types will return an empty set always.
146 pub async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows> {
147 tracing::trace!("executing batch transactional `{}`", sql);
148 self.conn.execute_transactional_batch(sql).await
149 }
150
151 /// Execute sql query provided some type that implements [`IntoParams`] returning
152 /// on success the [`Rows`].
153 ///
154 /// # Example
155 ///
156 /// ```rust,no_run
157 /// # async fn run(conn: &libsql::Connection) {
158 /// # use libsql::params;
159 /// conn.query("SELECT foo FROM bar WHERE id = ?1", [42]).await.unwrap();
160 /// conn.query("SELECT foo FROM bar WHERE id = ?1 AND name = ?2", params![42, "baz"]).await.unwrap();
161 /// # }
162 /// ```
163 /// For more info on how to pass params check [`IntoParams`]'s docs and on how to
164 /// extract values out of the rows check the [`Rows`] docs.
165 pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
166 let mut stmt = self.prepare(sql).await?;
167
168 stmt.query(params).await
169 }
170
171 /// Prepares a cached statement.
172 pub async fn prepare(&self, sql: &str) -> Result<Statement> {
173 tracing::trace!("preparing `{}`", sql);
174 self.conn.prepare(sql).await
175 }
176
177 /// Begin a new transaction in `DEFERRED` mode, which is the default.
178 pub async fn transaction(&self) -> Result<Transaction> {
179 tracing::trace!("starting deferred transaction");
180 self.transaction_with_behavior(TransactionBehavior::Deferred)
181 .await
182 }
183
184 /// Begin a new transaction in the given [`TransactionBehavior`].
185 pub async fn transaction_with_behavior(
186 &self,
187 tx_behavior: TransactionBehavior,
188 ) -> Result<Transaction> {
189 tracing::trace!("starting {:?} transaction", tx_behavior);
190 self.conn.transaction(tx_behavior).await
191 }
192
193 /// Cancel ongoing operations and return at earliest opportunity.
194 pub fn interrupt(&self) -> Result<()> {
195 self.conn.interrupt()
196 }
197
198 pub fn busy_timeout(&self, timeout: Duration) -> Result<()> {
199 self.conn.busy_timeout(timeout)
200 }
201
202 /// Check weather libsql is in `autocommit` or not.
203 pub fn is_autocommit(&self) -> bool {
204 self.conn.is_autocommit()
205 }
206
207 /// Check the amount of changes the last query created.
208 pub fn changes(&self) -> u64 {
209 self.conn.changes()
210 }
211
212 /// Check the total amount of changes the connection has done.
213 pub fn total_changes(&self) -> u64 {
214 self.conn.total_changes()
215 }
216
217 /// Check the last inserted row id.
218 pub fn last_insert_rowid(&self) -> i64 {
219 self.conn.last_insert_rowid()
220 }
221
222 pub async fn reset(&self) {
223 self.conn.reset().await
224 }
225
226 /// Enable loading SQLite extensions from SQL queries and Rust API.
227 ///
228 /// See [`load_extension`](Connection::load_extension) documentation for more details.
229 pub fn load_extension_enable(&self) -> Result<()> {
230 self.conn.enable_load_extension(true)
231 }
232
233 /// Disable loading SQLite extensions from SQL queries and Rust API.
234 ///
235 /// See [`load_extension`](Connection::load_extension) documentation for more details.
236 pub fn load_extension_disable(&self) -> Result<()> {
237 self.conn.enable_load_extension(false)
238 }
239
240 /// Load a SQLite extension from a dynamic library at `dylib_path`, specifying optional
241 /// entry point `entry_point`.
242 ///
243 /// # Security
244 ///
245 /// Loading extensions from dynamic libraries is a potential security risk, as it allows
246 /// arbitrary code execution. Only load extensions that you trust.
247 ///
248 /// Extension loading is disabled by default. Please use the [`load_extension_enable`](Connection::load_extension_enable)
249 /// method to enable it. It's recommended to disable extension loading after you're done
250 /// loading extensions to avoid SQL injection attacks from loading extensions.
251 ///
252 /// See SQLite's documentation on `sqlite3_load_extension` for more information:
253 /// https://sqlite.org/c3ref/load_extension.html
254 pub fn load_extension<P: AsRef<Path>>(
255 &self,
256 dylib_path: P,
257 entry_point: Option<&str>,
258 ) -> Result<()> {
259 self.conn.load_extension(dylib_path.as_ref(), entry_point)
260 }
261}
262
263impl fmt::Debug for Connection {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 f.debug_struct("Connection").finish()
266 }
267}