sync_engine/coordinator/
api.rs

1//! V1.1 API: Query and batch operations.
2//!
3//! This module contains the higher-level API methods added in V1.1:
//! - `contains()` - Existence check across all tiers (`contains_fast()` is the sync, probabilistic variant)
5//! - `len()` / `is_empty()` - L1 cache size queries
6//! - `status()` - Detailed sync status
7//! - `get_many()` - Parallel batch fetch
8//! - `submit_many()` - Batch upsert
9//! - `delete_many()` - Batch delete
10//! - `get_or_insert_with()` - Cache-aside pattern
11
12use std::sync::atomic::Ordering;
13use tokio::task::JoinSet;
14use tracing::{debug, info, warn, error};
15
16use crate::storage::traits::StorageError;
17use crate::sync_item::SyncItem;
18use crate::merkle::MerkleBatch;
19
20use super::{SyncEngine, ItemStatus, BatchResult};
21
22impl SyncEngine {
23    // ═══════════════════════════════════════════════════════════════════════════
24    // API: Query & Batch Operations
25    // ═══════════════════════════════════════════════════════════════════════════
26
27    /// Check if an item exists across all tiers.
28    ///
29    /// Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query.
30    /// If found in SQL and Cuckoo filter was untrusted, updates the filter.
31    ///
32    /// # Returns
33    /// - `true` → item definitely exists in at least one tier
34    /// - `false` → item does not exist (authoritative)
35    ///
36    /// # Example
37    ///
38    /// ```rust,no_run
39    /// # use sync_engine::SyncEngine;
40    /// # async fn example(engine: &SyncEngine) {
41    /// if engine.contains("user.123").await {
42    ///     let item = engine.get("user.123").await;
43    /// } else {
44    ///     println!("Not found");
45    /// }
46    /// # }
47    /// ```
48    pub async fn contains(&self, id: &str) -> bool {
49        // L1: In-memory cache (definitive, sync)
50        if self.l1_cache.contains_key(id) {
51            return true;
52        }
53        
54        // L2: Redis EXISTS (async, authoritative for Redis tier)
55        if let Some(ref l2) = self.l2_store {
56            if l2.exists(id).await.unwrap_or(false) {
57                return true;
58            }
59        }
60        
61        // L3: Cuckoo filter check (sync, probabilistic)
62        if self.l3_filter.is_trusted() {
63            // Filter is trusted - use it for fast negative
64            if !self.l3_filter.should_check_l3(id) {
65                return false; // Definitely not in L3
66            }
67        }
68        
69        // L3: SQL query (async, ground truth)
70        if let Some(ref l3) = self.l3_store {
71            if l3.exists(id).await.unwrap_or(false) {
72                // Found in SQL - update Cuckoo if it was untrusted
73                if !self.l3_filter.is_trusted() {
74                    self.l3_filter.insert(id);
75                }
76                return true;
77            }
78        }
79        
80        false
81    }
82    
83    /// Fast sync check if item might exist (L1 + Cuckoo only).
84    ///
85    /// Use this when you need a quick probabilistic check without async.
86    /// For authoritative check, use `contains()` instead.
87    #[must_use]
88    #[inline]
89    pub fn contains_fast(&self, id: &str) -> bool {
90        self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
91    }
92
93    /// Get the current count of items in L1 cache.
94    #[must_use]
95    #[inline]
96    pub fn len(&self) -> usize {
97        self.l1_cache.len()
98    }
99
100    /// Check if L1 cache is empty.
101    #[must_use]
102    #[inline]
103    pub fn is_empty(&self) -> bool {
104        self.l1_cache.is_empty()
105    }
106
107    /// Get the sync status of an item.
108    ///
109    /// Returns detailed state information about where an item exists
110    /// and its sync status across tiers.
111    ///
112    /// # Example
113    ///
114    /// ```rust,no_run
115    /// # use sync_engine::{SyncEngine, ItemStatus};
116    /// # async fn example(engine: &SyncEngine) {
117    /// match engine.status("order.456").await {
118    ///     ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
119    ///         println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
120    ///     }
121    ///     ItemStatus::Pending => println!("Queued for sync"),
122    ///     ItemStatus::Missing => println!("Not found"),
123    /// }
124    /// # }
125    /// ```
126    pub async fn status(&self, id: &str) -> ItemStatus {
127        let in_l1 = self.l1_cache.contains_key(id);
128        
129        // Check if pending in batch queue
130        let pending = self.l2_batcher.lock().await.contains(id);
131        if pending {
132            return ItemStatus::Pending;
133        }
134        
135        // Check L2 (if available) - use EXISTS, no filter
136        let in_l2 = if let Some(ref l2) = self.l2_store {
137            l2.exists(id).await.unwrap_or(false)
138        } else {
139            false
140        };
141        
142        // Check L3 (if available)  
143        let in_l3 = if let Some(ref l3) = self.l3_store {
144            self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
145        } else {
146            false
147        };
148        
149        if in_l1 || in_l2 || in_l3 {
150            ItemStatus::Synced { in_l1, in_l2, in_l3 }
151        } else {
152            ItemStatus::Missing
153        }
154    }
155
156    /// Fetch multiple items in parallel.
157    ///
158    /// Returns a vector of `Option<SyncItem>` in the same order as input IDs.
159    /// Missing items are represented as `None`.
160    ///
161    /// # Performance
162    ///
163    /// This method fetches from L1 synchronously, then batches L2/L3 lookups
164    /// for items not in L1. Much faster than sequential `get()` calls.
165    ///
166    /// # Example
167    ///
168    /// ```rust,no_run
169    /// # use sync_engine::SyncEngine;
170    /// # async fn example(engine: &SyncEngine) {
171    /// let ids = vec!["user.1", "user.2", "user.3"];
172    /// let items = engine.get_many(&ids).await;
173    /// for (id, item) in ids.iter().zip(items.iter()) {
174    ///     match item {
175    ///         Some(item) => println!("{}: found", id),
176    ///         None => println!("{}: missing", id),
177    ///     }
178    /// }
179    /// # }
180    /// ```
181    pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
182        let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
183        let mut missing_indices: Vec<usize> = Vec::new();
184        
185        // Phase 1: Check L1 (synchronous, fast)
186        for (i, id) in ids.iter().enumerate() {
187            if let Some(item) = self.l1_cache.get(*id) {
188                results[i] = Some(item.clone());
189            } else {
190                missing_indices.push(i);
191            }
192        }
193        
194        // Phase 2: Fetch missing items from L2/L3 in parallel
195        if !missing_indices.is_empty() {
196            let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();
197            
198            for &i in &missing_indices {
199                let id = ids[i].to_string();
200                let l2_store = self.l2_store.clone();
201                let l3_store = self.l3_store.clone();
202                let l3_filter = self.l3_filter.clone();
203                
204                join_set.spawn(async move {
205                    // Try L2 first (no filter, just try Redis)
206                    if let Some(ref l2) = l2_store {
207                        if let Ok(Some(item)) = l2.get(&id).await {
208                            return (i, Some(item));
209                        }
210                    }
211                    
212                    // Fall back to L3 (use Cuckoo filter if trusted)
213                    if let Some(ref l3) = l3_store {
214                        if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
215                            if let Ok(Some(item)) = l3.get(&id).await {
216                                return (i, Some(item));
217                            }
218                        }
219                    }
220                    
221                    (i, None)
222                });
223            }
224            
225            // Collect results
226            while let Some(result) = join_set.join_next().await {
227                if let Ok((i, item)) = result {
228                    results[i] = item;
229                }
230            }
231        }
232        
233        results
234    }
235
236    /// Submit multiple items for sync atomically.
237    ///
238    /// All items are added to L1 and queued for batch persistence.
239    /// Returns a `BatchResult` with success/failure counts.
240    ///
241    /// # Example
242    ///
243    /// ```rust,no_run
244    /// # use sync_engine::{SyncEngine, SyncItem};
245    /// # use serde_json::json;
246    /// # async fn example(engine: &SyncEngine) {
247    /// let items = vec![
248    ///     SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
249    ///     SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
250    /// ];
251    /// let result = engine.submit_many(items).await.unwrap();
252    /// println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
253    /// # }
254    /// ```
255    pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
256        if !self.should_accept_writes() {
257            return Err(StorageError::Backend(format!(
258                "Rejecting batch write: engine state={}, pressure={}",
259                self.state(),
260                self.pressure()
261            )));
262        }
263        
264        let total = items.len();
265        let mut succeeded = 0;
266        
267        // Lock batcher once for the whole batch
268        let mut batcher = self.l2_batcher.lock().await;
269        
270        for item in items {
271            self.insert_l1(item.clone());
272            batcher.add(item);
273            succeeded += 1;
274        }
275        
276        debug!(total, succeeded, "Batch submitted to L1 and queue");
277        
278        Ok(BatchResult {
279            total,
280            succeeded,
281            failed: total - succeeded,
282        })
283    }
284
285    /// Delete multiple items atomically.
286    ///
287    /// Removes items from all tiers (L1, L2, L3) and updates filters.
288    /// Returns a `BatchResult` with counts.
289    ///
290    /// # Example
291    ///
292    /// ```rust,no_run
293    /// # use sync_engine::SyncEngine;
294    /// # async fn example(engine: &SyncEngine) {
295    /// let ids = vec!["user.1", "user.2", "user.3"];
296    /// let result = engine.delete_many(&ids).await.unwrap();
297    /// println!("Deleted: {}", result.succeeded);
298    /// # }
299    /// ```
300    pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
301        if !self.should_accept_writes() {
302            return Err(StorageError::Backend(format!(
303                "Rejecting batch delete: engine state={}, pressure={}",
304                self.state(),
305                self.pressure()
306            )));
307        }
308        
309        let total = ids.len();
310        let mut succeeded = 0;
311        
312        // Build merkle batch for all deletions
313        let mut merkle_batch = MerkleBatch::new();
314        
315        for id in ids {
316            // Remove from L1
317            if let Some((_, item)) = self.l1_cache.remove(*id) {
318                let size = Self::item_size(&item);
319                self.l1_size_bytes.fetch_sub(size, Ordering::Release);
320            }
321            
322            // Remove from L3 filter (no L2 filter with TTL support)
323            self.l3_filter.remove(id);
324            
325            // Queue merkle deletion
326            merkle_batch.delete(id.to_string());
327            
328            succeeded += 1;
329        }
330        
331        // Batch delete from L2
332        if let Some(ref l2) = self.l2_store {
333            for id in ids {
334                if let Err(e) = l2.delete(id).await {
335                    warn!(id, error = %e, "Failed to delete from L2");
336                }
337            }
338        }
339        
340        // Batch delete from L3
341        if let Some(ref l3) = self.l3_store {
342            for id in ids {
343                if let Err(e) = l3.delete(id).await {
344                    warn!(id, error = %e, "Failed to delete from L3");
345                }
346            }
347        }
348        
349        // Update merkle trees
350        if let Some(ref sql_merkle) = self.sql_merkle {
351            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
352                error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
353            }
354        }
355        
356        if let Some(ref redis_merkle) = self.redis_merkle {
357            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
358                warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
359            }
360        }
361        
362        info!(total, succeeded, "Batch delete completed");
363        
364        Ok(BatchResult {
365            total,
366            succeeded,
367            failed: total - succeeded,
368        })
369    }
370
371    /// Get an item, or compute and insert it if missing.
372    ///
373    /// This is the classic "get or insert" pattern, useful for cache-aside:
374    /// 1. Check cache (L1 → L2 → L3)
375    /// 2. If missing, call the async factory function
376    /// 3. Insert the result and return it
377    ///
378    /// The factory is only called if the item is not found.
379    ///
380    /// # Example
381    ///
382    /// ```rust,no_run
383    /// # use sync_engine::{SyncEngine, SyncItem};
384    /// # use serde_json::json;
385    /// # async fn example(engine: &SyncEngine) {
386    /// let item = engine.get_or_insert_with("user.123", || async {
387    ///     // Expensive operation - only runs if not cached
388    ///     SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
389    /// }).await.unwrap();
390    /// # }
391    /// ```
392    pub async fn get_or_insert_with<F, Fut>(
393        &self,
394        id: &str,
395        factory: F,
396    ) -> Result<SyncItem, StorageError>
397    where
398        F: FnOnce() -> Fut,
399        Fut: std::future::Future<Output = SyncItem>,
400    {
401        // Try to get existing
402        if let Some(item) = self.get(id).await? {
403            return Ok(item);
404        }
405        
406        // Not found - compute new value
407        let item = factory().await;
408        
409        // Insert and return
410        self.submit(item.clone()).await?;
411        
412        Ok(item)
413    }
414    
415    // ═══════════════════════════════════════════════════════════════════════════
416    // State-based queries: Fast indexed access by caller-defined state tag
417    // ═══════════════════════════════════════════════════════════════════════════
418    
419    /// Get items by state from SQL (L3 ground truth).
420    ///
421    /// Uses indexed query for fast retrieval.
422    ///
423    /// # Example
424    ///
425    /// ```rust,no_run
426    /// # use sync_engine::{SyncEngine, StorageError};
427    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
428    /// // Get all delta items for CRDT merging
429    /// let deltas = engine.get_by_state("delta", 1000).await?;
430    /// for item in deltas {
431    ///     println!("Delta: {}", item.object_id);
432    /// }
433    /// # Ok(())
434    /// # }
435    /// ```
436    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
437        if let Some(ref sql) = self.sql_store {
438            sql.get_by_state(state, limit).await
439        } else {
440            Ok(Vec::new())
441        }
442    }
443    
444    /// Count items in a given state (SQL ground truth).
445    ///
446    /// # Example
447    ///
448    /// ```rust,no_run
449    /// # use sync_engine::{SyncEngine, StorageError};
450    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
451    /// let pending_count = engine.count_by_state("pending").await?;
452    /// println!("{} items pending", pending_count);
453    /// # Ok(())
454    /// # }
455    /// ```
456    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
457        if let Some(ref sql) = self.sql_store {
458            sql.count_by_state(state).await
459        } else {
460            Ok(0)
461        }
462    }
463    
464    /// Get just the IDs of items in a given state (lightweight query).
465    ///
466    /// Returns IDs from SQL. For Redis state SET, use `list_state_ids_redis()`.
467    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
468        if let Some(ref sql) = self.sql_store {
469            sql.list_state_ids(state, limit).await
470        } else {
471            Ok(Vec::new())
472        }
473    }
474    
475    /// Update the state of an item by ID.
476    ///
477    /// Updates both SQL (ground truth) and Redis state SETs.
478    /// L1 cache is NOT updated - caller should re-fetch if needed.
479    ///
480    /// Returns true if the item was found and updated.
481    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
482        let mut updated = false;
483        
484        // Update SQL (ground truth)
485        if let Some(ref sql) = self.sql_store {
486            updated = sql.set_state(id, new_state).await?;
487        }
488        
489        // Note: Redis state SETs are not updated here because we'd need to know
490        // the old state to do SREM. For full Redis state management, the item
491        // should be re-submitted with the new state via submit_with().
492        
493        Ok(updated)
494    }
495    
496    /// Delete all items in a given state from SQL.
497    ///
498    /// Also removes from L1 cache and Redis state SET.
499    /// Returns the number of deleted items.
500    ///
501    /// # Example
502    ///
503    /// ```rust,no_run
504    /// # use sync_engine::{SyncEngine, StorageError};
505    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
506    /// // Clean up all processed deltas
507    /// let deleted = engine.delete_by_state("delta").await?;
508    /// println!("Deleted {} delta items", deleted);
509    /// # Ok(())
510    /// # }
511    /// ```
512    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
513        let mut deleted = 0u64;
514        
515        // Get IDs first (for L1 cleanup)
516        let ids = if let Some(ref sql) = self.sql_store {
517            sql.list_state_ids(state, 100_000).await?
518        } else {
519            Vec::new()
520        };
521        
522        // Remove from L1 cache
523        for id in &ids {
524            self.l1_cache.remove(id);
525        }
526        
527        // Delete from SQL
528        if let Some(ref sql) = self.sql_store {
529            deleted = sql.delete_by_state(state).await?;
530        }
531        
532        // Note: Redis items with TTL will expire naturally.
533        // For immediate Redis cleanup, call delete_by_state on RedisStore directly.
534        
535        info!(state = %state, deleted = deleted, "Deleted items by state");
536        
537        Ok(deleted)
538    }
539    
540    // =========================================================================
541    // Prefix Scan Operations
542    // =========================================================================
543    
544    /// Scan items by ID prefix.
545    ///
546    /// Retrieves all items whose ID starts with the given prefix.
547    /// Queries SQL (ground truth) directly - does NOT check L1 cache.
548    ///
549    /// Useful for CRDT delta-first architecture where deltas are stored as:
550    /// `delta:{object_id}:{op_id}` and you need to fetch all deltas for an object.
551    ///
552    /// # Example
553    ///
554    /// ```rust,no_run
555    /// # use sync_engine::{SyncEngine, StorageError};
556    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
557    /// // Get base state
558    /// let base = engine.get("base:user.123").await?;
559    ///
560    /// // Get all pending deltas for this object
561    /// let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
562    ///
563    /// // Merge on-the-fly for read-repair
564    /// for delta in deltas {
565    ///     println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
566    /// }
567    /// # Ok(())
568    /// # }
569    /// ```
570    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
571        if let Some(ref sql) = self.sql_store {
572            sql.scan_prefix(prefix, limit).await
573        } else {
574            Ok(Vec::new())
575        }
576    }
577    
578    /// Count items matching an ID prefix (SQL ground truth).
579    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
580        if let Some(ref sql) = self.sql_store {
581            sql.count_prefix(prefix).await
582        } else {
583            Ok(0)
584        }
585    }
586    
587    /// Delete all items matching an ID prefix.
588    ///
589    /// Removes from L1 cache, SQL, and Redis.
590    /// Returns the number of deleted items.
591    ///
592    /// # Example
593    ///
594    /// ```rust,no_run
595    /// # use sync_engine::{SyncEngine, StorageError};
596    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
597    /// // After merging deltas into base, clean them up
598    /// let deleted = engine.delete_prefix("delta:user.123:").await?;
599    /// println!("Cleaned up {} deltas", deleted);
600    /// # Ok(())
601    /// # }
602    /// ```
603    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
604        let mut deleted = 0u64;
605        
606        // Get IDs first (for L1/L2 cleanup)
607        let items = if let Some(ref sql) = self.sql_store {
608            sql.scan_prefix(prefix, 100_000).await?
609        } else {
610            Vec::new()
611        };
612        
613        // Remove from L1 cache
614        for item in &items {
615            self.l1_cache.remove(&item.object_id);
616        }
617        
618        // Remove from L2 (Redis) one-by-one via CacheStore trait
619        if let Some(ref l2) = self.l2_store {
620            for item in &items {
621                let _ = l2.delete(&item.object_id).await;
622            }
623        }
624        
625        // Delete from SQL
626        if let Some(ref sql) = self.sql_store {
627            deleted = sql.delete_prefix(prefix).await?;
628        }
629        
630        info!(prefix = %prefix, deleted = deleted, "Deleted items by prefix");
631        
632        Ok(deleted)
633    }
634}
635
#[cfg(test)]
mod tests {
    use super::super::EngineState;
    use super::*;
    use crate::config::SyncEngineConfig;
    use serde_json::json;
    use tokio::sync::watch;

    /// Configuration with no Redis, SQL or WAL backing and a 1 MiB L1 budget.
    fn test_config() -> SyncEngineConfig {
        SyncEngineConfig {
            redis_url: None,
            sql_url: None,
            wal_path: None,
            l1_max_bytes: 1024 * 1024,
            ..Default::default()
        }
    }

    /// Small JSON item whose payload echoes its own ID.
    fn test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
    }

    /// Engine built from `test_config()`. The config sender is returned as
    /// well so that the watch channel stays open for the whole test.
    fn new_engine() -> (watch::Sender<SyncEngineConfig>, SyncEngine) {
        let config = test_config();
        let (tx, rx) = watch::channel(config.clone());
        (tx, SyncEngine::new(config, rx))
    }

    /// Same as `new_engine()`, with the engine state switched to `Ready`.
    fn ready_engine() -> (watch::Sender<SyncEngineConfig>, SyncEngine) {
        let (tx, engine) = new_engine();
        let _ = engine.state.send(EngineState::Ready);
        (tx, engine)
    }

    #[tokio::test]
    async fn test_contains_l1_hit() {
        let (_tx, engine) = new_engine();

        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));

        assert!(engine.contains("test.exists").await);
    }

    #[tokio::test]
    async fn test_contains_with_trusted_filter() {
        let (_tx, engine) = new_engine();

        engine.l3_filter.mark_trusted();

        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
        engine.l3_filter.insert("test.exists");

        assert!(engine.contains("test.exists").await);
        assert!(!engine.contains("test.missing").await);
    }

    #[test]
    fn test_len_and_is_empty() {
        let (_tx, engine) = new_engine();

        assert!(engine.is_empty());
        assert_eq!(engine.len(), 0);

        engine.l1_cache.insert("a".into(), test_item("a"));
        assert!(!engine.is_empty());
        assert_eq!(engine.len(), 1);

        engine.l1_cache.insert("b".into(), test_item("b"));
        assert_eq!(engine.len(), 2);
    }

    #[tokio::test]
    async fn test_status_synced_in_l1() {
        let (_tx, engine) = ready_engine();

        engine.submit(test_item("test.item")).await.expect("Submit failed");
        // Drain the batch queue so the item is no longer reported as pending.
        let _ = engine.l2_batcher.lock().await.force_flush();

        let observed = engine.status("test.item").await;
        assert!(matches!(observed, ItemStatus::Synced { in_l1: true, .. }));
    }

    #[tokio::test]
    async fn test_status_pending() {
        let (_tx, engine) = ready_engine();

        engine.submit(test_item("test.pending")).await.expect("Submit failed");

        let observed = engine.status("test.pending").await;
        assert_eq!(observed, ItemStatus::Pending);
    }

    #[tokio::test]
    async fn test_status_missing() {
        let (_tx, engine) = new_engine();

        let observed = engine.status("test.nonexistent").await;
        assert_eq!(observed, ItemStatus::Missing);
    }

    #[tokio::test]
    async fn test_get_many_from_l1() {
        let (_tx, engine) = ready_engine();

        for id in ["a", "b", "c"] {
            engine.l1_cache.insert(id.into(), test_item(id));
        }

        let fetched = engine.get_many(&["a", "b", "missing", "c"]).await;

        assert_eq!(fetched.len(), 4);
        assert!(fetched[0].is_some());
        assert!(fetched[1].is_some());
        assert!(fetched[2].is_none());
        assert!(fetched[3].is_some());

        assert_eq!(fetched[0].as_ref().unwrap().object_id, "a");
        assert_eq!(fetched[1].as_ref().unwrap().object_id, "b");
        assert_eq!(fetched[3].as_ref().unwrap().object_id, "c");
    }

    #[tokio::test]
    async fn test_submit_many() {
        let (_tx, engine) = ready_engine();

        let batch = vec![
            test_item("batch.1"),
            test_item("batch.2"),
            test_item("batch.3"),
        ];

        let outcome = engine.submit_many(batch).await.expect("Batch submit failed");

        assert_eq!(outcome.total, 3);
        assert_eq!(outcome.succeeded, 3);
        assert_eq!(outcome.failed, 0);
        assert!(outcome.is_success());

        assert_eq!(engine.len(), 3);
        assert!(engine.contains("batch.1").await);
        assert!(engine.contains("batch.2").await);
        assert!(engine.contains("batch.3").await);
    }

    #[tokio::test]
    async fn test_delete_many() {
        let (_tx, engine) = ready_engine();

        for id in ["del.1", "del.2", "keep"] {
            engine.l1_cache.insert(id.into(), test_item(id));
        }

        let outcome = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");

        assert_eq!(outcome.total, 2);
        assert_eq!(outcome.succeeded, 2);
        assert!(outcome.is_success());

        assert!(!engine.l1_cache.contains_key("del.1"));
        assert!(!engine.l1_cache.contains_key("del.2"));
        assert!(engine.l1_cache.contains_key("keep"));
    }

    #[tokio::test]
    async fn test_get_or_insert_with_existing() {
        let (_tx, engine) = ready_engine();

        engine.l1_cache.insert("existing".into(), test_item("existing"));

        // Flag flipped by the factory; it must stay false on a cache hit.
        let factory_called = std::sync::atomic::AtomicBool::new(false);
        let fetched = engine.get_or_insert_with("existing", || {
            factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
            async { test_item("should_not_be_used") }
        }).await.expect("get_or_insert_with failed");

        assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
        assert_eq!(fetched.object_id, "existing");
    }

    #[tokio::test]
    async fn test_get_or_insert_with_missing() {
        let (_tx, engine) = ready_engine();

        let created = engine.get_or_insert_with("new_item", || async {
            SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
        }).await.expect("get_or_insert_with failed");

        assert_eq!(created.object_id, "new_item");
        assert!(engine.contains("new_item").await);
    }
}