sync_engine/coordinator/
mod.rs

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;

pub use types::{EngineState, ItemStatus, BatchResult};
pub use merkle_api::MerkleDiff;
#[allow(unused_imports)]
use types::WriteTarget;
46
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use tokio::sync::{watch, Mutex};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
66
/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
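///
/// # Example
///
/// A minimal sharing sketch (assumes a configured engine; connecting via
/// `start()` is elided, as in the module example above; `some.key` is a
/// placeholder):
///
/// ```rust,no_run
/// # use sync_engine::{SyncEngine, SyncEngineConfig};
/// # use std::sync::Arc;
/// # use tokio::sync::watch;
/// # async fn example() {
/// let config = SyncEngineConfig::default();
/// let (_tx, rx) = watch::channel(config.clone());
/// let engine = Arc::new(SyncEngine::new(config, rx));
///
/// // Clone the Arc for each task; all tasks share one engine.
/// let e = Arc::clone(&engine);
/// tokio::spawn(async move {
///     let _ = e.get("some.key").await;
/// });
/// # }
/// ```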
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    pub(super) config: SyncEngineConfig,

    /// Runtime config updates
    #[allow(dead_code)]
    pub(super) config_rx: watch::Receiver<SyncEngineConfig>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        Self {
            config: config.clone(),
            config_rx,
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
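    ///
    /// A sketch of waiting for readiness (assumes the engine is started
    /// elsewhere):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, EngineState};
    /// # async fn example(engine: &SyncEngine) {
    /// let mut rx = engine.state_receiver();
    /// while !matches!(*rx.borrow(), EngineState::Ready | EngineState::Running) {
    ///     if rx.changed().await.is_err() {
    ///         break; // sender dropped; engine shut down
    ///     }
    /// }
    /// # }
    /// ```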
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
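    ///
    /// A pre-flight sketch for callers that want to shed load early rather
    /// than have `submit` return a backpressure error:
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # fn example(engine: &SyncEngine) {
    /// if !engine.should_accept_writes() {
    ///     eprintln!("backpressure {}: shedding write", engine.pressure());
    /// }
    /// # }
    /// ```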
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
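    ///
    /// # Example
    ///
    /// A minimal lookup sketch (`user.42` is a placeholder key):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// match engine.get("user.42").await? {
    ///     Some(item) => println!("hit: {} ({} bytes)", item.object_id, item.content.len()),
    ///     None => println!("miss in all tiers"),
    /// }
    /// # Ok(())
    /// # }
    /// ```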
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `merkle_root`, the content hash is verified.
    /// Returns `StorageError::Corruption` if the hash doesn't match.
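    ///
    /// A sketch of handling a corruption report (`user.42` is a placeholder key):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.get_verified("user.42").await {
    ///     Ok(_) => { /* verified, or the item had no merkle_root to check */ }
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         eprintln!("corrupt {id}: expected {expected}, got {actual}");
    ///     }
    ///     Err(e) => eprintln!("lookup failed: {e}"),
    /// }
    /// # }
    /// ```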
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has merkle_root set
        if !item.merkle_root.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.merkle_root {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.merkle_root,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.merkle_root.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in `flush_batch_internal()`.
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into L2/L3 filters here!
        // Filters are updated only on SUCCESSFUL writes in flush_batch_internal()
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - removal (there is no L2 filter; TTL makes it unreliable)
    /// - Merkle trees - update with deletion marker
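    ///
    /// A minimal sketch; the returned `bool` is a best-effort indicator that
    /// the delete took effect in at least one tier (`user.42` is a placeholder):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// if engine.delete("user.42").await? {
    ///     println!("removed from at least one tier");
    /// }
    /// # Ok(())
    /// # }
    /// ```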
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // DashMap::insert returns the previous value, which distinguishes
        // an update from a fresh insert
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size, add new size in one atomic
            // read-modify-write (a separate load + store could race with
            // concurrent inserts); saturate to avoid underflow if sizes
            // are estimated
            let old_size = Self::item_size(&old_item);
            let _ = self.l1_size_bytes.fetch_update(Ordering::Release, Ordering::Acquire, |current| {
                Some(current.saturating_sub(old_size).saturating_add(new_size))
            });
        } else {
            // Insert: just add new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                // checked_sub: subtracting a large age from a young Instant
                // (e.g. shortly after boot) would otherwise panic
                let accessed_at = now
                    .checked_sub(std::time::Duration::from_secs_f64(age_secs))
                    .unwrap_or(now);

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: accessed_at,
                    last_access: accessed_at,
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,      // 5%
            BackpressureLevel::Throttle => entries.len() / 10,  // 10%
            BackpressureLevel::Critical => entries.len() / 5,   // 20%
            BackpressureLevel::Emergency => entries.len() / 3,  // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,   // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats: `(item_count, total_bytes)`.
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats (entries, capacity, trust_state)
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification)
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
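    ///
    /// A divergence-check sketch (assumes both backends are connected):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if let (Some(r), Some(s)) = (redis_root, sql_root) {
    ///     if r != s {
    ///         eprintln!("L2/L3 merkle roots diverge: {r} vs {s}");
    ///     }
    /// }
    /// # }
    /// ```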
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Currently this checks that L3 is connected and has a merkle root, then
    /// marks the filter as trusted (the filter itself keeps no merkle tree to
    /// compare against; see the comments in the body).
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
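    ///
    /// A usage sketch during startup:
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.verify_filter().await {
    ///     // Negative filter lookups can now be trusted to skip L3 queries.
    /// }
    /// # }
    /// ```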
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter whenever we have a SQL root.
        // A full implementation would compare a filter-side merkle root
        // against the SQL merkle root, but since the cuckoo filter doesn't
        // maintain a merkle tree, we trust it after warmup.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
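    ///
    /// A minimal export-loop sketch (the actual snapshot/export step depends
    /// on your metrics backend and is elided):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # fn example(engine: &SyncEngine) {
    /// engine.update_gauge_metrics();
    /// // ...then snapshot/export the metrics registry
    /// # }
    /// ```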
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}