Skip to main content

blazen_events/
lib.rs

1//! # `Blazen` Events
2//!
3//! Defines the core event traits and built-in event types used for
4//! inter-component communication within the `Blazen` framework.
5//!
6//! Every piece of data flowing between workflow steps is an [`Event`].
7//! Events carry a stable string identifier ([`Event::event_type`]) used for
8//! routing, serialization, and cross-language boundaries.
9//!
10//! The [`AnyEvent`] trait provides type-erased access so that the internal
11//! event queue can hold heterogeneous events without generics.
12
13use std::any::Any;
14use std::fmt::Debug;
15
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use uuid::Uuid;
19
20mod progress_event;
21mod usage_event;
22
23pub use progress_event::{ProgressEvent, ProgressKind};
24pub use usage_event::{Modality, UsageEvent};
25
26// ---------------------------------------------------------------------------
27// Core traits
28// ---------------------------------------------------------------------------
29
30/// The core routing trait.
31///
32/// Every piece of data flowing between workflow steps implements `Event`.
33/// Implementors must be `Send + Sync + Debug + Clone + 'static` so that
34/// events can be safely transferred across threads and stored in queues.
35pub trait Event: Send + Sync + Debug + Clone + 'static {
36    /// Stable string identifier for this event type.
37    ///
38    /// Used for routing, serialization, and cross-language boundaries.
39    /// By convention the string takes the form `"crate::TypeName"`.
40    fn event_type() -> &'static str
41    where
42        Self: Sized;
43
44    /// Instance method version of [`Event::event_type`] for dynamic dispatch.
45    fn event_type_id(&self) -> &'static str;
46
47    /// Upcast to [`Any`] for type-erasure in the event queue.
48    fn as_any(&self) -> &dyn Any;
49
50    /// Clone into a boxed trait object.
51    fn clone_boxed(&self) -> Box<dyn AnyEvent>;
52
53    /// Serialize to JSON for cross-language boundaries and persistence.
54    #[must_use]
55    fn to_json(&self) -> serde_json::Value;
56}
57
58/// Type-erased event for the internal event queue.
59///
60/// This trait mirrors the instance methods of [`Event`] but drops the
61/// `Clone` and `Sized` bounds so it can be used as a trait object.
62pub trait AnyEvent: Send + Sync + Debug {
63    /// Returns the stable string identifier for this event type.
64    fn event_type_id(&self) -> &'static str;
65
66    /// Upcast to [`Any`] for downcasting back to the concrete type.
67    fn as_any(&self) -> &dyn Any;
68
69    /// Clone into a new boxed trait object.
70    fn clone_boxed(&self) -> Box<dyn AnyEvent>;
71
72    /// Serialize to JSON for cross-language boundaries and persistence.
73    #[must_use]
74    fn to_json(&self) -> serde_json::Value;
75}
76
77// Blanket implementation: anything that is `Event + Serialize` is `AnyEvent`.
78impl<T> AnyEvent for T
79where
80    T: Event + Serialize,
81{
82    fn event_type_id(&self) -> &'static str {
83        Event::event_type_id(self)
84    }
85
86    fn as_any(&self) -> &dyn Any {
87        Event::as_any(self)
88    }
89
90    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
91        Event::clone_boxed(self)
92    }
93
94    fn to_json(&self) -> serde_json::Value {
95        Event::to_json(self)
96    }
97}
98
99// Downcast helper on `dyn AnyEvent`.
100impl dyn AnyEvent {
101    /// Attempt to downcast the type-erased event to a concrete type `T`.
102    ///
103    /// Returns `Some(&T)` if the underlying event is of type `T`, or `None`
104    /// otherwise.
105    #[must_use]
106    pub fn downcast_ref<T: Event>(&self) -> Option<&T> {
107        self.as_any().downcast_ref::<T>()
108    }
109}
110
111// Allow cloning boxed trait objects via `clone_boxed`.
112impl Clone for Box<dyn AnyEvent> {
113    fn clone(&self) -> Self {
114        self.clone_boxed()
115    }
116}
117
118// ---------------------------------------------------------------------------
119// Built-in events
120// ---------------------------------------------------------------------------
121
122/// Emitted to kick off a workflow with arbitrary JSON data.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct StartEvent {
125    /// Arbitrary payload passed into the workflow at start.
126    pub data: serde_json::Value,
127}
128
129impl Event for StartEvent {
130    fn event_type() -> &'static str {
131        static REGISTER: std::sync::Once = std::sync::Once::new();
132        REGISTER.call_once(|| {
133            register_event_deserializer("blazen::StartEvent", |value| {
134                serde_json::from_value::<StartEvent>(value)
135                    .ok()
136                    .map(|e| Box::new(e) as _)
137            });
138        });
139        "blazen::StartEvent"
140    }
141
142    fn event_type_id(&self) -> &'static str {
143        "blazen::StartEvent"
144    }
145
146    fn as_any(&self) -> &dyn Any {
147        self
148    }
149
150    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
151        Box::new(self.clone())
152    }
153
154    fn to_json(&self) -> serde_json::Value {
155        serde_json::to_value(self).expect("StartEvent serialization should never fail")
156    }
157}
158
159/// Emitted to signal that a workflow has completed with a result.
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct StopEvent {
162    /// The final result of the workflow.
163    pub result: serde_json::Value,
164}
165
166impl Event for StopEvent {
167    fn event_type() -> &'static str {
168        static REGISTER: std::sync::Once = std::sync::Once::new();
169        REGISTER.call_once(|| {
170            register_event_deserializer("blazen::StopEvent", |value| {
171                serde_json::from_value::<StopEvent>(value)
172                    .ok()
173                    .map(|e| Box::new(e) as _)
174            });
175        });
176        "blazen::StopEvent"
177    }
178
179    fn event_type_id(&self) -> &'static str {
180        "blazen::StopEvent"
181    }
182
183    fn as_any(&self) -> &dyn Any {
184        self
185    }
186
187    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
188        Box::new(self.clone())
189    }
190
191    fn to_json(&self) -> serde_json::Value {
192        serde_json::to_value(self).expect("StopEvent serialization should never fail")
193    }
194}
195
196/// Emitted by a step to request human input. Triggers auto-pause.
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct InputRequestEvent {
199    /// Unique ID for this request (for matching response).
200    pub request_id: String,
201    /// The question/prompt to show the human.
202    pub prompt: String,
203    /// Optional structured metadata (choices, type hints, etc.).
204    pub metadata: serde_json::Value,
205}
206
207impl Event for InputRequestEvent {
208    fn event_type() -> &'static str {
209        static REGISTER: std::sync::Once = std::sync::Once::new();
210        REGISTER.call_once(|| {
211            register_event_deserializer("blazen::InputRequestEvent", |value| {
212                serde_json::from_value::<InputRequestEvent>(value)
213                    .ok()
214                    .map(|e| Box::new(e) as _)
215            });
216        });
217        "blazen::InputRequestEvent"
218    }
219
220    fn event_type_id(&self) -> &'static str {
221        "blazen::InputRequestEvent"
222    }
223
224    fn as_any(&self) -> &dyn Any {
225        self
226    }
227
228    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
229        Box::new(self.clone())
230    }
231
232    fn to_json(&self) -> serde_json::Value {
233        serde_json::to_value(self).expect("InputRequestEvent serialization should never fail")
234    }
235}
236
237/// The human's response, injected on resume.
238#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct InputResponseEvent {
240    /// Matches the `InputRequestEvent.request_id`.
241    pub request_id: String,
242    /// The human's answer.
243    pub response: serde_json::Value,
244}
245
246impl Event for InputResponseEvent {
247    fn event_type() -> &'static str {
248        static REGISTER: std::sync::Once = std::sync::Once::new();
249        REGISTER.call_once(|| {
250            register_event_deserializer("blazen::InputResponseEvent", |value| {
251                serde_json::from_value::<InputResponseEvent>(value)
252                    .ok()
253                    .map(|e| Box::new(e) as _)
254            });
255        });
256        "blazen::InputResponseEvent"
257    }
258
259    fn event_type_id(&self) -> &'static str {
260        "blazen::InputResponseEvent"
261    }
262
263    fn as_any(&self) -> &dyn Any {
264        self
265    }
266
267    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
268        Box::new(self.clone())
269    }
270
271    fn to_json(&self) -> serde_json::Value {
272        serde_json::to_value(self).expect("InputResponseEvent serialization should never fail")
273    }
274}
275
276// ---------------------------------------------------------------------------
277// EventEnvelope
278// ---------------------------------------------------------------------------
279
280/// Wraps an event with metadata for the internal queue.
281///
282/// Each envelope carries a unique id, a timestamp, and an optional source step
283/// name so the runtime can trace event provenance.
284#[derive(Debug)]
285pub struct EventEnvelope {
286    /// The type-erased event payload.
287    pub event: Box<dyn AnyEvent>,
288    /// The name of the step that produced this event, if any.
289    pub source_step: Option<String>,
290    /// When the envelope was created.
291    pub timestamp: DateTime<Utc>,
292    /// Unique identifier for this envelope.
293    pub id: Uuid,
294}
295
296impl EventEnvelope {
297    /// Create a new envelope, automatically filling in the timestamp and id.
298    #[must_use]
299    pub fn new(event: Box<dyn AnyEvent>, source_step: Option<String>) -> Self {
300        Self {
301            event,
302            source_step,
303            timestamp: Utc::now(),
304            id: Uuid::new_v4(),
305        }
306    }
307}
308
309// ---------------------------------------------------------------------------
310// Event deserializer registry
311// ---------------------------------------------------------------------------
312
313/// Function signature for deserializing JSON into a concrete event type.
314pub type EventDeserializerFn = fn(serde_json::Value) -> Option<Box<dyn AnyEvent>>;
315
316/// Global registry mapping event type strings to deserializer functions.
317///
318/// When a workflow is resumed from a snapshot, pending events are stored as
319/// JSON. This registry allows the runtime to reconstruct concrete event types
320/// from that JSON, avoiding the `DynamicEvent` wrapper that breaks
321/// `downcast_ref`.
322static EVENT_DESERIALIZER_REGISTRY: std::sync::LazyLock<
323    dashmap::DashMap<&'static str, EventDeserializerFn>,
324> = std::sync::LazyLock::new(dashmap::DashMap::new);
325
326/// Register a deserializer function for a given event type string.
327///
328/// Typically called once per event type, guarded by [`std::sync::Once`],
329/// inside the `event_type()` method of each concrete event.
330pub fn register_event_deserializer(event_type: &'static str, deserializer: EventDeserializerFn) {
331    EVENT_DESERIALIZER_REGISTRY.insert(event_type, deserializer);
332}
333
334/// Attempt to deserialize a JSON value into a concrete event type using the
335/// registry.
336///
337/// Returns `Some(boxed_event)` if a deserializer is registered for the given
338/// event type and deserialization succeeds, or `None` otherwise.
339pub fn try_deserialize_event(
340    event_type: &str,
341    data: &serde_json::Value,
342) -> Option<Box<dyn AnyEvent>> {
343    let entry = EVENT_DESERIALIZER_REGISTRY.get(event_type)?;
344    let deserializer = *entry.value();
345    deserializer(data.clone())
346}
347
348// ---------------------------------------------------------------------------
349// Dynamic event type interning
350// ---------------------------------------------------------------------------
351
352/// Thread-safe registry that interns event type names into `&'static str`.
353///
354/// The [`Event`] trait requires `&'static str` for `event_type_id()`, but
355/// dynamic events from foreign language bindings carry runtime type names.
356/// We leak a small, bounded number of strings once and reuse them forever.
357static EVENT_TYPE_REGISTRY: std::sync::LazyLock<dashmap::DashMap<String, &'static str>> =
358    std::sync::LazyLock::new(dashmap::DashMap::new);
359
360/// Intern a dynamic event type name, returning a `&'static str`.
361///
362/// If the name has been interned before, the same pointer is returned.
363/// Otherwise the string is heap-allocated and leaked so it lives for
364/// `'static`.
365pub fn intern_event_type(name: &str) -> &'static str {
366    if let Some(entry) = EVENT_TYPE_REGISTRY.get(name) {
367        return entry.value();
368    }
369    let leaked: &'static str = Box::leak(name.to_string().into_boxed_str());
370    EVENT_TYPE_REGISTRY.insert(name.to_string(), leaked);
371    leaked
372}
373
374// ---------------------------------------------------------------------------
375// DynamicEvent
376// ---------------------------------------------------------------------------
377
378/// A type-erased event that carries its type name and payload as JSON.
379///
380/// Used to transport events defined in foreign language bindings (Python,
381/// TypeScript) through the Rust workflow engine.
382#[derive(Debug, Clone, Serialize, Deserialize)]
383pub struct DynamicEvent {
384    /// The event type identifier (e.g. `"AnalyzeEvent"`).
385    pub event_type: String,
386    /// The event data as a JSON object.
387    pub data: serde_json::Value,
388}
389
390impl Event for DynamicEvent {
391    fn event_type() -> &'static str
392    where
393        Self: Sized,
394    {
395        // This static method cannot return a dynamic string, but it is only
396        // used when you know the concrete type at compile time. For dynamic
397        // dispatch the instance method `event_type_id` is used instead.
398        "dynamic"
399    }
400
401    fn event_type_id(&self) -> &'static str {
402        intern_event_type(&self.event_type)
403    }
404
405    fn as_any(&self) -> &dyn Any {
406        self
407    }
408
409    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
410        Box::new(self.clone())
411    }
412
413    fn to_json(&self) -> serde_json::Value {
414        self.data.clone()
415    }
416}
417
418// ---------------------------------------------------------------------------
419// Tests
420// ---------------------------------------------------------------------------
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425
426    #[test]
427    fn start_event_type_id() {
428        assert_eq!(StartEvent::event_type(), "blazen::StartEvent");
429        let evt = StartEvent {
430            data: serde_json::json!({"key": "value"}),
431        };
432        assert_eq!(Event::event_type_id(&evt), "blazen::StartEvent");
433    }
434
435    #[test]
436    fn stop_event_type_id() {
437        assert_eq!(StopEvent::event_type(), "blazen::StopEvent");
438        let evt = StopEvent {
439            result: serde_json::json!(42),
440        };
441        assert_eq!(Event::event_type_id(&evt), "blazen::StopEvent");
442    }
443
444    #[test]
445    fn any_event_downcast() {
446        let evt = StartEvent {
447            data: serde_json::json!(null),
448        };
449        let boxed: Box<dyn AnyEvent> = Box::new(evt.clone());
450        let downcasted = boxed.downcast_ref::<StartEvent>().unwrap();
451        assert_eq!(downcasted.data, evt.data);
452
453        // Wrong type returns None.
454        assert!(boxed.downcast_ref::<StopEvent>().is_none());
455    }
456
457    #[test]
458    fn clone_boxed_any_event() {
459        let evt = StopEvent {
460            result: serde_json::json!("done"),
461        };
462        let boxed: Box<dyn AnyEvent> = Box::new(evt);
463        let cloned = boxed.clone();
464        assert_eq!(boxed.event_type_id(), cloned.event_type_id());
465        assert_eq!(boxed.to_json(), cloned.to_json());
466    }
467
468    #[test]
469    fn event_envelope_constructor() {
470        let evt = StartEvent {
471            data: serde_json::json!({"hello": "world"}),
472        };
473        let envelope = EventEnvelope::new(Box::new(evt), Some("my_step".to_string()));
474        assert_eq!(envelope.source_step.as_deref(), Some("my_step"));
475        assert_eq!(envelope.event.event_type_id(), "blazen::StartEvent");
476    }
477
478    #[test]
479    fn to_json_roundtrip() {
480        let start = StartEvent {
481            data: serde_json::json!({"nums": [1, 2, 3]}),
482        };
483        let json = Event::to_json(&start);
484        let deserialized: StartEvent = serde_json::from_value(json).unwrap();
485        assert_eq!(start.data, deserialized.data);
486
487        let stop = StopEvent {
488            result: serde_json::json!("ok"),
489        };
490        let json = Event::to_json(&stop);
491        let deserialized: StopEvent = serde_json::from_value(json).unwrap();
492        assert_eq!(stop.result, deserialized.result);
493    }
494
495    #[test]
496    fn intern_event_type_returns_same_pointer() {
497        let a = intern_event_type("TestEventInEvents");
498        let b = intern_event_type("TestEventInEvents");
499        assert!(std::ptr::eq(a, b));
500    }
501
502    #[test]
503    fn dynamic_event_roundtrip() {
504        let evt = DynamicEvent {
505            event_type: "MyEvent".to_owned(),
506            data: serde_json::json!({"key": "value"}),
507        };
508        let json = Event::to_json(&evt);
509        // DynamicEvent::to_json() now returns the flat data directly.
510        assert_eq!(json["key"], "value");
511    }
512
513    #[test]
514    fn dynamic_event_type_id() {
515        let evt = DynamicEvent {
516            event_type: "CustomEvent".to_owned(),
517            data: serde_json::json!({}),
518        };
519        assert_eq!(Event::event_type_id(&evt), "CustomEvent");
520    }
521
522    #[test]
523    fn input_request_event_type_id() {
524        assert_eq!(InputRequestEvent::event_type(), "blazen::InputRequestEvent");
525        let evt = InputRequestEvent {
526            request_id: "req-1".to_string(),
527            prompt: "What is your name?".to_string(),
528            metadata: serde_json::json!({"choices": ["Alice", "Bob"]}),
529        };
530        assert_eq!(Event::event_type_id(&evt), "blazen::InputRequestEvent");
531    }
532
533    #[test]
534    fn input_response_event_type_id() {
535        assert_eq!(
536            InputResponseEvent::event_type(),
537            "blazen::InputResponseEvent"
538        );
539        let evt = InputResponseEvent {
540            request_id: "req-1".to_string(),
541            response: serde_json::json!("Alice"),
542        };
543        assert_eq!(Event::event_type_id(&evt), "blazen::InputResponseEvent");
544    }
545
546    #[test]
547    fn input_request_event_roundtrip() {
548        let evt = InputRequestEvent {
549            request_id: "req-42".to_string(),
550            prompt: "Pick a number".to_string(),
551            metadata: serde_json::json!({"min": 1, "max": 100}),
552        };
553        let json = Event::to_json(&evt);
554        let deserialized: InputRequestEvent = serde_json::from_value(json).unwrap();
555        assert_eq!(evt.request_id, deserialized.request_id);
556        assert_eq!(evt.prompt, deserialized.prompt);
557        assert_eq!(evt.metadata, deserialized.metadata);
558    }
559
560    #[test]
561    fn input_response_event_roundtrip() {
562        let evt = InputResponseEvent {
563            request_id: "req-42".to_string(),
564            response: serde_json::json!(77),
565        };
566        let json = Event::to_json(&evt);
567        let deserialized: InputResponseEvent = serde_json::from_value(json).unwrap();
568        assert_eq!(evt.request_id, deserialized.request_id);
569        assert_eq!(evt.response, deserialized.response);
570    }
571
572    #[test]
573    fn input_request_event_downcast() {
574        let evt = InputRequestEvent {
575            request_id: "req-99".to_string(),
576            prompt: "Confirm?".to_string(),
577            metadata: serde_json::json!(null),
578        };
579        let boxed: Box<dyn AnyEvent> = Box::new(evt.clone());
580        let downcasted = boxed.downcast_ref::<InputRequestEvent>().unwrap();
581        assert_eq!(downcasted.request_id, evt.request_id);
582
583        // Wrong type returns None.
584        assert!(boxed.downcast_ref::<InputResponseEvent>().is_none());
585    }
586
587    #[test]
588    fn input_response_event_downcast() {
589        let evt = InputResponseEvent {
590            request_id: "req-99".to_string(),
591            response: serde_json::json!({"answer": true}),
592        };
593        let boxed: Box<dyn AnyEvent> = Box::new(evt.clone());
594        let downcasted = boxed.downcast_ref::<InputResponseEvent>().unwrap();
595        assert_eq!(downcasted.request_id, evt.request_id);
596
597        // Wrong type returns None.
598        assert!(boxed.downcast_ref::<InputRequestEvent>().is_none());
599    }
600}