1mod clone_location;
4mod clone_target;
5
6use crate::a2a::claim::TaskClaimResponse;
7use crate::a2a::git_credentials::{
8 configure_repo_git_auth, configure_repo_git_github_app, write_git_credential_helper_script,
9};
10use crate::a2a::worker_workspace_record::{RegisteredWorkspaceRecord, fetch_workspace_record};
11use crate::bus::AgentBus;
12use crate::cli::{A2aArgs, ForageArgs};
13use crate::provenance::{ClaimProvenance, install_commit_msg_hook};
14use crate::provider::ProviderRegistry;
15use crate::session::Session;
16use crate::swarm::{DecompositionStrategy, SwarmConfig, SwarmExecutor};
17use crate::tui::swarm_view::SwarmEvent;
18use crate::util;
19use anyhow::{Context, Result};
20use futures::StreamExt;
21use reqwest::Client;
22use serde::Deserialize;
23use std::collections::HashMap;
24use std::collections::HashSet;
25use std::path::{Path, PathBuf};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::process::Command;
29use tokio::sync::{Mutex, mpsc};
30use tokio::task::JoinHandle;
31use tokio::time::Instant;
32
33use crate::a2a::worker_tool_registry::{create_filtered_registry, is_tool_allowed};
34use crate::a2a::worker_workspace_context::resolve_task_workspace_dir;
35use clone_location::{git_clone_base_dir, resolve_workspace_clone_path};
36use clone_target::prepare_clone_target;
37
/// Worker lifecycle state advertised to the server in heartbeats.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStatus {
    /// No task is currently being executed.
    Idle,
    /// At least one task is in flight.
    Processing,
}
44
45impl WorkerStatus {
46 pub fn as_str(&self) -> &'static str {
47 match self {
48 WorkerStatus::Idle => "idle",
49 WorkerStatus::Processing => "processing",
50 }
51 }
52}
53
/// Liveness state shared between the heartbeat task and the worker runtime.
/// Cheap to clone: every mutable field sits behind an `Arc<Mutex<_>>`.
#[derive(Clone)]
pub struct HeartbeatState {
    // Stable worker identity captured at construction.
    worker_id: String,
    /// Human-readable agent name reported upstream.
    pub agent_name: String,
    /// Current idle/processing status.
    pub status: Arc<Mutex<WorkerStatus>>,
    /// Number of tasks currently in flight.
    pub active_task_count: Arc<Mutex<usize>>,
    /// Names of sub-agents spawned by this worker.
    pub sub_agents: Arc<Mutex<HashSet<String>>>,
}
63
64impl HeartbeatState {
65 pub fn new(worker_id: String, agent_name: String) -> Self {
66 Self {
67 worker_id,
68 agent_name,
69 status: Arc::new(Mutex::new(WorkerStatus::Idle)),
70 active_task_count: Arc::new(Mutex::new(0)),
71 sub_agents: Arc::new(Mutex::new(HashSet::new())),
72 }
73 }
74
75 pub async fn set_status(&self, status: WorkerStatus) {
76 *self.status.lock().await = status;
77 }
78
79 pub async fn set_task_count(&self, count: usize) {
80 *self.active_task_count.lock().await = count;
81 }
82
83 pub async fn register_sub_agent(&self, agent_name: String) {
84 self.sub_agents.lock().await.insert(agent_name);
85 }
86
87 pub async fn deregister_sub_agent(&self, agent_name: &str) {
88 self.sub_agents.lock().await.remove(agent_name);
89 }
90
91 pub async fn sub_agents_snapshot(&self) -> Vec<String> {
92 let mut names = self
93 .sub_agents
94 .lock()
95 .await
96 .iter()
97 .cloned()
98 .collect::<Vec<_>>();
99 names.sort();
100 names
101 }
102}
103
/// Settings for sharing local cognition ("thought") state upstream,
/// assembled from `CODETETHER_WORKER_COGNITION_*` env vars by [`Self::from_env`].
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct CognitionHeartbeatConfig {
    /// Master switch; when false, no cognition data is shared.
    pub enabled: bool,
    /// Base URL of the local cognition service (trailing `/` stripped).
    pub source_base_url: String,
    /// Optional bearer token for the cognition source.
    pub token: Option<String>,
    /// Provider name tag (defaults to "cognition").
    pub provider_name: String,
    /// Polling interval in seconds (clamped to >= 5 by `from_env`).
    pub interval_secs: u64,
    /// Whether thought summaries are included in heartbeats.
    pub include_thought_summary: bool,
    /// Maximum characters of a shared thought summary.
    pub summary_max_chars: usize,
    /// HTTP timeout for cognition requests in ms (clamped to >= 250).
    pub request_timeout_ms: u64,
}
116
117impl CognitionHeartbeatConfig {
118 pub fn from_env() -> Self {
119 let source_base_url = std::env::var("CODETETHER_WORKER_COGNITION_SOURCE_URL")
120 .unwrap_or_else(|_| "http://127.0.0.1:4096".to_string())
121 .trim_end_matches('/')
122 .to_string();
123
124 Self {
125 enabled: env_bool("CODETETHER_WORKER_COGNITION_SHARE_ENABLED", true),
126 source_base_url,
127 include_thought_summary: env_bool("CODETETHER_WORKER_COGNITION_INCLUDE_THOUGHTS", true),
128 summary_max_chars: env_usize("CODETETHER_WORKER_COGNITION_THOUGHT_MAX_CHARS", 480),
129 request_timeout_ms: env_u64("CODETETHER_WORKER_COGNITION_TIMEOUT_MS", 2_500).max(250),
130 interval_secs: env_u64("CODETETHER_WORKER_COGNITION_INTERVAL_SECS", 30).max(5),
131 provider_name: std::env::var("CODETETHER_WORKER_COGNITION_PROVIDER")
132 .unwrap_or_else(|_| "cognition".to_string()),
133 token: std::env::var("CODETETHER_WORKER_COGNITION_TOKEN").ok(),
134 }
135 }
136}
137
/// Deserialized status payload from the cognition service.
/// Only `running` is required; the rest default when missing.
#[derive(Debug, Deserialize)]
struct CognitionStatusSnapshot {
    running: bool,
    #[serde(default)]
    last_tick_at: Option<String>,
    #[serde(default)]
    active_persona_count: usize,
    #[serde(default)]
    events_buffered: usize,
    #[serde(default)]
    snapshots_buffered: usize,
    #[serde(default)]
    loop_interval_ms: u64,
}
152
/// Deserialized "latest snapshot" payload from the cognition service:
/// a timestamped summary plus free-form metadata.
#[derive(Debug, Deserialize)]
struct CognitionLatestSnapshot {
    generated_at: String,
    summary: String,
    #[serde(default)]
    metadata: HashMap<String, serde_json::Value>,
}
160
/// Outcome of attempting to reserve a processing slot for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskReservation {
    /// Slot reserved; the caller now owns processing of the task.
    Reserved,
    /// The task id is already in the processing set.
    AlreadyProcessing,
    /// The concurrency limit is reached; the task cannot start now.
    AtCapacity,
}
167
/// Shared handles a task-processing path needs: HTTP access, identity,
/// the in-flight task set, and the concurrency limit. Cheap to clone.
#[derive(Clone)]
struct WorkerTaskRuntime {
    /// Shared HTTP client.
    client: Client,
    /// Server base URL (no trailing slash).
    server: String,
    /// Optional bearer token for API calls.
    token: Option<String>,
    /// Stable worker identity.
    worker_id: String,
    /// Task ids currently being processed; guards the concurrency limit.
    processing: Arc<Mutex<HashSet<String>>>,
    /// Upper bound on concurrent tasks (normalized to >= 1).
    max_concurrent_tasks: usize,
    /// Auto-approval mode parsed from the CLI ("all"/"safe"/other).
    auto_approve: AutoApprove,
    /// Agent bus for cross-agent messaging.
    bus: Arc<AgentBus>,
}
179
/// Runs the standalone A2A worker loop (no embedded HTTP server).
///
/// Startup: resolve identity, export runtime env vars, register with the
/// server, sync workspaces, and drain already-pending tasks. Then loop
/// forever: re-register, start a per-connection heartbeat, hold the task
/// stream open, and reconnect after a fixed 5-second pause when the stream
/// ends or errors.
///
/// # Errors
/// Returns an error only for failures before the reconnect loop begins
/// (initial registration or the first pending-task fetch).
pub async fn run(args: A2aArgs) -> Result<()> {
    let server = args.server.trim_end_matches('/');
    let name = args
        .name
        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
    let worker_id = resolve_worker_id();
    export_worker_runtime_env(server, &args.token, &worker_id);
    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);

    // Comma-separated workspace paths; defaults to the current directory.
    let codebases: Vec<String> = args
        .workspaces
        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);

    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
    tracing::info!("Server: {}", server);
    tracing::info!("Workspaces: {:?}", codebases);
    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");

    // Shared so the background workspace-sync task can update the list.
    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));

    let client = Client::new();
    // Task ids currently in flight; enforces the concurrency limit.
    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
    if cognition_heartbeat.enabled {
        tracing::info!(
            source = %cognition_heartbeat.source_base_url,
            include_thoughts = cognition_heartbeat.include_thought_summary,
            max_chars = cognition_heartbeat.summary_max_chars,
            timeout_ms = cognition_heartbeat.request_timeout_ms,
            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
        );
    } else {
        tracing::warn!(
            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
        );
    }

    // Unrecognized values fall back to no auto-approval.
    let auto_approve = match args.auto_approve.as_str() {
        "all" => AutoApprove::All,
        "safe" => AutoApprove::Safe,
        _ => AutoApprove::None,
    };

    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());

    let bus = AgentBus::new().into_arc();

    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    // Announce this worker's capabilities on the agent bus.
    {
        let handle = bus.handle(&worker_id);
        handle.announce_ready(worker_capabilities());
    }

    let task_runtime = WorkerTaskRuntime {
        client: client.clone(),
        server: server.to_string(),
        token: args.token.clone(),
        worker_id: worker_id.clone(),
        processing: processing.clone(),
        max_concurrent_tasks,
        auto_approve,
        bus: bus.clone(),
    };

    // Initial registration must succeed before entering the loop.
    {
        let codebases = shared_codebases.lock().await.clone();
        register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await?;
    }

    // Best-effort: pull the server's workspace list before streaming.
    if let Err(e) =
        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
    {
        tracing::warn!(error = %e, "Initial workspace sync failed");
    }

    fetch_pending_tasks(&task_runtime).await?;

    let _workspace_sync_handle = start_workspace_sync(
        client.clone(),
        server.to_string(),
        args.token.clone(),
        shared_codebases.clone(),
    );

    // Reconnect loop: everything below is retried after stream loss.
    loop {
        let codebases = shared_codebases.lock().await.clone();

        // Re-registration failures are non-fatal; the stream may still work.
        if let Err(e) = register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await
        {
            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
        }
        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
            tracing::warn!("Reconnect task fetch failed: {}", e);
        }

        // The heartbeat task lives only as long as this stream connection.
        let heartbeat_handle = start_heartbeat(
            client.clone(),
            server.to_string(),
            args.token.clone(),
            heartbeat_state.clone(),
            processing.clone(),
            cognition_heartbeat.clone(),
        );

        // Blocks until the stream closes or errors.
        match connect_stream(&task_runtime, &name, &codebases, None).await {
            Ok(()) => {
                tracing::warn!("Stream ended, reconnecting...");
            }
            Err(e) => {
                tracing::error!("Stream error: {}, reconnecting...", e);
            }
        }

        heartbeat_handle.abort();
        tracing::debug!("Heartbeat cancelled for reconnection");

        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}
332
/// Runs the A2A worker loop alongside an embedded HTTP server.
///
/// Mirrors [`run`], additionally publishing worker id, heartbeat state, the
/// agent bus, connection status, and a per-connection task-notification
/// channel into `server_state` so the local HTTP server can observe and
/// nudge the worker.
///
/// # Errors
/// Returns an error only for failures before the reconnect loop begins
/// (initial registration or the first pending-task fetch).
pub async fn run_with_state(
    args: A2aArgs,
    server_state: crate::worker_server::WorkerServerState,
) -> Result<()> {
    let server = args.server.trim_end_matches('/');
    let name = args
        .name
        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
    let worker_id = resolve_worker_id();
    export_worker_runtime_env(server, &args.token, &worker_id);
    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);

    server_state.set_worker_id(worker_id.clone()).await;

    // Comma-separated workspace paths; defaults to the current directory.
    let codebases: Vec<String> = args
        .workspaces
        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);

    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
    tracing::info!("Server: {}", server);
    tracing::info!("Workspaces: {:?}", codebases);
    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");

    // Shared so the background workspace-sync task can update the list.
    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));

    let client = Client::new();
    // Task ids currently in flight; enforces the concurrency limit.
    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
    if cognition_heartbeat.enabled {
        tracing::info!(
            source = %cognition_heartbeat.source_base_url,
            include_thoughts = cognition_heartbeat.include_thought_summary,
            max_chars = cognition_heartbeat.summary_max_chars,
            timeout_ms = cognition_heartbeat.request_timeout_ms,
            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
        );
    } else {
        tracing::warn!(
            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
        );
    }

    // Unrecognized values fall back to no auto-approval.
    let auto_approve = match args.auto_approve.as_str() {
        "all" => AutoApprove::All,
        "safe" => AutoApprove::Safe,
        _ => AutoApprove::None,
    };

    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());

    // Expose heartbeat state to the embedded HTTP server.
    server_state
        .set_heartbeat_state(Arc::new(heartbeat_state.clone()))
        .await;

    let bus = AgentBus::new().into_arc();
    server_state.set_bus(bus.clone()).await;

    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    // Announce this worker's capabilities on the agent bus.
    {
        let handle = bus.handle(&worker_id);
        handle.announce_ready(worker_capabilities());
    }

    let task_runtime = WorkerTaskRuntime {
        client: client.clone(),
        server: server.to_string(),
        token: args.token.clone(),
        worker_id: worker_id.clone(),
        processing: processing.clone(),
        max_concurrent_tasks,
        auto_approve,
        bus: bus.clone(),
    };

    // Initial registration must succeed before entering the loop.
    {
        let codebases = shared_codebases.lock().await.clone();
        register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await?;
    }

    // Best-effort: pull the server's workspace list before streaming.
    if let Err(e) =
        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
    {
        tracing::warn!(error = %e, "Initial workspace sync failed");
    }

    server_state.set_connected(true).await;

    fetch_pending_tasks(&task_runtime).await?;

    let _workspace_sync_handle = start_workspace_sync(
        client.clone(),
        server.to_string(),
        args.token.clone(),
        shared_codebases.clone(),
    );

    // Reconnect loop: everything below is retried after stream loss.
    loop {
        let codebases = shared_codebases.lock().await.clone();

        // Fresh notification channel per connection; the HTTP server uses it
        // to tell the stream about locally submitted tasks.
        let (task_notify_tx, task_notify_rx) = mpsc::channel::<String>(32);
        server_state
            .set_task_notification_channel(task_notify_tx)
            .await;

        // NOTE(review): also set before the loop; redundant on the first
        // iteration but needed after a reconnect.
        server_state.set_connected(true).await;

        // Re-registration failures are non-fatal; the stream may still work.
        if let Err(e) = register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await
        {
            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
        }
        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
            tracing::warn!("Reconnect task fetch failed: {}", e);
        }

        // The heartbeat task lives only as long as this stream connection.
        let heartbeat_handle = start_heartbeat(
            client.clone(),
            server.to_string(),
            args.token.clone(),
            heartbeat_state.clone(),
            processing.clone(),
            cognition_heartbeat.clone(),
        );

        // Blocks until the stream closes or errors.
        match connect_stream(&task_runtime, &name, &codebases, Some(task_notify_rx)).await {
            Ok(()) => {
                tracing::warn!("Stream ended, reconnecting...");
            }
            Err(e) => {
                tracing::error!("Stream error: {}, reconnecting...", e);
            }
        }

        server_state.set_connected(false).await;

        heartbeat_handle.abort();
        tracing::debug!("Heartbeat cancelled for reconnection");

        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}
514
515pub fn generate_worker_id() -> String {
516 format!(
517 "wrk_{}_{:x}",
518 chrono::Utc::now().timestamp(),
519 rand::random::<u64>()
520 )
521}
522
523fn resolve_worker_id() -> String {
524 for key in ["CODETETHER_WORKER_ID", "A2A_WORKER_ID"] {
525 if let Ok(value) = std::env::var(key) {
526 let trimmed = value.trim();
527 if !trimmed.is_empty() {
528 return trimmed.to_string();
529 }
530 }
531 }
532 generate_worker_id()
533}
534
/// Clamps the configured concurrency so at least one task can run.
fn normalize_max_concurrent_tasks(max_concurrent_tasks: usize) -> usize {
    if max_concurrent_tasks == 0 {
        1
    } else {
        max_concurrent_tasks
    }
}
538
539async fn reserve_task_slot(
540 processing: &Arc<Mutex<HashSet<String>>>,
541 task_id: &str,
542 max_concurrent_tasks: usize,
543) -> TaskReservation {
544 let mut proc = processing.lock().await;
545 if proc.contains(task_id) {
546 TaskReservation::AlreadyProcessing
547 } else if proc.len() >= max_concurrent_tasks {
548 TaskReservation::AtCapacity
549 } else {
550 proc.insert(task_id.to_string());
551 TaskReservation::Reserved
552 }
553}
554
/// Exports server address, worker id, and (if set) the bearer token into
/// process env vars so spawned sessions/tools can discover the A2A runtime.
fn export_worker_runtime_env(server: &str, token: &Option<String>, worker_id: &str) {
    // SAFETY: `set_var` is only sound while no other thread concurrently
    // reads or writes the environment. This is called during worker startup
    // (see `run`/`run_with_state`) before the reconnect loop spawns work —
    // NOTE(review): confirm no runtime threads are live at every call site.
    unsafe {
        std::env::set_var("CODETETHER_SERVER", server);
        std::env::set_var("CODETETHER_WORKER_ID", worker_id);
        if let Some(token) = token {
            std::env::set_var("CODETETHER_TOKEN", token);
        }
    }
}
567
568fn advertised_interfaces(public_url: Option<&str>) -> serde_json::Value {
569 let Some(base_url) = public_url
570 .map(str::trim)
571 .filter(|value| !value.is_empty())
572 .map(|value| value.trim_end_matches('/').to_string())
573 else {
574 return serde_json::json!({});
575 };
576
577 serde_json::json!({
578 "http": {
579 "base_url": base_url,
580 },
581 "bus": {
582 "stream_url": format!("{base_url}/v1/bus/stream"),
583 "publish_url": format!("{base_url}/v1/bus/publish"),
584 },
585 })
586}
587
/// Tool auto-approval policy, parsed from the CLI `--auto-approve` value:
/// "all" -> `All`, "safe" -> `Safe`, anything else -> `None`.
#[derive(Debug, Clone, Copy)]
pub enum AutoApprove {
    All,
    Safe,
    None,
}
604
/// Default A2A control-plane endpoint used when no server is configured.
pub const DEFAULT_A2A_SERVER_URL: &str = "https://api.codetether.run";

/// Capabilities every worker advertises; `worker_capabilities()` may append
/// environment-dependent extras (e.g. "knative").
const BASE_WORKER_CAPABILITIES: &[&str] = &[
    "forage", "ralph", "swarm", "rlm", "a2a", "mcp", "grpc", "grpc-web", "jsonrpc",
];
612
613fn worker_capabilities() -> Vec<String> {
614 let mut capabilities: Vec<String> = BASE_WORKER_CAPABILITIES
615 .iter()
616 .map(|capability| capability.to_string())
617 .collect();
618
619 let is_knative = std::env::var("KNATIVE_SERVICE")
620 .map(|value| {
621 let normalized = value.trim().to_lowercase();
622 normalized == "1" || normalized == "true" || normalized == "yes"
623 })
624 .unwrap_or(false);
625 if is_knative {
626 capabilities.push("knative".to_string());
627 }
628
629 capabilities
630}
631
632fn task_value<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a serde_json::Value> {
633 task.get("task")
634 .and_then(|t| t.get(key))
635 .or_else(|| task.get(key))
636}
637
638fn task_str<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a str> {
639 task_value(task, key).and_then(|v| v.as_str())
640}
641
642fn task_metadata(task: &serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
643 task_value(task, "metadata")
644 .and_then(|m| m.as_object())
645 .cloned()
646 .unwrap_or_default()
647}
648
/// Normalizes a `provider:model` reference to the canonical
/// `provider/model` form. References already containing `/` (or without a
/// `:` at all) pass through unchanged; only the first `:` is rewritten.
fn model_ref_to_provider_model(model: &str) -> String {
    if model.contains('/') || !model.contains(':') {
        return model.to_string();
    }
    model.replacen(':', "/", 1)
}
659
/// Provider preference order for a model tier; unknown or missing tiers
/// get the balanced ordering. "zai" leads every list.
fn provider_preferences_for_tier(model_tier: Option<&str>) -> &'static [&'static str] {
    const FAST: &[&str] = &[
        "zai",
        "openai",
        "github-copilot",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
        "anthropic",
    ];
    const HEAVY: &[&str] = &[
        "zai",
        "anthropic",
        "openai",
        "github-copilot",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
    ];
    const BALANCED: &[&str] = &[
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
    ];

    match model_tier.unwrap_or("balanced") {
        "fast" | "quick" => FAST,
        "heavy" | "deep" => HEAVY,
        _ => BALANCED,
    }
}
694
695fn choose_provider_for_tier<'a>(providers: &'a [&'a str], model_tier: Option<&str>) -> &'a str {
696 for preferred in provider_preferences_for_tier(model_tier) {
697 if let Some(found) = providers.iter().copied().find(|p| *p == *preferred) {
698 return found;
699 }
700 }
701 if let Some(found) = providers.iter().copied().find(|p| *p == "zai") {
702 return found;
703 }
704 providers[0]
705}
706
/// Default model id for a provider at a given tier ("fast"/"quick",
/// "heavy"/"deep", otherwise balanced). Unknown providers get "glm-5".
fn default_model_for_provider(provider: &str, model_tier: Option<&str>) -> String {
    let tier = model_tier.unwrap_or("balanced");
    let fast = matches!(tier, "fast" | "quick");
    let heavy = matches!(tier, "heavy" | "deep");

    // Only a few providers differ by tier; the rest have a single default.
    let model = match provider {
        "moonshotai" => "kimi-k2.5",
        "anthropic" => {
            if fast {
                "claude-haiku-4-5"
            } else {
                "claude-sonnet-4-20250514"
            }
        }
        "openai" => {
            if fast {
                "gpt-4o-mini"
            } else if heavy {
                "o3"
            } else {
                "gpt-4o"
            }
        }
        "google" => {
            if fast {
                "gemini-2.5-flash"
            } else {
                "gemini-2.5-pro"
            }
        }
        "zhipuai" | "zai" => "glm-5",
        "openrouter" => "z-ai/glm-5",
        "novita" => "Qwen/Qwen3.5-35B-A3B",
        "bedrock" => {
            if heavy {
                "us.anthropic.claude-opus-4-6-v1"
            } else {
                "amazon.nova-lite-v1:0"
            }
        }
        _ => "glm-5",
    };
    model.to_string()
}
744
/// Heuristic: model families whose names match these substrings are run
/// with temperature 1. Matching is ASCII case-insensitive.
fn prefers_temperature_one(model: &str) -> bool {
    let normalized = model.to_ascii_lowercase();
    ["kimi-k2", "glm-", "minimax"]
        .iter()
        .any(|needle| normalized.contains(needle))
}
749
/// True when the (trimmed, case-insensitive) agent type selects the swarm
/// execution path: "swarm", "parallel", or "multi-agent".
fn is_swarm_agent(agent_type: &str) -> bool {
    let normalized = agent_type.trim().to_ascii_lowercase();
    normalized == "swarm" || normalized == "parallel" || normalized == "multi-agent"
}
756
/// True when the trimmed agent type is "forage" (ASCII case-insensitive).
fn is_forage_agent(agent_type: &str) -> bool {
    agent_type.trim().to_ascii_lowercase() == "forage"
}
760
761fn metadata_lookup<'a>(
762 metadata: &'a serde_json::Map<String, serde_json::Value>,
763 key: &str,
764) -> Option<&'a serde_json::Value> {
765 metadata
766 .get(key)
767 .or_else(|| {
768 metadata
769 .get("routing")
770 .and_then(|v| v.as_object())
771 .and_then(|obj| obj.get(key))
772 })
773 .or_else(|| {
774 metadata
775 .get("swarm")
776 .and_then(|v| v.as_object())
777 .and_then(|obj| obj.get(key))
778 })
779 .or_else(|| {
780 metadata
781 .get("forage")
782 .and_then(|v| v.as_object())
783 .and_then(|obj| obj.get(key))
784 })
785}
786
787fn metadata_str(
788 metadata: &serde_json::Map<String, serde_json::Value>,
789 keys: &[&str],
790) -> Option<String> {
791 for key in keys {
792 if let Some(value) = metadata_lookup(metadata, key).and_then(|v| v.as_str()) {
793 let trimmed = value.trim();
794 if !trimmed.is_empty() {
795 return Some(trimmed.to_string());
796 }
797 }
798 }
799 None
800}
801
802fn metadata_usize(
803 metadata: &serde_json::Map<String, serde_json::Value>,
804 keys: &[&str],
805) -> Option<usize> {
806 for key in keys {
807 if let Some(value) = metadata_lookup(metadata, key) {
808 if let Some(v) = value.as_u64() {
809 return usize::try_from(v).ok();
810 }
811 if let Some(v) = value.as_i64()
812 && v >= 0
813 {
814 return usize::try_from(v as u64).ok();
815 }
816 if let Some(v) = value.as_str()
817 && let Ok(parsed) = v.trim().parse::<usize>()
818 {
819 return Some(parsed);
820 }
821 }
822 }
823 None
824}
825
826fn metadata_u64(
827 metadata: &serde_json::Map<String, serde_json::Value>,
828 keys: &[&str],
829) -> Option<u64> {
830 for key in keys {
831 if let Some(value) = metadata_lookup(metadata, key) {
832 if let Some(v) = value.as_u64() {
833 return Some(v);
834 }
835 if let Some(v) = value.as_i64()
836 && v >= 0
837 {
838 return Some(v as u64);
839 }
840 if let Some(v) = value.as_str()
841 && let Ok(parsed) = v.trim().parse::<u64>()
842 {
843 return Some(parsed);
844 }
845 }
846 }
847 None
848}
849
850fn metadata_bool(
851 metadata: &serde_json::Map<String, serde_json::Value>,
852 keys: &[&str],
853) -> Option<bool> {
854 for key in keys {
855 if let Some(value) = metadata_lookup(metadata, key) {
856 if let Some(v) = value.as_bool() {
857 return Some(v);
858 }
859 if let Some(v) = value.as_str() {
860 match v.trim().to_ascii_lowercase().as_str() {
861 "1" | "true" | "yes" | "on" => return Some(true),
862 "0" | "false" | "no" | "off" => return Some(false),
863 _ => {}
864 }
865 }
866 }
867 }
868 None
869}
870
871fn metadata_f64(
872 metadata: &serde_json::Map<String, serde_json::Value>,
873 keys: &[&str],
874) -> Option<f64> {
875 for key in keys {
876 if let Some(value) = metadata_lookup(metadata, key) {
877 if let Some(v) = value.as_f64() {
878 return Some(v);
879 }
880 if let Some(v) = value.as_i64() {
881 return Some(v as f64);
882 }
883 if let Some(v) = value.as_u64() {
884 return Some(v as f64);
885 }
886 if let Some(v) = value.as_str()
887 && let Ok(parsed) = v.trim().parse::<f64>()
888 {
889 return Some(parsed);
890 }
891 }
892 }
893 None
894}
895
896fn metadata_string_list(
897 metadata: &serde_json::Map<String, serde_json::Value>,
898 keys: &[&str],
899) -> Vec<String> {
900 for key in keys {
901 let Some(value) = metadata_lookup(metadata, key) else {
902 continue;
903 };
904
905 if let Some(items) = value.as_array() {
906 let parsed = items
907 .iter()
908 .filter_map(|item| item.as_str())
909 .map(str::trim)
910 .filter(|item| !item.is_empty())
911 .map(ToString::to_string)
912 .collect::<Vec<_>>();
913 if !parsed.is_empty() {
914 return parsed;
915 }
916 }
917
918 if let Some(value) = value.as_str() {
919 let parsed = value
920 .split(['\n', ','])
921 .map(str::trim)
922 .filter(|item| !item.is_empty())
923 .map(ToString::to_string)
924 .collect::<Vec<_>>();
925 if !parsed.is_empty() {
926 return parsed;
927 }
928 }
929 }
930
931 Vec::new()
932}
933
/// Normalizes a forage execution-engine name to "swarm", "go", or the
/// default "run" (trimmed, case-insensitive; `None` means "run").
fn normalize_forage_execution_engine(value: Option<String>) -> String {
    let normalized = value
        .as_deref()
        .unwrap_or("run")
        .trim()
        .to_ascii_lowercase();
    if normalized == "swarm" || normalized == "go" {
        normalized
    } else {
        "run".to_string()
    }
}
947
/// Normalizes a forage swarm strategy to one of "domain"/"data"/"stage"/
/// "none", defaulting to "auto" (trimmed, case-insensitive).
fn normalize_forage_swarm_strategy(value: Option<String>) -> String {
    let normalized = value
        .as_deref()
        .unwrap_or("auto")
        .trim()
        .to_ascii_lowercase();
    match normalized.as_str() {
        "domain" | "data" | "stage" | "none" => normalized,
        _ => "auto".to_string(),
    }
}
963
964fn build_forage_args(
965 prompt: &str,
966 title: &str,
967 metadata: &serde_json::Map<String, serde_json::Value>,
968 selected_model: Option<String>,
969) -> ForageArgs {
970 let mut moonshots = metadata_string_list(
971 metadata,
972 &["moonshots", "moonshot", "goals", "mission", "missions"],
973 );
974 if moonshots.is_empty() {
975 let fallback = prompt.trim();
976 if !fallback.is_empty() {
977 moonshots.push(fallback.to_string());
978 } else if !title.trim().is_empty() {
979 moonshots.push(title.trim().to_string());
980 }
981 }
982
983 ForageArgs {
984 top: metadata_usize(metadata, &["top"]).unwrap_or(3),
985 loop_mode: metadata_bool(metadata, &["loop", "loop_mode"]).unwrap_or(false),
986 interval_secs: metadata_u64(metadata, &["interval_secs", "interval"]).unwrap_or(120),
987 max_cycles: metadata_usize(metadata, &["max_cycles"]).unwrap_or(0),
988 execute: metadata_bool(metadata, &["execute"]).unwrap_or(false),
989 no_s3: metadata_bool(metadata, &["no_s3", "local_only"]).unwrap_or(true),
991 moonshots,
992 moonshot_file: metadata_str(metadata, &["moonshot_file"]).map(PathBuf::from),
993 moonshot_required: metadata_bool(metadata, &["moonshot_required"]).unwrap_or(false),
994 moonshot_min_alignment: metadata_f64(metadata, &["moonshot_min_alignment"]).unwrap_or(0.10),
995 execution_engine: normalize_forage_execution_engine(metadata_str(
996 metadata,
997 &["execution_engine", "engine"],
998 )),
999 run_timeout_secs: metadata_u64(metadata, &["run_timeout_secs", "timeout_secs", "timeout"])
1000 .unwrap_or(900),
1001 fail_fast: metadata_bool(metadata, &["fail_fast"]).unwrap_or(false),
1002 swarm_strategy: normalize_forage_swarm_strategy(metadata_str(
1003 metadata,
1004 &["swarm_strategy", "strategy"],
1005 )),
1006 swarm_max_subagents: metadata_usize(metadata, &["swarm_max_subagents"]).unwrap_or(8),
1007 swarm_max_steps: metadata_usize(metadata, &["swarm_max_steps"]).unwrap_or(100),
1008 swarm_subagent_timeout_secs: metadata_u64(metadata, &["swarm_subagent_timeout_secs"])
1009 .unwrap_or(300),
1010 model: selected_model,
1011 json: metadata_bool(metadata, &["json"]).unwrap_or(false),
1012 }
1013}
1014
1015fn parse_swarm_strategy(
1016 metadata: &serde_json::Map<String, serde_json::Value>,
1017) -> DecompositionStrategy {
1018 match metadata_str(
1019 metadata,
1020 &[
1021 "decomposition_strategy",
1022 "swarm_strategy",
1023 "strategy",
1024 "swarm_decomposition",
1025 ],
1026 )
1027 .as_deref()
1028 .map(|s| s.to_ascii_lowercase())
1029 .as_deref()
1030 {
1031 Some("none") | Some("single") => DecompositionStrategy::None,
1032 Some("domain") | Some("by_domain") => DecompositionStrategy::ByDomain,
1033 Some("data") | Some("by_data") => DecompositionStrategy::ByData,
1034 Some("stage") | Some("by_stage") => DecompositionStrategy::ByStage,
1035 _ => DecompositionStrategy::Automatic,
1036 }
1037}
1038
1039async fn resolve_swarm_model(
1040 explicit_model: Option<String>,
1041 model_tier: Option<&str>,
1042) -> Option<String> {
1043 if let Some(model) = explicit_model
1044 && !model.trim().is_empty()
1045 {
1046 return Some(model);
1047 }
1048
1049 let registry = ProviderRegistry::from_vault().await.ok()?;
1050 let providers = registry.list();
1051 if providers.is_empty() {
1052 return None;
1053 }
1054 let provider = choose_provider_for_tier(providers.as_slice(), model_tier);
1055 let model = default_model_for_provider(provider, model_tier);
1056 Some(format!("{}/{}", provider, model))
1057}
1058
1059fn format_swarm_event_for_output(event: &SwarmEvent) -> Option<String> {
1060 match event {
1061 SwarmEvent::Started {
1062 task,
1063 total_subtasks,
1064 } => Some(format!(
1065 "[swarm] started task={} planned_subtasks={}",
1066 task, total_subtasks
1067 )),
1068 SwarmEvent::StageComplete {
1069 stage,
1070 completed,
1071 failed,
1072 } => Some(format!(
1073 "[swarm] stage={} completed={} failed={}",
1074 stage, completed, failed
1075 )),
1076 SwarmEvent::SubTaskUpdate { id, status, .. } => Some(format!(
1077 "[swarm] subtask id={} status={}",
1078 &id.chars().take(8).collect::<String>(),
1079 format!("{status:?}").to_ascii_lowercase()
1080 )),
1081 SwarmEvent::AgentToolCall {
1082 subtask_id,
1083 tool_name,
1084 } => Some(format!(
1085 "[swarm] subtask id={} tool={}",
1086 &subtask_id.chars().take(8).collect::<String>(),
1087 tool_name
1088 )),
1089 SwarmEvent::AgentError { subtask_id, error } => Some(format!(
1090 "[swarm] subtask id={} error={}",
1091 &subtask_id.chars().take(8).collect::<String>(),
1092 error
1093 )),
1094 SwarmEvent::Complete { success, stats } => Some(format!(
1095 "[swarm] complete success={} subtasks={} speedup={:.2}",
1096 success,
1097 stats.subagents_completed + stats.subagents_failed,
1098 stats.speedup_factor
1099 )),
1100 SwarmEvent::Error(err) => Some(format!("[swarm] error message={}", err)),
1101 _ => None,
1102 }
1103}
1104
/// Registers (or re-registers) this worker with the A2A server.
///
/// The payload advertises identity, capabilities, host info, available
/// provider models, workspaces, reachable interfaces, and the built-in
/// agent catalog. A non-2xx response is logged but tolerated; only a
/// transport-level send failure is returned as an error.
///
/// # Errors
/// Returns an error when the HTTP request itself fails to send.
pub async fn register_worker(
    client: &Client,
    server: &str,
    token: &Option<String>,
    worker_id: &str,
    name: &str,
    codebases: &[String],
    public_url: Option<&str>,
) -> Result<()> {
    // Model discovery is best-effort; registration proceeds without it.
    let models = match load_provider_models().await {
        Ok(m) => m,
        Err(e) => {
            tracing::warn!(
                "Failed to load provider models: {}, proceeding without model info",
                e
            );
            HashMap::new()
        }
    };

    let mut req = client.post(format!("{}/v1/agent/workers/register", server));

    if let Some(t) = token {
        req = req.bearer_auth(t);
    }

    // Flatten provider -> models into a single list of "provider/model"
    // entries, attaching per-million token costs when known.
    let models_array: Vec<serde_json::Value> = models
        .iter()
        .flat_map(|(provider, model_infos)| {
            model_infos.iter().map(move |m| {
                let mut obj = serde_json::json!({
                    "id": format!("{}/{}", provider, m.id),
                    "name": &m.id,
                    "provider": provider,
                    "provider_id": provider,
                });
                if let Some(input_cost) = m.input_cost_per_million {
                    obj["input_cost_per_million"] = serde_json::json!(input_cost);
                }
                if let Some(output_cost) = m.output_cost_per_million {
                    obj["output_cost_per_million"] = serde_json::json!(output_cost);
                }
                obj
            })
        })
        .collect();

    tracing::info!(
        "Registering worker with {} models from {} providers",
        models_array.len(),
        models.len()
    );

    // HOSTNAME (unix) or COMPUTERNAME (windows); "unknown" otherwise.
    let hostname = std::env::var("HOSTNAME")
        .or_else(|_| std::env::var("COMPUTERNAME"))
        .unwrap_or_else(|_| "unknown".to_string());
    let k8s_node_name = std::env::var("K8S_NODE_NAME")
        .ok()
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty());

    // Advertise the built-in agent catalog so the server can route by agent.
    let registry = crate::agent::AgentRegistry::with_builtins();
    let agent_defs: Vec<serde_json::Value> = registry
        .list()
        .iter()
        .map(|info| {
            serde_json::json!({
                "name": info.name,
                "description": info.description,
                "mode": format!("{:?}", info.mode).to_lowercase(),
                "native": info.native,
                "hidden": info.hidden,
                "model": info.model,
                "temperature": info.temperature,
                "top_p": info.top_p,
                "max_steps": info.max_steps,
            })
        })
        .collect();

    let res = req
        .json(&serde_json::json!({
            "worker_id": worker_id,
            "name": name,
            "capabilities": worker_capabilities(),
            "hostname": hostname,
            "k8s_node_name": k8s_node_name,
            "models": models_array,
            "workspaces": codebases,
            "interfaces": advertised_interfaces(public_url),
            "agents": agent_defs,
        }))
        .send()
        .await?;

    if res.status().is_success() {
        tracing::info!("Worker registered successfully");
    } else {
        // Tolerated: the worker keeps running and re-registers on the next
        // reconnect cycle.
        tracing::warn!("Failed to register worker: {}", res.status());
    }

    Ok(())
}
1213
1214async fn load_provider_models() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>> {
1218 let registry = match ProviderRegistry::from_vault().await {
1220 Ok(r) if !r.list().is_empty() => {
1221 tracing::info!("Loaded {} providers from Vault", r.list().len());
1222 r
1223 }
1224 Ok(_) => {
1225 tracing::warn!("Vault returned 0 providers, falling back to config/env vars");
1226 fallback_registry().await?
1227 }
1228 Err(e) => {
1229 tracing::warn!("Vault unreachable ({}), falling back to config/env vars", e);
1230 fallback_registry().await?
1231 }
1232 };
1233
1234 let mut models_by_provider: HashMap<String, Vec<crate::provider::ModelInfo>> = HashMap::new();
1235
1236 for provider_name in registry.list() {
1237 if let Some(provider) = registry.get(provider_name) {
1238 match provider.list_models().await {
1239 Ok(models) => {
1240 if !models.is_empty() {
1241 tracing::debug!("Provider {}: {} models", provider_name, models.len());
1242 models_by_provider.insert(provider_name.to_string(), models);
1243 }
1244 }
1245 Err(e) => {
1246 tracing::debug!("Failed to list models for {}: {}", provider_name, e);
1247 }
1248 }
1249 }
1250 }
1251
1252 Ok(models_by_provider)
1253}
1254
/// Builds a provider registry from the local config, used when Vault is
/// unavailable or returned no providers.
async fn fallback_registry() -> Result<ProviderRegistry> {
    // A config load failure is tolerated: the default config is still a
    // valid input for `from_config`.
    let config = crate::config::Config::load().await.unwrap_or_default();
    ProviderRegistry::from_config(&config).await
}
1260
1261async fn fetch_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1262 tracing::info!("Checking for pending tasks...");
1263
1264 let mut req = runtime
1265 .client
1266 .get(format!("{}/v1/agent/tasks?status=pending", runtime.server));
1267 if let Some(t) = &runtime.token {
1268 req = req.bearer_auth(t);
1269 }
1270
1271 let res = req.send().await?;
1272 if !res.status().is_success() {
1273 return Ok(());
1274 }
1275
1276 let data: serde_json::Value = res.json().await?;
1277 let tasks = if let Some(arr) = data.as_array() {
1279 arr.clone()
1280 } else {
1281 data["tasks"].as_array().cloned().unwrap_or_default()
1282 };
1283
1284 tracing::info!("Found {} pending task(s)", tasks.len());
1285
1286 for task in tasks {
1287 if let Some(id) = task["id"].as_str() {
1288 match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1289 TaskReservation::Reserved => {
1290 let task_id = id.to_string();
1291 let runtime = runtime.clone();
1292
1293 tokio::spawn(async move {
1294 if let Err(e) = handle_task(&runtime, &task).await {
1295 tracing::error!("Task {} failed: {}", task_id, e);
1296 }
1297 runtime.processing.lock().await.remove(&task_id);
1298 });
1299 }
1300 TaskReservation::AlreadyProcessing => {}
1301 TaskReservation::AtCapacity => {
1302 tracing::debug!(
1303 max_concurrent_tasks = runtime.max_concurrent_tasks,
1304 "Worker is at task capacity; leaving remaining tasks pending"
1305 );
1306 break;
1307 }
1308 }
1309 }
1310 }
1311
1312 Ok(())
1313}
1314
/// Opens the server's SSE task stream and runs the worker's main receive
/// loop until the stream ends or errors.
///
/// The loop multiplexes three event sources via `select!`:
/// - CloudEvent task notifications (`task_notify_rx`), which trigger an
///   immediate pending-task poll;
/// - SSE chunks from the HTTP stream, parsed into task payloads and
///   dispatched to `spawn_task_handler`;
/// - a 30-second interval used as a safety-net periodic poll.
///
/// Returns `Ok(())` when the server closes the stream and `Err` on a
/// connection or transport failure.
async fn connect_stream(
    runtime: &WorkerTaskRuntime,
    name: &str,
    codebases: &[String],
    task_notify_rx: Option<mpsc::Receiver<String>>,
) -> Result<()> {
    let url = format!(
        "{}/v1/worker/tasks/stream?agent_name={}&worker_id={}",
        runtime.server,
        urlencoding::encode(name),
        urlencoding::encode(&runtime.worker_id)
    );

    // NOTE(review): X-Codebases and X-Workspaces carry the same value —
    // presumably one header name is legacy; confirm against the server.
    let mut req = runtime
        .client
        .get(&url)
        .header("Accept", "text/event-stream")
        .header("X-Worker-ID", &runtime.worker_id)
        .header("X-Agent-Name", name)
        .header("X-Codebases", codebases.join(","))
        .header("X-Workspaces", codebases.join(","));

    if let Some(t) = &runtime.token {
        req = req.bearer_auth(t);
    }

    let res = req.send().await?;
    if !res.status().is_success() {
        anyhow::bail!("Failed to connect: {}", res.status());
    }

    tracing::info!("Connected to A2A server");

    let mut stream = res.bytes_stream();
    let mut buffer = String::new();
    let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
    // The first tick of a tokio interval fires immediately; consume it so
    // the periodic-poll arm only fires after a full 30s. The receiver is
    // rebound mutably so `select!` can poll it in place.
    poll_interval.tick().await; let mut task_notify_rx = task_notify_rx;

    loop {
        tokio::select! {
            // CloudEvent notification: poll right away instead of waiting
            // for the next interval tick. With no receiver configured this
            // arm never completes.
            task_id = async {
                if let Some(ref mut rx) = task_notify_rx {
                    rx.recv().await
                } else {
                    futures::future::pending().await
                }
            } => {
                if let Some(task_id) = task_id {
                    tracing::info!("Received task notification via CloudEvent: {}", task_id);
                    if let Err(e) = poll_pending_tasks(runtime).await {
                        tracing::warn!("Task notification poll failed: {}", e);
                    }
                }
            }
            chunk = stream.next() => {
                match chunk {
                    Some(Ok(chunk)) => {
                        buffer.push_str(&String::from_utf8_lossy(&chunk));

                        // SSE events are separated by a blank line; drain
                        // every complete event currently in the buffer.
                        while let Some(pos) = buffer.find("\n\n") {
                            let event_str = buffer[..pos].to_string();
                            buffer = buffer[pos + 2..].to_string();

                            if let Some(data_line) = event_str.lines().find(|l| l.starts_with("data:")) {
                                let data = data_line.trim_start_matches("data:").trim();
                                if data == "[DONE]" || data.is_empty() {
                                    continue;
                                }

                                // Payloads that fail to parse as JSON are
                                // silently dropped.
                                if let Ok(task) = serde_json::from_str::<serde_json::Value>(data) {
                                    spawn_task_handler(&task, runtime).await;
                                }
                            }
                        }
                    }
                    Some(Err(e)) => {
                        return Err(e.into());
                    }
                    None => {
                        // Server closed the stream cleanly.
                        return Ok(());
                    }
                }
            }
            // Safety-net poll in case an SSE event or notification was missed.
            _ = poll_interval.tick() => {
                if let Err(e) = poll_pending_tasks(runtime).await {
                    tracing::warn!("Periodic task poll failed: {}", e);
                }
            }
        }
    }
}
1416
1417async fn spawn_task_handler(task: &serde_json::Value, runtime: &WorkerTaskRuntime) {
1418 if let Some(id) = task
1419 .get("task")
1420 .and_then(|t| t["id"].as_str())
1421 .or_else(|| task["id"].as_str())
1422 {
1423 match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1424 TaskReservation::Reserved => {
1425 let task_id = id.to_string();
1426 let task = task.clone();
1427 let runtime = runtime.clone();
1428
1429 tokio::spawn(async move {
1430 if let Err(e) = handle_task(&runtime, &task).await {
1431 tracing::error!("Task {} failed: {}", task_id, e);
1432 }
1433 runtime.processing.lock().await.remove(&task_id);
1434 });
1435 }
1436 TaskReservation::AlreadyProcessing => {}
1437 TaskReservation::AtCapacity => {
1438 tracing::debug!(
1439 task_id = id,
1440 max_concurrent_tasks = runtime.max_concurrent_tasks,
1441 "Worker is at task capacity; task will stay pending until a slot frees up"
1442 );
1443 }
1444 }
1445 }
1446}
1447
1448async fn poll_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1449 let mut req = runtime
1450 .client
1451 .get(format!("{}/v1/agent/tasks?status=pending", runtime.server));
1452 if let Some(t) = &runtime.token {
1453 req = req.bearer_auth(t);
1454 }
1455
1456 let res = req.send().await?;
1457 if !res.status().is_success() {
1458 return Ok(());
1459 }
1460
1461 let data: serde_json::Value = res.json().await?;
1462 let tasks = if let Some(arr) = data.as_array() {
1463 arr.clone()
1464 } else {
1465 data["tasks"].as_array().cloned().unwrap_or_default()
1466 };
1467
1468 if !tasks.is_empty() {
1469 tracing::debug!("Poll found {} pending task(s)", tasks.len());
1470 }
1471
1472 for task in &tasks {
1473 spawn_task_handler(task, runtime).await;
1474 }
1475
1476 Ok(())
1477}
1478
/// Claims a task on the server, executes it, and releases the result
/// (success or failure) back to the server.
///
/// A failed claim — including a 409 when another worker won the race — is
/// not an error: the function returns `Ok(())` and the task is skipped.
/// Errors raised *after* a successful claim are converted into a "failed"
/// release so the server is never left with an orphaned claim.
async fn handle_task(runtime: &WorkerTaskRuntime, task: &serde_json::Value) -> Result<()> {
    let task_id = task_str(task, "id").ok_or_else(|| anyhow::anyhow!("No task ID"))?;
    let title = task_str(task, "title").unwrap_or("Untitled");

    tracing::info!("Handling task: {} ({})", title, task_id);

    // Attempt to claim; the server arbitrates races between workers.
    let mut req = runtime
        .client
        .post(format!("{}/v1/worker/tasks/claim", runtime.server))
        .header("X-Worker-ID", &runtime.worker_id);
    if let Some(t) = &runtime.token {
        req = req.bearer_auth(t);
    }

    let res = req
        .json(&serde_json::json!({ "task_id": task_id }))
        .send()
        .await?;

    if !res.status().is_success() {
        let status = res.status();
        let text = res.text().await?;
        // 409 means another worker got there first — expected, not a problem.
        if status == reqwest::StatusCode::CONFLICT {
            tracing::debug!(task_id, "Task already claimed by another worker, skipping");
        } else {
            tracing::warn!(task_id, %status, "Failed to claim task: {}", text);
        }
        return Ok(());
    }

    let claim = res.json::<TaskClaimResponse>().await?;
    let claim_provenance = claim.into_provenance();
    tracing::info!(
        task_id,
        run_id = ?claim_provenance.run_id,
        attempt_id = ?claim_provenance.attempt_id,
        "Claimed task"
    );

    // Tuple: (status, result, error, session_id) — the release payload.
    let inner_result: Result<(&str, Option<String>, Option<String>, Option<String>)> =
        execute_claimed_task(runtime, task, task_id, title, &claim_provenance).await;

    // Post-claim failures must still be released, as "failed", so the
    // server can reschedule or surface the error.
    let (status, result, error, session_id) = match inner_result {
        Ok(tuple) => tuple,
        Err(e) => {
            tracing::error!(
                task_id,
                error = %e,
                "Task failed after claim (releasing as failed)"
            );
            (
                "failed",
                None,
                Some(format!("Worker error after claim: {}", e)),
                None,
            )
        }
    };

    release_task_result(
        &runtime.client,
        &runtime.server,
        &runtime.token,
        &runtime.worker_id,
        task_id,
        status,
        result,
        error,
        session_id,
    )
    .await?;

    tracing::info!("Task released: {} with status: {}", task_id, status);
    Ok(())
}
1556
/// Runs a successfully claimed task end-to-end and returns the release
/// tuple `(status, result, error, session_id)`.
///
/// Dispatch order:
/// 1. `clone_repo` tasks → git clone/update of the workspace repository;
/// 2. forage agents → forage pipeline;
/// 3. everything else → a session-based run, either swarm or single-agent,
///    with model/tier/personality hints pulled from task metadata.
///
/// Errors from the clone/forage paths are converted into a "failed" tuple
/// here; only infrastructure errors (session creation, workspace
/// resolution) propagate as `Err` to the caller.
#[allow(clippy::too_many_lines)]
async fn execute_claimed_task<'a>(
    runtime: &WorkerTaskRuntime,
    task: &'a serde_json::Value,
    task_id: &'a str,
    title: &'a str,
    claim_provenance: &ClaimProvenance,
) -> Result<(&'static str, Option<String>, Option<String>, Option<String>)> {
    // --- Extract routing hints from task metadata -----------------------
    let metadata = task_metadata(task);
    let resume_session_id = metadata
        .get("resume_session_id")
        .and_then(|v| v.as_str())
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty());
    let complexity_hint = metadata_str(&metadata, &["complexity"]);
    // Explicit tier wins; otherwise derive it from the complexity hint
    // (quick → fast, deep → heavy, anything else → balanced).
    let model_tier = metadata_str(&metadata, &["model_tier", "tier"])
        .map(|s| s.to_ascii_lowercase())
        .or_else(|| {
            complexity_hint.as_ref().map(|complexity| {
                match complexity.to_ascii_lowercase().as_str() {
                    "quick" => "fast".to_string(),
                    "deep" => "heavy".to_string(),
                    _ => "balanced".to_string(),
                }
            })
        });
    let worker_personality = metadata_str(
        &metadata,
        &["worker_personality", "personality", "agent_personality"],
    );
    let target_agent_name = metadata_str(&metadata, &["target_agent_name", "agent_name"]);
    // Model may live under several keys on the task or its metadata.
    let raw_model = task_str(task, "model_ref")
        .or_else(|| metadata_lookup(&metadata, "model_ref").and_then(|v| v.as_str()))
        .or_else(|| task_str(task, "model"))
        .or_else(|| metadata_lookup(&metadata, "model").and_then(|v| v.as_str()));
    let selected_model = raw_model.map(model_ref_to_provider_model);
    let raw_agent = task_str(task, "agent_type")
        .or_else(|| task_str(task, "agent"))
        .unwrap_or("build");
    let prompt = task_str(task, "prompt")
        .or_else(|| task_str(task, "description"))
        .unwrap_or(title);

    // "global" (or empty) workspace ids mark tasks that need no checkout.
    let workspace_id = task_str(task, "workspace_id")
        .or_else(|| metadata_lookup(&metadata, "workspace_id").and_then(|v| v.as_str()));
    let is_virtual_task = workspace_id.map_or(false, |ws| ws == "global" || ws.is_empty());

    // --- Special-purpose agents: clone_repo and forage ------------------
    if raw_agent.eq_ignore_ascii_case("clone_repo") {
        return match handle_clone_repo_task(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            &runtime.worker_id,
            task,
            &metadata,
        )
        .await
        {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Clone task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    if is_forage_agent(raw_agent) {
        return match handle_forage_task(title, prompt, &metadata, selected_model.clone()).await {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Forage task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    // --- Session setup ---------------------------------------------------
    // Resume an existing session when requested; a load failure degrades
    // gracefully to a fresh session rather than failing the task.
    let mut session = if let Some(ref sid) = resume_session_id {
        match Session::load(sid).await {
            Ok(existing) => {
                tracing::info!("Resuming session {} for task {}", sid, task_id);
                existing
            }
            Err(e) => {
                tracing::warn!(
                    "Could not load session {} for task {} ({}), starting a new session",
                    sid,
                    task_id,
                    e
                );
                Session::new().await?
            }
        }
    } else {
        Session::new().await?
    };

    if !is_virtual_task {
        if let Some(workspace_path) = resolve_task_workspace_dir(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            workspace_id,
        )
        .await?
        {
            session.metadata.directory = Some(workspace_path);
        }
    }

    if is_virtual_task {
        tracing::info!(
            task_id,
            workspace_id = workspace_id.unwrap_or("(none)"),
            "Virtual task detected — skipping workspace setup"
        );
        session.metadata.directory = None;
    }

    // Only swarm agents and the primary "build"/"plan" agents run directly;
    // anything else falls back to "build".
    let agent_type = if is_swarm_agent(raw_agent) {
        raw_agent
    } else {
        match raw_agent {
            "build" | "plan" => raw_agent,
            other => {
                tracing::info!(
                    "Agent \"{}\" is not a primary agent, falling back to \"build\"",
                    other
                );
                "build"
            }
        }
    };
    session.set_agent_name(agent_type.to_string());
    session.attach_claim_provenance(claim_provenance);

    // Hook installation is best-effort — a failure is logged, not fatal.
    if !is_virtual_task {
        if let Some(directory) = session.metadata.directory.as_deref()
            && let Err(err) = install_commit_msg_hook(directory)
        {
            tracing::warn!(task_id, error = %err, "Failed to install commit-msg hook");
        }
    }

    if let Some(model) = selected_model.clone() {
        session.metadata.model = Some(model);
    }

    tracing::info!(task_id, agent_type, "Executing prompt: {}", prompt);

    // --- Output streaming -------------------------------------------------
    // Every chunk of agent output is (a) broadcast on the local bus and
    // (b) POSTed to the server's task-output endpoint, fire-and-forget.
    let stream_client = runtime.client.clone();
    let stream_server = runtime.server.clone();
    let stream_token = runtime.token.clone();
    let stream_worker_id = runtime.worker_id.clone();
    let stream_task_id = task_id.to_string();
    let stream_bus = Arc::clone(&runtime.bus);

    let output_callback: Arc<dyn Fn(String) + Send + Sync + 'static> =
        Arc::new(move |output: String| {
            let c = stream_client.clone();
            let s = stream_server.clone();
            let t = stream_token.clone();
            let w = stream_worker_id.clone();
            let tid = stream_task_id.clone();

            let bus_handle = stream_bus.handle("task-output");
            bus_handle.send(
                format!("task.{}", tid),
                crate::bus::BusMessage::TaskUpdate {
                    task_id: tid.clone(),
                    state: crate::a2a::types::TaskState::Working,
                    message: Some(output.clone()),
                },
            );

            // Upload errors are deliberately ignored: output streaming is
            // best-effort and must not disturb task execution.
            tokio::spawn(async move {
                let mut req = c
                    .post(format!("{}/v1/agent/tasks/{}/output", s, tid))
                    .header("X-Worker-ID", &w);
                if let Some(tok) = &t {
                    req = req.bearer_auth(tok);
                }
                let _ = req
                    .json(&serde_json::json!({
                        "worker_id": w,
                        "output": output,
                    }))
                    .send()
                    .await;
            });
        });

    // --- Execute: swarm vs single-agent ----------------------------------
    let (status, result, error, session_id) = if is_swarm_agent(agent_type) {
        match execute_swarm_with_policy(
            &mut session,
            prompt,
            model_tier.as_deref(),
            selected_model,
            &metadata,
            complexity_hint.as_deref(),
            worker_personality.as_deref(),
            target_agent_name.as_deref(),
            Some(&runtime.bus),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok((session_result, true)) => {
                tracing::info!("Swarm task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            // A swarm can finish with partial failures: report output but
            // release as failed.
            Ok((session_result, false)) => {
                tracing::warn!("Swarm task completed with failures: {}", task_id);
                (
                    "failed",
                    Some(session_result.text),
                    Some("Swarm execution completed with failures".to_string()),
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!("Swarm task failed: {} - {}", task_id, e);
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    } else {
        match execute_session_with_policy(
            &mut session,
            prompt,
            runtime.auto_approve,
            model_tier.as_deref(),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok(session_result) => {
                tracing::info!("Task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!(task_id, error = %e, "Task execution failed");
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    };

    // Always report a session id so failed runs can still be inspected.
    Ok((
        status,
        result,
        error,
        Some(session_id.unwrap_or_else(|| session.id.clone())),
    ))
}
1830
1831async fn handle_forage_task(
1832 title: &str,
1833 prompt: &str,
1834 metadata: &serde_json::Map<String, serde_json::Value>,
1835 selected_model: Option<String>,
1836) -> Result<String> {
1837 let forage_args = build_forage_args(prompt, title, metadata, selected_model);
1838 let summary = crate::forage::execute_with_summary(forage_args).await?;
1839 Ok(summary.render_text())
1840}
1841
/// Materializes a workspace's git repository on this worker: fresh clone
/// when absent, fetch/checkout/ff-pull when a checkout already exists.
///
/// On success it registers the local checkout with the server, enqueues
/// any `post_clone_task` carried in the metadata, and returns a
/// human-readable "Repository ready" message.
///
/// The workspace id comes from metadata (preferred) or the task body;
/// git URL and branch fall back from metadata to the workspace record
/// (branch defaults to "main").
async fn handle_clone_repo_task(
    client: &Client,
    server: &str,
    token: &Option<String>,
    worker_id: &str,
    task: &serde_json::Value,
    metadata: &serde_json::Map<String, serde_json::Value>,
) -> Result<String> {
    let workspace_id = metadata_str(metadata, &["workspace_id"])
        .or_else(|| task_str(task, "workspace_id").map(|value| value.to_string()))
        .ok_or_else(|| anyhow::anyhow!("Clone task is missing workspace_id metadata"))?;
    let workspace = fetch_workspace_record(client, server, token, &workspace_id).await?;
    let git_url = metadata_str(metadata, &["git_url"])
        .or_else(|| workspace.git_url.clone())
        .ok_or_else(|| anyhow::anyhow!("Workspace {} is missing git_url", workspace_id))?;
    let branch = metadata_str(metadata, &["git_branch"])
        .or_else(|| workspace.git_branch.clone())
        .unwrap_or_else(|| "main".to_string());
    let repo_path = resolve_workspace_clone_path(&workspace_id, &workspace.path);

    if let Some(parent) = repo_path.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .with_context(|| format!("Failed to create repo parent {}", parent.display()))?;
    }

    // Temporary credential-helper script used only for the initial clone;
    // removed unconditionally after the work below, success or failure.
    let temp_helper_path =
        git_clone_base_dir().join(format!(".{}-git-credential-helper", workspace_id));
    write_git_credential_helper_script(&temp_helper_path, &workspace_id)?;

    // All fallible git work happens inside this async block so the helper
    // cleanup after `.await` always runs.
    let clone_result = async {
        let git_dir = repo_path.join(".git");
        if git_dir.exists() {
            // Existing checkout: re-apply auth config, then update the
            // requested branch with a fast-forward-only pull.
            configure_repo_git_auth(&repo_path, &workspace_id)?;
            configure_repo_git_github_app_from_agent_config(
                &repo_path,
                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
            );
            run_git_command_at(
                Some(&repo_path),
                vec!["fetch".to_string(), "origin".to_string(), branch.clone()],
            )
            .await?;
            run_git_command_at(
                Some(&repo_path),
                vec!["checkout".to_string(), branch.clone()],
            )
            .await?;
            run_git_command_at(
                Some(&repo_path),
                vec![
                    "pull".to_string(),
                    "--ff-only".to_string(),
                    "origin".to_string(),
                    branch.clone(),
                ],
            )
            .await?;
        } else {
            prepare_clone_target(&repo_path).await?;

            // Single-branch clone using the temporary credential helper;
            // useHttpPath makes git pass the full URL path to the helper.
            run_git_command_at(
                None,
                vec![
                    "-c".to_string(),
                    format!("credential.helper={}", temp_helper_path.display()),
                    "-c".to_string(),
                    "credential.useHttpPath=true".to_string(),
                    "clone".to_string(),
                    "--single-branch".to_string(),
                    "--branch".to_string(),
                    branch.clone(),
                    git_url.clone(),
                    repo_path.display().to_string(),
                ],
            )
            .await?;
            configure_repo_git_auth(&repo_path, &workspace_id)?;
            configure_repo_git_github_app_from_agent_config(
                &repo_path,
                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
            );
        }
        install_commit_msg_hook(&repo_path)?;

        register_cloned_workspace(client, server, token, worker_id, &workspace, &repo_path).await?;
        enqueue_post_clone_task(client, server, token, worker_id, &workspace_id, metadata).await?;

        Ok::<String, anyhow::Error>(format!(
            "Repository ready at {} (branch: {})",
            repo_path.display(),
            branch
        ))
    }
    .await;

    // Best-effort cleanup of the credential-helper script.
    let _ = tokio::fs::remove_file(&temp_helper_path).await;
    clone_result
}
1941
1942async fn register_cloned_workspace(
1943 client: &Client,
1944 server: &str,
1945 token: &Option<String>,
1946 worker_id: &str,
1947 workspace: &RegisteredWorkspaceRecord,
1948 repo_path: &Path,
1949) -> Result<()> {
1950 let mut req = client.post(format!(
1951 "{}/v1/agent/workspaces",
1952 server.trim_end_matches('/')
1953 ));
1954 if let Some(token) = token {
1955 req = req.bearer_auth(token);
1956 }
1957
1958 let response = req
1959 .json(&serde_json::json!({
1960 "workspace_id": workspace.id,
1961 "name": workspace.name,
1962 "path": repo_path.display().to_string(),
1963 "description": workspace.description,
1964 "agent_config": workspace.agent_config,
1965 "git_url": workspace.git_url,
1966 "git_branch": workspace.git_branch,
1967 "worker_id": worker_id,
1968 }))
1969 .send()
1970 .await
1971 .context("Failed to register cloned workspace with server")?;
1972 let status = response.status();
1973 if !status.is_success() {
1974 let body = response.text().await.unwrap_or_default();
1975 anyhow::bail!(
1976 "Failed to register cloned workspace {} ({}): {}",
1977 workspace.id,
1978 status,
1979 body
1980 );
1981 }
1982
1983 Ok(())
1984}
1985
1986async fn enqueue_post_clone_task(
1987 client: &Client,
1988 server: &str,
1989 token: &Option<String>,
1990 worker_id: &str,
1991 workspace_id: &str,
1992 metadata: &serde_json::Map<String, serde_json::Value>,
1993) -> Result<()> {
1994 let Some(post_clone_task) = metadata.get("post_clone_task") else {
1995 return Ok(());
1996 };
1997 let Some(task) = post_clone_task.as_object() else {
1998 return Ok(());
1999 };
2000 let title = task
2001 .get("title")
2002 .and_then(|value| value.as_str())
2003 .filter(|value| !value.trim().is_empty())
2004 .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing title"))?;
2005 let prompt = task
2006 .get("prompt")
2007 .and_then(|value| value.as_str())
2008 .filter(|value| !value.trim().is_empty())
2009 .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing prompt"))?;
2010 let agent_type = task
2011 .get("agent_type")
2012 .and_then(|value| value.as_str())
2013 .unwrap_or("build");
2014 let mut task_metadata = task
2015 .get("metadata")
2016 .cloned()
2017 .unwrap_or_else(|| serde_json::json!({}));
2018 if let Some(task_metadata_obj) = task_metadata.as_object_mut() {
2019 task_metadata_obj
2020 .entry("target_worker_id".to_string())
2021 .or_insert_with(|| serde_json::Value::String(worker_id.to_string()));
2022 }
2023
2024 let mut req = client.post(format!(
2025 "{}/v1/agent/workspaces/{}/tasks",
2026 server.trim_end_matches('/'),
2027 workspace_id
2028 ));
2029 if let Some(token) = token {
2030 req = req.bearer_auth(token);
2031 }
2032
2033 let response = req
2034 .json(&serde_json::json!({
2035 "title": title,
2036 "prompt": prompt,
2037 "agent_type": agent_type,
2038 "metadata": task_metadata,
2039 }))
2040 .send()
2041 .await
2042 .context("Failed to enqueue post-clone task")?;
2043 let status = response.status();
2044 if !status.is_success() {
2045 let body = response.text().await.unwrap_or_default();
2046 anyhow::bail!("Failed to enqueue post-clone task ({}): {}", status, body);
2047 }
2048 Ok(())
2049}
2050
2051async fn run_git_command_at(current_dir: Option<&Path>, args: Vec<String>) -> Result<String> {
2052 let mut command = Command::new("git");
2053 if let Some(dir) = current_dir {
2054 command.current_dir(dir);
2055 }
2056
2057 let output = command
2058 .args(args.iter().map(String::as_str))
2059 .output()
2060 .await
2061 .context("Failed to execute git command")?;
2062
2063 if output.status.success() {
2064 return Ok(String::from_utf8_lossy(&output.stdout).trim().to_string());
2065 }
2066
2067 Err(anyhow::anyhow!(
2068 "Git command failed: {}",
2069 String::from_utf8_lossy(&output.stderr).trim()
2070 ))
2071}
2072
2073async fn release_task_result(
2074 client: &Client,
2075 server: &str,
2076 token: &Option<String>,
2077 worker_id: &str,
2078 task_id: &str,
2079 status: &str,
2080 result: Option<String>,
2081 error: Option<String>,
2082 session_id: Option<String>,
2083) -> Result<()> {
2084 let mut req = client
2085 .post(format!("{}/v1/worker/tasks/release", server))
2086 .header("X-Worker-ID", worker_id);
2087 if let Some(token) = token {
2088 req = req.bearer_auth(token);
2089 }
2090
2091 req.json(&serde_json::json!({
2092 "task_id": task_id,
2093 "status": status,
2094 "result": result,
2095 "error": error,
2096 "session_id": session_id,
2097 }))
2098 .send()
2099 .await
2100 .context("Failed to release task result")?;
2101
2102 Ok(())
2103}
2104
/// Executes the prompt through the swarm executor inside the given session,
/// returning the session result and a success flag.
///
/// Swarm limits (subagent count, per-subagent steps, timeout, parallelism)
/// are read from task metadata with clamped defaults. When an
/// `output_callback` is supplied, swarm events are streamed through it
/// while the executor runs; otherwise the executor runs to completion
/// directly. The final swarm text is appended to the session as an
/// assistant message and the session is saved before returning.
///
/// The returned bool is `swarm_result.success` — `false` means the swarm
/// finished but with failures, which the caller maps to a "failed" release.
async fn execute_swarm_with_policy(
    session: &mut Session,
    prompt: &str,
    model_tier: Option<&str>,
    explicit_model: Option<String>,
    metadata: &serde_json::Map<String, serde_json::Value>,
    complexity_hint: Option<&str>,
    worker_personality: Option<&str>,
    target_agent_name: Option<&str>,
    bus: Option<&Arc<AgentBus>>,
    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
) -> Result<(crate::session::SessionResult, bool)> {
    use crate::provider::{ContentPart, Message, Role};

    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: prompt.to_string(),
        }],
    });

    if session.title.is_none() {
        session.generate_title().await?;
    }

    // Swarm limits from metadata, clamped to sane ranges.
    let strategy = parse_swarm_strategy(metadata);
    let max_subagents = metadata_usize(
        metadata,
        &["swarm_max_subagents", "max_subagents", "subagents"],
    )
    .unwrap_or(10)
    .clamp(1, 100);
    let max_steps_per_subagent = metadata_usize(
        metadata,
        &[
            "swarm_max_steps_per_subagent",
            "max_steps_per_subagent",
            "max_steps",
        ],
    )
    .unwrap_or(50)
    .clamp(1, 200);
    let timeout_secs = metadata_u64(metadata, &["swarm_timeout_secs", "timeout_secs", "timeout"])
        .unwrap_or(600)
        .clamp(30, 3600);
    let parallel_enabled =
        metadata_bool(metadata, &["swarm_parallel_enabled", "parallel_enabled"]).unwrap_or(true);

    let model = resolve_swarm_model(explicit_model, model_tier).await;
    if let Some(ref selected_model) = model {
        session.metadata.model = Some(selected_model.clone());
    }

    // Announce the resolved routing/config through the output stream so
    // observers see what the swarm is about to do.
    if let Some(ref cb) = output_callback {
        cb(format!(
            "[swarm] routing complexity={} tier={} personality={} target_agent={}",
            complexity_hint.unwrap_or("standard"),
            model_tier.unwrap_or("balanced"),
            worker_personality.unwrap_or("auto"),
            target_agent_name.unwrap_or("auto")
        ));
        cb(format!(
            "[swarm] config strategy={:?} max_subagents={} max_steps={} timeout={}s tier={}",
            strategy,
            max_subagents,
            max_steps_per_subagent,
            timeout_secs,
            model_tier.unwrap_or("balanced")
        ));
    }

    let swarm_config = SwarmConfig {
        max_subagents,
        max_steps_per_subagent,
        subagent_timeout_secs: timeout_secs,
        parallel_enabled,
        model,
        working_dir: session
            .metadata
            .directory
            .as_ref()
            .map(|p| p.to_string_lossy().to_string()),
        ..Default::default()
    };

    // With a callback, run the executor on a task and pump its events
    // through the callback concurrently; without one, run it inline.
    let swarm_result = if output_callback.is_some() {
        let (event_tx, mut event_rx) = mpsc::channel(256);
        let mut executor = SwarmExecutor::new(swarm_config).with_event_tx(event_tx);
        if let Some(bus) = bus {
            executor = executor.with_bus(Arc::clone(bus));
        }
        let prompt_owned = prompt.to_string();
        let mut exec_handle =
            tokio::spawn(async move { executor.execute(&prompt_owned, strategy).await });

        let mut final_result: Option<crate::swarm::SwarmResult> = None;

        while final_result.is_none() {
            tokio::select! {
                maybe_event = event_rx.recv() => {
                    if let Some(event) = maybe_event
                        && let Some(ref cb) = output_callback
                        && let Some(line) = format_swarm_event_for_output(&event) {
                        cb(line);
                    }
                }
                join_result = &mut exec_handle => {
                    let joined = join_result.map_err(|e| anyhow::anyhow!("Swarm join failure: {}", e))?;
                    final_result = Some(joined?);
                }
            }
        }

        // Drain any events still queued after the executor finished.
        while let Ok(event) = event_rx.try_recv() {
            if let Some(ref cb) = output_callback
                && let Some(line) = format_swarm_event_for_output(&event)
            {
                cb(line);
            }
        }

        final_result.ok_or_else(|| anyhow::anyhow!("Swarm execution returned no result"))?
    } else {
        let mut executor = SwarmExecutor::new(swarm_config);
        if let Some(bus) = bus {
            executor = executor.with_bus(Arc::clone(bus));
        }
        executor.execute(prompt, strategy).await?
    };

    // Guarantee non-empty result text so the release payload is meaningful.
    let final_text = if swarm_result.result.trim().is_empty() {
        if swarm_result.success {
            "Swarm completed without textual output.".to_string()
        } else {
            "Swarm finished with failures and no textual output.".to_string()
        }
    } else {
        swarm_result.result.clone()
    };

    session.add_message(Message {
        role: Role::Assistant,
        content: vec![ContentPart::Text {
            text: final_text.clone(),
        }],
    });
    session.save().await?;

    Ok((
        crate::session::SessionResult {
            text: final_text,
            session_id: session.id.clone(),
        },
        swarm_result.success,
    ))
}
2261
/// Runs a full agent loop for `prompt` inside `session`, honoring the
/// `auto_approve` tool policy, and returns the final textual result.
///
/// Flow: load providers from Vault, resolve a provider/model from session
/// metadata (or tier defaults), then iterate completion -> tool-execution
/// rounds (up to 50) until the model stops emitting tool calls.
///
/// # Errors
/// Fails when the Vault-backed provider registry cannot be loaded, no
/// providers are available, an explicitly selected provider is missing, or a
/// completion request / session save fails.
async fn execute_session_with_policy(
    session: &mut Session,
    prompt: &str,
    auto_approve: AutoApprove,
    model_tier: Option<&str>,
    // Optional sink for streaming assistant text and tool progress markers.
    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
) -> Result<crate::session::SessionResult> {
    use crate::provider::{
        CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
    };
    use std::sync::Arc;

    let registry = ProviderRegistry::from_vault()
        .await
        .context("Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN")?;
    let providers = registry.list();
    tracing::info!("Available providers: {:?}", providers);

    if providers.is_empty() {
        anyhow::bail!(
            "No LLM providers available (0 providers loaded). \
             Configure API keys in HashiCorp Vault or set environment variables. \
             Vault address: {}",
            std::env::var("VAULT_ADDR").unwrap_or_else(|_| "(not set)".into())
        );
    }

    // Resolve (provider, model) from session metadata. The stored string may
    // be "provider/model", a bare provider name, or a bare model id.
    let (provider_name, model_id) = if let Some(ref model_str) = session.metadata.model {
        let (prov, model) = parse_model_string(model_str);
        // Legacy alias: "zhipuai" was renamed to "zai".
        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
        if prov.is_some() {
            (prov.map(|s| s.to_string()), model.to_string())
        } else if providers.contains(&model) {
            // The whole string matched a provider name rather than a model id.
            (Some(model.to_string()), String::new())
        } else {
            (None, model.to_string())
        }
    } else {
        (None, String::new())
    };

    let provider_slice = providers.as_slice();
    // An explicitly requested provider that is not loaded is a hard error.
    if let Some(explicit_provider) = provider_name.as_deref()
        && !providers.contains(&explicit_provider)
    {
        anyhow::bail!(
            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
            explicit_provider,
            providers.join(", ")
        );
    }

    let selected_provider = provider_name
        .as_deref()
        .unwrap_or_else(|| choose_provider_for_tier(provider_slice, model_tier));

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: prompt.to_string(),
        }],
    });

    // Lazily title the session on its first prompt.
    if session.title.is_none() {
        session.generate_title().await?;
    }

    let model = if !model_id.is_empty() {
        model_id
    } else {
        default_model_for_provider(selected_provider, model_tier)
    };

    let workspace_dir = session
        .metadata
        .directory
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    let tool_registry = create_filtered_registry(
        Arc::clone(&provider),
        model.clone(),
        auto_approve,
        &workspace_dir,
        output_callback.clone(),
    );
    let tool_definitions = tool_registry.definitions();

    // Some models prefer temperature 1.0; default to 0.7 otherwise.
    let temperature = if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!(
        "Using model: {} via provider: {} (tier: {:?})",
        model,
        selected_provider,
        model_tier
    );
    tracing::info!(
        "Available tools: {} (auto_approve: {:?})",
        tool_definitions.len(),
        auto_approve
    );

    let system_prompt = crate::agent::builtin::build_system_prompt(&workspace_dir);

    let mut final_output = String::new();
    // Hard cap on completion/tool rounds to bound runaway agent loops.
    let max_steps = 50;

    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");

        // Rebuild the message list each round: system prompt + full history.
        let mut messages = vec![Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.clone(),
            }],
        }];
        messages.extend(session.messages.clone());

        let request = CompletionRequest {
            messages,
            tools: tool_definitions.clone(),
            model: model.clone(),
            temperature,
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let response = provider.complete(request).await?;

        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        // Collect tool calls; malformed JSON arguments degrade to `{}`.
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } = part
                {
                    let args: serde_json::Value =
                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                    Some((id.clone(), name.clone(), args))
                } else {
                    None
                }
            })
            .collect();

        // Accumulate assistant text and stream it to the callback, if any.
        for part in &response.message.content {
            if let ContentPart::Text { text } = part
                && !text.is_empty()
            {
                final_output.push_str(text);
                final_output.push('\n');
                if let Some(ref cb) = output_callback {
                    cb(text.clone());
                }
            }
        }

        // No tool calls means the model is done; record the turn and stop.
        if tool_calls.is_empty() {
            session.add_message(response.message.clone());
            break;
        }

        session.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        for (tool_id, tool_name, tool_input) in tool_calls {
            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            if let Some(ref cb) = output_callback {
                cb(format!("[tool:start:{}]", tool_name));
            }

            // Enforce the approval policy: a blocked tool gets a refusal
            // message as its tool result instead of executing.
            if !is_tool_allowed(&tool_name, auto_approve) {
                let msg = format!(
                    "Tool '{}' requires approval but auto-approve policy is {:?}",
                    tool_name, auto_approve
                );
                tracing::warn!(tool = %tool_name, "Tool blocked by auto-approve policy");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: msg,
                    }],
                });
                continue;
            }

            let content = if let Some(tool) = tool_registry.get(&tool_name) {
                let exec_result: Result<crate::tool::ToolResult> =
                    tool.execute(tool_input.clone()).await;
                match exec_result {
                    Ok(result) => {
                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                        if let Some(ref cb) = output_callback {
                            let status = if result.success { "ok" } else { "err" };
                            // Streamed preview is truncated; the full output
                            // still goes into the session transcript below.
                            cb(format!(
                                "[tool:{}:{}] {}",
                                tool_name,
                                status,
                                crate::util::truncate_bytes_safe(&result.output, 500)
                            ));
                        }
                        result.output
                    }
                    Err(e) => {
                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                        if let Some(ref cb) = output_callback {
                            cb(format!("[tool:{}:err] {}", tool_name, e));
                        }
                        format!("Error: {}", e)
                    }
                }
            } else {
                tracing::warn!(tool = %tool_name, "Tool not found");
                format!("Error: Unknown tool '{}'", tool_name)
            };

            session.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });
        }
    }

    session.save().await?;

    Ok(crate::session::SessionResult {
        text: final_output.trim().to_string(),
        session_id: session.id.clone(),
    })
}
2538
/// Spawns the periodic (30 s) heartbeat task that reports this worker's
/// status, active task count, and sub-agents to the server.
///
/// An optional "cognition" section is fetched from a local cognition service
/// and attached to the payload. If the server rejects a heartbeat carrying it
/// with a 4xx, the heartbeat is retried without the section, and cognition
/// reporting is paused for a 5-minute cooldown.
///
/// The task never exits on failure: after 3 consecutive failed heartbeats it
/// logs an error, resets the counter, and keeps trying.
pub fn start_heartbeat(
    client: Client,
    server: String,
    token: Option<String>,
    heartbeat_state: HeartbeatState,
    // Set of task ids currently being processed; its size drives the
    // reported status (processing vs. idle).
    processing: Arc<Mutex<HashSet<String>>>,
    cognition_config: CognitionHeartbeatConfig,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut consecutive_failures = 0u32;
        const MAX_FAILURES: u32 = 3;
        const HEARTBEAT_INTERVAL_SECS: u64 = 30;
        const COGNITION_RETRY_COOLDOWN_SECS: u64 = 300;
        // While set, cognition payloads are suppressed until this instant.
        let mut cognition_payload_disabled_until: Option<Instant> = None;

        let mut interval =
            tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            interval.tick().await;

            // Mirror the processing set into the shared heartbeat state.
            let active_count = processing.lock().await.len();
            heartbeat_state.set_task_count(active_count).await;

            let status = if active_count > 0 {
                WorkerStatus::Processing
            } else {
                WorkerStatus::Idle
            };
            heartbeat_state.set_status(status).await;

            let url = format!(
                "{}/v1/agent/workers/{}/heartbeat",
                server, heartbeat_state.worker_id
            );
            let mut req = client.post(&url);

            if let Some(ref t) = token {
                req = req.bearer_auth(t);
            }

            let status_str = heartbeat_state.status.lock().await.as_str().to_string();
            let sub_agents = heartbeat_state.sub_agents_snapshot().await;
            // Kept separate from `payload` so a rejected cognition section
            // can be retried with the bare payload below.
            let base_payload = serde_json::json!({
                "worker_id": &heartbeat_state.worker_id,
                "agent_name": &heartbeat_state.agent_name,
                "status": status_str,
                "active_task_count": active_count,
                "sub_agents": sub_agents,
            });
            let mut payload = base_payload.clone();
            let mut included_cognition_payload = false;
            let cognition_payload_allowed = cognition_payload_disabled_until
                .map(|until| Instant::now() >= until)
                .unwrap_or(true);

            if cognition_config.enabled
                && cognition_payload_allowed
                && let Some(cognition_payload) =
                    fetch_cognition_heartbeat_payload(&client, &cognition_config).await
                && let Some(obj) = payload.as_object_mut()
            {
                obj.insert("cognition".to_string(), cognition_payload);
                included_cognition_payload = true;
            }

            match req.json(&payload).send().await {
                Ok(res) => {
                    if res.status().is_success() {
                        consecutive_failures = 0;
                        tracing::debug!(
                            worker_id = %heartbeat_state.worker_id,
                            status = status_str,
                            active_tasks = active_count,
                            "Heartbeat sent successfully"
                        );
                    } else if included_cognition_payload && res.status().is_client_error() {
                        // A 4xx with cognition attached likely means the
                        // server rejected the cognition schema: retry bare.
                        tracing::warn!(
                            worker_id = %heartbeat_state.worker_id,
                            status = %res.status(),
                            "Heartbeat cognition payload rejected, retrying without cognition payload"
                        );

                        let mut retry_req = client.post(&url);
                        if let Some(ref t) = token {
                            retry_req = retry_req.bearer_auth(t);
                        }

                        match retry_req.json(&base_payload).send().await {
                            Ok(retry_res) if retry_res.status().is_success() => {
                                // Bare retry succeeded: pause cognition
                                // payloads for the cooldown window.
                                cognition_payload_disabled_until = Some(
                                    Instant::now()
                                        + Duration::from_secs(COGNITION_RETRY_COOLDOWN_SECS),
                                );
                                consecutive_failures = 0;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    retry_after_secs = COGNITION_RETRY_COOLDOWN_SECS,
                                    "Paused cognition heartbeat payload after schema rejection"
                                );
                            }
                            Ok(retry_res) => {
                                consecutive_failures += 1;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    status = %retry_res.status(),
                                    failures = consecutive_failures,
                                    "Heartbeat failed even after retry without cognition payload"
                                );
                            }
                            Err(e) => {
                                consecutive_failures += 1;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    error = %e,
                                    failures = consecutive_failures,
                                    "Heartbeat retry without cognition payload failed"
                                );
                            }
                        }
                    } else {
                        consecutive_failures += 1;
                        tracing::warn!(
                            worker_id = %heartbeat_state.worker_id,
                            status = %res.status(),
                            failures = consecutive_failures,
                            "Heartbeat failed"
                        );
                    }
                }
                Err(e) => {
                    consecutive_failures += 1;
                    tracing::warn!(
                        worker_id = %heartbeat_state.worker_id,
                        error = %e,
                        failures = consecutive_failures,
                        "Heartbeat request failed"
                    );
                }
            }

            if consecutive_failures >= MAX_FAILURES {
                tracing::error!(
                    worker_id = %heartbeat_state.worker_id,
                    failures = consecutive_failures,
                    "Heartbeat failed {} consecutive times - worker will continue running and attempt reconnection via SSE loop",
                    MAX_FAILURES
                );
                // Reset so the error fires once per failure streak rather
                // than on every subsequent tick.
                consecutive_failures = 0;
            }
        }
    })
}
2700
/// Best-effort fetch of the local cognition service's status — and, when
/// configured, its latest thought snapshot — for inclusion in the heartbeat.
///
/// Returns `None` on any timeout, transport error, non-success status, or
/// undeserializable body; the heartbeat then simply omits the cognition
/// section. Snapshot enrichment failures leave the base status payload
/// intact rather than failing the whole fetch.
async fn fetch_cognition_heartbeat_payload(
    client: &Client,
    config: &CognitionHeartbeatConfig,
) -> Option<serde_json::Value> {
    let status_url = format!("{}/v1/cognition/status", config.source_base_url);
    // The outer timeout bounds the whole request; the two `.ok()?` collapse
    // both the timeout and the transport error into None.
    let status_res = tokio::time::timeout(
        Duration::from_millis(config.request_timeout_ms),
        client.get(status_url).send(),
    )
    .await
    .ok()?
    .ok()?;

    if !status_res.status().is_success() {
        return None;
    }

    let status: CognitionStatusSnapshot = status_res.json().await.ok()?;
    let mut payload = serde_json::json!({
        "running": status.running,
        "last_tick_at": status.last_tick_at,
        "active_persona_count": status.active_persona_count,
        "events_buffered": status.events_buffered,
        "snapshots_buffered": status.snapshots_buffered,
        "loop_interval_ms": status.loop_interval_ms,
    });

    if config.include_thought_summary {
        let snapshot_url = format!("{}/v1/cognition/snapshots/latest", config.source_base_url);
        let snapshot_res = tokio::time::timeout(
            Duration::from_millis(config.request_timeout_ms),
            client.get(snapshot_url).send(),
        )
        .await
        .ok()
        .and_then(Result::ok);

        // Enrichment is optional: every condition must hold or the base
        // payload is returned unchanged.
        if let Some(snapshot_res) = snapshot_res
            && snapshot_res.status().is_success()
            && let Ok(snapshot) = snapshot_res.json::<CognitionLatestSnapshot>().await
            && let Some(obj) = payload.as_object_mut()
        {
            obj.insert(
                "latest_snapshot_at".to_string(),
                serde_json::Value::String(snapshot.generated_at),
            );
            obj.insert(
                "latest_thought".to_string(),
                // The summary is length-capped to keep heartbeats small.
                serde_json::Value::String(trim_for_heartbeat(
                    &snapshot.summary,
                    config.summary_max_chars,
                )),
            );
            if let Some(model) = snapshot
                .metadata
                .get("model")
                .and_then(serde_json::Value::as_str)
            {
                obj.insert(
                    "latest_thought_model".to_string(),
                    serde_json::Value::String(model.to_string()),
                );
            }
            if let Some(source) = snapshot
                .metadata
                .get("source")
                .and_then(serde_json::Value::as_str)
            {
                obj.insert(
                    "latest_thought_source".to_string(),
                    serde_json::Value::String(source.to_string()),
                );
            }
        }
    }

    Some(payload)
}
2779
/// Trims `input` for inclusion in a heartbeat payload, truncating to at most
/// `max_chars` characters and appending a literal `"..."` when truncated.
///
/// Truncation counts `char`s, not bytes, so multi-byte UTF-8 text is never
/// split mid-character. The result is whitespace-trimmed; when truncation
/// occurs, whitespace immediately before the `"..."` is preserved (the
/// ellipsis shields it from the trailing trim), matching prior behavior.
fn trim_for_heartbeat(input: &str, max_chars: usize) -> String {
    if input.chars().count() <= max_chars {
        return input.trim().to_string();
    }

    // Idiomatic truncation: collect the first `max_chars` chars instead of
    // the previous manual push loop, then append the ellipsis marker.
    let mut truncated: String = input.chars().take(max_chars).collect();
    truncated.push_str("...");
    truncated.trim().to_string()
}
2792
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepts (case-insensitively) `1`/`true`/`yes`/`on` as true and
/// `0`/`false`/`no`/`off` as false; an unset variable or any other value
/// yields `default`.
fn env_bool(name: &str, default: bool) -> bool {
    match std::env::var(name) {
        Ok(raw) => match raw.to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        },
        Err(_) => default,
    }
}
2803
/// Reads a `usize` from the environment variable `name`, falling back to
/// `default` when the variable is unset or does not parse.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name).map(|raw| raw.parse::<usize>()) {
        Ok(Ok(value)) => value,
        _ => default,
    }
}
2810
/// Reads a `u64` from the environment variable `name`, falling back to
/// `default` when the variable is unset or does not parse.
fn env_u64(name: &str, default: u64) -> u64 {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    raw.parse().unwrap_or(default)
}
2817
2818fn maybe_configure_repo_git_auth_from_entry(entry: &serde_json::Value) {
2819 let Some(workspace_id) = entry.get("id").and_then(|v| v.as_str()) else {
2820 return;
2821 };
2822 let Some(path) = entry.get("path").and_then(|v| v.as_str()) else {
2823 return;
2824 };
2825 let has_git_remote = entry
2826 .get("git_url")
2827 .and_then(|v| v.as_str())
2828 .is_some_and(|value| !value.trim().is_empty());
2829 if !has_git_remote {
2830 return;
2831 }
2832
2833 let repo_path = Path::new(path);
2834 if !repo_path.join(".git").exists() {
2835 return;
2836 }
2837
2838 if let Err(error) = configure_repo_git_auth(repo_path, workspace_id) {
2839 tracing::debug!(
2840 workspace_id,
2841 path,
2842 error = %error,
2843 "Workspace sync could not install Git credential helper"
2844 );
2845 }
2846 configure_repo_git_github_app_from_agent_config(repo_path, entry.get("agent_config"));
2847}
2848
2849fn configure_repo_git_github_app_from_agent_config(
2850 repo_path: &Path,
2851 agent_config: Option<&serde_json::Value>,
2852) {
2853 let installation_id = agent_config
2854 .and_then(|value| value.get("git_auth"))
2855 .and_then(|value| value.get("github_app"))
2856 .and_then(|value| value.get("installation_id"))
2857 .and_then(|value| value.as_str());
2858 let app_id = agent_config
2859 .and_then(|value| value.get("git_auth"))
2860 .and_then(|value| value.get("github_app"))
2861 .and_then(|value| value.get("app_id"))
2862 .and_then(|value| value.as_str());
2863 if let Err(error) = configure_repo_git_github_app(repo_path, installation_id, app_id) {
2864 tracing::debug!(path = %repo_path.display(), error = %error, "Failed to persist GitHub App repo metadata");
2865 }
2866}
2867
2868fn start_workspace_sync(
2872 client: Client,
2873 server: String,
2874 token: Option<String>,
2875 shared_codebases: Arc<Mutex<Vec<String>>>,
2876) -> JoinHandle<()> {
2877 tokio::spawn(async move {
2878 const POLL_INTERVAL_SECS: u64 = 60;
2879 let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
2880 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2881 interval.tick().await; loop {
2884 interval.tick().await;
2885 if let Err(e) =
2886 sync_workspaces_from_server(&client, &server, &token, &shared_codebases).await
2887 {
2888 tracing::warn!("Workspace sync failed: {}", e);
2889 }
2890 }
2891 })
2892}
2893
/// One poll of `GET {server}/v1/agent/workspaces`: configures Git auth for
/// each reported workspace entry and appends any locally present paths that
/// are not yet tracked in `shared_codebases`.
///
/// Non-success HTTP statuses are treated as "nothing to do" and return
/// `Ok(())`; only transport and deserialization failures surface as errors.
async fn sync_workspaces_from_server(
    client: &Client,
    server: &str,
    token: &Option<String>,
    shared_codebases: &Arc<Mutex<Vec<String>>>,
) -> Result<()> {
    let mut req = client.get(format!("{}/v1/agent/workspaces", server));
    if let Some(t) = token {
        req = req.bearer_auth(t);
    }

    let res = req.send().await?;
    if !res.status().is_success() {
        tracing::debug!(
            status = %res.status(),
            "Workspace sync: server returned non-success, skipping"
        );
        return Ok(());
    }

    let data: serde_json::Value = res.json().await?;

    // Accept three response shapes: a bare array, {"workspaces": [...]},
    // or {"codebases": [...]}.
    let entries = if let Some(arr) = data.as_array() {
        arr.clone()
    } else {
        data["workspaces"]
            .as_array()
            .or_else(|| data["codebases"].as_array())
            .cloned()
            .unwrap_or_default()
    };

    let mut new_paths: Vec<String> = Vec::new();
    {
        // First pass under a short-lived lock: collect candidate paths only.
        // No `.await` occurs while this guard is held.
        let current = shared_codebases.lock().await;
        for entry in &entries {
            maybe_configure_repo_git_auth_from_entry(entry);
            let path = match entry["path"].as_str().filter(|p| !p.is_empty()) {
                Some(p) => p,
                None => continue,
            };
            // Only adopt paths that exist locally and are not already known.
            if std::path::Path::new(path).exists() && !current.iter().any(|c| c.as_str() == path) {
                new_paths.push(path.to_string());
            }
        }
    }

    if !new_paths.is_empty() {
        // Second pass: re-acquire the lock to append the new paths.
        let mut current = shared_codebases.lock().await;
        for path in &new_paths {
            tracing::info!(
                path = %path,
                "Workspace sync: auto-discovered local path, adding to codebases"
            );
            current.push(path.clone());
        }
        tracing::info!(
            added = new_paths.len(),
            total = current.len(),
            "Workspace sync complete -- new paths take effect on next reconnect"
        );
    } else {
        tracing::debug!("Workspace sync: no new local paths found");
    }

    Ok(())
}
2967
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Nested lookup: the metadata helpers should resolve keys placed under
    // the "forage" object, not only top-level keys.
    #[test]
    fn metadata_lookup_reads_nested_forage_keys() {
        let metadata = json!({
            "forage": {
                "execute": true,
                "top": 5,
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert_eq!(metadata_bool(&metadata, &["execute"]), Some(true));
        assert_eq!(metadata_usize(&metadata, &["top"]), Some(5));
    }

    // With empty metadata, the raw prompt becomes the single moonshot, S3 is
    // disabled (local mode), the session model passes through, and the
    // execution engine defaults to "run".
    #[test]
    fn build_forage_args_defaults_prompt_to_moonshot_and_local_mode() {
        let metadata = serde_json::Map::new();
        let args = build_forage_args(
            "Expand enterprise adoption in regulated markets",
            "forage task",
            &metadata,
            Some("openrouter/z-ai/glm-5".to_string()),
        );

        assert_eq!(
            args.moonshots,
            vec!["Expand enterprise adoption in regulated markets".to_string()]
        );
        assert!(args.no_s3);
        assert_eq!(args.model.as_deref(), Some("openrouter/z-ai/glm-5"));
        assert_eq!(args.execution_engine, "run");
    }

    // Every supported "forage" metadata key should map onto its ForageArgs
    // field, including string-encoded numbers such as "0.25".
    #[test]
    fn build_forage_args_honors_nested_forage_configuration() {
        let metadata = json!({
            "forage": {
                "top": 7,
                "loop": true,
                "max_cycles": 2,
                "execute": true,
                "no_s3": false,
                "moonshots": ["Ship autonomous OKR execution", "Reduce operator toil"],
                "moonshot_required": true,
                "moonshot_min_alignment": "0.25",
                "execution_engine": "swarm",
                "run_timeout_secs": 1200,
                "fail_fast": true,
                "swarm_strategy": "stage",
                "swarm_max_subagents": 4,
                "swarm_max_steps": 42,
                "swarm_subagent_timeout_secs": 180
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        let args = build_forage_args("fallback prompt", "title", &metadata, None);

        assert_eq!(args.top, 7);
        assert!(args.loop_mode);
        assert_eq!(args.max_cycles, 2);
        assert!(args.execute);
        assert!(!args.no_s3);
        assert_eq!(args.moonshots.len(), 2);
        assert!(args.moonshot_required);
        assert!((args.moonshot_min_alignment - 0.25).abs() < f64::EPSILON);
        assert_eq!(args.execution_engine, "swarm");
        assert_eq!(args.run_timeout_secs, 1200);
        assert!(args.fail_fast);
        assert_eq!(args.swarm_strategy, "stage");
        assert_eq!(args.swarm_max_subagents, 4);
        assert_eq!(args.swarm_max_steps, 42);
        assert_eq!(args.swarm_subagent_timeout_secs, 180);
    }

    // CODETETHER_WORKER_ID must win over any derived id; the original value
    // is restored afterwards so the test does not leak env state.
    #[test]
    fn resolve_worker_id_prefers_env() {
        let original = std::env::var("CODETETHER_WORKER_ID").ok();
        unsafe {
            std::env::set_var("CODETETHER_WORKER_ID", "harvester-test-worker");
        }
        let resolved = resolve_worker_id();
        match original {
            Some(value) => unsafe {
                std::env::set_var("CODETETHER_WORKER_ID", value);
            },
            None => unsafe {
                std::env::remove_var("CODETETHER_WORKER_ID");
            },
        }
        assert_eq!(resolved, "harvester-test-worker");
    }

    // A public base URL (with its trailing slash stripped) should produce
    // both the HTTP base URL and the bus stream/publish endpoints.
    #[test]
    fn advertised_interfaces_include_http_and_bus_urls() {
        let interfaces = advertised_interfaces(Some("http://worker.test:8080/"));
        assert_eq!(interfaces["http"]["base_url"], "http://worker.test:8080");
        assert_eq!(
            interfaces["bus"]["stream_url"],
            "http://worker.test:8080/v1/bus/stream"
        );
        assert_eq!(
            interfaces["bus"]["publish_url"],
            "http://worker.test:8080/v1/bus/publish"
        );
    }

    // Missing or blank public URLs advertise no interfaces at all.
    #[test]
    fn advertised_interfaces_omit_empty_public_url() {
        assert_eq!(advertised_interfaces(None), serde_json::json!({}));
        assert_eq!(advertised_interfaces(Some(" ")), serde_json::json!({}));
    }

    // Slot reservation semantics: a duplicate task id is rejected, and with
    // a cap of 2 the third distinct task is refused as AtCapacity.
    #[tokio::test]
    async fn reserve_task_slot_enforces_capacity() {
        let processing = Arc::new(Mutex::new(HashSet::new()));

        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::AlreadyProcessing
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-2", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-3", 2).await,
            TaskReservation::AtCapacity
        );
    }

    // A configured concurrency of zero would stall intake entirely, so it
    // must clamp up to one.
    #[test]
    fn normalize_max_concurrent_tasks_never_returns_zero() {
        assert_eq!(normalize_max_concurrent_tasks(0), 1);
        assert_eq!(normalize_max_concurrent_tasks(3), 3);
    }
}