1use super::{
2 DirectRunRequest, EnqueueRequest, EnqueueResponse, ListRunsQuery, StepDetail,
3 WorkflowRunDetail, WorkflowRunFullDetail,
4};
5use crate::db;
6use crate::{AppState, AuthClaims, RUNS_ENQUEUED};
7use async_nats::jetstream;
8use async_nats::HeaderMap;
9use axum::response::sse::Event;
10use axum::{
11 extract::{Path, Query, State},
12 http::StatusCode,
13 response::IntoResponse,
14 Json,
15};
16use chrono::Utc;
17use futures::StreamExt;
18use serde_json::Value;
19use stormchaser_model::events::WorkflowQueuedEvent;
20use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
21use stormchaser_model::nats::{publish_cloudevent, NatsSubject};
22use stormchaser_model::workflow::RunStatus;
23use stormchaser_model::RunId;
24use tokio::sync::mpsc;
25use uuid::Uuid;
26
27#[utoipa::path(
28 post,
29 path = "/api/v1/runs",
30 request_body = EnqueueRequest,
31 responses(
32 (status = 200, description = "Workflow enqueued", body = EnqueueResponse),
33 (status = 500, description = "Internal Server Error")
34 ),
35 security(
36 ("bearer_auth" = [])
37 ),
38 tag = "stormchaser"
39)]
40#[tracing::instrument(skip(state, claims), fields(run_id = tracing::field::Empty, initiating_user = tracing::field::Empty))]
41pub async fn enqueue_workflow(
43 AuthClaims(claims): AuthClaims,
44 State(state): State<AppState>,
45 Json(payload): Json<EnqueueRequest>,
46) -> Result<impl IntoResponse, StatusCode> {
47 let run_id = stormchaser_model::RunId::new_v4();
48 let user_id = claims.email.clone().unwrap_or(claims.sub.clone());
49
50 let span = tracing::Span::current();
51 span.record("run_id", tracing::field::display(run_id));
52 span.record("initiating_user", tracing::field::display(&user_id));
53
54 tracing::info!("Enqueuing workflow: {:?}", payload.workflow_name);
55 let fencing_token = Utc::now().timestamp_nanos_opt().unwrap_or(0);
56
57 let mut tx = state
58 .pool
59 .begin()
60 .await
61 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
62
63 db::insert_workflow_run(
65 &mut tx,
66 run_id,
67 &payload.workflow_name,
68 &user_id,
69 &payload.repo_url,
70 &payload.workflow_path,
71 &payload.git_ref,
72 RunStatus::Queued,
73 fencing_token,
74 )
75 .await
76 .inspect_err(|e| tracing::error!(run_id = %run_id, "Database error inserting run: {:?}", e))
77 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
78
79 db::insert_run_context(
81 &mut tx,
82 run_id,
83 "v1",
84 serde_json::json!({}),
85 "",
86 &payload.inputs,
87 )
88 .await
89 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
90
91 let timeout = payload
93 .overrides
94 .as_ref()
95 .and_then(|o| o.timeout.clone())
96 .unwrap_or_else(|| "1h".to_string());
97
98 db::insert_run_quotas(&mut tx, run_id, 10, "1", "4Gi", "10Gi", &timeout)
99 .await
100 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
101
102 tx.commit()
103 .await
104 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
105
106 let event = WorkflowQueuedEvent {
108 run_id,
109 event_type: EventType::Workflow(WorkflowEventType::Queued),
110 timestamp: Utc::now(),
111 dsl: None,
112 inputs: None,
113 initiating_user: None,
114 };
115
116 publish_cloudevent(
117 &jetstream::new(state.nats.clone()),
118 NatsSubject::RunQueued,
119 EventType::Workflow(WorkflowEventType::Queued),
120 EventSource::Api,
121 serde_json::to_value(event).unwrap(),
122 Some(SchemaVersion::new("1.0".to_string())),
123 None,
124 )
125 .await
126 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
127
128 RUNS_ENQUEUED.add(
129 1,
130 &[
131 opentelemetry::KeyValue::new("workflow_name", payload.workflow_name.clone()),
132 opentelemetry::KeyValue::new("initiating_user", user_id.clone()),
133 ],
134 );
135
136 Ok(Json(EnqueueResponse {
137 run_id,
138 status: stormchaser_model::RunStatus::Queued,
139 }))
140}
141
142#[utoipa::path(
143 get,
144 path = "/api/v1/runs",
145 params(
146 ListRunsQuery
147 ),
148 responses(
149 (status = 200, description = "List of workflow runs", body = [WorkflowRunDetail]),
150 (status = 500, description = "Internal Server Error")
151 ),
152 security(
153 ("bearer_auth" = [])
154 ),
155 tag = "stormchaser"
156)]
157pub async fn list_workflow_runs(
159 AuthClaims(_claims): AuthClaims,
160 State(state): State<AppState>,
161 Query(params): Query<ListRunsQuery>,
162) -> Result<impl IntoResponse, StatusCode> {
163 let limit = params.limit.unwrap_or(20).min(100);
164 let offset = params.offset.unwrap_or(0);
165
166 let runs = db::list_workflow_runs(&state.pool, ¶ms, limit as i64, offset as i64)
167 .await
168 .map_err(|e| {
169 tracing::error!("Failed to fetch workflow runs: {:?}", e);
170 StatusCode::INTERNAL_SERVER_ERROR
171 })?;
172
173 Ok(Json(runs))
174}
175
176#[utoipa::path(
178 get,
179 path = "/api/v1/runs/{id}",
180 params(
181 ("id" = stormchaser_model::RunId, Path, description = "Run ID")
182 ),
183 responses(
184 (status = 200, description = "Workflow run details", body = WorkflowRunFullDetail),
185 (status = 404, description = "Run not found")
186 ),
187 security(
188 ("bearer_auth" = [])
189 ),
190 tag = "stormchaser"
191)]
192pub async fn get_workflow_run(
194 AuthClaims(_claims): AuthClaims,
195 State(state): State<AppState>,
196 Path(run_id): Path<RunId>,
197) -> Result<impl IntoResponse, StatusCode> {
198 let detail: WorkflowRunDetail = db::get_workflow_run_detail(&state.pool, run_id)
200 .await
201 .map_err(|e| {
202 tracing::error!(
203 "Failed to fetch workflow run detail for {}: {:?}",
204 run_id,
205 e
206 );
207 StatusCode::INTERNAL_SERVER_ERROR
208 })?
209 .ok_or(StatusCode::NOT_FOUND)?;
210
211 let instances = db::get_step_instances(&state.pool, run_id)
213 .await
214 .map_err(|e| {
215 tracing::error!("Failed to fetch step instances for {}: {:?}", run_id, e);
216 StatusCode::INTERNAL_SERVER_ERROR
217 })?;
218
219 let mut steps = Vec::new();
220 for instance in instances {
221 let outputs = db::get_step_outputs(&state.pool, instance.id)
223 .await
224 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
225
226 let history = db::get_step_status_history(&state.pool, instance.id)
228 .await
229 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
230
231 let mut logs = if let Some(backend) = &state.log_backend {
232 backend
233 .fetch_step_logs(
234 &instance.step_name,
235 instance.id,
236 instance.started_at,
237 instance.finished_at,
238 Some(100), )
240 .await
241 .unwrap_or_else(|e| {
242 tracing::warn!("Failed to fetch logs for step {}: {:?}", instance.id, e);
243 vec![format!("Error fetching logs: {}", e)]
244 })
245 } else {
246 vec!["Log backend not configured".to_string()]
247 };
248
249 if logs.len() > 1000 {
250 logs = logs.split_off(logs.len() - 1000);
251 }
252
253 steps.push(StepDetail {
254 instance,
255 outputs,
256 history,
257 logs,
258 });
259 }
260
261 let artifacts = db::list_run_artifacts(&state.pool, run_id)
262 .await
263 .map_err(|e| {
264 tracing::error!("Failed to fetch run artifacts for {}: {:?}", run_id, e);
265 StatusCode::INTERNAL_SERVER_ERROR
266 })?;
267
268 let test_summaries = db::list_run_test_summaries(&state.pool, run_id)
269 .await
270 .map_err(|e| {
271 tracing::error!("Failed to fetch test summaries for {}: {:?}", run_id, e);
272 StatusCode::INTERNAL_SERVER_ERROR
273 })?;
274
275 let test_cases = db::list_run_test_cases(&state.pool, run_id)
276 .await
277 .map_err(|e| {
278 tracing::error!("Failed to fetch test cases for {}: {:?}", run_id, e);
279 StatusCode::INTERNAL_SERVER_ERROR
280 })?;
281
282 Ok(Json(WorkflowRunFullDetail {
283 detail,
284 steps,
285 artifacts,
286 test_summaries,
287 test_cases,
288 }))
289}
290
291#[utoipa::path(
293 delete,
294 path = "/api/v1/runs/{run_id}",
295 params(("run_id" = stormchaser_model::RunId, Path, description="Run ID")),
296 responses(
297 (status = 200, description = "Success"),
298 (status = 400, description = "Bad Request"),
299 (status = 404, description = "Not Found"),
300 (status = 500, description = "Internal Server Error")
301 ),
302 tag = "workflow"
303)]
304pub async fn delete_workflow_run_api(
305 AuthClaims(_claims): AuthClaims,
306 State(state): State<AppState>,
307 Path(run_id): Path<RunId>,
308) -> Result<impl IntoResponse, StatusCode> {
309 db::delete_workflow_run(&state.pool, run_id)
310 .await
311 .map_err(|e| {
312 tracing::error!("Failed to delete workflow run {}: {:?}", run_id, e);
313 StatusCode::INTERNAL_SERVER_ERROR
314 })?;
315
316 Ok(StatusCode::NO_CONTENT)
317}
318
319#[utoipa::path(
320 post,
321 path = "/api/v1/runs/direct",
322 request_body = DirectRunRequest,
323 responses(
324 (status = 200, description = "Workflow started", body = EnqueueResponse),
325 (status = 500, description = "Internal Server Error")
326 ),
327 security(
328 ("bearer_auth" = [])
329 ),
330 tag = "workflow"
331)]
332#[tracing::instrument(skip(state, claims), fields(run_id = tracing::field::Empty, initiating_user = tracing::field::Empty))]
334pub async fn direct_run(
335 AuthClaims(claims): AuthClaims,
336 State(state): State<AppState>,
337 Json(payload): Json<DirectRunRequest>,
338) -> Result<impl IntoResponse, StatusCode> {
339 let run_id = stormchaser_model::RunId::new_v4();
340 let user_id = claims.email.clone().unwrap_or(claims.sub.clone());
341
342 let span = tracing::Span::current();
343 span.record("run_id", tracing::field::display(run_id));
344 span.record("initiating_user", tracing::field::display(&user_id));
345
346 let payload_json = serde_json::json!({
348 "run_id": run_id,
349 "dsl": payload.dsl,
350 "initiating_user": user_id,
351 "inputs": payload.inputs,
352 });
353
354 use cloudevents::{EventBuilder, EventBuilderV10};
355 let event = EventBuilderV10::new()
356 .id(uuid::Uuid::new_v4().to_string())
357 .ty("stormchaser.v1.run.direct")
358 .source(EventSource::Api.as_str())
359 .time(Utc::now())
360 .data(stormchaser_model::APPLICATION_JSON, payload_json)
361 .build()
362 .map_err(|e| {
363 tracing::error!("Failed to build CloudEvent: {}", e);
364 StatusCode::INTERNAL_SERVER_ERROR
365 })?;
366
367 let event_str = serde_json::to_string(&event).unwrap_or_default();
368 let mut headers = HeaderMap::new();
369 headers.insert("Content-Type", "application/cloudevents+json");
370
371 state
372 .nats
373 .publish_with_headers("stormchaser.v1.run.direct", headers, event_str.into())
374 .await
375 .map_err(|e| {
376 tracing::error!("Failed to publish CloudEvent: {}", e);
377 StatusCode::INTERNAL_SERVER_ERROR
378 })?;
379
380 Ok(Json(EnqueueResponse {
381 run_id,
382 status: RunStatus::Running,
383 }))
384}
385
386#[utoipa::path(
387 get,
388 path = "/api/v1/runs/stream",
389 responses(
390 (status = 200, description = "Workflow runs stream (SSE)")
391 ),
392 security(
393 ("bearer_auth" = [])
394 ),
395 tag = "workflow"
396)]
397pub async fn stream_workflow_runs_api(
399 AuthClaims(_claims): AuthClaims,
400 State(state): State<AppState>,
401) -> Result<
402 axum::response::sse::Sse<
403 impl futures::stream::Stream<Item = Result<Event, std::convert::Infallible>>,
404 >,
405 StatusCode,
406> {
407 let (tx, rx) = mpsc::channel::<Result<Event, std::convert::Infallible>>(100);
408
409 let nats = state.nats.clone();
410 let pool = state.pool.clone();
411
412 tokio::spawn(async move {
413 let mut subscriber = match nats.subscribe("stormchaser.v1.run.>").await {
414 Ok(sub) => sub,
415 Err(e) => {
416 tracing::error!("Failed to subscribe to NATS for workflow runs: {:?}", e);
417 return;
418 }
419 };
420
421 while let Some(msg) = subscriber.next().await {
422 let ce: cloudevents::Event = match serde_json::from_slice(&msg.payload) {
423 Ok(e) => e,
424 Err(_) => continue,
425 };
426 let payload: Value = if let Some(cloudevents::Data::Json(v)) = ce.data() {
427 v.clone()
428 } else {
429 continue;
430 };
431
432 if let Some(run_id_str) = payload.get("run_id").and_then(|id| id.as_str()) {
433 if let Ok(run_id) = Uuid::parse_str(run_id_str) {
434 let detail =
436 db::get_workflow_run_detail(&pool, stormchaser_model::RunId::new(run_id))
437 .await
438 .unwrap_or(None);
439
440 if let Some(run) = detail {
441 let data = serde_json::to_string(&run).unwrap_or_default();
442 let event = Event::default().event("workflow_run").data(data);
443 if tx.send(Ok(event)).await.is_err() {
444 break;
445 }
446 }
447 }
448 }
449 }
450 });
451
452 let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(|res| match res {
453 Ok(event) => Ok(event),
454 Err(_) => unreachable!(),
455 });
456
457 Ok(axum::response::sse::Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()))
458}