1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use async_stream::try_stream;
6use futures_lite::{
7 pin,
8 stream::{Stream, StreamExt},
9};
10
11use sqlx::{
12 pool::PoolConnection,
13 sqlite::{Sqlite, SqlitePool},
14 Acquire, Database, Error as SqlxError, Row, TransactionManager,
15};
16
17use super::{
18 db_utils::{
19 decode_tags, decrypt_scan_batch, encode_profile_key, encode_tag_filter, expiry_timestamp,
20 extend_query, prepare_tags, random_profile_name, Connection, DbSession, DbSessionActive,
21 DbSessionRef, DbSessionTxn, EncScanEntry, ExtDatabase, QueryParams, QueryPrepare,
22 PAGE_SIZE,
23 },
24 Backend, BackendSession,
25};
26use crate::{
27 backend::OrderBy,
28 entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
29 error::Error,
30 future::{unblock, BoxFuture},
31 protect::{EntryEncryptor, KeyCache, PassKey, ProfileId, ProfileKey, StoreKeyMethod},
32};
33
34mod provision;
35pub use provision::SqliteStoreOptions;
36
// Read / upsert a single named value in the `config` table.
const CONFIG_FETCH_QUERY: &str = "SELECT value FROM config WHERE name = ?1";
const CONFIG_UPDATE_QUERY: &str = "INSERT OR REPLACE INTO config (name, value) VALUES (?1, ?2)";
// Count a profile's unexpired items; a NULL ?2 (kind) or ?3 (category) disables that filter.
const COUNT_QUERY: &str = "SELECT COUNT(*) FROM items i
    WHERE profile_id = ?1
    AND (kind = ?2 OR ?2 IS NULL)
    AND (category = ?3 OR ?3 IS NULL)
    AND (expiry IS NULL OR DATETIME(expiry) > DATETIME('now'))";
// Delete exactly one item identified by profile, kind, category and name.
const DELETE_QUERY: &str = "DELETE FROM items
    WHERE profile_id = ?1 AND kind = ?2 AND category = ?3 AND name = ?4";
// Fetch one unexpired item; its tags are aggregated into a single
// `plaintext:HEX(name):HEX(value)` list in the third result column.
const FETCH_QUERY: &str = "SELECT i.id, i.value,
    (SELECT GROUP_CONCAT(it.plaintext || ':' || HEX(it.name) || ':' || HEX(it.value))
        FROM items_tags it WHERE it.item_id = i.id) AS tags
    FROM items i WHERE i.profile_id = ?1 AND i.kind = ?2
    AND i.category = ?3 AND i.name = ?4
    AND (i.expiry IS NULL OR DATETIME(i.expiry) > DATETIME('now'))";
// `OR IGNORE`: a conflicting insert affects zero rows instead of raising an error.
const INSERT_QUERY: &str =
    "INSERT OR IGNORE INTO items (profile_id, kind, category, name, value, expiry)
    VALUES (?1, ?2, ?3, ?4, ?5, ?6)";
// Overwrite value and expiry of an existing item and return its row id.
const UPDATE_QUERY: &str = "UPDATE items SET value=?5, expiry=?6 WHERE profile_id=?1 AND kind=?2
    AND category=?3 AND name=?4 RETURNING id";
// Same filters as COUNT_QUERY, returning the item columns plus the aggregated tag list.
const SCAN_QUERY: &str = "SELECT i.id, i.kind, i.category, i.name, i.value,
    (SELECT GROUP_CONCAT(it.plaintext || ':' || HEX(it.name) || ':' || HEX(it.value))
        FROM items_tags it WHERE it.item_id = i.id) AS tags
    FROM items i WHERE i.profile_id = ?1
    AND (i.kind = ?2 OR ?2 IS NULL)
    AND (i.category = ?3 OR ?3 IS NULL)
    AND (i.expiry IS NULL OR DATETIME(i.expiry) > DATETIME('now'))";
// Bulk delete with optional kind/category filters; note there is no expiry condition here.
const DELETE_ALL_QUERY: &str = "DELETE FROM items AS i
    WHERE i.profile_id = ?1
    AND (i.kind = ?2 OR ?2 IS NULL)
    AND (i.category = ?3 OR ?3 IS NULL)";
// Tag rows are keyed by the owning item's row id.
const TAG_INSERT_QUERY: &str = "INSERT INTO items_tags
    (item_id, name, value, plaintext) VALUES (?1, ?2, ?3, ?4)";
const TAG_DELETE_QUERY: &str = "DELETE FROM items_tags
    WHERE item_id=?1";
72
/// Store backend that runs every operation against a SQLite connection pool.
pub struct SqliteBackend {
    /// Pool from which each operation acquires its connection.
    conn_pool: SqlitePool,
    /// Profile name used when a caller opens a session without naming one.
    active_profile: String,
    /// Shared cache holding the store key and the profile keys resolved so far.
    key_cache: Arc<KeyCache>,
    /// Path string supplied at construction; in this file it is only shown in `Debug` output.
    path: String,
}
80
81impl SqliteBackend {
82 pub(crate) fn new(
83 conn_pool: SqlitePool,
84 active_profile: String,
85 key_cache: KeyCache,
86 path: String,
87 ) -> Self {
88 Self {
89 conn_pool,
90 active_profile,
91 key_cache: Arc::new(key_cache),
92 path,
93 }
94 }
95}
96
97impl Debug for SqliteBackend {
98 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
99 f.debug_struct("SqliteStore")
100 .field("active_profile", &self.active_profile)
101 .field("path", &self.path)
102 .finish()
103 }
104}
105
/// Ties the shared query-preparation helpers to the `Sqlite` driver.
/// Only the associated database type is set here; no other trait item is overridden.
impl QueryPrepare for SqliteBackend {
    type DB = Sqlite;
}
109
110impl Backend for SqliteBackend {
111 type Session = DbSession<Sqlite>;
112
113 fn create_profile(&self, name: Option<String>) -> BoxFuture<'_, Result<String, Error>> {
114 let name = name.unwrap_or_else(random_profile_name);
115 Box::pin(async move {
116 let store_key = self.key_cache.store_key.clone();
117 let (profile_key, enc_key) = unblock(move || {
118 let profile_key = ProfileKey::new()?;
119 let enc_key = encode_profile_key(&profile_key, &store_key)?;
120 Result::<_, Error>::Ok((profile_key, enc_key))
121 })
122 .await?;
123 let mut conn = self.conn_pool.acquire().await?;
124 let done =
125 sqlx::query("INSERT OR IGNORE INTO profiles (name, profile_key) VALUES (?1, ?2)")
126 .bind(&name)
127 .bind(enc_key)
128 .execute(conn.as_mut())
129 .await?;
130 conn.return_to_pool().await;
131 if done.rows_affected() == 0 {
132 return Err(err_msg!(Duplicate, "Duplicate profile name"));
133 }
134 self.key_cache
135 .add_profile(
136 name.clone(),
137 done.last_insert_rowid(),
138 Arc::new(profile_key),
139 )
140 .await;
141 Ok(name)
142 })
143 }
144
145 fn get_active_profile(&self) -> String {
146 self.active_profile.clone()
147 }
148
149 fn get_default_profile(&self) -> BoxFuture<'_, Result<String, Error>> {
150 Box::pin(async move {
151 let mut conn = self.conn_pool.acquire().await?;
152 let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
153 .bind("default_profile")
154 .fetch_one(conn.as_mut())
155 .await
156 .map_err(err_map!(Backend, "Error fetching default profile name"))?;
157 conn.return_to_pool().await;
158 Ok(profile.unwrap_or_default())
159 })
160 }
161
162 fn set_default_profile(&self, profile: String) -> BoxFuture<'_, Result<(), Error>> {
163 Box::pin(async move {
164 let mut conn = self.conn_pool.acquire().await?;
165 sqlx::query(CONFIG_UPDATE_QUERY)
166 .bind("default_profile")
167 .bind(profile)
168 .execute(conn.as_mut())
169 .await
170 .map_err(err_map!(Backend, "Error setting default profile name"))?;
171 conn.return_to_pool().await;
172 Ok(())
173 })
174 }
175
176 fn list_profiles(&self) -> BoxFuture<'_, Result<Vec<String>, Error>> {
177 Box::pin(async move {
178 let mut conn = self.conn_pool.acquire().await?;
179 let rows = sqlx::query("SELECT name FROM profiles")
180 .fetch_all(conn.as_mut())
181 .await
182 .map_err(err_map!(Backend, "Error fetching profile list"))?;
183 conn.return_to_pool().await;
184 let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
185 Ok(names)
186 })
187 }
188
189 fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
190 Box::pin(async move {
191 let mut conn = self.conn_pool.acquire().await?;
192 let ret = sqlx::query("DELETE FROM profiles WHERE name=?")
193 .bind(&name)
194 .execute(conn.as_mut())
195 .await
196 .map_err(err_map!(Backend, "Error removing profile"))?
197 .rows_affected()
198 != 0;
199 conn.return_to_pool().await;
200 self.key_cache.clear_profile(&name).await;
201 Ok(ret)
202 })
203 }
204
205 fn rename_profile(
206 &self,
207 from_name: String,
208 to_name: String,
209 ) -> BoxFuture<'_, Result<bool, Error>> {
210 Box::pin(async move {
211 let mut conn = self.conn_pool.acquire().await?;
212 let ret = sqlx::query("UPDATE profiles SET name=$1 WHERE name=$2")
213 .bind(&to_name)
214 .bind(&from_name)
215 .execute(conn.as_mut())
216 .await
217 .map_err(err_map!(Backend, "Error renaming profile"))?
218 .rows_affected()
219 != 0;
220 conn.return_to_pool().await;
221 self.key_cache.clear_profile(&from_name).await;
222 Ok(ret)
223 })
224 }
225
226 fn rekey(
227 &mut self,
228 method: StoreKeyMethod,
229 pass_key: PassKey<'_>,
230 ) -> BoxFuture<'_, Result<(), Error>> {
231 let pass_key = pass_key.into_owned();
232 Box::pin(async move {
233 let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
234 let store_key = Arc::new(store_key);
235 let mut conn = self.conn_pool.acquire().await?;
236 let mut txn = conn.begin().await?;
237 let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
238 let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
239 while let Some(row) = rows.next().await {
240 let row = row?;
241 let pid = row.try_get(0)?;
242 let enc_key = row.try_get(1)?;
243 let profile_key = self.key_cache.load_key(enc_key).await?;
244 let upd_key = unblock({
245 let store_key = store_key.clone();
246 move || encode_profile_key(&profile_key, &store_key)
247 })
248 .await?;
249 upd_keys.insert(pid, upd_key);
250 }
251 drop(rows);
252 for (pid, key) in upd_keys {
253 if sqlx::query("UPDATE profiles SET profile_key=?1 WHERE id=?2")
254 .bind(key)
255 .bind(pid)
256 .execute(txn.as_mut())
257 .await?
258 .rows_affected()
259 != 1
260 {
261 return Err(err_msg!(Backend, "Error updating profile key"));
262 }
263 }
264 if sqlx::query("UPDATE config SET value=?1 WHERE name='key'")
265 .bind(store_key_ref.into_uri())
266 .execute(txn.as_mut())
267 .await?
268 .rows_affected()
269 != 1
270 {
271 return Err(err_msg!(Backend, "Error updating store key"));
272 }
273 txn.commit().await?;
274 conn.return_to_pool().await;
275 self.key_cache = Arc::new(KeyCache::new(store_key));
276 Ok(())
277 })
278 }
279
280 fn scan(
281 &self,
282 profile: Option<String>,
283 kind: Option<EntryKind>,
284 category: Option<String>,
285 tag_filter: Option<TagFilter>,
286 offset: Option<i64>,
287 limit: Option<i64>,
288 order_by: Option<OrderBy>,
289 descending: bool,
290 ) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
291 Box::pin(async move {
292 let session = self.session(profile, false)?;
293 let mut active = session.owned_ref();
294 let (profile_id, key) = acquire_key(&mut active).await?;
295 let scan = perform_scan(
296 active,
297 profile_id,
298 key.clone(),
299 kind,
300 category.clone(),
301 tag_filter,
302 offset,
303 limit,
304 order_by,
305 descending,
306 );
307 let stream = scan.then(move |enc_rows| {
308 let category = category.clone();
309 let key = key.clone();
310 unblock(move || decrypt_scan_batch(category, enc_rows?, &key))
311 });
312 Ok(Scan::new(stream, PAGE_SIZE))
313 })
314 }
315
316 fn session(&self, profile: Option<String>, transaction: bool) -> Result<Self::Session, Error> {
317 Ok(DbSession::new(
318 self.conn_pool.clone(),
319 self.key_cache.clone(),
320 profile.unwrap_or_else(|| self.active_profile.clone()),
321 transaction,
322 ))
323 }
324
325 fn close(&self) -> BoxFuture<'_, Result<(), Error>> {
326 Box::pin(async move {
327 self.conn_pool.close().await;
328 Ok(())
329 })
330 }
331}
332
333impl BackendSession for DbSession<Sqlite> {
334 fn count<'q>(
335 &'q mut self,
336 kind: Option<EntryKind>,
337 category: Option<&'q str>,
338 tag_filter: Option<TagFilter>,
339 ) -> BoxFuture<'q, Result<i64, Error>> {
340 let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
341
342 Box::pin(async move {
343 let (profile_id, key) = acquire_key(&mut *self).await?;
344 let mut params = QueryParams::new();
345 params.push(profile_id);
346 params.push(kind.map(|k| k as i16));
347 let (enc_category, tag_filter) = unblock({
348 let params_len = params.len() + 1; move || {
350 Result::<_, Error>::Ok((
351 enc_category
352 .map(|c| key.encrypt_entry_category(c))
353 .transpose()?,
354 encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?,
355 ))
356 }
357 })
358 .await?;
359 params.push(enc_category);
360 let query = extend_query::<SqliteBackend>(
361 COUNT_QUERY,
362 &mut params,
363 tag_filter,
364 None,
365 None,
366 None,
367 false,
368 )?;
369 let mut active = acquire_session(&mut *self).await?;
370 let count = sqlx::query_scalar_with(query.as_str(), params)
371 .fetch_one(active.connection_mut())
372 .await
373 .map_err(err_map!(Backend, "Error performing count query"))?;
374 Ok(count)
375 })
376 }
377
378 fn fetch(
379 &mut self,
380 kind: EntryKind,
381 category: &str,
382 name: &str,
383 _for_update: bool,
384 ) -> BoxFuture<'_, Result<Option<Entry>, Error>> {
385 let category = category.to_string();
386 let name = name.to_string();
387
388 Box::pin(async move {
389 let (profile_id, key) = acquire_key(&mut *self).await?;
390 let (enc_category, enc_name) = unblock({
391 let key = key.clone();
392 let category = ProfileKey::prepare_input(category.as_bytes());
393 let name = ProfileKey::prepare_input(name.as_bytes());
394 move || {
395 Result::<_, Error>::Ok((
396 key.encrypt_entry_category(category)?,
397 key.encrypt_entry_name(name)?,
398 ))
399 }
400 })
401 .await?;
402 let mut active = acquire_session(&mut *self).await?;
403 if let Some(row) = sqlx::query(FETCH_QUERY)
404 .bind(profile_id)
405 .bind(kind as i16)
406 .bind(enc_category)
407 .bind(enc_name)
408 .fetch_optional(active.connection_mut())
409 .await
410 .map_err(err_map!(Backend, "Error performing fetch query"))?
411 {
412 let value = row.try_get(1)?;
413 let tags = row.try_get(2)?;
414 let (category, name, value, tags) = unblock(move || {
415 let value = key.decrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
416 let enc_tags = decode_tags(tags)
417 .map_err(|_| err_msg!(Unexpected, "Error decoding entry tags"))?;
418 let tags = key.decrypt_entry_tags(enc_tags)?;
419 Result::<_, Error>::Ok((category, name, value, tags))
420 })
421 .await?;
422 Ok(Some(Entry::new(kind, category, name, value, tags)))
423 } else {
424 Ok(None)
425 }
426 })
427 }
428
429 fn fetch_all<'q>(
430 &'q mut self,
431 kind: Option<EntryKind>,
432 category: Option<&'q str>,
433 tag_filter: Option<TagFilter>,
434 limit: Option<i64>,
435 order_by: Option<OrderBy>,
436 descending: bool,
437 _for_update: bool,
438 ) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
439 let category = category.map(|c| c.to_string());
440 Box::pin(async move {
441 let mut active = self.borrow_mut();
442 let (profile_id, key) = acquire_key(&mut active).await?;
443 let scan = perform_scan(
444 active,
445 profile_id,
446 key.clone(),
447 kind,
448 category.clone(),
449 tag_filter,
450 None,
451 limit,
452 order_by,
453 descending,
454 );
455 pin!(scan);
456 let mut enc_rows = vec![];
457 while let Some(rows) = scan.try_next().await? {
458 enc_rows.extend(rows)
459 }
460 unblock(move || decrypt_scan_batch(category, enc_rows, &key)).await
461 })
462 }
463
464 fn remove_all<'q>(
465 &'q mut self,
466 kind: Option<EntryKind>,
467 category: Option<&'q str>,
468 tag_filter: Option<TagFilter>,
469 ) -> BoxFuture<'q, Result<i64, Error>> {
470 let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
471
472 Box::pin(async move {
473 let (profile_id, key) = acquire_key(&mut *self).await?;
474 let mut params = QueryParams::new();
475 params.push(profile_id);
476 params.push(kind.map(|k| k as i16));
477 let (enc_category, tag_filter) = unblock({
478 let params_len = params.len() + 1; move || {
480 Result::<_, Error>::Ok((
481 enc_category
482 .map(|c| key.encrypt_entry_category(c))
483 .transpose()?,
484 encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?,
485 ))
486 }
487 })
488 .await?;
489 params.push(enc_category);
490 let query = extend_query::<SqliteBackend>(
491 DELETE_ALL_QUERY,
492 &mut params,
493 tag_filter,
494 None,
495 None,
496 None,
497 false,
498 )?;
499
500 let mut active = acquire_session(&mut *self).await?;
501 let removed = sqlx::query_with(query.as_str(), params)
502 .execute(active.connection_mut())
503 .await?
504 .rows_affected();
505 Ok(removed as i64)
506 })
507 }
508
509 fn update<'q>(
510 &'q mut self,
511 kind: EntryKind,
512 operation: EntryOperation,
513 category: &'q str,
514 name: &'q str,
515 value: Option<&'q [u8]>,
516 tags: Option<&'q [EntryTag]>,
517 expiry_ms: Option<i64>,
518 ) -> BoxFuture<'q, Result<(), Error>> {
519 let category = ProfileKey::prepare_input(category.as_bytes());
520 let name = ProfileKey::prepare_input(name.as_bytes());
521
522 match operation {
523 op @ EntryOperation::Insert | op @ EntryOperation::Replace => {
524 let value = ProfileKey::prepare_input(value.unwrap_or_default());
525 let tags = tags.map(prepare_tags);
526 Box::pin(async move {
527 let (_, key) = acquire_key(&mut *self).await?;
528 let (enc_category, enc_name, enc_value, enc_tags) = unblock(move || {
529 let enc_value =
530 key.encrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
531 Result::<_, Error>::Ok((
532 key.encrypt_entry_category(category)?,
533 key.encrypt_entry_name(name)?,
534 enc_value,
535 tags.transpose()?
536 .map(|t| key.encrypt_entry_tags(t))
537 .transpose()?,
538 ))
539 })
540 .await?;
541 let mut active = acquire_session(&mut *self).await?;
542 let mut txn = active.as_transaction().await?;
543 perform_insert(
544 &mut txn,
545 kind,
546 &enc_category,
547 &enc_name,
548 &enc_value,
549 enc_tags,
550 expiry_ms,
551 op == EntryOperation::Insert,
552 )
553 .await?;
554 txn.commit().await?;
555 Ok(())
556 })
557 }
558
559 EntryOperation::Remove => Box::pin(async move {
560 let (_, key) = acquire_key(&mut *self).await?;
561 let (enc_category, enc_name) = unblock(move || {
562 Result::<_, Error>::Ok((
563 key.encrypt_entry_category(category)?,
564 key.encrypt_entry_name(name)?,
565 ))
566 })
567 .await?;
568 let mut active = acquire_session(&mut *self).await?;
569 perform_remove(&mut active, kind, &enc_category, &enc_name, false).await
570 }),
571 }
572 }
573
574 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
575 Box::pin(async move {
576 let mut sess = acquire_session(&mut *self).await?;
577 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
578 .bind(sess.profile_id)
579 .fetch_one(sess.connection_mut())
580 .await
581 .map_err(err_map!(Backend, "Error pinging session"))?;
582 if count == 0 {
583 Err(err_msg!(NotFound, "Session profile has been removed"))
584 } else {
585 Ok(())
586 }
587 })
588 }
589
    /// Close the session, forwarding `commit` to the session's own `close`.
    fn close(&mut self, commit: bool) -> BoxFuture<'_, Result<(), Error>> {
        // NOTE(review): `self.close` presumably resolves to an inherent
        // `DbSession::close` rather than to this trait method — confirm in db_utils.
        Box::pin(self.close(commit))
    }
593}
594
impl ExtDatabase for Sqlite {
    /// Begin a transaction on `conn` through the driver's transaction manager.
    ///
    /// For a non-nested transaction a `DELETE` that matches no rows is
    /// executed immediately afterwards.
    fn start_transaction(
        conn: &mut Connection<Self>,
        nested: bool,
    ) -> BoxFuture<'_, std::result::Result<(), SqlxError>> {
        Box::pin(async move {
            <Sqlite as Database>::TransactionManager::begin(conn, None).await?;
            if !nested {
                // `WHERE 0` is never true, so no row is deleted.
                // NOTE(review): presumably this write statement is issued to take the
                // database write lock at the start of the transaction — confirm.
                sqlx::query("DELETE FROM config WHERE 0")
                    .execute(conn)
                    .await?;
            }
            Ok(())
        })
    }
}
615
616async fn acquire_key(
617 session: &mut DbSession<Sqlite>,
618) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
619 acquire_session(session).await?;
620 Ok(session.profile_and_key().unwrap())
621}
622
/// Activate `session` through `DbSession::make_active`, passing
/// `resolve_profile_key` as the profile lookup, and return the active handle.
async fn acquire_session(
    session: &mut DbSession<Sqlite>,
) -> Result<DbSessionActive<'_, Sqlite>, Error> {
    session.make_active(&resolve_profile_key).await
}
628
629async fn resolve_profile_key(
630 conn: &mut PoolConnection<Sqlite>,
631 cache: Arc<KeyCache>,
632 profile: String,
633 _in_txn: bool,
634) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
635 if let Some((pid, key)) = cache.get_profile(profile.as_str()).await {
636 Ok((pid, key))
637 } else if let Some(row) = sqlx::query("SELECT id, profile_key FROM profiles WHERE name=?1")
638 .bind(profile.as_str())
639 .fetch_optional(conn.as_mut())
640 .await
641 .map_err(err_map!(Backend, "Error fetching profile key"))?
642 {
643 let pid = row.try_get(0)?;
644 let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
645 cache.add_profile(profile, pid, key.clone()).await;
646 Ok((pid, key))
647 } else {
648 Err(err_msg!(NotFound, "Profile not found"))
649 }
650}
651
652#[allow(clippy::too_many_arguments)]
653async fn perform_insert(
654 active: &mut DbSessionTxn<'_, Sqlite>,
655 kind: EntryKind,
656 enc_category: &[u8],
657 enc_name: &[u8],
658 enc_value: &[u8],
659 enc_tags: Option<Vec<EncEntryTag>>,
660 expiry_ms: Option<i64>,
661 new_row: bool,
662) -> Result<(), Error> {
663 let row_id = if new_row {
664 trace!("Insert entry");
665 let done = sqlx::query(INSERT_QUERY)
666 .bind(active.profile_id)
667 .bind(kind as i16)
668 .bind(enc_category)
669 .bind(enc_name)
670 .bind(enc_value)
671 .bind(expiry_ms.map(expiry_timestamp).transpose()?)
672 .execute(active.connection_mut())
673 .await
674 .map_err(err_map!(Backend, "Error inserting new entry"))?;
675 if done.rows_affected() == 0 {
676 return Err(err_msg!(Duplicate, "Duplicate entry"));
677 }
678 done.last_insert_rowid()
679 } else {
680 trace!("Update entry");
681 let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
682 .bind(active.profile_id)
683 .bind(kind as i16)
684 .bind(enc_category)
685 .bind(enc_name)
686 .bind(enc_value)
687 .bind(expiry_ms.map(expiry_timestamp).transpose()?)
688 .fetch_one(active.connection_mut())
689 .await
690 .map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
691 sqlx::query(TAG_DELETE_QUERY)
692 .bind(row_id)
693 .execute(active.connection_mut())
694 .await
695 .map_err(err_map!(Backend, "Error removing existing entry tags"))?;
696 row_id
697 };
698 if let Some(tags) = enc_tags {
699 for tag in tags {
700 sqlx::query(TAG_INSERT_QUERY)
701 .bind(row_id)
702 .bind(&tag.name)
703 .bind(&tag.value)
704 .bind(tag.plaintext as i16)
705 .execute(active.connection_mut())
706 .await
707 .map_err(err_map!(Backend, "Error inserting entry tags"))?;
708 }
709 }
710 Ok(())
711}
712
713async fn perform_remove(
714 active: &mut DbSessionActive<'_, Sqlite>,
715 kind: EntryKind,
716 enc_category: &[u8],
717 enc_name: &[u8],
718 ignore_error: bool,
719) -> Result<(), Error> {
720 trace!("Remove entry");
721 let done = sqlx::query(DELETE_QUERY)
722 .bind(active.profile_id)
723 .bind(kind as i16)
724 .bind(enc_category)
725 .bind(enc_name)
726 .execute(active.connection_mut())
727 .await
728 .map_err(err_map!(Backend, "Error removing entry"))?;
729 if done.rows_affected() == 0 && !ignore_error {
730 Err(err_msg!(NotFound, "Entry not found"))
731 } else {
732 Ok(())
733 }
734}
735
/// Produce a stream of pages of still-encrypted rows for the given filters.
///
/// Each yielded `Vec` holds at most `PAGE_SIZE` entries; a final, shorter
/// page is yielded when rows remain after the last full one. The query is
/// `SCAN_QUERY` extended with the tag filter, offset, limit and ordering.
/// If `active` reports itself as owned, it is closed (with `false` passed
/// to `close`) once all rows have been read.
#[allow(clippy::too_many_arguments)]
fn perform_scan(
    mut active: DbSessionRef<'_, Sqlite>,
    profile_id: ProfileId,
    key: Arc<ProfileKey>,
    kind: Option<EntryKind>,
    category: Option<String>,
    tag_filter: Option<TagFilter>,
    offset: Option<i64>,
    limit: Option<i64>,
    order_by: Option<OrderBy>,
    descending: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
    try_stream! {
        let mut params = QueryParams::new();
        params.push(profile_id);
        params.push(kind.map(|k| k as i16));
        let (enc_category, tag_filter) = unblock({
            let key = key.clone();
            let enc_category = category.as_ref().map(|c| ProfileKey::prepare_input(c.as_bytes()));
            // `+ 1` accounts for the category parameter, which is pushed after this call.
            let params_len = params.len() + 1;
            move || {
                Result::<_, Error>::Ok((
                    enc_category.map(|c| key.encrypt_entry_category(c)).transpose()?,
                    encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?
                ))
            }
        }).await?;
        params.push(enc_category);
        let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;

        let mut batch = Vec::with_capacity(PAGE_SIZE);

        let mut acquired = acquire_session(&mut active).await?;
        let mut rows = sqlx::query_with(query.as_str(), params).fetch(acquired.connection_mut());
        while let Some(row) = rows.try_next().await? {
            // Column order follows SCAN_QUERY: 1 = kind, 2 = category, 3 = name, 4 = value, 5 = tags.
            let kind: u32 = row.try_get(1)?;
            let kind = EntryKind::try_from(kind as usize)?;
            batch.push(EncScanEntry {
                kind, category: row.try_get(2)?, name: row.try_get(3)?, value: row.try_get(4)?, tags: row.try_get(5)?
            });
            if batch.len() == PAGE_SIZE {
                // `split_off(0)` moves every element into the yielded Vec and leaves `batch` empty.
                yield batch.split_off(0);
            }
        }
        // The row stream borrows the session, so it is dropped before the session is used again.
        drop(rows);
        if active.is_owned() {
            active.close(false).await?;
        }
        drop(active);

        if !batch.is_empty() {
            yield batch;
        }
    }
}
792
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::db_utils::replace_arg_placeholders;
    use crate::future::block_on;
    use crate::protect::{generate_raw_store_key, StoreKeyMethod};

    /// Query comparing a bound timestamp against SQLite's notion of "now".
    const EXPIRY_CHECK_QUERY: &str =
        "SELECT datetime('now'), ?1, datetime(?1) > datetime('now')";

    /// Provision an in-memory store and ask SQLite whether a timestamp
    /// `offset_ms` milliseconds from now is later than `datetime('now')`.
    ///
    /// Returns SQLite's "now", the bound timestamp as text, and the comparison result.
    async fn compare_expiry_to_now(offset_ms: i64) -> Result<(String, String, bool), Error> {
        let key = generate_raw_store_key(None)?;
        let db = SqliteStoreOptions::in_memory()
            .provision(StoreKeyMethod::RawKey, key, None, false)
            .await?;
        let ts = expiry_timestamp(offset_ms).unwrap();
        let check = sqlx::query(EXPIRY_CHECK_QUERY)
            .bind(ts)
            .fetch_one(&db.conn_pool)
            .await?;
        let now: String = check.try_get(0)?;
        let cmp_ts: String = check.try_get(1)?;
        let cmp: bool = check.try_get(2)?;
        Ok((now, cmp_ts, cmp))
    }

    /// A timestamp one second in the future must compare as later than now.
    #[test]
    fn sqlite_check_expiry_timestamp() {
        block_on(async {
            let (now, cmp_ts, cmp) = compare_expiry_to_now(1000).await?;
            if !cmp {
                panic!("now ({}) > expiry timestamp ({})", now, cmp_ts);
            }
            Result::<_, Error>::Ok(())
        })
        .unwrap();
    }

    /// A timestamp one second in the past must not compare as later than now.
    #[test]
    fn sqlite_check_expiry_timestamp_expired() {
        block_on(async {
            let (now, cmp_ts, cmp) = compare_expiry_to_now(-1000).await?;
            if cmp {
                panic!("now ({}) < expiry timestamp ({})", now, cmp_ts);
            }
            Result::<_, Error>::Ok(())
        })
        .unwrap();
    }

    /// `$$` and `$N` placeholders are rewritten to numbered `?N` form;
    /// a non-numeric `$a` is left untouched.
    #[test]
    fn sqlite_query_placeholders() {
        let numbered =
            replace_arg_placeholders::<SqliteBackend>("This $$ is $10 a $$ string!", 3);
        assert_eq!(&numbered, "This ?3 is ?12 a ?5 string!",);

        let untouched = replace_arg_placeholders::<SqliteBackend>("This $a is a string!", 1);
        assert_eq!(&untouched, "This $a is a string!",);
    }
}