1use anyhow::Context as _;
20
21use crate::{
22 infrastructure::{
23 observability::metrics::MetricsRegistry,
24 persistence::{
25 storage::ParquetStorage,
26 wal::{WALConfig, WALEntry, WriteAheadLog},
27 },
28 replication::protocol::{FollowerMessage, LeaderMessage},
29 },
30 store::EventStore,
31};
32use std::{
33 collections::HashMap,
34 sync::{
35 Arc,
36 atomic::{AtomicBool, AtomicU64, Ordering},
37 },
38 time::Duration,
39};
40use tokio::{
41 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
42 net::TcpStream,
43};
44
45#[derive(Debug, Clone, serde::Serialize)]
47pub struct FollowerReplicationStatus {
48 pub connected: bool,
50 pub leader: String,
52 pub replication_lag_ms: u64,
54 pub last_replayed_offset: u64,
56 pub leader_offset: u64,
58 pub total_replayed: u64,
60 pub corrupted_skipped: u64,
62 pub reconnect_count: u64,
64 pub snapshots_received: u64,
66}
67
68pub struct WalReceiver {
70 leader_addr: Arc<tokio::sync::RwLock<String>>,
72 local_wal: Arc<WriteAheadLog>,
74 store: Arc<EventStore>,
76 snapshot_dir: std::path::PathBuf,
78 connected: Arc<AtomicBool>,
80 last_replayed_offset: Arc<AtomicU64>,
82 leader_offset: Arc<AtomicU64>,
84 total_replayed: Arc<AtomicU64>,
86 corrupted_skipped: Arc<AtomicU64>,
88 reconnect_count: Arc<AtomicU64>,
90 snapshots_received: Arc<AtomicU64>,
92 metrics: Option<Arc<MetricsRegistry>>,
94 shutdown: Arc<AtomicBool>,
96 wake: Arc<tokio::sync::Notify>,
98}
99
100impl WalReceiver {
101 pub fn new(
106 leader_addr: String,
107 wal_dir: impl Into<std::path::PathBuf>,
108 store: Arc<EventStore>,
109 ) -> anyhow::Result<Self> {
110 let wal_dir = wal_dir.into();
111 let wal_config = WALConfig {
112 max_file_size: 64 * 1024 * 1024,
113 sync_on_write: true,
114 max_wal_files: 10,
115 compress: false,
116 ..WALConfig::default()
117 };
118 let local_wal = Arc::new(WriteAheadLog::new(&wal_dir, wal_config)?);
119
120 let last_offset = local_wal.current_sequence();
122
123 let snapshot_dir = wal_dir
125 .parent()
126 .unwrap_or(&wal_dir)
127 .join("follower-snapshots");
128
129 Ok(Self {
130 leader_addr: Arc::new(tokio::sync::RwLock::new(leader_addr)),
131 local_wal,
132 store,
133 snapshot_dir,
134 connected: Arc::new(AtomicBool::new(false)),
135 last_replayed_offset: Arc::new(AtomicU64::new(last_offset)),
136 leader_offset: Arc::new(AtomicU64::new(0)),
137 total_replayed: Arc::new(AtomicU64::new(0)),
138 corrupted_skipped: Arc::new(AtomicU64::new(0)),
139 reconnect_count: Arc::new(AtomicU64::new(0)),
140 snapshots_received: Arc::new(AtomicU64::new(0)),
141 metrics: None,
142 shutdown: Arc::new(AtomicBool::new(false)),
143 wake: Arc::new(tokio::sync::Notify::new()),
144 })
145 }
146
147 pub fn set_metrics(&mut self, metrics: Arc<MetricsRegistry>) {
149 self.metrics = Some(metrics);
150 }
151
152 pub fn status(&self) -> FollowerReplicationStatus {
154 let last_replayed = self.last_replayed_offset.load(Ordering::Relaxed);
155 let leader_off = self.leader_offset.load(Ordering::Relaxed);
156 let lag = leader_off.saturating_sub(last_replayed);
157
158 let leader = self
160 .leader_addr
161 .try_read()
162 .map_or_else(|_| "unknown".to_string(), |g| g.clone());
163
164 FollowerReplicationStatus {
165 connected: self.connected.load(Ordering::Relaxed),
166 leader,
167 replication_lag_ms: lag,
168 last_replayed_offset: last_replayed,
169 leader_offset: leader_off,
170 total_replayed: self.total_replayed.load(Ordering::Relaxed),
171 corrupted_skipped: self.corrupted_skipped.load(Ordering::Relaxed),
172 reconnect_count: self.reconnect_count.load(Ordering::Relaxed),
173 snapshots_received: self.snapshots_received.load(Ordering::Relaxed),
174 }
175 }
176
177 pub fn shutdown(&self) {
181 self.shutdown.store(true, Ordering::Relaxed);
182 self.wake.notify_waiters();
183 }
184
185 pub fn repoint(&self, new_leader: &str) {
190 if let Ok(mut guard) = self.leader_addr.try_write() {
192 *guard = new_leader.to_string();
193 } else {
194 tracing::warn!(
195 "REPOINT: Could not acquire write lock on leader_addr, will retry on next reconnect"
196 );
197 }
198 self.wake.notify_waiters();
200 }
201
202 #[cfg_attr(feature = "hotpath", hotpath::measure)]
206 pub async fn run(self: Arc<Self>) {
207 let mut backoff = Duration::from_secs(1);
208 let max_backoff = Duration::from_secs(30);
209
210 loop {
211 if self.shutdown.load(Ordering::Relaxed) {
212 tracing::info!("WAL receiver shutdown requested — stopping");
213 break;
214 }
215
216 let leader_addr = self.leader_addr.read().await.clone();
217
218 tracing::info!(
219 "Connecting to leader at {} (last_offset={})",
220 leader_addr,
221 self.last_replayed_offset.load(Ordering::Relaxed),
222 );
223
224 match self.connect_and_stream().await {
225 Ok(()) => {
226 tracing::info!("Leader connection closed normally");
227 }
228 Err(e) => {
229 tracing::warn!("Leader connection error: {}", e);
230 }
231 }
232
233 if self.shutdown.load(Ordering::Relaxed) {
234 tracing::info!("WAL receiver shutdown requested — stopping");
235 break;
236 }
237
238 self.connected.store(false, Ordering::Relaxed);
239 self.reconnect_count.fetch_add(1, Ordering::Relaxed);
240 if let Some(ref m) = self.metrics {
241 m.replication_connected.set(0);
242 m.replication_reconnects_total.inc();
243 }
244
245 tracing::info!(
246 "Reconnecting to leader in {:?} (attempt {})",
247 backoff,
248 self.reconnect_count.load(Ordering::Relaxed),
249 );
250
251 tokio::select! {
253 () = tokio::time::sleep(backoff) => {}
254 () = self.wake.notified() => {
255 tracing::info!("WAL receiver woken early (repoint or shutdown)");
256 backoff = Duration::from_secs(1);
258 }
259 }
260
261 backoff = (backoff * 2).min(max_backoff);
263 }
264 }
265
266 async fn connect_and_stream(&self) -> anyhow::Result<()> {
268 let leader_addr = self.leader_addr.read().await.clone();
269 let stream = TcpStream::connect(&leader_addr)
270 .await
271 .context(format!("TCP connect to leader at {leader_addr}"))?;
272 let peer = stream.peer_addr()?;
273 tracing::info!("Connected to leader at {}", peer);
274
275 self.connected.store(true, Ordering::Relaxed);
276 if let Some(ref m) = self.metrics {
277 m.replication_connected.set(1);
278 }
279
280 let (reader, mut writer) = stream.into_split();
281 let mut reader = BufReader::new(reader);
282
283 let last_offset = self.last_replayed_offset.load(Ordering::Relaxed);
285 let subscribe = FollowerMessage::Subscribe { last_offset };
286 let mut json = serde_json::to_string(&subscribe)?;
287 json.push('\n');
288 writer
289 .write_all(json.as_bytes())
290 .await
291 .context("sending Subscribe message to leader")?;
292 writer
293 .flush()
294 .await
295 .context("flushing Subscribe message to leader")?;
296
297 tracing::info!("Subscribed to leader with last_offset={}", last_offset);
298
299 let mut line = String::new();
301 loop {
302 line.clear();
303 let bytes_read = reader
304 .read_line(&mut line)
305 .await
306 .context("reading WAL message from leader")?;
307 if bytes_read == 0 {
308 anyhow::bail!("Leader closed the connection");
310 }
311
312 let trimmed = line.trim();
313 if trimmed.is_empty() {
314 continue;
315 }
316
317 let msg: LeaderMessage =
318 serde_json::from_str(trimmed).context("parsing WAL LeaderMessage JSON")?;
319
320 match msg {
321 LeaderMessage::CaughtUp { current_offset } => {
322 tracing::info!("Caught up with leader at offset {}", current_offset,);
323 self.leader_offset.store(current_offset, Ordering::Relaxed);
324 if let Some(ref m) = self.metrics {
325 let last_replayed = self.last_replayed_offset.load(Ordering::Relaxed);
326 let lag = current_offset.saturating_sub(last_replayed);
327 m.replication_lag_seconds.set(lag as i64);
328 }
329 }
330 LeaderMessage::WalEntry { offset, data } => {
331 self.handle_wal_entry(offset, data, &mut writer).await?;
332 }
333 LeaderMessage::SnapshotStart { parquet_files } => {
334 self.handle_snapshot(&parquet_files, &mut reader, &mut writer)
335 .await?;
336 }
337 LeaderMessage::SnapshotChunk { .. } | LeaderMessage::SnapshotEnd { .. } => {
339 tracing::warn!(
340 "Received unexpected snapshot message outside of snapshot transfer"
341 );
342 }
343 }
344 }
345 }
346
347 async fn handle_snapshot(
353 &self,
354 expected_files: &[String],
355 reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
356 writer: &mut tokio::net::tcp::OwnedWriteHalf,
357 ) -> anyhow::Result<()> {
358 tracing::info!(
359 "Receiving Parquet snapshot ({} files: {:?})",
360 expected_files.len(),
361 expected_files,
362 );
363
364 tokio::fs::create_dir_all(&self.snapshot_dir).await?;
366
367 let mut file_buffers: HashMap<String, Vec<u8>> = HashMap::new();
369 for filename in expected_files {
370 file_buffers.insert(filename.clone(), Vec::new());
371 }
372
373 let mut line = String::new();
375 let wal_offset_after_snapshot;
376
377 loop {
378 line.clear();
379 let bytes_read = reader
380 .read_line(&mut line)
381 .await
382 .context("reading snapshot message from leader")?;
383 if bytes_read == 0 {
384 anyhow::bail!("Leader closed connection during snapshot transfer");
385 }
386
387 let trimmed = line.trim();
388 if trimmed.is_empty() {
389 continue;
390 }
391
392 let msg: LeaderMessage =
393 serde_json::from_str(trimmed).context("parsing snapshot LeaderMessage JSON")?;
394
395 match msg {
396 LeaderMessage::SnapshotChunk {
397 filename,
398 data,
399 chunk_offset: _,
400 is_last,
401 } => {
402 use base64::Engine;
403 let decoded = base64::engine::general_purpose::STANDARD.decode(&data)?;
404
405 let buffer = file_buffers.entry(filename.clone()).or_default();
406 buffer.extend_from_slice(&decoded);
407
408 if is_last {
409 let file_path = self.snapshot_dir.join(&filename);
411 tokio::fs::write(&file_path, &buffer).await?;
412 tracing::info!(
413 "Received Parquet file {} ({} bytes)",
414 filename,
415 buffer.len(),
416 );
417 }
418 }
419 LeaderMessage::SnapshotEnd {
420 wal_offset_after_snapshot: offset,
421 } => {
422 wal_offset_after_snapshot = offset;
423 tracing::info!(
424 "Snapshot transfer complete, WAL resume offset={}",
425 wal_offset_after_snapshot,
426 );
427 break;
428 }
429 LeaderMessage::WalEntry { .. } | LeaderMessage::CaughtUp { .. } => {
430 tracing::warn!("Received unexpected WAL message during snapshot transfer");
431 }
432 LeaderMessage::SnapshotStart { .. } => {
433 tracing::warn!("Received unexpected SnapshotStart during snapshot transfer");
434 }
435 }
436 }
437
438 let snapshot_dir = self.snapshot_dir.clone();
440 let store = Arc::clone(&self.store);
441
442 let temp_storage = ParquetStorage::new(&snapshot_dir)?;
444 let events = temp_storage.load_all_events()?;
445
446 tracing::info!(
447 "Loading {} events from snapshot into EventStore",
448 events.len(),
449 );
450
451 let mut replayed = 0u64;
452 for event in events {
453 if let Err(e) = store.ingest_replicated(&event) {
454 tracing::error!("Failed to replay snapshot event: {}", e);
455 } else {
456 replayed += 1;
457 }
458 }
459
460 self.last_replayed_offset
462 .store(wal_offset_after_snapshot, Ordering::Relaxed);
463 self.total_replayed.fetch_add(replayed, Ordering::Relaxed);
464 self.snapshots_received.fetch_add(1, Ordering::Relaxed);
465
466 for filename in expected_files {
468 let file_path = self.snapshot_dir.join(filename);
469 if let Err(e) = tokio::fs::remove_file(&file_path).await {
470 tracing::debug!("Failed to clean up snapshot file {}: {}", filename, e);
471 }
472 }
473
474 tracing::info!(
475 "Snapshot catch-up complete: {} events loaded, resuming WAL from offset {}",
476 replayed,
477 wal_offset_after_snapshot,
478 );
479
480 self.send_ack(wal_offset_after_snapshot, writer).await?;
482
483 Ok(())
484 }
485
486 async fn handle_wal_entry(
488 &self,
489 offset: u64,
490 entry: WALEntry,
491 writer: &mut tokio::net::tcp::OwnedWriteHalf,
492 ) -> anyhow::Result<()> {
493 self.leader_offset.store(offset, Ordering::Relaxed);
495
496 if let Some(ref m) = self.metrics {
498 m.replication_wal_received_total.inc();
499 }
500
501 if !entry.verify() {
503 tracing::error!(
504 "CRC32 validation failed for WAL entry at offset {} — skipping",
505 offset,
506 );
507 self.corrupted_skipped.fetch_add(1, Ordering::Relaxed);
508 return Ok(());
509 }
510
511 let current = self.last_replayed_offset.load(Ordering::Relaxed);
513 if offset <= current {
514 tracing::debug!("Skipping already-replayed offset {}", offset);
515 self.send_ack(offset, writer).await?;
517 return Ok(());
518 }
519
520 let event = entry.event.clone();
522 if let Err(e) = self.local_wal.append(event.clone()) {
523 tracing::error!("Failed to write to local WAL at offset {}: {}", offset, e);
524 }
527
528 if let Err(e) = self.store.ingest_replicated(&event) {
530 tracing::error!(
531 "Failed to replay event at offset {} into store: {}",
532 offset,
533 e
534 );
535 return Ok(());
537 }
538
539 self.last_replayed_offset.store(offset, Ordering::Relaxed);
541 self.total_replayed.fetch_add(1, Ordering::Relaxed);
542
543 if let Some(ref m) = self.metrics {
545 m.replication_wal_replayed_total.inc();
546 let lag = self
547 .leader_offset
548 .load(Ordering::Relaxed)
549 .saturating_sub(offset);
550 m.replication_lag_seconds.set(lag as i64);
551 }
552
553 self.send_ack(offset, writer).await?;
555
556 tracing::trace!("Replayed WAL entry at offset {}", offset);
557
558 Ok(())
559 }
560
561 async fn send_ack(
563 &self,
564 offset: u64,
565 writer: &mut tokio::net::tcp::OwnedWriteHalf,
566 ) -> anyhow::Result<()> {
567 let ack = FollowerMessage::Ack { offset };
568 let mut json = serde_json::to_string(&ack)?;
569 json.push('\n');
570 writer
571 .write_all(json.as_bytes())
572 .await
573 .context("sending ACK to leader")?;
574 writer.flush().await.context("flushing ACK to leader")?;
575 Ok(())
576 }
577}
578
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes a status into a JSON value for field-by-field assertions.
    fn to_json(status: &FollowerReplicationStatus) -> serde_json::Value {
        serde_json::to_value(status).unwrap()
    }

    /// Opens a receiver following `localhost:3910` with its WAL at `wal_dir`.
    fn open_receiver(wal_dir: std::path::PathBuf) -> anyhow::Result<WalReceiver> {
        WalReceiver::new(
            "localhost:3910".to_string(),
            wal_dir,
            Arc::new(EventStore::new()),
        )
    }

    #[test]
    fn test_follower_replication_status_serialization() {
        let json = to_json(&FollowerReplicationStatus {
            connected: true,
            leader: "core-leader:3910".to_string(),
            replication_lag_ms: 42,
            last_replayed_offset: 100,
            leader_offset: 142,
            total_replayed: 100,
            corrupted_skipped: 0,
            reconnect_count: 1,
            snapshots_received: 0,
        });

        assert_eq!(json["connected"], true);
        assert_eq!(json["leader"], "core-leader:3910");

        let numeric_fields = [
            ("replication_lag_ms", 42),
            ("last_replayed_offset", 100),
            ("leader_offset", 142),
            ("total_replayed", 100),
            ("corrupted_skipped", 0),
            ("reconnect_count", 1),
            ("snapshots_received", 0),
        ];
        for (field, expected) in numeric_fields {
            assert_eq!(json[field], expected, "unexpected value for `{field}`");
        }
    }

    #[test]
    fn test_follower_replication_status_defaults() {
        let json = to_json(&FollowerReplicationStatus {
            connected: false,
            leader: "localhost:3910".to_string(),
            replication_lag_ms: 0,
            last_replayed_offset: 0,
            leader_offset: 0,
            total_replayed: 0,
            corrupted_skipped: 0,
            reconnect_count: 0,
            snapshots_received: 0,
        });

        assert_eq!(json["connected"], false);
        assert_eq!(json["replication_lag_ms"], 0);
        assert_eq!(json["snapshots_received"], 0);
    }

    #[test]
    fn test_wal_receiver_creation() {
        let temp_dir = tempfile::TempDir::new().unwrap();

        let receiver = open_receiver(temp_dir.path().join("follower-wal"));
        assert!(receiver.is_ok());

        let status = receiver.unwrap().status();
        assert!(!status.connected);
        assert_eq!(status.leader, "localhost:3910");
        assert_eq!(status.last_replayed_offset, 0);
        assert_eq!(status.total_replayed, 0);
        assert_eq!(status.snapshots_received, 0);
    }

    #[test]
    fn test_wal_receiver_recovers_offset_from_local_wal() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let wal_dir = temp_dir.path().join("follower-wal");

        // Seed the WAL with two entries and close it before the receiver reopens it.
        {
            let wal = WriteAheadLog::new(&wal_dir, WALConfig::default()).unwrap();
            for _ in 0..2 {
                let event = crate::test_utils::test_event("test-entity", "test.replicated");
                wal.append(event).unwrap();
            }
        }

        let status = open_receiver(wal_dir).unwrap().status();

        // NOTE(review): despite the test name, the expected resume offset is 0 after
        // reopening a WAL that holds two entries; confirm whether
        // `WriteAheadLog::current_sequence()` is meant to be restored on reopen.
        assert_eq!(status.last_replayed_offset, 0);
    }

    #[test]
    fn test_snapshot_dir_created_correctly() {
        let temp_dir = tempfile::TempDir::new().unwrap();

        let receiver = open_receiver(temp_dir.path().join("follower-wal")).unwrap();

        assert_eq!(
            receiver.snapshot_dir,
            temp_dir.path().join("follower-snapshots"),
        );
    }
}