replication_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Replication engine coordinator.
//!
//! The main orchestrator that ties together:
//! - Peer connections via [`crate::peer::PeerConnection`]
//! - Stream tailing (hot path) via [`crate::stream::StreamTailer`]
//! - Cursor persistence via [`crate::cursor::CursorStore`]
//! - Merkle repair (cold path) for consistency
//!
//! # Architecture
//!
//! The coordinator manages the full replication lifecycle:
//! 1. Connects to configured peers
//! 2. Tails CDC streams for real-time updates (hot path)
//! 3. Periodically runs Merkle tree repair (cold path)
//! 4. Handles graceful shutdown with in-flight batch draining
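//!
//! # Example
//!
//! A minimal sketch of the intended lifecycle; constructing a full
//! [`crate::config::ReplicationEngineConfig`] is elided here:
//!
//! ```text
//! let (_config_tx, config_rx) = tokio::sync::watch::channel(config.clone());
//! let mut engine = ReplicationEngine::new(config, config_rx);
//!
//! engine.start().await?;
//! assert!(engine.is_running());
//!
//! // ... run until asked to stop ...
//!
//! engine.shutdown().await;
//! ```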

mod types;
mod hot_path;
mod cold_path;

pub use types::{EngineState, HealthCheck, PeerHealth};

use crate::circuit_breaker::SyncEngineCircuit;
use crate::config::ReplicationEngineConfig;
use crate::cursor::CursorStore;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::peer::PeerManager;
use crate::resilience::{RetryConfig, RateLimiter};
use crate::sync_engine::{SyncEngineRef, NoOpSyncEngine};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::{watch, RwLock};
use tracing::{debug, error, info, warn};

/// The main replication engine.
///
/// Manages bidirectional data sync between this node and its mesh peers.
///
/// # Sync Engine Integration
///
/// The replication engine is passed a reference to the local sync-engine by the daemon.
/// We use this to:
/// - Write replicated data from peers (`submit`)
/// - Check for duplicates before applying (`is_current`)
/// - Query Merkle tree for cold path repair
///
/// We **never** write to any CDC stream — that's sync-engine's responsibility.
/// We only **read** from peer CDC streams and **write** to local sync-engine.
pub struct ReplicationEngine<S: SyncEngineRef = NoOpSyncEngine> {
    /// Configuration (can be updated at runtime)
    config: ReplicationEngineConfig,

    /// Runtime config updates
    #[allow(dead_code)]
    config_rx: watch::Receiver<ReplicationEngineConfig>,

    /// Engine state (broadcast to watchers)
    state_tx: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    state_rx: watch::Receiver<EngineState>,

    /// Reference to local sync-engine (passed from daemon)
    sync_engine: Arc<S>,

    /// Circuit breaker for sync-engine protection
    circuit: Arc<SyncEngineCircuit>,

    /// Peer connection manager
    peer_manager: Arc<PeerManager>,

    /// Cursor persistence store
    cursor_store: Arc<RwLock<Option<CursorStore>>>,

    /// Shutdown signal sender
    shutdown_tx: watch::Sender<bool>,

    /// Shutdown signal receiver
    shutdown_rx: watch::Receiver<bool>,

    /// Count of peers currently catching up (cursor far behind stream head).
    /// When > 0, cold path repair is skipped to avoid duplicate work.
    catching_up_count: Arc<AtomicUsize>,

    /// Hot path task handles
    hot_path_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}

impl ReplicationEngine<NoOpSyncEngine> {
    /// Create a new replication engine with a no-op sync engine (for testing/standalone).
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to peers and begin replication.
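    ///
    /// # Example
    ///
    /// A short sketch (mirrors the unit tests below; building `config` is elided):
    ///
    /// ```text
    /// let (_config_tx, config_rx) = watch::channel(config.clone());
    /// let engine = ReplicationEngine::new(config, config_rx);
    /// assert_eq!(engine.state(), EngineState::Created);
    /// ```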
    pub fn new(
        config: ReplicationEngineConfig,
        config_rx: watch::Receiver<ReplicationEngineConfig>,
    ) -> Self {
        Self::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine))
    }
}

impl<S: SyncEngineRef> ReplicationEngine<S> {
    /// Create a new replication engine with a sync-engine reference.
    ///
    /// This is the primary constructor used by the daemon.
    ///
    /// # Arguments
    /// * `config` - Replication configuration
    /// * `config_rx` - Watch channel for config updates
    /// * `sync_engine` - Reference to local sync-engine (for writes and dedup)
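    ///
    /// # Example
    ///
    /// A sketch, where `my_sync_engine` is a placeholder for any value
    /// implementing [`SyncEngineRef`](crate::sync_engine::SyncEngineRef):
    ///
    /// ```text
    /// let sync_engine = Arc::new(my_sync_engine);
    /// let engine = ReplicationEngine::with_sync_engine(config, config_rx, sync_engine);
    /// assert_eq!(engine.state(), EngineState::Created);
    /// ```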
    pub fn with_sync_engine(
        config: ReplicationEngineConfig,
        config_rx: watch::Receiver<ReplicationEngineConfig>,
        sync_engine: Arc<S>,
    ) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // Create peer manager with daemon retry config
        let retry_config = RetryConfig::daemon();
        let peer_manager = Arc::new(PeerManager::new(retry_config));

        // Add peers from config
        for peer_config in &config.peers {
            peer_manager.add_peer(peer_config.clone());
        }

        Self {
            config,
            config_rx,
            state_tx,
            state_rx,
            sync_engine,
            circuit: Arc::new(SyncEngineCircuit::new()),
            peer_manager,
            cursor_store: Arc::new(RwLock::new(None)),
            shutdown_tx,
            shutdown_rx,
            catching_up_count: Arc::new(AtomicUsize::new(0)),
            hot_path_handles: RwLock::new(Vec::new()),
        }
    }

    /// Get a reference to the sync engine.
    pub fn sync_engine(&self) -> &Arc<S> {
        &self.sync_engine
    }

    /// Get a reference to the circuit breaker for sync-engine protection.
    pub fn circuit(&self) -> &Arc<SyncEngineCircuit> {
        &self.circuit
    }

    /// Get current engine state.
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
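    ///
    /// # Example
    ///
    /// A sketch of watching for state transitions, using the standard
    /// `tokio::sync::watch` receiver API:
    ///
    /// ```text
    /// let mut state_rx = engine.state_receiver();
    /// while state_rx.changed().await.is_ok() {
    ///     info!(state = ?*state_rx.borrow(), "Engine state changed");
    /// }
    /// ```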
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is running.
    pub fn is_running(&self) -> bool {
        matches!(self.state(), EngineState::Running)
    }

    /// Get comprehensive health status for monitoring endpoints.
    ///
    /// Returns a [`HealthCheck`] struct containing:
    /// - Engine state and readiness
    /// - Sync-engine backpressure status
    /// - Per-peer connectivity, circuit breaker state, and lag
    ///
    /// **Performance**: This method performs no network I/O. All data is
    /// collected from cached internal state (atomics, mutexes, watch channels).
    ///
    /// # Example
    ///
    /// ```text
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint
    /// if health.ready {
    ///     HttpResponse::Ok()
    /// } else {
    ///     HttpResponse::ServiceUnavailable()
    /// }
    ///
    /// // For /health endpoint (full diagnostics)
    /// HttpResponse::Ok().json(serde_json::json!({
    ///     "healthy": health.healthy,
    ///     "state": health.state.to_string(),
    ///     "peers_connected": health.peers_connected,
    ///     "peers_total": health.peers_total,
    ///     "sync_engine_accepting_writes": health.sync_engine_accepting_writes,
    /// }))
    /// ```
    pub async fn health_check(&self) -> HealthCheck {
        use crate::peer::PeerCircuitState;

        let state = self.state();
        let sync_engine_accepting_writes = self.sync_engine.should_accept_writes();

        // Collect peer health
        let all_peers = self.peer_manager.all();
        let peers_total = all_peers.len();
        let mut peers = Vec::with_capacity(peers_total);
        let mut peers_connected = 0;
        let mut peers_circuit_open = 0;
        let peers_catching_up = self.catching_up_count.load(std::sync::atomic::Ordering::Relaxed);

        for peer in all_peers {
            let connected = peer.is_connected().await;
            let circuit_state = peer.circuit_state().await;
            let circuit_open = circuit_state == PeerCircuitState::Open;

            if connected {
                peers_connected += 1;
            }
            if circuit_open {
                peers_circuit_open += 1;
            }

            peers.push(PeerHealth {
                node_id: peer.node_id().to_string(),
                connected,
                circuit_state,
                circuit_open,
                failure_count: peer.failure_count(),
                millis_since_success: peer.millis_since_success(),
                lag_ms: None, // TODO: Track per-peer lag in hot path
                catching_up: false, // TODO: Track per-peer catching up state
            });
        }

        // Determine readiness and health
        let ready = state == EngineState::Running && peers_connected > 0;
        let healthy = ready && sync_engine_accepting_writes;

        HealthCheck {
            state,
            ready,
            sync_engine_accepting_writes,
            peers_total,
            peers_connected,
            peers_circuit_open,
            peers_catching_up,
            peers,
            healthy,
        }
    }

    /// Start the replication engine.
    ///
    /// 1. Opens cursor store (SQLite)
    /// 2. Connects to all enabled peers
    /// 3. Spawns hot path tailers for each peer
    /// 4. Spawns cold path repair task (if enabled)
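    ///
    /// # Example
    ///
    /// A sketch of start-up with failure handling; the error variants come from
    /// [`crate::error::ReplicationError`]:
    ///
    /// ```text
    /// if let Err(e) = engine.start().await {
    ///     error!(error = %e, "Replication engine failed to start");
    ///     return Err(e);
    /// }
    /// assert!(engine.is_running());
    /// ```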
    pub async fn start(&mut self) -> Result<()> {
        if self.state() != EngineState::Created {
            return Err(ReplicationError::InvalidState {
                expected: "Created".to_string(),
                actual: format!("{:?}", self.state()),
            });
        }

        info!(
            node_id = %self.config.local_node_id,
            peer_count = self.config.peers.len(),
            "Starting replication engine"
        );

        // Update state
        let _ = self.state_tx.send(EngineState::Connecting);
        metrics::set_engine_state("Connecting");

        // Initialize cursor store
        let cursor_store = CursorStore::new(&self.config.cursor.sqlite_path).await?;
        *self.cursor_store.write().await = Some(cursor_store);
        info!(path = %self.config.cursor.sqlite_path, "Cursor store initialized");

        // Connect to peers
        let results = self.peer_manager.connect_all().await;
        let connected = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.iter().filter(|r| r.is_err()).count();

        // Update connected peers metric
        metrics::set_connected_peers(connected);

        if failed > 0 {
            warn!(connected, failed, "Some peer connections failed");
        }

        if connected == 0 && !self.config.peers.is_empty() {
            error!("Failed to connect to any peers");
            let _ = self.state_tx.send(EngineState::Failed);
            metrics::set_engine_state("Failed");
            return Err(ReplicationError::Internal(
                "No peers connected".to_string(),
            ));
        }

        // Spawn hot path tasks for each connected peer
        self.spawn_hot_path_tasks().await;

        // Spawn cold path task if enabled
        if self.config.settings.cold_path.enabled {
            self.spawn_cold_path_task().await;
        }

        // Spawn cursor flush task (debounced writes)
        self.spawn_cursor_flush_task().await;

        // Spawn peer health check task if enabled
        if self.config.settings.peer_health.enabled {
            self.spawn_peer_health_task().await;
        }

        let _ = self.state_tx.send(EngineState::Running);
        metrics::set_engine_state("Running");
        info!(
            connected,
            total = self.config.peers.len(),
            "Replication engine running"
        );

        Ok(())
    }

    /// Spawn hot path tailer tasks for each peer.
    async fn spawn_hot_path_tasks(&self) {
        let peers = self.peer_manager.all();
        let mut handles = self.hot_path_handles.write().await;

        // Create the rate limiter if enabled (shared across all peers)
        let rate_limiter: Option<Arc<RateLimiter>> = self.config.settings.hot_path
            .rate_limit_config()
            .map(|cfg| {
                info!(
                    rate_per_sec = cfg.refill_rate,
                    burst = cfg.burst_size,
                    "Rate limiting enabled for hot path"
                );
                Arc::new(RateLimiter::new(cfg))
            });

        for peer in peers {
            if !peer.is_connected().await {
                continue;
            }

            let peer_id = peer.node_id().to_string();
            let peer = Arc::clone(&peer);
            let cursor_store = Arc::clone(&self.cursor_store);
            let sync_engine = Arc::clone(&self.sync_engine);
            let circuit = Arc::clone(&self.circuit);
            let shutdown_rx = self.shutdown_rx.clone();
            let config = self.config.settings.hot_path.clone();
            let catching_up_count = Arc::clone(&self.catching_up_count);
            let rate_limiter = rate_limiter.clone();

            let handle = tokio::spawn(async move {
                hot_path::run_tailer(
                    peer,
                    cursor_store,
                    sync_engine,
                    circuit,
                    config,
                    shutdown_rx,
                    catching_up_count,
                    rate_limiter,
                )
                .await;
            });

            info!(peer_id = %peer_id, "Spawned hot path tailer");
            handles.push(handle);
        }
    }

    /// Spawn cold path repair task.
    async fn spawn_cold_path_task(&self)
    where
        S: Send + Sync + 'static,
    {
        let sync_engine = Arc::clone(&self.sync_engine);
        let peer_manager = Arc::clone(&self.peer_manager);
        let shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.cold_path.clone();
        let catching_up_count = Arc::clone(&self.catching_up_count);

        let handle = tokio::spawn(async move {
            cold_path::run_repair(
                sync_engine,
                peer_manager,
                config,
                shutdown_rx,
                catching_up_count,
            )
            .await;
        });

        info!("Spawned cold path repair task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Spawn cursor flush task for debounced writes.
    ///
    /// Periodically flushes dirty cursors to SQLite (every 5 seconds).
    async fn spawn_cursor_flush_task(&self) {
        let cursor_store = Arc::clone(&self.cursor_store);
        let mut shutdown_rx = self.shutdown_rx.clone();

        let handle = tokio::spawn(async move {
            // Mark initial shutdown value as seen so changed() only fires on actual changes
            let _ = shutdown_rx.borrow_and_update();

            let flush_interval = std::time::Duration::from_secs(5);
            let mut timer = tokio::time::interval(flush_interval);
            timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    biased;

                    result = shutdown_rx.changed() => {
                        if result.is_err() || *shutdown_rx.borrow() {
                            debug!("Cursor flush task stopping");
                            break;
                        }
                        continue;
                    }

                    _ = timer.tick() => {
                        let store_guard = cursor_store.read().await;
                        if let Some(ref store) = *store_guard {
                            if let Err(e) = store.flush_dirty().await {
                                warn!(error = %e, "Failed to flush cursors");
                            }
                        }
                    }
                }
            }
        });

        debug!("Spawned cursor flush task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Spawn peer health check task.
    ///
    /// Periodically pings idle peers to check connection health
    /// and record latency for observability.
    async fn spawn_peer_health_task(&self) {
        let peer_manager = Arc::clone(&self.peer_manager);
        let mut shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.peer_health.clone();

        let handle = tokio::spawn(async move {
            // Mark initial shutdown value as seen so changed() only fires on actual changes
            let _ = shutdown_rx.borrow_and_update();

            let ping_interval = std::time::Duration::from_secs(config.ping_interval_sec);
            let idle_threshold_ms = config.idle_threshold_sec * 1000;
            let mut timer = tokio::time::interval(ping_interval);

            info!(
                ping_interval_sec = config.ping_interval_sec,
                idle_threshold_sec = config.idle_threshold_sec,
                "Starting peer health check task"
            );

            loop {
                tokio::select! {
                    biased;

                    result = shutdown_rx.changed() => {
                        if result.is_err() || *shutdown_rx.borrow() {
                            debug!("Peer health task stopping");
                            break;
                        }
                        continue;
                    }

                    _ = timer.tick() => {
                        // Check each connected peer
                        for peer in peer_manager.all() {
                            if !peer.is_connected().await {
                                continue;
                            }

                            // Only ping if idle (no recent successful contact)
                            let idle_ms = peer.millis_since_success();
                            if idle_ms < idle_threshold_ms {
                                continue;
                            }

                            let peer_id = peer.node_id().to_string();
                            debug!(
                                peer_id = %peer_id,
                                idle_ms,
                                "Pinging idle peer"
                            );

                            match peer.ping().await {
                                Ok(latency) => {
                                    debug!(
                                        peer_id = %peer_id,
                                        latency_ms = latency.as_millis(),
                                        "Peer ping successful"
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        peer_id = %peer_id,
                                        error = %e,
                                        "Peer ping failed"
                                    );
                                    // Connection may be stale - mark disconnected
                                    peer.mark_disconnected().await;
                                }
                            }
                        }
                    }
                }
            }

            info!("Peer health check task stopped");
        });

        info!("Spawned peer health check task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Shut down the replication engine gracefully.
    ///
    /// Shutdown sequence:
    /// 1. Signal all hot/cold path tasks to stop
    /// 2. Wait for tasks to flush pending batches (with timeout)
    /// 3. Shut down peer connections
    /// 4. Checkpoint and close cursor store
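    ///
    /// # Example
    ///
    /// A sketch of a typical daemon shutdown path (the Ctrl-C handling is an
    /// assumption about the caller, not part of this crate):
    ///
    /// ```text
    /// tokio::signal::ctrl_c().await?;
    /// engine.shutdown().await;
    /// assert_eq!(engine.state(), EngineState::Stopped);
    /// ```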
    pub async fn shutdown(&mut self) {
        info!("Shutting down replication engine");
        let _ = self.state_tx.send(EngineState::ShuttingDown);
        metrics::set_engine_state("ShuttingDown");

        // Signal shutdown to all tasks
        let _ = self.shutdown_tx.send(true);

        // Wait for tasks to complete gracefully (they'll flush pending batches)
        let handles: Vec<_> = {
            let mut guard = self.hot_path_handles.write().await;
            std::mem::take(&mut *guard)
        };

        let task_count = handles.len();
        if task_count > 0 {
            info!(task_count, "Waiting for tasks to drain and complete");
        }

        // Give tasks time to flush their batches (10 seconds should be plenty)
        let drain_timeout = std::time::Duration::from_secs(10);
        for (i, handle) in handles.into_iter().enumerate() {
            match tokio::time::timeout(drain_timeout, handle).await {
                Ok(Ok(())) => {
                    debug!(task = i + 1, "Task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(task = i + 1, error = %e, "Task panicked during shutdown");
                }
                Err(_) => {
                    warn!(task = i + 1, "Task timed out during shutdown (batch may be lost)");
                }
            }
        }

        // Shutdown peer manager
        self.peer_manager.shutdown_all();
        metrics::set_connected_peers(0);

        // Close cursor store (includes WAL checkpoint)
        if let Some(cursor_store) = self.cursor_store.write().await.take() {
            cursor_store.close().await;
        }

        let _ = self.state_tx.send(EngineState::Stopped);
        metrics::set_engine_state("Stopped");
        info!("Replication engine stopped");
    }

    /// Get the peer manager (for metrics/diagnostics).
    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }

    /// Get the node ID.
    pub fn node_id(&self) -> &str {
        &self.config.local_node_id
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PeerConfig, CursorConfig};

    fn test_config() -> ReplicationEngineConfig {
        ReplicationEngineConfig {
            local_node_id: "test-node".to_string(),
            peers: vec![],
            settings: Default::default(),
            cursor: CursorConfig::in_memory(),
        }
    }

    #[test]
    fn test_engine_initial_state() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_running());
        assert_eq!(engine.node_id(), "test-node");
    }

    #[test]
    fn test_engine_state_receiver() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let state_rx = engine.state_receiver();
        assert_eq!(*state_rx.borrow(), EngineState::Created);
    }

    #[test]
    fn test_engine_with_sync_engine() {
        let (_tx, rx) = watch::channel(test_config());
        let sync_engine = Arc::new(NoOpSyncEngine);
        let engine = ReplicationEngine::with_sync_engine(
            test_config(),
            rx,
            sync_engine,
        );

        assert_eq!(engine.state(), EngineState::Created);
        // sync_engine accessor should work
        let _ = engine.sync_engine();
    }

    #[test]
    fn test_engine_circuit_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        // Circuit breaker should be accessible
        let circuit = engine.circuit();
        // Just verify we can access it
        let _ = circuit.clone();
    }

    #[test]
    fn test_engine_peer_manager_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let pm = engine.peer_manager();
        // Empty config = no peers
        assert!(pm.all().is_empty());
    }

    #[test]
    fn test_engine_with_peers() {
        let mut config = test_config();
        config.peers.push(PeerConfig::for_testing("peer-1", "redis://localhost:6379"));
        config.peers.push(PeerConfig::for_testing("peer-2", "redis://localhost:6380"));

        let (_tx, rx) = watch::channel(config.clone());
        let engine = ReplicationEngine::new(config, rx);

        let pm = engine.peer_manager();
        // Both peers added
        assert_eq!(pm.all().len(), 2);
    }

    #[tokio::test]
    async fn test_engine_start_invalid_state() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        // Force state to Running (simulating already started)
        let _ = engine.state_tx.send(EngineState::Running);

        // Trying to start should fail
        let result = engine.start().await;
        assert!(result.is_err());

        if let Err(ReplicationError::InvalidState { expected, actual }) = result {
            assert_eq!(expected, "Created");
            assert_eq!(actual, "Running");
        } else {
            panic!("Expected InvalidState error");
        }
    }

    #[tokio::test]
    async fn test_engine_shutdown_from_created() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        // Shutdown from Created state should work
        engine.shutdown().await;

        assert_eq!(engine.state(), EngineState::Stopped);
        assert!(!engine.is_running());
    }

    #[test]
    fn test_engine_state_is_running() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        // Initially not running
        assert!(!engine.is_running());

        // Manually set to running
        let _ = engine.state_tx.send(EngineState::Running);
        assert!(engine.is_running());

        // Set to stopped
        let _ = engine.state_tx.send(EngineState::Stopped);
        assert!(!engine.is_running());
    }
}