whatsapp_rust/cache_store.rs
1//! Typed cache wrapper that dispatches to either moka (in-process) or a custom
2//! [`CacheStore`] backend (e.g., Redis).
3//!
4//! [`TypedCache`] presents the same interface regardless of the backing store.
5//! Keys are serialised via [`Display`]; values are serialised with `serde_json`
6//! only on the custom-store path — the moka path has zero extra overhead.
7
8use std::borrow::Borrow;
9use std::fmt::Display;
10use std::marker::PhantomData;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::cache::Cache;
15use serde::{Serialize, de::DeserializeOwned};
16
17pub use wacore::store::cache::CacheStore;
18
19// ── Internal storage variant ──────────────────────────────────────────────────
20
21enum Inner<K, V> {
22 Moka(Cache<K, V>),
23 Custom {
24 store: Arc<dyn CacheStore>,
25 namespace: &'static str,
26 ttl: Option<Duration>,
27 _marker: PhantomData<fn(K, V)>,
28 },
29}
30
31// ── TypedCache ─────────────────────────────────────────────────────────────────
32
33/// A cache over `K → V` backed by either moka or any [`CacheStore`].
34///
35/// The moka path has **zero extra overhead** — values are stored in-process
36/// without any serialisation. The custom-store path serialises values with
37/// `serde_json` and keys via [`Display`].
38///
39/// Methods mirror moka's API so call sites need no changes.
40pub struct TypedCache<K, V> {
41 inner: Inner<K, V>,
42}
43
44impl<K, V> TypedCache<K, V>
45where
46 K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
47 V: Clone + Send + Sync + 'static,
48{
49 /// Wrap an existing cache (zero overhead vs. using the cache directly).
50 pub fn from_moka(cache: Cache<K, V>) -> Self {
51 Self {
52 inner: Inner::Moka(cache),
53 }
54 }
55}
56
57impl<K, V> TypedCache<K, V>
58where
59 K: std::hash::Hash + Eq + Clone + Display + Send + Sync + 'static,
60 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
61{
62 /// Create a cache backed by a custom store.
63 ///
64 /// - `namespace` — unique string for this cache (e.g., `"group"`)
65 /// - `ttl` — forwarded to [`CacheStore::set`]; `None` means no expiry
66 pub fn from_store(
67 store: Arc<dyn CacheStore>,
68 namespace: &'static str,
69 ttl: Option<Duration>,
70 ) -> Self {
71 Self {
72 inner: Inner::Custom {
73 store,
74 namespace,
75 ttl,
76 _marker: PhantomData,
77 },
78 }
79 }
80
81 /// Look up a value.
82 ///
83 /// Accepts borrowed keys (`&str` for `String`, `&Jid` for `Jid`, etc.)
84 /// following the same pattern as [`std::collections::HashMap::get`].
85 ///
86 /// Cache misses and deserialisation failures both return `None`; the
87 /// caller re-fetches from the authoritative source.
88 pub async fn get<Q>(&self, key: &Q) -> Option<V>
89 where
90 K: Borrow<Q>,
91 Q: std::hash::Hash + Eq + Display + ?Sized,
92 {
93 match &self.inner {
94 Inner::Moka(cache) => cache.get(key).await,
95 Inner::Custom {
96 store, namespace, ..
97 } => {
98 let key_str = key.to_string();
99 match store.get(namespace, &key_str).await {
100 Ok(Some(bytes)) => serde_json::from_slice(&bytes)
101 .inspect_err(|e| {
102 log::warn!(
103 "TypedCache[{namespace}]: deserialise failed for {key_str}: {e}"
104 );
105 })
106 .ok(),
107 Ok(None) => None,
108 Err(e) => {
109 log::warn!("TypedCache[{namespace}]: get({key_str}) error: {e}");
110 None
111 }
112 }
113 }
114 }
115 }
116
117 /// Insert or update a value (takes ownership of key and value).
118 pub async fn insert(&self, key: K, value: V) {
119 match &self.inner {
120 Inner::Moka(cache) => cache.insert(key, value).await,
121 Inner::Custom {
122 store,
123 namespace,
124 ttl,
125 ..
126 } => {
127 let key_str = key.to_string();
128 match serde_json::to_vec(&value) {
129 Ok(bytes) => {
130 if let Err(e) = store.set(namespace, &key_str, &bytes, *ttl).await {
131 log::warn!("TypedCache[{namespace}]: set({key_str}) error: {e}");
132 }
133 }
134 Err(e) => {
135 log::warn!("TypedCache[{namespace}]: serialise failed for {key_str}: {e}");
136 }
137 }
138 }
139 }
140 }
141
142 /// Remove a single key.
143 ///
144 /// Accepts borrowed keys following the same pattern as `get`.
145 pub async fn invalidate<Q>(&self, key: &Q)
146 where
147 K: Borrow<Q>,
148 Q: std::hash::Hash + Eq + Display + ?Sized,
149 {
150 match &self.inner {
151 Inner::Moka(cache) => cache.invalidate(key).await,
152 Inner::Custom {
153 store, namespace, ..
154 } => {
155 let key_str = key.to_string();
156 if let Err(e) = store.delete(namespace, &key_str).await {
157 log::warn!("TypedCache[{namespace}]: delete({key_str}) error: {e}");
158 }
159 }
160 }
161 }
162
163 /// Remove all entries.
164 ///
165 /// For the moka backend this is synchronous (matching moka's API).
166 /// For the custom backend this spawns a fire-and-forget task via
167 /// [`tokio::runtime::Handle::try_current`] (requires `tokio-runtime`
168 /// feature) to avoid panicking if called outside a Tokio runtime.
169 /// Without `tokio-runtime`, the clear is skipped with a warning.
170 pub fn invalidate_all(&self) {
171 match &self.inner {
172 Inner::Moka(cache) => cache.invalidate_all(),
173 Inner::Custom {
174 store, namespace, ..
175 } => {
176 let _store = store.clone();
177 let _ns = *namespace;
178 #[cfg(all(not(target_arch = "wasm32"), feature = "tokio-runtime"))]
179 match tokio::runtime::Handle::try_current() {
180 Ok(handle) => {
181 handle.spawn(async move {
182 if let Err(e) = _store.clear(_ns).await {
183 log::warn!("TypedCache[{_ns}]: clear() error: {e}");
184 }
185 });
186 }
187 Err(_) => {
188 log::warn!("TypedCache[{_ns}]: clear() skipped: no runtime");
189 }
190 }
191 #[cfg(all(not(target_arch = "wasm32"), not(feature = "tokio-runtime")))]
192 log::warn!("TypedCache[{_ns}]: clear() skipped: tokio-runtime feature not enabled");
193 }
194 }
195 }
196
197 /// Remove all entries, awaiting completion for custom backends.
198 pub async fn clear(&self) {
199 match &self.inner {
200 Inner::Moka(cache) => cache.invalidate_all(),
201 Inner::Custom {
202 store, namespace, ..
203 } => {
204 if let Err(e) = store.clear(namespace).await {
205 log::warn!("TypedCache[{namespace}]: clear() error: {e}");
206 }
207 }
208 }
209 }
210
211 /// Run any pending internal housekeeping tasks (moka only).
212 ///
213 /// For the moka backend this ensures all writes have been applied before
214 /// calling [`entry_count`](Self::entry_count), which can otherwise lag.
215 /// For custom backends this is a no-op.
216 pub async fn run_pending_tasks(&self) {
217 if let Inner::Moka(cache) = &self.inner {
218 cache.run_pending_tasks().await;
219 }
220 }
221
222 /// Approximate entry count (sync). Returns `0` for custom backends.
223 ///
224 /// For diagnostics that need custom backend counts, use
225 /// [`entry_count_async`](Self::entry_count_async) instead.
226 pub fn entry_count(&self) -> u64 {
227 match &self.inner {
228 Inner::Moka(cache) => cache.entry_count(),
229 Inner::Custom { .. } => 0,
230 }
231 }
232
233 /// Approximate entry count, delegating to the custom backend if available.
234 pub async fn entry_count_async(&self) -> u64 {
235 match &self.inner {
236 Inner::Moka(cache) => cache.entry_count(),
237 Inner::Custom {
238 store, namespace, ..
239 } => store.entry_count(namespace).await.unwrap_or(0),
240 }
241 }
242}