sync_engine/coordinator/api.rs
// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! V1.1 API: Query and batch operations.
//!
//! This module contains the higher-level API methods added in V1.1:
//! - `contains()` - Fast probabilistic existence check
//! - `len()` / `is_empty()` - L1 cache size queries
//! - `status()` - Detailed sync status
//! - `get_many()` - Parallel batch fetch
//! - `submit_many()` - Batch upsert
//! - `delete_many()` - Batch delete
//! - `get_or_insert_with()` - Cache-aside pattern
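//!
//! A quick tour of the batch API (an illustrative sketch; assumes a constructed,
//! ready engine - the IDs and payloads are placeholders):
//!
//! ```rust,no_run
//! # use sync_engine::{SyncEngine, SyncItem};
//! # use serde_json::json;
//! # async fn tour(engine: &SyncEngine) {
//! // Batch upsert, then confirm visibility via the tiered existence check.
//! let items = vec![SyncItem::from_json("user.1".into(), json!({"name": "Alice"}))];
//! engine.submit_many(items).await.unwrap();
//! if engine.contains("user.1").await {
//!     let fetched = engine.get_many(&["user.1"]).await;
//!     assert!(fetched[0].is_some());
//! }
//! # }
//! ```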

use std::sync::atomic::Ordering;
use tokio::task::JoinSet;
use tracing::{debug, info, warn, error};

use crate::storage::traits::StorageError;
use crate::sync_item::SyncItem;
use crate::merkle::MerkleBatch;

use super::{SyncEngine, ItemStatus, BatchResult};

impl SyncEngine {
    // ═══════════════════════════════════════════════════════════════════════════
    // API: Query & Batch Operations
    // ═══════════════════════════════════════════════════════════════════════════

    /// Check if an item exists across all tiers.
    ///
    /// Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query.
    /// If found in SQL and the Cuckoo filter was untrusted, updates the filter.
    ///
    /// # Returns
    /// - `true` → item definitely exists in at least one tier
    /// - `false` → item does not exist (authoritative)
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.contains("user.123").await {
    ///     let item = engine.get("user.123").await;
    /// } else {
    ///     println!("Not found");
    /// }
    /// # }
    /// ```
    pub async fn contains(&self, id: &str) -> bool {
        // L1: In-memory cache (definitive, sync)
        if self.l1_cache.contains_key(id) {
            return true;
        }

        // L2: Redis EXISTS (async, authoritative for Redis tier)
        if let Some(ref l2) = self.l2_store {
            if l2.exists(id).await.unwrap_or(false) {
                return true;
            }
        }

        // L3: Cuckoo filter check (sync, probabilistic)
        if self.l3_filter.is_trusted() {
            // Filter is trusted - use it for a fast negative
            if !self.l3_filter.should_check_l3(id) {
                return false; // Definitely not in L3
            }
        }

        // L3: SQL query (async, ground truth)
        if let Some(ref l3) = self.l3_store {
            if l3.exists(id).await.unwrap_or(false) {
                // Found in SQL - update the Cuckoo filter if it was untrusted
                if !self.l3_filter.is_trusted() {
                    self.l3_filter.insert(id);
                }
                return true;
            }
        }

        false
    }

    /// Fast check: is this item definitely NOT in L3?
    ///
    /// Uses the Cuckoo filter for a fast authoritative negative.
    /// - Returns `true` → item is **definitely not** in L3 (safe to skip)
    /// - Returns `false` → item **might** exist (need to check L3)
    ///
    /// Only meaningful when the L3 filter is trusted. If untrusted, returns `false`
    /// (meaning "we don't know, you should check").
    ///
    /// # Use Case
    ///
    /// Fast early-exit in replication: if definitely missing, apply without checking.
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// if engine.definitely_missing("patient.123") {
    ///     // Fast path: definitely new, just insert
    ///     println!("New item, inserting directly");
    /// } else {
    ///     // Slow path: might exist, check hash
    ///     if !engine.is_current("patient.123", "abc123...").await {
    ///         println!("Outdated, updating");
    ///     }
    /// }
    /// # }
    /// ```
    #[must_use]
    #[inline]
    pub fn definitely_missing(&self, id: &str) -> bool {
        // Only authoritative if the filter is trusted
        if !self.l3_filter.is_trusted() {
            return false; // Unknown, caller should check
        }
        // Cuckoo false = definitely not there
        !self.l3_filter.should_check_l3(id)
    }

    /// Fast check: might this item exist somewhere?
    ///
    /// Checks the L1 cache (partial, evicts) and the Cuckoo filter (probabilistic).
    /// - Returns `true` → item is in L1 OR might be in L3 (worth checking)
    /// - Returns `false` → item is definitely not in L1 or L3
    ///
    /// Note: L1 is partial (items evict), so this can return `false` even if
    /// the item exists in L2/L3. For an authoritative check, use `contains()`.
    ///
    /// # Use Case
    ///
    /// Quick probabilistic check before an expensive async lookup.
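    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative; assumes a constructed engine, and the
    /// ID is a placeholder):
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// // Only pay for the async tiered lookup when the item could exist.
    /// if engine.might_exist("user.123") {
    ///     let _item = engine.get("user.123").await;
    /// }
    /// # }
    /// ```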
    #[must_use]
    #[inline]
    pub fn might_exist(&self, id: &str) -> bool {
        self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
    }

    /// Check if the item with the given `id` has the given content hash.
    ///
    /// This is the semantic API for CDC deduplication in replication.
    /// Returns `true` if the item exists AND its content hash matches.
    /// Returns `false` if the item doesn't exist OR the hash differs.
    ///
    /// # Arguments
    /// * `id` - Object ID
    /// * `content_hash` - SHA256 hash of content (hex-encoded string)
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// // Skip replication if we already have this version
    /// let incoming_hash = "abc123...";
    /// if engine.is_current("patient.123", incoming_hash).await {
    ///     println!("Already up to date, skipping");
    ///     return;
    /// }
    /// # }
    /// ```
    pub async fn is_current(&self, id: &str, content_hash: &str) -> bool {
        // Check L1 first (fastest)
        if let Some(item) = self.l1_cache.get(id) {
            return item.content_hash == content_hash;
        }

        // Check L2 (if available)
        if let Some(ref l2) = self.l2_store {
            if let Ok(Some(item)) = l2.get(id).await {
                return item.content_hash == content_hash;
            }
        }

        // Check L3 (ground truth) - only honor the filter's negative when it
        // is trusted, mirroring the guard in `get_many()`
        if let Some(ref l3) = self.l3_store {
            if !self.l3_filter.is_trusted() || self.l3_filter.should_check_l3(id) {
                if let Ok(Some(item)) = l3.get(id).await {
                    return item.content_hash == content_hash;
                }
            }
        }

        false
    }

    /// Get the current count of items in L1 cache.
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        self.l1_cache.len()
    }

    /// Check if L1 cache is empty.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.l1_cache.is_empty()
    }

    /// Get the sync status of an item.
    ///
    /// Returns detailed state information about where an item exists
    /// and its sync status across tiers.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, ItemStatus};
    /// # async fn example(engine: &SyncEngine) {
    /// match engine.status("order.456").await {
    ///     ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
    ///         println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
    ///     }
    ///     ItemStatus::Pending => println!("Queued for sync"),
    ///     ItemStatus::Missing => println!("Not found"),
    /// }
    /// # }
    /// ```
    pub async fn status(&self, id: &str) -> ItemStatus {
        let in_l1 = self.l1_cache.contains_key(id);

        // Check if pending in batch queue
        let pending = self.l2_batcher.lock().await.contains(id);
        if pending {
            return ItemStatus::Pending;
        }

        // Check L2 (if available) - use EXISTS, no filter
        let in_l2 = if let Some(ref l2) = self.l2_store {
            l2.exists(id).await.unwrap_or(false)
        } else {
            false
        };

        // Check L3 (if available) - skip the query only on a trusted-filter negative
        let in_l3 = if let Some(ref l3) = self.l3_store {
            (!self.l3_filter.is_trusted() || self.l3_filter.should_check_l3(id))
                && l3.get(id).await.ok().flatten().is_some()
        } else {
            false
        };

        if in_l1 || in_l2 || in_l3 {
            ItemStatus::Synced { in_l1, in_l2, in_l3 }
        } else {
            ItemStatus::Missing
        }
    }

    /// Fetch multiple items in parallel.
    ///
    /// Returns a vector of `Option<SyncItem>` in the same order as input IDs.
    /// Missing items are represented as `None`.
    ///
    /// # Performance
    ///
    /// This method fetches from L1 synchronously, then batches L2/L3 lookups
    /// for items not in L1. Much faster than sequential `get()` calls.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let ids = vec!["user.1", "user.2", "user.3"];
    /// let items = engine.get_many(&ids).await;
    /// for (id, item) in ids.iter().zip(items.iter()) {
    ///     match item {
    ///         Some(item) => println!("{}: found", id),
    ///         None => println!("{}: missing", id),
    ///     }
    /// }
    /// # }
    /// ```
    pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
        let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
        let mut missing_indices: Vec<usize> = Vec::new();

        // Phase 1: Check L1 (synchronous, fast)
        for (i, id) in ids.iter().enumerate() {
            if let Some(item) = self.l1_cache.get(*id) {
                results[i] = Some(item.clone());
            } else {
                missing_indices.push(i);
            }
        }

        // Phase 2: Fetch missing items from L2/L3 in parallel
        if !missing_indices.is_empty() {
            let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();

            for &i in &missing_indices {
                let id = ids[i].to_string();
                let l2_store = self.l2_store.clone();
                let l3_store = self.l3_store.clone();
                let l3_filter = self.l3_filter.clone();

                join_set.spawn(async move {
                    // Try L2 first (no filter, just try Redis)
                    if let Some(ref l2) = l2_store {
                        if let Ok(Some(item)) = l2.get(&id).await {
                            return (i, Some(item));
                        }
                    }

                    // Fall back to L3 (use Cuckoo filter if trusted)
                    if let Some(ref l3) = l3_store {
                        if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
                            if let Ok(Some(item)) = l3.get(&id).await {
                                return (i, Some(item));
                            }
                        }
                    }

                    (i, None)
                });
            }

            // Collect results
            while let Some(result) = join_set.join_next().await {
                if let Ok((i, item)) = result {
                    results[i] = item;
                }
            }
        }

        results
    }

    /// Submit multiple items for sync atomically.
    ///
    /// All items are added to L1 and queued for batch persistence.
    /// Returns a `BatchResult` with success/failure counts.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # use serde_json::json;
    /// # async fn example(engine: &SyncEngine) {
    /// let items = vec![
    ///     SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
    ///     SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
    /// ];
    /// let result = engine.submit_many(items).await.unwrap();
    /// println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
    /// # }
    /// ```
    pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
        if !self.should_accept_writes() {
            return Err(StorageError::Backend(format!(
                "Rejecting batch write: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let total = items.len();
        let mut succeeded = 0;

        // Lock batcher once for the whole batch
        let mut batcher = self.l2_batcher.lock().await;

        for item in items {
            self.insert_l1(item.clone());
            batcher.add(item);
            succeeded += 1;
        }

        debug!(total, succeeded, "Batch submitted to L1 and queue");

        Ok(BatchResult {
            total,
            succeeded,
            failed: total - succeeded,
        })
    }

    /// Submit a batch of materialized views.
    ///
    /// Uses a dedicated queue to avoid contention with the main batcher.
    /// Views are flushed independently by the sync engine loop.
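    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative; assumes a ready engine, and the
    /// `view:` ID scheme and payload here are placeholders):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # use serde_json::json;
    /// # async fn example(engine: &SyncEngine) {
    /// let views = vec![
    ///     SyncItem::from_json("view:daily_totals".into(), json!({"total": 42})),
    /// ];
    /// let result = engine.submit_view_batch(views).await.unwrap();
    /// assert_eq!(result.failed, 0);
    /// # }
    /// ```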
    pub async fn submit_view_batch(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
        if !self.should_accept_writes() {
            return Err(StorageError::Backend(format!(
                "Rejecting view batch: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let total = items.len();

        // Update L1 cache immediately
        for item in &items {
            self.insert_l1(item.clone());
        }

        // Send to dedicated queue
        match self.view_queue_tx.send(items).await {
            Ok(_) => {
                debug!(total, "View batch submitted to queue");
                Ok(BatchResult {
                    total,
                    succeeded: total,
                    failed: 0,
                })
            }
            Err(_) => {
                error!("View queue closed");
                Err(StorageError::Backend("View queue closed".to_string()))
            }
        }
    }

    /// Delete multiple items atomically.
    ///
    /// Removes items from all tiers (L1, L2, L3) and updates filters.
    /// Returns a `BatchResult` with counts.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::SyncEngine;
    /// # async fn example(engine: &SyncEngine) {
    /// let ids = vec!["user.1", "user.2", "user.3"];
    /// let result = engine.delete_many(&ids).await.unwrap();
    /// println!("Deleted: {}", result.succeeded);
    /// # }
    /// ```
    pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
        if !self.should_accept_writes() {
            return Err(StorageError::Backend(format!(
                "Rejecting batch delete: engine state={}, pressure={}",
                self.state(),
                self.pressure()
            )));
        }

        let total = ids.len();
        let mut succeeded = 0;

        // Build merkle batch for all deletions
        let mut merkle_batch = MerkleBatch::new();

        for id in ids {
            // Remove from L1
            if let Some((_, item)) = self.l1_cache.remove(*id) {
                let size = Self::item_size(&item);
                self.l1_size_bytes.fetch_sub(size, Ordering::Release);
            }

            // Remove from L3 filter (no L2 filter with TTL support)
            self.l3_filter.remove(id);

            // Queue merkle deletion
            merkle_batch.delete(id.to_string());

            succeeded += 1;
        }

        // Batch delete from L2
        if let Some(ref l2) = self.l2_store {
            for id in ids {
                if let Err(e) = l2.delete(id).await {
                    warn!(id, error = %e, "Failed to delete from L2");
                }
            }
        }

        // Batch delete from L3
        if let Some(ref l3) = self.l3_store {
            for id in ids {
                if let Err(e) = l3.delete(id).await {
                    warn!(id, error = %e, "Failed to delete from L3");
                }
            }
        }

        // Update merkle trees
        if let Some(ref sql_merkle) = self.sql_merkle {
            if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
                error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
            }
        }

        if let Some(ref redis_merkle) = self.redis_merkle {
            if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
                warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
            }
        }

        info!(total, succeeded, "Batch delete completed");

        Ok(BatchResult {
            total,
            succeeded,
            failed: total - succeeded,
        })
    }

    /// Get an item, or compute and insert it if missing.
    ///
    /// This is the classic "get or insert" pattern, useful for cache-aside:
    /// 1. Check cache (L1 → L2 → L3)
    /// 2. If missing, call the async factory function
    /// 3. Insert the result and return it
    ///
    /// The factory is only called if the item is not found.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, SyncItem};
    /// # use serde_json::json;
    /// # async fn example(engine: &SyncEngine) {
    /// let item = engine.get_or_insert_with("user.123", || async {
    ///     // Expensive operation - only runs if not cached
    ///     SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
    /// }).await.unwrap();
    /// # }
    /// ```
    pub async fn get_or_insert_with<F, Fut>(
        &self,
        id: &str,
        factory: F,
    ) -> Result<SyncItem, StorageError>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = SyncItem>,
    {
        // Try to get existing
        if let Some(item) = self.get(id).await? {
            return Ok(item);
        }

        // Not found - compute new value
        let item = factory().await;

        // Insert and return
        self.submit(item.clone()).await?;

        Ok(item)
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // State-based queries: Fast indexed access by caller-defined state tag
    // ═══════════════════════════════════════════════════════════════════════════

    /// Get items by state from SQL (L3 ground truth).
    ///
    /// Uses an indexed query for fast retrieval.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Get all delta items for CRDT merging
    /// let deltas = engine.get_by_state("delta", 1000).await?;
    /// for item in deltas {
    ///     println!("Delta: {}", item.object_id);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.get_by_state(state, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Count items in a given state (SQL ground truth).
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// let pending_count = engine.count_by_state("pending").await?;
    /// println!("{} items pending", pending_count);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.count_by_state(state).await
        } else {
            Ok(0)
        }
    }

    /// Get just the IDs of items in a given state (lightweight query).
    ///
    /// Returns IDs from SQL. For the Redis state SET, use `list_state_ids_redis()`.
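    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative; the `"pending"` state tag is a placeholder):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Fetch up to 100 IDs without materializing full items
    /// let ids = engine.list_state_ids("pending", 100).await?;
    /// println!("{} pending IDs", ids.len());
    /// # Ok(())
    /// # }
    /// ```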
    pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.list_state_ids(state, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Update the state of an item by ID.
    ///
    /// Updates SQL (ground truth) only; Redis state SETs are not touched
    /// (see the note in the implementation).
    /// L1 cache is NOT updated - caller should re-fetch if needed.
    ///
    /// Returns `true` if the item was found and updated.
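    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative; the ID and state tags are placeholders):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// if engine.set_state("order.456", "processed").await? {
    ///     println!("State updated");
    /// }
    /// # Ok(())
    /// # }
    /// ```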
    pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
        let mut updated = false;

        // Update SQL (ground truth)
        if let Some(ref sql) = self.sql_store {
            updated = sql.set_state(id, new_state).await?;
        }

        // Note: Redis state SETs are not updated here because we'd need to know
        // the old state to do SREM. For full Redis state management, the item
        // should be re-submitted with the new state via submit_with().

        Ok(updated)
    }

    /// Delete all items in a given state from SQL.
    ///
    /// Also removes from the L1 cache; Redis entries are left to expire via TTL
    /// (see the note in the implementation).
    /// Returns the number of deleted items.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Clean up all processed deltas
    /// let deleted = engine.delete_by_state("delta").await?;
    /// println!("Deleted {} delta items", deleted);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut deleted = 0u64;

        // Get IDs first for L1 cleanup (capped at 100k; the SQL delete below
        // is uncapped, so L1 entries beyond the cap may linger until eviction)
        let ids = if let Some(ref sql) = self.sql_store {
            sql.list_state_ids(state, 100_000).await?
        } else {
            Vec::new()
        };

        // Remove from L1 cache
        for id in &ids {
            self.l1_cache.remove(id);
        }

        // Delete from SQL
        if let Some(ref sql) = self.sql_store {
            deleted = sql.delete_by_state(state).await?;
        }

        // Note: Redis items with TTL will expire naturally.
        // For immediate Redis cleanup, call delete_by_state on RedisStore directly.

        info!(state = %state, deleted = deleted, "Deleted items by state");

        Ok(deleted)
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Prefix Scan Operations
    // ═══════════════════════════════════════════════════════════════════════════

    /// Scan items by ID prefix.
    ///
    /// Retrieves all items whose ID starts with the given prefix.
    /// Queries SQL (ground truth) directly - does NOT check L1 cache.
    ///
    /// Useful for CRDT delta-first architecture where deltas are stored as
    /// `delta:{object_id}:{op_id}` and you need to fetch all deltas for an object.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // Get base state
    /// let base = engine.get("base:user.123").await?;
    ///
    /// // Get all pending deltas for this object
    /// let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
    ///
    /// // Merge on-the-fly for read-repair
    /// for delta in deltas {
    ///     println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.scan_prefix(prefix, limit).await
        } else {
            Ok(Vec::new())
        }
    }

    /// Count items matching an ID prefix (SQL ground truth).
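    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative; the prefix is a placeholder):
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // How many deltas are queued for this object?
    /// let pending = engine.count_prefix("delta:user.123:").await?;
    /// println!("{} deltas pending merge", pending);
    /// # Ok(())
    /// # }
    /// ```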
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        if let Some(ref sql) = self.sql_store {
            sql.count_prefix(prefix).await
        } else {
            Ok(0)
        }
    }

    /// Delete all items matching an ID prefix.
    ///
    /// Removes from L1 cache, SQL, and Redis.
    /// Returns the number of deleted items.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use sync_engine::{SyncEngine, StorageError};
    /// # async fn example(engine: &SyncEngine) -> Result<(), StorageError> {
    /// // After merging deltas into base, clean them up
    /// let deleted = engine.delete_prefix("delta:user.123:").await?;
    /// println!("Cleaned up {} deltas", deleted);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut deleted = 0u64;

        // Get the items first for L1/L2 cleanup (capped at 100k; the SQL
        // delete below is uncapped)
        let items = if let Some(ref sql) = self.sql_store {
            sql.scan_prefix(prefix, 100_000).await?
        } else {
            Vec::new()
        };

        // Remove from L1 cache
        for item in &items {
            self.l1_cache.remove(&item.object_id);
        }

        // Remove from L2 (Redis) one-by-one via the CacheStore trait
        if let Some(ref l2) = self.l2_store {
            for item in &items {
                let _ = l2.delete(&item.object_id).await;
            }
        }

        // Delete from SQL
        if let Some(ref sql) = self.sql_store {
            deleted = sql.delete_prefix(prefix).await?;
        }

        info!(prefix = %prefix, deleted = deleted, "Deleted items by prefix");

        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use serde_json::json;
    use tokio::sync::watch;

    fn test_config() -> SyncEngineConfig {
        SyncEngineConfig {
            redis_url: None,
            sql_url: None,
            wal_path: None,
            l1_max_bytes: 1024 * 1024,
            ..Default::default()
        }
    }

    fn test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
    }

    #[tokio::test]
    async fn test_contains_l1_hit() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));

        assert!(engine.contains("test.exists").await);
    }

    #[tokio::test]
    async fn test_contains_with_trusted_filter() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        engine.l3_filter.mark_trusted();

        engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
        engine.l3_filter.insert("test.exists");

        assert!(engine.contains("test.exists").await);
        assert!(!engine.contains("test.missing").await);
    }

    #[test]
    fn test_len_and_is_empty() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        assert!(engine.is_empty());
        assert_eq!(engine.len(), 0);

        engine.l1_cache.insert("a".into(), test_item("a"));
        assert!(!engine.is_empty());
        assert_eq!(engine.len(), 1);

        engine.l1_cache.insert("b".into(), test_item("b"));
        assert_eq!(engine.len(), 2);
    }

    #[tokio::test]
    async fn test_status_synced_in_l1() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.submit(test_item("test.item")).await.expect("Submit failed");
        let _ = engine.l2_batcher.lock().await.force_flush();

        let status = engine.status("test.item").await;
        assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
    }

    #[tokio::test]
    async fn test_status_pending() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.submit(test_item("test.pending")).await.expect("Submit failed");

        let status = engine.status("test.pending").await;
        assert_eq!(status, ItemStatus::Pending);
    }

    #[tokio::test]
    async fn test_status_missing() {
        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);

        let status = engine.status("test.nonexistent").await;
        assert_eq!(status, ItemStatus::Missing);
    }

    #[tokio::test]
    async fn test_get_many_from_l1() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.l1_cache.insert("a".into(), test_item("a"));
        engine.l1_cache.insert("b".into(), test_item("b"));
        engine.l1_cache.insert("c".into(), test_item("c"));

        let results = engine.get_many(&["a", "b", "missing", "c"]).await;

        assert_eq!(results.len(), 4);
        assert!(results[0].is_some());
        assert!(results[1].is_some());
        assert!(results[2].is_none());
        assert!(results[3].is_some());

        assert_eq!(results[0].as_ref().unwrap().object_id, "a");
        assert_eq!(results[1].as_ref().unwrap().object_id, "b");
        assert_eq!(results[3].as_ref().unwrap().object_id, "c");
    }

    #[tokio::test]
    async fn test_submit_many() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let items = vec![
            test_item("batch.1"),
            test_item("batch.2"),
            test_item("batch.3"),
        ];

        let result = engine.submit_many(items).await.expect("Batch submit failed");

        assert_eq!(result.total, 3);
        assert_eq!(result.succeeded, 3);
        assert_eq!(result.failed, 0);
        assert!(result.is_success());

        assert_eq!(engine.len(), 3);
        assert!(engine.contains("batch.1").await);
        assert!(engine.contains("batch.2").await);
        assert!(engine.contains("batch.3").await);
    }

    #[tokio::test]
    async fn test_delete_many() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        engine.l1_cache.insert("del.1".into(), test_item("del.1"));
        engine.l1_cache.insert("del.2".into(), test_item("del.2"));
        engine.l1_cache.insert("keep".into(), test_item("keep"));

        let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");

        assert_eq!(result.total, 2);
        assert_eq!(result.succeeded, 2);
        assert!(result.is_success());

        assert!(!engine.l1_cache.contains_key("del.1"));
        assert!(!engine.l1_cache.contains_key("del.2"));
        assert!(engine.l1_cache.contains_key("keep"));
    }

    #[tokio::test]
    async fn test_get_or_insert_with_existing() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let existing = test_item("existing");
        engine.l1_cache.insert("existing".into(), existing.clone());

        let factory_called = std::sync::atomic::AtomicBool::new(false);
        let result = engine.get_or_insert_with("existing", || {
            factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
            async { test_item("should_not_be_used") }
        }).await.expect("get_or_insert_with failed");

        assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
        assert_eq!(result.object_id, "existing");
    }

    #[tokio::test]
    async fn test_get_or_insert_with_missing() {
        use super::super::EngineState;

        let config = test_config();
        let (_tx, rx) = watch::channel(config.clone());
        let engine = SyncEngine::new(config, rx);
        let _ = engine.state.send(EngineState::Ready);

        let result = engine.get_or_insert_with("new_item", || async {
            SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
        }).await.expect("get_or_insert_with failed");

        assert_eq!(result.object_id, "new_item");
        assert!(engine.contains("new_item").await);
    }
}