1use std::sync::atomic::Ordering;
13use tokio::task::JoinSet;
14use tracing::{debug, info, warn, error};
15
16use crate::storage::traits::StorageError;
17use crate::sync_item::SyncItem;
18use crate::merkle::MerkleBatch;
19
20use super::{SyncEngine, ItemStatus, BatchResult};
21
22impl SyncEngine {
23 pub async fn contains(&self, id: &str) -> bool {
49 if self.l1_cache.contains_key(id) {
51 return true;
52 }
53
54 if let Some(ref l2) = self.l2_store {
56 if l2.exists(id).await.unwrap_or(false) {
57 return true;
58 }
59 }
60
61 if self.l3_filter.is_trusted() {
63 if !self.l3_filter.should_check_l3(id) {
65 return false; }
67 }
68
69 if let Some(ref l3) = self.l3_store {
71 if l3.exists(id).await.unwrap_or(false) {
72 if !self.l3_filter.is_trusted() {
74 self.l3_filter.insert(id);
75 }
76 return true;
77 }
78 }
79
80 false
81 }
82
83 #[must_use]
88 #[inline]
89 pub fn contains_fast(&self, id: &str) -> bool {
90 self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
91 }
92
93 #[must_use]
95 #[inline]
96 pub fn len(&self) -> usize {
97 self.l1_cache.len()
98 }
99
100 #[must_use]
102 #[inline]
103 pub fn is_empty(&self) -> bool {
104 self.l1_cache.is_empty()
105 }
106
107 pub async fn status(&self, id: &str) -> ItemStatus {
127 let in_l1 = self.l1_cache.contains_key(id);
128
129 let pending = self.l2_batcher.lock().await.contains(id);
131 if pending {
132 return ItemStatus::Pending;
133 }
134
135 let in_l2 = if let Some(ref l2) = self.l2_store {
137 l2.exists(id).await.unwrap_or(false)
138 } else {
139 false
140 };
141
142 let in_l3 = if let Some(ref l3) = self.l3_store {
144 self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
145 } else {
146 false
147 };
148
149 if in_l1 || in_l2 || in_l3 {
150 ItemStatus::Synced { in_l1, in_l2, in_l3 }
151 } else {
152 ItemStatus::Missing
153 }
154 }
155
156 pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
182 let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
183 let mut missing_indices: Vec<usize> = Vec::new();
184
185 for (i, id) in ids.iter().enumerate() {
187 if let Some(item) = self.l1_cache.get(*id) {
188 results[i] = Some(item.clone());
189 } else {
190 missing_indices.push(i);
191 }
192 }
193
194 if !missing_indices.is_empty() {
196 let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();
197
198 for &i in &missing_indices {
199 let id = ids[i].to_string();
200 let l2_store = self.l2_store.clone();
201 let l3_store = self.l3_store.clone();
202 let l3_filter = self.l3_filter.clone();
203
204 join_set.spawn(async move {
205 if let Some(ref l2) = l2_store {
207 if let Ok(Some(item)) = l2.get(&id).await {
208 return (i, Some(item));
209 }
210 }
211
212 if let Some(ref l3) = l3_store {
214 if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
215 if let Ok(Some(item)) = l3.get(&id).await {
216 return (i, Some(item));
217 }
218 }
219 }
220
221 (i, None)
222 });
223 }
224
225 while let Some(result) = join_set.join_next().await {
227 if let Ok((i, item)) = result {
228 results[i] = item;
229 }
230 }
231 }
232
233 results
234 }
235
236 pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
256 if !self.should_accept_writes() {
257 return Err(StorageError::Backend(format!(
258 "Rejecting batch write: engine state={}, pressure={}",
259 self.state(),
260 self.pressure()
261 )));
262 }
263
264 let total = items.len();
265 let mut succeeded = 0;
266
267 for item in items {
268 self.insert_l1(item.clone());
269 self.l2_batcher.lock().await.add(item);
270 succeeded += 1;
271 }
272
273 debug!(total, succeeded, "Batch submitted to L1 and queue");
274
275 Ok(BatchResult {
276 total,
277 succeeded,
278 failed: total - succeeded,
279 })
280 }
281
282 pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
298 if !self.should_accept_writes() {
299 return Err(StorageError::Backend(format!(
300 "Rejecting batch delete: engine state={}, pressure={}",
301 self.state(),
302 self.pressure()
303 )));
304 }
305
306 let total = ids.len();
307 let mut succeeded = 0;
308
309 let mut merkle_batch = MerkleBatch::new();
311
312 for id in ids {
313 if let Some((_, item)) = self.l1_cache.remove(*id) {
315 let size = Self::item_size(&item);
316 self.l1_size_bytes.fetch_sub(size, Ordering::Release);
317 }
318
319 self.l3_filter.remove(id);
321
322 merkle_batch.delete(id.to_string());
324
325 succeeded += 1;
326 }
327
328 if let Some(ref l2) = self.l2_store {
330 for id in ids {
331 if let Err(e) = l2.delete(id).await {
332 warn!(id, error = %e, "Failed to delete from L2");
333 }
334 }
335 }
336
337 if let Some(ref l3) = self.l3_store {
339 for id in ids {
340 if let Err(e) = l3.delete(id).await {
341 warn!(id, error = %e, "Failed to delete from L3");
342 }
343 }
344 }
345
346 if let Some(ref sql_merkle) = self.sql_merkle {
348 if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
349 error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
350 }
351 }
352
353 if let Some(ref redis_merkle) = self.redis_merkle {
354 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
355 warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
356 }
357 }
358
359 info!(total, succeeded, "Batch delete completed");
360
361 Ok(BatchResult {
362 total,
363 succeeded,
364 failed: total - succeeded,
365 })
366 }
367
368 pub async fn get_or_insert_with<F, Fut>(
390 &self,
391 id: &str,
392 factory: F,
393 ) -> Result<SyncItem, StorageError>
394 where
395 F: FnOnce() -> Fut,
396 Fut: std::future::Future<Output = SyncItem>,
397 {
398 if let Some(item) = self.get(id).await? {
400 return Ok(item);
401 }
402
403 let item = factory().await;
405
406 self.submit(item.clone()).await?;
408
409 Ok(item)
410 }
411
412 pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
434 if let Some(ref sql) = self.sql_store {
435 sql.get_by_state(state, limit).await
436 } else {
437 Ok(Vec::new())
438 }
439 }
440
441 pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
454 if let Some(ref sql) = self.sql_store {
455 sql.count_by_state(state).await
456 } else {
457 Ok(0)
458 }
459 }
460
461 pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError> {
465 if let Some(ref sql) = self.sql_store {
466 sql.list_state_ids(state, limit).await
467 } else {
468 Ok(Vec::new())
469 }
470 }
471
472 pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError> {
479 let mut updated = false;
480
481 if let Some(ref sql) = self.sql_store {
483 updated = sql.set_state(id, new_state).await?;
484 }
485
486 Ok(updated)
491 }
492
493 pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
510 let mut deleted = 0u64;
511
512 let ids = if let Some(ref sql) = self.sql_store {
514 sql.list_state_ids(state, 100_000).await?
515 } else {
516 Vec::new()
517 };
518
519 for id in &ids {
521 self.l1_cache.remove(id);
522 }
523
524 if let Some(ref sql) = self.sql_store {
526 deleted = sql.delete_by_state(state).await?;
527 }
528
529 info!(state = %state, deleted = deleted, "Deleted items by state");
533
534 Ok(deleted)
535 }
536}
537
538#[cfg(test)]
539mod tests {
540 use super::*;
541 use crate::config::SyncEngineConfig;
542 use serde_json::json;
543 use tokio::sync::watch;
544
545 fn test_config() -> SyncEngineConfig {
546 SyncEngineConfig {
547 redis_url: None,
548 sql_url: None,
549 wal_path: None,
550 l1_max_bytes: 1024 * 1024,
551 ..Default::default()
552 }
553 }
554
555 fn test_item(id: &str) -> SyncItem {
556 SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
557 }
558
559 #[tokio::test]
560 async fn test_contains_l1_hit() {
561 let config = test_config();
562 let (_tx, rx) = watch::channel(config.clone());
563 let engine = SyncEngine::new(config, rx);
564
565 engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
566
567 assert!(engine.contains("test.exists").await);
568 }
569
570 #[tokio::test]
571 async fn test_contains_with_trusted_filter() {
572 let config = test_config();
573 let (_tx, rx) = watch::channel(config.clone());
574 let engine = SyncEngine::new(config, rx);
575
576 engine.l3_filter.mark_trusted();
577
578 engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
579 engine.l3_filter.insert("test.exists");
580
581 assert!(engine.contains("test.exists").await);
582 assert!(!engine.contains("test.missing").await);
583 }
584
585 #[test]
586 fn test_len_and_is_empty() {
587 let config = test_config();
588 let (_tx, rx) = watch::channel(config.clone());
589 let engine = SyncEngine::new(config, rx);
590
591 assert!(engine.is_empty());
592 assert_eq!(engine.len(), 0);
593
594 engine.l1_cache.insert("a".into(), test_item("a"));
595 assert!(!engine.is_empty());
596 assert_eq!(engine.len(), 1);
597
598 engine.l1_cache.insert("b".into(), test_item("b"));
599 assert_eq!(engine.len(), 2);
600 }
601
602 #[tokio::test]
603 async fn test_status_synced_in_l1() {
604 use super::super::EngineState;
605
606 let config = test_config();
607 let (_tx, rx) = watch::channel(config.clone());
608 let engine = SyncEngine::new(config, rx);
609 let _ = engine.state.send(EngineState::Ready);
610
611 engine.submit(test_item("test.item")).await.expect("Submit failed");
612 let _ = engine.l2_batcher.lock().await.force_flush();
613
614 let status = engine.status("test.item").await;
615 assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
616 }
617
618 #[tokio::test]
619 async fn test_status_pending() {
620 use super::super::EngineState;
621
622 let config = test_config();
623 let (_tx, rx) = watch::channel(config.clone());
624 let engine = SyncEngine::new(config, rx);
625 let _ = engine.state.send(EngineState::Ready);
626
627 engine.submit(test_item("test.pending")).await.expect("Submit failed");
628
629 let status = engine.status("test.pending").await;
630 assert_eq!(status, ItemStatus::Pending);
631 }
632
633 #[tokio::test]
634 async fn test_status_missing() {
635 let config = test_config();
636 let (_tx, rx) = watch::channel(config.clone());
637 let engine = SyncEngine::new(config, rx);
638
639 let status = engine.status("test.nonexistent").await;
640 assert_eq!(status, ItemStatus::Missing);
641 }
642
643 #[tokio::test]
644 async fn test_get_many_from_l1() {
645 use super::super::EngineState;
646
647 let config = test_config();
648 let (_tx, rx) = watch::channel(config.clone());
649 let engine = SyncEngine::new(config, rx);
650 let _ = engine.state.send(EngineState::Ready);
651
652 engine.l1_cache.insert("a".into(), test_item("a"));
653 engine.l1_cache.insert("b".into(), test_item("b"));
654 engine.l1_cache.insert("c".into(), test_item("c"));
655
656 let results = engine.get_many(&["a", "b", "missing", "c"]).await;
657
658 assert_eq!(results.len(), 4);
659 assert!(results[0].is_some());
660 assert!(results[1].is_some());
661 assert!(results[2].is_none());
662 assert!(results[3].is_some());
663
664 assert_eq!(results[0].as_ref().unwrap().object_id, "a");
665 assert_eq!(results[1].as_ref().unwrap().object_id, "b");
666 assert_eq!(results[3].as_ref().unwrap().object_id, "c");
667 }
668
669 #[tokio::test]
670 async fn test_submit_many() {
671 use super::super::EngineState;
672
673 let config = test_config();
674 let (_tx, rx) = watch::channel(config.clone());
675 let engine = SyncEngine::new(config, rx);
676 let _ = engine.state.send(EngineState::Ready);
677
678 let items = vec![
679 test_item("batch.1"),
680 test_item("batch.2"),
681 test_item("batch.3"),
682 ];
683
684 let result = engine.submit_many(items).await.expect("Batch submit failed");
685
686 assert_eq!(result.total, 3);
687 assert_eq!(result.succeeded, 3);
688 assert_eq!(result.failed, 0);
689 assert!(result.is_success());
690
691 assert_eq!(engine.len(), 3);
692 assert!(engine.contains("batch.1").await);
693 assert!(engine.contains("batch.2").await);
694 assert!(engine.contains("batch.3").await);
695 }
696
697 #[tokio::test]
698 async fn test_delete_many() {
699 use super::super::EngineState;
700
701 let config = test_config();
702 let (_tx, rx) = watch::channel(config.clone());
703 let engine = SyncEngine::new(config, rx);
704 let _ = engine.state.send(EngineState::Ready);
705
706 engine.l1_cache.insert("del.1".into(), test_item("del.1"));
707 engine.l1_cache.insert("del.2".into(), test_item("del.2"));
708 engine.l1_cache.insert("keep".into(), test_item("keep"));
709
710 let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");
711
712 assert_eq!(result.total, 2);
713 assert_eq!(result.succeeded, 2);
714 assert!(result.is_success());
715
716 assert!(!engine.l1_cache.contains_key("del.1"));
717 assert!(!engine.l1_cache.contains_key("del.2"));
718 assert!(engine.l1_cache.contains_key("keep"));
719 }
720
721 #[tokio::test]
722 async fn test_get_or_insert_with_existing() {
723 use super::super::EngineState;
724
725 let config = test_config();
726 let (_tx, rx) = watch::channel(config.clone());
727 let engine = SyncEngine::new(config, rx);
728 let _ = engine.state.send(EngineState::Ready);
729
730 let existing = test_item("existing");
731 engine.l1_cache.insert("existing".into(), existing.clone());
732
733 let factory_called = std::sync::atomic::AtomicBool::new(false);
734 let result = engine.get_or_insert_with("existing", || {
735 factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
736 async { test_item("should_not_be_used") }
737 }).await.expect("get_or_insert_with failed");
738
739 assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
740 assert_eq!(result.object_id, "existing");
741 }
742
743 #[tokio::test]
744 async fn test_get_or_insert_with_missing() {
745 use super::super::EngineState;
746
747 let config = test_config();
748 let (_tx, rx) = watch::channel(config.clone());
749 let engine = SyncEngine::new(config, rx);
750 let _ = engine.state.send(EngineState::Ready);
751
752 let result = engine.get_or_insert_with("new_item", || async {
753 SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
754 }).await.expect("get_or_insert_with failed");
755
756 assert_eq!(result.object_id, "new_item");
757 assert!(engine.contains("new_item").await);
758 }
759}