sync_engine/coordinator/api.rs
//! V1.1 API: Query and batch operations.
//!
//! This module contains the higher-level API methods added in V1.1:
//! - `contains()` - Authoritative existence check across all tiers
//! - `contains_fast()` - Fast probabilistic existence check (L1 + Cuckoo filter)
//! - `len()` / `is_empty()` - L1 cache size queries
//! - `status()` - Detailed sync status
//! - `get_many()` - Parallel batch fetch
//! - `submit_many()` - Batch upsert
//! - `delete_many()` - Batch delete
//! - `get_or_insert_with()` - Cache-aside pattern

use std::sync::atomic::Ordering;
use tokio::task::JoinSet;
use tracing::{debug, info, warn, error};

use crate::storage::traits::StorageError;
use crate::sync_item::SyncItem;
use crate::merkle::MerkleBatch;

use super::{SyncEngine, ItemStatus, BatchResult};

impl SyncEngine {
    // ═══════════════════════════════════════════════════════════════════════════
    // API: Query & Batch Operations
    // ═══════════════════════════════════════════════════════════════════════════

    /// Check if an item exists across all tiers.
    ///
    /// Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query.
    /// If the item is found in SQL while the Cuckoo filter was untrusted, the filter is updated.
    ///
    /// # Returns
    /// - `true` → item definitely exists in at least one tier
    /// - `false` → item does not exist (authoritative)
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.contains("user.123").await {
    ///     let item = engine.get("user.123").await;
    /// } else {
    ///     println!("Not found");
    /// }
    /// # }
    /// ```
    pub async fn contains(&self, id: &str) -> bool {
        // L1: In-memory cache (definitive, sync)
        if self.l1_cache.contains_key(id) {
            return true;
        }

        // L2: Redis EXISTS (async, authoritative for Redis tier)
        if let Some(ref l2) = self.l2_store {
            if l2.exists(id).await.unwrap_or(false) {
                return true;
            }
        }

        // L3: Cuckoo filter check (sync, probabilistic)
        if self.l3_filter.is_trusted() {
            // Filter is trusted - use it for fast negative
            if !self.l3_filter.should_check_l3(id) {
                return false; // Definitely not in L3
            }
        }

        // L3: SQL query (async, ground truth)
        if let Some(ref l3) = self.l3_store {
            if l3.exists(id).await.unwrap_or(false) {
                // Found in SQL - update Cuckoo if it was untrusted
                if !self.l3_filter.is_trusted() {
                    self.l3_filter.insert(id);
                }
                return true;
            }
        }

        false
    }

    /// Fast, synchronous check of whether an item might exist (L1 + Cuckoo filter only).
    ///
    /// Use this when you need a quick probabilistic check without async.
    /// For an authoritative check, use `contains()` instead.
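    ///
    /// # Example
    ///
    /// A minimal usage sketch (the `"user.123"` ID is illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # fn example(engine: &SyncEngine) {
    /// if engine.contains_fast("user.123") {
    ///     println!("Might exist - confirm with `contains()` or `get()`");
    /// }
    /// # }
    /// ```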
    #[must_use]
    #[inline]
    pub fn contains_fast(&self, id: &str) -> bool {
        self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
    }

    /// Get the current count of items in L1 cache.
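    ///
    /// # Example
    ///
    /// A minimal sketch using `len()` together with `is_empty()`:
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # fn example(engine: &SyncEngine) {
    /// if !engine.is_empty() {
    ///     println!("L1 holds {} items", engine.len());
    /// }
    /// # }
    /// ```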
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        self.l1_cache.len()
    }

    /// Check if L1 cache is empty.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.l1_cache.is_empty()
    }

    /// Get the sync status of an item.
    ///
    /// Returns detailed state information about where an item exists
    /// and its sync status across tiers.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, ItemStatus};
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.status("order.456").await {
    ///     ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
    ///         println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
    ///     }
    ///     ItemStatus::Pending => println!("Queued for sync"),
    ///     ItemStatus::Missing => println!("Not found"),
    /// }
    /// # }
    /// ```
    pub async fn status(&self, id: &str) -> ItemStatus {
        let in_l1 = self.l1_cache.contains_key(id);

        // Check if pending in batch queue
        let pending = self.l2_batcher.lock().await.contains(id);
        if pending {
            return ItemStatus::Pending;
        }

        // Check L2 (if available) - use EXISTS, no filter
        let in_l2 = if let Some(ref l2) = self.l2_store {
            l2.exists(id).await.unwrap_or(false)
        } else {
            false
        };

        // Check L3 (if available)
        let in_l3 = if let Some(ref l3) = self.l3_store {
            self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
        } else {
            false
        };

        if in_l1 || in_l2 || in_l3 {
            ItemStatus::Synced { in_l1, in_l2, in_l3 }
        } else {
            ItemStatus::Missing
        }
    }

    /// Fetch multiple items in parallel.
    ///
    /// Returns a vector of `Option<SyncItem>` in the same order as input IDs.
    /// Missing items are represented as `None`.
    ///
    /// # Performance
    ///
    /// This method fetches from L1 synchronously, then batches L2/L3 lookups
    /// for items not in L1. Much faster than sequential `get()` calls.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let ids = vec!["user.1", "user.2", "user.3"];
    /// let items = engine.get_many(&ids).await;
    /// for (id, item) in ids.iter().zip(items.iter()) {
    ///     match item {
    ///         Some(item) => println!("{}: found", id),
    ///         None => println!("{}: missing", id),
    ///     }
    /// }
    /// # }
    /// ```
    pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
        let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
        let mut missing_indices: Vec<usize> = Vec::new();

        // Phase 1: Check L1 (synchronous, fast)
        for (i, id) in ids.iter().enumerate() {
            if let Some(item) = self.l1_cache.get(*id) {
                results[i] = Some(item.clone());
            } else {
                missing_indices.push(i);
            }
        }

        // Phase 2: Fetch missing items from L2/L3 in parallel
        if !missing_indices.is_empty() {
            let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();

            for &i in &missing_indices {
                let id = ids[i].to_string();
                let l2_store = self.l2_store.clone();
                let l3_store = self.l3_store.clone();
                let l3_filter = self.l3_filter.clone();

                join_set.spawn(async move {
                    // Try L2 first (no filter, just try Redis)
                    if let Some(ref l2) = l2_store {
                        if let Ok(Some(item)) = l2.get(&id).await {
                            return (i, Some(item));
                        }
                    }

                    // Fall back to L3 (use Cuckoo filter if trusted)
                    if let Some(ref l3) = l3_store {
                        if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
                            if let Ok(Some(item)) = l3.get(&id).await {
                                return (i, Some(item));
                            }
                        }
                    }

                    (i, None)
                });
            }

            // Collect results
            while let Some(result) = join_set.join_next().await {
                if let Ok((i, item)) = result {
                    results[i] = item;
                }
            }
        }

        results
    }

    /// Submit multiple items for sync as a single batch.
    ///
    /// All items are added to L1 and queued for batch persistence.
    /// Returns a `BatchResult` with success/failure counts.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # use serde_json::json;
    /// # async fn example(engine: &SyncEngine) {
    /// let items = vec![
    ///     SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
    ///     SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
    /// ];
    /// let result = engine.submit_many(items).await.unwrap();
    /// println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
    /// # }
    /// ```
    pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
        if !self.should_accept_writes() {
            return Err(StorageError::Backend(format!(
                "Rejecting batch write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let total = items.len();
        let mut succeeded = 0;

        // Lock batcher once for the whole batch
        let mut batcher = self.l2_batcher.lock().await;

        for item in items {
            self.insert_l1(item.clone());
            batcher.add(item);
            succeeded += 1;
        }

        debug!(total, succeeded, "Batch submitted to L1 and queue");

        Ok(BatchResult {
            total,
            succeeded,
            failed: total - succeeded,
        })
    }

    /// Delete multiple items as a single batch.
    ///
    /// Removes items from all tiers (L1, L2, L3) and updates filters.
    /// Returns a `BatchResult` with counts.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let ids = vec!["user.1", "user.2", "user.3"];
    /// let result = engine.delete_many(&ids).await.unwrap();
    /// println!("Deleted: {}", result.succeeded);
    /// # }
    /// ```
    pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
        if !self.should_accept_writes() {
            return Err(StorageError::Backend(format!(
                "Rejecting batch delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let total = ids.len();
        let mut succeeded = 0;

        // Build merkle batch for all deletions
        let mut merkle_batch = MerkleBatch::new();

        for id in ids {
            // Remove from L1
            if let Some((_, item)) = self.l1_cache.remove(*id) {
                let size = Self::item_size(&item);
                self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            }

            // Remove from the L3 filter (there is no L2 filter, since L2 entries carry TTLs)
            self.l3_filter.remove(id);

            // Queue merkle deletion
            merkle_batch.delete(id.to_string());

            succeeded += 1;
        }

        // Batch delete from L2
        if let Some(ref l2) = self.l2_store {
            for id in ids {
                if let Err(e) = l2.delete(id).await {
                    warn!(id, error = %e, "Failed to delete from L2");
                }
            }
        }

        // Batch delete from L3
        if let Some(ref l3) = self.l3_store {
            for id in ids {
                if let Err(e) = l3.delete(id).await {
                    warn!(id, error = %e, "Failed to delete from L3");
                }
            }
        }

        // Update merkle trees
        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
            }
        }

        info!(total, succeeded, "Batch delete completed");

        Ok(BatchResult {
            total,
            succeeded,
            failed: total - succeeded,
        })
    }

    /// Get an item, or compute and insert it if missing.
    ///
    /// This is the classic "get or insert" pattern, useful for cache-aside:
    /// 1. Check cache (L1 → L2 → L3)
    /// 2. If missing, call the async factory function
    /// 3. Insert the result and return it
    ///
    /// The factory is only called if the item is not found.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # use serde_json::json;
    /// # async fn example(engine: &SyncEngine) {
    /// let item = engine.get_or_insert_with("user.123", || async {
    ///     // Expensive operation - only runs if not cached
    ///     SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
    /// }).await.unwrap();
    /// # }
    /// ```
    pub async fn get_or_insert_with<F, Fut>(
        &self,
        id: &str,
        factory: F,
    ) -> Result<SyncItem, StorageError>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = SyncItem>,
    {
        // Try to get existing
        if let Some(item) = self.get(id).await? {
            return Ok(item);
        }

        // Not found - compute new value
        let item = factory().await;

        // Insert and return
        self.submit(item.clone()).await?;

        Ok(item)
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get items by state from SQL (L3 ground truth).
    ///
    /// Uses indexed query for fast retrieval.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Get all delta items for CRDT merging
    /// let deltas = engine.get_by_state("delta", 1000).await?;
    /// for item in deltas {
    ///     println!("Delta: {}", item.object_id);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.get_by_state(state, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Count items in a given state (SQL ground truth).
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// let pending_count = engine.count_by_state("pending").await?;
    /// println!("{} items pending", pending_count);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.count_by_state(state).await
        } else {
            Ok(0)
        }
    }

    /// Get just the IDs of items in a given state (lightweight query).
    ///
    /// Returns IDs from SQL. For Redis state SET, use `list_state_ids_redis()`.
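    ///
    /// # Example
    ///
    /// A minimal sketch (the `"pending"` state tag is illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// let ids = engine.list_state_ids("pending", 500).await?;
    /// println!("{} pending IDs", ids.len());
    /// # Ok(())
    /// # }
    /// ```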
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.list_state_ids(state, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Update the state of an item by ID.
    ///
    /// Updates SQL (the ground truth). Redis state SETs are NOT updated here;
    /// to refresh them, re-submit the item with the new state via `submit_with()`.
    /// The L1 cache is NOT updated either - the caller should re-fetch if needed.
    ///
    /// Returns true if the item was found and updated.
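    ///
    /// # Example
    ///
    /// A minimal sketch (the ID and state tag are illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// if engine.set_state("order.456", "processed").await? {
    ///     println!("State updated in SQL");
    /// }
    /// # Ok(())
    /// # }
    /// ```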
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let mut updated = false;

        // Update SQL (ground truth)
        if let Some(ref sql) = self.sql_store {
            updated = sql.set_state(id, new_state).await?;
        }

        // Note: Redis state SETs are not updated here because we'd need to know
        // the old state to do SREM. For full Redis state management, the item
        // should be re-submitted with the new state via submit_with().

        Ok(updated)
    }

    /// Delete all items in a given state from SQL.
    ///
    /// Also removes the items from the L1 cache. Redis entries are not deleted
    /// here; they expire via TTL (see the note in the body).
    /// Returns the number of deleted items.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Clean up all processed deltas
    /// let deleted = engine.delete_by_state("delta").await?;
    /// println!("Deleted {} delta items", deleted);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut deleted = 0u64;

        // Get IDs first (for L1 cleanup)
        let ids = if let Some(ref sql) = self.sql_store {
            sql.list_state_ids(state, 100_000).await?
        } else {
            Vec::new()
        };

        // Remove from L1 cache
        for id in &ids {
            self.l1_cache.remove(id);
        }

        // Delete from SQL
        if let Some(ref sql) = self.sql_store {
            deleted = sql.delete_by_state(state).await?;
        }

        // Note: Redis items with TTL will expire naturally.
        // For immediate Redis cleanup, call delete_by_state on RedisStore directly.

        info!(state = %state, deleted = deleted, "Deleted items by state");

        Ok(deleted)
    }

    // =========================================================================
    // Prefix Scan Operations
    // =========================================================================

    /// Scan items by ID prefix.
    ///
    /// Retrieves all items whose ID starts with the given prefix.
    /// Queries SQL (ground truth) directly - does NOT check L1 cache.
    ///
    /// Useful for CRDT delta-first architecture where deltas are stored as
    /// `delta:{object_id}:{op_id}` and you need to fetch all deltas for an object.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Get base state
    /// let base = engine.get("base:user.123").await?;
    ///
    /// // Get all pending deltas for this object
    /// let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
    ///
    /// // Merge on-the-fly for read-repair
    /// for delta in deltas {
    ///     println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.scan_prefix(prefix, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Count items matching an ID prefix (SQL ground truth).
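    ///
    /// # Example
    ///
    /// A minimal sketch (the prefix is illustrative):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// let pending_deltas = engine.count_prefix("delta:user.123:").await?;
    /// println!("{} deltas awaiting merge", pending_deltas);
    /// # Ok(())
    /// # }
    /// ```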
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.count_prefix(prefix).await
        } else {
            Ok(0)
        }
    }

    /// Delete all items matching an ID prefix.
    ///
    /// Removes from L1 cache, SQL, and Redis.
    /// Returns the number of deleted items.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // After merging deltas into base, clean them up
    /// let deleted = engine.delete_prefix("delta:user.123:").await?;
    /// println!("Cleaned up {} deltas", deleted);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut deleted = 0u64;

        // Get IDs first (for L1/L2 cleanup)
        let items = if let Some(ref sql) = self.sql_store {
            sql.scan_prefix(prefix, 100_000).await?
        } else {
            Vec::new()
        };

        // Remove from L1 cache
        for item in &items {
            self.l1_cache.remove(&item.object_id);
        }

        // Remove from L2 (Redis) one-by-one via CacheStore trait
        if let Some(ref l2) = self.l2_store {
            for item in &items {
                let _ = l2.delete(&item.object_id).await;
            }
        }

        // Delete from SQL
        if let Some(ref sql) = self.sql_store {
            deleted = sql.delete_prefix(prefix).await?;
        }

        info!(prefix = %prefix, deleted = deleted, "Deleted items by prefix");

        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use serde_json::json;
    use tokio::sync::watch;

    fn test_config() -> SyncEngineConfig {
        SyncEngineConfig {
            redis_url: None,
            sql_url: None,
            wal_path: None,
            l1_max_bytes: 1024 * 1024,
            ..Default::default()
        }
    }

    fn test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
    }

    #[tokio::test]
    async fn test_contains_l1_hit() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));

        assert!(engine.contains("test.exists").await);
    }

    #[tokio::test]
    async fn test_contains_with_trusted_filter() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        engine.l3_filter.mark_trusted();

        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
        engine.l3_filter.insert("test.exists");

        assert!(engine.contains("test.exists").await);
        assert!(!engine.contains("test.missing").await);
    }
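
    // Additional check for `contains_fast`, mirroring the trusted-filter test above.
    // A sketch; it assumes the same helpers and that a trusted filter answers
    // `should_check_l3` negatively for IDs that were never inserted.
    #[test]
    fn test_contains_fast_trusted_filter() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        engine.l3_filter.mark_trusted();
        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));

        assert!(engine.contains_fast("test.exists"));
        assert!(!engine.contains_fast("test.missing"));
    }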

    #[test]
    fn test_len_and_is_empty() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert!(engine.is_empty());
        assert_eq!(engine.len(), 0);

        engine.l1_cache.insert("a".into(), test_item("a"));
        assert!(!engine.is_empty());
        assert_eq!(engine.len(), 1);

        engine.l1_cache.insert("b".into(), test_item("b"));
        assert_eq!(engine.len(), 2);
    }

    #[tokio::test]
    async fn test_status_synced_in_l1() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.submit(test_item("test.item")).await.expect("Submit failed");
        let _ = engine.l2_batcher.lock().await.force_flush();

        let status = engine.status("test.item").await;
        assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
    }

    #[tokio::test]
    async fn test_status_pending() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.submit(test_item("test.pending")).await.expect("Submit failed");

        let status = engine.status("test.pending").await;
        assert_eq!(status, ItemStatus::Pending);
    }

    #[tokio::test]
    async fn test_status_missing() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        let status = engine.status("test.nonexistent").await;
        assert_eq!(status, ItemStatus::Missing);
    }

    #[tokio::test]
    async fn test_get_many_from_l1() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.l1_cache.insert("a".into(), test_item("a"));
        engine.l1_cache.insert("b".into(), test_item("b"));
        engine.l1_cache.insert("c".into(), test_item("c"));

        let results = engine.get_many(&["a", "b", "missing", "c"]).await;

        assert_eq!(results.len(), 4);
        assert!(results[0].is_some());
        assert!(results[1].is_some());
        assert!(results[2].is_none());
        assert!(results[3].is_some());

        assert_eq!(results[0].as_ref().unwrap().object_id, "a");
        assert_eq!(results[1].as_ref().unwrap().object_id, "b");
        assert_eq!(results[3].as_ref().unwrap().object_id, "c");
    }

    #[tokio::test]
    async fn test_submit_many() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let items = vec![
            test_item("batch.1"),
            test_item("batch.2"),
            test_item("batch.3"),
        ];

        let result = engine.submit_many(items).await.expect("Batch submit failed");

        assert_eq!(result.total, 3);
        assert_eq!(result.succeeded, 3);
        assert_eq!(result.failed, 0);
        assert!(result.is_success());

        assert_eq!(engine.len(), 3);
        assert!(engine.contains("batch.1").await);
        assert!(engine.contains("batch.2").await);
        assert!(engine.contains("batch.3").await);
    }

    #[tokio::test]
    async fn test_delete_many() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.l1_cache.insert("del.1".into(), test_item("del.1"));
        engine.l1_cache.insert("del.2".into(), test_item("del.2"));
        engine.l1_cache.insert("keep".into(), test_item("keep"));

        let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");

        assert_eq!(result.total, 2);
        assert_eq!(result.succeeded, 2);
        assert!(result.is_success());

        assert!(!engine.l1_cache.contains_key("del.1"));
        assert!(!engine.l1_cache.contains_key("del.2"));
        assert!(engine.l1_cache.contains_key("keep"));
    }

    #[tokio::test]
    async fn test_get_or_insert_with_existing() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let existing = test_item("existing");
        engine.l1_cache.insert("existing".into(), existing.clone());

        let factory_called = std::sync::atomic::AtomicBool::new(false);
        let result = engine.get_or_insert_with("existing", || {
            factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
            async { test_item("should_not_be_used") }
        }).await.expect("get_or_insert_with failed");

        assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
        assert_eq!(result.object_id, "existing");
    }

    #[tokio::test]
    async fn test_get_or_insert_with_missing() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let result = engine.get_or_insert_with("new_item", || async {
            SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
        }).await.expect("get_or_insert_with failed");

        assert_eq!(result.object_id, "new_item");
        assert!(engine.contains("new_item").await);
    }
857}