Skip to main content

keyvaluedb_sqlite/
lib.rs

1#![deny(clippy::all)]
2
3mod tools;
4
5pub use async_sqlite::rusqlite::OpenFlags;
6use async_sqlite::rusqlite::{params, OptionalExtension as _};
7use async_sqlite::*;
8use keyvaluedb::{
9    DBKeyRef, DBKeyValue, DBKeyValueRef, DBOp, DBTransaction, DBTransactionError, DBValue, IoStats,
10    IoStatsKind, KeyValueDB, KeyValueDBPinBoxFuture,
11};
12use parking_lot::Mutex;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use std::{
16    io,
17    path::{Path, PathBuf},
18    str::FromStr,
19};
20use tools::*;
21
22///////////////////////////////////////////////////////////////////////////////
23
/// Strategy used to reclaim free pages in the SQLite file.
///
/// The mode decides which `auto_vacuum` value `Database::open` requests and
/// what `Database::vacuum` executes.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum VacuumMode {
    /// `open` requests `auto_vacuum` 0; `vacuum` only checkpoints the WAL.
    None,
    /// `open` requests `auto_vacuum` 2; `vacuum` issues `PRAGMA incremental_vacuum`.
    Incremental,
    /// `open` requests `auto_vacuum` 0; `vacuum` issues a full `VACUUM`.
    Full,
}
30
/// Database configuration
///
/// Built with `DatabaseConfig::new()` / `Default` and refined with the
/// consuming `with_*` methods. The defaults are one column, one connection,
/// read-write/create/no-mutex open flags and `VacuumMode::None`.
#[derive(Clone)]
pub struct DatabaseConfig {
    /// Set number of columns.
    /// The number of columns must not be zero.
    pub columns: u32,
    /// Set flags used to open the database
    pub flags: OpenFlags,
    /// Number of connections to open
    pub num_conns: usize,
    /// Vacuum mode
    pub vacuum_mode: VacuumMode,
}
44
45impl DatabaseConfig {
46    /// Create new `DatabaseConfig` with default parameters
47    pub fn new() -> Self {
48        Default::default()
49    }
50
51    /// Set the number of columns. `columns` must not be zero.
52    pub fn with_columns(self, columns: u32) -> Self {
53        assert!(columns > 0, "the number of columns must not be zero");
54        Self { columns, ..self }
55    }
56
57    /// Sets the flags to 'in-memory database'
58    pub fn with_in_memory(self) -> Self {
59        Self {
60            flags: OpenFlags::SQLITE_OPEN_READ_WRITE
61                | OpenFlags::SQLITE_OPEN_CREATE
62                | OpenFlags::SQLITE_OPEN_NO_MUTEX
63                | OpenFlags::SQLITE_OPEN_MEMORY,
64            ..self
65        }
66    }
67
68    /// Replaces all the flags
69    pub fn with_flags(self, flags: OpenFlags) -> Self {
70        Self { flags, ..self }
71    }
72
73    /// Sets the number of connections for this database
74    pub fn with_num_conns(self, num_conns: usize) -> Self {
75        Self { num_conns, ..self }
76    }
77
78    /// Set the vacuum mode used by 'cleanup'
79    pub fn with_vacuum_mode(self, vacuum_mode: VacuumMode) -> Self {
80        Self {
81            vacuum_mode,
82            ..self
83        }
84    }
85}
86
87impl Default for DatabaseConfig {
88    fn default() -> DatabaseConfig {
89        DatabaseConfig {
90            columns: 1,
91            flags: OpenFlags::SQLITE_OPEN_READ_WRITE
92                | OpenFlags::SQLITE_OPEN_CREATE
93                | OpenFlags::SQLITE_OPEN_NO_MUTEX,
94            num_conns: 1,
95            vacuum_mode: VacuumMode::None,
96        }
97    }
98}
99
100///////////////////////////////////////////////////////////////////////////////
101
/// One SQLite table together with the SQL text of every statement the
/// database runs against it.
///
/// The statements are rendered once, at construction, so that the same
/// strings can be handed to the prepared-statement cache on each call.
pub struct DatabaseTable {
    // Table name the statements were rendered for.
    _table: String,
    // Existence checks: exact key / LIKE pattern.
    str_has_value: String,
    str_has_value_like: String,
    // Reads: value for an exact key / first (key, value) matching a LIKE pattern.
    str_get_unique_value: String,
    str_get_first_value_like: String,
    // Upsert of a (key, value) pair.
    str_set_unique_value: String,
    // Deletes: exact key, exact key returning the old value, LIKE pattern.
    str_remove_unique_value: String,
    str_remove_and_return_unique_value: String,
    str_remove_unique_value_like: String,
    // Scans over (key, value) pairs and over keys only, with or without a LIKE filter.
    str_iter_with_prefix: String,
    str_iter_no_prefix: String,
    str_iter_keys_with_prefix: String,
    str_iter_keys_no_prefix: String,
}

impl DatabaseTable {
    /// Render all statements for the table called `table`.
    ///
    /// `table` is spliced into the SQL text as-is.
    pub fn new(table: String) -> Self {
        Self {
            str_has_value: format!("SELECT 1 FROM {} WHERE [key] = ? LIMIT 1", table),
            str_has_value_like: format!(
                "SELECT 1 FROM {} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1",
                table
            ),
            str_get_unique_value: format!("SELECT value FROM {} WHERE [key] = ? LIMIT 1", table),
            str_get_first_value_like: format!(
                "SELECT key, value FROM {} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1",
                table
            ),
            str_set_unique_value: format!(
                "INSERT OR REPLACE INTO {} ([key], value) VALUES(?, ?)",
                table
            ),
            str_remove_unique_value: format!("DELETE FROM {} WHERE [key] = ?", table),
            str_remove_and_return_unique_value: format!(
                "DELETE FROM {} WHERE [key] = ? RETURNING value",
                table
            ),
            str_remove_unique_value_like: format!(
                "DELETE FROM {} WHERE [key] LIKE ? ESCAPE '\\'",
                table
            ),
            str_iter_with_prefix: format!(
                "SELECT key, value FROM {} WHERE [key] LIKE ? ESCAPE '\\'",
                table
            ),
            str_iter_no_prefix: format!("SELECT key, value FROM {}", table),
            str_iter_keys_with_prefix: format!(
                "SELECT key FROM {} WHERE [key] LIKE ? ESCAPE '\\'",
                table
            ),
            str_iter_keys_no_prefix: format!("SELECT key FROM {}", table),
            // Moved last: the statements above only borrow the name.
            _table: table,
        }
    }
}
166
167///////////////////////////////////////////////////////////////////////////////
168
/// State of an open database that is reached without taking the statistics
/// mutex. `Database` holds it behind an `Arc`, so clones share one instance.
pub struct DatabaseUnlockedInner {
    // Path the database was opened with.
    path: PathBuf,
    // Configuration supplied to `Database::open`.
    config: DatabaseConfig,
    // Connection pool every statement is executed through.
    pool: Pool,
    // Statements for the `control` table (stores the on-disk column count).
    control_table: Arc<DatabaseTable>,
    // Statements for each column table, indexed by column number.
    column_tables: Vec<Arc<DatabaseTable>>,
}
177
impl Drop for DatabaseUnlockedInner {
    /// Closes the connection pool when this value is dropped.
    fn drop(&mut self) {
        // `drop` has no way to report a failure, so the close result is discarded.
        let _ = self.pool.close_blocking();
    }
}
183
/// I/O statistics of a database; `Database` keeps this behind a mutex.
///
/// The `stats_*` helpers on `Database` apply every update to both fields.
pub struct DatabaseInner {
    // NOTE(review): presumably the lifetime totals — confirm against the `io_stats` implementation.
    overall_stats: IoStats,
    // NOTE(review): presumably the counters for the current reporting period — confirm.
    current_stats: IoStats,
}
188
/// An sqlite key-value database fulfilling the `KeyValueDB` trait.
///
/// Cloning is cheap: both fields are `Arc`s, so every clone refers to the same
/// connection pool and the same statistics.
#[derive(Clone)]
pub struct Database {
    // Path, configuration, pool and statement tables.
    unlocked_inner: Arc<DatabaseUnlockedInner>,
    // Mutex-protected I/O statistics.
    inner: Arc<Mutex<DatabaseInner>>,
}
194
195impl Database {
196    ////////////////////////////////////////////////////////////////
197    // Initialization
198
199    pub fn open<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> io::Result<Self> {
200        assert_ne!(config.columns, 0, "number of columns must be >= 1");
201
202        let path = PathBuf::from(path.as_ref());
203        let flags = config.flags;
204
205        let mut column_tables = vec![];
206        for n in 0..config.columns {
207            column_tables.push(Arc::new(DatabaseTable::new(get_column_table_name(n))))
208        }
209        let control_table = Arc::new(DatabaseTable::new("control".to_string()));
210
211        let pool_builder = PoolBuilder::new()
212            .path(&path)
213            .flags(flags)
214            .num_conns(config.num_conns);
215
216        let pool = pool_builder.open_blocking().map_err(io::Error::other)?;
217
218        let out = Self {
219            unlocked_inner: Arc::new(DatabaseUnlockedInner {
220                path,
221                config,
222                pool,
223                control_table,
224                column_tables,
225            }),
226            inner: Arc::new(Mutex::new(DatabaseInner {
227                overall_stats: IoStats::empty(),
228                current_stats: IoStats::empty(),
229            })),
230        };
231
232        let vacuum_mode = out.config().vacuum_mode;
233
234        out.conn_blocking(move |conn| {
235            // Don't rely on STATEMENT_CACHE_DEFAULT_CAPACITY in rusqlite, set it explicitly
236            conn.set_prepared_statement_cache_capacity(256);
237
238            conn.pragma_update(None, "case_sensitive_like", "ON")?;
239            conn.pragma_update(None, "journal_mode", "WAL")?;
240            conn.pragma_update(None, "synchronous", "normal")?;
241            conn.pragma_update(None, "journal_size_limit", 6144000)?;
242            conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
243
244            match vacuum_mode {
245                VacuumMode::None | VacuumMode::Full => {
246                    let current: u32 =
247                        conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
248                    if current != 0 {
249                        conn.execute("VACUUM", [])?;
250                        conn.pragma_update(None, "auto_vacuum", 0)?;
251                    }
252                }
253                VacuumMode::Incremental => {
254                    let current: u32 =
255                        conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
256                    if current != 2 {
257                        conn.execute("VACUUM", [])?;
258                        conn.pragma_update(None, "auto_vacuum", "2")?;
259                    }
260                }
261            }
262
263            Ok(())
264        })
265        .map_err(io::Error::other)?;
266
267        out.open_resize_columns()?;
268
269        Ok(out)
270    }
271
272    pub fn path(&self) -> PathBuf {
273        self.unlocked_inner.path.clone()
274    }
275
276    pub fn config(&self) -> DatabaseConfig {
277        self.unlocked_inner.config.clone()
278    }
279
280    pub fn columns(&self) -> u32 {
281        self.unlocked_inner.config.columns
282    }
283
284    pub fn control_table(&self) -> Arc<DatabaseTable> {
285        self.unlocked_inner.control_table.clone()
286    }
287
288    pub fn column_table(&self, col: u32) -> Arc<DatabaseTable> {
289        self.unlocked_inner.column_tables[col as usize].clone()
290    }
291
292    pub fn conn_blocking<T, F>(&self, func: F) -> Result<T, Error>
293    where
294        F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
295        T: Send + 'static,
296    {
297        self.unlocked_inner.pool.conn_blocking(func)
298    }
299
300    pub async fn conn<T, F>(&self, func: F) -> Result<T, Error>
301    where
302        F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
303        T: Send + 'static,
304    {
305        self.unlocked_inner.pool.conn(func).await
306    }
307
308    pub async fn conn_mut<T, F>(&self, func: F) -> Result<T, Error>
309    where
310        F: FnOnce(&mut rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
311        T: Send + 'static,
312    {
313        self.unlocked_inner.pool.conn_mut(func).await
314    }
315
316    ////////////////////////////////////////////////////////////////
317    // Low level operations
318
319    /// Remove the last column family in the database. The deletion is definitive.
320    pub fn remove_last_column(&self) -> Result<(), Error> {
321        let this = self.clone();
322        self.conn_blocking(move |conn| {
323            let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
324            if columns == 0 {
325                return Err(rusqlite::Error::QueryReturnedNoRows);
326            }
327            Self::set_unique_value(conn, this.control_table(), "columns", columns - 1)?;
328
329            conn.execute(
330                &format!("DROP TABLE {}", get_column_table_name(columns - 1)),
331                [],
332            )?;
333            Ok(())
334        })
335    }
336
337    /// Add a new column family to the DB.
338    pub fn add_column(&self) -> Result<(), Error> {
339        let this = self.clone();
340
341        self.conn_blocking(move |conn| {
342            let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
343            Self::set_unique_value(conn, this.control_table(), "columns", columns + 1)?;
344            Self::create_column_table(conn, columns)
345        })
346    }
    /// Helper to create new transaction for this database.
    ///
    /// The returned transaction is empty and does not borrow from `self`.
    pub fn transaction(&self) -> DBTransaction {
        DBTransaction::new()
    }
351
352    /// Vacuum database
353    pub async fn vacuum(&self) -> Result<(), Error> {
354        match self.config().vacuum_mode {
355            VacuumMode::None => {
356                self.conn(move |conn| {
357                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
358                    Ok(())
359                })
360                .await
361            }
362            VacuumMode::Incremental => {
363                self.conn(move |conn| {
364                    conn.execute("PRAGMA incremental_vacuum", [])?;
365                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
366                    Ok(())
367                })
368                .await
369            }
370            VacuumMode::Full => {
371                self.conn(move |conn| {
372                    conn.execute("VACUUM", [])?;
373                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
374                    Ok(())
375                })
376                .await
377            }
378        }
379    }
380
381    ////////////////////////////////////////////////////////////////
382    // Internal helpers
383
384    fn validate_column(&self, col: u32) -> rusqlite::Result<()> {
385        if col >= self.columns() {
386            return Err(rusqlite::Error::InvalidColumnIndex(col as usize));
387        }
388        Ok(())
389    }
390
391    fn create_column_table(conn: &rusqlite::Connection, column: u32) -> rusqlite::Result<()> {
392        conn.execute(&format!("CREATE TABLE IF NOT EXISTS {} (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value BLOB)", get_column_table_name(column)), []).map(drop)
393    }
394
395    fn get_unique_value<V>(
396        conn: &rusqlite::Connection,
397        table: Arc<DatabaseTable>,
398        key: &str,
399        default: V,
400    ) -> rusqlite::Result<V>
401    where
402        V: FromStr,
403    {
404        let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;
405
406        if let Ok(found) = stmt.query_row([key], |row| -> rusqlite::Result<String> { row.get(0) }) {
407            if let Ok(v) = V::from_str(&found) {
408                return Ok(v);
409            }
410        }
411        Ok(default)
412    }
413
414    fn set_unique_value<V>(
415        conn: &rusqlite::Connection,
416        table: Arc<DatabaseTable>,
417        key: &str,
418        value: V,
419    ) -> rusqlite::Result<()>
420    where
421        V: ToString,
422    {
423        let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;
424
425        let changed = stmt.execute([key, value.to_string().as_str()])?;
426
427        assert!(
428            changed <= 1,
429            "multiple changes to unique key should not occur"
430        );
431        if changed == 0 {
432            return Err(rusqlite::Error::QueryReturnedNoRows);
433        }
434
435        Ok(())
436    }
437
438    fn has_value(
439        conn: &rusqlite::Connection,
440        table: Arc<DatabaseTable>,
441        key: &str,
442    ) -> rusqlite::Result<bool> {
443        let mut stmt = conn.prepare_cached(&table.str_has_value)?;
444        stmt.exists([key])
445    }
446
447    fn has_value_like(
448        conn: &rusqlite::Connection,
449        table: Arc<DatabaseTable>,
450        key: &str,
451    ) -> rusqlite::Result<bool> {
452        let mut stmt = conn.prepare_cached(&table.str_has_value_like)?;
453        stmt.exists([key])
454    }
455
456    fn load_unique_value_blob(
457        conn: &rusqlite::Connection,
458        table: Arc<DatabaseTable>,
459        key: &str,
460    ) -> rusqlite::Result<Option<Vec<u8>>> {
461        let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;
462
463        stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
464            .optional()
465    }
466
467    fn load_first_value_blob_like(
468        conn: &rusqlite::Connection,
469        table: Arc<DatabaseTable>,
470        like: &str,
471    ) -> rusqlite::Result<Option<(String, Vec<u8>)>> {
472        let mut stmt = conn.prepare_cached(&table.str_get_first_value_like)?;
473
474        stmt.query_row([like], |row| -> rusqlite::Result<(String, Vec<u8>)> {
475            Ok((row.get(0)?, row.get(1)?))
476        })
477        .optional()
478    }
479
480    fn store_unique_value_blob(
481        conn: &rusqlite::Connection,
482        table: Arc<DatabaseTable>,
483        key: &str,
484        value: &[u8],
485    ) -> rusqlite::Result<()> {
486        let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;
487
488        let changed = stmt.execute(params![key, value])?;
489        assert!(
490            changed <= 1,
491            "multiple changes to unique key should not occur"
492        );
493        if changed == 0 {
494            return Err(rusqlite::Error::QueryReturnedNoRows);
495        }
496        Ok(())
497    }
498
499    fn remove_unique_value_blob(
500        conn: &rusqlite::Connection,
501        table: Arc<DatabaseTable>,
502        key: &str,
503    ) -> rusqlite::Result<()> {
504        let mut stmt = conn.prepare_cached(&table.str_remove_unique_value)?;
505
506        let _ = stmt.execute([key])?;
507
508        Ok(())
509    }
510
511    fn remove_and_return_unique_value_blob(
512        conn: &rusqlite::Connection,
513        table: Arc<DatabaseTable>,
514        key: &str,
515    ) -> rusqlite::Result<Option<Vec<u8>>> {
516        let mut stmt = conn.prepare_cached(&table.str_remove_and_return_unique_value)?;
517
518        stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
519            .optional()
520    }
521
522    fn remove_unique_value_blob_like(
523        conn: &rusqlite::Connection,
524        table: Arc<DatabaseTable>,
525        like: &str,
526    ) -> rusqlite::Result<usize> {
527        let mut stmt = conn.prepare_cached(&table.str_remove_unique_value_like)?;
528
529        let changed = stmt.execute([like])?;
530        Ok(changed)
531    }
532
533    fn open_resize_columns(&self) -> io::Result<()> {
534        let columns = self.columns();
535        let this = self.clone();
536        self.conn_blocking(move |conn| {
537			// First see if we have a control table with the number of columns
538			conn.execute("CREATE TABLE IF NOT EXISTS control (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value TEXT)", [])?;
539
540            // Get column count
541            let on_disk_columns =
542                Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
543
544            // If desired column count is less than or equal to current column count, then allow it, but restrict access to columns
545            if columns <= on_disk_columns {
546                return Ok(());
547            }
548
549            // Otherwise resize and add other columns
550            for cn in on_disk_columns..columns {
551                // Create the column table if we don't have it
552                Self::create_column_table(conn, cn)?;
553            }
554            Self::set_unique_value(
555                conn,
556                this.control_table(),
557                "columns",
558                columns,
559            )?;
560            Ok(())
561        }).map_err(io::Error::other)
562    }
563
564    fn stats_read(&self, count: usize, bytes: usize) {
565        let mut inner = self.inner.lock();
566        inner.current_stats.reads += count as u64;
567        inner.overall_stats.reads += count as u64;
568        inner.current_stats.bytes_read += bytes as u64;
569        inner.overall_stats.bytes_read += bytes as u64;
570    }
571
572    fn stats_write(&self, sizes: &[usize]) {
573        if sizes.is_empty() {
574            return;
575        }
576
577        let mut inner = self.inner.lock();
578        for &size in sizes {
579            inner.current_stats.record_write(size);
580            inner.overall_stats.record_write(size);
581        }
582    }
583
584    fn stats_tx_write(&self, size: usize, duration: Duration) {
585        let mut inner = self.inner.lock();
586        inner
587            .current_stats
588            .record_tx_write(size, duration.as_micros() as f64);
589        inner
590            .overall_stats
591            .record_tx_write(size, duration.as_micros() as f64);
592    }
593
594    fn stats_delete(&self, count: usize) {
595        if count == 0 {
596            return;
597        }
598
599        let mut inner = self.inner.lock();
600        inner.current_stats.deletes += count as u64;
601        inner.overall_stats.deletes += count as u64;
602    }
603
604    fn stats_delete_prefix(&self, count: usize) {
605        if count == 0 {
606            return;
607        }
608
609        let mut inner = self.inner.lock();
610        inner.current_stats.prefix_deletes += count as u64;
611        inner.overall_stats.prefix_deletes += count as u64;
612    }
613
614    fn stats_transaction(&self, count: usize) {
615        let mut inner = self.inner.lock();
616        inner.current_stats.transactions += count as u64;
617        inner.overall_stats.transactions += count as u64;
618    }
619}
620
621impl KeyValueDB for Database {
622    fn get(&self, col: u32, key: &[u8]) -> KeyValueDBPinBoxFuture<'_, io::Result<Option<DBValue>>> {
623        let key_text = key_to_text(key);
624        let key_len = key.len();
625
626        Box::pin(async move {
627            let that = self.clone();
628            that.validate_column(col).map_err(io::Error::other)?;
629            let someval = self
630                .conn_blocking(move |conn| {
631                    Self::load_unique_value_blob(conn, that.column_table(col), &key_text)
632                })
633                .map_err(io::Error::other)?;
634            {
635                match &someval {
636                    Some(val) => self.stats_read(1, key_len + val.len()),
637                    None => self.stats_read(1, key_len),
638                }
639            }
640
641            Ok(someval)
642        })
643    }
644
645    /// Remove a value by key, returning the old value
646    fn delete(
647        &self,
648        col: u32,
649        key: &[u8],
650    ) -> KeyValueDBPinBoxFuture<'_, io::Result<Option<DBValue>>> {
651        let key_text = key_to_text(key);
652        let key_len = key.len();
653
654        Box::pin(async move {
655            let that = self.clone();
656            that.validate_column(col).map_err(io::Error::other)?;
657            self.conn_blocking(move |conn| {
658                let someval = Self::remove_and_return_unique_value_blob(
659                    conn,
660                    that.column_table(col),
661                    &key_text,
662                )?;
663
664                match &someval {
665                    Some(val) => {
666                        that.stats_read(1, key_len + val.len());
667                    }
668                    None => that.stats_read(1, key_len),
669                }
670
671                Ok(someval)
672            })
673            .map_err(io::Error::other)
674        })
675    }
676
677    fn write(
678        &self,
679        transaction: DBTransaction,
680    ) -> KeyValueDBPinBoxFuture<'_, Result<(), DBTransactionError>> {
681        let transaction = Arc::new(transaction);
682        Box::pin(async move {
683            self.stats_transaction(1);
684
685            let that = self.clone();
686            let transaction_clone = transaction.clone();
687            self.conn_mut(move |conn| {
688                let mut sizes = Vec::with_capacity(transaction_clone.ops.len());
689                let mut total_tx_size = 0;
690                let mut deletes = 0usize;
691                let mut prefix_deletes = 0usize;
692                let start = Instant::now();
693
694                let tx = conn.transaction()?;
695
696                for op in &transaction_clone.ops {
697                    match op {
698                        DBOp::Insert { col, key, value } => {
699                            that.validate_column(*col)?;
700                            Self::store_unique_value_blob(
701                                &tx,
702                                that.column_table(*col),
703                                &key_to_text(key),
704                                value,
705                            )?;
706                            sizes.push(key.len() + value.len());
707                            total_tx_size += key.len() + value.len();
708                        }
709                        DBOp::Delete { col, key } => {
710                            that.validate_column(*col)?;
711                            Self::remove_unique_value_blob(
712                                &tx,
713                                that.column_table(*col),
714                                &key_to_text(key),
715                            )?;
716                            deletes += 1;
717                        }
718                        DBOp::DeletePrefix { col, prefix } => {
719                            that.validate_column(*col)?;
720                            Self::remove_unique_value_blob_like(
721                                &tx,
722                                that.column_table(*col),
723                                &(like_key_to_text(prefix) + "%"),
724                            )?;
725                            prefix_deletes += 1;
726                        }
727                    }
728                }
729                tx.commit()?;
730
731                let duration = Instant::now() - start;
732                that.stats_write(&sizes);
733                that.stats_tx_write(total_tx_size, duration);
734                that.stats_delete(deletes);
735                that.stats_delete_prefix(prefix_deletes);
736
737                Ok(())
738            })
739            .await
740            .map_err(io::Error::other)
741            .map_err(|error| {
742                let transaction = transaction.as_ref().clone();
743                DBTransactionError { error, transaction }
744            })
745        })
746    }
747
748    fn iter<
749        'a,
750        T: Send + 'static,
751        C: Send + 'static,
752        F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
753    >(
754        &'a self,
755        col: u32,
756        prefix: Option<&'a [u8]>,
757        context: C,
758        mut f: F,
759    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
760        let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
761        Box::pin(async move {
762            if col >= self.columns() {
763                return Err(io::Error::from(io::ErrorKind::NotFound));
764            }
765
766            let that = self.clone();
767            let context = Arc::new(Mutex::new(Some(context)));
768            let context_ref = context.clone();
769
770            let res = self
771                .conn(move |conn| {
772                    let mut context = context_ref.lock();
773                    let context = context.as_mut().unwrap();
774
775                    let mut stmt;
776                    let mut rows;
777                    if let Some(prefix_query) = opt_prefix_query {
778                        stmt = match conn
779                            .prepare_cached(&that.column_table(col).str_iter_with_prefix)
780                        {
781                            Ok(v) => v,
782                            Err(e) => {
783                                return Ok(Err(io::Error::other(e)));
784                            }
785                        };
786                        rows = match stmt.query([prefix_query]) {
787                            Ok(v) => v,
788                            Err(e) => {
789                                return Ok(Err(io::Error::other(e)));
790                            }
791                        };
792                    } else {
793                        stmt = match conn.prepare_cached(&that.column_table(col).str_iter_no_prefix)
794                        {
795                            Ok(v) => v,
796                            Err(e) => {
797                                return Ok(Err(io::Error::other(e)));
798                            }
799                        };
800                        rows = match stmt.query([]) {
801                            Ok(v) => v,
802                            Err(e) => {
803                                return Ok(Err(io::Error::other(e)));
804                            }
805                        };
806                    }
807
808                    let mut sw = 0usize;
809                    let mut sbw = 0usize;
810
811                    let out = loop {
812                        match rows.next() {
813                            // Iterated value
814                            Ok(Some(row)) => {
815                                let kt: String = match row.get(0) {
816                                    Err(e) => {
817                                        break Err(io::Error::other(e));
818                                    }
819                                    Ok(v) => v,
820                                };
821                                let v: Vec<u8> = match row.get(1) {
822                                    Err(e) => {
823                                        break Err(io::Error::other(e));
824                                    }
825                                    Ok(v) => v,
826                                };
827                                let k: Vec<u8> = match text_to_key(&kt) {
828                                    Err(e) => {
829                                        break Err(io::Error::other(format!(
830                                            "SQLite row get column 0 text convert error: {:?}",
831                                            e
832                                        )));
833                                    }
834                                    Ok(v) => v,
835                                };
836
837                                sw += 1;
838                                sbw += k.len() + v.len();
839
840                                match f(context, (&k, &v)) {
841                                    Ok(None) => (),
842                                    Ok(Some(out)) => {
843                                        // Callback early termination
844                                        that.stats_read(sw, sbw);
845                                        break Ok(Some(out));
846                                    }
847                                    Err(e) => {
848                                        // Callback error termination
849                                        that.stats_read(sw, sbw);
850                                        break Err(e);
851                                    }
852                                }
853                            }
854                            // Natural iterator termination
855                            Ok(None) => {
856                                break Ok(None);
857                            }
858                            // Error iterator termination
859                            Err(_) => {
860                                break Ok(None);
861                            }
862                        }
863                    };
864
865                    that.stats_read(sw, sbw);
866
867                    Ok(out)
868                })
869                .await
870                .map_err(io::Error::other)?;
871
872            let context = context.lock().take().unwrap();
873
874            res.map(|x| (context, x))
875        })
876    }
877
878    fn iter_keys<
879        'a,
880        T: Send + 'static,
881        C: Send + 'static,
882        F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
883    >(
884        &'a self,
885        col: u32,
886        prefix: Option<&'a [u8]>,
887        context: C,
888        mut f: F,
889    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
890        let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
891        Box::pin(async move {
892            if col >= self.columns() {
893                return Err(io::Error::from(io::ErrorKind::NotFound));
894            }
895
896            let that = self.clone();
897            let context = Arc::new(Mutex::new(Some(context)));
898            let context_ref = context.clone();
899
900            let res = self
901                .conn(move |conn| {
902                    let mut context = context_ref.lock();
903                    let context = context.as_mut().unwrap();
904
905                    let mut stmt;
906                    let mut rows;
907                    if let Some(prefix_query) = opt_prefix_query {
908                        stmt = match conn
909                            .prepare_cached(&that.column_table(col).str_iter_keys_with_prefix)
910                        {
911                            Ok(v) => v,
912                            Err(e) => {
913                                return Ok(Err(io::Error::other(e)));
914                            }
915                        };
916                        rows = match stmt.query([prefix_query]) {
917                            Ok(v) => v,
918                            Err(e) => {
919                                return Ok(Err(io::Error::other(e)));
920                            }
921                        };
922                    } else {
923                        stmt = match conn
924                            .prepare_cached(&that.column_table(col).str_iter_keys_no_prefix)
925                        {
926                            Ok(v) => v,
927                            Err(e) => {
928                                return Ok(Err(io::Error::other(e)));
929                            }
930                        };
931                        rows = match stmt.query([]) {
932                            Ok(v) => v,
933                            Err(e) => {
934                                return Ok(Err(io::Error::other(e)));
935                            }
936                        };
937                    }
938
939                    let mut sw = 0usize;
940                    let mut sbw = 0usize;
941
942                    let out = loop {
943                        match rows.next() {
944                            // Iterated value
945                            Ok(Some(row)) => {
946                                let kt: String = match row.get(0) {
947                                    Err(e) => {
948                                        break Err(io::Error::other(e));
949                                    }
950                                    Ok(v) => v,
951                                };
952                                let k: Vec<u8> = match text_to_key(&kt) {
953                                    Err(e) => {
954                                        break Err(io::Error::other(format!(
955                                            "SQLite row get column 0 text convert error: {:?}",
956                                            e
957                                        )));
958                                    }
959                                    Ok(v) => v,
960                                };
961
962                                sw += 1;
963                                sbw += k.len();
964
965                                match f(context, &k) {
966                                    Ok(None) => (),
967                                    Ok(Some(out)) => {
968                                        // Callback early termination
969                                        that.stats_read(sw, sbw);
970                                        break Ok(Some(out));
971                                    }
972                                    Err(e) => {
973                                        // Callback error termination
974                                        that.stats_read(sw, sbw);
975                                        break Err(e);
976                                    }
977                                }
978                            }
979                            // Natural iterator termination
980                            Ok(None) => {
981                                break Ok(None);
982                            }
983                            // Error iterator termination
984                            Err(_) => {
985                                break Ok(None);
986                            }
987                        }
988                    };
989
990                    that.stats_read(sw, sbw);
991
992                    Ok(out)
993                })
994                .await
995                .map_err(io::Error::other)?;
996
997            let context = context.lock().take().unwrap();
998
999            res.map(|x| (context, x))
1000        })
1001    }
1002
1003    fn io_stats(&self, kind: IoStatsKind) -> IoStats {
1004        fn duration_since(timestamp_microseconds: u64) -> Duration {
1005            std::time::SystemTime::now()
1006                .duration_since(std::time::UNIX_EPOCH)
1007                .map_or(Duration::from_micros(0), |time| {
1008                    let now = time.as_micros() as u64;
1009                    if now >= timestamp_microseconds {
1010                        Duration::from_micros(now - timestamp_microseconds)
1011                    } else {
1012                        Duration::from_micros(0)
1013                    }
1014                })
1015        }
1016
1017        let mut inner = self.inner.lock();
1018        match kind {
1019            IoStatsKind::Overall => {
1020                let mut stats = inner.overall_stats.clone();
1021                stats.span = duration_since(stats.started);
1022                stats
1023            }
1024            IoStatsKind::SincePrevious => {
1025                let mut stats = inner.current_stats.clone();
1026                stats.span = duration_since(stats.started);
1027                inner.current_stats = IoStats::empty();
1028                stats
1029            }
1030        }
1031    }
1032
1033    fn num_columns(&self) -> io::Result<u32> {
1034        let this = self.clone();
1035        self.conn_blocking(move |conn| {
1036            Self::get_unique_value(conn, this.control_table(), "columns", 0u32)
1037        })
1038        .map_err(io::Error::other)
1039    }
1040
1041    fn num_keys(&self, col: u32) -> KeyValueDBPinBoxFuture<'_, io::Result<u64>> {
1042        let this = self.clone();
1043        Box::pin(async move {
1044            this.conn(move |conn| {
1045                conn.query_row(
1046                    &format!("SELECT Count(*) FROM {}", get_column_table_name(col)),
1047                    [],
1048                    |row| -> rusqlite::Result<u64> { row.get(0) },
1049                )
1050            })
1051            .await
1052            .map_err(|_| io::Error::from(io::ErrorKind::NotFound))
1053        })
1054    }
1055
1056    /// Check for the existence of a value by key.
1057    fn has_key<'a>(
1058        &'a self,
1059        col: u32,
1060        key: &'a [u8],
1061    ) -> KeyValueDBPinBoxFuture<'a, io::Result<bool>> {
1062        let key_text = key_to_text(key);
1063        let key_len = key.len();
1064
1065        Box::pin(async move {
1066            let that = self.clone();
1067            that.validate_column(col).map_err(io::Error::other)?;
1068            let someval = self
1069                .conn_blocking(move |conn| Self::has_value(conn, that.column_table(col), &key_text))
1070                .map_err(io::Error::other)?;
1071
1072            self.stats_read(1, key_len);
1073
1074            Ok(someval)
1075        })
1076    }
1077
1078    /// Check for the existence of a value by prefix.
1079    fn has_prefix<'a>(
1080        &'a self,
1081        col: u32,
1082        prefix: &'a [u8],
1083    ) -> KeyValueDBPinBoxFuture<'a, io::Result<bool>> {
1084        let prefix_len = prefix.len();
1085        let prefix_text = like_key_to_text(prefix) + "%";
1086
1087        Box::pin(async move {
1088            let that = self.clone();
1089            that.validate_column(col).map_err(io::Error::other)?;
1090            let someval = self
1091                .conn_blocking(move |conn| {
1092                    Self::has_value_like(conn, that.column_table(col), &prefix_text)
1093                })
1094                .map_err(io::Error::other)?;
1095
1096            self.stats_read(1, prefix_len);
1097
1098            Ok(someval)
1099        })
1100    }
1101
1102    /// Get the first value matching the given prefix.
1103    fn first_with_prefix<'a>(
1104        &'a self,
1105        col: u32,
1106        prefix: &'a [u8],
1107    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBKeyValue>>> {
1108        let prefix_len = prefix.len();
1109        let like = like_key_to_text(prefix) + "%";
1110
1111        Box::pin(async move {
1112            let that = self.clone();
1113            that.validate_column(col).map_err(io::Error::other)?;
1114            let someval = self
1115                .conn_blocking(move |conn| {
1116                    Self::load_first_value_blob_like(conn, that.column_table(col), &like)
1117                })
1118                .map_err(io::Error::other)?;
1119
1120            self.stats_read(1, prefix_len);
1121
1122            match someval {
1123                Some((kt, val)) => match text_to_key(&kt) {
1124                    Err(e) => Err(io::Error::other(format!(
1125                        "SQLite row get column 0 text convert error: {:?}",
1126                        e
1127                    ))),
1128                    Ok(k) => Ok(Some((k, val))),
1129                },
1130                None => Ok(None),
1131            }
1132        })
1133    }
1134
1135    /// Vacuum database
1136    fn cleanup(&self) -> KeyValueDBPinBoxFuture<'_, io::Result<()>> {
1137        Box::pin(async { self.vacuum().await.map_err(io::Error::other) })
1138    }
1139}
1140
#[cfg(test)]
mod tests {
    use super::*;
    use keyvaluedb_shared_tests as st;
    use tempfile::Builder as TempfileBuilder;

    /// Creates a named temporary file whose name starts with `prefix` and returns only its
    /// path; the temporary-file handle itself is not kept.
    fn scratch_path(prefix: &str) -> io::Result<std::path::PathBuf> {
        let handle = TempfileBuilder::new().prefix(prefix).tempfile()?;
        Ok(handle.path().to_path_buf())
    }

    /// Opens a database with `columns` columns at a fresh scratch path.
    fn create(columns: u32) -> io::Result<Database> {
        let path = scratch_path("")?;
        Database::open(path, DatabaseConfig::new().with_columns(columns))
    }

    /// Opens a database with `columns` columns and the given vacuum mode at a fresh
    /// scratch path.
    fn create_vacuum_mode(columns: u32, vacuum_mode: VacuumMode) -> io::Result<Database> {
        let path = scratch_path("")?;
        let config = DatabaseConfig::new()
            .with_columns(columns)
            .with_vacuum_mode(vacuum_mode);
        Database::open(path, config)
    }

    #[tokio::test]
    async fn get_fails_with_non_existing_column() -> io::Result<()> {
        st::test_get_fails_with_non_existing_column(create(1)?).await
    }

    #[tokio::test]
    async fn num_keys() -> io::Result<()> {
        st::test_num_keys(create(1)?).await
    }

    #[tokio::test]
    async fn put_and_get() -> io::Result<()> {
        st::test_put_and_get(create(1)?).await
    }

    #[tokio::test]
    async fn delete_and_get() -> io::Result<()> {
        st::test_delete_and_get(create(1)?).await
    }

    #[tokio::test]
    async fn delete_and_get_single() -> io::Result<()> {
        st::test_delete_and_get_single(create(1)?).await
    }

    #[tokio::test]
    async fn delete_prefix() -> io::Result<()> {
        st::test_delete_prefix(create(st::DELETE_PREFIX_NUM_COLUMNS)?).await
    }

    #[tokio::test]
    async fn iter() -> io::Result<()> {
        st::test_iter(create(1)?).await
    }

    #[tokio::test]
    async fn iter_keys() -> io::Result<()> {
        st::test_iter_keys(create(1)?).await
    }

    #[tokio::test]
    async fn iter_with_prefix() -> io::Result<()> {
        st::test_iter_with_prefix(create(1)?).await
    }

    #[tokio::test]
    async fn complex() -> io::Result<()> {
        st::test_complex(create(1)?).await
    }

    #[tokio::test]
    async fn cleanup() -> io::Result<()> {
        // Fresh database with the default vacuum mode.
        st::test_cleanup(create(1)?).await?;

        // Fresh database for each explicit vacuum mode.
        for mode in [VacuumMode::None, VacuumMode::Incremental, VacuumMode::Full] {
            st::test_cleanup(create_vacuum_mode(1, mode)?).await?;
        }

        // The same path reopened repeatedly while the vacuum mode changes between opens.
        let path = scratch_path("")?;
        let reopen_modes = [
            VacuumMode::None,
            VacuumMode::Incremental,
            VacuumMode::Full,
            VacuumMode::None,
        ];
        for mode in reopen_modes {
            let config = DatabaseConfig::new().with_vacuum_mode(mode);
            let db = Database::open(&path, config)?;
            st::test_cleanup(db).await?;
        }

        Ok(())
    }

    #[tokio::test]
    async fn stats() -> io::Result<()> {
        st::test_io_stats(create(st::IO_STATS_NUM_COLUMNS)?).await
    }

    #[tokio::test]
    #[should_panic]
    async fn db_config_with_zero_columns() {
        let _cfg = DatabaseConfig::new().with_columns(0);
    }

    #[tokio::test]
    #[should_panic]
    async fn open_db_with_zero_columns() {
        let cfg = DatabaseConfig::new().with_columns(0);
        let _db = Database::open("", cfg);
    }

    #[tokio::test]
    async fn add_columns() {
        let path = scratch_path("").unwrap();

        // Open with the default configuration (one column), then add four more.
        {
            let db = Database::open(&path, DatabaseConfig::default()).unwrap();
            assert_eq!(db.num_columns().unwrap(), 1);

            for expected in 2..=5 {
                db.add_column().unwrap();
                assert_eq!(db.num_columns().unwrap(), expected);
            }
        }

        // Reopen with a five-column configuration.
        {
            let db = Database::open(&path, DatabaseConfig::new().with_columns(5)).unwrap();
            assert_eq!(db.num_columns().unwrap(), 5);
        }
    }

    #[tokio::test]
    async fn remove_columns() {
        let path = scratch_path("drop_columns").unwrap();

        // Open with five columns, then remove the last one four times.
        {
            let db = Database::open(&path, DatabaseConfig::new().with_columns(5))
                .expect("open with 5 columns");
            assert_eq!(db.num_columns().unwrap(), 5);

            for expected in (1..=4).rev() {
                db.remove_last_column().unwrap();
                assert_eq!(db.num_columns().unwrap(), expected);
            }
        }

        // Reopen with the default (one-column) configuration.
        {
            let db = Database::open(&path, DatabaseConfig::default()).unwrap();
            assert_eq!(db.num_columns().unwrap(), 1);
        }
    }

    #[tokio::test]
    async fn test_num_keys() {
        let db = create(1).unwrap();

        let initial = db.num_keys(0).await.unwrap();
        assert_eq!(initial, 0, "database is empty after creation");

        let key1 = b"beef";
        let mut batch = db.transaction();
        batch.put(0, key1, key1);
        db.write(batch).await.unwrap();

        let after_put = db.num_keys(0).await.unwrap();
        assert_eq!(after_put, 1, "adding a key increases the count");
    }
}