sync_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, SyncItem, EngineState};
//! use serde_json::json;
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```
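//!
//! # Watching state
//!
//! A minimal sketch (caller code, not part of the engine) of observing
//! lifecycle transitions through the watch channel returned by
//! [`SyncEngine::state_receiver`]:
//!
//! ```rust,ignore
//! let mut state_rx = engine.state_receiver();
//! tokio::spawn(async move {
//!     // Fires once per state change until the sender side is dropped
//!     while state_rx.changed().await.is_ok() {
//!         println!("engine state: {}", *state_rx.borrow());
//!     }
//! });
//! ```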

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;
mod schema_api;

pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex, Semaphore};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};
use crate::schema::SchemaRegistry;

use search_api::SearchState;

/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel).
    /// Uses RwLock for interior mutability so run() can take &self.
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Runtime config updates (Mutex for interior mutability in run loop)
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L2: Direct RedisStore reference for CDC operations
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,

    /// SQL write concurrency limiter.
    ///
    /// Limits concurrent sql_put_batch and merkle_nodes updates to reduce
    /// row-level lock contention and deadlocks under high load.
    pub(super) sql_write_semaphore: Arc<Semaphore>,

    /// Schema registry for table routing.
    ///
    /// Maps key prefixes to separate SQL tables for horizontal scaling.
    /// Shared with SqlStore for consistent routing.
    pub(super) schema_registry: Arc<SchemaRegistry>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        // SQL write concurrency limiter - reduces row lock contention
        let sql_write_semaphore = Arc::new(Semaphore::new(config.sql_write_concurrency));

        // Schema registry for table routing (starts empty, populated via register_schema)
        let schema_registry = Arc::new(SchemaRegistry::new());

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
            sql_write_semaphore,
            schema_registry,
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
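    ///
    /// A minimal caller-side sketch (hypothetical; `submit` also enforces this
    /// internally, so pre-checking is optional load-shedding):
    ///
    /// ```rust,ignore
    /// if engine.should_accept_writes() {
    ///     engine.submit(item).await?;
    /// } else {
    ///     // Shed load, or park the item and retry with backoff
    /// }
    /// ```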
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    /// Perform a comprehensive health check.
    ///
    /// This probes backend connectivity (Redis PING, SQL SELECT 1) and
    /// collects internal state into a [`HealthCheck`] struct suitable for
    /// `/ready` and `/health` endpoints.
    ///
    /// # Performance
    ///
    /// - **Cached fields**: Instant (no I/O)
    /// - **Live probes**: Redis PING + SQL SELECT 1 (parallel, ~1-10ms each)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint (load balancer)
    /// if health.healthy {
    ///     HttpResponse::Ok().body("ready")
    /// } else {
    ///     HttpResponse::ServiceUnavailable().body("not ready")
    /// }
    ///
    /// // For /health endpoint (diagnostics)
    /// HttpResponse::Ok().json(health)
    /// ```
    pub async fn health_check(&self) -> types::HealthCheck {
        // Cached state (no I/O)
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        // WAL stats (cached, no I/O)
        // Note: WalStats doesn't have file_size, so we only report pending items
        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Live backend probes (parallel)
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // Derive overall health
        // Healthy if: running, backends connected (or not configured), not in critical backpressure
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }

    /// Probe Redis connectivity with PING.
    async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref redis_store) = self.redis_store else {
            return (None, None); // Redis not configured
        };

        let start = std::time::Instant::now();
        let mut conn = redis_store.connection();

        let result: Result<String, _> = redis::cmd("PING")
            .query_async(&mut conn)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Probe SQL connectivity with SELECT 1.
    async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref sql_store) = self.sql_store else {
            return (None, None); // SQL not configured
        };

        let start = std::time::Instant::now();
        let pool = sql_store.pool();

        let result = sqlx::query("SELECT 1")
            .fetch_one(&pool)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }

                        // Also populate L2 (read-through cache)
                        if let Some(ref l2) = self.l2_store {
                            if let Err(e) = l2.put(&item).await {
                                warn!(id = %id, error = %e, "Failed to populate L2 on read");
                            } else {
                                debug!("L3 hit, promoted to L1 and L2");
                            }
                        } else {
                            debug!("L3 hit, promoted to L1");
                        }

                        tracing::Span::current().record("tier", "L3");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `content_hash`, the content is re-hashed
    /// (SHA-256) and compared against it. Returns `StorageError::Corruption`
    /// if the hashes don't match.
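    ///
    /// # Example
    ///
    /// A caller-side sketch (hypothetical handling; the recovery policy is
    /// application-specific):
    ///
    /// ```rust,ignore
    /// match engine.get_verified("doc.1").await {
    ///     Ok(Some(item)) => { /* hash matched, or item had no content_hash */ }
    ///     Ok(None) => { /* not found in any tier */ }
    ///     Err(StorageError::Corruption { id, expected, actual }) => {
    ///         // Alert, and consider re-reading from L3 ground truth
    ///     }
    ///     Err(e) => { /* backend error */ }
    /// }
    /// ```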
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has content_hash set
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in flush_batch_internal().
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into L2/L3 filters here!
        // Filters are updated only on SUCCESSFUL writes in flush_batch_internal()
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        let batch_to_flush = {
            let mut batcher = self.l2_batcher.lock().await;
            if let Some(reason) = batcher.add(item) {
                batcher.force_flush_with_reason(reason)
            } else {
                None
            }
        };

        if let Some(batch) = batch_to_flush {
            // Flush immediately (provides backpressure)
            self.flush_batch_internal(batch).await;
        }

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - removed (there is no L2 filter; TTL makes it unreliable)
    /// - Merkle trees - update with deletion marker
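    ///
    /// A minimal caller-side sketch (the returned `bool` reports whether any
    /// tier acknowledged a removal):
    ///
    /// ```rust,ignore
    /// if engine.delete("user.42").await? {
    ///     // At least one tier reported the item removed
    /// }
    /// ```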
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // 6. Emit CDC delete entry (if enabled)
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // DashMap::insert returns the previous value, which tells us whether
        // this was an insert or an update
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size, add new size
            let old_size = Self::item_size(&old_item);
            // Use saturating operations to avoid underflow if sizes are estimated
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            // Insert: just add new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    /// Evict entries from L1 when memory pressure crosses the warn threshold.
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,    // 5%
            BackpressureLevel::Throttle => entries.len() / 10, // 10%
            BackpressureLevel::Critical => entries.len() / 5,  // 20%
            BackpressureLevel::Emergency => entries.len() / 3, // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,  // 50%
        }.max(1);
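        // Worked example: at Critical with 10_000 entries, evict_count =
        // 10_000 / 5 = 2_000 victims. The .max(1) floor guarantees progress
        // even for tiny caches, where integer division could yield 0.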

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats (entries, capacity, trust_state)
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification)
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
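    ///
    /// A minimal usage sketch (root equality is a cheap sync check; a
    /// mismatch calls for a deeper diff via the merkle API):
    ///
    /// ```rust,ignore
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if redis_root.is_some() && redis_root == sql_root {
    ///     // L2 and L3 agree - no repair needed
    /// }
    /// ```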
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// Intended to compare the filter's contents against L3's merkle root and
    /// mark the filter trusted on a match. The current implementation trusts
    /// the filter once a SQL merkle root is available (see the comments below).
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, trust the filter whenever a SQL root exists. A full
        // implementation would compare a filter-side merkle root against the
        // SQL merkle root, but the cuckoo filter doesn't maintain a merkle
        // tree, so we trust it once warmup has completed.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
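    ///
    /// A hedged sketch of refreshing gauges on a scrape interval (hypothetical
    /// background task; the 15s period is illustrative):
    ///
    /// ```rust,ignore
    /// let mut tick = tokio::time::interval(std::time::Duration::from_secs(15));
    /// loop {
    ///     tick.tick().await;
    ///     engine.update_gauge_metrics();
    /// }
    /// ```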
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        // Engine is in Created state (not started)
        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready); // Not ready until started
        assert!(!health.healthy); // Not healthy (not Running)

        // Memory should be empty
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        // Backpressure should be normal
        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No backends configured
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}