Skip to main content

routa_server/api/
tasks_automation.rs

1use chrono::Utc;
2use reqwest::header::{HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
3use routa_core::events::{AgentEvent, AgentEventType};
4use routa_core::models::kanban::{KanbanAutomationStep, KanbanBoard, KanbanTransport};
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use crate::error::ServerError;
10use crate::models::task::{Task, TaskLaneSession, TaskLaneSessionStatus};
11use crate::state::AppState;
12use routa_core::store::acp_session_store::CreateAcpSessionParams;
13
/// Pause between two consecutive polls of a remote A2A task.
const A2A_POLL_INTERVAL: Duration = Duration::from_millis(1_000);
/// Longest time a remote A2A task is polled before it is reported as not completed.
const A2A_MAX_WAIT: Duration = Duration::from_secs(5 * 60);
/// Environment variable name for A2A auth configurations.
// NOTE(review): not referenced in the visible part of this file; presumably read by the
// auth-header resolver — confirm.
const A2A_AUTH_CONFIGS_ENV: &str = "ROUTA_A2A_AUTH_CONFIGS";
17
18pub async fn resolve_codebase(
19    state: &AppState,
20    workspace_id: &str,
21    repo_path: Option<&str>,
22) -> Result<Option<crate::models::codebase::Codebase>, ServerError> {
23    if let Some(path) = repo_path {
24        state
25            .codebase_store
26            .find_by_repo_path(workspace_id, path)
27            .await
28    } else {
29        state.codebase_store.get_default(workspace_id).await
30    }
31}
32
33pub async fn auto_create_worktree(
34    state: &AppState,
35    task: &crate::models::task::Task,
36    codebase: &crate::models::codebase::Codebase,
37) -> Result<String, String> {
38    let short_id = &task.id[..task.id.len().min(8)];
39    let label = short_id.to_string();
40    let branch = format!("issue/{}", short_id);
41
42    let workspace = state
43        .workspace_store
44        .get(&task.workspace_id)
45        .await
46        .ok()
47        .flatten();
48    let worktree_root = workspace
49        .as_ref()
50        .and_then(|ws| ws.metadata.get("worktreeRoot"))
51        .filter(|s| !s.trim().is_empty())
52        .map(std::path::PathBuf::from)
53        .unwrap_or_else(|| crate::git::get_default_workspace_worktree_root(&task.workspace_id));
54
55    let codebase_label = codebase
56        .label
57        .as_ref()
58        .map(|l| crate::git::branch_to_safe_dir_name(l))
59        .unwrap_or_else(|| crate::git::branch_to_safe_dir_name(&codebase.id));
60
61    let worktree_path = worktree_root
62        .join(&codebase_label)
63        .join(crate::git::branch_to_safe_dir_name(&label));
64
65    if let Some(parent) = worktree_path.parent() {
66        std::fs::create_dir_all(parent)
67            .map_err(|e| format!("Failed to create worktree parent dir: {}", e))?;
68    }
69
70    let worktree_path_str = worktree_path.to_string_lossy().to_string();
71    let base_branch = codebase
72        .branch
73        .clone()
74        .unwrap_or_else(|| "main".to_string());
75
76    let worktree = crate::models::worktree::Worktree::new(
77        uuid::Uuid::new_v4().to_string(),
78        codebase.id.clone(),
79        task.workspace_id.clone(),
80        worktree_path_str.clone(),
81        branch.clone(),
82        base_branch.clone(),
83        Some(label),
84    );
85    state
86        .worktree_store
87        .save(&worktree)
88        .await
89        .map_err(|e| format!("Failed to save worktree: {}", e))?;
90
91    let _ = crate::git::worktree_prune(&codebase.repo_path);
92    crate::git::worktree_add(
93        &codebase.repo_path,
94        &worktree_path_str,
95        &branch,
96        &base_branch,
97        false,
98    )
99    .map_err(|e| format!("git worktree add failed: {}", e))?;
100
101    Ok(worktree.id)
102}
103
104pub async fn trigger_assigned_task_agent(
105    state: &AppState,
106    task: &mut Task,
107    cwd: Option<&str>,
108    branch: Option<&str>,
109) -> Result<(), String> {
110    let board = load_task_board(state, task).await?;
111    let step = resolve_task_automation_step(board.as_ref(), task);
112    if is_a2a_step(step.as_ref()) {
113        return trigger_assigned_task_a2a_agent(state, task, board.as_ref(), step.as_ref()).await;
114    }
115
116    trigger_assigned_task_acp_agent(state, task, board.as_ref(), step.as_ref(), cwd, branch).await
117}
118
119fn build_task_prompt(
120    task: &Task,
121    board_id: Option<&str>,
122    next_column_id: Option<&str>,
123    available_columns: &str,
124) -> String {
125    let labels = if task.labels.is_empty() {
126        "Labels: none".to_string()
127    } else {
128        format!("Labels: {}", task.labels.join(", "))
129    };
130    let lane_id = task.column_id.as_deref().unwrap_or("backlog");
131    let lane_guidance = match lane_id {
132        "dev" => vec![
133            "You are in the `dev` lane. This lane may implement the requested change, but you must keep work scoped to the current card.".to_string(),
134            "Use `routa-coordination_update_card` to record concrete progress on this card before or after meaningful implementation steps.".to_string(),
135            "When implementation for this lane is complete, use `routa-coordination_move_card` to advance the same card.".to_string(),
136        ],
137        "todo" => vec![
138            "You are in the `todo` lane. This lane does not perform full implementation work.".to_string(),
139            "Only clarify the card, update its progress or status, and move the same card forward when the lane is complete.".to_string(),
140            "Do not edit files, do not inspect the whole repository, and do not run browser tests or environment diagnostics in this lane.".to_string(),
141        ],
142        _ => vec![
143            format!("You are in the `{lane_id}` lane. Keep work scoped to the current card and this lane only."),
144        ],
145    };
146    let mut sections = vec![
147        format!("You are assigned to Kanban task: {}", task.title),
148        String::new(),
149        "## Context".to_string(),
150        String::new(),
151        "**IMPORTANT**: You are working in Kanban lane automation for exactly one existing card.".to_string(),
152        "Only operate on the current card. Do not create a new task, do not switch to a different card, and do not broaden scope.".to_string(),
153        "Use the exact MCP tool names exposed by the provider. In OpenCode, prefer `routa-coordination_update_card` and `routa-coordination_move_card`.".to_string(),
154        "Do NOT use `gh issue create`, browser automation, Playwright, repo-wide debugging, API exploration, or unrelated codebase research unless the card objective explicitly requires it.".to_string(),
155        String::new(),
156        "## Task Details".to_string(),
157        String::new(),
158        format!("**Card ID:** {}", task.id),
159        format!(
160            "**Priority:** {}",
161            task.priority.as_ref().map(|value| value.as_str()).unwrap_or("medium")
162        ),
163        board_id
164            .map(|value| format!("**Board ID:** {}", value))
165            .unwrap_or_else(|| "**Board ID:** unavailable".to_string()),
166        format!("**Current Lane:** {}", lane_id),
167        next_column_id
168            .map(|value| format!("**Next Column ID:** {}", value))
169            .unwrap_or_else(|| "**Next Column ID:** unavailable".to_string()),
170        labels,
171        task.github_url
172            .as_ref()
173            .map(|url| format!("**GitHub Issue:** {}", url))
174            .unwrap_or_else(|| "**GitHub Issue:** local-only".to_string()),
175        String::new(),
176        "## Objective".to_string(),
177        String::new(),
178        task.objective.clone(),
179        String::new(),
180        "## Board Columns".to_string(),
181        String::new(),
182        available_columns.to_string(),
183        String::new(),
184        "## Lane Guidance".to_string(),
185        String::new(),
186        lane_guidance.join("\n"),
187        String::new(),
188    ];
189
190    if let Some(test_cases) = task.test_cases.as_ref().filter(|value| !value.is_empty()) {
191        sections.push("## Test Cases".to_string());
192        sections.push(String::new());
193        sections.push(
194            test_cases
195                .iter()
196                .map(|value| format!("- {}", value))
197                .collect::<Vec<_>>()
198                .join("\n"),
199        );
200        sections.push(String::new());
201    }
202
203    sections.extend([
204        "## Available MCP Tools".to_string(),
205        String::new(),
206        "Use the exact MCP tool names exposed in this session. For OpenCode, the important ones are:".to_string(),
207        String::new(),
208        format!(
209            "- **routa-coordination_update_card**: Update this card's title, description, priority, or labels. Use cardId: \"{}\"",
210            task.id
211        ),
212        format!(
213            "- **routa-coordination_move_card**: Move this same card to targetColumnId \"{}\" when the current lane is complete.",
214            next_column_id.unwrap_or("the exact next column id listed above")
215        ),
216        String::new(),
217        "## Instructions".to_string(),
218        String::new(),
219        "1. Start work for the current lane immediately.".to_string(),
220        "2. Keep changes focused on this card only.".to_string(),
221        "3. Use the exact tool name `routa-coordination_update_card` to record progress on this card.".to_string(),
222        format!(
223            "4. Use the exact tool name `routa-coordination_move_card` with targetColumnId `{}` only when the current lane is complete.",
224            next_column_id.unwrap_or("the exact next column id listed above")
225        ),
226        "5. Do not guess board ids or column ids. Use the Board ID and Board Columns listed above.".to_string(),
227        "6. If blocked, update this same card with the blocking reason instead of exploring side quests.".to_string(),
228        "7. Treat lane guidance as stricter than the general card objective when they conflict.".to_string(),
229        "8. Do not run browser tests or environment diagnostics unless the card explicitly asks for them.".to_string(),
230    ]);
231
232    sections.join("\n")
233}
234
235async fn trigger_assigned_task_acp_agent(
236    state: &AppState,
237    task: &mut Task,
238    board: Option<&KanbanBoard>,
239    step: Option<&KanbanAutomationStep>,
240    cwd: Option<&str>,
241    branch: Option<&str>,
242) -> Result<(), String> {
243    let provider = task
244        .assigned_provider
245        .clone()
246        .unwrap_or_else(|| "opencode".to_string());
247    let role = task
248        .assigned_role
249        .clone()
250        .unwrap_or_else(|| "CRAFTER".to_string())
251        .to_uppercase();
252    let session_id = uuid::Uuid::new_v4().to_string();
253    let cwd = cwd
254        .map(|value| value.to_string())
255        .or_else(|| {
256            std::env::current_dir()
257                .ok()
258                .map(|path| path.to_string_lossy().to_string())
259        })
260        .unwrap_or_else(|| ".".to_string());
261
262    state
263        .acp_manager
264        .create_session(
265            session_id.clone(),
266            cwd.clone(),
267            task.workspace_id.clone(),
268            Some(provider.clone()),
269            Some(role.clone()),
270            None,
271            None,
272            Some("full".to_string()),
273            Some("kanban-planning".to_string()),
274        )
275        .await
276        .map_err(|error| format!("Failed to create ACP session: {}", error))?;
277
278    state
279        .acp_session_store
280        .create(CreateAcpSessionParams {
281            id: &session_id,
282            cwd: &cwd,
283            branch: None,
284            workspace_id: &task.workspace_id,
285            provider: Some(provider.as_str()),
286            role: Some(role.as_str()),
287            custom_command: None,
288            custom_args: None,
289            parent_session_id: None,
290        })
291        .await
292        .map_err(|error| format!("Failed to persist ACP session: {}", error))?;
293
294    let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
295    ordered_columns.sort_by_key(|column| column.position);
296    let next_column_id = ordered_columns
297        .iter()
298        .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
299        .and_then(|index| ordered_columns.get(index + 1))
300        .map(|column| column.id.clone());
301    let available_columns = if ordered_columns.is_empty() {
302        "- unavailable".to_string()
303    } else {
304        ordered_columns
305            .iter()
306            .map(|column| {
307                format!(
308                    "- {} ({}) stage={} position={}",
309                    column.id, column.name, column.stage, column.position
310                )
311            })
312            .collect::<Vec<_>>()
313            .join("\n")
314    };
315    let prompt = build_task_prompt(
316        task,
317        board
318            .map(|value| value.id.as_str())
319            .or(task.board_id.as_deref()),
320        next_column_id.as_deref(),
321        &available_columns,
322    );
323    let state_clone = state.clone();
324    let session_id_clone = session_id.clone();
325    let task_workspace = task.workspace_id.clone();
326    let provider_clone = provider.clone();
327    let cwd_clone = cwd.clone();
328    let _branch = branch.map(|value| value.to_string());
329
330    if let Err(error) = state
331        .acp_session_store
332        .set_first_prompt_sent(&session_id)
333        .await
334    {
335        tracing::error!(
336            target: "routa_kanban_prompt",
337            session_id = %session_id,
338            workspace_id = %task.workspace_id,
339            error = %error,
340            "kanban auto prompt failed to mark prompt dispatched"
341        );
342    } else {
343        tracing::info!(
344            target: "routa_kanban_prompt",
345            session_id = %session_id,
346            workspace_id = %task.workspace_id,
347            provider = %provider,
348            "kanban auto prompt marked prompt dispatched"
349        );
350    }
351
352    tracing::info!(
353        target: "routa_kanban_prompt",
354        session_id = %session_id_clone,
355        workspace_id = %task_workspace,
356        provider = %provider_clone,
357        cwd = %cwd_clone,
358        "kanban auto prompt scheduled"
359    );
360
361    tokio::spawn(async move {
362        tracing::info!(
363            target: "routa_kanban_prompt",
364            session_id = %session_id_clone,
365            workspace_id = %task_workspace,
366            provider = %provider_clone,
367            cwd = %cwd_clone,
368            "kanban auto prompt start"
369        );
370        if let Err(error) = state_clone
371            .acp_manager
372            .prompt(&session_id_clone, &prompt)
373            .await
374        {
375            tracing::error!(
376                "[kanban] Failed to auto-prompt ACP task session {} in workspace {} with provider {} at {}: {}",
377                session_id_clone,
378                task_workspace,
379                provider_clone,
380                cwd_clone,
381                error
382            );
383            return;
384        }
385
386        tracing::info!(
387            target: "routa_kanban_prompt",
388            session_id = %session_id_clone,
389            workspace_id = %task_workspace,
390            provider = %provider_clone,
391            "kanban auto prompt success"
392        );
393        if let Some(history) = state_clone
394            .acp_manager
395            .get_session_history(&session_id_clone)
396            .await
397        {
398            if let Err(error) = state_clone
399                .acp_session_store
400                .save_history(&session_id_clone, &history)
401                .await
402            {
403                tracing::error!(
404                    target: "routa_kanban_prompt",
405                    session_id = %session_id_clone,
406                    workspace_id = %task_workspace,
407                    error = %error,
408                    "kanban auto prompt failed to persist history"
409                );
410            } else {
411                tracing::info!(
412                    target: "routa_kanban_prompt",
413                    session_id = %session_id_clone,
414                    workspace_id = %task_workspace,
415                    history_len = history.len(),
416                    "kanban auto prompt persisted history"
417                );
418            }
419        }
420    });
421
422    apply_trigger_result(
423        task,
424        board,
425        step,
426        AgentTriggerResult {
427            session_id,
428            transport: "acp".to_string(),
429            external_task_id: None,
430            context_id: None,
431        },
432    );
433
434    Ok(())
435}
436
437async fn trigger_assigned_task_a2a_agent(
438    state: &AppState,
439    task: &mut Task,
440    board: Option<&KanbanBoard>,
441    step: Option<&KanbanAutomationStep>,
442) -> Result<(), String> {
443    let step = step.ok_or_else(|| "A2A automation requires a resolved column step".to_string())?;
444    let agent_card_url = step
445        .agent_card_url
446        .as_deref()
447        .ok_or_else(|| "A2A automation requires agentCardUrl".to_string())?;
448    let auth_headers = resolve_a2a_auth_headers(step.auth_config_id.as_deref())?;
449
450    let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
451    ordered_columns.sort_by_key(|column| column.position);
452    let next_column_id = ordered_columns
453        .iter()
454        .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
455        .and_then(|index| ordered_columns.get(index + 1))
456        .map(|column| column.id.clone());
457    let available_columns = if ordered_columns.is_empty() {
458        "- unavailable".to_string()
459    } else {
460        ordered_columns
461            .iter()
462            .map(|column| {
463                format!(
464                    "- {} ({}) stage={} position={}",
465                    column.id, column.name, column.stage, column.position
466                )
467            })
468            .collect::<Vec<_>>()
469            .join("\n")
470    };
471    let prompt = build_task_prompt(
472        task,
473        board
474            .map(|value| value.id.as_str())
475            .or(task.board_id.as_deref()),
476        next_column_id.as_deref(),
477        &available_columns,
478    );
479
480    let client = reqwest::Client::new();
481    let rpc_endpoint =
482        resolve_a2a_rpc_endpoint(&client, agent_card_url, auth_headers.as_ref()).await?;
483    let request_id = uuid::Uuid::new_v4().to_string();
484    let message_id = uuid::Uuid::new_v4().to_string();
485    let response = apply_a2a_auth_headers(
486        client
487            .post(&rpc_endpoint)
488            .header(CONTENT_TYPE, "application/json")
489            .header(ACCEPT, "application/json")
490            .json(&json!({
491                "jsonrpc": "2.0",
492                "id": request_id,
493                "method": "SendMessage",
494                "params": {
495                    "message": {
496                        "messageId": message_id,
497                        "role": "user",
498                        "parts": [
499                            { "text": prompt }
500                        ]
501                    },
502                    "metadata": {
503                        "workspaceId": task.workspace_id,
504                        "taskId": task.id,
505                        "boardId": task.board_id,
506                        "columnId": task.column_id,
507                        "stepId": step.id,
508                        "skillId": step.skill_id,
509                        "authConfigId": step.auth_config_id,
510                        "role": task.assigned_role,
511                    }
512                }
513            })),
514        auth_headers.as_ref(),
515    )?
516    .send()
517    .await
518    .map_err(|error| format!("Failed to send A2A request: {}", error))?;
519
520    if !response.status().is_success() {
521        return Err(format!(
522            "A2A request failed with HTTP {}",
523            response.status().as_u16()
524        ));
525    }
526
527    let payload: Value = response
528        .json()
529        .await
530        .map_err(|error| format!("Failed to decode A2A response: {}", error))?;
531    if let Some(error) = payload.get("error") {
532        let message = error
533            .get("message")
534            .and_then(Value::as_str)
535            .unwrap_or("unknown A2A error");
536        return Err(format!("A2A JSON-RPC error: {}", message));
537    }
538
539    let task_result = payload
540        .get("result")
541        .and_then(|value| value.get("task"))
542        .ok_or_else(|| "A2A response missing result.task".to_string())?;
543    let external_task_id = task_result
544        .get("id")
545        .and_then(Value::as_str)
546        .ok_or_else(|| "A2A response missing task.id".to_string())?
547        .to_string();
548    let context_id = task_result
549        .get("contextId")
550        .and_then(Value::as_str)
551        .map(ToOwned::to_owned);
552    let session_id = format!("a2a-{}", uuid::Uuid::new_v4());
553
554    apply_trigger_result(
555        task,
556        board,
557        Some(step),
558        AgentTriggerResult {
559            session_id: session_id.clone(),
560            transport: "a2a".to_string(),
561            external_task_id: Some(external_task_id.clone()),
562            context_id,
563        },
564    );
565
566    let state_clone = state.clone();
567    let task_id = task.id.clone();
568    let workspace_id = task.workspace_id.clone();
569    tokio::spawn(async move {
570        monitor_a2a_task_completion(
571            &state_clone,
572            &workspace_id,
573            &task_id,
574            &session_id,
575            &rpc_endpoint,
576            &external_task_id,
577            auth_headers,
578        )
579        .await;
580    });
581
582    Ok(())
583}
584
/// Outcome of starting an agent for a task; consumed by `apply_trigger_result`.
#[derive(Debug)]
struct AgentTriggerResult {
    /// Session id recorded on the task and on its lane session.
    session_id: String,
    /// Transport label; the triggers in this file use `"acp"` and `"a2a"`.
    transport: String,
    /// Task id returned by the remote A2A agent; the ACP trigger passes `None`.
    external_task_id: Option<String>,
    /// `contextId` from the A2A response when present; the ACP trigger passes `None`.
    context_id: Option<String>,
}
592
593fn apply_trigger_result(
594    task: &mut Task,
595    board: Option<&KanbanBoard>,
596    step: Option<&KanbanAutomationStep>,
597    result: AgentTriggerResult,
598) {
599    task.trigger_session_id = Some(result.session_id.clone());
600    if !task.session_ids.iter().any(|id| id == &result.session_id) {
601        task.session_ids.push(result.session_id.clone());
602    }
603
604    let column_name = board.and_then(|value| {
605        value.columns.iter().find_map(|column| {
606            (Some(column.id.as_str()) == task.column_id.as_deref()).then(|| column.name.clone())
607        })
608    });
609    let lane_session = TaskLaneSession {
610        session_id: result.session_id.clone(),
611        routa_agent_id: None,
612        column_id: task.column_id.clone(),
613        column_name,
614        step_id: step.map(|value| value.id.clone()),
615        step_index: None,
616        step_name: step
617            .and_then(|value| value.specialist_name.clone())
618            .or_else(|| task.assigned_specialist_name.clone()),
619        provider: task.assigned_provider.clone(),
620        role: task.assigned_role.clone(),
621        specialist_id: task.assigned_specialist_id.clone(),
622        specialist_name: task.assigned_specialist_name.clone(),
623        transport: Some(result.transport),
624        external_task_id: result.external_task_id,
625        context_id: result.context_id,
626        attempt: Some(1),
627        loop_mode: None,
628        completion_requirement: None,
629        objective: Some(task.objective.clone()),
630        last_activity_at: Some(Utc::now().to_rfc3339()),
631        recovered_from_session_id: None,
632        recovery_reason: None,
633        status: TaskLaneSessionStatus::Running,
634        started_at: Utc::now().to_rfc3339(),
635        completed_at: None,
636    };
637
638    if let Some(existing) = task
639        .lane_sessions
640        .iter_mut()
641        .find(|existing| existing.session_id == result.session_id)
642    {
643        *existing = lane_session;
644    } else {
645        task.lane_sessions.push(lane_session);
646    }
647}
648
/// Final state of a remote A2A task, applied to the lane session by
/// `reconcile_a2a_lane_session`.
#[derive(Debug)]
struct A2ATaskTerminalUpdate {
    /// Lane-session status to store.
    status: TaskLaneSessionStatus,
    /// Completion timestamp: the remote `status.timestamp` when provided, otherwise the
    /// local time in RFC 3339 form.
    completed_at: String,
    /// Last-activity timestamp; the producers in this file set it to the same value as
    /// `completed_at`.
    last_activity_at: String,
    /// Remote `contextId`, when the task payload carried one.
    context_id: Option<String>,
    /// Failure description; `None` when the remote task completed.
    error: Option<String>,
}
657
658async fn monitor_a2a_task_completion(
659    state: &AppState,
660    workspace_id: &str,
661    task_id: &str,
662    session_id: &str,
663    rpc_endpoint: &str,
664    external_task_id: &str,
665    auth_headers: Option<HashMap<String, String>>,
666) {
667    let client = reqwest::Client::new();
668    let terminal = match wait_for_a2a_completion(
669        &client,
670        rpc_endpoint,
671        external_task_id,
672        auth_headers.as_ref(),
673    )
674    .await
675    {
676        Ok(terminal) => terminal,
677        Err(error) => {
678            let now = Utc::now().to_rfc3339();
679            let status = if error.contains("did not complete within") {
680                TaskLaneSessionStatus::TimedOut
681            } else {
682                TaskLaneSessionStatus::Failed
683            };
684            A2ATaskTerminalUpdate {
685                status,
686                completed_at: now.clone(),
687                last_activity_at: now,
688                context_id: None,
689                error: Some(error),
690            }
691        }
692    };
693
694    if let Err(error) =
695        reconcile_a2a_lane_session(state, task_id, session_id, external_task_id, terminal).await
696    {
697        tracing::warn!(
698            target: "routa_a2a",
699            workspace_id = %workspace_id,
700            task_id = %task_id,
701            session_id = %session_id,
702            external_task_id = %external_task_id,
703            error = %error,
704            "failed to persist A2A terminal state"
705        );
706        return;
707    }
708
709    emit_kanban_workspace_event(state, workspace_id, task_id).await;
710}
711
712async fn wait_for_a2a_completion(
713    client: &reqwest::Client,
714    rpc_endpoint: &str,
715    task_id: &str,
716    auth_headers: Option<&HashMap<String, String>>,
717) -> Result<A2ATaskTerminalUpdate, String> {
718    let started_at = Instant::now();
719
720    loop {
721        let terminal = get_a2a_task_update(client, rpc_endpoint, task_id, auth_headers).await?;
722        if let Some(terminal) = terminal {
723            return Ok(terminal);
724        }
725        if started_at.elapsed() >= A2A_MAX_WAIT {
726            return Err(format!(
727                "A2A task {task_id} did not complete within {}ms",
728                A2A_MAX_WAIT.as_millis()
729            ));
730        }
731        tokio::time::sleep(A2A_POLL_INTERVAL).await;
732    }
733}
734
735async fn get_a2a_task_update(
736    client: &reqwest::Client,
737    rpc_endpoint: &str,
738    task_id: &str,
739    auth_headers: Option<&HashMap<String, String>>,
740) -> Result<Option<A2ATaskTerminalUpdate>, String> {
741    let request_id = uuid::Uuid::new_v4().to_string();
742    let response = apply_a2a_auth_headers(
743        client
744            .post(rpc_endpoint)
745            .header(CONTENT_TYPE, "application/json")
746            .header(ACCEPT, "application/json")
747            .json(&json!({
748                "jsonrpc": "2.0",
749                "id": request_id,
750                "method": "GetTask",
751                "params": { "id": task_id }
752            })),
753        auth_headers,
754    )?
755    .send()
756    .await
757    .map_err(|error| format!("Failed to poll A2A task: {}", error))?;
758
759    if !response.status().is_success() {
760        return Err(format!(
761            "A2A GetTask failed with HTTP {}",
762            response.status().as_u16()
763        ));
764    }
765
766    let payload: Value = response
767        .json()
768        .await
769        .map_err(|error| format!("Failed to decode A2A task payload: {}", error))?;
770    if let Some(error) = payload.get("error") {
771        let message = error
772            .get("message")
773            .and_then(Value::as_str)
774            .unwrap_or("unknown A2A error");
775        return Err(format!("A2A JSON-RPC error: {}", message));
776    }
777
778    let task = payload
779        .get("result")
780        .and_then(|value| value.get("task"))
781        .ok_or_else(|| "A2A response missing result.task".to_string())?;
782    let state = task
783        .get("status")
784        .and_then(|value| value.get("state"))
785        .and_then(Value::as_str)
786        .ok_or_else(|| "A2A task missing status.state".to_string())?;
787    if !is_terminal_a2a_state(state) {
788        return Ok(None);
789    }
790
791    let timestamp = task
792        .get("status")
793        .and_then(|value| value.get("timestamp"))
794        .and_then(Value::as_str)
795        .map(ToOwned::to_owned)
796        .unwrap_or_else(|| Utc::now().to_rfc3339());
797    let context_id = task
798        .get("contextId")
799        .and_then(Value::as_str)
800        .map(ToOwned::to_owned);
801    let error = if state == "completed" {
802        None
803    } else {
804        Some(
805            extract_a2a_status_message(task)
806                .unwrap_or_else(|| format!("A2A task ended in state: {state}")),
807        )
808    };
809
810    Ok(Some(A2ATaskTerminalUpdate {
811        status: map_a2a_terminal_status(state),
812        completed_at: timestamp.clone(),
813        last_activity_at: timestamp,
814        context_id,
815        error,
816    }))
817}
818
819fn extract_a2a_status_message(task: &Value) -> Option<String> {
820    let parts = task
821        .get("status")
822        .and_then(|value| value.get("message"))
823        .and_then(|value| value.get("parts"))
824        .and_then(Value::as_array)?;
825    let text = parts
826        .iter()
827        .filter_map(|part| part.get("text").and_then(Value::as_str))
828        .map(str::trim)
829        .filter(|part| !part.is_empty())
830        .collect::<Vec<_>>()
831        .join(" ");
832    (!text.is_empty()).then_some(text)
833}
834
/// Reports whether an A2A `status.state` value ends polling.
///
/// The states treated as terminal are `completed`, `failed`, `canceled`, `rejected`
/// and `auth-required`; the comparison is exact and case-sensitive.
fn is_terminal_a2a_state(state: &str) -> bool {
    const TERMINAL_STATES: [&str; 5] = [
        "completed",
        "failed",
        "canceled",
        "rejected",
        "auth-required",
    ];
    TERMINAL_STATES.contains(&state)
}
841
842fn map_a2a_terminal_status(state: &str) -> TaskLaneSessionStatus {
843    match state {
844        "completed" => TaskLaneSessionStatus::Completed,
845        _ => TaskLaneSessionStatus::Failed,
846    }
847}
848
849async fn reconcile_a2a_lane_session(
850    state: &AppState,
851    task_id: &str,
852    session_id: &str,
853    external_task_id: &str,
854    terminal: A2ATaskTerminalUpdate,
855) -> Result<(), String> {
856    let mut task = wait_for_task_persistence(state, task_id, session_id).await?;
857    let lane_session = task
858        .lane_sessions
859        .iter_mut()
860        .find(|session| session.session_id == session_id)
861        .ok_or_else(|| format!("Task {task_id} missing lane session {session_id}"))?;
862
863    lane_session.status = terminal.status;
864    lane_session.completed_at = Some(terminal.completed_at.clone());
865    lane_session.last_activity_at = Some(terminal.last_activity_at.clone());
866    if lane_session.external_task_id.is_none() {
867        lane_session.external_task_id = Some(external_task_id.to_string());
868    }
869    if terminal.context_id.is_some() {
870        lane_session.context_id = terminal.context_id.clone();
871    }
872
873    if task.trigger_session_id.as_deref() == Some(session_id) {
874        task.trigger_session_id = None;
875    }
876    task.last_sync_error = terminal.error;
877    task.updated_at = Utc::now();
878
879    state
880        .task_store
881        .save(&task)
882        .await
883        .map_err(|error| format!("Failed to save A2A task reconciliation: {}", error))
884}
885
886async fn wait_for_task_persistence(
887    state: &AppState,
888    task_id: &str,
889    session_id: &str,
890) -> Result<Task, String> {
891    for _ in 0..20 {
892        if let Some(task) = state
893            .task_store
894            .get(task_id)
895            .await
896            .map_err(|error| format!("Failed to load task {task_id}: {}", error))?
897        {
898            if task
899                .lane_sessions
900                .iter()
901                .any(|session| session.session_id == session_id)
902            {
903                return Ok(task);
904            }
905        }
906        tokio::time::sleep(Duration::from_millis(50)).await;
907    }
908
909    Err(format!(
910        "Task {task_id} did not persist lane session {session_id} before A2A reconciliation"
911    ))
912}
913
914async fn emit_kanban_workspace_event(state: &AppState, workspace_id: &str, task_id: &str) {
915    state
916        .event_bus
917        .emit(AgentEvent {
918            event_type: AgentEventType::WorkspaceUpdated,
919            agent_id: "kanban-a2a".to_string(),
920            workspace_id: workspace_id.to_string(),
921            data: serde_json::json!({
922                "scope": "kanban",
923                "entity": "task",
924                "action": "updated",
925                "resourceId": task_id,
926                "source": "system",
927            }),
928            timestamp: Utc::now(),
929        })
930        .await;
931}
932
933async fn load_task_board(state: &AppState, task: &Task) -> Result<Option<KanbanBoard>, String> {
934    if let Some(board_id) = task.board_id.as_deref() {
935        state
936            .kanban_store
937            .get(board_id)
938            .await
939            .map_err(|error| format!("Failed to load Kanban board for automation: {}", error))
940    } else {
941        Ok(None)
942    }
943}
944
945fn resolve_task_automation_step(
946    board: Option<&KanbanBoard>,
947    task: &Task,
948) -> Option<KanbanAutomationStep> {
949    board
950        .and_then(|value| {
951            value
952                .columns
953                .iter()
954                .find(|column| Some(column.id.as_str()) == task.column_id.as_deref())
955        })
956        .and_then(|column| column.automation.as_ref())
957        .filter(|automation| automation.enabled)
958        .and_then(|automation| automation.primary_step())
959}
960
961fn is_a2a_step(step: Option<&KanbanAutomationStep>) -> bool {
962    step.is_some_and(|value| {
963        matches!(value.transport, Some(KanbanTransport::A2a)) || value.agent_card_url.is_some()
964    })
965}
966
967fn resolve_a2a_auth_headers(
968    auth_config_id: Option<&str>,
969) -> Result<Option<HashMap<String, String>>, String> {
970    let Some(auth_config_id) = auth_config_id
971        .map(str::trim)
972        .filter(|value| !value.is_empty())
973    else {
974        return Ok(None);
975    };
976    let raw = std::env::var(A2A_AUTH_CONFIGS_ENV).unwrap_or_default();
977    if raw.trim().is_empty() {
978        return Err(format!(
979            "A2A auth config \"{}\" was not found in {}.",
980            auth_config_id, A2A_AUTH_CONFIGS_ENV
981        ));
982    }
983
984    let parsed: Value = serde_json::from_str(&raw)
985        .map_err(|error| format!("Invalid {} JSON: {}", A2A_AUTH_CONFIGS_ENV, error))?;
986    let config = parsed.get(auth_config_id).ok_or_else(|| {
987        format!(
988            "A2A auth config \"{}\" was not found in {}.",
989            auth_config_id, A2A_AUTH_CONFIGS_ENV
990        )
991    })?;
992    let headers = config.get("headers").unwrap_or(config);
993    let headers_obj = headers.as_object().ok_or_else(|| {
994        format!(
995            "{}.{} must be a header map or contain a string header map in \"headers\".",
996            A2A_AUTH_CONFIGS_ENV, auth_config_id
997        )
998    })?;
999
1000    let mut resolved = HashMap::new();
1001    for (name, value) in headers_obj {
1002        let value = value.as_str().ok_or_else(|| {
1003            format!(
1004                "{}.{} header {} must be a string.",
1005                A2A_AUTH_CONFIGS_ENV, auth_config_id, name
1006            )
1007        })?;
1008        resolved.insert(name.clone(), value.to_string());
1009    }
1010
1011    Ok(Some(resolved))
1012}
1013
1014fn apply_a2a_auth_headers(
1015    mut request: reqwest::RequestBuilder,
1016    auth_headers: Option<&HashMap<String, String>>,
1017) -> Result<reqwest::RequestBuilder, String> {
1018    if let Some(auth_headers) = auth_headers {
1019        for (name, value) in auth_headers {
1020            let header_name = HeaderName::try_from(name.as_str())
1021                .map_err(|error| format!("Invalid A2A auth header name {}: {}", name, error))?;
1022            let header_value = HeaderValue::from_str(value).map_err(|error| {
1023                format!(
1024                    "Invalid A2A auth header value for {}: {}",
1025                    header_name.as_str(),
1026                    error
1027                )
1028            })?;
1029            request = request.header(header_name, header_value);
1030        }
1031    }
1032
1033    Ok(request)
1034}
1035
1036async fn resolve_a2a_rpc_endpoint(
1037    client: &reqwest::Client,
1038    url: &str,
1039    auth_headers: Option<&HashMap<String, String>>,
1040) -> Result<String, String> {
1041    if url.ends_with(".json") || url.ends_with("/agent-card") || url.ends_with("/card") {
1042        let response = apply_a2a_auth_headers(
1043            client.get(url).header(ACCEPT, "application/json"),
1044            auth_headers,
1045        )?
1046        .send()
1047        .await
1048        .map_err(|error| format!("Failed to fetch A2A agent card: {}", error))?;
1049        if !response.status().is_success() {
1050            return Err(format!(
1051                "A2A agent card fetch failed with HTTP {}",
1052                response.status().as_u16()
1053            ));
1054        }
1055        let card: Value = response
1056            .json()
1057            .await
1058            .map_err(|error| format!("Failed to decode A2A agent card: {}", error))?;
1059        let rpc_url = card
1060            .get("url")
1061            .and_then(Value::as_str)
1062            .ok_or_else(|| "A2A agent card missing url".to_string())?;
1063        absolutize_url(url, rpc_url)
1064    } else {
1065        Ok(url.to_string())
1066    }
1067}
1068
1069fn absolutize_url(base_url: &str, maybe_relative: &str) -> Result<String, String> {
1070    if maybe_relative.starts_with("http://") || maybe_relative.starts_with("https://") {
1071        return Ok(maybe_relative.to_string());
1072    }
1073
1074    let base = reqwest::Url::parse(base_url)
1075        .map_err(|error| format!("Invalid base A2A URL {}: {}", base_url, error))?;
1076    base.join(maybe_relative)
1077        .map(|url| url.to_string())
1078        .map_err(|error| format!("Invalid relative A2A URL {}: {}", maybe_relative, error))
1079}