askar_storage/backend/sqlite/
mod.rs

1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use async_stream::try_stream;
6use futures_lite::{
7    pin,
8    stream::{Stream, StreamExt},
9};
10
11use sqlx::{
12    pool::PoolConnection,
13    sqlite::{Sqlite, SqlitePool},
14    Acquire, Database, Error as SqlxError, Row, TransactionManager,
15};
16
17use super::{
18    db_utils::{
19        decode_tags, decrypt_scan_batch, encode_profile_key, encode_tag_filter, expiry_timestamp,
20        extend_query, prepare_tags, random_profile_name, Connection, DbSession, DbSessionActive,
21        DbSessionRef, DbSessionTxn, EncScanEntry, ExtDatabase, QueryParams, QueryPrepare,
22        PAGE_SIZE,
23    },
24    Backend, BackendSession,
25};
26use crate::{
27    backend::OrderBy,
28    entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
29    error::Error,
30    future::{unblock, BoxFuture},
31    protect::{EntryEncryptor, KeyCache, PassKey, ProfileId, ProfileKey, StoreKeyMethod},
32};
33
34mod provision;
35pub use provision::SqliteStoreOptions;
36
/// Fetch a single configuration value by name.
const CONFIG_FETCH_QUERY: &str = "SELECT value FROM config WHERE name = ?1";
/// Store a configuration value, replacing a conflicting existing row.
const CONFIG_UPDATE_QUERY: &str = "INSERT OR REPLACE INTO config (name, value) VALUES (?1, ?2)";
/// Count unexpired items in a profile. A NULL kind (?2) or category (?3) disables
/// that filter.
const COUNT_QUERY: &str = "SELECT COUNT(*) FROM items i
    WHERE profile_id = ?1
    AND (kind = ?2 OR ?2 IS NULL)
    AND (category = ?3 OR ?3 IS NULL)
    AND (expiry IS NULL OR DATETIME(expiry) > DATETIME('now'))";
/// Delete the single item identified by profile, kind, category and name.
const DELETE_QUERY: &str = "DELETE FROM items
    WHERE profile_id = ?1 AND kind = ?2 AND category = ?3 AND name = ?4";
/// Fetch one unexpired item: its id, value, and its tags aggregated into a single
/// string of `plaintext:HEX(name):HEX(value)` groups.
const FETCH_QUERY: &str = "SELECT i.id, i.value,
    (SELECT GROUP_CONCAT(it.plaintext || ':' || HEX(it.name) || ':' || HEX(it.value))
        FROM items_tags it WHERE it.item_id = i.id) AS tags
    FROM items i WHERE i.profile_id = ?1 AND i.kind = ?2
    AND i.category = ?3 AND i.name = ?4
    AND (i.expiry IS NULL OR DATETIME(i.expiry) > DATETIME('now'))";
/// Insert a new item. `OR IGNORE` leaves a conflicting row untouched, so callers
/// detect duplicates through the affected-row count.
const INSERT_QUERY: &str =
    "INSERT OR IGNORE INTO items (profile_id, kind, category, name, value, expiry)
    VALUES (?1, ?2, ?3, ?4, ?5, ?6)";
/// Replace the value and expiry of an existing item, returning the row id.
const UPDATE_QUERY: &str = "UPDATE items SET value=?5, expiry=?6 WHERE profile_id=?1 AND kind=?2
    AND category=?3 AND name=?4 RETURNING id";
/// Base query for scans: like `FETCH_QUERY` but also returns kind, category and name,
/// with optional (NULL-able) kind and category filters.
const SCAN_QUERY: &str = "SELECT i.id, i.kind, i.category, i.name, i.value,
    (SELECT GROUP_CONCAT(it.plaintext || ':' || HEX(it.name) || ':' || HEX(it.value))
        FROM items_tags it WHERE it.item_id = i.id) AS tags
    FROM items i WHERE i.profile_id = ?1
    AND (i.kind = ?2 OR ?2 IS NULL)
    AND (i.category = ?3 OR ?3 IS NULL)
    AND (i.expiry IS NULL OR DATETIME(i.expiry) > DATETIME('now'))";
/// Base query for bulk removal. Unlike the count and scan queries it has no expiry
/// condition, so expired rows are removed as well.
const DELETE_ALL_QUERY: &str = "DELETE FROM items AS i
    WHERE i.profile_id = ?1
    AND (i.kind = ?2 OR ?2 IS NULL)
    AND (i.category = ?3 OR ?3 IS NULL)";
/// Attach one tag to an item.
const TAG_INSERT_QUERY: &str = "INSERT INTO items_tags
    (item_id, name, value, plaintext) VALUES (?1, ?2, ?3, ?4)";
/// Remove every tag attached to an item.
const TAG_DELETE_QUERY: &str = "DELETE FROM items_tags
    WHERE item_id=?1";
72
/// A Sqlite database store
pub struct SqliteBackend {
    /// Connection pool used for every query issued by this backend.
    conn_pool: SqlitePool,
    /// Profile name used when a caller does not name one explicitly.
    active_profile: String,
    /// Shared cache holding the store key and the profile keys resolved so far.
    key_cache: Arc<KeyCache>,
    /// Database path; within this file it is only read by the `Debug` impl.
    path: String,
}
80
81impl SqliteBackend {
82    pub(crate) fn new(
83        conn_pool: SqlitePool,
84        active_profile: String,
85        key_cache: KeyCache,
86        path: String,
87    ) -> Self {
88        Self {
89            conn_pool,
90            active_profile,
91            key_cache: Arc::new(key_cache),
92            path,
93        }
94    }
95}
96
97impl Debug for SqliteBackend {
98    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
99        f.debug_struct("SqliteStore")
100            .field("active_profile", &self.active_profile)
101            .field("path", &self.path)
102            .finish()
103    }
104}
105
/// Select Sqlite as the database type for query preparation. Only the associated
/// type is set here; everything else comes from the trait's own definitions.
impl QueryPrepare for SqliteBackend {
    type DB = Sqlite;
}
109
110impl Backend for SqliteBackend {
111    type Session = DbSession<Sqlite>;
112
113    fn create_profile(&self, name: Option<String>) -> BoxFuture<'_, Result<String, Error>> {
114        let name = name.unwrap_or_else(random_profile_name);
115        Box::pin(async move {
116            let store_key = self.key_cache.store_key.clone();
117            let (profile_key, enc_key) = unblock(move || {
118                let profile_key = ProfileKey::new()?;
119                let enc_key = encode_profile_key(&profile_key, &store_key)?;
120                Result::<_, Error>::Ok((profile_key, enc_key))
121            })
122            .await?;
123            let mut conn = self.conn_pool.acquire().await?;
124            let done =
125                sqlx::query("INSERT OR IGNORE INTO profiles (name, profile_key) VALUES (?1, ?2)")
126                    .bind(&name)
127                    .bind(enc_key)
128                    .execute(conn.as_mut())
129                    .await?;
130            conn.return_to_pool().await;
131            if done.rows_affected() == 0 {
132                return Err(err_msg!(Duplicate, "Duplicate profile name"));
133            }
134            self.key_cache
135                .add_profile(
136                    name.clone(),
137                    done.last_insert_rowid(),
138                    Arc::new(profile_key),
139                )
140                .await;
141            Ok(name)
142        })
143    }
144
145    fn get_active_profile(&self) -> String {
146        self.active_profile.clone()
147    }
148
149    fn get_default_profile(&self) -> BoxFuture<'_, Result<String, Error>> {
150        Box::pin(async move {
151            let mut conn = self.conn_pool.acquire().await?;
152            let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
153                .bind("default_profile")
154                .fetch_one(conn.as_mut())
155                .await
156                .map_err(err_map!(Backend, "Error fetching default profile name"))?;
157            conn.return_to_pool().await;
158            Ok(profile.unwrap_or_default())
159        })
160    }
161
162    fn set_default_profile(&self, profile: String) -> BoxFuture<'_, Result<(), Error>> {
163        Box::pin(async move {
164            let mut conn = self.conn_pool.acquire().await?;
165            sqlx::query(CONFIG_UPDATE_QUERY)
166                .bind("default_profile")
167                .bind(profile)
168                .execute(conn.as_mut())
169                .await
170                .map_err(err_map!(Backend, "Error setting default profile name"))?;
171            conn.return_to_pool().await;
172            Ok(())
173        })
174    }
175
176    fn list_profiles(&self) -> BoxFuture<'_, Result<Vec<String>, Error>> {
177        Box::pin(async move {
178            let mut conn = self.conn_pool.acquire().await?;
179            let rows = sqlx::query("SELECT name FROM profiles")
180                .fetch_all(conn.as_mut())
181                .await
182                .map_err(err_map!(Backend, "Error fetching profile list"))?;
183            conn.return_to_pool().await;
184            let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
185            Ok(names)
186        })
187    }
188
189    fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
190        Box::pin(async move {
191            let mut conn = self.conn_pool.acquire().await?;
192            let ret = sqlx::query("DELETE FROM profiles WHERE name=?")
193                .bind(&name)
194                .execute(conn.as_mut())
195                .await
196                .map_err(err_map!(Backend, "Error removing profile"))?
197                .rows_affected()
198                != 0;
199            conn.return_to_pool().await;
200            self.key_cache.clear_profile(&name).await;
201            Ok(ret)
202        })
203    }
204
205    fn rename_profile(
206        &self,
207        from_name: String,
208        to_name: String,
209    ) -> BoxFuture<'_, Result<bool, Error>> {
210        Box::pin(async move {
211            let mut conn = self.conn_pool.acquire().await?;
212            let ret = sqlx::query("UPDATE profiles SET name=$1 WHERE name=$2")
213                .bind(&to_name)
214                .bind(&from_name)
215                .execute(conn.as_mut())
216                .await
217                .map_err(err_map!(Backend, "Error renaming profile"))?
218                .rows_affected()
219                != 0;
220            conn.return_to_pool().await;
221            self.key_cache.clear_profile(&from_name).await;
222            Ok(ret)
223        })
224    }
225
    /// Re-encode every profile key under a new store key.
    ///
    /// The new store key is resolved from `method` and `pass_key`. Inside one
    /// transaction, each row of `profiles` is loaded with the current key cache,
    /// re-encoded with the new store key and written back, and the `key` entry of
    /// the `config` table is updated. After the commit the key cache is replaced
    /// by a new one built from the new store key.
    ///
    /// # Errors
    /// Returns a `Backend` error when a profile update or the config update does
    /// not affect exactly one row, and propagates key and database failures. The
    /// early returns leave the transaction uncommitted.
    fn rekey(
        &mut self,
        method: StoreKeyMethod,
        pass_key: PassKey<'_>,
    ) -> BoxFuture<'_, Result<(), Error>> {
        // Take ownership of the pass key before it is moved into the future.
        let pass_key = pass_key.into_owned();
        Box::pin(async move {
            let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
            let store_key = Arc::new(store_key);
            let mut conn = self.conn_pool.acquire().await?;
            let mut txn = conn.begin().await?;
            let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
            // Collect every re-encoded key first; the updates run after the row
            // stream (which borrows the transaction) has been dropped.
            let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
            while let Some(row) = rows.next().await {
                let row = row?;
                let pid = row.try_get(0)?;
                let enc_key = row.try_get(1)?;
                // Load with the current cache, then encode with the new store key.
                let profile_key = self.key_cache.load_key(enc_key).await?;
                let upd_key = unblock({
                    let store_key = store_key.clone();
                    move || encode_profile_key(&profile_key, &store_key)
                })
                .await?;
                upd_keys.insert(pid, upd_key);
            }
            drop(rows);
            for (pid, key) in upd_keys {
                if sqlx::query("UPDATE profiles SET profile_key=?1 WHERE id=?2")
                    .bind(key)
                    .bind(pid)
                    .execute(txn.as_mut())
                    .await?
                    .rows_affected()
                    != 1
                {
                    return Err(err_msg!(Backend, "Error updating profile key"));
                }
            }
            // Record the reference to the new store key in the config table.
            if sqlx::query("UPDATE config SET value=?1 WHERE name='key'")
                .bind(store_key_ref.into_uri())
                .execute(txn.as_mut())
                .await?
                .rows_affected()
                != 1
            {
                return Err(err_msg!(Backend, "Error updating store key"));
            }
            txn.commit().await?;
            conn.return_to_pool().await;
            // Replace the cache so nothing resolved under the old store key is reused.
            self.key_cache = Arc::new(KeyCache::new(store_key));
            Ok(())
        })
    }
279
280    fn scan(
281        &self,
282        profile: Option<String>,
283        kind: Option<EntryKind>,
284        category: Option<String>,
285        tag_filter: Option<TagFilter>,
286        offset: Option<i64>,
287        limit: Option<i64>,
288        order_by: Option<OrderBy>,
289        descending: bool,
290    ) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
291        Box::pin(async move {
292            let session = self.session(profile, false)?;
293            let mut active = session.owned_ref();
294            let (profile_id, key) = acquire_key(&mut active).await?;
295            let scan = perform_scan(
296                active,
297                profile_id,
298                key.clone(),
299                kind,
300                category.clone(),
301                tag_filter,
302                offset,
303                limit,
304                order_by,
305                descending,
306            );
307            let stream = scan.then(move |enc_rows| {
308                let category = category.clone();
309                let key = key.clone();
310                unblock(move || decrypt_scan_batch(category, enc_rows?, &key))
311            });
312            Ok(Scan::new(stream, PAGE_SIZE))
313        })
314    }
315
316    fn session(&self, profile: Option<String>, transaction: bool) -> Result<Self::Session, Error> {
317        Ok(DbSession::new(
318            self.conn_pool.clone(),
319            self.key_cache.clone(),
320            profile.unwrap_or_else(|| self.active_profile.clone()),
321            transaction,
322        ))
323    }
324
325    fn close(&self) -> BoxFuture<'_, Result<(), Error>> {
326        Box::pin(async move {
327            self.conn_pool.close().await;
328            Ok(())
329        })
330    }
331}
332
333impl BackendSession for DbSession<Sqlite> {
334    fn count<'q>(
335        &'q mut self,
336        kind: Option<EntryKind>,
337        category: Option<&'q str>,
338        tag_filter: Option<TagFilter>,
339    ) -> BoxFuture<'q, Result<i64, Error>> {
340        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
341
342        Box::pin(async move {
343            let (profile_id, key) = acquire_key(&mut *self).await?;
344            let mut params = QueryParams::new();
345            params.push(profile_id);
346            params.push(kind.map(|k| k as i16));
347            let (enc_category, tag_filter) = unblock({
348                let params_len = params.len() + 1; // plus category
349                move || {
350                    Result::<_, Error>::Ok((
351                        enc_category
352                            .map(|c| key.encrypt_entry_category(c))
353                            .transpose()?,
354                        encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?,
355                    ))
356                }
357            })
358            .await?;
359            params.push(enc_category);
360            let query = extend_query::<SqliteBackend>(
361                COUNT_QUERY,
362                &mut params,
363                tag_filter,
364                None,
365                None,
366                None,
367                false,
368            )?;
369            let mut active = acquire_session(&mut *self).await?;
370            let count = sqlx::query_scalar_with(query.as_str(), params)
371                .fetch_one(active.connection_mut())
372                .await
373                .map_err(err_map!(Backend, "Error performing count query"))?;
374            Ok(count)
375        })
376    }
377
378    fn fetch(
379        &mut self,
380        kind: EntryKind,
381        category: &str,
382        name: &str,
383        _for_update: bool,
384    ) -> BoxFuture<'_, Result<Option<Entry>, Error>> {
385        let category = category.to_string();
386        let name = name.to_string();
387
388        Box::pin(async move {
389            let (profile_id, key) = acquire_key(&mut *self).await?;
390            let (enc_category, enc_name) = unblock({
391                let key = key.clone();
392                let category = ProfileKey::prepare_input(category.as_bytes());
393                let name = ProfileKey::prepare_input(name.as_bytes());
394                move || {
395                    Result::<_, Error>::Ok((
396                        key.encrypt_entry_category(category)?,
397                        key.encrypt_entry_name(name)?,
398                    ))
399                }
400            })
401            .await?;
402            let mut active = acquire_session(&mut *self).await?;
403            if let Some(row) = sqlx::query(FETCH_QUERY)
404                .bind(profile_id)
405                .bind(kind as i16)
406                .bind(enc_category)
407                .bind(enc_name)
408                .fetch_optional(active.connection_mut())
409                .await
410                .map_err(err_map!(Backend, "Error performing fetch query"))?
411            {
412                let value = row.try_get(1)?;
413                let tags = row.try_get(2)?;
414                let (category, name, value, tags) = unblock(move || {
415                    let value = key.decrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
416                    let enc_tags = decode_tags(tags)
417                        .map_err(|_| err_msg!(Unexpected, "Error decoding entry tags"))?;
418                    let tags = key.decrypt_entry_tags(enc_tags)?;
419                    Result::<_, Error>::Ok((category, name, value, tags))
420                })
421                .await?;
422                Ok(Some(Entry::new(kind, category, name, value, tags)))
423            } else {
424                Ok(None)
425            }
426        })
427    }
428
429    fn fetch_all<'q>(
430        &'q mut self,
431        kind: Option<EntryKind>,
432        category: Option<&'q str>,
433        tag_filter: Option<TagFilter>,
434        limit: Option<i64>,
435        order_by: Option<OrderBy>,
436        descending: bool,
437        _for_update: bool,
438    ) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
439        let category = category.map(|c| c.to_string());
440        Box::pin(async move {
441            let mut active = self.borrow_mut();
442            let (profile_id, key) = acquire_key(&mut active).await?;
443            let scan = perform_scan(
444                active,
445                profile_id,
446                key.clone(),
447                kind,
448                category.clone(),
449                tag_filter,
450                None,
451                limit,
452                order_by,
453                descending,
454            );
455            pin!(scan);
456            let mut enc_rows = vec![];
457            while let Some(rows) = scan.try_next().await? {
458                enc_rows.extend(rows)
459            }
460            unblock(move || decrypt_scan_batch(category, enc_rows, &key)).await
461        })
462    }
463
464    fn remove_all<'q>(
465        &'q mut self,
466        kind: Option<EntryKind>,
467        category: Option<&'q str>,
468        tag_filter: Option<TagFilter>,
469    ) -> BoxFuture<'q, Result<i64, Error>> {
470        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
471
472        Box::pin(async move {
473            let (profile_id, key) = acquire_key(&mut *self).await?;
474            let mut params = QueryParams::new();
475            params.push(profile_id);
476            params.push(kind.map(|k| k as i16));
477            let (enc_category, tag_filter) = unblock({
478                let params_len = params.len() + 1; // plus category
479                move || {
480                    Result::<_, Error>::Ok((
481                        enc_category
482                            .map(|c| key.encrypt_entry_category(c))
483                            .transpose()?,
484                        encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?,
485                    ))
486                }
487            })
488            .await?;
489            params.push(enc_category);
490            let query = extend_query::<SqliteBackend>(
491                DELETE_ALL_QUERY,
492                &mut params,
493                tag_filter,
494                None,
495                None,
496                None,
497                false,
498            )?;
499
500            let mut active = acquire_session(&mut *self).await?;
501            let removed = sqlx::query_with(query.as_str(), params)
502                .execute(active.connection_mut())
503                .await?
504                .rows_affected();
505            Ok(removed as i64)
506        })
507    }
508
509    fn update<'q>(
510        &'q mut self,
511        kind: EntryKind,
512        operation: EntryOperation,
513        category: &'q str,
514        name: &'q str,
515        value: Option<&'q [u8]>,
516        tags: Option<&'q [EntryTag]>,
517        expiry_ms: Option<i64>,
518    ) -> BoxFuture<'q, Result<(), Error>> {
519        let category = ProfileKey::prepare_input(category.as_bytes());
520        let name = ProfileKey::prepare_input(name.as_bytes());
521
522        match operation {
523            op @ EntryOperation::Insert | op @ EntryOperation::Replace => {
524                let value = ProfileKey::prepare_input(value.unwrap_or_default());
525                let tags = tags.map(prepare_tags);
526                Box::pin(async move {
527                    let (_, key) = acquire_key(&mut *self).await?;
528                    let (enc_category, enc_name, enc_value, enc_tags) = unblock(move || {
529                        let enc_value =
530                            key.encrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
531                        Result::<_, Error>::Ok((
532                            key.encrypt_entry_category(category)?,
533                            key.encrypt_entry_name(name)?,
534                            enc_value,
535                            tags.transpose()?
536                                .map(|t| key.encrypt_entry_tags(t))
537                                .transpose()?,
538                        ))
539                    })
540                    .await?;
541                    let mut active = acquire_session(&mut *self).await?;
542                    let mut txn = active.as_transaction().await?;
543                    perform_insert(
544                        &mut txn,
545                        kind,
546                        &enc_category,
547                        &enc_name,
548                        &enc_value,
549                        enc_tags,
550                        expiry_ms,
551                        op == EntryOperation::Insert,
552                    )
553                    .await?;
554                    txn.commit().await?;
555                    Ok(())
556                })
557            }
558
559            EntryOperation::Remove => Box::pin(async move {
560                let (_, key) = acquire_key(&mut *self).await?;
561                let (enc_category, enc_name) = unblock(move || {
562                    Result::<_, Error>::Ok((
563                        key.encrypt_entry_category(category)?,
564                        key.encrypt_entry_name(name)?,
565                    ))
566                })
567                .await?;
568                let mut active = acquire_session(&mut *self).await?;
569                perform_remove(&mut active, kind, &enc_category, &enc_name, false).await
570            }),
571        }
572    }
573
574    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
575        Box::pin(async move {
576            let mut sess = acquire_session(&mut *self).await?;
577            let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
578                .bind(sess.profile_id)
579                .fetch_one(sess.connection_mut())
580                .await
581                .map_err(err_map!(Backend, "Error pinging session"))?;
582            if count == 0 {
583                Err(err_msg!(NotFound, "Session profile has been removed"))
584            } else {
585                Ok(())
586            }
587        })
588    }
589
    /// Box the future produced by `self.close(commit)`.
    // NOTE(review): `self.close(commit)` presumably resolves to an inherent
    // `DbSession::close` defined outside this file rather than to this trait
    // method (which would recurse) — confirm.
    fn close(&mut self, commit: bool) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(self.close(commit))
    }
593}
594
impl ExtDatabase for Sqlite {
    /// Begin a transaction on `conn`.
    ///
    /// For a non-nested transaction a statement that deletes nothing is executed
    /// right after `begin`, so the transaction starts out as a write transaction.
    fn start_transaction(
        conn: &mut Connection<Self>,
        nested: bool,
    ) -> BoxFuture<'_, std::result::Result<(), SqlxError>> {
        // FIXME - this is a horrible workaround because there is currently
        // no good way to start an immediate transaction with sqlx. Without this
        // adjustment, updates will run into 'database is locked' errors.
        Box::pin(async move {
            <Sqlite as Database>::TransactionManager::begin(conn, None).await?;
            if !nested {
                // a no-op write transaction
                // (`WHERE 0` matches no rows, so nothing is deleted)
                sqlx::query("DELETE FROM config WHERE 0")
                    .execute(conn)
                    .await?;
            }
            Ok(())
        })
    }
}
615
616async fn acquire_key(
617    session: &mut DbSession<Sqlite>,
618) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
619    acquire_session(session).await?;
620    Ok(session.profile_and_key().unwrap())
621}
622
/// Make the session active and return the active handle, passing
/// `resolve_profile_key` as the callback for looking up the profile key.
// NOTE(review): when and how often `make_active` invokes the callback is decided
// outside this file — confirm before relying on it.
async fn acquire_session(
    session: &mut DbSession<Sqlite>,
) -> Result<DbSessionActive<'_, Sqlite>, Error> {
    session.make_active(&resolve_profile_key).await
}
628
629async fn resolve_profile_key(
630    conn: &mut PoolConnection<Sqlite>,
631    cache: Arc<KeyCache>,
632    profile: String,
633    _in_txn: bool,
634) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
635    if let Some((pid, key)) = cache.get_profile(profile.as_str()).await {
636        Ok((pid, key))
637    } else if let Some(row) = sqlx::query("SELECT id, profile_key FROM profiles WHERE name=?1")
638        .bind(profile.as_str())
639        .fetch_optional(conn.as_mut())
640        .await
641        .map_err(err_map!(Backend, "Error fetching profile key"))?
642    {
643        let pid = row.try_get(0)?;
644        let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
645        cache.add_profile(profile, pid, key.clone()).await;
646        Ok((pid, key))
647    } else {
648        Err(err_msg!(NotFound, "Profile not found"))
649    }
650}
651
652#[allow(clippy::too_many_arguments)]
653async fn perform_insert(
654    active: &mut DbSessionTxn<'_, Sqlite>,
655    kind: EntryKind,
656    enc_category: &[u8],
657    enc_name: &[u8],
658    enc_value: &[u8],
659    enc_tags: Option<Vec<EncEntryTag>>,
660    expiry_ms: Option<i64>,
661    new_row: bool,
662) -> Result<(), Error> {
663    let row_id = if new_row {
664        trace!("Insert entry");
665        let done = sqlx::query(INSERT_QUERY)
666            .bind(active.profile_id)
667            .bind(kind as i16)
668            .bind(enc_category)
669            .bind(enc_name)
670            .bind(enc_value)
671            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
672            .execute(active.connection_mut())
673            .await
674            .map_err(err_map!(Backend, "Error inserting new entry"))?;
675        if done.rows_affected() == 0 {
676            return Err(err_msg!(Duplicate, "Duplicate entry"));
677        }
678        done.last_insert_rowid()
679    } else {
680        trace!("Update entry");
681        let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
682            .bind(active.profile_id)
683            .bind(kind as i16)
684            .bind(enc_category)
685            .bind(enc_name)
686            .bind(enc_value)
687            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
688            .fetch_one(active.connection_mut())
689            .await
690            .map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
691        sqlx::query(TAG_DELETE_QUERY)
692            .bind(row_id)
693            .execute(active.connection_mut())
694            .await
695            .map_err(err_map!(Backend, "Error removing existing entry tags"))?;
696        row_id
697    };
698    if let Some(tags) = enc_tags {
699        for tag in tags {
700            sqlx::query(TAG_INSERT_QUERY)
701                .bind(row_id)
702                .bind(&tag.name)
703                .bind(&tag.value)
704                .bind(tag.plaintext as i16)
705                .execute(active.connection_mut())
706                .await
707                .map_err(err_map!(Backend, "Error inserting entry tags"))?;
708        }
709    }
710    Ok(())
711}
712
713async fn perform_remove(
714    active: &mut DbSessionActive<'_, Sqlite>,
715    kind: EntryKind,
716    enc_category: &[u8],
717    enc_name: &[u8],
718    ignore_error: bool,
719) -> Result<(), Error> {
720    trace!("Remove entry");
721    let done = sqlx::query(DELETE_QUERY)
722        .bind(active.profile_id)
723        .bind(kind as i16)
724        .bind(enc_category)
725        .bind(enc_name)
726        .execute(active.connection_mut())
727        .await
728        .map_err(err_map!(Backend, "Error removing entry"))?;
729    if done.rows_affected() == 0 && !ignore_error {
730        Err(err_msg!(NotFound, "Entry not found"))
731    } else {
732        Ok(())
733    }
734}
735
/// Produce a stream of encrypted scan batches for the given filters.
///
/// The query is built from `SCAN_QUERY` plus the optional tag filter, offset,
/// limit and ordering. Rows are gathered into batches of `PAGE_SIZE`; a final
/// partial batch is yielded once the rows are exhausted. Nothing is decrypted
/// here — callers decrypt the yielded `EncScanEntry` values.
///
/// When `active` reports that it owns its session, the session is closed with
/// `close(false)` after the last row has been read.
#[allow(clippy::too_many_arguments)]
fn perform_scan(
    mut active: DbSessionRef<'_, Sqlite>,
    profile_id: ProfileId,
    key: Arc<ProfileKey>,
    kind: Option<EntryKind>,
    category: Option<String>,
    tag_filter: Option<TagFilter>,
    offset: Option<i64>,
    limit: Option<i64>,
    order_by: Option<OrderBy>,
    descending: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
    try_stream! {
        let mut params = QueryParams::new();
        params.push(profile_id);
        params.push(kind.map(|k| k as i16));
        // Encrypt the category and encode the tag filter inside `unblock`.
        let (enc_category, tag_filter) = unblock({
            let key = key.clone();
            let enc_category = category.as_ref().map(|c| ProfileKey::prepare_input(c.as_bytes()));
            let params_len = params.len() + 1; // plus category
            move || {
                Result::<_, Error>::Ok((
                    enc_category.map(|c| key.encrypt_entry_category(c)).transpose()?,
                    encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?
                ))
            }
        }).await?;
        params.push(enc_category);
        let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;

        let mut batch = Vec::with_capacity(PAGE_SIZE);

        let mut acquired = acquire_session(&mut active).await?;
        let mut rows = sqlx::query_with(query.as_str(), params).fetch(acquired.connection_mut());
        while let Some(row) = rows.try_next().await? {
            // Column 0 (`i.id`) is not read; columns 1-5 are kind, category,
            // name, value and the aggregated tags.
            let kind: u32 = row.try_get(1)?;
            let kind = EntryKind::try_from(kind as usize)?;
            batch.push(EncScanEntry {
                kind, category: row.try_get(2)?, name: row.try_get(3)?, value: row.try_get(4)?, tags: row.try_get(5)?
            });
            if batch.len() == PAGE_SIZE {
                // Hand the full page to the consumer and keep using `batch`.
                yield batch.split_off(0);
            }
        }
        // The row stream is dropped before the session is closed and released.
        drop(rows);
        if active.is_owned() {
            active.close(false).await?;
        }
        drop(active);

        // Yield whatever is left over as a final, shorter page.
        if !batch.is_empty() {
            yield batch;
        }
    }
}
792
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::db_utils::replace_arg_placeholders;
    use crate::future::block_on;
    use crate::protect::{generate_raw_store_key, StoreKeyMethod};

    /// Asks SQLite for the current time, the bound timestamp, and whether that
    /// timestamp is later than now.
    const EXPIRY_CHECK_QUERY: &str = "SELECT datetime('now'), ?1, datetime(?1) > datetime('now')";

    /// Provision an in-memory store and compare the expiry timestamp computed
    /// for `expire_ms` against SQLite's current time. Returns the current time,
    /// the timestamp as seen by SQLite, and whether the timestamp is in the future.
    async fn compare_expiry_with_now(expire_ms: i64) -> Result<(String, String, bool), Error> {
        let key = generate_raw_store_key(None)?;
        let db = SqliteStoreOptions::in_memory()
            .provision(StoreKeyMethod::RawKey, key, None, false)
            .await?;
        let ts = expiry_timestamp(expire_ms).unwrap();
        let check = sqlx::query(EXPIRY_CHECK_QUERY)
            .bind(ts)
            .fetch_one(&db.conn_pool)
            .await?;
        let now: String = check.try_get(0)?;
        let cmp_ts: String = check.try_get(1)?;
        let cmp: bool = check.try_get(2)?;
        Ok((now, cmp_ts, cmp))
    }

    /// A timestamp one second ahead must compare as later than now.
    #[test]
    fn sqlite_check_expiry_timestamp() {
        let (now, cmp_ts, cmp) = block_on(compare_expiry_with_now(1000)).unwrap();
        assert!(cmp, "now ({}) > expiry timestamp ({})", now, cmp_ts);
    }

    /// A timestamp one second in the past must compare as already expired.
    #[test]
    fn sqlite_check_expiry_timestamp_expired() {
        let (now, cmp_ts, cmp) = block_on(compare_expiry_with_now(-1000)).unwrap();
        assert!(!cmp, "now ({}) < expiry timestamp ({})", now, cmp_ts);
    }

    /// `$$` and `$N` placeholders become numbered `?N` arguments; other `$`
    /// sequences are left alone.
    #[test]
    fn sqlite_query_placeholders() {
        let numbered = replace_arg_placeholders::<SqliteBackend>("This $$ is $10 a $$ string!", 3);
        assert_eq!(&numbered, "This ?3 is ?12 a ?5 string!");

        let untouched = replace_arg_placeholders::<SqliteBackend>("This $a is a string!", 1);
        assert_eq!(&untouched, "This $a is a string!");
    }
}
855        );
856    }
857}