1use super::{
2 DirectRunRequest, EnqueueRequest, EnqueueResponse, ListRunsQuery, StepDetail,
3 WorkflowRunDetail, WorkflowRunFullDetail,
4};
5use crate::db;
6use crate::{AppState, AuthClaims, RUNS_ENQUEUED};
7use async_nats::jetstream;
8use async_nats::HeaderMap;
9use axum::response::sse::Event;
10use axum::{
11 extract::{Path, Query, State},
12 http::StatusCode,
13 response::IntoResponse,
14 Json,
15};
16use chrono::Utc;
17use futures::StreamExt;
18use serde_json::Value;
19use stormchaser_model::events::WorkflowQueuedEvent;
20use stormchaser_model::nats::publish_cloudevent;
21use stormchaser_model::workflow::RunStatus;
22use stormchaser_model::RunId;
23use tokio::sync::mpsc;
24use uuid::Uuid;
25
26#[utoipa::path(
27 post,
28 path = "/api/v1/runs",
29 request_body = EnqueueRequest,
30 responses(
31 (status = 200, description = "Workflow enqueued", body = EnqueueResponse),
32 (status = 500, description = "Internal Server Error")
33 ),
34 security(
35 ("bearer_auth" = [])
36 ),
37 tag = "stormchaser"
38)]
39#[tracing::instrument(skip(state, claims), fields(run_id = tracing::field::Empty, initiating_user = tracing::field::Empty))]
40pub async fn enqueue_workflow(
42 AuthClaims(claims): AuthClaims,
43 State(state): State<AppState>,
44 Json(payload): Json<EnqueueRequest>,
45) -> Result<impl IntoResponse, StatusCode> {
46 let run_id = stormchaser_model::RunId::new_v4();
47 let user_id = claims.email.clone().unwrap_or(claims.sub.clone());
48
49 let span = tracing::Span::current();
50 span.record("run_id", tracing::field::display(run_id));
51 span.record("initiating_user", tracing::field::display(&user_id));
52
53 tracing::info!("Enqueuing workflow: {:?}", payload.workflow_name);
54 let fencing_token = Utc::now().timestamp_nanos_opt().unwrap_or(0);
55
56 let mut tx = state
57 .pool
58 .begin()
59 .await
60 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
61
62 db::insert_workflow_run(
64 &mut tx,
65 run_id,
66 &payload.workflow_name,
67 &user_id,
68 &payload.repo_url,
69 &payload.workflow_path,
70 &payload.git_ref,
71 RunStatus::Queued,
72 fencing_token,
73 )
74 .await
75 .inspect_err(|e| tracing::error!(run_id = %run_id, "Database error inserting run: {:?}", e))
76 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
77
78 db::insert_run_context(
80 &mut tx,
81 run_id,
82 "v1",
83 serde_json::json!({}),
84 "",
85 &payload.inputs,
86 )
87 .await
88 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
89
90 let timeout = payload
92 .overrides
93 .as_ref()
94 .and_then(|o| o.timeout.clone())
95 .unwrap_or_else(|| "1h".to_string());
96
97 db::insert_run_quotas(&mut tx, run_id, 10, "1", "4Gi", "10Gi", &timeout)
98 .await
99 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
100
101 tx.commit()
102 .await
103 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
104
105 let event = WorkflowQueuedEvent {
107 run_id,
108 event_type: "workflow_queued".to_string(),
109 timestamp: Utc::now(),
110 dsl: None,
111 inputs: None,
112 initiating_user: None,
113 };
114
115 publish_cloudevent(
116 &jetstream::new(state.nats.clone()),
117 "stormchaser.v1.run.queued",
118 "stormchaser.v1.run.queued",
119 "/stormchaser/api",
120 serde_json::to_value(event).unwrap(),
121 Some("1.0"),
122 None,
123 )
124 .await
125 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
126
127 RUNS_ENQUEUED.add(
128 1,
129 &[
130 opentelemetry::KeyValue::new("workflow_name", payload.workflow_name.clone()),
131 opentelemetry::KeyValue::new("initiating_user", user_id.clone()),
132 ],
133 );
134
135 Ok(Json(EnqueueResponse {
136 run_id,
137 status: "queued".to_string(),
138 }))
139}
140
141#[utoipa::path(
142 get,
143 path = "/api/v1/runs",
144 params(
145 ListRunsQuery
146 ),
147 responses(
148 (status = 200, description = "List of workflow runs", body = [WorkflowRunDetail]),
149 (status = 500, description = "Internal Server Error")
150 ),
151 security(
152 ("bearer_auth" = [])
153 ),
154 tag = "stormchaser"
155)]
156pub async fn list_workflow_runs(
158 AuthClaims(_claims): AuthClaims,
159 State(state): State<AppState>,
160 Query(params): Query<ListRunsQuery>,
161) -> Result<impl IntoResponse, StatusCode> {
162 let limit = params.limit.unwrap_or(20).min(100);
163 let offset = params.offset.unwrap_or(0);
164
165 let runs = db::list_workflow_runs(&state.pool, ¶ms, limit as i64, offset as i64)
166 .await
167 .map_err(|e| {
168 tracing::error!("Failed to fetch workflow runs: {:?}", e);
169 StatusCode::INTERNAL_SERVER_ERROR
170 })?;
171
172 Ok(Json(runs))
173}
174
175#[utoipa::path(
177 get,
178 path = "/api/v1/runs/{id}",
179 params(
180 ("id" = stormchaser_model::RunId, Path, description = "Run ID")
181 ),
182 responses(
183 (status = 200, description = "Workflow run details", body = WorkflowRunFullDetail),
184 (status = 404, description = "Run not found")
185 ),
186 security(
187 ("bearer_auth" = [])
188 ),
189 tag = "stormchaser"
190)]
191pub async fn get_workflow_run(
193 AuthClaims(_claims): AuthClaims,
194 State(state): State<AppState>,
195 Path(run_id): Path<RunId>,
196) -> Result<impl IntoResponse, StatusCode> {
197 let detail: WorkflowRunDetail = db::get_workflow_run_detail(&state.pool, run_id)
199 .await
200 .map_err(|e| {
201 tracing::error!(
202 "Failed to fetch workflow run detail for {}: {:?}",
203 run_id,
204 e
205 );
206 StatusCode::INTERNAL_SERVER_ERROR
207 })?
208 .ok_or(StatusCode::NOT_FOUND)?;
209
210 let instances = db::get_step_instances(&state.pool, run_id)
212 .await
213 .map_err(|e| {
214 tracing::error!("Failed to fetch step instances for {}: {:?}", run_id, e);
215 StatusCode::INTERNAL_SERVER_ERROR
216 })?;
217
218 let mut steps = Vec::new();
219 for instance in instances {
220 let outputs = db::get_step_outputs(&state.pool, instance.id)
222 .await
223 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
224
225 let history = db::get_step_status_history(&state.pool, instance.id)
227 .await
228 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
229
230 let mut logs = if let Some(backend) = &state.log_backend {
231 backend
232 .fetch_step_logs(
233 &instance.step_name,
234 instance.id,
235 instance.started_at,
236 instance.finished_at,
237 Some(100), )
239 .await
240 .unwrap_or_else(|e| {
241 tracing::warn!("Failed to fetch logs for step {}: {:?}", instance.id, e);
242 vec![format!("Error fetching logs: {}", e)]
243 })
244 } else {
245 vec!["Log backend not configured".to_string()]
246 };
247
248 if logs.len() > 1000 {
249 logs = logs.split_off(logs.len() - 1000);
250 }
251
252 steps.push(StepDetail {
253 instance,
254 outputs,
255 history,
256 logs,
257 });
258 }
259
260 let artifacts = db::list_run_artifacts(&state.pool, run_id)
261 .await
262 .map_err(|e| {
263 tracing::error!("Failed to fetch run artifacts for {}: {:?}", run_id, e);
264 StatusCode::INTERNAL_SERVER_ERROR
265 })?;
266
267 let test_summaries = db::list_run_test_summaries(&state.pool, run_id)
268 .await
269 .map_err(|e| {
270 tracing::error!("Failed to fetch test summaries for {}: {:?}", run_id, e);
271 StatusCode::INTERNAL_SERVER_ERROR
272 })?;
273
274 let test_cases = db::list_run_test_cases(&state.pool, run_id)
275 .await
276 .map_err(|e| {
277 tracing::error!("Failed to fetch test cases for {}: {:?}", run_id, e);
278 StatusCode::INTERNAL_SERVER_ERROR
279 })?;
280
281 Ok(Json(WorkflowRunFullDetail {
282 detail,
283 steps,
284 artifacts,
285 test_summaries,
286 test_cases,
287 }))
288}
289
290#[utoipa::path(
292 delete,
293 path = "/api/v1/runs/{run_id}",
294 params(("run_id" = stormchaser_model::RunId, Path, description="Run ID")),
295 responses(
296 (status = 200, description = "Success"),
297 (status = 400, description = "Bad Request"),
298 (status = 404, description = "Not Found"),
299 (status = 500, description = "Internal Server Error")
300 ),
301 tag = "workflow"
302)]
303pub async fn delete_workflow_run_api(
304 AuthClaims(_claims): AuthClaims,
305 State(state): State<AppState>,
306 Path(run_id): Path<RunId>,
307) -> Result<impl IntoResponse, StatusCode> {
308 db::delete_workflow_run(&state.pool, run_id)
309 .await
310 .map_err(|e| {
311 tracing::error!("Failed to delete workflow run {}: {:?}", run_id, e);
312 StatusCode::INTERNAL_SERVER_ERROR
313 })?;
314
315 Ok(StatusCode::NO_CONTENT)
316}
317
318#[utoipa::path(
319 post,
320 path = "/api/v1/runs/direct",
321 request_body = DirectRunRequest,
322 responses(
323 (status = 200, description = "Workflow started", body = EnqueueResponse),
324 (status = 500, description = "Internal Server Error")
325 ),
326 security(
327 ("bearer_auth" = [])
328 ),
329 tag = "workflow"
330)]
331#[tracing::instrument(skip(state, claims), fields(run_id = tracing::field::Empty, initiating_user = tracing::field::Empty))]
333pub async fn direct_run(
334 AuthClaims(claims): AuthClaims,
335 State(state): State<AppState>,
336 Json(payload): Json<DirectRunRequest>,
337) -> Result<impl IntoResponse, StatusCode> {
338 let run_id = stormchaser_model::RunId::new_v4();
339 let user_id = claims.email.clone().unwrap_or(claims.sub.clone());
340
341 let span = tracing::Span::current();
342 span.record("run_id", tracing::field::display(run_id));
343 span.record("initiating_user", tracing::field::display(&user_id));
344
345 let payload_json = serde_json::json!({
347 "run_id": run_id,
348 "dsl": payload.dsl,
349 "initiating_user": user_id,
350 "inputs": payload.inputs,
351 });
352
353 use cloudevents::{EventBuilder, EventBuilderV10};
354 let event = EventBuilderV10::new()
355 .id(uuid::Uuid::new_v4().to_string())
356 .ty("stormchaser.v1.run.direct")
357 .source("/stormchaser/api")
358 .time(Utc::now())
359 .data("application/json", payload_json)
360 .build()
361 .map_err(|e| {
362 tracing::error!("Failed to build CloudEvent: {}", e);
363 StatusCode::INTERNAL_SERVER_ERROR
364 })?;
365
366 let event_str = serde_json::to_string(&event).unwrap_or_default();
367 let mut headers = HeaderMap::new();
368 headers.insert("Content-Type", "application/cloudevents+json");
369
370 state
371 .nats
372 .publish_with_headers("stormchaser.v1.run.direct", headers, event_str.into())
373 .await
374 .map_err(|e| {
375 tracing::error!("Failed to publish CloudEvent: {}", e);
376 StatusCode::INTERNAL_SERVER_ERROR
377 })?;
378
379 Ok(Json(EnqueueResponse {
380 run_id,
381 status: "started".to_string(),
382 }))
383}
384
385#[utoipa::path(
386 get,
387 path = "/api/v1/runs/stream",
388 responses(
389 (status = 200, description = "Workflow runs stream (SSE)")
390 ),
391 security(
392 ("bearer_auth" = [])
393 ),
394 tag = "workflow"
395)]
396pub async fn stream_workflow_runs_api(
398 AuthClaims(_claims): AuthClaims,
399 State(state): State<AppState>,
400) -> Result<
401 axum::response::sse::Sse<
402 impl futures::stream::Stream<Item = Result<Event, std::convert::Infallible>>,
403 >,
404 StatusCode,
405> {
406 let (tx, rx) = mpsc::channel::<Result<Event, std::convert::Infallible>>(100);
407
408 let nats = state.nats.clone();
409 let pool = state.pool.clone();
410
411 tokio::spawn(async move {
412 let mut subscriber = match nats.subscribe("stormchaser.v1.run.>").await {
413 Ok(sub) => sub,
414 Err(e) => {
415 tracing::error!("Failed to subscribe to NATS for workflow runs: {:?}", e);
416 return;
417 }
418 };
419
420 while let Some(msg) = subscriber.next().await {
421 let ce: cloudevents::Event = match serde_json::from_slice(&msg.payload) {
422 Ok(e) => e,
423 Err(_) => continue,
424 };
425 let payload: Value = if let Some(cloudevents::Data::Json(v)) = ce.data() {
426 v.clone()
427 } else {
428 continue;
429 };
430
431 if let Some(run_id_str) = payload.get("run_id").and_then(|id| id.as_str()) {
432 if let Ok(run_id) = Uuid::parse_str(run_id_str) {
433 let detail =
435 db::get_workflow_run_detail(&pool, stormchaser_model::RunId::new(run_id))
436 .await
437 .unwrap_or(None);
438
439 if let Some(run) = detail {
440 let data = serde_json::to_string(&run).unwrap_or_default();
441 let event = Event::default().event("workflow_run").data(data);
442 if tx.send(Ok(event)).await.is_err() {
443 break;
444 }
445 }
446 }
447 }
448 }
449 });
450
451 let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(|res| match res {
452 Ok(event) => Ok(event),
453 Err(_) => unreachable!(),
454 });
455
456 Ok(axum::response::sse::Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()))
457}