1#![deny(clippy::all)]
2
3mod tools;
4
5pub use async_sqlite::rusqlite::OpenFlags;
6use async_sqlite::rusqlite::{params, OptionalExtension as _};
7use async_sqlite::*;
8use keyvaluedb::{
9 DBKeyRef, DBKeyValue, DBKeyValueRef, DBOp, DBTransaction, DBTransactionError, DBValue, IoStats,
10 IoStatsKind, KeyValueDB,
11};
12use parking_lot::Mutex;
13use std::sync::Arc;
14use std::{
15 future::Future,
16 io,
17 path::{Path, PathBuf},
18 pin::Pin,
19 str::FromStr,
20};
21use tools::*;
22
23#[derive(Copy, Clone, Debug, Eq, PartialEq)]
26pub enum VacuumMode {
27 None,
28 Incremental,
29 Full,
30}
31
32#[derive(Clone)]
34pub struct DatabaseConfig {
35 pub columns: u32,
38 pub flags: OpenFlags,
40 pub num_conns: usize,
42 pub vacuum_mode: VacuumMode,
44}
45
46impl DatabaseConfig {
47 pub fn new() -> Self {
49 Default::default()
50 }
51
52 pub fn with_columns(self, columns: u32) -> Self {
54 assert!(columns > 0, "the number of columns must not be zero");
55 Self { columns, ..self }
56 }
57
58 pub fn with_in_memory(self) -> Self {
60 Self {
61 flags: OpenFlags::SQLITE_OPEN_READ_WRITE
62 | OpenFlags::SQLITE_OPEN_CREATE
63 | OpenFlags::SQLITE_OPEN_NO_MUTEX
64 | OpenFlags::SQLITE_OPEN_MEMORY,
65 ..self
66 }
67 }
68
69 pub fn with_flags(self, flags: OpenFlags) -> Self {
71 Self { flags, ..self }
72 }
73
74 pub fn with_num_conns(self, num_conns: usize) -> Self {
76 Self { num_conns, ..self }
77 }
78
79 pub fn with_vacuum_mode(self, vacuum_mode: VacuumMode) -> Self {
81 Self {
82 vacuum_mode,
83 ..self
84 }
85 }
86}
87
88impl Default for DatabaseConfig {
89 fn default() -> DatabaseConfig {
90 DatabaseConfig {
91 columns: 1,
92 flags: OpenFlags::SQLITE_OPEN_READ_WRITE
93 | OpenFlags::SQLITE_OPEN_CREATE
94 | OpenFlags::SQLITE_OPEN_NO_MUTEX,
95 num_conns: 1,
96 vacuum_mode: VacuumMode::None,
97 }
98 }
99}
100
101pub struct DatabaseTable {
105 _table: String,
106 str_has_value: String,
107 str_has_value_like: String,
108 str_get_unique_value: String,
109 str_get_first_value_like: String,
110 str_set_unique_value: String,
111 str_remove_unique_value: String,
112 str_remove_and_return_unique_value: String,
113 str_remove_unique_value_like: String,
114 str_iter_with_prefix: String,
115 str_iter_no_prefix: String,
116 str_iter_keys_with_prefix: String,
117 str_iter_keys_no_prefix: String,
118}
119
120impl DatabaseTable {
121 pub fn new(table: String) -> Self {
122 let str_has_value = format!("SELECT 1 FROM {} WHERE [key] = ? LIMIT 1", table);
123 let str_has_value_like = format!(
124 "SELECT 1 FROM {} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1",
125 table
126 );
127 let str_get_unique_value = format!("SELECT value FROM {} WHERE [key] = ? LIMIT 1", table);
128 let str_get_first_value_like = format!(
129 "SELECT key, value FROM {} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1",
130 table
131 );
132 let str_set_unique_value = format!(
133 "INSERT OR REPLACE INTO {} ([key], value) VALUES(?, ?)",
134 table
135 );
136 let str_remove_unique_value = format!("DELETE FROM {} WHERE [key] = ?", table);
137 let str_remove_and_return_unique_value =
138 format!("DELETE FROM {} WHERE [key] = ? RETURNING value", table);
139 let str_remove_unique_value_like =
140 format!("DELETE FROM {} WHERE [key] LIKE ? ESCAPE '\\'", table);
141 let str_iter_with_prefix = format!(
142 "SELECT key, value FROM {} WHERE [key] LIKE ? ESCAPE '\\'",
143 table
144 );
145 let str_iter_no_prefix = format!("SELECT key, value FROM {}", table);
146 let str_iter_keys_with_prefix =
147 format!("SELECT key FROM {} WHERE [key] LIKE ? ESCAPE '\\'", table);
148 let str_iter_keys_no_prefix = format!("SELECT key FROM {}", table);
149
150 Self {
151 _table: table,
152 str_has_value,
153 str_has_value_like,
154 str_get_unique_value,
155 str_get_first_value_like,
156 str_set_unique_value,
157 str_remove_unique_value,
158 str_remove_and_return_unique_value,
159 str_remove_unique_value_like,
160 str_iter_with_prefix,
161 str_iter_no_prefix,
162 str_iter_keys_with_prefix,
163 str_iter_keys_no_prefix,
164 }
165 }
166}
167
168pub struct DatabaseUnlockedInner {
172 path: PathBuf,
173 config: DatabaseConfig,
174 pool: Pool,
175 control_table: Arc<DatabaseTable>,
176 column_tables: Vec<Arc<DatabaseTable>>,
177}
178
179pub struct DatabaseInner {
180 overall_stats: IoStats,
181 current_stats: IoStats,
182}
183
184#[derive(Clone)]
185pub struct Database {
186 unlocked_inner: Arc<DatabaseUnlockedInner>,
187 inner: Arc<Mutex<DatabaseInner>>,
188}
189
190impl Database {
191 pub fn open<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> io::Result<Self> {
195 assert_ne!(config.columns, 0, "number of columns must be >= 1");
196
197 let path = PathBuf::from(path.as_ref());
198 let flags = config.flags;
199
200 let mut column_tables = vec![];
201 for n in 0..config.columns {
202 column_tables.push(Arc::new(DatabaseTable::new(get_column_table_name(n))))
203 }
204 let control_table = Arc::new(DatabaseTable::new("control".to_string()));
205
206 let pool_builder = PoolBuilder::new()
207 .path(&path)
208 .flags(flags)
209 .num_conns(config.num_conns);
210
211 let pool = pool_builder.open_blocking().map_err(io::Error::other)?;
212
213 let out = Self {
214 unlocked_inner: Arc::new(DatabaseUnlockedInner {
215 path,
216 config,
217 pool,
218 control_table,
219 column_tables,
220 }),
221 inner: Arc::new(Mutex::new(DatabaseInner {
222 overall_stats: IoStats::empty(),
223 current_stats: IoStats::empty(),
224 })),
225 };
226
227 let vacuum_mode = out.config().vacuum_mode;
228
229 out.conn_blocking(move |conn| {
230 conn.set_prepared_statement_cache_capacity(256);
232
233 conn.pragma_update(None, "case_sensitive_like", "ON")?;
234 conn.pragma_update(None, "journal_mode", "WAL")?;
235 conn.pragma_update(None, "synchronous", "normal")?;
236 conn.pragma_update(None, "journal_size_limit", 6144000)?;
237 conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
238
239 match vacuum_mode {
240 VacuumMode::None | VacuumMode::Full => {
241 let current: u32 =
242 conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
243 if current != 0 {
244 conn.execute("VACUUM", [])?;
245 conn.pragma_update(None, "auto_vacuum", 0)?;
246 }
247 }
248 VacuumMode::Incremental => {
249 let current: u32 =
250 conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
251 if current != 2 {
252 conn.execute("VACUUM", [])?;
253 conn.pragma_update(None, "auto_vacuum", "2")?;
254 }
255 }
256 }
257
258 Ok(())
259 })
260 .map_err(io::Error::other)?;
261
262 out.open_resize_columns()?;
263
264 Ok(out)
265 }
266
267 pub fn path(&self) -> PathBuf {
268 self.unlocked_inner.path.clone()
269 }
270
271 pub fn config(&self) -> DatabaseConfig {
272 self.unlocked_inner.config.clone()
273 }
274
275 pub fn columns(&self) -> u32 {
276 self.unlocked_inner.config.columns
277 }
278
279 pub fn control_table(&self) -> Arc<DatabaseTable> {
280 self.unlocked_inner.control_table.clone()
281 }
282
283 pub fn column_table(&self, col: u32) -> Arc<DatabaseTable> {
284 self.unlocked_inner.column_tables[col as usize].clone()
285 }
286
287 pub fn conn_blocking<T, F>(&self, func: F) -> Result<T, Error>
288 where
289 F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
290 T: Send + 'static,
291 {
292 self.unlocked_inner.pool.conn_blocking(func)
293 }
294
295 pub async fn conn<T, F>(&self, func: F) -> Result<T, Error>
296 where
297 F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
298 T: Send + 'static,
299 {
300 self.unlocked_inner.pool.conn(func).await
301 }
302
303 pub async fn conn_mut<T, F>(&self, func: F) -> Result<T, Error>
304 where
305 F: FnOnce(&mut rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
306 T: Send + 'static,
307 {
308 self.unlocked_inner.pool.conn_mut(func).await
309 }
310
311 pub fn remove_last_column(&self) -> Result<(), Error> {
316 let this = self.clone();
317 self.conn_blocking(move |conn| {
318 let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
319 if columns == 0 {
320 return Err(rusqlite::Error::QueryReturnedNoRows);
321 }
322 Self::set_unique_value(conn, this.control_table(), "columns", columns - 1)?;
323
324 conn.execute(
325 &format!("DROP TABLE {}", get_column_table_name(columns - 1)),
326 [],
327 )?;
328 Ok(())
329 })
330 }
331
332 pub fn add_column(&self) -> Result<(), Error> {
334 let this = self.clone();
335
336 self.conn_blocking(move |conn| {
337 let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
338 Self::set_unique_value(conn, this.control_table(), "columns", columns + 1)?;
339 Self::create_column_table(conn, columns)
340 })
341 }
342 pub fn transaction(&self) -> DBTransaction {
344 DBTransaction::new()
345 }
346
347 pub async fn vacuum(&self) -> Result<(), Error> {
349 match self.config().vacuum_mode {
350 VacuumMode::None => {
351 self.conn(move |conn| {
352 conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
353 Ok(())
354 })
355 .await
356 }
357 VacuumMode::Incremental => {
358 self.conn(move |conn| {
359 conn.execute("PRAGMA incremental_vacuum", [])?;
360 conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
361 Ok(())
362 })
363 .await
364 }
365 VacuumMode::Full => {
366 self.conn(move |conn| {
367 conn.execute("VACUUM", [])?;
368 conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
369 Ok(())
370 })
371 .await
372 }
373 }
374 }
375
376 fn validate_column(&self, col: u32) -> rusqlite::Result<()> {
380 if col >= self.columns() {
381 return Err(rusqlite::Error::InvalidColumnIndex(col as usize));
382 }
383 Ok(())
384 }
385
386 fn create_column_table(conn: &rusqlite::Connection, column: u32) -> rusqlite::Result<()> {
387 conn.execute(&format!("CREATE TABLE IF NOT EXISTS {} (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value BLOB)", get_column_table_name(column)), []).map(drop)
388 }
389
390 fn get_unique_value<V>(
391 conn: &rusqlite::Connection,
392 table: Arc<DatabaseTable>,
393 key: &str,
394 default: V,
395 ) -> rusqlite::Result<V>
396 where
397 V: FromStr,
398 {
399 let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;
400
401 if let Ok(found) = stmt.query_row([key], |row| -> rusqlite::Result<String> { row.get(0) }) {
402 if let Ok(v) = V::from_str(&found) {
403 return Ok(v);
404 }
405 }
406 Ok(default)
407 }
408
409 fn set_unique_value<V>(
410 conn: &rusqlite::Connection,
411 table: Arc<DatabaseTable>,
412 key: &str,
413 value: V,
414 ) -> rusqlite::Result<()>
415 where
416 V: ToString,
417 {
418 let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;
419
420 let changed = stmt.execute([key, value.to_string().as_str()])?;
421
422 assert!(
423 changed <= 1,
424 "multiple changes to unique key should not occur"
425 );
426 if changed == 0 {
427 return Err(rusqlite::Error::QueryReturnedNoRows);
428 }
429
430 Ok(())
431 }
432
433 fn has_value(
434 conn: &rusqlite::Connection,
435 table: Arc<DatabaseTable>,
436 key: &str,
437 ) -> rusqlite::Result<bool> {
438 let mut stmt = conn.prepare_cached(&table.str_has_value)?;
439 stmt.exists([key])
440 }
441
442 fn has_value_like(
443 conn: &rusqlite::Connection,
444 table: Arc<DatabaseTable>,
445 key: &str,
446 ) -> rusqlite::Result<bool> {
447 let mut stmt = conn.prepare_cached(&table.str_has_value_like)?;
448 stmt.exists([key])
449 }
450
451 fn load_unique_value_blob(
452 conn: &rusqlite::Connection,
453 table: Arc<DatabaseTable>,
454 key: &str,
455 ) -> rusqlite::Result<Option<Vec<u8>>> {
456 let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;
457
458 stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
459 .optional()
460 }
461
462 fn load_first_value_blob_like(
463 conn: &rusqlite::Connection,
464 table: Arc<DatabaseTable>,
465 like: &str,
466 ) -> rusqlite::Result<Option<(String, Vec<u8>)>> {
467 let mut stmt = conn.prepare_cached(&table.str_get_first_value_like)?;
468
469 stmt.query_row([like], |row| -> rusqlite::Result<(String, Vec<u8>)> {
470 Ok((row.get(0)?, row.get(1)?))
471 })
472 .optional()
473 }
474
475 fn store_unique_value_blob(
476 conn: &rusqlite::Connection,
477 table: Arc<DatabaseTable>,
478 key: &str,
479 value: &[u8],
480 ) -> rusqlite::Result<()> {
481 let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;
482
483 let changed = stmt.execute(params![key, value])?;
484 assert!(
485 changed <= 1,
486 "multiple changes to unique key should not occur"
487 );
488 if changed == 0 {
489 return Err(rusqlite::Error::QueryReturnedNoRows);
490 }
491 Ok(())
492 }
493
494 fn remove_unique_value_blob(
495 conn: &rusqlite::Connection,
496 table: Arc<DatabaseTable>,
497 key: &str,
498 ) -> rusqlite::Result<()> {
499 let mut stmt = conn.prepare_cached(&table.str_remove_unique_value)?;
500
501 let changed = stmt.execute([key])?;
502 assert!(
503 changed <= 1,
504 "multiple deletions of unique key should not occur"
505 );
506 if changed == 0 {
507 return Err(rusqlite::Error::QueryReturnedNoRows);
508 }
509 Ok(())
510 }
511
512 fn remove_and_return_unique_value_blob(
513 conn: &rusqlite::Connection,
514 table: Arc<DatabaseTable>,
515 key: &str,
516 ) -> rusqlite::Result<Option<Vec<u8>>> {
517 let mut stmt = conn.prepare_cached(&table.str_remove_and_return_unique_value)?;
518
519 stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
520 .optional()
521 }
522
523 fn remove_unique_value_blob_like(
524 conn: &rusqlite::Connection,
525 table: Arc<DatabaseTable>,
526 like: &str,
527 ) -> rusqlite::Result<usize> {
528 let mut stmt = conn.prepare_cached(&table.str_remove_unique_value_like)?;
529
530 let changed = stmt.execute([like])?;
531 Ok(changed)
532 }
533
534 fn open_resize_columns(&self) -> io::Result<()> {
535 let columns = self.columns();
536 let this = self.clone();
537 self.conn_blocking(move |conn| {
538 conn.execute("CREATE TABLE IF NOT EXISTS control (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value TEXT)", [])?;
540
541 let on_disk_columns =
543 Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
544
545 if columns <= on_disk_columns {
547 return Ok(());
548 }
549
550 for cn in on_disk_columns..columns {
552 Self::create_column_table(conn, cn)?;
554 }
555 Self::set_unique_value(
556 conn,
557 this.control_table(),
558 "columns",
559 columns,
560 )?;
561 Ok(())
562 }).map_err(io::Error::other)
563 }
564
565 fn stats_read(&self, count: usize, bytes: usize) {
566 let mut inner = self.inner.lock();
567 inner.current_stats.reads += count as u64;
568 inner.overall_stats.reads += count as u64;
569 inner.current_stats.bytes_read += bytes as u64;
570 inner.overall_stats.bytes_read += bytes as u64;
571 }
572 fn stats_write(&self, count: usize, bytes: usize) {
573 let mut inner = self.inner.lock();
574 inner.current_stats.writes += count as u64;
575 inner.overall_stats.writes += count as u64;
576 inner.current_stats.bytes_written += bytes as u64;
577 inner.overall_stats.bytes_written += bytes as u64;
578 }
579 fn stats_transaction(&self, count: usize) {
580 let mut inner = self.inner.lock();
581 inner.current_stats.transactions += count as u64;
582 inner.overall_stats.transactions += count as u64;
583 }
584}
585
586impl KeyValueDB for Database {
587 fn get(
588 &self,
589 col: u32,
590 key: &[u8],
591 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + '_>> {
592 let key_text = key_to_text(key);
593 let key_len = key.len();
594
595 Box::pin(async move {
596 let that = self.clone();
597 that.validate_column(col).map_err(io::Error::other)?;
598 let someval = self
599 .conn_blocking(move |conn| {
600 Self::load_unique_value_blob(conn, that.column_table(col), &key_text)
601 })
602 .map_err(io::Error::other)?;
603 {
604 match &someval {
605 Some(val) => self.stats_read(1, key_len + val.len()),
606 None => self.stats_read(1, key_len),
607 }
608 }
609
610 Ok(someval)
611 })
612 }
613
614 fn delete(
616 &self,
617 col: u32,
618 key: &[u8],
619 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + '_>> {
620 let key_text = key_to_text(key);
621 let key_len = key.len();
622
623 Box::pin(async move {
624 let that = self.clone();
625 that.validate_column(col).map_err(io::Error::other)?;
626 self.conn_blocking(move |conn| {
627 let someval = Self::remove_and_return_unique_value_blob(
628 conn,
629 that.column_table(col),
630 &key_text,
631 )?;
632
633 match &someval {
634 Some(val) => {
635 that.stats_read(1, key_len + val.len());
636 }
637 None => that.stats_read(1, key_len),
638 }
639
640 Ok(someval)
641 })
642 .map_err(io::Error::other)
643 })
644 }
645
646 fn write(
647 &self,
648 transaction: DBTransaction,
649 ) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send + '_>> {
650 let transaction = Arc::new(transaction);
651 Box::pin(async move {
652 self.stats_transaction(1);
653
654 let that = self.clone();
655 let transaction_clone = transaction.clone();
656 self.conn_mut(move |conn| {
657 let mut sw = 0usize;
658 let mut sbw = 0usize;
659
660 let tx = conn.transaction()?;
661
662 for op in &transaction_clone.ops {
663 match op {
664 DBOp::Insert { col, key, value } => {
665 that.validate_column(*col)?;
666 Self::store_unique_value_blob(
667 &tx,
668 that.column_table(*col),
669 &key_to_text(key),
670 value,
671 )?;
672 sw += 1;
673 sbw += key.len() + value.len();
674 }
675 DBOp::Delete { col, key } => {
676 that.validate_column(*col)?;
677 Self::remove_unique_value_blob(
678 &tx,
679 that.column_table(*col),
680 &key_to_text(key),
681 )?;
682 sw += 1;
683 }
684 DBOp::DeletePrefix { col, prefix } => {
685 that.validate_column(*col)?;
686 Self::remove_unique_value_blob_like(
687 &tx,
688 that.column_table(*col),
689 &(like_key_to_text(prefix) + "%"),
690 )?;
691 sw += 1;
692 }
693 }
694 }
695 tx.commit()?;
696
697 that.stats_write(sw, sbw);
698
699 Ok(())
700 })
701 .await
702 .map_err(io::Error::other)
703 .map_err(|error| {
704 let transaction = transaction.as_ref().clone();
705 DBTransactionError { error, transaction }
706 })
707 })
708 }
709
710 fn iter<
711 'a,
712 T: Send + 'static,
713 C: Send + 'static,
714 F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
715 >(
716 &'a self,
717 col: u32,
718 prefix: Option<&'a [u8]>,
719 context: C,
720 mut f: F,
721 ) -> Pin<Box<dyn Future<Output = io::Result<(C, Option<T>)>> + Send + 'a>> {
722 let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
723 Box::pin(async move {
724 if col >= self.columns() {
725 return Err(io::Error::from(io::ErrorKind::NotFound));
726 }
727
728 let that = self.clone();
729 let context = Arc::new(Mutex::new(Some(context)));
730 let context_ref = context.clone();
731
732 let res = self
733 .conn(move |conn| {
734 let mut context = context_ref.lock();
735 let context = context.as_mut().unwrap();
736
737 let mut stmt;
738 let mut rows;
739 if let Some(prefix_query) = opt_prefix_query {
740 stmt = match conn
741 .prepare_cached(&that.column_table(col).str_iter_with_prefix)
742 {
743 Ok(v) => v,
744 Err(e) => {
745 return Ok(Err(io::Error::other(e)));
746 }
747 };
748 rows = match stmt.query([prefix_query]) {
749 Ok(v) => v,
750 Err(e) => {
751 return Ok(Err(io::Error::other(e)));
752 }
753 };
754 } else {
755 stmt = match conn.prepare_cached(&that.column_table(col).str_iter_no_prefix)
756 {
757 Ok(v) => v,
758 Err(e) => {
759 return Ok(Err(io::Error::other(e)));
760 }
761 };
762 rows = match stmt.query([]) {
763 Ok(v) => v,
764 Err(e) => {
765 return Ok(Err(io::Error::other(e)));
766 }
767 };
768 }
769
770 let mut sw = 0usize;
771 let mut sbw = 0usize;
772
773 let out = loop {
774 match rows.next() {
775 Ok(Some(row)) => {
777 let kt: String = match row.get(0) {
778 Err(e) => {
779 break Err(io::Error::other(e));
780 }
781 Ok(v) => v,
782 };
783 let v: Vec<u8> = match row.get(1) {
784 Err(e) => {
785 break Err(io::Error::other(e));
786 }
787 Ok(v) => v,
788 };
789 let k: Vec<u8> = match text_to_key(&kt) {
790 Err(e) => {
791 break Err(io::Error::other(format!(
792 "SQLite row get column 0 text convert error: {:?}",
793 e
794 )));
795 }
796 Ok(v) => v,
797 };
798
799 sw += 1;
800 sbw += k.len() + v.len();
801
802 match f(context, (&k, &v)) {
803 Ok(None) => (),
804 Ok(Some(out)) => {
805 that.stats_read(sw, sbw);
807 break Ok(Some(out));
808 }
809 Err(e) => {
810 that.stats_read(sw, sbw);
812 break Err(e);
813 }
814 }
815 }
816 Ok(None) => {
818 break Ok(None);
819 }
820 Err(_) => {
822 break Ok(None);
823 }
824 }
825 };
826
827 that.stats_read(sw, sbw);
828
829 Ok(out)
830 })
831 .await
832 .map_err(io::Error::other)?;
833
834 let context = context.lock().take().unwrap();
835
836 res.map(|x| (context, x))
837 })
838 }
839
840 fn iter_keys<
841 'a,
842 T: Send + 'static,
843 C: Send + 'static,
844 F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
845 >(
846 &'a self,
847 col: u32,
848 prefix: Option<&'a [u8]>,
849 context: C,
850 mut f: F,
851 ) -> Pin<Box<dyn Future<Output = io::Result<(C, Option<T>)>> + Send + 'a>> {
852 let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
853 Box::pin(async move {
854 if col >= self.columns() {
855 return Err(io::Error::from(io::ErrorKind::NotFound));
856 }
857
858 let that = self.clone();
859 let context = Arc::new(Mutex::new(Some(context)));
860 let context_ref = context.clone();
861
862 let res = self
863 .conn(move |conn| {
864 let mut context = context_ref.lock();
865 let context = context.as_mut().unwrap();
866
867 let mut stmt;
868 let mut rows;
869 if let Some(prefix_query) = opt_prefix_query {
870 stmt = match conn
871 .prepare_cached(&that.column_table(col).str_iter_keys_with_prefix)
872 {
873 Ok(v) => v,
874 Err(e) => {
875 return Ok(Err(io::Error::other(e)));
876 }
877 };
878 rows = match stmt.query([prefix_query]) {
879 Ok(v) => v,
880 Err(e) => {
881 return Ok(Err(io::Error::other(e)));
882 }
883 };
884 } else {
885 stmt = match conn
886 .prepare_cached(&that.column_table(col).str_iter_keys_no_prefix)
887 {
888 Ok(v) => v,
889 Err(e) => {
890 return Ok(Err(io::Error::other(e)));
891 }
892 };
893 rows = match stmt.query([]) {
894 Ok(v) => v,
895 Err(e) => {
896 return Ok(Err(io::Error::other(e)));
897 }
898 };
899 }
900
901 let mut sw = 0usize;
902 let mut sbw = 0usize;
903
904 let out = loop {
905 match rows.next() {
906 Ok(Some(row)) => {
908 let kt: String = match row.get(0) {
909 Err(e) => {
910 break Err(io::Error::other(e));
911 }
912 Ok(v) => v,
913 };
914 let k: Vec<u8> = match text_to_key(&kt) {
915 Err(e) => {
916 break Err(io::Error::other(format!(
917 "SQLite row get column 0 text convert error: {:?}",
918 e
919 )));
920 }
921 Ok(v) => v,
922 };
923
924 sw += 1;
925 sbw += k.len();
926
927 match f(context, &k) {
928 Ok(None) => (),
929 Ok(Some(out)) => {
930 that.stats_read(sw, sbw);
932 break Ok(Some(out));
933 }
934 Err(e) => {
935 that.stats_read(sw, sbw);
937 break Err(e);
938 }
939 }
940 }
941 Ok(None) => {
943 break Ok(None);
944 }
945 Err(_) => {
947 break Ok(None);
948 }
949 }
950 };
951
952 that.stats_read(sw, sbw);
953
954 Ok(out)
955 })
956 .await
957 .map_err(io::Error::other)?;
958
959 let context = context.lock().take().unwrap();
960
961 res.map(|x| (context, x))
962 })
963 }
964
965 fn io_stats(&self, kind: IoStatsKind) -> IoStats {
966 let mut inner = self.inner.lock();
967 match kind {
968 IoStatsKind::Overall => {
969 let mut stats = inner.overall_stats.clone();
970 stats.span = std::time::SystemTime::now()
971 .duration_since(stats.started)
972 .unwrap_or_default();
973 stats
974 }
975 IoStatsKind::SincePrevious => {
976 let mut stats = inner.current_stats.clone();
977 stats.span = std::time::SystemTime::now()
978 .duration_since(stats.started)
979 .unwrap_or_default();
980 inner.current_stats = IoStats::empty();
981 stats
982 }
983 }
984 }
985
986 fn num_columns(&self) -> io::Result<u32> {
987 let this = self.clone();
988 self.conn_blocking(move |conn| {
989 Self::get_unique_value(conn, this.control_table(), "columns", 0u32)
990 })
991 .map_err(io::Error::other)
992 }
993
994 fn num_keys(&self, col: u32) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send + '_>> {
995 let this = self.clone();
996 Box::pin(async move {
997 this.conn(move |conn| {
998 conn.query_row(
999 &format!("SELECT Count(*) FROM {}", get_column_table_name(col)),
1000 [],
1001 |row| -> rusqlite::Result<u64> { row.get(0) },
1002 )
1003 })
1004 .await
1005 .map_err(|_| io::Error::from(io::ErrorKind::NotFound))
1006 })
1007 }
1008
1009 fn has_key<'a>(
1011 &'a self,
1012 col: u32,
1013 key: &'a [u8],
1014 ) -> Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>> {
1015 let key_text = key_to_text(key);
1016 let key_len = key.len();
1017
1018 Box::pin(async move {
1019 let that = self.clone();
1020 that.validate_column(col).map_err(io::Error::other)?;
1021 let someval = self
1022 .conn_blocking(move |conn| Self::has_value(conn, that.column_table(col), &key_text))
1023 .map_err(io::Error::other)?;
1024
1025 self.stats_read(1, key_len);
1026
1027 Ok(someval)
1028 })
1029 }
1030
1031 fn has_prefix<'a>(
1033 &'a self,
1034 col: u32,
1035 prefix: &'a [u8],
1036 ) -> Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>> {
1037 let prefix_len = prefix.len();
1038 let prefix_text = like_key_to_text(prefix) + "%";
1039
1040 Box::pin(async move {
1041 let that = self.clone();
1042 that.validate_column(col).map_err(io::Error::other)?;
1043 let someval = self
1044 .conn_blocking(move |conn| {
1045 Self::has_value_like(conn, that.column_table(col), &prefix_text)
1046 })
1047 .map_err(io::Error::other)?;
1048
1049 self.stats_read(1, prefix_len);
1050
1051 Ok(someval)
1052 })
1053 }
1054
1055 fn first_with_prefix<'a>(
1057 &'a self,
1058 col: u32,
1059 prefix: &'a [u8],
1060 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBKeyValue>>> + Send + 'a>> {
1061 let prefix_len = prefix.len();
1062 let like = like_key_to_text(prefix) + "%";
1063
1064 Box::pin(async move {
1065 let that = self.clone();
1066 that.validate_column(col).map_err(io::Error::other)?;
1067 let someval = self
1068 .conn_blocking(move |conn| {
1069 Self::load_first_value_blob_like(conn, that.column_table(col), &like)
1070 })
1071 .map_err(io::Error::other)?;
1072
1073 self.stats_read(1, prefix_len);
1074
1075 match someval {
1076 Some((kt, val)) => match text_to_key(&kt) {
1077 Err(e) => {
1078 return Err(io::Error::other(format!(
1079 "SQLite row get column 0 text convert error: {:?}",
1080 e
1081 )))
1082 }
1083 Ok(k) => Ok(Some((k, val))),
1084 },
1085 None => Ok(None),
1086 }
1087 })
1088 }
1089
1090 fn cleanup(&self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>> {
1092 Box::pin(async { self.vacuum().await.map_err(io::Error::other) })
1093 }
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098
1099 use super::*;
1100 use keyvaluedb_shared_tests as st;
1101 use tempfile::Builder as TempfileBuilder;
1102
1103 fn create(columns: u32) -> io::Result<Database> {
1104 let tempfile = TempfileBuilder::new()
1105 .prefix("")
1106 .tempfile()?
1107 .path()
1108 .to_path_buf();
1109 let config = DatabaseConfig::new().with_columns(columns);
1110 Database::open(tempfile, config)
1111 }
1112
1113 fn create_vacuum_mode(columns: u32, vacuum_mode: VacuumMode) -> io::Result<Database> {
1114 let tempfile = TempfileBuilder::new()
1115 .prefix("")
1116 .tempfile()?
1117 .path()
1118 .to_path_buf();
1119 let config = DatabaseConfig::new()
1120 .with_columns(columns)
1121 .with_vacuum_mode(vacuum_mode);
1122 Database::open(tempfile, config)
1123 }
1124
1125 #[tokio::test]
1126 async fn get_fails_with_non_existing_column() -> io::Result<()> {
1127 let db = create(1)?;
1128 st::test_get_fails_with_non_existing_column(db).await
1129 }
1130
1131 #[tokio::test]
1132 async fn num_keys() -> io::Result<()> {
1133 let db = create(1)?;
1134 st::test_num_keys(db).await
1135 }
1136
1137 #[tokio::test]
1138 async fn put_and_get() -> io::Result<()> {
1139 let db = create(1)?;
1140 st::test_put_and_get(db).await
1141 }
1142
1143 #[tokio::test]
1144 async fn delete_and_get() -> io::Result<()> {
1145 let db = create(1)?;
1146 st::test_delete_and_get(db).await
1147 }
1148
1149 #[tokio::test]
1150 async fn delete_and_get_single() -> io::Result<()> {
1151 let db = create(1)?;
1152 st::test_delete_and_get_single(db).await
1153 }
1154
1155 #[tokio::test]
1156 async fn delete_prefix() -> io::Result<()> {
1157 let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
1158 st::test_delete_prefix(db).await
1159 }
1160
1161 #[tokio::test]
1162 async fn iter() -> io::Result<()> {
1163 let db = create(1)?;
1164 st::test_iter(db).await
1165 }
1166
1167 #[tokio::test]
1168 async fn iter_keys() -> io::Result<()> {
1169 let db = create(1)?;
1170 st::test_iter_keys(db).await
1171 }
1172
1173 #[tokio::test]
1174 async fn iter_with_prefix() -> io::Result<()> {
1175 let db = create(1)?;
1176 st::test_iter_with_prefix(db).await
1177 }
1178
1179 #[tokio::test]
1180 async fn complex() -> io::Result<()> {
1181 let db = create(1)?;
1182 st::test_complex(db).await
1183 }
1184
1185 #[tokio::test]
1186 async fn cleanup() -> io::Result<()> {
1187 let db = create(1)?;
1188 st::test_cleanup(db).await?;
1189
1190 let db = create_vacuum_mode(1, VacuumMode::None)?;
1191 st::test_cleanup(db).await?;
1192
1193 let db = create_vacuum_mode(1, VacuumMode::Incremental)?;
1194 st::test_cleanup(db).await?;
1195
1196 let db = create_vacuum_mode(1, VacuumMode::Full)?;
1197 st::test_cleanup(db).await?;
1198
1199 let tempfile = TempfileBuilder::new()
1200 .prefix("")
1201 .tempfile()?
1202 .path()
1203 .to_path_buf();
1204 let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::None);
1205 let db = Database::open(tempfile.clone(), config)?;
1206 st::test_cleanup(db).await?;
1207
1208 let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::Incremental);
1209 let db = Database::open(tempfile.clone(), config)?;
1210 st::test_cleanup(db).await?;
1211
1212 let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::Full);
1213 let db = Database::open(tempfile.clone(), config)?;
1214 st::test_cleanup(db).await?;
1215
1216 let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::None);
1217 let db = Database::open(tempfile, config)?;
1218 st::test_cleanup(db).await?;
1219
1220 Ok(())
1221 }
1222
1223 #[tokio::test]
1224 async fn stats() -> io::Result<()> {
1225 let db = create(st::IO_STATS_NUM_COLUMNS)?;
1226 st::test_io_stats(db).await
1227 }
1228
1229 #[tokio::test]
1230 #[should_panic]
1231 async fn db_config_with_zero_columns() {
1232 let _cfg = DatabaseConfig::new().with_columns(0);
1233 }
1234
1235 #[tokio::test]
1236 #[should_panic]
1237 async fn open_db_with_zero_columns() {
1238 let cfg = DatabaseConfig::new().with_columns(0);
1239 let _db = Database::open("", cfg);
1240 }
1241
1242 #[tokio::test]
1243 async fn add_columns() {
1244 let config_1 = DatabaseConfig::default();
1245 let config_5 = DatabaseConfig::new().with_columns(5);
1246
1247 let tempfile = TempfileBuilder::new()
1248 .prefix("")
1249 .tempfile()
1250 .unwrap()
1251 .path()
1252 .to_path_buf();
1253
1254 {
1256 let db = Database::open(&tempfile, config_1).unwrap();
1257 assert_eq!(db.num_columns().unwrap(), 1);
1258
1259 for i in 2..=5 {
1260 db.add_column().unwrap();
1261 assert_eq!(db.num_columns().unwrap(), i);
1262 }
1263 }
1264
1265 {
1267 let db = Database::open(&tempfile, config_5).unwrap();
1268 assert_eq!(db.num_columns().unwrap(), 5);
1269 }
1270 }
1271
1272 #[tokio::test]
1273 async fn remove_columns() {
1274 let config_1 = DatabaseConfig::default();
1275 let config_5 = DatabaseConfig::new().with_columns(5);
1276
1277 let tempfile = TempfileBuilder::new()
1278 .prefix("drop_columns")
1279 .tempfile()
1280 .unwrap()
1281 .path()
1282 .to_path_buf();
1283
1284 {
1286 let db = Database::open(&tempfile, config_5).expect("open with 5 columns");
1287 assert_eq!(db.num_columns().unwrap(), 5);
1288
1289 for i in (1..5).rev() {
1290 db.remove_last_column().unwrap();
1291 assert_eq!(db.num_columns().unwrap(), i);
1292 }
1293 }
1294
1295 {
1297 let db = Database::open(&tempfile, config_1).unwrap();
1298 assert_eq!(db.num_columns().unwrap(), 1);
1299 }
1300 }
1301
1302 #[tokio::test]
1303 async fn test_num_keys() {
1304 let tempfile = TempfileBuilder::new()
1305 .prefix("")
1306 .tempfile()
1307 .unwrap()
1308 .path()
1309 .to_path_buf();
1310 let config = DatabaseConfig::new().with_columns(1);
1311 let db = Database::open(tempfile, config).unwrap();
1312
1313 assert_eq!(
1314 db.num_keys(0).await.unwrap(),
1315 0,
1316 "database is empty after creation"
1317 );
1318 let key1 = b"beef";
1319 let mut batch = db.transaction();
1320 batch.put(0, key1, key1);
1321 db.write(batch).await.unwrap();
1322 assert_eq!(
1323 db.num_keys(0).await.unwrap(),
1324 1,
1325 "adding a key increases the count"
1326 );
1327 }
1328}