1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use async_stream::try_stream;
6
7use futures_lite::{
8 pin,
9 stream::{Stream, StreamExt},
10};
11
12use sqlx::{
13 pool::PoolConnection,
14 postgres::{PgPool, Postgres},
15 Acquire, Row,
16};
17
18use super::{
19 db_utils::{
20 decode_tags, decrypt_scan_batch, encode_profile_key, encode_tag_filter, expiry_timestamp,
21 extend_query, prepare_tags, random_profile_name, replace_arg_placeholders, DbSession,
22 DbSessionActive, DbSessionRef, DbSessionTxn, EncScanEntry, ExtDatabase, QueryParams,
23 QueryPrepare, PAGE_SIZE,
24 },
25 Backend, BackendSession,
26};
27use crate::{
28 backend::OrderBy,
29 entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
30 error::Error,
31 future::{unblock, BoxFuture},
32 protect::{EntryEncryptor, KeyCache, PassKey, ProfileId, ProfileKey, StoreKeyMethod},
33};
34
35mod provision;
36pub use self::provision::PostgresStoreOptions;
37
38#[cfg(any(test, feature = "pg_test"))]
39mod test_db;
40#[cfg(any(test, feature = "pg_test"))]
41pub use self::test_db::TestDB;
42
// Fetch a single config value by name.
const CONFIG_FETCH_QUERY: &str = "SELECT value FROM config WHERE name = $1";
// Insert or overwrite a config value (upsert keyed on the name column).
const CONFIG_UPDATE_QUERY: &str = "INSERT INTO config (name, value) VALUES ($1, $2)
    ON CONFLICT(name) DO UPDATE SET value = excluded.value";
// Count non-expired items for a profile, with optional kind/category filters
// ($2/$3 may be NULL to match all).
const COUNT_QUERY: &str = "SELECT COUNT(*) FROM items i
    WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
// Delete one item identified by profile, kind, category and name.
const DELETE_QUERY: &str = "DELETE FROM items
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4";
// Fetch one non-expired item; its tags are aggregated into a single string
// of 'plaintext:hex(name):hex(value)' entries joined by commas.
const FETCH_QUERY: &str = "SELECT id, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
// Same as FETCH_QUERY, but locks the row (FOR NO KEY UPDATE) for use
// inside a transaction.
const FETCH_QUERY_UPDATE: &str = "SELECT id, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP) FOR NO KEY UPDATE";
// Insert a new item; returns no id when a conflicting row already exists.
const INSERT_QUERY: &str = "INSERT INTO items (profile_id, kind, category, name, value, expiry)
    VALUES ($1, $2, $3, $4, $5, $6)
    ON CONFLICT DO NOTHING RETURNING id";
// Replace the value and expiry of an existing item, returning its id.
const UPDATE_QUERY: &str = "UPDATE items SET value=$5, expiry=$6
    WHERE profile_id=$1 AND kind=$2 AND category=$3 AND name=$4
    RETURNING id";
// Stream non-expired items for a profile, with optional kind/category
// filters; tags are aggregated as in FETCH_QUERY.
const SCAN_QUERY: &str = "SELECT id, kind, category, name, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
// Remove all items for a profile, with optional kind/category filters.
const DELETE_ALL_QUERY: &str = "DELETE FROM items i
    WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)";
// Attach one tag row to an item.
const TAG_INSERT_QUERY: &str = "INSERT INTO items_tags
    (item_id, name, value, plaintext) VALUES ($1, $2, $3, $4)";
// Remove all tag rows attached to an item.
const TAG_DELETE_QUERY: &str = "DELETE FROM items_tags
    WHERE item_id=$1";
89
/// A PostgreSQL-backed store, wrapping a sqlx connection pool.
pub struct PostgresBackend {
    // Shared connection pool; cloned into each session created by this backend
    conn_pool: PgPool,
    // Profile name used when a session does not specify one
    active_profile: String,
    // Cache of decrypted profile keys, shared with spawned sessions
    key_cache: Arc<KeyCache>,
    // Host portion of the connection target (reported in Debug output)
    host: String,
    // Database name (reported in Debug output)
    name: String,
}
98
99impl PostgresBackend {
100 pub(crate) fn new(
101 conn_pool: PgPool,
102 active_profile: String,
103 key_cache: KeyCache,
104 host: String,
105 name: String,
106 ) -> Self {
107 Self {
108 conn_pool,
109 active_profile,
110 key_cache: Arc::new(key_cache),
111 host,
112 name,
113 }
114 }
115}
116
impl Backend for PostgresBackend {
    type Session = DbSession<Postgres>;

    /// Create a new profile with a freshly generated profile key.
    ///
    /// A random name is generated when `name` is not provided. Returns the
    /// profile name, or a `Duplicate` error if the name is already in use.
    fn create_profile(&self, name: Option<String>) -> BoxFuture<'_, Result<String, Error>> {
        let name = name.unwrap_or_else(random_profile_name);
        Box::pin(async move {
            let store_key = self.key_cache.store_key.clone();
            // Key generation and encoding are CPU-bound: run them off the
            // async executor via `unblock`.
            let (profile_key, enc_key) = unblock(move || {
                let profile_key = ProfileKey::new()?;
                let enc_key = encode_profile_key(&profile_key, &store_key)?;
                Result::<_, Error>::Ok((profile_key, enc_key))
            })
            .await?;
            let mut conn = self.conn_pool.acquire().await?;
            // ON CONFLICT DO NOTHING: no returned id means the name is taken
            let res = sqlx::query_scalar(
                "INSERT INTO profiles (name, profile_key) VALUES ($1, $2)
                ON CONFLICT DO NOTHING RETURNING id",
            )
            .bind(&name)
            .bind(enc_key)
            .fetch_optional(conn.as_mut())
            .await?;
            conn.return_to_pool().await;
            if let Some(pid) = res {
                self.key_cache
                    .add_profile(name.clone(), pid, Arc::new(profile_key))
                    .await;
                Ok(name)
            } else {
                Err(err_msg!(Duplicate, "Duplicate profile name"))
            }
        })
    }

    /// Return the profile name used when sessions do not specify one.
    fn get_active_profile(&self) -> String {
        self.active_profile.clone()
    }

    /// Fetch the default profile name from the config table.
    fn get_default_profile(&self) -> BoxFuture<'_, Result<String, Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            // The stored value may be NULL; treated as an empty name below
            let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
                .bind("default_profile")
                .fetch_one(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error fetching default profile name"))?;
            conn.return_to_pool().await;
            Ok(profile.unwrap_or_default())
        })
    }

    /// Store `profile` as the default profile name in the config table.
    fn set_default_profile(&self, profile: String) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            sqlx::query(CONFIG_UPDATE_QUERY)
                .bind("default_profile")
                .bind(profile)
                .execute(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error setting default profile name"))?;
            conn.return_to_pool().await;
            Ok(())
        })
    }

    /// List the names of all profiles in this store.
    fn list_profiles(&self) -> BoxFuture<'_, Result<Vec<String>, Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            let rows = sqlx::query("SELECT name FROM profiles")
                .fetch_all(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error fetching profile list"))?;
            // flat_map over try_get keeps only rows whose name column decodes
            let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
            conn.return_to_pool().await;
            Ok(names)
        })
    }

    /// Remove a profile and evict it from the key cache.
    ///
    /// Returns `false` when no profile with the given name existed.
    fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            let ret = sqlx::query("DELETE FROM profiles WHERE name=$1")
                .bind(&name)
                .execute(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error removing profile"))?
                .rows_affected()
                != 0;
            conn.return_to_pool().await;
            self.key_cache.clear_profile(&name).await;
            Ok(ret)
        })
    }

    /// Rename a profile, evicting the old name from the key cache.
    ///
    /// Returns `false` when no profile with the source name existed.
    fn rename_profile(
        &self,
        from_name: String,
        to_name: String,
    ) -> BoxFuture<'_, Result<bool, Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            let ret = sqlx::query("UPDATE profiles SET name=$1 WHERE name=$2")
                .bind(&to_name)
                .bind(&from_name)
                .execute(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error renaming profile"))?
                .rows_affected()
                != 0;
            conn.return_to_pool().await;
            self.key_cache.clear_profile(&from_name).await;
            Ok(ret)
        })
    }

    /// Re-encrypt every profile key under a new store key and replace the
    /// stored store-key reference, all inside a single transaction.
    fn rekey(
        &mut self,
        method: StoreKeyMethod,
        pass_key: PassKey<'_>,
    ) -> BoxFuture<'_, Result<(), Error>> {
        let pass_key = pass_key.into_owned();
        Box::pin(async move {
            let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
            let store_key = Arc::new(store_key);
            let mut conn = self.conn_pool.acquire().await?;
            let mut txn = conn.begin().await?;
            let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
            // Collect the re-encoded keys first: the row stream borrows the
            // transaction, so the UPDATEs are only issued after it is dropped.
            let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
            while let Some(row) = rows.next().await {
                let row = row?;
                let pid = row.try_get(0)?;
                let enc_key = row.try_get(1)?;
                let profile_key = self.key_cache.load_key(enc_key).await?;
                // Re-encode (CPU-bound) on a blocking thread
                let upd_key = unblock({
                    let store_key = store_key.clone();
                    move || encode_profile_key(&profile_key, &store_key)
                })
                .await?;
                upd_keys.insert(pid, upd_key);
            }
            drop(rows);
            for (pid, key) in upd_keys {
                if sqlx::query("UPDATE profiles SET profile_key=$1 WHERE id=$2")
                    .bind(key)
                    .bind(pid)
                    .execute(txn.as_mut())
                    .await?
                    .rows_affected()
                    != 1
                {
                    return Err(err_msg!(Backend, "Error updating profile key"));
                }
            }
            if sqlx::query("UPDATE config SET value=$1 WHERE name='key'")
                .bind(store_key_ref.into_uri())
                .execute(txn.as_mut())
                .await?
                .rows_affected()
                != 1
            {
                return Err(err_msg!(Backend, "Error updating store key"));
            }
            txn.commit().await?;
            conn.return_to_pool().await;
            // Previously cached profile keys were encoded under the old store
            // key: reset the cache with the new store key.
            self.key_cache = Arc::new(KeyCache::new(store_key));
            Ok(())
        })
    }

    /// Start a streaming scan over matching entries.
    ///
    /// Encrypted row batches are fetched from the database and decrypted on
    /// a blocking thread as the resulting stream is polled.
    fn scan(
        &self,
        profile: Option<String>,
        kind: Option<EntryKind>,
        category: Option<String>,
        tag_filter: Option<TagFilter>,
        offset: Option<i64>,
        limit: Option<i64>,
        order_by: Option<OrderBy>,
        descending: bool,
    ) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
        Box::pin(async move {
            // A dedicated, owned (non-transactional) session backs the scan
            let session = self.session(profile, false)?;
            let mut active = session.owned_ref();
            let (profile_id, key) = acquire_key(&mut active).await?;
            let scan = perform_scan(
                active,
                profile_id,
                key.clone(),
                kind,
                category.clone(),
                tag_filter,
                offset,
                limit,
                order_by,
                descending,
                false,
            );
            // Decrypt each fetched batch off the executor as it is consumed
            let stream = scan.then(move |enc_rows| {
                let category = category.clone();
                let key = key.clone();
                unblock(move || decrypt_scan_batch(category, enc_rows?, &key))
            });
            Ok(Scan::new(stream, PAGE_SIZE))
        })
    }

    /// Open a new session (or transaction) against this store.
    ///
    /// Falls back to the active profile when none is specified.
    fn session(&self, profile: Option<String>, transaction: bool) -> Result<Self::Session, Error> {
        Ok(DbSession::new(
            self.conn_pool.clone(),
            self.key_cache.clone(),
            profile.unwrap_or_else(|| self.active_profile.clone()),
            transaction,
        ))
    }

    /// Close the backend by shutting down the connection pool.
    fn close(&self) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            self.conn_pool.close().await;
            Ok(())
        })
    }
}
339
340impl Debug for PostgresBackend {
341 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
342 f.debug_struct("PostgresStore")
343 .field("active_profile", &self.active_profile)
344 .field("host", &self.host)
345 .field("name", &self.name)
346 .finish()
347 }
348}
349
impl BackendSession for DbSession<Postgres> {
    /// Count the non-expired entries matching the given filters.
    fn count<'q>(
        &'q mut self,
        kind: Option<EntryKind>,
        category: Option<&'q str>,
        tag_filter: Option<TagFilter>,
    ) -> BoxFuture<'q, Result<i64, Error>> {
        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));

        Box::pin(async move {
            let (profile_id, key) = acquire_key(&mut *self).await?;
            let mut params = QueryParams::new();
            params.push(profile_id);
            params.push(kind.map(|k| k as i16));
            // Encrypt the category and encode the tag filter off the async
            // executor; `params_len` accounts for the category pushed below
            // so the filter's placeholders start after it.
            let (enc_category, tag_filter) = unblock({
                let params_len = params.len() + 1;
                move || {
                    Result::<_, Error>::Ok((
                        enc_category
                            .map(|c| key.encrypt_entry_category(c))
                            .transpose()?,
                        encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?,
                    ))
                }
            })
            .await?;
            params.push(enc_category);
            let query = extend_query::<PostgresBackend>(
                COUNT_QUERY,
                &mut params,
                tag_filter,
                None,
                None,
                None,
                false,
            )?;
            let mut active = acquire_session(&mut *self).await?;
            let count = sqlx::query_scalar_with(query.as_str(), params)
                .fetch_one(active.connection_mut())
                .await
                .map_err(err_map!(Backend, "Error performing count query"))?;
            Ok(count)
        })
    }

    /// Fetch a single entry by kind, category and name.
    ///
    /// When `for_update` is set and the session is a transaction, the row
    /// is locked via the FOR NO KEY UPDATE query variant.
    fn fetch(
        &mut self,
        kind: EntryKind,
        category: &str,
        name: &str,
        for_update: bool,
    ) -> BoxFuture<'_, Result<Option<Entry>, Error>> {
        let category = category.to_string();
        let name = name.to_string();

        Box::pin(async move {
            let (profile_id, key) = acquire_key(&mut *self).await?;
            // Encrypt the lookup terms on a blocking thread
            let (enc_category, enc_name) = unblock({
                let key = key.clone();
                let category = ProfileKey::prepare_input(category.as_bytes());
                let name = ProfileKey::prepare_input(name.as_bytes());
                move || {
                    Result::<_, Error>::Ok((
                        key.encrypt_entry_category(category)?,
                        key.encrypt_entry_name(name)?,
                    ))
                }
            })
            .await?;
            let mut active = acquire_session(&mut *self).await?;
            if let Some(row) = sqlx::query(if for_update && active.in_transaction() {
                FETCH_QUERY_UPDATE
            } else {
                FETCH_QUERY
            })
            .bind(profile_id)
            .bind(kind as i16)
            .bind(enc_category)
            .bind(enc_name)
            .fetch_optional(active.connection_mut())
            .await
            .map_err(err_map!(Backend, "Error performing fetch query"))?
            {
                // Columns: 0 = id, 1 = value, 2 = aggregated tag string (NULLable)
                let value = row.try_get(1)?;
                let tags = row.try_get::<Option<String>, _>(2)?.map(String::into_bytes);
                // Decrypt the value and tags on a blocking thread
                let (category, name, value, tags) = unblock(move || {
                    let value = key.decrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
                    let tags = if let Some(enc_tags) = tags {
                        key.decrypt_entry_tags(
                            decode_tags(enc_tags)
                                .map_err(|_| err_msg!(Unexpected, "Error decoding tags"))?,
                        )?
                    } else {
                        Vec::new()
                    };
                    Result::<_, Error>::Ok((category, name, value, tags))
                })
                .await?;
                Ok(Some(Entry::new(kind, category, name, value, tags)))
            } else {
                Ok(None)
            }
        })
    }

    /// Fetch all matching entries, decrypted as a single batch.
    fn fetch_all<'q>(
        &'q mut self,
        kind: Option<EntryKind>,
        category: Option<&'q str>,
        tag_filter: Option<TagFilter>,
        limit: Option<i64>,
        order_by: Option<OrderBy>,
        descending: bool,
        for_update: bool,
    ) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
        let category = category.map(|c| c.to_string());
        Box::pin(async move {
            // Row locking is only requested when inside a transaction
            let for_update = for_update && self.in_transaction();
            let mut active = self.borrow_mut();
            let (profile_id, key) = acquire_key(&mut active).await?;
            let scan = perform_scan(
                active,
                profile_id,
                key.clone(),
                kind,
                category.clone(),
                tag_filter,
                None,
                limit,
                order_by,
                descending,
                for_update,
            );
            pin!(scan);
            // Drain the stream, then decrypt all rows in one blocking call
            let mut enc_rows = vec![];
            while let Some(rows) = scan.try_next().await? {
                enc_rows.extend(rows)
            }
            unblock(move || decrypt_scan_batch(category, enc_rows, &key)).await
        })
    }

    /// Remove all entries matching the given filters, returning the number
    /// of deleted rows.
    fn remove_all<'q>(
        &'q mut self,
        kind: Option<EntryKind>,
        category: Option<&'q str>,
        tag_filter: Option<TagFilter>,
    ) -> BoxFuture<'q, Result<i64, Error>> {
        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));

        Box::pin(async move {
            let (profile_id, key) = acquire_key(&mut *self).await?;
            let mut params = QueryParams::new();
            params.push(profile_id);
            params.push(kind.map(|k| k as i16));
            // Same off-executor encryption/encoding pattern as `count`
            let (enc_category, tag_filter) = unblock({
                let params_len = params.len() + 1;
                move || {
                    Result::<_, Error>::Ok((
                        enc_category
                            .map(|c| key.encrypt_entry_category(c))
                            .transpose()?,
                        encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?,
                    ))
                }
            })
            .await?;
            params.push(enc_category);
            let query = extend_query::<PostgresBackend>(
                DELETE_ALL_QUERY,
                &mut params,
                tag_filter,
                None,
                None,
                None,
                false,
            )?;

            let mut active = acquire_session(&mut *self).await?;
            let removed = sqlx::query_with(query.as_str(), params)
                .execute(active.connection_mut())
                .await?
                .rows_affected();
            Ok(removed as i64)
        })
    }

    /// Insert, replace or remove a single entry.
    ///
    /// Insert and replace run inside a transaction so the item row and its
    /// tag rows are written atomically; remove deletes the matching item.
    fn update<'q>(
        &'q mut self,
        kind: EntryKind,
        operation: EntryOperation,
        category: &'q str,
        name: &'q str,
        value: Option<&'q [u8]>,
        tags: Option<&'q [EntryTag]>,
        expiry_ms: Option<i64>,
    ) -> BoxFuture<'q, Result<(), Error>> {
        let category = ProfileKey::prepare_input(category.as_bytes());
        let name = ProfileKey::prepare_input(name.as_bytes());

        match operation {
            op @ EntryOperation::Insert | op @ EntryOperation::Replace => {
                let value = ProfileKey::prepare_input(value.unwrap_or_default());
                let tags = tags.map(prepare_tags);
                Box::pin(async move {
                    let (_, key) = acquire_key(&mut *self).await?;
                    // Encrypt all fields on a blocking thread
                    let (enc_category, enc_name, enc_value, enc_tags) = unblock(move || {
                        let enc_value =
                            key.encrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
                        Result::<_, Error>::Ok((
                            key.encrypt_entry_category(category)?,
                            key.encrypt_entry_name(name)?,
                            enc_value,
                            tags.transpose()?
                                .map(|t| key.encrypt_entry_tags(t))
                                .transpose()?,
                        ))
                    })
                    .await?;
                    let mut active = acquire_session(&mut *self).await?;
                    // Item and tag writes must be atomic
                    let mut txn = active.as_transaction().await?;
                    perform_insert(
                        &mut txn,
                        kind,
                        &enc_category,
                        &enc_name,
                        &enc_value,
                        enc_tags,
                        expiry_ms,
                        op == EntryOperation::Insert,
                    )
                    .await?;
                    txn.commit().await?;
                    Ok(())
                })
            }

            EntryOperation::Remove => Box::pin(async move {
                let (_, key) = acquire_key(&mut *self).await?;
                let (enc_category, enc_name) = unblock(move || {
                    Result::<_, Error>::Ok((
                        key.encrypt_entry_category(category)?,
                        key.encrypt_entry_name(name)?,
                    ))
                })
                .await?;
                let mut active = acquire_session(&mut *self).await?;
                perform_remove(&mut active, kind, &enc_category, &enc_name, false).await
            }),
        }
    }

    /// Verify that the session's connection and profile are still usable.
    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            let mut sess = acquire_session(&mut *self).await?;
            if sess.in_transaction() {
                // Inside a transaction a connection-level ping suffices
                sqlx::Connection::ping(sess.connection_mut())
                    .await
                    .map_err(err_map!(Backend, "Error pinging session"))?;
            } else {
                // Otherwise also confirm the profile row still exists
                let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
                    .bind(sess.profile_id)
                    .fetch_one(sess.connection_mut())
                    .await
                    .map_err(err_map!(Backend, "Error pinging session"))?;
                if count == 0 {
                    return Err(err_msg!(NotFound, "Session profile has been removed"));
                }
            }
            Ok(())
        })
    }

    /// Close the session, delegating to `DbSession::close` with the
    /// commit flag.
    fn close(&mut self, commit: bool) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(self.close(commit))
    }
}
628
// Marker impl opting the Postgres driver into the shared `db_utils` extensions.
impl ExtDatabase for Postgres {}
630
631impl QueryPrepare for PostgresBackend {
632 type DB = Postgres;
633
634 fn placeholder(index: i64) -> String {
635 format!("${}", index)
636 }
637
638 fn limit_query<'q>(
639 mut query: String,
640 args: &mut QueryParams<'q, Self::DB>,
641 offset: Option<i64>,
642 limit: Option<i64>,
643 ) -> String
644 where
645 i64: for<'e> sqlx::Encode<'e, Self::DB> + sqlx::Type<Self::DB>,
646 {
647 if offset.is_some() || limit.is_some() {
648 let last_idx = (args.len() + 1) as i64;
649 args.push(limit);
650 args.push(offset.unwrap_or(0));
651 let limit = replace_arg_placeholders::<Self>(" LIMIT $$ OFFSET $$", last_idx);
652 query.push_str(&limit);
653 }
654 query
655 }
656}
657
658async fn acquire_key(
659 session: &mut DbSession<Postgres>,
660) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
661 acquire_session(session).await?;
662 Ok(session.profile_and_key().unwrap())
663}
664
/// Activate the session if necessary, resolving its profile id and key
/// through `resolve_profile_key`, and return the active session handle.
async fn acquire_session(
    session: &'_ mut DbSession<Postgres>,
) -> Result<DbSessionActive<'_, Postgres>, Error> {
    session.make_active(&resolve_profile_key).await
}
670
/// Resolve the id and decrypted key for a named profile, consulting the
/// key cache first and caching newly loaded keys.
///
/// When called for a transaction (`in_txn`), the profile row is re-checked
/// (and locked with FOR NO KEY UPDATE) even on a cache hit, so that a
/// concurrently removed profile is detected.
async fn resolve_profile_key(
    conn: &mut PoolConnection<Postgres>,
    cache: Arc<KeyCache>,
    profile: String,
    in_txn: bool,
) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
    if let Some((pid, key)) = cache.get_profile(profile.as_str()).await {
        if in_txn {
            let check: Option<i64> =
                sqlx::query_scalar("SELECT id FROM profiles WHERE id=$1 FOR NO KEY UPDATE")
                    .bind(pid)
                    .fetch_optional(conn.as_mut())
                    .await?;
            if check.is_none() {
                return Err(err_msg!(NotFound, "Session profile has been removed"));
            }
        }
        Ok((pid, key))
    } else if let Some(row) =
        sqlx::query("SELECT id, profile_key FROM profiles WHERE name=$1 FOR NO KEY UPDATE")
            .bind(profile.as_str())
            .fetch_optional(conn.as_mut())
            .await?
    {
        let pid = row.try_get(0)?;
        // Decrypt the profile key and cache it for subsequent sessions
        let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
        cache.add_profile(profile, pid, key.clone()).await;
        Ok((pid, key))
    } else {
        Err(err_msg!(NotFound, "Profile not found"))
    }
}
704
/// Insert a new item row, or update an existing one, and (re)write its tags.
///
/// With `new_row` set, a conflicting row yields a `Duplicate` error;
/// otherwise the existing row's value and expiry are replaced and all of
/// its previous tags are deleted before the new set is inserted. Must run
/// inside the caller's transaction so row and tags are written atomically.
#[allow(clippy::too_many_arguments)]
async fn perform_insert(
    active: &mut DbSessionTxn<'_, Postgres>,
    kind: EntryKind,
    enc_category: &[u8],
    enc_name: &[u8],
    enc_value: &[u8],
    enc_tags: Option<Vec<EncEntryTag>>,
    expiry_ms: Option<i64>,
    new_row: bool,
) -> Result<(), Error> {
    let row_id = if new_row {
        trace!("Insert entry");
        // ON CONFLICT DO NOTHING: no returned id means a duplicate entry
        sqlx::query_scalar(INSERT_QUERY)
            .bind(active.profile_id)
            .bind(kind as i16)
            .bind(enc_category)
            .bind(enc_name)
            .bind(enc_value)
            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
            .fetch_optional(active.connection_mut())
            .await?
            .ok_or_else(|| err_msg!(Duplicate, "Duplicate entry"))?
    } else {
        trace!("Update entry");
        let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
            .bind(active.profile_id)
            .bind(kind as i16)
            .bind(enc_category)
            .bind(enc_name)
            .bind(enc_value)
            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
            .fetch_one(active.connection_mut())
            .await
            .map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
        // Clear any existing tags before inserting the replacement set
        sqlx::query(TAG_DELETE_QUERY)
            .bind(row_id)
            .execute(active.connection_mut())
            .await
            .map_err(err_map!(Backend, "Error removing existing entry tags"))?;
        row_id
    };
    if let Some(tags) = enc_tags {
        for tag in tags {
            sqlx::query(TAG_INSERT_QUERY)
                .bind(row_id)
                .bind(&tag.name)
                .bind(&tag.value)
                .bind(tag.plaintext as i16)
                .execute(active.connection_mut())
                .await
                .map_err(err_map!(Backend, "Error inserting entry tags"))?;
        }
    }
    Ok(())
}
761
762async fn perform_remove(
763 active: &mut DbSessionActive<'_, Postgres>,
764 kind: EntryKind,
765 enc_category: &[u8],
766 enc_name: &[u8],
767 ignore_error: bool,
768) -> Result<(), Error> {
769 trace!("Remove entry");
770 let done = sqlx::query(DELETE_QUERY)
771 .bind(active.profile_id)
772 .bind(kind as i16)
773 .bind(enc_category)
774 .bind(enc_name)
775 .execute(active.connection_mut())
776 .await
777 .map_err(err_map!(Backend, "Error removing entry"))?;
778 if done.rows_affected() == 0 && !ignore_error {
779 Err(err_msg!(NotFound, "Entry not found"))
780 } else {
781 Ok(())
782 }
783}
784
/// Stream batches of encrypted, non-expired rows matching the filters.
///
/// Rows are yielded in batches of up to `PAGE_SIZE` entries. When the
/// session reference is owned (a standalone scan), the session is closed
/// after the query completes, before the final batch is yielded.
#[allow(clippy::too_many_arguments)]
fn perform_scan(
    mut active: DbSessionRef<'_, Postgres>,
    profile_id: ProfileId,
    key: Arc<ProfileKey>,
    kind: Option<EntryKind>,
    category: Option<String>,
    tag_filter: Option<TagFilter>,
    offset: Option<i64>,
    limit: Option<i64>,
    order_by: Option<OrderBy>,
    descending: bool,
    for_update: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
    try_stream! {
        let mut params = QueryParams::new();
        params.push(profile_id);
        params.push(kind.map(|k| k as i16));
        // Encrypt the category and encode the tag filter off the executor;
        // `params_len` accounts for the category param pushed below.
        let (enc_category, tag_filter) = unblock({
            let key = key.clone();
            let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
            let params_len = params.len() + 1;
            move || {
                Result::<_, Error>::Ok((
                    enc_category
                        .map(|c| key.encrypt_entry_category(c))
                        .transpose()?,
                    encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?
                ))
            }
        }).await?;
        params.push(enc_category);
        let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;
        if for_update {
            // Lock the selected rows for the enclosing transaction
            query.push_str(" FOR NO KEY UPDATE");
        }
        let mut batch = Vec::with_capacity(PAGE_SIZE);

        let mut acquired = acquire_session(&mut active).await?;
        let mut rows = sqlx::query_with(query.as_str(), params).fetch(acquired.connection_mut());
        while let Some(row) = rows.try_next().await? {
            // Columns: 0 = id, 1 = kind, 2 = category, 3 = name,
            // 4 = value, 5 = aggregated tag string (NULLable)
            let tags = row.try_get::<Option<String>, _>(5)?.map(String::into_bytes).unwrap_or_default();
            let kind: i16 = row.try_get(1)?;
            let kind = EntryKind::try_from(kind as usize)?;
            batch.push(EncScanEntry {
                kind, category: row.try_get(2)?, name: row.try_get(3)?, value: row.try_get(4)?, tags
            });
            if batch.len() == PAGE_SIZE {
                // Hand off the accumulated batch, leaving `batch` empty
                yield batch.split_off(0);
            }
        }
        drop(rows);
        // Release an owned session before the final yield so the connection
        // is not held while the consumer processes the last batch
        if active.is_owned() {
            active.close(false).await?;
        }
        drop(active);

        if !batch.is_empty() {
            yield batch;
        }
    }
}
847
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::db_utils::replace_arg_placeholders;

    /// Placeholder substitution: `$$` markers receive sequential indices
    /// starting from the given offset, and pre-existing `$N` references are
    /// remapped relative to it.
    #[test]
    fn postgres_simple_and_convert_args_works() {
        let converted =
            replace_arg_placeholders::<PostgresBackend>("This $$ is $10 a $$ string!", 3);
        assert_eq!(converted, "This $3 is $12 a $5 string!");
    }
}