sync_engine/coordinator/
api.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! V1.1 API: Query and batch operations.
5//!
6//! This module contains the higher-level API methods added in V1.1:
7//! - `contains()` - Fast probabilistic existence check
8//! - `len()` / `is_empty()` - L1 cache size queries
9//! - `status()` - Detailed sync status
10//! - `get_many()` - Parallel batch fetch
11//! - `submit_many()` - Batch upsert
12//! - `delete_many()` - Batch delete
13//! - `get_or_insert_with()` - Cache-aside pattern
14
15use std::sync::atomic::Ordering;
16use tokio::task::JoinSet;
17use tracing::{debug, info, warn, error};
18
19use crate::storage::traits::StorageError;
20use crate::sync_item::SyncItem;
21use crate::merkle::MerkleBatch;
22
23use super::{SyncEngine, ItemStatus, BatchResult};
24
25impl SyncEngine {
26    // ═══════════════════════════════════════════════════════════════════════════
27    // API: Query & Batch Operations
28    // ═══════════════════════════════════════════════════════════════════════════
29
30    /// Check if an item exists across all tiers.
31    ///
32    /// Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query.
33    /// If found in SQL and Cuckoo filter was untrusted, updates the filter.
34    ///
35    /// # Returns
36    /// - `true` → item definitely exists in at least one tier
37    /// - `false` → item does not exist (authoritative)
38    ///
39    /// # Example
40    ///
41    /// ```rust,no_run
42    /// # use sync_engine::SyncEngine;
43    /// # async fn example(engine: &SyncEngine) {
44    /// if engine.contains("user.123").await {
45    ///     let item = engine.get("user.123").await;
46    /// } else {
47    ///     println!("Not found");
48    /// }
49    /// # }
50    /// ```
51    pub async fn contains(&self, id: &str) -> bool {
52        // L1: In-memory cache (definitive, sync)
53        if self.l1_cache.contains_key(id) {
54            return true;
55        }
56        
57        // L2: Redis EXISTS (async, authoritative for Redis tier)
58        if let Some(ref l2) = self.l2_store {
59            if l2.exists(id).await.unwrap_or(false) {
60                return true;
61            }
62        }
63        
64        // L3: Cuckoo filter check (sync, probabilistic)
65        if self.l3_filter.is_trusted() {
66            // Filter is trusted - use it for fast negative
67            if !self.l3_filter.should_check_l3(id) {
68                return false; // Definitely not in L3
69            }
70        }
71        
72        // L3: SQL query (async, ground truth)
73        if let Some(ref l3) = self.l3_store {
74            if l3.exists(id).await.unwrap_or(false) {
75                // Found in SQL - update Cuckoo if it was untrusted
76                if !self.l3_filter.is_trusted() {
77                    self.l3_filter.insert(id);
78                }
79                return true;
80            }
81        }
82        
83        false
84    }
85    
86    /// Fast check: is this item definitely NOT in L3?
87    ///
88    /// Uses the Cuckoo filter for a fast authoritative negative.
89    /// - Returns `true` → item is **definitely not** in L3 (safe to skip)
90    /// - Returns `false` → item **might** exist (need to check L3)
91    ///
92    /// Only meaningful when the L3 filter is trusted. If untrusted, returns `false`
93    /// (meaning "we don't know, you should check").
94    ///
95    /// # Use Case
96    ///
97    /// Fast early-exit in replication: if definitely missing, apply without checking.
98    ///
99    /// ```rust,no_run
100    /// # use sync_engine::SyncEngine;
101    /// # async fn example(engine: &SyncEngine) {
102    /// if engine.definitely_missing("patient.123") {
103    ///     // Fast path: definitely new, just insert
104    ///     println!("New item, inserting directly");
105    /// } else {
106    ///     // Slow path: might exist, check hash
107    ///     if !engine.is_current("patient.123", "abc123...").await {
108    ///         println!("Outdated, updating");
109    ///     }
110    /// }
111    /// # }
112    /// ```
113    #[must_use]
114    #[inline]
115    pub fn definitely_missing(&self, id: &str) -> bool {
116        // Only authoritative if filter is trusted
117        if !self.l3_filter.is_trusted() {
118            return false; // Unknown, caller should check
119        }
120        // Cuckoo false = definitely not there
121        !self.l3_filter.should_check_l3(id)
122    }
123
124    /// Fast check: might this item exist somewhere?
125    ///
126    /// Checks L1 cache (partial, evicts) and Cuckoo filter (probabilistic).
127    /// - Returns `true` → item is in L1 OR might be in L3 (worth checking)
128    /// - Returns `false` → item is definitely not in L1 or L3
129    ///
130    /// Note: L1 is partial (items evict), so this can return `false` even if
131    /// the item exists in L2/L3. For authoritative check, use `contains()`.
132    ///
133    /// # Use Case
134    ///
135    /// Quick probabilistic check before expensive async lookup.
136    #[must_use]
137    #[inline]
138    pub fn might_exist(&self, id: &str) -> bool {
139        self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
140    }
141
142    /// Check if the item at `key` has the given content hash.
143    ///
144    /// This is the semantic API for CDC deduplication in replication.
145    /// Returns `true` if the item exists AND its content hash matches.
146    /// Returns `false` if item doesn't exist OR hash differs.
147    ///
148    /// # Arguments
149    /// * `id` - Object ID
150    /// * `content_hash` - SHA256 hash of content (hex-encoded string)
151    ///
152    /// # Example
153    ///
154    /// ```rust,no_run
155    /// # use sync_engine::SyncEngine;
156    /// # async fn example(engine: &SyncEngine) {
157    /// // Skip replication if we already have this version
158    /// let incoming_hash = "abc123...";
159    /// if engine.is_current("patient.123", incoming_hash).await {
160    ///     println!("Already up to date, skipping");
161    ///     return;
162    /// }
163    /// # }
164    /// ```
165    pub async fn is_current(&self, id: &str, content_hash: &str) -> bool {
166        // Check L1 first (fastest)
167        if let Some(item) = self.l1_cache.get(id) {
168            return item.content_hash == content_hash;
169        }
170
171        // Check L2 (if available)
172        if let Some(ref l2) = self.l2_store {
173            if let Ok(Some(item)) = l2.get(id).await {
174                return item.content_hash == content_hash;
175            }
176        }
177
178        // Check L3 (ground truth)
179        if let Some(ref l3) = self.l3_store {
180            if self.l3_filter.should_check_l3(id) {
181                if let Ok(Some(item)) = l3.get(id).await {
182                    return item.content_hash == content_hash;
183                }
184            }
185        }
186
187        false
188    }
189
190    /// Get the current count of items in L1 cache.
191    #[must_use]
192    #[inline]
193    pub fn len(&self) -> usize {
194        self.l1_cache.len()
195    }
196
197    /// Check if L1 cache is empty.
198    #[must_use]
199    #[inline]
200    pub fn is_empty(&self) -> bool {
201        self.l1_cache.is_empty()
202    }
203
204    /// Get the sync status of an item.
205    ///
206    /// Returns detailed state information about where an item exists
207    /// and its sync status across tiers.
208    ///
209    /// # Example
210    ///
211    /// ```rust,no_run
212    /// # use sync_engine::{SyncEngine, ItemStatus};
213    /// # async fn example(engine: &SyncEngine) {
214    /// match engine.status("order.456").await {
215    ///     ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
216    ///         println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
217    ///     }
218    ///     ItemStatus::Pending => println!("Queued for sync"),
219    ///     ItemStatus::Missing => println!("Not found"),
220    /// }
221    /// # }
222    /// ```
223    pub async fn status(&self, id: &str) -> ItemStatus {
224        let in_l1 = self.l1_cache.contains_key(id);
225        
226        // Check if pending in batch queue
227        let pending = self.l2_batcher.lock().await.contains(id);
228        if pending {
229            return ItemStatus::Pending;
230        }
231        
232        // Check L2 (if available) - use EXISTS, no filter
233        let in_l2 = if let Some(ref l2) = self.l2_store {
234            l2.exists(id).await.unwrap_or(false)
235        } else {
236            false
237        };
238        
239        // Check L3 (if available)  
240        let in_l3 = if let Some(ref l3) = self.l3_store {
241            self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
242        } else {
243            false
244        };
245        
246        if in_l1 || in_l2 || in_l3 {
247            ItemStatus::Synced { in_l1, in_l2, in_l3 }
248        } else {
249            ItemStatus::Missing
250        }
251    }
252
253    /// Fetch multiple items in parallel.
254    ///
255    /// Returns a vector of `Option<SyncItem>` in the same order as input IDs.
256    /// Missing items are represented as `None`.
257    ///
258    /// # Performance
259    ///
260    /// This method fetches from L1 synchronously, then batches L2/L3 lookups
261    /// for items not in L1. Much faster than sequential `get()` calls.
262    ///
263    /// # Example
264    ///
265    /// ```rust,no_run
266    /// # use sync_engine::SyncEngine;
267    /// # async fn example(engine: &SyncEngine) {
268    /// let ids = vec!["user.1", "user.2", "user.3"];
269    /// let items = engine.get_many(&ids).await;
270    /// for (id, item) in ids.iter().zip(items.iter()) {
271    ///     match item {
272    ///         Some(item) => println!("{}: found", id),
273    ///         None => println!("{}: missing", id),
274    ///     }
275    /// }
276    /// # }
277    /// ```
278    pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
279        let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
280        let mut missing_indices: Vec<usize> = Vec::new();
281        
282        // Phase 1: Check L1 (synchronous, fast)
283        for (i, id) in ids.iter().enumerate() {
284            if let Some(item) = self.l1_cache.get(*id) {
285                results[i] = Some(item.clone());
286            } else {
287                missing_indices.push(i);
288            }
289        }
290        
291        // Phase 2: Fetch missing items from L2/L3 in parallel
292        if !missing_indices.is_empty() {
293            let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();
294            
295            for &i in &missing_indices {
296                let id = ids[i].to_string();
297                let l2_store = self.l2_store.clone();
298                let l3_store = self.l3_store.clone();
299                let l3_filter = self.l3_filter.clone();
300                
301                join_set.spawn(async move {
302                    // Try L2 first (no filter, just try Redis)
303                    if let Some(ref l2) = l2_store {
304                        if let Ok(Some(item)) = l2.get(&id).await {
305                            return (i, Some(item));
306                        }
307                    }
308                    
309                    // Fall back to L3 (use Cuckoo filter if trusted)
310                    if let Some(ref l3) = l3_store {
311                        if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
312                            if let Ok(Some(item)) = l3.get(&id).await {
313                                return (i, Some(item));
314                            }
315                        }
316                    }
317                    
318                    (i, None)
319                });
320            }
321            
322            // Collect results
323            while let Some(result) = join_set.join_next().await {
324                if let Ok((i, item)) = result {
325                    results[i] = item;
326                }
327            }
328        }
329        
330        results
331    }
332
333    /// Submit multiple items for sync atomically.
334    ///
335    /// All items are added to L1 and queued for batch persistence.
336    /// Returns a `BatchResult` with success/failure counts.
337    ///
338    /// # Example
339    ///
340    /// ```rust,no_run
341    /// # use sync_engine::{SyncEngine, SyncItem};
342    /// # use serde_json::json;
343    /// # async fn example(engine: &SyncEngine) {
344    /// let items = vec![
345    ///     SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
346    ///     SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
347    /// ];
348    /// let result = engine.submit_many(items).await.unwrap();
349    /// println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
350    /// # }
351    /// ```
352    pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
353        if !self.should_accept_writes() {
354            return Err(StorageError::Backend(format!(
355                "Rejecting batch write: engine state={}, pressure={}",
356                self.state(),
357                self.pressure()
358            )));
359        }
360        
361        let total = items.len();
362        let mut succeeded = 0;
363        
364        // Lock batcher once for the whole batch
365        let mut batcher = self.l2_batcher.lock().await;
366        
367        for item in items {
368            self.insert_l1(item.clone());
369            batcher.add(item);
370            succeeded += 1;
371        }
372        
373        debug!(total, succeeded, "Batch submitted to L1 and queue");
374        
375        Ok(BatchResult {
376            total,
377            succeeded,
378            failed: total - succeeded,
379        })
380    }
381
382    /// Submit a batch of materialized views.
383    ///
384    /// Uses a dedicated queue to avoid contention with the main batcher.
385    /// Views are flushed independently by the sync engine loop.
386    pub async fn submit_view_batch(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
387        if !self.should_accept_writes() {
388            return Err(StorageError::Backend(format!(
389                "Rejecting view batch: engine state={}, pressure={}",
390                self.state(),
391                self.pressure()
392            )));
393        }
394        
395        let total = items.len();
396        
397        // Update L1 cache immediately
398        for item in &items {
399            self.insert_l1(item.clone());
400        }
401        
402        // Send to dedicated queue
403        match self.view_queue_tx.send(items).await {
404            Ok(_) => {
405                debug!(total, "View batch submitted to queue");
406                Ok(BatchResult {
407                    total,
408                    succeeded: total,
409                    failed: 0,
410                })
411            },
412            Err(_) => {
413                error!("View queue closed");
414                Err(StorageError::Backend("View queue closed".to_string()))
415            }
416        }
417    }
418
419    /// Delete multiple items atomically.
420    ///
421    /// Removes items from all tiers (L1, L2, L3) and updates filters.
422    /// Returns a `BatchResult` with counts.
423    ///
424    /// # Example
425    ///
426    /// ```rust,no_run
427    /// # use sync_engine::SyncEngine;
428    /// # async fn example(engine: &SyncEngine) {
429    /// let ids = vec!["user.1", "user.2", "user.3"];
430    /// let result = engine.delete_many(&ids).await.unwrap();
431    /// println!("Deleted: {}", result.succeeded);
432    /// # }
433    /// ```
434    pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
435        if !self.should_accept_writes() {
436            return Err(StorageError::Backend(format!(
437                "Rejecting batch delete: engine state={}, pressure={}",
438                self.state(),
439                self.pressure()
440            )));
441        }
442        
443        let total = ids.len();
444        let mut succeeded = 0;
445        
446        // Build merkle batch for all deletions
447        let mut merkle_batch = MerkleBatch::new();
448        
449        for id in ids {
450            // Remove from L1
451            if let Some((_, item)) = self.l1_cache.remove(*id) {
452                let size = Self::item_size(&item);
453                self.l1_size_bytes.fetch_sub(size, Ordering::Release);
454            }
455            
456            // Remove from L3 filter (no L2 filter with TTL support)
457            self.l3_filter.remove(id);
458            
459            // Queue merkle deletion
460            merkle_batch.delete(id.to_string());
461            
462            succeeded += 1;
463        }
464        
465        // Batch delete from L2
466        if let Some(ref l2) = self.l2_store {
467            for id in ids {
468                if let Err(e) = l2.delete(id).await {
469                    warn!(id, error = %e, "Failed to delete from L2");
470                }
471            }
472        }
473        
474        // Batch delete from L3
475        if let Some(ref l3) = self.l3_store {
476            for id in ids {
477                if let Err(e) = l3.delete(id).await {
478                    warn!(id, error = %e, "Failed to delete from L3");
479                }
480            }
481        }
482        
483        // Update merkle trees
484        if let Some(ref sql_merkle) = self.sql_merkle {
485            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
486                error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
487            }
488        }
489        
490        if let Some(ref redis_merkle) = self.redis_merkle {
491            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
492                warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
493            }
494        }
495        
496        info!(total, succeeded, "Batch delete completed");
497        
498        Ok(BatchResult {
499            total,
500            succeeded,
501            failed: total - succeeded,
502        })
503    }
504
505    /// Get an item, or compute and insert it if missing.
506    ///
507    /// This is the classic "get or insert" pattern, useful for cache-aside:
508    /// 1. Check cache (L1 → L2 → L3)
509    /// 2. If missing, call the async factory function
510    /// 3. Insert the result and return it
511    ///
512    /// The factory is only called if the item is not found.
513    ///
514    /// # Example
515    ///
516    /// ```rust,no_run
517    /// # use sync_engine::{SyncEngine, SyncItem};
518    /// # use serde_json::json;
519    /// # async fn example(engine: &SyncEngine) {
520    /// let item = engine.get_or_insert_with("user.123", || async {
521    ///     // Expensive operation - only runs if not cached
522    ///     SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
523    /// }).await.unwrap();
524    /// # }
525    /// ```
526    pub async fn get_or_insert_with<F, Fut>(
527        &self,
528        id: &str,
529        factory: F,
530    ) -> Result<SyncItem, StorageError>
531    where
532        F: FnOnce() -> Fut,
533        Fut: std::future::Future<Output = SyncItem>,
534    {
535        // Try to get existing
536        if let Some(item) = self.get(id).await? {
537            return Ok(item);
538        }
539        
540        // Not found - compute new value
541        let item = factory().await;
542        
543        // Insert and return
544        self.submit(item.clone()).await?;
545        
546        Ok(item)
547    }
548    
549    // ═══════════════════════════════════════════════════════════════════════════
550    // State-based queries: Fast indexed access by caller-defined state tag
551    // ═══════════════════════════════════════════════════════════════════════════
552    
553    /// Get items by state from SQL (L3 ground truth).
554    ///
555    /// Uses indexed query for fast retrieval.
556    ///
557    /// # Example
558    ///
559    /// ```rust,no_run
560    /// # use sync_engine::{SyncEngine, StorageError};
561    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
562    /// // Get all delta items for CRDT merging
563    /// let deltas = engine.get_by_state("delta", 1000).await?;
564    /// for item in deltas {
565    ///     println!("Delta: {}", item.object_id);
566    /// }
567    /// # Ok(())
568    /// # }
569    /// ```
570    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
571        if let Some(ref sql) = self.sql_store {
572            sql.get_by_state(state, limit).await
573        } else {
574            Ok(Vec::new())
575        }
576    }
577    
578    /// Count items in a given state (SQL ground truth).
579    ///
580    /// # Example
581    ///
582    /// ```rust,no_run
583    /// # use sync_engine::{SyncEngine, StorageError};
584    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
585    /// let pending_count = engine.count_by_state("pending").await?;
586    /// println!("{} items pending", pending_count);
587    /// # Ok(())
588    /// # }
589    /// ```
590    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
591        if let Some(ref sql) = self.sql_store {
592            sql.count_by_state(state).await
593        } else {
594            Ok(0)
595        }
596    }
597    
598    /// Get just the IDs of items in a given state (lightweight query).
599    ///
600    /// Returns IDs from SQL. For Redis state SET, use `list_state_ids_redis()`.
601    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
602        if let Some(ref sql) = self.sql_store {
603            sql.list_state_ids(state, limit).await
604        } else {
605            Ok(Vec::new())
606        }
607    }
608    
609    /// Update the state of an item by ID.
610    ///
611    /// Updates both SQL (ground truth) and Redis state SETs.
612    /// L1 cache is NOT updated - caller should re-fetch if needed.
613    ///
614    /// Returns true if the item was found and updated.
615    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
616        let mut updated = false;
617        
618        // Update SQL (ground truth)
619        if let Some(ref sql) = self.sql_store {
620            updated = sql.set_state(id, new_state).await?;
621        }
622        
623        // Note: Redis state SETs are not updated here because we'd need to know
624        // the old state to do SREM. For full Redis state management, the item
625        // should be re-submitted with the new state via submit_with().
626        
627        Ok(updated)
628    }
629    
630    /// Delete all items in a given state from SQL.
631    ///
632    /// Also removes from L1 cache and Redis state SET.
633    /// Returns the number of deleted items.
634    ///
635    /// # Example
636    ///
637    /// ```rust,no_run
638    /// # use sync_engine::{SyncEngine, StorageError};
639    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
640    /// // Clean up all processed deltas
641    /// let deleted = engine.delete_by_state("delta").await?;
642    /// println!("Deleted {} delta items", deleted);
643    /// # Ok(())
644    /// # }
645    /// ```
646    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
647        let mut deleted = 0u64;
648        
649        // Get IDs first (for L1 cleanup)
650        let ids = if let Some(ref sql) = self.sql_store {
651            sql.list_state_ids(state, 100_000).await?
652        } else {
653            Vec::new()
654        };
655        
656        // Remove from L1 cache
657        for id in &ids {
658            self.l1_cache.remove(id);
659        }
660        
661        // Delete from SQL
662        if let Some(ref sql) = self.sql_store {
663            deleted = sql.delete_by_state(state).await?;
664        }
665        
666        // Note: Redis items with TTL will expire naturally.
667        // For immediate Redis cleanup, call delete_by_state on RedisStore directly.
668        
669        info!(state = %state, deleted = deleted, "Deleted items by state");
670        
671        Ok(deleted)
672    }
673    
674    // =========================================================================
675    // Prefix Scan Operations
676    // =========================================================================
677    
678    /// Scan items by ID prefix.
679    ///
680    /// Retrieves all items whose ID starts with the given prefix.
681    /// Queries SQL (ground truth) directly - does NOT check L1 cache.
682    ///
683    /// Useful for CRDT delta-first architecture where deltas are stored as:
684    /// `delta:{object_id}:{op_id}` and you need to fetch all deltas for an object.
685    ///
686    /// # Example
687    ///
688    /// ```rust,no_run
689    /// # use sync_engine::{SyncEngine, StorageError};
690    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
691    /// // Get base state
692    /// let base = engine.get("base:user.123").await?;
693    ///
694    /// // Get all pending deltas for this object
695    /// let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
696    ///
697    /// // Merge on-the-fly for read-repair
698    /// for delta in deltas {
699    ///     println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
700    /// }
701    /// # Ok(())
702    /// # }
703    /// ```
704    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
705        if let Some(ref sql) = self.sql_store {
706            sql.scan_prefix(prefix, limit).await
707        } else {
708            Ok(Vec::new())
709        }
710    }
711    
712    /// Count items matching an ID prefix (SQL ground truth).
713    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
714        if let Some(ref sql) = self.sql_store {
715            sql.count_prefix(prefix).await
716        } else {
717            Ok(0)
718        }
719    }
720    
721    /// Delete all items matching an ID prefix.
722    ///
723    /// Removes from L1 cache, SQL, and Redis.
724    /// Returns the number of deleted items.
725    ///
726    /// # Example
727    ///
728    /// ```rust,no_run
729    /// # use sync_engine::{SyncEngine, StorageError};
730    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
731    /// // After merging deltas into base, clean them up
732    /// let deleted = engine.delete_prefix("delta:user.123:").await?;
733    /// println!("Cleaned up {} deltas", deleted);
734    /// # Ok(())
735    /// # }
736    /// ```
737    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
738        let mut deleted = 0u64;
739        
740        // Get IDs first (for L1/L2 cleanup)
741        let items = if let Some(ref sql) = self.sql_store {
742            sql.scan_prefix(prefix, 100_000).await?
743        } else {
744            Vec::new()
745        };
746        
747        // Remove from L1 cache
748        for item in &items {
749            self.l1_cache.remove(&item.object_id);
750        }
751        
752        // Remove from L2 (Redis) one-by-one via CacheStore trait
753        if let Some(ref l2) = self.l2_store {
754            for item in &items {
755                let _ = l2.delete(&item.object_id).await;
756            }
757        }
758        
759        // Delete from SQL
760        if let Some(ref sql) = self.sql_store {
761            deleted = sql.delete_prefix(prefix).await?;
762        }
763        
764        info!(prefix = %prefix, deleted = deleted, "Deleted items by prefix");
765        
766        Ok(deleted)
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use crate::config::SyncEngineConfig;
774    use serde_json::json;
775    use tokio::sync::watch;
776
777    fn test_config() -> SyncEngineConfig {
778        SyncEngineConfig {
779            redis_url: None,
780            sql_url: None,
781            wal_path: None,
782            l1_max_bytes: 1024 * 1024,
783            ..Default::default()
784        }
785    }
786
787    fn test_item(id: &str) -> SyncItem {
788        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
789    }
790
791    #[tokio::test]
792    async fn test_contains_l1_hit() {
793        let config = test_config();
794        let (_tx, rx) = watch::channel(config.clone());
795        let engine = SyncEngine::new(config, rx);
796        
797        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
798        
799        assert!(engine.contains("test.exists").await);
800    }
801
802    #[tokio::test]
803    async fn test_contains_with_trusted_filter() {
804        let config = test_config();
805        let (_tx, rx) = watch::channel(config.clone());
806        let engine = SyncEngine::new(config, rx);
807        
808        engine.l3_filter.mark_trusted();
809        
810        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
811        engine.l3_filter.insert("test.exists");
812        
813        assert!(engine.contains("test.exists").await);
814        assert!(!engine.contains("test.missing").await);
815    }
816
817    #[test]
818    fn test_len_and_is_empty() {
819        let config = test_config();
820        let (_tx, rx) = watch::channel(config.clone());
821        let engine = SyncEngine::new(config, rx);
822        
823        assert!(engine.is_empty());
824        assert_eq!(engine.len(), 0);
825        
826        engine.l1_cache.insert("a".into(), test_item("a"));
827        assert!(!engine.is_empty());
828        assert_eq!(engine.len(), 1);
829        
830        engine.l1_cache.insert("b".into(), test_item("b"));
831        assert_eq!(engine.len(), 2);
832    }
833
834    #[tokio::test]
835    async fn test_status_synced_in_l1() {
836        use super::super::EngineState;
837        
838        let config = test_config();
839        let (_tx, rx) = watch::channel(config.clone());
840        let engine = SyncEngine::new(config, rx);
841        let _ = engine.state.send(EngineState::Ready);
842        
843        engine.submit(test_item("test.item")).await.expect("Submit failed");
844        let _ = engine.l2_batcher.lock().await.force_flush();
845        
846        let status = engine.status("test.item").await;
847        assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
848    }
849
850    #[tokio::test]
851    async fn test_status_pending() {
852        use super::super::EngineState;
853        
854        let config = test_config();
855        let (_tx, rx) = watch::channel(config.clone());
856        let engine = SyncEngine::new(config, rx);
857        let _ = engine.state.send(EngineState::Ready);
858        
859        engine.submit(test_item("test.pending")).await.expect("Submit failed");
860        
861        let status = engine.status("test.pending").await;
862        assert_eq!(status, ItemStatus::Pending);
863    }
864
865    #[tokio::test]
866    async fn test_status_missing() {
867        let config = test_config();
868        let (_tx, rx) = watch::channel(config.clone());
869        let engine = SyncEngine::new(config, rx);
870        
871        let status = engine.status("test.nonexistent").await;
872        assert_eq!(status, ItemStatus::Missing);
873    }
874
875    #[tokio::test]
876    async fn test_get_many_from_l1() {
877        use super::super::EngineState;
878        
879        let config = test_config();
880        let (_tx, rx) = watch::channel(config.clone());
881        let engine = SyncEngine::new(config, rx);
882        let _ = engine.state.send(EngineState::Ready);
883        
884        engine.l1_cache.insert("a".into(), test_item("a"));
885        engine.l1_cache.insert("b".into(), test_item("b"));
886        engine.l1_cache.insert("c".into(), test_item("c"));
887        
888        let results = engine.get_many(&["a", "b", "missing", "c"]).await;
889        
890        assert_eq!(results.len(), 4);
891        assert!(results[0].is_some());
892        assert!(results[1].is_some());
893        assert!(results[2].is_none());
894        assert!(results[3].is_some());
895        
896        assert_eq!(results[0].as_ref().unwrap().object_id, "a");
897        assert_eq!(results[1].as_ref().unwrap().object_id, "b");
898        assert_eq!(results[3].as_ref().unwrap().object_id, "c");
899    }
900
901    #[tokio::test]
902    async fn test_submit_many() {
903        use super::super::EngineState;
904        
905        let config = test_config();
906        let (_tx, rx) = watch::channel(config.clone());
907        let engine = SyncEngine::new(config, rx);
908        let _ = engine.state.send(EngineState::Ready);
909        
910        let items = vec![
911            test_item("batch.1"),
912            test_item("batch.2"),
913            test_item("batch.3"),
914        ];
915        
916        let result = engine.submit_many(items).await.expect("Batch submit failed");
917        
918        assert_eq!(result.total, 3);
919        assert_eq!(result.succeeded, 3);
920        assert_eq!(result.failed, 0);
921        assert!(result.is_success());
922        
923        assert_eq!(engine.len(), 3);
924        assert!(engine.contains("batch.1").await);
925        assert!(engine.contains("batch.2").await);
926        assert!(engine.contains("batch.3").await);
927    }
928
929    #[tokio::test]
930    async fn test_delete_many() {
931        use super::super::EngineState;
932        
933        let config = test_config();
934        let (_tx, rx) = watch::channel(config.clone());
935        let engine = SyncEngine::new(config, rx);
936        let _ = engine.state.send(EngineState::Ready);
937        
938        engine.l1_cache.insert("del.1".into(), test_item("del.1"));
939        engine.l1_cache.insert("del.2".into(), test_item("del.2"));
940        engine.l1_cache.insert("keep".into(), test_item("keep"));
941        
942        let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");
943        
944        assert_eq!(result.total, 2);
945        assert_eq!(result.succeeded, 2);
946        assert!(result.is_success());
947        
948        assert!(!engine.l1_cache.contains_key("del.1"));
949        assert!(!engine.l1_cache.contains_key("del.2"));
950        assert!(engine.l1_cache.contains_key("keep"));
951    }
952
953    #[tokio::test]
954    async fn test_get_or_insert_with_existing() {
955        use super::super::EngineState;
956        
957        let config = test_config();
958        let (_tx, rx) = watch::channel(config.clone());
959        let engine = SyncEngine::new(config, rx);
960        let _ = engine.state.send(EngineState::Ready);
961        
962        let existing = test_item("existing");
963        engine.l1_cache.insert("existing".into(), existing.clone());
964        
965        let factory_called = std::sync::atomic::AtomicBool::new(false);
966        let result = engine.get_or_insert_with("existing", || {
967            factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
968            async { test_item("should_not_be_used") }
969        }).await.expect("get_or_insert_with failed");
970        
971        assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
972        assert_eq!(result.object_id, "existing");
973    }
974
975    #[tokio::test]
976    async fn test_get_or_insert_with_missing() {
977        use super::super::EngineState;
978        
979        let config = test_config();
980        let (_tx, rx) = watch::channel(config.clone());
981        let engine = SyncEngine::new(config, rx);
982        let _ = engine.state.send(EngineState::Ready);
983        
984        let result = engine.get_or_insert_with("new_item", || async {
985            SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
986        }).await.expect("get_or_insert_with failed");
987        
988        assert_eq!(result.object_id, "new_item");
989        assert!(engine.contains("new_item").await);
990    }
991}