keyvaluedb_sqlite/
lib.rs

1#![deny(clippy::all)]
2
3mod tools;
4
5pub use async_sqlite::rusqlite::OpenFlags;
6use async_sqlite::rusqlite::{params, OptionalExtension as _};
7use async_sqlite::*;
8use keyvaluedb::{
9    DBKeyRef, DBKeyValue, DBKeyValueRef, DBOp, DBTransaction, DBTransactionError, DBValue, IoStats,
10    IoStatsKind, KeyValueDB,
11};
12use parking_lot::Mutex;
13use std::sync::Arc;
14use std::{
15    future::Future,
16    io,
17    path::{Path, PathBuf},
18    pin::Pin,
19    str::FromStr,
20};
21use tools::*;
22
23///////////////////////////////////////////////////////////////////////////////
24
/// Strategy used to reclaim unused pages in the SQLite database file.
///
/// `Database::open` derives the `auto_vacuum` pragma from this value
/// (`Incremental` selects mode 2, every other variant mode 0) and
/// `Database::vacuum` picks the statement it executes from it.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum VacuumMode {
    /// `vacuum()` only truncates the WAL. This is the mode used by
    /// `DatabaseConfig::default()`.
    #[default]
    None,
    /// `vacuum()` runs `PRAGMA incremental_vacuum` before truncating the WAL.
    Incremental,
    /// `vacuum()` runs a full `VACUUM` before truncating the WAL.
    Full,
}
31
32/// Database configuration
33#[derive(Clone)]
34pub struct DatabaseConfig {
35    /// Set number of columns.
36    /// The number of columns must not be zero.
37    pub columns: u32,
38    /// Set flags used to open the database
39    pub flags: OpenFlags,
40    /// Number of connections to open
41    pub num_conns: usize,
42    /// Vacuum mode
43    pub vacuum_mode: VacuumMode,
44}
45
46impl DatabaseConfig {
47    /// Create new `DatabaseConfig` with default parameters
48    pub fn new() -> Self {
49        Default::default()
50    }
51
52    /// Set the number of columns. `columns` must not be zero.
53    pub fn with_columns(self, columns: u32) -> Self {
54        assert!(columns > 0, "the number of columns must not be zero");
55        Self { columns, ..self }
56    }
57
58    /// Sets the flags to 'in-memory database'
59    pub fn with_in_memory(self) -> Self {
60        Self {
61            flags: OpenFlags::SQLITE_OPEN_READ_WRITE
62                | OpenFlags::SQLITE_OPEN_CREATE
63                | OpenFlags::SQLITE_OPEN_NO_MUTEX
64                | OpenFlags::SQLITE_OPEN_MEMORY,
65            ..self
66        }
67    }
68
69    /// Replaces all the flags
70    pub fn with_flags(self, flags: OpenFlags) -> Self {
71        Self { flags, ..self }
72    }
73
74    /// Sets the number of connections for this database
75    pub fn with_num_conns(self, num_conns: usize) -> Self {
76        Self { num_conns, ..self }
77    }
78
79    /// Set the vacuum mode used by 'cleanup'
80    pub fn with_vacuum_mode(self, vacuum_mode: VacuumMode) -> Self {
81        Self {
82            vacuum_mode,
83            ..self
84        }
85    }
86}
87
88impl Default for DatabaseConfig {
89    fn default() -> DatabaseConfig {
90        DatabaseConfig {
91            columns: 1,
92            flags: OpenFlags::SQLITE_OPEN_READ_WRITE
93                | OpenFlags::SQLITE_OPEN_CREATE
94                | OpenFlags::SQLITE_OPEN_NO_MUTEX,
95            num_conns: 1,
96            vacuum_mode: VacuumMode::None,
97        }
98    }
99}
100
101///////////////////////////////////////////////////////////////////////////////
102
/// A SQLite table name together with the pre-rendered SQL text of every
/// statement the database issues against that table.
///
/// The strings are built once so they can be handed to the prepared
/// statement cache without re-formatting on each call.
pub struct DatabaseTable {
    _table: String,
    str_has_value: String,
    str_has_value_like: String,
    str_get_unique_value: String,
    str_get_first_value_like: String,
    str_set_unique_value: String,
    str_remove_unique_value: String,
    str_remove_and_return_unique_value: String,
    str_remove_unique_value_like: String,
    str_iter_with_prefix: String,
    str_iter_no_prefix: String,
    str_iter_keys_with_prefix: String,
    str_iter_keys_no_prefix: String,
}

impl DatabaseTable {
    /// Render all statement strings for the table called `table`.
    ///
    /// The name is interpolated verbatim into the SQL text.
    pub fn new(table: String) -> Self {
        Self {
            // Existence checks
            str_has_value: format!("SELECT 1 FROM {table} WHERE [key] = ? LIMIT 1"),
            str_has_value_like: format!(
                "SELECT 1 FROM {table} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1"
            ),
            // Point reads
            str_get_unique_value: format!("SELECT value FROM {table} WHERE [key] = ? LIMIT 1"),
            str_get_first_value_like: format!(
                "SELECT key, value FROM {table} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1"
            ),
            // Writes
            str_set_unique_value: format!(
                "INSERT OR REPLACE INTO {table} ([key], value) VALUES(?, ?)"
            ),
            // Deletes
            str_remove_unique_value: format!("DELETE FROM {table} WHERE [key] = ?"),
            str_remove_and_return_unique_value: format!(
                "DELETE FROM {table} WHERE [key] = ? RETURNING value"
            ),
            str_remove_unique_value_like: format!(
                "DELETE FROM {table} WHERE [key] LIKE ? ESCAPE '\\'"
            ),
            // Iteration
            str_iter_with_prefix: format!(
                "SELECT key, value FROM {table} WHERE [key] LIKE ? ESCAPE '\\'"
            ),
            str_iter_no_prefix: format!("SELECT key, value FROM {table}"),
            str_iter_keys_with_prefix: format!(
                "SELECT key FROM {table} WHERE [key] LIKE ? ESCAPE '\\'"
            ),
            str_iter_keys_no_prefix: format!("SELECT key FROM {table}"),
            // Moved last so the borrows above are finished.
            _table: table,
        }
    }
}
167
168///////////////////////////////////////////////////////////////////////////////
169
/// State shared by every clone of a `Database` that is read without taking
/// the statistics mutex: where the database lives, how it was configured,
/// the connection pool, and the pre-rendered statement tables.
pub struct DatabaseUnlockedInner {
    // Path the pool was opened with.
    path: PathBuf,
    // Configuration supplied to `Database::open`.
    config: DatabaseConfig,
    // Connection pool used for every query.
    pool: Pool,
    // Statements for the `control` table.
    control_table: Arc<DatabaseTable>,
    // Statements for each column table, indexed by column number.
    column_tables: Vec<Arc<DatabaseTable>>,
}
178
/// Mutable I/O statistics, kept behind the mutex in `Database`.
///
/// Both counters are incremented together by the `stats_*` helpers.
/// NOTE(review): presumably `current_stats` is reset periodically while
/// `overall_stats` accumulates for the database's lifetime — confirm against
/// the code that reads them.
pub struct DatabaseInner {
    overall_stats: IoStats,
    current_stats: IoStats,
}
183
/// An sqlite key-value database fulfilling the `KeyValueDB` trait.
///
/// Cloning is cheap: both fields are `Arc`s, so all clones share the same
/// connection pool and statistics.
#[derive(Clone)]
pub struct Database {
    // Lock-free shared state (path, config, pool, statement tables).
    unlocked_inner: Arc<DatabaseUnlockedInner>,
    // I/O statistics guarded by a mutex.
    inner: Arc<Mutex<DatabaseInner>>,
}
189
190impl Database {
191    ////////////////////////////////////////////////////////////////
192    // Initialization
193
194    pub fn open<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> io::Result<Self> {
195        assert_ne!(config.columns, 0, "number of columns must be >= 1");
196
197        let path = PathBuf::from(path.as_ref());
198        let flags = config.flags;
199
200        let mut column_tables = vec![];
201        for n in 0..config.columns {
202            column_tables.push(Arc::new(DatabaseTable::new(get_column_table_name(n))))
203        }
204        let control_table = Arc::new(DatabaseTable::new("control".to_string()));
205
206        let pool_builder = PoolBuilder::new()
207            .path(&path)
208            .flags(flags)
209            .num_conns(config.num_conns);
210
211        let pool = pool_builder.open_blocking().map_err(io::Error::other)?;
212
213        let out = Self {
214            unlocked_inner: Arc::new(DatabaseUnlockedInner {
215                path,
216                config,
217                pool,
218                control_table,
219                column_tables,
220            }),
221            inner: Arc::new(Mutex::new(DatabaseInner {
222                overall_stats: IoStats::empty(),
223                current_stats: IoStats::empty(),
224            })),
225        };
226
227        let vacuum_mode = out.config().vacuum_mode;
228
229        out.conn_blocking(move |conn| {
230            // Don't rely on STATEMENT_CACHE_DEFAULT_CAPACITY in rusqlite, set it explicitly
231            conn.set_prepared_statement_cache_capacity(256);
232
233            conn.pragma_update(None, "case_sensitive_like", "ON")?;
234            conn.pragma_update(None, "journal_mode", "WAL")?;
235            conn.pragma_update(None, "synchronous", "normal")?;
236            conn.pragma_update(None, "journal_size_limit", 6144000)?;
237            conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
238
239            match vacuum_mode {
240                VacuumMode::None | VacuumMode::Full => {
241                    let current: u32 =
242                        conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
243                    if current != 0 {
244                        conn.execute("VACUUM", [])?;
245                        conn.pragma_update(None, "auto_vacuum", 0)?;
246                    }
247                }
248                VacuumMode::Incremental => {
249                    let current: u32 =
250                        conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
251                    if current != 2 {
252                        conn.execute("VACUUM", [])?;
253                        conn.pragma_update(None, "auto_vacuum", "2")?;
254                    }
255                }
256            }
257
258            Ok(())
259        })
260        .map_err(io::Error::other)?;
261
262        out.open_resize_columns()?;
263
264        Ok(out)
265    }
266
267    pub fn path(&self) -> PathBuf {
268        self.unlocked_inner.path.clone()
269    }
270
271    pub fn config(&self) -> DatabaseConfig {
272        self.unlocked_inner.config.clone()
273    }
274
275    pub fn columns(&self) -> u32 {
276        self.unlocked_inner.config.columns
277    }
278
279    pub fn control_table(&self) -> Arc<DatabaseTable> {
280        self.unlocked_inner.control_table.clone()
281    }
282
283    pub fn column_table(&self, col: u32) -> Arc<DatabaseTable> {
284        self.unlocked_inner.column_tables[col as usize].clone()
285    }
286
287    pub fn conn_blocking<T, F>(&self, func: F) -> Result<T, Error>
288    where
289        F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
290        T: Send + 'static,
291    {
292        self.unlocked_inner.pool.conn_blocking(func)
293    }
294
295    pub async fn conn<T, F>(&self, func: F) -> Result<T, Error>
296    where
297        F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
298        T: Send + 'static,
299    {
300        self.unlocked_inner.pool.conn(func).await
301    }
302
303    pub async fn conn_mut<T, F>(&self, func: F) -> Result<T, Error>
304    where
305        F: FnOnce(&mut rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
306        T: Send + 'static,
307    {
308        self.unlocked_inner.pool.conn_mut(func).await
309    }
310
311    ////////////////////////////////////////////////////////////////
312    // Low level operations
313
314    /// Remove the last column family in the database. The deletion is definitive.
315    pub fn remove_last_column(&self) -> Result<(), Error> {
316        let this = self.clone();
317        self.conn_blocking(move |conn| {
318            let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
319            if columns == 0 {
320                return Err(rusqlite::Error::QueryReturnedNoRows);
321            }
322            Self::set_unique_value(conn, this.control_table(), "columns", columns - 1)?;
323
324            conn.execute(
325                &format!("DROP TABLE {}", get_column_table_name(columns - 1)),
326                [],
327            )?;
328            Ok(())
329        })
330    }
331
332    /// Add a new column family to the DB.
333    pub fn add_column(&self) -> Result<(), Error> {
334        let this = self.clone();
335
336        self.conn_blocking(move |conn| {
337            let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
338            Self::set_unique_value(conn, this.control_table(), "columns", columns + 1)?;
339            Self::create_column_table(conn, columns)
340        })
341    }
    /// Helper to create a new, empty transaction for this database.
    ///
    /// The returned value is independent of `self`; pass it to `write` to apply it.
    pub fn transaction(&self) -> DBTransaction {
        DBTransaction::new()
    }
346
347    /// Vacuum database
348    pub async fn vacuum(&self) -> Result<(), Error> {
349        match self.config().vacuum_mode {
350            VacuumMode::None => {
351                self.conn(move |conn| {
352                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
353                    Ok(())
354                })
355                .await
356            }
357            VacuumMode::Incremental => {
358                self.conn(move |conn| {
359                    conn.execute("PRAGMA incremental_vacuum", [])?;
360                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
361                    Ok(())
362                })
363                .await
364            }
365            VacuumMode::Full => {
366                self.conn(move |conn| {
367                    conn.execute("VACUUM", [])?;
368                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
369                    Ok(())
370                })
371                .await
372            }
373        }
374    }
375
376    ////////////////////////////////////////////////////////////////
377    // Internal helpers
378
379    fn validate_column(&self, col: u32) -> rusqlite::Result<()> {
380        if col >= self.columns() {
381            return Err(rusqlite::Error::InvalidColumnIndex(col as usize));
382        }
383        Ok(())
384    }
385
386    fn create_column_table(conn: &rusqlite::Connection, column: u32) -> rusqlite::Result<()> {
387        conn.execute(&format!("CREATE TABLE IF NOT EXISTS {} (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value BLOB)", get_column_table_name(column)), []).map(drop)
388    }
389
390    fn get_unique_value<V>(
391        conn: &rusqlite::Connection,
392        table: Arc<DatabaseTable>,
393        key: &str,
394        default: V,
395    ) -> rusqlite::Result<V>
396    where
397        V: FromStr,
398    {
399        let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;
400
401        if let Ok(found) = stmt.query_row([key], |row| -> rusqlite::Result<String> { row.get(0) }) {
402            if let Ok(v) = V::from_str(&found) {
403                return Ok(v);
404            }
405        }
406        Ok(default)
407    }
408
409    fn set_unique_value<V>(
410        conn: &rusqlite::Connection,
411        table: Arc<DatabaseTable>,
412        key: &str,
413        value: V,
414    ) -> rusqlite::Result<()>
415    where
416        V: ToString,
417    {
418        let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;
419
420        let changed = stmt.execute([key, value.to_string().as_str()])?;
421
422        assert!(
423            changed <= 1,
424            "multiple changes to unique key should not occur"
425        );
426        if changed == 0 {
427            return Err(rusqlite::Error::QueryReturnedNoRows);
428        }
429
430        Ok(())
431    }
432
433    fn has_value(
434        conn: &rusqlite::Connection,
435        table: Arc<DatabaseTable>,
436        key: &str,
437    ) -> rusqlite::Result<bool> {
438        let mut stmt = conn.prepare_cached(&table.str_has_value)?;
439        stmt.exists([key])
440    }
441
442    fn has_value_like(
443        conn: &rusqlite::Connection,
444        table: Arc<DatabaseTable>,
445        key: &str,
446    ) -> rusqlite::Result<bool> {
447        let mut stmt = conn.prepare_cached(&table.str_has_value_like)?;
448        stmt.exists([key])
449    }
450
451    fn load_unique_value_blob(
452        conn: &rusqlite::Connection,
453        table: Arc<DatabaseTable>,
454        key: &str,
455    ) -> rusqlite::Result<Option<Vec<u8>>> {
456        let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;
457
458        stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
459            .optional()
460    }
461
462    fn load_first_value_blob_like(
463        conn: &rusqlite::Connection,
464        table: Arc<DatabaseTable>,
465        like: &str,
466    ) -> rusqlite::Result<Option<(String, Vec<u8>)>> {
467        let mut stmt = conn.prepare_cached(&table.str_get_first_value_like)?;
468
469        stmt.query_row([like], |row| -> rusqlite::Result<(String, Vec<u8>)> {
470            Ok((row.get(0)?, row.get(1)?))
471        })
472        .optional()
473    }
474
475    fn store_unique_value_blob(
476        conn: &rusqlite::Connection,
477        table: Arc<DatabaseTable>,
478        key: &str,
479        value: &[u8],
480    ) -> rusqlite::Result<()> {
481        let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;
482
483        let changed = stmt.execute(params![key, value])?;
484        assert!(
485            changed <= 1,
486            "multiple changes to unique key should not occur"
487        );
488        if changed == 0 {
489            return Err(rusqlite::Error::QueryReturnedNoRows);
490        }
491        Ok(())
492    }
493
494    fn remove_unique_value_blob(
495        conn: &rusqlite::Connection,
496        table: Arc<DatabaseTable>,
497        key: &str,
498    ) -> rusqlite::Result<()> {
499        let mut stmt = conn.prepare_cached(&table.str_remove_unique_value)?;
500
501        let changed = stmt.execute([key])?;
502        assert!(
503            changed <= 1,
504            "multiple deletions of unique key should not occur"
505        );
506        if changed == 0 {
507            return Err(rusqlite::Error::QueryReturnedNoRows);
508        }
509        Ok(())
510    }
511
512    fn remove_and_return_unique_value_blob(
513        conn: &rusqlite::Connection,
514        table: Arc<DatabaseTable>,
515        key: &str,
516    ) -> rusqlite::Result<Option<Vec<u8>>> {
517        let mut stmt = conn.prepare_cached(&table.str_remove_and_return_unique_value)?;
518
519        stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
520            .optional()
521    }
522
523    fn remove_unique_value_blob_like(
524        conn: &rusqlite::Connection,
525        table: Arc<DatabaseTable>,
526        like: &str,
527    ) -> rusqlite::Result<usize> {
528        let mut stmt = conn.prepare_cached(&table.str_remove_unique_value_like)?;
529
530        let changed = stmt.execute([like])?;
531        Ok(changed)
532    }
533
534    fn open_resize_columns(&self) -> io::Result<()> {
535        let columns = self.columns();
536        let this = self.clone();
537        self.conn_blocking(move |conn| {
538			// First see if we have a control table with the number of columns
539			conn.execute("CREATE TABLE IF NOT EXISTS control (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value TEXT)", [])?;
540
541            // Get column count
542            let on_disk_columns =
543                Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
544
545            // If desired column count is less than or equal to current column count, then allow it, but restrict access to columns
546            if columns <= on_disk_columns {
547                return Ok(());
548            }
549
550            // Otherwise resize and add other columns
551            for cn in on_disk_columns..columns {
552                // Create the column table if we don't have it
553                Self::create_column_table(conn, cn)?;
554            }
555            Self::set_unique_value(
556                conn,
557                this.control_table(),
558                "columns",
559                columns,
560            )?;
561            Ok(())
562        }).map_err(io::Error::other)
563    }
564
565    fn stats_read(&self, count: usize, bytes: usize) {
566        let mut inner = self.inner.lock();
567        inner.current_stats.reads += count as u64;
568        inner.overall_stats.reads += count as u64;
569        inner.current_stats.bytes_read += bytes as u64;
570        inner.overall_stats.bytes_read += bytes as u64;
571    }
572    fn stats_write(&self, count: usize, bytes: usize) {
573        let mut inner = self.inner.lock();
574        inner.current_stats.writes += count as u64;
575        inner.overall_stats.writes += count as u64;
576        inner.current_stats.bytes_written += bytes as u64;
577        inner.overall_stats.bytes_written += bytes as u64;
578    }
579    fn stats_transaction(&self, count: usize) {
580        let mut inner = self.inner.lock();
581        inner.current_stats.transactions += count as u64;
582        inner.overall_stats.transactions += count as u64;
583    }
584}
585
586impl KeyValueDB for Database {
587    fn get(
588        &self,
589        col: u32,
590        key: &[u8],
591    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + '_>> {
592        let key_text = key_to_text(key);
593        let key_len = key.len();
594
595        Box::pin(async move {
596            let that = self.clone();
597            that.validate_column(col).map_err(io::Error::other)?;
598            let someval = self
599                .conn_blocking(move |conn| {
600                    Self::load_unique_value_blob(conn, that.column_table(col), &key_text)
601                })
602                .map_err(io::Error::other)?;
603            {
604                match &someval {
605                    Some(val) => self.stats_read(1, key_len + val.len()),
606                    None => self.stats_read(1, key_len),
607                }
608            }
609
610            Ok(someval)
611        })
612    }
613
614    /// Remove a value by key, returning the old value
615    fn delete(
616        &self,
617        col: u32,
618        key: &[u8],
619    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + '_>> {
620        let key_text = key_to_text(key);
621        let key_len = key.len();
622
623        Box::pin(async move {
624            let that = self.clone();
625            that.validate_column(col).map_err(io::Error::other)?;
626            self.conn_blocking(move |conn| {
627                let someval = Self::remove_and_return_unique_value_blob(
628                    conn,
629                    that.column_table(col),
630                    &key_text,
631                )?;
632
633                match &someval {
634                    Some(val) => {
635                        that.stats_read(1, key_len + val.len());
636                    }
637                    None => that.stats_read(1, key_len),
638                }
639
640                Ok(someval)
641            })
642            .map_err(io::Error::other)
643        })
644    }
645
646    fn write(
647        &self,
648        transaction: DBTransaction,
649    ) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send + '_>> {
650        let transaction = Arc::new(transaction);
651        Box::pin(async move {
652            self.stats_transaction(1);
653
654            let that = self.clone();
655            let transaction_clone = transaction.clone();
656            self.conn_mut(move |conn| {
657                let mut sw = 0usize;
658                let mut sbw = 0usize;
659
660                let tx = conn.transaction()?;
661
662                for op in &transaction_clone.ops {
663                    match op {
664                        DBOp::Insert { col, key, value } => {
665                            that.validate_column(*col)?;
666                            Self::store_unique_value_blob(
667                                &tx,
668                                that.column_table(*col),
669                                &key_to_text(key),
670                                value,
671                            )?;
672                            sw += 1;
673                            sbw += key.len() + value.len();
674                        }
675                        DBOp::Delete { col, key } => {
676                            that.validate_column(*col)?;
677                            Self::remove_unique_value_blob(
678                                &tx,
679                                that.column_table(*col),
680                                &key_to_text(key),
681                            )?;
682                            sw += 1;
683                        }
684                        DBOp::DeletePrefix { col, prefix } => {
685                            that.validate_column(*col)?;
686                            Self::remove_unique_value_blob_like(
687                                &tx,
688                                that.column_table(*col),
689                                &(like_key_to_text(prefix) + "%"),
690                            )?;
691                            sw += 1;
692                        }
693                    }
694                }
695                tx.commit()?;
696
697                that.stats_write(sw, sbw);
698
699                Ok(())
700            })
701            .await
702            .map_err(io::Error::other)
703            .map_err(|error| {
704                let transaction = transaction.as_ref().clone();
705                DBTransactionError { error, transaction }
706            })
707        })
708    }
709
710    fn iter<
711        'a,
712        T: Send + 'static,
713        C: Send + 'static,
714        F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
715    >(
716        &'a self,
717        col: u32,
718        prefix: Option<&'a [u8]>,
719        context: C,
720        mut f: F,
721    ) -> Pin<Box<dyn Future<Output = io::Result<(C, Option<T>)>> + Send + 'a>> {
722        let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
723        Box::pin(async move {
724            if col >= self.columns() {
725                return Err(io::Error::from(io::ErrorKind::NotFound));
726            }
727
728            let that = self.clone();
729            let context = Arc::new(Mutex::new(Some(context)));
730            let context_ref = context.clone();
731
732            let res = self
733                .conn(move |conn| {
734                    let mut context = context_ref.lock();
735                    let context = context.as_mut().unwrap();
736
737                    let mut stmt;
738                    let mut rows;
739                    if let Some(prefix_query) = opt_prefix_query {
740                        stmt = match conn
741                            .prepare_cached(&that.column_table(col).str_iter_with_prefix)
742                        {
743                            Ok(v) => v,
744                            Err(e) => {
745                                return Ok(Err(io::Error::other(e)));
746                            }
747                        };
748                        rows = match stmt.query([prefix_query]) {
749                            Ok(v) => v,
750                            Err(e) => {
751                                return Ok(Err(io::Error::other(e)));
752                            }
753                        };
754                    } else {
755                        stmt = match conn.prepare_cached(&that.column_table(col).str_iter_no_prefix)
756                        {
757                            Ok(v) => v,
758                            Err(e) => {
759                                return Ok(Err(io::Error::other(e)));
760                            }
761                        };
762                        rows = match stmt.query([]) {
763                            Ok(v) => v,
764                            Err(e) => {
765                                return Ok(Err(io::Error::other(e)));
766                            }
767                        };
768                    }
769
770                    let mut sw = 0usize;
771                    let mut sbw = 0usize;
772
773                    let out = loop {
774                        match rows.next() {
775                            // Iterated value
776                            Ok(Some(row)) => {
777                                let kt: String = match row.get(0) {
778                                    Err(e) => {
779                                        break Err(io::Error::other(e));
780                                    }
781                                    Ok(v) => v,
782                                };
783                                let v: Vec<u8> = match row.get(1) {
784                                    Err(e) => {
785                                        break Err(io::Error::other(e));
786                                    }
787                                    Ok(v) => v,
788                                };
789                                let k: Vec<u8> = match text_to_key(&kt) {
790                                    Err(e) => {
791                                        break Err(io::Error::other(format!(
792                                            "SQLite row get column 0 text convert error: {:?}",
793                                            e
794                                        )));
795                                    }
796                                    Ok(v) => v,
797                                };
798
799                                sw += 1;
800                                sbw += k.len() + v.len();
801
802                                match f(context, (&k, &v)) {
803                                    Ok(None) => (),
804                                    Ok(Some(out)) => {
805                                        // Callback early termination
806                                        that.stats_read(sw, sbw);
807                                        break Ok(Some(out));
808                                    }
809                                    Err(e) => {
810                                        // Callback error termination
811                                        that.stats_read(sw, sbw);
812                                        break Err(e);
813                                    }
814                                }
815                            }
816                            // Natural iterator termination
817                            Ok(None) => {
818                                break Ok(None);
819                            }
820                            // Error iterator termination
821                            Err(_) => {
822                                break Ok(None);
823                            }
824                        }
825                    };
826
827                    that.stats_read(sw, sbw);
828
829                    Ok(out)
830                })
831                .await
832                .map_err(io::Error::other)?;
833
834            let context = context.lock().take().unwrap();
835
836            res.map(|x| (context, x))
837        })
838    }
839
840    fn iter_keys<
841        'a,
842        T: Send + 'static,
843        C: Send + 'static,
844        F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
845    >(
846        &'a self,
847        col: u32,
848        prefix: Option<&'a [u8]>,
849        context: C,
850        mut f: F,
851    ) -> Pin<Box<dyn Future<Output = io::Result<(C, Option<T>)>> + Send + 'a>> {
852        let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
853        Box::pin(async move {
854            if col >= self.columns() {
855                return Err(io::Error::from(io::ErrorKind::NotFound));
856            }
857
858            let that = self.clone();
859            let context = Arc::new(Mutex::new(Some(context)));
860            let context_ref = context.clone();
861
862            let res = self
863                .conn(move |conn| {
864                    let mut context = context_ref.lock();
865                    let context = context.as_mut().unwrap();
866
867                    let mut stmt;
868                    let mut rows;
869                    if let Some(prefix_query) = opt_prefix_query {
870                        stmt = match conn
871                            .prepare_cached(&that.column_table(col).str_iter_keys_with_prefix)
872                        {
873                            Ok(v) => v,
874                            Err(e) => {
875                                return Ok(Err(io::Error::other(e)));
876                            }
877                        };
878                        rows = match stmt.query([prefix_query]) {
879                            Ok(v) => v,
880                            Err(e) => {
881                                return Ok(Err(io::Error::other(e)));
882                            }
883                        };
884                    } else {
885                        stmt = match conn
886                            .prepare_cached(&that.column_table(col).str_iter_keys_no_prefix)
887                        {
888                            Ok(v) => v,
889                            Err(e) => {
890                                return Ok(Err(io::Error::other(e)));
891                            }
892                        };
893                        rows = match stmt.query([]) {
894                            Ok(v) => v,
895                            Err(e) => {
896                                return Ok(Err(io::Error::other(e)));
897                            }
898                        };
899                    }
900
901                    let mut sw = 0usize;
902                    let mut sbw = 0usize;
903
904                    let out = loop {
905                        match rows.next() {
906                            // Iterated value
907                            Ok(Some(row)) => {
908                                let kt: String = match row.get(0) {
909                                    Err(e) => {
910                                        break Err(io::Error::other(e));
911                                    }
912                                    Ok(v) => v,
913                                };
914                                let k: Vec<u8> = match text_to_key(&kt) {
915                                    Err(e) => {
916                                        break Err(io::Error::other(format!(
917                                            "SQLite row get column 0 text convert error: {:?}",
918                                            e
919                                        )));
920                                    }
921                                    Ok(v) => v,
922                                };
923
924                                sw += 1;
925                                sbw += k.len();
926
927                                match f(context, &k) {
928                                    Ok(None) => (),
929                                    Ok(Some(out)) => {
930                                        // Callback early termination
931                                        that.stats_read(sw, sbw);
932                                        break Ok(Some(out));
933                                    }
934                                    Err(e) => {
935                                        // Callback error termination
936                                        that.stats_read(sw, sbw);
937                                        break Err(e);
938                                    }
939                                }
940                            }
941                            // Natural iterator termination
942                            Ok(None) => {
943                                break Ok(None);
944                            }
945                            // Error iterator termination
946                            Err(_) => {
947                                break Ok(None);
948                            }
949                        }
950                    };
951
952                    that.stats_read(sw, sbw);
953
954                    Ok(out)
955                })
956                .await
957                .map_err(io::Error::other)?;
958
959            let context = context.lock().take().unwrap();
960
961            res.map(|x| (context, x))
962        })
963    }
964
965    fn io_stats(&self, kind: IoStatsKind) -> IoStats {
966        let mut inner = self.inner.lock();
967        match kind {
968            IoStatsKind::Overall => {
969                let mut stats = inner.overall_stats.clone();
970                stats.span = std::time::SystemTime::now()
971                    .duration_since(stats.started)
972                    .unwrap_or_default();
973                stats
974            }
975            IoStatsKind::SincePrevious => {
976                let mut stats = inner.current_stats.clone();
977                stats.span = std::time::SystemTime::now()
978                    .duration_since(stats.started)
979                    .unwrap_or_default();
980                inner.current_stats = IoStats::empty();
981                stats
982            }
983        }
984    }
985
986    fn num_columns(&self) -> io::Result<u32> {
987        let this = self.clone();
988        self.conn_blocking(move |conn| {
989            Self::get_unique_value(conn, this.control_table(), "columns", 0u32)
990        })
991        .map_err(io::Error::other)
992    }
993
994    fn num_keys(&self, col: u32) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send + '_>> {
995        let this = self.clone();
996        Box::pin(async move {
997            this.conn(move |conn| {
998                conn.query_row(
999                    &format!("SELECT Count(*) FROM {}", get_column_table_name(col)),
1000                    [],
1001                    |row| -> rusqlite::Result<u64> { row.get(0) },
1002                )
1003            })
1004            .await
1005            .map_err(|_| io::Error::from(io::ErrorKind::NotFound))
1006        })
1007    }
1008
1009    /// Check for the existence of a value by key.
1010    fn has_key<'a>(
1011        &'a self,
1012        col: u32,
1013        key: &'a [u8],
1014    ) -> Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>> {
1015        let key_text = key_to_text(key);
1016        let key_len = key.len();
1017
1018        Box::pin(async move {
1019            let that = self.clone();
1020            that.validate_column(col).map_err(io::Error::other)?;
1021            let someval = self
1022                .conn_blocking(move |conn| Self::has_value(conn, that.column_table(col), &key_text))
1023                .map_err(io::Error::other)?;
1024
1025            self.stats_read(1, key_len);
1026
1027            Ok(someval)
1028        })
1029    }
1030
1031    /// Check for the existence of a value by prefix.
1032    fn has_prefix<'a>(
1033        &'a self,
1034        col: u32,
1035        prefix: &'a [u8],
1036    ) -> Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>> {
1037        let prefix_len = prefix.len();
1038        let prefix_text = like_key_to_text(prefix) + "%";
1039
1040        Box::pin(async move {
1041            let that = self.clone();
1042            that.validate_column(col).map_err(io::Error::other)?;
1043            let someval = self
1044                .conn_blocking(move |conn| {
1045                    Self::has_value_like(conn, that.column_table(col), &prefix_text)
1046                })
1047                .map_err(io::Error::other)?;
1048
1049            self.stats_read(1, prefix_len);
1050
1051            Ok(someval)
1052        })
1053    }
1054
1055    /// Get the first value matching the given prefix.
1056    fn first_with_prefix<'a>(
1057        &'a self,
1058        col: u32,
1059        prefix: &'a [u8],
1060    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBKeyValue>>> + Send + 'a>> {
1061        let prefix_len = prefix.len();
1062        let like = like_key_to_text(prefix) + "%";
1063
1064        Box::pin(async move {
1065            let that = self.clone();
1066            that.validate_column(col).map_err(io::Error::other)?;
1067            let someval = self
1068                .conn_blocking(move |conn| {
1069                    Self::load_first_value_blob_like(conn, that.column_table(col), &like)
1070                })
1071                .map_err(io::Error::other)?;
1072
1073            self.stats_read(1, prefix_len);
1074
1075            match someval {
1076                Some((kt, val)) => match text_to_key(&kt) {
1077                    Err(e) => {
1078                        return Err(io::Error::other(format!(
1079                            "SQLite row get column 0 text convert error: {:?}",
1080                            e
1081                        )))
1082                    }
1083                    Ok(k) => Ok(Some((k, val))),
1084                },
1085                None => Ok(None),
1086            }
1087        })
1088    }
1089
1090    /// Vacuum database
1091    fn cleanup(&self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>> {
1092        Box::pin(async { self.vacuum().await.map_err(io::Error::other) })
1093    }
1094}
1095
#[cfg(test)]
mod tests {

    use super::*;
    use keyvaluedb_shared_tests as st;
    use tempfile::Builder as TempfileBuilder;

    /// Produce a path for a test database.
    ///
    /// A temporary file is created with the given file-name `prefix` and only
    /// its path is handed back; the temp-file handle is dropped before this
    /// function returns.
    /// NOTE(review): presumably dropping the handle removes the file, so the
    /// database is created from scratch at that path -- confirm against the
    /// `tempfile` documentation.
    fn scratch_path(prefix: &str) -> io::Result<PathBuf> {
        let reserved = TempfileBuilder::new().prefix(prefix).tempfile()?;
        Ok(reserved.path().to_path_buf())
    }

    /// Open a database with `columns` columns at a scratch path.
    fn create(columns: u32) -> io::Result<Database> {
        let path = scratch_path("")?;
        Database::open(path, DatabaseConfig::new().with_columns(columns))
    }

    /// Open a database with `columns` columns and the given vacuum mode at a
    /// scratch path.
    fn create_vacuum_mode(columns: u32, vacuum_mode: VacuumMode) -> io::Result<Database> {
        let path = scratch_path("")?;
        Database::open(
            path,
            DatabaseConfig::new()
                .with_columns(columns)
                .with_vacuum_mode(vacuum_mode),
        )
    }

    #[tokio::test]
    async fn get_fails_with_non_existing_column() -> io::Result<()> {
        st::test_get_fails_with_non_existing_column(create(1)?).await
    }

    #[tokio::test]
    async fn num_keys() -> io::Result<()> {
        st::test_num_keys(create(1)?).await
    }

    #[tokio::test]
    async fn put_and_get() -> io::Result<()> {
        st::test_put_and_get(create(1)?).await
    }

    #[tokio::test]
    async fn delete_and_get() -> io::Result<()> {
        st::test_delete_and_get(create(1)?).await
    }

    #[tokio::test]
    async fn delete_and_get_single() -> io::Result<()> {
        st::test_delete_and_get_single(create(1)?).await
    }

    #[tokio::test]
    async fn delete_prefix() -> io::Result<()> {
        st::test_delete_prefix(create(st::DELETE_PREFIX_NUM_COLUMNS)?).await
    }

    #[tokio::test]
    async fn iter() -> io::Result<()> {
        st::test_iter(create(1)?).await
    }

    #[tokio::test]
    async fn iter_keys() -> io::Result<()> {
        st::test_iter_keys(create(1)?).await
    }

    #[tokio::test]
    async fn iter_with_prefix() -> io::Result<()> {
        st::test_iter_with_prefix(create(1)?).await
    }

    #[tokio::test]
    async fn complex() -> io::Result<()> {
        st::test_complex(create(1)?).await
    }

    #[tokio::test]
    async fn cleanup() -> io::Result<()> {
        // A database with the default configuration, then a separate
        // database for each vacuum mode.
        st::test_cleanup(create(1)?).await?;
        for mode in [
            VacuumMode::None,
            VacuumMode::Incremental,
            VacuumMode::Full,
        ] {
            st::test_cleanup(create_vacuum_mode(1, mode)?).await?;
        }

        // The same path opened repeatedly while the vacuum mode changes
        // between opens, ending back on `None`.
        let path = scratch_path("")?;
        for mode in [
            VacuumMode::None,
            VacuumMode::Incremental,
            VacuumMode::Full,
            VacuumMode::None,
        ] {
            let config = DatabaseConfig::new().with_vacuum_mode(mode);
            let db = Database::open(&path, config)?;
            st::test_cleanup(db).await?;
        }

        Ok(())
    }

    #[tokio::test]
    async fn stats() -> io::Result<()> {
        st::test_io_stats(create(st::IO_STATS_NUM_COLUMNS)?).await
    }

    #[tokio::test]
    #[should_panic]
    async fn db_config_with_zero_columns() {
        let _cfg = DatabaseConfig::new().with_columns(0);
    }

    #[tokio::test]
    #[should_panic]
    async fn open_db_with_zero_columns() {
        let cfg = DatabaseConfig::new().with_columns(0);
        let _db = Database::open("", cfg);
    }

    #[tokio::test]
    async fn add_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::new().with_columns(5);

        let path = scratch_path("").unwrap();

        // open 1, add 4.
        {
            let db = Database::open(&path, config_1).unwrap();
            assert_eq!(db.num_columns().unwrap(), 1);

            for expected in 2..=5 {
                db.add_column().unwrap();
                assert_eq!(db.num_columns().unwrap(), expected);
            }
        }

        // reopen as 5.
        {
            let db = Database::open(&path, config_5).unwrap();
            assert_eq!(db.num_columns().unwrap(), 5);
        }
    }

    #[tokio::test]
    async fn remove_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::new().with_columns(5);

        let path = scratch_path("drop_columns").unwrap();

        // open 5, remove 4.
        {
            let db = Database::open(&path, config_5).expect("open with 5 columns");
            assert_eq!(db.num_columns().unwrap(), 5);

            for expected in (1..5).rev() {
                db.remove_last_column().unwrap();
                assert_eq!(db.num_columns().unwrap(), expected);
            }
        }

        // reopen as 1.
        {
            let db = Database::open(&path, config_1).unwrap();
            assert_eq!(db.num_columns().unwrap(), 1);
        }
    }

    #[tokio::test]
    async fn test_num_keys() {
        let path = scratch_path("").unwrap();
        let config = DatabaseConfig::new().with_columns(1);
        let db = Database::open(path, config).unwrap();

        assert_eq!(
            db.num_keys(0).await.unwrap(),
            0,
            "database is empty after creation"
        );

        let key1 = b"beef";
        let mut batch = db.transaction();
        batch.put(0, key1, key1);
        db.write(batch).await.unwrap();

        assert_eq!(
            db.num_keys(0).await.unwrap(),
            1,
            "adding a key increases the count"
        );
    }
}