1use crate::id::{RunId, StepInstanceId};
2use chrono::{DateTime, Utc};
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7
8use nutype::nutype;
9
10#[nutype(derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema))]
11pub struct SchemaVersion(String);
12
13#[nutype(derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema))]
14pub struct SchemaId(String);
15
16#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
17pub enum EventSource {
18 System,
19 Api,
20 Engine,
21}
22
23impl EventSource {
24 pub fn as_str(&self) -> &str {
25 match self {
26 EventSource::System => "/stormchaser",
27 EventSource::Api => "/stormchaser/api",
28 EventSource::Engine => "stormchaser-engine",
29 }
30 }
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
34pub enum WorkflowEventType {
35 Queued,
36 StartPending,
37 Running,
38 Completed,
39 Failed,
40 Aborted,
41}
42
43impl WorkflowEventType {
44 pub fn as_str(&self) -> &str {
45 match self {
46 WorkflowEventType::Queued => "WorkflowQueuedEvent",
47 WorkflowEventType::StartPending => "WorkflowStartPendingEvent",
48 WorkflowEventType::Running => "WorkflowRunningEvent",
49 WorkflowEventType::Completed => "WorkflowCompletedEvent",
50 WorkflowEventType::Failed => "WorkflowFailedEvent",
51 WorkflowEventType::Aborted => "WorkflowAbortedEvent",
52 }
53 }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
57pub enum StepEventType {
58 Scheduled,
59 Running,
60 Completed,
61 Failed,
62 Query,
63 QueryResponse,
64}
65
66impl StepEventType {
67 pub fn as_str(&self) -> &str {
68 match self {
69 StepEventType::Scheduled => "StepScheduledEvent",
70 StepEventType::Running => "StepRunningEvent",
71 StepEventType::Completed => "StepCompletedEvent",
72 StepEventType::Failed => "StepFailedEvent",
73 StepEventType::Query => "StepQueryEvent",
74 StepEventType::QueryResponse => "StepQueryResponseEvent",
75 }
76 }
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
80pub enum RunnerEventType {
81 Register,
82 Heartbeat,
83 Offline,
84}
85
86impl RunnerEventType {
87 pub fn as_str(&self) -> &str {
88 match self {
89 RunnerEventType::Register => "RunnerRegisterEvent",
90 RunnerEventType::Heartbeat => "RunnerHeartbeatEvent",
91 RunnerEventType::Offline => "RunnerOfflineEvent",
92 }
93 }
94}
95
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum EventType {
98 Workflow(WorkflowEventType),
99 Step(StepEventType),
100 Runner(RunnerEventType),
101}
102
103impl EventType {
104 pub fn as_str(&self) -> &str {
105 match self {
106 EventType::Workflow(w) => w.as_str(),
107 EventType::Step(s) => s.as_str(),
108 EventType::Runner(r) => r.as_str(),
109 }
110 }
111}
112
113impl Serialize for EventType {
114 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
115 where
116 S: serde::Serializer,
117 {
118 serializer.serialize_str(self.as_str())
119 }
120}
121
122impl<'de> Deserialize<'de> for EventType {
123 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
124 where
125 D: serde::Deserializer<'de>,
126 {
127 let s = String::deserialize(deserializer)?;
128 match s.as_str() {
129 "WorkflowQueuedEvent" => Ok(EventType::Workflow(WorkflowEventType::Queued)),
130 "WorkflowStartPendingEvent" => Ok(EventType::Workflow(WorkflowEventType::StartPending)),
131 "WorkflowRunningEvent" => Ok(EventType::Workflow(WorkflowEventType::Running)),
132 "WorkflowCompletedEvent" => Ok(EventType::Workflow(WorkflowEventType::Completed)),
133 "WorkflowFailedEvent" => Ok(EventType::Workflow(WorkflowEventType::Failed)),
134 "WorkflowAbortedEvent" => Ok(EventType::Workflow(WorkflowEventType::Aborted)),
135 "StepScheduledEvent" => Ok(EventType::Step(StepEventType::Scheduled)),
136 "StepRunningEvent" => Ok(EventType::Step(StepEventType::Running)),
137 "StepCompletedEvent" => Ok(EventType::Step(StepEventType::Completed)),
138 "StepFailedEvent" => Ok(EventType::Step(StepEventType::Failed)),
139 "StepQueryEvent" => Ok(EventType::Step(StepEventType::Query)),
140 "StepQueryResponseEvent" => Ok(EventType::Step(StepEventType::QueryResponse)),
141 "RunnerRegisterEvent" => Ok(EventType::Runner(RunnerEventType::Register)),
142 "RunnerHeartbeatEvent" => Ok(EventType::Runner(RunnerEventType::Heartbeat)),
143 "RunnerOfflineEvent" => Ok(EventType::Runner(RunnerEventType::Offline)),
144 _ => Err(serde::de::Error::custom(format!(
145 "Unknown EventType: {}",
146 s
147 ))),
148 }
149 }
150}
151
152impl schemars::JsonSchema for EventType {
153 fn schema_name() -> String {
154 "EventType".to_string()
155 }
156
157 fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
158 String::json_schema(gen)
159 }
160}
161
162use crate::workflow::RunStatus;
163
164#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
165pub struct WorkflowQueuedEvent {
166 pub run_id: RunId,
167 pub event_type: EventType,
168 pub timestamp: DateTime<Utc>,
169 pub status: RunStatus,
170 #[serde(skip_serializing_if = "Option::is_none")]
171 pub dsl: Option<String>,
172 #[serde(skip_serializing_if = "Option::is_none")]
173 pub inputs: Option<HashMap<String, Value>>,
174 #[serde(skip_serializing_if = "Option::is_none")]
175 pub initiating_user: Option<String>,
176 #[serde(skip_serializing_if = "Option::is_none")]
177 pub sops_file: Option<String>,
178 #[serde(skip_serializing_if = "Option::is_none")]
179 pub sops_role_arn: Option<String>,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
183pub struct WorkflowStartPendingEvent {
184 pub run_id: RunId,
185 pub event_type: EventType,
186 pub timestamp: DateTime<Utc>,
187 pub status: RunStatus,
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
191pub struct WorkflowRunningEvent {
192 pub run_id: RunId,
193 pub event_type: EventType,
194 pub timestamp: DateTime<Utc>,
195 pub status: RunStatus,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
199pub struct WorkflowCompletedEvent {
200 pub run_id: RunId,
201 pub event_type: EventType,
202 pub timestamp: DateTime<Utc>,
203 pub status: RunStatus,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
207pub struct WorkflowFailedEvent {
208 pub run_id: RunId,
209 pub event_type: EventType,
210 pub timestamp: DateTime<Utc>,
211 pub status: RunStatus,
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
215pub struct WorkflowAbortedEvent {
216 pub run_id: RunId,
217 pub event_type: EventType,
218 pub timestamp: DateTime<Utc>,
219 pub status: RunStatus,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
223pub struct StepScheduledEvent {
224 pub run_id: RunId,
225 pub step_id: StepInstanceId,
226 pub fencing_token: i64,
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub step_name: Option<String>,
229 #[serde(skip_serializing_if = "Option::is_none")]
230 pub step_type: Option<String>,
231 pub event_type: EventType,
232 pub step_dsl: Value,
233 #[serde(skip_serializing_if = "Option::is_none")]
234 pub spec: Option<Value>,
235 #[serde(skip_serializing_if = "Option::is_none")]
236 pub params: Option<Value>,
237 #[serde(skip_serializing_if = "Option::is_none")]
238 pub storage: Option<HashMap<String, Value>>,
239 #[serde(skip_serializing_if = "Option::is_none")]
240 pub test_report_urls: Option<HashMap<String, Value>>,
241 #[serde(skip_serializing_if = "Option::is_none")]
242 pub registry_auth: Option<Value>,
243 pub timestamp: DateTime<Utc>,
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
247pub struct StepRunningEvent {
248 pub run_id: RunId,
249 pub step_id: StepInstanceId,
250 pub event_type: EventType,
251 #[serde(skip_serializing_if = "Option::is_none")]
252 pub runner_id: Option<String>,
253 pub timestamp: DateTime<Utc>,
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
257pub struct StepCompletedEvent {
258 pub run_id: RunId,
259 pub step_id: StepInstanceId,
260 pub fencing_token: i64,
261 pub event_type: EventType,
262 #[serde(skip_serializing_if = "Option::is_none")]
263 pub runner_id: Option<String>,
264 #[serde(skip_serializing_if = "Option::is_none")]
265 pub storage_hashes: Option<HashMap<String, Value>>,
266 #[serde(skip_serializing_if = "Option::is_none")]
267 pub artifacts: Option<HashMap<String, Value>>,
268 #[serde(skip_serializing_if = "Option::is_none")]
269 pub test_reports: Option<HashMap<String, Value>>,
270 #[serde(skip_serializing_if = "Option::is_none")]
271 pub outputs: Option<HashMap<String, Value>>,
272 #[serde(skip_serializing_if = "Option::is_none")]
273 pub exit_code: Option<i32>,
274 pub timestamp: DateTime<Utc>,
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
278pub struct StepFailedEvent {
279 pub run_id: RunId,
280 pub step_id: StepInstanceId,
281 pub fencing_token: i64,
282 pub event_type: EventType,
283 pub error: String,
284 #[serde(skip_serializing_if = "Option::is_none")]
285 pub exit_code: Option<i32>,
286 #[serde(skip_serializing_if = "Option::is_none")]
287 pub runner_id: Option<String>,
288 #[serde(skip_serializing_if = "Option::is_none")]
289 pub storage_hashes: Option<HashMap<String, Value>>,
290 #[serde(skip_serializing_if = "Option::is_none")]
291 pub artifacts: Option<HashMap<String, Value>>,
292 #[serde(skip_serializing_if = "Option::is_none")]
293 pub test_reports: Option<HashMap<String, Value>>,
294 #[serde(skip_serializing_if = "Option::is_none")]
295 pub outputs: Option<HashMap<String, Value>>,
296 pub timestamp: DateTime<Utc>,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
300pub struct StepQueryEvent {
301 pub step_id: StepInstanceId,
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
305pub struct StepQueryResponseEvent {
306 pub step_id: StepInstanceId,
307 pub exists: bool,
308 #[serde(skip_serializing_if = "Option::is_none")]
309 pub status: Option<crate::step::StepStatus>,
310}
311
312#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
313pub struct RunnerStepTypeSchema {
314 pub step_type: String,
315 #[serde(skip_serializing_if = "Option::is_none")]
316 pub schema: Option<serde_json::Value>,
317 #[serde(skip_serializing_if = "Option::is_none")]
318 pub documentation: Option<String>,
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
322pub struct RunnerRegisterEvent {
323 pub runner_id: String,
324 pub runner_type: String,
325 pub protocol_version: String,
326 pub nats_subject: String,
327 pub capabilities: Vec<String>,
328 pub step_types: Vec<RunnerStepTypeSchema>,
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
332pub struct RunnerHeartbeatEvent {
333 pub runner_id: String,
334 pub version: String,
335 pub state: crate::runner::RunnerStatus,
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
339pub struct RunnerOfflineEvent {
340 pub runner_id: String,
341 pub event_type: EventType,
342 pub timestamp: DateTime<Utc>,
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// A register payload without a `schema` key must still deserialize, with
    /// the missing optional field read as `None`.
    #[test]
    fn test_runner_register_event_deserializes_from_wire_format() {
        let wire_json = serde_json::json!({
            "runner_id": "runner-abc-123",
            "runner_type": "docker",
            "protocol_version": "v1",
            "nats_subject": "stormchaser.v1.runner.docker.runner-abc-123",
            "capabilities": ["docker", "linux", "container"],
            "step_types": [
                {
                    "step_type": "RunContainer",
                    "documentation": "Runs a container using Docker."
                }
            ]
        });

        let event: RunnerRegisterEvent = serde_json::from_value(wire_json).unwrap();
        assert_eq!(event.runner_id, "runner-abc-123");
        assert_eq!(event.runner_type, "docker");
        assert_eq!(event.protocol_version, "v1");
        assert_eq!(
            event.nats_subject,
            "stormchaser.v1.runner.docker.runner-abc-123"
        );
        assert_eq!(event.capabilities, vec!["docker", "linux", "container"]);
        assert_eq!(event.step_types.len(), 1);
        assert_eq!(event.step_types[0].step_type, "RunContainer");
        assert!(event.step_types[0].schema.is_none());
    }

    /// A `None` schema must be omitted from the serialized output entirely.
    #[test]
    fn test_runner_register_event_serializes_correctly() {
        let event = RunnerRegisterEvent {
            runner_id: "runner-xyz".to_string(),
            runner_type: "k8s".to_string(),
            protocol_version: "v1".to_string(),
            nats_subject: "stormchaser.v1.runner.k8s.runner-xyz".to_string(),
            capabilities: vec!["k8s".to_string()],
            step_types: vec![RunnerStepTypeSchema {
                step_type: "RunK8sJob".to_string(),
                schema: None,
                documentation: Some("Runs a Kubernetes Job.".to_string()),
            }],
        };

        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["runner_id"], "runner-xyz");
        assert_eq!(json["runner_type"], "k8s");
        assert_eq!(json["protocol_version"], "v1");
        assert_eq!(json["step_types"][0]["step_type"], "RunK8sJob");
        assert!(json["step_types"][0].get("schema").is_none());
    }

    /// The query payload carries `step_id` and no `run_id`.
    #[test]
    fn test_step_query_event_has_only_step_id() {
        let event = StepQueryEvent {
            step_id: StepInstanceId::new(Uuid::nil()),
        };
        let json = serde_json::to_value(&event).unwrap();
        assert!(json.get("step_id").is_some());
        assert!(json.get("run_id").is_none());
    }

    /// Every variant must serialize to its `as_str` name and parse back to the
    /// same variant; this covers the hand-written serde impls for `EventType`.
    #[test]
    fn test_event_type_round_trips_through_wire_name() {
        let all_event_types = [
            EventType::Workflow(WorkflowEventType::Queued),
            EventType::Workflow(WorkflowEventType::StartPending),
            EventType::Workflow(WorkflowEventType::Running),
            EventType::Workflow(WorkflowEventType::Completed),
            EventType::Workflow(WorkflowEventType::Failed),
            EventType::Workflow(WorkflowEventType::Aborted),
            EventType::Step(StepEventType::Scheduled),
            EventType::Step(StepEventType::Running),
            EventType::Step(StepEventType::Completed),
            EventType::Step(StepEventType::Failed),
            EventType::Step(StepEventType::Query),
            EventType::Step(StepEventType::QueryResponse),
            EventType::Runner(RunnerEventType::Register),
            EventType::Runner(RunnerEventType::Heartbeat),
            EventType::Runner(RunnerEventType::Offline),
        ];

        for event_type in all_event_types.iter() {
            let json = serde_json::to_value(event_type).unwrap();
            assert_eq!(json, event_type.as_str());

            let parsed: EventType = serde_json::from_value(json).unwrap();
            assert_eq!(&parsed, event_type);
        }
    }

    /// A string that names no variant must be rejected rather than mapped to
    /// some default.
    #[test]
    fn test_event_type_rejects_unknown_wire_name() {
        let result = serde_json::from_value::<EventType>(serde_json::json!("NotAnEvent"));
        assert!(result.is_err());
    }
}