use crate::{Broker, SerializedTask};
use async_trait::async_trait;
use celers_core::broker::BrokerMessage;
use celers_core::error::CelersError;
use celers_core::task::TaskId;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
type Result<T> = std::result::Result<T, CelersError>;
#[derive(Clone)]
pub struct MockBroker {
queue: Arc<Mutex<VecDeque<SerializedTask>>>,
published_tasks: Arc<Mutex<Vec<SerializedTask>>>,
}
impl MockBroker {
pub fn new() -> Self {
Self {
queue: Arc::new(Mutex::new(VecDeque::new())),
published_tasks: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn queue_len(&self) -> usize {
self.queue
.lock()
.expect("lock should not be poisoned")
.len()
}
pub fn published_tasks(&self) -> Vec<SerializedTask> {
self.published_tasks
.lock()
.expect("lock should not be poisoned")
.clone()
}
pub fn clear(&self) {
self.queue
.lock()
.expect("lock should not be poisoned")
.clear();
self.published_tasks
.lock()
.expect("lock should not be poisoned")
.clear();
}
pub fn push_task(&self, task: SerializedTask) {
self.queue
.lock()
.expect("lock should not be poisoned")
.push_back(task);
}
}
impl Default for MockBroker {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Broker for MockBroker {
async fn enqueue(&self, task: SerializedTask) -> Result<TaskId> {
let task_id = task.metadata.id;
self.published_tasks
.lock()
.expect("lock should not be poisoned")
.push(task.clone());
self.queue
.lock()
.expect("lock should not be poisoned")
.push_back(task);
Ok(task_id)
}
async fn dequeue(&self) -> Result<Option<BrokerMessage>> {
let task = self
.queue
.lock()
.expect("lock should not be poisoned")
.pop_front();
Ok(task.map(BrokerMessage::new))
}
async fn ack(&self, _task_id: &TaskId, _receipt_handle: Option<&str>) -> Result<()> {
Ok(())
}
async fn reject(
&self,
_task_id: &TaskId,
_receipt_handle: Option<&str>,
_requeue: bool,
) -> Result<()> {
Ok(())
}
async fn queue_size(&self) -> Result<usize> {
Ok(self
.queue
.lock()
.expect("lock should not be poisoned")
.len())
}
async fn cancel(&self, task_id: &TaskId) -> Result<bool> {
let mut queue = self.queue.lock().expect("lock should not be poisoned");
let original_len = queue.len();
queue.retain(|t| &t.metadata.id != task_id);
Ok(queue.len() < original_len)
}
}
pub struct TaskBuilder {
name: String,
id: Option<String>,
max_retries: u32,
payload: Vec<u8>,
}
impl TaskBuilder {
pub fn new(task_name: &str) -> Self {
Self {
name: task_name.to_string(),
id: None,
max_retries: 0,
payload: Vec::new(),
}
}
pub fn id(mut self, id: String) -> Self {
self.id = Some(id);
self
}
pub fn max_retries(mut self, max_retries: u32) -> Self {
self.max_retries = max_retries;
self
}
pub fn payload(mut self, payload: Vec<u8>) -> Self {
self.payload = payload;
self
}
pub fn build(self) -> SerializedTask {
use uuid::Uuid;
let mut task = SerializedTask::new(self.name, self.payload);
if let Some(id) = self.id {
task.metadata.id = Uuid::parse_str(&id).unwrap_or_else(|_| Uuid::new_v4());
}
task.metadata.max_retries = self.max_retries;
task
}
}
pub fn create_test_task(name: &str) -> SerializedTask {
TaskBuilder::new(name).build()
}
pub struct TaskDebugger {
task_history: Arc<Mutex<Vec<TaskDebugInfo>>>,
}
#[derive(Debug, Clone)]
pub struct TaskDebugInfo {
pub task_id: String,
pub task_name: String,
pub state: String,
pub timestamp: std::time::SystemTime,
pub metadata: std::collections::HashMap<String, String>,
}
impl TaskDebugger {
pub fn new() -> Self {
Self {
task_history: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn record_task(&self, task: &SerializedTask, state: &str) {
let mut history = self
.task_history
.lock()
.expect("lock should not be poisoned");
history.push(TaskDebugInfo {
task_id: task.metadata.id.to_string(),
task_name: task.metadata.name.clone(),
state: state.to_string(),
timestamp: std::time::SystemTime::now(),
metadata: std::collections::HashMap::new(),
});
}
pub fn history(&self) -> Vec<TaskDebugInfo> {
self.task_history
.lock()
.expect("lock should not be poisoned")
.clone()
}
pub fn clear(&self) {
self.task_history
.lock()
.expect("lock should not be poisoned")
.clear();
}
pub fn tasks_by_state(&self, state: &str) -> Vec<TaskDebugInfo> {
self.task_history
.lock()
.unwrap()
.iter()
.filter(|info| info.state == state)
.cloned()
.collect()
}
pub fn print_history(&self) {
let history = self.history();
println!(
"\n╔══════════════════════════════════════════════════════════════════════════════╗"
);
println!(
"║ Task Execution History ║"
);
println!(
"╚══════════════════════════════════════════════════════════════════════════════╝\n"
);
for (idx, info) in history.iter().enumerate() {
println!("Task #{}", idx + 1);
println!(" ID: {}", info.task_id);
println!(" Name: {}", info.task_name);
println!(" State: {}", info.state);
println!(" Timestamp: {:?}", info.timestamp);
if !info.metadata.is_empty() {
println!(" Metadata:");
for (key, value) in &info.metadata {
println!(" {}: {}", key, value);
}
}
println!();
}
}
}
impl Default for TaskDebugger {
fn default() -> Self {
Self::new()
}
}
pub struct EventTracker {
events: Arc<Mutex<Vec<TrackedEvent>>>,
}
#[derive(Debug, Clone)]
pub struct TrackedEvent {
pub event_type: String,
pub task_id: Option<String>,
pub message: String,
pub timestamp: std::time::SystemTime,
}
impl EventTracker {
pub fn new() -> Self {
Self {
events: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn track(&self, event_type: &str, task_id: Option<String>, message: String) {
let mut events = self.events.lock().expect("lock should not be poisoned");
events.push(TrackedEvent {
event_type: event_type.to_string(),
task_id,
message,
timestamp: std::time::SystemTime::now(),
});
}
pub fn events(&self) -> Vec<TrackedEvent> {
self.events
.lock()
.expect("lock should not be poisoned")
.clone()
}
pub fn events_by_type(&self, event_type: &str) -> Vec<TrackedEvent> {
self.events
.lock()
.unwrap()
.iter()
.filter(|e| e.event_type == event_type)
.cloned()
.collect()
}
pub fn clear(&self) {
self.events
.lock()
.expect("lock should not be poisoned")
.clear();
}
pub fn print_events(&self) {
let events = self.events();
println!(
"\n╔══════════════════════════════════════════════════════════════════════════════╗"
);
println!(
"║ Event Log ║"
);
println!(
"╚══════════════════════════════════════════════════════════════════════════════╝\n"
);
for (idx, event) in events.iter().enumerate() {
println!("Event #{}", idx + 1);
println!(" Type: {}", event.event_type);
if let Some(ref task_id) = event.task_id {
println!(" Task ID: {}", task_id);
}
println!(" Message: {}", event.message);
println!(" Timestamp: {:?}", event.timestamp);
println!();
}
}
}
impl Default for EventTracker {
fn default() -> Self {
Self::new()
}
}
pub struct PerformanceProfiler {
measurements: Arc<Mutex<Vec<PerformanceMeasurement>>>,
}
#[derive(Debug, Clone)]
pub struct PerformanceMeasurement {
pub name: String,
pub duration_ms: u128,
pub timestamp: std::time::SystemTime,
pub metadata: std::collections::HashMap<String, String>,
}
impl PerformanceProfiler {
pub fn new() -> Self {
Self {
measurements: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn start_measurement(&self, name: &str) -> MeasurementGuard {
MeasurementGuard {
name: name.to_string(),
start: std::time::Instant::now(),
profiler: self.clone(),
}
}
fn record(&self, name: String, duration_ms: u128) {
let mut measurements = self
.measurements
.lock()
.expect("lock should not be poisoned");
measurements.push(PerformanceMeasurement {
name,
duration_ms,
timestamp: std::time::SystemTime::now(),
metadata: std::collections::HashMap::new(),
});
}
pub fn measurements(&self) -> Vec<PerformanceMeasurement> {
self.measurements
.lock()
.expect("lock should not be poisoned")
.clone()
}
pub fn clear(&self) {
self.measurements
.lock()
.expect("lock should not be poisoned")
.clear();
}
pub fn average_duration(&self, name: &str) -> Option<u128> {
let measurements = self
.measurements
.lock()
.expect("lock should not be poisoned");
let matching: Vec<_> = measurements
.iter()
.filter(|m| m.name == name)
.map(|m| m.duration_ms)
.collect();
if matching.is_empty() {
None
} else {
Some(matching.iter().sum::<u128>() / matching.len() as u128)
}
}
pub fn print_summary(&self) {
let measurements = self.measurements();
println!(
"\n╔══════════════════════════════════════════════════════════════════════════════╗"
);
println!(
"║ Performance Summary ║"
);
println!(
"╚══════════════════════════════════════════════════════════════════════════════╝\n"
);
let mut grouped: std::collections::HashMap<String, Vec<u128>> =
std::collections::HashMap::new();
for m in measurements {
grouped.entry(m.name).or_default().push(m.duration_ms);
}
for (name, durations) in grouped {
let count = durations.len();
let total: u128 = durations.iter().sum();
let avg = total / count as u128;
let min = *durations
.iter()
.min()
.expect("collection validated to be non-empty");
let max = *durations
.iter()
.max()
.expect("collection validated to be non-empty");
println!("{}", name);
println!(" Count: {}", count);
println!(" Avg: {} ms", avg);
println!(" Min: {} ms", min);
println!(" Max: {} ms", max);
println!(" Total: {} ms", total);
println!();
}
}
}
impl Clone for PerformanceProfiler {
fn clone(&self) -> Self {
Self {
measurements: Arc::clone(&self.measurements),
}
}
}
impl Default for PerformanceProfiler {
fn default() -> Self {
Self::new()
}
}
pub struct MeasurementGuard {
name: String,
start: std::time::Instant,
profiler: PerformanceProfiler,
}
impl Drop for MeasurementGuard {
fn drop(&mut self) {
let duration_ms = self.start.elapsed().as_millis();
self.profiler.record(self.name.clone(), duration_ms);
}
}
pub struct QueueInspector {
snapshots: Arc<Mutex<Vec<QueueSnapshot>>>,
}
#[derive(Debug, Clone)]
pub struct QueueSnapshot {
pub queue_size: usize,
pub timestamp: std::time::SystemTime,
pub metadata: std::collections::HashMap<String, String>,
}
impl QueueInspector {
pub fn new() -> Self {
Self {
snapshots: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn snapshot(&self, broker: &MockBroker) {
let size = broker.queue_len();
let mut snapshots = self.snapshots.lock().expect("lock should not be poisoned");
snapshots.push(QueueSnapshot {
queue_size: size,
timestamp: std::time::SystemTime::now(),
metadata: std::collections::HashMap::new(),
});
}
pub fn snapshots(&self) -> Vec<QueueSnapshot> {
self.snapshots
.lock()
.expect("lock should not be poisoned")
.clone()
}
pub fn clear(&self) {
self.snapshots
.lock()
.expect("lock should not be poisoned")
.clear();
}
pub fn print_history(&self) {
let snapshots = self.snapshots();
println!(
"\n╔══════════════════════════════════════════════════════════════════════════════╗"
);
println!(
"║ Queue Size History ║"
);
println!(
"╚══════════════════════════════════════════════════════════════════════════════╝\n"
);
for (idx, snapshot) in snapshots.iter().enumerate() {
println!("Snapshot #{}", idx + 1);
println!(" Queue Size: {}", snapshot.queue_size);
println!(" Timestamp: {:?}", snapshot.timestamp);
println!();
}
}
}
impl Default for QueueInspector {
fn default() -> Self {
Self::new()
}
}