sync_engine/coordinator/api.rs
//! V1.1 API: Query and batch operations.
//!
//! This module contains the higher-level API methods added in V1.1:
//! - `contains()` - Tiered existence check (L1 → L2 → Cuckoo filter → SQL)
//! - `len()` / `is_empty()` - L1 cache size queries
//! - `status()` - Detailed sync status
//! - `get_many()` - Parallel batch fetch
//! - `submit_many()` - Batch upsert
//! - `delete_many()` - Batch delete
//! - `get_or_insert_with()` - Cache-aside pattern
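//!
//! A minimal usage sketch of the batch APIs (a rough flow using only the methods
//! listed above, not a prescriptive one):
//!
//! ```rust,no_run
//! # use sync_engine::{SyncEngine, SyncItem};
//! # use serde_json::json;
//! # async fn example(engine: &SyncEngine) {
//! // Batch upsert, then read the same IDs back in parallel.
//! let items = vec![
//!     SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
//!     SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
//! ];
//! let result = engine.submit_many(items).await.unwrap();
//! println!("submitted {}, failed {}", result.succeeded, result.failed);
//!
//! let fetched = engine.get_many(&["user.1", "user.2"]).await;
//! println!("{} of 2 found", fetched.iter().filter(|i| i.is_some()).count());
//! # }
//! ```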

use std::sync::atomic::Ordering;
use tokio::task::JoinSet;
use tracing::{debug, info, warn, error};

use crate::storage::traits::StorageError;
use crate::sync_item::SyncItem;
use crate::merkle::MerkleBatch;

use super::{SyncEngine, ItemStatus, BatchResult};

impl SyncEngine {
    // ═══════════════════════════════════════════════════════════════════════════
    // API: Query & Batch Operations
    // ═══════════════════════════════════════════════════════════════════════════

    /// Check if an item exists across all tiers.
    ///
    /// Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query.
    /// If found in SQL and the Cuckoo filter was untrusted, updates the filter.
    ///
    /// # Returns
    /// - `true` → item definitely exists in at least one tier
    /// - `false` → item does not exist (authoritative)
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.contains("user.123").await {
    ///     let item = engine.get("user.123").await;
    /// } else {
    ///     println!("Not found");
    /// }
    /// # }
    /// ```
    pub async fn contains(&self, id: &str) -> bool {
        // L1: In-memory cache (definitive, sync)
        if self.l1_cache.contains_key(id) {
            return true;
        }

        // L2: Redis EXISTS (async, authoritative for Redis tier)
        if let Some(ref l2) = self.l2_store {
            if l2.exists(id).await.unwrap_or(false) {
                return true;
            }
        }

        // L3: Cuckoo filter check (sync, probabilistic)
        if self.l3_filter.is_trusted() {
            // Filter is trusted - use it for fast negative
            if !self.l3_filter.should_check_l3(id) {
                return false; // Definitely not in L3
            }
        }

        // L3: SQL query (async, ground truth)
        if let Some(ref l3) = self.l3_store {
            if l3.exists(id).await.unwrap_or(false) {
                // Found in SQL - update Cuckoo if it was untrusted
                if !self.l3_filter.is_trusted() {
                    self.l3_filter.insert(id);
                }
                return true;
            }
        }

        false
    }

    /// Fast check: is this item definitely NOT in L3?
    ///
    /// Uses the Cuckoo filter for a fast authoritative negative.
    /// - Returns `true` → item is **definitely not** in L3 (safe to skip)
    /// - Returns `false` → item **might** exist (need to check L3)
    ///
    /// Only meaningful when the L3 filter is trusted. If untrusted, returns `false`
    /// (meaning "we don't know, you should check").
    ///
    /// # Use Case
    ///
    /// Fast early-exit in replication: if definitely missing, apply without checking.
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.definitely_missing("patient.123") {
    ///     // Fast path: definitely new, just insert
    ///     println!("New item, inserting directly");
    /// } else {
    ///     // Slow path: might exist, check hash
    ///     if !engine.is_current("patient.123", "abc123...").await {
    ///         println!("Outdated, updating");
    ///     }
    /// }
    /// # }
    /// ```
    #[must_use]
    #[inline]
    pub fn definitely_missing(&self, id: &str) -> bool {
        // Only authoritative if filter is trusted
        if !self.l3_filter.is_trusted() {
            return false; // Unknown, caller should check
        }
        // Cuckoo false = definitely not there
        !self.l3_filter.should_check_l3(id)
    }

    /// Fast check: might this item exist somewhere?
    ///
    /// Checks L1 cache (partial, evicts) and Cuckoo filter (probabilistic).
    /// - Returns `true` → item is in L1 OR might be in L3 (worth checking)
    /// - Returns `false` → item is definitely not in L1 or L3
    ///
    /// Note: L1 is partial (items evict), so this can return `false` even if
    /// the item exists in L2/L3. For authoritative check, use `contains()`.
    ///
    /// # Use Case
    ///
    /// Quick probabilistic check before expensive async lookup.
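    ///
    /// # Example
    ///
    /// A minimal sketch of using this as a cheap pre-check before the async path:
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.might_exist("user.123") {
    ///     // Worth the async lookup
    ///     let item = engine.get("user.123").await;
    /// }
    /// # }
    /// ```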
    #[must_use]
    #[inline]
    pub fn might_exist(&self, id: &str) -> bool {
        self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
    }

    /// Check if the item with the given `id` has the given content hash.
    ///
    /// This is the semantic API for CDC deduplication in replication.
    /// Returns `true` if the item exists AND its content hash matches.
    /// Returns `false` if the item doesn't exist OR the hash differs.
    ///
    /// # Arguments
    /// * `id` - Object ID
    /// * `content_hash` - SHA256 hash of content (hex-encoded string)
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// // Skip replication if we already have this version
    /// let incoming_hash = "abc123...";
    /// if engine.is_current("patient.123", incoming_hash).await {
    ///     println!("Already up to date, skipping");
    ///     return;
    /// }
    /// # }
    /// ```
    pub async fn is_current(&self, id: &str, content_hash: &str) -> bool {
        // Check L1 first (fastest)
        if let Some(item) = self.l1_cache.get(id) {
            return item.content_hash == content_hash;
        }

        // Check L2 (if available)
        if let Some(ref l2) = self.l2_store {
            if let Ok(Some(item)) = l2.get(id).await {
                return item.content_hash == content_hash;
            }
        }

        // Check L3 (ground truth)
        if let Some(ref l3) = self.l3_store {
            if self.l3_filter.should_check_l3(id) {
                if let Ok(Some(item)) = l3.get(id).await {
                    return item.content_hash == content_hash;
                }
            }
        }

        false
    }

    /// Get the current count of items in L1 cache.
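    ///
    /// # Example
    ///
    /// A small sketch (counts reflect only the in-memory L1 tier):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # fn example(engine: &SyncEngine) {
    /// if engine.is_empty() {
    ///     println!("L1 cache is cold");
    /// } else {
    ///     println!("{} items cached in L1", engine.len());
    /// }
    /// # }
    /// ```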
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        self.l1_cache.len()
    }

    /// Check if L1 cache is empty.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.l1_cache.is_empty()
    }

    /// Get the sync status of an item.
    ///
    /// Returns detailed state information about where an item exists
    /// and its sync status across tiers.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, ItemStatus};
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.status("order.456").await {
    ///     ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
    ///         println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
    ///     }
    ///     ItemStatus::Pending => println!("Queued for sync"),
    ///     ItemStatus::Missing => println!("Not found"),
    /// }
    /// # }
    /// ```
    pub async fn status(&self, id: &str) -> ItemStatus {
        let in_l1 = self.l1_cache.contains_key(id);

        // Check if pending in batch queue
        let pending = self.l2_batcher.lock().await.contains(id);
        if pending {
            return ItemStatus::Pending;
        }

        // Check L2 (if available) - use EXISTS, no filter
        let in_l2 = if let Some(ref l2) = self.l2_store {
            l2.exists(id).await.unwrap_or(false)
        } else {
            false
        };

        // Check L3 (if available)
        let in_l3 = if let Some(ref l3) = self.l3_store {
            self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
        } else {
            false
        };

        if in_l1 || in_l2 || in_l3 {
            ItemStatus::Synced { in_l1, in_l2, in_l3 }
        } else {
            ItemStatus::Missing
        }
    }

    /// Fetch multiple items in parallel.
    ///
    /// Returns a vector of `Option<SyncItem>` in the same order as input IDs.
    /// Missing items are represented as `None`.
    ///
    /// # Performance
    ///
    /// This method resolves L1 hits synchronously, then fetches the remaining
    /// items from L2/L3 in parallel. Much faster than sequential `get()` calls.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let ids = vec!["user.1", "user.2", "user.3"];
    /// let items = engine.get_many(&ids).await;
    /// for (id, item) in ids.iter().zip(items.iter()) {
    ///     match item {
    ///         Some(item) => println!("{}: found", id),
    ///         None => println!("{}: missing", id),
    ///     }
    /// }
    /// # }
    /// ```
    pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
        let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
        let mut missing_indices: Vec<usize> = Vec::new();

        // Phase 1: Check L1 (synchronous, fast)
        for (i, id) in ids.iter().enumerate() {
            if let Some(item) = self.l1_cache.get(*id) {
                results[i] = Some(item.clone());
            } else {
                missing_indices.push(i);
            }
        }

        // Phase 2: Fetch missing items from L2/L3 in parallel
        if !missing_indices.is_empty() {
            let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();

            for &i in &missing_indices {
                let id = ids[i].to_string();
                let l2_store = self.l2_store.clone();
                let l3_store = self.l3_store.clone();
                let l3_filter = self.l3_filter.clone();

                join_set.spawn(async move {
                    // Try L2 first (no filter, just try Redis)
                    if let Some(ref l2) = l2_store {
                        if let Ok(Some(item)) = l2.get(&id).await {
                            return (i, Some(item));
                        }
                    }

                    // Fall back to L3 (use Cuckoo filter if trusted)
                    if let Some(ref l3) = l3_store {
                        if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
                            if let Ok(Some(item)) = l3.get(&id).await {
                                return (i, Some(item));
                            }
                        }
                    }

                    (i, None)
                });
            }

            // Collect results
            while let Some(result) = join_set.join_next().await {
                if let Ok((i, item)) = result {
                    results[i] = item;
                }
            }
        }

        results
    }

    /// Submit multiple items for sync atomically.
    ///
    /// All items are added to L1 and queued for batch persistence.
    /// Returns a `BatchResult` with success/failure counts.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # use serde_json::json;
    /// # async fn example(engine: &SyncEngine) {
    /// let items = vec![
    ///     SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
    ///     SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
    /// ];
    /// let result = engine.submit_many(items).await.unwrap();
    /// println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
    /// # }
    /// ```
    pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
        if !self.should_accept_writes() {
            return Err(StorageError::Backend(format!(
                "Rejecting batch write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let total = items.len();
        let mut succeeded = 0;

        // Lock batcher once for the whole batch
        let mut batcher = self.l2_batcher.lock().await;

        for item in items {
            self.insert_l1(item.clone());
            batcher.add(item);
            succeeded += 1;
        }

        debug!(total, succeeded, "Batch submitted to L1 and queue");

        Ok(BatchResult {
            total,
            succeeded,
            failed: total - succeeded,
        })
    }

    /// Delete multiple items across all tiers.
    ///
    /// Removes items from L1, L2, and L3, and updates the filter and Merkle trees.
    /// Per-tier failures are logged but do not fail the call.
    /// Returns a `BatchResult` with counts.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let ids = vec!["user.1", "user.2", "user.3"];
    /// let result = engine.delete_many(&ids).await.unwrap();
    /// println!("Deleted: {}", result.succeeded);
    /// # }
    /// ```
    pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
        if !self.should_accept_writes() {
            return Err(StorageError::Backend(format!(
                "Rejecting batch delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let total = ids.len();
        let mut succeeded = 0;

        // Build merkle batch for all deletions
        let mut merkle_batch = MerkleBatch::new();

        for id in ids {
            // Remove from L1
            if let Some((_, item)) = self.l1_cache.remove(*id) {
                let size = Self::item_size(&item);
                self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            }

            // Remove from L3 filter (no L2 filter with TTL support)
            self.l3_filter.remove(id);

            // Queue merkle deletion
            merkle_batch.delete(id.to_string());

            succeeded += 1;
        }

        // Delete from L2, one item at a time (failures are logged, not fatal)
        if let Some(ref l2) = self.l2_store {
            for id in ids {
                if let Err(e) = l2.delete(id).await {
                    warn!(id, error = %e, "Failed to delete from L2");
                }
            }
        }

        // Delete from L3, one item at a time (failures are logged, not fatal)
        if let Some(ref l3) = self.l3_store {
            for id in ids {
                if let Err(e) = l3.delete(id).await {
                    warn!(id, error = %e, "Failed to delete from L3");
                }
            }
        }

        // Update merkle trees
        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
            }
        }

        info!(total, succeeded, "Batch delete completed");

        Ok(BatchResult {
            total,
            succeeded,
            failed: total - succeeded,
        })
    }

    /// Get an item, or compute and insert it if missing.
    ///
    /// This is the classic "get or insert" pattern, useful for cache-aside:
    /// 1. Check cache (L1 → L2 → L3)
    /// 2. If missing, call the async factory function
    /// 3. Insert the result and return it
    ///
    /// The factory is only called if the item is not found.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # use serde_json::json;
    /// # async fn example(engine: &SyncEngine) {
    /// let item = engine.get_or_insert_with("user.123", || async {
    ///     // Expensive operation - only runs if not cached
    ///     SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
    /// }).await.unwrap();
    /// # }
    /// ```
    pub async fn get_or_insert_with<F, Fut>(
        &self,
        id: &str,
        factory: F,
    ) -> Result<SyncItem, StorageError>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = SyncItem>,
    {
        // Try to get existing
        if let Some(item) = self.get(id).await? {
            return Ok(item);
        }

        // Not found - compute new value
        let item = factory().await;

        // Insert and return
        self.submit(item.clone()).await?;

        Ok(item)
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get items by state from SQL (L3 ground truth).
    ///
    /// Uses indexed query for fast retrieval.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Get all delta items for CRDT merging
    /// let deltas = engine.get_by_state("delta", 1000).await?;
    /// for item in deltas {
    ///     println!("Delta: {}", item.object_id);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.get_by_state(state, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Count items in a given state (SQL ground truth).
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// let pending_count = engine.count_by_state("pending").await?;
    /// println!("{} items pending", pending_count);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.count_by_state(state).await
        } else {
            Ok(0)
        }
    }

    /// Get just the IDs of items in a given state (lightweight query).
    ///
    /// Returns IDs from SQL. For Redis state SET, use `list_state_ids_redis()`.
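    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// let ids = engine.list_state_ids("pending", 500).await?;
    /// println!("{} pending items", ids.len());
    /// # Ok(())
    /// # }
    /// ```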
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.list_state_ids(state, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Update the state of an item by ID.
    ///
    /// Updates SQL (the ground truth). Redis state SETs are NOT updated here;
    /// re-submit the item with the new state (e.g. via `submit_with()`) to refresh them.
    /// L1 cache is NOT updated - caller should re-fetch if needed.
    ///
    /// Returns `true` if the item was found and updated.
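    ///
    /// # Example
    ///
    /// A minimal sketch (re-fetching because L1 is not updated in place):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// if engine.set_state("order.456", "processed").await? {
    ///     let refreshed = engine.get("order.456").await?;
    /// }
    /// # Ok(())
    /// # }
    /// ```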
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let mut updated = false;

        // Update SQL (ground truth)
        if let Some(ref sql) = self.sql_store {
            updated = sql.set_state(id, new_state).await?;
        }

        // Note: Redis state SETs are not updated here because we'd need to know
        // the old state to do SREM. For full Redis state management, the item
        // should be re-submitted with the new state via submit_with().

        Ok(updated)
    }

    /// Delete all items in a given state from SQL.
    ///
    /// Also removes matching items from the L1 cache. Redis entries are left to
    /// expire via their TTL; for immediate Redis cleanup, call `delete_by_state`
    /// on the `RedisStore` directly.
    /// Returns the number of deleted items.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Clean up all processed deltas
    /// let deleted = engine.delete_by_state("delta").await?;
    /// println!("Deleted {} delta items", deleted);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut deleted = 0u64;

        // Get IDs first (for L1 cleanup)
        let ids = if let Some(ref sql) = self.sql_store {
            sql.list_state_ids(state, 100_000).await?
        } else {
            Vec::new()
        };

        // Remove from L1 cache
        for id in &ids {
            self.l1_cache.remove(id);
        }

        // Delete from SQL
        if let Some(ref sql) = self.sql_store {
            deleted = sql.delete_by_state(state).await?;
        }

        // Note: Redis items with TTL will expire naturally.
        // For immediate Redis cleanup, call delete_by_state on RedisStore directly.

        info!(state = %state, deleted = deleted, "Deleted items by state");

        Ok(deleted)
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Prefix Scan Operations
    // ═══════════════════════════════════════════════════════════════════════════

    /// Scan items by ID prefix.
    ///
    /// Retrieves all items whose ID starts with the given prefix.
    /// Queries SQL (ground truth) directly - does NOT check L1 cache.
    ///
    /// Useful for CRDT delta-first architecture where deltas are stored as
    /// `delta:{object_id}:{op_id}` and you need to fetch all deltas for an object.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Get base state
    /// let base = engine.get("base:user.123").await?;
    ///
    /// // Get all pending deltas for this object
    /// let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
    ///
    /// // Merge on-the-fly for read-repair
    /// for delta in deltas {
    ///     println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.scan_prefix(prefix, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Count items matching an ID prefix (SQL ground truth).
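    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// let pending = engine.count_prefix("delta:user.123:").await?;
    /// println!("{} deltas awaiting merge", pending);
    /// # Ok(())
    /// # }
    /// ```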
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.count_prefix(prefix).await
        } else {
            Ok(0)
        }
    }

    /// Delete all items matching an ID prefix.
    ///
    /// Removes from L1 cache, SQL, and Redis.
    /// Returns the number of deleted items.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // After merging deltas into base, clean them up
    /// let deleted = engine.delete_prefix("delta:user.123:").await?;
    /// println!("Cleaned up {} deltas", deleted);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut deleted = 0u64;

        // Get IDs first (for L1/L2 cleanup)
        let items = if let Some(ref sql) = self.sql_store {
            sql.scan_prefix(prefix, 100_000).await?
        } else {
            Vec::new()
        };

        // Remove from L1 cache
        for item in &items {
            self.l1_cache.remove(&item.object_id);
        }

        // Remove from L2 (Redis) one-by-one via CacheStore trait
        if let Some(ref l2) = self.l2_store {
            for item in &items {
                let _ = l2.delete(&item.object_id).await;
            }
        }

        // Delete from SQL
        if let Some(ref sql) = self.sql_store {
            deleted = sql.delete_prefix(prefix).await?;
        }

        info!(prefix = %prefix, deleted = deleted, "Deleted items by prefix");

        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use serde_json::json;
    use tokio::sync::watch;

    fn test_config() -> SyncEngineConfig {
        SyncEngineConfig {
            redis_url: None,
            sql_url: None,
            wal_path: None,
            l1_max_bytes: 1024 * 1024,
            ..Default::default()
        }
    }

    fn test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
    }

    #[tokio::test]
    async fn test_contains_l1_hit() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));

        assert!(engine.contains("test.exists").await);
    }

    #[tokio::test]
    async fn test_contains_with_trusted_filter() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        engine.l3_filter.mark_trusted();

        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
        engine.l3_filter.insert("test.exists");

        assert!(engine.contains("test.exists").await);
        assert!(!engine.contains("test.missing").await);
    }

    #[test]
    fn test_len_and_is_empty() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert!(engine.is_empty());
        assert_eq!(engine.len(), 0);

        engine.l1_cache.insert("a".into(), test_item("a"));
        assert!(!engine.is_empty());
        assert_eq!(engine.len(), 1);

        engine.l1_cache.insert("b".into(), test_item("b"));
        assert_eq!(engine.len(), 2);
    }

    #[tokio::test]
    async fn test_status_synced_in_l1() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.submit(test_item("test.item")).await.expect("Submit failed");
        let _ = engine.l2_batcher.lock().await.force_flush();

        let status = engine.status("test.item").await;
        assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
    }

    #[tokio::test]
    async fn test_status_pending() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.submit(test_item("test.pending")).await.expect("Submit failed");

        let status = engine.status("test.pending").await;
        assert_eq!(status, ItemStatus::Pending);
    }

    #[tokio::test]
    async fn test_status_missing() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        let status = engine.status("test.nonexistent").await;
        assert_eq!(status, ItemStatus::Missing);
    }

    #[tokio::test]
    async fn test_get_many_from_l1() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.l1_cache.insert("a".into(), test_item("a"));
        engine.l1_cache.insert("b".into(), test_item("b"));
        engine.l1_cache.insert("c".into(), test_item("c"));

        let results = engine.get_many(&["a", "b", "missing", "c"]).await;

        assert_eq!(results.len(), 4);
        assert!(results[0].is_some());
        assert!(results[1].is_some());
        assert!(results[2].is_none());
        assert!(results[3].is_some());

        assert_eq!(results[0].as_ref().unwrap().object_id, "a");
        assert_eq!(results[1].as_ref().unwrap().object_id, "b");
        assert_eq!(results[3].as_ref().unwrap().object_id, "c");
    }

    #[tokio::test]
    async fn test_submit_many() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let items = vec![
            test_item("batch.1"),
            test_item("batch.2"),
            test_item("batch.3"),
        ];

        let result = engine.submit_many(items).await.expect("Batch submit failed");

        assert_eq!(result.total, 3);
        assert_eq!(result.succeeded, 3);
        assert_eq!(result.failed, 0);
        assert!(result.is_success());

        assert_eq!(engine.len(), 3);
        assert!(engine.contains("batch.1").await);
        assert!(engine.contains("batch.2").await);
        assert!(engine.contains("batch.3").await);
    }

    #[tokio::test]
    async fn test_delete_many() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.l1_cache.insert("del.1".into(), test_item("del.1"));
        engine.l1_cache.insert("del.2".into(), test_item("del.2"));
        engine.l1_cache.insert("keep".into(), test_item("keep"));

        let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");

        assert_eq!(result.total, 2);
        assert_eq!(result.succeeded, 2);
        assert!(result.is_success());

        assert!(!engine.l1_cache.contains_key("del.1"));
        assert!(!engine.l1_cache.contains_key("del.2"));
        assert!(engine.l1_cache.contains_key("keep"));
    }

    #[tokio::test]
    async fn test_get_or_insert_with_existing() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let existing = test_item("existing");
        engine.l1_cache.insert("existing".into(), existing.clone());

        let factory_called = std::sync::atomic::AtomicBool::new(false);
        let result = engine.get_or_insert_with("existing", || {
            factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
            async { test_item("should_not_be_used") }
        }).await.expect("get_or_insert_with failed");

        assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
        assert_eq!(result.object_id, "existing");
    }

    #[tokio::test]
    async fn test_get_or_insert_with_missing() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let result = engine.get_or_insert_with("new_item", || async {
            SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
        }).await.expect("get_or_insert_with failed");

        assert_eq!(result.object_id, "new_item");
        assert!(engine.contains("new_item").await);
    }
}