use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use crate::error::WorkerResult;
use crate::message::Message;
#[derive(Debug)]
pub struct MessageBatch<T> {
pub id: String,
pub messages: Vec<ReceivedBatchMessage<T>>,
pub created_at: std::time::Instant,
pub metadata: BatchMetadata,
}
impl<T> Clone for MessageBatch<T> where T: Clone {
fn clone(&self) -> Self {
Self {
id: self.id.clone(),
messages: self.messages.clone(),
created_at: self.created_at,
metadata: self.metadata.clone(),
}
}
}
impl<T> MessageBatch<T> {
pub fn new(id: String, messages: Vec<ReceivedBatchMessage<T>>) -> Self {
let total = messages.len();
Self {
id,
messages,
created_at: std::time::Instant::now(),
metadata: BatchMetadata {
total_messages: total,
..Default::default()
},
}
}
pub fn len(&self) -> usize {
self.messages.len()
}
pub fn is_empty(&self) -> bool {
self.messages.is_empty()
}
pub fn age(&self) -> Duration {
self.created_at.elapsed()
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BatchMetadata {
pub total_messages: usize,
pub source_queues: Vec<String>,
pub status: BatchStatus,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Default)]
pub enum BatchStatus {
#[default]
Assembling,
Ready,
Processing,
Completed,
Failed,
TimeoutFlush,
ShutdownFlush,
}
#[derive(Debug)]
pub struct ReceivedBatchMessage<T> {
pub message: Message<T>,
pub batch_index: usize,
}
impl<T> Clone for ReceivedBatchMessage<T> where T: Clone {
fn clone(&self) -> Self {
Self {
message: self.message.clone(),
batch_index: self.batch_index,
}
}
}
#[async_trait]
pub trait BatchHandler: Send + Sync {
async fn process_batch(&self, batch: MessageBatch<serde_json::Value>) -> WorkerResult<()>;
async fn setup(&self) -> WorkerResult<()> {
Ok(())
}
async fn teardown(&self) {}
fn max_batch_size(&self) -> usize {
100
}
fn max_batch_age(&self) -> Duration {
Duration::from_secs(30)
}
}
#[derive(Debug, Clone)]
pub struct BatchConfig {
pub batch_size: usize,
pub flush_interval: Duration,
pub wait_for_full_batch: bool,
pub processing_timeout: Duration,
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
batch_size: 50,
flush_interval: Duration::from_secs(10),
wait_for_full_batch: false,
processing_timeout: Duration::from_secs(60),
}
}
}
impl BatchConfig {
pub fn with_batch_size(mut self, size: usize) -> Self {
self.batch_size = size;
self
}
pub fn with_flush_interval(mut self, interval: Duration) -> Self {
self.flush_interval = interval;
self
}
pub fn wait_for_full_batch(mut self, wait: bool) -> Self {
self.wait_for_full_batch = wait;
self
}
pub fn with_processing_timeout(mut self, timeout: Duration) -> Self {
self.processing_timeout = timeout;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_message_batch_creation() {
let messages = vec![];
let batch = MessageBatch::<serde_json::Value>::new("batch-1".to_string(), messages);
assert_eq!(batch.id, "batch-1");
assert_eq!(batch.len(), 0);
assert!(batch.is_empty());
}
#[test]
fn test_batch_config_builder() {
let config = BatchConfig::default()
.with_batch_size(100)
.with_flush_interval(Duration::from_secs(5))
.wait_for_full_batch(true);
assert_eq!(config.batch_size, 100);
assert_eq!(config.flush_interval, Duration::from_secs(5));
assert!(config.wait_for_full_batch);
}
}