1use chrono::Utc;
2use reqwest::header::{HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
3use routa_core::events::{AgentEvent, AgentEventType};
4use routa_core::models::kanban::{KanbanAutomationStep, KanbanBoard, KanbanTransport};
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use crate::error::ServerError;
10use crate::models::task::{Task, TaskLaneSession, TaskLaneSessionStatus};
11use crate::state::AppState;
12use routa_core::store::acp_session_store::CreateAcpSessionParams;
13
14const A2A_POLL_INTERVAL: Duration = Duration::from_secs(1);
15const A2A_MAX_WAIT: Duration = Duration::from_secs(300);
16const A2A_AUTH_CONFIGS_ENV: &str = "ROUTA_A2A_AUTH_CONFIGS";
17
18pub async fn resolve_codebase(
19 state: &AppState,
20 workspace_id: &str,
21 repo_path: Option<&str>,
22) -> Result<Option<crate::models::codebase::Codebase>, ServerError> {
23 if let Some(path) = repo_path {
24 state
25 .codebase_store
26 .find_by_repo_path(workspace_id, path)
27 .await
28 } else {
29 state.codebase_store.get_default(workspace_id).await
30 }
31}
32
33pub async fn auto_create_worktree(
34 state: &AppState,
35 task: &crate::models::task::Task,
36 codebase: &crate::models::codebase::Codebase,
37) -> Result<String, String> {
38 let short_id = &task.id[..task.id.len().min(8)];
39 let label = short_id.to_string();
40 let branch = format!("issue/{}", short_id);
41
42 let workspace = state
43 .workspace_store
44 .get(&task.workspace_id)
45 .await
46 .ok()
47 .flatten();
48 let worktree_root = workspace
49 .as_ref()
50 .and_then(|ws| ws.metadata.get("worktreeRoot"))
51 .filter(|s| !s.trim().is_empty())
52 .map(std::path::PathBuf::from)
53 .unwrap_or_else(|| crate::git::get_default_workspace_worktree_root(&task.workspace_id));
54
55 let codebase_label = codebase
56 .label
57 .as_ref()
58 .map(|l| crate::git::branch_to_safe_dir_name(l))
59 .unwrap_or_else(|| crate::git::branch_to_safe_dir_name(&codebase.id));
60
61 let worktree_path = worktree_root
62 .join(&codebase_label)
63 .join(crate::git::branch_to_safe_dir_name(&label));
64
65 if let Some(parent) = worktree_path.parent() {
66 std::fs::create_dir_all(parent)
67 .map_err(|e| format!("Failed to create worktree parent dir: {}", e))?;
68 }
69
70 let worktree_path_str = worktree_path.to_string_lossy().to_string();
71 let base_branch = codebase
72 .branch
73 .clone()
74 .unwrap_or_else(|| "main".to_string());
75
76 let worktree = crate::models::worktree::Worktree::new(
77 uuid::Uuid::new_v4().to_string(),
78 codebase.id.clone(),
79 task.workspace_id.clone(),
80 worktree_path_str.clone(),
81 branch.clone(),
82 base_branch.clone(),
83 Some(label),
84 );
85 state
86 .worktree_store
87 .save(&worktree)
88 .await
89 .map_err(|e| format!("Failed to save worktree: {}", e))?;
90
91 let _ = crate::git::worktree_prune(&codebase.repo_path);
92 crate::git::worktree_add(
93 &codebase.repo_path,
94 &worktree_path_str,
95 &branch,
96 &base_branch,
97 false,
98 )
99 .map_err(|e| format!("git worktree add failed: {}", e))?;
100
101 Ok(worktree.id)
102}
103
104pub async fn trigger_assigned_task_agent(
105 state: &AppState,
106 task: &mut Task,
107 cwd: Option<&str>,
108 branch: Option<&str>,
109) -> Result<(), String> {
110 let board = load_task_board(state, task).await?;
111 let step = resolve_task_automation_step(board.as_ref(), task);
112 if is_a2a_step(step.as_ref()) {
113 return trigger_assigned_task_a2a_agent(state, task, board.as_ref(), step.as_ref()).await;
114 }
115
116 trigger_assigned_task_acp_agent(state, task, board.as_ref(), step.as_ref(), cwd, branch).await
117}
118
119fn build_task_prompt(
120 task: &Task,
121 board_id: Option<&str>,
122 next_column_id: Option<&str>,
123 available_columns: &str,
124) -> String {
125 let labels = if task.labels.is_empty() {
126 "Labels: none".to_string()
127 } else {
128 format!("Labels: {}", task.labels.join(", "))
129 };
130 let lane_id = task.column_id.as_deref().unwrap_or("backlog");
131 let lane_guidance = match lane_id {
132 "dev" => vec![
133 "You are in the `dev` lane. This lane may implement the requested change, but you must keep work scoped to the current card.".to_string(),
134 "Use `routa-coordination_update_card` to record concrete progress on this card before or after meaningful implementation steps.".to_string(),
135 "When implementation for this lane is complete, use `routa-coordination_move_card` to advance the same card.".to_string(),
136 ],
137 "todo" => vec![
138 "You are in the `todo` lane. This lane does not perform full implementation work.".to_string(),
139 "Only clarify the card, update its progress or status, and move the same card forward when the lane is complete.".to_string(),
140 "Do not edit files, do not inspect the whole repository, and do not run browser tests or environment diagnostics in this lane.".to_string(),
141 ],
142 _ => vec![
143 format!("You are in the `{lane_id}` lane. Keep work scoped to the current card and this lane only."),
144 ],
145 };
146 let mut sections = vec![
147 format!("You are assigned to Kanban task: {}", task.title),
148 String::new(),
149 "## Context".to_string(),
150 String::new(),
151 "**IMPORTANT**: You are working in Kanban lane automation for exactly one existing card.".to_string(),
152 "Only operate on the current card. Do not create a new task, do not switch to a different card, and do not broaden scope.".to_string(),
153 "Use the exact MCP tool names exposed by the provider. In OpenCode, prefer `routa-coordination_update_card` and `routa-coordination_move_card`.".to_string(),
154 "Do NOT use `gh issue create`, browser automation, Playwright, repo-wide debugging, API exploration, or unrelated codebase research unless the card objective explicitly requires it.".to_string(),
155 String::new(),
156 "## Task Details".to_string(),
157 String::new(),
158 format!("**Card ID:** {}", task.id),
159 format!(
160 "**Priority:** {}",
161 task.priority.as_ref().map(|value| value.as_str()).unwrap_or("medium")
162 ),
163 board_id
164 .map(|value| format!("**Board ID:** {}", value))
165 .unwrap_or_else(|| "**Board ID:** unavailable".to_string()),
166 format!("**Current Lane:** {}", lane_id),
167 next_column_id
168 .map(|value| format!("**Next Column ID:** {}", value))
169 .unwrap_or_else(|| "**Next Column ID:** unavailable".to_string()),
170 labels,
171 task.github_url
172 .as_ref()
173 .map(|url| format!("**GitHub Issue:** {}", url))
174 .unwrap_or_else(|| "**GitHub Issue:** local-only".to_string()),
175 String::new(),
176 "## Objective".to_string(),
177 String::new(),
178 task.objective.clone(),
179 String::new(),
180 "## Board Columns".to_string(),
181 String::new(),
182 available_columns.to_string(),
183 String::new(),
184 "## Lane Guidance".to_string(),
185 String::new(),
186 lane_guidance.join("\n"),
187 String::new(),
188 ];
189
190 if let Some(test_cases) = task.test_cases.as_ref().filter(|value| !value.is_empty()) {
191 sections.push("## Test Cases".to_string());
192 sections.push(String::new());
193 sections.push(
194 test_cases
195 .iter()
196 .map(|value| format!("- {}", value))
197 .collect::<Vec<_>>()
198 .join("\n"),
199 );
200 sections.push(String::new());
201 }
202
203 sections.extend([
204 "## Available MCP Tools".to_string(),
205 String::new(),
206 "Use the exact MCP tool names exposed in this session. For OpenCode, the important ones are:".to_string(),
207 String::new(),
208 format!(
209 "- **routa-coordination_update_card**: Update this card's title, description, priority, or labels. Use cardId: \"{}\"",
210 task.id
211 ),
212 format!(
213 "- **routa-coordination_move_card**: Move this same card to targetColumnId \"{}\" when the current lane is complete.",
214 next_column_id.unwrap_or("the exact next column id listed above")
215 ),
216 String::new(),
217 "## Instructions".to_string(),
218 String::new(),
219 "1. Start work for the current lane immediately.".to_string(),
220 "2. Keep changes focused on this card only.".to_string(),
221 "3. Use the exact tool name `routa-coordination_update_card` to record progress on this card.".to_string(),
222 format!(
223 "4. Use the exact tool name `routa-coordination_move_card` with targetColumnId `{}` only when the current lane is complete.",
224 next_column_id.unwrap_or("the exact next column id listed above")
225 ),
226 "5. Do not guess board ids or column ids. Use the Board ID and Board Columns listed above.".to_string(),
227 "6. If blocked, update this same card with the blocking reason instead of exploring side quests.".to_string(),
228 "7. Treat lane guidance as stricter than the general card objective when they conflict.".to_string(),
229 "8. Do not run browser tests or environment diagnostics unless the card explicitly asks for them.".to_string(),
230 ]);
231
232 sections.join("\n")
233}
234
235async fn trigger_assigned_task_acp_agent(
236 state: &AppState,
237 task: &mut Task,
238 board: Option<&KanbanBoard>,
239 step: Option<&KanbanAutomationStep>,
240 cwd: Option<&str>,
241 branch: Option<&str>,
242) -> Result<(), String> {
243 let provider = task
244 .assigned_provider
245 .clone()
246 .unwrap_or_else(|| "opencode".to_string());
247 let role = task
248 .assigned_role
249 .clone()
250 .unwrap_or_else(|| "CRAFTER".to_string())
251 .to_uppercase();
252 let session_id = uuid::Uuid::new_v4().to_string();
253 let cwd = cwd
254 .map(|value| value.to_string())
255 .or_else(|| {
256 std::env::current_dir()
257 .ok()
258 .map(|path| path.to_string_lossy().to_string())
259 })
260 .unwrap_or_else(|| ".".to_string());
261
262 state
263 .acp_manager
264 .create_session(
265 session_id.clone(),
266 cwd.clone(),
267 task.workspace_id.clone(),
268 Some(provider.clone()),
269 Some(role.clone()),
270 None,
271 None,
272 Some("full".to_string()),
273 Some("kanban-planning".to_string()),
274 )
275 .await
276 .map_err(|error| format!("Failed to create ACP session: {}", error))?;
277
278 state
279 .acp_session_store
280 .create(CreateAcpSessionParams {
281 id: &session_id,
282 cwd: &cwd,
283 branch: None,
284 workspace_id: &task.workspace_id,
285 provider: Some(provider.as_str()),
286 role: Some(role.as_str()),
287 custom_command: None,
288 custom_args: None,
289 parent_session_id: None,
290 })
291 .await
292 .map_err(|error| format!("Failed to persist ACP session: {}", error))?;
293
294 let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
295 ordered_columns.sort_by_key(|column| column.position);
296 let next_column_id = ordered_columns
297 .iter()
298 .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
299 .and_then(|index| ordered_columns.get(index + 1))
300 .map(|column| column.id.clone());
301 let available_columns = if ordered_columns.is_empty() {
302 "- unavailable".to_string()
303 } else {
304 ordered_columns
305 .iter()
306 .map(|column| {
307 format!(
308 "- {} ({}) stage={} position={}",
309 column.id, column.name, column.stage, column.position
310 )
311 })
312 .collect::<Vec<_>>()
313 .join("\n")
314 };
315 let prompt = build_task_prompt(
316 task,
317 board
318 .map(|value| value.id.as_str())
319 .or(task.board_id.as_deref()),
320 next_column_id.as_deref(),
321 &available_columns,
322 );
323 let state_clone = state.clone();
324 let session_id_clone = session_id.clone();
325 let task_workspace = task.workspace_id.clone();
326 let provider_clone = provider.clone();
327 let cwd_clone = cwd.clone();
328 let _branch = branch.map(|value| value.to_string());
329
330 if let Err(error) = state
331 .acp_session_store
332 .set_first_prompt_sent(&session_id)
333 .await
334 {
335 tracing::error!(
336 target: "routa_kanban_prompt",
337 session_id = %session_id,
338 workspace_id = %task.workspace_id,
339 error = %error,
340 "kanban auto prompt failed to mark prompt dispatched"
341 );
342 } else {
343 tracing::info!(
344 target: "routa_kanban_prompt",
345 session_id = %session_id,
346 workspace_id = %task.workspace_id,
347 provider = %provider,
348 "kanban auto prompt marked prompt dispatched"
349 );
350 }
351
352 tracing::info!(
353 target: "routa_kanban_prompt",
354 session_id = %session_id_clone,
355 workspace_id = %task_workspace,
356 provider = %provider_clone,
357 cwd = %cwd_clone,
358 "kanban auto prompt scheduled"
359 );
360
361 tokio::spawn(async move {
362 tracing::info!(
363 target: "routa_kanban_prompt",
364 session_id = %session_id_clone,
365 workspace_id = %task_workspace,
366 provider = %provider_clone,
367 cwd = %cwd_clone,
368 "kanban auto prompt start"
369 );
370 if let Err(error) = state_clone
371 .acp_manager
372 .prompt(&session_id_clone, &prompt)
373 .await
374 {
375 tracing::error!(
376 "[kanban] Failed to auto-prompt ACP task session {} in workspace {} with provider {} at {}: {}",
377 session_id_clone,
378 task_workspace,
379 provider_clone,
380 cwd_clone,
381 error
382 );
383 return;
384 }
385
386 tracing::info!(
387 target: "routa_kanban_prompt",
388 session_id = %session_id_clone,
389 workspace_id = %task_workspace,
390 provider = %provider_clone,
391 "kanban auto prompt success"
392 );
393 if let Some(history) = state_clone
394 .acp_manager
395 .get_session_history(&session_id_clone)
396 .await
397 {
398 if let Err(error) = state_clone
399 .acp_session_store
400 .save_history(&session_id_clone, &history)
401 .await
402 {
403 tracing::error!(
404 target: "routa_kanban_prompt",
405 session_id = %session_id_clone,
406 workspace_id = %task_workspace,
407 error = %error,
408 "kanban auto prompt failed to persist history"
409 );
410 } else {
411 tracing::info!(
412 target: "routa_kanban_prompt",
413 session_id = %session_id_clone,
414 workspace_id = %task_workspace,
415 history_len = history.len(),
416 "kanban auto prompt persisted history"
417 );
418 }
419 }
420 });
421
422 apply_trigger_result(
423 task,
424 board,
425 step,
426 AgentTriggerResult {
427 session_id,
428 transport: "acp".to_string(),
429 external_task_id: None,
430 context_id: None,
431 },
432 );
433
434 Ok(())
435}
436
437async fn trigger_assigned_task_a2a_agent(
438 state: &AppState,
439 task: &mut Task,
440 board: Option<&KanbanBoard>,
441 step: Option<&KanbanAutomationStep>,
442) -> Result<(), String> {
443 let step = step.ok_or_else(|| "A2A automation requires a resolved column step".to_string())?;
444 let agent_card_url = step
445 .agent_card_url
446 .as_deref()
447 .ok_or_else(|| "A2A automation requires agentCardUrl".to_string())?;
448 let auth_headers = resolve_a2a_auth_headers(step.auth_config_id.as_deref())?;
449
450 let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
451 ordered_columns.sort_by_key(|column| column.position);
452 let next_column_id = ordered_columns
453 .iter()
454 .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
455 .and_then(|index| ordered_columns.get(index + 1))
456 .map(|column| column.id.clone());
457 let available_columns = if ordered_columns.is_empty() {
458 "- unavailable".to_string()
459 } else {
460 ordered_columns
461 .iter()
462 .map(|column| {
463 format!(
464 "- {} ({}) stage={} position={}",
465 column.id, column.name, column.stage, column.position
466 )
467 })
468 .collect::<Vec<_>>()
469 .join("\n")
470 };
471 let prompt = build_task_prompt(
472 task,
473 board
474 .map(|value| value.id.as_str())
475 .or(task.board_id.as_deref()),
476 next_column_id.as_deref(),
477 &available_columns,
478 );
479
480 let client = reqwest::Client::new();
481 let rpc_endpoint =
482 resolve_a2a_rpc_endpoint(&client, agent_card_url, auth_headers.as_ref()).await?;
483 let request_id = uuid::Uuid::new_v4().to_string();
484 let message_id = uuid::Uuid::new_v4().to_string();
485 let response = apply_a2a_auth_headers(
486 client
487 .post(&rpc_endpoint)
488 .header(CONTENT_TYPE, "application/json")
489 .header(ACCEPT, "application/json")
490 .json(&json!({
491 "jsonrpc": "2.0",
492 "id": request_id,
493 "method": "SendMessage",
494 "params": {
495 "message": {
496 "messageId": message_id,
497 "role": "user",
498 "parts": [
499 { "text": prompt }
500 ]
501 },
502 "metadata": {
503 "workspaceId": task.workspace_id,
504 "taskId": task.id,
505 "boardId": task.board_id,
506 "columnId": task.column_id,
507 "stepId": step.id,
508 "skillId": step.skill_id,
509 "authConfigId": step.auth_config_id,
510 "role": task.assigned_role,
511 }
512 }
513 })),
514 auth_headers.as_ref(),
515 )?
516 .send()
517 .await
518 .map_err(|error| format!("Failed to send A2A request: {}", error))?;
519
520 if !response.status().is_success() {
521 return Err(format!(
522 "A2A request failed with HTTP {}",
523 response.status().as_u16()
524 ));
525 }
526
527 let payload: Value = response
528 .json()
529 .await
530 .map_err(|error| format!("Failed to decode A2A response: {}", error))?;
531 if let Some(error) = payload.get("error") {
532 let message = error
533 .get("message")
534 .and_then(Value::as_str)
535 .unwrap_or("unknown A2A error");
536 return Err(format!("A2A JSON-RPC error: {}", message));
537 }
538
539 let task_result = payload
540 .get("result")
541 .and_then(|value| value.get("task"))
542 .ok_or_else(|| "A2A response missing result.task".to_string())?;
543 let external_task_id = task_result
544 .get("id")
545 .and_then(Value::as_str)
546 .ok_or_else(|| "A2A response missing task.id".to_string())?
547 .to_string();
548 let context_id = task_result
549 .get("contextId")
550 .and_then(Value::as_str)
551 .map(ToOwned::to_owned);
552 let session_id = format!("a2a-{}", uuid::Uuid::new_v4());
553
554 apply_trigger_result(
555 task,
556 board,
557 Some(step),
558 AgentTriggerResult {
559 session_id: session_id.clone(),
560 transport: "a2a".to_string(),
561 external_task_id: Some(external_task_id.clone()),
562 context_id,
563 },
564 );
565
566 let state_clone = state.clone();
567 let task_id = task.id.clone();
568 let workspace_id = task.workspace_id.clone();
569 tokio::spawn(async move {
570 monitor_a2a_task_completion(
571 &state_clone,
572 &workspace_id,
573 &task_id,
574 &session_id,
575 &rpc_endpoint,
576 &external_task_id,
577 auth_headers,
578 )
579 .await;
580 });
581
582 Ok(())
583}
584
585#[derive(Debug)]
586struct AgentTriggerResult {
587 session_id: String,
588 transport: String,
589 external_task_id: Option<String>,
590 context_id: Option<String>,
591}
592
593fn apply_trigger_result(
594 task: &mut Task,
595 board: Option<&KanbanBoard>,
596 step: Option<&KanbanAutomationStep>,
597 result: AgentTriggerResult,
598) {
599 task.trigger_session_id = Some(result.session_id.clone());
600 if !task.session_ids.iter().any(|id| id == &result.session_id) {
601 task.session_ids.push(result.session_id.clone());
602 }
603
604 let column_name = board.and_then(|value| {
605 value.columns.iter().find_map(|column| {
606 (Some(column.id.as_str()) == task.column_id.as_deref()).then(|| column.name.clone())
607 })
608 });
609 let lane_session = TaskLaneSession {
610 session_id: result.session_id.clone(),
611 routa_agent_id: None,
612 column_id: task.column_id.clone(),
613 column_name,
614 step_id: step.map(|value| value.id.clone()),
615 step_index: None,
616 step_name: step
617 .and_then(|value| value.specialist_name.clone())
618 .or_else(|| task.assigned_specialist_name.clone()),
619 provider: task.assigned_provider.clone(),
620 role: task.assigned_role.clone(),
621 specialist_id: task.assigned_specialist_id.clone(),
622 specialist_name: task.assigned_specialist_name.clone(),
623 transport: Some(result.transport),
624 external_task_id: result.external_task_id,
625 context_id: result.context_id,
626 attempt: Some(1),
627 loop_mode: None,
628 completion_requirement: None,
629 objective: Some(task.objective.clone()),
630 last_activity_at: Some(Utc::now().to_rfc3339()),
631 recovered_from_session_id: None,
632 recovery_reason: None,
633 status: TaskLaneSessionStatus::Running,
634 started_at: Utc::now().to_rfc3339(),
635 completed_at: None,
636 };
637
638 if let Some(existing) = task
639 .lane_sessions
640 .iter_mut()
641 .find(|existing| existing.session_id == result.session_id)
642 {
643 *existing = lane_session;
644 } else {
645 task.lane_sessions.push(lane_session);
646 }
647}
648
649#[derive(Debug)]
650struct A2ATaskTerminalUpdate {
651 status: TaskLaneSessionStatus,
652 completed_at: String,
653 last_activity_at: String,
654 context_id: Option<String>,
655 error: Option<String>,
656}
657
658async fn monitor_a2a_task_completion(
659 state: &AppState,
660 workspace_id: &str,
661 task_id: &str,
662 session_id: &str,
663 rpc_endpoint: &str,
664 external_task_id: &str,
665 auth_headers: Option<HashMap<String, String>>,
666) {
667 let client = reqwest::Client::new();
668 let terminal = match wait_for_a2a_completion(
669 &client,
670 rpc_endpoint,
671 external_task_id,
672 auth_headers.as_ref(),
673 )
674 .await
675 {
676 Ok(terminal) => terminal,
677 Err(error) => {
678 let now = Utc::now().to_rfc3339();
679 let status = if error.contains("did not complete within") {
680 TaskLaneSessionStatus::TimedOut
681 } else {
682 TaskLaneSessionStatus::Failed
683 };
684 A2ATaskTerminalUpdate {
685 status,
686 completed_at: now.clone(),
687 last_activity_at: now,
688 context_id: None,
689 error: Some(error),
690 }
691 }
692 };
693
694 if let Err(error) =
695 reconcile_a2a_lane_session(state, task_id, session_id, external_task_id, terminal).await
696 {
697 tracing::warn!(
698 target: "routa_a2a",
699 workspace_id = %workspace_id,
700 task_id = %task_id,
701 session_id = %session_id,
702 external_task_id = %external_task_id,
703 error = %error,
704 "failed to persist A2A terminal state"
705 );
706 return;
707 }
708
709 emit_kanban_workspace_event(state, workspace_id, task_id).await;
710}
711
712async fn wait_for_a2a_completion(
713 client: &reqwest::Client,
714 rpc_endpoint: &str,
715 task_id: &str,
716 auth_headers: Option<&HashMap<String, String>>,
717) -> Result<A2ATaskTerminalUpdate, String> {
718 let started_at = Instant::now();
719
720 loop {
721 let terminal = get_a2a_task_update(client, rpc_endpoint, task_id, auth_headers).await?;
722 if let Some(terminal) = terminal {
723 return Ok(terminal);
724 }
725 if started_at.elapsed() >= A2A_MAX_WAIT {
726 return Err(format!(
727 "A2A task {task_id} did not complete within {}ms",
728 A2A_MAX_WAIT.as_millis()
729 ));
730 }
731 tokio::time::sleep(A2A_POLL_INTERVAL).await;
732 }
733}
734
735async fn get_a2a_task_update(
736 client: &reqwest::Client,
737 rpc_endpoint: &str,
738 task_id: &str,
739 auth_headers: Option<&HashMap<String, String>>,
740) -> Result<Option<A2ATaskTerminalUpdate>, String> {
741 let request_id = uuid::Uuid::new_v4().to_string();
742 let response = apply_a2a_auth_headers(
743 client
744 .post(rpc_endpoint)
745 .header(CONTENT_TYPE, "application/json")
746 .header(ACCEPT, "application/json")
747 .json(&json!({
748 "jsonrpc": "2.0",
749 "id": request_id,
750 "method": "GetTask",
751 "params": { "id": task_id }
752 })),
753 auth_headers,
754 )?
755 .send()
756 .await
757 .map_err(|error| format!("Failed to poll A2A task: {}", error))?;
758
759 if !response.status().is_success() {
760 return Err(format!(
761 "A2A GetTask failed with HTTP {}",
762 response.status().as_u16()
763 ));
764 }
765
766 let payload: Value = response
767 .json()
768 .await
769 .map_err(|error| format!("Failed to decode A2A task payload: {}", error))?;
770 if let Some(error) = payload.get("error") {
771 let message = error
772 .get("message")
773 .and_then(Value::as_str)
774 .unwrap_or("unknown A2A error");
775 return Err(format!("A2A JSON-RPC error: {}", message));
776 }
777
778 let task = payload
779 .get("result")
780 .and_then(|value| value.get("task"))
781 .ok_or_else(|| "A2A response missing result.task".to_string())?;
782 let state = task
783 .get("status")
784 .and_then(|value| value.get("state"))
785 .and_then(Value::as_str)
786 .ok_or_else(|| "A2A task missing status.state".to_string())?;
787 if !is_terminal_a2a_state(state) {
788 return Ok(None);
789 }
790
791 let timestamp = task
792 .get("status")
793 .and_then(|value| value.get("timestamp"))
794 .and_then(Value::as_str)
795 .map(ToOwned::to_owned)
796 .unwrap_or_else(|| Utc::now().to_rfc3339());
797 let context_id = task
798 .get("contextId")
799 .and_then(Value::as_str)
800 .map(ToOwned::to_owned);
801 let error = if state == "completed" {
802 None
803 } else {
804 Some(
805 extract_a2a_status_message(task)
806 .unwrap_or_else(|| format!("A2A task ended in state: {state}")),
807 )
808 };
809
810 Ok(Some(A2ATaskTerminalUpdate {
811 status: map_a2a_terminal_status(state),
812 completed_at: timestamp.clone(),
813 last_activity_at: timestamp,
814 context_id,
815 error,
816 }))
817}
818
819fn extract_a2a_status_message(task: &Value) -> Option<String> {
820 let parts = task
821 .get("status")
822 .and_then(|value| value.get("message"))
823 .and_then(|value| value.get("parts"))
824 .and_then(Value::as_array)?;
825 let text = parts
826 .iter()
827 .filter_map(|part| part.get("text").and_then(Value::as_str))
828 .map(str::trim)
829 .filter(|part| !part.is_empty())
830 .collect::<Vec<_>>()
831 .join(" ");
832 (!text.is_empty()).then_some(text)
833}
834
835fn is_terminal_a2a_state(state: &str) -> bool {
836 matches!(
837 state,
838 "completed" | "failed" | "canceled" | "rejected" | "auth-required"
839 )
840}
841
842fn map_a2a_terminal_status(state: &str) -> TaskLaneSessionStatus {
843 match state {
844 "completed" => TaskLaneSessionStatus::Completed,
845 _ => TaskLaneSessionStatus::Failed,
846 }
847}
848
849async fn reconcile_a2a_lane_session(
850 state: &AppState,
851 task_id: &str,
852 session_id: &str,
853 external_task_id: &str,
854 terminal: A2ATaskTerminalUpdate,
855) -> Result<(), String> {
856 let mut task = wait_for_task_persistence(state, task_id, session_id).await?;
857 let lane_session = task
858 .lane_sessions
859 .iter_mut()
860 .find(|session| session.session_id == session_id)
861 .ok_or_else(|| format!("Task {task_id} missing lane session {session_id}"))?;
862
863 lane_session.status = terminal.status;
864 lane_session.completed_at = Some(terminal.completed_at.clone());
865 lane_session.last_activity_at = Some(terminal.last_activity_at.clone());
866 if lane_session.external_task_id.is_none() {
867 lane_session.external_task_id = Some(external_task_id.to_string());
868 }
869 if terminal.context_id.is_some() {
870 lane_session.context_id = terminal.context_id.clone();
871 }
872
873 if task.trigger_session_id.as_deref() == Some(session_id) {
874 task.trigger_session_id = None;
875 }
876 task.last_sync_error = terminal.error;
877 task.updated_at = Utc::now();
878
879 state
880 .task_store
881 .save(&task)
882 .await
883 .map_err(|error| format!("Failed to save A2A task reconciliation: {}", error))
884}
885
886async fn wait_for_task_persistence(
887 state: &AppState,
888 task_id: &str,
889 session_id: &str,
890) -> Result<Task, String> {
891 for _ in 0..20 {
892 if let Some(task) = state
893 .task_store
894 .get(task_id)
895 .await
896 .map_err(|error| format!("Failed to load task {task_id}: {}", error))?
897 {
898 if task
899 .lane_sessions
900 .iter()
901 .any(|session| session.session_id == session_id)
902 {
903 return Ok(task);
904 }
905 }
906 tokio::time::sleep(Duration::from_millis(50)).await;
907 }
908
909 Err(format!(
910 "Task {task_id} did not persist lane session {session_id} before A2A reconciliation"
911 ))
912}
913
914async fn emit_kanban_workspace_event(state: &AppState, workspace_id: &str, task_id: &str) {
915 state
916 .event_bus
917 .emit(AgentEvent {
918 event_type: AgentEventType::WorkspaceUpdated,
919 agent_id: "kanban-a2a".to_string(),
920 workspace_id: workspace_id.to_string(),
921 data: serde_json::json!({
922 "scope": "kanban",
923 "entity": "task",
924 "action": "updated",
925 "resourceId": task_id,
926 "source": "system",
927 }),
928 timestamp: Utc::now(),
929 })
930 .await;
931}
932
933async fn load_task_board(state: &AppState, task: &Task) -> Result<Option<KanbanBoard>, String> {
934 if let Some(board_id) = task.board_id.as_deref() {
935 state
936 .kanban_store
937 .get(board_id)
938 .await
939 .map_err(|error| format!("Failed to load Kanban board for automation: {}", error))
940 } else {
941 Ok(None)
942 }
943}
944
945fn resolve_task_automation_step(
946 board: Option<&KanbanBoard>,
947 task: &Task,
948) -> Option<KanbanAutomationStep> {
949 board
950 .and_then(|value| {
951 value
952 .columns
953 .iter()
954 .find(|column| Some(column.id.as_str()) == task.column_id.as_deref())
955 })
956 .and_then(|column| column.automation.as_ref())
957 .filter(|automation| automation.enabled)
958 .and_then(|automation| automation.primary_step())
959}
960
961fn is_a2a_step(step: Option<&KanbanAutomationStep>) -> bool {
962 step.is_some_and(|value| {
963 matches!(value.transport, Some(KanbanTransport::A2a)) || value.agent_card_url.is_some()
964 })
965}
966
967fn resolve_a2a_auth_headers(
968 auth_config_id: Option<&str>,
969) -> Result<Option<HashMap<String, String>>, String> {
970 let Some(auth_config_id) = auth_config_id
971 .map(str::trim)
972 .filter(|value| !value.is_empty())
973 else {
974 return Ok(None);
975 };
976 let raw = std::env::var(A2A_AUTH_CONFIGS_ENV).unwrap_or_default();
977 if raw.trim().is_empty() {
978 return Err(format!(
979 "A2A auth config \"{}\" was not found in {}.",
980 auth_config_id, A2A_AUTH_CONFIGS_ENV
981 ));
982 }
983
984 let parsed: Value = serde_json::from_str(&raw)
985 .map_err(|error| format!("Invalid {} JSON: {}", A2A_AUTH_CONFIGS_ENV, error))?;
986 let config = parsed.get(auth_config_id).ok_or_else(|| {
987 format!(
988 "A2A auth config \"{}\" was not found in {}.",
989 auth_config_id, A2A_AUTH_CONFIGS_ENV
990 )
991 })?;
992 let headers = config.get("headers").unwrap_or(config);
993 let headers_obj = headers.as_object().ok_or_else(|| {
994 format!(
995 "{}.{} must be a header map or contain a string header map in \"headers\".",
996 A2A_AUTH_CONFIGS_ENV, auth_config_id
997 )
998 })?;
999
1000 let mut resolved = HashMap::new();
1001 for (name, value) in headers_obj {
1002 let value = value.as_str().ok_or_else(|| {
1003 format!(
1004 "{}.{} header {} must be a string.",
1005 A2A_AUTH_CONFIGS_ENV, auth_config_id, name
1006 )
1007 })?;
1008 resolved.insert(name.clone(), value.to_string());
1009 }
1010
1011 Ok(Some(resolved))
1012}
1013
1014fn apply_a2a_auth_headers(
1015 mut request: reqwest::RequestBuilder,
1016 auth_headers: Option<&HashMap<String, String>>,
1017) -> Result<reqwest::RequestBuilder, String> {
1018 if let Some(auth_headers) = auth_headers {
1019 for (name, value) in auth_headers {
1020 let header_name = HeaderName::try_from(name.as_str())
1021 .map_err(|error| format!("Invalid A2A auth header name {}: {}", name, error))?;
1022 let header_value = HeaderValue::from_str(value).map_err(|error| {
1023 format!(
1024 "Invalid A2A auth header value for {}: {}",
1025 header_name.as_str(),
1026 error
1027 )
1028 })?;
1029 request = request.header(header_name, header_value);
1030 }
1031 }
1032
1033 Ok(request)
1034}
1035
1036async fn resolve_a2a_rpc_endpoint(
1037 client: &reqwest::Client,
1038 url: &str,
1039 auth_headers: Option<&HashMap<String, String>>,
1040) -> Result<String, String> {
1041 if url.ends_with(".json") || url.ends_with("/agent-card") || url.ends_with("/card") {
1042 let response = apply_a2a_auth_headers(
1043 client.get(url).header(ACCEPT, "application/json"),
1044 auth_headers,
1045 )?
1046 .send()
1047 .await
1048 .map_err(|error| format!("Failed to fetch A2A agent card: {}", error))?;
1049 if !response.status().is_success() {
1050 return Err(format!(
1051 "A2A agent card fetch failed with HTTP {}",
1052 response.status().as_u16()
1053 ));
1054 }
1055 let card: Value = response
1056 .json()
1057 .await
1058 .map_err(|error| format!("Failed to decode A2A agent card: {}", error))?;
1059 let rpc_url = card
1060 .get("url")
1061 .and_then(Value::as_str)
1062 .ok_or_else(|| "A2A agent card missing url".to_string())?;
1063 absolutize_url(url, rpc_url)
1064 } else {
1065 Ok(url.to_string())
1066 }
1067}
1068
1069fn absolutize_url(base_url: &str, maybe_relative: &str) -> Result<String, String> {
1070 if maybe_relative.starts_with("http://") || maybe_relative.starts_with("https://") {
1071 return Ok(maybe_relative.to_string());
1072 }
1073
1074 let base = reqwest::Url::parse(base_url)
1075 .map_err(|error| format!("Invalid base A2A URL {}: {}", base_url, error))?;
1076 base.join(maybe_relative)
1077 .map(|url| url.to_string())
1078 .map_err(|error| format!("Invalid relative A2A URL {}: {}", maybe_relative, error))
1079}