Skip to main content

blazen_events/
lib.rs

1//! # `Blazen` Events
2//!
3//! Defines the core event traits and built-in event types used for
4//! inter-component communication within the `Blazen` framework.
5//!
6//! Every piece of data flowing between workflow steps is an [`Event`].
7//! Events carry a stable string identifier ([`Event::event_type`]) used for
8//! routing, serialization, and cross-language boundaries.
9//!
10//! The [`AnyEvent`] trait provides type-erased access so that the internal
11//! event queue can hold heterogeneous events without generics.
12
13use std::any::Any;
14use std::fmt::Debug;
15
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use uuid::Uuid;
19
20// ---------------------------------------------------------------------------
21// Core traits
22// ---------------------------------------------------------------------------
23
/// The core routing trait.
///
/// Every piece of data flowing between workflow steps implements `Event`.
/// Implementors must be `Send + Sync + Debug + Clone + 'static` so that
/// events can be safely transferred across threads and stored in queues.
///
/// Any implementor that is also [`Serialize`] receives [`AnyEvent`] for free
/// through the blanket impl in this module, which forwards to the instance
/// methods declared here.
pub trait Event: Send + Sync + Debug + Clone + 'static {
    /// Stable string identifier for this event type.
    ///
    /// Used for routing, serialization, and cross-language boundaries.
    /// By convention the string takes the form `"crate::TypeName"`.
    ///
    /// Requires `Self: Sized`, so it is only callable when the concrete type
    /// is known at compile time; use [`Event::event_type_id`] otherwise.
    fn event_type() -> &'static str
    where
        Self: Sized;

    /// Instance method version of [`Event::event_type`] for dynamic dispatch.
    fn event_type_id(&self) -> &'static str;

    /// Upcast to [`Any`] for type-erasure in the event queue.
    ///
    /// Implementations in this file simply return `self`, which is what makes
    /// `downcast_ref` on `dyn AnyEvent` recover the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Clone into a boxed trait object.
    fn clone_boxed(&self) -> Box<dyn AnyEvent>;

    /// Serialize to JSON for cross-language boundaries and persistence.
    #[must_use]
    fn to_json(&self) -> serde_json::Value;
}
51
/// Type-erased event for the internal event queue.
///
/// This trait mirrors the instance methods of [`Event`] but drops the
/// `Clone` and `Sized` bounds so it can be used as a trait object
/// (`Box<dyn AnyEvent>`).
///
/// It is not normally implemented by hand: the blanket impl in this module
/// covers every `T: Event + Serialize`.
pub trait AnyEvent: Send + Sync + Debug {
    /// Returns the stable string identifier for this event type.
    fn event_type_id(&self) -> &'static str;

    /// Upcast to [`Any`] for downcasting back to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Clone into a new boxed trait object.
    ///
    /// This is what backs `Clone for Box<dyn AnyEvent>`.
    fn clone_boxed(&self) -> Box<dyn AnyEvent>;

    /// Serialize to JSON for cross-language boundaries and persistence.
    #[must_use]
    fn to_json(&self) -> serde_json::Value;
}
70
71// Blanket implementation: anything that is `Event + Serialize` is `AnyEvent`.
72impl<T> AnyEvent for T
73where
74    T: Event + Serialize,
75{
76    fn event_type_id(&self) -> &'static str {
77        Event::event_type_id(self)
78    }
79
80    fn as_any(&self) -> &dyn Any {
81        Event::as_any(self)
82    }
83
84    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
85        Event::clone_boxed(self)
86    }
87
88    fn to_json(&self) -> serde_json::Value {
89        Event::to_json(self)
90    }
91}
92
93// Downcast helper on `dyn AnyEvent`.
94impl dyn AnyEvent {
95    /// Attempt to downcast the type-erased event to a concrete type `T`.
96    ///
97    /// Returns `Some(&T)` if the underlying event is of type `T`, or `None`
98    /// otherwise.
99    #[must_use]
100    pub fn downcast_ref<T: Event>(&self) -> Option<&T> {
101        self.as_any().downcast_ref::<T>()
102    }
103}
104
105// Allow cloning boxed trait objects via `clone_boxed`.
106impl Clone for Box<dyn AnyEvent> {
107    fn clone(&self) -> Self {
108        self.clone_boxed()
109    }
110}
111
112// ---------------------------------------------------------------------------
113// Built-in events
114// ---------------------------------------------------------------------------
115
/// Emitted to kick off a workflow with arbitrary JSON data.
///
/// Its [`Event`] implementation identifies it as `"blazen::StartEvent"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartEvent {
    /// Arbitrary payload passed into the workflow at start.
    pub data: serde_json::Value,
}
122
123impl Event for StartEvent {
124    fn event_type() -> &'static str {
125        static REGISTER: std::sync::Once = std::sync::Once::new();
126        REGISTER.call_once(|| {
127            register_event_deserializer("blazen::StartEvent", |value| {
128                serde_json::from_value::<StartEvent>(value)
129                    .ok()
130                    .map(|e| Box::new(e) as _)
131            });
132        });
133        "blazen::StartEvent"
134    }
135
136    fn event_type_id(&self) -> &'static str {
137        "blazen::StartEvent"
138    }
139
140    fn as_any(&self) -> &dyn Any {
141        self
142    }
143
144    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
145        Box::new(self.clone())
146    }
147
148    fn to_json(&self) -> serde_json::Value {
149        serde_json::to_value(self).expect("StartEvent serialization should never fail")
150    }
151}
152
/// Emitted to signal that a workflow has completed with a result.
///
/// Its [`Event`] implementation identifies it as `"blazen::StopEvent"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopEvent {
    /// The final result of the workflow.
    pub result: serde_json::Value,
}
159
160impl Event for StopEvent {
161    fn event_type() -> &'static str {
162        static REGISTER: std::sync::Once = std::sync::Once::new();
163        REGISTER.call_once(|| {
164            register_event_deserializer("blazen::StopEvent", |value| {
165                serde_json::from_value::<StopEvent>(value)
166                    .ok()
167                    .map(|e| Box::new(e) as _)
168            });
169        });
170        "blazen::StopEvent"
171    }
172
173    fn event_type_id(&self) -> &'static str {
174        "blazen::StopEvent"
175    }
176
177    fn as_any(&self) -> &dyn Any {
178        self
179    }
180
181    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
182        Box::new(self.clone())
183    }
184
185    fn to_json(&self) -> serde_json::Value {
186        serde_json::to_value(self).expect("StopEvent serialization should never fail")
187    }
188}
189
/// Emitted by a step to request human input. Triggers auto-pause.
///
/// Its [`Event`] implementation identifies it as
/// `"blazen::InputRequestEvent"`. The answer is expected to come back as an
/// [`InputResponseEvent`] carrying the same `request_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputRequestEvent {
    /// Unique ID for this request (for matching response).
    pub request_id: String,
    /// The question/prompt to show the human.
    pub prompt: String,
    /// Optional structured metadata (choices, type hints, etc.).
    pub metadata: serde_json::Value,
}
200
201impl Event for InputRequestEvent {
202    fn event_type() -> &'static str {
203        static REGISTER: std::sync::Once = std::sync::Once::new();
204        REGISTER.call_once(|| {
205            register_event_deserializer("blazen::InputRequestEvent", |value| {
206                serde_json::from_value::<InputRequestEvent>(value)
207                    .ok()
208                    .map(|e| Box::new(e) as _)
209            });
210        });
211        "blazen::InputRequestEvent"
212    }
213
214    fn event_type_id(&self) -> &'static str {
215        "blazen::InputRequestEvent"
216    }
217
218    fn as_any(&self) -> &dyn Any {
219        self
220    }
221
222    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
223        Box::new(self.clone())
224    }
225
226    fn to_json(&self) -> serde_json::Value {
227        serde_json::to_value(self).expect("InputRequestEvent serialization should never fail")
228    }
229}
230
/// The human's response, injected on resume.
///
/// Its [`Event`] implementation identifies it as
/// `"blazen::InputResponseEvent"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InputResponseEvent {
    /// Matches the `InputRequestEvent.request_id`.
    pub request_id: String,
    /// The human's answer.
    pub response: serde_json::Value,
}
239
240impl Event for InputResponseEvent {
241    fn event_type() -> &'static str {
242        static REGISTER: std::sync::Once = std::sync::Once::new();
243        REGISTER.call_once(|| {
244            register_event_deserializer("blazen::InputResponseEvent", |value| {
245                serde_json::from_value::<InputResponseEvent>(value)
246                    .ok()
247                    .map(|e| Box::new(e) as _)
248            });
249        });
250        "blazen::InputResponseEvent"
251    }
252
253    fn event_type_id(&self) -> &'static str {
254        "blazen::InputResponseEvent"
255    }
256
257    fn as_any(&self) -> &dyn Any {
258        self
259    }
260
261    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
262        Box::new(self.clone())
263    }
264
265    fn to_json(&self) -> serde_json::Value {
266        serde_json::to_value(self).expect("InputResponseEvent serialization should never fail")
267    }
268}
269
270// ---------------------------------------------------------------------------
271// EventEnvelope
272// ---------------------------------------------------------------------------
273
/// Wraps an event with metadata for the internal queue.
///
/// Each envelope carries a unique id, a timestamp, and an optional source step
/// name so the runtime can trace event provenance.
///
/// Use [`EventEnvelope::new`] to have the timestamp and id filled in
/// automatically.
#[derive(Debug)]
pub struct EventEnvelope {
    /// The type-erased event payload.
    pub event: Box<dyn AnyEvent>,
    /// The name of the step that produced this event, if any.
    pub source_step: Option<String>,
    /// When the envelope was created (UTC).
    pub timestamp: DateTime<Utc>,
    /// Unique identifier for this envelope.
    pub id: Uuid,
}
289
290impl EventEnvelope {
291    /// Create a new envelope, automatically filling in the timestamp and id.
292    #[must_use]
293    pub fn new(event: Box<dyn AnyEvent>, source_step: Option<String>) -> Self {
294        Self {
295            event,
296            source_step,
297            timestamp: Utc::now(),
298            id: Uuid::new_v4(),
299        }
300    }
301}
302
303// ---------------------------------------------------------------------------
304// Event deserializer registry
305// ---------------------------------------------------------------------------
306
/// Function signature for deserializing JSON into a concrete event type.
///
/// Takes the JSON value by ownership and returns the reconstructed event as a
/// boxed [`AnyEvent`], or `None` when the value cannot be turned into that
/// event type. Being a plain `fn` pointer, it cannot capture state.
pub type EventDeserializerFn = fn(serde_json::Value) -> Option<Box<dyn AnyEvent>>;
309
/// Global registry mapping event type strings to deserializer functions.
///
/// When a workflow is resumed from a snapshot, pending events are stored as
/// JSON. This registry allows the runtime to reconstruct concrete event types
/// from that JSON, avoiding the `DynamicEvent` wrapper that breaks
/// `downcast_ref`.
///
/// The map is created lazily on first access. Entries are written by
/// [`register_event_deserializer`] and read by [`try_deserialize_event`].
static EVENT_DESERIALIZER_REGISTRY: std::sync::LazyLock<
    dashmap::DashMap<&'static str, EventDeserializerFn>,
> = std::sync::LazyLock::new(dashmap::DashMap::new);
319
320/// Register a deserializer function for a given event type string.
321///
322/// Typically called once per event type, guarded by [`std::sync::Once`],
323/// inside the `event_type()` method of each concrete event.
324pub fn register_event_deserializer(event_type: &'static str, deserializer: EventDeserializerFn) {
325    EVENT_DESERIALIZER_REGISTRY.insert(event_type, deserializer);
326}
327
328/// Attempt to deserialize a JSON value into a concrete event type using the
329/// registry.
330///
331/// Returns `Some(boxed_event)` if a deserializer is registered for the given
332/// event type and deserialization succeeds, or `None` otherwise.
333pub fn try_deserialize_event(
334    event_type: &str,
335    data: &serde_json::Value,
336) -> Option<Box<dyn AnyEvent>> {
337    let entry = EVENT_DESERIALIZER_REGISTRY.get(event_type)?;
338    let deserializer = *entry.value();
339    deserializer(data.clone())
340}
341
342// ---------------------------------------------------------------------------
343// Dynamic event type interning
344// ---------------------------------------------------------------------------
345
/// Thread-safe registry that interns event type names into `&'static str`.
///
/// The [`Event`] trait requires `&'static str` for `event_type_id()`, but
/// dynamic events from foreign language bindings carry runtime type names.
/// Each distinct name is leaked once and reused forever.
///
/// NOTE(review): the leaked memory is only "small and bounded" if the set of
/// distinct event type names is itself bounded — confirm that names never
/// come from unbounded or untrusted input.
static EVENT_TYPE_REGISTRY: std::sync::LazyLock<dashmap::DashMap<String, &'static str>> =
    std::sync::LazyLock::new(dashmap::DashMap::new);
353
354/// Intern a dynamic event type name, returning a `&'static str`.
355///
356/// If the name has been interned before, the same pointer is returned.
357/// Otherwise the string is heap-allocated and leaked so it lives for
358/// `'static`.
359pub fn intern_event_type(name: &str) -> &'static str {
360    if let Some(entry) = EVENT_TYPE_REGISTRY.get(name) {
361        return entry.value();
362    }
363    let leaked: &'static str = Box::leak(name.to_string().into_boxed_str());
364    EVENT_TYPE_REGISTRY.insert(name.to_string(), leaked);
365    leaked
366}
367
368// ---------------------------------------------------------------------------
369// DynamicEvent
370// ---------------------------------------------------------------------------
371
/// A type-erased event that carries its type name and payload as JSON.
///
/// Used to transport events defined in foreign language bindings (Python,
/// TypeScript) through the Rust workflow engine.
///
/// Note that its `Event::to_json` implementation returns only `data`; the
/// `event_type` name is exposed through `event_type_id()` instead.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicEvent {
    /// The event type identifier (e.g. `"AnalyzeEvent"`).
    pub event_type: String,
    /// The event data as a JSON object.
    pub data: serde_json::Value,
}
383
384impl Event for DynamicEvent {
385    fn event_type() -> &'static str
386    where
387        Self: Sized,
388    {
389        // This static method cannot return a dynamic string, but it is only
390        // used when you know the concrete type at compile time. For dynamic
391        // dispatch the instance method `event_type_id` is used instead.
392        "dynamic"
393    }
394
395    fn event_type_id(&self) -> &'static str {
396        intern_event_type(&self.event_type)
397    }
398
399    fn as_any(&self) -> &dyn Any {
400        self
401    }
402
403    fn clone_boxed(&self) -> Box<dyn AnyEvent> {
404        Box::new(self.clone())
405    }
406
407    fn to_json(&self) -> serde_json::Value {
408        self.data.clone()
409    }
410}
411
412// ---------------------------------------------------------------------------
413// Tests
414// ---------------------------------------------------------------------------
415
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // --- type identifiers of the built-in events ---------------------------

    /// Static and instance identifiers of `StartEvent` agree.
    #[test]
    fn start_event_type_id() {
        let event = StartEvent {
            data: json!({"key": "value"}),
        };
        assert_eq!(StartEvent::event_type(), "blazen::StartEvent");
        assert_eq!(Event::event_type_id(&event), "blazen::StartEvent");
    }

    /// Static and instance identifiers of `StopEvent` agree.
    #[test]
    fn stop_event_type_id() {
        let event = StopEvent { result: json!(42) };
        assert_eq!(StopEvent::event_type(), "blazen::StopEvent");
        assert_eq!(Event::event_type_id(&event), "blazen::StopEvent");
    }

    // --- type erasure -------------------------------------------------------

    /// A boxed `StartEvent` downcasts to itself and to nothing else.
    #[test]
    fn any_event_downcast() {
        let original = StartEvent { data: json!(null) };
        let erased: Box<dyn AnyEvent> = Box::new(original.clone());

        let recovered = erased.downcast_ref::<StartEvent>().unwrap();
        assert_eq!(recovered.data, original.data);

        // Wrong type returns None.
        assert!(erased.downcast_ref::<StopEvent>().is_none());
    }

    /// Cloning a boxed trait object preserves identifier and JSON form.
    #[test]
    fn clone_boxed_any_event() {
        let erased: Box<dyn AnyEvent> = Box::new(StopEvent {
            result: json!("done"),
        });
        let copy = erased.clone();
        assert_eq!(erased.event_type_id(), copy.event_type_id());
        assert_eq!(erased.to_json(), copy.to_json());
    }

    /// `EventEnvelope::new` keeps the source step and the wrapped event.
    #[test]
    fn event_envelope_constructor() {
        let payload = StartEvent {
            data: json!({"hello": "world"}),
        };
        let wrapped = EventEnvelope::new(Box::new(payload), Some("my_step".to_string()));
        assert_eq!(wrapped.source_step.as_deref(), Some("my_step"));
        assert_eq!(wrapped.event.event_type_id(), "blazen::StartEvent");
    }

    // --- JSON round trips ---------------------------------------------------

    /// `to_json` output of start/stop events deserializes back unchanged.
    #[test]
    fn to_json_roundtrip() {
        let start = StartEvent {
            data: json!({"nums": [1, 2, 3]}),
        };
        let start_back: StartEvent = serde_json::from_value(Event::to_json(&start)).unwrap();
        assert_eq!(start.data, start_back.data);

        let stop = StopEvent {
            result: json!("ok"),
        };
        let stop_back: StopEvent = serde_json::from_value(Event::to_json(&stop)).unwrap();
        assert_eq!(stop.result, stop_back.result);
    }

    // --- dynamic events -----------------------------------------------------

    /// Interning the same name twice yields the identical pointer.
    #[test]
    fn intern_event_type_returns_same_pointer() {
        let first = intern_event_type("TestEventInEvents");
        let second = intern_event_type("TestEventInEvents");
        assert!(std::ptr::eq(first, second));
    }

    /// `DynamicEvent::to_json` exposes the payload fields at the top level.
    #[test]
    fn dynamic_event_roundtrip() {
        let event = DynamicEvent {
            event_type: "MyEvent".to_owned(),
            data: json!({"key": "value"}),
        };
        let flat = Event::to_json(&event);
        // DynamicEvent::to_json() now returns the flat data directly.
        assert_eq!(flat["key"], "value");
    }

    /// The instance identifier of a `DynamicEvent` is its runtime name.
    #[test]
    fn dynamic_event_type_id() {
        let event = DynamicEvent {
            event_type: "CustomEvent".to_owned(),
            data: json!({}),
        };
        assert_eq!(Event::event_type_id(&event), "CustomEvent");
    }

    // --- human-in-the-loop events -------------------------------------------

    /// Static and instance identifiers of `InputRequestEvent` agree.
    #[test]
    fn input_request_event_type_id() {
        let event = InputRequestEvent {
            request_id: "req-1".to_string(),
            prompt: "What is your name?".to_string(),
            metadata: json!({"choices": ["Alice", "Bob"]}),
        };
        assert_eq!(InputRequestEvent::event_type(), "blazen::InputRequestEvent");
        assert_eq!(Event::event_type_id(&event), "blazen::InputRequestEvent");
    }

    /// Static and instance identifiers of `InputResponseEvent` agree.
    #[test]
    fn input_response_event_type_id() {
        let event = InputResponseEvent {
            request_id: "req-1".to_string(),
            response: json!("Alice"),
        };
        assert_eq!(
            InputResponseEvent::event_type(),
            "blazen::InputResponseEvent"
        );
        assert_eq!(Event::event_type_id(&event), "blazen::InputResponseEvent");
    }

    /// Every field of `InputRequestEvent` survives a JSON round trip.
    #[test]
    fn input_request_event_roundtrip() {
        let sent = InputRequestEvent {
            request_id: "req-42".to_string(),
            prompt: "Pick a number".to_string(),
            metadata: json!({"min": 1, "max": 100}),
        };
        let received: InputRequestEvent = serde_json::from_value(Event::to_json(&sent)).unwrap();
        assert_eq!(sent.request_id, received.request_id);
        assert_eq!(sent.prompt, received.prompt);
        assert_eq!(sent.metadata, received.metadata);
    }

    /// Every field of `InputResponseEvent` survives a JSON round trip.
    #[test]
    fn input_response_event_roundtrip() {
        let sent = InputResponseEvent {
            request_id: "req-42".to_string(),
            response: json!(77),
        };
        let received: InputResponseEvent = serde_json::from_value(Event::to_json(&sent)).unwrap();
        assert_eq!(sent.request_id, received.request_id);
        assert_eq!(sent.response, received.response);
    }

    /// A boxed `InputRequestEvent` downcasts to itself and to nothing else.
    #[test]
    fn input_request_event_downcast() {
        let original = InputRequestEvent {
            request_id: "req-99".to_string(),
            prompt: "Confirm?".to_string(),
            metadata: json!(null),
        };
        let erased: Box<dyn AnyEvent> = Box::new(original.clone());

        let recovered = erased.downcast_ref::<InputRequestEvent>().unwrap();
        assert_eq!(recovered.request_id, original.request_id);

        // Wrong type returns None.
        assert!(erased.downcast_ref::<InputResponseEvent>().is_none());
    }

    /// A boxed `InputResponseEvent` downcasts to itself and to nothing else.
    #[test]
    fn input_response_event_downcast() {
        let original = InputResponseEvent {
            request_id: "req-99".to_string(),
            response: json!({"answer": true}),
        };
        let erased: Box<dyn AnyEvent> = Box::new(original.clone());

        let recovered = erased.downcast_ref::<InputResponseEvent>().unwrap();
        assert_eq!(recovered.request_id, original.request_id);

        // Wrong type returns None.
        assert!(erased.downcast_ref::<InputRequestEvent>().is_none());
    }
}