use async_trait::async_trait;
use celers_protocol::Message;
use std::time::Duration;
use crate::{Envelope, Result};
#[derive(Debug, Clone)]
pub struct ScheduleConfig {
pub delay: Option<Duration>,
pub scheduled_at: Option<u64>,
pub execution_window: Option<Duration>,
}
impl ScheduleConfig {
pub fn delay(delay: Duration) -> Self {
Self {
delay: Some(delay),
scheduled_at: None,
execution_window: None,
}
}
pub fn at(timestamp: u64) -> Self {
Self {
delay: None,
scheduled_at: Some(timestamp),
execution_window: None,
}
}
pub fn with_window(mut self, window: Duration) -> Self {
self.execution_window = Some(window);
self
}
pub fn is_ready(&self) -> bool {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
if let Some(timestamp) = self.scheduled_at {
return now >= timestamp;
}
true
}
pub fn delivery_time(&self) -> Option<u64> {
if let Some(timestamp) = self.scheduled_at {
return Some(timestamp);
}
if let Some(delay) = self.delay {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
return Some(now + delay.as_secs());
}
None
}
}
#[async_trait]
pub trait MessageScheduler: Send + Sync {
async fn schedule_message(
&mut self,
queue: &str,
message: Message,
schedule: ScheduleConfig,
) -> Result<String>;
async fn cancel_scheduled(&mut self, schedule_id: &str) -> Result<()>;
async fn list_scheduled(&mut self, queue: &str) -> Result<Vec<ScheduledMessage>>;
async fn scheduled_count(&mut self, queue: &str) -> Result<usize>;
}
#[derive(Debug, Clone)]
pub struct ScheduledMessage {
pub schedule_id: String,
pub queue: String,
pub delivery_time: u64,
pub message_size: usize,
}
#[derive(Debug, Clone)]
pub struct ConsumerGroupConfig {
pub group_id: String,
pub consumer_id: String,
pub max_consumers: Option<usize>,
pub rebalance_timeout: Duration,
pub heartbeat_interval: Duration,
}
impl ConsumerGroupConfig {
pub fn new(group_id: String, consumer_id: String) -> Self {
Self {
group_id,
consumer_id,
max_consumers: None,
rebalance_timeout: Duration::from_secs(30),
heartbeat_interval: Duration::from_secs(3),
}
}
pub fn with_max_consumers(mut self, max: usize) -> Self {
self.max_consumers = Some(max);
self
}
pub fn with_rebalance_timeout(mut self, timeout: Duration) -> Self {
self.rebalance_timeout = timeout;
self
}
pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
self.heartbeat_interval = interval;
self
}
}
#[async_trait]
pub trait ConsumerGroup: Send + Sync {
async fn join_group(&mut self, config: &ConsumerGroupConfig) -> Result<()>;
async fn leave_group(&mut self, group_id: &str) -> Result<()>;
async fn heartbeat(&mut self, group_id: &str) -> Result<()>;
async fn group_members(&mut self, group_id: &str) -> Result<Vec<String>>;
async fn consume_from_group(
&mut self,
group_id: &str,
queues: &[String],
timeout: Duration,
) -> Result<Option<Envelope>>;
}
#[derive(Debug, Clone)]
pub struct ReplayConfig {
pub from_duration: Option<Duration>,
pub from_timestamp: Option<u64>,
pub until_timestamp: Option<u64>,
pub max_messages: Option<usize>,
pub speed_multiplier: f64,
}
impl ReplayConfig {
pub fn from_duration(duration: Duration) -> Self {
Self {
from_duration: Some(duration),
from_timestamp: None,
until_timestamp: None,
max_messages: None,
speed_multiplier: 1.0,
}
}
pub fn from_timestamp(timestamp: u64) -> Self {
Self {
from_duration: None,
from_timestamp: Some(timestamp),
until_timestamp: None,
max_messages: None,
speed_multiplier: 1.0,
}
}
pub fn until(mut self, timestamp: u64) -> Self {
self.until_timestamp = Some(timestamp);
self
}
pub fn with_max_messages(mut self, max: usize) -> Self {
self.max_messages = Some(max);
self
}
pub fn with_speed(mut self, multiplier: f64) -> Self {
self.speed_multiplier = multiplier;
self
}
pub fn start_timestamp(&self) -> u64 {
if let Some(ts) = self.from_timestamp {
return ts;
}
if let Some(duration) = self.from_duration {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
return now.saturating_sub(duration.as_secs());
}
0
}
}
#[async_trait]
pub trait MessageReplay: Send + Sync {
async fn begin_replay(&mut self, queue: &str, config: ReplayConfig) -> Result<String>;
async fn replay_next(&mut self, replay_id: &str) -> Result<Option<Envelope>>;
async fn stop_replay(&mut self, replay_id: &str) -> Result<()>;
async fn replay_progress(&mut self, replay_id: &str) -> Result<ReplayProgress>;
}
#[derive(Debug, Clone)]
pub struct ReplayProgress {
pub replay_id: String,
pub messages_replayed: usize,
pub total_messages: Option<usize>,
pub current_timestamp: u64,
pub completed: bool,
}
impl ReplayProgress {
pub fn completion_percent(&self) -> Option<f64> {
self.total_messages.map(|total| {
if total == 0 {
100.0
} else {
(self.messages_replayed as f64 / total as f64) * 100.0
}
})
}
}