Skip to main content

codetether_agent/a2a/
worker.rs

1//! A2A Worker - connects to an A2A server to process tasks
2
3mod clone_location;
4mod clone_target;
5mod model_defaults;
6mod model_preferences;
7pub(crate) mod task_timeline;
8pub(super) mod workspace_resolve;
9
10use crate::a2a::claim::TaskClaimResponse;
11use crate::a2a::git_credentials::{
12    configure_repo_git_auth, configure_repo_git_github_app, write_git_credential_helper_script,
13};
14use crate::a2a::worker_workspace_record::{RegisteredWorkspaceRecord, fetch_workspace_record};
15use crate::bus::AgentBus;
16use crate::cli::{A2aArgs, ForageArgs};
17use crate::provenance::{ClaimProvenance, install_commit_msg_hook};
18use crate::provider::ProviderRegistry;
19use crate::session::Session;
20use crate::swarm::{DecompositionStrategy, SwarmConfig, SwarmExecutor};
21use crate::tui::swarm_view::SwarmEvent;
22use anyhow::{Context, Result};
23use futures::StreamExt;
24use reqwest::Client;
25use serde::Deserialize;
26use std::collections::HashMap;
27use std::collections::HashSet;
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::process::Command;
32use tokio::sync::{Mutex, mpsc};
33use tokio::task::JoinHandle;
34use tokio::time::Instant;
35
36use self::model_defaults::{default_model_for_provider, prefers_temperature_one};
37use self::model_preferences::choose_provider_for_tier;
38use crate::a2a::task_scope::check_task_scope;
39use crate::a2a::worker_tool_registry::{create_filtered_registry, is_tool_allowed};
40use crate::a2a::worker_workspace_context::resolve_task_workspace_dir;
41use clone_location::{git_clone_base_dir, resolve_workspace_clone_path};
42use clone_target::prepare_clone_target;
43
/// Coarse activity state of the worker, reported through the heartbeat.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStatus {
    /// Rendered as `"idle"` by [`WorkerStatus::as_str`].
    Idle,
    /// Rendered as `"processing"` by [`WorkerStatus::as_str`].
    Processing,
}

impl WorkerStatus {
    /// Returns the lowercase string form of this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Idle => "idle",
            Self::Processing => "processing",
        }
    }
}
59
60/// Heartbeat state shared between the heartbeat task and the main worker
61#[derive(Clone)]
62pub struct HeartbeatState {
63    worker_id: String,
64    pub agent_name: String,
65    pub status: Arc<Mutex<WorkerStatus>>,
66    pub active_task_count: Arc<Mutex<usize>>,
67    pub sub_agents: Arc<Mutex<HashSet<String>>>,
68}
69
70impl HeartbeatState {
71    pub fn new(worker_id: String, agent_name: String) -> Self {
72        Self {
73            worker_id,
74            agent_name,
75            status: Arc::new(Mutex::new(WorkerStatus::Idle)),
76            active_task_count: Arc::new(Mutex::new(0)),
77            sub_agents: Arc::new(Mutex::new(HashSet::new())),
78        }
79    }
80
81    pub async fn set_status(&self, status: WorkerStatus) {
82        *self.status.lock().await = status;
83    }
84
85    pub async fn set_task_count(&self, count: usize) {
86        *self.active_task_count.lock().await = count;
87    }
88
89    pub async fn register_sub_agent(&self, agent_name: String) {
90        self.sub_agents.lock().await.insert(agent_name);
91    }
92
93    pub async fn deregister_sub_agent(&self, agent_name: &str) {
94        self.sub_agents.lock().await.remove(agent_name);
95    }
96
97    pub async fn sub_agents_snapshot(&self) -> Vec<String> {
98        let mut names = self
99            .sub_agents
100            .lock()
101            .await
102            .iter()
103            .cloned()
104            .collect::<Vec<_>>();
105        names.sort();
106        names
107    }
108}
109
110#[derive(Clone, Debug)]
111#[allow(dead_code)]
112pub struct CognitionHeartbeatConfig {
113    pub enabled: bool,
114    pub source_base_url: String,
115    pub token: Option<String>,
116    pub provider_name: String,
117    pub interval_secs: u64,
118    pub include_thought_summary: bool,
119    pub summary_max_chars: usize,
120    pub request_timeout_ms: u64,
121}
122
123impl CognitionHeartbeatConfig {
124    pub fn from_env() -> Self {
125        let source_base_url = std::env::var("CODETETHER_WORKER_COGNITION_SOURCE_URL")
126            .unwrap_or_else(|_| "http://127.0.0.1:4096".to_string())
127            .trim_end_matches('/')
128            .to_string();
129
130        Self {
131            enabled: env_bool("CODETETHER_WORKER_COGNITION_SHARE_ENABLED", true),
132            source_base_url,
133            include_thought_summary: env_bool("CODETETHER_WORKER_COGNITION_INCLUDE_THOUGHTS", true),
134            summary_max_chars: env_usize("CODETETHER_WORKER_COGNITION_THOUGHT_MAX_CHARS", 480),
135            request_timeout_ms: env_u64("CODETETHER_WORKER_COGNITION_TIMEOUT_MS", 2_500).max(250),
136            interval_secs: env_u64("CODETETHER_WORKER_COGNITION_INTERVAL_SECS", 30).max(5),
137            provider_name: std::env::var("CODETETHER_WORKER_COGNITION_PROVIDER")
138                .unwrap_or_else(|_| "cognition".to_string()),
139            token: std::env::var("CODETETHER_WORKER_COGNITION_TOKEN").ok(),
140        }
141    }
142}
143
/// Cognition status payload deserialized from JSON.
///
/// Only `running` is required; every other field falls back to its type's
/// default value when it is missing from the payload.
///
/// NOTE(review): presumably the response of the cognition source's status
/// endpoint — the code that fetches it is outside this view; confirm there.
#[derive(Debug, Deserialize)]
struct CognitionStatusSnapshot {
    /// Required flag; deserialization fails if it is absent.
    running: bool,
    /// Timestamp string of the last tick, if reported (format not visible here).
    #[serde(default)]
    last_tick_at: Option<String>,
    /// Reported count of active personas (0 when absent).
    #[serde(default)]
    active_persona_count: usize,
    /// Reported count of buffered events (0 when absent).
    #[serde(default)]
    events_buffered: usize,
    /// Reported count of buffered snapshots (0 when absent).
    #[serde(default)]
    snapshots_buffered: usize,
    /// Reported loop interval in milliseconds (0 when absent).
    #[serde(default)]
    loop_interval_ms: u64,
}
158
/// Latest cognition snapshot payload deserialized from JSON.
///
/// `generated_at` and `summary` are required; `metadata` defaults to an empty
/// map when it is missing from the payload.
#[derive(Debug, Deserialize)]
struct CognitionLatestSnapshot {
    /// Generation timestamp as a string (format not visible here).
    generated_at: String,
    /// Summary text carried by the snapshot.
    summary: String,
    /// Arbitrary JSON metadata keyed by name; empty when absent.
    #[serde(default)]
    metadata: HashMap<String, serde_json::Value>,
}
166
/// Outcome of trying to reserve a processing slot for a task
/// (see `reserve_task_slot`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskReservation {
    /// The task ID was inserted into the processing set.
    Reserved,
    /// The task ID was already in the processing set; nothing changed.
    AlreadyProcessing,
    /// The processing set already holds the maximum number of tasks.
    AtCapacity,
}
173
/// Bundle of shared handles and settings passed to the task-handling code
/// (pending-task fetch and the server stream).
///
/// Cloning is cheap for the shared parts: `processing`, `bus` and
/// `task_progress` are `Arc`s, so clones refer to the same state.
#[derive(Clone)]
struct WorkerTaskRuntime {
    /// HTTP client used to talk to the A2A server.
    client: Client,
    /// Base URL of the A2A server (trailing slashes already trimmed by `run`).
    server: String,
    /// Optional auth token for the server.
    token: Option<String>,
    /// Identifier of this worker.
    worker_id: String,
    /// Human-readable name of this worker.
    agent_name: String,
    /// IDs of the tasks currently being processed.
    processing: Arc<Mutex<HashSet<String>>>,
    /// Upper bound on the size of `processing` (at least 1).
    max_concurrent_tasks: usize,
    /// Tool auto-approval policy for executed tasks.
    auto_approve: AutoApprove,
    /// In-process bus for sub-agent communication.
    bus: Arc<AgentBus>,
    /// Shared progress state for the currently active task (read by heartbeat).
    task_progress: Arc<Mutex<task_timeline::TaskProgressState>>,
    /// Server-side workspace IDs this worker is scoped to (empty = accept all).
    workspace_ids: Vec<String>,
}
190
191// Run the A2A worker
192pub async fn run(args: A2aArgs) -> Result<()> {
193    let server = args.server.trim_end_matches('/');
194    let name = args
195        .name
196        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
197    let worker_id = resolve_worker_id();
198    export_worker_runtime_env(server, &args.token, &worker_id);
199    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
200
201    let codebases: Vec<String> = args
202        .workspaces
203        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
204        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
205
206    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
207    tracing::info!("Server: {}", server);
208    tracing::info!("Workspaces: {:?}", codebases);
209    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
210
211    // Wrap in shared mutex so background workspace-sync can add new local paths
212    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
213
214    let client = build_worker_http_client()?;
215    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
216    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
217    if cognition_heartbeat.enabled {
218        tracing::info!(
219            source = %cognition_heartbeat.source_base_url,
220            include_thoughts = cognition_heartbeat.include_thought_summary,
221            max_chars = cognition_heartbeat.summary_max_chars,
222            timeout_ms = cognition_heartbeat.request_timeout_ms,
223            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
224        );
225    } else {
226        tracing::warn!(
227            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
228        );
229    }
230
231    let auto_approve = match args.auto_approve.as_str() {
232        "all" => AutoApprove::All,
233        "safe" => AutoApprove::Safe,
234        _ => AutoApprove::None,
235    };
236
237    // Create heartbeat state
238    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
239
240    // Create agent bus for in-process sub-agent communication
241    let bus = AgentBus::new().into_arc();
242
243    // Auto-start S3 sink if MinIO is configured
244    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
245
246    {
247        let handle = bus.handle(&worker_id);
248        handle.announce_ready(worker_capabilities());
249    }
250
251    let task_progress: Arc<Mutex<task_timeline::TaskProgressState>> =
252        Arc::new(Mutex::new(task_timeline::TaskProgressState::new()));
253
254    let task_runtime = WorkerTaskRuntime {
255        client: client.clone(),
256        server: server.to_string(),
257        token: args.token.clone(),
258        worker_id: worker_id.clone(),
259        agent_name: name.clone(),
260        processing: processing.clone(),
261        max_concurrent_tasks,
262        auto_approve,
263        bus: bus.clone(),
264        task_progress: task_progress.clone(),
265        workspace_ids: Vec::new(),
266    };
267
268    // Register worker
269    {
270        let codebases = shared_codebases.lock().await.clone();
271        register_worker(
272            &client,
273            server,
274            &args.token,
275            &worker_id,
276            &name,
277            &codebases,
278            args.public_url.as_deref(),
279        )
280        .await?;
281    }
282
283    if let Err(e) =
284        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
285    {
286        tracing::warn!(error = %e, "Initial workspace sync failed");
287    }
288
289    // Fetch pending tasks
290    fetch_pending_tasks(&task_runtime).await?;
291
292    // Start background task that polls server for new locally-present workspaces
293    let _workspace_sync_handle = start_workspace_sync(
294        client.clone(),
295        server.to_string(),
296        args.token.clone(),
297        shared_codebases.clone(),
298    );
299
300    // Connect to SSE stream
301    loop {
302        // Refresh codebases snapshot (background sync may have added new local paths)
303        let codebases = shared_codebases.lock().await.clone();
304
305        // Re-register worker on each reconnection to report updated models/capabilities
306        if let Err(e) = register_worker(
307            &client,
308            server,
309            &args.token,
310            &worker_id,
311            &name,
312            &codebases,
313            args.public_url.as_deref(),
314        )
315        .await
316        {
317            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
318        }
319        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
320            tracing::warn!("Reconnect task fetch failed: {}", e);
321        }
322
323        // Start heartbeat task for this connection
324        let heartbeat_handle = start_heartbeat(
325            client.clone(),
326            server.to_string(),
327            args.token.clone(),
328            heartbeat_state.clone(),
329            processing.clone(),
330            cognition_heartbeat.clone(),
331            task_progress.clone(),
332        );
333
334        match connect_stream(&task_runtime, &name, &codebases, None).await {
335            Ok(StreamDisconnectReason::Ended) => {
336                tracing::warn!("Stream ended, reconnecting...");
337            }
338            Ok(StreamDisconnectReason::ReadError(error)) => {
339                tracing::warn!(error = %error, "Stream read failed, reconnecting...");
340            }
341            Err(e) => {
342                tracing::error!("Stream error: {}, reconnecting...", e);
343            }
344        }
345
346        // Cancel heartbeat on disconnection
347        heartbeat_handle.abort();
348        tracing::debug!("Heartbeat cancelled for reconnection");
349
350        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
351    }
352}
353
354/// Run the A2A worker with shared state for HTTP server integration
355/// This variant accepts a WorkerServerState to communicate with the HTTP server
356pub async fn run_with_state(
357    args: A2aArgs,
358    server_state: crate::worker_server::WorkerServerState,
359) -> Result<()> {
360    let server = args.server.trim_end_matches('/');
361    let name = args
362        .name
363        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
364    let worker_id = resolve_worker_id();
365    export_worker_runtime_env(server, &args.token, &worker_id);
366    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
367
368    // Share worker_id with HTTP server
369    server_state.set_worker_id(worker_id.clone()).await;
370
371    let codebases: Vec<String> = args
372        .workspaces
373        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
374        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
375
376    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
377    tracing::info!("Server: {}", server);
378    tracing::info!("Workspaces: {:?}", codebases);
379    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
380
381    // Wrap in shared mutex so background workspace-sync can add new local paths
382    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
383
384    let client = build_worker_http_client()?;
385    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
386    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
387    if cognition_heartbeat.enabled {
388        tracing::info!(
389            source = %cognition_heartbeat.source_base_url,
390            include_thoughts = cognition_heartbeat.include_thought_summary,
391            max_chars = cognition_heartbeat.summary_max_chars,
392            timeout_ms = cognition_heartbeat.request_timeout_ms,
393            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
394        );
395    } else {
396        tracing::warn!(
397            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
398        );
399    }
400
401    let auto_approve = match args.auto_approve.as_str() {
402        "all" => AutoApprove::All,
403        "safe" => AutoApprove::Safe,
404        _ => AutoApprove::None,
405    };
406
407    // Create heartbeat state
408    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
409
410    // Share heartbeat state with HTTP server
411    server_state
412        .set_heartbeat_state(Arc::new(heartbeat_state.clone()))
413        .await;
414
415    // Create agent bus for in-process sub-agent communication
416    let bus = AgentBus::new().into_arc();
417    server_state.set_bus(bus.clone()).await;
418
419    // Auto-start S3 sink if MinIO is configured
420    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
421
422    {
423        let handle = bus.handle(&worker_id);
424        handle.announce_ready(worker_capabilities());
425    }
426
427    let task_progress_ws: Arc<Mutex<task_timeline::TaskProgressState>> =
428        Arc::new(Mutex::new(task_timeline::TaskProgressState::new()));
429
430    let task_runtime = WorkerTaskRuntime {
431        client: client.clone(),
432        server: server.to_string(),
433        token: args.token.clone(),
434        worker_id: worker_id.clone(),
435        agent_name: name.clone(),
436        processing: processing.clone(),
437        max_concurrent_tasks,
438        auto_approve,
439        bus: bus.clone(),
440        task_progress: task_progress_ws.clone(),
441        workspace_ids: Vec::new(),
442    };
443
444    // Register worker
445    {
446        let codebases = shared_codebases.lock().await.clone();
447        register_worker(
448            &client,
449            server,
450            &args.token,
451            &worker_id,
452            &name,
453            &codebases,
454            args.public_url.as_deref(),
455        )
456        .await?;
457    }
458
459    if let Err(e) =
460        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
461    {
462        tracing::warn!(error = %e, "Initial workspace sync failed");
463    }
464
465    // Mark as connected
466    server_state.set_connected(true).await;
467
468    // Fetch pending tasks before entering reconnection loop
469    fetch_pending_tasks(&task_runtime).await?;
470
471    // Start background task that polls server for new locally-present workspaces
472    let _workspace_sync_handle = start_workspace_sync(
473        client.clone(),
474        server.to_string(),
475        args.token.clone(),
476        shared_codebases.clone(),
477    );
478
479    // Connect to SSE stream
480    loop {
481        // Refresh codebases snapshot (background sync may have added new local paths)
482        let codebases = shared_codebases.lock().await.clone();
483
484        // Create task notification channel for CloudEvent-triggered task execution
485        // Recreate on each reconnection since the receiver is moved into connect_stream
486        let (task_notify_tx, task_notify_rx) = mpsc::channel::<String>(32);
487        server_state
488            .set_task_notification_channel(task_notify_tx)
489            .await;
490
491        // Mark as connected on each reconnection
492        server_state.set_connected(true).await;
493
494        // Re-register worker on each reconnection to report updated models/capabilities
495        if let Err(e) = register_worker(
496            &client,
497            server,
498            &args.token,
499            &worker_id,
500            &name,
501            &codebases,
502            args.public_url.as_deref(),
503        )
504        .await
505        {
506            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
507        }
508        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
509            tracing::warn!("Reconnect task fetch failed: {}", e);
510        }
511
512        // Start heartbeat task for this connection
513        let heartbeat_handle = start_heartbeat(
514            client.clone(),
515            server.to_string(),
516            args.token.clone(),
517            heartbeat_state.clone(),
518            processing.clone(),
519            cognition_heartbeat.clone(),
520            task_progress_ws.clone(),
521        );
522
523        match connect_stream(&task_runtime, &name, &codebases, Some(task_notify_rx)).await {
524            Ok(StreamDisconnectReason::Ended) => {
525                tracing::warn!("Stream ended, reconnecting...");
526            }
527            Ok(StreamDisconnectReason::ReadError(error)) => {
528                tracing::warn!(error = %error, "Stream read failed, reconnecting...");
529            }
530            Err(e) => {
531                tracing::error!("Stream error: {}, reconnecting...", e);
532            }
533        }
534
535        // Mark as disconnected
536        server_state.set_connected(false).await;
537
538        // Cancel heartbeat on disconnection
539        heartbeat_handle.abort();
540        tracing::debug!("Heartbeat cancelled for reconnection");
541
542        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
543    }
544}
545
546pub fn generate_worker_id() -> String {
547    format!(
548        "wrk_{}_{:x}",
549        chrono::Utc::now().timestamp(),
550        rand::random::<u64>()
551    )
552}
553
554fn build_worker_http_client() -> Result<Client> {
555    Client::builder()
556        .no_gzip()
557        .no_brotli()
558        .no_zstd()
559        .no_deflate()
560        .build()
561        .context("Failed to build worker HTTP client")
562}
563
564fn resolve_worker_id() -> String {
565    for key in ["CODETETHER_WORKER_ID", "A2A_WORKER_ID"] {
566        if let Ok(value) = std::env::var(key) {
567            let trimmed = value.trim();
568            if !trimmed.is_empty() {
569                return trimmed.to_string();
570            }
571        }
572    }
573    generate_worker_id()
574}
575
/// Ensures the concurrency limit is at least one: zero becomes 1, any other
/// value is returned unchanged.
fn normalize_max_concurrent_tasks(max_concurrent_tasks: usize) -> usize {
    if max_concurrent_tasks == 0 {
        1
    } else {
        max_concurrent_tasks
    }
}
579
/// Reports whether `PERSISTENT_WORKER_ENABLED` is set to a truthy value.
///
/// Truthy values are `1`, `true`, `yes` and `on`, compared after trimming and
/// ASCII-lowercasing. An unset (or non-Unicode) variable counts as disabled.
fn persistent_worker_enabled() -> bool {
    let Ok(raw) = std::env::var("PERSISTENT_WORKER_ENABLED") else {
        return false;
    };
    let flag = raw.trim().to_ascii_lowercase();
    ["1", "true", "yes", "on"].contains(&flag.as_str())
}
590
/// Reads the lease length in seconds from `PERSISTENT_WORKER_LEASE_SECONDS`.
///
/// Falls back to 3600 when the variable is unset, does not parse as a `u64`
/// after trimming, or is zero.
fn persistent_worker_lease_seconds() -> u64 {
    const DEFAULT_LEASE_SECONDS: u64 = 3600;

    let Ok(raw) = std::env::var("PERSISTENT_WORKER_LEASE_SECONDS") else {
        return DEFAULT_LEASE_SECONDS;
    };
    match raw.trim().parse::<u64>() {
        Ok(seconds) if seconds > 0 => seconds,
        _ => DEFAULT_LEASE_SECONDS,
    }
}
598
599async fn reserve_task_slot(
600    processing: &Arc<Mutex<HashSet<String>>>,
601    task_id: &str,
602    max_concurrent_tasks: usize,
603) -> TaskReservation {
604    let mut proc = processing.lock().await;
605    if proc.contains(task_id) {
606        TaskReservation::AlreadyProcessing
607    } else if proc.len() >= max_concurrent_tasks {
608        TaskReservation::AtCapacity
609    } else {
610        proc.insert(task_id.to_string());
611        TaskReservation::Reserved
612    }
613}
614
/// Publishes the worker's connection details as process-wide environment
/// variables: `CODETETHER_SERVER`, `CODETETHER_WORKER_ID`, and — only when a
/// token is supplied — `CODETETHER_TOKEN`.
///
/// When `token` is `None`, any existing `CODETETHER_TOKEN` is left untouched.
fn export_worker_runtime_env(server: &str, token: &Option<String>, worker_id: &str) {
    let mut exports: Vec<(&str, &str)> = vec![
        ("CODETETHER_SERVER", server),
        ("CODETETHER_WORKER_ID", worker_id),
    ];
    if let Some(secret) = token.as_deref() {
        exports.push(("CODETETHER_TOKEN", secret));
    }

    for (key, value) in exports {
        // SAFETY: The worker sets these process-wide variables once during startup before
        // spawning Git helper child processes. They are required so Git credential helpers
        // invoked by later shell/git commands can reach the control plane securely.
        unsafe {
            std::env::set_var(key, value);
        }
    }
}
627
628fn advertised_interfaces(public_url: Option<&str>) -> serde_json::Value {
629    let Some(base_url) = public_url
630        .map(str::trim)
631        .filter(|value| !value.is_empty())
632        .map(|value| value.trim_end_matches('/').to_string())
633    else {
634        return serde_json::json!({});
635    };
636
637    serde_json::json!({
638        "http": {
639            "base_url": base_url,
640        },
641        "bus": {
642            "stream_url": format!("{base_url}/v1/bus/stream"),
643            "publish_url": format!("{base_url}/v1/bus/publish"),
644        },
645    })
646}
647
/// Approval policy used by the worker when deciding whether to execute tools automatically.
///
/// The worker entry points select a variant from the `auto_approve` argument:
/// `"all"` and `"safe"` map to the matching variants, and any other value maps
/// to [`AutoApprove::None`].
///
/// # Examples
///
/// ```rust
/// use codetether_agent::a2a::worker::AutoApprove;
///
/// let mode = AutoApprove::Safe;
/// assert!(matches!(mode, AutoApprove::Safe));
/// ```
#[derive(Debug, Clone, Copy)]
pub enum AutoApprove {
    /// Selected by `"all"`.
    All,
    /// Selected by `"safe"`.
    Safe,
    /// Selected by any other value; the fallback policy.
    None,
}
664
/// Default A2A server URL when none is configured.
///
/// Stored without a trailing slash.
pub const DEFAULT_A2A_SERVER_URL: &str = "https://api.codetether.run";
667
/// Capabilities of the codetether-agent worker that are always advertised.
///
/// `worker_capabilities` starts from this list and may append
/// environment-dependent entries.
const BASE_WORKER_CAPABILITIES: &[&str] = &[
    "forage", "ralph", "swarm", "rlm", "a2a", "mcp", "grpc", "grpc-web", "jsonrpc",
];
672
673fn worker_capabilities() -> Vec<String> {
674    let mut capabilities: Vec<String> = BASE_WORKER_CAPABILITIES
675        .iter()
676        .map(|capability| capability.to_string())
677        .collect();
678
679    let is_knative = std::env::var("KNATIVE_SERVICE")
680        .map(|value| {
681            let normalized = value.trim().to_lowercase();
682            normalized == "1" || normalized == "true" || normalized == "yes"
683        })
684        .unwrap_or(false);
685    if is_knative {
686        capabilities.push("knative".to_string());
687    }
688
689    capabilities
690}
691
692/// Copy timeline progress into the runtime's shared progress state for heartbeat.
693async fn sync_timeline_to_runtime(
694    timeline: &task_timeline::TaskTimeline,
695    runtime: &WorkerTaskRuntime,
696) {
697    timeline.sync_progress().await;
698    let state = timeline.progress_handle().lock().await.clone();
699    *runtime.task_progress.lock().await = state;
700}
701
702/// Resolve workspace IDs for the configured workspace roots and log diagnostics.
703async fn resolve_and_log_workspace_ids(
704    client: &Client,
705    server: &str,
706    token: &Option<String>,
707    workspace_roots: &[String],
708) -> Vec<String> {
709    match workspace_resolve::resolve_workspace_ids(client, server, token, workspace_roots).await {
710        Ok(resolved) => {
711            if resolved.is_empty() {
712                tracing::warn!(
713                    roots = ?workspace_roots,
714                    "No server-side workspace IDs resolved for configured roots \
715                     — the server may not route tasks to this worker. \
716                     Ensure workspaces are registered with the control plane \
717                     and that their paths fall under the configured roots."
718                );
719            } else {
720                let ids: Vec<String> = resolved.iter().map(|ws| ws.id.clone()).collect();
721                tracing::info!(
722                    workspace_ids = ?ids,
723                    count = resolved.len(),
724                    "Resolved server-side workspace IDs for configured roots"
725                );
726            }
727            resolved.iter().map(|ws| ws.id.clone()).collect()
728        }
729        Err(e) => {
730            tracing::warn!(error = %e, "Failed to resolve workspace IDs from server");
731            Vec::new()
732        }
733    }
734}
735
736fn task_value<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a serde_json::Value> {
737    task.get("task")
738        .and_then(|t| t.get(key))
739        .or_else(|| task.get(key))
740}
741
742fn task_str<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a str> {
743    task_value(task, key).and_then(|v| v.as_str())
744}
745
746fn task_u64(task: &serde_json::Value, key: &str) -> Option<u64> {
747    let value = task_value(task, key)?;
748    if let Some(v) = value.as_u64() {
749        return Some(v);
750    }
751    if let Some(v) = value.as_i64()
752        && v >= 0
753    {
754        return Some(v as u64);
755    }
756    if let Some(v) = value.as_str()
757        && let Ok(parsed) = v.trim().parse::<u64>()
758    {
759        return Some(parsed);
760    }
761    None
762}
763
764fn task_metadata(task: &serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
765    task_value(task, "metadata")
766        .and_then(|m| m.as_object())
767        .cloned()
768        .unwrap_or_default()
769}
770
771fn task_timeout_secs(
772    task: &serde_json::Value,
773    metadata: &serde_json::Map<String, serde_json::Value>,
774) -> u64 {
775    task_u64(task, "task_timeout_seconds")
776        .or_else(|| task_u64(task, "timeout_secs"))
777        .or_else(|| {
778            metadata_u64(
779                metadata,
780                &["task_timeout_seconds", "timeout_secs", "timeout"],
781            )
782        })
783        .unwrap_or(1200)
784        .clamp(60, 604800)
785}
786
/// Converts a `provider:model` reference into `provider/model` form.
///
/// Only the first `:` is replaced, and only when the input contains no `/`.
/// A reference that already has a `/` (e.g. `provider/amazon.nova-micro-v1:0`)
/// is returned unchanged, so version-separator colons in such model IDs
/// survive.
///
/// NOTE(review): an input with a colon but no slash is always treated as
/// `provider:model`, so a bare `model:version` string would be rewritten too.
fn model_ref_to_provider_model(model: &str) -> String {
    if model.contains('/') {
        return model.to_string();
    }
    match model.split_once(':') {
        Some((provider, rest)) => format!("{provider}/{rest}"),
        None => model.to_string(),
    }
}
797
/// Reports whether `agent_type` names a swarm-style agent: `swarm`,
/// `parallel` or `multi-agent`, ignoring surrounding whitespace and ASCII
/// case.
fn is_swarm_agent(agent_type: &str) -> bool {
    let kind = agent_type.trim();
    ["swarm", "parallel", "multi-agent"]
        .iter()
        .any(|alias| kind.eq_ignore_ascii_case(alias))
}
804
/// Reports whether `agent_type` is `forage`, ignoring surrounding whitespace
/// and ASCII case.
fn is_forage_agent(agent_type: &str) -> bool {
    matches!(
        agent_type.trim().to_ascii_lowercase().as_str(),
        "forage"
    )
}
808
809fn metadata_lookup<'a>(
810    metadata: &'a serde_json::Map<String, serde_json::Value>,
811    key: &str,
812) -> Option<&'a serde_json::Value> {
813    metadata
814        .get(key)
815        .or_else(|| {
816            metadata
817                .get("routing")
818                .and_then(|v| v.as_object())
819                .and_then(|obj| obj.get(key))
820        })
821        .or_else(|| {
822            metadata
823                .get("swarm")
824                .and_then(|v| v.as_object())
825                .and_then(|obj| obj.get(key))
826        })
827        .or_else(|| {
828            metadata
829                .get("forage")
830                .and_then(|v| v.as_object())
831                .and_then(|obj| obj.get(key))
832        })
833}
834
835fn metadata_str(
836    metadata: &serde_json::Map<String, serde_json::Value>,
837    keys: &[&str],
838) -> Option<String> {
839    for key in keys {
840        if let Some(value) = metadata_lookup(metadata, key).and_then(|v| v.as_str()) {
841            let trimmed = value.trim();
842            if !trimmed.is_empty() {
843                return Some(trimmed.to_string());
844            }
845        }
846    }
847    None
848}
849
850fn metadata_usize(
851    metadata: &serde_json::Map<String, serde_json::Value>,
852    keys: &[&str],
853) -> Option<usize> {
854    for key in keys {
855        if let Some(value) = metadata_lookup(metadata, key) {
856            if let Some(v) = value.as_u64() {
857                return usize::try_from(v).ok();
858            }
859            if let Some(v) = value.as_i64()
860                && v >= 0
861            {
862                return usize::try_from(v as u64).ok();
863            }
864            if let Some(v) = value.as_str()
865                && let Ok(parsed) = v.trim().parse::<usize>()
866            {
867                return Some(parsed);
868            }
869        }
870    }
871    None
872}
873
874fn metadata_u64(
875    metadata: &serde_json::Map<String, serde_json::Value>,
876    keys: &[&str],
877) -> Option<u64> {
878    for key in keys {
879        if let Some(value) = metadata_lookup(metadata, key) {
880            if let Some(v) = value.as_u64() {
881                return Some(v);
882            }
883            if let Some(v) = value.as_i64()
884                && v >= 0
885            {
886                return Some(v as u64);
887            }
888            if let Some(v) = value.as_str()
889                && let Ok(parsed) = v.trim().parse::<u64>()
890            {
891                return Some(parsed);
892            }
893        }
894    }
895    None
896}
897
898fn metadata_bool(
899    metadata: &serde_json::Map<String, serde_json::Value>,
900    keys: &[&str],
901) -> Option<bool> {
902    for key in keys {
903        if let Some(value) = metadata_lookup(metadata, key) {
904            if let Some(v) = value.as_bool() {
905                return Some(v);
906            }
907            if let Some(v) = value.as_str() {
908                match v.trim().to_ascii_lowercase().as_str() {
909                    "1" | "true" | "yes" | "on" => return Some(true),
910                    "0" | "false" | "no" | "off" => return Some(false),
911                    _ => {}
912                }
913            }
914        }
915    }
916    None
917}
918
919fn metadata_f64(
920    metadata: &serde_json::Map<String, serde_json::Value>,
921    keys: &[&str],
922) -> Option<f64> {
923    for key in keys {
924        if let Some(value) = metadata_lookup(metadata, key) {
925            if let Some(v) = value.as_f64() {
926                return Some(v);
927            }
928            if let Some(v) = value.as_i64() {
929                return Some(v as f64);
930            }
931            if let Some(v) = value.as_u64() {
932                return Some(v as f64);
933            }
934            if let Some(v) = value.as_str()
935                && let Ok(parsed) = v.trim().parse::<f64>()
936            {
937                return Some(parsed);
938            }
939        }
940    }
941    None
942}
943
944fn metadata_string_list(
945    metadata: &serde_json::Map<String, serde_json::Value>,
946    keys: &[&str],
947) -> Vec<String> {
948    for key in keys {
949        let Some(value) = metadata_lookup(metadata, key) else {
950            continue;
951        };
952
953        if let Some(items) = value.as_array() {
954            let parsed = items
955                .iter()
956                .filter_map(|item| item.as_str())
957                .map(str::trim)
958                .filter(|item| !item.is_empty())
959                .map(ToString::to_string)
960                .collect::<Vec<_>>();
961            if !parsed.is_empty() {
962                return parsed;
963            }
964        }
965
966        if let Some(value) = value.as_str() {
967            let parsed = value
968                .split(['\n', ','])
969                .map(str::trim)
970                .filter(|item| !item.is_empty())
971                .map(ToString::to_string)
972                .collect::<Vec<_>>();
973            if !parsed.is_empty() {
974                return parsed;
975            }
976        }
977    }
978
979    Vec::new()
980}
981
/// Maps a requested forage execution engine onto one of the supported names.
///
/// The input is trimmed and compared case-insensitively (ASCII). `"swarm"` and
/// `"go"` are passed through; a missing or unrecognised value yields `"run"`.
fn normalize_forage_execution_engine(value: Option<String>) -> String {
    let requested = value.map(|raw| raw.trim().to_ascii_lowercase());
    let engine = match requested.as_deref() {
        Some("swarm") => "swarm",
        Some("go") => "go",
        _ => "run",
    };
    engine.to_string()
}
995
/// Maps a requested forage swarm strategy onto one of the supported names.
///
/// The input is trimmed and compared case-insensitively (ASCII). `"domain"`,
/// `"data"`, `"stage"` and `"none"` are passed through in lowercase; a missing
/// or unrecognised value yields `"auto"`.
fn normalize_forage_swarm_strategy(value: Option<String>) -> String {
    const RECOGNISED: [&str; 4] = ["domain", "data", "stage", "none"];

    let requested = value.as_deref().map(str::trim).unwrap_or("auto");
    RECOGNISED
        .iter()
        .copied()
        .find(|candidate| candidate.eq_ignore_ascii_case(requested))
        .unwrap_or("auto")
        .to_string()
}
1011
1012fn build_forage_args(
1013    prompt: &str,
1014    title: &str,
1015    metadata: &serde_json::Map<String, serde_json::Value>,
1016    selected_model: Option<String>,
1017) -> ForageArgs {
1018    let mut moonshots = metadata_string_list(
1019        metadata,
1020        &["moonshots", "moonshot", "goals", "mission", "missions"],
1021    );
1022    if moonshots.is_empty() {
1023        let fallback = prompt.trim();
1024        if !fallback.is_empty() {
1025            moonshots.push(fallback.to_string());
1026        } else if !title.trim().is_empty() {
1027            moonshots.push(title.trim().to_string());
1028        }
1029    }
1030
1031    ForageArgs {
1032        top: metadata_usize(metadata, &["top"]).unwrap_or(3),
1033        loop_mode: metadata_bool(metadata, &["loop", "loop_mode"]).unwrap_or(false),
1034        interval_secs: metadata_u64(metadata, &["interval_secs", "interval"]).unwrap_or(120),
1035        max_cycles: metadata_usize(metadata, &["max_cycles"]).unwrap_or(0),
1036        execute: metadata_bool(metadata, &["execute"]).unwrap_or(false),
1037        // Task-queue forage should run without requiring S3 archival by default.
1038        no_s3: metadata_bool(metadata, &["no_s3", "local_only"]).unwrap_or(true),
1039        moonshots,
1040        moonshot_file: metadata_str(metadata, &["moonshot_file"]).map(PathBuf::from),
1041        moonshot_required: metadata_bool(metadata, &["moonshot_required"]).unwrap_or(false),
1042        moonshot_min_alignment: metadata_f64(metadata, &["moonshot_min_alignment"]).unwrap_or(0.10),
1043        execution_engine: normalize_forage_execution_engine(metadata_str(
1044            metadata,
1045            &["execution_engine", "engine"],
1046        )),
1047        run_timeout_secs: metadata_u64(metadata, &["run_timeout_secs", "timeout_secs", "timeout"])
1048            .unwrap_or(900),
1049        fail_fast: metadata_bool(metadata, &["fail_fast"]).unwrap_or(false),
1050        swarm_strategy: normalize_forage_swarm_strategy(metadata_str(
1051            metadata,
1052            &["swarm_strategy", "strategy"],
1053        )),
1054        swarm_max_subagents: metadata_usize(metadata, &["swarm_max_subagents"]).unwrap_or(8),
1055        swarm_max_steps: metadata_usize(metadata, &["swarm_max_steps"]).unwrap_or(100),
1056        swarm_subagent_timeout_secs: metadata_u64(metadata, &["swarm_subagent_timeout_secs"])
1057            .unwrap_or(300),
1058        model: selected_model,
1059        json: metadata_bool(metadata, &["json"]).unwrap_or(false),
1060    }
1061}
1062
1063fn parse_swarm_strategy(
1064    metadata: &serde_json::Map<String, serde_json::Value>,
1065) -> DecompositionStrategy {
1066    match metadata_str(
1067        metadata,
1068        &[
1069            "decomposition_strategy",
1070            "swarm_strategy",
1071            "strategy",
1072            "swarm_decomposition",
1073        ],
1074    )
1075    .as_deref()
1076    .map(|s| s.to_ascii_lowercase())
1077    .as_deref()
1078    {
1079        Some("none") | Some("single") => DecompositionStrategy::None,
1080        Some("domain") | Some("by_domain") => DecompositionStrategy::ByDomain,
1081        Some("data") | Some("by_data") => DecompositionStrategy::ByData,
1082        Some("stage") | Some("by_stage") => DecompositionStrategy::ByStage,
1083        _ => DecompositionStrategy::Automatic,
1084    }
1085}
1086
1087async fn resolve_swarm_model(
1088    explicit_model: Option<String>,
1089    model_tier: Option<&str>,
1090) -> Option<String> {
1091    if let Some(model) = explicit_model
1092        && !model.trim().is_empty()
1093    {
1094        return Some(model);
1095    }
1096
1097    let registry = ProviderRegistry::shared_from_vault().await.ok()?;
1098    let providers = registry.list();
1099    if providers.is_empty() {
1100        return None;
1101    }
1102    let provider = choose_provider_for_tier(providers.as_slice(), model_tier);
1103    let model = default_model_for_provider(provider, model_tier);
1104    Some(format!("{}/{}", provider, model))
1105}
1106
1107fn format_swarm_event_for_output(event: &SwarmEvent) -> Option<String> {
1108    match event {
1109        SwarmEvent::Started {
1110            task,
1111            total_subtasks,
1112        } => Some(format!(
1113            "[swarm] started task={} planned_subtasks={}",
1114            task, total_subtasks
1115        )),
1116        SwarmEvent::StageComplete {
1117            stage,
1118            completed,
1119            failed,
1120        } => Some(format!(
1121            "[swarm] stage={} completed={} failed={}",
1122            stage, completed, failed
1123        )),
1124        SwarmEvent::SubTaskUpdate { id, status, .. } => Some(format!(
1125            "[swarm] subtask id={} status={}",
1126            &id.chars().take(8).collect::<String>(),
1127            format!("{status:?}").to_ascii_lowercase()
1128        )),
1129        SwarmEvent::AgentToolCall {
1130            subtask_id,
1131            tool_name,
1132        } => Some(format!(
1133            "[swarm] subtask id={} tool={}",
1134            &subtask_id.chars().take(8).collect::<String>(),
1135            tool_name
1136        )),
1137        SwarmEvent::AgentError { subtask_id, error } => Some(format!(
1138            "[swarm] subtask id={} error={}",
1139            &subtask_id.chars().take(8).collect::<String>(),
1140            error
1141        )),
1142        SwarmEvent::Complete { success, stats } => Some(format!(
1143            "[swarm] complete success={} subtasks={} speedup={:.2}",
1144            success,
1145            stats.subagents_completed + stats.subagents_failed,
1146            stats.speedup_factor
1147        )),
1148        SwarmEvent::Error(err) => Some(format!("[swarm] error message={}", err)),
1149        _ => None,
1150    }
1151}
1152
1153pub async fn register_worker(
1154    client: &Client,
1155    server: &str,
1156    token: &Option<String>,
1157    worker_id: &str,
1158    name: &str,
1159    codebases: &[String],
1160    public_url: Option<&str>,
1161) -> Result<()> {
1162    // Load ProviderRegistry and collect available models
1163    let models = match load_provider_models().await {
1164        Ok(m) => m,
1165        Err(e) => {
1166            tracing::warn!(
1167                "Failed to load provider models: {}, proceeding without model info",
1168                e
1169            );
1170            HashMap::new()
1171        }
1172    };
1173
1174    // Register via the workers/register endpoint
1175    let mut req = client.post(format!("{}/v1/agent/workers/register", server));
1176
1177    if let Some(t) = token {
1178        req = req.bearer_auth(t);
1179    }
1180
1181    // Flatten models HashMap into array of model objects with pricing data
1182    // matching the format expected by the A2A server's /models and /workers endpoints
1183    let models_array: Vec<serde_json::Value> = models
1184        .iter()
1185        .flat_map(|(provider, model_infos)| {
1186            model_infos.iter().map(move |m| {
1187                let mut obj = serde_json::json!({
1188                    "id": format!("{}/{}", provider, m.id),
1189                    "name": &m.id,
1190                    "provider": provider,
1191                    "provider_id": provider,
1192                });
1193                if let Some(input_cost) = m.input_cost_per_million {
1194                    obj["input_cost_per_million"] = serde_json::json!(input_cost);
1195                }
1196                if let Some(output_cost) = m.output_cost_per_million {
1197                    obj["output_cost_per_million"] = serde_json::json!(output_cost);
1198                }
1199                obj
1200            })
1201        })
1202        .collect();
1203
1204    tracing::info!(
1205        "Registering worker with {} models from {} providers",
1206        models_array.len(),
1207        models.len()
1208    );
1209
1210    let hostname = std::env::var("HOSTNAME")
1211        .or_else(|_| std::env::var("COMPUTERNAME"))
1212        .unwrap_or_else(|_| "unknown".to_string());
1213    let k8s_node_name = std::env::var("K8S_NODE_NAME")
1214        .ok()
1215        .map(|value| value.trim().to_string())
1216        .filter(|value| !value.is_empty());
1217
1218    // Collect agent definitions from the builtin registry
1219    let registry = crate::agent::AgentRegistry::with_builtins();
1220    let agent_defs: Vec<serde_json::Value> = registry
1221        .list()
1222        .iter()
1223        .map(|info| {
1224            serde_json::json!({
1225                "name": info.name,
1226                "description": info.description,
1227                "mode": format!("{:?}", info.mode).to_lowercase(),
1228                "native": info.native,
1229                "hidden": info.hidden,
1230                "model": info.model,
1231                "temperature": info.temperature,
1232                "top_p": info.top_p,
1233                "max_steps": info.max_steps,
1234            })
1235        })
1236        .collect();
1237
1238    // Resolve workspace IDs that correspond to our configured paths.
1239    let workspace_ids = resolve_and_log_workspace_ids(client, server, token, codebases).await;
1240
1241    let res = req
1242        .json(&serde_json::json!({
1243            "worker_id": worker_id,
1244            "name": name,
1245            "capabilities": worker_capabilities(),
1246            "hostname": hostname,
1247            "k8s_node_name": k8s_node_name,
1248            "models": models_array,
1249            "workspaces": codebases,
1250            "workspace_ids": workspace_ids,
1251            "interfaces": advertised_interfaces(public_url),
1252            "agents": agent_defs,
1253        }))
1254        .send()
1255        .await?;
1256
1257    if res.status().is_success() {
1258        tracing::info!("Worker registered successfully");
1259    } else {
1260        tracing::warn!("Failed to register worker: {}", res.status());
1261    }
1262
1263    Ok(())
1264}
1265
1266/// Load ProviderRegistry and collect all available models grouped by provider.
1267/// Tries Vault first, then falls back to config/env vars if Vault is unreachable.
1268/// Returns ModelInfo structs (with pricing data when available).
1269///
1270/// Result is cached process-wide: repeated calls (e.g. TUI startup +
1271/// background worker registration) do not re-hit every provider's
1272/// `/models` endpoint. The cache is populated exactly once per process
1273/// on the first call, so concurrent callers share one network fanout.
1274async fn load_provider_models() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>> {
1275    use tokio::sync::OnceCell;
1276    static CACHE: OnceCell<HashMap<String, Vec<crate::provider::ModelInfo>>> =
1277        OnceCell::const_new();
1278    CACHE
1279        .get_or_try_init(|| async { load_provider_models_uncached().await })
1280        .await
1281        .cloned()
1282}
1283
1284async fn load_provider_models_uncached() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>>
1285{
1286    // Try Vault first
1287    let registry = match ProviderRegistry::shared_from_vault().await {
1288        Ok(r) if !r.list().is_empty() => {
1289            tracing::info!("Loaded {} providers from Vault", r.list().len());
1290            (*r).clone()
1291        }
1292        Ok(_) => {
1293            tracing::warn!("Vault returned 0 providers, falling back to config/env vars");
1294            fallback_registry().await?
1295        }
1296        Err(e) => {
1297            tracing::warn!("Vault unreachable ({}), falling back to config/env vars", e);
1298            fallback_registry().await?
1299        }
1300    };
1301
1302    let mut models_by_provider: HashMap<String, Vec<crate::provider::ModelInfo>> = HashMap::new();
1303
1304    for provider_name in registry.list() {
1305        if let Some(provider) = registry.get(provider_name) {
1306            match provider.list_models().await {
1307                Ok(models) => {
1308                    if !models.is_empty() {
1309                        tracing::debug!("Provider {}: {} models", provider_name, models.len());
1310                        models_by_provider.insert(provider_name.to_string(), models);
1311                    }
1312                }
1313                Err(e) => {
1314                    tracing::debug!("Failed to list models for {}: {}", provider_name, e);
1315                }
1316            }
1317        }
1318    }
1319
1320    Ok(models_by_provider)
1321}
1322
1323/// Fallback: build a ProviderRegistry from config file + environment variables
1324async fn fallback_registry() -> Result<ProviderRegistry> {
1325    let config = crate::config::Config::load().await.unwrap_or_default();
1326    ProviderRegistry::from_config(&config).await
1327}
1328
1329async fn fetch_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1330    tracing::info!("Checking for pending tasks...");
1331
1332    let mut url = format!("{}/v1/agent/tasks?status=pending", runtime.server);
1333    if !runtime.agent_name.is_empty() {
1334        url = format!(
1335            "{}&agent_name={}",
1336            url,
1337            urlencoding::encode(&runtime.agent_name)
1338        );
1339    }
1340    let mut req = runtime.client.get(&url);
1341    if let Some(t) = &runtime.token {
1342        req = req.bearer_auth(t);
1343    }
1344
1345    let res = req.send().await?;
1346    if !res.status().is_success() {
1347        return Ok(());
1348    }
1349
1350    let data: serde_json::Value = res.json().await?;
1351    // Handle both plain array response and {tasks: [...]} wrapper
1352    let tasks = if let Some(arr) = data.as_array() {
1353        arr.clone()
1354    } else {
1355        data["tasks"].as_array().cloned().unwrap_or_default()
1356    };
1357
1358    tracing::info!("Found {} pending task(s)", tasks.len());
1359
1360    for task in tasks {
1361        if let Some(id) = task["id"].as_str() {
1362            // Scope gate: reject tasks not targeted at this worker
1363            if let Err(reason) = check_task_scope(
1364                &task,
1365                &runtime.worker_id,
1366                &runtime.agent_name,
1367                &runtime.workspace_ids,
1368            ) {
1369                tracing::debug!(
1370                    task_id = id,
1371                    reason = %reason,
1372                    "Pending task skipped — out of scope"
1373                );
1374                continue;
1375            }
1376
1377            match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1378                TaskReservation::Reserved => {
1379                    let task_id = id.to_string();
1380                    let runtime = runtime.clone();
1381
1382                    tokio::spawn(async move {
1383                        if let Err(e) = handle_task(&runtime, &task).await {
1384                            tracing::error!("Task {} failed: {}", task_id, e);
1385                        }
1386                        runtime.processing.lock().await.remove(&task_id);
1387                    });
1388                }
1389                TaskReservation::AlreadyProcessing => {}
1390                TaskReservation::AtCapacity => {
1391                    tracing::debug!(
1392                        max_concurrent_tasks = runtime.max_concurrent_tasks,
1393                        "Worker is at task capacity; leaving remaining tasks pending"
1394                    );
1395                    break;
1396                }
1397            }
1398        }
1399    }
1400
1401    Ok(())
1402}
1403
/// Why the task stream opened by `connect_stream` stopped delivering data.
#[derive(Debug, Eq, PartialEq)]
enum StreamDisconnectReason {
    /// The response body stream finished without an error.
    Ended,
    /// Reading from the stream failed; carries the error's text.
    ReadError(String),
}
1409
1410async fn connect_stream(
1411    runtime: &WorkerTaskRuntime,
1412    name: &str,
1413    codebases: &[String],
1414    task_notify_rx: Option<mpsc::Receiver<String>>,
1415) -> Result<StreamDisconnectReason> {
1416    let url = format!(
1417        "{}/v1/worker/tasks/stream?agent_name={}&worker_id={}",
1418        runtime.server,
1419        urlencoding::encode(name),
1420        urlencoding::encode(&runtime.worker_id)
1421    );
1422
1423    let mut req = runtime
1424        .client
1425        .get(&url)
1426        .header("Accept", "text/event-stream")
1427        .header("Accept-Encoding", "identity")
1428        .header("Cache-Control", "no-cache, no-transform")
1429        .header("X-Worker-ID", &runtime.worker_id)
1430        .header("X-Agent-Name", name)
1431        .header("X-Codebases", codebases.join(","))
1432        .header("X-Workspaces", codebases.join(","));
1433
1434    if let Some(t) = &runtime.token {
1435        req = req.bearer_auth(t);
1436    }
1437
1438    let res = req.send().await?;
1439    if !res.status().is_success() {
1440        anyhow::bail!("Failed to connect: {}", res.status());
1441    }
1442
1443    tracing::info!("Connected to A2A server");
1444
1445    let mut stream = res.bytes_stream();
1446    let mut buffer = String::new();
1447    let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
1448    poll_interval.tick().await; // consume the initial immediate tick
1449
1450    // Pin the optional receiver so we can use it in the loop
1451    let mut task_notify_rx = task_notify_rx;
1452
1453    loop {
1454        tokio::select! {
1455            // Handle task notification from CloudEvent (Knative Eventing)
1456            // Only if the channel was provided (i.e., running with HTTP server)
1457            task_id = async {
1458                if let Some(ref mut rx) = task_notify_rx {
1459                    rx.recv().await
1460                } else {
1461                    // Never ready when None - use pending to skip this branch
1462                    futures::future::pending().await
1463                }
1464            } => {
1465                if let Some(task_id) = task_id {
1466                    tracing::info!("Received task notification via CloudEvent: {}", task_id);
1467                    // Immediately poll for and process this task
1468                    if let Err(e) = poll_pending_tasks(runtime).await {
1469                        tracing::warn!("Task notification poll failed: {}", e);
1470                    }
1471                }
1472            }
1473            chunk = stream.next() => {
1474                match chunk {
1475                    Some(Ok(chunk)) => {
1476                        buffer.push_str(&String::from_utf8_lossy(&chunk));
1477
1478                        // Process SSE events
1479                        while let Some(pos) = buffer.find("\n\n") {
1480                            let event_str = buffer[..pos].to_string();
1481                            buffer = buffer[pos + 2..].to_string();
1482
1483                            if let Some(data_line) = event_str.lines().find(|l| l.starts_with("data:")) {
1484                                let data = data_line.trim_start_matches("data:").trim();
1485                                if data == "[DONE]" || data.is_empty() {
1486                                    continue;
1487                                }
1488
1489                                if let Ok(task) = serde_json::from_str::<serde_json::Value>(data) {
1490                                    spawn_task_handler(&task, runtime).await;
1491                                }
1492                            }
1493                        }
1494                    }
1495                    Some(Err(e)) => {
1496                        return Ok(StreamDisconnectReason::ReadError(e.to_string()));
1497                    }
1498                    None => {
1499                        // Stream ended
1500                        return Ok(StreamDisconnectReason::Ended);
1501                    }
1502                }
1503            }
1504            _ = poll_interval.tick() => {
1505                // Periodic poll for pending tasks the SSE stream may have missed
1506                if let Err(e) = poll_pending_tasks(runtime).await {
1507                    tracing::warn!("Periodic task poll failed: {}", e);
1508                }
1509            }
1510        }
1511    }
1512}
1513
1514async fn spawn_task_handler(task: &serde_json::Value, runtime: &WorkerTaskRuntime) {
1515    if let Some(id) = task
1516        .get("task")
1517        .and_then(|t| t["id"].as_str())
1518        .or_else(|| task["id"].as_str())
1519    {
1520        // Scope gate: reject tasks not targeted at this worker
1521        if let Err(reason) = check_task_scope(
1522            task,
1523            &runtime.worker_id,
1524            &runtime.agent_name,
1525            &runtime.workspace_ids,
1526        ) {
1527            tracing::debug!(
1528                task_id = id,
1529                reason = %reason,
1530                "SSE task skipped — out of scope"
1531            );
1532            return;
1533        }
1534
1535        match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1536            TaskReservation::Reserved => {
1537                let task_id = id.to_string();
1538                let task = task.clone();
1539                let runtime = runtime.clone();
1540
1541                tokio::spawn(async move {
1542                    if let Err(e) = handle_task(&runtime, &task).await {
1543                        tracing::error!("Task {} failed: {}", task_id, e);
1544                    }
1545                    runtime.processing.lock().await.remove(&task_id);
1546                });
1547            }
1548            TaskReservation::AlreadyProcessing => {}
1549            TaskReservation::AtCapacity => {
1550                tracing::debug!(
1551                    task_id = id,
1552                    max_concurrent_tasks = runtime.max_concurrent_tasks,
1553                    "Worker is at task capacity; task will stay pending until a slot frees up"
1554                );
1555            }
1556        }
1557    }
1558}
1559
1560async fn poll_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1561    let mut url = format!("{}/v1/agent/tasks?status=pending", runtime.server);
1562    if !runtime.agent_name.is_empty() {
1563        url = format!(
1564            "{}&agent_name={}",
1565            url,
1566            urlencoding::encode(&runtime.agent_name)
1567        );
1568    }
1569    let mut req = runtime.client.get(&url);
1570    if let Some(t) = &runtime.token {
1571        req = req.bearer_auth(t);
1572    }
1573
1574    let res = req.send().await?;
1575    if !res.status().is_success() {
1576        return Ok(());
1577    }
1578
1579    let data: serde_json::Value = res.json().await?;
1580    let tasks = if let Some(arr) = data.as_array() {
1581        arr.clone()
1582    } else {
1583        data["tasks"].as_array().cloned().unwrap_or_default()
1584    };
1585
1586    if !tasks.is_empty() {
1587        tracing::debug!("Poll found {} pending task(s)", tasks.len());
1588    }
1589
1590    for task in &tasks {
1591        // Scope gate: reject tasks not targeted at this worker
1592        let task_id_str = task_str(task, "id").unwrap_or("?");
1593        if let Err(reason) = check_task_scope(
1594            task,
1595            &runtime.worker_id,
1596            &runtime.agent_name,
1597            &runtime.workspace_ids,
1598        ) {
1599            tracing::debug!(
1600                task_id = task_id_str,
1601                reason = %reason,
1602                "Poll task skipped — out of scope"
1603            );
1604            continue;
1605        }
1606        spawn_task_handler(task, runtime).await;
1607    }
1608
1609    Ok(())
1610}
1611
1612async fn handle_task(runtime: &WorkerTaskRuntime, task: &serde_json::Value) -> Result<()> {
1613    let task_id = task_str(task, "id").ok_or_else(|| anyhow::anyhow!("No task ID"))?;
1614    let title = task_str(task, "title").unwrap_or("Untitled");
1615
1616    // Determine task timeout from the task payload/metadata or default to 1200s.
1617    let metadata = task_metadata(task);
1618    let timeout_secs = task_timeout_secs(task, &metadata);
1619
1620    let mut timeline = task_timeline::TaskTimeline::new(task_id, timeout_secs);
1621    timeline.checkpoint(task_timeline::TaskCheckpoint::TaskReceived);
1622    tracing::info!(
1623        "Handling task: {} ({}) [timeout={}s]",
1624        title,
1625        task_id,
1626        timeout_secs
1627    );
1628
1629    // Wire the timeline's progress state into the runtime so the heartbeat picks it up.
1630    // We'll sync after each checkpoint.
1631    sync_timeline_to_runtime(&timeline, runtime).await;
1632
1633    // Claim the task
1634    timeline.checkpoint(task_timeline::TaskCheckpoint::ClaimRequested);
1635    let mut req = runtime
1636        .client
1637        .post(format!("{}/v1/worker/tasks/claim", runtime.server))
1638        .header("X-Worker-ID", &runtime.worker_id);
1639    if let Some(t) = &runtime.token {
1640        req = req.bearer_auth(t);
1641    }
1642
1643    let res = req
1644        .json(&serde_json::json!({ "task_id": task_id }))
1645        .send()
1646        .await?;
1647
1648    if !res.status().is_success() {
1649        let status = res.status();
1650        let text = res.text().await?;
1651        if status == reqwest::StatusCode::CONFLICT {
1652            tracing::debug!(task_id, "Task already claimed by another worker, skipping");
1653        } else {
1654            tracing::warn!(task_id, %status, "Failed to claim task: {}", text);
1655        }
1656        return Ok(());
1657    }
1658
1659    let claim = res.json::<TaskClaimResponse>().await?;
1660    if let Some(claim_timeout_secs) = claim
1661        .task_timeout_seconds
1662        .map(|timeout_secs| timeout_secs.clamp(60, 604800))
1663    {
1664        timeline.update_timeout_secs(claim_timeout_secs);
1665    }
1666    let provider_keys = claim.provider_keys.clone();
1667    let claim_provenance = claim.into_provenance();
1668    timeline.checkpoint_with_detail(
1669        task_timeline::TaskCheckpoint::Claimed,
1670        Some(format!(
1671            "run_id={:?} attempt_id={:?}",
1672            claim_provenance.run_id, claim_provenance.attempt_id
1673        )),
1674    );
1675    tracing::info!(
1676        task_id,
1677        run_id = ?claim_provenance.run_id,
1678        attempt_id = ?claim_provenance.attempt_id,
1679        "Claimed task"
1680    );
1681
1682    // --- All post-claim work is wrapped so we always release the task ---
1683    let inner_result: Result<(&str, Option<String>, Option<String>, Option<String>)> =
1684        execute_claimed_task(
1685            runtime,
1686            task,
1687            task_id,
1688            title,
1689            &claim_provenance,
1690            provider_keys,
1691            &mut timeline,
1692        )
1693        .await;
1694
1695    let (status, result, error, session_id) = match inner_result {
1696        Ok(tuple) => tuple,
1697        Err(e) => {
1698            tracing::error!(
1699                task_id,
1700                error = %e,
1701                "Task failed after claim (releasing as failed)"
1702            );
1703            timeline.checkpoint_with_detail(
1704                task_timeline::TaskCheckpoint::Failed,
1705                Some(format!("{}", e)),
1706            );
1707            (
1708                "failed",
1709                None,
1710                Some(format!("Worker error after claim: {}", e)),
1711                None,
1712            )
1713        }
1714    };
1715
1716    // Emit final diagnostics before releasing
1717    timeline.emit_diagnostics();
1718    let diagnostics_json = timeline.diagnostics_json();
1719
1720    timeline.checkpoint(task_timeline::TaskCheckpoint::Releasing);
1721    release_task_result(
1722        &runtime.client,
1723        &runtime.server,
1724        &runtime.token,
1725        &runtime.worker_id,
1726        task_id,
1727        status,
1728        result,
1729        error,
1730        session_id,
1731        Some(diagnostics_json),
1732    )
1733    .await?;
1734
1735    timeline.checkpoint(task_timeline::TaskCheckpoint::Released);
1736
1737    // Clear task progress so heartbeat stops reporting stale state
1738    runtime.task_progress.lock().await.clear();
1739
1740    tracing::info!("Task released: {} with status: {}", task_id, status);
1741    Ok(())
1742}
1743
1744/// Inner logic for a claimed task. Returns (status, result, error, session_id).
1745/// Extracted so that `handle_task` can catch any error and always release.
1746#[allow(clippy::too_many_lines)]
1747async fn execute_claimed_task<'a>(
1748    runtime: &WorkerTaskRuntime,
1749    task: &'a serde_json::Value,
1750    task_id: &'a str,
1751    title: &'a str,
1752    claim_provenance: &ClaimProvenance,
1753    provider_keys: Option<serde_json::Value>,
1754    timeline: &mut task_timeline::TaskTimeline,
1755) -> Result<(&'static str, Option<String>, Option<String>, Option<String>)> {
1756    let metadata = task_metadata(task);
1757    timeline.checkpoint(task_timeline::TaskCheckpoint::MetadataParsed);
1758    let resume_session_id = metadata
1759        .get("resume_session_id")
1760        .and_then(|v| v.as_str())
1761        .map(|s| s.trim().to_string())
1762        .filter(|s| !s.is_empty());
1763    let complexity_hint = metadata_str(&metadata, &["complexity"]);
1764    let model_tier = metadata_str(&metadata, &["model_tier", "tier"])
1765        .map(|s| s.to_ascii_lowercase())
1766        .or_else(|| {
1767            complexity_hint.as_ref().map(|complexity| {
1768                match complexity.to_ascii_lowercase().as_str() {
1769                    "quick" => "fast".to_string(),
1770                    "deep" => "heavy".to_string(),
1771                    _ => "balanced".to_string(),
1772                }
1773            })
1774        });
1775    let worker_personality = metadata_str(
1776        &metadata,
1777        &["worker_personality", "personality", "agent_personality"],
1778    );
1779    let target_agent_name = metadata_str(&metadata, &["target_agent_name", "agent_name"]);
1780    let raw_model = task_str(task, "model_ref")
1781        .or_else(|| metadata_lookup(&metadata, "model_ref").and_then(|v| v.as_str()))
1782        .or_else(|| task_str(task, "model"))
1783        .or_else(|| metadata_lookup(&metadata, "model").and_then(|v| v.as_str()));
1784    let selected_model = raw_model.map(model_ref_to_provider_model);
1785    let raw_agent = task_str(task, "agent_type")
1786        .or_else(|| task_str(task, "agent"))
1787        .unwrap_or("build");
1788    let prompt = task_str(task, "prompt")
1789        .or_else(|| task_str(task, "description"))
1790        .unwrap_or(title);
1791
1792    // Detect virtual/global workspace tasks dispatched via the API.
1793    let workspace_id = task_str(task, "workspace_id")
1794        .or_else(|| metadata_lookup(&metadata, "workspace_id").and_then(|v| v.as_str()));
1795    let is_virtual_task = workspace_id.map_or(false, |ws| ws == "global" || ws.is_empty());
1796
1797    if raw_agent.eq_ignore_ascii_case("clone_repo") {
1798        return match handle_clone_repo_task(
1799            &runtime.client,
1800            &runtime.server,
1801            &runtime.token,
1802            &runtime.worker_id,
1803            task,
1804            &metadata,
1805        )
1806        .await
1807        {
1808            Ok(message) => Ok(("completed", Some(message), None, None)),
1809            Err(err) => {
1810                tracing::error!(task_id, error = %err, "Clone task failed");
1811                Ok(("failed", None, Some(format!("Error: {}", err)), None))
1812            }
1813        };
1814    }
1815
1816    if is_forage_agent(raw_agent) {
1817        return match handle_forage_task(title, prompt, &metadata, selected_model.clone()).await {
1818            Ok(message) => Ok(("completed", Some(message), None, None)),
1819            Err(err) => {
1820                tracing::error!(task_id, error = %err, "Forage task failed");
1821                Ok(("failed", None, Some(format!("Error: {}", err)), None))
1822            }
1823        };
1824    }
1825
1826    // Resume existing session when requested; fall back to a fresh session if missing.
1827    let mut session = if let Some(ref sid) = resume_session_id {
1828        match Session::load(sid).await {
1829            Ok(existing) => {
1830                tracing::info!("Resuming session {} for task {}", sid, task_id);
1831                existing
1832            }
1833            Err(e) => {
1834                tracing::warn!(
1835                    "Could not load session {} for task {} ({}), starting a new session",
1836                    sid,
1837                    task_id,
1838                    e
1839                );
1840                Session::new().await?
1841            }
1842        }
1843    } else {
1844        Session::new().await?
1845    };
1846    timeline.checkpoint_with_detail(
1847        task_timeline::TaskCheckpoint::SessionReady,
1848        Some(format!("session_id={}", session.id)),
1849    );
1850
1851    if !is_virtual_task {
1852        if let Some(workspace_path) = resolve_task_workspace_dir(
1853            &runtime.client,
1854            &runtime.server,
1855            &runtime.token,
1856            workspace_id,
1857        )
1858        .await?
1859        {
1860            session.metadata.directory = Some(workspace_path);
1861        }
1862        timeline.checkpoint_with_detail(
1863            task_timeline::TaskCheckpoint::WorkspaceReady,
1864            session
1865                .metadata
1866                .directory
1867                .as_deref()
1868                .map(|d| d.display().to_string()),
1869        );
1870    }
1871
1872    // For virtual/global tasks, clear the workspace directory so tools don't
1873    // try to operate on a workspace that doesn't exist on this worker.
1874    if is_virtual_task {
1875        tracing::info!(
1876            task_id,
1877            workspace_id = workspace_id.unwrap_or("(none)"),
1878            "Virtual task detected — skipping workspace setup"
1879        );
1880        session.metadata.directory = None;
1881    }
1882
1883    // Normalize agent: only "build", "plan", and swarm types are valid.
1884    // Map deprecated/unknown agent types (e.g. "general", "explore") to "build".
1885    let agent_type = if is_swarm_agent(raw_agent) {
1886        raw_agent
1887    } else {
1888        match raw_agent {
1889            "build" | "plan" => raw_agent,
1890            other => {
1891                tracing::info!(
1892                    "Agent \"{}\" is not a primary agent, falling back to \"build\"",
1893                    other
1894                );
1895                "build"
1896            }
1897        }
1898    };
1899    session.set_agent_name(agent_type.to_string());
1900    session.attach_claim_provenance(claim_provenance);
1901    session.metadata.provider_keys = provider_keys;
1902
1903    // Skip repository-local Git setup for virtual tasks (no workspace directory).
1904    if !is_virtual_task {
1905        if let (Some(directory), Some(workspace_id)) =
1906            (session.metadata.directory.as_deref(), workspace_id)
1907            && directory.join(".git").exists()
1908            && let Err(err) = configure_repo_git_auth(directory, workspace_id)
1909        {
1910            tracing::warn!(task_id, error = %err, "Failed to configure Git credential helper");
1911        }
1912        if let Some(directory) = session.metadata.directory.as_deref()
1913            && let Err(err) = install_commit_msg_hook(directory)
1914        {
1915            tracing::warn!(task_id, error = %err, "Failed to install commit-msg hook");
1916        }
1917        timeline.checkpoint(task_timeline::TaskCheckpoint::GitHookInstalled);
1918    }
1919
1920    if let Some(model) = selected_model.clone() {
1921        session.metadata.model = Some(model);
1922    }
1923    timeline.checkpoint_with_detail(
1924        task_timeline::TaskCheckpoint::ModelSelected,
1925        session.metadata.model.clone(),
1926    );
1927
1928    tracing::info!(task_id, agent_type, "Executing prompt: {}", prompt);
1929    timeline.checkpoint_with_detail(
1930        task_timeline::TaskCheckpoint::AgentStarting,
1931        Some(format!(
1932            "agent={} model={:?}",
1933            agent_type, session.metadata.model
1934        )),
1935    );
1936    sync_timeline_to_runtime(&timeline, runtime).await;
1937
1938    // Set up output streaming to forward progress to the server and bus
1939    let stream_client = runtime.client.clone();
1940    let stream_server = runtime.server.clone();
1941    let stream_token = runtime.token.clone();
1942    let stream_worker_id = runtime.worker_id.clone();
1943    let stream_task_id = task_id.to_string();
1944    let stream_bus = Arc::clone(&runtime.bus);
1945
1946    let output_callback: Arc<dyn Fn(String) + Send + Sync + 'static> =
1947        Arc::new(move |output: String| {
1948            let c = stream_client.clone();
1949            let s = stream_server.clone();
1950            let t = stream_token.clone();
1951            let w = stream_worker_id.clone();
1952            let tid = stream_task_id.clone();
1953
1954            let bus_handle = stream_bus.handle("task-output");
1955            bus_handle.send(
1956                format!("task.{}", tid),
1957                crate::bus::BusMessage::TaskUpdate {
1958                    task_id: tid.clone(),
1959                    state: crate::a2a::types::TaskState::Working,
1960                    message: Some(output.clone()),
1961                },
1962            );
1963
1964            tokio::spawn(async move {
1965                let mut req = c
1966                    .post(format!("{}/v1/agent/tasks/{}/output", s, tid))
1967                    .header("X-Worker-ID", &w);
1968                if let Some(tok) = &t {
1969                    req = req.bearer_auth(tok);
1970                }
1971                let _ = req
1972                    .json(&serde_json::json!({
1973                        "worker_id": w,
1974                        "output": output,
1975                    }))
1976                    .send()
1977                    .await;
1978            });
1979        });
1980
1981    // Execute swarm tasks via SwarmExecutor; all other agents use the standard session loop.
1982    let (status, result, error, session_id) = if is_swarm_agent(agent_type) {
1983        match execute_swarm_with_policy(
1984            &mut session,
1985            prompt,
1986            model_tier.as_deref(),
1987            selected_model,
1988            &metadata,
1989            complexity_hint.as_deref(),
1990            worker_personality.as_deref(),
1991            target_agent_name.as_deref(),
1992            Some(&runtime.bus),
1993            Some(Arc::clone(&output_callback)),
1994        )
1995        .await
1996        {
1997            Ok((session_result, true)) => {
1998                tracing::info!("Swarm task completed successfully: {}", task_id);
1999                (
2000                    "completed",
2001                    Some(session_result.text),
2002                    None,
2003                    Some(session_result.session_id),
2004                )
2005            }
2006            Ok((session_result, false)) => {
2007                tracing::warn!("Swarm task completed with failures: {}", task_id);
2008                (
2009                    "failed",
2010                    Some(session_result.text),
2011                    Some("Swarm execution completed with failures".to_string()),
2012                    Some(session_result.session_id),
2013                )
2014            }
2015            Err(e) => {
2016                tracing::error!("Swarm task failed: {} - {}", task_id, e);
2017                ("failed", None, Some(format!("Error: {}", e)), None)
2018            }
2019        }
2020    } else {
2021        match execute_session_with_policy(
2022            &mut session,
2023            prompt,
2024            runtime.auto_approve,
2025            model_tier.as_deref(),
2026            Some(Arc::clone(&output_callback)),
2027        )
2028        .await
2029        {
2030            Ok(session_result) => {
2031                tracing::info!("Task completed successfully: {}", task_id);
2032                (
2033                    "completed",
2034                    Some(session_result.text),
2035                    None,
2036                    Some(session_result.session_id),
2037                )
2038            }
2039            Err(e) => {
2040                tracing::error!(task_id, error = %e, "Task execution failed");
2041                ("failed", None, Some(format!("Error: {}", e)), None)
2042            }
2043        }
2044    };
2045
2046    // Record agent completion checkpoint
2047    timeline.checkpoint_with_detail(
2048        task_timeline::TaskCheckpoint::AgentDone,
2049        Some(format!("status={}", status)),
2050    );
2051    sync_timeline_to_runtime(&timeline, runtime).await;
2052
2053    // Check deadline before proceeding to commit/release phase
2054    if timeline.is_near_deadline() {
2055        tracing::warn!(
2056            task_id,
2057            elapsed_secs = format!("{:.1}", timeline.elapsed_secs()),
2058            budget_pct = format!("{:.1}%", timeline.budget_pct_used()),
2059            "Near deadline after agent completion — will attempt graceful shutdown"
2060        );
2061        timeline.checkpoint(task_timeline::TaskCheckpoint::GracefulShutdown);
2062    }
2063
2064    // Sync progress state for heartbeat reporting
2065    timeline.sync_progress().await;
2066
2067    Ok((
2068        status,
2069        result,
2070        error,
2071        Some(session_id.unwrap_or_else(|| session.id.clone())),
2072    ))
2073}
2074
2075async fn handle_forage_task(
2076    title: &str,
2077    prompt: &str,
2078    metadata: &serde_json::Map<String, serde_json::Value>,
2079    selected_model: Option<String>,
2080) -> Result<String> {
2081    let forage_args = build_forage_args(prompt, title, metadata, selected_model);
2082    let summary = crate::forage::execute_with_summary(forage_args).await?;
2083    Ok(summary.render_text())
2084}
2085
2086async fn handle_clone_repo_task(
2087    client: &Client,
2088    server: &str,
2089    token: &Option<String>,
2090    worker_id: &str,
2091    task: &serde_json::Value,
2092    metadata: &serde_json::Map<String, serde_json::Value>,
2093) -> Result<String> {
2094    let workspace_id = metadata_str(metadata, &["workspace_id"])
2095        .or_else(|| task_str(task, "workspace_id").map(|value| value.to_string()))
2096        .ok_or_else(|| anyhow::anyhow!("Clone task is missing workspace_id metadata"))?;
2097    let workspace = fetch_workspace_record(client, server, token, &workspace_id).await?;
2098    let git_url = metadata_str(metadata, &["git_url"])
2099        .or_else(|| workspace.git_url.clone())
2100        .ok_or_else(|| anyhow::anyhow!("Workspace {} is missing git_url", workspace_id))?;
2101    let branch = metadata_str(metadata, &["git_branch"])
2102        .or_else(|| workspace.git_branch.clone())
2103        .unwrap_or_else(|| "main".to_string());
2104    let repo_path = resolve_workspace_clone_path(&workspace_id, &workspace.path);
2105
2106    if let Some(parent) = repo_path.parent() {
2107        tokio::fs::create_dir_all(parent)
2108            .await
2109            .with_context(|| format!("Failed to create repo parent {}", parent.display()))?;
2110    }
2111
2112    let temp_helper_path =
2113        git_clone_base_dir().join(format!(".{}-git-credential-helper", workspace_id));
2114    write_git_credential_helper_script(&temp_helper_path, &workspace_id)?;
2115
2116    let clone_result = async {
2117        let git_dir = repo_path.join(".git");
2118        if git_dir.exists() {
2119            configure_repo_git_auth(&repo_path, &workspace_id)?;
2120            configure_repo_git_github_app_from_agent_config(
2121                &repo_path,
2122                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
2123            );
2124            run_git_command_at(
2125                Some(&repo_path),
2126                vec!["fetch".to_string(), "origin".to_string(), branch.clone()],
2127            )
2128            .await?;
2129            run_git_command_at(
2130                Some(&repo_path),
2131                vec!["checkout".to_string(), branch.clone()],
2132            )
2133            .await?;
2134            run_git_command_at(
2135                Some(&repo_path),
2136                vec![
2137                    "pull".to_string(),
2138                    "--ff-only".to_string(),
2139                    "origin".to_string(),
2140                    branch.clone(),
2141                ],
2142            )
2143            .await?;
2144        } else {
2145            prepare_clone_target(&repo_path).await?;
2146
2147            run_git_command_at(
2148                None,
2149                vec![
2150                    "-c".to_string(),
2151                    format!("credential.helper={}", temp_helper_path.display()),
2152                    "-c".to_string(),
2153                    "credential.useHttpPath=true".to_string(),
2154                    "clone".to_string(),
2155                    "--single-branch".to_string(),
2156                    "--branch".to_string(),
2157                    branch.clone(),
2158                    git_url.clone(),
2159                    repo_path.display().to_string(),
2160                ],
2161            )
2162            .await?;
2163            configure_repo_git_auth(&repo_path, &workspace_id)?;
2164            configure_repo_git_github_app_from_agent_config(
2165                &repo_path,
2166                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
2167            );
2168        }
2169        install_commit_msg_hook(&repo_path)?;
2170
2171        register_cloned_workspace(client, server, token, worker_id, &workspace, &repo_path).await?;
2172        enqueue_post_clone_task(client, server, token, worker_id, &workspace_id, metadata).await?;
2173
2174        Ok::<String, anyhow::Error>(format!(
2175            "Repository ready at {} (branch: {})",
2176            repo_path.display(),
2177            branch
2178        ))
2179    }
2180    .await;
2181
2182    let _ = tokio::fs::remove_file(&temp_helper_path).await;
2183    clone_result
2184}
2185
2186async fn register_cloned_workspace(
2187    client: &Client,
2188    server: &str,
2189    token: &Option<String>,
2190    worker_id: &str,
2191    workspace: &RegisteredWorkspaceRecord,
2192    repo_path: &Path,
2193) -> Result<()> {
2194    let mut req = client.post(format!(
2195        "{}/v1/agent/workspaces",
2196        server.trim_end_matches('/')
2197    ));
2198    if let Some(token) = token {
2199        req = req.bearer_auth(token);
2200    }
2201
2202    let response = req
2203        .json(&serde_json::json!({
2204            "workspace_id": workspace.id,
2205            "name": workspace.name,
2206            "path": repo_path.display().to_string(),
2207            "description": workspace.description,
2208            "agent_config": workspace.agent_config,
2209            "git_url": workspace.git_url,
2210            "git_branch": workspace.git_branch,
2211            "worker_id": worker_id,
2212        }))
2213        .send()
2214        .await
2215        .context("Failed to register cloned workspace with server")?;
2216    let status = response.status();
2217    if !status.is_success() {
2218        let body = response.text().await.unwrap_or_default();
2219        anyhow::bail!(
2220            "Failed to register cloned workspace {} ({}): {}",
2221            workspace.id,
2222            status,
2223            body
2224        );
2225    }
2226
2227    Ok(())
2228}
2229
2230async fn enqueue_post_clone_task(
2231    client: &Client,
2232    server: &str,
2233    token: &Option<String>,
2234    worker_id: &str,
2235    workspace_id: &str,
2236    metadata: &serde_json::Map<String, serde_json::Value>,
2237) -> Result<()> {
2238    if !worker_should_enqueue_post_clone_task(metadata) {
2239        return Ok(());
2240    }
2241
2242    let Some(post_clone_task) = metadata.get("post_clone_task") else {
2243        return Ok(());
2244    };
2245    let Some(task) = post_clone_task.as_object() else {
2246        return Ok(());
2247    };
2248    let title = task
2249        .get("title")
2250        .and_then(|value| value.as_str())
2251        .filter(|value| !value.trim().is_empty())
2252        .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing title"))?;
2253    let prompt = task
2254        .get("prompt")
2255        .and_then(|value| value.as_str())
2256        .filter(|value| !value.trim().is_empty())
2257        .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing prompt"))?;
2258    let agent_type = task
2259        .get("agent_type")
2260        .and_then(|value| value.as_str())
2261        .unwrap_or("build");
2262    let mut task_metadata = task
2263        .get("metadata")
2264        .cloned()
2265        .unwrap_or_else(|| serde_json::json!({}));
2266    if let Some(task_metadata_obj) = task_metadata.as_object_mut() {
2267        task_metadata_obj
2268            .entry("target_worker_id".to_string())
2269            .or_insert_with(|| serde_json::Value::String(worker_id.to_string()));
2270    }
2271
2272    let mut req = client.post(format!(
2273        "{}/v1/agent/workspaces/{}/tasks",
2274        server.trim_end_matches('/'),
2275        workspace_id
2276    ));
2277    if let Some(token) = token {
2278        req = req.bearer_auth(token);
2279    }
2280
2281    let response = req
2282        .json(&serde_json::json!({
2283            "title": title,
2284            "prompt": prompt,
2285            "agent_type": agent_type,
2286            "metadata": task_metadata,
2287        }))
2288        .send()
2289        .await
2290        .context("Failed to enqueue post-clone task")?;
2291    let status = response.status();
2292    if !status.is_success() {
2293        let body = response.text().await.unwrap_or_default();
2294        anyhow::bail!("Failed to enqueue post-clone task ({}): {}", status, body);
2295    }
2296    Ok(())
2297}
2298
2299fn worker_should_enqueue_post_clone_task(
2300    metadata: &serde_json::Map<String, serde_json::Value>,
2301) -> bool {
2302    metadata
2303        .get("post_clone_task")
2304        .and_then(|value| value.as_object())
2305        .is_some()
2306}
2307
2308async fn run_git_command_at(current_dir: Option<&Path>, args: Vec<String>) -> Result<String> {
2309    let mut command = Command::new("git");
2310    if let Some(dir) = current_dir {
2311        command.current_dir(dir);
2312    }
2313
2314    let output = command
2315        .args(args.iter().map(String::as_str))
2316        .output()
2317        .await
2318        .context("Failed to execute git command")?;
2319
2320    if output.status.success() {
2321        return Ok(String::from_utf8_lossy(&output.stdout).trim().to_string());
2322    }
2323
2324    Err(anyhow::anyhow!(
2325        "Git command failed: {}",
2326        String::from_utf8_lossy(&output.stderr).trim()
2327    ))
2328}
2329
2330async fn release_task_result(
2331    client: &Client,
2332    server: &str,
2333    token: &Option<String>,
2334    worker_id: &str,
2335    task_id: &str,
2336    status: &str,
2337    result: Option<String>,
2338    error: Option<String>,
2339    session_id: Option<String>,
2340    diagnostics: Option<serde_json::Value>,
2341) -> Result<()> {
2342    let mut req = client
2343        .post(format!("{}/v1/worker/tasks/release", server))
2344        .header("X-Worker-ID", worker_id);
2345    if let Some(token) = token {
2346        req = req.bearer_auth(token);
2347    }
2348
2349    let mut payload = serde_json::json!({
2350        "task_id": task_id,
2351        "status": status,
2352        "result": result,
2353        "error": error,
2354        "session_id": session_id,
2355    });
2356    if let Some(diagnostics) = diagnostics {
2357        if let Some(obj) = payload.as_object_mut() {
2358            obj.insert("diagnostics".to_string(), diagnostics);
2359        }
2360    }
2361
2362    req.json(&payload)
2363        .send()
2364        .await
2365        .context("Failed to release task result")?;
2366
2367    Ok(())
2368}
2369
2370async fn execute_swarm_with_policy(
2371    session: &mut Session,
2372    prompt: &str,
2373    model_tier: Option<&str>,
2374    explicit_model: Option<String>,
2375    metadata: &serde_json::Map<String, serde_json::Value>,
2376    complexity_hint: Option<&str>,
2377    worker_personality: Option<&str>,
2378    target_agent_name: Option<&str>,
2379    bus: Option<&Arc<AgentBus>>,
2380    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
2381) -> Result<(crate::session::SessionResult, bool)> {
2382    use crate::provider::{ContentPart, Message, Role};
2383
2384    session.add_message(Message {
2385        role: Role::User,
2386        content: vec![ContentPart::Text {
2387            text: prompt.to_string(),
2388        }],
2389    });
2390
2391    if session.title.is_none() {
2392        session.generate_title().await?;
2393    }
2394
2395    let strategy = parse_swarm_strategy(metadata);
2396    let max_subagents = metadata_usize(
2397        metadata,
2398        &["swarm_max_subagents", "max_subagents", "subagents"],
2399    )
2400    .unwrap_or(10)
2401    .clamp(1, 100);
2402    let max_steps_per_subagent = metadata_usize(
2403        metadata,
2404        &[
2405            "swarm_max_steps_per_subagent",
2406            "max_steps_per_subagent",
2407            "max_steps",
2408        ],
2409    )
2410    .unwrap_or(50)
2411    .clamp(1, 200);
2412    let timeout_secs = metadata_u64(metadata, &["swarm_timeout_secs", "timeout_secs", "timeout"])
2413        .unwrap_or(600)
2414        .clamp(30, 3600);
2415    let parallel_enabled =
2416        metadata_bool(metadata, &["swarm_parallel_enabled", "parallel_enabled"]).unwrap_or(true);
2417
2418    let model = resolve_swarm_model(explicit_model, model_tier).await;
2419    if let Some(ref selected_model) = model {
2420        session.metadata.model = Some(selected_model.clone());
2421    }
2422
2423    if let Some(ref cb) = output_callback {
2424        cb(format!(
2425            "[swarm] routing complexity={} tier={} personality={} target_agent={}",
2426            complexity_hint.unwrap_or("standard"),
2427            model_tier.unwrap_or("balanced"),
2428            worker_personality.unwrap_or("auto"),
2429            target_agent_name.unwrap_or("auto")
2430        ));
2431        cb(format!(
2432            "[swarm] config strategy={:?} max_subagents={} max_steps={} timeout={}s tier={}",
2433            strategy,
2434            max_subagents,
2435            max_steps_per_subagent,
2436            timeout_secs,
2437            model_tier.unwrap_or("balanced")
2438        ));
2439    }
2440
2441    let swarm_config = SwarmConfig {
2442        max_subagents,
2443        max_steps_per_subagent,
2444        subagent_timeout_secs: timeout_secs,
2445        parallel_enabled,
2446        model,
2447        working_dir: session
2448            .metadata
2449            .directory
2450            .as_ref()
2451            .map(|p| p.to_string_lossy().to_string()),
2452        ..Default::default()
2453    };
2454
2455    let swarm_result = if output_callback.is_some() {
2456        let (event_tx, mut event_rx) = mpsc::channel(256);
2457        let mut executor = SwarmExecutor::new(swarm_config).with_event_tx(event_tx);
2458        if let Some(bus) = bus {
2459            executor = executor.with_bus(Arc::clone(bus));
2460        }
2461        let prompt_owned = prompt.to_string();
2462        let mut exec_handle =
2463            tokio::spawn(async move { executor.execute(&prompt_owned, strategy).await });
2464
2465        let mut final_result: Option<crate::swarm::SwarmResult> = None;
2466
2467        while final_result.is_none() {
2468            tokio::select! {
2469                maybe_event = event_rx.recv() => {
2470                    if let Some(event) = maybe_event
2471                        && let Some(ref cb) = output_callback
2472                            && let Some(line) = format_swarm_event_for_output(&event) {
2473                                cb(line);
2474                            }
2475                }
2476                join_result = &mut exec_handle => {
2477                    let joined = join_result.map_err(|e| anyhow::anyhow!("Swarm join failure: {}", e))?;
2478                    final_result = Some(joined?);
2479                }
2480            }
2481        }
2482
2483        while let Ok(event) = event_rx.try_recv() {
2484            if let Some(ref cb) = output_callback
2485                && let Some(line) = format_swarm_event_for_output(&event)
2486            {
2487                cb(line);
2488            }
2489        }
2490
2491        final_result.ok_or_else(|| anyhow::anyhow!("Swarm execution returned no result"))?
2492    } else {
2493        let mut executor = SwarmExecutor::new(swarm_config);
2494        if let Some(bus) = bus {
2495            executor = executor.with_bus(Arc::clone(bus));
2496        }
2497        executor.execute(prompt, strategy).await?
2498    };
2499
2500    let final_text = if swarm_result.result.trim().is_empty() {
2501        if swarm_result.success {
2502            "Swarm completed without textual output.".to_string()
2503        } else {
2504            "Swarm finished with failures and no textual output.".to_string()
2505        }
2506    } else {
2507        swarm_result.result.clone()
2508    };
2509
2510    session.add_message(Message {
2511        role: Role::Assistant,
2512        content: vec![ContentPart::Text {
2513            text: final_text.clone(),
2514        }],
2515    });
2516    session.save().await?;
2517
2518    Ok((
2519        crate::session::SessionResult {
2520            text: final_text,
2521            session_id: session.id.clone(),
2522        },
2523        swarm_result.success,
2524    ))
2525}
2526
2527/// Execute a session with the given auto-approve policy
2528/// Optionally streams output chunks via the callback
2529async fn execute_session_with_policy(
2530    session: &mut Session,
2531    prompt: &str,
2532    auto_approve: AutoApprove,
2533    model_tier: Option<&str>,
2534    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
2535) -> Result<crate::session::SessionResult> {
2536    use crate::provider::{ContentPart, Message, ProviderRegistry, Role, parse_model_string};
2537    use std::sync::Arc;
2538
2539    let per_task_keys = session
2540        .metadata
2541        .provider_keys
2542        .as_ref()
2543        .and_then(|value| match crate::provider::PerTaskProviderKeys::from_value(value) {
2544            Ok(keys) => keys,
2545            Err(err) => {
2546                tracing::warn!(error = %err, "Invalid per-task provider key payload; falling back to platform registry");
2547                None
2548            }
2549        });
2550
2551    // Load provider registry from claim-injected tenant keys, falling back to Vault/env platform keys.
2552    let registry = if let Some(ref keys) = per_task_keys {
2553        let registry = keys.build_registry();
2554        if registry.list().is_empty() {
2555            tracing::warn!(
2556                "Per-task provider key payload produced no providers; falling back to platform registry"
2557            );
2558            (*ProviderRegistry::shared_from_vault().await.context(
2559                "Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN",
2560            )?)
2561            .clone()
2562        } else {
2563            tracing::info!(
2564                source = keys.source(),
2565                providers = ?keys.provider_names(),
2566                "Using tenant-scoped per-task provider registry"
2567            );
2568            registry
2569        }
2570    } else {
2571        (*ProviderRegistry::shared_from_vault().await.context(
2572            "Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN",
2573        )?)
2574        .clone()
2575    };
2576    let providers = registry.list();
2577    tracing::info!("Available providers: {:?}", providers);
2578
2579    if providers.is_empty() {
2580        anyhow::bail!(
2581            "No LLM providers available (0 providers loaded). \
2582             Configure API keys in HashiCorp Vault or set environment variables. \
2583             Vault address: {}",
2584            std::env::var("VAULT_ADDR").unwrap_or_else(|_| "(not set)".into())
2585        );
2586    }
2587
2588    // Parse model string
2589    let (provider_name, model_id) = if let Some(ref model_str) = session.metadata.model {
2590        let (prov, model) = parse_model_string(model_str);
2591        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
2592        if prov.is_some() {
2593            (prov.map(|s| s.to_string()), model.to_string())
2594        } else if providers.contains(&model) {
2595            (Some(model.to_string()), String::new())
2596        } else {
2597            (None, model.to_string())
2598        }
2599    } else {
2600        (None, String::new())
2601    };
2602
2603    let provider_slice = providers.as_slice();
2604    if let Some(explicit_provider) = provider_name.as_deref()
2605        && !providers.contains(&explicit_provider)
2606    {
2607        anyhow::bail!(
2608            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
2609            explicit_provider,
2610            providers.join(", ")
2611        );
2612    }
2613
2614    // Determine which provider to use, preferring explicit request first, then model tier.
2615    let selected_provider = provider_name
2616        .as_deref()
2617        .unwrap_or_else(|| choose_provider_for_tier(provider_slice, model_tier));
2618
2619    let provider = registry
2620        .get(selected_provider)
2621        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
2622
2623    // Add user message
2624    session.add_message(Message {
2625        role: Role::User,
2626        content: vec![ContentPart::Text {
2627            text: prompt.to_string(),
2628        }],
2629    });
2630
2631    // Generate title
2632    if session.title.is_none() {
2633        session.generate_title().await?;
2634    }
2635
2636    // Determine model.
2637    let model = if !model_id.is_empty() {
2638        model_id
2639    } else {
2640        default_model_for_provider(selected_provider, model_tier)
2641    };
2642
2643    // Create tool registry with filtering based on auto-approve policy
2644    let workspace_dir = session
2645        .metadata
2646        .directory
2647        .clone()
2648        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
2649    let tool_registry = create_filtered_registry(
2650        Arc::clone(&provider),
2651        model.clone(),
2652        auto_approve,
2653        &workspace_dir,
2654        output_callback.clone(),
2655    );
2656    let tool_definitions = tool_registry.definitions();
2657
2658    let temperature = if prefers_temperature_one(&model) {
2659        Some(1.0)
2660    } else {
2661        Some(0.7)
2662    };
2663
2664    tracing::info!(
2665        "Using model: {} via provider: {} (tier: {:?})",
2666        model,
2667        selected_provider,
2668        model_tier
2669    );
2670    tracing::info!(
2671        "Available tools: {} (auto_approve: {:?})",
2672        tool_definitions.len(),
2673        auto_approve
2674    );
2675
2676    // Build system prompt
2677    let system_prompt = crate::agent::builtin::build_system_prompt(&workspace_dir);
2678
2679    let mut final_output = String::new();
2680    let max_steps = 50;
2681
2682    for step in 1..=max_steps {
2683        tracing::info!(step = step, "Agent step starting");
2684
2685        let response = complete_worker_step_with_context_fallback(
2686            Arc::clone(&provider),
2687            session,
2688            &model,
2689            &system_prompt,
2690            &tool_definitions,
2691            temperature,
2692        )
2693        .await?;
2694
2695        crate::telemetry::TOKEN_USAGE.record_model_usage(
2696            &model,
2697            response.usage.prompt_tokens as u64,
2698            response.usage.completion_tokens as u64,
2699        );
2700
2701        // Extract tool calls
2702        let tool_calls: Vec<(String, String, serde_json::Value)> = response
2703            .message
2704            .content
2705            .iter()
2706            .filter_map(|part| {
2707                if let ContentPart::ToolCall {
2708                    id,
2709                    name,
2710                    arguments,
2711                    ..
2712                } = part
2713                {
2714                    let args: serde_json::Value =
2715                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
2716                    Some((id.clone(), name.clone(), args))
2717                } else {
2718                    None
2719                }
2720            })
2721            .collect();
2722
2723        // Collect text output and stream it
2724        for part in &response.message.content {
2725            if let ContentPart::Text { text } = part
2726                && !text.is_empty()
2727            {
2728                final_output.push_str(text);
2729                final_output.push('\n');
2730                if let Some(ref cb) = output_callback {
2731                    cb(text.clone());
2732                }
2733            }
2734        }
2735
2736        // If no tool calls, we're done
2737        if tool_calls.is_empty() {
2738            session.add_message(response.message.clone());
2739            break;
2740        }
2741
2742        session.add_message(response.message.clone());
2743
2744        tracing::info!(
2745            step = step,
2746            num_tools = tool_calls.len(),
2747            "Executing tool calls"
2748        );
2749
2750        // Execute each tool call
2751        for (tool_id, tool_name, tool_input) in tool_calls {
2752            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
2753
2754            // Stream tool start event
2755            if let Some(ref cb) = output_callback {
2756                cb(format!("[tool:start:{}]", tool_name));
2757            }
2758
2759            // Check if tool is allowed based on auto-approve policy
2760            if !is_tool_allowed(&tool_name, auto_approve) {
2761                let msg = format!(
2762                    "Tool '{}' requires approval but auto-approve policy is {:?}",
2763                    tool_name, auto_approve
2764                );
2765                tracing::warn!(tool = %tool_name, "Tool blocked by auto-approve policy");
2766                session.add_message(Message {
2767                    role: Role::Tool,
2768                    content: vec![ContentPart::ToolResult {
2769                        tool_call_id: tool_id,
2770                        content: msg,
2771                    }],
2772                });
2773                continue;
2774            }
2775
2776            let content = if let Some(tool) = tool_registry.get(&tool_name) {
2777                let tool_timeout_secs =
2778                    env_u64("CODETETHER_WORKER_TOOL_TIMEOUT_SECS", 120).clamp(1, 3600);
2779                let exec_result: Result<crate::tool::ToolResult> = match tokio::time::timeout(
2780                    Duration::from_secs(tool_timeout_secs),
2781                    tool.execute(tool_input.clone()),
2782                )
2783                .await
2784                {
2785                    Ok(result) => result,
2786                    Err(_) => Ok(crate::tool::ToolResult::structured_error(
2787                        "TOOL_TIMEOUT",
2788                        &tool_name,
2789                        &format!("tool timed out after {tool_timeout_secs}s"),
2790                        None,
2791                        Some(serde_json::json!({
2792                            "hint": "Narrow the request, set a more specific path/include filter, or retry with smaller scope."
2793                        })),
2794                    )),
2795                };
2796                match exec_result {
2797                    Ok(result) => {
2798                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
2799                        if let Some(ref cb) = output_callback {
2800                            let status = if result.success { "ok" } else { "err" };
2801                            cb(format!(
2802                                "[tool:{}:{}] {}",
2803                                tool_name,
2804                                status,
2805                                crate::util::truncate_bytes_safe(&result.output, 500)
2806                            ));
2807                        }
2808                        result.output
2809                    }
2810                    Err(e) => {
2811                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
2812                        if let Some(ref cb) = output_callback {
2813                            cb(format!("[tool:{}:err] {}", tool_name, e));
2814                        }
2815                        format!("Error: {}", e)
2816                    }
2817                }
2818            } else {
2819                tracing::warn!(tool = %tool_name, "Tool not found");
2820                format!("Error: Unknown tool '{}'", tool_name)
2821            };
2822
2823            session.add_message(Message {
2824                role: Role::Tool,
2825                content: vec![ContentPart::ToolResult {
2826                    tool_call_id: tool_id,
2827                    content,
2828                }],
2829            });
2830        }
2831    }
2832
2833    session.save().await?;
2834
2835    Ok(crate::session::SessionResult {
2836        text: final_output.trim().to_string(),
2837        session_id: session.id.clone(),
2838    })
2839}
2840
2841async fn complete_worker_step_with_context_fallback(
2842    provider: Arc<dyn crate::provider::Provider>,
2843    session: &Session,
2844    model: &str,
2845    system_prompt: &str,
2846    tool_definitions: &[crate::provider::ToolDefinition],
2847    temperature: Option<f32>,
2848) -> Result<crate::provider::CompletionResponse> {
2849    let options = crate::session::context::RequestOptions {
2850        temperature,
2851        top_p: None,
2852        max_tokens: Some(8192),
2853        force_keep_last: None,
2854    };
2855
2856    match crate::session::context::complete_with_context(
2857        Arc::clone(&provider),
2858        session,
2859        model,
2860        system_prompt,
2861        tool_definitions,
2862        options,
2863    )
2864    .await
2865    {
2866        Ok(response) => Ok(response),
2867        Err(error) if crate::session::helper::error::is_prompt_too_long_error(&error) => {
2868            let compact_tools = compact_worker_tool_definitions(tool_definitions);
2869            tracing::warn!(
2870                error = %error,
2871                tool_count = tool_definitions.len(),
2872                original_tool_schema_bytes = tool_schema_bytes(tool_definitions),
2873                compact_tool_schema_bytes = tool_schema_bytes(&compact_tools),
2874                "Provider rejected prompt after context compaction; retrying with compact tool schemas"
2875            );
2876            crate::session::context::complete_with_context(
2877                provider,
2878                session,
2879                model,
2880                system_prompt,
2881                &compact_tools,
2882                options,
2883            )
2884            .await
2885            .map_err(|retry_error| {
2886                if crate::session::helper::error::is_prompt_too_long_error(&retry_error) {
2887                    retry_error.context(
2888                        "provider rejected prompt as too long after RLM compaction and compact tool-schema fallback",
2889                    )
2890                } else {
2891                    retry_error
2892                }
2893            })
2894        }
2895        Err(error) => Err(error),
2896    }
2897}
2898
2899fn compact_worker_tool_definitions(
2900    tools: &[crate::provider::ToolDefinition],
2901) -> Vec<crate::provider::ToolDefinition> {
2902    tools
2903        .iter()
2904        .map(|tool| crate::provider::ToolDefinition {
2905            name: tool.name.clone(),
2906            description: tool.description.clone(),
2907            parameters: serde_json::json!({
2908                "type": "object",
2909                "additionalProperties": true
2910            }),
2911        })
2912        .collect()
2913}
2914
2915fn tool_schema_bytes(tools: &[crate::provider::ToolDefinition]) -> usize {
2916    let mut writer = CountingWriter::default();
2917    serde_json::to_writer(&mut writer, tools)
2918        .map(|_| writer.bytes)
2919        .unwrap_or(0)
2920}
2921
/// A write sink that throws the data away and only tallies how many bytes it was given.
#[derive(Default)]
struct CountingWriter {
    /// Running total of bytes accepted by `write` so far.
    bytes: usize,
}

impl std::io::Write for CountingWriter {
    /// Accepts the whole buffer, adding its length to the running total.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = buf.len();
        self.bytes += accepted;
        Ok(accepted)
    }

    /// Nothing is buffered, so flushing always succeeds immediately.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
2937
2938/// Start the heartbeat background task
2939/// Returns a JoinHandle that can be used to cancel the heartbeat
2940pub fn start_heartbeat(
2941    client: Client,
2942    server: String,
2943    token: Option<String>,
2944    heartbeat_state: HeartbeatState,
2945    processing: Arc<Mutex<HashSet<String>>>,
2946    cognition_config: CognitionHeartbeatConfig,
2947    task_progress: Arc<Mutex<task_timeline::TaskProgressState>>,
2948) -> JoinHandle<()> {
2949    tokio::spawn(async move {
2950        let mut consecutive_failures = 0u32;
2951        const MAX_FAILURES: u32 = 3;
2952        const HEARTBEAT_INTERVAL_SECS: u64 = 30;
2953        const COGNITION_RETRY_COOLDOWN_SECS: u64 = 300;
2954        let mut cognition_payload_disabled_until: Option<Instant> = None;
2955        let persistent_heartbeat_enabled = persistent_worker_enabled();
2956        let persistent_lease_seconds = persistent_worker_lease_seconds();
2957
2958        let mut interval =
2959            tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
2960        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2961
2962        loop {
2963            interval.tick().await;
2964
2965            // Update task count from processing set
2966            let active_count = processing.lock().await.len();
2967            heartbeat_state.set_task_count(active_count).await;
2968
2969            // Determine status based on active tasks
2970            let status = if active_count > 0 {
2971                WorkerStatus::Processing
2972            } else {
2973                WorkerStatus::Idle
2974            };
2975            heartbeat_state.set_status(status).await;
2976
2977            // Send heartbeat
2978            let url = format!(
2979                "{}/v1/agent/workers/{}/heartbeat",
2980                server, heartbeat_state.worker_id
2981            );
2982            let mut req = client.post(&url);
2983
2984            if let Some(ref t) = token {
2985                req = req.bearer_auth(t);
2986            }
2987
2988            let status_str = heartbeat_state.status.lock().await.as_str().to_string();
2989            let sub_agents = heartbeat_state.sub_agents_snapshot().await;
2990
2991            // Include task pipeline progress in heartbeat
2992            let progress_snapshot = task_progress.lock().await.clone();
2993            let task_progress_payload = if progress_snapshot.task_id.is_some() {
2994                Some(serde_json::json!({
2995                    "task_id": progress_snapshot.task_id,
2996                    "current_checkpoint": progress_snapshot.current_checkpoint,
2997                    "elapsed_secs": format!("{:.1}", progress_snapshot.elapsed_secs),
2998                    "remaining_secs": format!("{:.1}", progress_snapshot.remaining_secs),
2999                    "budget_pct_used": format!("{:.1}%", progress_snapshot.budget_pct_used),
3000                    "checkpoints_reached": progress_snapshot.checkpoints_reached.len(),
3001                    "last_detail": progress_snapshot.last_detail,
3002                }))
3003            } else {
3004                None
3005            };
3006
3007            let base_payload = serde_json::json!({
3008                "worker_id": &heartbeat_state.worker_id,
3009                "agent_name": &heartbeat_state.agent_name,
3010                "status": status_str,
3011                "active_task_count": active_count,
3012                "sub_agents": sub_agents,
3013            });
3014            let mut payload = base_payload.clone();
3015            let mut included_cognition_payload = false;
3016            let cognition_payload_allowed = cognition_payload_disabled_until
3017                .map(|until| Instant::now() >= until)
3018                .unwrap_or(true);
3019
3020            if cognition_config.enabled
3021                && cognition_payload_allowed
3022                && let Some(cognition_payload) =
3023                    fetch_cognition_heartbeat_payload(&client, &cognition_config).await
3024                && let Some(obj) = payload.as_object_mut()
3025            {
3026                obj.insert("cognition".to_string(), cognition_payload);
3027                included_cognition_payload = true;
3028            }
3029
3030            // Include task pipeline progress if a task is active
3031            if let Some(progress_json) = task_progress_payload.clone() {
3032                if let Some(obj) = payload.as_object_mut() {
3033                    obj.insert("task_progress".to_string(), progress_json);
3034                }
3035            }
3036
3037            match req.json(&payload).send().await {
3038                Ok(res) => {
3039                    if res.status().is_success() {
3040                        consecutive_failures = 0;
3041                        tracing::debug!(
3042                            worker_id = %heartbeat_state.worker_id,
3043                            status = status_str,
3044                            active_tasks = active_count,
3045                            "Heartbeat sent successfully"
3046                        );
3047                    } else if included_cognition_payload && res.status().is_client_error() {
3048                        tracing::warn!(
3049                            worker_id = %heartbeat_state.worker_id,
3050                            status = %res.status(),
3051                            "Heartbeat cognition payload rejected, retrying without cognition payload"
3052                        );
3053
3054                        let mut retry_req = client.post(&url);
3055                        if let Some(ref t) = token {
3056                            retry_req = retry_req.bearer_auth(t);
3057                        }
3058
3059                        match retry_req.json(&base_payload).send().await {
3060                            Ok(retry_res) if retry_res.status().is_success() => {
3061                                cognition_payload_disabled_until = Some(
3062                                    Instant::now()
3063                                        + Duration::from_secs(COGNITION_RETRY_COOLDOWN_SECS),
3064                                );
3065                                consecutive_failures = 0;
3066                                tracing::warn!(
3067                                    worker_id = %heartbeat_state.worker_id,
3068                                    retry_after_secs = COGNITION_RETRY_COOLDOWN_SECS,
3069                                    "Paused cognition heartbeat payload after schema rejection"
3070                                );
3071                            }
3072                            Ok(retry_res) => {
3073                                consecutive_failures += 1;
3074                                tracing::warn!(
3075                                    worker_id = %heartbeat_state.worker_id,
3076                                    status = %retry_res.status(),
3077                                    failures = consecutive_failures,
3078                                    "Heartbeat failed even after retry without cognition payload"
3079                                );
3080                            }
3081                            Err(e) => {
3082                                consecutive_failures += 1;
3083                                tracing::warn!(
3084                                    worker_id = %heartbeat_state.worker_id,
3085                                    error = %e,
3086                                    failures = consecutive_failures,
3087                                    "Heartbeat retry without cognition payload failed"
3088                                );
3089                            }
3090                        }
3091                    } else {
3092                        consecutive_failures += 1;
3093                        tracing::warn!(
3094                            worker_id = %heartbeat_state.worker_id,
3095                            status = %res.status(),
3096                            failures = consecutive_failures,
3097                            "Heartbeat failed"
3098                        );
3099                    }
3100                }
3101                Err(e) => {
3102                    consecutive_failures += 1;
3103                    tracing::warn!(
3104                        worker_id = %heartbeat_state.worker_id,
3105                        error = %e,
3106                        failures = consecutive_failures,
3107                        "Heartbeat request failed"
3108                    );
3109                }
3110            }
3111
3112            if persistent_heartbeat_enabled
3113                && let Some(progress_json) = task_progress_payload
3114                && let Err(e) = send_extended_task_heartbeat(
3115                    &client,
3116                    &server,
3117                    &token,
3118                    &heartbeat_state.worker_id,
3119                    progress_json,
3120                    persistent_lease_seconds,
3121                )
3122                .await
3123            {
3124                tracing::warn!(
3125                    worker_id = %heartbeat_state.worker_id,
3126                    error = %e,
3127                    "Extended task heartbeat failed"
3128                );
3129            }
3130
3131            // Log error after 3 consecutive failures but do not terminate
3132            if consecutive_failures >= MAX_FAILURES {
3133                tracing::error!(
3134                    worker_id = %heartbeat_state.worker_id,
3135                    failures = consecutive_failures,
3136                    "Heartbeat failed {} consecutive times - worker will continue running and attempt reconnection via SSE loop",
3137                    MAX_FAILURES
3138                );
3139                // Reset counter to avoid spamming error logs
3140                consecutive_failures = 0;
3141            }
3142        }
3143    })
3144}
3145
3146async fn send_extended_task_heartbeat(
3147    client: &Client,
3148    server: &str,
3149    token: &Option<String>,
3150    worker_id: &str,
3151    progress: serde_json::Value,
3152    lease_seconds: u64,
3153) -> Result<()> {
3154    let task_id = progress
3155        .get("task_id")
3156        .and_then(|value| value.as_str())
3157        .filter(|value| !value.is_empty())
3158        .context("active task progress missing task_id")?
3159        .to_string();
3160    let checkpoint_seq = progress
3161        .get("checkpoints_reached")
3162        .and_then(|value| value.as_u64())
3163        .unwrap_or(1)
3164        .max(1);
3165    let status_message = progress
3166        .get("current_checkpoint")
3167        .and_then(|value| value.as_str())
3168        .map(|value| value.to_string());
3169
3170    let mut req = client.post(format!("{}/v1/worker/tasks/heartbeat-extended", server));
3171    if let Some(token) = token {
3172        req = req.bearer_auth(token);
3173    }
3174
3175    let response = req
3176        .json(&serde_json::json!({
3177            "task_id": &task_id,
3178            "worker_id": worker_id,
3179            "status_message": status_message,
3180            "checkpoint": progress,
3181            "checkpoint_seq": checkpoint_seq,
3182            "lease_extension_seconds": lease_seconds,
3183        }))
3184        .send()
3185        .await?;
3186
3187    let status = response.status();
3188    if !status.is_success() {
3189        let body = response.text().await.unwrap_or_default();
3190        anyhow::bail!("extended heartbeat rejected with {}: {}", status, body);
3191    }
3192
3193    let body = response
3194        .json::<serde_json::Value>()
3195        .await
3196        .unwrap_or_default();
3197    if body.get("success").and_then(|value| value.as_bool()) == Some(false) {
3198        tracing::debug!(
3199            task_id = %task_id,
3200            message = body
3201                .get("message")
3202                .and_then(|value| value.as_str())
3203                .unwrap_or("no active run"),
3204            "Extended task heartbeat skipped"
3205        );
3206        return Ok(());
3207    }
3208
3209    Ok(())
3210}
3211
3212async fn fetch_cognition_heartbeat_payload(
3213    client: &Client,
3214    config: &CognitionHeartbeatConfig,
3215) -> Option<serde_json::Value> {
3216    let status_url = format!("{}/v1/cognition/status", config.source_base_url);
3217    let status_res = tokio::time::timeout(
3218        Duration::from_millis(config.request_timeout_ms),
3219        client.get(status_url).send(),
3220    )
3221    .await
3222    .ok()?
3223    .ok()?;
3224
3225    if !status_res.status().is_success() {
3226        return None;
3227    }
3228
3229    let status: CognitionStatusSnapshot = status_res.json().await.ok()?;
3230    let mut payload = serde_json::json!({
3231        "running": status.running,
3232        "last_tick_at": status.last_tick_at,
3233        "active_persona_count": status.active_persona_count,
3234        "events_buffered": status.events_buffered,
3235        "snapshots_buffered": status.snapshots_buffered,
3236        "loop_interval_ms": status.loop_interval_ms,
3237    });
3238
3239    if config.include_thought_summary {
3240        let snapshot_url = format!("{}/v1/cognition/snapshots/latest", config.source_base_url);
3241        let snapshot_res = tokio::time::timeout(
3242            Duration::from_millis(config.request_timeout_ms),
3243            client.get(snapshot_url).send(),
3244        )
3245        .await
3246        .ok()
3247        .and_then(Result::ok);
3248
3249        if let Some(snapshot_res) = snapshot_res
3250            && snapshot_res.status().is_success()
3251            && let Ok(snapshot) = snapshot_res.json::<CognitionLatestSnapshot>().await
3252            && let Some(obj) = payload.as_object_mut()
3253        {
3254            obj.insert(
3255                "latest_snapshot_at".to_string(),
3256                serde_json::Value::String(snapshot.generated_at),
3257            );
3258            obj.insert(
3259                "latest_thought".to_string(),
3260                serde_json::Value::String(trim_for_heartbeat(
3261                    &snapshot.summary,
3262                    config.summary_max_chars,
3263                )),
3264            );
3265            if let Some(model) = snapshot
3266                .metadata
3267                .get("model")
3268                .and_then(serde_json::Value::as_str)
3269            {
3270                obj.insert(
3271                    "latest_thought_model".to_string(),
3272                    serde_json::Value::String(model.to_string()),
3273                );
3274            }
3275            if let Some(source) = snapshot
3276                .metadata
3277                .get("source")
3278                .and_then(serde_json::Value::as_str)
3279            {
3280                obj.insert(
3281                    "latest_thought_source".to_string(),
3282                    serde_json::Value::String(source.to_string()),
3283                );
3284            }
3285        }
3286    }
3287
3288    Some(payload)
3289}
3290
/// Shorten `input` for inclusion in a heartbeat payload.
///
/// Surrounding whitespace is removed first. If what remains is at most `max_chars`
/// characters (Unicode scalar values, not bytes) it is returned unchanged. Otherwise the
/// first `max_chars` characters are kept, trailing whitespace at the cut is dropped, and
/// `"..."` is appended, so the result may be up to three characters longer than `max_chars`.
fn trim_for_heartbeat(input: &str, max_chars: usize) -> String {
    // Trim before measuring so padding never triggers a needless truncation.
    let text = input.trim();

    // Find the byte offset of the first character past the limit, walking at most
    // `max_chars + 1` characters instead of counting the whole string.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((cut, _)) => {
            // `cut` comes from `char_indices`, so it is always a valid char boundary.
            let head = text[..cut].trim_end();
            let mut shortened = String::with_capacity(head.len() + 3);
            shortened.push_str(head);
            shortened.push_str("...");
            shortened
        }
    }
}
3303
/// Read a boolean flag from the environment variable `name`.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`, ignoring ASCII case and
/// surrounding whitespace. Returns `default` when the variable is unset, is not valid
/// Unicode, or holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    // Trim first: values sourced from files or secrets often carry a trailing newline.
    match raw.trim().to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
3314
/// Read a `usize` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is unset, is not
/// valid Unicode, or does not parse as a `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        // Integer parsing rejects padding, so trim before parsing.
        Ok(raw) => raw.trim().parse::<usize>().unwrap_or(default),
        Err(_) => default,
    }
}
3321
/// Read a `u64` from the environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is unset, is not
/// valid Unicode, or does not parse as a `u64`.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        // Integer parsing rejects padding, so trim before parsing.
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
3328
3329fn maybe_configure_repo_git_auth_from_entry(entry: &serde_json::Value) {
3330    let Some(workspace_id) = entry.get("id").and_then(|v| v.as_str()) else {
3331        return;
3332    };
3333    let Some(path) = entry.get("path").and_then(|v| v.as_str()) else {
3334        return;
3335    };
3336    let has_git_remote = entry
3337        .get("git_url")
3338        .and_then(|v| v.as_str())
3339        .is_some_and(|value| !value.trim().is_empty());
3340    if !has_git_remote {
3341        return;
3342    }
3343
3344    let repo_path = Path::new(path);
3345    if !repo_path.join(".git").exists() {
3346        return;
3347    }
3348
3349    if let Err(error) = configure_repo_git_auth(repo_path, workspace_id) {
3350        tracing::debug!(
3351            workspace_id,
3352            path,
3353            error = %error,
3354            "Workspace sync could not install Git credential helper"
3355        );
3356    }
3357    configure_repo_git_github_app_from_agent_config(repo_path, entry.get("agent_config"));
3358}
3359
3360fn configure_repo_git_github_app_from_agent_config(
3361    repo_path: &Path,
3362    agent_config: Option<&serde_json::Value>,
3363) {
3364    let installation_id = agent_config
3365        .and_then(|value| value.get("git_auth"))
3366        .and_then(|value| value.get("github_app"))
3367        .and_then(|value| value.get("installation_id"))
3368        .and_then(|value| value.as_str());
3369    let app_id = agent_config
3370        .and_then(|value| value.get("git_auth"))
3371        .and_then(|value| value.get("github_app"))
3372        .and_then(|value| value.get("app_id"))
3373        .and_then(|value| value.as_str());
3374    if let Err(error) = configure_repo_git_github_app(repo_path, installation_id, app_id) {
3375        tracing::debug!(path = %repo_path.display(), error = %error, "Failed to persist GitHub App repo metadata");
3376    }
3377}
3378
3379/// Start a background task that periodically fetches the server's workspace list
3380/// and auto-registers any whose path exists on this machine's filesystem.
3381/// This lets workers pick up new codebases without restarting.
3382fn start_workspace_sync(
3383    client: Client,
3384    server: String,
3385    token: Option<String>,
3386    shared_codebases: Arc<Mutex<Vec<String>>>,
3387) -> JoinHandle<()> {
3388    tokio::spawn(async move {
3389        const POLL_INTERVAL_SECS: u64 = 60;
3390        let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
3391        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3392        interval.tick().await; // skip the immediate first tick -- let the worker settle first
3393
3394        loop {
3395            interval.tick().await;
3396            if let Err(e) =
3397                sync_workspaces_from_server(&client, &server, &token, &shared_codebases).await
3398            {
3399                tracing::warn!("Workspace sync failed: {}", e);
3400            }
3401        }
3402    })
3403}
3404
3405/// Fetch the server's workspace/codebase list and add any locally-present paths
3406/// that are not already in the worker's registered codebases.
3407/// New paths take effect on the next SSE reconnect (re-register re-sends X-Workspaces).
3408async fn sync_workspaces_from_server(
3409    client: &Client,
3410    server: &str,
3411    token: &Option<String>,
3412    shared_codebases: &Arc<Mutex<Vec<String>>>,
3413) -> Result<()> {
3414    let mut req = client.get(format!("{}/v1/agent/workspaces", server));
3415    if let Some(t) = token {
3416        req = req.bearer_auth(t);
3417    }
3418
3419    let res = req.send().await?;
3420    if !res.status().is_success() {
3421        tracing::debug!(
3422            status = %res.status(),
3423            "Workspace sync: server returned non-success, skipping"
3424        );
3425        return Ok(());
3426    }
3427
3428    let data: serde_json::Value = res.json().await?;
3429
3430    // Server returns { workspaces: [...] } or { codebases: [...] }
3431    let entries = if let Some(arr) = data.as_array() {
3432        arr.clone()
3433    } else {
3434        data["workspaces"]
3435            .as_array()
3436            .or_else(|| data["codebases"].as_array())
3437            .cloned()
3438            .unwrap_or_default()
3439    };
3440
3441    let mut new_paths: Vec<String> = Vec::new();
3442    {
3443        let current = shared_codebases.lock().await;
3444        for entry in &entries {
3445            maybe_configure_repo_git_auth_from_entry(entry);
3446            let path = match entry["path"].as_str().filter(|p| !p.is_empty()) {
3447                Some(p) => p,
3448                None => continue,
3449            };
3450            // Only auto-register if the path physically exists on this machine
3451            // and is not already in the codebases list
3452            if std::path::Path::new(path).exists() && !current.iter().any(|c| c.as_str() == path) {
3453                new_paths.push(path.to_string());
3454            }
3455        }
3456    }
3457
3458    if !new_paths.is_empty() {
3459        let mut current = shared_codebases.lock().await;
3460        for path in &new_paths {
3461            tracing::info!(
3462                path = %path,
3463                "Workspace sync: auto-discovered local path, adding to codebases"
3464            );
3465            current.push(path.clone());
3466        }
3467        tracing::info!(
3468            added = new_paths.len(),
3469            total = current.len(),
3470            "Workspace sync complete -- new paths take effect on next reconnect"
3471        );
3472    } else {
3473        tracing::debug!("Workspace sync: no new local paths found");
3474    }
3475
3476    Ok(())
3477}
3478
3479#[cfg(test)]
3480mod tests {
3481    use super::*;
3482    use crate::provider::{
3483        CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo,
3484        Provider, Role, StreamChunk, ToolDefinition, Usage,
3485    };
3486    use futures::stream::{self, BoxStream};
3487    use serde_json::json;
3488    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3489
3490    struct ContextErrorUntilCompactProvider {
3491        calls: AtomicUsize,
3492        saw_compact_schema: AtomicBool,
3493    }
3494
3495    impl ContextErrorUntilCompactProvider {
3496        fn new() -> Self {
3497            Self {
3498                calls: AtomicUsize::new(0),
3499                saw_compact_schema: AtomicBool::new(false),
3500            }
3501        }
3502
3503        fn calls(&self) -> usize {
3504            self.calls.load(Ordering::SeqCst)
3505        }
3506
3507        fn saw_compact_schema(&self) -> bool {
3508            self.saw_compact_schema.load(Ordering::SeqCst)
3509        }
3510    }
3511
3512    #[async_trait::async_trait]
3513    impl Provider for ContextErrorUntilCompactProvider {
3514        fn name(&self) -> &str {
3515            "mock"
3516        }
3517
3518        async fn list_models(&self) -> Result<Vec<ModelInfo>> {
3519            Ok(Vec::new())
3520        }
3521
3522        async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
3523            self.calls.fetch_add(1, Ordering::SeqCst);
3524            let compact = request.tools.iter().all(|tool| {
3525                tool.parameters.get("additionalProperties") == Some(&serde_json::Value::Bool(true))
3526            });
3527            if compact {
3528                self.saw_compact_schema.store(true, Ordering::SeqCst);
3529                return Ok(CompletionResponse {
3530                    message: Message {
3531                        role: Role::Assistant,
3532                        content: vec![ContentPart::Text { text: "ok".into() }],
3533                    },
3534                    usage: Usage::default(),
3535                    finish_reason: FinishReason::Stop,
3536                });
3537            }
3538            anyhow::bail!(
3539                "Your input exceeds the context window of this model. Please adjust your input and try again."
3540            )
3541        }
3542
3543        async fn complete_stream(
3544            &self,
3545            _request: CompletionRequest,
3546        ) -> Result<BoxStream<'static, StreamChunk>> {
3547            Ok(Box::pin(stream::empty()))
3548        }
3549    }
3550
3551    #[test]
3552    fn metadata_lookup_reads_nested_forage_keys() {
3553        let metadata = json!({
3554            "forage": {
3555                "execute": true,
3556                "top": 5,
3557            }
3558        })
3559        .as_object()
3560        .cloned()
3561        .unwrap();
3562
3563        assert_eq!(metadata_bool(&metadata, &["execute"]), Some(true));
3564        assert_eq!(metadata_usize(&metadata, &["top"]), Some(5));
3565    }
3566
3567    #[test]
3568    fn build_forage_args_defaults_prompt_to_moonshot_and_local_mode() {
3569        let metadata = serde_json::Map::new();
3570        let args = build_forage_args(
3571            "Expand enterprise adoption in regulated markets",
3572            "forage task",
3573            &metadata,
3574            Some("openrouter/z-ai/glm-5".to_string()),
3575        );
3576
3577        assert_eq!(
3578            args.moonshots,
3579            vec!["Expand enterprise adoption in regulated markets".to_string()]
3580        );
3581        assert!(args.no_s3);
3582        assert_eq!(args.model.as_deref(), Some("openrouter/z-ai/glm-5"));
3583        assert_eq!(args.execution_engine, "run");
3584    }
3585
3586    #[test]
3587    fn compact_worker_tool_definitions_preserve_tool_identity() {
3588        let tools = vec![ToolDefinition {
3589            name: "large_tool".to_string(),
3590            description: "Large tool schema".to_string(),
3591            parameters: json!({
3592                "type": "object",
3593                "properties": {
3594                    "payload": {
3595                        "type": "string",
3596                        "description": "x".repeat(20_000)
3597                    }
3598                }
3599            }),
3600        }];
3601
3602        let compact = compact_worker_tool_definitions(&tools);
3603
3604        assert_eq!(compact[0].name, tools[0].name);
3605        assert_eq!(compact[0].description, tools[0].description);
3606        assert_eq!(
3607            compact[0].parameters.get("additionalProperties"),
3608            Some(&serde_json::Value::Bool(true))
3609        );
3610        assert!(tool_schema_bytes(&compact) < tool_schema_bytes(&tools) / 10);
3611    }
3612
3613    #[tokio::test]
3614    async fn worker_retries_context_window_error_with_compact_tool_schema() {
3615        let provider = Arc::new(ContextErrorUntilCompactProvider::new());
3616        let provider_dyn: Arc<dyn Provider> = provider.clone();
3617        let mut session = Session::new().await.expect("session");
3618        session.add_message(Message {
3619            role: Role::User,
3620            content: vec![ContentPart::Text {
3621                text: "Run cronjob \"rule:Retry Failed Conversion Forwards\".".to_string(),
3622            }],
3623        });
3624        let tools = vec![ToolDefinition {
3625            name: "large_tool".to_string(),
3626            description: "Large tool schema".to_string(),
3627            parameters: json!({
3628                "type": "object",
3629                "properties": {
3630                    "payload": {
3631                        "type": "string",
3632                        "description": "x".repeat(20_000)
3633                    }
3634                }
3635            }),
3636        }];
3637
3638        let response = complete_worker_step_with_context_fallback(
3639            provider_dyn,
3640            &session,
3641            "mock-model",
3642            "system",
3643            &tools,
3644            Some(0.7),
3645        )
3646        .await
3647        .expect("compact tool-schema retry should recover");
3648
3649        assert_eq!(
3650            response
3651                .message
3652                .content
3653                .iter()
3654                .filter_map(|part| match part {
3655                    ContentPart::Text { text } => Some(text.as_str()),
3656                    _ => None,
3657                })
3658                .collect::<String>(),
3659            "ok"
3660        );
3661        assert_eq!(provider.calls(), 5);
3662        assert!(provider.saw_compact_schema());
3663    }
3664
3665    #[test]
3666    fn build_forage_args_honors_nested_forage_configuration() {
3667        let metadata = json!({
3668            "forage": {
3669                "top": 7,
3670                "loop": true,
3671                "max_cycles": 2,
3672                "execute": true,
3673                "no_s3": false,
3674                "moonshots": ["Ship autonomous OKR execution", "Reduce operator toil"],
3675                "moonshot_required": true,
3676                "moonshot_min_alignment": "0.25",
3677                "execution_engine": "swarm",
3678                "run_timeout_secs": 1200,
3679                "fail_fast": true,
3680                "swarm_strategy": "stage",
3681                "swarm_max_subagents": 4,
3682                "swarm_max_steps": 42,
3683                "swarm_subagent_timeout_secs": 180
3684            }
3685        })
3686        .as_object()
3687        .cloned()
3688        .unwrap();
3689
3690        let args = build_forage_args("fallback prompt", "title", &metadata, None);
3691
3692        assert_eq!(args.top, 7);
3693        assert!(args.loop_mode);
3694        assert_eq!(args.max_cycles, 2);
3695        assert!(args.execute);
3696        assert!(!args.no_s3);
3697        assert_eq!(args.moonshots.len(), 2);
3698        assert!(args.moonshot_required);
3699        assert!((args.moonshot_min_alignment - 0.25).abs() < f64::EPSILON);
3700        assert_eq!(args.execution_engine, "swarm");
3701        assert_eq!(args.run_timeout_secs, 1200);
3702        assert!(args.fail_fast);
3703        assert_eq!(args.swarm_strategy, "stage");
3704        assert_eq!(args.swarm_max_subagents, 4);
3705        assert_eq!(args.swarm_max_steps, 42);
3706        assert_eq!(args.swarm_subagent_timeout_secs, 180);
3707    }
3708
3709    #[test]
3710    fn worker_allows_github_app_post_clone_followups() {
3711        let metadata = json!({
3712            "source": "github-app",
3713            "post_clone_task": {
3714                "title": "Work issue #76",
3715                "prompt": "fix it"
3716            }
3717        })
3718        .as_object()
3719        .cloned()
3720        .unwrap();
3721
3722        assert!(worker_should_enqueue_post_clone_task(&metadata));
3723    }
3724
3725    #[test]
3726    fn worker_allows_legacy_post_clone_followups() {
3727        let metadata = json!({
3728            "post_clone_task": {
3729                "title": "Continue after clone",
3730                "prompt": "run build"
3731            }
3732        })
3733        .as_object()
3734        .cloned()
3735        .unwrap();
3736
3737        assert!(worker_should_enqueue_post_clone_task(&metadata));
3738    }
3739
3740    #[test]
3741    fn resolve_worker_id_prefers_env() {
3742        let original = std::env::var("CODETETHER_WORKER_ID").ok();
3743        unsafe {
3744            std::env::set_var("CODETETHER_WORKER_ID", "harvester-test-worker");
3745        }
3746        let resolved = resolve_worker_id();
3747        match original {
3748            Some(value) => unsafe {
3749                std::env::set_var("CODETETHER_WORKER_ID", value);
3750            },
3751            None => unsafe {
3752                std::env::remove_var("CODETETHER_WORKER_ID");
3753            },
3754        }
3755        assert_eq!(resolved, "harvester-test-worker");
3756    }
3757
3758    #[test]
3759    fn advertised_interfaces_include_http_and_bus_urls() {
3760        let interfaces = advertised_interfaces(Some("http://worker.test:8080/"));
3761        assert_eq!(interfaces["http"]["base_url"], "http://worker.test:8080");
3762        assert_eq!(
3763            interfaces["bus"]["stream_url"],
3764            "http://worker.test:8080/v1/bus/stream"
3765        );
3766        assert_eq!(
3767            interfaces["bus"]["publish_url"],
3768            "http://worker.test:8080/v1/bus/publish"
3769        );
3770    }
3771
3772    #[test]
3773    fn advertised_interfaces_omit_empty_public_url() {
3774        assert_eq!(advertised_interfaces(None), serde_json::json!({}));
3775        assert_eq!(advertised_interfaces(Some("   ")), serde_json::json!({}));
3776    }
3777
3778    #[test]
3779    fn worker_http_client_builds_with_stream_decoders_disabled() {
3780        assert!(build_worker_http_client().is_ok());
3781    }
3782
3783    #[test]
3784    fn task_timeout_prefers_fire_and_forget_payload_timeout() {
3785        let task = json!({
3786            "id": "task-1",
3787            "task_timeout_seconds": 604800,
3788            "metadata": {"timeout_secs": 1200}
3789        });
3790        let metadata = task_metadata(&task);
3791        assert_eq!(task_timeout_secs(&task, &metadata), 604800);
3792    }
3793
3794    #[tokio::test]
3795    async fn reserve_task_slot_enforces_capacity() {
3796        let processing = Arc::new(Mutex::new(HashSet::new()));
3797
3798        assert_eq!(
3799            reserve_task_slot(&processing, "task-1", 2).await,
3800            TaskReservation::Reserved
3801        );
3802        assert_eq!(
3803            reserve_task_slot(&processing, "task-1", 2).await,
3804            TaskReservation::AlreadyProcessing
3805        );
3806        assert_eq!(
3807            reserve_task_slot(&processing, "task-2", 2).await,
3808            TaskReservation::Reserved
3809        );
3810        assert_eq!(
3811            reserve_task_slot(&processing, "task-3", 2).await,
3812            TaskReservation::AtCapacity
3813        );
3814    }
3815
3816    #[test]
3817    fn normalize_max_concurrent_tasks_never_returns_zero() {
3818        assert_eq!(normalize_max_concurrent_tasks(0), 1);
3819        assert_eq!(normalize_max_concurrent_tasks(3), 3);
3820    }
3821}