sync_engine/coordinator/
lifecycle.rs

1//! Engine lifecycle management: start, shutdown, run loop.
2//!
3//! This module contains the startup sequence, main run loop, and shutdown logic.
4
5use tracing::{info, warn, debug, error};
6
7use crate::storage::traits::StorageError;
8use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
9use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
10use crate::resilience::wal::WriteAheadLog;
11
12use super::{SyncEngine, EngineState};
13#[allow(unused_imports)]
14use super::WriteTarget;
15
16impl SyncEngine {
    /// Start the engine - connect to backends with proper startup sequence.
    /// 
    /// Startup flow (trust hierarchy):
    /// 1. Initialize WAL (SQLite) - always first, our durability lifeline
    /// 2. Connect to SQL (L3) - ground truth, initialize SQL merkle store
    /// 3. Drain any pending WAL entries to SQL (blocking)
    /// 4. Get SQL merkle root (this is our trusted root)
    /// 5. Load CF snapshots from SQLite:
    ///    - If snapshot merkle root matches SQL root → CF is trusted
    ///    - Otherwise → CF must be rebuilt from SQL scan
    /// 6. Connect to Redis (L2) - cache layer
    /// 7. Compare Redis merkle root with SQL merkle root
    ///    - If match → Redis is synced
    ///    - If mismatch → use branch diff to find stale regions and resync
    /// 8. Ready!
    ///
    /// # Errors
    /// Returns `StorageError::Backend` and aborts startup when the WAL cannot
    /// be initialized, when the SQL merkle schema cannot be created, or when a
    /// configured SQL backend is unreachable. Redis failures are non-fatal:
    /// the engine continues without the L2 cache.
    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
    pub async fn start(&mut self) -> Result<(), StorageError> {
        let startup_start = std::time::Instant::now();
        info!("Starting sync engine with trust-verified startup...");
        let _ = self.state.send(EngineState::Connecting);

        // ========== PHASE 1: Initialize WAL (always first) ==========
        let phase_start = std::time::Instant::now();
        let wal_path = self.config.wal_path.clone()
            .unwrap_or_else(|| "./sync_engine_wal.db".to_string());
        let wal_max_items = self.config.wal_max_items.unwrap_or(1_000_000);
        
        // WAL init failure is fatal: without it we cannot guarantee durability.
        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
            Ok(wal) => {
                info!(path = %wal_path, "Write-ahead log initialized");
                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
                wal
            }
            Err(e) => {
                crate::metrics::record_error("WAL", "init", "sqlite");
                return Err(StorageError::Backend(format!(
                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
                    wal_path, e
                )));
            }
        };
        
        // Initialize filter persistence (uses same WAL SQLite).
        // Non-fatal: without it CF snapshots are simply disabled.
        match FilterPersistence::new(&wal_path).await {
            Ok(fp) => {
                self.filter_persistence = Some(fp);
            }
            Err(e) => {
                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
                crate::metrics::record_error("filter", "init", "persistence");
            }
        }
        
        // Capture the pending-item count now, before `wal` is moved into self.
        // NOTE(review): assumes `stats(false)` is the cheap/non-verifying variant - confirm.
        let pending_count = if wal.has_pending() {
            wal.stats(false).pending_items
        } else {
            0
        };
        self.l3_wal = Some(wal);

        // ========== PHASE 2: Connect to SQL (L3 - ground truth) ==========
        // (shadows the previous phase's timer)
        let phase_start = std::time::Instant::now();
        if let Some(ref sql_url) = self.config.sql_url {
            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
            match crate::storage::sql::SqlStore::new(sql_url).await {
                Ok(store) => {
                    // Initialize SQL merkle store (ground truth) - shares pool with SqlStore
                    let is_sqlite = sql_url.starts_with("sqlite:");
                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
                    if let Err(e) = sql_merkle.init_schema().await {
                        error!(error = %e, "Failed to initialize SQL merkle schema");
                        crate::metrics::record_error("L3", "init", "merkle_schema");
                        return Err(StorageError::Backend(format!(
                            "Failed to initialize SQL merkle schema: {}", e
                        )));
                    }
                    self.sql_merkle = Some(sql_merkle);
                    
                    // Keep both Arc<dyn ArchiveStore> and Arc<SqlStore> for dirty merkle access
                    let store = std::sync::Arc::new(store);
                    self.sql_store = Some(store.clone());
                    self.l3_store = Some(store);
                    tracing::Span::current().record("has_sql", true);
                    self.mysql_health.record_success();
                    crate::metrics::set_backend_healthy("mysql", true);
                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
                    info!("SQL (L3) connected with merkle store (ground truth)");
                }
                Err(e) => {
                    // SQL is ground truth: a configured-but-unreachable SQL is fatal.
                    tracing::Span::current().record("has_sql", false);
                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
                    self.mysql_health.record_failure();
                    crate::metrics::set_backend_healthy("mysql", false);
                    crate::metrics::record_connection_error("mysql");
                    return Err(StorageError::Backend(format!(
                        "SQL connection required for startup: {}", e
                    )));
                }
            }
        } else {
            // No SQL configured at all is allowed (degraded mode), unlike a failed connect.
            warn!("No SQL URL configured - operating without ground truth storage!");
            tracing::Span::current().record("has_sql", false);
        }

        // ========== PHASE 3: Drain WAL to SQL ==========
        // Replays entries that were persisted to the WAL but never reached SQL
        // (e.g. after a crash) before we trust the SQL merkle root.
        if pending_count > 0 {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::DrainingWal);
            info!(pending = pending_count, "Draining WAL to SQL before startup...");
            
            if let Some(ref l3) = self.l3_store {
                if let Some(ref wal) = self.l3_wal {
                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
                        Ok(drained) => {
                            info!(drained = drained.len(), "WAL drained to SQL");
                            crate::metrics::record_items_written("L3", drained.len());
                        }
                        Err(e) => {
                            // Non-fatal: remaining WAL entries are retried by the
                            // periodic drain in the run loop.
                            warn!(error = %e, "WAL drain had errors - some items may retry later");
                            crate::metrics::record_error("WAL", "drain", "partial");
                        }
                    }
                }
            }
            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
        }
        
        // ========== PHASE 4: Get SQL merkle root (trusted root) ==========
        // This root is the reference point for both CF snapshot validation
        // (phase 5) and Redis sync verification (phase 7).
        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
            match sql_merkle.root_hash().await {
                Ok(Some(root)) => {
                    info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
                    Some(root)
                }
                Ok(None) => {
                    info!("SQL merkle tree is empty (no data yet)");
                    None
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get SQL merkle root");
                    None
                }
            }
        } else {
            None
        };
        
        // ========== PHASE 5: Restore CF from snapshot (if valid) ==========
        let phase_start = std::time::Instant::now();
        self.restore_cuckoo_filters(&sql_root).await;
        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());

        // ========== PHASE 6: Connect to Redis (L2 - cache) ==========
        let phase_start = std::time::Instant::now();
        if let Some(ref redis_url) = self.config.redis_url {
            info!(url = %redis_url, prefix = ?self.config.redis_prefix, "Connecting to Redis (L2 - cache)...");
            match crate::storage::redis::RedisStore::with_prefix(redis_url, self.config.redis_prefix.as_deref()).await {
                Ok(store) => {
                    // The merkle shadow tree shares the store's connection and prefix.
                    let redis_merkle = RedisMerkleStore::with_prefix(
                        store.connection(),
                        self.config.redis_prefix.as_deref(),
                    );
                    self.redis_merkle = Some(redis_merkle);
                    self.l2_store = Some(std::sync::Arc::new(store));
                    tracing::Span::current().record("has_redis", true);
                    crate::metrics::set_backend_healthy("redis", true);
                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
                    info!("Redis (L2) connected with merkle shadow tree");
                }
                Err(e) => {
                    // Redis is only a cache: failure degrades performance, not correctness.
                    tracing::Span::current().record("has_redis", false);
                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
                    crate::metrics::set_backend_healthy("redis", false);
                    crate::metrics::record_connection_error("redis");
                }
            }
        } else {
            tracing::Span::current().record("has_redis", false);
        }

        // ========== PHASE 7: Sync Redis with SQL via branch diff ==========
        // Only runs when both merkle stores exist AND SQL produced a root.
        if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) = 
            (&self.sql_merkle, &self.redis_merkle, &sql_root) 
        {
            let phase_start = std::time::Instant::now();
            let _ = self.state.send(EngineState::SyncingRedis);
            
            match redis_merkle.root_hash().await {
                Ok(Some(redis_root)) if &redis_root == sql_root => {
                    info!("Redis merkle root matches SQL - Redis is in sync");
                }
                Ok(Some(redis_root)) => {
                    info!(
                        sql_root = %hex::encode(sql_root),
                        redis_root = %hex::encode(redis_root),
                        "Redis merkle root mismatch - initiating branch diff sync"
                    );
                    
                    match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
                        Ok(synced) => {
                            info!(items_synced = synced, "Redis sync complete via branch diff");
                            crate::metrics::record_items_written("L2", synced);
                        }
                        Err(e) => {
                            // Non-fatal: a stale cache yields misses, not wrong data.
                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
                            crate::metrics::record_error("L2", "sync", "branch_diff");
                        }
                    }
                }
                Ok(None) => {
                    info!("Redis merkle tree is empty - will be populated on writes");
                }
                Err(e) => {
                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
                    crate::metrics::record_error("L2", "merkle", "root_hash");
                }
            }
            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
        }

        let _ = self.state.send(EngineState::Ready);
        crate::metrics::record_startup_total(startup_start.elapsed());
        info!("Sync engine ready (trust-verified startup complete)");
        Ok(())
    }
242    
243    /// Restore cuckoo filters from snapshots if merkle roots match
244    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
245        let persistence = match &self.filter_persistence {
246            Some(p) => p,
247            None => return,
248        };
249        
250        let sql_root = match sql_root {
251            Some(r) => r,
252            None => return,
253        };
254        
255        // Note: L2 filter removed (TTL makes it untrustworthy - use Redis EXISTS)
256        
257        // Try L3 filter
258        match persistence.load(L3_FILTER_ID).await {
259            Ok(Some(state)) if &state.merkle_root == sql_root => {
260                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
261                    warn!(error = %e, "Failed to import L3 filter from snapshot");
262                } else {
263                    self.l3_filter.mark_trusted();
264                    info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
265                }
266            }
267            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
268            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
269            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
270        }
271    }
272    
273    /// Sync Redis from SQL by diffing merkle trees and only syncing stale branches.
274    async fn sync_redis_from_sql_diff(
275        &self,
276        sql_merkle: &SqlMerkleStore,
277        redis_merkle: &RedisMerkleStore,
278    ) -> Result<usize, StorageError> {
279        let mut total_synced = 0;
280        let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;
281        
282        for prefix in stale_prefixes {
283            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
284            
285            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
286                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
287            
288            if leaf_paths.is_empty() {
289                continue;
290            }
291            
292            let mut merkle_batch = MerkleBatch::new();
293            
294            if let Some(ref l3_store) = self.l3_store {
295                for object_id in &leaf_paths {
296                    if let Ok(Some(item)) = l3_store.get(object_id).await {
297                        let payload_hash = PathMerkle::payload_hash(&item.content);
298                        let leaf_hash = PathMerkle::leaf_hash(
299                            &item.object_id,
300                            item.version,
301                            item.updated_at,
302                            &payload_hash,
303                        );
304                        merkle_batch.insert(object_id.clone(), leaf_hash);
305                        
306                        if let Some(ref l2_store) = self.l2_store {
307                            if let Err(e) = l2_store.put(&item).await {
308                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
309                            } else {
310                                total_synced += 1;
311                            }
312                        }
313                    }
314                }
315                
316                if !merkle_batch.is_empty() {
317                    if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
318                        warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
319                    }
320                }
321            }
322        }
323        
324        Ok(total_synced)
325    }
326    
327    /// Find stale branches by recursively comparing SQL and Redis merkle nodes.
328    async fn find_stale_branches(
329        &self,
330        sql_merkle: &SqlMerkleStore,
331        redis_merkle: &RedisMerkleStore,
332        prefix: &str,
333    ) -> Result<Vec<String>, StorageError> {
334        let mut stale = Vec::new();
335        
336        let sql_children = sql_merkle.get_children(prefix).await
337            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
338        let redis_children = redis_merkle.get_children(prefix).await
339            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
340        
341        if sql_children.is_empty() {
342            return Ok(stale);
343        }
344        
345        for (child_path, sql_hash) in sql_children {
346            match redis_children.get(&child_path) {
347                Some(redis_hash) if redis_hash == &sql_hash => continue,
348                Some(_) => {
349                    if child_path.contains('.') && !child_path.ends_with('.') {
350                        stale.push(child_path);
351                    } else {
352                        let sub_stale = Box::pin(
353                            self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
354                        ).await?;
355                        stale.extend(sub_stale);
356                    }
357                }
358                None => stale.push(child_path),
359            }
360        }
361        
362        Ok(stale)
363    }
364
365    /// Warm up L1 cache from L2/L3 (call after start)
366    #[tracing::instrument(skip(self))]
367    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
368        let _ = self.state.send(EngineState::WarmingUp);
369        info!("Warming up cuckoo filter and L1 cache...");
370        
371        if let Some(l3) = &self.l3_store {
372            let batch_size = self.config.cuckoo_warmup_batch_size;
373            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
374            
375            let total_count = l3.count_all().await.unwrap_or(0);
376            if total_count > 0 {
377                let mut offset = 0u64;
378                let mut loaded = 0usize;
379                
380                loop {
381                    let keys = l3.scan_keys(offset, batch_size).await?;
382                    if keys.is_empty() {
383                        break;
384                    }
385                    
386                    for key in &keys {
387                        self.l3_filter.insert(key);
388                    }
389                    
390                    loaded += keys.len();
391                    offset += keys.len() as u64;
392                    
393                    if loaded % 10_000 == 0 || loaded == total_count as usize {
394                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
395                    }
396                }
397                
398                self.l3_filter.mark_trusted();
399                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
400            } else {
401                info!("L3 store is empty, skipping filter warmup");
402                self.l3_filter.mark_trusted();
403            }
404        }
405        
406        info!(
407            l3_trust = ?self.l3_filter.trust_state(),
408            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
409        );
410
411        let _ = self.state.send(EngineState::Ready);
412        info!("Warm-up complete, engine ready");
413        Ok(())
414    }
415
416    /// Perform one tick of maintenance (for manual control instead of run loop).
417    pub async fn tick(&self) {
418        self.maybe_evict();
419        self.maybe_flush_l2().await;
420    }
421
422    /// Force flush all pending L2 batches immediately.
423    pub async fn force_flush(&self) {
424        let batch = self.l2_batcher.lock().await.force_flush();
425        if let Some(batch) = batch {
426            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
427            self.flush_batch_internal(batch).await;
428        }
429    }
430
    /// Run the main event loop
    ///
    /// Multiplexes periodic duties until the owning task is dropped/cancelled:
    /// config hot-reload, fast-path maintenance (~100ms), MySQL health checks
    /// (30s), WAL draining (5s), and time-based CF snapshots (configurable).
    /// This function never returns on its own.
    #[tracing::instrument(skip(self))]
    pub async fn run(&mut self) {
        let _ = self.state.send(EngineState::Running);
        info!("Sync engine running");

        // `tokio::time::interval` fires immediately on its first tick, then
        // at the configured period.
        let mut health_check_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(30)
        );
        let mut wal_drain_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(5)
        );
        // NOTE(review): the snapshot period is captured once here; a later
        // config hot-reload updates `self.config` but does not rebuild this
        // interval - confirm that is intended.
        let mut cf_snapshot_interval = tokio::time::interval(
            tokio::time::Duration::from_secs(self.config.cf_snapshot_interval_secs)
        );

        loop {
            tokio::select! {
                // Config hot-reload: fires when the watch channel changes. The
                // `Ok(())` pattern disables this arm for an iteration if
                // `changed()` errors (sender dropped).
                Ok(()) = self.config_rx.changed() => {
                    let new_config = self.config_rx.borrow().clone();
                    info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
                    self.config = new_config;
                }
                
                // Fast-path maintenance: eviction, L2 flush check, and
                // threshold-based CF snapshotting roughly every 100ms.
                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                    self.maybe_evict();
                    self.maybe_flush_l2().await;
                    self.maybe_snapshot_cf_by_threshold().await;
                }
                
                _ = health_check_interval.tick() => {
                    self.check_mysql_health().await;
                }
                
                _ = wal_drain_interval.tick() => {
                    self.maybe_drain_wal().await;
                }
                
                _ = cf_snapshot_interval.tick() => {
                    self.maybe_snapshot_cf_by_time().await;
                }
            }
        }
    }
475
    /// Initiate graceful shutdown
    ///
    /// Flushes any buffered L2 batch, snapshots the cuckoo filters, and records
    /// shutdown timing. Does not tear down backend connections; the caller is
    /// expected to drop the engine afterwards.
    #[tracing::instrument(skip(self))]
    pub async fn shutdown(&mut self) {
        use crate::FlushReason;
        
        let shutdown_start = std::time::Instant::now();
        info!("Initiating sync engine shutdown...");
        let _ = self.state.send(EngineState::ShuttingDown);
        
        // Pull the final batch out tagged with the Shutdown reason, then hand
        // the items back to the batcher and let the normal flush path
        // (`maybe_flush_l2`) perform the actual write.
        // NOTE(review): items are re-queued rather than written directly -
        // presumably `maybe_flush_l2` owns the write/retry logic; confirm it
        // always flushes while in ShuttingDown state.
        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
        if let Some(batch) = batch {
            let batch_size = batch.items.len();
            info!(batch_size, "Flushing final L2 batch on shutdown");
            {
                // Scoped so the batcher lock guard is dropped before the await below.
                let mut batcher = self.l2_batcher.lock().await;
                batcher.add_batch(batch.items);
            }
            self.maybe_flush_l2().await;
            // NOTE(review): counts items as written without checking the flush
            // outcome - the metric may overcount if the final flush fails.
            crate::metrics::record_items_written("L2", batch_size);
        }
        
        self.snapshot_cuckoo_filters("shutdown").await;
        
        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
        info!("Sync engine shutdown complete");
    }
502}