1use crate::id::{RunId, StepInstanceId};
2use chrono::{DateTime, Utc};
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7
8use nutype::nutype;
9
10#[nutype(derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema))]
11pub struct SchemaVersion(String);
12
13#[nutype(derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema))]
14pub struct SchemaId(String);
15
16#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
17pub enum EventSource {
18 System,
19 Api,
20 Engine,
21}
22
23impl EventSource {
24 pub fn as_str(&self) -> &str {
25 match self {
26 EventSource::System => "/stormchaser",
27 EventSource::Api => "/stormchaser/api",
28 EventSource::Engine => "stormchaser-engine",
29 }
30 }
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
34pub enum WorkflowEventType {
35 Queued,
36 StartPending,
37 Running,
38 Completed,
39 Failed,
40 Aborted,
41}
42
43impl WorkflowEventType {
44 pub fn as_str(&self) -> &str {
45 match self {
46 WorkflowEventType::Queued => "WorkflowQueuedEvent",
47 WorkflowEventType::StartPending => "WorkflowStartPendingEvent",
48 WorkflowEventType::Running => "WorkflowRunningEvent",
49 WorkflowEventType::Completed => "WorkflowCompletedEvent",
50 WorkflowEventType::Failed => "WorkflowFailedEvent",
51 WorkflowEventType::Aborted => "WorkflowAbortedEvent",
52 }
53 }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
57pub enum StepEventType {
58 Scheduled,
59 Running,
60 Completed,
61 Failed,
62 Query,
63 QueryResponse,
64}
65
66impl StepEventType {
67 pub fn as_str(&self) -> &str {
68 match self {
69 StepEventType::Scheduled => "StepScheduledEvent",
70 StepEventType::Running => "StepRunningEvent",
71 StepEventType::Completed => "StepCompletedEvent",
72 StepEventType::Failed => "StepFailedEvent",
73 StepEventType::Query => "StepQueryEvent",
74 StepEventType::QueryResponse => "StepQueryResponseEvent",
75 }
76 }
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
80pub enum RunnerEventType {
81 Register,
82 Heartbeat,
83 Offline,
84}
85
86impl RunnerEventType {
87 pub fn as_str(&self) -> &str {
88 match self {
89 RunnerEventType::Register => "RunnerRegisterEvent",
90 RunnerEventType::Heartbeat => "RunnerHeartbeatEvent",
91 RunnerEventType::Offline => "RunnerOfflineEvent",
92 }
93 }
94}
95
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum EventType {
98 Workflow(WorkflowEventType),
99 Step(StepEventType),
100 Runner(RunnerEventType),
101}
102
103impl EventType {
104 pub fn as_str(&self) -> &str {
105 match self {
106 EventType::Workflow(w) => w.as_str(),
107 EventType::Step(s) => s.as_str(),
108 EventType::Runner(r) => r.as_str(),
109 }
110 }
111}
112
113impl Serialize for EventType {
114 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
115 where
116 S: serde::Serializer,
117 {
118 serializer.serialize_str(self.as_str())
119 }
120}
121
122impl<'de> Deserialize<'de> for EventType {
123 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
124 where
125 D: serde::Deserializer<'de>,
126 {
127 let s = String::deserialize(deserializer)?;
128 match s.as_str() {
129 "WorkflowQueuedEvent" => Ok(EventType::Workflow(WorkflowEventType::Queued)),
130 "WorkflowStartPendingEvent" => Ok(EventType::Workflow(WorkflowEventType::StartPending)),
131 "WorkflowRunningEvent" => Ok(EventType::Workflow(WorkflowEventType::Running)),
132 "WorkflowCompletedEvent" => Ok(EventType::Workflow(WorkflowEventType::Completed)),
133 "WorkflowFailedEvent" => Ok(EventType::Workflow(WorkflowEventType::Failed)),
134 "WorkflowAbortedEvent" => Ok(EventType::Workflow(WorkflowEventType::Aborted)),
135 "StepScheduledEvent" => Ok(EventType::Step(StepEventType::Scheduled)),
136 "StepRunningEvent" => Ok(EventType::Step(StepEventType::Running)),
137 "StepCompletedEvent" => Ok(EventType::Step(StepEventType::Completed)),
138 "StepFailedEvent" => Ok(EventType::Step(StepEventType::Failed)),
139 "StepQueryEvent" => Ok(EventType::Step(StepEventType::Query)),
140 "StepQueryResponseEvent" => Ok(EventType::Step(StepEventType::QueryResponse)),
141 "RunnerRegisterEvent" => Ok(EventType::Runner(RunnerEventType::Register)),
142 "RunnerHeartbeatEvent" => Ok(EventType::Runner(RunnerEventType::Heartbeat)),
143 "RunnerOfflineEvent" => Ok(EventType::Runner(RunnerEventType::Offline)),
144 _ => Err(serde::de::Error::custom(format!(
145 "Unknown EventType: {}",
146 s
147 ))),
148 }
149 }
150}
151
152impl schemars::JsonSchema for EventType {
153 fn schema_name() -> String {
154 "EventType".to_string()
155 }
156
157 fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
158 String::json_schema(gen)
159 }
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
163pub struct WorkflowQueuedEvent {
164 pub run_id: RunId,
165 pub event_type: EventType,
166 pub timestamp: DateTime<Utc>,
167 #[serde(skip_serializing_if = "Option::is_none")]
168 pub dsl: Option<String>,
169 #[serde(skip_serializing_if = "Option::is_none")]
170 pub inputs: Option<HashMap<String, Value>>,
171 #[serde(skip_serializing_if = "Option::is_none")]
172 pub initiating_user: Option<String>,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
176pub struct WorkflowStartPendingEvent {
177 pub run_id: RunId,
178 pub event_type: EventType,
179 pub timestamp: DateTime<Utc>,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
183pub struct WorkflowRunningEvent {
184 pub run_id: RunId,
185 pub event_type: EventType,
186 pub timestamp: DateTime<Utc>,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
190pub struct WorkflowCompletedEvent {
191 pub run_id: RunId,
192 pub event_type: EventType,
193 pub timestamp: DateTime<Utc>,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
197pub struct WorkflowFailedEvent {
198 pub run_id: RunId,
199 pub event_type: EventType,
200 pub timestamp: DateTime<Utc>,
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
204pub struct WorkflowAbortedEvent {
205 pub run_id: RunId,
206 pub event_type: EventType,
207 pub timestamp: DateTime<Utc>,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
211pub struct StepScheduledEvent {
212 pub run_id: RunId,
213 pub step_id: StepInstanceId,
214 #[serde(skip_serializing_if = "Option::is_none")]
215 pub step_name: Option<String>,
216 #[serde(skip_serializing_if = "Option::is_none")]
217 pub step_type: Option<String>,
218 pub event_type: EventType,
219 pub step_dsl: Value,
220 #[serde(skip_serializing_if = "Option::is_none")]
221 pub spec: Option<Value>,
222 #[serde(skip_serializing_if = "Option::is_none")]
223 pub params: Option<Value>,
224 #[serde(skip_serializing_if = "Option::is_none")]
225 pub storage: Option<HashMap<String, Value>>,
226 #[serde(skip_serializing_if = "Option::is_none")]
227 pub test_report_urls: Option<HashMap<String, Value>>,
228 pub timestamp: DateTime<Utc>,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
232pub struct StepRunningEvent {
233 pub run_id: RunId,
234 pub step_id: StepInstanceId,
235 pub event_type: EventType,
236 #[serde(skip_serializing_if = "Option::is_none")]
237 pub runner_id: Option<String>,
238 pub timestamp: DateTime<Utc>,
239}
240
241#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
242pub struct StepCompletedEvent {
243 pub run_id: RunId,
244 pub step_id: StepInstanceId,
245 pub event_type: EventType,
246 #[serde(skip_serializing_if = "Option::is_none")]
247 pub runner_id: Option<String>,
248 #[serde(skip_serializing_if = "Option::is_none")]
249 pub storage_hashes: Option<HashMap<String, Value>>,
250 #[serde(skip_serializing_if = "Option::is_none")]
251 pub artifacts: Option<HashMap<String, Value>>,
252 #[serde(skip_serializing_if = "Option::is_none")]
253 pub test_reports: Option<HashMap<String, Value>>,
254 #[serde(skip_serializing_if = "Option::is_none")]
255 pub outputs: Option<HashMap<String, Value>>,
256 #[serde(skip_serializing_if = "Option::is_none")]
257 pub exit_code: Option<i32>,
258 pub timestamp: DateTime<Utc>,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
262pub struct StepFailedEvent {
263 pub run_id: RunId,
264 pub step_id: StepInstanceId,
265 pub event_type: EventType,
266 pub error: String,
267 #[serde(skip_serializing_if = "Option::is_none")]
268 pub exit_code: Option<i32>,
269 #[serde(skip_serializing_if = "Option::is_none")]
270 pub runner_id: Option<String>,
271 #[serde(skip_serializing_if = "Option::is_none")]
272 pub storage_hashes: Option<HashMap<String, Value>>,
273 #[serde(skip_serializing_if = "Option::is_none")]
274 pub artifacts: Option<HashMap<String, Value>>,
275 #[serde(skip_serializing_if = "Option::is_none")]
276 pub test_reports: Option<HashMap<String, Value>>,
277 #[serde(skip_serializing_if = "Option::is_none")]
278 pub outputs: Option<HashMap<String, Value>>,
279 pub timestamp: DateTime<Utc>,
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
283pub struct StepQueryEvent {
284 pub step_id: StepInstanceId,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
288pub struct StepQueryResponseEvent {
289 pub step_id: StepInstanceId,
290 pub exists: bool,
291 #[serde(skip_serializing_if = "Option::is_none")]
292 pub status: Option<crate::step::StepStatus>,
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
296pub struct RunnerStepTypeSchema {
297 pub step_type: String,
298 #[serde(skip_serializing_if = "Option::is_none")]
299 pub schema: Option<serde_json::Value>,
300 #[serde(skip_serializing_if = "Option::is_none")]
301 pub documentation: Option<String>,
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
305pub struct RunnerRegisterEvent {
306 pub runner_id: String,
307 pub runner_type: String,
308 pub protocol_version: String,
309 pub nats_subject: String,
310 pub capabilities: Vec<String>,
311 pub step_types: Vec<RunnerStepTypeSchema>,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
315pub struct RunnerHeartbeatEvent {
316 pub runner_id: String,
317 pub version: String,
318 pub state: crate::runner::RunnerStatus,
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
322pub struct RunnerOfflineEvent {
323 pub runner_id: String,
324 pub event_type: EventType,
325 pub timestamp: DateTime<Utc>,
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331 use uuid::Uuid;
332
333 #[test]
334 fn test_runner_register_event_deserializes_from_wire_format() {
335 let wire_json = serde_json::json!({
336 "runner_id": "runner-abc-123",
337 "runner_type": "docker",
338 "protocol_version": "v1",
339 "nats_subject": "stormchaser.v1.runner.docker.runner-abc-123",
340 "capabilities": ["docker", "linux", "container"],
341 "step_types": [
342 {
343 "step_type": "RunContainer",
344 "documentation": "Runs a container using Docker."
345 }
346 ]
347 });
348
349 let event: RunnerRegisterEvent = serde_json::from_value(wire_json).unwrap();
350 assert_eq!(event.runner_id, "runner-abc-123");
351 assert_eq!(event.runner_type, "docker");
352 assert_eq!(event.protocol_version, "v1");
353 assert_eq!(
354 event.nats_subject,
355 "stormchaser.v1.runner.docker.runner-abc-123"
356 );
357 assert_eq!(event.capabilities, vec!["docker", "linux", "container"]);
358 assert_eq!(event.step_types.len(), 1);
359 assert_eq!(event.step_types[0].step_type, "RunContainer");
360 assert!(event.step_types[0].schema.is_none());
361 }
362
363 #[test]
364 fn test_runner_register_event_serializes_correctly() {
365 let event = RunnerRegisterEvent {
366 runner_id: "runner-xyz".to_string(),
367 runner_type: "k8s".to_string(),
368 protocol_version: "v1".to_string(),
369 nats_subject: "stormchaser.v1.runner.k8s.runner-xyz".to_string(),
370 capabilities: vec!["k8s".to_string()],
371 step_types: vec![RunnerStepTypeSchema {
372 step_type: "RunK8sJob".to_string(),
373 schema: None,
374 documentation: Some("Runs a Kubernetes Job.".to_string()),
375 }],
376 };
377
378 let json = serde_json::to_value(&event).unwrap();
379 assert_eq!(json["runner_id"], "runner-xyz");
380 assert_eq!(json["runner_type"], "k8s");
381 assert_eq!(json["protocol_version"], "v1");
382 assert_eq!(json["step_types"][0]["step_type"], "RunK8sJob");
383 assert!(json["step_types"][0].get("schema").is_none());
385 }
386
387 #[test]
388 fn test_step_query_event_has_only_step_id() {
389 let event = StepQueryEvent {
390 step_id: StepInstanceId::new(Uuid::nil()),
391 };
392 let json = serde_json::to_value(&event).unwrap();
393 assert!(json.get("step_id").is_some());
394 assert!(json.get("run_id").is_none());
395 }
396}