sync_engine/coordinator/
lifecycle.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Engine lifecycle management: start, shutdown, run loop.
5//!
6//! This module contains the startup sequence, main run loop, and shutdown logic.
7
8use tracing::{info, warn, debug, error};
9
10use crate::storage::traits::StorageError;
11use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
12use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
13use crate::resilience::wal::WriteAheadLog;
14
15use super::{SyncEngine, EngineState};
16#[allow(unused_imports)]
17use super::WriteTarget;
18
19impl SyncEngine {
20    /// Start the engine - connect to backends with proper startup sequence.
21    /// 
22    /// Startup flow (trust hierarchy):
23    /// 1. Initialize WAL (SQLite) - always first, our durability lifeline
24    /// 2. Connect to SQL (L3) - ground truth, initialize SQL merkle store
25    /// 3. Drain any pending WAL entries to SQL (blocking)
26    /// 4. Get SQL merkle root (this is our trusted root)
27    /// 5. Load CF snapshots from SQLite:
28    ///    - If snapshot merkle root matches SQL root → CF is trusted
29    ///    - Otherwise → CF must be rebuilt from SQL scan
30    /// 6. Connect to Redis (L2) - cache layer
31    /// 7. Compare Redis merkle root with SQL merkle root
32    ///    - If match → Redis is synced
33    ///    - If mismatch → use branch diff to find stale regions and resync
34    /// 8. Ready!
35    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
36    pub async fn start(&mut self) -> Result<(), StorageError> {
37        let startup_start = std::time::Instant::now();
38        info!("Starting sync engine with trust-verified startup...");
39        let _ = self.state.send(EngineState::Connecting);
40
41        // ========== PHASE 1: Initialize WAL (always first) ==========
42        let phase_start = std::time::Instant::now();
43        let (wal_path, wal_max_items) = {
44            let cfg = self.config.read();
45            (
46                cfg.wal_path.clone().unwrap_or_else(|| "./sync_engine_wal.db".to_string()),
47                cfg.wal_max_items.unwrap_or(1_000_000),
48            )
49        };
50        
51        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
52            Ok(wal) => {
53                info!(path = %wal_path, "Write-ahead log initialized");
54                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
55                wal
56            }
57            Err(e) => {
58                crate::metrics::record_error("WAL", "init", "sqlite");
59                return Err(StorageError::Backend(format!(
60                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
61                    wal_path, e
62                )));
63            }
64        };
65        
66        // Initialize filter persistence (uses same WAL SQLite)
67        match FilterPersistence::new(&wal_path).await {
68            Ok(fp) => {
69                self.filter_persistence = Some(fp);
70            }
71            Err(e) => {
72                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
73                crate::metrics::record_error("filter", "init", "persistence");
74            }
75        }
76        
77        let pending_count = if wal.has_pending() {
78            wal.stats(false).pending_items
79        } else {
80            0
81        };
82        self.l3_wal = Some(wal);
83
84        // ========== PHASE 2: Connect to SQL (L3 - ground truth) ==========
85        let phase_start = std::time::Instant::now();
86        let sql_url = self.config.read().sql_url.clone();
87        if let Some(ref sql_url) = sql_url {
88            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
89            match crate::storage::sql::SqlStore::new(sql_url).await {
90                Ok(store) => {
91                    // Initialize SQL merkle store (ground truth) - shares pool with SqlStore
92                    let is_sqlite = sql_url.starts_with("sqlite:");
93                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
94                    if let Err(e) = sql_merkle.init_schema().await {
95                        error!(error = %e, "Failed to initialize SQL merkle schema");
96                        crate::metrics::record_error("L3", "init", "merkle_schema");
97                        return Err(StorageError::Backend(format!(
98                            "Failed to initialize SQL merkle schema: {}", e
99                        )));
100                    }
101                    self.sql_merkle = Some(sql_merkle);
102                    
103                    // Keep both Arc<dyn ArchiveStore> and Arc<SqlStore> for dirty merkle access
104                    let store = std::sync::Arc::new(store);
105                    self.sql_store = Some(store.clone());
106                    self.l3_store = Some(store);
107                    tracing::Span::current().record("has_sql", true);
108                    self.mysql_health.record_success();
109                    crate::metrics::set_backend_healthy("mysql", true);
110                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
111                    info!("SQL (L3) connected with merkle store (ground truth)");
112                }
113                Err(e) => {
114                    tracing::Span::current().record("has_sql", false);
115                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
116                    self.mysql_health.record_failure();
117                    crate::metrics::set_backend_healthy("mysql", false);
118                    crate::metrics::record_connection_error("mysql");
119                    return Err(StorageError::Backend(format!(
120                        "SQL connection required for startup: {}", e
121                    )));
122                }
123            }
124        } else {
125            warn!("No SQL URL configured - operating without ground truth storage!");
126            tracing::Span::current().record("has_sql", false);
127        }
128
129        // ========== PHASE 3: Drain WAL to SQL ==========
130        if pending_count > 0 {
131            let phase_start = std::time::Instant::now();
132            let _ = self.state.send(EngineState::DrainingWal);
133            info!(pending = pending_count, "Draining WAL to SQL before startup...");
134            
135            if let Some(ref l3) = self.l3_store {
136                if let Some(ref wal) = self.l3_wal {
137                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
138                        Ok(drained) => {
139                            info!(drained = drained.len(), "WAL drained to SQL");
140                            crate::metrics::record_items_written("L3", drained.len());
141                        }
142                        Err(e) => {
143                            warn!(error = %e, "WAL drain had errors - some items may retry later");
144                            crate::metrics::record_error("WAL", "drain", "partial");
145                        }
146                    }
147                }
148            }
149            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
150        }
151        
152        // ========== PHASE 4: Get SQL merkle root (trusted root) ==========
153        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
154            match sql_merkle.root_hash().await {
155                Ok(Some(root)) => {
156                    info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
157                    Some(root)
158                }
159                Ok(None) => {
160                    info!("SQL merkle tree is empty (no data yet)");
161                    None
162                }
163                Err(e) => {
164                    warn!(error = %e, "Failed to get SQL merkle root");
165                    None
166                }
167            }
168        } else {
169            None
170        };
171        
172        // ========== PHASE 5: Restore CF from snapshot (if valid) ==========
173        let phase_start = std::time::Instant::now();
174        self.restore_cuckoo_filters(&sql_root).await;
175        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());
176
177        // ========== PHASE 6: Connect to Redis (L2 - cache) ==========
178        let phase_start = std::time::Instant::now();
179        let (redis_url, redis_prefix) = {
180            let cfg = self.config.read();
181            (cfg.redis_url.clone(), cfg.redis_prefix.clone())
182        };
183        if let Some(ref redis_url) = redis_url {
184            info!(url = %redis_url, prefix = ?redis_prefix, "Connecting to Redis (L2 - cache)...");
185            match crate::storage::redis::RedisStore::with_prefix(redis_url, redis_prefix.as_deref()).await {
186                Ok(store) => {
187                    let redis_merkle = RedisMerkleStore::with_prefix(
188                        store.connection(),
189                        redis_prefix.as_deref(),
190                    );
191                    self.redis_merkle = Some(redis_merkle);
192                    let store = std::sync::Arc::new(store);
193                    self.redis_store = Some(store.clone());  // Keep direct reference for CDC
194                    self.l2_store = Some(store);
195                    tracing::Span::current().record("has_redis", true);
196                    crate::metrics::set_backend_healthy("redis", true);
197                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
198                    info!("Redis (L2) connected with merkle shadow tree");
199                }
200                Err(e) => {
201                    tracing::Span::current().record("has_redis", false);
202                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
203                    crate::metrics::set_backend_healthy("redis", false);
204                    crate::metrics::record_connection_error("redis");
205                }
206            }
207        } else {
208            tracing::Span::current().record("has_redis", false);
209        }
210
211        // ========== PHASE 7: Sync Redis with SQL via branch diff ==========
212        if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) = 
213            (&self.sql_merkle, &self.redis_merkle, &sql_root) 
214        {
215            let phase_start = std::time::Instant::now();
216            let _ = self.state.send(EngineState::SyncingRedis);
217            
218            match redis_merkle.root_hash().await {
219                Ok(Some(redis_root)) if &redis_root == sql_root => {
220                    info!("Redis merkle root matches SQL - Redis is in sync");
221                }
222                Ok(Some(redis_root)) => {
223                    info!(
224                        sql_root = %hex::encode(sql_root),
225                        redis_root = %hex::encode(redis_root),
226                        "Redis merkle root mismatch - initiating branch diff sync"
227                    );
228                    
229                    match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
230                        Ok(synced) => {
231                            info!(items_synced = synced, "Redis sync complete via branch diff");
232                            crate::metrics::record_items_written("L2", synced);
233                        }
234                        Err(e) => {
235                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
236                            crate::metrics::record_error("L2", "sync", "branch_diff");
237                        }
238                    }
239                }
240                Ok(None) => {
241                    info!("Redis merkle tree is empty - will be populated on writes");
242                }
243                Err(e) => {
244                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
245                    crate::metrics::record_error("L2", "merkle", "root_hash");
246                }
247            }
248            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
249        }
250
251        let _ = self.state.send(EngineState::Ready);
252        crate::metrics::record_startup_total(startup_start.elapsed());
253        info!("Sync engine ready (trust-verified startup complete)");
254        Ok(())
255    }
256    
257    /// Restore cuckoo filters from snapshots if merkle roots match
258    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
259        let persistence = match &self.filter_persistence {
260            Some(p) => p,
261            None => return,
262        };
263        
264        let sql_root = match sql_root {
265            Some(r) => r,
266            None => return,
267        };
268        
269        // Note: L2 filter removed (TTL makes it untrustworthy - use Redis EXISTS)
270        
271        // Try L3 filter
272        match persistence.load(L3_FILTER_ID).await {
273            Ok(Some(state)) if &state.merkle_root == sql_root => {
274                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
275                    warn!(error = %e, "Failed to import L3 filter from snapshot");
276                } else {
277                    self.l3_filter.mark_trusted();
278                    info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
279                }
280            }
281            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
282            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
283            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
284        }
285    }
286    
287    /// Sync Redis from SQL by diffing merkle trees and only syncing stale branches.
288    async fn sync_redis_from_sql_diff(
289        &self,
290        sql_merkle: &SqlMerkleStore,
291        redis_merkle: &RedisMerkleStore,
292    ) -> Result<usize, StorageError> {
293        let mut total_synced = 0;
294        let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;
295        
296        for prefix in stale_prefixes {
297            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
298            
299            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
300                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
301            
302            if leaf_paths.is_empty() {
303                continue;
304            }
305            
306            let mut merkle_batch = MerkleBatch::new();
307            
308            if let Some(ref l3_store) = self.l3_store {
309                for object_id in &leaf_paths {
310                    if let Ok(Some(item)) = l3_store.get(object_id).await {
311                        let payload_hash = PathMerkle::payload_hash(&item.content);
312                        let leaf_hash = PathMerkle::leaf_hash(
313                            &item.object_id,
314                            item.version,
315                            item.updated_at,
316                            &payload_hash,
317                        );
318                        merkle_batch.insert(object_id.clone(), leaf_hash);
319                        
320                        if let Some(ref l2_store) = self.l2_store {
321                            if let Err(e) = l2_store.put(&item).await {
322                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
323                            } else {
324                                total_synced += 1;
325                            }
326                        }
327                    }
328                }
329                
330                if !merkle_batch.is_empty() {
331                    if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
332                        warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
333                    }
334                }
335            }
336        }
337        
338        Ok(total_synced)
339    }
340    
341    /// Find stale branches by recursively comparing SQL and Redis merkle nodes.
342    async fn find_stale_branches(
343        &self,
344        sql_merkle: &SqlMerkleStore,
345        redis_merkle: &RedisMerkleStore,
346        prefix: &str,
347    ) -> Result<Vec<String>, StorageError> {
348        let mut stale = Vec::new();
349        
350        let sql_children = sql_merkle.get_children(prefix).await
351            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
352        let redis_children = redis_merkle.get_children(prefix).await
353            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
354        
355        if sql_children.is_empty() {
356            return Ok(stale);
357        }
358        
359        for (child_path, sql_hash) in sql_children {
360            match redis_children.get(&child_path) {
361                Some(redis_hash) if redis_hash == &sql_hash => continue,
362                Some(_) => {
363                    if child_path.contains('.') && !child_path.ends_with('.') {
364                        stale.push(child_path);
365                    } else {
366                        let sub_stale = Box::pin(
367                            self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
368                        ).await?;
369                        stale.extend(sub_stale);
370                    }
371                }
372                None => stale.push(child_path),
373            }
374        }
375        
376        Ok(stale)
377    }
378
379    /// Warm up L1 cache from L2/L3 (call after start)
380    #[tracing::instrument(skip(self))]
381    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
382        let _ = self.state.send(EngineState::WarmingUp);
383        info!("Warming up cuckoo filter and L1 cache...");
384        
385        if let Some(l3) = &self.l3_store {
386            let batch_size = self.config.read().cuckoo_warmup_batch_size;
387            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
388            
389            let total_count = l3.count_all().await.unwrap_or(0);
390            if total_count > 0 {
391                let mut offset = 0u64;
392                let mut loaded = 0usize;
393                
394                loop {
395                    let keys = l3.scan_keys(offset, batch_size).await?;
396                    if keys.is_empty() {
397                        break;
398                    }
399                    
400                    for key in &keys {
401                        self.l3_filter.insert(key);
402                    }
403                    
404                    loaded += keys.len();
405                    offset += keys.len() as u64;
406                    
407                    if loaded % 10_000 == 0 || loaded == total_count as usize {
408                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
409                    }
410                }
411                
412                self.l3_filter.mark_trusted();
413                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
414            } else {
415                info!("L3 store is empty, skipping filter warmup");
416                self.l3_filter.mark_trusted();
417            }
418        }
419        
420        info!(
421            l3_trust = ?self.l3_filter.trust_state(),
422            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
423        );
424
425        let _ = self.state.send(EngineState::Ready);
426        info!("Warm-up complete, engine ready");
427        Ok(())
428    }
429
430    /// Perform one tick of maintenance (for manual control instead of run loop).
431    pub async fn tick(&self) {
432        self.maybe_evict();
433        self.maybe_flush_l2().await;
434    }
435
436    /// Force flush all pending L2 batches immediately.
437    pub async fn force_flush(&self) {
438        let batch = self.l2_batcher.lock().await.force_flush();
439        if let Some(batch) = batch {
440            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
441            self.flush_batch_internal(batch).await;
442        }
443    }
444
445    /// Run the main event loop.
446    /// 
447    /// This method takes `&self` (not `&mut self`) so the engine can be
448    /// shared via `Arc` while the run loop executes in a background task.
449    /// 
450    /// # Example
451    /// 
452    /// ```rust,ignore
453    /// let engine = Arc::new(engine);
454    /// let engine_clone = engine.clone();
455    /// tokio::spawn(async move {
456    ///     engine_clone.run().await;
457    /// });
458    /// ```
459    #[tracing::instrument(skip(self))]
460    pub async fn run(&self) {
461        let _ = self.state.send(EngineState::Running);
462        info!("Sync engine running");
463
464        let mut health_check_interval = tokio::time::interval(
465            tokio::time::Duration::from_secs(30)
466        );
467        let mut wal_drain_interval = tokio::time::interval(
468            tokio::time::Duration::from_secs(5)
469        );
470        let cf_snapshot_secs = self.config.read().cf_snapshot_interval_secs;
471        let mut cf_snapshot_interval = tokio::time::interval(
472            tokio::time::Duration::from_secs(cf_snapshot_secs)
473        );
474
475        loop {
476            // Lock config_rx only for the check, not across awaits
477            let config_changed = {
478                let rx = self.config_rx.lock().await;
479                // Use poll-style check to avoid holding lock
480                rx.has_changed().unwrap_or(false)
481            };
482            
483            if config_changed {
484                let new_config = self.config_rx.lock().await.borrow_and_update().clone();
485                info!("Config updated: l1_max_bytes={}", new_config.l1_max_bytes);
486                *self.config.write() = new_config;
487            }
488            
489            tokio::select! {
490                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
491                    self.maybe_evict();
492                    self.maybe_flush_l2().await;
493                    self.maybe_snapshot_cf_by_threshold().await;
494                }
495                
496                _ = health_check_interval.tick() => {
497                    self.check_mysql_health().await;
498                }
499                
500                _ = wal_drain_interval.tick() => {
501                    self.maybe_drain_wal().await;
502                }
503                
504                _ = cf_snapshot_interval.tick() => {
505                    self.maybe_snapshot_cf_by_time().await;
506                }
507            }
508        }
509    }
510
511    /// Initiate graceful shutdown
512    #[tracing::instrument(skip(self))]
513    pub async fn shutdown(&self) {
514        use crate::FlushReason;
515        
516        let shutdown_start = std::time::Instant::now();
517        info!("Initiating sync engine shutdown...");
518        let _ = self.state.send(EngineState::ShuttingDown);
519        
520        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
521        if let Some(batch) = batch {
522            let batch_size = batch.items.len();
523            info!(batch_size, "Flushing final L2 batch on shutdown");
524            {
525                let mut batcher = self.l2_batcher.lock().await;
526                batcher.add_batch(batch.items);
527            }
528            self.maybe_flush_l2().await;
529            crate::metrics::record_items_written("L2", batch_size);
530        }
531        
532        self.snapshot_cuckoo_filters("shutdown").await;
533        
534        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
535        info!("Sync engine shutdown complete");
536    }
537}