1#![allow(
3 rustdoc::bare_urls,
4 rustdoc::invalid_html_tags,
5 rustdoc::broken_intra_doc_links
6)]
7#![allow(clippy::arc_with_non_send_sync)]
8#![allow(unused_assignments)]
10#![allow(mismatched_lifetime_syntaxes)]
12#![allow(unpredictable_function_pointer_comparisons)]
14#![allow(
16 clippy::bool_assert_comparison,
17 clippy::collapsible_match,
18 clippy::clone_on_copy,
19 clippy::comparison_to_empty,
20 clippy::derivable_impls,
21 clippy::doc_lazy_continuation,
22 clippy::doc_overindented_list_items,
23 clippy::duplicated_attributes,
24 clippy::enum_variant_names,
25 clippy::excessive_precision,
26 clippy::explicit_auto_deref,
27 clippy::explicit_counter_loop,
28 clippy::extra_unused_lifetimes,
29 clippy::filter_next,
30 clippy::from_over_into,
31 clippy::get_first,
32 clippy::identity_op,
33 clippy::inherent_to_string,
34 clippy::iter_cloned_collect,
35 clippy::large_enum_variant,
36 clippy::len_without_is_empty,
37 clippy::len_zero,
38 clippy::let_and_return,
39 clippy::manual_ignore_case_cmp,
40 clippy::manual_inspect,
41 clippy::manual_is_multiple_of,
42 clippy::manual_map,
43 clippy::manual_ok_err,
44 clippy::manual_range_contains,
45 clippy::manual_repeat_n,
46 clippy::manual_saturating_arithmetic,
47 clippy::manual_strip,
48 clippy::map_clone,
49 clippy::match_like_matches_macro,
50 clippy::needless_borrow,
51 clippy::needless_borrows_for_generic_args,
52 clippy::needless_lifetimes,
53 clippy::needless_range_loop,
54 clippy::needless_return,
55 clippy::new_without_default,
56 clippy::nonminimal_bool,
57 clippy::obfuscated_if_else,
58 clippy::option_as_ref_deref,
59 clippy::option_map_or_none,
60 clippy::partialeq_ne_impl,
61 clippy::partialeq_to_none,
62 clippy::ptr_arg,
63 clippy::question_mark,
64 clippy::redundant_closure,
65 clippy::redundant_field_names,
66 clippy::redundant_pattern_matching,
67 clippy::should_implement_trait,
68 clippy::single_match,
69 clippy::too_many_arguments,
70 clippy::unnecessary_cast,
71 clippy::unnecessary_map_or,
72 clippy::unnecessary_mut_passed,
73 clippy::unnecessary_unwrap,
74 clippy::unneeded_struct_pattern,
75 clippy::unused_unit,
76 clippy::upper_case_acronyms,
77 clippy::useless_conversion,
78 clippy::useless_format,
79 clippy::wrong_self_convention
80)]
81
82mod error;
83mod ext;
84mod fast_lock;
85mod function;
86mod functions;
87mod info;
88mod io;
89#[cfg(feature = "json")]
90mod json;
91pub mod mvcc;
92mod parameters;
93mod pragma;
94mod pseudo;
95pub mod result;
96mod schema;
97mod statistics;
98mod storage;
99mod translate;
100pub mod types;
101#[allow(dead_code)]
102mod util;
103mod vdbe;
104mod vector;
105mod vtab;
106
107#[cfg(feature = "fuzz")]
108pub mod numeric;
109
110#[cfg(not(feature = "fuzz"))]
111mod numeric;
112
113use crate::vtab::VirtualTable;
114use crate::{fast_lock::SpinLock, translate::optimizer::optimize_plan};
115use core::str;
116pub use error::LimboError;
117use fallible_iterator::FallibleIterator;
118pub use io::clock::{Clock, Instant};
119#[cfg(all(feature = "fs", target_family = "unix", feature = "native-io"))]
120pub use io::UnixIO;
121#[cfg(all(feature = "fs", target_os = "linux", feature = "io_uring"))]
122pub use io::UringIO;
123pub use io::{
124 Buffer, Completion, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO,
125};
126use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
127use parking_lot::RwLock;
128use schema::Schema;
129use std::{
130 borrow::Cow,
131 cell::{Cell, RefCell, UnsafeCell},
132 collections::HashMap,
133 fmt::Display,
134 io::Write,
135 num::NonZero,
136 ops::Deref,
137 rc::Rc,
138 sync::{Arc, OnceLock},
139};
140use storage::btree::{btree_init_page, BTreePageInner};
141#[cfg(feature = "fs")]
142use storage::database::DatabaseFile;
143pub use storage::pager::PagerCacheflushStatus;
144pub use storage::{
145 buffer_pool::BufferPool,
146 database::DatabaseStorage,
147 pager::PageRef,
148 pager::{Page, Pager},
149 wal::{CheckpointMode, CheckpointResult, CheckpointStatus, Wal, WalFile, WalFileShared},
150};
151use storage::{
152 page_cache::DumbLruPageCache,
153 pager::allocate_page,
154 sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE},
155};
156use tracing::{instrument, Level};
157use translate::select::prepare_select_plan;
158pub use types::RefValue;
159pub use types::Value;
160use util::parse_schema_rows;
161use vdbe::builder::QueryMode;
162use vdbe::builder::TableRefIdCounter;
163
/// Crate-wide result alias; the error type defaults to [`LimboError`].
pub type Result<T, E = LimboError> = std::result::Result<T, E>;
/// Process-wide, write-once slot holding a database version string.
///
/// `Database::open_with_flags_inner` fills it from the header's
/// `version_number` the first time a database is opened; later opens leave the
/// stored value untouched.
pub static DATABASE_VERSION: OnceLock<String> = OnceLock::new();
166
/// Transaction state that a [`Connection`] keeps in its `transaction_state` cell.
#[derive(Clone, Copy, PartialEq, Eq)]
enum TransactionState {
    /// NOTE(review): presumably a write transaction is open — confirm against the VDBE.
    Write,
    /// NOTE(review): presumably a read transaction is open — confirm against the VDBE.
    Read,
    /// Initial state assigned by `Database::connect`.
    None,
}
173
/// MVCC store specialised to [`mvcc::LocalClock`].
pub(crate) type MvStore = mvcc::MvStore<mvcc::LocalClock>;

/// MVCC scan cursor specialised to [`mvcc::LocalClock`].
pub(crate) type MvCursor = mvcc::cursor::ScanCursor<mvcc::LocalClock>;
177
/// An opened database: the state shared by every [`Connection`] created from it.
pub struct Database {
    /// MVCC store; `Some` only when the database was opened with `enable_mvcc`.
    mv_store: Option<Rc<MvStore>>,
    /// Schema shared with every connection handed out by `connect`.
    schema: Arc<RwLock<Schema>>,
    /// Database header obtained from `Pager::begin_open`, shared with connections.
    header: Arc<SpinLock<DatabaseHeader>>,
    /// Storage backing the main database file.
    db_file: Arc<dyn DatabaseStorage>,
    /// IO backend used for the database and its WAL.
    io: Arc<dyn IO>,
    /// Page size read from the header at open time.
    page_size: u32,
    /// Page cache created at open time; it is not read anywhere in this file.
    _shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
    /// WAL state handed to the `WalFile` of every connection.
    shared_wal: Arc<UnsafeCell<WalFileShared>>,
    /// Flags the database was opened with.
    open_flags: OpenFlags,
}
192
// SAFETY: NOTE(review): `Database` stores an `Option<Rc<MvStore>>` and an
// `Arc<UnsafeCell<WalFileShared>>`, neither of which the compiler treats as
// thread-safe. These impls therefore assert an invariant that is not visible in
// this file — confirm how cross-thread access is actually serialised.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
195
196impl Database {
    /// Opens the database file at `path` with the default [`OpenFlags`].
    ///
    /// Shorthand for [`Database::open_file_with_flags`] with
    /// `OpenFlags::default()`.
    #[cfg(feature = "fs")]
    pub fn open_file(io: Arc<dyn IO>, path: &str, enable_mvcc: bool) -> Result<Arc<Database>> {
        Self::open_file_with_flags(io, path, OpenFlags::default(), enable_mvcc)
    }
201
202 #[cfg(feature = "fs")]
203 pub fn open_file_with_flags(
204 io: Arc<dyn IO>,
205 path: &str,
206 flags: OpenFlags,
207 enable_mvcc: bool,
208 ) -> Result<Arc<Database>> {
209 let file = io.open_file(path, flags, true)?;
210 let db_freshly_created = maybe_init_database_file(&file, &io)?;
216 let db_file = Arc::new(DatabaseFile::new(file));
217 Self::open_with_flags_inner(io, path, db_file, flags, enable_mvcc, db_freshly_created)
218 }
219
    /// Opens a database on caller-provided storage with the default [`OpenFlags`].
    ///
    /// `path` is still used to derive the WAL file name. Shorthand for
    /// [`Database::open_with_flags`] with `OpenFlags::default()`.
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn open(
        io: Arc<dyn IO>,
        path: &str,
        db_file: Arc<dyn DatabaseStorage>,
        enable_mvcc: bool,
    ) -> Result<Arc<Database>> {
        Self::open_with_flags(io, path, db_file, OpenFlags::default(), enable_mvcc)
    }
229
    /// Opens a database on caller-provided storage with explicit `flags`.
    ///
    /// The database is always treated as pre-existing here: the
    /// "freshly created" hint passed to the shared open routine is `false`.
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn open_with_flags(
        io: Arc<dyn IO>,
        path: &str,
        db_file: Arc<dyn DatabaseStorage>,
        flags: OpenFlags,
        enable_mvcc: bool,
    ) -> Result<Arc<Database>> {
        Self::open_with_flags_inner(io, path, db_file, flags, enable_mvcc, false)
    }
243
    /// Shared implementation behind the public `open*` constructors.
    ///
    /// Steps, in order:
    /// 1. begin opening the pager to obtain the header and run the IO loop once;
    /// 2. open the shared WAL at `"{path}-wal"` using the header's page size;
    /// 3. initialise [`DATABASE_VERSION`] if it has not been set yet;
    /// 4. build the [`Database`] (with an MVCC store when `enable_mvcc` is set);
    /// 5. on a temporary connection, refresh the header from the WAL, load the
    ///    schema from `sqlite_schema`, and load persistent statistics.
    ///
    /// # Errors
    /// Propagates errors from the pager, the IO loop, the WAL, connecting, the
    /// schema query and the statistics load.
    ///
    /// # Panics
    /// Panics if the schema write lock cannot be taken on the first try.
    #[allow(clippy::arc_with_non_send_sync)]
    fn open_with_flags_inner(
        io: Arc<dyn IO>,
        path: &str,
        db_file: Arc<dyn DatabaseStorage>,
        flags: OpenFlags,
        enable_mvcc: bool,
        db_freshly_created: bool,
    ) -> Result<Arc<Database>> {
        let db_header = Pager::begin_open(db_file.clone())?;
        // NOTE(review): presumably this lets the header read issued by
        // `begin_open` complete before the header is inspected — confirm.
        io.run_once()?;

        let page_size = db_header.lock().get_page_size();
        let wal_path = format!("{}-wal", path);
        let shared_wal = WalFileShared::open_shared_inner(
            &io,
            wal_path.as_str(),
            page_size,
            db_freshly_created,
        )?;

        // Only the first database opened in this process sets the version string.
        DATABASE_VERSION.get_or_init(|| {
            let version = db_header.lock().version_number;
            version.to_string()
        });

        let mv_store = if enable_mvcc {
            Some(Rc::new(MvStore::new(
                mvcc::LocalClock::new(),
                mvcc::persistent_storage::Storage::new_noop(),
            )))
        } else {
            None
        };

        let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
        let schema = Arc::new(RwLock::new(Schema::new()));
        let db = Database {
            mv_store,
            schema: schema.clone(),
            header: db_header.clone(),
            _shared_page_cache: shared_page_cache.clone(),
            shared_wal: shared_wal.clone(),
            db_file,
            io: io.clone(),
            page_size,
            open_flags: flags,
        };
        let db = Arc::new(db);
        {
            // Temporary connection used only to bootstrap the schema.
            let conn = db.connect()?;
            conn.pager.refresh_header_from_wal()?;
            {
                // The query runs before the write lock is taken; the lock is
                // released at the end of this inner scope.
                let rows = conn.query("SELECT * FROM sqlite_schema")?;
                let mut schema = schema
                    .try_write()
                    .expect("lock on schema should succeed first try");
                let syms = conn.syms.borrow();
                // NOTE(review): only `ExtensionError` is reported (as a warning);
                // every other error returned by `parse_schema_rows` is silently
                // discarded here — confirm this is intended.
                if let Err(LimboError::ExtensionError(e)) =
                    parse_schema_rows(rows, &mut schema, io, &syms, None)
                {
                    eprintln!("Warning: {}", e);
                }
            }
            conn.load_persistent_stats()?;
        }
        Ok(db)
    }
324
325 pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
326 let buffer_pool = Rc::new(BufferPool::new(self.page_size as usize));
327
328 let wal = Rc::new(RefCell::new(WalFile::new(
329 self.io.clone(),
330 self.page_size,
331 self.shared_wal.clone(),
332 buffer_pool.clone(),
333 )));
334 let pager = Rc::new(Pager::finish_open(
336 self.header.clone(),
337 self.db_file.clone(),
338 wal,
339 self.io.clone(),
340 Arc::new(RwLock::new(DumbLruPageCache::default())),
341 buffer_pool,
342 )?);
343 let conn = Arc::new(Connection {
344 _db: self.clone(),
345 pager: pager.clone(),
346 schema: self.schema.clone(),
347 header: self.header.clone(),
348 last_insert_rowid: Cell::new(0),
349 auto_commit: Cell::new(true),
350 mv_transactions: RefCell::new(Vec::new()),
351 transaction_state: Cell::new(TransactionState::None),
352 last_change: Cell::new(0),
353 syms: RefCell::new(SymbolTable::new()),
354 total_changes: Cell::new(0),
355 _shared_cache: false,
356 cache_size: Cell::new(self.header.lock().default_page_cache_size),
357 closed: Cell::new(false),
358 });
359 if let Err(e) = conn.register_builtins() {
360 return Err(LimboError::ExtensionError(e));
361 }
362 Ok(conn)
363 }
364
365 #[cfg(feature = "fs")]
368 #[allow(clippy::arc_with_non_send_sync)]
369 pub fn open_new(path: &str, vfs: &str) -> Result<(Arc<dyn IO>, Arc<Database>)> {
370 let vfsmods = ext::add_builtin_vfs_extensions(None)?;
371 let io: Arc<dyn IO> = match vfsmods.iter().find(|v| v.0 == vfs).map(|v| v.1.clone()) {
372 Some(vfs) => vfs,
373 None => match vfs.trim() {
374 "memory" => Arc::new(MemoryIO::new()),
375 "syscall" => Arc::new(SyscallIO::new()?),
376 #[cfg(all(target_os = "linux", feature = "io_uring"))]
377 "io_uring" => Arc::new(UringIO::new()?),
378 other => {
379 return Err(LimboError::InvalidArgument(format!(
380 "no such VFS: {}",
381 other
382 )));
383 }
384 },
385 };
386 let db = Self::open_file(io.clone(), path, false)?;
387 Ok((io, db))
388 }
389}
390
391pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Result<bool> {
401 if file.size()? == 0 {
402 let db_header = DatabaseHeader::default();
404 let page1 = allocate_page(
405 1,
406 &Rc::new(BufferPool::new(db_header.get_page_size() as usize)),
407 DATABASE_HEADER_SIZE,
408 );
409 let page1 = Arc::new(BTreePageInner {
410 page: RefCell::new(page1),
411 });
412 {
413 btree_init_page(
418 &page1,
419 storage::sqlite3_ondisk::PageType::TableLeaf,
420 DATABASE_HEADER_SIZE,
421 (db_header.get_page_size() - db_header.reserved_space as u32) as u16,
422 );
423
424 let page1 = page1.get();
425 let contents = page1
426 .get()
427 .contents
428 .as_mut()
429 .expect("invariant: page1 contents initialized before write"); contents.write_database_header(&db_header);
431 let flag_complete = Rc::new(RefCell::new(false));
433 {
434 let flag_complete = flag_complete.clone();
435 let completion = Completion::Write(WriteCompletion::new(Box::new(move |_| {
436 *flag_complete.borrow_mut() = true;
437 })));
438 #[allow(clippy::arc_with_non_send_sync)]
439 file.pwrite(0, contents.buffer.clone(), Arc::new(completion))?;
440 }
441 let mut limit = 100;
442 loop {
443 io.run_once()?;
444 if *flag_complete.borrow() {
445 break;
446 }
447 limit -= 1;
448 if limit == 0 {
449 panic!("Database file couldn't be initialized, io loop run for {} iterations and write didn't finish", limit);
450 }
451 }
452 }
453 return Ok(true);
456 }
457 Ok(false)
458}
459
/// A single connection to a [`Database`], created by `Database::connect`.
pub struct Connection {
    /// Owning database; gives access to its IO backend and MVCC store.
    _db: Arc<Database>,
    /// Pager created for this connection.
    pager: Rc<Pager>,
    /// Schema shared with the database.
    schema: Arc<RwLock<Schema>>,
    /// Database header shared with the database.
    header: Arc<SpinLock<DatabaseHeader>>,
    /// Auto-commit flag; starts as `true`.
    auto_commit: Cell<bool>,
    /// MVCC transaction ids; starts empty.
    mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
    /// Current transaction state; starts as `TransactionState::None`.
    transaction_state: Cell<TransactionState>,
    /// Value reported by `last_insert_rowid`; starts at 0.
    last_insert_rowid: Cell<i64>,
    /// Count passed to the most recent `set_changes` call; starts at 0.
    last_change: Cell<i64>,
    /// Running sum of every count passed to `set_changes`; starts at 0.
    total_changes: Cell<i64>,
    /// Symbol table (external functions, virtual tables and modules).
    syms: RefCell<SymbolTable>,
    /// Always `false` when built by `Database::connect`; not read in this file.
    _shared_cache: bool,
    /// Cache size; initialised from the header's `default_page_cache_size`.
    cache_size: Cell<i32>,
    /// Set by `close` or `Drop` so the shutdown checkpoint runs at most once.
    closed: Cell<bool>,
}
478
479impl Connection {
480 #[instrument(skip_all, level = Level::TRACE)]
481 pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
482 if sql.as_ref().is_empty() {
483 return Err(LimboError::InvalidArgument(
484 "The supplied SQL string contains no statements".to_string(),
485 ));
486 }
487
488 let sql = sql.as_ref();
489 tracing::trace!("Preparing: {}", sql);
490 let mut parser = Parser::new(sql.as_bytes());
491 let cmd = parser.next()?;
492 let syms = self.syms.borrow();
493 let cmd = cmd.expect("Successful parse on nonempty input string should produce a command");
494 let byte_offset_end = parser.offset();
495 let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
496 .expect("invariant: sql is valid UTF-8 bytes slice") .trim();
498 match cmd {
499 Cmd::Stmt(stmt) => {
500 let program = Rc::new(translate::translate(
501 self.schema
502 .try_read()
503 .ok_or(LimboError::SchemaLocked)?
504 .deref(),
505 stmt,
506 self.header.clone(),
507 self.pager.clone(),
508 self.clone(),
509 &syms,
510 QueryMode::Normal,
511 &input,
512 )?);
513 Ok(Statement::new(
514 program,
515 self._db.mv_store.clone(),
516 self.pager.clone(),
517 ))
518 }
519 Cmd::Explain(_stmt) => todo!(),
520 Cmd::ExplainQueryPlan(_stmt) => todo!(),
521 }
522 }
523
524 #[instrument(skip_all, level = Level::TRACE)]
525 pub fn query(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
526 let sql = sql.as_ref();
527 tracing::trace!("Querying: {}", sql);
528 let mut parser = Parser::new(sql.as_bytes());
529 let cmd = parser.next()?;
530 let byte_offset_end = parser.offset();
531 let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
532 .expect("invariant: sql is valid UTF-8 bytes slice") .trim();
534 match cmd {
535 Some(cmd) => self.run_cmd(cmd, input),
536 None => Ok(None),
537 }
538 }
539
    /// Turns an already-parsed `cmd` into a [`Statement`], where one exists.
    ///
    /// * Plain statements and `EXPLAIN` are translated into a program and
    ///   wrapped in a `Statement` (`Ok(Some(..))`).
    /// * `EXPLAIN QUERY PLAN` on a `SELECT` builds and optimises the plan,
    ///   writes its textual form to stdout and returns `Ok(None)`.
    ///
    /// `input` is the SQL text of the command, passed through to the translator.
    ///
    /// # Errors
    /// `LimboError::SchemaLocked` when the schema read lock is unavailable, plus
    /// any translation, planning or optimisation error.
    ///
    /// # Panics
    /// `EXPLAIN QUERY PLAN` for anything other than `SELECT` reaches `todo!()`.
    #[instrument(skip_all, level = Level::TRACE)]
    pub(crate) fn run_cmd(
        self: &Arc<Connection>,
        cmd: Cmd,
        input: &str,
    ) -> Result<Option<Statement>> {
        let syms = self.syms.borrow();
        match cmd {
            Cmd::Stmt(ref stmt) | Cmd::Explain(ref stmt) => {
                let program = translate::translate(
                    self.schema
                        .try_read()
                        .ok_or(LimboError::SchemaLocked)?
                        .deref(),
                    // Cloned because `cmd` itself is consumed by `into()` below.
                    stmt.clone(),
                    self.header.clone(),
                    self.pager.clone(),
                    self.clone(),
                    &syms,
                    // The command kind determines the query mode.
                    cmd.into(),
                    input,
                )?;
                let stmt = Statement::new(
                    program.into(),
                    self._db.mv_store.clone(),
                    self.pager.clone(),
                );
                Ok(Some(stmt))
            }
            Cmd::ExplainQueryPlan(stmt) => {
                let mut table_ref_counter = TableRefIdCounter::new();
                match stmt {
                    ast::Stmt::Select(select) => {
                        let mut plan = prepare_select_plan(
                            self.schema
                                .try_read()
                                .ok_or(LimboError::SchemaLocked)?
                                .deref(),
                            *select,
                            &syms,
                            &[],
                            &mut table_ref_counter,
                            translate::plan::QueryDestination::ResultRows,
                        )?;
                        // The schema read lock is re-acquired for optimisation.
                        optimize_plan(
                            &mut plan,
                            self.schema
                                .try_read()
                                .ok_or(LimboError::SchemaLocked)?
                                .deref(),
                        )?;
                        // Failure to write the plan to stdout is deliberately ignored.
                        let _ = std::io::stdout().write_all(plan.to_string().as_bytes());
                    }
                    _ => todo!(),
                }
                Ok(None)
            }
        }
    }
599
    /// Returns a [`QueryRunner`] that iterates over the statements contained in
    /// `sql`, running each one on this connection.
    pub fn query_runner<'a>(self: &'a Arc<Connection>, sql: &'a [u8]) -> QueryRunner<'a> {
        QueryRunner::new(self, sql)
    }
603
604 #[instrument(skip_all, level = Level::TRACE)]
607 pub fn execute(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<()> {
608 let sql = sql.as_ref();
609 let mut parser = Parser::new(sql.as_bytes());
610 let cmd = parser.next()?;
611 let syms = self.syms.borrow();
612 let byte_offset_end = parser.offset();
613 let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
614 .expect("invariant: sql is valid UTF-8 bytes slice") .trim();
616 if let Some(cmd) = cmd {
617 match cmd {
618 Cmd::Explain(stmt) => {
619 let program = translate::translate(
620 self.schema
621 .try_read()
622 .ok_or(LimboError::SchemaLocked)?
623 .deref(),
624 stmt,
625 self.header.clone(),
626 self.pager.clone(),
627 self.clone(),
628 &syms,
629 QueryMode::Explain,
630 &input,
631 )?;
632 let _ = std::io::stdout().write_all(program.explain().as_bytes());
633 }
634 Cmd::ExplainQueryPlan(_stmt) => todo!(),
635 Cmd::Stmt(stmt) => {
636 let program = translate::translate(
637 self.schema
638 .try_read()
639 .ok_or(LimboError::SchemaLocked)?
640 .deref(),
641 stmt,
642 self.header.clone(),
643 self.pager.clone(),
644 self.clone(),
645 &syms,
646 QueryMode::Normal,
647 &input,
648 )?;
649
650 let mut state =
651 vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
652 loop {
653 let res = program.step(
654 &mut state,
655 self._db.mv_store.clone(),
656 self.pager.clone(),
657 )?;
658 if matches!(res, StepResult::Done) {
659 break;
660 }
661 self._db.io.run_once()?;
662 }
663 }
664 }
665 }
666 Ok(())
667 }
668
    /// Returns the WAL frame count as reported by the pager.
    pub fn wal_frame_count(&self) -> Result<u64> {
        self.pager.wal_frame_count()
    }
672
    /// Forwards a WAL frame read to the pager and returns its completion.
    ///
    /// NOTE(review): `p_frame` is a raw pointer; presumably it must reference at
    /// least `frame_len` writable bytes that stay valid until the returned
    /// completion finishes — confirm against the pager implementation.
    pub fn wal_get_frame(
        &self,
        frame_no: u32,
        p_frame: *mut u8,
        frame_len: u32,
    ) -> Result<Arc<Completion>> {
        self.pager.wal_get_frame(frame_no, p_frame, frame_len)
    }
681
    /// Asks the pager to flush its cache and returns the pager's status.
    pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
        self.pager.cacheflush()
    }
689
    /// Clears this connection's page cache through the pager.
    ///
    /// Always returns `Ok(())`.
    pub fn clear_page_cache(&self) -> Result<()> {
        self.pager.clear_page_cache();
        Ok(())
    }
694
695 pub fn checkpoint(&self) -> Result<CheckpointResult> {
696 let checkpoint_result = self.pager.wal_checkpoint();
697 Ok(checkpoint_result)
698 }
699
    /// Runs a WAL checkpoint in [`CheckpointMode::Truncate`] mode.
    pub fn checkpoint_truncate(&self) -> Result<CheckpointResult> {
        self.pager.wal_checkpoint_mode(CheckpointMode::Truncate)
    }
704
705 pub fn close(&self) -> Result<()> {
708 if self.closed.replace(true) {
709 return Ok(());
710 }
711 self.pager.checkpoint_shutdown()
712 }
713
    /// Returns the stored last-insert row id (0 until one is recorded).
    pub fn last_insert_rowid(&self) -> i64 {
        self.last_insert_rowid.get()
    }
717
    /// Records `rowid` as the value returned by `last_insert_rowid`.
    fn update_last_rowid(&self, rowid: i64) {
        self.last_insert_rowid.set(rowid);
    }
721
722 pub fn set_changes(&self, nchange: i64) {
723 self.last_change.set(nchange);
724 let prev_total_changes = self.total_changes.get();
725 self.total_changes.set(prev_total_changes + nchange);
726 }
727
    /// Returns the count passed to the most recent `set_changes` call
    /// (0 if it was never called).
    pub fn changes(&self) -> i64 {
        self.last_change.get()
    }
736
    /// Returns the sum of all counts passed to `set_changes` on this connection.
    pub fn total_changes(&self) -> i64 {
        self.total_changes.get()
    }
740
    /// Returns the connection's cache size setting; it starts out as the
    /// header's `default_page_cache_size`.
    pub fn get_cache_size(&self) -> i32 {
        self.cache_size.get()
    }
    /// Stores `size` as the connection's cache size setting.
    ///
    /// Only the stored value is updated here; nothing else is resized.
    pub fn set_cache_size(&self, size: i32) {
        self.cache_size.set(size);
    }
747
    /// Opens another database at `path` using the VFS called `vfs`.
    ///
    /// Delegates to `Database::open_with_vfs` on this connection's database
    /// (defined outside this file) and returns the IO backend together with
    /// the opened database.
    #[cfg(feature = "fs")]
    pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc<dyn IO>, Arc<Database>)> {
        Database::open_with_vfs(&self._db, path, vfs)
    }
752
753 pub fn list_vfs(&self) -> Vec<String> {
754 let mut all_vfs = vec![String::from("memory")];
755 #[cfg(feature = "fs")]
756 {
757 #[cfg(target_family = "unix")]
758 {
759 all_vfs.push("syscall".to_string());
760 }
761 #[cfg(all(target_os = "linux", feature = "io_uring"))]
762 {
763 all_vfs.push("io_uring".to_string());
764 }
765 all_vfs.extend(crate::ext::list_vfs_modules());
766 }
767 all_vfs
768 }
769
    /// Returns the connection's auto-commit flag (`true` for a new connection).
    pub fn get_auto_commit(&self) -> bool {
        self.auto_commit.get()
    }
773
774 pub fn parse_schema_rows(self: &Arc<Connection>) -> Result<()> {
775 let rows = self.query("SELECT * FROM sqlite_schema")?;
776 {
777 let mut schema = self
778 .schema
779 .try_write()
780 .expect("lock on schema should succeed first try");
781 let syms = self.syms.borrow();
782 if let Err(LimboError::ExtensionError(e)) =
783 parse_schema_rows(rows, &mut schema, self.pager.io.clone(), &syms, None)
784 {
785 eprintln!("Warning: {}", e);
788 }
789 } self.load_persistent_stats()?;
791 Ok(())
792 }
793
794 pub fn load_persistent_stats(self: &Arc<Connection>) -> Result<()> {
798 {
799 let schema = self.schema.read();
800 if schema.get_table("sqlite_stat1").is_none() {
801 return Ok(());
802 }
803 } let rows = self.query("SELECT tbl, idx, stat FROM sqlite_stat1")?;
805 let mut schema = self.schema.write();
806 schema.stats.clear();
807 crate::util::load_stat1(rows, &mut schema, self.pager.io.clone(), None)?;
808 Ok(())
809 }
810
811 pub fn pragma_query(self: &Arc<Connection>, pragma_name: &str) -> Result<Vec<Vec<Value>>> {
814 let pragma = format!("PRAGMA {}", pragma_name);
815 let mut stmt = self.prepare(pragma)?;
816 let mut results = Vec::new();
817 loop {
818 match stmt.step()? {
819 vdbe::StepResult::Row => {
820 let row: Vec<Value> = stmt
821 .row()
822 .expect("invariant: row available after StepResult::Row") .get_values()
824 .map(|v| v.clone())
825 .collect();
826 results.push(row);
827 }
828 vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
829 return Err(LimboError::Busy);
830 }
831 _ => break,
832 }
833 }
834
835 Ok(results)
836 }
837
838 pub fn pragma_update<V: Display>(
843 self: &Arc<Connection>,
844 pragma_name: &str,
845 pragma_value: V,
846 ) -> Result<Vec<Vec<Value>>> {
847 let pragma = format!("PRAGMA {} = {}", pragma_name, pragma_value);
848 let mut stmt = self.prepare(pragma)?;
849 let mut results = Vec::new();
850 loop {
851 match stmt.step()? {
852 vdbe::StepResult::Row => {
853 let row: Vec<Value> = stmt
854 .row()
855 .expect("invariant: row available after StepResult::Row") .get_values()
857 .map(|v| v.clone())
858 .collect();
859 results.push(row);
860 }
861 vdbe::StepResult::IO => {
862 stmt.run_once()?;
865 }
866 vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
867 return Err(LimboError::Busy);
868 }
869 _ => break,
870 }
871 }
872
873 Ok(results)
874 }
875
876 pub fn pragma<V: Display>(
883 self: &Arc<Connection>,
884 pragma_name: &str,
885 pragma_value: V,
886 ) -> Result<Vec<Vec<Value>>> {
887 let pragma = format!("PRAGMA {}({})", pragma_name, pragma_value);
888 let mut stmt = self.prepare(pragma)?;
889 let mut results = Vec::new();
890 loop {
891 match stmt.step()? {
892 vdbe::StepResult::Row => {
893 let row: Vec<Value> = stmt
894 .row()
895 .expect("invariant: row available after StepResult::Row") .get_values()
897 .map(|v| v.clone())
898 .collect();
899 results.push(row);
900 }
901 vdbe::StepResult::IO => {
902 stmt.run_once()?;
905 }
906 vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
907 return Err(LimboError::Busy);
908 }
909 _ => break,
910 }
911 }
912
913 Ok(results)
914 }
915}
916
917impl Drop for Connection {
918 fn drop(&mut self) {
919 if self.closed.replace(true) {
923 return;
924 }
925 if let Err(e) = self.pager.checkpoint_shutdown() {
926 tracing::warn!("Connection::drop best-effort checkpoint failed: {e}");
927 }
928 }
929}
930
/// A compiled program together with the state needed to execute it.
pub struct Statement {
    /// The compiled VDBE program.
    program: Rc<vdbe::Program>,
    /// Execution state (registers, cursors, current result row).
    state: vdbe::ProgramState,
    /// MVCC store passed to every `step`, if the database uses one.
    mv_store: Option<Rc<MvStore>>,
    /// Pager passed to every `step`; also provides the IO loop for `run_once`.
    pager: Rc<Pager>,
}
937
938impl Statement {
939 pub fn new(
940 program: Rc<vdbe::Program>,
941 mv_store: Option<Rc<MvStore>>,
942 pager: Rc<Pager>,
943 ) -> Self {
944 let state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
945 Self {
946 program,
947 state,
948 mv_store,
949 pager,
950 }
951 }
952
953 pub fn set_mv_tx_id(&mut self, mv_tx_id: Option<u64>) {
954 self.state.mv_tx_id = mv_tx_id;
955 }
956
957 pub fn interrupt(&mut self) {
958 self.state.interrupt();
959 }
960
961 pub fn step(&mut self) -> Result<StepResult> {
962 self.program
963 .step(&mut self.state, self.mv_store.clone(), self.pager.clone())
964 }
965
966 pub fn run_once(&self) -> Result<()> {
967 self.pager.io.run_once()
968 }
969
970 pub fn num_columns(&self) -> usize {
971 self.program.result_columns.len()
972 }
973
974 pub fn get_column_name(&self, idx: usize) -> Cow<str> {
975 let column = &self.program.result_columns.get(idx).expect("No column");
976 match column.name(&self.program.table_references) {
977 Some(name) => Cow::Borrowed(name),
978 None => Cow::Owned(column.expr.to_string()),
979 }
980 }
981
982 pub fn get_column_decl_type(&self, idx: usize) -> Option<Cow<str>> {
988 let column = self.program.result_columns.get(idx)?;
989 match &column.expr {
990 limbo_sqlite3_parser::ast::Expr::Column {
991 table,
992 column: col_idx,
993 ..
994 } => self
995 .program
996 .table_references
997 .find_table_by_internal_id(*table)
998 .and_then(|tbl| tbl.get_column_at(*col_idx))
999 .map(|c| Cow::Owned(c.ty_str.clone())),
1000 _ => None,
1001 }
1002 }
1003
1004 pub fn parameters(&self) -> ¶meters::Parameters {
1005 &self.program.parameters
1006 }
1007
1008 pub fn parameters_count(&self) -> usize {
1009 self.program.parameters.count()
1010 }
1011
1012 pub fn bind_at(&mut self, index: NonZero<usize>, value: Value) {
1013 self.state.bind_at(index, value);
1014 }
1015
1016 pub fn reset(&mut self) {
1017 self.state.reset();
1018 self.program.n_change.set(0);
1019 }
1020
1021 pub fn row(&self) -> Option<&Row> {
1022 self.state.result_row.as_ref()
1023 }
1024
1025 pub fn explain(&self) -> String {
1026 self.program.explain()
1027 }
1028}
1029
/// Result row type, re-exported from the VDBE.
pub type Row = vdbe::Row;

/// Step outcome type, re-exported from the VDBE.
pub type StepResult = vdbe::StepResult;
1033
/// Per-connection registry of extension-provided symbols, keyed by name.
pub struct SymbolTable {
    /// External functions.
    pub functions: HashMap<String, Rc<function::ExternalFunc>>,
    /// Virtual tables.
    pub vtabs: HashMap<String, Rc<VirtualTable>>,
    /// Virtual table module implementations.
    pub vtab_modules: HashMap<String, Rc<crate::ext::VTabImpl>>,
}
1039
impl std::fmt::Debug for SymbolTable {
    /// Formats only the `functions` map; `vtabs` and `vtab_modules` are left
    /// out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SymbolTable")
            .field("functions", &self.functions)
            .finish()
    }
}
1047
/// Reports whether `path` ends in a shared-library extension (`so`, `dylib`
/// or `dll`). The comparison is case-sensitive, and a path without an
/// extension is never a shared library.
fn is_shared_library(path: &std::path::Path) -> bool {
    const SHARED_LIBRARY_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];
    match path.extension() {
        Some(ext) => SHARED_LIBRARY_EXTENSIONS
            .iter()
            .any(|known| ext == *known),
        None => false,
    }
}
1052
1053pub fn resolve_ext_path(extpath: &str) -> Result<std::path::PathBuf> {
1054 let path = std::path::Path::new(extpath);
1055 if !path.exists() {
1056 if is_shared_library(path) {
1057 return Err(LimboError::ExtensionError(format!(
1058 "Extension file not found: {}",
1059 extpath
1060 )));
1061 };
1062 let maybe = path.with_extension(std::env::consts::DLL_EXTENSION);
1063 maybe
1064 .exists()
1065 .then_some(maybe)
1066 .ok_or(LimboError::ExtensionError(format!(
1067 "Extension file not found: {}",
1068 extpath
1069 )))
1070 } else {
1071 Ok(path.to_path_buf())
1072 }
1073}
1074
1075impl SymbolTable {
1076 pub fn new() -> Self {
1077 Self {
1078 functions: HashMap::new(),
1079 vtabs: HashMap::new(),
1080 vtab_modules: HashMap::new(),
1081 }
1082 }
1083
1084 pub fn resolve_function(
1085 &self,
1086 name: &str,
1087 _arg_count: usize,
1088 ) -> Option<Rc<function::ExternalFunc>> {
1089 self.functions.get(name).cloned()
1090 }
1091}
1092
/// Iterator that parses a buffer of SQL text and runs each command on a
/// connection.
pub struct QueryRunner<'a> {
    /// Parser over `statements`.
    parser: Parser<'a>,
    /// Connection each command is run on.
    conn: &'a Arc<Connection>,
    /// The full SQL input.
    statements: &'a [u8],
    /// Byte offset in `statements` where the previous command ended.
    last_offset: usize,
}
1099
impl<'a> QueryRunner<'a> {
    /// Creates a runner positioned at the start of `statements`.
    pub(crate) fn new(conn: &'a Arc<Connection>, statements: &'a [u8]) -> Self {
        Self {
            parser: Parser::new(statements),
            conn,
            statements,
            last_offset: 0,
        }
    }
}
1110
1111impl Iterator for QueryRunner<'_> {
1112 type Item = Result<Option<Statement>>;
1113
1114 fn next(&mut self) -> Option<Self::Item> {
1115 match self.parser.next() {
1116 Ok(Some(cmd)) => {
1117 let byte_offset_end = self.parser.offset();
1118 let input = str::from_utf8(&self.statements[self.last_offset..byte_offset_end])
1119 .expect("invariant: statements are valid UTF-8 bytes slice") .trim();
1121 self.last_offset = byte_offset_end;
1122 Some(self.conn.run_cmd(cmd, &input))
1123 }
1124 Ok(None) => None,
1125 Err(err) => {
1126 self.parser.finalize();
1127 Some(Result::Err(LimboError::from(err)))
1128 }
1129 }
1130 }
1131}