1#![allow(
3 rustdoc::bare_urls,
4 rustdoc::invalid_html_tags,
5 rustdoc::broken_intra_doc_links
6)]
7#![allow(
8 clippy::collapsible_match,
9 clippy::doc_overindented_list_items,
10 clippy::from_over_into
11)]
12
13pub mod params;
14pub mod value;
15
16pub use value::Value;
17
18pub use params::params_from_iter;
19
20use crate::params::*;
21use std::fmt::Debug;
22use std::num::NonZero;
23use std::sync::{Arc, Mutex};
24
25#[derive(Debug, thiserror::Error)]
26pub enum Error {
27 #[error("SQL conversion failure: `{0}`")]
28 ToSqlConversionFailure(BoxError),
29 #[error("Mutex lock error: {0}")]
30 MutexError(String),
31 #[error("SQL execution failure: `{0}`")]
32 SqlExecutionFailure(String),
33}
34
35impl From<limbo_core::LimboError> for Error {
36 fn from(err: limbo_core::LimboError) -> Self {
37 Error::SqlExecutionFailure(err.to_string())
38 }
39}
40
41pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
42
43pub type Result<T> = std::result::Result<T, Error>;
44pub struct Builder {
45 path: String,
46}
47
48impl Builder {
49 pub fn new_local(path: &str) -> Self {
50 Self {
51 path: path.to_string(),
52 }
53 }
54
55 #[allow(unused_variables, clippy::arc_with_non_send_sync)]
56 pub async fn build(self) -> Result<Database> {
57 match self.path.as_str() {
58 ":memory:" => {
59 let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::MemoryIO::new());
60 let db = limbo_core::Database::open_file(io, self.path.as_str(), false)?;
61 Ok(Database { inner: db })
62 }
63 path => {
64 let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
65 let db = limbo_core::Database::open_file(io, path, false)?;
66 Ok(Database { inner: db })
67 }
68 }
69 }
70}
71
72#[derive(Clone)]
73pub struct Database {
74 inner: Arc<limbo_core::Database>,
75}
76
77unsafe impl Send for Database {}
78unsafe impl Sync for Database {}
79
80impl Debug for Database {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("Database").finish()
83 }
84}
85
86impl Database {
87 pub fn connect(&self) -> Result<Connection> {
88 let conn = self.inner.connect()?;
89 #[allow(clippy::arc_with_non_send_sync)]
90 let connection = Connection {
91 inner: Arc::new(Mutex::new(conn)),
92 };
93 Ok(connection)
94 }
95}
96
97pub struct Connection {
98 inner: Arc<Mutex<Arc<limbo_core::Connection>>>,
99}
100
101impl Clone for Connection {
102 fn clone(&self) -> Self {
103 Self {
104 inner: Arc::clone(&self.inner),
105 }
106 }
107}
108
109unsafe impl Send for Connection {}
110unsafe impl Sync for Connection {}
111
112impl Connection {
113 pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
114 let mut stmt = self.prepare(sql).await?;
115 stmt.query(params).await
116 }
117
118 pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
119 let mut stmt = self.prepare(sql).await?;
120 stmt.execute(params).await
121 }
122
123 pub async fn prepare(&self, sql: &str) -> Result<Statement> {
124 let conn = self
125 .inner
126 .lock()
127 .map_err(|e| Error::MutexError(e.to_string()))?;
128
129 let stmt = conn.prepare(sql)?;
130
131 #[allow(clippy::arc_with_non_send_sync)]
132 let statement = Statement {
133 inner: Arc::new(Mutex::new(stmt)),
134 };
135 Ok(statement)
136 }
137
138 pub fn changes(&self) -> Result<i64> {
142 let conn = self
143 .inner
144 .lock()
145 .map_err(|e| Error::MutexError(e.to_string()))?;
146 Ok(conn.changes())
147 }
148
149 pub fn pragma_query<F>(&self, pragma_name: &str, mut f: F) -> Result<()>
150 where
151 F: FnMut(&Row) -> limbo_core::Result<()>,
152 {
153 let conn = self
154 .inner
155 .lock()
156 .map_err(|e| Error::MutexError(e.to_string()))?;
157
158 let rows: Vec<Row> = conn
159 .pragma_query(pragma_name)
160 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?
161 .iter()
162 .map(|row| row.iter().collect::<Row>())
163 .collect();
164
165 rows.iter().try_for_each(|row| {
166 f(row).map_err(|e| {
167 Error::SqlExecutionFailure(format!("Error executing user defined function: {}", e))
168 })
169 })?;
170 Ok(())
171 }
172}
173
174pub struct Statement {
175 inner: Arc<Mutex<limbo_core::Statement>>,
176}
177
178impl Clone for Statement {
179 fn clone(&self) -> Self {
180 Self {
181 inner: Arc::clone(&self.inner),
182 }
183 }
184}
185
186unsafe impl Send for Statement {}
187unsafe impl Sync for Statement {}
188
189impl Statement {
190 pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
191 let params = params.into_params()?;
192 match params {
193 params::Params::None => (),
194 params::Params::Positional(values) => {
195 for (i, value) in values.into_iter().enumerate() {
196 let mut stmt = self.inner.lock().unwrap();
197 stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
198 }
199 }
200 params::Params::Named(_items) => todo!(),
201 }
202 #[allow(clippy::arc_with_non_send_sync)]
203 let rows = Rows {
204 inner: Arc::clone(&self.inner),
205 };
206 Ok(rows)
207 }
208
209 pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
210 {
211 self.inner.lock().unwrap().reset();
213 }
214 let params = params.into_params()?;
215 match params {
216 params::Params::None => (),
217 params::Params::Positional(values) => {
218 for (i, value) in values.into_iter().enumerate() {
219 let mut stmt = self.inner.lock().unwrap();
220 stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
221 }
222 }
223 params::Params::Named(_items) => todo!(),
224 }
225 loop {
226 let mut stmt = self.inner.lock().unwrap();
227 match stmt.step() {
228 Ok(limbo_core::StepResult::Row) => {
229 return Ok(2);
231 }
232 Ok(limbo_core::StepResult::Done) => {
233 return Ok(0);
234 }
235 Ok(limbo_core::StepResult::IO) => {
236 let _ = stmt.run_once();
237 }
239 Ok(limbo_core::StepResult::Busy) => {
240 return Ok(4);
241 }
242 Ok(limbo_core::StepResult::Interrupt) => {
243 return Ok(3);
244 }
245 Err(err) => {
246 return Err(err.into());
247 }
248 }
249 }
250 }
251
252 pub fn columns(&self) -> Vec<Column> {
253 let stmt = self.inner.lock().unwrap();
254
255 let n = stmt.num_columns();
256
257 let mut cols = Vec::with_capacity(n);
258
259 for i in 0..n {
260 let name = stmt.get_column_name(i).into_owned();
261 let decl_type = stmt.get_column_decl_type(i).map(|s| s.into_owned());
262 cols.push(Column { name, decl_type });
263 }
264
265 cols
266 }
267}
268
269pub struct Column {
270 name: String,
271 decl_type: Option<String>,
272}
273
274impl Column {
275 pub fn name(&self) -> &str {
276 &self.name
277 }
278
279 pub fn decl_type(&self) -> Option<&str> {
280 self.decl_type.as_deref()
281 }
282}
283
284pub trait IntoValue {
285 fn into_value(self) -> Result<Value>;
286}
287
288#[derive(Debug, Clone)]
289pub enum Params {
290 None,
291 Positional(Vec<Value>),
292 Named(Vec<(String, Value)>),
293}
294pub struct Transaction {}
295
296pub struct Rows {
297 inner: Arc<Mutex<limbo_core::Statement>>,
298}
299
300impl Clone for Rows {
301 fn clone(&self) -> Self {
302 Self {
303 inner: Arc::clone(&self.inner),
304 }
305 }
306}
307
308unsafe impl Send for Rows {}
309unsafe impl Sync for Rows {}
310
311impl Rows {
312 pub async fn next(&mut self) -> Result<Option<Row>> {
313 loop {
314 let mut stmt = self
315 .inner
316 .lock()
317 .map_err(|e| Error::MutexError(e.to_string()))?;
318 match stmt.step() {
319 Ok(limbo_core::StepResult::Row) => {
320 let row = stmt.row().unwrap();
321 return Ok(Some(Row {
322 values: row.get_values().map(|v| v.to_owned()).collect(),
323 }));
324 }
325 Ok(limbo_core::StepResult::Done) => return Ok(None),
326 Ok(limbo_core::StepResult::IO) => {
327 if let Err(e) = stmt.run_once() {
328 return Err(e.into());
329 }
330 continue;
331 }
332 Ok(limbo_core::StepResult::Busy) => return Ok(None),
333 Ok(limbo_core::StepResult::Interrupt) => return Ok(None),
334 _ => return Ok(None),
335 }
336 }
337 }
338}
339
340#[derive(Debug)]
341pub struct Row {
342 values: Vec<limbo_core::Value>,
343}
344
345unsafe impl Send for Row {}
346unsafe impl Sync for Row {}
347
348impl Row {
349 pub fn get_value(&self, index: usize) -> Result<Value> {
350 let value = &self.values[index];
351 match value {
352 limbo_core::Value::Integer(i) => Ok(Value::Integer(*i)),
353 limbo_core::Value::Null => Ok(Value::Null),
354 limbo_core::Value::Float(f) => Ok(Value::Real(*f)),
355 limbo_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
356 limbo_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
357 }
358 }
359
360 pub fn column_count(&self) -> usize {
361 self.values.len()
362 }
363}
364
365impl<'a> FromIterator<&'a limbo_core::Value> for Row {
366 fn from_iter<T: IntoIterator<Item = &'a limbo_core::Value>>(iter: T) -> Self {
367 let values = iter
368 .into_iter()
369 .map(|v| match v {
370 limbo_core::Value::Integer(i) => limbo_core::Value::Integer(*i),
371 limbo_core::Value::Null => limbo_core::Value::Null,
372 limbo_core::Value::Float(f) => limbo_core::Value::Float(*f),
373 limbo_core::Value::Text(s) => limbo_core::Value::Text(s.clone()),
374 limbo_core::Value::Blob(b) => limbo_core::Value::Blob(b.clone()),
375 })
376 .collect();
377
378 Row { values }
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385 use tempfile::NamedTempFile;
386
387 #[tokio::test]
388 async fn test_database_persistence() -> Result<()> {
389 let temp_file = NamedTempFile::new().unwrap();
390 let db_path = temp_file.path().to_str().unwrap();
391
392 {
394 let db = Builder::new_local(db_path).build().await?;
395 let conn = db.connect()?;
396 conn.execute(
397 "CREATE TABLE test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
398 (),
399 )
400 .await?;
401 conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
402 .await?;
403 conn.execute("INSERT INTO test_persistence (name) VALUES ('Bob');", ())
404 .await?;
405 } let db = Builder::new_local(db_path).build().await?;
409 let conn = db.connect()?;
410
411 let mut rows = conn
412 .query("SELECT name FROM test_persistence ORDER BY id;", ())
413 .await?;
414
415 let row1 = rows.next().await?.expect("Expected first row");
416 assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
417
418 let row2 = rows.next().await?.expect("Expected second row");
419 assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
420
421 assert!(rows.next().await?.is_none(), "Expected no more rows");
422
423 Ok(())
424 }
425
426 #[tokio::test]
427 async fn test_database_persistence_many_frames() -> Result<()> {
428 let temp_file = NamedTempFile::new().unwrap();
429 let db_path = temp_file.path().to_str().unwrap();
430
431 const NUM_INSERTS: usize = 100;
432 const TARGET_STRING_LEN: usize = 1024; let mut original_data = Vec::with_capacity(NUM_INSERTS);
435 for i in 0..NUM_INSERTS {
436 let prefix = format!("test_string_{:04}_", i);
437 let padding_len = TARGET_STRING_LEN.saturating_sub(prefix.len());
438 let padding: String = "A".repeat(padding_len);
439 original_data.push(format!("{}{}", prefix, padding));
440 }
441
442 {
444 let db = Builder::new_local(db_path).build().await?;
445 let conn = db.connect()?;
446 conn.execute(
447 "CREATE TABLE test_large_persistence (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL);",
448 (),
449 )
450 .await?;
451
452 for data_val in &original_data {
453 conn.execute(
454 "INSERT INTO test_large_persistence (data) VALUES (?);",
455 params::Params::Positional(vec![Value::Text(data_val.clone())]),
456 )
457 .await?;
458 }
459 } let db = Builder::new_local(db_path).build().await?;
463 let conn = db.connect()?;
464
465 let mut rows = conn
466 .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
467 .await?;
468
469 for (i, expected) in original_data.iter().enumerate().take(NUM_INSERTS) {
470 let row = rows
471 .next()
472 .await?
473 .unwrap_or_else(|| panic!("Expected row {} but found None", i));
474 assert_eq!(
475 row.get_value(0)?,
476 Value::Text(expected.clone()),
477 "Mismatch in retrieved data for row {}",
478 i
479 );
480 }
481
482 assert!(
483 rows.next().await?.is_none(),
484 "Expected no more rows after retrieving all inserted data"
485 );
486
487 let wal_path = format!("{}-wal", db_path);
489 std::fs::remove_file(&wal_path)
490 .map_err(|e| eprintln!("Warning: Failed to delete WAL file for test: {}", e))
491 .unwrap();
492
493 let db_after_wal_delete = Builder::new_local(db_path).build().await?;
495 let conn_after_wal_delete = db_after_wal_delete.connect()?;
496
497 let query_result_after_wal_delete = conn_after_wal_delete
498 .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
499 .await;
500
501 match query_result_after_wal_delete {
502 Ok(_) => panic!("Query succeeded after WAL deletion and DB reopen, but was expected to fail because the table definition should have been in the WAL."),
503 Err(Error::SqlExecutionFailure(msg)) => {
504 assert!(
505 msg.contains("test_large_persistence not found"),
506 "Expected 'test_large_persistence not found' error, but got: {}",
507 msg
508 );
509 }
510 Err(e) => panic!(
511 "Expected SqlExecutionFailure for 'no such table', but got a different error: {:?}",
512 e
513 ),
514 }
515
516 Ok(())
517 }
518
519 #[tokio::test]
520 async fn test_database_persistence_write_one_frame_many_times() -> Result<()> {
521 let temp_file = NamedTempFile::new().unwrap();
522 let db_path = temp_file.path().to_str().unwrap();
523
524 for i in 0..100 {
525 {
526 let db = Builder::new_local(db_path).build().await?;
527 let conn = db.connect()?;
528
529 conn.execute("CREATE TABLE IF NOT EXISTS test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", ()).await?;
530 conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
531 .await?;
532 }
533 {
534 let db = Builder::new_local(db_path).build().await?;
535 let conn = db.connect()?;
536
537 let mut rows_iter = conn
538 .query("SELECT count(*) FROM test_persistence;", ())
539 .await?;
540 let rows = rows_iter.next().await?.unwrap();
541 assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
542 assert!(rows_iter.next().await?.is_none());
543 }
544 }
545
546 Ok(())
547 }
548}