Skip to main content

routa_server/api/
tasks_automation.rs

1use chrono::Utc;
2use reqwest::header::{ACCEPT, CONTENT_TYPE};
3use routa_core::models::kanban::{KanbanAutomationStep, KanbanBoard, KanbanTransport};
4use serde_json::{json, Value};
5
6use crate::error::ServerError;
7use crate::models::task::{Task, TaskLaneSession, TaskLaneSessionStatus};
8use crate::state::AppState;
9
10pub async fn resolve_codebase(
11    state: &AppState,
12    workspace_id: &str,
13    repo_path: Option<&str>,
14) -> Result<Option<crate::models::codebase::Codebase>, ServerError> {
15    if let Some(path) = repo_path {
16        state
17            .codebase_store
18            .find_by_repo_path(workspace_id, path)
19            .await
20    } else {
21        state.codebase_store.get_default(workspace_id).await
22    }
23}
24
25pub async fn auto_create_worktree(
26    state: &AppState,
27    task: &crate::models::task::Task,
28    codebase: &crate::models::codebase::Codebase,
29) -> Result<String, String> {
30    let slugified = task
31        .title
32        .to_lowercase()
33        .chars()
34        .map(|c| if c.is_alphanumeric() { c } else { '-' })
35        .collect::<String>()
36        .split('-')
37        .filter(|s| !s.is_empty())
38        .collect::<Vec<_>>()
39        .join("-");
40    let short_id = &task.id[..task.id.len().min(8)];
41    let slug = format!("{}-{}", short_id, &slugified[..slugified.len().min(40)]);
42    let branch = format!("issue/{}", slug);
43
44    let workspace = state
45        .workspace_store
46        .get(&task.workspace_id)
47        .await
48        .ok()
49        .flatten();
50    let worktree_root = workspace
51        .as_ref()
52        .and_then(|ws| ws.metadata.get("worktreeRoot"))
53        .filter(|s| !s.trim().is_empty())
54        .map(std::path::PathBuf::from)
55        .unwrap_or_else(|| crate::git::get_default_workspace_worktree_root(&task.workspace_id));
56
57    let codebase_label = codebase
58        .label
59        .as_ref()
60        .map(|l| crate::git::branch_to_safe_dir_name(l))
61        .unwrap_or_else(|| crate::git::branch_to_safe_dir_name(&codebase.id));
62
63    let worktree_path = worktree_root
64        .join(&codebase_label)
65        .join(crate::git::branch_to_safe_dir_name(&slug));
66
67    if let Some(parent) = worktree_path.parent() {
68        std::fs::create_dir_all(parent)
69            .map_err(|e| format!("Failed to create worktree parent dir: {}", e))?;
70    }
71
72    let worktree_path_str = worktree_path.to_string_lossy().to_string();
73    let base_branch = codebase
74        .branch
75        .clone()
76        .unwrap_or_else(|| "main".to_string());
77
78    let worktree = crate::models::worktree::Worktree::new(
79        uuid::Uuid::new_v4().to_string(),
80        codebase.id.clone(),
81        task.workspace_id.clone(),
82        worktree_path_str.clone(),
83        branch.clone(),
84        base_branch.clone(),
85        Some(slug),
86    );
87    state
88        .worktree_store
89        .save(&worktree)
90        .await
91        .map_err(|e| format!("Failed to save worktree: {}", e))?;
92
93    let _ = crate::git::worktree_prune(&codebase.repo_path);
94    crate::git::worktree_add(
95        &codebase.repo_path,
96        &worktree_path_str,
97        &branch,
98        &base_branch,
99        false,
100    )
101    .map_err(|e| format!("git worktree add failed: {}", e))?;
102
103    Ok(worktree.id)
104}
105
106pub async fn trigger_assigned_task_agent(
107    state: &AppState,
108    task: &mut Task,
109    cwd: Option<&str>,
110    branch: Option<&str>,
111) -> Result<(), String> {
112    let board = load_task_board(state, task).await?;
113    let step = resolve_task_automation_step(board.as_ref(), task);
114    if is_a2a_step(step.as_ref()) {
115        return trigger_assigned_task_a2a_agent(state, task, board.as_ref(), step.as_ref()).await;
116    }
117
118    trigger_assigned_task_acp_agent(state, task, board.as_ref(), step.as_ref(), cwd, branch).await
119}
120
121fn build_task_prompt(
122    task: &Task,
123    board_id: Option<&str>,
124    next_column_id: Option<&str>,
125    available_columns: &str,
126) -> String {
127    let labels = if task.labels.is_empty() {
128        "Labels: none".to_string()
129    } else {
130        format!("Labels: {}", task.labels.join(", "))
131    };
132    let lane_id = task.column_id.as_deref().unwrap_or("backlog");
133    let lane_guidance = match lane_id {
134        "dev" => vec![
135            "You are in the `dev` lane. This lane may implement the requested change, but you must keep work scoped to the current card.".to_string(),
136            "Use `routa-coordination_update_card` to record concrete progress on this card before or after meaningful implementation steps.".to_string(),
137            "When implementation for this lane is complete, use `routa-coordination_move_card` to advance the same card.".to_string(),
138        ],
139        "todo" => vec![
140            "You are in the `todo` lane. This lane does not perform full implementation work.".to_string(),
141            "Only clarify the card, update its progress or status, and move the same card forward when the lane is complete.".to_string(),
142            "Do not edit files, do not inspect the whole repository, and do not run browser tests or environment diagnostics in this lane.".to_string(),
143        ],
144        _ => vec![
145            format!("You are in the `{lane_id}` lane. Keep work scoped to the current card and this lane only."),
146        ],
147    };
148    let mut sections = vec![
149        format!("You are assigned to Kanban task: {}", task.title),
150        String::new(),
151        "## Context".to_string(),
152        String::new(),
153        "**IMPORTANT**: You are working in Kanban lane automation for exactly one existing card.".to_string(),
154        "Only operate on the current card. Do not create a new task, do not switch to a different card, and do not broaden scope.".to_string(),
155        "Use the exact MCP tool names exposed by the provider. In OpenCode, prefer `routa-coordination_update_card` and `routa-coordination_move_card`.".to_string(),
156        "Do NOT use `gh issue create`, browser automation, Playwright, repo-wide debugging, API exploration, or unrelated codebase research unless the card objective explicitly requires it.".to_string(),
157        String::new(),
158        "## Task Details".to_string(),
159        String::new(),
160        format!("**Card ID:** {}", task.id),
161        format!(
162            "**Priority:** {}",
163            task.priority.as_ref().map(|value| value.as_str()).unwrap_or("medium")
164        ),
165        board_id
166            .map(|value| format!("**Board ID:** {}", value))
167            .unwrap_or_else(|| "**Board ID:** unavailable".to_string()),
168        format!("**Current Lane:** {}", lane_id),
169        next_column_id
170            .map(|value| format!("**Next Column ID:** {}", value))
171            .unwrap_or_else(|| "**Next Column ID:** unavailable".to_string()),
172        labels,
173        task.github_url
174            .as_ref()
175            .map(|url| format!("**GitHub Issue:** {}", url))
176            .unwrap_or_else(|| "**GitHub Issue:** local-only".to_string()),
177        String::new(),
178        "## Objective".to_string(),
179        String::new(),
180        task.objective.clone(),
181        String::new(),
182        "## Board Columns".to_string(),
183        String::new(),
184        available_columns.to_string(),
185        String::new(),
186        "## Lane Guidance".to_string(),
187        String::new(),
188        lane_guidance.join("\n"),
189        String::new(),
190    ];
191
192    if let Some(test_cases) = task.test_cases.as_ref().filter(|value| !value.is_empty()) {
193        sections.push("## Test Cases".to_string());
194        sections.push(String::new());
195        sections.push(
196            test_cases
197                .iter()
198                .map(|value| format!("- {}", value))
199                .collect::<Vec<_>>()
200                .join("\n"),
201        );
202        sections.push(String::new());
203    }
204
205    sections.extend([
206        "## Available MCP Tools".to_string(),
207        String::new(),
208        "Use the exact MCP tool names exposed in this session. For OpenCode, the important ones are:".to_string(),
209        String::new(),
210        format!(
211            "- **routa-coordination_update_card**: Update this card's title, description, priority, or labels. Use cardId: \"{}\"",
212            task.id
213        ),
214        format!(
215            "- **routa-coordination_move_card**: Move this same card to targetColumnId \"{}\" when the current lane is complete.",
216            next_column_id.unwrap_or("the exact next column id listed above")
217        ),
218        String::new(),
219        "## Instructions".to_string(),
220        String::new(),
221        "1. Start work for the current lane immediately.".to_string(),
222        "2. Keep changes focused on this card only.".to_string(),
223        "3. Use the exact tool name `routa-coordination_update_card` to record progress on this card.".to_string(),
224        format!(
225            "4. Use the exact tool name `routa-coordination_move_card` with targetColumnId `{}` only when the current lane is complete.",
226            next_column_id.unwrap_or("the exact next column id listed above")
227        ),
228        "5. Do not guess board ids or column ids. Use the Board ID and Board Columns listed above.".to_string(),
229        "6. If blocked, update this same card with the blocking reason instead of exploring side quests.".to_string(),
230        "7. Treat lane guidance as stricter than the general card objective when they conflict.".to_string(),
231        "8. Do not run browser tests or environment diagnostics unless the card explicitly asks for them.".to_string(),
232    ]);
233
234    sections.join("\n")
235}
236
237async fn trigger_assigned_task_acp_agent(
238    state: &AppState,
239    task: &mut Task,
240    board: Option<&KanbanBoard>,
241    step: Option<&KanbanAutomationStep>,
242    cwd: Option<&str>,
243    branch: Option<&str>,
244) -> Result<(), String> {
245    let provider = task
246        .assigned_provider
247        .clone()
248        .unwrap_or_else(|| "opencode".to_string());
249    let role = task
250        .assigned_role
251        .clone()
252        .unwrap_or_else(|| "CRAFTER".to_string())
253        .to_uppercase();
254    let session_id = uuid::Uuid::new_v4().to_string();
255    let cwd = cwd
256        .map(|value| value.to_string())
257        .or_else(|| {
258            std::env::current_dir()
259                .ok()
260                .map(|path| path.to_string_lossy().to_string())
261        })
262        .unwrap_or_else(|| ".".to_string());
263
264    state
265        .acp_manager
266        .create_session(
267            session_id.clone(),
268            cwd.clone(),
269            task.workspace_id.clone(),
270            Some(provider.clone()),
271            Some(role.clone()),
272            None,
273            None,
274            Some("full".to_string()),
275            Some("kanban-planning".to_string()),
276        )
277        .await
278        .map_err(|error| format!("Failed to create ACP session: {}", error))?;
279
280    state
281        .acp_session_store
282        .create(
283            &session_id,
284            &cwd,
285            None,
286            &task.workspace_id,
287            Some(provider.as_str()),
288            Some(role.as_str()),
289            None,
290        )
291        .await
292        .map_err(|error| format!("Failed to persist ACP session: {}", error))?;
293
294    let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
295    ordered_columns.sort_by_key(|column| column.position);
296    let next_column_id = ordered_columns
297        .iter()
298        .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
299        .and_then(|index| ordered_columns.get(index + 1))
300        .map(|column| column.id.clone());
301    let available_columns = if ordered_columns.is_empty() {
302        "- unavailable".to_string()
303    } else {
304        ordered_columns
305            .iter()
306            .map(|column| {
307                format!(
308                    "- {} ({}) stage={} position={}",
309                    column.id, column.name, column.stage, column.position
310                )
311            })
312            .collect::<Vec<_>>()
313            .join("\n")
314    };
315    let prompt = build_task_prompt(
316        task,
317        board
318            .map(|value| value.id.as_str())
319            .or(task.board_id.as_deref()),
320        next_column_id.as_deref(),
321        &available_columns,
322    );
323    let state_clone = state.clone();
324    let session_id_clone = session_id.clone();
325    let task_workspace = task.workspace_id.clone();
326    let provider_clone = provider.clone();
327    let cwd_clone = cwd.clone();
328    let _branch = branch.map(|value| value.to_string());
329
330    if let Err(error) = state
331        .acp_session_store
332        .set_first_prompt_sent(&session_id)
333        .await
334    {
335        tracing::error!(
336            target: "routa_kanban_prompt",
337            session_id = %session_id,
338            workspace_id = %task.workspace_id,
339            error = %error,
340            "kanban auto prompt failed to mark prompt dispatched"
341        );
342    } else {
343        tracing::info!(
344            target: "routa_kanban_prompt",
345            session_id = %session_id,
346            workspace_id = %task.workspace_id,
347            provider = %provider,
348            "kanban auto prompt marked prompt dispatched"
349        );
350    }
351
352    tracing::info!(
353        target: "routa_kanban_prompt",
354        session_id = %session_id_clone,
355        workspace_id = %task_workspace,
356        provider = %provider_clone,
357        cwd = %cwd_clone,
358        "kanban auto prompt scheduled"
359    );
360
361    tokio::spawn(async move {
362        tracing::info!(
363            target: "routa_kanban_prompt",
364            session_id = %session_id_clone,
365            workspace_id = %task_workspace,
366            provider = %provider_clone,
367            cwd = %cwd_clone,
368            "kanban auto prompt start"
369        );
370        if let Err(error) = state_clone
371            .acp_manager
372            .prompt(&session_id_clone, &prompt)
373            .await
374        {
375            tracing::error!(
376                "[kanban] Failed to auto-prompt ACP task session {} in workspace {} with provider {} at {}: {}",
377                session_id_clone,
378                task_workspace,
379                provider_clone,
380                cwd_clone,
381                error
382            );
383            return;
384        }
385
386        tracing::info!(
387            target: "routa_kanban_prompt",
388            session_id = %session_id_clone,
389            workspace_id = %task_workspace,
390            provider = %provider_clone,
391            "kanban auto prompt success"
392        );
393        if let Some(history) = state_clone
394            .acp_manager
395            .get_session_history(&session_id_clone)
396            .await
397        {
398            if let Err(error) = state_clone
399                .acp_session_store
400                .save_history(&session_id_clone, &history)
401                .await
402            {
403                tracing::error!(
404                    target: "routa_kanban_prompt",
405                    session_id = %session_id_clone,
406                    workspace_id = %task_workspace,
407                    error = %error,
408                    "kanban auto prompt failed to persist history"
409                );
410            } else {
411                tracing::info!(
412                    target: "routa_kanban_prompt",
413                    session_id = %session_id_clone,
414                    workspace_id = %task_workspace,
415                    history_len = history.len(),
416                    "kanban auto prompt persisted history"
417                );
418            }
419        }
420    });
421
422    apply_trigger_result(
423        task,
424        board,
425        step,
426        AgentTriggerResult {
427            session_id,
428            transport: "acp".to_string(),
429            external_task_id: None,
430            context_id: None,
431        },
432    );
433
434    Ok(())
435}
436
437async fn trigger_assigned_task_a2a_agent(
438    _state: &AppState,
439    task: &mut Task,
440    board: Option<&KanbanBoard>,
441    step: Option<&KanbanAutomationStep>,
442) -> Result<(), String> {
443    let step = step.ok_or_else(|| "A2A automation requires a resolved column step".to_string())?;
444    let agent_card_url = step
445        .agent_card_url
446        .as_deref()
447        .ok_or_else(|| "A2A automation requires agentCardUrl".to_string())?;
448
449    let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
450    ordered_columns.sort_by_key(|column| column.position);
451    let next_column_id = ordered_columns
452        .iter()
453        .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
454        .and_then(|index| ordered_columns.get(index + 1))
455        .map(|column| column.id.clone());
456    let available_columns = if ordered_columns.is_empty() {
457        "- unavailable".to_string()
458    } else {
459        ordered_columns
460            .iter()
461            .map(|column| {
462                format!(
463                    "- {} ({}) stage={} position={}",
464                    column.id, column.name, column.stage, column.position
465                )
466            })
467            .collect::<Vec<_>>()
468            .join("\n")
469    };
470    let prompt = build_task_prompt(
471        task,
472        board
473            .map(|value| value.id.as_str())
474            .or(task.board_id.as_deref()),
475        next_column_id.as_deref(),
476        &available_columns,
477    );
478
479    let client = reqwest::Client::new();
480    let rpc_endpoint = resolve_a2a_rpc_endpoint(&client, agent_card_url).await?;
481    let request_id = uuid::Uuid::new_v4().to_string();
482    let message_id = uuid::Uuid::new_v4().to_string();
483    let response = client
484        .post(&rpc_endpoint)
485        .header(CONTENT_TYPE, "application/json")
486        .header(ACCEPT, "application/json")
487        .json(&json!({
488            "jsonrpc": "2.0",
489            "id": request_id,
490            "method": "SendMessage",
491            "params": {
492                "message": {
493                    "messageId": message_id,
494                    "role": "user",
495                    "parts": [
496                        { "text": prompt }
497                    ]
498                },
499                "metadata": {
500                    "workspaceId": task.workspace_id,
501                    "taskId": task.id,
502                    "boardId": task.board_id,
503                    "columnId": task.column_id,
504                    "stepId": step.id,
505                    "skillId": step.skill_id,
506                    "authConfigId": step.auth_config_id,
507                    "role": task.assigned_role,
508                }
509            }
510        }))
511        .send()
512        .await
513        .map_err(|error| format!("Failed to send A2A request: {}", error))?;
514
515    if !response.status().is_success() {
516        return Err(format!(
517            "A2A request failed with HTTP {}",
518            response.status().as_u16()
519        ));
520    }
521
522    let payload: Value = response
523        .json()
524        .await
525        .map_err(|error| format!("Failed to decode A2A response: {}", error))?;
526    if let Some(error) = payload.get("error") {
527        let message = error
528            .get("message")
529            .and_then(Value::as_str)
530            .unwrap_or("unknown A2A error");
531        return Err(format!("A2A JSON-RPC error: {}", message));
532    }
533
534    let task_result = payload
535        .get("result")
536        .and_then(|value| value.get("task"))
537        .ok_or_else(|| "A2A response missing result.task".to_string())?;
538    let external_task_id = task_result
539        .get("id")
540        .and_then(Value::as_str)
541        .ok_or_else(|| "A2A response missing task.id".to_string())?
542        .to_string();
543    let context_id = task_result
544        .get("contextId")
545        .and_then(Value::as_str)
546        .map(ToOwned::to_owned);
547    let session_id = format!("a2a-{}", uuid::Uuid::new_v4());
548
549    apply_trigger_result(
550        task,
551        board,
552        Some(step),
553        AgentTriggerResult {
554            session_id,
555            transport: "a2a".to_string(),
556            external_task_id: Some(external_task_id),
557            context_id,
558        },
559    );
560
561    Ok(())
562}
563
564#[derive(Debug)]
565struct AgentTriggerResult {
566    session_id: String,
567    transport: String,
568    external_task_id: Option<String>,
569    context_id: Option<String>,
570}
571
572fn apply_trigger_result(
573    task: &mut Task,
574    board: Option<&KanbanBoard>,
575    step: Option<&KanbanAutomationStep>,
576    result: AgentTriggerResult,
577) {
578    task.trigger_session_id = Some(result.session_id.clone());
579    if !task.session_ids.iter().any(|id| id == &result.session_id) {
580        task.session_ids.push(result.session_id.clone());
581    }
582
583    let column_name = board.and_then(|value| {
584        value.columns.iter().find_map(|column| {
585            (Some(column.id.as_str()) == task.column_id.as_deref()).then(|| column.name.clone())
586        })
587    });
588    let lane_session = TaskLaneSession {
589        session_id: result.session_id.clone(),
590        routa_agent_id: None,
591        column_id: task.column_id.clone(),
592        column_name,
593        step_id: step.map(|value| value.id.clone()),
594        step_index: None,
595        step_name: step
596            .and_then(|value| value.specialist_name.clone())
597            .or_else(|| task.assigned_specialist_name.clone()),
598        provider: task.assigned_provider.clone(),
599        role: task.assigned_role.clone(),
600        specialist_id: task.assigned_specialist_id.clone(),
601        specialist_name: task.assigned_specialist_name.clone(),
602        transport: Some(result.transport),
603        external_task_id: result.external_task_id,
604        context_id: result.context_id,
605        attempt: Some(1),
606        loop_mode: None,
607        completion_requirement: None,
608        objective: Some(task.objective.clone()),
609        last_activity_at: Some(Utc::now().to_rfc3339()),
610        recovered_from_session_id: None,
611        recovery_reason: None,
612        status: TaskLaneSessionStatus::Running,
613        started_at: Utc::now().to_rfc3339(),
614        completed_at: None,
615    };
616
617    if let Some(existing) = task
618        .lane_sessions
619        .iter_mut()
620        .find(|existing| existing.session_id == result.session_id)
621    {
622        *existing = lane_session;
623    } else {
624        task.lane_sessions.push(lane_session);
625    }
626}
627
628async fn load_task_board(state: &AppState, task: &Task) -> Result<Option<KanbanBoard>, String> {
629    if let Some(board_id) = task.board_id.as_deref() {
630        state
631            .kanban_store
632            .get(board_id)
633            .await
634            .map_err(|error| format!("Failed to load Kanban board for automation: {}", error))
635    } else {
636        Ok(None)
637    }
638}
639
640fn resolve_task_automation_step(
641    board: Option<&KanbanBoard>,
642    task: &Task,
643) -> Option<KanbanAutomationStep> {
644    board
645        .and_then(|value| {
646            value
647                .columns
648                .iter()
649                .find(|column| Some(column.id.as_str()) == task.column_id.as_deref())
650        })
651        .and_then(|column| column.automation.as_ref())
652        .filter(|automation| automation.enabled)
653        .and_then(|automation| automation.primary_step())
654}
655
656fn is_a2a_step(step: Option<&KanbanAutomationStep>) -> bool {
657    step.is_some_and(|value| {
658        matches!(value.transport, Some(KanbanTransport::A2a)) || value.agent_card_url.is_some()
659    })
660}
661
662async fn resolve_a2a_rpc_endpoint(client: &reqwest::Client, url: &str) -> Result<String, String> {
663    if url.ends_with(".json") || url.ends_with("/agent-card") || url.ends_with("/card") {
664        let response = client
665            .get(url)
666            .header(ACCEPT, "application/json")
667            .send()
668            .await
669            .map_err(|error| format!("Failed to fetch A2A agent card: {}", error))?;
670        if !response.status().is_success() {
671            return Err(format!(
672                "A2A agent card fetch failed with HTTP {}",
673                response.status().as_u16()
674            ));
675        }
676        let card: Value = response
677            .json()
678            .await
679            .map_err(|error| format!("Failed to decode A2A agent card: {}", error))?;
680        let rpc_url = card
681            .get("url")
682            .and_then(Value::as_str)
683            .ok_or_else(|| "A2A agent card missing url".to_string())?;
684        absolutize_url(url, rpc_url)
685    } else {
686        Ok(url.to_string())
687    }
688}
689
690fn absolutize_url(base_url: &str, maybe_relative: &str) -> Result<String, String> {
691    if maybe_relative.starts_with("http://") || maybe_relative.starts_with("https://") {
692        return Ok(maybe_relative.to_string());
693    }
694
695    let base = reqwest::Url::parse(base_url)
696        .map_err(|error| format!("Invalid base A2A URL {}: {}", base_url, error))?;
697    base.join(maybe_relative)
698        .map(|url| url.to_string())
699        .map_err(|error| format!("Invalid relative A2A URL {}: {}", maybe_relative, error))
700}