sync_engine/coordinator/
mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;

pub use types::{EngineState, ItemStatus, BatchResult};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

use search_api::SearchState;
/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel).
    /// Uses `RwLock` for interior mutability so `run()` can take `&self`.
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Runtime config updates (Mutex for interior mutability in run loop)
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L2: Direct RedisStore reference for CDC operations
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
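    ///
    /// A sketch of waiting for readiness via the watch channel (assumes
    /// `start()` is being driven elsewhere):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, EngineState};
    /// # async fn example(engine: &SyncEngine) {
    /// let mut rx = engine.state_receiver();
    /// while !matches!(*rx.borrow(), EngineState::Ready | EngineState::Running) {
    ///     if rx.changed().await.is_err() {
    ///         break; // sender dropped: engine is gone
    ///     }
    /// }
    /// # }
    /// ```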
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
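    ///
    /// A sketch of a backpressure-aware producer loop (the 50 ms backoff is
    /// illustrative, not a tuned value):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # async fn example(engine: &SyncEngine, items: Vec<SyncItem>) -> Result<(), sync_engine::StorageError> {
    /// for item in items {
    ///     // Back off instead of letting submit() reject under Emergency/Shutdown.
    ///     while !engine.should_accept_writes() {
    ///         tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    ///     }
    ///     engine.submit(item).await?;
    /// }
    /// # Ok(())
    /// # }
    /// ```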
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
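    ///
    /// # Example
    ///
    /// A sketch of the read path ("user.42" is a hypothetical key):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// if let Some(item) = engine.get("user.42").await? {
    ///     println!("found {} ({} bytes)", item.object_id, item.content.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```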
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check the L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1 only if there is headroom
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `content_hash`, the content is re-hashed and compared.
    /// Returns `StorageError::Corruption` if the hash doesn't match.
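    ///
    /// # Example
    ///
    /// A sketch of handling the corruption case separately from an ordinary miss:
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// match engine.get_verified("user.42").await {
    ///     Ok(Some(item)) => println!("verified {}", item.object_id),
    ///     Ok(None) => println!("not found"),
    ///     Err(StorageError::Corruption { id, .. }) => eprintln!("corrupt item: {id}"),
    ///     Err(e) => return Err(e),
    /// }
    /// # Ok(())
    /// # }
    /// ```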
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has content_hash set
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in `flush_batch_internal()`.
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
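    ///
    /// # Example
    ///
    /// A sketch with default routing ("user.42" is a hypothetical key):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// let item = SyncItem::new("user.42".into(), b"payload".to_vec());
    /// engine.submit(item).await?; // visible in L1 now, L2/L3 on next flush
    /// # Ok(())
    /// # }
    /// ```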
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into the L3 filter here!
        // The filter is updated only on SUCCESSFUL writes in flush_batch_internal().
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - removed (there is no L2 filter)
    /// - Merkle trees - update with deletion marker
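    ///
    /// # Example
    ///
    /// A sketch; the returned flag reports whether any tier held the item:
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// if engine.delete("user.42").await? {
    ///     println!("removed from at least one tier");
    /// }
    /// # Ok(())
    /// # }
    /// ```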
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from the L3 cuckoo filter (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with a deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // 6. Emit CDC delete entry (if enabled)
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // DashMap::insert returns the previous value, so we can distinguish
        // an update (swap old size for new) from a fresh insert (add new size).
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            let old_size = Self::item_size(&old_item);
            // Saturating arithmetic avoids underflow when sizes are estimated;
            // fetch_update keeps the read-modify-write atomic under contention.
            let _ = self.l1_size_bytes.fetch_update(
                Ordering::AcqRel,
                Ordering::Acquire,
                |current| Some(current.saturating_sub(old_size).saturating_add(new_size)),
            );
        } else {
            // Fresh insert: just add the new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    /// Evict L1 entries when memory pressure crosses the warn threshold.
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to an Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default to 1 hour if never accessed
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,      // 5%
            BackpressureLevel::Throttle => entries.len() / 10,  // 10%
            BackpressureLevel::Critical => entries.len() / 5,   // 20%
            BackpressureLevel::Emergency => entries.len() / 3,  // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,   // 50%
        }.max(1);

        // Select victims using the tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats as `(item_count, total_bytes)`.
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats as `(entries, capacity, trust_state)`.
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification).
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
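    ///
    /// # Example
    ///
    /// A sketch of checking the two roots for divergence:
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if let (Some(r), Some(s)) = (&redis_root, &sql_root) {
    ///     if r != s {
    ///         eprintln!("L2/L3 merkle divergence: {r} vs {s}");
    ///     }
    /// }
    /// # }
    /// ```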
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Intended to compare the filter's contents against L3's merkle root;
    /// for now it marks the filter trusted once SQL is connected and reports
    /// a root, since the filter itself maintains no merkle tree to compare.
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
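    ///
    /// A sketch of gating startup on filter trust:
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.verify_filter().await {
    ///     let (entries, capacity, _trust) = engine.l3_filter_stats();
    ///     println!("L3 filter trusted: {entries}/{capacity} entries");
    /// }
    /// # }
    /// ```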
    pub async fn verify_filter(&self) -> bool {
        // Get the SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - nothing to verify against, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter whenever we have a SQL root.
        // A full implementation would compare a CF merkle root against the
        // SQL merkle root, but the CF doesn't maintain a merkle tree, so we
        // trust it after warmup.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics so gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
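    ///
    /// A sketch of a periodic refresh task (the 10 s interval is illustrative):
    ///
    /// ```rust,no_run
    /// # use std::sync::Arc;
    /// # use sync_engine::SyncEngine;
    /// # fn example(engine: Arc<SyncEngine>) {
    /// tokio::spawn(async move {
    ///     let mut tick = tokio::time::interval(std::time::Duration::from_secs(10));
    ///     loop {
    ///         tick.tick().await;
    ///         engine.update_gauge_metrics();
    ///     }
    /// });
    /// # }
    /// ```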
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert a larger item with the same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }
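
    // Added check (sketch): the receiver returned by state_receiver()
    // observes the initial Created state.
    #[test]
    fn test_state_receiver_initial_state() {
        let engine = create_test_engine();
        let rx = engine.state_receiver();
        assert_eq!(*rx.borrow(), EngineState::Created);
    }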
}