sync_engine/coordinator/
lifecycle.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Engine lifecycle management: start, shutdown, run loop.
5//!
6//! This module contains the startup sequence, main run loop, and shutdown logic.
7
8use tracing::{info, warn, debug, error};
9
10use crate::storage::traits::StorageError;
11use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
12use crate::merkle::{MerkleCacheStore, SqlMerkleStore};
13use crate::resilience::wal::WriteAheadLog;
14
15use super::{SyncEngine, EngineState};
16#[allow(unused_imports)]
17use super::WriteTarget;
18
19impl SyncEngine {
20    /// Start the engine - connect to backends with proper startup sequence.
21    /// 
22    /// Startup flow (trust hierarchy):
23    /// 1. Initialize WAL (SQLite) - always first, our durability lifeline
24    /// 2. Connect to SQL (L3) - ground truth, initialize SQL merkle store
25    /// 3. Drain any pending WAL entries to SQL (blocking)
26    /// 4. Get SQL merkle root (this is our trusted root)
27    /// 5. Load CF snapshots from SQLite:
28    ///    - If snapshot merkle root matches SQL root → CF is trusted
29    ///    - Otherwise → CF must be rebuilt from SQL scan
30    /// 6. Connect to Redis (L2) - cache layer
31    /// 7. Compare Redis merkle root with SQL merkle root
32    ///    - If match → Redis is synced
33    ///    - If mismatch → use branch diff to find stale regions and resync
34    /// 8. Ready!
35    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
36    pub async fn start(&mut self) -> Result<(), StorageError> {
37        let startup_start = std::time::Instant::now();
38        info!("Starting sync engine with trust-verified startup...");
39        let _ = self.state.send(EngineState::Connecting);
40
41        // ========== PHASE 1: Initialize WAL (always first) ==========
42        let phase_start = std::time::Instant::now();
43        let (wal_path, wal_max_items) = {
44            let cfg = self.config.read();
45            (
46                cfg.wal_path.clone().unwrap_or_else(|| "./sync_engine_wal.db".to_string()),
47                cfg.wal_max_items.unwrap_or(1_000_000),
48            )
49        };
50        
51        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
52            Ok(wal) => {
53                info!(path = %wal_path, "Write-ahead log initialized");
54                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
55                wal
56            }
57            Err(e) => {
58                crate::metrics::record_error("WAL", "init", "sqlite");
59                return Err(StorageError::Backend(format!(
60                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
61                    wal_path, e
62                )));
63            }
64        };
65        
66        // Initialize filter persistence (uses same WAL SQLite)
67        match FilterPersistence::new(&wal_path).await {
68            Ok(fp) => {
69                self.filter_persistence = Some(fp);
70            }
71            Err(e) => {
72                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
73                crate::metrics::record_error("filter", "init", "persistence");
74            }
75        }
76        
77        let pending_count = if wal.has_pending() {
78            wal.stats(false).pending_items
79        } else {
80            0
81        };
82        self.l3_wal = Some(wal);
83
84        // ========== PHASE 2: Connect to SQL (L3 - ground truth) ==========
85        let phase_start = std::time::Instant::now();
86        let sql_url = self.config.read().sql_url.clone();
87        if let Some(ref sql_url) = sql_url {
88            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
89            // Pass shared schema registry to SqlStore for table routing
90            match crate::storage::sql::SqlStore::with_registry(sql_url, self.schema_registry.clone()).await {
91                Ok(store) => {
92                    // Initialize SQL merkle store (ground truth) - shares pool with SqlStore
93                    let is_sqlite = sql_url.starts_with("sqlite:");
94                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
95                    if let Err(e) = sql_merkle.init_schema().await {
96                        error!(error = %e, "Failed to initialize SQL merkle schema");
97                        crate::metrics::record_error("L3", "init", "merkle_schema");
98                        return Err(StorageError::Backend(format!(
99                            "Failed to initialize SQL merkle schema: {}", e
100                        )));
101                    }
102                    self.sql_merkle = Some(sql_merkle);
103                    
104                    // Keep both Arc<dyn ArchiveStore> and Arc<SqlStore> for dirty merkle access
105                    let store = std::sync::Arc::new(store);
106                    self.sql_store = Some(store.clone());
107                    self.l3_store = Some(store);
108                    tracing::Span::current().record("has_sql", true);
109                    self.mysql_health.record_success();
110                    crate::metrics::set_backend_healthy("mysql", true);
111                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
112                    info!("SQL (L3) connected with merkle store (ground truth)");
113                }
114                Err(e) => {
115                    tracing::Span::current().record("has_sql", false);
116                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
117                    self.mysql_health.record_failure();
118                    crate::metrics::set_backend_healthy("mysql", false);
119                    crate::metrics::record_connection_error("mysql");
120                    return Err(StorageError::Backend(format!(
121                        "SQL connection required for startup: {}", e
122                    )));
123                }
124            }
125        } else {
126            warn!("No SQL URL configured - operating without ground truth storage!");
127            tracing::Span::current().record("has_sql", false);
128        }
129
130        // ========== PHASE 3: Drain WAL to SQL ==========
131        if pending_count > 0 {
132            let phase_start = std::time::Instant::now();
133            let _ = self.state.send(EngineState::DrainingWal);
134            info!(pending = pending_count, "Draining WAL to SQL before startup...");
135            
136            if let Some(ref l3) = self.l3_store {
137                if let Some(ref wal) = self.l3_wal {
138                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
139                        Ok(drained) => {
140                            info!(drained = drained.len(), "WAL drained to SQL");
141                            crate::metrics::record_items_written("L3", drained.len());
142                        }
143                        Err(e) => {
144                            warn!(error = %e, "WAL drain had errors - some items may retry later");
145                            crate::metrics::record_error("WAL", "drain", "partial");
146                        }
147                    }
148                }
149            }
150            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
151        }
152        
153        // ========== PHASE 4: Get SQL merkle root (used to validate snapshots) ==========
154        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
155            match sql_merkle.root_hash().await {
156                Ok(Some(root)) => {
157                    info!(root = %hex::encode(root), "SQL merkle root (for snapshot validation)");
158                    Some(root)
159                }
160                Ok(None) => {
161                    info!("SQL merkle tree is empty (no data yet)");
162                    None
163                }
164                Err(e) => {
165                    warn!(error = %e, "Failed to get SQL merkle root");
166                    None
167                }
168            }
169        } else {
170            None
171        };
172        
173        // ========== PHASE 5: Restore CF from snapshot (if valid) ==========
174        let phase_start = std::time::Instant::now();
175        self.restore_cuckoo_filters(&sql_root).await;
176        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());
177
178        // ========== PHASE 6: Connect to Redis (L2 - cache) ==========
179        let phase_start = std::time::Instant::now();
180        let (redis_url, redis_prefix) = {
181            let cfg = self.config.read();
182            (cfg.redis_url.clone(), cfg.redis_prefix.clone())
183        };
184        if let Some(ref redis_url) = redis_url {
185            info!(url = %redis_url, prefix = ?redis_prefix, "Connecting to Redis (L2 - cache)...");
186            match crate::storage::redis::RedisStore::with_prefix(redis_url, redis_prefix.as_deref()).await {
187                Ok(store) => {
188                    let merkle_cache = MerkleCacheStore::with_prefix(
189                        store.connection(),
190                        redis_prefix.as_deref(),
191                    );
192                    self.merkle_cache = Some(merkle_cache);
193                    let store = std::sync::Arc::new(store);
194                    self.redis_store = Some(store.clone());  // Keep direct reference for CDC
195                    self.l2_store = Some(store);
196                    tracing::Span::current().record("has_redis", true);
197                    crate::metrics::set_backend_healthy("redis", true);
198                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
199                    info!("Redis (L2) connected as merkle cache");
200                }
201                Err(e) => {
202                    tracing::Span::current().record("has_redis", false);
203                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
204                    crate::metrics::set_backend_healthy("redis", false);
205                    crate::metrics::record_connection_error("redis");
206                }
207            }
208        } else {
209            tracing::Span::current().record("has_redis", false);
210        }
211
212        // ========== PHASE 7: Sync Redis with SQL via branch diff ==========
213        if let (Some(ref sql_merkle), Some(ref merkle_cache), Some(ref sql_root)) = 
214            (&self.sql_merkle, &self.merkle_cache, &sql_root) 
215        {
216            let phase_start = std::time::Instant::now();
217            let _ = self.state.send(EngineState::SyncingRedis);
218            
219            match merkle_cache.root_hash().await {
220                Ok(Some(redis_root)) if &redis_root == sql_root => {
221                    info!("Redis cache in sync with SQL");
222                }
223                Ok(Some(redis_root)) => {
224                    info!(
225                        sql_root = %hex::encode(sql_root),
226                        redis_root = %hex::encode(redis_root),
227                        "Redis cache stale - syncing from SQL"
228                    );
229                    
230                    match self.sync_redis_from_sql_diff(sql_merkle, merkle_cache).await {
231                        Ok(synced) => {
232                            info!(items_synced = synced, "Redis cache synced from SQL");
233                            crate::metrics::record_items_written("L2", synced);
234                        }
235                        Err(e) => {
236                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
237                            crate::metrics::record_error("L2", "sync", "branch_diff");
238                        }
239                    }
240                }
241                Ok(None) => {
242                    info!("Redis cache empty - will populate on writes");
243                }
244                Err(e) => {
245                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
246                    crate::metrics::record_error("L2", "merkle", "root_hash");
247                }
248            }
249            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
250        }
251
252        let _ = self.state.send(EngineState::Ready);
253        crate::metrics::record_startup_total(startup_start.elapsed());
254        info!("Sync engine ready (trust-verified startup complete)");
255        Ok(())
256    }
257    
258    /// Restore cuckoo filters from snapshots if merkle roots match
259    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
260        let persistence = match &self.filter_persistence {
261            Some(p) => p,
262            None => return,
263        };
264        
265        let sql_root = match sql_root {
266            Some(r) => r,
267            None => return,
268        };
269        
270        // Note: L2 filter removed (TTL makes it untrustworthy - use Redis EXISTS)
271        
272        // Try L3 filter
273        match persistence.load(L3_FILTER_ID).await {
274            Ok(Some(state)) if &state.merkle_root == sql_root => {
275                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
276                    warn!(error = %e, "Failed to import L3 filter from snapshot");
277                } else {
278                    self.l3_filter.mark_trusted();
279                    info!(entries = state.entry_count, "L3 cuckoo filter valid (snapshot merkle matches SQL)");
280                }
281            }
282            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
283            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
284            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
285        }
286    }
287    
288    /// Sync Redis from SQL by diffing merkle trees and only syncing stale branches.
289    async fn sync_redis_from_sql_diff(
290        &self,
291        sql_merkle: &SqlMerkleStore,
292        merkle_cache: &MerkleCacheStore,
293    ) -> Result<usize, StorageError> {
294        let mut total_synced = 0;
295        let stale_prefixes = self.find_stale_branches(sql_merkle, merkle_cache, "").await?;
296        
297        for prefix in stale_prefixes {
298            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
299            
300            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
301                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
302            
303            if leaf_paths.is_empty() {
304                continue;
305            }
306            
307            if let Some(ref l3_store) = self.l3_store {
308                for object_id in &leaf_paths {
309                    if let Ok(Some(item)) = l3_store.get(object_id).await {
310                        if let Some(ref l2_store) = self.l2_store {
311                            if let Err(e) = l2_store.put(&item).await {
312                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
313                            } else {
314                                total_synced += 1;
315                            }
316                        }
317                    }
318                }
319            }
320            
321            // Sync merkle cache for these paths
322            if let Err(e) = merkle_cache.sync_affected_from_sql(sql_merkle, &leaf_paths).await {
323                warn!(prefix = %prefix, error = %e, "Failed to sync merkle cache");
324            }
325        }
326        
327        Ok(total_synced)
328    }
329    
330    /// Find stale branches by recursively comparing SQL and Redis merkle nodes.
331    async fn find_stale_branches(
332        &self,
333        sql_merkle: &SqlMerkleStore,
334        merkle_cache: &MerkleCacheStore,
335        prefix: &str,
336    ) -> Result<Vec<String>, StorageError> {
337        let mut stale = Vec::new();
338        
339        let sql_children = sql_merkle.get_children(prefix).await
340            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
341        let redis_children = merkle_cache.get_children(prefix).await
342            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
343        
344        if sql_children.is_empty() {
345            return Ok(stale);
346        }
347        
348        for (child_path, sql_hash) in sql_children {
349            match redis_children.get(&child_path) {
350                Some(redis_hash) if redis_hash == &sql_hash => continue,
351                Some(_) => {
352                    if child_path.contains('.') && !child_path.ends_with('.') {
353                        stale.push(child_path);
354                    } else {
355                        let sub_stale = Box::pin(
356                            self.find_stale_branches(sql_merkle, merkle_cache, &child_path)
357                        ).await?;
358                        stale.extend(sub_stale);
359                    }
360                }
361                None => stale.push(child_path),
362            }
363        }
364        
365        Ok(stale)
366    }
367
368    /// Warm up L1 cache from L2/L3 (call after start)
369    #[tracing::instrument(skip(self))]
370    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
371        let _ = self.state.send(EngineState::WarmingUp);
372        info!("Warming up cuckoo filter and L1 cache...");
373        
374        if let Some(l3) = &self.l3_store {
375            let batch_size = self.config.read().cuckoo_warmup_batch_size;
376            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
377            
378            let total_count = l3.count_all().await.unwrap_or(0);
379            if total_count > 0 {
380                let mut offset = 0u64;
381                let mut loaded = 0usize;
382                
383                loop {
384                    let keys = l3.scan_keys(offset, batch_size).await?;
385                    if keys.is_empty() {
386                        break;
387                    }
388                    
389                    for key in &keys {
390                        self.l3_filter.insert(key);
391                    }
392                    
393                    loaded += keys.len();
394                    offset += keys.len() as u64;
395                    
396                    if loaded % 10_000 == 0 || loaded == total_count as usize {
397                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
398                    }
399                }
400                
401                self.l3_filter.mark_trusted();
402                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
403            } else {
404                info!("L3 store is empty, skipping filter warmup");
405                self.l3_filter.mark_trusted();
406            }
407        }
408        
409        info!(
410            l3_trust = ?self.l3_filter.trust_state(),
411            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
412        );
413
414        let _ = self.state.send(EngineState::Ready);
415        info!("Warm-up complete, engine ready");
416        Ok(())
417    }
418
    /// Perform one tick of maintenance (for manual control instead of run loop).
    ///
    /// Runs the eviction check and then the L2 flush check, in that order.
    /// These are the first two steps `run` performs on its 100 ms branch;
    /// unlike `run`, this does not take cuckoo filter snapshots, check SQL
    /// health, or drain the WAL.
    pub async fn tick(&self) {
        self.maybe_evict();
        self.maybe_flush_l2().await;
    }
424
425    /// Force flush all pending L2 batches immediately.
426    pub async fn force_flush(&self) {
427        let batch = self.l2_batcher.lock().await.force_flush();
428        if let Some(batch) = batch {
429            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
430            self.flush_batch_internal(batch).await;
431        }
432    }
433    
434    /// Log merkle tree diagnostic info (root hash, L3 count).
435    /// 
436    /// Used for debugging replication sync issues.
437    pub async fn log_merkle_diagnostic(&self) {
438        let merkle_root = self.merkle_root().await;
439        let l3_count = self.l3_filter.len();
440        let (l1_items, l1_bytes) = self.l1_stats();
441        
442        debug!(
443            merkle_root = merkle_root.as_deref().unwrap_or("none"),
444            l3_filter_count = l3_count,
445            l1_items = l1_items,
446            l1_bytes = l1_bytes,
447            "sync_engine_state"
448        );
449    }
450
451    /// Run the main event loop.
452    /// 
453    /// This method takes `&self` (not `&mut self`) so the engine can be
454    /// shared via `Arc` while the run loop executes in a background task.
455    /// 
456    /// # Example
457    /// 
458    /// ```rust,ignore
459    /// let engine = Arc::new(engine);
460    /// let engine_clone = engine.clone();
461    /// tokio::spawn(async move {
462    ///     engine_clone.run().await;
463    /// });
464    /// ```
465    #[tracing::instrument(skip(self))]
466    pub async fn run(&self) {
467        let _ = self.state.send(EngineState::Running);
468        info!("Sync engine running");
469
470        let mut health_check_interval = tokio::time::interval(
471            tokio::time::Duration::from_secs(30)
472        );
473        let mut wal_drain_interval = tokio::time::interval(
474            tokio::time::Duration::from_secs(5)
475        );
476        let cf_snapshot_secs = self.config.read().cf_snapshot_interval_secs;
477        let mut cf_snapshot_interval = tokio::time::interval(
478            tokio::time::Duration::from_secs(cf_snapshot_secs)
479        );
480
481        loop {
482            // Lock config_rx only for the check, not across awaits
483            let config_changed = {
484                let rx = self.config_rx.lock().await;
485                // Use poll-style check to avoid holding lock
486                rx.has_changed().unwrap_or(false)
487            };
488            
489            if config_changed {
490                let new_config = self.config_rx.lock().await.borrow_and_update().clone();
491                info!("Config hot-reloaded: l1_max_bytes={}, redis_url={:?}", 
492                    new_config.l1_max_bytes,
493                    new_config.redis_url.as_ref().map(|u| u.rsplit('@').next().unwrap_or(u)));
494                *self.config.write() = new_config;
495            }
496            
497            tokio::select! {
498                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
499                    self.maybe_evict();
500                    self.maybe_flush_l2().await;
501                    self.maybe_snapshot_cf_by_threshold().await;
502                }
503                
504                _ = health_check_interval.tick() => {
505                    self.check_mysql_health().await;
506                }
507                
508                _ = wal_drain_interval.tick() => {
509                    self.maybe_drain_wal().await;
510                }
511                
512                _ = cf_snapshot_interval.tick() => {
513                    self.maybe_snapshot_cf_by_time().await;
514                }
515            }
516        }
517    }
518
519    /// Initiate graceful shutdown
520    #[tracing::instrument(skip(self))]
521    pub async fn shutdown(&self) {
522        use crate::FlushReason;
523
524        let shutdown_start = std::time::Instant::now();
525        info!("Initiating sync engine shutdown...");
526        let _ = self.state.send(EngineState::ShuttingDown);
527        
528        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
529        if let Some(batch) = batch {
530            let batch_size = batch.items.len();
531            info!(batch_size, "Flushing final L2 batch on shutdown");
532            {
533                let mut batcher = self.l2_batcher.lock().await;
534                batcher.add_batch(batch.items);
535            }
536            self.maybe_flush_l2().await;
537            crate::metrics::record_items_written("L2", batch_size);
538        }
539        
540        self.snapshot_cuckoo_filters("shutdown").await;
541        
542        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
543        info!("Sync engine shutdown complete");
544    }
545}