askar_storage/backend/postgres/
mod.rs

1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use async_stream::try_stream;
6
7use futures_lite::{
8    pin,
9    stream::{Stream, StreamExt},
10};
11
12use sqlx::{
13    pool::PoolConnection,
14    postgres::{PgPool, Postgres},
15    Acquire, Row,
16};
17
18use super::{
19    db_utils::{
20        decode_tags, decrypt_scan_batch, encode_profile_key, encode_tag_filter, expiry_timestamp,
21        extend_query, prepare_tags, random_profile_name, replace_arg_placeholders, DbSession,
22        DbSessionActive, DbSessionRef, DbSessionTxn, EncScanEntry, ExtDatabase, QueryParams,
23        QueryPrepare, PAGE_SIZE,
24    },
25    Backend, BackendSession,
26};
27use crate::{
28    backend::OrderBy,
29    entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
30    error::Error,
31    future::{unblock, BoxFuture},
32    protect::{EntryEncryptor, KeyCache, PassKey, ProfileId, ProfileKey, StoreKeyMethod},
33};
34
35mod provision;
36pub use self::provision::PostgresStoreOptions;
37
38#[cfg(any(test, feature = "pg_test"))]
39mod test_db;
40#[cfg(any(test, feature = "pg_test"))]
41pub use self::test_db::TestDB;
42
// Fetch a single configuration value by its name.
const CONFIG_FETCH_QUERY: &str = "SELECT value FROM config WHERE name = $1";
// Insert a configuration value, replacing the stored value when the name already exists.
const CONFIG_UPDATE_QUERY: &str = "INSERT INTO config (name, value) VALUES ($1, $2)
    ON CONFLICT(name) DO UPDATE SET value = excluded.value";
// Count the unexpired items of a profile. Passing NULL for the kind ($2) or the
// category ($3) disables that filter.
const COUNT_QUERY: &str = "SELECT COUNT(*) FROM items i
    WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
// Delete the single item identified by profile, kind, category and name.
const DELETE_QUERY: &str = "DELETE FROM items
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4";
// Fetch one unexpired item. Its tags are aggregated into a single comma-separated
// string of `plaintext:hex(name):hex(value)` triples (NULL when the item has no tags).
const FETCH_QUERY: &str = "SELECT id, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
// Same as FETCH_QUERY, but additionally locks the selected row with FOR NO KEY UPDATE.
const FETCH_QUERY_UPDATE: &str = "SELECT id, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP) FOR NO KEY UPDATE";
// Insert a new item and return its id; on a conflict nothing is inserted and no row
// is returned.
const INSERT_QUERY: &str = "INSERT INTO items (profile_id, kind, category, name, value, expiry)
    VALUES ($1, $2, $3, $4, $5, $6)
    ON CONFLICT DO NOTHING RETURNING id";
// Overwrite the value and expiry of an existing item and return its id.
const UPDATE_QUERY: &str = "UPDATE items SET value=$5, expiry=$6
    WHERE profile_id=$1 AND kind=$2 AND category=$3 AND name=$4
    RETURNING id";
// Base query for scans: unexpired items of a profile, optionally filtered by kind ($2)
// and category ($3), with tags aggregated in the same format as FETCH_QUERY.
const SCAN_QUERY: &str = "SELECT id, kind, category, name, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
// Base query for bulk removal; unlike the count and scan queries it does not test
// the expiry column.
const DELETE_ALL_QUERY: &str = "DELETE FROM items i
    WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)";
// Attach one tag to an item.
const TAG_INSERT_QUERY: &str = "INSERT INTO items_tags
    (item_id, name, value, plaintext) VALUES ($1, $2, $3, $4)";
// Remove every tag attached to an item.
const TAG_DELETE_QUERY: &str = "DELETE FROM items_tags
    WHERE item_id=$1";
89
/// A PostgreSQL database store
pub struct PostgresBackend {
    /// Connection pool used for every database operation of this backend
    conn_pool: PgPool,
    /// Profile name used for sessions when the caller does not name one
    active_profile: String,
    /// Shared cache holding the store key and the profile keys resolved so far
    key_cache: Arc<KeyCache>,
    /// Host label reported in the `Debug` output (presumably the database host)
    host: String,
    /// Name label reported in the `Debug` output (presumably the database name)
    name: String,
}
98
99impl PostgresBackend {
100    pub(crate) fn new(
101        conn_pool: PgPool,
102        active_profile: String,
103        key_cache: KeyCache,
104        host: String,
105        name: String,
106    ) -> Self {
107        Self {
108            conn_pool,
109            active_profile,
110            key_cache: Arc::new(key_cache),
111            host,
112            name,
113        }
114    }
115}
116
117impl Backend for PostgresBackend {
118    type Session = DbSession<Postgres>;
119
120    fn create_profile(&self, name: Option<String>) -> BoxFuture<'_, Result<String, Error>> {
121        let name = name.unwrap_or_else(random_profile_name);
122        Box::pin(async move {
123            let store_key = self.key_cache.store_key.clone();
124            let (profile_key, enc_key) = unblock(move || {
125                let profile_key = ProfileKey::new()?;
126                let enc_key = encode_profile_key(&profile_key, &store_key)?;
127                Result::<_, Error>::Ok((profile_key, enc_key))
128            })
129            .await?;
130            let mut conn = self.conn_pool.acquire().await?;
131            let res = sqlx::query_scalar(
132                "INSERT INTO profiles (name, profile_key) VALUES ($1, $2) 
133                ON CONFLICT DO NOTHING RETURNING id",
134            )
135            .bind(&name)
136            .bind(enc_key)
137            .fetch_optional(conn.as_mut())
138            .await?;
139            conn.return_to_pool().await;
140            if let Some(pid) = res {
141                self.key_cache
142                    .add_profile(name.clone(), pid, Arc::new(profile_key))
143                    .await;
144                Ok(name)
145            } else {
146                Err(err_msg!(Duplicate, "Duplicate profile name"))
147            }
148        })
149    }
150
151    fn get_active_profile(&self) -> String {
152        self.active_profile.clone()
153    }
154
155    fn get_default_profile(&self) -> BoxFuture<'_, Result<String, Error>> {
156        Box::pin(async move {
157            let mut conn = self.conn_pool.acquire().await?;
158            let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
159                .bind("default_profile")
160                .fetch_one(conn.as_mut())
161                .await
162                .map_err(err_map!(Backend, "Error fetching default profile name"))?;
163            conn.return_to_pool().await;
164            Ok(profile.unwrap_or_default())
165        })
166    }
167
168    fn set_default_profile(&self, profile: String) -> BoxFuture<'_, Result<(), Error>> {
169        Box::pin(async move {
170            let mut conn = self.conn_pool.acquire().await?;
171            sqlx::query(CONFIG_UPDATE_QUERY)
172                .bind("default_profile")
173                .bind(profile)
174                .execute(conn.as_mut())
175                .await
176                .map_err(err_map!(Backend, "Error setting default profile name"))?;
177            conn.return_to_pool().await;
178            Ok(())
179        })
180    }
181
182    fn list_profiles(&self) -> BoxFuture<'_, Result<Vec<String>, Error>> {
183        Box::pin(async move {
184            let mut conn = self.conn_pool.acquire().await?;
185            let rows = sqlx::query("SELECT name FROM profiles")
186                .fetch_all(conn.as_mut())
187                .await
188                .map_err(err_map!(Backend, "Error fetching profile list"))?;
189            let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
190            conn.return_to_pool().await;
191            Ok(names)
192        })
193    }
194
195    fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
196        Box::pin(async move {
197            let mut conn = self.conn_pool.acquire().await?;
198            let ret = sqlx::query("DELETE FROM profiles WHERE name=$1")
199                .bind(&name)
200                .execute(conn.as_mut())
201                .await
202                .map_err(err_map!(Backend, "Error removing profile"))?
203                .rows_affected()
204                != 0;
205            conn.return_to_pool().await;
206            self.key_cache.clear_profile(&name).await;
207            Ok(ret)
208        })
209    }
210
211    fn rename_profile(
212        &self,
213        from_name: String,
214        to_name: String,
215    ) -> BoxFuture<'_, Result<bool, Error>> {
216        Box::pin(async move {
217            let mut conn = self.conn_pool.acquire().await?;
218            let ret = sqlx::query("UPDATE profiles SET name=$1 WHERE name=$2")
219                .bind(&to_name)
220                .bind(&from_name)
221                .execute(conn.as_mut())
222                .await
223                .map_err(err_map!(Backend, "Error renaming profile"))?
224                .rows_affected()
225                != 0;
226            conn.return_to_pool().await;
227            self.key_cache.clear_profile(&from_name).await;
228            Ok(ret)
229        })
230    }
231
    /// Replace the store key and re-encode every profile key with it.
    ///
    /// The new store key is resolved from `method` and `pass_key`. All profile
    /// keys are then loaded through the current key cache, re-encoded with the
    /// new store key, and written back together with the new key reference in
    /// the `config` table, inside a single transaction. On success the key
    /// cache is replaced by a fresh one built around the new store key.
    ///
    /// # Errors
    /// Returns a `Backend` error when a profile row or the `key` config row is
    /// not updated exactly once, and propagates key-resolution, encoding and
    /// database errors. Every early return happens before `commit` is called.
    fn rekey(
        &mut self,
        method: StoreKeyMethod,
        pass_key: PassKey<'_>,
    ) -> BoxFuture<'_, Result<(), Error>> {
        // take ownership of the pass key so it can be moved into the closure below
        let pass_key = pass_key.into_owned();
        Box::pin(async move {
            let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
            let store_key = Arc::new(store_key);
            let mut conn = self.conn_pool.acquire().await?;
            let mut txn = conn.begin().await?;
            let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
            // collect the re-encoded keys first; the row stream borrows the transaction,
            // so the updates can only be issued once the stream has been dropped
            let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
            while let Some(row) = rows.next().await {
                let row = row?;
                let pid = row.try_get(0)?;
                let enc_key = row.try_get(1)?;
                // load the profile key through the current (old) key cache
                let profile_key = self.key_cache.load_key(enc_key).await?;
                let upd_key = unblock({
                    let store_key = store_key.clone();
                    move || encode_profile_key(&profile_key, &store_key)
                })
                .await?;
                upd_keys.insert(pid, upd_key);
            }
            drop(rows);
            for (pid, key) in upd_keys {
                if sqlx::query("UPDATE profiles SET profile_key=$1 WHERE id=$2")
                    .bind(key)
                    .bind(pid)
                    .execute(txn.as_mut())
                    .await?
                    .rows_affected()
                    != 1
                {
                    return Err(err_msg!(Backend, "Error updating profile key"));
                }
            }
            // record the reference to the new store key alongside the re-encoded profile keys
            if sqlx::query("UPDATE config SET value=$1 WHERE name='key'")
                .bind(store_key_ref.into_uri())
                .execute(txn.as_mut())
                .await?
                .rows_affected()
                != 1
            {
                return Err(err_msg!(Backend, "Error updating store key"));
            }
            txn.commit().await?;
            conn.return_to_pool().await;
            // switch to a fresh cache only after the new keys have been committed
            self.key_cache = Arc::new(KeyCache::new(store_key));
            Ok(())
        })
    }
285
286    fn scan(
287        &self,
288        profile: Option<String>,
289        kind: Option<EntryKind>,
290        category: Option<String>,
291        tag_filter: Option<TagFilter>,
292        offset: Option<i64>,
293        limit: Option<i64>,
294        order_by: Option<OrderBy>,
295        descending: bool,
296    ) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
297        Box::pin(async move {
298            let session = self.session(profile, false)?;
299            let mut active = session.owned_ref();
300            let (profile_id, key) = acquire_key(&mut active).await?;
301            let scan = perform_scan(
302                active,
303                profile_id,
304                key.clone(),
305                kind,
306                category.clone(),
307                tag_filter,
308                offset,
309                limit,
310                order_by,
311                descending,
312                false,
313            );
314            let stream = scan.then(move |enc_rows| {
315                let category = category.clone();
316                let key = key.clone();
317                unblock(move || decrypt_scan_batch(category, enc_rows?, &key))
318            });
319            Ok(Scan::new(stream, PAGE_SIZE))
320        })
321    }
322
323    fn session(&self, profile: Option<String>, transaction: bool) -> Result<Self::Session, Error> {
324        Ok(DbSession::new(
325            self.conn_pool.clone(),
326            self.key_cache.clone(),
327            profile.unwrap_or_else(|| self.active_profile.clone()),
328            transaction,
329        ))
330    }
331
332    fn close(&self) -> BoxFuture<'_, Result<(), Error>> {
333        Box::pin(async move {
334            self.conn_pool.close().await;
335            Ok(())
336        })
337    }
338}
339
340impl Debug for PostgresBackend {
341    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
342        f.debug_struct("PostgresStore")
343            .field("active_profile", &self.active_profile)
344            .field("host", &self.host)
345            .field("name", &self.name)
346            .finish()
347    }
348}
349
350impl BackendSession for DbSession<Postgres> {
351    fn count<'q>(
352        &'q mut self,
353        kind: Option<EntryKind>,
354        category: Option<&'q str>,
355        tag_filter: Option<TagFilter>,
356    ) -> BoxFuture<'q, Result<i64, Error>> {
357        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
358
359        Box::pin(async move {
360            let (profile_id, key) = acquire_key(&mut *self).await?;
361            let mut params = QueryParams::new();
362            params.push(profile_id);
363            params.push(kind.map(|k| k as i16));
364            let (enc_category, tag_filter) = unblock({
365                let params_len = params.len() + 1; // plus category
366                move || {
367                    Result::<_, Error>::Ok((
368                        enc_category
369                            .map(|c| key.encrypt_entry_category(c))
370                            .transpose()?,
371                        encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?,
372                    ))
373                }
374            })
375            .await?;
376            params.push(enc_category);
377            let query = extend_query::<PostgresBackend>(
378                COUNT_QUERY,
379                &mut params,
380                tag_filter,
381                None,
382                None,
383                None,
384                false,
385            )?;
386            let mut active = acquire_session(&mut *self).await?;
387            let count = sqlx::query_scalar_with(query.as_str(), params)
388                .fetch_one(active.connection_mut())
389                .await
390                .map_err(err_map!(Backend, "Error performing count query"))?;
391            Ok(count)
392        })
393    }
394
395    fn fetch(
396        &mut self,
397        kind: EntryKind,
398        category: &str,
399        name: &str,
400        for_update: bool,
401    ) -> BoxFuture<'_, Result<Option<Entry>, Error>> {
402        let category = category.to_string();
403        let name = name.to_string();
404
405        Box::pin(async move {
406            let (profile_id, key) = acquire_key(&mut *self).await?;
407            let (enc_category, enc_name) = unblock({
408                let key = key.clone();
409                let category = ProfileKey::prepare_input(category.as_bytes());
410                let name = ProfileKey::prepare_input(name.as_bytes());
411                move || {
412                    Result::<_, Error>::Ok((
413                        key.encrypt_entry_category(category)?,
414                        key.encrypt_entry_name(name)?,
415                    ))
416                }
417            })
418            .await?;
419            let mut active = acquire_session(&mut *self).await?;
420            if let Some(row) = sqlx::query(if for_update && active.in_transaction() {
421                FETCH_QUERY_UPDATE
422            } else {
423                FETCH_QUERY
424            })
425            .bind(profile_id)
426            .bind(kind as i16)
427            .bind(enc_category)
428            .bind(enc_name)
429            .fetch_optional(active.connection_mut())
430            .await
431            .map_err(err_map!(Backend, "Error performing fetch query"))?
432            {
433                let value = row.try_get(1)?;
434                let tags = row.try_get::<Option<String>, _>(2)?.map(String::into_bytes);
435                let (category, name, value, tags) = unblock(move || {
436                    let value = key.decrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
437                    let tags = if let Some(enc_tags) = tags {
438                        key.decrypt_entry_tags(
439                            decode_tags(enc_tags)
440                                .map_err(|_| err_msg!(Unexpected, "Error decoding tags"))?,
441                        )?
442                    } else {
443                        Vec::new()
444                    };
445                    Result::<_, Error>::Ok((category, name, value, tags))
446                })
447                .await?;
448                Ok(Some(Entry::new(kind, category, name, value, tags)))
449            } else {
450                Ok(None)
451            }
452        })
453    }
454
455    fn fetch_all<'q>(
456        &'q mut self,
457        kind: Option<EntryKind>,
458        category: Option<&'q str>,
459        tag_filter: Option<TagFilter>,
460        limit: Option<i64>,
461        order_by: Option<OrderBy>,
462        descending: bool,
463        for_update: bool,
464    ) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
465        let category = category.map(|c| c.to_string());
466        Box::pin(async move {
467            let for_update = for_update && self.in_transaction();
468            let mut active = self.borrow_mut();
469            let (profile_id, key) = acquire_key(&mut active).await?;
470            let scan = perform_scan(
471                active,
472                profile_id,
473                key.clone(),
474                kind,
475                category.clone(),
476                tag_filter,
477                None,
478                limit,
479                order_by,
480                descending,
481                for_update,
482            );
483            pin!(scan);
484            let mut enc_rows = vec![];
485            while let Some(rows) = scan.try_next().await? {
486                enc_rows.extend(rows)
487            }
488            unblock(move || decrypt_scan_batch(category, enc_rows, &key)).await
489        })
490    }
491
492    fn remove_all<'q>(
493        &'q mut self,
494        kind: Option<EntryKind>,
495        category: Option<&'q str>,
496        tag_filter: Option<TagFilter>,
497    ) -> BoxFuture<'q, Result<i64, Error>> {
498        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
499
500        Box::pin(async move {
501            let (profile_id, key) = acquire_key(&mut *self).await?;
502            let mut params = QueryParams::new();
503            params.push(profile_id);
504            params.push(kind.map(|k| k as i16));
505            let (enc_category, tag_filter) = unblock({
506                let params_len = params.len() + 1; // plus category
507                move || {
508                    Result::<_, Error>::Ok((
509                        enc_category
510                            .map(|c| key.encrypt_entry_category(c))
511                            .transpose()?,
512                        encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?,
513                    ))
514                }
515            })
516            .await?;
517            params.push(enc_category);
518            let query = extend_query::<PostgresBackend>(
519                DELETE_ALL_QUERY,
520                &mut params,
521                tag_filter,
522                None,
523                None,
524                None,
525                false,
526            )?;
527
528            let mut active = acquire_session(&mut *self).await?;
529            let removed = sqlx::query_with(query.as_str(), params)
530                .execute(active.connection_mut())
531                .await?
532                .rows_affected();
533            Ok(removed as i64)
534        })
535    }
536
537    fn update<'q>(
538        &'q mut self,
539        kind: EntryKind,
540        operation: EntryOperation,
541        category: &'q str,
542        name: &'q str,
543        value: Option<&'q [u8]>,
544        tags: Option<&'q [EntryTag]>,
545        expiry_ms: Option<i64>,
546    ) -> BoxFuture<'q, Result<(), Error>> {
547        let category = ProfileKey::prepare_input(category.as_bytes());
548        let name = ProfileKey::prepare_input(name.as_bytes());
549
550        match operation {
551            op @ EntryOperation::Insert | op @ EntryOperation::Replace => {
552                let value = ProfileKey::prepare_input(value.unwrap_or_default());
553                let tags = tags.map(prepare_tags);
554                Box::pin(async move {
555                    let (_, key) = acquire_key(&mut *self).await?;
556                    let (enc_category, enc_name, enc_value, enc_tags) = unblock(move || {
557                        let enc_value =
558                            key.encrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
559                        Result::<_, Error>::Ok((
560                            key.encrypt_entry_category(category)?,
561                            key.encrypt_entry_name(name)?,
562                            enc_value,
563                            tags.transpose()?
564                                .map(|t| key.encrypt_entry_tags(t))
565                                .transpose()?,
566                        ))
567                    })
568                    .await?;
569                    let mut active = acquire_session(&mut *self).await?;
570                    let mut txn = active.as_transaction().await?;
571                    perform_insert(
572                        &mut txn,
573                        kind,
574                        &enc_category,
575                        &enc_name,
576                        &enc_value,
577                        enc_tags,
578                        expiry_ms,
579                        op == EntryOperation::Insert,
580                    )
581                    .await?;
582                    txn.commit().await?;
583                    Ok(())
584                })
585            }
586
587            EntryOperation::Remove => Box::pin(async move {
588                let (_, key) = acquire_key(&mut *self).await?;
589                let (enc_category, enc_name) = unblock(move || {
590                    Result::<_, Error>::Ok((
591                        key.encrypt_entry_category(category)?,
592                        key.encrypt_entry_name(name)?,
593                    ))
594                })
595                .await?;
596                let mut active = acquire_session(&mut *self).await?;
597                perform_remove(&mut active, kind, &enc_category, &enc_name, false).await
598            }),
599        }
600    }
601
602    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
603        Box::pin(async move {
604            let mut sess = acquire_session(&mut *self).await?;
605            if sess.in_transaction() {
606                // the profile row is locked, perform a typical ping
607                sqlx::Connection::ping(sess.connection_mut())
608                    .await
609                    .map_err(err_map!(Backend, "Error pinging session"))?;
610            } else {
611                let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
612                    .bind(sess.profile_id)
613                    .fetch_one(sess.connection_mut())
614                    .await
615                    .map_err(err_map!(Backend, "Error pinging session"))?;
616                if count == 0 {
617                    return Err(err_msg!(NotFound, "Session profile has been removed"));
618                }
619            }
620            Ok(())
621        })
622    }
623
624    fn close(&mut self, commit: bool) -> BoxFuture<'_, Result<(), Error>> {
625        Box::pin(self.close(commit))
626    }
627}
628
// Implement `ExtDatabase` for `Postgres`; the empty body overrides nothing, so only
// whatever defaults the trait itself provides apply.
impl ExtDatabase for Postgres {}
630
631impl QueryPrepare for PostgresBackend {
632    type DB = Postgres;
633
634    fn placeholder(index: i64) -> String {
635        format!("${}", index)
636    }
637
638    fn limit_query<'q>(
639        mut query: String,
640        args: &mut QueryParams<'q, Self::DB>,
641        offset: Option<i64>,
642        limit: Option<i64>,
643    ) -> String
644    where
645        i64: for<'e> sqlx::Encode<'e, Self::DB> + sqlx::Type<Self::DB>,
646    {
647        if offset.is_some() || limit.is_some() {
648            let last_idx = (args.len() + 1) as i64;
649            args.push(limit);
650            args.push(offset.unwrap_or(0));
651            let limit = replace_arg_placeholders::<Self>(" LIMIT $$ OFFSET $$", last_idx);
652            query.push_str(&limit);
653        }
654        query
655    }
656}
657
658async fn acquire_key(
659    session: &mut DbSession<Postgres>,
660) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
661    acquire_session(session).await?;
662    Ok(session.profile_and_key().unwrap())
663}
664
665async fn acquire_session(
666    session: &'_ mut DbSession<Postgres>,
667) -> Result<DbSessionActive<'_, Postgres>, Error> {
668    session.make_active(&resolve_profile_key).await
669}
670
671async fn resolve_profile_key(
672    conn: &mut PoolConnection<Postgres>,
673    cache: Arc<KeyCache>,
674    profile: String,
675    in_txn: bool,
676) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
677    if let Some((pid, key)) = cache.get_profile(profile.as_str()).await {
678        if in_txn {
679            // lock the profile row to prevent it from being removed
680            let check: Option<i64> =
681                sqlx::query_scalar("SELECT id FROM profiles WHERE id=$1 FOR NO KEY UPDATE")
682                    .bind(pid)
683                    .fetch_optional(conn.as_mut())
684                    .await?;
685            if check.is_none() {
686                return Err(err_msg!(NotFound, "Session profile has been removed"));
687            }
688        }
689        Ok((pid, key))
690    } else if let Some(row) =
691        sqlx::query("SELECT id, profile_key FROM profiles WHERE name=$1 FOR NO KEY UPDATE")
692            .bind(profile.as_str())
693            .fetch_optional(conn.as_mut())
694            .await?
695    {
696        let pid = row.try_get(0)?;
697        let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
698        cache.add_profile(profile, pid, key.clone()).await;
699        Ok((pid, key))
700    } else {
701        Err(err_msg!(NotFound, "Profile not found"))
702    }
703}
704
705#[allow(clippy::too_many_arguments)]
706async fn perform_insert(
707    active: &mut DbSessionTxn<'_, Postgres>,
708    kind: EntryKind,
709    enc_category: &[u8],
710    enc_name: &[u8],
711    enc_value: &[u8],
712    enc_tags: Option<Vec<EncEntryTag>>,
713    expiry_ms: Option<i64>,
714    new_row: bool,
715) -> Result<(), Error> {
716    let row_id = if new_row {
717        trace!("Insert entry");
718        sqlx::query_scalar(INSERT_QUERY)
719            .bind(active.profile_id)
720            .bind(kind as i16)
721            .bind(enc_category)
722            .bind(enc_name)
723            .bind(enc_value)
724            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
725            .fetch_optional(active.connection_mut())
726            .await?
727            .ok_or_else(|| err_msg!(Duplicate, "Duplicate entry"))?
728    } else {
729        trace!("Update entry");
730        let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
731            .bind(active.profile_id)
732            .bind(kind as i16)
733            .bind(enc_category)
734            .bind(enc_name)
735            .bind(enc_value)
736            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
737            .fetch_one(active.connection_mut())
738            .await
739            .map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
740        sqlx::query(TAG_DELETE_QUERY)
741            .bind(row_id)
742            .execute(active.connection_mut())
743            .await
744            .map_err(err_map!(Backend, "Error removing existing entry tags"))?;
745        row_id
746    };
747    if let Some(tags) = enc_tags {
748        for tag in tags {
749            sqlx::query(TAG_INSERT_QUERY)
750                .bind(row_id)
751                .bind(&tag.name)
752                .bind(&tag.value)
753                .bind(tag.plaintext as i16)
754                .execute(active.connection_mut())
755                .await
756                .map_err(err_map!(Backend, "Error inserting entry tags"))?;
757        }
758    }
759    Ok(())
760}
761
762async fn perform_remove(
763    active: &mut DbSessionActive<'_, Postgres>,
764    kind: EntryKind,
765    enc_category: &[u8],
766    enc_name: &[u8],
767    ignore_error: bool,
768) -> Result<(), Error> {
769    trace!("Remove entry");
770    let done = sqlx::query(DELETE_QUERY)
771        .bind(active.profile_id)
772        .bind(kind as i16)
773        .bind(enc_category)
774        .bind(enc_name)
775        .execute(active.connection_mut())
776        .await
777        .map_err(err_map!(Backend, "Error removing entry"))?;
778    if done.rows_affected() == 0 && !ignore_error {
779        Err(err_msg!(NotFound, "Entry not found"))
780    } else {
781        Ok(())
782    }
783}
784
/// Stream the encrypted rows matching a scan in batches of up to `PAGE_SIZE`.
///
/// The category is encrypted and the tag filter encoded through `unblock`,
/// the query is assembled from `SCAN_QUERY` (with ` FOR NO KEY UPDATE`
/// appended when `for_update` is set), and rows are fetched through the
/// session behind `active`. Full batches are yielded as they fill up; a final
/// partial batch is yielded after the rows are exhausted. Once all rows have
/// been read, an owned session is closed without committing.
#[allow(clippy::too_many_arguments)]
fn perform_scan(
    mut active: DbSessionRef<'_, Postgres>,
    profile_id: ProfileId,
    key: Arc<ProfileKey>,
    kind: Option<EntryKind>,
    category: Option<String>,
    tag_filter: Option<TagFilter>,
    offset: Option<i64>,
    limit: Option<i64>,
    order_by: Option<OrderBy>,
    descending: bool,
    for_update: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
    try_stream! {
        let mut params = QueryParams::new();
        params.push(profile_id);
        params.push(kind.map(|k| k as i16));
        let (enc_category, tag_filter) = unblock({
            let key = key.clone();
            let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
            let params_len = params.len() + 1; // plus category
            move || {
                Result::<_, Error>::Ok((
                    enc_category
                        .map(|c| key.encrypt_entry_category(c))
                        .transpose()?,
                    encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?
                ))
            }
        }).await?;
        params.push(enc_category);
        let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;
        if for_update {
            query.push_str(" FOR NO KEY UPDATE");
        }
        let mut batch = Vec::with_capacity(PAGE_SIZE);

        let mut acquired = acquire_session(&mut active).await?;
        let mut rows = sqlx::query_with(query.as_str(), params).fetch(acquired.connection_mut());
        while let Some(row) = rows.try_next().await? {
            // column 5 is the aggregated tag string; NULL (no tags) becomes an empty buffer
            let tags = row.try_get::<Option<String>, _>(5)?.map(String::into_bytes).unwrap_or_default();
            let kind: i16 = row.try_get(1)?;
            let kind = EntryKind::try_from(kind as usize)?;
            batch.push(EncScanEntry {
                kind, category: row.try_get(2)?, name: row.try_get(3)?, value: row.try_get(4)?, tags
            });
            if batch.len() == PAGE_SIZE {
                // hand out the full page and keep `batch` (now empty) for the next one
                yield batch.split_off(0);
            }
        }
        // the row stream borrows the session; release it before closing
        drop(rows);
        if active.is_owned() {
            active.close(false).await?;
        }
        drop(active);

        if !batch.is_empty() {
            yield batch;
        }
    }
}
847
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::db_utils::replace_arg_placeholders;

    /// `$$` markers are numbered sequentially from the start index, while an
    /// explicit `$N` is shifted by the start index minus one.
    #[test]
    fn postgres_simple_and_convert_args_works() {
        let template = "This $$ is $10 a $$ string!";
        let converted = replace_arg_placeholders::<PostgresBackend>(template, 3);
        assert_eq!(&converted, "This $3 is $12 a $5 string!");
    }
}