1use crate::application::tasks::{CreateTaskCommand, TaskApplicationService};
2use axum::{
3 extract::{Path, State},
4 routing::get,
5 Json, Router,
6};
7use routa_core::workflow::schema::{WorkflowDefinition, WorkflowStep};
8use serde::Deserialize;
9use std::path::PathBuf;
10
11use crate::error::ServerError;
12use crate::state::AppState;
13
/// Directory, relative to the process working directory, where workflow YAML files are stored.
const FLOWS_SUBDIR: &str = "resources/flows";
15
16pub fn router() -> Router<AppState> {
17 Router::new()
18 .route("/", get(list_workflows).post(create_workflow))
19 .route(
20 "/{id}",
21 get(get_workflow)
22 .put(update_workflow)
23 .delete(delete_workflow),
24 )
25 .route("/{id}/trigger", axum::routing::post(trigger_workflow))
26}
27
28fn flows_dir() -> Result<PathBuf, ServerError> {
29 let cwd = std::env::current_dir()
30 .map_err(|e| ServerError::Internal(format!("Failed to get cwd: {}", e)))?;
31 let dir = cwd.join(FLOWS_SUBDIR);
32 if !dir.exists() {
33 std::fs::create_dir_all(&dir)
34 .map_err(|e| ServerError::Internal(format!("Failed to create flows dir: {}", e)))?;
35 }
36 Ok(dir)
37}
38
39fn parse_workflow(id: &str, content: &str) -> serde_json::Value {
40 let parsed: serde_yaml::Value = serde_yaml::from_str(content).unwrap_or_default();
41 let name = parsed.get("name").and_then(|v| v.as_str()).unwrap_or(id);
42 let description = parsed
43 .get("description")
44 .and_then(|v| v.as_str())
45 .unwrap_or("");
46 let version = parsed
47 .get("version")
48 .and_then(|v| v.as_str())
49 .unwrap_or("1.0");
50 let trigger = parsed
51 .get("trigger")
52 .map(|v| serde_json::to_value(v).unwrap_or_default())
53 .unwrap_or(serde_json::Value::Null);
54 let steps = parsed
55 .get("steps")
56 .map(|v| serde_json::to_value(v).unwrap_or_default())
57 .unwrap_or(serde_json::json!([]));
58
59 serde_json::json!({
60 "id": id,
61 "name": name,
62 "description": description,
63 "version": version,
64 "trigger": trigger,
65 "steps": steps,
66 "yamlContent": content,
67 })
68}
69
70fn workflow_file_path(id: &str) -> Result<PathBuf, ServerError> {
71 Ok(flows_dir()?.join(format!("{}.yaml", id)))
72}
73
74fn load_workflow_definition(id: &str) -> Result<WorkflowDefinition, ServerError> {
75 let file_path = workflow_file_path(id)?;
76 if !file_path.exists() {
77 return Err(ServerError::NotFound("Workflow not found".to_string()));
78 }
79
80 let content = std::fs::read_to_string(&file_path)
81 .map_err(|e| ServerError::Internal(format!("Failed to read workflow: {}", e)))?;
82
83 serde_yaml::from_str(&content)
84 .map_err(|e| ServerError::BadRequest(format!("Invalid workflow YAML: {}", e)))
85}
86
87fn require_workspace_id(value: &str) -> Result<String, ServerError> {
88 let normalized = value.trim();
89 if normalized.is_empty() {
90 return Err(ServerError::BadRequest(
91 "workspaceId is required".to_string(),
92 ));
93 }
94 Ok(normalized.to_string())
95}
96
97fn group_steps_by_parallel(steps: &[WorkflowStep]) -> Vec<Vec<WorkflowStep>> {
98 let mut groups: Vec<Vec<WorkflowStep>> = Vec::new();
99 let mut current_group: Vec<WorkflowStep> = Vec::new();
100 let mut current_parallel_group: Option<String> = None;
101
102 for step in steps {
103 if let Some(parallel_group) = &step.parallel_group {
104 if current_parallel_group.as_deref() == Some(parallel_group.as_str()) {
105 current_group.push(step.clone());
106 } else {
107 if !current_group.is_empty() {
108 groups.push(current_group);
109 }
110 current_group = vec![step.clone()];
111 current_parallel_group = Some(parallel_group.clone());
112 }
113 } else {
114 if !current_group.is_empty() {
115 groups.push(current_group);
116 current_group = Vec::new();
117 }
118 groups.push(vec![step.clone()]);
119 current_parallel_group = None;
120 }
121 }
122
123 if !current_group.is_empty() {
124 groups.push(current_group);
125 }
126
127 groups
128}
129
130fn build_step_prompt(
131 step: &WorkflowStep,
132 definition: &WorkflowDefinition,
133 trigger_payload: Option<&str>,
134) -> String {
135 let mut prompt = step.input.clone().unwrap_or_default();
136 prompt = prompt.replace("${trigger.payload}", trigger_payload.unwrap_or_default());
137
138 for (key, value) in &definition.variables {
139 prompt = prompt.replace(&format!("${{variables.{}}}", key), value);
140 prompt = prompt.replace(&format!("${{{}}}", key), value);
141 }
142
143 if prompt.trim().is_empty() {
144 format!("Execute step: {}", step.name)
145 } else {
146 prompt
147 }
148}
149
150async fn list_workflows() -> Result<Json<serde_json::Value>, ServerError> {
152 let dir = flows_dir()?;
153 let mut workflows = Vec::new();
154
155 let entries = std::fs::read_dir(&dir)
156 .map_err(|e| ServerError::Internal(format!("Failed to read flows dir: {}", e)))?;
157
158 for entry in entries.flatten() {
159 let path = entry.path();
160 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
161 if ext != "yaml" && ext != "yml" {
162 continue;
163 }
164 let id = path
165 .file_stem()
166 .and_then(|s| s.to_str())
167 .unwrap_or("")
168 .to_string();
169 if id.is_empty() {
170 continue;
171 }
172 match std::fs::read_to_string(&path) {
173 Ok(content) => workflows.push(parse_workflow(&id, &content)),
174 Err(_) => continue,
175 }
176 }
177
178 Ok(Json(serde_json::json!({ "workflows": workflows })))
179}
180
/// JSON request body deserialized by `create_workflow`.
#[derive(Debug, Deserialize)]
struct CreateWorkflowInput {
    /// Identifier of the new workflow; also used as the file name (`<id>.yaml`).
    id: String,
    /// Raw workflow YAML, read from the `yamlContent` JSON key.
    #[serde(rename = "yamlContent")]
    yaml_content: String,
}
187
188async fn create_workflow(
190 Json(body): Json<CreateWorkflowInput>,
191) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
192 let id_re = regex::Regex::new(r"^[a-zA-Z0-9_-]+$").unwrap();
194 if !id_re.is_match(&body.id) {
195 return Err(ServerError::BadRequest(
196 "ID must contain only letters, numbers, hyphens, and underscores".to_string(),
197 ));
198 }
199
200 let parsed: serde_yaml::Value = serde_yaml::from_str(&body.yaml_content)
202 .map_err(|e| ServerError::BadRequest(format!("Invalid YAML: {}", e)))?;
203
204 let has_name = parsed.get("name").and_then(|v| v.as_str()).is_some();
205 let has_steps = parsed
206 .get("steps")
207 .and_then(|v| v.as_sequence())
208 .map(|s| !s.is_empty())
209 .unwrap_or(false);
210
211 if !has_name || !has_steps {
212 return Err(ServerError::BadRequest(
213 "Workflow YAML must have name and at least one step".to_string(),
214 ));
215 }
216
217 let dir = flows_dir()?;
218 let file_path = dir.join(format!("{}.yaml", body.id));
219
220 if file_path.exists() {
221 return Err(ServerError::Conflict(format!(
222 "Workflow with id \"{}\" already exists",
223 body.id
224 )));
225 }
226
227 std::fs::write(&file_path, &body.yaml_content)
228 .map_err(|e| ServerError::Internal(format!("Failed to write workflow: {}", e)))?;
229
230 let workflow = parse_workflow(&body.id, &body.yaml_content);
231 Ok((
232 axum::http::StatusCode::CREATED,
233 Json(serde_json::json!({ "workflow": workflow })),
234 ))
235}
236
237async fn get_workflow(Path(id): Path<String>) -> Result<Json<serde_json::Value>, ServerError> {
239 let file_path = workflow_file_path(&id)?;
240 let content = std::fs::read_to_string(&file_path)
241 .map_err(|e| ServerError::Internal(format!("Failed to read workflow: {}", e)))?;
242
243 Ok(Json(
244 serde_json::json!({ "workflow": parse_workflow(&id, &content) }),
245 ))
246}
247
/// JSON request body deserialized by `update_workflow`.
#[derive(Debug, Deserialize)]
struct UpdateWorkflowInput {
    /// Replacement workflow YAML, read from the `yamlContent` JSON key.
    #[serde(rename = "yamlContent")]
    yaml_content: String,
}
253
254async fn update_workflow(
256 Path(id): Path<String>,
257 Json(body): Json<UpdateWorkflowInput>,
258) -> Result<Json<serde_json::Value>, ServerError> {
259 let dir = flows_dir()?;
260 let file_path = dir.join(format!("{}.yaml", id));
261
262 if !file_path.exists() {
263 return Err(ServerError::NotFound("Workflow not found".to_string()));
264 }
265
266 let parsed: serde_yaml::Value = serde_yaml::from_str(&body.yaml_content)
268 .map_err(|e| ServerError::BadRequest(format!("Invalid YAML: {}", e)))?;
269
270 let has_name = parsed.get("name").and_then(|v| v.as_str()).is_some();
271 let has_steps = parsed
272 .get("steps")
273 .and_then(|v| v.as_sequence())
274 .map(|s| !s.is_empty())
275 .unwrap_or(false);
276
277 if !has_name || !has_steps {
278 return Err(ServerError::BadRequest(
279 "Workflow YAML must have name and at least one step".to_string(),
280 ));
281 }
282
283 std::fs::write(&file_path, &body.yaml_content)
284 .map_err(|e| ServerError::Internal(format!("Failed to write workflow: {}", e)))?;
285
286 Ok(Json(
287 serde_json::json!({ "workflow": parse_workflow(&id, &body.yaml_content) }),
288 ))
289}
290
291async fn delete_workflow(Path(id): Path<String>) -> Result<Json<serde_json::Value>, ServerError> {
293 let dir = flows_dir()?;
294 let file_path = dir.join(format!("{}.yaml", id));
295
296 if !file_path.exists() {
297 return Err(ServerError::NotFound("Workflow not found".to_string()));
298 }
299
300 std::fs::remove_file(&file_path)
301 .map_err(|e| ServerError::Internal(format!("Failed to delete workflow: {}", e)))?;
302
303 Ok(Json(serde_json::json!({ "success": true })))
304}
305
/// JSON request body deserialized by `trigger_workflow`; keys are camelCase
/// (`workspaceId`, `triggerPayload`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TriggerWorkflowInput {
    /// Workspace the created tasks are attached to; must not be blank.
    workspace_id: String,
    /// Optional text substituted for `${trigger.payload}` in step prompts.
    trigger_payload: Option<String>,
}
312
313async fn trigger_workflow(
315 State(state): State<AppState>,
316 Path(id): Path<String>,
317 Json(body): Json<TriggerWorkflowInput>,
318) -> Result<(axum::http::StatusCode, Json<serde_json::Value>), ServerError> {
319 let workspace_id = require_workspace_id(&body.workspace_id)?;
320 let Some(_) = state.workspace_store.get(&workspace_id).await? else {
321 return Err(ServerError::NotFound("Workspace not found".to_string()));
322 };
323
324 let definition = load_workflow_definition(&id)?;
325 let workflow_run_id = uuid::Uuid::new_v4().to_string();
326 let task_service = TaskApplicationService::new(state.clone());
327 let mut task_ids = Vec::new();
328
329 for group in group_steps_by_parallel(&definition.steps) {
330 let dependencies = if task_ids.is_empty() {
331 None
332 } else {
333 Some(task_ids.clone())
334 };
335
336 for step in group {
337 let plan = task_service
338 .create_task(CreateTaskCommand {
339 title: format!("[{}] {}", definition.name, step.name),
340 objective: build_step_prompt(
341 &step,
342 &definition,
343 body.trigger_payload.as_deref(),
344 ),
345 workspace_id: Some(workspace_id.clone()),
346 session_id: None,
347 scope: None,
348 acceptance_criteria: None,
349 verification_commands: None,
350 test_cases: None,
351 dependencies: dependencies.clone(),
352 parallel_group: step.parallel_group.clone(),
353 board_id: None,
354 column_id: None,
355 position: None,
356 priority: None,
357 labels: Some(vec![
358 "workflow".to_string(),
359 id.clone(),
360 workflow_run_id.clone(),
361 ]),
362 assignee: None,
363 assigned_provider: None,
364 assigned_role: None,
365 assigned_specialist_id: Some(step.specialist.clone()),
366 assigned_specialist_name: Some(step.specialist.clone()),
367 create_github_issue: Some(false),
368 repo_path: None,
369 codebase_ids: None,
370 github_id: None,
371 github_number: None,
372 github_url: None,
373 github_repo: None,
374 github_state: None,
375 })
376 .await?;
377
378 state.task_store.save(&plan.task).await?;
379 task_ids.push(plan.task.id);
380 }
381 }
382
383 Ok((
384 axum::http::StatusCode::CREATED,
385 Json(serde_json::json!({
386 "workflowRunId": workflow_run_id,
387 "taskIds": task_ids,
388 })),
389 ))
390}