1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::time::Duration;
4
5use crate::error::WorkerResult;
6use crate::message::Message;
7
8#[derive(Debug)]
10pub struct MessageBatch<T> {
11 pub id: String,
13
14 pub messages: Vec<ReceivedBatchMessage<T>>,
16
17 pub created_at: std::time::Instant,
19
20 pub metadata: BatchMetadata,
22}
23
24impl<T> Clone for MessageBatch<T> where T: Clone {
25 fn clone(&self) -> Self {
26 Self {
27 id: self.id.clone(),
28 messages: self.messages.clone(),
29 created_at: self.created_at,
30 metadata: self.metadata.clone(),
31 }
32 }
33}
34
35impl<T> MessageBatch<T> {
36 pub fn new(id: String, messages: Vec<ReceivedBatchMessage<T>>) -> Self {
38 let total = messages.len();
39 Self {
40 id,
41 messages,
42 created_at: std::time::Instant::now(),
43 metadata: BatchMetadata {
44 total_messages: total,
45 ..Default::default()
46 },
47 }
48 }
49
50 pub fn len(&self) -> usize {
52 self.messages.len()
53 }
54
55 pub fn is_empty(&self) -> bool {
57 self.messages.is_empty()
58 }
59
60 pub fn age(&self) -> Duration {
62 self.created_at.elapsed()
63 }
64}
65
66#[derive(Debug, Clone, Default, Serialize, Deserialize)]
68pub struct BatchMetadata {
69 pub total_messages: usize,
71
72 pub source_queues: Vec<String>,
74
75 pub status: BatchStatus,
77
78 pub error: Option<String>,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84#[derive(Default)]
85pub enum BatchStatus {
86 #[default]
88 Assembling,
89 Ready,
91 Processing,
93 Completed,
95 Failed,
97 TimeoutFlush,
99 ShutdownFlush,
101}
102
103
104#[derive(Debug)]
106pub struct ReceivedBatchMessage<T> {
107 pub message: Message<T>,
109
110 pub batch_index: usize,
112}
113
114impl<T> Clone for ReceivedBatchMessage<T> where T: Clone {
115 fn clone(&self) -> Self {
116 Self {
117 message: self.message.clone(),
118 batch_index: self.batch_index,
119 }
120 }
121}
122
123#[async_trait]
125pub trait BatchHandler: Send + Sync {
126 async fn process_batch(&self, batch: MessageBatch<serde_json::Value>) -> WorkerResult<()>;
132
133 async fn setup(&self) -> WorkerResult<()> {
135 Ok(())
136 }
137
138 async fn teardown(&self) {}
140
141 fn max_batch_size(&self) -> usize {
143 100
144 }
145
146 fn max_batch_age(&self) -> Duration {
148 Duration::from_secs(30)
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct BatchConfig {
155 pub batch_size: usize,
157
158 pub flush_interval: Duration,
160
161 pub wait_for_full_batch: bool,
163
164 pub processing_timeout: Duration,
166}
167
168impl Default for BatchConfig {
169 fn default() -> Self {
170 Self {
171 batch_size: 50,
172 flush_interval: Duration::from_secs(10),
173 wait_for_full_batch: false,
174 processing_timeout: Duration::from_secs(60),
175 }
176 }
177}
178
179impl BatchConfig {
180 pub fn with_batch_size(mut self, size: usize) -> Self {
182 self.batch_size = size;
183 self
184 }
185
186 pub fn with_flush_interval(mut self, interval: Duration) -> Self {
188 self.flush_interval = interval;
189 self
190 }
191
192 pub fn wait_for_full_batch(mut self, wait: bool) -> Self {
194 self.wait_for_full_batch = wait;
195 self
196 }
197
198 pub fn with_processing_timeout(mut self, timeout: Duration) -> Self {
200 self.processing_timeout = timeout;
201 self
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208
209 #[test]
210 fn test_message_batch_creation() {
211 let messages = vec![];
212 let batch = MessageBatch::<serde_json::Value>::new("batch-1".to_string(), messages);
213
214 assert_eq!(batch.id, "batch-1");
215 assert_eq!(batch.len(), 0);
216 assert!(batch.is_empty());
217 }
218
219 #[test]
220 fn test_batch_config_builder() {
221 let config = BatchConfig::default()
222 .with_batch_size(100)
223 .with_flush_interval(Duration::from_secs(5))
224 .wait_for_full_batch(true);
225
226 assert_eq!(config.batch_size, 100);
227 assert_eq!(config.flush_interval, Duration::from_secs(5));
228 assert!(config.wait_for_full_batch);
229 }
230}