Skip to main content

stormchaser_model/
events.rs

1use crate::id::{RunId, StepInstanceId};
2use chrono::{DateTime, Utc};
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7
8use nutype::nutype;
9
10#[nutype(derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema))]
11pub struct SchemaVersion(String);
12
13#[nutype(derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema))]
14pub struct SchemaId(String);
15
16#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
17pub enum EventSource {
18    System,
19    Api,
20    Engine,
21}
22
23impl EventSource {
24    pub fn as_str(&self) -> &str {
25        match self {
26            EventSource::System => "/stormchaser",
27            EventSource::Api => "/stormchaser/api",
28            EventSource::Engine => "stormchaser-engine",
29        }
30    }
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
34pub enum WorkflowEventType {
35    Queued,
36    StartPending,
37    Running,
38    Completed,
39    Failed,
40    Aborted,
41}
42
43impl WorkflowEventType {
44    pub fn as_str(&self) -> &str {
45        match self {
46            WorkflowEventType::Queued => "WorkflowQueuedEvent",
47            WorkflowEventType::StartPending => "WorkflowStartPendingEvent",
48            WorkflowEventType::Running => "WorkflowRunningEvent",
49            WorkflowEventType::Completed => "WorkflowCompletedEvent",
50            WorkflowEventType::Failed => "WorkflowFailedEvent",
51            WorkflowEventType::Aborted => "WorkflowAbortedEvent",
52        }
53    }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
57pub enum StepEventType {
58    Scheduled,
59    Running,
60    Completed,
61    Failed,
62    Query,
63    QueryResponse,
64}
65
66impl StepEventType {
67    pub fn as_str(&self) -> &str {
68        match self {
69            StepEventType::Scheduled => "StepScheduledEvent",
70            StepEventType::Running => "StepRunningEvent",
71            StepEventType::Completed => "StepCompletedEvent",
72            StepEventType::Failed => "StepFailedEvent",
73            StepEventType::Query => "StepQueryEvent",
74            StepEventType::QueryResponse => "StepQueryResponseEvent",
75        }
76    }
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
80pub enum RunnerEventType {
81    Register,
82    Heartbeat,
83    Offline,
84}
85
86impl RunnerEventType {
87    pub fn as_str(&self) -> &str {
88        match self {
89            RunnerEventType::Register => "RunnerRegisterEvent",
90            RunnerEventType::Heartbeat => "RunnerHeartbeatEvent",
91            RunnerEventType::Offline => "RunnerOfflineEvent",
92        }
93    }
94}
95
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum EventType {
98    Workflow(WorkflowEventType),
99    Step(StepEventType),
100    Runner(RunnerEventType),
101}
102
103impl EventType {
104    pub fn as_str(&self) -> &str {
105        match self {
106            EventType::Workflow(w) => w.as_str(),
107            EventType::Step(s) => s.as_str(),
108            EventType::Runner(r) => r.as_str(),
109        }
110    }
111}
112
113impl Serialize for EventType {
114    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
115    where
116        S: serde::Serializer,
117    {
118        serializer.serialize_str(self.as_str())
119    }
120}
121
122impl<'de> Deserialize<'de> for EventType {
123    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
124    where
125        D: serde::Deserializer<'de>,
126    {
127        let s = String::deserialize(deserializer)?;
128        match s.as_str() {
129            "WorkflowQueuedEvent" => Ok(EventType::Workflow(WorkflowEventType::Queued)),
130            "WorkflowStartPendingEvent" => Ok(EventType::Workflow(WorkflowEventType::StartPending)),
131            "WorkflowRunningEvent" => Ok(EventType::Workflow(WorkflowEventType::Running)),
132            "WorkflowCompletedEvent" => Ok(EventType::Workflow(WorkflowEventType::Completed)),
133            "WorkflowFailedEvent" => Ok(EventType::Workflow(WorkflowEventType::Failed)),
134            "WorkflowAbortedEvent" => Ok(EventType::Workflow(WorkflowEventType::Aborted)),
135            "StepScheduledEvent" => Ok(EventType::Step(StepEventType::Scheduled)),
136            "StepRunningEvent" => Ok(EventType::Step(StepEventType::Running)),
137            "StepCompletedEvent" => Ok(EventType::Step(StepEventType::Completed)),
138            "StepFailedEvent" => Ok(EventType::Step(StepEventType::Failed)),
139            "StepQueryEvent" => Ok(EventType::Step(StepEventType::Query)),
140            "StepQueryResponseEvent" => Ok(EventType::Step(StepEventType::QueryResponse)),
141            "RunnerRegisterEvent" => Ok(EventType::Runner(RunnerEventType::Register)),
142            "RunnerHeartbeatEvent" => Ok(EventType::Runner(RunnerEventType::Heartbeat)),
143            "RunnerOfflineEvent" => Ok(EventType::Runner(RunnerEventType::Offline)),
144            _ => Err(serde::de::Error::custom(format!(
145                "Unknown EventType: {}",
146                s
147            ))),
148        }
149    }
150}
151
152impl schemars::JsonSchema for EventType {
153    fn schema_name() -> String {
154        "EventType".to_string()
155    }
156
157    fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
158        String::json_schema(gen)
159    }
160}
161
162use crate::workflow::RunStatus;
163
164#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
165pub struct WorkflowQueuedEvent {
166    pub run_id: RunId,
167    pub event_type: EventType,
168    pub timestamp: DateTime<Utc>,
169    pub status: RunStatus,
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub dsl: Option<String>,
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub inputs: Option<HashMap<String, Value>>,
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub initiating_user: Option<String>,
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub sops_file: Option<String>,
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub sops_role_arn: Option<String>,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
183pub struct WorkflowStartPendingEvent {
184    pub run_id: RunId,
185    pub event_type: EventType,
186    pub timestamp: DateTime<Utc>,
187    pub status: RunStatus,
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
191pub struct WorkflowRunningEvent {
192    pub run_id: RunId,
193    pub event_type: EventType,
194    pub timestamp: DateTime<Utc>,
195    pub status: RunStatus,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
199pub struct WorkflowCompletedEvent {
200    pub run_id: RunId,
201    pub event_type: EventType,
202    pub timestamp: DateTime<Utc>,
203    pub status: RunStatus,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
207pub struct WorkflowFailedEvent {
208    pub run_id: RunId,
209    pub event_type: EventType,
210    pub timestamp: DateTime<Utc>,
211    pub status: RunStatus,
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
215pub struct WorkflowAbortedEvent {
216    pub run_id: RunId,
217    pub event_type: EventType,
218    pub timestamp: DateTime<Utc>,
219    pub status: RunStatus,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
223pub struct StepScheduledEvent {
224    pub run_id: RunId,
225    pub step_id: StepInstanceId,
226    pub fencing_token: i64,
227    #[serde(skip_serializing_if = "Option::is_none")]
228    pub step_name: Option<String>,
229    #[serde(skip_serializing_if = "Option::is_none")]
230    pub step_type: Option<String>,
231    pub event_type: EventType,
232    pub step_dsl: Value,
233    #[serde(skip_serializing_if = "Option::is_none")]
234    pub spec: Option<Value>,
235    #[serde(skip_serializing_if = "Option::is_none")]
236    pub params: Option<Value>,
237    #[serde(skip_serializing_if = "Option::is_none")]
238    pub storage: Option<HashMap<String, Value>>,
239    #[serde(skip_serializing_if = "Option::is_none")]
240    pub test_report_urls: Option<HashMap<String, Value>>,
241    #[serde(skip_serializing_if = "Option::is_none")]
242    pub registry_auth: Option<Value>,
243    pub timestamp: DateTime<Utc>,
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
247pub struct StepRunningEvent {
248    pub run_id: RunId,
249    pub step_id: StepInstanceId,
250    pub event_type: EventType,
251    #[serde(skip_serializing_if = "Option::is_none")]
252    pub runner_id: Option<String>,
253    pub timestamp: DateTime<Utc>,
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
257pub struct StepCompletedEvent {
258    pub run_id: RunId,
259    pub step_id: StepInstanceId,
260    pub fencing_token: i64,
261    pub event_type: EventType,
262    #[serde(skip_serializing_if = "Option::is_none")]
263    pub runner_id: Option<String>,
264    #[serde(skip_serializing_if = "Option::is_none")]
265    pub storage_hashes: Option<HashMap<String, Value>>,
266    #[serde(skip_serializing_if = "Option::is_none")]
267    pub artifacts: Option<HashMap<String, Value>>,
268    #[serde(skip_serializing_if = "Option::is_none")]
269    pub test_reports: Option<HashMap<String, Value>>,
270    #[serde(skip_serializing_if = "Option::is_none")]
271    pub outputs: Option<HashMap<String, Value>>,
272    #[serde(skip_serializing_if = "Option::is_none")]
273    pub exit_code: Option<i32>,
274    pub timestamp: DateTime<Utc>,
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
278pub struct StepFailedEvent {
279    pub run_id: RunId,
280    pub step_id: StepInstanceId,
281    pub fencing_token: i64,
282    pub event_type: EventType,
283    pub error: String,
284    #[serde(skip_serializing_if = "Option::is_none")]
285    pub exit_code: Option<i32>,
286    #[serde(skip_serializing_if = "Option::is_none")]
287    pub runner_id: Option<String>,
288    #[serde(skip_serializing_if = "Option::is_none")]
289    pub storage_hashes: Option<HashMap<String, Value>>,
290    #[serde(skip_serializing_if = "Option::is_none")]
291    pub artifacts: Option<HashMap<String, Value>>,
292    #[serde(skip_serializing_if = "Option::is_none")]
293    pub test_reports: Option<HashMap<String, Value>>,
294    #[serde(skip_serializing_if = "Option::is_none")]
295    pub outputs: Option<HashMap<String, Value>>,
296    pub timestamp: DateTime<Utc>,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
300pub struct StepQueryEvent {
301    pub step_id: StepInstanceId,
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
305pub struct StepQueryResponseEvent {
306    pub step_id: StepInstanceId,
307    pub exists: bool,
308    #[serde(skip_serializing_if = "Option::is_none")]
309    pub status: Option<crate::step::StepStatus>,
310}
311
312#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
313pub struct RunnerStepTypeSchema {
314    pub step_type: String,
315    #[serde(skip_serializing_if = "Option::is_none")]
316    pub schema: Option<serde_json::Value>,
317    #[serde(skip_serializing_if = "Option::is_none")]
318    pub documentation: Option<String>,
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
322pub struct RunnerRegisterEvent {
323    pub runner_id: String,
324    pub runner_type: String,
325    pub protocol_version: String,
326    pub nats_subject: String,
327    pub capabilities: Vec<String>,
328    pub step_types: Vec<RunnerStepTypeSchema>,
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
332pub struct RunnerHeartbeatEvent {
333    pub runner_id: String,
334    pub version: String,
335    pub state: crate::runner::RunnerStatus,
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
339pub struct RunnerOfflineEvent {
340    pub runner_id: String,
341    pub event_type: EventType,
342    pub timestamp: DateTime<Utc>,
343}
344
345#[cfg(test)]
346mod tests {
347    use super::*;
348    use uuid::Uuid;
349
350    #[test]
351    fn test_runner_register_event_deserializes_from_wire_format() {
352        let wire_json = serde_json::json!({
353            "runner_id": "runner-abc-123",
354            "runner_type": "docker",
355            "protocol_version": "v1",
356            "nats_subject": "stormchaser.v1.runner.docker.runner-abc-123",
357            "capabilities": ["docker", "linux", "container"],
358            "step_types": [
359                {
360                    "step_type": "RunContainer",
361                    "documentation": "Runs a container using Docker."
362                }
363            ]
364        });
365
366        let event: RunnerRegisterEvent = serde_json::from_value(wire_json).unwrap();
367        assert_eq!(event.runner_id, "runner-abc-123");
368        assert_eq!(event.runner_type, "docker");
369        assert_eq!(event.protocol_version, "v1");
370        assert_eq!(
371            event.nats_subject,
372            "stormchaser.v1.runner.docker.runner-abc-123"
373        );
374        assert_eq!(event.capabilities, vec!["docker", "linux", "container"]);
375        assert_eq!(event.step_types.len(), 1);
376        assert_eq!(event.step_types[0].step_type, "RunContainer");
377        assert!(event.step_types[0].schema.is_none());
378    }
379
380    #[test]
381    fn test_runner_register_event_serializes_correctly() {
382        let event = RunnerRegisterEvent {
383            runner_id: "runner-xyz".to_string(),
384            runner_type: "k8s".to_string(),
385            protocol_version: "v1".to_string(),
386            nats_subject: "stormchaser.v1.runner.k8s.runner-xyz".to_string(),
387            capabilities: vec!["k8s".to_string()],
388            step_types: vec![RunnerStepTypeSchema {
389                step_type: "RunK8sJob".to_string(),
390                schema: None,
391                documentation: Some("Runs a Kubernetes Job.".to_string()),
392            }],
393        };
394
395        let json = serde_json::to_value(&event).unwrap();
396        assert_eq!(json["runner_id"], "runner-xyz");
397        assert_eq!(json["runner_type"], "k8s");
398        assert_eq!(json["protocol_version"], "v1");
399        assert_eq!(json["step_types"][0]["step_type"], "RunK8sJob");
400        // schema is None, so it should not appear in the output
401        assert!(json["step_types"][0].get("schema").is_none());
402    }
403
404    #[test]
405    fn test_step_query_event_has_only_step_id() {
406        let event = StepQueryEvent {
407            step_id: StepInstanceId::new(Uuid::nil()),
408        };
409        let json = serde_json::to_value(&event).unwrap();
410        assert!(json.get("step_id").is_some());
411        assert!(json.get("run_id").is_none());
412    }
413}