sync_engine/coordinator/
api.rs

1//! V1.1 API: Query and batch operations.
2//!
3//! This module contains the higher-level API methods added in V1.1:
4//! - `contains()` - Fast probabilistic existence check
5//! - `len()` / `is_empty()` - L1 cache size queries
6//! - `status()` - Detailed sync status
7//! - `get_many()` - Parallel batch fetch
8//! - `submit_many()` - Batch upsert
9//! - `delete_many()` - Batch delete
10//! - `get_or_insert_with()` - Cache-aside pattern
11
12use std::sync::atomic::Ordering;
13use tokio::task::JoinSet;
14use tracing::{debug, info, warn, error};
15
16use crate::storage::traits::StorageError;
17use crate::sync_item::SyncItem;
18use crate::merkle::MerkleBatch;
19
20use super::{SyncEngine, ItemStatus, BatchResult};
21
22impl SyncEngine {
23    // ═══════════════════════════════════════════════════════════════════════════
24    // API: Query & Batch Operations
25    // ═══════════════════════════════════════════════════════════════════════════
26
27    /// Check if an item exists across all tiers.
28    ///
29    /// Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query.
30    /// If found in SQL and Cuckoo filter was untrusted, updates the filter.
31    ///
32    /// # Returns
33    /// - `true` → item definitely exists in at least one tier
34    /// - `false` → item does not exist (authoritative)
35    ///
36    /// # Example
37    ///
38    /// ```rust,no_run
39    /// # use sync_engine::SyncEngine;
40    /// # async fn example(engine: &SyncEngine) {
41    /// if engine.contains("user.123").await {
42    ///     let item = engine.get("user.123").await;
43    /// } else {
44    ///     println!("Not found");
45    /// }
46    /// # }
47    /// ```
48    pub async fn contains(&self, id: &str) -> bool {
49        // L1: In-memory cache (definitive, sync)
50        if self.l1_cache.contains_key(id) {
51            return true;
52        }
53        
54        // L2: Redis EXISTS (async, authoritative for Redis tier)
55        if let Some(ref l2) = self.l2_store {
56            if l2.exists(id).await.unwrap_or(false) {
57                return true;
58            }
59        }
60        
61        // L3: Cuckoo filter check (sync, probabilistic)
62        if self.l3_filter.is_trusted() {
63            // Filter is trusted - use it for fast negative
64            if !self.l3_filter.should_check_l3(id) {
65                return false; // Definitely not in L3
66            }
67        }
68        
69        // L3: SQL query (async, ground truth)
70        if let Some(ref l3) = self.l3_store {
71            if l3.exists(id).await.unwrap_or(false) {
72                // Found in SQL - update Cuckoo if it was untrusted
73                if !self.l3_filter.is_trusted() {
74                    self.l3_filter.insert(id);
75                }
76                return true;
77            }
78        }
79        
80        false
81    }
82    
83    /// Fast check: is this item definitely NOT in L3?
84    ///
85    /// Uses the Cuckoo filter for a fast authoritative negative.
86    /// - Returns `true` → item is **definitely not** in L3 (safe to skip)
87    /// - Returns `false` → item **might** exist (need to check L3)
88    ///
89    /// Only meaningful when the L3 filter is trusted. If untrusted, returns `false`
90    /// (meaning "we don't know, you should check").
91    ///
92    /// # Use Case
93    ///
94    /// Fast early-exit in replication: if definitely missing, apply without checking.
95    ///
96    /// ```rust,no_run
97    /// # use sync_engine::SyncEngine;
98    /// # async fn example(engine: &SyncEngine) {
99    /// if engine.definitely_missing("patient.123") {
100    ///     // Fast path: definitely new, just insert
101    ///     println!("New item, inserting directly");
102    /// } else {
103    ///     // Slow path: might exist, check hash
104    ///     if !engine.is_current("patient.123", "abc123...").await {
105    ///         println!("Outdated, updating");
106    ///     }
107    /// }
108    /// # }
109    /// ```
110    #[must_use]
111    #[inline]
112    pub fn definitely_missing(&self, id: &str) -> bool {
113        // Only authoritative if filter is trusted
114        if !self.l3_filter.is_trusted() {
115            return false; // Unknown, caller should check
116        }
117        // Cuckoo false = definitely not there
118        !self.l3_filter.should_check_l3(id)
119    }
120
121    /// Fast check: might this item exist somewhere?
122    ///
123    /// Checks L1 cache (partial, evicts) and Cuckoo filter (probabilistic).
124    /// - Returns `true` → item is in L1 OR might be in L3 (worth checking)
125    /// - Returns `false` → item is definitely not in L1 or L3
126    ///
127    /// Note: L1 is partial (items evict), so this can return `false` even if
128    /// the item exists in L2/L3. For authoritative check, use `contains()`.
129    ///
130    /// # Use Case
131    ///
132    /// Quick probabilistic check before expensive async lookup.
133    #[must_use]
134    #[inline]
135    pub fn might_exist(&self, id: &str) -> bool {
136        self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
137    }
138
139    /// Check if the item at `key` has the given content hash.
140    ///
141    /// This is the semantic API for CDC deduplication in replication.
142    /// Returns `true` if the item exists AND its content hash matches.
143    /// Returns `false` if item doesn't exist OR hash differs.
144    ///
145    /// # Arguments
146    /// * `id` - Object ID
147    /// * `content_hash` - SHA256 hash of content (hex-encoded string)
148    ///
149    /// # Example
150    ///
151    /// ```rust,no_run
152    /// # use sync_engine::SyncEngine;
153    /// # async fn example(engine: &SyncEngine) {
154    /// // Skip replication if we already have this version
155    /// let incoming_hash = "abc123...";
156    /// if engine.is_current("patient.123", incoming_hash).await {
157    ///     println!("Already up to date, skipping");
158    ///     return;
159    /// }
160    /// # }
161    /// ```
162    pub async fn is_current(&self, id: &str, content_hash: &str) -> bool {
163        // Check L1 first (fastest)
164        if let Some(item) = self.l1_cache.get(id) {
165            return item.content_hash == content_hash;
166        }
167
168        // Check L2 (if available)
169        if let Some(ref l2) = self.l2_store {
170            if let Ok(Some(item)) = l2.get(id).await {
171                return item.content_hash == content_hash;
172            }
173        }
174
175        // Check L3 (ground truth)
176        if let Some(ref l3) = self.l3_store {
177            if self.l3_filter.should_check_l3(id) {
178                if let Ok(Some(item)) = l3.get(id).await {
179                    return item.content_hash == content_hash;
180                }
181            }
182        }
183
184        false
185    }
186
187    /// Get the current count of items in L1 cache.
188    #[must_use]
189    #[inline]
190    pub fn len(&self) -> usize {
191        self.l1_cache.len()
192    }
193
194    /// Check if L1 cache is empty.
195    #[must_use]
196    #[inline]
197    pub fn is_empty(&self) -> bool {
198        self.l1_cache.is_empty()
199    }
200
201    /// Get the sync status of an item.
202    ///
203    /// Returns detailed state information about where an item exists
204    /// and its sync status across tiers.
205    ///
206    /// # Example
207    ///
208    /// ```rust,no_run
209    /// # use sync_engine::{SyncEngine, ItemStatus};
210    /// # async fn example(engine: &SyncEngine) {
211    /// match engine.status("order.456").await {
212    ///     ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
213    ///         println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
214    ///     }
215    ///     ItemStatus::Pending => println!("Queued for sync"),
216    ///     ItemStatus::Missing => println!("Not found"),
217    /// }
218    /// # }
219    /// ```
220    pub async fn status(&self, id: &str) -> ItemStatus {
221        let in_l1 = self.l1_cache.contains_key(id);
222        
223        // Check if pending in batch queue
224        let pending = self.l2_batcher.lock().await.contains(id);
225        if pending {
226            return ItemStatus::Pending;
227        }
228        
229        // Check L2 (if available) - use EXISTS, no filter
230        let in_l2 = if let Some(ref l2) = self.l2_store {
231            l2.exists(id).await.unwrap_or(false)
232        } else {
233            false
234        };
235        
236        // Check L3 (if available)  
237        let in_l3 = if let Some(ref l3) = self.l3_store {
238            self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
239        } else {
240            false
241        };
242        
243        if in_l1 || in_l2 || in_l3 {
244            ItemStatus::Synced { in_l1, in_l2, in_l3 }
245        } else {
246            ItemStatus::Missing
247        }
248    }
249
250    /// Fetch multiple items in parallel.
251    ///
252    /// Returns a vector of `Option<SyncItem>` in the same order as input IDs.
253    /// Missing items are represented as `None`.
254    ///
255    /// # Performance
256    ///
257    /// This method fetches from L1 synchronously, then batches L2/L3 lookups
258    /// for items not in L1. Much faster than sequential `get()` calls.
259    ///
260    /// # Example
261    ///
262    /// ```rust,no_run
263    /// # use sync_engine::SyncEngine;
264    /// # async fn example(engine: &SyncEngine) {
265    /// let ids = vec!["user.1", "user.2", "user.3"];
266    /// let items = engine.get_many(&ids).await;
267    /// for (id, item) in ids.iter().zip(items.iter()) {
268    ///     match item {
269    ///         Some(item) => println!("{}: found", id),
270    ///         None => println!("{}: missing", id),
271    ///     }
272    /// }
273    /// # }
274    /// ```
275    pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
276        let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
277        let mut missing_indices: Vec<usize> = Vec::new();
278        
279        // Phase 1: Check L1 (synchronous, fast)
280        for (i, id) in ids.iter().enumerate() {
281            if let Some(item) = self.l1_cache.get(*id) {
282                results[i] = Some(item.clone());
283            } else {
284                missing_indices.push(i);
285            }
286        }
287        
288        // Phase 2: Fetch missing items from L2/L3 in parallel
289        if !missing_indices.is_empty() {
290            let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();
291            
292            for &i in &missing_indices {
293                let id = ids[i].to_string();
294                let l2_store = self.l2_store.clone();
295                let l3_store = self.l3_store.clone();
296                let l3_filter = self.l3_filter.clone();
297                
298                join_set.spawn(async move {
299                    // Try L2 first (no filter, just try Redis)
300                    if let Some(ref l2) = l2_store {
301                        if let Ok(Some(item)) = l2.get(&id).await {
302                            return (i, Some(item));
303                        }
304                    }
305                    
306                    // Fall back to L3 (use Cuckoo filter if trusted)
307                    if let Some(ref l3) = l3_store {
308                        if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
309                            if let Ok(Some(item)) = l3.get(&id).await {
310                                return (i, Some(item));
311                            }
312                        }
313                    }
314                    
315                    (i, None)
316                });
317            }
318            
319            // Collect results
320            while let Some(result) = join_set.join_next().await {
321                if let Ok((i, item)) = result {
322                    results[i] = item;
323                }
324            }
325        }
326        
327        results
328    }
329
330    /// Submit multiple items for sync atomically.
331    ///
332    /// All items are added to L1 and queued for batch persistence.
333    /// Returns a `BatchResult` with success/failure counts.
334    ///
335    /// # Example
336    ///
337    /// ```rust,no_run
338    /// # use sync_engine::{SyncEngine, SyncItem};
339    /// # use serde_json::json;
340    /// # async fn example(engine: &SyncEngine) {
341    /// let items = vec![
342    ///     SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
343    ///     SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
344    /// ];
345    /// let result = engine.submit_many(items).await.unwrap();
346    /// println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
347    /// # }
348    /// ```
349    pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
350        if !self.should_accept_writes() {
351            return Err(StorageError::Backend(format!(
352                "Rejecting batch write: engine state={}, pressure={}",
353                self.state(),
354                self.pressure()
355            )));
356        }
357        
358        let total = items.len();
359        let mut succeeded = 0;
360        
361        // Lock batcher once for the whole batch
362        let mut batcher = self.l2_batcher.lock().await;
363        
364        for item in items {
365            self.insert_l1(item.clone());
366            batcher.add(item);
367            succeeded += 1;
368        }
369        
370        debug!(total, succeeded, "Batch submitted to L1 and queue");
371        
372        Ok(BatchResult {
373            total,
374            succeeded,
375            failed: total - succeeded,
376        })
377    }
378
379    /// Delete multiple items atomically.
380    ///
381    /// Removes items from all tiers (L1, L2, L3) and updates filters.
382    /// Returns a `BatchResult` with counts.
383    ///
384    /// # Example
385    ///
386    /// ```rust,no_run
387    /// # use sync_engine::SyncEngine;
388    /// # async fn example(engine: &SyncEngine) {
389    /// let ids = vec!["user.1", "user.2", "user.3"];
390    /// let result = engine.delete_many(&ids).await.unwrap();
391    /// println!("Deleted: {}", result.succeeded);
392    /// # }
393    /// ```
394    pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
395        if !self.should_accept_writes() {
396            return Err(StorageError::Backend(format!(
397                "Rejecting batch delete: engine state={}, pressure={}",
398                self.state(),
399                self.pressure()
400            )));
401        }
402        
403        let total = ids.len();
404        let mut succeeded = 0;
405        
406        // Build merkle batch for all deletions
407        let mut merkle_batch = MerkleBatch::new();
408        
409        for id in ids {
410            // Remove from L1
411            if let Some((_, item)) = self.l1_cache.remove(*id) {
412                let size = Self::item_size(&item);
413                self.l1_size_bytes.fetch_sub(size, Ordering::Release);
414            }
415            
416            // Remove from L3 filter (no L2 filter with TTL support)
417            self.l3_filter.remove(id);
418            
419            // Queue merkle deletion
420            merkle_batch.delete(id.to_string());
421            
422            succeeded += 1;
423        }
424        
425        // Batch delete from L2
426        if let Some(ref l2) = self.l2_store {
427            for id in ids {
428                if let Err(e) = l2.delete(id).await {
429                    warn!(id, error = %e, "Failed to delete from L2");
430                }
431            }
432        }
433        
434        // Batch delete from L3
435        if let Some(ref l3) = self.l3_store {
436            for id in ids {
437                if let Err(e) = l3.delete(id).await {
438                    warn!(id, error = %e, "Failed to delete from L3");
439                }
440            }
441        }
442        
443        // Update merkle trees
444        if let Some(ref sql_merkle) = self.sql_merkle {
445            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
446                error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
447            }
448        }
449        
450        if let Some(ref redis_merkle) = self.redis_merkle {
451            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
452                warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
453            }
454        }
455        
456        info!(total, succeeded, "Batch delete completed");
457        
458        Ok(BatchResult {
459            total,
460            succeeded,
461            failed: total - succeeded,
462        })
463    }
464
465    /// Get an item, or compute and insert it if missing.
466    ///
467    /// This is the classic "get or insert" pattern, useful for cache-aside:
468    /// 1. Check cache (L1 → L2 → L3)
469    /// 2. If missing, call the async factory function
470    /// 3. Insert the result and return it
471    ///
472    /// The factory is only called if the item is not found.
473    ///
474    /// # Example
475    ///
476    /// ```rust,no_run
477    /// # use sync_engine::{SyncEngine, SyncItem};
478    /// # use serde_json::json;
479    /// # async fn example(engine: &SyncEngine) {
480    /// let item = engine.get_or_insert_with("user.123", || async {
481    ///     // Expensive operation - only runs if not cached
482    ///     SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
483    /// }).await.unwrap();
484    /// # }
485    /// ```
486    pub async fn get_or_insert_with<F, Fut>(
487        &self,
488        id: &str,
489        factory: F,
490    ) -> Result<SyncItem, StorageError>
491    where
492        F: FnOnce() -> Fut,
493        Fut: std::future::Future<Output = SyncItem>,
494    {
495        // Try to get existing
496        if let Some(item) = self.get(id).await? {
497            return Ok(item);
498        }
499        
500        // Not found - compute new value
501        let item = factory().await;
502        
503        // Insert and return
504        self.submit(item.clone()).await?;
505        
506        Ok(item)
507    }
508    
509    // ═══════════════════════════════════════════════════════════════════════════
510    // State-based queries: Fast indexed access by caller-defined state tag
511    // ═══════════════════════════════════════════════════════════════════════════
512    
513    /// Get items by state from SQL (L3 ground truth).
514    ///
515    /// Uses indexed query for fast retrieval.
516    ///
517    /// # Example
518    ///
519    /// ```rust,no_run
520    /// # use sync_engine::{SyncEngine, StorageError};
521    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
522    /// // Get all delta items for CRDT merging
523    /// let deltas = engine.get_by_state("delta", 1000).await?;
524    /// for item in deltas {
525    ///     println!("Delta: {}", item.object_id);
526    /// }
527    /// # Ok(())
528    /// # }
529    /// ```
530    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
531        if let Some(ref sql) = self.sql_store {
532            sql.get_by_state(state, limit).await
533        } else {
534            Ok(Vec::new())
535        }
536    }
537    
538    /// Count items in a given state (SQL ground truth).
539    ///
540    /// # Example
541    ///
542    /// ```rust,no_run
543    /// # use sync_engine::{SyncEngine, StorageError};
544    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
545    /// let pending_count = engine.count_by_state("pending").await?;
546    /// println!("{} items pending", pending_count);
547    /// # Ok(())
548    /// # }
549    /// ```
550    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
551        if let Some(ref sql) = self.sql_store {
552            sql.count_by_state(state).await
553        } else {
554            Ok(0)
555        }
556    }
557    
558    /// Get just the IDs of items in a given state (lightweight query).
559    ///
560    /// Returns IDs from SQL. For Redis state SET, use `list_state_ids_redis()`.
561    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
562        if let Some(ref sql) = self.sql_store {
563            sql.list_state_ids(state, limit).await
564        } else {
565            Ok(Vec::new())
566        }
567    }
568    
569    /// Update the state of an item by ID.
570    ///
571    /// Updates both SQL (ground truth) and Redis state SETs.
572    /// L1 cache is NOT updated - caller should re-fetch if needed.
573    ///
574    /// Returns true if the item was found and updated.
575    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
576        let mut updated = false;
577        
578        // Update SQL (ground truth)
579        if let Some(ref sql) = self.sql_store {
580            updated = sql.set_state(id, new_state).await?;
581        }
582        
583        // Note: Redis state SETs are not updated here because we'd need to know
584        // the old state to do SREM. For full Redis state management, the item
585        // should be re-submitted with the new state via submit_with().
586        
587        Ok(updated)
588    }
589    
590    /// Delete all items in a given state from SQL.
591    ///
592    /// Also removes from L1 cache and Redis state SET.
593    /// Returns the number of deleted items.
594    ///
595    /// # Example
596    ///
597    /// ```rust,no_run
598    /// # use sync_engine::{SyncEngine, StorageError};
599    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
600    /// // Clean up all processed deltas
601    /// let deleted = engine.delete_by_state("delta").await?;
602    /// println!("Deleted {} delta items", deleted);
603    /// # Ok(())
604    /// # }
605    /// ```
606    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
607        let mut deleted = 0u64;
608        
609        // Get IDs first (for L1 cleanup)
610        let ids = if let Some(ref sql) = self.sql_store {
611            sql.list_state_ids(state, 100_000).await?
612        } else {
613            Vec::new()
614        };
615        
616        // Remove from L1 cache
617        for id in &ids {
618            self.l1_cache.remove(id);
619        }
620        
621        // Delete from SQL
622        if let Some(ref sql) = self.sql_store {
623            deleted = sql.delete_by_state(state).await?;
624        }
625        
626        // Note: Redis items with TTL will expire naturally.
627        // For immediate Redis cleanup, call delete_by_state on RedisStore directly.
628        
629        info!(state = %state, deleted = deleted, "Deleted items by state");
630        
631        Ok(deleted)
632    }
633    
634    // =========================================================================
635    // Prefix Scan Operations
636    // =========================================================================
637    
638    /// Scan items by ID prefix.
639    ///
640    /// Retrieves all items whose ID starts with the given prefix.
641    /// Queries SQL (ground truth) directly - does NOT check L1 cache.
642    ///
643    /// Useful for CRDT delta-first architecture where deltas are stored as:
644    /// `delta:{object_id}:{op_id}` and you need to fetch all deltas for an object.
645    ///
646    /// # Example
647    ///
648    /// ```rust,no_run
649    /// # use sync_engine::{SyncEngine, StorageError};
650    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
651    /// // Get base state
652    /// let base = engine.get("base:user.123").await?;
653    ///
654    /// // Get all pending deltas for this object
655    /// let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
656    ///
657    /// // Merge on-the-fly for read-repair
658    /// for delta in deltas {
659    ///     println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
660    /// }
661    /// # Ok(())
662    /// # }
663    /// ```
664    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
665        if let Some(ref sql) = self.sql_store {
666            sql.scan_prefix(prefix, limit).await
667        } else {
668            Ok(Vec::new())
669        }
670    }
671    
672    /// Count items matching an ID prefix (SQL ground truth).
673    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
674        if let Some(ref sql) = self.sql_store {
675            sql.count_prefix(prefix).await
676        } else {
677            Ok(0)
678        }
679    }
680    
681    /// Delete all items matching an ID prefix.
682    ///
683    /// Removes from L1 cache, SQL, and Redis.
684    /// Returns the number of deleted items.
685    ///
686    /// # Example
687    ///
688    /// ```rust,no_run
689    /// # use sync_engine::{SyncEngine, StorageError};
690    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
691    /// // After merging deltas into base, clean them up
692    /// let deleted = engine.delete_prefix("delta:user.123:").await?;
693    /// println!("Cleaned up {} deltas", deleted);
694    /// # Ok(())
695    /// # }
696    /// ```
697    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
698        let mut deleted = 0u64;
699        
700        // Get IDs first (for L1/L2 cleanup)
701        let items = if let Some(ref sql) = self.sql_store {
702            sql.scan_prefix(prefix, 100_000).await?
703        } else {
704            Vec::new()
705        };
706        
707        // Remove from L1 cache
708        for item in &items {
709            self.l1_cache.remove(&item.object_id);
710        }
711        
712        // Remove from L2 (Redis) one-by-one via CacheStore trait
713        if let Some(ref l2) = self.l2_store {
714            for item in &items {
715                let _ = l2.delete(&item.object_id).await;
716            }
717        }
718        
719        // Delete from SQL
720        if let Some(ref sql) = self.sql_store {
721            deleted = sql.delete_prefix(prefix).await?;
722        }
723        
724        info!(prefix = %prefix, deleted = deleted, "Deleted items by prefix");
725        
726        Ok(deleted)
727    }
728}
729
730#[cfg(test)]
731mod tests {
732    use super::*;
733    use crate::config::SyncEngineConfig;
734    use serde_json::json;
735    use tokio::sync::watch;
736
737    fn test_config() -> SyncEngineConfig {
738        SyncEngineConfig {
739            redis_url: None,
740            sql_url: None,
741            wal_path: None,
742            l1_max_bytes: 1024 * 1024,
743            ..Default::default()
744        }
745    }
746
747    fn test_item(id: &str) -> SyncItem {
748        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
749    }
750
751    #[tokio::test]
752    async fn test_contains_l1_hit() {
753        let config = test_config();
754        let (_tx, rx) = watch::channel(config.clone());
755        let engine = SyncEngine::new(config, rx);
756        
757        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
758        
759        assert!(engine.contains("test.exists").await);
760    }
761
762    #[tokio::test]
763    async fn test_contains_with_trusted_filter() {
764        let config = test_config();
765        let (_tx, rx) = watch::channel(config.clone());
766        let engine = SyncEngine::new(config, rx);
767        
768        engine.l3_filter.mark_trusted();
769        
770        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
771        engine.l3_filter.insert("test.exists");
772        
773        assert!(engine.contains("test.exists").await);
774        assert!(!engine.contains("test.missing").await);
775    }
776
777    #[test]
778    fn test_len_and_is_empty() {
779        let config = test_config();
780        let (_tx, rx) = watch::channel(config.clone());
781        let engine = SyncEngine::new(config, rx);
782        
783        assert!(engine.is_empty());
784        assert_eq!(engine.len(), 0);
785        
786        engine.l1_cache.insert("a".into(), test_item("a"));
787        assert!(!engine.is_empty());
788        assert_eq!(engine.len(), 1);
789        
790        engine.l1_cache.insert("b".into(), test_item("b"));
791        assert_eq!(engine.len(), 2);
792    }
793
794    #[tokio::test]
795    async fn test_status_synced_in_l1() {
796        use super::super::EngineState;
797        
798        let config = test_config();
799        let (_tx, rx) = watch::channel(config.clone());
800        let engine = SyncEngine::new(config, rx);
801        let _ = engine.state.send(EngineState::Ready);
802        
803        engine.submit(test_item("test.item")).await.expect("Submit failed");
804        let _ = engine.l2_batcher.lock().await.force_flush();
805        
806        let status = engine.status("test.item").await;
807        assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
808    }
809
810    #[tokio::test]
811    async fn test_status_pending() {
812        use super::super::EngineState;
813        
814        let config = test_config();
815        let (_tx, rx) = watch::channel(config.clone());
816        let engine = SyncEngine::new(config, rx);
817        let _ = engine.state.send(EngineState::Ready);
818        
819        engine.submit(test_item("test.pending")).await.expect("Submit failed");
820        
821        let status = engine.status("test.pending").await;
822        assert_eq!(status, ItemStatus::Pending);
823    }
824
825    #[tokio::test]
826    async fn test_status_missing() {
827        let config = test_config();
828        let (_tx, rx) = watch::channel(config.clone());
829        let engine = SyncEngine::new(config, rx);
830        
831        let status = engine.status("test.nonexistent").await;
832        assert_eq!(status, ItemStatus::Missing);
833    }
834
835    #[tokio::test]
836    async fn test_get_many_from_l1() {
837        use super::super::EngineState;
838        
839        let config = test_config();
840        let (_tx, rx) = watch::channel(config.clone());
841        let engine = SyncEngine::new(config, rx);
842        let _ = engine.state.send(EngineState::Ready);
843        
844        engine.l1_cache.insert("a".into(), test_item("a"));
845        engine.l1_cache.insert("b".into(), test_item("b"));
846        engine.l1_cache.insert("c".into(), test_item("c"));
847        
848        let results = engine.get_many(&["a", "b", "missing", "c"]).await;
849        
850        assert_eq!(results.len(), 4);
851        assert!(results[0].is_some());
852        assert!(results[1].is_some());
853        assert!(results[2].is_none());
854        assert!(results[3].is_some());
855        
856        assert_eq!(results[0].as_ref().unwrap().object_id, "a");
857        assert_eq!(results[1].as_ref().unwrap().object_id, "b");
858        assert_eq!(results[3].as_ref().unwrap().object_id, "c");
859    }
860
861    #[tokio::test]
862    async fn test_submit_many() {
863        use super::super::EngineState;
864        
865        let config = test_config();
866        let (_tx, rx) = watch::channel(config.clone());
867        let engine = SyncEngine::new(config, rx);
868        let _ = engine.state.send(EngineState::Ready);
869        
870        let items = vec![
871            test_item("batch.1"),
872            test_item("batch.2"),
873            test_item("batch.3"),
874        ];
875        
876        let result = engine.submit_many(items).await.expect("Batch submit failed");
877        
878        assert_eq!(result.total, 3);
879        assert_eq!(result.succeeded, 3);
880        assert_eq!(result.failed, 0);
881        assert!(result.is_success());
882        
883        assert_eq!(engine.len(), 3);
884        assert!(engine.contains("batch.1").await);
885        assert!(engine.contains("batch.2").await);
886        assert!(engine.contains("batch.3").await);
887    }
888
889    #[tokio::test]
890    async fn test_delete_many() {
891        use super::super::EngineState;
892        
893        let config = test_config();
894        let (_tx, rx) = watch::channel(config.clone());
895        let engine = SyncEngine::new(config, rx);
896        let _ = engine.state.send(EngineState::Ready);
897        
898        engine.l1_cache.insert("del.1".into(), test_item("del.1"));
899        engine.l1_cache.insert("del.2".into(), test_item("del.2"));
900        engine.l1_cache.insert("keep".into(), test_item("keep"));
901        
902        let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");
903        
904        assert_eq!(result.total, 2);
905        assert_eq!(result.succeeded, 2);
906        assert!(result.is_success());
907        
908        assert!(!engine.l1_cache.contains_key("del.1"));
909        assert!(!engine.l1_cache.contains_key("del.2"));
910        assert!(engine.l1_cache.contains_key("keep"));
911    }
912
913    #[tokio::test]
914    async fn test_get_or_insert_with_existing() {
915        use super::super::EngineState;
916        
917        let config = test_config();
918        let (_tx, rx) = watch::channel(config.clone());
919        let engine = SyncEngine::new(config, rx);
920        let _ = engine.state.send(EngineState::Ready);
921        
922        let existing = test_item("existing");
923        engine.l1_cache.insert("existing".into(), existing.clone());
924        
925        let factory_called = std::sync::atomic::AtomicBool::new(false);
926        let result = engine.get_or_insert_with("existing", || {
927            factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
928            async { test_item("should_not_be_used") }
929        }).await.expect("get_or_insert_with failed");
930        
931        assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
932        assert_eq!(result.object_id, "existing");
933    }
934
935    #[tokio::test]
936    async fn test_get_or_insert_with_missing() {
937        use super::super::EngineState;
938        
939        let config = test_config();
940        let (_tx, rx) = watch::channel(config.clone());
941        let engine = SyncEngine::new(config, rx);
942        let _ = engine.state.send(EngineState::Ready);
943        
944        let result = engine.get_or_insert_with("new_item", || async {
945            SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
946        }).await.expect("get_or_insert_with failed");
947        
948        assert_eq!(result.object_id, "new_item");
949        assert!(engine.contains("new_item").await);
950    }
951}