1use std::sync::atomic::Ordering;
13use tokio::task::JoinSet;
14use tracing::{debug, info, warn, error};
15
16use crate::storage::traits::StorageError;
17use crate::sync_item::SyncItem;
18use crate::merkle::MerkleBatch;
19
20use super::{SyncEngine, ItemStatus, BatchResult};
21
22impl SyncEngine {
23 pub async fn contains(&self, id: &str) -> bool {
49 if self.l1_cache.contains_key(id) {
51 return true;
52 }
53
54 if let Some(ref l2) = self.l2_store {
56 if l2.exists(id).await.unwrap_or(false) {
57 return true;
58 }
59 }
60
61 if self.l3_filter.is_trusted() {
63 if !self.l3_filter.should_check_l3(id) {
65 return false; }
67 }
68
69 if let Some(ref l3) = self.l3_store {
71 if l3.exists(id).await.unwrap_or(false) {
72 if !self.l3_filter.is_trusted() {
74 self.l3_filter.insert(id);
75 }
76 return true;
77 }
78 }
79
80 false
81 }
82
83 #[must_use]
88 #[inline]
89 pub fn contains_fast(&self, id: &str) -> bool {
90 self.l1_cache.contains_key(id) || self.l3_filter.should_check_l3(id)
91 }
92
93 #[must_use]
95 #[inline]
96 pub fn len(&self) -> usize {
97 self.l1_cache.len()
98 }
99
100 #[must_use]
102 #[inline]
103 pub fn is_empty(&self) -> bool {
104 self.l1_cache.is_empty()
105 }
106
107 pub async fn status(&self, id: &str) -> ItemStatus {
127 let in_l1 = self.l1_cache.contains_key(id);
128
129 let pending = self.l2_batcher.lock().await.contains(id);
131 if pending {
132 return ItemStatus::Pending;
133 }
134
135 let in_l2 = if let Some(ref l2) = self.l2_store {
137 l2.exists(id).await.unwrap_or(false)
138 } else {
139 false
140 };
141
142 let in_l3 = if let Some(ref l3) = self.l3_store {
144 self.l3_filter.should_check_l3(id) && l3.get(id).await.ok().flatten().is_some()
145 } else {
146 false
147 };
148
149 if in_l1 || in_l2 || in_l3 {
150 ItemStatus::Synced { in_l1, in_l2, in_l3 }
151 } else {
152 ItemStatus::Missing
153 }
154 }
155
156 pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>> {
182 let mut results: Vec<Option<SyncItem>> = vec![None; ids.len()];
183 let mut missing_indices: Vec<usize> = Vec::new();
184
185 for (i, id) in ids.iter().enumerate() {
187 if let Some(item) = self.l1_cache.get(*id) {
188 results[i] = Some(item.clone());
189 } else {
190 missing_indices.push(i);
191 }
192 }
193
194 if !missing_indices.is_empty() {
196 let mut join_set: JoinSet<(usize, Option<SyncItem>)> = JoinSet::new();
197
198 for &i in &missing_indices {
199 let id = ids[i].to_string();
200 let l2_store = self.l2_store.clone();
201 let l3_store = self.l3_store.clone();
202 let l3_filter = self.l3_filter.clone();
203
204 join_set.spawn(async move {
205 if let Some(ref l2) = l2_store {
207 if let Ok(Some(item)) = l2.get(&id).await {
208 return (i, Some(item));
209 }
210 }
211
212 if let Some(ref l3) = l3_store {
214 if !l3_filter.is_trusted() || l3_filter.should_check_l3(&id) {
215 if let Ok(Some(item)) = l3.get(&id).await {
216 return (i, Some(item));
217 }
218 }
219 }
220
221 (i, None)
222 });
223 }
224
225 while let Some(result) = join_set.join_next().await {
227 if let Ok((i, item)) = result {
228 results[i] = item;
229 }
230 }
231 }
232
233 results
234 }
235
236 pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError> {
256 if !self.should_accept_writes() {
257 return Err(StorageError::Backend(format!(
258 "Rejecting batch write: engine state={}, pressure={}",
259 self.state(),
260 self.pressure()
261 )));
262 }
263
264 let total = items.len();
265 let mut succeeded = 0;
266
267 for item in items {
268 self.insert_l1(item.clone());
269 self.l2_batcher.lock().await.add(item);
270 succeeded += 1;
271 }
272
273 debug!(total, succeeded, "Batch submitted to L1 and queue");
274
275 Ok(BatchResult {
276 total,
277 succeeded,
278 failed: total - succeeded,
279 })
280 }
281
282 pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError> {
298 if !self.should_accept_writes() {
299 return Err(StorageError::Backend(format!(
300 "Rejecting batch delete: engine state={}, pressure={}",
301 self.state(),
302 self.pressure()
303 )));
304 }
305
306 let total = ids.len();
307 let mut succeeded = 0;
308
309 let mut merkle_batch = MerkleBatch::new();
311
312 for id in ids {
313 if let Some((_, item)) = self.l1_cache.remove(*id) {
315 let size = Self::item_size(&item);
316 self.l1_size_bytes.fetch_sub(size, Ordering::Release);
317 }
318
319 self.l3_filter.remove(id);
321
322 merkle_batch.delete(id.to_string());
324
325 succeeded += 1;
326 }
327
328 if let Some(ref l2) = self.l2_store {
330 for id in ids {
331 if let Err(e) = l2.delete(id).await {
332 warn!(id, error = %e, "Failed to delete from L2");
333 }
334 }
335 }
336
337 if let Some(ref l3) = self.l3_store {
339 for id in ids {
340 if let Err(e) = l3.delete(id).await {
341 warn!(id, error = %e, "Failed to delete from L3");
342 }
343 }
344 }
345
346 if let Some(ref sql_merkle) = self.sql_merkle {
348 if let Err(e) = sql_merkle.apply_batch(&merkle_batch).await {
349 error!(error = %e, "Failed to update SQL Merkle tree for batch deletion");
350 }
351 }
352
353 if let Some(ref redis_merkle) = self.redis_merkle {
354 if let Err(e) = redis_merkle.apply_batch(&merkle_batch).await {
355 warn!(error = %e, "Failed to update Redis Merkle tree for batch deletion");
356 }
357 }
358
359 info!(total, succeeded, "Batch delete completed");
360
361 Ok(BatchResult {
362 total,
363 succeeded,
364 failed: total - succeeded,
365 })
366 }
367
368 pub async fn get_or_insert_with<F, Fut>(
390 &self,
391 id: &str,
392 factory: F,
393 ) -> Result<SyncItem, StorageError>
394 where
395 F: FnOnce() -> Fut,
396 Fut: std::future::Future<Output = SyncItem>,
397 {
398 if let Some(item) = self.get(id).await? {
400 return Ok(item);
401 }
402
403 let item = factory().await;
405
406 self.submit(item.clone()).await?;
408
409 Ok(item)
410 }
411}
412
413#[cfg(test)]
414mod tests {
415 use super::*;
416 use crate::config::SyncEngineConfig;
417 use serde_json::json;
418 use tokio::sync::watch;
419
420 fn test_config() -> SyncEngineConfig {
421 SyncEngineConfig {
422 redis_url: None,
423 sql_url: None,
424 wal_path: None,
425 l1_max_bytes: 1024 * 1024,
426 ..Default::default()
427 }
428 }
429
430 fn test_item(id: &str) -> SyncItem {
431 SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
432 }
433
434 #[tokio::test]
435 async fn test_contains_l1_hit() {
436 let config = test_config();
437 let (_tx, rx) = watch::channel(config.clone());
438 let engine = SyncEngine::new(config, rx);
439
440 engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
441
442 assert!(engine.contains("test.exists").await);
443 }
444
445 #[tokio::test]
446 async fn test_contains_with_trusted_filter() {
447 let config = test_config();
448 let (_tx, rx) = watch::channel(config.clone());
449 let engine = SyncEngine::new(config, rx);
450
451 engine.l3_filter.mark_trusted();
452
453 engine.l1_cache.insert("test.exists".into(), test_item("test.exists"));
454 engine.l3_filter.insert("test.exists");
455
456 assert!(engine.contains("test.exists").await);
457 assert!(!engine.contains("test.missing").await);
458 }
459
460 #[test]
461 fn test_len_and_is_empty() {
462 let config = test_config();
463 let (_tx, rx) = watch::channel(config.clone());
464 let engine = SyncEngine::new(config, rx);
465
466 assert!(engine.is_empty());
467 assert_eq!(engine.len(), 0);
468
469 engine.l1_cache.insert("a".into(), test_item("a"));
470 assert!(!engine.is_empty());
471 assert_eq!(engine.len(), 1);
472
473 engine.l1_cache.insert("b".into(), test_item("b"));
474 assert_eq!(engine.len(), 2);
475 }
476
477 #[tokio::test]
478 async fn test_status_synced_in_l1() {
479 use super::super::EngineState;
480
481 let config = test_config();
482 let (_tx, rx) = watch::channel(config.clone());
483 let engine = SyncEngine::new(config, rx);
484 let _ = engine.state.send(EngineState::Ready);
485
486 engine.submit(test_item("test.item")).await.expect("Submit failed");
487 let _ = engine.l2_batcher.lock().await.force_flush();
488
489 let status = engine.status("test.item").await;
490 assert!(matches!(status, ItemStatus::Synced { in_l1: true, .. }));
491 }
492
493 #[tokio::test]
494 async fn test_status_pending() {
495 use super::super::EngineState;
496
497 let config = test_config();
498 let (_tx, rx) = watch::channel(config.clone());
499 let engine = SyncEngine::new(config, rx);
500 let _ = engine.state.send(EngineState::Ready);
501
502 engine.submit(test_item("test.pending")).await.expect("Submit failed");
503
504 let status = engine.status("test.pending").await;
505 assert_eq!(status, ItemStatus::Pending);
506 }
507
508 #[tokio::test]
509 async fn test_status_missing() {
510 let config = test_config();
511 let (_tx, rx) = watch::channel(config.clone());
512 let engine = SyncEngine::new(config, rx);
513
514 let status = engine.status("test.nonexistent").await;
515 assert_eq!(status, ItemStatus::Missing);
516 }
517
518 #[tokio::test]
519 async fn test_get_many_from_l1() {
520 use super::super::EngineState;
521
522 let config = test_config();
523 let (_tx, rx) = watch::channel(config.clone());
524 let engine = SyncEngine::new(config, rx);
525 let _ = engine.state.send(EngineState::Ready);
526
527 engine.l1_cache.insert("a".into(), test_item("a"));
528 engine.l1_cache.insert("b".into(), test_item("b"));
529 engine.l1_cache.insert("c".into(), test_item("c"));
530
531 let results = engine.get_many(&["a", "b", "missing", "c"]).await;
532
533 assert_eq!(results.len(), 4);
534 assert!(results[0].is_some());
535 assert!(results[1].is_some());
536 assert!(results[2].is_none());
537 assert!(results[3].is_some());
538
539 assert_eq!(results[0].as_ref().unwrap().object_id, "a");
540 assert_eq!(results[1].as_ref().unwrap().object_id, "b");
541 assert_eq!(results[3].as_ref().unwrap().object_id, "c");
542 }
543
544 #[tokio::test]
545 async fn test_submit_many() {
546 use super::super::EngineState;
547
548 let config = test_config();
549 let (_tx, rx) = watch::channel(config.clone());
550 let engine = SyncEngine::new(config, rx);
551 let _ = engine.state.send(EngineState::Ready);
552
553 let items = vec![
554 test_item("batch.1"),
555 test_item("batch.2"),
556 test_item("batch.3"),
557 ];
558
559 let result = engine.submit_many(items).await.expect("Batch submit failed");
560
561 assert_eq!(result.total, 3);
562 assert_eq!(result.succeeded, 3);
563 assert_eq!(result.failed, 0);
564 assert!(result.is_success());
565
566 assert_eq!(engine.len(), 3);
567 assert!(engine.contains("batch.1").await);
568 assert!(engine.contains("batch.2").await);
569 assert!(engine.contains("batch.3").await);
570 }
571
572 #[tokio::test]
573 async fn test_delete_many() {
574 use super::super::EngineState;
575
576 let config = test_config();
577 let (_tx, rx) = watch::channel(config.clone());
578 let engine = SyncEngine::new(config, rx);
579 let _ = engine.state.send(EngineState::Ready);
580
581 engine.l1_cache.insert("del.1".into(), test_item("del.1"));
582 engine.l1_cache.insert("del.2".into(), test_item("del.2"));
583 engine.l1_cache.insert("keep".into(), test_item("keep"));
584
585 let result = engine.delete_many(&["del.1", "del.2"]).await.expect("Batch delete failed");
586
587 assert_eq!(result.total, 2);
588 assert_eq!(result.succeeded, 2);
589 assert!(result.is_success());
590
591 assert!(!engine.l1_cache.contains_key("del.1"));
592 assert!(!engine.l1_cache.contains_key("del.2"));
593 assert!(engine.l1_cache.contains_key("keep"));
594 }
595
596 #[tokio::test]
597 async fn test_get_or_insert_with_existing() {
598 use super::super::EngineState;
599
600 let config = test_config();
601 let (_tx, rx) = watch::channel(config.clone());
602 let engine = SyncEngine::new(config, rx);
603 let _ = engine.state.send(EngineState::Ready);
604
605 let existing = test_item("existing");
606 engine.l1_cache.insert("existing".into(), existing.clone());
607
608 let factory_called = std::sync::atomic::AtomicBool::new(false);
609 let result = engine.get_or_insert_with("existing", || {
610 factory_called.store(true, std::sync::atomic::Ordering::SeqCst);
611 async { test_item("should_not_be_used") }
612 }).await.expect("get_or_insert_with failed");
613
614 assert!(!factory_called.load(std::sync::atomic::Ordering::SeqCst));
615 assert_eq!(result.object_id, "existing");
616 }
617
618 #[tokio::test]
619 async fn test_get_or_insert_with_missing() {
620 use super::super::EngineState;
621
622 let config = test_config();
623 let (_tx, rx) = watch::channel(config.clone());
624 let engine = SyncEngine::new(config, rx);
625 let _ = engine.state.send(EngineState::Ready);
626
627 let result = engine.get_or_insert_with("new_item", || async {
628 SyncItem::from_json("new_item".into(), json!({"created": "by factory"}))
629 }).await.expect("get_or_insert_with failed");
630
631 assert_eq!(result.object_id, "new_item");
632 assert!(engine.contains("new_item").await);
633 }
634}