Skip to main content

routa_server/api/tasks_automation.rs

1use chrono::Utc;
2use reqwest::header::{HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
3use routa_core::events::{AgentEvent, AgentEventType};
4use routa_core::models::kanban::{KanbanAutomationStep, KanbanBoard, KanbanTransport};
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use crate::error::ServerError;
10use crate::models::task::{Task, TaskLaneSession, TaskLaneSessionStatus};
11use crate::state::AppState;
12use routa_core::store::acp_session_store::CreateAcpSessionParams;
13
/// Pause between two consecutive status polls of a remote A2A task.
const A2A_POLL_INTERVAL: Duration = Duration::from_millis(1_000);
/// Total time a remote A2A task may stay non-terminal before polling gives up (five minutes).
const A2A_MAX_WAIT: Duration = Duration::from_secs(5 * 60);
/// Environment variable name for A2A auth configurations.
// NOTE(review): presumably read by `resolve_a2a_auth_headers`; its use is not visible in this part of the file.
const A2A_AUTH_CONFIGS_ENV: &str = "ROUTA_A2A_AUTH_CONFIGS";
17
18pub async fn resolve_codebase(
19    state: &AppState,
20    workspace_id: &str,
21    repo_path: Option<&str>,
22) -> Result<Option<crate::models::codebase::Codebase>, ServerError> {
23    if let Some(path) = repo_path {
24        state
25            .codebase_store
26            .find_by_repo_path(workspace_id, path)
27            .await
28    } else {
29        state.codebase_store.get_default(workspace_id).await
30    }
31}
32
33pub async fn auto_create_worktree(
34    state: &AppState,
35    task: &crate::models::task::Task,
36    codebase: &crate::models::codebase::Codebase,
37) -> Result<String, String> {
38    let short_id = &task.id[..task.id.len().min(8)];
39    let label = short_id.to_string();
40    let branch = format!("issue/{short_id}");
41
42    let workspace = state
43        .workspace_store
44        .get(&task.workspace_id)
45        .await
46        .ok()
47        .flatten();
48    let worktree_root = workspace
49        .as_ref()
50        .and_then(|ws| ws.metadata.get("worktreeRoot"))
51        .filter(|s| !s.trim().is_empty())
52        .map(std::path::PathBuf::from)
53        .unwrap_or_else(|| crate::git::get_default_workspace_worktree_root(&task.workspace_id));
54
55    let codebase_label = codebase
56        .label
57        .as_ref()
58        .map(|l| crate::git::branch_to_safe_dir_name(l))
59        .unwrap_or_else(|| crate::git::branch_to_safe_dir_name(&codebase.id));
60
61    let worktree_path = worktree_root
62        .join(&codebase_label)
63        .join(crate::git::branch_to_safe_dir_name(&label));
64
65    if let Some(parent) = worktree_path.parent() {
66        std::fs::create_dir_all(parent)
67            .map_err(|e| format!("Failed to create worktree parent dir: {e}"))?;
68    }
69
70    let worktree_path_str = worktree_path.to_string_lossy().to_string();
71    let base_branch = codebase
72        .branch
73        .clone()
74        .unwrap_or_else(|| "main".to_string());
75
76    let worktree = crate::models::worktree::Worktree::new(
77        uuid::Uuid::new_v4().to_string(),
78        codebase.id.clone(),
79        task.workspace_id.clone(),
80        worktree_path_str.clone(),
81        branch.clone(),
82        base_branch.clone(),
83        Some(label),
84    );
85    state
86        .worktree_store
87        .save(&worktree)
88        .await
89        .map_err(|e| format!("Failed to save worktree: {e}"))?;
90
91    let _ = crate::git::worktree_prune(&codebase.repo_path);
92    crate::git::worktree_add(
93        &codebase.repo_path,
94        &worktree_path_str,
95        &branch,
96        &base_branch,
97        false,
98    )
99    .map_err(|e| format!("git worktree add failed: {e}"))?;
100
101    Ok(worktree.id)
102}
103
104pub async fn trigger_assigned_task_agent(
105    state: &AppState,
106    task: &mut Task,
107    cwd: Option<&str>,
108    branch: Option<&str>,
109) -> Result<(), String> {
110    let board = load_task_board(state, task).await?;
111    let step = resolve_task_automation_step(board.as_ref(), task);
112    if is_a2a_step(step.as_ref()) {
113        return trigger_assigned_task_a2a_agent(state, task, board.as_ref(), step.as_ref()).await;
114    }
115
116    trigger_assigned_task_acp_agent(state, task, board.as_ref(), step.as_ref(), cwd, branch).await
117}
118
119fn build_task_prompt(
120    task: &Task,
121    board_id: Option<&str>,
122    next_column_id: Option<&str>,
123    available_columns: &str,
124) -> String {
125    let labels = if task.labels.is_empty() {
126        "Labels: none".to_string()
127    } else {
128        format!("Labels: {}", task.labels.join(", "))
129    };
130    let lane_id = task.column_id.as_deref().unwrap_or("backlog");
131    let lane_guidance = match lane_id {
132        "dev" => vec![
133            "You are in the `dev` lane. This lane may implement the requested change, but you must keep work scoped to the current card.".to_string(),
134            "Use `routa-coordination_update_card` to record concrete progress on this card before or after meaningful implementation steps.".to_string(),
135            "When implementation for this lane is complete, use `routa-coordination_move_card` to advance the same card.".to_string(),
136        ],
137        "todo" => vec![
138            "You are in the `todo` lane. This lane does not perform full implementation work.".to_string(),
139            "Only clarify the card, update its progress or status, and move the same card forward when the lane is complete.".to_string(),
140            "Do not edit files, do not inspect the whole repository, and do not run browser tests or environment diagnostics in this lane.".to_string(),
141        ],
142        _ => vec![
143            format!("You are in the `{lane_id}` lane. Keep work scoped to the current card and this lane only."),
144        ],
145    };
146    let mut sections = vec![
147        format!("You are assigned to Kanban task: {}", task.title),
148        String::new(),
149        "## Context".to_string(),
150        String::new(),
151        "**IMPORTANT**: You are working in Kanban lane automation for exactly one existing card.".to_string(),
152        "Only operate on the current card. Do not create a new task, do not switch to a different card, and do not broaden scope.".to_string(),
153        "Use the exact MCP tool names exposed by the provider. In OpenCode, prefer `routa-coordination_update_card` and `routa-coordination_move_card`.".to_string(),
154        "When a move is blocked by missing story definition fields, use `routa-coordination_update_task` to update structured fields such as scope, acceptance criteria, verification commands, or test cases.".to_string(),
155        "Do NOT use `gh issue create`, browser automation, Playwright, repo-wide debugging, API exploration, or unrelated codebase research unless the card objective explicitly requires it.".to_string(),
156        String::new(),
157        "## Task Details".to_string(),
158        String::new(),
159        format!("**Card ID:** {}", task.id),
160        format!(
161            "**Priority:** {}",
162            task.priority.as_ref().map(|value| value.as_str()).unwrap_or("medium")
163        ),
164        board_id
165            .map(|value| format!("**Board ID:** {value}"))
166            .unwrap_or_else(|| "**Board ID:** unavailable".to_string()),
167        format!("**Current Lane:** {}", lane_id),
168        next_column_id
169            .map(|value| format!("**Next Column ID:** {value}"))
170            .unwrap_or_else(|| "**Next Column ID:** unavailable".to_string()),
171        labels,
172        task.github_url
173            .as_ref()
174            .map(|url| format!("**GitHub Issue:** {url}"))
175            .unwrap_or_else(|| "**GitHub Issue:** local-only".to_string()),
176        String::new(),
177        "## Objective".to_string(),
178        String::new(),
179        task.objective.clone(),
180        String::new(),
181        "## Board Columns".to_string(),
182        String::new(),
183        available_columns.to_string(),
184        String::new(),
185        "## Lane Guidance".to_string(),
186        String::new(),
187        lane_guidance.join("\n"),
188        String::new(),
189    ];
190
191    if let Some(test_cases) = task.test_cases.as_ref().filter(|value| !value.is_empty()) {
192        sections.push("## Test Cases".to_string());
193        sections.push(String::new());
194        sections.push(
195            test_cases
196                .iter()
197                .map(|value| format!("- {value}"))
198                .collect::<Vec<_>>()
199                .join("\n"),
200        );
201        sections.push(String::new());
202    }
203
204    sections.extend([
205        "## Available MCP Tools".to_string(),
206        String::new(),
207        "Use the exact MCP tool names exposed in this session. For OpenCode, the important ones are:".to_string(),
208        String::new(),
209        format!(
210            "- **routa-coordination_update_task**: Update structured task fields such as scope, acceptanceCriteria, verificationCommands, and testCases. Use taskId: \"{}\" when story readiness is missing.",
211            task.id
212        ),
213        format!(
214            "- **routa-coordination_update_card**: Update this card's title, description, priority, or labels. Use cardId: \"{}\"",
215            task.id
216        ),
217        "- **routa-coordination_update_card is not a story-readiness tool**: card description or comment text does not satisfy move gates for scope, acceptance criteria, verification commands, or test cases.".to_string(),
218        format!(
219            "- **routa-coordination_move_card**: Move this same card to targetColumnId \"{}\" when the current lane is complete.",
220            next_column_id.unwrap_or("the exact next column id listed above")
221        ),
222        String::new(),
223        "## Instructions".to_string(),
224        String::new(),
225        "1. Start work for the current lane immediately.".to_string(),
226        "2. Keep changes focused on this card only.".to_string(),
227        "3. Use `routa-coordination_update_task` to fix missing structured story fields, and `routa-coordination_update_card` only for card text or progress notes.".to_string(),
228        format!(
229            "4. Use the exact tool name `routa-coordination_move_card` with targetColumnId `{}` only when the current lane is complete.",
230            next_column_id.unwrap_or("the exact next column id listed above")
231        ),
232        "5. Do not guess board ids or column ids. Use the Board ID and Board Columns listed above.".to_string(),
233        "6. If blocked, update this same card with the blocking reason instead of exploring side quests.".to_string(),
234        "7. Treat lane guidance as stricter than the general card objective when they conflict.".to_string(),
235        "8. Do not run browser tests or environment diagnostics unless the card explicitly asks for them.".to_string(),
236    ]);
237
238    sections.join("\n")
239}
240
241async fn trigger_assigned_task_acp_agent(
242    state: &AppState,
243    task: &mut Task,
244    board: Option<&KanbanBoard>,
245    step: Option<&KanbanAutomationStep>,
246    cwd: Option<&str>,
247    branch: Option<&str>,
248) -> Result<(), String> {
249    let provider = task
250        .assigned_provider
251        .clone()
252        .unwrap_or_else(|| "opencode".to_string());
253    let role = task
254        .assigned_role
255        .clone()
256        .unwrap_or_else(|| "CRAFTER".to_string())
257        .to_uppercase();
258    let session_id = uuid::Uuid::new_v4().to_string();
259    let cwd = cwd
260        .map(|value| value.to_string())
261        .or_else(|| {
262            std::env::current_dir()
263                .ok()
264                .map(|path| path.to_string_lossy().to_string())
265        })
266        .unwrap_or_else(|| ".".to_string());
267
268    state
269        .acp_manager
270        .create_session(
271            session_id.clone(),
272            cwd.clone(),
273            task.workspace_id.clone(),
274            Some(provider.clone()),
275            Some(role.clone()),
276            None,
277            None,
278            Some("full".to_string()),
279            Some("kanban-planning".to_string()),
280        )
281        .await
282        .map_err(|error| format!("Failed to create ACP session: {error}"))?;
283
284    state
285        .acp_session_store
286        .create(CreateAcpSessionParams {
287            id: &session_id,
288            cwd: &cwd,
289            branch: None,
290            workspace_id: &task.workspace_id,
291            provider: Some(provider.as_str()),
292            role: Some(role.as_str()),
293            custom_command: None,
294            custom_args: None,
295            parent_session_id: None,
296        })
297        .await
298        .map_err(|error| format!("Failed to persist ACP session: {error}"))?;
299
300    let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
301    ordered_columns.sort_by_key(|column| column.position);
302    let next_column_id = ordered_columns
303        .iter()
304        .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
305        .and_then(|index| ordered_columns.get(index + 1))
306        .map(|column| column.id.clone());
307    let available_columns = if ordered_columns.is_empty() {
308        "- unavailable".to_string()
309    } else {
310        ordered_columns
311            .iter()
312            .map(|column| {
313                format!(
314                    "- {} ({}) stage={} position={}",
315                    column.id, column.name, column.stage, column.position
316                )
317            })
318            .collect::<Vec<_>>()
319            .join("\n")
320    };
321    let prompt = build_task_prompt(
322        task,
323        board
324            .map(|value| value.id.as_str())
325            .or(task.board_id.as_deref()),
326        next_column_id.as_deref(),
327        &available_columns,
328    );
329    let state_clone = state.clone();
330    let session_id_clone = session_id.clone();
331    let task_workspace = task.workspace_id.clone();
332    let provider_clone = provider.clone();
333    let cwd_clone = cwd.clone();
334    let _branch = branch.map(|value| value.to_string());
335
336    if let Err(error) = state
337        .acp_session_store
338        .set_first_prompt_sent(&session_id)
339        .await
340    {
341        tracing::error!(
342            target: "routa_kanban_prompt",
343            session_id = %session_id,
344            workspace_id = %task.workspace_id,
345            error = %error,
346            "kanban auto prompt failed to mark prompt dispatched"
347        );
348    } else {
349        tracing::info!(
350            target: "routa_kanban_prompt",
351            session_id = %session_id,
352            workspace_id = %task.workspace_id,
353            provider = %provider,
354            "kanban auto prompt marked prompt dispatched"
355        );
356    }
357
358    tracing::info!(
359        target: "routa_kanban_prompt",
360        session_id = %session_id_clone,
361        workspace_id = %task_workspace,
362        provider = %provider_clone,
363        cwd = %cwd_clone,
364        "kanban auto prompt scheduled"
365    );
366
367    tokio::spawn(async move {
368        tracing::info!(
369            target: "routa_kanban_prompt",
370            session_id = %session_id_clone,
371            workspace_id = %task_workspace,
372            provider = %provider_clone,
373            cwd = %cwd_clone,
374            "kanban auto prompt start"
375        );
376        if let Err(error) = state_clone
377            .acp_manager
378            .prompt(&session_id_clone, &prompt)
379            .await
380        {
381            tracing::error!(
382                "[kanban] Failed to auto-prompt ACP task session {} in workspace {} with provider {} at {}: {}",
383                session_id_clone,
384                task_workspace,
385                provider_clone,
386                cwd_clone,
387                error
388            );
389            return;
390        }
391
392        tracing::info!(
393            target: "routa_kanban_prompt",
394            session_id = %session_id_clone,
395            workspace_id = %task_workspace,
396            provider = %provider_clone,
397            "kanban auto prompt success"
398        );
399        if let Some(history) = state_clone
400            .acp_manager
401            .get_session_history(&session_id_clone)
402            .await
403        {
404            if let Err(error) = state_clone
405                .acp_session_store
406                .save_history(&session_id_clone, &history)
407                .await
408            {
409                tracing::error!(
410                    target: "routa_kanban_prompt",
411                    session_id = %session_id_clone,
412                    workspace_id = %task_workspace,
413                    error = %error,
414                    "kanban auto prompt failed to persist history"
415                );
416            } else {
417                tracing::info!(
418                    target: "routa_kanban_prompt",
419                    session_id = %session_id_clone,
420                    workspace_id = %task_workspace,
421                    history_len = history.len(),
422                    "kanban auto prompt persisted history"
423                );
424            }
425        }
426    });
427
428    apply_trigger_result(
429        task,
430        board,
431        step,
432        AgentTriggerResult {
433            session_id,
434            transport: "acp".to_string(),
435            external_task_id: None,
436            context_id: None,
437        },
438    );
439
440    Ok(())
441}
442
443async fn trigger_assigned_task_a2a_agent(
444    state: &AppState,
445    task: &mut Task,
446    board: Option<&KanbanBoard>,
447    step: Option<&KanbanAutomationStep>,
448) -> Result<(), String> {
449    let step = step.ok_or_else(|| "A2A automation requires a resolved column step".to_string())?;
450    let agent_card_url = step
451        .agent_card_url
452        .as_deref()
453        .ok_or_else(|| "A2A automation requires agentCardUrl".to_string())?;
454    let auth_headers = resolve_a2a_auth_headers(step.auth_config_id.as_deref())?;
455
456    let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
457    ordered_columns.sort_by_key(|column| column.position);
458    let next_column_id = ordered_columns
459        .iter()
460        .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
461        .and_then(|index| ordered_columns.get(index + 1))
462        .map(|column| column.id.clone());
463    let available_columns = if ordered_columns.is_empty() {
464        "- unavailable".to_string()
465    } else {
466        ordered_columns
467            .iter()
468            .map(|column| {
469                format!(
470                    "- {} ({}) stage={} position={}",
471                    column.id, column.name, column.stage, column.position
472                )
473            })
474            .collect::<Vec<_>>()
475            .join("\n")
476    };
477    let prompt = build_task_prompt(
478        task,
479        board
480            .map(|value| value.id.as_str())
481            .or(task.board_id.as_deref()),
482        next_column_id.as_deref(),
483        &available_columns,
484    );
485
486    let client = reqwest::Client::new();
487    let rpc_endpoint =
488        resolve_a2a_rpc_endpoint(&client, agent_card_url, auth_headers.as_ref()).await?;
489    let request_id = uuid::Uuid::new_v4().to_string();
490    let message_id = uuid::Uuid::new_v4().to_string();
491    let response = apply_a2a_auth_headers(
492        client
493            .post(&rpc_endpoint)
494            .header(CONTENT_TYPE, "application/json")
495            .header(ACCEPT, "application/json")
496            .json(&json!({
497                "jsonrpc": "2.0",
498                "id": request_id,
499                "method": "SendMessage",
500                "params": {
501                    "message": {
502                        "messageId": message_id,
503                        "role": "user",
504                        "parts": [
505                            { "text": prompt }
506                        ]
507                    },
508                    "metadata": {
509                        "workspaceId": task.workspace_id,
510                        "taskId": task.id,
511                        "boardId": task.board_id,
512                        "columnId": task.column_id,
513                        "stepId": step.id,
514                        "skillId": step.skill_id,
515                        "authConfigId": step.auth_config_id,
516                        "role": task.assigned_role,
517                    }
518                }
519            })),
520        auth_headers.as_ref(),
521    )?
522    .send()
523    .await
524    .map_err(|error| format!("Failed to send A2A request: {error}"))?;
525
526    if !response.status().is_success() {
527        return Err(format!(
528            "A2A request failed with HTTP {}",
529            response.status().as_u16()
530        ));
531    }
532
533    let payload: Value = response
534        .json()
535        .await
536        .map_err(|error| format!("Failed to decode A2A response: {error}"))?;
537    if let Some(error) = payload.get("error") {
538        let message = error
539            .get("message")
540            .and_then(Value::as_str)
541            .unwrap_or("unknown A2A error");
542        return Err(format!("A2A JSON-RPC error: {message}"));
543    }
544
545    let task_result = payload
546        .get("result")
547        .and_then(|value| value.get("task"))
548        .ok_or_else(|| "A2A response missing result.task".to_string())?;
549    let external_task_id = task_result
550        .get("id")
551        .and_then(Value::as_str)
552        .ok_or_else(|| "A2A response missing task.id".to_string())?
553        .to_string();
554    let context_id = task_result
555        .get("contextId")
556        .and_then(Value::as_str)
557        .map(ToOwned::to_owned);
558    let session_id = format!("a2a-{}", uuid::Uuid::new_v4());
559
560    apply_trigger_result(
561        task,
562        board,
563        Some(step),
564        AgentTriggerResult {
565            session_id: session_id.clone(),
566            transport: "a2a".to_string(),
567            external_task_id: Some(external_task_id.clone()),
568            context_id,
569        },
570    );
571
572    let state_clone = state.clone();
573    let task_id = task.id.clone();
574    let workspace_id = task.workspace_id.clone();
575    tokio::spawn(async move {
576        monitor_a2a_task_completion(
577            &state_clone,
578            &workspace_id,
579            &task_id,
580            &session_id,
581            &rpc_endpoint,
582            &external_task_id,
583            auth_headers,
584        )
585        .await;
586    });
587
588    Ok(())
589}
590
/// Outcome of starting an agent for a task, consumed by `apply_trigger_result`.
#[derive(Debug)]
struct AgentTriggerResult {
    /// Id of the session that was started; becomes the task's trigger session.
    session_id: String,
    /// Transport the agent was started over (`"acp"` or `"a2a"` in this file).
    transport: String,
    /// Id of the remote task for A2A triggers; `None` for ACP triggers.
    external_task_id: Option<String>,
    /// `contextId` reported by the remote agent, when there is one.
    context_id: Option<String>,
}
598
599fn apply_trigger_result(
600    task: &mut Task,
601    board: Option<&KanbanBoard>,
602    step: Option<&KanbanAutomationStep>,
603    result: AgentTriggerResult,
604) {
605    task.trigger_session_id = Some(result.session_id.clone());
606    if !task.session_ids.iter().any(|id| id == &result.session_id) {
607        task.session_ids.push(result.session_id.clone());
608    }
609
610    let column_name = board.and_then(|value| {
611        value.columns.iter().find_map(|column| {
612            (Some(column.id.as_str()) == task.column_id.as_deref()).then(|| column.name.clone())
613        })
614    });
615    let lane_session = TaskLaneSession {
616        session_id: result.session_id.clone(),
617        routa_agent_id: None,
618        column_id: task.column_id.clone(),
619        column_name,
620        step_id: step.map(|value| value.id.clone()),
621        step_index: None,
622        step_name: step
623            .and_then(|value| value.specialist_name.clone())
624            .or_else(|| task.assigned_specialist_name.clone()),
625        provider: task.assigned_provider.clone(),
626        role: task.assigned_role.clone(),
627        specialist_id: task.assigned_specialist_id.clone(),
628        specialist_name: task.assigned_specialist_name.clone(),
629        transport: Some(result.transport),
630        external_task_id: result.external_task_id,
631        context_id: result.context_id,
632        attempt: Some(1),
633        loop_mode: None,
634        completion_requirement: None,
635        objective: Some(task.objective.clone()),
636        last_activity_at: Some(Utc::now().to_rfc3339()),
637        recovered_from_session_id: None,
638        recovery_reason: None,
639        status: TaskLaneSessionStatus::Running,
640        started_at: Utc::now().to_rfc3339(),
641        completed_at: None,
642    };
643
644    if let Some(existing) = task
645        .lane_sessions
646        .iter_mut()
647        .find(|existing| existing.session_id == result.session_id)
648    {
649        *existing = lane_session;
650    } else {
651        task.lane_sessions.push(lane_session);
652    }
653}
654
655#[derive(Debug)]
656struct A2ATaskTerminalUpdate {
657    status: TaskLaneSessionStatus,
658    completed_at: String,
659    last_activity_at: String,
660    context_id: Option<String>,
661    error: Option<String>,
662}
663
664async fn monitor_a2a_task_completion(
665    state: &AppState,
666    workspace_id: &str,
667    task_id: &str,
668    session_id: &str,
669    rpc_endpoint: &str,
670    external_task_id: &str,
671    auth_headers: Option<HashMap<String, String>>,
672) {
673    let client = reqwest::Client::new();
674    let terminal = match wait_for_a2a_completion(
675        &client,
676        rpc_endpoint,
677        external_task_id,
678        auth_headers.as_ref(),
679    )
680    .await
681    {
682        Ok(terminal) => terminal,
683        Err(error) => {
684            let now = Utc::now().to_rfc3339();
685            let status = if error.contains("did not complete within") {
686                TaskLaneSessionStatus::TimedOut
687            } else {
688                TaskLaneSessionStatus::Failed
689            };
690            A2ATaskTerminalUpdate {
691                status,
692                completed_at: now.clone(),
693                last_activity_at: now,
694                context_id: None,
695                error: Some(error),
696            }
697        }
698    };
699
700    if let Err(error) =
701        reconcile_a2a_lane_session(state, task_id, session_id, external_task_id, terminal).await
702    {
703        tracing::warn!(
704            target: "routa_a2a",
705            workspace_id = %workspace_id,
706            task_id = %task_id,
707            session_id = %session_id,
708            external_task_id = %external_task_id,
709            error = %error,
710            "failed to persist A2A terminal state"
711        );
712        return;
713    }
714
715    emit_kanban_workspace_event(state, workspace_id, task_id).await;
716}
717
718async fn wait_for_a2a_completion(
719    client: &reqwest::Client,
720    rpc_endpoint: &str,
721    task_id: &str,
722    auth_headers: Option<&HashMap<String, String>>,
723) -> Result<A2ATaskTerminalUpdate, String> {
724    let started_at = Instant::now();
725
726    loop {
727        let terminal = get_a2a_task_update(client, rpc_endpoint, task_id, auth_headers).await?;
728        if let Some(terminal) = terminal {
729            return Ok(terminal);
730        }
731        if started_at.elapsed() >= A2A_MAX_WAIT {
732            return Err(format!(
733                "A2A task {task_id} did not complete within {}ms",
734                A2A_MAX_WAIT.as_millis()
735            ));
736        }
737        tokio::time::sleep(A2A_POLL_INTERVAL).await;
738    }
739}
740
741async fn get_a2a_task_update(
742    client: &reqwest::Client,
743    rpc_endpoint: &str,
744    task_id: &str,
745    auth_headers: Option<&HashMap<String, String>>,
746) -> Result<Option<A2ATaskTerminalUpdate>, String> {
747    let request_id = uuid::Uuid::new_v4().to_string();
748    let response = apply_a2a_auth_headers(
749        client
750            .post(rpc_endpoint)
751            .header(CONTENT_TYPE, "application/json")
752            .header(ACCEPT, "application/json")
753            .json(&json!({
754                "jsonrpc": "2.0",
755                "id": request_id,
756                "method": "GetTask",
757                "params": { "id": task_id }
758            })),
759        auth_headers,
760    )?
761    .send()
762    .await
763    .map_err(|error| format!("Failed to poll A2A task: {error}"))?;
764
765    if !response.status().is_success() {
766        return Err(format!(
767            "A2A GetTask failed with HTTP {}",
768            response.status().as_u16()
769        ));
770    }
771
772    let payload: Value = response
773        .json()
774        .await
775        .map_err(|error| format!("Failed to decode A2A task payload: {error}"))?;
776    if let Some(error) = payload.get("error") {
777        let message = error
778            .get("message")
779            .and_then(Value::as_str)
780            .unwrap_or("unknown A2A error");
781        return Err(format!("A2A JSON-RPC error: {message}"));
782    }
783
784    let task = payload
785        .get("result")
786        .and_then(|value| value.get("task"))
787        .ok_or_else(|| "A2A response missing result.task".to_string())?;
788    let state = task
789        .get("status")
790        .and_then(|value| value.get("state"))
791        .and_then(Value::as_str)
792        .ok_or_else(|| "A2A task missing status.state".to_string())?;
793    if !is_terminal_a2a_state(state) {
794        return Ok(None);
795    }
796
797    let timestamp = task
798        .get("status")
799        .and_then(|value| value.get("timestamp"))
800        .and_then(Value::as_str)
801        .map(ToOwned::to_owned)
802        .unwrap_or_else(|| Utc::now().to_rfc3339());
803    let context_id = task
804        .get("contextId")
805        .and_then(Value::as_str)
806        .map(ToOwned::to_owned);
807    let error = if state == "completed" {
808        None
809    } else {
810        Some(
811            extract_a2a_status_message(task)
812                .unwrap_or_else(|| format!("A2A task ended in state: {state}")),
813        )
814    };
815
816    Ok(Some(A2ATaskTerminalUpdate {
817        status: map_a2a_terminal_status(state),
818        completed_at: timestamp.clone(),
819        last_activity_at: timestamp,
820        context_id,
821        error,
822    }))
823}
824
825fn extract_a2a_status_message(task: &Value) -> Option<String> {
826    let parts = task
827        .get("status")
828        .and_then(|value| value.get("message"))
829        .and_then(|value| value.get("parts"))
830        .and_then(Value::as_array)?;
831    let text = parts
832        .iter()
833        .filter_map(|part| part.get("text").and_then(Value::as_str))
834        .map(str::trim)
835        .filter(|part| !part.is_empty())
836        .collect::<Vec<_>>()
837        .join(" ");
838    (!text.is_empty()).then_some(text)
839}
840
/// Tells whether an A2A task state ends polling.
///
/// The states `completed`, `failed`, `canceled`, `rejected` and
/// `auth-required` are treated as terminal; the comparison is exact and
/// case-sensitive.
fn is_terminal_a2a_state(state: &str) -> bool {
    const TERMINAL_STATES: [&str; 5] = [
        "completed",
        "failed",
        "canceled",
        "rejected",
        "auth-required",
    ];
    TERMINAL_STATES.contains(&state)
}
847
848fn map_a2a_terminal_status(state: &str) -> TaskLaneSessionStatus {
849    match state {
850        "completed" => TaskLaneSessionStatus::Completed,
851        _ => TaskLaneSessionStatus::Failed,
852    }
853}
854
855async fn reconcile_a2a_lane_session(
856    state: &AppState,
857    task_id: &str,
858    session_id: &str,
859    external_task_id: &str,
860    terminal: A2ATaskTerminalUpdate,
861) -> Result<(), String> {
862    let mut task = wait_for_task_persistence(state, task_id, session_id).await?;
863    let lane_session = task
864        .lane_sessions
865        .iter_mut()
866        .find(|session| session.session_id == session_id)
867        .ok_or_else(|| format!("Task {task_id} missing lane session {session_id}"))?;
868
869    lane_session.status = terminal.status;
870    lane_session.completed_at = Some(terminal.completed_at.clone());
871    lane_session.last_activity_at = Some(terminal.last_activity_at.clone());
872    if lane_session.external_task_id.is_none() {
873        lane_session.external_task_id = Some(external_task_id.to_string());
874    }
875    if terminal.context_id.is_some() {
876        lane_session.context_id = terminal.context_id.clone();
877    }
878
879    if task.trigger_session_id.as_deref() == Some(session_id) {
880        task.trigger_session_id = None;
881    }
882    task.last_sync_error = terminal.error;
883    task.updated_at = Utc::now();
884
885    state
886        .task_store
887        .save(&task)
888        .await
889        .map_err(|error| format!("Failed to save A2A task reconciliation: {error}"))
890}
891
892async fn wait_for_task_persistence(
893    state: &AppState,
894    task_id: &str,
895    session_id: &str,
896) -> Result<Task, String> {
897    for _ in 0..20 {
898        if let Some(task) = state
899            .task_store
900            .get(task_id)
901            .await
902            .map_err(|error| format!("Failed to load task {task_id}: {error}"))?
903        {
904            if task
905                .lane_sessions
906                .iter()
907                .any(|session| session.session_id == session_id)
908            {
909                return Ok(task);
910            }
911        }
912        tokio::time::sleep(Duration::from_millis(50)).await;
913    }
914
915    Err(format!(
916        "Task {task_id} did not persist lane session {session_id} before A2A reconciliation"
917    ))
918}
919
920async fn emit_kanban_workspace_event(state: &AppState, workspace_id: &str, task_id: &str) {
921    state
922        .event_bus
923        .emit(AgentEvent {
924            event_type: AgentEventType::WorkspaceUpdated,
925            agent_id: "kanban-a2a".to_string(),
926            workspace_id: workspace_id.to_string(),
927            data: serde_json::json!({
928                "scope": "kanban",
929                "entity": "task",
930                "action": "updated",
931                "resourceId": task_id,
932                "source": "system",
933            }),
934            timestamp: Utc::now(),
935        })
936        .await;
937}
938
939async fn load_task_board(state: &AppState, task: &Task) -> Result<Option<KanbanBoard>, String> {
940    if let Some(board_id) = task.board_id.as_deref() {
941        state
942            .kanban_store
943            .get(board_id)
944            .await
945            .map_err(|error| format!("Failed to load Kanban board for automation: {error}"))
946    } else {
947        Ok(None)
948    }
949}
950
951fn resolve_task_automation_step(
952    board: Option<&KanbanBoard>,
953    task: &Task,
954) -> Option<KanbanAutomationStep> {
955    board
956        .and_then(|value| {
957            value
958                .columns
959                .iter()
960                .find(|column| Some(column.id.as_str()) == task.column_id.as_deref())
961        })
962        .and_then(|column| column.automation.as_ref())
963        .filter(|automation| automation.enabled)
964        .and_then(|automation| automation.primary_step())
965}
966
967fn is_a2a_step(step: Option<&KanbanAutomationStep>) -> bool {
968    step.is_some_and(|value| {
969        matches!(value.transport, Some(KanbanTransport::A2a)) || value.agent_card_url.is_some()
970    })
971}
972
973fn resolve_a2a_auth_headers(
974    auth_config_id: Option<&str>,
975) -> Result<Option<HashMap<String, String>>, String> {
976    let Some(auth_config_id) = auth_config_id
977        .map(str::trim)
978        .filter(|value| !value.is_empty())
979    else {
980        return Ok(None);
981    };
982    let raw = std::env::var(A2A_AUTH_CONFIGS_ENV).unwrap_or_default();
983    if raw.trim().is_empty() {
984        return Err(format!(
985            "A2A auth config \"{auth_config_id}\" was not found in {A2A_AUTH_CONFIGS_ENV}."
986        ));
987    }
988
989    let parsed: Value = serde_json::from_str(&raw)
990        .map_err(|error| format!("Invalid {A2A_AUTH_CONFIGS_ENV} JSON: {error}"))?;
991    let config = parsed.get(auth_config_id).ok_or_else(|| {
992        format!("A2A auth config \"{auth_config_id}\" was not found in {A2A_AUTH_CONFIGS_ENV}.")
993    })?;
994    let headers = config.get("headers").unwrap_or(config);
995    let headers_obj = headers.as_object().ok_or_else(|| {
996        format!(
997            "{A2A_AUTH_CONFIGS_ENV}.{auth_config_id} must be a header map or contain a string header map in \"headers\"."
998        )
999    })?;
1000
1001    let mut resolved = HashMap::new();
1002    for (name, value) in headers_obj {
1003        let value = value.as_str().ok_or_else(|| {
1004            format!("{A2A_AUTH_CONFIGS_ENV}.{auth_config_id} header {name} must be a string.")
1005        })?;
1006        resolved.insert(name.clone(), value.to_string());
1007    }
1008
1009    Ok(Some(resolved))
1010}
1011
1012fn apply_a2a_auth_headers(
1013    mut request: reqwest::RequestBuilder,
1014    auth_headers: Option<&HashMap<String, String>>,
1015) -> Result<reqwest::RequestBuilder, String> {
1016    if let Some(auth_headers) = auth_headers {
1017        for (name, value) in auth_headers {
1018            let header_name = HeaderName::try_from(name.as_str())
1019                .map_err(|error| format!("Invalid A2A auth header name {name}: {error}"))?;
1020            let header_value = HeaderValue::from_str(value).map_err(|error| {
1021                format!(
1022                    "Invalid A2A auth header value for {}: {}",
1023                    header_name.as_str(),
1024                    error
1025                )
1026            })?;
1027            request = request.header(header_name, header_value);
1028        }
1029    }
1030
1031    Ok(request)
1032}
1033
1034async fn resolve_a2a_rpc_endpoint(
1035    client: &reqwest::Client,
1036    url: &str,
1037    auth_headers: Option<&HashMap<String, String>>,
1038) -> Result<String, String> {
1039    if url.ends_with(".json") || url.ends_with("/agent-card") || url.ends_with("/card") {
1040        let response = apply_a2a_auth_headers(
1041            client.get(url).header(ACCEPT, "application/json"),
1042            auth_headers,
1043        )?
1044        .send()
1045        .await
1046        .map_err(|error| format!("Failed to fetch A2A agent card: {error}"))?;
1047        if !response.status().is_success() {
1048            return Err(format!(
1049                "A2A agent card fetch failed with HTTP {}",
1050                response.status().as_u16()
1051            ));
1052        }
1053        let card: Value = response
1054            .json()
1055            .await
1056            .map_err(|error| format!("Failed to decode A2A agent card: {error}"))?;
1057        let rpc_url = card
1058            .get("url")
1059            .and_then(Value::as_str)
1060            .ok_or_else(|| "A2A agent card missing url".to_string())?;
1061        absolutize_url(url, rpc_url)
1062    } else {
1063        Ok(url.to_string())
1064    }
1065}
1066
1067fn absolutize_url(base_url: &str, maybe_relative: &str) -> Result<String, String> {
1068    if maybe_relative.starts_with("http://") || maybe_relative.starts_with("https://") {
1069        return Ok(maybe_relative.to_string());
1070    }
1071
1072    let base = reqwest::Url::parse(base_url)
1073        .map_err(|error| format!("Invalid base A2A URL {base_url}: {error}"))?;
1074    base.join(maybe_relative)
1075        .map(|url| url.to_string())
1076        .map_err(|error| format!("Invalid relative A2A URL {maybe_relative}: {error}"))
1077}