sync_engine/coordinator/api.rs
1//! V1.1 API: Query and batch operations.
2//!
3//! This module contains the higher-level API methods added in V1.1:
4//! - `contains()` - Fast probabilistic existence check
5//! - `len()` / `is_empty()` - L1 cache size queries
6//! - `status()` - Detailed sync status
7//! - `get_many()` - Parallel batch fetch
8//! - `submit_many()` - Batch upsert
9//! - `delete_many()` - Batch delete
10//! - `get_or_insert_with()` - Cache-aside pattern
11
12use std::sync::atomic::Ordering;
13use tokio::task::JoinSet;
14use tracing::{debug, info, warn, error};
15
16use crate::storage::traits::StorageError;
17use crate::sync_item::SyncItem;
18use crate::merkle::MerkleBatch;
19
20use super::{SyncEngine, ItemStatus, BatchResult};
21
22impl SyncEngine {
23 // ═══════════════════════════════════════════════════════════════════════════
24 // API: Query & Batch Operations
25 // ═══════════════════════════════════════════════════════════════════════════
26
27 /// Check if an item exists across all tiers.
28 ///
29 /// Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query.
30 /// If found in SQL and Cuckoo filter was untrusted, updates the filter.
31 ///
32 /// # Returns
33 /// - `true` → item definitely exists in at least one tier
34 /// - `false` → item does not exist (authoritative)
35 ///
36 /// # Example
37 ///
38 /// ```rust,no_run
39 /// # use sync_engine::SyncEngine;
40 /// # async fn example(engine: &SyncEngine) {
41 /// if engine.contains("user.123").await {
42 /// let item = engine.get("user.123").await;
43 /// } else {
44 /// println!("Not found");
45 /// }
46 /// # }
47 /// ```
48 pub async fn contains(&self, id: &str) -> bool {
49 // L1: In-memory cache (definitive, sync)
50 if self.l1_cache.contains_key(id) {
51 return true;
52 }
53
54 // L2: Redis EXISTS (async, authoritative for Redis tier)
55 if let Some(ref l2) = self.l2_store {
56 if l2.exists(id).await.unwrap_or(false) {
57 return true;
58 }
59 }
60
61 // L3: Cuckoo filter check (sync, probabilistic)
62 if self.l3_filter.is_trusted() {
63 // Filter is trusted - use it for fast negative
64 if !self.l3_filter.should_check_l3(id) {
65 return false; // Definitely not in L3
66 }
67 }
68
69 // L3: SQL query (async, ground truth)
70 if let Some(ref l3) = self.l3_store {
71 if l3.exists(id).await.unwrap_or(false) {
72 // Found in SQL - update Cuckoo if it was untrusted
73 if !self.l3_filter.is_trusted() {
74 self.l3_filter.insert(id);
75 }
76 return true;
77 }
78 }
79
80 false
81 }
82
83 /// Fast sync check if item might exist (L1 + Cuckoo only).
84 ///
85 /// Use this when you need a quick probabilistic check without async.
86 /// For authoritative check, use `contains()` instead.
87 #[must_use]
88 #[inline]
89 pub fn contains_fast(&self, id: &str) -> bool {
90 self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
91 }
92
93 /// Check if the item at `key` has the given content hash.
94 ///
95 /// This is the semantic API for CDC deduplication in replication.
96 /// Returns `true` if the item exists AND its content hash matches.
97 /// Returns `false` if item doesn't exist OR hash differs.
98 ///
99 /// # Arguments
100 /// * `id` - Object ID
101 /// * `content_hash` - SHA256 hash of content (hex-encoded string)
102 ///
103 /// # Example
104 ///
105 /// ```rust,no_run
106 /// # use sync_engine::SyncEngine;
107 /// # async fn example(engine: &SyncEngine) {
108 /// // Skip replication if we already have this version
109 /// let incoming_hash = "abc123...";
110 /// if engine.is_current("patient.123", incoming_hash).await {
111 /// println!("Already up to date, skipping");
112 /// return;
113 /// }
114 /// # }
115 /// ```
116 pub async fn is_current(&self, id: &str, content_hash: &str) -> bool {
117 // Check L1 first (fastest)
118 if let Some(item) = self.l1_cache.get(id) {
119 return item.content_hash == content_hash;
120 }
121
122 // Check L2 (if available)
123 if let Some(ref l2) = self.l2_store {
124 if let Ok(Some(item)) = l2.get(id).await {
125 return item.content_hash == content_hash;
126 }
127 }
128
129 // Check L3 (ground truth)
130 if let Some(ref l3) = self.l3_store {
131 if self.l3_filter.should_check_l3(id) {
132 if let Ok(Some(item)) = l3.get(id).await {
133 return item.content_hash == content_hash;
134 }
135 }
136 }
137
138 false
139 }
140
141 /// Get the current count of items in L1 cache.
142 #[must_use]
143 #[inline]
144 pub fn len(&self) -> usize {
145 self.l1_cache.len()
146 }
147
148 /// Check if L1 cache is empty.
149 #[must_use]
150 #[inline]
151 pub fn is_empty(&self) -> bool {
152 self.l1_cache.is_empty()
153 }
154
155 /// Get the sync status of an item.
156 ///
157 /// Returns detailed state information about where an item exists
158 /// and its sync status across tiers.
159 ///
160 /// # Example
161 ///
162 /// ```rust,no_run
163 /// # use sync_engine::{SyncEngine, ItemStatus};
164 /// # async fn example(engine: &SyncEngine) {
165 /// match engine.status("order.456").await {
166 /// ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
167 /// println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
168 /// }
169 /// ItemStatus::Pending => println!("Queued for sync"),
170 /// ItemStatus::Missing => println!("Not found"),
171 /// }
172 /// # }
173 /// ```
174 pub async fn status(&self, id: &str) -> ItemStatus {
175 let in_l1 = self.l1_cache.contains_key(id);
176
177 // Check if pending in batch queue
178 let pending = self.l2_batcher.lock().await.contains(id);
179 if pending {
180 return ItemStatus::Pending;
181 }
182
183 // Check L2 (if available) - use EXISTS, no filter
184 let in_l2 = if let Some(ref l2) = self.l2_store {
185 l2.exists(id).await.unwrap_or(false)
186 } else {
187 false
188 };
189
190 // Check L3 (if available)
191 let in_l3 = if let Some(ref l3) = self.l3_store {
192 self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
193 } else {
194 false
195 };
196
197 if in_l1 || in_l2 || in_l3 {
198 ItemStatus::Synced { in_l1, in_l2, in_l3 }
199 } else {
200 ItemStatus::Missing
201 }
202 }
203
204 /// Fetch multiple items in parallel.
205 ///
206 /// Returns a vector of `Option<SyncItem>` in the same order as input IDs.
207 /// Missing items are represented as `None`.
208 ///
209 /// # Performance
210 ///
211 /// This method fetches from L1 synchronously, then batches L2/L3 lookups
212 /// for items not in L1. Much faster than sequential `get()` calls.
213 ///
214 /// # Example
215 ///
216 /// ```rust,no_run
217 /// # use sync_engine::SyncEngine;
218 /// # async fn example(engine: &SyncEngine) {
219 /// let ids = vec!["user.1", "user.2", "user.3"];
220 /// let items = engine.get_many(&ids).await;
221 /// for (id, item) in ids.iter().zip(items.iter()) {
222 /// match item {
223 /// Some(item) => println!("{}: found", id),
224 /// None => println!("{}: missing", id),
225 /// }
226 /// }
227 /// # }
228 /// ```
229 pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
230 let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
231 let mut missing_indices: Vec<usize> = Vec::new();
232
233 // Phase 1: Check L1 (synchronous, fast)
234 for (i, id) in ids.iter().enumerate() {
235 if let Some(item) = self.l1_cache.get(*id) {
236 results[i] = Some(item.clone());
237 } else {
238 missing_indices.push(i);
239 }
240 }
241
242 // Phase 2: Fetch missing items from L2/L3 in parallel
243 if !missing_indices.is_empty() {
244 let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();
245
246 for &i in &missing_indices {
247 let id = ids[i].to_string();
248 let l2_store = self.l2_store.clone();
249 let l3_store = self.l3_store.clone();
250 let l3_filter = self.l3_filter.clone();
251
252 join_set.spawn(async move {
253 // Try L2 first (no filter, just try Redis)
254 if let Some(ref l2) = l2_store {
255 if let Ok(Some(item)) = l2.get(&id).await {
256 return (i, Some(item));
257 }
258 }
259
260 // Fall back to L3 (use Cuckoo filter if trusted)
261 if let Some(ref l3) = l3_store {
262 if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
263 if let Ok(Some(item)) = l3.get(&id).await {
264 return (i, Some(item));
265 }
266 }
267 }
268
269 (i, None)
270 });
271 }
272
273 // Collect results
274 while let Some(result) = join_set.join_next().await {
275 if let Ok((i, item)) = result {
276 results[i] = item;
277 }
278 }
279 }
280
281 results
282 }
283
284 /// Submit multiple items for sync atomically.
285 ///
286 /// All items are added to L1 and queued for batch persistence.
287 /// Returns a `BatchResult` with success/failure counts.
288 ///
289 /// # Example
290 ///
291 /// ```rust,no_run
292 /// # use sync_engine::{SyncEngine, SyncItem};
293 /// # use serde_json::json;
294 /// # async fn example(engine: &SyncEngine) {
295 /// let items = vec![
296 /// SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
297 /// SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
298 /// ];
299 /// let result = engine.submit_many(items).await.unwrap();
300 /// println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
301 /// # }
302 /// ```
303 pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
304 if !self.should_accept_writes() {
305 return Err(StorageError::Backend(format!(
306 "Rejecting batch write: engine state={}, pressure={}",
307 self.state(),
308 self.pressure()
309 )));
310 }
311
312 let total = items.len();
313 let mut succeeded = 0;
314
315 // Lock batcher once for the whole batch
316 let mut batcher = self.l2_batcher.lock().await;
317
318 for item in items {
319 self.insert_l1(item.clone());
320 batcher.add(item);
321 succeeded += 1;
322 }
323
324 debug!(total, succeeded, "Batch submitted to L1 and queue");
325
326 Ok(BatchResult {
327 total,
328 succeeded,
329 failed: total - succeeded,
330 })
331 }
332
333 /// Delete multiple items atomically.
334 ///
335 /// Removes items from all tiers (L1, L2, L3) and updates filters.
336 /// Returns a `BatchResult` with counts.
337 ///
338 /// # Example
339 ///
340 /// ```rust,no_run
341 /// # use sync_engine::SyncEngine;
342 /// # async fn example(engine: &SyncEngine) {
343 /// let ids = vec!["user.1", "user.2", "user.3"];
344 /// let result = engine.delete_many(&ids).await.unwrap();
345 /// println!("Deleted: {}", result.succeeded);
346 /// # }
347 /// ```
348 pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
349 if !self.should_accept_writes() {
350 return Err(StorageError::Backend(format!(
351 "Rejecting batch delete: engine state={}, pressure={}",
352 self.state(),
353 self.pressure()
354 )));
355 }
356
357 let total = ids.len();
358 let mut succeeded = 0;
359
360 // Build merkle batch for all deletions
361 let mut merkle_batch = MerkleBatch::new();
362
363 for id in ids {
364 // Remove from L1
365 if let Some((_, item)) = self.l1_cache.remove(*id) {
366 let size = Self::item_size(&item);
367 self.l1_size_bytes.fetch_sub(size, Ordering::Release);
368 }
369
370 // Remove from L3 filter (no L2 filter with TTL support)
371 self.l3_filter.remove(id);
372
373 // Queue merkle deletion
374 merkle_batch.delete(id.to_string());
375
376 succeeded += 1;
377 }
378
379 // Batch delete from L2
380 if let Some(ref l2) = self.l2_store {
381 for id in ids {
382 if let Err(e) = l2.delete(id).await {
383 warn!(id, error = %e, "Failed to delete from L2");
384 }
385 }
386 }
387
388 // Batch delete from L3
389 if let Some(ref l3) = self.l3_store {
390 for id in ids {
391 if let Err(e) = l3.delete(id).await {
392 warn!(id, error = %e, "Failed to delete from L3");
393 }
394 }
395 }
396
397 // Update merkle trees
398 if let Some(ref sql_merkle) = self.sql_merkle {
399 if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
400 error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
401 }
402 }
403
404 if let Some(ref redis_merkle) = self.redis_merkle {
405 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
406 warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
407 }
408 }
409
410 info!(total, succeeded, "Batch delete completed");
411
412 Ok(BatchResult {
413 total,
414 succeeded,
415 failed: total - succeeded,
416 })
417 }
418
419 /// Get an item, or compute and insert it if missing.
420 ///
421 /// This is the classic "get or insert" pattern, useful for cache-aside:
422 /// 1. Check cache (L1 → L2 → L3)
423 /// 2. If missing, call the async factory function
424 /// 3. Insert the result and return it
425 ///
426 /// The factory is only called if the item is not found.
427 ///
428 /// # Example
429 ///
430 /// ```rust,no_run
431 /// # use sync_engine::{SyncEngine, SyncItem};
432 /// # use serde_json::json;
433 /// # async fn example(engine: &SyncEngine) {
434 /// let item = engine.get_or_insert_with("user.123", || async {
435 /// // Expensive operation - only runs if not cached
436 /// SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
437 /// }).await.unwrap();
438 /// # }
439 /// ```
440 pub async fn get_or_insert_with<F, Fut>(
441 &self,
442 id: &str,
443 factory: F,
444 ) -> Result<SyncItem, StorageError>
445 where
446 F: FnOnce() -> Fut,
447 Fut: std::future::Future<Output = SyncItem>,
448 {
449 // Try to get existing
450 if let Some(item) = self.get(id).await? {
451 return Ok(item);
452 }
453
454 // Not found - compute new value
455 let item = factory().await;
456
457 // Insert and return
458 self.submit(item.clone()).await?;
459
460 Ok(item)
461 }
462
463 // ═══════════════════════════════════════════════════════════════════════════
464 // State-based queries: Fast indexed access by caller-defined state tag
465 // ═══════════════════════════════════════════════════════════════════════════
466
467 /// Get items by state from SQL (L3 ground truth).
468 ///
469 /// Uses indexed query for fast retrieval.
470 ///
471 /// # Example
472 ///
473 /// ```rust,no_run
474 /// # use sync_engine::{SyncEngine, StorageError};
475 /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
476 /// // Get all delta items for CRDT merging
477 /// let deltas = engine.get_by_state("delta", 1000).await?;
478 /// for item in deltas {
479 /// println!("Delta: {}", item.object_id);
480 /// }
481 /// # Ok(())
482 /// # }
483 /// ```
484 pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
485 if let Some(ref sql) = self.sql_store {
486 sql.get_by_state(state, limit).await
487 } else {
488 Ok(Vec::new())
489 }
490 }
491
492 /// Count items in a given state (SQL ground truth).
493 ///
494 /// # Example
495 ///
496 /// ```rust,no_run
497 /// # use sync_engine::{SyncEngine, StorageError};
498 /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
499 /// let pending_count = engine.count_by_state("pending").await?;
500 /// println!("{} items pending", pending_count);
501 /// # Ok(())
502 /// # }
503 /// ```
504 pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
505 if let Some(ref sql) = self.sql_store {
506 sql.count_by_state(state).await
507 } else {
508 Ok(0)
509 }
510 }
511
512 /// Get just the IDs of items in a given state (lightweight query).
513 ///
514 /// Returns IDs from SQL. For Redis state SET, use `list_state_ids_redis()`.
515 pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
516 if let Some(ref sql) = self.sql_store {
517 sql.list_state_ids(state, limit).await
518 } else {
519 Ok(Vec::new())
520 }
521 }
522
523 /// Update the state of an item by ID.
524 ///
525 /// Updates both SQL (ground truth) and Redis state SETs.
526 /// L1 cache is NOT updated - caller should re-fetch if needed.
527 ///
528 /// Returns true if the item was found and updated.
529 pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
530 let mut updated = false;
531
532 // Update SQL (ground truth)
533 if let Some(ref sql) = self.sql_store {
534 updated = sql.set_state(id, new_state).await?;
535 }
536
537 // Note: Redis state SETs are not updated here because we'd need to know
538 // the old state to do SREM. For full Redis state management, the item
539 // should be re-submitted with the new state via submit_with().
540
541 Ok(updated)
542 }
543
544 /// Delete all items in a given state from SQL.
545 ///
546 /// Also removes from L1 cache and Redis state SET.
547 /// Returns the number of deleted items.
548 ///
549 /// # Example
550 ///
551 /// ```rust,no_run
552 /// # use sync_engine::{SyncEngine, StorageError};
553 /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
554 /// // Clean up all processed deltas
555 /// let deleted = engine.delete_by_state("delta").await?;
556 /// println!("Deleted {} delta items", deleted);
557 /// # Ok(())
558 /// # }
559 /// ```
560 pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
561 let mut deleted = 0u64;
562
563 // Get IDs first (for L1 cleanup)
564 let ids = if let Some(ref sql) = self.sql_store {
565 sql.list_state_ids(state, 100_000).await?
566 } else {
567 Vec::new()
568 };
569
570 // Remove from L1 cache
571 for id in &ids {
572 self.l1_cache.remove(id);
573 }
574
575 // Delete from SQL
576 if let Some(ref sql) = self.sql_store {
577 deleted = sql.delete_by_state(state).await?;
578 }
579
580 // Note: Redis items with TTL will expire naturally.
581 // For immediate Redis cleanup, call delete_by_state on RedisStore directly.
582
583 info!(state = %state, deleted = deleted, "Deleted items by state");
584
585 Ok(deleted)
586 }
587
588 // =========================================================================
589 // Prefix Scan Operations
590 // =========================================================================
591
592 /// Scan items by ID prefix.
593 ///
594 /// Retrieves all items whose ID starts with the given prefix.
595 /// Queries SQL (ground truth) directly - does NOT check L1 cache.
596 ///
597 /// Useful for CRDT delta-first architecture where deltas are stored as:
598 /// `delta:{object_id}:{op_id}` and you need to fetch all deltas for an object.
599 ///
600 /// # Example
601 ///
602 /// ```rust,no_run
603 /// # use sync_engine::{SyncEngine, StorageError};
604 /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
605 /// // Get base state
606 /// let base = engine.get("base:user.123").await?;
607 ///
608 /// // Get all pending deltas for this object
609 /// let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
610 ///
611 /// // Merge on-the-fly for read-repair
612 /// for delta in deltas {
613 /// println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
614 /// }
615 /// # Ok(())
616 /// # }
617 /// ```
618 pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
619 if let Some(ref sql) = self.sql_store {
620 sql.scan_prefix(prefix, limit).await
621 } else {
622 Ok(Vec::new())
623 }
624 }
625
626 /// Count items matching an ID prefix (SQL ground truth).
627 pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
628 if let Some(ref sql) = self.sql_store {
629 sql.count_prefix(prefix).await
630 } else {
631 Ok(0)
632 }
633 }
634
635 /// Delete all items matching an ID prefix.
636 ///
637 /// Removes from L1 cache, SQL, and Redis.
638 /// Returns the number of deleted items.
639 ///
640 /// # Example
641 ///
642 /// ```rust,no_run
643 /// # use sync_engine::{SyncEngine, StorageError};
644 /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
645 /// // After merging deltas into base, clean them up
646 /// let deleted = engine.delete_prefix("delta:user.123:").await?;
647 /// println!("Cleaned up {} deltas", deleted);
648 /// # Ok(())
649 /// # }
650 /// ```
651 pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
652 let mut deleted = 0u64;
653
654 // Get IDs first (for L1/L2 cleanup)
655 let items = if let Some(ref sql) = self.sql_store {
656 sql.scan_prefix(prefix, 100_000).await?
657 } else {
658 Vec::new()
659 };
660
661 // Remove from L1 cache
662 for item in &items {
663 self.l1_cache.remove(&item.object_id);
664 }
665
666 // Remove from L2 (Redis) one-by-one via CacheStore trait
667 if let Some(ref l2) = self.l2_store {
668 for item in &items {
669 let _ = l2.delete(&item.object_id).await;
670 }
671 }
672
673 // Delete from SQL
674 if let Some(ref sql) = self.sql_store {
675 deleted = sql.delete_prefix(prefix).await?;
676 }
677
678 info!(prefix = %prefix, deleted = deleted, "Deleted items by prefix");
679
680 Ok(deleted)
681 }
682}
683
684#[cfg(test)]
685mod tests {
686 use super::*;
687 use crate::config::SyncEngineConfig;
688 use serde_json::json;
689 use tokio::sync::watch;
690
691 fn test_config() -> SyncEngineConfig {
692 SyncEngineConfig {
693 redis_url: None,
694 sql_url: None,
695 wal_path: None,
696 l1_max_bytes: 1024 * 1024,
697 ..Default::default()
698 }
699 }
700
701 fn test_item(id: &str) -> SyncItem {
702 SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
703 }
704
705 #[tokio::test]
706 async fn test_contains_l1_hit() {
707 let config = test_config();
708 let (_tx, rx) = watch::channel(config.clone());
709 let engine = SyncEngine::new(config, rx);
710
711 engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
712
713 assert!(engine.contains("test.exists").await);
714 }
715
716 #[tokio::test]
717 async fn test_contains_with_trusted_filter() {
718 let config = test_config();
719 let (_tx, rx) = watch::channel(config.clone());
720 let engine = SyncEngine::new(config, rx);
721
722 engine.l3_filter.mark_trusted();
723
724 engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
725 engine.l3_filter.insert("test.exists");
726
727 assert!(engine.contains("test.exists").await);
728 assert!(!engine.contains("test.missing").await);
729 }
730
731 #[test]
732 fn test_len_and_is_empty() {
733 let config = test_config();
734 let (_tx, rx) = watch::channel(config.clone());
735 let engine = SyncEngine::new(config, rx);
736
737 assert!(engine.is_empty());
738 assert_eq!(engine.len(), 0);
739
740 engine.l1_cache.insert("a".into(), test_item("a"));
741 assert!(!engine.is_empty());
742 assert_eq!(engine.len(), 1);
743
744 engine.l1_cache.insert("b".into(), test_item("b"));
745 assert_eq!(engine.len(), 2);
746 }
747
748 #[tokio::test]
749 async fn test_status_synced_in_l1() {
750 use super::super::EngineState;
751
752 let config = test_config();
753 let (_tx, rx) = watch::channel(config.clone());
754 let engine = SyncEngine::new(config, rx);
755 let _ = engine.state.send(EngineState::Ready);
756
757 engine.submit(test_item("test.item")).await.expect("Submit failed");
758 let _ = engine.l2_batcher.lock().await.force_flush();
759
760 let status = engine.status("test.item").await;
761 assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
762 }
763
764 #[tokio::test]
765 async fn test_status_pending() {
766 use super::super::EngineState;
767
768 let config = test_config();
769 let (_tx, rx) = watch::channel(config.clone());
770 let engine = SyncEngine::new(config, rx);
771 let _ = engine.state.send(EngineState::Ready);
772
773 engine.submit(test_item("test.pending")).await.expect("Submit failed");
774
775 let status = engine.status("test.pending").await;
776 assert_eq!(status, ItemStatus::Pending);
777 }
778
779 #[tokio::test]
780 async fn test_status_missing() {
781 let config = test_config();
782 let (_tx, rx) = watch::channel(config.clone());
783 let engine = SyncEngine::new(config, rx);
784
785 let status = engine.status("test.nonexistent").await;
786 assert_eq!(status, ItemStatus::Missing);
787 }
788
789 #[tokio::test]
790 async fn test_get_many_from_l1() {
791 use super::super::EngineState;
792
793 let config = test_config();
794 let (_tx, rx) = watch::channel(config.clone());
795 let engine = SyncEngine::new(config, rx);
796 let _ = engine.state.send(EngineState::Ready);
797
798 engine.l1_cache.insert("a".into(), test_item("a"));
799 engine.l1_cache.insert("b".into(), test_item("b"));
800 engine.l1_cache.insert("c".into(), test_item("c"));
801
802 let results = engine.get_many(&["a", "b", "missing", "c"]).await;
803
804 assert_eq!(results.len(), 4);
805 assert!(results[0].is_some());
806 assert!(results[1].is_some());
807 assert!(results[2].is_none());
808 assert!(results[3].is_some());
809
810 assert_eq!(results[0].as_ref().unwrap().object_id, "a");
811 assert_eq!(results[1].as_ref().unwrap().object_id, "b");
812 assert_eq!(results[3].as_ref().unwrap().object_id, "c");
813 }
814
815 #[tokio::test]
816 async fn test_submit_many() {
817 use super::super::EngineState;
818
819 let config = test_config();
820 let (_tx, rx) = watch::channel(config.clone());
821 let engine = SyncEngine::new(config, rx);
822 let _ = engine.state.send(EngineState::Ready);
823
824 let items = vec![
825 test_item("batch.1"),
826 test_item("batch.2"),
827 test_item("batch.3"),
828 ];
829
830 let result = engine.submit_many(items).await.expect("Batch submit failed");
831
832 assert_eq!(result.total, 3);
833 assert_eq!(result.succeeded, 3);
834 assert_eq!(result.failed, 0);
835 assert!(result.is_success());
836
837 assert_eq!(engine.len(), 3);
838 assert!(engine.contains("batch.1").await);
839 assert!(engine.contains("batch.2").await);
840 assert!(engine.contains("batch.3").await);
841 }
842
843 #[tokio::test]
844 async fn test_delete_many() {
845 use super::super::EngineState;
846
847 let config = test_config();
848 let (_tx, rx) = watch::channel(config.clone());
849 let engine = SyncEngine::new(config, rx);
850 let _ = engine.state.send(EngineState::Ready);
851
852 engine.l1_cache.insert("del.1".into(), test_item("del.1"));
853 engine.l1_cache.insert("del.2".into(), test_item("del.2"));
854 engine.l1_cache.insert("keep".into(), test_item("keep"));
855
856 let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");
857
858 assert_eq!(result.total, 2);
859 assert_eq!(result.succeeded, 2);
860 assert!(result.is_success());
861
862 assert!(!engine.l1_cache.contains_key("del.1"));
863 assert!(!engine.l1_cache.contains_key("del.2"));
864 assert!(engine.l1_cache.contains_key("keep"));
865 }
866
867 #[tokio::test]
868 async fn test_get_or_insert_with_existing() {
869 use super::super::EngineState;
870
871 let config = test_config();
872 let (_tx, rx) = watch::channel(config.clone());
873 let engine = SyncEngine::new(config, rx);
874 let _ = engine.state.send(EngineState::Ready);
875
876 let existing = test_item("existing");
877 engine.l1_cache.insert("existing".into(), existing.clone());
878
879 let factory_called = std::sync::atomic::AtomicBool::new(false);
880 let result = engine.get_or_insert_with("existing", || {
881 factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
882 async { test_item("should_not_be_used") }
883 }).await.expect("get_or_insert_with failed");
884
885 assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
886 assert_eq!(result.object_id, "existing");
887 }
888
889 #[tokio::test]
890 async fn test_get_or_insert_with_missing() {
891 use super::super::EngineState;
892
893 let config = test_config();
894 let (_tx, rx) = watch::channel(config.clone());
895 let engine = SyncEngine::new(config, rx);
896 let _ = engine.state.send(EngineState::Ready);
897
898 let result = engine.get_or_insert_with("new_item", || async {
899 SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
900 }).await.expect("get_or_insert_with failed");
901
902 assert_eq!(result.object_id, "new_item");
903 assert!(engine.contains("new_item").await);
904 }
905}