1use anyhow::Context as _;
20
21use crate::{
22 infrastructure::{
23 observability::metrics::MetricsRegistry,
24 persistence::{
25 storage::ParquetStorage,
26 wal::{WALConfig, WALEntry, WriteAheadLog},
27 },
28 replication::protocol::{FollowerMessage, LeaderMessage},
29 },
30 store::EventStore,
31};
32use std::{
33 collections::HashMap,
34 sync::{
35 Arc,
36 atomic::{AtomicBool, AtomicU64, Ordering},
37 },
38 time::Duration,
39};
40use tokio::{
41 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
42 net::TcpStream,
43};
44
/// Point-in-time snapshot of the follower's replication state, serialized
/// as JSON for status/health endpoints. Field names are part of the wire
/// format (asserted in the unit tests below) and must not be renamed.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FollowerReplicationStatus {
    /// Whether a TCP connection to the leader is currently established.
    pub connected: bool,
    /// Leader address, or "unknown" if the address lock was contended.
    pub leader: String,
    /// Lag behind the leader. NOTE(review): despite the `_ms` suffix this is
    /// computed as `leader_offset - last_replayed_offset`, i.e. a count of
    /// WAL offsets, not milliseconds — confirm the intended unit.
    pub replication_lag_ms: u64,
    /// Highest WAL offset successfully replayed into the local store.
    pub last_replayed_offset: u64,
    /// Latest offset reported by the leader.
    pub leader_offset: u64,
    /// Total entries replayed since this process started.
    pub total_replayed: u64,
    /// Entries skipped because CRC32 verification failed.
    pub corrupted_skipped: u64,
    /// Number of reconnect attempts made so far.
    pub reconnect_count: u64,
    /// Number of completed full-snapshot transfers.
    pub snapshots_received: u64,
}
67
/// Follower-side replication client: connects to the leader over TCP, streams
/// WAL entries (or a full Parquet snapshot when too far behind), persists them
/// to a local WAL, and replays them into the `EventStore`.
pub struct WalReceiver {
    /// Leader address; behind an async RwLock so `repoint` can swap it at runtime.
    leader_addr: Arc<tokio::sync::RwLock<String>>,
    /// Local durable copy of the replicated WAL (used to recover the resume offset).
    local_wal: Arc<WriteAheadLog>,
    /// Store into which replicated events are applied.
    store: Arc<EventStore>,
    /// Directory where incoming snapshot Parquet files are staged before loading.
    snapshot_dir: std::path::PathBuf,
    /// True while a TCP connection to the leader is established.
    connected: Arc<AtomicBool>,
    /// Highest WAL offset successfully replayed into the store.
    last_replayed_offset: Arc<AtomicU64>,
    /// Latest offset the leader has reported.
    leader_offset: Arc<AtomicU64>,
    /// Count of entries replayed since startup.
    total_replayed: Arc<AtomicU64>,
    /// Count of entries skipped due to CRC32 verification failure.
    corrupted_skipped: Arc<AtomicU64>,
    /// Number of reconnect attempts performed by the run loop.
    reconnect_count: Arc<AtomicU64>,
    /// Number of full snapshot transfers completed.
    snapshots_received: Arc<AtomicU64>,
    /// Optional metrics sink; gauges/counters are updated only when present.
    metrics: Option<Arc<MetricsRegistry>>,
    /// Set by `shutdown()`; checked by the run loop to stop cleanly.
    shutdown: Arc<AtomicBool>,
    /// Wakes the reconnect backoff sleep early on repoint or shutdown.
    wake: Arc<tokio::sync::Notify>,
}
99
100impl WalReceiver {
101 pub fn new(
106 leader_addr: String,
107 wal_dir: impl Into<std::path::PathBuf>,
108 store: Arc<EventStore>,
109 ) -> anyhow::Result<Self> {
110 let wal_dir = wal_dir.into();
111 let wal_config = WALConfig {
112 max_file_size: 64 * 1024 * 1024,
113 sync_on_write: true,
114 max_wal_files: 10,
115 compress: false,
116 };
117 let local_wal = Arc::new(WriteAheadLog::new(&wal_dir, wal_config)?);
118
119 let last_offset = local_wal.current_sequence();
121
122 let snapshot_dir = wal_dir
124 .parent()
125 .unwrap_or(&wal_dir)
126 .join("follower-snapshots");
127
128 Ok(Self {
129 leader_addr: Arc::new(tokio::sync::RwLock::new(leader_addr)),
130 local_wal,
131 store,
132 snapshot_dir,
133 connected: Arc::new(AtomicBool::new(false)),
134 last_replayed_offset: Arc::new(AtomicU64::new(last_offset)),
135 leader_offset: Arc::new(AtomicU64::new(0)),
136 total_replayed: Arc::new(AtomicU64::new(0)),
137 corrupted_skipped: Arc::new(AtomicU64::new(0)),
138 reconnect_count: Arc::new(AtomicU64::new(0)),
139 snapshots_received: Arc::new(AtomicU64::new(0)),
140 metrics: None,
141 shutdown: Arc::new(AtomicBool::new(false)),
142 wake: Arc::new(tokio::sync::Notify::new()),
143 })
144 }
145
146 pub fn set_metrics(&mut self, metrics: Arc<MetricsRegistry>) {
148 self.metrics = Some(metrics);
149 }
150
151 pub fn status(&self) -> FollowerReplicationStatus {
153 let last_replayed = self.last_replayed_offset.load(Ordering::Relaxed);
154 let leader_off = self.leader_offset.load(Ordering::Relaxed);
155 let lag = leader_off.saturating_sub(last_replayed);
156
157 let leader = self
159 .leader_addr
160 .try_read()
161 .map(|g| g.clone())
162 .unwrap_or_else(|_| "unknown".to_string());
163
164 FollowerReplicationStatus {
165 connected: self.connected.load(Ordering::Relaxed),
166 leader,
167 replication_lag_ms: lag,
168 last_replayed_offset: last_replayed,
169 leader_offset: leader_off,
170 total_replayed: self.total_replayed.load(Ordering::Relaxed),
171 corrupted_skipped: self.corrupted_skipped.load(Ordering::Relaxed),
172 reconnect_count: self.reconnect_count.load(Ordering::Relaxed),
173 snapshots_received: self.snapshots_received.load(Ordering::Relaxed),
174 }
175 }
176
177 pub fn shutdown(&self) {
181 self.shutdown.store(true, Ordering::Relaxed);
182 self.wake.notify_waiters();
183 }
184
185 pub fn repoint(&self, new_leader: &str) {
190 if let Ok(mut guard) = self.leader_addr.try_write() {
192 *guard = new_leader.to_string();
193 } else {
194 tracing::warn!(
195 "REPOINT: Could not acquire write lock on leader_addr, will retry on next reconnect"
196 );
197 }
198 self.wake.notify_waiters();
200 }
201
202 pub async fn run(self: Arc<Self>) {
206 let mut backoff = Duration::from_secs(1);
207 let max_backoff = Duration::from_secs(30);
208
209 loop {
210 if self.shutdown.load(Ordering::Relaxed) {
211 tracing::info!("WAL receiver shutdown requested — stopping");
212 break;
213 }
214
215 let leader_addr = self.leader_addr.read().await.clone();
216
217 tracing::info!(
218 "Connecting to leader at {} (last_offset={})",
219 leader_addr,
220 self.last_replayed_offset.load(Ordering::Relaxed),
221 );
222
223 match self.connect_and_stream().await {
224 Ok(()) => {
225 tracing::info!("Leader connection closed normally");
226 }
227 Err(e) => {
228 tracing::warn!("Leader connection error: {}", e);
229 }
230 }
231
232 if self.shutdown.load(Ordering::Relaxed) {
233 tracing::info!("WAL receiver shutdown requested — stopping");
234 break;
235 }
236
237 self.connected.store(false, Ordering::Relaxed);
238 self.reconnect_count.fetch_add(1, Ordering::Relaxed);
239 if let Some(ref m) = self.metrics {
240 m.replication_connected.set(0);
241 m.replication_reconnects_total.inc();
242 }
243
244 tracing::info!(
245 "Reconnecting to leader in {:?} (attempt {})",
246 backoff,
247 self.reconnect_count.load(Ordering::Relaxed),
248 );
249
250 tokio::select! {
252 _ = tokio::time::sleep(backoff) => {}
253 _ = self.wake.notified() => {
254 tracing::info!("WAL receiver woken early (repoint or shutdown)");
255 backoff = Duration::from_secs(1);
257 }
258 }
259
260 backoff = (backoff * 2).min(max_backoff);
262 }
263 }
264
265 async fn connect_and_stream(&self) -> anyhow::Result<()> {
267 let leader_addr = self.leader_addr.read().await.clone();
268 let stream = TcpStream::connect(&leader_addr)
269 .await
270 .context(format!("TCP connect to leader at {}", leader_addr))?;
271 let peer = stream.peer_addr()?;
272 tracing::info!("Connected to leader at {}", peer);
273
274 self.connected.store(true, Ordering::Relaxed);
275 if let Some(ref m) = self.metrics {
276 m.replication_connected.set(1);
277 }
278
279 let (reader, mut writer) = stream.into_split();
280 let mut reader = BufReader::new(reader);
281
282 let last_offset = self.last_replayed_offset.load(Ordering::Relaxed);
284 let subscribe = FollowerMessage::Subscribe { last_offset };
285 let mut json = serde_json::to_string(&subscribe)?;
286 json.push('\n');
287 writer
288 .write_all(json.as_bytes())
289 .await
290 .context("sending Subscribe message to leader")?;
291 writer
292 .flush()
293 .await
294 .context("flushing Subscribe message to leader")?;
295
296 tracing::info!("Subscribed to leader with last_offset={}", last_offset);
297
298 let mut line = String::new();
300 loop {
301 line.clear();
302 let bytes_read = reader
303 .read_line(&mut line)
304 .await
305 .context("reading WAL message from leader")?;
306 if bytes_read == 0 {
307 anyhow::bail!("Leader closed the connection");
309 }
310
311 let trimmed = line.trim();
312 if trimmed.is_empty() {
313 continue;
314 }
315
316 let msg: LeaderMessage =
317 serde_json::from_str(trimmed).context("parsing WAL LeaderMessage JSON")?;
318
319 match msg {
320 LeaderMessage::CaughtUp { current_offset } => {
321 tracing::info!("Caught up with leader at offset {}", current_offset,);
322 self.leader_offset.store(current_offset, Ordering::Relaxed);
323 if let Some(ref m) = self.metrics {
324 let last_replayed = self.last_replayed_offset.load(Ordering::Relaxed);
325 let lag = current_offset.saturating_sub(last_replayed);
326 m.replication_lag_seconds.set(lag as i64);
327 }
328 }
329 LeaderMessage::WalEntry { offset, data } => {
330 self.handle_wal_entry(offset, data, &mut writer).await?;
331 }
332 LeaderMessage::SnapshotStart { parquet_files } => {
333 self.handle_snapshot(&parquet_files, &mut reader, &mut writer)
334 .await?;
335 }
336 LeaderMessage::SnapshotChunk { .. } | LeaderMessage::SnapshotEnd { .. } => {
338 tracing::warn!(
339 "Received unexpected snapshot message outside of snapshot transfer"
340 );
341 }
342 }
343 }
344 }
345
346 async fn handle_snapshot(
352 &self,
353 expected_files: &[String],
354 reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
355 writer: &mut tokio::net::tcp::OwnedWriteHalf,
356 ) -> anyhow::Result<()> {
357 tracing::info!(
358 "Receiving Parquet snapshot ({} files: {:?})",
359 expected_files.len(),
360 expected_files,
361 );
362
363 tokio::fs::create_dir_all(&self.snapshot_dir).await?;
365
366 let mut file_buffers: HashMap<String, Vec<u8>> = HashMap::new();
368 for filename in expected_files {
369 file_buffers.insert(filename.clone(), Vec::new());
370 }
371
372 let mut line = String::new();
374 let wal_offset_after_snapshot;
375
376 loop {
377 line.clear();
378 let bytes_read = reader
379 .read_line(&mut line)
380 .await
381 .context("reading snapshot message from leader")?;
382 if bytes_read == 0 {
383 anyhow::bail!("Leader closed connection during snapshot transfer");
384 }
385
386 let trimmed = line.trim();
387 if trimmed.is_empty() {
388 continue;
389 }
390
391 let msg: LeaderMessage =
392 serde_json::from_str(trimmed).context("parsing snapshot LeaderMessage JSON")?;
393
394 match msg {
395 LeaderMessage::SnapshotChunk {
396 filename,
397 data,
398 chunk_offset: _,
399 is_last,
400 } => {
401 use base64::Engine;
402 let decoded = base64::engine::general_purpose::STANDARD.decode(&data)?;
403
404 let buffer = file_buffers.entry(filename.clone()).or_default();
405 buffer.extend_from_slice(&decoded);
406
407 if is_last {
408 let file_path = self.snapshot_dir.join(&filename);
410 tokio::fs::write(&file_path, &buffer).await?;
411 tracing::info!(
412 "Received Parquet file {} ({} bytes)",
413 filename,
414 buffer.len(),
415 );
416 }
417 }
418 LeaderMessage::SnapshotEnd {
419 wal_offset_after_snapshot: offset,
420 } => {
421 wal_offset_after_snapshot = offset;
422 tracing::info!(
423 "Snapshot transfer complete, WAL resume offset={}",
424 wal_offset_after_snapshot,
425 );
426 break;
427 }
428 LeaderMessage::WalEntry { .. } | LeaderMessage::CaughtUp { .. } => {
429 tracing::warn!("Received unexpected WAL message during snapshot transfer");
430 }
431 LeaderMessage::SnapshotStart { .. } => {
432 tracing::warn!("Received unexpected SnapshotStart during snapshot transfer");
433 }
434 }
435 }
436
437 let snapshot_dir = self.snapshot_dir.clone();
439 let store = Arc::clone(&self.store);
440
441 let temp_storage = ParquetStorage::new(&snapshot_dir)?;
443 let events = temp_storage.load_all_events()?;
444
445 tracing::info!(
446 "Loading {} events from snapshot into EventStore",
447 events.len(),
448 );
449
450 let mut replayed = 0u64;
451 for event in events {
452 if let Err(e) = store.ingest_replicated(event) {
453 tracing::error!("Failed to replay snapshot event: {}", e);
454 } else {
455 replayed += 1;
456 }
457 }
458
459 self.last_replayed_offset
461 .store(wal_offset_after_snapshot, Ordering::Relaxed);
462 self.total_replayed.fetch_add(replayed, Ordering::Relaxed);
463 self.snapshots_received.fetch_add(1, Ordering::Relaxed);
464
465 for filename in expected_files {
467 let file_path = self.snapshot_dir.join(filename);
468 if let Err(e) = tokio::fs::remove_file(&file_path).await {
469 tracing::debug!("Failed to clean up snapshot file {}: {}", filename, e);
470 }
471 }
472
473 tracing::info!(
474 "Snapshot catch-up complete: {} events loaded, resuming WAL from offset {}",
475 replayed,
476 wal_offset_after_snapshot,
477 );
478
479 self.send_ack(wal_offset_after_snapshot, writer).await?;
481
482 Ok(())
483 }
484
485 async fn handle_wal_entry(
487 &self,
488 offset: u64,
489 entry: WALEntry,
490 writer: &mut tokio::net::tcp::OwnedWriteHalf,
491 ) -> anyhow::Result<()> {
492 self.leader_offset.store(offset, Ordering::Relaxed);
494
495 if let Some(ref m) = self.metrics {
497 m.replication_wal_received_total.inc();
498 }
499
500 if !entry.verify() {
502 tracing::error!(
503 "CRC32 validation failed for WAL entry at offset {} — skipping",
504 offset,
505 );
506 self.corrupted_skipped.fetch_add(1, Ordering::Relaxed);
507 return Ok(());
508 }
509
510 let current = self.last_replayed_offset.load(Ordering::Relaxed);
512 if offset <= current {
513 tracing::debug!("Skipping already-replayed offset {}", offset);
514 self.send_ack(offset, writer).await?;
516 return Ok(());
517 }
518
519 let event = entry.event.clone();
521 if let Err(e) = self.local_wal.append(event.clone()) {
522 tracing::error!("Failed to write to local WAL at offset {}: {}", offset, e);
523 }
526
527 if let Err(e) = self.store.ingest_replicated(event) {
529 tracing::error!(
530 "Failed to replay event at offset {} into store: {}",
531 offset,
532 e
533 );
534 return Ok(());
536 }
537
538 self.last_replayed_offset.store(offset, Ordering::Relaxed);
540 self.total_replayed.fetch_add(1, Ordering::Relaxed);
541
542 if let Some(ref m) = self.metrics {
544 m.replication_wal_replayed_total.inc();
545 let lag = self
546 .leader_offset
547 .load(Ordering::Relaxed)
548 .saturating_sub(offset);
549 m.replication_lag_seconds.set(lag as i64);
550 }
551
552 self.send_ack(offset, writer).await?;
554
555 tracing::trace!("Replayed WAL entry at offset {}", offset);
556
557 Ok(())
558 }
559
560 async fn send_ack(
562 &self,
563 offset: u64,
564 writer: &mut tokio::net::tcp::OwnedWriteHalf,
565 ) -> anyhow::Result<()> {
566 let ack = FollowerMessage::Ack { offset };
567 let mut json = serde_json::to_string(&ack)?;
568 json.push('\n');
569 writer
570 .write_all(json.as_bytes())
571 .await
572 .context("sending ACK to leader")?;
573 writer.flush().await.context("flushing ACK to leader")?;
574 Ok(())
575 }
576}
577
#[cfg(test)]
mod tests {
    use super::*;

    // The JSON field names asserted here are the public wire format of the
    // status endpoint; they pin the serde serialization of every field.
    #[test]
    fn test_follower_replication_status_serialization() {
        let status = FollowerReplicationStatus {
            connected: true,
            leader: "core-leader:3910".to_string(),
            replication_lag_ms: 42,
            last_replayed_offset: 100,
            leader_offset: 142,
            total_replayed: 100,
            corrupted_skipped: 0,
            reconnect_count: 1,
            snapshots_received: 0,
        };
        let json = serde_json::to_value(&status).unwrap();
        assert_eq!(json["connected"], true);
        assert_eq!(json["leader"], "core-leader:3910");
        assert_eq!(json["replication_lag_ms"], 42);
        assert_eq!(json["last_replayed_offset"], 100);
        assert_eq!(json["leader_offset"], 142);
        assert_eq!(json["total_replayed"], 100);
        assert_eq!(json["corrupted_skipped"], 0);
        assert_eq!(json["reconnect_count"], 1);
        assert_eq!(json["snapshots_received"], 0);
    }

    // All-zero / disconnected status also serializes cleanly.
    #[test]
    fn test_follower_replication_status_defaults() {
        let status = FollowerReplicationStatus {
            connected: false,
            leader: "localhost:3910".to_string(),
            replication_lag_ms: 0,
            last_replayed_offset: 0,
            leader_offset: 0,
            total_replayed: 0,
            corrupted_skipped: 0,
            reconnect_count: 0,
            snapshots_received: 0,
        };
        let json = serde_json::to_value(&status).unwrap();
        assert_eq!(json["connected"], false);
        assert_eq!(json["replication_lag_ms"], 0);
        assert_eq!(json["snapshots_received"], 0);
    }

    // A fresh receiver starts disconnected with all counters at zero.
    #[test]
    fn test_wal_receiver_creation() {
        let store = Arc::new(EventStore::new());
        let temp_dir = tempfile::TempDir::new().unwrap();
        let receiver = WalReceiver::new(
            "localhost:3910".to_string(),
            temp_dir.path().join("follower-wal"),
            store,
        );
        assert!(receiver.is_ok());

        let receiver = receiver.unwrap();
        let status = receiver.status();
        assert!(!status.connected);
        assert_eq!(status.leader, "localhost:3910");
        assert_eq!(status.last_replayed_offset, 0);
        assert_eq!(status.total_replayed, 0);
        assert_eq!(status.snapshots_received, 0);
    }

    #[test]
    fn test_wal_receiver_recovers_offset_from_local_wal() {
        let store = Arc::new(EventStore::new());
        let temp_dir = tempfile::TempDir::new().unwrap();
        let wal_dir = temp_dir.path().join("follower-wal");

        // Pre-populate a WAL in its own scope so files are flushed/closed
        // before the receiver reopens the directory.
        {
            let wal = WriteAheadLog::new(&wal_dir, WALConfig::default()).unwrap();
            let event = crate::test_utils::test_event("test-entity", "test.replicated");
            wal.append(event).unwrap();
            let event2 = crate::test_utils::test_event("test-entity", "test.replicated");
            wal.append(event2).unwrap();
        }

        let receiver = WalReceiver::new("localhost:3910".to_string(), &wal_dir, store).unwrap();

        let status = receiver.status();
        // NOTE(review): the test name says the offset is recovered from the
        // pre-populated WAL, yet this asserts 0 after two appends. Either
        // `current_sequence()` does not persist across reopen, or this
        // assertion is stale — confirm which before relying on it.
        assert_eq!(status.last_replayed_offset, 0);
    }

    // Snapshot dir is derived from the WAL dir's parent: <parent>/follower-snapshots.
    #[test]
    fn test_snapshot_dir_created_correctly() {
        let store = Arc::new(EventStore::new());
        let temp_dir = tempfile::TempDir::new().unwrap();
        let wal_dir = temp_dir.path().join("follower-wal");

        let receiver = WalReceiver::new("localhost:3910".to_string(), &wal_dir, store).unwrap();

        assert_eq!(
            receiver.snapshot_dir,
            temp_dir.path().join("follower-snapshots"),
        );
    }
}