crdb_postgres/lib.rs

use anyhow::{anyhow, Context};
use crdb_cache::CacheDb;
use crdb_core::{
    normalizer_version, BinPtr, CanDoCallbacks, Db, DbPtr, Event, EventId, Importance, Object,
    ObjectData, ObjectId, Query, ResultExt, Session, SessionRef, SessionToken, SystemTimeExt,
    TypeId, Update, UpdateData, Updatedness, User, UsersWhoCanRead,
};
use crdb_core::{Decimal, JsonPathItem, ReadPermsChanges, ServerSideDb};
use crdb_helpers::parse_snapshot;
use futures::{future::Either, StreamExt, TryStreamExt};
use lockable::{LockPool, Lockable};
use sqlx::Row;
use std::{
    collections::{hash_map, BTreeMap, HashMap, HashSet},
    marker::PhantomData,
    pin::Pin,
    sync::{Arc, Weak},
    time::SystemTime,
};
use tokio::sync::Mutex;

#[cfg(test)]
mod tests;

pub use crdb_core::{Error, Result};

pub struct PostgresDb<Config: crdb_core::Config> {
    db: sqlx::PgPool,
    cache_db: Weak<CacheDb<PostgresDb<Config>>>,
    event_locks: LockPool<EventId>,
    // TODO(perf-high): make into a RwLockPool, which locks the snapshot fields
    // create and submit take a write(), get and update_users_who_can_read
    // take a read(). This should significantly improve performance
    object_locks: LockPool<ObjectId>,
    _phantom: PhantomData<Config>,
}

impl<Config: crdb_core::Config> PostgresDb<Config> {
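    /// Connect to the database backing `db`, run any pending migrations, and wrap the
    /// resulting `PostgresDb` in a `CacheDb` configured with `cache_watermark`.
    ///
    /// A minimal usage sketch; the pool options, the connection URL and the `MyConfig`
    /// type are illustrative assumptions, not part of this crate:
    ///
    /// ```rust,ignore
    /// let pool = sqlx::postgres::PgPoolOptions::new()
    ///     .max_connections(8)
    ///     .connect("postgres://user:password@localhost/crdb")
    ///     .await?;
    /// let db = PostgresDb::<MyConfig>::connect(pool, 1024).await?;
    /// ```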
    pub async fn connect(
        db: sqlx::PgPool,
        cache_watermark: usize,
    ) -> anyhow::Result<Arc<CacheDb<PostgresDb<Config>>>> {
        sqlx::migrate!("./migrations")
            .run(&db)
            .await
            .context("running migrations on postgresql database")?;
        let cache_db = Arc::new_cyclic(|cache_db| {
            let db = PostgresDb {
                db,
                cache_db: cache_db.clone(),
                event_locks: LockPool::new(),
                object_locks: LockPool::new(),
                _phantom: PhantomData,
            };
            CacheDb::new(db, cache_watermark)
        });
        Ok(cache_db)
    }

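    /// Re-encode the snapshot identified by `snapshot_id` with the current normalizer and
    /// snapshot versions, if it is still outdated. Missing or already-up-to-date snapshots
    /// are silently ignored.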
    async fn reencode<T: Object>(
        &self,
        object_id: ObjectId,
        snapshot_id: EventId,
    ) -> crate::Result<()> {
        // First, take a lock on the object, so that no new event comes in during
        // execution that would be overwritten by this select-then-update
        let _lock = self.object_locks.async_lock(object_id).await;

        // Then, read the snapshot, knowing it can't change under our feet
        let Some(s) = sqlx::query!(
            "
                SELECT normalizer_version, snapshot_version, snapshot
                FROM snapshots
                WHERE snapshot_id = $1
            ",
            snapshot_id as EventId,
        )
        .fetch_optional(&self.db)
        .await
        .wrap_with_context(|| format!("fetching snapshot {snapshot_id:?}"))?
        else {
            // No such snapshot. It was most likely deleted by a concurrent vacuum or similar. Ignore.
            return Ok(());
        };

        // If it is already up-to-date (e.g. it has been rewritten by an incoming event), ignore
        if s.normalizer_version >= normalizer_version()
            && s.snapshot_version >= T::snapshot_version()
        {
            return Ok(());
        }

        // If not, we still need to parse-and-reencode it
        let snapshot = parse_snapshot::<T>(s.snapshot_version, s.snapshot)
            .wrap_with_context(|| format!("parsing snapshot {snapshot_id:?}"))?;
        sqlx::query(
            "
                UPDATE snapshots
                SET snapshot = $1, snapshot_version = $2, normalizer_version = $3
                WHERE snapshot_id = $4
            ",
        )
        .bind(sqlx::types::Json(&snapshot))
        .bind(T::snapshot_version())
        .bind(normalizer_version())
        .bind(snapshot_id)
        .execute(&self.db)
        .await
        .wrap_with_context(|| format!("re-encoding snapshot data for {snapshot_id:?}"))?;

        Ok(())
    }

    /// Returns the list of all reverse-dependencies of `object_id`
    async fn get_rdeps<'a, E: sqlx::Executor<'a, Database = sqlx::Postgres>>(
        &self,
        connection: E,
        object_id: ObjectId,
    ) -> anyhow::Result<Vec<ObjectId>> {
        reord::point().await;
        let rdeps = sqlx::query!(
            "
                SELECT object_id
                FROM snapshots
                WHERE $1 = ANY (users_who_can_read_depends_on)
                AND is_latest
                AND object_id != $1
            ",
            object_id as ObjectId,
        )
        .map(|o| ObjectId::from_uuid(o.object_id))
        .fetch_all(connection)
        .await
        .with_context(|| format!("fetching the list of reverse-dependencies for {object_id:?}"))?;
        Ok(rdeps)
    }

    /// Update the list of users who can read `object_id`
    ///
    /// This takes a lock on `object_id`, making sure the object is not modified while this executes.
    pub async fn update_users_who_can_read<C: CanDoCallbacks>(
        &self,
        requested_by: ObjectId,
        object_id: ObjectId,
        cb: &C,
    ) -> anyhow::Result<ReadPermsChanges> {
        reord::point().await;
        let mut transaction = self
            .db
            .begin()
            .await
            .wrap_context("acquiring postgresql transaction")?;

        // Take the locks
        let _lock = (
            reord::Lock::take_named(format!("{object_id:?}")).await,
            self.object_locks.async_lock(object_id).await,
        );

        // Retrieve the snapshot
        reord::point().await;
        let res = sqlx::query!(
            "
                SELECT type_id, snapshot_version, snapshot, users_who_can_read FROM snapshots
                WHERE object_id = $1
                AND is_latest
            ",
            object_id as ObjectId,
        )
        .fetch_one(&mut *transaction)
        .await
        .with_context(|| format!("fetching latest snapshot for object {object_id:?}"))?;
        let type_id = TypeId::from_uuid(res.type_id);
        let users_who_can_read_before = res
            .users_who_can_read
            .ok_or_else(|| {
                crate::Error::Other(anyhow!("Latest snapshot had NULL users_who_can_read"))
            })?
            .into_iter()
            .map(User::from_uuid)
            .collect::<HashSet<User>>();

        // Figure out the new value of users_who_can_read
        let can_read = Config::get_users_who_can_read(
            self,
            object_id,
            type_id,
            res.snapshot_version,
            res.snapshot,
            cb,
        )
        .await
        .with_context(|| format!("updating users_who_can_read cache of {object_id:?}"))?;
        let users_who_can_read_after = can_read.users.iter().copied().collect::<HashSet<User>>();

        // Save it
        reord::maybe_lock().await;
        let affected = sqlx::query(
            "
                UPDATE snapshots
                SET users_who_can_read = $1,
                    users_who_can_read_depends_on = $2
                WHERE object_id = $3
                AND is_latest
            ",
        )
        .bind(can_read.users.into_iter().collect::<Vec<_>>())
        .bind(&can_read.depends_on)
        .bind(object_id)
        .execute(&mut *transaction)
        .await
        .with_context(|| {
            format!("updating users_who_can_read in latest snapshot for {object_id:?}")
        })?
        .rows_affected();
        reord::point().await;
        anyhow::ensure!(
            affected == 1,
            "Failed to update latest snapshot of users_who_can_read"
        );

        // If needed, take a lock on the requester to update its requested-updates field
        let _lock = if !can_read.depends_on.iter().any(|o| *o == requested_by) {
            Some((
                reord::Lock::take_named(format!("{requested_by:?}")).await,
                self.object_locks.async_lock(requested_by).await,
            ))
        } else {
            None
        };

        // Remove the request to update
        // TODO(misc-low): Consider switching to serializable transactions only and discarding locking. This would
        // remove the requirement on `Object::users_who_can_read` that `.get()` must always be called in the same
        // order. Maybe make serializable transactions a feature of this crate? If not we should at least provide
        // an easy way for users to fuzz their Object implementation for these deadlock conditions
        reord::maybe_lock().await;
        let affected = sqlx::query(
            "
                UPDATE snapshots
                SET reverse_dependents_to_update = array_remove(reverse_dependents_to_update, $1)
                WHERE object_id = $2 AND is_latest
            ",
        )
        .bind(object_id)
        .bind(requested_by)
        .execute(&mut *transaction)
        .await
        .with_context(|| {
            format!(
                "removing {object_id:?} from the list of rev-deps of {requested_by:?} to update"
            )
        })?
        .rows_affected();
        reord::point().await;
        anyhow::ensure!(
            affected == 1,
            "Failed to mark reverse dependent {object_id:?} of {requested_by:?} as updated"
        );

        reord::point().await;
        transaction.commit().await.wrap_with_context(|| {
            format!("committing transaction that updated the rdeps of {object_id:?}")
        })?;
        reord::point().await;

        // TODO(perf-low): there must be some libstd function to compute the two at once?
        // but symmetric_difference doesn't seem to indicate which set the value came from
        let lost_read = users_who_can_read_before
            .difference(&users_who_can_read_after)
            .copied()
            .collect();
        let gained_read = users_who_can_read_after
            .difference(&users_who_can_read_before)
            .copied()
            .collect();
        Ok(ReadPermsChanges {
            object_id,
            type_id,
            lost_read,
            gained_read,
        })
    }

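    /// Recompute `users_who_can_read` for every reverse-dependent of `object_id` that is
    /// still flagged in `reverse_dependents_to_update`, returning the resulting permission
    /// changes.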
    async fn update_rdeps<C: CanDoCallbacks>(
        &self,
        object_id: ObjectId,
        cb: &C,
    ) -> anyhow::Result<Vec<ReadPermsChanges>> {
        // This is NOT in the same transaction as the creation/submission of the object!
        // The reason is that CanDoCallbacks reads data outside of the transaction, so it would otherwise
        // miss the changes brought in by the transaction.
        // An alternative would be to have CanDoCallbacks run inside the transaction itself; but let's keep the current behavior of lazily
        // recomputing users_who_can_read upon reverse-dependent event submission.

        let rdeps = sqlx::query!(
            r#"
                SELECT UNNEST(reverse_dependents_to_update) AS "rdep!"
                FROM snapshots
                WHERE object_id = $1
                AND reverse_dependents_to_update IS NOT NULL
            "#,
            object_id as ObjectId,
        )
        .map(|r| ObjectId::from_uuid(r.rdep))
        .fetch_all(&self.db)
        .await
        .wrap_with_context(|| {
            format!("listing reverse dependents of {object_id:?} that need updating")
        })?;
        let mut res = Vec::with_capacity(rdeps.len());
        for o in rdeps {
            if o != object_id {
                let changes = self.update_users_who_can_read(object_id, o, cb)
                    .await
                    .with_context(|| format!("updating users_who_can_read field for {o:?} on behalf of {object_id:?}"))?;
                res.push(changes);
            }
        }
        Ok(res)
    }

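    /// Insert a single snapshot row for `object_id`.
    ///
    /// `users_who_can_read` is only computed (and its dependency locks returned) when
    /// `is_latest` is set; `rdeps` must be provided for latest snapshots.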
    #[allow(clippy::too_many_arguments)] // TODO(misc-low): refactor to have a proper struct
    async fn write_snapshot<'a, T: Object, C: CanDoCallbacks>(
        &'a self,
        transaction: &mut sqlx::PgConnection,
        snapshot_id: EventId,
        object_id: ObjectId,
        is_creation: bool,
        is_latest: bool,
        rdeps: Option<&[ObjectId]>,
        object: &T,
        updatedness: Updatedness,
        cb: &'a C,
    ) -> crate::Result<Vec<ComboLock<'a>>> {
        let can_read = if is_latest {
            Some(
                self.get_users_who_can_read::<T, _>(object_id, object, cb)
                    .await
                    .wrap_with_context(|| {
                        format!(
                        "listing users who can read for snapshot {snapshot_id:?} of {object_id:?}"
                    )
                    })?,
            )
        } else {
            None
        };
        assert!(
            !is_latest || rdeps.is_some(),
            "Latest snapshots must always list their reverse dependencies"
        );

        reord::maybe_lock().await;
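        // The positional binds below are, in order: snapshot_id, type_id, object_id,
        // is_creation, is_latest, normalizer_version, snapshot_version, snapshot,
        // users_who_can_read, users_who_can_read_depends_on, reverse_dependents_to_update,
        // required_binaries and the updatedness (presumably the last_modified column).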
        let result = sqlx::query(
            "INSERT INTO snapshots VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(snapshot_id)
        .bind(T::type_ulid())
        .bind(object_id)
        .bind(is_creation)
        .bind(is_latest)
        .bind(normalizer_version())
        .bind(T::snapshot_version())
        .bind(sqlx::types::Json(object))
        .bind(
            can_read
                .as_ref()
                .map(|u| u.users.iter().cloned().collect::<Vec<_>>()),
        )
        .bind(can_read.as_ref().map(|u| &u.depends_on))
        .bind(rdeps)
        .bind(object.required_binaries())
        .bind(updatedness)
        .execute(&mut *transaction)
        .await;
        reord::point().await;

        match result {
            Ok(_) => Ok(can_read.map(|c| c.locks).unwrap_or_else(Vec::new)),
            Err(sqlx::Error::Database(err)) if err.constraint() == Some("snapshots_pkey") => {
                Err(crate::Error::EventAlreadyExists(snapshot_id))
            }
            Err(e) => Err(e)
                .wrap_with_context(|| format!("inserting snapshot {snapshot_id:?} into table")),
        }
    }

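    /// Create `object_id` at `created_at`, returning `Either::Left(object)` on a fresh
    /// creation. If an identical object already existed, returns `Either::Right`, with
    /// `Some(created_at)` when that object still has reverse-dependents left to update.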
    async fn create_impl<T: Object, C: CanDoCallbacks>(
        &self,
        object_id: ObjectId,
        created_at: EventId,
        object: Arc<T>,
        updatedness: Updatedness,
        cb: &C,
    ) -> crate::Result<Either<Arc<T>, Option<EventId>>> {
        reord::point().await;
        let mut transaction = self
            .db
            .begin()
            .await
            .wrap_context("acquiring postgresql transaction")?;

        // Acquire the locks required to create the object
        let _lock = reord::Lock::take_named(format!("{created_at:?}")).await;
        let _lock = self.event_locks.async_lock(created_at).await;
        let _lock = reord::Lock::take_named(format!("{object_id:?}")).await;
        let _lock = self.object_locks.async_lock(object_id).await;
        reord::point().await;

        // Object ID uniqueness is enforced by the `snapshot_creations` unique index
        let type_id = *T::type_ulid();
        let snapshot_version = T::snapshot_version();
        let object_json = sqlx::types::Json(&object);
        let can_read = self
            .get_users_who_can_read(object_id, &*object, cb)
            .await
            .wrap_with_context(|| format!("listing users who can read object {object_id:?}"))?;
        let rdeps = self
            .get_rdeps(&mut *transaction, object_id)
            .await
            .wrap_with_context(|| format!("listing reverse dependencies of {object_id:?}"))?;
        let required_binaries = object.required_binaries();
        reord::maybe_lock().await;
        let affected = // PostgreSQL needs a lock on the unique index from here until transaction completion, hence the above reord::Lock
            sqlx::query("INSERT INTO snapshots VALUES ($1, $2, $3, TRUE, TRUE, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING")
                .bind(created_at)
                .bind(type_id)
                .bind(object_id)
                .bind(normalizer_version())
                .bind(snapshot_version)
                .bind(object_json)
                .bind(can_read.users.iter().copied().collect::<Vec<_>>())
                .bind(&can_read.depends_on)
                .bind(&rdeps)
                .bind(&required_binaries)
                .bind(updatedness)
                .execute(&mut *transaction)
                .await
                .wrap_with_context(|| format!("inserting snapshot {created_at:?}"))?
                .rows_affected();
        reord::point().await;
        if affected != 1 {
            // Check for equality with pre-existing
            reord::point().await;
            let rdeps_to_update = sqlx::query!(
                "
                    SELECT array_length(reverse_dependents_to_update, 1) AS num_rdeps
                    FROM snapshots
                    WHERE snapshot_id = $1
                    AND type_id = $2
                    AND object_id = $3
                    AND is_creation
                    AND snapshot_version = $4
                    AND snapshot = $5
                ",
                created_at as EventId,
                *T::type_ulid() as TypeId,
                object_id as ObjectId,
                snapshot_version,
                object_json as _,
            )
            .fetch_optional(&mut *transaction)
            .await
            .wrap_with_context(|| {
                format!("checking pre-existing snapshot for {created_at:?} is the same")
            })?;
            let Some(rdeps_to_update) = rdeps_to_update else {
                // There is a conflict. Is it an object conflict or an event conflict?
                reord::point().await;
                let object_exists_affected =
                    sqlx::query("SELECT 1 FROM snapshots WHERE object_id = $1")
                        .bind(object_id)
                        .execute(&mut *transaction)
                        .await
                        .wrap_with_context(|| {
                            format!("checking whether {object_id:?} already exists")
                        })?
                        .rows_affected();
                return if object_exists_affected >= 1 {
                    Err(crate::Error::ObjectAlreadyExists(object_id))
                } else {
                    Err(crate::Error::EventAlreadyExists(created_at))
                };
            };

            return Ok(Either::Right(
                rdeps_to_update.num_rdeps.is_some().then_some(created_at),
            ));
        }

        // We just inserted. Check that no event existed at this id
        reord::point().await;
        let affected = sqlx::query("SELECT event_id FROM events WHERE event_id = $1")
            .bind(created_at)
            .execute(&mut *transaction)
            .await
            .wrap_context("checking that no event existed with this id yet")?
            .rows_affected();
        if affected != 0 {
            return Err(crate::Error::EventAlreadyExists(created_at));
        }

        // Check that all required binaries are present, always as the last lock obtained in the transaction
        check_required_binaries(&mut transaction, required_binaries)
            .await
            .wrap_with_context(|| {
                format!("checking that all binaries for object {object_id:?} are already present")
            })?;

        reord::point().await;
        transaction
            .commit()
            .await
            .wrap_with_context(|| format!("committing transaction that created {object_id:?}"))?;
        reord::point().await;

        Ok(Either::Left(object))
    }

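    /// Submit `event` on `object_id`, returning `Either::Left` with the new latest snapshot
    /// on a fresh submission. If an identical event already existed, returns `Either::Right`,
    /// with the latest snapshot id when that snapshot still has reverse-dependents to update.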
    async fn submit_impl<T: Object, C: CanDoCallbacks>(
        &self,
        object_id: ObjectId,
        event_id: EventId,
        event: Arc<T::Event>,
        updatedness: Updatedness,
        cb: &C,
    ) -> crate::Result<Either<Arc<T>, Option<EventId>>> {
        reord::point().await;
        let mut transaction = self
            .db
            .begin()
            .await
            .wrap_context("acquiring postgresql transaction")?;

        // Acquire the locks required to submit the event
        let _lock = reord::Lock::take_named(format!("{event_id:?}")).await;
        let _lock = self.event_locks.async_lock(event_id).await;
        let _lock = reord::Lock::take_named(format!("{object_id:?}")).await;
        let _lock = self.object_locks.async_lock(object_id).await;
        reord::point().await;

        // Check the object does exist, is of the right type and is not too new
        reord::point().await;
        let creation_snapshot = sqlx::query!(
            "SELECT snapshot_id, type_id FROM snapshots WHERE object_id = $1 AND is_creation",
            object_id as ObjectId,
        )
        .fetch_optional(&mut *transaction)
        .await
        .wrap_with_context(|| format!("locking object {object_id:?} in database"))?;
        reord::point().await;
        match creation_snapshot {
            None => {
                return Err(crate::Error::ObjectDoesNotExist(object_id));
            }
            Some(s) if TypeId::from_uuid(s.type_id) != *T::type_ulid() => {
                return Err(crate::Error::WrongType {
                    object_id,
                    expected_type_id: *T::type_ulid(),
                    real_type_id: TypeId::from_uuid(s.type_id),
                })
            }
            Some(s) if s.snapshot_id >= event_id.to_uuid() => {
                return Err(crate::Error::EventTooEarly {
                    event_id,
                    object_id,
                    created_at: EventId::from_uuid(s.snapshot_id),
                });
            }
            _ => (),
        }

        // Insert the event itself
        let event_json = sqlx::types::Json(&event);
        let required_binaries = event.required_binaries();
        reord::maybe_lock().await;
        let affected =
            sqlx::query("INSERT INTO events VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING")
                .bind(event_id)
                .bind(object_id)
                .bind(event_json)
                .bind(required_binaries)
                .bind(updatedness)
                .execute(&mut *transaction)
                .await
                .wrap_with_context(|| format!("inserting event {event_id:?} in database"))?
                .rows_affected();
        reord::point().await;
        if affected != 1 {
            // Check for equality with pre-existing
            reord::point().await;
            let affected = sqlx::query(
                "
                    SELECT 1 FROM events
                    WHERE event_id = $1
                    AND object_id = $2
                    AND data = $3
                ",
            )
            .bind(event_id)
            .bind(object_id)
            .bind(event_json)
            .execute(&mut *transaction)
            .await
            .wrap_with_context(|| {
                format!("checking that the pre-existing event {event_id:?} is the same")
            })?
            .rows_affected();
            if affected != 1 {
                return Err(crate::Error::EventAlreadyExists(event_id));
            }
            // Nothing else to do, event was already inserted. Has the current latest snapshot finished
            // updating all its rdeps?
            let last_snapshot_id = sqlx::query!(
                "
                    SELECT snapshot_id
                    FROM snapshots
                    WHERE object_id = $1
                    AND is_latest
                    AND array_length(reverse_dependents_to_update, 1) IS NOT NULL
                ",
                object_id as ObjectId,
            )
            .map(|r| EventId::from_uuid(r.snapshot_id))
            .fetch_optional(&mut *transaction)
            .await
            .wrap_context(
                "checking whether the event's reverse-dependency changes have been handled",
            )?;
            return Ok(Either::Right(last_snapshot_id));
        }

        // Clear all snapshots after the event
        reord::maybe_lock().await;
        sqlx::query("DELETE FROM snapshots WHERE object_id = $1 AND snapshot_id > $2")
            .bind(object_id)
            .bind(event_id)
            .execute(&mut *transaction)
            .await
            .wrap_with_context(|| {
                format!("clearing all snapshots for object {object_id:?} after event {event_id:?}")
            })?;
        reord::point().await;

        // Find the last snapshot for the object
        let last_snapshot = sqlx::query!(
            "
                SELECT snapshot_id, is_latest, snapshot_version, snapshot
                FROM snapshots
                WHERE object_id = $1
                ORDER BY snapshot_id DESC
                LIMIT 1
            ",
            object_id.to_uuid(),
        )
        .fetch_one(&mut *transaction)
        .await
        .wrap_with_context(|| format!("fetching the last snapshot for object {object_id:?}"))?;
        let mut object =
            parse_snapshot::<T>(last_snapshot.snapshot_version, last_snapshot.snapshot)
                .wrap_with_context(|| format!("parsing last snapshot for object {object_id:?}"))?;

        // Remove the "latest snapshot" flag for the object
        // Note that this can be a no-op if the latest snapshot was already deleted above
        reord::maybe_lock().await;
        sqlx::query("UPDATE snapshots SET is_latest = FALSE WHERE object_id = $1 AND is_latest")
            .bind(object_id)
            .execute(&mut *transaction)
            .await
            .wrap_with_context(|| {
                format!("removing latest-snapshot flag for object {object_id:?}")
            })?;
        reord::point().await;

        // Apply all events between the last snapshot (excluded) and the current event (excluded)
        if !last_snapshot.is_latest {
            let from = EventId::from_uuid(last_snapshot.snapshot_id);
            let to = EventId::from_u128(event_id.as_u128() - 1);
            apply_events_between(&mut transaction, &mut object, object_id, from, to)
                .await
                .wrap_with_context(|| {
                    format!("applying all events on {object_id:?} between {from:?} and {to:?}")
                })?;
        }

        // Add the current event to the last snapshot
        object.apply(DbPtr::from(object_id), &event);

        // Save the new snapshot (the new event was already saved above)
        let rdeps = self
            .get_rdeps(&mut *transaction, object_id)
            .await
            .wrap_with_context(|| format!("fetching reverse dependencies of {object_id:?}"))?;
        let mut _dep_locks = self
            .write_snapshot(
                &mut transaction,
                event_id,
                object_id,
                false,
                last_snapshot.is_latest,
                if last_snapshot.is_latest {
                    Some(&rdeps)
                } else {
                    None
                },
                &object,
                updatedness,
                cb,
            )
            .await
            .wrap_with_context(|| format!("writing snapshot {event_id:?} for {object_id:?}"))?;

        // If needed, re-compute the last snapshot
        if !last_snapshot.is_latest {
            // Free the locks taken above, as we don't actually need them
            std::mem::drop(_dep_locks);

            // List all the events since the inserted event
            reord::point().await;
            let mut events_since_inserted = sqlx::query!(
                "
                    SELECT event_id, data
                    FROM events
                    WHERE object_id = $1
                    AND event_id > $2
                    ORDER BY event_id ASC
                ",
                object_id.to_uuid(),
                event_id.to_uuid(),
            )
            .fetch(&mut *transaction);
            let mut last_event_id = None;
            while let Some(e) = events_since_inserted.next().await {
                let e = e.wrap_with_context(|| {
                    format!("fetching all events for {object_id:?} after {event_id:?}")
                })?;
                last_event_id = Some(e.event_id);
                let e = serde_json::from_value::<T::Event>(e.data).wrap_with_context(|| {
                    format!(
                        "parsing event {:?} of type {:?}",
                        e.event_id,
                        T::type_ulid()
                    )
                })?;

                object.apply(DbPtr::from(object_id), &e);
            }
            std::mem::drop(events_since_inserted);

            // Save the latest snapshot
            let snapshot_id = EventId::from_uuid(
                last_event_id
                    .expect("Entered the 'recomputing last snapshot' stage without any new events"),
            );
            _dep_locks = self
                .write_snapshot(
                    &mut transaction,
                    snapshot_id,
                    object_id,
                    false,
                    true,
                    Some(&rdeps),
                    &object,
                    updatedness,
                    cb,
                )
                .await
                .wrap_with_context(|| {
                    format!("writing snapshot {snapshot_id:?} for {object_id:?}")
                })?;
        }

        // Check that all required binaries are present, always as the last lock obtained in the transaction
        check_required_binaries(&mut transaction, event.required_binaries())
            .await
            .wrap_with_context(|| {
                format!("checking that all binaries for object {object_id:?} are already present")
            })?;

        reord::point().await;
        transaction.commit().await.wrap_with_context(|| {
            format!("committing transaction adding event {event_id:?} to object {object_id:?}")
        })?;
        reord::point().await;

        Ok(Either::Left(Arc::new(object)))
    }
}

impl<Config: crdb_core::Config> Db for PostgresDb<Config> {
    async fn create<T: Object>(
        &self,
        object_id: ObjectId,
        created_at: EventId,
        object: Arc<T>,
        updatedness: Option<Updatedness>,
        _importance: Importance,
    ) -> crate::Result<Option<Arc<T>>> {
        let updatedness =
            updatedness.expect("Called PostgresDb::create without specifying updatedness");
        Ok(self
            .create_and_return_rdep_changes::<T>(object_id, created_at, object, updatedness)
            .await?
            .map(|(snap, _)| snap))
    }

    async fn submit<T: Object>(
        &self,
        object_id: ObjectId,
        event_id: EventId,
        event: Arc<T::Event>,
        updatedness: Option<Updatedness>,
        _additional_importance: Importance,
    ) -> crate::Result<Option<Arc<T>>> {
        let updatedness =
            updatedness.expect("Called PostgresDb::submit without specifying updatedness");
        Ok(self
            .submit_and_return_rdep_changes::<T>(object_id, event_id, event, updatedness)
            .await?
            .map(|(snap, _)| snap))
    }

    async fn get_latest<T: Object>(
        &self,
        object_id: ObjectId,
        _importance: Importance,
    ) -> crate::Result<Arc<T>> {
        reord::point().await;
        let mut transaction = self
            .db
            .begin()
            .await
            .wrap_context("acquiring postgresql transaction")?;

        // First, check the existence and requested type
        reord::point().await;
        let latest_snapshot = sqlx::query!(
            "
                SELECT snapshot_id, type_id, snapshot_version, snapshot
                FROM snapshots
                WHERE object_id = $1
                AND is_latest
            ",
            object_id as ObjectId,
        )
        .fetch_optional(&mut *transaction)
        .await
        .wrap_with_context(|| format!("fetching latest snapshot for object {object_id:?}"))?;
        let latest_snapshot = match latest_snapshot {
            Some(s) => s,
            None => return Err(crate::Error::ObjectDoesNotExist(object_id)),
        };
        let real_type_id = TypeId::from_uuid(latest_snapshot.type_id);
        let expected_type_id = *T::type_ulid();
        if real_type_id != expected_type_id {
            return Err(crate::Error::WrongType {
                object_id,
                expected_type_id,
                real_type_id,
            });
        }

        // All good, let's parse the snapshot and return
        reord::point().await;
        let res = parse_snapshot::<T>(latest_snapshot.snapshot_version, latest_snapshot.snapshot)
            .wrap_with_context(|| format!("parsing latest snapshot for {object_id:?}"))?;
        Ok(Arc::new(res))
    }

    async fn create_binary(&self, binary_id: BinPtr, data: Arc<[u8]>) -> crate::Result<()> {
        if crdb_core::hash_binary(&data) != binary_id {
            return Err(crate::Error::BinaryHashMismatch(binary_id));
        }
        reord::maybe_lock().await;
        sqlx::query("INSERT INTO binaries VALUES ($1, $2) ON CONFLICT DO NOTHING")
            .bind(binary_id)
            .bind(&*data)
            .execute(&self.db)
            .await
            .wrap_with_context(|| format!("inserting binary {binary_id:?} into database"))?;
        reord::point().await;
        Ok(())
    }

    async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
        reord::point().await;
        Ok(sqlx::query!(
            "SELECT data FROM binaries WHERE binary_id = $1",
            binary_id as BinPtr
        )
        .fetch_optional(&self.db)
        .await
        .wrap_with_context(|| format!("getting {binary_id:?} from database"))?
        .map(|res| res.data.into_boxed_slice().into()))
    }

    /// Returns the number of errors that happened while re-encoding
    async fn reencode_old_versions<T: Object>(&self) -> usize {
        let mut num_errors = 0;
        let mut old_snapshots = sqlx::query(
            "
                SELECT object_id, snapshot_id
                FROM snapshots
                WHERE type_id = $1
                AND (snapshot_version < $2 OR normalizer_version < $3)
            ",
        )
        .bind(T::type_ulid())
        .bind(T::snapshot_version())
        .bind(normalizer_version())
        .fetch(&self.db);
        while let Some(s) = old_snapshots.next().await {
            let s = match s {
                Ok(s) => s,
                Err(err) => {
                    num_errors += 1;
                    tracing::error!(?err, "failed retrieving one snapshot for upgrade");
                    continue;
                }
            };
            let object_id = ObjectId::from_uuid(s.get(0));
            let snapshot_id = EventId::from_uuid(s.get(1));
            if let Err(err) = self.reencode::<T>(object_id, snapshot_id).await {
                num_errors += 1;
                tracing::error!(
                    ?err,
                    ?object_id,
                    ?snapshot_id,
                    "failed reencoding snapshot with newer version",
                );
            }
        }
        num_errors
    }

    async fn assert_invariants_generic(&self) {
        // All binaries are present
        assert_eq!(
            0,
            sqlx::query(
                "
                    (
                        SELECT unnest(required_binaries)
                        FROM snapshots
                        UNION
                        SELECT unnest(required_binaries)
                        FROM events
                    )
                    EXCEPT
                    SELECT binary_id
                    FROM binaries
                ",
            )
            .execute(&self.db)
            .await
            .unwrap()
            .rows_affected()
        );

        // No event references an object without a creation snapshot
        assert_eq!(
            0,
            sqlx::query(
                "
                    SELECT object_id FROM events
                    EXCEPT
                    SELECT object_id FROM snapshots WHERE is_creation
                "
            )
            .execute(&self.db)
            .await
            .unwrap()
            .rows_affected()
        );

        // All non-creation snapshots match an event
        assert_eq!(
            0,
            sqlx::query(
                "
                    SELECT snapshot_id AS id FROM snapshots WHERE NOT is_creation
                    EXCEPT
                    SELECT event_id AS id FROM events
                "
            )
            .execute(&self.db)
            .await
            .unwrap()
            .rows_affected()
        );

        // Snapshots and events that share the same id are on the same object
        assert_eq!(
            0,
            sqlx::query(
                "
                    SELECT snapshot_id FROM snapshots
                    LEFT JOIN events ON snapshots.snapshot_id = events.event_id
                    WHERE snapshots.object_id != events.object_id
                "
            )
            .execute(&self.db)
            .await
            .unwrap()
            .rows_affected()
        );

        // All objects have a single type
        assert_eq!(
            0,
            sqlx::query(
                "
                    SELECT object_id
                    FROM snapshots
                    GROUP BY object_id
                    HAVING COUNT(DISTINCT type_id) > 1
                "
            )
            .execute(&self.db)
            .await
            .unwrap()
            .rows_affected()
        )
    }

    async fn assert_invariants_for<T: Object>(&self) {
        // For each object
        let objects = sqlx::query!(
            "SELECT object_id FROM snapshots WHERE type_id = $1",
            T::type_ulid() as &TypeId
        )
        .fetch_all(&self.db)
        .await
        .unwrap();
        for o in objects {
            // It has a creation and a latest snapshot
            let creation: uuid::Uuid = sqlx::query(
                "SELECT snapshot_id FROM snapshots WHERE object_id = $1 AND is_creation",
            )
            .bind(o.object_id)
            .fetch_one(&self.db)
            .await
            .unwrap()
            .get(0);
            let latest: uuid::Uuid =
                sqlx::query("SELECT snapshot_id FROM snapshots WHERE object_id = $1 AND is_latest")
                    .bind(o.object_id)
                    .fetch_one(&self.db)
                    .await
                    .unwrap()
                    .get(0);

            // They surround all events and snapshots
            assert_eq!(
                0,
                sqlx::query(
                    "
                        SELECT snapshot_id
                        FROM snapshots
                        WHERE object_id = $1
                        AND (snapshot_id < $2 OR snapshot_id > $3)
                    ",
                )
                .bind(o.object_id)
                .bind(creation)
                .bind(latest)
                .execute(&self.db)
                .await
                .unwrap()
                .rows_affected()
            );
            assert_eq!(
                0,
                sqlx::query(
                    "
                        SELECT event_id
                        FROM events
                        WHERE object_id = $1
                        AND (event_id <= $2 OR event_id > $3)
                    ",
                )
                .bind(o.object_id)
                .bind(creation)
                .bind(latest)
                .execute(&self.db)
                .await
                .unwrap()
                .rows_affected()
            );

            // Rebuilding the object gives the same snapshots
            let snapshots = sqlx::query!(
                "SELECT * FROM snapshots WHERE object_id = $1 ORDER BY snapshot_id",
                o.object_id
            )
            .fetch_all(&self.db)
            .await
            .unwrap();
            let events = sqlx::query!(
                "SELECT * FROM events WHERE object_id = $1 ORDER BY event_id",
                o.object_id
            )
            .fetch_all(&self.db)
            .await
            .unwrap();

            assert_eq!(TypeId::from_uuid(snapshots[0].type_id), *T::type_ulid());
            assert!(snapshots[0].is_creation);
            let mut object =
                parse_snapshot::<T>(snapshots[0].snapshot_version, snapshots[0].snapshot.clone())
                    .unwrap();
            assert_eq!(
                snapshots[0].required_binaries,
                object
                    .required_binaries()
                    .into_iter()
                    .map(|b| b.to_uuid())
                    .collect::<Vec<_>>()
            );

            let mut snapshot_idx = 1;
            let mut event_idx = 0;
            loop {
                if event_idx == events.len() {
                    assert_eq!(snapshot_idx, snapshots.len());
                    break;
                }
                let e = &events[event_idx];
                event_idx += 1;
                let event = serde_json::from_value::<T::Event>(e.data.clone()).unwrap();
                assert_eq!(
                    event
                        .required_binaries()
                        .into_iter()
                        .map(|b| b.to_uuid())
                        .collect::<Vec<_>>(),
                    e.required_binaries
                );
                object.apply(DbPtr::from(ObjectId::from_uuid(o.object_id)), &event);
                if snapshots[snapshot_idx].snapshot_id != e.event_id {
                    continue;
                }
                let s = &snapshots[snapshot_idx];
                snapshot_idx += 1;
                assert_eq!(TypeId::from_uuid(s.type_id), *T::type_ulid());
                let snapshot = parse_snapshot::<T>(s.snapshot_version, s.snapshot.clone()).unwrap();
                assert!(object == snapshot);
                assert_eq!(
                    s.required_binaries,
                    snapshot
                        .required_binaries()
                        .into_iter()
                        .map(|b| b.to_uuid())
                        .collect::<Vec<_>>()
                );
            }
            if events.is_empty() {
                assert!(snapshots.len() == 1);
            } else {
                assert_eq!(
                    snapshots[snapshots.len() - 1].snapshot_id,
                    events[events.len() - 1].event_id
                );
            }
            assert!(snapshots[snapshots.len() - 1].is_latest);
            assert_eq!(
                snapshots[snapshots.len() - 1]
                    .users_who_can_read
                    .as_ref()
                    .unwrap()
                    .iter()
                    .map(|u| User::from_uuid(*u))
                    .collect::<HashSet<_>>(),
                object.users_who_can_read(self).await.unwrap()
            );
        }
    }
}

type TrackedLock<'cb> = (
    reord::Lock,
    <LockPool<ObjectId> as Lockable<ObjectId, ()>>::Guard<'cb>,
);

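/// Wrapper around a `CanDoCallbacks` that records, and locks, every object fetched while
/// computing `users_who_can_read`, so that the caller knows which objects the permission
/// computation depends on.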
struct TrackingCanDoCallbacks<'cb, 'lockpool, C: CanDoCallbacks> {
    cb: &'cb C,
    already_taken_lock: ObjectId,
    object_locks: &'lockpool LockPool<ObjectId>,
    locks: Mutex<HashMap<ObjectId, TrackedLock<'lockpool>>>,
}

impl<C: CanDoCallbacks> CanDoCallbacks for TrackingCanDoCallbacks<'_, '_, C> {
    async fn get<T: Object>(&self, object_id: crate::DbPtr<T>) -> crate::Result<Arc<T>> {
        let id = ObjectId(object_id.id);
        if id != self.already_taken_lock {
            if let hash_map::Entry::Vacant(v) = self.locks.lock().await.entry(id) {
                v.insert((
                    reord::Lock::take_named(format!("{id:?}")).await,
                    self.object_locks.async_lock(id).await,
                ));
            }
        }
        self.cb
            .get::<T>(DbPtr::from(ObjectId(object_id.id)))
            .await
            .wrap_with_context(|| format!("requesting {object_id:?} from database"))
    }
}

pub type ComboLock<'a> = (
    reord::Lock,
    <lockable::LockPool<ObjectId> as lockable::Lockable<ObjectId, ()>>::Guard<'a>,
);

impl<Config: crdb_core::Config> ServerSideDb for PostgresDb<Config> {
    type Connection = sqlx::PgConnection;
    type Transaction<'a> = sqlx::Transaction<'a, sqlx::Postgres>;
    type Lock<'a> = ComboLock<'a>;

    /// This function assumes that the lock on `object_id` is already taken.
    fn get_users_who_can_read<'a, 'ret: 'a, T: Object, C: CanDoCallbacks>(
        &'ret self,
        object_id: ObjectId,
        object: &'a T,
        cb: &'a C,
    ) -> Pin<Box<dyn 'a + waaaa::Future<Output = anyhow::Result<UsersWhoCanRead<Self::Lock<'ret>>>>>>
    {
        Box::pin(async move {
            let cb = TrackingCanDoCallbacks::<'a, 'ret> {
                cb,
                already_taken_lock: object_id,
                object_locks: &self.object_locks,
                locks: Mutex::new(HashMap::new()),
            };

            let users = object.users_who_can_read(&cb).await.with_context(|| {
                format!("figuring out the list of users who can read {object_id:?}")
            })?;
            let cb_locks = cb.locks.into_inner();
            let mut depends_on = Vec::with_capacity(cb_locks.len());
            let mut locks = Vec::with_capacity(cb_locks.len());
            for (o, l) in cb_locks {
                depends_on.push(o);
                locks.push(l);
            }
            let res = UsersWhoCanRead {
                users,
                depends_on,
                locks,
            };
            Ok(res)
        })
    }

    async fn get_transaction(&self) -> crate::Result<sqlx::Transaction<'_, sqlx::Postgres>> {
        let mut transaction = self
            .db
            .begin()
            .await
            .wrap_context("acquiring postgresql transaction")?;

        // Atomically perform all the reads in this transaction
        reord::point().await;
        sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
            .execute(&mut *transaction)
            .await
            .wrap_context("setting transaction as repeatable read")?;

        Ok(transaction)
    }

    async fn get_latest_snapshot(
        &self,
        transaction: &mut sqlx::PgConnection,
        user: User,
        object_id: ObjectId,
    ) -> crate::Result<Arc<serde_json::Value>> {
        reord::point().await;
        let latest_snapshot = sqlx::query!(
            "
                SELECT snapshot_id, type_id, snapshot_version, snapshot
                FROM snapshots
                WHERE object_id = $1
                AND is_latest
                AND $2 = ANY (users_who_can_read)
            ",
            object_id as ObjectId,
            user as User,
        )
        .fetch_optional(&mut *transaction)
        .await
        .wrap_with_context(|| format!("fetching latest snapshot for object {object_id:?}"))?;
        let latest_snapshot = match latest_snapshot {
            Some(s) => s,
            None => return Err(crate::Error::ObjectDoesNotExist(object_id)),
        };
        Ok(Arc::new(latest_snapshot.snapshot))
    }

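    /// Return the data `user` is allowed to see for `object_id`: the creation snapshot and
    /// the events whose `last_modified` is strictly after `only_updated_since` (everything
    /// when `None`), together with the latest `last_modified` as `now_have_all_until`.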
    // TODO(test-high): introduce in server-side fuzzer
    async fn get_all(
        &self,
        transaction: &mut sqlx::PgConnection,
        user: User,
        object_id: ObjectId,
        only_updated_since: Option<Updatedness>,
    ) -> crate::Result<ObjectData> {
        let min_last_modified = only_updated_since
            .map(|t| EventId::from_u128(t.as_u128().saturating_add(1))) // Handle None, Some(0) and Some(N)
            .unwrap_or(EventId::from_u128(0));

        // Check that our user has permissions to read the object and retrieve the type_id
        reord::point().await;
        let latest = sqlx::query!(
            "
                SELECT type_id
                FROM snapshots
                WHERE object_id = $1
                AND is_latest
                AND $2 = ANY (users_who_can_read)
            ",
            object_id as ObjectId,
            user as User,
        )
        .fetch_optional(&mut *transaction)
        .await
        .wrap_with_context(|| format!("checking whether {user:?} can read {object_id:?}"))?;
        let Some(latest) = latest else {
            return Err(crate::Error::ObjectDoesNotExist(object_id));
        };
        let type_id = TypeId::from_uuid(latest.type_id);

        reord::point().await;
        let creation_snapshot = sqlx::query!(
            "
                SELECT snapshot_id, type_id, snapshot_version, snapshot
                FROM snapshots
                WHERE object_id = $1
                AND is_creation
                AND last_modified >= $2
            ",
            object_id as ObjectId,
            min_last_modified as EventId,
        )
        .fetch_optional(&mut *transaction)
        .await
        .wrap_with_context(|| format!("fetching creation snapshot for object {object_id:?}"))?;

        reord::point().await;
        let events = sqlx::query!(
            "
                SELECT event_id, data
                FROM events
                WHERE object_id = $1
                AND last_modified >= $2
            ",
            object_id as ObjectId,
            min_last_modified as EventId,
        )
        .map(|r| (EventId::from_uuid(r.event_id), Arc::new(r.data)))
        .fetch(&mut *transaction)
        .try_collect::<BTreeMap<EventId, Arc<serde_json::Value>>>()
        .await
        .wrap_with_context(|| format!("fetching all events for object {object_id:?}"))?;

        reord::point().await;
        let last_modified = sqlx::query!(
            "SELECT last_modified FROM snapshots WHERE object_id = $1 AND is_latest",
            object_id as ObjectId
        )
        .fetch_one(&mut *transaction)
        .await
        .wrap_context("retrieving the last_modified time")?;
        let last_modified = Updatedness::from_uuid(last_modified.last_modified);

        Ok(ObjectData {
            object_id,
            type_id,
            creation_snapshot: creation_snapshot.map(|c| {
                (
                    EventId::from_uuid(c.snapshot_id),
                    c.snapshot_version,
                    Arc::new(c.snapshot),
                )
            }),
            events,
            now_have_all_until: last_modified,
        })
    }

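    /// List the objects of type `type_id` that `user` can read, that match `query`, and whose
    /// latest snapshot was modified strictly after `only_updated_since` (all of them when
    /// `None`).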
    async fn server_query(
        &self,
        user: User,
        type_id: TypeId,
        only_updated_since: Option<Updatedness>,
        query: Arc<Query>,
    ) -> crate::Result<Vec<ObjectId>> {
        reord::point().await;
        let mut transaction = self
            .db
            .begin()
            .await
            .wrap_context("acquiring postgresql transaction")?;

        // Atomically perform all the reads here
        reord::point().await;
        sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
            .execute(&mut *transaction)
            .await
            .wrap_context("setting transaction as repeatable read")?;

        let query_sql = format!(
            "
                SELECT object_id
                FROM snapshots
                WHERE is_latest
                AND type_id = $1
                AND $2 = ANY (users_who_can_read)
                AND last_modified >= $3
                AND ({})
            ",
            where_clause(&query, 4)
        );
        let min_last_modified = only_updated_since
            .map(|t| EventId::from_u128(t.as_u128().saturating_add(1))) // Handle None, Some(0) and Some(N)
            .unwrap_or(EventId::from_u128(0));
        reord::point().await;
        let mut query_sql = sqlx::query(&query_sql)
            .persistent(false) // TODO(blocked): remove when https://github.com/launchbadge/sqlx/issues/2981 is fixed
            .bind(type_id)
            .bind(user)
            .bind(min_last_modified);
        for b in binds(&query)? {
            match b {
                Bind::Json(v) => query_sql = query_sql.bind(v),
                Bind::Str(v) => query_sql = query_sql.bind(v),
                Bind::String(v) => query_sql = query_sql.bind(v),
                Bind::Decimal(v) => query_sql = query_sql.bind(v),
                Bind::I32(v) => query_sql = query_sql.bind(v),
            }
        }
        reord::point().await;
        let res = query_sql
            .map(|row| ObjectId::from_uuid(row.get(0)))
            .fetch_all(&mut *transaction)
            .await
            .wrap_with_context(|| format!("listing objects matching query {query:?}"))?;

        Ok(res)
    }

1453    /// Cleans up and optimizes the database
1454    ///
1455    /// If `no_new_changes_before` is set, then after running this the database will reject
1456    /// any new change that would be dated before it.
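    ///
    /// Concretely, this: discards sessions last active before `kill_sessions_older_than` (when
    /// set), drops all intermediate snapshots of every object (re-creating objects at
    /// `no_new_changes_before` when set, and passing the resulting creation snapshots to
    /// `notify_recreation`), deletes binaries that are no longer referenced by any snapshot or
    /// event, and finally runs `VACUUM ANALYZE` (outside of tests).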
1457    async fn server_vacuum(
1458        &self,
1459        no_new_changes_before: Option<EventId>,
1460        updatedness: Updatedness,
1461        kill_sessions_older_than: Option<SystemTime>,
1462        mut notify_recreation: impl FnMut(Update, HashSet<User>),
1463    ) -> crate::Result<()> {
1464        // TODO(perf-high): do not vacuum away binaries that have been uploaded less than an hour ago
1465        if let Some(t) = kill_sessions_older_than {
1466            // Discard all sessions that were last active too long ago
1467            reord::point().await;
1468            sqlx::query!(
1469                "DELETE FROM sessions WHERE last_active < $1",
1470                t.ms_since_posix()?,
1471            )
1472            .execute(&self.db)
1473            .await
1474            .wrap_context("cleaning up old sessions")?;
1475        }
1476
1477        let cache_db = self
1478            .cache_db
1479            .upgrade()
1480            .expect("Called PostgresDb::vacuum after CacheDb went away");
1481
1482        {
1483            // Discard all unrequired snapshots, as well as unused fields of creation snapshots
1484            // In addition, auto-recreate the objects that need re-creation
1485            reord::point().await;
1486            let mut objects = sqlx::query!(
1487                "
1488                    SELECT DISTINCT object_id, type_id
1489                    FROM snapshots
1490                    WHERE (NOT (is_creation OR is_latest))
1491                    OR ((NOT is_latest)
1492                        AND (users_who_can_read IS NOT NULL
1493                            OR users_who_can_read_depends_on IS NOT NULL
1494                            OR reverse_dependents_to_update IS NOT NULL))
1495                    OR ((NOT is_creation) AND snapshot_id < $1)
1496                ",
1497                no_new_changes_before.unwrap_or_else(|| EventId::from_u128(0)) as EventId
1498            )
1499            .fetch(&self.db);
1500            while let Some(row) = objects.next().await {
1501                let row = row.wrap_context("listing objects with snapshots to cleanup")?;
1502                let object_id = ObjectId::from_uuid(row.object_id);
1503                let _lock = reord::Lock::take_named(format!("{object_id:?}")).await;
1504                let _lock = self.object_locks.async_lock(object_id).await;
1505                reord::maybe_lock().await;
1506                sqlx::query(
1507                    "DELETE FROM snapshots WHERE object_id = $1 AND NOT (is_creation OR is_latest)",
1508                )
1509                .bind(object_id)
1510                .execute(&self.db)
1511                .await
1512                .wrap_with_context(|| format!("deleting useless snapshots from {object_id:?}"))?;
1513                reord::maybe_lock().await;
1514                sqlx::query(
1515                    "
1516                        UPDATE snapshots
1517                        SET users_who_can_read = NULL,
1518                            users_who_can_read_depends_on = NULL,
1519                            reverse_dependents_to_update = NULL
1520                        WHERE object_id = $1
1521                        AND NOT is_latest
1522                    ",
1523                )
1524                .bind(object_id)
1525                .execute(&self.db)
1526                .await
1527                .wrap_with_context(|| format!("resetting creation snapshot of {object_id:?}"))?;
1528                reord::point().await;
1529                if let Some(event_id) = no_new_changes_before {
1530                    let type_id = TypeId::from_uuid(row.type_id);
1531                    reord::point().await;
1532                    let recreation_result = Config::recreate_no_lock(
1533                        self,
1534                        type_id,
1535                        object_id,
1536                        event_id,
1537                        updatedness,
1538                        &*cache_db,
1539                    )
1540                    .await
1541                    .wrap_with_context(|| {
1542                        format!("recreating {object_id:?} at time {event_id:?}")
1543                    })?;
1544                    if let Some((new_created_at, snapshot_version, data, users_who_can_read)) =
1545                        recreation_result
1546                    {
1547                        reord::point().await;
1548                        notify_recreation(
1549                            Update {
1550                                object_id,
1551                                data: UpdateData::Creation {
1552                                    type_id,
1553                                    created_at: new_created_at,
1554                                    snapshot_version,
1555                                    data: Arc::new(data),
1556                                },
1557                            },
1558                            users_who_can_read,
1559                        );
1560                    }
1561                }
1562            }
1563        }
1564
1565        // Get rid of no-longer-referenced binaries
1566        reord::maybe_lock().await;
1567        sqlx::query(
1568            "
1569                DELETE FROM binaries
1570                WHERE NOT EXISTS (
1571                    SELECT 1 FROM snapshots WHERE binary_id = ANY(required_binaries)
1572                    UNION
1573                    SELECT 1 FROM events WHERE binary_id = ANY(required_binaries)
1574                )
1575            ",
1576        )
1577        .execute(&self.db)
1578        .await
1579        .wrap_context("deleting no-longer-referenced binaries")?;
1580        reord::point().await;
1581
1582        // Finally, take care of the database itself
1583        // This is very slow for tests and produces deadlock false-positives, plus it's useless there.
1584        // So let's just skip it in tests.
1585        #[cfg(not(test))]
1586        sqlx::query("VACUUM ANALYZE")
1587            .execute(&self.db)
1588            .await
1589            .wrap_context("vacuuming database")?;
1590        reord::point().await;
1591
1592        Ok(())
1593    }
1594
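    /// Re-creates `object_id` so that its creation snapshot becomes the state as of the last
    /// event at or before `event_id`, squashing all earlier snapshots and events into it.
    ///
    /// Returns `None` when there is nothing to squash (the object was already created at or
    /// after `event_id`, or it has no event before that time); otherwise returns the id of the
    /// new creation event together with the object state at that point.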
1595    async fn recreate_at<'a, T: Object, C: CanDoCallbacks>(
1596        &'a self,
1597        object_id: ObjectId,
1598        event_id: EventId,
1599        updatedness: Updatedness,
1600        cb: &'a C,
1601    ) -> crate::Result<Option<(EventId, Arc<T>)>> {
1602        if event_id.0.timestamp_ms()
1603            > (SystemTime::now()
1604                .duration_since(SystemTime::UNIX_EPOCH)
1605                .map(|t| t.as_millis())
1606                .unwrap_or(0) as u64)
1607                .saturating_sub(1000 * 3600)
1608        {
1609            tracing::warn!(
1610                "Re-creating object {object_id:?} at time {event_id:?} which is less than an hour old"
1611            );
1612        }
1613
1614        reord::point().await;
1615        let mut transaction = self
1616            .db
1617            .begin()
1618            .await
1619            .wrap_context("acquiring postgresql transaction")?;
1620
1621        // Get the creation snapshot
1622        reord::point().await;
1623        let creation_snapshot = sqlx::query!(
1624            "SELECT snapshot_id, type_id FROM snapshots WHERE object_id = $1 AND is_creation",
1625            object_id as ObjectId,
1626        )
1627        .fetch_optional(&mut *transaction)
1628        .await
1629        .wrap_with_context(|| {
1630            format!("getting creation snapshot of {object_id:?} for re-creation")
1631        })?
1632        .ok_or(crate::Error::ObjectDoesNotExist(object_id))?;
1633        let real_type_id = TypeId::from_uuid(creation_snapshot.type_id);
1634        let expected_type_id = *T::type_ulid();
1635        if real_type_id != expected_type_id {
1636            return Err(crate::Error::WrongType {
1637                object_id,
1638                expected_type_id,
1639                real_type_id,
1640            });
1641        }
1642        if EventId::from_uuid(creation_snapshot.snapshot_id) >= event_id {
1643            // Already created after the requested time
1644            return Ok(None);
1645        }
1646
1647        // Figure out the cutoff event
1648        reord::point().await;
1649        let event = sqlx::query!(
1650            "
1651                SELECT event_id
1652                FROM events
1653                WHERE object_id = $1
1654                AND event_id <= $2
1655                ORDER BY event_id DESC
1656                LIMIT 1
1657            ",
1658            object_id as ObjectId,
1659            event_id as EventId,
1660        )
1661        .fetch_optional(&mut *transaction)
1662        .await
1663        .wrap_with_context(|| {
1664            format!("recovering the last event for {object_id:?} before cutoff time {event_id:?}")
1665        })?;
1666        let cutoff_time = match event {
1667            None => return Ok(None), // Nothing to do: there was no event before the cutoff anyway
1668            Some(e) => EventId::from_uuid(e.event_id),
1669        };
1670
1671        // Fetch the last snapshot before cutoff
1672        reord::point().await;
1673        let snapshot = sqlx::query!(
1674            "
1675                SELECT snapshot_id, snapshot_version, snapshot
1676                FROM snapshots
1677                WHERE object_id = $1
1678                AND snapshot_id <= $2
1679                ORDER BY snapshot_id DESC
1680                LIMIT 1
1681            ",
1682            object_id as ObjectId,
1683            cutoff_time as EventId,
1684        )
1685        .fetch_one(&mut *transaction)
1686        .await
1687        .wrap_with_context(|| {
1688            format!("fetching latest snapshot before {cutoff_time:?} for object {object_id:?}")
1689        })?;
1690
1691        // Delete all the snapshots before cutoff
1692        reord::maybe_lock().await;
1693        sqlx::query("DELETE FROM snapshots WHERE object_id = $1 AND snapshot_id < $2")
1694            .bind(object_id)
1695            .bind(cutoff_time)
1696            .execute(&mut *transaction)
1697            .await
1698            .wrap_with_context(|| {
1699                format!("deleting all snapshots for {object_id:?} before {cutoff_time:?}")
1700            })?;
1701        reord::point().await;
1702
1703        let latest_object = if EventId::from_uuid(snapshot.snapshot_id) != cutoff_time {
1704            // Insert a new snapshot dated at `cutoff_time`
1705
1706            // Apply all the events between latest snapshot (excluded) and asked recreation time (included)
1707            let mut object = parse_snapshot::<T>(snapshot.snapshot_version, snapshot.snapshot)
1708                .wrap_with_context(|| {
1709                    format!(
1710                        "parsing snapshot {:?} as {:?}",
1711                        snapshot.snapshot_id,
1712                        T::type_ulid()
1713                    )
1714                })?;
1715
1716            let snapshot_id = EventId::from_uuid(snapshot.snapshot_id);
1717            apply_events_between(
1718                &mut transaction,
1719                &mut object,
1720                object_id,
1721                snapshot_id,
1722                cutoff_time,
1723            )
1724            .await
1725            .wrap_with_context(|| {
1726                format!(
1727                    "applying on {object_id:?} events between {snapshot_id:?} and {cutoff_time:?}"
1728                )
1729            })?;
1730
1731            // Insert the new creation snapshot. This cannot conflict because we deleted
1732            // the previous creation snapshot just above. There was no snapshot at this event
1733            // before, so it cannot be the latest snapshot.
1734            // Note that we do not save the locks here. This is okay, because this is never a latest snapshot,
1735            // and thus cannot need the remote locks.
1736            self.write_snapshot(
1737                &mut transaction,
1738                cutoff_time,
1739                object_id,
1740                true,
1741                false,
1742                None, // is_latest = false, we don't care about rdeps
1743                &object,
1744                updatedness,
1745                cb,
1746            )
1747            .await
1748            .wrap_with_context(|| format!("writing snapshot {cutoff_time:?} for {object_id:?}"))?;
1749            object
1750        } else {
1751            // Just update the `cutoff_time` snapshot to record it's the creation snapshot
1752            reord::maybe_lock().await;
1753            let latest = sqlx::query!(
1754                "
1755                    UPDATE snapshots
1756                    SET is_creation = TRUE
1757                    WHERE snapshot_id = $1
1758                    RETURNING snapshot_version, snapshot
1759                ",
1760                cutoff_time as EventId,
1761            )
1762                .fetch_one(&mut *transaction)
1763                .await
1764                .wrap_with_context(|| {
1765                    format!(
1766                        "marking snapshot {cutoff_time:?} as the creation one for {object_id:?} and retrieving its data"
1767                    )
1768                })?;
1769            let object = parse_snapshot::<T>(latest.snapshot_version, latest.snapshot)
1770                .wrap_context("deserializing snapshot data")?;
1771            reord::point().await;
1772            object
1773        };
1774
1775        // We now have all the new information. We can delete the events.
1776        reord::maybe_lock().await;
1777        sqlx::query("DELETE FROM events WHERE object_id = $1 AND event_id <= $2")
1778            .bind(object_id)
1779            .bind(cutoff_time)
1780            .execute(&mut *transaction)
1781            .await
1782            .wrap_with_context(|| {
1783                format!("deleting all events for {object_id:?} before {cutoff_time:?}")
1784            })?;
1785        reord::point().await;
1786
1787        // Mark the corresponding latest snapshot as updated
1788        reord::maybe_lock().await;
1789        let affected = sqlx::query(
1790            "UPDATE snapshots SET last_modified = $1 WHERE is_latest AND object_id = $2",
1791        )
1792        .bind(updatedness)
1793        .bind(object_id)
1794        .execute(&mut *transaction)
1795        .await
1796        .wrap_with_context(|| format!("marking last snapshot of {object_id:?} as modified"))?
1797        .rows_affected();
1798        reord::point().await;
1799        assert!(
1800            affected == 1,
1801            "Object {object_id:?} did not have a latest snapshot, something went very wrong"
1802        );
1803
1804        // Finally, commit the transaction
1805        reord::maybe_lock().await;
1806        transaction.commit().await.wrap_with_context(|| {
1807            format!("committing transaction that recreated {object_id:?} at {cutoff_time:?}")
1808        })?;
1809        reord::point().await;
1810
1811        Ok(Some((cutoff_time, Arc::new(latest_object))))
1812    }
1813
1814    async fn create_and_return_rdep_changes<T: Object>(
1815        &self,
1816        object_id: ObjectId,
1817        created_at: EventId,
1818        object: Arc<T>,
1819        updatedness: Updatedness,
1820    ) -> crate::Result<Option<(Arc<T>, Vec<ReadPermsChanges>)>> {
1821        let cache_db = self
1822            .cache_db
1823            .upgrade()
1824            .expect("Called PostgresDb::create after CacheDb went away");
1825
1826        let res = self
1827            .create_impl(object_id, created_at, object, updatedness, &*cache_db)
1828            .await?;
1829        match res {
1830            Either::Left(res) => {
1831                // Newly inserted, update rdeps and return
1832                // Update the reverse-dependencies, now that we have updated the object itself.
1833                let rdeps = self
1834                    .update_rdeps(object_id, &*cache_db)
1835                    .await
1836                    .wrap_with_context(|| {
1837                        format!("updating permissions for reverse-dependencies of {object_id:?}")
1838                    })?;
1839
1840                Ok(Some((res, rdeps)))
1841            }
1842            Either::Right(None) => {
1843                // Was already present, and has no pending rdeps
1844                Ok(None)
1845            }
1846            Either::Right(Some(_snapshot_id)) => {
1847                // Was already present, but had a task ongoing to update the rdeps
1848                // TODO(perf-high): this will duplicate the work done by the other create call
1849                self.update_rdeps(object_id, &*cache_db)
1850                    .await
1851                    .wrap_with_context(|| {
1852                        format!("updating permissions for reverse-dependencies of {object_id:?}")
1853                    })?;
1854                Ok(None)
1855            }
1856        }
1857    }
1858
1859    async fn submit_and_return_rdep_changes<T: Object>(
1860        &self,
1861        object_id: ObjectId,
1862        event_id: EventId,
1863        event: Arc<T::Event>,
1864        updatedness: Updatedness,
1865    ) -> crate::Result<Option<(Arc<T>, Vec<ReadPermsChanges>)>> {
1866        let cache_db = self
1867            .cache_db
1868            .upgrade()
1869            .expect("Called PostgresDb::submit after CacheDb went away");
1870
1871        let res = self
1872            .submit_impl(object_id, event_id, event, updatedness, &*cache_db)
1873            .await?;
1874        match res {
1875            Either::Left(res) => {
1876                // Newly inserted, update rdeps and return
1877                // Update the reverse-dependencies, now that we have updated the object itself.
1878                let rdeps = self
1879                    .update_rdeps(object_id, &*cache_db)
1880                    .await
1881                    .wrap_with_context(|| {
1882                        format!("updating permissions for reverse-dependencies of {object_id:?}")
1883                    })?;
1884
1885                Ok(Some((res, rdeps)))
1886            }
1887            Either::Right(None) => {
1888                // Was already present, and has no pending rdeps
1889                Ok(None)
1890            }
1891            Either::Right(Some(_snapshot_id)) => {
1892                // Was already present, but had a task ongoing to update the rdeps
1893                // TODO(perf-high): this will duplicate the work done by the other submit call
1894                self.update_rdeps(object_id, &*cache_db)
1895                    .await
1896                    .wrap_with_context(|| {
1897                        format!("updating permissions for reverse-dependencies of {object_id:?}")
1898                    })?;
1899                Ok(None)
1900            }
1901        }
1902    }
1903
1904    async fn update_pending_rdeps(&self) -> crate::Result<()> {
1905        let cache_db = self
1906            .cache_db
1907            .upgrade()
1908            .expect("Called PostgresDb::create after CacheDb went away");
1909        let cache_db = &*cache_db;
1910
1911        let mut rdep_update_res = sqlx::query!(
1912            "
1913                SELECT object_id
1914                FROM snapshots
1915                WHERE array_length(reverse_dependents_to_update, 1) IS NOT NULL
1916            "
1917        )
1918        .map(|r| ObjectId::from_uuid(r.object_id))
1919        .fetch(&self.db)
1920        .map(|object_id| async move {
1921            let object_id = object_id.wrap_context("listing objects with rdeps to update")?;
1922            self.update_rdeps(object_id, cache_db)
1923                .await
1924                .wrap_with_context(|| format!("updating rdeps of {object_id:?}"))
1925        })
1926        .buffered(32);
1927        while let Some(res) = rdep_update_res.next().await {
1928            res?;
1929        }
1930        Ok(())
1931    }
1932
1933    async fn login_session(&self, session: Session) -> crate::Result<(SessionToken, SessionRef)> {
1934        let token = SessionToken::new();
1935        sqlx::query("INSERT INTO sessions VALUES ($1, $2, $3, $4, $5, $6, $7)")
1936            .bind(token)
1937            .bind(session.session_ref)
1938            .bind(session.user_id)
1939            .bind(&session.session_name)
1940            .bind(session.login_time.ms_since_posix()?)
1941            .bind(session.last_active.ms_since_posix()?)
1942            .bind(
1943                session
1944                    .expiration_time
1945                    .map(|t| t.ms_since_posix())
1946                    .transpose()?,
1947            )
1948            .execute(&self.db)
1949            .await
1950            .wrap_with_context(|| {
1951                format!("logging in new session {token:?} with data {session:?}")
1952            })?;
1953        Ok((token, session.session_ref))
1954    }
1955
1956    async fn resume_session(&self, token: SessionToken) -> crate::Result<Session> {
1957        let res = sqlx::query!(
1958            "SELECT * FROM sessions WHERE session_token = $1",
1959            token as SessionToken
1960        )
1961        .fetch_optional(&self.db)
1962        .await
1963        .wrap_with_context(|| format!("resuming session for {token:?}"))?;
1964        let Some(res) = res else {
1965            return Err(crate::Error::InvalidToken(token));
1966        };
1967        let expiration_time = res
1968            .expiration_time
1969            .map(SystemTime::from_ms_since_posix)
1970            .transpose()
1971            .expect("negative timestamp made its way into database");
1972        if expiration_time
1973            .map(|t| t < SystemTime::now())
1974            .unwrap_or(false)
1975        {
1976            return Err(crate::Error::InvalidToken(token));
1977        }
1978        Ok(Session {
1979            user_id: User::from_uuid(res.user_id),
1980            session_ref: SessionRef::from_uuid(res.session_ref),
1981            session_name: res.name,
1982            login_time: SystemTime::from_ms_since_posix(res.login_time)
1983                .expect("negative timestamp made its way into database"),
1984            last_active: SystemTime::from_ms_since_posix(res.last_active)
1985                .expect("negative timestamp made its way into database"),
1986            expiration_time,
1987        })
1988    }
1989
1990    async fn mark_session_active(&self, token: SessionToken, at: SystemTime) -> crate::Result<()> {
1991        let affected = sqlx::query("UPDATE sessions SET last_active = $1 WHERE session_token = $2")
1992            .bind(at.ms_since_posix()?)
1993            .bind(token)
1994            .execute(&self.db)
1995            .await
1996            .wrap_with_context(|| format!("marking session {token:?} as active as of {at:?}"))?
1997            .rows_affected();
1998        if affected != 1 {
1999            return Err(crate::Error::InvalidToken(token));
2000        }
2001        Ok(())
2002    }
2003
2004    async fn rename_session<'a>(
2005        &'a self,
2006        token: SessionToken,
2007        new_name: &'a str,
2008    ) -> crate::Result<()> {
2009        let affected = sqlx::query("UPDATE sessions SET name = $1 WHERE session_token = $2")
2010            .bind(new_name)
2011            .bind(token)
2012            .execute(&self.db)
2013            .await
2014            .wrap_with_context(|| format!("renaming session {token:?} into {new_name:?}"))?
2015            .rows_affected();
2016        if affected != 1 {
2017            return Err(crate::Error::InvalidToken(token));
2018        }
2019        Ok(())
2020    }
2021
2022    async fn list_sessions(&self, user: User) -> crate::Result<Vec<Session>> {
2023        let rows = sqlx::query!("SELECT * FROM sessions WHERE user_id = $1", user as User)
2024            .fetch_all(&self.db)
2025            .await
2026            .wrap_with_context(|| format!("listing sessions for {user:?}"))?;
2027        let sessions = rows
2028            .into_iter()
2029            .map(|r| Session {
2030                user_id: User::from_uuid(r.user_id),
2031                session_ref: SessionRef::from_uuid(r.session_ref),
2032                session_name: r.name,
2033                login_time: SystemTime::from_ms_since_posix(r.login_time)
2034                    .expect("negative timestamp made its way into database"),
2035                last_active: SystemTime::from_ms_since_posix(r.last_active)
2036                    .expect("negative timestamp made its way into database"),
2037                expiration_time: r
2038                    .expiration_time
2039                    .map(SystemTime::from_ms_since_posix)
2040                    .transpose()
2041                    .expect("negative timestamp made its way into database"),
2042            })
2043            .collect();
2044        Ok(sessions)
2045    }
2046
2047    async fn disconnect_session(&self, user: User, session: SessionRef) -> crate::Result<()> {
2048        sqlx::query("DELETE FROM sessions WHERE user_id = $1 AND session_ref = $2")
2049            .bind(user)
2050            .bind(session)
2051            .execute(&self.db)
2052            .await
2053            .wrap_with_context(|| format!("disconnecting session {session:?}"))?;
2054        // If there was nothing to delete that's fine, the session was probably already disconnected
2055        Ok(())
2056    }
2057}
2058
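/// Checks that all of `binaries` are already present in the `binaries` table.
///
/// The rows that are present get locked `FOR KEY SHARE` so they cannot be deleted for as long
/// as `t` is running; if some binaries are missing, returns `Error::MissingBinaries` with their
/// ids.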
2059async fn check_required_binaries(
2060    t: &mut sqlx::PgConnection,
2061    mut binaries: Vec<BinPtr>,
2062) -> crate::Result<()> {
2063    // FOR KEY SHARE: prevent DELETE of the binaries while `t` is running
2064    reord::maybe_lock().await;
2065    let present_ids =
2066        sqlx::query("SELECT binary_id FROM binaries WHERE binary_id = ANY ($1) FOR KEY SHARE")
2067            .bind(&binaries)
2068            .fetch_all(&mut *t)
2069            .await
2070            .wrap_context("listing binaries already present in database")?;
2071    reord::point().await;
2072    binaries.retain(|b| {
2073        present_ids
2074            .iter()
2075            .all(|i| i.get::<uuid::Uuid, _>(0) != b.to_uuid())
2076    });
2077    if !binaries.is_empty() {
2078        return Err(crate::Error::MissingBinaries(binaries));
2079    }
2080    Ok(())
2081}
2082
2083/// `from` is excluded
2084/// `to` is included
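///
/// Concretely: streams the events of `object_id` with ids in `(from, to]`, in ascending order,
/// and applies each of them to `object` in turn.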
2085async fn apply_events_between<T: Object>(
2086    transaction: &mut sqlx::PgConnection,
2087    object: &mut T,
2088    object_id: ObjectId,
2089    from: EventId,
2090    to: EventId,
2091) -> anyhow::Result<()> {
2092    reord::point().await;
2093    let mut events = sqlx::query!(
2094        "
2095            SELECT event_id, data
2096            FROM events
2097            WHERE object_id = $1
2098            AND event_id > $2
2099            AND event_id <= $3
2100            ORDER BY event_id ASC
2101        ",
2102        object_id as ObjectId,
2103        from as EventId,
2104        to as EventId,
2105    )
2106    .fetch(&mut *transaction);
2107    while let Some(e) = events.next().await {
2108        let e = e.with_context(|| {
2109            format!("fetching all events for {object_id:?} betwen {from:?} and {to:?}")
2110        })?;
2111        let e = serde_json::from_value::<T::Event>(e.data).with_context(|| {
2112            format!(
2113                "parsing event {:?} of type {:?}",
2114                e.event_id,
2115                T::type_ulid()
2116            )
2117        })?;
2118
2119        object.apply(DbPtr::from(object_id), &e);
2120    }
2121    Ok(())
2122}
2123
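/// Renders `this` as a SQL boolean expression over the `snapshot` jsonb column, numbering the
/// bind placeholders from `first_idx` upwards.
///
/// For instance, an `Eq` query on the single-key path `"name"` with `first_idx = 4` renders
/// roughly as `COALESCE(snapshot->$4 = $5, FALSE)`; the corresponding values (the key `"name"`,
/// then the compared JSON value) are the ones returned, in the same order, by [`binds`].
///
/// A caller is expected to splice the result into a bigger statement and then bind the values
/// from [`binds`], eg. (illustrative sketch mirroring `server_query` above):
///
/// ```ignore
/// // $1..$3 are bound by the caller here, so the clause starts numbering at $4
/// let clause = where_clause(&query, 4);
/// let sql = format!("SELECT object_id FROM snapshots WHERE is_latest AND ({clause})");
/// // ...then bind the values returned by `binds(&query)?`, in order, before running `sql`.
/// ```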
2124pub fn where_clause(this: &Query, first_idx: usize) -> String {
2125    let mut res = String::new();
2126    let mut bind_idx = first_idx;
2127    add_to_where_clause(&mut res, &mut bind_idx, this);
2128    res
2129}
2130
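/// Collects the values to bind to the clause produced by [`where_clause`], in placeholder
/// order: for each leaf predicate, first the path keys/indices, then the compared value
/// (normalized for `ContainsStr`).
///
/// Fails if a `ContainsStr` pattern is rejected by `crdb_core::check_string`.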
2131pub fn binds(this: &Query) -> crate::Result<Vec<Bind<'_>>> {
2132    let mut res = Vec::new();
2133    add_to_binds(&mut res, this)?;
2134    Ok(res)
2135}
2136
2137fn add_to_where_clause(res: &mut String, bind_idx: &mut usize, query: &Query) {
2138    let mut initial_bind_idx = *bind_idx;
2139    match query {
2140        Query::All(v) => {
2141            res.push_str("TRUE");
2142            for q in v {
2143                res.push_str(" AND (");
2144                add_to_where_clause(&mut *res, &mut *bind_idx, q);
2145                res.push(')');
2146            }
2147        }
2148        Query::Any(v) => {
2149            res.push_str("FALSE");
2150            for q in v {
2151                res.push_str(" OR (");
2152                add_to_where_clause(&mut *res, &mut *bind_idx, q);
2153                res.push(')');
2154            }
2155        }
2156        Query::Not(q) => {
2157            res.push_str("NOT (");
2158            add_to_where_clause(&mut *res, &mut *bind_idx, q);
2159            res.push(')');
2160        }
2161        Query::Eq(path, _) => {
2162            res.push_str("COALESCE(");
2163            add_path_to_clause(&mut *res, &mut *bind_idx, path);
2164            res.push_str(&format!(" = ${}, FALSE)", bind_idx));
2165            *bind_idx += 1;
2166        }
2167        Query::Le(path, _) => {
2168            res.push_str("CASE WHEN jsonb_typeof(");
2169            add_path_to_clause(&mut *res, &mut *bind_idx, path);
2170            res.push_str(") = 'number' THEN (");
2171            add_path_to_clause(&mut *res, &mut initial_bind_idx, path);
2172            res.push_str(&format!(")::numeric <= ${} ELSE FALSE END", bind_idx));
2173            *bind_idx += 1;
2174        }
2175        Query::Lt(path, _) => {
2176            res.push_str("CASE WHEN jsonb_typeof(");
2177            add_path_to_clause(&mut *res, &mut *bind_idx, path);
2178            res.push_str(") = 'number' THEN (");
2179            add_path_to_clause(&mut *res, &mut initial_bind_idx, path);
2180            res.push_str(&format!(")::numeric < ${} ELSE FALSE END", bind_idx));
2181            *bind_idx += 1;
2182        }
2183        Query::Ge(path, _) => {
2184            res.push_str("CASE WHEN jsonb_typeof(");
2185            add_path_to_clause(&mut *res, &mut *bind_idx, path);
2186            res.push_str(") = 'number' THEN (");
2187            add_path_to_clause(&mut *res, &mut initial_bind_idx, path);
2188            res.push_str(&format!(")::numeric >= ${} ELSE FALSE END", bind_idx));
2189            *bind_idx += 1;
2190        }
2191        Query::Gt(path, _) => {
2192            res.push_str("CASE WHEN jsonb_typeof(");
2193            add_path_to_clause(&mut *res, &mut *bind_idx, path);
2194            res.push_str(") = 'number' THEN (");
2195            add_path_to_clause(&mut *res, &mut initial_bind_idx, path);
2196            res.push_str(&format!(")::numeric > ${} ELSE FALSE END", bind_idx));
2197            *bind_idx += 1;
2198        }
2199        Query::Contains(path, _) => {
2200            res.push_str("COALESCE(");
2201            add_path_to_clause(&mut *res, &mut *bind_idx, path);
2202            res.push_str(&format!(" @> ${}, FALSE)", bind_idx));
2203            *bind_idx += 1;
2204        }
2205        Query::ContainsStr(path, pat) => {
2206            res.push_str("COALESCE(to_tsvector(");
2207            add_path_to_clause(&mut *res, &mut *bind_idx, path);
2208            // If the pattern is only spaces, PostgreSQL wrongly returns `false`. But we still want
2209            // to check that the field exists, so add an `IS NOT NULL` in that case.
2210            let or_empty_pat = match pat.chars().all(|c| c == ' ') {
2211                true => "IS NOT NULL",
2212                false => "",
2213            };
2214            res.push_str(&format!(
2215                "->'_crdb-normalized') @@ phraseto_tsquery(${}) {or_empty_pat}, FALSE)",
2216                bind_idx,
2217            ));
2218            *bind_idx += 1;
2219        }
2220    }
2221}
2222
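/// Renders `path` as a `snapshot->$i->$j->...` accessor, consuming one bind index per path item.
/// When the path ends in index 0 or -1, the accessor is additionally wrapped in a
/// `CASE WHEN jsonb_typeof(...) = 'array' THEN ... ELSE NULL END` guard (see the comment below),
/// so eg. a path of `["tags", -1]` starting at bind index 4 renders roughly as
/// `CASE WHEN jsonb_typeof(snapshot->$4) = 'array' THEN snapshot->$4->$5 ELSE NULL END`.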
2223fn add_path_to_clause(res: &mut String, bind_idx: &mut usize, path: &[JsonPathItem]) {
2224    if let Some(JsonPathItem::Id(i)) = path.last() {
2225        if *i == -1 || *i == 0 {
2226            // PostgreSQL currently treats numerics as arrays of size 1
2227            // See also https://www.postgresql.org/message-id/87h6jbbxma.fsf%40coegni.ekleog.org
2228            res.push_str("CASE WHEN jsonb_typeof(snapshot");
2229            for (i, _) in path[..path.len() - 1].iter().enumerate() {
2230                res.push_str(&format!("->${}", *bind_idx + i));
2231            }
2232            res.push_str(") = 'array' THEN ")
2233        }
2234    }
2235    res.push_str("snapshot");
2236    for _ in path {
2237        res.push_str(&format!("->${bind_idx}"));
2238        *bind_idx += 1;
2239    }
2240    if let Some(JsonPathItem::Id(i)) = path.last() {
2241        if *i == -1 || *i == 0 {
2242            res.push_str(" ELSE NULL END");
2243        }
2244    }
2245}
2246
2247fn add_path_to_binds<'a>(res: &mut Vec<Bind<'a>>, path: &'a [JsonPathItem]) {
2248    for p in path {
2249        match p {
2250            JsonPathItem::Key(k) => res.push(Bind::Str(k)),
2251            JsonPathItem::Id(i) => res.push(Bind::I32(*i)),
2252        }
2253    }
2254}
2255
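/// A single value to bind to the SQL produced by [`where_clause`], in the order returned by
/// [`binds`]; `server_query` forwards each variant to the matching typed `sqlx` `.bind()` call.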
2256pub enum Bind<'a> {
2257    Json(&'a serde_json::Value),
2258    Str(&'a str),
2259    String(String),
2260    Decimal(Decimal),
2261    I32(i32),
2262}
2263
2264fn add_to_binds<'a>(res: &mut Vec<Bind<'a>>, query: &'a Query) -> crate::Result<()> {
2265    match query {
2266        Query::All(v) => {
2267            for q in v {
2268                add_to_binds(&mut *res, q)?;
2269            }
2270        }
2271        Query::Any(v) => {
2272            for q in v {
2273                add_to_binds(&mut *res, q)?;
2274            }
2275        }
2276        Query::Not(q) => {
2277            add_to_binds(&mut *res, q)?;
2278        }
2279        Query::Eq(p, v) => {
2280            add_path_to_binds(&mut *res, p);
2281            res.push(Bind::Json(v));
2282        }
2283        Query::Le(p, v) => {
2284            add_path_to_binds(&mut *res, p);
2285            res.push(Bind::Decimal(*v));
2286        }
2287        Query::Lt(p, v) => {
2288            add_path_to_binds(&mut *res, p);
2289            res.push(Bind::Decimal(*v));
2290        }
2291        Query::Ge(p, v) => {
2292            add_path_to_binds(&mut *res, p);
2293            res.push(Bind::Decimal(*v));
2294        }
2295        Query::Gt(p, v) => {
2296            add_path_to_binds(&mut *res, p);
2297            res.push(Bind::Decimal(*v));
2298        }
2299        Query::Contains(p, v) => {
2300            add_path_to_binds(&mut *res, p);
2301            res.push(Bind::Json(v));
2302        }
2303        Query::ContainsStr(p, v) => {
2304            add_path_to_binds(&mut *res, p);
2305            crdb_core::check_string(v)?;
2306            res.push(Bind::String(crdb_core::normalize(v)));
2307        }
2308    }
2309    Ok(())
2310}