Skip to main content

codetether_agent/a2a/
worker.rs

1//! A2A Worker - connects to an A2A server to process tasks
2
3mod clone_location;
4mod clone_target;
5
6use crate::a2a::claim::TaskClaimResponse;
7use crate::a2a::git_credentials::{
8    configure_repo_git_auth, configure_repo_git_github_app, write_git_credential_helper_script,
9};
10use crate::a2a::worker_workspace_record::{RegisteredWorkspaceRecord, fetch_workspace_record};
11use crate::bus::AgentBus;
12use crate::cli::{A2aArgs, ForageArgs};
13use crate::provenance::{ClaimProvenance, install_commit_msg_hook};
14use crate::provider::ProviderRegistry;
15use crate::session::Session;
16use crate::swarm::{DecompositionStrategy, SwarmConfig, SwarmExecutor};
17use crate::tui::swarm_view::SwarmEvent;
18use anyhow::{Context, Result};
19use futures::StreamExt;
20use reqwest::Client;
21use serde::Deserialize;
22use std::collections::HashMap;
23use std::collections::HashSet;
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::process::Command;
28use tokio::sync::{Mutex, mpsc};
29use tokio::task::JoinHandle;
30use tokio::time::Instant;
31
32use crate::a2a::worker_tool_registry::{create_filtered_registry, is_tool_allowed};
33use crate::a2a::worker_workspace_context::resolve_task_workspace_dir;
34use clone_location::{git_clone_base_dir, resolve_workspace_clone_path};
35use clone_target::prepare_clone_target;
36
/// Activity state a worker reports in its heartbeat.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkerStatus {
    /// Reported as `"idle"`.
    Idle,
    /// Reported as `"processing"`.
    Processing,
}

impl WorkerStatus {
    /// Returns the lowercase label for this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Idle => "idle",
            Self::Processing => "processing",
        }
    }
}
52
53/// Heartbeat state shared between the heartbeat task and the main worker
54#[derive(Clone)]
55pub struct HeartbeatState {
56    worker_id: String,
57    pub agent_name: String,
58    pub status: Arc<Mutex<WorkerStatus>>,
59    pub active_task_count: Arc<Mutex<usize>>,
60    pub sub_agents: Arc<Mutex<HashSet<String>>>,
61}
62
63impl HeartbeatState {
64    pub fn new(worker_id: String, agent_name: String) -> Self {
65        Self {
66            worker_id,
67            agent_name,
68            status: Arc::new(Mutex::new(WorkerStatus::Idle)),
69            active_task_count: Arc::new(Mutex::new(0)),
70            sub_agents: Arc::new(Mutex::new(HashSet::new())),
71        }
72    }
73
74    pub async fn set_status(&self, status: WorkerStatus) {
75        *self.status.lock().await = status;
76    }
77
78    pub async fn set_task_count(&self, count: usize) {
79        *self.active_task_count.lock().await = count;
80    }
81
82    pub async fn register_sub_agent(&self, agent_name: String) {
83        self.sub_agents.lock().await.insert(agent_name);
84    }
85
86    pub async fn deregister_sub_agent(&self, agent_name: &str) {
87        self.sub_agents.lock().await.remove(agent_name);
88    }
89
90    pub async fn sub_agents_snapshot(&self) -> Vec<String> {
91        let mut names = self
92            .sub_agents
93            .lock()
94            .await
95            .iter()
96            .cloned()
97            .collect::<Vec<_>>();
98        names.sort();
99        names
100    }
101}
102
103#[derive(Clone, Debug)]
104#[allow(dead_code)]
105pub struct CognitionHeartbeatConfig {
106    pub enabled: bool,
107    pub source_base_url: String,
108    pub token: Option<String>,
109    pub provider_name: String,
110    pub interval_secs: u64,
111    pub include_thought_summary: bool,
112    pub summary_max_chars: usize,
113    pub request_timeout_ms: u64,
114}
115
116impl CognitionHeartbeatConfig {
117    pub fn from_env() -> Self {
118        let source_base_url = std::env::var("CODETETHER_WORKER_COGNITION_SOURCE_URL")
119            .unwrap_or_else(|_| "http://127.0.0.1:4096".to_string())
120            .trim_end_matches('/')
121            .to_string();
122
123        Self {
124            enabled: env_bool("CODETETHER_WORKER_COGNITION_SHARE_ENABLED", true),
125            source_base_url,
126            include_thought_summary: env_bool("CODETETHER_WORKER_COGNITION_INCLUDE_THOUGHTS", true),
127            summary_max_chars: env_usize("CODETETHER_WORKER_COGNITION_THOUGHT_MAX_CHARS", 480),
128            request_timeout_ms: env_u64("CODETETHER_WORKER_COGNITION_TIMEOUT_MS", 2_500).max(250),
129            interval_secs: env_u64("CODETETHER_WORKER_COGNITION_INTERVAL_SECS", 30).max(5),
130            provider_name: std::env::var("CODETETHER_WORKER_COGNITION_PROVIDER")
131                .unwrap_or_else(|_| "cognition".to_string()),
132            token: std::env::var("CODETETHER_WORKER_COGNITION_TOKEN").ok(),
133        }
134    }
135}
136
137#[derive(Debug, Deserialize)]
138struct CognitionStatusSnapshot {
139    running: bool,
140    #[serde(default)]
141    last_tick_at: Option<String>,
142    #[serde(default)]
143    active_persona_count: usize,
144    #[serde(default)]
145    events_buffered: usize,
146    #[serde(default)]
147    snapshots_buffered: usize,
148    #[serde(default)]
149    loop_interval_ms: u64,
150}
151
152#[derive(Debug, Deserialize)]
153struct CognitionLatestSnapshot {
154    generated_at: String,
155    summary: String,
156    #[serde(default)]
157    metadata: HashMap<String, serde_json::Value>,
158}
159
/// Outcome of trying to claim a processing slot for a task (see `reserve_task_slot`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TaskReservation {
    /// The task id was added to the processing set.
    Reserved,
    /// The task id was already in the processing set.
    AlreadyProcessing,
    /// The processing set already holds the maximum number of tasks.
    AtCapacity,
}
166
167#[derive(Clone)]
168struct WorkerTaskRuntime {
169    client: Client,
170    server: String,
171    token: Option<String>,
172    worker_id: String,
173    processing: Arc<Mutex<HashSet<String>>>,
174    max_concurrent_tasks: usize,
175    auto_approve: AutoApprove,
176    bus: Arc<AgentBus>,
177}
178
179// Run the A2A worker
180pub async fn run(args: A2aArgs) -> Result<()> {
181    let server = args.server.trim_end_matches('/');
182    let name = args
183        .name
184        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
185    let worker_id = resolve_worker_id();
186    export_worker_runtime_env(server, &args.token, &worker_id);
187    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
188
189    let codebases: Vec<String> = args
190        .workspaces
191        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
192        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
193
194    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
195    tracing::info!("Server: {}", server);
196    tracing::info!("Workspaces: {:?}", codebases);
197    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
198
199    // Wrap in shared mutex so background workspace-sync can add new local paths
200    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
201
202    let client = Client::new();
203    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
204    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
205    if cognition_heartbeat.enabled {
206        tracing::info!(
207            source = %cognition_heartbeat.source_base_url,
208            include_thoughts = cognition_heartbeat.include_thought_summary,
209            max_chars = cognition_heartbeat.summary_max_chars,
210            timeout_ms = cognition_heartbeat.request_timeout_ms,
211            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
212        );
213    } else {
214        tracing::warn!(
215            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
216        );
217    }
218
219    let auto_approve = match args.auto_approve.as_str() {
220        "all" => AutoApprove::All,
221        "safe" => AutoApprove::Safe,
222        _ => AutoApprove::None,
223    };
224
225    // Create heartbeat state
226    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
227
228    // Create agent bus for in-process sub-agent communication
229    let bus = AgentBus::new().into_arc();
230
231    // Auto-start S3 sink if MinIO is configured
232    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
233
234    {
235        let handle = bus.handle(&worker_id);
236        handle.announce_ready(worker_capabilities());
237    }
238
239    let task_runtime = WorkerTaskRuntime {
240        client: client.clone(),
241        server: server.to_string(),
242        token: args.token.clone(),
243        worker_id: worker_id.clone(),
244        processing: processing.clone(),
245        max_concurrent_tasks,
246        auto_approve,
247        bus: bus.clone(),
248    };
249
250    // Register worker
251    {
252        let codebases = shared_codebases.lock().await.clone();
253        register_worker(
254            &client,
255            server,
256            &args.token,
257            &worker_id,
258            &name,
259            &codebases,
260            args.public_url.as_deref(),
261        )
262        .await?;
263    }
264
265    if let Err(e) =
266        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
267    {
268        tracing::warn!(error = %e, "Initial workspace sync failed");
269    }
270
271    // Fetch pending tasks
272    fetch_pending_tasks(&task_runtime).await?;
273
274    // Start background task that polls server for new locally-present workspaces
275    let _workspace_sync_handle = start_workspace_sync(
276        client.clone(),
277        server.to_string(),
278        args.token.clone(),
279        shared_codebases.clone(),
280    );
281
282    // Connect to SSE stream
283    loop {
284        // Refresh codebases snapshot (background sync may have added new local paths)
285        let codebases = shared_codebases.lock().await.clone();
286
287        // Re-register worker on each reconnection to report updated models/capabilities
288        if let Err(e) = register_worker(
289            &client,
290            server,
291            &args.token,
292            &worker_id,
293            &name,
294            &codebases,
295            args.public_url.as_deref(),
296        )
297        .await
298        {
299            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
300        }
301        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
302            tracing::warn!("Reconnect task fetch failed: {}", e);
303        }
304
305        // Start heartbeat task for this connection
306        let heartbeat_handle = start_heartbeat(
307            client.clone(),
308            server.to_string(),
309            args.token.clone(),
310            heartbeat_state.clone(),
311            processing.clone(),
312            cognition_heartbeat.clone(),
313        );
314
315        match connect_stream(&task_runtime, &name, &codebases, None).await {
316            Ok(()) => {
317                tracing::warn!("Stream ended, reconnecting...");
318            }
319            Err(e) => {
320                tracing::error!("Stream error: {}, reconnecting...", e);
321            }
322        }
323
324        // Cancel heartbeat on disconnection
325        heartbeat_handle.abort();
326        tracing::debug!("Heartbeat cancelled for reconnection");
327
328        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
329    }
330}
331
332/// Run the A2A worker with shared state for HTTP server integration
333/// This variant accepts a WorkerServerState to communicate with the HTTP server
334pub async fn run_with_state(
335    args: A2aArgs,
336    server_state: crate::worker_server::WorkerServerState,
337) -> Result<()> {
338    let server = args.server.trim_end_matches('/');
339    let name = args
340        .name
341        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
342    let worker_id = resolve_worker_id();
343    export_worker_runtime_env(server, &args.token, &worker_id);
344    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
345
346    // Share worker_id with HTTP server
347    server_state.set_worker_id(worker_id.clone()).await;
348
349    let codebases: Vec<String> = args
350        .workspaces
351        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
352        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
353
354    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
355    tracing::info!("Server: {}", server);
356    tracing::info!("Workspaces: {:?}", codebases);
357    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
358
359    // Wrap in shared mutex so background workspace-sync can add new local paths
360    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
361
362    let client = Client::new();
363    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
364    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
365    if cognition_heartbeat.enabled {
366        tracing::info!(
367            source = %cognition_heartbeat.source_base_url,
368            include_thoughts = cognition_heartbeat.include_thought_summary,
369            max_chars = cognition_heartbeat.summary_max_chars,
370            timeout_ms = cognition_heartbeat.request_timeout_ms,
371            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
372        );
373    } else {
374        tracing::warn!(
375            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
376        );
377    }
378
379    let auto_approve = match args.auto_approve.as_str() {
380        "all" => AutoApprove::All,
381        "safe" => AutoApprove::Safe,
382        _ => AutoApprove::None,
383    };
384
385    // Create heartbeat state
386    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
387
388    // Share heartbeat state with HTTP server
389    server_state
390        .set_heartbeat_state(Arc::new(heartbeat_state.clone()))
391        .await;
392
393    // Create agent bus for in-process sub-agent communication
394    let bus = AgentBus::new().into_arc();
395    server_state.set_bus(bus.clone()).await;
396
397    // Auto-start S3 sink if MinIO is configured
398    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
399
400    {
401        let handle = bus.handle(&worker_id);
402        handle.announce_ready(worker_capabilities());
403    }
404
405    let task_runtime = WorkerTaskRuntime {
406        client: client.clone(),
407        server: server.to_string(),
408        token: args.token.clone(),
409        worker_id: worker_id.clone(),
410        processing: processing.clone(),
411        max_concurrent_tasks,
412        auto_approve,
413        bus: bus.clone(),
414    };
415
416    // Register worker
417    {
418        let codebases = shared_codebases.lock().await.clone();
419        register_worker(
420            &client,
421            server,
422            &args.token,
423            &worker_id,
424            &name,
425            &codebases,
426            args.public_url.as_deref(),
427        )
428        .await?;
429    }
430
431    if let Err(e) =
432        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
433    {
434        tracing::warn!(error = %e, "Initial workspace sync failed");
435    }
436
437    // Mark as connected
438    server_state.set_connected(true).await;
439
440    // Fetch pending tasks before entering reconnection loop
441    fetch_pending_tasks(&task_runtime).await?;
442
443    // Start background task that polls server for new locally-present workspaces
444    let _workspace_sync_handle = start_workspace_sync(
445        client.clone(),
446        server.to_string(),
447        args.token.clone(),
448        shared_codebases.clone(),
449    );
450
451    // Connect to SSE stream
452    loop {
453        // Refresh codebases snapshot (background sync may have added new local paths)
454        let codebases = shared_codebases.lock().await.clone();
455
456        // Create task notification channel for CloudEvent-triggered task execution
457        // Recreate on each reconnection since the receiver is moved into connect_stream
458        let (task_notify_tx, task_notify_rx) = mpsc::channel::<String>(32);
459        server_state
460            .set_task_notification_channel(task_notify_tx)
461            .await;
462
463        // Mark as connected on each reconnection
464        server_state.set_connected(true).await;
465
466        // Re-register worker on each reconnection to report updated models/capabilities
467        if let Err(e) = register_worker(
468            &client,
469            server,
470            &args.token,
471            &worker_id,
472            &name,
473            &codebases,
474            args.public_url.as_deref(),
475        )
476        .await
477        {
478            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
479        }
480        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
481            tracing::warn!("Reconnect task fetch failed: {}", e);
482        }
483
484        // Start heartbeat task for this connection
485        let heartbeat_handle = start_heartbeat(
486            client.clone(),
487            server.to_string(),
488            args.token.clone(),
489            heartbeat_state.clone(),
490            processing.clone(),
491            cognition_heartbeat.clone(),
492        );
493
494        match connect_stream(&task_runtime, &name, &codebases, Some(task_notify_rx)).await {
495            Ok(()) => {
496                tracing::warn!("Stream ended, reconnecting...");
497            }
498            Err(e) => {
499                tracing::error!("Stream error: {}, reconnecting...", e);
500            }
501        }
502
503        // Mark as disconnected
504        server_state.set_connected(false).await;
505
506        // Cancel heartbeat on disconnection
507        heartbeat_handle.abort();
508        tracing::debug!("Heartbeat cancelled for reconnection");
509
510        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
511    }
512}
513
514pub fn generate_worker_id() -> String {
515    format!(
516        "wrk_{}_{:x}",
517        chrono::Utc::now().timestamp(),
518        rand::random::<u64>()
519    )
520}
521
522fn resolve_worker_id() -> String {
523    for key in ["CODETETHER_WORKER_ID", "A2A_WORKER_ID"] {
524        if let Ok(value) = std::env::var(key) {
525            let trimmed = value.trim();
526            if !trimmed.is_empty() {
527                return trimmed.to_string();
528            }
529        }
530    }
531    generate_worker_id()
532}
533
/// Clamps the configured concurrency so that at least one task may run.
fn normalize_max_concurrent_tasks(max_concurrent_tasks: usize) -> usize {
    if max_concurrent_tasks == 0 {
        1
    } else {
        max_concurrent_tasks
    }
}
537
538async fn reserve_task_slot(
539    processing: &Arc<Mutex<HashSet<String>>>,
540    task_id: &str,
541    max_concurrent_tasks: usize,
542) -> TaskReservation {
543    let mut proc = processing.lock().await;
544    if proc.contains(task_id) {
545        TaskReservation::AlreadyProcessing
546    } else if proc.len() >= max_concurrent_tasks {
547        TaskReservation::AtCapacity
548    } else {
549        proc.insert(task_id.to_string());
550        TaskReservation::Reserved
551    }
552}
553
/// Publishes the server URL, worker id and (when present) token as the
/// process-wide `CODETETHER_SERVER`, `CODETETHER_WORKER_ID` and
/// `CODETETHER_TOKEN` environment variables.
///
/// `CODETETHER_TOKEN` is left untouched when `token` is `None`.
fn export_worker_runtime_env(server: &str, token: &Option<String>, worker_id: &str) {
    let mut exports: Vec<(&str, &str)> = vec![
        ("CODETETHER_SERVER", server),
        ("CODETETHER_WORKER_ID", worker_id),
    ];
    if let Some(secret) = token.as_deref() {
        exports.push(("CODETETHER_TOKEN", secret));
    }

    for (variable, value) in exports {
        // SAFETY: The worker sets these process-wide variables once during startup before
        // spawning Git helper child processes. They are required so Git credential helpers
        // invoked by later shell/git commands can reach the control plane securely.
        unsafe {
            std::env::set_var(variable, value);
        }
    }
}
566
567fn advertised_interfaces(public_url: Option<&str>) -> serde_json::Value {
568    let Some(base_url) = public_url
569        .map(str::trim)
570        .filter(|value| !value.is_empty())
571        .map(|value| value.trim_end_matches('/').to_string())
572    else {
573        return serde_json::json!({});
574    };
575
576    serde_json::json!({
577        "http": {
578            "base_url": base_url,
579        },
580        "bus": {
581            "stream_url": format!("{base_url}/v1/bus/stream"),
582            "publish_url": format!("{base_url}/v1/bus/publish"),
583        },
584    })
585}
586
/// Tool auto-approval policy applied by the worker.
///
/// The worker entry points select it from the `auto_approve` argument:
/// `"all"` maps to [`AutoApprove::All`], `"safe"` to [`AutoApprove::Safe`],
/// and any other value to [`AutoApprove::None`].
///
/// # Examples
///
/// ```rust
/// use codetether_agent::a2a::worker::AutoApprove;
///
/// let mode = AutoApprove::Safe;
/// assert!(matches!(mode, AutoApprove::Safe));
/// ```
#[derive(Clone, Copy, Debug)]
pub enum AutoApprove {
    /// Selected by `"all"`.
    All,
    /// Selected by `"safe"`.
    Safe,
    /// Selected by every other value.
    None,
}
603
/// A2A server URL used when no server is configured explicitly.
pub const DEFAULT_A2A_SERVER_URL: &str = "https://api.codetether.run";
606
/// Capabilities every codetether-agent worker advertises.
const BASE_WORKER_CAPABILITIES: &[&str] = &[
    "forage", "ralph", "swarm", "rlm", "a2a", "mcp", "grpc", "grpc-web", "jsonrpc",
];

/// Returns the capability list this worker announces.
///
/// Always contains [`BASE_WORKER_CAPABILITIES`] in order; `"knative"` is
/// appended when `KNATIVE_SERVICE` is set to `1`, `true` or `yes`
/// (case-insensitive, surrounding whitespace ignored).
fn worker_capabilities() -> Vec<String> {
    // An unset or unreadable variable becomes "", which matches none of the truthy spellings.
    let knative_flag = std::env::var("KNATIVE_SERVICE").unwrap_or_default();
    let is_knative = matches!(
        knative_flag.trim().to_lowercase().as_str(),
        "1" | "true" | "yes"
    );

    BASE_WORKER_CAPABILITIES
        .iter()
        .copied()
        .chain(is_knative.then_some("knative"))
        .map(String::from)
        .collect()
}
630
631fn task_value<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a serde_json::Value> {
632    task.get("task")
633        .and_then(|t| t.get(key))
634        .or_else(|| task.get(key))
635}
636
637fn task_str<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a str> {
638    task_value(task, key).and_then(|v| v.as_str())
639}
640
641fn task_metadata(task: &serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
642    task_value(task, "metadata")
643        .and_then(|m| m.as_object())
644        .cloned()
645        .unwrap_or_default()
646}
647
/// Normalizes a `provider:model` reference to `provider/model`.
///
/// The string is returned unchanged when:
/// - it already contains a `/` (it is already in `provider/model` form, and
///   any colon belongs to the model id),
/// - it contains no `:`, or
/// - everything after the first `:` is ASCII digits — the colon is then a
///   version separator, as in `amazon.nova-micro-v1:0`, not a provider
///   delimiter.
///
/// Only the first `:` is replaced, so `bedrock:amazon.nova-micro-v1:0` becomes
/// `bedrock/amazon.nova-micro-v1:0`.
fn model_ref_to_provider_model(model: &str) -> String {
    if model.contains('/') {
        return model.to_string();
    }

    match model.split_once(':') {
        Some((provider, rest)) => {
            // A purely numeric suffix is a model version tag, not a model name.
            let is_version_tag = !rest.is_empty() && rest.bytes().all(|b| b.is_ascii_digit());
            if is_version_tag {
                model.to_string()
            } else {
                format!("{provider}/{rest}")
            }
        }
        None => model.to_string(),
    }
}
658
/// Returns provider names in preference order for the given model tier.
///
/// `"fast"`/`"quick"` and `"heavy"`/`"deep"` each select a dedicated ordering;
/// `None` and every other value select the balanced ordering. Matching is
/// exact (case-sensitive, no trimming).
fn provider_preferences_for_tier(model_tier: Option<&str>) -> &'static [&'static str] {
    const FAST: &[&str] = &[
        "zai",
        "openai",
        "github-copilot",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
        "anthropic",
    ];
    const HEAVY: &[&str] = &[
        "zai",
        "anthropic",
        "openai",
        "github-copilot",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
    ];
    const BALANCED: &[&str] = &[
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
    ];

    match model_tier {
        Some("fast" | "quick") => FAST,
        Some("heavy" | "deep") => HEAVY,
        _ => BALANCED,
    }
}
693
694fn choose_provider_for_tier<'a>(providers: &'a [&'a str], model_tier: Option<&str>) -> &'a str {
695    for preferred in provider_preferences_for_tier(model_tier) {
696        if let Some(found) = providers.iter().copied().find(|p| *p == *preferred) {
697            return found;
698        }
699    }
700    if let Some(found) = providers.iter().copied().find(|p| *p == "zai") {
701        return found;
702    }
703    providers[0]
704}
705
/// Returns the default model id for `provider` at the given tier.
///
/// `"fast"`/`"quick"` and `"heavy"`/`"deep"` select the fast and heavy tiers;
/// `None` and any other value select the balanced tier. Unknown providers get
/// `"glm-5"` on every tier.
fn default_model_for_provider(provider: &str, model_tier: Option<&str>) -> String {
    #[derive(Clone, Copy)]
    enum Tier {
        Fast,
        Heavy,
        Balanced,
    }

    let tier = match model_tier {
        Some("fast" | "quick") => Tier::Fast,
        Some("heavy" | "deep") => Tier::Heavy,
        _ => Tier::Balanced,
    };

    // Only anthropic, openai, google and bedrock vary by tier.
    let model = match (provider, tier) {
        ("moonshotai", _) => "kimi-k2.5",

        ("anthropic", Tier::Fast) => "claude-haiku-4-5",
        ("anthropic", Tier::Heavy | Tier::Balanced) => "claude-sonnet-4-20250514",

        ("openai", Tier::Fast) => "gpt-4o-mini",
        ("openai", Tier::Heavy) => "o3",
        ("openai", Tier::Balanced) => "gpt-4o",

        ("google", Tier::Fast) => "gemini-2.5-flash",
        ("google", Tier::Heavy | Tier::Balanced) => "gemini-2.5-pro",

        ("zhipuai" | "zai", _) => "glm-5",
        ("openrouter", _) => "z-ai/glm-5",
        ("novita", _) => "Qwen/Qwen3.5-35B-A3B",

        ("bedrock", Tier::Heavy) => "us.anthropic.claude-opus-4-6-v1",
        ("bedrock", Tier::Fast | Tier::Balanced) => "amazon.nova-lite-v1:0",

        _ => "glm-5",
    };

    model.to_string()
}
743
/// Reports whether the model id contains `kimi-k2`, `glm-` or `minimax`
/// (ASCII case-insensitive).
fn prefers_temperature_one(model: &str) -> bool {
    let lowered = model.to_ascii_lowercase();
    ["kimi-k2", "glm-", "minimax"]
        .iter()
        .any(|family| lowered.contains(family))
}
748
/// Reports whether `agent_type` names a swarm agent: `swarm`, `parallel` or
/// `multi-agent`, ignoring ASCII case and surrounding whitespace.
fn is_swarm_agent(agent_type: &str) -> bool {
    let kind = agent_type.trim();
    ["swarm", "parallel", "multi-agent"]
        .iter()
        .any(|alias| kind.eq_ignore_ascii_case(alias))
}
755
/// Reports whether `agent_type` is `forage`, ignoring ASCII case and
/// surrounding whitespace.
fn is_forage_agent(agent_type: &str) -> bool {
    let kind = agent_type.trim().to_ascii_lowercase();
    kind == "forage"
}
759
760fn metadata_lookup<'a>(
761    metadata: &'a serde_json::Map<String, serde_json::Value>,
762    key: &str,
763) -> Option<&'a serde_json::Value> {
764    metadata
765        .get(key)
766        .or_else(|| {
767            metadata
768                .get("routing")
769                .and_then(|v| v.as_object())
770                .and_then(|obj| obj.get(key))
771        })
772        .or_else(|| {
773            metadata
774                .get("swarm")
775                .and_then(|v| v.as_object())
776                .and_then(|obj| obj.get(key))
777        })
778        .or_else(|| {
779            metadata
780                .get("forage")
781                .and_then(|v| v.as_object())
782                .and_then(|obj| obj.get(key))
783        })
784}
785
786fn metadata_str(
787    metadata: &serde_json::Map<String, serde_json::Value>,
788    keys: &[&str],
789) -> Option<String> {
790    for key in keys {
791        if let Some(value) = metadata_lookup(metadata, key).and_then(|v| v.as_str()) {
792            let trimmed = value.trim();
793            if !trimmed.is_empty() {
794                return Some(trimmed.to_string());
795            }
796        }
797    }
798    None
799}
800
801fn metadata_usize(
802    metadata: &serde_json::Map<String, serde_json::Value>,
803    keys: &[&str],
804) -> Option<usize> {
805    for key in keys {
806        if let Some(value) = metadata_lookup(metadata, key) {
807            if let Some(v) = value.as_u64() {
808                return usize::try_from(v).ok();
809            }
810            if let Some(v) = value.as_i64()
811                && v >= 0
812            {
813                return usize::try_from(v as u64).ok();
814            }
815            if let Some(v) = value.as_str()
816                && let Ok(parsed) = v.trim().parse::<usize>()
817            {
818                return Some(parsed);
819            }
820        }
821    }
822    None
823}
824
825fn metadata_u64(
826    metadata: &serde_json::Map<String, serde_json::Value>,
827    keys: &[&str],
828) -> Option<u64> {
829    for key in keys {
830        if let Some(value) = metadata_lookup(metadata, key) {
831            if let Some(v) = value.as_u64() {
832                return Some(v);
833            }
834            if let Some(v) = value.as_i64()
835                && v >= 0
836            {
837                return Some(v as u64);
838            }
839            if let Some(v) = value.as_str()
840                && let Ok(parsed) = v.trim().parse::<u64>()
841            {
842                return Some(parsed);
843            }
844        }
845    }
846    None
847}
848
849fn metadata_bool(
850    metadata: &serde_json::Map<String, serde_json::Value>,
851    keys: &[&str],
852) -> Option<bool> {
853    for key in keys {
854        if let Some(value) = metadata_lookup(metadata, key) {
855            if let Some(v) = value.as_bool() {
856                return Some(v);
857            }
858            if let Some(v) = value.as_str() {
859                match v.trim().to_ascii_lowercase().as_str() {
860                    "1" | "true" | "yes" | "on" => return Some(true),
861                    "0" | "false" | "no" | "off" => return Some(false),
862                    _ => {}
863                }
864            }
865        }
866    }
867    None
868}
869
870fn metadata_f64(
871    metadata: &serde_json::Map<String, serde_json::Value>,
872    keys: &[&str],
873) -> Option<f64> {
874    for key in keys {
875        if let Some(value) = metadata_lookup(metadata, key) {
876            if let Some(v) = value.as_f64() {
877                return Some(v);
878            }
879            if let Some(v) = value.as_i64() {
880                return Some(v as f64);
881            }
882            if let Some(v) = value.as_u64() {
883                return Some(v as f64);
884            }
885            if let Some(v) = value.as_str()
886                && let Ok(parsed) = v.trim().parse::<f64>()
887            {
888                return Some(parsed);
889            }
890        }
891    }
892    None
893}
894
895fn metadata_string_list(
896    metadata: &serde_json::Map<String, serde_json::Value>,
897    keys: &[&str],
898) -> Vec<String> {
899    for key in keys {
900        let Some(value) = metadata_lookup(metadata, key) else {
901            continue;
902        };
903
904        if let Some(items) = value.as_array() {
905            let parsed = items
906                .iter()
907                .filter_map(|item| item.as_str())
908                .map(str::trim)
909                .filter(|item| !item.is_empty())
910                .map(ToString::to_string)
911                .collect::<Vec<_>>();
912            if !parsed.is_empty() {
913                return parsed;
914            }
915        }
916
917        if let Some(value) = value.as_str() {
918            let parsed = value
919                .split(['\n', ','])
920                .map(str::trim)
921                .filter(|item| !item.is_empty())
922                .map(ToString::to_string)
923                .collect::<Vec<_>>();
924            if !parsed.is_empty() {
925                return parsed;
926            }
927        }
928    }
929
930    Vec::new()
931}
932
/// Maps a requested forage execution engine onto one of the supported names.
///
/// The input is trimmed and compared ASCII case-insensitively. `"swarm"` and
/// `"go"` are recognised; anything else, including `None`, yields `"run"`.
fn normalize_forage_execution_engine(value: Option<String>) -> String {
    let requested = value.as_deref().map_or("run", str::trim);

    let engine = if requested.eq_ignore_ascii_case("swarm") {
        "swarm"
    } else if requested.eq_ignore_ascii_case("go") {
        "go"
    } else {
        "run"
    };
    engine.to_string()
}
946
/// Maps a requested forage swarm strategy onto one of the supported names.
///
/// The input is trimmed and lowercased (ASCII). `"domain"`, `"data"`,
/// `"stage"` and `"none"` pass through; anything else, including `None`,
/// yields `"auto"`.
fn normalize_forage_swarm_strategy(value: Option<String>) -> String {
    const KNOWN_STRATEGIES: [&str; 4] = ["domain", "data", "stage", "none"];

    let requested = value.map(|raw| raw.trim().to_ascii_lowercase());
    match requested {
        Some(name) if KNOWN_STRATEGIES.contains(&name.as_str()) => name,
        _ => "auto".to_string(),
    }
}
962
963fn build_forage_args(
964    prompt: &str,
965    title: &str,
966    metadata: &serde_json::Map<String, serde_json::Value>,
967    selected_model: Option<String>,
968) -> ForageArgs {
969    let mut moonshots = metadata_string_list(
970        metadata,
971        &["moonshots", "moonshot", "goals", "mission", "missions"],
972    );
973    if moonshots.is_empty() {
974        let fallback = prompt.trim();
975        if !fallback.is_empty() {
976            moonshots.push(fallback.to_string());
977        } else if !title.trim().is_empty() {
978            moonshots.push(title.trim().to_string());
979        }
980    }
981
982    ForageArgs {
983        top: metadata_usize(metadata, &["top"]).unwrap_or(3),
984        loop_mode: metadata_bool(metadata, &["loop", "loop_mode"]).unwrap_or(false),
985        interval_secs: metadata_u64(metadata, &["interval_secs", "interval"]).unwrap_or(120),
986        max_cycles: metadata_usize(metadata, &["max_cycles"]).unwrap_or(0),
987        execute: metadata_bool(metadata, &["execute"]).unwrap_or(false),
988        // Task-queue forage should run without requiring S3 archival by default.
989        no_s3: metadata_bool(metadata, &["no_s3", "local_only"]).unwrap_or(true),
990        moonshots,
991        moonshot_file: metadata_str(metadata, &["moonshot_file"]).map(PathBuf::from),
992        moonshot_required: metadata_bool(metadata, &["moonshot_required"]).unwrap_or(false),
993        moonshot_min_alignment: metadata_f64(metadata, &["moonshot_min_alignment"]).unwrap_or(0.10),
994        execution_engine: normalize_forage_execution_engine(metadata_str(
995            metadata,
996            &["execution_engine", "engine"],
997        )),
998        run_timeout_secs: metadata_u64(metadata, &["run_timeout_secs", "timeout_secs", "timeout"])
999            .unwrap_or(900),
1000        fail_fast: metadata_bool(metadata, &["fail_fast"]).unwrap_or(false),
1001        swarm_strategy: normalize_forage_swarm_strategy(metadata_str(
1002            metadata,
1003            &["swarm_strategy", "strategy"],
1004        )),
1005        swarm_max_subagents: metadata_usize(metadata, &["swarm_max_subagents"]).unwrap_or(8),
1006        swarm_max_steps: metadata_usize(metadata, &["swarm_max_steps"]).unwrap_or(100),
1007        swarm_subagent_timeout_secs: metadata_u64(metadata, &["swarm_subagent_timeout_secs"])
1008            .unwrap_or(300),
1009        model: selected_model,
1010        json: metadata_bool(metadata, &["json"]).unwrap_or(false),
1011    }
1012}
1013
1014fn parse_swarm_strategy(
1015    metadata: &serde_json::Map<String, serde_json::Value>,
1016) -> DecompositionStrategy {
1017    match metadata_str(
1018        metadata,
1019        &[
1020            "decomposition_strategy",
1021            "swarm_strategy",
1022            "strategy",
1023            "swarm_decomposition",
1024        ],
1025    )
1026    .as_deref()
1027    .map(|s| s.to_ascii_lowercase())
1028    .as_deref()
1029    {
1030        Some("none") | Some("single") => DecompositionStrategy::None,
1031        Some("domain") | Some("by_domain") => DecompositionStrategy::ByDomain,
1032        Some("data") | Some("by_data") => DecompositionStrategy::ByData,
1033        Some("stage") | Some("by_stage") => DecompositionStrategy::ByStage,
1034        _ => DecompositionStrategy::Automatic,
1035    }
1036}
1037
1038async fn resolve_swarm_model(
1039    explicit_model: Option<String>,
1040    model_tier: Option<&str>,
1041) -> Option<String> {
1042    if let Some(model) = explicit_model
1043        && !model.trim().is_empty()
1044    {
1045        return Some(model);
1046    }
1047
1048    let registry = ProviderRegistry::from_vault().await.ok()?;
1049    let providers = registry.list();
1050    if providers.is_empty() {
1051        return None;
1052    }
1053    let provider = choose_provider_for_tier(providers.as_slice(), model_tier);
1054    let model = default_model_for_provider(provider, model_tier);
1055    Some(format!("{}/{}", provider, model))
1056}
1057
1058fn format_swarm_event_for_output(event: &SwarmEvent) -> Option<String> {
1059    match event {
1060        SwarmEvent::Started {
1061            task,
1062            total_subtasks,
1063        } => Some(format!(
1064            "[swarm] started task={} planned_subtasks={}",
1065            task, total_subtasks
1066        )),
1067        SwarmEvent::StageComplete {
1068            stage,
1069            completed,
1070            failed,
1071        } => Some(format!(
1072            "[swarm] stage={} completed={} failed={}",
1073            stage, completed, failed
1074        )),
1075        SwarmEvent::SubTaskUpdate { id, status, .. } => Some(format!(
1076            "[swarm] subtask id={} status={}",
1077            &id.chars().take(8).collect::<String>(),
1078            format!("{status:?}").to_ascii_lowercase()
1079        )),
1080        SwarmEvent::AgentToolCall {
1081            subtask_id,
1082            tool_name,
1083        } => Some(format!(
1084            "[swarm] subtask id={} tool={}",
1085            &subtask_id.chars().take(8).collect::<String>(),
1086            tool_name
1087        )),
1088        SwarmEvent::AgentError { subtask_id, error } => Some(format!(
1089            "[swarm] subtask id={} error={}",
1090            &subtask_id.chars().take(8).collect::<String>(),
1091            error
1092        )),
1093        SwarmEvent::Complete { success, stats } => Some(format!(
1094            "[swarm] complete success={} subtasks={} speedup={:.2}",
1095            success,
1096            stats.subagents_completed + stats.subagents_failed,
1097            stats.speedup_factor
1098        )),
1099        SwarmEvent::Error(err) => Some(format!("[swarm] error message={}", err)),
1100        _ => None,
1101    }
1102}
1103
1104pub async fn register_worker(
1105    client: &Client,
1106    server: &str,
1107    token: &Option<String>,
1108    worker_id: &str,
1109    name: &str,
1110    codebases: &[String],
1111    public_url: Option<&str>,
1112) -> Result<()> {
1113    // Load ProviderRegistry and collect available models
1114    let models = match load_provider_models().await {
1115        Ok(m) => m,
1116        Err(e) => {
1117            tracing::warn!(
1118                "Failed to load provider models: {}, proceeding without model info",
1119                e
1120            );
1121            HashMap::new()
1122        }
1123    };
1124
1125    // Register via the workers/register endpoint
1126    let mut req = client.post(format!("{}/v1/agent/workers/register", server));
1127
1128    if let Some(t) = token {
1129        req = req.bearer_auth(t);
1130    }
1131
1132    // Flatten models HashMap into array of model objects with pricing data
1133    // matching the format expected by the A2A server's /models and /workers endpoints
1134    let models_array: Vec<serde_json::Value> = models
1135        .iter()
1136        .flat_map(|(provider, model_infos)| {
1137            model_infos.iter().map(move |m| {
1138                let mut obj = serde_json::json!({
1139                    "id": format!("{}/{}", provider, m.id),
1140                    "name": &m.id,
1141                    "provider": provider,
1142                    "provider_id": provider,
1143                });
1144                if let Some(input_cost) = m.input_cost_per_million {
1145                    obj["input_cost_per_million"] = serde_json::json!(input_cost);
1146                }
1147                if let Some(output_cost) = m.output_cost_per_million {
1148                    obj["output_cost_per_million"] = serde_json::json!(output_cost);
1149                }
1150                obj
1151            })
1152        })
1153        .collect();
1154
1155    tracing::info!(
1156        "Registering worker with {} models from {} providers",
1157        models_array.len(),
1158        models.len()
1159    );
1160
1161    let hostname = std::env::var("HOSTNAME")
1162        .or_else(|_| std::env::var("COMPUTERNAME"))
1163        .unwrap_or_else(|_| "unknown".to_string());
1164    let k8s_node_name = std::env::var("K8S_NODE_NAME")
1165        .ok()
1166        .map(|value| value.trim().to_string())
1167        .filter(|value| !value.is_empty());
1168
1169    // Collect agent definitions from the builtin registry
1170    let registry = crate::agent::AgentRegistry::with_builtins();
1171    let agent_defs: Vec<serde_json::Value> = registry
1172        .list()
1173        .iter()
1174        .map(|info| {
1175            serde_json::json!({
1176                "name": info.name,
1177                "description": info.description,
1178                "mode": format!("{:?}", info.mode).to_lowercase(),
1179                "native": info.native,
1180                "hidden": info.hidden,
1181                "model": info.model,
1182                "temperature": info.temperature,
1183                "top_p": info.top_p,
1184                "max_steps": info.max_steps,
1185            })
1186        })
1187        .collect();
1188
1189    let res = req
1190        .json(&serde_json::json!({
1191            "worker_id": worker_id,
1192            "name": name,
1193            "capabilities": worker_capabilities(),
1194            "hostname": hostname,
1195            "k8s_node_name": k8s_node_name,
1196            "models": models_array,
1197            "workspaces": codebases,
1198            "interfaces": advertised_interfaces(public_url),
1199            "agents": agent_defs,
1200        }))
1201        .send()
1202        .await?;
1203
1204    if res.status().is_success() {
1205        tracing::info!("Worker registered successfully");
1206    } else {
1207        tracing::warn!("Failed to register worker: {}", res.status());
1208    }
1209
1210    Ok(())
1211}
1212
1213/// Load ProviderRegistry and collect all available models grouped by provider.
1214/// Tries Vault first, then falls back to config/env vars if Vault is unreachable.
1215/// Returns ModelInfo structs (with pricing data when available).
1216///
1217/// Result is cached process-wide: repeated calls (e.g. TUI startup +
1218/// background worker registration) do not re-hit every provider's
1219/// `/models` endpoint. The cache is populated exactly once per process
1220/// on the first call, so concurrent callers share one network fanout.
1221async fn load_provider_models() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>> {
1222    use tokio::sync::OnceCell;
1223    static CACHE: OnceCell<HashMap<String, Vec<crate::provider::ModelInfo>>> =
1224        OnceCell::const_new();
1225    CACHE
1226        .get_or_try_init(|| async { load_provider_models_uncached().await })
1227        .await
1228        .cloned()
1229}
1230
1231async fn load_provider_models_uncached() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>>
1232{
1233    // Try Vault first
1234    let registry = match ProviderRegistry::from_vault().await {
1235        Ok(r) if !r.list().is_empty() => {
1236            tracing::info!("Loaded {} providers from Vault", r.list().len());
1237            r
1238        }
1239        Ok(_) => {
1240            tracing::warn!("Vault returned 0 providers, falling back to config/env vars");
1241            fallback_registry().await?
1242        }
1243        Err(e) => {
1244            tracing::warn!("Vault unreachable ({}), falling back to config/env vars", e);
1245            fallback_registry().await?
1246        }
1247    };
1248
1249    let mut models_by_provider: HashMap<String, Vec<crate::provider::ModelInfo>> = HashMap::new();
1250
1251    for provider_name in registry.list() {
1252        if let Some(provider) = registry.get(provider_name) {
1253            match provider.list_models().await {
1254                Ok(models) => {
1255                    if !models.is_empty() {
1256                        tracing::debug!("Provider {}: {} models", provider_name, models.len());
1257                        models_by_provider.insert(provider_name.to_string(), models);
1258                    }
1259                }
1260                Err(e) => {
1261                    tracing::debug!("Failed to list models for {}: {}", provider_name, e);
1262                }
1263            }
1264        }
1265    }
1266
1267    Ok(models_by_provider)
1268}
1269
1270/// Fallback: build a ProviderRegistry from config file + environment variables
1271async fn fallback_registry() -> Result<ProviderRegistry> {
1272    let config = crate::config::Config::load().await.unwrap_or_default();
1273    ProviderRegistry::from_config(&config).await
1274}
1275
1276async fn fetch_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1277    tracing::info!("Checking for pending tasks...");
1278
1279    let mut req = runtime
1280        .client
1281        .get(format!("{}/v1/agent/tasks?status=pending", runtime.server));
1282    if let Some(t) = &runtime.token {
1283        req = req.bearer_auth(t);
1284    }
1285
1286    let res = req.send().await?;
1287    if !res.status().is_success() {
1288        return Ok(());
1289    }
1290
1291    let data: serde_json::Value = res.json().await?;
1292    // Handle both plain array response and {tasks: [...]} wrapper
1293    let tasks = if let Some(arr) = data.as_array() {
1294        arr.clone()
1295    } else {
1296        data["tasks"].as_array().cloned().unwrap_or_default()
1297    };
1298
1299    tracing::info!("Found {} pending task(s)", tasks.len());
1300
1301    for task in tasks {
1302        if let Some(id) = task["id"].as_str() {
1303            match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1304                TaskReservation::Reserved => {
1305                    let task_id = id.to_string();
1306                    let runtime = runtime.clone();
1307
1308                    tokio::spawn(async move {
1309                        if let Err(e) = handle_task(&runtime, &task).await {
1310                            tracing::error!("Task {} failed: {}", task_id, e);
1311                        }
1312                        runtime.processing.lock().await.remove(&task_id);
1313                    });
1314                }
1315                TaskReservation::AlreadyProcessing => {}
1316                TaskReservation::AtCapacity => {
1317                    tracing::debug!(
1318                        max_concurrent_tasks = runtime.max_concurrent_tasks,
1319                        "Worker is at task capacity; leaving remaining tasks pending"
1320                    );
1321                    break;
1322                }
1323            }
1324        }
1325    }
1326
1327    Ok(())
1328}
1329
1330async fn connect_stream(
1331    runtime: &WorkerTaskRuntime,
1332    name: &str,
1333    codebases: &[String],
1334    task_notify_rx: Option<mpsc::Receiver<String>>,
1335) -> Result<()> {
1336    let url = format!(
1337        "{}/v1/worker/tasks/stream?agent_name={}&worker_id={}",
1338        runtime.server,
1339        urlencoding::encode(name),
1340        urlencoding::encode(&runtime.worker_id)
1341    );
1342
1343    let mut req = runtime
1344        .client
1345        .get(&url)
1346        .header("Accept", "text/event-stream")
1347        .header("X-Worker-ID", &runtime.worker_id)
1348        .header("X-Agent-Name", name)
1349        .header("X-Codebases", codebases.join(","))
1350        .header("X-Workspaces", codebases.join(","));
1351
1352    if let Some(t) = &runtime.token {
1353        req = req.bearer_auth(t);
1354    }
1355
1356    let res = req.send().await?;
1357    if !res.status().is_success() {
1358        anyhow::bail!("Failed to connect: {}", res.status());
1359    }
1360
1361    tracing::info!("Connected to A2A server");
1362
1363    let mut stream = res.bytes_stream();
1364    let mut buffer = String::new();
1365    let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
1366    poll_interval.tick().await; // consume the initial immediate tick
1367
1368    // Pin the optional receiver so we can use it in the loop
1369    let mut task_notify_rx = task_notify_rx;
1370
1371    loop {
1372        tokio::select! {
1373            // Handle task notification from CloudEvent (Knative Eventing)
1374            // Only if the channel was provided (i.e., running with HTTP server)
1375            task_id = async {
1376                if let Some(ref mut rx) = task_notify_rx {
1377                    rx.recv().await
1378                } else {
1379                    // Never ready when None - use pending to skip this branch
1380                    futures::future::pending().await
1381                }
1382            } => {
1383                if let Some(task_id) = task_id {
1384                    tracing::info!("Received task notification via CloudEvent: {}", task_id);
1385                    // Immediately poll for and process this task
1386                    if let Err(e) = poll_pending_tasks(runtime).await {
1387                        tracing::warn!("Task notification poll failed: {}", e);
1388                    }
1389                }
1390            }
1391            chunk = stream.next() => {
1392                match chunk {
1393                    Some(Ok(chunk)) => {
1394                        buffer.push_str(&String::from_utf8_lossy(&chunk));
1395
1396                        // Process SSE events
1397                        while let Some(pos) = buffer.find("\n\n") {
1398                            let event_str = buffer[..pos].to_string();
1399                            buffer = buffer[pos + 2..].to_string();
1400
1401                            if let Some(data_line) = event_str.lines().find(|l| l.starts_with("data:")) {
1402                                let data = data_line.trim_start_matches("data:").trim();
1403                                if data == "[DONE]" || data.is_empty() {
1404                                    continue;
1405                                }
1406
1407                                if let Ok(task) = serde_json::from_str::<serde_json::Value>(data) {
1408                                    spawn_task_handler(&task, runtime).await;
1409                                }
1410                            }
1411                        }
1412                    }
1413                    Some(Err(e)) => {
1414                        return Err(e.into());
1415                    }
1416                    None => {
1417                        // Stream ended
1418                        return Ok(());
1419                    }
1420                }
1421            }
1422            _ = poll_interval.tick() => {
1423                // Periodic poll for pending tasks the SSE stream may have missed
1424                if let Err(e) = poll_pending_tasks(runtime).await {
1425                    tracing::warn!("Periodic task poll failed: {}", e);
1426                }
1427            }
1428        }
1429    }
1430}
1431
1432async fn spawn_task_handler(task: &serde_json::Value, runtime: &WorkerTaskRuntime) {
1433    if let Some(id) = task
1434        .get("task")
1435        .and_then(|t| t["id"].as_str())
1436        .or_else(|| task["id"].as_str())
1437    {
1438        match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1439            TaskReservation::Reserved => {
1440                let task_id = id.to_string();
1441                let task = task.clone();
1442                let runtime = runtime.clone();
1443
1444                tokio::spawn(async move {
1445                    if let Err(e) = handle_task(&runtime, &task).await {
1446                        tracing::error!("Task {} failed: {}", task_id, e);
1447                    }
1448                    runtime.processing.lock().await.remove(&task_id);
1449                });
1450            }
1451            TaskReservation::AlreadyProcessing => {}
1452            TaskReservation::AtCapacity => {
1453                tracing::debug!(
1454                    task_id = id,
1455                    max_concurrent_tasks = runtime.max_concurrent_tasks,
1456                    "Worker is at task capacity; task will stay pending until a slot frees up"
1457                );
1458            }
1459        }
1460    }
1461}
1462
1463async fn poll_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1464    let mut req = runtime
1465        .client
1466        .get(format!("{}/v1/agent/tasks?status=pending", runtime.server));
1467    if let Some(t) = &runtime.token {
1468        req = req.bearer_auth(t);
1469    }
1470
1471    let res = req.send().await?;
1472    if !res.status().is_success() {
1473        return Ok(());
1474    }
1475
1476    let data: serde_json::Value = res.json().await?;
1477    let tasks = if let Some(arr) = data.as_array() {
1478        arr.clone()
1479    } else {
1480        data["tasks"].as_array().cloned().unwrap_or_default()
1481    };
1482
1483    if !tasks.is_empty() {
1484        tracing::debug!("Poll found {} pending task(s)", tasks.len());
1485    }
1486
1487    for task in &tasks {
1488        spawn_task_handler(task, runtime).await;
1489    }
1490
1491    Ok(())
1492}
1493
1494async fn handle_task(runtime: &WorkerTaskRuntime, task: &serde_json::Value) -> Result<()> {
1495    let task_id = task_str(task, "id").ok_or_else(|| anyhow::anyhow!("No task ID"))?;
1496    let title = task_str(task, "title").unwrap_or("Untitled");
1497
1498    tracing::info!("Handling task: {} ({})", title, task_id);
1499
1500    // Claim the task
1501    let mut req = runtime
1502        .client
1503        .post(format!("{}/v1/worker/tasks/claim", runtime.server))
1504        .header("X-Worker-ID", &runtime.worker_id);
1505    if let Some(t) = &runtime.token {
1506        req = req.bearer_auth(t);
1507    }
1508
1509    let res = req
1510        .json(&serde_json::json!({ "task_id": task_id }))
1511        .send()
1512        .await?;
1513
1514    if !res.status().is_success() {
1515        let status = res.status();
1516        let text = res.text().await?;
1517        if status == reqwest::StatusCode::CONFLICT {
1518            tracing::debug!(task_id, "Task already claimed by another worker, skipping");
1519        } else {
1520            tracing::warn!(task_id, %status, "Failed to claim task: {}", text);
1521        }
1522        return Ok(());
1523    }
1524
1525    let claim = res.json::<TaskClaimResponse>().await?;
1526    let claim_provenance = claim.into_provenance();
1527    tracing::info!(
1528        task_id,
1529        run_id = ?claim_provenance.run_id,
1530        attempt_id = ?claim_provenance.attempt_id,
1531        "Claimed task"
1532    );
1533
1534    // --- All post-claim work is wrapped so we always release the task ---
1535    let inner_result: Result<(&str, Option<String>, Option<String>, Option<String>)> =
1536        execute_claimed_task(runtime, task, task_id, title, &claim_provenance).await;
1537
1538    let (status, result, error, session_id) = match inner_result {
1539        Ok(tuple) => tuple,
1540        Err(e) => {
1541            tracing::error!(
1542                task_id,
1543                error = %e,
1544                "Task failed after claim (releasing as failed)"
1545            );
1546            (
1547                "failed",
1548                None,
1549                Some(format!("Worker error after claim: {}", e)),
1550                None,
1551            )
1552        }
1553    };
1554
1555    release_task_result(
1556        &runtime.client,
1557        &runtime.server,
1558        &runtime.token,
1559        &runtime.worker_id,
1560        task_id,
1561        status,
1562        result,
1563        error,
1564        session_id,
1565    )
1566    .await?;
1567
1568    tracing::info!("Task released: {} with status: {}", task_id, status);
1569    Ok(())
1570}
1571
1572/// Inner logic for a claimed task. Returns (status, result, error, session_id).
1573/// Extracted so that `handle_task` can catch any error and always release.
1574#[allow(clippy::too_many_lines)]
1575async fn execute_claimed_task<'a>(
1576    runtime: &WorkerTaskRuntime,
1577    task: &'a serde_json::Value,
1578    task_id: &'a str,
1579    title: &'a str,
1580    claim_provenance: &ClaimProvenance,
1581) -> Result<(&'static str, Option<String>, Option<String>, Option<String>)> {
1582    let metadata = task_metadata(task);
1583    let resume_session_id = metadata
1584        .get("resume_session_id")
1585        .and_then(|v| v.as_str())
1586        .map(|s| s.trim().to_string())
1587        .filter(|s| !s.is_empty());
1588    let complexity_hint = metadata_str(&metadata, &["complexity"]);
1589    let model_tier = metadata_str(&metadata, &["model_tier", "tier"])
1590        .map(|s| s.to_ascii_lowercase())
1591        .or_else(|| {
1592            complexity_hint.as_ref().map(|complexity| {
1593                match complexity.to_ascii_lowercase().as_str() {
1594                    "quick" => "fast".to_string(),
1595                    "deep" => "heavy".to_string(),
1596                    _ => "balanced".to_string(),
1597                }
1598            })
1599        });
1600    let worker_personality = metadata_str(
1601        &metadata,
1602        &["worker_personality", "personality", "agent_personality"],
1603    );
1604    let target_agent_name = metadata_str(&metadata, &["target_agent_name", "agent_name"]);
1605    let raw_model = task_str(task, "model_ref")
1606        .or_else(|| metadata_lookup(&metadata, "model_ref").and_then(|v| v.as_str()))
1607        .or_else(|| task_str(task, "model"))
1608        .or_else(|| metadata_lookup(&metadata, "model").and_then(|v| v.as_str()));
1609    let selected_model = raw_model.map(model_ref_to_provider_model);
1610    let raw_agent = task_str(task, "agent_type")
1611        .or_else(|| task_str(task, "agent"))
1612        .unwrap_or("build");
1613    let prompt = task_str(task, "prompt")
1614        .or_else(|| task_str(task, "description"))
1615        .unwrap_or(title);
1616
1617    // Detect virtual/global workspace tasks dispatched via the API.
1618    let workspace_id = task_str(task, "workspace_id")
1619        .or_else(|| metadata_lookup(&metadata, "workspace_id").and_then(|v| v.as_str()));
1620    let is_virtual_task = workspace_id.map_or(false, |ws| ws == "global" || ws.is_empty());
1621
1622    if raw_agent.eq_ignore_ascii_case("clone_repo") {
1623        return match handle_clone_repo_task(
1624            &runtime.client,
1625            &runtime.server,
1626            &runtime.token,
1627            &runtime.worker_id,
1628            task,
1629            &metadata,
1630        )
1631        .await
1632        {
1633            Ok(message) => Ok(("completed", Some(message), None, None)),
1634            Err(err) => {
1635                tracing::error!(task_id, error = %err, "Clone task failed");
1636                Ok(("failed", None, Some(format!("Error: {}", err)), None))
1637            }
1638        };
1639    }
1640
1641    if is_forage_agent(raw_agent) {
1642        return match handle_forage_task(title, prompt, &metadata, selected_model.clone()).await {
1643            Ok(message) => Ok(("completed", Some(message), None, None)),
1644            Err(err) => {
1645                tracing::error!(task_id, error = %err, "Forage task failed");
1646                Ok(("failed", None, Some(format!("Error: {}", err)), None))
1647            }
1648        };
1649    }
1650
1651    // Resume existing session when requested; fall back to a fresh session if missing.
1652    let mut session = if let Some(ref sid) = resume_session_id {
1653        match Session::load(sid).await {
1654            Ok(existing) => {
1655                tracing::info!("Resuming session {} for task {}", sid, task_id);
1656                existing
1657            }
1658            Err(e) => {
1659                tracing::warn!(
1660                    "Could not load session {} for task {} ({}), starting a new session",
1661                    sid,
1662                    task_id,
1663                    e
1664                );
1665                Session::new().await?
1666            }
1667        }
1668    } else {
1669        Session::new().await?
1670    };
1671
1672    if !is_virtual_task {
1673        if let Some(workspace_path) = resolve_task_workspace_dir(
1674            &runtime.client,
1675            &runtime.server,
1676            &runtime.token,
1677            workspace_id,
1678        )
1679        .await?
1680        {
1681            session.metadata.directory = Some(workspace_path);
1682        }
1683    }
1684
1685    // For virtual/global tasks, clear the workspace directory so tools don't
1686    // try to operate on a workspace that doesn't exist on this worker.
1687    if is_virtual_task {
1688        tracing::info!(
1689            task_id,
1690            workspace_id = workspace_id.unwrap_or("(none)"),
1691            "Virtual task detected — skipping workspace setup"
1692        );
1693        session.metadata.directory = None;
1694    }
1695
1696    // Normalize agent: only "build", "plan", and swarm types are valid.
1697    // Map deprecated/unknown agent types (e.g. "general", "explore") to "build".
1698    let agent_type = if is_swarm_agent(raw_agent) {
1699        raw_agent
1700    } else {
1701        match raw_agent {
1702            "build" | "plan" => raw_agent,
1703            other => {
1704                tracing::info!(
1705                    "Agent \"{}\" is not a primary agent, falling back to \"build\"",
1706                    other
1707                );
1708                "build"
1709            }
1710        }
1711    };
1712    session.set_agent_name(agent_type.to_string());
1713    session.attach_claim_provenance(claim_provenance);
1714
1715    // Skip git hook installation for virtual tasks (no workspace directory).
1716    if !is_virtual_task {
1717        if let Some(directory) = session.metadata.directory.as_deref()
1718            && let Err(err) = install_commit_msg_hook(directory)
1719        {
1720            tracing::warn!(task_id, error = %err, "Failed to install commit-msg hook");
1721        }
1722    }
1723
1724    if let Some(model) = selected_model.clone() {
1725        session.metadata.model = Some(model);
1726    }
1727
1728    tracing::info!(task_id, agent_type, "Executing prompt: {}", prompt);
1729
1730    // Set up output streaming to forward progress to the server and bus
1731    let stream_client = runtime.client.clone();
1732    let stream_server = runtime.server.clone();
1733    let stream_token = runtime.token.clone();
1734    let stream_worker_id = runtime.worker_id.clone();
1735    let stream_task_id = task_id.to_string();
1736    let stream_bus = Arc::clone(&runtime.bus);
1737
1738    let output_callback: Arc<dyn Fn(String) + Send + Sync + 'static> =
1739        Arc::new(move |output: String| {
1740            let c = stream_client.clone();
1741            let s = stream_server.clone();
1742            let t = stream_token.clone();
1743            let w = stream_worker_id.clone();
1744            let tid = stream_task_id.clone();
1745
1746            let bus_handle = stream_bus.handle("task-output");
1747            bus_handle.send(
1748                format!("task.{}", tid),
1749                crate::bus::BusMessage::TaskUpdate {
1750                    task_id: tid.clone(),
1751                    state: crate::a2a::types::TaskState::Working,
1752                    message: Some(output.clone()),
1753                },
1754            );
1755
1756            tokio::spawn(async move {
1757                let mut req = c
1758                    .post(format!("{}/v1/agent/tasks/{}/output", s, tid))
1759                    .header("X-Worker-ID", &w);
1760                if let Some(tok) = &t {
1761                    req = req.bearer_auth(tok);
1762                }
1763                let _ = req
1764                    .json(&serde_json::json!({
1765                        "worker_id": w,
1766                        "output": output,
1767                    }))
1768                    .send()
1769                    .await;
1770            });
1771        });
1772
1773    // Execute swarm tasks via SwarmExecutor; all other agents use the standard session loop.
1774    let (status, result, error, session_id) = if is_swarm_agent(agent_type) {
1775        match execute_swarm_with_policy(
1776            &mut session,
1777            prompt,
1778            model_tier.as_deref(),
1779            selected_model,
1780            &metadata,
1781            complexity_hint.as_deref(),
1782            worker_personality.as_deref(),
1783            target_agent_name.as_deref(),
1784            Some(&runtime.bus),
1785            Some(Arc::clone(&output_callback)),
1786        )
1787        .await
1788        {
1789            Ok((session_result, true)) => {
1790                tracing::info!("Swarm task completed successfully: {}", task_id);
1791                (
1792                    "completed",
1793                    Some(session_result.text),
1794                    None,
1795                    Some(session_result.session_id),
1796                )
1797            }
1798            Ok((session_result, false)) => {
1799                tracing::warn!("Swarm task completed with failures: {}", task_id);
1800                (
1801                    "failed",
1802                    Some(session_result.text),
1803                    Some("Swarm execution completed with failures".to_string()),
1804                    Some(session_result.session_id),
1805                )
1806            }
1807            Err(e) => {
1808                tracing::error!("Swarm task failed: {} - {}", task_id, e);
1809                ("failed", None, Some(format!("Error: {}", e)), None)
1810            }
1811        }
1812    } else {
1813        match execute_session_with_policy(
1814            &mut session,
1815            prompt,
1816            runtime.auto_approve,
1817            model_tier.as_deref(),
1818            Some(Arc::clone(&output_callback)),
1819        )
1820        .await
1821        {
1822            Ok(session_result) => {
1823                tracing::info!("Task completed successfully: {}", task_id);
1824                (
1825                    "completed",
1826                    Some(session_result.text),
1827                    None,
1828                    Some(session_result.session_id),
1829                )
1830            }
1831            Err(e) => {
1832                tracing::error!(task_id, error = %e, "Task execution failed");
1833                ("failed", None, Some(format!("Error: {}", e)), None)
1834            }
1835        }
1836    };
1837
1838    Ok((
1839        status,
1840        result,
1841        error,
1842        Some(session_id.unwrap_or_else(|| session.id.clone())),
1843    ))
1844}
1845
1846async fn handle_forage_task(
1847    title: &str,
1848    prompt: &str,
1849    metadata: &serde_json::Map<String, serde_json::Value>,
1850    selected_model: Option<String>,
1851) -> Result<String> {
1852    let forage_args = build_forage_args(prompt, title, metadata, selected_model);
1853    let summary = crate::forage::execute_with_summary(forage_args).await?;
1854    Ok(summary.render_text())
1855}
1856
1857async fn handle_clone_repo_task(
1858    client: &Client,
1859    server: &str,
1860    token: &Option<String>,
1861    worker_id: &str,
1862    task: &serde_json::Value,
1863    metadata: &serde_json::Map<String, serde_json::Value>,
1864) -> Result<String> {
1865    let workspace_id = metadata_str(metadata, &["workspace_id"])
1866        .or_else(|| task_str(task, "workspace_id").map(|value| value.to_string()))
1867        .ok_or_else(|| anyhow::anyhow!("Clone task is missing workspace_id metadata"))?;
1868    let workspace = fetch_workspace_record(client, server, token, &workspace_id).await?;
1869    let git_url = metadata_str(metadata, &["git_url"])
1870        .or_else(|| workspace.git_url.clone())
1871        .ok_or_else(|| anyhow::anyhow!("Workspace {} is missing git_url", workspace_id))?;
1872    let branch = metadata_str(metadata, &["git_branch"])
1873        .or_else(|| workspace.git_branch.clone())
1874        .unwrap_or_else(|| "main".to_string());
1875    let repo_path = resolve_workspace_clone_path(&workspace_id, &workspace.path);
1876
1877    if let Some(parent) = repo_path.parent() {
1878        tokio::fs::create_dir_all(parent)
1879            .await
1880            .with_context(|| format!("Failed to create repo parent {}", parent.display()))?;
1881    }
1882
1883    let temp_helper_path =
1884        git_clone_base_dir().join(format!(".{}-git-credential-helper", workspace_id));
1885    write_git_credential_helper_script(&temp_helper_path, &workspace_id)?;
1886
1887    let clone_result = async {
1888        let git_dir = repo_path.join(".git");
1889        if git_dir.exists() {
1890            configure_repo_git_auth(&repo_path, &workspace_id)?;
1891            configure_repo_git_github_app_from_agent_config(
1892                &repo_path,
1893                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
1894            );
1895            run_git_command_at(
1896                Some(&repo_path),
1897                vec!["fetch".to_string(), "origin".to_string(), branch.clone()],
1898            )
1899            .await?;
1900            run_git_command_at(
1901                Some(&repo_path),
1902                vec!["checkout".to_string(), branch.clone()],
1903            )
1904            .await?;
1905            run_git_command_at(
1906                Some(&repo_path),
1907                vec![
1908                    "pull".to_string(),
1909                    "--ff-only".to_string(),
1910                    "origin".to_string(),
1911                    branch.clone(),
1912                ],
1913            )
1914            .await?;
1915        } else {
1916            prepare_clone_target(&repo_path).await?;
1917
1918            run_git_command_at(
1919                None,
1920                vec![
1921                    "-c".to_string(),
1922                    format!("credential.helper={}", temp_helper_path.display()),
1923                    "-c".to_string(),
1924                    "credential.useHttpPath=true".to_string(),
1925                    "clone".to_string(),
1926                    "--single-branch".to_string(),
1927                    "--branch".to_string(),
1928                    branch.clone(),
1929                    git_url.clone(),
1930                    repo_path.display().to_string(),
1931                ],
1932            )
1933            .await?;
1934            configure_repo_git_auth(&repo_path, &workspace_id)?;
1935            configure_repo_git_github_app_from_agent_config(
1936                &repo_path,
1937                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
1938            );
1939        }
1940        install_commit_msg_hook(&repo_path)?;
1941
1942        register_cloned_workspace(client, server, token, worker_id, &workspace, &repo_path).await?;
1943        enqueue_post_clone_task(client, server, token, worker_id, &workspace_id, metadata).await?;
1944
1945        Ok::<String, anyhow::Error>(format!(
1946            "Repository ready at {} (branch: {})",
1947            repo_path.display(),
1948            branch
1949        ))
1950    }
1951    .await;
1952
1953    let _ = tokio::fs::remove_file(&temp_helper_path).await;
1954    clone_result
1955}
1956
1957async fn register_cloned_workspace(
1958    client: &Client,
1959    server: &str,
1960    token: &Option<String>,
1961    worker_id: &str,
1962    workspace: &RegisteredWorkspaceRecord,
1963    repo_path: &Path,
1964) -> Result<()> {
1965    let mut req = client.post(format!(
1966        "{}/v1/agent/workspaces",
1967        server.trim_end_matches('/')
1968    ));
1969    if let Some(token) = token {
1970        req = req.bearer_auth(token);
1971    }
1972
1973    let response = req
1974        .json(&serde_json::json!({
1975            "workspace_id": workspace.id,
1976            "name": workspace.name,
1977            "path": repo_path.display().to_string(),
1978            "description": workspace.description,
1979            "agent_config": workspace.agent_config,
1980            "git_url": workspace.git_url,
1981            "git_branch": workspace.git_branch,
1982            "worker_id": worker_id,
1983        }))
1984        .send()
1985        .await
1986        .context("Failed to register cloned workspace with server")?;
1987    let status = response.status();
1988    if !status.is_success() {
1989        let body = response.text().await.unwrap_or_default();
1990        anyhow::bail!(
1991            "Failed to register cloned workspace {} ({}): {}",
1992            workspace.id,
1993            status,
1994            body
1995        );
1996    }
1997
1998    Ok(())
1999}
2000
2001async fn enqueue_post_clone_task(
2002    client: &Client,
2003    server: &str,
2004    token: &Option<String>,
2005    worker_id: &str,
2006    workspace_id: &str,
2007    metadata: &serde_json::Map<String, serde_json::Value>,
2008) -> Result<()> {
2009    let Some(post_clone_task) = metadata.get("post_clone_task") else {
2010        return Ok(());
2011    };
2012    let Some(task) = post_clone_task.as_object() else {
2013        return Ok(());
2014    };
2015    let title = task
2016        .get("title")
2017        .and_then(|value| value.as_str())
2018        .filter(|value| !value.trim().is_empty())
2019        .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing title"))?;
2020    let prompt = task
2021        .get("prompt")
2022        .and_then(|value| value.as_str())
2023        .filter(|value| !value.trim().is_empty())
2024        .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing prompt"))?;
2025    let agent_type = task
2026        .get("agent_type")
2027        .and_then(|value| value.as_str())
2028        .unwrap_or("build");
2029    let mut task_metadata = task
2030        .get("metadata")
2031        .cloned()
2032        .unwrap_or_else(|| serde_json::json!({}));
2033    if let Some(task_metadata_obj) = task_metadata.as_object_mut() {
2034        task_metadata_obj
2035            .entry("target_worker_id".to_string())
2036            .or_insert_with(|| serde_json::Value::String(worker_id.to_string()));
2037    }
2038
2039    let mut req = client.post(format!(
2040        "{}/v1/agent/workspaces/{}/tasks",
2041        server.trim_end_matches('/'),
2042        workspace_id
2043    ));
2044    if let Some(token) = token {
2045        req = req.bearer_auth(token);
2046    }
2047
2048    let response = req
2049        .json(&serde_json::json!({
2050            "title": title,
2051            "prompt": prompt,
2052            "agent_type": agent_type,
2053            "metadata": task_metadata,
2054        }))
2055        .send()
2056        .await
2057        .context("Failed to enqueue post-clone task")?;
2058    let status = response.status();
2059    if !status.is_success() {
2060        let body = response.text().await.unwrap_or_default();
2061        anyhow::bail!("Failed to enqueue post-clone task ({}): {}", status, body);
2062    }
2063    Ok(())
2064}
2065
2066async fn run_git_command_at(current_dir: Option<&Path>, args: Vec<String>) -> Result<String> {
2067    let mut command = Command::new("git");
2068    if let Some(dir) = current_dir {
2069        command.current_dir(dir);
2070    }
2071
2072    let output = command
2073        .args(args.iter().map(String::as_str))
2074        .output()
2075        .await
2076        .context("Failed to execute git command")?;
2077
2078    if output.status.success() {
2079        return Ok(String::from_utf8_lossy(&output.stdout).trim().to_string());
2080    }
2081
2082    Err(anyhow::anyhow!(
2083        "Git command failed: {}",
2084        String::from_utf8_lossy(&output.stderr).trim()
2085    ))
2086}
2087
2088async fn release_task_result(
2089    client: &Client,
2090    server: &str,
2091    token: &Option<String>,
2092    worker_id: &str,
2093    task_id: &str,
2094    status: &str,
2095    result: Option<String>,
2096    error: Option<String>,
2097    session_id: Option<String>,
2098) -> Result<()> {
2099    let mut req = client
2100        .post(format!("{}/v1/worker/tasks/release", server))
2101        .header("X-Worker-ID", worker_id);
2102    if let Some(token) = token {
2103        req = req.bearer_auth(token);
2104    }
2105
2106    req.json(&serde_json::json!({
2107        "task_id": task_id,
2108        "status": status,
2109        "result": result,
2110        "error": error,
2111        "session_id": session_id,
2112    }))
2113    .send()
2114    .await
2115    .context("Failed to release task result")?;
2116
2117    Ok(())
2118}
2119
2120async fn execute_swarm_with_policy(
2121    session: &mut Session,
2122    prompt: &str,
2123    model_tier: Option<&str>,
2124    explicit_model: Option<String>,
2125    metadata: &serde_json::Map<String, serde_json::Value>,
2126    complexity_hint: Option<&str>,
2127    worker_personality: Option<&str>,
2128    target_agent_name: Option<&str>,
2129    bus: Option<&Arc<AgentBus>>,
2130    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
2131) -> Result<(crate::session::SessionResult, bool)> {
2132    use crate::provider::{ContentPart, Message, Role};
2133
2134    session.add_message(Message {
2135        role: Role::User,
2136        content: vec![ContentPart::Text {
2137            text: prompt.to_string(),
2138        }],
2139    });
2140
2141    if session.title.is_none() {
2142        session.generate_title().await?;
2143    }
2144
2145    let strategy = parse_swarm_strategy(metadata);
2146    let max_subagents = metadata_usize(
2147        metadata,
2148        &["swarm_max_subagents", "max_subagents", "subagents"],
2149    )
2150    .unwrap_or(10)
2151    .clamp(1, 100);
2152    let max_steps_per_subagent = metadata_usize(
2153        metadata,
2154        &[
2155            "swarm_max_steps_per_subagent",
2156            "max_steps_per_subagent",
2157            "max_steps",
2158        ],
2159    )
2160    .unwrap_or(50)
2161    .clamp(1, 200);
2162    let timeout_secs = metadata_u64(metadata, &["swarm_timeout_secs", "timeout_secs", "timeout"])
2163        .unwrap_or(600)
2164        .clamp(30, 3600);
2165    let parallel_enabled =
2166        metadata_bool(metadata, &["swarm_parallel_enabled", "parallel_enabled"]).unwrap_or(true);
2167
2168    let model = resolve_swarm_model(explicit_model, model_tier).await;
2169    if let Some(ref selected_model) = model {
2170        session.metadata.model = Some(selected_model.clone());
2171    }
2172
2173    if let Some(ref cb) = output_callback {
2174        cb(format!(
2175            "[swarm] routing complexity={} tier={} personality={} target_agent={}",
2176            complexity_hint.unwrap_or("standard"),
2177            model_tier.unwrap_or("balanced"),
2178            worker_personality.unwrap_or("auto"),
2179            target_agent_name.unwrap_or("auto")
2180        ));
2181        cb(format!(
2182            "[swarm] config strategy={:?} max_subagents={} max_steps={} timeout={}s tier={}",
2183            strategy,
2184            max_subagents,
2185            max_steps_per_subagent,
2186            timeout_secs,
2187            model_tier.unwrap_or("balanced")
2188        ));
2189    }
2190
2191    let swarm_config = SwarmConfig {
2192        max_subagents,
2193        max_steps_per_subagent,
2194        subagent_timeout_secs: timeout_secs,
2195        parallel_enabled,
2196        model,
2197        working_dir: session
2198            .metadata
2199            .directory
2200            .as_ref()
2201            .map(|p| p.to_string_lossy().to_string()),
2202        ..Default::default()
2203    };
2204
2205    let swarm_result = if output_callback.is_some() {
2206        let (event_tx, mut event_rx) = mpsc::channel(256);
2207        let mut executor = SwarmExecutor::new(swarm_config).with_event_tx(event_tx);
2208        if let Some(bus) = bus {
2209            executor = executor.with_bus(Arc::clone(bus));
2210        }
2211        let prompt_owned = prompt.to_string();
2212        let mut exec_handle =
2213            tokio::spawn(async move { executor.execute(&prompt_owned, strategy).await });
2214
2215        let mut final_result: Option<crate::swarm::SwarmResult> = None;
2216
2217        while final_result.is_none() {
2218            tokio::select! {
2219                maybe_event = event_rx.recv() => {
2220                    if let Some(event) = maybe_event
2221                        && let Some(ref cb) = output_callback
2222                            && let Some(line) = format_swarm_event_for_output(&event) {
2223                                cb(line);
2224                            }
2225                }
2226                join_result = &mut exec_handle => {
2227                    let joined = join_result.map_err(|e| anyhow::anyhow!("Swarm join failure: {}", e))?;
2228                    final_result = Some(joined?);
2229                }
2230            }
2231        }
2232
2233        while let Ok(event) = event_rx.try_recv() {
2234            if let Some(ref cb) = output_callback
2235                && let Some(line) = format_swarm_event_for_output(&event)
2236            {
2237                cb(line);
2238            }
2239        }
2240
2241        final_result.ok_or_else(|| anyhow::anyhow!("Swarm execution returned no result"))?
2242    } else {
2243        let mut executor = SwarmExecutor::new(swarm_config);
2244        if let Some(bus) = bus {
2245            executor = executor.with_bus(Arc::clone(bus));
2246        }
2247        executor.execute(prompt, strategy).await?
2248    };
2249
2250    let final_text = if swarm_result.result.trim().is_empty() {
2251        if swarm_result.success {
2252            "Swarm completed without textual output.".to_string()
2253        } else {
2254            "Swarm finished with failures and no textual output.".to_string()
2255        }
2256    } else {
2257        swarm_result.result.clone()
2258    };
2259
2260    session.add_message(Message {
2261        role: Role::Assistant,
2262        content: vec![ContentPart::Text {
2263            text: final_text.clone(),
2264        }],
2265    });
2266    session.save().await?;
2267
2268    Ok((
2269        crate::session::SessionResult {
2270            text: final_text,
2271            session_id: session.id.clone(),
2272        },
2273        swarm_result.success,
2274    ))
2275}
2276
2277/// Execute a session with the given auto-approve policy
2278/// Optionally streams output chunks via the callback
2279async fn execute_session_with_policy(
2280    session: &mut Session,
2281    prompt: &str,
2282    auto_approve: AutoApprove,
2283    model_tier: Option<&str>,
2284    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
2285) -> Result<crate::session::SessionResult> {
2286    use crate::provider::{
2287        CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
2288    };
2289    use std::sync::Arc;
2290
2291    // Load provider registry from Vault
2292    let registry = ProviderRegistry::from_vault()
2293        .await
2294        .context("Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN")?;
2295    let providers = registry.list();
2296    tracing::info!("Available providers: {:?}", providers);
2297
2298    if providers.is_empty() {
2299        anyhow::bail!(
2300            "No LLM providers available (0 providers loaded). \
2301             Configure API keys in HashiCorp Vault or set environment variables. \
2302             Vault address: {}",
2303            std::env::var("VAULT_ADDR").unwrap_or_else(|_| "(not set)".into())
2304        );
2305    }
2306
2307    // Parse model string
2308    let (provider_name, model_id) = if let Some(ref model_str) = session.metadata.model {
2309        let (prov, model) = parse_model_string(model_str);
2310        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
2311        if prov.is_some() {
2312            (prov.map(|s| s.to_string()), model.to_string())
2313        } else if providers.contains(&model) {
2314            (Some(model.to_string()), String::new())
2315        } else {
2316            (None, model.to_string())
2317        }
2318    } else {
2319        (None, String::new())
2320    };
2321
2322    let provider_slice = providers.as_slice();
2323    if let Some(explicit_provider) = provider_name.as_deref()
2324        && !providers.contains(&explicit_provider)
2325    {
2326        anyhow::bail!(
2327            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
2328            explicit_provider,
2329            providers.join(", ")
2330        );
2331    }
2332
2333    // Determine which provider to use, preferring explicit request first, then model tier.
2334    let selected_provider = provider_name
2335        .as_deref()
2336        .unwrap_or_else(|| choose_provider_for_tier(provider_slice, model_tier));
2337
2338    let provider = registry
2339        .get(selected_provider)
2340        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
2341
2342    // Add user message
2343    session.add_message(Message {
2344        role: Role::User,
2345        content: vec![ContentPart::Text {
2346            text: prompt.to_string(),
2347        }],
2348    });
2349
2350    // Generate title
2351    if session.title.is_none() {
2352        session.generate_title().await?;
2353    }
2354
2355    // Determine model.
2356    let model = if !model_id.is_empty() {
2357        model_id
2358    } else {
2359        default_model_for_provider(selected_provider, model_tier)
2360    };
2361
2362    // Create tool registry with filtering based on auto-approve policy
2363    let workspace_dir = session
2364        .metadata
2365        .directory
2366        .clone()
2367        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
2368    let tool_registry = create_filtered_registry(
2369        Arc::clone(&provider),
2370        model.clone(),
2371        auto_approve,
2372        &workspace_dir,
2373        output_callback.clone(),
2374    );
2375    let tool_definitions = tool_registry.definitions();
2376
2377    let temperature = if prefers_temperature_one(&model) {
2378        Some(1.0)
2379    } else {
2380        Some(0.7)
2381    };
2382
2383    tracing::info!(
2384        "Using model: {} via provider: {} (tier: {:?})",
2385        model,
2386        selected_provider,
2387        model_tier
2388    );
2389    tracing::info!(
2390        "Available tools: {} (auto_approve: {:?})",
2391        tool_definitions.len(),
2392        auto_approve
2393    );
2394
2395    // Build system prompt
2396    let system_prompt = crate::agent::builtin::build_system_prompt(&workspace_dir);
2397
2398    let mut final_output = String::new();
2399    let max_steps = 50;
2400
2401    for step in 1..=max_steps {
2402        tracing::info!(step = step, "Agent step starting");
2403
2404        // Build messages with system prompt first
2405        let mut messages = vec![Message {
2406            role: Role::System,
2407            content: vec![ContentPart::Text {
2408                text: system_prompt.clone(),
2409            }],
2410        }];
2411        messages.extend(session.messages.clone());
2412
2413        let request = CompletionRequest {
2414            messages,
2415            tools: tool_definitions.clone(),
2416            model: model.clone(),
2417            temperature,
2418            top_p: None,
2419            max_tokens: Some(8192),
2420            stop: Vec::new(),
2421        };
2422
2423        let response = provider.complete(request).await?;
2424
2425        crate::telemetry::TOKEN_USAGE.record_model_usage(
2426            &model,
2427            response.usage.prompt_tokens as u64,
2428            response.usage.completion_tokens as u64,
2429        );
2430
2431        // Extract tool calls
2432        let tool_calls: Vec<(String, String, serde_json::Value)> = response
2433            .message
2434            .content
2435            .iter()
2436            .filter_map(|part| {
2437                if let ContentPart::ToolCall {
2438                    id,
2439                    name,
2440                    arguments,
2441                    ..
2442                } = part
2443                {
2444                    let args: serde_json::Value =
2445                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
2446                    Some((id.clone(), name.clone(), args))
2447                } else {
2448                    None
2449                }
2450            })
2451            .collect();
2452
2453        // Collect text output and stream it
2454        for part in &response.message.content {
2455            if let ContentPart::Text { text } = part
2456                && !text.is_empty()
2457            {
2458                final_output.push_str(text);
2459                final_output.push('\n');
2460                if let Some(ref cb) = output_callback {
2461                    cb(text.clone());
2462                }
2463            }
2464        }
2465
2466        // If no tool calls, we're done
2467        if tool_calls.is_empty() {
2468            session.add_message(response.message.clone());
2469            break;
2470        }
2471
2472        session.add_message(response.message.clone());
2473
2474        tracing::info!(
2475            step = step,
2476            num_tools = tool_calls.len(),
2477            "Executing tool calls"
2478        );
2479
2480        // Execute each tool call
2481        for (tool_id, tool_name, tool_input) in tool_calls {
2482            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
2483
2484            // Stream tool start event
2485            if let Some(ref cb) = output_callback {
2486                cb(format!("[tool:start:{}]", tool_name));
2487            }
2488
2489            // Check if tool is allowed based on auto-approve policy
2490            if !is_tool_allowed(&tool_name, auto_approve) {
2491                let msg = format!(
2492                    "Tool '{}' requires approval but auto-approve policy is {:?}",
2493                    tool_name, auto_approve
2494                );
2495                tracing::warn!(tool = %tool_name, "Tool blocked by auto-approve policy");
2496                session.add_message(Message {
2497                    role: Role::Tool,
2498                    content: vec![ContentPart::ToolResult {
2499                        tool_call_id: tool_id,
2500                        content: msg,
2501                    }],
2502                });
2503                continue;
2504            }
2505
2506            let content = if let Some(tool) = tool_registry.get(&tool_name) {
2507                let exec_result: Result<crate::tool::ToolResult> =
2508                    tool.execute(tool_input.clone()).await;
2509                match exec_result {
2510                    Ok(result) => {
2511                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
2512                        if let Some(ref cb) = output_callback {
2513                            let status = if result.success { "ok" } else { "err" };
2514                            cb(format!(
2515                                "[tool:{}:{}] {}",
2516                                tool_name,
2517                                status,
2518                                crate::util::truncate_bytes_safe(&result.output, 500)
2519                            ));
2520                        }
2521                        result.output
2522                    }
2523                    Err(e) => {
2524                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
2525                        if let Some(ref cb) = output_callback {
2526                            cb(format!("[tool:{}:err] {}", tool_name, e));
2527                        }
2528                        format!("Error: {}", e)
2529                    }
2530                }
2531            } else {
2532                tracing::warn!(tool = %tool_name, "Tool not found");
2533                format!("Error: Unknown tool '{}'", tool_name)
2534            };
2535
2536            session.add_message(Message {
2537                role: Role::Tool,
2538                content: vec![ContentPart::ToolResult {
2539                    tool_call_id: tool_id,
2540                    content,
2541                }],
2542            });
2543        }
2544    }
2545
2546    session.save().await?;
2547
2548    Ok(crate::session::SessionResult {
2549        text: final_output.trim().to_string(),
2550        session_id: session.id.clone(),
2551    })
2552}
2553
2554/// Start the heartbeat background task
2555/// Returns a JoinHandle that can be used to cancel the heartbeat
2556pub fn start_heartbeat(
2557    client: Client,
2558    server: String,
2559    token: Option<String>,
2560    heartbeat_state: HeartbeatState,
2561    processing: Arc<Mutex<HashSet<String>>>,
2562    cognition_config: CognitionHeartbeatConfig,
2563) -> JoinHandle<()> {
2564    tokio::spawn(async move {
2565        let mut consecutive_failures = 0u32;
2566        const MAX_FAILURES: u32 = 3;
2567        const HEARTBEAT_INTERVAL_SECS: u64 = 30;
2568        const COGNITION_RETRY_COOLDOWN_SECS: u64 = 300;
2569        let mut cognition_payload_disabled_until: Option<Instant> = None;
2570
2571        let mut interval =
2572            tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
2573        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2574
2575        loop {
2576            interval.tick().await;
2577
2578            // Update task count from processing set
2579            let active_count = processing.lock().await.len();
2580            heartbeat_state.set_task_count(active_count).await;
2581
2582            // Determine status based on active tasks
2583            let status = if active_count > 0 {
2584                WorkerStatus::Processing
2585            } else {
2586                WorkerStatus::Idle
2587            };
2588            heartbeat_state.set_status(status).await;
2589
2590            // Send heartbeat
2591            let url = format!(
2592                "{}/v1/agent/workers/{}/heartbeat",
2593                server, heartbeat_state.worker_id
2594            );
2595            let mut req = client.post(&url);
2596
2597            if let Some(ref t) = token {
2598                req = req.bearer_auth(t);
2599            }
2600
2601            let status_str = heartbeat_state.status.lock().await.as_str().to_string();
2602            let sub_agents = heartbeat_state.sub_agents_snapshot().await;
2603            let base_payload = serde_json::json!({
2604                "worker_id": &heartbeat_state.worker_id,
2605                "agent_name": &heartbeat_state.agent_name,
2606                "status": status_str,
2607                "active_task_count": active_count,
2608                "sub_agents": sub_agents,
2609            });
2610            let mut payload = base_payload.clone();
2611            let mut included_cognition_payload = false;
2612            let cognition_payload_allowed = cognition_payload_disabled_until
2613                .map(|until| Instant::now() >= until)
2614                .unwrap_or(true);
2615
2616            if cognition_config.enabled
2617                && cognition_payload_allowed
2618                && let Some(cognition_payload) =
2619                    fetch_cognition_heartbeat_payload(&client, &cognition_config).await
2620                && let Some(obj) = payload.as_object_mut()
2621            {
2622                obj.insert("cognition".to_string(), cognition_payload);
2623                included_cognition_payload = true;
2624            }
2625
2626            match req.json(&payload).send().await {
2627                Ok(res) => {
2628                    if res.status().is_success() {
2629                        consecutive_failures = 0;
2630                        tracing::debug!(
2631                            worker_id = %heartbeat_state.worker_id,
2632                            status = status_str,
2633                            active_tasks = active_count,
2634                            "Heartbeat sent successfully"
2635                        );
2636                    } else if included_cognition_payload && res.status().is_client_error() {
2637                        tracing::warn!(
2638                            worker_id = %heartbeat_state.worker_id,
2639                            status = %res.status(),
2640                            "Heartbeat cognition payload rejected, retrying without cognition payload"
2641                        );
2642
2643                        let mut retry_req = client.post(&url);
2644                        if let Some(ref t) = token {
2645                            retry_req = retry_req.bearer_auth(t);
2646                        }
2647
2648                        match retry_req.json(&base_payload).send().await {
2649                            Ok(retry_res) if retry_res.status().is_success() => {
2650                                cognition_payload_disabled_until = Some(
2651                                    Instant::now()
2652                                        + Duration::from_secs(COGNITION_RETRY_COOLDOWN_SECS),
2653                                );
2654                                consecutive_failures = 0;
2655                                tracing::warn!(
2656                                    worker_id = %heartbeat_state.worker_id,
2657                                    retry_after_secs = COGNITION_RETRY_COOLDOWN_SECS,
2658                                    "Paused cognition heartbeat payload after schema rejection"
2659                                );
2660                            }
2661                            Ok(retry_res) => {
2662                                consecutive_failures += 1;
2663                                tracing::warn!(
2664                                    worker_id = %heartbeat_state.worker_id,
2665                                    status = %retry_res.status(),
2666                                    failures = consecutive_failures,
2667                                    "Heartbeat failed even after retry without cognition payload"
2668                                );
2669                            }
2670                            Err(e) => {
2671                                consecutive_failures += 1;
2672                                tracing::warn!(
2673                                    worker_id = %heartbeat_state.worker_id,
2674                                    error = %e,
2675                                    failures = consecutive_failures,
2676                                    "Heartbeat retry without cognition payload failed"
2677                                );
2678                            }
2679                        }
2680                    } else {
2681                        consecutive_failures += 1;
2682                        tracing::warn!(
2683                            worker_id = %heartbeat_state.worker_id,
2684                            status = %res.status(),
2685                            failures = consecutive_failures,
2686                            "Heartbeat failed"
2687                        );
2688                    }
2689                }
2690                Err(e) => {
2691                    consecutive_failures += 1;
2692                    tracing::warn!(
2693                        worker_id = %heartbeat_state.worker_id,
2694                        error = %e,
2695                        failures = consecutive_failures,
2696                        "Heartbeat request failed"
2697                    );
2698                }
2699            }
2700
2701            // Log error after 3 consecutive failures but do not terminate
2702            if consecutive_failures >= MAX_FAILURES {
2703                tracing::error!(
2704                    worker_id = %heartbeat_state.worker_id,
2705                    failures = consecutive_failures,
2706                    "Heartbeat failed {} consecutive times - worker will continue running and attempt reconnection via SSE loop",
2707                    MAX_FAILURES
2708                );
2709                // Reset counter to avoid spamming error logs
2710                consecutive_failures = 0;
2711            }
2712        }
2713    })
2714}
2715
2716async fn fetch_cognition_heartbeat_payload(
2717    client: &Client,
2718    config: &CognitionHeartbeatConfig,
2719) -> Option<serde_json::Value> {
2720    let status_url = format!("{}/v1/cognition/status", config.source_base_url);
2721    let status_res = tokio::time::timeout(
2722        Duration::from_millis(config.request_timeout_ms),
2723        client.get(status_url).send(),
2724    )
2725    .await
2726    .ok()?
2727    .ok()?;
2728
2729    if !status_res.status().is_success() {
2730        return None;
2731    }
2732
2733    let status: CognitionStatusSnapshot = status_res.json().await.ok()?;
2734    let mut payload = serde_json::json!({
2735        "running": status.running,
2736        "last_tick_at": status.last_tick_at,
2737        "active_persona_count": status.active_persona_count,
2738        "events_buffered": status.events_buffered,
2739        "snapshots_buffered": status.snapshots_buffered,
2740        "loop_interval_ms": status.loop_interval_ms,
2741    });
2742
2743    if config.include_thought_summary {
2744        let snapshot_url = format!("{}/v1/cognition/snapshots/latest", config.source_base_url);
2745        let snapshot_res = tokio::time::timeout(
2746            Duration::from_millis(config.request_timeout_ms),
2747            client.get(snapshot_url).send(),
2748        )
2749        .await
2750        .ok()
2751        .and_then(Result::ok);
2752
2753        if let Some(snapshot_res) = snapshot_res
2754            && snapshot_res.status().is_success()
2755            && let Ok(snapshot) = snapshot_res.json::<CognitionLatestSnapshot>().await
2756            && let Some(obj) = payload.as_object_mut()
2757        {
2758            obj.insert(
2759                "latest_snapshot_at".to_string(),
2760                serde_json::Value::String(snapshot.generated_at),
2761            );
2762            obj.insert(
2763                "latest_thought".to_string(),
2764                serde_json::Value::String(trim_for_heartbeat(
2765                    &snapshot.summary,
2766                    config.summary_max_chars,
2767                )),
2768            );
2769            if let Some(model) = snapshot
2770                .metadata
2771                .get("model")
2772                .and_then(serde_json::Value::as_str)
2773            {
2774                obj.insert(
2775                    "latest_thought_model".to_string(),
2776                    serde_json::Value::String(model.to_string()),
2777                );
2778            }
2779            if let Some(source) = snapshot
2780                .metadata
2781                .get("source")
2782                .and_then(serde_json::Value::as_str)
2783            {
2784                obj.insert(
2785                    "latest_thought_source".to_string(),
2786                    serde_json::Value::String(source.to_string()),
2787                );
2788            }
2789        }
2790    }
2791
2792    Some(payload)
2793}
2794
/// Shorten `input` for inclusion in a heartbeat payload.
///
/// Surrounding whitespace is removed first. If the remaining text has at most
/// `max_chars` characters (Unicode scalar values, not bytes) it is returned as
/// is. Otherwise the first `max_chars` characters are kept, any whitespace left
/// dangling at the cut is dropped, and `"..."` is appended, so the result may be
/// up to three characters longer than `max_chars`.
fn trim_for_heartbeat(input: &str, max_chars: usize) -> String {
    // Trim before measuring so padding never forces a truncation of text that fits.
    let text = input.trim();

    // `nth(max_chars)` only yields an item when the text has more than `max_chars`
    // characters; its byte offset is a valid char boundary to slice at.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((cut, _)) => {
            let kept = text[..cut].trim_end();
            let mut shortened = String::with_capacity(kept.len() + 3);
            shortened.push_str(kept);
            shortened.push_str("...");
            shortened
        }
    }
}
2807
/// Read a boolean flag from the environment variable `name`.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`, ignoring ASCII case
/// and surrounding whitespace (so a value with a trailing newline still counts).
/// Returns `default` when the variable is unset, not valid Unicode, or holds any
/// other value.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    let value = raw.trim();
    let is_one_of =
        |tokens: &[&str]| tokens.iter().any(|token| value.eq_ignore_ascii_case(token));

    if is_one_of(&TRUTHY) {
        true
    } else if is_one_of(&FALSY) {
        false
    } else {
        default
    }
}
2818
/// Read a `usize` from the environment variable `name`.
///
/// Surrounding whitespace is ignored before parsing. Returns `default` when the
/// variable is unset, not valid Unicode, or does not parse as a `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) => raw.trim().parse().unwrap_or(default),
        Err(_) => default,
    }
}
2825
/// Read a `u64` from the environment variable `name`.
///
/// Surrounding whitespace is ignored before parsing. Returns `default` when the
/// variable is unset, not valid Unicode, or does not parse as a `u64`.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.trim().parse().unwrap_or(default),
        Err(_) => default,
    }
}
2832
2833fn maybe_configure_repo_git_auth_from_entry(entry: &serde_json::Value) {
2834    let Some(workspace_id) = entry.get("id").and_then(|v| v.as_str()) else {
2835        return;
2836    };
2837    let Some(path) = entry.get("path").and_then(|v| v.as_str()) else {
2838        return;
2839    };
2840    let has_git_remote = entry
2841        .get("git_url")
2842        .and_then(|v| v.as_str())
2843        .is_some_and(|value| !value.trim().is_empty());
2844    if !has_git_remote {
2845        return;
2846    }
2847
2848    let repo_path = Path::new(path);
2849    if !repo_path.join(".git").exists() {
2850        return;
2851    }
2852
2853    if let Err(error) = configure_repo_git_auth(repo_path, workspace_id) {
2854        tracing::debug!(
2855            workspace_id,
2856            path,
2857            error = %error,
2858            "Workspace sync could not install Git credential helper"
2859        );
2860    }
2861    configure_repo_git_github_app_from_agent_config(repo_path, entry.get("agent_config"));
2862}
2863
2864fn configure_repo_git_github_app_from_agent_config(
2865    repo_path: &Path,
2866    agent_config: Option<&serde_json::Value>,
2867) {
2868    let installation_id = agent_config
2869        .and_then(|value| value.get("git_auth"))
2870        .and_then(|value| value.get("github_app"))
2871        .and_then(|value| value.get("installation_id"))
2872        .and_then(|value| value.as_str());
2873    let app_id = agent_config
2874        .and_then(|value| value.get("git_auth"))
2875        .and_then(|value| value.get("github_app"))
2876        .and_then(|value| value.get("app_id"))
2877        .and_then(|value| value.as_str());
2878    if let Err(error) = configure_repo_git_github_app(repo_path, installation_id, app_id) {
2879        tracing::debug!(path = %repo_path.display(), error = %error, "Failed to persist GitHub App repo metadata");
2880    }
2881}
2882
2883/// Start a background task that periodically fetches the server's workspace list
2884/// and auto-registers any whose path exists on this machine's filesystem.
2885/// This lets workers pick up new codebases without restarting.
2886fn start_workspace_sync(
2887    client: Client,
2888    server: String,
2889    token: Option<String>,
2890    shared_codebases: Arc<Mutex<Vec<String>>>,
2891) -> JoinHandle<()> {
2892    tokio::spawn(async move {
2893        const POLL_INTERVAL_SECS: u64 = 60;
2894        let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
2895        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2896        interval.tick().await; // skip the immediate first tick -- let the worker settle first
2897
2898        loop {
2899            interval.tick().await;
2900            if let Err(e) =
2901                sync_workspaces_from_server(&client, &server, &token, &shared_codebases).await
2902            {
2903                tracing::warn!("Workspace sync failed: {}", e);
2904            }
2905        }
2906    })
2907}
2908
2909/// Fetch the server's workspace/codebase list and add any locally-present paths
2910/// that are not already in the worker's registered codebases.
2911/// New paths take effect on the next SSE reconnect (re-register re-sends X-Workspaces).
2912async fn sync_workspaces_from_server(
2913    client: &Client,
2914    server: &str,
2915    token: &Option<String>,
2916    shared_codebases: &Arc<Mutex<Vec<String>>>,
2917) -> Result<()> {
2918    let mut req = client.get(format!("{}/v1/agent/workspaces", server));
2919    if let Some(t) = token {
2920        req = req.bearer_auth(t);
2921    }
2922
2923    let res = req.send().await?;
2924    if !res.status().is_success() {
2925        tracing::debug!(
2926            status = %res.status(),
2927            "Workspace sync: server returned non-success, skipping"
2928        );
2929        return Ok(());
2930    }
2931
2932    let data: serde_json::Value = res.json().await?;
2933
2934    // Server returns { workspaces: [...] } or { codebases: [...] }
2935    let entries = if let Some(arr) = data.as_array() {
2936        arr.clone()
2937    } else {
2938        data["workspaces"]
2939            .as_array()
2940            .or_else(|| data["codebases"].as_array())
2941            .cloned()
2942            .unwrap_or_default()
2943    };
2944
2945    let mut new_paths: Vec<String> = Vec::new();
2946    {
2947        let current = shared_codebases.lock().await;
2948        for entry in &entries {
2949            maybe_configure_repo_git_auth_from_entry(entry);
2950            let path = match entry["path"].as_str().filter(|p| !p.is_empty()) {
2951                Some(p) => p,
2952                None => continue,
2953            };
2954            // Only auto-register if the path physically exists on this machine
2955            // and is not already in the codebases list
2956            if std::path::Path::new(path).exists() && !current.iter().any(|c| c.as_str() == path) {
2957                new_paths.push(path.to_string());
2958            }
2959        }
2960    }
2961
2962    if !new_paths.is_empty() {
2963        let mut current = shared_codebases.lock().await;
2964        for path in &new_paths {
2965            tracing::info!(
2966                path = %path,
2967                "Workspace sync: auto-discovered local path, adding to codebases"
2968            );
2969            current.push(path.clone());
2970        }
2971        tracing::info!(
2972            added = new_paths.len(),
2973            total = current.len(),
2974            "Workspace sync complete -- new paths take effect on next reconnect"
2975        );
2976    } else {
2977        tracing::debug!("Workspace sync: no new local paths found");
2978    }
2979
2980    Ok(())
2981}
2982
/// Unit tests for the worker's metadata parsing, forage argument construction,
/// worker-id resolution, advertised interfaces, and task-slot accounting.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Unwrap a JSON literal that is known to be an object into its key/value map.
    fn object_of(value: serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
        match value {
            serde_json::Value::Object(map) => map,
            other => panic!("expected a JSON object, got {}", other),
        }
    }

    #[test]
    fn metadata_lookup_reads_nested_forage_keys() {
        let metadata = object_of(json!({
            "forage": {
                "execute": true,
                "top": 5,
            }
        }));

        assert_eq!(metadata_bool(&metadata, &["execute"]), Some(true));
        assert_eq!(metadata_usize(&metadata, &["top"]), Some(5));
    }

    #[test]
    fn build_forage_args_defaults_prompt_to_moonshot_and_local_mode() {
        let prompt = "Expand enterprise adoption in regulated markets";
        let model = "openrouter/z-ai/glm-5";
        let empty_metadata = serde_json::Map::new();

        let args = build_forage_args(
            prompt,
            "forage task",
            &empty_metadata,
            Some(model.to_string()),
        );

        assert_eq!(args.moonshots, vec![prompt.to_string()]);
        assert!(args.no_s3);
        assert_eq!(args.model.as_deref(), Some(model));
        assert_eq!(args.execution_engine, "run");
    }

    #[test]
    fn build_forage_args_honors_nested_forage_configuration() {
        let metadata = object_of(json!({
            "forage": {
                "top": 7,
                "loop": true,
                "max_cycles": 2,
                "execute": true,
                "no_s3": false,
                "moonshots": ["Ship autonomous OKR execution", "Reduce operator toil"],
                "moonshot_required": true,
                "moonshot_min_alignment": "0.25",
                "execution_engine": "swarm",
                "run_timeout_secs": 1200,
                "fail_fast": true,
                "swarm_strategy": "stage",
                "swarm_max_subagents": 4,
                "swarm_max_steps": 42,
                "swarm_subagent_timeout_secs": 180
            }
        }));

        let args = build_forage_args("fallback prompt", "title", &metadata, None);

        // Selection and looping.
        assert_eq!(args.top, 7);
        assert!(args.loop_mode);
        assert_eq!(args.max_cycles, 2);
        assert!(args.execute);
        assert!(!args.no_s3);

        // Moonshot alignment.
        assert_eq!(args.moonshots.len(), 2);
        assert!(args.moonshot_required);
        assert!((args.moonshot_min_alignment - 0.25).abs() < f64::EPSILON);

        // Execution engine and swarm tuning.
        assert_eq!(args.execution_engine, "swarm");
        assert_eq!(args.run_timeout_secs, 1200);
        assert!(args.fail_fast);
        assert_eq!(args.swarm_strategy, "stage");
        assert_eq!(args.swarm_max_subagents, 4);
        assert_eq!(args.swarm_max_steps, 42);
        assert_eq!(args.swarm_subagent_timeout_secs, 180);
    }

    #[test]
    fn resolve_worker_id_prefers_env() {
        const KEY: &str = "CODETETHER_WORKER_ID";
        let saved = std::env::var(KEY).ok();

        // SAFETY: mutating the process environment is only sound while no other
        // thread reads or writes it concurrently -- NOTE(review): confirm no other
        // test touches the environment when tests run in parallel.
        unsafe {
            std::env::set_var(KEY, "harvester-test-worker");
        }
        let resolved = resolve_worker_id();

        // Put the variable back the way it was before asserting, so a failed
        // assertion does not leak the override.
        // SAFETY: same single-writer assumption as above.
        if let Some(value) = saved {
            unsafe {
                std::env::set_var(KEY, value);
            }
        } else {
            unsafe {
                std::env::remove_var(KEY);
            }
        }

        assert_eq!(resolved, "harvester-test-worker");
    }

    #[test]
    fn advertised_interfaces_include_http_and_bus_urls() {
        let interfaces = advertised_interfaces(Some("http://worker.test:8080/"));
        let bus = &interfaces["bus"];

        assert_eq!(interfaces["http"]["base_url"], "http://worker.test:8080");
        assert_eq!(bus["stream_url"], "http://worker.test:8080/v1/bus/stream");
        assert_eq!(bus["publish_url"], "http://worker.test:8080/v1/bus/publish");
    }

    #[test]
    fn advertised_interfaces_omit_empty_public_url() {
        assert_eq!(advertised_interfaces(None), json!({}));
        assert_eq!(advertised_interfaces(Some("   ")), json!({}));
    }

    #[tokio::test]
    async fn reserve_task_slot_enforces_capacity() {
        let processing = Arc::new(Mutex::new(HashSet::new()));

        // Requests are issued in order against a capacity of two.
        let expectations = [
            ("task-1", TaskReservation::Reserved),
            ("task-1", TaskReservation::AlreadyProcessing),
            ("task-2", TaskReservation::Reserved),
            ("task-3", TaskReservation::AtCapacity),
        ];

        for (task_id, expected) in expectations {
            assert_eq!(reserve_task_slot(&processing, task_id, 2).await, expected);
        }
    }

    #[test]
    fn normalize_max_concurrent_tasks_never_returns_zero() {
        assert_eq!(normalize_max_concurrent_tasks(0), 1);
        assert_eq!(normalize_max_concurrent_tasks(3), 3);
    }
}