// sync_engine/coordinator/lifecycle.rs

1//! Engine lifecycle management: start, shutdown, run loop.
2//!
3//! This module contains the startup sequence, main run loop, and shutdown logic.
4
5use tracing::{info, warn, debug, error};
6
7use crate::storage::traits::StorageError;
8use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
9use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
10use crate::resilience::wal::WriteAheadLog;
11
12use super::{SyncEngine, EngineState};
13#[allow(unused_imports)]
14use super::WriteTarget;
15
16impl SyncEngine {
    /// Start the engine - connect to backends with proper startup sequence.
    ///
    /// Startup flow (trust hierarchy):
    /// 1. Initialize WAL (SQLite) - always first, our durability lifeline
    /// 2. Connect to SQL (L3) - ground truth, initialize SQL merkle store
    /// 3. Drain any pending WAL entries to SQL (blocking)
    /// 4. Get SQL merkle root (this is our trusted root)
    /// 5. Load CF snapshots from SQLite:
    ///    - If snapshot merkle root matches SQL root → CF is trusted
    ///    - Otherwise → CF must be rebuilt from SQL scan
    /// 6. Connect to Redis (L2) - cache layer
    /// 7. Compare Redis merkle root with SQL merkle root
    ///    - If match → Redis is synced
    ///    - If mismatch → use branch diff to find stale regions and resync
    /// 8. Ready!
    ///
    /// # Errors
    ///
    /// Fails fast with `StorageError::Backend` when the WAL cannot be
    /// created, when the SQL merkle schema cannot be initialized, or when a
    /// configured SQL backend is unreachable. Redis and filter-persistence
    /// failures are logged but non-fatal: the engine continues without them.
    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
    pub async fn start(&mut self) -> Result<(), StorageError> {
        let startup_start = std::time::Instant::now();
        info!("Starting sync engine with trust-verified startup...");
        // `send` errors only mean "no state watchers"; safe to ignore.
        let _ = self.state.send(EngineState::Connecting);

        // ========== PHASE 1: Initialize WAL (always first) ==========
        let phase_start = std::time::Instant::now();
        let wal_path = self.config.wal_path.clone()
            .unwrap_or_else(|| "./sync_engine_wal.db".to_string());
        let wal_max_items = self.config.wal_max_items.unwrap_or(1_000_000);
        
        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
            Ok(wal) => {
                info!(path = %wal_path, "Write-ahead log initialized");
                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
                wal
            }
            Err(e) => {
                // Without a WAL we cannot guarantee durability; abort startup.
                crate::metrics::record_error("WAL", "init", "sqlite");
                return Err(StorageError::Backend(format!(
                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
                    wal_path, e
                )));
            }
        };
        
        // Initialize filter persistence (uses same WAL SQLite)
        match FilterPersistence::new(&wal_path).await {
            Ok(fp) => {
                self.filter_persistence = Some(fp);
            }
            Err(e) => {
                // Non-fatal: only disables CF snapshot save/restore.
                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
                crate::metrics::record_error("filter", "init", "persistence");
            }
        }
        
        // Snapshot the pending count before `wal` is moved into `self`.
        let pending_count = if wal.has_pending() {
            wal.stats(false).pending_items
        } else {
            0
        };
        self.l3_wal = Some(wal);

        // ========== PHASE 2: Connect to SQL (L3 - ground truth) ==========
        let phase_start = std::time::Instant::now();
        if let Some(ref sql_url) = self.config.sql_url {
            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
            match crate::storage::sql::SqlStore::new(sql_url).await {
                Ok(store) => {
                    // Initialize SQL merkle store (ground truth) - shares pool with SqlStore
                    let is_sqlite = sql_url.starts_with("sqlite:");
                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
                    if let Err(e) = sql_merkle.init_schema().await {
                        error!(error = %e, "Failed to initialize SQL merkle schema");
                        crate::metrics::record_error("L3", "init", "merkle_schema");
                        return Err(StorageError::Backend(format!(
                            "Failed to initialize SQL merkle schema: {}", e
                        )));
                    }
                    self.sql_merkle = Some(sql_merkle);
                    
                    self.l3_store = Some(std::sync::Arc::new(store));
                    tracing::Span::current().record("has_sql", true);
                    self.mysql_health.record_success();
                    crate::metrics::set_backend_healthy("mysql", true);
                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
                    info!("SQL (L3) connected with merkle store (ground truth)");
                }
                Err(e) => {
                    // A configured-but-unreachable SQL backend is fatal:
                    // we would be running without ground truth.
                    tracing::Span::current().record("has_sql", false);
                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
                    self.mysql_health.record_failure();
                    crate::metrics::set_backend_healthy("mysql", false);
                    crate::metrics::record_connection_error("mysql");
                    return Err(StorageError::Backend(format!(
                        "SQL connection required for startup: {}", e
                    )));
                }
            }
        } else {
            // No SQL configured at all is allowed (degraded mode), unlike a
            // configured-but-failed connection above.
            warn!("No SQL URL configured - operating without ground truth storage!");
            tracing::Span::current().record("has_sql", false);
        }

        // ========== PHASE 3: Drain WAL to SQL ==========
        // Only runs when both an L3 store and the WAL are present; a partial
        // drain is logged and retried later rather than failing startup.
        if pending_count > 0 {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::DrainingWal);
            info!(pending = pending_count, "Draining WAL to SQL before startup...");
            
            if let Some(ref l3) = self.l3_store {
                if let Some(ref wal) = self.l3_wal {
                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
                        Ok(drained) => {
                            info!(drained = drained.len(), "WAL drained to SQL");
                            crate::metrics::record_items_written("L3", drained.len());
                        }
                        Err(e) => {
                            warn!(error = %e, "WAL drain had errors - some items may retry later");
                            crate::metrics::record_error("WAL", "drain", "partial");
                        }
                    }
                }
            }
            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
        }
        
        // ========== PHASE 4: Get SQL merkle root (trusted root) ==========
        // `None` here means no SQL merkle store, an empty tree, or a read
        // failure — in all cases later trust checks are simply skipped.
        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(root)) => {
                    info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
                    Some(root)
                }
                Ok(None) => {
                    info!("SQL merkle tree is empty (no data yet)");
                    None
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get SQL merkle root");
                    None
                }
            }
        } else {
            None
        };
        
        // ========== PHASE 5: Restore CF from snapshot (if valid) ==========
        let phase_start = std::time::Instant::now();
        self.restore_cuckoo_filters(&sql_root).await;
        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());

        // ========== PHASE 6: Connect to Redis (L2 - cache) ==========
        let phase_start = std::time::Instant::now();
        if let Some(ref redis_url) = self.config.redis_url {
            info!(url = %redis_url, prefix = ?self.config.redis_prefix, "Connecting to Redis (L2 - cache)...");
            match crate::storage::redis::RedisStore::with_prefix(redis_url, self.config.redis_prefix.as_deref()).await {
                Ok(store) => {
                    let redis_merkle = RedisMerkleStore::with_prefix(
                        store.connection(),
                        self.config.redis_prefix.as_deref(),
                    );
                    self.redis_merkle = Some(redis_merkle);
                    self.l2_store = Some(std::sync::Arc::new(store));
                    tracing::Span::current().record("has_redis", true);
                    crate::metrics::set_backend_healthy("redis", true);
                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
                    info!("Redis (L2) connected with merkle shadow tree");
                }
                Err(e) => {
                    // Redis is a cache: failure degrades but does not abort.
                    tracing::Span::current().record("has_redis", false);
                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
                    crate::metrics::set_backend_healthy("redis", false);
                    crate::metrics::record_connection_error("redis");
                }
            }
        } else {
            tracing::Span::current().record("has_redis", false);
        }

        // ========== PHASE 7: Sync Redis with SQL via branch diff ==========
        // Requires all three: SQL merkle, Redis merkle, and a trusted root.
        if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) = 
            (&self.sql_merkle, &self.redis_merkle, &sql_root) 
        {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::SyncingRedis);
            
            match redis_merkle.root_hash().await {
                Ok(Some(redis_root)) if &redis_root == sql_root => {
                    info!("Redis merkle root matches SQL - Redis is in sync");
                }
                Ok(Some(redis_root)) => {
                    info!(
                        sql_root = %hex::encode(sql_root),
                        redis_root = %hex::encode(redis_root),
                        "Redis merkle root mismatch - initiating branch diff sync"
                    );
                    
                    match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
                        Ok(synced) => {
                            info!(items_synced = synced, "Redis sync complete via branch diff");
                            crate::metrics::record_items_written("L2", synced);
                        }
                        Err(e) => {
                            // Startup proceeds; reads may fall through to L3.
                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
                            crate::metrics::record_error("L2", "sync", "branch_diff");
                        }
                    }
                }
                Ok(None) => {
                    info!("Redis merkle tree is empty - will be populated on writes");
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
                    crate::metrics::record_error("L2", "merkle", "root_hash");
                }
            }
            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
        }

        let _ = self.state.send(EngineState::Ready);
        crate::metrics::record_startup_total(startup_start.elapsed());
        info!("Sync engine ready (trust-verified startup complete)");
        Ok(())
    }
239    
240    /// Restore cuckoo filters from snapshots if merkle roots match
241    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
242        let persistence = match &self.filter_persistence {
243            Some(p) => p,
244            None => return,
245        };
246        
247        let sql_root = match sql_root {
248            Some(r) => r,
249            None => return,
250        };
251        
252        // Note: L2 filter removed (TTL makes it untrustworthy - use Redis EXISTS)
253        
254        // Try L3 filter
255        match persistence.load(L3_FILTER_ID).await {
256            Ok(Some(state)) if &state.merkle_root == sql_root => {
257                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
258                    warn!(error = %e, "Failed to import L3 filter from snapshot");
259                } else {
260                    self.l3_filter.mark_trusted();
261                    info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
262                }
263            }
264            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
265            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
266            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
267        }
268    }
269    
270    /// Sync Redis from SQL by diffing merkle trees and only syncing stale branches.
271    async fn sync_redis_from_sql_diff(
272        &self,
273        sql_merkle: &SqlMerkleStore,
274        redis_merkle: &RedisMerkleStore,
275    ) -> Result<usize, StorageError> {
276        let mut total_synced = 0;
277        let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;
278        
279        for prefix in stale_prefixes {
280            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
281            
282            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
283                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
284            
285            if leaf_paths.is_empty() {
286                continue;
287            }
288            
289            let mut merkle_batch = MerkleBatch::new();
290            
291            if let Some(ref l3_store) = self.l3_store {
292                for object_id in &leaf_paths {
293                    if let Ok(Some(item)) = l3_store.get(object_id).await {
294                        let payload_hash = PathMerkle::payload_hash(&item.content);
295                        let leaf_hash = PathMerkle::leaf_hash(
296                            &item.object_id,
297                            item.version,
298                            item.updated_at,
299                            &payload_hash,
300                        );
301                        merkle_batch.insert(object_id.clone(), leaf_hash);
302                        
303                        if let Some(ref l2_store) = self.l2_store {
304                            if let Err(e) = l2_store.put(&item).await {
305                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
306                            } else {
307                                total_synced += 1;
308                            }
309                        }
310                    }
311                }
312                
313                if !merkle_batch.is_empty() {
314                    if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
315                        warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
316                    }
317                }
318            }
319        }
320        
321        Ok(total_synced)
322    }
323    
324    /// Find stale branches by recursively comparing SQL and Redis merkle nodes.
325    async fn find_stale_branches(
326        &self,
327        sql_merkle: &SqlMerkleStore,
328        redis_merkle: &RedisMerkleStore,
329        prefix: &str,
330    ) -> Result<Vec<String>, StorageError> {
331        let mut stale = Vec::new();
332        
333        let sql_children = sql_merkle.get_children(prefix).await
334            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
335        let redis_children = redis_merkle.get_children(prefix).await
336            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
337        
338        if sql_children.is_empty() {
339            return Ok(stale);
340        }
341        
342        for (child_path, sql_hash) in sql_children {
343            match redis_children.get(&child_path) {
344                Some(redis_hash) if redis_hash == &sql_hash => continue,
345                Some(_) => {
346                    if child_path.contains('.') && !child_path.ends_with('.') {
347                        stale.push(child_path);
348                    } else {
349                        let sub_stale = Box::pin(
350                            self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
351                        ).await?;
352                        stale.extend(sub_stale);
353                    }
354                }
355                None => stale.push(child_path),
356            }
357        }
358        
359        Ok(stale)
360    }
361
362    /// Warm up L1 cache from L2/L3 (call after start)
363    #[tracing::instrument(skip(self))]
364    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
365        let _ = self.state.send(EngineState::WarmingUp);
366        info!("Warming up cuckoo filter and L1 cache...");
367        
368        if let Some(l3) = &self.l3_store {
369            let batch_size = self.config.cuckoo_warmup_batch_size;
370            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
371            
372            let total_count = l3.count_all().await.unwrap_or(0);
373            if total_count > 0 {
374                let mut offset = 0u64;
375                let mut loaded = 0usize;
376                
377                loop {
378                    let keys = l3.scan_keys(offset, batch_size).await?;
379                    if keys.is_empty() {
380                        break;
381                    }
382                    
383                    for key in &keys {
384                        self.l3_filter.insert(key);
385                    }
386                    
387                    loaded += keys.len();
388                    offset += keys.len() as u64;
389                    
390                    if loaded % 10_000 == 0 || loaded == total_count as usize {
391                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
392                    }
393                }
394                
395                self.l3_filter.mark_trusted();
396                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
397            } else {
398                info!("L3 store is empty, skipping filter warmup");
399                self.l3_filter.mark_trusted();
400            }
401        }
402        
403        info!(
404            l3_trust = ?self.l3_filter.trust_state(),
405            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
406        );
407
408        let _ = self.state.send(EngineState::Ready);
409        info!("Warm-up complete, engine ready");
410        Ok(())
411    }
412
    /// Perform one tick of maintenance (for manual control instead of run loop).
    ///
    /// Runs L1 eviction first, then an L2 batch flush if one is due. Order
    /// matters: eviction happens before the (awaited) flush.
    pub async fn tick(&self) {
        self.maybe_evict();
        self.maybe_flush_l2().await;
    }
418
419    /// Force flush all pending L2 batches immediately.
420    pub async fn force_flush(&self) {
421        let batch = self.l2_batcher.lock().await.force_flush();
422        if let Some(batch) = batch {
423            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
424            self.flush_batch_internal(batch).await;
425        }
426    }
427
    /// Run the main event loop
    ///
    /// Drives periodic maintenance until the future is dropped/cancelled by
    /// the caller — this loop never exits on its own. Each iteration races:
    /// - a config-watch update (hot-reloads `self.config`)
    /// - a ~100ms heartbeat: L1 eviction, L2 batch flush, CF threshold snapshot
    /// - a 30s MySQL health check
    /// - a 5s WAL drain attempt
    /// - a CF time-based snapshot every `cf_snapshot_interval_secs`
    #[tracing::instrument(skip(self))]
    pub async fn run(&mut self) {
        let _ = self.state.send(EngineState::Running);
        info!("Sync engine running");

        let mut health_check_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(30)
        );
        let mut wal_drain_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(5)
        );
        let mut cf_snapshot_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(self.config.cf_snapshot_interval_secs)
        );

        loop {
            tokio::select! {
                // NOTE(review): the `Ok(())` pattern disables this branch when
                // the config sender is dropped (`changed()` then yields Err);
                // the loop keeps running on the other arms. Confirm that is
                // the intended behavior for a dropped config channel.
                Ok(()) = self.config_rx.changed() => {
                    let new_config = self.config_rx.borrow().clone();
                    info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
                    self.config = new_config;
                }
                
                // Fast-path heartbeat: a fresh sleep is created per iteration.
                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                    self.maybe_evict();
                    self.maybe_flush_l2().await;
                    self.maybe_snapshot_cf_by_threshold().await;
                }
                
                _ = health_check_interval.tick() => {
                    self.check_mysql_health().await;
                }
                
                _ = wal_drain_interval.tick() => {
                    self.maybe_drain_wal().await;
                }
                
                _ = cf_snapshot_interval.tick() => {
                    self.maybe_snapshot_cf_by_time().await;
                }
            }
        }
    }
472
473    /// Initiate graceful shutdown
474    #[tracing::instrument(skip(self))]
475    pub async fn shutdown(&mut self) {
476        use crate::FlushReason;
477        
478        let shutdown_start = std::time::Instant::now();
479        info!("Initiating sync engine shutdown...");
480        let _ = self.state.send(EngineState::ShuttingDown);
481        
482        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
483        if let Some(batch) = batch {
484            let batch_size = batch.items.len();
485            info!(batch_size, "Flushing final L2 batch on shutdown");
486            {
487                let mut batcher = self.l2_batcher.lock().await;
488                batcher.add_batch(batch.items);
489            }
490            self.maybe_flush_l2().await;
491            crate::metrics::record_items_written("L2", batch_size);
492        }
493        
494        self.snapshot_cuckoo_filters("shutdown").await;
495        
496        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
497        info!("Sync engine shutdown complete");
498    }
499}