replication_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Replication engine coordinator.
//!
//! The main orchestrator that ties together:
//! - Peer connections via [`crate::peer::PeerConnection`]
//! - Stream tailing (hot path) via [`crate::stream::StreamTailer`]
//! - Cursor persistence via [`crate::cursor::CursorStore`]
//! - Merkle repair (cold path) for consistency
//!
//! # Architecture
//!
//! The coordinator manages the full replication lifecycle:
//! 1. Connects to configured peers
//! 2. Tails CDC streams for real-time updates (hot path)
//! 3. Periodically runs Merkle tree repair (cold path)
//! 4. Handles graceful shutdown with in-flight batch draining
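//!
//! # Example
//!
//! A minimal sketch of that lifecycle, assuming a `ReplicationConfig` has
//! already been built elsewhere (marked `ignore` because it needs a live
//! daemon environment):
//!
//! ```rust,ignore
//! use tokio::sync::watch;
//!
//! async fn run(config: ReplicationConfig) -> Result<()> {
//!     // The daemon owns the sender half and pushes config updates through it.
//!     let (_config_tx, config_rx) = watch::channel(config.clone());
//!
//!     // Starts in `Created`; `start()` connects peers and spawns the tasks.
//!     let mut engine = ReplicationEngine::new(config, config_rx);
//!     engine.start().await?;
//!
//!     // ... run until the daemon asks us to stop ...
//!
//!     // Drains in-flight batches, closes peers, checkpoints cursors.
//!     engine.shutdown().await;
//!     Ok(())
//! }
//! ```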

mod types;
mod hot_path;
mod cold_path;

pub use types::EngineState;

use crate::circuit_breaker::SyncEngineCircuit;
use crate::config::ReplicationConfig;
use crate::cursor::CursorStore;
use crate::error::{ReplicationError, Result};
use crate::metrics;
use crate::peer::PeerManager;
use crate::resilience::{RetryConfig, RateLimiter};
use crate::sync_engine::{SyncEngineRef, NoOpSyncEngine};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::{watch, RwLock};
use tracing::{debug, error, info, warn};

/// The main replication engine.
///
/// Manages bidirectional data sync between this node and its mesh peers.
///
/// # Sync Engine Integration
///
/// The replication engine is passed a reference to the local sync-engine by the daemon.
/// We use this to:
/// - Write replicated data from peers (`submit`)
/// - Check for duplicates before applying (`is_current`)
/// - Query Merkle tree for cold path repair
///
/// We **never** write to any CDC stream — that's sync-engine's responsibility.
/// We only **read** from peer CDC streams and **write** to local sync-engine.
pub struct ReplicationEngine<S: SyncEngineRef = NoOpSyncEngine> {
    /// Configuration (can be updated at runtime)
    config: ReplicationConfig,

    /// Runtime config updates
    #[allow(dead_code)]
    config_rx: watch::Receiver<ReplicationConfig>,

    /// Engine state (broadcast to watchers)
    state_tx: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    state_rx: watch::Receiver<EngineState>,

    /// Reference to local sync-engine (passed from daemon)
    sync_engine: Arc<S>,

    /// Circuit breaker for sync-engine protection
    circuit: Arc<SyncEngineCircuit>,

    /// Peer connection manager
    peer_manager: Arc<PeerManager>,

    /// Cursor persistence store
    cursor_store: Arc<RwLock<Option<CursorStore>>>,

    /// Shutdown signal sender
    shutdown_tx: watch::Sender<bool>,

    /// Shutdown signal receiver
    shutdown_rx: watch::Receiver<bool>,

    /// Count of peers currently catching up (cursor far behind stream head).
    /// When > 0, cold path repair is skipped to avoid duplicate work.
    catching_up_count: Arc<AtomicUsize>,

    /// Handles for spawned background tasks (hot path tailers plus the cold
    /// path repair, cursor flush, and peer health tasks).
    hot_path_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}

impl ReplicationEngine<NoOpSyncEngine> {
    /// Create a new replication engine with a no-op sync engine (for testing/standalone).
    ///
    /// The engine starts in the `Created` state. Call [`start()`](Self::start)
    /// to connect to peers and begin replication.
    pub fn new(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
    ) -> Self {
        Self::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine))
    }
}

impl<S: SyncEngineRef> ReplicationEngine<S> {
    /// Create a new replication engine with a sync-engine reference.
    ///
    /// This is the primary constructor used by the daemon.
    ///
    /// # Arguments
    /// * `config` - Replication configuration
    /// * `config_rx` - Watch channel for config updates
    /// * `sync_engine` - Reference to local sync-engine (for writes and dedup)
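    ///
    /// # Example
    ///
    /// A minimal sketch using the built-in [`NoOpSyncEngine`]; a real daemon
    /// would pass its own [`SyncEngineRef`] implementation and an already
    /// loaded `config` (marked `ignore` because both are assumed here):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use tokio::sync::watch;
    ///
    /// let (_config_tx, config_rx) = watch::channel(config.clone());
    /// let engine = ReplicationEngine::with_sync_engine(config, config_rx, Arc::new(NoOpSyncEngine));
    /// assert_eq!(engine.state(), EngineState::Created);
    /// ```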
    pub fn with_sync_engine(
        config: ReplicationConfig,
        config_rx: watch::Receiver<ReplicationConfig>,
        sync_engine: Arc<S>,
    ) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // Create peer manager with daemon retry config
        let retry_config = RetryConfig::daemon();
        let peer_manager = Arc::new(PeerManager::new(retry_config));

        // Add peers from config
        for peer_config in &config.peers {
            peer_manager.add_peer(peer_config.clone());
        }

        Self {
            config,
            config_rx,
            state_tx,
            state_rx,
            sync_engine,
            circuit: Arc::new(SyncEngineCircuit::new()),
            peer_manager,
            cursor_store: Arc::new(RwLock::new(None)),
            shutdown_tx,
            shutdown_rx,
            catching_up_count: Arc::new(AtomicUsize::new(0)),
            hot_path_handles: RwLock::new(Vec::new()),
        }
    }

    /// Get a reference to the sync engine.
    pub fn sync_engine(&self) -> &Arc<S> {
        &self.sync_engine
    }

    /// Get a reference to the circuit breaker for sync-engine protection.
    pub fn circuit(&self) -> &Arc<SyncEngineCircuit> {
        &self.circuit
    }

    /// Get current engine state.
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
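    ///
    /// # Example
    ///
    /// A sketch of waiting for the engine to reach `Running` from another
    /// task (standard `tokio::sync::watch` usage; `ignore` because it needs
    /// a started engine):
    ///
    /// ```rust,ignore
    /// let mut state_rx = engine.state_receiver();
    /// while *state_rx.borrow() != EngineState::Running {
    ///     if state_rx.changed().await.is_err() {
    ///         break; // engine dropped
    ///     }
    /// }
    /// ```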
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is running.
    pub fn is_running(&self) -> bool {
        matches!(self.state(), EngineState::Running)
    }

    /// Start the replication engine.
    ///
    /// 1. Opens cursor store (SQLite)
    /// 2. Connects to all enabled peers
    /// 3. Spawns hot path tailers for each peer
    /// 4. Spawns cold path repair task (if enabled)
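    ///
    /// `start` may only be called once, from the `Created` state; a second
    /// call returns [`ReplicationError::InvalidState`]. A sketch:
    ///
    /// ```rust,ignore
    /// engine.start().await?;
    /// assert!(engine.is_running());
    /// assert!(engine.start().await.is_err()); // already Running
    /// ```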
    pub async fn start(&mut self) -> Result<()> {
        if self.state() != EngineState::Created {
            return Err(ReplicationError::InvalidState {
                expected: "Created".to_string(),
                actual: format!("{:?}", self.state()),
            });
        }

        info!(
            node_id = %self.config.local_node_id,
            peer_count = self.config.peers.len(),
            "Starting replication engine"
        );

        // Update state
        let _ = self.state_tx.send(EngineState::Connecting);
        metrics::set_engine_state("Connecting");

        // Initialize cursor store
        let cursor_store = CursorStore::new(&self.config.cursor.sqlite_path).await?;
        *self.cursor_store.write().await = Some(cursor_store);
        info!(path = %self.config.cursor.sqlite_path, "Cursor store initialized");

        // Connect to peers
        let results = self.peer_manager.connect_all().await;
        let connected = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.iter().filter(|r| r.is_err()).count();

        // Update connected peers metric
        metrics::set_connected_peers(connected);

        if failed > 0 {
            warn!(connected, failed, "Some peer connections failed");
        }

        if connected == 0 && !self.config.peers.is_empty() {
            error!("Failed to connect to any peers");
            let _ = self.state_tx.send(EngineState::Failed);
            metrics::set_engine_state("Failed");
            return Err(ReplicationError::Internal(
                "No peers connected".to_string(),
            ));
        }

        // Spawn hot path tasks for each connected peer
        self.spawn_hot_path_tasks().await;

        // Spawn cold path task if enabled
        if self.config.settings.cold_path.enabled {
            self.spawn_cold_path_task().await;
        }

        // Spawn cursor flush task (debounced writes)
        self.spawn_cursor_flush_task().await;

        // Spawn peer health check task if enabled
        if self.config.settings.peer_health.enabled {
            self.spawn_peer_health_task().await;
        }

        let _ = self.state_tx.send(EngineState::Running);
        metrics::set_engine_state("Running");
        info!(
            connected,
            total = self.config.peers.len(),
            "Replication engine running"
        );

        Ok(())
    }

    /// Spawn hot path tailer tasks for each peer.
    async fn spawn_hot_path_tasks(&self) {
        let peers = self.peer_manager.all();
        let mut handles = self.hot_path_handles.write().await;

        // Create shared rate limiter if enabled (shared across all peers)
        let rate_limiter: Option<Arc<RateLimiter>> = self.config.settings.hot_path
            .rate_limit_config()
            .map(|cfg| {
                info!(
                    rate_per_sec = cfg.refill_rate,
                    burst = cfg.burst_size,
                    "Rate limiting enabled for hot path"
                );
                Arc::new(RateLimiter::new(cfg))
            });

        for peer in peers {
            if !peer.is_connected().await {
                continue;
            }

            let peer_id = peer.node_id().to_string();
            let peer = Arc::clone(&peer);
            let cursor_store = Arc::clone(&self.cursor_store);
            let sync_engine = Arc::clone(&self.sync_engine);
            let circuit = Arc::clone(&self.circuit);
            let shutdown_rx = self.shutdown_rx.clone();
            let config = self.config.settings.hot_path.clone();
            let catching_up_count = Arc::clone(&self.catching_up_count);
            let rate_limiter = rate_limiter.clone();

            let handle = tokio::spawn(async move {
                hot_path::run_tailer(peer, cursor_store, sync_engine, circuit, config, shutdown_rx, catching_up_count, rate_limiter).await;
            });

            info!(peer_id = %peer_id, "Spawned hot path tailer");
            handles.push(handle);
        }
    }

    /// Spawn cold path repair task.
    async fn spawn_cold_path_task(&self)
    where
        S: Send + Sync + 'static,
    {
        let sync_engine = Arc::clone(&self.sync_engine);
        let peer_manager = Arc::clone(&self.peer_manager);
        let shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.cold_path.clone();
        let catching_up_count = Arc::clone(&self.catching_up_count);

        let handle = tokio::spawn(async move {
            cold_path::run_repair(sync_engine, peer_manager, config, shutdown_rx, catching_up_count).await;
        });

        info!("Spawned cold path repair task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Spawn cursor flush task for debounced writes.
    ///
    /// Periodically flushes dirty cursors to SQLite (every 5 seconds).
    async fn spawn_cursor_flush_task(&self) {
        let cursor_store = Arc::clone(&self.cursor_store);
        let mut shutdown_rx = self.shutdown_rx.clone();

        let handle = tokio::spawn(async move {
            let flush_interval = std::time::Duration::from_secs(5);
            let mut timer = tokio::time::interval(flush_interval);

            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        let store_guard = cursor_store.read().await;
                        if let Some(ref store) = *store_guard {
                            if let Err(e) = store.flush_dirty().await {
                                warn!(error = %e, "Failed to flush cursors");
                            }
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            debug!("Cursor flush task stopping");
                            break;
                        }
                    }
                }
            }
        });

        debug!("Spawned cursor flush task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Spawn peer health check task.
    ///
    /// Periodically pings idle peers to check connection health
    /// and record latency for observability.
    async fn spawn_peer_health_task(&self) {
        let peer_manager = Arc::clone(&self.peer_manager);
        let mut shutdown_rx = self.shutdown_rx.clone();
        let config = self.config.settings.peer_health.clone();

        let handle = tokio::spawn(async move {
            let ping_interval = std::time::Duration::from_secs(config.ping_interval_sec);
            let idle_threshold_ms = config.idle_threshold_sec * 1000;
            let mut timer = tokio::time::interval(ping_interval);

            info!(
                ping_interval_sec = config.ping_interval_sec,
                idle_threshold_sec = config.idle_threshold_sec,
                "Starting peer health check task"
            );

            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        // Check each connected peer
                        for peer in peer_manager.all() {
                            if !peer.is_connected().await {
                                continue;
                            }

                            // Only ping if idle (no recent successful contact)
                            let idle_ms = peer.millis_since_success();
                            if idle_ms < idle_threshold_ms {
                                continue;
                            }

                            let peer_id = peer.node_id().to_string();
                            debug!(
                                peer_id = %peer_id,
                                idle_ms,
                                "Pinging idle peer"
                            );

                            match peer.ping().await {
                                Ok(latency) => {
                                    debug!(
                                        peer_id = %peer_id,
                                        latency_ms = latency.as_millis(),
                                        "Peer ping successful"
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        peer_id = %peer_id,
                                        error = %e,
                                        "Peer ping failed"
                                    );
                                    // Connection may be stale - mark disconnected
                                    peer.mark_disconnected().await;
                                }
                            }
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            debug!("Peer health task stopping");
                            break;
                        }
                    }
                }
            }

            info!("Peer health check task stopped");
        });

        info!("Spawned peer health check task");
        self.hot_path_handles.write().await.push(handle);
    }

    /// Shutdown the replication engine gracefully.
    ///
    /// Shutdown sequence:
    /// 1. Signal all hot/cold path tasks to stop
    /// 2. Wait for tasks to flush pending batches (with timeout)
    /// 3. Shutdown peer connections
    /// 4. Checkpoint and close cursor store
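    ///
    /// # Example
    ///
    /// A sketch of typical daemon wiring (the ctrl-c handling is illustrative
    /// and not provided by this crate):
    ///
    /// ```rust,ignore
    /// engine.start().await?;
    /// tokio::signal::ctrl_c().await?;
    /// engine.shutdown().await;
    /// assert_eq!(engine.state(), EngineState::Stopped);
    /// ```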
    pub async fn shutdown(&mut self) {
        info!("Shutting down replication engine");
        let _ = self.state_tx.send(EngineState::ShuttingDown);
        metrics::set_engine_state("ShuttingDown");

        // Signal shutdown to all tasks
        let _ = self.shutdown_tx.send(true);

        // Wait for tasks to complete gracefully (they'll flush pending batches)
        let handles: Vec<_> = {
            let mut guard = self.hot_path_handles.write().await;
            std::mem::take(&mut *guard)
        };

        let task_count = handles.len();
        if task_count > 0 {
            info!(task_count, "Waiting for tasks to drain and complete");
        }

        // Give tasks time to flush their batches (10 seconds should be plenty)
        let drain_timeout = std::time::Duration::from_secs(10);
        for (i, handle) in handles.into_iter().enumerate() {
            match tokio::time::timeout(drain_timeout, handle).await {
                Ok(Ok(())) => {
                    debug!(task = i + 1, "Task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(task = i + 1, error = %e, "Task panicked during shutdown");
                }
                Err(_) => {
                    warn!(task = i + 1, "Task timed out during shutdown (batch may be lost)");
                }
            }
        }

        // Shutdown peer manager
        self.peer_manager.shutdown_all();
        metrics::set_connected_peers(0);

        // Close cursor store (includes WAL checkpoint)
        if let Some(cursor_store) = self.cursor_store.write().await.take() {
            cursor_store.close().await;
        }

        let _ = self.state_tx.send(EngineState::Stopped);
        metrics::set_engine_state("Stopped");
        info!("Replication engine stopped");
    }

    /// Get the peer manager (for metrics/diagnostics).
    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }

    /// Get the node ID.
    pub fn node_id(&self) -> &str {
        &self.config.local_node_id
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PeerConfig, CursorConfig};

    fn test_config() -> ReplicationConfig {
        ReplicationConfig {
            local_node_id: "test-node".to_string(),
            peers: vec![],
            settings: Default::default(),
            cursor: CursorConfig::in_memory(),
        }
    }

    #[test]
    fn test_engine_initial_state() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_running());
        assert_eq!(engine.node_id(), "test-node");
    }

    #[test]
    fn test_engine_state_receiver() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let state_rx = engine.state_receiver();
        assert_eq!(*state_rx.borrow(), EngineState::Created);
    }

    #[test]
    fn test_engine_with_sync_engine() {
        let (_tx, rx) = watch::channel(test_config());
        let sync_engine = Arc::new(NoOpSyncEngine);
        let engine = ReplicationEngine::with_sync_engine(
            test_config(),
            rx,
            sync_engine,
        );

        assert_eq!(engine.state(), EngineState::Created);
        // sync_engine accessor should work
        let _ = engine.sync_engine();
    }

    #[test]
    fn test_engine_circuit_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        // Circuit breaker should be accessible
        let circuit = engine.circuit();
        // Just verify we can access it
        let _ = circuit.clone();
    }

    #[test]
    fn test_engine_peer_manager_accessor() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        let pm = engine.peer_manager();
        // Empty config = no peers
        assert!(pm.all().is_empty());
    }

    #[test]
    fn test_engine_with_peers() {
        let mut config = test_config();
        config.peers.push(PeerConfig::for_testing("peer-1", "redis://localhost:6379"));
        config.peers.push(PeerConfig::for_testing("peer-2", "redis://localhost:6380"));

        let (_tx, rx) = watch::channel(config.clone());
        let engine = ReplicationEngine::new(config, rx);

        let pm = engine.peer_manager();
        // Both peers added
        assert_eq!(pm.all().len(), 2);
    }

    #[tokio::test]
    async fn test_engine_start_invalid_state() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        // Force state to Running (simulating already started)
        let _ = engine.state_tx.send(EngineState::Running);

        // Trying to start should fail
        let result = engine.start().await;
        assert!(result.is_err());

        if let Err(ReplicationError::InvalidState { expected, actual }) = result {
            assert_eq!(expected, "Created");
            assert_eq!(actual, "Running");
        } else {
            panic!("Expected InvalidState error");
        }
    }

    #[tokio::test]
    async fn test_engine_shutdown_from_created() {
        let (_tx, rx) = watch::channel(test_config());
        let mut engine = ReplicationEngine::new(test_config(), rx);

        // Shutdown from Created state should work
        engine.shutdown().await;

        assert_eq!(engine.state(), EngineState::Stopped);
        assert!(!engine.is_running());
    }

    #[test]
    fn test_engine_state_is_running() {
        let (_tx, rx) = watch::channel(test_config());
        let engine = ReplicationEngine::new(test_config(), rx);

        // Initially not running
        assert!(!engine.is_running());

        // Manually set to running
        let _ = engine.state_tx.send(EngineState::Running);
        assert!(engine.is_running());

        // Set to stopped
        let _ = engine.state_tx.send(EngineState::Stopped);
        assert!(!engine.is_running());
    }
}