// zlayer_storage/replicator/cache.rs
use crate::error::Result;
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tracing::{debug, info, warn};
14
/// A single WAL segment buffered for replication.
///
/// Entries are serialized to JSON and persisted as `{sequence:020}.cache`
/// files so the cache survives restarts (see `WriteCache::load`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheEntry {
    /// WAL sequence number; also determines the on-disk filename and replay order.
    pub sequence: u64,
    /// Raw segment payload. `serde_bytes` enables efficient byte-buffer
    /// (de)serialization where the data format supports it.
    #[serde(with = "serde_bytes")]
    pub data: Vec<u8>,
    /// UTC timestamp taken when the entry was constructed.
    pub cached_at: chrono::DateTime<chrono::Utc>,
    /// Delivery attempt counter. NOTE(review): never mutated in this file —
    /// presumably incremented by the replication loop; confirm at the caller.
    pub attempts: u32,
}
28
29impl CacheEntry {
30 #[must_use]
32 pub fn new(sequence: u64, data: Vec<u8>) -> Self {
33 Self {
34 sequence,
35 data,
36 cached_at: chrono::Utc::now(),
37 attempts: 0,
38 }
39 }
40
41 #[must_use]
43 pub fn size(&self) -> u64 {
44 self.data.len() as u64
45 }
46}
47
/// Bounded, disk-backed FIFO cache of WAL segments awaiting replication.
///
/// Entries live both in memory (`entries`) and on disk (`cache_dir`), so
/// pending writes can be recovered after a restart via `load`. When the
/// byte budget is exceeded, the oldest entries are evicted first.
pub struct WriteCache {
    /// Directory holding one `{sequence:020}.cache` JSON file per entry.
    cache_dir: PathBuf,
    /// Maximum total payload bytes to keep cached before evicting.
    max_size: u64,
    /// In-memory FIFO of pending entries, oldest at the front.
    entries: Arc<RwLock<VecDeque<CacheEntry>>>,
    /// Sum of payload sizes of the entries currently in `entries`.
    current_size: Arc<AtomicU64>,
    /// Number of entries currently in `entries` (mirrors `entries.len()`
    /// so it can be read without taking the lock).
    entry_count: Arc<AtomicUsize>,
}
65
66impl WriteCache {
67 pub fn new(cache_dir: PathBuf, max_size: u64) -> Self {
74 Self {
75 cache_dir,
76 max_size,
77 entries: Arc::new(RwLock::new(VecDeque::new())),
78 current_size: Arc::new(AtomicU64::new(0)),
79 entry_count: Arc::new(AtomicUsize::new(0)),
80 }
81 }
82
83 #[allow(dead_code)]
85 pub async fn load(&self) -> Result<()> {
86 if !self.cache_dir.exists() {
87 return Ok(());
88 }
89
90 let mut entries = self.entries.write().await;
91 let mut dir = tokio::fs::read_dir(&self.cache_dir).await?;
92
93 while let Some(entry) = dir.next_entry().await? {
94 let path = entry.path();
95 if path.extension().and_then(|e| e.to_str()) == Some("cache") {
96 match tokio::fs::read(&path).await {
97 Ok(data) => match serde_json::from_slice::<CacheEntry>(&data) {
98 Ok(cache_entry) => {
99 let size = cache_entry.size();
100 entries.push_back(cache_entry);
101 self.current_size.fetch_add(size, Ordering::SeqCst);
102 self.entry_count.fetch_add(1, Ordering::SeqCst);
103 }
104 Err(e) => {
105 warn!("Failed to parse cache entry {:?}: {}", path, e);
106 }
107 },
108 Err(e) => {
109 warn!("Failed to read cache entry {:?}: {}", path, e);
110 }
111 }
112 }
113 }
114
115 entries.make_contiguous().sort_by_key(|e| e.sequence);
117
118 info!(
119 "Loaded {} cache entries ({} bytes)",
120 entries.len(),
121 self.current_size.load(Ordering::SeqCst)
122 );
123
124 Ok(())
125 }
126
127 pub async fn add(&self, sequence: u64, data: Vec<u8>) -> Result<()> {
131 let entry = CacheEntry::new(sequence, data);
132 let entry_size = entry.size();
133
134 while self.current_size.load(Ordering::SeqCst) + entry_size > self.max_size {
136 if let Some(evicted) = self.pop_oldest_internal().await? {
137 warn!(
138 "Cache full, evicting entry {} ({} bytes)",
139 evicted.sequence,
140 evicted.size()
141 );
142 self.remove_from_disk(&evicted).await?;
143 } else {
144 break;
145 }
146 }
147
148 let filename = format!("{:020}.cache", entry.sequence);
150 let path = self.cache_dir.join(&filename);
151 let serialized = serde_json::to_vec(&entry)?;
152 tokio::fs::write(&path, &serialized).await?;
153
154 let mut entries = self.entries.write().await;
156 entries.push_back(entry);
157 self.current_size.fetch_add(entry_size, Ordering::SeqCst);
158 self.entry_count.fetch_add(1, Ordering::SeqCst);
159
160 debug!(
161 "Cached WAL segment {} ({} bytes, {} total entries)",
162 sequence,
163 entry_size,
164 entries.len()
165 );
166
167 Ok(())
168 }
169
170 pub async fn pop_oldest(&self) -> Result<Option<CacheEntry>> {
175 let mut entries = self.entries.write().await;
176 if let Some(entry) = entries.pop_front() {
177 let size = entry.size();
178 self.current_size.fetch_sub(size, Ordering::SeqCst);
179 self.entry_count.fetch_sub(1, Ordering::SeqCst);
180 Ok(Some(entry))
181 } else {
182 Ok(None)
183 }
184 }
185
186 async fn pop_oldest_internal(&self) -> Result<Option<CacheEntry>> {
188 let mut entries = self.entries.write().await;
189 if let Some(entry) = entries.pop_front() {
190 let size = entry.size();
191 self.current_size.fetch_sub(size, Ordering::SeqCst);
192 self.entry_count.fetch_sub(1, Ordering::SeqCst);
193 Ok(Some(entry))
194 } else {
195 Ok(None)
196 }
197 }
198
199 pub async fn remove(&self, entry: &CacheEntry) -> Result<()> {
201 self.remove_from_disk(entry).await
202 }
203
204 async fn remove_from_disk(&self, entry: &CacheEntry) -> Result<()> {
206 let filename = format!("{:020}.cache", entry.sequence);
207 let path = self.cache_dir.join(&filename);
208 if path.exists() {
209 tokio::fs::remove_file(&path).await?;
210 }
211 Ok(())
212 }
213
214 #[allow(dead_code)]
216 pub async fn peek_oldest(&self) -> Option<CacheEntry> {
217 let entries = self.entries.read().await;
218 entries.front().cloned()
219 }
220
221 #[allow(dead_code)]
223 pub fn is_empty(&self) -> bool {
224 self.entry_count.load(Ordering::SeqCst) == 0
225 }
226
227 pub fn stats(&self) -> (usize, u64) {
229 (
230 self.entry_count.load(Ordering::SeqCst),
231 self.current_size.load(Ordering::SeqCst),
232 )
233 }
234
235 #[allow(dead_code)]
237 pub fn len(&self) -> usize {
238 self.entry_count.load(Ordering::SeqCst)
239 }
240
241 #[allow(dead_code)]
243 pub fn size(&self) -> u64 {
244 self.current_size.load(Ordering::SeqCst)
245 }
246
247 #[allow(dead_code)]
249 pub fn max_size(&self) -> u64 {
250 self.max_size
251 }
252
253 #[allow(dead_code)]
255 pub async fn clear(&self) -> Result<()> {
256 let mut entries = self.entries.write().await;
257
258 for entry in entries.iter() {
260 let _ = self.remove_from_disk(entry).await;
261 }
262
263 entries.clear();
264 self.current_size.store(0, Ordering::SeqCst);
265 self.entry_count.store(0, Ordering::SeqCst);
266
267 info!("Cache cleared");
268 Ok(())
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// FIFO ordering: entries come back oldest-first with their payloads.
    #[tokio::test]
    async fn test_cache_add_and_pop() {
        let dir = TempDir::new().unwrap();
        let cache = WriteCache::new(dir.path().to_path_buf(), 1024 * 1024);

        for (seq, payload) in [(1u64, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])] {
            cache.add(seq, payload).await.unwrap();
        }
        assert_eq!(cache.len(), 3);

        let first = cache.pop_oldest().await.unwrap().unwrap();
        assert_eq!(first.sequence, 1);
        assert_eq!(first.data, vec![1, 2, 3]);

        let second = cache.pop_oldest().await.unwrap().unwrap();
        assert_eq!(second.sequence, 2);

        cache.remove(&first).await.unwrap();
        cache.remove(&second).await.unwrap();

        assert_eq!(cache.len(), 1);
    }

    /// Oldest entries are evicted once the 20-byte budget is exceeded.
    #[tokio::test]
    async fn test_cache_eviction() {
        let dir = TempDir::new().unwrap();
        let cache = WriteCache::new(dir.path().to_path_buf(), 20);

        cache.add(1, vec![0; 10]).await.unwrap();
        cache.add(2, vec![0; 10]).await.unwrap();
        cache.add(3, vec![0; 10]).await.unwrap();

        // Entry 1 was evicted to make room for entry 3.
        assert_eq!(cache.len(), 2);
        let survivor = cache.pop_oldest().await.unwrap().unwrap();
        assert_eq!(survivor.sequence, 2);
    }

    /// Entries written by one cache instance are recovered by `load`.
    #[tokio::test]
    async fn test_cache_persistence() {
        let dir = TempDir::new().unwrap();

        {
            let writer = WriteCache::new(dir.path().to_path_buf(), 1024 * 1024);
            writer.add(1, vec![1, 2, 3]).await.unwrap();
            writer.add(2, vec![4, 5, 6]).await.unwrap();
        }

        let reader = WriteCache::new(dir.path().to_path_buf(), 1024 * 1024);
        reader.load().await.unwrap();

        assert_eq!(reader.len(), 2);
        let oldest = reader.pop_oldest().await.unwrap().unwrap();
        assert_eq!(oldest.sequence, 1);
    }

    /// `clear` empties both the in-memory queue and the accounting counters.
    #[tokio::test]
    async fn test_cache_clear() {
        let dir = TempDir::new().unwrap();
        let cache = WriteCache::new(dir.path().to_path_buf(), 1024 * 1024);

        cache.add(1, vec![1, 2, 3]).await.unwrap();
        cache.add(2, vec![4, 5, 6]).await.unwrap();

        cache.clear().await.unwrap();

        assert!(cache.is_empty());
        assert_eq!(cache.size(), 0);
    }
}