whatsapp_rust/cache_store.rs
1//! Typed cache wrapper that dispatches to either moka (in-process) or a custom
2//! [`CacheStore`] backend (e.g., Redis).
3//!
4//! [`TypedCache`] presents the same interface regardless of the backing store.
5//! Keys are serialised via [`Display`]; values are serialised with `serde_json`
6//! only on the custom-store path — the moka path has zero extra overhead.
7
8use std::borrow::Borrow;
9use std::fmt::Display;
10use std::marker::PhantomData;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::cache::Cache;
15use serde::{Serialize, de::DeserializeOwned};
16
17pub use wacore::store::cache::CacheStore;
18
19// ── Internal storage variant ──────────────────────────────────────────────────
20
21enum Inner<K, V> {
22 Moka(Cache<K, V>),
23 Custom {
24 store: Arc<dyn CacheStore>,
25 namespace: &'static str,
26 ttl: Option<Duration>,
27 _marker: PhantomData<fn(K, V)>,
28 },
29}
30
31// ── TypedCache ─────────────────────────────────────────────────────────────────
32
33/// A cache over `K → V` backed by either moka or any [`CacheStore`].
34///
35/// The moka path has **zero extra overhead** — values are stored in-process
36/// without any serialisation. The custom-store path serialises values with
37/// `serde_json` and keys via [`Display`].
38///
39/// Methods mirror moka's API so call sites need no changes.
40pub struct TypedCache<K, V> {
41 inner: Inner<K, V>,
42}
43
44impl<K, V> TypedCache<K, V>
45where
46 K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
47 V: Clone + Send + Sync + 'static,
48{
49 /// Wrap an existing cache (zero overhead vs. using the cache directly).
50 pub fn from_moka(cache: Cache<K, V>) -> Self {
51 Self {
52 inner: Inner::Moka(cache),
53 }
54 }
55}
56
57impl<K, V> TypedCache<K, V>
58where
59 K: std::hash::Hash + Eq + Clone + Display + Send + Sync + 'static,
60 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
61{
62 /// Create a cache backed by a custom store.
63 ///
64 /// - `namespace` — unique string for this cache (e.g., `"group"`)
65 /// - `ttl` — forwarded to [`CacheStore::set`]; `None` means no expiry
66 pub fn from_store(
67 store: Arc<dyn CacheStore>,
68 namespace: &'static str,
69 ttl: Option<Duration>,
70 ) -> Self {
71 Self {
72 inner: Inner::Custom {
73 store,
74 namespace,
75 ttl,
76 _marker: PhantomData,
77 },
78 }
79 }
80
81 /// Look up a value.
82 ///
83 /// Accepts borrowed keys (`&str` for `String`, `&Jid` for `Jid`, etc.)
84 /// following the same pattern as [`std::collections::HashMap::get`].
85 ///
86 /// Cache misses and deserialisation failures both return `None`; the
87 /// caller re-fetches from the authoritative source.
88 pub async fn get<Q>(&self, key: &Q) -> Option<V>
89 where
90 K: Borrow<Q>,
91 Q: std::hash::Hash + Eq + Display + ?Sized,
92 {
93 match &self.inner {
94 Inner::Moka(cache) => cache.get(key).await,
95 Inner::Custom {
96 store, namespace, ..
97 } => {
98 let key_str = key.to_string();
99 match store.get(namespace, &key_str).await {
100 Ok(Some(bytes)) => serde_json::from_slice(&bytes)
101 .inspect_err(|e| {
102 log::warn!(
103 "TypedCache[{namespace}]: deserialise failed for {key_str}: {e}"
104 );
105 })
106 .ok(),
107 Ok(None) => None,
108 Err(e) => {
109 log::warn!("TypedCache[{namespace}]: get({key_str}) error: {e}");
110 None
111 }
112 }
113 }
114 }
115 }
116
117 /// Insert or update a value (takes ownership of key and value).
118 pub async fn insert(&self, key: K, value: V) {
119 match &self.inner {
120 Inner::Moka(cache) => cache.insert(key, value).await,
121 Inner::Custom {
122 store,
123 namespace,
124 ttl,
125 ..
126 } => {
127 let key_str = key.to_string();
128 match serde_json::to_vec(&value) {
129 Ok(bytes) => {
130 if let Err(e) = store.set(namespace, &key_str, &bytes, *ttl).await {
131 log::warn!("TypedCache[{namespace}]: set({key_str}) error: {e}");
132 }
133 }
134 Err(e) => {
135 log::warn!("TypedCache[{namespace}]: serialise failed for {key_str}: {e}");
136 }
137 }
138 }
139 }
140 }
141
142 /// Remove a single key.
143 ///
144 /// Accepts borrowed keys following the same pattern as `get`.
145 pub async fn invalidate<Q>(&self, key: &Q)
146 where
147 K: Borrow<Q>,
148 Q: std::hash::Hash + Eq + Display + ?Sized,
149 {
150 match &self.inner {
151 Inner::Moka(cache) => cache.invalidate(key).await,
152 Inner::Custom {
153 store, namespace, ..
154 } => {
155 let key_str = key.to_string();
156 if let Err(e) = store.delete(namespace, &key_str).await {
157 log::warn!("TypedCache[{namespace}]: delete({key_str}) error: {e}");
158 }
159 }
160 }
161 }
162
163 /// Remove all entries.
164 ///
165 /// For the moka backend this is synchronous (matching moka's API).
166 /// For the custom backend this spawns a fire-and-forget task via
167 /// [`tokio::runtime::Handle::try_current`] to avoid panicking if
168 /// called outside a Tokio runtime.
169 pub fn invalidate_all(&self) {
170 match &self.inner {
171 Inner::Moka(cache) => cache.invalidate_all(),
172 Inner::Custom {
173 store, namespace, ..
174 } => {
175 let _store = store.clone();
176 let _ns = *namespace;
177 #[cfg(not(target_arch = "wasm32"))]
178 match tokio::runtime::Handle::try_current() {
179 Ok(handle) => {
180 handle.spawn(async move {
181 if let Err(e) = _store.clear(_ns).await {
182 log::warn!("TypedCache[{_ns}]: clear() error: {e}");
183 }
184 });
185 }
186 Err(_) => {
187 log::warn!("TypedCache[{_ns}]: clear() skipped: no runtime");
188 }
189 }
190 }
191 }
192 }
193
194 /// Remove all entries, awaiting completion for custom backends.
195 pub async fn clear(&self) {
196 match &self.inner {
197 Inner::Moka(cache) => cache.invalidate_all(),
198 Inner::Custom {
199 store, namespace, ..
200 } => {
201 if let Err(e) = store.clear(namespace).await {
202 log::warn!("TypedCache[{namespace}]: clear() error: {e}");
203 }
204 }
205 }
206 }
207
208 /// Run any pending internal housekeeping tasks (moka only).
209 ///
210 /// For the moka backend this ensures all writes have been applied before
211 /// calling [`entry_count`](Self::entry_count), which can otherwise lag.
212 /// For custom backends this is a no-op.
213 pub async fn run_pending_tasks(&self) {
214 if let Inner::Moka(cache) = &self.inner {
215 cache.run_pending_tasks().await;
216 }
217 }
218
219 /// Approximate entry count (sync). Returns `0` for custom backends.
220 ///
221 /// For diagnostics that need custom backend counts, use
222 /// [`entry_count_async`](Self::entry_count_async) instead.
223 pub fn entry_count(&self) -> u64 {
224 match &self.inner {
225 Inner::Moka(cache) => cache.entry_count(),
226 Inner::Custom { .. } => 0,
227 }
228 }
229
230 /// Approximate entry count, delegating to the custom backend if available.
231 pub async fn entry_count_async(&self) -> u64 {
232 match &self.inner {
233 Inner::Moka(cache) => cache.entry_count(),
234 Inner::Custom {
235 store, namespace, ..
236 } => store.entry_count(namespace).await.unwrap_or(0),
237 }
238 }
239}