sync_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
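//!
//! Downstream tasks can observe these transitions through the engine's state
//! watch channel. A minimal sketch (assuming the engine is started elsewhere):
//!
//! ```rust,ignore
//! let mut state_rx = engine.state_receiver();
//! tokio::spawn(async move {
//!     while state_rx.changed().await.is_ok() {
//!         // EngineState implements Display, so it can be logged directly.
//!         println!("engine state: {}", *state_rx.borrow());
//!     }
//! });
//! ```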
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```
mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;

pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex, Semaphore};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

use search_api::SearchState;

/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
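///
/// A typical deployment wraps the engine in an [`Arc`] and shares it across
/// tasks; a sketch (assuming `run()` is driven from a background task, as the
/// field comments below suggest):
///
/// ```rust,ignore
/// let engine = Arc::new(SyncEngine::new(config, config_rx));
/// let runner = Arc::clone(&engine);
/// tokio::spawn(async move { runner.run().await });
/// ```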
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    /// Uses RwLock for interior mutability so run() can take &self
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Runtime config updates (Mutex for interior mutability in run loop)
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L2: Direct RedisStore reference for CDC operations
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// SQL write concurrency limiter.
    ///
    /// Limits concurrent sql_put_batch and merkle_nodes updates to reduce
    /// row-level lock contention and deadlocks under high load.
    pub(super) sql_write_semaphore: Arc<Semaphore>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        // SQL write concurrency limiter - reduces row lock contention
        let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
            sql_write_semaphore,
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
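    ///
    /// A caller-side gating sketch (hedged; `submit` also performs this check
    /// internally before accepting an item):
    ///
    /// ```rust,ignore
    /// if engine.should_accept_writes() {
    ///     engine.submit(item).await?;
    /// } else {
    ///     // Shed load at the edge instead of queueing more work.
    ///     return Err(StorageError::Backend("backpressure".into()));
    /// }
    /// ```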
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    /// Perform a comprehensive health check.
    ///
    /// This probes backend connectivity (Redis PING, SQL SELECT 1) and
    /// collects internal state into a [`HealthCheck`] struct suitable for
    /// `/ready` and `/health` endpoints.
    ///
    /// # Performance
    ///
    /// - **Cached fields**: Instant (no I/O)
    /// - **Live probes**: Redis PING + SQL SELECT 1 (parallel, ~1-10ms each)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint (load balancer)
    /// if health.healthy {
    ///     HttpResponse::Ok().body("ready")
    /// } else {
    ///     HttpResponse::ServiceUnavailable().body("not ready")
    /// }
    ///
    /// // For /health endpoint (diagnostics)
    /// HttpResponse::Ok().json(health)
    /// ```
    pub async fn health_check(&self) -> types::HealthCheck {
        // Cached state (no I/O)
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        // WAL stats (cached, no I/O)
        // Note: WalStats doesn't have file_size, so we only report pending items
        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Live backend probes (parallel)
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // Derive overall health
        // Healthy if: running, backends connected (or not configured), not in critical backpressure
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }

    /// Probe Redis connectivity with PING.
    async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref redis_store) = self.redis_store else {
            return (None, None); // Redis not configured
        };

        let start = std::time::Instant::now();
        let mut conn = redis_store.connection();

        let result: Result<String, _> = redis::cmd("PING")
            .query_async(&mut conn)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Probe SQL connectivity with SELECT 1.
    async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref sql_store) = self.sql_store else {
            return (None, None); // SQL not configured
        };

        let start = std::time::Instant::now();
        let pool = sql_store.pool();

        let result = sqlx::query("SELECT 1")
            .fetch_one(&pool)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
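    ///
    /// A minimal usage sketch (assuming a started engine):
    ///
    /// ```rust,ignore
    /// match engine.get("user.123").await? {
    ///     Some(item) => println!("hit: {} bytes", item.content.len()),
    ///     None => println!("miss in all tiers"),
    /// }
    /// ```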
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }

                        // Also populate L2 (read-through cache)
                        if let Some(ref l2) = self.l2_store {
                            if let Err(e) = l2.put(&item).await {
                                warn!(id = %id, error = %e, "Failed to populate L2 on read");
                            } else {
                                debug!("L3 hit, promoted to L1 and L2");
                            }
                        } else {
                            debug!("L3 hit, promoted to L1");
                        }

                        tracing::Span::current().record("tier", "L3");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }
    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `content_hash`, the content is re-hashed
    /// and compared against it. Returns `StorageError::Corruption` on mismatch.
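    ///
    /// Callers that treat corruption as fatal can match on the error variant;
    /// a sketch:
    ///
    /// ```rust,ignore
    /// match engine.get_verified("user.123").await {
    ///     Ok(Some(item)) => { /* hash matched, or item had no hash set */ }
    ///     Ok(None) => { /* not found in any tier */ }
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         eprintln!("corrupt item {id}: expected {expected}, got {actual}");
    ///     }
    ///     Err(e) => { /* backend error */ }
    /// }
    /// ```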
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has content_hash set
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in flush_batch_internal().
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into L2/L3 filters here!
        // Filters are updated only on SUCCESSFUL writes in flush_batch_internal()
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            // Flush immediately (provides backpressure)
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - immediate removal (L2 has no filter)
    /// - Merkle trees - update with deletion marker
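    ///
    /// Returns `Ok(true)` if the item was found in at least one tier; a sketch:
    ///
    /// ```rust,ignore
    /// let removed = engine.delete("user.123").await?;
    /// if !removed {
    ///     // Item was not present in L1, L2, or L3.
    /// }
    /// ```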
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // 6. Emit CDC delete entry (if enabled)
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // DashMap::insert returns the previous value, which tells us whether
        // this was an insert or an update
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size, add new size
            let old_size = Self::item_size(&old_item);
            // Use saturating operations to avoid underflow if sizes are estimated
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            // Insert: just add new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

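    /// Run pressure-based eviction when memory pressure crosses the warn
    /// threshold, removing a pressure-scaled fraction of L1 entries chosen
    /// by the tan-curve policy.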
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,    // 5%
            BackpressureLevel::Throttle => entries.len() / 10, // 10%
            BackpressureLevel::Critical => entries.len() / 5,  // 20%
            BackpressureLevel::Emergency => entries.len() / 3, // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,  // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats as `(item_count, total_bytes)`.
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats (entries, capacity, trust_state)
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification)
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
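    ///
    /// A sketch of using the roots for an L2/L3 consistency check:
    ///
    /// ```rust,ignore
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if redis_root.is_some() && redis_root == sql_root {
    ///     // L2 and L3 agree on the same key/hash set.
    /// }
    /// ```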
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Compares the filter's merkle root against L3's merkle root.
    /// If they match, marks the filter as trusted.
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - nothing to verify against, so mark trusted
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter whenever SQL is connected and has a root.
        // A full implementation would compare a CF-derived merkle root against
        // the SQL merkle root, but since the CF doesn't maintain a merkle tree,
        // we trust the filter after warmup instead.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
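    ///
    /// A call-site sketch (`metrics_exporter.snapshot()` is a hypothetical
    /// exporter hook, not part of this crate's metrics API):
    ///
    /// ```rust,ignore
    /// engine.update_gauge_metrics();
    /// let snapshot = metrics_exporter.snapshot(); // hypothetical
    /// ```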
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        // Engine is in Created state (not started)
        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready); // Not ready until started
        assert!(!health.healthy); // Not healthy (not Running)

        // Memory should be empty
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        // Backpressure should be normal
        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No backends configured
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}