// File: zlayer_storage/replicator/cache.rs
1//! Local write cache for network tolerance
2//!
3//! Buffers WAL segments locally when network is unavailable, maintaining FIFO order
4//! for upload and respecting configurable size limits.
5
6use crate::error::Result;
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tracing::{debug, info, warn};
14
/// A cached WAL segment entry.
///
/// One entry holds one captured WAL segment. Entries are serialized to JSON
/// when persisted to disk, with `data` encoded through `serde_bytes` for a
/// compact byte-array representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheEntry {
    /// Sequence number (frame count at time of capture); also determines the
    /// on-disk filename and the FIFO ordering of the cache
    pub sequence: u64,
    /// WAL data (raw segment bytes)
    #[serde(with = "serde_bytes")]
    pub data: Vec<u8>,
    /// Timestamp when cached (UTC, set at construction)
    pub cached_at: chrono::DateTime<chrono::Utc>,
    /// Number of upload attempts; starts at 0 and is never incremented in
    /// this file — presumably bumped by the upload loop (verify at caller)
    pub attempts: u32,
}
28
29impl CacheEntry {
30    /// Create a new cache entry
31    #[must_use]
32    pub fn new(sequence: u64, data: Vec<u8>) -> Self {
33        Self {
34            sequence,
35            data,
36            cached_at: chrono::Utc::now(),
37            attempts: 0,
38        }
39    }
40
41    /// Size of this entry in bytes
42    #[must_use]
43    pub fn size(&self) -> u64 {
44        self.data.len() as u64
45    }
46}
47
/// Local write cache for WAL segments
///
/// Provides network tolerance by buffering WAL segments locally when S3 is
/// unavailable. Entries are stored in FIFO order and persisted to disk for
/// crash recovery.
pub struct WriteCache {
    /// Cache directory for persisted entries (one JSON `.cache` file per entry)
    cache_dir: PathBuf,
    /// Maximum cache size in bytes; oldest entries are evicted once exceeded
    max_size: u64,
    /// In-memory FIFO queue of entries, guarded by an async RwLock
    entries: Arc<RwLock<VecDeque<CacheEntry>>>,
    /// Current total size in bytes (mirrors the queue contents; kept in sync
    /// atomically so `size()`/`stats()` need no lock)
    current_size: Arc<AtomicU64>,
    /// Number of entries (mirrors the queue length; lock-free for `len()`)
    entry_count: Arc<AtomicUsize>,
}
65
66impl WriteCache {
67    /// Create a new write cache
68    ///
69    /// # Arguments
70    ///
71    /// * `cache_dir` - Directory for persisting cache entries
72    /// * `max_size` - Maximum total size of cached data in bytes
73    pub fn new(cache_dir: PathBuf, max_size: u64) -> Self {
74        Self {
75            cache_dir,
76            max_size,
77            entries: Arc::new(RwLock::new(VecDeque::new())),
78            current_size: Arc::new(AtomicU64::new(0)),
79            entry_count: Arc::new(AtomicUsize::new(0)),
80        }
81    }
82
83    /// Load existing cache entries from disk
84    #[allow(dead_code)]
85    pub async fn load(&self) -> Result<()> {
86        if !self.cache_dir.exists() {
87            return Ok(());
88        }
89
90        let mut entries = self.entries.write().await;
91        let mut dir = tokio::fs::read_dir(&self.cache_dir).await?;
92
93        while let Some(entry) = dir.next_entry().await? {
94            let path = entry.path();
95            if path.extension().and_then(|e| e.to_str()) == Some("cache") {
96                match tokio::fs::read(&path).await {
97                    Ok(data) => match serde_json::from_slice::<CacheEntry>(&data) {
98                        Ok(cache_entry) => {
99                            let size = cache_entry.size();
100                            entries.push_back(cache_entry);
101                            self.current_size.fetch_add(size, Ordering::SeqCst);
102                            self.entry_count.fetch_add(1, Ordering::SeqCst);
103                        }
104                        Err(e) => {
105                            warn!("Failed to parse cache entry {:?}: {}", path, e);
106                        }
107                    },
108                    Err(e) => {
109                        warn!("Failed to read cache entry {:?}: {}", path, e);
110                    }
111                }
112            }
113        }
114
115        // Sort by sequence number
116        entries.make_contiguous().sort_by_key(|e| e.sequence);
117
118        info!(
119            "Loaded {} cache entries ({} bytes)",
120            entries.len(),
121            self.current_size.load(Ordering::SeqCst)
122        );
123
124        Ok(())
125    }
126
127    /// Add a new entry to the cache
128    ///
129    /// If the cache is full, this will evict the oldest entries to make room.
130    pub async fn add(&self, sequence: u64, data: Vec<u8>) -> Result<()> {
131        let entry = CacheEntry::new(sequence, data);
132        let entry_size = entry.size();
133
134        // Evict old entries if necessary
135        while self.current_size.load(Ordering::SeqCst) + entry_size > self.max_size {
136            if let Some(evicted) = self.pop_oldest_internal().await? {
137                warn!(
138                    "Cache full, evicting entry {} ({} bytes)",
139                    evicted.sequence,
140                    evicted.size()
141                );
142                self.remove_from_disk(&evicted).await?;
143            } else {
144                break;
145            }
146        }
147
148        // Persist to disk
149        let filename = format!("{:020}.cache", entry.sequence);
150        let path = self.cache_dir.join(&filename);
151        let serialized = serde_json::to_vec(&entry)?;
152        tokio::fs::write(&path, &serialized).await?;
153
154        // Add to in-memory queue
155        let mut entries = self.entries.write().await;
156        entries.push_back(entry);
157        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
158        self.entry_count.fetch_add(1, Ordering::SeqCst);
159
160        debug!(
161            "Cached WAL segment {} ({} bytes, {} total entries)",
162            sequence,
163            entry_size,
164            entries.len()
165        );
166
167        Ok(())
168    }
169
170    /// Pop the oldest entry from the cache (for upload)
171    ///
172    /// The entry is removed from the in-memory queue but NOT from disk.
173    /// Call `remove()` after successful upload to clean up the disk file.
174    pub async fn pop_oldest(&self) -> Result<Option<CacheEntry>> {
175        let mut entries = self.entries.write().await;
176        if let Some(entry) = entries.pop_front() {
177            let size = entry.size();
178            self.current_size.fetch_sub(size, Ordering::SeqCst);
179            self.entry_count.fetch_sub(1, Ordering::SeqCst);
180            Ok(Some(entry))
181        } else {
182            Ok(None)
183        }
184    }
185
186    /// Internal pop without modifying counters (for eviction)
187    async fn pop_oldest_internal(&self) -> Result<Option<CacheEntry>> {
188        let mut entries = self.entries.write().await;
189        if let Some(entry) = entries.pop_front() {
190            let size = entry.size();
191            self.current_size.fetch_sub(size, Ordering::SeqCst);
192            self.entry_count.fetch_sub(1, Ordering::SeqCst);
193            Ok(Some(entry))
194        } else {
195            Ok(None)
196        }
197    }
198
199    /// Remove an entry from disk after successful upload
200    pub async fn remove(&self, entry: &CacheEntry) -> Result<()> {
201        self.remove_from_disk(entry).await
202    }
203
204    /// Remove entry file from disk
205    async fn remove_from_disk(&self, entry: &CacheEntry) -> Result<()> {
206        let filename = format!("{:020}.cache", entry.sequence);
207        let path = self.cache_dir.join(&filename);
208        if path.exists() {
209            tokio::fs::remove_file(&path).await?;
210        }
211        Ok(())
212    }
213
214    /// Peek at the oldest entry without removing it
215    #[allow(dead_code)]
216    pub async fn peek_oldest(&self) -> Option<CacheEntry> {
217        let entries = self.entries.read().await;
218        entries.front().cloned()
219    }
220
221    /// Check if the cache is empty
222    #[allow(dead_code)]
223    pub fn is_empty(&self) -> bool {
224        self.entry_count.load(Ordering::SeqCst) == 0
225    }
226
227    /// Get cache statistics
228    pub fn stats(&self) -> (usize, u64) {
229        (
230            self.entry_count.load(Ordering::SeqCst),
231            self.current_size.load(Ordering::SeqCst),
232        )
233    }
234
235    /// Get the number of cached entries
236    #[allow(dead_code)]
237    pub fn len(&self) -> usize {
238        self.entry_count.load(Ordering::SeqCst)
239    }
240
241    /// Get the current cache size in bytes
242    #[allow(dead_code)]
243    pub fn size(&self) -> u64 {
244        self.current_size.load(Ordering::SeqCst)
245    }
246
247    /// Get the maximum cache size in bytes
248    #[allow(dead_code)]
249    pub fn max_size(&self) -> u64 {
250        self.max_size
251    }
252
253    /// Clear all entries from the cache
254    #[allow(dead_code)]
255    pub async fn clear(&self) -> Result<()> {
256        let mut entries = self.entries.write().await;
257
258        // Remove all disk files
259        for entry in entries.iter() {
260            let _ = self.remove_from_disk(entry).await;
261        }
262
263        entries.clear();
264        self.current_size.store(0, Ordering::SeqCst);
265        self.entry_count.store(0, Ordering::SeqCst);
266
267        info!("Cache cleared");
268        Ok(())
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Entries come back out in the order they went in (FIFO).
    #[tokio::test]
    async fn test_cache_add_and_pop() {
        let dir = TempDir::new().unwrap();
        let cache = WriteCache::new(dir.path().to_path_buf(), 1024 * 1024);

        for (seq, payload) in [(1u64, vec![1, 2, 3]), (2, vec![4, 5, 6]), (3, vec![7, 8, 9])] {
            cache.add(seq, payload).await.unwrap();
        }
        assert_eq!(cache.len(), 3);

        // Oldest first.
        let first = cache.pop_oldest().await.unwrap().unwrap();
        assert_eq!(first.sequence, 1);
        assert_eq!(first.data, vec![1, 2, 3]);

        let second = cache.pop_oldest().await.unwrap().unwrap();
        assert_eq!(second.sequence, 2);

        // Disk cleanup after a (simulated) successful upload.
        cache.remove(&first).await.unwrap();
        cache.remove(&second).await.unwrap();

        assert_eq!(cache.len(), 1);
    }

    /// When the size budget is exceeded, the oldest entry is dropped first.
    #[tokio::test]
    async fn test_cache_eviction() {
        let dir = TempDir::new().unwrap();
        // Budget of 20 bytes holds exactly two 10-byte entries.
        let cache = WriteCache::new(dir.path().to_path_buf(), 20);

        cache.add(1, vec![0; 10]).await.unwrap();
        cache.add(2, vec![0; 10]).await.unwrap();
        // Third add pushes the total past 20, evicting entry 1.
        cache.add(3, vec![0; 10]).await.unwrap();

        assert_eq!(cache.len(), 2);
        let survivor = cache.pop_oldest().await.unwrap().unwrap();
        assert_eq!(survivor.sequence, 2);
    }

    /// A fresh cache instance recovers previously persisted entries via `load()`.
    #[tokio::test]
    async fn test_cache_persistence() {
        let dir = TempDir::new().unwrap();

        // First instance writes entries and is dropped.
        {
            let writer = WriteCache::new(dir.path().to_path_buf(), 1024 * 1024);
            writer.add(1, vec![1, 2, 3]).await.unwrap();
            writer.add(2, vec![4, 5, 6]).await.unwrap();
        }

        // Second instance reloads them from disk in sequence order.
        {
            let reader = WriteCache::new(dir.path().to_path_buf(), 1024 * 1024);
            reader.load().await.unwrap();

            assert_eq!(reader.len(), 2);
            assert_eq!(reader.pop_oldest().await.unwrap().unwrap().sequence, 1);
        }
    }

    /// `clear()` empties the cache and resets the byte counter.
    #[tokio::test]
    async fn test_cache_clear() {
        let dir = TempDir::new().unwrap();
        let cache = WriteCache::new(dir.path().to_path_buf(), 1024 * 1024);

        cache.add(1, vec![1, 2, 3]).await.unwrap();
        cache.add(2, vec![4, 5, 6]).await.unwrap();
        cache.clear().await.unwrap();

        assert!(cache.is_empty());
        assert_eq!(cache.size(), 0);
    }
}