replication_engine/
batch.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Batch processor for CDC events.
//!
//! Collects events with key deduplication (latest wins) and flushes to sync-engine
//! in batches for efficiency.
//!
//! # Design
//!
//! ```text
//! CDC Events ──┬──▶ BatchProcessor ──┬──▶ Debounce (time/count)
//!              │                     │
//!              │ HashMap<key, state> │
//!              │ (latest wins)       │
//!              │                     ▼
//!              └─────────────────────┼──▶ Parallel is_current() dedup
//!                                    │
//!                                    ▼
//!                      submit() / delete() per item
//! ```
//!
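//! # Example
//!
//! A minimal usage sketch (not compiled here); `MySyncEngine` stands in for any
//! `SyncEngineRef` implementation and `event` for a `CdcEvent` read from the
//! replication stream:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! let engine = Arc::new(MySyncEngine::new());
//! let mut processor = BatchProcessor::new(engine, "peer-1".to_string(), BatchConfig::default());
//!
//! // Accumulate events; the latest operation for each key wins.
//! processor.add(event);
//!
//! // Flush once the size or time threshold is reached.
//! if processor.should_flush() {
//!     let result = processor.flush().await?;
//!     assert!(result.is_success());
//! }
//! ```
//!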
use crate::error::Result;
use crate::stream::{CdcEvent, CdcOp};
use crate::sync_engine::{SyncEngineRef, SyncItem};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Semaphore};
use tokio::task::JoinSet;
use tracing::{debug, info, instrument, warn};

/// Configuration for batch processing.
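///
/// A sketch of overriding one field while keeping the other defaults (the value
/// shown is illustrative, not a recommendation):
///
/// ```ignore
/// let config = BatchConfig {
///     max_batch_size: 500,
///     ..BatchConfig::default()
/// };
/// ```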
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum events before forcing a flush.
    pub max_batch_size: usize,
    /// Maximum time to wait before flushing.
    pub max_batch_delay: Duration,
    /// Maximum concurrent is_current() checks.
    pub max_concurrent_checks: usize,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 100,
            max_batch_delay: Duration::from_millis(50),
            max_concurrent_checks: 32,
        }
    }
}

impl BatchConfig {
    /// Fast flush for testing.
    pub fn testing() -> Self {
        Self {
            max_batch_size: 10,
            max_batch_delay: Duration::from_millis(5),
            max_concurrent_checks: 4,
        }
    }
}

/// Pending state for a key (latest wins).
#[derive(Debug, Clone)]
enum PendingOp {
    Put {
        data: Vec<u8>,
        hash: String,
        version: u64,
    },
    Delete,
}

/// Result of processing a batch.
#[derive(Debug, Default)]
pub struct BatchResult {
    /// Number of events in the batch (after dedup).
    pub total: usize,
    /// Number of items skipped (already current).
    pub skipped: usize,
    /// Number of items submitted.
    pub submitted: usize,
    /// Number of items deleted.
    pub deleted: usize,
    /// Number of errors.
    pub errors: usize,
}

impl BatchResult {
    /// Check if all operations succeeded.
    pub fn is_success(&self) -> bool {
        self.errors == 0
    }
}

/// Accumulates CDC events and flushes to sync-engine in batches.
pub struct BatchProcessor<S: SyncEngineRef> {
    /// Pending operations keyed by object ID (latest wins).
    pending: HashMap<String, PendingOp>,
    /// When the current batch started accumulating.
    batch_start: Option<Instant>,
    /// Configuration.
    config: BatchConfig,
    /// Sync engine to submit to.
    sync_engine: Arc<S>,
    /// Peer ID (for logging).
    peer_id: String,
}

impl<S: SyncEngineRef> BatchProcessor<S> {
    /// Create a new batch processor.
    pub fn new(sync_engine: Arc<S>, peer_id: String, config: BatchConfig) -> Self {
        Self {
            pending: HashMap::new(),
            batch_start: None,
            config,
            sync_engine,
            peer_id,
        }
    }

    /// Add a CDC event to the batch.
    ///
    /// If the same key has a pending operation, the new one replaces it (latest wins).
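    ///
    /// Latest-wins sketch (`make_put_event` is a hypothetical helper like the one
    /// in this module's tests):
    ///
    /// ```ignore
    /// processor.add(make_put_event("user.1", "v1"));
    /// processor.add(make_put_event("user.1", "v2")); // replaces the pending "v1"
    /// assert_eq!(processor.len(), 1);
    /// ```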
    pub fn add(&mut self, event: CdcEvent) {
        // Start batch timer on first event
        if self.batch_start.is_none() {
            self.batch_start = Some(Instant::now());
        }

        // Latest operation for this key wins
        let op = match event.op {
            CdcOp::Put => {
                // PUT requires data and hash
                if let (Some(data), Some(hash)) = (event.data, event.hash) {
                    let version = event.meta.map(|m| m.version).unwrap_or(0);
                    PendingOp::Put { data, hash, version }
                } else {
                    warn!(
                        peer_id = %self.peer_id,
                        key = %event.key,
                        "PUT event missing data or hash, skipping"
                    );
                    return;
                }
            }
            CdcOp::Delete => PendingOp::Delete,
        };

        self.pending.insert(event.key, op);
    }

    /// Check if the batch should be flushed.
    pub fn should_flush(&self) -> bool {
        // Size threshold
        if self.pending.len() >= self.config.max_batch_size {
            return true;
        }

        // Time threshold
        if let Some(start) = self.batch_start {
            if start.elapsed() >= self.config.max_batch_delay {
                return true;
            }
        }

        false
    }

    /// Number of pending events.
    pub fn len(&self) -> usize {
        self.pending.len()
    }

    /// Check if batch is empty.
    pub fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }

    /// Flush the batch to sync-engine.
    ///
    /// 1. Partition by operation type
    /// 2. Parallel is_current() checks for PUTs
    /// 3. Submit filtered PUTs and DELETEs
    #[instrument(skip(self), fields(peer_id = %self.peer_id))]
    pub async fn flush(&mut self) -> Result<BatchResult> {
        if self.pending.is_empty() {
            return Ok(BatchResult::default());
        }

        let batch = std::mem::take(&mut self.pending);
        self.batch_start = None;

        let total = batch.len();
        debug!(
            peer_id = %self.peer_id,
            batch_size = total,
            "Flushing batch"
        );

        // Partition by operation type
        let mut puts: Vec<(String, Vec<u8>, String, u64)> = Vec::new();
        let mut deletes: Vec<String> = Vec::new();

        for (key, op) in batch {
            match op {
                PendingOp::Put { data, hash, version } => {
                    puts.push((key, data, hash, version));
                }
                PendingOp::Delete => {
                    deletes.push(key);
                }
            }
        }

        let mut result = BatchResult {
            total,
            ..Default::default()
        };

        // Phase 1: Parallel is_current() checks for dedup
        let filtered_puts = self.dedup_puts(puts).await?;
        result.skipped = total - filtered_puts.len() - deletes.len();

        // Phase 2: Submit filtered puts
        for (key, data, hash, version) in filtered_puts {
            let mut item = SyncItem::new(key.clone(), data);
            item.version = version;
            // SyncItem::new() already computes a hash, but we keep the peer's hash
            // so that later is_current() dedup checks compare against the same value
            item.content_hash = hash;

            match self.sync_engine.submit(item).await {
                Ok(()) => {
                    result.submitted += 1;
                }
                Err(e) => {
                    warn!(
                        peer_id = %self.peer_id,
                        key = %key,
                        error = %e,
                        "Failed to submit item"
                    );
                    result.errors += 1;
                }
            }
        }

        // Phase 3: Process deletes
        for key in deletes {
            match self.sync_engine.delete(key.clone()).await {
                Ok(_deleted) => {
                    result.deleted += 1;
                }
                Err(e) => {
                    warn!(
                        peer_id = %self.peer_id,
                        key = %key,
                        error = %e,
                        "Failed to delete item"
                    );
                    result.errors += 1;
                }
            }
        }

        info!(
            peer_id = %self.peer_id,
            total = result.total,
            skipped = result.skipped,
            submitted = result.submitted,
            deleted = result.deleted,
            errors = result.errors,
            "Batch flush complete"
        );

        Ok(result)
    }

    /// Filter out PUTs that are already current (dedup).
    ///
    /// Concurrency is capped at `max_concurrent_checks`.
    async fn dedup_puts(
        &self,
        puts: Vec<(String, Vec<u8>, String, u64)>,
    ) -> Result<Vec<(String, Vec<u8>, String, u64)>> {
        if puts.is_empty() {
            return Ok(Vec::new());
        }

        let mut to_submit = Vec::with_capacity(puts.len());
        let mut join_set: JoinSet<(String, Vec<u8>, String, u64, bool)> = JoinSet::new();
        // Bound the number of in-flight is_current() checks to max_concurrent_checks.
        let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_checks));

        // Spawn parallel is_current() checks
        for (key, data, hash, version) in puts {
            let sync_engine = Arc::clone(&self.sync_engine);
            let semaphore = Arc::clone(&semaphore);
            let key_clone = key.clone();
            let hash_clone = hash.clone();

            join_set.spawn(async move {
                // Hold a permit for the duration of the check; the semaphore is
                // never closed, so acquire_owned() only fails if that invariant breaks.
                let _permit = semaphore
                    .acquire_owned()
                    .await
                    .expect("dedup semaphore closed");
                let is_current = sync_engine
                    .is_current(&key_clone, &hash_clone)
                    .await
                    .unwrap_or(false);
                (key, data, hash, version, is_current)
            });
        }

        // Collect results
        while let Some(result) = join_set.join_next().await {
            match result {
                Ok((key, data, hash, version, is_current)) => {
                    if !is_current {
                        to_submit.push((key, data, hash, version));
                    } else {
                        debug!(key = %key, "Skipping (already current)");
                    }
                }
                Err(e) => {
                    warn!(error = %e, "is_current check failed (JoinError)");
                }
            }
        }

        Ok(to_submit)
    }
}

/// Thread-safe wrapper around BatchProcessor.
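///
/// A sketch of sharing the processor across tasks; `MySyncEngine` stands in for a
/// `SyncEngineRef` implementation and `event` for a `CdcEvent` taken from the
/// stream (names are illustrative):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let shared = Arc::new(SharedBatchProcessor::new(
///     Arc::new(MySyncEngine::new()),
///     "peer-1".to_string(),
///     BatchConfig::default(),
/// ));
///
/// let worker = Arc::clone(&shared);
/// tokio::spawn(async move {
///     // add() flushes automatically once a size or time threshold is hit.
///     let _ = worker.add(event).await;
/// });
///
/// // Elsewhere, force out anything still pending (e.g. on shutdown).
/// let _ = shared.flush().await;
/// ```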
pub struct SharedBatchProcessor<S: SyncEngineRef> {
    inner: Mutex<BatchProcessor<S>>,
}

impl<S: SyncEngineRef> SharedBatchProcessor<S> {
    /// Create a new shared batch processor.
    pub fn new(sync_engine: Arc<S>, peer_id: String, config: BatchConfig) -> Self {
        Self {
            inner: Mutex::new(BatchProcessor::new(sync_engine, peer_id, config)),
        }
    }

    /// Add an event and potentially flush.
    pub async fn add(&self, event: CdcEvent) -> Result<Option<BatchResult>> {
        let mut processor = self.inner.lock().await;
        processor.add(event);

        if processor.should_flush() {
            Ok(Some(processor.flush().await?))
        } else {
            Ok(None)
        }
    }

    /// Force a flush regardless of thresholds.
    pub async fn flush(&self) -> Result<BatchResult> {
        let mut processor = self.inner.lock().await;
        processor.flush().await
    }

    /// Check if there are pending events.
    pub async fn is_empty(&self) -> bool {
        self.inner.lock().await.is_empty()
    }

    /// Get pending event count.
    pub async fn len(&self) -> usize {
        self.inner.lock().await.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync_engine::SyncResult;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Test sync engine that tracks calls.
    struct TrackingSyncEngine {
        submit_count: AtomicUsize,
        delete_count: AtomicUsize,
        is_current_count: AtomicUsize,
        always_current: AtomicBool,
    }

    impl TrackingSyncEngine {
        fn new() -> Self {
            Self {
                submit_count: AtomicUsize::new(0),
                delete_count: AtomicUsize::new(0),
                is_current_count: AtomicUsize::new(0),
                always_current: AtomicBool::new(false),
            }
        }

        fn set_always_current(&self, value: bool) {
            self.always_current.store(value, Ordering::SeqCst);
        }
    }

    impl SyncEngineRef for TrackingSyncEngine {
        fn submit(
            &self,
            _item: SyncItem,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<()>> + Send + '_>> {
            self.submit_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(()) })
        }

        fn delete(
            &self,
            _key: String,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<bool>> + Send + '_>> {
            self.delete_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(true) })
        }

        fn is_current(
            &self,
            _key: &str,
            _content_hash: &str,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<bool>> + Send + '_>> {
            self.is_current_count.fetch_add(1, Ordering::SeqCst);
            let result = self.always_current.load(Ordering::SeqCst);
            Box::pin(async move { Ok(result) })
        }

        fn get_merkle_root(&self) -> crate::sync_engine::BoxFuture<'_, Option<[u8; 32]>> {
            Box::pin(async { Ok(None) })
        }

        fn get_merkle_children(
            &self,
            _path: &str,
        ) -> crate::sync_engine::BoxFuture<'_, Vec<(String, [u8; 32])>> {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn get(&self, _key: &str) -> crate::sync_engine::BoxFuture<'_, Option<Vec<u8>>> {
            Box::pin(async { Ok(None) })
        }
    }

    fn make_put_event(key: &str, data: &str) -> CdcEvent {
        CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: key.to_string(),
            hash: Some(format!("hash_{}", key)),
            data: Some(data.as_bytes().to_vec()),
            meta: None,
        }
    }

    fn make_delete_event(key: &str) -> CdcEvent {
        CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Delete,
            key: key.to_string(),
            hash: None,
            data: None,
            meta: None,
        }
    }

    #[tokio::test]
    async fn test_batch_key_deduplication() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // Add multiple events for same key - only latest should be processed
        processor.add(make_put_event("user.1", "v1"));
        processor.add(make_put_event("user.1", "v2"));
        processor.add(make_put_event("user.1", "v3")); // This one wins

        assert_eq!(processor.len(), 1);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 1);
        assert_eq!(engine.submit_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_batch_dedup_skips_current() {
        let engine = Arc::new(TrackingSyncEngine::new());
        engine.set_always_current(true); // Everything is already current

        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "data1"));
        processor.add(make_put_event("user.2", "data2"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 2);
        assert_eq!(result.skipped, 2);
        assert_eq!(result.submitted, 0);

        // is_current should be called for each
        assert_eq!(engine.is_current_count.load(Ordering::SeqCst), 2);
        // submit should not be called
        assert_eq!(engine.submit_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_batch_mixed_operations() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        processor.add(make_put_event("user.1", "data1"));
        processor.add(make_delete_event("user.2"));
        processor.add(make_put_event("user.3", "data3"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 3);
        assert_eq!(result.submitted, 2); // 2 puts
        assert_eq!(result.deleted, 1);   // 1 delete
    }

    #[tokio::test]
    async fn test_put_then_delete_same_key() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // PUT then DELETE - DELETE wins (latest)
        processor.add(make_put_event("user.1", "data"));
        processor.add(make_delete_event("user.1"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 0);
        assert_eq!(result.deleted, 1);
    }

    #[tokio::test]
    async fn test_delete_then_put_same_key() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // DELETE then PUT - PUT wins (latest)
        processor.add(make_delete_event("user.1"));
        processor.add(make_put_event("user.1", "data"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 1);
        assert_eq!(result.deleted, 0);
    }

    #[tokio::test]
    async fn test_should_flush_by_size() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig {
                max_batch_size: 3,
                max_batch_delay: Duration::from_secs(60),
                max_concurrent_checks: 4,
            },
        );

        processor.add(make_put_event("a", "1"));
        assert!(!processor.should_flush());

        processor.add(make_put_event("b", "2"));
        assert!(!processor.should_flush());

        processor.add(make_put_event("c", "3"));
        assert!(processor.should_flush()); // Hit max_batch_size
    }

    #[tokio::test]
    async fn test_empty_flush() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 0);
        assert!(result.is_success());
    }

    #[tokio::test]
    async fn test_batch_is_empty() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        assert!(processor.is_empty());
        assert_eq!(processor.len(), 0);

        processor.add(make_put_event("key", "data"));
        assert!(!processor.is_empty());
        assert_eq!(processor.len(), 1);

        processor.flush().await.unwrap();
        assert!(processor.is_empty());
        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_should_flush_by_time() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig {
                max_batch_size: 1000, // High, won't trigger
                max_batch_delay: Duration::from_millis(10),
                max_concurrent_checks: 4,
            },
        );

        processor.add(make_put_event("a", "1"));
        assert!(!processor.should_flush());

        // Wait for delay
        tokio::time::sleep(Duration::from_millis(15)).await;
        assert!(processor.should_flush());
    }

    #[tokio::test]
    async fn test_batch_result_is_success() {
        let mut result = BatchResult::default();
        assert!(result.is_success());

        result.errors = 1;
        assert!(!result.is_success());

        result.errors = 0;
        result.submitted = 10;
        result.skipped = 5;
        assert!(result.is_success());
    }

    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_batch_size, 100);
        assert_eq!(config.max_batch_delay, Duration::from_millis(50));
        assert_eq!(config.max_concurrent_checks, 32);
    }

    #[test]
    fn test_batch_config_testing() {
        let config = BatchConfig::testing();
        assert_eq!(config.max_batch_size, 10);
        assert_eq!(config.max_batch_delay, Duration::from_millis(5));
        assert_eq!(config.max_concurrent_checks, 4);
    }

    #[tokio::test]
    async fn test_put_event_without_data() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // PUT without data should be skipped
        let event = CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: "key1".to_string(),
            hash: Some("hash".to_string()),
            data: None, // Missing data
            meta: None,
        };
        processor.add(event);

        assert_eq!(processor.len(), 0); // Should not be added
    }

    #[tokio::test]
    async fn test_put_event_without_hash() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // PUT without hash should be skipped
        let event = CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: "key1".to_string(),
            hash: None, // Missing hash
            data: Some(vec![1, 2, 3]),
            meta: None,
        };
        processor.add(event);

        assert_eq!(processor.len(), 0); // Should not be added
    }

    #[tokio::test]
    async fn test_multiple_puts_same_key_different_data() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // First put
        processor.add(make_put_event("user.1", "first"));
        // Second put with different data - should replace
        processor.add(make_put_event("user.1", "second"));
        // Third put - final value
        processor.add(make_put_event("user.1", "third"));

        assert_eq!(processor.len(), 1);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.submitted, 1);
    }

    #[tokio::test]
    async fn test_many_different_keys() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // Many different keys
        for i in 0..50 {
            processor.add(make_put_event(&format!("key.{}", i), &format!("data{}", i)));
        }

        assert_eq!(processor.len(), 50);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 50);
        assert_eq!(result.submitted, 50);
    }

    #[tokio::test]
    async fn test_interleaved_operations() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = BatchProcessor::new(
            Arc::clone(&engine),
            "test-peer".to_string(),
            BatchConfig::testing(),
        );

        // Interleaved PUTs and DELETEs on different keys
        processor.add(make_put_event("a", "1"));
        processor.add(make_delete_event("b"));
        processor.add(make_put_event("c", "3"));
        processor.add(make_delete_event("d"));
        processor.add(make_put_event("e", "5"));

        assert_eq!(processor.len(), 5);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 5);
        assert_eq!(result.submitted, 3);
        assert_eq!(result.deleted, 2);
    }
}