Skip to main content

foxtive_worker/
batch.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::time::Duration;
4
5use crate::error::WorkerResult;
6use crate::message::Message;
7
8/// A batch of messages to be processed together.
9#[derive(Debug)]
10pub struct MessageBatch<T> {
11    /// Unique identifier for this batch
12    pub id: String,
13
14    /// Messages in this batch
15    pub messages: Vec<ReceivedBatchMessage<T>>,
16
17    /// When this batch was created
18    pub created_at: std::time::Instant,
19
20    /// Metadata about the batch
21    pub metadata: BatchMetadata,
22}
23
24impl<T> Clone for MessageBatch<T>
25where
26    T: Clone,
27{
28    fn clone(&self) -> Self {
29        Self {
30            id: self.id.clone(),
31            messages: self.messages.clone(),
32            created_at: self.created_at,
33            metadata: self.metadata.clone(),
34        }
35    }
36}
37
38impl<T> MessageBatch<T> {
39    /// Create a new message batch
40    pub fn new(id: String, messages: Vec<ReceivedBatchMessage<T>>) -> Self {
41        let total = messages.len();
42        Self {
43            id,
44            messages,
45            created_at: std::time::Instant::now(),
46            metadata: BatchMetadata {
47                total_messages: total,
48                ..Default::default()
49            },
50        }
51    }
52
53    /// Get the number of messages in this batch
54    pub fn len(&self) -> usize {
55        self.messages.len()
56    }
57
58    /// Check if batch is empty
59    pub fn is_empty(&self) -> bool {
60        self.messages.is_empty()
61    }
62
63    /// Get batch age
64    pub fn age(&self) -> Duration {
65        self.created_at.elapsed()
66    }
67}
68
69/// Metadata for a message batch
70#[derive(Debug, Clone, Default, Serialize, Deserialize)]
71pub struct BatchMetadata {
72    /// Total number of messages in the batch
73    pub total_messages: usize,
74
75    /// Source queues represented in this batch
76    pub source_queues: Vec<String>,
77
78    /// Batch processing status
79    pub status: BatchStatus,
80
81    /// Error information if batch failed
82    pub error: Option<String>,
83}
84
85/// Status of a batch processing operation
86#[derive(Debug, Clone, Serialize, Deserialize, Default)]
87pub enum BatchStatus {
88    /// Batch is being assembled
89    #[default]
90    Assembling,
91    /// Batch is ready for processing
92    Ready,
93    /// Batch is currently being processed
94    Processing,
95    /// Batch completed successfully
96    Completed,
97    /// Batch failed
98    Failed,
99    /// Batch was flushed due to timeout
100    TimeoutFlush,
101    /// Batch was flushed due to shutdown
102    ShutdownFlush,
103}
104
105/// A message within a batch context
106#[derive(Debug)]
107pub struct ReceivedBatchMessage<T> {
108    /// The underlying message
109    pub message: Message<T>,
110
111    /// Index within the batch
112    pub batch_index: usize,
113}
114
115impl<T> Clone for ReceivedBatchMessage<T>
116where
117    T: Clone,
118{
119    fn clone(&self) -> Self {
120        Self {
121            message: self.message.clone(),
122            batch_index: self.batch_index,
123        }
124    }
125}
126
127/// Trait for handlers that process batches of messages
128#[async_trait]
129pub trait BatchHandler: Send + Sync {
130    /// Process a batch of messages
131    ///
132    /// All messages in the batch should be processed atomically.
133    /// If any message fails, the entire batch may need to be retried
134    /// or handled according to your error strategy.
135    async fn process_batch(&self, batch: MessageBatch<serde_json::Value>) -> WorkerResult<()>;
136
137    /// Optional: Setup before batch handler starts
138    async fn setup(&self) -> WorkerResult<()> {
139        Ok(())
140    }
141
142    /// Optional: Cleanup on shutdown
143    async fn teardown(&self) {}
144
145    /// Maximum batch size this handler can process
146    fn max_batch_size(&self) -> usize {
147        100
148    }
149
150    /// Maximum time to wait before flushing an incomplete batch
151    fn max_batch_age(&self) -> Duration {
152        Duration::from_secs(30)
153    }
154}
155
156/// Configuration for batch processing
157#[derive(Debug, Clone)]
158pub struct BatchConfig {
159    /// Maximum number of messages per batch
160    pub batch_size: usize,
161
162    /// Maximum time to wait before flushing (even if batch not full)
163    pub flush_interval: Duration,
164
165    /// Whether to wait for full batch before processing
166    pub wait_for_full_batch: bool,
167
168    /// Timeout for batch processing
169    pub processing_timeout: Duration,
170}
171
172impl Default for BatchConfig {
173    fn default() -> Self {
174        Self {
175            batch_size: 50,
176            flush_interval: Duration::from_secs(10),
177            wait_for_full_batch: false,
178            processing_timeout: Duration::from_secs(60),
179        }
180    }
181}
182
183impl BatchConfig {
184    /// Create a new batch config with custom batch size
185    pub fn with_batch_size(mut self, size: usize) -> Self {
186        self.batch_size = size;
187        self
188    }
189
190    /// Set the flush interval
191    pub fn with_flush_interval(mut self, interval: Duration) -> Self {
192        self.flush_interval = interval;
193        self
194    }
195
196    /// Configure whether to wait for full batch
197    pub fn wait_for_full_batch(mut self, wait: bool) -> Self {
198        self.wait_for_full_batch = wait;
199        self
200    }
201
202    /// Set processing timeout
203    pub fn with_processing_timeout(mut self, timeout: Duration) -> Self {
204        self.processing_timeout = timeout;
205        self
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212
213    #[test]
214    fn test_message_batch_creation() {
215        let messages = vec![];
216        let batch = MessageBatch::<serde_json::Value>::new("batch-1".to_string(), messages);
217
218        assert_eq!(batch.id, "batch-1");
219        assert_eq!(batch.len(), 0);
220        assert!(batch.is_empty());
221    }
222
223    #[test]
224    fn test_batch_config_builder() {
225        let config = BatchConfig::default()
226            .with_batch_size(100)
227            .with_flush_interval(Duration::from_secs(5))
228            .wait_for_full_batch(true);
229
230        assert_eq!(config.batch_size, 100);
231        assert_eq!(config.flush_interval, Duration::from_secs(5));
232        assert!(config.wait_for_full_batch);
233    }
234}