1use super::{BinariesCache, ObjectCache};
2use crdb_core::{
3 BinPtr, ClientSideDb, ClientStorageInfo, CrdbSyncFn, Db, DynSized, EventId, Importance,
4 LoginInfo, Object, ObjectId, Query, QueryId, SavedObjectMeta, SavedQuery, ServerSideDb,
5 Session, SessionRef, SessionToken, TypeId, Updatedness, Upload, UploadId, User,
6 UsersWhoCanRead,
7};
8use std::{
9 collections::{HashMap, HashSet},
10 sync::{Arc, Mutex, RwLock},
11};
12use web_time::SystemTime;
13
14pub struct CacheDb<D: Db> {
15 db: D,
16 cache: Arc<RwLock<ObjectCache>>,
17 binaries: Arc<RwLock<BinariesCache>>,
18}
19
20impl<D: Db> CacheDb<D> {
21 pub fn new(db: D, watermark: usize) -> CacheDb<D> {
27 let cache = Arc::new(RwLock::new(ObjectCache::new(watermark)));
28 CacheDb {
29 db,
30 cache,
31 binaries: Arc::new(RwLock::new(BinariesCache::new())),
32 }
33 }
34
35 }
40
41impl<D: Db> Db for CacheDb<D> {
42 async fn create<T: Object>(
43 &self,
44 object_id: ObjectId,
45 created_at: EventId,
46 object: Arc<T>,
47 updatedness: Option<Updatedness>,
48 importance: Importance,
49 ) -> crate::Result<Option<Arc<T>>> {
50 self.cache.write().unwrap().remove(&object_id);
51 let res = self
52 .db
53 .create(
54 object_id,
55 created_at,
56 object.clone(),
57 updatedness,
58 importance,
59 )
60 .await?;
61 if let Some(value) = res.clone() {
62 self.cache.write().unwrap().set(object_id, value);
63 }
64 Ok(res)
65 }
66
67 async fn submit<T: Object>(
68 &self,
69 object_id: ObjectId,
70 event_id: EventId,
71 event: Arc<T::Event>,
72 updatedness: Option<Updatedness>,
73 additional_importance: Importance,
74 ) -> crate::Result<Option<Arc<T>>> {
75 let res = self
76 .db
77 .submit::<T>(
78 object_id,
79 event_id,
80 event.clone(),
81 updatedness,
82 additional_importance,
83 )
84 .await?;
85 if let Some(value) = res.clone() {
86 self.cache.write().unwrap().set(object_id, value);
87 }
88 Ok(res)
89 }
90
91 async fn get_latest<T: Object>(
92 &self,
93 object_id: ObjectId,
94 importance: Importance,
95 ) -> crate::Result<Arc<T>> {
96 if let Some(res) = self.cache.read().unwrap().get(&object_id) {
97 if let Ok(res) = Arc::downcast(DynSized::arc_to_any(res)) {
99 return Ok(res);
100 }
101 }
103 let res = self.db.get_latest::<T>(object_id, importance).await?;
104 self.cache.write().unwrap().set(object_id, res.clone() as _);
105 Ok(res)
106 }
107
108 async fn create_binary(&self, binary_id: BinPtr, data: Arc<[u8]>) -> crate::Result<()> {
109 if crdb_core::hash_binary(&data) != binary_id {
113 return Err(crate::Error::BinaryHashMismatch(binary_id));
114 }
115 self.binaries
116 .write()
117 .unwrap()
118 .insert(binary_id, Arc::downgrade(&data));
119 self.db.create_binary(binary_id, data).await
120 }
121
122 async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
123 if let Some(res) = self.binaries.read().unwrap().get(&binary_id) {
124 return Ok(Some(res.clone()));
125 }
126 let Some(res) = self.db.get_binary(binary_id).await? else {
127 return Ok(None);
128 };
129 self.binaries
130 .write()
131 .unwrap()
132 .insert(binary_id, Arc::downgrade(&res));
133 Ok(Some(res))
134 }
135
136 async fn reencode_old_versions<T: Object>(&self) -> usize {
138 self.db.reencode_old_versions::<T>().await
139 }
140
141 async fn assert_invariants_generic(&self) {
142 self.db.assert_invariants_generic().await;
143 }
144
145 async fn assert_invariants_for<T: Object>(&self) {
146 self.db.assert_invariants_for::<T>().await;
147 }
148}
149
150impl<D: ClientSideDb> ClientSideDb for CacheDb<D> {
151 async fn storage_info(&self) -> crate::Result<ClientStorageInfo> {
152 self.db.storage_info().await
153 }
154
155 async fn save_login(&self, info: LoginInfo) -> crate::Result<()> {
156 self.db.save_login(info).await
157 }
158
159 async fn get_saved_login(&self) -> crate::Result<Option<LoginInfo>> {
160 self.db.get_saved_login().await
161 }
162
163 async fn remove_everything(&self) -> crate::Result<()> {
164 self.db.remove_everything().await
165 }
166
167 async fn get_json(
168 &self,
169 object_id: ObjectId,
170 importance: Importance,
171 ) -> crate::Result<serde_json::Value> {
172 self.db.get_json(object_id, importance).await
173 }
174
175 async fn recreate<T: Object>(
176 &self,
177 object_id: ObjectId,
178 new_created_at: EventId,
179 object: Arc<T>,
180 updatedness: Option<Updatedness>,
181 additional_importance: Importance,
182 ) -> crate::Result<Option<Arc<T>>> {
183 let res = self
184 .db
185 .recreate(
186 object_id,
187 new_created_at,
188 object,
189 updatedness,
190 additional_importance,
191 )
192 .await?;
193 if let Some(res) = res.clone() {
194 self.cache.write().unwrap().set(object_id, res as _);
195 }
196 Ok(res)
197 }
198
199 async fn client_query(
200 &self,
201 type_id: TypeId,
202 query: Arc<Query>,
203 ) -> crate::Result<Vec<ObjectId>> {
204 self.db.client_query(type_id, query).await
205 }
206
207 async fn remove(&self, object_id: ObjectId) -> crate::Result<()> {
208 self.cache.write().unwrap().remove(&object_id);
209 self.db.remove(object_id).await
210 }
211
212 async fn remove_event<T: Object>(
213 &self,
214 object_id: ObjectId,
215 event_id: EventId,
216 ) -> crate::Result<()> {
217 self.cache.write().unwrap().remove(&object_id);
218 self.db.remove_event::<T>(object_id, event_id).await
219 }
220
221 async fn set_object_importance(
222 &self,
223 object_id: ObjectId,
224 new_importance: Importance,
225 ) -> crate::Result<()> {
226 self.db
227 .set_object_importance(object_id, new_importance)
228 .await
229 }
230
231 async fn set_importance_from_queries(
232 &self,
233 object_id: ObjectId,
234 new_importance_from_queries: Importance,
235 ) -> crate::Result<()> {
236 self.db
237 .set_importance_from_queries(object_id, new_importance_from_queries)
238 .await
239 }
240
241 async fn client_vacuum(
242 &self,
243 notify_removals: impl 'static + CrdbSyncFn<ObjectId>,
244 notify_query_removals: impl 'static + CrdbSyncFn<QueryId>,
245 ) -> crate::Result<()> {
246 let objects_to_remove = Arc::new(Mutex::new(HashSet::new()));
247 let res = self
248 .db
249 .client_vacuum(
250 {
251 let objects_to_remove = objects_to_remove.clone();
252 move |object_id| {
253 objects_to_remove.lock().unwrap().insert(object_id);
254 notify_removals(object_id);
255 }
256 },
257 notify_query_removals,
258 )
259 .await;
260 let mut cache = self.cache.write().unwrap();
261 for o in objects_to_remove.lock().unwrap().iter() {
262 cache.remove(o);
263 }
264 res
265 }
266
267 async fn list_uploads(&self) -> crate::Result<Vec<UploadId>> {
268 self.db.list_uploads().await
269 }
270
271 async fn get_upload(&self, upload_id: UploadId) -> crate::Result<Option<Upload>> {
272 self.db.get_upload(upload_id).await
273 }
274
275 async fn enqueue_upload(
276 &self,
277 upload: Upload,
278 required_binaries: Vec<BinPtr>,
279 ) -> crate::Result<UploadId> {
280 self.db.enqueue_upload(upload, required_binaries).await
281 }
282
283 async fn upload_finished(&self, upload_id: UploadId) -> crate::Result<()> {
284 self.db.upload_finished(upload_id).await
285 }
286
287 async fn get_saved_objects(&self) -> crate::Result<HashMap<ObjectId, SavedObjectMeta>> {
288 self.db.get_saved_objects().await
289 }
290
291 async fn get_saved_queries(&self) -> crate::Result<HashMap<QueryId, SavedQuery>> {
292 self.db.get_saved_queries().await
293 }
294
295 async fn record_query(
296 &self,
297 query_id: QueryId,
298 query: Arc<Query>,
299 type_id: TypeId,
300 importance: Importance,
301 ) -> crate::Result<()> {
302 self.db
303 .record_query(query_id, query, type_id, importance)
304 .await
305 }
306
307 async fn set_query_importance(
308 &self,
309 query_id: QueryId,
310 importance: Importance,
311 objects_matching_query: Vec<ObjectId>,
312 ) -> crate::Result<()> {
313 self.db
314 .set_query_importance(query_id, importance, objects_matching_query)
315 .await
316 }
317
318 async fn forget_query(
319 &self,
320 query_id: QueryId,
321 objects_matching_query: Vec<ObjectId>,
322 ) -> crate::Result<()> {
323 self.db.forget_query(query_id, objects_matching_query).await
324 }
325
326 async fn update_queries(
327 &self,
328 queries: &HashSet<QueryId>,
329 now_have_all_until: Updatedness,
330 ) -> crate::Result<()> {
331 self.db.update_queries(queries, now_have_all_until).await
332 }
333}
334
335impl<D: ServerSideDb> ServerSideDb for CacheDb<D> {
336 type Connection = D::Connection;
337 type Transaction<'a> = D::Transaction<'a>;
338 type Lock<'a> = D::Lock<'a>;
339
340 fn get_users_who_can_read<'a, 'ret: 'a, T: Object, C: crdb_core::CanDoCallbacks>(
341 &'ret self,
342 object_id: ObjectId,
343 object: &'a T,
344 cb: &'a C,
345 ) -> std::pin::Pin<
346 Box<dyn 'a + waaaa::Future<Output = anyhow::Result<UsersWhoCanRead<Self::Lock<'ret>>>>>,
347 > {
348 self.db.get_users_who_can_read(object_id, object, cb)
349 }
350
351 async fn get_transaction(&self) -> crdb_core::Result<Self::Transaction<'_>> {
352 self.db.get_transaction().await
353 }
354
355 async fn get_latest_snapshot(
356 &self,
357 transaction: &mut Self::Connection,
358 user: User,
359 object_id: ObjectId,
360 ) -> crate::Result<Arc<serde_json::Value>> {
361 self.db
362 .get_latest_snapshot(transaction, user, object_id)
363 .await
364 }
365
366 async fn get_all(
367 &self,
368 connection: &mut Self::Connection,
369 user: crdb_core::User,
370 object_id: ObjectId,
371 only_updated_since: Option<Updatedness>,
372 ) -> crdb_core::Result<crdb_core::ObjectData> {
373 self.db
374 .get_all(connection, user, object_id, only_updated_since)
375 .await
376 }
377
378 async fn server_query(
379 &self,
380 user: User,
381 type_id: TypeId,
382 only_updated_since: Option<Updatedness>,
383 query: Arc<Query>,
384 ) -> crate::Result<Vec<ObjectId>> {
385 self.db
386 .server_query(user, type_id, only_updated_since, query)
387 .await
388 }
389
390 async fn server_vacuum(
391 &self,
392 no_new_changes_before: Option<EventId>,
393 updatedness: Updatedness,
394 kill_sessions_older_than: Option<SystemTime>,
395 notify_recreation: impl FnMut(crdb_core::Update, HashSet<crdb_core::User>),
396 ) -> crdb_core::Result<()> {
397 self.db
399 .server_vacuum(
400 no_new_changes_before,
401 updatedness,
402 kill_sessions_older_than,
403 notify_recreation,
404 )
405 .await
406 }
407
408 async fn recreate_at<'a, T: Object, C: crdb_core::CanDoCallbacks>(
409 &'a self,
410 object_id: ObjectId,
411 event_id: EventId,
412 updatedness: Updatedness,
413 cb: &'a C,
414 ) -> crdb_core::Result<Option<(EventId, Arc<T>)>> {
415 self.db
417 .recreate_at(object_id, event_id, updatedness, cb)
418 .await
419 }
420
421 async fn create_and_return_rdep_changes<T: Object>(
422 &self,
423 object_id: ObjectId,
424 created_at: EventId,
425 object: Arc<T>,
426 updatedness: Updatedness,
427 ) -> crdb_core::Result<Option<(Arc<T>, Vec<crdb_core::ReadPermsChanges>)>> {
428 self.cache.write().unwrap().remove(&object_id);
429 self.db
430 .create_and_return_rdep_changes(object_id, created_at, object, updatedness)
431 .await
432 }
433
434 async fn submit_and_return_rdep_changes<T: Object>(
435 &self,
436 object_id: ObjectId,
437 event_id: EventId,
438 event: Arc<T::Event>,
439 updatedness: Updatedness,
440 ) -> crdb_core::Result<Option<(Arc<T>, Vec<crdb_core::ReadPermsChanges>)>> {
441 self.cache.write().unwrap().remove(&object_id);
442 self.db
443 .submit_and_return_rdep_changes(object_id, event_id, event, updatedness)
444 .await
445 }
446
447 async fn update_pending_rdeps(&self) -> crdb_core::Result<()> {
448 self.db.update_pending_rdeps().await
449 }
450
451 async fn login_session(
452 &self,
453 session: Session,
454 ) -> crdb_core::Result<(SessionToken, SessionRef)> {
455 self.db.login_session(session).await
456 }
457
458 async fn resume_session(&self, token: SessionToken) -> crdb_core::Result<Session> {
459 self.db.resume_session(token).await
460 }
461
462 async fn mark_session_active(
463 &self,
464 token: SessionToken,
465 at: SystemTime,
466 ) -> crdb_core::Result<()> {
467 self.db.mark_session_active(token, at).await
468 }
469
470 async fn rename_session<'a>(
471 &'a self,
472 token: SessionToken,
473 new_name: &'a str,
474 ) -> crdb_core::Result<()> {
475 self.db.rename_session(token, new_name).await
476 }
477
478 async fn list_sessions(&self, user: User) -> crdb_core::Result<Vec<Session>> {
479 self.db.list_sessions(user).await
480 }
481
482 async fn disconnect_session(&self, user: User, session: SessionRef) -> crdb_core::Result<()> {
483 self.db.disconnect_session(user, session).await
484 }
485}