sync_engine/coordinator/mod.rs

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;

pub use types::{EngineState, ItemStatus, BatchResult};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

use search_api::SearchState;
/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    pub(super) config: SyncEngineConfig,

    /// Runtime config updates
    #[allow(dead_code)]
    pub(super) config_rx: watch::Receiver<SyncEngineConfig>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        Self {
            config: config.clone(),
            config_rx,
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
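    ///
    /// # Example
    ///
    /// A minimal sketch of waiting for readiness via the watch channel
    /// (assumes the engine is started elsewhere):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, EngineState};
    /// # async fn example(engine: &SyncEngine) {
    /// let mut rx = engine.state_receiver();
    /// while !matches!(*rx.borrow(), EngineState::Ready | EngineState::Running) {
    ///     // changed() resolves on every state transition; Err means the
    ///     // engine (the sender) was dropped.
    ///     if rx.changed().await.is_err() {
    ///         break;
    ///     }
    /// }
    /// # }
    /// ```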
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
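    ///
    /// # Example
    ///
    /// A minimal sketch of a caller-side pre-check (assumes a constructed
    /// engine; [`submit`](Self::submit) also enforces this internally):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # fn example(engine: &SyncEngine) {
    /// if !engine.should_accept_writes() {
    ///     eprintln!("backpressure {} at pressure {:.2}; shedding load",
    ///               engine.pressure(), engine.memory_pressure());
    /// }
    /// # }
    /// ```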
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
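    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a started engine; the key is illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// match engine.get("user.profile.42").await? {
    ///     Some(item) => println!("hit: {}", item.object_id),
    ///     None => println!("miss in all tiers"),
    /// }
    /// # Ok(())
    /// # }
    /// ```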
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1 (skipped when L1 is already at/over capacity)
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `merkle_root`, the content hash is verified.
    /// Returns `StorageError::Corruption` if the hash doesn't match.
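    ///
    /// # Example
    ///
    /// A minimal sketch of handling the corruption case (assumes a started
    /// engine; the key is illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.get_verified("some.key").await {
    ///     Ok(Some(item)) => println!("verified: {}", item.object_id),
    ///     Ok(None) => println!("not found"),
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         eprintln!("corrupt {id}: expected {expected}, got {actual}");
    ///     }
    ///     Err(e) => eprintln!("lookup failed: {e}"),
    /// }
    /// # }
    /// ```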
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has merkle_root set
        if !item.merkle_root.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.merkle_root {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.merkle_root,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.merkle_root.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in `flush_batch_internal()`.
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
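    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a started engine):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// let item = SyncItem::new("user.42".into(), b"payload".to_vec());
    /// engine.submit(item).await?; // in L1 now; Redis + SQL on next batch flush
    /// # Ok(())
    /// # }
    /// ```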
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into L2/L3 filters here!
        // Filters are updated only on SUCCESSFUL writes in flush_batch_internal().
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }
    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - entry removed (there is no L2 filter)
    /// - Merkle trees - updated with a deletion marker
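    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a started engine):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// let found = engine.delete("user.42").await?;
    /// println!("existed in at least one tier: {found}");
    /// # Ok(())
    /// # }
    /// ```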
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // DashMap::insert returns the previous value, which tells us whether
        // this was a fresh insert or an update of an existing entry.
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size, add new size
            let old_size = Self::item_size(&old_item);
            // Use saturating arithmetic to avoid underflow if sizes are estimated
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            // Insert: just add new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                // Creation time isn't tracked separately, so approximate both
                // timestamps with the last-access age.
                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,      // 5%
            BackpressureLevel::Throttle => entries.len() / 10,  // 10%
            BackpressureLevel::Critical => entries.len() / 5,   // 20%
            BackpressureLevel::Emergency => entries.len() / 3,  // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,   // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats as `(item_count, total_bytes)`.
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats as `(entries, capacity, trust_state)`.
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification).
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
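    ///
    /// # Example
    ///
    /// A minimal sketch of comparing the two roots (assumes a started engine):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.merkle_roots().await {
    ///     (Some(r), Some(s)) if r == s => println!("L2 and L3 agree"),
    ///     (Some(_), Some(_)) => println!("trees diverge; sync needed"),
    ///     _ => println!("one or both backends unavailable or empty"),
    /// }
    /// # }
    /// ```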
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }
    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Intended to compare the filter's contents against L3's merkle root.
    /// Currently the filter is marked trusted whenever SQL is connected and
    /// reports a root (see the note in the body).
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
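    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a started engine):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.verify_filter().await {
    ///     // Trusted filter: negative lookups now skip the database entirely.
    ///     println!("L3 filter trusted");
    /// }
    /// # }
    /// ```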
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter if we have a SQL root. A full
        // implementation would compare a CF merkle root against the SQL merkle
        // root, but since the CF doesn't maintain a merkle tree, we trust it
        // after warmup.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
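    ///
    /// # Example
    ///
    /// A minimal sketch (e.g., just before a metrics scrape):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # fn example(engine: &SyncEngine) {
    /// engine.update_gauge_metrics();
    /// # }
    /// ```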
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
}