//! HTTP route handlers for workflow runs.
//!
//! Path: `stormchaser_api/routes/workflow.rs`

1use super::{
2    DirectRunRequest, EnqueueRequest, EnqueueResponse, ListRunsQuery, StepDetail,
3    WorkflowRunDetail, WorkflowRunFullDetail,
4};
5use crate::db;
6use crate::{AppState, AuthClaims, RUNS_ENQUEUED};
7use async_nats::jetstream;
8use async_nats::HeaderMap;
9use axum::response::sse::Event;
10use axum::{
11    extract::{Path, Query, State},
12    http::StatusCode,
13    response::IntoResponse,
14    Json,
15};
16use chrono::Utc;
17use futures::StreamExt;
18use serde_json::Value;
19use stormchaser_model::events::WorkflowQueuedEvent;
20use stormchaser_model::nats::publish_cloudevent;
21use stormchaser_model::workflow::RunStatus;
22use stormchaser_model::RunId;
23use tokio::sync::mpsc;
24use uuid::Uuid;
25
26#[utoipa::path(
27    post,
28    path = "/api/v1/runs",
29    request_body = EnqueueRequest,
30    responses(
31        (status = 200, description = "Workflow enqueued", body = EnqueueResponse),
32        (status = 500, description = "Internal Server Error")
33    ),
34    security(
35        ("bearer_auth" = [])
36    ),
37    tag = "stormchaser"
38)]
39#[tracing::instrument(skip(state, claims), fields(run_id = tracing::field::Empty, initiating_user = tracing::field::Empty))]
40/// Enqueue workflow.
41pub async fn enqueue_workflow(
42    AuthClaims(claims): AuthClaims,
43    State(state): State<AppState>,
44    Json(payload): Json<EnqueueRequest>,
45) -> Result<impl IntoResponse, StatusCode> {
46    let run_id = stormchaser_model::RunId::new_v4();
47    let user_id = claims.email.clone().unwrap_or(claims.sub.clone());
48
49    let span = tracing::Span::current();
50    span.record("run_id", tracing::field::display(run_id));
51    span.record("initiating_user", tracing::field::display(&user_id));
52
53    tracing::info!("Enqueuing workflow: {:?}", payload.workflow_name);
54    let fencing_token = Utc::now().timestamp_nanos_opt().unwrap_or(0);
55
56    let mut tx = state
57        .pool
58        .begin()
59        .await
60        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
61
62    // Create WorkflowRun with initiating_user
63    db::insert_workflow_run(
64        &mut tx,
65        run_id,
66        &payload.workflow_name,
67        &user_id,
68        &payload.repo_url,
69        &payload.workflow_path,
70        &payload.git_ref,
71        RunStatus::Queued,
72        fencing_token,
73    )
74    .await
75    .inspect_err(|e| tracing::error!(run_id = %run_id, "Database error inserting run: {:?}", e))
76    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
77
78    // Create RunContext (placeholder for dsl_version and workflow_definition)
79    db::insert_run_context(
80        &mut tx,
81        run_id,
82        "v1",
83        serde_json::json!({}),
84        "",
85        &payload.inputs,
86    )
87    .await
88    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
89
90    // 3. Create RunQuotas (always, with defaults + overrides)
91    let timeout = payload
92        .overrides
93        .as_ref()
94        .and_then(|o| o.timeout.clone())
95        .unwrap_or_else(|| "1h".to_string());
96
97    db::insert_run_quotas(&mut tx, run_id, 10, "1", "4Gi", "10Gi", &timeout)
98        .await
99        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
100
101    tx.commit()
102        .await
103        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
104
105    // Publish to NATS
106    let event = WorkflowQueuedEvent {
107        run_id,
108        event_type: "workflow_queued".to_string(),
109        timestamp: Utc::now(),
110        dsl: None,
111        inputs: None,
112        initiating_user: None,
113    };
114
115    publish_cloudevent(
116        &jetstream::new(state.nats.clone()),
117        "stormchaser.v1.run.queued",
118        "stormchaser.v1.run.queued",
119        "/stormchaser/api",
120        serde_json::to_value(event).unwrap(),
121        Some("1.0"),
122        None,
123    )
124    .await
125    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
126
127    RUNS_ENQUEUED.add(
128        1,
129        &[
130            opentelemetry::KeyValue::new("workflow_name", payload.workflow_name.clone()),
131            opentelemetry::KeyValue::new("initiating_user", user_id.clone()),
132        ],
133    );
134
135    Ok(Json(EnqueueResponse {
136        run_id,
137        status: "queued".to_string(),
138    }))
139}
140
141#[utoipa::path(
142    get,
143    path = "/api/v1/runs",
144    params(
145        ListRunsQuery
146    ),
147    responses(
148        (status = 200, description = "List of workflow runs", body = [WorkflowRunDetail]),
149        (status = 500, description = "Internal Server Error")
150    ),
151    security(
152        ("bearer_auth" = [])
153    ),
154    tag = "stormchaser"
155)]
156/// List workflow runs.
157pub async fn list_workflow_runs(
158    AuthClaims(_claims): AuthClaims,
159    State(state): State<AppState>,
160    Query(params): Query<ListRunsQuery>,
161) -> Result<impl IntoResponse, StatusCode> {
162    let limit = params.limit.unwrap_or(20).min(100);
163    let offset = params.offset.unwrap_or(0);
164
165    let runs = db::list_workflow_runs(&state.pool, &params, limit as i64, offset as i64)
166        .await
167        .map_err(|e| {
168            tracing::error!("Failed to fetch workflow runs: {:?}", e);
169            StatusCode::INTERNAL_SERVER_ERROR
170        })?;
171
172    Ok(Json(runs))
173}
174
175/// Gets workflow run details.
176#[utoipa::path(
177    get,
178    path = "/api/v1/runs/{id}",
179    params(
180        ("id" = stormchaser_model::RunId, Path, description = "Run ID")
181    ),
182    responses(
183        (status = 200, description = "Workflow run details", body = WorkflowRunFullDetail),
184        (status = 404, description = "Run not found")
185    ),
186    security(
187        ("bearer_auth" = [])
188    ),
189    tag = "stormchaser"
190)]
191/// Get workflow run.
192pub async fn get_workflow_run(
193    AuthClaims(_claims): AuthClaims,
194    State(state): State<AppState>,
195    Path(run_id): Path<RunId>,
196) -> Result<impl IntoResponse, StatusCode> {
197    // 1. Fetch the workflow run detail
198    let detail: WorkflowRunDetail = db::get_workflow_run_detail(&state.pool, run_id)
199        .await
200        .map_err(|e| {
201            tracing::error!(
202                "Failed to fetch workflow run detail for {}: {:?}",
203                run_id,
204                e
205            );
206            StatusCode::INTERNAL_SERVER_ERROR
207        })?
208        .ok_or(StatusCode::NOT_FOUND)?;
209
210    // 2. Fetch all step instances for this run (active or archived)
211    let instances = db::get_step_instances(&state.pool, run_id)
212        .await
213        .map_err(|e| {
214            tracing::error!("Failed to fetch step instances for {}: {:?}", run_id, e);
215            StatusCode::INTERNAL_SERVER_ERROR
216        })?;
217
218    let mut steps = Vec::new();
219    for instance in instances {
220        // Fetch outputs for this step (active or archived)
221        let outputs = db::get_step_outputs(&state.pool, instance.id)
222            .await
223            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
224
225        // Fetch status history
226        let history = db::get_step_status_history(&state.pool, instance.id)
227            .await
228            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
229
230        let mut logs = if let Some(backend) = &state.log_backend {
231            backend
232                .fetch_step_logs(
233                    &instance.step_name,
234                    instance.id,
235                    instance.started_at,
236                    instance.finished_at,
237                    Some(100), // Reduce payload size for full detail, TUI will fetch on demand
238                )
239                .await
240                .unwrap_or_else(|e| {
241                    tracing::warn!("Failed to fetch logs for step {}: {:?}", instance.id, e);
242                    vec![format!("Error fetching logs: {}", e)]
243                })
244        } else {
245            vec!["Log backend not configured".to_string()]
246        };
247
248        if logs.len() > 1000 {
249            logs = logs.split_off(logs.len() - 1000);
250        }
251
252        steps.push(StepDetail {
253            instance,
254            outputs,
255            history,
256            logs,
257        });
258    }
259
260    let artifacts = db::list_run_artifacts(&state.pool, run_id)
261        .await
262        .map_err(|e| {
263            tracing::error!("Failed to fetch run artifacts for {}: {:?}", run_id, e);
264            StatusCode::INTERNAL_SERVER_ERROR
265        })?;
266
267    let test_summaries = db::list_run_test_summaries(&state.pool, run_id)
268        .await
269        .map_err(|e| {
270            tracing::error!("Failed to fetch test summaries for {}: {:?}", run_id, e);
271            StatusCode::INTERNAL_SERVER_ERROR
272        })?;
273
274    let test_cases = db::list_run_test_cases(&state.pool, run_id)
275        .await
276        .map_err(|e| {
277            tracing::error!("Failed to fetch test cases for {}: {:?}", run_id, e);
278            StatusCode::INTERNAL_SERVER_ERROR
279        })?;
280
281    Ok(Json(WorkflowRunFullDetail {
282        detail,
283        steps,
284        artifacts,
285        test_summaries,
286        test_cases,
287    }))
288}
289
290/// Deletes a workflow run.
291#[utoipa::path(
292    delete,
293    path = "/api/v1/runs/{run_id}",
294    params(("run_id" = stormchaser_model::RunId, Path, description="Run ID")),
295    responses(
296        (status = 200, description = "Success"),
297        (status = 400, description = "Bad Request"),
298        (status = 404, description = "Not Found"),
299        (status = 500, description = "Internal Server Error")
300    ),
301    tag = "workflow"
302)]
303pub async fn delete_workflow_run_api(
304    AuthClaims(_claims): AuthClaims,
305    State(state): State<AppState>,
306    Path(run_id): Path<RunId>,
307) -> Result<impl IntoResponse, StatusCode> {
308    db::delete_workflow_run(&state.pool, run_id)
309        .await
310        .map_err(|e| {
311            tracing::error!("Failed to delete workflow run {}: {:?}", run_id, e);
312            StatusCode::INTERNAL_SERVER_ERROR
313        })?;
314
315    Ok(StatusCode::NO_CONTENT)
316}
317
318#[utoipa::path(
319    post,
320    path = "/api/v1/runs/direct",
321    request_body = DirectRunRequest,
322    responses(
323        (status = 200, description = "Workflow started", body = EnqueueResponse),
324        (status = 500, description = "Internal Server Error")
325    ),
326    security(
327        ("bearer_auth" = [])
328    ),
329    tag = "workflow"
330)]
331/// Executes a direct run.
332#[tracing::instrument(skip(state, claims), fields(run_id = tracing::field::Empty, initiating_user = tracing::field::Empty))]
333pub async fn direct_run(
334    AuthClaims(claims): AuthClaims,
335    State(state): State<AppState>,
336    Json(payload): Json<DirectRunRequest>,
337) -> Result<impl IntoResponse, StatusCode> {
338    let run_id = stormchaser_model::RunId::new_v4();
339    let user_id = claims.email.clone().unwrap_or(claims.sub.clone());
340
341    let span = tracing::Span::current();
342    span.record("run_id", tracing::field::display(run_id));
343    span.record("initiating_user", tracing::field::display(&user_id));
344
345    // Publish to NATS stormchaser.run.direct
346    let payload_json = serde_json::json!({
347        "run_id": run_id,
348        "dsl": payload.dsl,
349        "initiating_user": user_id,
350        "inputs": payload.inputs,
351    });
352
353    use cloudevents::{EventBuilder, EventBuilderV10};
354    let event = EventBuilderV10::new()
355        .id(uuid::Uuid::new_v4().to_string())
356        .ty("stormchaser.v1.run.direct")
357        .source("/stormchaser/api")
358        .time(Utc::now())
359        .data("application/json", payload_json)
360        .build()
361        .map_err(|e| {
362            tracing::error!("Failed to build CloudEvent: {}", e);
363            StatusCode::INTERNAL_SERVER_ERROR
364        })?;
365
366    let event_str = serde_json::to_string(&event).unwrap_or_default();
367    let mut headers = HeaderMap::new();
368    headers.insert("Content-Type", "application/cloudevents+json");
369
370    state
371        .nats
372        .publish_with_headers("stormchaser.v1.run.direct", headers, event_str.into())
373        .await
374        .map_err(|e| {
375            tracing::error!("Failed to publish CloudEvent: {}", e);
376            StatusCode::INTERNAL_SERVER_ERROR
377        })?;
378
379    Ok(Json(EnqueueResponse {
380        run_id,
381        status: "started".to_string(),
382    }))
383}
384
385#[utoipa::path(
386    get,
387    path = "/api/v1/runs/stream",
388    responses(
389        (status = 200, description = "Workflow runs stream (SSE)")
390    ),
391    security(
392        ("bearer_auth" = [])
393    ),
394    tag = "workflow"
395)]
396/// Stream workflow runs api.
397pub async fn stream_workflow_runs_api(
398    AuthClaims(_claims): AuthClaims,
399    State(state): State<AppState>,
400) -> Result<
401    axum::response::sse::Sse<
402        impl futures::stream::Stream<Item = Result<Event, std::convert::Infallible>>,
403    >,
404    StatusCode,
405> {
406    let (tx, rx) = mpsc::channel::<Result<Event, std::convert::Infallible>>(100);
407
408    let nats = state.nats.clone();
409    let pool = state.pool.clone();
410
411    tokio::spawn(async move {
412        let mut subscriber = match nats.subscribe("stormchaser.v1.run.>").await {
413            Ok(sub) => sub,
414            Err(e) => {
415                tracing::error!("Failed to subscribe to NATS for workflow runs: {:?}", e);
416                return;
417            }
418        };
419
420        while let Some(msg) = subscriber.next().await {
421            let ce: cloudevents::Event = match serde_json::from_slice(&msg.payload) {
422                Ok(e) => e,
423                Err(_) => continue,
424            };
425            let payload: Value = if let Some(cloudevents::Data::Json(v)) = ce.data() {
426                v.clone()
427            } else {
428                continue;
429            };
430
431            if let Some(run_id_str) = payload.get("run_id").and_then(|id| id.as_str()) {
432                if let Ok(run_id) = Uuid::parse_str(run_id_str) {
433                    // Fetch full detail for the run
434                    let detail =
435                        db::get_workflow_run_detail(&pool, stormchaser_model::RunId::new(run_id))
436                            .await
437                            .unwrap_or(None);
438
439                    if let Some(run) = detail {
440                        let data = serde_json::to_string(&run).unwrap_or_default();
441                        let event = Event::default().event("workflow_run").data(data);
442                        if tx.send(Ok(event)).await.is_err() {
443                            break;
444                        }
445                    }
446                }
447            }
448        }
449    });
450
451    let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(|res| match res {
452        Ok(event) => Ok(event),
453        Err(_) => unreachable!(),
454    });
455
456    Ok(axum::response::sse::Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()))
457}