replication_engine/coordinator/mod.rs

mod types;
mod hot_path;
mod cold_path;

pub use types::EngineState;

use crate::circuit_breaker::SyncEngineCircuit;
use crate::config::ReplicationConfig;
use crate::cursor::CursorStore;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::peer::PeerManager;
use crate::resilience::{RetryConfig, RateLimiter};
use crate::sync_engine::{SyncEngineRef, NoOpSyncEngine};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::{watch, RwLock};
use tracing::{debug, error, info, warn};

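/// Coordinates replication for this node: owns the peer connections, cursor
/// store, circuit breaker, and engine state channel, and spawns the hot path
/// tailers, cold path repair, cursor flush, and peer health background tasks.
/// Generic over the sync engine implementation, defaulting to `NoOpSyncEngine`.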
pub struct ReplicationEngine<S: SyncEngineRef = NoOpSyncEngine> {
    config: ReplicationConfig,
    #[allow(dead_code)]
    config_rx: watch::Receiver<ReplicationConfig>,
    state_tx: watch::Sender<EngineState>,
    state_rx: watch::Receiver<EngineState>,
    sync_engine: Arc<S>,
    circuit: Arc<SyncEngineCircuit>,
    peer_manager: Arc<PeerManager>,
    cursor_store: Arc<RwLock<Option<CursorStore>>>,
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
    catching_up_count: Arc<AtomicUsize>,
    hot_path_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}

impl ReplicationEngine<NoOpSyncEngine> {
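    /// Creates an engine backed by the default `NoOpSyncEngine`.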
    pub fn new(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
    ) -> Self {
        Self::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine))
    }
}

impl<S: SyncEngineRef> ReplicationEngine<S> {
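    /// Creates an engine backed by the given sync engine. Configured peers are
    /// registered with the peer manager here, but no connections are opened
    /// until `start` is called.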
    pub fn with_sync_engine(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
        sync_engine: Arc<S>,
    ) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let retry_config = RetryConfig::daemon();
        let peer_manager = Arc::new(PeerManager::new(retry_config));

        for peer_config in &config.peers {
            peer_manager.add_peer(peer_config.clone());
        }

        Self {
            config,
            config_rx,
            state_tx,
            state_rx,
            sync_engine,
            circuit: Arc::new(SyncEngineCircuit::new()),
            peer_manager,
            cursor_store: Arc::new(RwLock::new(None)),
            shutdown_tx,
            shutdown_rx,
            catching_up_count: Arc::new(AtomicUsize::new(0)),
            hot_path_handles: RwLock::new(Vec::new()),
        }
    }

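    /// Returns a shared handle to the underlying sync engine.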
    pub fn sync_engine(&self) -> &Arc<S> {
        &self.sync_engine
    }

    pub fn circuit(&self) -> &Arc<SyncEngineCircuit> {
        &self.circuit
    }

    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    pub fn is_running(&self) -> bool {
        matches!(self.state(), EngineState::Running)
    }

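    /// Starts the engine: opens the cursor store, connects to all configured
    /// peers, and spawns the background tasks (hot path tailers, optional cold
    /// path repair, cursor flush, and optional peer health checks).
    ///
    /// Returns an error if the engine is not in the `Created` state, if the
    /// cursor store cannot be opened, or if peers are configured but none
    /// could be connected.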
    pub async fn start(&mut self) -> Result<()> {
        if self.state() != EngineState::Created {
            return Err(ReplicationError::InvalidState {
                expected: "Created".to_string(),
                actual: format!("{:?}", self.state()),
            });
        }

        info!(
            node_id = %self.config.local_node_id,
            peer_count = self.config.peers.len(),
            "Starting replication engine"
        );

        let _ = self.state_tx.send(EngineState::Connecting);
        metrics::set_engine_state("Connecting");

        let cursor_store = CursorStore::new(&self.config.cursor.sqlite_path).await?;
        *self.cursor_store.write().await = Some(cursor_store);
        info!(path = %self.config.cursor.sqlite_path, "Cursor store initialized");

        let results = self.peer_manager.connect_all().await;
        let connected = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.iter().filter(|r| r.is_err()).count();

        metrics::set_connected_peers(connected);

        if failed > 0 {
            warn!(connected, failed, "Some peer connections failed");
        }

        if connected == 0 && !self.config.peers.is_empty() {
            error!("Failed to connect to any peers");
            let _ = self.state_tx.send(EngineState::Failed);
            metrics::set_engine_state("Failed");
            return Err(ReplicationError::Internal(
                "No peers connected".to_string(),
            ));
        }

        self.spawn_hot_path_tasks().await;

        if self.config.settings.cold_path.enabled {
            self.spawn_cold_path_task().await;
        }

        self.spawn_cursor_flush_task().await;

        if self.config.settings.peer_health.enabled {
            self.spawn_peer_health_task().await;
        }

        let _ = self.state_tx.send(EngineState::Running);
        metrics::set_engine_state("Running");
        info!(
            connected,
            total = self.config.peers.len(),
            "Replication engine running"
        );

        Ok(())
    }

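    /// Spawns one hot path tailer task per currently connected peer. All
    /// tailers share a single rate limiter when one is configured for the
    /// hot path.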
    async fn spawn_hot_path_tasks(&self) {
        let peers = self.peer_manager.all();
        let mut handles = self.hot_path_handles.write().await;

        let rate_limiter: Option<Arc<RateLimiter>> = self.config.settings.hot_path
            .rate_limit_config()
            .map(|cfg| {
                info!(
                    rate_per_sec = cfg.refill_rate,
                    burst = cfg.burst_size,
                    "Rate limiting enabled for hot path"
                );
                Arc::new(RateLimiter::new(cfg))
            });

        for peer in peers {
            if !peer.is_connected().await {
                continue;
            }

            let peer_id = peer.node_id().to_string();
            let peer = Arc::clone(&peer);
            let cursor_store = Arc::clone(&self.cursor_store);
            let sync_engine = Arc::clone(&self.sync_engine);
            let circuit = Arc::clone(&self.circuit);
            let shutdown_rx = self.shutdown_rx.clone();
            let config = self.config.settings.hot_path.clone();
            let catching_up_count = Arc::clone(&self.catching_up_count);
            let rate_limiter = rate_limiter.clone();

            let handle = tokio::spawn(async move {
                hot_path::run_tailer(
                    peer,
                    cursor_store,
                    sync_engine,
                    circuit,
                    config,
                    shutdown_rx,
                    catching_up_count,
                    rate_limiter,
                )
                .await;
            });

            info!(peer_id = %peer_id, "Spawned hot path tailer");
            handles.push(handle);
        }
    }

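    /// Spawns the cold path repair task; see `cold_path::run_repair`.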
    async fn spawn_cold_path_task(&self)
    where
        S: Send + Sync + 'static,
    {
        let sync_engine = Arc::clone(&self.sync_engine);
        let peer_manager = Arc::clone(&self.peer_manager);
        let shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.cold_path.clone();
        let catching_up_count = Arc::clone(&self.catching_up_count);

        let handle = tokio::spawn(async move {
            cold_path::run_repair(
                sync_engine,
                peer_manager,
                config,
                shutdown_rx,
                catching_up_count,
            )
            .await;
        });

        info!("Spawned cold path repair task");
        self.hot_path_handles.write().await.push(handle);
    }

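    /// Spawns a task that flushes dirty cursors to the cursor store every
    /// five seconds until shutdown is signalled.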
    async fn spawn_cursor_flush_task(&self) {
        let cursor_store = Arc::clone(&self.cursor_store);
        let mut shutdown_rx = self.shutdown_rx.clone();

        let handle = tokio::spawn(async move {
            let flush_interval = std::time::Duration::from_secs(5);
            let mut timer = tokio::time::interval(flush_interval);

            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        let store_guard = cursor_store.read().await;
                        if let Some(ref store) = *store_guard {
                            if let Err(e) = store.flush_dirty().await {
                                warn!(error = %e, "Failed to flush cursors");
                            }
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            debug!("Cursor flush task stopping");
                            break;
                        }
                    }
                }
            }
        });

        debug!("Spawned cursor flush task");
        self.hot_path_handles.write().await.push(handle);
    }

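    /// Spawns a task that pings peers which have been idle for longer than
    /// the configured threshold and marks peers whose ping fails as
    /// disconnected.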
    async fn spawn_peer_health_task(&self) {
        let peer_manager = Arc::clone(&self.peer_manager);
        let mut shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.peer_health.clone();

        let handle = tokio::spawn(async move {
            let ping_interval = std::time::Duration::from_secs(config.ping_interval_sec);
            let idle_threshold_ms = config.idle_threshold_sec * 1000;
            let mut timer = tokio::time::interval(ping_interval);

            info!(
                ping_interval_sec = config.ping_interval_sec,
                idle_threshold_sec = config.idle_threshold_sec,
                "Starting peer health check task"
            );

            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        for peer in peer_manager.all() {
                            if !peer.is_connected().await {
                                continue;
                            }

                            let idle_ms = peer.millis_since_success();
                            if idle_ms < idle_threshold_ms {
                                continue;
                            }

                            let peer_id = peer.node_id().to_string();
                            debug!(
                                peer_id = %peer_id,
                                idle_ms,
                                "Pinging idle peer"
                            );

                            match peer.ping().await {
                                Ok(latency) => {
                                    debug!(
                                        peer_id = %peer_id,
                                        latency_ms = latency.as_millis(),
                                        "Peer ping successful"
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        peer_id = %peer_id,
                                        error = %e,
                                        "Peer ping failed"
                                    );
                                    peer.mark_disconnected().await;
                                }
                            }
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            debug!("Peer health task stopping");
                            break;
                        }
                    }
                }
            }

            info!("Peer health check task stopped");
        });

        info!("Spawned peer health check task");
        self.hot_path_handles.write().await.push(handle);
    }

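    /// Shuts the engine down: signals all background tasks to stop, waits up
    /// to ten seconds for each task to finish, disconnects all peers, and
    /// closes the cursor store. The engine ends in the `Stopped` state.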
    pub async fn shutdown(&mut self) {
        info!("Shutting down replication engine");
        let _ = self.state_tx.send(EngineState::ShuttingDown);
        metrics::set_engine_state("ShuttingDown");

        let _ = self.shutdown_tx.send(true);

        let handles: Vec<_> = {
            let mut guard = self.hot_path_handles.write().await;
            std::mem::take(&mut *guard)
        };

        let task_count = handles.len();
        if task_count > 0 {
            info!(task_count, "Waiting for tasks to drain and complete");
        }

        let drain_timeout = std::time::Duration::from_secs(10);
        for (i, handle) in handles.into_iter().enumerate() {
            match tokio::time::timeout(drain_timeout, handle).await {
                Ok(Ok(())) => {
                    debug!(task = i + 1, "Task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(task = i + 1, error = %e, "Task panicked during shutdown");
                }
                Err(_) => {
                    warn!(task = i + 1, "Task timed out during shutdown (batch may be lost)");
                }
            }
        }

        self.peer_manager.shutdown_all();
        metrics::set_connected_peers(0);

        if let Some(cursor_store) = self.cursor_store.write().await.take() {
            cursor_store.close().await;
        }

        let _ = self.state_tx.send(EngineState::Stopped);
        metrics::set_engine_state("Stopped");
        info!("Replication engine stopped");
    }

    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }

    pub fn node_id(&self) -> &str {
        &self.config.local_node_id
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PeerConfig, CursorConfig};

    fn test_config() -> ReplicationConfig {
        ReplicationConfig {
            local_node_id: "test-node".to_string(),
            peers: vec![],
            settings: Default::default(),
            cursor: CursorConfig::in_memory(),
        }
    }

    #[test]
    fn test_engine_initial_state() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_running());
        assert_eq!(engine.node_id(), "test-node");
    }

    #[test]
    fn test_engine_state_receiver() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let state_rx = engine.state_receiver();
        assert_eq!(*state_rx.borrow(), EngineState::Created);
    }

    #[test]
    fn test_engine_with_sync_engine() {
        let (_tx, rx) = watch::channel(test_config());
        let sync_engine = Arc::new(NoOpSyncEngine);
        let engine = ReplicationEngine::with_sync_engine(
            test_config(),
            rx,
            sync_engine,
        );

        assert_eq!(engine.state(), EngineState::Created);
        let _ = engine.sync_engine();
    }

    #[test]
    fn test_engine_circuit_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let circuit = engine.circuit();
        let _ = circuit.clone();
    }

    #[test]
    fn test_engine_peer_manager_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let pm = engine.peer_manager();
        assert!(pm.all().is_empty());
    }

    #[test]
    fn test_engine_with_peers() {
        let mut config = test_config();
        config.peers.push(PeerConfig::for_testing("peer-1", "redis://localhost:6379"));
        config.peers.push(PeerConfig::for_testing("peer-2", "redis://localhost:6380"));

        let (_tx, rx) = watch::channel(config.clone());
        let engine = ReplicationEngine::new(config, rx);

        let pm = engine.peer_manager();
        assert_eq!(pm.all().len(), 2);
    }

    #[tokio::test]
    async fn test_engine_start_invalid_state() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        let _ = engine.state_tx.send(EngineState::Running);

        let result = engine.start().await;
        assert!(result.is_err());

        if let Err(ReplicationError::InvalidState { expected, actual }) = result {
            assert_eq!(expected, "Created");
            assert_eq!(actual, "Running");
        } else {
            panic!("Expected InvalidState error");
        }
    }

    #[tokio::test]
    async fn test_engine_shutdown_from_created() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        engine.shutdown().await;

        assert_eq!(engine.state(), EngineState::Stopped);
        assert!(!engine.is_running());
    }

    #[test]
    fn test_engine_state_is_running() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert!(!engine.is_running());

        let _ = engine.state_tx.send(EngineState::Running);
        assert!(engine.is_running());

        let _ = engine.state_tx.send(EngineState::Stopped);
        assert!(!engine.is_running());
    }
}