Skip to main content

routa_server/api/
tasks_automation.rs

1use chrono::Utc;
2use reqwest::header::{HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
3use routa_core::events::{AgentEvent, AgentEventType};
4use routa_core::models::kanban::{KanbanAutomationStep, KanbanBoard, KanbanTransport};
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use crate::error::ServerError;
10use crate::models::task::{Task, TaskLaneSession, TaskLaneSessionStatus};
11use crate::state::AppState;
12use routa_core::store::acp_session_store::CreateAcpSessionParams;
13
14const A2A_POLL_INTERVAL: Duration = Duration::from_secs(1);
15const A2A_MAX_WAIT: Duration = Duration::from_secs(300);
16const A2A_AUTH_CONFIGS_ENV: &str = "ROUTA_A2A_AUTH_CONFIGS";
17
18pub async fn resolve_codebase(
19    state: &AppState,
20    workspace_id: &str,
21    repo_path: Option<&str>,
22) -> Result<Option<crate::models::codebase::Codebase>, ServerError> {
23    if let Some(path) = repo_path {
24        state
25            .codebase_store
26            .find_by_repo_path(workspace_id, path)
27            .await
28    } else {
29        state.codebase_store.get_default(workspace_id).await
30    }
31}
32
33pub async fn auto_create_worktree(
34    state: &AppState,
35    task: &crate::models::task::Task,
36    codebase: &crate::models::codebase::Codebase,
37) -> Result<String, String> {
38    let slugified = task
39        .title
40        .to_lowercase()
41        .chars()
42        .map(|c| if c.is_alphanumeric() { c } else { '-' })
43        .collect::<String>()
44        .split('-')
45        .filter(|s| !s.is_empty())
46        .collect::<Vec<_>>()
47        .join("-");
48    let short_id = &task.id[..task.id.len().min(8)];
49    let slug = format!("{}-{}", short_id, &slugified[..slugified.len().min(40)]);
50    let branch = format!("issue/{}", slug);
51
52    let workspace = state
53        .workspace_store
54        .get(&task.workspace_id)
55        .await
56        .ok()
57        .flatten();
58    let worktree_root = workspace
59        .as_ref()
60        .and_then(|ws| ws.metadata.get("worktreeRoot"))
61        .filter(|s| !s.trim().is_empty())
62        .map(std::path::PathBuf::from)
63        .unwrap_or_else(|| crate::git::get_default_workspace_worktree_root(&task.workspace_id));
64
65    let codebase_label = codebase
66        .label
67        .as_ref()
68        .map(|l| crate::git::branch_to_safe_dir_name(l))
69        .unwrap_or_else(|| crate::git::branch_to_safe_dir_name(&codebase.id));
70
71    let worktree_path = worktree_root
72        .join(&codebase_label)
73        .join(crate::git::branch_to_safe_dir_name(&slug));
74
75    if let Some(parent) = worktree_path.parent() {
76        std::fs::create_dir_all(parent)
77            .map_err(|e| format!("Failed to create worktree parent dir: {}", e))?;
78    }
79
80    let worktree_path_str = worktree_path.to_string_lossy().to_string();
81    let base_branch = codebase
82        .branch
83        .clone()
84        .unwrap_or_else(|| "main".to_string());
85
86    let worktree = crate::models::worktree::Worktree::new(
87        uuid::Uuid::new_v4().to_string(),
88        codebase.id.clone(),
89        task.workspace_id.clone(),
90        worktree_path_str.clone(),
91        branch.clone(),
92        base_branch.clone(),
93        Some(slug),
94    );
95    state
96        .worktree_store
97        .save(&worktree)
98        .await
99        .map_err(|e| format!("Failed to save worktree: {}", e))?;
100
101    let _ = crate::git::worktree_prune(&codebase.repo_path);
102    crate::git::worktree_add(
103        &codebase.repo_path,
104        &worktree_path_str,
105        &branch,
106        &base_branch,
107        false,
108    )
109    .map_err(|e| format!("git worktree add failed: {}", e))?;
110
111    Ok(worktree.id)
112}
113
114pub async fn trigger_assigned_task_agent(
115    state: &AppState,
116    task: &mut Task,
117    cwd: Option<&str>,
118    branch: Option<&str>,
119) -> Result<(), String> {
120    let board = load_task_board(state, task).await?;
121    let step = resolve_task_automation_step(board.as_ref(), task);
122    if is_a2a_step(step.as_ref()) {
123        return trigger_assigned_task_a2a_agent(state, task, board.as_ref(), step.as_ref()).await;
124    }
125
126    trigger_assigned_task_acp_agent(state, task, board.as_ref(), step.as_ref(), cwd, branch).await
127}
128
129fn build_task_prompt(
130    task: &Task,
131    board_id: Option<&str>,
132    next_column_id: Option<&str>,
133    available_columns: &str,
134) -> String {
135    let labels = if task.labels.is_empty() {
136        "Labels: none".to_string()
137    } else {
138        format!("Labels: {}", task.labels.join(", "))
139    };
140    let lane_id = task.column_id.as_deref().unwrap_or("backlog");
141    let lane_guidance = match lane_id {
142        "dev" => vec![
143            "You are in the `dev` lane. This lane may implement the requested change, but you must keep work scoped to the current card.".to_string(),
144            "Use `routa-coordination_update_card` to record concrete progress on this card before or after meaningful implementation steps.".to_string(),
145            "When implementation for this lane is complete, use `routa-coordination_move_card` to advance the same card.".to_string(),
146        ],
147        "todo" => vec![
148            "You are in the `todo` lane. This lane does not perform full implementation work.".to_string(),
149            "Only clarify the card, update its progress or status, and move the same card forward when the lane is complete.".to_string(),
150            "Do not edit files, do not inspect the whole repository, and do not run browser tests or environment diagnostics in this lane.".to_string(),
151        ],
152        _ => vec![
153            format!("You are in the `{lane_id}` lane. Keep work scoped to the current card and this lane only."),
154        ],
155    };
156    let mut sections = vec![
157        format!("You are assigned to Kanban task: {}", task.title),
158        String::new(),
159        "## Context".to_string(),
160        String::new(),
161        "**IMPORTANT**: You are working in Kanban lane automation for exactly one existing card.".to_string(),
162        "Only operate on the current card. Do not create a new task, do not switch to a different card, and do not broaden scope.".to_string(),
163        "Use the exact MCP tool names exposed by the provider. In OpenCode, prefer `routa-coordination_update_card` and `routa-coordination_move_card`.".to_string(),
164        "Do NOT use `gh issue create`, browser automation, Playwright, repo-wide debugging, API exploration, or unrelated codebase research unless the card objective explicitly requires it.".to_string(),
165        String::new(),
166        "## Task Details".to_string(),
167        String::new(),
168        format!("**Card ID:** {}", task.id),
169        format!(
170            "**Priority:** {}",
171            task.priority.as_ref().map(|value| value.as_str()).unwrap_or("medium")
172        ),
173        board_id
174            .map(|value| format!("**Board ID:** {}", value))
175            .unwrap_or_else(|| "**Board ID:** unavailable".to_string()),
176        format!("**Current Lane:** {}", lane_id),
177        next_column_id
178            .map(|value| format!("**Next Column ID:** {}", value))
179            .unwrap_or_else(|| "**Next Column ID:** unavailable".to_string()),
180        labels,
181        task.github_url
182            .as_ref()
183            .map(|url| format!("**GitHub Issue:** {}", url))
184            .unwrap_or_else(|| "**GitHub Issue:** local-only".to_string()),
185        String::new(),
186        "## Objective".to_string(),
187        String::new(),
188        task.objective.clone(),
189        String::new(),
190        "## Board Columns".to_string(),
191        String::new(),
192        available_columns.to_string(),
193        String::new(),
194        "## Lane Guidance".to_string(),
195        String::new(),
196        lane_guidance.join("\n"),
197        String::new(),
198    ];
199
200    if let Some(test_cases) = task.test_cases.as_ref().filter(|value| !value.is_empty()) {
201        sections.push("## Test Cases".to_string());
202        sections.push(String::new());
203        sections.push(
204            test_cases
205                .iter()
206                .map(|value| format!("- {}", value))
207                .collect::<Vec<_>>()
208                .join("\n"),
209        );
210        sections.push(String::new());
211    }
212
213    sections.extend([
214        "## Available MCP Tools".to_string(),
215        String::new(),
216        "Use the exact MCP tool names exposed in this session. For OpenCode, the important ones are:".to_string(),
217        String::new(),
218        format!(
219            "- **routa-coordination_update_card**: Update this card's title, description, priority, or labels. Use cardId: \"{}\"",
220            task.id
221        ),
222        format!(
223            "- **routa-coordination_move_card**: Move this same card to targetColumnId \"{}\" when the current lane is complete.",
224            next_column_id.unwrap_or("the exact next column id listed above")
225        ),
226        String::new(),
227        "## Instructions".to_string(),
228        String::new(),
229        "1. Start work for the current lane immediately.".to_string(),
230        "2. Keep changes focused on this card only.".to_string(),
231        "3. Use the exact tool name `routa-coordination_update_card` to record progress on this card.".to_string(),
232        format!(
233            "4. Use the exact tool name `routa-coordination_move_card` with targetColumnId `{}` only when the current lane is complete.",
234            next_column_id.unwrap_or("the exact next column id listed above")
235        ),
236        "5. Do not guess board ids or column ids. Use the Board ID and Board Columns listed above.".to_string(),
237        "6. If blocked, update this same card with the blocking reason instead of exploring side quests.".to_string(),
238        "7. Treat lane guidance as stricter than the general card objective when they conflict.".to_string(),
239        "8. Do not run browser tests or environment diagnostics unless the card explicitly asks for them.".to_string(),
240    ]);
241
242    sections.join("\n")
243}
244
245async fn trigger_assigned_task_acp_agent(
246    state: &AppState,
247    task: &mut Task,
248    board: Option<&KanbanBoard>,
249    step: Option<&KanbanAutomationStep>,
250    cwd: Option<&str>,
251    branch: Option<&str>,
252) -> Result<(), String> {
253    let provider = task
254        .assigned_provider
255        .clone()
256        .unwrap_or_else(|| "opencode".to_string());
257    let role = task
258        .assigned_role
259        .clone()
260        .unwrap_or_else(|| "CRAFTER".to_string())
261        .to_uppercase();
262    let session_id = uuid::Uuid::new_v4().to_string();
263    let cwd = cwd
264        .map(|value| value.to_string())
265        .or_else(|| {
266            std::env::current_dir()
267                .ok()
268                .map(|path| path.to_string_lossy().to_string())
269        })
270        .unwrap_or_else(|| ".".to_string());
271
272    state
273        .acp_manager
274        .create_session(
275            session_id.clone(),
276            cwd.clone(),
277            task.workspace_id.clone(),
278            Some(provider.clone()),
279            Some(role.clone()),
280            None,
281            None,
282            Some("full".to_string()),
283            Some("kanban-planning".to_string()),
284        )
285        .await
286        .map_err(|error| format!("Failed to create ACP session: {}", error))?;
287
288    state
289        .acp_session_store
290        .create(CreateAcpSessionParams {
291            id: &session_id,
292            cwd: &cwd,
293            branch: None,
294            workspace_id: &task.workspace_id,
295            provider: Some(provider.as_str()),
296            role: Some(role.as_str()),
297            parent_session_id: None,
298        })
299        .await
300        .map_err(|error| format!("Failed to persist ACP session: {}", error))?;
301
302    let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
303    ordered_columns.sort_by_key(|column| column.position);
304    let next_column_id = ordered_columns
305        .iter()
306        .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
307        .and_then(|index| ordered_columns.get(index + 1))
308        .map(|column| column.id.clone());
309    let available_columns = if ordered_columns.is_empty() {
310        "- unavailable".to_string()
311    } else {
312        ordered_columns
313            .iter()
314            .map(|column| {
315                format!(
316                    "- {} ({}) stage={} position={}",
317                    column.id, column.name, column.stage, column.position
318                )
319            })
320            .collect::<Vec<_>>()
321            .join("\n")
322    };
323    let prompt = build_task_prompt(
324        task,
325        board
326            .map(|value| value.id.as_str())
327            .or(task.board_id.as_deref()),
328        next_column_id.as_deref(),
329        &available_columns,
330    );
331    let state_clone = state.clone();
332    let session_id_clone = session_id.clone();
333    let task_workspace = task.workspace_id.clone();
334    let provider_clone = provider.clone();
335    let cwd_clone = cwd.clone();
336    let _branch = branch.map(|value| value.to_string());
337
338    if let Err(error) = state
339        .acp_session_store
340        .set_first_prompt_sent(&session_id)
341        .await
342    {
343        tracing::error!(
344            target: "routa_kanban_prompt",
345            session_id = %session_id,
346            workspace_id = %task.workspace_id,
347            error = %error,
348            "kanban auto prompt failed to mark prompt dispatched"
349        );
350    } else {
351        tracing::info!(
352            target: "routa_kanban_prompt",
353            session_id = %session_id,
354            workspace_id = %task.workspace_id,
355            provider = %provider,
356            "kanban auto prompt marked prompt dispatched"
357        );
358    }
359
360    tracing::info!(
361        target: "routa_kanban_prompt",
362        session_id = %session_id_clone,
363        workspace_id = %task_workspace,
364        provider = %provider_clone,
365        cwd = %cwd_clone,
366        "kanban auto prompt scheduled"
367    );
368
369    tokio::spawn(async move {
370        tracing::info!(
371            target: "routa_kanban_prompt",
372            session_id = %session_id_clone,
373            workspace_id = %task_workspace,
374            provider = %provider_clone,
375            cwd = %cwd_clone,
376            "kanban auto prompt start"
377        );
378        if let Err(error) = state_clone
379            .acp_manager
380            .prompt(&session_id_clone, &prompt)
381            .await
382        {
383            tracing::error!(
384                "[kanban] Failed to auto-prompt ACP task session {} in workspace {} with provider {} at {}: {}",
385                session_id_clone,
386                task_workspace,
387                provider_clone,
388                cwd_clone,
389                error
390            );
391            return;
392        }
393
394        tracing::info!(
395            target: "routa_kanban_prompt",
396            session_id = %session_id_clone,
397            workspace_id = %task_workspace,
398            provider = %provider_clone,
399            "kanban auto prompt success"
400        );
401        if let Some(history) = state_clone
402            .acp_manager
403            .get_session_history(&session_id_clone)
404            .await
405        {
406            if let Err(error) = state_clone
407                .acp_session_store
408                .save_history(&session_id_clone, &history)
409                .await
410            {
411                tracing::error!(
412                    target: "routa_kanban_prompt",
413                    session_id = %session_id_clone,
414                    workspace_id = %task_workspace,
415                    error = %error,
416                    "kanban auto prompt failed to persist history"
417                );
418            } else {
419                tracing::info!(
420                    target: "routa_kanban_prompt",
421                    session_id = %session_id_clone,
422                    workspace_id = %task_workspace,
423                    history_len = history.len(),
424                    "kanban auto prompt persisted history"
425                );
426            }
427        }
428    });
429
430    apply_trigger_result(
431        task,
432        board,
433        step,
434        AgentTriggerResult {
435            session_id,
436            transport: "acp".to_string(),
437            external_task_id: None,
438            context_id: None,
439        },
440    );
441
442    Ok(())
443}
444
445async fn trigger_assigned_task_a2a_agent(
446    state: &AppState,
447    task: &mut Task,
448    board: Option<&KanbanBoard>,
449    step: Option<&KanbanAutomationStep>,
450) -> Result<(), String> {
451    let step = step.ok_or_else(|| "A2A automation requires a resolved column step".to_string())?;
452    let agent_card_url = step
453        .agent_card_url
454        .as_deref()
455        .ok_or_else(|| "A2A automation requires agentCardUrl".to_string())?;
456    let auth_headers = resolve_a2a_auth_headers(step.auth_config_id.as_deref())?;
457
458    let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
459    ordered_columns.sort_by_key(|column| column.position);
460    let next_column_id = ordered_columns
461        .iter()
462        .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
463        .and_then(|index| ordered_columns.get(index + 1))
464        .map(|column| column.id.clone());
465    let available_columns = if ordered_columns.is_empty() {
466        "- unavailable".to_string()
467    } else {
468        ordered_columns
469            .iter()
470            .map(|column| {
471                format!(
472                    "- {} ({}) stage={} position={}",
473                    column.id, column.name, column.stage, column.position
474                )
475            })
476            .collect::<Vec<_>>()
477            .join("\n")
478    };
479    let prompt = build_task_prompt(
480        task,
481        board
482            .map(|value| value.id.as_str())
483            .or(task.board_id.as_deref()),
484        next_column_id.as_deref(),
485        &available_columns,
486    );
487
488    let client = reqwest::Client::new();
489    let rpc_endpoint =
490        resolve_a2a_rpc_endpoint(&client, agent_card_url, auth_headers.as_ref()).await?;
491    let request_id = uuid::Uuid::new_v4().to_string();
492    let message_id = uuid::Uuid::new_v4().to_string();
493    let response = apply_a2a_auth_headers(
494        client
495            .post(&rpc_endpoint)
496            .header(CONTENT_TYPE, "application/json")
497            .header(ACCEPT, "application/json")
498            .json(&json!({
499                "jsonrpc": "2.0",
500                "id": request_id,
501                "method": "SendMessage",
502                "params": {
503                    "message": {
504                        "messageId": message_id,
505                        "role": "user",
506                        "parts": [
507                            { "text": prompt }
508                        ]
509                    },
510                    "metadata": {
511                        "workspaceId": task.workspace_id,
512                        "taskId": task.id,
513                        "boardId": task.board_id,
514                        "columnId": task.column_id,
515                        "stepId": step.id,
516                        "skillId": step.skill_id,
517                        "authConfigId": step.auth_config_id,
518                        "role": task.assigned_role,
519                    }
520                }
521            })),
522        auth_headers.as_ref(),
523    )?
524    .send()
525    .await
526    .map_err(|error| format!("Failed to send A2A request: {}", error))?;
527
528    if !response.status().is_success() {
529        return Err(format!(
530            "A2A request failed with HTTP {}",
531            response.status().as_u16()
532        ));
533    }
534
535    let payload: Value = response
536        .json()
537        .await
538        .map_err(|error| format!("Failed to decode A2A response: {}", error))?;
539    if let Some(error) = payload.get("error") {
540        let message = error
541            .get("message")
542            .and_then(Value::as_str)
543            .unwrap_or("unknown A2A error");
544        return Err(format!("A2A JSON-RPC error: {}", message));
545    }
546
547    let task_result = payload
548        .get("result")
549        .and_then(|value| value.get("task"))
550        .ok_or_else(|| "A2A response missing result.task".to_string())?;
551    let external_task_id = task_result
552        .get("id")
553        .and_then(Value::as_str)
554        .ok_or_else(|| "A2A response missing task.id".to_string())?
555        .to_string();
556    let context_id = task_result
557        .get("contextId")
558        .and_then(Value::as_str)
559        .map(ToOwned::to_owned);
560    let session_id = format!("a2a-{}", uuid::Uuid::new_v4());
561
562    apply_trigger_result(
563        task,
564        board,
565        Some(step),
566        AgentTriggerResult {
567            session_id: session_id.clone(),
568            transport: "a2a".to_string(),
569            external_task_id: Some(external_task_id.clone()),
570            context_id,
571        },
572    );
573
574    let state_clone = state.clone();
575    let task_id = task.id.clone();
576    let workspace_id = task.workspace_id.clone();
577    tokio::spawn(async move {
578        monitor_a2a_task_completion(
579            &state_clone,
580            &workspace_id,
581            &task_id,
582            &session_id,
583            &rpc_endpoint,
584            &external_task_id,
585            auth_headers,
586        )
587        .await;
588    });
589
590    Ok(())
591}
592
593#[derive(Debug)]
594struct AgentTriggerResult {
595    session_id: String,
596    transport: String,
597    external_task_id: Option<String>,
598    context_id: Option<String>,
599}
600
601fn apply_trigger_result(
602    task: &mut Task,
603    board: Option<&KanbanBoard>,
604    step: Option<&KanbanAutomationStep>,
605    result: AgentTriggerResult,
606) {
607    task.trigger_session_id = Some(result.session_id.clone());
608    if !task.session_ids.iter().any(|id| id == &result.session_id) {
609        task.session_ids.push(result.session_id.clone());
610    }
611
612    let column_name = board.and_then(|value| {
613        value.columns.iter().find_map(|column| {
614            (Some(column.id.as_str()) == task.column_id.as_deref()).then(|| column.name.clone())
615        })
616    });
617    let lane_session = TaskLaneSession {
618        session_id: result.session_id.clone(),
619        routa_agent_id: None,
620        column_id: task.column_id.clone(),
621        column_name,
622        step_id: step.map(|value| value.id.clone()),
623        step_index: None,
624        step_name: step
625            .and_then(|value| value.specialist_name.clone())
626            .or_else(|| task.assigned_specialist_name.clone()),
627        provider: task.assigned_provider.clone(),
628        role: task.assigned_role.clone(),
629        specialist_id: task.assigned_specialist_id.clone(),
630        specialist_name: task.assigned_specialist_name.clone(),
631        transport: Some(result.transport),
632        external_task_id: result.external_task_id,
633        context_id: result.context_id,
634        attempt: Some(1),
635        loop_mode: None,
636        completion_requirement: None,
637        objective: Some(task.objective.clone()),
638        last_activity_at: Some(Utc::now().to_rfc3339()),
639        recovered_from_session_id: None,
640        recovery_reason: None,
641        status: TaskLaneSessionStatus::Running,
642        started_at: Utc::now().to_rfc3339(),
643        completed_at: None,
644    };
645
646    if let Some(existing) = task
647        .lane_sessions
648        .iter_mut()
649        .find(|existing| existing.session_id == result.session_id)
650    {
651        *existing = lane_session;
652    } else {
653        task.lane_sessions.push(lane_session);
654    }
655}
656
657#[derive(Debug)]
658struct A2ATaskTerminalUpdate {
659    status: TaskLaneSessionStatus,
660    completed_at: String,
661    last_activity_at: String,
662    context_id: Option<String>,
663    error: Option<String>,
664}
665
666async fn monitor_a2a_task_completion(
667    state: &AppState,
668    workspace_id: &str,
669    task_id: &str,
670    session_id: &str,
671    rpc_endpoint: &str,
672    external_task_id: &str,
673    auth_headers: Option<HashMap<String, String>>,
674) {
675    let client = reqwest::Client::new();
676    let terminal = match wait_for_a2a_completion(
677        &client,
678        rpc_endpoint,
679        external_task_id,
680        auth_headers.as_ref(),
681    )
682    .await
683    {
684        Ok(terminal) => terminal,
685        Err(error) => {
686            let now = Utc::now().to_rfc3339();
687            let status = if error.contains("did not complete within") {
688                TaskLaneSessionStatus::TimedOut
689            } else {
690                TaskLaneSessionStatus::Failed
691            };
692            A2ATaskTerminalUpdate {
693                status,
694                completed_at: now.clone(),
695                last_activity_at: now,
696                context_id: None,
697                error: Some(error),
698            }
699        }
700    };
701
702    if let Err(error) =
703        reconcile_a2a_lane_session(state, task_id, session_id, external_task_id, terminal).await
704    {
705        tracing::warn!(
706            target: "routa_a2a",
707            workspace_id = %workspace_id,
708            task_id = %task_id,
709            session_id = %session_id,
710            external_task_id = %external_task_id,
711            error = %error,
712            "failed to persist A2A terminal state"
713        );
714        return;
715    }
716
717    emit_kanban_workspace_event(state, workspace_id, task_id).await;
718}
719
720async fn wait_for_a2a_completion(
721    client: &reqwest::Client,
722    rpc_endpoint: &str,
723    task_id: &str,
724    auth_headers: Option<&HashMap<String, String>>,
725) -> Result<A2ATaskTerminalUpdate, String> {
726    let started_at = Instant::now();
727
728    loop {
729        let terminal = get_a2a_task_update(client, rpc_endpoint, task_id, auth_headers).await?;
730        if let Some(terminal) = terminal {
731            return Ok(terminal);
732        }
733        if started_at.elapsed() >= A2A_MAX_WAIT {
734            return Err(format!(
735                "A2A task {task_id} did not complete within {}ms",
736                A2A_MAX_WAIT.as_millis()
737            ));
738        }
739        tokio::time::sleep(A2A_POLL_INTERVAL).await;
740    }
741}
742
743async fn get_a2a_task_update(
744    client: &reqwest::Client,
745    rpc_endpoint: &str,
746    task_id: &str,
747    auth_headers: Option<&HashMap<String, String>>,
748) -> Result<Option<A2ATaskTerminalUpdate>, String> {
749    let request_id = uuid::Uuid::new_v4().to_string();
750    let response = apply_a2a_auth_headers(
751        client
752            .post(rpc_endpoint)
753            .header(CONTENT_TYPE, "application/json")
754            .header(ACCEPT, "application/json")
755            .json(&json!({
756                "jsonrpc": "2.0",
757                "id": request_id,
758                "method": "GetTask",
759                "params": { "id": task_id }
760            })),
761        auth_headers,
762    )?
763    .send()
764    .await
765    .map_err(|error| format!("Failed to poll A2A task: {}", error))?;
766
767    if !response.status().is_success() {
768        return Err(format!(
769            "A2A GetTask failed with HTTP {}",
770            response.status().as_u16()
771        ));
772    }
773
774    let payload: Value = response
775        .json()
776        .await
777        .map_err(|error| format!("Failed to decode A2A task payload: {}", error))?;
778    if let Some(error) = payload.get("error") {
779        let message = error
780            .get("message")
781            .and_then(Value::as_str)
782            .unwrap_or("unknown A2A error");
783        return Err(format!("A2A JSON-RPC error: {}", message));
784    }
785
786    let task = payload
787        .get("result")
788        .and_then(|value| value.get("task"))
789        .ok_or_else(|| "A2A response missing result.task".to_string())?;
790    let state = task
791        .get("status")
792        .and_then(|value| value.get("state"))
793        .and_then(Value::as_str)
794        .ok_or_else(|| "A2A task missing status.state".to_string())?;
795    if !is_terminal_a2a_state(state) {
796        return Ok(None);
797    }
798
799    let timestamp = task
800        .get("status")
801        .and_then(|value| value.get("timestamp"))
802        .and_then(Value::as_str)
803        .map(ToOwned::to_owned)
804        .unwrap_or_else(|| Utc::now().to_rfc3339());
805    let context_id = task
806        .get("contextId")
807        .and_then(Value::as_str)
808        .map(ToOwned::to_owned);
809    let error = if state == "completed" {
810        None
811    } else {
812        Some(
813            extract_a2a_status_message(task)
814                .unwrap_or_else(|| format!("A2A task ended in state: {state}")),
815        )
816    };
817
818    Ok(Some(A2ATaskTerminalUpdate {
819        status: map_a2a_terminal_status(state),
820        completed_at: timestamp.clone(),
821        last_activity_at: timestamp,
822        context_id,
823        error,
824    }))
825}
826
827fn extract_a2a_status_message(task: &Value) -> Option<String> {
828    let parts = task
829        .get("status")
830        .and_then(|value| value.get("message"))
831        .and_then(|value| value.get("parts"))
832        .and_then(Value::as_array)?;
833    let text = parts
834        .iter()
835        .filter_map(|part| part.get("text").and_then(Value::as_str))
836        .map(str::trim)
837        .filter(|part| !part.is_empty())
838        .collect::<Vec<_>>()
839        .join(" ");
840    (!text.is_empty()).then_some(text)
841}
842
843fn is_terminal_a2a_state(state: &str) -> bool {
844    matches!(
845        state,
846        "completed" | "failed" | "canceled" | "rejected" | "auth-required"
847    )
848}
849
850fn map_a2a_terminal_status(state: &str) -> TaskLaneSessionStatus {
851    match state {
852        "completed" => TaskLaneSessionStatus::Completed,
853        _ => TaskLaneSessionStatus::Failed,
854    }
855}
856
857async fn reconcile_a2a_lane_session(
858    state: &AppState,
859    task_id: &str,
860    session_id: &str,
861    external_task_id: &str,
862    terminal: A2ATaskTerminalUpdate,
863) -> Result<(), String> {
864    let mut task = wait_for_task_persistence(state, task_id, session_id).await?;
865    let lane_session = task
866        .lane_sessions
867        .iter_mut()
868        .find(|session| session.session_id == session_id)
869        .ok_or_else(|| format!("Task {task_id} missing lane session {session_id}"))?;
870
871    lane_session.status = terminal.status;
872    lane_session.completed_at = Some(terminal.completed_at.clone());
873    lane_session.last_activity_at = Some(terminal.last_activity_at.clone());
874    if lane_session.external_task_id.is_none() {
875        lane_session.external_task_id = Some(external_task_id.to_string());
876    }
877    if terminal.context_id.is_some() {
878        lane_session.context_id = terminal.context_id.clone();
879    }
880
881    if task.trigger_session_id.as_deref() == Some(session_id) {
882        task.trigger_session_id = None;
883    }
884    task.last_sync_error = terminal.error;
885    task.updated_at = Utc::now();
886
887    state
888        .task_store
889        .save(&task)
890        .await
891        .map_err(|error| format!("Failed to save A2A task reconciliation: {}", error))
892}
893
894async fn wait_for_task_persistence(
895    state: &AppState,
896    task_id: &str,
897    session_id: &str,
898) -> Result<Task, String> {
899    for _ in 0..20 {
900        if let Some(task) = state
901            .task_store
902            .get(task_id)
903            .await
904            .map_err(|error| format!("Failed to load task {task_id}: {}", error))?
905        {
906            if task
907                .lane_sessions
908                .iter()
909                .any(|session| session.session_id == session_id)
910            {
911                return Ok(task);
912            }
913        }
914        tokio::time::sleep(Duration::from_millis(50)).await;
915    }
916
917    Err(format!(
918        "Task {task_id} did not persist lane session {session_id} before A2A reconciliation"
919    ))
920}
921
922async fn emit_kanban_workspace_event(state: &AppState, workspace_id: &str, task_id: &str) {
923    state
924        .event_bus
925        .emit(AgentEvent {
926            event_type: AgentEventType::WorkspaceUpdated,
927            agent_id: "kanban-a2a".to_string(),
928            workspace_id: workspace_id.to_string(),
929            data: serde_json::json!({
930                "scope": "kanban",
931                "entity": "task",
932                "action": "updated",
933                "resourceId": task_id,
934                "source": "system",
935            }),
936            timestamp: Utc::now(),
937        })
938        .await;
939}
940
941async fn load_task_board(state: &AppState, task: &Task) -> Result<Option<KanbanBoard>, String> {
942    if let Some(board_id) = task.board_id.as_deref() {
943        state
944            .kanban_store
945            .get(board_id)
946            .await
947            .map_err(|error| format!("Failed to load Kanban board for automation: {}", error))
948    } else {
949        Ok(None)
950    }
951}
952
953fn resolve_task_automation_step(
954    board: Option<&KanbanBoard>,
955    task: &Task,
956) -> Option<KanbanAutomationStep> {
957    board
958        .and_then(|value| {
959            value
960                .columns
961                .iter()
962                .find(|column| Some(column.id.as_str()) == task.column_id.as_deref())
963        })
964        .and_then(|column| column.automation.as_ref())
965        .filter(|automation| automation.enabled)
966        .and_then(|automation| automation.primary_step())
967}
968
969fn is_a2a_step(step: Option<&KanbanAutomationStep>) -> bool {
970    step.is_some_and(|value| {
971        matches!(value.transport, Some(KanbanTransport::A2a)) || value.agent_card_url.is_some()
972    })
973}
974
975fn resolve_a2a_auth_headers(
976    auth_config_id: Option<&str>,
977) -> Result<Option<HashMap<String, String>>, String> {
978    let Some(auth_config_id) = auth_config_id
979        .map(str::trim)
980        .filter(|value| !value.is_empty())
981    else {
982        return Ok(None);
983    };
984    let raw = std::env::var(A2A_AUTH_CONFIGS_ENV).unwrap_or_default();
985    if raw.trim().is_empty() {
986        return Err(format!(
987            "A2A auth config \"{}\" was not found in {}.",
988            auth_config_id, A2A_AUTH_CONFIGS_ENV
989        ));
990    }
991
992    let parsed: Value = serde_json::from_str(&raw)
993        .map_err(|error| format!("Invalid {} JSON: {}", A2A_AUTH_CONFIGS_ENV, error))?;
994    let config = parsed.get(auth_config_id).ok_or_else(|| {
995        format!(
996            "A2A auth config \"{}\" was not found in {}.",
997            auth_config_id, A2A_AUTH_CONFIGS_ENV
998        )
999    })?;
1000    let headers = config.get("headers").unwrap_or(config);
1001    let headers_obj = headers.as_object().ok_or_else(|| {
1002        format!(
1003            "{}.{} must be a header map or contain a string header map in \"headers\".",
1004            A2A_AUTH_CONFIGS_ENV, auth_config_id
1005        )
1006    })?;
1007
1008    let mut resolved = HashMap::new();
1009    for (name, value) in headers_obj {
1010        let value = value.as_str().ok_or_else(|| {
1011            format!(
1012                "{}.{} header {} must be a string.",
1013                A2A_AUTH_CONFIGS_ENV, auth_config_id, name
1014            )
1015        })?;
1016        resolved.insert(name.clone(), value.to_string());
1017    }
1018
1019    Ok(Some(resolved))
1020}
1021
1022fn apply_a2a_auth_headers(
1023    mut request: reqwest::RequestBuilder,
1024    auth_headers: Option<&HashMap<String, String>>,
1025) -> Result<reqwest::RequestBuilder, String> {
1026    if let Some(auth_headers) = auth_headers {
1027        for (name, value) in auth_headers {
1028            let header_name = HeaderName::try_from(name.as_str())
1029                .map_err(|error| format!("Invalid A2A auth header name {}: {}", name, error))?;
1030            let header_value = HeaderValue::from_str(value).map_err(|error| {
1031                format!(
1032                    "Invalid A2A auth header value for {}: {}",
1033                    header_name.as_str(),
1034                    error
1035                )
1036            })?;
1037            request = request.header(header_name, header_value);
1038        }
1039    }
1040
1041    Ok(request)
1042}
1043
1044async fn resolve_a2a_rpc_endpoint(
1045    client: &reqwest::Client,
1046    url: &str,
1047    auth_headers: Option<&HashMap<String, String>>,
1048) -> Result<String, String> {
1049    if url.ends_with(".json") || url.ends_with("/agent-card") || url.ends_with("/card") {
1050        let response = apply_a2a_auth_headers(
1051            client.get(url).header(ACCEPT, "application/json"),
1052            auth_headers,
1053        )?
1054        .send()
1055        .await
1056        .map_err(|error| format!("Failed to fetch A2A agent card: {}", error))?;
1057        if !response.status().is_success() {
1058            return Err(format!(
1059                "A2A agent card fetch failed with HTTP {}",
1060                response.status().as_u16()
1061            ));
1062        }
1063        let card: Value = response
1064            .json()
1065            .await
1066            .map_err(|error| format!("Failed to decode A2A agent card: {}", error))?;
1067        let rpc_url = card
1068            .get("url")
1069            .and_then(Value::as_str)
1070            .ok_or_else(|| "A2A agent card missing url".to_string())?;
1071        absolutize_url(url, rpc_url)
1072    } else {
1073        Ok(url.to_string())
1074    }
1075}
1076
1077fn absolutize_url(base_url: &str, maybe_relative: &str) -> Result<String, String> {
1078    if maybe_relative.starts_with("http://") || maybe_relative.starts_with("https://") {
1079        return Ok(maybe_relative.to_string());
1080    }
1081
1082    let base = reqwest::Url::parse(base_url)
1083        .map_err(|error| format!("Invalid base A2A URL {}: {}", base_url, error))?;
1084    base.join(maybe_relative)
1085        .map(|url| url.to_string())
1086        .map_err(|error| format!("Invalid relative A2A URL {}: {}", maybe_relative, error))
1087}