use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
/// One event record describing executor activity, serializable with serde.
///
/// The three trailing optional fields (`event_id`, `worker_id`, `meta`) are
/// left out of the serialized form when they are `None`, and default to
/// `None` when they are absent from the input.
///
/// `PartialEq` is derived so whole events can be compared directly (for
/// example in round-trip checks) instead of field by field.
///
/// NOTE(review): the ids used in this file's tests are larger than 2^53 and
/// are written as JSON numbers; confirm that every downstream consumer parses
/// 64-bit integers without losing precision.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct ExecutorEvent {
    /// Identifier of the execution this event belongs to.
    pub execution_id: i64,
    /// Event kind; the tests in this file use values such as `"step.enter"`
    /// and `"batch.completed"`.
    pub event_type: String,
    /// Name of the step the event refers to.
    pub step: String,
    /// Status label; the tests in this file use `"STARTED"` and `"COMPLETED"`.
    pub status: String,
    /// UTC timestamp attached to the event.
    pub created_at: DateTime<Utc>,
    /// Free-form JSON body of the event. Always serialized under the key
    /// `context`; on input the key `payload` is accepted as an alias.
    #[serde(alias = "payload")]
    pub context: serde_json::Value,
    /// Optional identifier of this individual event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_id: Option<i64>,
    /// Optional identifier of the worker associated with the event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub worker_id: Option<String>,
    /// Optional free-form JSON metadata.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub meta: Option<serde_json::Value>,
}
/// Destination that receives [`ExecutorEvent`]s.
///
/// Implementors must be `Send + Sync`; [`EventEmitter`] stores its sink as an
/// `Arc<dyn EventSink>`.
#[async_trait]
pub trait EventSink: Send + Sync {
    /// Takes ownership of one event and handles it.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports; the trait itself
    /// places no constraint on when that happens.
    async fn emit(&self, event: ExecutorEvent) -> Result<()>;
}
/// Handle that builds [`ExecutorEvent`]s for one execution and forwards them
/// to a sink.
pub struct EventEmitter {
    /// Sink that receives every event built by this emitter.
    pub sink: std::sync::Arc<dyn EventSink>,
    /// Execution id copied into each event this emitter builds.
    pub execution_id: i64,
}

impl EventEmitter {
    /// Creates an emitter bound to `execution_id` that delivers to `sink`.
    pub fn new(execution_id: i64, sink: std::sync::Arc<dyn EventSink>) -> Self {
        Self { execution_id, sink }
    }

    /// Builds an event from the given parts and hands it to the sink.
    ///
    /// The event carries this emitter's `execution_id`, is timestamped with
    /// the current UTC time, and has `event_id`, `worker_id` and `meta` unset.
    ///
    /// # Errors
    ///
    /// Returns the sink's error unchanged.
    pub async fn emit(
        &self,
        event_type: &str,
        step: &str,
        status: &str,
        context: serde_json::Value,
    ) -> Result<()> {
        // Build the record directly at the call site and await the delivery.
        let delivery = self.sink.emit(ExecutorEvent {
            execution_id: self.execution_id,
            event_type: event_type.to_owned(),
            step: step.to_owned(),
            status: status.to_owned(),
            created_at: Utc::now(),
            context,
            event_id: None,
            worker_id: None,
            meta: None,
        });
        delivery.await
    }
}
/// Stateless unit type used as a do-nothing event sink.
///
/// Derives `Debug`, `Clone` and `Copy` in addition to `Default` so the type
/// can be logged and passed around freely; it holds no data.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopSink;
#[async_trait]
impl EventSink for NoopSink {
    /// Accepts the event, does nothing with it, and always returns `Ok(())`.
    async fn emit(&self, _discarded: ExecutorEvent) -> Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Builds a minimal event with every optional field unset.
    fn dummy_event() -> ExecutorEvent {
        ExecutorEvent {
            execution_id: 1,
            event_type: "step.enter".to_string(),
            step: "fetch".to_string(),
            status: "STARTED".to_string(),
            created_at: Utc::now(),
            context: serde_json::json!({}),
            event_id: None,
            worker_id: None,
            meta: None,
        }
    }

    /// Emitting through the no-op sink succeeds.
    #[tokio::test]
    async fn noop_sink_accepts_any_event() {
        let sink: Arc<dyn EventSink> = Arc::new(NoopSink);
        let emitter = EventEmitter::new(12345, sink);
        emitter
            .emit(
                "batch.completed",
                "start",
                "COMPLETED",
                serde_json::json!({"processing_ms": 12.3}),
            )
            .await
            .expect("noop emit");
    }

    /// Unset optional fields are left out of the JSON, and the body is
    /// written under `context`, never under the `payload` alias.
    #[test]
    fn new_optional_fields_omit_from_serialized_json_when_none() {
        let event = dummy_event();
        let json = serde_json::to_value(&event).unwrap();
        assert!(json.get("event_id").is_none(), "event_id omitted");
        assert!(json.get("worker_id").is_none(), "worker_id omitted");
        assert!(json.get("meta").is_none(), "meta omitted");
        assert!(json.get("context").is_some(), "context always present");
        assert!(json.get("payload").is_none(), "alias never serialized");
    }

    /// Optional fields that are set appear in the JSON with their values.
    #[test]
    fn new_optional_fields_serialize_when_present() {
        let event = ExecutorEvent {
            event_id: Some(478775660589088777),
            worker_id: Some("worker-1".to_string()),
            meta: Some(serde_json::json!({"attempts": 2})),
            ..dummy_event()
        };
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["event_id"], serde_json::json!(478775660589088777_i64));
        assert_eq!(json["worker_id"], "worker-1");
        assert_eq!(json["meta"]["attempts"], 2);
    }

    /// Input that uses the `payload` key populates `context`.
    #[test]
    fn deserializes_payload_alias_into_context() {
        let json = serde_json::json!({
            "execution_id": 5,
            "event_type": "step.enter",
            "step": "s",
            "status": "STARTED",
            "created_at": "2026-05-31T00:00:00Z",
            "payload": {"foo": "bar"},
        });
        let event: ExecutorEvent = serde_json::from_value(json).unwrap();
        assert_eq!(event.context, serde_json::json!({"foo": "bar"}));
    }

    /// Input without the optional keys deserializes them as `None`.
    #[test]
    fn deserializes_missing_optional_fields_with_none() {
        let json = serde_json::json!({
            "execution_id": 5,
            "event_type": "step.enter",
            "step": "s",
            "status": "STARTED",
            "created_at": "2026-05-31T00:00:00Z",
            "context": {},
        });
        let event: ExecutorEvent = serde_json::from_value(json).unwrap();
        assert!(event.event_id.is_none());
        assert!(event.worker_id.is_none());
        assert!(event.meta.is_none());
    }

    /// A fully populated event survives serialize -> deserialize unchanged.
    #[test]
    fn round_trips_with_all_optional_fields_set() {
        let original = ExecutorEvent {
            execution_id: 478775660589088776,
            event_type: "command.completed".to_string(),
            step: "fetch_calendar".to_string(),
            status: "COMPLETED".to_string(),
            created_at: chrono::DateTime::parse_from_rfc3339("2026-05-31T03:14:15Z")
                .unwrap()
                .with_timezone(&Utc),
            context: serde_json::json!({"result": {"items": 42}}),
            event_id: Some(478775660589088777),
            worker_id: Some("worker-prod-7".to_string()),
            meta: Some(
                serde_json::json!({"attempts": 3, "parent_event_id": "478775660589088770"}),
            ),
        };
        let json = serde_json::to_value(&original).unwrap();
        let parsed: ExecutorEvent = serde_json::from_value(json).unwrap();
        // Compare every field individually so a regression in any one of
        // them is reported by name.
        assert_eq!(parsed.execution_id, original.execution_id);
        assert_eq!(parsed.event_type, original.event_type);
        assert_eq!(parsed.step, original.step);
        assert_eq!(parsed.status, original.status);
        assert_eq!(parsed.created_at, original.created_at);
        assert_eq!(parsed.context, original.context);
        assert_eq!(parsed.event_id, original.event_id);
        assert_eq!(parsed.worker_id, original.worker_id);
        assert_eq!(parsed.meta, original.meta);
    }
}