use crate::error::Result;
use crate::stream::{CdcEvent, CdcOp};
use crate::sync_engine::{SyncEngineRef, SyncItem};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::{debug, info, instrument, warn};

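/// Configuration for how CDC events are grouped into batches before being
/// handed to the sync engine.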
#[derive(Debug, Clone)]
pub struct BatchConfig {
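    /// Maximum number of pending keys before a flush is triggered.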
    pub max_batch_size: usize,
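    /// Maximum time to hold a batch open after its first event before flushing.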
    pub max_batch_delay: Duration,
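    /// Upper bound on concurrent `is_current` checks while deduplicating a batch.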
    pub max_concurrent_checks: usize,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 100,
            max_batch_delay: Duration::from_millis(50),
            max_concurrent_checks: 32,
        }
    }
}

impl BatchConfig {
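    /// A small, fast configuration intended for tests.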
    pub fn testing() -> Self {
        Self {
            max_batch_size: 10,
            max_batch_delay: Duration::from_millis(5),
            max_concurrent_checks: 4,
        }
    }
}

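/// The latest pending operation recorded for a key within the current batch.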
#[derive(Debug, Clone)]
enum PendingOp {
    Put {
        data: Vec<u8>,
        hash: String,
        version: u64,
    },
    Delete,
}

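/// Counters summarizing the outcome of a single batch flush.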
#[derive(Debug, Default)]
pub struct BatchResult {
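    /// Number of distinct keys in the flushed batch.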
    pub total: usize,
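    /// Puts skipped because the sync engine already held current content.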
    pub skipped: usize,
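    /// Puts successfully submitted to the sync engine.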
    pub submitted: usize,
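    /// Delete calls that completed without error.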
    pub deleted: usize,
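    /// Submit or delete calls that returned an error.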
    pub errors: usize,
}

impl BatchResult {
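    /// Returns `true` if no individual operation in the batch failed.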
    pub fn is_success(&self) -> bool {
        self.errors == 0
    }
}

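/// Accumulates CDC events into a keyed batch and flushes them to the sync
/// engine. Later events for a key supersede earlier ones, so only the most
/// recent operation per key is forwarded.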
pub struct BatchProcessor<S: SyncEngineRef> {
    pending: HashMap<String, PendingOp>,
    batch_start: Option<Instant>,
    config: BatchConfig,
    sync_engine: Arc<S>,
    peer_id: String,
}

impl<S: SyncEngineRef> BatchProcessor<S> {
    pub fn new(sync_engine: Arc<S>, peer_id: String, config: BatchConfig) -> Self {
        Self {
            pending: HashMap::new(),
            batch_start: None,
            config,
            sync_engine,
            peer_id,
        }
    }

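    /// Records a CDC event in the pending batch.
    ///
    /// A later event for the same key overwrites any earlier pending operation
    /// for that key. PUT events missing data or a content hash are logged and
    /// dropped.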
    pub fn add(&mut self, event: CdcEvent) {
        if self.batch_start.is_none() {
            self.batch_start = Some(Instant::now());
        }

        let op = match event.op {
            CdcOp::Put => {
                if let (Some(data), Some(hash)) = (event.data, event.hash) {
                    let version = event.meta.map(|m| m.version).unwrap_or(0);
                    PendingOp::Put { data, hash, version }
                } else {
                    warn!(
                        peer_id = %self.peer_id,
                        key = %event.key,
                        "PUT event missing data or hash, skipping"
                    );
                    return;
                }
            }
            CdcOp::Delete => PendingOp::Delete,
        };

        self.pending.insert(event.key, op);
    }

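    /// Returns `true` once the batch has reached `max_batch_size` entries or
    /// `max_batch_delay` has elapsed since the first event was added.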
    pub fn should_flush(&self) -> bool {
        if self.pending.len() >= self.config.max_batch_size {
            return true;
        }

        if let Some(start) = self.batch_start {
            if start.elapsed() >= self.config.max_batch_delay {
                return true;
            }
        }

        false
    }

    pub fn len(&self) -> usize {
        self.pending.len()
    }

    pub fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }

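    /// Flushes the pending batch to the sync engine.
    ///
    /// Pending puts are first checked against the engine with `is_current` so
    /// content the engine already holds is skipped; the remaining puts are
    /// submitted and pending deletes are forwarded. Per-operation failures are
    /// logged and counted in the returned [`BatchResult`] rather than aborting
    /// the flush.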
    #[instrument(skip(self), fields(peer_id = %self.peer_id))]
    pub async fn flush(&mut self) -> Result<BatchResult> {
        if self.pending.is_empty() {
            return Ok(BatchResult::default());
        }

        let batch = std::mem::take(&mut self.pending);
        self.batch_start = None;

        let total = batch.len();
        debug!(
            peer_id = %self.peer_id,
            batch_size = total,
            "Flushing batch"
        );

        let mut puts: Vec<(String, Vec<u8>, String, u64)> = Vec::new();
        let mut deletes: Vec<String> = Vec::new();

        for (key, op) in batch {
            match op {
                PendingOp::Put { data, hash, version } => {
                    puts.push((key, data, hash, version));
                }
                PendingOp::Delete => {
                    deletes.push(key);
                }
            }
        }

        let mut result = BatchResult {
            total,
            ..Default::default()
        };

        let filtered_puts = self.dedup_puts(puts).await?;
        result.skipped = total - filtered_puts.len() - deletes.len();

        for (key, data, hash, version) in filtered_puts {
            let mut item = SyncItem::new(key.clone(), data);
            item.version = version;
            item.content_hash = hash;

            match self.sync_engine.submit(item).await {
                Ok(()) => {
                    result.submitted += 1;
                }
                Err(e) => {
                    warn!(
                        peer_id = %self.peer_id,
                        key = %key,
                        error = %e,
                        "Failed to submit item"
                    );
                    result.errors += 1;
                }
            }
        }

        for key in deletes {
            match self.sync_engine.delete(key.clone()).await {
                Ok(_deleted) => {
                    result.deleted += 1;
                }
                Err(e) => {
                    warn!(
                        peer_id = %self.peer_id,
                        key = %key,
                        error = %e,
                        "Failed to delete item"
                    );
                    result.errors += 1;
                }
            }
        }

        info!(
            peer_id = %self.peer_id,
            total = result.total,
            skipped = result.skipped,
            submitted = result.submitted,
            deleted = result.deleted,
            errors = result.errors,
            "Batch flush complete"
        );

        Ok(result)
    }

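    /// Filters out puts whose content hash the sync engine already reports as
    /// current. The `is_current` checks are spawned onto a `JoinSet` and awaited
    /// concurrently; a check that returns an error is treated as "not current"
    /// so the put is still submitted, while a task that fails to join is logged
    /// and its put is dropped from the batch.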
    async fn dedup_puts(
        &self,
        puts: Vec<(String, Vec<u8>, String, u64)>,
    ) -> Result<Vec<(String, Vec<u8>, String, u64)>> {
        if puts.is_empty() {
            return Ok(Vec::new());
        }

        let mut to_submit = Vec::with_capacity(puts.len());
        let mut join_set: JoinSet<(String, Vec<u8>, String, u64, bool)> = JoinSet::new();

        for (key, data, hash, version) in puts {
            let sync_engine = Arc::clone(&self.sync_engine);
            let key_clone = key.clone();
            let hash_clone = hash.clone();

            join_set.spawn(async move {
                let is_current = sync_engine
                    .is_current(&key_clone, &hash_clone)
                    .await
                    .unwrap_or(false);
                (key, data, hash, version, is_current)
            });
        }

        while let Some(result) = join_set.join_next().await {
            match result {
                Ok((key, data, hash, version, is_current)) => {
                    if !is_current {
                        to_submit.push((key, data, hash, version));
                    } else {
                        debug!(key = %key, "Skipping (already current)");
                    }
                }
                Err(e) => {
                    warn!(error = %e, "is_current check failed (JoinError)");
                }
            }
        }

        Ok(to_submit)
    }
}

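/// A [`BatchProcessor`] wrapped in a `tokio::sync::Mutex` so it can be shared
/// between tasks (e.g. a CDC consumer and a periodic flush tick).
///
/// A minimal usage sketch; `engine` and `event` are placeholders for an actual
/// `SyncEngineRef` implementation and an incoming CDC event:
///
/// ```ignore
/// let processor = SharedBatchProcessor::new(engine, "peer-1".to_string(), BatchConfig::default());
///
/// // Feed events as they arrive; a result is returned whenever a flush triggers.
/// if let Some(result) = processor.add(event).await? {
///     tracing::debug!(submitted = result.submitted, "batch flushed");
/// }
///
/// // Flush any remainder, e.g. on shutdown or an idle tick.
/// processor.flush().await?;
/// ```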
pub struct SharedBatchProcessor<S: SyncEngineRef> {
    inner: Mutex<BatchProcessor<S>>,
}

impl<S: SyncEngineRef> SharedBatchProcessor<S> {
    pub fn new(sync_engine: Arc<S>, peer_id: String, config: BatchConfig) -> Self {
        Self {
            inner: Mutex::new(BatchProcessor::new(sync_engine, peer_id, config)),
        }
    }

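    /// Adds an event and, if the size or delay threshold is now exceeded,
    /// flushes and returns the [`BatchResult`]. The internal lock is held for
    /// the duration of the flush, so concurrent callers wait for it to finish.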
    pub async fn add(&self, event: CdcEvent) -> Result<Option<BatchResult>> {
        let mut processor = self.inner.lock().await;
        processor.add(event);

        if processor.should_flush() {
            Ok(Some(processor.flush().await?))
        } else {
            Ok(None)
        }
    }

    pub async fn flush(&self) -> Result<BatchResult> {
        let mut processor = self.inner.lock().await;
        processor.flush().await
    }

    pub async fn is_empty(&self) -> bool {
        self.inner.lock().await.is_empty()
    }

    pub async fn len(&self) -> usize {
        self.inner.lock().await.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync_engine::SyncResult;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

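    /// Test double that counts submit/delete/is_current calls and can be
    /// configured to report every key as already current.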
    struct TrackingSyncEngine {
        submit_count: AtomicUsize,
        delete_count: AtomicUsize,
        is_current_count: AtomicUsize,
        always_current: AtomicBool,
    }

    impl TrackingSyncEngine {
        fn new() -> Self {
            Self {
                submit_count: AtomicUsize::new(0),
                delete_count: AtomicUsize::new(0),
                is_current_count: AtomicUsize::new(0),
                always_current: AtomicBool::new(false),
            }
        }

        fn set_always_current(&self, value: bool) {
            self.always_current.store(value, Ordering::SeqCst);
        }
    }

    impl SyncEngineRef for TrackingSyncEngine {
        fn submit(
            &self,
            _item: SyncItem,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<()>> + Send + '_>> {
            self.submit_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(()) })
        }

        fn delete(
            &self,
            _key: String,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<bool>> + Send + '_>> {
            self.delete_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(true) })
        }

        fn is_current(
            &self,
            _key: &str,
            _content_hash: &str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<bool>> + Send + '_>> {
            self.is_current_count.fetch_add(1, Ordering::SeqCst);
            let result = self.always_current.load(Ordering::SeqCst);
            Box::pin(async move { Ok(result) })
        }

        fn get_merkle_root(&self) -> crate::sync_engine::BoxFuture<'_, Option<[u8; 32]>> {
            Box::pin(async { Ok(None) })
        }

        fn get_merkle_children(
            &self,
            _path: &str,
        ) -> crate::sync_engine::BoxFuture<'_, Vec<(String, [u8; 32])>> {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn get(&self, _key: &str) -> crate::sync_engine::BoxFuture<'_, Option<Vec<u8>>> {
            Box::pin(async { Ok(None) })
        }
    }

    fn make_put_event(key: &str, data: &str) -> CdcEvent {
        CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: key.to_string(),
            hash: Some(format!("hash_{}", key)),
            data: Some(data.as_bytes().to_vec()),
            meta: None,
        }
    }

    fn make_delete_event(key: &str) -> CdcEvent {
        CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Delete,
            key: key.to_string(),
            hash: None,
            data: None,
            meta: None,
        }
    }

    #[tokio::test]
    async fn test_batch_key_deduplication() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "v1"));
        processor.add(make_put_event("user.1", "v2"));
        processor.add(make_put_event("user.1", "v3"));

        assert_eq!(processor.len(), 1);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 1);
        assert_eq!(engine.submit_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_batch_dedup_skips_current() {
        let engine = Arc::new(TrackingSyncEngine::new());
        engine.set_always_current(true);

        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "data1"));
        processor.add(make_put_event("user.2", "data2"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 2);
        assert_eq!(result.skipped, 2);
        assert_eq!(result.submitted, 0);

        assert_eq!(engine.is_current_count.load(Ordering::SeqCst), 2);
        assert_eq!(engine.submit_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_batch_mixed_operations() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "data1"));
        processor.add(make_delete_event("user.2"));
        processor.add(make_put_event("user.3", "data3"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 3);
        assert_eq!(result.submitted, 2);
        assert_eq!(result.deleted, 1);
    }

    #[tokio::test]
    async fn test_put_then_delete_same_key() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "data"));
        processor.add(make_delete_event("user.1"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 0);
        assert_eq!(result.deleted, 1);
    }

    #[tokio::test]
    async fn test_delete_then_put_same_key() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_delete_event("user.1"));
        processor.add(make_put_event("user.1", "data"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 1);
        assert_eq!(result.deleted, 0);
    }

    #[tokio::test]
    async fn test_should_flush_by_size() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig {
                max_batch_size: 3,
                max_batch_delay: Duration::from_secs(60),
                max_concurrent_checks: 4,
            },
        );

        processor.add(make_put_event("a", "1"));
        assert!(!processor.should_flush());

        processor.add(make_put_event("b", "2"));
        assert!(!processor.should_flush());

        processor.add(make_put_event("c", "3"));
        assert!(processor.should_flush());
    }

    #[tokio::test]
    async fn test_empty_flush() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 0);
        assert!(result.is_success());
    }

    #[tokio::test]
    async fn test_batch_is_empty() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        assert!(processor.is_empty());
        assert_eq!(processor.len(), 0);

        processor.add(make_put_event("key", "data"));
        assert!(!processor.is_empty());
        assert_eq!(processor.len(), 1);

        processor.flush().await.unwrap();
        assert!(processor.is_empty());
        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_should_flush_by_time() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig {
                max_batch_size: 1000,
                max_batch_delay: Duration::from_millis(10),
                max_concurrent_checks: 4,
            },
        );

        processor.add(make_put_event("a", "1"));
        assert!(!processor.should_flush());

        tokio::time::sleep(Duration::from_millis(15)).await;
        assert!(processor.should_flush());
    }

    #[tokio::test]
    async fn test_batch_result_is_success() {
        let mut result = BatchResult::default();
        assert!(result.is_success());

        result.errors = 1;
        assert!(!result.is_success());

        result.errors = 0;
        result.submitted = 10;
        result.skipped = 5;
        assert!(result.is_success());
    }

    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_batch_size, 100);
        assert_eq!(config.max_batch_delay, Duration::from_millis(50));
        assert_eq!(config.max_concurrent_checks, 32);
    }

    #[test]
    fn test_batch_config_testing() {
        let config = BatchConfig::testing();
        assert_eq!(config.max_batch_size, 10);
        assert_eq!(config.max_batch_delay, Duration::from_millis(5));
        assert_eq!(config.max_concurrent_checks, 4);
    }

    #[tokio::test]
    async fn test_put_event_without_data() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        let event = CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: "key1".to_string(),
            hash: Some("hash".to_string()),
            data: None,
            meta: None,
        };
        processor.add(event);

        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_put_event_without_hash() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        let event = CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: "key1".to_string(),
            hash: None,
            data: Some(vec![1, 2, 3]),
            meta: None,
        };
        processor.add(event);

        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_multiple_puts_same_key_different_data() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "first"));
        processor.add(make_put_event("user.1", "second"));
        processor.add(make_put_event("user.1", "third"));

        assert_eq!(processor.len(), 1);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.submitted, 1);
    }

    #[tokio::test]
    async fn test_many_different_keys() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        for i in 0..50 {
            processor.add(make_put_event(&format!("key.{}", i), &format!("data{}", i)));
        }

        assert_eq!(processor.len(), 50);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 50);
        assert_eq!(result.submitted, 50);
    }

    #[tokio::test]
    async fn test_interleaved_operations() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("a", "1"));
        processor.add(make_delete_event("b"));
        processor.add(make_put_event("c", "3"));
        processor.add(make_delete_event("d"));
        processor.add(make_put_event("e", "5"));

        assert_eq!(processor.len(), 5);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 5);
        assert_eq!(result.submitted, 3);
        assert_eq!(result.deleted, 2);
    }
}