sync_engine/coordinator/
lifecycle.rs

//! Engine lifecycle management: start, shutdown, run loop.
//!
//! This module contains the startup sequence, main run loop, and shutdown logic.

use tracing::{info, warn, debug, error};

use crate::storage::traits::StorageError;
use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
use crate::resilience::wal::WriteAheadLog;

use super::{SyncEngine, EngineState};
#[allow(unused_imports)]
use super::WriteTarget;
impl SyncEngine {
    /// Start the engine - connect to backends with proper startup sequence.
    ///
    /// Startup flow (trust hierarchy):
    /// 1. Initialize WAL (SQLite) - always first, our durability lifeline
    /// 2. Connect to SQL (L3) - ground truth, initialize SQL merkle store
    /// 3. Drain any pending WAL entries to SQL (blocking)
    /// 4. Get SQL merkle root (this is our trusted root)
    /// 5. Load CF snapshots from SQLite:
    ///    - If snapshot merkle root matches SQL root → CF is trusted
    ///    - Otherwise → CF must be rebuilt from SQL scan
    /// 6. Connect to Redis (L2) - cache layer
    /// 7. Compare Redis merkle root with SQL merkle root
    ///    - If match → Redis is synced
    ///    - If mismatch → use branch diff to find stale regions and resync
    /// 8. Ready!
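    ///
    /// # Example
    ///
    /// Illustrative sketch only: `SyncEngine::new` and `config` are assumed
    /// here for the sake of the example and are not defined in this module.
    ///
    /// ```ignore
    /// let mut engine = SyncEngine::new(config);
    /// engine.start().await?;   // trust-verified startup (phases 1-7 above)
    /// engine.warm_up().await?; // optional: build the L3 cuckoo filter
    /// ```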
    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
    pub async fn start(&mut self) -> Result<(), StorageError> {
        let startup_start = std::time::Instant::now();
        info!("Starting sync engine with trust-verified startup...");
        let _ = self.state.send(EngineState::Connecting);

        // ========== PHASE 1: Initialize WAL (always first) ==========
        let phase_start = std::time::Instant::now();
        let wal_path = self.config.wal_path.clone()
            .unwrap_or_else(|| "./sync_engine_wal.db".to_string());
        let wal_max_items = self.config.wal_max_items.unwrap_or(1_000_000);

        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
            Ok(wal) => {
                info!(path = %wal_path, "Write-ahead log initialized");
                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
                wal
            }
            Err(e) => {
                crate::metrics::record_error("WAL", "init", "sqlite");
                return Err(StorageError::Backend(format!(
                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
                    wal_path, e
                )));
            }
        };

        // Initialize filter persistence (uses same WAL SQLite)
        match FilterPersistence::new(&wal_path).await {
            Ok(fp) => {
                self.filter_persistence = Some(fp);
            }
            Err(e) => {
                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
                crate::metrics::record_error("filter", "init", "persistence");
            }
        }

        // Capture the pending count before handing the WAL to the engine, so
        // phase 3 knows whether a drain is needed.
        let pending_count = if wal.has_pending() {
            wal.stats(false).pending_items
        } else {
            0
        };
        self.l3_wal = Some(wal);
        // ========== PHASE 2: Connect to SQL (L3 - ground truth) ==========
        let phase_start = std::time::Instant::now();
        if let Some(ref sql_url) = self.config.sql_url {
            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
            match crate::storage::sql::SqlStore::new(sql_url).await {
                Ok(store) => {
                    // Initialize SQL merkle store (ground truth) - shares pool with SqlStore
                    let is_sqlite = sql_url.starts_with("sqlite:");
                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
                    if let Err(e) = sql_merkle.init_schema().await {
                        error!(error = %e, "Failed to initialize SQL merkle schema");
                        crate::metrics::record_error("L3", "init", "merkle_schema");
                        return Err(StorageError::Backend(format!(
                            "Failed to initialize SQL merkle schema: {}", e
                        )));
                    }
                    self.sql_merkle = Some(sql_merkle);

                    // Keep both Arc<dyn ArchiveStore> and Arc<SqlStore> for dirty merkle access
                    let store = std::sync::Arc::new(store);
                    self.sql_store = Some(store.clone());
                    self.l3_store = Some(store);
                    tracing::Span::current().record("has_sql", true);
                    self.mysql_health.record_success();
                    crate::metrics::set_backend_healthy("mysql", true);
                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
                    info!("SQL (L3) connected with merkle store (ground truth)");
                }
                Err(e) => {
                    tracing::Span::current().record("has_sql", false);
                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
                    self.mysql_health.record_failure();
                    crate::metrics::set_backend_healthy("mysql", false);
                    crate::metrics::record_connection_error("mysql");
                    return Err(StorageError::Backend(format!(
                        "SQL connection required for startup: {}", e
                    )));
                }
            }
        } else {
            warn!("No SQL URL configured - operating without ground truth storage!");
            tracing::Span::current().record("has_sql", false);
        }

        // ========== PHASE 3: Drain WAL to SQL ==========
        if pending_count > 0 {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::DrainingWal);
            info!(pending = pending_count, "Draining WAL to SQL before startup...");

            if let Some(ref l3) = self.l3_store {
                if let Some(ref wal) = self.l3_wal {
                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
                        Ok(drained) => {
                            info!(drained = drained.len(), "WAL drained to SQL");
                            crate::metrics::record_items_written("L3", drained.len());
                        }
                        Err(e) => {
                            warn!(error = %e, "WAL drain had errors - some items may retry later");
                            crate::metrics::record_error("WAL", "drain", "partial");
                        }
                    }
                }
            }
            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
        }

        // ========== PHASE 4: Get SQL merkle root (trusted root) ==========
        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(root)) => {
                    info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
                    Some(root)
                }
                Ok(None) => {
                    info!("SQL merkle tree is empty (no data yet)");
                    None
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get SQL merkle root");
                    None
                }
            }
        } else {
            None
        };

        // ========== PHASE 5: Restore CF from snapshot (if valid) ==========
        let phase_start = std::time::Instant::now();
        self.restore_cuckoo_filters(&sql_root).await;
        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());

        // ========== PHASE 6: Connect to Redis (L2 - cache) ==========
        let phase_start = std::time::Instant::now();
        if let Some(ref redis_url) = self.config.redis_url {
            info!(url = %redis_url, prefix = ?self.config.redis_prefix, "Connecting to Redis (L2 - cache)...");
            match crate::storage::redis::RedisStore::with_prefix(redis_url, self.config.redis_prefix.as_deref()).await {
                Ok(store) => {
                    let redis_merkle = RedisMerkleStore::with_prefix(
                        store.connection(),
                        self.config.redis_prefix.as_deref(),
                    );
                    self.redis_merkle = Some(redis_merkle);
                    let store = std::sync::Arc::new(store);
                    self.redis_store = Some(store.clone());  // Keep direct reference for CDC
                    self.l2_store = Some(store);
                    tracing::Span::current().record("has_redis", true);
                    crate::metrics::set_backend_healthy("redis", true);
                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
                    info!("Redis (L2) connected with merkle shadow tree");
                }
                Err(e) => {
                    tracing::Span::current().record("has_redis", false);
                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
                    crate::metrics::set_backend_healthy("redis", false);
                    crate::metrics::record_connection_error("redis");
                }
            }
        } else {
            tracing::Span::current().record("has_redis", false);
        }

        // ========== PHASE 7: Sync Redis with SQL via branch diff ==========
        if let (Some(sql_merkle), Some(redis_merkle), Some(sql_root)) =
            (&self.sql_merkle, &self.redis_merkle, &sql_root)
        {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::SyncingRedis);

            match redis_merkle.root_hash().await {
                Ok(Some(redis_root)) if &redis_root == sql_root => {
                    info!("Redis merkle root matches SQL - Redis is in sync");
                }
                Ok(Some(redis_root)) => {
                    info!(
                        sql_root = %hex::encode(sql_root),
                        redis_root = %hex::encode(redis_root),
                        "Redis merkle root mismatch - initiating branch diff sync"
                    );

                    match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
                        Ok(synced) => {
                            info!(items_synced = synced, "Redis sync complete via branch diff");
                            crate::metrics::record_items_written("L2", synced);
                        }
                        Err(e) => {
                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
                            crate::metrics::record_error("L2", "sync", "branch_diff");
                        }
                    }
                }
                Ok(None) => {
                    info!("Redis merkle tree is empty - will be populated on writes");
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
                    crate::metrics::record_error("L2", "merkle", "root_hash");
                }
            }
            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
        }

        let _ = self.state.send(EngineState::Ready);
        crate::metrics::record_startup_total(startup_start.elapsed());
        info!("Sync engine ready (trust-verified startup complete)");
        Ok(())
    }

    /// Restore cuckoo filters from snapshots if merkle roots match.
    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
        let persistence = match &self.filter_persistence {
            Some(p) => p,
            None => return,
        };

        let sql_root = match sql_root {
            Some(r) => r,
            None => return,
        };

        // Note: L2 filter removed (TTL makes it untrustworthy - use Redis EXISTS)

        // Try L3 filter
        match persistence.load(L3_FILTER_ID).await {
            Ok(Some(state)) if &state.merkle_root == sql_root => {
                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
                    warn!(error = %e, "Failed to import L3 filter from snapshot");
                } else {
                    self.l3_filter.mark_trusted();
                    info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
                }
            }
            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
        }
    }

    /// Sync Redis from SQL by diffing merkle trees and only syncing stale branches.
    async fn sync_redis_from_sql_diff(
        &self,
        sql_merkle: &SqlMerkleStore,
        redis_merkle: &RedisMerkleStore,
    ) -> Result<usize, StorageError> {
        let mut total_synced = 0;
        let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;

        for prefix in stale_prefixes {
            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");

            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;

            if leaf_paths.is_empty() {
                continue;
            }

            let mut merkle_batch = MerkleBatch::new();

            if let Some(ref l3_store) = self.l3_store {
                for object_id in &leaf_paths {
                    if let Ok(Some(item)) = l3_store.get(object_id).await {
                        let payload_hash = PathMerkle::payload_hash(&item.content);
                        let leaf_hash = PathMerkle::leaf_hash(
                            &item.object_id,
                            item.version,
                            item.updated_at,
                            &payload_hash,
                        );

                        if let Some(ref l2_store) = self.l2_store {
                            if let Err(e) = l2_store.put(&item).await {
                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
                            } else {
                                // Only record the leaf in the shadow tree once the
                                // item actually lands in Redis, so the merkle root
                                // never claims data that failed to sync.
                                merkle_batch.insert(object_id.clone(), leaf_hash);
                                total_synced += 1;
                            }
                        }
                    }
                }

                if !merkle_batch.is_empty() {
                    if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                        warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
                    }
                }
            }
        }

        Ok(total_synced)
    }

    /// Find stale branches by recursively comparing SQL and Redis merkle nodes.
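    ///
    /// A child present in SQL but missing from Redis is returned as stale
    /// wholesale; a child whose hash differs is either returned directly
    /// (when it already looks like a leaf path) or descended into one more
    /// level. For example, assuming the dotted object-id convention implied
    /// by the leaf check below, a mismatched top-level child like `user` is
    /// recursed into, while a mismatched dotted child like `user.profile`
    /// is pushed straight onto the stale list.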
    async fn find_stale_branches(
        &self,
        sql_merkle: &SqlMerkleStore,
        redis_merkle: &RedisMerkleStore,
        prefix: &str,
    ) -> Result<Vec<String>, StorageError> {
        let mut stale = Vec::new();

        let sql_children = sql_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
        let redis_children = redis_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;

        if sql_children.is_empty() {
            return Ok(stale);
        }

        for (child_path, sql_hash) in sql_children {
            match redis_children.get(&child_path) {
                Some(redis_hash) if redis_hash == &sql_hash => continue,
                Some(_) => {
                    // Heuristic: a dotted path that doesn't end in '.' is a
                    // leaf, so sync it directly; otherwise recurse one level.
                    if child_path.contains('.') && !child_path.ends_with('.') {
                        stale.push(child_path);
                    } else {
                        let sub_stale = Box::pin(
                            self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
                        ).await?;
                        stale.extend(sub_stale);
                    }
                }
                None => stale.push(child_path),
            }
        }

        Ok(stale)
    }

    /// Warm up the L3 cuckoo filter by scanning keys from ground truth (call after `start`).
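    ///
    /// Illustrative call order (a sketch; engine construction is assumed to
    /// happen elsewhere):
    ///
    /// ```ignore
    /// engine.start().await?;
    /// engine.warm_up().await?; // scans L3 keys into the cuckoo filter
    /// ```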
    #[tracing::instrument(skip(self))]
    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
        let _ = self.state.send(EngineState::WarmingUp);
        info!("Warming up L3 cuckoo filter...");

        if let Some(l3) = &self.l3_store {
            let batch_size = self.config.cuckoo_warmup_batch_size;
            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");

            let total_count = l3.count_all().await.unwrap_or(0);
            if total_count > 0 {
                let mut offset = 0u64;
                let mut loaded = 0usize;

                loop {
                    let keys = l3.scan_keys(offset, batch_size).await?;
                    if keys.is_empty() {
                        break;
                    }

                    for key in &keys {
                        self.l3_filter.insert(key);
                    }

                    loaded += keys.len();
                    offset += keys.len() as u64;

                    if loaded % 10_000 == 0 || loaded == total_count as usize {
                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
                    }
                }

                self.l3_filter.mark_trusted();
                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
            } else {
                info!("L3 store is empty, skipping filter warmup");
                self.l3_filter.mark_trusted();
            }
        }

        info!(
            l3_trust = ?self.l3_filter.trust_state(),
            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
        );

        let _ = self.state.send(EngineState::Ready);
        info!("Warm-up complete, engine ready");
        Ok(())
    }

    /// Perform one tick of maintenance (for manual control instead of the run loop).
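    ///
    /// Sketch of driving maintenance by hand; the 100ms cadence mirrors the
    /// run loop below and is illustrative, not required:
    ///
    /// ```ignore
    /// let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
    /// loop {
    ///     interval.tick().await;
    ///     engine.tick().await; // eviction check + L2 flush check
    /// }
    /// ```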
    pub async fn tick(&self) {
        self.maybe_evict();
        self.maybe_flush_l2().await;
    }

    /// Force flush all pending L2 batches immediately.
    pub async fn force_flush(&self) {
        let batch = self.l2_batcher.lock().await.force_flush();
        if let Some(batch) = batch {
            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
            self.flush_batch_internal(batch).await;
        }
    }

    /// Run the main event loop.
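    ///
    /// `run` loops forever, so it is typically hosted in its own task
    /// (a sketch; moving the engine into the task is assumed to be possible
    /// in the caller's setup):
    ///
    /// ```ignore
    /// tokio::spawn(async move {
    ///     engine.run().await;
    /// });
    /// ```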
    #[tracing::instrument(skip(self))]
    pub async fn run(&mut self) {
        let _ = self.state.send(EngineState::Running);
        info!("Sync engine running");

        let mut health_check_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(30)
        );
        let mut wal_drain_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(5)
        );
        let mut cf_snapshot_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(self.config.cf_snapshot_interval_secs)
        );

        loop {
            tokio::select! {
                Ok(()) = self.config_rx.changed() => {
                    let new_config = self.config_rx.borrow().clone();
                    info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
                    self.config = new_config;
                }

                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                    self.maybe_evict();
                    self.maybe_flush_l2().await;
                    self.maybe_snapshot_cf_by_threshold().await;
                }

                _ = health_check_interval.tick() => {
                    self.check_mysql_health().await;
                }

                _ = wal_drain_interval.tick() => {
                    self.maybe_drain_wal().await;
                }

                _ = cf_snapshot_interval.tick() => {
                    self.maybe_snapshot_cf_by_time().await;
                }
            }
        }
    }

    /// Initiate graceful shutdown.
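    ///
    /// Sketch of wiring shutdown to Ctrl-C (the signal handling shown is
    /// illustrative; any shutdown trigger works):
    ///
    /// ```ignore
    /// tokio::signal::ctrl_c().await?;
    /// engine.shutdown().await; // flushes the final L2 batch and snapshots filters
    /// ```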
    #[tracing::instrument(skip(self))]
    pub async fn shutdown(&self) {
        use crate::FlushReason;

        let shutdown_start = std::time::Instant::now();
        info!("Initiating sync engine shutdown...");
        let _ = self.state.send(EngineState::ShuttingDown);

        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
        if let Some(batch) = batch {
            let batch_size = batch.items.len();
            info!(batch_size, "Flushing final L2 batch on shutdown");
            // Flush directly (as force_flush does) rather than re-queueing the
            // batch: re-adding it to the batcher and relying on maybe_flush_l2
            // could leave items buffered if the flush conditions aren't met.
            self.flush_batch_internal(batch).await;
            crate::metrics::record_items_written("L2", batch_size);
        }

        self.snapshot_cuckoo_filters("shutdown").await;

        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
        info!("Sync engine shutdown complete");
    }
}
504}