Skip to main content

fast_cache/
persistence.rs

1//! WAL and snapshot persistence support.
2//!
3//! The server uses [`PersistenceRuntime`] to append mutation records and write
4//! periodic snapshots. Embedded applications can use [`SnapshotStore`] through
5//! [`SnapshotRepository`] when they want to control persistence orchestration
6//! themselves.
7
8mod recovery;
9mod runtime;
10mod snapshot;
11mod stats;
12mod tcp_export;
13mod wal;
14
15use std::fs;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::thread::JoinHandle;
19
20use crossbeam_channel::Sender;
21use parking_lot::Mutex;
22
23use crate::config::PersistenceConfig;
24#[cfg(feature = "telemetry")]
25use crate::storage::{CacheTelemetry, CacheTelemetryHandle};
26use crate::storage::{MutationRecord, StoredEntry, WalStatsSnapshot};
27use crate::{FastCacheError, Result};
28
29use recovery::{PersistenceDataDir, RecoveryLoader};
30use runtime::{PersistenceStartup, StartedPersistenceRuntime, StopSignal};
31pub use snapshot::{LoadedSnapshot, SnapshotCompression, SnapshotRepository, SnapshotStore};
32use stats::WalStats;
33
/// Immutable WAL wire frame shared across disk append and TCP export paths.
///
/// This is an alias for [`bytes::Bytes`]. It matches the byte owner used by
/// `bytes-handoff`, so retry and fanout paths can pass the encoded frame
/// through without rebuilding it.
pub(super) type WalFrameBytes = bytes::Bytes;
39
/// State produced by [`load_recovery_state`].
#[derive(Debug, Clone)]
pub struct RecoveryState {
    /// Entries recovered from persisted data.
    pub entries: Vec<StoredEntry>,
    /// Snapshot timestamp in milliseconds.
    // NOTE(review): presumably the timestamp of the snapshot the entries were
    // restored from; confirm against `RecoveryLoader`.
    pub snapshot_timestamp_ms: u64,
}
45
/// Cloneable handle used to submit [`MutationRecord`]s for persistence.
///
/// Obtained per shard through [`PersistenceRuntime::appender`].
#[derive(Debug, Clone)]
pub struct WalAppender {
    /// Sending half of the channel that carries mutation records.
    // NOTE(review): the receiving side is presumably the WAL writer thread;
    // it is created outside this file.
    sender: Sender<MutationRecord>,
}
50
51pub struct PersistenceRuntime {
52    config: PersistenceConfig,
53    appenders: Vec<WalAppender>,
54    stats: Arc<WalStats>,
55    #[cfg(feature = "telemetry")]
56    _metrics_owner: Option<Arc<CacheTelemetry>>,
57    stop_tx: Option<Sender<()>>,
58    join_handle: Mutex<Option<JoinHandle<()>>>,
59    tcp_export_stop_tx: Option<Sender<()>>,
60    tcp_export_join_handle: Mutex<Option<JoinHandle<()>>>,
61}
62
63impl PersistenceRuntime {
64    pub fn start(shard_count: usize, config: PersistenceConfig) -> Result<Self> {
65        #[cfg(feature = "telemetry")]
66        {
67            Self::start_with_metrics(shard_count, config, None)
68        }
69        #[cfg(not(feature = "telemetry"))]
70        {
71            Self::start_inner(shard_count, config, ())
72        }
73    }
74
75    #[cfg(feature = "telemetry")]
76    pub fn start_with_metrics(
77        shard_count: usize,
78        config: PersistenceConfig,
79        metrics: Option<Arc<CacheTelemetry>>,
80    ) -> Result<Self> {
81        Self::start_inner(shard_count, config, metrics)
82    }
83
84    #[cfg(feature = "telemetry")]
85    fn start_inner(
86        shard_count: usize,
87        config: PersistenceConfig,
88        metrics: Option<Arc<CacheTelemetry>>,
89    ) -> Result<Self> {
90        Self::start_impl(shard_count, config, metrics)
91    }
92
93    #[cfg(not(feature = "telemetry"))]
94    fn start_inner(shard_count: usize, config: PersistenceConfig, _unit: ()) -> Result<Self> {
95        Self::start_impl(shard_count, config)
96    }
97
98    #[cfg(feature = "telemetry")]
99    fn start_impl(
100        shard_count: usize,
101        config: PersistenceConfig,
102        metrics: Option<Arc<CacheTelemetry>>,
103    ) -> Result<Self> {
104        match config.enabled {
105            true => Self::start_enabled(shard_count, config, metrics),
106            false => Ok(Self::disabled(config, None)),
107        }
108    }
109
110    #[cfg(not(feature = "telemetry"))]
111    fn start_impl(shard_count: usize, config: PersistenceConfig) -> Result<Self> {
112        match config.enabled {
113            true => Self::start_enabled(shard_count, config),
114            false => Ok(Self::disabled(config)),
115        }
116    }
117
118    #[cfg(feature = "telemetry")]
119    fn disabled(config: PersistenceConfig, metrics: Option<Arc<CacheTelemetry>>) -> Self {
120        Self {
121            config,
122            appenders: Vec::new(),
123            stats: Arc::new(WalStats::disabled()),
124            _metrics_owner: metrics,
125            stop_tx: None,
126            join_handle: Mutex::new(None),
127            tcp_export_stop_tx: None,
128            tcp_export_join_handle: Mutex::new(None),
129        }
130    }
131
132    #[cfg(not(feature = "telemetry"))]
133    fn disabled(config: PersistenceConfig) -> Self {
134        Self {
135            config,
136            appenders: Vec::new(),
137            stats: Arc::new(WalStats::disabled()),
138            stop_tx: None,
139            join_handle: Mutex::new(None),
140            tcp_export_stop_tx: None,
141            tcp_export_join_handle: Mutex::new(None),
142        }
143    }
144
145    #[cfg(feature = "telemetry")]
146    fn start_enabled(
147        shard_count: usize,
148        config: PersistenceConfig,
149        metrics: Option<Arc<CacheTelemetry>>,
150    ) -> Result<Self> {
151        let writer_metrics = metrics.as_ref().map(CacheTelemetryHandle::from_arc);
152        let runtime = Self::start_enabled_parts(shard_count, &config, writer_metrics)?;
153        Ok(Self {
154            config,
155            appenders: runtime.appenders,
156            stats: runtime.stats,
157            _metrics_owner: metrics,
158            stop_tx: Some(runtime.stop_tx),
159            join_handle: Mutex::new(Some(runtime.join_handle)),
160            tcp_export_stop_tx: runtime.tcp_export.stop_tx,
161            tcp_export_join_handle: Mutex::new(runtime.tcp_export.join_handle),
162        })
163    }
164
165    #[cfg(not(feature = "telemetry"))]
166    fn start_enabled(shard_count: usize, config: PersistenceConfig) -> Result<Self> {
167        let runtime = Self::start_enabled_parts(shard_count, &config)?;
168        Ok(Self {
169            config,
170            appenders: runtime.appenders,
171            stats: runtime.stats,
172            stop_tx: Some(runtime.stop_tx),
173            join_handle: Mutex::new(Some(runtime.join_handle)),
174            tcp_export_stop_tx: runtime.tcp_export.stop_tx,
175            tcp_export_join_handle: Mutex::new(runtime.tcp_export.join_handle),
176        })
177    }
178
179    #[cfg(feature = "telemetry")]
180    fn start_enabled_parts(
181        shard_count: usize,
182        config: &PersistenceConfig,
183        metrics: Option<CacheTelemetryHandle>,
184    ) -> Result<StartedPersistenceRuntime> {
185        PersistenceStartup::new(shard_count, config)?.start_writer(config.clone(), metrics)
186    }
187
188    #[cfg(not(feature = "telemetry"))]
189    fn start_enabled_parts(
190        shard_count: usize,
191        config: &PersistenceConfig,
192    ) -> Result<StartedPersistenceRuntime> {
193        PersistenceStartup::new(shard_count, config)?.start_writer(config.clone())
194    }
195
196    pub fn appender(&self, shard_id: usize) -> Option<WalAppender> {
197        self.appenders.get(shard_id).cloned()
198    }
199
200    pub fn is_enabled(&self) -> bool {
201        self.config.enabled
202    }
203
204    pub fn stats_snapshot(&self) -> WalStatsSnapshot {
205        self.stats.snapshot()
206    }
207
208    pub fn snapshot(&self, entries: &[StoredEntry], timestamp_ms: u64) -> Result<PathBuf> {
209        match self.config.enabled {
210            true => self.write_snapshot(entries, timestamp_ms),
211            false => Err(FastCacheError::Persistence(
212                "persistence is disabled".into(),
213            )),
214        }
215    }
216
217    fn write_snapshot(&self, entries: &[StoredEntry], timestamp_ms: u64) -> Result<PathBuf> {
218        fs::create_dir_all(&self.config.data_dir)?;
219        let snapshots = SnapshotStore::new(&self.config.data_dir);
220        let path = snapshots.write_snapshot(
221            entries,
222            timestamp_ms,
223            SnapshotCompression::from_enabled(self.config.compress_snapshots),
224        )?;
225        self.stats.record_snapshot_written();
226        wal::SegmentStore::new(&self.config.data_dir).prune_through(timestamp_ms)?;
227        Ok(path)
228    }
229
230    pub fn shutdown(&self) -> Result<()> {
231        Self::signal_stop(&self.stop_tx);
232        Self::signal_stop(&self.tcp_export_stop_tx);
233        Self::join_thread(&self.join_handle, "WAL thread panicked")?;
234        Self::join_thread(
235            &self.tcp_export_join_handle,
236            "TCP WAL exporter thread panicked",
237        )
238    }
239
240    fn signal_stop(stop_tx: &Option<Sender<()>>) {
241        match StopSignal::from_option(stop_tx) {
242            StopSignal::Enabled(stop_tx) => {
243                let _ = stop_tx.send(());
244            }
245            StopSignal::Disabled => {}
246        }
247    }
248
249    fn join_thread(handle: &Mutex<Option<JoinHandle<()>>>, panic_message: &str) -> Result<()> {
250        match handle.lock().take() {
251            Some(join_handle) => join_handle
252                .join()
253                .map_err(|_| FastCacheError::TaskJoin(panic_message.into())),
254            None => Ok(()),
255        }
256    }
257}
258
259impl WalAppender {
260    pub fn append(&self, record: MutationRecord) -> Result<()> {
261        self.sender
262            .send(record)
263            .map_err(|_| FastCacheError::ChannelClosed("wal appender"))
264    }
265}
266
/// Shuts the runtime down when it goes out of scope.
impl Drop for PersistenceRuntime {
    fn drop(&mut self) {
        // `drop` cannot report failures, so the shutdown result is
        // intentionally discarded.
        let _ = self.shutdown();
    }
}
272
/// Loads the persisted [`RecoveryState`] described by `config`.
///
/// Delegates to `RecoveryLoader`; any error it reports is returned unchanged.
pub fn load_recovery_state(config: &PersistenceConfig) -> Result<RecoveryState> {
    RecoveryLoader::new(config).load()
}
276
/// Returns the normalized form of the persistence data directory `path`.
///
/// Delegates to `PersistenceDataDir::normalize`.
// NOTE(review): the exact normalization rules live in the `recovery` module
// and are not visible here.
pub fn data_dir_path(path: &Path) -> PathBuf {
    PersistenceDataDir::normalize(path)
}
280
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpListener, TcpStream};
    use std::thread;
    use std::time::{Duration, Instant};

    use crate::config::{PersistenceConfig, WalTcpExportMode};
    use bytes::Bytes as SharedBytes;

    use crate::storage::{MutationOp, MutationRecord};

    use super::PersistenceRuntime;

    /// Number of bytes read up front; bytes 5..9 hold the little-endian
    /// payload length.
    const FRAME_HEADER_LEN: usize = 9;
    /// Number of bytes that follow the payload in every frame.
    const FRAME_TRAILER_LEN: usize = 4;

    /// Connect-mode export: the runtime dials a collector and streams one
    /// complete WAL frame to it.
    #[test]
    fn tcp_export_streams_framed_wal_bytes() {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind tcp collector");
        let addr = listener.local_addr().expect("local addr");
        let collector = thread::spawn(move || {
            let (mut stream, _) = listener.accept().expect("accept WAL stream");
            stream
                .set_read_timeout(Some(Duration::from_secs(3)))
                .expect("read timeout");
            read_wal_frame(&mut stream)
        });

        let temp_dir = tempfile::TempDir::new().expect("tempdir");
        let config = export_config(&temp_dir, addr);

        let runtime = PersistenceRuntime::start(1, config).expect("persistence runtime");
        append_sample_record(&runtime);

        let frame = collector.join().expect("reader thread");
        runtime.shutdown().expect("shutdown");

        assert_complete_frame(&frame);
    }

    /// Listen-mode export: a subscriber that presents the auth token receives
    /// one complete WAL frame.
    #[test]
    fn tcp_export_listen_mode_authenticates_subscribers() {
        let addr = free_local_addr();
        let temp_dir = tempfile::TempDir::new().expect("tempdir");
        let mut config = export_config(&temp_dir, addr);
        config.tcp_export.mode = WalTcpExportMode::Listen;
        config.tcp_export.auth_token = Some("secret".to_string());

        let runtime = PersistenceRuntime::start(1, config).expect("persistence runtime");
        let mut subscriber = connect_with_retry(addr);
        subscriber
            .set_read_timeout(Some(Duration::from_secs(3)))
            .expect("read timeout");
        subscriber
            .write_all(b"FCWAL-AUTH/1 secret\n")
            .expect("write auth");
        wait_for_subscriber(&runtime);

        append_sample_record(&runtime);

        let frame = read_wal_frame(&mut subscriber);
        runtime.shutdown().expect("shutdown");

        assert_complete_frame(&frame);
    }

    /// Persistence config rooted at `temp_dir` with uncompressed WAL and TCP
    /// export pointed at `addr` using short timeouts.
    fn export_config(temp_dir: &tempfile::TempDir, addr: SocketAddr) -> PersistenceConfig {
        let mut config = PersistenceConfig {
            data_dir: temp_dir.path().to_path_buf(),
            compress_wal: false,
            ..PersistenceConfig::default()
        };
        config.tcp_export.enabled = true;
        config.tcp_export.addr = addr.to_string();
        config.tcp_export.connect_timeout_ms = 50;
        config.tcp_export.write_timeout_ms = 50;
        config.tcp_export.reconnect_backoff_ms = 10;
        config
    }

    /// Appends the single `Set alpha = one` record both tests rely on to
    /// shard 0.
    fn append_sample_record(runtime: &PersistenceRuntime) {
        let record = MutationRecord {
            shard_id: 0,
            sequence: 1,
            timestamp_ms: 42,
            op: MutationOp::Set,
            key: SharedBytes::from_static(b"alpha"),
            value: SharedBytes::from_static(b"one"),
            expire_at_ms: None,
        };
        runtime
            .appender(0)
            .expect("appender")
            .append(record)
            .expect("append");
    }

    /// Payload length declared in bytes 5..9 of a frame (little-endian u32).
    fn declared_payload_len(frame: &[u8]) -> usize {
        u32::from_le_bytes(frame[5..9].try_into().unwrap()) as usize
    }

    /// Checks the magic prefix and that the frame is exactly as long as its
    /// header says.
    fn assert_complete_frame(frame: &[u8]) {
        assert!(frame.starts_with(b"FCW2"));
        assert_eq!(
            frame.len(),
            FRAME_HEADER_LEN + declared_payload_len(frame) + FRAME_TRAILER_LEN
        );
    }

    /// Reserves an ephemeral loopback port and returns its address. The
    /// listener is released before returning, so the port is only very likely
    /// (not guaranteed) to still be free when reused.
    fn free_local_addr() -> SocketAddr {
        TcpListener::bind("127.0.0.1:0")
            .expect("bind ephemeral port")
            .local_addr()
            .expect("local addr")
    }

    /// Keeps dialing `addr` every 10 ms for up to two seconds; panics with the
    /// last connect error once the deadline has passed.
    fn connect_with_retry(addr: SocketAddr) -> TcpStream {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            match TcpStream::connect(addr) {
                Ok(stream) => return stream,
                Err(error) => {
                    if Instant::now() >= deadline {
                        panic!("connect to TCP WAL listener failed: {error}");
                    }
                    thread::sleep(Duration::from_millis(10));
                }
            }
        }
    }

    /// Polls the runtime stats every 10 ms for up to two seconds until at
    /// least one TCP export subscriber is reported; panics on timeout.
    fn wait_for_subscriber(runtime: &PersistenceRuntime) {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            if Instant::now() >= deadline {
                panic!("TCP WAL subscriber was not accepted");
            }
            if runtime.stats_snapshot().tcp_export_active_subscribers > 0 {
                return;
            }
            thread::sleep(Duration::from_millis(10));
        }
    }

    /// Reads exactly one frame from `stream`: the fixed header first, then
    /// the payload plus trailer whose size the header declares.
    fn read_wal_frame(stream: &mut TcpStream) -> Vec<u8> {
        let mut header = [0_u8; FRAME_HEADER_LEN];
        stream.read_exact(&mut header).expect("read WAL header");

        let tail_len = declared_payload_len(&header) + FRAME_TRAILER_LEN;
        let mut frame = header.to_vec();
        frame.resize(FRAME_HEADER_LEN + tail_len, 0);
        stream
            .read_exact(&mut frame[FRAME_HEADER_LEN..])
            .expect("read WAL payload");
        frame
    }
}
442}