1use super::{BinariesCache, ObjectCache};
2use crdb_core::{
3    BinPtr, ClientSideDb, ClientStorageInfo, CrdbSyncFn, Db, DynSized, EventId, Importance,
4    LoginInfo, Object, ObjectId, Query, QueryId, SavedObjectMeta, SavedQuery, ServerSideDb,
5    Session, SessionRef, SessionToken, TypeId, Updatedness, Upload, UploadId, User,
6    UsersWhoCanRead,
7};
8use std::{
9    collections::{HashMap, HashSet},
10    sync::{Arc, Mutex, RwLock},
11};
12use web_time::SystemTime;
13
14pub struct CacheDb<D: Db> {
15    db: D,
16    cache: Arc<RwLock<ObjectCache>>,
17    binaries: Arc<RwLock<BinariesCache>>,
18}
19
20impl<D: Db> CacheDb<D> {
21    pub fn new(db: D, watermark: usize) -> CacheDb<D> {
27        let cache = Arc::new(RwLock::new(ObjectCache::new(watermark)));
28        CacheDb {
29            db,
30            cache,
31            binaries: Arc::new(RwLock::new(BinariesCache::new())),
32        }
33    }
34
35    }
40
41impl<D: Db> Db for CacheDb<D> {
42    async fn create<T: Object>(
43        &self,
44        object_id: ObjectId,
45        created_at: EventId,
46        object: Arc<T>,
47        updatedness: Option<Updatedness>,
48        importance: Importance,
49    ) -> crate::Result<Option<Arc<T>>> {
50        self.cache.write().unwrap().remove(&object_id);
51        let res = self
52            .db
53            .create(
54                object_id,
55                created_at,
56                object.clone(),
57                updatedness,
58                importance,
59            )
60            .await?;
61        if let Some(value) = res.clone() {
62            self.cache.write().unwrap().set(object_id, value);
63        }
64        Ok(res)
65    }
66
67    async fn submit<T: Object>(
68        &self,
69        object_id: ObjectId,
70        event_id: EventId,
71        event: Arc<T::Event>,
72        updatedness: Option<Updatedness>,
73        additional_importance: Importance,
74    ) -> crate::Result<Option<Arc<T>>> {
75        let res = self
76            .db
77            .submit::<T>(
78                object_id,
79                event_id,
80                event.clone(),
81                updatedness,
82                additional_importance,
83            )
84            .await?;
85        if let Some(value) = res.clone() {
86            self.cache.write().unwrap().set(object_id, value);
87        }
88        Ok(res)
89    }
90
91    async fn get_latest<T: Object>(
92        &self,
93        object_id: ObjectId,
94        importance: Importance,
95    ) -> crate::Result<Arc<T>> {
96        if let Some(res) = self.cache.read().unwrap().get(&object_id) {
97            if let Ok(res) = Arc::downcast(DynSized::arc_to_any(res)) {
99                return Ok(res);
100            }
101            }
103        let res = self.db.get_latest::<T>(object_id, importance).await?;
104        self.cache.write().unwrap().set(object_id, res.clone() as _);
105        Ok(res)
106    }
107
108    async fn create_binary(&self, binary_id: BinPtr, data: Arc<[u8]>) -> crate::Result<()> {
109        if crdb_core::hash_binary(&data) != binary_id {
113            return Err(crate::Error::BinaryHashMismatch(binary_id));
114        }
115        self.binaries
116            .write()
117            .unwrap()
118            .insert(binary_id, Arc::downgrade(&data));
119        self.db.create_binary(binary_id, data).await
120    }
121
122    async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
123        if let Some(res) = self.binaries.read().unwrap().get(&binary_id) {
124            return Ok(Some(res.clone()));
125        }
126        let Some(res) = self.db.get_binary(binary_id).await? else {
127            return Ok(None);
128        };
129        self.binaries
130            .write()
131            .unwrap()
132            .insert(binary_id, Arc::downgrade(&res));
133        Ok(Some(res))
134    }
135
136    async fn reencode_old_versions<T: Object>(&self) -> usize {
138        self.db.reencode_old_versions::<T>().await
139    }
140
141    async fn assert_invariants_generic(&self) {
142        self.db.assert_invariants_generic().await;
143    }
144
145    async fn assert_invariants_for<T: Object>(&self) {
146        self.db.assert_invariants_for::<T>().await;
147    }
148}
149
150impl<D: ClientSideDb> ClientSideDb for CacheDb<D> {
151    async fn storage_info(&self) -> crate::Result<ClientStorageInfo> {
152        self.db.storage_info().await
153    }
154
155    async fn save_login(&self, info: LoginInfo) -> crate::Result<()> {
156        self.db.save_login(info).await
157    }
158
159    async fn get_saved_login(&self) -> crate::Result<Option<LoginInfo>> {
160        self.db.get_saved_login().await
161    }
162
163    async fn remove_everything(&self) -> crate::Result<()> {
164        self.db.remove_everything().await
165    }
166
167    async fn get_json(
168        &self,
169        object_id: ObjectId,
170        importance: Importance,
171    ) -> crate::Result<serde_json::Value> {
172        self.db.get_json(object_id, importance).await
173    }
174
175    async fn recreate<T: Object>(
176        &self,
177        object_id: ObjectId,
178        new_created_at: EventId,
179        object: Arc<T>,
180        updatedness: Option<Updatedness>,
181        additional_importance: Importance,
182    ) -> crate::Result<Option<Arc<T>>> {
183        let res = self
184            .db
185            .recreate(
186                object_id,
187                new_created_at,
188                object,
189                updatedness,
190                additional_importance,
191            )
192            .await?;
193        if let Some(res) = res.clone() {
194            self.cache.write().unwrap().set(object_id, res as _);
195        }
196        Ok(res)
197    }
198
199    async fn client_query(
200        &self,
201        type_id: TypeId,
202        query: Arc<Query>,
203    ) -> crate::Result<Vec<ObjectId>> {
204        self.db.client_query(type_id, query).await
205    }
206
207    async fn remove(&self, object_id: ObjectId) -> crate::Result<()> {
208        self.cache.write().unwrap().remove(&object_id);
209        self.db.remove(object_id).await
210    }
211
212    async fn remove_event<T: Object>(
213        &self,
214        object_id: ObjectId,
215        event_id: EventId,
216    ) -> crate::Result<()> {
217        self.cache.write().unwrap().remove(&object_id);
218        self.db.remove_event::<T>(object_id, event_id).await
219    }
220
221    async fn set_object_importance(
222        &self,
223        object_id: ObjectId,
224        new_importance: Importance,
225    ) -> crate::Result<()> {
226        self.db
227            .set_object_importance(object_id, new_importance)
228            .await
229    }
230
231    async fn set_importance_from_queries(
232        &self,
233        object_id: ObjectId,
234        new_importance_from_queries: Importance,
235    ) -> crate::Result<()> {
236        self.db
237            .set_importance_from_queries(object_id, new_importance_from_queries)
238            .await
239    }
240
241    async fn client_vacuum(
242        &self,
243        notify_removals: impl 'static + CrdbSyncFn<ObjectId>,
244        notify_query_removals: impl 'static + CrdbSyncFn<QueryId>,
245    ) -> crate::Result<()> {
246        let objects_to_remove = Arc::new(Mutex::new(HashSet::new()));
247        let res = self
248            .db
249            .client_vacuum(
250                {
251                    let objects_to_remove = objects_to_remove.clone();
252                    move |object_id| {
253                        objects_to_remove.lock().unwrap().insert(object_id);
254                        notify_removals(object_id);
255                    }
256                },
257                notify_query_removals,
258            )
259            .await;
260        let mut cache = self.cache.write().unwrap();
261        for o in objects_to_remove.lock().unwrap().iter() {
262            cache.remove(o);
263        }
264        res
265    }
266
267    async fn list_uploads(&self) -> crate::Result<Vec<UploadId>> {
268        self.db.list_uploads().await
269    }
270
271    async fn get_upload(&self, upload_id: UploadId) -> crate::Result<Option<Upload>> {
272        self.db.get_upload(upload_id).await
273    }
274
275    async fn enqueue_upload(
276        &self,
277        upload: Upload,
278        required_binaries: Vec<BinPtr>,
279    ) -> crate::Result<UploadId> {
280        self.db.enqueue_upload(upload, required_binaries).await
281    }
282
283    async fn upload_finished(&self, upload_id: UploadId) -> crate::Result<()> {
284        self.db.upload_finished(upload_id).await
285    }
286
287    async fn get_saved_objects(&self) -> crate::Result<HashMap<ObjectId, SavedObjectMeta>> {
288        self.db.get_saved_objects().await
289    }
290
291    async fn get_saved_queries(&self) -> crate::Result<HashMap<QueryId, SavedQuery>> {
292        self.db.get_saved_queries().await
293    }
294
295    async fn record_query(
296        &self,
297        query_id: QueryId,
298        query: Arc<Query>,
299        type_id: TypeId,
300        importance: Importance,
301    ) -> crate::Result<()> {
302        self.db
303            .record_query(query_id, query, type_id, importance)
304            .await
305    }
306
307    async fn set_query_importance(
308        &self,
309        query_id: QueryId,
310        importance: Importance,
311        objects_matching_query: Vec<ObjectId>,
312    ) -> crate::Result<()> {
313        self.db
314            .set_query_importance(query_id, importance, objects_matching_query)
315            .await
316    }
317
318    async fn forget_query(
319        &self,
320        query_id: QueryId,
321        objects_matching_query: Vec<ObjectId>,
322    ) -> crate::Result<()> {
323        self.db.forget_query(query_id, objects_matching_query).await
324    }
325
326    async fn update_queries(
327        &self,
328        queries: &HashSet<QueryId>,
329        now_have_all_until: Updatedness,
330    ) -> crate::Result<()> {
331        self.db.update_queries(queries, now_have_all_until).await
332    }
333}
334
335impl<D: ServerSideDb> ServerSideDb for CacheDb<D> {
336    type Connection = D::Connection;
337    type Transaction<'a> = D::Transaction<'a>;
338    type Lock<'a> = D::Lock<'a>;
339
340    fn get_users_who_can_read<'a, 'ret: 'a, T: Object, C: crdb_core::CanDoCallbacks>(
341        &'ret self,
342        object_id: ObjectId,
343        object: &'a T,
344        cb: &'a C,
345    ) -> std::pin::Pin<
346        Box<dyn 'a + waaaa::Future<Output = anyhow::Result<UsersWhoCanRead<Self::Lock<'ret>>>>>,
347    > {
348        self.db.get_users_who_can_read(object_id, object, cb)
349    }
350
351    async fn get_transaction(&self) -> crdb_core::Result<Self::Transaction<'_>> {
352        self.db.get_transaction().await
353    }
354
355    async fn get_latest_snapshot(
356        &self,
357        transaction: &mut Self::Connection,
358        user: User,
359        object_id: ObjectId,
360    ) -> crate::Result<Arc<serde_json::Value>> {
361        self.db
362            .get_latest_snapshot(transaction, user, object_id)
363            .await
364    }
365
366    async fn get_all(
367        &self,
368        connection: &mut Self::Connection,
369        user: crdb_core::User,
370        object_id: ObjectId,
371        only_updated_since: Option<Updatedness>,
372    ) -> crdb_core::Result<crdb_core::ObjectData> {
373        self.db
374            .get_all(connection, user, object_id, only_updated_since)
375            .await
376    }
377
378    async fn server_query(
379        &self,
380        user: User,
381        type_id: TypeId,
382        only_updated_since: Option<Updatedness>,
383        query: Arc<Query>,
384    ) -> crate::Result<Vec<ObjectId>> {
385        self.db
386            .server_query(user, type_id, only_updated_since, query)
387            .await
388    }
389
390    async fn server_vacuum(
391        &self,
392        no_new_changes_before: Option<EventId>,
393        updatedness: Updatedness,
394        kill_sessions_older_than: Option<SystemTime>,
395        notify_recreation: impl FnMut(crdb_core::Update, HashSet<crdb_core::User>),
396    ) -> crdb_core::Result<()> {
397        self.db
399            .server_vacuum(
400                no_new_changes_before,
401                updatedness,
402                kill_sessions_older_than,
403                notify_recreation,
404            )
405            .await
406    }
407
408    async fn recreate_at<'a, T: Object, C: crdb_core::CanDoCallbacks>(
409        &'a self,
410        object_id: ObjectId,
411        event_id: EventId,
412        updatedness: Updatedness,
413        cb: &'a C,
414    ) -> crdb_core::Result<Option<(EventId, Arc<T>)>> {
415        self.db
417            .recreate_at(object_id, event_id, updatedness, cb)
418            .await
419    }
420
421    async fn create_and_return_rdep_changes<T: Object>(
422        &self,
423        object_id: ObjectId,
424        created_at: EventId,
425        object: Arc<T>,
426        updatedness: Updatedness,
427    ) -> crdb_core::Result<Option<(Arc<T>, Vec<crdb_core::ReadPermsChanges>)>> {
428        self.cache.write().unwrap().remove(&object_id);
429        self.db
430            .create_and_return_rdep_changes(object_id, created_at, object, updatedness)
431            .await
432    }
433
434    async fn submit_and_return_rdep_changes<T: Object>(
435        &self,
436        object_id: ObjectId,
437        event_id: EventId,
438        event: Arc<T::Event>,
439        updatedness: Updatedness,
440    ) -> crdb_core::Result<Option<(Arc<T>, Vec<crdb_core::ReadPermsChanges>)>> {
441        self.cache.write().unwrap().remove(&object_id);
442        self.db
443            .submit_and_return_rdep_changes(object_id, event_id, event, updatedness)
444            .await
445    }
446
447    async fn update_pending_rdeps(&self) -> crdb_core::Result<()> {
448        self.db.update_pending_rdeps().await
449    }
450
451    async fn login_session(
452        &self,
453        session: Session,
454    ) -> crdb_core::Result<(SessionToken, SessionRef)> {
455        self.db.login_session(session).await
456    }
457
458    async fn resume_session(&self, token: SessionToken) -> crdb_core::Result<Session> {
459        self.db.resume_session(token).await
460    }
461
462    async fn mark_session_active(
463        &self,
464        token: SessionToken,
465        at: SystemTime,
466    ) -> crdb_core::Result<()> {
467        self.db.mark_session_active(token, at).await
468    }
469
470    async fn rename_session<'a>(
471        &'a self,
472        token: SessionToken,
473        new_name: &'a str,
474    ) -> crdb_core::Result<()> {
475        self.db.rename_session(token, new_name).await
476    }
477
478    async fn list_sessions(&self, user: User) -> crdb_core::Result<Vec<Session>> {
479        self.db.list_sessions(user).await
480    }
481
482    async fn disconnect_session(&self, user: User, session: SessionRef) -> crdb_core::Result<()> {
483        self.db.disconnect_session(user, session).await
484    }
485}