replication_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Replication engine coordinator.
//!
//! The main orchestrator that ties together:
//! - Peer connections via [`crate::peer::PeerConnection`]
//! - Stream tailing (hot path) via [`crate::stream::StreamTailer`]
//! - Cursor persistence via [`crate::cursor::CursorStore`]
//! - Merkle repair (cold path) for consistency
//!
//! # Architecture
//!
//! The coordinator manages the full replication lifecycle:
//! 1. Connects to configured peers
//! 2. Tails CDC streams for real-time updates (hot path)
//! 3. Periodically runs Merkle tree repair (cold path)
//! 4. Handles graceful shutdown with in-flight batch draining
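//!
//! # Example
//!
//! A minimal lifecycle sketch. `load_config()` is a placeholder for however
//! the daemon builds its [`crate::config::ReplicationConfig`]; it is not part
//! of this crate.
//!
//! ```rust,ignore
//! let config = load_config();
//! // The watch channel is how the daemon pushes config updates at runtime.
//! let (_config_tx, config_rx) = tokio::sync::watch::channel(config.clone());
//!
//! let mut engine = ReplicationEngine::new(config, config_rx);
//! engine.start().await?;
//! assert!(engine.is_running());
//!
//! // ... serve traffic ...
//!
//! engine.shutdown().await;
//! ```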

mod types;
mod hot_path;
mod cold_path;

pub use types::{EngineState, HealthCheck, PeerHealth};

use crate::circuit_breaker::SyncEngineCircuit;
use crate::config::ReplicationConfig;
use crate::cursor::CursorStore;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::peer::PeerManager;
use crate::resilience::{RetryConfig, RateLimiter};
use crate::sync_engine::{SyncEngineRef, NoOpSyncEngine};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::{watch, RwLock};
use tracing::{debug, error, info, warn};

/// The main replication engine.
///
/// Manages bidirectional data sync between this node and its mesh peers.
///
/// # Sync Engine Integration
///
/// The replication engine is passed a reference to the local sync-engine by the daemon.
/// We use this to:
/// - Write replicated data from peers (`submit`)
/// - Check for duplicates before applying (`is_current`)
/// - Query the Merkle tree for cold path repair
///
/// We **never** write to any CDC stream — that's sync-engine's responsibility.
/// We only **read** from peer CDC streams and **write** to local sync-engine.
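///
/// # Example
///
/// A sketch of the daemon-side wiring. `DaemonSyncEngine` is a hypothetical
/// stand-in for whatever daemon type implements [`SyncEngineRef`]:
///
/// ```rust,ignore
/// // Hypothetical daemon-owned sync-engine handle.
/// let sync_engine = Arc::new(DaemonSyncEngine::new(/* ... */));
/// let (_config_tx, config_rx) = watch::channel(config.clone());
///
/// let mut engine = ReplicationEngine::with_sync_engine(config, config_rx, sync_engine);
/// engine.start().await?;
/// ```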
pub struct ReplicationEngine<S: SyncEngineRef = NoOpSyncEngine> {
    /// Configuration (can be updated at runtime)
    config: ReplicationConfig,

    /// Runtime config updates
    #[allow(dead_code)]
    config_rx: watch::Receiver<ReplicationConfig>,

    /// Engine state (broadcast to watchers)
    state_tx: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    state_rx: watch::Receiver<EngineState>,

    /// Reference to local sync-engine (passed from daemon)
    sync_engine: Arc<S>,

    /// Circuit breaker for sync-engine protection
    circuit: Arc<SyncEngineCircuit>,

    /// Peer connection manager
    peer_manager: Arc<PeerManager>,

    /// Cursor persistence store
    cursor_store: Arc<RwLock<Option<CursorStore>>>,

    /// Shutdown signal sender
    shutdown_tx: watch::Sender<bool>,

    /// Shutdown signal receiver
    shutdown_rx: watch::Receiver<bool>,

    /// Count of peers currently catching up (cursor far behind stream head).
    /// When > 0, cold path repair is skipped to avoid duplicate work.
    catching_up_count: Arc<AtomicUsize>,

    /// Hot path task handles
    hot_path_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}

impl ReplicationEngine<NoOpSyncEngine> {
    /// Create a new replication engine with a no-op sync engine (for testing/standalone use).
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to peers and begin replication.
    pub fn new(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
    ) -> Self {
        Self::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine))
    }
}

impl<S: SyncEngineRef> ReplicationEngine<S> {
    /// Create a new replication engine with a sync-engine reference.
    ///
    /// This is the primary constructor used by the daemon.
    ///
    /// # Arguments
    /// * `config` - Replication configuration
    /// * `config_rx` - Watch channel for config updates
    /// * `sync_engine` - Reference to local sync-engine (for writes and dedup)
    pub fn with_sync_engine(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
        sync_engine: Arc<S>,
    ) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // Create peer manager with daemon retry config
        let retry_config = RetryConfig::daemon();
        let peer_manager = Arc::new(PeerManager::new(retry_config));

        // Add peers from config
        for peer_config in &config.peers {
            peer_manager.add_peer(peer_config.clone());
        }

        Self {
            config,
            config_rx,
            state_tx,
            state_rx,
            sync_engine,
            circuit: Arc::new(SyncEngineCircuit::new()),
            peer_manager,
            cursor_store: Arc::new(RwLock::new(None)),
            shutdown_tx,
            shutdown_rx,
            catching_up_count: Arc::new(AtomicUsize::new(0)),
            hot_path_handles: RwLock::new(Vec::new()),
        }
    }

    /// Get a reference to the sync engine.
    pub fn sync_engine(&self) -> &Arc<S> {
        &self.sync_engine
    }

    /// Get a reference to the circuit breaker for sync-engine protection.
    pub fn circuit(&self) -> &Arc<SyncEngineCircuit> {
        &self.circuit
    }

    /// Get current engine state.
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
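    ///
    /// # Example
    ///
    /// A sketch of watching state transitions from another task (requires a
    /// running tokio runtime):
    ///
    /// ```rust,ignore
    /// let mut state_rx = engine.state_receiver();
    /// tokio::spawn(async move {
    ///     while state_rx.changed().await.is_ok() {
    ///         let state = *state_rx.borrow();
    ///         info!(?state, "Replication engine state changed");
    ///     }
    /// });
    /// ```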
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is running.
    pub fn is_running(&self) -> bool {
        matches!(self.state(), EngineState::Running)
    }

    /// Get comprehensive health status for monitoring endpoints.
    ///
    /// Returns a [`HealthCheck`] struct containing:
    /// - Engine state and readiness
    /// - Sync-engine backpressure status
    /// - Per-peer connectivity, circuit breaker state, and lag
    ///
    /// **Performance**: This method performs no network I/O. All data is
    /// collected from cached internal state (atomics, mutexes, watch channels).
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint
    /// if health.ready {
    ///     HttpResponse::Ok()
    /// } else {
    ///     HttpResponse::ServiceUnavailable()
    /// }
    ///
    /// // For /health endpoint (full diagnostics)
    /// HttpResponse::Ok().json(serde_json::json!({
    ///     "healthy": health.healthy,
    ///     "state": health.state.to_string(),
    ///     "peers_connected": health.peers_connected,
    ///     "peers_total": health.peers_total,
    ///     "sync_engine_accepting_writes": health.sync_engine_accepting_writes,
    /// }))
    /// ```
    pub async fn health_check(&self) -> HealthCheck {
        use crate::peer::PeerCircuitState;

        let state = self.state();
        let sync_engine_accepting_writes = self.sync_engine.should_accept_writes();

        // Collect peer health
        let all_peers = self.peer_manager.all();
        let peers_total = all_peers.len();
        let mut peers = Vec::with_capacity(peers_total);
        let mut peers_connected = 0;
        let mut peers_circuit_open = 0;
        let peers_catching_up = self.catching_up_count.load(std::sync::atomic::Ordering::Relaxed);

        for peer in all_peers {
            let connected = peer.is_connected().await;
            let circuit_state = peer.circuit_state().await;
            let circuit_open = circuit_state == PeerCircuitState::Open;

            if connected {
                peers_connected += 1;
            }
            if circuit_open {
                peers_circuit_open += 1;
            }

            peers.push(PeerHealth {
                node_id: peer.node_id().to_string(),
                connected,
                circuit_state,
                circuit_open,
                failure_count: peer.failure_count(),
                millis_since_success: peer.millis_since_success(),
                lag_ms: None, // TODO: Track per-peer lag in hot path
                catching_up: false, // TODO: Track per-peer catching up state
            });
        }

        // Determine readiness and health
        let ready = state == EngineState::Running && peers_connected > 0;
        let healthy = ready && sync_engine_accepting_writes;

        HealthCheck {
            state,
            ready,
            sync_engine_accepting_writes,
            peers_total,
            peers_connected,
            peers_circuit_open,
            peers_catching_up,
            peers,
            healthy,
        }
    }

    /// Start the replication engine.
    ///
    /// 1. Opens cursor store (SQLite)
    /// 2. Connects to all enabled peers
    /// 3. Spawns hot path tailers for each peer
    /// 4. Spawns cold path repair task (if enabled)
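    ///
    /// # Example
    ///
    /// A sketch of daemon startup; how the error is handled is up to the caller:
    ///
    /// ```rust,ignore
    /// let mut engine = ReplicationEngine::with_sync_engine(config, config_rx, sync_engine);
    ///
    /// if let Err(e) = engine.start().await {
    ///     // e.g. no peers reachable, or the cursor store failed to open.
    ///     error!(error = %e, "Replication engine failed to start");
    /// }
    /// ```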
    pub async fn start(&mut self) -> Result<()> {
        if self.state() != EngineState::Created {
            return Err(ReplicationError::InvalidState {
                expected: "Created".to_string(),
                actual: format!("{:?}", self.state()),
            });
        }

        info!(
            node_id = %self.config.local_node_id,
            peer_count = self.config.peers.len(),
            "Starting replication engine"
        );

        // Update state
        let _ = self.state_tx.send(EngineState::Connecting);
        metrics::set_engine_state("Connecting");

        // Initialize cursor store
        let cursor_store = CursorStore::new(&self.config.cursor.sqlite_path).await?;
        *self.cursor_store.write().await = Some(cursor_store);
        info!(path = %self.config.cursor.sqlite_path, "Cursor store initialized");

        // Connect to peers
        let results = self.peer_manager.connect_all().await;
        let connected = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.iter().filter(|r| r.is_err()).count();

        // Update connected peers metric
        metrics::set_connected_peers(connected);

        if failed > 0 {
            warn!(connected, failed, "Some peer connections failed");
        }

        if connected == 0 && !self.config.peers.is_empty() {
            error!("Failed to connect to any peers");
            let _ = self.state_tx.send(EngineState::Failed);
            metrics::set_engine_state("Failed");
            return Err(ReplicationError::Internal(
                "No peers connected".to_string(),
            ));
        }

        // Spawn hot path tasks for each connected peer
        self.spawn_hot_path_tasks().await;

        // Spawn cold path task if enabled
        if self.config.settings.cold_path.enabled {
            self.spawn_cold_path_task().await;
        }

        // Spawn cursor flush task (debounced writes)
        self.spawn_cursor_flush_task().await;

        // Spawn peer health check task if enabled
        if self.config.settings.peer_health.enabled {
            self.spawn_peer_health_task().await;
        }

        let _ = self.state_tx.send(EngineState::Running);
        metrics::set_engine_state("Running");
        info!(
            connected,
            total = self.config.peers.len(),
            "Replication engine running"
        );

        Ok(())
    }

    /// Spawn hot path tailer tasks for each peer.
    async fn spawn_hot_path_tasks(&self) {
        let peers = self.peer_manager.all();
        let mut handles = self.hot_path_handles.write().await;

        // Create a rate limiter shared across all peers, if enabled
        let rate_limiter: Option<Arc<RateLimiter>> = self.config.settings.hot_path
            .rate_limit_config()
            .map(|cfg| {
                info!(
                    rate_per_sec = cfg.refill_rate,
                    burst = cfg.burst_size,
                    "Rate limiting enabled for hot path"
                );
                Arc::new(RateLimiter::new(cfg))
            });

        for peer in peers {
            if !peer.is_connected().await {
                continue;
            }

            let peer_id = peer.node_id().to_string();
            let peer = Arc::clone(&peer);
            let cursor_store = Arc::clone(&self.cursor_store);
            let sync_engine = Arc::clone(&self.sync_engine);
            let circuit = Arc::clone(&self.circuit);
            let shutdown_rx = self.shutdown_rx.clone();
            let config = self.config.settings.hot_path.clone();
            let catching_up_count = Arc::clone(&self.catching_up_count);
            let rate_limiter = rate_limiter.clone();

            let handle = tokio::spawn(async move {
                hot_path::run_tailer(
                    peer,
                    cursor_store,
                    sync_engine,
                    circuit,
                    config,
                    shutdown_rx,
                    catching_up_count,
                    rate_limiter,
                )
                .await;
            });

            info!(peer_id = %peer_id, "Spawned hot path tailer");
            handles.push(handle);
        }
    }

    /// Spawn cold path repair task.
    async fn spawn_cold_path_task(&self)
    where
        S: Send + Sync + 'static,
    {
        let sync_engine = Arc::clone(&self.sync_engine);
        let peer_manager = Arc::clone(&self.peer_manager);
        let shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.cold_path.clone();
        let catching_up_count = Arc::clone(&self.catching_up_count);

        let handle = tokio::spawn(async move {
            cold_path::run_repair(
                sync_engine,
                peer_manager,
                config,
                shutdown_rx,
                catching_up_count,
            )
            .await;
        });

        info!("Spawned cold path repair task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Spawn cursor flush task for debounced writes.
    ///
    /// Periodically flushes dirty cursors to SQLite (every 5 seconds).
    async fn spawn_cursor_flush_task(&self) {
        let cursor_store = Arc::clone(&self.cursor_store);
        let mut shutdown_rx = self.shutdown_rx.clone();

        let handle = tokio::spawn(async move {
            // Mark initial shutdown value as seen so changed() only fires on actual changes
            let _ = shutdown_rx.borrow_and_update();

            let flush_interval = std::time::Duration::from_secs(5);
            let mut timer = tokio::time::interval(flush_interval);
            timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    biased;

                    result = shutdown_rx.changed() => {
                        if result.is_err() || *shutdown_rx.borrow() {
                            debug!("Cursor flush task stopping");
                            break;
                        }
                        continue;
                    }

                    _ = timer.tick() => {
                        let store_guard = cursor_store.read().await;
                        if let Some(ref store) = *store_guard {
                            if let Err(e) = store.flush_dirty().await {
                                warn!(error = %e, "Failed to flush cursors");
                            }
                        }
                    }
                }
            }
        });

        debug!("Spawned cursor flush task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Spawn peer health check task.
    ///
    /// Periodically pings idle peers to check connection health
    /// and record latency for observability.
    async fn spawn_peer_health_task(&self) {
        let peer_manager = Arc::clone(&self.peer_manager);
        let mut shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.peer_health.clone();

        let handle = tokio::spawn(async move {
            // Mark initial shutdown value as seen so changed() only fires on actual changes
            let _ = shutdown_rx.borrow_and_update();

            let ping_interval = std::time::Duration::from_secs(config.ping_interval_sec);
            let idle_threshold_ms = config.idle_threshold_sec * 1000;
            let mut timer = tokio::time::interval(ping_interval);

            info!(
                ping_interval_sec = config.ping_interval_sec,
                idle_threshold_sec = config.idle_threshold_sec,
                "Starting peer health check task"
            );

            loop {
                tokio::select! {
                    biased;

                    result = shutdown_rx.changed() => {
                        if result.is_err() || *shutdown_rx.borrow() {
                            debug!("Peer health task stopping");
                            break;
                        }
                        continue;
                    }

                    _ = timer.tick() => {
                        // Check each connected peer
                        for peer in peer_manager.all() {
                            if !peer.is_connected().await {
                                continue;
                            }

                            // Only ping if idle (no recent successful contact)
                            let idle_ms = peer.millis_since_success();
                            if idle_ms < idle_threshold_ms {
                                continue;
                            }

                            let peer_id = peer.node_id().to_string();
                            debug!(
                                peer_id = %peer_id,
                                idle_ms,
                                "Pinging idle peer"
                            );

                            match peer.ping().await {
                                Ok(latency) => {
                                    debug!(
                                        peer_id = %peer_id,
                                        latency_ms = latency.as_millis(),
                                        "Peer ping successful"
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        peer_id = %peer_id,
                                        error = %e,
                                        "Peer ping failed"
                                    );
                                    // Connection may be stale - mark disconnected
                                    peer.mark_disconnected().await;
                                }
                            }
                        }
                    }
                }
            }

            info!("Peer health check task stopped");
        });

        info!("Spawned peer health check task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Shut down the replication engine gracefully.
    ///
    /// Shutdown sequence:
    /// 1. Signal all hot/cold path tasks to stop
    /// 2. Wait for tasks to flush pending batches (with timeout)
    /// 3. Shut down peer connections
    /// 4. Checkpoint and close cursor store
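    ///
    /// # Example
    ///
    /// A sketch of graceful shutdown on Ctrl-C; the signal handling shown here
    /// is the daemon's concern, not this crate's:
    ///
    /// ```rust,ignore
    /// tokio::signal::ctrl_c().await?;
    /// engine.shutdown().await;
    /// assert_eq!(engine.state(), EngineState::Stopped);
    /// ```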
    pub async fn shutdown(&mut self) {
        info!("Shutting down replication engine");
        let _ = self.state_tx.send(EngineState::ShuttingDown);
        metrics::set_engine_state("ShuttingDown");

        // Signal shutdown to all tasks
        let _ = self.shutdown_tx.send(true);

        // Wait for tasks to complete gracefully (they'll flush pending batches)
        let handles: Vec<_> = {
            let mut guard = self.hot_path_handles.write().await;
            std::mem::take(&mut *guard)
        };

        let task_count = handles.len();
        if task_count > 0 {
            info!(task_count, "Waiting for tasks to drain and complete");
        }

        // Give tasks time to flush their batches (10 seconds should be plenty)
        let drain_timeout = std::time::Duration::from_secs(10);
        for (i, handle) in handles.into_iter().enumerate() {
            match tokio::time::timeout(drain_timeout, handle).await {
                Ok(Ok(())) => {
                    debug!(task = i + 1, "Task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(task = i + 1, error = %e, "Task panicked during shutdown");
                }
                Err(_) => {
                    warn!(task = i + 1, "Task timed out during shutdown (batch may be lost)");
                }
            }
        }

        // Shutdown peer manager
        self.peer_manager.shutdown_all();
        metrics::set_connected_peers(0);

        // Close cursor store (includes WAL checkpoint)
        if let Some(cursor_store) = self.cursor_store.write().await.take() {
            cursor_store.close().await;
        }

        let _ = self.state_tx.send(EngineState::Stopped);
        metrics::set_engine_state("Stopped");
        info!("Replication engine stopped");
    }

    /// Get the peer manager (for metrics/diagnostics).
    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }

    /// Get the node ID.
    pub fn node_id(&self) -> &str {
        &self.config.local_node_id
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PeerConfig, CursorConfig};

    fn test_config() -> ReplicationConfig {
        ReplicationConfig {
            local_node_id: "test-node".to_string(),
            peers: vec![],
            settings: Default::default(),
            cursor: CursorConfig::in_memory(),
        }
    }

    #[test]
    fn test_engine_initial_state() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_running());
        assert_eq!(engine.node_id(), "test-node");
    }

    #[test]
    fn test_engine_state_receiver() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let state_rx = engine.state_receiver();
        assert_eq!(*state_rx.borrow(), EngineState::Created);
    }

    #[test]
    fn test_engine_with_sync_engine() {
        let (_tx, rx) = watch::channel(test_config());
        let sync_engine = Arc::new(NoOpSyncEngine);
        let engine = ReplicationEngine::with_sync_engine(
            test_config(),
            rx,
            sync_engine,
        );

        assert_eq!(engine.state(), EngineState::Created);
        // sync_engine accessor should work
        let _ = engine.sync_engine();
    }

    #[test]
    fn test_engine_circuit_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        // Circuit breaker should be accessible
        let circuit = engine.circuit();
        // Just verify we can access it
        let _ = circuit.clone();
    }

    #[test]
    fn test_engine_peer_manager_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let pm = engine.peer_manager();
        // Empty config = no peers
        assert!(pm.all().is_empty());
    }

    #[test]
    fn test_engine_with_peers() {
        let mut config = test_config();
        config.peers.push(PeerConfig::for_testing("peer-1", "redis://localhost:6379"));
        config.peers.push(PeerConfig::for_testing("peer-2", "redis://localhost:6380"));

        let (_tx, rx) = watch::channel(config.clone());
        let engine = ReplicationEngine::new(config, rx);

        let pm = engine.peer_manager();
        // Both peers added
        assert_eq!(pm.all().len(), 2);
    }

    #[tokio::test]
    async fn test_engine_start_invalid_state() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        // Force state to Running (simulating already started)
        let _ = engine.state_tx.send(EngineState::Running);

        // Trying to start should fail
        let result = engine.start().await;
        assert!(result.is_err());

        if let Err(ReplicationError::InvalidState { expected, actual }) = result {
            assert_eq!(expected, "Created");
            assert_eq!(actual, "Running");
        } else {
            panic!("Expected InvalidState error");
        }
    }

    #[tokio::test]
    async fn test_engine_shutdown_from_created() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        // Shutdown from Created state should work
        engine.shutdown().await;

        assert_eq!(engine.state(), EngineState::Stopped);
        assert!(!engine.is_running());
    }

    #[test]
    fn test_engine_state_is_running() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        // Initially not running
        assert!(!engine.is_running());

        // Manually set to running
        let _ = engine.state_tx.send(EngineState::Running);
        assert!(engine.is_running());

        // Set to stopped
        let _ = engine.state_tx.send(EngineState::Stopped);
        assert!(!engine.is_running());
    }
}