1use crate::error::Result;
24use crate::stream::{CdcEvent, CdcOp};
25use crate::sync_engine::{SyncEngineRef, SyncItem};
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29use tokio::sync::Mutex;
30use tokio::task::JoinSet;
31use tracing::{debug, info, instrument, warn};
32
/// Tuning knobs for how CDC events are batched before being flushed
/// to the sync engine.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of pending keys before a flush is due.
    pub max_batch_size: usize,
    /// Maximum time a batch may accumulate (measured from its first event)
    /// before a flush is due.
    pub max_batch_delay: Duration,
    /// Upper bound on concurrent `is_current` checks during put dedup.
    pub max_concurrent_checks: usize,
}
43
44impl Default for BatchConfig {
45 fn default() -> Self {
46 Self {
47 max_batch_size: 100,
48 max_batch_delay: Duration::from_millis(50),
49 max_concurrent_checks: 32,
50 }
51 }
52}
53
54impl BatchConfig {
55 pub fn testing() -> Self {
57 Self {
58 max_batch_size: 10,
59 max_batch_delay: Duration::from_millis(5),
60 max_concurrent_checks: 4,
61 }
62 }
63}
64
/// The latest operation recorded for a key within the current batch.
#[derive(Debug, Clone)]
enum PendingOp {
    /// Upsert the key with the given payload.
    Put {
        /// Raw value bytes to replicate.
        data: Vec<u8>,
        /// Content hash used for the `is_current` dedup check.
        hash: String,
        /// Source version number (0 when the event carried no metadata).
        version: u64,
    },
    /// Remove the key.
    Delete,
}
75
/// Per-flush accounting of what happened to each key in the batch.
#[derive(Debug, Default)]
pub struct BatchResult {
    /// Total number of distinct keys in the flushed batch.
    pub total: usize,
    /// Puts skipped because the engine already held the same content hash.
    pub skipped: usize,
    /// Puts successfully submitted to the sync engine.
    pub submitted: usize,
    /// Deletes successfully applied.
    pub deleted: usize,
    /// Operations that failed with an error (submit or delete).
    pub errors: usize,
}
90
91impl BatchResult {
92 pub fn is_success(&self) -> bool {
94 self.errors == 0
95 }
96}
97
/// Accumulates CDC events keyed by record key and flushes them to a sync
/// engine in batches, collapsing repeated operations on the same key and
/// skipping puts whose content is already current.
pub struct BatchProcessor<S: SyncEngineRef> {
    /// Latest pending operation per key; later events replace earlier ones.
    pending: HashMap<String, PendingOp>,
    /// When the first event of the current batch arrived; `None` when empty.
    batch_start: Option<Instant>,
    /// Size / delay / concurrency limits for batching.
    config: BatchConfig,
    /// Destination engine for submits, deletes, and `is_current` checks.
    sync_engine: Arc<S>,
    /// Peer identifier, used only for log and tracing context.
    peer_id: String,
}
111
112impl<S: SyncEngineRef> BatchProcessor<S> {
113 pub fn new(sync_engine: Arc<S>, peer_id: String, config: BatchConfig) -> Self {
115 Self {
116 pending: HashMap::new(),
117 batch_start: None,
118 config,
119 sync_engine,
120 peer_id,
121 }
122 }
123
124 pub fn add(&mut self, event: CdcEvent) {
128 if self.batch_start.is_none() {
130 self.batch_start = Some(Instant::now());
131 }
132
133 let op = match event.op {
135 CdcOp::Put => {
136 if let (Some(data), Some(hash)) = (event.data, event.hash) {
138 let version = event.meta.map(|m| m.version).unwrap_or(0);
139 PendingOp::Put { data, hash, version }
140 } else {
141 warn!(
142 peer_id = %self.peer_id,
143 key = %event.key,
144 "PUT event missing data or hash, skipping"
145 );
146 return;
147 }
148 }
149 CdcOp::Delete => PendingOp::Delete,
150 };
151
152 self.pending.insert(event.key, op);
153 }
154
155 pub fn should_flush(&self) -> bool {
157 if self.pending.len() >= self.config.max_batch_size {
159 return true;
160 }
161
162 if let Some(start) = self.batch_start {
164 if start.elapsed() >= self.config.max_batch_delay {
165 return true;
166 }
167 }
168
169 false
170 }
171
172 pub fn len(&self) -> usize {
174 self.pending.len()
175 }
176
177 pub fn is_empty(&self) -> bool {
179 self.pending.is_empty()
180 }
181
182 #[instrument(skip(self), fields(peer_id = %self.peer_id))]
188 pub async fn flush(&mut self) -> Result<BatchResult> {
189 if self.pending.is_empty() {
190 return Ok(BatchResult::default());
191 }
192
193 let batch = std::mem::take(&mut self.pending);
194 self.batch_start = None;
195
196 let total = batch.len();
197 debug!(
198 peer_id = %self.peer_id,
199 batch_size = total,
200 "Flushing batch"
201 );
202
203 let mut puts: Vec<(String, Vec<u8>, String, u64)> = Vec::new();
205 let mut deletes: Vec<String> = Vec::new();
206
207 for (key, op) in batch {
208 match op {
209 PendingOp::Put { data, hash, version } => {
210 puts.push((key, data, hash, version));
211 }
212 PendingOp::Delete => {
213 deletes.push(key);
214 }
215 }
216 }
217
218 let mut result = BatchResult {
219 total,
220 ..Default::default()
221 };
222
223 let filtered_puts = self.dedup_puts(puts).await?;
225 result.skipped = total - filtered_puts.len() - deletes.len();
226
227 for (key, data, hash, version) in filtered_puts {
229 let mut item = SyncItem::new(key.clone(), data);
230 item.version = version;
231 item.content_hash = hash;
234
235 match self.sync_engine.submit(item).await {
236 Ok(()) => {
237 result.submitted += 1;
238 }
239 Err(e) => {
240 warn!(
241 peer_id = %self.peer_id,
242 key = %key,
243 error = %e,
244 "Failed to submit item"
245 );
246 result.errors += 1;
247 }
248 }
249 }
250
251 for key in deletes {
253 match self.sync_engine.delete_replicated(key.clone()).await {
254 Ok(_deleted) => {
255 result.deleted += 1;
256 }
257 Err(e) => {
258 warn!(
259 peer_id = %self.peer_id,
260 key = %key,
261 error = %e,
262 "Failed to delete replicated item"
263 );
264 result.errors += 1;
265 }
266 }
267 }
268
269 info!(
270 peer_id = %self.peer_id,
271 total = result.total,
272 skipped = result.skipped,
273 submitted = result.submitted,
274 deleted = result.deleted,
275 errors = result.errors,
276 "Batch flush complete"
277 );
278
279 Ok(result)
280 }
281
282 async fn dedup_puts(
284 &self,
285 puts: Vec<(String, Vec<u8>, String, u64)>,
286 ) -> Result<Vec<(String, Vec<u8>, String, u64)>> {
287 if puts.is_empty() {
288 return Ok(Vec::new());
289 }
290
291 let mut to_submit = Vec::with_capacity(puts.len());
292 let mut join_set: JoinSet<(String, Vec<u8>, String, u64, bool)> = JoinSet::new();
293
294 for (key, data, hash, version) in puts {
296 let sync_engine = Arc::clone(&self.sync_engine);
297 let key_clone = key.clone();
298 let hash_clone = hash.clone();
299
300 join_set.spawn(async move {
301 let is_current = sync_engine
302 .is_current(&key_clone, &hash_clone)
303 .await
304 .unwrap_or(false);
305 (key, data, hash, version, is_current)
306 });
307 }
308
309 while let Some(result) = join_set.join_next().await {
311 match result {
312 Ok((key, data, hash, version, is_current)) => {
313 if !is_current {
314 to_submit.push((key, data, hash, version));
315 } else {
316 debug!(key = %key, "Skipping (already current)");
317 }
318 }
319 Err(e) => {
320 warn!(error = %e, "is_current check failed (JoinError)");
321 }
322 }
323 }
324
325 Ok(to_submit)
326 }
327}
328
/// Thread-safe wrapper around a `BatchProcessor`, serializing all access
/// through an async mutex so multiple tasks can feed the same batch.
pub struct SharedBatchProcessor<S: SyncEngineRef> {
    /// The wrapped processor; locked for every operation, including flush.
    inner: Mutex<BatchProcessor<S>>,
}
333
334impl<S: SyncEngineRef> SharedBatchProcessor<S> {
335 pub fn new(sync_engine: Arc<S>, peer_id: String, config: BatchConfig) -> Self {
337 Self {
338 inner: Mutex::new(BatchProcessor::new(sync_engine, peer_id, config)),
339 }
340 }
341
342 pub async fn add(&self, event: CdcEvent) -> Result<Option<BatchResult>> {
344 let mut processor = self.inner.lock().await;
345 processor.add(event);
346
347 if processor.should_flush() {
348 Ok(Some(processor.flush().await?))
349 } else {
350 Ok(None)
351 }
352 }
353
354 pub async fn flush(&self) -> Result<BatchResult> {
356 let mut processor = self.inner.lock().await;
357 processor.flush().await
358 }
359
360 pub async fn is_empty(&self) -> bool {
362 self.inner.lock().await.is_empty()
363 }
364
365 pub async fn len(&self) -> usize {
367 self.inner.lock().await.len()
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync_engine::SyncResult;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Mock sync engine that counts calls and can report every item as
    /// already current.
    struct TrackingSyncEngine {
        submit_count: AtomicUsize,
        delete_count: AtomicUsize,
        is_current_count: AtomicUsize,
        always_current: AtomicBool,
    }

    impl TrackingSyncEngine {
        fn new() -> Self {
            Self {
                submit_count: AtomicUsize::new(0),
                delete_count: AtomicUsize::new(0),
                is_current_count: AtomicUsize::new(0),
                always_current: AtomicBool::new(false),
            }
        }

        /// When true, `is_current` reports every (key, hash) as up to date.
        fn set_always_current(&self, value: bool) {
            self.always_current.store(value, Ordering::SeqCst);
        }
    }

    impl SyncEngineRef for TrackingSyncEngine {
        fn submit(
            &self,
            _item: SyncItem,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<()>> + Send + '_>> {
            // Counters are bumped at call time, before the future is polled.
            self.submit_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(()) })
        }

        fn delete(
            &self,
            _key: String,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<bool>> + Send + '_>> {
            self.delete_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(true) })
        }

        fn delete_replicated(
            &self,
            _key: String,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<bool>> + Send + '_>> {
            self.delete_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(true) })
        }

        fn is_current(
            &self,
            _key: &str,
            _content_hash: &str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<bool>> + Send + '_>> {
            self.is_current_count.fetch_add(1, Ordering::SeqCst);
            let result = self.always_current.load(Ordering::SeqCst);
            Box::pin(async move { Ok(result) })
        }

        fn get_merkle_root(&self) -> crate::sync_engine::BoxFuture<'_, Option<[u8; 32]>> {
            Box::pin(async { Ok(None) })
        }

        fn get_merkle_children(
            &self,
            _path: &str,
        ) -> crate::sync_engine::BoxFuture<'_, Vec<(String, [u8; 32])>> {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn get(&self, _key: &str) -> crate::sync_engine::BoxFuture<'_, Option<Vec<u8>>> {
            Box::pin(async { Ok(None) })
        }
    }

    /// Builds a PUT event with a per-key synthetic hash.
    fn make_put_event(key: &str, data: &str) -> CdcEvent {
        CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: key.to_string(),
            hash: Some(format!("hash_{}", key)),
            data: Some(data.as_bytes().to_vec()),
            meta: None,
        }
    }

    /// Builds a DELETE event (no hash or data).
    fn make_delete_event(key: &str) -> CdcEvent {
        CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Delete,
            key: key.to_string(),
            hash: None,
            data: None,
            meta: None,
        }
    }

    #[tokio::test]
    async fn test_batch_key_deduplication() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // Three puts to the same key collapse to one pending op.
        processor.add(make_put_event("user.1", "v1"));
        processor.add(make_put_event("user.1", "v2"));
        processor.add(make_put_event("user.1", "v3"));
        assert_eq!(processor.len(), 1);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 1);
        assert_eq!(engine.submit_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_batch_dedup_skips_current() {
        let engine = Arc::new(TrackingSyncEngine::new());
        engine.set_always_current(true);
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "data1"));
        processor.add(make_put_event("user.2", "data2"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 2);
        assert_eq!(result.skipped, 2);
        assert_eq!(result.submitted, 0);

        // Both keys were checked, but neither was submitted.
        assert_eq!(engine.is_current_count.load(Ordering::SeqCst), 2);
        assert_eq!(engine.submit_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_batch_mixed_operations() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "data1"));
        processor.add(make_delete_event("user.2"));
        processor.add(make_put_event("user.3", "data3"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 3);
        assert_eq!(result.submitted, 2);
        assert_eq!(result.deleted, 1);
    }

    #[tokio::test]
    async fn test_put_then_delete_same_key() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // The later delete replaces the pending put.
        processor.add(make_put_event("user.1", "data"));
        processor.add(make_delete_event("user.1"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 0);
        assert_eq!(result.deleted, 1);
    }

    #[tokio::test]
    async fn test_delete_then_put_same_key() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // The later put replaces the pending delete.
        processor.add(make_delete_event("user.1"));
        processor.add(make_put_event("user.1", "data"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 1);
        assert_eq!(result.deleted, 0);
    }

    #[tokio::test]
    async fn test_should_flush_by_size() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig {
                max_batch_size: 3,
                // Delay set far out so only the size threshold triggers.
                max_batch_delay: Duration::from_secs(60),
                max_concurrent_checks: 4,
            },
        );

        processor.add(make_put_event("a", "1"));
        assert!(!processor.should_flush());

        processor.add(make_put_event("b", "2"));
        assert!(!processor.should_flush());

        processor.add(make_put_event("c", "3"));
        assert!(processor.should_flush());
    }

    #[tokio::test]
    async fn test_empty_flush() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 0);
        assert!(result.is_success());
    }

    #[tokio::test]
    async fn test_batch_is_empty() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        assert!(processor.is_empty());
        assert_eq!(processor.len(), 0);

        processor.add(make_put_event("key", "data"));
        assert!(!processor.is_empty());
        assert_eq!(processor.len(), 1);

        processor.flush().await.unwrap();
        assert!(processor.is_empty());
        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_should_flush_by_time() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig {
                // Size set high so only the delay threshold triggers.
                max_batch_size: 1000,
                max_batch_delay: Duration::from_millis(10),
                max_concurrent_checks: 4,
            },
        );

        processor.add(make_put_event("a", "1"));
        assert!(!processor.should_flush());

        tokio::time::sleep(Duration::from_millis(15)).await;
        assert!(processor.should_flush());
    }

    #[tokio::test]
    async fn test_batch_result_is_success() {
        let mut result = BatchResult::default();
        assert!(result.is_success());

        result.errors = 1;
        assert!(!result.is_success());

        // Skipped/submitted counts do not affect success.
        result.errors = 0;
        result.submitted = 10;
        result.skipped = 5;
        assert!(result.is_success());
    }

    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_batch_size, 100);
        assert_eq!(config.max_batch_delay, Duration::from_millis(50));
        assert_eq!(config.max_concurrent_checks, 32);
    }

    #[test]
    fn test_batch_config_testing() {
        let config = BatchConfig::testing();
        assert_eq!(config.max_batch_size, 10);
        assert_eq!(config.max_batch_delay, Duration::from_millis(5));
        assert_eq!(config.max_concurrent_checks, 4);
    }

    #[tokio::test]
    async fn test_put_event_without_data() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // PUT with hash but no payload must be dropped by `add`.
        let event = CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: "key1".to_string(),
            hash: Some("hash".to_string()),
            data: None,
            meta: None,
        };
        processor.add(event);

        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_put_event_without_hash() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // PUT with payload but no hash must be dropped by `add`.
        let event = CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: "key1".to_string(),
            hash: None,
            data: Some(vec![1, 2, 3]),
            meta: None,
        };
        processor.add(event);

        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_multiple_puts_same_key_different_data() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "first"));
        processor.add(make_put_event("user.1", "second"));
        processor.add(make_put_event("user.1", "third"));

        assert_eq!(processor.len(), 1);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.submitted, 1);
    }

    #[tokio::test]
    async fn test_many_different_keys() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        for i in 0..50 {
            processor.add(make_put_event(&format!("key.{}", i), &format!("data{}", i)));
        }

        assert_eq!(processor.len(), 50);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 50);
        assert_eq!(result.submitted, 50);
    }

    #[tokio::test]
    async fn test_interleaved_operations() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("a", "1"));
        processor.add(make_delete_event("b"));
        processor.add(make_put_event("c", "3"));
        processor.add(make_delete_event("d"));
        processor.add(make_put_event("e", "5"));

        assert_eq!(processor.len(), 5);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 5);
        assert_eq!(result.submitted, 3);
        assert_eq!(result.deleted, 2);
    }
}