1#![cfg(target_arch = "wasm32")]
2
3use anyhow::anyhow;
4use crdb_core::{check_strings, ClientStorageInfo, LoginInfo, SavedObjectMeta, SavedQuery};
5use crdb_core::{
6 normalizer_version, BinPtr, ClientSideDb, Db, DbPtr, Event, EventId, Importance, Object,
7 ObjectId, Query, QueryId, ResultExt, TypeId, Updatedness, Upload, UploadId,
8};
9use crdb_helpers::parse_snapshot_js;
10use futures::{future, TryFutureExt};
11use indexed_db::CursorDirection;
12use js_sys::{Array, JsString, Uint8Array};
13use std::{
14 cell::Cell,
15 collections::{HashMap, HashSet},
16 ops::Bound,
17 sync::Arc,
18};
19use wasm_bindgen::{JsCast, JsValue};
20
21pub use crdb_core::{Error, Result};
22
/// Names of the IndexedDB object stores used by this backend.
///
/// The entries match, name for name, the object stores that
/// `IndexedDb::connect` creates in its upgrade callback.
const OBJECT_STORE_LIST: &[&str] = &[
    "binaries",
    "config",
    "events",
    "events_meta",
    "queries_meta",
    "snapshots",
    "snapshots_meta",
    "upload_queue",
    "upload_queue_meta",
];
34
/// The string `"login"`.
///
/// NOTE(review): presumably the key under which the saved login is kept in
/// the `config` object store -- confirm against the code that uses it.
const CONFIG_SAVED_LOGIN: &str = "login";
36
/// Metadata record stored in the `snapshots_meta` object store, one per snapshot.
///
/// The field names double as IndexedDB key paths: `snapshot_id` is the store's
/// key path, and several fields are referenced by the indexes declared in
/// `IndexedDb::connect`.
#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)]
struct SnapshotMeta {
    /// Key of this record; the snapshot data itself is stored under the same
    /// id in the `snapshots` object store.
    snapshot_id: EventId,
    /// Type of the object this snapshot belongs to.
    type_id: TypeId,
    /// Object this snapshot belongs to.
    object_id: ObjectId,
    /// `Some(1)` on the creation snapshot of an object, `None` on snapshots
    /// written by `submit`. Looked up through the unique `creation_object`
    /// index with the key `[1, object_id]`.
    // NOTE(review): presumably an integer rather than a bool so that it is a
    // valid IndexedDB key, with `None` keeping the record out of the index --
    // confirm.
    is_creation: Option<usize>,
    /// `Some(1)` on the snapshot currently considered the latest one for the
    /// object. Looked up through the unique `latest_type_object` index with
    /// the key `[1, type_id, object_id]`.
    is_latest: Option<usize>,
    /// Value of `normalizer_version()` when the snapshot was written.
    normalizer_version: i32,
    /// Value of `T::snapshot_version()` when the snapshot was written; passed
    /// to `parse_snapshot_js` when reading the data back.
    snapshot_version: i32,
    /// Updatedness recorded alongside the snapshot.
    // NOTE(review): presumably "all server updates up to this point are
    // known" -- confirm against crdb_core.
    have_all_until: Option<Updatedness>,
    /// Importance of the object; written as `Some(..)` on creation snapshots
    /// and as `None` on snapshots produced by `submit`.
    importance: Option<Importance>,
    /// Importance coming from queries; written as `Some(..)` on creation
    /// snapshots and as `None` on snapshots produced by `submit`.
    importance_from_queries: Option<Importance>,
    /// Binaries the snapshot requires; indexed by the multi-entry
    /// `required_binaries` index.
    required_binaries: Vec<BinPtr>,
}
53
/// Metadata record stored in the `events_meta` object store, one per event.
///
/// `event_id` is the store's key path; the event data itself is stored under
/// the same id in the `events` object store.
#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)]
struct EventMeta {
    /// Key of this record.
    event_id: EventId,
    /// Object the event applies to; part of the `object_event` compound index.
    object_id: ObjectId,
    /// Binaries the event requires; indexed by the multi-entry
    /// `required_binaries` index.
    required_binaries: Vec<BinPtr>,
}
60
/// Record stored in the `queries_meta` object store, keyed by `query_id`.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct QueryMeta {
    /// Key of this record (the store's key path).
    query_id: QueryId,
    /// The saved query itself.
    query: Arc<Query>,
    /// Type the query is associated with.
    // NOTE(review): presumably the type of the objects the query ranges over
    // -- confirm.
    type_id: TypeId,
    /// Updatedness recorded for this query.
    have_all_until: Option<Updatedness>,
    /// Importance recorded for this query.
    importance: Importance,
}
69
/// Record shape with a single `required_binaries` field.
///
/// NOTE(review): presumably the value type of the `upload_queue_meta` object
/// store, whose `required_binaries` index uses this field name -- confirm
/// against the upload-queue code.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct UploadMeta {
    /// Binaries required by the queued upload.
    required_binaries: Vec<BinPtr>,
}
74
/// Client-side database backed by the browser's IndexedDB.
pub struct IndexedDb {
    /// Result of the storage-persistence request made in `connect`; always
    /// `false` when built with the `_tests` feature.
    is_persistent: bool,
    /// Handle to the opened IndexedDB database.
    db: indexed_db::Database<crate::Error>,
    /// Counter bumped by `create` (when called with `Importance::NONE`) and
    /// by `create_binary` each time they succeed.
    objects_unlocked_this_run: Cell<usize>,
}
80
81impl IndexedDb {
82 pub async fn connect(url: &str) -> anyhow::Result<IndexedDb> {
83 #[cfg(not(feature = "_tests"))]
84 let is_persistent = {
85 let window = web_sys::window().ok_or_else(|| anyhow!("not running in a browser"))?;
87 let persist_fut = window.navigator().storage().persist().wrap_context(
88 "failed to request persistence, did the user disable storage altogether?",
89 )?;
90 wasm_bindgen_futures::JsFuture::from(persist_fut)
91 .await
92 .wrap_context("failed to resolve request for persistence")?
93 .as_bool()
94 .ok_or_else(|| anyhow!("requesting for persistence did not return a boolean"))?
95 };
96 #[cfg(feature = "_tests")]
97 let is_persistent = false;
98
99 let factory = indexed_db::Factory::get().wrap_context("getting IndexedDb factory")?;
100
101 const VERSION: u32 = 1;
102 let db = factory
103 .open(url, VERSION, |evt| async move {
104 let db = evt.database();
105
106 db.build_object_store("config").create()?;
108 db.build_object_store("snapshots").create()?;
109 db.build_object_store("events").create()?;
110 db.build_object_store("binaries").create()?;
111 db.build_object_store("upload_queue").create()?;
112 db.build_object_store("queries_meta")
113 .key_path("query_id")
114 .create()?;
115 let snapshots_meta = db
116 .build_object_store("snapshots_meta")
117 .key_path("snapshot_id")
118 .create()?;
119 let events_meta = db
120 .build_object_store("events_meta")
121 .key_path("event_id")
122 .create()?;
123 let upload_queue_meta = db
124 .build_object_store("upload_queue_meta")
125 .auto_increment()
126 .create()?;
127
128 snapshots_meta
129 .build_compound_index(
130 "latest_type_object",
131 &["is_latest", "type_id", "object_id"],
132 )
133 .unique()
134 .create()?;
135 snapshots_meta
136 .build_compound_index("creation_object", &["is_creation", "object_id"])
137 .unique()
138 .create()?;
139 snapshots_meta
140 .build_compound_index("object_snapshot", &["object_id", "snapshot_id"])
141 .create()?;
142 snapshots_meta
143 .build_index("required_binaries", "required_binaries")
144 .multi_entry()
145 .create()?;
146
147 events_meta
148 .build_compound_index("object_event", &["object_id", "event_id"])
149 .create()?;
150 events_meta
151 .build_index("required_binaries", "required_binaries")
152 .multi_entry()
153 .create()?;
154
155 upload_queue_meta
156 .build_index("required_binaries", "required_binaries")
157 .multi_entry()
158 .create()?;
159
160 Ok(())
161 })
162 .await
163 .wrap_with_context(|| format!("opening IndexedDb {url:?} at version {VERSION}"))?;
164
165 Ok(IndexedDb {
166 is_persistent,
167 db,
168 objects_unlocked_this_run: Cell::new(0),
169 })
170 }
171
    /// Closes the underlying database handle.
    ///
    /// Only compiled when the `_tests` feature is enabled.
    #[cfg(feature = "_tests")]
    pub fn close(&self) {
        self.db.close();
    }
176
    /// Returns the persistence flag recorded by `connect`: the browser's
    /// answer to the storage-persistence request, or `false` in `_tests`
    /// builds.
    pub fn is_persistent(&self) -> bool {
        self.is_persistent
    }
180
181 async fn list_required_binaries(
182 transaction: &indexed_db::Transaction<crate::Error>,
183 ) -> crate::Result<HashSet<BinPtr>> {
184 let snapshots_meta = transaction
185 .object_store("snapshots_meta")
186 .wrap_context("opening 'snapshots_meta' object store")?;
187 let events_meta = transaction
188 .object_store("events_meta")
189 .wrap_context("opening 'events_meta' object store")?;
190 let upload_queue_meta = transaction
191 .object_store("upload_queue_meta")
192 .wrap_context("opening 'upload_queue_meta' object store")?;
193
194 let snapshot_required_binaries = snapshots_meta
195 .index("required_binaries")
196 .wrap_context("opening 'required_binaries' snapshot index")?;
197 let event_required_binaries = events_meta
198 .index("required_binaries")
199 .wrap_context("opening 'required_binaries' event index")?;
200 let upload_queue_required_binaries = upload_queue_meta
201 .index("required_binaries")
202 .wrap_context("opening 'required_binaries' upload_queue index")?;
203
204 let required_binaries = snapshot_required_binaries
205 .get_all_keys(None)
206 .await
207 .wrap_context("listing all required binaries for snapshots")?
208 .into_iter()
209 .chain(
210 event_required_binaries
211 .get_all_keys(None)
212 .await
213 .wrap_context("listing all required binaries for events")?
214 .into_iter(),
215 )
216 .chain(
217 upload_queue_required_binaries
218 .get_all_keys(None)
219 .await
220 .wrap_context("listing all required binaries for the upload queue")?
221 .into_iter(),
222 );
223 let mut res = HashSet::new();
224 for b in required_binaries {
225 let b = serde_wasm_bindgen::from_value::<BinPtr>(b).wrap_context("parsing BinPtr")?;
226 res.insert(b);
227 }
228
229 Ok(res)
230 }
231
232 async fn create_impl<T: Object>(
233 transaction: indexed_db::Transaction<crate::Error>,
234 object_id: ObjectId,
235 created_at: EventId,
236 object: Arc<T>,
237 updatedness: Option<Updatedness>,
238 importance: Importance,
239 importance_from_queries: Importance,
240 ) -> std::result::Result<Option<Arc<T>>, indexed_db::Error<crate::Error>> {
241 let mut new_snapshot_meta = SnapshotMeta {
242 snapshot_id: created_at,
243 type_id: *T::type_ulid(),
244 object_id,
245 is_creation: Some(1),
246 is_latest: Some(1),
247 normalizer_version: normalizer_version(),
248 snapshot_version: T::snapshot_version(),
249 have_all_until: updatedness,
250 importance: Some(importance),
251 importance_from_queries: Some(importance_from_queries),
252 required_binaries: object.required_binaries(),
253 };
254 let object_id_js = object_id.to_js_string();
255 let new_snapshot_id_js = created_at.to_js_string();
256 let new_snapshot_meta_js = to_js(&new_snapshot_meta)
257 .wrap_with_context(|| format!("serializing metadata for {object_id:?}"))?;
258 let new_snapshot_js =
259 to_js(&*object).wrap_with_context(|| format!("serializing {object_id:?}"))?;
260 let required_binaries = object.required_binaries();
261 check_strings(&serde_json::to_value(&*object).wrap_context("serializing to json")?)?;
263
264 let snapshots = transaction
265 .object_store("snapshots")
266 .wrap_context("retrieving 'snapshots' object store")?;
267 let snapshots_meta = transaction
268 .object_store("snapshots_meta")
269 .wrap_context("retrieving 'snapshots_meta' object store")?;
270 let events = transaction
271 .object_store("events")
272 .wrap_context("retrieving 'events' object store")?;
273 let binaries = transaction
274 .object_store("binaries")
275 .wrap_context("retrieving 'binaries' object store")?;
276
277 let creation_object = snapshots_meta
278 .index("creation_object")
279 .wrap_context("retrieving 'creation_object' index")?;
280
281 if let Some(old_meta_js) = creation_object
283 .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
284 .await
285 .wrap_context("checking whether {object_id:?} already existed")?
286 {
287 let mut old_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(old_meta_js)
289 .wrap_with_context(|| {
290 format!("deserializing preexisting snapshot metadata for {object_id:?}")
291 })?;
292 new_snapshot_meta.is_latest = old_meta.is_latest;
294 old_meta.have_all_until = std::cmp::max(old_meta.have_all_until, updatedness);
295 new_snapshot_meta.have_all_until = old_meta.have_all_until;
296 let importance_before = old_meta.importance.unwrap_or(Importance::NONE);
297 let importance_from_queries_before =
298 old_meta.importance_from_queries.unwrap_or(Importance::NONE);
299 let importance_after = importance_before | importance;
300 let importance_from_queries_after =
301 importance_from_queries_before | importance_from_queries;
302 old_meta.importance = Some(importance_after);
303 old_meta.importance_from_queries = Some(importance_from_queries_after);
304 new_snapshot_meta.importance = Some(importance_after);
305 new_snapshot_meta.importance_from_queries = Some(importance_from_queries_after);
306 if old_meta != new_snapshot_meta {
307 return Err(crate::Error::ObjectAlreadyExists(object_id).into());
308 }
309
310 let old_data_js = snapshots
312 .get(&new_snapshot_id_js)
313 .await
314 .wrap_with_context(|| format!("retrieving snapshot data for {created_at:?}"))?
315 .ok_or_else(|| {
316 crate::Error::Other(anyhow!(
317 "Snapshot metadata existed without data for {created_at:?}"
318 ))
319 })?;
320 let old_data = parse_snapshot_js::<T>(old_meta.snapshot_version, old_data_js)
321 .wrap_with_context(|| {
322 format!("deserializing preexisting snapshot for {created_at:?}")
323 })?;
324 if old_data != *object {
325 return Err(crate::Error::ObjectAlreadyExists(object_id).into());
326 }
327
328 if importance_before != importance_after {
330 todo!() }
332 if importance_from_queries_before != importance_from_queries_after {
333 todo!() }
335 return Ok(None);
336 }
337
338 match snapshots_meta.add(&new_snapshot_meta_js).await {
340 Err(indexed_db::Error::AlreadyExists) => {
341 Err(crate::Error::EventAlreadyExists(created_at).into())
343 }
344 Err(e) => Err(e),
345 Ok(_) => {
346 snapshots
348 .add_kv(&new_snapshot_id_js, &new_snapshot_js)
349 .await
350 .wrap_with_context(|| {
351 format!("saving new snapshot {created_at:?} in database")
352 })?;
353
354 if events
356 .contains(&new_snapshot_id_js)
357 .await
358 .wrap_with_context(|| {
359 format!("checking whether {created_at:?} already existed as an event")
360 })?
361 {
362 return Err(crate::Error::EventAlreadyExists(created_at).into());
363 }
364
365 check_required_binaries(binaries, required_binaries).await?;
367
368 Ok(Some(object))
369 }
370 }
371 }
372}
373
374impl Db for IndexedDb {
375 async fn create<T: Object>(
376 &self,
377 object_id: ObjectId,
378 created_at: EventId,
379 object: Arc<T>,
380 updatedness: Option<Updatedness>,
381 importance: Importance,
382 ) -> crate::Result<Option<Arc<T>>> {
383 let res = self
384 .db
385 .transaction(&["snapshots", "snapshots_meta", "events", "binaries"])
386 .rw()
387 .run(move |transaction| {
388 Self::create_impl(
389 transaction,
390 object_id,
391 created_at,
392 object,
393 updatedness,
394 importance,
395 Importance::NONE, )
397 })
398 .await
399 .wrap_with_context(|| format!("running creation transaction for {object_id:?}"));
400 if res.is_ok() && importance == Importance::NONE {
401 self.objects_unlocked_this_run
402 .set(self.objects_unlocked_this_run.get() + 1);
403 }
404 res
405 }
406
    /// Records `event` on `object_id` and recomputes the object's latest snapshot.
    ///
    /// Returns `Ok(Some(snapshot))` with the recomputed latest snapshot when
    /// the event was newly stored, and `Ok(None)` when an identical event with
    /// the same id was already present. In both cases `additional_importance`
    /// is OR-ed into the importance of the object's creation snapshot.
    ///
    /// # Errors
    ///
    /// - `ObjectDoesNotExist` when `object_id` has no creation snapshot.
    /// - `WrongType` when the stored object is not of type `T`.
    /// - `EventTooEarly` when `event_id` is not strictly after the creation
    ///   snapshot's id.
    /// - `EventAlreadyExists` when `event_id` is already used by a different
    ///   event, or by a snapshot.
    /// - Any error from `check_strings`, `check_required_binaries`,
    ///   (de)serialization or the database itself.
    async fn submit<T: Object>(
        &self,
        object_id: ObjectId,
        event_id: EventId,
        event: Arc<T::Event>,
        updatedness: Option<Updatedness>,
        additional_importance: Importance,
    ) -> crate::Result<Option<Arc<T>>> {
        let new_event_meta = EventMeta {
            event_id,
            object_id,
            required_binaries: event.required_binaries(),
        };
        let object_id_js = object_id.to_js_string();
        let new_event_id_js = event_id.to_js_string();
        // Smallest and largest possible event ids, used as bounds for the
        // per-object ranges over the `object_snapshot` index below.
        let zero_event_id_js = EventId::from_u128(0).to_js_string();
        let max_event_id_js = EventId::from_u128(u128::MAX).to_js_string();
        let new_event_meta_js =
            to_js(&new_event_meta).wrap_with_context(|| format!("serializing {event_id:?}"))?;
        let new_event_js =
            to_js(&*event).wrap_with_context(|| format!("serializing {event_id:?}"))?;

        self.db
            .transaction(&["snapshots", "snapshots_meta", "events", "events_meta", "binaries"])
            .rw()
            .run(move |transaction| async move {
                let snapshots = transaction
                    .object_store("snapshots")
                    .wrap_context("retrieving 'snapshots' object store")?;
                let snapshots_meta = transaction
                    .object_store("snapshots_meta")
                    .wrap_context("retrieving 'snapshots_meta' object store")?;
                let events = transaction
                    .object_store("events")
                    .wrap_context("retrieving 'events' object store")?;
                let events_meta = transaction
                    .object_store("events_meta")
                    .wrap_context("retrieving 'events_meta' object store")?;
                let binaries = transaction
                    .object_store("binaries")
                    .wrap_context("retrieving 'binaries' object store")?;

                let creation_object = snapshots_meta
                    .index("creation_object")
                    .wrap_context("retrieving 'creation_object' index")?;
                let object_snapshot = snapshots_meta
                    .index("object_snapshot")
                    .wrap_context("retrieving 'object_snapshot' index")?;

                // The object must exist, be of type `T`, and have been created
                // strictly before the event.
                let Some(creation_snapshot_js) = creation_object
                    .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
                    .await
                    .wrap_with_context(|| format!("checking that {object_id:?} already exists"))?
                else {
                    return Err(crate::Error::ObjectDoesNotExist(object_id).into());
                };
                let mut creation_snapshot =
                    serde_wasm_bindgen::from_value::<SnapshotMeta>(creation_snapshot_js)
                        .wrap_with_context(|| {
                            format!("deserializing creation snapshot metadata for {object_id:?}")
                        })?;
                if creation_snapshot.type_id != *T::type_ulid() {
                    return Err(crate::Error::WrongType {
                        object_id,
                        expected_type_id: *T::type_ulid(),
                        real_type_id: creation_snapshot.type_id,
                    }
                    .into());
                }
                if creation_snapshot.snapshot_id >= event_id {
                    return Err(crate::Error::EventTooEarly {
                        event_id,
                        object_id,
                        created_at: creation_snapshot.snapshot_id,
                    }
                    .into());
                }

                check_strings(&serde_json::to_value(&*event).wrap_context("serializing to json")?)?;

                // Try to insert the event metadata; a key collision means an
                // event with this id is already stored.
                match events_meta.add(&new_event_meta_js).await {
                    Err(indexed_db::Error::AlreadyExists) => {
                        // Accept the re-submission only if both the stored
                        // metadata and the stored data equal the new ones.
                        let old_meta_js = events_meta.get(&new_event_id_js)
                            .await
                            .wrap_with_context(|| format!("retrieving pre-existing event metadata for {event_id:?}"))?
                            .ok_or_else(|| {
                                crate::Error::Other(anyhow!("inserting {event_id:?} failed but the preexisting duplicate seems not to exist"))
                            })?;
                        let old_meta = serde_wasm_bindgen::from_value::<EventMeta>(old_meta_js)
                            .wrap_with_context(|| {
                                format!("deserializing preexisting event metadata for {event_id:?}")
                            })?;
                        if old_meta != new_event_meta {
                            return Err(crate::Error::EventAlreadyExists(event_id).into());
                        }

                        let old_data_js = events
                            .get(&new_event_id_js)
                            .await
                            .wrap_with_context(|| {
                                format!("retrieving event data for {event_id:?}")
                            })?
                            .ok_or_else(|| {
                                crate::Error::Other(anyhow!(
                                    "Event metadata existed without data for {event_id:?}"
                                ))
                            })?;
                        let old_data = serde_wasm_bindgen::from_value::<T::Event>(old_data_js)
                            .wrap_with_context(|| {
                                format!("deserializing preexisting event data for {event_id:?}")
                            })?;
                        if old_data != *event {
                            return Err(crate::Error::EventAlreadyExists(event_id).into());
                        }

                        // Identical event: only the importance may still need
                        // updating on the creation snapshot.
                        let importance_before = creation_snapshot.importance;
                        creation_snapshot.importance = Some(importance_before.unwrap_or(Importance::NONE) | additional_importance);
                        if importance_before != creation_snapshot.importance {
                            let creation_snapshot_js = to_js(creation_snapshot)
                                .wrap_context("serializing snapshot metadata")?;
                            snapshots_meta.put(&creation_snapshot_js)
                                .await
                                .wrap_context("locking creation snapshot")?;
                        }

                        return Ok(None);
                    }
                    Err(e) => return Err(e),
                    Ok(_) => (),
                }

                // New event: OR the additional importance into the creation
                // snapshot and persist it if that changed anything.
                let importance_before = creation_snapshot.importance;
                creation_snapshot.importance = Some(importance_before.unwrap_or(Importance::NONE) | additional_importance);
                if importance_before != creation_snapshot.importance {
                    let creation_snapshot_js = to_js(creation_snapshot)
                        .wrap_context("serializing snapshot metadata")?;
                    snapshots_meta.put(&creation_snapshot_js)
                        .await
                        .wrap_context("locking creation snapshot")?;
                }

                // Read the object's newest snapshot (reverse cursor over its
                // range) to learn the updatedness known so far.
                let latest_snapshot_meta_js = object_snapshot
                    .cursor()
                    .range(&**Array::from_iter([&object_id_js, &zero_event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
                    .wrap_with_context(|| format!("limiting the latest snapshot to recover to those of {object_id:?}"))?
                    .direction(CursorDirection::Prev)
                    .open()
                    .await
                    .wrap_with_context(|| format!("opening cursor for the latest snapshot of {object_id:?}"))?
                    .value()
                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot find a latest snapshot for {object_id:?}")))?;
                let latest_snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(latest_snapshot_meta_js)
                    .wrap_with_context(|| format!("deserializing the latest known snapshot for {object_id:?}"))?;
                // Keep whichever updatedness is the larger; an older incoming
                // value is only logged, never applied.
                let now_have_all_until = match (updatedness, latest_snapshot_meta.have_all_until) {
                    (None, None) => None,
                    (Some(u), None) => Some(u),
                    (None, Some(u)) => Some(u),
                    (Some(now_have_all_until), Some(had_all_until)) => {
                        if now_have_all_until < had_all_until {
                            tracing::warn!("server sent an event with an older updatedness than we already had");
                            Some(had_all_until)
                        } else {
                            Some(now_have_all_until)
                        }
                    }
                };

                events.add_kv(&new_event_id_js, &new_event_js).await.wrap_with_context(|| format!("saving {event_id:?} in database"))?;

                // Delete every snapshot of this object whose id is at or after
                // the new event: it no longer accounts for this event.
                let mut to_clear = object_snapshot
                    .cursor()
                    .range(&**Array::from_iter([&object_id_js, &new_event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
                    .wrap_with_context(|| format!("limiting the snapshots to delete to only those of {object_id:?} after {event_id:?}"))?
                    .open()
                    .await
                    .wrap_with_context(|| format!("opening cursor of snapshots to delete for {object_id:?} after {event_id:?}"))?;
                while let Some(snapshot_meta_js) = to_clear.value() {
                    let snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
                        .wrap_with_context(|| format!("deserializing to-clear snapshot of {object_id:?}, after {event_id:?}"))?;
                    let snapshot_id = snapshot_meta.snapshot_id;
                    snapshots.delete(&snapshot_id.to_js_string()).await.wrap_with_context(|| format!("deleting snapshot {snapshot_id:?}"))?;
                    to_clear.delete().await.wrap_with_context(|| format!("deleting snapshot metadata {snapshot_id:?}"))?;
                    to_clear.advance(1).await.wrap_context("getting next snapshot to delete")?;
                }

                // Load the newest snapshot that survived the cleanup; events
                // are replayed on top of it.
                let last_snapshot_meta_js = object_snapshot
                    .cursor()
                    .range(&**Array::from_iter([&object_id_js, &zero_event_id_js])..=&**Array::from_iter([&object_id_js, &max_event_id_js]))
                    .wrap_with_context(|| format!("limiting the last snapshot to recover to those of {object_id:?}"))?
                    .direction(CursorDirection::Prev)
                    .open()
                    .await
                    .wrap_with_context(|| format!("opening cursor for the last snapshot of {object_id:?}"))?
                    .value()
                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot find a latest snapshot for {object_id:?}")))?;
                let mut last_snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(last_snapshot_meta_js)
                    .wrap_with_context(|| format!("deserializing the last known snapshot for {object_id:?}"))?;
                let last_snapshot_id = last_snapshot_meta.snapshot_id;
                let last_snapshot_id_js = last_snapshot_id.to_js_string();
                let last_snapshot_js = snapshots.get(&last_snapshot_id_js)
                    .await
                    .wrap_with_context(|| format!("retrieving last available snapshot {last_snapshot_id:?}"))?
                    .ok_or_else(|| crate::Error::Other(anyhow!("cannot retrieve snapshot data for {last_snapshot_id:?}")))?;
                let mut last_snapshot = parse_snapshot_js::<T>(last_snapshot_meta.snapshot_version, last_snapshot_js)
                    .wrap_with_context(|| format!("deserializing snapshot data {last_snapshot_id:?}"))?;

                // That snapshot is about to be superseded: drop its `is_latest`
                // marker before the new latest snapshot is added.
                if last_snapshot_meta.is_latest.is_some() {
                    last_snapshot_meta.is_latest = None;
                    let last_snapshot_meta_js = to_js(last_snapshot_meta)
                        .wrap_with_context(|| format!("reserializing {last_snapshot_id:?}"))?;
                    snapshots_meta.put(&last_snapshot_meta_js)
                        .await
                        .wrap_with_context(|| format!("marking {last_snapshot_id:?} as not the latest any longer"))?;
                }

                // NOTE(review): `apply_events_after` is defined elsewhere;
                // presumably it replays onto `last_snapshot` every stored event
                // after `last_snapshot_id` and returns the id of the last one
                // applied -- confirm.
                let last_applied_event_id = apply_events_after(&transaction, &mut last_snapshot, object_id, last_snapshot_id).await?;

                // Store the recomputed state as the object's new latest snapshot.
                let new_last_snapshot_meta = SnapshotMeta {
                    snapshot_id: last_applied_event_id,
                    type_id: *T::type_ulid(),
                    object_id,
                    is_creation: None,
                    is_latest: Some(1),
                    normalizer_version: normalizer_version(),
                    snapshot_version: T::snapshot_version(),
                    have_all_until: now_have_all_until,
                    importance: None,
                    importance_from_queries: None,
                    required_binaries: last_snapshot.required_binaries(),
                };
                let last_applied_event_id_js = to_js(last_applied_event_id)
                    .wrap_with_context(|| format!("serializing {last_applied_event_id:?}"))?;
                let new_last_snapshot_meta_js = to_js(new_last_snapshot_meta)
                    .wrap_with_context(|| format!("serializing the snapshot metadata for {object_id:?} at {last_applied_event_id:?}"))?;
                let new_last_snapshot_js = to_js(&last_snapshot)
                    .wrap_with_context(|| format!("serializing the snapshot for {object_id:?} at {last_applied_event_id:?}"))?;
                // A key collision here is reported as `EventAlreadyExists`.
                snapshots_meta.add(&new_last_snapshot_meta_js).await.map_err(|err| match err {
                    indexed_db::Error::AlreadyExists => crate::Error::EventAlreadyExists(event_id),
                    e => crate::Error::Other(anyhow::Error::from(e).context(format!("saving new last snapshot metadata for {object_id:?} at {last_applied_event_id:?}"))),

                })?;
                snapshots.add_kv(&last_applied_event_id_js, &new_last_snapshot_js)
                    .await
                    .wrap_with_context(|| format!("saving new last snapshot data for {object_id:?} at {last_applied_event_id:?}"))?;

                // Done last: an error here fails the transaction closure after
                // all the writes above.
                check_required_binaries(binaries, event.required_binaries()).await
                    .wrap_with_context(|| format!("checking that all the binaries required by {event_id:?} are already present"))?;

                Ok(Some(Arc::new(last_snapshot)))
            })
            .await
            .wrap_with_context(|| {
                format!("running submission creation for {event_id:?} on {object_id:?}")
            })
    }
678
679 async fn get_latest<T: Object>(
680 &self,
681 object_id: ObjectId,
682 importance: Importance,
683 ) -> crate::Result<Arc<T>> {
684 let object_id_js = object_id.to_js_string();
685 let type_id_js = T::type_ulid().to_js_string();
686 let mut transaction =
687 self.db
688 .transaction(&["snapshots", "snapshots_meta", "events", "events_meta"]);
689 if importance != Importance::NONE {
690 transaction = transaction.rw();
691 }
692 transaction
693 .run(move |transaction| async move {
694 let snapshots = transaction
695 .object_store("snapshots")
696 .wrap_context("retrieving 'snapshots' object store")?;
697 let snapshots_meta = transaction
698 .object_store("snapshots_meta")
699 .wrap_context("retrieving 'snapshots_meta' object store")?;
700
701 let creation_object = snapshots_meta
702 .index("creation_object")
703 .wrap_context("retrieving 'creation_object' index")?;
704 let latest_type_object = snapshots_meta
705 .index("latest_type_object")
706 .wrap_context("retrieving 'latest_type_object' index")?;
707
708 let creation_snapshot_meta_js = creation_object
710 .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
711 .await
712 .wrap_with_context(|| {
713 format!("fetching creation snapshot metadata for {object_id:?}")
714 })?
715 .ok_or_else(|| crate::Error::ObjectDoesNotExist(object_id))?;
716 let mut creation_snapshot_meta =
717 serde_wasm_bindgen::from_value::<SnapshotMeta>(creation_snapshot_meta_js)
718 .wrap_with_context(|| {
719 format!("deserializing creation snapshot metadata for {object_id:?}")
720 })?;
721 if creation_snapshot_meta.type_id != *T::type_ulid() {
722 return Err(crate::Error::WrongType {
723 object_id,
724 expected_type_id: *T::type_ulid(),
725 real_type_id: creation_snapshot_meta.type_id,
726 }
727 .into());
728 }
729
730 let importance_before = creation_snapshot_meta.importance;
732 creation_snapshot_meta.importance =
733 Some(importance_before.unwrap_or(Importance::NONE) | importance);
734 if importance_before != creation_snapshot_meta.importance {
735 let new_snapshot_js = to_js(creation_snapshot_meta)
736 .wrap_context("serializing snapshot metadata")?;
737 snapshots_meta
738 .put(&new_snapshot_js)
739 .await
740 .wrap_with_context(|| {
741 format!("locking creation snapshot for {object_id:?} in database")
742 })?;
743 }
744
745 let latest_snapshot_meta_js = latest_type_object
747 .get(&Array::from_iter([
748 &JsValue::from(1),
749 &type_id_js,
750 &object_id_js,
751 ]))
752 .await
753 .wrap_with_context(|| {
754 format!("fetching latest snapshot metadata for {object_id:?}")
755 })?
756 .ok_or_else(|| {
757 crate::Error::Other(anyhow!(
758 "failed to recover metadata for latest snapshot of {object_id:?}"
759 ))
760 })?;
761 let latest_snapshot_meta =
762 serde_wasm_bindgen::from_value::<SnapshotMeta>(latest_snapshot_meta_js)
763 .wrap_with_context(|| {
764 format!("deserializing latest snapshot metadata for {object_id:?}")
765 })?;
766 let latest_snapshot_id = latest_snapshot_meta.snapshot_id;
767 let latest_snapshot_id_js = latest_snapshot_id.to_js_string();
768 let latest_snapshot_js = snapshots
769 .get(&latest_snapshot_id_js)
770 .await
771 .wrap_with_context(|| {
772 format!("fetching snapshot data for {latest_snapshot_id:?}")
773 })?
774 .ok_or_else(|| {
775 crate::Error::Other(anyhow!(
776 "failed to recover data for snapshot {latest_snapshot_id:?}"
777 ))
778 })?;
779 let latest_snapshot = Arc::new(
780 parse_snapshot_js::<T>(
781 latest_snapshot_meta.snapshot_version,
782 latest_snapshot_js,
783 )
784 .wrap_with_context(|| {
785 format!("deserializing snapshot data for {latest_snapshot_id:?}")
786 })?,
787 );
788
789 Ok(latest_snapshot)
791 })
792 .await
793 .wrap_with_context(|| format!("retrieving {object_id:?} from IndexedDB"))
794 }
795
796 async fn create_binary(&self, binary_id: BinPtr, data: Arc<[u8]>) -> crate::Result<()> {
797 if crdb_core::hash_binary(&data) != binary_id {
798 return Err(crate::Error::BinaryHashMismatch(binary_id));
799 }
800 let ary = Uint8Array::new_with_length(u32::try_from(data.len()).unwrap());
801 ary.copy_from(&data);
802 let res = self
803 .db
804 .transaction(&["binaries"])
805 .rw()
806 .run(move |transaction| async move {
807 let binaries = transaction
808 .object_store("binaries")
809 .wrap_context("retrieving the 'binaries' object store")?;
810
811 binaries
812 .put_kv(&binary_id.to_js_string(), &ary)
813 .await
814 .wrap_context("writing binary")?;
815
816 Ok(())
817 })
818 .await
819 .wrap_with_context(|| format!("writing {binary_id:?}"));
820 if res.is_ok() {
821 self.objects_unlocked_this_run
822 .set(self.objects_unlocked_this_run.get() + 1);
823 }
824 res
825 }
826
827 async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
828 let ary = self
829 .db
830 .transaction(&["binaries"])
831 .run(move |transaction| async move {
832 let binaries = transaction
833 .object_store("binaries")
834 .wrap_context("retrieving the 'binaries' object store")?;
835
836 binaries.get(&binary_id.to_js_string()).await
837 })
838 .await
839 .wrap_with_context(|| format!("fetching {binary_id:?}"))?;
840 let Some(ary) = ary else {
841 return Ok(None);
842 };
843 let ary = ary
844 .dyn_into::<Uint8Array>()
845 .wrap_context("recovering Uint8Array from stored data")?;
846 Ok(Some(ary.to_vec().into_boxed_slice().into()))
847 }
848
849 async fn reencode_old_versions<T: Object>(&self) -> usize {
851 let res = self
852 .db
853 .transaction(&["snapshots_meta", "snapshots"])
854 .rw()
855 .run(|transaction| async move {
856 let snapshots_meta = transaction
857 .object_store("snapshots_meta")
858 .wrap_context("retrieving snapshots_meta object store")?;
859 let mut cursor = snapshots_meta
860 .cursor()
861 .open()
862 .await
863 .wrap_context("opening cursor over all snapshots")?;
864 let mut num_errors = 0;
865 while let Some(snapshot_meta_js) = cursor.value() {
867 let snapshot_meta =
868 serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
869 .wrap_context("deserializing snapshot metadata")?;
870 if snapshot_meta.type_id == *T::type_ulid()
871 && (snapshot_meta.snapshot_version < T::snapshot_version()
872 || snapshot_meta.normalizer_version < normalizer_version())
873 {
874 let snapshot_id = snapshot_meta.snapshot_id;
875 if let Err(err) = reencode_snapshot::<T>(
876 &transaction,
877 snapshot_meta.snapshot_id,
878 snapshot_meta.snapshot_version,
879 )
880 .await
881 {
882 let type_id = *T::type_ulid();
883 tracing::error!(
884 ?err,
885 ?snapshot_id,
886 ?type_id,
887 "failed reencoding snapshot to latest version"
888 );
889 num_errors += 1;
890 }
891 }
892 cursor
893 .advance(1)
894 .await
895 .wrap_context("advancing cursor to next item")?;
896 }
897 Ok(num_errors)
898 })
899 .await
900 .wrap_with_context(|| format!("reencoding old versions of type {:?}", T::type_ulid()));
901 match res {
902 Ok(num_errs) => num_errs,
903 Err(err) => {
904 tracing::error!(?err, type_id=?T::type_ulid(), "failed running transaction to reencode all objects");
905 1
906 }
907 }
908 }
909
910 async fn assert_invariants_generic(&self) {
911 use std::collections::hash_map;
912
913 self.db
914 .transaction(&[
915 "snapshots_meta",
916 "events_meta",
917 "upload_queue_meta",
918 "binaries",
919 ])
920 .run(move |transaction| async move {
921 let snapshots_meta = transaction.object_store("snapshots_meta").unwrap();
922 let events_meta = transaction.object_store("events_meta").unwrap();
923 let binaries = transaction.object_store("binaries").unwrap();
924
925 let creation_object = snapshots_meta.index("creation_object").unwrap();
926
927 let required_binaries = Self::list_required_binaries(&transaction).await.unwrap();
929 for b in required_binaries {
930 if !binaries.contains(&to_js(b).unwrap()).await.unwrap() {
931 panic!("missing required binary {b:?}");
932 }
933 }
934
935 let mut event_cursor = events_meta.cursor().open().await.unwrap();
937 while let Some(e) = event_cursor.value() {
938 let e = serde_wasm_bindgen::from_value::<EventMeta>(e).unwrap();
939 if !creation_object
940 .contains(&Array::from_iter([
941 &JsValue::from(1),
942 &e.object_id.to_js_string(),
943 ]))
944 .await
945 .unwrap()
946 {
947 panic!(
948 "event {:?} references object {:?} that has no creation snapshot",
949 e.event_id, e.object_id
950 );
951 }
952 event_cursor.advance(1).await.unwrap();
953 }
954
955 let mut snapshot_cursor = snapshots_meta.cursor().open().await.unwrap();
957 while let Some(s) = snapshot_cursor.value() {
958 let s = serde_wasm_bindgen::from_value::<SnapshotMeta>(s).unwrap();
959 let e = events_meta
960 .get(&s.snapshot_id.to_js_string())
961 .await
962 .unwrap();
963 if s.is_creation.is_none() && !e.is_some() {
964 panic!("snapshot {:?} has no corresponding event", s.snapshot_id);
965 }
966 if let Some(e) = e {
967 let e = serde_wasm_bindgen::from_value::<EventMeta>(e).unwrap();
968 if e.object_id != s.object_id {
969 panic!(
970 "object for snapshot and event at {:?} does not match",
971 s.snapshot_id
972 );
973 }
974 }
975 snapshot_cursor.advance(1).await.unwrap();
976 }
977
978 let mut snapshot_cursor = snapshots_meta.cursor().open().await.unwrap();
980 let mut types = HashMap::new();
981 while let Some(s) = snapshot_cursor.value() {
982 let s = serde_wasm_bindgen::from_value::<SnapshotMeta>(s).unwrap();
983 match types.entry(s.object_id) {
984 hash_map::Entry::Occupied(o) => {
985 if *o.get() != s.type_id {
986 panic!("object {:?} has multiple type ids", s.object_id);
987 }
988 }
989 hash_map::Entry::Vacant(v) => {
990 v.insert(s.type_id);
991 }
992 }
993 snapshot_cursor.advance(1).await.unwrap();
994 }
995
996 Ok(())
997 })
998 .await
999 .unwrap();
1000 }
1001
1002 async fn assert_invariants_for<T: Object>(&self) {
1003 use std::collections::BTreeMap;
1004
1005 self.db
1006 .transaction(&["snapshots", "snapshots_meta", "events", "events_meta"])
1007 .run(move |transaction| async move {
1008 let snapshots_store = transaction.object_store("snapshots").unwrap();
1009 let snapshots_meta = transaction.object_store("snapshots_meta").unwrap();
1010 let events_store = transaction.object_store("events").unwrap();
1011 let events_meta = transaction.object_store("events_meta").unwrap();
1012
1013 let snapshots = snapshots_meta.get_all(None).await.unwrap();
1015 let snapshots = snapshots
1016 .into_iter()
1017 .map(|s| serde_wasm_bindgen::from_value::<SnapshotMeta>(s).unwrap())
1018 .collect::<Vec<_>>();
1019 let objects = snapshots
1020 .iter()
1021 .filter_map(|s| (s.type_id == *T::type_ulid()).then(|| s.object_id))
1022 .collect::<HashSet<_>>();
1023 let mut object_snapshots_map = HashMap::new();
1024 for o in objects.iter() {
1025 object_snapshots_map.insert(*o, BTreeMap::new());
1026 }
1027 for s in snapshots {
1028 if let Some(o) = object_snapshots_map.get_mut(&s.object_id) {
1029 o.insert(s.snapshot_id, s);
1030 }
1031 }
1032
1033 let events = events_meta.get_all(None).await.unwrap();
1035 let events = events
1036 .into_iter()
1037 .map(|e| serde_wasm_bindgen::from_value::<EventMeta>(e).unwrap())
1038 .collect::<Vec<_>>();
1039 let mut object_events_map = HashMap::new();
1040 for o in objects.iter() {
1041 object_events_map.insert(*o, BTreeMap::new());
1042 }
1043 for e in events {
1044 if let Some(o) = object_events_map.get_mut(&e.object_id) {
1045 o.insert(e.event_id, e);
1046 }
1047 }
1048
1049 for object_id in objects {
1051 let snapshots = object_snapshots_map.get(&object_id).unwrap();
1052 let events = object_events_map.get(&object_id).unwrap();
1053
1054 let creation = snapshots.first_key_value().unwrap().1;
1056 let latest = snapshots.last_key_value().unwrap().1;
1057 assert!(creation.is_creation == Some(1));
1058 assert!(latest.is_latest == Some(1));
1059 assert!(creation.importance.is_some());
1060 assert!(creation.importance_from_queries.is_some());
1062 if creation.snapshot_id == latest.snapshot_id {
1063 continue;
1064 }
1065
1066 assert!(events.first_key_value().unwrap().0 > &creation.snapshot_id);
1068 assert!(events.last_key_value().unwrap().0 == &latest.snapshot_id);
1069
1070 let object = snapshots_store
1072 .get(&creation.snapshot_id.to_js_string())
1073 .await
1074 .unwrap()
1075 .unwrap();
1076 let mut object =
1077 parse_snapshot_js::<T>(creation.snapshot_version, object).unwrap();
1078 for (event_id, event_meta) in events.iter() {
1079 let e = events_store
1080 .get(&event_id.to_js_string())
1081 .await
1082 .unwrap()
1083 .unwrap();
1084 let e = serde_wasm_bindgen::from_value::<T::Event>(e).unwrap();
1085 assert_eq!(event_meta.required_binaries, e.required_binaries());
1086 object.apply(DbPtr::from(object_id), &e);
1087 if let Some(snapshot_meta) = snapshots.get(&event_id) {
1088 let s = snapshots_store
1089 .get(&event_id.to_js_string())
1090 .await
1091 .unwrap()
1092 .unwrap();
1093 let s =
1094 parse_snapshot_js::<T>(snapshot_meta.snapshot_version, s).unwrap();
1095 assert!(object == s);
1096 assert!(snapshot_meta.required_binaries == object.required_binaries());
1097 assert!(snapshot_meta.type_id == *T::type_ulid());
1098 assert!(snapshot_meta.importance.is_none());
1099 assert!(snapshot_meta.importance_from_queries.is_none());
1100 }
1101 }
1102 }
1103
1104 Ok(())
1105 })
1106 .await
1107 .unwrap();
1108 }
1109}
1110
1111impl ClientSideDb for IndexedDb {
1112 async fn storage_info(&self) -> crate::Result<ClientStorageInfo> {
1113 let window = web_sys::window()
1114 .ok_or_else(|| crate::Error::Other(anyhow!("not running in a browser")))?;
1115 let storage = window.navigator().storage();
1116 let estimate = storage.estimate().map_err(|_| {
1117 crate::Error::Other(anyhow!(
1118 "failed getting a storage estimate from the browser"
1119 ))
1120 })?;
1121 let estimate = wasm_bindgen_futures::JsFuture::from(estimate)
1122 .await
1123 .map_err(|_| {
1124 crate::Error::Other(anyhow!("storage estimate promise returned an error"))
1125 })?;
1126 let quota = js_sys::Reflect::get(&estimate, &JsString::from("quota"))
1127 .map_err(|_| crate::Error::Other(anyhow!("storage estimate had no quota field")))?
1128 .as_f64()
1129 .ok_or_else(|| {
1130 crate::Error::Other(anyhow!("storage estimate for quota was not a number"))
1131 })? as usize;
1132 let usage = js_sys::Reflect::get(&estimate, &JsString::from("usage"))
1133 .map_err(|_| crate::Error::Other(anyhow!("storage estimate had no quota field")))?
1134 .as_f64()
1135 .ok_or_else(|| {
1136 crate::Error::Other(anyhow!("storage estimate for quota was not a number"))
1137 })? as usize;
1138 Ok(ClientStorageInfo {
1139 quota,
1140 usage,
1141 objects_unlocked_this_run: self.objects_unlocked_this_run.get(),
1142 })
1143 }
1144
1145 async fn save_login(&self, info: LoginInfo) -> crate::Result<()> {
1146 let info_js = to_js(info).wrap_context("serializing login info")?;
1147 self.db
1148 .transaction(&["config"])
1149 .rw()
1150 .run(move |transaction| async move {
1151 transaction
1152 .object_store("config")
1153 .wrap_context("retrieving 'config' object store")?
1154 .put_kv(&JsString::from(CONFIG_SAVED_LOGIN), &info_js)
1155 .await
1156 .wrap_context("saving login info to database")?;
1157 Ok(())
1158 })
1159 .await
1160 .wrap_context("saving login info to database")
1161 }
1162
1163 async fn get_saved_login(&self) -> crate::Result<Option<LoginInfo>> {
1164 let saved_info = self
1165 .db
1166 .transaction(&["config"])
1167 .run(move |transaction| async move {
1168 let saved_info = transaction
1169 .object_store("config")
1170 .wrap_context("retrieving 'config' object store")?
1171 .get(&JsString::from(CONFIG_SAVED_LOGIN))
1172 .await
1173 .wrap_context("retrieving login from database")?;
1174 Ok(saved_info)
1175 })
1176 .await
1177 .wrap_context("retrieving login from database")?;
1178 let Some(saved_info) = saved_info else {
1179 return Ok(None);
1180 };
1181 let saved_info = serde_wasm_bindgen::from_value(saved_info)
1182 .wrap_context("deserializing saved login info")?;
1183 Ok(Some(saved_info))
1184 }
1185
1186 async fn remove_everything(&self) -> crate::Result<()> {
1187 self.db
1188 .transaction(OBJECT_STORE_LIST)
1189 .rw()
1190 .run(move |transaction| async move {
1191 for store in OBJECT_STORE_LIST {
1192 transaction
1193 .object_store(store)
1194 .wrap_with_context(|| format!("retrieving {store:?} object store"))?
1195 .clear()
1196 .await
1197 .wrap_with_context(|| format!("clearing {store:?} object store"))?;
1198 }
1199 Ok(())
1200 })
1201 .await
1202 .wrap_context("clearing the IndexedDB database")
1203 }
1204
1205 async fn recreate<T: Object>(
1206 &self,
1207 object_id: ObjectId,
1208 new_created_at: EventId,
1209 mut object: Arc<T>,
1210 updatedness: Option<Updatedness>,
1211 additional_importance: Importance,
1212 ) -> crate::Result<Option<Arc<T>>> {
1213 let object_id_js = object_id.to_js_string();
1214 let type_id_js = T::type_ulid().to_js_string();
1215 let new_created_at_js = new_created_at.to_js_string();
1216 let max_id = EventId::from_u128(u128::MAX).to_js_string();
1217 let zero_id = EventId::from_u128(0).to_js_string();
1218 let required_binaries = object.required_binaries();
1219
1220 self.db
1221 .transaction(&[
1222 "snapshots",
1223 "snapshots_meta",
1224 "events",
1225 "events_meta",
1226 "binaries",
1227 ])
1228 .rw()
1229 .run(move |transaction| async move {
1230 let snapshots = transaction
1231 .object_store("snapshots")
1232 .wrap_context("retrieving 'snapshots' object store")?;
1233 let snapshots_meta = transaction
1234 .object_store("snapshots_meta")
1235 .wrap_context("retrieving 'snapshots_meta' object store")?;
1236 let events = transaction
1237 .object_store("events")
1238 .wrap_context("retrieving 'events' object store")?;
1239 let events_meta = transaction
1240 .object_store("events_meta")
1241 .wrap_context("retrieving 'events_meta' object store")?;
1242 let binaries = transaction
1243 .object_store("binaries")
1244 .wrap_context("retrieving the 'binaries' object store")?;
1245
1246 let creation_object = snapshots_meta
1247 .index("creation_object")
1248 .wrap_context("retrieving 'creation_object' index")?;
1249 let latest_type_object = snapshots_meta
1250 .index("latest_type_object")
1251 .wrap_context("retrieving 'latest_type_object' index")?;
1252 let object_snapshot = snapshots_meta
1253 .index("object_snapshot")
1254 .wrap_context("retrieving 'object_snapshot' index")?;
1255
1256 let object_event = events_meta
1257 .index("object_event")
1258 .wrap_context("retrieving 'object_event' index")?;
1259
1260 let Some(creation_meta) = creation_object
1262 .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1263 .await
1264 .wrap_context("checking creation object")?
1265 else {
1266 return Self::create_impl::<T>(
1268 transaction,
1269 object_id,
1270 new_created_at,
1271 object,
1272 updatedness,
1273 additional_importance,
1274 Importance::NONE, )
1276 .await;
1277 };
1278 let creation_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(creation_meta)
1279 .wrap_context("parsing snapshot metadata")?;
1280 if creation_meta.type_id != *T::type_ulid() {
1281 return Err(crate::Error::WrongType {
1282 object_id,
1283 expected_type_id: *T::type_ulid(),
1284 real_type_id: creation_meta.type_id,
1285 }
1286 .into());
1287 }
1288 if creation_meta.snapshot_id > new_created_at {
1289 return Err(crate::Error::EventTooEarly {
1290 event_id: new_created_at,
1291 object_id,
1292 created_at: creation_meta.snapshot_id,
1293 }
1294 .into());
1295 }
1296
1297 check_strings(
1299 &serde_json::to_value(&*object).wrap_context("serializing to json")?,
1300 )?;
1301
1302 let latest_snapshot_meta_js = latest_type_object
1304 .get(&Array::from_iter([
1305 &JsValue::from(1),
1306 &type_id_js,
1307 &object_id_js,
1308 ]))
1309 .await
1310 .wrap_context("retrieving latest snapshot")?
1311 .ok_or_else(|| {
1312 crate::Error::Other(anyhow!(
1313 "No latest snapshot for an object with a creation snapshot"
1314 ))
1315 })?;
1316 let latest_snapshot_meta =
1317 serde_wasm_bindgen::from_value::<SnapshotMeta>(latest_snapshot_meta_js)
1318 .wrap_context("deserializing latest snapshot metadata")?;
1319 let new_creation_should_be_latest =
1320 latest_snapshot_meta.snapshot_id <= new_created_at;
1321
1322 let mut to_delete = object_event
1324 .cursor()
1325 .range(
1326 &**Array::from_iter([&object_id_js, &zero_id])
1327 ..=&**Array::from_iter([&object_id_js, &new_created_at_js]),
1328 )
1329 .wrap_context("limiting the range for the events to delete")?
1330 .open()
1331 .await
1332 .wrap_context("opening cursor of all events to delete")?;
1333 while let Some(event_id_js) = to_delete.primary_key() {
1334 events
1335 .delete(&event_id_js)
1336 .await
1337 .wrap_context("deleting event data")?;
1338 to_delete
1339 .delete()
1340 .await
1341 .wrap_context("deleting event metadata")?;
1342 to_delete
1343 .advance(1)
1344 .await
1345 .wrap_context("moving to next to-apply event")?;
1346 }
1347
1348 let mut to_delete = object_snapshot
1350 .cursor()
1351 .range(
1352 &**Array::from_iter([&object_id_js, &zero_id])
1353 ..=&**Array::from_iter([&object_id_js, &max_id]),
1354 )
1355 .wrap_context("limiting to-delete snapshot range")?
1356 .open()
1357 .await
1358 .wrap_context("opening cursor of snapshots to delete")?;
1359 while let Some(snapshot_id_js) = to_delete.primary_key() {
1360 snapshots
1361 .delete(&snapshot_id_js)
1362 .await
1363 .wrap_context("deleting snapshot data")?;
1364 to_delete
1365 .delete()
1366 .await
1367 .wrap_context("deleting snapshot metadata")?;
1368 to_delete
1369 .advance(1)
1370 .await
1371 .wrap_context("moving to next to-delete snapshot")?;
1372 }
1373
1374 let object_js = to_js(&object).wrap_context("serializing new creation snapshot")?;
1376 snapshots
1377 .add_kv(&new_created_at_js, &object_js)
1378 .await
1379 .wrap_context("saving new creation snapshot data")?;
1380
1381 let new_creation_snapshot_meta = SnapshotMeta {
1383 snapshot_id: new_created_at,
1384 type_id: *T::type_ulid(),
1385 object_id,
1386 is_creation: Some(1),
1387 is_latest: new_creation_should_be_latest.then(|| 1),
1388 normalizer_version: normalizer_version(),
1389 snapshot_version: T::snapshot_version(),
1390 have_all_until: updatedness,
1391 importance: Some(
1392 creation_meta.importance.unwrap_or(Importance::NONE)
1393 | additional_importance,
1394 ),
1395 importance_from_queries: creation_meta.importance_from_queries,
1398 required_binaries: required_binaries.clone(),
1399 };
1400 let new_creation_snapshot_meta_js = to_js(new_creation_snapshot_meta)
1401 .wrap_context("serializing snapshot metadata")?;
1402 snapshots_meta
1403 .put(&new_creation_snapshot_meta_js)
1404 .await
1405 .wrap_context("saving the new creation snapshot metadata")?;
1406
1407 if !new_creation_should_be_latest {
1409 let mut last_applied_event_id = new_created_at;
1410 let mut_object = Arc::make_mut(&mut object);
1411 let mut to_apply = object_event
1413 .cursor()
1414 .range(
1415 &**Array::from_iter([&object_id_js, &zero_id])
1416 ..=&**Array::from_iter([&object_id_js, &max_id]),
1417 )
1418 .wrap_context("limiting the range of events to apply")?
1419 .open_key()
1420 .await
1421 .wrap_context("opening the cursor on events to apply")?;
1422 while let Some(event_id_js) = to_apply.primary_key() {
1423 let event_js = events
1424 .get(&event_id_js)
1425 .await
1426 .wrap_context(
1427 "retrieving data for an event for which we have the metadata",
1428 )?
1429 .ok_or_else(|| {
1430 crate::Error::Other(anyhow!(
1431 "had no event data for an event with known metadata"
1432 ))
1433 })?;
1434 let event_id = serde_wasm_bindgen::from_value::<EventId>(event_id_js)
1435 .wrap_context("deserializing event id")?;
1436 last_applied_event_id = event_id;
1437 let event = serde_wasm_bindgen::from_value::<T::Event>(event_js)
1438 .wrap_context("deserializing event data")?;
1439 mut_object.apply(DbPtr::from(object_id), &event);
1440 to_apply
1441 .advance(1)
1442 .await
1443 .wrap_context("advancing events-to-apply cursor")?;
1444 }
1445
1446 let new_latest_snapshot_meta = SnapshotMeta {
1448 snapshot_id: last_applied_event_id,
1449 type_id: *T::type_ulid(),
1450 object_id,
1451 is_creation: None,
1452 is_latest: Some(1),
1453 normalizer_version: normalizer_version(),
1454 snapshot_version: T::snapshot_version(),
1455 have_all_until: updatedness,
1456 importance: None,
1457 importance_from_queries: None,
1458 required_binaries: object.required_binaries(),
1459 };
1460 let new_latest_snapshot_meta_js = to_js(new_latest_snapshot_meta)
1461 .wrap_context("serializing snapshot metadata")?;
1462 snapshots_meta
1463 .put(&new_latest_snapshot_meta_js)
1464 .await
1465 .wrap_context("saving the new latest snapshot metadata")?;
1466
1467 let last_applied_event_id_js = last_applied_event_id.to_js_string();
1469 let object_js =
1470 to_js(&object).wrap_context("serializing new latest snapshot")?;
1471 snapshots
1472 .add_kv(&last_applied_event_id_js, &object_js)
1473 .await
1474 .wrap_context("saving new latest snapshot data")?;
1475 }
1476
1477 check_required_binaries(binaries, required_binaries).await?;
1479
1480 Ok(Some(object))
1481 })
1482 .await
1483 .wrap_with_context(|| {
1484 format!("recreating {object_id:?} with new data from {new_created_at:?}")
1485 })
1486 }
1487
/// Not implemented yet: calling this always panics through `todo!()`.
///
/// Both parameters are currently unused.
async fn get_json(
    &self,
    _object_id: ObjectId,
    _importance: Importance,
) -> crate::Result<serde_json::Value> {
    todo!()
}
1495
1496 async fn client_query(
1497 &self,
1498 type_id: TypeId,
1499 query: Arc<Query>,
1500 ) -> crate::Result<Vec<ObjectId>> {
1501 query.check()?;
1502 let type_id_js = type_id.to_js_string();
1503 let zero_id = EventId::from_u128(0).to_js_string();
1504 let max_id = EventId::from_u128(u128::MAX).to_js_string();
1505 let objects = self
1512 .db
1513 .transaction(&["snapshots_meta", "snapshots"])
1514 .run(move |transaction| async move {
1515 let snapshots = transaction
1516 .object_store("snapshots")
1517 .wrap_context("retrieving 'snapshots' object store")?;
1518 let snapshots_meta = transaction
1519 .object_store("snapshots_meta")
1520 .wrap_context("retrieving 'snapshots_meta' object store")?;
1521
1522 let latest_type_object = snapshots_meta
1523 .index("latest_type_object")
1524 .wrap_context("retrieving 'latest_type_object' index")?;
1525
1526 let mut cursor = latest_type_object
1527 .cursor()
1528 .range(
1529 &**Array::from_iter([&JsValue::from(1), &type_id_js, &zero_id])
1530 ..=&**Array::from_iter([&JsValue::from(1), &type_id_js, &max_id]),
1531 )
1532 .wrap_context("limiting cursor to only snapshots of the right type")?
1533 .open_key()
1534 .await
1535 .wrap_context("opening cursor over all latest objects")?;
1536 let mut objects = Vec::new();
1537 while let Some(snapshot_id_js) = cursor.primary_key() {
1538 let snapshot = snapshots
1539 .get(&snapshot_id_js)
1540 .await
1541 .wrap_context("retrieving snapshot data for known metadata")?
1542 .ok_or_else(|| {
1543 crate::Error::Other(anyhow!("no snapshot data for known metadata"))
1544 })?;
1545 let snapshot = serde_wasm_bindgen::from_value::<serde_json::Value>(snapshot)
1546 .wrap_context("deserializing snapshot data as serde_json::Value")?;
1547 if query.matches_json(&snapshot) {
1548 let object_id_js = cursor
1549 .key()
1550 .ok_or_else(|| {
1551 crate::Error::Other(anyhow!("cursor had a primary key but no key"))
1552 })?
1553 .dyn_into::<Array>()
1554 .wrap_context("cursor key was not an array")?
1555 .get(2);
1556 let object_id = serde_wasm_bindgen::from_value::<ObjectId>(object_id_js)
1557 .wrap_context("deserializing object id")?;
1558 objects.push(object_id);
1559 }
1560 cursor
1561 .advance(1)
1562 .await
1563 .wrap_context("going to next snapshot in the database")?;
1564 }
1565 Ok(objects)
1566 })
1567 .await
1568 .wrap_context("finding a first snapshot to answer")?;
1569
1570 Ok(objects)
1572 }
1573
1574 async fn remove(&self, object_id: ObjectId) -> crate::Result<()> {
1575 let object_id_js = object_id.to_js_string();
1576 let zero_id = EventId::from_u128(0).to_js_string();
1577 let max_id = EventId::from_u128(u128::MAX).to_js_string();
1578
1579 self.db
1580 .transaction(&["snapshots", "snapshots_meta", "events", "events_meta"])
1581 .rw()
1582 .run(move |transaction| async move {
1583 let snapshots_meta = transaction
1584 .object_store("snapshots_meta")
1585 .wrap_context("retrieving the 'snapshots_meta' object store")?;
1586 let events_meta = transaction
1587 .object_store("events_meta")
1588 .wrap_context("retrieving the 'events_meta' object store")?;
1589 let snapshots = transaction
1590 .object_store("snapshots")
1591 .wrap_context("retrieving the 'snapshots' object store")?;
1592 let events = transaction
1593 .object_store("events")
1594 .wrap_context("retrieving the 'events' object store")?;
1595
1596 let creation_object = snapshots_meta
1597 .index("creation_object")
1598 .wrap_context("retrieving the 'creation_object' index")?;
1599 let object_snapshot = snapshots_meta
1600 .index("object_snapshot")
1601 .wrap_context("retrieving the 'object_snapshot' index")?;
1602
1603 let object_event = events_meta
1604 .index("object_event")
1605 .wrap_context("retrieving the 'object_event' index")?;
1606
1607 let object_does_exist = creation_object
1609 .contains(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1610 .await
1611 .wrap_context("retrieving creation snapshot")?;
1612 if !object_does_exist {
1613 return Ok(());
1615 };
1616
1617 let mut to_remove = object_snapshot
1619 .cursor()
1620 .range(
1621 &**Array::from_iter([&object_id_js, &zero_id])
1622 ..=&**Array::from_iter([&object_id_js, &max_id]),
1623 )
1624 .wrap_context("limiting to-delete range")?
1625 .open()
1626 .await
1627 .wrap_context("opening to-delete cursor")?;
1628 while let Some(snapshot_id_js) = to_remove.primary_key() {
1629 snapshots
1630 .delete(&snapshot_id_js)
1631 .await
1632 .wrap_context("deleting snapshot data")?;
1633 to_remove
1634 .delete()
1635 .await
1636 .wrap_context("deleting snapshot metadata")?;
1637 to_remove
1638 .advance(1)
1639 .await
1640 .wrap_context("going to next to-delete snapshot")?;
1641 }
1642
1643 let mut to_remove = object_event
1644 .cursor()
1645 .range(
1646 &**Array::from_iter([&object_id_js, &zero_id])
1647 ..=&**Array::from_iter([&object_id_js, &max_id]),
1648 )
1649 .wrap_context("limiting to-delete range")?
1650 .open()
1651 .await
1652 .wrap_context("opening to-delete cursor")?;
1653 while let Some(event_id_js) = to_remove.primary_key() {
1654 events
1655 .delete(&event_id_js)
1656 .await
1657 .wrap_context("deleting event data")?;
1658 to_remove
1659 .delete()
1660 .await
1661 .wrap_context("deleting event metadata")?;
1662 to_remove
1663 .advance(1)
1664 .await
1665 .wrap_context("going to next to-delete event")?;
1666 }
1667
1668 Ok(())
1669 })
1670 .await
1671 .wrap_with_context(|| format!("removing {object_id:?}"))
1672 }
1673
/// Removes event `event_id` from object `object_id`, then rebuilds the latest snapshot.
///
/// Within one read-write transaction, this:
/// 1. checks that the object has a creation snapshot of type `T`, created strictly before
///    `event_id`;
/// 2. returns successfully, changing nothing, if `events_meta` has no such event;
/// 3. deletes the event's data and metadata;
/// 4. deletes every snapshot of the object from `event_id` onwards;
/// 5. starts from the last remaining snapshot, calls `apply_events_after` on it, and saves
///    the result as the new latest snapshot.
///
/// # Errors
///
/// - `Error::ObjectDoesNotExist` if the object has no creation snapshot.
/// - `Error::WrongType` if the creation snapshot has a type id other than `T`'s.
/// - `Error::EventTooEarly` if `event_id` is not strictly after the creation snapshot.
/// - Any (de)serialization or database error, wrapped with context.
async fn remove_event<T: Object>(
    &self,
    object_id: ObjectId,
    event_id: EventId,
) -> crate::Result<()> {
    let object_id_js = object_id.to_js_string();
    let event_id_js = event_id.to_js_string();
    let zero_event_id_js = EventId::from_u128(0).to_js_string();
    let max_event_id_js = EventId::from_u128(u128::MAX).to_js_string();
    self.db
        .transaction(&["snapshots", "snapshots_meta", "events", "events_meta"])
        .rw()
        .run(move |transaction| async move {
            let snapshots = transaction
                .object_store("snapshots")
                .wrap_context("retrieving 'snapshots' object store")?;
            let snapshots_meta = transaction
                .object_store("snapshots_meta")
                .wrap_context("retrieving 'snapshots_meta' object store")?;
            let events = transaction
                .object_store("events")
                .wrap_context("retrieving 'events' object store")?;
            let events_meta = transaction
                .object_store("events_meta")
                .wrap_context("retrieving 'events_meta' object store")?;

            let creation_object = snapshots_meta
                .index("creation_object")
                .wrap_context("retrieving 'creation_object' index")?;
            let object_snapshot = snapshots_meta
                .index("object_snapshot")
                .wrap_context("retrieving 'object_snapshot' index")?;

            // Check that the object exists, has the right type, and was created before the event
            let Some(creation_snapshot_js) = creation_object
                .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
                .await
                .wrap_with_context(|| format!("checking that {object_id:?} already exists"))?
            else {
                return Err(crate::Error::ObjectDoesNotExist(object_id).into());
            };
            let creation_snapshot =
                serde_wasm_bindgen::from_value::<SnapshotMeta>(creation_snapshot_js)
                    .wrap_with_context(|| {
                        format!("deserializing creation snapshot metadata for {object_id:?}")
                    })?;
            if creation_snapshot.type_id != *T::type_ulid() {
                return Err(crate::Error::WrongType {
                    object_id,
                    expected_type_id: *T::type_ulid(),
                    real_type_id: creation_snapshot.type_id,
                }
                .into());
            }
            if creation_snapshot.snapshot_id >= event_id {
                return Err(crate::Error::EventTooEarly {
                    event_id,
                    object_id,
                    created_at: creation_snapshot.snapshot_id,
                }
                .into());
            }

            // An event that is not stored is treated as already removed
            if !events_meta
                .contains(&event_id_js)
                .await
                .wrap_context("checking whether to-remove event does exist")?
            {
                return Ok(());
            }

            // Delete the event itself, metadata first and then data
            events_meta
                .delete(&event_id_js)
                .await
                .wrap_context("deleting to-remove event metadata")?;
            events
                .delete(&event_id_js)
                .await
                .wrap_context("deleting to-remove event data")?;

            // Read `have_all_until` off the current last snapshot before it gets deleted:
            // the cursor goes backwards, so its first value is the highest snapshot id
            let latest_snapshot_meta_js = object_snapshot
                .cursor()
                .range(
                    &**Array::from_iter([&object_id_js, &zero_event_id_js])
                        ..=&**Array::from_iter([&object_id_js, &max_event_id_js]),
                )
                .wrap_with_context(|| {
                    format!("limiting the latest snapshot to recover to those of {object_id:?}")
                })?
                .direction(CursorDirection::Prev)
                .open()
                .await
                .wrap_with_context(|| {
                    format!("opening cursor for the latest snapshot of {object_id:?}")
                })?
                .value()
                .ok_or_else(|| {
                    crate::Error::Other(anyhow!(
                        "cannot find a latest snapshot for {object_id:?}"
                    ))
                })?;
            let latest_snapshot_meta =
                serde_wasm_bindgen::from_value::<SnapshotMeta>(latest_snapshot_meta_js)
                    .wrap_with_context(|| {
                        format!("deserializing the latest known snapshot for {object_id:?}")
                    })?;
            let now_have_all_until = latest_snapshot_meta.have_all_until;

            // Delete all the snapshots from the removed event onwards
            let mut to_clear = object_snapshot
                .cursor()
                .range(
                    &**Array::from_iter([&object_id_js, &event_id_js])
                        ..=&**Array::from_iter([&object_id_js, &max_event_id_js]),
                )
                .wrap_with_context(|| {
                    format!(
                        "limiting the snapshots to delete to only those of {object_id:?} after {event_id:?}"
                    )
                })?
                .open()
                .await
                .wrap_with_context(|| {
                    format!(
                        "opening cursor of snapshots to delete for {object_id:?} after {event_id:?}"
                    )
                })?;
            while let Some(snapshot_meta_js) = to_clear.value() {
                let snapshot_meta =
                    serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
                        .wrap_with_context(|| {
                            format!(
                                "deserializing to-clear snapshot of {object_id:?}, after {event_id:?}"
                            )
                        })?;
                let snapshot_id = snapshot_meta.snapshot_id;
                snapshots
                    .delete(&snapshot_id.to_js_string())
                    .await
                    .wrap_with_context(|| format!("deleting snapshot {snapshot_id:?}"))?;
                to_clear
                    .delete()
                    .await
                    .wrap_with_context(|| format!("deleting snapshot metadata {snapshot_id:?}"))?;
                to_clear
                    .advance(1)
                    .await
                    .wrap_context("getting next snapshot to delete")?;
            }

            // Find the last snapshot that remains, to replay events on top of it
            let last_snapshot_meta_js = object_snapshot
                .cursor()
                .range(
                    &**Array::from_iter([&object_id_js, &zero_event_id_js])
                        ..=&**Array::from_iter([&object_id_js, &max_event_id_js]),
                )
                .wrap_with_context(|| {
                    format!("limiting the last snapshot to recover to those of {object_id:?}")
                })?
                .direction(CursorDirection::Prev)
                .open()
                .await
                .wrap_with_context(|| {
                    format!("opening cursor for the last snapshot of {object_id:?}")
                })?
                .value()
                .ok_or_else(|| {
                    crate::Error::Other(anyhow!(
                        "cannot find a latest snapshot for {object_id:?}"
                    ))
                })?;
            let last_snapshot_meta =
                serde_wasm_bindgen::from_value::<SnapshotMeta>(last_snapshot_meta_js)
                    .wrap_with_context(|| {
                        format!("deserializing the last known snapshot for {object_id:?}")
                    })?;
            let last_snapshot_id = last_snapshot_meta.snapshot_id;
            let last_snapshot_id_js = last_snapshot_id.to_js_string();
            let last_snapshot_js = snapshots
                .get(&last_snapshot_id_js)
                .await
                .wrap_with_context(|| {
                    format!("retrieving last available snapshot {last_snapshot_id:?}")
                })?
                .ok_or_else(|| {
                    crate::Error::Other(anyhow!(
                        "cannot retrieve snapshot data for {last_snapshot_id:?}"
                    ))
                })?;
            let mut last_snapshot =
                parse_snapshot_js::<T>(last_snapshot_meta.snapshot_version, last_snapshot_js)
                    .wrap_with_context(|| {
                        format!("deserializing snapshot data {last_snapshot_id:?}")
                    })?;

            // Replay the events that remain after that snapshot
            let last_applied_event_id =
                apply_events_after(&transaction, &mut last_snapshot, object_id, last_snapshot_id)
                    .await?;

            // Save the result as the new latest snapshot.
            // NOTE(review): if `apply_events_after` applied nothing and returns
            // `last_snapshot_id` unchanged (not visible from here; confirm), the `put` below
            // replaces the metadata of the remaining snapshot, which may be the creation
            // snapshot, with `is_creation: None` and no importance.
            let new_last_snapshot_meta = SnapshotMeta {
                snapshot_id: last_applied_event_id,
                type_id: *T::type_ulid(),
                object_id,
                is_creation: None,
                is_latest: Some(1),
                normalizer_version: normalizer_version(),
                snapshot_version: T::snapshot_version(),
                have_all_until: now_have_all_until,
                importance: None,
                importance_from_queries: None,
                required_binaries: last_snapshot.required_binaries(),
            };
            let last_applied_event_id_js = to_js(last_applied_event_id)
                .wrap_with_context(|| format!("serializing {last_applied_event_id:?}"))?;
            let new_last_snapshot_meta_js = to_js(new_last_snapshot_meta)
                .wrap_with_context(|| {
                    format!(
                        "serializing the snapshot metadata for {object_id:?} at {last_applied_event_id:?}"
                    )
                })?;
            let new_last_snapshot_js = to_js(last_snapshot)
                .wrap_with_context(|| {
                    format!(
                        "serializing the snapshot for {object_id:?} at {last_applied_event_id:?}"
                    )
                })?;
            snapshots_meta
                .put(&new_last_snapshot_meta_js)
                .await
                .wrap_with_context(|| {
                    format!(
                        "saving new last snapshot metadata for {object_id:?} at {last_applied_event_id:?}"
                    )
                })?;
            snapshots
                .put_kv(&last_applied_event_id_js, &new_last_snapshot_js)
                .await
                .wrap_with_context(|| {
                    format!(
                        "saving new last snapshot data for {object_id:?} at {last_applied_event_id:?}"
                    )
                })?;

            Ok(())
        })
        .await
        .wrap_with_context(|| {
            format!("removing {event_id:?} on {object_id:?}")
        })
}
1836
1837 async fn set_object_importance(
1838 &self,
1839 object_id: ObjectId,
1840 importance: Importance,
1841 ) -> crate::Result<()> {
1842 let object_id_js = object_id.to_js_string();
1843
1844 let res = self
1845 .db
1846 .transaction(&["snapshots_meta"])
1847 .rw()
1848 .run(move |transaction| async move {
1849 let snapshots_meta = transaction
1850 .object_store("snapshots_meta")
1851 .wrap_context("retrieving the 'snapshots_meta' object store")?;
1852
1853 let creation_object = snapshots_meta
1854 .index("creation_object")
1855 .wrap_context("retrieving the 'creation_object' index")?;
1856
1857 let Some(snapshot_js) = creation_object
1858 .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1859 .await
1860 .wrap_context("retrieving creation snapshot")?
1861 else {
1862 return Ok(());
1864 };
1865
1866 let mut snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_js)
1867 .wrap_context("deserializing snapshot metadata")?;
1868 let importance_before = snapshot_meta.importance;
1869 snapshot_meta.importance = Some(importance);
1870
1871 if importance_before != snapshot_meta.importance {
1872 let snapshot_js =
1873 to_js(snapshot_meta).wrap_context("reserializing snapshot metadata")?;
1874 snapshots_meta
1875 .put(&snapshot_js)
1876 .await
1877 .wrap_context("saving the unlocked creation snapshot metadata")?;
1878 }
1879
1880 Ok(())
1881 })
1882 .await
1883 .wrap_with_context(|| format!("unlocking {object_id:?} from IndexedDB"));
1884 if res.is_ok() {
1885 self.objects_unlocked_this_run
1886 .set(self.objects_unlocked_this_run.get() + 1);
1887 }
1888 res
1889 }
1890
1891 async fn set_importance_from_queries(
1892 &self,
1893 object_id: ObjectId,
1894 importance_from_queries: Importance,
1895 ) -> crate::Result<()> {
1896 let object_id_js = object_id.to_js_string();
1897
1898 let res = self
1899 .db
1900 .transaction(&["snapshots_meta"])
1901 .rw()
1902 .run(move |transaction| async move {
1903 let snapshots_meta = transaction
1904 .object_store("snapshots_meta")
1905 .wrap_context("retrieving the 'snapshots_meta' object store")?;
1906
1907 let creation_object = snapshots_meta
1908 .index("creation_object")
1909 .wrap_context("retrieving the 'creation_object' index")?;
1910
1911 let Some(snapshot_js) = creation_object
1912 .get(&Array::from_iter([&JsValue::from(1), &object_id_js]))
1913 .await
1914 .wrap_context("retrieving creation snapshot")?
1915 else {
1916 return Ok(());
1918 };
1919
1920 let mut snapshot_meta = serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_js)
1921 .wrap_context("deserializing snapshot metadata")?;
1922 let importance_from_queries_before = snapshot_meta.importance_from_queries;
1923 snapshot_meta.importance_from_queries = Some(importance_from_queries);
1924
1925 if importance_from_queries_before != snapshot_meta.importance_from_queries {
1926 let snapshot_js =
1927 to_js(snapshot_meta).wrap_context("reserializing snapshot metadata")?;
1928 snapshots_meta
1929 .put(&snapshot_js)
1930 .await
1931 .wrap_context("saving the unlocked creation snapshot metadata")?;
1932 }
1933
1934 Ok(())
1935 })
1936 .await
1937 .wrap_with_context(|| format!("unlocking {object_id:?} from IndexedDB"));
1938 if res.is_ok() {
1939 self.objects_unlocked_this_run
1940 .set(self.objects_unlocked_this_run.get() + 1);
1941 }
1942 res
1943 }
1944
/// Not implemented yet: the body is a bare `todo!()`, so calling this method panics.
///
/// None of the parameters are used so far, hence their leading underscores.
async fn set_query_importance(
    &self,
    _query_id: QueryId,
    _importance: Importance,
    _objects_matched_by_query: Vec<ObjectId>,
) -> crate::Result<()> {
    todo!()
}
1953
1954 async fn client_vacuum(
1955 &self,
1956 mut notify_removals: impl 'static + FnMut(ObjectId),
1957 mut notify_query_removals: impl 'static + FnMut(QueryId),
1958 ) -> crate::Result<()> {
1959 let zero_id = ObjectId::from_u128(0).to_js_string();
1960 let max_id = ObjectId::from_u128(u128::MAX).to_js_string();
1961
1962 let res = self
1963 .db
1964 .transaction(&[
1965 "snapshots",
1966 "snapshots_meta",
1967 "events",
1968 "events_meta",
1969 "queries_meta",
1970 "upload_queue_meta",
1971 "binaries",
1972 ])
1973 .rw()
1974 .run(move |transaction| async move {
1975 let snapshots_meta = transaction
1976 .object_store("snapshots_meta")
1977 .wrap_context("retrieving the 'snapshots_meta' object store")?;
1978 let events_meta = transaction
1979 .object_store("events_meta")
1980 .wrap_context("retrieving the 'events_meta' object store")?;
1981 let snapshots = transaction
1982 .object_store("snapshots")
1983 .wrap_context("retrieving the 'snapshots' object store")?;
1984 let events = transaction
1985 .object_store("events")
1986 .wrap_context("retrieving the 'events' object store")?;
1987 let queries_meta = transaction
1988 .object_store("queries_meta")
1989 .wrap_context("retrieving the 'queries_meta' object store")?;
1990 let binaries = transaction
1991 .object_store("binaries")
1992 .wrap_context("retrieving the 'binaries' object store")?;
1993
1994 let object_snapshot = snapshots_meta
1995 .index("object_snapshot")
1996 .wrap_context("retrieving the 'object_snapshot' index")?;
1997 let creation_object = snapshots_meta
1998 .index("creation_object")
1999 .wrap_context("retrieving the 'creation_object' index")?;
2000
2001 let object_event = events_meta
2002 .index("object_event")
2003 .wrap_context("retrieving the 'object_event' index")?;
2004
2005 let mut cursor = queries_meta
2007 .cursor()
2008 .open()
2009 .await
2010 .wrap_context("listing all queries")?;
2011 while let Some(query_meta_js) = cursor.value() {
2012 let query_meta = serde_wasm_bindgen::from_value::<QueryMeta>(query_meta_js)
2013 .wrap_context("deserializing query metadata")?;
2014 if !query_meta.importance.lock() {
2015 notify_query_removals(query_meta.query_id);
2016 cursor
2017 .delete()
2018 .await
2019 .wrap_context("removing unlocked query")?;
2020 }
2021 cursor
2022 .advance(1)
2023 .await
2024 .wrap_context("moving cursor forward")?;
2025 }
2026
2027 let mut to_remove = creation_object
2030 .cursor()
2031 .open()
2032 .await
2033 .wrap_context("listing unlocked objects")?;
2034 while let Some(s) = to_remove.value() {
2037 let s = serde_wasm_bindgen::from_value::<SnapshotMeta>(s)
2038 .wrap_context("deserializing unlocked object")?;
2039 if s.importance.unwrap().lock() || s.importance_from_queries.unwrap().lock() {
2041 to_remove
2043 .advance(1)
2044 .await
2045 .wrap_context("going to next to-remove object")?;
2046 continue;
2047 }
2048 let object_id = s.object_id;
2049 let object_id_js = object_id.to_js_string();
2050
2051 let mut snapshots_to_remove = object_snapshot
2053 .cursor()
2054 .range(
2055 &**Array::from_iter([&object_id_js, &zero_id])
2056 ..=&**Array::from_iter([&object_id_js, &max_id]),
2057 )
2058 .wrap_context("limiting the range to to-delete snapshots")?
2059 .open()
2060 .await
2061 .wrap_context("opening cursor of to-delete snapshots")?;
2062 while let Some(s) = snapshots_to_remove.value() {
2063 let s = serde_wasm_bindgen::from_value::<SnapshotMeta>(s)
2064 .wrap_context("deserializing to-remove snapshot metadata")?;
2065 snapshots
2066 .delete(&s.snapshot_id.to_js_string())
2067 .await
2068 .wrap_context("failed deleting snapshot")?;
2069 snapshots_to_remove.delete().await.wrap_with_context(|| {
2070 format!("failed deleting snapshot metadata of {object_id:?}")
2071 })?;
2072 snapshots_to_remove
2073 .advance(1)
2074 .await
2075 .wrap_context("going to next to-remove snapshot")?;
2076 }
2077
2078 let mut events_to_remove = object_event
2080 .cursor()
2081 .range(
2082 &**Array::from_iter([&object_id_js, &zero_id])
2083 ..=&**Array::from_iter([&object_id_js, &max_id]),
2084 )
2085 .wrap_context("limiting the range to to-delete events")?
2086 .open()
2087 .await
2088 .wrap_context("opening cursor of to-delete events")?;
2089 while let Some(e) = events_to_remove.value() {
2090 let e = serde_wasm_bindgen::from_value::<EventMeta>(e)
2091 .wrap_context("deserializing to-remove event metadata")?;
2092 events
2093 .delete(&e.event_id.to_js_string())
2094 .await
2095 .wrap_context("failed deleting event")?;
2096 events_to_remove.delete().await.wrap_with_context(|| {
2097 format!("failed deleting event of {object_id:?}")
2098 })?;
2099 events_to_remove
2100 .advance(1)
2101 .await
2102 .wrap_context("going to next to-remove event")?;
2103 }
2104
2105 notify_removals(object_id);
2107
2108 to_remove
2110 .advance(1)
2111 .await
2112 .wrap_context("going to next to-remove object")?;
2113 }
2114
2115 let required_binaries = Self::list_required_binaries(&transaction)
2116 .await
2117 .wrap_context("listing still-required binaries")?;
2118 let mut binaries_cursor = binaries
2119 .cursor()
2120 .open()
2121 .await
2122 .wrap_context("opening cursor over all binaries")?;
2123 while let Some(b) = binaries_cursor.key() {
2124 let b = serde_wasm_bindgen::from_value::<BinPtr>(b)
2125 .wrap_context("deserializing binary id")?;
2126 if !required_binaries.contains(&b) {
2127 binaries_cursor
2128 .delete()
2129 .await
2130 .wrap_with_context(|| format!("deleting {b:?}"))?;
2131 }
2132 binaries_cursor
2133 .advance(1)
2134 .await
2135 .wrap_context("going to next binary")?;
2136 }
2137
2138 Ok(())
2139 })
2140 .await
2141 .wrap_context("vacuuming the database");
2142 if res.is_ok() {
2143 self.objects_unlocked_this_run.set(0);
2144 }
2145 res
2146 }
2147
2148 async fn list_uploads(&self) -> crate::Result<Vec<UploadId>> {
2149 self.db
2151 .transaction(&["upload_queue_meta"])
2152 .run(move |transaction| async move {
2153 let keys = transaction
2154 .object_store("upload_queue_meta")
2155 .wrap_context("retrieving 'upload_queue_meta' object store")?
2156 .get_all_keys(None)
2157 .await
2158 .wrap_context("getting all keys from upload_queue_meta")?;
2159 let mut res = Vec::with_capacity(keys.len());
2160 for k in keys.into_iter() {
2161 let k = serde_wasm_bindgen::from_value::<UploadId>(k)
2162 .wrap_context("deserializing upload id")?;
2163 res.push(k);
2164 }
2165 Ok(res)
2166 })
2167 .await
2168 .wrap_context("listing upload queue")
2169 }
2170
2171 async fn get_upload(&self, upload_id: UploadId) -> crate::Result<Option<Upload>> {
2172 self.db
2173 .transaction(&["upload_queue"])
2174 .run(move |transaction| async move {
2175 let Some(res) = transaction
2176 .object_store("upload_queue")
2177 .wrap_context("retrieving 'upload_queue' object store")?
2178 .get(&JsValue::try_from(upload_id.0).unwrap())
2179 .await
2180 .wrap_context("fetching data from upload_queue store")?
2181 else {
2182 return Ok(None);
2183 };
2184 let res = serde_wasm_bindgen::from_value::<Upload>(res)
2185 .wrap_context("deserializing data")?;
2186 Ok(Some(res))
2187 })
2188 .await
2189 .wrap_with_context(|| format!("retrieving data for {upload_id:?}"))
2190 }
2191
2192 async fn enqueue_upload(
2193 &self,
2194 upload: Upload,
2195 required_binaries: Vec<BinPtr>,
2196 ) -> crate::Result<UploadId> {
2197 let metadata = UploadMeta { required_binaries };
2198 let metadata = to_js(metadata).wrap_context("serializing upload metadata")?;
2199 let data = to_js(upload).wrap_context("serializing upload data")?;
2200 self.db
2201 .transaction(&["upload_queue", "upload_queue_meta"])
2202 .rw()
2203 .run(move |transaction| async move {
2204 let upload_id = transaction
2205 .object_store("upload_queue_meta")
2206 .wrap_context("retrieving 'upload_queue_meta' object store")?
2207 .add(&metadata)
2208 .await
2209 .wrap_context("saving upload metadata")?;
2210 transaction
2211 .object_store("upload_queue")
2212 .wrap_context("retrieving 'upload_queue' object store")?
2213 .add_kv(&upload_id, &data)
2214 .await
2215 .wrap_context("saving upload data")?;
2216 let upload_id = serde_wasm_bindgen::from_value::<UploadId>(upload_id)
2217 .wrap_context("deserializing upload id")?;
2218 Ok(upload_id)
2219 })
2220 .await
2221 .wrap_context("registering not-yet-completed upload")
2222 }
2223
2224 async fn upload_finished(&self, upload_id: UploadId) -> crate::Result<()> {
2225 let res = self
2226 .db
2227 .transaction(&["upload_queue", "upload_queue_meta"])
2228 .rw()
2229 .run(move |transaction| async move {
2230 let upload_id = to_js(upload_id).wrap_context("serializing upload id")?;
2231 transaction
2232 .object_store("upload_queue")
2233 .wrap_context("retrieving 'upload_queue' object store")?
2234 .delete(&upload_id)
2235 .await
2236 .wrap_context("deleting upload data")?;
2237 transaction
2238 .object_store("upload_queue_meta")
2239 .wrap_context("retrieving 'upload_queue_meta' object store")?
2240 .delete(&upload_id)
2241 .await
2242 .wrap_context("deleting upload metadata")?;
2243 Ok(())
2244 })
2245 .await
2246 .wrap_with_context(|| format!("registering {upload_id:?} as having completed"));
2247 if res.is_ok() {
2248 self.objects_unlocked_this_run
2249 .set(self.objects_unlocked_this_run.get() + 1);
2250 }
2251 res
2252 }
2253
2254 async fn get_saved_objects(&self) -> crate::Result<HashMap<ObjectId, SavedObjectMeta>> {
2256 let zero_id = TypeId::from_u128(0).to_js_string();
2259 let max_id = TypeId::from_u128(u128::MAX).to_js_string();
2260 self.db
2261 .transaction(&["snapshots", "snapshots_meta"])
2262 .run(|transaction| async move {
2263 let snapshots_meta = transaction
2264 .object_store("snapshots_meta")
2265 .wrap_context("retrieving snapshots_meta object store")?;
2266 let creation_type_object = snapshots_meta
2267 .index("creation_type_object")
2268 .wrap_context("opening 'creation_type_object' snapshot index")?;
2269 let mut cursor = creation_type_object
2270 .cursor()
2271 .range(
2272 &**Array::from_iter([&JsValue::from(1), &zero_id, &zero_id])
2273 ..=&**Array::from_iter([&JsValue::from(1), &max_id, &max_id]),
2274 )
2275 .wrap_context("limiting cursor to only creation snapshots")?
2276 .open()
2277 .await
2278 .wrap_context("opening cursor over all creation objects")?;
2279 let mut res = HashMap::new();
2280 while let Some(snapshot_meta_js) = cursor.value() {
2281 let snapshot_meta =
2282 serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
2283 .wrap_context("deserializing snapshot metadata")?;
2284 res.insert(
2285 snapshot_meta.object_id,
2286 SavedObjectMeta {
2287 type_id: snapshot_meta.type_id,
2288 have_all_until: snapshot_meta.have_all_until,
2290 importance: snapshot_meta.importance.unwrap(),
2292 },
2293 );
2294 cursor
2295 .advance(1)
2296 .await
2297 .wrap_context("going to next object in the database")?;
2298 }
2299 Ok(res)
2300 })
2301 .await
2302 .wrap_context("listing subscribed objects")
2303 }
2304
2305 async fn get_saved_queries(&self) -> crate::Result<HashMap<QueryId, SavedQuery>> {
2306 self.db
2307 .transaction(&["queries_meta"])
2308 .run(|transaction| async move {
2309 let queries_meta = transaction
2310 .object_store("queries_meta")
2311 .wrap_context("retrieving queries_meta object store")?;
2312 let queries = queries_meta
2313 .get_all(None)
2314 .await
2315 .wrap_context("listing subscribed queries")?;
2316 let res = queries
2317 .into_iter()
2318 .map(|q| {
2319 let q = serde_wasm_bindgen::from_value::<QueryMeta>(q)
2320 .wrap_context("deserializing query metadata")?;
2321 Ok((
2322 q.query_id,
2323 SavedQuery {
2324 query: q.query,
2325 type_id: q.type_id,
2326 have_all_until: q.have_all_until,
2327 importance: q.importance,
2328 },
2329 ))
2330 })
2331 .collect();
2332 res
2333 })
2334 .await
2335 .wrap_context("listing subscribed objects")
2336 }
2337
2338 async fn record_query(
2339 &self,
2340 query_id: QueryId,
2341 query: Arc<Query>,
2342 type_id: TypeId,
2343 importance: Importance,
2344 ) -> crate::Result<()> {
2345 let new_query = QueryMeta {
2346 query_id,
2347 query,
2348 type_id,
2349 importance,
2350 have_all_until: None,
2351 };
2352 let new_query_js = to_js(new_query).wrap_context("serializing query metadata")?;
2353 self.db
2354 .transaction(&["queries_meta"])
2355 .rw()
2356 .run(|transaction| async move {
2357 let queries_meta = transaction
2358 .object_store("queries_meta")
2359 .wrap_context("retrieving queries_meta object store")?;
2360 queries_meta
2361 .put(&new_query_js)
2362 .await
2363 .wrap_context("inserting the new query")?;
2364 Ok(())
2365 })
2366 .await
2367 .wrap_context("subscribing to query")
2368 }
2369
2370 async fn forget_query(
2371 &self,
2372 query_id: QueryId,
2373 objects_matching_query: Vec<ObjectId>,
2374 ) -> crate::Result<()> {
2375 let query_id_js = query_id.to_js_string();
2376 self.db
2377 .transaction(&["queries_meta", "snapshots_meta"])
2378 .rw()
2379 .run(move |transaction| async move {
2380 let queries_meta = transaction
2381 .object_store("queries_meta")
2382 .wrap_context("retrieving queries_meta object store")?;
2383 let snapshots_meta = transaction
2384 .object_store("snapshots_meta")
2385 .wrap_context("retrieving snapshots_meta object store")?;
2386 let creation_object = snapshots_meta
2387 .index("creation_object")
2388 .wrap_context("retrieving creation_object index")?;
2389
2390 queries_meta
2391 .delete(&query_id_js)
2392 .await
2393 .wrap_context("removing the unsubscribed query")?;
2394 for object_id in objects_matching_query {
2395 let Some(snapshot_meta_js) = creation_object
2396 .get(&**Array::from_iter([
2397 &JsValue::from(1),
2398 &object_id.to_js_string(),
2399 ]))
2400 .await
2401 .wrap_context("fetching existing snapshot metadata")?
2402 else {
2403 tracing::error!(
2404 ?object_id,
2405 "object supposed to get unlocked does not actually exist"
2406 );
2407 continue;
2408 };
2409 let mut snapshot_meta =
2410 serde_wasm_bindgen::from_value::<SnapshotMeta>(snapshot_meta_js)
2411 .wrap_context("deserializing snapshot metadata")?;
2412 let importance_from_queries_before = snapshot_meta.importance_from_queries;
2414 snapshot_meta.importance_from_queries = Some(Importance::NONE);
2415 if importance_from_queries_before != snapshot_meta.importance_from_queries {
2416 let snapshot_meta_js =
2417 to_js(snapshot_meta).wrap_context("serializing snapshot metadata")?;
2418 snapshots_meta
2419 .put(&snapshot_meta_js)
2420 .await
2421 .wrap_context("saving unlocked-for-queries snapshot")?;
2422 }
2423 }
2424 Ok(())
2425 })
2426 .await
2427 .wrap_context("subscribing to query")
2428 }
2429
2430 async fn update_queries(
2431 &self,
2432 queries: &HashSet<QueryId>,
2433 now_have_all_until: Updatedness,
2434 ) -> crate::Result<()> {
2435 let queries = queries.clone();
2436 self.db
2437 .transaction(&["queries_meta"])
2438 .rw()
2439 .run(move |transaction| async move {
2440 let queries_meta = transaction
2441 .object_store("queries_meta")
2442 .wrap_context("retrieving queries_meta object store")?;
2443 for query_id in queries {
2444 let Some(query_meta_js) = queries_meta
2445 .get(&query_id.to_js_string())
2446 .await
2447 .wrap_context("fetching existing query metadata")?
2448 else {
2449 tracing::error!(
2450 ?query_id,
2451 "query supposed to get updated does not actually exist"
2452 );
2453 continue;
2454 };
2455 let mut query_meta = serde_wasm_bindgen::from_value::<QueryMeta>(query_meta_js)
2456 .wrap_context("deserializing query metadata")?;
2457 query_meta.have_all_until = Some(now_have_all_until);
2458 let query_meta_js =
2459 to_js(query_meta).wrap_context("reserializing query metadata")?;
2460 queries_meta
2461 .put(&query_meta_js)
2462 .await
2463 .wrap_context("saving updated query metadata")?;
2464 }
2465 Ok(())
2466 })
2467 .await
2468 .wrap_context("subscribing to query")
2469 }
2470}
2471
2472async fn reencode_snapshot<T: Object>(
2473 transaction: &indexed_db::Transaction<crate::Error>,
2474 snapshot_id: EventId,
2475 snapshot_version: i32,
2476) -> crate::Result<()> {
2477 let snapshot_id_js = snapshot_id.to_js_string();
2478 let snapshots = transaction
2479 .object_store("snapshots")
2480 .wrap_context("retrieving 'snapshots' object store")?;
2481 let snapshot_js = snapshots
2482 .get(&snapshot_id_js)
2483 .await
2484 .wrap_context("fetching snapshot to update")?
2485 .ok_or_else(|| {
2486 crate::Error::Other(anyhow!(
2487 "Snapshot {snapshot_id:?} does not exist despite having associated metadata"
2488 ))
2489 })?;
2490 let snapshot_json = serde_wasm_bindgen::from_value::<serde_json::Value>(snapshot_js)
2491 .wrap_context("deserializing indexeddb data as json-value")?;
2492 let value = T::from_old_snapshot(snapshot_version, snapshot_json)
2493 .wrap_context("parsing old snapshot")?;
2494 let new_snapshot_js = to_js(value).wrap_context("serializing snapshot data")?;
2495 snapshots
2496 .put_kv(&snapshot_id_js, &new_snapshot_js)
2497 .await
2498 .wrap_context("updating saved snapshot data")?;
2499 Ok(())
2500}
2501
2502async fn check_required_binaries(
2503 binaries_store: indexed_db::ObjectStore<crate::Error>,
2504 binaries: Vec<BinPtr>,
2505) -> crate::Result<()> {
2506 let missing_binaries = future::try_join_all(binaries.iter().map(|&b| {
2507 binaries_store
2508 .contains(&b.to_js_string())
2509 .map_ok(move |present| (!present).then_some(b))
2510 }))
2511 .await
2512 .wrap_with_context(|| format!("checking for required binaries {binaries:?}"))?
2513 .into_iter()
2514 .filter_map(|b| b)
2515 .collect::<Vec<_>>();
2516
2517 if !missing_binaries.is_empty() {
2518 return Err(crate::Error::MissingBinaries(missing_binaries));
2519 }
2520
2521 Ok(())
2522}
2523
2524async fn apply_events_after<T: Object>(
2525 transaction: &indexed_db::Transaction<crate::Error>,
2526 object: &mut T,
2527 object_id: ObjectId,
2528 mut last_applied_event_id: EventId,
2529) -> indexed_db::Result<EventId, crate::Error> {
2530 let events = transaction
2531 .object_store("events")
2532 .wrap_context("retrieving 'events' object store")?;
2533 let object_event = transaction
2534 .object_store("events_meta")
2535 .wrap_context("retrieving 'events_meta' object store")?
2536 .index("object_event")
2537 .wrap_context("retrieving 'object_event' index")?;
2538
2539 let object_id_js = object_id.to_js_string();
2540 let last_applied_event_id_js = last_applied_event_id.to_js_string();
2541 let max_event_id_js = EventId::from_u128(u128::MAX).to_js_string();
2542
2543 let mut to_apply = object_event.cursor()
2544 .range((
2545 Bound::Excluded(&**Array::from_iter([&object_id_js, &last_applied_event_id_js])),
2546 Bound::Included(&**Array::from_iter([&object_id_js, &max_event_id_js])),
2547 ))
2548 .wrap_with_context(|| format!("limiting the events to apply to those on {object_id:?} since {last_applied_event_id:?}"))?
2549 .open()
2550 .await
2551 .wrap_with_context(|| format!("opening cursor for the events to apply on {object_id:?} since {last_applied_event_id:?}"))?;
2552 while let Some(apply_event_meta_js) = to_apply.value() {
2553 let apply_event_meta = serde_wasm_bindgen::from_value::<EventMeta>(apply_event_meta_js)
2554 .wrap_with_context(|| format!("deserializing event to apply on {object_id:?}"))?;
2555 let apply_event_id = apply_event_meta.event_id;
2556 let apply_event_id_js = apply_event_id.to_js_string();
2557 let apply_event_js = events
2558 .get(&apply_event_id_js)
2559 .await
2560 .wrap_with_context(|| format!("recovering event data for {apply_event_id:?}"))?
2561 .ok_or_else(|| {
2562 crate::Error::Other(anyhow!(
2563 "no event data for event with metadata {apply_event_id:?}"
2564 ))
2565 })?;
2566 let apply_event = serde_wasm_bindgen::from_value::<T::Event>(apply_event_js)
2567 .wrap_with_context(|| format!("deserializing event data for {apply_event_id:?}"))?;
2568 object.apply(DbPtr::from(object_id), &apply_event);
2569 last_applied_event_id = apply_event_id;
2570 to_apply
2571 .advance(1)
2572 .await
2573 .wrap_context("getting next event to apply")?;
2574 }
2575
2576 Ok(last_applied_event_id)
2577}
2578
2579fn to_js<T: serde::Serialize>(v: T) -> std::result::Result<JsValue, serde_wasm_bindgen::Error> {
2580 static JSON_SERIALIZER: serde_wasm_bindgen::Serializer =
2581 serde_wasm_bindgen::Serializer::json_compatible();
2582 v.serialize(&JSON_SERIALIZER)
2583}