use oxify_model::{NodeId, Workflow};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use uuid::Uuid;
/// Dotted-string identifier naming a kind of event (e.g. `"workflow.started"`).
/// Kept as a plain `String` so external systems may define their own types;
/// the engine's own vocabulary is listed in [`execution_events`].
pub type EventType = String;

/// Well-known event-type strings published by the engine during execution.
pub mod execution_events {
    // Workflow lifecycle.
    pub const WORKFLOW_STARTED: &str = "workflow.started";
    pub const WORKFLOW_COMPLETED: &str = "workflow.completed";
    pub const WORKFLOW_FAILED: &str = "workflow.failed";
    pub const WORKFLOW_PAUSED: &str = "workflow.paused";
    pub const WORKFLOW_RESUMED: &str = "workflow.resumed";
    // Per-node lifecycle.
    pub const NODE_STARTED: &str = "node.started";
    pub const NODE_COMPLETED: &str = "node.completed";
    pub const NODE_FAILED: &str = "node.failed";
    pub const NODE_SKIPPED: &str = "node.skipped";
    pub const NODE_RETRY: &str = "node.retry";
    // Level boundaries (a level is a batch of nodes; see `level_started`).
    pub const LEVEL_STARTED: &str = "level.started";
    pub const LEVEL_COMPLETED: &str = "level.completed";
    // State and progress notifications.
    pub const VARIABLE_UPDATED: &str = "variable.updated";
    pub const CHECKPOINT_CREATED: &str = "checkpoint.created";
    pub const CHECKPOINT_RESTORED: &str = "checkpoint.restored";
    pub const PROGRESS_UPDATE: &str = "progress.update";
}
/// A single event flowing through the [`EventBus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowEvent {
    /// Unique id of this event instance.
    pub id: Uuid,
    /// Event kind; typically one of the [`execution_events`] constants.
    pub event_type: EventType,
    /// Workflow this event belongs to.
    pub workflow_id: Uuid,
    /// Specific execution run, when known (set via `with_execution`).
    pub execution_id: Option<Uuid>,
    /// Free-form JSON details for the event.
    pub payload: serde_json::Value,
    /// Wall-clock time at which the event was created.
    pub timestamp: std::time::SystemTime,
    /// Component that produced the event; defaults to `"oxify-engine"`.
    pub source: String,
}
impl WorkflowEvent {
pub fn new(event_type: EventType, workflow_id: Uuid, payload: serde_json::Value) -> Self {
Self {
id: Uuid::new_v4(),
event_type,
workflow_id,
execution_id: None,
payload,
timestamp: std::time::SystemTime::now(),
source: "oxify-engine".to_string(),
}
}
pub fn with_execution(mut self, execution_id: Uuid) -> Self {
self.execution_id = Some(execution_id);
self
}
pub fn with_source(mut self, source: String) -> Self {
self.source = source;
self
}
pub fn workflow_started(workflow_id: Uuid, execution_id: Uuid) -> Self {
Self::new(
execution_events::WORKFLOW_STARTED.to_string(),
workflow_id,
serde_json::json!({
"status": "started",
}),
)
.with_execution(execution_id)
}
pub fn workflow_completed(
workflow_id: Uuid,
execution_id: Uuid,
node_count: usize,
duration_ms: u128,
) -> Self {
Self::new(
execution_events::WORKFLOW_COMPLETED.to_string(),
workflow_id,
serde_json::json!({
"status": "completed",
"node_count": node_count,
"duration_ms": duration_ms,
}),
)
.with_execution(execution_id)
}
pub fn workflow_failed(workflow_id: Uuid, execution_id: Uuid, error: &str) -> Self {
Self::new(
execution_events::WORKFLOW_FAILED.to_string(),
workflow_id,
serde_json::json!({
"status": "failed",
"error": error,
}),
)
.with_execution(execution_id)
}
pub fn workflow_paused(workflow_id: Uuid, execution_id: Uuid, reason: &str) -> Self {
Self::new(
execution_events::WORKFLOW_PAUSED.to_string(),
workflow_id,
serde_json::json!({
"status": "paused",
"reason": reason,
}),
)
.with_execution(execution_id)
}
pub fn workflow_resumed(workflow_id: Uuid, execution_id: Uuid) -> Self {
Self::new(
execution_events::WORKFLOW_RESUMED.to_string(),
workflow_id,
serde_json::json!({
"status": "resumed",
}),
)
.with_execution(execution_id)
}
pub fn node_started(
workflow_id: Uuid,
execution_id: Uuid,
node_id: NodeId,
node_name: &str,
) -> Self {
Self::new(
execution_events::NODE_STARTED.to_string(),
workflow_id,
serde_json::json!({
"node_id": node_id.to_string(),
"node_name": node_name,
"status": "started",
}),
)
.with_execution(execution_id)
}
pub fn node_completed(
workflow_id: Uuid,
execution_id: Uuid,
node_id: NodeId,
node_name: &str,
duration_ms: u128,
) -> Self {
Self::new(
execution_events::NODE_COMPLETED.to_string(),
workflow_id,
serde_json::json!({
"node_id": node_id.to_string(),
"node_name": node_name,
"status": "completed",
"duration_ms": duration_ms,
}),
)
.with_execution(execution_id)
}
pub fn node_failed(
workflow_id: Uuid,
execution_id: Uuid,
node_id: NodeId,
node_name: &str,
error: &str,
) -> Self {
Self::new(
execution_events::NODE_FAILED.to_string(),
workflow_id,
serde_json::json!({
"node_id": node_id.to_string(),
"node_name": node_name,
"status": "failed",
"error": error,
}),
)
.with_execution(execution_id)
}
pub fn node_retry(
workflow_id: Uuid,
execution_id: Uuid,
node_id: NodeId,
node_name: &str,
attempt: u32,
max_retries: u32,
) -> Self {
Self::new(
execution_events::NODE_RETRY.to_string(),
workflow_id,
serde_json::json!({
"node_id": node_id.to_string(),
"node_name": node_name,
"attempt": attempt,
"max_retries": max_retries,
}),
)
.with_execution(execution_id)
}
pub fn level_started(
workflow_id: Uuid,
execution_id: Uuid,
level: usize,
node_count: usize,
) -> Self {
Self::new(
execution_events::LEVEL_STARTED.to_string(),
workflow_id,
serde_json::json!({
"level": level,
"node_count": node_count,
}),
)
.with_execution(execution_id)
}
pub fn level_completed(workflow_id: Uuid, execution_id: Uuid, level: usize) -> Self {
Self::new(
execution_events::LEVEL_COMPLETED.to_string(),
workflow_id,
serde_json::json!({
"level": level,
}),
)
.with_execution(execution_id)
}
pub fn checkpoint_created(
workflow_id: Uuid,
execution_id: Uuid,
checkpoint_id: Uuid,
level: usize,
) -> Self {
Self::new(
execution_events::CHECKPOINT_CREATED.to_string(),
workflow_id,
serde_json::json!({
"checkpoint_id": checkpoint_id.to_string(),
"level": level,
}),
)
.with_execution(execution_id)
}
pub fn progress_update(
workflow_id: Uuid,
execution_id: Uuid,
completed_nodes: usize,
total_nodes: usize,
current_level: usize,
total_levels: usize,
) -> Self {
let percentage = if total_nodes > 0 {
(completed_nodes as f32 / total_nodes as f32) * 100.0
} else {
0.0
};
Self::new(
execution_events::PROGRESS_UPDATE.to_string(),
workflow_id,
serde_json::json!({
"completed_nodes": completed_nodes,
"total_nodes": total_nodes,
"current_level": current_level,
"total_levels": total_levels,
"percentage": percentage,
}),
)
.with_execution(execution_id)
}
}
/// Synchronous callback run in-line by [`EventBus::publish`] for each event of
/// the type it was registered for; `Send + Sync` because the bus is shared.
pub type EventHandler = Arc<dyn Fn(WorkflowEvent) + Send + Sync>;
/// Binds a workflow to an event type so matching events can trigger it.
#[derive(Debug, Clone)]
pub struct WorkflowTrigger {
    /// Unique trigger id, used for unregistering.
    pub id: Uuid,
    /// Event type this trigger listens for (exact match).
    pub event_type: EventType,
    /// Workflow associated with the trigger.
    pub workflow: Workflow,
    /// Optional `"key=value"` payload filter; see [`WorkflowTrigger::matches`].
    pub filter: Option<String>,
    /// Disabled triggers never match.
    pub enabled: bool,
}
impl WorkflowTrigger {
    /// Builds an enabled trigger with a random id and no payload filter.
    pub fn new(event_type: EventType, workflow: Workflow) -> Self {
        Self {
            id: Uuid::new_v4(),
            event_type,
            workflow,
            filter: None,
            enabled: true,
        }
    }

    /// Builder-style setter for a `"key=value"` payload filter.
    pub fn with_filter(mut self, filter: String) -> Self {
        self.filter = Some(filter);
        self
    }

    /// Returns `true` when this trigger should fire for `event`: the trigger
    /// must be enabled and the event type must match exactly. When a filter of
    /// the form `key=value` is set, the payload's string field `key` must also
    /// equal `value` (right-hand side trimmed); a missing key or non-string
    /// value fails the match. A filter without `=` is ignored.
    pub fn matches(&self, event: &WorkflowEvent) -> bool {
        if !self.enabled || event.event_type != self.event_type {
            return false;
        }
        match self.filter.as_deref().and_then(|f| f.split_once('=')) {
            // No filter (or one without `=`): a type match is enough.
            None => true,
            Some((key, expected)) => event
                .payload
                .get(key)
                .map_or(false, |found| found.as_str() == Some(expected.trim())),
        }
    }
}
/// Central event bus: broadcasts events to subscribers, dispatches them to
/// registered per-type handlers, and evaluates workflow triggers.
pub struct EventBus {
    /// Broadcast side: every subscriber receives every published event.
    sender: broadcast::Sender<WorkflowEvent>,
    /// Handlers invoked synchronously, keyed by event type.
    handlers: Arc<RwLock<HashMap<EventType, Vec<EventHandler>>>>,
    /// Registered workflow triggers, keyed by trigger id.
    triggers: Arc<RwLock<HashMap<Uuid, WorkflowTrigger>>>,
}
impl EventBus {
    /// Creates a bus whose broadcast channel buffers up to `capacity` events
    /// for lagging subscribers.
    pub fn new(capacity: usize) -> Self {
        let (sender, _) = broadcast::channel(capacity);
        Self {
            sender,
            handlers: Arc::new(RwLock::new(HashMap::new())),
            triggers: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Publishes `event`: broadcasts it to all subscribers, invokes every
    /// registered handler for its type, then checks workflow triggers.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`; the `Result` is kept so the
    /// signature can grow failure modes without breaking callers.
    pub async fn publish(&self, event: WorkflowEvent) -> Result<(), String> {
        // `send` only errors when there are no active subscribers,
        // which is not a failure for a fire-and-forget bus.
        let _ = self.sender.send(event.clone());
        {
            // Scope the read lock so it is released before trigger evaluation.
            let handlers = self.handlers.read().await;
            if let Some(event_handlers) = handlers.get(&event.event_type) {
                for handler in event_handlers {
                    // Handlers take the event by value, so each needs a copy.
                    handler(event.clone());
                }
            }
        }
        self.check_triggers(&event).await;
        tracing::debug!("Published event: {} ({})", event.id, event.event_type);
        Ok(())
    }

    /// Returns a new broadcast receiver that sees all subsequently published
    /// events.
    pub fn subscribe(&self) -> broadcast::Receiver<WorkflowEvent> {
        self.sender.subscribe()
    }

    /// Registers `handler` to be invoked synchronously for every published
    /// event of `event_type`.
    pub async fn on(&self, event_type: EventType, handler: EventHandler) {
        self.handlers
            .write()
            .await
            .entry(event_type.clone())
            .or_default()
            .push(handler);
        tracing::info!("Registered handler for event type: {}", event_type);
    }

    /// Registers a workflow trigger and returns its id.
    pub async fn register_trigger(&self, trigger: WorkflowTrigger) -> Uuid {
        let id = trigger.id;
        self.triggers.write().await.insert(id, trigger);
        tracing::info!("Registered workflow trigger: {}", id);
        id
    }

    /// Removes a trigger; returns `true` if it existed.
    pub async fn unregister_trigger(&self, trigger_id: Uuid) -> bool {
        let removed = self.triggers.write().await.remove(&trigger_id).is_some();
        if removed {
            tracing::info!("Unregistered trigger: {}", trigger_id);
        }
        removed
    }

    /// Returns a snapshot of all registered triggers.
    pub async fn list_triggers(&self) -> Vec<WorkflowTrigger> {
        self.triggers.read().await.values().cloned().collect()
    }

    /// Logs every trigger that matches `event`.
    /// NOTE(review): matching triggers are only logged here — actually
    /// launching the triggered workflow is presumably done elsewhere; confirm.
    async fn check_triggers(&self, event: &WorkflowEvent) {
        let triggers = self.triggers.read().await;
        for trigger in triggers.values() {
            if trigger.matches(event) {
                tracing::info!("Trigger {} matched event {}", trigger.id, event.id);
            }
        }
    }
}
impl Default for EventBus {
    /// An [`EventBus`] with a broadcast capacity of 1000 buffered events.
    fn default() -> Self {
        Self::new(1000)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use oxify_model::{Node, NodeKind, WorkflowMetadata};

    /// Builds a minimal event of `event_type` with the given payload and a
    /// random workflow id.
    fn make_event(event_type: &str, payload: serde_json::Value) -> WorkflowEvent {
        WorkflowEvent::new(event_type.to_string(), Uuid::new_v4(), payload)
    }

    #[test]
    fn test_event_creation() {
        let event = make_event("test.event", serde_json::json!({"key": "value"}));
        assert_eq!(event.event_type, "test.event");
        assert!(event.execution_id.is_none());
    }

    #[test]
    fn test_trigger_matching() {
        let workflow = Workflow {
            metadata: WorkflowMetadata::new("Test".to_string()),
            nodes: vec![
                Node::new("Start".to_string(), NodeKind::Start),
                Node::new("End".to_string(), NodeKind::End),
            ],
            edges: vec![],
        };
        let trigger = WorkflowTrigger::new("user.created".to_string(), workflow);
        let hit = make_event("user.created", serde_json::json!({}));
        let miss = make_event("user.deleted", serde_json::json!({}));
        assert!(trigger.matches(&hit));
        assert!(!trigger.matches(&miss));
    }

    #[tokio::test]
    async fn test_event_bus_publish() {
        let bus = EventBus::new(10);
        let outcome = bus
            .publish(make_event("test.event", serde_json::json!({})))
            .await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_event_bus_subscribe() {
        let bus = EventBus::new(10);
        let mut rx = bus.subscribe();
        let sent = make_event("test.event", serde_json::json!({}));
        bus.publish(sent.clone()).await.unwrap();
        let received = rx.recv().await.unwrap();
        assert_eq!(received.id, sent.id);
    }
}