use super::task::{Task, TaskStatus};
use serde::Serialize;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;
/// Broadcast-channel capacity used when a `TaskEventBus` is built with
/// `TaskEventBus::new()` (or `Default`) rather than `with_capacity`.
const DEFAULT_CHANNEL_CAPACITY: usize = 512;
/// An event describing something that happened to a task.
///
/// Every variant identifies the task it refers to, either through an
/// embedded `Task` (`Created`) or through a `task_id` string (all others);
/// `TaskEvent::task_id` gives uniform access to that identifier.
#[derive(Debug, Clone, Serialize)]
pub enum TaskEvent {
/// A task was created; carries the full task value.
Created {
/// The task attached to this event.
task: Task,
},
/// A task moved from one status to another.
Updated {
/// Identifier of the affected task.
task_id: String,
/// Status before the change.
old_status: TaskStatus,
/// Status after the change.
new_status: TaskStatus,
},
/// A task was deleted.
Deleted {
/// Identifier of the affected task.
task_id: String,
},
/// A task was assigned to an agent.
Assigned {
/// Identifier of the affected task.
task_id: String,
/// The agent the task was assigned to.
agent: String,
},
/// A task attempt failed.
Failed {
/// Identifier of the affected task.
task_id: String,
/// Description of the failure.
error: String,
/// Attempt counter associated with this failure.
// NOTE(review): whether this is 0- or 1-based is not visible here — confirm with the emitter.
attempt: u32,
},
/// A task completed and produced a result.
Completed {
/// Identifier of the affected task.
task_id: String,
/// The result text produced by the task.
result: String,
},
}
impl TaskEvent {
pub fn task_id(&self) -> &str {
match self {
TaskEvent::Created { task } => &task.id,
TaskEvent::Updated { task_id, .. } => task_id,
TaskEvent::Deleted { task_id } => task_id,
TaskEvent::Assigned { task_id, .. } => task_id,
TaskEvent::Failed { task_id, .. } => task_id,
TaskEvent::Completed { task_id, .. } => task_id,
}
}
}
/// A synchronous observer of task events.
///
/// Implementations must be `Send + Sync`: `TaskEventBus` stores listeners as
/// `Arc<dyn TaskEventListener>` and may invoke them from spawned Tokio tasks.
pub trait TaskEventListener: Send + Sync {
/// Called with a borrowed event each time the bus dispatches one.
fn on_event(&self, event: &TaskEvent);
}
/// An asynchronous observer of task events.
///
/// The event is passed as a shared `Arc<TaskEvent>`, the same item type that
/// `TaskEventBus::subscribe` receivers yield.
// NOTE(review): nothing in this file calls this trait; presumably consumers
// drive it from a `subscribe()` receiver — confirm against callers.
#[async_trait::async_trait]
pub trait AsyncTaskEventListener: Send + Sync {
/// Handles a single event.
async fn on_event(&self, event: Arc<TaskEvent>);
}
/// A `TaskEventListener` that writes one `tracing` `info` record per event.
pub struct LoggingListener;

impl LoggingListener {
    /// Upper bound, in bytes, on how much of a completion result is logged.
    const RESULT_PREVIEW_MAX_BYTES: usize = 100;

    /// Builds the text logged for a completed task's result.
    ///
    /// Results of at most `RESULT_PREVIEW_MAX_BYTES` bytes are returned
    /// borrowed and unchanged. Longer results are cut to the longest prefix
    /// that fits in that many bytes *and* ends on a UTF-8 character boundary,
    /// followed by `"..."`.
    fn result_preview(result: &str) -> std::borrow::Cow<'_, str> {
        if result.len() <= Self::RESULT_PREVIEW_MAX_BYTES {
            return std::borrow::Cow::Borrowed(result);
        }

        // Slicing a `str` in the middle of a multi-byte character panics, so
        // back off until the cut lands on a boundary. Index 0 is always a
        // boundary, which guarantees termination.
        let mut end = Self::RESULT_PREVIEW_MAX_BYTES;
        while !result.is_char_boundary(end) {
            end -= 1;
        }

        std::borrow::Cow::Owned(format!("{}...", &result[..end]))
    }
}

impl TaskEventListener for LoggingListener {
    /// Logs `event` at `info` level with a message naming the event kind
    /// (`task_created`, `task_updated`, ...) and its fields as structured
    /// values. Completion results are shortened via `result_preview`.
    fn on_event(&self, event: &TaskEvent) {
        match event {
            TaskEvent::Created { task } => {
                info!(task_id = %task.id, subject = %task.subject, "task_created");
            }
            TaskEvent::Updated {
                task_id,
                old_status,
                new_status,
            } => {
                info!(
                    task_id = %task_id,
                    old_status = ?old_status,
                    new_status = ?new_status,
                    "task_updated"
                );
            }
            TaskEvent::Deleted { task_id } => {
                info!(task_id = %task_id, "task_deleted");
            }
            TaskEvent::Assigned { task_id, agent } => {
                info!(task_id = %task_id, agent = %agent, "task_assigned");
            }
            TaskEvent::Failed {
                task_id,
                error,
                attempt,
            } => {
                info!(
                    task_id = %task_id,
                    error = %error,
                    attempt = attempt,
                    "task_failed"
                );
            }
            TaskEvent::Completed { task_id, result } => {
                let result_preview = Self::result_preview(result);
                info!(
                    task_id = %task_id,
                    result = %result_preview,
                    "task_completed"
                );
            }
        }
    }
}
/// Fan-out point for `TaskEvent`s.
///
/// Events reach consumers through two paths: a Tokio broadcast channel
/// (see `subscribe`) and a list of registered synchronous listeners
/// (see `register`).
pub struct TaskEventBus {
// Sending half of the broadcast channel; receivers are created from it on demand.
tx: broadcast::Sender<Arc<TaskEvent>>,
// Listeners added through `register`, kept in registration order.
sync_listeners: Vec<Arc<dyn TaskEventListener>>,
}
impl TaskEventBus {
    /// Creates a bus whose broadcast channel holds up to
    /// `DEFAULT_CHANNEL_CAPACITY` events.
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
    }

    /// Creates a bus whose broadcast channel has the given `capacity` and
    /// which starts with no synchronous listeners.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is rejected by `tokio::sync::broadcast::channel`
    /// (for example when it is `0`).
    pub fn with_capacity(capacity: usize) -> Self {
        // The initial receiver is dropped on purpose; consumers obtain their
        // own through `subscribe`.
        let (tx, _) = broadcast::channel(capacity);
        Self {
            tx,
            sync_listeners: Vec::new(),
        }
    }

    /// Adds a synchronous listener that will be notified by every subsequent
    /// `emit` on this bus value.
    pub fn register(&mut self, listener: Arc<dyn TaskEventListener>) {
        self.sync_listeners.push(listener);
    }

    /// Returns a new receiver on the broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<Arc<TaskEvent>> {
        self.tx.subscribe()
    }

    /// Dispatches `event` to every registered synchronous listener and then
    /// publishes it on the broadcast channel.
    ///
    /// The event is wrapped in a single `Arc` that is shared by all listeners
    /// and by the channel, so it is never deep-cloned here.
    ///
    /// When called inside a Tokio runtime, each listener runs in its own
    /// spawned task, so `emit` does not wait for listeners and relative
    /// ordering between listener invocations is not guaranteed. When no
    /// runtime is available, listeners are invoked inline on the calling
    /// thread instead of panicking.
    pub fn emit(&self, event: TaskEvent) {
        let event = Arc::new(event);

        if !self.sync_listeners.is_empty() {
            match tokio::runtime::Handle::try_current() {
                Ok(runtime) => {
                    for listener in &self.sync_listeners {
                        let listener = Arc::clone(listener);
                        let event = Arc::clone(&event);
                        runtime.spawn(async move {
                            listener.on_event(&event);
                        });
                    }
                }
                // No runtime to spawn onto: deliver synchronously rather
                // than panic.
                Err(_) => {
                    for listener in &self.sync_listeners {
                        listener.on_event(&event);
                    }
                }
            }
        }

        // `send` only fails when there are currently no receivers, which is
        // a normal situation for a bus; the event is simply dropped.
        let _ = self.tx.send(event);
    }

    /// Returns the number of currently active broadcast receivers.
    pub fn subscriber_count(&self) -> usize {
        self.tx.receiver_count()
    }
}
impl Clone for TaskEventBus {
    /// Produces a bus that publishes to the same broadcast channel as `self`
    /// but starts with an empty synchronous-listener list.
    ///
    /// Listeners registered on the original are not carried over, so events
    /// emitted through the clone reach broadcast subscribers only, until
    /// listeners are registered on the clone itself.
    fn clone(&self) -> Self {
        let tx = self.tx.clone();
        let sync_listeners = Vec::new();
        Self { tx, sync_listeners }
    }
}
impl Default for TaskEventBus {
fn default() -> Self {
Self::new()
}
}