1use chrono::Utc;
2use reqwest::header::{ACCEPT, CONTENT_TYPE};
3use routa_core::models::kanban::{KanbanAutomationStep, KanbanBoard, KanbanTransport};
4use serde_json::{json, Value};
5
6use crate::error::ServerError;
7use crate::models::task::{Task, TaskLaneSession, TaskLaneSessionStatus};
8use crate::state::AppState;
9
10pub async fn resolve_codebase(
11 state: &AppState,
12 workspace_id: &str,
13 repo_path: Option<&str>,
14) -> Result<Option<crate::models::codebase::Codebase>, ServerError> {
15 if let Some(path) = repo_path {
16 state
17 .codebase_store
18 .find_by_repo_path(workspace_id, path)
19 .await
20 } else {
21 state.codebase_store.get_default(workspace_id).await
22 }
23}
24
25pub async fn auto_create_worktree(
26 state: &AppState,
27 task: &crate::models::task::Task,
28 codebase: &crate::models::codebase::Codebase,
29) -> Result<String, String> {
30 let slugified = task
31 .title
32 .to_lowercase()
33 .chars()
34 .map(|c| if c.is_alphanumeric() { c } else { '-' })
35 .collect::<String>()
36 .split('-')
37 .filter(|s| !s.is_empty())
38 .collect::<Vec<_>>()
39 .join("-");
40 let short_id = &task.id[..task.id.len().min(8)];
41 let slug = format!("{}-{}", short_id, &slugified[..slugified.len().min(40)]);
42 let branch = format!("issue/{}", slug);
43
44 let workspace = state
45 .workspace_store
46 .get(&task.workspace_id)
47 .await
48 .ok()
49 .flatten();
50 let worktree_root = workspace
51 .as_ref()
52 .and_then(|ws| ws.metadata.get("worktreeRoot"))
53 .filter(|s| !s.trim().is_empty())
54 .map(std::path::PathBuf::from)
55 .unwrap_or_else(|| crate::git::get_default_workspace_worktree_root(&task.workspace_id));
56
57 let codebase_label = codebase
58 .label
59 .as_ref()
60 .map(|l| crate::git::branch_to_safe_dir_name(l))
61 .unwrap_or_else(|| crate::git::branch_to_safe_dir_name(&codebase.id));
62
63 let worktree_path = worktree_root
64 .join(&codebase_label)
65 .join(crate::git::branch_to_safe_dir_name(&slug));
66
67 if let Some(parent) = worktree_path.parent() {
68 std::fs::create_dir_all(parent)
69 .map_err(|e| format!("Failed to create worktree parent dir: {}", e))?;
70 }
71
72 let worktree_path_str = worktree_path.to_string_lossy().to_string();
73 let base_branch = codebase
74 .branch
75 .clone()
76 .unwrap_or_else(|| "main".to_string());
77
78 let worktree = crate::models::worktree::Worktree::new(
79 uuid::Uuid::new_v4().to_string(),
80 codebase.id.clone(),
81 task.workspace_id.clone(),
82 worktree_path_str.clone(),
83 branch.clone(),
84 base_branch.clone(),
85 Some(slug),
86 );
87 state
88 .worktree_store
89 .save(&worktree)
90 .await
91 .map_err(|e| format!("Failed to save worktree: {}", e))?;
92
93 let _ = crate::git::worktree_prune(&codebase.repo_path);
94 crate::git::worktree_add(
95 &codebase.repo_path,
96 &worktree_path_str,
97 &branch,
98 &base_branch,
99 false,
100 )
101 .map_err(|e| format!("git worktree add failed: {}", e))?;
102
103 Ok(worktree.id)
104}
105
106pub async fn trigger_assigned_task_agent(
107 state: &AppState,
108 task: &mut Task,
109 cwd: Option<&str>,
110 branch: Option<&str>,
111) -> Result<(), String> {
112 let board = load_task_board(state, task).await?;
113 let step = resolve_task_automation_step(board.as_ref(), task);
114 if is_a2a_step(step.as_ref()) {
115 return trigger_assigned_task_a2a_agent(state, task, board.as_ref(), step.as_ref()).await;
116 }
117
118 trigger_assigned_task_acp_agent(state, task, board.as_ref(), step.as_ref(), cwd, branch).await
119}
120
121fn build_task_prompt(
122 task: &Task,
123 board_id: Option<&str>,
124 next_column_id: Option<&str>,
125 available_columns: &str,
126) -> String {
127 let labels = if task.labels.is_empty() {
128 "Labels: none".to_string()
129 } else {
130 format!("Labels: {}", task.labels.join(", "))
131 };
132 let lane_id = task.column_id.as_deref().unwrap_or("backlog");
133 let lane_guidance = match lane_id {
134 "dev" => vec![
135 "You are in the `dev` lane. This lane may implement the requested change, but you must keep work scoped to the current card.".to_string(),
136 "Use `routa-coordination_update_card` to record concrete progress on this card before or after meaningful implementation steps.".to_string(),
137 "When implementation for this lane is complete, use `routa-coordination_move_card` to advance the same card.".to_string(),
138 ],
139 "todo" => vec![
140 "You are in the `todo` lane. This lane does not perform full implementation work.".to_string(),
141 "Only clarify the card, update its progress or status, and move the same card forward when the lane is complete.".to_string(),
142 "Do not edit files, do not inspect the whole repository, and do not run browser tests or environment diagnostics in this lane.".to_string(),
143 ],
144 _ => vec![
145 format!("You are in the `{lane_id}` lane. Keep work scoped to the current card and this lane only."),
146 ],
147 };
148 let mut sections = vec![
149 format!("You are assigned to Kanban task: {}", task.title),
150 String::new(),
151 "## Context".to_string(),
152 String::new(),
153 "**IMPORTANT**: You are working in Kanban lane automation for exactly one existing card.".to_string(),
154 "Only operate on the current card. Do not create a new task, do not switch to a different card, and do not broaden scope.".to_string(),
155 "Use the exact MCP tool names exposed by the provider. In OpenCode, prefer `routa-coordination_update_card` and `routa-coordination_move_card`.".to_string(),
156 "Do NOT use `gh issue create`, browser automation, Playwright, repo-wide debugging, API exploration, or unrelated codebase research unless the card objective explicitly requires it.".to_string(),
157 String::new(),
158 "## Task Details".to_string(),
159 String::new(),
160 format!("**Card ID:** {}", task.id),
161 format!(
162 "**Priority:** {}",
163 task.priority.as_ref().map(|value| value.as_str()).unwrap_or("medium")
164 ),
165 board_id
166 .map(|value| format!("**Board ID:** {}", value))
167 .unwrap_or_else(|| "**Board ID:** unavailable".to_string()),
168 format!("**Current Lane:** {}", lane_id),
169 next_column_id
170 .map(|value| format!("**Next Column ID:** {}", value))
171 .unwrap_or_else(|| "**Next Column ID:** unavailable".to_string()),
172 labels,
173 task.github_url
174 .as_ref()
175 .map(|url| format!("**GitHub Issue:** {}", url))
176 .unwrap_or_else(|| "**GitHub Issue:** local-only".to_string()),
177 String::new(),
178 "## Objective".to_string(),
179 String::new(),
180 task.objective.clone(),
181 String::new(),
182 "## Board Columns".to_string(),
183 String::new(),
184 available_columns.to_string(),
185 String::new(),
186 "## Lane Guidance".to_string(),
187 String::new(),
188 lane_guidance.join("\n"),
189 String::new(),
190 ];
191
192 if let Some(test_cases) = task.test_cases.as_ref().filter(|value| !value.is_empty()) {
193 sections.push("## Test Cases".to_string());
194 sections.push(String::new());
195 sections.push(
196 test_cases
197 .iter()
198 .map(|value| format!("- {}", value))
199 .collect::<Vec<_>>()
200 .join("\n"),
201 );
202 sections.push(String::new());
203 }
204
205 sections.extend([
206 "## Available MCP Tools".to_string(),
207 String::new(),
208 "Use the exact MCP tool names exposed in this session. For OpenCode, the important ones are:".to_string(),
209 String::new(),
210 format!(
211 "- **routa-coordination_update_card**: Update this card's title, description, priority, or labels. Use cardId: \"{}\"",
212 task.id
213 ),
214 format!(
215 "- **routa-coordination_move_card**: Move this same card to targetColumnId \"{}\" when the current lane is complete.",
216 next_column_id.unwrap_or("the exact next column id listed above")
217 ),
218 String::new(),
219 "## Instructions".to_string(),
220 String::new(),
221 "1. Start work for the current lane immediately.".to_string(),
222 "2. Keep changes focused on this card only.".to_string(),
223 "3. Use the exact tool name `routa-coordination_update_card` to record progress on this card.".to_string(),
224 format!(
225 "4. Use the exact tool name `routa-coordination_move_card` with targetColumnId `{}` only when the current lane is complete.",
226 next_column_id.unwrap_or("the exact next column id listed above")
227 ),
228 "5. Do not guess board ids or column ids. Use the Board ID and Board Columns listed above.".to_string(),
229 "6. If blocked, update this same card with the blocking reason instead of exploring side quests.".to_string(),
230 "7. Treat lane guidance as stricter than the general card objective when they conflict.".to_string(),
231 "8. Do not run browser tests or environment diagnostics unless the card explicitly asks for them.".to_string(),
232 ]);
233
234 sections.join("\n")
235}
236
237async fn trigger_assigned_task_acp_agent(
238 state: &AppState,
239 task: &mut Task,
240 board: Option<&KanbanBoard>,
241 step: Option<&KanbanAutomationStep>,
242 cwd: Option<&str>,
243 branch: Option<&str>,
244) -> Result<(), String> {
245 let provider = task
246 .assigned_provider
247 .clone()
248 .unwrap_or_else(|| "opencode".to_string());
249 let role = task
250 .assigned_role
251 .clone()
252 .unwrap_or_else(|| "CRAFTER".to_string())
253 .to_uppercase();
254 let session_id = uuid::Uuid::new_v4().to_string();
255 let cwd = cwd
256 .map(|value| value.to_string())
257 .or_else(|| {
258 std::env::current_dir()
259 .ok()
260 .map(|path| path.to_string_lossy().to_string())
261 })
262 .unwrap_or_else(|| ".".to_string());
263
264 state
265 .acp_manager
266 .create_session(
267 session_id.clone(),
268 cwd.clone(),
269 task.workspace_id.clone(),
270 Some(provider.clone()),
271 Some(role.clone()),
272 None,
273 None,
274 Some("full".to_string()),
275 Some("kanban-planning".to_string()),
276 )
277 .await
278 .map_err(|error| format!("Failed to create ACP session: {}", error))?;
279
280 state
281 .acp_session_store
282 .create(
283 &session_id,
284 &cwd,
285 None,
286 &task.workspace_id,
287 Some(provider.as_str()),
288 Some(role.as_str()),
289 None,
290 )
291 .await
292 .map_err(|error| format!("Failed to persist ACP session: {}", error))?;
293
294 let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
295 ordered_columns.sort_by_key(|column| column.position);
296 let next_column_id = ordered_columns
297 .iter()
298 .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
299 .and_then(|index| ordered_columns.get(index + 1))
300 .map(|column| column.id.clone());
301 let available_columns = if ordered_columns.is_empty() {
302 "- unavailable".to_string()
303 } else {
304 ordered_columns
305 .iter()
306 .map(|column| {
307 format!(
308 "- {} ({}) stage={} position={}",
309 column.id, column.name, column.stage, column.position
310 )
311 })
312 .collect::<Vec<_>>()
313 .join("\n")
314 };
315 let prompt = build_task_prompt(
316 task,
317 board
318 .map(|value| value.id.as_str())
319 .or(task.board_id.as_deref()),
320 next_column_id.as_deref(),
321 &available_columns,
322 );
323 let state_clone = state.clone();
324 let session_id_clone = session_id.clone();
325 let task_workspace = task.workspace_id.clone();
326 let provider_clone = provider.clone();
327 let cwd_clone = cwd.clone();
328 let _branch = branch.map(|value| value.to_string());
329
330 if let Err(error) = state
331 .acp_session_store
332 .set_first_prompt_sent(&session_id)
333 .await
334 {
335 tracing::error!(
336 target: "routa_kanban_prompt",
337 session_id = %session_id,
338 workspace_id = %task.workspace_id,
339 error = %error,
340 "kanban auto prompt failed to mark prompt dispatched"
341 );
342 } else {
343 tracing::info!(
344 target: "routa_kanban_prompt",
345 session_id = %session_id,
346 workspace_id = %task.workspace_id,
347 provider = %provider,
348 "kanban auto prompt marked prompt dispatched"
349 );
350 }
351
352 tracing::info!(
353 target: "routa_kanban_prompt",
354 session_id = %session_id_clone,
355 workspace_id = %task_workspace,
356 provider = %provider_clone,
357 cwd = %cwd_clone,
358 "kanban auto prompt scheduled"
359 );
360
361 tokio::spawn(async move {
362 tracing::info!(
363 target: "routa_kanban_prompt",
364 session_id = %session_id_clone,
365 workspace_id = %task_workspace,
366 provider = %provider_clone,
367 cwd = %cwd_clone,
368 "kanban auto prompt start"
369 );
370 if let Err(error) = state_clone
371 .acp_manager
372 .prompt(&session_id_clone, &prompt)
373 .await
374 {
375 tracing::error!(
376 "[kanban] Failed to auto-prompt ACP task session {} in workspace {} with provider {} at {}: {}",
377 session_id_clone,
378 task_workspace,
379 provider_clone,
380 cwd_clone,
381 error
382 );
383 return;
384 }
385
386 tracing::info!(
387 target: "routa_kanban_prompt",
388 session_id = %session_id_clone,
389 workspace_id = %task_workspace,
390 provider = %provider_clone,
391 "kanban auto prompt success"
392 );
393 if let Some(history) = state_clone
394 .acp_manager
395 .get_session_history(&session_id_clone)
396 .await
397 {
398 if let Err(error) = state_clone
399 .acp_session_store
400 .save_history(&session_id_clone, &history)
401 .await
402 {
403 tracing::error!(
404 target: "routa_kanban_prompt",
405 session_id = %session_id_clone,
406 workspace_id = %task_workspace,
407 error = %error,
408 "kanban auto prompt failed to persist history"
409 );
410 } else {
411 tracing::info!(
412 target: "routa_kanban_prompt",
413 session_id = %session_id_clone,
414 workspace_id = %task_workspace,
415 history_len = history.len(),
416 "kanban auto prompt persisted history"
417 );
418 }
419 }
420 });
421
422 apply_trigger_result(
423 task,
424 board,
425 step,
426 AgentTriggerResult {
427 session_id,
428 transport: "acp".to_string(),
429 external_task_id: None,
430 context_id: None,
431 },
432 );
433
434 Ok(())
435}
436
437async fn trigger_assigned_task_a2a_agent(
438 _state: &AppState,
439 task: &mut Task,
440 board: Option<&KanbanBoard>,
441 step: Option<&KanbanAutomationStep>,
442) -> Result<(), String> {
443 let step = step.ok_or_else(|| "A2A automation requires a resolved column step".to_string())?;
444 let agent_card_url = step
445 .agent_card_url
446 .as_deref()
447 .ok_or_else(|| "A2A automation requires agentCardUrl".to_string())?;
448
449 let mut ordered_columns = board.map(|value| value.columns.clone()).unwrap_or_default();
450 ordered_columns.sort_by_key(|column| column.position);
451 let next_column_id = ordered_columns
452 .iter()
453 .position(|column| Some(column.id.as_str()) == task.column_id.as_deref())
454 .and_then(|index| ordered_columns.get(index + 1))
455 .map(|column| column.id.clone());
456 let available_columns = if ordered_columns.is_empty() {
457 "- unavailable".to_string()
458 } else {
459 ordered_columns
460 .iter()
461 .map(|column| {
462 format!(
463 "- {} ({}) stage={} position={}",
464 column.id, column.name, column.stage, column.position
465 )
466 })
467 .collect::<Vec<_>>()
468 .join("\n")
469 };
470 let prompt = build_task_prompt(
471 task,
472 board
473 .map(|value| value.id.as_str())
474 .or(task.board_id.as_deref()),
475 next_column_id.as_deref(),
476 &available_columns,
477 );
478
479 let client = reqwest::Client::new();
480 let rpc_endpoint = resolve_a2a_rpc_endpoint(&client, agent_card_url).await?;
481 let request_id = uuid::Uuid::new_v4().to_string();
482 let message_id = uuid::Uuid::new_v4().to_string();
483 let response = client
484 .post(&rpc_endpoint)
485 .header(CONTENT_TYPE, "application/json")
486 .header(ACCEPT, "application/json")
487 .json(&json!({
488 "jsonrpc": "2.0",
489 "id": request_id,
490 "method": "SendMessage",
491 "params": {
492 "message": {
493 "messageId": message_id,
494 "role": "user",
495 "parts": [
496 { "text": prompt }
497 ]
498 },
499 "metadata": {
500 "workspaceId": task.workspace_id,
501 "taskId": task.id,
502 "boardId": task.board_id,
503 "columnId": task.column_id,
504 "stepId": step.id,
505 "skillId": step.skill_id,
506 "authConfigId": step.auth_config_id,
507 "role": task.assigned_role,
508 }
509 }
510 }))
511 .send()
512 .await
513 .map_err(|error| format!("Failed to send A2A request: {}", error))?;
514
515 if !response.status().is_success() {
516 return Err(format!(
517 "A2A request failed with HTTP {}",
518 response.status().as_u16()
519 ));
520 }
521
522 let payload: Value = response
523 .json()
524 .await
525 .map_err(|error| format!("Failed to decode A2A response: {}", error))?;
526 if let Some(error) = payload.get("error") {
527 let message = error
528 .get("message")
529 .and_then(Value::as_str)
530 .unwrap_or("unknown A2A error");
531 return Err(format!("A2A JSON-RPC error: {}", message));
532 }
533
534 let task_result = payload
535 .get("result")
536 .and_then(|value| value.get("task"))
537 .ok_or_else(|| "A2A response missing result.task".to_string())?;
538 let external_task_id = task_result
539 .get("id")
540 .and_then(Value::as_str)
541 .ok_or_else(|| "A2A response missing task.id".to_string())?
542 .to_string();
543 let context_id = task_result
544 .get("contextId")
545 .and_then(Value::as_str)
546 .map(ToOwned::to_owned);
547 let session_id = format!("a2a-{}", uuid::Uuid::new_v4());
548
549 apply_trigger_result(
550 task,
551 board,
552 Some(step),
553 AgentTriggerResult {
554 session_id,
555 transport: "a2a".to_string(),
556 external_task_id: Some(external_task_id),
557 context_id,
558 },
559 );
560
561 Ok(())
562}
563
564#[derive(Debug)]
565struct AgentTriggerResult {
566 session_id: String,
567 transport: String,
568 external_task_id: Option<String>,
569 context_id: Option<String>,
570}
571
572fn apply_trigger_result(
573 task: &mut Task,
574 board: Option<&KanbanBoard>,
575 step: Option<&KanbanAutomationStep>,
576 result: AgentTriggerResult,
577) {
578 task.trigger_session_id = Some(result.session_id.clone());
579 if !task.session_ids.iter().any(|id| id == &result.session_id) {
580 task.session_ids.push(result.session_id.clone());
581 }
582
583 let column_name = board.and_then(|value| {
584 value.columns.iter().find_map(|column| {
585 (Some(column.id.as_str()) == task.column_id.as_deref()).then(|| column.name.clone())
586 })
587 });
588 let lane_session = TaskLaneSession {
589 session_id: result.session_id.clone(),
590 routa_agent_id: None,
591 column_id: task.column_id.clone(),
592 column_name,
593 step_id: step.map(|value| value.id.clone()),
594 step_index: None,
595 step_name: step
596 .and_then(|value| value.specialist_name.clone())
597 .or_else(|| task.assigned_specialist_name.clone()),
598 provider: task.assigned_provider.clone(),
599 role: task.assigned_role.clone(),
600 specialist_id: task.assigned_specialist_id.clone(),
601 specialist_name: task.assigned_specialist_name.clone(),
602 transport: Some(result.transport),
603 external_task_id: result.external_task_id,
604 context_id: result.context_id,
605 attempt: Some(1),
606 loop_mode: None,
607 completion_requirement: None,
608 objective: Some(task.objective.clone()),
609 last_activity_at: Some(Utc::now().to_rfc3339()),
610 recovered_from_session_id: None,
611 recovery_reason: None,
612 status: TaskLaneSessionStatus::Running,
613 started_at: Utc::now().to_rfc3339(),
614 completed_at: None,
615 };
616
617 if let Some(existing) = task
618 .lane_sessions
619 .iter_mut()
620 .find(|existing| existing.session_id == result.session_id)
621 {
622 *existing = lane_session;
623 } else {
624 task.lane_sessions.push(lane_session);
625 }
626}
627
628async fn load_task_board(state: &AppState, task: &Task) -> Result<Option<KanbanBoard>, String> {
629 if let Some(board_id) = task.board_id.as_deref() {
630 state
631 .kanban_store
632 .get(board_id)
633 .await
634 .map_err(|error| format!("Failed to load Kanban board for automation: {}", error))
635 } else {
636 Ok(None)
637 }
638}
639
640fn resolve_task_automation_step(
641 board: Option<&KanbanBoard>,
642 task: &Task,
643) -> Option<KanbanAutomationStep> {
644 board
645 .and_then(|value| {
646 value
647 .columns
648 .iter()
649 .find(|column| Some(column.id.as_str()) == task.column_id.as_deref())
650 })
651 .and_then(|column| column.automation.as_ref())
652 .filter(|automation| automation.enabled)
653 .and_then(|automation| automation.primary_step())
654}
655
656fn is_a2a_step(step: Option<&KanbanAutomationStep>) -> bool {
657 step.is_some_and(|value| {
658 matches!(value.transport, Some(KanbanTransport::A2a)) || value.agent_card_url.is_some()
659 })
660}
661
662async fn resolve_a2a_rpc_endpoint(client: &reqwest::Client, url: &str) -> Result<String, String> {
663 if url.ends_with(".json") || url.ends_with("/agent-card") || url.ends_with("/card") {
664 let response = client
665 .get(url)
666 .header(ACCEPT, "application/json")
667 .send()
668 .await
669 .map_err(|error| format!("Failed to fetch A2A agent card: {}", error))?;
670 if !response.status().is_success() {
671 return Err(format!(
672 "A2A agent card fetch failed with HTTP {}",
673 response.status().as_u16()
674 ));
675 }
676 let card: Value = response
677 .json()
678 .await
679 .map_err(|error| format!("Failed to decode A2A agent card: {}", error))?;
680 let rpc_url = card
681 .get("url")
682 .and_then(Value::as_str)
683 .ok_or_else(|| "A2A agent card missing url".to_string())?;
684 absolutize_url(url, rpc_url)
685 } else {
686 Ok(url.to_string())
687 }
688}
689
690fn absolutize_url(base_url: &str, maybe_relative: &str) -> Result<String, String> {
691 if maybe_relative.starts_with("http://") || maybe_relative.starts_with("https://") {
692 return Ok(maybe_relative.to_string());
693 }
694
695 let base = reqwest::Url::parse(base_url)
696 .map_err(|error| format!("Invalid base A2A URL {}: {}", base_url, error))?;
697 base.join(maybe_relative)
698 .map(|url| url.to_string())
699 .map_err(|error| format!("Invalid relative A2A URL {}: {}", maybe_relative, error))
700}