askar_storage/backend/postgres/
mod.rs

1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use async_stream::try_stream;
6
7use futures_lite::{
8    pin,
9    stream::{Stream, StreamExt},
10};
11
12use sqlx::{
13    pool::PoolConnection,
14    postgres::{PgPool, Postgres},
15    Acquire, Row,
16};
17
18use super::{
19    db_utils::{
20        decode_tags, decrypt_scan_batch, encode_profile_key, encode_tag_filter, expiry_timestamp,
21        extend_query, prepare_tags, random_profile_name, replace_arg_placeholders, DbSession,
22        DbSessionActive, DbSessionRef, DbSessionTxn, EncScanEntry, ExtDatabase, QueryParams,
23        QueryPrepare, PAGE_SIZE,
24    },
25    Backend, BackendSession,
26};
27use crate::{
28    backend::OrderBy,
29    entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
30    error::Error,
31    future::{unblock, BoxFuture},
32    protect::{EntryEncryptor, KeyCache, PassKey, ProfileId, ProfileKey, StoreKeyMethod},
33};
34
35mod provision;
36pub use self::provision::PostgresStoreOptions;
37
38#[cfg(any(test, feature = "pg_test"))]
39mod test_db;
40#[cfg(any(test, feature = "pg_test"))]
41pub use self::test_db::TestDB;
42
43const CONFIG_FETCH_QUERY: &str = "SELECT value FROM config WHERE name = $1";
44const CONFIG_UPDATE_QUERY: &str = "INSERT INTO config (name, value) VALUES ($1, $2)
45    ON CONFLICT(name) DO UPDATE SET value = excluded.value";
46const COUNT_QUERY: &str = "SELECT COUNT(*) FROM items i
47    WHERE profile_id = $1
48    AND (kind = $2 OR $2 IS NULL)
49    AND (category = $3 OR $3 IS NULL)
50    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
51const DELETE_QUERY: &str = "DELETE FROM items
52    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4";
53const FETCH_QUERY: &str = "SELECT id, value,
54    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
55        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
56        FROM items_tags it WHERE it.item_id = i.id) tags
57    FROM items i
58    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4
59    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
60const FETCH_QUERY_UPDATE: &str = "SELECT id, value,
61    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
62        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
63        FROM items_tags it WHERE it.item_id = i.id) tags
64    FROM items i
65    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4
66    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP) FOR NO KEY UPDATE";
67const INSERT_QUERY: &str = "INSERT INTO items (profile_id, kind, category, name, value, expiry)
68    VALUES ($1, $2, $3, $4, $5, $6)
69    ON CONFLICT DO NOTHING RETURNING id";
70const UPDATE_QUERY: &str = "UPDATE items SET value=$5, expiry=$6
71    WHERE profile_id=$1 AND kind=$2 AND category=$3 AND name=$4
72    RETURNING id";
73const SCAN_QUERY: &str = "SELECT id, kind, category, name, value,
74    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
75        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
76        FROM items_tags it WHERE it.item_id = i.id) tags
77    FROM items i WHERE profile_id = $1
78    AND (kind = $2 OR $2 IS NULL)
79    AND (category = $3 OR $3 IS NULL)
80    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
81const DELETE_ALL_QUERY: &str = "DELETE FROM items i
82    WHERE profile_id = $1
83    AND (kind = $2 OR $2 IS NULL)
84    AND (category = $3 OR $3 IS NULL)";
85const TAG_INSERT_QUERY: &str = "INSERT INTO items_tags
86    (item_id, name, value, plaintext) VALUES ($1, $2, $3, $4)";
87const TAG_DELETE_QUERY: &str = "DELETE FROM items_tags
88    WHERE item_id=$1";
89
90/// A PostgreSQL database store
91pub struct PostgresBackend {
92    conn_pool: PgPool,
93    active_profile: String,
94    key_cache: Arc<KeyCache>,
95    host: String,
96    name: String,
97}
98
99impl PostgresBackend {
100    pub(crate) fn new(
101        conn_pool: PgPool,
102        active_profile: String,
103        key_cache: KeyCache,
104        host: String,
105        name: String,
106    ) -> Self {
107        Self {
108            conn_pool,
109            active_profile,
110            key_cache: Arc::new(key_cache),
111            host,
112            name,
113        }
114    }
115}
116
117impl Backend for PostgresBackend {
118    type Session = DbSession<Postgres>;
119
120    fn create_profile(&self, name: Option<String>) -> BoxFuture<'_, Result<String, Error>> {
121        let name = name.unwrap_or_else(random_profile_name);
122        Box::pin(async move {
123            let store_key = self.key_cache.store_key.clone();
124            let (profile_key, enc_key) = unblock(move || {
125                let profile_key = ProfileKey::new()?;
126                let enc_key = encode_profile_key(&profile_key, &store_key)?;
127                Result::<_, Error>::Ok((profile_key, enc_key))
128            })
129            .await?;
130            let mut conn = self.conn_pool.acquire().await?;
131            let res = sqlx::query_scalar(
132                "INSERT INTO profiles (name, profile_key) VALUES ($1, $2) 
133                ON CONFLICT DO NOTHING RETURNING id",
134            )
135            .bind(&name)
136            .bind(enc_key)
137            .fetch_optional(conn.as_mut())
138            .await?;
139            conn.return_to_pool().await;
140            if let Some(pid) = res {
141                self.key_cache
142                    .add_profile(name.clone(), pid, Arc::new(profile_key))
143                    .await;
144                Ok(name)
145            } else {
146                Err(err_msg!(Duplicate, "Duplicate profile name"))
147            }
148        })
149    }
150
151    fn get_active_profile(&self) -> String {
152        self.active_profile.clone()
153    }
154
155    fn get_default_profile(&self) -> BoxFuture<'_, Result<String, Error>> {
156        Box::pin(async move {
157            let mut conn = self.conn_pool.acquire().await?;
158            let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
159                .bind("default_profile")
160                .fetch_one(conn.as_mut())
161                .await
162                .map_err(err_map!(Backend, "Error fetching default profile name"))?;
163            conn.return_to_pool().await;
164            Ok(profile.unwrap_or_default())
165        })
166    }
167
168    fn set_default_profile(&self, profile: String) -> BoxFuture<'_, Result<(), Error>> {
169        Box::pin(async move {
170            let mut conn = self.conn_pool.acquire().await?;
171            sqlx::query(CONFIG_UPDATE_QUERY)
172                .bind("default_profile")
173                .bind(profile)
174                .execute(conn.as_mut())
175                .await
176                .map_err(err_map!(Backend, "Error setting default profile name"))?;
177            conn.return_to_pool().await;
178            Ok(())
179        })
180    }
181
182    fn list_profiles(&self) -> BoxFuture<'_, Result<Vec<String>, Error>> {
183        Box::pin(async move {
184            let mut conn = self.conn_pool.acquire().await?;
185            let rows = sqlx::query("SELECT name FROM profiles")
186                .fetch_all(conn.as_mut())
187                .await
188                .map_err(err_map!(Backend, "Error fetching profile list"))?;
189            let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
190            conn.return_to_pool().await;
191            Ok(names)
192        })
193    }
194
195    fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
196        Box::pin(async move {
197            let mut conn = self.conn_pool.acquire().await?;
198            let ret = sqlx::query("DELETE FROM profiles WHERE name=$1")
199                .bind(&name)
200                .execute(conn.as_mut())
201                .await
202                .map_err(err_map!(Backend, "Error removing profile"))?
203                .rows_affected()
204                != 0;
205            conn.return_to_pool().await;
206            Ok(ret)
207        })
208    }
209
210    fn rekey(
211        &mut self,
212        method: StoreKeyMethod,
213        pass_key: PassKey<'_>,
214    ) -> BoxFuture<'_, Result<(), Error>> {
215        let pass_key = pass_key.into_owned();
216        Box::pin(async move {
217            let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
218            let store_key = Arc::new(store_key);
219            let mut conn = self.conn_pool.acquire().await?;
220            let mut txn = conn.begin().await?;
221            let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
222            let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
223            while let Some(row) = rows.next().await {
224                let row = row?;
225                let pid = row.try_get(0)?;
226                let enc_key = row.try_get(1)?;
227                let profile_key = self.key_cache.load_key(enc_key).await?;
228                let upd_key = unblock({
229                    let store_key = store_key.clone();
230                    move || encode_profile_key(&profile_key, &store_key)
231                })
232                .await?;
233                upd_keys.insert(pid, upd_key);
234            }
235            drop(rows);
236            for (pid, key) in upd_keys {
237                if sqlx::query("UPDATE profiles SET profile_key=$1 WHERE id=$2")
238                    .bind(key)
239                    .bind(pid)
240                    .execute(txn.as_mut())
241                    .await?
242                    .rows_affected()
243                    != 1
244                {
245                    return Err(err_msg!(Backend, "Error updating profile key"));
246                }
247            }
248            if sqlx::query("UPDATE config SET value=$1 WHERE name='key'")
249                .bind(store_key_ref.into_uri())
250                .execute(txn.as_mut())
251                .await?
252                .rows_affected()
253                != 1
254            {
255                return Err(err_msg!(Backend, "Error updating store key"));
256            }
257            txn.commit().await?;
258            conn.return_to_pool().await;
259            self.key_cache = Arc::new(KeyCache::new(store_key));
260            Ok(())
261        })
262    }
263
264    fn scan(
265        &self,
266        profile: Option<String>,
267        kind: Option<EntryKind>,
268        category: Option<String>,
269        tag_filter: Option<TagFilter>,
270        offset: Option<i64>,
271        limit: Option<i64>,
272        order_by: Option<OrderBy>,
273        descending: bool,
274    ) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
275        Box::pin(async move {
276            let session = self.session(profile, false)?;
277            let mut active = session.owned_ref();
278            let (profile_id, key) = acquire_key(&mut active).await?;
279            let scan = perform_scan(
280                active,
281                profile_id,
282                key.clone(),
283                kind,
284                category.clone(),
285                tag_filter,
286                offset,
287                limit,
288                order_by,
289                descending,
290                false,
291            );
292            let stream = scan.then(move |enc_rows| {
293                let category = category.clone();
294                let key = key.clone();
295                unblock(move || decrypt_scan_batch(category, enc_rows?, &key))
296            });
297            Ok(Scan::new(stream, PAGE_SIZE))
298        })
299    }
300
301    fn session(&self, profile: Option<String>, transaction: bool) -> Result<Self::Session, Error> {
302        Ok(DbSession::new(
303            self.conn_pool.clone(),
304            self.key_cache.clone(),
305            profile.unwrap_or_else(|| self.active_profile.clone()),
306            transaction,
307        ))
308    }
309
310    fn close(&self) -> BoxFuture<'_, Result<(), Error>> {
311        Box::pin(async move {
312            self.conn_pool.close().await;
313            Ok(())
314        })
315    }
316}
317
318impl Debug for PostgresBackend {
319    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
320        f.debug_struct("PostgresStore")
321            .field("active_profile", &self.active_profile)
322            .field("host", &self.host)
323            .field("name", &self.name)
324            .finish()
325    }
326}
327
328impl BackendSession for DbSession<Postgres> {
329    fn count<'q>(
330        &'q mut self,
331        kind: Option<EntryKind>,
332        category: Option<&'q str>,
333        tag_filter: Option<TagFilter>,
334    ) -> BoxFuture<'q, Result<i64, Error>> {
335        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
336
337        Box::pin(async move {
338            let (profile_id, key) = acquire_key(&mut *self).await?;
339            let mut params = QueryParams::new();
340            params.push(profile_id);
341            params.push(kind.map(|k| k as i16));
342            let (enc_category, tag_filter) = unblock({
343                let params_len = params.len() + 1; // plus category
344                move || {
345                    Result::<_, Error>::Ok((
346                        enc_category
347                            .map(|c| key.encrypt_entry_category(c))
348                            .transpose()?,
349                        encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?,
350                    ))
351                }
352            })
353            .await?;
354            params.push(enc_category);
355            let query = extend_query::<PostgresBackend>(
356                COUNT_QUERY,
357                &mut params,
358                tag_filter,
359                None,
360                None,
361                None,
362                false,
363            )?;
364            let mut active = acquire_session(&mut *self).await?;
365            let count = sqlx::query_scalar_with(query.as_str(), params)
366                .fetch_one(active.connection_mut())
367                .await
368                .map_err(err_map!(Backend, "Error performing count query"))?;
369            Ok(count)
370        })
371    }
372
373    fn fetch(
374        &mut self,
375        kind: EntryKind,
376        category: &str,
377        name: &str,
378        for_update: bool,
379    ) -> BoxFuture<'_, Result<Option<Entry>, Error>> {
380        let category = category.to_string();
381        let name = name.to_string();
382
383        Box::pin(async move {
384            let (profile_id, key) = acquire_key(&mut *self).await?;
385            let (enc_category, enc_name) = unblock({
386                let key = key.clone();
387                let category = ProfileKey::prepare_input(category.as_bytes());
388                let name = ProfileKey::prepare_input(name.as_bytes());
389                move || {
390                    Result::<_, Error>::Ok((
391                        key.encrypt_entry_category(category)?,
392                        key.encrypt_entry_name(name)?,
393                    ))
394                }
395            })
396            .await?;
397            let mut active = acquire_session(&mut *self).await?;
398            if let Some(row) = sqlx::query(if for_update && active.in_transaction() {
399                FETCH_QUERY_UPDATE
400            } else {
401                FETCH_QUERY
402            })
403            .bind(profile_id)
404            .bind(kind as i16)
405            .bind(enc_category)
406            .bind(enc_name)
407            .fetch_optional(active.connection_mut())
408            .await
409            .map_err(err_map!(Backend, "Error performing fetch query"))?
410            {
411                let value = row.try_get(1)?;
412                let tags = row.try_get::<Option<String>, _>(2)?.map(String::into_bytes);
413                let (category, name, value, tags) = unblock(move || {
414                    let value = key.decrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
415                    let tags = if let Some(enc_tags) = tags {
416                        key.decrypt_entry_tags(
417                            decode_tags(enc_tags)
418                                .map_err(|_| err_msg!(Unexpected, "Error decoding tags"))?,
419                        )?
420                    } else {
421                        Vec::new()
422                    };
423                    Result::<_, Error>::Ok((category, name, value, tags))
424                })
425                .await?;
426                Ok(Some(Entry::new(kind, category, name, value, tags)))
427            } else {
428                Ok(None)
429            }
430        })
431    }
432
433    fn fetch_all<'q>(
434        &'q mut self,
435        kind: Option<EntryKind>,
436        category: Option<&'q str>,
437        tag_filter: Option<TagFilter>,
438        limit: Option<i64>,
439        order_by: Option<OrderBy>,
440        descending: bool,
441        for_update: bool,
442    ) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
443        let category = category.map(|c| c.to_string());
444        Box::pin(async move {
445            let for_update = for_update && self.in_transaction();
446            let mut active = self.borrow_mut();
447            let (profile_id, key) = acquire_key(&mut active).await?;
448            let scan = perform_scan(
449                active,
450                profile_id,
451                key.clone(),
452                kind,
453                category.clone(),
454                tag_filter,
455                None,
456                limit,
457                order_by,
458                descending,
459                for_update,
460            );
461            pin!(scan);
462            let mut enc_rows = vec![];
463            while let Some(rows) = scan.try_next().await? {
464                enc_rows.extend(rows)
465            }
466            unblock(move || decrypt_scan_batch(category, enc_rows, &key)).await
467        })
468    }
469
470    fn remove_all<'q>(
471        &'q mut self,
472        kind: Option<EntryKind>,
473        category: Option<&'q str>,
474        tag_filter: Option<TagFilter>,
475    ) -> BoxFuture<'q, Result<i64, Error>> {
476        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
477
478        Box::pin(async move {
479            let (profile_id, key) = acquire_key(&mut *self).await?;
480            let mut params = QueryParams::new();
481            params.push(profile_id);
482            params.push(kind.map(|k| k as i16));
483            let (enc_category, tag_filter) = unblock({
484                let params_len = params.len() + 1; // plus category
485                move || {
486                    Result::<_, Error>::Ok((
487                        enc_category
488                            .map(|c| key.encrypt_entry_category(c))
489                            .transpose()?,
490                        encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?,
491                    ))
492                }
493            })
494            .await?;
495            params.push(enc_category);
496            let query = extend_query::<PostgresBackend>(
497                DELETE_ALL_QUERY,
498                &mut params,
499                tag_filter,
500                None,
501                None,
502                None,
503                false,
504            )?;
505
506            let mut active = acquire_session(&mut *self).await?;
507            let removed = sqlx::query_with(query.as_str(), params)
508                .execute(active.connection_mut())
509                .await?
510                .rows_affected();
511            Ok(removed as i64)
512        })
513    }
514
515    fn update<'q>(
516        &'q mut self,
517        kind: EntryKind,
518        operation: EntryOperation,
519        category: &'q str,
520        name: &'q str,
521        value: Option<&'q [u8]>,
522        tags: Option<&'q [EntryTag]>,
523        expiry_ms: Option<i64>,
524    ) -> BoxFuture<'q, Result<(), Error>> {
525        let category = ProfileKey::prepare_input(category.as_bytes());
526        let name = ProfileKey::prepare_input(name.as_bytes());
527
528        match operation {
529            op @ EntryOperation::Insert | op @ EntryOperation::Replace => {
530                let value = ProfileKey::prepare_input(value.unwrap_or_default());
531                let tags = tags.map(prepare_tags);
532                Box::pin(async move {
533                    let (_, key) = acquire_key(&mut *self).await?;
534                    let (enc_category, enc_name, enc_value, enc_tags) = unblock(move || {
535                        let enc_value =
536                            key.encrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
537                        Result::<_, Error>::Ok((
538                            key.encrypt_entry_category(category)?,
539                            key.encrypt_entry_name(name)?,
540                            enc_value,
541                            tags.transpose()?
542                                .map(|t| key.encrypt_entry_tags(t))
543                                .transpose()?,
544                        ))
545                    })
546                    .await?;
547                    let mut active = acquire_session(&mut *self).await?;
548                    let mut txn = active.as_transaction().await?;
549                    perform_insert(
550                        &mut txn,
551                        kind,
552                        &enc_category,
553                        &enc_name,
554                        &enc_value,
555                        enc_tags,
556                        expiry_ms,
557                        op == EntryOperation::Insert,
558                    )
559                    .await?;
560                    txn.commit().await?;
561                    Ok(())
562                })
563            }
564
565            EntryOperation::Remove => Box::pin(async move {
566                let (_, key) = acquire_key(&mut *self).await?;
567                let (enc_category, enc_name) = unblock(move || {
568                    Result::<_, Error>::Ok((
569                        key.encrypt_entry_category(category)?,
570                        key.encrypt_entry_name(name)?,
571                    ))
572                })
573                .await?;
574                let mut active = acquire_session(&mut *self).await?;
575                perform_remove(&mut active, kind, &enc_category, &enc_name, false).await
576            }),
577        }
578    }
579
580    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
581        Box::pin(async move {
582            let mut sess = acquire_session(&mut *self).await?;
583            if sess.in_transaction() {
584                // the profile row is locked, perform a typical ping
585                sqlx::Connection::ping(sess.connection_mut())
586                    .await
587                    .map_err(err_map!(Backend, "Error pinging session"))?;
588            } else {
589                let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
590                    .bind(sess.profile_id)
591                    .fetch_one(sess.connection_mut())
592                    .await
593                    .map_err(err_map!(Backend, "Error pinging session"))?;
594                if count == 0 {
595                    return Err(err_msg!(NotFound, "Session profile has been removed"));
596                }
597            }
598            Ok(())
599        })
600    }
601
602    fn close(&mut self, commit: bool) -> BoxFuture<'_, Result<(), Error>> {
603        Box::pin(self.close(commit))
604    }
605}
606
607impl ExtDatabase for Postgres {}
608
609impl QueryPrepare for PostgresBackend {
610    type DB = Postgres;
611
612    fn placeholder(index: i64) -> String {
613        format!("${}", index)
614    }
615
616    fn limit_query<'q>(
617        mut query: String,
618        args: &mut QueryParams<'q, Self::DB>,
619        offset: Option<i64>,
620        limit: Option<i64>,
621    ) -> String
622    where
623        i64: for<'e> sqlx::Encode<'e, Self::DB> + sqlx::Type<Self::DB>,
624    {
625        if offset.is_some() || limit.is_some() {
626            let last_idx = (args.len() + 1) as i64;
627            args.push(limit);
628            args.push(offset.unwrap_or(0));
629            let limit = replace_arg_placeholders::<Self>(" LIMIT $$ OFFSET $$", last_idx);
630            query.push_str(&limit);
631        }
632        query
633    }
634}
635
636async fn acquire_key(
637    session: &mut DbSession<Postgres>,
638) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
639    acquire_session(session).await?;
640    Ok(session.profile_and_key().unwrap())
641}
642
643async fn acquire_session(
644    session: &'_ mut DbSession<Postgres>,
645) -> Result<DbSessionActive<'_, Postgres>, Error> {
646    session.make_active(&resolve_profile_key).await
647}
648
649async fn resolve_profile_key(
650    conn: &mut PoolConnection<Postgres>,
651    cache: Arc<KeyCache>,
652    profile: String,
653    in_txn: bool,
654) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
655    if let Some((pid, key)) = cache.get_profile(profile.as_str()).await {
656        if in_txn {
657            // lock the profile row to prevent it from being removed
658            let check: Option<i64> =
659                sqlx::query_scalar("SELECT id FROM profiles WHERE id=$1 FOR NO KEY UPDATE")
660                    .bind(pid)
661                    .fetch_optional(conn.as_mut())
662                    .await?;
663            if check.is_none() {
664                return Err(err_msg!(NotFound, "Session profile has been removed"));
665            }
666        }
667        Ok((pid, key))
668    } else if let Some(row) =
669        sqlx::query("SELECT id, profile_key FROM profiles WHERE name=$1 FOR NO KEY UPDATE")
670            .bind(profile.as_str())
671            .fetch_optional(conn.as_mut())
672            .await?
673    {
674        let pid = row.try_get(0)?;
675        let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
676        cache.add_profile(profile, pid, key.clone()).await;
677        Ok((pid, key))
678    } else {
679        Err(err_msg!(NotFound, "Profile not found"))
680    }
681}
682
683#[allow(clippy::too_many_arguments)]
684async fn perform_insert(
685    active: &mut DbSessionTxn<'_, Postgres>,
686    kind: EntryKind,
687    enc_category: &[u8],
688    enc_name: &[u8],
689    enc_value: &[u8],
690    enc_tags: Option<Vec<EncEntryTag>>,
691    expiry_ms: Option<i64>,
692    new_row: bool,
693) -> Result<(), Error> {
694    let row_id = if new_row {
695        trace!("Insert entry");
696        sqlx::query_scalar(INSERT_QUERY)
697            .bind(active.profile_id)
698            .bind(kind as i16)
699            .bind(enc_category)
700            .bind(enc_name)
701            .bind(enc_value)
702            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
703            .fetch_optional(active.connection_mut())
704            .await?
705            .ok_or_else(|| err_msg!(Duplicate, "Duplicate entry"))?
706    } else {
707        trace!("Update entry");
708        let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
709            .bind(active.profile_id)
710            .bind(kind as i16)
711            .bind(enc_category)
712            .bind(enc_name)
713            .bind(enc_value)
714            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
715            .fetch_one(active.connection_mut())
716            .await
717            .map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
718        sqlx::query(TAG_DELETE_QUERY)
719            .bind(row_id)
720            .execute(active.connection_mut())
721            .await
722            .map_err(err_map!(Backend, "Error removing existing entry tags"))?;
723        row_id
724    };
725    if let Some(tags) = enc_tags {
726        for tag in tags {
727            sqlx::query(TAG_INSERT_QUERY)
728                .bind(row_id)
729                .bind(&tag.name)
730                .bind(&tag.value)
731                .bind(tag.plaintext as i16)
732                .execute(active.connection_mut())
733                .await
734                .map_err(err_map!(Backend, "Error inserting entry tags"))?;
735        }
736    }
737    Ok(())
738}
739
740async fn perform_remove<'q>(
741    active: &mut DbSessionActive<'q, Postgres>,
742    kind: EntryKind,
743    enc_category: &[u8],
744    enc_name: &[u8],
745    ignore_error: bool,
746) -> Result<(), Error> {
747    trace!("Remove entry");
748    let done = sqlx::query(DELETE_QUERY)
749        .bind(active.profile_id)
750        .bind(kind as i16)
751        .bind(enc_category)
752        .bind(enc_name)
753        .execute(active.connection_mut())
754        .await
755        .map_err(err_map!(Backend, "Error removing entry"))?;
756    if done.rows_affected() == 0 && !ignore_error {
757        Err(err_msg!(NotFound, "Entry not found"))
758    } else {
759        Ok(())
760    }
761}
762
763#[allow(clippy::too_many_arguments)]
764fn perform_scan(
765    mut active: DbSessionRef<'_, Postgres>,
766    profile_id: ProfileId,
767    key: Arc<ProfileKey>,
768    kind: Option<EntryKind>,
769    category: Option<String>,
770    tag_filter: Option<TagFilter>,
771    offset: Option<i64>,
772    limit: Option<i64>,
773    order_by: Option<OrderBy>,
774    descending: bool,
775    for_update: bool,
776) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
777    try_stream! {
778        let mut params = QueryParams::new();
779        params.push(profile_id);
780        params.push(kind.map(|k| k as i16));
781        let (enc_category, tag_filter) = unblock({
782            let key = key.clone();
783            let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
784            let params_len = params.len() + 1; // plus category
785            move || {
786                Result::<_, Error>::Ok((
787                    enc_category
788                        .map(|c| key.encrypt_entry_category(c))
789                        .transpose()?,
790                    encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?
791                ))
792            }
793        }).await?;
794        params.push(enc_category);
795        let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;
796        if for_update {
797            query.push_str(" FOR NO KEY UPDATE");
798        }
799        let mut batch = Vec::with_capacity(PAGE_SIZE);
800
801        let mut acquired = acquire_session(&mut active).await?;
802        let mut rows = sqlx::query_with(query.as_str(), params).fetch(acquired.connection_mut());
803        while let Some(row) = rows.try_next().await? {
804            let tags = row.try_get::<Option<String>, _>(5)?.map(String::into_bytes).unwrap_or_default();
805            let kind: i16 = row.try_get(1)?;
806            let kind = EntryKind::try_from(kind as usize)?;
807            batch.push(EncScanEntry {
808                kind, category: row.try_get(2)?, name: row.try_get(3)?, value: row.try_get(4)?, tags
809            });
810            if batch.len() == PAGE_SIZE {
811                yield batch.split_off(0);
812            }
813        }
814        drop(rows);
815        if active.is_owned() {
816            active.close(false).await?;
817        }
818        drop(active);
819
820        if !batch.is_empty() {
821            yield batch;
822        }
823    }
824}
825
826#[cfg(test)]
827mod tests {
828    use super::*;
829    use crate::backend::db_utils::replace_arg_placeholders;
830
831    #[test]
832    fn postgres_simple_and_convert_args_works() {
833        assert_eq!(
834            &replace_arg_placeholders::<PostgresBackend>("This $$ is $10 a $$ string!", 3),
835            "This $3 is $12 a $5 string!",
836        );
837    }
838}