turso/
lib.rs

1//! # Turso bindings for Rust
2//!
3//! Turso is an in-process SQL database engine, compatible with SQLite.
4//!
5//! ## Getting Started
6//!
7//! To get started, you first need to create a [`Database`] object and then open a [`Connection`] to it, which you use to query:
8//!
9//! ```rust,no_run
10//! # async fn run() {
11//! use turso::Builder;
12//!
13//! let db = Builder::new_local(":memory:").build().await.unwrap();
14//! let conn = db.connect().unwrap();
15//! conn.execute("CREATE TABLE IF NOT EXISTS users (email TEXT)", ()).await.unwrap();
16//! conn.execute("INSERT INTO users (email) VALUES ('alice@example.org')", ()).await.unwrap();
17//! # }
18//! ```
19//!
20//! You can also prepare statements with the [`Connection`] object and then execute the [`Statement`] objects:
21//!
22//! ```rust,no_run
23//! # async fn run() {
24//! # use turso::Builder;
25//! # let db = Builder::new_local(":memory:").build().await.unwrap();
26//! # let conn = db.connect().unwrap();
27//! let mut stmt = conn.prepare("SELECT * FROM users WHERE email = ?1").await.unwrap();
28//! let mut rows = stmt.query(["foo@example.com"]).await.unwrap();
29//! let row = rows.next().await.unwrap().unwrap();
30//! let value = row.get_value(0).unwrap();
31//! println!("Row: {:?}", value);
32//! # }
33//! ```
34
35pub mod params;
36mod rows;
37pub mod transaction;
38pub mod value;
39
40use transaction::TransactionBehavior;
41#[cfg(feature = "conn_raw_api")]
42use turso_core::types::WalFrameInfo;
43pub use value::Value;
44
45pub use params::params_from_iter;
46pub use params::IntoParams;
47
48use std::fmt::Debug;
49use std::num::NonZero;
50use std::sync::{Arc, Mutex};
51
52// Re-exports rows
53pub use crate::rows::{Row, Rows};
54
55#[derive(Debug, thiserror::Error)]
56pub enum Error {
57    #[error("SQL conversion failure: `{0}`")]
58    ToSqlConversionFailure(BoxError),
59    #[error("Mutex lock error: {0}")]
60    MutexError(String),
61    #[error("SQL execution failure: `{0}`")]
62    SqlExecutionFailure(String),
63    #[error("WAL operation error: `{0}`")]
64    WalOperationError(String),
65    #[error("Query returned no rows")]
66    QueryReturnedNoRows,
67    #[error("Conversion failure: `{0}`")]
68    ConversionFailure(String),
69}
70
71impl From<turso_core::LimboError> for Error {
72    fn from(err: turso_core::LimboError) -> Self {
73        Error::SqlExecutionFailure(err.to_string())
74    }
75}
76
/// Crate-internal alias for a boxed, thread-safe, dynamically typed error.
pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
78
79pub type Result<T> = std::result::Result<T, Error>;
80
/// A builder for [`Database`].
pub struct Builder {
    /// Path of the database file, or `":memory:"` for an in-memory database.
    path: String,
    /// Whether the database is opened with MVCC enabled.
    enable_mvcc: bool,
    /// Name of the IO backend to use; `None` selects the default for `path`.
    vfs: Option<String>,
}
87
88impl Builder {
89    /// Create a new local database.
90    pub fn new_local(path: &str) -> Self {
91        Self {
92            path: path.to_string(),
93            enable_mvcc: false,
94            vfs: None,
95        }
96    }
97
98    pub fn with_mvcc(mut self, mvcc_enabled: bool) -> Self {
99        self.enable_mvcc = mvcc_enabled;
100        self
101    }
102
103    pub fn with_io(mut self, vfs: String) -> Self {
104        self.vfs = Some(vfs);
105        self
106    }
107
108    /// Build the database.
109    #[allow(unused_variables, clippy::arc_with_non_send_sync)]
110    pub async fn build(self) -> Result<Database> {
111        let io = self.get_io()?;
112        let db = turso_core::Database::open_file(io, self.path.as_str(), self.enable_mvcc, true)?;
113        Ok(Database { inner: db })
114    }
115
116    fn get_io(&self) -> Result<Arc<dyn turso_core::IO>> {
117        let vfs_choice = self.vfs.as_deref().unwrap_or("");
118
119        if self.path == ":memory:" && vfs_choice.is_empty() {
120            return Ok(Arc::new(turso_core::MemoryIO::new()));
121        }
122
123        match vfs_choice {
124            "memory" => Ok(Arc::new(turso_core::MemoryIO::new())),
125            "syscall" => {
126                #[cfg(target_family = "unix")]
127                {
128                    Ok(Arc::new(
129                        turso_core::UnixIO::new()
130                            .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
131                    ))
132                }
133                #[cfg(not(target_family = "unix"))]
134                {
135                    Ok(Arc::new(
136                        turso_core::PlatformIO::new()
137                            .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
138                    ))
139                }
140            }
141            #[cfg(target_os = "linux")]
142            "io_uring" => Ok(Arc::new(
143                turso_core::UringIO::new()
144                    .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
145            )),
146            #[cfg(not(target_os = "linux"))]
147            "io_uring" => Err(Error::SqlExecutionFailure(
148                "io_uring is only available on Linux targets".to_string(),
149            )),
150            "" => {
151                // Default behavior: memory for ":memory:", platform IO for files
152                if self.path == ":memory:" {
153                    Ok(Arc::new(turso_core::MemoryIO::new()))
154                } else {
155                    Ok(Arc::new(
156                        turso_core::PlatformIO::new()
157                            .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
158                    ))
159                }
160            }
161            _ => Ok(Arc::new(
162                turso_core::PlatformIO::new()
163                    .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
164            )),
165        }
166    }
167}
168
169/// A database.
170///
171/// The `Database` object points to a database and allows you to connect to it
172#[derive(Clone)]
173pub struct Database {
174    inner: Arc<turso_core::Database>,
175}
176
177unsafe impl Send for Database {}
178unsafe impl Sync for Database {}
179
180impl Debug for Database {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        f.debug_struct("Database").finish()
183    }
184}
185
186impl Database {
187    /// Connect to the database.
188    pub fn connect(&self) -> Result<Connection> {
189        let conn = self.inner.connect()?;
190        Ok(Connection::create(conn))
191    }
192}
193
194/// A database connection.
195pub struct Connection {
196    inner: Arc<Mutex<Arc<turso_core::Connection>>>,
197    transaction_behavior: TransactionBehavior,
198}
199
200impl Clone for Connection {
201    fn clone(&self) -> Self {
202        Self {
203            inner: Arc::clone(&self.inner),
204            transaction_behavior: self.transaction_behavior,
205        }
206    }
207}
208
209unsafe impl Send for Connection {}
210unsafe impl Sync for Connection {}
211
212impl Connection {
213    pub fn create(conn: Arc<turso_core::Connection>) -> Self {
214        #[allow(clippy::arc_with_non_send_sync)]
215        let connection = Connection {
216            inner: Arc::new(Mutex::new(conn)),
217            transaction_behavior: TransactionBehavior::Deferred,
218        };
219        connection
220    }
221    /// Query the database with SQL.
222    pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
223        let mut stmt = self.prepare(sql).await?;
224        stmt.query(params).await
225    }
226
227    /// Execute SQL statement on the database.
228    pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
229        let mut stmt = self.prepare(sql).await?;
230        stmt.execute(params).await
231    }
232
233    #[cfg(feature = "conn_raw_api")]
234    pub fn wal_frame_count(&self) -> Result<u64> {
235        let conn = self
236            .inner
237            .lock()
238            .map_err(|e| Error::MutexError(e.to_string()))?;
239        conn.wal_state()
240            .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}")))
241            .map(|state| state.max_frame)
242    }
243
244    #[cfg(feature = "conn_raw_api")]
245    pub fn try_wal_watermark_read_page(
246        &self,
247        page_idx: u32,
248        page: &mut [u8],
249        frame_watermark: Option<u64>,
250    ) -> Result<bool> {
251        let conn = self
252            .inner
253            .lock()
254            .map_err(|e| Error::MutexError(e.to_string()))?;
255        conn.try_wal_watermark_read_page(page_idx, page, frame_watermark)
256            .map_err(|e| {
257                Error::WalOperationError(format!("try_wal_watermark_read_page failed: {e}"))
258            })
259    }
260
261    #[cfg(feature = "conn_raw_api")]
262    pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
263        let conn = self
264            .inner
265            .lock()
266            .map_err(|e| Error::MutexError(e.to_string()))?;
267        conn.wal_changed_pages_after(frame_watermark)
268            .map_err(|e| Error::WalOperationError(format!("wal_changed_pages_after failed: {e}")))
269    }
270
271    #[cfg(feature = "conn_raw_api")]
272    pub fn wal_insert_begin(&self) -> Result<()> {
273        let conn = self
274            .inner
275            .lock()
276            .map_err(|e| Error::MutexError(e.to_string()))?;
277        conn.wal_insert_begin()
278            .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}")))
279    }
280
281    #[cfg(feature = "conn_raw_api")]
282    pub fn wal_insert_end(&self, force_commit: bool) -> Result<()> {
283        let conn = self
284            .inner
285            .lock()
286            .map_err(|e| Error::MutexError(e.to_string()))?;
287        conn.wal_insert_end(force_commit)
288            .map_err(|e| Error::WalOperationError(format!("wal_insert_end failed: {e}")))
289    }
290
291    #[cfg(feature = "conn_raw_api")]
292    pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result<WalFrameInfo> {
293        let conn = self
294            .inner
295            .lock()
296            .map_err(|e| Error::MutexError(e.to_string()))?;
297        conn.wal_insert_frame(frame_no, frame)
298            .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}")))
299    }
300
301    #[cfg(feature = "conn_raw_api")]
302    pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result<WalFrameInfo> {
303        let conn = self
304            .inner
305            .lock()
306            .map_err(|e| Error::MutexError(e.to_string()))?;
307        conn.wal_get_frame(frame_no, frame)
308            .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}")))
309    }
310
311    /// Execute a batch of SQL statements on the database.
312    pub async fn execute_batch(&self, sql: &str) -> Result<()> {
313        self.prepare_execute_batch(sql).await?;
314        Ok(())
315    }
316
317    /// Prepare a SQL statement for later execution.
318    pub async fn prepare(&self, sql: &str) -> Result<Statement> {
319        let conn = self
320            .inner
321            .lock()
322            .map_err(|e| Error::MutexError(e.to_string()))?;
323
324        let stmt = conn.prepare(sql)?;
325
326        #[allow(clippy::arc_with_non_send_sync)]
327        let statement = Statement {
328            inner: Arc::new(Mutex::new(stmt)),
329        };
330        Ok(statement)
331    }
332
333    async fn prepare_execute_batch(&self, sql: impl AsRef<str>) -> Result<()> {
334        let conn = self
335            .inner
336            .lock()
337            .map_err(|e| Error::MutexError(e.to_string()))?;
338        conn.prepare_execute_batch(sql)?;
339        Ok(())
340    }
341
342    /// Query a pragma.
343    pub fn pragma_query<F>(&self, pragma_name: &str, mut f: F) -> Result<()>
344    where
345        F: FnMut(&Row) -> turso_core::Result<()>,
346    {
347        let conn = self
348            .inner
349            .lock()
350            .map_err(|e| Error::MutexError(e.to_string()))?;
351
352        let rows: Vec<Row> = conn
353            .pragma_query(pragma_name)
354            .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?
355            .iter()
356            .map(|row| row.iter().collect::<Row>())
357            .collect();
358
359        rows.iter().try_for_each(|row| {
360            f(row).map_err(|e| {
361                Error::SqlExecutionFailure(format!("Error executing user defined function: {e}"))
362            })
363        })?;
364        Ok(())
365    }
366
367    /// Returns the rowid of the last row inserted.
368    pub fn last_insert_rowid(&self) -> i64 {
369        let conn = self.inner.lock().unwrap();
370        conn.last_insert_rowid()
371    }
372
373    /// Flush dirty pages to disk.
374    /// This will write the dirty pages to the WAL.
375    pub fn cacheflush(&self) -> Result<()> {
376        let conn = self
377            .inner
378            .lock()
379            .map_err(|e| Error::MutexError(e.to_string()))?;
380        let completions = conn.cacheflush()?;
381        let pager = conn.get_pager();
382        for c in completions {
383            pager.io.wait_for_completion(c)?;
384        }
385        Ok(())
386    }
387
388    pub fn is_autocommit(&self) -> Result<bool> {
389        let conn = self
390            .inner
391            .lock()
392            .map_err(|e| Error::MutexError(e.to_string()))?;
393
394        Ok(conn.get_auto_commit())
395    }
396}
397
398impl Debug for Connection {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        f.debug_struct("Connection").finish()
401    }
402}
403
404/// A prepared statement.
405pub struct Statement {
406    inner: Arc<Mutex<turso_core::Statement>>,
407}
408
409impl Clone for Statement {
410    fn clone(&self) -> Self {
411        Self {
412            inner: Arc::clone(&self.inner),
413        }
414    }
415}
416
417unsafe impl Send for Statement {}
418unsafe impl Sync for Statement {}
419
420impl Statement {
421    /// Query the database with this prepared statement.
422    pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
423        let params = params.into_params()?;
424        match params {
425            params::Params::None => (),
426            params::Params::Positional(values) => {
427                for (i, value) in values.into_iter().enumerate() {
428                    let mut stmt = self.inner.lock().unwrap();
429                    stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
430                }
431            }
432            params::Params::Named(values) => {
433                for (name, value) in values.into_iter() {
434                    let mut stmt = self.inner.lock().unwrap();
435                    let i = stmt.parameters().index(name).unwrap();
436                    stmt.bind_at(i, value.into());
437                }
438            }
439        }
440        let rows = Rows::new(&self.inner);
441        Ok(rows)
442    }
443
444    /// Execute this prepared statement.
445    pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
446        {
447            // Reset the statement before executing
448            self.inner.lock().unwrap().reset();
449        }
450        let params = params.into_params()?;
451        match params {
452            params::Params::None => (),
453            params::Params::Positional(values) => {
454                for (i, value) in values.into_iter().enumerate() {
455                    let mut stmt = self.inner.lock().unwrap();
456                    stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
457                }
458            }
459            params::Params::Named(values) => {
460                for (name, value) in values.into_iter() {
461                    let mut stmt = self.inner.lock().unwrap();
462                    let i = stmt.parameters().index(name).unwrap();
463                    stmt.bind_at(i, value.into());
464                }
465            }
466        }
467        loop {
468            let mut stmt = self.inner.lock().unwrap();
469            match stmt.step() {
470                Ok(turso_core::StepResult::Row) => {
471                    return Err(Error::SqlExecutionFailure(
472                        "unexpected row during execution".to_string(),
473                    ));
474                }
475                Ok(turso_core::StepResult::Done) => {
476                    let changes = stmt.n_change();
477                    assert!(changes >= 0);
478                    return Ok(changes as u64);
479                }
480                Ok(turso_core::StepResult::IO) => {
481                    stmt.run_once()?;
482                }
483                Ok(turso_core::StepResult::Busy) => {
484                    return Err(Error::SqlExecutionFailure("database is locked".to_string()));
485                }
486                Ok(turso_core::StepResult::Interrupt) => {
487                    return Err(Error::SqlExecutionFailure("interrupted".to_string()));
488                }
489                Err(err) => {
490                    return Err(err.into());
491                }
492            }
493        }
494    }
495
496    /// Returns columns of the result of this prepared statement.
497    pub fn columns(&self) -> Vec<Column> {
498        let stmt = self.inner.lock().unwrap();
499
500        let n = stmt.num_columns();
501
502        let mut cols = Vec::with_capacity(n);
503
504        for i in 0..n {
505            let name = stmt.get_column_name(i).into_owned();
506            cols.push(Column {
507                name,
508                decl_type: None, // TODO
509            });
510        }
511
512        cols
513    }
514
515    /// Reset internal statement state after previous execution so it can be reused again
516    pub fn reset(&self) {
517        let mut stmt = self.inner.lock().unwrap();
518        stmt.reset();
519    }
520
521    /// Execute a query that returns the first [`Row`].
522    ///
523    /// # Errors
524    ///
525    /// - Returns `QueryReturnedNoRows` if no rows were returned.
526    pub async fn query_row(&mut self, params: impl IntoParams) -> Result<Row> {
527        let mut rows = self.query(params).await?;
528
529        rows.next().await?.ok_or(Error::QueryReturnedNoRows)
530    }
531}
532
/// Column information.
#[derive(Debug, Clone)]
pub struct Column {
    /// Name of the column.
    name: String,
    /// Declared type of the column, when one is recorded.
    decl_type: Option<String>,
}

impl Column {
    /// Return the name of the column.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the type of the column, or `None` when no declared type is recorded.
    pub fn decl_type(&self) -> Option<&str> {
        self.decl_type.as_deref()
    }
}
550
551pub trait IntoValue {
552    fn into_value(self) -> Result<Value>;
553}
554
555#[derive(Debug, Clone)]
556pub enum Params {
557    None,
558    Positional(Vec<Value>),
559    Named(Vec<(String, Value)>),
560}
561
/// An empty type that carries no state.
///
/// NOTE(review): presumably a placeholder next to the `transaction` module; confirm
/// whether it is still needed.
pub struct Transaction {}
563
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Rows written through one database handle are visible after the file is reopened.
    #[tokio::test]
    async fn test_database_persistence() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let db_path = temp_file.path().to_str().unwrap();

        // Create the table and insert two rows. The scope drops `db` and `conn`,
        // simulating a close, before the file is reopened.
        {
            let db = Builder::new_local(db_path).build().await?;
            let conn = db.connect()?;
            let setup = [
                "CREATE TABLE test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
                "INSERT INTO test_persistence (name) VALUES ('Alice');",
                "INSERT INTO test_persistence (name) VALUES ('Bob');",
            ];
            for sql in setup {
                conn.execute(sql, ()).await?;
            }
        }

        // Reopen and verify the rows come back in insertion order.
        let reopened = Builder::new_local(db_path).build().await?;
        let conn = reopened.connect()?;
        let mut rows = conn
            .query("SELECT name FROM test_persistence ORDER BY id;", ())
            .await?;

        let expected = [
            ("Alice", "Expected first row"),
            ("Bob", "Expected second row"),
        ];
        for (name, missing_msg) in expected {
            let row = rows.next().await?.expect(missing_msg);
            assert_eq!(row.get_value(0)?, Value::Text(name.to_string()));
        }

        assert!(rows.next().await?.is_none(), "Expected no more rows");

        Ok(())
    }

    /// Many large rows survive a reopen, and deleting the WAL file afterwards makes the
    /// table disappear.
    #[tokio::test]
    async fn test_database_persistence_many_frames() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let db_path = temp_file.path().to_str().unwrap();

        const NUM_INSERTS: usize = 100;
        const TARGET_STRING_LEN: usize = 1024; // 1KB

        // Each payload is a unique prefix padded with 'A' up to the target length.
        let original_data: Vec<String> = (0..NUM_INSERTS)
            .map(|i| {
                let prefix = format!("test_string_{i:04}_");
                let padding = "A".repeat(TARGET_STRING_LEN.saturating_sub(prefix.len()));
                format!("{prefix}{padding}")
            })
            .collect();

        // Create the table and insert every payload, then drop the handles.
        {
            let db = Builder::new_local(db_path).build().await?;
            let conn = db.connect()?;
            conn.execute(
                "CREATE TABLE test_large_persistence (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL);",
                (),
            )
            .await?;

            for payload in &original_data {
                let bound = params::Params::Positional(vec![Value::Text(payload.clone())]);
                conn.execute(
                    "INSERT INTO test_large_persistence (data) VALUES (?);",
                    bound,
                )
                .await?;
            }
        }

        // Reopen, verify every payload, then remove the WAL file.
        {
            let db = Builder::new_local(db_path).build().await?;
            let conn = db.connect()?;

            let mut rows = conn
                .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
                .await?;

            for (i, value) in original_data.iter().enumerate() {
                let Some(row) = rows.next().await? else {
                    panic!("Expected row {i} but found None")
                };
                assert_eq!(
                    row.get_value(0)?,
                    Value::Text(value.clone()),
                    "Mismatch in retrieved data for row {i}"
                );
            }

            assert!(
                rows.next().await?.is_none(),
                "Expected no more rows after retrieving all inserted data"
            );

            // Delete the WAL file only and try to re-open and query
            let wal_path = format!("{db_path}-wal");
            std::fs::remove_file(&wal_path)
                .map_err(|e| eprintln!("Warning: Failed to delete WAL file for test: {e}"))
                .unwrap();
        }

        // With the WAL gone, the reopened database must not know the table.
        let db_after_wal_delete = Builder::new_local(db_path).build().await?;
        let conn_after_wal_delete = db_after_wal_delete.connect()?;

        let outcome = conn_after_wal_delete
            .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
            .await;

        match outcome {
            Err(Error::SqlExecutionFailure(msg)) => {
                assert!(
                    msg.contains("no such table: test_large_persistence"),
                    "Expected 'test_large_persistence not found' error, but got: {msg}"
                );
            }
            Err(e) => panic!(
                "Expected SqlExecutionFailure for 'no such table', but got a different error: {e:?}"
            ),
            Ok(_) => panic!("Query succeeded after WAL deletion and DB reopen, but was expected to fail because the table definition should have been in the WAL."),
        }

        Ok(())
    }

    /// Repeatedly reopening the database, inserting one row and counting shows one more
    /// row each round.
    #[tokio::test]
    async fn test_database_persistence_write_one_frame_many_times() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let db_path = temp_file.path().to_str().unwrap();

        for expected_count in 1..=100i64 {
            // Writer: open, ensure the table exists, insert one row, close.
            {
                let db = Builder::new_local(db_path).build().await?;
                let conn = db.connect()?;

                conn.execute("CREATE TABLE IF NOT EXISTS test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", ()).await?;
                conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
                    .await?;
            }
            // Reader: open again and check the running row count.
            {
                let db = Builder::new_local(db_path).build().await?;
                let conn = db.connect()?;

                let mut counts = conn
                    .query("SELECT count(*) FROM test_persistence;", ())
                    .await?;
                let count_row = counts.next().await?.unwrap();
                assert_eq!(count_row.get_value(0)?, Value::Integer(expected_count));
                assert!(counts.next().await?.is_none());
            }
        }

        Ok(())
    }
}