1mod recovery;
9mod runtime;
10mod snapshot;
11mod stats;
12mod tcp_export;
13mod wal;
14
15use std::fs;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::thread::JoinHandle;
19
20use crossbeam_channel::Sender;
21use parking_lot::Mutex;
22
23use crate::config::PersistenceConfig;
24#[cfg(feature = "telemetry")]
25use crate::storage::{CacheTelemetry, CacheTelemetryHandle};
26use crate::storage::{MutationRecord, StoredEntry, WalStatsSnapshot};
27use crate::{FastCacheError, Result};
28
29use recovery::{PersistenceDataDir, RecoveryLoader};
30use runtime::{PersistenceStartup, StartedPersistenceRuntime, StopSignal};
31pub use snapshot::{LoadedSnapshot, SnapshotCompression, SnapshotRepository, SnapshotStore};
32use stats::WalStats;
33
/// Alias for [`bytes::Bytes`], visible to the parent module.
///
/// NOTE(review): presumably the buffer type that carries encoded WAL frames between
/// the writer and the TCP exporter — confirm at the use sites in the submodules.
pub(super) type WalFrameBytes = bytes::Bytes;
39
/// State produced by [`load_recovery_state`].
#[derive(Debug, Clone)]
pub struct RecoveryState {
    /// Entries recovered from persistent storage.
    pub entries: Vec<StoredEntry>,
    /// Snapshot timestamp in milliseconds.
    ///
    /// NOTE(review): presumably the timestamp of the snapshot the entries were loaded
    /// from — confirm against the `recovery` module.
    pub snapshot_timestamp_ms: u64,
}
45
/// Cloneable handle that submits [`MutationRecord`]s over a channel.
///
/// Instances are handed out per shard by [`PersistenceRuntime::appender`].
#[derive(Debug, Clone)]
pub struct WalAppender {
    /// Sending half of the channel that `append` pushes records into.
    sender: Sender<MutationRecord>,
}
50
/// Owner of the persistence subsystem: the per-shard WAL appenders, the WAL statistics,
/// and the stop channels and join handles of the background threads.
///
/// Dropping the runtime calls `shutdown` and discards its result.
pub struct PersistenceRuntime {
    /// Configuration the runtime was started with.
    config: PersistenceConfig,
    /// Appenders indexed by shard id; empty when persistence is disabled.
    appenders: Vec<WalAppender>,
    /// Shared WAL statistics, exposed through `stats_snapshot`.
    stats: Arc<WalStats>,
    /// Telemetry instance held for the lifetime of the runtime.
    ///
    /// NOTE(review): presumably keeps the telemetry alive for the handle derived from it
    /// via `CacheTelemetryHandle::from_arc` — confirm.
    #[cfg(feature = "telemetry")]
    _metrics_owner: Option<Arc<CacheTelemetry>>,
    /// Stop channel signalled first by `shutdown`; `None` when persistence is disabled.
    stop_tx: Option<Sender<()>>,
    /// Handle of the thread whose panic `shutdown` reports as "WAL thread panicked".
    /// Wrapped in a mutex so `shutdown(&self)` can take it out.
    join_handle: Mutex<Option<JoinHandle<()>>>,
    /// Stop channel of the TCP WAL exporter; `None` when disabled or when startup
    /// returned no exporter.
    tcp_export_stop_tx: Option<Sender<()>>,
    /// Join handle of the TCP WAL exporter thread, if one was started.
    tcp_export_join_handle: Mutex<Option<JoinHandle<()>>>,
}
62
63impl PersistenceRuntime {
64 pub fn start(shard_count: usize, config: PersistenceConfig) -> Result<Self> {
65 #[cfg(feature = "telemetry")]
66 {
67 Self::start_with_metrics(shard_count, config, None)
68 }
69 #[cfg(not(feature = "telemetry"))]
70 {
71 Self::start_inner(shard_count, config, ())
72 }
73 }
74
75 #[cfg(feature = "telemetry")]
76 pub fn start_with_metrics(
77 shard_count: usize,
78 config: PersistenceConfig,
79 metrics: Option<Arc<CacheTelemetry>>,
80 ) -> Result<Self> {
81 Self::start_inner(shard_count, config, metrics)
82 }
83
84 #[cfg(feature = "telemetry")]
85 fn start_inner(
86 shard_count: usize,
87 config: PersistenceConfig,
88 metrics: Option<Arc<CacheTelemetry>>,
89 ) -> Result<Self> {
90 Self::start_impl(shard_count, config, metrics)
91 }
92
93 #[cfg(not(feature = "telemetry"))]
94 fn start_inner(shard_count: usize, config: PersistenceConfig, _unit: ()) -> Result<Self> {
95 Self::start_impl(shard_count, config)
96 }
97
98 #[cfg(feature = "telemetry")]
99 fn start_impl(
100 shard_count: usize,
101 config: PersistenceConfig,
102 metrics: Option<Arc<CacheTelemetry>>,
103 ) -> Result<Self> {
104 match config.enabled {
105 true => Self::start_enabled(shard_count, config, metrics),
106 false => Ok(Self::disabled(config, None)),
107 }
108 }
109
110 #[cfg(not(feature = "telemetry"))]
111 fn start_impl(shard_count: usize, config: PersistenceConfig) -> Result<Self> {
112 match config.enabled {
113 true => Self::start_enabled(shard_count, config),
114 false => Ok(Self::disabled(config)),
115 }
116 }
117
118 #[cfg(feature = "telemetry")]
119 fn disabled(config: PersistenceConfig, metrics: Option<Arc<CacheTelemetry>>) -> Self {
120 Self {
121 config,
122 appenders: Vec::new(),
123 stats: Arc::new(WalStats::disabled()),
124 _metrics_owner: metrics,
125 stop_tx: None,
126 join_handle: Mutex::new(None),
127 tcp_export_stop_tx: None,
128 tcp_export_join_handle: Mutex::new(None),
129 }
130 }
131
132 #[cfg(not(feature = "telemetry"))]
133 fn disabled(config: PersistenceConfig) -> Self {
134 Self {
135 config,
136 appenders: Vec::new(),
137 stats: Arc::new(WalStats::disabled()),
138 stop_tx: None,
139 join_handle: Mutex::new(None),
140 tcp_export_stop_tx: None,
141 tcp_export_join_handle: Mutex::new(None),
142 }
143 }
144
145 #[cfg(feature = "telemetry")]
146 fn start_enabled(
147 shard_count: usize,
148 config: PersistenceConfig,
149 metrics: Option<Arc<CacheTelemetry>>,
150 ) -> Result<Self> {
151 let writer_metrics = metrics.as_ref().map(CacheTelemetryHandle::from_arc);
152 let runtime = Self::start_enabled_parts(shard_count, &config, writer_metrics)?;
153 Ok(Self {
154 config,
155 appenders: runtime.appenders,
156 stats: runtime.stats,
157 _metrics_owner: metrics,
158 stop_tx: Some(runtime.stop_tx),
159 join_handle: Mutex::new(Some(runtime.join_handle)),
160 tcp_export_stop_tx: runtime.tcp_export.stop_tx,
161 tcp_export_join_handle: Mutex::new(runtime.tcp_export.join_handle),
162 })
163 }
164
165 #[cfg(not(feature = "telemetry"))]
166 fn start_enabled(shard_count: usize, config: PersistenceConfig) -> Result<Self> {
167 let runtime = Self::start_enabled_parts(shard_count, &config)?;
168 Ok(Self {
169 config,
170 appenders: runtime.appenders,
171 stats: runtime.stats,
172 stop_tx: Some(runtime.stop_tx),
173 join_handle: Mutex::new(Some(runtime.join_handle)),
174 tcp_export_stop_tx: runtime.tcp_export.stop_tx,
175 tcp_export_join_handle: Mutex::new(runtime.tcp_export.join_handle),
176 })
177 }
178
179 #[cfg(feature = "telemetry")]
180 fn start_enabled_parts(
181 shard_count: usize,
182 config: &PersistenceConfig,
183 metrics: Option<CacheTelemetryHandle>,
184 ) -> Result<StartedPersistenceRuntime> {
185 PersistenceStartup::new(shard_count, config)?.start_writer(config.clone(), metrics)
186 }
187
188 #[cfg(not(feature = "telemetry"))]
189 fn start_enabled_parts(
190 shard_count: usize,
191 config: &PersistenceConfig,
192 ) -> Result<StartedPersistenceRuntime> {
193 PersistenceStartup::new(shard_count, config)?.start_writer(config.clone())
194 }
195
196 pub fn appender(&self, shard_id: usize) -> Option<WalAppender> {
197 self.appenders.get(shard_id).cloned()
198 }
199
200 pub fn is_enabled(&self) -> bool {
201 self.config.enabled
202 }
203
204 pub fn stats_snapshot(&self) -> WalStatsSnapshot {
205 self.stats.snapshot()
206 }
207
208 pub fn snapshot(&self, entries: &[StoredEntry], timestamp_ms: u64) -> Result<PathBuf> {
209 match self.config.enabled {
210 true => self.write_snapshot(entries, timestamp_ms),
211 false => Err(FastCacheError::Persistence(
212 "persistence is disabled".into(),
213 )),
214 }
215 }
216
217 fn write_snapshot(&self, entries: &[StoredEntry], timestamp_ms: u64) -> Result<PathBuf> {
218 fs::create_dir_all(&self.config.data_dir)?;
219 let snapshots = SnapshotStore::new(&self.config.data_dir);
220 let path = snapshots.write_snapshot(
221 entries,
222 timestamp_ms,
223 SnapshotCompression::from_enabled(self.config.compress_snapshots),
224 )?;
225 self.stats.record_snapshot_written();
226 wal::SegmentStore::new(&self.config.data_dir).prune_through(timestamp_ms)?;
227 Ok(path)
228 }
229
230 pub fn shutdown(&self) -> Result<()> {
231 Self::signal_stop(&self.stop_tx);
232 Self::signal_stop(&self.tcp_export_stop_tx);
233 Self::join_thread(&self.join_handle, "WAL thread panicked")?;
234 Self::join_thread(
235 &self.tcp_export_join_handle,
236 "TCP WAL exporter thread panicked",
237 )
238 }
239
240 fn signal_stop(stop_tx: &Option<Sender<()>>) {
241 match StopSignal::from_option(stop_tx) {
242 StopSignal::Enabled(stop_tx) => {
243 let _ = stop_tx.send(());
244 }
245 StopSignal::Disabled => {}
246 }
247 }
248
249 fn join_thread(handle: &Mutex<Option<JoinHandle<()>>>, panic_message: &str) -> Result<()> {
250 match handle.lock().take() {
251 Some(join_handle) => join_handle
252 .join()
253 .map_err(|_| FastCacheError::TaskJoin(panic_message.into())),
254 None => Ok(()),
255 }
256 }
257}
258
259impl WalAppender {
260 pub fn append(&self, record: MutationRecord) -> Result<()> {
261 self.sender
262 .send(record)
263 .map_err(|_| FastCacheError::ChannelClosed("wal appender"))
264 }
265}
266
267impl Drop for PersistenceRuntime {
268 fn drop(&mut self) {
269 let _ = self.shutdown();
270 }
271}
272
/// Loads the [`RecoveryState`] for `config` by running a `RecoveryLoader` built from it.
///
/// # Errors
/// Propagates whatever error the loader returns.
pub fn load_recovery_state(config: &PersistenceConfig) -> Result<RecoveryState> {
    RecoveryLoader::new(config).load()
}
276
/// Returns the result of `PersistenceDataDir::normalize` for `path`.
///
/// NOTE(review): the exact normalization rules live in the `recovery` module — confirm
/// there before relying on a particular output shape.
pub fn data_dir_path(path: &Path) -> PathBuf {
    PersistenceDataDir::normalize(path)
}
280
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpListener, TcpStream};
    use std::path::Path;
    use std::thread;
    use std::time::{Duration, Instant};

    use crate::config::{PersistenceConfig, WalTcpExportMode};
    use bytes::Bytes as SharedBytes;

    use crate::storage::{MutationOp, MutationRecord};

    use super::PersistenceRuntime;

    /// Length of the fixed frame prefix; bytes `5..9` hold the little-endian payload length.
    const FRAME_HEADER_LEN: usize = 9;
    /// Bytes following the payload in every frame.
    /// NOTE(review): presumably a checksum — confirm against the `wal` module.
    const FRAME_TRAILER_LEN: usize = 4;

    /// The exporter dials out to a collector and streams a framed WAL record to it.
    #[test]
    fn tcp_export_streams_framed_wal_bytes() {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind tcp collector");
        let addr = listener.local_addr().expect("local addr");
        let reader = thread::spawn(move || {
            let (mut stream, _) = listener.accept().expect("accept WAL stream");
            stream
                .set_read_timeout(Some(Duration::from_secs(3)))
                .expect("read timeout");
            read_wal_frame(&mut stream)
        });

        let temp_dir = tempfile::TempDir::new().expect("tempdir");
        let config = export_config(temp_dir.path(), addr);

        let runtime = PersistenceRuntime::start(1, config).expect("persistence runtime");
        append_sample_record(&runtime);

        let frame = reader.join().expect("reader thread");
        runtime.shutdown().expect("shutdown");

        assert_well_formed_frame(&frame);
    }

    /// In listen mode a subscriber that sends the auth line receives framed WAL records.
    #[test]
    fn tcp_export_listen_mode_authenticates_subscribers() {
        let addr = free_local_addr();
        let temp_dir = tempfile::TempDir::new().expect("tempdir");
        let mut config = export_config(temp_dir.path(), addr);
        config.tcp_export.mode = WalTcpExportMode::Listen;
        config.tcp_export.auth_token = Some("secret".to_string());

        let runtime = PersistenceRuntime::start(1, config).expect("persistence runtime");
        let mut subscriber = connect_with_retry(addr);
        subscriber
            .set_read_timeout(Some(Duration::from_secs(3)))
            .expect("read timeout");
        subscriber
            .write_all(b"FCWAL-AUTH/1 secret\n")
            .expect("write auth");
        // Only append once the subscriber is registered, so the record is streamed to it.
        wait_for_subscriber(&runtime);

        append_sample_record(&runtime);

        let frame = read_wal_frame(&mut subscriber);
        runtime.shutdown().expect("shutdown");

        assert_well_formed_frame(&frame);
    }

    /// Builds the config shared by both tests: uncompressed WAL in `data_dir`, TCP export
    /// enabled towards `addr`, with short timeouts so the tests stay fast.
    fn export_config(data_dir: &Path, addr: SocketAddr) -> PersistenceConfig {
        let mut config = PersistenceConfig {
            data_dir: data_dir.to_path_buf(),
            compress_wal: false,
            ..PersistenceConfig::default()
        };
        config.tcp_export.enabled = true;
        config.tcp_export.addr = addr.to_string();
        config.tcp_export.connect_timeout_ms = 50;
        config.tcp_export.write_timeout_ms = 50;
        config.tcp_export.reconnect_backoff_ms = 10;
        config
    }

    /// Appends one fixed `Set alpha = one` record through the appender of shard 0.
    fn append_sample_record(runtime: &PersistenceRuntime) {
        runtime
            .appender(0)
            .expect("appender")
            .append(MutationRecord {
                shard_id: 0,
                sequence: 1,
                timestamp_ms: 42,
                op: MutationOp::Set,
                key: SharedBytes::from_static(b"alpha"),
                value: SharedBytes::from_static(b"one"),
                expire_at_ms: None,
            })
            .expect("append");
    }

    /// Checks the frame magic and that the total length matches the encoded payload length.
    fn assert_well_formed_frame(frame: &[u8]) {
        assert!(frame.starts_with(b"FCW2"));
        assert_eq!(
            frame.len(),
            FRAME_HEADER_LEN + payload_len(frame) + FRAME_TRAILER_LEN
        );
    }

    /// Decodes the little-endian `u32` payload length stored in bytes `5..9` of a frame.
    fn payload_len(frame: &[u8]) -> usize {
        u32::from_le_bytes(frame[5..9].try_into().unwrap()) as usize
    }

    /// Reserves an ephemeral local port by binding and immediately releasing it.
    /// Another process could grab the port in between; acceptable for a test.
    fn free_local_addr() -> SocketAddr {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
        listener.local_addr().expect("local addr")
    }

    /// Connects to `addr`, retrying every 10 ms for up to two seconds.
    fn connect_with_retry(addr: SocketAddr) -> TcpStream {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            match TcpStream::connect(addr) {
                Ok(stream) => return stream,
                Err(_) if Instant::now() < deadline => {
                    thread::sleep(Duration::from_millis(10));
                }
                Err(error) => panic!("connect to TCP WAL listener failed: {error}"),
            }
        }
    }

    /// Polls the runtime statistics until at least one TCP subscriber is active,
    /// panicking after two seconds.
    fn wait_for_subscriber(runtime: &PersistenceRuntime) {
        let deadline = Instant::now() + Duration::from_secs(2);
        while Instant::now() < deadline {
            if runtime.stats_snapshot().tcp_export_active_subscribers > 0 {
                return;
            }
            thread::sleep(Duration::from_millis(10));
        }
        panic!("TCP WAL subscriber was not accepted");
    }

    /// Reads exactly one frame from `stream`: the fixed header, then the payload plus
    /// trailer whose size is derived from the header's length field.
    fn read_wal_frame(stream: &mut TcpStream) -> Vec<u8> {
        let mut header = [0_u8; FRAME_HEADER_LEN];
        stream.read_exact(&mut header).expect("read WAL header");
        let mut tail = vec![0_u8; payload_len(&header) + FRAME_TRAILER_LEN];
        stream.read_exact(&mut tail).expect("read WAL payload");
        let mut frame = header.to_vec();
        frame.extend_from_slice(&tail);
        frame
    }
}
442}