1use chrono::Utc;
2use reqwest::header::{HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
3use routa_core::events::{AgentEvent, AgentEventType};
4use routa_core::models::kanban::{KanbanAutomationStep, KanbanBoard, KanbanTransport};
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use crate::error::ServerError;
10use crate::models::task::{Task, TaskLaneSession, TaskLaneSessionStatus};
11use crate::state::AppState;
12use routa_core::store::acp_session_store::CreateAcpSessionParams;
13
14const A2A_POLL_INTERVAL: Duration = Duration::from_secs(1);
15const A2A_MAX_WAIT: Duration = Duration::from_secs(300);
16const A2A_AUTH_CONFIGS_ENV: &str = "ROUTA_A2A_AUTH_CONFIGS";
17
18pub async fn resolve_codebase(
19 state: &AppState,
20 workspace_id: &str,
21 repo_path: Option<&str>,
22) -> Result<Option<crate::models::codebase::Codebase>, ServerError> {
23 if let Some(path) = repo_path {
24 state
25 .codebase_store
26 .find_by_repo_path(workspace_id, path)
27 .await
28 } else {
29 state.codebase_store.get_default(workspace_id).await
30 }
31}
32
33pub async fn auto_create_worktree(
34 state: &AppState,
35 task: &crate::models::task::Task,
36 codebase: &crate::models::codebase::Codebase,
37) -> Result<String, String> {
38 let slugified = task
39 .title
40 .to_lowercase()
41 .chars()
42 .map(|c| if c.is_alphanumeric() { c } else { '-' })
43 .collect::<String>()
44 .split('-')
45 .filter(|s| !s.is_empty())
46 .collect::<Vec<_>>()
47 .join("-");
48 let short_id = &task.id[..task.id.len().min(8)];
49 let slug = format!("{}-{}", short_id, &slugified[..slugified.len().min(40)]);
50 let branch = format!("issue/{}", slug);
51
52 let workspace = state
53 .workspace_store
54 .get(&task.workspace_id)
55 .await
56 .ok()
57 .flatten();
58 let worktree_root = workspace
59 .as_ref()
60 .and_then(|ws| ws.metadata.get("worktreeRoot"))
61 .filter(|s| !s.trim().is_empty())
62 .map(std::path::PathBuf::from)
63 .unwrap_or_else(|| crate::git::get_default_workspace_worktree_root(&task.workspace_id));
64
65 let codebase_label = codebase
66 .label
67 .as_ref()
68 .map(|l| crate::git::branch_to_safe_dir_name(l))
69 .unwrap_or_else(|| crate::git::branch_to_safe_dir_name(&codebase.id));
70
71 let worktree_path = worktree_root
72 .join(&codebase_label)
73 .join(crate::git::branch_to_safe_dir_name(&slug));
74
75 if let Some(parent) = worktree_path.parent() {
76 std::fs::create_dir_all(parent)
77 .map_err(|e| format!("Failed to create worktree parent dir: {}", e))?;
78 }
79
80 let worktree_path_str = worktree_path.to_string_lossy().to_string();
81 let base_branch = codebase
82 .branch
83 .clone()
84 .unwrap_or_else(|| "main".to_string());
85
86 let worktree = crate::models::worktree::Worktree::new(
87 uuid::Uuid::new_v4().to_string(),
88 codebase.id.clone(),
89 task.workspace_id.clone(),
90 worktree_path_str.clone(),
91 branch.clone(),
92 base_branch.clone(),
93 Some(slug),
94 );
95 state
96 .worktree_store
97 .save(&worktree)
98 .await
99 .map_err(|e| format!("Failed to save worktree: {}", e))?;
100
101 let _ = crate::git::worktree_prune(&codebase.repo_path);
102 crate::git::worktree_add(
103 &codebase.repo_path,
104 &worktree_path_str,
105 &branch,
106 &base_branch,
107 false,
108 )
109 .map_err(|e| format!("git worktree add failed: {}", e))?;
110
111 Ok(worktree.id)
112}
113
114pub async fn trigger_assigned_task_agent(
115 state: &AppState,
116 task: &mut Task,
117 cwd: Option<&str>,
118 branch: Option<&str>,
119) -> Result<(), String> {
120 let board = load_task_board(state, task).await?;
121 let step = resolve_task_automation_step(board.as_ref(), task);
122 if is_a2a_step(step.as_ref()) {
123 return trigger_assigned_task_a2a_agent(state, task, board.as_ref(), step.as_ref()).await;
124 }
125
126 trigger_assigned_task_acp_agent(state, task, board.as_ref(), step.as_ref(), cwd, branch).await
127}
128
129fn build_task_prompt(
130 task: &Task,
131 board_id: Option<&str>,
132 next_column_id: Option<&str>,
133 available_columns: &str,
134) -> String {
135 let labels = if task.labels.is_empty() {
136 "Labels: none".to_string()
137 } else {
138 format!("Labels: {}", task.labels.join(", "))
139 };
140 let lane_id = task.column_id.as_deref().unwrap_or("backlog");
141 let lane_guidance = match lane_id {
142 "dev" => vec![
143 "You are in the `dev` lane. This lane may implement the requested change, but you must keep work scoped to the current card.".to_string(),
144 "Use `routa-coordination_update_card` to record concrete progress on this card before or after meaningful implementation steps.".to_string(),
145 "When implementation for this lane is complete, use `routa-coordination_move_card` to advance the same card.".to_string(),
146 ],
147 "todo" => vec![
148 "You are in the `todo` lane. This lane does not perform full implementation work.".to_string(),
149 "Only clarify the card, update its progress or status, and move the same card forward when the lane is complete.".to_string(),
150 "Do not edit files, do not inspect the whole repository, and do not run browser tests or environment diagnostics in this lane.".to_string(),
151 ],
152 _ => vec![
153 format!("You are in the `{lane_id}` lane. Keep work scoped to the current card and this lane only."),
154 ],
155 };
156 let mut sections = vec![
157 format!("You are assigned to Kanban task: {}", task.title),
158 String::new(),
159 "## Context".to_string(),
160 String::new(),
161 "**IMPORTANT**: You are working in Kanban lane automation for exactly one existing card.".to_string(),
162 "Only operate on the current card. Do not create a new task, do not switch to a different card, and do not broaden scope.".to_string(),
163 "Use the exact MCP tool names exposed by the provider. In OpenCode, prefer `routa-coordination_update_card` and `routa-coordination_move_card`.".to_string(),
164 "Do NOT use `gh issue create`, browser automation, Playwright, repo-wide debugging, API exploration, or unrelated codebase research unless the card objective explicitly requires it.".to_string(),
165 String::new(),
166 "## Task Details".to_string(),
167 String::new(),
168 format!("**Card ID:** {}", task.id),
169 format!(
170 "**Priority:** {}",
171 task.priority.as_ref().map(|value| value.as_str()).unwrap_or("medium")
172 ),
173 board_id
174 .map(|value| format!("**Board ID:** {}", value))
175 .unwrap_or_else(|| "**Board ID:** unavailable".to_string()),
176 format!("**Current Lane:** {}", lane_id),
177 next_column_id
178 .map(|value| format!("**Next Column ID:** {}", value))
179 .unwrap_or_else(|| "**Next Column ID:** unavailable".to_string()),
180 labels,
181 task.github_url
182 .as_ref()
183 .map(|url| format!("**GitHub Issue:** {}", url))
184 .unwrap_or_else(|| "**GitHub Issue:** local-only".to_string()),
185 String::new(),
186 "## Objective".to_string(),
187 String::new(),
188 task.objective.clone(),
189 String::new(),
190 "## Board Columns".to_string(),
191 String::new(),
192 available_columns.to_string(),
193 String::new(),
194 "## Lane Guidance".to_string(),
195 String::new(),
196 lane_guidance.join("\n"),
197 String::new(),
198 ];
199
200 if let Some(test_cases) = task.test_cases.as_ref().filter(|value| !value.is_empty()) {
201 sections.push("## Test Cases".to_string());
202 sections.push(String::new());
203 sections.push(
204 test_cases
205 .iter()
206 .map(|value| format!("- {}", value))
207 .collect::<Vec<_>>()
208 .join("\n"),
209 );
210 sections.push(String::new());
211 }
212
213 sections.extend([
214 "## Available MCP Tools".to_string(),
215 String::new(),
216 "Use the exact MCP tool names exposed in this session. For OpenCode, the important ones are:".to_string(),
217 String::new(),
218 format!(
219 "- **routa-coordination_update_card**: Update this card's title, description, priority, or labels. Use cardId: \"{}\"",
220 task.id
221 ),
222 format!(
223 "- **routa-coordination_move_card**: Move this same card to targetColumnId \"{}\" when the current lane is complete.",
224 next_column_id.unwrap_or("the exact next column id listed above")
225 ),
226 String::new(),
227 "## Instructions".to_string(),
228 String::new(),
229 "1. Start work for the current lane immediately.".to_string(),
230 "2. Keep changes focused on this card only.".to_string(),
231 "3. Use the exact tool name `routa-coordination_update_card` to record progress on this card.".to_string(),
232 format!(
233 "4. Use the exact tool name `routa-coordination_move_card` with targetColumnId `{}` only when the current lane is complete.",
234 next_column_id.unwrap_or("the exact next column id listed above")
235 ),
236 "5. Do not guess board ids or column ids. Use the Board ID and Board Columns listed above.".to_string(),
237 "6. If blocked, update this same card with the blocking reason instead of exploring side quests.".to_string(),
238 "7. Treat lane guidance as stricter than the general card objective when they conflict.".to_string(),
239 "8. Do not run browser tests or environment diagnostics unless the card explicitly asks for them.".to_string(),
240 ]);
241
242 sections.join("\n")
243}
244
245async fn trigger_assigned_task_acp_agent(
246 state: &AppState,
247 task: &mut Task,
248 board: Option<&KanbanBoard>,
249 step: Option<&KanbanAutomationStep>,
250 cwd: Option<&str>,
251 branch: Option<&str>,
252) -> Result<(), String> {
253 let provider = task
254 .assigned_provider
255 .clone()
256 .unwrap_or_else(|| "opencode".to_string());
257 let role = task
258 .assigned_role
259 .clone()
260 .unwrap_or_else(|| "CRAFTER".to_string())
261 .to_uppercase();
262 let session_id = uuid::Uuid::new_v4().to_string();
263 let cwd = cwd
264 .map(|value| value.to_string())
265 .or_else(|| {
266 std::env::current_dir()
267 .ok()
268 .map(|path| path.to_string_lossy().to_string())
269 })
270 .unwrap_or_else(|| ".".to_string());
271
272 state
273 .acp_manager
274 .create_session(
275 session_id.clone(),
276 cwd.clone(),
277 task.workspace_id.clone(),
278 Some(provider.clone()),
279 Some(role.clone()),
280 None,
281 None,
282 Some("full".to_string()),
283 Some("kanban-planning".to_string()),
284 )
285 .await
286 .map_err(|error| format!("Failed to create ACP session: {}", error))?;
287
288 state
289 .acp_session_store
290 .create(CreateAcpSessionParams {
291 id: &session_id,
292 cwd: &cwd,
293 branch: None,
294 workspace_id: &task.workspace_id,
295 provider: Some(provider.as_str()),
296 role: Some(role.as_str()),
297 parent_session_id: None,
298 })
299 .await
300 .map_err(|error| format!("Failed to persist ACP session: {}", error))?;
301
302 let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
303 ordered_columns.sort_by_key(|column| column.position);
304 let next_column_id = ordered_columns
305 .iter()
306 .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
307 .and_then(|index| ordered_columns.get(index + 1))
308 .map(|column| column.id.clone());
309 let available_columns = if ordered_columns.is_empty() {
310 "- unavailable".to_string()
311 } else {
312 ordered_columns
313 .iter()
314 .map(|column| {
315 format!(
316 "- {} ({}) stage={} position={}",
317 column.id, column.name, column.stage, column.position
318 )
319 })
320 .collect::<Vec<_>>()
321 .join("\n")
322 };
323 let prompt = build_task_prompt(
324 task,
325 board
326 .map(|value| value.id.as_str())
327 .or(task.board_id.as_deref()),
328 next_column_id.as_deref(),
329 &available_columns,
330 );
331 let state_clone = state.clone();
332 let session_id_clone = session_id.clone();
333 let task_workspace = task.workspace_id.clone();
334 let provider_clone = provider.clone();
335 let cwd_clone = cwd.clone();
336 let _branch = branch.map(|value| value.to_string());
337
338 if let Err(error) = state
339 .acp_session_store
340 .set_first_prompt_sent(&session_id)
341 .await
342 {
343 tracing::error!(
344 target: "routa_kanban_prompt",
345 session_id = %session_id,
346 workspace_id = %task.workspace_id,
347 error = %error,
348 "kanban auto prompt failed to mark prompt dispatched"
349 );
350 } else {
351 tracing::info!(
352 target: "routa_kanban_prompt",
353 session_id = %session_id,
354 workspace_id = %task.workspace_id,
355 provider = %provider,
356 "kanban auto prompt marked prompt dispatched"
357 );
358 }
359
360 tracing::info!(
361 target: "routa_kanban_prompt",
362 session_id = %session_id_clone,
363 workspace_id = %task_workspace,
364 provider = %provider_clone,
365 cwd = %cwd_clone,
366 "kanban auto prompt scheduled"
367 );
368
369 tokio::spawn(async move {
370 tracing::info!(
371 target: "routa_kanban_prompt",
372 session_id = %session_id_clone,
373 workspace_id = %task_workspace,
374 provider = %provider_clone,
375 cwd = %cwd_clone,
376 "kanban auto prompt start"
377 );
378 if let Err(error) = state_clone
379 .acp_manager
380 .prompt(&session_id_clone, &prompt)
381 .await
382 {
383 tracing::error!(
384 "[kanban] Failed to auto-prompt ACP task session {} in workspace {} with provider {} at {}: {}",
385 session_id_clone,
386 task_workspace,
387 provider_clone,
388 cwd_clone,
389 error
390 );
391 return;
392 }
393
394 tracing::info!(
395 target: "routa_kanban_prompt",
396 session_id = %session_id_clone,
397 workspace_id = %task_workspace,
398 provider = %provider_clone,
399 "kanban auto prompt success"
400 );
401 if let Some(history) = state_clone
402 .acp_manager
403 .get_session_history(&session_id_clone)
404 .await
405 {
406 if let Err(error) = state_clone
407 .acp_session_store
408 .save_history(&session_id_clone, &history)
409 .await
410 {
411 tracing::error!(
412 target: "routa_kanban_prompt",
413 session_id = %session_id_clone,
414 workspace_id = %task_workspace,
415 error = %error,
416 "kanban auto prompt failed to persist history"
417 );
418 } else {
419 tracing::info!(
420 target: "routa_kanban_prompt",
421 session_id = %session_id_clone,
422 workspace_id = %task_workspace,
423 history_len = history.len(),
424 "kanban auto prompt persisted history"
425 );
426 }
427 }
428 });
429
430 apply_trigger_result(
431 task,
432 board,
433 step,
434 AgentTriggerResult {
435 session_id,
436 transport: "acp".to_string(),
437 external_task_id: None,
438 context_id: None,
439 },
440 );
441
442 Ok(())
443}
444
445async fn trigger_assigned_task_a2a_agent(
446 state: &AppState,
447 task: &mut Task,
448 board: Option<&KanbanBoard>,
449 step: Option<&KanbanAutomationStep>,
450) -> Result<(), String> {
451 let step = step.ok_or_else(|| "A2A automation requires a resolved column step".to_string())?;
452 let agent_card_url = step
453 .agent_card_url
454 .as_deref()
455 .ok_or_else(|| "A2A automation requires agentCardUrl".to_string())?;
456 let auth_headers = resolve_a2a_auth_headers(step.auth_config_id.as_deref())?;
457
458 let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
459 ordered_columns.sort_by_key(|column| column.position);
460 let next_column_id = ordered_columns
461 .iter()
462 .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
463 .and_then(|index| ordered_columns.get(index + 1))
464 .map(|column| column.id.clone());
465 let available_columns = if ordered_columns.is_empty() {
466 "- unavailable".to_string()
467 } else {
468 ordered_columns
469 .iter()
470 .map(|column| {
471 format!(
472 "- {} ({}) stage={} position={}",
473 column.id, column.name, column.stage, column.position
474 )
475 })
476 .collect::<Vec<_>>()
477 .join("\n")
478 };
479 let prompt = build_task_prompt(
480 task,
481 board
482 .map(|value| value.id.as_str())
483 .or(task.board_id.as_deref()),
484 next_column_id.as_deref(),
485 &available_columns,
486 );
487
488 let client = reqwest::Client::new();
489 let rpc_endpoint =
490 resolve_a2a_rpc_endpoint(&client, agent_card_url, auth_headers.as_ref()).await?;
491 let request_id = uuid::Uuid::new_v4().to_string();
492 let message_id = uuid::Uuid::new_v4().to_string();
493 let response = apply_a2a_auth_headers(
494 client
495 .post(&rpc_endpoint)
496 .header(CONTENT_TYPE, "application/json")
497 .header(ACCEPT, "application/json")
498 .json(&json!({
499 "jsonrpc": "2.0",
500 "id": request_id,
501 "method": "SendMessage",
502 "params": {
503 "message": {
504 "messageId": message_id,
505 "role": "user",
506 "parts": [
507 { "text": prompt }
508 ]
509 },
510 "metadata": {
511 "workspaceId": task.workspace_id,
512 "taskId": task.id,
513 "boardId": task.board_id,
514 "columnId": task.column_id,
515 "stepId": step.id,
516 "skillId": step.skill_id,
517 "authConfigId": step.auth_config_id,
518 "role": task.assigned_role,
519 }
520 }
521 })),
522 auth_headers.as_ref(),
523 )?
524 .send()
525 .await
526 .map_err(|error| format!("Failed to send A2A request: {}", error))?;
527
528 if !response.status().is_success() {
529 return Err(format!(
530 "A2A request failed with HTTP {}",
531 response.status().as_u16()
532 ));
533 }
534
535 let payload: Value = response
536 .json()
537 .await
538 .map_err(|error| format!("Failed to decode A2A response: {}", error))?;
539 if let Some(error) = payload.get("error") {
540 let message = error
541 .get("message")
542 .and_then(Value::as_str)
543 .unwrap_or("unknown A2A error");
544 return Err(format!("A2A JSON-RPC error: {}", message));
545 }
546
547 let task_result = payload
548 .get("result")
549 .and_then(|value| value.get("task"))
550 .ok_or_else(|| "A2A response missing result.task".to_string())?;
551 let external_task_id = task_result
552 .get("id")
553 .and_then(Value::as_str)
554 .ok_or_else(|| "A2A response missing task.id".to_string())?
555 .to_string();
556 let context_id = task_result
557 .get("contextId")
558 .and_then(Value::as_str)
559 .map(ToOwned::to_owned);
560 let session_id = format!("a2a-{}", uuid::Uuid::new_v4());
561
562 apply_trigger_result(
563 task,
564 board,
565 Some(step),
566 AgentTriggerResult {
567 session_id: session_id.clone(),
568 transport: "a2a".to_string(),
569 external_task_id: Some(external_task_id.clone()),
570 context_id,
571 },
572 );
573
574 let state_clone = state.clone();
575 let task_id = task.id.clone();
576 let workspace_id = task.workspace_id.clone();
577 tokio::spawn(async move {
578 monitor_a2a_task_completion(
579 &state_clone,
580 &workspace_id,
581 &task_id,
582 &session_id,
583 &rpc_endpoint,
584 &external_task_id,
585 auth_headers,
586 )
587 .await;
588 });
589
590 Ok(())
591}
592
593#[derive(Debug)]
594struct AgentTriggerResult {
595 session_id: String,
596 transport: String,
597 external_task_id: Option<String>,
598 context_id: Option<String>,
599}
600
601fn apply_trigger_result(
602 task: &mut Task,
603 board: Option<&KanbanBoard>,
604 step: Option<&KanbanAutomationStep>,
605 result: AgentTriggerResult,
606) {
607 task.trigger_session_id = Some(result.session_id.clone());
608 if !task.session_ids.iter().any(|id| id == &result.session_id) {
609 task.session_ids.push(result.session_id.clone());
610 }
611
612 let column_name = board.and_then(|value| {
613 value.columns.iter().find_map(|column| {
614 (Some(column.id.as_str()) == task.column_id.as_deref()).then(|| column.name.clone())
615 })
616 });
617 let lane_session = TaskLaneSession {
618 session_id: result.session_id.clone(),
619 routa_agent_id: None,
620 column_id: task.column_id.clone(),
621 column_name,
622 step_id: step.map(|value| value.id.clone()),
623 step_index: None,
624 step_name: step
625 .and_then(|value| value.specialist_name.clone())
626 .or_else(|| task.assigned_specialist_name.clone()),
627 provider: task.assigned_provider.clone(),
628 role: task.assigned_role.clone(),
629 specialist_id: task.assigned_specialist_id.clone(),
630 specialist_name: task.assigned_specialist_name.clone(),
631 transport: Some(result.transport),
632 external_task_id: result.external_task_id,
633 context_id: result.context_id,
634 attempt: Some(1),
635 loop_mode: None,
636 completion_requirement: None,
637 objective: Some(task.objective.clone()),
638 last_activity_at: Some(Utc::now().to_rfc3339()),
639 recovered_from_session_id: None,
640 recovery_reason: None,
641 status: TaskLaneSessionStatus::Running,
642 started_at: Utc::now().to_rfc3339(),
643 completed_at: None,
644 };
645
646 if let Some(existing) = task
647 .lane_sessions
648 .iter_mut()
649 .find(|existing| existing.session_id == result.session_id)
650 {
651 *existing = lane_session;
652 } else {
653 task.lane_sessions.push(lane_session);
654 }
655}
656
657#[derive(Debug)]
658struct A2ATaskTerminalUpdate {
659 status: TaskLaneSessionStatus,
660 completed_at: String,
661 last_activity_at: String,
662 context_id: Option<String>,
663 error: Option<String>,
664}
665
666async fn monitor_a2a_task_completion(
667 state: &AppState,
668 workspace_id: &str,
669 task_id: &str,
670 session_id: &str,
671 rpc_endpoint: &str,
672 external_task_id: &str,
673 auth_headers: Option<HashMap<String, String>>,
674) {
675 let client = reqwest::Client::new();
676 let terminal = match wait_for_a2a_completion(
677 &client,
678 rpc_endpoint,
679 external_task_id,
680 auth_headers.as_ref(),
681 )
682 .await
683 {
684 Ok(terminal) => terminal,
685 Err(error) => {
686 let now = Utc::now().to_rfc3339();
687 let status = if error.contains("did not complete within") {
688 TaskLaneSessionStatus::TimedOut
689 } else {
690 TaskLaneSessionStatus::Failed
691 };
692 A2ATaskTerminalUpdate {
693 status,
694 completed_at: now.clone(),
695 last_activity_at: now,
696 context_id: None,
697 error: Some(error),
698 }
699 }
700 };
701
702 if let Err(error) =
703 reconcile_a2a_lane_session(state, task_id, session_id, external_task_id, terminal).await
704 {
705 tracing::warn!(
706 target: "routa_a2a",
707 workspace_id = %workspace_id,
708 task_id = %task_id,
709 session_id = %session_id,
710 external_task_id = %external_task_id,
711 error = %error,
712 "failed to persist A2A terminal state"
713 );
714 return;
715 }
716
717 emit_kanban_workspace_event(state, workspace_id, task_id).await;
718}
719
720async fn wait_for_a2a_completion(
721 client: &reqwest::Client,
722 rpc_endpoint: &str,
723 task_id: &str,
724 auth_headers: Option<&HashMap<String, String>>,
725) -> Result<A2ATaskTerminalUpdate, String> {
726 let started_at = Instant::now();
727
728 loop {
729 let terminal = get_a2a_task_update(client, rpc_endpoint, task_id, auth_headers).await?;
730 if let Some(terminal) = terminal {
731 return Ok(terminal);
732 }
733 if started_at.elapsed() >= A2A_MAX_WAIT {
734 return Err(format!(
735 "A2A task {task_id} did not complete within {}ms",
736 A2A_MAX_WAIT.as_millis()
737 ));
738 }
739 tokio::time::sleep(A2A_POLL_INTERVAL).await;
740 }
741}
742
743async fn get_a2a_task_update(
744 client: &reqwest::Client,
745 rpc_endpoint: &str,
746 task_id: &str,
747 auth_headers: Option<&HashMap<String, String>>,
748) -> Result<Option<A2ATaskTerminalUpdate>, String> {
749 let request_id = uuid::Uuid::new_v4().to_string();
750 let response = apply_a2a_auth_headers(
751 client
752 .post(rpc_endpoint)
753 .header(CONTENT_TYPE, "application/json")
754 .header(ACCEPT, "application/json")
755 .json(&json!({
756 "jsonrpc": "2.0",
757 "id": request_id,
758 "method": "GetTask",
759 "params": { "id": task_id }
760 })),
761 auth_headers,
762 )?
763 .send()
764 .await
765 .map_err(|error| format!("Failed to poll A2A task: {}", error))?;
766
767 if !response.status().is_success() {
768 return Err(format!(
769 "A2A GetTask failed with HTTP {}",
770 response.status().as_u16()
771 ));
772 }
773
774 let payload: Value = response
775 .json()
776 .await
777 .map_err(|error| format!("Failed to decode A2A task payload: {}", error))?;
778 if let Some(error) = payload.get("error") {
779 let message = error
780 .get("message")
781 .and_then(Value::as_str)
782 .unwrap_or("unknown A2A error");
783 return Err(format!("A2A JSON-RPC error: {}", message));
784 }
785
786 let task = payload
787 .get("result")
788 .and_then(|value| value.get("task"))
789 .ok_or_else(|| "A2A response missing result.task".to_string())?;
790 let state = task
791 .get("status")
792 .and_then(|value| value.get("state"))
793 .and_then(Value::as_str)
794 .ok_or_else(|| "A2A task missing status.state".to_string())?;
795 if !is_terminal_a2a_state(state) {
796 return Ok(None);
797 }
798
799 let timestamp = task
800 .get("status")
801 .and_then(|value| value.get("timestamp"))
802 .and_then(Value::as_str)
803 .map(ToOwned::to_owned)
804 .unwrap_or_else(|| Utc::now().to_rfc3339());
805 let context_id = task
806 .get("contextId")
807 .and_then(Value::as_str)
808 .map(ToOwned::to_owned);
809 let error = if state == "completed" {
810 None
811 } else {
812 Some(
813 extract_a2a_status_message(task)
814 .unwrap_or_else(|| format!("A2A task ended in state: {state}")),
815 )
816 };
817
818 Ok(Some(A2ATaskTerminalUpdate {
819 status: map_a2a_terminal_status(state),
820 completed_at: timestamp.clone(),
821 last_activity_at: timestamp,
822 context_id,
823 error,
824 }))
825}
826
827fn extract_a2a_status_message(task: &Value) -> Option<String> {
828 let parts = task
829 .get("status")
830 .and_then(|value| value.get("message"))
831 .and_then(|value| value.get("parts"))
832 .and_then(Value::as_array)?;
833 let text = parts
834 .iter()
835 .filter_map(|part| part.get("text").and_then(Value::as_str))
836 .map(str::trim)
837 .filter(|part| !part.is_empty())
838 .collect::<Vec<_>>()
839 .join(" ");
840 (!text.is_empty()).then_some(text)
841}
842
843fn is_terminal_a2a_state(state: &str) -> bool {
844 matches!(
845 state,
846 "completed" | "failed" | "canceled" | "rejected" | "auth-required"
847 )
848}
849
850fn map_a2a_terminal_status(state: &str) -> TaskLaneSessionStatus {
851 match state {
852 "completed" => TaskLaneSessionStatus::Completed,
853 _ => TaskLaneSessionStatus::Failed,
854 }
855}
856
857async fn reconcile_a2a_lane_session(
858 state: &AppState,
859 task_id: &str,
860 session_id: &str,
861 external_task_id: &str,
862 terminal: A2ATaskTerminalUpdate,
863) -> Result<(), String> {
864 let mut task = wait_for_task_persistence(state, task_id, session_id).await?;
865 let lane_session = task
866 .lane_sessions
867 .iter_mut()
868 .find(|session| session.session_id == session_id)
869 .ok_or_else(|| format!("Task {task_id} missing lane session {session_id}"))?;
870
871 lane_session.status = terminal.status;
872 lane_session.completed_at = Some(terminal.completed_at.clone());
873 lane_session.last_activity_at = Some(terminal.last_activity_at.clone());
874 if lane_session.external_task_id.is_none() {
875 lane_session.external_task_id = Some(external_task_id.to_string());
876 }
877 if terminal.context_id.is_some() {
878 lane_session.context_id = terminal.context_id.clone();
879 }
880
881 if task.trigger_session_id.as_deref() == Some(session_id) {
882 task.trigger_session_id = None;
883 }
884 task.last_sync_error = terminal.error;
885 task.updated_at = Utc::now();
886
887 state
888 .task_store
889 .save(&task)
890 .await
891 .map_err(|error| format!("Failed to save A2A task reconciliation: {}", error))
892}
893
894async fn wait_for_task_persistence(
895 state: &AppState,
896 task_id: &str,
897 session_id: &str,
898) -> Result<Task, String> {
899 for _ in 0..20 {
900 if let Some(task) = state
901 .task_store
902 .get(task_id)
903 .await
904 .map_err(|error| format!("Failed to load task {task_id}: {}", error))?
905 {
906 if task
907 .lane_sessions
908 .iter()
909 .any(|session| session.session_id == session_id)
910 {
911 return Ok(task);
912 }
913 }
914 tokio::time::sleep(Duration::from_millis(50)).await;
915 }
916
917 Err(format!(
918 "Task {task_id} did not persist lane session {session_id} before A2A reconciliation"
919 ))
920}
921
922async fn emit_kanban_workspace_event(state: &AppState, workspace_id: &str, task_id: &str) {
923 state
924 .event_bus
925 .emit(AgentEvent {
926 event_type: AgentEventType::WorkspaceUpdated,
927 agent_id: "kanban-a2a".to_string(),
928 workspace_id: workspace_id.to_string(),
929 data: serde_json::json!({
930 "scope": "kanban",
931 "entity": "task",
932 "action": "updated",
933 "resourceId": task_id,
934 "source": "system",
935 }),
936 timestamp: Utc::now(),
937 })
938 .await;
939}
940
941async fn load_task_board(state: &AppState, task: &Task) -> Result<Option<KanbanBoard>, String> {
942 if let Some(board_id) = task.board_id.as_deref() {
943 state
944 .kanban_store
945 .get(board_id)
946 .await
947 .map_err(|error| format!("Failed to load Kanban board for automation: {}", error))
948 } else {
949 Ok(None)
950 }
951}
952
953fn resolve_task_automation_step(
954 board: Option<&KanbanBoard>,
955 task: &Task,
956) -> Option<KanbanAutomationStep> {
957 board
958 .and_then(|value| {
959 value
960 .columns
961 .iter()
962 .find(|column| Some(column.id.as_str()) == task.column_id.as_deref())
963 })
964 .and_then(|column| column.automation.as_ref())
965 .filter(|automation| automation.enabled)
966 .and_then(|automation| automation.primary_step())
967}
968
969fn is_a2a_step(step: Option<&KanbanAutomationStep>) -> bool {
970 step.is_some_and(|value| {
971 matches!(value.transport, Some(KanbanTransport::A2a)) || value.agent_card_url.is_some()
972 })
973}
974
975fn resolve_a2a_auth_headers(
976 auth_config_id: Option<&str>,
977) -> Result<Option<HashMap<String, String>>, String> {
978 let Some(auth_config_id) = auth_config_id
979 .map(str::trim)
980 .filter(|value| !value.is_empty())
981 else {
982 return Ok(None);
983 };
984 let raw = std::env::var(A2A_AUTH_CONFIGS_ENV).unwrap_or_default();
985 if raw.trim().is_empty() {
986 return Err(format!(
987 "A2A auth config \"{}\" was not found in {}.",
988 auth_config_id, A2A_AUTH_CONFIGS_ENV
989 ));
990 }
991
992 let parsed: Value = serde_json::from_str(&raw)
993 .map_err(|error| format!("Invalid {} JSON: {}", A2A_AUTH_CONFIGS_ENV, error))?;
994 let config = parsed.get(auth_config_id).ok_or_else(|| {
995 format!(
996 "A2A auth config \"{}\" was not found in {}.",
997 auth_config_id, A2A_AUTH_CONFIGS_ENV
998 )
999 })?;
1000 let headers = config.get("headers").unwrap_or(config);
1001 let headers_obj = headers.as_object().ok_or_else(|| {
1002 format!(
1003 "{}.{} must be a header map or contain a string header map in \"headers\".",
1004 A2A_AUTH_CONFIGS_ENV, auth_config_id
1005 )
1006 })?;
1007
1008 let mut resolved = HashMap::new();
1009 for (name, value) in headers_obj {
1010 let value = value.as_str().ok_or_else(|| {
1011 format!(
1012 "{}.{} header {} must be a string.",
1013 A2A_AUTH_CONFIGS_ENV, auth_config_id, name
1014 )
1015 })?;
1016 resolved.insert(name.clone(), value.to_string());
1017 }
1018
1019 Ok(Some(resolved))
1020}
1021
1022fn apply_a2a_auth_headers(
1023 mut request: reqwest::RequestBuilder,
1024 auth_headers: Option<&HashMap<String, String>>,
1025) -> Result<reqwest::RequestBuilder, String> {
1026 if let Some(auth_headers) = auth_headers {
1027 for (name, value) in auth_headers {
1028 let header_name = HeaderName::try_from(name.as_str())
1029 .map_err(|error| format!("Invalid A2A auth header name {}: {}", name, error))?;
1030 let header_value = HeaderValue::from_str(value).map_err(|error| {
1031 format!(
1032 "Invalid A2A auth header value for {}: {}",
1033 header_name.as_str(),
1034 error
1035 )
1036 })?;
1037 request = request.header(header_name, header_value);
1038 }
1039 }
1040
1041 Ok(request)
1042}
1043
1044async fn resolve_a2a_rpc_endpoint(
1045 client: &reqwest::Client,
1046 url: &str,
1047 auth_headers: Option<&HashMap<String, String>>,
1048) -> Result<String, String> {
1049 if url.ends_with(".json") || url.ends_with("/agent-card") || url.ends_with("/card") {
1050 let response = apply_a2a_auth_headers(
1051 client.get(url).header(ACCEPT, "application/json"),
1052 auth_headers,
1053 )?
1054 .send()
1055 .await
1056 .map_err(|error| format!("Failed to fetch A2A agent card: {}", error))?;
1057 if !response.status().is_success() {
1058 return Err(format!(
1059 "A2A agent card fetch failed with HTTP {}",
1060 response.status().as_u16()
1061 ));
1062 }
1063 let card: Value = response
1064 .json()
1065 .await
1066 .map_err(|error| format!("Failed to decode A2A agent card: {}", error))?;
1067 let rpc_url = card
1068 .get("url")
1069 .and_then(Value::as_str)
1070 .ok_or_else(|| "A2A agent card missing url".to_string())?;
1071 absolutize_url(url, rpc_url)
1072 } else {
1073 Ok(url.to_string())
1074 }
1075}
1076
1077fn absolutize_url(base_url: &str, maybe_relative: &str) -> Result<String, String> {
1078 if maybe_relative.starts_with("http://") || maybe_relative.starts_with("https://") {
1079 return Ok(maybe_relative.to_string());
1080 }
1081
1082 let base = reqwest::Url::parse(base_url)
1083 .map_err(|error| format!("Invalid base A2A URL {}: {}", base_url, error))?;
1084 base.join(maybe_relative)
1085 .map(|url| url.to_string())
1086 .map_err(|error| format!("Invalid relative A2A URL {}: {}", maybe_relative, error))
1087}