sync_engine/coordinator/lifecycle.rs

//! Engine lifecycle management: start, shutdown, run loop.
//!
//! This module contains the startup sequence, main run loop, and shutdown logic.
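//!
//! State transitions driven by this module (phases that do not apply are
//! skipped): `Connecting` → `DrainingWal` → `SyncingRedis` → `Ready` →
//! `WarmingUp` → `Ready` → `Running` → `ShuttingDown`.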

use tracing::{info, warn, debug, error};

use crate::storage::traits::StorageError;
use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
use crate::resilience::wal::WriteAheadLog;

use super::{SyncEngine, EngineState};
#[allow(unused_imports)]
use super::WriteTarget;

impl SyncEngine {
    /// Start the engine - connect to backends with proper startup sequence.
    ///
    /// Startup flow (trust hierarchy):
    /// 1. Initialize WAL (SQLite) - always first, our durability lifeline
    /// 2. Connect to SQL (L3) - ground truth, initialize SQL merkle store
    /// 3. Drain any pending WAL entries to SQL (blocking)
    /// 4. Get SQL merkle root (this is our trusted root)
    /// 5. Load CF snapshots from SQLite:
    ///    - If snapshot merkle root matches SQL root → CF is trusted
    ///    - Otherwise → CF must be rebuilt from SQL scan
    /// 6. Connect to Redis (L2) - cache layer
    /// 7. Compare Redis merkle root with SQL merkle root
    ///    - If match → Redis is synced
    ///    - If mismatch → use branch diff to find stale regions and resync
    /// 8. Ready!
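    ///
    /// # Example
    ///
    /// A minimal startup sketch; the constructor and config shown here are
    /// assumptions, not part of this module:
    ///
    /// ```rust,ignore
    /// let mut engine = SyncEngine::new(config); // hypothetical constructor
    /// engine.start().await?;                    // trust-verified startup
    /// engine.warm_up().await?;                  // build the L3 cuckoo filter
    /// ```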
    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
    pub async fn start(&mut self) -> Result<(), StorageError> {
        let startup_start = std::time::Instant::now();
        info!("Starting sync engine with trust-verified startup...");
        let _ = self.state.send(EngineState::Connecting);

        // ========== PHASE 1: Initialize WAL (always first) ==========
        let phase_start = std::time::Instant::now();
        let (wal_path, wal_max_items) = {
            let cfg = self.config.read();
            (
                cfg.wal_path.clone().unwrap_or_else(|| "./sync_engine_wal.db".to_string()),
                cfg.wal_max_items.unwrap_or(1_000_000),
            )
        };

        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
            Ok(wal) => {
                info!(path = %wal_path, "Write-ahead log initialized");
                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
                wal
            }
            Err(e) => {
                crate::metrics::record_error("WAL", "init", "sqlite");
                return Err(StorageError::Backend(format!(
                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
                    wal_path, e
                )));
            }
        };

        // Initialize filter persistence (uses the same SQLite database as the WAL)
        match FilterPersistence::new(&wal_path).await {
            Ok(fp) => {
                self.filter_persistence = Some(fp);
            }
            Err(e) => {
                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
                crate::metrics::record_error("filter", "init", "persistence");
            }
        }

        let pending_count = if wal.has_pending() {
            wal.stats(false).pending_items
        } else {
            0
        };
        self.l3_wal = Some(wal);

        // ========== PHASE 2: Connect to SQL (L3 - ground truth) ==========
        let phase_start = std::time::Instant::now();
        let sql_url = self.config.read().sql_url.clone();
        if let Some(ref sql_url) = sql_url {
            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
            match crate::storage::sql::SqlStore::new(sql_url).await {
                Ok(store) => {
                    // Initialize SQL merkle store (ground truth) - shares pool with SqlStore
                    let is_sqlite = sql_url.starts_with("sqlite:");
                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
                    if let Err(e) = sql_merkle.init_schema().await {
                        error!(error = %e, "Failed to initialize SQL merkle schema");
                        crate::metrics::record_error("L3", "init", "merkle_schema");
                        return Err(StorageError::Backend(format!(
                            "Failed to initialize SQL merkle schema: {}", e
                        )));
                    }
                    self.sql_merkle = Some(sql_merkle);

                    // Keep both Arc<dyn ArchiveStore> and Arc<SqlStore> for dirty merkle access
                    let store = std::sync::Arc::new(store);
                    self.sql_store = Some(store.clone());
                    self.l3_store = Some(store);
                    tracing::Span::current().record("has_sql", true);
                    self.mysql_health.record_success();
                    crate::metrics::set_backend_healthy("mysql", true);
                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
                    info!("SQL (L3) connected with merkle store (ground truth)");
                }
                Err(e) => {
                    tracing::Span::current().record("has_sql", false);
                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
                    self.mysql_health.record_failure();
                    crate::metrics::set_backend_healthy("mysql", false);
                    crate::metrics::record_connection_error("mysql");
                    return Err(StorageError::Backend(format!(
                        "SQL connection required for startup: {}", e
                    )));
                }
            }
        } else {
            warn!("No SQL URL configured - operating without ground truth storage!");
            tracing::Span::current().record("has_sql", false);
        }

        // ========== PHASE 3: Drain WAL to SQL ==========
        if pending_count > 0 {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::DrainingWal);
            info!(pending = pending_count, "Draining WAL to SQL before startup...");

            if let Some(ref l3) = self.l3_store {
                if let Some(ref wal) = self.l3_wal {
                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
                        Ok(drained) => {
                            info!(drained = drained.len(), "WAL drained to SQL");
                            crate::metrics::record_items_written("L3", drained.len());
                        }
                        Err(e) => {
                            warn!(error = %e, "WAL drain had errors - some items may retry later");
                            crate::metrics::record_error("WAL", "drain", "partial");
                        }
                    }
                }
            }
            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
        }

        // ========== PHASE 4: Get SQL merkle root (trusted root) ==========
        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(root)) => {
                    info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
                    Some(root)
                }
                Ok(None) => {
                    info!("SQL merkle tree is empty (no data yet)");
                    None
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get SQL merkle root");
                    None
                }
            }
        } else {
            None
        };

        // ========== PHASE 5: Restore CF from snapshot (if valid) ==========
        let phase_start = std::time::Instant::now();
        self.restore_cuckoo_filters(&sql_root).await;
        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());

        // ========== PHASE 6: Connect to Redis (L2 - cache) ==========
        let phase_start = std::time::Instant::now();
        let (redis_url, redis_prefix) = {
            let cfg = self.config.read();
            (cfg.redis_url.clone(), cfg.redis_prefix.clone())
        };
        if let Some(ref redis_url) = redis_url {
            info!(url = %redis_url, prefix = ?redis_prefix, "Connecting to Redis (L2 - cache)...");
            match crate::storage::redis::RedisStore::with_prefix(redis_url, redis_prefix.as_deref()).await {
                Ok(store) => {
                    let redis_merkle = RedisMerkleStore::with_prefix(
                        store.connection(),
                        redis_prefix.as_deref(),
                    );
                    self.redis_merkle = Some(redis_merkle);
                    let store = std::sync::Arc::new(store);
                    self.redis_store = Some(store.clone());  // Keep direct reference for CDC
                    self.l2_store = Some(store);
                    tracing::Span::current().record("has_redis", true);
                    crate::metrics::set_backend_healthy("redis", true);
                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
                    info!("Redis (L2) connected with merkle shadow tree");
                }
                Err(e) => {
                    tracing::Span::current().record("has_redis", false);
                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
                    crate::metrics::set_backend_healthy("redis", false);
                    crate::metrics::record_connection_error("redis");
                }
            }
        } else {
            tracing::Span::current().record("has_redis", false);
        }

        // ========== PHASE 7: Sync Redis with SQL via branch diff ==========
        if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) =
            (&self.sql_merkle, &self.redis_merkle, &sql_root)
        {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::SyncingRedis);

            match redis_merkle.root_hash().await {
                Ok(Some(redis_root)) if &redis_root == sql_root => {
                    info!("Redis merkle root matches SQL - Redis is in sync");
                }
                Ok(Some(redis_root)) => {
                    info!(
                        sql_root = %hex::encode(sql_root),
                        redis_root = %hex::encode(redis_root),
                        "Redis merkle root mismatch - initiating branch diff sync"
                    );

                    match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
                        Ok(synced) => {
                            info!(items_synced = synced, "Redis sync complete via branch diff");
                            crate::metrics::record_items_written("L2", synced);
                        }
                        Err(e) => {
                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
                            crate::metrics::record_error("L2", "sync", "branch_diff");
                        }
                    }
                }
                Ok(None) => {
                    info!("Redis merkle tree is empty - will be populated on writes");
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
                    crate::metrics::record_error("L2", "merkle", "root_hash");
                }
            }
            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
        }

        let _ = self.state.send(EngineState::Ready);
        crate::metrics::record_startup_total(startup_start.elapsed());
        info!("Sync engine ready (trust-verified startup complete)");
        Ok(())
    }

    /// Restore cuckoo filters from snapshots if merkle roots match
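    ///
    /// A snapshot is imported only when the merkle root recorded with it
    /// matches the current SQL root; on any mismatch the filter is left
    /// untrusted and rebuilt during warm-up.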
    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
        let persistence = match &self.filter_persistence {
            Some(p) => p,
            None => return,
        };

        let sql_root = match sql_root {
            Some(r) => r,
            None => return,
        };

        // Note: L2 filter removed (TTL makes it untrustworthy - use Redis EXISTS)

        // Try L3 filter
        match persistence.load(L3_FILTER_ID).await {
            Ok(Some(state)) if &state.merkle_root == sql_root => {
                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
                    warn!(error = %e, "Failed to import L3 filter from snapshot");
                } else {
                    self.l3_filter.mark_trusted();
                    info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
                }
            }
            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
        }
    }

    /// Sync Redis from SQL by diffing merkle trees and only syncing stale branches.
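    ///
    /// For each stale prefix reported by `find_stale_branches`, the leaf
    /// object IDs are read back from L3, re-written to L2, and the Redis
    /// shadow merkle tree is updated with one batch per prefix.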
    async fn sync_redis_from_sql_diff(
        &self,
        sql_merkle: &SqlMerkleStore,
        redis_merkle: &RedisMerkleStore,
    ) -> Result<usize, StorageError> {
        let mut total_synced = 0;
        let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;

        for prefix in stale_prefixes {
            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");

            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;

            if leaf_paths.is_empty() {
                continue;
            }

            let mut merkle_batch = MerkleBatch::new();

            if let Some(ref l3_store) = self.l3_store {
                for object_id in &leaf_paths {
                    if let Ok(Some(item)) = l3_store.get(object_id).await {
                        let payload_hash = PathMerkle::payload_hash(&item.content);
                        let leaf_hash = PathMerkle::leaf_hash(
                            &item.object_id,
                            item.version,
                            item.updated_at,
                            &payload_hash,
                        );
                        merkle_batch.insert(object_id.clone(), leaf_hash);

                        if let Some(ref l2_store) = self.l2_store {
                            if let Err(e) = l2_store.put(&item).await {
                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
                            } else {
                                total_synced += 1;
                            }
                        }
                    }
                }

                if !merkle_batch.is_empty() {
                    if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                        warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
                    }
                }
            }
        }

        Ok(total_synced)
    }

    /// Find stale branches by recursively comparing SQL and Redis merkle nodes.
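    ///
    /// Path convention (as assumed by the check below): a mismatched child
    /// whose path contains a `.` and does not end with one is returned as a
    /// stale prefix without further recursion; shallower mismatches are
    /// recursed into. A child present in SQL but absent from Redis is stale
    /// by definition.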
    async fn find_stale_branches(
        &self,
        sql_merkle: &SqlMerkleStore,
        redis_merkle: &RedisMerkleStore,
        prefix: &str,
    ) -> Result<Vec<String>, StorageError> {
        let mut stale = Vec::new();

        let sql_children = sql_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
        let redis_children = redis_merkle.get_children(prefix).await
            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;

        if sql_children.is_empty() {
            return Ok(stale);
        }

        for (child_path, sql_hash) in sql_children {
            match redis_children.get(&child_path) {
                Some(redis_hash) if redis_hash == &sql_hash => continue,
                Some(_) => {
                    if child_path.contains('.') && !child_path.ends_with('.') {
                        stale.push(child_path);
                    } else {
                        let sub_stale = Box::pin(
                            self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
                        ).await?;
                        stale.extend(sub_stale);
                    }
                }
                None => stale.push(child_path),
            }
        }

        Ok(stale)
    }

    /// Warm up the L3 cuckoo filter from the L3 store (call after start).
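    ///
    /// On an empty L3 store there is nothing to load, so the filter is
    /// marked trusted immediately.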
    #[tracing::instrument(skip(self))]
    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
        let _ = self.state.send(EngineState::WarmingUp);
        info!("Warming up L3 cuckoo filter...");

        if let Some(l3) = &self.l3_store {
            let batch_size = self.config.read().cuckoo_warmup_batch_size;
            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");

            let total_count = l3.count_all().await.unwrap_or(0);
            if total_count > 0 {
                let mut offset = 0u64;
                let mut loaded = 0usize;

                loop {
                    let keys = l3.scan_keys(offset, batch_size).await?;
                    if keys.is_empty() {
                        break;
                    }

                    for key in &keys {
                        self.l3_filter.insert(key);
                    }

                    loaded += keys.len();
                    offset += keys.len() as u64;

                    if loaded % 10_000 == 0 || loaded == total_count as usize {
                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
                    }
                }

                self.l3_filter.mark_trusted();
                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
            } else {
                info!("L3 store is empty, skipping filter warmup");
                self.l3_filter.mark_trusted();
            }
        }

        info!(
            l3_trust = ?self.l3_filter.trust_state(),
            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
        );

        let _ = self.state.send(EngineState::Ready);
        info!("Warm-up complete, engine ready");
        Ok(())
    }

    /// Perform one tick of maintenance (for manual control instead of the run loop).
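    ///
    /// # Example
    ///
    /// A sketch of driving maintenance by hand instead of spawning `run()`
    /// (the interval here is illustrative):
    ///
    /// ```rust,ignore
    /// loop {
    ///     engine.tick().await;
    ///     tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    /// }
    /// ```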
    pub async fn tick(&self) {
        self.maybe_evict();
        self.maybe_flush_l2().await;
    }

    /// Force flush all pending L2 batches immediately.
    pub async fn force_flush(&self) {
        let batch = self.l2_batcher.lock().await.force_flush();
        if let Some(batch) = batch {
            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
            self.flush_batch_internal(batch).await;
        }
    }

    /// Run the main event loop.
    ///
    /// This method takes `&self` (not `&mut self`) so the engine can be
    /// shared via `Arc` while the run loop executes in a background task.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let engine = Arc::new(engine);
    /// let engine_clone = engine.clone();
    /// tokio::spawn(async move {
    ///     engine_clone.run().await;
    /// });
    /// ```
    #[tracing::instrument(skip(self))]
    pub async fn run(&self) {
        let _ = self.state.send(EngineState::Running);
        info!("Sync engine running");

        let mut health_check_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(30)
        );
        let mut wal_drain_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(5)
        );
        let cf_snapshot_secs = self.config.read().cf_snapshot_interval_secs;
        let mut cf_snapshot_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(cf_snapshot_secs)
        );

        loop {
            // Lock config_rx only for the check, not across awaits
            let config_changed = {
                let rx = self.config_rx.lock().await;
                // Use poll-style check to avoid holding lock
                rx.has_changed().unwrap_or(false)
            };

            if config_changed {
                let new_config = self.config_rx.lock().await.borrow_and_update().clone();
                info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
                *self.config.write() = new_config;
            }

            tokio::select! {
                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                    self.maybe_evict();
                    self.maybe_flush_l2().await;
                    self.maybe_snapshot_cf_by_threshold().await;
                }

                _ = health_check_interval.tick() => {
                    self.check_mysql_health().await;
                }

                _ = wal_drain_interval.tick() => {
                    self.maybe_drain_wal().await;
                }

                _ = cf_snapshot_interval.tick() => {
                    self.maybe_snapshot_cf_by_time().await;
                }
            }
        }
    }

    /// Initiate graceful shutdown
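    ///
    /// # Example
    ///
    /// A shutdown sketch, assuming `run()` was spawned on a cloned `Arc` as
    /// shown above (the signal source is illustrative):
    ///
    /// ```rust,ignore
    /// tokio::signal::ctrl_c().await?;
    /// engine.shutdown().await; // flushes the final L2 batch and snapshots filters
    /// ```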
    #[tracing::instrument(skip(self))]
    pub async fn shutdown(&self) {
        use crate::FlushReason;

        let shutdown_start = std::time::Instant::now();
        info!("Initiating sync engine shutdown...");
        let _ = self.state.send(EngineState::ShuttingDown);

        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
        if let Some(batch) = batch {
            let batch_size = batch.items.len();
            info!(batch_size, "Flushing final L2 batch on shutdown");
            {
                let mut batcher = self.l2_batcher.lock().await;
                batcher.add_batch(batch.items);
            }
            self.maybe_flush_l2().await;
            crate::metrics::record_items_written("L2", batch_size);
        }

        self.snapshot_cuckoo_filters("shutdown").await;

        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
        info!("Sync engine shutdown complete");
    }
}