sync_engine/coordinator/mod.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Sync engine coordinator.
//!
//! The [`SyncEngine`] is the main orchestrator that ties together all components:
//! - L1 in-memory cache with eviction
//! - L2 Redis cache with batch writes
//! - L3 MySQL/SQLite archive with WAL durability
//! - Cuckoo filters for existence checks
//! - Merkle trees for sync verification
//!
//! # Lifecycle
//!
//! ```text
//! Created → Connecting → DrainingWal → SyncingRedis → WarmingUp → Ready → Running → ShuttingDown
//! ```
//!
//! # Example
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, EngineState};
//! use tokio::sync::watch;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let config = SyncEngineConfig::default();
//! let (_tx, rx) = watch::channel(config.clone());
//! let mut engine = SyncEngine::new(config, rx);
//!
//! assert_eq!(engine.state(), EngineState::Created);
//!
//! // engine.start().await.expect("Start failed");
//! // assert!(engine.is_ready());
//! # }
//! ```

mod types;
mod api;
mod lifecycle;
mod flush;
mod merkle_api;
mod search_api;
pub use types::{EngineState, ItemStatus, BatchResult, HealthCheck};
pub use merkle_api::MerkleDiff;
pub use search_api::{SearchTier, SearchResult, SearchSource};
#[allow(unused_imports)]
use types::WriteTarget;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{watch, Mutex};
use tracing::{info, warn, debug, error};

use crate::config::SyncEngineConfig;
use crate::sync_item::SyncItem;
use crate::submit_options::SubmitOptions;
use crate::backpressure::BackpressureLevel;
use crate::storage::traits::{CacheStore, ArchiveStore, StorageError};
use crate::storage::sql::SqlStore;
use crate::cuckoo::filter_manager::{FilterManager, FilterTrust};
use crate::cuckoo::FilterPersistence;
use crate::batching::hybrid_batcher::{HybridBatcher, BatchConfig, SizedItem};
use crate::merkle::{RedisMerkleStore, SqlMerkleStore, MerkleBatch};
use crate::resilience::wal::{WriteAheadLog, MysqlHealthChecker};
use crate::eviction::tan_curve::{TanCurvePolicy, CacheEntry};

use search_api::SearchState;

/// Main sync engine coordinator.
///
/// Manages the three-tier storage architecture:
/// - **L1**: In-memory DashMap with pressure-based eviction
/// - **L2**: Redis cache with batch writes
/// - **L3**: MySQL/SQLite archive (ground truth)
///
/// # Thread Safety
///
/// The engine is `Send + Sync` and designed for concurrent access.
/// Internal state uses atomic operations and concurrent data structures.
pub struct SyncEngine {
    /// Configuration (can be updated at runtime via watch channel)
    /// Uses RwLock for interior mutability so run() can take &self
    pub(super) config: RwLock<SyncEngineConfig>,

    /// Runtime config updates (Mutex for interior mutability in run loop)
    pub(super) config_rx: Mutex<watch::Receiver<SyncEngineConfig>>,

    /// Engine state (broadcast to watchers)
    pub(super) state: watch::Sender<EngineState>,

    /// Engine state receiver (for internal use)
    pub(super) state_rx: watch::Receiver<EngineState>,

    /// L1: In-memory cache
    pub(super) l1_cache: Arc<DashMap<String, SyncItem>>,

    /// L1 size tracking (bytes)
    pub(super) l1_size_bytes: Arc<AtomicUsize>,

    /// L2: Redis cache (optional)
    pub(super) l2_store: Option<Arc<dyn CacheStore>>,

    /// L2: Direct RedisStore reference for CDC operations
    pub(super) redis_store: Option<Arc<crate::storage::redis::RedisStore>>,

    /// L3: MySQL/SQLite archive (optional)
    pub(super) l3_store: Option<Arc<dyn ArchiveStore>>,

    /// L3: Direct SqlStore reference for dirty merkle operations
    pub(super) sql_store: Option<Arc<SqlStore>>,

    /// L3 Cuckoo filter (L2 has no filter - TTL makes it unreliable)
    pub(super) l3_filter: Arc<FilterManager>,

    /// Filter persistence (for fast startup)
    pub(super) filter_persistence: Option<FilterPersistence>,

    /// CF snapshot tracking
    pub(super) cf_inserts_since_snapshot: AtomicU64,
    pub(super) cf_last_snapshot: Mutex<Instant>,

    /// Hybrid batcher for L2 writes
    pub(super) l2_batcher: Mutex<HybridBatcher<SyncItem>>,

    /// Redis merkle store
    pub(super) redis_merkle: Option<RedisMerkleStore>,

    /// SQL merkle store
    pub(super) sql_merkle: Option<SqlMerkleStore>,

    /// Write-ahead log for L3 durability
    pub(super) l3_wal: Option<WriteAheadLog>,

    /// MySQL health checker
    pub(super) mysql_health: MysqlHealthChecker,

    /// Eviction policy
    pub(super) eviction_policy: TanCurvePolicy,

    /// Search state (index manager + cache)
    pub(super) search_state: Option<Arc<RwLock<SearchState>>>,
}

impl SyncEngine {
    /// Create a new sync engine.
    ///
    /// The engine starts in `Created` state. Call [`start()`](Self::start)
    /// to connect to backends and transition to `Ready`.
    pub fn new(config: SyncEngineConfig, config_rx: watch::Receiver<SyncEngineConfig>) -> Self {
        let (state_tx, state_rx) = watch::channel(EngineState::Created);

        let batch_config = BatchConfig {
            flush_ms: config.batch_flush_ms,
            flush_count: config.batch_flush_count,
            flush_bytes: config.batch_flush_bytes,
        };

        Self {
            config: RwLock::new(config.clone()),
            config_rx: Mutex::new(config_rx),
            state: state_tx,
            state_rx,
            l1_cache: Arc::new(DashMap::new()),
            l1_size_bytes: Arc::new(AtomicUsize::new(0)),
            l2_store: None,
            redis_store: None,
            l3_store: None,
            sql_store: None,
            l3_filter: Arc::new(FilterManager::new("sync-engine-l3", 100_000)),
            filter_persistence: None,
            cf_inserts_since_snapshot: AtomicU64::new(0),
            cf_last_snapshot: Mutex::new(Instant::now()),
            l2_batcher: Mutex::new(HybridBatcher::new(batch_config)),
            redis_merkle: None,
            sql_merkle: None,
            l3_wal: None,
            mysql_health: MysqlHealthChecker::new(),
            eviction_policy: TanCurvePolicy::default(),
            search_state: Some(Arc::new(RwLock::new(SearchState::default()))),
        }
    }

    /// Get current engine state.
    #[must_use]
    pub fn state(&self) -> EngineState {
        *self.state_rx.borrow()
    }

    /// Get a receiver to watch state changes.
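    ///
    /// # Example
    ///
    /// A minimal sketch: wait until the engine reports ready (assumes
    /// `start()` is driven elsewhere, e.g. on a spawned task).
    ///
    /// ```rust,ignore
    /// let mut rx = engine.state_receiver();
    /// while !matches!(*rx.borrow(), EngineState::Ready | EngineState::Running) {
    ///     rx.changed().await.expect("engine dropped");
    /// }
    /// ```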
    #[must_use]
    pub fn state_receiver(&self) -> watch::Receiver<EngineState> {
        self.state_rx.clone()
    }

    /// Check if engine is ready to accept requests.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self.state(), EngineState::Ready | EngineState::Running)
    }

    /// Get current memory pressure (0.0 - 1.0+).
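    ///
    /// Computed as L1 bytes used divided by the configured `l1_max_bytes`
    /// (0.0 when no limit is set); values above 1.0 mean the budget is exceeded.
    ///
    /// ```rust,ignore
    /// // Sketch: shed optional work as the L1 budget fills up.
    /// if engine.memory_pressure() > 0.9 {
    ///     // e.g. skip prefetching / warmup inserts
    /// }
    /// ```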
    #[must_use]
    pub fn memory_pressure(&self) -> f64 {
        let used = self.l1_size_bytes.load(Ordering::Acquire);
        let max = self.config.read().l1_max_bytes;
        if max == 0 {
            0.0
        } else {
            used as f64 / max as f64
        }
    }

    /// Get current backpressure level.
    #[must_use]
    pub fn pressure(&self) -> BackpressureLevel {
        BackpressureLevel::from_pressure(self.memory_pressure())
    }

    /// Check if the engine should accept writes (based on pressure).
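    ///
    /// Returns `false` only at `Emergency` or `Shutdown` backpressure.
    ///
    /// ```rust,ignore
    /// // Sketch: gate an ingest path on backpressure.
    /// if engine.should_accept_writes() {
    ///     engine.submit(item).await?;
    /// } else {
    ///     // shed load: reply 503 / retry with backoff
    /// }
    /// ```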
    #[must_use]
    pub fn should_accept_writes(&self) -> bool {
        let pressure = self.pressure();
        !matches!(pressure, BackpressureLevel::Emergency | BackpressureLevel::Shutdown)
    }

    /// Perform a comprehensive health check.
    ///
    /// This probes backend connectivity (Redis PING, SQL SELECT 1) and
    /// collects internal state into a [`HealthCheck`] struct suitable for
    /// `/ready` and `/health` endpoints.
    ///
    /// # Performance
    ///
    /// - **Cached fields**: instant (no I/O)
    /// - **Live probes**: Redis PING + SQL SELECT 1 (parallel, ~1-10ms each)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let health = engine.health_check().await;
    ///
    /// // For /ready endpoint (load balancer)
    /// if health.healthy {
    ///     HttpResponse::Ok().body("ready")
    /// } else {
    ///     HttpResponse::ServiceUnavailable().body("not ready")
    /// }
    ///
    /// // For /health endpoint (diagnostics)
    /// HttpResponse::Ok().json(health)
    /// ```
    pub async fn health_check(&self) -> types::HealthCheck {
        // Cached state (no I/O)
        let state = self.state();
        let ready = matches!(state, EngineState::Ready | EngineState::Running);
        let memory_pressure = self.memory_pressure();
        let backpressure_level = self.pressure();
        let accepting_writes = self.should_accept_writes();
        let (l1_items, l1_bytes) = self.l1_stats();
        let (filter_items, _, filter_trust) = self.l3_filter_stats();

        // WAL stats (cached, no I/O)
        // Note: WalStats doesn't have file_size, so we only report pending items
        let mysql_healthy = self.mysql_health.is_healthy();
        let wal_pending_items = if let Some(ref wal) = self.l3_wal {
            let stats = wal.stats(mysql_healthy);
            Some(stats.pending_items)
        } else {
            None
        };

        // Live backend probes (parallel)
        let (redis_result, sql_result) = tokio::join!(
            self.probe_redis(),
            self.probe_sql()
        );

        let (redis_connected, redis_latency_ms) = redis_result;
        let (sql_connected, sql_latency_ms) = sql_result;

        // Derive overall health.
        // Healthy if: running, backends connected (or not configured), and not in critical backpressure.
        let healthy = matches!(state, EngineState::Running)
            && redis_connected != Some(false)
            && sql_connected != Some(false)
            && !matches!(backpressure_level, BackpressureLevel::Emergency | BackpressureLevel::Shutdown);

        types::HealthCheck {
            state,
            ready,
            memory_pressure,
            backpressure_level,
            accepting_writes,
            l1_items,
            l1_bytes,
            filter_items,
            filter_trust,
            redis_connected,
            redis_latency_ms,
            sql_connected,
            sql_latency_ms,
            wal_pending_items,
            healthy,
        }
    }

    /// Probe Redis connectivity with PING.
    async fn probe_redis(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref redis_store) = self.redis_store else {
            return (None, None); // Redis not configured
        };

        let start = std::time::Instant::now();
        let mut conn = redis_store.connection();

        let result: Result<String, _> = redis::cmd("PING")
            .query_async(&mut conn)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    /// Probe SQL connectivity with SELECT 1.
    async fn probe_sql(&self) -> (Option<bool>, Option<u64>) {
        let Some(ref sql_store) = self.sql_store else {
            return (None, None); // SQL not configured
        };

        let start = std::time::Instant::now();
        let pool = sql_store.pool();

        let result = sqlx::query("SELECT 1")
            .fetch_one(&pool)
            .await;

        match result {
            Ok(_) => {
                let latency_ms = start.elapsed().as_millis() as u64;
                (Some(true), Some(latency_ms))
            }
            Err(_) => (Some(false), None),
        }
    }

    // --- Core CRUD Operations ---

    /// Get an item by ID.
    ///
    /// Checks storage tiers in order: L1 → L2 → L3.
    /// Updates access count and promotes to L1 on hit.
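    ///
    /// ```rust,ignore
    /// // Sketch (hypothetical key): read through the tiers.
    /// if let Some(item) = engine.get("user.profile.42").await? {
    ///     println!("hit: {} bytes", item.content.len());
    /// }
    /// ```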
    #[tracing::instrument(skip(self), fields(tier))]
    pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let start = std::time::Instant::now();

        // 1. Check L1 (in-memory)
        if let Some(mut item) = self.l1_cache.get_mut(id) {
            item.access_count = item.access_count.saturating_add(1);
            item.last_accessed = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            tracing::Span::current().record("tier", "L1");
            debug!("L1 hit");
            crate::metrics::record_operation("L1", "get", "hit");
            crate::metrics::record_latency("L1", "get", start.elapsed());
            return Ok(Some(item.clone()));
        }

        // 2. Try L2 (Redis) - no filter, just try it
        if let Some(ref l2) = self.l2_store {
            match l2.get(id).await {
                Ok(Some(item)) => {
                    // Promote to L1
                    self.insert_l1(item.clone());
                    tracing::Span::current().record("tier", "L2");
                    debug!("L2 hit, promoted to L1");
                    crate::metrics::record_operation("L2", "get", "hit");
                    crate::metrics::record_latency("L2", "get", start.elapsed());
                    return Ok(Some(item));
                }
                Ok(None) => {
                    // Not in Redis
                    debug!("L2 miss");
                    crate::metrics::record_operation("L2", "get", "miss");
                }
                Err(e) => {
                    warn!(error = %e, "L2 lookup failed");
                    crate::metrics::record_operation("L2", "get", "error");
                }
            }
        }

        // 3. Check L3 filter before hitting MySQL
        if self.l3_filter.should_check_l3(id) {
            crate::metrics::record_cuckoo_check("L3", "positive");
            if let Some(ref l3) = self.l3_store {
                match l3.get(id).await {
                    Ok(Some(item)) => {
                        // Promote to L1 (skipped when L1 is already over budget)
                        if self.memory_pressure() < 1.0 {
                            self.insert_l1(item.clone());
                        }
                        tracing::Span::current().record("tier", "L3");
                        debug!("L3 hit, promoted to L1");
                        crate::metrics::record_operation("L3", "get", "hit");
                        crate::metrics::record_latency("L3", "get", start.elapsed());
                        crate::metrics::record_bytes_read("L3", item.content.len());
                        return Ok(Some(item));
                    }
                    Ok(None) => {
                        // False positive in filter
                        debug!("L3 filter false positive");
                        crate::metrics::record_operation("L3", "get", "false_positive");
                        crate::metrics::record_cuckoo_false_positive("L3");
                    }
                    Err(e) => {
                        warn!(error = %e, "L3 lookup failed");
                        crate::metrics::record_operation("L3", "get", "error");
                        crate::metrics::record_error("L3", "get", "backend");
                    }
                }
            }
        } else {
            // Cuckoo filter says definitely not in L3 - saved a database query
            crate::metrics::record_cuckoo_check("L3", "negative");
        }

        tracing::Span::current().record("tier", "miss");
        debug!("Cache miss");
        crate::metrics::record_operation("all", "get", "miss");
        crate::metrics::record_latency("all", "get", start.elapsed());
        Ok(None)
    }

    /// Get an item with hash verification.
    ///
    /// If the item has a non-empty `content_hash`, the content is hashed and
    /// compared against it. Returns `StorageError::Corruption` on mismatch.
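    ///
    /// ```rust,ignore
    /// // Sketch (hypothetical key): distinguish corruption from other errors.
    /// match engine.get_verified("invoice.2024.001").await {
    ///     Ok(Some(item)) => { /* hash matched, or no hash was set */ }
    ///     Ok(None) => { /* not found in any tier */ }
    ///     Err(StorageError::Corruption { id, .. }) => { /* quarantine / re-sync `id` */ }
    ///     Err(e) => { /* backend failure */ }
    /// }
    /// ```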
    #[tracing::instrument(skip(self), fields(verified))]
    pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let item = match self.get(id).await? {
            Some(item) => item,
            None => return Ok(None),
        };

        // Verify hash if item has content_hash set
        if !item.content_hash.is_empty() {
            use sha2::{Sha256, Digest};

            let computed = Sha256::digest(&item.content);
            let computed_hex = hex::encode(computed);

            if computed_hex != item.content_hash {
                tracing::Span::current().record("verified", false);
                warn!(
                    id = %id,
                    expected = %item.content_hash,
                    actual = %computed_hex,
                    "Data corruption detected!"
                );

                // Record corruption metric
                crate::metrics::record_corruption(id);

                return Err(StorageError::Corruption {
                    id: id.to_string(),
                    expected: item.content_hash.clone(),
                    actual: computed_hex,
                });
            }

            tracing::Span::current().record("verified", true);
            debug!(id = %id, "Hash verification passed");
        }

        Ok(Some(item))
    }

    /// Submit an item for sync.
    ///
    /// The item is immediately stored in L1 and queued for batch write to L2/L3.
    /// Uses default options: Redis + SQL (both enabled).
    /// Filters are updated only on successful writes in flush_batch_internal().
    ///
    /// For custom routing, use [`submit_with`](Self::submit_with).
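    ///
    /// ```rust,ignore
    /// // Sketch (hypothetical key): default routing to both L2 and L3.
    /// let item = SyncItem::new("orders.1001".into(), b"payload".to_vec());
    /// engine.submit(item).await?;
    /// ```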
    #[tracing::instrument(skip(self, item), fields(object_id = %item.object_id))]
    pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError> {
        self.submit_with(item, SubmitOptions::default()).await
    }

    /// Submit an item with custom routing options.
    ///
    /// The item is immediately stored in L1 and queued for batch write.
    /// Items are batched by compatible options for efficient pipelined writes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem, SubmitOptions, CacheTtl};
    /// # async fn example(engine: &SyncEngine) -> Result<(), sync_engine::StorageError> {
    /// // Cache-only with 1 minute TTL (no SQL write)
    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
    ///
    /// // SQL-only durable storage (no Redis)
    /// let item = SyncItem::new("archive.key".into(), b"data".to_vec());
    /// engine.submit_with(item, SubmitOptions::durable()).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[tracing::instrument(skip(self, item, options), fields(object_id = %item.object_id, redis = options.redis, sql = options.sql))]
    pub async fn submit_with(&self, mut item: SyncItem, options: SubmitOptions) -> Result<(), StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "submit", "rejected");
            crate::metrics::record_error("engine", "submit", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let id = item.object_id.clone();
        let item_bytes = item.content.len();

        // Apply state override from options (if provided)
        if let Some(ref state) = options.state {
            item.state = state.clone();
        }

        // Attach options to item (travels through batch pipeline)
        item.submit_options = Some(options);

        // Insert into L1 (immediate, in-memory)
        self.insert_l1(item.clone());
        crate::metrics::record_operation("L1", "submit", "success");
        crate::metrics::record_bytes_written("L1", item_bytes);

        // NOTE: We do NOT insert into L2/L3 filters here!
        // Filters are updated only on SUCCESSFUL writes in flush_batch_internal().
        // This prevents filter/storage divergence if writes fail.

        // Queue for batched L2/L3 persistence
        self.l2_batcher.lock().await.add(item);

        debug!(id = %id, "Item submitted to L1 and batch queue");
        crate::metrics::record_latency("L1", "submit", start.elapsed());
        Ok(())
    }

    /// Delete an item from all storage tiers.
    ///
    /// Deletes are more complex than writes because the item may exist in:
    /// - L1 (DashMap) - immediate removal
    /// - L2 (Redis) - async removal
    /// - L3 (MySQL) - async removal
    /// - L3 cuckoo filter - removal (there is no L2 filter)
    /// - Merkle trees - update with deletion marker
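    ///
    /// ```rust,ignore
    /// // Sketch (hypothetical key): Ok(true) means at least one tier held the item.
    /// let existed = engine.delete("stale.key").await?;
    /// ```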
    #[tracing::instrument(skip(self), fields(object_id = %id))]
    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
        let start = std::time::Instant::now();

        if !self.should_accept_writes() {
            crate::metrics::record_operation("engine", "delete", "rejected");
            crate::metrics::record_error("engine", "delete", "backpressure");
            return Err(StorageError::Backend(format!(
                "Rejecting delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let mut found = false;

        // 1. Remove from L1 (immediate)
        if let Some((_, item)) = self.l1_cache.remove(id) {
            let size = Self::item_size(&item);
            self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            found = true;
            debug!("Deleted from L1");
            crate::metrics::record_operation("L1", "delete", "success");
        }

        // 2. Remove from L3 cuckoo filter only (no L2 filter - TTL makes it unreliable)
        self.l3_filter.remove(id);

        // 3. Delete from L2 (Redis) - best effort
        if let Some(ref l2) = self.l2_store {
            let l2_start = std::time::Instant::now();
            match l2.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "success");
                    crate::metrics::record_latency("L2", "delete", l2_start.elapsed());
                }
                Err(e) => {
                    warn!(error = %e, "Failed to delete from L2 (Redis)");
                    crate::metrics::record_operation("L2", "delete", "error");
                    crate::metrics::record_error("L2", "delete", "backend");
                }
            }
        }

        // 4. Delete from L3 (MySQL) - ground truth
        if let Some(ref l3) = self.l3_store {
            let l3_start = std::time::Instant::now();
            match l3.delete(id).await {
                Ok(()) => {
                    found = true;
                    debug!("Deleted from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "success");
                    crate::metrics::record_latency("L3", "delete", l3_start.elapsed());
                }
                Err(e) => {
                    error!(error = %e, "Failed to delete from L3 (MySQL)");
                    crate::metrics::record_operation("L3", "delete", "error");
                    crate::metrics::record_error("L3", "delete", "backend");
                    // Don't return error - item may not exist in L3
                }
            }
        }

        // 5. Update merkle trees with deletion marker
        let mut merkle_batch = MerkleBatch::new();
        merkle_batch.delete(id.to_string());

        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for deletion");
                crate::metrics::record_error("L3", "merkle", "batch_apply");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for deletion");
                crate::metrics::record_error("L2", "merkle", "batch_apply");
            }
        }

        // 6. Emit CDC delete entry (if enabled)
        if self.config.read().enable_cdc_stream && found {
            self.emit_cdc_delete(id).await;
        }

        info!(found, "Delete operation completed");
        crate::metrics::record_latency("all", "delete", start.elapsed());
        Ok(found)
    }

    // --- Internal helpers ---

    /// Insert or update an item in L1, correctly tracking size.
    fn insert_l1(&self, item: SyncItem) {
        let new_size = Self::item_size(&item);
        let key = item.object_id.clone();

        // insert() returns the previous value, letting us adjust the size delta
        if let Some(old_item) = self.l1_cache.insert(key, item) {
            // Update: subtract old size, add new size
            let old_size = Self::item_size(&old_item);
            // Saturating arithmetic avoids underflow when sizes are estimates
            let current = self.l1_size_bytes.load(Ordering::Acquire);
            let new_total = current.saturating_sub(old_size).saturating_add(new_size);
            self.l1_size_bytes.store(new_total, Ordering::Release);
        } else {
            // Insert: just add new size
            self.l1_size_bytes.fetch_add(new_size, Ordering::Release);
        }
    }

    /// Calculate approximate size of an item in bytes.
    #[inline]
    fn item_size(item: &SyncItem) -> usize {
        // Use cached size if available, otherwise compute
        item.size_bytes()
    }

    /// Evict L1 entries when memory pressure exceeds the warn threshold.
    fn maybe_evict(&self) {
        let pressure = self.memory_pressure();
        if pressure < self.config.read().backpressure_warn {
            return;
        }

        let level = BackpressureLevel::from_pressure(pressure);
        debug!(pressure = %pressure, level = %level, "Memory pressure detected, running eviction");

        // Collect cache entries for scoring
        let now = std::time::Instant::now();
        let entries: Vec<CacheEntry> = self.l1_cache.iter()
            .map(|ref_multi| {
                let item = ref_multi.value();
                let id = ref_multi.key().clone();

                // Convert epoch millis to Instant-relative age
                let now_millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let age_secs = if item.last_accessed > 0 {
                    (now_millis.saturating_sub(item.last_accessed)) as f64 / 1000.0
                } else {
                    3600.0 // Default 1 hour if never accessed
                };

                CacheEntry {
                    id,
                    size_bytes: item.size_bytes(),
                    created_at: now - std::time::Duration::from_secs_f64(age_secs),
                    last_access: now - std::time::Duration::from_secs_f64(age_secs),
                    access_count: item.access_count,
                    is_dirty: false, // All items in L1 are assumed flushed to L2/L3
                }
            })
            .collect();

        if entries.is_empty() {
            return;
        }

        // Calculate how many to evict based on pressure level
        let evict_count = match level {
            BackpressureLevel::Normal => 0,
            BackpressureLevel::Warn => entries.len() / 20,      // 5%
            BackpressureLevel::Throttle => entries.len() / 10,  // 10%
            BackpressureLevel::Critical => entries.len() / 5,   // 20%
            BackpressureLevel::Emergency => entries.len() / 3,  // 33%
            BackpressureLevel::Shutdown => entries.len() / 2,   // 50%
        }.max(1);

        // Select victims using tan curve algorithm
        let victims = self.eviction_policy.select_victims(&entries, evict_count, pressure);

        // Evict victims
        let mut evicted_bytes = 0usize;
        for victim_id in &victims {
            if let Some((_, item)) = self.l1_cache.remove(victim_id) {
                evicted_bytes += item.size_bytes();
            }
        }

        // Update size tracking
        self.l1_size_bytes.fetch_sub(evicted_bytes, Ordering::Release);

        info!(
            evicted = victims.len(),
            evicted_bytes = evicted_bytes,
            pressure = %pressure,
            level = %level,
            "Evicted entries from L1 cache"
        );
    }

    /// Get L1 cache stats: `(item_count, total_bytes)`.
    #[must_use]
    pub fn l1_stats(&self) -> (usize, usize) {
        (
            self.l1_cache.len(),
            self.l1_size_bytes.load(Ordering::Acquire),
        )
    }

    /// Get L3 filter stats (entries, capacity, trust_state)
    #[must_use]
    pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust) {
        self.l3_filter.stats()
    }

    /// Get access to the L3 filter (for warmup/verification)
    pub fn l3_filter(&self) -> &Arc<FilterManager> {
        &self.l3_filter
    }

    /// Get merkle root hashes from Redis (L2) and SQL (L3).
    ///
    /// Returns `(redis_root, sql_root)` as hex strings.
    /// Returns `None` for backends that aren't connected or have empty trees.
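    ///
    /// ```rust,ignore
    /// // Sketch: a cheap L2/L3 consistency probe.
    /// let (redis_root, sql_root) = engine.merkle_roots().await;
    /// if redis_root.is_some() && redis_root == sql_root {
    ///     // trees agree; a full diff/sync pass can be skipped
    /// }
    /// ```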
    pub async fn merkle_roots(&self) -> (Option<String>, Option<String>) {
        let redis_root = if let Some(ref rm) = self.redis_merkle {
            rm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        let sql_root = if let Some(ref sm) = self.sql_merkle {
            sm.root_hash().await.ok().flatten().map(hex::encode)
        } else {
            None
        };

        (redis_root, sql_root)
    }

    /// Verify and trust the L3 cuckoo filter.
    ///
    /// If a SQL backend is connected and reports a merkle root, the filter is
    /// marked trusted. (The cuckoo filter keeps no merkle tree of its own, so a
    /// true root-to-root comparison isn't possible; without a SQL backend the
    /// filter is trusted unconditionally.)
    ///
    /// Returns `true` if the filter is now trusted, `false` otherwise.
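    ///
    /// ```rust,ignore
    /// // Sketch: once trusted, negative filter lookups let get() skip L3 queries.
    /// if engine.verify_filter().await {
    ///     // filter negatives are now authoritative for the read fast path
    /// }
    /// ```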
    pub async fn verify_filter(&self) -> bool {
        // Get SQL merkle root
        let sql_root = if let Some(ref sm) = self.sql_merkle {
            match sm.root_hash().await {
                Ok(Some(root)) => root,
                _ => return false,
            }
        } else {
            // No SQL backend - can't verify, mark trusted anyway
            self.l3_filter.mark_trusted();
            return true;
        };

        // For now, we trust the filter whenever we have a SQL root.
        // A full implementation would compare a CF-derived merkle root against
        // the SQL merkle root, but the CF doesn't maintain a merkle tree, so we
        // trust the filter after warmup instead.
        info!(
            sql_root = %hex::encode(sql_root),
            "Verifying L3 filter against SQL merkle root"
        );

        // Mark trusted if we got here (SQL is connected and has a root)
        self.l3_filter.mark_trusted();
        true
    }

    /// Update all gauge metrics with current engine state.
    ///
    /// Call this before snapshotting metrics to ensure gauges reflect current state.
    /// Useful for OTEL export or monitoring dashboards.
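    ///
    /// ```rust,ignore
    /// // Sketch: refresh gauges right before a scrape/export tick.
    /// engine.update_gauge_metrics();
    /// // ...then hand the metrics off to the exporter of your choice.
    /// ```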
    pub fn update_gauge_metrics(&self) {
        let (l1_count, l1_bytes) = self.l1_stats();
        crate::metrics::set_l1_cache_items(l1_count);
        crate::metrics::set_l1_cache_bytes(l1_bytes);
        crate::metrics::set_memory_pressure(self.memory_pressure());

        let (filter_entries, filter_capacity, _trust) = self.l3_filter_stats();
        let filter_load = if filter_capacity > 0 {
            filter_entries as f64 / filter_capacity as f64
        } else {
            0.0
        };
        crate::metrics::set_cuckoo_filter_entries("L3", filter_entries);
        crate::metrics::set_cuckoo_filter_load("L3", filter_load);

        crate::metrics::set_backpressure_level(self.pressure() as u8);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;
    use serde_json::json;

    fn create_test_engine() -> SyncEngine {
        let config = SyncEngineConfig::default();
        let (_tx, rx) = watch::channel(config.clone());
        SyncEngine::new(config, rx)
    }

    fn create_test_item(id: &str) -> SyncItem {
        SyncItem::from_json(
            id.to_string(),
            json!({"data": "test"}),
        )
    }

    #[test]
    fn test_engine_created_state() {
        let engine = create_test_engine();
        assert_eq!(engine.state(), EngineState::Created);
        assert!(!engine.is_ready());
    }

    #[test]
    fn test_memory_pressure_calculation() {
        let config = SyncEngineConfig {
            l1_max_bytes: 1000,
            ..Default::default()
        };
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert_eq!(engine.memory_pressure(), 0.0);

        // Simulate adding items
        let item = create_test_item("test1");
        engine.insert_l1(item);

        // Pressure should be > 0 now
        assert!(engine.memory_pressure() > 0.0);
    }

    #[test]
    fn test_l1_insert_and_size_tracking() {
        let engine = create_test_engine();

        let item = create_test_item("test1");
        let expected_size = item.size_bytes();

        engine.insert_l1(item);

        let (count, size) = engine.l1_stats();
        assert_eq!(count, 1);
        assert_eq!(size, expected_size);
    }

    #[test]
    fn test_l1_update_size_tracking() {
        let engine = create_test_engine();

        let item1 = create_test_item("test1");
        engine.insert_l1(item1);
        let (_, _size1) = engine.l1_stats();

        // Insert larger item with same ID
        let item2 = SyncItem::from_json(
            "test1".to_string(),
            json!({"data": "much larger content here for testing size changes"}),
        );
        let size2_expected = item2.size_bytes();
        engine.insert_l1(item2);

        let (count, size2) = engine.l1_stats();
        assert_eq!(count, 1); // Still one item
        assert_eq!(size2, size2_expected); // Size should be updated
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let engine = create_test_engine();
        let result = engine.get("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_get_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item.clone());

        let result = engine.get("test1").await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().object_id, "test1");
    }

    #[tokio::test]
    async fn test_delete_from_l1() {
        let engine = create_test_engine();
        let item = create_test_item("test1");
        engine.insert_l1(item);

        let (count_before, _) = engine.l1_stats();
        assert_eq!(count_before, 1);

        let deleted = engine.delete("test1").await.unwrap();
        assert!(deleted);

        let (count_after, size_after) = engine.l1_stats();
        assert_eq!(count_after, 0);
        assert_eq!(size_after, 0);
    }

    #[test]
    fn test_filter_stats() {
        let engine = create_test_engine();

        let (entries, capacity, _trust) = engine.l3_filter_stats();
        assert_eq!(entries, 0);
        assert!(capacity > 0);
    }

    #[test]
    fn test_should_accept_writes() {
        let engine = create_test_engine();
        assert!(engine.should_accept_writes());
    }

    #[test]
    fn test_pressure_level() {
        let engine = create_test_engine();
        assert_eq!(engine.pressure(), BackpressureLevel::Normal);
    }

    #[tokio::test]
    async fn test_health_check_basic() {
        let engine = create_test_engine();

        let health = engine.health_check().await;

        // Engine is in Created state (not started)
        assert_eq!(health.state, EngineState::Created);
        assert!(!health.ready); // Not ready until started
        assert!(!health.healthy); // Not healthy (not Running)

        // Memory should be empty
        assert_eq!(health.memory_pressure, 0.0);
        assert_eq!(health.l1_items, 0);
        assert_eq!(health.l1_bytes, 0);

        // Backpressure should be normal
        assert_eq!(health.backpressure_level, BackpressureLevel::Normal);
        assert!(health.accepting_writes);

        // No backends configured
        assert!(health.redis_connected.is_none());
        assert!(health.sql_connected.is_none());
        assert!(health.wal_pending_items.is_none());
    }
}