Skip to main content

foxtive_worker/
batch.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::time::Duration;
4
5use crate::error::WorkerResult;
6use crate::message::Message;
7
8/// A batch of messages to be processed together.
9#[derive(Debug)]
10pub struct MessageBatch<T> {
11    /// Unique identifier for this batch
12    pub id: String,
13    
14    /// Messages in this batch
15    pub messages: Vec<ReceivedBatchMessage<T>>,
16    
17    /// When this batch was created
18    pub created_at: std::time::Instant,
19    
20    /// Metadata about the batch
21    pub metadata: BatchMetadata,
22}
23
24impl<T> Clone for MessageBatch<T> where T: Clone {
25    fn clone(&self) -> Self {
26        Self {
27            id: self.id.clone(),
28            messages: self.messages.clone(),
29            created_at: self.created_at,
30            metadata: self.metadata.clone(),
31        }
32    }
33}
34
35impl<T> MessageBatch<T> {
36    /// Create a new message batch
37    pub fn new(id: String, messages: Vec<ReceivedBatchMessage<T>>) -> Self {
38        let total = messages.len();
39        Self {
40            id,
41            messages,
42            created_at: std::time::Instant::now(),
43            metadata: BatchMetadata {
44                total_messages: total,
45                ..Default::default()
46            },
47        }
48    }
49
50    /// Get the number of messages in this batch
51    pub fn len(&self) -> usize {
52        self.messages.len()
53    }
54
55    /// Check if batch is empty
56    pub fn is_empty(&self) -> bool {
57        self.messages.is_empty()
58    }
59
60    /// Get batch age
61    pub fn age(&self) -> Duration {
62        self.created_at.elapsed()
63    }
64}
65
66/// Metadata for a message batch
67#[derive(Debug, Clone, Default, Serialize, Deserialize)]
68pub struct BatchMetadata {
69    /// Total number of messages in the batch
70    pub total_messages: usize,
71    
72    /// Source queues represented in this batch
73    pub source_queues: Vec<String>,
74    
75    /// Batch processing status
76    pub status: BatchStatus,
77    
78    /// Error information if batch failed
79    pub error: Option<String>,
80}
81
82/// Status of a batch processing operation
83#[derive(Debug, Clone, Serialize, Deserialize)]
84#[derive(Default)]
85pub enum BatchStatus {
86    /// Batch is being assembled
87    #[default]
88    Assembling,
89    /// Batch is ready for processing
90    Ready,
91    /// Batch is currently being processed
92    Processing,
93    /// Batch completed successfully
94    Completed,
95    /// Batch failed
96    Failed,
97    /// Batch was flushed due to timeout
98    TimeoutFlush,
99    /// Batch was flushed due to shutdown
100    ShutdownFlush,
101}
102
103
104/// A message within a batch context
105#[derive(Debug)]
106pub struct ReceivedBatchMessage<T> {
107    /// The underlying message
108    pub message: Message<T>,
109    
110    /// Index within the batch
111    pub batch_index: usize,
112}
113
114impl<T> Clone for ReceivedBatchMessage<T> where T: Clone {
115    fn clone(&self) -> Self {
116        Self {
117            message: self.message.clone(),
118            batch_index: self.batch_index,
119        }
120    }
121}
122
123/// Trait for handlers that process batches of messages
124#[async_trait]
125pub trait BatchHandler: Send + Sync {
126    /// Process a batch of messages
127    /// 
128    /// All messages in the batch should be processed atomically.
129    /// If any message fails, the entire batch may need to be retried
130    /// or handled according to your error strategy.
131    async fn process_batch(&self, batch: MessageBatch<serde_json::Value>) -> WorkerResult<()>;
132    
133    /// Optional: Setup before batch handler starts
134    async fn setup(&self) -> WorkerResult<()> {
135        Ok(())
136    }
137    
138    /// Optional: Cleanup on shutdown
139    async fn teardown(&self) {}
140    
141    /// Maximum batch size this handler can process
142    fn max_batch_size(&self) -> usize {
143        100
144    }
145    
146    /// Maximum time to wait before flushing an incomplete batch
147    fn max_batch_age(&self) -> Duration {
148        Duration::from_secs(30)
149    }
150}
151
152/// Configuration for batch processing
153#[derive(Debug, Clone)]
154pub struct BatchConfig {
155    /// Maximum number of messages per batch
156    pub batch_size: usize,
157    
158    /// Maximum time to wait before flushing (even if batch not full)
159    pub flush_interval: Duration,
160    
161    /// Whether to wait for full batch before processing
162    pub wait_for_full_batch: bool,
163    
164    /// Timeout for batch processing
165    pub processing_timeout: Duration,
166}
167
168impl Default for BatchConfig {
169    fn default() -> Self {
170        Self {
171            batch_size: 50,
172            flush_interval: Duration::from_secs(10),
173            wait_for_full_batch: false,
174            processing_timeout: Duration::from_secs(60),
175        }
176    }
177}
178
179impl BatchConfig {
180    /// Create a new batch config with custom batch size
181    pub fn with_batch_size(mut self, size: usize) -> Self {
182        self.batch_size = size;
183        self
184    }
185
186    /// Set the flush interval
187    pub fn with_flush_interval(mut self, interval: Duration) -> Self {
188        self.flush_interval = interval;
189        self
190    }
191
192    /// Configure whether to wait for full batch
193    pub fn wait_for_full_batch(mut self, wait: bool) -> Self {
194        self.wait_for_full_batch = wait;
195        self
196    }
197
198    /// Set processing timeout
199    pub fn with_processing_timeout(mut self, timeout: Duration) -> Self {
200        self.processing_timeout = timeout;
201        self
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208
209    #[test]
210    fn test_message_batch_creation() {
211        let messages = vec![];
212        let batch = MessageBatch::<serde_json::Value>::new("batch-1".to_string(), messages);
213        
214        assert_eq!(batch.id, "batch-1");
215        assert_eq!(batch.len(), 0);
216        assert!(batch.is_empty());
217    }
218
219    #[test]
220    fn test_batch_config_builder() {
221        let config = BatchConfig::default()
222            .with_batch_size(100)
223            .with_flush_interval(Duration::from_secs(5))
224            .wait_for_full_batch(true);
225        
226        assert_eq!(config.batch_size, 100);
227        assert_eq!(config.flush_interval, Duration::from_secs(5));
228        assert!(config.wait_for_full_batch);
229    }
230}