1use anyhow::{anyhow, Context};
2use crdb_cache::CacheDb;
3use crdb_core::{
4 normalizer_version, BinPtr, CanDoCallbacks, Db, DbPtr, Event, EventId, Importance, Object,
5 ObjectData, ObjectId, Query, ResultExt, Session, SessionRef, SessionToken, SystemTimeExt,
6 TypeId, Update, UpdateData, Updatedness, User, UsersWhoCanRead,
7};
8use crdb_core::{Decimal, JsonPathItem, ReadPermsChanges, ServerSideDb};
9use crdb_helpers::parse_snapshot;
10use futures::{future::Either, StreamExt, TryStreamExt};
11use lockable::{LockPool, Lockable};
12use sqlx::Row;
13use std::{
14 collections::{hash_map, BTreeMap, HashMap, HashSet},
15 marker::PhantomData,
16 pin::Pin,
17 sync::{Arc, Weak},
18 time::SystemTime,
19};
20use tokio::sync::Mutex;
21
22#[cfg(test)]
23mod tests;
24
25pub use crdb_core::{Error, Result};
26
27pub struct PostgresDb<Config: crdb_core::Config> {
28 db: sqlx::PgPool,
29 cache_db: Weak<CacheDb<PostgresDb<Config>>>,
30 event_locks: LockPool<EventId>,
31 object_locks: LockPool<ObjectId>,
35 _phantom: PhantomData<Config>,
36}
37
38impl<Config: crdb_core::Config> PostgresDb<Config> {
39 pub async fn connect(
40 db: sqlx::PgPool,
41 cache_watermark: usize,
42 ) -> anyhow::Result<Arc<CacheDb<PostgresDb<Config>>>> {
43 sqlx::migrate!("./migrations")
44 .run(&db)
45 .await
46 .context("running migrations on postgresql database")?;
47 let cache_db = Arc::new_cyclic(|cache_db| {
48 let db = PostgresDb {
49 db,
50 cache_db: cache_db.clone(),
51 event_locks: LockPool::new(),
52 object_locks: LockPool::new(),
53 _phantom: PhantomData,
54 };
55 CacheDb::new(db, cache_watermark)
56 });
57 Ok(cache_db)
58 }
59
60 async fn reencode<T: Object>(
61 &self,
62 object_id: ObjectId,
63 snapshot_id: EventId,
64 ) -> crate::Result<()> {
65 let _lock = self.object_locks.async_lock(object_id).await;
68
69 let Some(s) = sqlx::query!(
71 "
72 SELECT normalizer_version, snapshot_version, snapshot
73 FROM snapshots
74 WHERE snapshot_id = $1
75 ",
76 snapshot_id as EventId,
77 )
78 .fetch_optional(&self.db)
79 .await
80 .wrap_with_context(|| format!("fetching snapshot {snapshot_id:?}"))?
81 else {
82 return Ok(());
84 };
85
86 if s.normalizer_version >= normalizer_version()
88 && s.snapshot_version >= T::snapshot_version()
89 {
90 return Ok(());
91 }
92
93 let snapshot = parse_snapshot::<T>(s.snapshot_version, s.snapshot)
95 .wrap_with_context(|| format!("parsing snapshot {snapshot_id:?}"))?;
96 sqlx::query(
97 "
98 UPDATE snapshots
99 SET snapshot = $1, snapshot_version = $2, normalizer_version = $3
100 WHERE snapshot_id = $4
101 ",
102 )
103 .bind(sqlx::types::Json(&snapshot))
104 .bind(T::snapshot_version())
105 .bind(normalizer_version())
106 .bind(snapshot_id)
107 .execute(&self.db)
108 .await
109 .wrap_with_context(|| format!("re-encoding snapshot data for {snapshot_id:?}"))?;
110
111 Ok(())
112 }
113
114 async fn get_rdeps<'a, E: sqlx::Executor<'a, Database = sqlx::Postgres>>(
116 &self,
117 connection: E,
118 object_id: ObjectId,
119 ) -> anyhow::Result<Vec<ObjectId>> {
120 reord::point().await;
121 let rdeps = sqlx::query!(
122 "
123 SELECT object_id
124 FROM snapshots
125 WHERE $1 = ANY (users_who_can_read_depends_on)
126 AND is_latest
127 AND object_id != $1
128 ",
129 object_id as ObjectId,
130 )
131 .map(|o| ObjectId::from_uuid(o.object_id))
132 .fetch_all(connection)
133 .await
134 .with_context(|| format!("fetching the list of reverse-dependencies for {object_id:?}"))?;
135 Ok(rdeps)
136 }
137
138 pub async fn update_users_who_can_read<C: CanDoCallbacks>(
142 &self,
143 requested_by: ObjectId,
144 object_id: ObjectId,
145 cb: &C,
146 ) -> anyhow::Result<ReadPermsChanges> {
147 reord::point().await;
148 let mut transaction = self
149 .db
150 .begin()
151 .await
152 .wrap_context("acquiring postgresql transaction")?;
153
154 let _lock = (
156 reord::Lock::take_named(format!("{object_id:?}")).await,
157 self.object_locks.async_lock(object_id).await,
158 );
159
160 reord::point().await;
162 let res = sqlx::query!(
163 "
164 SELECT type_id, snapshot_version, snapshot, users_who_can_read FROM snapshots
165 WHERE object_id = $1
166 AND is_latest
167 ",
168 object_id as ObjectId,
169 )
170 .fetch_one(&mut *transaction)
171 .await
172 .with_context(|| format!("fetching latest snapshot for object {object_id:?}"))?;
173 let type_id = TypeId::from_uuid(res.type_id);
174 let users_who_can_read_before = res
175 .users_who_can_read
176 .ok_or_else(|| {
177 crate::Error::Other(anyhow!("Latest snapshot had NULL users_who_can_read"))
178 })?
179 .into_iter()
180 .map(User::from_uuid)
181 .collect::<HashSet<User>>();
182
183 let can_read = Config::get_users_who_can_read(
185 self,
186 object_id,
187 type_id,
188 res.snapshot_version,
189 res.snapshot,
190 cb,
191 )
192 .await
193 .with_context(|| format!("updating users_who_can_read cache of {object_id:?}"))?;
194 let users_who_can_read_after = can_read.users.iter().copied().collect::<HashSet<User>>();
195
196 reord::maybe_lock().await;
198 let affected = sqlx::query(
199 "
200 UPDATE snapshots
201 SET users_who_can_read = $1,
202 users_who_can_read_depends_on = $2
203 WHERE object_id = $3
204 AND is_latest
205 ",
206 )
207 .bind(can_read.users.into_iter().collect::<Vec<_>>())
208 .bind(&can_read.depends_on)
209 .bind(object_id)
210 .execute(&mut *transaction)
211 .await
212 .with_context(|| {
213 format!("updating users_who_can_read in latest snapshot for {object_id:?}")
214 })?
215 .rows_affected();
216 reord::point().await;
217 anyhow::ensure!(
218 affected == 1,
219 "Failed to update latest snapshot of users_who_can_read"
220 );
221
222 let _lock = if !can_read.depends_on.iter().any(|o| *o == requested_by) {
224 Some((
225 reord::Lock::take_named(format!("{requested_by:?}")),
226 self.object_locks.async_lock(requested_by).await,
227 ))
228 } else {
229 None
230 };
231
232 reord::maybe_lock().await;
238 let affected = sqlx::query(
239 "
240 UPDATE snapshots
241 SET reverse_dependents_to_update = array_remove(reverse_dependents_to_update, $1)
242 WHERE object_id = $2 AND is_latest
243 ",
244 )
245 .bind(object_id)
246 .bind(requested_by)
247 .execute(&mut *transaction)
248 .await
249 .with_context(|| {
250 format!(
251 "removing {object_id:?} from the list of rev-deps of {requested_by:?} to update"
252 )
253 })?
254 .rows_affected();
255 reord::point().await;
256 anyhow::ensure!(
257 affected == 1,
258 "Failed to mark reverse dependent {object_id:?} of {requested_by:?} as updated"
259 );
260
261 reord::point().await;
262 transaction.commit().await.wrap_with_context(|| {
263 format!("committing transaction that updated the rdeps of {object_id:?}")
264 })?;
265 reord::point().await;
266
267 let lost_read = users_who_can_read_before
270 .difference(&users_who_can_read_after)
271 .copied()
272 .collect();
273 let gained_read = users_who_can_read_after
274 .difference(&users_who_can_read_before)
275 .copied()
276 .collect();
277 Ok(ReadPermsChanges {
278 object_id,
279 type_id,
280 lost_read,
281 gained_read,
282 })
283 }
284
285 async fn update_rdeps<C: CanDoCallbacks>(
286 &self,
287 object_id: ObjectId,
288 cb: &C,
289 ) -> anyhow::Result<Vec<ReadPermsChanges>> {
290 let rdeps = sqlx::query!(
297 r#"
298 SELECT UNNEST(reverse_dependents_to_update) AS "rdep!"
299 FROM snapshots
300 WHERE object_id = $1
301 AND reverse_dependents_to_update IS NOT NULL
302 "#,
303 object_id as ObjectId,
304 )
305 .map(|r| ObjectId::from_uuid(r.rdep))
306 .fetch_all(&self.db)
307 .await
308 .wrap_with_context(|| {
309 format!("listing reverse dependents of {object_id:?} that need updating")
310 })?;
311 let mut res = Vec::with_capacity(rdeps.len());
312 for o in rdeps {
313 if o != object_id {
314 let changes = self.update_users_who_can_read(object_id, o, cb)
315 .await
316 .with_context(|| format!("updating users_who_can_read field for {o:?} on behalf of {object_id:?}"))?;
317 res.push(changes);
318 }
319 }
320 Ok(res)
321 }
322
323 #[allow(clippy::too_many_arguments)] async fn write_snapshot<'a, T: Object, C: CanDoCallbacks>(
325 &'a self,
326 transaction: &mut sqlx::PgConnection,
327 snapshot_id: EventId,
328 object_id: ObjectId,
329 is_creation: bool,
330 is_latest: bool,
331 rdeps: Option<&[ObjectId]>,
332 object: &T,
333 updatedness: Updatedness,
334 cb: &'a C,
335 ) -> crate::Result<Vec<ComboLock<'a>>> {
336 let can_read = if is_latest {
337 Some(
338 self.get_users_who_can_read::<T, _>(object_id, object, cb)
339 .await
340 .wrap_with_context(|| {
341 format!(
342 "listing users who can read for snapshot {snapshot_id:?} of {object_id:?}"
343 )
344 })?,
345 )
346 } else {
347 None
348 };
349 assert!(
350 !is_latest || rdeps.is_some(),
351 "Latest snapshots must always list their reverse dependencies"
352 );
353
354 reord::maybe_lock().await;
355 let result = sqlx::query(
356 "INSERT INTO snapshots VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
357 )
358 .bind(snapshot_id)
359 .bind(T::type_ulid())
360 .bind(object_id)
361 .bind(is_creation)
362 .bind(is_latest)
363 .bind(normalizer_version())
364 .bind(T::snapshot_version())
365 .bind(sqlx::types::Json(object))
366 .bind(
367 can_read
368 .as_ref()
369 .map(|u| u.users.iter().cloned().collect::<Vec<_>>()),
370 )
371 .bind(can_read.as_ref().map(|u| &u.depends_on))
372 .bind(rdeps)
373 .bind(object.required_binaries())
374 .bind(updatedness)
375 .execute(&mut *transaction)
376 .await;
377 reord::point().await;
378
379 match result {
380 Ok(_) => Ok(can_read.map(|c| c.locks).unwrap_or_else(Vec::new)),
381 Err(sqlx::Error::Database(err)) if err.constraint() == Some("snapshots_pkey") => {
382 Err(crate::Error::EventAlreadyExists(snapshot_id))
383 }
384 Err(e) => Err(e)
385 .wrap_with_context(|| format!("inserting snapshot {snapshot_id:?} into table")),
386 }
387 }
388
389 async fn create_impl<T: Object, C: CanDoCallbacks>(
390 &self,
391 object_id: ObjectId,
392 created_at: EventId,
393 object: Arc<T>,
394 updatedness: Updatedness,
395 cb: &C,
396 ) -> crate::Result<Either<Arc<T>, Option<EventId>>> {
397 reord::point().await;
398 let mut transaction = self
399 .db
400 .begin()
401 .await
402 .wrap_context("acquiring postgresql transaction")?;
403
404 let _lock = reord::Lock::take_named(format!("{created_at:?}")).await;
406 let _lock = self.event_locks.async_lock(created_at).await;
407 let _lock = reord::Lock::take_named(format!("{object_id:?}")).await;
408 let _lock = self.object_locks.async_lock(object_id).await;
409 reord::point().await;
410
411 let type_id = *T::type_ulid();
413 let snapshot_version = T::snapshot_version();
414 let object_json = sqlx::types::Json(&object);
415 let can_read = self
416 .get_users_who_can_read(object_id, &*object, cb)
417 .await
418 .wrap_with_context(|| format!("listing users who can read object {object_id:?}"))?;
419 let rdeps = self
420 .get_rdeps(&mut *transaction, object_id)
421 .await
422 .wrap_with_context(|| format!("listing reverse dependencies of {object_id:?}"))?;
423 let required_binaries = object.required_binaries();
424 reord::maybe_lock().await;
425 let affected = sqlx::query("INSERT INTO snapshots VALUES ($1, $2, $3, TRUE, TRUE, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING")
427 .bind(created_at)
428 .bind(type_id)
429 .bind(object_id)
430 .bind(normalizer_version())
431 .bind(snapshot_version)
432 .bind(object_json)
433 .bind(can_read.users.iter().copied().collect::<Vec<_>>())
434 .bind(&can_read.depends_on)
435 .bind(&rdeps)
436 .bind(&required_binaries)
437 .bind(updatedness)
438 .execute(&mut *transaction)
439 .await
440 .wrap_with_context(|| format!("inserting snapshot {created_at:?}"))?
441 .rows_affected();
442 reord::point().await;
443 if affected != 1 {
444 reord::point().await;
446 let rdeps_to_update = sqlx::query!(
447 "
448 SELECT array_length(reverse_dependents_to_update, 1) AS num_rdeps
449 FROM snapshots
450 WHERE snapshot_id = $1
451 AND type_id = $2
452 AND object_id = $3
453 AND is_creation
454 AND snapshot_version = $4
455 AND snapshot = $5
456 ",
457 created_at as EventId,
458 *T::type_ulid() as TypeId,
459 object_id as ObjectId,
460 snapshot_version,
461 object_json as _,
462 )
463 .fetch_optional(&mut *transaction)
464 .await
465 .wrap_with_context(|| {
466 format!("checking pre-existing snapshot for {created_at:?} is the same")
467 })?;
468 let Some(rdeps_to_update) = rdeps_to_update else {
469 reord::point().await;
471 let object_exists_affected =
472 sqlx::query("SELECT 1 FROM snapshots WHERE object_id = $1")
473 .bind(object_id)
474 .execute(&mut *transaction)
475 .await
476 .wrap_with_context(|| {
477 format!("checking whether {object_id:?} already exists")
478 })?
479 .rows_affected();
480 return if object_exists_affected >= 1 {
481 Err(crate::Error::ObjectAlreadyExists(object_id))
482 } else {
483 Err(crate::Error::EventAlreadyExists(created_at))
484 };
485 };
486
487 return Ok(Either::Right(
488 rdeps_to_update.num_rdeps.is_some().then_some(created_at),
489 ));
490 }
491
492 reord::point().await;
494 let affected = sqlx::query("SELECT event_id FROM events WHERE event_id = $1")
495 .bind(created_at)
496 .execute(&mut *transaction)
497 .await
498 .wrap_context("checking that no event existed with this id yet")?
499 .rows_affected();
500 if affected != 0 {
501 return Err(crate::Error::EventAlreadyExists(created_at));
502 }
503
504 check_required_binaries(&mut transaction, required_binaries)
506 .await
507 .wrap_with_context(|| {
508 format!("checking that all binaries for object {object_id:?} are already present")
509 })?;
510
511 reord::point().await;
512 transaction
513 .commit()
514 .await
515 .wrap_with_context(|| format!("committing transaction that created {object_id:?}"))?;
516 reord::point().await;
517
518 Ok(Either::Left(object))
519 }
520
521 async fn submit_impl<T: Object, C: CanDoCallbacks>(
522 &self,
523 object_id: ObjectId,
524 event_id: EventId,
525 event: Arc<T::Event>,
526 updatedness: Updatedness,
527 cb: &C,
528 ) -> crate::Result<Either<Arc<T>, Option<EventId>>> {
529 reord::point().await;
530 let mut transaction = self
531 .db
532 .begin()
533 .await
534 .wrap_context("acquiring postgresql transaction")?;
535
536 let _lock = reord::Lock::take_named(format!("{event_id:?}")).await;
538 let _lock = self.event_locks.async_lock(event_id).await;
539 let _lock = reord::Lock::take_named(format!("{object_id:?}")).await;
540 let _lock = self.object_locks.async_lock(object_id).await;
541 reord::point().await;
542
543 reord::point().await;
545 let creation_snapshot = sqlx::query!(
546 "SELECT snapshot_id, type_id FROM snapshots WHERE object_id = $1 AND is_creation",
547 object_id as ObjectId,
548 )
549 .fetch_optional(&mut *transaction)
550 .await
551 .wrap_with_context(|| format!("locking object {object_id:?} in database"))?;
552 reord::point().await;
553 match creation_snapshot {
554 None => {
555 return Err(crate::Error::ObjectDoesNotExist(object_id));
556 }
557 Some(s) if TypeId::from_uuid(s.type_id) != *T::type_ulid() => {
558 return Err(crate::Error::WrongType {
559 object_id,
560 expected_type_id: *T::type_ulid(),
561 real_type_id: TypeId::from_uuid(s.type_id),
562 })
563 }
564 Some(s) if s.snapshot_id >= event_id.to_uuid() => {
565 return Err(crate::Error::EventTooEarly {
566 event_id,
567 object_id,
568 created_at: EventId::from_uuid(s.snapshot_id),
569 });
570 }
571 _ => (),
572 }
573
574 let event_json = sqlx::types::Json(&event);
576 let required_binaries = event.required_binaries();
577 reord::maybe_lock().await;
578 let affected =
579 sqlx::query("INSERT INTO events VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING")
580 .bind(event_id)
581 .bind(object_id)
582 .bind(event_json)
583 .bind(required_binaries)
584 .bind(updatedness)
585 .execute(&mut *transaction)
586 .await
587 .wrap_with_context(|| format!("inserting event {event_id:?} in database"))?
588 .rows_affected();
589 reord::point().await;
590 if affected != 1 {
591 reord::point().await;
593 let affected = sqlx::query(
594 "
595 SELECT 1 FROM events
596 WHERE event_id = $1
597 AND object_id = $2
598 AND data = $3
599 ",
600 )
601 .bind(event_id)
602 .bind(object_id)
603 .bind(event_json)
604 .execute(&mut *transaction)
605 .await
606 .wrap_with_context(|| {
607 format!("checking pre-existing snapshot for {event_id:?} is the same")
608 })?
609 .rows_affected();
610 if affected != 1 {
611 return Err(crate::Error::EventAlreadyExists(event_id));
612 }
613 let last_snapshot_id = sqlx::query!(
616 "
617 SELECT snapshot_id
618 FROM snapshots
619 WHERE object_id = $1
620 AND is_latest
621 AND array_length(reverse_dependents_to_update, 1) IS NOT NULL
622 ",
623 object_id as ObjectId,
624 )
625 .map(|r| EventId::from_uuid(r.snapshot_id))
626 .fetch_optional(&mut *transaction)
627 .await
628 .wrap_context(
629 "checking whether the event's reverse-dependency changes have been handled",
630 )?;
631 return Ok(Either::Right(last_snapshot_id));
632 }
633
634 reord::maybe_lock().await;
636 sqlx::query("DELETE FROM snapshots WHERE object_id = $1 AND snapshot_id > $2")
637 .bind(object_id)
638 .bind(event_id)
639 .execute(&mut *transaction)
640 .await
641 .wrap_with_context(|| {
642 format!("clearing all snapshots for object {object_id:?} after event {event_id:?}")
643 })?;
644 reord::point().await;
645
646 let last_snapshot = sqlx::query!(
648 "
649 SELECT snapshot_id, is_latest, snapshot_version, snapshot
650 FROM snapshots
651 WHERE object_id = $1
652 ORDER BY snapshot_id DESC
653 LIMIT 1
654 ",
655 object_id.to_uuid(),
656 )
657 .fetch_one(&mut *transaction)
658 .await
659 .wrap_with_context(|| format!("fetching the last snapshot for object {object_id:?}"))?;
660 let mut object =
661 parse_snapshot::<T>(last_snapshot.snapshot_version, last_snapshot.snapshot)
662 .wrap_with_context(|| format!("parsing last snapshot for object {object_id:?}"))?;
663
664 reord::maybe_lock().await;
667 sqlx::query("UPDATE snapshots SET is_latest = FALSE WHERE object_id = $1 AND is_latest")
668 .bind(object_id)
669 .execute(&mut *transaction)
670 .await
671 .wrap_with_context(|| {
672 format!("removing latest-snapshot flag for object {object_id:?}")
673 })?;
674 reord::point().await;
675
676 if !last_snapshot.is_latest {
678 let from = EventId::from_uuid(last_snapshot.snapshot_id);
679 let to = EventId::from_u128(event_id.as_u128() - 1);
680 apply_events_between(&mut transaction, &mut object, object_id, from, to)
681 .await
682 .wrap_with_context(|| {
683 format!("applying all events on {object_id:?} between {from:?} and {to:?}")
684 })?;
685 }
686
687 object.apply(DbPtr::from(object_id), &event);
689
690 let rdeps = self
692 .get_rdeps(&mut *transaction, object_id)
693 .await
694 .wrap_with_context(|| format!("fetching reverse dependencies of {object_id:?}"))?;
695 let mut _dep_locks = self
696 .write_snapshot(
697 &mut transaction,
698 event_id,
699 object_id,
700 false,
701 last_snapshot.is_latest,
702 if last_snapshot.is_latest {
703 Some(&rdeps)
704 } else {
705 None
706 },
707 &object,
708 updatedness,
709 cb,
710 )
711 .await
712 .wrap_with_context(|| format!("writing snapshot {event_id:?} for {object_id:?}"))?;
713
714 if !last_snapshot.is_latest {
716 std::mem::drop(_dep_locks);
718
719 reord::point().await;
721 let mut events_since_inserted = sqlx::query!(
722 "
723 SELECT event_id, data
724 FROM events
725 WHERE object_id = $1
726 AND event_id > $2
727 ORDER BY event_id ASC
728 ",
729 object_id.to_uuid(),
730 event_id.to_uuid(),
731 )
732 .fetch(&mut *transaction);
733 let mut last_event_id = None;
734 while let Some(e) = events_since_inserted.next().await {
735 let e = e.wrap_with_context(|| {
736 format!("fetching all events for {object_id:?} after {event_id:?}")
737 })?;
738 last_event_id = Some(e.event_id);
739 let e = serde_json::from_value::<T::Event>(e.data).wrap_with_context(|| {
740 format!(
741 "parsing event {:?} of type {:?}",
742 e.event_id,
743 T::type_ulid()
744 )
745 })?;
746
747 object.apply(DbPtr::from(object_id), &e);
748 }
749 std::mem::drop(events_since_inserted);
750
751 let snapshot_id = EventId::from_uuid(
753 last_event_id
754 .expect("Entered the 'recomputing last snapshot' stage without any new events"),
755 );
756 _dep_locks = self
757 .write_snapshot(
758 &mut transaction,
759 snapshot_id,
760 object_id,
761 false,
762 true,
763 Some(&rdeps),
764 &object,
765 updatedness,
766 cb,
767 )
768 .await
769 .wrap_with_context(|| {
770 format!("writing snapshot {snapshot_id:?} for {object_id:?}")
771 })?;
772 }
773
774 check_required_binaries(&mut transaction, event.required_binaries())
776 .await
777 .wrap_with_context(|| {
778 format!("checking that all binaries for object {object_id:?} are already present")
779 })?;
780
781 reord::point().await;
782 transaction.commit().await.wrap_with_context(|| {
783 format!("committing transaction adding event {event_id:?} to object {object_id:?}")
784 })?;
785 reord::point().await;
786
787 Ok(Either::Left(Arc::new(object)))
788 }
789}
790
791impl<Config: crdb_core::Config> Db for PostgresDb<Config> {
792 async fn create<T: Object>(
793 &self,
794 object_id: ObjectId,
795 created_at: EventId,
796 object: Arc<T>,
797 updatedness: Option<Updatedness>,
798 _importance: Importance,
799 ) -> crate::Result<Option<Arc<T>>> {
800 let updatedness =
801 updatedness.expect("Called PostgresDb::create without specifying updatedness");
802 Ok(self
803 .create_and_return_rdep_changes::<T>(object_id, created_at, object, updatedness)
804 .await?
805 .map(|(snap, _)| snap))
806 }
807
808 async fn submit<T: Object>(
809 &self,
810 object_id: ObjectId,
811 event_id: EventId,
812 event: Arc<T::Event>,
813 updatedness: Option<Updatedness>,
814 _additional_importance: Importance,
815 ) -> crate::Result<Option<Arc<T>>> {
816 let updatedness =
817 updatedness.expect("Called PostgresDb::create without specifying updatedness");
818 Ok(self
819 .submit_and_return_rdep_changes::<T>(object_id, event_id, event, updatedness)
820 .await?
821 .map(|(snap, _)| snap))
822 }
823
824 async fn get_latest<T: Object>(
825 &self,
826 object_id: ObjectId,
827 _importance: Importance,
828 ) -> crate::Result<Arc<T>> {
829 reord::point().await;
830 let mut transaction = self
831 .db
832 .begin()
833 .await
834 .wrap_context("acquiring postgresql transaction")?;
835
836 reord::point().await;
838 let latest_snapshot = sqlx::query!(
839 "
840 SELECT snapshot_id, type_id, snapshot_version, snapshot
841 FROM snapshots
842 WHERE object_id = $1
843 AND is_latest
844 ",
845 object_id as ObjectId,
846 )
847 .fetch_optional(&mut *transaction)
848 .await
849 .wrap_with_context(|| format!("fetching latest snapshot for object {object_id:?}"))?;
850 let latest_snapshot = match latest_snapshot {
851 Some(s) => s,
852 None => return Err(crate::Error::ObjectDoesNotExist(object_id)),
853 };
854 let real_type_id = TypeId::from_uuid(latest_snapshot.type_id);
855 let expected_type_id = *T::type_ulid();
856 if real_type_id != expected_type_id {
857 return Err(crate::Error::WrongType {
858 object_id,
859 expected_type_id,
860 real_type_id,
861 });
862 }
863
864 reord::point().await;
866 let res = parse_snapshot::<T>(latest_snapshot.snapshot_version, latest_snapshot.snapshot)
867 .wrap_with_context(|| format!("parsing latest snapshot for {object_id:?}"))?;
868 Ok(Arc::new(res))
869 }
870
871 async fn create_binary(&self, binary_id: BinPtr, data: Arc<[u8]>) -> crate::Result<()> {
872 if crdb_core::hash_binary(&data) != binary_id {
873 return Err(crate::Error::BinaryHashMismatch(binary_id));
874 }
875 reord::maybe_lock().await;
876 sqlx::query("INSERT INTO binaries VALUES ($1, $2) ON CONFLICT DO NOTHING")
877 .bind(binary_id)
878 .bind(&*data)
879 .execute(&self.db)
880 .await
881 .wrap_with_context(|| format!("inserting binary {binary_id:?} into database"))?;
882 reord::point().await;
883 Ok(())
884 }
885
886 async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
887 reord::point().await;
888 Ok(sqlx::query!(
889 "SELECT data FROM binaries WHERE binary_id = $1",
890 binary_id as BinPtr
891 )
892 .fetch_optional(&self.db)
893 .await
894 .wrap_with_context(|| format!("getting {binary_id:?} from database"))?
895 .map(|res| res.data.into_boxed_slice().into()))
896 }
897
898 async fn reencode_old_versions<T: Object>(&self) -> usize {
900 let mut num_errors = 0;
901 let mut old_snapshots = sqlx::query(
902 "
903 SELECT object_id, snapshot_id
904 FROM snapshots
905 WHERE type_id = $1
906 AND (snapshot_version < $2 OR normalizer_version < $3)
907 ",
908 )
909 .bind(T::type_ulid())
910 .bind(T::snapshot_version())
911 .bind(normalizer_version())
912 .fetch(&self.db);
913 while let Some(s) = old_snapshots.next().await {
914 let s = match s {
915 Ok(s) => s,
916 Err(err) => {
917 num_errors += 1;
918 tracing::error!(?err, "failed retrieving one snapshot for upgrade");
919 continue;
920 }
921 };
922 let object_id = ObjectId::from_uuid(s.get(0));
923 let snapshot_id = EventId::from_uuid(s.get(1));
924 if let Err(err) = self.reencode::<T>(object_id, snapshot_id).await {
925 num_errors += 1;
926 tracing::error!(
927 ?err,
928 ?object_id,
929 ?snapshot_id,
930 "failed reencoding snapshot with newer version",
931 );
932 }
933 }
934 num_errors
935 }
936
937 async fn assert_invariants_generic(&self) {
938 assert_eq!(
940 0,
941 sqlx::query(
942 "
943 (
944 SELECT unnest(required_binaries)
945 FROM snapshots
946 UNION
947 SELECT unnest(required_binaries)
948 FROM events
949 )
950 EXCEPT
951 SELECT binary_id
952 FROM binaries
953 ",
954 )
955 .execute(&self.db)
956 .await
957 .unwrap()
958 .rows_affected()
959 );
960
961 assert_eq!(
963 0,
964 sqlx::query(
965 "
966 SELECT object_id FROM events
967 EXCEPT
968 SELECT object_id FROM snapshots WHERE is_creation
969 "
970 )
971 .execute(&self.db)
972 .await
973 .unwrap()
974 .rows_affected()
975 );
976
977 assert_eq!(
979 0,
980 sqlx::query(
981 "
982 SELECT snapshot_id AS id FROM snapshots WHERE NOT is_creation
983 EXCEPT
984 SELECT event_id AS id FROM events
985 "
986 )
987 .execute(&self.db)
988 .await
989 .unwrap()
990 .rows_affected()
991 );
992
993 assert_eq!(
995 0,
996 sqlx::query(
997 "
998 SELECT snapshot_id FROM snapshots
999 LEFT JOIN events ON snapshots.snapshot_id = events.event_id
1000 WHERE snapshots.object_id != events.object_id
1001 "
1002 )
1003 .execute(&self.db)
1004 .await
1005 .unwrap()
1006 .rows_affected()
1007 );
1008
1009 assert_eq!(
1011 0,
1012 sqlx::query(
1013 "
1014 SELECT object_id
1015 FROM snapshots
1016 GROUP BY object_id
1017 HAVING COUNT(DISTINCT type_id) > 1
1018 "
1019 )
1020 .execute(&self.db)
1021 .await
1022 .unwrap()
1023 .rows_affected()
1024 )
1025 }
1026
1027 async fn assert_invariants_for<T: Object>(&self) {
1028 let objects = sqlx::query!(
1030 "SELECT object_id FROM snapshots WHERE type_id = $1",
1031 T::type_ulid() as &TypeId
1032 )
1033 .fetch_all(&self.db)
1034 .await
1035 .unwrap();
1036 for o in objects {
1037 let creation: uuid::Uuid = sqlx::query(
1039 "SELECT snapshot_id FROM snapshots WHERE object_id = $1 AND is_creation",
1040 )
1041 .bind(o.object_id)
1042 .fetch_one(&self.db)
1043 .await
1044 .unwrap()
1045 .get(0);
1046 let latest: uuid::Uuid =
1047 sqlx::query("SELECT snapshot_id FROM snapshots WHERE object_id = $1 AND is_latest")
1048 .bind(o.object_id)
1049 .fetch_one(&self.db)
1050 .await
1051 .unwrap()
1052 .get(0);
1053
1054 assert_eq!(
1056 0,
1057 sqlx::query(
1058 "
1059 SELECT snapshot_id
1060 FROM snapshots
1061 WHERE object_id = $1
1062 AND (snapshot_id < $2 OR snapshot_id > $3)
1063 ",
1064 )
1065 .bind(o.object_id)
1066 .bind(creation)
1067 .bind(latest)
1068 .execute(&self.db)
1069 .await
1070 .unwrap()
1071 .rows_affected()
1072 );
1073 assert_eq!(
1074 0,
1075 sqlx::query(
1076 "
1077 SELECT event_id
1078 FROM events
1079 WHERE object_id = $1
1080 AND (event_id <= $2 OR event_id > $3)
1081 ",
1082 )
1083 .bind(o.object_id)
1084 .bind(creation)
1085 .bind(latest)
1086 .execute(&self.db)
1087 .await
1088 .unwrap()
1089 .rows_affected()
1090 );
1091
1092 let snapshots = sqlx::query!(
1094 "SELECT * FROM snapshots WHERE object_id = $1 ORDER BY snapshot_id",
1095 o.object_id
1096 )
1097 .fetch_all(&self.db)
1098 .await
1099 .unwrap();
1100 let events = sqlx::query!(
1101 "SELECT * FROM events WHERE object_id = $1 ORDER BY event_id",
1102 o.object_id
1103 )
1104 .fetch_all(&self.db)
1105 .await
1106 .unwrap();
1107
1108 assert_eq!(TypeId::from_uuid(snapshots[0].type_id), *T::type_ulid());
1109 assert!(snapshots[0].is_creation);
1110 let mut object =
1111 parse_snapshot::<T>(snapshots[0].snapshot_version, snapshots[0].snapshot.clone())
1112 .unwrap();
1113 assert_eq!(
1114 snapshots[0].required_binaries,
1115 object
1116 .required_binaries()
1117 .into_iter()
1118 .map(|b| b.to_uuid())
1119 .collect::<Vec<_>>()
1120 );
1121
1122 let mut snapshot_idx = 1;
1123 let mut event_idx = 0;
1124 loop {
1125 if event_idx == events.len() {
1126 assert_eq!(snapshot_idx, snapshots.len());
1127 break;
1128 }
1129 let e = &events[event_idx];
1130 event_idx += 1;
1131 let event = serde_json::from_value::<T::Event>(e.data.clone()).unwrap();
1132 assert_eq!(
1133 event
1134 .required_binaries()
1135 .into_iter()
1136 .map(|b| b.to_uuid())
1137 .collect::<Vec<_>>(),
1138 e.required_binaries
1139 );
1140 object.apply(DbPtr::from(ObjectId::from_uuid(o.object_id)), &event);
1141 if snapshots[snapshot_idx].snapshot_id != e.event_id {
1142 continue;
1143 }
1144 let s = &snapshots[snapshot_idx];
1145 snapshot_idx += 1;
1146 assert_eq!(TypeId::from_uuid(s.type_id), *T::type_ulid());
1147 let snapshot = parse_snapshot::<T>(s.snapshot_version, s.snapshot.clone()).unwrap();
1148 assert!(object == snapshot);
1149 assert_eq!(
1150 s.required_binaries,
1151 snapshot
1152 .required_binaries()
1153 .into_iter()
1154 .map(|b| b.to_uuid())
1155 .collect::<Vec<_>>()
1156 );
1157 }
1158 if events.is_empty() {
1159 assert!(snapshots.len() == 1);
1160 } else {
1161 assert_eq!(
1162 snapshots[snapshots.len() - 1].snapshot_id,
1163 events[events.len() - 1].event_id
1164 );
1165 }
1166 assert!(snapshots[snapshots.len() - 1].is_latest);
1167 assert_eq!(
1168 snapshots[snapshots.len() - 1]
1169 .users_who_can_read
1170 .as_ref()
1171 .unwrap()
1172 .iter()
1173 .map(|u| User::from_uuid(*u))
1174 .collect::<HashSet<_>>(),
1175 object.users_who_can_read(self).await.unwrap()
1176 );
1177 }
1178 }
1179}
1180
1181type TrackedLock<'cb> = (
1182 reord::Lock,
1183 <LockPool<ObjectId> as Lockable<ObjectId, ()>>::Guard<'cb>,
1184);
1185
1186struct TrackingCanDoCallbacks<'cb, 'lockpool, C: CanDoCallbacks> {
1187 cb: &'cb C,
1188 already_taken_lock: ObjectId,
1189 object_locks: &'lockpool LockPool<ObjectId>,
1190 locks: Mutex<HashMap<ObjectId, TrackedLock<'lockpool>>>,
1191}
1192
1193impl<C: CanDoCallbacks> CanDoCallbacks for TrackingCanDoCallbacks<'_, '_, C> {
1194 async fn get<T: Object>(&self, object_id: crate::DbPtr<T>) -> crate::Result<Arc<T>> {
1195 let id = ObjectId(object_id.id);
1196 if id != self.already_taken_lock {
1197 if let hash_map::Entry::Vacant(v) = self.locks.lock().await.entry(id) {
1198 v.insert((
1199 reord::Lock::take_named(format!("{id:?}")).await,
1200 self.object_locks.async_lock(id).await,
1201 ));
1202 }
1203 }
1204 self.cb
1205 .get::<T>(DbPtr::from(ObjectId(object_id.id)))
1206 .await
1207 .wrap_with_context(|| format!("requesting {object_id:?} from database"))
1208 }
1209}
1210
1211pub type ComboLock<'a> = (
1212 reord::Lock,
1213 <lockable::LockPool<ObjectId> as lockable::Lockable<ObjectId, ()>>::Guard<'a>,
1214);
1215
1216impl<Config: crdb_core::Config> ServerSideDb for PostgresDb<Config> {
1217 type Connection = sqlx::PgConnection;
1218 type Transaction<'a> = sqlx::Transaction<'a, sqlx::Postgres>;
1219 type Lock<'a> = ComboLock<'a>;
1220
1221 fn get_users_who_can_read<'a, 'ret: 'a, T: Object, C: CanDoCallbacks>(
1223 &'ret self,
1224 object_id: ObjectId,
1225 object: &'a T,
1226 cb: &'a C,
1227 ) -> Pin<Box<dyn 'a + waaaa::Future<Output = anyhow::Result<UsersWhoCanRead<Self::Lock<'ret>>>>>>
1228 {
1229 Box::pin(async move {
1230 let cb = TrackingCanDoCallbacks::<'a, 'ret> {
1231 cb,
1232 already_taken_lock: object_id,
1233 object_locks: &self.object_locks,
1234 locks: Mutex::new(HashMap::new()),
1235 };
1236
1237 let users = object.users_who_can_read(&cb).await.with_context(|| {
1238 format!("figuring out the list of users who can read {object_id:?}")
1239 })?;
1240 let cb_locks = cb.locks.into_inner();
1241 let mut depends_on = Vec::with_capacity(cb_locks.len());
1242 let mut locks = Vec::with_capacity(cb_locks.len());
1243 for (o, l) in cb_locks {
1244 depends_on.push(o);
1245 locks.push(l);
1246 }
1247 let res = UsersWhoCanRead {
1248 users,
1249 depends_on,
1250 locks,
1251 };
1252 Ok(res)
1253 })
1254 }
1255
1256 async fn get_transaction(&self) -> crate::Result<sqlx::Transaction<'_, sqlx::Postgres>> {
1257 let mut transaction = self
1258 .db
1259 .begin()
1260 .await
1261 .wrap_context("acquiring postgresql transaction")?;
1262
1263 reord::point().await;
1265 sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
1266 .execute(&mut *transaction)
1267 .await
1268 .wrap_context("setting transaction as repeatable read")?;
1269
1270 Ok(transaction)
1271 }
1272
1273 async fn get_latest_snapshot(
1274 &self,
1275 transaction: &mut sqlx::PgConnection,
1276 user: User,
1277 object_id: ObjectId,
1278 ) -> crate::Result<Arc<serde_json::Value>> {
1279 reord::point().await;
1280 let latest_snapshot = sqlx::query!(
1281 "
1282 SELECT snapshot_id, type_id, snapshot_version, snapshot
1283 FROM snapshots
1284 WHERE object_id = $1
1285 AND is_latest
1286 AND $2 = ANY (users_who_can_read)
1287 ",
1288 object_id as ObjectId,
1289 user as User,
1290 )
1291 .fetch_optional(&mut *transaction)
1292 .await
1293 .wrap_with_context(|| format!("fetching latest snapshot for object {object_id:?}"))?;
1294 let latest_snapshot = match latest_snapshot {
1295 Some(s) => s,
1296 None => return Err(crate::Error::ObjectDoesNotExist(object_id)),
1297 };
1298 Ok(Arc::new(latest_snapshot.snapshot))
1299 }
1300
1301 async fn get_all(
1303 &self,
1304 transaction: &mut sqlx::PgConnection,
1305 user: User,
1306 object_id: ObjectId,
1307 only_updated_since: Option<Updatedness>,
1308 ) -> crate::Result<ObjectData> {
1309 let min_last_modified = only_updated_since
1310 .map(|t| EventId::from_u128(t.as_u128().saturating_add(1))) .unwrap_or(EventId::from_u128(0));
1312
1313 reord::point().await;
1315 let latest = sqlx::query!(
1316 "
1317 SELECT type_id
1318 FROM snapshots
1319 WHERE object_id = $1
1320 AND is_latest
1321 AND $2 = ANY (users_who_can_read)
1322 ",
1323 object_id as ObjectId,
1324 user as User,
1325 )
1326 .fetch_optional(&mut *transaction)
1327 .await
1328 .wrap_with_context(|| format!("checking whether {user:?} can read {object_id:?}"))?;
1329 let Some(latest) = latest else {
1330 return Err(crate::Error::ObjectDoesNotExist(object_id));
1331 };
1332 let type_id = TypeId::from_uuid(latest.type_id);
1333
1334 reord::point().await;
1335 let creation_snapshot = sqlx::query!(
1336 "
1337 SELECT snapshot_id, type_id, snapshot_version, snapshot
1338 FROM snapshots
1339 WHERE object_id = $1
1340 AND is_creation
1341 AND last_modified >= $2
1342 ",
1343 object_id as ObjectId,
1344 min_last_modified as EventId,
1345 )
1346 .fetch_optional(&mut *transaction)
1347 .await
1348 .wrap_with_context(|| format!("fetching creation snapshot for object {object_id:?}"))?;
1349
1350 reord::point().await;
1351 let events = sqlx::query!(
1352 "
1353 SELECT event_id, data
1354 FROM events
1355 WHERE object_id = $1
1356 AND last_modified >= $2
1357 ",
1358 object_id as ObjectId,
1359 min_last_modified as EventId,
1360 )
1361 .map(|r| (EventId::from_uuid(r.event_id), Arc::new(r.data)))
1362 .fetch(&mut *transaction)
1363 .try_collect::<BTreeMap<EventId, Arc<serde_json::Value>>>()
1364 .await
1365 .wrap_with_context(|| format!("fetching all events for object {object_id:?}"))?;
1366
1367 reord::point().await;
1368 let last_modified = sqlx::query!(
1369 "SELECT last_modified FROM snapshots WHERE object_id = $1 AND is_latest",
1370 object_id as ObjectId
1371 )
1372 .fetch_one(&mut *transaction)
1373 .await
1374 .wrap_context("retrieving the last_modified time")?;
1375 let last_modified = Updatedness::from_uuid(last_modified.last_modified);
1376
1377 Ok(ObjectData {
1378 object_id,
1379 type_id,
1380 creation_snapshot: creation_snapshot.map(|c| {
1381 (
1382 EventId::from_uuid(c.snapshot_id),
1383 c.snapshot_version,
1384 Arc::new(c.snapshot),
1385 )
1386 }),
1387 events,
1388 now_have_all_until: last_modified,
1389 })
1390 }
1391
1392 async fn server_query(
1393 &self,
1394 user: User,
1395 type_id: TypeId,
1396 only_updated_since: Option<Updatedness>,
1397 query: Arc<Query>,
1398 ) -> crate::Result<Vec<ObjectId>> {
1399 reord::point().await;
1400 let mut transaction = self
1401 .db
1402 .begin()
1403 .await
1404 .wrap_context("acquiring postgresql transaction")?;
1405
1406 reord::point().await;
1408 sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
1409 .execute(&mut *transaction)
1410 .await
1411 .wrap_context("setting transaction as repeatable read")?;
1412
1413 let query_sql = format!(
1414 "
1415 SELECT object_id
1416 FROM snapshots
1417 WHERE is_latest
1418 AND type_id = $1
1419 AND $2 = ANY (users_who_can_read)
1420 AND last_modified >= $3
1421 AND ({})
1422 ",
1423 where_clause(&query, 4)
1424 );
1425 let min_last_modified = only_updated_since
1426 .map(|t| EventId::from_u128(t.as_u128().saturating_add(1))) .unwrap_or(EventId::from_u128(0));
1428 reord::point().await;
1429 let mut query_sql = sqlx::query(&query_sql)
1430 .persistent(false) .bind(type_id)
1432 .bind(user)
1433 .bind(min_last_modified);
1434 for b in binds(&query)? {
1435 match b {
1436 Bind::Json(v) => query_sql = query_sql.bind(v),
1437 Bind::Str(v) => query_sql = query_sql.bind(v),
1438 Bind::String(v) => query_sql = query_sql.bind(v),
1439 Bind::Decimal(v) => query_sql = query_sql.bind(v),
1440 Bind::I32(v) => query_sql = query_sql.bind(v),
1441 }
1442 }
1443 reord::point().await;
1444 let res = query_sql
1445 .map(|row| ObjectId::from_uuid(row.get(0)))
1446 .fetch_all(&mut *transaction)
1447 .await
1448 .wrap_with_context(|| format!("listing objects matching query {query:?}"))?;
1449
1450 Ok(res)
1451 }
1452
1453 async fn server_vacuum(
1458 &self,
1459 no_new_changes_before: Option<EventId>,
1460 updatedness: Updatedness,
1461 kill_sessions_older_than: Option<SystemTime>,
1462 mut notify_recreation: impl FnMut(Update, HashSet<User>),
1463 ) -> crate::Result<()> {
1464 if let Some(t) = kill_sessions_older_than {
1466 reord::point().await;
1468 sqlx::query!(
1469 "DELETE FROM sessions WHERE last_active < $1",
1470 t.ms_since_posix()?,
1471 )
1472 .execute(&self.db)
1473 .await
1474 .wrap_context("cleaning up old sessions")?;
1475 }
1476
1477 let cache_db = self
1478 .cache_db
1479 .upgrade()
1480 .expect("Called PostgresDb::vacuum after CacheDb went away");
1481
1482 {
1483 reord::point().await;
1486 let mut objects = sqlx::query!(
1487 "
1488 SELECT DISTINCT object_id, type_id
1489 FROM snapshots
1490 WHERE (NOT (is_creation OR is_latest))
1491 OR ((NOT is_latest)
1492 AND (users_who_can_read IS NOT NULL
1493 OR users_who_can_read_depends_on IS NOT NULL
1494 OR reverse_dependents_to_update IS NOT NULL))
1495 OR ((NOT is_creation) AND snapshot_id < $1)
1496 ",
1497 no_new_changes_before.unwrap_or_else(|| EventId::from_u128(0)) as EventId
1498 )
1499 .fetch(&self.db);
1500 while let Some(row) = objects.next().await {
1501 let row = row.wrap_context("listing objects with snapshots to cleanup")?;
1502 let object_id = ObjectId::from_uuid(row.object_id);
1503 let _lock = reord::Lock::take_named(format!("{object_id:?}")).await;
1504 let _lock = self.object_locks.async_lock(object_id).await;
1505 reord::maybe_lock().await;
1506 sqlx::query(
1507 "DELETE FROM snapshots WHERE object_id = $1 AND NOT (is_creation OR is_latest)",
1508 )
1509 .bind(object_id)
1510 .execute(&self.db)
1511 .await
1512 .wrap_with_context(|| format!("deleting useless snapshots from {object_id:?}"))?;
1513 reord::maybe_lock().await;
1514 sqlx::query(
1515 "
1516 UPDATE snapshots
1517 SET users_who_can_read = NULL,
1518 users_who_can_read_depends_on = NULL,
1519 reverse_dependents_to_update = NULL
1520 WHERE object_id = $1
1521 AND NOT is_latest
1522 ",
1523 )
1524 .bind(object_id)
1525 .execute(&self.db)
1526 .await
1527 .wrap_with_context(|| format!("resetting creation snapshot of {object_id:?}"))?;
1528 reord::point().await;
1529 if let Some(event_id) = no_new_changes_before {
1530 let type_id = TypeId::from_uuid(row.type_id);
1531 reord::point().await;
1532 let recreation_result = Config::recreate_no_lock(
1533 self,
1534 type_id,
1535 object_id,
1536 event_id,
1537 updatedness,
1538 &*cache_db,
1539 )
1540 .await
1541 .wrap_with_context(|| {
1542 format!("recreating {object_id:?} at time {event_id:?}")
1543 })?;
1544 if let Some((new_created_at, snapshot_version, data, users_who_can_read)) =
1545 recreation_result
1546 {
1547 reord::point().await;
1548 notify_recreation(
1549 Update {
1550 object_id,
1551 data: UpdateData::Creation {
1552 type_id,
1553 created_at: new_created_at,
1554 snapshot_version,
1555 data: Arc::new(data),
1556 },
1557 },
1558 users_who_can_read,
1559 );
1560 }
1561 }
1562 }
1563 }
1564
1565 reord::maybe_lock().await;
1567 sqlx::query(
1568 "
1569 DELETE FROM binaries
1570 WHERE NOT EXISTS (
1571 SELECT 1 FROM snapshots WHERE binary_id = ANY(required_binaries)
1572 UNION
1573 SELECT 1 FROM events WHERE binary_id = ANY(required_binaries)
1574 )
1575 ",
1576 )
1577 .execute(&self.db)
1578 .await
1579 .wrap_context("deleting no-longer-referenced binaries")?;
1580 reord::point().await;
1581
1582 #[cfg(not(test))]
1586 sqlx::query("VACUUM ANALYZE")
1587 .execute(&self.db)
1588 .await
1589 .wrap_context("vacuuming database")?;
1590 reord::point().await;
1591
1592 Ok(())
1593 }
1594
1595 async fn recreate_at<'a, T: Object, C: CanDoCallbacks>(
1596 &'a self,
1597 object_id: ObjectId,
1598 event_id: EventId,
1599 updatedness: Updatedness,
1600 cb: &'a C,
1601 ) -> crate::Result<Option<(EventId, Arc<T>)>> {
1602 if event_id.0.timestamp_ms()
1603 > SystemTime::now()
1604 .duration_since(SystemTime::UNIX_EPOCH)
1605 .map(|t| t.as_millis())
1606 .unwrap_or(0) as u64
1607 - 1000 * 3600
1608 {
1609 tracing::warn!(
1610 "Re-creating object {object_id:?} at time {event_id:?} which is less than an hour old"
1611 );
1612 }
1613
1614 reord::point().await;
1615 let mut transaction = self
1616 .db
1617 .begin()
1618 .await
1619 .wrap_context("acquiring postgresql transaction")?;
1620
1621 reord::point().await;
1623 let creation_snapshot = sqlx::query!(
1624 "SELECT snapshot_id, type_id FROM snapshots WHERE object_id = $1 AND is_creation",
1625 object_id as ObjectId,
1626 )
1627 .fetch_optional(&mut *transaction)
1628 .await
1629 .wrap_with_context(|| {
1630 format!("getting creation snapshot of {object_id:?} for re-creation")
1631 })?
1632 .ok_or(crate::Error::ObjectDoesNotExist(object_id))?;
1633 let real_type_id = TypeId::from_uuid(creation_snapshot.type_id);
1634 let expected_type_id = *T::type_ulid();
1635 if real_type_id != expected_type_id {
1636 return Err(crate::Error::WrongType {
1637 object_id,
1638 expected_type_id,
1639 real_type_id,
1640 });
1641 }
1642 if EventId::from_uuid(creation_snapshot.snapshot_id) >= event_id {
1643 return Ok(None);
1645 }
1646
1647 reord::point().await;
1649 let event = sqlx::query!(
1650 "
1651 SELECT event_id
1652 FROM events
1653 WHERE object_id = $1
1654 AND event_id <= $2
1655 ORDER BY event_id DESC
1656 LIMIT 1
1657 ",
1658 object_id as ObjectId,
1659 event_id as EventId,
1660 )
1661 .fetch_optional(&mut *transaction)
1662 .await
1663 .wrap_with_context(|| {
1664 format!("recovering the last event for {object_id:?} before cutoff time {event_id:?}")
1665 })?;
1666 let cutoff_time = match event {
1667 None => return Ok(None), Some(e) => EventId::from_uuid(e.event_id),
1669 };
1670
1671 reord::point().await;
1673 let snapshot = sqlx::query!(
1674 "
1675 SELECT snapshot_id, snapshot_version, snapshot
1676 FROM snapshots
1677 WHERE object_id = $1
1678 AND snapshot_id <= $2
1679 ORDER BY snapshot_id DESC
1680 LIMIT 1
1681 ",
1682 object_id as ObjectId,
1683 cutoff_time as EventId,
1684 )
1685 .fetch_one(&mut *transaction)
1686 .await
1687 .wrap_with_context(|| {
1688 format!("fetching latest snapshot before {cutoff_time:?} for object {object_id:?}")
1689 })?;
1690
1691 reord::maybe_lock().await;
1693 sqlx::query("DELETE FROM snapshots WHERE object_id = $1 AND snapshot_id < $2")
1694 .bind(object_id)
1695 .bind(cutoff_time)
1696 .execute(&mut *transaction)
1697 .await
1698 .wrap_with_context(|| {
1699 format!("deleting all snapshots for {object_id:?} before {cutoff_time:?}")
1700 })?;
1701 reord::point().await;
1702
1703 let latest_object = if EventId::from_uuid(snapshot.snapshot_id) != cutoff_time {
1704 let mut object = parse_snapshot::<T>(snapshot.snapshot_version, snapshot.snapshot)
1708 .wrap_with_context(|| {
1709 format!(
1710 "parsing snapshot {:?} as {:?}",
1711 snapshot.snapshot_id,
1712 T::type_ulid()
1713 )
1714 })?;
1715
1716 let snapshot_id = EventId::from_uuid(snapshot.snapshot_id);
1717 apply_events_between(
1718 &mut transaction,
1719 &mut object,
1720 object_id,
1721 snapshot_id,
1722 cutoff_time,
1723 )
1724 .await
1725 .wrap_with_context(|| {
1726 format!(
1727 "applying on {object_id:?} events between {snapshot_id:?} and {cutoff_time:?}"
1728 )
1729 })?;
1730
1731 self.write_snapshot(
1737 &mut transaction,
1738 cutoff_time,
1739 object_id,
1740 true,
1741 false,
1742 None, &object,
1744 updatedness,
1745 cb,
1746 )
1747 .await
1748 .wrap_with_context(|| format!("writing snapshot {cutoff_time:?} for {object_id:?}"))?;
1749 object
1750 } else {
1751 reord::maybe_lock().await;
1753 let latest = sqlx::query!(
1754 "
1755 UPDATE snapshots
1756 SET is_creation = TRUE
1757 WHERE snapshot_id = $1
1758 RETURNING snapshot_version, snapshot
1759 ",
1760 cutoff_time as EventId,
1761 )
1762 .fetch_one(&mut *transaction)
1763 .await
1764 .wrap_with_context(|| {
1765 format!(
1766 "marking snapshot {cutoff_time:?} as the creation one for {object_id:?} and retrieving its data"
1767 )
1768 })?;
1769 let object = parse_snapshot::<T>(latest.snapshot_version, latest.snapshot)
1770 .wrap_context("deserializing snapshot data")?;
1771 reord::point().await;
1772 object
1773 };
1774
1775 reord::maybe_lock().await;
1777 sqlx::query("DELETE FROM events WHERE object_id = $1 AND event_id <= $2")
1778 .bind(object_id)
1779 .bind(cutoff_time)
1780 .execute(&mut *transaction)
1781 .await
1782 .wrap_with_context(|| {
1783 format!("deleting all events for {object_id:?} before {cutoff_time:?}")
1784 })?;
1785 reord::point().await;
1786
1787 reord::maybe_lock().await;
1789 let affected = sqlx::query(
1790 "UPDATE snapshots SET last_modified = $1 WHERE is_latest AND object_id = $2",
1791 )
1792 .bind(updatedness)
1793 .bind(object_id)
1794 .execute(&mut *transaction)
1795 .await
1796 .wrap_with_context(|| format!("failed marking last snapshot of {object_id:?} as modified"))?
1797 .rows_affected();
1798 reord::point().await;
1799 assert!(
1800 affected == 1,
1801 "Object {object_id:?} did not have a latest snapshot, something went very wrong"
1802 );
1803
1804 reord::maybe_lock().await;
1806 transaction.commit().await.wrap_with_context(|| {
1807 format!("committing transaction that recreated {object_id:?} at {cutoff_time:?}")
1808 })?;
1809 reord::point().await;
1810
1811 Ok(Some((cutoff_time, Arc::new(latest_object))))
1812 }
1813
1814 async fn create_and_return_rdep_changes<T: Object>(
1815 &self,
1816 object_id: ObjectId,
1817 created_at: EventId,
1818 object: Arc<T>,
1819 updatedness: Updatedness,
1820 ) -> crate::Result<Option<(Arc<T>, Vec<ReadPermsChanges>)>> {
1821 let cache_db = self
1822 .cache_db
1823 .upgrade()
1824 .expect("Called PostgresDb::create after CacheDb went away");
1825
1826 let res = self
1827 .create_impl(object_id, created_at, object, updatedness, &*cache_db)
1828 .await?;
1829 match res {
1830 Either::Left(res) => {
1831 let rdeps = self
1834 .update_rdeps(object_id, &*cache_db)
1835 .await
1836 .wrap_with_context(|| {
1837 format!("updating permissions for reverse-dependencies of {object_id:?}")
1838 })?;
1839
1840 Ok(Some((res, rdeps)))
1841 }
1842 Either::Right(None) => {
1843 Ok(None)
1845 }
1846 Either::Right(Some(_snapshot_id)) => {
1847 self.update_rdeps(object_id, &*cache_db)
1850 .await
1851 .wrap_with_context(|| {
1852 format!("updating permissions for reverse-dependencies of {object_id:?}")
1853 })?;
1854 Ok(None)
1855 }
1856 }
1857 }
1858
1859 async fn submit_and_return_rdep_changes<T: Object>(
1860 &self,
1861 object_id: ObjectId,
1862 event_id: EventId,
1863 event: Arc<T::Event>,
1864 updatedness: Updatedness,
1865 ) -> crate::Result<Option<(Arc<T>, Vec<ReadPermsChanges>)>> {
1866 let cache_db = self
1867 .cache_db
1868 .upgrade()
1869 .expect("Called PostgresDb::submit after CacheDb went away");
1870
1871 let res = self
1872 .submit_impl(object_id, event_id, event, updatedness, &*cache_db)
1873 .await?;
1874 match res {
1875 Either::Left(res) => {
1876 let rdeps = self
1879 .update_rdeps(object_id, &*cache_db)
1880 .await
1881 .wrap_with_context(|| {
1882 format!("updating permissions for reverse-dependencies of {object_id:?}")
1883 })?;
1884
1885 Ok(Some((res, rdeps)))
1886 }
1887 Either::Right(None) => {
1888 Ok(None)
1890 }
1891 Either::Right(Some(_snapshot_id)) => {
1892 self.update_rdeps(object_id, &*cache_db)
1895 .await
1896 .wrap_with_context(|| {
1897 format!("updating permissions for reverse-dependencies of {object_id:?}")
1898 })?;
1899 Ok(None)
1900 }
1901 }
1902 }
1903
1904 async fn update_pending_rdeps(&self) -> crate::Result<()> {
1905 let cache_db = self
1906 .cache_db
1907 .upgrade()
1908 .expect("Called PostgresDb::create after CacheDb went away");
1909 let cache_db = &*cache_db;
1910
1911 let mut rdep_update_res = sqlx::query!(
1912 "
1913 SELECT object_id
1914 FROM snapshots
1915 WHERE array_length(reverse_dependents_to_update, 1) IS NOT NULL
1916 "
1917 )
1918 .map(|r| ObjectId::from_uuid(r.object_id))
1919 .fetch(&self.db)
1920 .map(|object_id| async move {
1921 let object_id = object_id.wrap_context("listing updates with rdeps to update")?;
1922 self.update_rdeps(object_id, cache_db)
1923 .await
1924 .wrap_with_context(|| format!("updating rdeps of {object_id:?}"))
1925 })
1926 .buffered(32);
1927 while let Some(res) = rdep_update_res.next().await {
1928 res?;
1929 }
1930 Ok(())
1931 }
1932
1933 async fn login_session(&self, session: Session) -> crate::Result<(SessionToken, SessionRef)> {
1934 let token = SessionToken::new();
1935 sqlx::query("INSERT INTO sessions VALUES ($1, $2, $3, $4, $5, $6, $7)")
1936 .bind(token)
1937 .bind(session.session_ref)
1938 .bind(session.user_id)
1939 .bind(&session.session_name)
1940 .bind(session.login_time.ms_since_posix()?)
1941 .bind(session.last_active.ms_since_posix()?)
1942 .bind(
1943 session
1944 .expiration_time
1945 .map(|t| t.ms_since_posix())
1946 .transpose()?,
1947 )
1948 .execute(&self.db)
1949 .await
1950 .wrap_with_context(|| {
1951 format!("logging in new session {token:?} with data {session:?}")
1952 })?;
1953 Ok((token, session.session_ref))
1954 }
1955
1956 async fn resume_session(&self, token: SessionToken) -> crate::Result<Session> {
1957 let res = sqlx::query!(
1958 "SELECT * FROM sessions WHERE session_token = $1",
1959 token as SessionToken
1960 )
1961 .fetch_optional(&self.db)
1962 .await
1963 .wrap_with_context(|| format!("resuming session for {token:?}"))?;
1964 let Some(res) = res else {
1965 return Err(crate::Error::InvalidToken(token));
1966 };
1967 let expiration_time = res
1968 .expiration_time
1969 .map(SystemTime::from_ms_since_posix)
1970 .transpose()
1971 .expect("negative timestamp made its way into database");
1972 if expiration_time
1973 .map(|t| t < SystemTime::now())
1974 .unwrap_or(false)
1975 {
1976 return Err(crate::Error::InvalidToken(token));
1977 }
1978 Ok(Session {
1979 user_id: User::from_uuid(res.user_id),
1980 session_ref: SessionRef::from_uuid(res.session_ref),
1981 session_name: res.name,
1982 login_time: SystemTime::from_ms_since_posix(res.login_time)
1983 .expect("negative timestamp made its way into database"),
1984 last_active: SystemTime::from_ms_since_posix(res.last_active)
1985 .expect("negative timestamp made its way into database"),
1986 expiration_time,
1987 })
1988 }
1989
1990 async fn mark_session_active(&self, token: SessionToken, at: SystemTime) -> crate::Result<()> {
1991 let affected = sqlx::query("UPDATE sessions SET last_active = $1 WHERE session_token = $2")
1992 .bind(at.ms_since_posix()?)
1993 .bind(token)
1994 .execute(&self.db)
1995 .await
1996 .wrap_with_context(|| format!("marking session {token:?} as active as of {at:?}"))?
1997 .rows_affected();
1998 if affected != 1 {
1999 return Err(crate::Error::InvalidToken(token));
2000 }
2001 Ok(())
2002 }
2003
2004 async fn rename_session<'a>(
2005 &'a self,
2006 token: SessionToken,
2007 new_name: &'a str,
2008 ) -> crate::Result<()> {
2009 let affected = sqlx::query("UPDATE sessions SET name = $1 WHERE session_token = $2")
2010 .bind(new_name)
2011 .bind(token)
2012 .execute(&self.db)
2013 .await
2014 .wrap_with_context(|| format!("renaming session {token:?} into {new_name:?}"))?
2015 .rows_affected();
2016 if affected != 1 {
2017 return Err(crate::Error::InvalidToken(token));
2018 }
2019 Ok(())
2020 }
2021
2022 async fn list_sessions(&self, user: User) -> crate::Result<Vec<Session>> {
2023 let rows = sqlx::query!("SELECT * FROM sessions WHERE user_id = $1", user as User)
2024 .fetch_all(&self.db)
2025 .await
2026 .wrap_with_context(|| format!("listing sessions for {user:?}"))?;
2027 let sessions = rows
2028 .into_iter()
2029 .map(|r| Session {
2030 user_id: User::from_uuid(r.user_id),
2031 session_ref: SessionRef::from_uuid(r.session_ref),
2032 session_name: r.name,
2033 login_time: SystemTime::from_ms_since_posix(r.login_time)
2034 .expect("negative timestamp made its way into database"),
2035 last_active: SystemTime::from_ms_since_posix(r.last_active)
2036 .expect("negative timestamp made its way into database"),
2037 expiration_time: r
2038 .expiration_time
2039 .map(SystemTime::from_ms_since_posix)
2040 .transpose()
2041 .expect("negative timestamp made its way into database"),
2042 })
2043 .collect();
2044 Ok(sessions)
2045 }
2046
2047 async fn disconnect_session(&self, user: User, session: SessionRef) -> crate::Result<()> {
2048 sqlx::query("DELETE FROM sessions WHERE user_id = $1 AND session_ref = $2")
2049 .bind(user)
2050 .bind(session)
2051 .execute(&self.db)
2052 .await
2053 .wrap_with_context(|| format!("disconnecting session {session:?}"))?;
2054 Ok(())
2056 }
2057}
2058
2059async fn check_required_binaries(
2060 t: &mut sqlx::PgConnection,
2061 mut binaries: Vec<BinPtr>,
2062) -> crate::Result<()> {
2063 reord::maybe_lock().await;
2065 let present_ids =
2066 sqlx::query("SELECT binary_id FROM binaries WHERE binary_id = ANY ($1) FOR KEY SHARE")
2067 .bind(&binaries)
2068 .fetch_all(&mut *t)
2069 .await
2070 .wrap_context("listing binaries already present in database")?;
2071 reord::point().await;
2072 binaries.retain(|b| {
2073 present_ids
2074 .iter()
2075 .all(|i| i.get::<uuid::Uuid, _>(0) != b.to_uuid())
2076 });
2077 if !binaries.is_empty() {
2078 return Err(crate::Error::MissingBinaries(binaries));
2079 }
2080 Ok(())
2081}
2082
2083async fn apply_events_between<T: Object>(
2086 transaction: &mut sqlx::PgConnection,
2087 object: &mut T,
2088 object_id: ObjectId,
2089 from: EventId,
2090 to: EventId,
2091) -> anyhow::Result<()> {
2092 reord::point().await;
2093 let mut events = sqlx::query!(
2094 "
2095 SELECT event_id, data
2096 FROM events
2097 WHERE object_id = $1
2098 AND event_id > $2
2099 AND event_id <= $3
2100 ORDER BY event_id ASC
2101 ",
2102 object_id as ObjectId,
2103 from as EventId,
2104 to as EventId,
2105 )
2106 .fetch(&mut *transaction);
2107 while let Some(e) = events.next().await {
2108 let e = e.with_context(|| {
2109 format!("fetching all events for {object_id:?} betwen {from:?} and {to:?}")
2110 })?;
2111 let e = serde_json::from_value::<T::Event>(e.data).with_context(|| {
2112 format!(
2113 "parsing event {:?} of type {:?}",
2114 e.event_id,
2115 T::type_ulid()
2116 )
2117 })?;
2118
2119 object.apply(DbPtr::from(object_id), &e);
2120 }
2121 Ok(())
2122}
2123
2124pub fn where_clause(this: &Query, first_idx: usize) -> String {
2125 let mut res = String::new();
2126 let mut bind_idx = first_idx;
2127 add_to_where_clause(&mut res, &mut bind_idx, this);
2128 res
2129}
2130
2131pub fn binds(this: &Query) -> crate::Result<Vec<Bind<'_>>> {
2132 let mut res = Vec::new();
2133 add_to_binds(&mut res, this)?;
2134 Ok(res)
2135}
2136
2137fn add_to_where_clause(res: &mut String, bind_idx: &mut usize, query: &Query) {
2138 let mut initial_bind_idx = *bind_idx;
2139 match query {
2140 Query::All(v) => {
2141 res.push_str("TRUE");
2142 for q in v {
2143 res.push_str(" AND (");
2144 add_to_where_clause(&mut *res, &mut *bind_idx, q);
2145 res.push(')');
2146 }
2147 }
2148 Query::Any(v) => {
2149 res.push_str("FALSE");
2150 for q in v {
2151 res.push_str(" OR (");
2152 add_to_where_clause(&mut *res, &mut *bind_idx, q);
2153 res.push(')');
2154 }
2155 }
2156 Query::Not(q) => {
2157 res.push_str("NOT (");
2158 add_to_where_clause(&mut *res, &mut *bind_idx, q);
2159 res.push(')');
2160 }
2161 Query::Eq(path, _) => {
2162 res.push_str("COALESCE(");
2163 add_path_to_clause(&mut *res, &mut *bind_idx, path);
2164 res.push_str(&format!(" = ${}, FALSE)", bind_idx));
2165 *bind_idx += 1;
2166 }
2167 Query::Le(path, _) => {
2168 res.push_str("CASE WHEN jsonb_typeof(");
2169 add_path_to_clause(&mut *res, &mut *bind_idx, path);
2170 res.push_str(") = 'number' THEN (");
2171 add_path_to_clause(&mut *res, &mut initial_bind_idx, path);
2172 res.push_str(&format!(")::numeric <= ${} ELSE FALSE END", bind_idx));
2173 *bind_idx += 1;
2174 }
2175 Query::Lt(path, _) => {
2176 res.push_str("CASE WHEN jsonb_typeof(");
2177 add_path_to_clause(&mut *res, &mut *bind_idx, path);
2178 res.push_str(") = 'number' THEN (");
2179 add_path_to_clause(&mut *res, &mut initial_bind_idx, path);
2180 res.push_str(&format!(")::numeric < ${} ELSE FALSE END", bind_idx));
2181 *bind_idx += 1;
2182 }
2183 Query::Ge(path, _) => {
2184 res.push_str("CASE WHEN jsonb_typeof(");
2185 add_path_to_clause(&mut *res, &mut *bind_idx, path);
2186 res.push_str(") = 'number' THEN (");
2187 add_path_to_clause(&mut *res, &mut initial_bind_idx, path);
2188 res.push_str(&format!(")::numeric >= ${} ELSE FALSE END", bind_idx));
2189 *bind_idx += 1;
2190 }
2191 Query::Gt(path, _) => {
2192 res.push_str("CASE WHEN jsonb_typeof(");
2193 add_path_to_clause(&mut *res, &mut *bind_idx, path);
2194 res.push_str(") = 'number' THEN (");
2195 add_path_to_clause(&mut *res, &mut initial_bind_idx, path);
2196 res.push_str(&format!(")::numeric > ${} ELSE FALSE END", bind_idx));
2197 *bind_idx += 1;
2198 }
2199 Query::Contains(path, _) => {
2200 res.push_str("COALESCE(");
2201 add_path_to_clause(&mut *res, &mut *bind_idx, path);
2202 res.push_str(&format!(" @> ${}, FALSE)", bind_idx));
2203 *bind_idx += 1;
2204 }
2205 Query::ContainsStr(path, pat) => {
2206 res.push_str("COALESCE(to_tsvector(");
2207 add_path_to_clause(&mut *res, &mut *bind_idx, path);
2208 let or_empty_pat = match pat.chars().all(|c| c == ' ') {
2211 true => "IS NOT NULL",
2212 false => "",
2213 };
2214 res.push_str(&format!(
2215 "->'_crdb-normalized') @@ phraseto_tsquery(${}) {or_empty_pat}, FALSE)",
2216 bind_idx,
2217 ));
2218 *bind_idx += 1;
2219 }
2220 }
2221}
2222
2223fn add_path_to_clause(res: &mut String, bind_idx: &mut usize, path: &[JsonPathItem]) {
2224 if let Some(JsonPathItem::Id(i)) = path.last() {
2225 if *i == -1 || *i == 0 {
2226 res.push_str("CASE WHEN jsonb_typeof(snapshot");
2229 for (i, _) in path[..path.len() - 1].iter().enumerate() {
2230 res.push_str(&format!("->${}", *bind_idx + i));
2231 }
2232 res.push_str(") = 'array' THEN ")
2233 }
2234 }
2235 res.push_str("snapshot");
2236 for _ in path {
2237 res.push_str(&format!("->${bind_idx}"));
2238 *bind_idx += 1;
2239 }
2240 if let Some(JsonPathItem::Id(i)) = path.last() {
2241 if *i == -1 || *i == 0 {
2242 res.push_str(" ELSE NULL END");
2243 }
2244 }
2245}
2246
2247fn add_path_to_binds<'a>(res: &mut Vec<Bind<'a>>, path: &'a [JsonPathItem]) {
2248 for p in path {
2249 match p {
2250 JsonPathItem::Key(k) => res.push(Bind::Str(k)),
2251 JsonPathItem::Id(i) => res.push(Bind::I32(*i)),
2252 }
2253 }
2254}
2255
2256pub enum Bind<'a> {
2257 Json(&'a serde_json::Value),
2258 Str(&'a str),
2259 String(String),
2260 Decimal(Decimal),
2261 I32(i32),
2262}
2263
2264fn add_to_binds<'a>(res: &mut Vec<Bind<'a>>, query: &'a Query) -> crate::Result<()> {
2265 match query {
2266 Query::All(v) => {
2267 for q in v {
2268 add_to_binds(&mut *res, q)?;
2269 }
2270 }
2271 Query::Any(v) => {
2272 for q in v {
2273 add_to_binds(&mut *res, q)?;
2274 }
2275 }
2276 Query::Not(q) => {
2277 add_to_binds(&mut *res, q)?;
2278 }
2279 Query::Eq(p, v) => {
2280 add_path_to_binds(&mut *res, p);
2281 res.push(Bind::Json(v));
2282 }
2283 Query::Le(p, v) => {
2284 add_path_to_binds(&mut *res, p);
2285 res.push(Bind::Decimal(*v));
2286 }
2287 Query::Lt(p, v) => {
2288 add_path_to_binds(&mut *res, p);
2289 res.push(Bind::Decimal(*v));
2290 }
2291 Query::Ge(p, v) => {
2292 add_path_to_binds(&mut *res, p);
2293 res.push(Bind::Decimal(*v));
2294 }
2295 Query::Gt(p, v) => {
2296 add_path_to_binds(&mut *res, p);
2297 res.push(Bind::Decimal(*v));
2298 }
2299 Query::Contains(p, v) => {
2300 add_path_to_binds(&mut *res, p);
2301 res.push(Bind::Json(v));
2302 }
2303 Query::ContainsStr(p, v) => {
2304 add_path_to_binds(&mut *res, p);
2305 crdb_core::check_string(v)?;
2306 res.push(Bind::String(crdb_core::normalize(v)));
2307 }
2308 }
2309 Ok(())
2310}