Skip to main content

crdb_indexed_db/
lib.rs

1#![cfg(target_arch = "wasm32")]
2
3use anyhow::anyhow;
4use crdb_core::{check_strings, ClientStorageInfo, LoginInfo, SavedObjectMeta, SavedQuery};
5use crdb_core::{
6    normalizer_version, BinPtr, ClientSideDb, Db, DbPtr, Event, EventId, Importance, Object,
7    ObjectId, Query, QueryId, ResultExt, TypeId, Updatedness, Upload, UploadId,
8};
9use crdb_helpers::parse_snapshot_js;
10use futures::{future, TryFutureExt};
11use indexed_db::CursorDirection;
12use js_sys::{Array, JsString, Uint8Array};
13use std::{
14    cell::Cell,
15    collections::{HashMap, HashSet},
16    ops::Bound,
17    sync::Arc,
18};
19use wasm_bindgen::{JsCast, JsValue};
20
21pub use crdb_core::{Error, Result};
22
23const OBJECT_STORE_LIST: &[&str] = &[
24    "binaries",
25    "config",
26    "events",
27    "events_meta",
28    "queries_meta",
29    "snapshots",
30    "snapshots_meta",
31    "upload_queue",
32    "upload_queue_meta",
33];
34
35const CONFIG_SAVED_LOGIN: &str = "login";
36
37#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)]
38struct SnapshotMeta {
39    snapshot_id: EventId,
40    type_id: TypeId,
41    object_id: ObjectId,
42    is_creation: Option<usize>, // IndexedDB cannot index booleans, but never indexes missing properties
43    is_latest: Option<usize>,   // So, use None for "false" and Some(1) for "true"
44    normalizer_version: i32,
45    snapshot_version: i32,
46    // TODO(api-highest): introduce a side-table for all the metadata: have_all_until, importance*, is_creation/latest
47    have_all_until: Option<Updatedness>, // This value is always up-to-date on the latest snapshot
48    // The two below are only set on creation snapshot.
49    importance: Option<Importance>,
50    importance_from_queries: Option<Importance>,
51    required_binaries: Vec<BinPtr>,
52}
53
54#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)]
55struct EventMeta {
56    event_id: EventId,
57    object_id: ObjectId,
58    required_binaries: Vec<BinPtr>,
59}
60
61#[derive(Debug, serde::Deserialize, serde::Serialize)]
62struct QueryMeta {
63    query_id: QueryId,
64    query: Arc<Query>,
65    type_id: TypeId,
66    have_all_until: Option<Updatedness>,
67    importance: Importance,
68}
69
70#[derive(Debug, serde::Deserialize, serde::Serialize)]
71struct UploadMeta {
72    required_binaries: Vec<BinPtr>,
73}
74
75pub struct IndexedDb {
76    is_persistent: bool,
77    db: indexed_db::Database<crate::Error>,
78    objects_unlocked_this_run: Cell<usize>,
79}
80
81impl IndexedDb {
82    pub async fn connect(url: &str) -> anyhow::Result<IndexedDb> {
83        #[cfg(not(feature = "_tests"))]
84        let is_persistent = {
85            // If not running tests, try to persist the storage — in tests this times out, so ignore it
86            let window = web_sys::window().ok_or_else(|| anyhow!("not running in a browser"))?;
87            let persist_fut = window.navigator().storage().persist().wrap_context(
88                "failed to request persistence, did the user disable storage altogether?",
89            )?;
90            wasm_bindgen_futures::JsFuture::from(persist_fut)
91                .await
92                .wrap_context("failed to resolve request for persistence")?
93                .as_bool()
94                .ok_or_else(|| anyhow!("requesting for persistence did not return a boolean"))?
95        };
96        #[cfg(feature = "_tests")]
97        let is_persistent = false;
98
99        let factory = indexed_db::Factory::get().wrap_context("getting IndexedDb factory")?;
100
101        const VERSION: u32 = 1;
102        let db = factory
103            .open(url, VERSION, |evt| async move {
104                let db = evt.database();
105
106                // Note: whenever changing this list, remember to also update OBJECT_STORE_LIST
107                db.build_object_store("config").create()?;
108                db.build_object_store("snapshots").create()?;
109                db.build_object_store("events").create()?;
110                db.build_object_store("binaries").create()?;
111                db.build_object_store("upload_queue").create()?;
112                db.build_object_store("queries_meta")
113                    .key_path("query_id")
114                    .create()?;
115                let snapshots_meta = db
116                    .build_object_store("snapshots_meta")
117                    .key_path("snapshot_id")
118                    .create()?;
119                let events_meta = db
120                    .build_object_store("events_meta")
121                    .key_path("event_id")
122                    .create()?;
123                let upload_queue_meta = db
124                    .build_object_store("upload_queue_meta")
125                    .auto_increment()
126                    .create()?;
127
128                snapshots_meta
129                    .build_compound_index(
130                        "latest_type_object",
131                        &["is_latest", "type_id", "object_id"],
132                    )
133                    .unique()
134                    .create()?;
135                snapshots_meta
136                    .build_compound_index("creation_object", &["is_creation", "object_id"])
137                    .unique()
138                    .create()?;
139                snapshots_meta
140                    .build_compound_index("object_snapshot", &["object_id", "snapshot_id"])
141                    .create()?;
142                snapshots_meta
143                    .build_index("required_binaries", "required_binaries")
144                    .multi_entry()
145                    .create()?;
146
147                events_meta
148                    .build_compound_index("object_event", &["object_id", "event_id"])
149                    .create()?;
150                events_meta
151                    .build_index("required_binaries", "required_binaries")
152                    .multi_entry()
153                    .create()?;
154
155                upload_queue_meta
156                    .build_index("required_binaries", "required_binaries")
157                    .multi_entry()
158                    .create()?;
159
160                Ok(())
161            })
162            .await
163            .wrap_with_context(|| format!("opening IndexedDb {url:?} at version {VERSION}"))?;
164
165        Ok(IndexedDb {
166            is_persistent,
167            db,
168            objects_unlocked_this_run: Cell::new(0),
169        })
170    }
171
172    #[cfg(feature = "_tests")]
173    pub fn close(&self) {
174        self.db.close();
175    }
176
177    pub fn is_persistent(&self) -> bool {
178        self.is_persistent
179    }
180
181    async fn list_required_binaries(
182        transaction: &indexed_db::Transaction<crate::Error>,
183    ) -> crate::Result<HashSet<BinPtr>> {
184        let snapshots_meta = transaction
185            .object_store("snapshots_meta")
186            .wrap_context("opening 'snapshots_meta' object store")?;
187        let events_meta = transaction
188            .object_store("events_meta")
189            .wrap_context("opening 'events_meta' object store")?;
190        let upload_queue_meta = transaction
191            .object_store("upload_queue_meta")
192            .wrap_context("opening 'upload_queue_meta' object store")?;
193
194        let snapshot_required_binaries = snapshots_meta
195            .index("required_binaries")
196            .wrap_context("opening 'required_binaries' snapshot index")?;
197        let event_required_binaries = events_meta
198            .index("required_binaries")
199            .wrap_context("opening 'required_binaries' event index")?;
200        let upload_queue_required_binaries = upload_queue_meta
201            .index("required_binaries")
202            .wrap_context("opening 'required_binaries' upload_queue index")?;
203
204        let required_binaries = snapshot_required_binaries
205            .get_all_keys(None)
206            .await
207            .wrap_context("listing all required binaries for snapshots")?
208            .into_iter()
209            .chain(
210                event_required_binaries
211                    .get_all_keys(None)
212                    .await
213                    .wrap_context("listing all required binaries for events")?
214                    .into_iter(),
215            )
216            .chain(
217                upload_queue_required_binaries
218                    .get_all_keys(None)
219                    .await
220                    .wrap_context("listing all required binaries for the upload queue")?
221                    .into_iter(),
222            );
223        let mut res = HashSet::new();
224        for b in required_binaries {
225            let b = serde_wasm_bindgen::from_value::<BinPtr>(b).wrap_context("parsing BinPtr")?;
226            res.insert(b);
227        }
228
229        Ok(res)
230    }
231
232    async fn create_impl<T: Object>(
233        transaction: indexed_db::Transaction<crate::Error>,
234        object_id: ObjectId,
235        created_at: EventId,
236        object: Arc<T>,
237        updatedness: Option<Updatedness>,
238        importance: Importance,
239        importance_from_queries: Importance,
240    ) -> std::result::Result<Option<Arc<T>>, indexed_db::Error<crate::Error>> {
241        let mut new_snapshot_meta = SnapshotMeta {
242            snapshot_id: created_at,
243            type_id: *T::type_ulid(),
244            object_id,
245            is_creation: Some(1),
246            is_latest: Some(1),
247            normalizer_version: normalizer_version(),
248            snapshot_version: T::snapshot_version(),
249            have_all_until: updatedness,
250            importance: Some(importance),
251            importance_from_queries: Some(importance_from_queries),
252            required_binaries: object.required_binaries(),
253        };
254        let object_id_js = object_id.to_js_string();
255        let new_snapshot_id_js = created_at.to_js_string();
256        let new_snapshot_meta_js = to_js(&new_snapshot_meta)
257            .wrap_with_context(|| format!("serializing metadata for {object_id:?}"))?;
258        let new_snapshot_js =
259            to_js(&*object).wrap_with_context(|| format!("serializing {object_id:?}"))?;
260        let required_binaries = object.required_binaries();
261        // TODO(perf-low): should make this happen as part of the walk happening anyway in to_js
262        check_strings(&serde_json::to_value(&*object).wrap_context("serializing to json")?)?;
263
264        let snapshots = transaction
265            .object_store("snapshots")
266            .wrap_context("retrieving 'snapshots' object store")?;
267        let snapshots_meta = transaction
268            .object_store("snapshots_meta")
269            .wrap_context("retrieving 'snapshots_meta' object store")?;
270        let events = transaction
271            .object_store("events")
272            .wrap_context("retrieving 'events' object store")?;
273        let binaries = transaction
274            .object_store("binaries")
275            .wrap_context("retrieving 'binaries' object store")?;
276
277        let creation_object = snapshots_meta
278            .index("creation_object")
279            .wrap_context("retrieving 'creation_object' index")?;
280
281        // First, check for absence of object id conflict
282        if let Some(old_meta_js) = creation_object
283            .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
284            .await
285            .wrap_context("checking whether {object_id:?} already existed")?
286        {
287            // Snapshot metadata for this object already exists. Check that the already-existing value was the same
288            let mut old_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(old_meta_js)
289                .wrap_with_context(|| {
290                    format!("deserializing preexisting snapshot metadata for {object_id:?}")
291                })?;
292            // Ignore a few fields in comparison below
293            new_snapshot_meta.is_latest = old_meta.is_latest;
294            old_meta.have_all_until = std::cmp::max(old_meta.have_all_until, updatedness);
295            new_snapshot_meta.have_all_until = old_meta.have_all_until;
296            let importance_before = old_meta.importance.unwrap_or(Importance::NONE);
297            let importance_from_queries_before =
298                old_meta.importance_from_queries.unwrap_or(Importance::NONE);
299            let importance_after = importance_before | importance;
300            let importance_from_queries_after =
301                importance_from_queries_before | importance_from_queries;
302            old_meta.importance = Some(importance_after);
303            old_meta.importance_from_queries = Some(importance_from_queries_after);
304            new_snapshot_meta.importance = Some(importance_after);
305            new_snapshot_meta.importance_from_queries = Some(importance_from_queries_after);
306            if old_meta != new_snapshot_meta {
307                return Err(crate::Error::ObjectAlreadyExists(object_id).into());
308            }
309
310            // Metadata is the same, still need to check snapshot contents
311            let old_data_js = snapshots
312                .get(&new_snapshot_id_js)
313                .await
314                .wrap_with_context(|| format!("retrieving snapshot data for {created_at:?}"))?
315                .ok_or_else(|| {
316                    crate::Error::Other(anyhow!(
317                        "Snapshot metadata existed without data for {created_at:?}"
318                    ))
319                })?;
320            let old_data = parse_snapshot_js::<T>(old_meta.snapshot_version, old_data_js)
321                .wrap_with_context(|| {
322                    format!("deserializing preexisting snapshot for {created_at:?}")
323                })?;
324            if old_data != *object {
325                return Err(crate::Error::ObjectAlreadyExists(object_id).into());
326            }
327
328            // The old snapshot and data were the same, we only need to increase the importance if required
329            if importance_before != importance_after {
330                todo!() // TODO(api-highest): set_object_importance_impl(transaction, object_id, importance_after)
331            }
332            if importance_from_queries_before != importance_from_queries_after {
333                todo!() // TODO(api-highest): set_query_importance_impl(transaction, object_id, importance_from_queries_after)
334            }
335            return Ok(None);
336        }
337
338        // The object didn't exist yet, try inserting it
339        match snapshots_meta.add(&new_snapshot_meta_js).await {
340            Err(indexed_db::Error::AlreadyExists) => {
341                // `created_at` already exists, but we already checked that `object_id` did not. This is a collision.
342                Err(crate::Error::EventAlreadyExists(created_at).into())
343            }
344            Err(e) => Err(e),
345            Ok(_) => {
346                // Snapshot metadata addition succeeded. Now, time to add the data itself
347                snapshots
348                    .add_kv(&new_snapshot_id_js, &new_snapshot_js)
349                    .await
350                    .wrap_with_context(|| {
351                        format!("saving new snapshot {created_at:?} in database")
352                    })?;
353
354                // Check for no event id conflict
355                if events
356                    .contains(&new_snapshot_id_js)
357                    .await
358                    .wrap_with_context(|| {
359                        format!("checking whether {created_at:?} already existed as an event")
360                    })?
361                {
362                    return Err(crate::Error::EventAlreadyExists(created_at).into());
363                }
364
365                // Finally, validate the required binaries
366                check_required_binaries(binaries, required_binaries).await?;
367
368                Ok(Some(object))
369            }
370        }
371    }
372}
373
374impl Db for IndexedDb {
375    async fn create<T: Object>(
376        &self,
377        object_id: ObjectId,
378        created_at: EventId,
379        object: Arc<T>,
380        updatedness: Option<Updatedness>,
381        importance: Importance,
382    ) -> crate::Result<Option<Arc<T>>> {
383        let res = self
384            .db
385            .transaction(&["snapshots", "snapshots_meta", "events", "binaries"])
386            .rw()
387            .run(move |transaction| {
388                Self::create_impl(
389                    transaction,
390                    object_id,
391                    created_at,
392                    object,
393                    updatedness,
394                    importance,
395                    Importance::NONE, // TODO(api-highest): this is definitely wrong, rethink the API
396                )
397            })
398            .await
399            .wrap_with_context(|| format!("running creation transaction for {object_id:?}"));
400        if res.is_ok() && importance == Importance::NONE {
401            self.objects_unlocked_this_run
402                .set(self.objects_unlocked_this_run.get() + 1);
403        }
404        res
405    }
406
407    async fn submit<T: Object>(
408        &self,
409        object_id: ObjectId,
410        event_id: EventId,
411        event: Arc<T::Event>,
412        updatedness: Option<Updatedness>,
413        additional_importance: Importance,
414    ) -> crate::Result<Option<Arc<T>>> {
415        let new_event_meta = EventMeta {
416            event_id,
417            object_id,
418            required_binaries: event.required_binaries(),
419        };
420        let object_id_js = object_id.to_js_string();
421        let new_event_id_js = event_id.to_js_string();
422        let zero_event_id_js = EventId::from_u128(0).to_js_string();
423        let max_event_id_js = EventId::from_u128(u128::MAX).to_js_string();
424        let new_event_meta_js =
425            to_js(&new_event_meta).wrap_with_context(|| format!("serializing {event_id:?}"))?;
426        let new_event_js =
427            to_js(&*event).wrap_with_context(|| format!("serializing {event_id:?}"))?;
428
429        self.db
430            .transaction(&["snapshots", "snapshots_meta", "events", "events_meta", "binaries"])
431            .rw()
432            .run(move |transaction| async move {
433                let snapshots = transaction
434                    .object_store("snapshots")
435                    .wrap_context("retrieving 'snapshots' object store")?;
436                let snapshots_meta = transaction
437                    .object_store("snapshots_meta")
438                    .wrap_context("retrieving 'snapshots_meta' object store")?;
439                let events = transaction
440                    .object_store("events")
441                    .wrap_context("retrieving 'events' object store")?;
442                let events_meta = transaction
443                    .object_store("events_meta")
444                    .wrap_context("retrieving 'events_meta' object store")?;
445                let binaries = transaction
446                    .object_store("binaries")
447                    .wrap_context("retrieving 'binaries' object store")?;
448
449                let creation_object = snapshots_meta
450                    .index("creation_object")
451                    .wrap_context("retrieving 'creation_object' index")?;
452                let object_snapshot = snapshots_meta
453                    .index("object_snapshot")
454                    .wrap_context("retrieving 'object_snapshot' index")?;
455
456                // Check the object does exist, is of the right type and is not too new
457                let Some(creation_snapshot_js) = creation_object
458                    .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
459                    .await
460                    .wrap_with_context(|| format!("checking that {object_id:?} already exists"))?
461                else {
462                    return Err(crate::Error::ObjectDoesNotExist(object_id).into());
463                };
464                let mut creation_snapshot =
465                    serde_wasm_bindgen::from_value::<SnapshotMeta>(creation_snapshot_js)
466                        .wrap_with_context(|| {
467                            format!("deserializing creation snapshot metadata for {object_id:?}")
468                        })?;
469                if creation_snapshot.type_id != *T::type_ulid() {
470                    return Err(crate::Error::WrongType {
471                        object_id,
472                        expected_type_id: *T::type_ulid(),
473                        real_type_id: creation_snapshot.type_id,
474                    }
475                    .into());
476                }
477                if creation_snapshot.snapshot_id >= event_id {
478                    return Err(crate::Error::EventTooEarly {
479                        event_id,
480                        object_id,
481                        created_at: creation_snapshot.snapshot_id,
482                    }
483                    .into());
484                }
485
486                // TODO(perf-low): should make this happen as part of the walk happening anyway in to_js
487                check_strings(&serde_json::to_value(&*event).wrap_context("serializing to json")?)?;
488
489                // Insert the event metadata, checking for collisions
490                match events_meta.add(&new_event_meta_js).await {
491                    Err(indexed_db::Error::AlreadyExists) => {
492                        // Got a collision. Check whether the event already exist in the database.
493                        let old_meta_js = events_meta.get(&new_event_id_js)
494                            .await
495                            .wrap_with_context(|| format!("retrieving pre-existing event metadata for {event_id:?}"))?
496                            .ok_or_else(|| {
497                                crate::Error::Other(anyhow!("inserting {event_id:?} failed but the preexisting duplicate seems not to exist"))
498                            })?;
499                        let old_meta = serde_wasm_bindgen::from_value::<EventMeta>(old_meta_js)
500                            .wrap_with_context(|| {
501                                format!("deserializing preexisting event metadata for {event_id:?}")
502                            })?;
503                        if old_meta != new_event_meta {
504                            return Err(crate::Error::EventAlreadyExists(event_id).into());
505                        }
506
507                        // Metadata is the same, still need to check event contents
508                        let old_data_js = events
509                            .get(&new_event_id_js)
510                            .await
511                            .wrap_with_context(|| {
512                                format!("retrieving event data for {event_id:?}")
513                            })?
514                            .ok_or_else(|| {
515                                crate::Error::Other(anyhow!(
516                                    "Event metadata existed without data for {event_id:?}"
517                                ))
518                            })?;
519                        let old_data = serde_wasm_bindgen::from_value::<T::Event>(old_data_js)
520                            .wrap_with_context(|| {
521                                format!("deserializing preexisting event data for {event_id:?}")
522                            })?;
523                        if old_data != *event {
524                            return Err(crate::Error::EventAlreadyExists(event_id).into());
525                        }
526
527                        // The old snapshot and data were the same, we just need to lock the object if requested
528                        let importance_before = creation_snapshot.importance;
529                        creation_snapshot.importance = Some(importance_before.unwrap_or(Importance::NONE) | additional_importance);
530                        if importance_before != creation_snapshot.importance {
531                            let creation_snapshot_js = to_js(creation_snapshot)
532                                .wrap_context("serializing snapshot metadata")?;
533                            snapshots_meta.put(&creation_snapshot_js)
534                                .await
535                                .wrap_context("locking creation snapshot")?;
536                        }
537
538                        // All done, we're good to go
539                        return Ok(None);
540                    }
541                    Err(e) => return Err(e),
542                    Ok(_) => (),
543                }
544
545                // Lock the object if requested to
546                let importance_before = creation_snapshot.importance;
547                creation_snapshot.importance = Some(importance_before.unwrap_or(Importance::NONE) | additional_importance);
548                if importance_before != creation_snapshot.importance {
549                    let creation_snapshot_js = to_js(creation_snapshot)
550                        .wrap_context("serializing snapshot metadata")?;
551                    snapshots_meta.put(&creation_snapshot_js)
552                        .await
553                        .wrap_context("locking creation snapshot")?;
554                }
555
556                // Figure out the current `have_all_until` field
557                let latest_snapshot_meta_js = object_snapshot
558                    .cursor()
559                    .range(&**Array::from_iter([&object_id_js, &zero_event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
560                    .wrap_with_context(|| format!("limiting the latest snapshot to recover to those of {object_id:?}"))?
561                    .direction(CursorDirection::Prev)
562                    .open()
563                    .await
564                    .wrap_with_context(|| format!("opening cursor for the latest snapshot of {object_id:?}"))?
565                    .value()
566                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot find a latest snapshot for {object_id:?}")))?;
567                let latest_snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(latest_snapshot_meta_js)
568                    .wrap_with_context(|| format!("deserializing the latest known snapshot for {object_id:?}"))?;
569                let now_have_all_until = match (updatedness, latest_snapshot_meta.have_all_until) {
570                    (None, None) => None,
571                    (Some(u), None) => Some(u),
572                    (None, Some(u)) => Some(u),
573                    (Some(now_have_all_until), Some(had_all_until)) => {
574                        if now_have_all_until < had_all_until {
575                            tracing::warn!("server sent an event with an older updatedness than we already had");
576                            Some(had_all_until)
577                        } else {
578                            Some(now_have_all_until)
579                        }
580                    }
581                };
582
583                // Insert the event itself
584                events.add_kv(&new_event_id_js, &new_event_js).await.wrap_with_context(|| format!("saving {event_id:?} in database"))?;
585
586                // Clear all snapshots after the event
587                let mut to_clear = object_snapshot
588                    .cursor()
589                    .range(&**Array::from_iter([&object_id_js, &new_event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
590                    .wrap_with_context(|| format!("limiting the snapshots to delete to only those of {object_id:?} after {event_id:?}"))?
591                    .open()
592                    .await
593                    .wrap_with_context(|| format!("opening cursor of snapshots to delete for {object_id:?} after {event_id:?}"))?;
594                while let Some(snapshot_meta_js) = to_clear.value() {
595                    let snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
596                        .wrap_with_context(|| format!("deserializing to-clear snapshot of {object_id:?}, after {event_id:?}"))?;
597                    let snapshot_id = snapshot_meta.snapshot_id;
598                    snapshots.delete(&snapshot_id.to_js_string()).await.wrap_with_context(|| format!("deleting snapshot {snapshot_id:?}"))?;
599                    to_clear.delete().await.wrap_with_context(|| format!("deleting snapshot metadata {snapshot_id:?}"))?;
600                    to_clear.advance(1).await.wrap_context("getting next snapshot to delete")?;
601                }
602
603                // Find the last remaining snapshot for the object
604                let last_snapshot_meta_js = object_snapshot
605                    .cursor()
606                    .range(&**Array::from_iter([&object_id_js, &zero_event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
607                    .wrap_with_context(|| format!("limiting the last snapshot to recover to those of {object_id:?}"))?
608                    .direction(CursorDirection::Prev)
609                    .open()
610                    .await
611                    .wrap_with_context(|| format!("opening cursor for the last snapshot of {object_id:?}"))?
612                    .value()
613                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot find a latest snapshot for {object_id:?}")))?;
614                let mut last_snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(last_snapshot_meta_js)
615                    .wrap_with_context(|| format!("deserializing the last known snapshot for {object_id:?}"))?;
616                let last_snapshot_id = last_snapshot_meta.snapshot_id;
617                let last_snapshot_id_js = last_snapshot_id.to_js_string();
618                let last_snapshot_js = snapshots.get(&last_snapshot_id_js)
619                    .await
620                    .wrap_with_context(|| format!("retrieving last available snapshot {last_snapshot_id:?}"))?
621                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot retrieve snapshot data for {last_snapshot_id:?}")))?;
622                let mut last_snapshot = parse_snapshot_js::<T>(last_snapshot_meta.snapshot_version, last_snapshot_js)
623                    .wrap_with_context(|| format!("deserializing snapshot data {last_snapshot_id:?}"))?;
624
625                // Mark it as non-latest if needed
626                if last_snapshot_meta.is_latest.is_some() {
627                    last_snapshot_meta.is_latest = None;
628                    let last_snapshot_meta_js = to_js(last_snapshot_meta)
629                        .wrap_with_context(|| format!("reserializing {last_snapshot_id:?}"))?;
630                    snapshots_meta.put(&last_snapshot_meta_js)
631                        .await
632                        .wrap_with_context(|| format!("marking {last_snapshot_id:?} as not the latest any longer"))?;
633                }
634
635                // Apply all the events since the last snapshot (excluded)
636                let last_applied_event_id = apply_events_after(&transaction, &mut last_snapshot, object_id, last_snapshot_id).await?;
637
638                // Save the new last snapshot
639                let new_last_snapshot_meta = SnapshotMeta {
640                    snapshot_id: last_applied_event_id,
641                    type_id: *T::type_ulid(),
642                    object_id,
643                    is_creation: None,
644                    is_latest: Some(1),
645                    normalizer_version: normalizer_version(),
646                    snapshot_version: T::snapshot_version(),
647                    have_all_until: now_have_all_until,
648                    importance: None,
649                    importance_from_queries: None,
650                    required_binaries: last_snapshot.required_binaries(),
651                };
652                let last_applied_event_id_js = to_js(last_applied_event_id)
653                    .wrap_with_context(|| format!("serializing {last_applied_event_id:?}"))?;
654                let new_last_snapshot_meta_js = to_js(new_last_snapshot_meta)
655                    .wrap_with_context(|| format!("serializing the snapshot metadata for {object_id:?} at {last_applied_event_id:?}"))?;
656                let new_last_snapshot_js = to_js(&last_snapshot)
657                    .wrap_with_context(|| format!("serializing the snapshot for {object_id:?} at {last_applied_event_id:?}"))?;
658                snapshots_meta.add(&new_last_snapshot_meta_js).await.map_err(|err| match err {
659                    indexed_db::Error::AlreadyExists => crate::Error::EventAlreadyExists(event_id),
660                    e => crate::Error::Other(anyhow::Error::from(e).context(format!("saving new last snapshot metadata for {object_id:?} at {last_applied_event_id:?}"))),
661
662                })?;
663                snapshots.add_kv(&last_applied_event_id_js, &new_last_snapshot_js)
664                    .await
665                    .wrap_with_context(|| format!("saving new last snapshot data for {object_id:?} at {last_applied_event_id:?}"))?;
666
667                // And finally, check that all required binaries are present
668                check_required_binaries(binaries, event.required_binaries()).await
669                    .wrap_with_context(|| format!("checking that all the binaries required by {event_id:?} are already present"))?;
670
671                Ok(Some(Arc::new(last_snapshot)))
672            })
673            .await
674            .wrap_with_context(|| {
675                format!("running submission creation for {event_id:?} on {object_id:?}")
676            })
677    }
678
679    async fn get_latest<T: Object>(
680        &self,
681        object_id: ObjectId,
682        importance: Importance,
683    ) -> crate::Result<Arc<T>> {
684        let object_id_js = object_id.to_js_string();
685        let type_id_js = T::type_ulid().to_js_string();
686        let mut transaction =
687            self.db
688                .transaction(&["snapshots", "snapshots_meta", "events", "events_meta"]);
689        if importance != Importance::NONE {
690            transaction = transaction.rw();
691        }
692        transaction
693            .run(move |transaction| async move {
694                let snapshots = transaction
695                    .object_store("snapshots")
696                    .wrap_context("retrieving 'snapshots' object store")?;
697                let snapshots_meta = transaction
698                    .object_store("snapshots_meta")
699                    .wrap_context("retrieving 'snapshots_meta' object store")?;
700
701                let creation_object = snapshots_meta
702                    .index("creation_object")
703                    .wrap_context("retrieving 'creation_object' index")?;
704                let latest_type_object = snapshots_meta
705                    .index("latest_type_object")
706                    .wrap_context("retrieving 'latest_type_object' index")?;
707
708                // Figure out the creation snapshot to validate input
709                let creation_snapshot_meta_js = creation_object
710                    .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
711                    .await
712                    .wrap_with_context(|| {
713                        format!("fetching creation snapshot metadata for {object_id:?}")
714                    })?
715                    .ok_or_else(|| crate::Error::ObjectDoesNotExist(object_id))?;
716                let mut creation_snapshot_meta =
717                    serde_wasm_bindgen::from_value::<SnapshotMeta>(creation_snapshot_meta_js)
718                        .wrap_with_context(|| {
719                            format!("deserializing creation snapshot metadata for {object_id:?}")
720                        })?;
721                if creation_snapshot_meta.type_id != *T::type_ulid() {
722                    return Err(crate::Error::WrongType {
723                        object_id,
724                        expected_type_id: *T::type_ulid(),
725                        real_type_id: creation_snapshot_meta.type_id,
726                    }
727                    .into());
728                }
729
730                // Rewrite the creation snapshot if needed
731                let importance_before = creation_snapshot_meta.importance;
732                creation_snapshot_meta.importance =
733                    Some(importance_before.unwrap_or(Importance::NONE) | importance);
734                if importance_before != creation_snapshot_meta.importance {
735                    let new_snapshot_js = to_js(creation_snapshot_meta)
736                        .wrap_context("serializing snapshot metadata")?;
737                    snapshots_meta
738                        .put(&new_snapshot_js)
739                        .await
740                        .wrap_with_context(|| {
741                            format!("locking creation snapshot for {object_id:?} in database")
742                        })?;
743                }
744
745                // Get the latest snapshot
746                let latest_snapshot_meta_js = latest_type_object
747                    .get(&Array::from_iter([
748                        &JsValue::from(1),
749                        &type_id_js,
750                        &object_id_js,
751                    ]))
752                    .await
753                    .wrap_with_context(|| {
754                        format!("fetching latest snapshot metadata for {object_id:?}")
755                    })?
756                    .ok_or_else(|| {
757                        crate::Error::Other(anyhow!(
758                            "failed to recover metadata for latest snapshot of {object_id:?}"
759                        ))
760                    })?;
761                let latest_snapshot_meta =
762                    serde_wasm_bindgen::from_value::<SnapshotMeta>(latest_snapshot_meta_js)
763                        .wrap_with_context(|| {
764                            format!("deserializing latest snapshot metadata for {object_id:?}")
765                        })?;
766                let latest_snapshot_id = latest_snapshot_meta.snapshot_id;
767                let latest_snapshot_id_js = latest_snapshot_id.to_js_string();
768                let latest_snapshot_js = snapshots
769                    .get(&latest_snapshot_id_js)
770                    .await
771                    .wrap_with_context(|| {
772                        format!("fetching snapshot data for {latest_snapshot_id:?}")
773                    })?
774                    .ok_or_else(|| {
775                        crate::Error::Other(anyhow!(
776                            "failed to recover data for snapshot {latest_snapshot_id:?}"
777                        ))
778                    })?;
779                let latest_snapshot = Arc::new(
780                    parse_snapshot_js::<T>(
781                        latest_snapshot_meta.snapshot_version,
782                        latest_snapshot_js,
783                    )
784                    .wrap_with_context(|| {
785                        format!("deserializing snapshot data for {latest_snapshot_id:?}")
786                    })?,
787                );
788
789                // Return the latest snapshot
790                Ok(latest_snapshot)
791            })
792            .await
793            .wrap_with_context(|| format!("retrieving {object_id:?} from IndexedDB"))
794    }
795
796    async fn create_binary(&self, binary_id: BinPtr, data: Arc<[u8]>) -> crate::Result<()> {
797        if crdb_core::hash_binary(&data) != binary_id {
798            return Err(crate::Error::BinaryHashMismatch(binary_id));
799        }
800        let ary = Uint8Array::new_with_length(u32::try_from(data.len()).unwrap());
801        ary.copy_from(&data);
802        let res = self
803            .db
804            .transaction(&["binaries"])
805            .rw()
806            .run(move |transaction| async move {
807                let binaries = transaction
808                    .object_store("binaries")
809                    .wrap_context("retrieving the 'binaries' object store")?;
810
811                binaries
812                    .put_kv(&binary_id.to_js_string(), &ary)
813                    .await
814                    .wrap_context("writing binary")?;
815
816                Ok(())
817            })
818            .await
819            .wrap_with_context(|| format!("writing {binary_id:?}"));
820        if res.is_ok() {
821            self.objects_unlocked_this_run
822                .set(self.objects_unlocked_this_run.get() + 1);
823        }
824        res
825    }
826
827    async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
828        let ary = self
829            .db
830            .transaction(&["binaries"])
831            .run(move |transaction| async move {
832                let binaries = transaction
833                    .object_store("binaries")
834                    .wrap_context("retrieving the 'binaries' object store")?;
835
836                binaries.get(&binary_id.to_js_string()).await
837            })
838            .await
839            .wrap_with_context(|| format!("fetching {binary_id:?}"))?;
840        let Some(ary) = ary else {
841            return Ok(None);
842        };
843        let ary = ary
844            .dyn_into::<Uint8Array>()
845            .wrap_context("recovering Uint8Array from stored data")?;
846        Ok(Some(ary.to_vec().into_boxed_slice().into()))
847    }
848
849    /// Returns the number of errors that happened while re-encoding
850    async fn reencode_old_versions<T: Object>(&self) -> usize {
851        let res = self
852            .db
853            .transaction(&["snapshots_meta", "snapshots"])
854            .rw()
855            .run(|transaction| async move {
856                let snapshots_meta = transaction
857                    .object_store("snapshots_meta")
858                    .wrap_context("retrieving snapshots_meta object store")?;
859                let mut cursor = snapshots_meta
860                    .cursor()
861                    .open()
862                    .await
863                    .wrap_context("opening cursor over all snapshots")?;
864                let mut num_errors = 0;
865                // TODO(perf-high): only re-encode creation and latest snapshots, deleting intermediate snapshots
866                while let Some(snapshot_meta_js) = cursor.value() {
867                    let snapshot_meta =
868                        serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
869                            .wrap_context("deserializing snapshot metadata")?;
870                    if snapshot_meta.type_id == *T::type_ulid()
871                        && (snapshot_meta.snapshot_version < T::snapshot_version()
872                            || snapshot_meta.normalizer_version < normalizer_version())
873                    {
874                        let snapshot_id = snapshot_meta.snapshot_id;
875                        if let Err(err) = reencode_snapshot::<T>(
876                            &transaction,
877                            snapshot_meta.snapshot_id,
878                            snapshot_meta.snapshot_version,
879                        )
880                        .await
881                        {
882                            let type_id = *T::type_ulid();
883                            tracing::error!(
884                                ?err,
885                                ?snapshot_id,
886                                ?type_id,
887                                "failed reencoding snapshot to latest version"
888                            );
889                            num_errors += 1;
890                        }
891                    }
892                    cursor
893                        .advance(1)
894                        .await
895                        .wrap_context("advancing cursor to next item")?;
896                }
897                Ok(num_errors)
898            })
899            .await
900            .wrap_with_context(|| format!("reencoding old versions of type {:?}", T::type_ulid()));
901        match res {
902            Ok(num_errs) => num_errs,
903            Err(err) => {
904                tracing::error!(?err, type_id=?T::type_ulid(), "failed running transaction to reencode all objects");
905                1
906            }
907        }
908    }
909
910    async fn assert_invariants_generic(&self) {
911        use std::collections::hash_map;
912
913        self.db
914            .transaction(&[
915                "snapshots_meta",
916                "events_meta",
917                "upload_queue_meta",
918                "binaries",
919            ])
920            .run(move |transaction| async move {
921                let snapshots_meta = transaction.object_store("snapshots_meta").unwrap();
922                let events_meta = transaction.object_store("events_meta").unwrap();
923                let binaries = transaction.object_store("binaries").unwrap();
924
925                let creation_object = snapshots_meta.index("creation_object").unwrap();
926
927                // All binaries are present
928                let required_binaries = Self::list_required_binaries(&transaction).await.unwrap();
929                for b in required_binaries {
930                    if !binaries.contains(&to_js(b).unwrap()).await.unwrap() {
931                        panic!("missing required binary {b:?}");
932                    }
933                }
934
935                // No event references an object without a creation snapshot
936                let mut event_cursor = events_meta.cursor().open().await.unwrap();
937                while let Some(e) = event_cursor.value() {
938                    let e = serde_wasm_bindgen::from_value::<EventMeta>(e).unwrap();
939                    if !creation_object
940                        .contains(&Array::from_iter([
941                            &JsValue::from(1),
942                            &e.object_id.to_js_string(),
943                        ]))
944                        .await
945                        .unwrap()
946                    {
947                        panic!(
948                            "event {:?} references object {:?} that has no creation snapshot",
949                            e.event_id, e.object_id
950                        );
951                    }
952                    event_cursor.advance(1).await.unwrap();
953                }
954
955                // All non-creation snapshots match an event, on the same object
956                let mut snapshot_cursor = snapshots_meta.cursor().open().await.unwrap();
957                while let Some(s) = snapshot_cursor.value() {
958                    let s = serde_wasm_bindgen::from_value::<SnapshotMeta>(s).unwrap();
959                    let e = events_meta
960                        .get(&s.snapshot_id.to_js_string())
961                        .await
962                        .unwrap();
963                    if s.is_creation.is_none() && !e.is_some() {
964                        panic!("snapshot {:?} has no corresponding event", s.snapshot_id);
965                    }
966                    if let Some(e) = e {
967                        let e = serde_wasm_bindgen::from_value::<EventMeta>(e).unwrap();
968                        if e.object_id != s.object_id {
969                            panic!(
970                                "object for snapshot and event at {:?} does not match",
971                                s.snapshot_id
972                            );
973                        }
974                    }
975                    snapshot_cursor.advance(1).await.unwrap();
976                }
977
978                // All objects have a single type
979                let mut snapshot_cursor = snapshots_meta.cursor().open().await.unwrap();
980                let mut types = HashMap::new();
981                while let Some(s) = snapshot_cursor.value() {
982                    let s = serde_wasm_bindgen::from_value::<SnapshotMeta>(s).unwrap();
983                    match types.entry(s.object_id) {
984                        hash_map::Entry::Occupied(o) => {
985                            if *o.get() != s.type_id {
986                                panic!("object {:?} has multiple type ids", s.object_id);
987                            }
988                        }
989                        hash_map::Entry::Vacant(v) => {
990                            v.insert(s.type_id);
991                        }
992                    }
993                    snapshot_cursor.advance(1).await.unwrap();
994                }
995
996                Ok(())
997            })
998            .await
999            .unwrap();
1000    }
1001
1002    async fn assert_invariants_for<T: Object>(&self) {
1003        use std::collections::BTreeMap;
1004
1005        self.db
1006            .transaction(&["snapshots", "snapshots_meta", "events", "events_meta"])
1007            .run(move |transaction| async move {
1008                let snapshots_store = transaction.object_store("snapshots").unwrap();
1009                let snapshots_meta = transaction.object_store("snapshots_meta").unwrap();
1010                let events_store = transaction.object_store("events").unwrap();
1011                let events_meta = transaction.object_store("events_meta").unwrap();
1012
1013                // Fetch all snapshots
1014                let snapshots = snapshots_meta.get_all(None).await.unwrap();
1015                let snapshots = snapshots
1016                    .into_iter()
1017                    .map(|s| serde_wasm_bindgen::from_value::<SnapshotMeta>(s).unwrap())
1018                    .collect::<Vec<_>>();
1019                let objects = snapshots
1020                    .iter()
1021                    .filter_map(|s| (s.type_id == *T::type_ulid()).then(|| s.object_id))
1022                    .collect::<HashSet<_>>();
1023                let mut object_snapshots_map = HashMap::new();
1024                for o in objects.iter() {
1025                    object_snapshots_map.insert(*o, BTreeMap::new());
1026                }
1027                for s in snapshots {
1028                    if let Some(o) = object_snapshots_map.get_mut(&s.object_id) {
1029                        o.insert(s.snapshot_id, s);
1030                    }
1031                }
1032
1033                // Fetch all events
1034                let events = events_meta.get_all(None).await.unwrap();
1035                let events = events
1036                    .into_iter()
1037                    .map(|e| serde_wasm_bindgen::from_value::<EventMeta>(e).unwrap())
1038                    .collect::<Vec<_>>();
1039                let mut object_events_map = HashMap::new();
1040                for o in objects.iter() {
1041                    object_events_map.insert(*o, BTreeMap::new());
1042                }
1043                for e in events {
1044                    if let Some(o) = object_events_map.get_mut(&e.object_id) {
1045                        o.insert(e.event_id, e);
1046                    }
1047                }
1048
1049                // For each object
1050                for object_id in objects {
1051                    let snapshots = object_snapshots_map.get(&object_id).unwrap();
1052                    let events = object_events_map.get(&object_id).unwrap();
1053
1054                    // It has a creation and a latest snapshot that surround all other snapshots
1055                    let creation = snapshots.first_key_value().unwrap().1;
1056                    let latest = snapshots.last_key_value().unwrap().1;
1057                    assert!(creation.is_creation == Some(1));
1058                    assert!(latest.is_latest == Some(1));
1059                    assert!(creation.importance.is_some());
1060                    // TODO(test-high): assert importance_from_queries is well-set
1061                    assert!(creation.importance_from_queries.is_some());
1062                    if creation.snapshot_id == latest.snapshot_id {
1063                        continue;
1064                    }
1065
1066                    // Creation and latest snapshots surround all other events
1067                    assert!(events.first_key_value().unwrap().0 > &creation.snapshot_id);
1068                    assert!(events.last_key_value().unwrap().0 == &latest.snapshot_id);
1069
1070                    // Rebuilding the object gives the same snapshots
1071                    let object = snapshots_store
1072                        .get(&creation.snapshot_id.to_js_string())
1073                        .await
1074                        .unwrap()
1075                        .unwrap();
1076                    let mut object =
1077                        parse_snapshot_js::<T>(creation.snapshot_version, object).unwrap();
1078                    for (event_id, event_meta) in events.iter() {
1079                        let e = events_store
1080                            .get(&event_id.to_js_string())
1081                            .await
1082                            .unwrap()
1083                            .unwrap();
1084                        let e = serde_wasm_bindgen::from_value::<T::Event>(e).unwrap();
1085                        assert_eq!(event_meta.required_binaries, e.required_binaries());
1086                        object.apply(DbPtr::from(object_id), &e);
1087                        if let Some(snapshot_meta) = snapshots.get(&event_id) {
1088                            let s = snapshots_store
1089                                .get(&event_id.to_js_string())
1090                                .await
1091                                .unwrap()
1092                                .unwrap();
1093                            let s =
1094                                parse_snapshot_js::<T>(snapshot_meta.snapshot_version, s).unwrap();
1095                            assert!(object == s);
1096                            assert!(snapshot_meta.required_binaries == object.required_binaries());
1097                            assert!(snapshot_meta.type_id == *T::type_ulid());
1098                            assert!(snapshot_meta.importance.is_none());
1099                            assert!(snapshot_meta.importance_from_queries.is_none());
1100                        }
1101                    }
1102                }
1103
1104                Ok(())
1105            })
1106            .await
1107            .unwrap();
1108    }
1109}
1110
1111impl ClientSideDb for IndexedDb {
1112    async fn storage_info(&self) -> crate::Result<ClientStorageInfo> {
1113        let window = web_sys::window()
1114            .ok_or_else(|| crate::Error::Other(anyhow!("not running in a browser")))?;
1115        let storage = window.navigator().storage();
1116        let estimate = storage.estimate().map_err(|_| {
1117            crate::Error::Other(anyhow!(
1118                "failed getting a storage estimate from the browser"
1119            ))
1120        })?;
1121        let estimate = wasm_bindgen_futures::JsFuture::from(estimate)
1122            .await
1123            .map_err(|_| {
1124                crate::Error::Other(anyhow!("storage estimate promise returned an error"))
1125            })?;
1126        let quota = js_sys::Reflect::get(&estimate, &JsString::from("quota"))
1127            .map_err(|_| crate::Error::Other(anyhow!("storage estimate had no quota field")))?
1128            .as_f64()
1129            .ok_or_else(|| {
1130                crate::Error::Other(anyhow!("storage estimate for quota was not a number"))
1131            })? as usize;
1132        let usage = js_sys::Reflect::get(&estimate, &JsString::from("usage"))
1133            .map_err(|_| crate::Error::Other(anyhow!("storage estimate had no quota field")))?
1134            .as_f64()
1135            .ok_or_else(|| {
1136                crate::Error::Other(anyhow!("storage estimate for quota was not a number"))
1137            })? as usize;
1138        Ok(ClientStorageInfo {
1139            quota,
1140            usage,
1141            objects_unlocked_this_run: self.objects_unlocked_this_run.get(),
1142        })
1143    }
1144
1145    async fn save_login(&self, info: LoginInfo) -> crate::Result<()> {
1146        let info_js = to_js(info).wrap_context("serializing login info")?;
1147        self.db
1148            .transaction(&["config"])
1149            .rw()
1150            .run(move |transaction| async move {
1151                transaction
1152                    .object_store("config")
1153                    .wrap_context("retrieving 'config' object store")?
1154                    .put_kv(&JsString::from(CONFIG_SAVED_LOGIN), &info_js)
1155                    .await
1156                    .wrap_context("saving login info to database")?;
1157                Ok(())
1158            })
1159            .await
1160            .wrap_context("saving login info to database")
1161    }
1162
1163    async fn get_saved_login(&self) -> crate::Result<Option<LoginInfo>> {
1164        let saved_info = self
1165            .db
1166            .transaction(&["config"])
1167            .run(move |transaction| async move {
1168                let saved_info = transaction
1169                    .object_store("config")
1170                    .wrap_context("retrieving 'config' object store")?
1171                    .get(&JsString::from(CONFIG_SAVED_LOGIN))
1172                    .await
1173                    .wrap_context("retrieving login from database")?;
1174                Ok(saved_info)
1175            })
1176            .await
1177            .wrap_context("retrieving login from database")?;
1178        let Some(saved_info) = saved_info else {
1179            return Ok(None);
1180        };
1181        let saved_info = serde_wasm_bindgen::from_value(saved_info)
1182            .wrap_context("deserializing saved login info")?;
1183        Ok(Some(saved_info))
1184    }
1185
1186    async fn remove_everything(&self) -> crate::Result<()> {
1187        self.db
1188            .transaction(OBJECT_STORE_LIST)
1189            .rw()
1190            .run(move |transaction| async move {
1191                for store in OBJECT_STORE_LIST {
1192                    transaction
1193                        .object_store(store)
1194                        .wrap_with_context(|| format!("retrieving {store:?} object store"))?
1195                        .clear()
1196                        .await
1197                        .wrap_with_context(|| format!("clearing {store:?} object store"))?;
1198                }
1199                Ok(())
1200            })
1201            .await
1202            .wrap_context("clearing the IndexedDB database")
1203    }
1204
1205    async fn recreate<T: Object>(
1206        &self,
1207        object_id: ObjectId,
1208        new_created_at: EventId,
1209        mut object: Arc<T>,
1210        updatedness: Option<Updatedness>,
1211        additional_importance: Importance,
1212    ) -> crate::Result<Option<Arc<T>>> {
1213        let object_id_js = object_id.to_js_string();
1214        let type_id_js = T::type_ulid().to_js_string();
1215        let new_created_at_js = new_created_at.to_js_string();
1216        let max_id = EventId::from_u128(u128::MAX).to_js_string();
1217        let zero_id = EventId::from_u128(0).to_js_string();
1218        let required_binaries = object.required_binaries();
1219
1220        self.db
1221            .transaction(&[
1222                "snapshots",
1223                "snapshots_meta",
1224                "events",
1225                "events_meta",
1226                "binaries",
1227            ])
1228            .rw()
1229            .run(move |transaction| async move {
1230                let snapshots = transaction
1231                    .object_store("snapshots")
1232                    .wrap_context("retrieving 'snapshots' object store")?;
1233                let snapshots_meta = transaction
1234                    .object_store("snapshots_meta")
1235                    .wrap_context("retrieving 'snapshots_meta' object store")?;
1236                let events = transaction
1237                    .object_store("events")
1238                    .wrap_context("retrieving 'events' object store")?;
1239                let events_meta = transaction
1240                    .object_store("events_meta")
1241                    .wrap_context("retrieving 'events_meta' object store")?;
1242                let binaries = transaction
1243                    .object_store("binaries")
1244                    .wrap_context("retrieving the 'binaries' object store")?;
1245
1246                let creation_object = snapshots_meta
1247                    .index("creation_object")
1248                    .wrap_context("retrieving 'creation_object' index")?;
1249                let latest_type_object = snapshots_meta
1250                    .index("latest_type_object")
1251                    .wrap_context("retrieving 'latest_type_object' index")?;
1252                let object_snapshot = snapshots_meta
1253                    .index("object_snapshot")
1254                    .wrap_context("retrieving 'object_snapshot' index")?;
1255
1256                let object_event = events_meta
1257                    .index("object_event")
1258                    .wrap_context("retrieving 'object_event' index")?;
1259
1260                // Get the current creation snapshot
1261                let Some(creation_meta) = creation_object
1262                    .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1263                    .await
1264                    .wrap_context("checking creation object")?
1265                else {
1266                    // Object does not exist, create it
1267                    return Self::create_impl::<T>(
1268                        transaction,
1269                        object_id,
1270                        new_created_at,
1271                        object,
1272                        updatedness,
1273                        additional_importance,
1274                        Importance::NONE, // TODO(api-highest): this is definitely wrong, and must be fixed with rethinking the API
1275                    )
1276                    .await;
1277                };
1278                let creation_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(creation_meta)
1279                    .wrap_context("parsing snapshot metadata")?;
1280                if creation_meta.type_id != *T::type_ulid() {
1281                    return Err(crate::Error::WrongType {
1282                        object_id,
1283                        expected_type_id: *T::type_ulid(),
1284                        real_type_id: creation_meta.type_id,
1285                    }
1286                    .into());
1287                }
1288                if creation_meta.snapshot_id > new_created_at {
1289                    return Err(crate::Error::EventTooEarly {
1290                        event_id: new_created_at,
1291                        object_id,
1292                        created_at: creation_meta.snapshot_id,
1293                    }
1294                    .into());
1295                }
1296
1297                // TODO(perf-low): should make this happen as part of the walk happening anyway in to_js
1298                check_strings(
1299                    &serde_json::to_value(&*object).wrap_context("serializing to json")?,
1300                )?;
1301
1302                // Check if the requested new_created_at is after the current latest snapshot
1303                let latest_snapshot_meta_js = latest_type_object
1304                    .get(&Array::from_iter([
1305                        &JsValue::from(1),
1306                        &type_id_js,
1307                        &object_id_js,
1308                    ]))
1309                    .await
1310                    .wrap_context("retrieving latest snapshot")?
1311                    .ok_or_else(|| {
1312                        crate::Error::Other(anyhow!(
1313                            "No latest snapshot for an object with a creation snapshot"
1314                        ))
1315                    })?;
1316                let latest_snapshot_meta =
1317                    serde_wasm_bindgen::from_value::<SnapshotMeta>(latest_snapshot_meta_js)
1318                        .wrap_context("deserializing latest snapshot metadata")?;
1319                let new_creation_should_be_latest =
1320                    latest_snapshot_meta.snapshot_id <= new_created_at;
1321
1322                // Delete all events prior to the new creation snapshot
1323                let mut to_delete = object_event
1324                    .cursor()
1325                    .range(
1326                        &**Array::from_iter([&object_id_js, &zero_id])
1327                            ..=&**Array::from_iter([&object_id_js, &new_created_at_js]),
1328                    )
1329                    .wrap_context("limiting the range for the events to delete")?
1330                    .open()
1331                    .await
1332                    .wrap_context("opening cursor of all events to delete")?;
1333                while let Some(event_id_js) = to_delete.primary_key() {
1334                    events
1335                        .delete(&event_id_js)
1336                        .await
1337                        .wrap_context("deleting event data")?;
1338                    to_delete
1339                        .delete()
1340                        .await
1341                        .wrap_context("deleting event metadata")?;
1342                    to_delete
1343                        .advance(1)
1344                        .await
1345                        .wrap_context("moving to next to-apply event")?;
1346                }
1347
1348                // Delete all snapshots for the object, we'll recreate creation and latest soon
1349                let mut to_delete = object_snapshot
1350                    .cursor()
1351                    .range(
1352                        &**Array::from_iter([&object_id_js, &zero_id])
1353                            ..=&**Array::from_iter([&object_id_js, &max_id]),
1354                    )
1355                    .wrap_context("limiting to-delete snapshot range")?
1356                    .open()
1357                    .await
1358                    .wrap_context("opening cursor of snapshots to delete")?;
1359                while let Some(snapshot_id_js) = to_delete.primary_key() {
1360                    snapshots
1361                        .delete(&snapshot_id_js)
1362                        .await
1363                        .wrap_context("deleting snapshot data")?;
1364                    to_delete
1365                        .delete()
1366                        .await
1367                        .wrap_context("deleting snapshot metadata")?;
1368                    to_delete
1369                        .advance(1)
1370                        .await
1371                        .wrap_context("moving to next to-delete snapshot")?;
1372                }
1373
1374                // Write the new creation snapshot data
1375                let object_js = to_js(&object).wrap_context("serializing new creation snapshot")?;
1376                snapshots
1377                    .add_kv(&new_created_at_js, &object_js)
1378                    .await
1379                    .wrap_context("saving new creation snapshot data")?;
1380
1381                // And the new creation snapshot metadata
1382                let new_creation_snapshot_meta = SnapshotMeta {
1383                    snapshot_id: new_created_at,
1384                    type_id: *T::type_ulid(),
1385                    object_id,
1386                    is_creation: Some(1),
1387                    is_latest: new_creation_should_be_latest.then(|| 1),
1388                    normalizer_version: normalizer_version(),
1389                    snapshot_version: T::snapshot_version(),
1390                    have_all_until: updatedness,
1391                    importance: Some(
1392                        creation_meta.importance.unwrap_or(Importance::NONE)
1393                            | additional_importance,
1394                    ),
1395                    // TODO(api-high): is this actually correct? Maybe recreate can
1396                    // edit the latest snapshot, thus changing queries?
1397                    importance_from_queries: creation_meta.importance_from_queries,
1398                    required_binaries: required_binaries.clone(),
1399                };
1400                let new_creation_snapshot_meta_js = to_js(new_creation_snapshot_meta)
1401                    .wrap_context("serializing snapshot metadata")?;
1402                snapshots_meta
1403                    .put(&new_creation_snapshot_meta_js)
1404                    .await
1405                    .wrap_context("saving the new creation snapshot metadata")?;
1406
1407                // Re-compute the latest snapshot if needed
1408                if !new_creation_should_be_latest {
1409                    let mut last_applied_event_id = new_created_at;
1410                    let mut_object = Arc::make_mut(&mut object);
1411                    // Read all the events one by one, applying them to the object
1412                    let mut to_apply = object_event
1413                        .cursor()
1414                        .range(
1415                            &**Array::from_iter([&object_id_js, &zero_id])
1416                                ..=&**Array::from_iter([&object_id_js, &max_id]),
1417                        )
1418                        .wrap_context("limiting the range of events to apply")?
1419                        .open_key()
1420                        .await
1421                        .wrap_context("opening the cursor on events to apply")?;
1422                    while let Some(event_id_js) = to_apply.primary_key() {
1423                        let event_js = events
1424                            .get(&event_id_js)
1425                            .await
1426                            .wrap_context(
1427                                "retrieving data for an event for which we have the metadata",
1428                            )?
1429                            .ok_or_else(|| {
1430                                crate::Error::Other(anyhow!(
1431                                    "had no event data for an event with known metadata"
1432                                ))
1433                            })?;
1434                        let event_id = serde_wasm_bindgen::from_value::<EventId>(event_id_js)
1435                            .wrap_context("deserializing event id")?;
1436                        last_applied_event_id = event_id;
1437                        let event = serde_wasm_bindgen::from_value::<T::Event>(event_js)
1438                            .wrap_context("deserializing event data")?;
1439                        mut_object.apply(DbPtr::from(object_id), &event);
1440                        to_apply
1441                            .advance(1)
1442                            .await
1443                            .wrap_context("advancing events-to-apply cursor")?;
1444                    }
1445
1446                    // Write the metadata
1447                    let new_latest_snapshot_meta = SnapshotMeta {
1448                        snapshot_id: last_applied_event_id,
1449                        type_id: *T::type_ulid(),
1450                        object_id,
1451                        is_creation: None,
1452                        is_latest: Some(1),
1453                        normalizer_version: normalizer_version(),
1454                        snapshot_version: T::snapshot_version(),
1455                        have_all_until: updatedness,
1456                        importance: None,
1457                        importance_from_queries: None,
1458                        required_binaries: object.required_binaries(),
1459                    };
1460                    let new_latest_snapshot_meta_js = to_js(new_latest_snapshot_meta)
1461                        .wrap_context("serializing snapshot metadata")?;
1462                    snapshots_meta
1463                        .put(&new_latest_snapshot_meta_js)
1464                        .await
1465                        .wrap_context("saving the new latest snapshot metadata")?;
1466
1467                    // And the data
1468                    let last_applied_event_id_js = last_applied_event_id.to_js_string();
1469                    let object_js =
1470                        to_js(&object).wrap_context("serializing new latest snapshot")?;
1471                    snapshots
1472                        .add_kv(&last_applied_event_id_js, &object_js)
1473                        .await
1474                        .wrap_context("saving new latest snapshot data")?;
1475                }
1476
1477                // And finally, validate the required binaries
1478                check_required_binaries(binaries, required_binaries).await?;
1479
1480                Ok(Some(object))
1481            })
1482            .await
1483            .wrap_with_context(|| {
1484                format!("recreating {object_id:?} with new data from {new_created_at:?}")
1485            })
1486    }
1487
1488    async fn get_json(
1489        &self,
1490        _object_id: ObjectId,
1491        _importance: Importance,
1492    ) -> crate::Result<serde_json::Value> {
1493        todo!() // TODO(api-highest): implement if it's still here after the API rethinking
1494    }
1495
1496    async fn client_query(
1497        &self,
1498        type_id: TypeId,
1499        query: Arc<Query>,
1500    ) -> crate::Result<Vec<ObjectId>> {
1501        query.check()?;
1502        let type_id_js = type_id.to_js_string();
1503        let zero_id = EventId::from_u128(0).to_js_string();
1504        let max_id = EventId::from_u128(u128::MAX).to_js_string();
1505        // TODO(perf-low): look into setting up indexes and allowing the user to use them?
1506        // TODO(perf-low): think a lot about splitting this transaction to be able to return a real stream by
1507        // using cursors? The difficulty will be that another task could clobber the latest index
1508        // during that time.
1509
1510        // List all objects matching the query
1511        let objects = self
1512            .db
1513            .transaction(&["snapshots_meta", "snapshots"])
1514            .run(move |transaction| async move {
1515                let snapshots = transaction
1516                    .object_store("snapshots")
1517                    .wrap_context("retrieving 'snapshots' object store")?;
1518                let snapshots_meta = transaction
1519                    .object_store("snapshots_meta")
1520                    .wrap_context("retrieving 'snapshots_meta' object store")?;
1521
1522                let latest_type_object = snapshots_meta
1523                    .index("latest_type_object")
1524                    .wrap_context("retrieving 'latest_type_object' index")?;
1525
1526                let mut cursor = latest_type_object
1527                    .cursor()
1528                    .range(
1529                        &**Array::from_iter([&JsValue::from(1), &type_id_js, &zero_id])
1530                            ..=&**Array::from_iter([&JsValue::from(1), &type_id_js, &max_id]),
1531                    )
1532                    .wrap_context("limiting cursor to only snapshots of the right type")?
1533                    .open_key()
1534                    .await
1535                    .wrap_context("opening cursor over all latest objects")?;
1536                let mut objects = Vec::new();
1537                while let Some(snapshot_id_js) = cursor.primary_key() {
1538                    let snapshot = snapshots
1539                        .get(&snapshot_id_js)
1540                        .await
1541                        .wrap_context("retrieving snapshot data for known metadata")?
1542                        .ok_or_else(|| {
1543                            crate::Error::Other(anyhow!("no snapshot data for known metadata"))
1544                        })?;
1545                    let snapshot = serde_wasm_bindgen::from_value::<serde_json::Value>(snapshot)
1546                        .wrap_context("deserializing snapshot data as serde_json::Value")?;
1547                    if query.matches_json(&snapshot) {
1548                        let object_id_js = cursor
1549                            .key()
1550                            .ok_or_else(|| {
1551                                crate::Error::Other(anyhow!("cursor had a primary key but no key"))
1552                            })?
1553                            .dyn_into::<Array>()
1554                            .wrap_context("cursor key was not an array")?
1555                            .get(2);
1556                        let object_id = serde_wasm_bindgen::from_value::<ObjectId>(object_id_js)
1557                            .wrap_context("deserializing object id")?;
1558                        objects.push(object_id);
1559                    }
1560                    cursor
1561                        .advance(1)
1562                        .await
1563                        .wrap_context("going to next snapshot in the database")?;
1564                }
1565                Ok(objects)
1566            })
1567            .await
1568            .wrap_context("finding a first snapshot to answer")?;
1569
1570        // Retrieve them one by one
1571        Ok(objects)
1572    }
1573
1574    async fn remove(&self, object_id: ObjectId) -> crate::Result<()> {
1575        let object_id_js = object_id.to_js_string();
1576        let zero_id = EventId::from_u128(0).to_js_string();
1577        let max_id = EventId::from_u128(u128::MAX).to_js_string();
1578
1579        self.db
1580            .transaction(&["snapshots", "snapshots_meta", "events", "events_meta"])
1581            .rw()
1582            .run(move |transaction| async move {
1583                let snapshots_meta = transaction
1584                    .object_store("snapshots_meta")
1585                    .wrap_context("retrieving the 'snapshots_meta' object store")?;
1586                let events_meta = transaction
1587                    .object_store("events_meta")
1588                    .wrap_context("retrieving the 'events_meta' object store")?;
1589                let snapshots = transaction
1590                    .object_store("snapshots")
1591                    .wrap_context("retrieving the 'snapshots' object store")?;
1592                let events = transaction
1593                    .object_store("events")
1594                    .wrap_context("retrieving the 'events' object store")?;
1595
1596                let creation_object = snapshots_meta
1597                    .index("creation_object")
1598                    .wrap_context("retrieving the 'creation_object' index")?;
1599                let object_snapshot = snapshots_meta
1600                    .index("object_snapshot")
1601                    .wrap_context("retrieving the 'object_snapshot' index")?;
1602
1603                let object_event = events_meta
1604                    .index("object_event")
1605                    .wrap_context("retrieving the 'object_event' index")?;
1606
1607                // Check we're good to delete this object
1608                let object_does_exist = creation_object
1609                    .contains(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1610                    .await
1611                    .wrap_context("retrieving creation snapshot")?;
1612                if !object_does_exist {
1613                    // Object already does not exist, so the removal already succeeded
1614                    return Ok(());
1615                };
1616
1617                // We're good to go, delete everything
1618                let mut to_remove = object_snapshot
1619                    .cursor()
1620                    .range(
1621                        &**Array::from_iter([&object_id_js, &zero_id])
1622                            ..=&**Array::from_iter([&object_id_js, &max_id]),
1623                    )
1624                    .wrap_context("limiting to-delete range")?
1625                    .open()
1626                    .await
1627                    .wrap_context("opening to-delete cursor")?;
1628                while let Some(snapshot_id_js) = to_remove.primary_key() {
1629                    snapshots
1630                        .delete(&snapshot_id_js)
1631                        .await
1632                        .wrap_context("deleting snapshot data")?;
1633                    to_remove
1634                        .delete()
1635                        .await
1636                        .wrap_context("deleting snapshot metadata")?;
1637                    to_remove
1638                        .advance(1)
1639                        .await
1640                        .wrap_context("going to next to-delete snapshot")?;
1641                }
1642
1643                let mut to_remove = object_event
1644                    .cursor()
1645                    .range(
1646                        &**Array::from_iter([&object_id_js, &zero_id])
1647                            ..=&**Array::from_iter([&object_id_js, &max_id]),
1648                    )
1649                    .wrap_context("limiting to-delete range")?
1650                    .open()
1651                    .await
1652                    .wrap_context("opening to-delete cursor")?;
1653                while let Some(event_id_js) = to_remove.primary_key() {
1654                    events
1655                        .delete(&event_id_js)
1656                        .await
1657                        .wrap_context("deleting event data")?;
1658                    to_remove
1659                        .delete()
1660                        .await
1661                        .wrap_context("deleting event metadata")?;
1662                    to_remove
1663                        .advance(1)
1664                        .await
1665                        .wrap_context("going to next to-delete event")?;
1666                }
1667
1668                Ok(())
1669            })
1670            .await
1671            .wrap_with_context(|| format!("removing {object_id:?}"))
1672    }
1673
1674    async fn remove_event<T: Object>(
1675        &self,
1676        object_id: ObjectId,
1677        event_id: EventId,
1678    ) -> crate::Result<()> {
1679        let object_id_js = object_id.to_js_string();
1680        let event_id_js = event_id.to_js_string();
1681        let zero_event_id_js = EventId::from_u128(0).to_js_string();
1682        let max_event_id_js = EventId::from_u128(u128::MAX).to_js_string();
1683        self.db
1684            .transaction(&["snapshots", "snapshots_meta", "events", "events_meta"])
1685            .rw()
1686            .run(move |transaction| async move {
1687                let snapshots = transaction
1688                    .object_store("snapshots")
1689                    .wrap_context("retrieving 'snapshots' object store")?;
1690                let snapshots_meta = transaction
1691                    .object_store("snapshots_meta")
1692                    .wrap_context("retrieving 'snapshots_meta' object store")?;
1693                let events = transaction
1694                    .object_store("events")
1695                    .wrap_context("retrieving 'events' object store")?;
1696                let events_meta = transaction
1697                    .object_store("events_meta")
1698                    .wrap_context("retrieving 'events_meta' object store")?;
1699
1700                let creation_object = snapshots_meta
1701                    .index("creation_object")
1702                    .wrap_context("retrieving 'creation_object' index")?;
1703                let object_snapshot = snapshots_meta
1704                    .index("object_snapshot")
1705                    .wrap_context("retrieving 'object_snapshot' index")?;
1706
1707                // Check the object does exist, is of the right type and is not too new
1708                let Some(creation_snapshot_js) = creation_object
1709                    .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1710                    .await
1711                    .wrap_with_context(|| format!("checking that {object_id:?} already exists"))?
1712                else {
1713                    return Err(crate::Error::ObjectDoesNotExist(object_id).into());
1714                };
1715                let creation_snapshot =
1716                    serde_wasm_bindgen::from_value::<SnapshotMeta>(creation_snapshot_js)
1717                        .wrap_with_context(|| {
1718                            format!("deserializing creation snapshot metadata for {object_id:?}")
1719                        })?;
1720                if creation_snapshot.type_id != *T::type_ulid() {
1721                    return Err(crate::Error::WrongType {
1722                        object_id,
1723                        expected_type_id: *T::type_ulid(),
1724                        real_type_id: creation_snapshot.type_id,
1725                    }
1726                    .into());
1727                }
1728                if creation_snapshot.snapshot_id >= event_id {
1729                    return Err(crate::Error::EventTooEarly {
1730                        event_id,
1731                        object_id,
1732                        created_at: creation_snapshot.snapshot_id,
1733                    }
1734                    .into());
1735                }
1736
1737                // Check whether the event actually exists
1738                if !events_meta.contains(&event_id_js).await.wrap_context("checking whether to-remove event does exist")? {
1739                    // Event already removed, we're good to go
1740                    return Ok(());
1741                }
1742
1743                // Remove the event itself
1744                events_meta.delete(&event_id_js).await.wrap_context("deleting to-remove event metadata")?;
1745                events.delete(&event_id_js).await.wrap_context("deleting to-remove event data")?;
1746
1747                // Figure out the current `have_all_until` field
1748                let latest_snapshot_meta_js = object_snapshot
1749                    .cursor()
1750                    .range(&**Array::from_iter([&object_id_js, &zero_event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
1751                    .wrap_with_context(|| format!("limiting the latest snapshot to recover to those of {object_id:?}"))?
1752                    .direction(CursorDirection::Prev)
1753                    .open()
1754                    .await
1755                    .wrap_with_context(|| format!("opening cursor for the latest snapshot of {object_id:?}"))?
1756                    .value()
1757                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot find a latest snapshot for {object_id:?}")))?;
1758                let latest_snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(latest_snapshot_meta_js)
1759                    .wrap_with_context(|| format!("deserializing the latest known snapshot for {object_id:?}"))?;
1760                let now_have_all_until = latest_snapshot_meta.have_all_until;
1761
1762                // Clear all snapshots after the removed event
1763                let mut to_clear = object_snapshot
1764                    .cursor()
1765                    .range(&**Array::from_iter([&object_id_js, &event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
1766                    .wrap_with_context(|| format!("limiting the snapshots to delete to only those of {object_id:?} after {event_id:?}"))?
1767                    .open()
1768                    .await
1769                    .wrap_with_context(|| format!("opening cursor of snapshots to delete for {object_id:?} after {event_id:?}"))?;
1770                while let Some(snapshot_meta_js) = to_clear.value() {
1771                    let snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
1772                        .wrap_with_context(|| format!("deserializing to-clear snapshot of {object_id:?}, after {event_id:?}"))?;
1773                    let snapshot_id = snapshot_meta.snapshot_id;
1774                    snapshots.delete(&snapshot_id.to_js_string()).await.wrap_with_context(|| format!("deleting snapshot {snapshot_id:?}"))?;
1775                    to_clear.delete().await.wrap_with_context(|| format!("deleting snapshot metadata {snapshot_id:?}"))?;
1776                    to_clear.advance(1).await.wrap_context("getting next snapshot to delete")?;
1777                }
1778
1779                // Find the last remaining snapshot for the object
1780                let last_snapshot_meta_js = object_snapshot
1781                    .cursor()
1782                    .range(&**Array::from_iter([&object_id_js, &zero_event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
1783                    .wrap_with_context(|| format!("limiting the last snapshot to recover to those of {object_id:?}"))?
1784                    .direction(CursorDirection::Prev)
1785                    .open()
1786                    .await
1787                    .wrap_with_context(|| format!("opening cursor for the last snapshot of {object_id:?}"))?
1788                    .value()
1789                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot find a latest snapshot for {object_id:?}")))?;
1790                let last_snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(last_snapshot_meta_js)
1791                    .wrap_with_context(|| format!("deserializing the last known snapshot for {object_id:?}"))?;
1792                let last_snapshot_id = last_snapshot_meta.snapshot_id;
1793                let last_snapshot_id_js = last_snapshot_id.to_js_string();
1794                let last_snapshot_js = snapshots.get(&last_snapshot_id_js)
1795                    .await
1796                    .wrap_with_context(|| format!("retrieving last available snapshot {last_snapshot_id:?}"))?
1797                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot retrieve snapshot data for {last_snapshot_id:?}")))?;
1798                let mut last_snapshot = parse_snapshot_js::<T>(last_snapshot_meta.snapshot_version, last_snapshot_js)
1799                    .wrap_with_context(|| format!("deserializing snapshot data {last_snapshot_id:?}"))?;
1800
1801                // Apply all the events since the last snapshot (excluded)
1802                let last_applied_event_id = apply_events_after(&transaction, &mut last_snapshot, object_id, last_snapshot_id).await?;
1803
1804                // Save the new last snapshot
1805                let new_last_snapshot_meta = SnapshotMeta {
1806                    snapshot_id: last_applied_event_id,
1807                    type_id: *T::type_ulid(),
1808                    object_id,
1809                    is_creation: None,
1810                    is_latest: Some(1),
1811                    normalizer_version: normalizer_version(),
1812                    snapshot_version: T::snapshot_version(),
1813                    have_all_until: now_have_all_until,
1814                    importance: None,
1815                    importance_from_queries: None,
1816                    required_binaries: last_snapshot.required_binaries(),
1817                };
1818                let last_applied_event_id_js = to_js(last_applied_event_id)
1819                    .wrap_with_context(|| format!("serializing {last_applied_event_id:?}"))?;
1820                let new_last_snapshot_meta_js = to_js(new_last_snapshot_meta)
1821                    .wrap_with_context(|| format!("serializing the snapshot metadata for {object_id:?} at {last_applied_event_id:?}"))?;
1822                let new_last_snapshot_js = to_js(last_snapshot)
1823                    .wrap_with_context(|| format!("serializing the snapshot for {object_id:?} at {last_applied_event_id:?}"))?;
1824                snapshots_meta.put(&new_last_snapshot_meta_js).await.wrap_with_context(|| format!("saving new last snapshot metadata for {object_id:?} at {last_applied_event_id:?}"))?;
1825                snapshots.put_kv(&last_applied_event_id_js, &new_last_snapshot_js)
1826                    .await
1827                    .wrap_with_context(|| format!("saving new last snapshot data for {object_id:?} at {last_applied_event_id:?}"))?;
1828
1829                Ok(())
1830            })
1831            .await
1832            .wrap_with_context(|| {
1833                format!("removing {event_id:?} on {object_id:?}")
1834            })
1835    }
1836
1837    async fn set_object_importance(
1838        &self,
1839        object_id: ObjectId,
1840        importance: Importance,
1841    ) -> crate::Result<()> {
1842        let object_id_js = object_id.to_js_string();
1843
1844        let res = self
1845            .db
1846            .transaction(&["snapshots_meta"])
1847            .rw()
1848            .run(move |transaction| async move {
1849                let snapshots_meta = transaction
1850                    .object_store("snapshots_meta")
1851                    .wrap_context("retrieving the 'snapshots_meta' object store")?;
1852
1853                let creation_object = snapshots_meta
1854                    .index("creation_object")
1855                    .wrap_context("retrieving the 'creation_object' index")?;
1856
1857                let Some(snapshot_js) = creation_object
1858                    .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1859                    .await
1860                    .wrap_context("retrieving creation snapshot")?
1861                else {
1862                    // Object was already removed from database, so it was already unlocked
1863                    return Ok(());
1864                };
1865
1866                let mut snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_js)
1867                    .wrap_context("deserializing snapshot metadata")?;
1868                let importance_before = snapshot_meta.importance;
1869                snapshot_meta.importance = Some(importance);
1870
1871                if importance_before != snapshot_meta.importance {
1872                    let snapshot_js =
1873                        to_js(snapshot_meta).wrap_context("reserializing snapshot metadata")?;
1874                    snapshots_meta
1875                        .put(&snapshot_js)
1876                        .await
1877                        .wrap_context("saving the unlocked creation snapshot metadata")?;
1878                }
1879
1880                Ok(())
1881            })
1882            .await
1883            .wrap_with_context(|| format!("unlocking {object_id:?} from IndexedDB"));
1884        if res.is_ok() {
1885            self.objects_unlocked_this_run
1886                .set(self.objects_unlocked_this_run.get() + 1);
1887        }
1888        res
1889    }
1890
1891    async fn set_importance_from_queries(
1892        &self,
1893        object_id: ObjectId,
1894        importance_from_queries: Importance,
1895    ) -> crate::Result<()> {
1896        let object_id_js = object_id.to_js_string();
1897
1898        let res = self
1899            .db
1900            .transaction(&["snapshots_meta"])
1901            .rw()
1902            .run(move |transaction| async move {
1903                let snapshots_meta = transaction
1904                    .object_store("snapshots_meta")
1905                    .wrap_context("retrieving the 'snapshots_meta' object store")?;
1906
1907                let creation_object = snapshots_meta
1908                    .index("creation_object")
1909                    .wrap_context("retrieving the 'creation_object' index")?;
1910
1911                let Some(snapshot_js) = creation_object
1912                    .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1913                    .await
1914                    .wrap_context("retrieving creation snapshot")?
1915                else {
1916                    // Object was already removed from database, so it was already unlocked
1917                    return Ok(());
1918                };
1919
1920                let mut snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_js)
1921                    .wrap_context("deserializing snapshot metadata")?;
1922                let importance_from_queries_before = snapshot_meta.importance_from_queries;
1923                snapshot_meta.importance_from_queries = Some(importance_from_queries);
1924
1925                if importance_from_queries_before != snapshot_meta.importance_from_queries {
1926                    let snapshot_js =
1927                        to_js(snapshot_meta).wrap_context("reserializing snapshot metadata")?;
1928                    snapshots_meta
1929                        .put(&snapshot_js)
1930                        .await
1931                        .wrap_context("saving the unlocked creation snapshot metadata")?;
1932                }
1933
1934                Ok(())
1935            })
1936            .await
1937            .wrap_with_context(|| format!("unlocking {object_id:?} from IndexedDB"));
1938        if res.is_ok() {
1939            self.objects_unlocked_this_run
1940                .set(self.objects_unlocked_this_run.get() + 1);
1941        }
1942        res
1943    }
1944
1945    async fn set_query_importance(
1946        &self,
1947        _query_id: QueryId,
1948        _importance: Importance,
1949        _objects_matched_by_query: Vec<ObjectId>,
1950    ) -> crate::Result<()> {
1951        todo!() // TODO(api-highest): implement once API has been rethought
1952    }
1953
1954    async fn client_vacuum(
1955        &self,
1956        mut notify_removals: impl 'static + FnMut(ObjectId),
1957        mut notify_query_removals: impl 'static + FnMut(QueryId),
1958    ) -> crate::Result<()> {
1959        let zero_id = ObjectId::from_u128(0).to_js_string();
1960        let max_id = ObjectId::from_u128(u128::MAX).to_js_string();
1961
1962        let res = self
1963            .db
1964            .transaction(&[
1965                "snapshots",
1966                "snapshots_meta",
1967                "events",
1968                "events_meta",
1969                "queries_meta",
1970                "upload_queue_meta",
1971                "binaries",
1972            ])
1973            .rw()
1974            .run(move |transaction| async move {
1975                let snapshots_meta = transaction
1976                    .object_store("snapshots_meta")
1977                    .wrap_context("retrieving the 'snapshots_meta' object store")?;
1978                let events_meta = transaction
1979                    .object_store("events_meta")
1980                    .wrap_context("retrieving the 'events_meta' object store")?;
1981                let snapshots = transaction
1982                    .object_store("snapshots")
1983                    .wrap_context("retrieving the 'snapshots' object store")?;
1984                let events = transaction
1985                    .object_store("events")
1986                    .wrap_context("retrieving the 'events' object store")?;
1987                let queries_meta = transaction
1988                    .object_store("queries_meta")
1989                    .wrap_context("retrieving the 'queries_meta' object store")?;
1990                let binaries = transaction
1991                    .object_store("binaries")
1992                    .wrap_context("retrieving the 'binaries' object store")?;
1993
1994                let object_snapshot = snapshots_meta
1995                    .index("object_snapshot")
1996                    .wrap_context("retrieving the 'object_snapshot' index")?;
1997                let creation_object = snapshots_meta
1998                    .index("creation_object")
1999                    .wrap_context("retrieving the 'creation_object' index")?;
2000
2001                let object_event = events_meta
2002                    .index("object_event")
2003                    .wrap_context("retrieving the 'object_event' index")?;
2004
2005                // Remove all unlocked queries
2006                let mut cursor = queries_meta
2007                    .cursor()
2008                    .open()
2009                    .await
2010                    .wrap_context("listing all queries")?;
2011                while let Some(query_meta_js) = cursor.value() {
2012                    let query_meta = serde_wasm_bindgen::from_value::<QueryMeta>(query_meta_js)
2013                        .wrap_context("deserializing query metadata")?;
2014                    if !query_meta.importance.lock() {
2015                        notify_query_removals(query_meta.query_id);
2016                        cursor
2017                            .delete()
2018                            .await
2019                            .wrap_context("removing unlocked query")?;
2020                    }
2021                    cursor
2022                        .advance(1)
2023                        .await
2024                        .wrap_context("moving cursor forward")?;
2025                }
2026
2027                // Remove all unlocked objects
2028                // TODO(perf-high): index on importance(s).lock = 0
2029                let mut to_remove = creation_object
2030                    .cursor()
2031                    .open()
2032                    .await
2033                    .wrap_context("listing unlocked objects")?;
2034                // TODO(perf-med): could trigger all deletion requests in parallel and only wait for them
2035                // all before listing still-required binaries, for performance
2036                while let Some(s) = to_remove.value() {
2037                    let s = serde_wasm_bindgen::from_value::<SnapshotMeta>(s)
2038                        .wrap_context("deserializing unlocked object")?;
2039                    // TODO(api-highest): the unwrap should go with the side-table
2040                    if s.importance.unwrap().lock() || s.importance_from_queries.unwrap().lock() {
2041                        // Do not delete locked objects
2042                        to_remove
2043                            .advance(1)
2044                            .await
2045                            .wrap_context("going to next to-remove object")?;
2046                        continue;
2047                    }
2048                    let object_id = s.object_id;
2049                    let object_id_js = object_id.to_js_string();
2050
2051                    // Remove all the snapshots
2052                    let mut snapshots_to_remove = object_snapshot
2053                        .cursor()
2054                        .range(
2055                            &**Array::from_iter([&object_id_js, &zero_id])
2056                                ..=&**Array::from_iter([&object_id_js, &max_id]),
2057                        )
2058                        .wrap_context("limiting the range to to-delete snapshots")?
2059                        .open()
2060                        .await
2061                        .wrap_context("opening cursor of to-delete snapshots")?;
2062                    while let Some(s) = snapshots_to_remove.value() {
2063                        let s = serde_wasm_bindgen::from_value::<SnapshotMeta>(s)
2064                            .wrap_context("deserializing to-remove snapshot metadata")?;
2065                        snapshots
2066                            .delete(&s.snapshot_id.to_js_string())
2067                            .await
2068                            .wrap_context("failed deleting snapshot")?;
2069                        snapshots_to_remove.delete().await.wrap_with_context(|| {
2070                            format!("failed deleting snapshot metadata of {object_id:?}")
2071                        })?;
2072                        snapshots_to_remove
2073                            .advance(1)
2074                            .await
2075                            .wrap_context("going to next to-remove snapshot")?;
2076                    }
2077
2078                    // Remove all the events
2079                    let mut events_to_remove = object_event
2080                        .cursor()
2081                        .range(
2082                            &**Array::from_iter([&object_id_js, &zero_id])
2083                                ..=&**Array::from_iter([&object_id_js, &max_id]),
2084                        )
2085                        .wrap_context("limiting the range to to-delete events")?
2086                        .open()
2087                        .await
2088                        .wrap_context("opening cursor of to-delete events")?;
2089                    while let Some(e) = events_to_remove.value() {
2090                        let e = serde_wasm_bindgen::from_value::<EventMeta>(e)
2091                            .wrap_context("deserializing to-remove event metadata")?;
2092                        events
2093                            .delete(&e.event_id.to_js_string())
2094                            .await
2095                            .wrap_context("failed deleting event")?;
2096                        events_to_remove.delete().await.wrap_with_context(|| {
2097                            format!("failed deleting event of {object_id:?}")
2098                        })?;
2099                        events_to_remove
2100                            .advance(1)
2101                            .await
2102                            .wrap_context("going to next to-remove event")?;
2103                    }
2104
2105                    // Notify the removal
2106                    notify_removals(object_id);
2107
2108                    // Continue
2109                    to_remove
2110                        .advance(1)
2111                        .await
2112                        .wrap_context("going to next to-remove object")?;
2113                }
2114
2115                let required_binaries = Self::list_required_binaries(&transaction)
2116                    .await
2117                    .wrap_context("listing still-required binaries")?;
2118                let mut binaries_cursor = binaries
2119                    .cursor()
2120                    .open()
2121                    .await
2122                    .wrap_context("opening cursor over all binaries")?;
2123                while let Some(b) = binaries_cursor.key() {
2124                    let b = serde_wasm_bindgen::from_value::<BinPtr>(b)
2125                        .wrap_context("deserializing binary id")?;
2126                    if !required_binaries.contains(&b) {
2127                        binaries_cursor
2128                            .delete()
2129                            .await
2130                            .wrap_with_context(|| format!("deleting {b:?}"))?;
2131                    }
2132                    binaries_cursor
2133                        .advance(1)
2134                        .await
2135                        .wrap_context("going to next binary")?;
2136                }
2137
2138                Ok(())
2139            })
2140            .await
2141            .wrap_context("vacuuming the database");
2142        if res.is_ok() {
2143            self.objects_unlocked_this_run.set(0);
2144        }
2145        res
2146    }
2147
2148    async fn list_uploads(&self) -> crate::Result<Vec<UploadId>> {
2149        // TODO(test-high): fuzz upload-queue behavior
2150        self.db
2151            .transaction(&["upload_queue_meta"])
2152            .run(move |transaction| async move {
2153                let keys = transaction
2154                    .object_store("upload_queue_meta")
2155                    .wrap_context("retrieving 'upload_queue_meta' object store")?
2156                    .get_all_keys(None)
2157                    .await
2158                    .wrap_context("getting all keys from upload_queue_meta")?;
2159                let mut res = Vec::with_capacity(keys.len());
2160                for k in keys.into_iter() {
2161                    let k = serde_wasm_bindgen::from_value::<UploadId>(k)
2162                        .wrap_context("deserializing upload id")?;
2163                    res.push(k);
2164                }
2165                Ok(res)
2166            })
2167            .await
2168            .wrap_context("listing upload queue")
2169    }
2170
2171    async fn get_upload(&self, upload_id: UploadId) -> crate::Result<Option<Upload>> {
2172        self.db
2173            .transaction(&["upload_queue"])
2174            .run(move |transaction| async move {
2175                let Some(res) = transaction
2176                    .object_store("upload_queue")
2177                    .wrap_context("retrieving 'upload_queue' object store")?
2178                    .get(&JsValue::try_from(upload_id.0).unwrap())
2179                    .await
2180                    .wrap_context("fetching data from upload_queue store")?
2181                else {
2182                    return Ok(None);
2183                };
2184                let res = serde_wasm_bindgen::from_value::<Upload>(res)
2185                    .wrap_context("deserializing data")?;
2186                Ok(Some(res))
2187            })
2188            .await
2189            .wrap_with_context(|| format!("retrieving data for {upload_id:?}"))
2190    }
2191
2192    async fn enqueue_upload(
2193        &self,
2194        upload: Upload,
2195        required_binaries: Vec<BinPtr>,
2196    ) -> crate::Result<UploadId> {
2197        let metadata = UploadMeta { required_binaries };
2198        let metadata = to_js(metadata).wrap_context("serializing upload metadata")?;
2199        let data = to_js(upload).wrap_context("serializing upload data")?;
2200        self.db
2201            .transaction(&["upload_queue", "upload_queue_meta"])
2202            .rw()
2203            .run(move |transaction| async move {
2204                let upload_id = transaction
2205                    .object_store("upload_queue_meta")
2206                    .wrap_context("retrieving 'upload_queue_meta' object store")?
2207                    .add(&metadata)
2208                    .await
2209                    .wrap_context("saving upload metadata")?;
2210                transaction
2211                    .object_store("upload_queue")
2212                    .wrap_context("retrieving 'upload_queue' object store")?
2213                    .add_kv(&upload_id, &data)
2214                    .await
2215                    .wrap_context("saving upload data")?;
2216                let upload_id = serde_wasm_bindgen::from_value::<UploadId>(upload_id)
2217                    .wrap_context("deserializing upload id")?;
2218                Ok(upload_id)
2219            })
2220            .await
2221            .wrap_context("registering not-yet-completed upload")
2222    }
2223
2224    async fn upload_finished(&self, upload_id: UploadId) -> crate::Result<()> {
2225        let res = self
2226            .db
2227            .transaction(&["upload_queue", "upload_queue_meta"])
2228            .rw()
2229            .run(move |transaction| async move {
2230                let upload_id = to_js(upload_id).wrap_context("serializing upload id")?;
2231                transaction
2232                    .object_store("upload_queue")
2233                    .wrap_context("retrieving 'upload_queue' object store")?
2234                    .delete(&upload_id)
2235                    .await
2236                    .wrap_context("deleting upload data")?;
2237                transaction
2238                    .object_store("upload_queue_meta")
2239                    .wrap_context("retrieving 'upload_queue_meta' object store")?
2240                    .delete(&upload_id)
2241                    .await
2242                    .wrap_context("deleting upload metadata")?;
2243                Ok(())
2244            })
2245            .await
2246            .wrap_with_context(|| format!("registering {upload_id:?} as having completed"));
2247        if res.is_ok() {
2248            self.objects_unlocked_this_run
2249                .set(self.objects_unlocked_this_run.get() + 1);
2250        }
2251        res
2252    }
2253
2254    // TODO(api-highest): go through all the crates' API, one by one, and check that eg. all trait methods are indeed required
2255    async fn get_saved_objects(&self) -> crate::Result<HashMap<ObjectId, SavedObjectMeta>> {
2256        // TODO(test-high): fuzz this, and all other LocalDb functions
2257        // TODO(test-high): fuzz connection handling
2258        let zero_id = TypeId::from_u128(0).to_js_string();
2259        let max_id = TypeId::from_u128(u128::MAX).to_js_string();
2260        self.db
2261            .transaction(&["snapshots", "snapshots_meta"])
2262            .run(|transaction| async move {
2263                let snapshots_meta = transaction
2264                    .object_store("snapshots_meta")
2265                    .wrap_context("retrieving snapshots_meta object store")?;
2266                let creation_type_object = snapshots_meta
2267                    .index("creation_type_object")
2268                    .wrap_context("opening 'creation_type_object' snapshot index")?;
2269                let mut cursor = creation_type_object
2270                    .cursor()
2271                    .range(
2272                        &**Array::from_iter([&JsValue::from(1), &zero_id, &zero_id])
2273                            ..=&**Array::from_iter([&JsValue::from(1), &max_id, &max_id]),
2274                    )
2275                    .wrap_context("limiting cursor to only creation snapshots")?
2276                    .open()
2277                    .await
2278                    .wrap_context("opening cursor over all creation objects")?;
2279                let mut res = HashMap::new();
2280                while let Some(snapshot_meta_js) = cursor.value() {
2281                    let snapshot_meta =
2282                        serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
2283                            .wrap_context("deserializing snapshot metadata")?;
2284                    res.insert(
2285                        snapshot_meta.object_id,
2286                        SavedObjectMeta {
2287                            type_id: snapshot_meta.type_id,
2288                            // TODO(api-highest): this is wrong, see todo about using a side-table
2289                            have_all_until: snapshot_meta.have_all_until,
2290                            // TODO(api-highest): check this is correct
2291                            importance: snapshot_meta.importance.unwrap(),
2292                        },
2293                    );
2294                    cursor
2295                        .advance(1)
2296                        .await
2297                        .wrap_context("going to next object in the database")?;
2298                }
2299                Ok(res)
2300            })
2301            .await
2302            .wrap_context("listing subscribed objects")
2303    }
2304
2305    async fn get_saved_queries(&self) -> crate::Result<HashMap<QueryId, SavedQuery>> {
2306        self.db
2307            .transaction(&["queries_meta"])
2308            .run(|transaction| async move {
2309                let queries_meta = transaction
2310                    .object_store("queries_meta")
2311                    .wrap_context("retrieving queries_meta object store")?;
2312                let queries = queries_meta
2313                    .get_all(None)
2314                    .await
2315                    .wrap_context("listing subscribed queries")?;
2316                let res = queries
2317                    .into_iter()
2318                    .map(|q| {
2319                        let q = serde_wasm_bindgen::from_value::<QueryMeta>(q)
2320                            .wrap_context("deserializing query metadata")?;
2321                        Ok((
2322                            q.query_id,
2323                            SavedQuery {
2324                                query: q.query,
2325                                type_id: q.type_id,
2326                                have_all_until: q.have_all_until,
2327                                importance: q.importance,
2328                            },
2329                        ))
2330                    })
2331                    .collect();
2332                res
2333            })
2334            .await
2335            .wrap_context("listing subscribed objects")
2336    }
2337
2338    async fn record_query(
2339        &self,
2340        query_id: QueryId,
2341        query: Arc<Query>,
2342        type_id: TypeId,
2343        importance: Importance,
2344    ) -> crate::Result<()> {
2345        let new_query = QueryMeta {
2346            query_id,
2347            query,
2348            type_id,
2349            importance,
2350            have_all_until: None,
2351        };
2352        let new_query_js = to_js(new_query).wrap_context("serializing query metadata")?;
2353        self.db
2354            .transaction(&["queries_meta"])
2355            .rw()
2356            .run(|transaction| async move {
2357                let queries_meta = transaction
2358                    .object_store("queries_meta")
2359                    .wrap_context("retrieving queries_meta object store")?;
2360                queries_meta
2361                    .put(&new_query_js)
2362                    .await
2363                    .wrap_context("inserting the new query")?;
2364                Ok(())
2365            })
2366            .await
2367            .wrap_context("subscribing to query")
2368    }
2369
2370    async fn forget_query(
2371        &self,
2372        query_id: QueryId,
2373        objects_matching_query: Vec<ObjectId>,
2374    ) -> crate::Result<()> {
2375        let query_id_js = query_id.to_js_string();
2376        self.db
2377            .transaction(&["queries_meta", "snapshots_meta"])
2378            .rw()
2379            .run(move |transaction| async move {
2380                let queries_meta = transaction
2381                    .object_store("queries_meta")
2382                    .wrap_context("retrieving queries_meta object store")?;
2383                let snapshots_meta = transaction
2384                    .object_store("snapshots_meta")
2385                    .wrap_context("retrieving snapshots_meta object store")?;
2386                let creation_object = snapshots_meta
2387                    .index("creation_object")
2388                    .wrap_context("retrieving creation_object index")?;
2389
2390                queries_meta
2391                    .delete(&query_id_js)
2392                    .await
2393                    .wrap_context("removing the unsubscribed query")?;
2394                for object_id in objects_matching_query {
2395                    let Some(snapshot_meta_js) = creation_object
2396                        .get(&**Array::from_iter([
2397                            &JsValue::from(1),
2398                            &object_id.to_js_string(),
2399                        ]))
2400                        .await
2401                        .wrap_context("fetching existing snapshot metadata")?
2402                    else {
2403                        tracing::error!(
2404                            ?object_id,
2405                            "object supposed to get unlocked does not actually exist"
2406                        );
2407                        continue;
2408                    };
2409                    let mut snapshot_meta =
2410                        serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
2411                            .wrap_context("deserializing snapshot metadata")?;
2412                    // TODO(api-highest): this is definitely wrong.
2413                    let importance_from_queries_before = snapshot_meta.importance_from_queries;
2414                    snapshot_meta.importance_from_queries = Some(Importance::NONE);
2415                    if importance_from_queries_before != snapshot_meta.importance_from_queries {
2416                        let snapshot_meta_js =
2417                            to_js(snapshot_meta).wrap_context("serializing snapshot metadata")?;
2418                        snapshots_meta
2419                            .put(&snapshot_meta_js)
2420                            .await
2421                            .wrap_context("saving unlocked-for-queries snapshot")?;
2422                    }
2423                }
2424                Ok(())
2425            })
2426            .await
2427            .wrap_context("subscribing to query")
2428    }
2429
2430    async fn update_queries(
2431        &self,
2432        queries: &HashSet<QueryId>,
2433        now_have_all_until: Updatedness,
2434    ) -> crate::Result<()> {
2435        let queries = queries.clone();
2436        self.db
2437            .transaction(&["queries_meta"])
2438            .rw()
2439            .run(move |transaction| async move {
2440                let queries_meta = transaction
2441                    .object_store("queries_meta")
2442                    .wrap_context("retrieving queries_meta object store")?;
2443                for query_id in queries {
2444                    let Some(query_meta_js) = queries_meta
2445                        .get(&query_id.to_js_string())
2446                        .await
2447                        .wrap_context("fetching existing query metadata")?
2448                    else {
2449                        tracing::error!(
2450                            ?query_id,
2451                            "query supposed to get updated does not actually exist"
2452                        );
2453                        continue;
2454                    };
2455                    let mut query_meta = serde_wasm_bindgen::from_value::<QueryMeta>(query_meta_js)
2456                        .wrap_context("deserializing query metadata")?;
2457                    query_meta.have_all_until = Some(now_have_all_until);
2458                    let query_meta_js =
2459                        to_js(query_meta).wrap_context("reserializing query metadata")?;
2460                    queries_meta
2461                        .put(&query_meta_js)
2462                        .await
2463                        .wrap_context("saving updated query metadata")?;
2464                }
2465                Ok(())
2466            })
2467            .await
2468            .wrap_context("subscribing to query")
2469    }
2470}
2471
2472async fn reencode_snapshot<T: Object>(
2473    transaction: &indexed_db::Transaction<crate::Error>,
2474    snapshot_id: EventId,
2475    snapshot_version: i32,
2476) -> crate::Result<()> {
2477    let snapshot_id_js = snapshot_id.to_js_string();
2478    let snapshots = transaction
2479        .object_store("snapshots")
2480        .wrap_context("retrieving 'snapshots' object store")?;
2481    let snapshot_js = snapshots
2482        .get(&snapshot_id_js)
2483        .await
2484        .wrap_context("fetching snapshot to update")?
2485        .ok_or_else(|| {
2486            crate::Error::Other(anyhow!(
2487                "Snapshot {snapshot_id:?} does not exist despite having associated metadata"
2488            ))
2489        })?;
2490    let snapshot_json = serde_wasm_bindgen::from_value::<serde_json::Value>(snapshot_js)
2491        .wrap_context("deserializing indexeddb data as json-value")?;
2492    let value = T::from_old_snapshot(snapshot_version, snapshot_json)
2493        .wrap_context("parsing old snapshot")?;
2494    let new_snapshot_js = to_js(value).wrap_context("serializing snapshot data")?;
2495    snapshots
2496        .put_kv(&snapshot_id_js, &new_snapshot_js)
2497        .await
2498        .wrap_context("updating saved snapshot data")?;
2499    Ok(())
2500}
2501
2502async fn check_required_binaries(
2503    binaries_store: indexed_db::ObjectStore<crate::Error>,
2504    binaries: Vec<BinPtr>,
2505) -> crate::Result<()> {
2506    let missing_binaries = future::try_join_all(binaries.iter().map(|&b| {
2507        binaries_store
2508            .contains(&b.to_js_string())
2509            .map_ok(move |present| (!present).then_some(b))
2510    }))
2511    .await
2512    .wrap_with_context(|| format!("checking for required binaries {binaries:?}"))?
2513    .into_iter()
2514    .filter_map(|b| b)
2515    .collect::<Vec<_>>();
2516
2517    if !missing_binaries.is_empty() {
2518        return Err(crate::Error::MissingBinaries(missing_binaries));
2519    }
2520
2521    Ok(())
2522}
2523
2524async fn apply_events_after<T: Object>(
2525    transaction: &indexed_db::Transaction<crate::Error>,
2526    object: &mut T,
2527    object_id: ObjectId,
2528    mut last_applied_event_id: EventId,
2529) -> indexed_db::Result<EventId, crate::Error> {
2530    let events = transaction
2531        .object_store("events")
2532        .wrap_context("retrieving 'events' object store")?;
2533    let object_event = transaction
2534        .object_store("events_meta")
2535        .wrap_context("retrieving 'events_meta' object store")?
2536        .index("object_event")
2537        .wrap_context("retrieving 'object_event' index")?;
2538
2539    let object_id_js = object_id.to_js_string();
2540    let last_applied_event_id_js = last_applied_event_id.to_js_string();
2541    let max_event_id_js = EventId::from_u128(u128::MAX).to_js_string();
2542
2543    let mut to_apply = object_event.cursor()
2544        .range((
2545            Bound::Excluded(&**Array::from_iter([&object_id_js, &last_applied_event_id_js])),
2546            Bound::Included(&**Array::from_iter([&object_id_js, &max_event_id_js])),
2547        ))
2548        .wrap_with_context(|| format!("limiting the events to apply to those on {object_id:?} since {last_applied_event_id:?}"))?
2549        .open()
2550        .await
2551        .wrap_with_context(|| format!("opening cursor for the events to apply on {object_id:?} since {last_applied_event_id:?}"))?;
2552    while let Some(apply_event_meta_js) = to_apply.value() {
2553        let apply_event_meta = serde_wasm_bindgen::from_value::<EventMeta>(apply_event_meta_js)
2554            .wrap_with_context(|| format!("deserializing event to apply on {object_id:?}"))?;
2555        let apply_event_id = apply_event_meta.event_id;
2556        let apply_event_id_js = apply_event_id.to_js_string();
2557        let apply_event_js = events
2558            .get(&apply_event_id_js)
2559            .await
2560            .wrap_with_context(|| format!("recovering event data for {apply_event_id:?}"))?
2561            .ok_or_else(|| {
2562                crate::Error::Other(anyhow!(
2563                    "no event data for event with metadata {apply_event_id:?}"
2564                ))
2565            })?;
2566        let apply_event = serde_wasm_bindgen::from_value::<T::Event>(apply_event_js)
2567            .wrap_with_context(|| format!("deserializing event data for {apply_event_id:?}"))?;
2568        object.apply(DbPtr::from(object_id), &apply_event);
2569        last_applied_event_id = apply_event_id;
2570        to_apply
2571            .advance(1)
2572            .await
2573            .wrap_context("getting next event to apply")?;
2574    }
2575
2576    Ok(last_applied_event_id)
2577}
2578
2579fn to_js<T: serde::Serialize>(v: T) -> std::result::Result<JsValue, serde_wasm_bindgen::Error> {
2580    static JSON_SERIALIZER: serde_wasm_bindgen::Serializer =
2581        serde_wasm_bindgen::Serializer::json_compatible();
2582    v.serialize(&JSON_SERIALIZER)
2583}