1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::time::Duration;
4
5use crate::error::WorkerResult;
6use crate::message::Message;
7
8#[derive(Debug)]
10pub struct MessageBatch<T> {
11 pub id: String,
13
14 pub messages: Vec<ReceivedBatchMessage<T>>,
16
17 pub created_at: std::time::Instant,
19
20 pub metadata: BatchMetadata,
22}
23
24impl<T> Clone for MessageBatch<T>
25where
26 T: Clone,
27{
28 fn clone(&self) -> Self {
29 Self {
30 id: self.id.clone(),
31 messages: self.messages.clone(),
32 created_at: self.created_at,
33 metadata: self.metadata.clone(),
34 }
35 }
36}
37
38impl<T> MessageBatch<T> {
39 pub fn new(id: String, messages: Vec<ReceivedBatchMessage<T>>) -> Self {
41 let total = messages.len();
42 Self {
43 id,
44 messages,
45 created_at: std::time::Instant::now(),
46 metadata: BatchMetadata {
47 total_messages: total,
48 ..Default::default()
49 },
50 }
51 }
52
53 pub fn len(&self) -> usize {
55 self.messages.len()
56 }
57
58 pub fn is_empty(&self) -> bool {
60 self.messages.is_empty()
61 }
62
63 pub fn age(&self) -> Duration {
65 self.created_at.elapsed()
66 }
67}
68
69#[derive(Debug, Clone, Default, Serialize, Deserialize)]
71pub struct BatchMetadata {
72 pub total_messages: usize,
74
75 pub source_queues: Vec<String>,
77
78 pub status: BatchStatus,
80
81 pub error: Option<String>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize, Default)]
87pub enum BatchStatus {
88 #[default]
90 Assembling,
91 Ready,
93 Processing,
95 Completed,
97 Failed,
99 TimeoutFlush,
101 ShutdownFlush,
103}
104
105#[derive(Debug)]
107pub struct ReceivedBatchMessage<T> {
108 pub message: Message<T>,
110
111 pub batch_index: usize,
113}
114
115impl<T> Clone for ReceivedBatchMessage<T>
116where
117 T: Clone,
118{
119 fn clone(&self) -> Self {
120 Self {
121 message: self.message.clone(),
122 batch_index: self.batch_index,
123 }
124 }
125}
126
127#[async_trait]
129pub trait BatchHandler: Send + Sync {
130 async fn process_batch(&self, batch: MessageBatch<serde_json::Value>) -> WorkerResult<()>;
136
137 async fn setup(&self) -> WorkerResult<()> {
139 Ok(())
140 }
141
142 async fn teardown(&self) {}
144
145 fn max_batch_size(&self) -> usize {
147 100
148 }
149
150 fn max_batch_age(&self) -> Duration {
152 Duration::from_secs(30)
153 }
154}
155
/// Tunable settings for batching.
///
/// Start from [`BatchConfig::default`] and chain the builder methods to
/// override individual fields. How each setting is interpreted is up to the
/// code that consumes the config, which is not part of this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Batch size setting (default: 50).
    pub batch_size: usize,

    /// Flush interval setting (default: 10 seconds).
    pub flush_interval: Duration,

    /// Whether to wait for a full batch (default: `false`).
    pub wait_for_full_batch: bool,

    /// Processing timeout setting (default: 60 seconds).
    pub processing_timeout: Duration,
}

impl Default for BatchConfig {
    /// Returns a batch size of 50, a 10 second flush interval, no waiting for
    /// full batches and a 60 second processing timeout.
    fn default() -> Self {
        Self {
            batch_size: 50,
            flush_interval: Duration::from_secs(10),
            wait_for_full_batch: false,
            processing_timeout: Duration::from_secs(60),
        }
    }
}

impl BatchConfig {
    /// Returns the config with `batch_size` replaced by `size`.
    ///
    /// The value is stored as given; no range check is applied.
    #[must_use = "builder methods consume the config and return the updated value"]
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Returns the config with `flush_interval` replaced by `interval`.
    #[must_use = "builder methods consume the config and return the updated value"]
    pub fn with_flush_interval(mut self, interval: Duration) -> Self {
        self.flush_interval = interval;
        self
    }

    /// Returns the config with the `wait_for_full_batch` flag set to `wait`.
    #[must_use = "builder methods consume the config and return the updated value"]
    pub fn wait_for_full_batch(mut self, wait: bool) -> Self {
        self.wait_for_full_batch = wait;
        self
    }

    /// Returns the config with `processing_timeout` replaced by `timeout`.
    #[must_use = "builder methods consume the config and return the updated value"]
    pub fn with_processing_timeout(mut self, timeout: Duration) -> Self {
        self.processing_timeout = timeout;
        self
    }
}
208
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty batch keeps its id, reports zero length, and starts with
    /// default metadata apart from the recorded message count.
    #[test]
    fn test_message_batch_creation() {
        let batch = MessageBatch::<serde_json::Value>::new("batch-1".to_string(), Vec::new());

        assert_eq!(batch.id, "batch-1");
        assert_eq!(batch.len(), 0);
        assert!(batch.is_empty());

        // `new` copies the vector length into the metadata and defaults the rest.
        assert_eq!(batch.metadata.total_messages, 0);
        assert!(batch.metadata.source_queues.is_empty());
        assert!(batch.metadata.error.is_none());
        assert!(matches!(batch.metadata.status, BatchStatus::Assembling));
    }

    /// Cloning preserves the id, the contents and the creation timestamp.
    #[test]
    fn test_message_batch_clone() {
        let batch = MessageBatch::<serde_json::Value>::new("batch-2".to_string(), Vec::new());
        let copy = batch.clone();

        assert_eq!(copy.id, batch.id);
        assert_eq!(copy.len(), batch.len());
        assert_eq!(copy.created_at, batch.created_at);
        assert_eq!(copy.metadata.total_messages, batch.metadata.total_messages);
    }

    /// Pins the values produced by `BatchConfig::default`.
    #[test]
    fn test_batch_config_defaults() {
        let config = BatchConfig::default();

        assert_eq!(config.batch_size, 50);
        assert_eq!(config.flush_interval, Duration::from_secs(10));
        assert!(!config.wait_for_full_batch);
        assert_eq!(config.processing_timeout, Duration::from_secs(60));
    }

    /// Every builder method overrides exactly the field it is named after.
    #[test]
    fn test_batch_config_builder() {
        let config = BatchConfig::default()
            .with_batch_size(100)
            .with_flush_interval(Duration::from_secs(5))
            .wait_for_full_batch(true)
            .with_processing_timeout(Duration::from_secs(120));

        assert_eq!(config.batch_size, 100);
        assert_eq!(config.flush_interval, Duration::from_secs(5));
        assert!(config.wait_for_full_batch);
        assert_eq!(config.processing_timeout, Duration::from_secs(120));
    }
}