crdb_client/
client_db.rs

1use super::{api_db::OnError, connection::ConnectionEvent, ApiDb};
2use crate::Obj;
3use anyhow::anyhow;
4use crdb_cache::CacheDb;
5use crdb_core::{
6    BinPtr, ClientSideDb, Db, DbPtr, EventId, Importance, MaybeObject, Object, ObjectData,
7    ObjectId, Query, QueryId, ResultExt, SavedQuery, Session, SessionRef, SessionToken, TypeId,
8    Update, UpdateData, Updatedness, Updates, Upload, UploadId, User,
9};
10use crdb_core::{ClientStorageInfo, LoginInfo};
11use futures::{channel::mpsc, stream, FutureExt, StreamExt, TryStreamExt};
12use std::ops::Deref;
13use std::{
14    collections::{hash_map, HashMap, HashSet},
15    future::Future,
16    sync::{Arc, Mutex, RwLock},
17    time::Duration,
18};
19use tokio::sync::{broadcast, oneshot, watch};
20use tokio_util::sync::CancellationToken;
21use ulid::Ulid;
22
/// Outcome of applying a single `Update` to the local database (produced by `save_data`).
enum UpdateResult {
    /// The update was `UpdateData::LostReadRights`; local removal of the object was attempted.
    LostAccess,
    /// The update was applied (`C::recreate` / `C::submit` returned `None`) without changing the
    /// object's latest snapshot.
    LatestUnchanged,
    /// The object's latest snapshot changed. Carries the object's type and the value returned
    /// by `C::recreate` / `C::submit`, which is then used to recompute the matching queries.
    LatestChanged(TypeId, serde_json::Value),
}
28
/// In-memory bookkeeping kept for every object saved in the local database.
#[derive(Clone)]
pub(crate) struct SavedObject {
    /// Latest `Updatedness` up to which we know we have all the data for this object, if any.
    pub have_all_until: Option<Updatedness>,
    /// Importance set on the object itself (as opposed to the one inherited from queries).
    pub importance: Importance,
    /// Saved queries that the object's latest snapshot currently matches.
    pub matches_queries: HashSet<QueryId>,
    /// Importance derived from the matching queries, as computed by `queries_for`.
    pub importance_from_queries: Importance,
}
36
37impl SavedObject {
38    fn now_have_all_until(&mut self, until: Updatedness) {
39        self.have_all_until = std::cmp::max(self.have_all_until, Some(until));
40    }
41}
42
/// Bookkeeping for every locally-saved object, keyed by object id.
type SavedObjectMap = HashMap<ObjectId, SavedObject>;
/// Every locally-saved query, keyed by query id.
type SavedQueriesMap = HashMap<QueryId, SavedQuery>;
/// One broadcast channel per saved query, carrying the ids of objects updated for that query.
/// The receiver half is stored alongside the sender so that new receivers can be created from
/// it with `resubscribe`.
type QueryUpdatesBroadcastMap =
    HashMap<QueryId, (broadcast::Sender<ObjectId>, broadcast::Receiver<ObjectId>)>;
47
/// Client-side database handle: a cached local database plus the background tasks that keep
/// it in sync with the data received through the `ApiDb`.
pub struct ClientDb<LocalDb: ClientSideDb> {
    /// Currently logged-in user, if any; initialized from the saved login info in `connect`.
    user: RwLock<Option<User>>,
    /// Generator backing `make_ulid`.
    ulid: Mutex<ulid::Generator>,
    /// Handle to the API layer (binary fetching, subscriptions, connection events).
    api: Arc<ApiDb<LocalDb>>,
    /// Local database, wrapped in a cache.
    db: Arc<CacheDb<LocalDb>>,
    // The `Lock` here is ONLY the accumulated queries lock for this object! The object lock is
    // handled directly in the database only.
    // NOTE(review): this comment mentions a `Lock`, but the map now stores `SavedObject`
    // importance data; it looks stale -- confirm and reword.
    saved_objects: Arc<Mutex<SavedObjectMap>>,
    /// In-memory copy of the saved queries.
    saved_queries: Arc<Mutex<SavedQueriesMap>>,
    /// Sending half of the queue consumed, in order, by the data saver task.
    data_saver: mpsc::UnboundedSender<DataSaverMessage>,
    /// While this holds `true`, the data saver task skips incoming `Data` messages.
    data_saver_skipper: watch::Sender<bool>,
    /// Receiver kept around only to hand out new receivers through `resubscribe`.
    updates_broadcastee: broadcast::Receiver<ObjectId>,
    /// Per-query update channels, see `listen_for_updates_on`.
    query_updates_broadcastees: Arc<Mutex<QueryUpdatesBroadcastMap>>,
    /// Write-locked by the autovacuum task while vacuuming, read-locked while saving data.
    vacuum_guard: Arc<tokio::sync::RwLock<()>>,
    /// Cancels the token given to the autovacuum task once this `ClientDb` is dropped.
    _cleanup_token: tokio_util::sync::DropGuard,
}
64
/// Capacity given to every `broadcast::channel` created by this module (the all-updates
/// channel and the per-query channels).
const BROADCAST_CHANNEL_SIZE: usize = 64;
66
/// Messages consumed, in order, by the data saver task (see `data_saver`).
enum DataSaverMessage {
    /// Request to save one update into the local database.
    Data {
        /// The update to apply.
        update: Arc<Update>,
        /// Updatedness the update brings us to; `save_data` treats `None` as a local update.
        now_have_all_until: Option<Updatedness>,
        /// Importance to OR into the object's importance while saving.
        additional_importance: Importance,
        /// Where the outcome of the save is sent back to the requester.
        reply: oneshot::Sender<crate::Result<UpdateResult>>,
    },
    /// Opens a frame during which `Data` messages are dropped; acknowledged through the sender.
    /// Frames nest: each one needs its own `ResumeFrame`.
    StopFrame(oneshot::Sender<()>),
    /// Closes the innermost frame. Receiving it outside of any frame makes the data saver panic.
    ResumeFrame,
}
77
78impl<LocalDb: ClientSideDb> ClientDb<LocalDb> {
    /// Builds a `ClientDb` on top of `db` and starts all of its background tasks.
    ///
    /// - `config` is only used to pick the `C: crdb_core::Config` type parameter.
    /// - `db` is wrapped in a `CacheDb` created with `cache_watermark`.
    /// - `require_relogin` is called right away if no saved login is found, and is also handed
    ///   over to the `ApiDb`.
    /// - `error_handler` is handed over to the `ApiDb`.
    /// - `vacuum_schedule` drives the autovacuum task (see `setup_autovacuum`).
    ///
    /// On success, returns the client together with a future resolving to the value returned by
    /// `C::reencode_old_versions`, which runs in a spawned task.
    ///
    /// # Errors
    ///
    /// Fails if the saved login, saved queries or saved objects cannot be read from `db`, if the
    /// `ApiDb` cannot be created, or if logging back in with the saved login fails.
    ///
    /// # Panics
    ///
    /// The returned future panics if the upgrade task goes away without reporting its result.
    pub async fn connect<C, RRL, EH, EHF, VS>(
        config: C,
        db: LocalDb,
        cache_watermark: usize,
        require_relogin: RRL,
        error_handler: EH,
        vacuum_schedule: ClientVacuumSchedule<VS>,
    ) -> crate::Result<(Arc<ClientDb<LocalDb>>, impl waaaa::Future<Output = usize>)>
    where
        C: crdb_core::Config,
        RRL: 'static + waaaa::Send + Fn(),
        EH: 'static + waaaa::Send + Fn(Upload, crate::Error) -> EHF,
        EHF: 'static + waaaa::Future<Output = OnError>,
        VS: 'static + Send + Fn(ClientStorageInfo) -> bool,
    {
        let _ = config; // mark used
        C::check_ulids();
        let (updates_broadcaster, updates_broadcastee) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
        let db = Arc::new(CacheDb::new(db, cache_watermark));
        let maybe_login = db
            .get_saved_login()
            .await
            .wrap_context("retrieving saved login info")?;
        if maybe_login.is_none() {
            (require_relogin)();
        }
        let saved_queries = db
            .get_saved_queries()
            .await
            .wrap_context("listing saved queries")?;
        // Rebuild the in-memory object bookkeeping: for each saved object, recompute which saved
        // queries its current value matches. Up to 16 objects are processed concurrently.
        let saved_objects = stream::iter(
            db.get_saved_objects()
                .await
                .wrap_context("listing saved objects")?,
        )
        .map(async |(object_id, saved)| -> crate::Result<_> {
            // TODO(perf-high): replace get_latest with get_json wherever possible
            let value = db.get_json(object_id, Importance::NONE).await?;
            let (matches_queries, importance_from_queries) =
                queries_for(&saved_queries, saved.type_id, &value);
            Ok((
                object_id,
                SavedObject {
                    have_all_until: saved.have_all_until,
                    importance: saved.importance,
                    matches_queries,
                    importance_from_queries,
                },
            ))
        })
        .buffer_unordered(16)
        .try_collect::<HashMap<_, _>>()
        .await?;
        // One broadcast channel per saved query, for `listen_for_updates_on`.
        let query_updates_broadcastees = Arc::new(Mutex::new(
            saved_queries
                .keys()
                .map(|&q| (q, broadcast::channel(BROADCAST_CHANNEL_SIZE)))
                .collect(),
        ));
        let saved_queries = Arc::new(Mutex::new(saved_queries));
        let saved_objects = Arc::new(Mutex::new(saved_objects));
        // The two closures below give the `ApiDb` a snapshot of the currently subscribed objects
        // and queries whenever it asks for one.
        let (api, updates_receiver) = ApiDb::new::<C, _, _, _, _, _>(
            db.clone(),
            {
                let saved_objects = saved_objects.clone();
                move || {
                    // TODO(api-highest): rework this to return all objects and not only subscribed objects
                    // We want all objects to get sent to the server with updatedness information, so that
                    // the server does not re-send it all to us when we already have it. But it'll mean the
                    // user of these callbacks must filter on subscription status to define whether it wants
                    // to subscribe on the object or not.
                    // The same does not need to be done for saved_queries below, that will have no impact on
                    // any further subscription.
                    saved_objects
                        .lock()
                        .unwrap()
                        .iter()
                        .filter(|(_, o)| o.importance.subscribe())
                        .map(|(id, o)| (*id, o.clone()))
                        .collect()
                }
            },
            {
                let saved_queries = saved_queries.clone();
                move || {
                    saved_queries
                        .lock()
                        .unwrap()
                        .iter()
                        .filter(|(_, q)| q.importance.subscribe())
                        .map(|(id, q)| (*id, q.clone()))
                        .collect()
                }
            },
            error_handler,
            require_relogin,
        )
        .await?;
        let api = Arc::new(api);
        let cancellation_token = CancellationToken::new();
        let (data_saver, data_saver_receiver) = mpsc::unbounded();
        let this = Arc::new(ClientDb {
            user: RwLock::new(maybe_login.as_ref().map(|l| l.user)),
            ulid: Mutex::new(ulid::Generator::new()),
            api,
            db,
            saved_objects,
            saved_queries,
            data_saver,
            data_saver_skipper: watch::channel(false).0,
            updates_broadcastee,
            query_updates_broadcastees,
            vacuum_guard: Arc::new(tokio::sync::RwLock::new(())),
            // Dropping the `ClientDb` cancels the token, which stops the autovacuum task.
            _cleanup_token: cancellation_token.clone().drop_guard(),
        });
        this.setup_update_watcher(updates_receiver);
        this.setup_autovacuum(vacuum_schedule, cancellation_token);
        this.setup_data_saver::<C>(data_saver_receiver, updates_broadcaster);
        // Re-encode old versions in the background; the caller can await the result through the
        // future returned alongside the client.
        let (upgrade_finished_sender, upgrade_finished) = oneshot::channel();
        waaaa::spawn({
            let db = this.db.clone();
            async move {
                let num_errors = C::reencode_old_versions(&*db).await;
                let _ = upgrade_finished_sender.send(num_errors);
            }
        });
        if let Some(login_info) = maybe_login {
            this.login(login_info.url, login_info.user, login_info.token)
                .await
                .wrap_context("relogging in as previously logged-in user")?;
        }
        Ok((
            this,
            upgrade_finished.map(|res| res.expect("upgrade task was killed")),
        ))
    }
215
216    pub fn make_ulid(self: &Arc<Self>) -> Ulid {
217        // TODO(blocked): replace with generate_overflowing once https://github.com/dylanhart/ulid-rs/pull/75 lands
218        self.ulid
219            .lock()
220            .unwrap()
221            .generate()
222            .expect("Failed to generate ulid")
223    }
224
225    pub fn listen_for_all_updates(self: &Arc<Self>) -> broadcast::Receiver<ObjectId> {
226        self.updates_broadcastee.resubscribe()
227    }
228
229    pub fn listen_for_updates_on(
230        self: &Arc<Self>,
231        q: QueryId,
232    ) -> Option<broadcast::Receiver<ObjectId>> {
233        self.query_updates_broadcastees
234            .lock()
235            .unwrap()
236            .get(&q)
237            .map(|(_, recv)| recv.resubscribe())
238    }
239
240    fn setup_update_watcher(
241        self: &Arc<Self>,
242        mut updates_receiver: mpsc::UnboundedReceiver<Updates>,
243    ) {
244        // No need for a cancellation token: this task will automatically end as soon as the stream
245        // coming from `ApiDb` closes, which will happen when `ApiDb` gets dropped.
246        waaaa::spawn({
247            let data_saver = self.data_saver.clone();
248            async move {
249                while let Some(updates) = updates_receiver.next().await {
250                    for update in updates.data {
251                        let (reply, _) = oneshot::channel();
252                        data_saver
253                            .unbounded_send(DataSaverMessage::Data {
254                                update,
255                                now_have_all_until: Some(updates.now_have_all_until),
256                                additional_importance: Importance::NONE,
257                                reply,
258                            })
259                            .expect("data saver thread cannot go away");
260                        // Ignore the result of saving
261                    }
262                }
263            }
264        });
265    }
266
    /// Spawns the autovacuum task.
    ///
    /// On each iteration the task reads the storage info and, if `vacuum_schedule.filter`
    /// accepts it, runs `client_vacuum` on the local database while holding the write side of
    /// `vacuum_guard`. It then sleeps for `vacuum_schedule.frequency`, or stops as soon as
    /// `cancellation_token` is cancelled.
    fn setup_autovacuum<F: 'static + Send + Fn(ClientStorageInfo) -> bool>(
        self: &Arc<Self>,
        vacuum_schedule: ClientVacuumSchedule<F>,
        cancellation_token: CancellationToken,
    ) {
        let db = self.db.clone();
        let vacuum_guard = self.vacuum_guard.clone();
        let saved_objects = self.saved_objects.clone();
        let saved_queries = self.saved_queries.clone();
        let api = self.api.clone();
        waaaa::spawn(async move {
            loop {
                match db.storage_info().await {
                    Ok(storage_info) => {
                        if (vacuum_schedule.filter)(storage_info) {
                            // Exclusive access: data saving takes the read side of this lock.
                            let _lock = vacuum_guard.write().await;
                            let to_unsubscribe = Arc::new(Mutex::new(HashSet::new()));
                            // NOTE(review): presumably the first callback is invoked for each object
                            // the vacuum removed and the second for each removed query -- confirm
                            // against `client_vacuum`.
                            if let Err(err) = db
                                .client_vacuum(
                                    {
                                        // Objects are only collected here; they are dropped from
                                        // the bookkeeping in one go after the vacuum.
                                        let to_unsubscribe = to_unsubscribe.clone();
                                        move |object_id| {
                                            to_unsubscribe.lock().unwrap().insert(object_id);
                                        }
                                    },
                                    {
                                        // Queries are forgotten and unsubscribed immediately.
                                        let saved_queries = saved_queries.clone();
                                        let api = api.clone();
                                        move |query_id| {
                                            saved_queries.lock().unwrap().remove(&query_id);
                                            api.unsubscribe_query(query_id);
                                        }
                                    },
                                )
                                .await
                            {
                                tracing::error!(?err, "error occurred while vacuuming");
                            }
                            // Objects collected before a vacuum error are still processed below.
                            let mut to_unsubscribe = to_unsubscribe.lock().unwrap();
                            if !to_unsubscribe.is_empty() {
                                {
                                    let mut saved_objects = saved_objects.lock().unwrap();
                                    for object_id in to_unsubscribe.iter() {
                                        saved_objects.remove(object_id);
                                    }
                                }
                                api.unsubscribe(std::mem::take(&mut to_unsubscribe));
                            }
                        }
                    }
                    Err(err) => tracing::error!(
                        ?err,
                        "failed recovering storage info to check for vacuumability"
                    ),
                };

                // Wait for the next round, unless the `ClientDb` got dropped in the meantime.
                tokio::select! {
                    _ = waaaa::sleep(vacuum_schedule.frequency) => (),
                    _ = cancellation_token.cancelled() => break,
                }
            }
        });
    }
330
331    fn setup_data_saver<C: crdb_core::Config>(
332        self: &Arc<Self>,
333        data_receiver: mpsc::UnboundedReceiver<DataSaverMessage>,
334        updates_broadcaster: broadcast::Sender<ObjectId>,
335    ) {
336        let db = self.db.clone();
337        let api = self.api.clone();
338        let saved_objects = self.saved_objects.clone();
339        let saved_queries = self.saved_queries.clone();
340        let vacuum_guard = self.vacuum_guard.clone();
341        let query_updates_broadcasters = self.query_updates_broadcastees.clone();
342        let data_saver_skipper = self.data_saver_skipper.subscribe();
343        waaaa::spawn(async move {
344            Self::data_saver::<C>(
345                data_receiver,
346                data_saver_skipper,
347                saved_objects,
348                saved_queries,
349                vacuum_guard,
350                db,
351                api,
352                updates_broadcaster,
353                query_updates_broadcasters,
354            )
355            .await
356        });
357    }
358
359    #[allow(clippy::too_many_arguments)] // TODO(misc-low): refactor to have a good struct
360    async fn data_saver<C: crdb_core::Config>(
361        mut data_receiver: mpsc::UnboundedReceiver<DataSaverMessage>,
362        mut data_saver_skipper: watch::Receiver<bool>,
363        saved_objects: Arc<Mutex<SavedObjectMap>>,
364        saved_queries: Arc<Mutex<SavedQueriesMap>>,
365        vacuum_guard: Arc<tokio::sync::RwLock<()>>,
366        db: Arc<CacheDb<LocalDb>>,
367        api: Arc<ApiDb<LocalDb>>,
368        updates_broadcaster: broadcast::Sender<ObjectId>,
369        query_updates_broadcasters: Arc<Mutex<QueryUpdatesBroadcastMap>>,
370    ) {
371        // Handle all updates in-order! Without that, the updatedness checks will get completely borken up
372        while let Some(msg) = data_receiver.next().await {
373            match msg {
374                DataSaverMessage::StopFrame(reply) => {
375                    // Skip all messages until we received enough ResumeFrame
376                    let _ = reply.send(());
377                    let mut depth = 1;
378                    while let Some(msg) = data_receiver.next().await {
379                        match msg {
380                            DataSaverMessage::Data { .. } => (), // Skip until we're no longer stopped
381                            DataSaverMessage::StopFrame(reply) => {
382                                let _ = reply.send(());
383                                depth += 1;
384                            }
385                            DataSaverMessage::ResumeFrame => {
386                                depth -= 1;
387                            }
388                        }
389                        if depth == 0 {
390                            break;
391                        }
392                    }
393                }
394                DataSaverMessage::ResumeFrame => panic!("data saver protocol violation"),
395                DataSaverMessage::Data {
396                    update,
397                    now_have_all_until,
398                    additional_importance,
399                    reply,
400                } => {
401                    if *data_saver_skipper.borrow() {
402                        continue;
403                    }
404                    let _guard = vacuum_guard.read().await; // Do not vacuum while we're inserting new data
405                    match Self::save_data::<C>(
406                        &vacuum_guard,
407                        &db,
408                        &saved_objects,
409                        &saved_queries,
410                        &update,
411                        now_have_all_until,
412                        additional_importance,
413                        &updates_broadcaster,
414                        &query_updates_broadcasters,
415                    )
416                    .await
417                    {
418                        Ok(res) => {
419                            let _ = reply.send(Ok(res));
420                        }
421                        Err(crate::Error::ObjectDoesNotExist(_)) => {
422                            // Ignore this error, because if we received from the server an event with an object that does not exist
423                            // locally, it probably means that we recently vacuumed it and unsubscribed, but the server had already
424                            // sent us the update.
425                        }
426                        Err(crate::Error::MissingBinaries(binary_ids)) => {
427                            let fetch_binaries_res = tokio::select! {
428                                bins = Self::fetch_binaries(binary_ids, &db, &api) => bins,
429                                _ = data_saver_skipper.wait_for(|do_skip| *do_skip) => continue,
430                            };
431                            if let Err(err) = fetch_binaries_res {
432                                tracing::error!(
433                                    ?err,
434                                    ?update,
435                                    "failed retrieving required binaries for data the server sent us"
436                                );
437                                continue;
438                            }
439                            match Self::save_data::<C>(
440                                &vacuum_guard,
441                                &db,
442                                &saved_objects,
443                                &saved_queries,
444                                &update,
445                                now_have_all_until,
446                                additional_importance,
447                                &updates_broadcaster,
448                                &query_updates_broadcasters,
449                            )
450                            .await
451                            {
452                                Ok(res) => {
453                                    let _ = reply.send(Ok(res));
454                                }
455                                Err(err) => {
456                                    tracing::error!(
457                                        ?err,
458                                        ?update,
459                                        "unexpected error saving data after fetching the binaries"
460                                    );
461                                    let _ = reply.send(Err(err));
462                                }
463                            }
464                        }
465                        Err(err) => {
466                            tracing::error!(?err, ?update, "unexpected error saving data");
467                        }
468                    }
469                }
470            }
471        }
472    }
473
474    async fn fetch_binaries(
475        binary_ids: Vec<BinPtr>,
476        db: &CacheDb<LocalDb>,
477        api: &ApiDb<LocalDb>,
478    ) -> crate::Result<()> {
479        let mut bins = stream::iter(binary_ids)
480            .map(|binary_id| api.get_binary(binary_id).map(move |bin| (binary_id, bin)))
481            .buffer_unordered(16); // TODO(perf-low): is 16 a good number?
482        while let Some((binary_id, bin)) = bins.next().await {
483            match bin? {
484                Some(bin) => db.create_binary(binary_id, bin).await?,
485                None => {
486                    return Err(crate::Error::Other(anyhow!("Binary {binary_id:?} was not present on server, yet server sent us data requiring it")));
487                }
488            }
489        }
490        Ok(())
491    }
492
493    #[allow(clippy::too_many_arguments)] // TODO(misc-low): refactor to have a good struct
494    async fn save_data<C: crdb_core::Config>(
495        vacuum_guard: &tokio::sync::RwLock<()>,
496        db: &CacheDb<LocalDb>,
497        saved_objects: &Mutex<SavedObjectMap>,
498        saved_queries: &Mutex<SavedQueriesMap>,
499        u: &Update,
500        now_have_all_until: Option<Updatedness>,
501        additional_importance: Importance,
502        updates_broadcaster: &broadcast::Sender<ObjectId>,
503        query_updates_broadcasters: &Mutex<QueryUpdatesBroadcastMap>,
504    ) -> crate::Result<UpdateResult> {
505        let _lock = vacuum_guard.read().await; // avoid vacuum before setting importance
506        let object_id = u.object_id;
507        let res = match &u.data {
508            UpdateData::Creation {
509                type_id,
510                created_at,
511                snapshot_version,
512                data,
513            } => match C::recreate(
514                db,
515                *type_id,
516                object_id,
517                *created_at,
518                *snapshot_version,
519                data,
520                now_have_all_until,
521                additional_importance,
522            )
523            .await?
524            {
525                Some(res) => UpdateResult::LatestChanged(*type_id, res),
526                None => UpdateResult::LatestUnchanged,
527            },
528            UpdateData::Event {
529                type_id,
530                event_id,
531                data,
532            } => match C::submit(
533                db,
534                *type_id,
535                object_id,
536                *event_id,
537                data,
538                now_have_all_until,
539                additional_importance,
540            )
541            .await?
542            {
543                Some(res) => UpdateResult::LatestChanged(*type_id, res),
544                None => UpdateResult::LatestUnchanged,
545            },
546            UpdateData::LostReadRights => {
547                if let Err(err) = db.remove(object_id).await {
548                    tracing::error!(
549                        ?err,
550                        ?object_id,
551                        "failed removing object for which we lost read rights"
552                    );
553                }
554                UpdateResult::LostAccess
555            }
556        };
557        match &res {
558            UpdateResult::LostAccess => {
559                // Lost access to the object
560                let prev = saved_objects.lock().unwrap().remove(&object_id);
561                if let (Some(o), Some(now_have_all_until)) = (prev, now_have_all_until) {
562                    if let Err(err) = db
563                        .update_queries(&o.matches_queries, now_have_all_until)
564                        .await
565                    {
566                        tracing::error!(
567                            ?err,
568                            ?o.matches_queries,
569                            "failed updating now_have_all_until for queries"
570                        );
571                    }
572                    Self::broadcast_query_updates(
573                        object_id,
574                        saved_queries,
575                        query_updates_broadcasters,
576                        &o.matches_queries,
577                        Some(now_have_all_until),
578                    )?;
579                }
580            }
581            UpdateResult::LatestUnchanged => {
582                // No change in the object's latest_snapshot
583                if let Some(now_have_all_until) = now_have_all_until {
584                    let (queries, set_importance) = {
585                        let mut saved_objects = saved_objects.lock().unwrap();
586                        let Some(o) = saved_objects.get_mut(&object_id) else {
587                            return Err(crate::Error::Other(anyhow!("no change in object for which we received an update, but we had not saved it yet?")));
588                        };
589                        o.now_have_all_until(now_have_all_until);
590                        let queries = o.matches_queries.clone();
591                        if !o.importance.contains(additional_importance) {
592                            o.importance |= additional_importance;
593                            (queries, Some((object_id, o.importance)))
594                        } else {
595                            (queries, None)
596                        }
597                    };
598                    if let Some((object_id, importance)) = set_importance {
599                        db.set_object_importance(object_id, importance)
600                            .await
601                            .wrap_context("updating importance")?;
602                    }
603                    if let Err(err) = db.update_queries(&queries, now_have_all_until).await {
604                        tracing::error!(
605                            ?err,
606                            ?queries,
607                            "failed updating now_have_all_until for queries"
608                        );
609                    }
610                    Self::broadcast_query_updates(
611                        object_id,
612                        saved_queries,
613                        query_updates_broadcasters,
614                        &queries,
615                        Some(now_have_all_until),
616                    )?;
617                }
618            }
619            UpdateResult::LatestChanged(type_id, res) => {
620                // Something changed in the object's latest_snapshot
621                let (queries_after, importance_from_queries_after) =
622                    queries_for(&saved_queries.lock().unwrap(), *type_id, res);
623                if let Some(now_have_all_until) = now_have_all_until {
624                    let (
625                        queries_before,
626                        importance_from_queries_before,
627                        importance_before,
628                        importance_after,
629                    ) = match saved_objects.lock().unwrap().entry(object_id) {
630                        hash_map::Entry::Occupied(mut o) => {
631                            let o = o.get_mut();
632                            // Update easy metadata
633                            o.now_have_all_until(now_have_all_until);
634                            let importance_from_queries_before = o.importance_from_queries;
635                            let importance_before = o.importance;
636                            o.importance_from_queries = importance_from_queries_after;
637                            if !o.importance.contains(additional_importance) {
638                                o.importance |= additional_importance;
639                            }
640                            let importance_after = o.importance;
641
642                            // Update queries and broadcast removals (updates & additions will be broadcast later)
643                            let queries_before =
644                                std::mem::replace(&mut o.matches_queries, queries_after.clone());
645                            Self::broadcast_query_updates(
646                                object_id,
647                                saved_queries,
648                                query_updates_broadcasters,
649                                &o.matches_queries,
650                                Some(now_have_all_until),
651                            )?;
652
653                            (
654                                queries_before,
655                                importance_from_queries_before,
656                                importance_before,
657                                importance_after,
658                            )
659                        }
660                        hash_map::Entry::Vacant(v) => {
661                            v.insert(SavedObject {
662                                have_all_until: Some(now_have_all_until),
663                                importance: additional_importance,
664                                matches_queries: queries_after.clone(),
665                                importance_from_queries: importance_from_queries_after,
666                            });
667                            (
668                                HashSet::new(),
669                                Importance::NONE,
670                                Importance::NONE,
671                                additional_importance,
672                            )
673                        }
674                    };
675                    if importance_before != importance_after {
676                        db.set_object_importance(object_id, importance_after)
677                            .await
678                            .wrap_context("updating importance")?;
679                    }
680                    if importance_from_queries_before != importance_from_queries_after {
681                        db.set_importance_from_queries(object_id, importance_from_queries_after)
682                            .await
683                            .wrap_context("updating query locking status")?;
684                    }
685                    if let Err(err) = db
686                        .update_queries(
687                            &queries_before.union(&queries_after).copied().collect(),
688                            now_have_all_until,
689                        )
690                        .await
691                    {
692                        tracing::error!(
693                            ?err,
694                            ?queries_before,
695                            ?queries_after,
696                            "failed updating now_have_all_until for queries"
697                        );
698                    }
699                }
700                // Broadcast query updates even if the update was local (ie. without now_have_all_until)
701                Self::broadcast_query_updates(
702                    object_id,
703                    saved_queries,
704                    query_updates_broadcasters,
705                    &queries_after,
706                    now_have_all_until,
707                )?;
708            }
709        }
710        if let Err(err) = updates_broadcaster.send(object_id) {
711            tracing::error!(?err, ?object_id, "failed broadcasting update");
712        }
713        Ok(res)
714    }
715
716    fn broadcast_query_updates(
717        object_id: ObjectId,
718        saved_queries: &Mutex<SavedQueriesMap>,
719        query_updates_broadcasters: &Mutex<QueryUpdatesBroadcastMap>,
720        queries: &HashSet<QueryId>,
721        now_have_all_until: Option<Updatedness>,
722    ) -> crate::Result<()> {
723        let mut saved_queries = saved_queries.lock().unwrap();
724        let query_updates_broadcasters = query_updates_broadcasters.lock().unwrap();
725        for query_id in queries {
726            if let Some((sender, _)) = query_updates_broadcasters.get(query_id) {
727                if let Err(err) = sender.send(object_id) {
728                    tracing::error!(
729                        ?err,
730                        ?query_id,
731                        ?object_id,
732                        "failed broadcasting query update"
733                    );
734                }
735            }
736            if let Some(query) = saved_queries.get_mut(query_id) {
737                if let Some(u) = now_have_all_until {
738                    query.now_have_all_until(u);
739                }
740            }
741        }
742        Ok(())
743    }
744
    /// Registers `cb` as the callback for connection events.
    ///
    /// The callback is handed over as-is to the API layer (`self.api`), which is the one
    /// that invokes it.
    pub fn on_connection_event(
        self: &Arc<Self>,
        cb: impl 'static + waaaa::Send + waaaa::Sync + Fn(ConnectionEvent),
    ) {
        self.api.on_connection_event(cb)
    }
751
752    /// Note: The fact that `token` is actually a token for the `user` passed at creation of this [`ClientDb`]
753    /// is not actually checked, and is assumed to be true. Providing the wrong `user` may lead to object creations
754    /// or event submissions being spuriously rejected locally, but will not allow them to succeed remotely anyway.
755    pub async fn login(
756        self: &Arc<Self>,
757        url: Arc<String>,
758        user: User,
759        token: SessionToken,
760    ) -> crate::Result<()> {
761        if self
762            .user
763            .read()
764            .unwrap()
765            .map(|u| u != user)
766            .unwrap_or(false)
767        {
768            // There was already a user logged in, and it is not this user.
769            // Start by doing as though the previous user had logged out: kill everything with fire.
770            self.logout().await?;
771        }
772        *self.user.write().unwrap() = Some(user);
773        self.api.login(url.clone(), token);
774        self.db.save_login(LoginInfo { url, user, token }).await?;
775        Ok(())
776    }
777
778    pub async fn logout(&self) -> crate::Result<()> {
779        // Log out from server
780        self.api.logout();
781
782        // Clear local state
783        *self.user.write().unwrap() = None;
784        self.saved_objects.lock().unwrap().clear();
785        self.saved_queries.lock().unwrap().clear();
786
787        // Pause data saving and clear everything locally
788        let (reply, reply_receiver) = oneshot::channel();
789        self.data_saver_skipper.send_replace(true);
790        let _ = self
791            .data_saver
792            .unbounded_send(DataSaverMessage::StopFrame(reply));
793        let _ = reply_receiver.await;
794        self.db.remove_everything().await?;
795        self.data_saver_skipper.send_replace(false);
796        let _ = self
797            .data_saver
798            .unbounded_send(DataSaverMessage::ResumeFrame);
799
800        Ok(())
801    }
802
    /// Returns a copy of the currently recorded user, if any.
    ///
    /// `login` records the user and `logout` clears it.
    pub fn user(&self) -> Option<User> {
        *self.user.read().unwrap()
    }
806
    /// Returns a watch receiver over the upload queue, as exposed by the API layer.
    ///
    /// The received value is a list of [`UploadId`]s.
    // NOTE(review): presumably these are the uploads still pending — confirm against `ApiDb`
    pub fn watch_upload_queue(&self) -> watch::Receiver<Vec<UploadId>> {
        self.api.watch_upload_queue()
    }
810
    /// Lists the ids of the uploads recorded in the local database.
    pub async fn list_uploads(&self) -> crate::Result<Vec<UploadId>> {
        self.db.list_uploads().await
    }
814
    /// Looks up the upload `upload_id` in the local database.
    ///
    /// The lookup is fully delegated to the local database; `None` is whatever it returns
    /// for this id (presumably: no such upload is recorded).
    pub async fn get_upload(
        self: &Arc<Self>,
        upload_id: UploadId,
    ) -> crate::Result<Option<Upload>> {
        self.db.get_upload(upload_id).await
    }
821
    /// Asks the API layer to rename the session to `name`.
    ///
    /// The outcome of the request is delivered through the returned receiver.
    // NOTE(review): presumably this targets the current session — confirm against `ApiDb`
    pub fn rename_session(self: &Arc<Self>, name: String) -> oneshot::Receiver<crate::Result<()>> {
        self.api.rename_session(name)
    }
825
    /// Returns the current [`Session`], as reported by the API layer.
    pub async fn current_session(&self) -> crate::Result<Session> {
        self.api.current_session().await
    }
829
    /// Returns the list of [`Session`]s, as reported by the API layer.
    // NOTE(review): presumably all the sessions of the logged-in user — confirm against `ApiDb`
    pub async fn list_sessions(&self) -> crate::Result<Vec<Session>> {
        self.api.list_sessions().await
    }
833
    /// Asks the API layer to disconnect the session designated by `session_ref`.
    ///
    /// The outcome of the request is delivered through the returned receiver.
    pub fn disconnect_session(
        self: &Arc<Self>,
        session_ref: SessionRef,
    ) -> oneshot::Receiver<crate::Result<()>> {
        self.api.disconnect_session(session_ref)
    }
840
    /// Pauses the vacuum until the returned guard is dropped
    ///
    /// The guard is a read guard on the `vacuum_guard` lock, so several callers can pause
    /// the vacuum at the same time.
    // NOTE(review): assumes the vacuum itself takes the write side of `vacuum_guard` — confirm
    pub async fn pause_vacuum(&self) -> tokio::sync::RwLockReadGuard<'_, ()> {
        self.vacuum_guard.read().await
    }
845
846    pub async fn set_importance<T: Object>(
847        self: &Arc<Self>,
848        ptr: DbPtr<T>,
849        new_importance: Importance,
850    ) -> crate::Result<()> {
851        let object_id = ptr.to_object_id();
852        let old_importance = std::mem::replace(
853            &mut self
854                .saved_objects
855                .lock()
856                .unwrap()
857                .get_mut(&object_id)
858                .ok_or_else(|| crate::Error::ObjectDoesNotExist(object_id))?
859                .importance,
860            new_importance,
861        );
862        if old_importance != new_importance {
863            self.db
864                .set_object_importance(object_id, new_importance)
865                .await?;
866            if old_importance.subscribe() && !new_importance.subscribe() {
867                let mut set = HashSet::with_capacity(1);
868                set.insert(object_id);
869                self.api.unsubscribe(set);
870            } else if !old_importance.subscribe() && new_importance.subscribe() {
871                self.get::<T>(new_importance, ptr).await?;
872            }
873        }
874        Ok(())
875    }
876
    /// Returns a snapshot of the in-memory saved queries.
    ///
    /// The map is cloned under the lock, so later changes are not reflected in the result.
    pub fn list_saved_queries(self: &Arc<Self>) -> SavedQueriesMap {
        self.saved_queries.lock().unwrap().clone()
    }
880
    // Note there's no remove_query method. This is fine, as non-locked queries will be
    // vacuumed on the next vacuum run.
    /// Replaces the importance of the saved query `query_id`.
    ///
    /// Does nothing if the importance is unchanged. Otherwise:
    /// 1. the in-memory saved query is updated;
    /// 2. for every in-memory saved object matching this query, `importance_from_queries`
    ///    is recomputed as the union of the importances of all the queries it matches;
    /// 3. if the query stops being subscribed to, it is unsubscribed from through the API;
    ///    if it starts being subscribed to, it is run again remotely and the resulting
    ///    stream is driven to completion;
    /// 4. the new importance, along with the list of objects whose
    ///    `importance_from_queries` changed, is written to the local database.
    ///
    /// # Errors
    ///
    /// Returns `QueryDoesNotExist` if `query_id` is not a saved query, and forwards errors
    /// from the remote query and from the local database.
    #[allow(clippy::await_holding_lock)] // TODO(api-highest): likely false positive, but do reinvestigate when finished API rewrite
    pub async fn set_query_importance<T: Object>(
        self: &Arc<Self>,
        query_id: QueryId,
        new_importance: Importance,
    ) -> crate::Result<()> {
        // This lock is held until the explicit `drop` below, ie. before the first `.await`
        let mut saved_queries = self.saved_queries.lock().unwrap();
        let (old_importance, query) = {
            let Some(saved_query) = saved_queries.get_mut(&query_id) else {
                return Err(crate::Error::QueryDoesNotExist(query_id));
            };
            let old_importance = saved_query.importance;
            if saved_query.importance == new_importance {
                return Ok(()); // Nothing to do
            }
            saved_query.importance = new_importance;
            (old_importance, saved_query.query.clone())
        };
        let importance_changed_objects = {
            // TODO(test-high): fuzz that subscribed_queries/objects always stay in sync with in-database data as well as server data
            // The saved objects lock is taken while the saved queries lock is still held
            let mut saved_objects = self.saved_objects.lock().unwrap();
            let mut importance_changed_objects = Vec::new();
            // Increase importance when requested, decrease only if no other query keeps it up
            for (object_id, o) in saved_objects.iter_mut() {
                if o.matches_queries.contains(&query_id) {
                    // Note: this shadows the function parameter with the per-object value
                    let new_importance = o
                        .matches_queries
                        .iter()
                        .map(|query_id| match saved_queries.get(query_id) {
                            Some(q) => q.importance,
                            None => {
                                tracing::error!(?query_id, "matched query was not saved");
                                Importance::NONE
                            }
                        })
                        .fold(Importance::NONE, |a, i| a | i);
                    if new_importance != o.importance_from_queries {
                        o.importance_from_queries = new_importance;
                        importance_changed_objects.push(*object_id);
                    }
                }
            }
            importance_changed_objects
        };
        std::mem::drop(saved_queries);
        if old_importance.subscribe() && !new_importance.subscribe() {
            self.api.unsubscribe_query(query_id);
        } else if !old_importance.subscribe() && new_importance.subscribe() {
            self.query_remote::<T>(new_importance, query_id, query)
                .await?
                .count()
                .await; // Drive the stream to completion
        }
        // Always true here, as the unchanged case returned early above
        if old_importance != new_importance {
            self.db
                .set_query_importance(query_id, new_importance, importance_changed_objects)
                .await?;
        }
        Ok(())
    }
943
944    pub async fn create<T: Object>(
945        self: &Arc<Self>,
946        importance: Importance,
947        object: Arc<T>,
948    ) -> crate::Result<(Obj<T, LocalDb>, impl Future<Output = crate::Result<()>>)> {
949        let id = self.make_ulid();
950        let object_id = ObjectId(id);
951        let completion = self
952            .create_with(importance, object_id, EventId(id), object.clone())
953            .await?;
954        let res = Obj::new(DbPtr::from(object_id), object, self.clone());
955        Ok((res, completion))
956    }
957
958    pub async fn create_with<T: Object>(
959        self: &Arc<Self>,
960        importance: Importance,
961        object_id: ObjectId,
962        created_at: EventId,
963        object: Arc<T>,
964    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
965        // TODO(api-highest): unify this with data saver setup (and same for submit_with below)
966        let user =
967            self.user.read().unwrap().ok_or_else(|| {
968                crate::Error::Other(anyhow!("called `submit` with no known user"))
969            })?;
970        if !object
971            .can_create(user, object_id, &*self.db)
972            .await
973            .wrap_context("checking whether object creation seems to be allowed locally")?
974        {
975            return Err(crate::Error::Forbidden);
976        }
977        let _lock = self.vacuum_guard.read().await; // avoid vacuum before setting importance
978        let val = self
979            .db
980            .create(
981                object_id,
982                created_at,
983                object.clone(),
984                None, // Locally-created object, has no updatedness yet
985                importance,
986            )
987            .await?;
988        let do_subscribe = if let Some(val) = val {
989            let val_json =
990                serde_json::to_value(val).wrap_context("serializing new last snapshot")?;
991            let (matches_queries, importance_from_queries) = queries_for(
992                &self.saved_queries.lock().unwrap(),
993                *T::type_ulid(),
994                &val_json,
995            );
996            self.saved_objects.lock().unwrap().insert(
997                object_id,
998                SavedObject {
999                    have_all_until: None,
1000                    importance,
1001                    matches_queries,
1002                    importance_from_queries,
1003                },
1004            );
1005            if importance_from_queries != Importance::NONE {
1006                self.db
1007                    .set_importance_from_queries(object_id, importance_from_queries)
1008                    .await
1009                    .wrap_context("updating queries locks")?;
1010            }
1011            importance.subscribe() || importance_from_queries.subscribe()
1012        } else {
1013            importance.subscribe()
1014        };
1015        self.api
1016            .create(object_id, created_at, object, do_subscribe)
1017            .await
1018    }
1019
    /// Submits `event` to the object behind `ptr`, under a freshly generated [`EventId`].
    ///
    /// This is a convenience wrapper around `submit_with`, called with `Importance::NONE`.
    /// The returned future and the error cases are the ones of `submit_with`.
    pub async fn submit<T: Object>(
        self: &Arc<Self>,
        ptr: DbPtr<T>,
        event: T::Event,
    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
        let event_id = EventId(self.make_ulid());
        self.submit_with::<T>(
            Importance::NONE,
            ptr.to_object_id(),
            event_id,
            Arc::new(event),
        )
        .await
    }
1034
1035    /// Note: this will fail if the object is not subscribed upon yet: it would not make sense anyway.
1036    pub async fn submit_with<T: Object>(
1037        self: &Arc<Self>,
1038        importance: Importance,
1039        object_id: ObjectId,
1040        event_id: EventId,
1041        event: Arc<T::Event>,
1042    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
1043        let user =
1044            self.user.read().unwrap().ok_or_else(|| {
1045                crate::Error::Other(anyhow!("called `submit` with no known user"))
1046            })?;
1047        let _lock = self.vacuum_guard.read().await; // avoid vacuum before setting queries lock
1048        let object = self.db.get_latest::<T>(object_id, Importance::NONE).await?;
1049        if !object
1050            .can_apply(user, object_id, &event, &*self.db)
1051            .await
1052            .wrap_context("checking whether object creation seems to be allowed locally")?
1053        {
1054            return Err(crate::Error::Forbidden);
1055        }
1056        let val = self
1057            .db
1058            .submit::<T>(
1059                object_id,
1060                event_id,
1061                event.clone(),
1062                None, // Locally-submitted event, has no updatedness yet
1063                importance,
1064            )
1065            .await?;
1066        let do_subscribe = if let Some(val) = val {
1067            let val_json =
1068                serde_json::to_value(val).wrap_context("serializing new last snapshot")?;
1069            let (queries, importance_from_queries_after) = queries_for(
1070                &self.saved_queries.lock().unwrap(),
1071                *T::type_ulid(),
1072                &val_json,
1073            );
1074            let do_subscribe = importance.subscribe() || importance_from_queries_after.subscribe();
1075            let importance_from_queries_before = {
1076                let mut saved_objects = self.saved_objects.lock().unwrap();
1077                let o = saved_objects.get_mut(&object_id).ok_or_else(|| {
1078                    crate::Error::Other(anyhow!("Submitted event to non-saved object"))
1079                })?;
1080                o.matches_queries = queries;
1081                let importance_from_queries_before = o.importance_from_queries;
1082                o.importance_from_queries = importance_from_queries_after;
1083                importance_from_queries_before
1084            };
1085            if importance_from_queries_before != importance_from_queries_after {
1086                self.db
1087                    .set_importance_from_queries(object_id, importance_from_queries_after)
1088                    .await
1089                    .wrap_context("updating importance from queries")?;
1090            }
1091            do_subscribe
1092        } else {
1093            importance.subscribe()
1094        };
1095        // TODO(api-high): add Subscribe and Lock values (the struct, that also has info about queries) to Obj
1096        self.api
1097            .submit::<T>(object_id, event_id, event, do_subscribe)
1098            .await
1099    }
1100
1101    pub async fn get<T: Object>(
1102        self: &Arc<Self>,
1103        importance: Importance,
1104        ptr: DbPtr<T>,
1105    ) -> crate::Result<Obj<T, LocalDb>> {
1106        let object_id = ptr.to_object_id();
1107        match self.db.get_latest::<T>(object_id, importance).await {
1108            Ok(r) => return Ok(Obj::new(ptr, r, self.clone())),
1109            Err(crate::Error::ObjectDoesNotExist(_)) => (), // fall-through and fetch from API
1110            Err(e) => return Err(e),
1111        }
1112        let data = self.api.get(object_id, importance.subscribe()).await?;
1113        let res = save_object_data_locally::<T, _>(
1114            data,
1115            &self.data_saver,
1116            &self.db,
1117            importance,
1118            &self.saved_objects,
1119            &self.saved_queries,
1120        )
1121        .await?;
1122        Ok(Obj::new(ptr, res, self.clone()))
1123    }
1124
1125    pub async fn get_local<T: Object>(
1126        self: &Arc<Self>,
1127        importance: Importance,
1128        ptr: DbPtr<T>,
1129    ) -> crate::Result<Option<Obj<T, LocalDb>>> {
1130        let object_id = ptr.to_object_id();
1131        match self.db.get_latest::<T>(object_id, importance).await {
1132            Ok(res) => Ok(Some(Obj::new(ptr, res, self.clone()))),
1133            Err(crate::Error::ObjectDoesNotExist(o)) if o == object_id => Ok(None),
1134            Err(err) => Err(err),
1135        }
1136    }
1137
    /// Runs `query` against the local database only, and streams the matching objects.
    ///
    /// The ids of the matching objects are listed up front; each object is then loaded
    /// lazily, as the returned stream is polled. Loading uses `Importance::NONE`.
    ///
    /// # Errors
    ///
    /// Fails if listing the matching objects fails. Errors hit while loading an individual
    /// object are yielded as items of the stream, which then carries on with the next id.
    pub async fn query_local<'a, T: Object>(
        self: &'a Arc<Self>,
        query: Arc<Query>,
    ) -> crate::Result<impl 'a + waaaa::Stream<Item = crate::Result<Obj<T, LocalDb>>>> {
        let object_ids = self
            .db
            .client_query(*T::type_ulid(), query)
            .await
            .wrap_context("listing objects matching query")?;

        Ok(async_stream::stream! {
            for object_id in object_ids {
                match self.db.get_latest::<T>(object_id, Importance::NONE).await {
                    Ok(res) => yield Ok(Obj::new(DbPtr::from(object_id), res, self.clone())),
                    // Ignore missing objects, they were just vacuumed between listing and getting
                    Err(crate::Error::ObjectDoesNotExist(id)) if id == object_id => continue,
                    Err(err) => yield Err(err),
                }
            }
        })
    }
1159
    // TODO(client-med): should we somehow expose only_updated_since / now_have_all_until?
    /// Runs `query` against the server, saving the results locally and streaming them back.
    ///
    /// Before the server is contacted, this:
    /// - makes sure a broadcast channel exists for `query_id`;
    /// - records the query in the in-memory saved queries if it is not there yet;
    /// - records the query in the local database.
    ///
    /// The server is only asked for what changed since the `have_all_until` of the saved
    /// query. The whole result set is requested instead when the query is new, or when this
    /// call makes it locked while it was not before.
    ///
    /// Each item of the returned stream is either an object that was not subscribed to yet,
    /// which gets saved locally first, or an already-subscribed object, which is read back
    /// from the local database.
    ///
    /// Note that it is assumed here that the same QueryId will always be associated with the same Query.
    /// In particular, this means that when bumping an Object's snapshot_version and adjusting the queries
    /// accordingly, you should change the QueryId, as well as unsubscribe/resubscribe on startup so that
    /// the database gets updated.
    pub async fn query_remote<'a, T: Object>(
        self: &'a Arc<Self>,
        importance: Importance,
        query_id: QueryId,
        query: Arc<Query>,
    ) -> crate::Result<impl 'a + waaaa::Stream<Item = crate::Result<Obj<T, LocalDb>>>> {
        // Make sure there is a broadcast channel for this query, creating it if needed
        self.query_updates_broadcastees
            .lock()
            .unwrap()
            .entry(query_id)
            .or_insert_with(|| broadcast::channel(BROADCAST_CHANNEL_SIZE));
        // The lock on the saved queries is scoped to this block, and released before any `.await`
        let only_updated_since = {
            let mut subscribed_queries = self.saved_queries.lock().unwrap();
            let entry = subscribed_queries.entry(query_id);
            match entry {
                hash_map::Entry::Occupied(mut q) => {
                    if !q.get().importance.lock() && importance.lock() {
                        // Increasing the lock behavior. Re-subscribe from scratch
                        q.get_mut().importance |= importance;
                        q.get_mut().have_all_until = None; // Force have_all_until to 0 as previous stuff could have been vacuumed
                        None
                    } else {
                        // The query was already locked enough. Just proceed.
                        q.get().have_all_until
                    }
                }
                hash_map::Entry::Vacant(v) => {
                    v.insert(SavedQuery {
                        query: query.clone(),
                        type_id: *T::type_ulid(),
                        have_all_until: None,
                        importance,
                    });
                    None
                }
            }
        };
        // TODO(api-high): add "ephemeral query" option, that does not save the query to the database
        self.db
            .record_query(query_id, query.clone(), *T::type_ulid(), importance)
            .await?;
        Ok(self
            .api
            .query::<T>(query_id, only_updated_since, importance.subscribe(), query)
            .then({
                // These handles are cloned once here, and once more for each stream item below
                let data_saver = self.data_saver.clone();
                let db = self.db.clone();
                let saved_objects = self.saved_objects.clone();
                let saved_queries = self.saved_queries.clone();
                move |data| {
                    // TODO(api-high): async closures are now stable?
                    let data_saver = data_saver.clone();
                    let db = db.clone();
                    let saved_objects = saved_objects.clone();
                    let saved_queries = saved_queries.clone();
                    async move {
                        let (data, updatedness) = data?;
                        match data {
                            MaybeObject::NotYetSubscribed(data) => {
                                let object_id = data.object_id;
                                let res = save_object_data_locally::<T, _>(
                                    data,
                                    &data_saver,
                                    &db,
                                    Importance::NONE,
                                    &saved_objects,
                                    &saved_queries,
                                )
                                .await?;
                                // When the item carries an updatedness, record it for this query,
                                // first in the database and then in memory
                                if let Some(now_have_all_until) = updatedness {
                                    let mut the_query = HashSet::with_capacity(1);
                                    the_query.insert(query_id);
                                    db.update_queries(&the_query, now_have_all_until).await?;
                                    if let Some(q) =
                                        saved_queries.lock().unwrap().get_mut(&query_id)
                                    {
                                        q.now_have_all_until(now_have_all_until);
                                    }
                                }
                                Ok(Obj::new(DbPtr::from(object_id), res, self.clone()))
                            }
                            MaybeObject::AlreadySubscribed(object_id) => {
                                // TODO(api-highest): also add the query to o.matches_query and o.importance_from_queries,
                                // as well as reset self.db.set_importance_from_queries
                                // TODO(test-high): make sure o.matches_query, o.importance_from_queries, db's
                                // importance_from_queries and db's saved_queries are all aligned
                                // TODO(test-high): make sure o.importance and db's importance are aligned
                                self.db
                                    .get_latest::<T>(object_id, Importance::NONE)
                                    .await
                                    .map(|res| Obj::new(DbPtr::from(object_id), res, self.clone()))
                            }
                        }
                    }
                }
            }))
    }
1262
1263    // TODO(misc-low): should the client be allowed to request a recreation?
1264
1265    /// Note: when creating a binary, it can be vacuumed away at any time until an object or
1266    /// event is added that requires it. As such, you probably want to use `pause_vacuum`
1267    /// to make sure the created binary is not vacuumed away before the object or event
1268    /// had enough time to get created.
1269    pub async fn create_binary(self: &Arc<Self>, data: Arc<[u8]>) -> crate::Result<()> {
1270        let binary_id = crdb_core::hash_binary(&data);
1271        self.db.create_binary(binary_id, data.clone()).await
1272        // Do not create the binary over the API. We'll try uploading the object that requires it when
1273        // that happens, hoping for the binary to already be known by the server. If it is not, then we'll
1274        // create the binary there then re-send the object. This avoids spurious binary retransmits, and
1275        // needs to be handled anyway because the server could have vacuumed between the create_binary and
1276        // the create_object.
1277    }
1278
1279    pub async fn get_binary(
1280        self: &Arc<Self>,
1281        binary_id: BinPtr,
1282    ) -> anyhow::Result<Option<Arc<[u8]>>> {
1283        if let Some(res) = self.db.get_binary(binary_id).await? {
1284            return Ok(Some(res));
1285        }
1286        let Some(res) = self.api.get_binary(binary_id).await? else {
1287            return Ok(None);
1288        };
1289        self.db.create_binary(binary_id, res.clone()).await?;
1290        Ok(Some(res))
1291    }
1292}
1293
/// Describes when the client-side vacuum should run.
///
/// `F` is the type of the filter deciding whether a given vacuum should happen; see
/// [`ClientVacuumSchedule::filter`].
pub struct ClientVacuumSchedule<F> {
    // Duration associated with the schedule
    // NOTE(review): presumably the delay between two vacuum attempts — confirm where it is read
    frequency: Duration,
    // Predicate over the storage information: vacuum happens if it returns `true`
    filter: F,
}
1298
1299impl ClientVacuumSchedule<fn(ClientStorageInfo) -> bool> {
1300    pub fn new(frequency: Duration) -> Self {
1301        ClientVacuumSchedule {
1302            frequency,
1303            filter: |_| true,
1304        }
1305    }
1306
1307    pub fn never() -> Self {
1308        ClientVacuumSchedule {
1309            frequency: Duration::from_secs(86400),
1310            filter: |_| false,
1311        }
1312    }
1313}
1314
1315impl<F> ClientVacuumSchedule<F> {
1316    /// If the provided function returns `true`, then vacuum will happen
1317    ///
1318    /// The `ClientStorageInfo` provided is all approximate information.
1319    pub fn filter<G: Fn(ClientStorageInfo) -> bool>(self, filter: G) -> ClientVacuumSchedule<G> {
1320        ClientVacuumSchedule {
1321            frequency: self.frequency,
1322            filter,
1323        }
1324    }
1325}
1326
1327/// Returns the list of queries matching, as well as whether at least one of the
1328/// queries requires locking the value.
1329fn queries_for(
1330    saved_queries: &SavedQueriesMap,
1331    type_id: TypeId,
1332    value: &serde_json::Value,
1333) -> (HashSet<QueryId>, Importance) {
1334    let mut importance = Importance::NONE;
1335    let queries = saved_queries
1336        .iter()
1337        .filter_map(|(id, q)| {
1338            if type_id == q.type_id && q.query.matches_json(value) {
1339                importance |= q.importance;
1340                Some(*id)
1341            } else {
1342                None
1343            }
1344        })
1345        .collect();
1346    (queries, importance)
1347}
1348
/// Returns the latest snapshot for the object described by `data`
///
/// The creation snapshot (if any) and then every event of `data` are sent, in order, to
/// the data saver. Only the last message sent carries `data.now_have_all_until`. Once
/// everything is queued, the replies are awaited in the same order.
///
/// The result is the snapshot carried by the last reply that reported a changed latest
/// snapshot, or `None` if no reply did.
///
/// # Errors
///
/// Returns `WrongType` if `data` is not of type `T`, fails if the data saver is no longer
/// reachable, and forwards the errors reported by the data saver. The one exception is
/// `ObjectDoesNotExist` for this very object, which is ignored unless `importance` locks
/// the object.
async fn locally_create_all<T: Object, LocalDb: ClientSideDb>(
    data_saver: &mpsc::UnboundedSender<DataSaverMessage>,
    db: &CacheDb<LocalDb>,
    importance: Importance,
    data: ObjectData,
) -> crate::Result<Option<serde_json::Value>> {
    if data.type_id != *T::type_ulid() {
        return Err(crate::Error::WrongType {
            object_id: data.object_id,
            expected_type_id: *T::type_ulid(),
            real_type_id: data.type_id,
        });
    }

    // First, submit object creation request
    // One reply is expected per message sent: one per event, plus one for the creation if any
    let mut latest_snapshot_returns =
        Vec::with_capacity(data.events.len() + data.creation_snapshot.is_some() as usize);
    if let Some((created_at, snapshot_version, snapshot_data)) = data.creation_snapshot {
        let (reply, reply_receiver) = oneshot::channel();
        latest_snapshot_returns.push(reply_receiver);
        data_saver
            .unbounded_send(DataSaverMessage::Data {
                update: Arc::new(Update {
                    object_id: data.object_id,
                    data: UpdateData::Creation {
                        type_id: data.type_id,
                        created_at,
                        snapshot_version,
                        data: snapshot_data,
                    },
                }),
                // The creation is the last message only when there are no events
                now_have_all_until: data.events.is_empty().then_some(data.now_have_all_until),
                additional_importance: importance,
                reply,
            })
            .map_err(|_| crate::Error::Other(anyhow!("Data saver task disappeared early")))?;
    } else if importance != Importance::NONE {
        // Without a creation message to carry the importance, apply it through a direct read
        db.get_latest::<T>(data.object_id, importance)
            .await
            .wrap_context("updating importance for object as requested")?;
    }

    // Then, submit all the events
    let events_len = data.events.len();
    for (i, (event_id, event)) in data.events.into_iter().enumerate() {
        // We already handled the importance of the object just above if requested
        let (reply, reply_receiver) = oneshot::channel();
        latest_snapshot_returns.push(reply_receiver);
        data_saver
            .unbounded_send(DataSaverMessage::Data {
                update: Arc::new(Update {
                    object_id: data.object_id,
                    data: UpdateData::Event {
                        type_id: data.type_id,
                        event_id,
                        data: event,
                    },
                }),
                // Only the last event carries the updatedness
                now_have_all_until: (i + 1 == events_len).then_some(data.now_have_all_until),
                additional_importance: Importance::NONE,
                reply,
            })
            .map_err(|_| crate::Error::Other(anyhow!("Data saver task disappeared early")))?;
    }

    // Finally, retrieve the result
    let mut res = None;
    for receiver in latest_snapshot_returns {
        // A reply channel that was dropped without an answer is silently skipped
        if let Ok(res_data) = receiver.await {
            match res_data {
                Ok(UpdateResult::LatestChanged(_, res_data)) => res = Some(res_data),
                Ok(_) => (),
                Err(crate::Error::ObjectDoesNotExist(o))
                    if o == data.object_id && !importance.lock() =>
                {
                    // Ignore this error, the object was just vacuumed during creation
                }
                Err(err) => return Err(err).wrap_context("creating in local database"),
            }
        }
    }

    Ok(res)
}
1434
1435async fn save_object_data_locally<T: Object, LocalDb: ClientSideDb>(
1436    data: ObjectData,
1437    data_saver: &mpsc::UnboundedSender<DataSaverMessage>,
1438    db: &CacheDb<LocalDb>,
1439    importance: Importance,
1440    saved_objects: &Mutex<SavedObjectMap>,
1441    saved_queries: &Mutex<SavedQueriesMap>,
1442) -> crate::Result<Arc<T>> {
1443    let now_have_all_until = data.now_have_all_until;
1444    let object_id = data.object_id;
1445    let type_id = data.type_id;
1446    if type_id != *T::type_ulid() {
1447        return Err(crate::Error::WrongType {
1448            object_id,
1449            expected_type_id: *T::type_ulid(),
1450            real_type_id: type_id,
1451        });
1452    }
1453    let res_json = locally_create_all::<T, _>(data_saver, db, importance, data).await?;
1454    let (res, res_json) = match res_json {
1455        Some(res_json) => (
1456            Arc::new(T::deserialize(&res_json).wrap_context("deserializing snapshot")?),
1457            res_json,
1458        ),
1459        None => {
1460            let res = db.get_latest::<T>(object_id, importance).await?;
1461            let res_json = serde_json::to_value(&res).wrap_context("serializing snapshot")?;
1462            (res, res_json)
1463        }
1464    };
1465    let (queries, importance_from_queries) =
1466        queries_for(&saved_queries.lock().unwrap(), type_id, &res_json);
1467    db.set_importance_from_queries(object_id, importance_from_queries)
1468        .await
1469        .wrap_context("updating queries lock")?;
1470    let mut saved_objects = saved_objects.lock().unwrap();
1471    match saved_objects.entry(object_id) {
1472        hash_map::Entry::Occupied(mut o) => {
1473            let o = o.get_mut();
1474            o.now_have_all_until(now_have_all_until);
1475            o.importance |= importance;
1476            o.matches_queries = queries;
1477            o.importance_from_queries = importance_from_queries;
1478        }
1479        hash_map::Entry::Vacant(v) => {
1480            v.insert(SavedObject {
1481                have_all_until: Some(now_have_all_until),
1482                importance,
1483                matches_queries: queries,
1484                importance_from_queries,
1485            });
1486        }
1487    }
1488    Ok(res)
1489}
1490
1491// TODO(api-highest): is this really required?
1492impl<LocalDb: ClientSideDb> Deref for ClientDb<LocalDb> {
1493    type Target = CacheDb<LocalDb>;
1494
1495    fn deref(&self) -> &Self::Target {
1496        &self.db
1497    }
1498}