replication_engine/coordinator/mod.rs

mod types;
mod hot_path;
mod cold_path;

pub use types::{EngineState, HealthCheck, PeerHealth};

use crate::circuit_breaker::SyncEngineCircuit;
use crate::config::ReplicationEngineConfig;
use crate::cursor::CursorStore;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::peer::PeerManager;
use crate::resilience::{RetryConfig, RateLimiter};
use crate::sync_engine::{SyncEngineRef, NoOpSyncEngine};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::{watch, RwLock};
use tracing::{debug, error, info, warn};

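/// Coordinates replication between the local node and its configured peers.
///
/// The engine owns the peer connections, the cursor store, and the background
/// tasks for the hot path (per-peer tailers), the optional cold path repair,
/// periodic cursor flushing, and optional peer health checks. Its lifecycle is
/// driven by `start()` and `shutdown()`, and can be observed through
/// `state_receiver()` and `health_check()`.
///
/// A minimal usage sketch (illustrative only; assumes a tokio runtime and a
/// `ReplicationEngineConfig` built elsewhere):
///
/// ```ignore
/// let (_cfg_tx, cfg_rx) = tokio::sync::watch::channel(config.clone());
/// let mut engine = ReplicationEngine::new(config, cfg_rx);
/// engine.start().await?;
/// // ... serve traffic ...
/// engine.shutdown().await;
/// ```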
pub struct ReplicationEngine<S: SyncEngineRef = NoOpSyncEngine> {
    /// Static configuration captured at construction time.
    config: ReplicationEngineConfig,

    /// Receiver for configuration updates (currently unused).
    #[allow(dead_code)]
    config_rx: watch::Receiver<ReplicationEngineConfig>,

    /// Sender half of the lifecycle state channel.
    state_tx: watch::Sender<EngineState>,

    /// Receiver half of the lifecycle state channel.
    state_rx: watch::Receiver<EngineState>,

    /// Sync engine handle shared with the hot and cold path tasks.
    sync_engine: Arc<S>,

    /// Circuit breaker guarding calls into the sync engine.
    circuit: Arc<SyncEngineCircuit>,

    /// Connection manager for the configured peers.
    peer_manager: Arc<PeerManager>,

    /// Cursor store, initialized lazily in `start`.
    cursor_store: Arc<RwLock<Option<CursorStore>>>,

    /// Sender used to signal background tasks to shut down.
    shutdown_tx: watch::Sender<bool>,

    /// Receiver cloned into each background task.
    shutdown_rx: watch::Receiver<bool>,

    /// Number of peers currently catching up, reported by health checks.
    catching_up_count: Arc<AtomicUsize>,

    /// Join handles for all spawned background tasks.
    hot_path_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}

impl ReplicationEngine<NoOpSyncEngine> {
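    /// Creates an engine backed by `NoOpSyncEngine`; equivalent to calling
    /// `with_sync_engine` with `Arc::new(NoOpSyncEngine)`.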
    pub fn new(
        config: ReplicationEngineConfig,
        config_rx: watch::Receiver<ReplicationEngineConfig>,
    ) -> Self {
        Self::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine))
    }
}

impl<S: SyncEngineRef> ReplicationEngine<S> {
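    /// Builds an engine around the provided sync engine handle.
    ///
    /// Every peer listed in `config.peers` is registered with the peer
    /// manager, but no connections are opened until `start` is called.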
    pub fn with_sync_engine(
        config: ReplicationEngineConfig,
        config_rx: watch::Receiver<ReplicationEngineConfig>,
        sync_engine: Arc<S>,
    ) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let retry_config = RetryConfig::daemon();
        let peer_manager = Arc::new(PeerManager::new(retry_config));

        for peer_config in &config.peers {
            peer_manager.add_peer(peer_config.clone());
        }

        Self {
            config,
            config_rx,
            state_tx,
            state_rx,
            sync_engine,
            circuit: Arc::new(SyncEngineCircuit::new()),
            peer_manager,
            cursor_store: Arc::new(RwLock::new(None)),
            shutdown_tx,
            shutdown_rx,
            catching_up_count: Arc::new(AtomicUsize::new(0)),
            hot_path_handles: RwLock::new(Vec::new()),
        }
    }

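    /// Returns the sync engine handle shared with the background tasks.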
    pub fn sync_engine(&self) -> &Arc<S> {
        &self.sync_engine
    }

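    /// Returns the circuit breaker guarding calls into the sync engine.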
    pub fn circuit(&self) -> &Arc<SyncEngineCircuit> {
        &self.circuit
    }

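    /// Returns the current lifecycle state of the engine.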
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

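    /// Returns a watch receiver that observes lifecycle state changes.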
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

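    /// Returns `true` while the engine is in the `Running` state.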
    pub fn is_running(&self) -> bool {
        matches!(self.state(), EngineState::Running)
    }

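    /// Builds a point-in-time health snapshot.
    ///
    /// The snapshot covers the engine state, whether the sync engine is
    /// accepting writes, and per-peer connectivity and circuit-breaker status.
    /// `ready` requires a `Running` engine with at least one connected peer;
    /// `healthy` additionally requires the sync engine to accept writes.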
    pub async fn health_check(&self) -> HealthCheck {
        use crate::peer::PeerCircuitState;

        let state = self.state();
        let sync_engine_accepting_writes = self.sync_engine.should_accept_writes();

        let all_peers = self.peer_manager.all();
        let peers_total = all_peers.len();
        let mut peers = Vec::with_capacity(peers_total);
        let mut peers_connected = 0;
        let mut peers_circuit_open = 0;
        let peers_catching_up = self.catching_up_count.load(std::sync::atomic::Ordering::Relaxed);

        for peer in all_peers {
            let connected = peer.is_connected().await;
            let circuit_state = peer.circuit_state().await;
            let circuit_open = circuit_state == PeerCircuitState::Open;

            if connected {
                peers_connected += 1;
            }
            if circuit_open {
                peers_circuit_open += 1;
            }

            peers.push(PeerHealth {
                node_id: peer.node_id().to_string(),
                connected,
                circuit_state,
                circuit_open,
                failure_count: peer.failure_count(),
                millis_since_success: peer.millis_since_success(),
                lag_ms: None,
                catching_up: false,
            });
        }

        let ready = state == EngineState::Running && peers_connected > 0;
        let healthy = ready && sync_engine_accepting_writes;

        HealthCheck {
            state,
            ready,
            sync_engine_accepting_writes,
            peers_total,
            peers_connected,
            peers_circuit_open,
            peers_catching_up,
            peers,
            healthy,
        }
    }

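    /// Starts the engine, transitioning it from `Created` to `Running`.
    ///
    /// This opens the cursor store, connects to all configured peers, and
    /// spawns the background tasks: hot path tailers, the cold path repair
    /// task (if enabled), cursor flushing, and peer health checks (if
    /// enabled). Returns `InvalidState` if the engine is not in the `Created`
    /// state, and an error if peers are configured but none could be reached.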
    pub async fn start(&mut self) -> Result<()> {
        if self.state() != EngineState::Created {
            return Err(ReplicationError::InvalidState {
                expected: "Created".to_string(),
                actual: format!("{:?}", self.state()),
            });
        }

        info!(
            node_id = %self.config.local_node_id,
            peer_count = self.config.peers.len(),
            "Starting replication engine"
        );

        let _ = self.state_tx.send(EngineState::Connecting);
        metrics::set_engine_state("Connecting");

        let cursor_store = CursorStore::new(&self.config.cursor.sqlite_path).await?;
        *self.cursor_store.write().await = Some(cursor_store);
        info!(path = %self.config.cursor.sqlite_path, "Cursor store initialized");

        let results = self.peer_manager.connect_all().await;
        let connected = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.iter().filter(|r| r.is_err()).count();

        metrics::set_connected_peers(connected);

        if failed > 0 {
            warn!(connected, failed, "Some peer connections failed");
        }

        if connected == 0 && !self.config.peers.is_empty() {
            error!("Failed to connect to any peers");
            let _ = self.state_tx.send(EngineState::Failed);
            metrics::set_engine_state("Failed");
            return Err(ReplicationError::Internal(
                "No peers connected".to_string(),
            ));
        }

        self.spawn_hot_path_tasks().await;

        if self.config.settings.cold_path.enabled {
            self.spawn_cold_path_task().await;
        }

        self.spawn_cursor_flush_task().await;

        if self.config.settings.peer_health.enabled {
            self.spawn_peer_health_task().await;
        }

        let _ = self.state_tx.send(EngineState::Running);
        metrics::set_engine_state("Running");
        info!(
            connected,
            total = self.config.peers.len(),
            "Replication engine running"
        );

        Ok(())
    }

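    /// Spawns one hot path tailer task per connected peer, sharing the cursor
    /// store, sync engine, circuit breaker, and optional rate limiter with
    /// each task.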
    async fn spawn_hot_path_tasks(&self) {
        let peers = self.peer_manager.all();
        let mut handles = self.hot_path_handles.write().await;

        let rate_limiter: Option<Arc<RateLimiter>> = self.config.settings.hot_path
            .rate_limit_config()
            .map(|cfg| {
                info!(
                    rate_per_sec = cfg.refill_rate,
                    burst = cfg.burst_size,
                    "Rate limiting enabled for hot path"
                );
                Arc::new(RateLimiter::new(cfg))
            });

        for peer in peers {
            if !peer.is_connected().await {
                continue;
            }

            let peer_id = peer.node_id().to_string();
            let peer = Arc::clone(&peer);
            let cursor_store = Arc::clone(&self.cursor_store);
            let sync_engine = Arc::clone(&self.sync_engine);
            let circuit = Arc::clone(&self.circuit);
            let shutdown_rx = self.shutdown_rx.clone();
            let config = self.config.settings.hot_path.clone();
            let catching_up_count = Arc::clone(&self.catching_up_count);
            let rate_limiter = rate_limiter.clone();

            let handle = tokio::spawn(async move {
                hot_path::run_tailer(
                    peer,
                    cursor_store,
                    sync_engine,
                    circuit,
                    config,
                    shutdown_rx,
                    catching_up_count,
                    rate_limiter,
                )
                .await;
            });

            info!(peer_id = %peer_id, "Spawned hot path tailer");
            handles.push(handle);
        }
    }

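    /// Spawns the cold path repair task.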
    async fn spawn_cold_path_task(&self)
    where
        S: Send + Sync + 'static,
    {
        let sync_engine = Arc::clone(&self.sync_engine);
        let peer_manager = Arc::clone(&self.peer_manager);
        let shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.cold_path.clone();
        let catching_up_count = Arc::clone(&self.catching_up_count);

        let handle = tokio::spawn(async move {
            cold_path::run_repair(
                sync_engine,
                peer_manager,
                config,
                shutdown_rx,
                catching_up_count,
            )
            .await;
        });

        info!("Spawned cold path repair task");
        self.hot_path_handles.write().await.push(handle);
    }

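    /// Spawns a task that flushes dirty cursors every five seconds until
    /// shutdown is signalled.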
    async fn spawn_cursor_flush_task(&self) {
        let cursor_store = Arc::clone(&self.cursor_store);
        let mut shutdown_rx = self.shutdown_rx.clone();

        let handle = tokio::spawn(async move {
            let _ = shutdown_rx.borrow_and_update();

            let flush_interval = std::time::Duration::from_secs(5);
            let mut timer = tokio::time::interval(flush_interval);
            timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    biased;

                    result = shutdown_rx.changed() => {
                        if result.is_err() || *shutdown_rx.borrow() {
                            debug!("Cursor flush task stopping");
                            break;
                        }
                        continue;
                    }

                    _ = timer.tick() => {
                        let store_guard = cursor_store.read().await;
                        if let Some(ref store) = *store_guard {
                            if let Err(e) = store.flush_dirty().await {
                                warn!(error = %e, "Failed to flush cursors");
                            }
                        }
                    }
                }
            }
        });

        debug!("Spawned cursor flush task");
        self.hot_path_handles.write().await.push(handle);
    }

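    /// Spawns a task that periodically pings peers that have been idle longer
    /// than the configured threshold and marks them disconnected when a ping
    /// fails.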
    async fn spawn_peer_health_task(&self) {
        let peer_manager = Arc::clone(&self.peer_manager);
        let mut shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.peer_health.clone();

        let handle = tokio::spawn(async move {
            let _ = shutdown_rx.borrow_and_update();

            let ping_interval = std::time::Duration::from_secs(config.ping_interval_sec);
            let idle_threshold_ms = config.idle_threshold_sec * 1000;
            let mut timer = tokio::time::interval(ping_interval);

            info!(
                ping_interval_sec = config.ping_interval_sec,
                idle_threshold_sec = config.idle_threshold_sec,
                "Starting peer health check task"
            );

            loop {
                tokio::select! {
                    biased;

                    result = shutdown_rx.changed() => {
                        if result.is_err() || *shutdown_rx.borrow() {
                            debug!("Peer health task stopping");
                            break;
                        }
                        continue;
                    }

                    _ = timer.tick() => {
                        for peer in peer_manager.all() {
                            if !peer.is_connected().await {
                                continue;
                            }

                            let idle_ms = peer.millis_since_success();
                            if idle_ms < idle_threshold_ms {
                                continue;
                            }

                            let peer_id = peer.node_id().to_string();
                            debug!(
                                peer_id = %peer_id,
                                idle_ms,
                                "Pinging idle peer"
                            );

                            match peer.ping().await {
                                Ok(latency) => {
                                    debug!(
                                        peer_id = %peer_id,
                                        latency_ms = latency.as_millis(),
                                        "Peer ping successful"
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        peer_id = %peer_id,
                                        error = %e,
                                        "Peer ping failed"
                                    );
                                    peer.mark_disconnected().await;
                                }
                            }
                        }
                    }
                }
            }

            info!("Peer health check task stopped");
        });

        info!("Spawned peer health check task");
        self.hot_path_handles.write().await.push(handle);
    }

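    /// Gracefully stops the engine.
    ///
    /// Signals shutdown to all background tasks, waits up to ten seconds for
    /// each task to finish, disconnects all peers, closes the cursor store,
    /// and moves the engine to the `Stopped` state.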
    pub async fn shutdown(&mut self) {
        info!("Shutting down replication engine");
        let _ = self.state_tx.send(EngineState::ShuttingDown);
        metrics::set_engine_state("ShuttingDown");

        let _ = self.shutdown_tx.send(true);

        let handles: Vec<_> = {
            let mut guard = self.hot_path_handles.write().await;
            std::mem::take(&mut *guard)
        };

        let task_count = handles.len();
        if task_count > 0 {
            info!(task_count, "Waiting for tasks to drain and complete");
        }

        let drain_timeout = std::time::Duration::from_secs(10);
        for (i, handle) in handles.into_iter().enumerate() {
            match tokio::time::timeout(drain_timeout, handle).await {
                Ok(Ok(())) => {
                    debug!(task = i + 1, "Task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(task = i + 1, error = %e, "Task panicked during shutdown");
                }
                Err(_) => {
                    warn!(task = i + 1, "Task timed out during shutdown (batch may be lost)");
                }
            }
        }

        self.peer_manager.shutdown_all();
        metrics::set_connected_peers(0);

        if let Some(cursor_store) = self.cursor_store.write().await.take() {
            cursor_store.close().await;
        }

        let _ = self.state_tx.send(EngineState::Stopped);
        metrics::set_engine_state("Stopped");
        info!("Replication engine stopped");
    }

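    /// Returns the peer manager.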
    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }

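    /// Returns the local node identifier from the configuration.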
    pub fn node_id(&self) -> &str {
        &self.config.local_node_id
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PeerConfig, CursorConfig};

    fn test_config() -> ReplicationEngineConfig {
        ReplicationEngineConfig {
            local_node_id: "test-node".to_string(),
            peers: vec![],
            settings: Default::default(),
            cursor: CursorConfig::in_memory(),
        }
    }

    #[test]
    fn test_engine_initial_state() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_running());
        assert_eq!(engine.node_id(), "test-node");
    }

    #[test]
    fn test_engine_state_receiver() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let state_rx = engine.state_receiver();
        assert_eq!(*state_rx.borrow(), EngineState::Created);
    }

    #[test]
    fn test_engine_with_sync_engine() {
        let (_tx, rx) = watch::channel(test_config());
        let sync_engine = Arc::new(NoOpSyncEngine);
        let engine = ReplicationEngine::with_sync_engine(
            test_config(),
            rx,
            sync_engine,
        );

        assert_eq!(engine.state(), EngineState::Created);
        let _ = engine.sync_engine();
    }

    #[test]
    fn test_engine_circuit_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let circuit = engine.circuit();
        let _ = circuit.clone();
    }

    #[test]
    fn test_engine_peer_manager_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let pm = engine.peer_manager();
        assert!(pm.all().is_empty());
    }

    #[test]
    fn test_engine_with_peers() {
        let mut config = test_config();
        config.peers.push(PeerConfig::for_testing("peer-1", "redis://localhost:6379"));
        config.peers.push(PeerConfig::for_testing("peer-2", "redis://localhost:6380"));

        let (_tx, rx) = watch::channel(config.clone());
        let engine = ReplicationEngine::new(config, rx);

        let pm = engine.peer_manager();
        assert_eq!(pm.all().len(), 2);
    }

    #[tokio::test]
    async fn test_engine_start_invalid_state() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        let _ = engine.state_tx.send(EngineState::Running);

        let result = engine.start().await;
        assert!(result.is_err());

        if let Err(ReplicationError::InvalidState { expected, actual }) = result {
            assert_eq!(expected, "Created");
            assert_eq!(actual, "Running");
        } else {
            panic!("Expected InvalidState error");
        }
    }

    #[tokio::test]
    async fn test_engine_shutdown_from_created() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        engine.shutdown().await;

        assert_eq!(engine.state(), EngineState::Stopped);
        assert!(!engine.is_running());
    }

    #[test]
    fn test_engine_state_is_running() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert!(!engine.is_running());

        let _ = engine.state_tx.send(EngineState::Running);
        assert!(engine.is_running());

        let _ = engine.state_tx.send(EngineState::Stopped);
        assert!(!engine.is_running());
    }
}