1pub mod params;
36mod rows;
37pub mod transaction;
38pub mod value;
39
40use transaction::TransactionBehavior;
41#[cfg(feature = "conn_raw_api")]
42use turso_core::types::WalFrameInfo;
43pub use value::Value;
44
45pub use params::params_from_iter;
46pub use params::IntoParams;
47
48use std::fmt::Debug;
49use std::num::NonZero;
50use std::sync::{Arc, Mutex};
51
52pub use crate::rows::{Row, Rows};
54
55#[derive(Debug, thiserror::Error)]
56pub enum Error {
57 #[error("SQL conversion failure: `{0}`")]
58 ToSqlConversionFailure(BoxError),
59 #[error("Mutex lock error: {0}")]
60 MutexError(String),
61 #[error("SQL execution failure: `{0}`")]
62 SqlExecutionFailure(String),
63 #[error("WAL operation error: `{0}`")]
64 WalOperationError(String),
65 #[error("Query returned no rows")]
66 QueryReturnedNoRows,
67 #[error("Conversion failure: `{0}`")]
68 ConversionFailure(String),
69}
70
71impl From<turso_core::LimboError> for Error {
72 fn from(err: turso_core::LimboError) -> Self {
73 Error::SqlExecutionFailure(err.to_string())
74 }
75}
76
77pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
78
79pub type Result<T> = std::result::Result<T, Error>;
80
81pub struct Builder {
83 path: String,
84 enable_mvcc: bool,
85 vfs: Option<String>,
86}
87
88impl Builder {
89 pub fn new_local(path: &str) -> Self {
91 Self {
92 path: path.to_string(),
93 enable_mvcc: false,
94 vfs: None,
95 }
96 }
97
98 pub fn with_mvcc(mut self, mvcc_enabled: bool) -> Self {
99 self.enable_mvcc = mvcc_enabled;
100 self
101 }
102
103 pub fn with_io(mut self, vfs: String) -> Self {
104 self.vfs = Some(vfs);
105 self
106 }
107
108 #[allow(unused_variables, clippy::arc_with_non_send_sync)]
110 pub async fn build(self) -> Result<Database> {
111 let io = self.get_io()?;
112 let db = turso_core::Database::open_file(io, self.path.as_str(), self.enable_mvcc, true)?;
113 Ok(Database { inner: db })
114 }
115
116 fn get_io(&self) -> Result<Arc<dyn turso_core::IO>> {
117 let vfs_choice = self.vfs.as_deref().unwrap_or("");
118
119 if self.path == ":memory:" && vfs_choice.is_empty() {
120 return Ok(Arc::new(turso_core::MemoryIO::new()));
121 }
122
123 match vfs_choice {
124 "memory" => Ok(Arc::new(turso_core::MemoryIO::new())),
125 "syscall" => {
126 #[cfg(target_family = "unix")]
127 {
128 Ok(Arc::new(
129 turso_core::UnixIO::new()
130 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
131 ))
132 }
133 #[cfg(not(target_family = "unix"))]
134 {
135 Ok(Arc::new(
136 turso_core::PlatformIO::new()
137 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
138 ))
139 }
140 }
141 #[cfg(target_os = "linux")]
142 "io_uring" => Ok(Arc::new(
143 turso_core::UringIO::new()
144 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
145 )),
146 #[cfg(not(target_os = "linux"))]
147 "io_uring" => Err(Error::SqlExecutionFailure(
148 "io_uring is only available on Linux targets".to_string(),
149 )),
150 "" => {
151 if self.path == ":memory:" {
153 Ok(Arc::new(turso_core::MemoryIO::new()))
154 } else {
155 Ok(Arc::new(
156 turso_core::PlatformIO::new()
157 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
158 ))
159 }
160 }
161 _ => Ok(Arc::new(
162 turso_core::PlatformIO::new()
163 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?,
164 )),
165 }
166 }
167}
168
169#[derive(Clone)]
173pub struct Database {
174 inner: Arc<turso_core::Database>,
175}
176
177unsafe impl Send for Database {}
178unsafe impl Sync for Database {}
179
180impl Debug for Database {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 f.debug_struct("Database").finish()
183 }
184}
185
186impl Database {
187 pub fn connect(&self) -> Result<Connection> {
189 let conn = self.inner.connect()?;
190 Ok(Connection::create(conn))
191 }
192}
193
194pub struct Connection {
196 inner: Arc<Mutex<Arc<turso_core::Connection>>>,
197 transaction_behavior: TransactionBehavior,
198}
199
200impl Clone for Connection {
201 fn clone(&self) -> Self {
202 Self {
203 inner: Arc::clone(&self.inner),
204 transaction_behavior: self.transaction_behavior,
205 }
206 }
207}
208
209unsafe impl Send for Connection {}
210unsafe impl Sync for Connection {}
211
212impl Connection {
213 pub fn create(conn: Arc<turso_core::Connection>) -> Self {
214 #[allow(clippy::arc_with_non_send_sync)]
215 let connection = Connection {
216 inner: Arc::new(Mutex::new(conn)),
217 transaction_behavior: TransactionBehavior::Deferred,
218 };
219 connection
220 }
221 pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
223 let mut stmt = self.prepare(sql).await?;
224 stmt.query(params).await
225 }
226
227 pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
229 let mut stmt = self.prepare(sql).await?;
230 stmt.execute(params).await
231 }
232
233 #[cfg(feature = "conn_raw_api")]
234 pub fn wal_frame_count(&self) -> Result<u64> {
235 let conn = self
236 .inner
237 .lock()
238 .map_err(|e| Error::MutexError(e.to_string()))?;
239 conn.wal_state()
240 .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}")))
241 .map(|state| state.max_frame)
242 }
243
244 #[cfg(feature = "conn_raw_api")]
245 pub fn try_wal_watermark_read_page(
246 &self,
247 page_idx: u32,
248 page: &mut [u8],
249 frame_watermark: Option<u64>,
250 ) -> Result<bool> {
251 let conn = self
252 .inner
253 .lock()
254 .map_err(|e| Error::MutexError(e.to_string()))?;
255 conn.try_wal_watermark_read_page(page_idx, page, frame_watermark)
256 .map_err(|e| {
257 Error::WalOperationError(format!("try_wal_watermark_read_page failed: {e}"))
258 })
259 }
260
261 #[cfg(feature = "conn_raw_api")]
262 pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
263 let conn = self
264 .inner
265 .lock()
266 .map_err(|e| Error::MutexError(e.to_string()))?;
267 conn.wal_changed_pages_after(frame_watermark)
268 .map_err(|e| Error::WalOperationError(format!("wal_changed_pages_after failed: {e}")))
269 }
270
271 #[cfg(feature = "conn_raw_api")]
272 pub fn wal_insert_begin(&self) -> Result<()> {
273 let conn = self
274 .inner
275 .lock()
276 .map_err(|e| Error::MutexError(e.to_string()))?;
277 conn.wal_insert_begin()
278 .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}")))
279 }
280
281 #[cfg(feature = "conn_raw_api")]
282 pub fn wal_insert_end(&self, force_commit: bool) -> Result<()> {
283 let conn = self
284 .inner
285 .lock()
286 .map_err(|e| Error::MutexError(e.to_string()))?;
287 conn.wal_insert_end(force_commit)
288 .map_err(|e| Error::WalOperationError(format!("wal_insert_end failed: {e}")))
289 }
290
291 #[cfg(feature = "conn_raw_api")]
292 pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result<WalFrameInfo> {
293 let conn = self
294 .inner
295 .lock()
296 .map_err(|e| Error::MutexError(e.to_string()))?;
297 conn.wal_insert_frame(frame_no, frame)
298 .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}")))
299 }
300
301 #[cfg(feature = "conn_raw_api")]
302 pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result<WalFrameInfo> {
303 let conn = self
304 .inner
305 .lock()
306 .map_err(|e| Error::MutexError(e.to_string()))?;
307 conn.wal_get_frame(frame_no, frame)
308 .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}")))
309 }
310
311 pub async fn execute_batch(&self, sql: &str) -> Result<()> {
313 self.prepare_execute_batch(sql).await?;
314 Ok(())
315 }
316
317 pub async fn prepare(&self, sql: &str) -> Result<Statement> {
319 let conn = self
320 .inner
321 .lock()
322 .map_err(|e| Error::MutexError(e.to_string()))?;
323
324 let stmt = conn.prepare(sql)?;
325
326 #[allow(clippy::arc_with_non_send_sync)]
327 let statement = Statement {
328 inner: Arc::new(Mutex::new(stmt)),
329 };
330 Ok(statement)
331 }
332
333 async fn prepare_execute_batch(&self, sql: impl AsRef<str>) -> Result<()> {
334 let conn = self
335 .inner
336 .lock()
337 .map_err(|e| Error::MutexError(e.to_string()))?;
338 conn.prepare_execute_batch(sql)?;
339 Ok(())
340 }
341
342 pub fn pragma_query<F>(&self, pragma_name: &str, mut f: F) -> Result<()>
344 where
345 F: FnMut(&Row) -> turso_core::Result<()>,
346 {
347 let conn = self
348 .inner
349 .lock()
350 .map_err(|e| Error::MutexError(e.to_string()))?;
351
352 let rows: Vec<Row> = conn
353 .pragma_query(pragma_name)
354 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?
355 .iter()
356 .map(|row| row.iter().collect::<Row>())
357 .collect();
358
359 rows.iter().try_for_each(|row| {
360 f(row).map_err(|e| {
361 Error::SqlExecutionFailure(format!("Error executing user defined function: {e}"))
362 })
363 })?;
364 Ok(())
365 }
366
367 pub fn last_insert_rowid(&self) -> i64 {
369 let conn = self.inner.lock().unwrap();
370 conn.last_insert_rowid()
371 }
372
373 pub fn cacheflush(&self) -> Result<()> {
376 let conn = self
377 .inner
378 .lock()
379 .map_err(|e| Error::MutexError(e.to_string()))?;
380 let completions = conn.cacheflush()?;
381 let pager = conn.get_pager();
382 for c in completions {
383 pager.io.wait_for_completion(c)?;
384 }
385 Ok(())
386 }
387
388 pub fn is_autocommit(&self) -> Result<bool> {
389 let conn = self
390 .inner
391 .lock()
392 .map_err(|e| Error::MutexError(e.to_string()))?;
393
394 Ok(conn.get_auto_commit())
395 }
396}
397
398impl Debug for Connection {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 f.debug_struct("Connection").finish()
401 }
402}
403
404pub struct Statement {
406 inner: Arc<Mutex<turso_core::Statement>>,
407}
408
409impl Clone for Statement {
410 fn clone(&self) -> Self {
411 Self {
412 inner: Arc::clone(&self.inner),
413 }
414 }
415}
416
417unsafe impl Send for Statement {}
418unsafe impl Sync for Statement {}
419
420impl Statement {
421 pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
423 let params = params.into_params()?;
424 match params {
425 params::Params::None => (),
426 params::Params::Positional(values) => {
427 for (i, value) in values.into_iter().enumerate() {
428 let mut stmt = self.inner.lock().unwrap();
429 stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
430 }
431 }
432 params::Params::Named(values) => {
433 for (name, value) in values.into_iter() {
434 let mut stmt = self.inner.lock().unwrap();
435 let i = stmt.parameters().index(name).unwrap();
436 stmt.bind_at(i, value.into());
437 }
438 }
439 }
440 let rows = Rows::new(&self.inner);
441 Ok(rows)
442 }
443
444 pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
446 {
447 self.inner.lock().unwrap().reset();
449 }
450 let params = params.into_params()?;
451 match params {
452 params::Params::None => (),
453 params::Params::Positional(values) => {
454 for (i, value) in values.into_iter().enumerate() {
455 let mut stmt = self.inner.lock().unwrap();
456 stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
457 }
458 }
459 params::Params::Named(values) => {
460 for (name, value) in values.into_iter() {
461 let mut stmt = self.inner.lock().unwrap();
462 let i = stmt.parameters().index(name).unwrap();
463 stmt.bind_at(i, value.into());
464 }
465 }
466 }
467 loop {
468 let mut stmt = self.inner.lock().unwrap();
469 match stmt.step() {
470 Ok(turso_core::StepResult::Row) => {
471 return Err(Error::SqlExecutionFailure(
472 "unexpected row during execution".to_string(),
473 ));
474 }
475 Ok(turso_core::StepResult::Done) => {
476 let changes = stmt.n_change();
477 assert!(changes >= 0);
478 return Ok(changes as u64);
479 }
480 Ok(turso_core::StepResult::IO) => {
481 stmt.run_once()?;
482 }
483 Ok(turso_core::StepResult::Busy) => {
484 return Err(Error::SqlExecutionFailure("database is locked".to_string()));
485 }
486 Ok(turso_core::StepResult::Interrupt) => {
487 return Err(Error::SqlExecutionFailure("interrupted".to_string()));
488 }
489 Err(err) => {
490 return Err(err.into());
491 }
492 }
493 }
494 }
495
496 pub fn columns(&self) -> Vec<Column> {
498 let stmt = self.inner.lock().unwrap();
499
500 let n = stmt.num_columns();
501
502 let mut cols = Vec::with_capacity(n);
503
504 for i in 0..n {
505 let name = stmt.get_column_name(i).into_owned();
506 cols.push(Column {
507 name,
508 decl_type: None, });
510 }
511
512 cols
513 }
514
515 pub fn reset(&self) {
517 let mut stmt = self.inner.lock().unwrap();
518 stmt.reset();
519 }
520
521 pub async fn query_row(&mut self, params: impl IntoParams) -> Result<Row> {
527 let mut rows = self.query(params).await?;
528
529 rows.next().await?.ok_or(Error::QueryReturnedNoRows)
530 }
531}
532
533pub struct Column {
535 name: String,
536 decl_type: Option<String>,
537}
538
539impl Column {
540 pub fn name(&self) -> &str {
542 &self.name
543 }
544
545 pub fn decl_type(&self) -> Option<&str> {
547 self.decl_type.as_deref()
548 }
549}
550
551pub trait IntoValue {
552 fn into_value(self) -> Result<Value>;
553}
554
555#[derive(Debug, Clone)]
556pub enum Params {
557 None,
558 Positional(Vec<Value>),
559 Named(Vec<(String, Value)>),
560}
561
562pub struct Transaction {}
563
564#[cfg(test)]
565mod tests {
566 use super::*;
567 use tempfile::NamedTempFile;
568
569 #[tokio::test]
570 async fn test_database_persistence() -> Result<()> {
571 let temp_file = NamedTempFile::new().unwrap();
572 let db_path = temp_file.path().to_str().unwrap();
573
574 {
576 let db = Builder::new_local(db_path).build().await?;
577 let conn = db.connect()?;
578 conn.execute(
579 "CREATE TABLE test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
580 (),
581 )
582 .await?;
583 conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
584 .await?;
585 conn.execute("INSERT INTO test_persistence (name) VALUES ('Bob');", ())
586 .await?;
587 } let db = Builder::new_local(db_path).build().await?;
591 let conn = db.connect()?;
592
593 let mut rows = conn
594 .query("SELECT name FROM test_persistence ORDER BY id;", ())
595 .await?;
596
597 let row1 = rows.next().await?.expect("Expected first row");
598 assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
599
600 let row2 = rows.next().await?.expect("Expected second row");
601 assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
602
603 assert!(rows.next().await?.is_none(), "Expected no more rows");
604
605 Ok(())
606 }
607
608 #[tokio::test]
609 async fn test_database_persistence_many_frames() -> Result<()> {
610 let temp_file = NamedTempFile::new().unwrap();
611 let db_path = temp_file.path().to_str().unwrap();
612
613 const NUM_INSERTS: usize = 100;
614 const TARGET_STRING_LEN: usize = 1024; let mut original_data = Vec::with_capacity(NUM_INSERTS);
617 for i in 0..NUM_INSERTS {
618 let prefix = format!("test_string_{i:04}_");
619 let padding_len = TARGET_STRING_LEN.saturating_sub(prefix.len());
620 let padding: String = "A".repeat(padding_len);
621 original_data.push(format!("{prefix}{padding}"));
622 }
623
624 {
626 let db = Builder::new_local(db_path).build().await?;
627 let conn = db.connect()?;
628 conn.execute(
629 "CREATE TABLE test_large_persistence (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL);",
630 (),
631 )
632 .await?;
633
634 for data_val in &original_data {
635 conn.execute(
636 "INSERT INTO test_large_persistence (data) VALUES (?);",
637 params::Params::Positional(vec![Value::Text(data_val.clone())]),
638 )
639 .await?;
640 }
641 } {
644 let db = Builder::new_local(db_path).build().await?;
646 let conn = db.connect()?;
647
648 let mut rows = conn
649 .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
650 .await?;
651
652 for (i, value) in original_data.iter().enumerate().take(NUM_INSERTS) {
653 let row = rows
654 .next()
655 .await?
656 .unwrap_or_else(|| panic!("Expected row {i} but found None"));
657 assert_eq!(
658 row.get_value(0)?,
659 Value::Text(value.clone()),
660 "Mismatch in retrieved data for row {i}"
661 );
662 }
663
664 assert!(
665 rows.next().await?.is_none(),
666 "Expected no more rows after retrieving all inserted data"
667 );
668
669 let wal_path = format!("{db_path}-wal");
671 std::fs::remove_file(&wal_path)
672 .map_err(|e| eprintln!("Warning: Failed to delete WAL file for test: {e}"))
673 .unwrap();
674 }
675
676 let db_after_wal_delete = Builder::new_local(db_path).build().await?;
678 let conn_after_wal_delete = db_after_wal_delete.connect()?;
679
680 let query_result_after_wal_delete = conn_after_wal_delete
681 .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
682 .await;
683
684 match query_result_after_wal_delete {
685 Ok(_) => panic!("Query succeeded after WAL deletion and DB reopen, but was expected to fail because the table definition should have been in the WAL."),
686 Err(Error::SqlExecutionFailure(msg)) => {
687 assert!(
688 msg.contains("no such table: test_large_persistence"),
689 "Expected 'test_large_persistence not found' error, but got: {msg}"
690 );
691 }
692 Err(e) => panic!(
693 "Expected SqlExecutionFailure for 'no such table', but got a different error: {e:?}"
694 ),
695 }
696
697 Ok(())
698 }
699
700 #[tokio::test]
701 async fn test_database_persistence_write_one_frame_many_times() -> Result<()> {
702 let temp_file = NamedTempFile::new().unwrap();
703 let db_path = temp_file.path().to_str().unwrap();
704
705 for i in 0..100 {
706 {
707 let db = Builder::new_local(db_path).build().await?;
708 let conn = db.connect()?;
709
710 conn.execute("CREATE TABLE IF NOT EXISTS test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", ()).await?;
711 conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
712 .await?;
713 }
714 {
715 let db = Builder::new_local(db_path).build().await?;
716 let conn = db.connect()?;
717
718 let mut rows_iter = conn
719 .query("SELECT count(*) FROM test_persistence;", ())
720 .await?;
721 let rows = rows_iter.next().await?.unwrap();
722 assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
723 assert!(rows_iter.next().await?.is_none());
724 }
725 }
726
727 Ok(())
728 }
729}