1mod clone_location;
4mod clone_target;
5
6use crate::a2a::claim::TaskClaimResponse;
7use crate::a2a::git_credentials::{
8 configure_repo_git_auth, configure_repo_git_github_app, write_git_credential_helper_script,
9};
10use crate::a2a::worker_workspace_record::{RegisteredWorkspaceRecord, fetch_workspace_record};
11use crate::bus::AgentBus;
12use crate::cli::{A2aArgs, ForageArgs};
13use crate::provenance::{ClaimProvenance, install_commit_msg_hook};
14use crate::provider::ProviderRegistry;
15use crate::session::Session;
16use crate::swarm::{DecompositionStrategy, SwarmConfig, SwarmExecutor};
17use crate::tui::swarm_view::SwarmEvent;
18use anyhow::{Context, Result};
19use futures::StreamExt;
20use reqwest::Client;
21use serde::Deserialize;
22use std::collections::HashMap;
23use std::collections::HashSet;
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::process::Command;
28use tokio::sync::{Mutex, mpsc};
29use tokio::task::JoinHandle;
30use tokio::time::Instant;
31
32use crate::a2a::worker_tool_registry::{create_filtered_registry, is_tool_allowed};
33use crate::a2a::worker_workspace_context::resolve_task_workspace_dir;
34use clone_location::{git_clone_base_dir, resolve_workspace_clone_path};
35use clone_target::prepare_clone_target;
36
/// Coarse activity state of the worker, reported in heartbeats.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStatus {
    Idle,
    Processing,
}

impl WorkerStatus {
    /// Wire-format label for this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Idle => "idle",
            Self::Processing => "processing",
        }
    }
}
52
/// Cloneable bundle of worker liveness state shared between the stream
/// handler and the heartbeat task (see `start_heartbeat`). Mutable pieces
/// live behind `Arc<Mutex<_>>` so clones observe the same values.
#[derive(Clone)]
pub struct HeartbeatState {
    // Stable id for this worker process (env-provided or generated).
    worker_id: String,
    // Human-readable agent name chosen at startup.
    pub agent_name: String,
    // Current idle/processing state.
    pub status: Arc<Mutex<WorkerStatus>>,
    // Number of tasks currently in flight.
    pub active_task_count: Arc<Mutex<usize>>,
    // Names of currently-live sub-agents spawned by this worker.
    pub sub_agents: Arc<Mutex<HashSet<String>>>,
}
62
63impl HeartbeatState {
64 pub fn new(worker_id: String, agent_name: String) -> Self {
65 Self {
66 worker_id,
67 agent_name,
68 status: Arc::new(Mutex::new(WorkerStatus::Idle)),
69 active_task_count: Arc::new(Mutex::new(0)),
70 sub_agents: Arc::new(Mutex::new(HashSet::new())),
71 }
72 }
73
74 pub async fn set_status(&self, status: WorkerStatus) {
75 *self.status.lock().await = status;
76 }
77
78 pub async fn set_task_count(&self, count: usize) {
79 *self.active_task_count.lock().await = count;
80 }
81
82 pub async fn register_sub_agent(&self, agent_name: String) {
83 self.sub_agents.lock().await.insert(agent_name);
84 }
85
86 pub async fn deregister_sub_agent(&self, agent_name: &str) {
87 self.sub_agents.lock().await.remove(agent_name);
88 }
89
90 pub async fn sub_agents_snapshot(&self) -> Vec<String> {
91 let mut names = self
92 .sub_agents
93 .lock()
94 .await
95 .iter()
96 .cloned()
97 .collect::<Vec<_>>();
98 names.sort();
99 names
100 }
101}
102
/// Settings for sharing cognition ("thought") state upstream with heartbeats.
/// All values come from `CODETETHER_WORKER_COGNITION_*` env vars (see `from_env`).
#[derive(Clone, Debug)]
// NOTE(review): some fields appear unread in this chunk; presumably consumed
// by the heartbeat task defined elsewhere in the file — confirm before removing.
#[allow(dead_code)]
pub struct CognitionHeartbeatConfig {
    // Master switch; defaults to enabled.
    pub enabled: bool,
    // Base URL of the local cognition service, trailing '/' stripped.
    pub source_base_url: String,
    // Optional bearer token for the cognition service.
    pub token: Option<String>,
    // Provider label attached to shared cognition data (default "cognition").
    pub provider_name: String,
    // Polling interval; clamped to >= 5s in `from_env`.
    pub interval_secs: u64,
    // Whether to include a thought summary in heartbeats.
    pub include_thought_summary: bool,
    // Maximum summary length in characters.
    pub summary_max_chars: usize,
    // HTTP timeout; clamped to >= 250ms in `from_env`.
    pub request_timeout_ms: u64,
}
115
116impl CognitionHeartbeatConfig {
117 pub fn from_env() -> Self {
118 let source_base_url = std::env::var("CODETETHER_WORKER_COGNITION_SOURCE_URL")
119 .unwrap_or_else(|_| "http://127.0.0.1:4096".to_string())
120 .trim_end_matches('/')
121 .to_string();
122
123 Self {
124 enabled: env_bool("CODETETHER_WORKER_COGNITION_SHARE_ENABLED", true),
125 source_base_url,
126 include_thought_summary: env_bool("CODETETHER_WORKER_COGNITION_INCLUDE_THOUGHTS", true),
127 summary_max_chars: env_usize("CODETETHER_WORKER_COGNITION_THOUGHT_MAX_CHARS", 480),
128 request_timeout_ms: env_u64("CODETETHER_WORKER_COGNITION_TIMEOUT_MS", 2_500).max(250),
129 interval_secs: env_u64("CODETETHER_WORKER_COGNITION_INTERVAL_SECS", 30).max(5),
130 provider_name: std::env::var("CODETETHER_WORKER_COGNITION_PROVIDER")
131 .unwrap_or_else(|_| "cognition".to_string()),
132 token: std::env::var("CODETETHER_WORKER_COGNITION_TOKEN").ok(),
133 }
134 }
135}
136
/// Deserialized status payload from the cognition service.
/// NOTE(review): the exact endpoint is not visible in this chunk — presumably
/// a status route on `source_base_url`; confirm against the heartbeat code.
#[derive(Debug, Deserialize)]
struct CognitionStatusSnapshot {
    running: bool,
    // Optional/defaulted fields tolerate older service versions omitting them.
    #[serde(default)]
    last_tick_at: Option<String>,
    #[serde(default)]
    active_persona_count: usize,
    #[serde(default)]
    events_buffered: usize,
    #[serde(default)]
    snapshots_buffered: usize,
    #[serde(default)]
    loop_interval_ms: u64,
}
151
/// Deserialized "latest snapshot" payload from the cognition service:
/// a timestamped summary plus free-form metadata.
#[derive(Debug, Deserialize)]
struct CognitionLatestSnapshot {
    generated_at: String,
    summary: String,
    // Arbitrary key/value extras; tolerated when absent.
    #[serde(default)]
    metadata: HashMap<String, serde_json::Value>,
}
159
/// Outcome of trying to reserve a concurrency slot for a task id
/// (see `reserve_task_slot`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskReservation {
    // A slot was reserved and the task id recorded as in-flight.
    Reserved,
    // This exact task id is already in the processing set.
    AlreadyProcessing,
    // The worker is at `max_concurrent_tasks`; nothing was reserved.
    AtCapacity,
}
166
/// Everything a task handler needs to talk to the server and track
/// concurrency; cheap to clone (shared handles inside).
#[derive(Clone)]
struct WorkerTaskRuntime {
    // HTTP client reused across requests.
    client: Client,
    // Server base URL with trailing '/' stripped.
    server: String,
    // Optional bearer token for server requests.
    token: Option<String>,
    worker_id: String,
    // Task ids currently being processed (guards duplicate pickup).
    processing: Arc<Mutex<HashSet<String>>>,
    // Upper bound on simultaneous tasks (>= 1, see normalize_max_concurrent_tasks).
    max_concurrent_tasks: usize,
    auto_approve: AutoApprove,
    // In-process agent event bus.
    bus: Arc<AgentBus>,
}
178
/// Run the A2A worker loop (no embedded HTTP server variant).
///
/// Startup: resolve identity, export runtime env vars, register with the
/// server, sync workspaces, and fetch any pending tasks. Then loop forever:
/// re-register, re-fetch tasks, start a heartbeat task, and hold the event
/// stream open; when the stream drops, the heartbeat is aborted and the whole
/// cycle retries after 5s. Only the *initial* registration and task fetch
/// propagate errors (`?`); failures inside the loop are logged and retried.
pub async fn run(args: A2aArgs) -> Result<()> {
    let server = args.server.trim_end_matches('/');
    let name = args
        .name
        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
    let worker_id = resolve_worker_id();
    // Make server/token/worker-id visible to spawned tools via env.
    export_worker_runtime_env(server, &args.token, &worker_id);
    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);

    // Comma-separated --workspaces list, defaulting to the current directory.
    let codebases: Vec<String> = args
        .workspaces
        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);

    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
    tracing::info!("Server: {}", server);
    tracing::info!("Workspaces: {:?}", codebases);
    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");

    // Shared so the background workspace-sync task can update the list.
    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));

    let client = Client::new();
    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
    if cognition_heartbeat.enabled {
        tracing::info!(
            source = %cognition_heartbeat.source_base_url,
            include_thoughts = cognition_heartbeat.include_thought_summary,
            max_chars = cognition_heartbeat.summary_max_chars,
            timeout_ms = cognition_heartbeat.request_timeout_ms,
            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
        );
    } else {
        tracing::warn!(
            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
        );
    }

    // Unrecognized --auto-approve values fall back to no auto-approval.
    let auto_approve = match args.auto_approve.as_str() {
        "all" => AutoApprove::All,
        "safe" => AutoApprove::Safe,
        _ => AutoApprove::None,
    };

    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());

    let bus = AgentBus::new().into_arc();

    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    {
        let handle = bus.handle(&worker_id);
        handle.announce_ready(worker_capabilities());
    }

    let task_runtime = WorkerTaskRuntime {
        client: client.clone(),
        server: server.to_string(),
        token: args.token.clone(),
        worker_id: worker_id.clone(),
        processing: processing.clone(),
        max_concurrent_tasks,
        auto_approve,
        bus: bus.clone(),
    };

    // Initial registration must succeed; later re-registrations are best-effort.
    {
        let codebases = shared_codebases.lock().await.clone();
        register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await?;
    }

    if let Err(e) =
        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
    {
        tracing::warn!(error = %e, "Initial workspace sync failed");
    }

    fetch_pending_tasks(&task_runtime).await?;

    // Keep the handle alive for the process lifetime; never joined.
    let _workspace_sync_handle = start_workspace_sync(
        client.clone(),
        server.to_string(),
        args.token.clone(),
        shared_codebases.clone(),
    );

    loop {
        // Re-read: the sync task may have changed the workspace list.
        let codebases = shared_codebases.lock().await.clone();

        if let Err(e) = register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await
        {
            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
        }
        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
            tracing::warn!("Reconnect task fetch failed: {}", e);
        }

        // Fresh heartbeat per connection; aborted when the stream drops.
        let heartbeat_handle = start_heartbeat(
            client.clone(),
            server.to_string(),
            args.token.clone(),
            heartbeat_state.clone(),
            processing.clone(),
            cognition_heartbeat.clone(),
        );

        // Blocks until the stream ends or errors; either way we reconnect.
        match connect_stream(&task_runtime, &name, &codebases, None).await {
            Ok(()) => {
                tracing::warn!("Stream ended, reconnecting...");
            }
            Err(e) => {
                tracing::error!("Stream error: {}, reconnecting...", e);
            }
        }

        heartbeat_handle.abort();
        tracing::debug!("Heartbeat cancelled for reconnection");

        // Backoff before retrying the whole register/stream cycle.
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}
331
/// Same worker loop as `run`, but wired into an embedded worker HTTP server:
/// publishes worker id, heartbeat state, bus, connected flag, and a
/// task-notification channel into `server_state`, and passes the channel's
/// receiver to `connect_stream`. Keep this in sync with `run` — the two
/// share most of their logic by duplication.
pub async fn run_with_state(
    args: A2aArgs,
    server_state: crate::worker_server::WorkerServerState,
) -> Result<()> {
    let server = args.server.trim_end_matches('/');
    let name = args
        .name
        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
    let worker_id = resolve_worker_id();
    export_worker_runtime_env(server, &args.token, &worker_id);
    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);

    // Expose our identity to the embedded server before anything else.
    server_state.set_worker_id(worker_id.clone()).await;

    let codebases: Vec<String> = args
        .workspaces
        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);

    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
    tracing::info!("Server: {}", server);
    tracing::info!("Workspaces: {:?}", codebases);
    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");

    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));

    let client = Client::new();
    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
    if cognition_heartbeat.enabled {
        tracing::info!(
            source = %cognition_heartbeat.source_base_url,
            include_thoughts = cognition_heartbeat.include_thought_summary,
            max_chars = cognition_heartbeat.summary_max_chars,
            timeout_ms = cognition_heartbeat.request_timeout_ms,
            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
        );
    } else {
        tracing::warn!(
            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
        );
    }

    let auto_approve = match args.auto_approve.as_str() {
        "all" => AutoApprove::All,
        "safe" => AutoApprove::Safe,
        _ => AutoApprove::None,
    };

    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());

    // Share liveness state with the embedded server.
    server_state
        .set_heartbeat_state(Arc::new(heartbeat_state.clone()))
        .await;

    let bus = AgentBus::new().into_arc();
    server_state.set_bus(bus.clone()).await;

    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    {
        let handle = bus.handle(&worker_id);
        handle.announce_ready(worker_capabilities());
    }

    let task_runtime = WorkerTaskRuntime {
        client: client.clone(),
        server: server.to_string(),
        token: args.token.clone(),
        worker_id: worker_id.clone(),
        processing: processing.clone(),
        max_concurrent_tasks,
        auto_approve,
        bus: bus.clone(),
    };

    // Initial registration must succeed; later re-registrations are best-effort.
    {
        let codebases = shared_codebases.lock().await.clone();
        register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await?;
    }

    if let Err(e) =
        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
    {
        tracing::warn!(error = %e, "Initial workspace sync failed");
    }

    server_state.set_connected(true).await;

    fetch_pending_tasks(&task_runtime).await?;

    let _workspace_sync_handle = start_workspace_sync(
        client.clone(),
        server.to_string(),
        args.token.clone(),
        shared_codebases.clone(),
    );

    loop {
        let codebases = shared_codebases.lock().await.clone();

        // Fresh notification channel each connection; the embedded server
        // pushes task ids through it, the stream handler drains them.
        let (task_notify_tx, task_notify_rx) = mpsc::channel::<String>(32);
        server_state
            .set_task_notification_channel(task_notify_tx)
            .await;

        server_state.set_connected(true).await;

        if let Err(e) = register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await
        {
            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
        }
        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
            tracing::warn!("Reconnect task fetch failed: {}", e);
        }

        let heartbeat_handle = start_heartbeat(
            client.clone(),
            server.to_string(),
            args.token.clone(),
            heartbeat_state.clone(),
            processing.clone(),
            cognition_heartbeat.clone(),
        );

        // Blocks until the stream ends or errors; either way we reconnect.
        match connect_stream(&task_runtime, &name, &codebases, Some(task_notify_rx)).await {
            Ok(()) => {
                tracing::warn!("Stream ended, reconnecting...");
            }
            Err(e) => {
                tracing::error!("Stream error: {}, reconnecting...", e);
            }
        }

        // Mark disconnected while we back off and retry.
        server_state.set_connected(false).await;

        heartbeat_handle.abort();
        tracing::debug!("Heartbeat cancelled for reconnection");

        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}
513
514pub fn generate_worker_id() -> String {
515 format!(
516 "wrk_{}_{:x}",
517 chrono::Utc::now().timestamp(),
518 rand::random::<u64>()
519 )
520}
521
522fn resolve_worker_id() -> String {
523 for key in ["CODETETHER_WORKER_ID", "A2A_WORKER_ID"] {
524 if let Ok(value) = std::env::var(key) {
525 let trimmed = value.trim();
526 if !trimmed.is_empty() {
527 return trimmed.to_string();
528 }
529 }
530 }
531 generate_worker_id()
532}
533
/// Clamp the configured concurrency so a worker can always run at least one task.
fn normalize_max_concurrent_tasks(max_concurrent_tasks: usize) -> usize {
    if max_concurrent_tasks == 0 {
        1
    } else {
        max_concurrent_tasks
    }
}
537
538async fn reserve_task_slot(
539 processing: &Arc<Mutex<HashSet<String>>>,
540 task_id: &str,
541 max_concurrent_tasks: usize,
542) -> TaskReservation {
543 let mut proc = processing.lock().await;
544 if proc.contains(task_id) {
545 TaskReservation::AlreadyProcessing
546 } else if proc.len() >= max_concurrent_tasks {
547 TaskReservation::AtCapacity
548 } else {
549 proc.insert(task_id.to_string());
550 TaskReservation::Reserved
551 }
552}
553
/// Export server URL, worker id, and (if present) auth token as process-wide
/// env vars so spawned tools and sub-processes inherit them.
fn export_worker_runtime_env(server: &str, token: &Option<String>, worker_id: &str) {
    // SAFETY: `set_var` is unsafe (edition 2024) because mutating the process
    // environment is not thread-safe. NOTE(review): this assumes it runs during
    // single-threaded startup, before worker tasks spawn — confirm call sites.
    unsafe {
        std::env::set_var("CODETETHER_SERVER", server);
        std::env::set_var("CODETETHER_WORKER_ID", worker_id);
        if let Some(token) = token {
            std::env::set_var("CODETETHER_TOKEN", token);
        }
    }
}
566
567fn advertised_interfaces(public_url: Option<&str>) -> serde_json::Value {
568 let Some(base_url) = public_url
569 .map(str::trim)
570 .filter(|value| !value.is_empty())
571 .map(|value| value.trim_end_matches('/').to_string())
572 else {
573 return serde_json::json!({});
574 };
575
576 serde_json::json!({
577 "http": {
578 "base_url": base_url,
579 },
580 "bus": {
581 "stream_url": format!("{base_url}/v1/bus/stream"),
582 "publish_url": format!("{base_url}/v1/bus/publish"),
583 },
584 })
585}
586
/// Tool auto-approval policy, parsed from the `--auto-approve` CLI value
/// ("all" / "safe"; anything else maps to `None`).
#[derive(Debug, Clone, Copy)]
pub enum AutoApprove {
    // Approve every tool invocation without prompting.
    All,
    // Approve only tools classified as safe — NOTE(review): the safety
    // classification itself is decided elsewhere, not in this chunk.
    Safe,
    // No automatic approval.
    None,
}
603
/// Default base URL for the hosted A2A server — presumably used as the
/// `--server` fallback in the CLI; confirm against `A2aArgs`.
pub const DEFAULT_A2A_SERVER_URL: &str = "https://api.codetether.run";
606
// Capabilities every worker advertises regardless of environment.
const BASE_WORKER_CAPABILITIES: &[&str] = &[
    "forage", "ralph", "swarm", "rlm", "a2a", "mcp", "grpc", "grpc-web", "jsonrpc",
];

/// Capabilities to advertise at registration: the static base set, plus
/// "knative" when the `KNATIVE_SERVICE` env flag is truthy ("1"/"true"/"yes").
fn worker_capabilities() -> Vec<String> {
    let mut capabilities: Vec<String> = BASE_WORKER_CAPABILITIES
        .iter()
        .map(ToString::to_string)
        .collect();

    let knative_flag = std::env::var("KNATIVE_SERVICE")
        .ok()
        .map(|value| value.trim().to_ascii_lowercase());
    if matches!(knative_flag.as_deref(), Some("1" | "true" | "yes")) {
        capabilities.push("knative".to_string());
    }

    capabilities
}
630
631fn task_value<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a serde_json::Value> {
632 task.get("task")
633 .and_then(|t| t.get(key))
634 .or_else(|| task.get(key))
635}
636
637fn task_str<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a str> {
638 task_value(task, key).and_then(|v| v.as_str())
639}
640
641fn task_metadata(task: &serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
642 task_value(task, "metadata")
643 .and_then(|m| m.as_object())
644 .cloned()
645 .unwrap_or_default()
646}
647
/// Normalize a model reference: rewrite `provider:model` as `provider/model`
/// (first ':' only), leaving refs that already contain a '/' untouched.
fn model_ref_to_provider_model(model: &str) -> String {
    let colon_style = model.contains(':') && !model.contains('/');
    if colon_style {
        model.replacen(':', "/", 1)
    } else {
        model.to_string()
    }
}
658
/// Ordered provider preference list for a model tier. Unknown or missing
/// tiers use the "balanced" ordering; "zai" leads every list.
fn provider_preferences_for_tier(model_tier: Option<&str>) -> &'static [&'static str] {
    const FAST: &[&str] = &[
        "zai",
        "openai",
        "github-copilot",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
        "anthropic",
    ];
    const HEAVY: &[&str] = &[
        "zai",
        "anthropic",
        "openai",
        "github-copilot",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
    ];
    const BALANCED: &[&str] = &[
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
    ];

    match model_tier.unwrap_or("balanced") {
        "fast" | "quick" => FAST,
        "heavy" | "deep" => HEAVY,
        _ => BALANCED,
    }
}
693
694fn choose_provider_for_tier<'a>(providers: &'a [&'a str], model_tier: Option<&str>) -> &'a str {
695 for preferred in provider_preferences_for_tier(model_tier) {
696 if let Some(found) = providers.iter().copied().find(|p| *p == *preferred) {
697 return found;
698 }
699 }
700 if let Some(found) = providers.iter().copied().find(|p| *p == "zai") {
701 return found;
702 }
703 providers[0]
704}
705
/// Default model name for a provider at a given tier. Organized by provider:
/// most providers serve one model regardless of tier; anthropic/openai/
/// google/bedrock vary by tier. Unknown providers default to "glm-5".
fn default_model_for_provider(provider: &str, model_tier: Option<&str>) -> String {
    let tier = model_tier.unwrap_or("balanced");
    let fast = matches!(tier, "fast" | "quick");
    let heavy = matches!(tier, "heavy" | "deep");

    let model = match provider {
        "moonshotai" => "kimi-k2.5",
        "anthropic" => {
            if fast {
                "claude-haiku-4-5"
            } else {
                "claude-sonnet-4-20250514"
            }
        }
        "openai" => {
            if fast {
                "gpt-4o-mini"
            } else if heavy {
                "o3"
            } else {
                "gpt-4o"
            }
        }
        "google" => {
            if fast {
                "gemini-2.5-flash"
            } else {
                "gemini-2.5-pro"
            }
        }
        "zhipuai" | "zai" => "glm-5",
        "openrouter" => "z-ai/glm-5",
        "novita" => "Qwen/Qwen3.5-35B-A3B",
        "bedrock" => {
            if heavy {
                "us.anthropic.claude-opus-4-6-v1"
            } else {
                "amazon.nova-lite-v1:0"
            }
        }
        _ => "glm-5",
    };
    model.to_string()
}
743
/// Whether the model family is one this worker treats as preferring
/// temperature 1.0 (kimi-k2, glm-*, minimax); match is case-insensitive.
fn prefers_temperature_one(model: &str) -> bool {
    let lower = model.to_ascii_lowercase();
    ["kimi-k2", "glm-", "minimax"]
        .iter()
        .any(|needle| lower.contains(needle))
}
748
/// True for agent types routed to the swarm executor
/// ("swarm", "parallel", "multi-agent"; whitespace/case-insensitive).
fn is_swarm_agent(agent_type: &str) -> bool {
    let normalized = agent_type.trim().to_ascii_lowercase();
    normalized == "swarm" || normalized == "parallel" || normalized == "multi-agent"
}
755
/// True when the agent type is "forage" (whitespace/case-insensitive).
fn is_forage_agent(agent_type: &str) -> bool {
    agent_type.trim().to_ascii_lowercase() == "forage"
}
759
760fn metadata_lookup<'a>(
761 metadata: &'a serde_json::Map<String, serde_json::Value>,
762 key: &str,
763) -> Option<&'a serde_json::Value> {
764 metadata
765 .get(key)
766 .or_else(|| {
767 metadata
768 .get("routing")
769 .and_then(|v| v.as_object())
770 .and_then(|obj| obj.get(key))
771 })
772 .or_else(|| {
773 metadata
774 .get("swarm")
775 .and_then(|v| v.as_object())
776 .and_then(|obj| obj.get(key))
777 })
778 .or_else(|| {
779 metadata
780 .get("forage")
781 .and_then(|v| v.as_object())
782 .and_then(|obj| obj.get(key))
783 })
784}
785
786fn metadata_str(
787 metadata: &serde_json::Map<String, serde_json::Value>,
788 keys: &[&str],
789) -> Option<String> {
790 for key in keys {
791 if let Some(value) = metadata_lookup(metadata, key).and_then(|v| v.as_str()) {
792 let trimmed = value.trim();
793 if !trimmed.is_empty() {
794 return Some(trimmed.to_string());
795 }
796 }
797 }
798 None
799}
800
801fn metadata_usize(
802 metadata: &serde_json::Map<String, serde_json::Value>,
803 keys: &[&str],
804) -> Option<usize> {
805 for key in keys {
806 if let Some(value) = metadata_lookup(metadata, key) {
807 if let Some(v) = value.as_u64() {
808 return usize::try_from(v).ok();
809 }
810 if let Some(v) = value.as_i64()
811 && v >= 0
812 {
813 return usize::try_from(v as u64).ok();
814 }
815 if let Some(v) = value.as_str()
816 && let Ok(parsed) = v.trim().parse::<usize>()
817 {
818 return Some(parsed);
819 }
820 }
821 }
822 None
823}
824
825fn metadata_u64(
826 metadata: &serde_json::Map<String, serde_json::Value>,
827 keys: &[&str],
828) -> Option<u64> {
829 for key in keys {
830 if let Some(value) = metadata_lookup(metadata, key) {
831 if let Some(v) = value.as_u64() {
832 return Some(v);
833 }
834 if let Some(v) = value.as_i64()
835 && v >= 0
836 {
837 return Some(v as u64);
838 }
839 if let Some(v) = value.as_str()
840 && let Ok(parsed) = v.trim().parse::<u64>()
841 {
842 return Some(parsed);
843 }
844 }
845 }
846 None
847}
848
849fn metadata_bool(
850 metadata: &serde_json::Map<String, serde_json::Value>,
851 keys: &[&str],
852) -> Option<bool> {
853 for key in keys {
854 if let Some(value) = metadata_lookup(metadata, key) {
855 if let Some(v) = value.as_bool() {
856 return Some(v);
857 }
858 if let Some(v) = value.as_str() {
859 match v.trim().to_ascii_lowercase().as_str() {
860 "1" | "true" | "yes" | "on" => return Some(true),
861 "0" | "false" | "no" | "off" => return Some(false),
862 _ => {}
863 }
864 }
865 }
866 }
867 None
868}
869
870fn metadata_f64(
871 metadata: &serde_json::Map<String, serde_json::Value>,
872 keys: &[&str],
873) -> Option<f64> {
874 for key in keys {
875 if let Some(value) = metadata_lookup(metadata, key) {
876 if let Some(v) = value.as_f64() {
877 return Some(v);
878 }
879 if let Some(v) = value.as_i64() {
880 return Some(v as f64);
881 }
882 if let Some(v) = value.as_u64() {
883 return Some(v as f64);
884 }
885 if let Some(v) = value.as_str()
886 && let Ok(parsed) = v.trim().parse::<f64>()
887 {
888 return Some(parsed);
889 }
890 }
891 }
892 None
893}
894
895fn metadata_string_list(
896 metadata: &serde_json::Map<String, serde_json::Value>,
897 keys: &[&str],
898) -> Vec<String> {
899 for key in keys {
900 let Some(value) = metadata_lookup(metadata, key) else {
901 continue;
902 };
903
904 if let Some(items) = value.as_array() {
905 let parsed = items
906 .iter()
907 .filter_map(|item| item.as_str())
908 .map(str::trim)
909 .filter(|item| !item.is_empty())
910 .map(ToString::to_string)
911 .collect::<Vec<_>>();
912 if !parsed.is_empty() {
913 return parsed;
914 }
915 }
916
917 if let Some(value) = value.as_str() {
918 let parsed = value
919 .split(['\n', ','])
920 .map(str::trim)
921 .filter(|item| !item.is_empty())
922 .map(ToString::to_string)
923 .collect::<Vec<_>>();
924 if !parsed.is_empty() {
925 return parsed;
926 }
927 }
928 }
929
930 Vec::new()
931}
932
/// Normalize the forage execution engine choice: only "swarm" and "go" are
/// recognized (trimmed, case-insensitive); everything else — including
/// `None` — maps to "run".
fn normalize_forage_execution_engine(value: Option<String>) -> String {
    let normalized = value
        .as_deref()
        .unwrap_or("run")
        .trim()
        .to_ascii_lowercase();
    if normalized == "swarm" || normalized == "go" {
        normalized
    } else {
        "run".to_string()
    }
}
946
/// Normalize the forage swarm strategy: "domain", "data", "stage", and
/// "none" pass through (trimmed, case-insensitive); everything else —
/// including `None` — maps to "auto".
fn normalize_forage_swarm_strategy(value: Option<String>) -> String {
    const KNOWN: [&str; 4] = ["domain", "data", "stage", "none"];
    let normalized = value
        .as_deref()
        .unwrap_or("auto")
        .trim()
        .to_ascii_lowercase();
    if KNOWN.contains(&normalized.as_str()) {
        normalized
    } else {
        "auto".to_string()
    }
}
962
/// Assemble `ForageArgs` for a forage task from its metadata, with the task
/// prompt (or, failing that, the title) as the fallback moonshot when the
/// metadata supplies none. All numeric/boolean knobs come from metadata with
/// the defaults shown inline.
fn build_forage_args(
    prompt: &str,
    title: &str,
    metadata: &serde_json::Map<String, serde_json::Value>,
    selected_model: Option<String>,
) -> ForageArgs {
    // Moonshots: explicit metadata list first, then prompt, then title.
    let mut moonshots = metadata_string_list(
        metadata,
        &["moonshots", "moonshot", "goals", "mission", "missions"],
    );
    if moonshots.is_empty() {
        let fallback = prompt.trim();
        if !fallback.is_empty() {
            moonshots.push(fallback.to_string());
        } else if !title.trim().is_empty() {
            moonshots.push(title.trim().to_string());
        }
    }

    ForageArgs {
        top: metadata_usize(metadata, &["top"]).unwrap_or(3),
        loop_mode: metadata_bool(metadata, &["loop", "loop_mode"]).unwrap_or(false),
        interval_secs: metadata_u64(metadata, &["interval_secs", "interval"]).unwrap_or(120),
        // 0 means unbounded cycles.
        max_cycles: metadata_usize(metadata, &["max_cycles"]).unwrap_or(0),
        execute: metadata_bool(metadata, &["execute"]).unwrap_or(false),
        // Defaults to local-only (no S3 upload) unless metadata opts in.
        no_s3: metadata_bool(metadata, &["no_s3", "local_only"]).unwrap_or(true),
        moonshots,
        moonshot_file: metadata_str(metadata, &["moonshot_file"]).map(PathBuf::from),
        moonshot_required: metadata_bool(metadata, &["moonshot_required"]).unwrap_or(false),
        moonshot_min_alignment: metadata_f64(metadata, &["moonshot_min_alignment"]).unwrap_or(0.10),
        execution_engine: normalize_forage_execution_engine(metadata_str(
            metadata,
            &["execution_engine", "engine"],
        )),
        run_timeout_secs: metadata_u64(metadata, &["run_timeout_secs", "timeout_secs", "timeout"])
            .unwrap_or(900),
        fail_fast: metadata_bool(metadata, &["fail_fast"]).unwrap_or(false),
        swarm_strategy: normalize_forage_swarm_strategy(metadata_str(
            metadata,
            &["swarm_strategy", "strategy"],
        )),
        swarm_max_subagents: metadata_usize(metadata, &["swarm_max_subagents"]).unwrap_or(8),
        swarm_max_steps: metadata_usize(metadata, &["swarm_max_steps"]).unwrap_or(100),
        swarm_subagent_timeout_secs: metadata_u64(metadata, &["swarm_subagent_timeout_secs"])
            .unwrap_or(300),
        model: selected_model,
        json: metadata_bool(metadata, &["json"]).unwrap_or(false),
    }
}
1013
1014fn parse_swarm_strategy(
1015 metadata: &serde_json::Map<String, serde_json::Value>,
1016) -> DecompositionStrategy {
1017 match metadata_str(
1018 metadata,
1019 &[
1020 "decomposition_strategy",
1021 "swarm_strategy",
1022 "strategy",
1023 "swarm_decomposition",
1024 ],
1025 )
1026 .as_deref()
1027 .map(|s| s.to_ascii_lowercase())
1028 .as_deref()
1029 {
1030 Some("none") | Some("single") => DecompositionStrategy::None,
1031 Some("domain") | Some("by_domain") => DecompositionStrategy::ByDomain,
1032 Some("data") | Some("by_data") => DecompositionStrategy::ByData,
1033 Some("stage") | Some("by_stage") => DecompositionStrategy::ByStage,
1034 _ => DecompositionStrategy::Automatic,
1035 }
1036}
1037
1038async fn resolve_swarm_model(
1039 explicit_model: Option<String>,
1040 model_tier: Option<&str>,
1041) -> Option<String> {
1042 if let Some(model) = explicit_model
1043 && !model.trim().is_empty()
1044 {
1045 return Some(model);
1046 }
1047
1048 let registry = ProviderRegistry::from_vault().await.ok()?;
1049 let providers = registry.list();
1050 if providers.is_empty() {
1051 return None;
1052 }
1053 let provider = choose_provider_for_tier(providers.as_slice(), model_tier);
1054 let model = default_model_for_provider(provider, model_tier);
1055 Some(format!("{}/{}", provider, model))
1056}
1057
/// Render a `SwarmEvent` as a single "[swarm] ..." log line for task output.
/// Subtask ids are truncated to 8 chars for readability. Events not matched
/// below are intentionally not surfaced (`None`).
fn format_swarm_event_for_output(event: &SwarmEvent) -> Option<String> {
    match event {
        SwarmEvent::Started {
            task,
            total_subtasks,
        } => Some(format!(
            "[swarm] started task={} planned_subtasks={}",
            task, total_subtasks
        )),
        SwarmEvent::StageComplete {
            stage,
            completed,
            failed,
        } => Some(format!(
            "[swarm] stage={} completed={} failed={}",
            stage, completed, failed
        )),
        // Status is Debug-printed then lowercased (e.g. Running -> "running").
        SwarmEvent::SubTaskUpdate { id, status, .. } => Some(format!(
            "[swarm] subtask id={} status={}",
            &id.chars().take(8).collect::<String>(),
            format!("{status:?}").to_ascii_lowercase()
        )),
        SwarmEvent::AgentToolCall {
            subtask_id,
            tool_name,
        } => Some(format!(
            "[swarm] subtask id={} tool={}",
            &subtask_id.chars().take(8).collect::<String>(),
            tool_name
        )),
        SwarmEvent::AgentError { subtask_id, error } => Some(format!(
            "[swarm] subtask id={} error={}",
            &subtask_id.chars().take(8).collect::<String>(),
            error
        )),
        // Completed + failed approximates total subtasks attempted.
        SwarmEvent::Complete { success, stats } => Some(format!(
            "[swarm] complete success={} subtasks={} speedup={:.2}",
            success,
            stats.subagents_completed + stats.subagents_failed,
            stats.speedup_factor
        )),
        SwarmEvent::Error(err) => Some(format!("[swarm] error message={}", err)),
        _ => None,
    }
}
1103
/// Register (or re-register) this worker with the A2A server.
///
/// Best-effort by design: a failed model load proceeds with an empty model
/// list, and a non-success HTTP status is only logged — the function still
/// returns `Ok(())`. Only transport-level request errors propagate as `Err`.
pub async fn register_worker(
    client: &Client,
    server: &str,
    token: &Option<String>,
    worker_id: &str,
    name: &str,
    codebases: &[String],
    public_url: Option<&str>,
) -> Result<()> {
    // Provider models are advertised so the server can route by model;
    // missing model info is tolerated.
    let models = match load_provider_models().await {
        Ok(m) => m,
        Err(e) => {
            tracing::warn!(
                "Failed to load provider models: {}, proceeding without model info",
                e
            );
            HashMap::new()
        }
    };

    let mut req = client.post(format!("{}/v1/agent/workers/register", server));

    if let Some(t) = token {
        req = req.bearer_auth(t);
    }

    // Flatten provider -> models into one list of "provider/model" entries,
    // attaching per-million-token costs when known.
    let models_array: Vec<serde_json::Value> = models
        .iter()
        .flat_map(|(provider, model_infos)| {
            model_infos.iter().map(move |m| {
                let mut obj = serde_json::json!({
                    "id": format!("{}/{}", provider, m.id),
                    "name": &m.id,
                    "provider": provider,
                    "provider_id": provider,
                });
                if let Some(input_cost) = m.input_cost_per_million {
                    obj["input_cost_per_million"] = serde_json::json!(input_cost);
                }
                if let Some(output_cost) = m.output_cost_per_million {
                    obj["output_cost_per_million"] = serde_json::json!(output_cost);
                }
                obj
            })
        })
        .collect();

    tracing::info!(
        "Registering worker with {} models from {} providers",
        models_array.len(),
        models.len()
    );

    // HOSTNAME (unix) or COMPUTERNAME (windows); "unknown" if neither set.
    let hostname = std::env::var("HOSTNAME")
        .or_else(|_| std::env::var("COMPUTERNAME"))
        .unwrap_or_else(|_| "unknown".to_string());
    let k8s_node_name = std::env::var("K8S_NODE_NAME")
        .ok()
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty());

    // Advertise the built-in agent catalog so the server knows what this
    // worker can run.
    let registry = crate::agent::AgentRegistry::with_builtins();
    let agent_defs: Vec<serde_json::Value> = registry
        .list()
        .iter()
        .map(|info| {
            serde_json::json!({
                "name": info.name,
                "description": info.description,
                "mode": format!("{:?}", info.mode).to_lowercase(),
                "native": info.native,
                "hidden": info.hidden,
                "model": info.model,
                "temperature": info.temperature,
                "top_p": info.top_p,
                "max_steps": info.max_steps,
            })
        })
        .collect();

    let res = req
        .json(&serde_json::json!({
            "worker_id": worker_id,
            "name": name,
            "capabilities": worker_capabilities(),
            "hostname": hostname,
            "k8s_node_name": k8s_node_name,
            "models": models_array,
            "workspaces": codebases,
            "interfaces": advertised_interfaces(public_url),
            "agents": agent_defs,
        }))
        .send()
        .await?;

    // Non-success is logged but not treated as fatal: the caller's reconnect
    // loop will retry registration anyway.
    if res.status().is_success() {
        tracing::info!("Worker registered successfully");
    } else {
        tracing::warn!("Failed to register worker: {}", res.status());
    }

    Ok(())
}
1212
1213async fn load_provider_models() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>> {
1222 use tokio::sync::OnceCell;
1223 static CACHE: OnceCell<HashMap<String, Vec<crate::provider::ModelInfo>>> =
1224 OnceCell::const_new();
1225 CACHE
1226 .get_or_try_init(|| async { load_provider_models_uncached().await })
1227 .await
1228 .cloned()
1229}
1230
1231async fn load_provider_models_uncached() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>>
1232{
1233 let registry = match ProviderRegistry::from_vault().await {
1235 Ok(r) if !r.list().is_empty() => {
1236 tracing::info!("Loaded {} providers from Vault", r.list().len());
1237 r
1238 }
1239 Ok(_) => {
1240 tracing::warn!("Vault returned 0 providers, falling back to config/env vars");
1241 fallback_registry().await?
1242 }
1243 Err(e) => {
1244 tracing::warn!("Vault unreachable ({}), falling back to config/env vars", e);
1245 fallback_registry().await?
1246 }
1247 };
1248
1249 let mut models_by_provider: HashMap<String, Vec<crate::provider::ModelInfo>> = HashMap::new();
1250
1251 for provider_name in registry.list() {
1252 if let Some(provider) = registry.get(provider_name) {
1253 match provider.list_models().await {
1254 Ok(models) => {
1255 if !models.is_empty() {
1256 tracing::debug!("Provider {}: {} models", provider_name, models.len());
1257 models_by_provider.insert(provider_name.to_string(), models);
1258 }
1259 }
1260 Err(e) => {
1261 tracing::debug!("Failed to list models for {}: {}", provider_name, e);
1262 }
1263 }
1264 }
1265 }
1266
1267 Ok(models_by_provider)
1268}
1269
/// Builds a provider registry from the local config file, used when Vault
/// yields no providers. A config load failure falls back to `Config::default()`
/// rather than erroring.
async fn fallback_registry() -> Result<ProviderRegistry> {
    let config = crate::config::Config::load().await.unwrap_or_default();
    ProviderRegistry::from_config(&config).await
}
1275
1276async fn fetch_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1277 tracing::info!("Checking for pending tasks...");
1278
1279 let mut req = runtime
1280 .client
1281 .get(format!("{}/v1/agent/tasks?status=pending", runtime.server));
1282 if let Some(t) = &runtime.token {
1283 req = req.bearer_auth(t);
1284 }
1285
1286 let res = req.send().await?;
1287 if !res.status().is_success() {
1288 return Ok(());
1289 }
1290
1291 let data: serde_json::Value = res.json().await?;
1292 let tasks = if let Some(arr) = data.as_array() {
1294 arr.clone()
1295 } else {
1296 data["tasks"].as_array().cloned().unwrap_or_default()
1297 };
1298
1299 tracing::info!("Found {} pending task(s)", tasks.len());
1300
1301 for task in tasks {
1302 if let Some(id) = task["id"].as_str() {
1303 match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1304 TaskReservation::Reserved => {
1305 let task_id = id.to_string();
1306 let runtime = runtime.clone();
1307
1308 tokio::spawn(async move {
1309 if let Err(e) = handle_task(&runtime, &task).await {
1310 tracing::error!("Task {} failed: {}", task_id, e);
1311 }
1312 runtime.processing.lock().await.remove(&task_id);
1313 });
1314 }
1315 TaskReservation::AlreadyProcessing => {}
1316 TaskReservation::AtCapacity => {
1317 tracing::debug!(
1318 max_concurrent_tasks = runtime.max_concurrent_tasks,
1319 "Worker is at task capacity; leaving remaining tasks pending"
1320 );
1321 break;
1322 }
1323 }
1324 }
1325 }
1326
1327 Ok(())
1328}
1329
/// Connects to the server's SSE task stream and dispatches incoming tasks.
///
/// Runs until the stream ends (`Ok`) or the connection errors (`Err`); the
/// caller is responsible for reconnecting. Three event sources are multiplexed
/// in the `select!` loop:
/// - task notifications on `task_notify_rx` (if provided), each of which
///   triggers an immediate pending-task poll;
/// - the SSE byte stream itself, parsed for `data:` payloads;
/// - a 30-second fallback interval that also polls for pending tasks.
async fn connect_stream(
    runtime: &WorkerTaskRuntime,
    name: &str,
    codebases: &[String],
    task_notify_rx: Option<mpsc::Receiver<String>>,
) -> Result<()> {
    let url = format!(
        "{}/v1/worker/tasks/stream?agent_name={}&worker_id={}",
        runtime.server,
        urlencoding::encode(name),
        urlencoding::encode(&runtime.worker_id)
    );

    // Codebases are advertised under both header names (X-Codebases /
    // X-Workspaces) with identical contents.
    let mut req = runtime
        .client
        .get(&url)
        .header("Accept", "text/event-stream")
        .header("X-Worker-ID", &runtime.worker_id)
        .header("X-Agent-Name", name)
        .header("X-Codebases", codebases.join(","))
        .header("X-Workspaces", codebases.join(","));

    if let Some(t) = &runtime.token {
        req = req.bearer_auth(t);
    }

    let res = req.send().await?;
    if !res.status().is_success() {
        anyhow::bail!("Failed to connect: {}", res.status());
    }

    tracing::info!("Connected to A2A server");

    let mut stream = res.bytes_stream();
    // Accumulates raw bytes until a full "\n\n"-terminated SSE event is present.
    let mut buffer = String::new();
    let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
    // First tick of a tokio interval fires immediately; consume it so the
    // fallback poll only runs after a full 30s.
    poll_interval.tick().await; let mut task_notify_rx = task_notify_rx;

    loop {
        tokio::select! {
            // Notification channel: when absent, park this branch forever so
            // select! never resolves it.
            task_id = async {
                if let Some(ref mut rx) = task_notify_rx {
                    rx.recv().await
                } else {
                    futures::future::pending().await
                }
            } => {
                if let Some(task_id) = task_id {
                    tracing::info!("Received task notification via CloudEvent: {}", task_id);
                    if let Err(e) = poll_pending_tasks(runtime).await {
                        tracing::warn!("Task notification poll failed: {}", e);
                    }
                }
            }
            chunk = stream.next() => {
                match chunk {
                    Some(Ok(chunk)) => {
                        buffer.push_str(&String::from_utf8_lossy(&chunk));

                        // Process every complete event currently buffered; a
                        // trailing partial event stays in `buffer`.
                        while let Some(pos) = buffer.find("\n\n") {
                            let event_str = buffer[..pos].to_string();
                            buffer = buffer[pos + 2..].to_string();

                            // Only the first `data:` line of an event is used.
                            if let Some(data_line) = event_str.lines().find(|l| l.starts_with("data:")) {
                                let data = data_line.trim_start_matches("data:").trim();
                                if data == "[DONE]" || data.is_empty() {
                                    continue;
                                }

                                // Non-JSON payloads are silently ignored.
                                if let Ok(task) = serde_json::from_str::<serde_json::Value>(data) {
                                    spawn_task_handler(&task, runtime).await;
                                }
                            }
                        }
                    }
                    Some(Err(e)) => {
                        return Err(e.into());
                    }
                    // Server closed the stream cleanly.
                    None => {
                        return Ok(());
                    }
                }
            }
            _ = poll_interval.tick() => {
                if let Err(e) = poll_pending_tasks(runtime).await {
                    tracing::warn!("Periodic task poll failed: {}", e);
                }
            }
        }
    }
}
1431
1432async fn spawn_task_handler(task: &serde_json::Value, runtime: &WorkerTaskRuntime) {
1433 if let Some(id) = task
1434 .get("task")
1435 .and_then(|t| t["id"].as_str())
1436 .or_else(|| task["id"].as_str())
1437 {
1438 match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1439 TaskReservation::Reserved => {
1440 let task_id = id.to_string();
1441 let task = task.clone();
1442 let runtime = runtime.clone();
1443
1444 tokio::spawn(async move {
1445 if let Err(e) = handle_task(&runtime, &task).await {
1446 tracing::error!("Task {} failed: {}", task_id, e);
1447 }
1448 runtime.processing.lock().await.remove(&task_id);
1449 });
1450 }
1451 TaskReservation::AlreadyProcessing => {}
1452 TaskReservation::AtCapacity => {
1453 tracing::debug!(
1454 task_id = id,
1455 max_concurrent_tasks = runtime.max_concurrent_tasks,
1456 "Worker is at task capacity; task will stay pending until a slot frees up"
1457 );
1458 }
1459 }
1460 }
1461}
1462
1463async fn poll_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1464 let mut req = runtime
1465 .client
1466 .get(format!("{}/v1/agent/tasks?status=pending", runtime.server));
1467 if let Some(t) = &runtime.token {
1468 req = req.bearer_auth(t);
1469 }
1470
1471 let res = req.send().await?;
1472 if !res.status().is_success() {
1473 return Ok(());
1474 }
1475
1476 let data: serde_json::Value = res.json().await?;
1477 let tasks = if let Some(arr) = data.as_array() {
1478 arr.clone()
1479 } else {
1480 data["tasks"].as_array().cloned().unwrap_or_default()
1481 };
1482
1483 if !tasks.is_empty() {
1484 tracing::debug!("Poll found {} pending task(s)", tasks.len());
1485 }
1486
1487 for task in &tasks {
1488 spawn_task_handler(task, runtime).await;
1489 }
1490
1491 Ok(())
1492}
1493
/// Full lifecycle for one task: claim it on the server, execute it, then
/// release it with the final status.
///
/// Returns `Ok(())` in almost all cases — a failed claim (e.g. another worker
/// won the race) is logged and skipped, and an execution failure is converted
/// into a "failed" release rather than an error. Only transport-level
/// failures (HTTP send/decode, release failure) propagate as `Err`.
async fn handle_task(runtime: &WorkerTaskRuntime, task: &serde_json::Value) -> Result<()> {
    let task_id = task_str(task, "id").ok_or_else(|| anyhow::anyhow!("No task ID"))?;
    let title = task_str(task, "title").unwrap_or("Untitled");

    tracing::info!("Handling task: {} ({})", title, task_id);

    let mut req = runtime
        .client
        .post(format!("{}/v1/worker/tasks/claim", runtime.server))
        .header("X-Worker-ID", &runtime.worker_id);
    if let Some(t) = &runtime.token {
        req = req.bearer_auth(t);
    }

    let res = req
        .json(&serde_json::json!({ "task_id": task_id }))
        .send()
        .await?;

    if !res.status().is_success() {
        let status = res.status();
        let text = res.text().await?;
        // 409 means another worker claimed it first — expected under
        // contention, so only a debug log.
        if status == reqwest::StatusCode::CONFLICT {
            tracing::debug!(task_id, "Task already claimed by another worker, skipping");
        } else {
            tracing::warn!(task_id, %status, "Failed to claim task: {}", text);
        }
        return Ok(());
    }

    // The claim response carries provenance (run/attempt ids) that is later
    // attached to the session and commit hooks.
    let claim = res.json::<TaskClaimResponse>().await?;
    let claim_provenance = claim.into_provenance();
    tracing::info!(
        task_id,
        run_id = ?claim_provenance.run_id,
        attempt_id = ?claim_provenance.attempt_id,
        "Claimed task"
    );

    let inner_result: Result<(&str, Option<String>, Option<String>, Option<String>)> =
        execute_claimed_task(runtime, task, task_id, title, &claim_provenance).await;

    // Once claimed, the task MUST be released — convert any execution error
    // into a "failed" release instead of returning early.
    let (status, result, error, session_id) = match inner_result {
        Ok(tuple) => tuple,
        Err(e) => {
            tracing::error!(
                task_id,
                error = %e,
                "Task failed after claim (releasing as failed)"
            );
            (
                "failed",
                None,
                Some(format!("Worker error after claim: {}", e)),
                None,
            )
        }
    };

    release_task_result(
        &runtime.client,
        &runtime.server,
        &runtime.token,
        &runtime.worker_id,
        task_id,
        status,
        result,
        error,
        session_id,
    )
    .await?;

    tracing::info!("Task released: {} with status: {}", task_id, status);
    Ok(())
}
1571
/// Executes an already-claimed task and returns `(status, result, error,
/// session_id)` for the release call.
///
/// Routing, in order:
/// 1. `clone_repo` tasks → [`handle_clone_repo_task`] (no session);
/// 2. forage tasks → [`handle_forage_task`] (no session);
/// 3. swarm agents → [`execute_swarm_with_policy`];
/// 4. everything else → [`execute_session_with_policy`] (non-primary agent
///    types fall back to "build").
///
/// Task metadata drives most behavior: session resumption
/// (`resume_session_id`), model selection (`model_ref`/`model` plus
/// `model_tier` derived from a `complexity` hint), personality/target-agent
/// hints for swarms, and `workspace_id` (a value of `"global"` or `""` marks
/// a "virtual" task that skips workspace setup entirely).
#[allow(clippy::too_many_lines)]
async fn execute_claimed_task<'a>(
    runtime: &WorkerTaskRuntime,
    task: &'a serde_json::Value,
    task_id: &'a str,
    title: &'a str,
    claim_provenance: &ClaimProvenance,
) -> Result<(&'static str, Option<String>, Option<String>, Option<String>)> {
    let metadata = task_metadata(task);
    let resume_session_id = metadata
        .get("resume_session_id")
        .and_then(|v| v.as_str())
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty());
    let complexity_hint = metadata_str(&metadata, &["complexity"]);
    // Explicit tier wins; otherwise derive one from the complexity hint:
    // quick → fast, deep → heavy, anything else → balanced.
    let model_tier = metadata_str(&metadata, &["model_tier", "tier"])
        .map(|s| s.to_ascii_lowercase())
        .or_else(|| {
            complexity_hint.as_ref().map(|complexity| {
                match complexity.to_ascii_lowercase().as_str() {
                    "quick" => "fast".to_string(),
                    "deep" => "heavy".to_string(),
                    _ => "balanced".to_string(),
                }
            })
        });
    let worker_personality = metadata_str(
        &metadata,
        &["worker_personality", "personality", "agent_personality"],
    );
    let target_agent_name = metadata_str(&metadata, &["target_agent_name", "agent_name"]);
    // Model can come from top-level fields or metadata, newer key first.
    let raw_model = task_str(task, "model_ref")
        .or_else(|| metadata_lookup(&metadata, "model_ref").and_then(|v| v.as_str()))
        .or_else(|| task_str(task, "model"))
        .or_else(|| metadata_lookup(&metadata, "model").and_then(|v| v.as_str()));
    let selected_model = raw_model.map(model_ref_to_provider_model);
    let raw_agent = task_str(task, "agent_type")
        .or_else(|| task_str(task, "agent"))
        .unwrap_or("build");
    // Fall back to description, then title, as the prompt.
    let prompt = task_str(task, "prompt")
        .or_else(|| task_str(task, "description"))
        .unwrap_or(title);

    let workspace_id = task_str(task, "workspace_id")
        .or_else(|| metadata_lookup(&metadata, "workspace_id").and_then(|v| v.as_str()));
    // "global" or empty workspace id marks a virtual task with no checkout.
    let is_virtual_task = workspace_id.map_or(false, |ws| ws == "global" || ws.is_empty());

    if raw_agent.eq_ignore_ascii_case("clone_repo") {
        return match handle_clone_repo_task(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            &runtime.worker_id,
            task,
            &metadata,
        )
        .await
        {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Clone task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    if is_forage_agent(raw_agent) {
        return match handle_forage_task(title, prompt, &metadata, selected_model.clone()).await {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Forage task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    // Resume an existing session when requested; any load failure degrades
    // to a brand-new session rather than failing the task.
    let mut session = if let Some(ref sid) = resume_session_id {
        match Session::load(sid).await {
            Ok(existing) => {
                tracing::info!("Resuming session {} for task {}", sid, task_id);
                existing
            }
            Err(e) => {
                tracing::warn!(
                    "Could not load session {} for task {} ({}), starting a new session",
                    sid,
                    task_id,
                    e
                );
                Session::new().await?
            }
        }
    } else {
        Session::new().await?
    };

    if !is_virtual_task {
        if let Some(workspace_path) = resolve_task_workspace_dir(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            workspace_id,
        )
        .await?
        {
            session.metadata.directory = Some(workspace_path);
        }
    }

    if is_virtual_task {
        tracing::info!(
            task_id,
            workspace_id = workspace_id.unwrap_or("(none)"),
            "Virtual task detected — skipping workspace setup"
        );
        session.metadata.directory = None;
    }

    // Swarm agents keep their own name; otherwise only "build"/"plan" are
    // accepted as primary agents and anything else becomes "build".
    let agent_type = if is_swarm_agent(raw_agent) {
        raw_agent
    } else {
        match raw_agent {
            "build" | "plan" => raw_agent,
            other => {
                tracing::info!(
                    "Agent \"{}\" is not a primary agent, falling back to \"build\"",
                    other
                );
                "build"
            }
        }
    };
    session.set_agent_name(agent_type.to_string());
    session.attach_claim_provenance(claim_provenance);

    // Best-effort: install the commit-msg provenance hook in the workspace.
    if !is_virtual_task {
        if let Some(directory) = session.metadata.directory.as_deref()
            && let Err(err) = install_commit_msg_hook(directory)
        {
            tracing::warn!(task_id, error = %err, "Failed to install commit-msg hook");
        }
    }

    if let Some(model) = selected_model.clone() {
        session.metadata.model = Some(model);
    }

    tracing::info!(task_id, agent_type, "Executing prompt: {}", prompt);

    let stream_client = runtime.client.clone();
    let stream_server = runtime.server.clone();
    let stream_token = runtime.token.clone();
    let stream_worker_id = runtime.worker_id.clone();
    let stream_task_id = task_id.to_string();
    let stream_bus = Arc::clone(&runtime.bus);

    // Streams incremental output both onto the in-process bus and, via a
    // fire-and-forget HTTP POST, back to the server's task-output endpoint.
    let output_callback: Arc<dyn Fn(String) + Send + Sync + 'static> =
        Arc::new(move |output: String| {
            let c = stream_client.clone();
            let s = stream_server.clone();
            let t = stream_token.clone();
            let w = stream_worker_id.clone();
            let tid = stream_task_id.clone();

            let bus_handle = stream_bus.handle("task-output");
            bus_handle.send(
                format!("task.{}", tid),
                crate::bus::BusMessage::TaskUpdate {
                    task_id: tid.clone(),
                    state: crate::a2a::types::TaskState::Working,
                    message: Some(output.clone()),
                },
            );

            tokio::spawn(async move {
                let mut req = c
                    .post(format!("{}/v1/agent/tasks/{}/output", s, tid))
                    .header("X-Worker-ID", &w);
                if let Some(tok) = &t {
                    req = req.bearer_auth(tok);
                }
                // Delivery is best-effort; errors are intentionally ignored.
                let _ = req
                    .json(&serde_json::json!({
                        "worker_id": w,
                        "output": output,
                    }))
                    .send()
                    .await;
            });
        });

    let (status, result, error, session_id) = if is_swarm_agent(agent_type) {
        match execute_swarm_with_policy(
            &mut session,
            prompt,
            model_tier.as_deref(),
            selected_model,
            &metadata,
            complexity_hint.as_deref(),
            worker_personality.as_deref(),
            target_agent_name.as_deref(),
            Some(&runtime.bus),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok((session_result, true)) => {
                tracing::info!("Swarm task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            // Swarm ran to completion but some sub-agents failed.
            Ok((session_result, false)) => {
                tracing::warn!("Swarm task completed with failures: {}", task_id);
                (
                    "failed",
                    Some(session_result.text),
                    Some("Swarm execution completed with failures".to_string()),
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!("Swarm task failed: {} - {}", task_id, e);
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    } else {
        match execute_session_with_policy(
            &mut session,
            prompt,
            runtime.auto_approve,
            model_tier.as_deref(),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok(session_result) => {
                tracing::info!("Task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!(task_id, error = %e, "Task execution failed");
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    };

    // Always report a session id, falling back to the session we created.
    Ok((
        status,
        result,
        error,
        Some(session_id.unwrap_or_else(|| session.id.clone())),
    ))
}
1845
1846async fn handle_forage_task(
1847 title: &str,
1848 prompt: &str,
1849 metadata: &serde_json::Map<String, serde_json::Value>,
1850 selected_model: Option<String>,
1851) -> Result<String> {
1852 let forage_args = build_forage_args(prompt, title, metadata, selected_model);
1853 let summary = crate::forage::execute_with_summary(forage_args).await?;
1854 Ok(summary.render_text())
1855}
1856
/// Handles a `clone_repo` task: ensures the workspace's repository exists
/// locally on the requested branch, then registers the checkout with the
/// server and enqueues any `post_clone_task` from the metadata.
///
/// Task metadata overrides the workspace record for `git_url` and
/// `git_branch` (branch defaults to "main"). If a `.git` directory already
/// exists at the resolved path, the repo is updated via fetch/checkout/
/// ff-only pull; otherwise a fresh single-branch clone is made using a
/// temporary credential-helper script, which is removed afterwards in both
/// the success and failure paths.
async fn handle_clone_repo_task(
    client: &Client,
    server: &str,
    token: &Option<String>,
    worker_id: &str,
    task: &serde_json::Value,
    metadata: &serde_json::Map<String, serde_json::Value>,
) -> Result<String> {
    let workspace_id = metadata_str(metadata, &["workspace_id"])
        .or_else(|| task_str(task, "workspace_id").map(|value| value.to_string()))
        .ok_or_else(|| anyhow::anyhow!("Clone task is missing workspace_id metadata"))?;
    let workspace = fetch_workspace_record(client, server, token, &workspace_id).await?;
    let git_url = metadata_str(metadata, &["git_url"])
        .or_else(|| workspace.git_url.clone())
        .ok_or_else(|| anyhow::anyhow!("Workspace {} is missing git_url", workspace_id))?;
    let branch = metadata_str(metadata, &["git_branch"])
        .or_else(|| workspace.git_branch.clone())
        .unwrap_or_else(|| "main".to_string());
    let repo_path = resolve_workspace_clone_path(&workspace_id, &workspace.path);

    if let Some(parent) = repo_path.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .with_context(|| format!("Failed to create repo parent {}", parent.display()))?;
    }

    // Temporary credential helper used only for the initial clone; cleaned up
    // unconditionally at the end of this function.
    let temp_helper_path =
        git_clone_base_dir().join(format!(".{}-git-credential-helper", workspace_id));
    write_git_credential_helper_script(&temp_helper_path, &workspace_id)?;

    // The whole git workflow runs inside this async block so the helper file
    // can be removed after it, regardless of outcome.
    let clone_result = async {
        let git_dir = repo_path.join(".git");
        if git_dir.exists() {
            // Existing checkout: refresh auth config, then fast-forward to
            // the requested branch.
            configure_repo_git_auth(&repo_path, &workspace_id)?;
            configure_repo_git_github_app_from_agent_config(
                &repo_path,
                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
            );
            run_git_command_at(
                Some(&repo_path),
                vec!["fetch".to_string(), "origin".to_string(), branch.clone()],
            )
            .await?;
            run_git_command_at(
                Some(&repo_path),
                vec!["checkout".to_string(), branch.clone()],
            )
            .await?;
            run_git_command_at(
                Some(&repo_path),
                vec![
                    "pull".to_string(),
                    "--ff-only".to_string(),
                    "origin".to_string(),
                    branch.clone(),
                ],
            )
            .await?;
        } else {
            prepare_clone_target(&repo_path).await?;

            // Fresh clone with the temporary credential helper wired in via
            // one-off `-c` config flags.
            run_git_command_at(
                None,
                vec![
                    "-c".to_string(),
                    format!("credential.helper={}", temp_helper_path.display()),
                    "-c".to_string(),
                    "credential.useHttpPath=true".to_string(),
                    "clone".to_string(),
                    "--single-branch".to_string(),
                    "--branch".to_string(),
                    branch.clone(),
                    git_url.clone(),
                    repo_path.display().to_string(),
                ],
            )
            .await?;
            configure_repo_git_auth(&repo_path, &workspace_id)?;
            configure_repo_git_github_app_from_agent_config(
                &repo_path,
                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
            );
        }
        install_commit_msg_hook(&repo_path)?;

        register_cloned_workspace(client, server, token, worker_id, &workspace, &repo_path).await?;
        enqueue_post_clone_task(client, server, token, worker_id, &workspace_id, metadata).await?;

        Ok::<String, anyhow::Error>(format!(
            "Repository ready at {} (branch: {})",
            repo_path.display(),
            branch
        ))
    }
    .await;

    // Best-effort cleanup of the credential helper on every path.
    let _ = tokio::fs::remove_file(&temp_helper_path).await;
    clone_result
}
1956
1957async fn register_cloned_workspace(
1958 client: &Client,
1959 server: &str,
1960 token: &Option<String>,
1961 worker_id: &str,
1962 workspace: &RegisteredWorkspaceRecord,
1963 repo_path: &Path,
1964) -> Result<()> {
1965 let mut req = client.post(format!(
1966 "{}/v1/agent/workspaces",
1967 server.trim_end_matches('/')
1968 ));
1969 if let Some(token) = token {
1970 req = req.bearer_auth(token);
1971 }
1972
1973 let response = req
1974 .json(&serde_json::json!({
1975 "workspace_id": workspace.id,
1976 "name": workspace.name,
1977 "path": repo_path.display().to_string(),
1978 "description": workspace.description,
1979 "agent_config": workspace.agent_config,
1980 "git_url": workspace.git_url,
1981 "git_branch": workspace.git_branch,
1982 "worker_id": worker_id,
1983 }))
1984 .send()
1985 .await
1986 .context("Failed to register cloned workspace with server")?;
1987 let status = response.status();
1988 if !status.is_success() {
1989 let body = response.text().await.unwrap_or_default();
1990 anyhow::bail!(
1991 "Failed to register cloned workspace {} ({}): {}",
1992 workspace.id,
1993 status,
1994 body
1995 );
1996 }
1997
1998 Ok(())
1999}
2000
2001async fn enqueue_post_clone_task(
2002 client: &Client,
2003 server: &str,
2004 token: &Option<String>,
2005 worker_id: &str,
2006 workspace_id: &str,
2007 metadata: &serde_json::Map<String, serde_json::Value>,
2008) -> Result<()> {
2009 let Some(post_clone_task) = metadata.get("post_clone_task") else {
2010 return Ok(());
2011 };
2012 let Some(task) = post_clone_task.as_object() else {
2013 return Ok(());
2014 };
2015 let title = task
2016 .get("title")
2017 .and_then(|value| value.as_str())
2018 .filter(|value| !value.trim().is_empty())
2019 .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing title"))?;
2020 let prompt = task
2021 .get("prompt")
2022 .and_then(|value| value.as_str())
2023 .filter(|value| !value.trim().is_empty())
2024 .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing prompt"))?;
2025 let agent_type = task
2026 .get("agent_type")
2027 .and_then(|value| value.as_str())
2028 .unwrap_or("build");
2029 let mut task_metadata = task
2030 .get("metadata")
2031 .cloned()
2032 .unwrap_or_else(|| serde_json::json!({}));
2033 if let Some(task_metadata_obj) = task_metadata.as_object_mut() {
2034 task_metadata_obj
2035 .entry("target_worker_id".to_string())
2036 .or_insert_with(|| serde_json::Value::String(worker_id.to_string()));
2037 }
2038
2039 let mut req = client.post(format!(
2040 "{}/v1/agent/workspaces/{}/tasks",
2041 server.trim_end_matches('/'),
2042 workspace_id
2043 ));
2044 if let Some(token) = token {
2045 req = req.bearer_auth(token);
2046 }
2047
2048 let response = req
2049 .json(&serde_json::json!({
2050 "title": title,
2051 "prompt": prompt,
2052 "agent_type": agent_type,
2053 "metadata": task_metadata,
2054 }))
2055 .send()
2056 .await
2057 .context("Failed to enqueue post-clone task")?;
2058 let status = response.status();
2059 if !status.is_success() {
2060 let body = response.text().await.unwrap_or_default();
2061 anyhow::bail!("Failed to enqueue post-clone task ({}): {}", status, body);
2062 }
2063 Ok(())
2064}
2065
2066async fn run_git_command_at(current_dir: Option<&Path>, args: Vec<String>) -> Result<String> {
2067 let mut command = Command::new("git");
2068 if let Some(dir) = current_dir {
2069 command.current_dir(dir);
2070 }
2071
2072 let output = command
2073 .args(args.iter().map(String::as_str))
2074 .output()
2075 .await
2076 .context("Failed to execute git command")?;
2077
2078 if output.status.success() {
2079 return Ok(String::from_utf8_lossy(&output.stdout).trim().to_string());
2080 }
2081
2082 Err(anyhow::anyhow!(
2083 "Git command failed: {}",
2084 String::from_utf8_lossy(&output.stderr).trim()
2085 ))
2086}
2087
2088async fn release_task_result(
2089 client: &Client,
2090 server: &str,
2091 token: &Option<String>,
2092 worker_id: &str,
2093 task_id: &str,
2094 status: &str,
2095 result: Option<String>,
2096 error: Option<String>,
2097 session_id: Option<String>,
2098) -> Result<()> {
2099 let mut req = client
2100 .post(format!("{}/v1/worker/tasks/release", server))
2101 .header("X-Worker-ID", worker_id);
2102 if let Some(token) = token {
2103 req = req.bearer_auth(token);
2104 }
2105
2106 req.json(&serde_json::json!({
2107 "task_id": task_id,
2108 "status": status,
2109 "result": result,
2110 "error": error,
2111 "session_id": session_id,
2112 }))
2113 .send()
2114 .await
2115 .context("Failed to release task result")?;
2116
2117 Ok(())
2118}
2119
/// Runs a swarm execution inside `session` and returns the session result
/// plus whether the swarm reported overall success.
///
/// Swarm tuning comes from task metadata (strategy, sub-agent count 1–100,
/// steps per sub-agent 1–200, timeout 30–3600s, parallelism flag). The model
/// is resolved from the explicit model or tier and written back onto the
/// session. When an `output_callback` is supplied, swarm events are streamed
/// through it line-by-line while the executor runs on a spawned task;
/// otherwise the executor runs inline. The final swarm text is appended to
/// the session as an assistant message and the session is saved.
async fn execute_swarm_with_policy(
    session: &mut Session,
    prompt: &str,
    model_tier: Option<&str>,
    explicit_model: Option<String>,
    metadata: &serde_json::Map<String, serde_json::Value>,
    complexity_hint: Option<&str>,
    worker_personality: Option<&str>,
    target_agent_name: Option<&str>,
    bus: Option<&Arc<AgentBus>>,
    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
) -> Result<(crate::session::SessionResult, bool)> {
    use crate::provider::{ContentPart, Message, Role};

    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: prompt.to_string(),
        }],
    });

    if session.title.is_none() {
        session.generate_title().await?;
    }

    // Metadata-driven swarm tuning, with defaults and hard clamps.
    let strategy = parse_swarm_strategy(metadata);
    let max_subagents = metadata_usize(
        metadata,
        &["swarm_max_subagents", "max_subagents", "subagents"],
    )
    .unwrap_or(10)
    .clamp(1, 100);
    let max_steps_per_subagent = metadata_usize(
        metadata,
        &[
            "swarm_max_steps_per_subagent",
            "max_steps_per_subagent",
            "max_steps",
        ],
    )
    .unwrap_or(50)
    .clamp(1, 200);
    let timeout_secs = metadata_u64(metadata, &["swarm_timeout_secs", "timeout_secs", "timeout"])
        .unwrap_or(600)
        .clamp(30, 3600);
    let parallel_enabled =
        metadata_bool(metadata, &["swarm_parallel_enabled", "parallel_enabled"]).unwrap_or(true);

    let model = resolve_swarm_model(explicit_model, model_tier).await;
    if let Some(ref selected_model) = model {
        session.metadata.model = Some(selected_model.clone());
    }

    // Announce the effective routing/config to the output stream up front.
    if let Some(ref cb) = output_callback {
        cb(format!(
            "[swarm] routing complexity={} tier={} personality={} target_agent={}",
            complexity_hint.unwrap_or("standard"),
            model_tier.unwrap_or("balanced"),
            worker_personality.unwrap_or("auto"),
            target_agent_name.unwrap_or("auto")
        ));
        cb(format!(
            "[swarm] config strategy={:?} max_subagents={} max_steps={} timeout={}s tier={}",
            strategy,
            max_subagents,
            max_steps_per_subagent,
            timeout_secs,
            model_tier.unwrap_or("balanced")
        ));
    }

    let swarm_config = SwarmConfig {
        max_subagents,
        max_steps_per_subagent,
        subagent_timeout_secs: timeout_secs,
        parallel_enabled,
        model,
        working_dir: session
            .metadata
            .directory
            .as_ref()
            .map(|p| p.to_string_lossy().to_string()),
        ..Default::default()
    };

    let swarm_result = if output_callback.is_some() {
        // Streaming path: run the executor on its own task and pump its
        // events to the callback until it finishes.
        let (event_tx, mut event_rx) = mpsc::channel(256);
        let mut executor = SwarmExecutor::new(swarm_config).with_event_tx(event_tx);
        if let Some(bus) = bus {
            executor = executor.with_bus(Arc::clone(bus));
        }
        let prompt_owned = prompt.to_string();
        let mut exec_handle =
            tokio::spawn(async move { executor.execute(&prompt_owned, strategy).await });

        let mut final_result: Option<crate::swarm::SwarmResult> = None;

        while final_result.is_none() {
            tokio::select! {
                maybe_event = event_rx.recv() => {
                    if let Some(event) = maybe_event
                        && let Some(ref cb) = output_callback
                        && let Some(line) = format_swarm_event_for_output(&event) {
                        cb(line);
                    }
                }
                join_result = &mut exec_handle => {
                    let joined = join_result.map_err(|e| anyhow::anyhow!("Swarm join failure: {}", e))?;
                    final_result = Some(joined?);
                }
            }
        }

        // Drain any events that were still queued when the executor finished.
        while let Ok(event) = event_rx.try_recv() {
            if let Some(ref cb) = output_callback
                && let Some(line) = format_swarm_event_for_output(&event)
            {
                cb(line);
            }
        }

        final_result.ok_or_else(|| anyhow::anyhow!("Swarm execution returned no result"))?
    } else {
        // No streaming requested: run the executor inline.
        let mut executor = SwarmExecutor::new(swarm_config);
        if let Some(bus) = bus {
            executor = executor.with_bus(Arc::clone(bus));
        }
        executor.execute(prompt, strategy).await?
    };

    // Guarantee a non-empty result text so the release payload is meaningful.
    let final_text = if swarm_result.result.trim().is_empty() {
        if swarm_result.success {
            "Swarm completed without textual output.".to_string()
        } else {
            "Swarm finished with failures and no textual output.".to_string()
        }
    } else {
        swarm_result.result.clone()
    };

    session.add_message(Message {
        role: Role::Assistant,
        content: vec![ContentPart::Text {
            text: final_text.clone(),
        }],
    });
    session.save().await?;

    Ok((
        crate::session::SessionResult {
            text: final_text,
            session_id: session.id.clone(),
        },
        swarm_result.success,
    ))
}
2276
/// Runs an agent loop for `prompt` inside `session`, enforcing the
/// `auto_approve` tool policy on every tool call.
///
/// Flow: load providers from Vault, pick a provider/model (explicit session
/// model first, otherwise tier-based fallback), then iterate up to 50
/// completion steps, executing any tool calls the model emits, until the
/// model replies without tool calls. Text and tool progress are streamed
/// through `output_callback` when provided; the session is persisted before
/// returning.
///
/// Errors: Vault/registry load failure, no providers, an explicitly selected
/// but unavailable provider, provider completion errors, or session save
/// failures all propagate to the caller.
async fn execute_session_with_policy(
    session: &mut Session,
    prompt: &str,
    auto_approve: AutoApprove,
    model_tier: Option<&str>,
    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
) -> Result<crate::session::SessionResult> {
    use crate::provider::{
        CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
    };
    use std::sync::Arc;

    let registry = ProviderRegistry::from_vault()
        .await
        .context("Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN")?;
    let providers = registry.list();
    tracing::info!("Available providers: {:?}", providers);

    if providers.is_empty() {
        anyhow::bail!(
            "No LLM providers available (0 providers loaded). \
            Configure API keys in HashiCorp Vault or set environment variables. \
            Vault address: {}",
            std::env::var("VAULT_ADDR").unwrap_or_else(|_| "(not set)".into())
        );
    }

    // Resolve an explicit provider/model from session metadata, if present.
    // `metadata.model` may be "provider/model", a bare provider name, or a
    // bare model id; parse_model_string splits the first form.
    let (provider_name, model_id) = if let Some(ref model_str) = session.metadata.model {
        let (prov, model) = parse_model_string(model_str);
        // Legacy alias: "zhipuai" was renamed to "zai".
        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
        if prov.is_some() {
            (prov.map(|s| s.to_string()), model.to_string())
        } else if providers.contains(&model) {
            // The whole string matched a provider name: provider only, no model.
            (Some(model.to_string()), String::new())
        } else {
            // Bare model id with no provider prefix.
            (None, model.to_string())
        }
    } else {
        (None, String::new())
    };

    let provider_slice = providers.as_slice();
    // An explicitly requested provider must actually be loaded; fail loudly
    // rather than silently falling back to a different provider.
    if let Some(explicit_provider) = provider_name.as_deref()
        && !providers.contains(&explicit_provider)
    {
        anyhow::bail!(
            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
            explicit_provider,
            providers.join(", ")
        );
    }

    let selected_provider = provider_name
        .as_deref()
        .unwrap_or_else(|| choose_provider_for_tier(provider_slice, model_tier));

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    // Record the user's prompt before the first completion round.
    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: prompt.to_string(),
        }],
    });

    if session.title.is_none() {
        session.generate_title().await?;
    }

    let model = if !model_id.is_empty() {
        model_id
    } else {
        default_model_for_provider(selected_provider, model_tier)
    };

    // Tools operate relative to the session's working directory; fall back to
    // the process CWD when the session has none recorded.
    let workspace_dir = session
        .metadata
        .directory
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    let tool_registry = create_filtered_registry(
        Arc::clone(&provider),
        model.clone(),
        auto_approve,
        &workspace_dir,
        output_callback.clone(),
    );
    let tool_definitions = tool_registry.definitions();

    // Some models only accept temperature == 1.0; everything else gets the
    // general default of 0.7.
    let temperature = if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!(
        "Using model: {} via provider: {} (tier: {:?})",
        model,
        selected_provider,
        model_tier
    );
    tracing::info!(
        "Available tools: {} (auto_approve: {:?})",
        tool_definitions.len(),
        auto_approve
    );

    let system_prompt = crate::agent::builtin::build_system_prompt(&workspace_dir);

    let mut final_output = String::new();
    // Hard cap on agent iterations to bound cost and runaway tool loops.
    let max_steps = 50;

    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");

        // Rebuild the full message list each step: system prompt first, then
        // the accumulated session history (user/assistant/tool messages).
        let mut messages = vec![Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.clone(),
            }],
        }];
        messages.extend(session.messages.clone());

        let request = CompletionRequest {
            messages,
            tools: tool_definitions.clone(),
            model: model.clone(),
            temperature,
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let response = provider.complete(request).await?;

        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        // Collect (id, name, parsed-args) for each tool call in the reply;
        // malformed JSON arguments degrade to an empty object rather than
        // aborting the step.
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } = part
                {
                    let args: serde_json::Value =
                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                    Some((id.clone(), name.clone(), args))
                } else {
                    None
                }
            })
            .collect();

        // Forward any assistant text to the caller and accumulate it as the
        // final result text (newline-separated across steps).
        for part in &response.message.content {
            if let ContentPart::Text { text } = part
                && !text.is_empty()
            {
                final_output.push_str(text);
                final_output.push('\n');
                if let Some(ref cb) = output_callback {
                    cb(text.clone());
                }
            }
        }

        // No tool calls means the model is done; record the reply and stop.
        if tool_calls.is_empty() {
            session.add_message(response.message.clone());
            break;
        }

        session.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        for (tool_id, tool_name, tool_input) in tool_calls {
            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            if let Some(ref cb) = output_callback {
                cb(format!("[tool:start:{}]", tool_name));
            }

            // Policy gate: a denied tool still gets a tool-result message so
            // the model can see why the call did not run.
            // NOTE(review): this path emits [tool:start:...] but no matching
            // terminal [tool:...:err] callback — confirm whether consumers
            // expect a closing event.
            if !is_tool_allowed(&tool_name, auto_approve) {
                let msg = format!(
                    "Tool '{}' requires approval but auto-approve policy is {:?}",
                    tool_name, auto_approve
                );
                tracing::warn!(tool = %tool_name, "Tool blocked by auto-approve policy");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: msg,
                    }],
                });
                continue;
            }

            // Execute the tool; failures are reported back to the model as
            // text instead of aborting the whole session.
            let content = if let Some(tool) = tool_registry.get(&tool_name) {
                let exec_result: Result<crate::tool::ToolResult> =
                    tool.execute(tool_input.clone()).await;
                match exec_result {
                    Ok(result) => {
                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                        if let Some(ref cb) = output_callback {
                            let status = if result.success { "ok" } else { "err" };
                            // Callback output is truncated; the model still
                            // receives the full tool output below.
                            cb(format!(
                                "[tool:{}:{}] {}",
                                tool_name,
                                status,
                                crate::util::truncate_bytes_safe(&result.output, 500)
                            ));
                        }
                        result.output
                    }
                    Err(e) => {
                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                        if let Some(ref cb) = output_callback {
                            cb(format!("[tool:{}:err] {}", tool_name, e));
                        }
                        format!("Error: {}", e)
                    }
                }
            } else {
                tracing::warn!(tool = %tool_name, "Tool not found");
                format!("Error: Unknown tool '{}'", tool_name)
            };

            session.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });
        }
    }

    session.save().await?;

    Ok(crate::session::SessionResult {
        text: final_output.trim().to_string(),
        session_id: session.id.clone(),
    })
}
2553
/// Spawns the worker's periodic heartbeat task and returns its handle.
///
/// Every 30s it reports the worker's status and active-task count (derived
/// from the shared `processing` set) to
/// `{server}/v1/agent/workers/{worker_id}/heartbeat`, optionally attaching a
/// cognition payload. If the server rejects a heartbeat that included the
/// cognition payload with a 4xx, the heartbeat is retried without it and
/// cognition payloads are paused for a 5-minute cooldown. Failures are
/// counted and logged but never terminate the loop.
pub fn start_heartbeat(
    client: Client,
    server: String,
    token: Option<String>,
    heartbeat_state: HeartbeatState,
    processing: Arc<Mutex<HashSet<String>>>,
    cognition_config: CognitionHeartbeatConfig,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut consecutive_failures = 0u32;
        const MAX_FAILURES: u32 = 3;
        const HEARTBEAT_INTERVAL_SECS: u64 = 30;
        const COGNITION_RETRY_COOLDOWN_SECS: u64 = 300;
        // While Some, cognition payloads are suppressed until this Instant passes.
        let mut cognition_payload_disabled_until: Option<Instant> = None;

        let mut interval =
            tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            interval.tick().await;

            // Derive status from the live processing set on every tick.
            let active_count = processing.lock().await.len();
            heartbeat_state.set_task_count(active_count).await;

            let status = if active_count > 0 {
                WorkerStatus::Processing
            } else {
                WorkerStatus::Idle
            };
            heartbeat_state.set_status(status).await;

            let url = format!(
                "{}/v1/agent/workers/{}/heartbeat",
                server, heartbeat_state.worker_id
            );
            let mut req = client.post(&url);

            if let Some(ref t) = token {
                req = req.bearer_auth(t);
            }

            let status_str = heartbeat_state.status.lock().await.as_str().to_string();
            let sub_agents = heartbeat_state.sub_agents_snapshot().await;
            // base_payload is kept separate so a cognition-schema rejection
            // can be retried with the plain payload below.
            let base_payload = serde_json::json!({
                "worker_id": &heartbeat_state.worker_id,
                "agent_name": &heartbeat_state.agent_name,
                "status": status_str,
                "active_task_count": active_count,
                "sub_agents": sub_agents,
            });
            let mut payload = base_payload.clone();
            let mut included_cognition_payload = false;
            let cognition_payload_allowed = cognition_payload_disabled_until
                .map(|until| Instant::now() >= until)
                .unwrap_or(true);

            // Attach the cognition section only when enabled, not in cooldown,
            // and the local cognition service actually responded.
            if cognition_config.enabled
                && cognition_payload_allowed
                && let Some(cognition_payload) =
                    fetch_cognition_heartbeat_payload(&client, &cognition_config).await
                && let Some(obj) = payload.as_object_mut()
            {
                obj.insert("cognition".to_string(), cognition_payload);
                included_cognition_payload = true;
            }

            match req.json(&payload).send().await {
                Ok(res) => {
                    if res.status().is_success() {
                        consecutive_failures = 0;
                        tracing::debug!(
                            worker_id = %heartbeat_state.worker_id,
                            status = status_str,
                            active_tasks = active_count,
                            "Heartbeat sent successfully"
                        );
                    } else if included_cognition_payload && res.status().is_client_error() {
                        // 4xx while cognition was attached: assume the server
                        // rejected the cognition schema and retry with the
                        // base payload only.
                        tracing::warn!(
                            worker_id = %heartbeat_state.worker_id,
                            status = %res.status(),
                            "Heartbeat cognition payload rejected, retrying without cognition payload"
                        );

                        let mut retry_req = client.post(&url);
                        if let Some(ref t) = token {
                            retry_req = retry_req.bearer_auth(t);
                        }

                        match retry_req.json(&base_payload).send().await {
                            Ok(retry_res) if retry_res.status().is_success() => {
                                // Base payload worked: pause cognition
                                // payloads for the cooldown window.
                                cognition_payload_disabled_until = Some(
                                    Instant::now()
                                        + Duration::from_secs(COGNITION_RETRY_COOLDOWN_SECS),
                                );
                                consecutive_failures = 0;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    retry_after_secs = COGNITION_RETRY_COOLDOWN_SECS,
                                    "Paused cognition heartbeat payload after schema rejection"
                                );
                            }
                            Ok(retry_res) => {
                                consecutive_failures += 1;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    status = %retry_res.status(),
                                    failures = consecutive_failures,
                                    "Heartbeat failed even after retry without cognition payload"
                                );
                            }
                            Err(e) => {
                                consecutive_failures += 1;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    error = %e,
                                    failures = consecutive_failures,
                                    "Heartbeat retry without cognition payload failed"
                                );
                            }
                        }
                    } else {
                        consecutive_failures += 1;
                        tracing::warn!(
                            worker_id = %heartbeat_state.worker_id,
                            status = %res.status(),
                            failures = consecutive_failures,
                            "Heartbeat failed"
                        );
                    }
                }
                Err(e) => {
                    consecutive_failures += 1;
                    tracing::warn!(
                        worker_id = %heartbeat_state.worker_id,
                        error = %e,
                        failures = consecutive_failures,
                        "Heartbeat request failed"
                    );
                }
            }

            // Deliberately never exits: escalate to error-level logging and
            // reset the counter; reconnection is the SSE loop's job.
            if consecutive_failures >= MAX_FAILURES {
                tracing::error!(
                    worker_id = %heartbeat_state.worker_id,
                    failures = consecutive_failures,
                    "Heartbeat failed {} consecutive times - worker will continue running and attempt reconnection via SSE loop",
                    MAX_FAILURES
                );
                consecutive_failures = 0;
            }
        }
    })
}
2715
/// Queries the local cognition service and builds the optional "cognition"
/// section of the heartbeat payload.
///
/// Returns `None` on any timeout, transport error, non-success status, or
/// JSON decode failure — the heartbeat then simply omits the section. When
/// `include_thought_summary` is set, the latest snapshot is also fetched and
/// its summary (trimmed to `summary_max_chars`) plus model/source metadata
/// are folded in on a best-effort basis.
async fn fetch_cognition_heartbeat_payload(
    client: &Client,
    config: &CognitionHeartbeatConfig,
) -> Option<serde_json::Value> {
    let status_url = format!("{}/v1/cognition/status", config.source_base_url);
    // Outer timeout bounds the whole request; the double ok()? drops both
    // timeout and transport errors.
    let status_res = tokio::time::timeout(
        Duration::from_millis(config.request_timeout_ms),
        client.get(status_url).send(),
    )
    .await
    .ok()?
    .ok()?;

    if !status_res.status().is_success() {
        return None;
    }

    let status: CognitionStatusSnapshot = status_res.json().await.ok()?;
    let mut payload = serde_json::json!({
        "running": status.running,
        "last_tick_at": status.last_tick_at,
        "active_persona_count": status.active_persona_count,
        "events_buffered": status.events_buffered,
        "snapshots_buffered": status.snapshots_buffered,
        "loop_interval_ms": status.loop_interval_ms,
    });

    if config.include_thought_summary {
        let snapshot_url = format!("{}/v1/cognition/snapshots/latest", config.source_base_url);
        let snapshot_res = tokio::time::timeout(
            Duration::from_millis(config.request_timeout_ms),
            client.get(snapshot_url).send(),
        )
        .await
        .ok()
        .and_then(Result::ok);

        // Snapshot enrichment is best-effort: any failure leaves the base
        // status payload untouched.
        if let Some(snapshot_res) = snapshot_res
            && snapshot_res.status().is_success()
            && let Ok(snapshot) = snapshot_res.json::<CognitionLatestSnapshot>().await
            && let Some(obj) = payload.as_object_mut()
        {
            obj.insert(
                "latest_snapshot_at".to_string(),
                serde_json::Value::String(snapshot.generated_at),
            );
            obj.insert(
                "latest_thought".to_string(),
                serde_json::Value::String(trim_for_heartbeat(
                    &snapshot.summary,
                    config.summary_max_chars,
                )),
            );
            // Optional metadata fields; only included when present and string-typed.
            if let Some(model) = snapshot
                .metadata
                .get("model")
                .and_then(serde_json::Value::as_str)
            {
                obj.insert(
                    "latest_thought_model".to_string(),
                    serde_json::Value::String(model.to_string()),
                );
            }
            if let Some(source) = snapshot
                .metadata
                .get("source")
                .and_then(serde_json::Value::as_str)
            {
                obj.insert(
                    "latest_thought_source".to_string(),
                    serde_json::Value::String(source.to_string()),
                );
            }
        }
    }

    Some(payload)
}
2794
/// Trims surrounding whitespace from `input` and truncates it to at most
/// `max_chars` characters (plus a `"..."` suffix when truncation occurred).
///
/// Fixes over the previous version: the length check now runs on the trimmed
/// text (so padded-but-short input no longer gains a spurious ellipsis), and
/// trailing whitespace is stripped before the ellipsis is appended (so
/// truncating "ab cd" at 3 yields "ab..." rather than "ab ...").
fn trim_for_heartbeat(input: &str, max_chars: usize) -> String {
    let trimmed = input.trim();
    // chars().count() is O(n) but summaries are small; counting chars (not
    // bytes) keeps the limit correct for multi-byte text.
    if trimmed.chars().count() <= max_chars {
        return trimmed.to_string();
    }

    let mut out: String = trimmed.chars().take(max_chars).collect();
    // Drop whitespace left dangling at the cut point before the ellipsis.
    out.truncate(out.trim_end().len());
    out.push_str("...");
    out
}
2807
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepts (case-insensitively) "1"/"true"/"yes"/"on" as true and
/// "0"/"false"/"no"/"off" as false; anything else — including an unset
/// variable — yields `default`.
fn env_bool(name: &str, default: bool) -> bool {
    match std::env::var(name) {
        Ok(raw) => match raw.to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        },
        Err(_) => default,
    }
}
2818
/// Reads a `usize` from the environment variable `name`; an unset variable
/// or an unparsable value falls back to `default`.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<usize>().unwrap_or(default),
        Err(_) => default,
    }
}
2825
/// Reads a `u64` from the environment variable `name`; an unset variable or
/// an unparsable value falls back to `default`.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
2832
2833fn maybe_configure_repo_git_auth_from_entry(entry: &serde_json::Value) {
2834 let Some(workspace_id) = entry.get("id").and_then(|v| v.as_str()) else {
2835 return;
2836 };
2837 let Some(path) = entry.get("path").and_then(|v| v.as_str()) else {
2838 return;
2839 };
2840 let has_git_remote = entry
2841 .get("git_url")
2842 .and_then(|v| v.as_str())
2843 .is_some_and(|value| !value.trim().is_empty());
2844 if !has_git_remote {
2845 return;
2846 }
2847
2848 let repo_path = Path::new(path);
2849 if !repo_path.join(".git").exists() {
2850 return;
2851 }
2852
2853 if let Err(error) = configure_repo_git_auth(repo_path, workspace_id) {
2854 tracing::debug!(
2855 workspace_id,
2856 path,
2857 error = %error,
2858 "Workspace sync could not install Git credential helper"
2859 );
2860 }
2861 configure_repo_git_github_app_from_agent_config(repo_path, entry.get("agent_config"));
2862}
2863
2864fn configure_repo_git_github_app_from_agent_config(
2865 repo_path: &Path,
2866 agent_config: Option<&serde_json::Value>,
2867) {
2868 let installation_id = agent_config
2869 .and_then(|value| value.get("git_auth"))
2870 .and_then(|value| value.get("github_app"))
2871 .and_then(|value| value.get("installation_id"))
2872 .and_then(|value| value.as_str());
2873 let app_id = agent_config
2874 .and_then(|value| value.get("git_auth"))
2875 .and_then(|value| value.get("github_app"))
2876 .and_then(|value| value.get("app_id"))
2877 .and_then(|value| value.as_str());
2878 if let Err(error) = configure_repo_git_github_app(repo_path, installation_id, app_id) {
2879 tracing::debug!(path = %repo_path.display(), error = %error, "Failed to persist GitHub App repo metadata");
2880 }
2881}
2882
2883fn start_workspace_sync(
2887 client: Client,
2888 server: String,
2889 token: Option<String>,
2890 shared_codebases: Arc<Mutex<Vec<String>>>,
2891) -> JoinHandle<()> {
2892 tokio::spawn(async move {
2893 const POLL_INTERVAL_SECS: u64 = 60;
2894 let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
2895 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2896 interval.tick().await; loop {
2899 interval.tick().await;
2900 if let Err(e) =
2901 sync_workspaces_from_server(&client, &server, &token, &shared_codebases).await
2902 {
2903 tracing::warn!("Workspace sync failed: {}", e);
2904 }
2905 }
2906 })
2907}
2908
2909async fn sync_workspaces_from_server(
2913 client: &Client,
2914 server: &str,
2915 token: &Option<String>,
2916 shared_codebases: &Arc<Mutex<Vec<String>>>,
2917) -> Result<()> {
2918 let mut req = client.get(format!("{}/v1/agent/workspaces", server));
2919 if let Some(t) = token {
2920 req = req.bearer_auth(t);
2921 }
2922
2923 let res = req.send().await?;
2924 if !res.status().is_success() {
2925 tracing::debug!(
2926 status = %res.status(),
2927 "Workspace sync: server returned non-success, skipping"
2928 );
2929 return Ok(());
2930 }
2931
2932 let data: serde_json::Value = res.json().await?;
2933
2934 let entries = if let Some(arr) = data.as_array() {
2936 arr.clone()
2937 } else {
2938 data["workspaces"]
2939 .as_array()
2940 .or_else(|| data["codebases"].as_array())
2941 .cloned()
2942 .unwrap_or_default()
2943 };
2944
2945 let mut new_paths: Vec<String> = Vec::new();
2946 {
2947 let current = shared_codebases.lock().await;
2948 for entry in &entries {
2949 maybe_configure_repo_git_auth_from_entry(entry);
2950 let path = match entry["path"].as_str().filter(|p| !p.is_empty()) {
2951 Some(p) => p,
2952 None => continue,
2953 };
2954 if std::path::Path::new(path).exists() && !current.iter().any(|c| c.as_str() == path) {
2957 new_paths.push(path.to_string());
2958 }
2959 }
2960 }
2961
2962 if !new_paths.is_empty() {
2963 let mut current = shared_codebases.lock().await;
2964 for path in &new_paths {
2965 tracing::info!(
2966 path = %path,
2967 "Workspace sync: auto-discovered local path, adding to codebases"
2968 );
2969 current.push(path.clone());
2970 }
2971 tracing::info!(
2972 added = new_paths.len(),
2973 total = current.len(),
2974 "Workspace sync complete -- new paths take effect on next reconnect"
2975 );
2976 } else {
2977 tracing::debug!("Workspace sync: no new local paths found");
2978 }
2979
2980 Ok(())
2981}
2982
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Nested keys under a "forage" object should be reachable through the
    // flat metadata_* lookup helpers.
    #[test]
    fn metadata_lookup_reads_nested_forage_keys() {
        let metadata = json!({
            "forage": {
                "execute": true,
                "top": 5,
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert_eq!(metadata_bool(&metadata, &["execute"]), Some(true));
        assert_eq!(metadata_usize(&metadata, &["top"]), Some(5));
    }

    // With empty metadata, the prompt becomes the single moonshot and the
    // defaults select local mode (no_s3) with the "run" execution engine.
    #[test]
    fn build_forage_args_defaults_prompt_to_moonshot_and_local_mode() {
        let metadata = serde_json::Map::new();
        let args = build_forage_args(
            "Expand enterprise adoption in regulated markets",
            "forage task",
            &metadata,
            Some("openrouter/z-ai/glm-5".to_string()),
        );

        assert_eq!(
            args.moonshots,
            vec!["Expand enterprise adoption in regulated markets".to_string()]
        );
        assert!(args.no_s3);
        assert_eq!(args.model.as_deref(), Some("openrouter/z-ai/glm-5"));
        assert_eq!(args.execution_engine, "run");
    }

    // Every field of the nested "forage" configuration should override the
    // corresponding default, including numeric values passed as strings
    // (moonshot_min_alignment).
    #[test]
    fn build_forage_args_honors_nested_forage_configuration() {
        let metadata = json!({
            "forage": {
                "top": 7,
                "loop": true,
                "max_cycles": 2,
                "execute": true,
                "no_s3": false,
                "moonshots": ["Ship autonomous OKR execution", "Reduce operator toil"],
                "moonshot_required": true,
                "moonshot_min_alignment": "0.25",
                "execution_engine": "swarm",
                "run_timeout_secs": 1200,
                "fail_fast": true,
                "swarm_strategy": "stage",
                "swarm_max_subagents": 4,
                "swarm_max_steps": 42,
                "swarm_subagent_timeout_secs": 180
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        let args = build_forage_args("fallback prompt", "title", &metadata, None);

        assert_eq!(args.top, 7);
        assert!(args.loop_mode);
        assert_eq!(args.max_cycles, 2);
        assert!(args.execute);
        assert!(!args.no_s3);
        assert_eq!(args.moonshots.len(), 2);
        assert!(args.moonshot_required);
        assert!((args.moonshot_min_alignment - 0.25).abs() < f64::EPSILON);
        assert_eq!(args.execution_engine, "swarm");
        assert_eq!(args.run_timeout_secs, 1200);
        assert!(args.fail_fast);
        assert_eq!(args.swarm_strategy, "stage");
        assert_eq!(args.swarm_max_subagents, 4);
        assert_eq!(args.swarm_max_steps, 42);
        assert_eq!(args.swarm_subagent_timeout_secs, 180);
    }

    // CODETETHER_WORKER_ID in the environment should win over any other
    // worker-id derivation. The original value is restored afterwards.
    // NOTE(review): mutates process-wide env; assumes no other test touches
    // CODETETHER_WORKER_ID concurrently — confirm test runner settings.
    #[test]
    fn resolve_worker_id_prefers_env() {
        let original = std::env::var("CODETETHER_WORKER_ID").ok();
        unsafe {
            std::env::set_var("CODETETHER_WORKER_ID", "harvester-test-worker");
        }
        let resolved = resolve_worker_id();
        match original {
            Some(value) => unsafe {
                std::env::set_var("CODETETHER_WORKER_ID", value);
            },
            None => unsafe {
                std::env::remove_var("CODETETHER_WORKER_ID");
            },
        }
        assert_eq!(resolved, "harvester-test-worker");
    }

    // A public URL should yield http + bus interface entries, with the
    // trailing slash normalized away.
    #[test]
    fn advertised_interfaces_include_http_and_bus_urls() {
        let interfaces = advertised_interfaces(Some("http://worker.test:8080/"));
        assert_eq!(interfaces["http"]["base_url"], "http://worker.test:8080");
        assert_eq!(
            interfaces["bus"]["stream_url"],
            "http://worker.test:8080/v1/bus/stream"
        );
        assert_eq!(
            interfaces["bus"]["publish_url"],
            "http://worker.test:8080/v1/bus/publish"
        );
    }

    // Absent or whitespace-only public URLs produce an empty interface map.
    #[test]
    fn advertised_interfaces_omit_empty_public_url() {
        assert_eq!(advertised_interfaces(None), serde_json::json!({}));
        assert_eq!(advertised_interfaces(Some(" ")), serde_json::json!({}));
    }

    // Reservation outcomes: new task reserves, duplicate is rejected, and the
    // capacity limit (2 here) blocks a third task.
    #[tokio::test]
    async fn reserve_task_slot_enforces_capacity() {
        let processing = Arc::new(Mutex::new(HashSet::new()));

        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::AlreadyProcessing
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-2", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-3", 2).await,
            TaskReservation::AtCapacity
        );
    }

    // A configured limit of zero must be clamped up to one worker slot.
    #[test]
    fn normalize_max_concurrent_tasks_never_returns_zero() {
        assert_eq!(normalize_max_concurrent_tasks(0), 1);
        assert_eq!(normalize_max_concurrent_tasks(3), 3);
    }
}