askar_storage/backend/sqlite/
mod.rs

1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use async_stream::try_stream;
6use futures_lite::{
7    pin,
8    stream::{Stream, StreamExt},
9};
10
11use sqlx::{
12    pool::PoolConnection,
13    sqlite::{Sqlite, SqlitePool},
14    Acquire, Database, Error as SqlxError, Row, TransactionManager,
15};
16
17use super::{
18    db_utils::{
19        decode_tags, decrypt_scan_batch, encode_profile_key, encode_tag_filter, expiry_timestamp,
20        extend_query, prepare_tags, random_profile_name, Connection, DbSession, DbSessionActive,
21        DbSessionRef, DbSessionTxn, EncScanEntry, ExtDatabase, QueryParams, QueryPrepare,
22        PAGE_SIZE,
23    },
24    Backend, BackendSession,
25};
26use crate::{
27    backend::OrderBy,
28    entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
29    error::Error,
30    future::{unblock, BoxFuture},
31    protect::{EntryEncryptor, KeyCache, PassKey, ProfileId, ProfileKey, StoreKeyMethod},
32};
33
34mod provision;
35pub use provision::SqliteStoreOptions;
36
37const CONFIG_FETCH_QUERY: &str = "SELECT value FROM config WHERE name = ?1";
38const CONFIG_UPDATE_QUERY: &str = "INSERT OR REPLACE INTO config (name, value) VALUES (?1, ?2)";
39const COUNT_QUERY: &str = "SELECT COUNT(*) FROM items i
40    WHERE profile_id = ?1
41    AND (kind = ?2 OR ?2 IS NULL)
42    AND (category = ?3 OR ?3 IS NULL)
43    AND (expiry IS NULL OR DATETIME(expiry) > DATETIME('now'))";
44const DELETE_QUERY: &str = "DELETE FROM items
45    WHERE profile_id = ?1 AND kind = ?2 AND category = ?3 AND name = ?4";
46const FETCH_QUERY: &str = "SELECT i.id, i.value,
47    (SELECT GROUP_CONCAT(it.plaintext || ':' || HEX(it.name) || ':' || HEX(it.value))
48        FROM items_tags it WHERE it.item_id = i.id) AS tags
49    FROM items i WHERE i.profile_id = ?1 AND i.kind = ?2
50    AND i.category = ?3 AND i.name = ?4
51    AND (i.expiry IS NULL OR DATETIME(i.expiry) > DATETIME('now'))";
52const INSERT_QUERY: &str =
53    "INSERT OR IGNORE INTO items (profile_id, kind, category, name, value, expiry)
54    VALUES (?1, ?2, ?3, ?4, ?5, ?6)";
55const UPDATE_QUERY: &str = "UPDATE items SET value=?5, expiry=?6 WHERE profile_id=?1 AND kind=?2
56    AND category=?3 AND name=?4 RETURNING id";
57const SCAN_QUERY: &str = "SELECT i.id, i.kind, i.category, i.name, i.value,
58    (SELECT GROUP_CONCAT(it.plaintext || ':' || HEX(it.name) || ':' || HEX(it.value))
59        FROM items_tags it WHERE it.item_id = i.id) AS tags
60    FROM items i WHERE i.profile_id = ?1
61    AND (i.kind = ?2 OR ?2 IS NULL)
62    AND (i.category = ?3 OR ?3 IS NULL)
63    AND (i.expiry IS NULL OR DATETIME(i.expiry) > DATETIME('now'))";
64const DELETE_ALL_QUERY: &str = "DELETE FROM items AS i
65    WHERE i.profile_id = ?1
66    AND (i.kind = ?2 OR ?2 IS NULL)
67    AND (i.category = ?3 OR ?3 IS NULL)";
68const TAG_INSERT_QUERY: &str = "INSERT INTO items_tags
69    (item_id, name, value, plaintext) VALUES (?1, ?2, ?3, ?4)";
70const TAG_DELETE_QUERY: &str = "DELETE FROM items_tags
71    WHERE item_id=?1";
72
73/// A Sqlite database store
74pub struct SqliteBackend {
75    conn_pool: SqlitePool,
76    active_profile: String,
77    key_cache: Arc<KeyCache>,
78    path: String,
79}
80
81impl SqliteBackend {
82    pub(crate) fn new(
83        conn_pool: SqlitePool,
84        active_profile: String,
85        key_cache: KeyCache,
86        path: String,
87    ) -> Self {
88        Self {
89            conn_pool,
90            active_profile,
91            key_cache: Arc::new(key_cache),
92            path,
93        }
94    }
95}
96
97impl Debug for SqliteBackend {
98    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
99        f.debug_struct("SqliteStore")
100            .field("active_profile", &self.active_profile)
101            .field("path", &self.path)
102            .finish()
103    }
104}
105
106impl QueryPrepare for SqliteBackend {
107    type DB = Sqlite;
108}
109
110impl Backend for SqliteBackend {
111    type Session = DbSession<Sqlite>;
112
113    fn create_profile(&self, name: Option<String>) -> BoxFuture<'_, Result<String, Error>> {
114        let name = name.unwrap_or_else(random_profile_name);
115        Box::pin(async move {
116            let store_key = self.key_cache.store_key.clone();
117            let (profile_key, enc_key) = unblock(move || {
118                let profile_key = ProfileKey::new()?;
119                let enc_key = encode_profile_key(&profile_key, &store_key)?;
120                Result::<_, Error>::Ok((profile_key, enc_key))
121            })
122            .await?;
123            let mut conn = self.conn_pool.acquire().await?;
124            let done =
125                sqlx::query("INSERT OR IGNORE INTO profiles (name, profile_key) VALUES (?1, ?2)")
126                    .bind(&name)
127                    .bind(enc_key)
128                    .execute(conn.as_mut())
129                    .await?;
130            conn.return_to_pool().await;
131            if done.rows_affected() == 0 {
132                return Err(err_msg!(Duplicate, "Duplicate profile name"));
133            }
134            self.key_cache
135                .add_profile(
136                    name.clone(),
137                    done.last_insert_rowid(),
138                    Arc::new(profile_key),
139                )
140                .await;
141            Ok(name)
142        })
143    }
144
145    fn get_active_profile(&self) -> String {
146        self.active_profile.clone()
147    }
148
149    fn get_default_profile(&self) -> BoxFuture<'_, Result<String, Error>> {
150        Box::pin(async move {
151            let mut conn = self.conn_pool.acquire().await?;
152            let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
153                .bind("default_profile")
154                .fetch_one(conn.as_mut())
155                .await
156                .map_err(err_map!(Backend, "Error fetching default profile name"))?;
157            conn.return_to_pool().await;
158            Ok(profile.unwrap_or_default())
159        })
160    }
161
162    fn set_default_profile(&self, profile: String) -> BoxFuture<'_, Result<(), Error>> {
163        Box::pin(async move {
164            let mut conn = self.conn_pool.acquire().await?;
165            sqlx::query(CONFIG_UPDATE_QUERY)
166                .bind("default_profile")
167                .bind(profile)
168                .execute(conn.as_mut())
169                .await
170                .map_err(err_map!(Backend, "Error setting default profile name"))?;
171            conn.return_to_pool().await;
172            Ok(())
173        })
174    }
175
176    fn list_profiles(&self) -> BoxFuture<'_, Result<Vec<String>, Error>> {
177        Box::pin(async move {
178            let mut conn = self.conn_pool.acquire().await?;
179            let rows = sqlx::query("SELECT name FROM profiles")
180                .fetch_all(conn.as_mut())
181                .await
182                .map_err(err_map!(Backend, "Error fetching profile list"))?;
183            conn.return_to_pool().await;
184            let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
185            Ok(names)
186        })
187    }
188
189    fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
190        Box::pin(async move {
191            let mut conn = self.conn_pool.acquire().await?;
192            let ret = sqlx::query("DELETE FROM profiles WHERE name=?")
193                .bind(&name)
194                .execute(conn.as_mut())
195                .await
196                .map_err(err_map!(Backend, "Error removing profile"))?
197                .rows_affected()
198                != 0;
199            conn.return_to_pool().await;
200            Ok(ret)
201        })
202    }
203
204    fn rekey(
205        &mut self,
206        method: StoreKeyMethod,
207        pass_key: PassKey<'_>,
208    ) -> BoxFuture<'_, Result<(), Error>> {
209        let pass_key = pass_key.into_owned();
210        Box::pin(async move {
211            let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
212            let store_key = Arc::new(store_key);
213            let mut conn = self.conn_pool.acquire().await?;
214            let mut txn = conn.begin().await?;
215            let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
216            let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
217            while let Some(row) = rows.next().await {
218                let row = row?;
219                let pid = row.try_get(0)?;
220                let enc_key = row.try_get(1)?;
221                let profile_key = self.key_cache.load_key(enc_key).await?;
222                let upd_key = unblock({
223                    let store_key = store_key.clone();
224                    move || encode_profile_key(&profile_key, &store_key)
225                })
226                .await?;
227                upd_keys.insert(pid, upd_key);
228            }
229            drop(rows);
230            for (pid, key) in upd_keys {
231                if sqlx::query("UPDATE profiles SET profile_key=?1 WHERE id=?2")
232                    .bind(key)
233                    .bind(pid)
234                    .execute(txn.as_mut())
235                    .await?
236                    .rows_affected()
237                    != 1
238                {
239                    return Err(err_msg!(Backend, "Error updating profile key"));
240                }
241            }
242            if sqlx::query("UPDATE config SET value=?1 WHERE name='key'")
243                .bind(store_key_ref.into_uri())
244                .execute(txn.as_mut())
245                .await?
246                .rows_affected()
247                != 1
248            {
249                return Err(err_msg!(Backend, "Error updating store key"));
250            }
251            txn.commit().await?;
252            conn.return_to_pool().await;
253            self.key_cache = Arc::new(KeyCache::new(store_key));
254            Ok(())
255        })
256    }
257
258    fn scan(
259        &self,
260        profile: Option<String>,
261        kind: Option<EntryKind>,
262        category: Option<String>,
263        tag_filter: Option<TagFilter>,
264        offset: Option<i64>,
265        limit: Option<i64>,
266        order_by: Option<OrderBy>,
267        descending: bool,
268    ) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
269        Box::pin(async move {
270            let session = self.session(profile, false)?;
271            let mut active = session.owned_ref();
272            let (profile_id, key) = acquire_key(&mut active).await?;
273            let scan = perform_scan(
274                active,
275                profile_id,
276                key.clone(),
277                kind,
278                category.clone(),
279                tag_filter,
280                offset,
281                limit,
282                order_by,
283                descending,
284            );
285            let stream = scan.then(move |enc_rows| {
286                let category = category.clone();
287                let key = key.clone();
288                unblock(move || decrypt_scan_batch(category, enc_rows?, &key))
289            });
290            Ok(Scan::new(stream, PAGE_SIZE))
291        })
292    }
293
294    fn session(&self, profile: Option<String>, transaction: bool) -> Result<Self::Session, Error> {
295        Ok(DbSession::new(
296            self.conn_pool.clone(),
297            self.key_cache.clone(),
298            profile.unwrap_or_else(|| self.active_profile.clone()),
299            transaction,
300        ))
301    }
302
303    fn close(&self) -> BoxFuture<'_, Result<(), Error>> {
304        Box::pin(async move {
305            self.conn_pool.close().await;
306            Ok(())
307        })
308    }
309}
310
311impl BackendSession for DbSession<Sqlite> {
312    fn count<'q>(
313        &'q mut self,
314        kind: Option<EntryKind>,
315        category: Option<&'q str>,
316        tag_filter: Option<TagFilter>,
317    ) -> BoxFuture<'q, Result<i64, Error>> {
318        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
319
320        Box::pin(async move {
321            let (profile_id, key) = acquire_key(&mut *self).await?;
322            let mut params = QueryParams::new();
323            params.push(profile_id);
324            params.push(kind.map(|k| k as i16));
325            let (enc_category, tag_filter) = unblock({
326                let params_len = params.len() + 1; // plus category
327                move || {
328                    Result::<_, Error>::Ok((
329                        enc_category
330                            .map(|c| key.encrypt_entry_category(c))
331                            .transpose()?,
332                        encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?,
333                    ))
334                }
335            })
336            .await?;
337            params.push(enc_category);
338            let query = extend_query::<SqliteBackend>(
339                COUNT_QUERY,
340                &mut params,
341                tag_filter,
342                None,
343                None,
344                None,
345                false,
346            )?;
347            let mut active = acquire_session(&mut *self).await?;
348            let count = sqlx::query_scalar_with(query.as_str(), params)
349                .fetch_one(active.connection_mut())
350                .await
351                .map_err(err_map!(Backend, "Error performing count query"))?;
352            Ok(count)
353        })
354    }
355
356    fn fetch(
357        &mut self,
358        kind: EntryKind,
359        category: &str,
360        name: &str,
361        _for_update: bool,
362    ) -> BoxFuture<'_, Result<Option<Entry>, Error>> {
363        let category = category.to_string();
364        let name = name.to_string();
365
366        Box::pin(async move {
367            let (profile_id, key) = acquire_key(&mut *self).await?;
368            let (enc_category, enc_name) = unblock({
369                let key = key.clone();
370                let category = ProfileKey::prepare_input(category.as_bytes());
371                let name = ProfileKey::prepare_input(name.as_bytes());
372                move || {
373                    Result::<_, Error>::Ok((
374                        key.encrypt_entry_category(category)?,
375                        key.encrypt_entry_name(name)?,
376                    ))
377                }
378            })
379            .await?;
380            let mut active = acquire_session(&mut *self).await?;
381            if let Some(row) = sqlx::query(FETCH_QUERY)
382                .bind(profile_id)
383                .bind(kind as i16)
384                .bind(enc_category)
385                .bind(enc_name)
386                .fetch_optional(active.connection_mut())
387                .await
388                .map_err(err_map!(Backend, "Error performing fetch query"))?
389            {
390                let value = row.try_get(1)?;
391                let tags = row.try_get(2)?;
392                let (category, name, value, tags) = unblock(move || {
393                    let value = key.decrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
394                    let enc_tags = decode_tags(tags)
395                        .map_err(|_| err_msg!(Unexpected, "Error decoding entry tags"))?;
396                    let tags = key.decrypt_entry_tags(enc_tags)?;
397                    Result::<_, Error>::Ok((category, name, value, tags))
398                })
399                .await?;
400                Ok(Some(Entry::new(kind, category, name, value, tags)))
401            } else {
402                Ok(None)
403            }
404        })
405    }
406
407    fn fetch_all<'q>(
408        &'q mut self,
409        kind: Option<EntryKind>,
410        category: Option<&'q str>,
411        tag_filter: Option<TagFilter>,
412        limit: Option<i64>,
413        order_by: Option<OrderBy>,
414        descending: bool,
415        _for_update: bool,
416    ) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
417        let category = category.map(|c| c.to_string());
418        Box::pin(async move {
419            let mut active = self.borrow_mut();
420            let (profile_id, key) = acquire_key(&mut active).await?;
421            let scan = perform_scan(
422                active,
423                profile_id,
424                key.clone(),
425                kind,
426                category.clone(),
427                tag_filter,
428                None,
429                limit,
430                order_by,
431                descending,
432            );
433            pin!(scan);
434            let mut enc_rows = vec![];
435            while let Some(rows) = scan.try_next().await? {
436                enc_rows.extend(rows)
437            }
438            unblock(move || decrypt_scan_batch(category, enc_rows, &key)).await
439        })
440    }
441
442    fn remove_all<'q>(
443        &'q mut self,
444        kind: Option<EntryKind>,
445        category: Option<&'q str>,
446        tag_filter: Option<TagFilter>,
447    ) -> BoxFuture<'q, Result<i64, Error>> {
448        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
449
450        Box::pin(async move {
451            let (profile_id, key) = acquire_key(&mut *self).await?;
452            let mut params = QueryParams::new();
453            params.push(profile_id);
454            params.push(kind.map(|k| k as i16));
455            let (enc_category, tag_filter) = unblock({
456                let params_len = params.len() + 1; // plus category
457                move || {
458                    Result::<_, Error>::Ok((
459                        enc_category
460                            .map(|c| key.encrypt_entry_category(c))
461                            .transpose()?,
462                        encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?,
463                    ))
464                }
465            })
466            .await?;
467            params.push(enc_category);
468            let query = extend_query::<SqliteBackend>(
469                DELETE_ALL_QUERY,
470                &mut params,
471                tag_filter,
472                None,
473                None,
474                None,
475                false,
476            )?;
477
478            let mut active = acquire_session(&mut *self).await?;
479            let removed = sqlx::query_with(query.as_str(), params)
480                .execute(active.connection_mut())
481                .await?
482                .rows_affected();
483            Ok(removed as i64)
484        })
485    }
486
487    fn update<'q>(
488        &'q mut self,
489        kind: EntryKind,
490        operation: EntryOperation,
491        category: &'q str,
492        name: &'q str,
493        value: Option<&'q [u8]>,
494        tags: Option<&'q [EntryTag]>,
495        expiry_ms: Option<i64>,
496    ) -> BoxFuture<'q, Result<(), Error>> {
497        let category = ProfileKey::prepare_input(category.as_bytes());
498        let name = ProfileKey::prepare_input(name.as_bytes());
499
500        match operation {
501            op @ EntryOperation::Insert | op @ EntryOperation::Replace => {
502                let value = ProfileKey::prepare_input(value.unwrap_or_default());
503                let tags = tags.map(prepare_tags);
504                Box::pin(async move {
505                    let (_, key) = acquire_key(&mut *self).await?;
506                    let (enc_category, enc_name, enc_value, enc_tags) = unblock(move || {
507                        let enc_value =
508                            key.encrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
509                        Result::<_, Error>::Ok((
510                            key.encrypt_entry_category(category)?,
511                            key.encrypt_entry_name(name)?,
512                            enc_value,
513                            tags.transpose()?
514                                .map(|t| key.encrypt_entry_tags(t))
515                                .transpose()?,
516                        ))
517                    })
518                    .await?;
519                    let mut active = acquire_session(&mut *self).await?;
520                    let mut txn = active.as_transaction().await?;
521                    perform_insert(
522                        &mut txn,
523                        kind,
524                        &enc_category,
525                        &enc_name,
526                        &enc_value,
527                        enc_tags,
528                        expiry_ms,
529                        op == EntryOperation::Insert,
530                    )
531                    .await?;
532                    txn.commit().await?;
533                    Ok(())
534                })
535            }
536
537            EntryOperation::Remove => Box::pin(async move {
538                let (_, key) = acquire_key(&mut *self).await?;
539                let (enc_category, enc_name) = unblock(move || {
540                    Result::<_, Error>::Ok((
541                        key.encrypt_entry_category(category)?,
542                        key.encrypt_entry_name(name)?,
543                    ))
544                })
545                .await?;
546                let mut active = acquire_session(&mut *self).await?;
547                perform_remove(&mut active, kind, &enc_category, &enc_name, false).await
548            }),
549        }
550    }
551
552    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
553        Box::pin(async move {
554            let mut sess = acquire_session(&mut *self).await?;
555            let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
556                .bind(sess.profile_id)
557                .fetch_one(sess.connection_mut())
558                .await
559                .map_err(err_map!(Backend, "Error pinging session"))?;
560            if count == 0 {
561                Err(err_msg!(NotFound, "Session profile has been removed"))
562            } else {
563                Ok(())
564            }
565        })
566    }
567
568    fn close(&mut self, commit: bool) -> BoxFuture<'_, Result<(), Error>> {
569        Box::pin(self.close(commit))
570    }
571}
572
573impl ExtDatabase for Sqlite {
574    fn start_transaction(
575        conn: &mut Connection<Self>,
576        nested: bool,
577    ) -> BoxFuture<'_, std::result::Result<(), SqlxError>> {
578        // FIXME - this is a horrible workaround because there is currently
579        // no good way to start an immediate transaction with sqlx. Without this
580        // adjustment, updates will run into 'database is locked' errors.
581        Box::pin(async move {
582            <Sqlite as Database>::TransactionManager::begin(conn).await?;
583            if !nested {
584                // a no-op write transaction
585                sqlx::query("DELETE FROM config WHERE 0")
586                    .execute(conn)
587                    .await?;
588            }
589            Ok(())
590        })
591    }
592}
593
594async fn acquire_key(
595    session: &mut DbSession<Sqlite>,
596) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
597    acquire_session(session).await?;
598    Ok(session.profile_and_key().unwrap())
599}
600
601async fn acquire_session(
602    session: &mut DbSession<Sqlite>,
603) -> Result<DbSessionActive<'_, Sqlite>, Error> {
604    session.make_active(&resolve_profile_key).await
605}
606
607async fn resolve_profile_key(
608    conn: &mut PoolConnection<Sqlite>,
609    cache: Arc<KeyCache>,
610    profile: String,
611    _in_txn: bool,
612) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
613    if let Some((pid, key)) = cache.get_profile(profile.as_str()).await {
614        Ok((pid, key))
615    } else if let Some(row) = sqlx::query("SELECT id, profile_key FROM profiles WHERE name=?1")
616        .bind(profile.as_str())
617        .fetch_optional(conn.as_mut())
618        .await
619        .map_err(err_map!(Backend, "Error fetching profile key"))?
620    {
621        let pid = row.try_get(0)?;
622        let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
623        cache.add_profile(profile, pid, key.clone()).await;
624        Ok((pid, key))
625    } else {
626        Err(err_msg!(NotFound, "Profile not found"))
627    }
628}
629
630#[allow(clippy::too_many_arguments)]
631async fn perform_insert(
632    active: &mut DbSessionTxn<'_, Sqlite>,
633    kind: EntryKind,
634    enc_category: &[u8],
635    enc_name: &[u8],
636    enc_value: &[u8],
637    enc_tags: Option<Vec<EncEntryTag>>,
638    expiry_ms: Option<i64>,
639    new_row: bool,
640) -> Result<(), Error> {
641    let row_id = if new_row {
642        trace!("Insert entry");
643        let done = sqlx::query(INSERT_QUERY)
644            .bind(active.profile_id)
645            .bind(kind as i16)
646            .bind(enc_category)
647            .bind(enc_name)
648            .bind(enc_value)
649            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
650            .execute(active.connection_mut())
651            .await
652            .map_err(err_map!(Backend, "Error inserting new entry"))?;
653        if done.rows_affected() == 0 {
654            return Err(err_msg!(Duplicate, "Duplicate entry"));
655        }
656        done.last_insert_rowid()
657    } else {
658        trace!("Update entry");
659        let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
660            .bind(active.profile_id)
661            .bind(kind as i16)
662            .bind(enc_category)
663            .bind(enc_name)
664            .bind(enc_value)
665            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
666            .fetch_one(active.connection_mut())
667            .await
668            .map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
669        sqlx::query(TAG_DELETE_QUERY)
670            .bind(row_id)
671            .execute(active.connection_mut())
672            .await
673            .map_err(err_map!(Backend, "Error removing existing entry tags"))?;
674        row_id
675    };
676    if let Some(tags) = enc_tags {
677        for tag in tags {
678            sqlx::query(TAG_INSERT_QUERY)
679                .bind(row_id)
680                .bind(&tag.name)
681                .bind(&tag.value)
682                .bind(tag.plaintext as i16)
683                .execute(active.connection_mut())
684                .await
685                .map_err(err_map!(Backend, "Error inserting entry tags"))?;
686        }
687    }
688    Ok(())
689}
690
691async fn perform_remove<'q>(
692    active: &mut DbSessionActive<'q, Sqlite>,
693    kind: EntryKind,
694    enc_category: &[u8],
695    enc_name: &[u8],
696    ignore_error: bool,
697) -> Result<(), Error> {
698    trace!("Remove entry");
699    let done = sqlx::query(DELETE_QUERY)
700        .bind(active.profile_id)
701        .bind(kind as i16)
702        .bind(enc_category)
703        .bind(enc_name)
704        .execute(active.connection_mut())
705        .await
706        .map_err(err_map!(Backend, "Error removing entry"))?;
707    if done.rows_affected() == 0 && !ignore_error {
708        Err(err_msg!(NotFound, "Entry not found"))
709    } else {
710        Ok(())
711    }
712}
713
714#[allow(clippy::too_many_arguments)]
715fn perform_scan(
716    mut active: DbSessionRef<'_, Sqlite>,
717    profile_id: ProfileId,
718    key: Arc<ProfileKey>,
719    kind: Option<EntryKind>,
720    category: Option<String>,
721    tag_filter: Option<TagFilter>,
722    offset: Option<i64>,
723    limit: Option<i64>,
724    order_by: Option<OrderBy>,
725    descending: bool,
726) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
727    try_stream! {
728        let mut params = QueryParams::new();
729        params.push(profile_id);
730        params.push(kind.map(|k| k as i16));
731        let (enc_category, tag_filter) = unblock({
732            let key = key.clone();
733            let enc_category = category.as_ref().map(|c| ProfileKey::prepare_input(c.as_bytes()));
734            let params_len = params.len() + 1; // plus category
735            move || {
736                Result::<_, Error>::Ok((
737                    enc_category.map(|c| key.encrypt_entry_category(c)).transpose()?,
738                    encode_tag_filter::<SqliteBackend>(tag_filter, &key, params_len)?
739                ))
740            }
741        }).await?;
742        params.push(enc_category);
743        let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;
744
745        let mut batch = Vec::with_capacity(PAGE_SIZE);
746
747        let mut acquired = acquire_session(&mut active).await?;
748        let mut rows = sqlx::query_with(query.as_str(), params).fetch(acquired.connection_mut());
749        while let Some(row) = rows.try_next().await? {
750            let kind: u32 = row.try_get(1)?;
751            let kind = EntryKind::try_from(kind as usize)?;
752            batch.push(EncScanEntry {
753                kind, category: row.try_get(2)?, name: row.try_get(3)?, value: row.try_get(4)?, tags: row.try_get(5)?
754            });
755            if batch.len() == PAGE_SIZE {
756                yield batch.split_off(0);
757            }
758        }
759        drop(rows);
760        if active.is_owned() {
761            active.close(false).await?;
762        }
763        drop(active);
764
765        if !batch.is_empty() {
766            yield batch;
767        }
768    }
769}
770
771#[cfg(test)]
772mod tests {
773    use super::*;
774    use crate::backend::db_utils::replace_arg_placeholders;
775    use crate::future::block_on;
776    use crate::protect::{generate_raw_store_key, StoreKeyMethod};
777
778    #[test]
779    fn sqlite_check_expiry_timestamp() {
780        block_on(async {
781            let key = generate_raw_store_key(None)?;
782            let db = SqliteStoreOptions::in_memory()
783                .provision(StoreKeyMethod::RawKey, key, None, false)
784                .await?;
785            let ts = expiry_timestamp(1000).unwrap();
786            let check = sqlx::query("SELECT datetime('now'), ?1, datetime(?1) > datetime('now')")
787                .bind(ts)
788                .fetch_one(&db.conn_pool)
789                .await?;
790            let now: String = check.try_get(0)?;
791            let cmp_ts: String = check.try_get(1)?;
792            let cmp: bool = check.try_get(2)?;
793            if !cmp {
794                panic!("now ({}) > expiry timestamp ({})", now, cmp_ts);
795            }
796            Result::<_, Error>::Ok(())
797        })
798        .unwrap();
799    }
800
801    #[test]
802    fn sqlite_check_expiry_timestamp_expired() {
803        block_on(async {
804            let key = generate_raw_store_key(None)?;
805            let db = SqliteStoreOptions::in_memory()
806                .provision(StoreKeyMethod::RawKey, key, None, false)
807                .await?;
808            let ts = expiry_timestamp(-1000).unwrap(); // put it to be already expired
809            let check = sqlx::query("SELECT datetime('now'), ?1, datetime(?1) > datetime('now')")
810                .bind(ts)
811                .fetch_one(&db.conn_pool)
812                .await?;
813            let now: String = check.try_get(0)?;
814            let cmp_ts: String = check.try_get(1)?;
815            let cmp: bool = check.try_get(2)?;
816            if cmp {
817                panic!("now ({}) < expiry timestamp ({})", now, cmp_ts);
818            }
819            Result::<_, Error>::Ok(())
820        })
821        .unwrap();
822    }
823
824    #[test]
825    fn sqlite_query_placeholders() {
826        assert_eq!(
827            &replace_arg_placeholders::<SqliteBackend>("This $$ is $10 a $$ string!", 3),
828            "This ?3 is ?12 a ?5 string!",
829        );
830        assert_eq!(
831            &replace_arg_placeholders::<SqliteBackend>("This $a is a string!", 1),
832            "This $a is a string!",
833        );
834    }
835}