#![deny(clippy::all)]
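//! An asynchronous, SQLite-backed implementation of the `keyvaluedb`
//! `KeyValueDB` interface, built on the `async_sqlite` connection pool.
//!
//! A minimal usage sketch (marked `ignore` because the file name, column
//! layout, and value type below are illustrative assumptions, not part of
//! this file):
//!
//! ```ignore
//! let config = DatabaseConfig::new().with_columns(1);
//! let db = Database::open("example-db.sqlite3", config).unwrap();
//!
//! let mut tx = db.transaction();
//! tx.put(0, b"key", b"value");
//! db.write(tx).await.unwrap();
//!
//! assert_eq!(db.get(0, b"key").await.unwrap(), Some(b"value".to_vec()));
//! ```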

mod tools;

pub use async_sqlite::rusqlite::OpenFlags;
use async_sqlite::rusqlite::{params, OptionalExtension as _};
use async_sqlite::*;
use keyvaluedb::{
    DBKeyRef, DBKeyValue, DBKeyValueRef, DBOp, DBTransaction, DBTransactionError, DBValue, IoStats,
    IoStatsKind, KeyValueDB, KeyValueDBPinBoxFuture,
};
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{
    io,
    path::{Path, PathBuf},
    str::FromStr,
};
use tools::*;

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
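/// How [`Database::vacuum`] (and the `cleanup` implementation that calls it)
/// reclaims free pages: `None` only truncates the WAL, `Incremental` relies on
/// SQLite's incremental auto-vacuum, and `Full` runs a full `VACUUM`.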
pub enum VacuumMode {
    None,
    Incremental,
    Full,
}

#[derive(Clone)]
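/// Options for opening a [`Database`], normally assembled with the builder
/// methods below. A small sketch (`ignore`d since no crate path is spelled
/// out here):
///
/// ```ignore
/// let config = DatabaseConfig::new()
///     .with_columns(4)
///     .with_num_conns(2)
///     .with_vacuum_mode(VacuumMode::Incremental);
/// ```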
pub struct DatabaseConfig {
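    /// Number of key/value columns; must be at least 1 (the default).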
    pub columns: u32,
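    /// SQLite open flags applied to every pooled connection; defaults to
    /// read/write + create with the no-mutex threading mode.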
    pub flags: OpenFlags,
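    /// Number of pooled connections to open; defaults to 1.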
    pub num_conns: usize,
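    /// Vacuum strategy configured at open time and used by `vacuum()`;
    /// defaults to [`VacuumMode::None`].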
    pub vacuum_mode: VacuumMode,
}

impl DatabaseConfig {
    pub fn new() -> Self {
        Default::default()
    }

    pub fn with_columns(self, columns: u32) -> Self {
        assert!(columns > 0, "the number of columns must not be zero");
        Self { columns, ..self }
    }

    pub fn with_in_memory(self) -> Self {
        Self {
            flags: OpenFlags::SQLITE_OPEN_READ_WRITE
                | OpenFlags::SQLITE_OPEN_CREATE
                | OpenFlags::SQLITE_OPEN_NO_MUTEX
                | OpenFlags::SQLITE_OPEN_MEMORY,
            ..self
        }
    }

    pub fn with_flags(self, flags: OpenFlags) -> Self {
        Self { flags, ..self }
    }

    pub fn with_num_conns(self, num_conns: usize) -> Self {
        Self { num_conns, ..self }
    }

    pub fn with_vacuum_mode(self, vacuum_mode: VacuumMode) -> Self {
        Self {
            vacuum_mode,
            ..self
        }
    }
}

impl Default for DatabaseConfig {
    fn default() -> DatabaseConfig {
        DatabaseConfig {
            columns: 1,
            flags: OpenFlags::SQLITE_OPEN_READ_WRITE
                | OpenFlags::SQLITE_OPEN_CREATE
                | OpenFlags::SQLITE_OPEN_NO_MUTEX,
            num_conns: 1,
            vacuum_mode: VacuumMode::None,
        }
    }
}

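/// SQL statement text, formatted once per table name; the statements are run
/// through rusqlite's prepared-statement cache (`prepare_cached`).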
pub struct DatabaseTable {
    _table: String,
    str_has_value: String,
    str_has_value_like: String,
    str_get_unique_value: String,
    str_get_first_value_like: String,
    str_set_unique_value: String,
    str_remove_unique_value: String,
    str_remove_and_return_unique_value: String,
    str_remove_unique_value_like: String,
    str_iter_with_prefix: String,
    str_iter_no_prefix: String,
    str_iter_keys_with_prefix: String,
    str_iter_keys_no_prefix: String,
}

impl DatabaseTable {
    pub fn new(table: String) -> Self {
        let str_has_value = format!("SELECT 1 FROM {} WHERE [key] = ? LIMIT 1", table);
        let str_has_value_like = format!(
            "SELECT 1 FROM {} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1",
            table
        );
        let str_get_unique_value = format!("SELECT value FROM {} WHERE [key] = ? LIMIT 1", table);
        let str_get_first_value_like = format!(
            "SELECT key, value FROM {} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1",
            table
        );
        let str_set_unique_value = format!(
            "INSERT OR REPLACE INTO {} ([key], value) VALUES(?, ?)",
            table
        );
        let str_remove_unique_value = format!("DELETE FROM {} WHERE [key] = ?", table);
        let str_remove_and_return_unique_value =
            format!("DELETE FROM {} WHERE [key] = ? RETURNING value", table);
        let str_remove_unique_value_like =
            format!("DELETE FROM {} WHERE [key] LIKE ? ESCAPE '\\'", table);
        let str_iter_with_prefix = format!(
            "SELECT key, value FROM {} WHERE [key] LIKE ? ESCAPE '\\'",
            table
        );
        let str_iter_no_prefix = format!("SELECT key, value FROM {}", table);
        let str_iter_keys_with_prefix =
            format!("SELECT key FROM {} WHERE [key] LIKE ? ESCAPE '\\'", table);
        let str_iter_keys_no_prefix = format!("SELECT key FROM {}", table);

        Self {
            _table: table,
            str_has_value,
            str_has_value_like,
            str_get_unique_value,
            str_get_first_value_like,
            str_set_unique_value,
            str_remove_unique_value,
            str_remove_and_return_unique_value,
            str_remove_unique_value_like,
            str_iter_with_prefix,
            str_iter_no_prefix,
            str_iter_keys_with_prefix,
            str_iter_keys_no_prefix,
        }
    }
}

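/// State shared by all clones of a [`Database`] without locking: the open
/// configuration, the connection pool, and the statement text for the control
/// and column tables. Dropping it closes the pool.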
pub struct DatabaseUnlockedInner {
    path: PathBuf,
    config: DatabaseConfig,
    pool: Pool,
    control_table: Arc<DatabaseTable>,
    column_tables: Vec<Arc<DatabaseTable>>,
}

impl Drop for DatabaseUnlockedInner {
    fn drop(&mut self) {
        let _ = self.pool.close_blocking();
    }
}

pub struct DatabaseInner {
    overall_stats: IoStats,
    current_stats: IoStats,
}

#[derive(Clone)]
pub struct Database {
    unlocked_inner: Arc<DatabaseUnlockedInner>,
    inner: Arc<Mutex<DatabaseInner>>,
}

impl Database {
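    /// Open (or create) the database at `path` with the given configuration:
    /// build the connection pool, apply the journaling and vacuum pragmas, and
    /// create the control table and any missing column tables.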
    pub fn open<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> io::Result<Self> {
        assert_ne!(config.columns, 0, "number of columns must be >= 1");

        let path = PathBuf::from(path.as_ref());
        let flags = config.flags;

        let mut column_tables = vec![];
        for n in 0..config.columns {
            column_tables.push(Arc::new(DatabaseTable::new(get_column_table_name(n))))
        }
        let control_table = Arc::new(DatabaseTable::new("control".to_string()));

        let pool_builder = PoolBuilder::new()
            .path(&path)
            .flags(flags)
            .num_conns(config.num_conns);

        let pool = pool_builder.open_blocking().map_err(io::Error::other)?;

        let out = Self {
            unlocked_inner: Arc::new(DatabaseUnlockedInner {
                path,
                config,
                pool,
                control_table,
                column_tables,
            }),
            inner: Arc::new(Mutex::new(DatabaseInner {
                overall_stats: IoStats::empty(),
                current_stats: IoStats::empty(),
            })),
        };

        let vacuum_mode = out.config().vacuum_mode;

        out.conn_blocking(move |conn| {
            conn.set_prepared_statement_cache_capacity(256);

            conn.pragma_update(None, "case_sensitive_like", "ON")?;
            conn.pragma_update(None, "journal_mode", "WAL")?;
            conn.pragma_update(None, "synchronous", "normal")?;
            conn.pragma_update(None, "journal_size_limit", 6144000)?;
            conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;

            // Changing auto_vacuum only becomes persistent in the database
            // header after a VACUUM, so set the pragma first and vacuum after.
            match vacuum_mode {
                VacuumMode::None | VacuumMode::Full => {
                    let current: u32 =
                        conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
                    if current != 0 {
                        conn.pragma_update(None, "auto_vacuum", 0)?;
                        conn.execute("VACUUM", [])?;
                    }
                }
                VacuumMode::Incremental => {
                    let current: u32 =
                        conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
                    if current != 2 {
                        conn.pragma_update(None, "auto_vacuum", 2)?;
                        conn.execute("VACUUM", [])?;
                    }
                }
            }

            Ok(())
        })
        .map_err(io::Error::other)?;

        out.open_resize_columns()?;

        Ok(out)
    }

    pub fn path(&self) -> PathBuf {
        self.unlocked_inner.path.clone()
    }

    pub fn config(&self) -> DatabaseConfig {
        self.unlocked_inner.config.clone()
    }

    pub fn columns(&self) -> u32 {
        self.unlocked_inner.config.columns
    }

    pub fn control_table(&self) -> Arc<DatabaseTable> {
        self.unlocked_inner.control_table.clone()
    }

    pub fn column_table(&self, col: u32) -> Arc<DatabaseTable> {
        self.unlocked_inner.column_tables[col as usize].clone()
    }

    pub fn conn_blocking<T, F>(&self, func: F) -> Result<T, Error>
    where
        F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
        T: Send + 'static,
    {
        self.unlocked_inner.pool.conn_blocking(func)
    }

    pub async fn conn<T, F>(&self, func: F) -> Result<T, Error>
    where
        F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
        T: Send + 'static,
    {
        self.unlocked_inner.pool.conn(func).await
    }

    pub async fn conn_mut<T, F>(&self, func: F) -> Result<T, Error>
    where
        F: FnOnce(&mut rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
        T: Send + 'static,
    {
        self.unlocked_inner.pool.conn_mut(func).await
    }

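    /// Drop the highest-numbered column table and decrement the column count
    /// stored in the control table; fails if no columns are recorded.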
    pub fn remove_last_column(&self) -> Result<(), Error> {
        let this = self.clone();
        self.conn_blocking(move |conn| {
            let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
            if columns == 0 {
                return Err(rusqlite::Error::QueryReturnedNoRows);
            }
            Self::set_unique_value(conn, this.control_table(), "columns", columns - 1)?;

            conn.execute(
                &format!("DROP TABLE {}", get_column_table_name(columns - 1)),
                [],
            )?;
            Ok(())
        })
    }

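    /// Create the next column table and increment the column count stored in
    /// the control table.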
    pub fn add_column(&self) -> Result<(), Error> {
        let this = self.clone();

        self.conn_blocking(move |conn| {
            let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
            Self::set_unique_value(conn, this.control_table(), "columns", columns + 1)?;
            Self::create_column_table(conn, columns)
        })
    }

    pub fn transaction(&self) -> DBTransaction {
        DBTransaction::new()
    }

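    /// Reclaim space according to the configured [`VacuumMode`], then truncate
    /// the WAL with a checkpoint.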
    pub async fn vacuum(&self) -> Result<(), Error> {
        match self.config().vacuum_mode {
            VacuumMode::None => {
                self.conn(move |conn| {
                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
                    Ok(())
                })
                .await
            }
            VacuumMode::Incremental => {
                self.conn(move |conn| {
                    conn.execute("PRAGMA incremental_vacuum", [])?;
                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
                    Ok(())
                })
                .await
            }
            VacuumMode::Full => {
                self.conn(move |conn| {
                    conn.execute("VACUUM", [])?;
                    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
                    Ok(())
                })
                .await
            }
        }
    }

    fn validate_column(&self, col: u32) -> rusqlite::Result<()> {
        if col >= self.columns() {
            return Err(rusqlite::Error::InvalidColumnIndex(col as usize));
        }
        Ok(())
    }

    fn create_column_table(conn: &rusqlite::Connection, column: u32) -> rusqlite::Result<()> {
        conn.execute(&format!("CREATE TABLE IF NOT EXISTS {} (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value BLOB)", get_column_table_name(column)), []).map(drop)
    }

    fn get_unique_value<V>(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        key: &str,
        default: V,
    ) -> rusqlite::Result<V>
    where
        V: FromStr,
    {
        let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;

        if let Ok(found) = stmt.query_row([key], |row| -> rusqlite::Result<String> { row.get(0) }) {
            if let Ok(v) = V::from_str(&found) {
                return Ok(v);
            }
        }
        Ok(default)
    }

    fn set_unique_value<V>(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        key: &str,
        value: V,
    ) -> rusqlite::Result<()>
    where
        V: ToString,
    {
        let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;

        let changed = stmt.execute([key, value.to_string().as_str()])?;

        assert!(
            changed <= 1,
            "multiple changes to unique key should not occur"
        );
        if changed == 0 {
            return Err(rusqlite::Error::QueryReturnedNoRows);
        }

        Ok(())
    }

    fn has_value(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        key: &str,
    ) -> rusqlite::Result<bool> {
        let mut stmt = conn.prepare_cached(&table.str_has_value)?;
        stmt.exists([key])
    }

    fn has_value_like(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        key: &str,
    ) -> rusqlite::Result<bool> {
        let mut stmt = conn.prepare_cached(&table.str_has_value_like)?;
        stmt.exists([key])
    }

    fn load_unique_value_blob(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        key: &str,
    ) -> rusqlite::Result<Option<Vec<u8>>> {
        let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;

        stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
            .optional()
    }

    fn load_first_value_blob_like(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        like: &str,
    ) -> rusqlite::Result<Option<(String, Vec<u8>)>> {
        let mut stmt = conn.prepare_cached(&table.str_get_first_value_like)?;

        stmt.query_row([like], |row| -> rusqlite::Result<(String, Vec<u8>)> {
            Ok((row.get(0)?, row.get(1)?))
        })
        .optional()
    }

    fn store_unique_value_blob(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        key: &str,
        value: &[u8],
    ) -> rusqlite::Result<()> {
        let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;

        let changed = stmt.execute(params![key, value])?;
        assert!(
            changed <= 1,
            "multiple changes to unique key should not occur"
        );
        if changed == 0 {
            return Err(rusqlite::Error::QueryReturnedNoRows);
        }
        Ok(())
    }

    fn remove_unique_value_blob(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        key: &str,
    ) -> rusqlite::Result<()> {
        let mut stmt = conn.prepare_cached(&table.str_remove_unique_value)?;

        let _ = stmt.execute([key])?;

        Ok(())
    }

    fn remove_and_return_unique_value_blob(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        key: &str,
    ) -> rusqlite::Result<Option<Vec<u8>>> {
        let mut stmt = conn.prepare_cached(&table.str_remove_and_return_unique_value)?;

        stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
            .optional()
    }

    fn remove_unique_value_blob_like(
        conn: &rusqlite::Connection,
        table: Arc<DatabaseTable>,
        like: &str,
    ) -> rusqlite::Result<usize> {
        let mut stmt = conn.prepare_cached(&table.str_remove_unique_value_like)?;

        let changed = stmt.execute([like])?;
        Ok(changed)
    }

    fn open_resize_columns(&self) -> io::Result<()> {
        let columns = self.columns();
        let this = self.clone();
        self.conn_blocking(move |conn| {
            conn.execute("CREATE TABLE IF NOT EXISTS control (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value TEXT)", [])?;

            let on_disk_columns =
                Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;

            if columns <= on_disk_columns {
                return Ok(());
            }

            for cn in on_disk_columns..columns {
                Self::create_column_table(conn, cn)?;
            }
            Self::set_unique_value(conn, this.control_table(), "columns", columns)?;
            Ok(())
        })
        .map_err(io::Error::other)
    }

    fn stats_read(&self, count: usize, bytes: usize) {
        let mut inner = self.inner.lock();
        inner.current_stats.reads += count as u64;
        inner.overall_stats.reads += count as u64;
        inner.current_stats.bytes_read += bytes as u64;
        inner.overall_stats.bytes_read += bytes as u64;
    }

    fn stats_write(&self, sizes: &[usize]) {
        if sizes.is_empty() {
            return;
        }

        let mut inner = self.inner.lock();
        for &size in sizes {
            inner.current_stats.record_write(size);
            inner.overall_stats.record_write(size);
        }
    }

    fn stats_tx_write(&self, size: usize, duration: Duration) {
        let mut inner = self.inner.lock();
        inner
            .current_stats
            .record_tx_write(size, duration.as_micros() as f64);
        inner
            .overall_stats
            .record_tx_write(size, duration.as_micros() as f64);
    }

    fn stats_delete(&self, count: usize) {
        if count == 0 {
            return;
        }

        let mut inner = self.inner.lock();
        inner.current_stats.deletes += count as u64;
        inner.overall_stats.deletes += count as u64;
    }

    fn stats_delete_prefix(&self, count: usize) {
        if count == 0 {
            return;
        }

        let mut inner = self.inner.lock();
        inner.current_stats.prefix_deletes += count as u64;
        inner.overall_stats.prefix_deletes += count as u64;
    }

    fn stats_transaction(&self, count: usize) {
        let mut inner = self.inner.lock();
        inner.current_stats.transactions += count as u64;
        inner.overall_stats.transactions += count as u64;
    }
}

impl KeyValueDB for Database {
    fn get(&self, col: u32, key: &[u8]) -> KeyValueDBPinBoxFuture<'_, io::Result<Option<DBValue>>> {
        let key_text = key_to_text(key);
        let key_len = key.len();

        Box::pin(async move {
            let that = self.clone();
            that.validate_column(col).map_err(io::Error::other)?;
            let someval = self
                .conn_blocking(move |conn| {
                    Self::load_unique_value_blob(conn, that.column_table(col), &key_text)
                })
                .map_err(io::Error::other)?;
            {
                match &someval {
                    Some(val) => self.stats_read(1, key_len + val.len()),
                    None => self.stats_read(1, key_len),
                }
            }

            Ok(someval)
        })
    }

    fn delete(
        &self,
        col: u32,
        key: &[u8],
    ) -> KeyValueDBPinBoxFuture<'_, io::Result<Option<DBValue>>> {
        let key_text = key_to_text(key);
        let key_len = key.len();

        Box::pin(async move {
            let that = self.clone();
            that.validate_column(col).map_err(io::Error::other)?;
            self.conn_blocking(move |conn| {
                let someval = Self::remove_and_return_unique_value_blob(
                    conn,
                    that.column_table(col),
                    &key_text,
                )?;

                match &someval {
                    Some(val) => {
                        that.stats_read(1, key_len + val.len());
                    }
                    None => that.stats_read(1, key_len),
                }

                Ok(someval)
            })
            .map_err(io::Error::other)
        })
    }

    fn write(
        &self,
        transaction: DBTransaction,
    ) -> KeyValueDBPinBoxFuture<'_, Result<(), DBTransactionError>> {
        let transaction = Arc::new(transaction);
        Box::pin(async move {
            self.stats_transaction(1);

            let that = self.clone();
            let transaction_clone = transaction.clone();
            self.conn_mut(move |conn| {
                let mut sizes = Vec::with_capacity(transaction_clone.ops.len());
                let mut total_tx_size = 0;
                let mut deletes = 0usize;
                let mut prefix_deletes = 0usize;
                let start = Instant::now();

                let tx = conn.transaction()?;

                for op in &transaction_clone.ops {
                    match op {
                        DBOp::Insert { col, key, value } => {
                            that.validate_column(*col)?;
                            Self::store_unique_value_blob(
                                &tx,
                                that.column_table(*col),
                                &key_to_text(key),
                                value,
                            )?;
                            sizes.push(key.len() + value.len());
                            total_tx_size += key.len() + value.len();
                        }
                        DBOp::Delete { col, key } => {
                            that.validate_column(*col)?;
                            Self::remove_unique_value_blob(
                                &tx,
                                that.column_table(*col),
                                &key_to_text(key),
                            )?;
                            deletes += 1;
                        }
                        DBOp::DeletePrefix { col, prefix } => {
                            that.validate_column(*col)?;
                            Self::remove_unique_value_blob_like(
                                &tx,
                                that.column_table(*col),
                                &(like_key_to_text(prefix) + "%"),
                            )?;
                            prefix_deletes += 1;
                        }
                    }
                }
                tx.commit()?;

                let duration = Instant::now() - start;
                that.stats_write(&sizes);
                that.stats_tx_write(total_tx_size, duration);
                that.stats_delete(deletes);
                that.stats_delete_prefix(prefix_deletes);

                Ok(())
            })
            .await
            .map_err(io::Error::other)
            .map_err(|error| {
                let transaction = transaction.as_ref().clone();
                DBTransactionError { error, transaction }
            })
        })
    }

    fn iter<
        'a,
        T: Send + 'static,
        C: Send + 'static,
        F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
    >(
        &'a self,
        col: u32,
        prefix: Option<&'a [u8]>,
        context: C,
        mut f: F,
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
        let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
        Box::pin(async move {
            if col >= self.columns() {
                return Err(io::Error::from(io::ErrorKind::NotFound));
            }

            let that = self.clone();
            let context = Arc::new(Mutex::new(Some(context)));
            let context_ref = context.clone();

            let res = self
                .conn(move |conn| {
                    let mut context = context_ref.lock();
                    let context = context.as_mut().unwrap();

                    let mut stmt;
                    let mut rows;
                    if let Some(prefix_query) = opt_prefix_query {
                        stmt = match conn
                            .prepare_cached(&that.column_table(col).str_iter_with_prefix)
                        {
                            Ok(v) => v,
                            Err(e) => {
                                return Ok(Err(io::Error::other(e)));
                            }
                        };
                        rows = match stmt.query([prefix_query]) {
                            Ok(v) => v,
                            Err(e) => {
                                return Ok(Err(io::Error::other(e)));
                            }
                        };
                    } else {
                        stmt = match conn.prepare_cached(&that.column_table(col).str_iter_no_prefix)
                        {
                            Ok(v) => v,
                            Err(e) => {
                                return Ok(Err(io::Error::other(e)));
                            }
                        };
                        rows = match stmt.query([]) {
                            Ok(v) => v,
                            Err(e) => {
                                return Ok(Err(io::Error::other(e)));
                            }
                        };
                    }

                    let mut sw = 0usize;
                    let mut sbw = 0usize;

                    let out = loop {
                        match rows.next() {
                            Ok(Some(row)) => {
                                let kt: String = match row.get(0) {
                                    Err(e) => {
                                        break Err(io::Error::other(e));
                                    }
                                    Ok(v) => v,
                                };
                                let v: Vec<u8> = match row.get(1) {
                                    Err(e) => {
                                        break Err(io::Error::other(e));
                                    }
                                    Ok(v) => v,
                                };
                                let k: Vec<u8> = match text_to_key(&kt) {
                                    Err(e) => {
                                        break Err(io::Error::other(format!(
                                            "SQLite row get column 0 text convert error: {:?}",
                                            e
                                        )));
                                    }
                                    Ok(v) => v,
                                };

                                sw += 1;
                                sbw += k.len() + v.len();

                                match f(context, (&k, &v)) {
                                    Ok(None) => (),
                                    Ok(Some(out)) => {
                                        break Ok(Some(out));
                                    }
                                    Err(e) => {
                                        break Err(e);
                                    }
                                }
                            }
                            Ok(None) => {
                                break Ok(None);
                            }
                            Err(_) => {
                                break Ok(None);
                            }
                        }
                    };

                    // Read stats are recorded once here so early exits from
                    // the loop above are not double-counted.
                    that.stats_read(sw, sbw);

                    Ok(out)
                })
                .await
                .map_err(io::Error::other)?;

            let context = context.lock().take().unwrap();

            res.map(|x| (context, x))
        })
    }

    fn iter_keys<
        'a,
        T: Send + 'static,
        C: Send + 'static,
        F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
    >(
        &'a self,
        col: u32,
        prefix: Option<&'a [u8]>,
        context: C,
        mut f: F,
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
        let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
        Box::pin(async move {
            if col >= self.columns() {
                return Err(io::Error::from(io::ErrorKind::NotFound));
            }

            let that = self.clone();
            let context = Arc::new(Mutex::new(Some(context)));
            let context_ref = context.clone();

            let res = self
                .conn(move |conn| {
                    let mut context = context_ref.lock();
                    let context = context.as_mut().unwrap();

                    let mut stmt;
                    let mut rows;
                    if let Some(prefix_query) = opt_prefix_query {
                        stmt = match conn
                            .prepare_cached(&that.column_table(col).str_iter_keys_with_prefix)
                        {
                            Ok(v) => v,
                            Err(e) => {
                                return Ok(Err(io::Error::other(e)));
                            }
                        };
                        rows = match stmt.query([prefix_query]) {
                            Ok(v) => v,
                            Err(e) => {
                                return Ok(Err(io::Error::other(e)));
                            }
                        };
                    } else {
                        stmt = match conn
                            .prepare_cached(&that.column_table(col).str_iter_keys_no_prefix)
                        {
                            Ok(v) => v,
                            Err(e) => {
                                return Ok(Err(io::Error::other(e)));
                            }
                        };
                        rows = match stmt.query([]) {
                            Ok(v) => v,
                            Err(e) => {
                                return Ok(Err(io::Error::other(e)));
                            }
                        };
                    }

                    let mut sw = 0usize;
                    let mut sbw = 0usize;

                    let out = loop {
                        match rows.next() {
                            Ok(Some(row)) => {
                                let kt: String = match row.get(0) {
                                    Err(e) => {
                                        break Err(io::Error::other(e));
                                    }
                                    Ok(v) => v,
                                };
                                let k: Vec<u8> = match text_to_key(&kt) {
                                    Err(e) => {
                                        break Err(io::Error::other(format!(
                                            "SQLite row get column 0 text convert error: {:?}",
                                            e
                                        )));
                                    }
                                    Ok(v) => v,
                                };

                                sw += 1;
                                sbw += k.len();

                                match f(context, &k) {
                                    Ok(None) => (),
                                    Ok(Some(out)) => {
                                        break Ok(Some(out));
                                    }
                                    Err(e) => {
                                        break Err(e);
                                    }
                                }
                            }
                            Ok(None) => {
                                break Ok(None);
                            }
                            Err(_) => {
                                break Ok(None);
                            }
                        }
                    };

                    // Read stats are recorded once here so early exits from
                    // the loop above are not double-counted.
                    that.stats_read(sw, sbw);

                    Ok(out)
                })
                .await
                .map_err(io::Error::other)?;

            let context = context.lock().take().unwrap();

            res.map(|x| (context, x))
        })
    }

    fn io_stats(&self, kind: IoStatsKind) -> IoStats {
        fn duration_since(timestamp_microseconds: u64) -> Duration {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map_or(Duration::from_micros(0), |time| {
                    let now = time.as_micros() as u64;
                    if now >= timestamp_microseconds {
                        Duration::from_micros(now - timestamp_microseconds)
                    } else {
                        Duration::from_micros(0)
                    }
                })
        }

        let mut inner = self.inner.lock();
        match kind {
            IoStatsKind::Overall => {
                let mut stats = inner.overall_stats.clone();
                stats.span = duration_since(stats.started);
                stats
            }
            IoStatsKind::SincePrevious => {
                let mut stats = inner.current_stats.clone();
                stats.span = duration_since(stats.started);
                inner.current_stats = IoStats::empty();
                stats
            }
        }
    }

    fn num_columns(&self) -> io::Result<u32> {
        let this = self.clone();
        self.conn_blocking(move |conn| {
            Self::get_unique_value(conn, this.control_table(), "columns", 0u32)
        })
        .map_err(io::Error::other)
    }

    fn num_keys(&self, col: u32) -> KeyValueDBPinBoxFuture<'_, io::Result<u64>> {
        let this = self.clone();
        Box::pin(async move {
            this.conn(move |conn| {
                conn.query_row(
                    &format!("SELECT Count(*) FROM {}", get_column_table_name(col)),
                    [],
                    |row| -> rusqlite::Result<u64> { row.get(0) },
                )
            })
            .await
            .map_err(|_| io::Error::from(io::ErrorKind::NotFound))
        })
    }

    fn has_key<'a>(
        &'a self,
        col: u32,
        key: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<bool>> {
        let key_text = key_to_text(key);
        let key_len = key.len();

        Box::pin(async move {
            let that = self.clone();
            that.validate_column(col).map_err(io::Error::other)?;
            let someval = self
                .conn_blocking(move |conn| Self::has_value(conn, that.column_table(col), &key_text))
                .map_err(io::Error::other)?;

            self.stats_read(1, key_len);

            Ok(someval)
        })
    }

    fn has_prefix<'a>(
        &'a self,
        col: u32,
        prefix: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<bool>> {
        let prefix_len = prefix.len();
        let prefix_text = like_key_to_text(prefix) + "%";

        Box::pin(async move {
            let that = self.clone();
            that.validate_column(col).map_err(io::Error::other)?;
            let someval = self
                .conn_blocking(move |conn| {
                    Self::has_value_like(conn, that.column_table(col), &prefix_text)
                })
                .map_err(io::Error::other)?;

            self.stats_read(1, prefix_len);

            Ok(someval)
        })
    }

    fn first_with_prefix<'a>(
        &'a self,
        col: u32,
        prefix: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBKeyValue>>> {
        let prefix_len = prefix.len();
        let like = like_key_to_text(prefix) + "%";

        Box::pin(async move {
            let that = self.clone();
            that.validate_column(col).map_err(io::Error::other)?;
            let someval = self
                .conn_blocking(move |conn| {
                    Self::load_first_value_blob_like(conn, that.column_table(col), &like)
                })
                .map_err(io::Error::other)?;

            self.stats_read(1, prefix_len);

            match someval {
                Some((kt, val)) => match text_to_key(&kt) {
                    Err(e) => Err(io::Error::other(format!(
                        "SQLite row get column 0 text convert error: {:?}",
                        e
                    ))),
                    Ok(k) => Ok(Some((k, val))),
                },
                None => Ok(None),
            }
        })
    }

    fn cleanup(&self) -> KeyValueDBPinBoxFuture<'_, io::Result<()>> {
        Box::pin(async { self.vacuum().await.map_err(io::Error::other) })
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use keyvaluedb_shared_tests as st;
    use tempfile::Builder as TempfileBuilder;

    fn create(columns: u32) -> io::Result<Database> {
        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()?
            .path()
            .to_path_buf();
        let config = DatabaseConfig::new().with_columns(columns);
        Database::open(tempfile, config)
    }

    fn create_vacuum_mode(columns: u32, vacuum_mode: VacuumMode) -> io::Result<Database> {
        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()?
            .path()
            .to_path_buf();
        let config = DatabaseConfig::new()
            .with_columns(columns)
            .with_vacuum_mode(vacuum_mode);
        Database::open(tempfile, config)
    }

    #[tokio::test]
    async fn get_fails_with_non_existing_column() -> io::Result<()> {
        let db = create(1)?;
        st::test_get_fails_with_non_existing_column(db).await
    }

    #[tokio::test]
    async fn num_keys() -> io::Result<()> {
        let db = create(1)?;
        st::test_num_keys(db).await
    }

    #[tokio::test]
    async fn put_and_get() -> io::Result<()> {
        let db = create(1)?;
        st::test_put_and_get(db).await
    }

    #[tokio::test]
    async fn delete_and_get() -> io::Result<()> {
        let db = create(1)?;
        st::test_delete_and_get(db).await
    }

    #[tokio::test]
    async fn delete_and_get_single() -> io::Result<()> {
        let db = create(1)?;
        st::test_delete_and_get_single(db).await
    }

    #[tokio::test]
    async fn delete_prefix() -> io::Result<()> {
        let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
        st::test_delete_prefix(db).await
    }

    #[tokio::test]
    async fn iter() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter(db).await
    }

    #[tokio::test]
    async fn iter_keys() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter_keys(db).await
    }

    #[tokio::test]
    async fn iter_with_prefix() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter_with_prefix(db).await
    }

    #[tokio::test]
    async fn complex() -> io::Result<()> {
        let db = create(1)?;
        st::test_complex(db).await
    }

    #[tokio::test]
    async fn cleanup() -> io::Result<()> {
        let db = create(1)?;
        st::test_cleanup(db).await?;

        let db = create_vacuum_mode(1, VacuumMode::None)?;
        st::test_cleanup(db).await?;

        let db = create_vacuum_mode(1, VacuumMode::Incremental)?;
        st::test_cleanup(db).await?;

        let db = create_vacuum_mode(1, VacuumMode::Full)?;
        st::test_cleanup(db).await?;

        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()?
            .path()
            .to_path_buf();
        let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::None);
        let db = Database::open(tempfile.clone(), config)?;
        st::test_cleanup(db).await?;

        let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::Incremental);
        let db = Database::open(tempfile.clone(), config)?;
        st::test_cleanup(db).await?;

        let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::Full);
        let db = Database::open(tempfile.clone(), config)?;
        st::test_cleanup(db).await?;

        let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::None);
        let db = Database::open(tempfile, config)?;
        st::test_cleanup(db).await?;

        Ok(())
    }

    #[tokio::test]
    async fn stats() -> io::Result<()> {
        let db = create(st::IO_STATS_NUM_COLUMNS)?;
        st::test_io_stats(db).await
    }

    #[tokio::test]
    #[should_panic]
    async fn db_config_with_zero_columns() {
        let _cfg = DatabaseConfig::new().with_columns(0);
    }

    #[tokio::test]
    #[should_panic]
    async fn open_db_with_zero_columns() {
        let cfg = DatabaseConfig::new().with_columns(0);
        let _db = Database::open("", cfg);
    }

    #[tokio::test]
    async fn add_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::new().with_columns(5);

        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()
            .unwrap()
            .path()
            .to_path_buf();

        {
            let db = Database::open(&tempfile, config_1).unwrap();
            assert_eq!(db.num_columns().unwrap(), 1);

            for i in 2..=5 {
                db.add_column().unwrap();
                assert_eq!(db.num_columns().unwrap(), i);
            }
        }

        {
            let db = Database::open(&tempfile, config_5).unwrap();
            assert_eq!(db.num_columns().unwrap(), 5);
        }
    }

    #[tokio::test]
    async fn remove_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::new().with_columns(5);

        let tempfile = TempfileBuilder::new()
            .prefix("drop_columns")
            .tempfile()
            .unwrap()
            .path()
            .to_path_buf();

        {
            let db = Database::open(&tempfile, config_5).expect("open with 5 columns");
            assert_eq!(db.num_columns().unwrap(), 5);

            for i in (1..5).rev() {
                db.remove_last_column().unwrap();
                assert_eq!(db.num_columns().unwrap(), i);
            }
        }

        {
            let db = Database::open(&tempfile, config_1).unwrap();
            assert_eq!(db.num_columns().unwrap(), 1);
        }
    }

    #[tokio::test]
    async fn test_num_keys() {
        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()
            .unwrap()
            .path()
            .to_path_buf();
        let config = DatabaseConfig::new().with_columns(1);
        let db = Database::open(tempfile, config).unwrap();

        assert_eq!(
            db.num_keys(0).await.unwrap(),
            0,
            "database is empty after creation"
        );
        let key1 = b"beef";
        let mut batch = db.transaction();
        batch.put(0, key1, key1);
        db.write(batch).await.unwrap();
        assert_eq!(
            db.num_keys(0).await.unwrap(),
            1,
            "adding a key increases the count"
        );
    }
}