1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use async_stream::try_stream;
6
7use futures_lite::{
8 pin,
9 stream::{Stream, StreamExt},
10};
11
12use sqlx::{
13 pool::PoolConnection,
14 postgres::{PgPool, Postgres},
15 Acquire, Row,
16};
17
18use super::{
19 db_utils::{
20 decode_tags, decrypt_scan_batch, encode_profile_key, encode_tag_filter, expiry_timestamp,
21 extend_query, prepare_tags, random_profile_name, replace_arg_placeholders, DbSession,
22 DbSessionActive, DbSessionRef, DbSessionTxn, EncScanEntry, ExtDatabase, QueryParams,
23 QueryPrepare, PAGE_SIZE,
24 },
25 Backend, BackendSession,
26};
27use crate::{
28 backend::OrderBy,
29 entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
30 error::Error,
31 future::{unblock, BoxFuture},
32 protect::{EntryEncryptor, KeyCache, PassKey, ProfileId, ProfileKey, StoreKeyMethod},
33};
34
35mod provision;
36pub use self::provision::PostgresStoreOptions;
37
38#[cfg(any(test, feature = "pg_test"))]
39mod test_db;
40#[cfg(any(test, feature = "pg_test"))]
41pub use self::test_db::TestDB;
42
/// Fetch a single config value by name.
const CONFIG_FETCH_QUERY: &str = "SELECT value FROM config WHERE name = $1";
/// Insert or replace a config value (upsert keyed on name).
const CONFIG_UPDATE_QUERY: &str = "INSERT INTO config (name, value) VALUES ($1, $2)
    ON CONFLICT(name) DO UPDATE SET value = excluded.value";
/// Count non-expired items for a profile; kind and category filters are
/// optional (a NULL bind value disables the corresponding filter).
const COUNT_QUERY: &str = "SELECT COUNT(*) FROM items i
    WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
/// Delete one item addressed by (profile, kind, category, name).
const DELETE_QUERY: &str = "DELETE FROM items
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4";
/// Fetch one non-expired item with its tags aggregated into a single
/// comma-separated string of plaintext:hex(name):hex(value) triples.
const FETCH_QUERY: &str = "SELECT id, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
/// Same as FETCH_QUERY, but locks the row (FOR NO KEY UPDATE) for use
/// inside a transaction.
const FETCH_QUERY_UPDATE: &str = "SELECT id, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i
    WHERE profile_id = $1 AND kind = $2 AND category = $3 AND name = $4
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP) FOR NO KEY UPDATE";
/// Insert a new item row; with ON CONFLICT DO NOTHING a duplicate key
/// produces no returned id (detected by the caller as a Duplicate error).
const INSERT_QUERY: &str = "INSERT INTO items (profile_id, kind, category, name, value, expiry)
    VALUES ($1, $2, $3, $4, $5, $6)
    ON CONFLICT DO NOTHING RETURNING id";
/// Update the value and expiry of an existing item, returning its id.
const UPDATE_QUERY: &str = "UPDATE items SET value=$5, expiry=$6
    WHERE profile_id=$1 AND kind=$2 AND category=$3 AND name=$4
    RETURNING id";
/// Scan non-expired items (with aggregated tags) for a profile, with
/// optional kind/category filters.
const SCAN_QUERY: &str = "SELECT id, kind, category, name, value,
    (SELECT ARRAY_TO_STRING(ARRAY_AGG(it.plaintext || ':'
        || ENCODE(it.name, 'hex') || ':' || ENCODE(it.value, 'hex')), ',')
        FROM items_tags it WHERE it.item_id = i.id) tags
    FROM items i WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)
    AND (expiry IS NULL OR expiry > CURRENT_TIMESTAMP)";
/// Delete all items for a profile matching the optional kind/category.
const DELETE_ALL_QUERY: &str = "DELETE FROM items i
    WHERE profile_id = $1
    AND (kind = $2 OR $2 IS NULL)
    AND (category = $3 OR $3 IS NULL)";
/// Insert a single tag row attached to an item.
const TAG_INSERT_QUERY: &str = "INSERT INTO items_tags
    (item_id, name, value, plaintext) VALUES ($1, $2, $3, $4)";
/// Delete all tag rows for an item (used before re-inserting on update).
const TAG_DELETE_QUERY: &str = "DELETE FROM items_tags
    WHERE item_id=$1";
89
/// A PostgreSQL database store backend.
pub struct PostgresBackend {
    /// Shared sqlx connection pool used by all sessions of this store.
    conn_pool: PgPool,
    /// Profile name used when a session does not specify one.
    active_profile: String,
    /// Cache holding the store key and decrypted per-profile keys.
    key_cache: Arc<KeyCache>,
    /// Host portion of the connection target (reported in Debug output).
    host: String,
    /// Database name (reported in Debug output).
    name: String,
}
98
impl PostgresBackend {
    /// Construct a new Postgres backend from an established connection
    /// pool and a pre-loaded key cache.
    pub(crate) fn new(
        conn_pool: PgPool,
        active_profile: String,
        key_cache: KeyCache,
        host: String,
        name: String,
    ) -> Self {
        Self {
            conn_pool,
            active_profile,
            // Wrapped in Arc so sessions can share the cache cheaply
            key_cache: Arc::new(key_cache),
            host,
            name,
        }
    }
}
116
impl Backend for PostgresBackend {
    type Session = DbSession<Postgres>;

    /// Create a new profile row with a freshly generated profile key
    /// encrypted under the store key. Returns the profile name, or a
    /// `Duplicate` error if the name is already taken.
    fn create_profile(&self, name: Option<String>) -> BoxFuture<'_, Result<String, Error>> {
        // Generate a random name when none was supplied
        let name = name.unwrap_or_else(random_profile_name);
        Box::pin(async move {
            let store_key = self.key_cache.store_key.clone();
            // Key generation and encryption are CPU-bound: run off the executor
            let (profile_key, enc_key) = unblock(move || {
                let profile_key = ProfileKey::new()?;
                let enc_key = encode_profile_key(&profile_key, &store_key)?;
                Result::<_, Error>::Ok((profile_key, enc_key))
            })
            .await?;
            let mut conn = self.conn_pool.acquire().await?;
            // ON CONFLICT DO NOTHING: no id is returned when the name exists
            let res = sqlx::query_scalar(
                "INSERT INTO profiles (name, profile_key) VALUES ($1, $2)
                ON CONFLICT DO NOTHING RETURNING id",
            )
            .bind(&name)
            .bind(enc_key)
            .fetch_optional(conn.as_mut())
            .await?;
            conn.return_to_pool().await;
            if let Some(pid) = res {
                // Prime the cache so new sessions avoid a key fetch/decrypt
                self.key_cache
                    .add_profile(name.clone(), pid, Arc::new(profile_key))
                    .await;
                Ok(name)
            } else {
                Err(err_msg!(Duplicate, "Duplicate profile name"))
            }
        })
    }

    /// Return the profile name used when a session does not specify one.
    fn get_active_profile(&self) -> String {
        self.active_profile.clone()
    }

    /// Fetch the default profile name from the config table.
    ///
    /// NOTE(review): `fetch_one` errors if the `default_profile` row is
    /// missing entirely; only a NULL value produces the empty-string
    /// default — confirm the row is always created at provision time.
    fn get_default_profile(&self) -> BoxFuture<'_, Result<String, Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            let profile: Option<String> = sqlx::query_scalar(CONFIG_FETCH_QUERY)
                .bind("default_profile")
                .fetch_one(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error fetching default profile name"))?;
            conn.return_to_pool().await;
            Ok(profile.unwrap_or_default())
        })
    }

    /// Store the default profile name in the config table (upsert).
    fn set_default_profile(&self, profile: String) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            sqlx::query(CONFIG_UPDATE_QUERY)
                .bind("default_profile")
                .bind(profile)
                .execute(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error setting default profile name"))?;
            conn.return_to_pool().await;
            Ok(())
        })
    }

    /// List the names of all profiles in the store.
    fn list_profiles(&self) -> BoxFuture<'_, Result<Vec<String>, Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            let rows = sqlx::query("SELECT name FROM profiles")
                .fetch_all(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error fetching profile list"))?;
            // flat_map silently skips rows whose name column fails to decode
            let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
            conn.return_to_pool().await;
            Ok(names)
        })
    }

    /// Remove a profile by name, returning whether a row was deleted.
    fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
        Box::pin(async move {
            let mut conn = self.conn_pool.acquire().await?;
            let ret = sqlx::query("DELETE FROM profiles WHERE name=$1")
                .bind(&name)
                .execute(conn.as_mut())
                .await
                .map_err(err_map!(Backend, "Error removing profile"))?
                .rows_affected()
                != 0;
            conn.return_to_pool().await;
            Ok(ret)
        })
    }

    /// Re-encrypt every profile key under a new store key derived from the
    /// given method and pass key, then replace the stored key reference.
    /// All row updates run inside a single transaction; the in-memory key
    /// cache is replaced only after the transaction commits.
    fn rekey(
        &mut self,
        method: StoreKeyMethod,
        pass_key: PassKey<'_>,
    ) -> BoxFuture<'_, Result<(), Error>> {
        let pass_key = pass_key.into_owned();
        Box::pin(async move {
            // Key derivation is CPU-bound: run off the executor
            let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
            let store_key = Arc::new(store_key);
            let mut conn = self.conn_pool.acquire().await?;
            let mut txn = conn.begin().await?;
            let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
            // Collect re-encrypted keys first; writes happen after the
            // streaming read completes (one connection cannot do both)
            let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
            while let Some(row) = rows.next().await {
                let row = row?;
                let pid = row.try_get(0)?;
                let enc_key = row.try_get(1)?;
                let profile_key = self.key_cache.load_key(enc_key).await?;
                let upd_key = unblock({
                    let store_key = store_key.clone();
                    move || encode_profile_key(&profile_key, &store_key)
                })
                .await?;
                upd_keys.insert(pid, upd_key);
            }
            drop(rows);
            for (pid, key) in upd_keys {
                if sqlx::query("UPDATE profiles SET profile_key=$1 WHERE id=$2")
                    .bind(key)
                    .bind(pid)
                    .execute(txn.as_mut())
                    .await?
                    .rows_affected()
                    != 1
                {
                    return Err(err_msg!(Backend, "Error updating profile key"));
                }
            }
            // Persist the new store key reference used to open the store
            if sqlx::query("UPDATE config SET value=$1 WHERE name='key'")
                .bind(store_key_ref.into_uri())
                .execute(txn.as_mut())
                .await?
                .rows_affected()
                != 1
            {
                return Err(err_msg!(Backend, "Error updating store key"));
            }
            txn.commit().await?;
            conn.return_to_pool().await;
            // Old cached profile keys are invalidated along with the cache
            self.key_cache = Arc::new(KeyCache::new(store_key));
            Ok(())
        })
    }

    /// Begin a streaming scan over matching entries. Encrypted rows are
    /// fetched in batches and decrypted off the executor as the resulting
    /// stream is polled.
    fn scan(
        &self,
        profile: Option<String>,
        kind: Option<EntryKind>,
        category: Option<String>,
        tag_filter: Option<TagFilter>,
        offset: Option<i64>,
        limit: Option<i64>,
        order_by: Option<OrderBy>,
        descending: bool,
    ) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
        Box::pin(async move {
            // A dedicated (non-transactional) session owned by the scan
            let session = self.session(profile, false)?;
            let mut active = session.owned_ref();
            let (profile_id, key) = acquire_key(&mut active).await?;
            let scan = perform_scan(
                active,
                profile_id,
                key.clone(),
                kind,
                category.clone(),
                tag_filter,
                offset,
                limit,
                order_by,
                descending,
                false,
            );
            // Decrypt each batch of rows off the executor
            let stream = scan.then(move |enc_rows| {
                let category = category.clone();
                let key = key.clone();
                unblock(move || decrypt_scan_batch(category, enc_rows?, &key))
            });
            Ok(Scan::new(stream, PAGE_SIZE))
        })
    }

    /// Create a new session (optionally transactional) against this store,
    /// defaulting to the active profile when none is given.
    fn session(&self, profile: Option<String>, transaction: bool) -> Result<Self::Session, Error> {
        Ok(DbSession::new(
            self.conn_pool.clone(),
            self.key_cache.clone(),
            profile.unwrap_or_else(|| self.active_profile.clone()),
            transaction,
        ))
    }

    /// Shut down the connection pool.
    fn close(&self) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            self.conn_pool.close().await;
            Ok(())
        })
    }
}
317
impl Debug for PostgresBackend {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // NOTE(review): the reported name "PostgresStore" differs from the
        // type name PostgresBackend — presumably a legacy/public-facing
        // label; confirm before renaming. Key material is intentionally
        // omitted from the output.
        f.debug_struct("PostgresStore")
            .field("active_profile", &self.active_profile)
            .field("host", &self.host)
            .field("name", &self.name)
            .finish()
    }
}
327
impl BackendSession for DbSession<Postgres> {
    /// Count non-expired entries for this session's profile, with optional
    /// kind, category and tag filters.
    fn count<'q>(
        &'q mut self,
        kind: Option<EntryKind>,
        category: Option<&'q str>,
        tag_filter: Option<TagFilter>,
    ) -> BoxFuture<'q, Result<i64, Error>> {
        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));

        Box::pin(async move {
            let (profile_id, key) = acquire_key(&mut *self).await?;
            let mut params = QueryParams::new();
            params.push(profile_id);
            params.push(kind.map(|k| k as i16));
            // Encrypt the category and encode the tag filter off the
            // executor; params_len accounts for the category pushed below
            // so the filter's placeholders start at the right index.
            let (enc_category, tag_filter) = unblock({
                let params_len = params.len() + 1;
                move || {
                    Result::<_, Error>::Ok((
                        enc_category
                            .map(|c| key.encrypt_entry_category(c))
                            .transpose()?,
                        encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?,
                    ))
                }
            })
            .await?;
            params.push(enc_category);
            let query = extend_query::<PostgresBackend>(
                COUNT_QUERY,
                &mut params,
                tag_filter,
                None,
                None,
                None,
                false,
            )?;
            let mut active = acquire_session(&mut *self).await?;
            let count = sqlx::query_scalar_with(query.as_str(), params)
                .fetch_one(active.connection_mut())
                .await
                .map_err(err_map!(Backend, "Error performing count query"))?;
            Ok(count)
        })
    }

    /// Fetch a single entry by kind, category and name, decrypting its
    /// value and tags. When `for_update` is set and the session is in a
    /// transaction, the row is locked (FOR NO KEY UPDATE).
    fn fetch(
        &mut self,
        kind: EntryKind,
        category: &str,
        name: &str,
        for_update: bool,
    ) -> BoxFuture<'_, Result<Option<Entry>, Error>> {
        let category = category.to_string();
        let name = name.to_string();

        Box::pin(async move {
            let (profile_id, key) = acquire_key(&mut *self).await?;
            // Encrypt lookup terms off the executor
            let (enc_category, enc_name) = unblock({
                let key = key.clone();
                let category = ProfileKey::prepare_input(category.as_bytes());
                let name = ProfileKey::prepare_input(name.as_bytes());
                move || {
                    Result::<_, Error>::Ok((
                        key.encrypt_entry_category(category)?,
                        key.encrypt_entry_name(name)?,
                    ))
                }
            })
            .await?;
            let mut active = acquire_session(&mut *self).await?;
            // Row locking is only meaningful inside a transaction
            if let Some(row) = sqlx::query(if for_update && active.in_transaction() {
                FETCH_QUERY_UPDATE
            } else {
                FETCH_QUERY
            })
            .bind(profile_id)
            .bind(kind as i16)
            .bind(enc_category)
            .bind(enc_name)
            .fetch_optional(active.connection_mut())
            .await
            .map_err(err_map!(Backend, "Error performing fetch query"))?
            {
                let value = row.try_get(1)?;
                // Tags arrive as one encoded string column (may be NULL)
                let tags = row.try_get::<Option<String>, _>(2)?.map(String::into_bytes);
                // Decrypt value and tags off the executor
                let (category, name, value, tags) = unblock(move || {
                    let value = key.decrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
                    let tags = if let Some(enc_tags) = tags {
                        key.decrypt_entry_tags(
                            decode_tags(enc_tags)
                                .map_err(|_| err_msg!(Unexpected, "Error decoding tags"))?,
                        )?
                    } else {
                        Vec::new()
                    };
                    Result::<_, Error>::Ok((category, name, value, tags))
                })
                .await?;
                Ok(Some(Entry::new(kind, category, name, value, tags)))
            } else {
                Ok(None)
            }
        })
    }

    /// Fetch all matching entries at once: the scan is drained into memory
    /// and the collected rows are decrypted in a single unblock call.
    fn fetch_all<'q>(
        &'q mut self,
        kind: Option<EntryKind>,
        category: Option<&'q str>,
        tag_filter: Option<TagFilter>,
        limit: Option<i64>,
        order_by: Option<OrderBy>,
        descending: bool,
        for_update: bool,
    ) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
        let category = category.map(|c| c.to_string());
        Box::pin(async move {
            // Row locking only applies inside a transaction
            let for_update = for_update && self.in_transaction();
            let mut active = self.borrow_mut();
            let (profile_id, key) = acquire_key(&mut active).await?;
            let scan = perform_scan(
                active,
                profile_id,
                key.clone(),
                kind,
                category.clone(),
                tag_filter,
                None,
                limit,
                order_by,
                descending,
                for_update,
            );
            pin!(scan);
            let mut enc_rows = vec![];
            while let Some(rows) = scan.try_next().await? {
                enc_rows.extend(rows)
            }
            unblock(move || decrypt_scan_batch(category, enc_rows, &key)).await
        })
    }

    /// Delete all matching entries, returning the number removed.
    fn remove_all<'q>(
        &'q mut self,
        kind: Option<EntryKind>,
        category: Option<&'q str>,
        tag_filter: Option<TagFilter>,
    ) -> BoxFuture<'q, Result<i64, Error>> {
        let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));

        Box::pin(async move {
            let (profile_id, key) = acquire_key(&mut *self).await?;
            let mut params = QueryParams::new();
            params.push(profile_id);
            params.push(kind.map(|k| k as i16));
            // Same off-executor encryption pattern as count()
            let (enc_category, tag_filter) = unblock({
                let params_len = params.len() + 1;
                move || {
                    Result::<_, Error>::Ok((
                        enc_category
                            .map(|c| key.encrypt_entry_category(c))
                            .transpose()?,
                        encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?,
                    ))
                }
            })
            .await?;
            params.push(enc_category);
            let query = extend_query::<PostgresBackend>(
                DELETE_ALL_QUERY,
                &mut params,
                tag_filter,
                None,
                None,
                None,
                false,
            )?;

            let mut active = acquire_session(&mut *self).await?;
            let removed = sqlx::query_with(query.as_str(), params)
                .execute(active.connection_mut())
                .await?
                .rows_affected();
            Ok(removed as i64)
        })
    }

    /// Insert, replace or remove a single entry.
    ///
    /// Insert/Replace encrypt the entry off the executor, then write the
    /// item row and its tags inside a transaction so they stay consistent.
    /// Remove deletes the row directly, erroring if it does not exist.
    fn update<'q>(
        &'q mut self,
        kind: EntryKind,
        operation: EntryOperation,
        category: &'q str,
        name: &'q str,
        value: Option<&'q [u8]>,
        tags: Option<&'q [EntryTag]>,
        expiry_ms: Option<i64>,
    ) -> BoxFuture<'q, Result<(), Error>> {
        let category = ProfileKey::prepare_input(category.as_bytes());
        let name = ProfileKey::prepare_input(name.as_bytes());

        match operation {
            op @ EntryOperation::Insert | op @ EntryOperation::Replace => {
                let value = ProfileKey::prepare_input(value.unwrap_or_default());
                let tags = tags.map(prepare_tags);
                Box::pin(async move {
                    let (_, key) = acquire_key(&mut *self).await?;
                    let (enc_category, enc_name, enc_value, enc_tags) = unblock(move || {
                        // The value is encrypted against the plaintext
                        // category and name before those are encrypted
                        let enc_value =
                            key.encrypt_entry_value(category.as_ref(), name.as_ref(), value)?;
                        Result::<_, Error>::Ok((
                            key.encrypt_entry_category(category)?,
                            key.encrypt_entry_name(name)?,
                            enc_value,
                            tags.transpose()?
                                .map(|t| key.encrypt_entry_tags(t))
                                .transpose()?,
                        ))
                    })
                    .await?;
                    let mut active = acquire_session(&mut *self).await?;
                    // Item row and tag rows must commit atomically
                    let mut txn = active.as_transaction().await?;
                    perform_insert(
                        &mut txn,
                        kind,
                        &enc_category,
                        &enc_name,
                        &enc_value,
                        enc_tags,
                        expiry_ms,
                        op == EntryOperation::Insert,
                    )
                    .await?;
                    txn.commit().await?;
                    Ok(())
                })
            }

            EntryOperation::Remove => Box::pin(async move {
                let (_, key) = acquire_key(&mut *self).await?;
                let (enc_category, enc_name) = unblock(move || {
                    Result::<_, Error>::Ok((
                        key.encrypt_entry_category(category)?,
                        key.encrypt_entry_name(name)?,
                    ))
                })
                .await?;
                let mut active = acquire_session(&mut *self).await?;
                perform_remove(&mut active, kind, &enc_category, &enc_name, false).await
            }),
        }
    }

    /// Check the session's health: inside a transaction, ping the
    /// connection; otherwise verify the session's profile still exists.
    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            let mut sess = acquire_session(&mut *self).await?;
            if sess.in_transaction() {
                sqlx::Connection::ping(sess.connection_mut())
                    .await
                    .map_err(err_map!(Backend, "Error pinging session"))?;
            } else {
                let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM profiles WHERE id=$1")
                    .bind(sess.profile_id)
                    .fetch_one(sess.connection_mut())
                    .await
                    .map_err(err_map!(Backend, "Error pinging session"))?;
                if count == 0 {
                    return Err(err_msg!(NotFound, "Session profile has been removed"));
                }
            }
            Ok(())
        })
    }

    /// Close the session, committing or rolling back any open transaction.
    fn close(&mut self, commit: bool) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(self.close(commit))
    }
}
606
// Marker impl: Postgres uses the default ExtDatabase behavior (no overrides).
impl ExtDatabase for Postgres {}
608
impl QueryPrepare for PostgresBackend {
    type DB = Postgres;

    /// Produce a Postgres positional bind placeholder ("$1", "$2", ...).
    fn placeholder(index: i64) -> String {
        format!("${}", index)
    }

    /// Append " LIMIT $n OFFSET $m" when either bound is given.
    ///
    /// The limit is bound first — NULL when absent, which Postgres treats
    /// as no limit — followed by the offset (defaulting to 0).
    fn limit_query<'q>(
        mut query: String,
        args: &mut QueryParams<'q, Self::DB>,
        offset: Option<i64>,
        limit: Option<i64>,
    ) -> String
    where
        i64: for<'e> sqlx::Encode<'e, Self::DB> + sqlx::Type<Self::DB>,
    {
        if offset.is_some() || limit.is_some() {
            // Placeholder numbering continues after existing arguments
            let last_idx = (args.len() + 1) as i64;
            args.push(limit);
            args.push(offset.unwrap_or(0));
            let limit = replace_arg_placeholders::<Self>(" LIMIT $$ OFFSET $$", last_idx);
            query.push_str(&limit);
        }
        query
    }
}
635
/// Activate the session (if not already active) and return its profile ID
/// and decrypted profile key.
///
/// NOTE(review): the unwrap assumes `make_active` always populates the
/// profile and key on success — confirm in `DbSession::make_active`.
async fn acquire_key(
    session: &mut DbSession<Postgres>,
) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
    // The active guard is dropped immediately; only the activation side
    // effect is needed here.
    acquire_session(session).await?;
    Ok(session.profile_and_key().unwrap())
}
642
/// Acquire an active database session for this logical session, resolving
/// the profile ID and key via `resolve_profile_key` on first activation.
async fn acquire_session(
    session: &'_ mut DbSession<Postgres>,
) -> Result<DbSessionActive<'_, Postgres>, Error> {
    session.make_active(&resolve_profile_key).await
}
648
/// Resolve the ID and key for a named profile, preferring the key cache.
///
/// On a cache hit inside a transaction, the profile row is re-checked and
/// locked (FOR NO KEY UPDATE) to detect concurrent removal. On a cache
/// miss, the row is fetched and locked, its key decrypted via the cache,
/// and the result added to the cache for later lookups.
async fn resolve_profile_key(
    conn: &mut PoolConnection<Postgres>,
    cache: Arc<KeyCache>,
    profile: String,
    in_txn: bool,
) -> Result<(ProfileId, Arc<ProfileKey>), Error> {
    if let Some((pid, key)) = cache.get_profile(profile.as_str()).await {
        if in_txn {
            // Cached entry may be stale: confirm the row still exists
            let check: Option<i64> =
                sqlx::query_scalar("SELECT id FROM profiles WHERE id=$1 FOR NO KEY UPDATE")
                    .bind(pid)
                    .fetch_optional(conn.as_mut())
                    .await?;
            if check.is_none() {
                return Err(err_msg!(NotFound, "Session profile has been removed"));
            }
        }
        Ok((pid, key))
    } else if let Some(row) =
        sqlx::query("SELECT id, profile_key FROM profiles WHERE name=$1 FOR NO KEY UPDATE")
            .bind(profile.as_str())
            .fetch_optional(conn.as_mut())
            .await?
    {
        let pid = row.try_get(0)?;
        // Decrypt the stored profile key under the cached store key
        let key = Arc::new(cache.load_key(row.try_get(1)?).await?);
        cache.add_profile(profile, pid, key.clone()).await;
        Ok((pid, key))
    } else {
        Err(err_msg!(NotFound, "Profile not found"))
    }
}
682
/// Insert a new entry row or update an existing one, then write its tags.
///
/// With `new_row`, INSERT ... ON CONFLICT DO NOTHING is attempted and a
/// missing RETURNING id is reported as a Duplicate error. Otherwise the
/// existing row's value/expiry are updated and its old tags deleted before
/// the new tags are inserted. Runs inside a caller-provided transaction so
/// the item row and tag rows stay consistent.
#[allow(clippy::too_many_arguments)]
async fn perform_insert(
    active: &mut DbSessionTxn<'_, Postgres>,
    kind: EntryKind,
    enc_category: &[u8],
    enc_name: &[u8],
    enc_value: &[u8],
    enc_tags: Option<Vec<EncEntryTag>>,
    expiry_ms: Option<i64>,
    new_row: bool,
) -> Result<(), Error> {
    let row_id = if new_row {
        trace!("Insert entry");
        sqlx::query_scalar(INSERT_QUERY)
            .bind(active.profile_id)
            .bind(kind as i16)
            .bind(enc_category)
            .bind(enc_name)
            .bind(enc_value)
            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
            .fetch_optional(active.connection_mut())
            .await?
            // No id returned means the conflict clause suppressed the insert
            .ok_or_else(|| err_msg!(Duplicate, "Duplicate entry"))?
    } else {
        trace!("Update entry");
        let row_id: i64 = sqlx::query_scalar(UPDATE_QUERY)
            .bind(active.profile_id)
            .bind(kind as i16)
            .bind(enc_category)
            .bind(enc_name)
            .bind(enc_value)
            .bind(expiry_ms.map(expiry_timestamp).transpose()?)
            .fetch_one(active.connection_mut())
            .await
            .map_err(|_| err_msg!(NotFound, "Error updating existing entry"))?;
        // Replace-semantics: drop the previous tags before re-inserting
        sqlx::query(TAG_DELETE_QUERY)
            .bind(row_id)
            .execute(active.connection_mut())
            .await
            .map_err(err_map!(Backend, "Error removing existing entry tags"))?;
        row_id
    };
    if let Some(tags) = enc_tags {
        for tag in tags {
            sqlx::query(TAG_INSERT_QUERY)
                .bind(row_id)
                .bind(&tag.name)
                .bind(&tag.value)
                .bind(tag.plaintext as i16)
                .execute(active.connection_mut())
                .await
                .map_err(err_map!(Backend, "Error inserting entry tags"))?;
        }
    }
    Ok(())
}
739
740async fn perform_remove<'q>(
741 active: &mut DbSessionActive<'q, Postgres>,
742 kind: EntryKind,
743 enc_category: &[u8],
744 enc_name: &[u8],
745 ignore_error: bool,
746) -> Result<(), Error> {
747 trace!("Remove entry");
748 let done = sqlx::query(DELETE_QUERY)
749 .bind(active.profile_id)
750 .bind(kind as i16)
751 .bind(enc_category)
752 .bind(enc_name)
753 .execute(active.connection_mut())
754 .await
755 .map_err(err_map!(Backend, "Error removing entry"))?;
756 if done.rows_affected() == 0 && !ignore_error {
757 Err(err_msg!(NotFound, "Entry not found"))
758 } else {
759 Ok(())
760 }
761}
762
/// Stream batches of still-encrypted scan results for a profile.
///
/// Rows are yielded in batches of up to PAGE_SIZE; decryption is left to
/// the caller. When the session reference is owned (a standalone scan),
/// the session is closed before the final partial batch is yielded so the
/// connection is released as early as possible.
#[allow(clippy::too_many_arguments)]
fn perform_scan(
    mut active: DbSessionRef<'_, Postgres>,
    profile_id: ProfileId,
    key: Arc<ProfileKey>,
    kind: Option<EntryKind>,
    category: Option<String>,
    tag_filter: Option<TagFilter>,
    offset: Option<i64>,
    limit: Option<i64>,
    order_by: Option<OrderBy>,
    descending: bool,
    for_update: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
    try_stream! {
        let mut params = QueryParams::new();
        params.push(profile_id);
        params.push(kind.map(|k| k as i16));
        // Encrypt the category and encode the tag filter off the executor;
        // params_len accounts for the category parameter pushed below.
        let (enc_category, tag_filter) = unblock({
            let key = key.clone();
            let enc_category = category.map(|c| ProfileKey::prepare_input(c.as_bytes()));
            let params_len = params.len() + 1;
            move || {
                Result::<_, Error>::Ok((
                    enc_category
                        .map(|c| key.encrypt_entry_category(c))
                        .transpose()?,
                    encode_tag_filter::<PostgresBackend>(tag_filter, &key, params_len)?
                ))
            }
        }).await?;
        params.push(enc_category);
        let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;
        if for_update {
            // Lock matched rows for the duration of the transaction
            query.push_str(" FOR NO KEY UPDATE");
        }
        let mut batch = Vec::with_capacity(PAGE_SIZE);

        let mut acquired = acquire_session(&mut active).await?;
        let mut rows = sqlx::query_with(query.as_str(), params).fetch(acquired.connection_mut());
        while let Some(row) = rows.try_next().await? {
            // Column 5 is the aggregated tag string; NULL means no tags
            let tags = row.try_get::<Option<String>, _>(5)?.map(String::into_bytes).unwrap_or_default();
            let kind: i16 = row.try_get(1)?;
            let kind = EntryKind::try_from(kind as usize)?;
            batch.push(EncScanEntry {
                kind, category: row.try_get(2)?, name: row.try_get(3)?, value: row.try_get(4)?, tags
            });
            if batch.len() == PAGE_SIZE {
                // split_off(0) moves the full page out, leaving batch empty
                yield batch.split_off(0);
            }
        }
        drop(rows);
        if active.is_owned() {
            // Release the owned session before yielding the final batch
            active.close(false).await?;
        }
        drop(active);

        if !batch.is_empty() {
            yield batch;
        }
    }
}
825
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::db_utils::replace_arg_placeholders;

    /// Verify that `$$` markers are replaced with sequential placeholders
    /// starting at the given index, and explicit `$N` refs are shifted.
    #[test]
    fn postgres_simple_and_convert_args_works() {
        let converted =
            replace_arg_placeholders::<PostgresBackend>("This $$ is $10 a $$ string!", 3);
        assert_eq!(converted, "This $3 is $12 a $5 string!");
    }
}