1use crate::types::{QueryKey, QueryMeta, QueryStatus, QueryObserverId, QueryKeyPattern};
6use crate::retry::QueryError;
7use crate::infinite::{InfiniteQueryOptions, Page};
8use serde::{Deserialize, Serialize};
9use serde::de::DeserializeOwned;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use parking_lot::RwLock;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct SerializedData {
18 pub data: Vec<u8>,
19 #[serde(with = "instant_serde")]
20 pub timestamp: Instant,
21}
22
23#[derive(Clone, Debug, Serialize, Deserialize)]
25pub struct CacheEntry {
26 pub data: SerializedData,
27 pub meta: QueryMeta,
28}
29
30impl CacheEntry {
31 pub fn is_stale(&self) -> bool {
33 self.meta.is_stale()
34 }
35
36 pub fn get_data<T: DeserializeOwned>(&self) -> Result<T, QueryError> {
38 bincode::deserialize(&self.data.data)
39 .map_err(|e| QueryError::SerializationError(e.to_string()))
40 }
41}
42
43#[derive(Clone)]
45pub struct QueryClient {
46 cache: Arc<RwLock<HashMap<QueryKey, CacheEntry>>>,
47 stale_time: Duration,
48 cache_time: Duration,
49}
50
51impl QueryClient {
52 pub fn new() -> Self {
54 Self {
55 cache: Arc::new(RwLock::new(HashMap::new())),
56 stale_time: Duration::from_secs(0),
57 cache_time: Duration::from_secs(5 * 60), }
59 }
60
61 pub fn with_settings(stale_time: Duration, cache_time: Duration) -> Self {
63 Self {
64 cache: Arc::new(RwLock::new(HashMap::new())),
65 stale_time,
66 cache_time,
67 }
68 }
69
70 pub fn get_cache_entry(&self, key: &QueryKey) -> Option<CacheEntry> {
72 let cache = self.cache.read();
73 cache.get(key).cloned()
74 }
75
76 pub fn set_query_data<T: Serialize>(
78 &self,
79 key: &QueryKey,
80 data: T,
81 ) -> Result<(), QueryError> {
82 let serialized = bincode::serialize(&data)
83 .map_err(|e| QueryError::SerializationError(e.to_string()))?;
84
85 let entry = CacheEntry {
86 data: SerializedData {
87 data: serialized,
88 timestamp: Instant::now(),
89 },
90 meta: QueryMeta {
91 status: QueryStatus::Success,
92 updated_at: Instant::now(),
93 stale_time: self.stale_time,
94 cache_time: self.cache_time,
95 },
96 };
97
98 let mut cache = self.cache.write();
99 cache.insert(key.clone(), entry);
100
101 Ok(())
102 }
103
104 pub fn get_query_data<T: DeserializeOwned>(&self, key: &QueryKey) -> Option<T> {
106 let cache = self.cache.read();
107 if let Some(entry) = cache.get(key) {
108 bincode::deserialize(&entry.data.data).ok()
109 } else {
110 None
111 }
112 }
113
114 pub fn remove_query(&self, key: &QueryKey) {
116 let mut cache = self.cache.write();
117 cache.remove(key);
118 }
119
120 pub fn clear_cache(&self) {
122 let mut cache = self.cache.write();
123 cache.clear();
124 }
125
126 pub fn cache_stats(&self) -> CacheStats {
128 let cache = self.cache.read();
129 CacheStats {
130 total_entries: cache.len(),
131 stale_entries: cache.values().filter(|entry| entry.is_stale()).count(),
132 total_size: cache.values().map(|entry| entry.data.data.len()).sum(),
133 }
134 }
135
136 pub fn get_cache_entries(&self) -> Vec<(QueryKey, CacheEntry)> {
138 let cache = self.cache.read();
139 cache.iter().map(|(key, entry)| (key.clone(), entry.clone())).collect()
140 }
141
142 pub fn invalidate_queries(&self, pattern: &QueryKeyPattern) {
144 let mut cache = self.cache.write();
145 let keys_to_remove: Vec<QueryKey> = cache
146 .keys()
147 .filter(|key| key.matches_pattern(pattern))
148 .cloned()
149 .collect();
150
151 for key in keys_to_remove {
152 cache.remove(&key);
153 }
154 }
155
156 pub fn cleanup_stale_entries(&self) {
158 let mut cache = self.cache.write();
159 cache.retain(|_, entry| !entry.is_stale());
160 }
161
162 pub async fn fetch_infinite_page<T: Clone + Serialize + DeserializeOwned>(
165 &self,
166 _key: &QueryKey,
167 _page: usize,
168 ) -> Result<Page<T>, QueryError> {
169 todo!("Infinite page fetching not yet implemented")
172 }
173
174 pub fn get_infinite_options(&self, _key: &QueryKey) -> InfiniteQueryOptions {
176 InfiniteQueryOptions::default()
177 }
178
179 pub fn register_infinite_observer(&self, _key: &QueryKey) -> QueryObserverId {
181 QueryObserverId::new()
183 }
184}
185
/// Aggregate figures describing the contents of a query cache at one moment.
///
/// A plain value type of three counters, so it is `Copy`, comparable and
/// has an all-zero `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CacheStats {
    /// Number of entries in the cache.
    pub total_entries: usize,
    /// Number of those entries that report themselves as stale.
    pub stale_entries: usize,
    /// Sum of the encoded payload lengths, in bytes.
    pub total_size: usize,
}
193
194impl Default for QueryClient {
195 fn default() -> Self {
196 Self::new()
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Small serialisable payload used as the cached value in the tests.
    #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
    struct TestData {
        value: i32,
        text: String,
    }

    /// Store, read back and remove a single entry.
    #[test]
    fn test_cache_operations() {
        let client = QueryClient::new();
        let key = QueryKey::from("test");
        let data = TestData {
            value: 42,
            text: "hello".to_string(),
        };

        // Writing must succeed.
        let stored = client.set_query_data(&key, data.clone());
        assert!(stored.is_ok());

        // The entry is present and decodes to the value that was stored.
        let entry = client.get_cache_entry(&key);
        assert!(entry.is_some());
        let cached_data = entry.unwrap().get_data::<TestData>().unwrap();
        assert_eq!(cached_data, data);

        // After removal the key is gone.
        client.remove_query(&key);
        let after_removal = client.get_cache_entry(&key);
        assert!(after_removal.is_none());
    }

    /// Two freshly written entries are counted, and none is stale when the
    /// stale time is one minute.
    #[test]
    fn test_cache_stats() {
        let stale_time = Duration::from_secs(60);
        let cache_time = Duration::from_secs(300);
        let client = QueryClient::with_settings(stale_time, cache_time);

        let key1 = QueryKey::from("test1");
        let key2 = QueryKey::from("test2");

        let first = TestData { value: 1, text: "a".to_string() };
        let second = TestData { value: 2, text: "b".to_string() };
        client.set_query_data(&key1, first).unwrap();
        client.set_query_data(&key2, second).unwrap();

        let stats = client.cache_stats();
        assert_eq!(stats.total_entries, 2);
        assert_eq!(stats.stale_entries, 0);
    }
}
252
253mod instant_serde {
255 use serde::{Deserialize, Deserializer, Serialize, Serializer};
256 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
257
258 pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
259 where
260 S: Serializer,
261 {
262 let system_time = SystemTime::now() - instant.elapsed();
264 let duration = system_time.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
265 duration.serialize(serializer)
266 }
267
268 pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
269 where
270 D: Deserializer<'de>,
271 {
272 let duration = Duration::deserialize(deserializer)?;
273 let system_time = UNIX_EPOCH + duration;
274 let now = SystemTime::now();
275 let elapsed = now.duration_since(system_time).unwrap_or(Duration::ZERO);
276 Ok(Instant::now() - elapsed)
277 }
278}