// sync_engine/coordinator/api.rs

1//! V1.1 API: Query and batch operations.
2//!
3//! This module contains the higher-level API methods added in V1.1:
4//! - `contains()` - Fast probabilistic existence check
5//! - `len()` / `is_empty()` - L1 cache size queries
6//! - `status()` - Detailed sync status
7//! - `get_many()` - Parallel batch fetch
8//! - `submit_many()` - Batch upsert
9//! - `delete_many()` - Batch delete
10//! - `get_or_insert_with()` - Cache-aside pattern
11
12use std::sync::atomic::Ordering;
13use tokio::task::JoinSet;
14use tracing::{debug, info, warn, error};
15
16use crate::storage::traits::StorageError;
17use crate::sync_item::SyncItem;
18use crate::merkle::MerkleBatch;
19
20use super::{SyncEngine, ItemStatus, BatchResult};
21
22impl SyncEngine {
23    // ═══════════════════════════════════════════════════════════════════════════
24    // API: Query & Batch Operations
25    // ═══════════════════════════════════════════════════════════════════════════
26
27    /// Check if an item exists across all tiers.
28    ///
29    /// Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query.
30    /// If found in SQL and Cuckoo filter was untrusted, updates the filter.
31    ///
32    /// # Returns
33    /// - `true` → item definitely exists in at least one tier
34    /// - `false` → item does not exist (authoritative)
35    ///
36    /// # Example
37    ///
38    /// ```rust,no_run
39    /// # use sync_engine::SyncEngine;
40    /// # async fn example(engine: &SyncEngine) {
41    /// if engine.contains("user.123").await {
42    ///     let item = engine.get("user.123").await;
43    /// } else {
44    ///     println!("Not found");
45    /// }
46    /// # }
47    /// ```
48    pub async fn contains(&self, id: &str) -> bool {
49        // L1: In-memory cache (definitive, sync)
50        if self.l1_cache.contains_key(id) {
51            return true;
52        }
53        
54        // L2: Redis EXISTS (async, authoritative for Redis tier)
55        if let Some(ref l2) = self.l2_store {
56            if l2.exists(id).await.unwrap_or(false) {
57                return true;
58            }
59        }
60        
61        // L3: Cuckoo filter check (sync, probabilistic)
62        if self.l3_filter.is_trusted() {
63            // Filter is trusted - use it for fast negative
64            if !self.l3_filter.should_check_l3(id) {
65                return false; // Definitely not in L3
66            }
67        }
68        
69        // L3: SQL query (async, ground truth)
70        if let Some(ref l3) = self.l3_store {
71            if l3.exists(id).await.unwrap_or(false) {
72                // Found in SQL - update Cuckoo if it was untrusted
73                if !self.l3_filter.is_trusted() {
74                    self.l3_filter.insert(id);
75                }
76                return true;
77            }
78        }
79        
80        false
81    }
82    
83    /// Fast sync check if item might exist (L1 + Cuckoo only).
84    ///
85    /// Use this when you need a quick probabilistic check without async.
86    /// For authoritative check, use `contains()` instead.
87    #[must_use]
88    #[inline]
89    pub fn contains_fast(&self, id: &str) -> bool {
90        self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
91    }
92
93    /// Check if the item at `key` has the given content hash.
94    ///
95    /// This is the semantic API for CDC deduplication in replication.
96    /// Returns `true` if the item exists AND its content hash matches.
97    /// Returns `false` if item doesn't exist OR hash differs.
98    ///
99    /// # Arguments
100    /// * `id` - Object ID
101    /// * `content_hash` - SHA256 hash of content (hex-encoded string)
102    ///
103    /// # Example
104    ///
105    /// ```rust,no_run
106    /// # use sync_engine::SyncEngine;
107    /// # async fn example(engine: &SyncEngine) {
108    /// // Skip replication if we already have this version
109    /// let incoming_hash = "abc123...";
110    /// if engine.is_current("patient.123", incoming_hash).await {
111    ///     println!("Already up to date, skipping");
112    ///     return;
113    /// }
114    /// # }
115    /// ```
116    pub async fn is_current(&self, id: &str, content_hash: &str) -> bool {
117        // Check L1 first (fastest)
118        if let Some(item) = self.l1_cache.get(id) {
119            return item.content_hash == content_hash;
120        }
121
122        // Check L2 (if available)
123        if let Some(ref l2) = self.l2_store {
124            if let Ok(Some(item)) = l2.get(id).await {
125                return item.content_hash == content_hash;
126            }
127        }
128
129        // Check L3 (ground truth)
130        if let Some(ref l3) = self.l3_store {
131            if self.l3_filter.should_check_l3(id) {
132                if let Ok(Some(item)) = l3.get(id).await {
133                    return item.content_hash == content_hash;
134                }
135            }
136        }
137
138        false
139    }
140
    /// Get the current count of items in L1 cache.
    ///
    /// Counts only the in-memory L1 tier — items living solely in
    /// L2 (Redis) or L3 (SQL) are not included.
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        self.l1_cache.len()
    }
147
    /// Check if L1 cache is empty.
    ///
    /// Reflects only the in-memory L1 tier; an empty L1 does not imply
    /// L2/L3 hold no items.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.l1_cache.is_empty()
    }
154
155    /// Get the sync status of an item.
156    ///
157    /// Returns detailed state information about where an item exists
158    /// and its sync status across tiers.
159    ///
160    /// # Example
161    ///
162    /// ```rust,no_run
163    /// # use sync_engine::{SyncEngine, ItemStatus};
164    /// # async fn example(engine: &SyncEngine) {
165    /// match engine.status("order.456").await {
166    ///     ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
167    ///         println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
168    ///     }
169    ///     ItemStatus::Pending => println!("Queued for sync"),
170    ///     ItemStatus::Missing => println!("Not found"),
171    /// }
172    /// # }
173    /// ```
174    pub async fn status(&self, id: &str) -> ItemStatus {
175        let in_l1 = self.l1_cache.contains_key(id);
176        
177        // Check if pending in batch queue
178        let pending = self.l2_batcher.lock().await.contains(id);
179        if pending {
180            return ItemStatus::Pending;
181        }
182        
183        // Check L2 (if available) - use EXISTS, no filter
184        let in_l2 = if let Some(ref l2) = self.l2_store {
185            l2.exists(id).await.unwrap_or(false)
186        } else {
187            false
188        };
189        
190        // Check L3 (if available)  
191        let in_l3 = if let Some(ref l3) = self.l3_store {
192            self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
193        } else {
194            false
195        };
196        
197        if in_l1 || in_l2 || in_l3 {
198            ItemStatus::Synced { in_l1, in_l2, in_l3 }
199        } else {
200            ItemStatus::Missing
201        }
202    }
203
204    /// Fetch multiple items in parallel.
205    ///
206    /// Returns a vector of `Option<SyncItem>` in the same order as input IDs.
207    /// Missing items are represented as `None`.
208    ///
209    /// # Performance
210    ///
211    /// This method fetches from L1 synchronously, then batches L2/L3 lookups
212    /// for items not in L1. Much faster than sequential `get()` calls.
213    ///
214    /// # Example
215    ///
216    /// ```rust,no_run
217    /// # use sync_engine::SyncEngine;
218    /// # async fn example(engine: &SyncEngine) {
219    /// let ids = vec!["user.1", "user.2", "user.3"];
220    /// let items = engine.get_many(&ids).await;
221    /// for (id, item) in ids.iter().zip(items.iter()) {
222    ///     match item {
223    ///         Some(item) => println!("{}: found", id),
224    ///         None => println!("{}: missing", id),
225    ///     }
226    /// }
227    /// # }
228    /// ```
229    pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
230        let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
231        let mut missing_indices: Vec<usize> = Vec::new();
232        
233        // Phase 1: Check L1 (synchronous, fast)
234        for (i, id) in ids.iter().enumerate() {
235            if let Some(item) = self.l1_cache.get(*id) {
236                results[i] = Some(item.clone());
237            } else {
238                missing_indices.push(i);
239            }
240        }
241        
242        // Phase 2: Fetch missing items from L2/L3 in parallel
243        if !missing_indices.is_empty() {
244            let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();
245            
246            for &i in &missing_indices {
247                let id = ids[i].to_string();
248                let l2_store = self.l2_store.clone();
249                let l3_store = self.l3_store.clone();
250                let l3_filter = self.l3_filter.clone();
251                
252                join_set.spawn(async move {
253                    // Try L2 first (no filter, just try Redis)
254                    if let Some(ref l2) = l2_store {
255                        if let Ok(Some(item)) = l2.get(&id).await {
256                            return (i, Some(item));
257                        }
258                    }
259                    
260                    // Fall back to L3 (use Cuckoo filter if trusted)
261                    if let Some(ref l3) = l3_store {
262                        if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
263                            if let Ok(Some(item)) = l3.get(&id).await {
264                                return (i, Some(item));
265                            }
266                        }
267                    }
268                    
269                    (i, None)
270                });
271            }
272            
273            // Collect results
274            while let Some(result) = join_set.join_next().await {
275                if let Ok((i, item)) = result {
276                    results[i] = item;
277                }
278            }
279        }
280        
281        results
282    }
283
284    /// Submit multiple items for sync atomically.
285    ///
286    /// All items are added to L1 and queued for batch persistence.
287    /// Returns a `BatchResult` with success/failure counts.
288    ///
289    /// # Example
290    ///
291    /// ```rust,no_run
292    /// # use sync_engine::{SyncEngine, SyncItem};
293    /// # use serde_json::json;
294    /// # async fn example(engine: &SyncEngine) {
295    /// let items = vec![
296    ///     SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
297    ///     SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
298    /// ];
299    /// let result = engine.submit_many(items).await.unwrap();
300    /// println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
301    /// # }
302    /// ```
303    pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
304        if !self.should_accept_writes() {
305            return Err(StorageError::Backend(format!(
306                "Rejecting batch write: engine state={}, pressure={}",
307                self.state(),
308                self.pressure()
309            )));
310        }
311        
312        let total = items.len();
313        let mut succeeded = 0;
314        
315        // Lock batcher once for the whole batch
316        let mut batcher = self.l2_batcher.lock().await;
317        
318        for item in items {
319            self.insert_l1(item.clone());
320            batcher.add(item);
321            succeeded += 1;
322        }
323        
324        debug!(total, succeeded, "Batch submitted to L1 and queue");
325        
326        Ok(BatchResult {
327            total,
328            succeeded,
329            failed: total - succeeded,
330        })
331    }
332
    /// Delete multiple items atomically.
    ///
    /// Removes items from all tiers (L1, L2, L3) and updates filters.
    /// Returns a `BatchResult` with counts.
    ///
    /// NOTE(review): `succeeded` is incremented for every ID during the
    /// L1/filter/Merkle phase; L2/L3 delete failures are logged but do NOT
    /// reduce the count — Redis/SQL cleanup here is best-effort.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let ids = vec!["user.1", "user.2", "user.3"];
    /// let result = engine.delete_many(&ids).await.unwrap();
    /// println!("Deleted: {}", result.succeeded);
    /// # }
    /// ```
    pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
        // Refuse new work while shutting down or under write pressure.
        if !self.should_accept_writes() {
            return Err(StorageError::Backend(format!(
                "Rejecting batch delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }
        
        let total = ids.len();
        let mut succeeded = 0;
        
        // Build merkle batch for all deletions
        let mut merkle_batch = MerkleBatch::new();
        
        for id in ids {
            // Remove from L1 — and keep the L1 byte counter in sync with
            // what was actually evicted.
            if let Some((_, item)) = self.l1_cache.remove(*id) {
                let size = Self::item_size(&item);
                self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            }
            
            // Remove from L3 filter (no L2 filter with TTL support)
            self.l3_filter.remove(id);
            
            // Queue merkle deletion
            merkle_batch.delete(id.to_string());
            
            succeeded += 1;
        }
        
        // Batch delete from L2 — best-effort: failures are logged, not returned.
        if let Some(ref l2) = self.l2_store {
            for id in ids {
                if let Err(e) = l2.delete(id).await {
                    warn!(id, error = %e, "Failed to delete from L2");
                }
            }
        }
        
        // Batch delete from L3 — best-effort, same policy as L2.
        if let Some(ref l3) = self.l3_store {
            for id in ids {
                if let Err(e) = l3.delete(id).await {
                    warn!(id, error = %e, "Failed to delete from L3");
                }
            }
        }
        
        // Update merkle trees. SQL Merkle failure is logged at `error` level
        // (ground truth), Redis Merkle at `warn` — neither fails the call.
        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
            }
        }
        
        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
            }
        }
        
        info!(total, succeeded, "Batch delete completed");
        
        Ok(BatchResult {
            total,
            succeeded,
            failed: total - succeeded,
        })
    }
418
419    /// Get an item, or compute and insert it if missing.
420    ///
421    /// This is the classic "get or insert" pattern, useful for cache-aside:
422    /// 1. Check cache (L1 → L2 → L3)
423    /// 2. If missing, call the async factory function
424    /// 3. Insert the result and return it
425    ///
426    /// The factory is only called if the item is not found.
427    ///
428    /// # Example
429    ///
430    /// ```rust,no_run
431    /// # use sync_engine::{SyncEngine, SyncItem};
432    /// # use serde_json::json;
433    /// # async fn example(engine: &SyncEngine) {
434    /// let item = engine.get_or_insert_with("user.123", || async {
435    ///     // Expensive operation - only runs if not cached
436    ///     SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
437    /// }).await.unwrap();
438    /// # }
439    /// ```
440    pub async fn get_or_insert_with<F, Fut>(
441        &self,
442        id: &str,
443        factory: F,
444    ) -> Result<SyncItem, StorageError>
445    where
446        F: FnOnce() -> Fut,
447        Fut: std::future::Future<Output = SyncItem>,
448    {
449        // Try to get existing
450        if let Some(item) = self.get(id).await? {
451            return Ok(item);
452        }
453        
454        // Not found - compute new value
455        let item = factory().await;
456        
457        // Insert and return
458        self.submit(item.clone()).await?;
459        
460        Ok(item)
461    }
462    
463    // ═══════════════════════════════════════════════════════════════════════════
464    // State-based queries: Fast indexed access by caller-defined state tag
465    // ═══════════════════════════════════════════════════════════════════════════
466    
467    /// Get items by state from SQL (L3 ground truth).
468    ///
469    /// Uses indexed query for fast retrieval.
470    ///
471    /// # Example
472    ///
473    /// ```rust,no_run
474    /// # use sync_engine::{SyncEngine, StorageError};
475    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
476    /// // Get all delta items for CRDT merging
477    /// let deltas = engine.get_by_state("delta", 1000).await?;
478    /// for item in deltas {
479    ///     println!("Delta: {}", item.object_id);
480    /// }
481    /// # Ok(())
482    /// # }
483    /// ```
484    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
485        if let Some(ref sql) = self.sql_store {
486            sql.get_by_state(state, limit).await
487        } else {
488            Ok(Vec::new())
489        }
490    }
491    
492    /// Count items in a given state (SQL ground truth).
493    ///
494    /// # Example
495    ///
496    /// ```rust,no_run
497    /// # use sync_engine::{SyncEngine, StorageError};
498    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
499    /// let pending_count = engine.count_by_state("pending").await?;
500    /// println!("{} items pending", pending_count);
501    /// # Ok(())
502    /// # }
503    /// ```
504    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
505        if let Some(ref sql) = self.sql_store {
506            sql.count_by_state(state).await
507        } else {
508            Ok(0)
509        }
510    }
511    
512    /// Get just the IDs of items in a given state (lightweight query).
513    ///
514    /// Returns IDs from SQL. For Redis state SET, use `list_state_ids_redis()`.
515    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
516        if let Some(ref sql) = self.sql_store {
517            sql.list_state_ids(state, limit).await
518        } else {
519            Ok(Vec::new())
520        }
521    }
522    
523    /// Update the state of an item by ID.
524    ///
525    /// Updates both SQL (ground truth) and Redis state SETs.
526    /// L1 cache is NOT updated - caller should re-fetch if needed.
527    ///
528    /// Returns true if the item was found and updated.
529    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
530        let mut updated = false;
531        
532        // Update SQL (ground truth)
533        if let Some(ref sql) = self.sql_store {
534            updated = sql.set_state(id, new_state).await?;
535        }
536        
537        // Note: Redis state SETs are not updated here because we'd need to know
538        // the old state to do SREM. For full Redis state management, the item
539        // should be re-submitted with the new state via submit_with().
540        
541        Ok(updated)
542    }
543    
544    /// Delete all items in a given state from SQL.
545    ///
546    /// Also removes from L1 cache and Redis state SET.
547    /// Returns the number of deleted items.
548    ///
549    /// # Example
550    ///
551    /// ```rust,no_run
552    /// # use sync_engine::{SyncEngine, StorageError};
553    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
554    /// // Clean up all processed deltas
555    /// let deleted = engine.delete_by_state("delta").await?;
556    /// println!("Deleted {} delta items", deleted);
557    /// # Ok(())
558    /// # }
559    /// ```
560    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
561        let mut deleted = 0u64;
562        
563        // Get IDs first (for L1 cleanup)
564        let ids = if let Some(ref sql) = self.sql_store {
565            sql.list_state_ids(state, 100_000).await?
566        } else {
567            Vec::new()
568        };
569        
570        // Remove from L1 cache
571        for id in &ids {
572            self.l1_cache.remove(id);
573        }
574        
575        // Delete from SQL
576        if let Some(ref sql) = self.sql_store {
577            deleted = sql.delete_by_state(state).await?;
578        }
579        
580        // Note: Redis items with TTL will expire naturally.
581        // For immediate Redis cleanup, call delete_by_state on RedisStore directly.
582        
583        info!(state = %state, deleted = deleted, "Deleted items by state");
584        
585        Ok(deleted)
586    }
587    
588    // =========================================================================
589    // Prefix Scan Operations
590    // =========================================================================
591    
592    /// Scan items by ID prefix.
593    ///
594    /// Retrieves all items whose ID starts with the given prefix.
595    /// Queries SQL (ground truth) directly - does NOT check L1 cache.
596    ///
597    /// Useful for CRDT delta-first architecture where deltas are stored as:
598    /// `delta:{object_id}:{op_id}` and you need to fetch all deltas for an object.
599    ///
600    /// # Example
601    ///
602    /// ```rust,no_run
603    /// # use sync_engine::{SyncEngine, StorageError};
604    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
605    /// // Get base state
606    /// let base = engine.get("base:user.123").await?;
607    ///
608    /// // Get all pending deltas for this object
609    /// let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
610    ///
611    /// // Merge on-the-fly for read-repair
612    /// for delta in deltas {
613    ///     println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
614    /// }
615    /// # Ok(())
616    /// # }
617    /// ```
618    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
619        if let Some(ref sql) = self.sql_store {
620            sql.scan_prefix(prefix, limit).await
621        } else {
622            Ok(Vec::new())
623        }
624    }
625    
626    /// Count items matching an ID prefix (SQL ground truth).
627    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
628        if let Some(ref sql) = self.sql_store {
629            sql.count_prefix(prefix).await
630        } else {
631            Ok(0)
632        }
633    }
634    
635    /// Delete all items matching an ID prefix.
636    ///
637    /// Removes from L1 cache, SQL, and Redis.
638    /// Returns the number of deleted items.
639    ///
640    /// # Example
641    ///
642    /// ```rust,no_run
643    /// # use sync_engine::{SyncEngine, StorageError};
644    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
645    /// // After merging deltas into base, clean them up
646    /// let deleted = engine.delete_prefix("delta:user.123:").await?;
647    /// println!("Cleaned up {} deltas", deleted);
648    /// # Ok(())
649    /// # }
650    /// ```
651    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
652        let mut deleted = 0u64;
653        
654        // Get IDs first (for L1/L2 cleanup)
655        let items = if let Some(ref sql) = self.sql_store {
656            sql.scan_prefix(prefix, 100_000).await?
657        } else {
658            Vec::new()
659        };
660        
661        // Remove from L1 cache
662        for item in &items {
663            self.l1_cache.remove(&item.object_id);
664        }
665        
666        // Remove from L2 (Redis) one-by-one via CacheStore trait
667        if let Some(ref l2) = self.l2_store {
668            for item in &items {
669                let _ = l2.delete(&item.object_id).await;
670            }
671        }
672        
673        // Delete from SQL
674        if let Some(ref sql) = self.sql_store {
675            deleted = sql.delete_prefix(prefix).await?;
676        }
677        
678        info!(prefix = %prefix, deleted = deleted, "Deleted items by prefix");
679        
680        Ok(deleted)
681    }
682}
683
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use serde_json::json;
    use tokio::sync::watch;

    /// Minimal config: no Redis/SQL/WAL backends, 1 MiB L1 budget.
    /// All tests below therefore exercise the L1-only code paths.
    fn test_config() -> SyncEngineConfig {
        SyncEngineConfig {
            redis_url: None,
            sql_url: None,
            wal_path: None,
            l1_max_bytes: 1024 * 1024,
            ..Default::default()
        }
    }

    /// Build a small JSON-backed item with the given object id.
    fn test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
    }

    #[tokio::test]
    async fn test_contains_l1_hit() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        
        // Insert directly into L1; contains() must short-circuit on the L1 hit.
        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
        
        assert!(engine.contains("test.exists").await);
    }

    #[tokio::test]
    async fn test_contains_with_trusted_filter() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        
        // With a trusted filter, a filter miss is an authoritative negative.
        engine.l3_filter.mark_trusted();
        
        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
        engine.l3_filter.insert("test.exists");
        
        assert!(engine.contains("test.exists").await);
        assert!(!engine.contains("test.missing").await);
    }

    #[test]
    fn test_len_and_is_empty() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        
        // len()/is_empty() reflect L1 only.
        assert!(engine.is_empty());
        assert_eq!(engine.len(), 0);
        
        engine.l1_cache.insert("a".into(), test_item("a"));
        assert!(!engine.is_empty());
        assert_eq!(engine.len(), 1);
        
        engine.l1_cache.insert("b".into(), test_item("b"));
        assert_eq!(engine.len(), 2);
    }

    #[tokio::test]
    async fn test_status_synced_in_l1() {
        use super::super::EngineState;
        
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);
        
        // Flush the batcher so the item is no longer "pending".
        engine.submit(test_item("test.item")).await.expect("Submit failed");
        let _ = engine.l2_batcher.lock().await.force_flush();
        
        let status = engine.status("test.item").await;
        assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
    }

    #[tokio::test]
    async fn test_status_pending() {
        use super::super::EngineState;
        
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);
        
        // No flush: the item stays in the batch queue → Pending.
        engine.submit(test_item("test.pending")).await.expect("Submit failed");
        
        let status = engine.status("test.pending").await;
        assert_eq!(status, ItemStatus::Pending);
    }

    #[tokio::test]
    async fn test_status_missing() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        
        let status = engine.status("test.nonexistent").await;
        assert_eq!(status, ItemStatus::Missing);
    }

    #[tokio::test]
    async fn test_get_many_from_l1() {
        use super::super::EngineState;
        
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);
        
        engine.l1_cache.insert("a".into(), test_item("a"));
        engine.l1_cache.insert("b".into(), test_item("b"));
        engine.l1_cache.insert("c".into(), test_item("c"));
        
        // Results must preserve input order, with None for the miss.
        let results = engine.get_many(&["a", "b", "missing", "c"]).await;
        
        assert_eq!(results.len(), 4);
        assert!(results[0].is_some());
        assert!(results[1].is_some());
        assert!(results[2].is_none());
        assert!(results[3].is_some());
        
        assert_eq!(results[0].as_ref().unwrap().object_id, "a");
        assert_eq!(results[1].as_ref().unwrap().object_id, "b");
        assert_eq!(results[3].as_ref().unwrap().object_id, "c");
    }

    #[tokio::test]
    async fn test_submit_many() {
        use super::super::EngineState;
        
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);
        
        let items = vec![
            test_item("batch.1"),
            test_item("batch.2"),
            test_item("batch.3"),
        ];
        
        let result = engine.submit_many(items).await.expect("Batch submit failed");
        
        assert_eq!(result.total, 3);
        assert_eq!(result.succeeded, 3);
        assert_eq!(result.failed, 0);
        assert!(result.is_success());
        
        // All three items must be visible in L1 immediately.
        assert_eq!(engine.len(), 3);
        assert!(engine.contains("batch.1").await);
        assert!(engine.contains("batch.2").await);
        assert!(engine.contains("batch.3").await);
    }

    #[tokio::test]
    async fn test_delete_many() {
        use super::super::EngineState;
        
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);
        
        engine.l1_cache.insert("del.1".into(), test_item("del.1"));
        engine.l1_cache.insert("del.2".into(), test_item("del.2"));
        engine.l1_cache.insert("keep".into(), test_item("keep"));
        
        let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");
        
        assert_eq!(result.total, 2);
        assert_eq!(result.succeeded, 2);
        assert!(result.is_success());
        
        // Only the requested ids are removed; unrelated entries survive.
        assert!(!engine.l1_cache.contains_key("del.1"));
        assert!(!engine.l1_cache.contains_key("del.2"));
        assert!(engine.l1_cache.contains_key("keep"));
    }

    #[tokio::test]
    async fn test_get_or_insert_with_existing() {
        use super::super::EngineState;
        
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);
        
        let existing = test_item("existing");
        engine.l1_cache.insert("existing".into(), existing.clone());
        
        // On a hit the factory must never run.
        let factory_called = std::sync::atomic::AtomicBool::new(false);
        let result = engine.get_or_insert_with("existing", || {
            factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
            async { test_item("should_not_be_used") }
        }).await.expect("get_or_insert_with failed");
        
        assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
        assert_eq!(result.object_id, "existing");
    }

    #[tokio::test]
    async fn test_get_or_insert_with_missing() {
        use super::super::EngineState;
        
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);
        
        // On a miss the factory result is inserted and returned.
        let result = engine.get_or_insert_with("new_item", || async {
            SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
        }).await.expect("get_or_insert_with failed");
        
        assert_eq!(result.object_id, "new_item");
        assert!(engine.contains("new_item").await);
    }
}