Skip to main content

limbo_core/
lib.rs

1// vendored from limbo 0.0.22 upstream — upstream URLs/HTML in doc comments
2#![allow(
3    rustdoc::bare_urls,
4    rustdoc::invalid_html_tags,
5    rustdoc::broken_intra_doc_links
6)]
7#![allow(clippy::arc_with_non_send_sync)]
8// vendored from limbo 0.0.22 upstream; see NOTICE
9#![allow(unused_assignments)]
10// vendored from limbo 0.0.22 upstream; see NOTICE
11#![allow(mismatched_lifetime_syntaxes)]
12// vendored from limbo 0.0.22 upstream; see NOTICE
13#![allow(unpredictable_function_pointer_comparisons)]
14// UPSTREAM: vendored Limbo fork — allow upstream style
15#![allow(
16    clippy::bool_assert_comparison,
17    clippy::collapsible_match,
18    clippy::clone_on_copy,
19    clippy::comparison_to_empty,
20    clippy::derivable_impls,
21    clippy::doc_lazy_continuation,
22    clippy::doc_overindented_list_items,
23    clippy::duplicated_attributes,
24    clippy::enum_variant_names,
25    clippy::excessive_precision,
26    clippy::explicit_auto_deref,
27    clippy::explicit_counter_loop,
28    clippy::extra_unused_lifetimes,
29    clippy::filter_next,
30    clippy::from_over_into,
31    clippy::get_first,
32    clippy::identity_op,
33    clippy::inherent_to_string,
34    clippy::iter_cloned_collect,
35    clippy::large_enum_variant,
36    clippy::len_without_is_empty,
37    clippy::len_zero,
38    clippy::let_and_return,
39    clippy::manual_ignore_case_cmp,
40    clippy::manual_inspect,
41    clippy::manual_is_multiple_of,
42    clippy::manual_map,
43    clippy::manual_ok_err,
44    clippy::manual_range_contains,
45    clippy::manual_repeat_n,
46    clippy::manual_saturating_arithmetic,
47    clippy::manual_strip,
48    clippy::map_clone,
49    clippy::match_like_matches_macro,
50    clippy::needless_borrow,
51    clippy::needless_borrows_for_generic_args,
52    clippy::needless_lifetimes,
53    clippy::needless_range_loop,
54    clippy::needless_return,
55    clippy::new_without_default,
56    clippy::nonminimal_bool,
57    clippy::obfuscated_if_else,
58    clippy::option_as_ref_deref,
59    clippy::option_map_or_none,
60    clippy::partialeq_ne_impl,
61    clippy::partialeq_to_none,
62    clippy::ptr_arg,
63    clippy::question_mark,
64    clippy::redundant_closure,
65    clippy::redundant_field_names,
66    clippy::redundant_pattern_matching,
67    clippy::should_implement_trait,
68    clippy::single_match,
69    clippy::too_many_arguments,
70    clippy::unnecessary_cast,
71    clippy::unnecessary_map_or,
72    clippy::unnecessary_mut_passed,
73    clippy::unnecessary_unwrap,
74    clippy::unneeded_struct_pattern,
75    clippy::unused_unit,
76    clippy::upper_case_acronyms,
77    clippy::useless_conversion,
78    clippy::useless_format,
79    clippy::wrong_self_convention
80)]
81
82mod error;
83mod ext;
84mod fast_lock;
85mod function;
86mod functions;
87mod info;
88mod io;
89#[cfg(feature = "json")]
90mod json;
91pub mod mvcc;
92mod parameters;
93mod pragma;
94mod pseudo;
95pub mod result;
96mod schema;
97mod statistics;
98mod storage;
99mod translate;
100pub mod types;
101#[allow(dead_code)]
102mod util;
103mod vdbe;
104mod vector;
105mod vtab;
106
107#[cfg(feature = "fuzz")]
108pub mod numeric;
109
110#[cfg(not(feature = "fuzz"))]
111mod numeric;
112
113use crate::vtab::VirtualTable;
114use crate::{fast_lock::SpinLock, translate::optimizer::optimize_plan};
115use core::str;
116pub use error::LimboError;
117use fallible_iterator::FallibleIterator;
118pub use io::clock::{Clock, Instant};
119#[cfg(all(feature = "fs", target_family = "unix", feature = "native-io"))]
120pub use io::UnixIO;
121#[cfg(all(feature = "fs", target_os = "linux", feature = "io_uring"))]
122pub use io::UringIO;
123pub use io::{
124    Buffer, Completion, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO,
125};
126use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
127use parking_lot::RwLock;
128use schema::Schema;
129use std::{
130    borrow::Cow,
131    cell::{Cell, RefCell, UnsafeCell},
132    collections::HashMap,
133    fmt::Display,
134    io::Write,
135    num::NonZero,
136    ops::Deref,
137    rc::Rc,
138    sync::{Arc, OnceLock},
139};
140use storage::btree::{btree_init_page, BTreePageInner};
141#[cfg(feature = "fs")]
142use storage::database::DatabaseFile;
143pub use storage::pager::PagerCacheflushStatus;
144pub use storage::{
145    buffer_pool::BufferPool,
146    database::DatabaseStorage,
147    pager::PageRef,
148    pager::{Page, Pager},
149    wal::{CheckpointMode, CheckpointResult, CheckpointStatus, Wal, WalFile, WalFileShared},
150};
151use storage::{
152    page_cache::DumbLruPageCache,
153    pager::allocate_page,
154    sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE},
155};
156use tracing::{instrument, Level};
157use translate::select::prepare_select_plan;
158pub use types::RefValue;
159pub use types::Value;
160use util::parse_schema_rows;
161use vdbe::builder::QueryMode;
162use vdbe::builder::TableRefIdCounter;
163
/// Crate-wide result alias whose error type defaults to [`LimboError`].
pub type Result<T, E = LimboError> = std::result::Result<T, E>;
/// Header `version_number` rendered as a string.
///
/// Set at most once per process: the first database opened through
/// `Database::open_with_flags_inner` initialises it and later opens leave it untouched.
pub static DATABASE_VERSION: OnceLock<String> = OnceLock::new();
166
/// Transaction state tracked per [`Connection`].
///
/// A connection created by `Database::connect` starts in [`TransactionState::None`].
// NOTE(review): the exact meaning of `Write`/`Read` is decided by code outside this
// file section — presumably "write transaction open" / "read transaction open"; confirm
// at the use sites.
#[derive(Clone, Copy, PartialEq, Eq)]
enum TransactionState {
    Write,
    Read,
    None,
}
173
/// MVCC store specialised to the [`mvcc::LocalClock`] clock.
pub(crate) type MvStore = mvcc::MvStore<mvcc::LocalClock>;

/// MVCC scan cursor specialised to the [`mvcc::LocalClock`] clock.
pub(crate) type MvCursor = mvcc::cursor::ScanCursor<mvcc::LocalClock>;
177
/// An opened database: the state shared by every [`Connection`] created through
/// [`Database::connect`].
pub struct Database {
    /// MVCC store; `Some` only when the database was opened with `enable_mvcc = true`.
    mv_store: Option<Rc<MvStore>>,
    /// Parsed schema, shared with every connection.
    schema: Arc<RwLock<Schema>>,
    // TODO: make header work without lock
    /// Database header, shared with every connection.
    header: Arc<SpinLock<DatabaseHeader>>,
    /// Storage backend for the main database file.
    db_file: Arc<dyn DatabaseStorage>,
    /// I/O backend used to drive pending operations.
    io: Arc<dyn IO>,
    /// Page size read from the header when the database was opened.
    page_size: u32,
    // Shared structures of a Database are the parts that are common to multiple threads that might
    // create DB connections.
    /// Page cache created at open time. `connect` currently builds a separate cache per
    /// connection, so this one is not handed out (hence the underscore).
    _shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
    /// WAL state shared by the `WalFile` of every connection.
    shared_wal: Arc<UnsafeCell<WalFileShared>>,
    /// Flags the database was opened with.
    open_flags: OpenFlags,
}
192
// SAFETY: NOTE(review): `Database` contains an `Rc<MvStore>` and an
// `Arc<UnsafeCell<WalFileShared>>`, neither of which is `Send`/`Sync` by itself, so the
// compiler cannot check these impls. The invariant that makes cross-thread use sound is
// not stated anywhere in this part of the file — confirm it before relying on it.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
195
196impl Database {
    /// Opens the database file at `path` with [`OpenFlags::default`].
    ///
    /// Thin wrapper around [`Database::open_file_with_flags`]; see it for details and errors.
    #[cfg(feature = "fs")]
    pub fn open_file(io: Arc<dyn IO>, path: &str, enable_mvcc: bool) -> Result<Arc<Database>> {
        Self::open_file_with_flags(io, path, OpenFlags::default(), enable_mvcc)
    }
201
202    #[cfg(feature = "fs")]
203    pub fn open_file_with_flags(
204        io: Arc<dyn IO>,
205        path: &str,
206        flags: OpenFlags,
207        enable_mvcc: bool,
208    ) -> Result<Arc<Database>> {
209        let file = io.open_file(path, flags, true)?;
210        // `db_freshly_created` is true when the main database file did not exist
211        // (or was empty) and we just wrote its bootstrap page. In that case any
212        // `-wal` file found alongside it is an orphan from a previous database
213        // incarnation and must NOT be replayed (doing so resurrects stale
214        // committed data — see `WalFileShared::open_shared`).
215        let db_freshly_created = maybe_init_database_file(&file, &io)?;
216        let db_file = Arc::new(DatabaseFile::new(file));
217        Self::open_with_flags_inner(io, path, db_file, flags, enable_mvcc, db_freshly_created)
218    }
219
    /// Opens a database on a caller-supplied storage backend with [`OpenFlags::default`].
    ///
    /// Thin wrapper around [`Database::open_with_flags`].
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn open(
        io: Arc<dyn IO>,
        path: &str,
        db_file: Arc<dyn DatabaseStorage>,
        enable_mvcc: bool,
    ) -> Result<Arc<Database>> {
        Self::open_with_flags(io, path, db_file, OpenFlags::default(), enable_mvcc)
    }
229
    /// Opens a database on a caller-supplied storage backend with explicit `flags`.
    ///
    /// The database is always treated as pre-existing (`db_freshly_created = false`), so a
    /// WAL found at `"{path}-wal"` is never discarded by this entry point.
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn open_with_flags(
        io: Arc<dyn IO>,
        path: &str,
        db_file: Arc<dyn DatabaseStorage>,
        flags: OpenFlags,
        enable_mvcc: bool,
    ) -> Result<Arc<Database>> {
        // Callers that supply their own `db_file` (custom storage backends,
        // tests) take responsibility for WAL lifecycle, so we conservatively
        // treat the database as pre-existing (never discard a WAL here).
        Self::open_with_flags_inner(io, path, db_file, flags, enable_mvcc, false)
    }
243
    /// Shared open path behind every public `open*` constructor.
    ///
    /// Steps, in order:
    /// 1. start reading the header from `db_file` and run the I/O loop once;
    /// 2. open the shared WAL at `"{path}-wal"`, passing `db_freshly_created` through;
    /// 3. initialise [`DATABASE_VERSION`] if no earlier open has done so;
    /// 4. build the [`Database`], then use a temporary connection to refresh the header
    ///    from the WAL, parse `sqlite_schema` into the shared schema and load persisted
    ///    `ANALYZE` statistics.
    ///
    /// # Errors
    /// Forwards errors from the pager, WAL, I/O loop, the bootstrap connection and
    /// `load_persistent_stats`. An error returned by schema parsing is NOT forwarded:
    /// an `ExtensionError` is printed to stderr as a warning and any other error value
    /// is dropped.
    ///
    /// # Panics
    /// Panics if the schema write lock cannot be taken immediately.
    #[allow(clippy::arc_with_non_send_sync)]
    fn open_with_flags_inner(
        io: Arc<dyn IO>,
        path: &str,
        db_file: Arc<dyn DatabaseStorage>,
        flags: OpenFlags,
        enable_mvcc: bool,
        db_freshly_created: bool,
    ) -> Result<Arc<Database>> {
        let db_header = Pager::begin_open(db_file.clone())?;
        // ensure db header is there
        io.run_once()?;

        let page_size = db_header.lock().get_page_size();
        let wal_path = format!("{}-wal", path);
        let shared_wal = WalFileShared::open_shared_inner(
            &io,
            wal_path.as_str(),
            page_size,
            db_freshly_created,
        )?;

        // Process-wide: only the first database opened sets the value.
        DATABASE_VERSION.get_or_init(|| {
            let version = db_header.lock().version_number;
            version.to_string()
        });

        let mv_store = if enable_mvcc {
            Some(Rc::new(MvStore::new(
                mvcc::LocalClock::new(),
                mvcc::persistent_storage::Storage::new_noop(),
            )))
        } else {
            None
        };

        let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
        let schema = Arc::new(RwLock::new(Schema::new()));
        let db = Database {
            mv_store,
            schema: schema.clone(),
            header: db_header.clone(),
            _shared_page_cache: shared_page_cache.clone(),
            shared_wal: shared_wal.clone(),
            db_file,
            io: io.clone(),
            page_size,
            open_flags: flags,
        };
        let db = Arc::new(db);
        {
            // parse schema
            let conn = db.connect()?;
            // The header was read straight from the main database file above,
            // bypassing the WAL. If a previous session committed a page-1 change
            // (e.g. `PRAGMA application_id` / `PRAGMA user_version`) to the WAL
            // without checkpointing it back into the main file, that change is
            // durable in the WAL but absent from the freshly-read header. Resolve
            // page 1 through the now-recovered WAL so the shared header reflects
            // the latest committed cookie values before any query runs.
            conn.pager.refresh_header_from_wal()?;
            {
                let rows = conn.query("SELECT * FROM sqlite_schema")?;
                let mut schema = schema
                    .try_write()
                    .expect("lock on schema should succeed first try");
                let syms = conn.syms.borrow();
                // Only `ExtensionError` is matched; every other result is discarded.
                if let Err(LimboError::ExtensionError(e)) =
                    parse_schema_rows(rows, &mut schema, io, &syms, None)
                {
                    // this means that a vtab exists and we no longer have the module loaded. we print
                    // a warning to the user to load the module
                    eprintln!("Warning: {}", e);
                }
            } // schema write guard + syms dropped here
            // Load persisted ANALYZE statistics (no-op for un-analyzed databases).
            conn.load_persistent_stats()?;
        }
        Ok(db)
    }
324
325    pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
326        let buffer_pool = Rc::new(BufferPool::new(self.page_size as usize));
327
328        let wal = Rc::new(RefCell::new(WalFile::new(
329            self.io.clone(),
330            self.page_size,
331            self.shared_wal.clone(),
332            buffer_pool.clone(),
333        )));
334        // For now let's open database without shared cache by default.
335        let pager = Rc::new(Pager::finish_open(
336            self.header.clone(),
337            self.db_file.clone(),
338            wal,
339            self.io.clone(),
340            Arc::new(RwLock::new(DumbLruPageCache::default())),
341            buffer_pool,
342        )?);
343        let conn = Arc::new(Connection {
344            _db: self.clone(),
345            pager: pager.clone(),
346            schema: self.schema.clone(),
347            header: self.header.clone(),
348            last_insert_rowid: Cell::new(0),
349            auto_commit: Cell::new(true),
350            mv_transactions: RefCell::new(Vec::new()),
351            transaction_state: Cell::new(TransactionState::None),
352            last_change: Cell::new(0),
353            syms: RefCell::new(SymbolTable::new()),
354            total_changes: Cell::new(0),
355            _shared_cache: false,
356            cache_size: Cell::new(self.header.lock().default_page_cache_size),
357            closed: Cell::new(false),
358        });
359        if let Err(e) = conn.register_builtins() {
360            return Err(LimboError::ExtensionError(e));
361        }
362        Ok(conn)
363    }
364
365    /// Open a new database file with a specified VFS without an existing database
366    /// connection and symbol table to register extensions.
367    #[cfg(feature = "fs")]
368    #[allow(clippy::arc_with_non_send_sync)]
369    pub fn open_new(path: &str, vfs: &str) -> Result<(Arc<dyn IO>, Arc<Database>)> {
370        let vfsmods = ext::add_builtin_vfs_extensions(None)?;
371        let io: Arc<dyn IO> = match vfsmods.iter().find(|v| v.0 == vfs).map(|v| v.1.clone()) {
372            Some(vfs) => vfs,
373            None => match vfs.trim() {
374                "memory" => Arc::new(MemoryIO::new()),
375                "syscall" => Arc::new(SyscallIO::new()?),
376                #[cfg(all(target_os = "linux", feature = "io_uring"))]
377                "io_uring" => Arc::new(UringIO::new()?),
378                other => {
379                    return Err(LimboError::InvalidArgument(format!(
380                        "no such VFS: {}",
381                        other
382                    )));
383                }
384            },
385        };
386        let db = Self::open_file(io.clone(), path, false)?;
387        Ok((io, db))
388    }
389}
390
391/// Initialize a brand-new database file (write the bootstrap page 1) if the
392/// file is empty.
393///
394/// Returns `true` when the file was freshly created (its size was 0 before this
395/// call wrote the bootstrap header), and `false` when an existing, non-empty
396/// database file was opened. Callers use this to detect the
397/// "fresh main DB file + pre-existing `-wal`" situation, where the WAL is
398/// orphaned from a previous database incarnation and MUST NOT be replayed (see
399/// [`storage::wal::WalFileShared::open_shared`]).
400pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Result<bool> {
401    if file.size()? == 0 {
402        // init db
403        let db_header = DatabaseHeader::default();
404        let page1 = allocate_page(
405            1,
406            &Rc::new(BufferPool::new(db_header.get_page_size() as usize)),
407            DATABASE_HEADER_SIZE,
408        );
409        let page1 = Arc::new(BTreePageInner {
410            page: RefCell::new(page1),
411        });
412        {
413            // Create the sqlite_schema table, for this we just need to create the btree page
414            // for the first page of the database which is basically like any other btree page
415            // but with a 100 byte offset, so we just init the page so that sqlite understands
416            // this is a correct page.
417            btree_init_page(
418                &page1,
419                storage::sqlite3_ondisk::PageType::TableLeaf,
420                DATABASE_HEADER_SIZE,
421                (db_header.get_page_size() - db_header.reserved_space as u32) as u16,
422            );
423
424            let page1 = page1.get();
425            let contents = page1
426                .get()
427                .contents
428                .as_mut()
429                .expect("invariant: page1 contents initialized before write"); // UPSTREAM (Limbo): unwrap — needs proper error propagation
430            contents.write_database_header(&db_header);
431            // write the first page to disk synchronously
432            let flag_complete = Rc::new(RefCell::new(false));
433            {
434                let flag_complete = flag_complete.clone();
435                let completion = Completion::Write(WriteCompletion::new(Box::new(move |_| {
436                    *flag_complete.borrow_mut() = true;
437                })));
438                #[allow(clippy::arc_with_non_send_sync)]
439                file.pwrite(0, contents.buffer.clone(), Arc::new(completion))?;
440            }
441            let mut limit = 100;
442            loop {
443                io.run_once()?;
444                if *flag_complete.borrow() {
445                    break;
446                }
447                limit -= 1;
448                if limit == 0 {
449                    panic!("Database file couldn't be initialized, io loop run for {} iterations and write didn't finish", limit);
450                }
451            }
452        }
453        // The file was empty and has just been initialized: it is a brand-new
454        // database, so any pre-existing WAL belongs to a previous incarnation.
455        return Ok(true);
456    }
457    Ok(false)
458}
459
/// A single connection to a [`Database`], created by [`Database::connect`].
pub struct Connection {
    /// The database this connection belongs to; also supplies the MVCC store and I/O handle.
    _db: Arc<Database>,
    /// Pager built for this connection.
    pager: Rc<Pager>,
    /// Schema shared with the owning database.
    schema: Arc<RwLock<Schema>>,
    /// Database header shared with the owning database.
    header: Arc<SpinLock<DatabaseHeader>>,
    /// Auto-commit flag; `true` for a new connection.
    auto_commit: Cell<bool>,
    /// MVCC transaction ids; empty for a new connection.
    mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
    /// Current transaction state; `TransactionState::None` for a new connection.
    transaction_state: Cell<TransactionState>,
    /// Value reported by `last_insert_rowid()`; starts at 0.
    last_insert_rowid: Cell<i64>,
    /// Value reported by `changes()`; overwritten by each `set_changes` call.
    last_change: Cell<i64>,
    /// Running sum of every `set_changes` argument, reported by `total_changes()`.
    total_changes: Cell<i64>,
    /// Symbol table handed to translation and schema parsing.
    syms: RefCell<SymbolTable>,
    /// Always `false` when created by `Database::connect`.
    _shared_cache: bool,
    /// Initialised from the header's `default_page_cache_size`.
    cache_size: Cell<i32>,
    /// Set once the connection has been checkpoint-closed (via `close()` or
    /// `Drop`) so the work is not repeated.
    closed: Cell<bool>,
}
478
479impl Connection {
480    #[instrument(skip_all, level = Level::TRACE)]
481    pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
482        if sql.as_ref().is_empty() {
483            return Err(LimboError::InvalidArgument(
484                "The supplied SQL string contains no statements".to_string(),
485            ));
486        }
487
488        let sql = sql.as_ref();
489        tracing::trace!("Preparing: {}", sql);
490        let mut parser = Parser::new(sql.as_bytes());
491        let cmd = parser.next()?;
492        let syms = self.syms.borrow();
493        let cmd = cmd.expect("Successful parse on nonempty input string should produce a command");
494        let byte_offset_end = parser.offset();
495        let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
496            .expect("invariant: sql is valid UTF-8 bytes slice") // UPSTREAM (Limbo): unwrap — needs proper error propagation
497            .trim();
498        match cmd {
499            Cmd::Stmt(stmt) => {
500                let program = Rc::new(translate::translate(
501                    self.schema
502                        .try_read()
503                        .ok_or(LimboError::SchemaLocked)?
504                        .deref(),
505                    stmt,
506                    self.header.clone(),
507                    self.pager.clone(),
508                    self.clone(),
509                    &syms,
510                    QueryMode::Normal,
511                    &input,
512                )?);
513                Ok(Statement::new(
514                    program,
515                    self._db.mv_store.clone(),
516                    self.pager.clone(),
517                ))
518            }
519            Cmd::Explain(_stmt) => todo!(),
520            Cmd::ExplainQueryPlan(_stmt) => todo!(),
521        }
522    }
523
524    #[instrument(skip_all, level = Level::TRACE)]
525    pub fn query(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
526        let sql = sql.as_ref();
527        tracing::trace!("Querying: {}", sql);
528        let mut parser = Parser::new(sql.as_bytes());
529        let cmd = parser.next()?;
530        let byte_offset_end = parser.offset();
531        let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
532            .expect("invariant: sql is valid UTF-8 bytes slice") // UPSTREAM (Limbo): unwrap — needs proper error propagation
533            .trim();
534        match cmd {
535            Some(cmd) => self.run_cmd(cmd, input),
536            None => Ok(None),
537        }
538    }
539
    /// Translates an already-parsed command; `input` is the SQL text it was parsed from.
    ///
    /// * `Cmd::Stmt` / `Cmd::Explain` — translated with the query mode obtained from
    ///   `cmd.into()` and returned as `Ok(Some(statement))`.
    /// * `Cmd::ExplainQueryPlan` — only `SELECT` is handled: the plan is prepared,
    ///   optimized and written to stdout, and `Ok(None)` is returned.
    ///
    /// # Errors
    /// Returns [`LimboError::SchemaLocked`] when `try_read` on the schema lock yields
    /// nothing, and forwards translation/planning errors.
    ///
    /// # Panics
    /// `EXPLAIN QUERY PLAN` on anything other than `SELECT` hits `todo!()`.
    #[instrument(skip_all, level = Level::TRACE)]
    pub(crate) fn run_cmd(
        self: &Arc<Connection>,
        cmd: Cmd,
        input: &str,
    ) -> Result<Option<Statement>> {
        let syms = self.syms.borrow();
        match cmd {
            Cmd::Stmt(ref stmt) | Cmd::Explain(ref stmt) => {
                let program = translate::translate(
                    self.schema
                        .try_read()
                        .ok_or(LimboError::SchemaLocked)?
                        .deref(),
                    // Cloned because `cmd` itself is consumed by `cmd.into()` below.
                    stmt.clone(),
                    self.header.clone(),
                    self.pager.clone(),
                    self.clone(),
                    &syms,
                    cmd.into(),
                    input,
                )?;
                let stmt = Statement::new(
                    program.into(),
                    self._db.mv_store.clone(),
                    self.pager.clone(),
                );
                Ok(Some(stmt))
            }
            Cmd::ExplainQueryPlan(stmt) => {
                let mut table_ref_counter = TableRefIdCounter::new();
                match stmt {
                    ast::Stmt::Select(select) => {
                        let mut plan = prepare_select_plan(
                            self.schema
                                .try_read()
                                .ok_or(LimboError::SchemaLocked)?
                                .deref(),
                            *select,
                            &syms,
                            &[],
                            &mut table_ref_counter,
                            translate::plan::QueryDestination::ResultRows,
                        )?;
                        // The schema lock is re-acquired here; the guard taken above was a
                        // temporary and has already been released.
                        optimize_plan(
                            &mut plan,
                            self.schema
                                .try_read()
                                .ok_or(LimboError::SchemaLocked)?
                                .deref(),
                        )?;
                        // A failed write to stdout is deliberately ignored.
                        let _ = std::io::stdout().write_all(plan.to_string().as_bytes());
                    }
                    _ => todo!(),
                }
                Ok(None)
            }
        }
    }
599
    /// Builds a [`QueryRunner`] over the raw SQL bytes in `sql`, bound to this connection.
    pub fn query_runner<'a>(self: &'a Arc<Connection>, sql: &'a [u8]) -> QueryRunner<'a> {
        QueryRunner::new(self, sql)
    }
603
604    /// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish.
605    /// TODO: make this api async
606    #[instrument(skip_all, level = Level::TRACE)]
607    pub fn execute(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<()> {
608        let sql = sql.as_ref();
609        let mut parser = Parser::new(sql.as_bytes());
610        let cmd = parser.next()?;
611        let syms = self.syms.borrow();
612        let byte_offset_end = parser.offset();
613        let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
614            .expect("invariant: sql is valid UTF-8 bytes slice") // UPSTREAM (Limbo): unwrap — needs proper error propagation
615            .trim();
616        if let Some(cmd) = cmd {
617            match cmd {
618                Cmd::Explain(stmt) => {
619                    let program = translate::translate(
620                        self.schema
621                            .try_read()
622                            .ok_or(LimboError::SchemaLocked)?
623                            .deref(),
624                        stmt,
625                        self.header.clone(),
626                        self.pager.clone(),
627                        self.clone(),
628                        &syms,
629                        QueryMode::Explain,
630                        &input,
631                    )?;
632                    let _ = std::io::stdout().write_all(program.explain().as_bytes());
633                }
634                Cmd::ExplainQueryPlan(_stmt) => todo!(),
635                Cmd::Stmt(stmt) => {
636                    let program = translate::translate(
637                        self.schema
638                            .try_read()
639                            .ok_or(LimboError::SchemaLocked)?
640                            .deref(),
641                        stmt,
642                        self.header.clone(),
643                        self.pager.clone(),
644                        self.clone(),
645                        &syms,
646                        QueryMode::Normal,
647                        &input,
648                    )?;
649
650                    let mut state =
651                        vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
652                    loop {
653                        let res = program.step(
654                            &mut state,
655                            self._db.mv_store.clone(),
656                            self.pager.clone(),
657                        )?;
658                        if matches!(res, StepResult::Done) {
659                            break;
660                        }
661                        self._db.io.run_once()?;
662                    }
663                }
664            }
665        }
666        Ok(())
667    }
668
    /// Returns the WAL frame count as reported by the pager.
    pub fn wal_frame_count(&self) -> Result<u64> {
        self.pager.wal_frame_count()
    }
672
    /// Asks the pager for WAL frame `frame_no`, passing the destination pointer and
    /// length through unchanged, and returns the pager's completion handle.
    // NOTE(review): `p_frame` is a raw pointer; presumably it must reference at least
    // `frame_len` writable bytes that stay valid until the completion fires — confirm
    // against `Pager::wal_get_frame`.
    pub fn wal_get_frame(
        &self,
        frame_no: u32,
        p_frame: *mut u8,
        frame_len: u32,
    ) -> Result<Arc<Completion>> {
        self.pager.wal_get_frame(frame_no, p_frame, frame_len)
    }
681
    /// Flush dirty pages to disk.
    /// This will write the dirty pages to the WAL and then fsync the WAL.
    /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
    /// the database file and then fsync the database file.
    ///
    /// Delegates to the pager's `cacheflush` and returns its status unchanged.
    pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
        self.pager.cacheflush()
    }
689
    /// Clears this connection's page cache via the pager. Always returns `Ok(())`.
    pub fn clear_page_cache(&self) -> Result<()> {
        self.pager.clear_page_cache();
        Ok(())
    }
694
695    pub fn checkpoint(&self) -> Result<CheckpointResult> {
696        let checkpoint_result = self.pager.wal_checkpoint();
697        Ok(checkpoint_result)
698    }
699
    /// Run a checkpoint in TRUNCATE mode (resets the WAL file to empty).
    ///
    /// Delegates to the pager with [`CheckpointMode::Truncate`] and forwards its result.
    pub fn checkpoint_truncate(&self) -> Result<CheckpointResult> {
        self.pager.wal_checkpoint_mode(CheckpointMode::Truncate)
    }
704
705    /// Close a connection, checkpointing and truncating the WAL so the database
706    /// file is self-contained. Idempotent: a second call is a no-op.
707    pub fn close(&self) -> Result<()> {
708        if self.closed.replace(true) {
709            return Ok(());
710        }
711        self.pager.checkpoint_shutdown()
712    }
713
    /// Returns the rowid most recently stored by `update_last_rowid` (0 for a new connection).
    pub fn last_insert_rowid(&self) -> i64 {
        self.last_insert_rowid.get()
    }
717
    /// Records `rowid` as the value returned by `last_insert_rowid()`.
    fn update_last_rowid(&self, rowid: i64) {
        self.last_insert_rowid.set(rowid);
    }
721
722    pub fn set_changes(&self, nchange: i64) {
723        self.last_change.set(nchange);
724        let prev_total_changes = self.total_changes.get();
725        self.total_changes.set(prev_total_changes + nchange);
726    }
727
    /// Return the number of rows changed by the most recent DML statement.
    ///
    /// Mirrors `sqlite3_changes()` semantics: DDL and `BEGIN`/`COMMIT`/`ROLLBACK`
    /// return 0.  The count is updated by [`Connection::set_changes`] when a write transaction
    /// commits.
    ///
    /// The value returned is whatever the last `set_changes` call stored (0 initially).
    pub fn changes(&self) -> i64 {
        self.last_change.get()
    }
736
    /// Returns the sum of all counts passed to `set_changes` on this connection.
    pub fn total_changes(&self) -> i64 {
        self.total_changes.get()
    }
740
    /// Returns this connection's cache-size setting (initially the header's
    /// `default_page_cache_size`).
    pub fn get_cache_size(&self) -> i32 {
        self.cache_size.get()
    }
    /// Stores `size` as this connection's cache-size setting; only the stored value changes here.
    pub fn set_cache_size(&self, size: i32) {
        self.cache_size.set(size);
    }
747
    /// Opens another database at `path` on the VFS named `vfs`, delegating to
    /// `Database::open_with_vfs` on this connection's database.
    // NOTE(review): `Database::open_with_vfs` is defined outside the visible part of this
    // file; its lookup rules are presumably similar to `Database::open_new` — confirm.
    #[cfg(feature = "fs")]
    pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc<dyn IO>, Arc<Database>)> {
        Database::open_with_vfs(&self._db, path, vfs)
    }
752
753    pub fn list_vfs(&self) -> Vec<String> {
754        let mut all_vfs = vec![String::from("memory")];
755        #[cfg(feature = "fs")]
756        {
757            #[cfg(target_family = "unix")]
758            {
759                all_vfs.push("syscall".to_string());
760            }
761            #[cfg(all(target_os = "linux", feature = "io_uring"))]
762            {
763                all_vfs.push("io_uring".to_string());
764            }
765            all_vfs.extend(crate::ext::list_vfs_modules());
766        }
767        all_vfs
768    }
769
    /// Returns the current auto-commit flag (`true` for a new connection).
    pub fn get_auto_commit(&self) -> bool {
        self.auto_commit.get()
    }
773
774    pub fn parse_schema_rows(self: &Arc<Connection>) -> Result<()> {
775        let rows = self.query("SELECT * FROM sqlite_schema")?;
776        {
777            let mut schema = self
778                .schema
779                .try_write()
780                .expect("lock on schema should succeed first try");
781            let syms = self.syms.borrow();
782            if let Err(LimboError::ExtensionError(e)) =
783                parse_schema_rows(rows, &mut schema, self.pager.io.clone(), &syms, None)
784            {
785                // this means that a vtab exists and we no longer have the module loaded. we print
786                // a warning to the user to load the module
787                eprintln!("Warning: {}", e);
788            }
789        } // schema write guard dropped here
790        self.load_persistent_stats()?;
791        Ok(())
792    }
793
794    /// Load persisted `ANALYZE` statistics (`sqlite_stat1`) into the schema's
795    /// in-memory side-map. No-op (bit-for-bit unchanged) when `sqlite_stat1`
796    /// does not exist, i.e. for databases that have never been analyzed.
797    pub fn load_persistent_stats(self: &Arc<Connection>) -> Result<()> {
798        {
799            let schema = self.schema.read();
800            if schema.get_table("sqlite_stat1").is_none() {
801                return Ok(());
802            }
803        } // read lock dropped before preparing / write-locking (no deadlock)
804        let rows = self.query("SELECT tbl, idx, stat FROM sqlite_stat1")?;
805        let mut schema = self.schema.write();
806        schema.stats.clear();
807        crate::util::load_stat1(rows, &mut schema, self.pager.io.clone(), None)?;
808        Ok(())
809    }
810
811    // Clearly there is something to improve here, Vec<Vec<Value>> isn't a couple of tea
812    /// Query the current rows/values of `pragma_name`.
813    pub fn pragma_query(self: &Arc<Connection>, pragma_name: &str) -> Result<Vec<Vec<Value>>> {
814        let pragma = format!("PRAGMA {}", pragma_name);
815        let mut stmt = self.prepare(pragma)?;
816        let mut results = Vec::new();
817        loop {
818            match stmt.step()? {
819                vdbe::StepResult::Row => {
820                    let row: Vec<Value> = stmt
821                        .row()
822                        .expect("invariant: row available after StepResult::Row") // UPSTREAM (Limbo): unwrap — needs proper error propagation
823                        .get_values()
824                        .map(|v| v.clone())
825                        .collect();
826                    results.push(row);
827                }
828                vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
829                    return Err(LimboError::Busy);
830                }
831                _ => break,
832            }
833        }
834
835        Ok(results)
836    }
837
838    /// Set a new value to `pragma_name`.
839    ///
840    /// Some pragmas will return the updated value which cannot be retrieved
841    /// with this method.
842    pub fn pragma_update<V: Display>(
843        self: &Arc<Connection>,
844        pragma_name: &str,
845        pragma_value: V,
846    ) -> Result<Vec<Vec<Value>>> {
847        let pragma = format!("PRAGMA {} = {}", pragma_name, pragma_value);
848        let mut stmt = self.prepare(pragma)?;
849        let mut results = Vec::new();
850        loop {
851            match stmt.step()? {
852                vdbe::StepResult::Row => {
853                    let row: Vec<Value> = stmt
854                        .row()
855                        .expect("invariant: row available after StepResult::Row") // UPSTREAM (Limbo): unwrap — needs proper error propagation
856                        .get_values()
857                        .map(|v| v.clone())
858                        .collect();
859                    results.push(row);
860                }
861                vdbe::StepResult::IO => {
862                    // The async IO model requires re-polling until the pager
863                    // finishes flushing; continue the loop instead of breaking.
864                    stmt.run_once()?;
865                }
866                vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
867                    return Err(LimboError::Busy);
868                }
869                _ => break,
870            }
871        }
872
873        Ok(results)
874    }
875
876    /// Query the current value(s) of `pragma_name` associated to
877    /// `pragma_value`.
878    ///
879    /// This method can be used with query-only pragmas which need an argument
880    /// (e.g. `table_info('one_tbl')`) or pragmas which returns value(s)
881    /// (e.g. `integrity_check`).
882    pub fn pragma<V: Display>(
883        self: &Arc<Connection>,
884        pragma_name: &str,
885        pragma_value: V,
886    ) -> Result<Vec<Vec<Value>>> {
887        let pragma = format!("PRAGMA {}({})", pragma_name, pragma_value);
888        let mut stmt = self.prepare(pragma)?;
889        let mut results = Vec::new();
890        loop {
891            match stmt.step()? {
892                vdbe::StepResult::Row => {
893                    let row: Vec<Value> = stmt
894                        .row()
895                        .expect("invariant: row available after StepResult::Row") // UPSTREAM (Limbo): unwrap — needs proper error propagation
896                        .get_values()
897                        .map(|v| v.clone())
898                        .collect();
899                    results.push(row);
900                }
901                vdbe::StepResult::IO => {
902                    // The async IO model requires re-polling until the pager
903                    // finishes flushing; continue the loop instead of breaking.
904                    stmt.run_once()?;
905                }
906                vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
907                    return Err(LimboError::Busy);
908                }
909                _ => break,
910            }
911        }
912
913        Ok(results)
914    }
915}
916
917impl Drop for Connection {
918    fn drop(&mut self) {
919        // Best-effort clean close: checkpoint + truncate the WAL so a clean
920        // process exit leaves the `.db` self-contained for a byte-level reader.
921        // This must never panic or propagate errors from a destructor.
922        if self.closed.replace(true) {
923            return;
924        }
925        if let Err(e) = self.pager.checkpoint_shutdown() {
926            tracing::warn!("Connection::drop best-effort checkpoint failed: {e}");
927        }
928    }
929}
930
/// A prepared statement: a compiled VDBE program together with the mutable
/// state needed to step it.
pub struct Statement {
    /// Compiled program that `step` executes; also the source of result-column
    /// and parameter metadata.
    program: Rc<vdbe::Program>,
    /// Per-execution state (bound parameters, current result row, ...),
    /// sized from the program's register and cursor counts in `new`.
    state: vdbe::ProgramState,
    /// Optional MVCC store, handed to the program on every `step`.
    mv_store: Option<Rc<MvStore>>,
    /// Pager handed to the program on every `step`; its `io` is what
    /// `run_once` drives.
    pager: Rc<Pager>,
}
937
938impl Statement {
939    pub fn new(
940        program: Rc<vdbe::Program>,
941        mv_store: Option<Rc<MvStore>>,
942        pager: Rc<Pager>,
943    ) -> Self {
944        let state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
945        Self {
946            program,
947            state,
948            mv_store,
949            pager,
950        }
951    }
952
953    pub fn set_mv_tx_id(&mut self, mv_tx_id: Option<u64>) {
954        self.state.mv_tx_id = mv_tx_id;
955    }
956
957    pub fn interrupt(&mut self) {
958        self.state.interrupt();
959    }
960
961    pub fn step(&mut self) -> Result<StepResult> {
962        self.program
963            .step(&mut self.state, self.mv_store.clone(), self.pager.clone())
964    }
965
966    pub fn run_once(&self) -> Result<()> {
967        self.pager.io.run_once()
968    }
969
970    pub fn num_columns(&self) -> usize {
971        self.program.result_columns.len()
972    }
973
974    pub fn get_column_name(&self, idx: usize) -> Cow<str> {
975        let column = &self.program.result_columns.get(idx).expect("No column");
976        match column.name(&self.program.table_references) {
977            Some(name) => Cow::Borrowed(name),
978            None => Cow::Owned(column.expr.to_string()),
979        }
980    }
981
982    /// Return the declared SQL type string for result column `idx`, if available.
983    ///
984    /// The declared type is the text written after the column name in `CREATE TABLE`
985    /// (e.g. `"DATE"`, `"TIMESTAMP"`, `"UUID"`, `"INTEGER"`, …).  For computed
986    /// expressions or columns from sub-selects the method returns `None`.
987    pub fn get_column_decl_type(&self, idx: usize) -> Option<Cow<str>> {
988        let column = self.program.result_columns.get(idx)?;
989        match &column.expr {
990            limbo_sqlite3_parser::ast::Expr::Column {
991                table,
992                column: col_idx,
993                ..
994            } => self
995                .program
996                .table_references
997                .find_table_by_internal_id(*table)
998                .and_then(|tbl| tbl.get_column_at(*col_idx))
999                .map(|c| Cow::Owned(c.ty_str.clone())),
1000            _ => None,
1001        }
1002    }
1003
1004    pub fn parameters(&self) -> &parameters::Parameters {
1005        &self.program.parameters
1006    }
1007
1008    pub fn parameters_count(&self) -> usize {
1009        self.program.parameters.count()
1010    }
1011
1012    pub fn bind_at(&mut self, index: NonZero<usize>, value: Value) {
1013        self.state.bind_at(index, value);
1014    }
1015
1016    pub fn reset(&mut self) {
1017        self.state.reset();
1018        self.program.n_change.set(0);
1019    }
1020
1021    pub fn row(&self) -> Option<&Row> {
1022        self.state.result_row.as_ref()
1023    }
1024
1025    pub fn explain(&self) -> String {
1026        self.program.explain()
1027    }
1028}
1029
/// Crate-root alias for [`vdbe::Row`], the row type returned by `Statement::row`.
pub type Row = vdbe::Row;
1031
/// Crate-root alias for [`vdbe::StepResult`], the outcome of `Statement::step`.
pub type StepResult = vdbe::StepResult;
1033
/// Registry of externally provided symbols, keyed by name.
///
/// Note that the `Debug` implementation only prints `functions`.
pub struct SymbolTable {
    /// External functions by name; this is what `resolve_function` looks up.
    pub functions: HashMap<String, Rc<function::ExternalFunc>>,
    /// Virtual tables by name (presumably instantiated tables — confirm against callers).
    pub vtabs: HashMap<String, Rc<VirtualTable>>,
    /// Virtual-table module implementations by name (presumably the loaded
    /// extension modules — confirm against callers).
    pub vtab_modules: HashMap<String, Rc<crate::ext::VTabImpl>>,
}
1039
1040impl std::fmt::Debug for SymbolTable {
1041    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1042        f.debug_struct("SymbolTable")
1043            .field("functions", &self.functions)
1044            .finish()
1045    }
1046}
1047
/// Report whether `path` ends in one of the shared-library extensions
/// `so`, `dylib` or `dll`.
///
/// The comparison is exact (case-sensitive); a path without an extension,
/// or with a non-UTF-8 extension, is not a shared library.
fn is_shared_library(path: &std::path::Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => matches!(ext, "so" | "dylib" | "dll"),
        None => false,
    }
}
1052
1053pub fn resolve_ext_path(extpath: &str) -> Result<std::path::PathBuf> {
1054    let path = std::path::Path::new(extpath);
1055    if !path.exists() {
1056        if is_shared_library(path) {
1057            return Err(LimboError::ExtensionError(format!(
1058                "Extension file not found: {}",
1059                extpath
1060            )));
1061        };
1062        let maybe = path.with_extension(std::env::consts::DLL_EXTENSION);
1063        maybe
1064            .exists()
1065            .then_some(maybe)
1066            .ok_or(LimboError::ExtensionError(format!(
1067                "Extension file not found: {}",
1068                extpath
1069            )))
1070    } else {
1071        Ok(path.to_path_buf())
1072    }
1073}
1074
1075impl SymbolTable {
1076    pub fn new() -> Self {
1077        Self {
1078            functions: HashMap::new(),
1079            vtabs: HashMap::new(),
1080            vtab_modules: HashMap::new(),
1081        }
1082    }
1083
1084    pub fn resolve_function(
1085        &self,
1086        name: &str,
1087        _arg_count: usize,
1088    ) -> Option<Rc<function::ExternalFunc>> {
1089        self.functions.get(name).cloned()
1090    }
1091}
1092
/// Iterator that parses a buffer of SQL statements and runs each one on a
/// connection, yielding the result of every command.
pub struct QueryRunner<'a> {
    /// Parser over `statements`; yields one command per iteration.
    parser: Parser<'a>,
    /// Connection each parsed command is run on.
    conn: &'a Arc<Connection>,
    /// The raw statement bytes being parsed; sliced to recover each
    /// command's source text.
    statements: &'a [u8],
    /// Parser byte offset recorded after the previously yielded command;
    /// starts at 0.
    last_offset: usize,
}
1099
1100impl<'a> QueryRunner<'a> {
1101    pub(crate) fn new(conn: &'a Arc<Connection>, statements: &'a [u8]) -> Self {
1102        Self {
1103            parser: Parser::new(statements),
1104            conn,
1105            statements,
1106            last_offset: 0,
1107        }
1108    }
1109}
1110
1111impl Iterator for QueryRunner<'_> {
1112    type Item = Result<Option<Statement>>;
1113
1114    fn next(&mut self) -> Option<Self::Item> {
1115        match self.parser.next() {
1116            Ok(Some(cmd)) => {
1117                let byte_offset_end = self.parser.offset();
1118                let input = str::from_utf8(&self.statements[self.last_offset..byte_offset_end])
1119                    .expect("invariant: statements are valid UTF-8 bytes slice") // UPSTREAM (Limbo): unwrap — needs proper error propagation
1120                    .trim();
1121                self.last_offset = byte_offset_end;
1122                Some(self.conn.run_cmd(cmd, &input))
1123            }
1124            Ok(None) => None,
1125            Err(err) => {
1126                self.parser.finalize();
1127                Some(Result::Err(LimboError::from(err)))
1128            }
1129        }
1130    }
1131}