1use std::collections::{BTreeMap, VecDeque};
8use std::fs::{self, File, OpenOptions};
9use std::io::{self, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Condvar, Mutex, RwLock};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14use reddb_file::{ReplicationSlot, ReplicationSlotInvalidationCause};
15use tracing::warn;
16
17mod slots;
18use slots::{
19 load_replication_slot_catalog, load_replication_slots, persist_replication_slot_catalog,
20 persist_replication_slots,
21};
22
23fn term_from_payload(payload: &[u8]) -> u64 {
24 crate::replication::cdc::ChangeRecord::decode(payload)
25 .map(|record| record.term)
26 .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM)
27}
28
/// In-memory sequence of logical WAL records, kept in append order and keyed
/// by LSN.
///
/// Records are only ever removed by [`WalBuffer::prune_through`].
pub struct WalBuffer {
    /// `(lsn, encoded record)` pairs in append order. Payloads are `Arc`-shared
    /// so [`WalBuffer::read_since_shared`] can hand them out without copying.
    records: RwLock<VecDeque<(u64, Arc<[u8]>)>>,
    /// Highest LSN seen by `append` or `set_current_lsn`; never decreases.
    current_lsn: RwLock<u64>,
}

impl WalBuffer {
    /// Creates an empty buffer with room preallocated for `max_size` records.
    ///
    /// NOTE(review): `max_size` is only a capacity hint. `append` never evicts,
    /// so the buffer keeps growing until `prune_through` is called.
    pub fn new(max_size: usize) -> Self {
        let records = VecDeque::with_capacity(max_size);
        Self {
            records: RwLock::new(records),
            current_lsn: RwLock::new(0),
        }
    }

    /// Appends `data` at `lsn` and raises the current LSN to at least `lsn`.
    ///
    /// No ordering check is made: records are stored in call order.
    pub fn append(&self, lsn: u64, data: Vec<u8>) {
        let payload: Arc<[u8]> = Arc::from(data);
        let mut records = self.records.write().unwrap_or_else(|e| e.into_inner());
        records.push_back((lsn, payload));

        // `current_lsn` is raised while `records` is still write-locked. A reader
        // that observes the new LSN and then reads `records` therefore sees the
        // record.
        let mut current = self.current_lsn.write().unwrap_or_else(|e| e.into_inner());
        if lsn > *current {
            *current = lsn;
        }
    }

    /// Owned-copy variant of [`WalBuffer::read_since_shared`].
    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Vec<u8>)> {
        let shared = self.read_since_shared(since_lsn, max_count);
        shared
            .into_iter()
            .map(|(lsn, bytes)| (lsn, bytes.to_vec()))
            .collect()
    }

    /// Returns up to `max_count` records with `lsn > since_lsn`, in buffer
    /// order, sharing the payload allocations.
    pub fn read_since_shared(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Arc<[u8]>)> {
        let records = self.records.read().unwrap_or_else(|e| e.into_inner());
        let mut batch = Vec::new();
        for (lsn, bytes) in records.iter() {
            if batch.len() == max_count {
                break;
            }
            if *lsn > since_lsn {
                batch.push((*lsn, Arc::clone(bytes)));
            }
        }
        batch
    }

    /// Highest LSN recorded so far (`0` for a fresh buffer).
    pub fn current_lsn(&self) -> u64 {
        let current = self.current_lsn.read().unwrap_or_else(|e| e.into_inner());
        *current
    }

    /// Raises the current LSN to `lsn` if it is higher; never lowers it.
    pub fn set_current_lsn(&self, lsn: u64) {
        let mut current = self.current_lsn.write().unwrap_or_else(|e| e.into_inner());
        if lsn > *current {
            *current = lsn;
        }
    }

    /// Pops records from the front while their LSN is `<= upto_lsn`.
    ///
    /// Stops at the first record above `upto_lsn`.
    pub fn prune_through(&self, upto_lsn: u64) {
        let mut records = self.records.write().unwrap_or_else(|e| e.into_inner());
        while records.front().is_some_and(|&(lsn, _)| lsn <= upto_lsn) {
            records.pop_front();
        }
    }

    /// LSN of the front (oldest-appended) record, or `None` when empty.
    pub fn oldest_lsn(&self) -> Option<u64> {
        let records = self.records.read().unwrap_or_else(|e| e.into_inner());
        records.front().map(|&(lsn, _)| lsn)
    }
}
113
114fn logical_wal_entry_term(entry: &reddb_file::LogicalWalEntry) -> u64 {
115 if entry.term == 0 {
116 term_from_payload(&entry.data)
117 } else {
118 entry.term
119 }
120}
121
122fn logical_wal_data_with_framing_term(entry: &reddb_file::LogicalWalEntry) -> Vec<u8> {
123 let term = logical_wal_entry_term(entry);
124 match crate::replication::cdc::ChangeRecord::decode(&entry.data) {
125 Ok(mut record) if record.term != term => {
126 record.term = term;
127 record.encode()
128 }
129 _ => entry.data.clone(),
130 }
131}
132
133#[derive(Debug, Default)]
141struct LogicalWalSpoolState {
142 current_lsn: u64,
143 seek_index: Vec<(u64, u64)>,
147 write_offset: u64,
151 record_count: u64,
154}
155
156impl LogicalWalSpoolState {
157 fn note_record(&mut self, ordinal: u64, lsn: u64, offset: u64) {
161 if ordinal.is_multiple_of(reddb_file::LOGICAL_WAL_SEEK_INDEX_INTERVAL) {
162 if self.seek_index.last().map(|(l, _)| *l) != Some(lsn) {
166 self.seek_index.push((lsn, offset));
167 }
168 }
169 }
170
171 fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
176 match self
177 .seek_index
178 .binary_search_by(|(lsn, _)| lsn.cmp(&since_lsn))
179 {
180 Ok(idx) => self.seek_index[idx].1,
181 Err(0) => 0,
182 Err(idx) => self.seek_index[idx - 1].1,
183 }
184 }
185}
186
/// File-backed spool of logical WAL frames for one data directory.
///
/// The file lives at `reddb_file::layout::logical_wal_path(data_path)`. The
/// LSN, seek-index and offset bookkeeping for it is kept in `state`.
pub struct LogicalWalSpool {
    /// Location of the spool file.
    path: PathBuf,
    /// Bookkeeping that mirrors the spool file's contents.
    state: Mutex<LogicalWalSpoolState>,
}
194
195impl LogicalWalSpool {
196 pub fn path_for(data_path: &Path) -> PathBuf {
197 reddb_file::layout::logical_wal_path(data_path)
198 }
199
200 pub fn open(data_path: &Path) -> io::Result<Self> {
201 let path = Self::path_for(data_path);
202 if let Some(parent) = path.parent() {
203 fs::create_dir_all(parent)?;
204 }
205 if !path.exists() {
206 File::create(&path)?;
207 }
208 let entries = reddb_file::read_and_repair_logical_wal_entries(&path)?;
213 let current_lsn = entries.last().map(|entry| entry.lsn).unwrap_or(0);
214 let (seek_index, write_offset, record_count) =
217 reddb_file::build_logical_wal_seek_index(&path)?;
218 Ok(Self {
219 path,
220 state: Mutex::new(LogicalWalSpoolState {
221 current_lsn,
222 seek_index,
223 write_offset,
224 record_count,
225 }),
226 })
227 }
228
229 pub fn append(&self, lsn: u64, data: &[u8]) -> io::Result<()> {
230 let timestamp_ms = SystemTime::now()
231 .duration_since(UNIX_EPOCH)
232 .map(|d| d.as_millis() as u64)
233 .unwrap_or(0);
234 self.append_with_timestamp(lsn, timestamp_ms, data)
235 }
236
237 pub fn append_with_timestamp(
241 &self,
242 lsn: u64,
243 timestamp_ms: u64,
244 data: &[u8],
245 ) -> io::Result<()> {
246 self.append_with_term_and_timestamp(term_from_payload(data), lsn, timestamp_ms, data)
247 }
248
249 pub fn append_with_term_and_timestamp(
250 &self,
251 term: u64,
252 lsn: u64,
253 timestamp_ms: u64,
254 data: &[u8],
255 ) -> io::Result<()> {
256 let mut file = OpenOptions::new()
257 .create(true)
258 .append(true)
259 .open(&self.path)?;
260 let frame = reddb_file::encode_logical_wal_v3(term, lsn, timestamp_ms, data)?;
270
271 file.write_all(&frame)?;
272 file.sync_all()?;
277
278 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
279 state.current_lsn = state.current_lsn.max(lsn);
280 let record_start = state.write_offset;
284 let ordinal = state.record_count;
285 state.note_record(ordinal, lsn, record_start);
286 state.write_offset = record_start + frame.len() as u64;
287 state.record_count = ordinal + 1;
288 Ok(())
289 }
290
291 pub fn read_since(&self, since_lsn: u64, max_count: usize) -> io::Result<Vec<(u64, Vec<u8>)>> {
292 let start_offset = {
298 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
299 state.seek_floor_offset(since_lsn)
300 };
301 let entries = reddb_file::read_logical_wal_entries_from(&self.path, start_offset)?;
302 Ok(entries
303 .into_iter()
304 .filter(|entry| entry.lsn > since_lsn)
305 .take(max_count)
306 .map(|entry| (entry.lsn, logical_wal_data_with_framing_term(&entry)))
307 .collect())
308 }
309
310 #[cfg(test)]
314 fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
315 self.state
316 .lock()
317 .unwrap_or_else(|e| e.into_inner())
318 .seek_floor_offset(since_lsn)
319 }
320
321 pub fn current_lsn(&self) -> u64 {
322 self.state
323 .lock()
324 .unwrap_or_else(|e| e.into_inner())
325 .current_lsn
326 }
327
328 pub fn oldest_lsn(&self) -> io::Result<Option<u64>> {
329 Ok(reddb_file::read_and_repair_logical_wal_entries(&self.path)?
330 .into_iter()
331 .next()
332 .map(|entry| entry.lsn))
333 }
334
335 pub fn prune_through(&self, upto_lsn: u64) -> io::Result<()> {
336 let previous_lsn = self.current_lsn();
337 let mut retained: Vec<_> = reddb_file::read_and_repair_logical_wal_entries(&self.path)?
338 .into_iter()
339 .filter(|entry| entry.lsn > upto_lsn)
340 .collect();
341 for entry in &mut retained {
342 entry.term = logical_wal_entry_term(entry);
343 }
344 let temp_path = reddb_file::layout::logical_wal_temp_path(&self.path);
345 for entry in &mut retained {
346 let timestamp_ms = if entry.timestamp_ms > 0 {
354 entry.timestamp_ms
355 } else {
356 SystemTime::now()
357 .duration_since(UNIX_EPOCH)
358 .map(|d| d.as_millis() as u64)
359 .unwrap_or(0)
360 };
361 entry.timestamp_ms = timestamp_ms;
362 }
363 let current_lsn =
364 reddb_file::rewrite_logical_wal_entries(&self.path, &temp_path, &retained)?;
365
366 let (seek_index, write_offset, record_count) =
369 reddb_file::build_logical_wal_seek_index(&self.path)?;
370 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
371 state.current_lsn = previous_lsn.max(current_lsn).max(upto_lsn);
372 state.seek_index = seek_index;
373 state.write_offset = write_offset;
374 state.record_count = record_count;
375 Ok(())
376 }
377}
378
/// Primary-side view of one registered replica's progress.
#[derive(Debug, Clone)]
pub struct ReplicaState {
    /// Replica identifier; its replication slot is keyed by the same id.
    pub id: String,
    /// Highest applied LSN the replica has acknowledged.
    pub last_acked_lsn: u64,
    /// Highest LSN the primary has noted as sent to the replica.
    pub last_sent_lsn: u64,
    /// Highest LSN the replica has reported as durable.
    pub last_durable_lsn: u64,
    /// Apply-error counter reported by the replica (merged with `max`).
    pub apply_error_count: u64,
    /// Divergence counter reported by the replica (merged with `max`).
    pub divergence_count: u64,
    /// Registration time, in Unix milliseconds.
    pub connected_at_unix_ms: u128,
    /// Last register/ack/pull time, in Unix milliseconds.
    pub last_seen_at_unix_ms: u128,
    /// Optional region label supplied at registration.
    pub region: Option<String>,
    /// Whether the replica is currently flagged as re-bootstrapping.
    pub rebootstrapping: bool,
}

/// Aggregate replication progress across all registered replicas.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicationProgress {
    /// Highest `last_sent_lsn` minus lowest `last_acked_lsn`, saturating at 0.
    /// The two extremes may come from different replicas.
    pub lag_lsn: u64,
    /// Lowest `last_durable_lsn` over all replicas.
    pub safe_replay_lsn: u64,
}

impl ReplicationProgress {
    /// Summarizes `replicas` in a single pass; returns `None` when the slice is empty.
    pub fn from_replicas(replicas: &[ReplicaState]) -> Option<Self> {
        let (first, rest) = replicas.split_first()?;
        let seed = (
            first.last_sent_lsn,
            first.last_acked_lsn,
            first.last_durable_lsn,
        );
        let (max_sent_lsn, min_acked_lsn, safe_replay_lsn) =
            rest.iter().fold(seed, |(sent, acked, durable), replica| {
                (
                    sent.max(replica.last_sent_lsn),
                    acked.min(replica.last_acked_lsn),
                    durable.min(replica.last_durable_lsn),
                )
            });

        Some(Self {
            lag_lsn: max_sent_lsn.saturating_sub(min_acked_lsn),
            safe_replay_lsn,
        })
    }
}
433
/// Primary-side replication coordinator.
///
/// It publishes logical WAL records (in memory, to the on-disk spool and to
/// primary-replica WAL segments), tracks registered replicas, and manages
/// their persisted replication slots.
pub struct PrimaryReplication {
    /// In-memory copy of appended logical records; pruned by
    /// `prune_retained_wal_through`.
    pub wal_buffer: Arc<WalBuffer>,
    /// On-disk logical WAL spool. `None` without a data path or if opening it
    /// failed.
    pub logical_wal_spool: Option<Arc<LogicalWalSpool>>,
    /// Currently registered replicas.
    pub replicas: RwLock<Vec<ReplicaState>>,
    /// Highest LSN passed to `append_logical_record` (seeded from the spool),
    /// plus a condvar notified on every append.
    wal_appended: (Mutex<u64>, Condvar),
    /// Legacy slot file location, when a data path was given.
    slot_path: Option<PathBuf>,
    /// Binary slot catalog location, when a data path was given.
    slot_catalog_path: Option<PathBuf>,
    /// Primary-replica file layout used for WAL segment appends.
    primary_replica_file_plan: Option<reddb_file::PrimaryReplicaFilePlan>,
    /// Serializes primary-replica WAL segment appends.
    primary_replica_wal_lock: Mutex<()>,
    /// Replication slots keyed by replica id.
    slots: RwLock<BTreeMap<String, ReplicationSlot>>,
    /// Max LSN lag before a slot is invalidated with `Horizon`; `0` disables.
    slot_retention_max_lag_lsn: u64,
    /// Max slot idle time before `IdleTimeout` invalidation; `0` disables.
    slot_idle_timeout_ms: u64,
    /// Receives replica acks (`record_replica_ack`) and removals (`drop_replica`).
    pub commit_waiter: Arc<crate::replication::commit_waiter::CommitWaiter>,
    /// Bumped when the replica set or a replica's rebootstrapping flag changes,
    /// and by any external `bump_topology_epoch` caller.
    topology_epoch: std::sync::atomic::AtomicU64,
    /// Number of `PartialResync` decisions returned by `plan_replica_resume`.
    partial_resync_count: std::sync::atomic::AtomicU64,
    /// Number of `FullRebootstrap` decisions returned by `plan_replica_resume`.
    full_resync_count: std::sync::atomic::AtomicU64,
}
472
/// Decision returned by `PrimaryReplication::plan_replica_resume`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResumeMode {
    /// Continue streaming from `resume_lsn`. This is the larger of the
    /// requested LSN and the slot's `restart_lsn`.
    PartialResync { resume_lsn: u64 },
    /// The slot can no longer serve the replica; it must re-bootstrap.
    FullRebootstrap {
        /// Why the slot is (or already was) invalidated.
        cause: ReplicationSlotInvalidationCause,
    },
}
486
487impl PrimaryReplication {
488 pub fn slot_path_for(data_path: &Path) -> PathBuf {
489 reddb_file::layout::legacy_logical_slots_path(data_path)
490 }
491
492 pub fn primary_replica_root_for(data_path: &Path) -> PathBuf {
493 reddb_file::layout::primary_replica_root(data_path)
494 }
495
496 pub fn slot_catalog_path_for(data_path: &Path) -> PathBuf {
497 Self::primary_replica_file_plan_for(data_path).slots_path()
498 }
499
500 fn primary_replica_file_plan_for(data_path: &Path) -> reddb_file::PrimaryReplicaFilePlan {
501 let root = Self::primary_replica_root_for(data_path);
502 let timeline =
503 Self::primary_replica_current_timeline_for_root(&root).unwrap_or_else(|err| {
504 warn!(
505 target: "reddb::replication::primary",
506 error = %err,
507 "failed to read primary-replica timeline history; using initial timeline"
508 );
509 reddb_file::TimelineId::initial()
510 });
511 reddb_file::PrimaryReplicaFilePlan::new(root, timeline)
512 }
513
514 fn primary_replica_current_timeline_for_root(
515 root: &Path,
516 ) -> Result<reddb_file::TimelineId, reddb_file::RdbFileError> {
517 let path = reddb_file::PrimaryReplicaFilePlan::new(root, reddb_file::TimelineId::initial())
518 .timeline_history_path();
519 match reddb_file::TimelineHistory::read_from_path(&path) {
520 Ok(history) => Ok(history
521 .current()
522 .unwrap_or_else(reddb_file::TimelineId::initial)),
523 Err(reddb_file::RdbFileError::Io(err))
524 if err.kind() == std::io::ErrorKind::NotFound =>
525 {
526 Ok(reddb_file::TimelineId::initial())
527 }
528 Err(err) => Err(err),
529 }
530 }
531
532 pub fn new(data_path: Option<&Path>) -> Self {
533 Self::new_with_config(data_path, &crate::replication::ReplicationConfig::primary())
534 }
535
536 pub fn new_with_config(
537 data_path: Option<&Path>,
538 config: &crate::replication::ReplicationConfig,
539 ) -> Self {
540 let now_ms = crate::utils::now_unix_millis() as u128;
541 let slot_path = data_path.map(Self::slot_path_for);
542 let slot_catalog_path = data_path.map(Self::slot_catalog_path_for);
543 let primary_replica_file_plan = data_path.map(Self::primary_replica_file_plan_for);
544 let mut slots = load_replication_slot_catalog(slot_catalog_path.as_deref(), now_ms);
545 slots.extend(load_replication_slots(slot_path.as_deref(), now_ms));
546 let logical_wal_spool = data_path
547 .and_then(|path| LogicalWalSpool::open(path).ok())
548 .map(Arc::new);
549 let current_lsn = logical_wal_spool
550 .as_ref()
551 .map(|spool| spool.current_lsn())
552 .unwrap_or(0);
553 Self {
554 wal_buffer: Arc::new(WalBuffer::new(100_000)),
555 logical_wal_spool,
556 replicas: RwLock::new(Vec::new()),
557 wal_appended: (Mutex::new(current_lsn), Condvar::new()),
558 slot_path,
559 slot_catalog_path,
560 primary_replica_file_plan,
561 primary_replica_wal_lock: Mutex::new(()),
562 slots: RwLock::new(slots),
563 slot_retention_max_lag_lsn: config.slot_retention_max_lag_lsn,
564 slot_idle_timeout_ms: config.slot_idle_timeout_ms,
565 commit_waiter: Arc::new(crate::replication::commit_waiter::CommitWaiter::new()),
566 topology_epoch: std::sync::atomic::AtomicU64::new(0),
567 partial_resync_count: std::sync::atomic::AtomicU64::new(0),
568 full_resync_count: std::sync::atomic::AtomicU64::new(0),
569 }
570 }
571
572 pub fn append_logical_record(&self, lsn: u64, encoded: Vec<u8>) {
573 self.wal_buffer.append(lsn, encoded.clone());
574 if let Some(spool) = &self.logical_wal_spool {
575 let _ = spool.append(lsn, &encoded);
576 }
577 if let Some(plan) = &self.primary_replica_file_plan {
578 let _guard = self
579 .primary_replica_wal_lock
580 .lock()
581 .unwrap_or_else(|err| err.into_inner());
582 match Self::primary_replica_current_timeline_for_root(&plan.root) {
583 Ok(timeline) => {
584 let plan = reddb_file::PrimaryReplicaFilePlan::new(plan.root.clone(), timeline);
585 if let Err(err) = plan.append_wal_record(lsn, &encoded) {
586 warn!(
587 target: "reddb::replication::primary",
588 lsn,
589 error = %err,
590 "failed to append primary-replica WAL segment"
591 );
592 }
593 }
594 Err(err) => {
595 warn!(
596 target: "reddb::replication::primary",
597 lsn,
598 error = %err,
599 "failed to read primary-replica timeline history; skipping WAL append"
600 );
601 }
602 }
603 }
604 let (lock, cvar) = &self.wal_appended;
605 let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
606 *latest = (*latest).max(lsn);
607 cvar.notify_all();
608 }
609
610 pub fn wait_for_logical_lsn_after(&self, since_lsn: u64, timeout: Duration) -> bool {
611 if self.current_logical_lsn() > since_lsn {
612 return true;
613 }
614 let deadline = Instant::now() + timeout;
615 let (lock, cvar) = &self.wal_appended;
616 let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
617 while *latest <= since_lsn {
618 let now = Instant::now();
619 if now >= deadline {
620 return false;
621 }
622 let remaining = deadline.saturating_duration_since(now);
623 let (guard, result) = cvar
624 .wait_timeout(latest, remaining)
625 .unwrap_or_else(|e| e.into_inner());
626 latest = guard;
627 if result.timed_out() && *latest <= since_lsn {
628 return false;
629 }
630 }
631 true
632 }
633
634 pub fn register_replica(&self, id: String) -> u64 {
635 self.register_replica_with_region(id, None)
636 }
637
638 pub fn register_replica_with_region(&self, id: String, region: Option<String>) -> u64 {
655 let now_ms = crate::utils::now_unix_millis() as u128;
656 let resume_lsn = self.ensure_slot(&id, self.current_logical_lsn());
657 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
658 if let Some(existing) = replicas.iter_mut().find(|r| r.id == id) {
659 existing.last_seen_at_unix_ms = now_ms;
660 if region.is_some() {
661 existing.region = region;
662 }
663 return resume_lsn;
664 }
665 replicas.push(ReplicaState {
666 id,
667 last_acked_lsn: resume_lsn,
668 last_sent_lsn: resume_lsn,
669 last_durable_lsn: resume_lsn,
670 apply_error_count: 0,
671 divergence_count: 0,
672 connected_at_unix_ms: now_ms,
673 last_seen_at_unix_ms: now_ms,
674 region,
675 rebootstrapping: false,
676 });
677 drop(replicas);
678 self.bump_topology_epoch();
679 resume_lsn
680 }
681
682 pub fn set_replica_rebootstrapping(&self, id: &str, rebootstrapping: bool) -> bool {
697 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
698 let Some(state) = replicas.iter_mut().find(|r| r.id == id) else {
699 return false;
700 };
701 if state.rebootstrapping == rebootstrapping {
702 return true;
703 }
704 state.rebootstrapping = rebootstrapping;
705 drop(replicas);
706 self.bump_topology_epoch();
707 true
708 }
709
710 pub fn ensure_replica_registered(&self, id: &str) -> bool {
719 let already = self
720 .replicas
721 .read()
722 .unwrap_or_else(|e| e.into_inner())
723 .iter()
724 .any(|r| r.id == id);
725 if already {
726 return false;
727 }
728 self.register_replica(id.to_string());
729 true
730 }
731
732 pub fn unregister_replica(&self, id: &str) -> bool {
736 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
737 let before = replicas.len();
738 replicas.retain(|r| r.id != id);
739 let removed = replicas.len() != before;
740 drop(replicas);
741 if removed {
742 self.commit_waiter.drop_replica(id);
743 self.bump_topology_epoch();
744 }
745 removed
746 }
747
748 pub fn topology_epoch(&self) -> u64 {
751 self.topology_epoch
752 .load(std::sync::atomic::Ordering::Relaxed)
753 }
754
755 pub fn bump_topology_epoch(&self) {
763 self.topology_epoch
764 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
765 }
766
767 pub fn ack_replica(&self, id: &str, lsn: u64) {
768 let now_ms = crate::utils::now_unix_millis() as u128;
769 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
770 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
771 r.last_acked_lsn = r.last_acked_lsn.max(lsn);
772 r.last_durable_lsn = r.last_durable_lsn.max(lsn);
773 r.last_seen_at_unix_ms = now_ms;
774 }
775 drop(replicas);
776 self.commit_waiter.record_replica_ack(id, lsn);
777 }
778
779 pub fn ack_replica_lsn(&self, id: &str, applied_lsn: u64, durable_lsn: u64) {
785 self.ack_replica_lsn_with_observability(id, applied_lsn, durable_lsn, 0, 0);
786 }
787
788 pub fn ack_replica_lsn_with_observability(
789 &self,
790 id: &str,
791 applied_lsn: u64,
792 durable_lsn: u64,
793 apply_error_count: u64,
794 divergence_count: u64,
795 ) {
796 let now_ms = crate::utils::now_unix_millis() as u128;
797 self.advance_slot(id, applied_lsn, durable_lsn, now_ms);
798 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
799 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
800 r.last_acked_lsn = r.last_acked_lsn.max(applied_lsn);
801 r.last_durable_lsn = r.last_durable_lsn.max(durable_lsn);
802 r.apply_error_count = r.apply_error_count.max(apply_error_count);
803 r.divergence_count = r.divergence_count.max(divergence_count);
804 r.last_seen_at_unix_ms = now_ms;
805 }
806 drop(replicas);
810 self.commit_waiter.record_replica_ack(id, durable_lsn);
811 }
812
813 pub fn note_replica_pull(&self, id: &str, last_sent_lsn: u64) {
818 let now_ms = crate::utils::now_unix_millis() as u128;
819 self.touch_slot(id, now_ms);
820 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
821 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
822 r.last_sent_lsn = r.last_sent_lsn.max(last_sent_lsn);
823 r.last_seen_at_unix_ms = now_ms;
824 }
825 }
826
827 pub fn replica_snapshots(&self) -> Vec<ReplicaState> {
831 self.replicas
832 .read()
833 .unwrap_or_else(|e| e.into_inner())
834 .clone()
835 }
836
837 pub fn replication_progress(&self) -> Option<ReplicationProgress> {
838 let replicas = self.replicas.read().unwrap_or_else(|e| e.into_inner());
839 ReplicationProgress::from_replicas(&replicas)
840 }
841
842 pub fn slot_snapshots(&self) -> Vec<ReplicationSlot> {
843 self.slots
844 .read()
845 .unwrap_or_else(|e| e.into_inner())
846 .values()
847 .cloned()
848 .collect()
849 }
850
851 pub fn retention_floor_lsn(&self) -> Option<u64> {
852 self.slots
853 .read()
854 .unwrap_or_else(|e| e.into_inner())
855 .values()
856 .filter(|slot| slot.invalidation_reason.is_none())
857 .map(|slot| slot.restart_lsn)
858 .min()
859 }
860
861 pub fn prune_retained_wal_through(&self, archived_lsn: u64) -> io::Result<u64> {
862 self.enforce_retention_limits(crate::utils::now_unix_millis() as u128);
863 let prune_lsn = self
864 .retention_floor_lsn()
865 .map(|floor| floor.min(archived_lsn))
866 .unwrap_or(archived_lsn);
867 if prune_lsn > 0 {
868 if let Some(spool) = &self.logical_wal_spool {
869 spool.prune_through(prune_lsn)?;
870 }
871 self.wal_buffer.prune_through(prune_lsn);
872 }
873 Ok(prune_lsn)
874 }
875
876 pub fn replica_count(&self) -> usize {
877 self.replicas
878 .read()
879 .unwrap_or_else(|e| e.into_inner())
880 .len()
881 }
882
883 pub fn current_logical_lsn(&self) -> u64 {
887 self.logical_wal_spool
888 .as_ref()
889 .map(|spool| spool.current_lsn())
890 .unwrap_or_else(|| self.wal_buffer.current_lsn())
891 }
892
893 fn ensure_slot(&self, id: &str, initial_lsn: u64) -> u64 {
894 let now_ms = crate::utils::now_unix_millis() as u128;
895 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
896 if let Some(slot) = slots.get_mut(id) {
897 slot.last_seen_at_unix_ms = now_ms;
898 let restart_lsn = slot.restart_lsn;
899 self.persist_slots_locked(&slots);
900 return restart_lsn;
901 }
902 let mut slot = ReplicationSlot::new(
903 id.to_string(),
904 reddb_file::TimelineId::initial(),
905 initial_lsn,
906 );
907 slot.last_seen_at_unix_ms = now_ms;
908 slots.insert(id.to_string(), slot);
909 let restart_lsn = initial_lsn;
910 self.persist_slots_locked(&slots);
911 restart_lsn
912 }
913
    /// Moves replica `id`'s slot forward after an ack, then persists all slots.
    ///
    /// A missing slot is first created at LSN 0 on the initial timeline.
    /// Invalidated slots are left untouched, and nothing is persisted for them.
    /// Every cursor only moves forward:
    /// - `restart_lsn` is raised to the reported `restart_lsn`;
    /// - flush/apply cursors are raised to at least the new `restart_lsn`;
    /// - `confirmed_write_lsn` becomes the max of `slot.confirmed_lsn()` and
    ///   both reported LSNs.
    fn advance_slot(&self, id: &str, confirmed_lsn: u64, restart_lsn: u64, now_ms: u128) {
        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
        let slot = slots.entry(id.to_string()).or_insert_with(|| {
            let mut slot =
                ReplicationSlot::new(id.to_string(), reddb_file::TimelineId::initial(), 0);
            slot.last_seen_at_unix_ms = now_ms;
            slot
        });
        // An invalidated slot stays frozen; the replica must re-bootstrap.
        if slot.invalidation_reason.is_some() {
            return;
        }
        // NOTE(review): `confirmed_lsn()` is defined on `ReplicationSlot` (not
        // visible here); confirm which cursor it reports.
        slot.confirmed_write_lsn = slot.confirmed_lsn().max(confirmed_lsn).max(restart_lsn);
        slot.restart_lsn = slot.restart_lsn.max(restart_lsn);
        // Flush/apply cursors are kept at or above the (new) restart LSN.
        slot.confirmed_flush_lsn = slot.confirmed_flush_lsn.max(slot.restart_lsn);
        slot.confirmed_apply_lsn = slot.confirmed_apply_lsn.max(slot.restart_lsn);
        slot.last_seen_at_unix_ms = now_ms;
        self.persist_slots_locked(&slots);
    }
932
933 pub fn touch_slot(&self, id: &str, now_ms: u128) {
934 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
935 let mut changed = false;
936 if let Some(slot) = slots.get_mut(id) {
937 if slot.invalidation_reason.is_none() {
938 slot.last_seen_at_unix_ms = now_ms;
939 changed = true;
940 }
941 }
942 if changed {
943 self.persist_slots_locked(&slots);
944 }
945 }
946
947 pub fn enforce_retention_limits(
948 &self,
949 now_ms: u128,
950 ) -> Vec<(String, ReplicationSlotInvalidationCause)> {
951 let current_lsn = self.current_logical_lsn();
952 let mut invalidated = Vec::new();
953 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
954 for slot in slots.values_mut() {
955 if slot.invalidation_reason.is_some() {
956 continue;
957 }
958 let reason = if self.slot_retention_max_lag_lsn > 0
959 && current_lsn.saturating_sub(slot.restart_lsn) > self.slot_retention_max_lag_lsn
960 {
961 Some(ReplicationSlotInvalidationCause::Horizon)
962 } else if self.slot_idle_timeout_ms > 0
963 && now_ms.saturating_sub(slot.last_seen_at_unix_ms)
964 > u128::from(self.slot_idle_timeout_ms)
965 {
966 Some(ReplicationSlotInvalidationCause::IdleTimeout)
967 } else {
968 None
969 };
970 if let Some(reason) = reason {
971 slot.invalidation_reason = Some(reason);
972 slot.invalidated_at_unix_ms = Some(now_ms);
973 invalidated.push((slot.replica_id.clone(), reason));
974 }
975 }
976 if !invalidated.is_empty() {
977 self.persist_slots_locked(&slots);
978 }
979 invalidated
980 }
981
982 pub fn slot_rebootstrap_reason(
983 &self,
984 id: &str,
985 requested_since_lsn: u64,
986 oldest_available_lsn: Option<u64>,
987 ) -> Option<ReplicationSlotInvalidationCause> {
988 let now_ms = crate::utils::now_unix_millis() as u128;
989 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
990 let slot = slots.get_mut(id)?;
991 if let Some(reason) = slot.invalidation_reason {
992 return Some(reason);
993 }
994 let slot_floor = slot.restart_lsn.max(requested_since_lsn);
995 if oldest_available_lsn
996 .map(|oldest| oldest > slot_floor.saturating_add(1))
997 .unwrap_or(false)
998 {
999 slot.invalidation_reason = Some(ReplicationSlotInvalidationCause::WalRemoved);
1000 slot.invalidated_at_unix_ms = Some(now_ms);
1001 self.persist_slots_locked(&slots);
1002 return Some(ReplicationSlotInvalidationCause::WalRemoved);
1003 }
1004 None
1005 }
1006
1007 pub fn plan_replica_resume(
1016 &self,
1017 id: &str,
1018 requested_since_lsn: u64,
1019 oldest_available_lsn: Option<u64>,
1020 ) -> ResumeMode {
1021 if let Some(cause) =
1022 self.slot_rebootstrap_reason(id, requested_since_lsn, oldest_available_lsn)
1023 {
1024 self.full_resync_count
1025 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1026 return ResumeMode::FullRebootstrap { cause };
1027 }
1028 let resume_lsn = self
1029 .slot_snapshots()
1030 .into_iter()
1031 .find(|slot| slot.replica_id == id)
1032 .map(|slot| requested_since_lsn.max(slot.restart_lsn))
1033 .unwrap_or(requested_since_lsn);
1034 self.partial_resync_count
1035 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1036 ResumeMode::PartialResync { resume_lsn }
1037 }
1038
1039 pub fn partial_resync_count(&self) -> u64 {
1042 self.partial_resync_count
1043 .load(std::sync::atomic::Ordering::Relaxed)
1044 }
1045
1046 pub fn full_resync_count(&self) -> u64 {
1050 self.full_resync_count
1051 .load(std::sync::atomic::Ordering::Relaxed)
1052 }
1053
1054 fn persist_slots_locked(&self, slots: &BTreeMap<String, ReplicationSlot>) {
1055 if let Err(err) = persist_replication_slots(self.slot_path.as_deref(), slots) {
1056 warn!(
1057 target: "reddb::replication::slots",
1058 error = %err,
1059 "failed to persist replication slots"
1060 );
1061 }
1062 if let Err(err) = persist_replication_slot_catalog(self.slot_catalog_path.as_deref(), slots)
1063 {
1064 warn!(
1065 target: "reddb::replication::slots",
1066 error = %err,
1067 "failed to persist binary replication slot catalog"
1068 );
1069 }
1070 }
1071}
1072
1073#[cfg(test)]
1074mod tests;