1use chrono::Utc;
2use reqwest::header::{HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
3use routa_core::events::{AgentEvent, AgentEventType};
4use routa_core::models::kanban::{KanbanAutomationStep, KanbanBoard, KanbanTransport};
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use crate::error::ServerError;
10use crate::models::task::{Task, TaskLaneSession, TaskLaneSessionStatus};
11use crate::state::AppState;
12use routa_core::store::acp_session_store::CreateAcpSessionParams;
13
14const A2A_POLL_INTERVAL: Duration = Duration::from_secs(1);
15const A2A_MAX_WAIT: Duration = Duration::from_secs(300);
16const A2A_AUTH_CONFIGS_ENV: &str = "ROUTA_A2A_AUTH_CONFIGS";
17
18pub async fn resolve_codebase(
19 state: &AppState,
20 workspace_id: &str,
21 repo_path: Option<&str>,
22) -> Result<Option<crate::models::codebase::Codebase>, ServerError> {
23 if let Some(path) = repo_path {
24 state
25 .codebase_store
26 .find_by_repo_path(workspace_id, path)
27 .await
28 } else {
29 state.codebase_store.get_default(workspace_id).await
30 }
31}
32
33pub async fn auto_create_worktree(
34 state: &AppState,
35 task: &crate::models::task::Task,
36 codebase: &crate::models::codebase::Codebase,
37) -> Result<String, String> {
38 let short_id = &task.id[..task.id.len().min(8)];
39 let label = short_id.to_string();
40 let branch = format!("issue/{short_id}");
41
42 let workspace = state
43 .workspace_store
44 .get(&task.workspace_id)
45 .await
46 .ok()
47 .flatten();
48 let worktree_root = workspace
49 .as_ref()
50 .and_then(|ws| ws.metadata.get("worktreeRoot"))
51 .filter(|s| !s.trim().is_empty())
52 .map(std::path::PathBuf::from)
53 .unwrap_or_else(|| crate::git::get_default_workspace_worktree_root(&task.workspace_id));
54
55 let codebase_label = codebase
56 .label
57 .as_ref()
58 .map(|l| crate::git::branch_to_safe_dir_name(l))
59 .unwrap_or_else(|| crate::git::branch_to_safe_dir_name(&codebase.id));
60
61 let worktree_path = worktree_root
62 .join(&codebase_label)
63 .join(crate::git::branch_to_safe_dir_name(&label));
64
65 if let Some(parent) = worktree_path.parent() {
66 std::fs::create_dir_all(parent)
67 .map_err(|e| format!("Failed to create worktree parent dir: {e}"))?;
68 }
69
70 let worktree_path_str = worktree_path.to_string_lossy().to_string();
71 let base_branch = codebase
72 .branch
73 .clone()
74 .unwrap_or_else(|| "main".to_string());
75
76 let worktree = crate::models::worktree::Worktree::new(
77 uuid::Uuid::new_v4().to_string(),
78 codebase.id.clone(),
79 task.workspace_id.clone(),
80 worktree_path_str.clone(),
81 branch.clone(),
82 base_branch.clone(),
83 Some(label),
84 );
85 state
86 .worktree_store
87 .save(&worktree)
88 .await
89 .map_err(|e| format!("Failed to save worktree: {e}"))?;
90
91 let _ = crate::git::worktree_prune(&codebase.repo_path);
92 crate::git::worktree_add(
93 &codebase.repo_path,
94 &worktree_path_str,
95 &branch,
96 &base_branch,
97 false,
98 )
99 .map_err(|e| format!("git worktree add failed: {e}"))?;
100
101 Ok(worktree.id)
102}
103
104pub async fn trigger_assigned_task_agent(
105 state: &AppState,
106 task: &mut Task,
107 cwd: Option<&str>,
108 branch: Option<&str>,
109) -> Result<(), String> {
110 let board = load_task_board(state, task).await?;
111 let step = resolve_task_automation_step(board.as_ref(), task);
112 if is_a2a_step(step.as_ref()) {
113 return trigger_assigned_task_a2a_agent(state, task, board.as_ref(), step.as_ref()).await;
114 }
115
116 trigger_assigned_task_acp_agent(state, task, board.as_ref(), step.as_ref(), cwd, branch).await
117}
118
119fn build_task_prompt(
120 task: &Task,
121 board_id: Option<&str>,
122 next_column_id: Option<&str>,
123 available_columns: &str,
124) -> String {
125 let labels = if task.labels.is_empty() {
126 "Labels: none".to_string()
127 } else {
128 format!("Labels: {}", task.labels.join(", "))
129 };
130 let lane_id = task.column_id.as_deref().unwrap_or("backlog");
131 let lane_guidance = match lane_id {
132 "dev" => vec![
133 "You are in the `dev` lane. This lane may implement the requested change, but you must keep work scoped to the current card.".to_string(),
134 "Use `routa-coordination_update_card` to record concrete progress on this card before or after meaningful implementation steps.".to_string(),
135 "When implementation for this lane is complete, use `routa-coordination_move_card` to advance the same card.".to_string(),
136 ],
137 "todo" => vec![
138 "You are in the `todo` lane. This lane does not perform full implementation work.".to_string(),
139 "Only clarify the card, update its progress or status, and move the same card forward when the lane is complete.".to_string(),
140 "Do not edit files, do not inspect the whole repository, and do not run browser tests or environment diagnostics in this lane.".to_string(),
141 ],
142 _ => vec![
143 format!("You are in the `{lane_id}` lane. Keep work scoped to the current card and this lane only."),
144 ],
145 };
146 let mut sections = vec![
147 format!("You are assigned to Kanban task: {}", task.title),
148 String::new(),
149 "## Context".to_string(),
150 String::new(),
151 "**IMPORTANT**: You are working in Kanban lane automation for exactly one existing card.".to_string(),
152 "Only operate on the current card. Do not create a new task, do not switch to a different card, and do not broaden scope.".to_string(),
153 "Use the exact MCP tool names exposed by the provider. In OpenCode, prefer `routa-coordination_update_card` and `routa-coordination_move_card`.".to_string(),
154 "When a move is blocked by missing story definition fields, use `routa-coordination_update_task` to update structured fields such as scope, acceptance criteria, verification commands, or test cases.".to_string(),
155 "Do NOT use `gh issue create`, browser automation, Playwright, repo-wide debugging, API exploration, or unrelated codebase research unless the card objective explicitly requires it.".to_string(),
156 String::new(),
157 "## Task Details".to_string(),
158 String::new(),
159 format!("**Card ID:** {}", task.id),
160 format!(
161 "**Priority:** {}",
162 task.priority.as_ref().map(|value| value.as_str()).unwrap_or("medium")
163 ),
164 board_id
165 .map(|value| format!("**Board ID:** {value}"))
166 .unwrap_or_else(|| "**Board ID:** unavailable".to_string()),
167 format!("**Current Lane:** {}", lane_id),
168 next_column_id
169 .map(|value| format!("**Next Column ID:** {value}"))
170 .unwrap_or_else(|| "**Next Column ID:** unavailable".to_string()),
171 labels,
172 task.github_url
173 .as_ref()
174 .map(|url| format!("**GitHub Issue:** {url}"))
175 .unwrap_or_else(|| "**GitHub Issue:** local-only".to_string()),
176 String::new(),
177 "## Objective".to_string(),
178 String::new(),
179 task.objective.clone(),
180 String::new(),
181 "## Board Columns".to_string(),
182 String::new(),
183 available_columns.to_string(),
184 String::new(),
185 "## Lane Guidance".to_string(),
186 String::new(),
187 lane_guidance.join("\n"),
188 String::new(),
189 ];
190
191 if let Some(test_cases) = task.test_cases.as_ref().filter(|value| !value.is_empty()) {
192 sections.push("## Test Cases".to_string());
193 sections.push(String::new());
194 sections.push(
195 test_cases
196 .iter()
197 .map(|value| format!("- {value}"))
198 .collect::<Vec<_>>()
199 .join("\n"),
200 );
201 sections.push(String::new());
202 }
203
204 sections.extend([
205 "## Available MCP Tools".to_string(),
206 String::new(),
207 "Use the exact MCP tool names exposed in this session. For OpenCode, the important ones are:".to_string(),
208 String::new(),
209 format!(
210 "- **routa-coordination_update_task**: Update structured task fields such as scope, acceptanceCriteria, verificationCommands, and testCases. Use taskId: \"{}\" when story readiness is missing.",
211 task.id
212 ),
213 format!(
214 "- **routa-coordination_update_card**: Update this card's title, description, priority, or labels. Use cardId: \"{}\"",
215 task.id
216 ),
217 "- **routa-coordination_update_card is not a story-readiness tool**: card description or comment text does not satisfy move gates for scope, acceptance criteria, verification commands, or test cases.".to_string(),
218 format!(
219 "- **routa-coordination_move_card**: Move this same card to targetColumnId \"{}\" when the current lane is complete.",
220 next_column_id.unwrap_or("the exact next column id listed above")
221 ),
222 String::new(),
223 "## Instructions".to_string(),
224 String::new(),
225 "1. Start work for the current lane immediately.".to_string(),
226 "2. Keep changes focused on this card only.".to_string(),
227 "3. Use `routa-coordination_update_task` to fix missing structured story fields, and `routa-coordination_update_card` only for card text or progress notes.".to_string(),
228 format!(
229 "4. Use the exact tool name `routa-coordination_move_card` with targetColumnId `{}` only when the current lane is complete.",
230 next_column_id.unwrap_or("the exact next column id listed above")
231 ),
232 "5. Do not guess board ids or column ids. Use the Board ID and Board Columns listed above.".to_string(),
233 "6. If blocked, update this same card with the blocking reason instead of exploring side quests.".to_string(),
234 "7. Treat lane guidance as stricter than the general card objective when they conflict.".to_string(),
235 "8. Do not run browser tests or environment diagnostics unless the card explicitly asks for them.".to_string(),
236 ]);
237
238 sections.join("\n")
239}
240
241async fn trigger_assigned_task_acp_agent(
242 state: &AppState,
243 task: &mut Task,
244 board: Option<&KanbanBoard>,
245 step: Option<&KanbanAutomationStep>,
246 cwd: Option<&str>,
247 branch: Option<&str>,
248) -> Result<(), String> {
249 let provider = task
250 .assigned_provider
251 .clone()
252 .unwrap_or_else(|| "opencode".to_string());
253 let role = task
254 .assigned_role
255 .clone()
256 .unwrap_or_else(|| "CRAFTER".to_string())
257 .to_uppercase();
258 let session_id = uuid::Uuid::new_v4().to_string();
259 let cwd = cwd
260 .map(|value| value.to_string())
261 .or_else(|| {
262 std::env::current_dir()
263 .ok()
264 .map(|path| path.to_string_lossy().to_string())
265 })
266 .unwrap_or_else(|| ".".to_string());
267
268 state
269 .acp_manager
270 .create_session(
271 session_id.clone(),
272 cwd.clone(),
273 task.workspace_id.clone(),
274 Some(provider.clone()),
275 Some(role.clone()),
276 None,
277 None,
278 Some("full".to_string()),
279 Some("kanban-planning".to_string()),
280 )
281 .await
282 .map_err(|error| format!("Failed to create ACP session: {error}"))?;
283
284 state
285 .acp_session_store
286 .create(CreateAcpSessionParams {
287 id: &session_id,
288 cwd: &cwd,
289 branch: None,
290 workspace_id: &task.workspace_id,
291 provider: Some(provider.as_str()),
292 role: Some(role.as_str()),
293 custom_command: None,
294 custom_args: None,
295 parent_session_id: None,
296 })
297 .await
298 .map_err(|error| format!("Failed to persist ACP session: {error}"))?;
299
300 let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
301 ordered_columns.sort_by_key(|column| column.position);
302 let next_column_id = ordered_columns
303 .iter()
304 .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
305 .and_then(|index| ordered_columns.get(index + 1))
306 .map(|column| column.id.clone());
307 let available_columns = if ordered_columns.is_empty() {
308 "- unavailable".to_string()
309 } else {
310 ordered_columns
311 .iter()
312 .map(|column| {
313 format!(
314 "- {} ({}) stage={} position={}",
315 column.id, column.name, column.stage, column.position
316 )
317 })
318 .collect::<Vec<_>>()
319 .join("\n")
320 };
321 let prompt = build_task_prompt(
322 task,
323 board
324 .map(|value| value.id.as_str())
325 .or(task.board_id.as_deref()),
326 next_column_id.as_deref(),
327 &available_columns,
328 );
329 let state_clone = state.clone();
330 let session_id_clone = session_id.clone();
331 let task_workspace = task.workspace_id.clone();
332 let provider_clone = provider.clone();
333 let cwd_clone = cwd.clone();
334 let _branch = branch.map(|value| value.to_string());
335
336 if let Err(error) = state
337 .acp_session_store
338 .set_first_prompt_sent(&session_id)
339 .await
340 {
341 tracing::error!(
342 target: "routa_kanban_prompt",
343 session_id = %session_id,
344 workspace_id = %task.workspace_id,
345 error = %error,
346 "kanban auto prompt failed to mark prompt dispatched"
347 );
348 } else {
349 tracing::info!(
350 target: "routa_kanban_prompt",
351 session_id = %session_id,
352 workspace_id = %task.workspace_id,
353 provider = %provider,
354 "kanban auto prompt marked prompt dispatched"
355 );
356 }
357
358 tracing::info!(
359 target: "routa_kanban_prompt",
360 session_id = %session_id_clone,
361 workspace_id = %task_workspace,
362 provider = %provider_clone,
363 cwd = %cwd_clone,
364 "kanban auto prompt scheduled"
365 );
366
367 tokio::spawn(async move {
368 tracing::info!(
369 target: "routa_kanban_prompt",
370 session_id = %session_id_clone,
371 workspace_id = %task_workspace,
372 provider = %provider_clone,
373 cwd = %cwd_clone,
374 "kanban auto prompt start"
375 );
376 if let Err(error) = state_clone
377 .acp_manager
378 .prompt(&session_id_clone, &prompt)
379 .await
380 {
381 tracing::error!(
382 "[kanban] Failed to auto-prompt ACP task session {} in workspace {} with provider {} at {}: {}",
383 session_id_clone,
384 task_workspace,
385 provider_clone,
386 cwd_clone,
387 error
388 );
389 return;
390 }
391
392 tracing::info!(
393 target: "routa_kanban_prompt",
394 session_id = %session_id_clone,
395 workspace_id = %task_workspace,
396 provider = %provider_clone,
397 "kanban auto prompt success"
398 );
399 if let Some(history) = state_clone
400 .acp_manager
401 .get_session_history(&session_id_clone)
402 .await
403 {
404 if let Err(error) = state_clone
405 .acp_session_store
406 .save_history(&session_id_clone, &history)
407 .await
408 {
409 tracing::error!(
410 target: "routa_kanban_prompt",
411 session_id = %session_id_clone,
412 workspace_id = %task_workspace,
413 error = %error,
414 "kanban auto prompt failed to persist history"
415 );
416 } else {
417 tracing::info!(
418 target: "routa_kanban_prompt",
419 session_id = %session_id_clone,
420 workspace_id = %task_workspace,
421 history_len = history.len(),
422 "kanban auto prompt persisted history"
423 );
424 }
425 }
426 });
427
428 apply_trigger_result(
429 task,
430 board,
431 step,
432 AgentTriggerResult {
433 session_id,
434 transport: "acp".to_string(),
435 external_task_id: None,
436 context_id: None,
437 },
438 );
439
440 Ok(())
441}
442
443async fn trigger_assigned_task_a2a_agent(
444 state: &AppState,
445 task: &mut Task,
446 board: Option<&KanbanBoard>,
447 step: Option<&KanbanAutomationStep>,
448) -> Result<(), String> {
449 let step = step.ok_or_else(|| "A2A automation requires a resolved column step".to_string())?;
450 let agent_card_url = step
451 .agent_card_url
452 .as_deref()
453 .ok_or_else(|| "A2A automation requires agentCardUrl".to_string())?;
454 let auth_headers = resolve_a2a_auth_headers(step.auth_config_id.as_deref())?;
455
456 let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
457 ordered_columns.sort_by_key(|column| column.position);
458 let next_column_id = ordered_columns
459 .iter()
460 .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
461 .and_then(|index| ordered_columns.get(index + 1))
462 .map(|column| column.id.clone());
463 let available_columns = if ordered_columns.is_empty() {
464 "- unavailable".to_string()
465 } else {
466 ordered_columns
467 .iter()
468 .map(|column| {
469 format!(
470 "- {} ({}) stage={} position={}",
471 column.id, column.name, column.stage, column.position
472 )
473 })
474 .collect::<Vec<_>>()
475 .join("\n")
476 };
477 let prompt = build_task_prompt(
478 task,
479 board
480 .map(|value| value.id.as_str())
481 .or(task.board_id.as_deref()),
482 next_column_id.as_deref(),
483 &available_columns,
484 );
485
486 let client = reqwest::Client::new();
487 let rpc_endpoint =
488 resolve_a2a_rpc_endpoint(&client, agent_card_url, auth_headers.as_ref()).await?;
489 let request_id = uuid::Uuid::new_v4().to_string();
490 let message_id = uuid::Uuid::new_v4().to_string();
491 let response = apply_a2a_auth_headers(
492 client
493 .post(&rpc_endpoint)
494 .header(CONTENT_TYPE, "application/json")
495 .header(ACCEPT, "application/json")
496 .json(&json!({
497 "jsonrpc": "2.0",
498 "id": request_id,
499 "method": "SendMessage",
500 "params": {
501 "message": {
502 "messageId": message_id,
503 "role": "user",
504 "parts": [
505 { "text": prompt }
506 ]
507 },
508 "metadata": {
509 "workspaceId": task.workspace_id,
510 "taskId": task.id,
511 "boardId": task.board_id,
512 "columnId": task.column_id,
513 "stepId": step.id,
514 "skillId": step.skill_id,
515 "authConfigId": step.auth_config_id,
516 "role": task.assigned_role,
517 }
518 }
519 })),
520 auth_headers.as_ref(),
521 )?
522 .send()
523 .await
524 .map_err(|error| format!("Failed to send A2A request: {error}"))?;
525
526 if !response.status().is_success() {
527 return Err(format!(
528 "A2A request failed with HTTP {}",
529 response.status().as_u16()
530 ));
531 }
532
533 let payload: Value = response
534 .json()
535 .await
536 .map_err(|error| format!("Failed to decode A2A response: {error}"))?;
537 if let Some(error) = payload.get("error") {
538 let message = error
539 .get("message")
540 .and_then(Value::as_str)
541 .unwrap_or("unknown A2A error");
542 return Err(format!("A2A JSON-RPC error: {message}"));
543 }
544
545 let task_result = payload
546 .get("result")
547 .and_then(|value| value.get("task"))
548 .ok_or_else(|| "A2A response missing result.task".to_string())?;
549 let external_task_id = task_result
550 .get("id")
551 .and_then(Value::as_str)
552 .ok_or_else(|| "A2A response missing task.id".to_string())?
553 .to_string();
554 let context_id = task_result
555 .get("contextId")
556 .and_then(Value::as_str)
557 .map(ToOwned::to_owned);
558 let session_id = format!("a2a-{}", uuid::Uuid::new_v4());
559
560 apply_trigger_result(
561 task,
562 board,
563 Some(step),
564 AgentTriggerResult {
565 session_id: session_id.clone(),
566 transport: "a2a".to_string(),
567 external_task_id: Some(external_task_id.clone()),
568 context_id,
569 },
570 );
571
572 let state_clone = state.clone();
573 let task_id = task.id.clone();
574 let workspace_id = task.workspace_id.clone();
575 tokio::spawn(async move {
576 monitor_a2a_task_completion(
577 &state_clone,
578 &workspace_id,
579 &task_id,
580 &session_id,
581 &rpc_endpoint,
582 &external_task_id,
583 auth_headers,
584 )
585 .await;
586 });
587
588 Ok(())
589}
590
591#[derive(Debug)]
592struct AgentTriggerResult {
593 session_id: String,
594 transport: String,
595 external_task_id: Option<String>,
596 context_id: Option<String>,
597}
598
599fn apply_trigger_result(
600 task: &mut Task,
601 board: Option<&KanbanBoard>,
602 step: Option<&KanbanAutomationStep>,
603 result: AgentTriggerResult,
604) {
605 task.trigger_session_id = Some(result.session_id.clone());
606 if !task.session_ids.iter().any(|id| id == &result.session_id) {
607 task.session_ids.push(result.session_id.clone());
608 }
609
610 let column_name = board.and_then(|value| {
611 value.columns.iter().find_map(|column| {
612 (Some(column.id.as_str()) == task.column_id.as_deref()).then(|| column.name.clone())
613 })
614 });
615 let lane_session = TaskLaneSession {
616 session_id: result.session_id.clone(),
617 routa_agent_id: None,
618 column_id: task.column_id.clone(),
619 column_name,
620 step_id: step.map(|value| value.id.clone()),
621 step_index: None,
622 step_name: step
623 .and_then(|value| value.specialist_name.clone())
624 .or_else(|| task.assigned_specialist_name.clone()),
625 provider: task.assigned_provider.clone(),
626 role: task.assigned_role.clone(),
627 specialist_id: task.assigned_specialist_id.clone(),
628 specialist_name: task.assigned_specialist_name.clone(),
629 transport: Some(result.transport),
630 external_task_id: result.external_task_id,
631 context_id: result.context_id,
632 attempt: Some(1),
633 loop_mode: None,
634 completion_requirement: None,
635 objective: Some(task.objective.clone()),
636 last_activity_at: Some(Utc::now().to_rfc3339()),
637 recovered_from_session_id: None,
638 recovery_reason: None,
639 status: TaskLaneSessionStatus::Running,
640 started_at: Utc::now().to_rfc3339(),
641 completed_at: None,
642 };
643
644 if let Some(existing) = task
645 .lane_sessions
646 .iter_mut()
647 .find(|existing| existing.session_id == result.session_id)
648 {
649 *existing = lane_session;
650 } else {
651 task.lane_sessions.push(lane_session);
652 }
653}
654
655#[derive(Debug)]
656struct A2ATaskTerminalUpdate {
657 status: TaskLaneSessionStatus,
658 completed_at: String,
659 last_activity_at: String,
660 context_id: Option<String>,
661 error: Option<String>,
662}
663
664async fn monitor_a2a_task_completion(
665 state: &AppState,
666 workspace_id: &str,
667 task_id: &str,
668 session_id: &str,
669 rpc_endpoint: &str,
670 external_task_id: &str,
671 auth_headers: Option<HashMap<String, String>>,
672) {
673 let client = reqwest::Client::new();
674 let terminal = match wait_for_a2a_completion(
675 &client,
676 rpc_endpoint,
677 external_task_id,
678 auth_headers.as_ref(),
679 )
680 .await
681 {
682 Ok(terminal) => terminal,
683 Err(error) => {
684 let now = Utc::now().to_rfc3339();
685 let status = if error.contains("did not complete within") {
686 TaskLaneSessionStatus::TimedOut
687 } else {
688 TaskLaneSessionStatus::Failed
689 };
690 A2ATaskTerminalUpdate {
691 status,
692 completed_at: now.clone(),
693 last_activity_at: now,
694 context_id: None,
695 error: Some(error),
696 }
697 }
698 };
699
700 if let Err(error) =
701 reconcile_a2a_lane_session(state, task_id, session_id, external_task_id, terminal).await
702 {
703 tracing::warn!(
704 target: "routa_a2a",
705 workspace_id = %workspace_id,
706 task_id = %task_id,
707 session_id = %session_id,
708 external_task_id = %external_task_id,
709 error = %error,
710 "failed to persist A2A terminal state"
711 );
712 return;
713 }
714
715 emit_kanban_workspace_event(state, workspace_id, task_id).await;
716}
717
718async fn wait_for_a2a_completion(
719 client: &reqwest::Client,
720 rpc_endpoint: &str,
721 task_id: &str,
722 auth_headers: Option<&HashMap<String, String>>,
723) -> Result<A2ATaskTerminalUpdate, String> {
724 let started_at = Instant::now();
725
726 loop {
727 let terminal = get_a2a_task_update(client, rpc_endpoint, task_id, auth_headers).await?;
728 if let Some(terminal) = terminal {
729 return Ok(terminal);
730 }
731 if started_at.elapsed() >= A2A_MAX_WAIT {
732 return Err(format!(
733 "A2A task {task_id} did not complete within {}ms",
734 A2A_MAX_WAIT.as_millis()
735 ));
736 }
737 tokio::time::sleep(A2A_POLL_INTERVAL).await;
738 }
739}
740
741async fn get_a2a_task_update(
742 client: &reqwest::Client,
743 rpc_endpoint: &str,
744 task_id: &str,
745 auth_headers: Option<&HashMap<String, String>>,
746) -> Result<Option<A2ATaskTerminalUpdate>, String> {
747 let request_id = uuid::Uuid::new_v4().to_string();
748 let response = apply_a2a_auth_headers(
749 client
750 .post(rpc_endpoint)
751 .header(CONTENT_TYPE, "application/json")
752 .header(ACCEPT, "application/json")
753 .json(&json!({
754 "jsonrpc": "2.0",
755 "id": request_id,
756 "method": "GetTask",
757 "params": { "id": task_id }
758 })),
759 auth_headers,
760 )?
761 .send()
762 .await
763 .map_err(|error| format!("Failed to poll A2A task: {error}"))?;
764
765 if !response.status().is_success() {
766 return Err(format!(
767 "A2A GetTask failed with HTTP {}",
768 response.status().as_u16()
769 ));
770 }
771
772 let payload: Value = response
773 .json()
774 .await
775 .map_err(|error| format!("Failed to decode A2A task payload: {error}"))?;
776 if let Some(error) = payload.get("error") {
777 let message = error
778 .get("message")
779 .and_then(Value::as_str)
780 .unwrap_or("unknown A2A error");
781 return Err(format!("A2A JSON-RPC error: {message}"));
782 }
783
784 let task = payload
785 .get("result")
786 .and_then(|value| value.get("task"))
787 .ok_or_else(|| "A2A response missing result.task".to_string())?;
788 let state = task
789 .get("status")
790 .and_then(|value| value.get("state"))
791 .and_then(Value::as_str)
792 .ok_or_else(|| "A2A task missing status.state".to_string())?;
793 if !is_terminal_a2a_state(state) {
794 return Ok(None);
795 }
796
797 let timestamp = task
798 .get("status")
799 .and_then(|value| value.get("timestamp"))
800 .and_then(Value::as_str)
801 .map(ToOwned::to_owned)
802 .unwrap_or_else(|| Utc::now().to_rfc3339());
803 let context_id = task
804 .get("contextId")
805 .and_then(Value::as_str)
806 .map(ToOwned::to_owned);
807 let error = if state == "completed" {
808 None
809 } else {
810 Some(
811 extract_a2a_status_message(task)
812 .unwrap_or_else(|| format!("A2A task ended in state: {state}")),
813 )
814 };
815
816 Ok(Some(A2ATaskTerminalUpdate {
817 status: map_a2a_terminal_status(state),
818 completed_at: timestamp.clone(),
819 last_activity_at: timestamp,
820 context_id,
821 error,
822 }))
823}
824
825fn extract_a2a_status_message(task: &Value) -> Option<String> {
826 let parts = task
827 .get("status")
828 .and_then(|value| value.get("message"))
829 .and_then(|value| value.get("parts"))
830 .and_then(Value::as_array)?;
831 let text = parts
832 .iter()
833 .filter_map(|part| part.get("text").and_then(Value::as_str))
834 .map(str::trim)
835 .filter(|part| !part.is_empty())
836 .collect::<Vec<_>>()
837 .join(" ");
838 (!text.is_empty()).then_some(text)
839}
840
841fn is_terminal_a2a_state(state: &str) -> bool {
842 matches!(
843 state,
844 "completed" | "failed" | "canceled" | "rejected" | "auth-required"
845 )
846}
847
848fn map_a2a_terminal_status(state: &str) -> TaskLaneSessionStatus {
849 match state {
850 "completed" => TaskLaneSessionStatus::Completed,
851 _ => TaskLaneSessionStatus::Failed,
852 }
853}
854
855async fn reconcile_a2a_lane_session(
856 state: &AppState,
857 task_id: &str,
858 session_id: &str,
859 external_task_id: &str,
860 terminal: A2ATaskTerminalUpdate,
861) -> Result<(), String> {
862 let mut task = wait_for_task_persistence(state, task_id, session_id).await?;
863 let lane_session = task
864 .lane_sessions
865 .iter_mut()
866 .find(|session| session.session_id == session_id)
867 .ok_or_else(|| format!("Task {task_id} missing lane session {session_id}"))?;
868
869 lane_session.status = terminal.status;
870 lane_session.completed_at = Some(terminal.completed_at.clone());
871 lane_session.last_activity_at = Some(terminal.last_activity_at.clone());
872 if lane_session.external_task_id.is_none() {
873 lane_session.external_task_id = Some(external_task_id.to_string());
874 }
875 if terminal.context_id.is_some() {
876 lane_session.context_id = terminal.context_id.clone();
877 }
878
879 if task.trigger_session_id.as_deref() == Some(session_id) {
880 task.trigger_session_id = None;
881 }
882 task.last_sync_error = terminal.error;
883 task.updated_at = Utc::now();
884
885 state
886 .task_store
887 .save(&task)
888 .await
889 .map_err(|error| format!("Failed to save A2A task reconciliation: {error}"))
890}
891
892async fn wait_for_task_persistence(
893 state: &AppState,
894 task_id: &str,
895 session_id: &str,
896) -> Result<Task, String> {
897 for _ in 0..20 {
898 if let Some(task) = state
899 .task_store
900 .get(task_id)
901 .await
902 .map_err(|error| format!("Failed to load task {task_id}: {error}"))?
903 {
904 if task
905 .lane_sessions
906 .iter()
907 .any(|session| session.session_id == session_id)
908 {
909 return Ok(task);
910 }
911 }
912 tokio::time::sleep(Duration::from_millis(50)).await;
913 }
914
915 Err(format!(
916 "Task {task_id} did not persist lane session {session_id} before A2A reconciliation"
917 ))
918}
919
920async fn emit_kanban_workspace_event(state: &AppState, workspace_id: &str, task_id: &str) {
921 state
922 .event_bus
923 .emit(AgentEvent {
924 event_type: AgentEventType::WorkspaceUpdated,
925 agent_id: "kanban-a2a".to_string(),
926 workspace_id: workspace_id.to_string(),
927 data: serde_json::json!({
928 "scope": "kanban",
929 "entity": "task",
930 "action": "updated",
931 "resourceId": task_id,
932 "source": "system",
933 }),
934 timestamp: Utc::now(),
935 })
936 .await;
937}
938
939async fn load_task_board(state: &AppState, task: &Task) -> Result<Option<KanbanBoard>, String> {
940 if let Some(board_id) = task.board_id.as_deref() {
941 state
942 .kanban_store
943 .get(board_id)
944 .await
945 .map_err(|error| format!("Failed to load Kanban board for automation: {error}"))
946 } else {
947 Ok(None)
948 }
949}
950
951fn resolve_task_automation_step(
952 board: Option<&KanbanBoard>,
953 task: &Task,
954) -> Option<KanbanAutomationStep> {
955 board
956 .and_then(|value| {
957 value
958 .columns
959 .iter()
960 .find(|column| Some(column.id.as_str()) == task.column_id.as_deref())
961 })
962 .and_then(|column| column.automation.as_ref())
963 .filter(|automation| automation.enabled)
964 .and_then(|automation| automation.primary_step())
965}
966
967fn is_a2a_step(step: Option<&KanbanAutomationStep>) -> bool {
968 step.is_some_and(|value| {
969 matches!(value.transport, Some(KanbanTransport::A2a)) || value.agent_card_url.is_some()
970 })
971}
972
973fn resolve_a2a_auth_headers(
974 auth_config_id: Option<&str>,
975) -> Result<Option<HashMap<String, String>>, String> {
976 let Some(auth_config_id) = auth_config_id
977 .map(str::trim)
978 .filter(|value| !value.is_empty())
979 else {
980 return Ok(None);
981 };
982 let raw = std::env::var(A2A_AUTH_CONFIGS_ENV).unwrap_or_default();
983 if raw.trim().is_empty() {
984 return Err(format!(
985 "A2A auth config \"{auth_config_id}\" was not found in {A2A_AUTH_CONFIGS_ENV}."
986 ));
987 }
988
989 let parsed: Value = serde_json::from_str(&raw)
990 .map_err(|error| format!("Invalid {A2A_AUTH_CONFIGS_ENV} JSON: {error}"))?;
991 let config = parsed.get(auth_config_id).ok_or_else(|| {
992 format!("A2A auth config \"{auth_config_id}\" was not found in {A2A_AUTH_CONFIGS_ENV}.")
993 })?;
994 let headers = config.get("headers").unwrap_or(config);
995 let headers_obj = headers.as_object().ok_or_else(|| {
996 format!(
997 "{A2A_AUTH_CONFIGS_ENV}.{auth_config_id} must be a header map or contain a string header map in \"headers\"."
998 )
999 })?;
1000
1001 let mut resolved = HashMap::new();
1002 for (name, value) in headers_obj {
1003 let value = value.as_str().ok_or_else(|| {
1004 format!("{A2A_AUTH_CONFIGS_ENV}.{auth_config_id} header {name} must be a string.")
1005 })?;
1006 resolved.insert(name.clone(), value.to_string());
1007 }
1008
1009 Ok(Some(resolved))
1010}
1011
1012fn apply_a2a_auth_headers(
1013 mut request: reqwest::RequestBuilder,
1014 auth_headers: Option<&HashMap<String, String>>,
1015) -> Result<reqwest::RequestBuilder, String> {
1016 if let Some(auth_headers) = auth_headers {
1017 for (name, value) in auth_headers {
1018 let header_name = HeaderName::try_from(name.as_str())
1019 .map_err(|error| format!("Invalid A2A auth header name {name}: {error}"))?;
1020 let header_value = HeaderValue::from_str(value).map_err(|error| {
1021 format!(
1022 "Invalid A2A auth header value for {}: {}",
1023 header_name.as_str(),
1024 error
1025 )
1026 })?;
1027 request = request.header(header_name, header_value);
1028 }
1029 }
1030
1031 Ok(request)
1032}
1033
1034async fn resolve_a2a_rpc_endpoint(
1035 client: &reqwest::Client,
1036 url: &str,
1037 auth_headers: Option<&HashMap<String, String>>,
1038) -> Result<String, String> {
1039 if url.ends_with(".json") || url.ends_with("/agent-card") || url.ends_with("/card") {
1040 let response = apply_a2a_auth_headers(
1041 client.get(url).header(ACCEPT, "application/json"),
1042 auth_headers,
1043 )?
1044 .send()
1045 .await
1046 .map_err(|error| format!("Failed to fetch A2A agent card: {error}"))?;
1047 if !response.status().is_success() {
1048 return Err(format!(
1049 "A2A agent card fetch failed with HTTP {}",
1050 response.status().as_u16()
1051 ));
1052 }
1053 let card: Value = response
1054 .json()
1055 .await
1056 .map_err(|error| format!("Failed to decode A2A agent card: {error}"))?;
1057 let rpc_url = card
1058 .get("url")
1059 .and_then(Value::as_str)
1060 .ok_or_else(|| "A2A agent card missing url".to_string())?;
1061 absolutize_url(url, rpc_url)
1062 } else {
1063 Ok(url.to_string())
1064 }
1065}
1066
1067fn absolutize_url(base_url: &str, maybe_relative: &str) -> Result<String, String> {
1068 if maybe_relative.starts_with("http://") || maybe_relative.starts_with("https://") {
1069 return Ok(maybe_relative.to_string());
1070 }
1071
1072 let base = reqwest::Url::parse(base_url)
1073 .map_err(|error| format!("Invalid base A2A URL {base_url}: {error}"))?;
1074 base.join(maybe_relative)
1075 .map(|url| url.to_string())
1076 .map_err(|error| format!("Invalid relative A2A URL {maybe_relative}: {error}"))
1077}