libsql/
connection.rs

1use std::collections::VecDeque;
2use std::fmt;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::params::{IntoParams, Params};
8use crate::rows::Rows;
9use crate::statement::Statement;
10use crate::transaction::Transaction;
11use crate::{Result, TransactionBehavior};
12
13#[async_trait::async_trait]
14pub(crate) trait Conn {
15    async fn execute(&self, sql: &str, params: Params) -> Result<u64>;
16
17    async fn execute_batch(&self, sql: &str) -> Result<BatchRows>;
18
19    async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows>;
20
21    async fn prepare(&self, sql: &str) -> Result<Statement>;
22
23    async fn transaction(&self, tx_behavior: TransactionBehavior) -> Result<Transaction>;
24
25    fn interrupt(&self) -> Result<()>;
26
27    fn busy_timeout(&self, timeout: Duration) -> Result<()>;
28
29    fn is_autocommit(&self) -> bool;
30
31    fn changes(&self) -> u64;
32
33    fn total_changes(&self) -> u64;
34
35    fn last_insert_rowid(&self) -> i64;
36
37    async fn reset(&self);
38
39    fn enable_load_extension(&self, _onoff: bool) -> Result<()> {
40        Err(crate::Error::LoadExtensionNotSupported)
41    }
42
43    fn load_extension(&self, _dylib_path: &Path, _entry_point: Option<&str>) -> Result<()> {
44        Err(crate::Error::LoadExtensionNotSupported)
45    }
46}
47
48/// A set of rows returned from `execute_batch`/`execute_transactional_batch`. It is essentially
49/// rows of rows for each statement in the batch call.
50///
51/// # Note
52///
53/// All rows will be materialized in memory, if you would like to stream them then use `query`
54/// instead as this is optimized better for memory usage.
55pub struct BatchRows {
56    inner: VecDeque<Option<Rows>>,
57    skip_last_amt: usize,
58}
59
60impl BatchRows {
61    #[allow(unused)]
62    pub(crate) fn empty() -> Self {
63        Self {
64            inner: VecDeque::new(),
65            skip_last_amt: 0,
66        }
67    }
68
69    #[cfg(any(feature = "hrana", feature = "core"))]
70    pub(crate) fn new(rows: Vec<Option<Rows>>) -> Self {
71        Self {
72            inner: rows.into(),
73            skip_last_amt: 0,
74        }
75    }
76
77    #[cfg(feature = "hrana")]
78    pub(crate) fn new_skip_last(rows: Vec<Option<Rows>>, skip_last_amt: usize) -> Self {
79        Self {
80            inner: rows.into(),
81            skip_last_amt,
82        }
83    }
84
85    /// Get the next set of rows, it is wrapped in two options, if the first option returns `None`
86    /// then the set of batch statement results has ended. If the inner option returns `None` then
87    /// the statement was never executed (potentially due to a conditional).
88    pub fn next_stmt_row(&mut self) -> Option<Option<Rows>> {
89        if self.inner.len() <= self.skip_last_amt {
90            return None;
91        }
92
93        self.inner.pop_front()
94    }
95}
96
97impl fmt::Debug for BatchRows {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        f.debug_struct("BatchRows").finish()
100    }
101}
102
103/// A connection to some libsql database, this can be a remote one or a local one.
104#[derive(Clone)]
105pub struct Connection {
106    pub(crate) conn: Arc<dyn Conn + Send + Sync>,
107}
108
109impl Connection {
110    /// Execute sql query provided some type that implements [`IntoParams`] returning
111    /// on success the number of rows that were changed.
112    ///
113    /// # Example
114    ///
115    /// ```rust,no_run
116    /// # async fn run(conn: &libsql::Connection) {
117    /// # use libsql::params;
118    /// conn.execute("INSERT INTO foo (id) VALUES (?1)", [42]).await.unwrap();
119    /// conn.execute("INSERT INTO foo (id, name) VALUES (?1, ?2)", params![42, "baz"]).await.unwrap();
120    /// # }
121    /// ```
122    ///
123    /// For more info on how to pass params check [`IntoParams`]'s docs.
124    pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
125        tracing::trace!("executing `{}`", sql);
126        self.conn.execute(sql, params.into_params()?).await
127    }
128
129    /// Execute a batch set of statements.
130    ///
131    /// # Return
132    ///
133    /// This returns a `BatchRows` currently only the `remote`  and `local` connection supports this feature and
134    /// all other connection types will return an empty set always.
135    pub async fn execute_batch(&self, sql: &str) -> Result<BatchRows> {
136        tracing::trace!("executing batch `{}`", sql);
137        self.conn.execute_batch(sql).await
138    }
139
140    /// Execute a batch set of statements atomically in a transaction.
141    ///
142    /// # Return
143    ///
144    /// This returns a `BatchRows` currently only the `remote` and `local` connection supports this feature and
145    /// all other connection types will return an empty set always.
146    pub async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows> {
147        tracing::trace!("executing batch transactional `{}`", sql);
148        self.conn.execute_transactional_batch(sql).await
149    }
150
151    /// Execute sql query provided some type that implements [`IntoParams`] returning
152    /// on success the [`Rows`].
153    ///
154    /// # Example
155    ///
156    /// ```rust,no_run
157    /// # async fn run(conn: &libsql::Connection) {
158    /// # use libsql::params;
159    /// conn.query("SELECT foo FROM bar WHERE id = ?1", [42]).await.unwrap();
160    /// conn.query("SELECT foo FROM bar WHERE id = ?1 AND name = ?2", params![42, "baz"]).await.unwrap();
161    /// # }
162    /// ```
163    /// For more info on how to pass params check [`IntoParams`]'s docs and on how to
164    /// extract values out of the rows check the [`Rows`] docs.
165    pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
166        let mut stmt = self.prepare(sql).await?;
167
168        stmt.query(params).await
169    }
170
171    /// Prepares a cached statement.
172    pub async fn prepare(&self, sql: &str) -> Result<Statement> {
173        tracing::trace!("preparing `{}`", sql);
174        self.conn.prepare(sql).await
175    }
176
177    /// Begin a new transaction in `DEFERRED` mode, which is the default.
178    pub async fn transaction(&self) -> Result<Transaction> {
179        tracing::trace!("starting deferred transaction");
180        self.transaction_with_behavior(TransactionBehavior::Deferred)
181            .await
182    }
183
184    /// Begin a new transaction in the given [`TransactionBehavior`].
185    pub async fn transaction_with_behavior(
186        &self,
187        tx_behavior: TransactionBehavior,
188    ) -> Result<Transaction> {
189        tracing::trace!("starting {:?} transaction", tx_behavior);
190        self.conn.transaction(tx_behavior).await
191    }
192
193    /// Cancel ongoing operations and return at earliest opportunity.
194    pub fn interrupt(&self) -> Result<()> {
195        self.conn.interrupt()
196    }
197
198    pub fn busy_timeout(&self, timeout: Duration) -> Result<()> {
199        self.conn.busy_timeout(timeout)
200    }
201
202    /// Check weather libsql is in `autocommit` or not.
203    pub fn is_autocommit(&self) -> bool {
204        self.conn.is_autocommit()
205    }
206
207    /// Check the amount of changes the last query created.
208    pub fn changes(&self) -> u64 {
209        self.conn.changes()
210    }
211
212    /// Check the total amount of changes the connection has done.
213    pub fn total_changes(&self) -> u64 {
214        self.conn.total_changes()
215    }
216
217    /// Check the last inserted row id.
218    pub fn last_insert_rowid(&self) -> i64 {
219        self.conn.last_insert_rowid()
220    }
221
222    pub async fn reset(&self) {
223        self.conn.reset().await
224    }
225
226    /// Enable loading SQLite extensions from SQL queries and Rust API.
227    ///
228    /// See [`load_extension`](Connection::load_extension) documentation for more details.
229    pub fn load_extension_enable(&self) -> Result<()> {
230        self.conn.enable_load_extension(true)
231    }
232
233    /// Disable loading SQLite extensions from SQL queries and Rust API.
234    ///
235    /// See [`load_extension`](Connection::load_extension) documentation for more details.
236    pub fn load_extension_disable(&self) -> Result<()> {
237        self.conn.enable_load_extension(false)
238    }
239
240    /// Load a SQLite extension from a dynamic library at `dylib_path`, specifying optional
241    /// entry point `entry_point`.
242    ///
243    /// # Security
244    ///
245    /// Loading extensions from dynamic libraries is a potential security risk, as it allows
246    /// arbitrary code execution. Only load extensions that you trust.
247    ///
248    /// Extension loading is disabled by default. Please use the [`load_extension_enable`](Connection::load_extension_enable)
249    /// method to enable it. It's recommended to disable extension loading after you're done
250    /// loading extensions to avoid SQL injection attacks from loading extensions.
251    ///
252    /// See SQLite's documentation on `sqlite3_load_extension` for more information:
253    /// https://sqlite.org/c3ref/load_extension.html
254    pub fn load_extension<P: AsRef<Path>>(
255        &self,
256        dylib_path: P,
257        entry_point: Option<&str>,
258    ) -> Result<()> {
259        self.conn.load_extension(dylib_path.as_ref(), entry_point)
260    }
261}
262
263impl fmt::Debug for Connection {
264    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265        f.debug_struct("Connection").finish()
266    }
267}