// codetether_agent/a2a/worker.rs
1//! A2A Worker - connects to an A2A server to process tasks
2
3mod clone_location;
4mod clone_target;
5
6use crate::a2a::claim::TaskClaimResponse;
7use crate::a2a::git_credentials::{
8    configure_repo_git_auth, configure_repo_git_github_app, write_git_credential_helper_script,
9};
10use crate::a2a::worker_workspace_record::{RegisteredWorkspaceRecord, fetch_workspace_record};
11use crate::bus::AgentBus;
12use crate::cli::{A2aArgs, ForageArgs};
13use crate::provenance::{ClaimProvenance, install_commit_msg_hook};
14use crate::provider::ProviderRegistry;
15use crate::session::Session;
16use crate::swarm::{DecompositionStrategy, SwarmConfig, SwarmExecutor};
17use crate::tui::swarm_view::SwarmEvent;
18use crate::util;
19use anyhow::{Context, Result};
20use futures::StreamExt;
21use reqwest::Client;
22use serde::Deserialize;
23use std::collections::HashMap;
24use std::collections::HashSet;
25use std::path::{Path, PathBuf};
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::process::Command;
29use tokio::sync::{Mutex, mpsc};
30use tokio::task::JoinHandle;
31use tokio::time::Instant;
32
33use crate::a2a::worker_tool_registry::{create_filtered_registry, is_tool_allowed};
34use crate::a2a::worker_workspace_context::resolve_task_workspace_dir;
35use clone_location::{git_clone_base_dir, resolve_workspace_clone_path};
36use clone_target::prepare_clone_target;
37
/// Worker status for heartbeat
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStatus {
    /// No task is currently being processed.
    Idle,
    /// At least one task is being processed.
    Processing,
}

impl WorkerStatus {
    /// Wire-format string reported in heartbeat payloads.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Idle => "idle",
            Self::Processing => "processing",
        }
    }
}
53
/// Heartbeat state shared between the heartbeat task and the main worker
#[derive(Clone)]
pub struct HeartbeatState {
    // Stable worker identifier; read by the heartbeat sender elsewhere in this module.
    worker_id: String,
    pub agent_name: String,
    // Current idle/processing status, shared across async tasks.
    pub status: Arc<Mutex<WorkerStatus>>,
    // Number of tasks currently being processed.
    pub active_task_count: Arc<Mutex<usize>>,
    // Names of in-process sub-agents currently registered.
    pub sub_agents: Arc<Mutex<HashSet<String>>>,
}
63
64impl HeartbeatState {
65    pub fn new(worker_id: String, agent_name: String) -> Self {
66        Self {
67            worker_id,
68            agent_name,
69            status: Arc::new(Mutex::new(WorkerStatus::Idle)),
70            active_task_count: Arc::new(Mutex::new(0)),
71            sub_agents: Arc::new(Mutex::new(HashSet::new())),
72        }
73    }
74
75    pub async fn set_status(&self, status: WorkerStatus) {
76        *self.status.lock().await = status;
77    }
78
79    pub async fn set_task_count(&self, count: usize) {
80        *self.active_task_count.lock().await = count;
81    }
82
83    pub async fn register_sub_agent(&self, agent_name: String) {
84        self.sub_agents.lock().await.insert(agent_name);
85    }
86
87    pub async fn deregister_sub_agent(&self, agent_name: &str) {
88        self.sub_agents.lock().await.remove(agent_name);
89    }
90
91    pub async fn sub_agents_snapshot(&self) -> Vec<String> {
92        let mut names = self
93            .sub_agents
94            .lock()
95            .await
96            .iter()
97            .cloned()
98            .collect::<Vec<_>>();
99        names.sort();
100        names
101    }
102}
103
/// Settings for sharing local cognition status upstream alongside heartbeats.
///
/// All values are read from `CODETETHER_WORKER_COGNITION_*` environment
/// variables by [`CognitionHeartbeatConfig::from_env`].
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct CognitionHeartbeatConfig {
    // Master switch; when false no cognition data is shared.
    pub enabled: bool,
    // Base URL of the local cognition source (trailing '/' stripped).
    pub source_base_url: String,
    // Optional token for the cognition source.
    pub token: Option<String>,
    // Provider label (defaults to "cognition" in from_env).
    pub provider_name: String,
    // Seconds between shares (clamped to >= 5 in from_env).
    pub interval_secs: u64,
    // Whether a thought summary is included.
    pub include_thought_summary: bool,
    // Maximum characters kept from a thought summary.
    pub summary_max_chars: usize,
    // HTTP request timeout in milliseconds (clamped to >= 250 in from_env).
    pub request_timeout_ms: u64,
}
116
117impl CognitionHeartbeatConfig {
118    pub fn from_env() -> Self {
119        let source_base_url = std::env::var("CODETETHER_WORKER_COGNITION_SOURCE_URL")
120            .unwrap_or_else(|_| "http://127.0.0.1:4096".to_string())
121            .trim_end_matches('/')
122            .to_string();
123
124        Self {
125            enabled: env_bool("CODETETHER_WORKER_COGNITION_SHARE_ENABLED", true),
126            source_base_url,
127            include_thought_summary: env_bool("CODETETHER_WORKER_COGNITION_INCLUDE_THOUGHTS", true),
128            summary_max_chars: env_usize("CODETETHER_WORKER_COGNITION_THOUGHT_MAX_CHARS", 480),
129            request_timeout_ms: env_u64("CODETETHER_WORKER_COGNITION_TIMEOUT_MS", 2_500).max(250),
130            interval_secs: env_u64("CODETETHER_WORKER_COGNITION_INTERVAL_SECS", 30).max(5),
131            provider_name: std::env::var("CODETETHER_WORKER_COGNITION_PROVIDER")
132                .unwrap_or_else(|_| "cognition".to_string()),
133            token: std::env::var("CODETETHER_WORKER_COGNITION_TOKEN").ok(),
134        }
135    }
136}
137
/// Deserialized status payload from the cognition source's status endpoint.
// NOTE(review): field semantics inferred from names — confirm against the
// cognition service's API before relying on them.
#[derive(Debug, Deserialize)]
struct CognitionStatusSnapshot {
    running: bool,
    #[serde(default)]
    last_tick_at: Option<String>,
    #[serde(default)]
    active_persona_count: usize,
    #[serde(default)]
    events_buffered: usize,
    #[serde(default)]
    snapshots_buffered: usize,
    #[serde(default)]
    loop_interval_ms: u64,
}
152
/// Deserialized "latest snapshot" payload from the cognition source:
/// a timestamp, a human-readable summary, and free-form metadata.
#[derive(Debug, Deserialize)]
struct CognitionLatestSnapshot {
    generated_at: String,
    summary: String,
    #[serde(default)]
    metadata: HashMap<String, serde_json::Value>,
}
160
/// Outcome of attempting to reserve a task-processing slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskReservation {
    // Slot reserved; caller now owns processing of this task id.
    Reserved,
    // The task id is already in the processing set.
    AlreadyProcessing,
    // The worker is at max_concurrent_tasks.
    AtCapacity,
}
167
/// Shared handles a worker needs to claim and process tasks.
#[derive(Clone)]
struct WorkerTaskRuntime {
    // HTTP client reused for all control-plane calls.
    client: Client,
    // Base server URL (no trailing '/').
    server: String,
    // Optional bearer token for the control plane.
    token: Option<String>,
    // Stable identifier of this worker process.
    worker_id: String,
    // Ids of tasks currently being processed (guards duplicate claims).
    processing: Arc<Mutex<HashSet<String>>>,
    // Upper bound on concurrently processed tasks (normalized to >= 1).
    max_concurrent_tasks: usize,
    // Tool auto-approval policy for this worker.
    auto_approve: AutoApprove,
    // In-process bus for sub-agent communication.
    bus: Arc<AgentBus>,
}
179
/// Run the A2A worker.
///
/// Registers this process as a worker with the control plane, performs an
/// initial workspace sync and pending-task fetch, then loops forever:
/// connect to the server's SSE task stream, re-register and re-fetch on each
/// reconnect, and back off 5 seconds between attempts. Only early setup
/// failures (registration, first task fetch) return `Err`; otherwise this
/// never returns.
pub async fn run(args: A2aArgs) -> Result<()> {
    let server = args.server.trim_end_matches('/');
    let name = args
        .name
        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
    let worker_id = resolve_worker_id();
    export_worker_runtime_env(server, &args.token, &worker_id);
    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);

    // Workspaces come as a comma-separated list; default to the current directory.
    let codebases: Vec<String> = args
        .workspaces
        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);

    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
    tracing::info!("Server: {}", server);
    tracing::info!("Workspaces: {:?}", codebases);
    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");

    // Wrap in shared mutex so background workspace-sync can add new local paths
    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));

    let client = Client::new();
    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
    if cognition_heartbeat.enabled {
        tracing::info!(
            source = %cognition_heartbeat.source_base_url,
            include_thoughts = cognition_heartbeat.include_thought_summary,
            max_chars = cognition_heartbeat.summary_max_chars,
            timeout_ms = cognition_heartbeat.request_timeout_ms,
            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
        );
    } else {
        tracing::warn!(
            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
        );
    }

    // Unrecognized values fall back to requiring explicit approval.
    let auto_approve = match args.auto_approve.as_str() {
        "all" => AutoApprove::All,
        "safe" => AutoApprove::Safe,
        _ => AutoApprove::None,
    };

    // Create heartbeat state
    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());

    // Create agent bus for in-process sub-agent communication
    let bus = AgentBus::new().into_arc();

    // Auto-start S3 sink if MinIO is configured
    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    {
        let handle = bus.handle(&worker_id);
        handle.announce_ready(worker_capabilities());
    }

    let task_runtime = WorkerTaskRuntime {
        client: client.clone(),
        server: server.to_string(),
        token: args.token.clone(),
        worker_id: worker_id.clone(),
        processing: processing.clone(),
        max_concurrent_tasks,
        auto_approve,
        bus: bus.clone(),
    };

    // Register worker (fatal on failure — the worker is useless unregistered)
    {
        let codebases = shared_codebases.lock().await.clone();
        register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await?;
    }

    // Best-effort: a failed initial sync is logged, not fatal.
    if let Err(e) =
        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
    {
        tracing::warn!(error = %e, "Initial workspace sync failed");
    }

    // Fetch pending tasks
    fetch_pending_tasks(&task_runtime).await?;

    // Start background task that polls server for new locally-present workspaces
    let _workspace_sync_handle = start_workspace_sync(
        client.clone(),
        server.to_string(),
        args.token.clone(),
        shared_codebases.clone(),
    );

    // Connect to SSE stream
    loop {
        // Refresh codebases snapshot (background sync may have added new local paths)
        let codebases = shared_codebases.lock().await.clone();

        // Re-register worker on each reconnection to report updated models/capabilities
        if let Err(e) = register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await
        {
            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
        }
        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
            tracing::warn!("Reconnect task fetch failed: {}", e);
        }

        // Start heartbeat task for this connection
        let heartbeat_handle = start_heartbeat(
            client.clone(),
            server.to_string(),
            args.token.clone(),
            heartbeat_state.clone(),
            processing.clone(),
            cognition_heartbeat.clone(),
        );

        match connect_stream(&task_runtime, &name, &codebases, None).await {
            Ok(()) => {
                tracing::warn!("Stream ended, reconnecting...");
            }
            Err(e) => {
                tracing::error!("Stream error: {}, reconnecting...", e);
            }
        }

        // Cancel heartbeat on disconnection
        heartbeat_handle.abort();
        tracing::debug!("Heartbeat cancelled for reconnection");

        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}
332
/// Run the A2A worker with shared state for HTTP server integration
/// This variant accepts a WorkerServerState to communicate with the HTTP server
///
/// Behaves like [`run`], additionally mirroring the worker id, heartbeat
/// state, bus handle, connection flag, and a per-connection task-notification
/// channel into `server_state` so the embedded HTTP server can observe and
/// trigger work. Never returns under normal operation.
pub async fn run_with_state(
    args: A2aArgs,
    server_state: crate::worker_server::WorkerServerState,
) -> Result<()> {
    let server = args.server.trim_end_matches('/');
    let name = args
        .name
        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
    let worker_id = resolve_worker_id();
    export_worker_runtime_env(server, &args.token, &worker_id);
    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);

    // Share worker_id with HTTP server
    server_state.set_worker_id(worker_id.clone()).await;

    // Workspaces come as a comma-separated list; default to the current directory.
    let codebases: Vec<String> = args
        .workspaces
        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);

    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
    tracing::info!("Server: {}", server);
    tracing::info!("Workspaces: {:?}", codebases);
    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");

    // Wrap in shared mutex so background workspace-sync can add new local paths
    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));

    let client = Client::new();
    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
    if cognition_heartbeat.enabled {
        tracing::info!(
            source = %cognition_heartbeat.source_base_url,
            include_thoughts = cognition_heartbeat.include_thought_summary,
            max_chars = cognition_heartbeat.summary_max_chars,
            timeout_ms = cognition_heartbeat.request_timeout_ms,
            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
        );
    } else {
        tracing::warn!(
            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
        );
    }

    // Unrecognized values fall back to requiring explicit approval.
    let auto_approve = match args.auto_approve.as_str() {
        "all" => AutoApprove::All,
        "safe" => AutoApprove::Safe,
        _ => AutoApprove::None,
    };

    // Create heartbeat state
    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());

    // Share heartbeat state with HTTP server
    server_state
        .set_heartbeat_state(Arc::new(heartbeat_state.clone()))
        .await;

    // Create agent bus for in-process sub-agent communication
    let bus = AgentBus::new().into_arc();
    server_state.set_bus(bus.clone()).await;

    // Auto-start S3 sink if MinIO is configured
    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    {
        let handle = bus.handle(&worker_id);
        handle.announce_ready(worker_capabilities());
    }

    let task_runtime = WorkerTaskRuntime {
        client: client.clone(),
        server: server.to_string(),
        token: args.token.clone(),
        worker_id: worker_id.clone(),
        processing: processing.clone(),
        max_concurrent_tasks,
        auto_approve,
        bus: bus.clone(),
    };

    // Register worker (fatal on failure — the worker is useless unregistered)
    {
        let codebases = shared_codebases.lock().await.clone();
        register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await?;
    }

    // Best-effort: a failed initial sync is logged, not fatal.
    if let Err(e) =
        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
    {
        tracing::warn!(error = %e, "Initial workspace sync failed");
    }

    // Mark as connected
    server_state.set_connected(true).await;

    // Fetch pending tasks before entering reconnection loop
    fetch_pending_tasks(&task_runtime).await?;

    // Start background task that polls server for new locally-present workspaces
    let _workspace_sync_handle = start_workspace_sync(
        client.clone(),
        server.to_string(),
        args.token.clone(),
        shared_codebases.clone(),
    );

    // Connect to SSE stream
    loop {
        // Refresh codebases snapshot (background sync may have added new local paths)
        let codebases = shared_codebases.lock().await.clone();

        // Create task notification channel for CloudEvent-triggered task execution
        // Recreate on each reconnection since the receiver is moved into connect_stream
        let (task_notify_tx, task_notify_rx) = mpsc::channel::<String>(32);
        server_state
            .set_task_notification_channel(task_notify_tx)
            .await;

        // Mark as connected on each reconnection
        server_state.set_connected(true).await;

        // Re-register worker on each reconnection to report updated models/capabilities
        if let Err(e) = register_worker(
            &client,
            server,
            &args.token,
            &worker_id,
            &name,
            &codebases,
            args.public_url.as_deref(),
        )
        .await
        {
            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
        }
        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
            tracing::warn!("Reconnect task fetch failed: {}", e);
        }

        // Start heartbeat task for this connection
        let heartbeat_handle = start_heartbeat(
            client.clone(),
            server.to_string(),
            args.token.clone(),
            heartbeat_state.clone(),
            processing.clone(),
            cognition_heartbeat.clone(),
        );

        match connect_stream(&task_runtime, &name, &codebases, Some(task_notify_rx)).await {
            Ok(()) => {
                tracing::warn!("Stream ended, reconnecting...");
            }
            Err(e) => {
                tracing::error!("Stream error: {}, reconnecting...", e);
            }
        }

        // Mark as disconnected
        server_state.set_connected(false).await;

        // Cancel heartbeat on disconnection
        heartbeat_handle.abort();
        tracing::debug!("Heartbeat cancelled for reconnection");

        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}
514
515pub fn generate_worker_id() -> String {
516    format!(
517        "wrk_{}_{:x}",
518        chrono::Utc::now().timestamp(),
519        rand::random::<u64>()
520    )
521}
522
523fn resolve_worker_id() -> String {
524    for key in ["CODETETHER_WORKER_ID", "A2A_WORKER_ID"] {
525        if let Ok(value) = std::env::var(key) {
526            let trimmed = value.trim();
527            if !trimmed.is_empty() {
528                return trimmed.to_string();
529            }
530        }
531    }
532    generate_worker_id()
533}
534
/// Clamp the configured concurrency so a worker can always run at least one task.
fn normalize_max_concurrent_tasks(max_concurrent_tasks: usize) -> usize {
    if max_concurrent_tasks == 0 {
        1
    } else {
        max_concurrent_tasks
    }
}
538
539async fn reserve_task_slot(
540    processing: &Arc<Mutex<HashSet<String>>>,
541    task_id: &str,
542    max_concurrent_tasks: usize,
543) -> TaskReservation {
544    let mut proc = processing.lock().await;
545    if proc.contains(task_id) {
546        TaskReservation::AlreadyProcessing
547    } else if proc.len() >= max_concurrent_tasks {
548        TaskReservation::AtCapacity
549    } else {
550        proc.insert(task_id.to_string());
551        TaskReservation::Reserved
552    }
553}
554
/// Export the control-plane connection details as process-wide environment
/// variables: `CODETETHER_SERVER`, `CODETETHER_WORKER_ID`, and — when a token
/// is provided — `CODETETHER_TOKEN`.
fn export_worker_runtime_env(server: &str, token: &Option<String>, worker_id: &str) {
    // SAFETY: called once during startup, before any Git helper child process
    // is spawned. Git credential helpers invoked by later shell/git commands
    // read these variables to reach the control plane securely.
    unsafe {
        std::env::set_var("CODETETHER_SERVER", server);
        std::env::set_var("CODETETHER_WORKER_ID", worker_id);
        if let Some(value) = token.as_deref() {
            std::env::set_var("CODETETHER_TOKEN", value);
        }
    }
}
567
568fn advertised_interfaces(public_url: Option<&str>) -> serde_json::Value {
569    let Some(base_url) = public_url
570        .map(str::trim)
571        .filter(|value| !value.is_empty())
572        .map(|value| value.trim_end_matches('/').to_string())
573    else {
574        return serde_json::json!({});
575    };
576
577    serde_json::json!({
578        "http": {
579            "base_url": base_url,
580        },
581        "bus": {
582            "stream_url": format!("{base_url}/v1/bus/stream"),
583            "publish_url": format!("{base_url}/v1/bus/publish"),
584        },
585    })
586}
587
/// Approval policy used by the worker when deciding whether to execute tools automatically.
///
/// # Examples
///
/// ```rust
/// use codetether_agent::a2a::worker::AutoApprove;
///
/// let mode = AutoApprove::Safe;
/// assert!(matches!(mode, AutoApprove::Safe));
/// ```
#[derive(Debug, Clone, Copy)]
pub enum AutoApprove {
    /// Auto-approve every tool invocation.
    All,
    /// Auto-approve only tools classified as safe (classification is
    /// enforced where this policy is consumed, elsewhere in the module).
    Safe,
    /// Never auto-approve; explicit approval is required.
    None,
}
604
/// Default A2A server URL when none is configured
pub const DEFAULT_A2A_SERVER_URL: &str = "https://api.codetether.run";

/// Capabilities of the codetether-agent worker
const BASE_WORKER_CAPABILITIES: &[&str] = &[
    "forage", "ralph", "swarm", "rlm", "a2a", "mcp", "grpc", "grpc-web", "jsonrpc",
];

/// Build the capability list advertised at registration time.
///
/// Starts from [`BASE_WORKER_CAPABILITIES`] and appends `"knative"` when the
/// `KNATIVE_SERVICE` environment variable is set to a truthy value
/// (`1`/`true`/`yes`, case-insensitive, whitespace-trimmed).
fn worker_capabilities() -> Vec<String> {
    let mut capabilities: Vec<String> = BASE_WORKER_CAPABILITIES
        .iter()
        .map(ToString::to_string)
        .collect();

    let is_knative = matches!(
        std::env::var("KNATIVE_SERVICE")
            .map(|value| value.trim().to_lowercase())
            .as_deref(),
        Ok("1") | Ok("true") | Ok("yes")
    );
    if is_knative {
        capabilities.push("knative".to_string());
    }

    capabilities
}
631
632fn task_value<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a serde_json::Value> {
633    task.get("task")
634        .and_then(|t| t.get(key))
635        .or_else(|| task.get(key))
636}
637
638fn task_str<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a str> {
639    task_value(task, key).and_then(|v| v.as_str())
640}
641
642fn task_metadata(task: &serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
643    task_value(task, "metadata")
644        .and_then(|m| m.as_object())
645        .cloned()
646        .unwrap_or_default()
647}
648
/// Normalize a "provider:model" reference to "provider/model".
///
/// Only the first ':' is rewritten, and only when the reference contains no
/// '/' yet. This keeps refs like "bedrock:amazon.nova-lite-v1:0" intact:
/// the first colon becomes the separator while the version colon survives.
fn model_ref_to_provider_model(model: &str) -> String {
    if model.contains('/') {
        return model.to_string();
    }
    match model.split_once(':') {
        Some((provider, rest)) => format!("{provider}/{rest}"),
        None => model.to_string(),
    }
}
659
/// Provider preference order for a model tier.
///
/// `"fast"`/`"quick"` and `"heavy"`/`"deep"` have dedicated orderings;
/// anything else (including `None`) uses the balanced ordering.
fn provider_preferences_for_tier(model_tier: Option<&str>) -> &'static [&'static str] {
    const FAST: &[&str] = &[
        "zai",
        "openai",
        "github-copilot",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
        "anthropic",
    ];
    const HEAVY: &[&str] = &[
        "zai",
        "anthropic",
        "openai",
        "github-copilot",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
    ];
    const BALANCED: &[&str] = &[
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "moonshotai",
        "openrouter",
        "novita",
        "google",
    ];

    match model_tier.unwrap_or("balanced") {
        "fast" | "quick" => FAST,
        "heavy" | "deep" => HEAVY,
        _ => BALANCED,
    }
}
694
695fn choose_provider_for_tier<'a>(providers: &'a [&'a str], model_tier: Option<&str>) -> &'a str {
696    for preferred in provider_preferences_for_tier(model_tier) {
697        if let Some(found) = providers.iter().copied().find(|p| *p == *preferred) {
698            return found;
699        }
700    }
701    if let Some(found) = providers.iter().copied().find(|p| *p == "zai") {
702        return found;
703    }
704    providers[0]
705}
706
/// Default model id for a provider at a given tier.
///
/// Several providers use one model regardless of tier; the rest pick between
/// fast ("fast"/"quick"), heavy ("heavy"/"deep"), and balanced (anything
/// else, including `None`) variants. Unknown providers default to "glm-5".
fn default_model_for_provider(provider: &str, model_tier: Option<&str>) -> String {
    let tier = model_tier.unwrap_or("balanced");
    let model = match provider {
        // Tier-independent providers.
        "moonshotai" => "kimi-k2.5",
        "zhipuai" | "zai" => "glm-5",
        "openrouter" => "z-ai/glm-5",
        "novita" => "Qwen/Qwen3.5-35B-A3B",
        // Tier-sensitive providers.
        "anthropic" => match tier {
            "fast" | "quick" => "claude-haiku-4-5",
            _ => "claude-sonnet-4-20250514",
        },
        "openai" => match tier {
            "fast" | "quick" => "gpt-4o-mini",
            "heavy" | "deep" => "o3",
            _ => "gpt-4o",
        },
        "google" => match tier {
            "fast" | "quick" => "gemini-2.5-flash",
            _ => "gemini-2.5-pro",
        },
        "bedrock" => match tier {
            "heavy" | "deep" => "us.anthropic.claude-opus-4-6-v1",
            _ => "amazon.nova-lite-v1:0",
        },
        _ => "glm-5",
    };
    model.to_string()
}
744
/// Whether the model name belongs to a family this worker treats as
/// preferring temperature 1 (case-insensitive match on the "kimi-k2",
/// "glm-", or "minimax" fragments).
fn prefers_temperature_one(model: &str) -> bool {
    let lower = model.to_ascii_lowercase();
    ["kimi-k2", "glm-", "minimax"]
        .iter()
        .any(|fragment| lower.contains(fragment))
}
749
/// True when the agent type names one of the swarm-style executors
/// ("swarm", "parallel", or "multi-agent"; trimmed, case-insensitive).
fn is_swarm_agent(agent_type: &str) -> bool {
    let normalized = agent_type.trim().to_ascii_lowercase();
    normalized == "swarm" || normalized == "parallel" || normalized == "multi-agent"
}
756
/// True when the agent type is "forage" (trimmed, case-insensitive).
fn is_forage_agent(agent_type: &str) -> bool {
    agent_type.trim().to_ascii_lowercase() == "forage"
}
760
761fn metadata_lookup<'a>(
762    metadata: &'a serde_json::Map<String, serde_json::Value>,
763    key: &str,
764) -> Option<&'a serde_json::Value> {
765    metadata
766        .get(key)
767        .or_else(|| {
768            metadata
769                .get("routing")
770                .and_then(|v| v.as_object())
771                .and_then(|obj| obj.get(key))
772        })
773        .or_else(|| {
774            metadata
775                .get("swarm")
776                .and_then(|v| v.as_object())
777                .and_then(|obj| obj.get(key))
778        })
779        .or_else(|| {
780            metadata
781                .get("forage")
782                .and_then(|v| v.as_object())
783                .and_then(|obj| obj.get(key))
784        })
785}
786
787fn metadata_str(
788    metadata: &serde_json::Map<String, serde_json::Value>,
789    keys: &[&str],
790) -> Option<String> {
791    for key in keys {
792        if let Some(value) = metadata_lookup(metadata, key).and_then(|v| v.as_str()) {
793            let trimmed = value.trim();
794            if !trimmed.is_empty() {
795                return Some(trimmed.to_string());
796            }
797        }
798    }
799    None
800}
801
802fn metadata_usize(
803    metadata: &serde_json::Map<String, serde_json::Value>,
804    keys: &[&str],
805) -> Option<usize> {
806    for key in keys {
807        if let Some(value) = metadata_lookup(metadata, key) {
808            if let Some(v) = value.as_u64() {
809                return usize::try_from(v).ok();
810            }
811            if let Some(v) = value.as_i64()
812                && v >= 0
813            {
814                return usize::try_from(v as u64).ok();
815            }
816            if let Some(v) = value.as_str()
817                && let Ok(parsed) = v.trim().parse::<usize>()
818            {
819                return Some(parsed);
820            }
821        }
822    }
823    None
824}
825
826fn metadata_u64(
827    metadata: &serde_json::Map<String, serde_json::Value>,
828    keys: &[&str],
829) -> Option<u64> {
830    for key in keys {
831        if let Some(value) = metadata_lookup(metadata, key) {
832            if let Some(v) = value.as_u64() {
833                return Some(v);
834            }
835            if let Some(v) = value.as_i64()
836                && v >= 0
837            {
838                return Some(v as u64);
839            }
840            if let Some(v) = value.as_str()
841                && let Ok(parsed) = v.trim().parse::<u64>()
842            {
843                return Some(parsed);
844            }
845        }
846    }
847    None
848}
849
850fn metadata_bool(
851    metadata: &serde_json::Map<String, serde_json::Value>,
852    keys: &[&str],
853) -> Option<bool> {
854    for key in keys {
855        if let Some(value) = metadata_lookup(metadata, key) {
856            if let Some(v) = value.as_bool() {
857                return Some(v);
858            }
859            if let Some(v) = value.as_str() {
860                match v.trim().to_ascii_lowercase().as_str() {
861                    "1" | "true" | "yes" | "on" => return Some(true),
862                    "0" | "false" | "no" | "off" => return Some(false),
863                    _ => {}
864                }
865            }
866        }
867    }
868    None
869}
870
871fn metadata_f64(
872    metadata: &serde_json::Map<String, serde_json::Value>,
873    keys: &[&str],
874) -> Option<f64> {
875    for key in keys {
876        if let Some(value) = metadata_lookup(metadata, key) {
877            if let Some(v) = value.as_f64() {
878                return Some(v);
879            }
880            if let Some(v) = value.as_i64() {
881                return Some(v as f64);
882            }
883            if let Some(v) = value.as_u64() {
884                return Some(v as f64);
885            }
886            if let Some(v) = value.as_str()
887                && let Ok(parsed) = v.trim().parse::<f64>()
888            {
889                return Some(parsed);
890            }
891        }
892    }
893    None
894}
895
896fn metadata_string_list(
897    metadata: &serde_json::Map<String, serde_json::Value>,
898    keys: &[&str],
899) -> Vec<String> {
900    for key in keys {
901        let Some(value) = metadata_lookup(metadata, key) else {
902            continue;
903        };
904
905        if let Some(items) = value.as_array() {
906            let parsed = items
907                .iter()
908                .filter_map(|item| item.as_str())
909                .map(str::trim)
910                .filter(|item| !item.is_empty())
911                .map(ToString::to_string)
912                .collect::<Vec<_>>();
913            if !parsed.is_empty() {
914                return parsed;
915            }
916        }
917
918        if let Some(value) = value.as_str() {
919            let parsed = value
920                .split(['\n', ','])
921                .map(str::trim)
922                .filter(|item| !item.is_empty())
923                .map(ToString::to_string)
924                .collect::<Vec<_>>();
925            if !parsed.is_empty() {
926                return parsed;
927            }
928        }
929    }
930
931    Vec::new()
932}
933
/// Normalize a requested forage execution engine to a supported value.
///
/// Input is trimmed and lowercased; "swarm" and "go" pass through, while a
/// missing, blank, or unrecognized value maps to the default "run".
fn normalize_forage_execution_engine(value: Option<String>) -> String {
    let engine = value
        .as_deref()
        .map(str::trim)
        .map(str::to_ascii_lowercase)
        .unwrap_or_default();
    if engine == "swarm" || engine == "go" {
        engine
    } else {
        "run".to_string()
    }
}
947
/// Normalize a requested forage swarm strategy to a supported value.
///
/// Input is trimmed and lowercased; "domain", "data", "stage", and "none"
/// pass through, anything else (including `None`) becomes "auto".
fn normalize_forage_swarm_strategy(value: Option<String>) -> String {
    let strategy = value
        .as_deref()
        .map(str::trim)
        .map(str::to_ascii_lowercase)
        .unwrap_or_default();
    match strategy.as_str() {
        "domain" | "data" | "stage" | "none" => strategy,
        _ => "auto".to_string(),
    }
}
963
964fn build_forage_args(
965    prompt: &str,
966    title: &str,
967    metadata: &serde_json::Map<String, serde_json::Value>,
968    selected_model: Option<String>,
969) -> ForageArgs {
970    let mut moonshots = metadata_string_list(
971        metadata,
972        &["moonshots", "moonshot", "goals", "mission", "missions"],
973    );
974    if moonshots.is_empty() {
975        let fallback = prompt.trim();
976        if !fallback.is_empty() {
977            moonshots.push(fallback.to_string());
978        } else if !title.trim().is_empty() {
979            moonshots.push(title.trim().to_string());
980        }
981    }
982
983    ForageArgs {
984        top: metadata_usize(metadata, &["top"]).unwrap_or(3),
985        loop_mode: metadata_bool(metadata, &["loop", "loop_mode"]).unwrap_or(false),
986        interval_secs: metadata_u64(metadata, &["interval_secs", "interval"]).unwrap_or(120),
987        max_cycles: metadata_usize(metadata, &["max_cycles"]).unwrap_or(0),
988        execute: metadata_bool(metadata, &["execute"]).unwrap_or(false),
989        // Task-queue forage should run without requiring S3 archival by default.
990        no_s3: metadata_bool(metadata, &["no_s3", "local_only"]).unwrap_or(true),
991        moonshots,
992        moonshot_file: metadata_str(metadata, &["moonshot_file"]).map(PathBuf::from),
993        moonshot_required: metadata_bool(metadata, &["moonshot_required"]).unwrap_or(false),
994        moonshot_min_alignment: metadata_f64(metadata, &["moonshot_min_alignment"]).unwrap_or(0.10),
995        execution_engine: normalize_forage_execution_engine(metadata_str(
996            metadata,
997            &["execution_engine", "engine"],
998        )),
999        run_timeout_secs: metadata_u64(metadata, &["run_timeout_secs", "timeout_secs", "timeout"])
1000            .unwrap_or(900),
1001        fail_fast: metadata_bool(metadata, &["fail_fast"]).unwrap_or(false),
1002        swarm_strategy: normalize_forage_swarm_strategy(metadata_str(
1003            metadata,
1004            &["swarm_strategy", "strategy"],
1005        )),
1006        swarm_max_subagents: metadata_usize(metadata, &["swarm_max_subagents"]).unwrap_or(8),
1007        swarm_max_steps: metadata_usize(metadata, &["swarm_max_steps"]).unwrap_or(100),
1008        swarm_subagent_timeout_secs: metadata_u64(metadata, &["swarm_subagent_timeout_secs"])
1009            .unwrap_or(300),
1010        model: selected_model,
1011        json: metadata_bool(metadata, &["json"]).unwrap_or(false),
1012    }
1013}
1014
1015fn parse_swarm_strategy(
1016    metadata: &serde_json::Map<String, serde_json::Value>,
1017) -> DecompositionStrategy {
1018    match metadata_str(
1019        metadata,
1020        &[
1021            "decomposition_strategy",
1022            "swarm_strategy",
1023            "strategy",
1024            "swarm_decomposition",
1025        ],
1026    )
1027    .as_deref()
1028    .map(|s| s.to_ascii_lowercase())
1029    .as_deref()
1030    {
1031        Some("none") | Some("single") => DecompositionStrategy::None,
1032        Some("domain") | Some("by_domain") => DecompositionStrategy::ByDomain,
1033        Some("data") | Some("by_data") => DecompositionStrategy::ByData,
1034        Some("stage") | Some("by_stage") => DecompositionStrategy::ByStage,
1035        _ => DecompositionStrategy::Automatic,
1036    }
1037}
1038
1039async fn resolve_swarm_model(
1040    explicit_model: Option<String>,
1041    model_tier: Option<&str>,
1042) -> Option<String> {
1043    if let Some(model) = explicit_model
1044        && !model.trim().is_empty()
1045    {
1046        return Some(model);
1047    }
1048
1049    let registry = ProviderRegistry::from_vault().await.ok()?;
1050    let providers = registry.list();
1051    if providers.is_empty() {
1052        return None;
1053    }
1054    let provider = choose_provider_for_tier(providers.as_slice(), model_tier);
1055    let model = default_model_for_provider(provider, model_tier);
1056    Some(format!("{}/{}", provider, model))
1057}
1058
1059fn format_swarm_event_for_output(event: &SwarmEvent) -> Option<String> {
1060    match event {
1061        SwarmEvent::Started {
1062            task,
1063            total_subtasks,
1064        } => Some(format!(
1065            "[swarm] started task={} planned_subtasks={}",
1066            task, total_subtasks
1067        )),
1068        SwarmEvent::StageComplete {
1069            stage,
1070            completed,
1071            failed,
1072        } => Some(format!(
1073            "[swarm] stage={} completed={} failed={}",
1074            stage, completed, failed
1075        )),
1076        SwarmEvent::SubTaskUpdate { id, status, .. } => Some(format!(
1077            "[swarm] subtask id={} status={}",
1078            &id.chars().take(8).collect::<String>(),
1079            format!("{status:?}").to_ascii_lowercase()
1080        )),
1081        SwarmEvent::AgentToolCall {
1082            subtask_id,
1083            tool_name,
1084        } => Some(format!(
1085            "[swarm] subtask id={} tool={}",
1086            &subtask_id.chars().take(8).collect::<String>(),
1087            tool_name
1088        )),
1089        SwarmEvent::AgentError { subtask_id, error } => Some(format!(
1090            "[swarm] subtask id={} error={}",
1091            &subtask_id.chars().take(8).collect::<String>(),
1092            error
1093        )),
1094        SwarmEvent::Complete { success, stats } => Some(format!(
1095            "[swarm] complete success={} subtasks={} speedup={:.2}",
1096            success,
1097            stats.subagents_completed + stats.subagents_failed,
1098            stats.speedup_factor
1099        )),
1100        SwarmEvent::Error(err) => Some(format!("[swarm] error message={}", err)),
1101        _ => None,
1102    }
1103}
1104
/// Register this worker with the A2A server so it can be assigned tasks.
///
/// Advertises the worker's identity, capabilities, hostname, available
/// models (with pricing when reported), workspaces, reachable interfaces,
/// and builtin agent definitions via `POST {server}/v1/agent/workers/register`.
///
/// Registration failures are deliberately non-fatal: the function logs and
/// returns `Ok(())` even on a non-success response so the worker loop can
/// keep running and retry later.
pub async fn register_worker(
    client: &Client,
    server: &str,
    token: &Option<String>,
    worker_id: &str,
    name: &str,
    codebases: &[String],
    public_url: Option<&str>,
) -> Result<()> {
    // Load ProviderRegistry and collect available models. Model info is
    // best-effort: on failure we register with an empty map rather than abort.
    let models = match load_provider_models().await {
        Ok(m) => m,
        Err(e) => {
            tracing::warn!(
                "Failed to load provider models: {}, proceeding without model info",
                e
            );
            HashMap::new()
        }
    };

    // Register via the workers/register endpoint
    let mut req = client.post(format!("{}/v1/agent/workers/register", server));

    if let Some(t) = token {
        req = req.bearer_auth(t);
    }

    // Flatten models HashMap into array of model objects with pricing data
    // matching the format expected by the A2A server's /models and /workers
    // endpoints. Pricing fields are only attached when the provider reported them.
    let models_array: Vec<serde_json::Value> = models
        .iter()
        .flat_map(|(provider, model_infos)| {
            model_infos.iter().map(move |m| {
                let mut obj = serde_json::json!({
                    "id": format!("{}/{}", provider, m.id),
                    "name": &m.id,
                    "provider": provider,
                    "provider_id": provider,
                });
                if let Some(input_cost) = m.input_cost_per_million {
                    obj["input_cost_per_million"] = serde_json::json!(input_cost);
                }
                if let Some(output_cost) = m.output_cost_per_million {
                    obj["output_cost_per_million"] = serde_json::json!(output_cost);
                }
                obj
            })
        })
        .collect();

    tracing::info!(
        "Registering worker with {} models from {} providers",
        models_array.len(),
        models.len()
    );

    // Best-effort host identification: HOSTNAME (Unix) then COMPUTERNAME (Windows).
    let hostname = std::env::var("HOSTNAME")
        .or_else(|_| std::env::var("COMPUTERNAME"))
        .unwrap_or_else(|_| "unknown".to_string());
    // Optional Kubernetes node name; blank values are treated as absent.
    let k8s_node_name = std::env::var("K8S_NODE_NAME")
        .ok()
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty());

    // Collect agent definitions from the builtin registry
    let registry = crate::agent::AgentRegistry::with_builtins();
    let agent_defs: Vec<serde_json::Value> = registry
        .list()
        .iter()
        .map(|info| {
            serde_json::json!({
                "name": info.name,
                "description": info.description,
                "mode": format!("{:?}", info.mode).to_lowercase(),
                "native": info.native,
                "hidden": info.hidden,
                "model": info.model,
                "temperature": info.temperature,
                "top_p": info.top_p,
                "max_steps": info.max_steps,
            })
        })
        .collect();

    let res = req
        .json(&serde_json::json!({
            "worker_id": worker_id,
            "name": name,
            "capabilities": worker_capabilities(),
            "hostname": hostname,
            "k8s_node_name": k8s_node_name,
            "models": models_array,
            "workspaces": codebases,
            "interfaces": advertised_interfaces(public_url),
            "agents": agent_defs,
        }))
        .send()
        .await?;

    if res.status().is_success() {
        tracing::info!("Worker registered successfully");
    } else {
        // Non-fatal: log and fall through to Ok so the caller keeps running.
        tracing::warn!("Failed to register worker: {}", res.status());
    }

    Ok(())
}
1213
1214/// Load ProviderRegistry and collect all available models grouped by provider.
1215/// Tries Vault first, then falls back to config/env vars if Vault is unreachable.
1216/// Returns ModelInfo structs (with pricing data when available).
1217async fn load_provider_models() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>> {
1218    // Try Vault first
1219    let registry = match ProviderRegistry::from_vault().await {
1220        Ok(r) if !r.list().is_empty() => {
1221            tracing::info!("Loaded {} providers from Vault", r.list().len());
1222            r
1223        }
1224        Ok(_) => {
1225            tracing::warn!("Vault returned 0 providers, falling back to config/env vars");
1226            fallback_registry().await?
1227        }
1228        Err(e) => {
1229            tracing::warn!("Vault unreachable ({}), falling back to config/env vars", e);
1230            fallback_registry().await?
1231        }
1232    };
1233
1234    let mut models_by_provider: HashMap<String, Vec<crate::provider::ModelInfo>> = HashMap::new();
1235
1236    for provider_name in registry.list() {
1237        if let Some(provider) = registry.get(provider_name) {
1238            match provider.list_models().await {
1239                Ok(models) => {
1240                    if !models.is_empty() {
1241                        tracing::debug!("Provider {}: {} models", provider_name, models.len());
1242                        models_by_provider.insert(provider_name.to_string(), models);
1243                    }
1244                }
1245                Err(e) => {
1246                    tracing::debug!("Failed to list models for {}: {}", provider_name, e);
1247                }
1248            }
1249        }
1250    }
1251
1252    Ok(models_by_provider)
1253}
1254
1255/// Fallback: build a ProviderRegistry from config file + environment variables
1256async fn fallback_registry() -> Result<ProviderRegistry> {
1257    let config = crate::config::Config::load().await.unwrap_or_default();
1258    ProviderRegistry::from_config(&config).await
1259}
1260
/// One-shot check for pending tasks, spawning a handler per claimable task.
///
/// Stops scanning early (leaving the remaining tasks pending on the server)
/// once the worker hits its concurrency cap. Non-success HTTP responses are
/// treated as "nothing to do" rather than errors.
async fn fetch_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
    tracing::info!("Checking for pending tasks...");

    let mut req = runtime
        .client
        .get(format!("{}/v1/agent/tasks?status=pending", runtime.server));
    if let Some(t) = &runtime.token {
        req = req.bearer_auth(t);
    }

    let res = req.send().await?;
    if !res.status().is_success() {
        return Ok(());
    }

    let data: serde_json::Value = res.json().await?;
    // Handle both plain array response and {tasks: [...]} wrapper
    let tasks = if let Some(arr) = data.as_array() {
        arr.clone()
    } else {
        data["tasks"].as_array().cloned().unwrap_or_default()
    };

    tracing::info!("Found {} pending task(s)", tasks.len());

    for task in tasks {
        // Tasks without an id are silently skipped.
        if let Some(id) = task["id"].as_str() {
            match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
                TaskReservation::Reserved => {
                    let task_id = id.to_string();
                    let runtime = runtime.clone();

                    tokio::spawn(async move {
                        if let Err(e) = handle_task(&runtime, &task).await {
                            tracing::error!("Task {} failed: {}", task_id, e);
                        }
                        // Always free the concurrency slot, success or failure.
                        runtime.processing.lock().await.remove(&task_id);
                    });
                }
                TaskReservation::AlreadyProcessing => {}
                TaskReservation::AtCapacity => {
                    // At capacity: stop scanning; a later poll picks these up.
                    tracing::debug!(
                        max_concurrent_tasks = runtime.max_concurrent_tasks,
                        "Worker is at task capacity; leaving remaining tasks pending"
                    );
                    break;
                }
            }
        }
    }

    Ok(())
}
1314
/// Connect to the server's SSE task stream and dispatch incoming tasks.
///
/// Runs until the stream ends (`Ok`) or a transport error occurs (`Err`);
/// the caller is expected to reconnect. Three event sources are multiplexed
/// in the `select!` loop:
/// - SSE `data:` frames carrying task JSON,
/// - optional CloudEvent task notifications on `task_notify_rx` (only
///   present when the worker also runs an HTTP server),
/// - a 30-second fallback poll for tasks the stream may have missed.
async fn connect_stream(
    runtime: &WorkerTaskRuntime,
    name: &str,
    codebases: &[String],
    task_notify_rx: Option<mpsc::Receiver<String>>,
) -> Result<()> {
    let url = format!(
        "{}/v1/worker/tasks/stream?agent_name={}&worker_id={}",
        runtime.server,
        urlencoding::encode(name),
        urlencoding::encode(&runtime.worker_id)
    );

    // Identify the worker via both the query string and headers; codebases
    // are sent under both X-Codebases and X-Workspaces header names.
    let mut req = runtime
        .client
        .get(&url)
        .header("Accept", "text/event-stream")
        .header("X-Worker-ID", &runtime.worker_id)
        .header("X-Agent-Name", name)
        .header("X-Codebases", codebases.join(","))
        .header("X-Workspaces", codebases.join(","));

    if let Some(t) = &runtime.token {
        req = req.bearer_auth(t);
    }

    let res = req.send().await?;
    if !res.status().is_success() {
        anyhow::bail!("Failed to connect: {}", res.status());
    }

    tracing::info!("Connected to A2A server");

    let mut stream = res.bytes_stream();
    // Accumulates partial SSE frames until a blank-line terminator arrives.
    let mut buffer = String::new();
    let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
    poll_interval.tick().await; // consume the initial immediate tick

    // Pin the optional receiver so we can use it in the loop
    let mut task_notify_rx = task_notify_rx;

    loop {
        tokio::select! {
            // Handle task notification from CloudEvent (Knative Eventing)
            // Only if the channel was provided (i.e., running with HTTP server)
            task_id = async {
                if let Some(ref mut rx) = task_notify_rx {
                    rx.recv().await
                } else {
                    // Never ready when None - use pending to skip this branch
                    futures::future::pending().await
                }
            } => {
                if let Some(task_id) = task_id {
                    tracing::info!("Received task notification via CloudEvent: {}", task_id);
                    // Immediately poll for and process this task
                    if let Err(e) = poll_pending_tasks(runtime).await {
                        tracing::warn!("Task notification poll failed: {}", e);
                    }
                }
            }
            chunk = stream.next() => {
                match chunk {
                    Some(Ok(chunk)) => {
                        buffer.push_str(&String::from_utf8_lossy(&chunk));

                        // Process SSE events: frames are separated by a blank line ("\n\n").
                        while let Some(pos) = buffer.find("\n\n") {
                            let event_str = buffer[..pos].to_string();
                            buffer = buffer[pos + 2..].to_string();

                            if let Some(data_line) = event_str.lines().find(|l| l.starts_with("data:")) {
                                let data = data_line.trim_start_matches("data:").trim();
                                if data == "[DONE]" || data.is_empty() {
                                    continue;
                                }

                                // Non-JSON payloads are silently ignored.
                                if let Ok(task) = serde_json::from_str::<serde_json::Value>(data) {
                                    spawn_task_handler(&task, runtime).await;
                                }
                            }
                        }
                    }
                    Some(Err(e)) => {
                        return Err(e.into());
                    }
                    None => {
                        // Stream ended
                        return Ok(());
                    }
                }
            }
            _ = poll_interval.tick() => {
                // Periodic poll for pending tasks the SSE stream may have missed
                if let Err(e) = poll_pending_tasks(runtime).await {
                    tracing::warn!("Periodic task poll failed: {}", e);
                }
            }
        }
    }
}
1416
1417async fn spawn_task_handler(task: &serde_json::Value, runtime: &WorkerTaskRuntime) {
1418    if let Some(id) = task
1419        .get("task")
1420        .and_then(|t| t["id"].as_str())
1421        .or_else(|| task["id"].as_str())
1422    {
1423        match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1424            TaskReservation::Reserved => {
1425                let task_id = id.to_string();
1426                let task = task.clone();
1427                let runtime = runtime.clone();
1428
1429                tokio::spawn(async move {
1430                    if let Err(e) = handle_task(&runtime, &task).await {
1431                        tracing::error!("Task {} failed: {}", task_id, e);
1432                    }
1433                    runtime.processing.lock().await.remove(&task_id);
1434                });
1435            }
1436            TaskReservation::AlreadyProcessing => {}
1437            TaskReservation::AtCapacity => {
1438                tracing::debug!(
1439                    task_id = id,
1440                    max_concurrent_tasks = runtime.max_concurrent_tasks,
1441                    "Worker is at task capacity; task will stay pending until a slot frees up"
1442                );
1443            }
1444        }
1445    }
1446}
1447
1448async fn poll_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1449    let mut req = runtime
1450        .client
1451        .get(format!("{}/v1/agent/tasks?status=pending", runtime.server));
1452    if let Some(t) = &runtime.token {
1453        req = req.bearer_auth(t);
1454    }
1455
1456    let res = req.send().await?;
1457    if !res.status().is_success() {
1458        return Ok(());
1459    }
1460
1461    let data: serde_json::Value = res.json().await?;
1462    let tasks = if let Some(arr) = data.as_array() {
1463        arr.clone()
1464    } else {
1465        data["tasks"].as_array().cloned().unwrap_or_default()
1466    };
1467
1468    if !tasks.is_empty() {
1469        tracing::debug!("Poll found {} pending task(s)", tasks.len());
1470    }
1471
1472    for task in &tasks {
1473        spawn_task_handler(task, runtime).await;
1474    }
1475
1476    Ok(())
1477}
1478
/// Claim, execute, and release a single task.
///
/// Claim conflicts (another worker got there first) and claim failures are
/// logged and swallowed — the function returns `Ok(())` so the caller keeps
/// draining other tasks. Once a claim succeeds, post-claim work is delegated
/// to `execute_claimed_task`, and the task is ALWAYS released afterwards
/// (released as "failed" when execution errored).
async fn handle_task(runtime: &WorkerTaskRuntime, task: &serde_json::Value) -> Result<()> {
    let task_id = task_str(task, "id").ok_or_else(|| anyhow::anyhow!("No task ID"))?;
    let title = task_str(task, "title").unwrap_or("Untitled");

    tracing::info!("Handling task: {} ({})", title, task_id);

    // Claim the task
    let mut req = runtime
        .client
        .post(format!("{}/v1/worker/tasks/claim", runtime.server))
        .header("X-Worker-ID", &runtime.worker_id);
    if let Some(t) = &runtime.token {
        req = req.bearer_auth(t);
    }

    let res = req
        .json(&serde_json::json!({ "task_id": task_id }))
        .send()
        .await?;

    if !res.status().is_success() {
        let status = res.status();
        let text = res.text().await?;
        // 409 means another worker won the claim race — expected, not a warning.
        if status == reqwest::StatusCode::CONFLICT {
            tracing::debug!(task_id, "Task already claimed by another worker, skipping");
        } else {
            tracing::warn!(task_id, %status, "Failed to claim task: {}", text);
        }
        return Ok(());
    }

    let claim = res.json::<TaskClaimResponse>().await?;
    let claim_provenance = claim.into_provenance();
    tracing::info!(
        task_id,
        run_id = ?claim_provenance.run_id,
        attempt_id = ?claim_provenance.attempt_id,
        "Claimed task"
    );

    // --- All post-claim work is wrapped so we always release the task ---
    let inner_result: Result<(&str, Option<String>, Option<String>, Option<String>)> =
        execute_claimed_task(runtime, task, task_id, title, &claim_provenance).await;

    // Map execution errors onto a "failed" release instead of propagating,
    // so the server never sees a claimed-but-unreleased task.
    let (status, result, error, session_id) = match inner_result {
        Ok(tuple) => tuple,
        Err(e) => {
            tracing::error!(
                task_id,
                error = %e,
                "Task failed after claim (releasing as failed)"
            );
            (
                "failed",
                None,
                Some(format!("Worker error after claim: {}", e)),
                None,
            )
        }
    };

    release_task_result(
        &runtime.client,
        &runtime.server,
        &runtime.token,
        &runtime.worker_id,
        task_id,
        status,
        result,
        error,
        session_id,
    )
    .await?;

    tracing::info!("Task released: {} with status: {}", task_id, status);
    Ok(())
}
1556
1557/// Inner logic for a claimed task. Returns (status, result, error, session_id).
1558/// Extracted so that `handle_task` can catch any error and always release.
1559#[allow(clippy::too_many_lines)]
1560async fn execute_claimed_task<'a>(
1561    runtime: &WorkerTaskRuntime,
1562    task: &'a serde_json::Value,
1563    task_id: &'a str,
1564    title: &'a str,
1565    claim_provenance: &ClaimProvenance,
1566) -> Result<(&'static str, Option<String>, Option<String>, Option<String>)> {
1567    let metadata = task_metadata(task);
1568    let resume_session_id = metadata
1569        .get("resume_session_id")
1570        .and_then(|v| v.as_str())
1571        .map(|s| s.trim().to_string())
1572        .filter(|s| !s.is_empty());
1573    let complexity_hint = metadata_str(&metadata, &["complexity"]);
1574    let model_tier = metadata_str(&metadata, &["model_tier", "tier"])
1575        .map(|s| s.to_ascii_lowercase())
1576        .or_else(|| {
1577            complexity_hint.as_ref().map(|complexity| {
1578                match complexity.to_ascii_lowercase().as_str() {
1579                    "quick" => "fast".to_string(),
1580                    "deep" => "heavy".to_string(),
1581                    _ => "balanced".to_string(),
1582                }
1583            })
1584        });
1585    let worker_personality = metadata_str(
1586        &metadata,
1587        &["worker_personality", "personality", "agent_personality"],
1588    );
1589    let target_agent_name = metadata_str(&metadata, &["target_agent_name", "agent_name"]);
1590    let raw_model = task_str(task, "model_ref")
1591        .or_else(|| metadata_lookup(&metadata, "model_ref").and_then(|v| v.as_str()))
1592        .or_else(|| task_str(task, "model"))
1593        .or_else(|| metadata_lookup(&metadata, "model").and_then(|v| v.as_str()));
1594    let selected_model = raw_model.map(model_ref_to_provider_model);
1595    let raw_agent = task_str(task, "agent_type")
1596        .or_else(|| task_str(task, "agent"))
1597        .unwrap_or("build");
1598    let prompt = task_str(task, "prompt")
1599        .or_else(|| task_str(task, "description"))
1600        .unwrap_or(title);
1601
1602    // Detect virtual/global workspace tasks dispatched via the API.
1603    let workspace_id = task_str(task, "workspace_id")
1604        .or_else(|| metadata_lookup(&metadata, "workspace_id").and_then(|v| v.as_str()));
1605    let is_virtual_task = workspace_id.map_or(false, |ws| ws == "global" || ws.is_empty());
1606
1607    if raw_agent.eq_ignore_ascii_case("clone_repo") {
1608        return match handle_clone_repo_task(
1609            &runtime.client,
1610            &runtime.server,
1611            &runtime.token,
1612            &runtime.worker_id,
1613            task,
1614            &metadata,
1615        )
1616        .await
1617        {
1618            Ok(message) => Ok(("completed", Some(message), None, None)),
1619            Err(err) => {
1620                tracing::error!(task_id, error = %err, "Clone task failed");
1621                Ok(("failed", None, Some(format!("Error: {}", err)), None))
1622            }
1623        };
1624    }
1625
1626    if is_forage_agent(raw_agent) {
1627        return match handle_forage_task(title, prompt, &metadata, selected_model.clone()).await {
1628            Ok(message) => Ok(("completed", Some(message), None, None)),
1629            Err(err) => {
1630                tracing::error!(task_id, error = %err, "Forage task failed");
1631                Ok(("failed", None, Some(format!("Error: {}", err)), None))
1632            }
1633        };
1634    }
1635
1636    // Resume existing session when requested; fall back to a fresh session if missing.
1637    let mut session = if let Some(ref sid) = resume_session_id {
1638        match Session::load(sid).await {
1639            Ok(existing) => {
1640                tracing::info!("Resuming session {} for task {}", sid, task_id);
1641                existing
1642            }
1643            Err(e) => {
1644                tracing::warn!(
1645                    "Could not load session {} for task {} ({}), starting a new session",
1646                    sid,
1647                    task_id,
1648                    e
1649                );
1650                Session::new().await?
1651            }
1652        }
1653    } else {
1654        Session::new().await?
1655    };
1656
1657    if !is_virtual_task {
1658        if let Some(workspace_path) = resolve_task_workspace_dir(
1659            &runtime.client,
1660            &runtime.server,
1661            &runtime.token,
1662            workspace_id,
1663        )
1664        .await?
1665        {
1666            session.metadata.directory = Some(workspace_path);
1667        }
1668    }
1669
1670    // For virtual/global tasks, clear the workspace directory so tools don't
1671    // try to operate on a workspace that doesn't exist on this worker.
1672    if is_virtual_task {
1673        tracing::info!(
1674            task_id,
1675            workspace_id = workspace_id.unwrap_or("(none)"),
1676            "Virtual task detected — skipping workspace setup"
1677        );
1678        session.metadata.directory = None;
1679    }
1680
1681    // Normalize agent: only "build", "plan", and swarm types are valid.
1682    // Map deprecated/unknown agent types (e.g. "general", "explore") to "build".
1683    let agent_type = if is_swarm_agent(raw_agent) {
1684        raw_agent
1685    } else {
1686        match raw_agent {
1687            "build" | "plan" => raw_agent,
1688            other => {
1689                tracing::info!(
1690                    "Agent \"{}\" is not a primary agent, falling back to \"build\"",
1691                    other
1692                );
1693                "build"
1694            }
1695        }
1696    };
1697    session.set_agent_name(agent_type.to_string());
1698    session.attach_claim_provenance(claim_provenance);
1699
1700    // Skip git hook installation for virtual tasks (no workspace directory).
1701    if !is_virtual_task {
1702        if let Some(directory) = session.metadata.directory.as_deref()
1703            && let Err(err) = install_commit_msg_hook(directory)
1704        {
1705            tracing::warn!(task_id, error = %err, "Failed to install commit-msg hook");
1706        }
1707    }
1708
1709    if let Some(model) = selected_model.clone() {
1710        session.metadata.model = Some(model);
1711    }
1712
1713    tracing::info!(task_id, agent_type, "Executing prompt: {}", prompt);
1714
1715    // Set up output streaming to forward progress to the server and bus
1716    let stream_client = runtime.client.clone();
1717    let stream_server = runtime.server.clone();
1718    let stream_token = runtime.token.clone();
1719    let stream_worker_id = runtime.worker_id.clone();
1720    let stream_task_id = task_id.to_string();
1721    let stream_bus = Arc::clone(&runtime.bus);
1722
1723    let output_callback: Arc<dyn Fn(String) + Send + Sync + 'static> =
1724        Arc::new(move |output: String| {
1725            let c = stream_client.clone();
1726            let s = stream_server.clone();
1727            let t = stream_token.clone();
1728            let w = stream_worker_id.clone();
1729            let tid = stream_task_id.clone();
1730
1731            let bus_handle = stream_bus.handle("task-output");
1732            bus_handle.send(
1733                format!("task.{}", tid),
1734                crate::bus::BusMessage::TaskUpdate {
1735                    task_id: tid.clone(),
1736                    state: crate::a2a::types::TaskState::Working,
1737                    message: Some(output.clone()),
1738                },
1739            );
1740
1741            tokio::spawn(async move {
1742                let mut req = c
1743                    .post(format!("{}/v1/agent/tasks/{}/output", s, tid))
1744                    .header("X-Worker-ID", &w);
1745                if let Some(tok) = &t {
1746                    req = req.bearer_auth(tok);
1747                }
1748                let _ = req
1749                    .json(&serde_json::json!({
1750                        "worker_id": w,
1751                        "output": output,
1752                    }))
1753                    .send()
1754                    .await;
1755            });
1756        });
1757
1758    // Execute swarm tasks via SwarmExecutor; all other agents use the standard session loop.
1759    let (status, result, error, session_id) = if is_swarm_agent(agent_type) {
1760        match execute_swarm_with_policy(
1761            &mut session,
1762            prompt,
1763            model_tier.as_deref(),
1764            selected_model,
1765            &metadata,
1766            complexity_hint.as_deref(),
1767            worker_personality.as_deref(),
1768            target_agent_name.as_deref(),
1769            Some(&runtime.bus),
1770            Some(Arc::clone(&output_callback)),
1771        )
1772        .await
1773        {
1774            Ok((session_result, true)) => {
1775                tracing::info!("Swarm task completed successfully: {}", task_id);
1776                (
1777                    "completed",
1778                    Some(session_result.text),
1779                    None,
1780                    Some(session_result.session_id),
1781                )
1782            }
1783            Ok((session_result, false)) => {
1784                tracing::warn!("Swarm task completed with failures: {}", task_id);
1785                (
1786                    "failed",
1787                    Some(session_result.text),
1788                    Some("Swarm execution completed with failures".to_string()),
1789                    Some(session_result.session_id),
1790                )
1791            }
1792            Err(e) => {
1793                tracing::error!("Swarm task failed: {} - {}", task_id, e);
1794                ("failed", None, Some(format!("Error: {}", e)), None)
1795            }
1796        }
1797    } else {
1798        match execute_session_with_policy(
1799            &mut session,
1800            prompt,
1801            runtime.auto_approve,
1802            model_tier.as_deref(),
1803            Some(Arc::clone(&output_callback)),
1804        )
1805        .await
1806        {
1807            Ok(session_result) => {
1808                tracing::info!("Task completed successfully: {}", task_id);
1809                (
1810                    "completed",
1811                    Some(session_result.text),
1812                    None,
1813                    Some(session_result.session_id),
1814                )
1815            }
1816            Err(e) => {
1817                tracing::error!(task_id, error = %e, "Task execution failed");
1818                ("failed", None, Some(format!("Error: {}", e)), None)
1819            }
1820        }
1821    };
1822
1823    Ok((
1824        status,
1825        result,
1826        error,
1827        Some(session_id.unwrap_or_else(|| session.id.clone())),
1828    ))
1829}
1830
1831async fn handle_forage_task(
1832    title: &str,
1833    prompt: &str,
1834    metadata: &serde_json::Map<String, serde_json::Value>,
1835    selected_model: Option<String>,
1836) -> Result<String> {
1837    let forage_args = build_forage_args(prompt, title, metadata, selected_model);
1838    let summary = crate::forage::execute_with_summary(forage_args).await?;
1839    Ok(summary.render_text())
1840}
1841
/// Handle a clone-repo task: ensure the workspace's git repository exists
/// locally (fresh clone or fast-forward update), register the clone with the
/// server, and enqueue any requested post-clone follow-up task.
///
/// Returns a short human-readable summary naming the local path and branch.
async fn handle_clone_repo_task(
    client: &Client,
    server: &str,
    token: &Option<String>,
    worker_id: &str,
    task: &serde_json::Value,
    metadata: &serde_json::Map<String, serde_json::Value>,
) -> Result<String> {
    // workspace_id may arrive in the task metadata or on the task object itself.
    let workspace_id = metadata_str(metadata, &["workspace_id"])
        .or_else(|| task_str(task, "workspace_id").map(|value| value.to_string()))
        .ok_or_else(|| anyhow::anyhow!("Clone task is missing workspace_id metadata"))?;
    let workspace = fetch_workspace_record(client, server, token, &workspace_id).await?;
    // Task metadata overrides the workspace record for URL and branch;
    // branch falls back to "main" when neither specifies one.
    let git_url = metadata_str(metadata, &["git_url"])
        .or_else(|| workspace.git_url.clone())
        .ok_or_else(|| anyhow::anyhow!("Workspace {} is missing git_url", workspace_id))?;
    let branch = metadata_str(metadata, &["git_branch"])
        .or_else(|| workspace.git_branch.clone())
        .unwrap_or_else(|| "main".to_string());
    let repo_path = resolve_workspace_clone_path(&workspace_id, &workspace.path);

    if let Some(parent) = repo_path.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .with_context(|| format!("Failed to create repo parent {}", parent.display()))?;
    }

    // Temporary credential-helper script used for the initial clone only;
    // it is removed unconditionally after the clone attempt (see bottom).
    let temp_helper_path =
        git_clone_base_dir().join(format!(".{}-git-credential-helper", workspace_id));
    write_git_credential_helper_script(&temp_helper_path, &workspace_id)?;

    // The clone/update runs inside an async block so the helper-script cleanup
    // below executes on both success and failure paths.
    let clone_result = async {
        let git_dir = repo_path.join(".git");
        if git_dir.exists() {
            // Existing clone: refresh auth configuration, then fetch,
            // checkout, and fast-forward-only pull the requested branch.
            configure_repo_git_auth(&repo_path, &workspace_id)?;
            configure_repo_git_github_app_from_agent_config(
                &repo_path,
                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
            );
            run_git_command_at(
                Some(&repo_path),
                vec!["fetch".to_string(), "origin".to_string(), branch.clone()],
            )
            .await?;
            run_git_command_at(
                Some(&repo_path),
                vec!["checkout".to_string(), branch.clone()],
            )
            .await?;
            run_git_command_at(
                Some(&repo_path),
                vec![
                    "pull".to_string(),
                    "--ff-only".to_string(),
                    "origin".to_string(),
                    branch.clone(),
                ],
            )
            .await?;
        } else {
            // No clone yet: prepare the target directory, then clone the
            // single requested branch using the temporary credential helper.
            prepare_clone_target(&repo_path).await?;

            run_git_command_at(
                None,
                vec![
                    "-c".to_string(),
                    format!("credential.helper={}", temp_helper_path.display()),
                    "-c".to_string(),
                    "credential.useHttpPath=true".to_string(),
                    "clone".to_string(),
                    "--single-branch".to_string(),
                    "--branch".to_string(),
                    branch.clone(),
                    git_url.clone(),
                    repo_path.display().to_string(),
                ],
            )
            .await?;
            // Auth configuration is applied after the clone so it lands in
            // the repo's own git config.
            configure_repo_git_auth(&repo_path, &workspace_id)?;
            configure_repo_git_github_app_from_agent_config(
                &repo_path,
                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
            );
        }
        // Provenance hook so commits made in this clone are attributable.
        install_commit_msg_hook(&repo_path)?;

        register_cloned_workspace(client, server, token, worker_id, &workspace, &repo_path).await?;
        enqueue_post_clone_task(client, server, token, worker_id, &workspace_id, metadata).await?;

        Ok::<String, anyhow::Error>(format!(
            "Repository ready at {} (branch: {})",
            repo_path.display(),
            branch
        ))
    }
    .await;

    // Best-effort cleanup of the credential helper regardless of outcome.
    let _ = tokio::fs::remove_file(&temp_helper_path).await;
    clone_result
}
1941
1942async fn register_cloned_workspace(
1943    client: &Client,
1944    server: &str,
1945    token: &Option<String>,
1946    worker_id: &str,
1947    workspace: &RegisteredWorkspaceRecord,
1948    repo_path: &Path,
1949) -> Result<()> {
1950    let mut req = client.post(format!(
1951        "{}/v1/agent/workspaces",
1952        server.trim_end_matches('/')
1953    ));
1954    if let Some(token) = token {
1955        req = req.bearer_auth(token);
1956    }
1957
1958    let response = req
1959        .json(&serde_json::json!({
1960            "workspace_id": workspace.id,
1961            "name": workspace.name,
1962            "path": repo_path.display().to_string(),
1963            "description": workspace.description,
1964            "agent_config": workspace.agent_config,
1965            "git_url": workspace.git_url,
1966            "git_branch": workspace.git_branch,
1967            "worker_id": worker_id,
1968        }))
1969        .send()
1970        .await
1971        .context("Failed to register cloned workspace with server")?;
1972    let status = response.status();
1973    if !status.is_success() {
1974        let body = response.text().await.unwrap_or_default();
1975        anyhow::bail!(
1976            "Failed to register cloned workspace {} ({}): {}",
1977            workspace.id,
1978            status,
1979            body
1980        );
1981    }
1982
1983    Ok(())
1984}
1985
1986async fn enqueue_post_clone_task(
1987    client: &Client,
1988    server: &str,
1989    token: &Option<String>,
1990    worker_id: &str,
1991    workspace_id: &str,
1992    metadata: &serde_json::Map<String, serde_json::Value>,
1993) -> Result<()> {
1994    let Some(post_clone_task) = metadata.get("post_clone_task") else {
1995        return Ok(());
1996    };
1997    let Some(task) = post_clone_task.as_object() else {
1998        return Ok(());
1999    };
2000    let title = task
2001        .get("title")
2002        .and_then(|value| value.as_str())
2003        .filter(|value| !value.trim().is_empty())
2004        .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing title"))?;
2005    let prompt = task
2006        .get("prompt")
2007        .and_then(|value| value.as_str())
2008        .filter(|value| !value.trim().is_empty())
2009        .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing prompt"))?;
2010    let agent_type = task
2011        .get("agent_type")
2012        .and_then(|value| value.as_str())
2013        .unwrap_or("build");
2014    let mut task_metadata = task
2015        .get("metadata")
2016        .cloned()
2017        .unwrap_or_else(|| serde_json::json!({}));
2018    if let Some(task_metadata_obj) = task_metadata.as_object_mut() {
2019        task_metadata_obj
2020            .entry("target_worker_id".to_string())
2021            .or_insert_with(|| serde_json::Value::String(worker_id.to_string()));
2022    }
2023
2024    let mut req = client.post(format!(
2025        "{}/v1/agent/workspaces/{}/tasks",
2026        server.trim_end_matches('/'),
2027        workspace_id
2028    ));
2029    if let Some(token) = token {
2030        req = req.bearer_auth(token);
2031    }
2032
2033    let response = req
2034        .json(&serde_json::json!({
2035            "title": title,
2036            "prompt": prompt,
2037            "agent_type": agent_type,
2038            "metadata": task_metadata,
2039        }))
2040        .send()
2041        .await
2042        .context("Failed to enqueue post-clone task")?;
2043    let status = response.status();
2044    if !status.is_success() {
2045        let body = response.text().await.unwrap_or_default();
2046        anyhow::bail!("Failed to enqueue post-clone task ({}): {}", status, body);
2047    }
2048    Ok(())
2049}
2050
2051async fn run_git_command_at(current_dir: Option<&Path>, args: Vec<String>) -> Result<String> {
2052    let mut command = Command::new("git");
2053    if let Some(dir) = current_dir {
2054        command.current_dir(dir);
2055    }
2056
2057    let output = command
2058        .args(args.iter().map(String::as_str))
2059        .output()
2060        .await
2061        .context("Failed to execute git command")?;
2062
2063    if output.status.success() {
2064        return Ok(String::from_utf8_lossy(&output.stdout).trim().to_string());
2065    }
2066
2067    Err(anyhow::anyhow!(
2068        "Git command failed: {}",
2069        String::from_utf8_lossy(&output.stderr).trim()
2070    ))
2071}
2072
2073async fn release_task_result(
2074    client: &Client,
2075    server: &str,
2076    token: &Option<String>,
2077    worker_id: &str,
2078    task_id: &str,
2079    status: &str,
2080    result: Option<String>,
2081    error: Option<String>,
2082    session_id: Option<String>,
2083) -> Result<()> {
2084    let mut req = client
2085        .post(format!("{}/v1/worker/tasks/release", server))
2086        .header("X-Worker-ID", worker_id);
2087    if let Some(token) = token {
2088        req = req.bearer_auth(token);
2089    }
2090
2091    req.json(&serde_json::json!({
2092        "task_id": task_id,
2093        "status": status,
2094        "result": result,
2095        "error": error,
2096        "session_id": session_id,
2097    }))
2098    .send()
2099    .await
2100    .context("Failed to release task result")?;
2101
2102    Ok(())
2103}
2104
/// Execute a swarm-style task via `SwarmExecutor`.
///
/// Seeds the session with the user prompt, derives the swarm configuration
/// from task metadata (each knob accepts several key aliases and is clamped
/// to a sane range), streams swarm events through `output_callback` when one
/// is provided, and records the final swarm output as an assistant message
/// before saving the session.
///
/// Returns the session result plus a flag indicating whether the swarm
/// reported overall success.
async fn execute_swarm_with_policy(
    session: &mut Session,
    prompt: &str,
    model_tier: Option<&str>,
    explicit_model: Option<String>,
    metadata: &serde_json::Map<String, serde_json::Value>,
    complexity_hint: Option<&str>,
    worker_personality: Option<&str>,
    target_agent_name: Option<&str>,
    bus: Option<&Arc<AgentBus>>,
    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
) -> Result<(crate::session::SessionResult, bool)> {
    use crate::provider::{ContentPart, Message, Role};

    // Record the incoming prompt on the session transcript.
    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: prompt.to_string(),
        }],
    });

    if session.title.is_none() {
        session.generate_title().await?;
    }

    // Swarm knobs from metadata, with defaults and clamped ranges.
    let strategy = parse_swarm_strategy(metadata);
    let max_subagents = metadata_usize(
        metadata,
        &["swarm_max_subagents", "max_subagents", "subagents"],
    )
    .unwrap_or(10)
    .clamp(1, 100);
    let max_steps_per_subagent = metadata_usize(
        metadata,
        &[
            "swarm_max_steps_per_subagent",
            "max_steps_per_subagent",
            "max_steps",
        ],
    )
    .unwrap_or(50)
    .clamp(1, 200);
    let timeout_secs = metadata_u64(metadata, &["swarm_timeout_secs", "timeout_secs", "timeout"])
        .unwrap_or(600)
        .clamp(30, 3600);
    let parallel_enabled =
        metadata_bool(metadata, &["swarm_parallel_enabled", "parallel_enabled"]).unwrap_or(true);

    // Explicit model wins over tier-based selection; record it on the session.
    let model = resolve_swarm_model(explicit_model, model_tier).await;
    if let Some(ref selected_model) = model {
        session.metadata.model = Some(selected_model.clone());
    }

    // Emit the resolved routing/config up front so observers see the plan.
    if let Some(ref cb) = output_callback {
        cb(format!(
            "[swarm] routing complexity={} tier={} personality={} target_agent={}",
            complexity_hint.unwrap_or("standard"),
            model_tier.unwrap_or("balanced"),
            worker_personality.unwrap_or("auto"),
            target_agent_name.unwrap_or("auto")
        ));
        cb(format!(
            "[swarm] config strategy={:?} max_subagents={} max_steps={} timeout={}s tier={}",
            strategy,
            max_subagents,
            max_steps_per_subagent,
            timeout_secs,
            model_tier.unwrap_or("balanced")
        ));
    }

    let swarm_config = SwarmConfig {
        max_subagents,
        max_steps_per_subagent,
        subagent_timeout_secs: timeout_secs,
        parallel_enabled,
        model,
        working_dir: session
            .metadata
            .directory
            .as_ref()
            .map(|p| p.to_string_lossy().to_string()),
        ..Default::default()
    };

    let swarm_result = if output_callback.is_some() {
        // Streaming path: run the executor on a spawned task and forward its
        // events to the callback until the executor finishes.
        let (event_tx, mut event_rx) = mpsc::channel(256);
        let mut executor = SwarmExecutor::new(swarm_config).with_event_tx(event_tx);
        if let Some(bus) = bus {
            executor = executor.with_bus(Arc::clone(bus));
        }
        let prompt_owned = prompt.to_string();
        let mut exec_handle =
            tokio::spawn(async move { executor.execute(&prompt_owned, strategy).await });

        let mut final_result: Option<crate::swarm::SwarmResult> = None;

        // Interleave event forwarding with awaiting the executor's completion.
        while final_result.is_none() {
            tokio::select! {
                maybe_event = event_rx.recv() => {
                    if let Some(event) = maybe_event
                        && let Some(ref cb) = output_callback
                            && let Some(line) = format_swarm_event_for_output(&event) {
                                cb(line);
                            }
                }
                join_result = &mut exec_handle => {
                    let joined = join_result.map_err(|e| anyhow::anyhow!("Swarm join failure: {}", e))?;
                    final_result = Some(joined?);
                }
            }
        }

        // Drain any events that were still queued when the executor finished.
        while let Ok(event) = event_rx.try_recv() {
            if let Some(ref cb) = output_callback
                && let Some(line) = format_swarm_event_for_output(&event)
            {
                cb(line);
            }
        }

        final_result.ok_or_else(|| anyhow::anyhow!("Swarm execution returned no result"))?
    } else {
        // Non-streaming path: run the executor inline.
        let mut executor = SwarmExecutor::new(swarm_config);
        if let Some(bus) = bus {
            executor = executor.with_bus(Arc::clone(bus));
        }
        executor.execute(prompt, strategy).await?
    };

    // Ensure callers always get non-empty text, even when the swarm itself
    // produced no textual output.
    let final_text = if swarm_result.result.trim().is_empty() {
        if swarm_result.success {
            "Swarm completed without textual output.".to_string()
        } else {
            "Swarm finished with failures and no textual output.".to_string()
        }
    } else {
        swarm_result.result.clone()
    };

    // Persist the final output on the session transcript.
    session.add_message(Message {
        role: Role::Assistant,
        content: vec![ContentPart::Text {
            text: final_text.clone(),
        }],
    });
    session.save().await?;

    Ok((
        crate::session::SessionResult {
            text: final_text,
            session_id: session.id.clone(),
        },
        swarm_result.success,
    ))
}
2261
/// Execute a standard agent session loop with the given auto-approve policy.
///
/// Resolves an LLM provider and model (from the session's model string when
/// set, otherwise by tier), then iterates up to 50 agent steps: each step
/// sends the transcript (with system prompt prepended) to the provider,
/// streams any text output via `output_callback`, and executes requested
/// tool calls subject to the auto-approve policy. The loop ends when the
/// model produces no tool calls, or after the step limit.
///
/// Returns the accumulated text output and the session id; errors surface
/// from provider loading, title generation, completion calls, or saving.
async fn execute_session_with_policy(
    session: &mut Session,
    prompt: &str,
    auto_approve: AutoApprove,
    model_tier: Option<&str>,
    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
) -> Result<crate::session::SessionResult> {
    use crate::provider::{
        CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
    };
    use std::sync::Arc;

    // Load provider registry from Vault
    let registry = ProviderRegistry::from_vault()
        .await
        .context("Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN")?;
    let providers = registry.list();
    tracing::info!("Available providers: {:?}", providers);

    if providers.is_empty() {
        anyhow::bail!(
            "No LLM providers available (0 providers loaded). \
             Configure API keys in HashiCorp Vault or set environment variables. \
             Vault address: {}",
            std::env::var("VAULT_ADDR").unwrap_or_else(|_| "(not set)".into())
        );
    }

    // Parse model string. The session's model may be "provider/model",
    // a bare provider name, or a bare model id; "zhipuai" is normalized to
    // its registry alias "zai".
    let (provider_name, model_id) = if let Some(ref model_str) = session.metadata.model {
        let (prov, model) = parse_model_string(model_str);
        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
        if prov.is_some() {
            (prov.map(|s| s.to_string()), model.to_string())
        } else if providers.contains(&model) {
            // Bare provider name: treat it as a provider with no model id.
            (Some(model.to_string()), String::new())
        } else {
            (None, model.to_string())
        }
    } else {
        (None, String::new())
    };

    // An explicitly requested provider must actually be available.
    let provider_slice = providers.as_slice();
    if let Some(explicit_provider) = provider_name.as_deref()
        && !providers.contains(&explicit_provider)
    {
        anyhow::bail!(
            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
            explicit_provider,
            providers.join(", ")
        );
    }

    // Determine which provider to use, preferring explicit request first, then model tier.
    let selected_provider = provider_name
        .as_deref()
        .unwrap_or_else(|| choose_provider_for_tier(provider_slice, model_tier));

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    // Add user message
    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: prompt.to_string(),
        }],
    });

    // Generate title
    if session.title.is_none() {
        session.generate_title().await?;
    }

    // Determine model.
    let model = if !model_id.is_empty() {
        model_id
    } else {
        default_model_for_provider(selected_provider, model_tier)
    };

    // Create tool registry with filtering based on auto-approve policy
    let workspace_dir = session
        .metadata
        .directory
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    let tool_registry = create_filtered_registry(
        Arc::clone(&provider),
        model.clone(),
        auto_approve,
        &workspace_dir,
        output_callback.clone(),
    );
    let tool_definitions = tool_registry.definitions();

    // Some models behave best with temperature pinned to 1.0.
    let temperature = if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!(
        "Using model: {} via provider: {} (tier: {:?})",
        model,
        selected_provider,
        model_tier
    );
    tracing::info!(
        "Available tools: {} (auto_approve: {:?})",
        tool_definitions.len(),
        auto_approve
    );

    // Build system prompt
    let system_prompt = crate::agent::builtin::build_system_prompt(&workspace_dir);

    let mut final_output = String::new();
    let max_steps = 50;

    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");

        // Build messages with system prompt first
        let mut messages = vec![Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.clone(),
            }],
        }];
        messages.extend(session.messages.clone());

        let request = CompletionRequest {
            messages,
            tools: tool_definitions.clone(),
            model: model.clone(),
            temperature,
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let response = provider.complete(request).await?;

        // Record token usage for telemetry before processing the response.
        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        // Extract tool calls. Malformed tool arguments degrade to an empty
        // JSON object rather than failing the step.
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } = part
                {
                    let args: serde_json::Value =
                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                    Some((id.clone(), name.clone(), args))
                } else {
                    None
                }
            })
            .collect();

        // Collect text output and stream it
        for part in &response.message.content {
            if let ContentPart::Text { text } = part
                && !text.is_empty()
            {
                final_output.push_str(text);
                final_output.push('\n');
                if let Some(ref cb) = output_callback {
                    cb(text.clone());
                }
            }
        }

        // If no tool calls, we're done
        if tool_calls.is_empty() {
            session.add_message(response.message.clone());
            break;
        }

        session.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        // Execute each tool call
        for (tool_id, tool_name, tool_input) in tool_calls {
            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            // Stream tool start event
            if let Some(ref cb) = output_callback {
                cb(format!("[tool:start:{}]", tool_name));
            }

            // Check if tool is allowed based on auto-approve policy.
            // Blocked tools still get a tool-result message so the model
            // sees why the call did not run.
            if !is_tool_allowed(&tool_name, auto_approve) {
                let msg = format!(
                    "Tool '{}' requires approval but auto-approve policy is {:?}",
                    tool_name, auto_approve
                );
                tracing::warn!(tool = %tool_name, "Tool blocked by auto-approve policy");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: msg,
                    }],
                });
                continue;
            }

            // Execute the tool; failures and unknown tools are reported back
            // to the model as tool-result text rather than aborting the loop.
            let content = if let Some(tool) = tool_registry.get(&tool_name) {
                let exec_result: Result<crate::tool::ToolResult> =
                    tool.execute(tool_input.clone()).await;
                match exec_result {
                    Ok(result) => {
                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                        if let Some(ref cb) = output_callback {
                            let status = if result.success { "ok" } else { "err" };
                            // Truncate streamed tool output to keep events small.
                            cb(format!(
                                "[tool:{}:{}] {}",
                                tool_name,
                                status,
                                crate::util::truncate_bytes_safe(&result.output, 500)
                            ));
                        }
                        result.output
                    }
                    Err(e) => {
                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                        if let Some(ref cb) = output_callback {
                            cb(format!("[tool:{}:err] {}", tool_name, e));
                        }
                        format!("Error: {}", e)
                    }
                }
            } else {
                tracing::warn!(tool = %tool_name, "Tool not found");
                format!("Error: Unknown tool '{}'", tool_name)
            };

            session.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });
        }
    }

    session.save().await?;

    Ok(crate::session::SessionResult {
        text: final_output.trim().to_string(),
        session_id: session.id.clone(),
    })
}
2538
2539/// Start the heartbeat background task
2540/// Returns a JoinHandle that can be used to cancel the heartbeat
2541pub fn start_heartbeat(
2542    client: Client,
2543    server: String,
2544    token: Option<String>,
2545    heartbeat_state: HeartbeatState,
2546    processing: Arc<Mutex<HashSet<String>>>,
2547    cognition_config: CognitionHeartbeatConfig,
2548) -> JoinHandle<()> {
2549    tokio::spawn(async move {
2550        let mut consecutive_failures = 0u32;
2551        const MAX_FAILURES: u32 = 3;
2552        const HEARTBEAT_INTERVAL_SECS: u64 = 30;
2553        const COGNITION_RETRY_COOLDOWN_SECS: u64 = 300;
2554        let mut cognition_payload_disabled_until: Option<Instant> = None;
2555
2556        let mut interval =
2557            tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
2558        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2559
2560        loop {
2561            interval.tick().await;
2562
2563            // Update task count from processing set
2564            let active_count = processing.lock().await.len();
2565            heartbeat_state.set_task_count(active_count).await;
2566
2567            // Determine status based on active tasks
2568            let status = if active_count > 0 {
2569                WorkerStatus::Processing
2570            } else {
2571                WorkerStatus::Idle
2572            };
2573            heartbeat_state.set_status(status).await;
2574
2575            // Send heartbeat
2576            let url = format!(
2577                "{}/v1/agent/workers/{}/heartbeat",
2578                server, heartbeat_state.worker_id
2579            );
2580            let mut req = client.post(&url);
2581
2582            if let Some(ref t) = token {
2583                req = req.bearer_auth(t);
2584            }
2585
2586            let status_str = heartbeat_state.status.lock().await.as_str().to_string();
2587            let sub_agents = heartbeat_state.sub_agents_snapshot().await;
2588            let base_payload = serde_json::json!({
2589                "worker_id": &heartbeat_state.worker_id,
2590                "agent_name": &heartbeat_state.agent_name,
2591                "status": status_str,
2592                "active_task_count": active_count,
2593                "sub_agents": sub_agents,
2594            });
2595            let mut payload = base_payload.clone();
2596            let mut included_cognition_payload = false;
2597            let cognition_payload_allowed = cognition_payload_disabled_until
2598                .map(|until| Instant::now() >= until)
2599                .unwrap_or(true);
2600
2601            if cognition_config.enabled
2602                && cognition_payload_allowed
2603                && let Some(cognition_payload) =
2604                    fetch_cognition_heartbeat_payload(&client, &cognition_config).await
2605                && let Some(obj) = payload.as_object_mut()
2606            {
2607                obj.insert("cognition".to_string(), cognition_payload);
2608                included_cognition_payload = true;
2609            }
2610
2611            match req.json(&payload).send().await {
2612                Ok(res) => {
2613                    if res.status().is_success() {
2614                        consecutive_failures = 0;
2615                        tracing::debug!(
2616                            worker_id = %heartbeat_state.worker_id,
2617                            status = status_str,
2618                            active_tasks = active_count,
2619                            "Heartbeat sent successfully"
2620                        );
2621                    } else if included_cognition_payload && res.status().is_client_error() {
2622                        tracing::warn!(
2623                            worker_id = %heartbeat_state.worker_id,
2624                            status = %res.status(),
2625                            "Heartbeat cognition payload rejected, retrying without cognition payload"
2626                        );
2627
2628                        let mut retry_req = client.post(&url);
2629                        if let Some(ref t) = token {
2630                            retry_req = retry_req.bearer_auth(t);
2631                        }
2632
2633                        match retry_req.json(&base_payload).send().await {
2634                            Ok(retry_res) if retry_res.status().is_success() => {
2635                                cognition_payload_disabled_until = Some(
2636                                    Instant::now()
2637                                        + Duration::from_secs(COGNITION_RETRY_COOLDOWN_SECS),
2638                                );
2639                                consecutive_failures = 0;
2640                                tracing::warn!(
2641                                    worker_id = %heartbeat_state.worker_id,
2642                                    retry_after_secs = COGNITION_RETRY_COOLDOWN_SECS,
2643                                    "Paused cognition heartbeat payload after schema rejection"
2644                                );
2645                            }
2646                            Ok(retry_res) => {
2647                                consecutive_failures += 1;
2648                                tracing::warn!(
2649                                    worker_id = %heartbeat_state.worker_id,
2650                                    status = %retry_res.status(),
2651                                    failures = consecutive_failures,
2652                                    "Heartbeat failed even after retry without cognition payload"
2653                                );
2654                            }
2655                            Err(e) => {
2656                                consecutive_failures += 1;
2657                                tracing::warn!(
2658                                    worker_id = %heartbeat_state.worker_id,
2659                                    error = %e,
2660                                    failures = consecutive_failures,
2661                                    "Heartbeat retry without cognition payload failed"
2662                                );
2663                            }
2664                        }
2665                    } else {
2666                        consecutive_failures += 1;
2667                        tracing::warn!(
2668                            worker_id = %heartbeat_state.worker_id,
2669                            status = %res.status(),
2670                            failures = consecutive_failures,
2671                            "Heartbeat failed"
2672                        );
2673                    }
2674                }
2675                Err(e) => {
2676                    consecutive_failures += 1;
2677                    tracing::warn!(
2678                        worker_id = %heartbeat_state.worker_id,
2679                        error = %e,
2680                        failures = consecutive_failures,
2681                        "Heartbeat request failed"
2682                    );
2683                }
2684            }
2685
2686            // Log error after 3 consecutive failures but do not terminate
2687            if consecutive_failures >= MAX_FAILURES {
2688                tracing::error!(
2689                    worker_id = %heartbeat_state.worker_id,
2690                    failures = consecutive_failures,
2691                    "Heartbeat failed {} consecutive times - worker will continue running and attempt reconnection via SSE loop",
2692                    MAX_FAILURES
2693                );
2694                // Reset counter to avoid spamming error logs
2695                consecutive_failures = 0;
2696            }
2697        }
2698    })
2699}
2700
2701async fn fetch_cognition_heartbeat_payload(
2702    client: &Client,
2703    config: &CognitionHeartbeatConfig,
2704) -> Option<serde_json::Value> {
2705    let status_url = format!("{}/v1/cognition/status", config.source_base_url);
2706    let status_res = tokio::time::timeout(
2707        Duration::from_millis(config.request_timeout_ms),
2708        client.get(status_url).send(),
2709    )
2710    .await
2711    .ok()?
2712    .ok()?;
2713
2714    if !status_res.status().is_success() {
2715        return None;
2716    }
2717
2718    let status: CognitionStatusSnapshot = status_res.json().await.ok()?;
2719    let mut payload = serde_json::json!({
2720        "running": status.running,
2721        "last_tick_at": status.last_tick_at,
2722        "active_persona_count": status.active_persona_count,
2723        "events_buffered": status.events_buffered,
2724        "snapshots_buffered": status.snapshots_buffered,
2725        "loop_interval_ms": status.loop_interval_ms,
2726    });
2727
2728    if config.include_thought_summary {
2729        let snapshot_url = format!("{}/v1/cognition/snapshots/latest", config.source_base_url);
2730        let snapshot_res = tokio::time::timeout(
2731            Duration::from_millis(config.request_timeout_ms),
2732            client.get(snapshot_url).send(),
2733        )
2734        .await
2735        .ok()
2736        .and_then(Result::ok);
2737
2738        if let Some(snapshot_res) = snapshot_res
2739            && snapshot_res.status().is_success()
2740            && let Ok(snapshot) = snapshot_res.json::<CognitionLatestSnapshot>().await
2741            && let Some(obj) = payload.as_object_mut()
2742        {
2743            obj.insert(
2744                "latest_snapshot_at".to_string(),
2745                serde_json::Value::String(snapshot.generated_at),
2746            );
2747            obj.insert(
2748                "latest_thought".to_string(),
2749                serde_json::Value::String(trim_for_heartbeat(
2750                    &snapshot.summary,
2751                    config.summary_max_chars,
2752                )),
2753            );
2754            if let Some(model) = snapshot
2755                .metadata
2756                .get("model")
2757                .and_then(serde_json::Value::as_str)
2758            {
2759                obj.insert(
2760                    "latest_thought_model".to_string(),
2761                    serde_json::Value::String(model.to_string()),
2762                );
2763            }
2764            if let Some(source) = snapshot
2765                .metadata
2766                .get("source")
2767                .and_then(serde_json::Value::as_str)
2768            {
2769                obj.insert(
2770                    "latest_thought_source".to_string(),
2771                    serde_json::Value::String(source.to_string()),
2772                );
2773            }
2774        }
2775    }
2776
2777    Some(payload)
2778}
2779
/// Trim surrounding whitespace and cap `input` at `max_chars` characters,
/// appending `"..."` when the text had to be truncated.
///
/// Fixes: the original counted leading whitespace against the character
/// budget (so a padded input could collapse to just `"..."`) and could leave
/// trailing whitespace before the ellipsis. Trimming now happens first, and
/// the kept prefix is right-trimmed before the marker is appended.
/// Truncation is char-based (`char_indices`), so multi-byte UTF-8 text is
/// never split mid-character.
fn trim_for_heartbeat(input: &str, max_chars: usize) -> String {
    let trimmed = input.trim();
    match trimmed.char_indices().nth(max_chars) {
        // Fewer than or exactly `max_chars` characters: nothing to cut.
        None => trimmed.to_string(),
        // `cut` is the byte offset of the first character past the budget.
        Some((cut, _)) => {
            let kept = trimmed[..cut].trim_end();
            let mut out = String::with_capacity(kept.len() + 3);
            out.push_str(kept);
            out.push_str("...");
            out
        }
    }
}
2792
/// Read a boolean flag from the environment variable `name`.
///
/// Accepts (case-insensitive) `1/true/yes/on` and `0/false/no/off`; an unset
/// variable or any other value yields `default`.
fn env_bool(name: &str, default: bool) -> bool {
    match std::env::var(name) {
        Ok(raw) => match raw.to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            // Unrecognized values fall back rather than guessing.
            _ => default,
        },
        Err(_) => default,
    }
}
2803
/// Read a `usize` from the environment variable `name`, falling back to
/// `default` when the variable is unset or does not parse.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
2810
/// Read a `u64` from the environment variable `name`, falling back to
/// `default` when the variable is unset or does not parse.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
2817
2818fn maybe_configure_repo_git_auth_from_entry(entry: &serde_json::Value) {
2819    let Some(workspace_id) = entry.get("id").and_then(|v| v.as_str()) else {
2820        return;
2821    };
2822    let Some(path) = entry.get("path").and_then(|v| v.as_str()) else {
2823        return;
2824    };
2825    let has_git_remote = entry
2826        .get("git_url")
2827        .and_then(|v| v.as_str())
2828        .is_some_and(|value| !value.trim().is_empty());
2829    if !has_git_remote {
2830        return;
2831    }
2832
2833    let repo_path = Path::new(path);
2834    if !repo_path.join(".git").exists() {
2835        return;
2836    }
2837
2838    if let Err(error) = configure_repo_git_auth(repo_path, workspace_id) {
2839        tracing::debug!(
2840            workspace_id,
2841            path,
2842            error = %error,
2843            "Workspace sync could not install Git credential helper"
2844        );
2845    }
2846    configure_repo_git_github_app_from_agent_config(repo_path, entry.get("agent_config"));
2847}
2848
2849fn configure_repo_git_github_app_from_agent_config(
2850    repo_path: &Path,
2851    agent_config: Option<&serde_json::Value>,
2852) {
2853    let installation_id = agent_config
2854        .and_then(|value| value.get("git_auth"))
2855        .and_then(|value| value.get("github_app"))
2856        .and_then(|value| value.get("installation_id"))
2857        .and_then(|value| value.as_str());
2858    let app_id = agent_config
2859        .and_then(|value| value.get("git_auth"))
2860        .and_then(|value| value.get("github_app"))
2861        .and_then(|value| value.get("app_id"))
2862        .and_then(|value| value.as_str());
2863    if let Err(error) = configure_repo_git_github_app(repo_path, installation_id, app_id) {
2864        tracing::debug!(path = %repo_path.display(), error = %error, "Failed to persist GitHub App repo metadata");
2865    }
2866}
2867
2868/// Start a background task that periodically fetches the server's workspace list
2869/// and auto-registers any whose path exists on this machine's filesystem.
2870/// This lets workers pick up new codebases without restarting.
2871fn start_workspace_sync(
2872    client: Client,
2873    server: String,
2874    token: Option<String>,
2875    shared_codebases: Arc<Mutex<Vec<String>>>,
2876) -> JoinHandle<()> {
2877    tokio::spawn(async move {
2878        const POLL_INTERVAL_SECS: u64 = 60;
2879        let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
2880        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2881        interval.tick().await; // skip the immediate first tick -- let the worker settle first
2882
2883        loop {
2884            interval.tick().await;
2885            if let Err(e) =
2886                sync_workspaces_from_server(&client, &server, &token, &shared_codebases).await
2887            {
2888                tracing::warn!("Workspace sync failed: {}", e);
2889            }
2890        }
2891    })
2892}
2893
2894/// Fetch the server's workspace/codebase list and add any locally-present paths
2895/// that are not already in the worker's registered codebases.
2896/// New paths take effect on the next SSE reconnect (re-register re-sends X-Workspaces).
2897async fn sync_workspaces_from_server(
2898    client: &Client,
2899    server: &str,
2900    token: &Option<String>,
2901    shared_codebases: &Arc<Mutex<Vec<String>>>,
2902) -> Result<()> {
2903    let mut req = client.get(format!("{}/v1/agent/workspaces", server));
2904    if let Some(t) = token {
2905        req = req.bearer_auth(t);
2906    }
2907
2908    let res = req.send().await?;
2909    if !res.status().is_success() {
2910        tracing::debug!(
2911            status = %res.status(),
2912            "Workspace sync: server returned non-success, skipping"
2913        );
2914        return Ok(());
2915    }
2916
2917    let data: serde_json::Value = res.json().await?;
2918
2919    // Server returns { workspaces: [...] } or { codebases: [...] }
2920    let entries = if let Some(arr) = data.as_array() {
2921        arr.clone()
2922    } else {
2923        data["workspaces"]
2924            .as_array()
2925            .or_else(|| data["codebases"].as_array())
2926            .cloned()
2927            .unwrap_or_default()
2928    };
2929
2930    let mut new_paths: Vec<String> = Vec::new();
2931    {
2932        let current = shared_codebases.lock().await;
2933        for entry in &entries {
2934            maybe_configure_repo_git_auth_from_entry(entry);
2935            let path = match entry["path"].as_str().filter(|p| !p.is_empty()) {
2936                Some(p) => p,
2937                None => continue,
2938            };
2939            // Only auto-register if the path physically exists on this machine
2940            // and is not already in the codebases list
2941            if std::path::Path::new(path).exists() && !current.iter().any(|c| c.as_str() == path) {
2942                new_paths.push(path.to_string());
2943            }
2944        }
2945    }
2946
2947    if !new_paths.is_empty() {
2948        let mut current = shared_codebases.lock().await;
2949        for path in &new_paths {
2950            tracing::info!(
2951                path = %path,
2952                "Workspace sync: auto-discovered local path, adding to codebases"
2953            );
2954            current.push(path.clone());
2955        }
2956        tracing::info!(
2957            added = new_paths.len(),
2958            total = current.len(),
2959            "Workspace sync complete -- new paths take effect on next reconnect"
2960        );
2961    } else {
2962        tracing::debug!("Workspace sync: no new local paths found");
2963    }
2964
2965    Ok(())
2966}
2967
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // metadata_bool/metadata_usize should resolve keys nested under the
    // "forage" object, not just top-level metadata keys.
    #[test]
    fn metadata_lookup_reads_nested_forage_keys() {
        let metadata = json!({
            "forage": {
                "execute": true,
                "top": 5,
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert_eq!(metadata_bool(&metadata, &["execute"]), Some(true));
        assert_eq!(metadata_usize(&metadata, &["top"]), Some(5));
    }

    // With empty metadata, the task prompt becomes the single moonshot,
    // S3 upload is disabled (local mode), and the execution engine defaults
    // to "run"; the explicitly supplied model is passed through.
    #[test]
    fn build_forage_args_defaults_prompt_to_moonshot_and_local_mode() {
        let metadata = serde_json::Map::new();
        let args = build_forage_args(
            "Expand enterprise adoption in regulated markets",
            "forage task",
            &metadata,
            Some("openrouter/z-ai/glm-5".to_string()),
        );

        assert_eq!(
            args.moonshots,
            vec!["Expand enterprise adoption in regulated markets".to_string()]
        );
        assert!(args.no_s3);
        assert_eq!(args.model.as_deref(), Some("openrouter/z-ai/glm-5"));
        assert_eq!(args.execution_engine, "run");
    }

    // Every supported key under metadata["forage"] should override the
    // corresponding ForageArgs field, including string-encoded numbers
    // (moonshot_min_alignment is given as "0.25").
    #[test]
    fn build_forage_args_honors_nested_forage_configuration() {
        let metadata = json!({
            "forage": {
                "top": 7,
                "loop": true,
                "max_cycles": 2,
                "execute": true,
                "no_s3": false,
                "moonshots": ["Ship autonomous OKR execution", "Reduce operator toil"],
                "moonshot_required": true,
                "moonshot_min_alignment": "0.25",
                "execution_engine": "swarm",
                "run_timeout_secs": 1200,
                "fail_fast": true,
                "swarm_strategy": "stage",
                "swarm_max_subagents": 4,
                "swarm_max_steps": 42,
                "swarm_subagent_timeout_secs": 180
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        let args = build_forage_args("fallback prompt", "title", &metadata, None);

        assert_eq!(args.top, 7);
        assert!(args.loop_mode);
        assert_eq!(args.max_cycles, 2);
        assert!(args.execute);
        assert!(!args.no_s3);
        assert_eq!(args.moonshots.len(), 2);
        assert!(args.moonshot_required);
        assert!((args.moonshot_min_alignment - 0.25).abs() < f64::EPSILON);
        assert_eq!(args.execution_engine, "swarm");
        assert_eq!(args.run_timeout_secs, 1200);
        assert!(args.fail_fast);
        assert_eq!(args.swarm_strategy, "stage");
        assert_eq!(args.swarm_max_subagents, 4);
        assert_eq!(args.swarm_max_steps, 42);
        assert_eq!(args.swarm_subagent_timeout_secs, 180);
    }

    // CODETETHER_WORKER_ID should win over any other worker-id source.
    // The test saves and restores the variable around the call; the
    // assertion runs after restoration so the environment is left clean
    // even when it fails.
    // NOTE(review): mutating process-global env is racy if another test in
    // this binary touches the same variable in parallel — currently this is
    // the only such test.
    #[test]
    fn resolve_worker_id_prefers_env() {
        let original = std::env::var("CODETETHER_WORKER_ID").ok();
        unsafe {
            std::env::set_var("CODETETHER_WORKER_ID", "harvester-test-worker");
        }
        let resolved = resolve_worker_id();
        match original {
            Some(value) => unsafe {
                std::env::set_var("CODETETHER_WORKER_ID", value);
            },
            None => unsafe {
                std::env::remove_var("CODETETHER_WORKER_ID");
            },
        }
        assert_eq!(resolved, "harvester-test-worker");
    }

    // A public URL yields http.base_url (trailing slash stripped) plus the
    // derived bus stream/publish endpoints.
    #[test]
    fn advertised_interfaces_include_http_and_bus_urls() {
        let interfaces = advertised_interfaces(Some("http://worker.test:8080/"));
        assert_eq!(interfaces["http"]["base_url"], "http://worker.test:8080");
        assert_eq!(
            interfaces["bus"]["stream_url"],
            "http://worker.test:8080/v1/bus/stream"
        );
        assert_eq!(
            interfaces["bus"]["publish_url"],
            "http://worker.test:8080/v1/bus/publish"
        );
    }

    // No URL, or a blank-only URL, should advertise nothing at all.
    #[test]
    fn advertised_interfaces_omit_empty_public_url() {
        assert_eq!(advertised_interfaces(None), serde_json::json!({}));
        assert_eq!(advertised_interfaces(Some("   ")), serde_json::json!({}));
    }

    // With capacity 2: first claim of a task reserves a slot, re-claiming the
    // same task reports AlreadyProcessing (not a second slot), and a third
    // distinct task is rejected with AtCapacity.
    #[tokio::test]
    async fn reserve_task_slot_enforces_capacity() {
        let processing = Arc::new(Mutex::new(HashSet::new()));

        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::AlreadyProcessing
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-2", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-3", 2).await,
            TaskReservation::AtCapacity
        );
    }

    // A configured limit of 0 must be clamped up to 1 so the worker can
    // always make progress; positive values pass through unchanged.
    #[test]
    fn normalize_max_concurrent_tasks_never_returns_zero() {
        assert_eq!(normalize_max_concurrent_tasks(0), 1);
        assert_eq!(normalize_max_concurrent_tasks(3), 3);
    }
}