use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
};
use mockforge_pipelines::{
pipeline::{Pipeline, PipelineDefinition, PipelineExecution, PipelineExecutor},
PipelineEvent,
};
use serde::Deserialize;
use std::sync::Arc;
use tracing::{error, info};
use uuid::Uuid;
#[derive(Clone)]
pub struct PipelineState {
pub executor: Arc<PipelineExecutor>,
pub storage: Arc<dashmap::DashMap<Uuid, Pipeline>>,
pub executions: Arc<dashmap::DashMap<Uuid, PipelineExecution>>,
}
impl PipelineState {
pub fn new() -> Self {
Self {
executor: Arc::new(PipelineExecutor::new()),
storage: Arc::new(dashmap::DashMap::new()),
executions: Arc::new(dashmap::DashMap::new()),
}
}
}
impl Default for PipelineState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Deserialize)]
pub struct CreatePipelineRequest {
pub name: String,
pub definition: PipelineDefinition,
pub workspace_id: Option<Uuid>,
pub org_id: Option<Uuid>,
}
#[derive(Debug, Deserialize)]
pub struct UpdatePipelineRequest {
pub name: Option<String>,
pub definition: Option<PipelineDefinition>,
pub enabled: Option<bool>,
}
#[derive(Debug, Deserialize)]
pub struct ListPipelinesQuery {
pub workspace_id: Option<Uuid>,
pub org_id: Option<Uuid>,
pub enabled: Option<bool>,
}
#[derive(Debug, Deserialize)]
pub struct ListExecutionsQuery {
pub pipeline_id: Option<Uuid>,
pub status: Option<String>,
pub limit: Option<u32>,
pub offset: Option<u32>,
}
pub async fn create_pipeline(
State(state): State<PipelineState>,
Json(request): Json<CreatePipelineRequest>,
) -> Result<Json<Pipeline>, StatusCode> {
info!("Creating pipeline: {}", request.name);
let pipeline =
Pipeline::new(request.name, request.definition, request.workspace_id, request.org_id);
state.storage.insert(pipeline.id, pipeline.clone());
info!("Pipeline created: {}", pipeline.id);
Ok(Json(pipeline))
}
pub async fn list_pipelines(
State(state): State<PipelineState>,
Query(params): Query<ListPipelinesQuery>,
) -> Result<Json<Vec<Pipeline>>, StatusCode> {
let mut pipelines: Vec<Pipeline> =
state.storage.iter().map(|entry| entry.value().clone()).collect();
if let Some(workspace_id) = params.workspace_id {
pipelines.retain(|p| p.workspace_id == Some(workspace_id));
}
if let Some(org_id) = params.org_id {
pipelines.retain(|p| p.org_id == Some(org_id));
}
if let Some(enabled) = params.enabled {
pipelines.retain(|p| p.definition.enabled == enabled);
}
pipelines.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(Json(pipelines))
}
pub async fn get_pipeline(
State(state): State<PipelineState>,
Path(id): Path<String>,
) -> Result<Json<Pipeline>, StatusCode> {
let pipeline_id = Uuid::parse_str(&id).map_err(|e| {
tracing::warn!("Invalid UUID '{}': {}", id, e);
StatusCode::BAD_REQUEST
})?;
let pipeline = state
.storage
.get(&pipeline_id)
.map(|entry| entry.value().clone())
.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(pipeline))
}
pub async fn update_pipeline(
State(state): State<PipelineState>,
Path(id): Path<String>,
Json(request): Json<UpdatePipelineRequest>,
) -> Result<Json<Pipeline>, StatusCode> {
let pipeline_id = Uuid::parse_str(&id).map_err(|e| {
tracing::warn!("Invalid UUID '{}': {}", id, e);
StatusCode::BAD_REQUEST
})?;
let mut pipeline = state.storage.get_mut(&pipeline_id).ok_or(StatusCode::NOT_FOUND)?;
if let Some(name) = request.name {
pipeline.name = name;
}
if let Some(definition) = request.definition {
pipeline.definition = definition;
}
if let Some(enabled) = request.enabled {
pipeline.definition.enabled = enabled;
}
pipeline.updated_at = chrono::Utc::now();
info!("Pipeline updated: {}", pipeline_id);
Ok(Json(pipeline.clone()))
}
pub async fn delete_pipeline(
State(state): State<PipelineState>,
Path(id): Path<String>,
) -> Result<StatusCode, StatusCode> {
let pipeline_id = Uuid::parse_str(&id).map_err(|e| {
tracing::warn!("Invalid UUID '{}': {}", id, e);
StatusCode::BAD_REQUEST
})?;
if state.storage.remove(&pipeline_id).is_some() {
info!("Pipeline deleted: {}", pipeline_id);
Ok(StatusCode::NO_CONTENT)
} else {
Err(StatusCode::NOT_FOUND)
}
}
pub async fn trigger_pipeline(
State(state): State<PipelineState>,
Path(id): Path<String>,
Json(event): Json<PipelineEvent>,
) -> Result<Json<PipelineExecution>, StatusCode> {
let pipeline_id = Uuid::parse_str(&id).map_err(|e| {
tracing::warn!("Invalid UUID '{}': {}", id, e);
StatusCode::BAD_REQUEST
})?;
let pipeline = state
.storage
.get(&pipeline_id)
.map(|entry| entry.value().clone())
.ok_or(StatusCode::NOT_FOUND)?;
if !pipeline.definition.enabled {
return Err(StatusCode::BAD_REQUEST);
}
info!("Manually triggering pipeline: {}", pipeline_id);
match state.executor.execute(&pipeline, event.clone()).await {
Ok(execution) => {
state.executions.insert(execution.id, execution.clone());
Ok(Json(execution))
}
Err(e) => {
error!("Pipeline execution failed: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
pub async fn list_executions(
State(state): State<PipelineState>,
Query(params): Query<ListExecutionsQuery>,
) -> Result<Json<Vec<PipelineExecution>>, StatusCode> {
let mut executions: Vec<PipelineExecution> =
state.executions.iter().map(|entry| entry.value().clone()).collect();
if let Some(pipeline_id) = params.pipeline_id {
executions.retain(|e| e.pipeline_id == pipeline_id);
}
if let Some(status_str) = params.status {
executions
.retain(|e| format!("{:?}", e.status).to_lowercase() == status_str.to_lowercase());
}
executions.sort_by(|a, b| b.started_at.cmp(&a.started_at));
let offset = params.offset.unwrap_or(0) as usize;
let limit = params.limit.unwrap_or(100) as usize;
let end = (offset + limit).min(executions.len());
let paginated = executions.into_iter().skip(offset).take(end - offset).collect();
Ok(Json(paginated))
}
pub async fn get_execution(
State(state): State<PipelineState>,
Path(id): Path<String>,
) -> Result<Json<PipelineExecution>, StatusCode> {
let execution_id = Uuid::parse_str(&id).map_err(|e| {
tracing::warn!("Invalid UUID '{}': {}", id, e);
StatusCode::BAD_REQUEST
})?;
let execution = state
.executions
.get(&execution_id)
.map(|entry| entry.value().clone())
.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(execution))
}
pub async fn get_pipeline_stats(
State(state): State<PipelineState>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let pipeline_id = Uuid::parse_str(&id).map_err(|e| {
tracing::warn!("Invalid UUID '{}': {}", id, e);
StatusCode::BAD_REQUEST
})?;
let executions: Vec<PipelineExecution> = state
.executions
.iter()
.filter(|entry| entry.value().pipeline_id == pipeline_id)
.map(|entry| entry.value().clone())
.collect();
let total = executions.len();
let completed = executions
.iter()
.filter(|e| {
matches!(e.status, mockforge_pipelines::pipeline::PipelineExecutionStatus::Completed)
})
.count();
let failed = executions
.iter()
.filter(|e| {
matches!(e.status, mockforge_pipelines::pipeline::PipelineExecutionStatus::Failed)
})
.count();
let running = executions
.iter()
.filter(|e| {
matches!(e.status, mockforge_pipelines::pipeline::PipelineExecutionStatus::Running)
})
.count();
Ok(Json(serde_json::json!({
"pipeline_id": pipeline_id,
"total_executions": total,
"completed": completed,
"failed": failed,
"running": running,
"success_rate": if total > 0 { (completed as f64 / total as f64) * 100.0 } else { 0.0 },
})))
}
pub fn pipeline_router(state: PipelineState) -> axum::Router {
use axum::routing::{delete, get, patch, post};
axum::Router::new()
.route("/api/v1/pipelines", post(create_pipeline))
.route("/api/v1/pipelines", get(list_pipelines))
.route("/api/v1/pipelines/{id}", get(get_pipeline))
.route("/api/v1/pipelines/{id}", patch(update_pipeline))
.route("/api/v1/pipelines/{id}", delete(delete_pipeline))
.route("/api/v1/pipelines/{id}/trigger", post(trigger_pipeline))
.route("/api/v1/pipelines/{id}/stats", get(get_pipeline_stats))
.route("/api/v1/pipelines/executions", get(list_executions))
.route("/api/v1/pipelines/executions/{id}", get(get_execution))
.with_state(state)
}