1use std::collections::VecDeque;
36use std::fs::{self, File, OpenOptions};
37use std::io::{self, Read, Seek, SeekFrom, Write};
38use std::path::{Path, PathBuf};
39use std::sync::{Arc, Mutex, RwLock};
40use std::time::{SystemTime, UNIX_EPOCH};
41
42use tracing::warn;
43
// Magic prefix identifying a logical-WAL spool frame on disk ("RDLW").
const LOGICAL_WAL_SPOOL_MAGIC: &[u8; 4] = b"RDLW";
// v1 frames: magic + version + lsn(8) + payload_len(8) + payload — no timestamp, no CRC.
const LOGICAL_WAL_SPOOL_VERSION_V1: u8 = 1;
// v2 frames add a millisecond timestamp and a trailing CRC32 (see read_one_v2).
const LOGICAL_WAL_SPOOL_VERSION_V2: u8 = 2;
// Version written for all newly appended frames.
const LOGICAL_WAL_SPOOL_VERSION_CURRENT: u8 = LOGICAL_WAL_SPOOL_VERSION_V2;
// v2 header layout: magic(4) + version(1) + lsn(8) + timestamp_ms(8) + payload_len(4).
const LOGICAL_WAL_V2_HEADER_LEN: u64 = 4 + 1 + 8 + 8 + 4;
// v2 trailer: little-endian CRC32 of version/lsn/timestamp/len/payload.
const LOGICAL_WAL_V2_CRC_LEN: u64 = 4;
53
54fn compute_logical_v2_crc(version: u8, lsn: u64, timestamp: u64, payload: &[u8]) -> u32 {
63 use crate::storage::engine::crc32::crc32_update;
64 let mut crc = crc32_update(0, &[version]);
65 crc = crc32_update(crc, &lsn.to_le_bytes());
66 crc = crc32_update(crc, ×tamp.to_le_bytes());
67 crc = crc32_update(crc, &(payload.len() as u32).to_le_bytes());
68 crc = crc32_update(crc, payload);
69 crc
70}
71
/// Bounded in-memory ring of recent WAL records, keyed by LSN.
///
/// Retains at most `max_size` `(lsn, payload)` pairs, evicting the oldest
/// on overflow, and tracks the highest LSN ever observed. Poisoned locks
/// are recovered with `into_inner` so a panicking writer cannot wedge
/// replication.
pub struct WalBuffer {
    records: RwLock<VecDeque<(u64, Vec<u8>)>>,
    max_size: usize,
    current_lsn: RwLock<u64>,
}

impl WalBuffer {
    /// Creates an empty buffer that keeps at most `max_size` records.
    pub fn new(max_size: usize) -> Self {
        Self {
            records: RwLock::new(VecDeque::with_capacity(max_size)),
            max_size,
            current_lsn: RwLock::new(0),
        }
    }

    /// Appends a record, evicting from the front once over capacity, and
    /// advances the high-water LSN (which never moves backward).
    pub fn append(&self, lsn: u64, data: Vec<u8>) {
        let mut queue = self.records.write().unwrap_or_else(|e| e.into_inner());
        queue.push_back((lsn, data));
        while queue.len() > self.max_size {
            queue.pop_front();
        }

        let mut highest = self.current_lsn.write().unwrap_or_else(|e| e.into_inner());
        if lsn > *highest {
            *highest = lsn;
        }
    }

    /// Returns up to `max_count` records with LSN strictly greater than
    /// `since_lsn`, oldest first.
    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Vec<u8>)> {
        let queue = self.records.read().unwrap_or_else(|e| e.into_inner());
        let mut selected = Vec::new();
        for record in queue.iter() {
            if selected.len() == max_count {
                break;
            }
            if record.0 > since_lsn {
                selected.push(record.clone());
            }
        }
        selected
    }

    /// Highest LSN observed via `append` or `set_current_lsn`.
    pub fn current_lsn(&self) -> u64 {
        *self.current_lsn.read().unwrap_or_else(|e| e.into_inner())
    }

    /// Raises the high-water LSN to `lsn`; lower values are ignored.
    pub fn set_current_lsn(&self, lsn: u64) {
        let mut highest = self.current_lsn.write().unwrap_or_else(|e| e.into_inner());
        if lsn > *highest {
            *highest = lsn;
        }
    }

    /// LSN of the oldest buffered record, or `None` when empty.
    pub fn oldest_lsn(&self) -> Option<u64> {
        self.records
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .front()
            .map(|record| record.0)
    }
}
131
// One decoded record from the on-disk logical-WAL spool.
#[derive(Debug, Clone)]
struct LogicalWalEntry {
    // Log sequence number of the record.
    lsn: u64,
    // Capture time in milliseconds since the Unix epoch; 0 for records
    // decoded from v1 frames, which carry no timestamp.
    timestamp_ms: u64,
    // Opaque encoded change payload.
    data: Vec<u8>,
}
140
// In-memory bookkeeping for the spool, guarded by `LogicalWalSpool::state`.
#[derive(Debug, Default)]
struct LogicalWalSpoolState {
    // Highest LSN known to be durable in the spool file.
    current_lsn: u64,
}
145
/// Durable on-disk spool of logical WAL frames, stored alongside the data
/// file as `<data file name>.logical.wal`. Appends are written as single
/// frames and fsynced; reads re-scan the file from the start.
pub struct LogicalWalSpool {
    // Location of the spool file on disk.
    path: PathBuf,
    // Serializes updates to the in-memory LSN watermark.
    state: Mutex<LogicalWalSpoolState>,
}
153
154impl LogicalWalSpool {
155 pub fn path_for(data_path: &Path) -> PathBuf {
156 let file_name = data_path
157 .file_name()
158 .and_then(|name| name.to_str())
159 .unwrap_or("reddb.rdb");
160 let spool_name = format!("{file_name}.logical.wal");
161 match data_path.parent() {
162 Some(parent) => parent.join(spool_name),
163 None => PathBuf::from(spool_name),
164 }
165 }
166
167 pub fn open(data_path: &Path) -> io::Result<Self> {
168 let path = Self::path_for(data_path);
169 if let Some(parent) = path.parent() {
170 fs::create_dir_all(parent)?;
171 }
172 if !path.exists() {
173 File::create(&path)?;
174 }
175 let entries = read_and_repair_entries(&path)?;
180 let current_lsn = entries.last().map(|entry| entry.lsn).unwrap_or(0);
181 Ok(Self {
182 path,
183 state: Mutex::new(LogicalWalSpoolState { current_lsn }),
184 })
185 }
186
187 pub fn append(&self, lsn: u64, data: &[u8]) -> io::Result<()> {
188 let timestamp_ms = SystemTime::now()
189 .duration_since(UNIX_EPOCH)
190 .map(|d| d.as_millis() as u64)
191 .unwrap_or(0);
192 self.append_with_timestamp(lsn, timestamp_ms, data)
193 }
194
195 pub fn append_with_timestamp(
199 &self,
200 lsn: u64,
201 timestamp_ms: u64,
202 data: &[u8],
203 ) -> io::Result<()> {
204 if data.len() > u32::MAX as usize {
205 return Err(io::Error::new(
206 io::ErrorKind::InvalidInput,
207 format!(
208 "logical WAL payload of {} bytes exceeds 4 GiB framing limit",
209 data.len()
210 ),
211 ));
212 }
213 let mut file = OpenOptions::new()
214 .create(true)
215 .append(true)
216 .open(&self.path)?;
217 let mut frame = Vec::with_capacity(
227 LOGICAL_WAL_V2_HEADER_LEN as usize + data.len() + LOGICAL_WAL_V2_CRC_LEN as usize,
228 );
229 frame.extend_from_slice(LOGICAL_WAL_SPOOL_MAGIC);
230 frame.push(LOGICAL_WAL_SPOOL_VERSION_CURRENT);
231 frame.extend_from_slice(&lsn.to_le_bytes());
232 frame.extend_from_slice(×tamp_ms.to_le_bytes());
233 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
234 frame.extend_from_slice(data);
235 let crc =
236 compute_logical_v2_crc(LOGICAL_WAL_SPOOL_VERSION_CURRENT, lsn, timestamp_ms, data);
237 frame.extend_from_slice(&crc.to_le_bytes());
238
239 file.write_all(&frame)?;
240 file.sync_all()?;
245
246 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
247 state.current_lsn = state.current_lsn.max(lsn);
248 Ok(())
249 }
250
251 pub fn read_since(&self, since_lsn: u64, max_count: usize) -> io::Result<Vec<(u64, Vec<u8>)>> {
252 let entries = read_and_repair_entries(&self.path)?;
253 Ok(entries
254 .into_iter()
255 .filter(|entry| entry.lsn > since_lsn)
256 .take(max_count)
257 .map(|entry| (entry.lsn, entry.data))
258 .collect())
259 }
260
261 pub fn current_lsn(&self) -> u64 {
262 self.state
263 .lock()
264 .unwrap_or_else(|e| e.into_inner())
265 .current_lsn
266 }
267
268 pub fn oldest_lsn(&self) -> io::Result<Option<u64>> {
269 Ok(read_and_repair_entries(&self.path)?
270 .into_iter()
271 .next()
272 .map(|entry| entry.lsn))
273 }
274
275 pub fn prune_through(&self, upto_lsn: u64) -> io::Result<()> {
276 let previous_lsn = self.current_lsn();
277 let retained: Vec<_> = read_and_repair_entries(&self.path)?
278 .into_iter()
279 .filter(|entry| entry.lsn > upto_lsn)
280 .collect();
281 let temp_path = self.path.with_extension("logical.wal.tmp");
282 let mut temp = File::create(&temp_path)?;
283 let mut current_lsn = 0;
284 for entry in retained {
285 let timestamp_ms = if entry.timestamp_ms > 0 {
293 entry.timestamp_ms
294 } else {
295 SystemTime::now()
296 .duration_since(UNIX_EPOCH)
297 .map(|d| d.as_millis() as u64)
298 .unwrap_or(0)
299 };
300 let crc = compute_logical_v2_crc(
301 LOGICAL_WAL_SPOOL_VERSION_CURRENT,
302 entry.lsn,
303 timestamp_ms,
304 &entry.data,
305 );
306 temp.write_all(LOGICAL_WAL_SPOOL_MAGIC)?;
307 temp.write_all(&[LOGICAL_WAL_SPOOL_VERSION_CURRENT])?;
308 temp.write_all(&entry.lsn.to_le_bytes())?;
309 temp.write_all(×tamp_ms.to_le_bytes())?;
310 temp.write_all(&(entry.data.len() as u32).to_le_bytes())?;
311 temp.write_all(&entry.data)?;
312 temp.write_all(&crc.to_le_bytes())?;
313 current_lsn = current_lsn.max(entry.lsn);
314 }
315 temp.sync_all()?;
316 fs::rename(&temp_path, &self.path)?;
317
318 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
319 state.current_lsn = previous_lsn.max(current_lsn).max(upto_lsn);
320 Ok(())
321 }
322}
323
/// Scans the spool file from the start, decoding frames in order.
///
/// Decoding stops at the first sign of corruption (bad magic, unknown
/// version, torn header/payload, or CRC mismatch); when that happens the
/// file is truncated back to the end of the last fully-valid record so
/// subsequent appends continue from a clean tail. Returns every record
/// decoded before the corruption point; a missing file yields an empty
/// list. I/O errors other than EOF are propagated as `Err`.
fn read_and_repair_entries(path: &Path) -> io::Result<Vec<LogicalWalEntry>> {
    if !path.exists() {
        return Ok(Vec::new());
    }

    // write(true) so a corrupt tail can be truncated in place below.
    let mut file = OpenOptions::new().read(true).write(true).open(path)?;
    let mut entries = Vec::new();
    // Byte offset just past the most recent record that decoded cleanly.
    let mut last_good_offset: u64 = 0;
    // Set when decoding stops early; human-readable cause for the warn!.
    let mut corrupt_reason: Option<String> = None;

    loop {
        let record_start = file.stream_position()?;

        let mut magic = [0u8; 4];
        match file.read_exact(&mut magic) {
            Ok(()) => {}
            // EOF exactly at a frame boundary: normal end of file.
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(err) => return Err(err),
        }
        if &magic != LOGICAL_WAL_SPOOL_MAGIC {
            corrupt_reason = Some(format!(
                "bad magic at offset {record_start}: got {magic:02x?}"
            ));
            break;
        }

        let mut version = [0u8; 1];
        if let Err(err) = file.read_exact(&mut version) {
            // EOF mid-header means a torn write; anything else is a real
            // I/O failure and is propagated.
            if err.kind() == io::ErrorKind::UnexpectedEof {
                corrupt_reason = Some(format!("torn header at offset {record_start}"));
                break;
            }
            return Err(err);
        }

        // Dispatch on the frame version; decoders return a description of
        // the corruption rather than an io::Error so we can truncate.
        let entry_result = match version[0] {
            LOGICAL_WAL_SPOOL_VERSION_V2 => read_one_v2(&mut file, record_start),
            LOGICAL_WAL_SPOOL_VERSION_V1 => read_one_v1(&mut file, record_start),
            other => {
                corrupt_reason = Some(format!(
                    "unsupported version {other} at offset {record_start}"
                ));
                break;
            }
        };

        match entry_result {
            Ok(entry) => {
                entries.push(entry);
                // Only advance the high-water mark past a fully-valid frame.
                last_good_offset = file.stream_position()?;
            }
            Err(reason) => {
                corrupt_reason = Some(reason);
                break;
            }
        }
    }

    if let Some(reason) = corrupt_reason {
        let total_len = file.metadata()?.len();
        if last_good_offset < total_len {
            warn!(
                target: "reddb::replication::logical_wal",
                path = %path.display(),
                reason = %reason,
                truncating_from = last_good_offset,
                truncating_to = total_len,
                kept_records = entries.len(),
                "truncating logical-WAL spool to last valid record"
            );
            // Drop the corrupt tail and make the truncation durable.
            file.set_len(last_good_offset)?;
            file.sync_all()?;
        }
    }

    Ok(entries)
}
425
426fn read_one_v2(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
430 let mut lsn = [0u8; 8];
431 if let Err(err) = file.read_exact(&mut lsn) {
432 return Err(format!("torn lsn at offset {record_start}: {err}"));
433 }
434 let mut timestamp = [0u8; 8];
435 if let Err(err) = file.read_exact(&mut timestamp) {
436 return Err(format!("torn timestamp at offset {record_start}: {err}"));
437 }
438 let mut len_bytes = [0u8; 4];
439 if let Err(err) = file.read_exact(&mut len_bytes) {
440 return Err(format!(
441 "torn payload length at offset {record_start}: {err}"
442 ));
443 }
444 let payload_len = u32::from_le_bytes(len_bytes) as usize;
445 const MAX_PLAUSIBLE_PAYLOAD: usize = 256 * 1024 * 1024;
450 if payload_len > MAX_PLAUSIBLE_PAYLOAD {
451 return Err(format!(
452 "implausible payload_len {payload_len} at offset {record_start}"
453 ));
454 }
455 let mut payload = vec![0u8; payload_len];
456 if let Err(err) = file.read_exact(&mut payload) {
457 return Err(format!(
458 "torn payload at offset {record_start} (expected {payload_len} bytes): {err}"
459 ));
460 }
461 let mut crc_bytes = [0u8; 4];
462 if let Err(err) = file.read_exact(&mut crc_bytes) {
463 return Err(format!("torn crc at offset {record_start}: {err}"));
464 }
465 let stored_crc = u32::from_le_bytes(crc_bytes);
466 let expected_crc = compute_logical_v2_crc(
467 LOGICAL_WAL_SPOOL_VERSION_V2,
468 u64::from_le_bytes(lsn),
469 u64::from_le_bytes(timestamp),
470 &payload,
471 );
472 if stored_crc != expected_crc {
473 return Err(format!(
474 "crc mismatch at offset {record_start}: stored {stored_crc:#010x}, expected {expected_crc:#010x}"
475 ));
476 }
477 Ok(LogicalWalEntry {
478 lsn: u64::from_le_bytes(lsn),
479 timestamp_ms: u64::from_le_bytes(timestamp),
480 data: payload,
481 })
482}
483
484fn read_one_v1(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
489 let mut lsn = [0u8; 8];
490 if let Err(err) = file.read_exact(&mut lsn) {
491 return Err(format!("v1 torn lsn at offset {record_start}: {err}"));
492 }
493 let mut len_bytes = [0u8; 8];
494 if let Err(err) = file.read_exact(&mut len_bytes) {
495 return Err(format!(
496 "v1 torn payload length at offset {record_start}: {err}"
497 ));
498 }
499 let payload_len = u64::from_le_bytes(len_bytes) as usize;
500 if payload_len > 256 * 1024 * 1024 {
501 return Err(format!(
502 "v1 implausible payload_len {payload_len} at offset {record_start}"
503 ));
504 }
505 let mut payload = vec![0u8; payload_len];
506 if let Err(err) = file.read_exact(&mut payload) {
507 return Err(format!("v1 torn payload at offset {record_start}: {err}"));
508 }
509 Ok(LogicalWalEntry {
510 lsn: u64::from_le_bytes(lsn),
511 timestamp_ms: 0,
512 data: payload,
513 })
514}
515
/// Primary-side snapshot of one replica's replication progress.
#[derive(Debug, Clone)]
pub struct ReplicaState {
    // Replica identifier supplied at registration.
    pub id: String,
    // Highest LSN the replica has acknowledged applying.
    pub last_acked_lsn: u64,
    // Highest LSN the primary has streamed to this replica.
    pub last_sent_lsn: u64,
    // Highest LSN the replica reports as durable on its own storage.
    pub last_durable_lsn: u64,
    // Registration time, milliseconds since the Unix epoch.
    pub connected_at_unix_ms: u128,
    // Time of the most recent ack or pull, milliseconds since the Unix epoch.
    pub last_seen_at_unix_ms: u128,
    // Optional region tag supplied at registration.
    pub region: Option<String>,
}
535
/// Primary-side replication hub: in-memory WAL tail, optional durable
/// logical-WAL spool, tracked replicas, and commit-wait coordination.
pub struct PrimaryReplication {
    // Bounded in-memory ring of recent WAL records for streaming.
    pub wal_buffer: Arc<WalBuffer>,
    // On-disk spool; `None` when no data path was supplied or open failed.
    pub logical_wal_spool: Option<Arc<LogicalWalSpool>>,
    // Currently registered replicas and their progress watermarks.
    pub replicas: RwLock<Vec<ReplicaState>>,
    // Notified with durable-ack watermarks (see `ack_replica_lsn`).
    pub commit_waiter: Arc<crate::replication::commit_waiter::CommitWaiter>,
    // Incremented on every membership change (register / unregister).
    topology_epoch: std::sync::atomic::AtomicU64,
}
553
554impl PrimaryReplication {
555 pub fn new(data_path: Option<&Path>) -> Self {
556 Self {
557 wal_buffer: Arc::new(WalBuffer::new(100_000)),
558 logical_wal_spool: data_path
559 .and_then(|path| LogicalWalSpool::open(path).ok())
560 .map(Arc::new),
561 replicas: RwLock::new(Vec::new()),
562 commit_waiter: Arc::new(crate::replication::commit_waiter::CommitWaiter::new()),
563 topology_epoch: std::sync::atomic::AtomicU64::new(0),
564 }
565 }
566
567 pub fn register_replica(&self, id: String) -> u64 {
568 self.register_replica_with_region(id, None)
569 }
570
571 pub fn register_replica_with_region(&self, id: String, region: Option<String>) -> u64 {
577 let lsn = self.wal_buffer.current_lsn();
578 let now_ms = crate::utils::now_unix_millis() as u128;
579 let state = ReplicaState {
580 id,
581 last_acked_lsn: lsn,
582 last_sent_lsn: lsn,
583 last_durable_lsn: lsn,
584 connected_at_unix_ms: now_ms,
585 last_seen_at_unix_ms: now_ms,
586 region,
587 };
588 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
589 replicas.push(state);
590 drop(replicas);
591 self.bump_topology_epoch();
592 lsn
593 }
594
595 pub fn unregister_replica(&self, id: &str) -> bool {
599 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
600 let before = replicas.len();
601 replicas.retain(|r| r.id != id);
602 let removed = replicas.len() != before;
603 drop(replicas);
604 if removed {
605 self.bump_topology_epoch();
606 }
607 removed
608 }
609
610 pub fn topology_epoch(&self) -> u64 {
613 self.topology_epoch
614 .load(std::sync::atomic::Ordering::Relaxed)
615 }
616
617 pub fn bump_topology_epoch(&self) {
625 self.topology_epoch
626 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
627 }
628
629 pub fn ack_replica(&self, id: &str, lsn: u64) {
630 let now_ms = crate::utils::now_unix_millis() as u128;
631 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
632 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
633 r.last_acked_lsn = r.last_acked_lsn.max(lsn);
634 r.last_seen_at_unix_ms = now_ms;
635 }
636 }
637
638 pub fn ack_replica_lsn(&self, id: &str, applied_lsn: u64, durable_lsn: u64) {
644 let now_ms = crate::utils::now_unix_millis() as u128;
645 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
646 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
647 r.last_acked_lsn = r.last_acked_lsn.max(applied_lsn);
648 r.last_durable_lsn = r.last_durable_lsn.max(durable_lsn);
649 r.last_seen_at_unix_ms = now_ms;
650 }
651 drop(replicas);
655 self.commit_waiter.record_replica_ack(id, durable_lsn);
656 }
657
658 pub fn note_replica_pull(&self, id: &str, last_sent_lsn: u64) {
663 let now_ms = crate::utils::now_unix_millis() as u128;
664 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
665 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
666 r.last_sent_lsn = r.last_sent_lsn.max(last_sent_lsn);
667 r.last_seen_at_unix_ms = now_ms;
668 }
669 }
670
671 pub fn replica_snapshots(&self) -> Vec<ReplicaState> {
675 self.replicas
676 .read()
677 .unwrap_or_else(|e| e.into_inner())
678 .clone()
679 }
680
681 pub fn replica_count(&self) -> usize {
682 self.replicas
683 .read()
684 .unwrap_or_else(|e| e.into_inner())
685 .len()
686 }
687}
688
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::cdc::{ChangeOperation, ChangeRecord};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Nanosecond-suffixed temp path so concurrent test runs don't collide.
    fn temp_data_path(name: &str) -> PathBuf {
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("reddb_{name}_{suffix}.rdb"))
    }

    // Builds a ChangeRecord against the fixed "users"/row/entity-10 fixture.
    fn user_row_change(
        lsn: u64,
        timestamp: u64,
        operation: ChangeOperation,
        entity_bytes: Vec<u8>,
    ) -> ChangeRecord {
        ChangeRecord {
            lsn,
            timestamp,
            operation,
            collection: "users".to_string(),
            entity_id: 10,
            entity_kind: "row".to_string(),
            entity_bytes: Some(entity_bytes),
            metadata: None,
        }
    }

    #[test]
    fn logical_wal_spool_roundtrip_and_prune() {
        let data_path = temp_data_path("logical_spool");
        let spool_path = LogicalWalSpool::path_for(&data_path);
        let spool = LogicalWalSpool::open(&data_path).expect("open spool");

        let record1 = user_row_change(7, 1, ChangeOperation::Insert, vec![1, 2, 3]);
        let record2 = user_row_change(8, 2, ChangeOperation::Update, vec![4, 5, 6]);

        spool
            .append(record1.lsn, &record1.encode())
            .expect("append 1");
        spool
            .append(record2.lsn, &record2.encode())
            .expect("append 2");

        // Both records come back in append order.
        let entries = spool.read_since(0, usize::MAX).expect("read");
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].0, 7);
        assert_eq!(entries[1].0, 8);

        // Pruning through LSN 7 keeps only the later record.
        spool.prune_through(7).expect("prune");
        let retained = spool.read_since(0, usize::MAX).expect("read retained");
        assert_eq!(retained.len(), 1);
        assert_eq!(retained[0].0, 8);

        let _ = fs::remove_file(spool_path);
    }

    #[test]
    fn topology_epoch_monotonic_on_register_and_unregister() {
        let primary = PrimaryReplication::new(None);

        let e0 = primary.topology_epoch();
        primary.register_replica("r1".to_string());
        let e1 = primary.topology_epoch();
        primary.register_replica("r2".to_string());
        let e2 = primary.topology_epoch();
        assert!(e1 > e0, "register must bump epoch ({e0} -> {e1})");
        assert!(e2 > e1, "second register must bump epoch ({e1} -> {e2})");

        let removed = primary.unregister_replica("r1");
        assert!(removed);
        let e3 = primary.topology_epoch();
        assert!(e3 > e2, "unregister must bump epoch ({e2} -> {e3})");

        // Removing a nonexistent replica is a no-op for the epoch.
        let absent = primary.unregister_replica("ghost");
        assert!(!absent);
        assert_eq!(
            primary.topology_epoch(),
            e3,
            "unregistering a missing replica must not bump the epoch"
        );
    }
}