replication_engine/coordinator/mod.rs

mod types;
mod hot_path;
mod cold_path;

pub use types::{EngineState, HealthCheck, PeerHealth};

use crate::circuit_breaker::SyncEngineCircuit;
use crate::config::ReplicationConfig;
use crate::cursor::CursorStore;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::peer::PeerManager;
use crate::resilience::{RetryConfig, RateLimiter};
use crate::sync_engine::{SyncEngineRef, NoOpSyncEngine};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::{watch, RwLock};
use tracing::{debug, error, info, warn};

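/// Coordinates replication between the local sync engine and the configured
/// peers.
///
/// Owns the peer connections, the cursor store, and the background tasks
/// (hot-path tailers, cold-path repair, cursor flushing, peer health checks)
/// that are spawned by `start` and drained by `shutdown`.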
pub struct ReplicationEngine<S: SyncEngineRef = NoOpSyncEngine> {
    config: ReplicationConfig,

    #[allow(dead_code)]
    config_rx: watch::Receiver<ReplicationConfig>,

    /// Broadcasts engine state transitions; observers clone `state_rx` via
    /// `state_receiver`.
    state_tx: watch::Sender<EngineState>,

    state_rx: watch::Receiver<EngineState>,

    /// Local sync engine handle, shared with the hot- and cold-path tasks.
    sync_engine: Arc<S>,

    /// Circuit breaker for the sync engine, shared with the hot-path tailers.
    circuit: Arc<SyncEngineCircuit>,

    peer_manager: Arc<PeerManager>,

    /// Initialized lazily in `start`; persists per-peer replication cursors.
    cursor_store: Arc<RwLock<Option<CursorStore>>>,

    /// Flipped to `true` once to tell all background tasks to stop.
    shutdown_tx: watch::Sender<bool>,

    shutdown_rx: watch::Receiver<bool>,

    /// Number of peers currently catching up, shared with background tasks.
    catching_up_count: Arc<AtomicUsize>,

    /// Join handles for every spawned background task (not just hot-path
    /// tailers); drained and awaited during shutdown.
    hot_path_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}

impl ReplicationEngine<NoOpSyncEngine> {
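    /// Creates an engine backed by the default `NoOpSyncEngine`. Use
    /// `with_sync_engine` to supply a real sync engine implementation.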
    pub fn new(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
    ) -> Self {
        Self::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine))
    }
}

impl<S: SyncEngineRef> ReplicationEngine<S> {
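    /// Creates an engine that uses the given sync engine. Peers from
    /// `config.peers` are registered immediately, but nothing connects or
    /// runs until `start` is called.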
    pub fn with_sync_engine(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
        sync_engine: Arc<S>,
    ) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let retry_config = RetryConfig::daemon();
        let peer_manager = Arc::new(PeerManager::new(retry_config));

        for peer_config in &config.peers {
            peer_manager.add_peer(peer_config.clone());
        }

        Self {
            config,
            config_rx,
            state_tx,
            state_rx,
            sync_engine,
            circuit: Arc::new(SyncEngineCircuit::new()),
            peer_manager,
            cursor_store: Arc::new(RwLock::new(None)),
            shutdown_tx,
            shutdown_rx,
            catching_up_count: Arc::new(AtomicUsize::new(0)),
            hot_path_handles: RwLock::new(Vec::new()),
        }
    }

    /// Returns the sync engine handle.
    pub fn sync_engine(&self) -> &Arc<S> {
        &self.sync_engine
    }

    /// Returns the sync-engine circuit breaker.
    pub fn circuit(&self) -> &Arc<SyncEngineCircuit> {
        &self.circuit
    }

    /// Returns the current engine state.
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Returns a receiver that observes engine state transitions.
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Returns `true` while the engine is in the `Running` state.
    pub fn is_running(&self) -> bool {
        matches!(self.state(), EngineState::Running)
    }

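    /// Builds a point-in-time health snapshot: engine state, whether the sync
    /// engine is accepting writes, per-peer connection and circuit-breaker
    /// status, and aggregate counters. The engine is `ready` when it is
    /// `Running` with at least one connected peer, and `healthy` when it is
    /// ready and the sync engine is accepting writes.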
    pub async fn health_check(&self) -> HealthCheck {
        use crate::peer::PeerCircuitState;

        let state = self.state();
        let sync_engine_accepting_writes = self.sync_engine.should_accept_writes();

        let all_peers = self.peer_manager.all();
        let peers_total = all_peers.len();
        let mut peers = Vec::with_capacity(peers_total);
        let mut peers_connected = 0;
        let mut peers_circuit_open = 0;
        let peers_catching_up = self.catching_up_count.load(std::sync::atomic::Ordering::Relaxed);

        for peer in all_peers {
            let connected = peer.is_connected().await;
            let circuit_state = peer.circuit_state().await;
            let circuit_open = circuit_state == PeerCircuitState::Open;

            if connected {
                peers_connected += 1;
            }
            if circuit_open {
                peers_circuit_open += 1;
            }

            peers.push(PeerHealth {
                node_id: peer.node_id().to_string(),
                connected,
                circuit_state,
                circuit_open,
                failure_count: peer.failure_count(),
                millis_since_success: peer.millis_since_success(),
                lag_ms: None,
                catching_up: false,
            });
        }

        let ready = state == EngineState::Running && peers_connected > 0;
        let healthy = ready && sync_engine_accepting_writes;

        HealthCheck {
            state,
            ready,
            sync_engine_accepting_writes,
            peers_total,
            peers_connected,
            peers_circuit_open,
            peers_catching_up,
            peers,
            healthy,
        }
    }

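    /// Transitions the engine from `Created` to `Running`: opens the cursor
    /// store, connects to all configured peers, and spawns the background
    /// tasks (hot-path tailers, optional cold-path repair, cursor flushing,
    /// optional peer health checks).
    ///
    /// Returns an error if the engine is not in the `Created` state, if the
    /// cursor store cannot be opened, or if peers are configured but none
    /// could be connected.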
    pub async fn start(&mut self) -> Result<()> {
        if self.state() != EngineState::Created {
            return Err(ReplicationError::InvalidState {
                expected: "Created".to_string(),
                actual: format!("{:?}", self.state()),
            });
        }

        info!(
            node_id = %self.config.local_node_id,
            peer_count = self.config.peers.len(),
            "Starting replication engine"
        );

        let _ = self.state_tx.send(EngineState::Connecting);
        metrics::set_engine_state("Connecting");

        let cursor_store = CursorStore::new(&self.config.cursor.sqlite_path).await?;
        *self.cursor_store.write().await = Some(cursor_store);
        info!(path = %self.config.cursor.sqlite_path, "Cursor store initialized");

        let results = self.peer_manager.connect_all().await;
        let connected = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.iter().filter(|r| r.is_err()).count();

        metrics::set_connected_peers(connected);

        if failed > 0 {
            warn!(connected, failed, "Some peer connections failed");
        }

        if connected == 0 && !self.config.peers.is_empty() {
            error!("Failed to connect to any peers");
            let _ = self.state_tx.send(EngineState::Failed);
            metrics::set_engine_state("Failed");
            return Err(ReplicationError::Internal(
                "No peers connected".to_string(),
            ));
        }

        self.spawn_hot_path_tasks().await;

        if self.config.settings.cold_path.enabled {
            self.spawn_cold_path_task().await;
        }

        self.spawn_cursor_flush_task().await;

        if self.config.settings.peer_health.enabled {
            self.spawn_peer_health_task().await;
        }

        let _ = self.state_tx.send(EngineState::Running);
        metrics::set_engine_state("Running");
        info!(
            connected,
            total = self.config.peers.len(),
            "Replication engine running"
        );

        Ok(())
    }

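    /// Spawns one hot-path tailer task per currently connected peer. If the
    /// hot-path config defines a rate limit, a single shared `RateLimiter`
    /// is handed to every tailer.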
    async fn spawn_hot_path_tasks(&self) {
        let peers = self.peer_manager.all();
        let mut handles = self.hot_path_handles.write().await;

        let rate_limiter: Option<Arc<RateLimiter>> = self.config.settings.hot_path
            .rate_limit_config()
            .map(|cfg| {
                info!(
                    rate_per_sec = cfg.refill_rate,
                    burst = cfg.burst_size,
                    "Rate limiting enabled for hot path"
                );
                Arc::new(RateLimiter::new(cfg))
            });

        for peer in peers {
            if !peer.is_connected().await {
                continue;
            }

            let peer_id = peer.node_id().to_string();
            let peer = Arc::clone(&peer);
            let cursor_store = Arc::clone(&self.cursor_store);
            let sync_engine = Arc::clone(&self.sync_engine);
            let circuit = Arc::clone(&self.circuit);
            let shutdown_rx = self.shutdown_rx.clone();
            let config = self.config.settings.hot_path.clone();
            let catching_up_count = Arc::clone(&self.catching_up_count);
            let rate_limiter = rate_limiter.clone();

            let handle = tokio::spawn(async move {
                hot_path::run_tailer(
                    peer,
                    cursor_store,
                    sync_engine,
                    circuit,
                    config,
                    shutdown_rx,
                    catching_up_count,
                    rate_limiter,
                )
                .await;
            });

            info!(peer_id = %peer_id, "Spawned hot path tailer");
            handles.push(handle);
        }
    }

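    /// Spawns the cold-path repair task. It shares the shutdown signal and
    /// catch-up counter with the hot-path tailers, and its handle is stored
    /// alongside theirs so shutdown drains it as well.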
    async fn spawn_cold_path_task(&self)
    where
        S: Send + Sync + 'static,
    {
        let sync_engine = Arc::clone(&self.sync_engine);
        let peer_manager = Arc::clone(&self.peer_manager);
        let shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.cold_path.clone();
        let catching_up_count = Arc::clone(&self.catching_up_count);

        let handle = tokio::spawn(async move {
            cold_path::run_repair(
                sync_engine,
                peer_manager,
                config,
                shutdown_rx,
                catching_up_count,
            )
            .await;
        });

        info!("Spawned cold path repair task");
        self.hot_path_handles.write().await.push(handle);
    }

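    /// Spawns a task that flushes dirty cursors to the cursor store every
    /// five seconds until shutdown is signalled.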
    async fn spawn_cursor_flush_task(&self) {
        let cursor_store = Arc::clone(&self.cursor_store);
        let mut shutdown_rx = self.shutdown_rx.clone();

        let handle = tokio::spawn(async move {
            let _ = shutdown_rx.borrow_and_update();

            let flush_interval = std::time::Duration::from_secs(5);
            let mut timer = tokio::time::interval(flush_interval);
            timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    biased;

                    result = shutdown_rx.changed() => {
                        if result.is_err() || *shutdown_rx.borrow() {
                            debug!("Cursor flush task stopping");
                            break;
                        }
                        continue;
                    }

                    _ = timer.tick() => {
                        let store_guard = cursor_store.read().await;
                        if let Some(ref store) = *store_guard {
                            if let Err(e) = store.flush_dirty().await {
                                warn!(error = %e, "Failed to flush cursors");
                            }
                        }
                    }
                }
            }
        });

        debug!("Spawned cursor flush task");
        self.hot_path_handles.write().await.push(handle);
    }

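    /// Spawns a task that periodically pings peers which are connected but
    /// have been idle past the configured threshold, marking a peer as
    /// disconnected when a ping fails.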
    async fn spawn_peer_health_task(&self) {
        let peer_manager = Arc::clone(&self.peer_manager);
        let mut shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.peer_health.clone();

        let handle = tokio::spawn(async move {
            let _ = shutdown_rx.borrow_and_update();

            let ping_interval = std::time::Duration::from_secs(config.ping_interval_sec);
            let idle_threshold_ms = config.idle_threshold_sec * 1000;
            let mut timer = tokio::time::interval(ping_interval);

            info!(
                ping_interval_sec = config.ping_interval_sec,
                idle_threshold_sec = config.idle_threshold_sec,
                "Starting peer health check task"
            );

            loop {
                tokio::select! {
                    biased;

                    result = shutdown_rx.changed() => {
                        if result.is_err() || *shutdown_rx.borrow() {
                            debug!("Peer health task stopping");
                            break;
                        }
                        continue;
                    }

                    _ = timer.tick() => {
                        for peer in peer_manager.all() {
                            if !peer.is_connected().await {
                                continue;
                            }

                            let idle_ms = peer.millis_since_success();
                            if idle_ms < idle_threshold_ms {
                                continue;
                            }

                            let peer_id = peer.node_id().to_string();
                            debug!(
                                peer_id = %peer_id,
                                idle_ms,
                                "Pinging idle peer"
                            );

                            match peer.ping().await {
                                Ok(latency) => {
                                    debug!(
                                        peer_id = %peer_id,
                                        latency_ms = latency.as_millis(),
                                        "Peer ping successful"
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        peer_id = %peer_id,
                                        error = %e,
                                        "Peer ping failed"
                                    );
                                    peer.mark_disconnected().await;
                                }
                            }
                        }
                    }
                }
            }

            info!("Peer health check task stopped");
        });

        info!("Spawned peer health check task");
        self.hot_path_handles.write().await.push(handle);
    }

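    /// Gracefully shuts the engine down: signals all background tasks to
    /// stop, waits up to ten seconds for each task to finish, disconnects
    /// all peers, closes the cursor store, and moves the state to `Stopped`.
    /// Safe to call even if the engine was never started.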
    pub async fn shutdown(&mut self) {
        info!("Shutting down replication engine");
        let _ = self.state_tx.send(EngineState::ShuttingDown);
        metrics::set_engine_state("ShuttingDown");

        let _ = self.shutdown_tx.send(true);

        let handles: Vec<_> = {
            let mut guard = self.hot_path_handles.write().await;
            std::mem::take(&mut *guard)
        };

        let task_count = handles.len();
        if task_count > 0 {
            info!(task_count, "Waiting for tasks to drain and complete");
        }

        let drain_timeout = std::time::Duration::from_secs(10);
        for (i, handle) in handles.into_iter().enumerate() {
            match tokio::time::timeout(drain_timeout, handle).await {
                Ok(Ok(())) => {
                    debug!(task = i + 1, "Task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(task = i + 1, error = %e, "Task panicked during shutdown");
                }
                Err(_) => {
                    warn!(task = i + 1, "Task timed out during shutdown (batch may be lost)");
                }
            }
        }

        self.peer_manager.shutdown_all();
        metrics::set_connected_peers(0);

        if let Some(cursor_store) = self.cursor_store.write().await.take() {
            cursor_store.close().await;
        }

        let _ = self.state_tx.send(EngineState::Stopped);
        metrics::set_engine_state("Stopped");
        info!("Replication engine stopped");
    }

    /// Returns the peer manager.
    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }

    /// Returns the local node ID from the configuration.
    pub fn node_id(&self) -> &str {
        &self.config.local_node_id
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PeerConfig, CursorConfig};

    fn test_config() -> ReplicationConfig {
        ReplicationConfig {
            local_node_id: "test-node".to_string(),
            peers: vec![],
            settings: Default::default(),
            cursor: CursorConfig::in_memory(),
        }
    }

    #[test]
    fn test_engine_initial_state() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_running());
        assert_eq!(engine.node_id(), "test-node");
    }

    #[test]
    fn test_engine_state_receiver() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let state_rx = engine.state_receiver();
        assert_eq!(*state_rx.borrow(), EngineState::Created);
    }

    #[test]
    fn test_engine_with_sync_engine() {
        let (_tx, rx) = watch::channel(test_config());
        let sync_engine = Arc::new(NoOpSyncEngine);
        let engine = ReplicationEngine::with_sync_engine(
            test_config(),
            rx,
            sync_engine,
        );

        assert_eq!(engine.state(), EngineState::Created);
        let _ = engine.sync_engine();
    }

    #[test]
    fn test_engine_circuit_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let circuit = engine.circuit();
        let _ = circuit.clone();
    }

    #[test]
    fn test_engine_peer_manager_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let pm = engine.peer_manager();
        assert!(pm.all().is_empty());
    }

    #[test]
    fn test_engine_with_peers() {
        let mut config = test_config();
        config.peers.push(PeerConfig::for_testing("peer-1", "redis://localhost:6379"));
        config.peers.push(PeerConfig::for_testing("peer-2", "redis://localhost:6380"));

        let (_tx, rx) = watch::channel(config.clone());
        let engine = ReplicationEngine::new(config, rx);

        let pm = engine.peer_manager();
        assert_eq!(pm.all().len(), 2);
    }

    #[tokio::test]
    async fn test_engine_start_invalid_state() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        let _ = engine.state_tx.send(EngineState::Running);

        let result = engine.start().await;
        assert!(result.is_err());

        if let Err(ReplicationError::InvalidState { expected, actual }) = result {
            assert_eq!(expected, "Created");
            assert_eq!(actual, "Running");
        } else {
            panic!("Expected InvalidState error");
        }
    }

    #[tokio::test]
    async fn test_engine_shutdown_from_created() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        engine.shutdown().await;

        assert_eq!(engine.state(), EngineState::Stopped);
        assert!(!engine.is_running());
    }

    #[test]
    fn test_engine_state_is_running() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert!(!engine.is_running());

        let _ = engine.state_tx.send(EngineState::Running);
        assert!(engine.is_running());

        let _ = engine.state_tx.send(EngineState::Stopped);
        assert!(!engine.is_running());
    }
}