Skip to main content

stormchaser_api/routes/
workflow.rs

1use super::{
2    DirectRunRequest, EnqueueRequest, EnqueueResponse, ListRunsQuery, StepDetail,
3    WorkflowRunDetail, WorkflowRunFullDetail,
4};
5use crate::db;
6use crate::{AppState, AuthClaims, RUNS_ENQUEUED};
7use async_nats::jetstream;
8use async_nats::HeaderMap;
9use axum::response::sse::Event;
10use axum::{
11    extract::{Path, Query, State},
12    http::StatusCode,
13    response::IntoResponse,
14    Json,
15};
16use chrono::Utc;
17use futures::StreamExt;
18use serde_json::Value;
19use stormchaser_model::events::WorkflowQueuedEvent;
20use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
21use stormchaser_model::nats::{publish_cloudevent, NatsSubject};
22use stormchaser_model::workflow::RunStatus;
23use stormchaser_model::RunId;
24use tokio::sync::mpsc;
25use uuid::Uuid;
26
27#[utoipa::path(
28    post,
29    path = "/api/v1/runs",
30    request_body = EnqueueRequest,
31    responses(
32        (status = 200, description = "Workflow enqueued", body = EnqueueResponse),
33        (status = 500, description = "Internal Server Error")
34    ),
35    security(
36        ("bearer_auth" = [])
37    ),
38    tag = "stormchaser"
39)]
40#[tracing::instrument(skip(state, claims), fields(run_id = tracing::field::Empty, initiating_user = tracing::field::Empty))]
41/// Enqueue workflow.
42pub async fn enqueue_workflow(
43    AuthClaims(claims): AuthClaims,
44    State(state): State<AppState>,
45    Json(payload): Json<EnqueueRequest>,
46) -> Result<impl IntoResponse, StatusCode> {
47    let run_id = stormchaser_model::RunId::new_v4();
48    let user_id = claims.email.clone().unwrap_or(claims.sub.clone());
49
50    let span = tracing::Span::current();
51    span.record("run_id", tracing::field::display(run_id));
52    span.record("initiating_user", tracing::field::display(&user_id));
53
54    tracing::info!("Enqueuing workflow: {:?}", payload.workflow_name);
55    let fencing_token = Utc::now().timestamp_nanos_opt().unwrap_or(0);
56
57    let mut tx = state
58        .pool
59        .begin()
60        .await
61        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
62
63    // Create WorkflowRun with initiating_user
64    db::insert_workflow_run(
65        &mut tx,
66        run_id,
67        &payload.workflow_name,
68        &user_id,
69        &payload.repo_url,
70        &payload.workflow_path,
71        &payload.git_ref,
72        RunStatus::Queued,
73        fencing_token,
74    )
75    .await
76    .inspect_err(|e| tracing::error!(run_id = %run_id, "Database error inserting run: {:?}", e))
77    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
78
79    // Create RunContext (placeholder for dsl_version and workflow_definition)
80    db::insert_run_context(
81        &mut tx,
82        run_id,
83        "v1",
84        serde_json::json!({}),
85        "",
86        &payload.inputs,
87    )
88    .await
89    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
90
91    // 3. Create RunQuotas (always, with defaults + overrides)
92    let timeout = payload
93        .overrides
94        .as_ref()
95        .and_then(|o| o.timeout.clone())
96        .unwrap_or_else(|| "1h".to_string());
97
98    db::insert_run_quotas(&mut tx, run_id, 10, "1", "4Gi", "10Gi", &timeout)
99        .await
100        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
101
102    tx.commit()
103        .await
104        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
105
106    // Publish to NATS
107    let event = WorkflowQueuedEvent {
108        run_id,
109        event_type: EventType::Workflow(WorkflowEventType::Queued),
110        timestamp: Utc::now(),
111        dsl: None,
112        inputs: None,
113        initiating_user: None,
114    };
115
116    publish_cloudevent(
117        &jetstream::new(state.nats.clone()),
118        NatsSubject::RunQueued,
119        EventType::Workflow(WorkflowEventType::Queued),
120        EventSource::Api,
121        serde_json::to_value(event).unwrap(),
122        Some(SchemaVersion::new("1.0".to_string())),
123        None,
124    )
125    .await
126    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
127
128    RUNS_ENQUEUED.add(
129        1,
130        &[
131            opentelemetry::KeyValue::new("workflow_name", payload.workflow_name.clone()),
132            opentelemetry::KeyValue::new("initiating_user", user_id.clone()),
133        ],
134    );
135
136    Ok(Json(EnqueueResponse {
137        run_id,
138        status: stormchaser_model::RunStatus::Queued,
139    }))
140}
141
142#[utoipa::path(
143    get,
144    path = "/api/v1/runs",
145    params(
146        ListRunsQuery
147    ),
148    responses(
149        (status = 200, description = "List of workflow runs", body = [WorkflowRunDetail]),
150        (status = 500, description = "Internal Server Error")
151    ),
152    security(
153        ("bearer_auth" = [])
154    ),
155    tag = "stormchaser"
156)]
157/// List workflow runs.
158pub async fn list_workflow_runs(
159    AuthClaims(_claims): AuthClaims,
160    State(state): State<AppState>,
161    Query(params): Query<ListRunsQuery>,
162) -> Result<impl IntoResponse, StatusCode> {
163    let limit = params.limit.unwrap_or(20).min(100);
164    let offset = params.offset.unwrap_or(0);
165
166    let runs = db::list_workflow_runs(&state.pool, &params, limit as i64, offset as i64)
167        .await
168        .map_err(|e| {
169            tracing::error!("Failed to fetch workflow runs: {:?}", e);
170            StatusCode::INTERNAL_SERVER_ERROR
171        })?;
172
173    Ok(Json(runs))
174}
175
176/// Gets workflow run details.
177#[utoipa::path(
178    get,
179    path = "/api/v1/runs/{id}",
180    params(
181        ("id" = stormchaser_model::RunId, Path, description = "Run ID")
182    ),
183    responses(
184        (status = 200, description = "Workflow run details", body = WorkflowRunFullDetail),
185        (status = 404, description = "Run not found")
186    ),
187    security(
188        ("bearer_auth" = [])
189    ),
190    tag = "stormchaser"
191)]
192/// Get workflow run.
193pub async fn get_workflow_run(
194    AuthClaims(_claims): AuthClaims,
195    State(state): State<AppState>,
196    Path(run_id): Path<RunId>,
197) -> Result<impl IntoResponse, StatusCode> {
198    // 1. Fetch the workflow run detail
199    let detail: WorkflowRunDetail = db::get_workflow_run_detail(&state.pool, run_id)
200        .await
201        .map_err(|e| {
202            tracing::error!(
203                "Failed to fetch workflow run detail for {}: {:?}",
204                run_id,
205                e
206            );
207            StatusCode::INTERNAL_SERVER_ERROR
208        })?
209        .ok_or(StatusCode::NOT_FOUND)?;
210
211    // 2. Fetch all step instances for this run (active or archived)
212    let instances = db::get_step_instances(&state.pool, run_id)
213        .await
214        .map_err(|e| {
215            tracing::error!("Failed to fetch step instances for {}: {:?}", run_id, e);
216            StatusCode::INTERNAL_SERVER_ERROR
217        })?;
218
219    let mut steps = Vec::new();
220    for instance in instances {
221        // Fetch outputs for this step (active or archived)
222        let outputs = db::get_step_outputs(&state.pool, instance.id)
223            .await
224            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
225
226        // Fetch status history
227        let history = db::get_step_status_history(&state.pool, instance.id)
228            .await
229            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
230
231        let mut logs = if let Some(backend) = &state.log_backend {
232            backend
233                .fetch_step_logs(
234                    &instance.step_name,
235                    instance.id,
236                    instance.started_at,
237                    instance.finished_at,
238                    Some(100), // Reduce payload size for full detail, TUI will fetch on demand
239                )
240                .await
241                .unwrap_or_else(|e| {
242                    tracing::warn!("Failed to fetch logs for step {}: {:?}", instance.id, e);
243                    vec![format!("Error fetching logs: {}", e)]
244                })
245        } else {
246            vec!["Log backend not configured".to_string()]
247        };
248
249        if logs.len() > 1000 {
250            logs = logs.split_off(logs.len() - 1000);
251        }
252
253        steps.push(StepDetail {
254            instance,
255            outputs,
256            history,
257            logs,
258        });
259    }
260
261    let artifacts = db::list_run_artifacts(&state.pool, run_id)
262        .await
263        .map_err(|e| {
264            tracing::error!("Failed to fetch run artifacts for {}: {:?}", run_id, e);
265            StatusCode::INTERNAL_SERVER_ERROR
266        })?;
267
268    let test_summaries = db::list_run_test_summaries(&state.pool, run_id)
269        .await
270        .map_err(|e| {
271            tracing::error!("Failed to fetch test summaries for {}: {:?}", run_id, e);
272            StatusCode::INTERNAL_SERVER_ERROR
273        })?;
274
275    let test_cases = db::list_run_test_cases(&state.pool, run_id)
276        .await
277        .map_err(|e| {
278            tracing::error!("Failed to fetch test cases for {}: {:?}", run_id, e);
279            StatusCode::INTERNAL_SERVER_ERROR
280        })?;
281
282    Ok(Json(WorkflowRunFullDetail {
283        detail,
284        steps,
285        artifacts,
286        test_summaries,
287        test_cases,
288    }))
289}
290
291/// Deletes a workflow run.
292#[utoipa::path(
293    delete,
294    path = "/api/v1/runs/{run_id}",
295    params(("run_id" = stormchaser_model::RunId, Path, description="Run ID")),
296    responses(
297        (status = 200, description = "Success"),
298        (status = 400, description = "Bad Request"),
299        (status = 404, description = "Not Found"),
300        (status = 500, description = "Internal Server Error")
301    ),
302    tag = "workflow"
303)]
304pub async fn delete_workflow_run_api(
305    AuthClaims(_claims): AuthClaims,
306    State(state): State<AppState>,
307    Path(run_id): Path<RunId>,
308) -> Result<impl IntoResponse, StatusCode> {
309    db::delete_workflow_run(&state.pool, run_id)
310        .await
311        .map_err(|e| {
312            tracing::error!("Failed to delete workflow run {}: {:?}", run_id, e);
313            StatusCode::INTERNAL_SERVER_ERROR
314        })?;
315
316    Ok(StatusCode::NO_CONTENT)
317}
318
319#[utoipa::path(
320    post,
321    path = "/api/v1/runs/direct",
322    request_body = DirectRunRequest,
323    responses(
324        (status = 200, description = "Workflow started", body = EnqueueResponse),
325        (status = 500, description = "Internal Server Error")
326    ),
327    security(
328        ("bearer_auth" = [])
329    ),
330    tag = "workflow"
331)]
332/// Executes a direct run.
333#[tracing::instrument(skip(state, claims), fields(run_id = tracing::field::Empty, initiating_user = tracing::field::Empty))]
334pub async fn direct_run(
335    AuthClaims(claims): AuthClaims,
336    State(state): State<AppState>,
337    Json(payload): Json<DirectRunRequest>,
338) -> Result<impl IntoResponse, StatusCode> {
339    let run_id = stormchaser_model::RunId::new_v4();
340    let user_id = claims.email.clone().unwrap_or(claims.sub.clone());
341
342    let span = tracing::Span::current();
343    span.record("run_id", tracing::field::display(run_id));
344    span.record("initiating_user", tracing::field::display(&user_id));
345
346    // Publish to NATS stormchaser.run.direct
347    let payload_json = serde_json::json!({
348        "run_id": run_id,
349        "dsl": payload.dsl,
350        "initiating_user": user_id,
351        "inputs": payload.inputs,
352    });
353
354    use cloudevents::{EventBuilder, EventBuilderV10};
355    let event = EventBuilderV10::new()
356        .id(uuid::Uuid::new_v4().to_string())
357        .ty("stormchaser.v1.run.direct")
358        .source(EventSource::Api.as_str())
359        .time(Utc::now())
360        .data(stormchaser_model::APPLICATION_JSON, payload_json)
361        .build()
362        .map_err(|e| {
363            tracing::error!("Failed to build CloudEvent: {}", e);
364            StatusCode::INTERNAL_SERVER_ERROR
365        })?;
366
367    let event_str = serde_json::to_string(&event).unwrap_or_default();
368    let mut headers = HeaderMap::new();
369    headers.insert("Content-Type", "application/cloudevents+json");
370
371    state
372        .nats
373        .publish_with_headers("stormchaser.v1.run.direct", headers, event_str.into())
374        .await
375        .map_err(|e| {
376            tracing::error!("Failed to publish CloudEvent: {}", e);
377            StatusCode::INTERNAL_SERVER_ERROR
378        })?;
379
380    Ok(Json(EnqueueResponse {
381        run_id,
382        status: RunStatus::Running,
383    }))
384}
385
386#[utoipa::path(
387    get,
388    path = "/api/v1/runs/stream",
389    responses(
390        (status = 200, description = "Workflow runs stream (SSE)")
391    ),
392    security(
393        ("bearer_auth" = [])
394    ),
395    tag = "workflow"
396)]
397/// Stream workflow runs api.
398pub async fn stream_workflow_runs_api(
399    AuthClaims(_claims): AuthClaims,
400    State(state): State<AppState>,
401) -> Result<
402    axum::response::sse::Sse<
403        impl futures::stream::Stream<Item = Result<Event, std::convert::Infallible>>,
404    >,
405    StatusCode,
406> {
407    let (tx, rx) = mpsc::channel::<Result<Event, std::convert::Infallible>>(100);
408
409    let nats = state.nats.clone();
410    let pool = state.pool.clone();
411
412    tokio::spawn(async move {
413        let mut subscriber = match nats.subscribe("stormchaser.v1.run.>").await {
414            Ok(sub) => sub,
415            Err(e) => {
416                tracing::error!("Failed to subscribe to NATS for workflow runs: {:?}", e);
417                return;
418            }
419        };
420
421        while let Some(msg) = subscriber.next().await {
422            let ce: cloudevents::Event = match serde_json::from_slice(&msg.payload) {
423                Ok(e) => e,
424                Err(_) => continue,
425            };
426            let payload: Value = if let Some(cloudevents::Data::Json(v)) = ce.data() {
427                v.clone()
428            } else {
429                continue;
430            };
431
432            if let Some(run_id_str) = payload.get("run_id").and_then(|id| id.as_str()) {
433                if let Ok(run_id) = Uuid::parse_str(run_id_str) {
434                    // Fetch full detail for the run
435                    let detail =
436                        db::get_workflow_run_detail(&pool, stormchaser_model::RunId::new(run_id))
437                            .await
438                            .unwrap_or(None);
439
440                    if let Some(run) = detail {
441                        let data = serde_json::to_string(&run).unwrap_or_default();
442                        let event = Event::default().event("workflow_run").data(data);
443                        if tx.send(Ok(event)).await.is_err() {
444                            break;
445                        }
446                    }
447                }
448            }
449        }
450    });
451
452    let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(|res| match res {
453        Ok(event) => Ok(event),
454        Err(_) => unreachable!(),
455    });
456
457    Ok(axum::response::sse::Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()))
458}