replication_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Replication engine coordinator.
//!
//! The main orchestrator that ties together:
//! - Peer connections via [`crate::peer::PeerConnection`]
//! - Stream tailing (hot path) via [`crate::stream::StreamTailer`]
//! - Cursor persistence via [`crate::cursor::CursorStore`]
//! - Merkle repair (cold path) for consistency
//!
//! # Architecture
//!
//! The coordinator manages the full replication lifecycle:
//! 1. Connects to configured peers
//! 2. Tails CDC streams for real-time updates (hot path)
//! 3. Periodically runs Merkle tree repair (cold path)
//! 4. Handles graceful shutdown with in-flight batch draining
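//!
//! # Example
//!
//! A minimal lifecycle sketch (assumes a `tokio` runtime and a populated
//! [`crate::config::ReplicationConfig`]; the daemon normally supplies a real
//! sync-engine via [`ReplicationEngine::with_sync_engine`], the no-op variant
//! is shown here):
//!
//! ```rust,ignore
//! use tokio::sync::watch;
//!
//! // Runtime config updates arrive over a watch channel; keep the sender alive.
//! let (_cfg_tx, cfg_rx) = watch::channel(config.clone());
//! let mut engine = ReplicationEngine::new(config, cfg_rx);
//!
//! // Connect to peers and spawn the hot/cold path tasks.
//! engine.start().await?;
//! assert!(engine.is_running());
//!
//! // ...later: drain in-flight batches and stop.
//! engine.shutdown().await;
//! ```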

mod types;
mod hot_path;
mod cold_path;

pub use types::{EngineState, HealthCheck, PeerHealth};

use crate::circuit_breaker::SyncEngineCircuit;
use crate::config::ReplicationConfig;
use crate::cursor::CursorStore;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::peer::PeerManager;
use crate::resilience::{RetryConfig, RateLimiter};
use crate::sync_engine::{SyncEngineRef, NoOpSyncEngine};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::{watch, RwLock};
use tracing::{debug, error, info, warn};

/// The main replication engine.
///
/// Manages bidirectional data sync between this node and its mesh peers.
///
/// # Sync Engine Integration
///
/// The replication engine is passed a reference to the local sync-engine by the daemon.
/// We use this to:
/// - Write replicated data from peers (`submit`)
/// - Check for duplicates before applying (`is_current`)
/// - Query Merkle tree for cold path repair
///
/// We **never** write to any CDC stream — that's sync-engine's responsibility.
/// We only **read** from peer CDC streams and **write** to local sync-engine.
pub struct ReplicationEngine<S: SyncEngineRef = NoOpSyncEngine> {
    /// Configuration (can be updated at runtime)
    config: ReplicationConfig,

    /// Runtime config updates
    #[allow(dead_code)]
    config_rx: watch::Receiver<ReplicationConfig>,

    /// Engine state (broadcast to watchers)
    state_tx: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    state_rx: watch::Receiver<EngineState>,

    /// Reference to local sync-engine (passed from daemon)
    sync_engine: Arc<S>,

    /// Circuit breaker for sync-engine protection
    circuit: Arc<SyncEngineCircuit>,

    /// Peer connection manager
    peer_manager: Arc<PeerManager>,

    /// Cursor persistence store
    cursor_store: Arc<RwLock<Option<CursorStore>>>,

    /// Shutdown signal sender
    shutdown_tx: watch::Sender<bool>,

    /// Shutdown signal receiver
    shutdown_rx: watch::Receiver<bool>,

    /// Count of peers currently catching up (cursor far behind stream head).
    /// When > 0, cold path repair is skipped to avoid duplicate work.
    catching_up_count: Arc<AtomicUsize>,

    /// Hot path task handles
    hot_path_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}

impl ReplicationEngine<NoOpSyncEngine> {
    /// Create a new replication engine with a no-op sync engine (for testing or standalone use).
    ///
    /// The engine starts in the `Created` state. Call [`start()`](Self::start)
    /// to connect to peers and begin replication.
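    ///
    /// # Example
    ///
    /// A short sketch (assumes `config` and its watch channel come from the caller):
    ///
    /// ```rust,ignore
    /// let (_cfg_tx, cfg_rx) = tokio::sync::watch::channel(config.clone());
    /// let mut engine = ReplicationEngine::new(config, cfg_rx);
    /// assert_eq!(engine.state(), EngineState::Created);
    /// engine.start().await?;
    /// ```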
    pub fn new(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
    ) -> Self {
        Self::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine))
    }
}

impl<S: SyncEngineRef> ReplicationEngine<S> {
    /// Create a new replication engine with a sync-engine reference.
    ///
    /// This is the primary constructor used by the daemon.
    ///
    /// # Arguments
    /// * `config` - Replication configuration
    /// * `config_rx` - Watch channel for config updates
    /// * `sync_engine` - Reference to local sync-engine (for writes and dedup)
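    ///
    /// # Example
    ///
    /// A sketch using the built-in [`NoOpSyncEngine`]; in production the daemon
    /// passes its own [`SyncEngineRef`] implementation instead:
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use tokio::sync::watch;
    ///
    /// let (_cfg_tx, cfg_rx) = watch::channel(config.clone());
    /// let engine =
    ///     ReplicationEngine::with_sync_engine(config, cfg_rx, Arc::new(NoOpSyncEngine));
    /// assert_eq!(engine.state(), EngineState::Created);
    /// ```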
    pub fn with_sync_engine(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
        sync_engine: Arc<S>,
    ) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // Create peer manager with daemon retry config
        let retry_config = RetryConfig::daemon();
        let peer_manager = Arc::new(PeerManager::new(retry_config));

        // Add peers from config
        for peer_config in &config.peers {
            peer_manager.add_peer(peer_config.clone());
        }

        Self {
            config,
            config_rx,
            state_tx,
            state_rx,
            sync_engine,
            circuit: Arc::new(SyncEngineCircuit::new()),
            peer_manager,
            cursor_store: Arc::new(RwLock::new(None)),
            shutdown_tx,
            shutdown_rx,
            catching_up_count: Arc::new(AtomicUsize::new(0)),
            hot_path_handles: RwLock::new(Vec::new()),
        }
    }

    /// Get a reference to the sync engine.
    pub fn sync_engine(&self) -> &Arc<S> {
        &self.sync_engine
    }

    /// Get a reference to the circuit breaker for sync-engine protection.
    pub fn circuit(&self) -> &Arc<SyncEngineCircuit> {
        &self.circuit
    }

    /// Get current engine state.
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
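    ///
    /// # Example
    ///
    /// A sketch of reacting to state changes (plain `tokio::sync::watch` usage):
    ///
    /// ```rust,ignore
    /// let mut state_rx = engine.state_receiver();
    /// tokio::spawn(async move {
    ///     // `changed()` resolves each time the engine broadcasts a new state.
    ///     while state_rx.changed().await.is_ok() {
    ///         let state = *state_rx.borrow();
    ///         tracing::info!(?state, "replication engine state changed");
    ///     }
    /// });
    /// ```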
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is running.
    pub fn is_running(&self) -> bool {
        matches!(self.state(), EngineState::Running)
    }

    /// Get comprehensive health status for monitoring endpoints.
    ///
    /// Returns a [`HealthCheck`] struct containing:
    /// - Engine state and readiness
    /// - Sync-engine backpressure status
    /// - Per-peer connectivity, circuit breaker state, and lag
    ///
    /// **Performance**: This method performs no network I/O. All data is
    /// collected from cached internal state (atomics, mutexes, watch channels).
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint
    /// if health.ready {
    ///     HttpResponse::Ok()
    /// } else {
    ///     HttpResponse::ServiceUnavailable()
    /// }
    ///
    /// // For /health endpoint (full diagnostics)
    /// HttpResponse::Ok().json(serde_json::json!({
    ///     "healthy": health.healthy,
    ///     "state": health.state.to_string(),
    ///     "peers_connected": health.peers_connected,
    ///     "peers_total": health.peers_total,
    ///     "sync_engine_accepting_writes": health.sync_engine_accepting_writes,
    /// }))
    /// ```
    pub async fn health_check(&self) -> HealthCheck {
        use crate::peer::PeerCircuitState;

        let state = self.state();
        let sync_engine_accepting_writes = self.sync_engine.should_accept_writes();

        // Collect peer health
        let all_peers = self.peer_manager.all();
        let peers_total = all_peers.len();
        let mut peers = Vec::with_capacity(peers_total);
        let mut peers_connected = 0;
        let mut peers_circuit_open = 0;
        let peers_catching_up = self.catching_up_count.load(std::sync::atomic::Ordering::Relaxed);

        for peer in all_peers {
            let connected = peer.is_connected().await;
            let circuit_state = peer.circuit_state().await;
            let circuit_open = circuit_state == PeerCircuitState::Open;

            if connected {
                peers_connected += 1;
            }
            if circuit_open {
                peers_circuit_open += 1;
            }

            peers.push(PeerHealth {
                node_id: peer.node_id().to_string(),
                connected,
                circuit_state,
                circuit_open,
                failure_count: peer.failure_count(),
                millis_since_success: peer.millis_since_success(),
                lag_ms: None, // TODO: Track per-peer lag in hot path
                catching_up: false, // TODO: Track per-peer catching up state
            });
        }

        // Determine readiness and health
        let ready = state == EngineState::Running && peers_connected > 0;
        let healthy = ready && sync_engine_accepting_writes;

        HealthCheck {
            state,
            ready,
            sync_engine_accepting_writes,
            peers_total,
            peers_connected,
            peers_circuit_open,
            peers_catching_up,
            peers,
            healthy,
        }
    }

    /// Start the replication engine.
    ///
    /// 1. Opens cursor store (SQLite)
    /// 2. Connects to all enabled peers
    /// 3. Spawns hot path tailers for each peer
    /// 4. Spawns cold path repair task (if enabled)
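    ///
    /// # Example
    ///
    /// A sketch of starting the engine and surfacing a failure (error and result
    /// types are this crate's [`ReplicationError`] and [`Result`]):
    ///
    /// ```rust,ignore
    /// if let Err(e) = engine.start().await {
    ///     tracing::error!(error = %e, "replication engine failed to start");
    ///     return Err(e);
    /// }
    /// assert!(engine.is_running());
    /// ```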
    pub async fn start(&mut self) -> Result<()> {
        if self.state() != EngineState::Created {
            return Err(ReplicationError::InvalidState {
                expected: "Created".to_string(),
                actual: format!("{:?}", self.state()),
            });
        }

        info!(
            node_id = %self.config.local_node_id,
            peer_count = self.config.peers.len(),
            "Starting replication engine"
        );

        // Update state
        let _ = self.state_tx.send(EngineState::Connecting);
        metrics::set_engine_state("Connecting");

        // Initialize cursor store
        let cursor_store = CursorStore::new(&self.config.cursor.sqlite_path).await?;
        *self.cursor_store.write().await = Some(cursor_store);
        info!(path = %self.config.cursor.sqlite_path, "Cursor store initialized");

        // Connect to peers
        let results = self.peer_manager.connect_all().await;
        let connected = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.iter().filter(|r| r.is_err()).count();

        // Update connected peers metric
        metrics::set_connected_peers(connected);

        if failed > 0 {
            warn!(connected, failed, "Some peer connections failed");
        }

        if connected == 0 && !self.config.peers.is_empty() {
            error!("Failed to connect to any peers");
            let _ = self.state_tx.send(EngineState::Failed);
            metrics::set_engine_state("Failed");
            return Err(ReplicationError::Internal(
                "No peers connected".to_string(),
            ));
        }

        // Spawn hot path tasks for each connected peer
        self.spawn_hot_path_tasks().await;

        // Spawn cold path task if enabled
        if self.config.settings.cold_path.enabled {
            self.spawn_cold_path_task().await;
        }

        // Spawn cursor flush task (debounced writes)
        self.spawn_cursor_flush_task().await;

        // Spawn peer health check task if enabled
        if self.config.settings.peer_health.enabled {
            self.spawn_peer_health_task().await;
        }

        let _ = self.state_tx.send(EngineState::Running);
        metrics::set_engine_state("Running");
        info!(
            connected,
            total = self.config.peers.len(),
            "Replication engine running"
        );

        Ok(())
    }

    /// Spawn hot path tailer tasks for each peer.
    async fn spawn_hot_path_tasks(&self) {
        let peers = self.peer_manager.all();
        let mut handles = self.hot_path_handles.write().await;

        // Create the rate limiter if enabled (one instance shared across all peers)
        let rate_limiter: Option<Arc<RateLimiter>> = self.config.settings.hot_path
            .rate_limit_config()
            .map(|cfg| {
                info!(
                    rate_per_sec = cfg.refill_rate,
                    burst = cfg.burst_size,
                    "Rate limiting enabled for hot path"
                );
                Arc::new(RateLimiter::new(cfg))
            });

        for peer in peers {
            if !peer.is_connected().await {
                continue;
            }

            let peer_id = peer.node_id().to_string();
            let peer = Arc::clone(&peer);
            let cursor_store = Arc::clone(&self.cursor_store);
            let sync_engine = Arc::clone(&self.sync_engine);
            let circuit = Arc::clone(&self.circuit);
            let shutdown_rx = self.shutdown_rx.clone();
            let config = self.config.settings.hot_path.clone();
            let catching_up_count = Arc::clone(&self.catching_up_count);
            let rate_limiter = rate_limiter.clone();

            let handle = tokio::spawn(async move {
                hot_path::run_tailer(peer, cursor_store, sync_engine, circuit, config, shutdown_rx, catching_up_count, rate_limiter).await;
            });

            info!(peer_id = %peer_id, "Spawned hot path tailer");
            handles.push(handle);
        }
    }

    /// Spawn cold path repair task.
    async fn spawn_cold_path_task(&self)
    where
        S: Send + Sync + 'static,
    {
        let sync_engine = Arc::clone(&self.sync_engine);
        let peer_manager = Arc::clone(&self.peer_manager);
        let shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.cold_path.clone();
        let catching_up_count = Arc::clone(&self.catching_up_count);

        let handle = tokio::spawn(async move {
            cold_path::run_repair(sync_engine, peer_manager, config, shutdown_rx, catching_up_count).await;
        });

        info!("Spawned cold path repair task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Spawn cursor flush task for debounced writes.
    ///
    /// Periodically flushes dirty cursors to SQLite (every 5 seconds).
    async fn spawn_cursor_flush_task(&self) {
        let cursor_store = Arc::clone(&self.cursor_store);
        let mut shutdown_rx = self.shutdown_rx.clone();

        let handle = tokio::spawn(async move {
            let flush_interval = std::time::Duration::from_secs(5);
            let mut timer = tokio::time::interval(flush_interval);

            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        let store_guard = cursor_store.read().await;
                        if let Some(ref store) = *store_guard {
                            if let Err(e) = store.flush_dirty().await {
                                warn!(error = %e, "Failed to flush cursors");
                            }
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            debug!("Cursor flush task stopping");
                            break;
                        }
                    }
                }
            }
        });

        debug!("Spawned cursor flush task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Spawn peer health check task.
    ///
    /// Periodically pings idle peers to check connection health
    /// and record latency for observability.
    async fn spawn_peer_health_task(&self) {
        let peer_manager = Arc::clone(&self.peer_manager);
        let mut shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.peer_health.clone();

        let handle = tokio::spawn(async move {
            let ping_interval = std::time::Duration::from_secs(config.ping_interval_sec);
            let idle_threshold_ms = config.idle_threshold_sec * 1000;
            let mut timer = tokio::time::interval(ping_interval);

            info!(
                ping_interval_sec = config.ping_interval_sec,
                idle_threshold_sec = config.idle_threshold_sec,
                "Starting peer health check task"
            );

            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        // Check each connected peer
                        for peer in peer_manager.all() {
                            if !peer.is_connected().await {
                                continue;
                            }

                            // Only ping if idle (no recent successful contact)
                            let idle_ms = peer.millis_since_success();
                            if idle_ms < idle_threshold_ms {
                                continue;
                            }

                            let peer_id = peer.node_id().to_string();
                            debug!(
                                peer_id = %peer_id,
                                idle_ms,
                                "Pinging idle peer"
                            );

                            match peer.ping().await {
                                Ok(latency) => {
                                    debug!(
                                        peer_id = %peer_id,
                                        latency_ms = latency.as_millis(),
                                        "Peer ping successful"
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        peer_id = %peer_id,
                                        error = %e,
                                        "Peer ping failed"
                                    );
                                    // Connection may be stale - mark disconnected
                                    peer.mark_disconnected().await;
                                }
                            }
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            debug!("Peer health task stopping");
                            break;
                        }
                    }
                }
            }

            info!("Peer health check task stopped");
        });

        info!("Spawned peer health check task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Shutdown the replication engine gracefully.
    ///
    /// Shutdown sequence:
    /// 1. Signal all hot/cold path tasks to stop
    /// 2. Wait for tasks to flush pending batches (with timeout)
    /// 3. Shutdown peer connections
    /// 4. Checkpoint and close cursor store
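    ///
    /// # Example
    ///
    /// A sketch of wiring shutdown to Ctrl-C (assumes `tokio`'s `signal` feature;
    /// the daemon's real shutdown trigger may differ):
    ///
    /// ```rust,ignore
    /// tokio::signal::ctrl_c().await?;
    /// engine.shutdown().await;
    /// assert_eq!(engine.state(), EngineState::Stopped);
    /// ```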
    pub async fn shutdown(&mut self) {
        info!("Shutting down replication engine");
        let _ = self.state_tx.send(EngineState::ShuttingDown);
        metrics::set_engine_state("ShuttingDown");

        // Signal shutdown to all tasks
        let _ = self.shutdown_tx.send(true);

        // Wait for tasks to complete gracefully (they'll flush pending batches)
        let handles: Vec<_> = {
            let mut guard = self.hot_path_handles.write().await;
            std::mem::take(&mut *guard)
        };

        let task_count = handles.len();
        if task_count > 0 {
            info!(task_count, "Waiting for tasks to drain and complete");
        }

        // Give tasks time to flush their batches (10 seconds should be plenty)
        let drain_timeout = std::time::Duration::from_secs(10);
        for (i, handle) in handles.into_iter().enumerate() {
            match tokio::time::timeout(drain_timeout, handle).await {
                Ok(Ok(())) => {
                    debug!(task = i + 1, "Task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(task = i + 1, error = %e, "Task panicked during shutdown");
                }
                Err(_) => {
                    warn!(task = i + 1, "Task timed out during shutdown (batch may be lost)");
                }
            }
        }

        // Shutdown peer manager
        self.peer_manager.shutdown_all();
        metrics::set_connected_peers(0);

        // Close cursor store (includes WAL checkpoint)
        if let Some(cursor_store) = self.cursor_store.write().await.take() {
            cursor_store.close().await;
        }

        let _ = self.state_tx.send(EngineState::Stopped);
        metrics::set_engine_state("Stopped");
        info!("Replication engine stopped");
    }

    /// Get the peer manager (for metrics/diagnostics).
    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }

    /// Get the node ID.
    pub fn node_id(&self) -> &str {
        &self.config.local_node_id
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PeerConfig, CursorConfig};

    fn test_config() -> ReplicationConfig {
        ReplicationConfig {
            local_node_id: "test-node".to_string(),
            peers: vec![],
            settings: Default::default(),
            cursor: CursorConfig::in_memory(),
        }
    }

    #[test]
    fn test_engine_initial_state() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_running());
        assert_eq!(engine.node_id(), "test-node");
    }

    #[test]
    fn test_engine_state_receiver() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let state_rx = engine.state_receiver();
        assert_eq!(*state_rx.borrow(), EngineState::Created);
    }

    #[test]
    fn test_engine_with_sync_engine() {
        let (_tx, rx) = watch::channel(test_config());
        let sync_engine = Arc::new(NoOpSyncEngine);
        let engine = ReplicationEngine::with_sync_engine(
            test_config(),
            rx,
            sync_engine,
        );

        assert_eq!(engine.state(), EngineState::Created);
        // sync_engine accessor should work
        let _ = engine.sync_engine();
    }

    #[test]
    fn test_engine_circuit_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        // Circuit breaker should be accessible
        let circuit = engine.circuit();
        // Just verify we can access it
        let _ = circuit.clone();
    }

    #[test]
    fn test_engine_peer_manager_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let pm = engine.peer_manager();
        // Empty config = no peers
        assert!(pm.all().is_empty());
    }

    #[test]
    fn test_engine_with_peers() {
        let mut config = test_config();
        config.peers.push(PeerConfig::for_testing("peer-1", "redis://localhost:6379"));
        config.peers.push(PeerConfig::for_testing("peer-2", "redis://localhost:6380"));

        let (_tx, rx) = watch::channel(config.clone());
        let engine = ReplicationEngine::new(config, rx);

        let pm = engine.peer_manager();
        // Both peers added
        assert_eq!(pm.all().len(), 2);
    }

    #[tokio::test]
    async fn test_engine_start_invalid_state() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        // Force state to Running (simulating already started)
        let _ = engine.state_tx.send(EngineState::Running);

        // Trying to start should fail
        let result = engine.start().await;
        assert!(result.is_err());

        if let Err(ReplicationError::InvalidState { expected, actual }) = result {
            assert_eq!(expected, "Created");
            assert_eq!(actual, "Running");
        } else {
            panic!("Expected InvalidState error");
        }
    }

    #[tokio::test]
    async fn test_engine_shutdown_from_created() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        // Shutdown from Created state should work
        engine.shutdown().await;

        assert_eq!(engine.state(), EngineState::Stopped);
        assert!(!engine.is_running());
    }

    #[test]
    fn test_engine_state_is_running() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        // Initially not running
        assert!(!engine.is_running());

        // Manually set to running
        let _ = engine.state_tx.send(EngineState::Running);
        assert!(engine.is_running());

        // Set to stopped
        let _ = engine.state_tx.send(EngineState::Stopped);
        assert!(!engine.is_running());
    }
}