1use axum::{
38 Router,
39 body::Body,
40 extract::{Path, State},
41 http::{HeaderMap, Request, StatusCode},
42 middleware::{self, Next},
43 response::{IntoResponse, Json, Response},
44 routing::{get, post},
45};
46use dashmap::DashMap;
47use serde::{Deserialize, Serialize};
48use serde_json::{Value, json};
49use std::sync::Arc;
50use std::time::{Duration, SystemTime, UNIX_EPOCH};
51use tokio::net::TcpListener;
52use tower_http::{
53 compression::CompressionLayer,
54 cors::{Any, CorsLayer},
55 trace::TraceLayer,
56};
57use tracing::info;
58use uuid::Uuid;
59
60#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
75#[serde(rename_all = "snake_case")]
76pub enum PipelineState {
77 Pending,
79 Running,
81 Completed,
83 Failed,
85 Cancelled,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct PipelineRun {
92 pub id: String,
94 pub definition: Value,
96 pub state: PipelineState,
98 pub submitted_at: u64,
100 pub finished_at: Option<u64>,
102 pub results: Value,
104 pub error: Option<String>,
106}
107
108impl PipelineRun {
109 fn new(id: String, definition: Value) -> Self {
110 let now = SystemTime::now()
111 .duration_since(UNIX_EPOCH)
112 .unwrap_or(Duration::ZERO)
113 .as_secs();
114 Self {
115 id,
116 definition,
117 state: PipelineState::Pending,
118 submitted_at: now,
119 finished_at: None,
120 results: json!({}),
121 error: None,
122 }
123 }
124}
125
126#[derive(Debug, Deserialize)]
132pub struct SubmitPipelineRequest {
133 pub definition: Value,
135}
136
137#[derive(Debug, Serialize)]
139pub struct SubmitPipelineResponse {
140 pub id: String,
142 pub state: PipelineState,
144}
145
146#[derive(Debug, Serialize)]
148pub struct PipelineStatus {
149 pub id: String,
151 pub state: PipelineState,
153 pub submitted_at: u64,
155 pub finished_at: Option<u64>,
157 pub error: Option<String>,
159}
160
161impl From<&PipelineRun> for PipelineStatus {
162 fn from(r: &PipelineRun) -> Self {
163 Self {
164 id: r.id.clone(),
165 state: r.state.clone(),
166 submitted_at: r.submitted_at,
167 finished_at: r.finished_at,
168 error: r.error.clone(),
169 }
170 }
171}
172
173#[derive(Clone)]
179pub struct AppState {
180 pub pipelines: Arc<DashMap<String, PipelineRun>>,
182 pub api_key: String,
184}
185
186impl AppState {
187 pub fn new(api_key: impl Into<String>) -> Self {
189 Self {
190 pipelines: Arc::new(DashMap::new()),
191 api_key: api_key.into(),
192 }
193 }
194}
195
196async fn require_api_key(
205 State(state): State<AppState>,
206 headers: HeaderMap,
207 request: Request<Body>,
208 next: Next,
209) -> Response {
210 let provided = headers
211 .get("x-api-key")
212 .and_then(|v| v.to_str().ok())
213 .unwrap_or("");
214
215 if provided != state.api_key {
216 return (
217 StatusCode::UNAUTHORIZED,
218 Json(json!({"error": "invalid or missing X-Api-Key"})),
219 )
220 .into_response();
221 }
222 next.run(request).await
223}
224
225async fn health() -> impl IntoResponse {
230 Json(json!({
231 "status": "ok",
232 "service": "stygian-api",
233 "version": env!("CARGO_PKG_VERSION"),
234 }))
235}
236
237async fn metrics() -> impl IntoResponse {
238 (
241 [(
242 axum::http::header::CONTENT_TYPE,
243 "text/plain; version=0.0.4",
244 )],
245 "# stygian-api metrics\n",
246 )
247}
248
249async fn dashboard() -> impl IntoResponse {
250 (
251 [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
252 DASHBOARD_HTML,
253 )
254}
255
256async fn submit_pipeline(
257 State(state): State<AppState>,
258 Json(body): Json<SubmitPipelineRequest>,
259) -> impl IntoResponse {
260 let id = Uuid::new_v4().to_string();
261 let run = PipelineRun::new(id.clone(), body.definition);
262 state.pipelines.insert(id.clone(), run);
263 info!(pipeline_id = %id, "pipeline submitted");
264 (
265 StatusCode::CREATED,
266 Json(SubmitPipelineResponse {
267 id,
268 state: PipelineState::Pending,
269 }),
270 )
271}
272
273async fn list_pipelines(State(state): State<AppState>) -> impl IntoResponse {
274 let list: Vec<PipelineStatus> = state
275 .pipelines
276 .iter()
277 .map(|e| PipelineStatus::from(e.value()))
278 .collect();
279 Json(list)
280}
281
282#[allow(clippy::option_if_let_else)]
283async fn get_pipeline(State(state): State<AppState>, Path(id): Path<String>) -> Response {
284 match state.pipelines.get(&id) {
285 Some(run) => Json(PipelineStatus::from(run.value())).into_response(),
286 None => (
287 StatusCode::NOT_FOUND,
288 Json(json!({"error": "pipeline not found"})),
289 )
290 .into_response(),
291 }
292}
293
294async fn get_pipeline_results(State(state): State<AppState>, Path(id): Path<String>) -> Response {
295 match state.pipelines.get(&id) {
296 Some(run) => {
297 if run.state == PipelineState::Completed {
298 Json(json!({
299 "id": run.id,
300 "results": run.results,
301 }))
302 .into_response()
303 } else {
304 (
305 StatusCode::ACCEPTED,
306 Json(json!({
307 "id": run.id,
308 "state": run.state,
309 "message": "pipeline not yet complete",
310 })),
311 )
312 .into_response()
313 }
314 }
315 None => (
316 StatusCode::NOT_FOUND,
317 Json(json!({"error": "pipeline not found"})),
318 )
319 .into_response(),
320 }
321}
322
323async fn cancel_pipeline(State(state): State<AppState>, Path(id): Path<String>) -> Response {
324 match state.pipelines.remove(&id) {
325 Some(_) => {
326 info!(pipeline_id = %id, "pipeline cancelled/deleted");
327 StatusCode::NO_CONTENT.into_response()
328 }
329 None => (
330 StatusCode::NOT_FOUND,
331 Json(json!({"error": "pipeline not found"})),
332 )
333 .into_response(),
334 }
335}
336
337pub fn build_router(state: AppState) -> Router {
352 let protected = Router::new()
353 .route("/pipelines", post(submit_pipeline).get(list_pipelines))
354 .route("/pipelines/{id}", get(get_pipeline).delete(cancel_pipeline))
355 .route("/pipelines/{id}/results", get(get_pipeline_results))
356 .layer(middleware::from_fn_with_state(
357 state.clone(),
358 require_api_key,
359 ));
360
361 let public = Router::new()
362 .route("/", get(dashboard))
363 .route("/health", get(health))
364 .route("/metrics", get(metrics));
365
366 Router::new()
367 .merge(public)
368 .merge(protected)
369 .layer(
370 CorsLayer::new()
371 .allow_origin(Any)
372 .allow_methods(Any)
373 .allow_headers(Any),
374 )
375 .layer(CompressionLayer::new())
376 .layer(TraceLayer::new_for_http())
377 .with_state(state)
378}
379
380pub struct ApiServer {
398 state: AppState,
399}
400
401impl ApiServer {
402 pub fn new(api_key: impl Into<String>) -> Self {
404 Self {
405 state: AppState::new(api_key),
406 }
407 }
408
409 #[must_use]
413 pub fn from_env() -> Self {
414 let key = std::env::var("STYGIAN_API_KEY").unwrap_or_else(|_| "dev-key".to_string());
415 Self::new(key)
416 }
417
418 pub async fn run(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
425 let app = build_router(self.state);
426 let listener = TcpListener::bind(addr).await?;
427 info!(address = %addr, "stygian-api listening");
428 axum::serve(listener, app).await?;
429 Ok(())
430 }
431}
432
/// Self-contained HTML page served at `/`.
///
/// The page loads Tailwind from its CDN and uses inline JavaScript to call
/// `/health` and `/pipelines`, sending the key typed into the form (or
/// `dev-key` when the field is empty) as `X-Api-Key`. The pipeline list is
/// refreshed every 10 seconds.
///
/// Every server-supplied value that is interpolated into `innerHTML` is passed
/// through the script's `esc()` helper first, so that text such as a pipeline
/// error message cannot inject markup into the page.
const DASHBOARD_HTML: &str = r#"<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Stygian Dashboard</title>
  <script src="https://cdn.tailwindcss.com"></script>
  <style>
    body { font-family: 'Inter', system-ui, sans-serif; }
    .badge-pending { @apply bg-yellow-100 text-yellow-800; }
    .badge-running { @apply bg-blue-100 text-blue-800; }
    .badge-completed { @apply bg-green-100 text-green-800; }
    .badge-failed { @apply bg-red-100 text-red-800; }
    .badge-cancelled { @apply bg-gray-100 text-gray-800; }
  </style>
</head>
<body class="bg-gray-50 text-gray-900 min-h-screen">

<!-- Nav -->
<nav class="bg-indigo-700 text-white px-6 py-4 flex items-center gap-3 shadow-md">
  <span class="text-2xl">🕸️</span>
  <h1 class="text-xl font-bold tracking-tight">Stygian</h1>
  <span class="ml-auto text-sm opacity-70">Pipeline Dashboard</span>
</nav>

<!-- Main -->
<main class="max-w-5xl mx-auto px-4 py-8 space-y-8">

  <!-- Health card -->
  <section class="bg-white rounded-xl shadow p-6">
    <h2 class="text-lg font-semibold mb-3">System Health</h2>
    <div id="health" class="text-sm text-gray-500">Loading…</div>
  </section>

  <!-- Submit pipeline -->
  <section class="bg-white rounded-xl shadow p-6 space-y-4">
    <h2 class="text-lg font-semibold">Submit Pipeline</h2>
    <div class="space-y-2">
      <label class="text-sm font-medium text-gray-700">API Key</label>
      <input id="apikey" type="password" placeholder="X-Api-Key value"
        class="w-full border border-gray-300 rounded-lg px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-indigo-500" />
    </div>
    <div class="space-y-2">
      <label class="text-sm font-medium text-gray-700">Pipeline definition (JSON)</label>
      <textarea id="pipelineDef" rows="6" placeholder='{"nodes":[]}'
        class="w-full border border-gray-300 rounded-lg px-3 py-2 text-sm font-mono focus:outline-none focus:ring-2 focus:ring-indigo-500"></textarea>
    </div>
    <button onclick="submitPipeline()"
      class="bg-indigo-600 hover:bg-indigo-700 text-white px-5 py-2 rounded-lg text-sm font-semibold transition-colors">
      Submit
    </button>
    <div id="submit-result" class="text-sm"></div>
  </section>

  <!-- Pipeline list -->
  <section class="bg-white rounded-xl shadow p-6">
    <div class="flex items-center justify-between mb-4">
      <h2 class="text-lg font-semibold">Pipelines</h2>
      <button onclick="loadPipelines()"
        class="text-sm text-indigo-600 hover:text-indigo-800 font-medium">Refresh</button>
    </div>
    <div id="pipeline-list" class="space-y-2 text-sm text-gray-500">Loading…</div>
  </section>

</main>

<script>
const BASE = '';

// Escape a value for safe interpolation into innerHTML.
function esc(value) {
  const map = { '&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;' };
  return String(value).replace(/[&<>"']/g, c => map[c]);
}

async function fetchHealth() {
  try {
    const r = await fetch(`${BASE}/health`);
    const d = await r.json();
    document.getElementById('health').innerHTML =
      `<span class="text-green-600 font-medium">✔ Online</span> — ${esc(d.service)} v${esc(d.version)}`;
  } catch (e) {
    document.getElementById('health').textContent = '✖ Unreachable';
  }
}

function apiKey() { return document.getElementById('apikey').value || 'dev-key'; }

function badge(state) {
  const cls = {
    pending: 'bg-yellow-100 text-yellow-800',
    running: 'bg-blue-100 text-blue-800',
    completed: 'bg-green-100 text-green-800',
    failed: 'bg-red-100 text-red-800',
    cancelled: 'bg-gray-100 text-gray-800',
  }[state] || 'bg-gray-100 text-gray-500';
  return `<span class="inline-block px-2 py-0.5 rounded-full text-xs font-medium ${esc(cls)}">${esc(state)}</span>`;
}

async function loadPipelines() {
  const el = document.getElementById('pipeline-list');
  try {
    const r = await fetch(`${BASE}/pipelines`, {
      headers: { 'X-Api-Key': apiKey() }
    });
    if (!r.ok) { el.textContent = 'Unauthorized — check API key'; return; }
    const list = await r.json();
    if (!list.length) { el.textContent = 'No pipelines yet'; return; }
    el.innerHTML = list.map(p => `
      <div class="flex items-center justify-between border border-gray-200 rounded-lg px-4 py-3">
        <div>
          <span class="font-mono text-xs text-gray-500">${esc(p.id.slice(0,8))}…</span>
          ${badge(p.state)}
          ${p.error ? `<span class="ml-2 text-red-500 text-xs">${esc(p.error)}</span>` : ''}
        </div>
        <span class="text-xs text-gray-400">${esc(new Date(p.submitted_at * 1000).toLocaleString())}</span>
      </div>`).join('');
  } catch(e) {
    el.textContent = 'Error loading pipelines: ' + e.message;
  }
}

async function submitPipeline() {
  const el = document.getElementById('submit-result');
  const raw = document.getElementById('pipelineDef').value.trim();
  let definition;
  try { definition = JSON.parse(raw || '{}'); } catch(e) {
    el.textContent = '✖ Invalid JSON: ' + e.message; return;
  }
  try {
    const r = await fetch(`${BASE}/pipelines`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'X-Api-Key': apiKey() },
      body: JSON.stringify({ definition }),
    });
    const d = await r.json();
    if (r.ok) {
      el.innerHTML = `<span class="text-green-600">✔ Submitted: <code>${esc(d.id)}</code></span>`;
      loadPipelines();
    } else {
      el.innerHTML = `<span class="text-red-600">✖ ${esc(d.error || 'Unknown error')}</span>`;
    }
  } catch(e) {
    el.textContent = '✖ Network error: ' + e.message;
  }
}

fetchHealth();
loadPipelines();
setInterval(loadPipelines, 10_000);
</script>
</body>
</html>
"#;
589
/// Router-level tests that drive the application through `tower::ServiceExt::oneshot`
/// without opening a socket.
#[cfg(test)]
// Panicking on unexpected values is the desired failure mode inside tests.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use axum::{
        body::to_bytes,
        http::{Method, Request, StatusCode},
    };
    use tower::ServiceExt;

    /// State with an empty store and the key `test-key`.
    fn test_state() -> AppState {
        AppState::new("test-key")
    }

    /// Reads a response body to the end and parses it as JSON.
    async fn body_json(body: axum::body::Body) -> Value {
        let bytes = to_bytes(body, usize::MAX).await.unwrap();
        serde_json::from_slice(&bytes).unwrap()
    }

    #[tokio::test]
    async fn health_returns_ok() {
        let app = build_router(test_state());
        let req = Request::builder()
            .uri("/health")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::OK);
        let body = body_json(res.into_body()).await;
        assert_eq!(body["status"], "ok");
    }

    #[tokio::test]
    async fn submit_pipeline_requires_api_key() {
        let app = build_router(test_state());
        let req = Request::builder()
            .method(Method::POST)
            .uri("/pipelines")
            .header("content-type", "application/json")
            .body(Body::from(r#"{"definition":{}}"#))
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn wrong_api_key_is_rejected() {
        let app = build_router(test_state());
        let req = Request::builder()
            .uri("/pipelines")
            .header("x-api-key", "not-the-key")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn submit_and_list_pipeline() {
        let app = build_router(test_state());

        // Submit a pipeline and capture the id it was given.
        let req = Request::builder()
            .method(Method::POST)
            .uri("/pipelines")
            .header("content-type", "application/json")
            .header("x-api-key", "test-key")
            .body(Body::from(r#"{"definition":{"nodes":[]}}"#))
            .unwrap();
        let res = app.clone().oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::CREATED);
        let body = body_json(res.into_body()).await;
        let id = body["id"].as_str().unwrap().to_string();
        assert!(!id.is_empty());

        // The listing must now contain that id.
        let req = Request::builder()
            .uri("/pipelines")
            .header("x-api-key", "test-key")
            .body(Body::empty())
            .unwrap();
        let res = app.clone().oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::OK);
        let list = body_json(res.into_body()).await;
        assert!(list.as_array().unwrap().iter().any(|p| p["id"] == id));
    }

    #[tokio::test]
    async fn delete_pipeline_removes_it() {
        let state = test_state();
        let id = Uuid::new_v4().to_string();
        state
            .pipelines
            .insert(id.clone(), PipelineRun::new(id.clone(), json!({})));

        // Hand the router a clone so the test keeps a handle on the shared store.
        let app = build_router(state.clone());
        let req = Request::builder()
            .method(Method::DELETE)
            .uri(format!("/pipelines/{id}"))
            .header("x-api-key", "test-key")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::NO_CONTENT);

        // The status code alone does not prove removal; check the store itself.
        assert!(
            state.pipelines.get(&id).is_none(),
            "pipeline should no longer be in the store after DELETE"
        );
    }

    #[tokio::test]
    async fn get_unknown_pipeline_returns_404() {
        let app = build_router(test_state());
        let req = Request::builder()
            .uri("/pipelines/does-not-exist")
            .header("x-api-key", "test-key")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn dashboard_returns_html() {
        let app = build_router(test_state());
        let req = Request::builder().uri("/").body(Body::empty()).unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::OK);
        let ct = res.headers()["content-type"].to_str().unwrap();
        assert!(ct.contains("text/html"));
    }
}