Skip to main content

stormchaser_api/
lib.rs

1//! Stormchaser API implementation.
2//! This crate contains the REST API for the Stormchaser system.
3
4/// Authentication and authorization module
5pub mod auth;
6/// Database access module
7pub mod db;
8/// Human-in-the-loop module
9pub mod hitl;
10/// API routes module
11pub mod routes;
12/// Telemetry and metrics module
13pub mod telemetry;
14
15use async_nats::Client;
16use auth::opa::opa_middleware;
17pub use auth::{AuthClaims, Claims, JWT_SECRET};
18use axum::extract;
19use axum::{
20    http::StatusCode,
21    middleware,
22    routing::{delete, get, post},
23    Router,
24};
25use once_cell::sync::Lazy;
26use opentelemetry::{global, metrics::Counter};
27use sqlx::PgPool;
28use std::env;
29use std::net::SocketAddr;
30use std::sync::Arc;
31use stormchaser_model::auth::OpaAuthorizer;
32use stormchaser_model::cron::CronWorkflow;
33use stormchaser_model::event_rules::EventRule;
34use stormchaser_model::event_rules::WebhookConfig;
35use stormchaser_model::storage::ArtifactRegistry;
36use stormchaser_model::storage::BackendType;
37use stormchaser_model::storage::StorageBackend;
38use stormchaser_model::test_report::TestCase;
39use stormchaser_model::test_report::TestCaseStatus;
40use stormchaser_model::test_report::TestReport;
41use stormchaser_model::test_report::TestSummary;
42use stormchaser_model::LogBackend;
43use tokio::sync;
44/// Rate limiting middleware and configuration
45pub mod rate_limit;
46
47use tower_http::trace::TraceLayer;
48use utoipa::OpenApi;
49
50use routes::auth::*;
51use routes::cron::*;
52use routes::event_rule::*;
53use routes::step::*;
54use routes::storage::*;
55use routes::webhook::*;
56use routes::workflow::*;
57pub use routes::*;
58
59#[derive(OpenApi)]
60#[openapi(
61    paths(
62        routes::auth::login,
63        routes::auth::exchange_token,
64        routes::auth::refresh_token,
65        routes::workflow::enqueue_workflow,
66        routes::workflow::list_workflow_runs,
67        routes::workflow::get_workflow_run,
68        routes::workflow::delete_workflow_run_api,
69        routes::workflow::direct_run,
70        routes::workflow::stream_workflow_runs_api,
71        routes::cron::create_cron_workflow,
72        routes::cron::list_cron_workflows,
73        routes::cron::delete_cron_workflow,
74        routes::cron::trigger_cron_workflow,
75        routes::storage::create_storage_backend,
76        routes::storage::list_storage_backends,
77        routes::storage::get_storage_backend,
78        routes::storage::update_storage_backend,
79        routes::storage::delete_storage_backend,
80        routes::storage::list_run_artifacts,
81        routes::storage::list_run_test_reports,
82        routes::storage::list_run_test_summaries,
83        routes::storage::get_test_report,
84        routes::webhook::create_webhook,
85        routes::webhook::list_webhooks,
86        routes::webhook::get_webhook,
87        routes::webhook::delete_webhook,
88        routes::event_rule::create_event_rule,
89        routes::event_rule::list_event_rules,
90        routes::event_rule::delete_event_rule,
91        routes::webhook::handle_webhook,
92        routes::step::stream_step_logs_api,
93        routes::step::get_step_logs_api,
94        routes::step::stream_run_logs_api,
95        routes::step::stream_run_status_api,
96        routes::schema::get_schema,
97        hitl::approve_step_link
98    ),
99    components(
100        schemas(
101            AuthExchangeRequest, AuthExchangeResponse, AuthRefreshRequest,
102            EnqueueRequest, EnqueueResponse, RunOverrides,
103            ListRunsQuery, WorkflowRunDetail,
104            WorkflowRunFullDetail, StepDetail,
105            CreateCronWorkflowRequest, CronWorkflowResponse,
106            CronWorkflow,
107            CreateStorageBackendRequest, UpdateStorageBackendRequest,
108            StorageBackend, BackendType,
109            ArtifactRegistry,
110            TestCase, TestCaseStatus,
111            TestSummary, TestReport,
112            CreateWebhookRequest, CreateEventRuleRequest,
113            WebhookConfig, EventRule,
114            DirectRunRequest
115        )
116    ),
117    tags(
118        (name = "stormchaser", description = "Stormchaser API"),
119        (name = "hitl", description = "Human-in-the-Loop"),
120        (name = "cron", description = "Cron workflows"),
121        (name = "storage", description = "Storage and artifacts"),
122        (name = "webhook", description = "Webhooks and rules"),
123        (name = "event_rule", description = "Event rules"),
124        (name = "step", description = "Step actions"),
125        (name = "workflow", description = "Workflow actions")
126    ),
127    security(
128        ("bearer_auth" = [])
129    )
130)]
131/// OpenAPI documentation struct for the API
132pub struct ApiDoc;
133
134/// Counter metric for tracking the total number of enqueued workflow runs
135pub static RUNS_ENQUEUED: Lazy<Counter<u64>> = Lazy::new(|| {
136    global::meter("stormchaser-api")
137        .u64_counter("stormchaser.v1.runs_enqueued")
138        .with_description("Total number of runs enqueued")
139        .build()
140});
141
142use tokio::sync::RwLock;
143
144/// Application state shared across routes
145#[derive(Clone)]
146pub struct AppState {
147    /// Database connection pool
148    pub pool: PgPool,
149    /// NATS client connection
150    pub nats: Client,
151    /// OPA authorizer
152    pub opa: Arc<dyn OpaAuthorizer>,
153    /// Optional OIDC configuration
154    pub oidc_config: Option<auth::jwks::OidcConfig>,
155    /// JWKS cache for token validation
156    pub jwks: Arc<RwLock<auth::jwks::JwksCache>>,
157    /// Optional backend for logging
158    pub log_backend: Option<LogBackend>,
159}
160
161/// Constructs the Axum application router with all routes and middleware
162pub fn app(state: AppState) -> Router {
163    let per_second = env::var("API_RATE_LIMIT_PER_SECOND")
164        .ok()
165        .and_then(|s| s.parse().ok())
166        .unwrap_or(5);
167    let burst_size = env::var("API_RATE_LIMIT_BURST_SIZE")
168        .ok()
169        .and_then(|s| s.parse().ok())
170        .unwrap_or(10);
171
172    let rate_limit_state = Arc::new(rate_limit::RateLimitState {
173        nats: state.nats.clone(),
174        store: Arc::new(sync::OnceCell::new()),
175        per_second,
176        burst_size,
177    });
178
179    let authenticated_routes = Router::new()
180        .route("/runs", get(list_workflow_runs).post(enqueue_workflow))
181        .route("/runs/stream", get(stream_workflow_runs_api))
182        .route(
183            "/runs/:id",
184            get(get_workflow_run).delete(delete_workflow_run_api),
185        )
186        .route("/runs/:id/steps/:step_id/approve", post(hitl::approve_step))
187        .route("/runs/:id/steps/:step_id/reject", post(hitl::reject_step))
188        .route("/events/correlate", post(hitl::correlate_event))
189        .route("/runs/:id/artifacts", get(list_run_artifacts))
190        .route("/runs/:id/reports", get(list_run_test_reports))
191        .route("/runs/:id/summaries", get(list_run_test_summaries))
192        .route("/runs/:id/reports/:report_id", get(get_test_report))
193        .route(
194            "/runs/:id/steps/:step_id/logs/stream",
195            get(stream_step_logs_api),
196        )
197        .route("/runs/:id/steps/:step_id/logs", get(get_step_logs_api))
198        .route("/runs/:id/logs/stream", get(stream_run_logs_api))
199        .route("/runs/:id/status/stream", get(stream_run_status_api))
200        .route("/runs/direct", post(direct_run))
201        .route("/webhooks", get(list_webhooks).post(create_webhook))
202        .route(
203            "/webhooks/:id",
204            get(get_webhook)
205                .patch(update_webhook)
206                .delete(delete_webhook),
207        )
208        .route(
209            "/cron-workflows",
210            get(list_cron_workflows).post(create_cron_workflow),
211        )
212        .route("/cron-workflows/:id", delete(delete_cron_workflow))
213        .route("/rules", get(list_event_rules).post(create_event_rule))
214        .route("/rules/:id", delete(delete_event_rule))
215        .route(
216            "/storage-backends",
217            get(list_storage_backends).post(create_storage_backend),
218        )
219        .route(
220            "/storage-backends/:id",
221            get(get_storage_backend)
222                .patch(update_storage_backend)
223                .delete(delete_storage_backend),
224        )
225        .layer(middleware::from_fn_with_state(
226            state.clone(),
227            opa_middleware,
228        ));
229
230    let api_v1 = Router::new()
231        .merge(authenticated_routes)
232        .route("/webhooks/:id", post(handle_webhook))
233        .route("/auth/login", get(login))
234        .route("/auth/exchange", post(exchange_token))
235        .route("/auth/refresh", post(refresh_token))
236        .route("/approve-link/:token", get(hitl::approve_step_link))
237        .route(
238            "/cron-trigger/:id",
239            post(routes::cron::trigger_cron_workflow),
240        )
241        .route("/schema", get(routes::schema::get_schema))
242        .layer(middleware::from_fn_with_state(
243            rate_limit_state,
244            rate_limit::nats_rate_limiter,
245        ))
246        .layer(middleware::from_fn(
247            |mut req: extract::Request, next: middleware::Next| async move {
248                if req
249                    .extensions()
250                    .get::<extract::ConnectInfo<SocketAddr>>()
251                    .is_none()
252                {
253                    req.extensions_mut()
254                        .insert(extract::ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 0))));
255                }
256                Ok::<_, StatusCode>(next.run(req).await)
257            },
258        ));
259
260    Router::new()
261        .merge(
262            utoipa_swagger_ui::SwaggerUi::new("/swagger-ui")
263                .url("/api-docs/openapi.json", ApiDoc::openapi()),
264        )
265        .route("/", get(|| async { "Stormchaser API" }))
266        .route("/healthz", get(|| async { "OK" }))
267        .route("/api/health", get(|| async { "OK" }))
268        .nest("/api/v1", api_v1)
269        .layer(TraceLayer::new_for_http())
270        .with_state(state)
271}