Skip to main content

routa_server/api/
workflows.rs

1use crate::application::tasks::{CreateTaskCommand, TaskApplicationService};
2use axum::{
3    extract::{Path, State},
4    routing::get,
5    Json, Router,
6};
7use routa_core::workflow::schema::{WorkflowDefinition, WorkflowStep};
8use serde::Deserialize;
9use std::path::PathBuf;
10
11use crate::error::ServerError;
12use crate::state::AppState;
13
14const FLOWS_SUBDIR: &str = "resources/flows";
15
16pub fn router() -> Router<AppState> {
17    Router::new()
18        .route("/", get(list_workflows).post(create_workflow))
19        .route(
20            "/{id}",
21            get(get_workflow)
22                .put(update_workflow)
23                .delete(delete_workflow),
24        )
25        .route("/{id}/trigger", axum::routing::post(trigger_workflow))
26}
27
28fn flows_dir() -> Result<PathBuf, ServerError> {
29    let cwd = std::env::current_dir()
30        .map_err(|e| ServerError::Internal(format!("Failed to get cwd: {e}")))?;
31    let dir = cwd.join(FLOWS_SUBDIR);
32    if !dir.exists() {
33        std::fs::create_dir_all(&dir)
34            .map_err(|e| ServerError::Internal(format!("Failed to create flows dir: {e}")))?;
35    }
36    Ok(dir)
37}
38
39fn parse_workflow(id: &str, content: &str) -> serde_json::Value {
40    let parsed: serde_yaml::Value = serde_yaml::from_str(content).unwrap_or_default();
41    let name = parsed.get("name").and_then(|v| v.as_str()).unwrap_or(id);
42    let description = parsed
43        .get("description")
44        .and_then(|v| v.as_str())
45        .unwrap_or("");
46    let version = parsed
47        .get("version")
48        .and_then(|v| v.as_str())
49        .unwrap_or("1.0");
50    let trigger = parsed
51        .get("trigger")
52        .map(|v| serde_json::to_value(v).unwrap_or_default())
53        .unwrap_or(serde_json::Value::Null);
54    let steps = parsed
55        .get("steps")
56        .map(|v| serde_json::to_value(v).unwrap_or_default())
57        .unwrap_or(serde_json::json!([]));
58
59    serde_json::json!({
60        "id": id,
61        "name": name,
62        "description": description,
63        "version": version,
64        "trigger": trigger,
65        "steps": steps,
66        "yamlContent": content,
67    })
68}
69
70fn workflow_file_path(id: &str) -> Result<PathBuf, ServerError> {
71    Ok(flows_dir()?.join(format!("{id}.yaml")))
72}
73
74fn load_workflow_definition(id: &str) -> Result<WorkflowDefinition, ServerError> {
75    let file_path = workflow_file_path(id)?;
76    if !file_path.exists() {
77        return Err(ServerError::NotFound("Workflow not found".to_string()));
78    }
79
80    let content = std::fs::read_to_string(&file_path)
81        .map_err(|e| ServerError::Internal(format!("Failed to read workflow: {e}")))?;
82
83    serde_yaml::from_str(&content)
84        .map_err(|e| ServerError::BadRequest(format!("Invalid workflow YAML: {e}")))
85}
86
87fn require_workspace_id(value: &str) -> Result<String, ServerError> {
88    let normalized = value.trim();
89    if normalized.is_empty() {
90        return Err(ServerError::BadRequest(
91            "workspaceId is required".to_string(),
92        ));
93    }
94    Ok(normalized.to_string())
95}
96
97fn group_steps_by_parallel(steps: &[WorkflowStep]) -> Vec<Vec<WorkflowStep>> {
98    let mut groups: Vec<Vec<WorkflowStep>> = Vec::new();
99    let mut current_group: Vec<WorkflowStep> = Vec::new();
100    let mut current_parallel_group: Option<String> = None;
101
102    for step in steps {
103        if let Some(parallel_group) = &step.parallel_group {
104            if current_parallel_group.as_deref() == Some(parallel_group.as_str()) {
105                current_group.push(step.clone());
106            } else {
107                if !current_group.is_empty() {
108                    groups.push(current_group);
109                }
110                current_group = vec![step.clone()];
111                current_parallel_group = Some(parallel_group.clone());
112            }
113        } else {
114            if !current_group.is_empty() {
115                groups.push(current_group);
116                current_group = Vec::new();
117            }
118            groups.push(vec![step.clone()]);
119            current_parallel_group = None;
120        }
121    }
122
123    if !current_group.is_empty() {
124        groups.push(current_group);
125    }
126
127    groups
128}
129
130fn build_step_prompt(
131    step: &WorkflowStep,
132    definition: &WorkflowDefinition,
133    trigger_payload: Option<&str>,
134) -> String {
135    let mut prompt = step.input.clone().unwrap_or_default();
136    prompt = prompt.replace("${trigger.payload}", trigger_payload.unwrap_or_default());
137
138    for (key, value) in &definition.variables {
139        prompt = prompt.replace(&format!("${{variables.{key}}}"), value);
140        prompt = prompt.replace(&format!("${{{key}}}"), value);
141    }
142
143    if prompt.trim().is_empty() {
144        format!("Execute step: {}", step.name)
145    } else {
146        prompt
147    }
148}
149
150/// GET /api/workflows — List all workflow YAML definitions.
151async fn list_workflows() -> Result<Json<serde_json::Value>, ServerError> {
152    let dir = flows_dir()?;
153    let mut workflows = Vec::new();
154
155    let entries = std::fs::read_dir(&dir)
156        .map_err(|e| ServerError::Internal(format!("Failed to read flows dir: {e}")))?;
157
158    for entry in entries.flatten() {
159        let path = entry.path();
160        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
161        if ext != "yaml" && ext != "yml" {
162            continue;
163        }
164        let id = path
165            .file_stem()
166            .and_then(|s| s.to_str())
167            .unwrap_or("")
168            .to_string();
169        if id.is_empty() {
170            continue;
171        }
172        match std::fs::read_to_string(&path) {
173            Ok(content) => workflows.push(parse_workflow(&id, &content)),
174            Err(_) => continue,
175        }
176    }
177
178    Ok(Json(serde_json::json!({ "workflows": workflows })))
179}
180
181#[derive(Debug, Deserialize)]
182struct CreateWorkflowInput {
183    id: String,
184    #[serde(rename = "yamlContent")]
185    yaml_content: String,
186}
187
188/// POST /api/workflows — Create a new workflow YAML file.
189async fn create_workflow(
190    Json(body): Json<CreateWorkflowInput>,
191) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
192    // Validate ID format
193    let id_re = regex::Regex::new(r"^[a-zA-Z0-9_-]+$").unwrap();
194    if !id_re.is_match(&body.id) {
195        return Err(ServerError::BadRequest(
196            "ID must contain only letters, numbers, hyphens, and underscores".to_string(),
197        ));
198    }
199
200    // Validate YAML
201    let parsed: serde_yaml::Value = serde_yaml::from_str(&body.yaml_content)
202        .map_err(|e| ServerError::BadRequest(format!("Invalid YAML: {e}")))?;
203
204    let has_name = parsed.get("name").and_then(|v| v.as_str()).is_some();
205    let has_steps = parsed
206        .get("steps")
207        .and_then(|v| v.as_sequence())
208        .map(|s| !s.is_empty())
209        .unwrap_or(false);
210
211    if !has_name || !has_steps {
212        return Err(ServerError::BadRequest(
213            "Workflow YAML must have name and at least one step".to_string(),
214        ));
215    }
216
217    let dir = flows_dir()?;
218    let file_path = dir.join(format!("{}.yaml", body.id));
219
220    if file_path.exists() {
221        return Err(ServerError::Conflict(format!(
222            "Workflow with id \"{}\" already exists",
223            body.id
224        )));
225    }
226
227    std::fs::write(&file_path, &body.yaml_content)
228        .map_err(|e| ServerError::Internal(format!("Failed to write workflow: {e}")))?;
229
230    let workflow = parse_workflow(&body.id, &body.yaml_content);
231    Ok((
232        axum::http::StatusCode::CREATED,
233        Json(serde_json::json!({ "workflow": workflow })),
234    ))
235}
236
237/// GET /api/workflows/{id} — Get a specific workflow.
238async fn get_workflow(Path(id): Path<String>) -> Result<Json<serde_json::Value>, ServerError> {
239    let file_path = workflow_file_path(&id)?;
240    let content = std::fs::read_to_string(&file_path)
241        .map_err(|e| ServerError::Internal(format!("Failed to read workflow: {e}")))?;
242
243    Ok(Json(
244        serde_json::json!({ "workflow": parse_workflow(&id, &content) }),
245    ))
246}
247
248#[derive(Debug, Deserialize)]
249struct UpdateWorkflowInput {
250    #[serde(rename = "yamlContent")]
251    yaml_content: String,
252}
253
254/// PUT /api/workflows/{id} — Update a workflow YAML file.
255async fn update_workflow(
256    Path(id): Path<String>,
257    Json(body): Json<UpdateWorkflowInput>,
258) -> Result<Json<serde_json::Value>, ServerError> {
259    let dir = flows_dir()?;
260    let file_path = dir.join(format!("{id}.yaml"));
261
262    if !file_path.exists() {
263        return Err(ServerError::NotFound("Workflow not found".to_string()));
264    }
265
266    // Validate YAML
267    let parsed: serde_yaml::Value = serde_yaml::from_str(&body.yaml_content)
268        .map_err(|e| ServerError::BadRequest(format!("Invalid YAML: {e}")))?;
269
270    let has_name = parsed.get("name").and_then(|v| v.as_str()).is_some();
271    let has_steps = parsed
272        .get("steps")
273        .and_then(|v| v.as_sequence())
274        .map(|s| !s.is_empty())
275        .unwrap_or(false);
276
277    if !has_name || !has_steps {
278        return Err(ServerError::BadRequest(
279            "Workflow YAML must have name and at least one step".to_string(),
280        ));
281    }
282
283    std::fs::write(&file_path, &body.yaml_content)
284        .map_err(|e| ServerError::Internal(format!("Failed to write workflow: {e}")))?;
285
286    Ok(Json(
287        serde_json::json!({ "workflow": parse_workflow(&id, &body.yaml_content) }),
288    ))
289}
290
291/// DELETE /api/workflows/{id} — Delete a workflow YAML file.
292async fn delete_workflow(Path(id): Path<String>) -> Result<Json<serde_json::Value>, ServerError> {
293    let dir = flows_dir()?;
294    let file_path = dir.join(format!("{id}.yaml"));
295
296    if !file_path.exists() {
297        return Err(ServerError::NotFound("Workflow not found".to_string()));
298    }
299
300    std::fs::remove_file(&file_path)
301        .map_err(|e| ServerError::Internal(format!("Failed to delete workflow: {e}")))?;
302
303    Ok(Json(serde_json::json!({ "success": true })))
304}
305
306#[derive(Debug, Deserialize)]
307#[serde(rename_all = "camelCase")]
308struct TriggerWorkflowInput {
309    workspace_id: String,
310    trigger_payload: Option<String>,
311}
312
313/// POST /api/workflows/{id}/trigger — start a workflow run inside a workspace.
314async fn trigger_workflow(
315    State(state): State<AppState>,
316    Path(id): Path<String>,
317    Json(body): Json<TriggerWorkflowInput>,
318) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
319    let workspace_id = require_workspace_id(&body.workspace_id)?;
320    let Some(_) = state.workspace_store.get(&workspace_id).await? else {
321        return Err(ServerError::NotFound("Workspace not found".to_string()));
322    };
323
324    let definition = load_workflow_definition(&id)?;
325    let workflow_run_id = uuid::Uuid::new_v4().to_string();
326    let task_service = TaskApplicationService::new(state.clone());
327    let mut task_ids = Vec::new();
328
329    for group in group_steps_by_parallel(&definition.steps) {
330        let dependencies = if task_ids.is_empty() {
331            None
332        } else {
333            Some(task_ids.clone())
334        };
335
336        for step in group {
337            let plan = task_service
338                .create_task(CreateTaskCommand {
339                    title: format!("[{}] {}", definition.name, step.name),
340                    objective: build_step_prompt(
341                        &step,
342                        &definition,
343                        body.trigger_payload.as_deref(),
344                    ),
345                    workspace_id: Some(workspace_id.clone()),
346                    session_id: None,
347                    scope: None,
348                    acceptance_criteria: None,
349                    verification_commands: None,
350                    test_cases: None,
351                    dependencies: dependencies.clone(),
352                    parallel_group: step.parallel_group.clone(),
353                    board_id: None,
354                    column_id: None,
355                    position: None,
356                    priority: None,
357                    labels: Some(vec![
358                        "workflow".to_string(),
359                        id.clone(),
360                        workflow_run_id.clone(),
361                    ]),
362                    assignee: None,
363                    assigned_provider: None,
364                    assigned_role: None,
365                    assigned_specialist_id: Some(step.specialist.clone()),
366                    assigned_specialist_name: Some(step.specialist.clone()),
367                    create_github_issue: Some(false),
368                    repo_path: None,
369                    codebase_ids: None,
370                    github_id: None,
371                    github_number: None,
372                    github_url: None,
373                    github_repo: None,
374                    github_state: None,
375                })
376                .await?;
377
378            state.task_store.save(&plan.task).await?;
379            task_ids.push(plan.task.id);
380        }
381    }
382
383    Ok((
384        axum::http::StatusCode::CREATED,
385        Json(serde_json::json!({
386            "workflowRunId": workflow_run_id,
387            "taskIds": task_ids,
388        })),
389    ))
390}