replication_engine/coordinator/mod.rs

mod types;
mod hot_path;
mod cold_path;

pub use types::{EngineState, HealthCheck, PeerHealth};

use crate::circuit_breaker::SyncEngineCircuit;
use crate::config::ReplicationConfig;
use crate::cursor::CursorStore;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::peer::PeerManager;
use crate::resilience::{RetryConfig, RateLimiter};
use crate::sync_engine::{SyncEngineRef, NoOpSyncEngine};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::{watch, RwLock};
use tracing::{debug, error, info, warn};

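/// Coordinates replication for a node: owns the peer set, cursor persistence,
/// and the background tasks (hot-path tailers, cold-path repair, cursor
/// flushing, peer health checks) that keep the local sync engine in step with
/// its peers.
///
/// A minimal usage sketch (assumes a populated `ReplicationConfig` and ignores
/// error handling):
///
/// ```ignore
/// let (_cfg_tx, cfg_rx) = tokio::sync::watch::channel(config.clone());
/// let mut engine = ReplicationEngine::new(config, cfg_rx);
/// engine.start().await?;
/// // ... engine is Running; poll health_check() as needed ...
/// engine.shutdown().await;
/// ```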
pub struct ReplicationEngine<S: SyncEngineRef = NoOpSyncEngine> {
    config: ReplicationConfig,

    /// Receiver for config updates; not yet consumed by the coordinator.
    #[allow(dead_code)]
    config_rx: watch::Receiver<ReplicationConfig>,

    state_tx: watch::Sender<EngineState>,

    state_rx: watch::Receiver<EngineState>,

    /// Local sync engine that replicated data is applied to.
    sync_engine: Arc<S>,

    /// Circuit breaker guarding calls into the sync engine.
    circuit: Arc<SyncEngineCircuit>,

    peer_manager: Arc<PeerManager>,

    /// Initialized during `start()`; `None` until then.
    cursor_store: Arc<RwLock<Option<CursorStore>>>,

    shutdown_tx: watch::Sender<bool>,

    shutdown_rx: watch::Receiver<bool>,

    /// Number of peers currently catching up, shared with the background tasks.
    catching_up_count: Arc<AtomicUsize>,

    /// Handles for all spawned background tasks (hot path, cold path, cursor
    /// flush, peer health), joined during shutdown.
    hot_path_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}

impl ReplicationEngine<NoOpSyncEngine> {
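    /// Creates an engine backed by [`NoOpSyncEngine`]; use
    /// [`ReplicationEngine::with_sync_engine`] to supply a real sync engine.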
    pub fn new(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
    ) -> Self {
        Self::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine))
    }
}

impl<S: SyncEngineRef> ReplicationEngine<S> {
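    /// Builds an engine around the given sync engine and registers every peer
    /// from `config.peers` with the peer manager. The engine starts in
    /// `EngineState::Created`; call [`Self::start`] to connect and spawn the
    /// background tasks.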
    pub fn with_sync_engine(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
        sync_engine: Arc<S>,
    ) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let retry_config = RetryConfig::daemon();
        let peer_manager = Arc::new(PeerManager::new(retry_config));

        for peer_config in &config.peers {
            peer_manager.add_peer(peer_config.clone());
        }

        Self {
            config,
            config_rx,
            state_tx,
            state_rx,
            sync_engine,
            circuit: Arc::new(SyncEngineCircuit::new()),
            peer_manager,
            cursor_store: Arc::new(RwLock::new(None)),
            shutdown_tx,
            shutdown_rx,
            catching_up_count: Arc::new(AtomicUsize::new(0)),
            hot_path_handles: RwLock::new(Vec::new()),
        }
    }

    pub fn sync_engine(&self) -> &Arc<S> {
        &self.sync_engine
    }

    pub fn circuit(&self) -> &Arc<SyncEngineCircuit> {
        &self.circuit
    }

    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    pub fn is_running(&self) -> bool {
        matches!(self.state(), EngineState::Running)
    }

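    /// Builds a point-in-time health snapshot: engine state, whether the sync
    /// engine accepts writes, and per-peer connection/circuit status. `ready`
    /// requires the engine to be `Running` with at least one connected peer;
    /// `healthy` additionally requires the sync engine to accept writes.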
    pub async fn health_check(&self) -> HealthCheck {
        use crate::peer::PeerCircuitState;

        let state = self.state();
        let sync_engine_accepting_writes = self.sync_engine.should_accept_writes();

        let all_peers = self.peer_manager.all();
        let peers_total = all_peers.len();
        let mut peers = Vec::with_capacity(peers_total);
        let mut peers_connected = 0;
        let mut peers_circuit_open = 0;
        let peers_catching_up = self.catching_up_count.load(std::sync::atomic::Ordering::Relaxed);

        for peer in all_peers {
            let connected = peer.is_connected().await;
            let circuit_state = peer.circuit_state().await;
            let circuit_open = circuit_state == PeerCircuitState::Open;

            if connected {
                peers_connected += 1;
            }
            if circuit_open {
                peers_circuit_open += 1;
            }

            peers.push(PeerHealth {
                node_id: peer.node_id().to_string(),
                connected,
                circuit_state,
                circuit_open,
                failure_count: peer.failure_count(),
                millis_since_success: peer.millis_since_success(),
                lag_ms: None,
                catching_up: false,
            });
        }

        let ready = state == EngineState::Running && peers_connected > 0;
        let healthy = ready && sync_engine_accepting_writes;

        HealthCheck {
            state,
            ready,
            sync_engine_accepting_writes,
            peers_total,
            peers_connected,
            peers_circuit_open,
            peers_catching_up,
            peers,
            healthy,
        }
    }

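    /// Moves the engine from `Created` through `Connecting` to `Running`:
    /// opens the cursor store, connects to all configured peers, and spawns
    /// the background tasks. Returns an error if the engine is not in
    /// `Created`, if the cursor store cannot be opened, or if peers are
    /// configured but none could be connected.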
    pub async fn start(&mut self) -> Result<()> {
        if self.state() != EngineState::Created {
            return Err(ReplicationError::InvalidState {
                expected: "Created".to_string(),
                actual: format!("{:?}", self.state()),
            });
        }

        info!(
            node_id = %self.config.local_node_id,
            peer_count = self.config.peers.len(),
            "Starting replication engine"
        );

        let _ = self.state_tx.send(EngineState::Connecting);
        metrics::set_engine_state("Connecting");

        let cursor_store = CursorStore::new(&self.config.cursor.sqlite_path).await?;
        *self.cursor_store.write().await = Some(cursor_store);
        info!(path = %self.config.cursor.sqlite_path, "Cursor store initialized");

        let results = self.peer_manager.connect_all().await;
        let connected = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.iter().filter(|r| r.is_err()).count();

        metrics::set_connected_peers(connected);

        if failed > 0 {
            warn!(connected, failed, "Some peer connections failed");
        }

        if connected == 0 && !self.config.peers.is_empty() {
            error!("Failed to connect to any peers");
            let _ = self.state_tx.send(EngineState::Failed);
            metrics::set_engine_state("Failed");
            return Err(ReplicationError::Internal(
                "No peers connected".to_string(),
            ));
        }

        self.spawn_hot_path_tasks().await;

        if self.config.settings.cold_path.enabled {
            self.spawn_cold_path_task().await;
        }

        self.spawn_cursor_flush_task().await;

        if self.config.settings.peer_health.enabled {
            self.spawn_peer_health_task().await;
        }

        let _ = self.state_tx.send(EngineState::Running);
        metrics::set_engine_state("Running");
        info!(
            connected,
            total = self.config.peers.len(),
            "Replication engine running"
        );

        Ok(())
    }

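    /// Spawns one hot-path tailer task per currently connected peer. A single
    /// optional rate limiter, built from the hot-path settings, is shared by
    /// all tailers.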
    async fn spawn_hot_path_tasks(&self) {
        let peers = self.peer_manager.all();
        let mut handles = self.hot_path_handles.write().await;

        let rate_limiter: Option<Arc<RateLimiter>> = self.config.settings.hot_path
            .rate_limit_config()
            .map(|cfg| {
                info!(
                    rate_per_sec = cfg.refill_rate,
                    burst = cfg.burst_size,
                    "Rate limiting enabled for hot path"
                );
                Arc::new(RateLimiter::new(cfg))
            });

        for peer in peers {
            if !peer.is_connected().await {
                continue;
            }

            let peer_id = peer.node_id().to_string();
            let peer = Arc::clone(&peer);
            let cursor_store = Arc::clone(&self.cursor_store);
            let sync_engine = Arc::clone(&self.sync_engine);
            let circuit = Arc::clone(&self.circuit);
            let shutdown_rx = self.shutdown_rx.clone();
            let config = self.config.settings.hot_path.clone();
            let catching_up_count = Arc::clone(&self.catching_up_count);
            let rate_limiter = rate_limiter.clone();

            let handle = tokio::spawn(async move {
                hot_path::run_tailer(
                    peer,
                    cursor_store,
                    sync_engine,
                    circuit,
                    config,
                    shutdown_rx,
                    catching_up_count,
                    rate_limiter,
                )
                .await;
            });

            info!(peer_id = %peer_id, "Spawned hot path tailer");
            handles.push(handle);
        }
    }

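    /// Spawns the cold-path repair task (`cold_path::run_repair`); its handle
    /// is tracked alongside the other background task handles.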
    async fn spawn_cold_path_task(&self)
    where
        S: Send + Sync + 'static,
    {
        let sync_engine = Arc::clone(&self.sync_engine);
        let peer_manager = Arc::clone(&self.peer_manager);
        let shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.cold_path.clone();
        let catching_up_count = Arc::clone(&self.catching_up_count);

        let handle = tokio::spawn(async move {
            cold_path::run_repair(sync_engine, peer_manager, config, shutdown_rx, catching_up_count)
                .await;
        });

        info!("Spawned cold path repair task");
        self.hot_path_handles.write().await.push(handle);
    }

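    /// Spawns a task that flushes dirty cursors every five seconds and stops
    /// when shutdown is signalled.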
    async fn spawn_cursor_flush_task(&self) {
        let cursor_store = Arc::clone(&self.cursor_store);
        let mut shutdown_rx = self.shutdown_rx.clone();

        let handle = tokio::spawn(async move {
            let flush_interval = std::time::Duration::from_secs(5);
            let mut timer = tokio::time::interval(flush_interval);

            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        let store_guard = cursor_store.read().await;
                        if let Some(ref store) = *store_guard {
                            if let Err(e) = store.flush_dirty().await {
                                warn!(error = %e, "Failed to flush cursors");
                            }
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            debug!("Cursor flush task stopping");
                            break;
                        }
                    }
                }
            }
        });

        debug!("Spawned cursor flush task");
        self.hot_path_handles.write().await.push(handle);
    }

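    /// Spawns a task that periodically pings peers that have been idle past
    /// the configured threshold and marks a peer disconnected when its ping
    /// fails.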
    async fn spawn_peer_health_task(&self) {
        let peer_manager = Arc::clone(&self.peer_manager);
        let mut shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.peer_health.clone();

        let handle = tokio::spawn(async move {
            let ping_interval = std::time::Duration::from_secs(config.ping_interval_sec);
            let idle_threshold_ms = config.idle_threshold_sec * 1000;
            let mut timer = tokio::time::interval(ping_interval);

            info!(
                ping_interval_sec = config.ping_interval_sec,
                idle_threshold_sec = config.idle_threshold_sec,
                "Starting peer health check task"
            );

            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        for peer in peer_manager.all() {
                            if !peer.is_connected().await {
                                continue;
                            }

                            let idle_ms = peer.millis_since_success();
                            if idle_ms < idle_threshold_ms {
                                continue;
                            }

                            let peer_id = peer.node_id().to_string();
                            debug!(
                                peer_id = %peer_id,
                                idle_ms,
                                "Pinging idle peer"
                            );

                            match peer.ping().await {
                                Ok(latency) => {
                                    debug!(
                                        peer_id = %peer_id,
                                        latency_ms = latency.as_millis(),
                                        "Peer ping successful"
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        peer_id = %peer_id,
                                        error = %e,
                                        "Peer ping failed"
                                    );
                                    peer.mark_disconnected().await;
                                }
                            }
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            debug!("Peer health task stopping");
                            break;
                        }
                    }
                }
            }

            info!("Peer health check task stopped");
        });

        info!("Spawned peer health check task");
        self.hot_path_handles.write().await.push(handle);
    }

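    /// Gracefully shuts down: signals all background tasks, waits up to ten
    /// seconds for each to finish, disconnects peers, closes the cursor store,
    /// and moves the engine to `Stopped`.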
    pub async fn shutdown(&mut self) {
        info!("Shutting down replication engine");
        let _ = self.state_tx.send(EngineState::ShuttingDown);
        metrics::set_engine_state("ShuttingDown");

        let _ = self.shutdown_tx.send(true);

        let handles: Vec<_> = {
            let mut guard = self.hot_path_handles.write().await;
            std::mem::take(&mut *guard)
        };

        let task_count = handles.len();
        if task_count > 0 {
            info!(task_count, "Waiting for tasks to drain and complete");
        }

        let drain_timeout = std::time::Duration::from_secs(10);
        for (i, handle) in handles.into_iter().enumerate() {
            match tokio::time::timeout(drain_timeout, handle).await {
                Ok(Ok(())) => {
                    debug!(task = i + 1, "Task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(task = i + 1, error = %e, "Task panicked during shutdown");
                }
                Err(_) => {
                    warn!(task = i + 1, "Task timed out during shutdown (batch may be lost)");
                }
            }
        }

        self.peer_manager.shutdown_all();
        metrics::set_connected_peers(0);

        if let Some(cursor_store) = self.cursor_store.write().await.take() {
            cursor_store.close().await;
        }

        let _ = self.state_tx.send(EngineState::Stopped);
        metrics::set_engine_state("Stopped");
        info!("Replication engine stopped");
    }

    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }

    pub fn node_id(&self) -> &str {
        &self.config.local_node_id
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PeerConfig, CursorConfig};

    fn test_config() -> ReplicationConfig {
        ReplicationConfig {
            local_node_id: "test-node".to_string(),
            peers: vec![],
            settings: Default::default(),
            cursor: CursorConfig::in_memory(),
        }
    }

    #[test]
    fn test_engine_initial_state() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_running());
        assert_eq!(engine.node_id(), "test-node");
    }

    #[test]
    fn test_engine_state_receiver() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let state_rx = engine.state_receiver();
        assert_eq!(*state_rx.borrow(), EngineState::Created);
    }

    #[test]
    fn test_engine_with_sync_engine() {
        let (_tx, rx) = watch::channel(test_config());
        let sync_engine = Arc::new(NoOpSyncEngine);
        let engine = ReplicationEngine::with_sync_engine(
            test_config(),
            rx,
            sync_engine,
        );

        assert_eq!(engine.state(), EngineState::Created);
        let _ = engine.sync_engine();
    }

    #[test]
    fn test_engine_circuit_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let circuit = engine.circuit();
        let _ = circuit.clone();
    }

    #[test]
    fn test_engine_peer_manager_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let pm = engine.peer_manager();
        assert!(pm.all().is_empty());
    }

    #[test]
    fn test_engine_with_peers() {
        let mut config = test_config();
        config.peers.push(PeerConfig::for_testing("peer-1", "redis://localhost:6379"));
        config.peers.push(PeerConfig::for_testing("peer-2", "redis://localhost:6380"));

        let (_tx, rx) = watch::channel(config.clone());
        let engine = ReplicationEngine::new(config, rx);

        let pm = engine.peer_manager();
        assert_eq!(pm.all().len(), 2);
    }

    #[tokio::test]
    async fn test_engine_start_invalid_state() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        let _ = engine.state_tx.send(EngineState::Running);

        let result = engine.start().await;
        assert!(result.is_err());

        if let Err(ReplicationError::InvalidState { expected, actual }) = result {
            assert_eq!(expected, "Created");
            assert_eq!(actual, "Running");
        } else {
            panic!("Expected InvalidState error");
        }
    }

    #[tokio::test]
    async fn test_engine_shutdown_from_created() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        engine.shutdown().await;

        assert_eq!(engine.state(), EngineState::Stopped);
        assert!(!engine.is_running());
    }

    #[test]
    fn test_engine_state_is_running() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert!(!engine.is_running());

        let _ = engine.state_tx.send(EngineState::Running);
        assert!(engine.is_running());

        let _ = engine.state_tx.send(EngineState::Stopped);
        assert!(!engine.is_running());
    }
}