Skip to main content

stormchaser_api/
lib.rs

//! Stormchaser API implementation.
//!
//! This crate contains the REST API for the Stormchaser system.
3
4/// Authentication and authorization module
5pub mod auth;
6/// Database access module
7pub mod db;
8/// Human-in-the-loop module
9pub mod hitl;
10/// API routes module
11pub mod routes;
12/// Telemetry and metrics module
13pub mod telemetry;
14
15use async_nats::Client;
16use auth::opa::opa_middleware;
17pub use auth::{AuthClaims, Claims, JWT_SECRET};
18use axum::extract;
19use axum::{
20    http::StatusCode,
21    middleware,
22    routing::{delete, get, post},
23    Router,
24};
25use once_cell::sync::Lazy;
26use opentelemetry::{global, metrics::Counter};
27use sqlx::PgPool;
28use std::env;
29use std::net::SocketAddr;
30use std::sync::Arc;
31use stormchaser_model::auth::OpaAuthorizer;
32use stormchaser_model::cron::CronWorkflow;
33use stormchaser_model::event_rules::EventRule;
34use stormchaser_model::event_rules::WebhookConfig;
35use stormchaser_model::storage::ArtifactRegistry;
36use stormchaser_model::storage::BackendType;
37use stormchaser_model::storage::StorageBackend;
38use stormchaser_model::test_report::TestCase;
39use stormchaser_model::test_report::TestCaseStatus;
40use stormchaser_model::test_report::TestReport;
41use stormchaser_model::test_report::TestSummary;
42use stormchaser_model::LogBackend;
43use tokio::sync;
44/// Rate limiting middleware and configuration
45pub mod rate_limit;
46
47use tower_http::trace::TraceLayer;
48use utoipa::OpenApi;
49
50use routes::auth::*;
51use routes::cron::*;
52use routes::event_rule::*;
53#[cfg(feature = "mcp")]
54use routes::mcp::*;
55use routes::step::*;
56use routes::storage::*;
57use routes::webhook::*;
58use routes::workflow::*;
59pub use routes::*;
60
61#[derive(OpenApi)]
62#[openapi(
63    paths(
64        routes::auth::login,
65        routes::auth::exchange_token,
66        routes::auth::refresh_token,
67        routes::workflow::enqueue_workflow,
68        routes::workflow::list_workflow_runs,
69        routes::workflow::get_workflow_run,
70        routes::workflow::delete_workflow_run_api,
71        routes::workflow::direct_run,
72        routes::workflow::stream_workflow_runs_api,
73        routes::cron::create_cron_workflow,
74        routes::cron::list_cron_workflows,
75        routes::cron::delete_cron_workflow,
76        routes::cron::trigger_cron_workflow,
77        routes::storage::create_storage_backend,
78        routes::storage::list_storage_backends,
79        routes::storage::get_storage_backend,
80        routes::storage::update_storage_backend,
81        routes::storage::delete_storage_backend,
82        routes::storage::list_run_artifacts,
83        routes::storage::list_run_test_reports,
84        routes::storage::list_run_test_summaries,
85        routes::storage::get_test_report,
86        routes::webhook::create_webhook,
87        routes::webhook::list_webhooks,
88        routes::webhook::get_webhook,
89        routes::webhook::delete_webhook,
90        routes::event_rule::create_event_rule,
91        routes::event_rule::list_event_rules,
92        routes::event_rule::delete_event_rule,
93        routes::webhook::handle_webhook,
94        routes::step::stream_step_logs_api,
95        routes::step::get_step_logs_api,
96        routes::step::stream_run_logs_api,
97        routes::step::stream_run_status_api,
98        routes::schema::get_schema,
99        hitl::approve_step_link
100    ),
101    components(
102        schemas(
103            AuthExchangeRequest, AuthExchangeResponse, AuthRefreshRequest,
104            EnqueueRequest, EnqueueResponse, RunOverrides,
105            ListRunsQuery, WorkflowRunDetail,
106            WorkflowRunFullDetail, StepDetail,
107            CreateCronWorkflowRequest, CronWorkflowResponse,
108            CronWorkflow,
109            CreateStorageBackendRequest, UpdateStorageBackendRequest,
110            StorageBackend, BackendType,
111            ArtifactRegistry,
112            TestCase, TestCaseStatus,
113            TestSummary, TestReport,
114            CreateWebhookRequest, CreateEventRuleRequest,
115            WebhookConfig, EventRule,
116            DirectRunRequest
117        )
118    ),
119    tags(
120        (name = "stormchaser", description = "Stormchaser API"),
121        (name = "hitl", description = "Human-in-the-Loop"),
122        (name = "cron", description = "Cron workflows"),
123        (name = "storage", description = "Storage and artifacts"),
124        (name = "webhook", description = "Webhooks and rules"),
125        (name = "event_rule", description = "Event rules"),
126        (name = "step", description = "Step actions"),
127        (name = "workflow", description = "Workflow actions")
128    ),
129    security(
130        ("bearer_auth" = [])
131    )
132)]
133/// OpenAPI documentation struct for the API
134pub struct ApiDoc;
135
136/// Counter metric for tracking the total number of enqueued workflow runs
137pub static RUNS_ENQUEUED: Lazy<Counter<u64>> = Lazy::new(|| {
138    global::meter("stormchaser-api")
139        .u64_counter("stormchaser.v1.runs_enqueued")
140        .with_description("Total number of runs enqueued")
141        .build()
142});
143
144use tokio::sync::RwLock;
145
146/// Application state shared across routes
147#[derive(Clone)]
148pub struct AppState {
149    /// Database connection pool
150    pub pool: PgPool,
151    /// NATS client connection
152    pub nats: Client,
153    /// OPA authorizer
154    pub opa: Arc<dyn OpaAuthorizer>,
155    /// Optional OIDC configuration
156    pub oidc_config: Option<auth::jwks::OidcConfig>,
157    /// JWKS cache for token validation
158    pub jwks: Arc<RwLock<auth::jwks::JwksCache>>,
159    /// Optional backend for logging
160    pub log_backend: Option<LogBackend>,
161    /// API base URL for MCP tool callback
162    pub api_base_url: String,
163}
164
165/// Constructs the Axum application router with all routes and middleware
166pub fn app(state: AppState) -> Router {
167    let per_second = env::var("API_RATE_LIMIT_PER_SECOND")
168        .ok()
169        .and_then(|s| s.parse().ok())
170        .unwrap_or(5);
171    let burst_size = env::var("API_RATE_LIMIT_BURST_SIZE")
172        .ok()
173        .and_then(|s| s.parse().ok())
174        .unwrap_or(10);
175
176    let rate_limit_state = Arc::new(rate_limit::RateLimitState {
177        nats: state.nats.clone(),
178        store: Arc::new(sync::OnceCell::new()),
179        per_second,
180        burst_size,
181    });
182
183    #[allow(unused_mut)]
184    let mut authenticated_routes = Router::new()
185        .route("/runs", get(list_workflow_runs).post(enqueue_workflow))
186        .route("/runs/stream", get(stream_workflow_runs_api))
187        .route(
188            "/runs/:id",
189            get(get_workflow_run).delete(delete_workflow_run_api),
190        )
191        .route("/runs/:id/steps/:step_id/approve", post(hitl::approve_step))
192        .route("/runs/:id/steps/:step_id/reject", post(hitl::reject_step))
193        .route("/events/correlate", post(hitl::correlate_event))
194        .route("/runs/:id/artifacts", get(list_run_artifacts))
195        .route("/runs/:id/reports", get(list_run_test_reports))
196        .route("/runs/:id/summaries", get(list_run_test_summaries))
197        .route("/runs/:id/reports/:report_id", get(get_test_report))
198        .route(
199            "/runs/:id/steps/:step_id/logs/stream",
200            get(stream_step_logs_api),
201        )
202        .route("/runs/:id/steps/:step_id/logs", get(get_step_logs_api))
203        .route("/runs/:id/logs/stream", get(stream_run_logs_api))
204        .route("/runs/:id/status/stream", get(stream_run_status_api))
205        .route("/runs/direct", post(direct_run))
206        .route("/webhooks", get(list_webhooks).post(create_webhook))
207        .route(
208            "/webhooks/:id",
209            get(get_webhook)
210                .patch(update_webhook)
211                .delete(delete_webhook),
212        )
213        .route(
214            "/cron-workflows",
215            get(list_cron_workflows).post(create_cron_workflow),
216        )
217        .route("/cron-workflows/:id", delete(delete_cron_workflow))
218        .route("/rules", get(list_event_rules).post(create_event_rule))
219        .route("/rules/:id", delete(delete_event_rule))
220        .route(
221            "/storage-backends",
222            get(list_storage_backends).post(create_storage_backend),
223        )
224        .route(
225            "/storage-backends/:id",
226            get(get_storage_backend)
227                .patch(update_storage_backend)
228                .delete(delete_storage_backend),
229        );
230
231    #[cfg(feature = "mcp")]
232    {
233        if let Some(service) = mcp_service(&state.api_base_url) {
234            authenticated_routes = authenticated_routes.nest_service("/mcp", service);
235        }
236    }
237
238    let authenticated_routes = authenticated_routes.layer(middleware::from_fn_with_state(
239        state.clone(),
240        opa_middleware,
241    ));
242
243    let api_v1 = Router::new()
244        .merge(authenticated_routes)
245        .route("/webhooks/:id", post(handle_webhook))
246        .route("/auth/login", get(login))
247        .route("/auth/exchange", post(exchange_token))
248        .route("/auth/refresh", post(refresh_token))
249        .route("/approve-link/:token", get(hitl::approve_step_link))
250        .route(
251            "/cron-trigger/:id",
252            post(routes::cron::trigger_cron_workflow),
253        )
254        .route("/schema", get(routes::schema::get_schema))
255        .layer(middleware::from_fn_with_state(
256            rate_limit_state,
257            rate_limit::nats_rate_limiter,
258        ))
259        .layer(middleware::from_fn(
260            |mut req: extract::Request, next: middleware::Next| async move {
261                if req
262                    .extensions()
263                    .get::<extract::ConnectInfo<SocketAddr>>()
264                    .is_none()
265                {
266                    req.extensions_mut()
267                        .insert(extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 0))));
268                }
269                Ok::<_, StatusCode>(next.run(req).await)
270            },
271        ));
272
273    Router::new()
274        .merge(
275            utoipa_swagger_ui::SwaggerUi::new("/swagger-ui")
276                .url("/api-docs/openapi.json", ApiDoc::openapi()),
277        )
278        .route("/", get(|| async { "Stormchaser API" }))
279        .route("/healthz", get(|| async { "OK" }))
280        .route("/api/health", get(|| async { "OK" }))
281        .nest("/api/v1", api_v1)
282        .layer(TraceLayer::new_for_http())
283        .with_state(state)
284}