Skip to main content

stormchaser_model/
events.rs

1use crate::id::{RunId, StepInstanceId};
2use chrono::{DateTime, Utc};
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7
8use nutype::nutype;
9
10#[nutype(derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema))]
11pub struct SchemaVersion(String);
12
13#[nutype(derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema))]
14pub struct SchemaId(String);
15
16#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
17pub enum EventSource {
18    System,
19    Api,
20    Engine,
21}
22
23impl EventSource {
24    pub fn as_str(&self) -> &str {
25        match self {
26            EventSource::System => "/stormchaser",
27            EventSource::Api => "/stormchaser/api",
28            EventSource::Engine => "stormchaser-engine",
29        }
30    }
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
34pub enum WorkflowEventType {
35    Queued,
36    StartPending,
37    Running,
38    Completed,
39    Failed,
40    Aborted,
41}
42
43impl WorkflowEventType {
44    pub fn as_str(&self) -> &str {
45        match self {
46            WorkflowEventType::Queued => "WorkflowQueuedEvent",
47            WorkflowEventType::StartPending => "WorkflowStartPendingEvent",
48            WorkflowEventType::Running => "WorkflowRunningEvent",
49            WorkflowEventType::Completed => "WorkflowCompletedEvent",
50            WorkflowEventType::Failed => "WorkflowFailedEvent",
51            WorkflowEventType::Aborted => "WorkflowAbortedEvent",
52        }
53    }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
57pub enum StepEventType {
58    Scheduled,
59    Running,
60    Completed,
61    Failed,
62    Query,
63    QueryResponse,
64}
65
66impl StepEventType {
67    pub fn as_str(&self) -> &str {
68        match self {
69            StepEventType::Scheduled => "StepScheduledEvent",
70            StepEventType::Running => "StepRunningEvent",
71            StepEventType::Completed => "StepCompletedEvent",
72            StepEventType::Failed => "StepFailedEvent",
73            StepEventType::Query => "StepQueryEvent",
74            StepEventType::QueryResponse => "StepQueryResponseEvent",
75        }
76    }
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
80pub enum RunnerEventType {
81    Register,
82    Heartbeat,
83    Offline,
84}
85
86impl RunnerEventType {
87    pub fn as_str(&self) -> &str {
88        match self {
89            RunnerEventType::Register => "RunnerRegisterEvent",
90            RunnerEventType::Heartbeat => "RunnerHeartbeatEvent",
91            RunnerEventType::Offline => "RunnerOfflineEvent",
92        }
93    }
94}
95
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum EventType {
98    Workflow(WorkflowEventType),
99    Step(StepEventType),
100    Runner(RunnerEventType),
101}
102
103impl EventType {
104    pub fn as_str(&self) -> &str {
105        match self {
106            EventType::Workflow(w) => w.as_str(),
107            EventType::Step(s) => s.as_str(),
108            EventType::Runner(r) => r.as_str(),
109        }
110    }
111}
112
113impl Serialize for EventType {
114    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
115    where
116        S: serde::Serializer,
117    {
118        serializer.serialize_str(self.as_str())
119    }
120}
121
122impl<'de> Deserialize<'de> for EventType {
123    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
124    where
125        D: serde::Deserializer<'de>,
126    {
127        let s = String::deserialize(deserializer)?;
128        match s.as_str() {
129            "WorkflowQueuedEvent" => Ok(EventType::Workflow(WorkflowEventType::Queued)),
130            "WorkflowStartPendingEvent" => Ok(EventType::Workflow(WorkflowEventType::StartPending)),
131            "WorkflowRunningEvent" => Ok(EventType::Workflow(WorkflowEventType::Running)),
132            "WorkflowCompletedEvent" => Ok(EventType::Workflow(WorkflowEventType::Completed)),
133            "WorkflowFailedEvent" => Ok(EventType::Workflow(WorkflowEventType::Failed)),
134            "WorkflowAbortedEvent" => Ok(EventType::Workflow(WorkflowEventType::Aborted)),
135            "StepScheduledEvent" => Ok(EventType::Step(StepEventType::Scheduled)),
136            "StepRunningEvent" => Ok(EventType::Step(StepEventType::Running)),
137            "StepCompletedEvent" => Ok(EventType::Step(StepEventType::Completed)),
138            "StepFailedEvent" => Ok(EventType::Step(StepEventType::Failed)),
139            "StepQueryEvent" => Ok(EventType::Step(StepEventType::Query)),
140            "StepQueryResponseEvent" => Ok(EventType::Step(StepEventType::QueryResponse)),
141            "RunnerRegisterEvent" => Ok(EventType::Runner(RunnerEventType::Register)),
142            "RunnerHeartbeatEvent" => Ok(EventType::Runner(RunnerEventType::Heartbeat)),
143            "RunnerOfflineEvent" => Ok(EventType::Runner(RunnerEventType::Offline)),
144            _ => Err(serde::de::Error::custom(format!(
145                "Unknown EventType: {}",
146                s
147            ))),
148        }
149    }
150}
151
152impl schemars::JsonSchema for EventType {
153    fn schema_name() -> String {
154        "EventType".to_string()
155    }
156
157    fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
158        String::json_schema(gen)
159    }
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
163pub struct WorkflowQueuedEvent {
164    pub run_id: RunId,
165    pub event_type: EventType,
166    pub timestamp: DateTime<Utc>,
167    #[serde(skip_serializing_if = "Option::is_none")]
168    pub dsl: Option<String>,
169    #[serde(skip_serializing_if = "Option::is_none")]
170    pub inputs: Option<HashMap<String, Value>>,
171    #[serde(skip_serializing_if = "Option::is_none")]
172    pub initiating_user: Option<String>,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
176pub struct WorkflowStartPendingEvent {
177    pub run_id: RunId,
178    pub event_type: EventType,
179    pub timestamp: DateTime<Utc>,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
183pub struct WorkflowRunningEvent {
184    pub run_id: RunId,
185    pub event_type: EventType,
186    pub timestamp: DateTime<Utc>,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
190pub struct WorkflowCompletedEvent {
191    pub run_id: RunId,
192    pub event_type: EventType,
193    pub timestamp: DateTime<Utc>,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
197pub struct WorkflowFailedEvent {
198    pub run_id: RunId,
199    pub event_type: EventType,
200    pub timestamp: DateTime<Utc>,
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
204pub struct WorkflowAbortedEvent {
205    pub run_id: RunId,
206    pub event_type: EventType,
207    pub timestamp: DateTime<Utc>,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
211pub struct StepScheduledEvent {
212    pub run_id: RunId,
213    pub step_id: StepInstanceId,
214    #[serde(skip_serializing_if = "Option::is_none")]
215    pub step_name: Option<String>,
216    #[serde(skip_serializing_if = "Option::is_none")]
217    pub step_type: Option<String>,
218    pub event_type: EventType,
219    pub step_dsl: Value,
220    #[serde(skip_serializing_if = "Option::is_none")]
221    pub spec: Option<Value>,
222    #[serde(skip_serializing_if = "Option::is_none")]
223    pub params: Option<Value>,
224    #[serde(skip_serializing_if = "Option::is_none")]
225    pub storage: Option<HashMap<String, Value>>,
226    #[serde(skip_serializing_if = "Option::is_none")]
227    pub test_report_urls: Option<HashMap<String, Value>>,
228    pub timestamp: DateTime<Utc>,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
232pub struct StepRunningEvent {
233    pub run_id: RunId,
234    pub step_id: StepInstanceId,
235    pub event_type: EventType,
236    #[serde(skip_serializing_if = "Option::is_none")]
237    pub runner_id: Option<String>,
238    pub timestamp: DateTime<Utc>,
239}
240
241#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
242pub struct StepCompletedEvent {
243    pub run_id: RunId,
244    pub step_id: StepInstanceId,
245    pub event_type: EventType,
246    #[serde(skip_serializing_if = "Option::is_none")]
247    pub runner_id: Option<String>,
248    #[serde(skip_serializing_if = "Option::is_none")]
249    pub storage_hashes: Option<HashMap<String, Value>>,
250    #[serde(skip_serializing_if = "Option::is_none")]
251    pub artifacts: Option<HashMap<String, Value>>,
252    #[serde(skip_serializing_if = "Option::is_none")]
253    pub test_reports: Option<HashMap<String, Value>>,
254    #[serde(skip_serializing_if = "Option::is_none")]
255    pub outputs: Option<HashMap<String, Value>>,
256    #[serde(skip_serializing_if = "Option::is_none")]
257    pub exit_code: Option<i32>,
258    pub timestamp: DateTime<Utc>,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
262pub struct StepFailedEvent {
263    pub run_id: RunId,
264    pub step_id: StepInstanceId,
265    pub event_type: EventType,
266    pub error: String,
267    #[serde(skip_serializing_if = "Option::is_none")]
268    pub exit_code: Option<i32>,
269    #[serde(skip_serializing_if = "Option::is_none")]
270    pub runner_id: Option<String>,
271    #[serde(skip_serializing_if = "Option::is_none")]
272    pub storage_hashes: Option<HashMap<String, Value>>,
273    #[serde(skip_serializing_if = "Option::is_none")]
274    pub artifacts: Option<HashMap<String, Value>>,
275    #[serde(skip_serializing_if = "Option::is_none")]
276    pub test_reports: Option<HashMap<String, Value>>,
277    #[serde(skip_serializing_if = "Option::is_none")]
278    pub outputs: Option<HashMap<String, Value>>,
279    pub timestamp: DateTime<Utc>,
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
283pub struct StepQueryEvent {
284    pub step_id: StepInstanceId,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
288pub struct StepQueryResponseEvent {
289    pub step_id: StepInstanceId,
290    pub exists: bool,
291    #[serde(skip_serializing_if = "Option::is_none")]
292    pub status: Option<crate::step::StepStatus>,
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
296pub struct RunnerStepTypeSchema {
297    pub step_type: String,
298    #[serde(skip_serializing_if = "Option::is_none")]
299    pub schema: Option<serde_json::Value>,
300    #[serde(skip_serializing_if = "Option::is_none")]
301    pub documentation: Option<String>,
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
305pub struct RunnerRegisterEvent {
306    pub runner_id: String,
307    pub runner_type: String,
308    pub protocol_version: String,
309    pub nats_subject: String,
310    pub capabilities: Vec<String>,
311    pub step_types: Vec<RunnerStepTypeSchema>,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
315pub struct RunnerHeartbeatEvent {
316    pub runner_id: String,
317    pub version: String,
318    pub state: crate::runner::RunnerStatus,
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
322pub struct RunnerOfflineEvent {
323    pub runner_id: String,
324    pub event_type: EventType,
325    pub timestamp: DateTime<Utc>,
326}
327
328#[cfg(test)]
329mod tests {
330    use super::*;
331    use uuid::Uuid;
332
333    #[test]
334    fn test_runner_register_event_deserializes_from_wire_format() {
335        let wire_json = serde_json::json!({
336            "runner_id": "runner-abc-123",
337            "runner_type": "docker",
338            "protocol_version": "v1",
339            "nats_subject": "stormchaser.v1.runner.docker.runner-abc-123",
340            "capabilities": ["docker", "linux", "container"],
341            "step_types": [
342                {
343                    "step_type": "RunContainer",
344                    "documentation": "Runs a container using Docker."
345                }
346            ]
347        });
348
349        let event: RunnerRegisterEvent = serde_json::from_value(wire_json).unwrap();
350        assert_eq!(event.runner_id, "runner-abc-123");
351        assert_eq!(event.runner_type, "docker");
352        assert_eq!(event.protocol_version, "v1");
353        assert_eq!(
354            event.nats_subject,
355            "stormchaser.v1.runner.docker.runner-abc-123"
356        );
357        assert_eq!(event.capabilities, vec!["docker", "linux", "container"]);
358        assert_eq!(event.step_types.len(), 1);
359        assert_eq!(event.step_types[0].step_type, "RunContainer");
360        assert!(event.step_types[0].schema.is_none());
361    }
362
363    #[test]
364    fn test_runner_register_event_serializes_correctly() {
365        let event = RunnerRegisterEvent {
366            runner_id: "runner-xyz".to_string(),
367            runner_type: "k8s".to_string(),
368            protocol_version: "v1".to_string(),
369            nats_subject: "stormchaser.v1.runner.k8s.runner-xyz".to_string(),
370            capabilities: vec!["k8s".to_string()],
371            step_types: vec![RunnerStepTypeSchema {
372                step_type: "RunK8sJob".to_string(),
373                schema: None,
374                documentation: Some("Runs a Kubernetes Job.".to_string()),
375            }],
376        };
377
378        let json = serde_json::to_value(&event).unwrap();
379        assert_eq!(json["runner_id"], "runner-xyz");
380        assert_eq!(json["runner_type"], "k8s");
381        assert_eq!(json["protocol_version"], "v1");
382        assert_eq!(json["step_types"][0]["step_type"], "RunK8sJob");
383        // schema is None, so it should not appear in the output
384        assert!(json["step_types"][0].get("schema").is_none());
385    }
386
387    #[test]
388    fn test_step_query_event_has_only_step_id() {
389        let event = StepQueryEvent {
390            step_id: StepInstanceId::new(Uuid::nil()),
391        };
392        let json = serde_json::to_value(&event).unwrap();
393        assert!(json.get("step_id").is_some());
394        assert!(json.get("run_id").is_none());
395    }
396}