1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use async_stream::try_stream;
6use futures_lite::{
7 pin,
8 stream::{Stream, StreamExt},
9};
10
11use sqlx::{
12 pool::PoolConnection,
13 sqlite::{Sqlite, SqlitePool},
14 Acquire, Database, Error as SqlxError, Row, TransactionManager,
15};
16
17use super::{
18 db_utils::{
19 decode_tags, decrypt_scan_batch, encode_profile_key, encode_tag_filter, expiry_timestamp,
20 extend_query, prepare_tags, random_profile_name, Connection, DbSession, DbSessionActive,
21 DbSessionRef, DbSessionTxn, EncScanEntry, ExtDatabase, QueryParams, QueryPrepare,
22 PAGE_SIZE,
23 },
24 Backend, BackendSession,
25};
26use crate::{
27 backend::OrderBy,
28 entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
29 error::Error,
30 future::{unblock, BoxFuture},
31 protect::{EntryEncryptor, KeyCache, PassKey, ProfileId, ProfileKey, StoreKeyMethod},
32};
33
34mod provision;
35pub use provision::SqliteStoreOptions;
36
/// Reads a single `config` value by name.
const CONFIG_FETCH_QUERY: &str = "SELECT value FROM config WHERE name = ?1";
/// Writes a `config` value, replacing any existing row with the same name.
const CONFIG_UPDATE_QUERY: &str = "INSERT OR REPLACE INTO config (name, value) VALUES (?1, ?2)";
/// Counts unexpired items of a profile. A NULL kind (`?2`) or category (`?3`)
/// disables the corresponding filter.
const COUNT_QUERY: &str = "SELECT COUNT(*) FROM items i
 WHERE profile_id = ?1
 AND (kind = ?2 OR ?2 IS NULL)
 AND (category = ?3 OR ?3 IS NULL)
 AND (expiry IS NULL OR DATETIME(expiry) > DATETIME('now'))";
/// Deletes one item identified by profile, kind, category and name.
const DELETE_QUERY: &str = "DELETE FROM items
 WHERE profile_id = ?1 AND kind = ?2 AND category = ?3 AND name = ?4";
/// Fetches one unexpired item (id, value) together with its tags, which are
/// concatenated as `plaintext:HEX(name):HEX(value)` groups.
const FETCH_QUERY: &str = "SELECT i.id, i.value,
 (SELECT GROUP_CONCAT(it.plaintext || ':' || HEX(it.name) || ':' || HEX(it.value))
 FROM items_tags it WHERE it.item_id = i.id) AS tags
 FROM items i WHERE i.profile_id = ?1 AND i.kind = ?2
 AND i.category = ?3 AND i.name = ?4
 AND (i.expiry IS NULL OR DATETIME(i.expiry) > DATETIME('now'))";
/// Inserts a new item; `OR IGNORE` means a conflicting row yields zero
/// affected rows instead of an error.
const INSERT_QUERY: &str =
 "INSERT OR IGNORE INTO items (profile_id, kind, category, name, value, expiry)
 VALUES (?1, ?2, ?3, ?4, ?5, ?6)";
/// Updates the value and expiry of an existing item and returns its row id.
const UPDATE_QUERY: &str = "UPDATE items SET value=?5, expiry=?6 WHERE profile_id=?1 AND kind=?2
 AND category=?3 AND name=?4 RETURNING id";
/// Base query for scans: unexpired items of a profile with optional kind and
/// category filters, including the concatenated tags of each item.
const SCAN_QUERY: &str = "SELECT i.id, i.kind, i.category, i.name, i.value,
 (SELECT GROUP_CONCAT(it.plaintext || ':' || HEX(it.name) || ':' || HEX(it.value))
 FROM items_tags it WHERE it.item_id = i.id) AS tags
 FROM items i WHERE i.profile_id = ?1
 AND (i.kind = ?2 OR ?2 IS NULL)
 AND (i.category = ?3 OR ?3 IS NULL)
 AND (i.expiry IS NULL OR DATETIME(i.expiry) > DATETIME('now'))";
/// Base query for bulk removal. Unlike the scan and count queries it has no
/// expiry condition, so expired rows are matched as well.
const DELETE_ALL_QUERY: &str = "DELETE FROM items AS i
 WHERE i.profile_id = ?1
 AND (i.kind = ?2 OR ?2 IS NULL)
 AND (i.category = ?3 OR ?3 IS NULL)";
/// Inserts a single tag row for an item.
const TAG_INSERT_QUERY: &str = "INSERT INTO items_tags
 (item_id, name, value, plaintext) VALUES (?1, ?2, ?3, ?4)";
/// Removes every tag attached to an item.
const TAG_DELETE_QUERY: &str = "DELETE FROM items_tags
 WHERE item_id=?1";
72
/// A store backend that keeps its data in a SQLite database.
pub struct SqliteBackend {
    /// Connection pool used for every query issued by the backend.
    conn_pool: SqlitePool,
    /// Profile name used when a session or scan does not name one.
    active_profile: String,
    /// Shared cache holding the store key and loaded profile keys.
    key_cache: Arc<KeyCache>,
    /// Database path; only reported through the `Debug` output here.
    path: String,
}
80
81impl SqliteBackend {
82 pub(crate) fn new(
83 conn_pool: SqlitePool,
84 active_profile: String,
85 key_cache: KeyCache,
86 path: String,
87 ) -> Self {
88 Self {
89 conn_pool,
90 active_profile,
91 key_cache: Arc::new(key_cache),
92 path,
93 }
94 }
95}
96
97impl Debug for SqliteBackend {
98 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
99 f.debug_struct("SqliteStore")
100 .field("active_profile", &self.active_profile)
101 .field("path", &self.path)
102 .finish()
103 }
104}
105
/// Query preparation for this backend targets the SQLite driver. Only the
/// database type is specified here; no other trait items are overridden.
impl QueryPrepare for SqliteBackend {
    type DB = Sqlite;
}
109
110impl Backend for SqliteBackend {
111 type Session = DbSession<Sqlite>;
112
113 fn create_profile(&self, name: Option<String>) -> BoxFuture<'_, Result<String, Error>> {
114 let name = name.unwrap_or_else(random_profile_name);
115 Box::pin(async move {
116 let store_key = self.key_cache.store_key.clone();
117 let (profile_key, enc_key) = unblock(move || {
118 let profile_key = ProfileKey::new()?;
119 let enc_key = encode_profile_key(&profile_key, &store_key)?;
120 Result::<_, Error>::Ok((profile_key, enc_key))
121 })
122 .await?;
123 let mut conn = self.conn_pool.acquire().await?;
124 let done =
125 sqlx::query("INSERT OR IGNORE INTO profiles (name, profile_key) VALUES (?1, ?2)")
126 .bind(&name)
127 .bind(enc_key)
128 .execute(conn.as_mut())
129 .await?;
130 conn.return_to_pool().await;
131 if done.rows_affected() == 0 {
132 return Err(err_msg!(Duplicate, "Duplicate profile name"));
133 }
134 self.key_cache
135 .add_profile(
136 name.clone(),
137 done.last_insert_rowid(),
138 Arc::new(profile_key),
139 )
140 .await;
141 Ok(name)
142 })
143 }
144
    /// Return a clone of the backend's `active_profile` name.
    fn get_active_profile(&self) -> String {
        self.active_profile.clone()
    }
148
149 fn get_default_profile(&self) -> BoxFuture<'_, Result<String, Error>> {
150 Box::pin(async move {
151 let mut conn = self.conn_pool.acquire().await?;
152 let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
153 .bind("default_profile")
154 .fetch_one(conn.as_mut())
155 .await
156 .map_err(err_map!(Backend, "Error fetching default profile name"))?;
157 conn.return_to_pool().await;
158 Ok(profile.unwrap_or_default())
159 })
160 }
161
162 fn set_default_profile(&self, profile: String) -> BoxFuture<'_, Result<(), Error>> {
163 Box::pin(async move {
164 let mut conn = self.conn_pool.acquire().await?;
165 sqlx::query(CONFIG_UPDATE_QUERY)
166 .bind("default_profile")
167 .bind(profile)
168 .execute(conn.as_mut())
169 .await
170 .map_err(err_map!(Backend, "Error setting default profile name"))?;
171 conn.return_to_pool().await;
172 Ok(())
173 })
174 }
175
176 fn list_profiles(&self) -> BoxFuture<'_, Result<Vec<String>, Error>> {
177 Box::pin(async move {
178 let mut conn = self.conn_pool.acquire().await?;
179 let rows = sqlx::query("SELECT name FROM profiles")
180 .fetch_all(conn.as_mut())
181 .await
182 .map_err(err_map!(Backend, "Error fetching profile list"))?;
183 conn.return_to_pool().await;
184 let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
185 Ok(names)
186 })
187 }
188
189 fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
190 Box::pin(async move {
191 let mut conn = self.conn_pool.acquire().await?;
192 let ret = sqlx::query("DELETE FROM profiles WHERE name=?")
193 .bind(&name)
194 .execute(conn.as_mut())
195 .await
196 .map_err(err_map!(Backend, "Error removing profile"))?
197 .rows_affected()
198 != 0;
199 conn.return_to_pool().await;
200 Ok(ret)
201 })
202 }
203
    /// Replace the store key and re-encode every profile key with it.
    ///
    /// The new store key is resolved from `method` and `pass_key`. Inside a
    /// single transaction, each profile key is loaded through the current
    /// key cache, re-encoded with the new store key and written back, and
    /// the `key` entry of the `config` table is updated. The in-memory key
    /// cache is replaced only after the transaction has been committed.
    ///
    /// # Errors
    ///
    /// Returns a `Backend` error if any profile update or the config update
    /// does not affect exactly one row; database and key errors are
    /// propagated. Returning early drops the transaction without committing.
    fn rekey(
        &mut self,
        method: StoreKeyMethod,
        pass_key: PassKey<'_>,
    ) -> BoxFuture<'_, Result<(), Error>> {
        // Take ownership of the pass key so it can move into the future.
        let pass_key = pass_key.into_owned();
        Box::pin(async move {
            let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
            let store_key = Arc::new(store_key);
            let mut conn = self.conn_pool.acquire().await?;
            let mut txn = conn.begin().await?;
            let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
            // Re-encoded keys are collected first and written afterwards,
            // because the row stream borrows the transaction.
            let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
            while let Some(row) = rows.next().await {
                let row = row?;
                let pid = row.try_get(0)?;
                let enc_key = row.try_get(1)?;
                // Load the profile key via the current (old) key cache.
                let profile_key = self.key_cache.load_key(enc_key).await?;
                let upd_key = unblock({
                    let store_key = store_key.clone();
                    move || encode_profile_key(&profile_key, &store_key)
                })
                .await?;
                upd_keys.insert(pid, upd_key);
            }
            // Release the borrow on the transaction before issuing updates.
            drop(rows);
            for (pid, key) in upd_keys {
                if sqlx::query("UPDATE profiles SET profile_key=?1 WHERE id=?2")
                    .bind(key)
                    .bind(pid)
                    .execute(txn.as_mut())
                    .await?
                    .rows_affected()
                    != 1
                {
                    return Err(err_msg!(Backend, "Error updating profile key"));
                }
            }
            if sqlx::query("UPDATE config SET value=?1 WHERE name='key'")
                .bind(store_key_ref.into_uri())
                .execute(txn.as_mut())
                .await?
                .rows_affected()
                != 1
            {
                return Err(err_msg!(Backend, "Error updating store key"));
            }
            txn.commit().await?;
            conn.return_to_pool().await;
            // Start from a fresh cache built around the new store key.
            self.key_cache = Arc::new(KeyCache::new(store_key));
            Ok(())
        })
    }
257
258 fn scan(
259 &self,
260 profile: Option<String>,
261 kind: Option<EntryKind>,
262 category: Option<String>,
263 tag_filter: Option<TagFilter>,
264 offset: Option<i64>,
265 limit: Option<i64>,
266 order_by: Option<OrderBy>,
267 descending: bool,
268 ) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
269 Box::pin(async move {
270 let session = self.session(profile, false)?;
271 let mut active = session.owned_ref();
272 let (profile_id, key) = acquire_key(&mut active).await?;
273 let scan = perform_scan(
274 active,
275 profile_id,
276 key.clone(),
277 kind,
278 category.clone(),
279 tag_filter,
280 offset,
281 limit,
282 order_by,
283 descending,
284 );
285 let stream = scan.then(move |enc_rows| {
286 let category = category.clone();
287 let key = key.clone();
288 unblock(move || decrypt_scan_batch(category, enc_rows?, &key))
289 });
290 Ok(Scan::new(stream, PAGE_SIZE))
291 })
292 }
293
294 fn session(&self, profile: Option<String>, transaction: bool) -> Result<Self::Session, Error> {
295 Ok(DbSession::new(
296 self.conn_pool.clone(),
297 self.key_cache.clone(),
298 profile.unwrap_or_else(|| self.active_profile.clone()),
299 transaction,
300 ))
301 }
302
    /// Close the connection pool, waiting for the close to complete.
    /// This implementation always resolves to `Ok(())`.
    fn close(&self) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            self.conn_pool.close().await;
            Ok(())
        })
    }
309}
310
311impl BackendSession for DbSession<Sqlite> {
312 fn count<'q>(
313 &'q mut self,
314 kind: Option<EntryKind>,
315 category: Option<&'q str>,
316 tag_filter: Option<TagFilter>,
317 ) -> BoxFuture<'q, Result<i64, Error>> {
318 let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
319
320 Box::pin(async move {
321 let (profile_id, key) = acquire_key(&mut *self).await?;
322 let mut params = QueryParams::new();
323 params.push(profile_id);
324 params.push(kind.map(|k| k as i16));
325 let (enc_category, tag_filter) = unblock({
326 let params_len = params.len() + 1; move || {
328 Result::<_, Error>::Ok((
329 enc_category
330 .map(|c| key.encrypt_entry_category(c))
331 .transpose()?,
332 encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?,
333 ))
334 }
335 })
336 .await?;
337 params.push(enc_category);
338 let query = extend_query::<SqliteBackend>(
339 COUNT_QUERY,
340 &mut params,
341 tag_filter,
342 None,
343 None,
344 None,
345 false,
346 )?;
347 let mut active = acquire_session(&mut *self).await?;
348 let count = sqlx::query_scalar_with(query.as_str(), params)
349 .fetch_one(active.connection_mut())
350 .await
351 .map_err(err_map!(Backend, "Error performing count query"))?;
352 Ok(count)
353 })
354 }
355
356 fn fetch(
357 &mut self,
358 kind: EntryKind,
359 category: &str,
360 name: &str,
361 _for_update: bool,
362 ) -> BoxFuture<'_, Result<Option<Entry>, Error>> {
363 let category = category.to_string();
364 let name = name.to_string();
365
366 Box::pin(async move {
367 let (profile_id, key) = acquire_key(&mut *self).await?;
368 let (enc_category, enc_name) = unblock({
369 let key = key.clone();
370 let category = ProfileKey::prepare_input(category.as_bytes());
371 let name = ProfileKey::prepare_input(name.as_bytes());
372 move || {
373 Result::<_, Error>::Ok((
374 key.encrypt_entry_category(category)?,
375 key.encrypt_entry_name(name)?,
376 ))
377 }
378 })
379 .await?;
380 let mut active = acquire_session(&mut *self).await?;
381 if let Some(row) = sqlx::query(FETCH_QUERY)
382 .bind(profile_id)
383 .bind(kind as i16)
384 .bind(enc_category)
385 .bind(enc_name)
386 .fetch_optional(active.connection_mut())
387 .await
388 .map_err(err_map!(Backend, "Error performing fetch query"))?
389 {
390 let value = row.try_get(1)?;
391 let tags = row.try_get(2)?;
392 let (category, name, value, tags) = unblock(move || {
393 let value = key.decrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
394 let enc_tags = decode_tags(tags)
395 .map_err(|_| err_msg!(Unexpected, "Error decoding entry tags"))?;
396 let tags = key.decrypt_entry_tags(enc_tags)?;
397 Result::<_, Error>::Ok((category, name, value, tags))
398 })
399 .await?;
400 Ok(Some(Entry::new(kind, category, name, value, tags)))
401 } else {
402 Ok(None)
403 }
404 })
405 }
406
407 fn fetch_all<'q>(
408 &'q mut self,
409 kind: Option<EntryKind>,
410 category: Option<&'q str>,
411 tag_filter: Option<TagFilter>,
412 limit: Option<i64>,
413 order_by: Option<OrderBy>,
414 descending: bool,
415 _for_update: bool,
416 ) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
417 let category = category.map(|c| c.to_string());
418 Box::pin(async move {
419 let mut active = self.borrow_mut();
420 let (profile_id, key) = acquire_key(&mut active).await?;
421 let scan = perform_scan(
422 active,
423 profile_id,
424 key.clone(),
425 kind,
426 category.clone(),
427 tag_filter,
428 None,
429 limit,
430 order_by,
431 descending,
432 );
433 pin!(scan);
434 let mut enc_rows = vec![];
435 while let Some(rows) = scan.try_next().await? {
436 enc_rows.extend(rows)
437 }
438 unblock(move || decrypt_scan_batch(category, enc_rows, &key)).await
439 })
440 }
441
442 fn remove_all<'q>(
443 &'q mut self,
444 kind: Option<EntryKind>,
445 category: Option<&'q str>,
446 tag_filter: Option<TagFilter>,
447 ) -> BoxFuture<'q, Result<i64, Error>> {
448 let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
449
450 Box::pin(async move {
451 let (profile_id, key) = acquire_key(&mut *self).await?;
452 let mut params = QueryParams::new();
453 params.push(profile_id);
454 params.push(kind.map(|k| k as i16));
455 let (enc_category, tag_filter) = unblock({
456 let params_len = params.len() + 1; move || {
458 Result::<_, Error>::Ok((
459 enc_category
460 .map(|c| key.encrypt_entry_category(c))
461 .transpose()?,
462 encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?,
463 ))
464 }
465 })
466 .await?;
467 params.push(enc_category);
468 let query = extend_query::<SqliteBackend>(
469 DELETE_ALL_QUERY,
470 &mut params,
471 tag_filter,
472 None,
473 None,
474 None,
475 false,
476 )?;
477
478 let mut active = acquire_session(&mut *self).await?;
479 let removed = sqlx::query_with(query.as_str(), params)
480 .execute(active.connection_mut())
481 .await?
482 .rows_affected();
483 Ok(removed as i64)
484 })
485 }
486
    /// Insert, replace or remove a single entry.
    ///
    /// * `Insert` / `Replace` — the category, name, value and optional tags
    ///   are encrypted through `unblock`, then written via `perform_insert`
    ///   inside a transaction obtained from the active session, which is
    ///   committed on success. `Insert` takes the new-row path and `Replace`
    ///   the update path. A missing `value` is treated as empty.
    /// * `Remove` — the encrypted category and name are passed to
    ///   `perform_remove` with `ignore_error` set to `false`, so a missing
    ///   entry is reported as an error.
    fn update<'q>(
        &'q mut self,
        kind: EntryKind,
        operation: EntryOperation,
        category: &'q str,
        name: &'q str,
        value: Option<&'q [u8]>,
        tags: Option<&'q [EntryTag]>,
        expiry_ms: Option<i64>,
    ) -> BoxFuture<'q, Result<(), Error>> {
        // Prepare inputs up front, before the future is created.
        let category = ProfileKey::prepare_input(category.as_bytes());
        let name = ProfileKey::prepare_input(name.as_bytes());

        match operation {
            op @ EntryOperation::Insert | op @ EntryOperation::Replace => {
                let value = ProfileKey::prepare_input(value.unwrap_or_default());
                let tags = tags.map(prepare_tags);
                Box::pin(async move {
                    let (_, key) = acquire_key(&mut *self).await?;
                    let (enc_category, enc_name, enc_value, enc_tags) = unblock(move || {
                        // The value is encrypted first: it reads the category
                        // and name by reference, and both are then moved into
                        // their own encryption calls below.
                        let enc_value =
                            key.encrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
                        Result::<_, Error>::Ok((
                            key.encrypt_entry_category(category)?,
                            key.encrypt_entry_name(name)?,
                            enc_value,
                            tags.transpose()?
                                .map(|t| key.encrypt_entry_tags(t))
                                .transpose()?,
                        ))
                    })
                    .await?;
                    let mut active = acquire_session(&mut *self).await?;
                    let mut txn = active.as_transaction().await?;
                    perform_insert(
                        &mut txn,
                        kind,
                        &enc_category,
                        &enc_name,
                        &enc_value,
                        enc_tags,
                        expiry_ms,
                        // `true` selects the INSERT path, `false` the UPDATE path.
                        op == EntryOperation::Insert,
                    )
                    .await?;
                    txn.commit().await?;
                    Ok(())
                })
            }

            EntryOperation::Remove => Box::pin(async move {
                let (_, key) = acquire_key(&mut *self).await?;
                let (enc_category, enc_name) = unblock(move || {
                    Result::<_, Error>::Ok((
                        key.encrypt_entry_category(category)?,
                        key.encrypt_entry_name(name)?,
                    ))
                })
                .await?;
                let mut active = acquire_session(&mut *self).await?;
                perform_remove(&mut active, kind, &enc_category, &enc_name, false).await
            }),
        }
    }
551
552 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
553 Box::pin(async move {
554 let mut sess = acquire_session(&mut *self).await?;
555 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
556 .bind(sess.profile_id)
557 .fetch_one(sess.connection_mut())
558 .await
559 .map_err(err_map!(Backend, "Error pinging session"))?;
560 if count == 0 {
561 Err(err_msg!(NotFound, "Session profile has been removed"))
562 } else {
563 Ok(())
564 }
565 })
566 }
567
    /// Close the session, passing `commit` through to `self.close`.
    // NOTE(review): `self.close(commit)` presumably resolves to an inherent
    // `DbSession::close` rather than recursing into this trait method —
    // confirm against `db_utils`.
    fn close(&mut self, commit: bool) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(self.close(commit))
    }
571}
572
impl ExtDatabase for Sqlite {
    /// Begin a transaction on `conn` through sqlx's transaction manager.
    ///
    /// For a non-nested transaction, a `DELETE` that can never match a row
    /// (`WHERE 0`) is executed immediately afterwards.
    // NOTE(review): the no-op write presumably forces SQLite to take its
    // write lock up front instead of on the first real write — confirm.
    fn start_transaction(
        conn: &mut Connection<Self>,
        nested: bool,
    ) -> BoxFuture<'_, std::result::Result<(), SqlxError>> {
        Box::pin(async move {
            <Sqlite as Database>::TransactionManager::begin(conn).await?;
            if !nested {
                // A write statement that changes no rows.
                sqlx::query("DELETE FROM config WHERE 0")
                    .execute(conn)
                    .await?;
            }
            Ok(())
        })
    }
}
593
594async fn acquire_key(
595 session: &mut DbSession<Sqlite>,
596) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
597 acquire_session(session).await?;
598 Ok(session.profile_and_key().unwrap())
599}
600
/// Make the session active, using `resolve_profile_key` as the callback
/// passed to `make_active`, and return the active session handle.
async fn acquire_session(
    session: &mut DbSession<Sqlite>,
) -> Result<DbSessionActive<'_, Sqlite>, Error> {
    session.make_active(&resolve_profile_key).await
}
606
607async fn resolve_profile_key(
608 conn: &mut PoolConnection<Sqlite>,
609 cache: Arc<KeyCache>,
610 profile: String,
611 _in_txn: bool,
612) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
613 if let Some((pid, key)) = cache.get_profile(profile.as_str()).await {
614 Ok((pid, key))
615 } else if let Some(row) = sqlx::query("SELECT id, profile_key FROM profiles WHERE name=?1")
616 .bind(profile.as_str())
617 .fetch_optional(conn.as_mut())
618 .await
619 .map_err(err_map!(Backend, "Error fetching profile key"))?
620 {
621 let pid = row.try_get(0)?;
622 let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
623 cache.add_profile(profile, pid, key.clone()).await;
624 Ok((pid, key))
625 } else {
626 Err(err_msg!(NotFound, "Profile not found"))
627 }
628}
629
630#[allow(clippy::too_many_arguments)]
631async fn perform_insert(
632 active: &mut DbSessionTxn<'_, Sqlite>,
633 kind: EntryKind,
634 enc_category: &[u8],
635 enc_name: &[u8],
636 enc_value: &[u8],
637 enc_tags: Option<Vec<EncEntryTag>>,
638 expiry_ms: Option<i64>,
639 new_row: bool,
640) -> Result<(), Error> {
641 let row_id = if new_row {
642 trace!("Insert entry");
643 let done = sqlx::query(INSERT_QUERY)
644 .bind(active.profile_id)
645 .bind(kind as i16)
646 .bind(enc_category)
647 .bind(enc_name)
648 .bind(enc_value)
649 .bind(expiry_ms.map(expiry_timestamp).transpose()?)
650 .execute(active.connection_mut())
651 .await
652 .map_err(err_map!(Backend, "Error inserting new entry"))?;
653 if done.rows_affected() == 0 {
654 return Err(err_msg!(Duplicate, "Duplicate entry"));
655 }
656 done.last_insert_rowid()
657 } else {
658 trace!("Update entry");
659 let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
660 .bind(active.profile_id)
661 .bind(kind as i16)
662 .bind(enc_category)
663 .bind(enc_name)
664 .bind(enc_value)
665 .bind(expiry_ms.map(expiry_timestamp).transpose()?)
666 .fetch_one(active.connection_mut())
667 .await
668 .map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
669 sqlx::query(TAG_DELETE_QUERY)
670 .bind(row_id)
671 .execute(active.connection_mut())
672 .await
673 .map_err(err_map!(Backend, "Error removing existing entry tags"))?;
674 row_id
675 };
676 if let Some(tags) = enc_tags {
677 for tag in tags {
678 sqlx::query(TAG_INSERT_QUERY)
679 .bind(row_id)
680 .bind(&tag.name)
681 .bind(&tag.value)
682 .bind(tag.plaintext as i16)
683 .execute(active.connection_mut())
684 .await
685 .map_err(err_map!(Backend, "Error inserting entry tags"))?;
686 }
687 }
688 Ok(())
689}
690
691async fn perform_remove<'q>(
692 active: &mut DbSessionActive<'q, Sqlite>,
693 kind: EntryKind,
694 enc_category: &[u8],
695 enc_name: &[u8],
696 ignore_error: bool,
697) -> Result<(), Error> {
698 trace!("Remove entry");
699 let done = sqlx::query(DELETE_QUERY)
700 .bind(active.profile_id)
701 .bind(kind as i16)
702 .bind(enc_category)
703 .bind(enc_name)
704 .execute(active.connection_mut())
705 .await
706 .map_err(err_map!(Backend, "Error removing entry"))?;
707 if done.rows_affected() == 0 && !ignore_error {
708 Err(err_msg!(NotFound, "Entry not found"))
709 } else {
710 Ok(())
711 }
712}
713
/// Build a stream of encrypted scan results in batches.
///
/// The stream encrypts the optional category and encodes the tag filter
/// through `unblock`, extends `SCAN_QUERY` with the filter, offset, limit
/// and ordering, and runs it on the session. Rows are yielded in vectors of
/// `PAGE_SIZE` entries, followed by a final shorter vector if any rows
/// remain. Once all rows are read, a session reference that reports itself
/// as owned is closed with `close(false)`.
///
/// Any error while preparing or executing the query ends the stream with
/// that error.
#[allow(clippy::too_many_arguments)]
fn perform_scan(
    mut active: DbSessionRef<'_, Sqlite>,
    profile_id: ProfileId,
    key: Arc<ProfileKey>,
    kind: Option<EntryKind>,
    category: Option<String>,
    tag_filter: Option<TagFilter>,
    offset: Option<i64>,
    limit: Option<i64>,
    order_by: Option<OrderBy>,
    descending: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
    try_stream! {
        let mut params = QueryParams::new();
        params.push(profile_id);
        params.push(kind.map(|k| k as i16));
        let (enc_category, tag_filter) = unblock({
            let key = key.clone();
            let enc_category = category.as_ref().map(|c| ProfileKey::prepare_input(c.as_bytes()));
            // One more than the current parameter count: the category is
            // pushed below, after the tag filter has been encoded.
            let params_len = params.len() + 1;
            move || {
                Result::<_, Error>::Ok((
                    enc_category.map(|c| key.encrypt_entry_category(c)).transpose()?,
                    encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?
                ))
            }
        }).await?;
        params.push(enc_category);
        let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;

        let mut batch = Vec::with_capacity(PAGE_SIZE);

        let mut acquired = acquire_session(&mut active).await?;
        let mut rows = sqlx::query_with(query.as_str(), params).fetch(acquired.connection_mut());
        while let Some(row) = rows.try_next().await? {
            // Column 0 is the row id and is not used; column 1 is the kind.
            let kind: u32 = row.try_get(1)?;
            let kind = EntryKind::try_from(kind as usize)?;
            batch.push(EncScanEntry {
                kind, category: row.try_get(2)?, name: row.try_get(3)?, value: row.try_get(4)?, tags: row.try_get(5)?
            });
            if batch.len() == PAGE_SIZE {
                // `split_off(0)` hands out the full batch and leaves `batch`
                // empty for the next page.
                yield batch.split_off(0);
            }
        }
        // End the row stream's borrow of the session before closing it.
        drop(rows);
        if active.is_owned() {
            active.close(false).await?;
        }
        drop(active);

        if !batch.is_empty() {
            yield batch;
        }
    }
}
770
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::db_utils::replace_arg_placeholders;
    use crate::future::block_on;
    use crate::protect::{generate_raw_store_key, StoreKeyMethod};

    /// Provision an in-memory store and let SQLite compare the expiry
    /// timestamp for `expiry_ms` against its own current time.
    ///
    /// Returns SQLite's "now", the bound timestamp as read back, and whether
    /// the timestamp is later than "now".
    async fn compare_expiry(expiry_ms: i64) -> Result<(String, String, bool), Error> {
        let key = generate_raw_store_key(None)?;
        let db = SqliteStoreOptions::in_memory()
            .provision(StoreKeyMethod::RawKey, key, None, false)
            .await?;
        let ts = expiry_timestamp(expiry_ms).unwrap();
        let check = sqlx::query("SELECT datetime('now'), ?1, datetime(?1) > datetime('now')")
            .bind(ts)
            .fetch_one(&db.conn_pool)
            .await?;
        let now: String = check.try_get(0)?;
        let cmp_ts: String = check.try_get(1)?;
        let cmp: bool = check.try_get(2)?;
        Ok((now, cmp_ts, cmp))
    }

    /// A timestamp in the future must compare as later than SQLite's "now".
    #[test]
    fn sqlite_check_expiry_timestamp() {
        let (now, cmp_ts, cmp) = block_on(compare_expiry(1000)).unwrap();
        if !cmp {
            panic!("now ({}) > expiry timestamp ({})", now, cmp_ts);
        }
    }

    /// A timestamp in the past must not compare as later than SQLite's "now".
    #[test]
    fn sqlite_check_expiry_timestamp_expired() {
        let (now, cmp_ts, cmp) = block_on(compare_expiry(-1000)).unwrap();
        if cmp {
            panic!("now ({}) < expiry timestamp ({})", now, cmp_ts);
        }
    }

    /// `$$` and `$N` placeholders are rewritten to `?N`; other `$` tokens
    /// are left untouched.
    #[test]
    fn sqlite_query_placeholders() {
        let rewritten =
            replace_arg_placeholders::<SqliteBackend>("This $$ is $10 a $$ string!", 3);
        assert_eq!(&rewritten, "This ?3 is ?12 a ?5 string!");

        let untouched = replace_arg_placeholders::<SqliteBackend>("This $a is a string!", 1);
        assert_eq!(&untouched, "This $a is a string!");
    }
}