use super::{api_db::OnError, connection::ConnectionEvent, ApiDb};
use crate::Obj;
use anyhow::anyhow;
use crdb_cache::CacheDb;
use crdb_core::{
    BinPtr, ClientSideDb, Db, DbPtr, EventId, Importance, MaybeObject, Object, ObjectData,
    ObjectId, Query, QueryId, ResultExt, SavedQuery, Session, SessionRef, SessionToken, TypeId,
    Update, UpdateData, Updatedness, Updates, Upload, UploadId, User,
};
use crdb_core::{ClientStorageInfo, LoginInfo};
use futures::{channel::mpsc, stream, FutureExt, StreamExt, TryStreamExt};
use std::ops::Deref;
use std::{
    collections::{hash_map, HashMap, HashSet},
    future::Future,
    sync::{Arc, Mutex, RwLock},
    time::Duration,
};
use tokio::sync::{broadcast, oneshot, watch};
use tokio_util::sync::CancellationToken;
use ulid::Ulid;

enum UpdateResult {
    LostAccess,
    LatestUnchanged,
    LatestChanged(TypeId, serde_json::Value),
}

#[derive(Clone)]
pub(crate) struct SavedObject {
    pub have_all_until: Option<Updatedness>,
    pub importance: Importance,
    pub matches_queries: HashSet<QueryId>,
    pub importance_from_queries: Importance,
}

impl SavedObject {
    fn now_have_all_until(&mut self, until: Updatedness) {
        self.have_all_until = std::cmp::max(self.have_all_until, Some(until));
    }
}

type SavedObjectMap = HashMap<ObjectId, SavedObject>;
type SavedQueriesMap = HashMap<QueryId, SavedQuery>;
type QueryUpdatesBroadcastMap =
    HashMap<QueryId, (broadcast::Sender<ObjectId>, broadcast::Receiver<ObjectId>)>;

pub struct ClientDb<LocalDb: ClientSideDb> {
    user: RwLock<Option<User>>,
    ulid: Mutex<ulid::Generator>,
    api: Arc<ApiDb<LocalDb>>,
    db: Arc<CacheDb<LocalDb>>,
    saved_objects: Arc<Mutex<SavedObjectMap>>,
    saved_queries: Arc<Mutex<SavedQueriesMap>>,
    data_saver: mpsc::UnboundedSender<DataSaverMessage>,
    data_saver_skipper: watch::Sender<bool>,
    updates_broadcastee: broadcast::Receiver<ObjectId>,
    query_updates_broadcastees: Arc<Mutex<QueryUpdatesBroadcastMap>>,
    vacuum_guard: Arc<tokio::sync::RwLock<()>>,
    _cleanup_token: tokio_util::sync::DropGuard,
}

const BROADCAST_CHANNEL_SIZE: usize = 64;

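/// Messages understood by the background data-saver task.
///
/// `StopFrame`/`ResumeFrame` bracket a critical section during which incoming
/// `Data` messages are silently discarded; `logout` uses this to flush the
/// saver, wipe the local database, and resume without racing in-flight
/// updates.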
enum DataSaverMessage {
    Data {
        update: Arc<Update>,
        now_have_all_until: Option<Updatedness>,
        additional_importance: Importance,
        reply: oneshot::Sender<crate::Result<UpdateResult>>,
    },
    StopFrame(oneshot::Sender<()>),
    ResumeFrame,
}

impl<LocalDb: ClientSideDb> ClientDb<LocalDb> {
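    /// Opens the local database, connects to the API, and spawns the
    /// background tasks (update watcher, data saver, auto-vacuum).
    ///
    /// Returns the client together with a future that resolves, once all old
    /// snapshots have been re-encoded to the latest version, to the number of
    /// re-encoding errors. A minimal sketch, assuming a `MyConfig` type
    /// implementing `crdb_core::Config` and a `local_db` value implementing
    /// `ClientSideDb`:
    ///
    /// ```ignore
    /// let (client, upgrade_done) = ClientDb::connect(
    ///     MyConfig,
    ///     local_db,
    ///     8 * 1024 * 1024, // cache watermark for the in-memory CacheDb
    ///     || tracing::warn!("no saved login, please log in again"),
    ///     |_upload, _err| async { todo!("return an OnError telling crdb how to proceed") },
    ///     ClientVacuumSchedule::new(Duration::from_secs(3600)),
    /// )
    /// .await?;
    /// let reencode_errors = upgrade_done.await;
    /// ```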
    pub async fn connect<C, RRL, EH, EHF, VS>(
        config: C,
        db: LocalDb,
        cache_watermark: usize,
        require_relogin: RRL,
        error_handler: EH,
        vacuum_schedule: ClientVacuumSchedule<VS>,
    ) -> crate::Result<(Arc<ClientDb<LocalDb>>, impl waaaa::Future<Output = usize>)>
    where
        C: crdb_core::Config,
        RRL: 'static + waaaa::Send + Fn(),
        EH: 'static + waaaa::Send + Fn(Upload, crate::Error) -> EHF,
        EHF: 'static + waaaa::Future<Output = OnError>,
        VS: 'static + Send + Fn(ClientStorageInfo) -> bool,
    {
        let _ = config;
        C::check_ulids();
        let (updates_broadcaster, updates_broadcastee) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
        let db = Arc::new(CacheDb::new(db, cache_watermark));
        let maybe_login = db
            .get_saved_login()
            .await
            .wrap_context("retrieving saved login info")?;
        if maybe_login.is_none() {
            (require_relogin)();
        }
        let saved_queries = db
            .get_saved_queries()
            .await
            .wrap_context("listing saved queries")?;
        let saved_objects = stream::iter(
            db.get_saved_objects()
                .await
                .wrap_context("listing saved objects")?,
        )
        .map(async |(object_id, saved)| -> crate::Result<_> {
            let value = db.get_json(object_id, Importance::NONE).await?;
            let (matches_queries, importance_from_queries) =
                queries_for(&saved_queries, saved.type_id, &value);
            Ok((
                object_id,
                SavedObject {
                    have_all_until: saved.have_all_until,
                    importance: saved.importance,
                    matches_queries,
                    importance_from_queries,
                },
            ))
        })
        .buffer_unordered(16)
        .try_collect::<HashMap<_, _>>()
        .await?;
        let query_updates_broadcastees = Arc::new(Mutex::new(
            saved_queries
                .keys()
                .map(|&q| (q, broadcast::channel(BROADCAST_CHANNEL_SIZE)))
                .collect(),
        ));
        let saved_queries = Arc::new(Mutex::new(saved_queries));
        let saved_objects = Arc::new(Mutex::new(saved_objects));
        let (api, updates_receiver) = ApiDb::new::<C, _, _, _, _, _>(
            db.clone(),
            {
                let saved_objects = saved_objects.clone();
                move || {
                    saved_objects
                        .lock()
                        .unwrap()
                        .iter()
                        .filter(|(_, o)| o.importance.subscribe())
                        .map(|(id, o)| (*id, o.clone()))
                        .collect()
                }
            },
            {
                let saved_queries = saved_queries.clone();
                move || {
                    saved_queries
                        .lock()
                        .unwrap()
                        .iter()
                        .filter(|(_, q)| q.importance.subscribe())
                        .map(|(id, q)| (*id, q.clone()))
                        .collect()
                }
            },
            error_handler,
            require_relogin,
        )
        .await?;
        let api = Arc::new(api);
        let cancellation_token = CancellationToken::new();
        let (data_saver, data_saver_receiver) = mpsc::unbounded();
        let this = Arc::new(ClientDb {
            user: RwLock::new(maybe_login.as_ref().map(|l| l.user)),
            ulid: Mutex::new(ulid::Generator::new()),
            api,
            db,
            saved_objects,
            saved_queries,
            data_saver,
            data_saver_skipper: watch::channel(false).0,
            updates_broadcastee,
            query_updates_broadcastees,
            vacuum_guard: Arc::new(tokio::sync::RwLock::new(())),
            _cleanup_token: cancellation_token.clone().drop_guard(),
        });
        this.setup_update_watcher(updates_receiver);
        this.setup_autovacuum(vacuum_schedule, cancellation_token);
        this.setup_data_saver::<C>(data_saver_receiver, updates_broadcaster);
        let (upgrade_finished_sender, upgrade_finished) = oneshot::channel();
        waaaa::spawn({
            let db = this.db.clone();
            async move {
                let num_errors = C::reencode_old_versions(&*db).await;
                let _ = upgrade_finished_sender.send(num_errors);
            }
        });
        if let Some(login_info) = maybe_login {
            this.login(login_info.url, login_info.user, login_info.token)
                .await
                .wrap_context("relogging in as previously logged-in user")?;
        }
        Ok((
            this,
            upgrade_finished.map(|res| res.expect("upgrade task was killed")),
        ))
    }

    pub fn make_ulid(self: &Arc<Self>) -> Ulid {
        self.ulid
            .lock()
            .unwrap()
            .generate()
            .expect("Failed to generate ulid")
    }

    pub fn listen_for_all_updates(self: &Arc<Self>) -> broadcast::Receiver<ObjectId> {
        self.updates_broadcastee.resubscribe()
    }

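    /// Returns a receiver notified with the `ObjectId` of every update
    /// touching query `q`, or `None` if the query is not currently saved.
    /// A minimal sketch:
    ///
    /// ```ignore
    /// if let Some(mut rx) = client.listen_for_updates_on(query_id) {
    ///     while let Ok(object_id) = rx.recv().await {
    ///         println!("object {object_id:?} changed under our query");
    ///     }
    /// }
    /// ```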
    pub fn listen_for_updates_on(
        self: &Arc<Self>,
        q: QueryId,
    ) -> Option<broadcast::Receiver<ObjectId>> {
        self.query_updates_broadcastees
            .lock()
            .unwrap()
            .get(&q)
            .map(|(_, recv)| recv.resubscribe())
    }

    fn setup_update_watcher(
        self: &Arc<Self>,
        mut updates_receiver: mpsc::UnboundedReceiver<Updates>,
    ) {
        waaaa::spawn({
            let data_saver = self.data_saver.clone();
            async move {
                while let Some(updates) = updates_receiver.next().await {
                    for update in updates.data {
                        let (reply, _) = oneshot::channel();
                        data_saver
                            .unbounded_send(DataSaverMessage::Data {
                                update,
                                now_have_all_until: Some(updates.now_have_all_until),
                                additional_importance: Importance::NONE,
                                reply,
                            })
                            .expect("data saver thread cannot go away");
                    }
                }
            }
        });
    }

    fn setup_autovacuum<F: 'static + Send + Fn(ClientStorageInfo) -> bool>(
        self: &Arc<Self>,
        vacuum_schedule: ClientVacuumSchedule<F>,
        cancellation_token: CancellationToken,
    ) {
        let db = self.db.clone();
        let vacuum_guard = self.vacuum_guard.clone();
        let saved_objects = self.saved_objects.clone();
        let saved_queries = self.saved_queries.clone();
        let api = self.api.clone();
        waaaa::spawn(async move {
            loop {
                match db.storage_info().await {
                    Ok(storage_info) => {
                        if (vacuum_schedule.filter)(storage_info) {
                            let _lock = vacuum_guard.write().await;
                            let to_unsubscribe = Arc::new(Mutex::new(HashSet::new()));
                            if let Err(err) = db
                                .client_vacuum(
                                    {
                                        let to_unsubscribe = to_unsubscribe.clone();
                                        move |object_id| {
                                            to_unsubscribe.lock().unwrap().insert(object_id);
                                        }
                                    },
                                    {
                                        let saved_queries = saved_queries.clone();
                                        let api = api.clone();
                                        move |query_id| {
                                            saved_queries.lock().unwrap().remove(&query_id);
                                            api.unsubscribe_query(query_id);
                                        }
                                    },
                                )
                                .await
                            {
                                tracing::error!(?err, "error occurred while vacuuming");
                            }
                            let mut to_unsubscribe = to_unsubscribe.lock().unwrap();
                            if !to_unsubscribe.is_empty() {
                                {
                                    let mut saved_objects = saved_objects.lock().unwrap();
                                    for object_id in to_unsubscribe.iter() {
                                        saved_objects.remove(object_id);
                                    }
                                }
                                api.unsubscribe(std::mem::take(&mut to_unsubscribe));
                            }
                        }
                    }
                    Err(err) => tracing::error!(
                        ?err,
                        "failed recovering storage info to check for vacuumability"
                    ),
                };

                tokio::select! {
                    _ = waaaa::sleep(vacuum_schedule.frequency) => (),
                    _ = cancellation_token.cancelled() => break,
                }
            }
        });
    }

    fn setup_data_saver<C: crdb_core::Config>(
        self: &Arc<Self>,
        data_receiver: mpsc::UnboundedReceiver<DataSaverMessage>,
        updates_broadcaster: broadcast::Sender<ObjectId>,
    ) {
        let db = self.db.clone();
        let api = self.api.clone();
        let saved_objects = self.saved_objects.clone();
        let saved_queries = self.saved_queries.clone();
        let vacuum_guard = self.vacuum_guard.clone();
        let query_updates_broadcasters = self.query_updates_broadcastees.clone();
        let data_saver_skipper = self.data_saver_skipper.subscribe();
        waaaa::spawn(async move {
            Self::data_saver::<C>(
                data_receiver,
                data_saver_skipper,
                saved_objects,
                saved_queries,
                vacuum_guard,
                db,
                api,
                updates_broadcaster,
                query_updates_broadcasters,
            )
            .await
        });
    }

    #[allow(clippy::too_many_arguments)]
    async fn data_saver<C: crdb_core::Config>(
        mut data_receiver: mpsc::UnboundedReceiver<DataSaverMessage>,
        mut data_saver_skipper: watch::Receiver<bool>,
        saved_objects: Arc<Mutex<SavedObjectMap>>,
        saved_queries: Arc<Mutex<SavedQueriesMap>>,
        vacuum_guard: Arc<tokio::sync::RwLock<()>>,
        db: Arc<CacheDb<LocalDb>>,
        api: Arc<ApiDb<LocalDb>>,
        updates_broadcaster: broadcast::Sender<ObjectId>,
        query_updates_broadcasters: Arc<Mutex<QueryUpdatesBroadcastMap>>,
    ) {
        while let Some(msg) = data_receiver.next().await {
            match msg {
                DataSaverMessage::StopFrame(reply) => {
                    let _ = reply.send(());
                    let mut depth = 1;
                    while let Some(msg) = data_receiver.next().await {
                        match msg {
                            DataSaverMessage::Data { .. } => (),
                            DataSaverMessage::StopFrame(reply) => {
                                let _ = reply.send(());
                                depth += 1;
                            }
                            DataSaverMessage::ResumeFrame => {
                                depth -= 1;
                            }
                        }
                        if depth == 0 {
                            break;
                        }
                    }
                }
                DataSaverMessage::ResumeFrame => panic!("data saver protocol violation"),
                DataSaverMessage::Data {
                    update,
                    now_have_all_until,
                    additional_importance,
                    reply,
                } => {
                    if *data_saver_skipper.borrow() {
                        continue;
                    }
                    let _guard = vacuum_guard.read().await;
                    match Self::save_data::<C>(
                        &vacuum_guard,
                        &db,
                        &saved_objects,
                        &saved_queries,
                        &update,
                        now_have_all_until,
                        additional_importance,
                        &updates_broadcaster,
                        &query_updates_broadcasters,
                    )
                    .await
                    {
                        Ok(res) => {
                            let _ = reply.send(Ok(res));
                        }
                        Err(crate::Error::ObjectDoesNotExist(_)) => {
                            // Deliberately ignored: the object is already gone locally.
                        }
                        Err(crate::Error::MissingBinaries(binary_ids)) => {
                            let fetch_binaries_res = tokio::select! {
                                bins = Self::fetch_binaries(binary_ids, &db, &api) => bins,
                                _ = data_saver_skipper.wait_for(|do_skip| *do_skip) => continue,
                            };
                            if let Err(err) = fetch_binaries_res {
                                tracing::error!(
                                    ?err,
                                    ?update,
                                    "failed retrieving required binaries for data the server sent us"
                                );
                                continue;
                            }
                            match Self::save_data::<C>(
                                &vacuum_guard,
                                &db,
                                &saved_objects,
                                &saved_queries,
                                &update,
                                now_have_all_until,
                                additional_importance,
                                &updates_broadcaster,
                                &query_updates_broadcasters,
                            )
                            .await
                            {
                                Ok(res) => {
                                    let _ = reply.send(Ok(res));
                                }
                                Err(err) => {
                                    tracing::error!(
                                        ?err,
                                        ?update,
                                        "unexpected error saving data after fetching the binaries"
                                    );
                                    let _ = reply.send(Err(err));
                                }
                            }
                        }
                        Err(err) => {
                            tracing::error!(?err, ?update, "unexpected error saving data");
                        }
                    }
                }
            }
        }
    }

    async fn fetch_binaries(
        binary_ids: Vec<BinPtr>,
        db: &CacheDb<LocalDb>,
        api: &ApiDb<LocalDb>,
    ) -> crate::Result<()> {
        let mut bins = stream::iter(binary_ids)
            .map(|binary_id| api.get_binary(binary_id).map(move |bin| (binary_id, bin)))
            .buffer_unordered(16);
        while let Some((binary_id, bin)) = bins.next().await {
            match bin? {
                Some(bin) => db.create_binary(binary_id, bin).await?,
                None => {
                    return Err(crate::Error::Other(anyhow!("Binary {binary_id:?} was not present on server, yet server sent us data requiring it")));
                }
            }
        }
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    async fn save_data<C: crdb_core::Config>(
        vacuum_guard: &tokio::sync::RwLock<()>,
        db: &CacheDb<LocalDb>,
        saved_objects: &Mutex<SavedObjectMap>,
        saved_queries: &Mutex<SavedQueriesMap>,
        u: &Update,
        now_have_all_until: Option<Updatedness>,
        additional_importance: Importance,
        updates_broadcaster: &broadcast::Sender<ObjectId>,
        query_updates_broadcasters: &Mutex<QueryUpdatesBroadcastMap>,
    ) -> crate::Result<UpdateResult> {
        let _lock = vacuum_guard.read().await;
        let object_id = u.object_id;
        let res = match &u.data {
            UpdateData::Creation {
                type_id,
                created_at,
                snapshot_version,
                data,
            } => match C::recreate(
                db,
                *type_id,
                object_id,
                *created_at,
                *snapshot_version,
                data,
                now_have_all_until,
                additional_importance,
            )
            .await?
            {
                Some(res) => UpdateResult::LatestChanged(*type_id, res),
                None => UpdateResult::LatestUnchanged,
            },
            UpdateData::Event {
                type_id,
                event_id,
                data,
            } => match C::submit(
                db,
                *type_id,
                object_id,
                *event_id,
                data,
                now_have_all_until,
                additional_importance,
            )
            .await?
            {
                Some(res) => UpdateResult::LatestChanged(*type_id, res),
                None => UpdateResult::LatestUnchanged,
            },
            UpdateData::LostReadRights => {
                if let Err(err) = db.remove(object_id).await {
                    tracing::error!(
                        ?err,
                        ?object_id,
                        "failed removing object for which we lost read rights"
                    );
                }
                UpdateResult::LostAccess
            }
        };
        match &res {
            UpdateResult::LostAccess => {
                let prev = saved_objects.lock().unwrap().remove(&object_id);
                if let (Some(o), Some(now_have_all_until)) = (prev, now_have_all_until) {
                    if let Err(err) = db
                        .update_queries(&o.matches_queries, now_have_all_until)
                        .await
                    {
                        tracing::error!(
                            ?err,
                            ?o.matches_queries,
                            "failed updating now_have_all_until for queries"
                        );
                    }
                    Self::broadcast_query_updates(
                        object_id,
                        saved_queries,
                        query_updates_broadcasters,
                        &o.matches_queries,
                        Some(now_have_all_until),
                    )?;
                }
            }
            UpdateResult::LatestUnchanged => {
                if let Some(now_have_all_until) = now_have_all_until {
                    let (queries, set_importance) = {
                        let mut saved_objects = saved_objects.lock().unwrap();
                        let Some(o) = saved_objects.get_mut(&object_id) else {
                            return Err(crate::Error::Other(anyhow!("no change in object for which we received an update, but we had not saved it yet?")));
                        };
                        o.now_have_all_until(now_have_all_until);
                        let queries = o.matches_queries.clone();
                        if !o.importance.contains(additional_importance) {
                            o.importance |= additional_importance;
                            (queries, Some((object_id, o.importance)))
                        } else {
                            (queries, None)
                        }
                    };
                    if let Some((object_id, importance)) = set_importance {
                        db.set_object_importance(object_id, importance)
                            .await
                            .wrap_context("updating importance")?;
                    }
                    if let Err(err) = db.update_queries(&queries, now_have_all_until).await {
                        tracing::error!(
                            ?err,
                            ?queries,
                            "failed updating now_have_all_until for queries"
                        );
                    }
                    Self::broadcast_query_updates(
                        object_id,
                        saved_queries,
                        query_updates_broadcasters,
                        &queries,
                        Some(now_have_all_until),
                    )?;
                }
            }
            UpdateResult::LatestChanged(type_id, res) => {
                let (queries_after, importance_from_queries_after) =
                    queries_for(&saved_queries.lock().unwrap(), *type_id, res);
                if let Some(now_have_all_until) = now_have_all_until {
                    let (
                        queries_before,
                        importance_from_queries_before,
                        importance_before,
                        importance_after,
                    ) = match saved_objects.lock().unwrap().entry(object_id) {
                        hash_map::Entry::Occupied(mut o) => {
                            let o = o.get_mut();
                            o.now_have_all_until(now_have_all_until);
                            let importance_from_queries_before = o.importance_from_queries;
                            let importance_before = o.importance;
                            o.importance_from_queries = importance_from_queries_after;
                            if !o.importance.contains(additional_importance) {
                                o.importance |= additional_importance;
                            }
                            let importance_after = o.importance;

                            let queries_before =
                                std::mem::replace(&mut o.matches_queries, queries_after.clone());
                            Self::broadcast_query_updates(
                                object_id,
                                saved_queries,
                                query_updates_broadcasters,
                                &o.matches_queries,
                                Some(now_have_all_until),
                            )?;

                            (
                                queries_before,
                                importance_from_queries_before,
                                importance_before,
                                importance_after,
                            )
                        }
                        hash_map::Entry::Vacant(v) => {
                            v.insert(SavedObject {
                                have_all_until: Some(now_have_all_until),
                                importance: additional_importance,
                                matches_queries: queries_after.clone(),
                                importance_from_queries: importance_from_queries_after,
                            });
                            (
                                HashSet::new(),
                                Importance::NONE,
                                Importance::NONE,
                                additional_importance,
                            )
                        }
                    };
                    if importance_before != importance_after {
                        db.set_object_importance(object_id, importance_after)
                            .await
                            .wrap_context("updating importance")?;
                    }
                    if importance_from_queries_before != importance_from_queries_after {
                        db.set_importance_from_queries(object_id, importance_from_queries_after)
                            .await
                            .wrap_context("updating query locking status")?;
                    }
                    if let Err(err) = db
                        .update_queries(
                            &queries_before.union(&queries_after).copied().collect(),
                            now_have_all_until,
                        )
                        .await
                    {
                        tracing::error!(
                            ?err,
                            ?queries_before,
                            ?queries_after,
                            "failed updating now_have_all_until for queries"
                        );
                    }
                }
                Self::broadcast_query_updates(
                    object_id,
                    saved_queries,
                    query_updates_broadcasters,
                    &queries_after,
                    now_have_all_until,
                )?;
            }
        }
        if let Err(err) = updates_broadcaster.send(object_id) {
            tracing::error!(?err, ?object_id, "failed broadcasting update");
        }
        Ok(res)
    }

    fn broadcast_query_updates(
        object_id: ObjectId,
        saved_queries: &Mutex<SavedQueriesMap>,
        query_updates_broadcasters: &Mutex<QueryUpdatesBroadcastMap>,
        queries: &HashSet<QueryId>,
        now_have_all_until: Option<Updatedness>,
    ) -> crate::Result<()> {
        let mut saved_queries = saved_queries.lock().unwrap();
        let query_updates_broadcasters = query_updates_broadcasters.lock().unwrap();
        for query_id in queries {
            if let Some((sender, _)) = query_updates_broadcasters.get(query_id) {
                if let Err(err) = sender.send(object_id) {
                    tracing::error!(
                        ?err,
                        ?query_id,
                        ?object_id,
                        "failed broadcasting query update"
                    );
                }
            }
            if let Some(query) = saved_queries.get_mut(query_id) {
                if let Some(u) = now_have_all_until {
                    query.now_have_all_until(u);
                }
            }
        }
        Ok(())
    }

    pub fn on_connection_event(
        self: &Arc<Self>,
        cb: impl 'static + waaaa::Send + waaaa::Sync + Fn(ConnectionEvent),
    ) {
        self.api.on_connection_event(cb)
    }

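    /// Logs in as `user` against the server at `url` and persists the session
    /// locally, so that a later `connect` can log back in without user
    /// interaction. If a different user was previously logged in, their local
    /// data is wiped first via `logout`. A minimal sketch, with `user` and
    /// `token` coming from your own authentication flow:
    ///
    /// ```ignore
    /// client
    ///     .login(Arc::new(String::from("wss://api.example.com/ws")), user, token)
    ///     .await?;
    /// ```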
    pub async fn login(
        self: &Arc<Self>,
        url: Arc<String>,
        user: User,
        token: SessionToken,
    ) -> crate::Result<()> {
        if self
            .user
            .read()
            .unwrap()
            .map(|u| u != user)
            .unwrap_or(false)
        {
            self.logout().await?;
        }
        *self.user.write().unwrap() = Some(user);
        self.api.login(url.clone(), token);
        self.db.save_login(LoginInfo { url, user, token }).await?;
        Ok(())
    }

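    /// Logs out, dropping the API session, the in-memory saved-object and
    /// saved-query state, and the whole local database. The data saver is
    /// paused, and incoming updates skipped, for the duration of the wipe so
    /// that no in-flight update can race against `remove_everything`.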
    pub async fn logout(&self) -> crate::Result<()> {
        self.api.logout();

        *self.user.write().unwrap() = None;
        self.saved_objects.lock().unwrap().clear();
        self.saved_queries.lock().unwrap().clear();

        let (reply, reply_receiver) = oneshot::channel();
        self.data_saver_skipper.send_replace(true);
        let _ = self
            .data_saver
            .unbounded_send(DataSaverMessage::StopFrame(reply));
        let _ = reply_receiver.await;
        self.db.remove_everything().await?;
        self.data_saver_skipper.send_replace(false);
        let _ = self
            .data_saver
            .unbounded_send(DataSaverMessage::ResumeFrame);

        Ok(())
    }

    pub fn user(&self) -> Option<User> {
        *self.user.read().unwrap()
    }

    pub fn watch_upload_queue(&self) -> watch::Receiver<Vec<UploadId>> {
        self.api.watch_upload_queue()
    }

    pub async fn list_uploads(&self) -> crate::Result<Vec<UploadId>> {
        self.db.list_uploads().await
    }

    pub async fn get_upload(
        self: &Arc<Self>,
        upload_id: UploadId,
    ) -> crate::Result<Option<Upload>> {
        self.db.get_upload(upload_id).await
    }

    pub fn rename_session(self: &Arc<Self>, name: String) -> oneshot::Receiver<crate::Result<()>> {
        self.api.rename_session(name)
    }

    pub async fn current_session(&self) -> crate::Result<Session> {
        self.api.current_session().await
    }

    pub async fn list_sessions(&self) -> crate::Result<Vec<Session>> {
        self.api.list_sessions().await
    }

    pub fn disconnect_session(
        self: &Arc<Self>,
        session_ref: SessionRef,
    ) -> oneshot::Receiver<crate::Result<()>> {
        self.api.disconnect_session(session_ref)
    }

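    /// Prevents the auto-vacuum task from running for as long as the returned
    /// guard is held: vacuuming needs the corresponding write lock.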
    pub async fn pause_vacuum(&self) -> tokio::sync::RwLockReadGuard<'_, ()> {
        self.vacuum_guard.read().await
    }

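    /// Changes the importance of the object behind `ptr`, unsubscribing from
    /// it when a subscription is no longer needed and re-fetching it from the
    /// server when one now is.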
    pub async fn set_importance<T: Object>(
        self: &Arc<Self>,
        ptr: DbPtr<T>,
        new_importance: Importance,
    ) -> crate::Result<()> {
        let object_id = ptr.to_object_id();
        let old_importance = std::mem::replace(
            &mut self
                .saved_objects
                .lock()
                .unwrap()
                .get_mut(&object_id)
                .ok_or_else(|| crate::Error::ObjectDoesNotExist(object_id))?
                .importance,
            new_importance,
        );
        if old_importance != new_importance {
            self.db
                .set_object_importance(object_id, new_importance)
                .await?;
            if old_importance.subscribe() && !new_importance.subscribe() {
                let mut set = HashSet::with_capacity(1);
                set.insert(object_id);
                self.api.unsubscribe(set);
            } else if !old_importance.subscribe() && new_importance.subscribe() {
                self.get::<T>(new_importance, ptr).await?;
            }
        }
        Ok(())
    }

    pub fn list_saved_queries(self: &Arc<Self>) -> SavedQueriesMap {
        self.saved_queries.lock().unwrap().clone()
    }

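    /// Changes the importance of the saved query `query_id`, recomputing the
    /// query-derived importance of every object it matches and
    /// (un)subscribing from the query on the server as needed.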
    #[allow(clippy::await_holding_lock)]
    pub async fn set_query_importance<T: Object>(
        self: &Arc<Self>,
        query_id: QueryId,
        new_importance: Importance,
    ) -> crate::Result<()> {
        let mut saved_queries = self.saved_queries.lock().unwrap();
        let (old_importance, query) = {
            let Some(saved_query) = saved_queries.get_mut(&query_id) else {
                return Err(crate::Error::QueryDoesNotExist(query_id));
            };
            let old_importance = saved_query.importance;
            if saved_query.importance == new_importance {
                return Ok(());
            }
            saved_query.importance = new_importance;
            (old_importance, saved_query.query.clone())
        };
        let importance_changed_objects = {
            let mut saved_objects = self.saved_objects.lock().unwrap();
            let mut importance_changed_objects = Vec::new();
            for (object_id, o) in saved_objects.iter_mut() {
                if o.matches_queries.contains(&query_id) {
                    let new_importance = o
                        .matches_queries
                        .iter()
                        .map(|query_id| match saved_queries.get(query_id) {
                            Some(q) => q.importance,
                            None => {
                                tracing::error!(?query_id, "matched query was not saved");
                                Importance::NONE
                            }
                        })
                        .fold(Importance::NONE, |a, i| a | i);
                    if new_importance != o.importance_from_queries {
                        o.importance_from_queries = new_importance;
                        importance_changed_objects.push(*object_id);
                    }
                }
            }
            importance_changed_objects
        };
        std::mem::drop(saved_queries);
        if old_importance.subscribe() && !new_importance.subscribe() {
            self.api.unsubscribe_query(query_id);
        } else if !old_importance.subscribe() && new_importance.subscribe() {
            self.query_remote::<T>(new_importance, query_id, query)
                .await?
                .count()
                .await;
        }
        if old_importance != new_importance {
            self.db
                .set_query_importance(query_id, new_importance, importance_changed_objects)
                .await?;
        }
        Ok(())
    }

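    /// Creates a new object from a freshly generated ULID. Returns the `Obj`
    /// handle together with a future that resolves once the server has
    /// acknowledged the creation. A minimal sketch, assuming a hypothetical
    /// `TodoItem` type implementing `Object`:
    ///
    /// ```ignore
    /// let (todo, completion) = client
    ///     .create(Importance::NONE, Arc::new(TodoItem::new("buy milk")))
    ///     .await?;
    /// completion.await?; // the server has accepted the creation
    /// ```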
    pub async fn create<T: Object>(
        self: &Arc<Self>,
        importance: Importance,
        object: Arc<T>,
    ) -> crate::Result<(Obj<T, LocalDb>, impl Future<Output = crate::Result<()>>)> {
        let id = self.make_ulid();
        let object_id = ObjectId(id);
        let completion = self
            .create_with(importance, object_id, EventId(id), object.clone())
            .await?;
        let res = Obj::new(DbPtr::from(object_id), object, self.clone());
        Ok((res, completion))
    }

    pub async fn create_with<T: Object>(
        self: &Arc<Self>,
        importance: Importance,
        object_id: ObjectId,
        created_at: EventId,
        object: Arc<T>,
    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
        let user = self.user.read().unwrap().ok_or_else(|| {
            crate::Error::Other(anyhow!("called `create` with no known user"))
        })?;
        if !object
            .can_create(user, object_id, &*self.db)
            .await
            .wrap_context("checking whether object creation seems to be allowed locally")?
        {
            return Err(crate::Error::Forbidden);
        }
        let _lock = self.vacuum_guard.read().await;
        let val = self
            .db
            .create(
                object_id,
                created_at,
                object.clone(),
                None,
                importance,
            )
            .await?;
        let do_subscribe = if let Some(val) = val {
            let val_json =
                serde_json::to_value(val).wrap_context("serializing new last snapshot")?;
            let (matches_queries, importance_from_queries) = queries_for(
                &self.saved_queries.lock().unwrap(),
                *T::type_ulid(),
                &val_json,
            );
            self.saved_objects.lock().unwrap().insert(
                object_id,
                SavedObject {
                    have_all_until: None,
                    importance,
                    matches_queries,
                    importance_from_queries,
                },
            );
            if importance_from_queries != Importance::NONE {
                self.db
                    .set_importance_from_queries(object_id, importance_from_queries)
                    .await
                    .wrap_context("updating queries locks")?;
            }
            importance.subscribe() || importance_from_queries.subscribe()
        } else {
            importance.subscribe()
        };
        self.api
            .create(object_id, created_at, object, do_subscribe)
            .await
    }

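    /// Submits `event` to the object behind `ptr`, under a freshly generated
    /// event id and with no additional importance. Returns a future that
    /// resolves once the server has acknowledged the event. A minimal sketch,
    /// assuming hypothetical `TodoItem` / `TodoEvent` types:
    ///
    /// ```ignore
    /// let completion = client.submit::<TodoItem>(todo_ptr, TodoEvent::MarkDone).await?;
    /// completion.await?; // the server has accepted the event
    /// ```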
    pub async fn submit<T: Object>(
        self: &Arc<Self>,
        ptr: DbPtr<T>,
        event: T::Event,
    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
        let event_id = EventId(self.make_ulid());
        self.submit_with::<T>(
            Importance::NONE,
            ptr.to_object_id(),
            event_id,
            Arc::new(event),
        )
        .await
    }

    pub async fn submit_with<T: Object>(
        self: &Arc<Self>,
        importance: Importance,
        object_id: ObjectId,
        event_id: EventId,
        event: Arc<T::Event>,
    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
        let user = self.user.read().unwrap().ok_or_else(|| {
            crate::Error::Other(anyhow!("called `submit` with no known user"))
        })?;
        let _lock = self.vacuum_guard.read().await;
        let object = self.db.get_latest::<T>(object_id, Importance::NONE).await?;
        if !object
            .can_apply(user, object_id, &event, &*self.db)
            .await
            .wrap_context("checking whether event submission seems to be allowed locally")?
        {
            return Err(crate::Error::Forbidden);
        }
        let val = self
            .db
            .submit::<T>(
                object_id,
                event_id,
                event.clone(),
                None,
                importance,
            )
            .await?;
        let do_subscribe = if let Some(val) = val {
            let val_json =
                serde_json::to_value(val).wrap_context("serializing new last snapshot")?;
            let (queries, importance_from_queries_after) = queries_for(
                &self.saved_queries.lock().unwrap(),
                *T::type_ulid(),
                &val_json,
            );
            let do_subscribe = importance.subscribe() || importance_from_queries_after.subscribe();
            let importance_from_queries_before = {
                let mut saved_objects = self.saved_objects.lock().unwrap();
                let o = saved_objects.get_mut(&object_id).ok_or_else(|| {
                    crate::Error::Other(anyhow!("Submitted event to non-saved object"))
                })?;
                o.matches_queries = queries;
                let importance_from_queries_before = o.importance_from_queries;
                o.importance_from_queries = importance_from_queries_after;
                importance_from_queries_before
            };
            if importance_from_queries_before != importance_from_queries_after {
                self.db
                    .set_importance_from_queries(object_id, importance_from_queries_after)
                    .await
                    .wrap_context("updating importance from queries")?;
            }
            do_subscribe
        } else {
            importance.subscribe()
        };
        self.api
            .submit::<T>(object_id, event_id, event, do_subscribe)
            .await
    }

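    /// Fetches the latest snapshot of the object behind `ptr`, trying the
    /// local database first and falling back to the server (and saving the
    /// result locally). A minimal sketch, with `TodoItem` a hypothetical
    /// `Object` type:
    ///
    /// ```ignore
    /// let todo: Obj<TodoItem, _> = client.get(Importance::NONE, todo_ptr).await?;
    /// ```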
    pub async fn get<T: Object>(
        self: &Arc<Self>,
        importance: Importance,
        ptr: DbPtr<T>,
    ) -> crate::Result<Obj<T, LocalDb>> {
        let object_id = ptr.to_object_id();
        match self.db.get_latest::<T>(object_id, importance).await {
            Ok(r) => return Ok(Obj::new(ptr, r, self.clone())),
            Err(crate::Error::ObjectDoesNotExist(_)) => (),
            Err(e) => return Err(e),
        }
        let data = self.api.get(object_id, importance.subscribe()).await?;
        let res = save_object_data_locally::<T, _>(
            data,
            &self.data_saver,
            &self.db,
            importance,
            &self.saved_objects,
            &self.saved_queries,
        )
        .await?;
        Ok(Obj::new(ptr, res, self.clone()))
    }

    pub async fn get_local<T: Object>(
        self: &Arc<Self>,
        importance: Importance,
        ptr: DbPtr<T>,
    ) -> crate::Result<Option<Obj<T, LocalDb>>> {
        let object_id = ptr.to_object_id();
        match self.db.get_latest::<T>(object_id, importance).await {
            Ok(res) => Ok(Some(Obj::new(ptr, res, self.clone()))),
            Err(crate::Error::ObjectDoesNotExist(o)) if o == object_id => Ok(None),
            Err(err) => Err(err),
        }
    }

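    /// Runs `query` against the local database only, without contacting the
    /// server; objects that vanish between listing and fetching are silently
    /// skipped. A minimal sketch:
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let todos = client.query_local::<TodoItem>(Arc::new(query)).await?;
    /// futures::pin_mut!(todos);
    /// while let Some(todo) = todos.try_next().await? {
    ///     // ...
    /// }
    /// ```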
    pub async fn query_local<'a, T: Object>(
        self: &'a Arc<Self>,
        query: Arc<Query>,
    ) -> crate::Result<impl 'a + waaaa::Stream<Item = crate::Result<Obj<T, LocalDb>>>> {
        let object_ids = self
            .db
            .client_query(*T::type_ulid(), query)
            .await
            .wrap_context("listing objects matching query")?;

        Ok(async_stream::stream! {
            for object_id in object_ids {
                match self.db.get_latest::<T>(object_id, Importance::NONE).await {
                    Ok(res) => yield Ok(Obj::new(DbPtr::from(object_id), res, self.clone())),
                    Err(crate::Error::ObjectDoesNotExist(id)) if id == object_id => continue,
                    Err(err) => yield Err(err),
                }
            }
        })
    }

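    /// Runs `query` against the server, saving each result locally and
    /// subscribing to the query if `importance` asks for it. `query_id` keys
    /// the saved query for later `listen_for_updates_on` and
    /// `set_query_importance` calls. A minimal sketch, assuming `QueryId`
    /// wraps a `Ulid` the same way `ObjectId` and `EventId` do:
    ///
    /// ```ignore
    /// let query_id = QueryId(client.make_ulid());
    /// let results = client
    ///     .query_remote::<TodoItem>(Importance::NONE, query_id, Arc::new(query))
    ///     .await?;
    /// futures::pin_mut!(results);
    /// while let Some(todo) = results.try_next().await? {
    ///     // ...
    /// }
    /// ```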
    pub async fn query_remote<'a, T: Object>(
        self: &'a Arc<Self>,
        importance: Importance,
        query_id: QueryId,
        query: Arc<Query>,
    ) -> crate::Result<impl 'a + waaaa::Stream<Item = crate::Result<Obj<T, LocalDb>>>> {
        self.query_updates_broadcastees
            .lock()
            .unwrap()
            .entry(query_id)
            .or_insert_with(|| broadcast::channel(BROADCAST_CHANNEL_SIZE));
        let only_updated_since = {
            let mut subscribed_queries = self.saved_queries.lock().unwrap();
            let entry = subscribed_queries.entry(query_id);
            match entry {
                hash_map::Entry::Occupied(mut q) => {
                    if !q.get().importance.lock() && importance.lock() {
                        q.get_mut().importance |= importance;
                        q.get_mut().have_all_until = None;
                        None
                    } else {
                        q.get().have_all_until
                    }
                }
                hash_map::Entry::Vacant(v) => {
                    v.insert(SavedQuery {
                        query: query.clone(),
                        type_id: *T::type_ulid(),
                        have_all_until: None,
                        importance,
                    });
                    None
                }
            }
        };
        self.db
            .record_query(query_id, query.clone(), *T::type_ulid(), importance)
            .await?;
        Ok(self
            .api
            .query::<T>(query_id, only_updated_since, importance.subscribe(), query)
            .then({
                let data_saver = self.data_saver.clone();
                let db = self.db.clone();
                let saved_objects = self.saved_objects.clone();
                let saved_queries = self.saved_queries.clone();
                move |data| {
                    let data_saver = data_saver.clone();
                    let db = db.clone();
                    let saved_objects = saved_objects.clone();
                    let saved_queries = saved_queries.clone();
                    async move {
                        let (data, updatedness) = data?;
                        match data {
                            MaybeObject::NotYetSubscribed(data) => {
                                let object_id = data.object_id;
                                let res = save_object_data_locally::<T, _>(
                                    data,
                                    &data_saver,
                                    &db,
                                    Importance::NONE,
                                    &saved_objects,
                                    &saved_queries,
                                )
                                .await?;
                                if let Some(now_have_all_until) = updatedness {
                                    let mut the_query = HashSet::with_capacity(1);
                                    the_query.insert(query_id);
                                    db.update_queries(&the_query, now_have_all_until).await?;
                                    if let Some(q) =
                                        saved_queries.lock().unwrap().get_mut(&query_id)
                                    {
                                        q.now_have_all_until(now_have_all_until);
                                    }
                                }
                                Ok(Obj::new(DbPtr::from(object_id), res, self.clone()))
                            }
                            MaybeObject::AlreadySubscribed(object_id) => {
                                self.db
                                    .get_latest::<T>(object_id, Importance::NONE)
                                    .await
                                    .map(|res| Obj::new(DbPtr::from(object_id), res, self.clone()))
                            }
                        }
                    }
                }
            }))
    }

    pub async fn create_binary(self: &Arc<Self>, data: Arc<[u8]>) -> crate::Result<()> {
        let binary_id = crdb_core::hash_binary(&data);
        self.db.create_binary(binary_id, data.clone()).await
    }

    pub async fn get_binary(
        self: &Arc<Self>,
        binary_id: BinPtr,
    ) -> anyhow::Result<Option<Arc<[u8]>>> {
        if let Some(res) = self.db.get_binary(binary_id).await? {
            return Ok(Some(res));
        }
        let Some(res) = self.api.get_binary(binary_id).await? else {
            return Ok(None);
        };
        self.db.create_binary(binary_id, res.clone()).await?;
        Ok(Some(res))
    }
}

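/// Schedule for the background auto-vacuum task: every `frequency`, `filter`
/// is called with the current `ClientStorageInfo` and the vacuum runs only if
/// it returns `true`. A minimal sketch (the closure body is illustrative;
/// check `ClientStorageInfo`'s actual fields for real criteria):
///
/// ```ignore
/// let schedule = ClientVacuumSchedule::new(Duration::from_secs(3600))
///     .filter(|info| /* e.g. only when local storage is nearly full */ true);
/// ```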
pub struct ClientVacuumSchedule<F> {
    frequency: Duration,
    filter: F,
}

impl ClientVacuumSchedule<fn(ClientStorageInfo) -> bool> {
    pub fn new(frequency: Duration) -> Self {
        ClientVacuumSchedule {
            frequency,
            filter: |_| true,
        }
    }

    pub fn never() -> Self {
        ClientVacuumSchedule {
            frequency: Duration::from_secs(86400),
            filter: |_| false,
        }
    }
}

impl<F> ClientVacuumSchedule<F> {
    pub fn filter<G: Fn(ClientStorageInfo) -> bool>(self, filter: G) -> ClientVacuumSchedule<G> {
        ClientVacuumSchedule {
            frequency: self.frequency,
            filter,
        }
    }
}

fn queries_for(
    saved_queries: &SavedQueriesMap,
    type_id: TypeId,
    value: &serde_json::Value,
) -> (HashSet<QueryId>, Importance) {
    let mut importance = Importance::NONE;
    let queries = saved_queries
        .iter()
        .filter_map(|(id, q)| {
            if type_id == q.type_id && q.query.matches_json(value) {
                importance |= q.importance;
                Some(*id)
            } else {
                None
            }
        })
        .collect();
    (queries, importance)
}

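/// Replays an `ObjectData` (creation snapshot, then events) through the data
/// saver, returning the latest snapshot's JSON if replaying changed it.
/// `now_have_all_until` is only attached to the last message of the batch, so
/// the object's high-water mark advances only once the batch is fully saved.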
async fn locally_create_all<T: Object, LocalDb: ClientSideDb>(
    data_saver: &mpsc::UnboundedSender<DataSaverMessage>,
    db: &CacheDb<LocalDb>,
    importance: Importance,
    data: ObjectData,
) -> crate::Result<Option<serde_json::Value>> {
    if data.type_id != *T::type_ulid() {
        return Err(crate::Error::WrongType {
            object_id: data.object_id,
            expected_type_id: *T::type_ulid(),
            real_type_id: data.type_id,
        });
    }

    let mut latest_snapshot_returns =
        Vec::with_capacity(data.events.len() + data.creation_snapshot.is_some() as usize);
    if let Some((created_at, snapshot_version, snapshot_data)) = data.creation_snapshot {
        let (reply, reply_receiver) = oneshot::channel();
        latest_snapshot_returns.push(reply_receiver);
        data_saver
            .unbounded_send(DataSaverMessage::Data {
                update: Arc::new(Update {
                    object_id: data.object_id,
                    data: UpdateData::Creation {
                        type_id: data.type_id,
                        created_at,
                        snapshot_version,
                        data: snapshot_data,
                    },
                }),
                now_have_all_until: data.events.is_empty().then_some(data.now_have_all_until),
                additional_importance: importance,
                reply,
            })
            .map_err(|_| crate::Error::Other(anyhow!("Data saver task disappeared early")))?;
    } else if importance != Importance::NONE {
        db.get_latest::<T>(data.object_id, importance)
            .await
            .wrap_context("updating importance for object as requested")?;
    }

    let events_len = data.events.len();
    for (i, (event_id, event)) in data.events.into_iter().enumerate() {
        let (reply, reply_receiver) = oneshot::channel();
        latest_snapshot_returns.push(reply_receiver);
        data_saver
            .unbounded_send(DataSaverMessage::Data {
                update: Arc::new(Update {
                    object_id: data.object_id,
                    data: UpdateData::Event {
                        type_id: data.type_id,
                        event_id,
                        data: event,
                    },
                }),
                now_have_all_until: (i + 1 == events_len).then_some(data.now_have_all_until),
                additional_importance: Importance::NONE,
                reply,
            })
            .map_err(|_| crate::Error::Other(anyhow!("Data saver task disappeared early")))?;
    }

    let mut res = None;
    for receiver in latest_snapshot_returns {
        if let Ok(res_data) = receiver.await {
            match res_data {
                Ok(UpdateResult::LatestChanged(_, res_data)) => res = Some(res_data),
                Ok(_) => (),
                Err(crate::Error::ObjectDoesNotExist(o))
                    if o == data.object_id && !importance.lock() =>
                {
                    // Ignored: without a lock the object may have been vacuumed away concurrently.
                }
                Err(err) => return Err(err).wrap_context("creating in local database"),
            }
        }
    }

    Ok(res)
}

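/// Saves an `ObjectData` received from the server into the local database,
/// then brings the in-memory `SavedObjectMap` (importance, matching queries,
/// `have_all_until`) in line with it, returning the deserialized latest
/// snapshot.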
async fn save_object_data_locally<T: Object, LocalDb: ClientSideDb>(
    data: ObjectData,
    data_saver: &mpsc::UnboundedSender<DataSaverMessage>,
    db: &CacheDb<LocalDb>,
    importance: Importance,
    saved_objects: &Mutex<SavedObjectMap>,
    saved_queries: &Mutex<SavedQueriesMap>,
) -> crate::Result<Arc<T>> {
    let now_have_all_until = data.now_have_all_until;
    let object_id = data.object_id;
    let type_id = data.type_id;
    if type_id != *T::type_ulid() {
        return Err(crate::Error::WrongType {
            object_id,
            expected_type_id: *T::type_ulid(),
            real_type_id: type_id,
        });
    }
    let res_json = locally_create_all::<T, _>(data_saver, db, importance, data).await?;
    let (res, res_json) = match res_json {
        Some(res_json) => (
            Arc::new(T::deserialize(&res_json).wrap_context("deserializing snapshot")?),
            res_json,
        ),
        None => {
            let res = db.get_latest::<T>(object_id, importance).await?;
            let res_json = serde_json::to_value(&res).wrap_context("serializing snapshot")?;
            (res, res_json)
        }
    };
    let (queries, importance_from_queries) =
        queries_for(&saved_queries.lock().unwrap(), type_id, &res_json);
    db.set_importance_from_queries(object_id, importance_from_queries)
        .await
        .wrap_context("updating queries lock")?;
    let mut saved_objects = saved_objects.lock().unwrap();
    match saved_objects.entry(object_id) {
        hash_map::Entry::Occupied(mut o) => {
            let o = o.get_mut();
            o.now_have_all_until(now_have_all_until);
            o.importance |= importance;
            o.matches_queries = queries;
            o.importance_from_queries = importance_from_queries;
        }
        hash_map::Entry::Vacant(v) => {
            v.insert(SavedObject {
                have_all_until: Some(now_have_all_until),
                importance,
                matches_queries: queries,
                importance_from_queries,
            });
        }
    }
    Ok(res)
}

impl<LocalDb: ClientSideDb> Deref for ClientDb<LocalDb> {
    type Target = CacheDb<LocalDb>;

    fn deref(&self) -> &Self::Target {
        &self.db
    }
}