sync_engine/coordinator/
lifecycle.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Engine lifecycle management: start, shutdown, run loop.
5//!
6//! This module contains the startup sequence, main run loop, and shutdown logic.
7
8use tracing::{info, warn, debug, error};
9
10use crate::storage::traits::StorageError;
11use crate::cuckoo::{FilterPersistence, L3_FILTER_ID};
12use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch, PathMerkle};
13use crate::resilience::wal::WriteAheadLog;
14
15use super::{SyncEngine, EngineState};
16#[allow(unused_imports)]
17use super::WriteTarget;
18
19impl SyncEngine {
20    /// Start the engine - connect to backends with proper startup sequence.
21    /// 
22    /// Startup flow (trust hierarchy):
23    /// 1. Initialize WAL (SQLite) - always first, our durability lifeline
24    /// 2. Connect to SQL (L3) - ground truth, initialize SQL merkle store
25    /// 3. Drain any pending WAL entries to SQL (blocking)
26    /// 4. Get SQL merkle root (this is our trusted root)
27    /// 5. Load CF snapshots from SQLite:
28    ///    - If snapshot merkle root matches SQL root → CF is trusted
29    ///    - Otherwise → CF must be rebuilt from SQL scan
30    /// 6. Connect to Redis (L2) - cache layer
31    /// 7. Compare Redis merkle root with SQL merkle root
32    ///    - If match → Redis is synced
33    ///    - If mismatch → use branch diff to find stale regions and resync
34    /// 8. Ready!
35    #[tracing::instrument(skip(self), fields(has_redis, has_sql))]
36    pub async fn start(&mut self) -> Result<(), StorageError> {
37        let startup_start = std::time::Instant::now();
38        info!("Starting sync engine with trust-verified startup...");
39        let _ = self.state.send(EngineState::Connecting);
40
41        // ========== PHASE 1: Initialize WAL (always first) ==========
42        let phase_start = std::time::Instant::now();
43        let (wal_path, wal_max_items) = {
44            let cfg = self.config.read();
45            (
46                cfg.wal_path.clone().unwrap_or_else(|| "./sync_engine_wal.db".to_string()),
47                cfg.wal_max_items.unwrap_or(1_000_000),
48            )
49        };
50        
51        let wal = match WriteAheadLog::new(&wal_path, wal_max_items).await {
52            Ok(wal) => {
53                info!(path = %wal_path, "Write-ahead log initialized");
54                crate::metrics::record_startup_phase("wal_init", phase_start.elapsed());
55                wal
56            }
57            Err(e) => {
58                crate::metrics::record_error("WAL", "init", "sqlite");
59                return Err(StorageError::Backend(format!(
60                    "Failed to initialize WAL at {}: {}. Cannot guarantee durability!",
61                    wal_path, e
62                )));
63            }
64        };
65        
66        // Initialize filter persistence (uses same WAL SQLite)
67        match FilterPersistence::new(&wal_path).await {
68            Ok(fp) => {
69                self.filter_persistence = Some(fp);
70            }
71            Err(e) => {
72                warn!(error = %e, "Failed to initialize filter persistence - CF snapshots disabled");
73                crate::metrics::record_error("filter", "init", "persistence");
74            }
75        }
76        
77        let pending_count = if wal.has_pending() {
78            wal.stats(false).pending_items
79        } else {
80            0
81        };
82        self.l3_wal = Some(wal);
83
84        // ========== PHASE 2: Connect to SQL (L3 - ground truth) ==========
85        let phase_start = std::time::Instant::now();
86        let sql_url = self.config.read().sql_url.clone();
87        if let Some(ref sql_url) = sql_url {
88            info!(url = %sql_url, "Connecting to SQL (L3 - ground truth)...");
89            // Pass shared schema registry to SqlStore for table routing
90            match crate::storage::sql::SqlStore::with_registry(sql_url, self.schema_registry.clone()).await {
91                Ok(store) => {
92                    // Initialize SQL merkle store (ground truth) - shares pool with SqlStore
93                    let is_sqlite = sql_url.starts_with("sqlite:");
94                    let sql_merkle = SqlMerkleStore::from_pool(store.pool(), is_sqlite);
95                    if let Err(e) = sql_merkle.init_schema().await {
96                        error!(error = %e, "Failed to initialize SQL merkle schema");
97                        crate::metrics::record_error("L3", "init", "merkle_schema");
98                        return Err(StorageError::Backend(format!(
99                            "Failed to initialize SQL merkle schema: {}", e
100                        )));
101                    }
102                    self.sql_merkle = Some(sql_merkle);
103                    
104                    // Keep both Arc<dyn ArchiveStore> and Arc<SqlStore> for dirty merkle access
105                    let store = std::sync::Arc::new(store);
106                    self.sql_store = Some(store.clone());
107                    self.l3_store = Some(store);
108                    tracing::Span::current().record("has_sql", true);
109                    self.mysql_health.record_success();
110                    crate::metrics::set_backend_healthy("mysql", true);
111                    crate::metrics::record_startup_phase("sql_connect", phase_start.elapsed());
112                    info!("SQL (L3) connected with merkle store (ground truth)");
113                }
114                Err(e) => {
115                    tracing::Span::current().record("has_sql", false);
116                    error!(error = %e, "Failed to connect to SQL - this is required for startup");
117                    self.mysql_health.record_failure();
118                    crate::metrics::set_backend_healthy("mysql", false);
119                    crate::metrics::record_connection_error("mysql");
120                    return Err(StorageError::Backend(format!(
121                        "SQL connection required for startup: {}", e
122                    )));
123                }
124            }
125        } else {
126            warn!("No SQL URL configured - operating without ground truth storage!");
127            tracing::Span::current().record("has_sql", false);
128        }
129
130        // ========== PHASE 3: Drain WAL to SQL ==========
131        if pending_count > 0 {
132            let phase_start = std::time::Instant::now();
133            let _ = self.state.send(EngineState::DrainingWal);
134            info!(pending = pending_count, "Draining WAL to SQL before startup...");
135            
136            if let Some(ref l3) = self.l3_store {
137                if let Some(ref wal) = self.l3_wal {
138                    match wal.drain_to(l3.as_ref(), pending_count as usize).await {
139                        Ok(drained) => {
140                            info!(drained = drained.len(), "WAL drained to SQL");
141                            crate::metrics::record_items_written("L3", drained.len());
142                        }
143                        Err(e) => {
144                            warn!(error = %e, "WAL drain had errors - some items may retry later");
145                            crate::metrics::record_error("WAL", "drain", "partial");
146                        }
147                    }
148                }
149            }
150            crate::metrics::record_startup_phase("wal_drain", phase_start.elapsed());
151        }
152        
153        // ========== PHASE 4: Get SQL merkle root (trusted root) ==========
154        let sql_root: Option<[u8; 32]> = if let Some(ref sql_merkle) = self.sql_merkle {
155            match sql_merkle.root_hash().await {
156                Ok(Some(root)) => {
157                    info!(root = %hex::encode(root), "SQL merkle root (ground truth)");
158                    Some(root)
159                }
160                Ok(None) => {
161                    info!("SQL merkle tree is empty (no data yet)");
162                    None
163                }
164                Err(e) => {
165                    warn!(error = %e, "Failed to get SQL merkle root");
166                    None
167                }
168            }
169        } else {
170            None
171        };
172        
173        // ========== PHASE 5: Restore CF from snapshot (if valid) ==========
174        let phase_start = std::time::Instant::now();
175        self.restore_cuckoo_filters(&sql_root).await;
176        crate::metrics::record_startup_phase("cf_restore", phase_start.elapsed());
177
178        // ========== PHASE 6: Connect to Redis (L2 - cache) ==========
179        let phase_start = std::time::Instant::now();
180        let (redis_url, redis_prefix) = {
181            let cfg = self.config.read();
182            (cfg.redis_url.clone(), cfg.redis_prefix.clone())
183        };
184        if let Some(ref redis_url) = redis_url {
185            info!(url = %redis_url, prefix = ?redis_prefix, "Connecting to Redis (L2 - cache)...");
186            match crate::storage::redis::RedisStore::with_prefix(redis_url, redis_prefix.as_deref()).await {
187                Ok(store) => {
188                    let redis_merkle = RedisMerkleStore::with_prefix(
189                        store.connection(),
190                        redis_prefix.as_deref(),
191                    );
192                    self.redis_merkle = Some(redis_merkle);
193                    let store = std::sync::Arc::new(store);
194                    self.redis_store = Some(store.clone());  // Keep direct reference for CDC
195                    self.l2_store = Some(store);
196                    tracing::Span::current().record("has_redis", true);
197                    crate::metrics::set_backend_healthy("redis", true);
198                    crate::metrics::record_startup_phase("redis_connect", phase_start.elapsed());
199                    info!("Redis (L2) connected with merkle shadow tree");
200                }
201                Err(e) => {
202                    tracing::Span::current().record("has_redis", false);
203                    warn!(error = %e, "Failed to connect to Redis, continuing without L2 cache");
204                    crate::metrics::set_backend_healthy("redis", false);
205                    crate::metrics::record_connection_error("redis");
206                }
207            }
208        } else {
209            tracing::Span::current().record("has_redis", false);
210        }
211
212        // ========== PHASE 7: Sync Redis with SQL via branch diff ==========
213        if let (Some(ref sql_merkle), Some(ref redis_merkle), Some(ref sql_root)) = 
214            (&self.sql_merkle, &self.redis_merkle, &sql_root) 
215        {
216            let phase_start = std::time::Instant::now();
217            let _ = self.state.send(EngineState::SyncingRedis);
218            
219            match redis_merkle.root_hash().await {
220                Ok(Some(redis_root)) if &redis_root == sql_root => {
221                    info!("Redis merkle root matches SQL - Redis is in sync");
222                }
223                Ok(Some(redis_root)) => {
224                    info!(
225                        sql_root = %hex::encode(sql_root),
226                        redis_root = %hex::encode(redis_root),
227                        "Redis merkle root mismatch - initiating branch diff sync"
228                    );
229                    
230                    match self.sync_redis_from_sql_diff(sql_merkle, redis_merkle).await {
231                        Ok(synced) => {
232                            info!(items_synced = synced, "Redis sync complete via branch diff");
233                            crate::metrics::record_items_written("L2", synced);
234                        }
235                        Err(e) => {
236                            warn!(error = %e, "Branch diff sync failed - Redis may be stale");
237                            crate::metrics::record_error("L2", "sync", "branch_diff");
238                        }
239                    }
240                }
241                Ok(None) => {
242                    info!("Redis merkle tree is empty - will be populated on writes");
243                }
244                Err(e) => {
245                    warn!(error = %e, "Failed to get Redis merkle root - Redis may be stale");
246                    crate::metrics::record_error("L2", "merkle", "root_hash");
247                }
248            }
249            crate::metrics::record_startup_phase("redis_sync", phase_start.elapsed());
250        }
251
252        let _ = self.state.send(EngineState::Ready);
253        crate::metrics::record_startup_total(startup_start.elapsed());
254        info!("Sync engine ready (trust-verified startup complete)");
255        Ok(())
256    }
257    
258    /// Restore cuckoo filters from snapshots if merkle roots match
259    async fn restore_cuckoo_filters(&self, sql_root: &Option<[u8; 32]>) {
260        let persistence = match &self.filter_persistence {
261            Some(p) => p,
262            None => return,
263        };
264        
265        let sql_root = match sql_root {
266            Some(r) => r,
267            None => return,
268        };
269        
270        // Note: L2 filter removed (TTL makes it untrustworthy - use Redis EXISTS)
271        
272        // Try L3 filter
273        match persistence.load(L3_FILTER_ID).await {
274            Ok(Some(state)) if &state.merkle_root == sql_root => {
275                if let Err(e) = self.l3_filter.import(&state.filter_bytes) {
276                    warn!(error = %e, "Failed to import L3 filter from snapshot");
277                } else {
278                    self.l3_filter.mark_trusted();
279                    info!(entries = state.entry_count, "Restored L3 cuckoo filter from snapshot");
280                }
281            }
282            Ok(Some(_)) => warn!("L3 CF snapshot merkle root mismatch - filter will be rebuilt"),
283            Ok(None) => info!("No L3 CF snapshot found - filter will be built on warmup"),
284            Err(e) => warn!(error = %e, "Failed to load L3 CF snapshot"),
285        }
286    }
287    
288    /// Sync Redis from SQL by diffing merkle trees and only syncing stale branches.
289    async fn sync_redis_from_sql_diff(
290        &self,
291        sql_merkle: &SqlMerkleStore,
292        redis_merkle: &RedisMerkleStore,
293    ) -> Result<usize, StorageError> {
294        let mut total_synced = 0;
295        let stale_prefixes = self.find_stale_branches(sql_merkle, redis_merkle, "").await?;
296        
297        for prefix in stale_prefixes {
298            info!(prefix = %prefix, "Syncing stale branch from SQL to Redis");
299            
300            let leaf_paths = sql_merkle.get_leaves_under(&prefix).await
301                .map_err(|e| StorageError::Backend(format!("Failed to get leaves: {}", e)))?;
302            
303            if leaf_paths.is_empty() {
304                continue;
305            }
306            
307            let mut merkle_batch = MerkleBatch::new();
308            
309            if let Some(ref l3_store) = self.l3_store {
310                for object_id in &leaf_paths {
311                    if let Ok(Some(item)) = l3_store.get(object_id).await {
312                        let payload_hash = PathMerkle::payload_hash(&item.content);
313                        let leaf_hash = PathMerkle::leaf_hash(
314                            &item.object_id,
315                            item.version,
316                            &payload_hash,
317                        );
318                        merkle_batch.insert(object_id.clone(), leaf_hash);
319                        
320                        if let Some(ref l2_store) = self.l2_store {
321                            if let Err(e) = l2_store.put(&item).await {
322                                warn!(id = %object_id, error = %e, "Failed to sync item to Redis");
323                            } else {
324                                total_synced += 1;
325                            }
326                        }
327                    }
328                }
329                
330                if !merkle_batch.is_empty() {
331                    if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
332                        warn!(prefix = %prefix, error = %e, "Failed to update Redis merkle");
333                    }
334                }
335            }
336        }
337        
338        Ok(total_synced)
339    }
340    
341    /// Find stale branches by recursively comparing SQL and Redis merkle nodes.
342    async fn find_stale_branches(
343        &self,
344        sql_merkle: &SqlMerkleStore,
345        redis_merkle: &RedisMerkleStore,
346        prefix: &str,
347    ) -> Result<Vec<String>, StorageError> {
348        let mut stale = Vec::new();
349        
350        let sql_children = sql_merkle.get_children(prefix).await
351            .map_err(|e| StorageError::Backend(format!("SQL merkle error: {}", e)))?;
352        let redis_children = redis_merkle.get_children(prefix).await
353            .map_err(|e| StorageError::Backend(format!("Redis merkle error: {}", e)))?;
354        
355        if sql_children.is_empty() {
356            return Ok(stale);
357        }
358        
359        for (child_path, sql_hash) in sql_children {
360            match redis_children.get(&child_path) {
361                Some(redis_hash) if redis_hash == &sql_hash => continue,
362                Some(_) => {
363                    if child_path.contains('.') && !child_path.ends_with('.') {
364                        stale.push(child_path);
365                    } else {
366                        let sub_stale = Box::pin(
367                            self.find_stale_branches(sql_merkle, redis_merkle, &child_path)
368                        ).await?;
369                        stale.extend(sub_stale);
370                    }
371                }
372                None => stale.push(child_path),
373            }
374        }
375        
376        Ok(stale)
377    }
378
379    /// Warm up L1 cache from L2/L3 (call after start)
380    #[tracing::instrument(skip(self))]
381    pub async fn warm_up(&mut self) -> Result<(), StorageError> {
382        let _ = self.state.send(EngineState::WarmingUp);
383        info!("Warming up cuckoo filter and L1 cache...");
384        
385        if let Some(l3) = &self.l3_store {
386            let batch_size = self.config.read().cuckoo_warmup_batch_size;
387            info!(batch_size, "Warming L3 cuckoo filter from MySQL...");
388            
389            let total_count = l3.count_all().await.unwrap_or(0);
390            if total_count > 0 {
391                let mut offset = 0u64;
392                let mut loaded = 0usize;
393                
394                loop {
395                    let keys = l3.scan_keys(offset, batch_size).await?;
396                    if keys.is_empty() {
397                        break;
398                    }
399                    
400                    for key in &keys {
401                        self.l3_filter.insert(key);
402                    }
403                    
404                    loaded += keys.len();
405                    offset += keys.len() as u64;
406                    
407                    if loaded % 10_000 == 0 || loaded == total_count as usize {
408                        debug!(loaded, total = %total_count, "L3 filter warmup progress");
409                    }
410                }
411                
412                self.l3_filter.mark_trusted();
413                info!(loaded, trust_state = ?self.l3_filter.trust_state(), "L3 cuckoo filter warmup complete");
414            } else {
415                info!("L3 store is empty, skipping filter warmup");
416                self.l3_filter.mark_trusted();
417            }
418        }
419        
420        info!(
421            l3_trust = ?self.l3_filter.trust_state(),
422            "Cuckoo filter warmup complete (L3 only, Redis uses EXISTS)"
423        );
424
425        let _ = self.state.send(EngineState::Ready);
426        info!("Warm-up complete, engine ready");
427        Ok(())
428    }
429
430    /// Perform one tick of maintenance (for manual control instead of run loop).
431    pub async fn tick(&self) {
432        self.maybe_evict();
433        self.maybe_flush_l2().await;
434    }
435
436    /// Force flush all pending L2 batches immediately.
437    pub async fn force_flush(&self) {
438        let batch = self.l2_batcher.lock().await.force_flush();
439        if let Some(batch) = batch {
440            debug!(batch_size = batch.items.len(), "Force flushing L2 batch");
441            self.flush_batch_internal(batch).await;
442        }
443    }
444
445    /// Run the main event loop.
446    /// 
447    /// This method takes `&self` (not `&mut self`) so the engine can be
448    /// shared via `Arc` while the run loop executes in a background task.
449    /// 
450    /// # Example
451    /// 
452    /// ```rust,ignore
453    /// let engine = Arc::new(engine);
454    /// let engine_clone = engine.clone();
455    /// tokio::spawn(async move {
456    ///     engine_clone.run().await;
457    /// });
458    /// ```
459    #[tracing::instrument(skip(self))]
460    pub async fn run(&self) {
461        let _ = self.state.send(EngineState::Running);
462        info!("Sync engine running");
463
464        let mut health_check_interval = tokio::time::interval(
465            tokio::time::Duration::from_secs(30)
466        );
467        let mut wal_drain_interval = tokio::time::interval(
468            tokio::time::Duration::from_secs(5)
469        );
470        let cf_snapshot_secs = self.config.read().cf_snapshot_interval_secs;
471        let mut cf_snapshot_interval = tokio::time::interval(
472            tokio::time::Duration::from_secs(cf_snapshot_secs)
473        );
474
475        loop {
476            // Lock config_rx only for the check, not across awaits
477            let config_changed = {
478                let rx = self.config_rx.lock().await;
479                // Use poll-style check to avoid holding lock
480                rx.has_changed().unwrap_or(false)
481            };
482            
483            if config_changed {
484                let new_config = self.config_rx.lock().await.borrow_and_update().clone();
485                info!("Config hot-reloaded: l1_max_bytes={}, redis_url={:?}", 
486                    new_config.l1_max_bytes,
487                    new_config.redis_url.as_ref().map(|u| u.rsplit('@').next().unwrap_or(u)));
488                *self.config.write() = new_config;
489            }
490            
491            tokio::select! {
492                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
493                    self.maybe_evict();
494                    self.maybe_flush_l2().await;
495                    self.maybe_snapshot_cf_by_threshold().await;
496                }
497                
498                _ = health_check_interval.tick() => {
499                    self.check_mysql_health().await;
500                }
501                
502                _ = wal_drain_interval.tick() => {
503                    self.maybe_drain_wal().await;
504                }
505                
506                _ = cf_snapshot_interval.tick() => {
507                    self.maybe_snapshot_cf_by_time().await;
508                }
509            }
510        }
511    }
512
513    /// Initiate graceful shutdown
514    #[tracing::instrument(skip(self))]
515    pub async fn shutdown(&self) {
516        use crate::FlushReason;
517
518        let shutdown_start = std::time::Instant::now();
519        info!("Initiating sync engine shutdown...");
520        let _ = self.state.send(EngineState::ShuttingDown);
521        
522        let batch = self.l2_batcher.lock().await.force_flush_with_reason(FlushReason::Shutdown);
523        if let Some(batch) = batch {
524            let batch_size = batch.items.len();
525            info!(batch_size, "Flushing final L2 batch on shutdown");
526            {
527                let mut batcher = self.l2_batcher.lock().await;
528                batcher.add_batch(batch.items);
529            }
530            self.maybe_flush_l2().await;
531            crate::metrics::record_items_written("L2", batch_size);
532        }
533        
534        self.snapshot_cuckoo_filters("shutdown").await;
535        
536        crate::metrics::record_startup_phase("shutdown", shutdown_start.elapsed());
537        info!("Sync engine shutdown complete");
538    }
539}