replication_engine/
batch.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Batch processor for CDC events.
5//!
6//! Collects events with key deduplication (latest wins) and flushes to sync-engine
7//! in batches for efficiency.
8//!
9//! # Design
10//!
11//! ```text
12//! CDC Events ──┬──▶ BatchProcessor ──┬──▶ Debounce (time/count)
13//!              │                     │
14//!              │ HashMap<key, state> │
15//!              │ (latest wins)       │
16//!              │                     ▼
17//!              └─────────────────────┼──▶ Parallel is_current() dedup
18//!                                    │
19//!                                    ▼
20//!                      submit_many() / delete_many()
21//! ```
22//!
23use crate::error::Result;
24use crate::stream::{CdcEvent, CdcOp};
25use crate::sync_engine::{SyncEngineRef, SyncItem};
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29use tokio::sync::Mutex;
30use tokio::task::JoinSet;
31use tracing::{debug, info, instrument, warn};
32
/// Tunable thresholds that decide when a batch is flushed.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Pending-event count at which a flush is forced.
    pub max_batch_size: usize,
    /// Longest time a batch may accumulate before it is flushed.
    pub max_batch_delay: Duration,
    /// Intended upper bound on simultaneous is_current() checks.
    pub max_concurrent_checks: usize,
}

impl Default for BatchConfig {
    /// Default thresholds: 100 events, 50 ms, 32 concurrent checks.
    fn default() -> Self {
        let max_batch_delay = Duration::from_millis(50);
        BatchConfig {
            max_batch_size: 100,
            max_batch_delay,
            max_concurrent_checks: 32,
        }
    }
}

impl BatchConfig {
    /// Small, fast-flushing thresholds meant for tests:
    /// 10 events, 5 ms, 4 concurrent checks.
    pub fn testing() -> Self {
        let max_batch_delay = Duration::from_millis(5);
        BatchConfig {
            max_batch_size: 10,
            max_batch_delay,
            max_concurrent_checks: 4,
        }
    }
}
64
/// The operation currently queued for a single key.
///
/// Only one `PendingOp` is kept per key; a later event for the same key
/// replaces the earlier one (latest wins).
#[derive(Debug, Clone)]
enum PendingOp {
    /// Write the given payload for the key.
    Put {
        /// Raw payload bytes taken from the CDC event.
        data: Vec<u8>,
        /// Content hash carried by the CDC event.
        hash: String,
        /// Version taken from the event metadata (0 when metadata is absent).
        version: u64,
    },
    /// Remove the key.
    Delete,
}
75
/// Counters describing the outcome of one batch flush.
#[derive(Debug, Default)]
pub struct BatchResult {
    /// How many distinct keys the batch held (i.e. after per-key dedup).
    pub total: usize,
    /// How many items were not submitted because they were already current.
    pub skipped: usize,
    /// How many items were submitted successfully.
    pub submitted: usize,
    /// How many delete operations completed successfully.
    pub deleted: usize,
    /// How many operations failed.
    pub errors: usize,
}

impl BatchResult {
    /// Returns `true` when the flush recorded no errors.
    pub fn is_success(&self) -> bool {
        matches!(self.errors, 0)
    }
}
97
98/// Accumulates CDC events and flushes to sync-engine in batches.
99pub struct BatchProcessor<S: SyncEngineRef> {
100    /// Pending operations keyed by object ID (latest wins).
101    pending: HashMap<String, PendingOp>,
102    /// When the current batch started accumulating.
103    batch_start: Option<Instant>,
104    /// Configuration.
105    config: BatchConfig,
106    /// Sync engine to submit to.
107    sync_engine: Arc<S>,
108    /// Peer ID (for logging).
109    peer_id: String,
110}
111
112impl<S: SyncEngineRef> BatchProcessor<S> {
113    /// Create a new batch processor.
114    pub fn new(sync_engine: Arc<S>, peer_id: String, config: BatchConfig) -> Self {
115        Self {
116            pending: HashMap::new(),
117            batch_start: None,
118            config,
119            sync_engine,
120            peer_id,
121        }
122    }
123
124    /// Add a CDC event to the batch.
125    ///
126    /// If the same key has a pending operation, the new one replaces it (latest wins).
127    pub fn add(&mut self, event: CdcEvent) {
128        // Start batch timer on first event
129        if self.batch_start.is_none() {
130            self.batch_start = Some(Instant::now());
131        }
132
133        // Latest operation for this key wins
134        let op = match event.op {
135            CdcOp::Put => {
136                // PUT requires data and hash
137                if let (Some(data), Some(hash)) = (event.data, event.hash) {
138                    let version = event.meta.map(|m| m.version).unwrap_or(0);
139                    PendingOp::Put { data, hash, version }
140                } else {
141                    warn!(
142                        peer_id = %self.peer_id,
143                        key = %event.key,
144                        "PUT event missing data or hash, skipping"
145                    );
146                    return;
147                }
148            }
149            CdcOp::Delete => PendingOp::Delete,
150        };
151
152        self.pending.insert(event.key, op);
153    }
154
155    /// Check if the batch should be flushed.
156    pub fn should_flush(&self) -> bool {
157        // Size threshold
158        if self.pending.len() >= self.config.max_batch_size {
159            return true;
160        }
161
162        // Time threshold
163        if let Some(start) = self.batch_start {
164            if start.elapsed() >= self.config.max_batch_delay {
165                return true;
166            }
167        }
168
169        false
170    }
171
172    /// Number of pending events.
173    pub fn len(&self) -> usize {
174        self.pending.len()
175    }
176
177    /// Check if batch is empty.
178    pub fn is_empty(&self) -> bool {
179        self.pending.is_empty()
180    }
181
182    /// Flush the batch to sync-engine.
183    ///
184    /// 1. Partition by operation type
185    /// 2. Parallel is_current() checks for PUTs
186    /// 3. Submit filtered PUTs and DELETEs
187    #[instrument(skip(self), fields(peer_id = %self.peer_id))]
188    pub async fn flush(&mut self) -> Result<BatchResult> {
189        if self.pending.is_empty() {
190            return Ok(BatchResult::default());
191        }
192
193        let batch = std::mem::take(&mut self.pending);
194        self.batch_start = None;
195
196        let total = batch.len();
197        debug!(
198            peer_id = %self.peer_id,
199            batch_size = total,
200            "Flushing batch"
201        );
202
203        // Partition by operation type
204        let mut puts: Vec<(String, Vec<u8>, String, u64)> = Vec::new();
205        let mut deletes: Vec<String> = Vec::new();
206
207        for (key, op) in batch {
208            match op {
209                PendingOp::Put { data, hash, version } => {
210                    puts.push((key, data, hash, version));
211                }
212                PendingOp::Delete => {
213                    deletes.push(key);
214                }
215            }
216        }
217
218        let mut result = BatchResult {
219            total,
220            ..Default::default()
221        };
222
223        // Phase 1: Parallel is_current() checks for dedup
224        let filtered_puts = self.dedup_puts(puts).await?;
225        result.skipped = total - filtered_puts.len() - deletes.len();
226
227        // Phase 2: Submit filtered puts
228        for (key, data, hash, version) in filtered_puts {
229            let mut item = SyncItem::new(key.clone(), data);
230            item.version = version;
231            // Hash is already computed by SyncItem::new(), but we use the peer's hash
232            // to ensure we detect duplicates correctly during dedup phase
233            item.content_hash = hash;
234            
235            match self.sync_engine.submit(item).await {
236                Ok(()) => {
237                    result.submitted += 1;
238                }
239                Err(e) => {
240                    warn!(
241                        peer_id = %self.peer_id,
242                        key = %key,
243                        error = %e,
244                        "Failed to submit item"
245                    );
246                    result.errors += 1;
247                }
248            }
249        }
250
251        // Phase 3: Process deletes (use delete_replicated to avoid CDC loops)
252        for key in deletes {
253            match self.sync_engine.delete_replicated(key.clone()).await {
254                Ok(_deleted) => {
255                    result.deleted += 1;
256                }
257                Err(e) => {
258                    warn!(
259                        peer_id = %self.peer_id,
260                        key = %key,
261                        error = %e,
262                        "Failed to delete replicated item"
263                    );
264                    result.errors += 1;
265                }
266            }
267        }
268
269        info!(
270            peer_id = %self.peer_id,
271            total = result.total,
272            skipped = result.skipped,
273            submitted = result.submitted,
274            deleted = result.deleted,
275            errors = result.errors,
276            "Batch flush complete"
277        );
278
279        Ok(result)
280    }
281
282    /// Filter out PUTs that are already current (dedup).
283    async fn dedup_puts(
284        &self,
285        puts: Vec<(String, Vec<u8>, String, u64)>,
286    ) -> Result<Vec<(String, Vec<u8>, String, u64)>> {
287        if puts.is_empty() {
288            return Ok(Vec::new());
289        }
290
291        let mut to_submit = Vec::with_capacity(puts.len());
292        let mut join_set: JoinSet<(String, Vec<u8>, String, u64, bool)> = JoinSet::new();
293
294        // Spawn parallel is_current() checks
295        for (key, data, hash, version) in puts {
296            let sync_engine = Arc::clone(&self.sync_engine);
297            let key_clone = key.clone();
298            let hash_clone = hash.clone();
299
300            join_set.spawn(async move {
301                let is_current = sync_engine
302                    .is_current(&key_clone, &hash_clone)
303                    .await
304                    .unwrap_or(false);
305                (key, data, hash, version, is_current)
306            });
307        }
308
309        // Collect results
310        while let Some(result) = join_set.join_next().await {
311            match result {
312                Ok((key, data, hash, version, is_current)) => {
313                    if !is_current {
314                        to_submit.push((key, data, hash, version));
315                    } else {
316                        debug!(key = %key, "Skipping (already current)");
317                    }
318                }
319                Err(e) => {
320                    warn!(error = %e, "is_current check failed (JoinError)");
321                }
322            }
323        }
324
325        Ok(to_submit)
326    }
327}
328
329/// Thread-safe wrapper around BatchProcessor.
330pub struct SharedBatchProcessor<S: SyncEngineRef> {
331    inner: Mutex<BatchProcessor<S>>,
332}
333
334impl<S: SyncEngineRef> SharedBatchProcessor<S> {
335    /// Create a new shared batch processor.
336    pub fn new(sync_engine: Arc<S>, peer_id: String, config: BatchConfig) -> Self {
337        Self {
338            inner: Mutex::new(BatchProcessor::new(sync_engine, peer_id, config)),
339        }
340    }
341
342    /// Add an event and potentially flush.
343    pub async fn add(&self, event: CdcEvent) -> Result<Option<BatchResult>> {
344        let mut processor = self.inner.lock().await;
345        processor.add(event);
346
347        if processor.should_flush() {
348            Ok(Some(processor.flush().await?))
349        } else {
350            Ok(None)
351        }
352    }
353
354    /// Force a flush regardless of thresholds.
355    pub async fn flush(&self) -> Result<BatchResult> {
356        let mut processor = self.inner.lock().await;
357        processor.flush().await
358    }
359
360    /// Check if there are pending events.
361    pub async fn is_empty(&self) -> bool {
362        self.inner.lock().await.is_empty()
363    }
364
365    /// Get pending event count.
366    pub async fn len(&self) -> usize {
367        self.inner.lock().await.len()
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync_engine::SyncResult;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Boxed future shape returned by the mocked engine methods.
    type SyncFuture<'a, T> =
        std::pin::Pin<Box<dyn std::future::Future<Output = SyncResult<T>> + Send + 'a>>;

    /// Mock sync engine that counts how often each operation is invoked.
    #[derive(Default)]
    struct TrackingSyncEngine {
        submit_count: AtomicUsize,
        delete_count: AtomicUsize,
        is_current_count: AtomicUsize,
        always_current: AtomicBool,
    }

    impl TrackingSyncEngine {
        /// All counters start at zero and nothing is reported as current.
        fn new() -> Self {
            Self::default()
        }

        /// Choose the answer every subsequent is_current() call will give.
        fn set_always_current(&self, value: bool) {
            self.always_current.store(value, Ordering::SeqCst);
        }
    }

    impl SyncEngineRef for TrackingSyncEngine {
        fn submit(&self, _item: SyncItem) -> SyncFuture<'_, ()> {
            self.submit_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(()) })
        }

        fn delete(&self, _key: String) -> SyncFuture<'_, bool> {
            self.delete_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(true) })
        }

        fn delete_replicated(&self, _key: String) -> SyncFuture<'_, bool> {
            self.delete_count.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(true) })
        }

        fn is_current(&self, _key: &str, _content_hash: &str) -> SyncFuture<'_, bool> {
            self.is_current_count.fetch_add(1, Ordering::SeqCst);
            let answer = self.always_current.load(Ordering::SeqCst);
            Box::pin(async move { Ok(answer) })
        }

        fn get_merkle_root(&self) -> crate::sync_engine::BoxFuture<'_, Option<[u8; 32]>> {
            Box::pin(async { Ok(None) })
        }

        fn get_merkle_children(
            &self,
            _path: &str,
        ) -> crate::sync_engine::BoxFuture<'_, Vec<(String, [u8; 32])>> {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn get(&self, _key: &str) -> crate::sync_engine::BoxFuture<'_, Option<Vec<u8>>> {
            Box::pin(async { Ok(None) })
        }
    }

    /// Processor for peer "test-peer" using the given config.
    fn processor_with(
        engine: &Arc<TrackingSyncEngine>,
        config: BatchConfig,
    ) -> BatchProcessor<TrackingSyncEngine> {
        BatchProcessor::new(Arc::clone(engine), "test-peer".to_string(), config)
    }

    /// Processor for peer "test-peer" using `BatchConfig::testing()`.
    fn test_processor(engine: &Arc<TrackingSyncEngine>) -> BatchProcessor<TrackingSyncEngine> {
        processor_with(engine, BatchConfig::testing())
    }

    /// PUT event whose hash is derived from the key.
    fn make_put_event(key: &str, data: &str) -> CdcEvent {
        CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: key.to_string(),
            hash: Some(format!("hash_{}", key)),
            data: Some(data.as_bytes().to_vec()),
            meta: None,
        }
    }

    /// DELETE event carrying no payload.
    fn make_delete_event(key: &str) -> CdcEvent {
        CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Delete,
            key: key.to_string(),
            hash: None,
            data: None,
            meta: None,
        }
    }

    #[tokio::test]
    async fn test_batch_key_deduplication() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        // Three writes to one key collapse into a single pending op; "v3" wins.
        for payload in ["v1", "v2", "v3"] {
            processor.add(make_put_event("user.1", payload));
        }
        assert_eq!(processor.len(), 1);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 1);
        assert_eq!(engine.submit_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_batch_dedup_skips_current() {
        let engine = Arc::new(TrackingSyncEngine::new());
        // The engine reports every item as already current.
        engine.set_always_current(true);
        let mut processor = test_processor(&engine);

        processor.add(make_put_event("user.1", "data1"));
        processor.add(make_put_event("user.2", "data2"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 2);
        assert_eq!(result.skipped, 2);
        assert_eq!(result.submitted, 0);

        // One is_current() call per PUT, and no submit at all.
        assert_eq!(engine.is_current_count.load(Ordering::SeqCst), 2);
        assert_eq!(engine.submit_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_batch_mixed_operations() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        processor.add(make_put_event("user.1", "data1"));
        processor.add(make_delete_event("user.2"));
        processor.add(make_put_event("user.3", "data3"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 3);
        assert_eq!(result.submitted, 2); // both PUTs
        assert_eq!(result.deleted, 1); // the single DELETE
    }

    #[tokio::test]
    async fn test_put_then_delete_same_key() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        // The later DELETE replaces the earlier PUT.
        processor.add(make_put_event("user.1", "data"));
        processor.add(make_delete_event("user.1"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 0);
        assert_eq!(result.deleted, 1);
    }

    #[tokio::test]
    async fn test_delete_then_put_same_key() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        // The later PUT replaces the earlier DELETE.
        processor.add(make_delete_event("user.1"));
        processor.add(make_put_event("user.1", "data"));

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.submitted, 1);
        assert_eq!(result.deleted, 0);
    }

    #[tokio::test]
    async fn test_should_flush_by_size() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let config = BatchConfig {
            max_batch_size: 3,
            max_batch_delay: Duration::from_secs(60),
            max_concurrent_checks: 4,
        };
        let mut processor = processor_with(&engine, config);

        processor.add(make_put_event("a", "1"));
        assert!(!processor.should_flush());

        processor.add(make_put_event("b", "2"));
        assert!(!processor.should_flush());

        // Third distinct key reaches max_batch_size.
        processor.add(make_put_event("c", "3"));
        assert!(processor.should_flush());
    }

    #[tokio::test]
    async fn test_empty_flush() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 0);
        assert!(result.is_success());
    }

    #[tokio::test]
    async fn test_batch_is_empty() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        assert!(processor.is_empty());
        assert_eq!(processor.len(), 0);

        processor.add(make_put_event("key", "data"));
        assert!(!processor.is_empty());
        assert_eq!(processor.len(), 1);

        processor.flush().await.unwrap();
        assert!(processor.is_empty());
        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_should_flush_by_time() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let config = BatchConfig {
            max_batch_size: 1000, // far above what this test adds
            max_batch_delay: Duration::from_millis(10),
            max_concurrent_checks: 4,
        };
        let mut processor = processor_with(&engine, config);

        processor.add(make_put_event("a", "1"));
        assert!(!processor.should_flush());

        // Let the delay threshold pass.
        tokio::time::sleep(Duration::from_millis(15)).await;
        assert!(processor.should_flush());
    }

    #[tokio::test]
    async fn test_batch_result_is_success() {
        let mut result = BatchResult::default();
        assert!(result.is_success());

        result.errors = 1;
        assert!(!result.is_success());

        result.errors = 0;
        result.submitted = 10;
        result.skipped = 5;
        assert!(result.is_success());
    }

    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_batch_size, 100);
        assert_eq!(config.max_batch_delay, Duration::from_millis(50));
        assert_eq!(config.max_concurrent_checks, 32);
    }

    #[test]
    fn test_batch_config_testing() {
        let config = BatchConfig::testing();
        assert_eq!(config.max_batch_size, 10);
        assert_eq!(config.max_batch_delay, Duration::from_millis(5));
        assert_eq!(config.max_concurrent_checks, 4);
    }

    #[tokio::test]
    async fn test_put_event_without_data() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        // A PUT with no payload must be rejected.
        processor.add(CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: "key1".to_string(),
            hash: Some("hash".to_string()),
            data: None,
            meta: None,
        });

        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_put_event_without_hash() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        // A PUT with no hash must be rejected.
        processor.add(CdcEvent {
            stream_id: "0-0".to_string(),
            op: CdcOp::Put,
            key: "key1".to_string(),
            hash: None,
            data: Some(vec![1, 2, 3]),
            meta: None,
        });

        assert_eq!(processor.len(), 0);
    }

    #[tokio::test]
    async fn test_multiple_puts_same_key_different_data() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        // Each PUT replaces the previous one; "third" is the final value.
        for payload in ["first", "second", "third"] {
            processor.add(make_put_event("user.1", payload));
        }
        assert_eq!(processor.len(), 1);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.submitted, 1);
    }

    #[tokio::test]
    async fn test_many_different_keys() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        // Fifty distinct keys stay fifty distinct pending ops.
        for i in 0..50 {
            let key = format!("key.{}", i);
            let data = format!("data{}", i);
            processor.add(make_put_event(&key, &data));
        }
        assert_eq!(processor.len(), 50);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 50);
        assert_eq!(result.submitted, 50);
    }

    #[tokio::test]
    async fn test_interleaved_operations() {
        let engine = Arc::new(TrackingSyncEngine::new());
        let mut processor = test_processor(&engine);

        // Alternating PUTs and DELETEs, each on its own key.
        let events = vec![
            make_put_event("a", "1"),
            make_delete_event("b"),
            make_put_event("c", "3"),
            make_delete_event("d"),
            make_put_event("e", "5"),
        ];
        for event in events {
            processor.add(event);
        }
        assert_eq!(processor.len(), 5);

        let result = processor.flush().await.unwrap();
        assert_eq!(result.total, 5);
        assert_eq!(result.submitted, 3);
        assert_eq!(result.deleted, 2);
    }
}