Skip to main content

codetether_agent/a2a/
worker.rs

1//! A2A Worker - connects to an A2A server to process tasks
2
3mod clone_location;
4mod clone_target;
5mod model_defaults;
6mod model_preferences;
7pub(crate) mod task_timeline;
8pub(super) mod workspace_resolve;
9
10use crate::a2a::claim::TaskClaimResponse;
11use crate::a2a::git_credentials::{
12    configure_repo_git_auth, configure_repo_git_github_app, write_git_credential_helper_script,
13};
14use crate::a2a::spawn::{A2APeerHandle, SpawnOptions};
15use crate::a2a::worker_workspace_record::{RegisteredWorkspaceRecord, fetch_workspace_record};
16use crate::bus::AgentBus;
17use crate::cli::{A2aArgs, ForageArgs};
18use crate::provenance::{ClaimProvenance, git_commit_with_provenance, install_commit_msg_hook};
19use crate::provider::ProviderRegistry;
20use crate::session::Session;
21use crate::session::helper::runtime::enrich_tool_input_with_runtime_context;
22use crate::swarm::{DecompositionStrategy, SwarmConfig, SwarmExecutor};
23use crate::tui::swarm_view::SwarmEvent;
24use anyhow::{Context, Result};
25use futures::StreamExt;
26use reqwest::Client;
27use serde::Deserialize;
28use std::collections::HashMap;
29use std::collections::HashSet;
30use std::path::{Path, PathBuf};
31use std::sync::Arc;
32use std::time::{Duration, SystemTime};
33use tokio::process::Command;
34use tokio::sync::{Mutex, mpsc};
35use tokio::task::JoinHandle;
36use tokio::time::Instant;
37
38use self::model_defaults::{default_model_for_provider, prefers_temperature_one};
39use self::model_preferences::choose_provider_for_tier;
40use crate::a2a::task_scope::check_task_scope;
41use crate::a2a::worker_tool_registry::{create_filtered_registry, is_tool_allowed};
42use crate::a2a::worker_workspace_context::resolve_task_workspace_dir;
43use clone_location::{git_clone_base_dir, resolve_workspace_clone_path};
44use clone_target::prepare_clone_target;
45
/// Coarse worker state included in heartbeats sent to the control plane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStatus {
    /// Reported as `"idle"`.
    Idle,
    /// Reported as `"processing"`.
    Processing,
}

impl WorkerStatus {
    /// Returns the lowercase wire label for this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Processing => "processing",
            Self::Idle => "idle",
        }
    }
}
61
62/// Heartbeat state shared between the heartbeat task and the main worker
63#[derive(Clone)]
64pub struct HeartbeatState {
65    worker_id: String,
66    pub agent_name: String,
67    pub status: Arc<Mutex<WorkerStatus>>,
68    pub active_task_count: Arc<Mutex<usize>>,
69    pub sub_agents: Arc<Mutex<HashSet<String>>>,
70}
71
72impl HeartbeatState {
73    pub fn new(worker_id: String, agent_name: String) -> Self {
74        Self {
75            worker_id,
76            agent_name,
77            status: Arc::new(Mutex::new(WorkerStatus::Idle)),
78            active_task_count: Arc::new(Mutex::new(0)),
79            sub_agents: Arc::new(Mutex::new(HashSet::new())),
80        }
81    }
82
83    pub async fn set_status(&self, status: WorkerStatus) {
84        *self.status.lock().await = status;
85    }
86
87    pub async fn set_task_count(&self, count: usize) {
88        *self.active_task_count.lock().await = count;
89    }
90
91    pub async fn register_sub_agent(&self, agent_name: String) {
92        self.sub_agents.lock().await.insert(agent_name);
93    }
94
95    pub async fn deregister_sub_agent(&self, agent_name: &str) {
96        self.sub_agents.lock().await.remove(agent_name);
97    }
98
99    pub async fn sub_agents_snapshot(&self) -> Vec<String> {
100        let mut names = self
101            .sub_agents
102            .lock()
103            .await
104            .iter()
105            .cloned()
106            .collect::<Vec<_>>();
107        names.sort();
108        names
109    }
110}
111
/// Settings for sharing the local cognition loop's state in worker heartbeats.
///
/// Normally built from `CODETETHER_WORKER_COGNITION_*` environment variables by
/// `CognitionHeartbeatConfig::from_env`.
#[derive(Debug, Clone)]
// NOTE(review): some fields may not be read anywhere in the crate; `dead_code` is
// allowed so they do not warn — confirm whether this is still needed.
#[allow(dead_code)]
pub struct CognitionHeartbeatConfig {
    /// Whether cognition state is shared at all.
    pub enabled: bool,
    /// Base URL of the cognition source (`from_env` strips any trailing `/`).
    pub source_base_url: String,
    /// Optional token for the cognition source.
    pub token: Option<String>,
    /// Provider name attached to shared cognition data.
    pub provider_name: String,
    /// Sharing interval in seconds (`from_env` enforces a minimum of 5).
    pub interval_secs: u64,
    /// Whether a thought summary is included.
    pub include_thought_summary: bool,
    /// Maximum length of the thought summary, in characters.
    pub summary_max_chars: usize,
    /// Request timeout in milliseconds (`from_env` enforces a minimum of 250).
    pub request_timeout_ms: u64,
}
124
125impl CognitionHeartbeatConfig {
126    pub fn from_env() -> Self {
127        let source_base_url = std::env::var("CODETETHER_WORKER_COGNITION_SOURCE_URL")
128            .unwrap_or_else(|_| "http://127.0.0.1:4096".to_string())
129            .trim_end_matches('/')
130            .to_string();
131
132        Self {
133            enabled: env_bool("CODETETHER_WORKER_COGNITION_SHARE_ENABLED", true),
134            source_base_url,
135            include_thought_summary: env_bool("CODETETHER_WORKER_COGNITION_INCLUDE_THOUGHTS", true),
136            summary_max_chars: env_usize("CODETETHER_WORKER_COGNITION_THOUGHT_MAX_CHARS", 480),
137            request_timeout_ms: env_u64("CODETETHER_WORKER_COGNITION_TIMEOUT_MS", 2_500).max(250),
138            interval_secs: env_u64("CODETETHER_WORKER_COGNITION_INTERVAL_SECS", 30).max(5),
139            provider_name: std::env::var("CODETETHER_WORKER_COGNITION_PROVIDER")
140                .unwrap_or_else(|_| "cognition".to_string()),
141            token: std::env::var("CODETETHER_WORKER_COGNITION_TOKEN").ok(),
142        }
143    }
144}
145
146#[derive(Debug, Deserialize)]
147struct CognitionStatusSnapshot {
148    running: bool,
149    #[serde(default)]
150    last_tick_at: Option<String>,
151    #[serde(default)]
152    active_persona_count: usize,
153    #[serde(default)]
154    events_buffered: usize,
155    #[serde(default)]
156    snapshots_buffered: usize,
157    #[serde(default)]
158    loop_interval_ms: u64,
159}
160
161#[derive(Debug, Deserialize)]
162struct CognitionLatestSnapshot {
163    generated_at: String,
164    summary: String,
165    #[serde(default)]
166    metadata: HashMap<String, serde_json::Value>,
167}
168
/// Outcome of `reserve_task_slot`: whether a concurrency slot was claimed for a task.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TaskReservation {
    /// The task id was recorded in the in-flight set.
    Reserved,
    /// The task id was already in flight; nothing was recorded.
    AlreadyProcessing,
    /// The in-flight set was full; nothing was recorded.
    AtCapacity,
}
175
176#[derive(Clone)]
177struct WorkerTaskRuntime {
178    client: Client,
179    server: String,
180    token: Option<String>,
181    worker_id: String,
182    agent_name: String,
183    processing: Arc<Mutex<HashSet<String>>>,
184    max_concurrent_tasks: usize,
185    auto_approve: AutoApprove,
186    bus: Arc<AgentBus>,
187    /// Shared progress state for the currently active task (read by heartbeat).
188    task_progress: Arc<Mutex<task_timeline::TaskProgressState>>,
189    /// Server-side workspace IDs this worker is scoped to (empty = accept all).
190    workspace_ids: Vec<String>,
191}
192
193fn worker_a2a_mdns_enabled() -> bool {
194    env_bool("CODETETHER_WORKER_A2A_MDNS", false)
195}
196
/// TCP port for the worker's A2A peer, read from `CODETETHER_WORKER_A2A_PORT`.
///
/// Falls back to `8081` when the variable is unset, not valid Unicode, or (after
/// trimming) not a valid `u16`.
fn worker_a2a_port() -> u16 {
    const DEFAULT_PORT: u16 = 8081;
    match std::env::var("CODETETHER_WORKER_A2A_PORT") {
        Ok(raw) => raw.trim().parse().unwrap_or(DEFAULT_PORT),
        Err(_) => DEFAULT_PORT,
    }
}
203
/// Collects A2A peer seed addresses from the environment.
///
/// Reads the comma-separated lists in `CODETETHER_WORKER_A2A_PEERS` and then
/// `CODETETHER_A2A_PEERS`. Each entry is trimmed and empty entries are dropped.
///
/// A peer listed more than once (for example in both variables) is kept only at its
/// first position. This way duplicates are not passed on as separate seeds and the
/// logged peer count matches the number of distinct peers.
fn worker_a2a_peer_seeds() -> Vec<String> {
    let mut seen: HashSet<String> = HashSet::new();
    ["CODETETHER_WORKER_A2A_PEERS", "CODETETHER_A2A_PEERS"]
        .into_iter()
        .filter_map(|name| std::env::var(name).ok())
        .flat_map(|value| {
            value
                .split(',')
                .map(str::trim)
                .filter(|peer| !peer.is_empty())
                .map(ToString::to_string)
                .collect::<Vec<_>>()
        })
        // `insert` returns false for a peer already seen, dropping the duplicate.
        .filter(|peer| seen.insert(peer.clone()))
        .collect()
}
218
219fn worker_public_url_for_port(args: &A2aArgs, port: u16) -> Option<String> {
220    let configured = args
221        .public_url
222        .as_deref()
223        .map(str::trim)
224        .filter(|value| !value.is_empty())?;
225
226    if let Ok(mut url) = reqwest::Url::parse(configured) {
227        let _ = url.set_port(Some(port));
228        return Some(url.as_str().trim_end_matches('/').to_string());
229    }
230
231    Some(configured.trim_end_matches('/').to_string())
232}
233
234async fn maybe_start_worker_a2a_peer(
235    args: &A2aArgs,
236    name: &str,
237    bus: Arc<AgentBus>,
238) -> Option<A2APeerHandle> {
239    if !worker_a2a_mdns_enabled() {
240        tracing::info!(
241            "Worker A2A mDNS peer disabled; set CODETETHER_WORKER_A2A_MDNS=true to enable"
242        );
243        return None;
244    }
245
246    let port = worker_a2a_port();
247    let public_url = worker_public_url_for_port(args, port);
248    let peer = worker_a2a_peer_seeds();
249    if !peer.is_empty() {
250        tracing::info!(peer_count = peer.len(), "Worker A2A peer seeds configured");
251    }
252    let opts = SpawnOptions {
253        name: Some(format!("{name}-a2a")),
254        hostname: args.hostname.clone(),
255        port,
256        public_url,
257        description: Some(format!("CodeTether harvester worker A2A peer for {name}")),
258        peer,
259        discovery_interval_secs: 15,
260        auto_introduce: true,
261        mdns: true,
262    };
263
264    match crate::a2a::spawn::start_a2a_in_background(opts, bus).await {
265        Ok(handle) => {
266            tracing::info!(agent = %handle.agent_name, public_url = %handle.public_url, "Worker A2A mDNS peer started");
267            Some(handle)
268        }
269        Err(error) => {
270            tracing::warn!(%error, "Worker A2A mDNS peer failed to start; continuing SSE worker path");
271            None
272        }
273    }
274}
275
276// Run the A2A worker
277pub async fn run(args: A2aArgs) -> Result<()> {
278    let server = args.server.trim_end_matches('/');
279    let name = args
280        .name
281        .as_deref()
282        .map(ToString::to_string)
283        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
284    let worker_id = resolve_worker_id();
285    export_worker_runtime_env(server, &args.token, &worker_id);
286    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
287
288    let codebases: Vec<String> = args
289        .workspaces
290        .as_deref()
291        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
292        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
293
294    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
295    tracing::info!("Server: {}", server);
296    tracing::info!("Workspaces: {:?}", codebases);
297    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
298
299    // Wrap in shared mutex so background workspace-sync can add new local paths
300    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
301
302    let client = build_worker_http_client()?;
303    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
304    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
305    if cognition_heartbeat.enabled {
306        tracing::info!(
307            source = %cognition_heartbeat.source_base_url,
308            include_thoughts = cognition_heartbeat.include_thought_summary,
309            max_chars = cognition_heartbeat.summary_max_chars,
310            timeout_ms = cognition_heartbeat.request_timeout_ms,
311            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
312        );
313    } else {
314        tracing::warn!(
315            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
316        );
317    }
318
319    let auto_approve = match args.auto_approve.as_str() {
320        "all" => AutoApprove::All,
321        "safe" => AutoApprove::Safe,
322        _ => AutoApprove::None,
323    };
324
325    // Create heartbeat state
326    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
327
328    // Create agent bus for in-process sub-agent communication
329    let bus = AgentBus::new().into_arc();
330
331    // Auto-start S3 sink if MinIO is configured
332    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
333
334    {
335        let handle = bus.handle(&worker_id);
336        handle.announce_ready(worker_capabilities());
337    }
338
339    let _a2a_peer = maybe_start_worker_a2a_peer(&args, &name, bus.clone()).await;
340
341    let task_progress: Arc<Mutex<task_timeline::TaskProgressState>> =
342        Arc::new(Mutex::new(task_timeline::TaskProgressState::new()));
343
344    let task_runtime = WorkerTaskRuntime {
345        client: client.clone(),
346        server: server.to_string(),
347        token: args.token.clone(),
348        worker_id: worker_id.clone(),
349        agent_name: name.clone(),
350        processing: processing.clone(),
351        max_concurrent_tasks,
352        auto_approve,
353        bus: bus.clone(),
354        task_progress: task_progress.clone(),
355        workspace_ids: Vec::new(),
356    };
357
358    // Register worker
359    {
360        let codebases = shared_codebases.lock().await.clone();
361        register_worker(
362            &client,
363            server,
364            &args.token,
365            &worker_id,
366            &name,
367            &codebases,
368            args.public_url.as_deref(),
369        )
370        .await?;
371    }
372
373    if let Err(e) =
374        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
375    {
376        tracing::warn!(error = %e, "Initial workspace sync failed");
377    }
378
379    // Fetch pending tasks
380    fetch_pending_tasks(&task_runtime).await?;
381
382    // Start background task that polls server for new locally-present workspaces
383    let _workspace_sync_handle = start_workspace_sync(
384        client.clone(),
385        server.to_string(),
386        args.token.clone(),
387        shared_codebases.clone(),
388    );
389
390    // Connect to SSE stream
391    loop {
392        // Refresh codebases snapshot (background sync may have added new local paths)
393        let codebases = shared_codebases.lock().await.clone();
394
395        // Re-register worker on each reconnection to report updated models/capabilities
396        if let Err(e) = register_worker(
397            &client,
398            server,
399            &args.token,
400            &worker_id,
401            &name,
402            &codebases,
403            args.public_url.as_deref(),
404        )
405        .await
406        {
407            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
408        }
409        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
410            tracing::warn!("Reconnect task fetch failed: {}", e);
411        }
412
413        // Start heartbeat task for this connection
414        let heartbeat_handle = start_heartbeat(
415            client.clone(),
416            server.to_string(),
417            args.token.clone(),
418            heartbeat_state.clone(),
419            processing.clone(),
420            cognition_heartbeat.clone(),
421            task_progress.clone(),
422        );
423
424        match connect_stream(&task_runtime, &name, &codebases, None).await {
425            Ok(StreamDisconnectReason::Ended) => {
426                tracing::warn!("Stream ended, reconnecting...");
427            }
428            Ok(StreamDisconnectReason::ReadError(error)) => {
429                tracing::warn!(error = %error, "Stream read failed, reconnecting...");
430            }
431            Err(e) => {
432                tracing::error!("Stream error: {}, reconnecting...", e);
433            }
434        }
435
436        // Cancel heartbeat on disconnection
437        heartbeat_handle.abort();
438        tracing::debug!("Heartbeat cancelled for reconnection");
439
440        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
441    }
442}
443
444/// Run the A2A worker with shared state for HTTP server integration
445/// This variant accepts a WorkerServerState to communicate with the HTTP server
446pub async fn run_with_state(
447    args: A2aArgs,
448    server_state: crate::worker_server::WorkerServerState,
449) -> Result<()> {
450    let server = args.server.trim_end_matches('/');
451    let name = args
452        .name
453        .as_deref()
454        .map(ToString::to_string)
455        .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
456    let worker_id = resolve_worker_id();
457    export_worker_runtime_env(server, &args.token, &worker_id);
458    let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
459
460    // Share worker_id with HTTP server
461    server_state.set_worker_id(worker_id.clone()).await;
462
463    let codebases: Vec<String> = args
464        .workspaces
465        .as_deref()
466        .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
467        .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
468
469    tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
470    tracing::info!("Server: {}", server);
471    tracing::info!("Workspaces: {:?}", codebases);
472    tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
473
474    // Wrap in shared mutex so background workspace-sync can add new local paths
475    let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
476
477    let client = build_worker_http_client()?;
478    let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
479    let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
480    if cognition_heartbeat.enabled {
481        tracing::info!(
482            source = %cognition_heartbeat.source_base_url,
483            include_thoughts = cognition_heartbeat.include_thought_summary,
484            max_chars = cognition_heartbeat.summary_max_chars,
485            timeout_ms = cognition_heartbeat.request_timeout_ms,
486            "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
487        );
488    } else {
489        tracing::warn!(
490            "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
491        );
492    }
493
494    let auto_approve = match args.auto_approve.as_str() {
495        "all" => AutoApprove::All,
496        "safe" => AutoApprove::Safe,
497        _ => AutoApprove::None,
498    };
499
500    // Create heartbeat state
501    let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
502
503    // Share heartbeat state with HTTP server
504    server_state
505        .set_heartbeat_state(Arc::new(heartbeat_state.clone()))
506        .await;
507
508    // Create agent bus for in-process sub-agent communication
509    let bus = AgentBus::new().into_arc();
510    server_state.set_bus(bus.clone()).await;
511
512    // Auto-start S3 sink if MinIO is configured
513    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
514
515    {
516        let handle = bus.handle(&worker_id);
517        handle.announce_ready(worker_capabilities());
518    }
519
520    let _a2a_peer = maybe_start_worker_a2a_peer(&args, &name, bus.clone()).await;
521
522    let task_progress_ws: Arc<Mutex<task_timeline::TaskProgressState>> =
523        Arc::new(Mutex::new(task_timeline::TaskProgressState::new()));
524
525    let task_runtime = WorkerTaskRuntime {
526        client: client.clone(),
527        server: server.to_string(),
528        token: args.token.clone(),
529        worker_id: worker_id.clone(),
530        agent_name: name.clone(),
531        processing: processing.clone(),
532        max_concurrent_tasks,
533        auto_approve,
534        bus: bus.clone(),
535        task_progress: task_progress_ws.clone(),
536        workspace_ids: Vec::new(),
537    };
538
539    // Register worker
540    {
541        let codebases = shared_codebases.lock().await.clone();
542        register_worker(
543            &client,
544            server,
545            &args.token,
546            &worker_id,
547            &name,
548            &codebases,
549            args.public_url.as_deref(),
550        )
551        .await?;
552    }
553
554    if let Err(e) =
555        sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
556    {
557        tracing::warn!(error = %e, "Initial workspace sync failed");
558    }
559
560    // Mark as connected
561    server_state.set_connected(true).await;
562
563    // Fetch pending tasks before entering reconnection loop
564    fetch_pending_tasks(&task_runtime).await?;
565
566    // Start background task that polls server for new locally-present workspaces
567    let _workspace_sync_handle = start_workspace_sync(
568        client.clone(),
569        server.to_string(),
570        args.token.clone(),
571        shared_codebases.clone(),
572    );
573
574    // Connect to SSE stream
575    loop {
576        // Refresh codebases snapshot (background sync may have added new local paths)
577        let codebases = shared_codebases.lock().await.clone();
578
579        // Create task notification channel for CloudEvent-triggered task execution
580        // Recreate on each reconnection since the receiver is moved into connect_stream
581        let (task_notify_tx, task_notify_rx) = mpsc::channel::<String>(32);
582        server_state
583            .set_task_notification_channel(task_notify_tx)
584            .await;
585
586        // Mark as connected on each reconnection
587        server_state.set_connected(true).await;
588
589        // Re-register worker on each reconnection to report updated models/capabilities
590        if let Err(e) = register_worker(
591            &client,
592            server,
593            &args.token,
594            &worker_id,
595            &name,
596            &codebases,
597            args.public_url.as_deref(),
598        )
599        .await
600        {
601            tracing::warn!("Failed to re-register worker on reconnection: {}", e);
602        }
603        if let Err(e) = fetch_pending_tasks(&task_runtime).await {
604            tracing::warn!("Reconnect task fetch failed: {}", e);
605        }
606
607        // Start heartbeat task for this connection
608        let heartbeat_handle = start_heartbeat(
609            client.clone(),
610            server.to_string(),
611            args.token.clone(),
612            heartbeat_state.clone(),
613            processing.clone(),
614            cognition_heartbeat.clone(),
615            task_progress_ws.clone(),
616        );
617
618        match connect_stream(&task_runtime, &name, &codebases, Some(task_notify_rx)).await {
619            Ok(StreamDisconnectReason::Ended) => {
620                tracing::warn!("Stream ended, reconnecting...");
621            }
622            Ok(StreamDisconnectReason::ReadError(error)) => {
623                tracing::warn!(error = %error, "Stream read failed, reconnecting...");
624            }
625            Err(e) => {
626                tracing::error!("Stream error: {}, reconnecting...", e);
627            }
628        }
629
630        // Mark as disconnected
631        server_state.set_connected(false).await;
632
633        // Cancel heartbeat on disconnection
634        heartbeat_handle.abort();
635        tracing::debug!("Heartbeat cancelled for reconnection");
636
637        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
638    }
639}
640
641pub fn generate_worker_id() -> String {
642    format!(
643        "wrk_{}_{:x}",
644        chrono::Utc::now().timestamp(),
645        rand::random::<u64>()
646    )
647}
648
649fn build_worker_http_client() -> Result<Client> {
650    Client::builder()
651        .no_gzip()
652        .no_brotli()
653        .no_zstd()
654        .no_deflate()
655        .build()
656        .context("Failed to build worker HTTP client")
657}
658
659fn resolve_worker_id() -> String {
660    for key in ["CODETETHER_WORKER_ID", "A2A_WORKER_ID"] {
661        if let Ok(value) = std::env::var(key) {
662            let trimmed = value.trim();
663            if !trimmed.is_empty() {
664                return trimmed.to_string();
665            }
666        }
667    }
668    generate_worker_id()
669}
670
/// Ensures at least one concurrent task slot.
///
/// With a limit of `0`, `reserve_task_slot` would report every task as at capacity.
fn normalize_max_concurrent_tasks(max_concurrent_tasks: usize) -> usize {
    if max_concurrent_tasks == 0 {
        1
    } else {
        max_concurrent_tasks
    }
}
674
/// Whether `PERSISTENT_WORKER_ENABLED` is set to a truthy value.
///
/// Truthy values are `1`, `true`, `yes` and `on`, case-insensitive and trimmed.
/// An unset or non-Unicode variable counts as disabled.
fn persistent_worker_enabled() -> bool {
    let Ok(raw) = std::env::var("PERSISTENT_WORKER_ENABLED") else {
        return false;
    };
    let normalized = raw.trim().to_ascii_lowercase();
    ["1", "true", "yes", "on"].contains(&normalized.as_str())
}
685
/// Lease duration for a persistent worker, in seconds.
///
/// Read from `PERSISTENT_WORKER_LEASE_SECONDS`. Falls back to 3600 when the value is
/// unset, unparseable, or zero.
fn persistent_worker_lease_seconds() -> u64 {
    const DEFAULT_LEASE_SECONDS: u64 = 3600;
    let parsed = std::env::var("PERSISTENT_WORKER_LEASE_SECONDS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok());
    match parsed {
        Some(seconds) if seconds > 0 => seconds,
        _ => DEFAULT_LEASE_SECONDS,
    }
}
693
694async fn reserve_task_slot(
695    processing: &Arc<Mutex<HashSet<String>>>,
696    task_id: &str,
697    max_concurrent_tasks: usize,
698) -> TaskReservation {
699    let mut proc = processing.lock().await;
700    if proc.contains(task_id) {
701        TaskReservation::AlreadyProcessing
702    } else if proc.len() >= max_concurrent_tasks {
703        TaskReservation::AtCapacity
704    } else {
705        proc.insert(task_id.to_string());
706        TaskReservation::Reserved
707    }
708}
709
/// Publishes the control-plane connection details to the process environment.
///
/// Always sets `CODETETHER_SERVER` and `CODETETHER_WORKER_ID`. `CODETETHER_TOKEN` is set
/// only when a token is given; otherwise any existing value is left untouched.
/// Child processes inherit these variables, so Git credential helpers started later by
/// shell/git commands can reach the control plane.
fn export_worker_runtime_env(server: &str, token: &Option<String>, worker_id: &str) {
    let mut assignments: Vec<(&str, &str)> = vec![
        ("CODETETHER_SERVER", server),
        ("CODETETHER_WORKER_ID", worker_id),
    ];
    if let Some(token) = token.as_deref() {
        assignments.push(("CODETETHER_TOKEN", token));
    }

    for (key, value) in assignments {
        // SAFETY: `set_var` is only sound while no other thread reads or writes the
        // environment concurrently. This runs once at the very start of worker startup,
        // before the worker spawns its own tasks or Git helper child processes.
        // NOTE(review): it is called from an async fn, so async-runtime threads may already
        // exist — confirm nothing on them touches the environment at this point.
        unsafe {
            std::env::set_var(key, value);
        }
    }
}
722
723fn advertised_interfaces(public_url: Option<&str>) -> serde_json::Value {
724    let Some(base_url) = public_url
725        .map(str::trim)
726        .filter(|value| !value.is_empty())
727        .map(|value| value.trim_end_matches('/').to_string())
728    else {
729        return serde_json::json!({});
730    };
731
732    serde_json::json!({
733        "http": {
734            "base_url": base_url,
735        },
736        "bus": {
737            "stream_url": format!("{base_url}/v1/bus/stream"),
738            "publish_url": format!("{base_url}/v1/bus/publish"),
739        },
740    })
741}
742
/// Approval policy used by the worker when deciding whether to execute tools automatically.
///
/// The worker derives it from its `auto_approve` argument:
/// - `"all"` → [`AutoApprove::All`]
/// - `"safe"` → [`AutoApprove::Safe`]
/// - anything else → [`AutoApprove::None`]
///
/// # Examples
///
/// ```rust
/// use codetether_agent::a2a::worker::AutoApprove;
///
/// let mode = AutoApprove::Safe;
/// assert_eq!(mode, AutoApprove::Safe);
/// assert_ne!(mode, AutoApprove::All);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoApprove {
    /// Selected by `auto_approve = "all"`.
    All,
    /// Selected by `auto_approve = "safe"`.
    Safe,
    /// Selected by any other `auto_approve` value.
    None,
}
759
/// Default A2A server URL when none is configured
pub const DEFAULT_A2A_SERVER_URL: &str = "https://api.codetether.run";

/// Capabilities advertised by a long-running (non-Knative) codetether-agent worker.
///
/// `worker_capabilities` appends `"persistent"` to this list.
const PERSISTENT_WORKER_CAPABILITIES: &[&str] = &[
    "forage", "ralph", "swarm", "rlm", "a2a", "mcp", "grpc", "grpc-web", "jsonrpc",
];

/// Capabilities this worker announces on the agent bus.
///
/// When `KNATIVE_SERVICE` is truthy, only `["knative"]` is announced. Truthy values are
/// `1`, `true`, `yes` and `on`, case-insensitive and trimmed — the same set
/// `persistent_worker_enabled` accepts. Otherwise the worker announces
/// [`PERSISTENT_WORKER_CAPABILITIES`] followed by `"persistent"`.
fn worker_capabilities() -> Vec<String> {
    let is_knative = std::env::var("KNATIVE_SERVICE")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false);

    if is_knative {
        return vec!["knative".to_string()];
    }

    let mut capabilities = Vec::with_capacity(PERSISTENT_WORKER_CAPABILITIES.len() + 1);
    capabilities.extend(
        PERSISTENT_WORKER_CAPABILITIES
            .iter()
            .map(|capability| capability.to_string()),
    );
    capabilities.push("persistent".to_string());
    capabilities
}
789
790/// Copy timeline progress into the runtime's shared progress state for heartbeat.
791async fn sync_timeline_to_runtime(
792    timeline: &task_timeline::TaskTimeline,
793    runtime: &WorkerTaskRuntime,
794) {
795    timeline.sync_progress().await;
796    let state = timeline.progress_handle().lock().await.clone();
797    *runtime.task_progress.lock().await = state;
798}
799
800/// Resolve workspace IDs for the configured workspace roots and log diagnostics.
801async fn resolve_and_log_workspace_ids(
802    client: &Client,
803    server: &str,
804    token: &Option<String>,
805    workspace_roots: &[String],
806) -> Vec<String> {
807    match workspace_resolve::resolve_workspace_ids(client, server, token, workspace_roots).await {
808        Ok(resolved) => {
809            if resolved.is_empty() {
810                tracing::warn!(
811                    roots = ?workspace_roots,
812                    "No server-side workspace IDs resolved for configured roots \
813                     — the server may not route tasks to this worker. \
814                     Ensure workspaces are registered with the control plane \
815                     and that their paths fall under the configured roots."
816                );
817            } else {
818                let ids: Vec<String> = resolved.iter().map(|ws| ws.id.clone()).collect();
819                tracing::info!(
820                    workspace_ids = ?ids,
821                    count = resolved.len(),
822                    "Resolved server-side workspace IDs for configured roots"
823                );
824            }
825            resolved.iter().map(|ws| ws.id.clone()).collect()
826        }
827        Err(e) => {
828            tracing::warn!(error = %e, "Failed to resolve workspace IDs from server");
829            Vec::new()
830        }
831    }
832}
833
834fn task_value<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a serde_json::Value> {
835    task.get("task")
836        .and_then(|t| t.get(key))
837        .or_else(|| task.get(key))
838}
839
840fn task_str<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a str> {
841    task_value(task, key).and_then(|v| v.as_str())
842}
843
844fn task_u64(task: &serde_json::Value, key: &str) -> Option<u64> {
845    let value = task_value(task, key)?;
846    if let Some(v) = value.as_u64() {
847        return Some(v);
848    }
849    if let Some(v) = value.as_i64()
850        && v >= 0
851    {
852        return Some(v as u64);
853    }
854    if let Some(v) = value.as_str()
855        && let Ok(parsed) = v.trim().parse::<u64>()
856    {
857        return Some(parsed);
858    }
859    None
860}
861
862fn task_metadata(task: &serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
863    task_value(task, "metadata")
864        .and_then(|m| m.as_object())
865        .cloned()
866        .unwrap_or_default()
867}
868
869fn task_timeout_secs(
870    task: &serde_json::Value,
871    metadata: &serde_json::Map<String, serde_json::Value>,
872) -> u64 {
873    task_u64(task, "task_timeout_seconds")
874        .or_else(|| task_u64(task, "timeout_secs"))
875        .or_else(|| {
876            metadata_u64(
877                metadata,
878                &["task_timeout_seconds", "timeout_secs", "timeout"],
879            )
880        })
881        .unwrap_or(1200)
882        .clamp(60, 604800)
883}
884
/// Normalizes a `provider:model` reference to `provider/model`.
///
/// The string is returned unchanged in two cases:
/// - it already contains a `/`. Such ids may use `:` as a version separator,
///   e.g. `bedrock/amazon.nova-micro-v1:0`.
/// - the text after the first `:` is purely numeric. That is a version tag on a bare
///   model id (e.g. `amazon.nova-micro-v1:0`), not a `provider:model` pair; the previous
///   logic rewrote it to `amazon.nova-micro-v1/0`.
///
/// Otherwise only the first `:` is replaced.
fn model_ref_to_provider_model(model: &str) -> String {
    if model.contains('/') {
        return model.to_string();
    }
    match model.split_once(':') {
        Some((provider, rest)) if !is_numeric_version_tag(rest) => format!("{provider}/{rest}"),
        _ => model.to_string(),
    }
}

/// True for a non-empty, all-ASCII-digit suffix such as the `0` in `...-v1:0`.
fn is_numeric_version_tag(suffix: &str) -> bool {
    !suffix.is_empty() && suffix.bytes().all(|b| b.is_ascii_digit())
}
895
/// Whether `agent_type` names the swarm executor.
///
/// Accepts `swarm`, `parallel` or `multi-agent`, trimmed and ASCII case-insensitive.
fn is_swarm_agent(agent_type: &str) -> bool {
    const SWARM_ALIASES: [&str; 3] = ["swarm", "parallel", "multi-agent"];
    let normalized = agent_type.trim();
    SWARM_ALIASES
        .iter()
        .any(|alias| normalized.eq_ignore_ascii_case(alias))
}
902
/// Whether `agent_type` is `forage` (trimmed, ASCII case-insensitive).
fn is_forage_agent(agent_type: &str) -> bool {
    matches!(agent_type.trim().to_ascii_lowercase().as_str(), "forage")
}
906
907fn metadata_lookup<'a>(
908    metadata: &'a serde_json::Map<String, serde_json::Value>,
909    key: &str,
910) -> Option<&'a serde_json::Value> {
911    metadata
912        .get(key)
913        .or_else(|| {
914            metadata
915                .get("routing")
916                .and_then(|v| v.as_object())
917                .and_then(|obj| obj.get(key))
918        })
919        .or_else(|| {
920            metadata
921                .get("swarm")
922                .and_then(|v| v.as_object())
923                .and_then(|obj| obj.get(key))
924        })
925        .or_else(|| {
926            metadata
927                .get("forage")
928                .and_then(|v| v.as_object())
929                .and_then(|obj| obj.get(key))
930        })
931}
932
933fn metadata_str(
934    metadata: &serde_json::Map<String, serde_json::Value>,
935    keys: &[&str],
936) -> Option<String> {
937    for key in keys {
938        if let Some(value) = metadata_lookup(metadata, key).and_then(|v| v.as_str()) {
939            let trimmed = value.trim();
940            if !trimmed.is_empty() {
941                return Some(trimmed.to_string());
942            }
943        }
944    }
945    None
946}
947
948fn metadata_usize(
949    metadata: &serde_json::Map<String, serde_json::Value>,
950    keys: &[&str],
951) -> Option<usize> {
952    for key in keys {
953        if let Some(value) = metadata_lookup(metadata, key) {
954            if let Some(v) = value.as_u64() {
955                return usize::try_from(v).ok();
956            }
957            if let Some(v) = value.as_i64()
958                && v >= 0
959            {
960                return usize::try_from(v as u64).ok();
961            }
962            if let Some(v) = value.as_str()
963                && let Ok(parsed) = v.trim().parse::<usize>()
964            {
965                return Some(parsed);
966            }
967        }
968    }
969    None
970}
971
972fn metadata_u64(
973    metadata: &serde_json::Map<String, serde_json::Value>,
974    keys: &[&str],
975) -> Option<u64> {
976    for key in keys {
977        if let Some(value) = metadata_lookup(metadata, key) {
978            if let Some(v) = value.as_u64() {
979                return Some(v);
980            }
981            if let Some(v) = value.as_i64()
982                && v >= 0
983            {
984                return Some(v as u64);
985            }
986            if let Some(v) = value.as_str()
987                && let Ok(parsed) = v.trim().parse::<u64>()
988            {
989                return Some(parsed);
990            }
991        }
992    }
993    None
994}
995
996fn metadata_bool(
997    metadata: &serde_json::Map<String, serde_json::Value>,
998    keys: &[&str],
999) -> Option<bool> {
1000    for key in keys {
1001        if let Some(value) = metadata_lookup(metadata, key) {
1002            if let Some(v) = value.as_bool() {
1003                return Some(v);
1004            }
1005            if let Some(v) = value.as_str() {
1006                match v.trim().to_ascii_lowercase().as_str() {
1007                    "1" | "true" | "yes" | "on" => return Some(true),
1008                    "0" | "false" | "no" | "off" => return Some(false),
1009                    _ => {}
1010                }
1011            }
1012        }
1013    }
1014    None
1015}
1016
1017fn metadata_f64(
1018    metadata: &serde_json::Map<String, serde_json::Value>,
1019    keys: &[&str],
1020) -> Option<f64> {
1021    for key in keys {
1022        if let Some(value) = metadata_lookup(metadata, key) {
1023            if let Some(v) = value.as_f64() {
1024                return Some(v);
1025            }
1026            if let Some(v) = value.as_i64() {
1027                return Some(v as f64);
1028            }
1029            if let Some(v) = value.as_u64() {
1030                return Some(v as f64);
1031            }
1032            if let Some(v) = value.as_str()
1033                && let Ok(parsed) = v.trim().parse::<f64>()
1034            {
1035                return Some(parsed);
1036            }
1037        }
1038    }
1039    None
1040}
1041
1042fn metadata_string_list(
1043    metadata: &serde_json::Map<String, serde_json::Value>,
1044    keys: &[&str],
1045) -> Vec<String> {
1046    for key in keys {
1047        let Some(value) = metadata_lookup(metadata, key) else {
1048            continue;
1049        };
1050
1051        if let Some(items) = value.as_array() {
1052            let parsed = items
1053                .iter()
1054                .filter_map(|item| item.as_str())
1055                .map(str::trim)
1056                .filter(|item| !item.is_empty())
1057                .map(ToString::to_string)
1058                .collect::<Vec<_>>();
1059            if !parsed.is_empty() {
1060                return parsed;
1061            }
1062        }
1063
1064        if let Some(value) = value.as_str() {
1065            let parsed = value
1066                .split(['\n', ','])
1067                .map(str::trim)
1068                .filter(|item| !item.is_empty())
1069                .map(ToString::to_string)
1070                .collect::<Vec<_>>();
1071            if !parsed.is_empty() {
1072                return parsed;
1073            }
1074        }
1075    }
1076
1077    Vec::new()
1078}
1079
/// Maps a requested forage execution engine to a supported one.
///
/// `swarm` and `go` are recognized (case-insensitive, trimmed). A missing or
/// unrecognized value falls back to `run`.
fn normalize_forage_execution_engine(value: Option<String>) -> String {
    let requested = value.map(|raw| raw.trim().to_ascii_lowercase());
    let engine = match requested.as_deref() {
        Some("swarm") => "swarm",
        Some("go") => "go",
        _ => "run",
    };
    engine.to_string()
}
1093
/// Maps a requested forage swarm strategy to a supported one.
///
/// `domain`, `data`, `stage` and `none` are recognized (case-insensitive,
/// trimmed). A missing or unrecognized value falls back to `auto`.
fn normalize_forage_swarm_strategy(value: Option<String>) -> String {
    const KNOWN_STRATEGIES: [&str; 4] = ["domain", "data", "stage", "none"];
    let requested = value.as_deref().map(|raw| raw.trim().to_ascii_lowercase());
    match requested {
        Some(strategy) if KNOWN_STRATEGIES.contains(&strategy.as_str()) => strategy,
        _ => "auto".to_string(),
    }
}
1109
1110fn build_forage_args(
1111    prompt: &str,
1112    title: &str,
1113    metadata: &serde_json::Map<String, serde_json::Value>,
1114    selected_model: Option<String>,
1115) -> ForageArgs {
1116    let mut moonshots = metadata_string_list(
1117        metadata,
1118        &["moonshots", "moonshot", "goals", "mission", "missions"],
1119    );
1120    if moonshots.is_empty() {
1121        let fallback = prompt.trim();
1122        if !fallback.is_empty() {
1123            moonshots.push(fallback.to_string());
1124        } else if !title.trim().is_empty() {
1125            moonshots.push(title.trim().to_string());
1126        }
1127    }
1128
1129    ForageArgs {
1130        top: metadata_usize(metadata, &["top"]).unwrap_or(3),
1131        loop_mode: metadata_bool(metadata, &["loop", "loop_mode"]).unwrap_or(false),
1132        interval_secs: metadata_u64(metadata, &["interval_secs", "interval"]).unwrap_or(120),
1133        max_cycles: metadata_usize(metadata, &["max_cycles"]).unwrap_or(0),
1134        execute: metadata_bool(metadata, &["execute"]).unwrap_or(false),
1135        // Task-queue forage should run without requiring S3 archival by default.
1136        no_s3: metadata_bool(metadata, &["no_s3", "local_only"]).unwrap_or(true),
1137        moonshots,
1138        moonshot_file: metadata_str(metadata, &["moonshot_file"]).map(PathBuf::from),
1139        moonshot_required: metadata_bool(metadata, &["moonshot_required"]).unwrap_or(false),
1140        moonshot_min_alignment: metadata_f64(metadata, &["moonshot_min_alignment"]).unwrap_or(0.10),
1141        execution_engine: normalize_forage_execution_engine(metadata_str(
1142            metadata,
1143            &["execution_engine", "engine"],
1144        )),
1145        run_timeout_secs: metadata_u64(metadata, &["run_timeout_secs", "timeout_secs", "timeout"])
1146            .unwrap_or(900),
1147        fail_fast: metadata_bool(metadata, &["fail_fast"]).unwrap_or(false),
1148        swarm_strategy: normalize_forage_swarm_strategy(metadata_str(
1149            metadata,
1150            &["swarm_strategy", "strategy"],
1151        )),
1152        swarm_max_subagents: metadata_usize(metadata, &["swarm_max_subagents"]).unwrap_or(8),
1153        swarm_max_steps: metadata_usize(metadata, &["swarm_max_steps"]).unwrap_or(100),
1154        swarm_subagent_timeout_secs: metadata_u64(metadata, &["swarm_subagent_timeout_secs"])
1155            .unwrap_or(300),
1156        model: selected_model,
1157        json: metadata_bool(metadata, &["json"]).unwrap_or(false),
1158    }
1159}
1160
1161fn parse_swarm_strategy(
1162    metadata: &serde_json::Map<String, serde_json::Value>,
1163) -> DecompositionStrategy {
1164    match metadata_str(
1165        metadata,
1166        &[
1167            "decomposition_strategy",
1168            "swarm_strategy",
1169            "strategy",
1170            "swarm_decomposition",
1171        ],
1172    )
1173    .as_deref()
1174    .map(|s| s.to_ascii_lowercase())
1175    .as_deref()
1176    {
1177        Some("none") | Some("single") => DecompositionStrategy::None,
1178        Some("domain") | Some("by_domain") => DecompositionStrategy::ByDomain,
1179        Some("data") | Some("by_data") => DecompositionStrategy::ByData,
1180        Some("stage") | Some("by_stage") => DecompositionStrategy::ByStage,
1181        _ => DecompositionStrategy::Automatic,
1182    }
1183}
1184
1185async fn resolve_swarm_model(
1186    explicit_model: Option<String>,
1187    model_tier: Option<&str>,
1188) -> Option<String> {
1189    if let Some(model) = explicit_model
1190        && !model.trim().is_empty()
1191    {
1192        return Some(model);
1193    }
1194
1195    let registry = ProviderRegistry::shared_from_vault().await.ok()?;
1196    let providers = registry.list();
1197    if providers.is_empty() {
1198        return None;
1199    }
1200    let provider = choose_provider_for_tier(providers.as_slice(), model_tier);
1201    let model = default_model_for_provider(provider, model_tier);
1202    Some(format!("{}/{}", provider, model))
1203}
1204
1205fn format_swarm_event_for_output(event: &SwarmEvent) -> Option<String> {
1206    match event {
1207        SwarmEvent::Started {
1208            task,
1209            total_subtasks,
1210        } => Some(format!(
1211            "[swarm] started task={} planned_subtasks={}",
1212            task, total_subtasks
1213        )),
1214        SwarmEvent::StageComplete {
1215            stage,
1216            completed,
1217            failed,
1218        } => Some(format!(
1219            "[swarm] stage={} completed={} failed={}",
1220            stage, completed, failed
1221        )),
1222        SwarmEvent::SubTaskUpdate { id, status, .. } => Some(format!(
1223            "[swarm] subtask id={} status={}",
1224            &id.chars().take(8).collect::<String>(),
1225            format!("{status:?}").to_ascii_lowercase()
1226        )),
1227        SwarmEvent::AgentToolCall {
1228            subtask_id,
1229            tool_name,
1230        } => Some(format!(
1231            "[swarm] subtask id={} tool={}",
1232            &subtask_id.chars().take(8).collect::<String>(),
1233            tool_name
1234        )),
1235        SwarmEvent::AgentError { subtask_id, error } => Some(format!(
1236            "[swarm] subtask id={} error={}",
1237            &subtask_id.chars().take(8).collect::<String>(),
1238            error
1239        )),
1240        SwarmEvent::Complete { success, stats } => Some(format!(
1241            "[swarm] complete success={} subtasks={} speedup={:.2}",
1242            success,
1243            stats.subagents_completed + stats.subagents_failed,
1244            stats.speedup_factor
1245        )),
1246        SwarmEvent::Error(err) => Some(format!("[swarm] error message={}", err)),
1247        _ => None,
1248    }
1249}
1250
1251pub async fn register_worker(
1252    client: &Client,
1253    server: &str,
1254    token: &Option<String>,
1255    worker_id: &str,
1256    name: &str,
1257    codebases: &[String],
1258    public_url: Option<&str>,
1259) -> Result<()> {
1260    // Load ProviderRegistry and collect available models
1261    let models = match load_provider_models().await {
1262        Ok(m) => m,
1263        Err(e) => {
1264            tracing::warn!(
1265                "Failed to load provider models: {}, proceeding without model info",
1266                e
1267            );
1268            HashMap::new()
1269        }
1270    };
1271
1272    // Register via the workers/register endpoint
1273    let mut req = client.post(format!("{}/v1/agent/workers/register", server));
1274
1275    if let Some(t) = token {
1276        req = req.bearer_auth(t);
1277    }
1278
1279    // Flatten models HashMap into array of model objects with pricing data
1280    // matching the format expected by the A2A server's /models and /workers endpoints
1281    let models_array: Vec<serde_json::Value> = models
1282        .iter()
1283        .flat_map(|(provider, model_infos)| {
1284            model_infos.iter().map(move |m| {
1285                let mut obj = serde_json::json!({
1286                    "id": format!("{}/{}", provider, m.id),
1287                    "name": &m.id,
1288                    "provider": provider,
1289                    "provider_id": provider,
1290                });
1291                if let Some(input_cost) = m.input_cost_per_million {
1292                    obj["input_cost_per_million"] = serde_json::json!(input_cost);
1293                }
1294                if let Some(output_cost) = m.output_cost_per_million {
1295                    obj["output_cost_per_million"] = serde_json::json!(output_cost);
1296                }
1297                obj
1298            })
1299        })
1300        .collect();
1301
1302    tracing::info!(
1303        "Registering worker with {} models from {} providers",
1304        models_array.len(),
1305        models.len()
1306    );
1307
1308    let hostname = std::env::var("HOSTNAME")
1309        .or_else(|_| std::env::var("COMPUTERNAME"))
1310        .unwrap_or_else(|_| "unknown".to_string());
1311    let k8s_node_name = std::env::var("K8S_NODE_NAME")
1312        .ok()
1313        .map(|value| value.trim().to_string())
1314        .filter(|value| !value.is_empty());
1315
1316    // Collect agent definitions from the builtin registry
1317    let registry = crate::agent::AgentRegistry::with_builtins();
1318    let agent_defs: Vec<serde_json::Value> = registry
1319        .list()
1320        .iter()
1321        .map(|info| {
1322            serde_json::json!({
1323                "name": info.name,
1324                "description": info.description,
1325                "mode": format!("{:?}", info.mode).to_lowercase(),
1326                "native": info.native,
1327                "hidden": info.hidden,
1328                "model": info.model,
1329                "temperature": info.temperature,
1330                "top_p": info.top_p,
1331                "max_steps": info.max_steps,
1332            })
1333        })
1334        .collect();
1335
1336    // Resolve workspace IDs that correspond to our configured paths.
1337    let workspace_ids = resolve_and_log_workspace_ids(client, server, token, codebases).await;
1338
1339    let res = req
1340        .json(&serde_json::json!({
1341            "worker_id": worker_id,
1342            "name": name,
1343            "capabilities": worker_capabilities(),
1344            "hostname": hostname,
1345            "k8s_node_name": k8s_node_name,
1346            "models": models_array,
1347            "workspaces": codebases,
1348            "workspace_ids": workspace_ids,
1349            "interfaces": advertised_interfaces(public_url),
1350            "agents": agent_defs,
1351        }))
1352        .send()
1353        .await?;
1354
1355    if res.status().is_success() {
1356        tracing::info!("Worker registered successfully");
1357    } else {
1358        tracing::warn!("Failed to register worker: {}", res.status());
1359    }
1360
1361    Ok(())
1362}
1363
1364/// Load ProviderRegistry and collect all available models grouped by provider.
1365/// Tries Vault first, then falls back to config/env vars if Vault is unreachable.
1366/// Returns ModelInfo structs (with pricing data when available).
1367///
1368/// Result is cached process-wide: repeated calls (e.g. TUI startup +
1369/// background worker registration) do not re-hit every provider's
1370/// `/models` endpoint. The cache is populated exactly once per process
1371/// on the first call, so concurrent callers share one network fanout.
1372async fn load_provider_models() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>> {
1373    use tokio::sync::OnceCell;
1374    static CACHE: OnceCell<HashMap<String, Vec<crate::provider::ModelInfo>>> =
1375        OnceCell::const_new();
1376    CACHE
1377        .get_or_try_init(|| async { load_provider_models_uncached().await })
1378        .await
1379        .cloned()
1380}
1381
1382async fn load_provider_models_uncached() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>>
1383{
1384    // Try Vault first
1385    let registry = match ProviderRegistry::shared_from_vault().await {
1386        Ok(r) if !r.list().is_empty() => {
1387            tracing::info!("Loaded {} providers from Vault", r.list().len());
1388            (*r).clone()
1389        }
1390        Ok(_) => {
1391            tracing::warn!("Vault returned 0 providers, falling back to config/env vars");
1392            fallback_registry().await?
1393        }
1394        Err(e) => {
1395            tracing::warn!("Vault unreachable ({}), falling back to config/env vars", e);
1396            fallback_registry().await?
1397        }
1398    };
1399
1400    let mut models_by_provider: HashMap<String, Vec<crate::provider::ModelInfo>> = HashMap::new();
1401
1402    for provider_name in registry.list() {
1403        if let Some(provider) = registry.get(provider_name) {
1404            match provider.list_models().await {
1405                Ok(models) => {
1406                    if !models.is_empty() {
1407                        tracing::debug!("Provider {}: {} models", provider_name, models.len());
1408                        models_by_provider.insert(provider_name.to_string(), models);
1409                    }
1410                }
1411                Err(e) => {
1412                    tracing::debug!("Failed to list models for {}: {}", provider_name, e);
1413                }
1414            }
1415        }
1416    }
1417
1418    Ok(models_by_provider)
1419}
1420
1421/// Fallback: build a ProviderRegistry from config file + environment variables
1422async fn fallback_registry() -> Result<ProviderRegistry> {
1423    let config = crate::config::Config::load().await.unwrap_or_default();
1424    ProviderRegistry::from_config(&config).await
1425}
1426
1427async fn fetch_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1428    tracing::info!("Checking for pending tasks...");
1429
1430    let mut url = format!("{}/v1/agent/tasks?status=pending", runtime.server);
1431    if !runtime.agent_name.is_empty() {
1432        url = format!(
1433            "{}&agent_name={}",
1434            url,
1435            urlencoding::encode(&runtime.agent_name)
1436        );
1437    }
1438    let mut req = runtime.client.get(&url);
1439    if let Some(t) = &runtime.token {
1440        req = req.bearer_auth(t);
1441    }
1442
1443    let res = req.send().await?;
1444    if !res.status().is_success() {
1445        return Ok(());
1446    }
1447
1448    let data: serde_json::Value = res.json().await?;
1449    // Handle both plain array response and {tasks: [...]} wrapper
1450    let tasks = if let Some(arr) = data.as_array() {
1451        arr.clone()
1452    } else {
1453        data["tasks"].as_array().cloned().unwrap_or_default()
1454    };
1455
1456    tracing::info!("Found {} pending task(s)", tasks.len());
1457
1458    for task in tasks {
1459        if let Some(id) = task["id"].as_str() {
1460            // Scope gate: reject tasks not targeted at this worker
1461            if let Err(reason) = check_task_scope(
1462                &task,
1463                &runtime.worker_id,
1464                &runtime.agent_name,
1465                &runtime.workspace_ids,
1466            ) {
1467                tracing::debug!(
1468                    task_id = id,
1469                    reason = %reason,
1470                    "Pending task skipped — out of scope"
1471                );
1472                continue;
1473            }
1474
1475            match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1476                TaskReservation::Reserved => {
1477                    let task_id = id.to_string();
1478                    let runtime = runtime.clone();
1479
1480                    tokio::spawn(async move {
1481                        if let Err(e) = handle_task(&runtime, &task).await {
1482                            tracing::error!("Task {} failed: {}", task_id, e);
1483                        }
1484                        runtime.processing.lock().await.remove(&task_id);
1485                    });
1486                }
1487                TaskReservation::AlreadyProcessing => {}
1488                TaskReservation::AtCapacity => {
1489                    tracing::debug!(
1490                        max_concurrent_tasks = runtime.max_concurrent_tasks,
1491                        "Worker is at task capacity; leaving remaining tasks pending"
1492                    );
1493                    break;
1494                }
1495            }
1496        }
1497    }
1498
1499    Ok(())
1500}
1501
/// Why the SSE task stream returned control to its caller.
#[derive(PartialEq, Eq, Debug)]
enum StreamDisconnectReason {
    /// The response byte stream finished (yielded `None`).
    Ended,
    /// Reading a chunk failed; carries the error's display text.
    ReadError(String),
}
1507
1508async fn connect_stream(
1509    runtime: &WorkerTaskRuntime,
1510    name: &str,
1511    codebases: &[String],
1512    task_notify_rx: Option<mpsc::Receiver<String>>,
1513) -> Result<StreamDisconnectReason> {
1514    let url = format!(
1515        "{}/v1/worker/tasks/stream?agent_name={}&worker_id={}",
1516        runtime.server,
1517        urlencoding::encode(name),
1518        urlencoding::encode(&runtime.worker_id)
1519    );
1520
1521    let mut req = runtime
1522        .client
1523        .get(&url)
1524        .header("Accept", "text/event-stream")
1525        .header("Accept-Encoding", "identity")
1526        .header("Cache-Control", "no-cache, no-transform")
1527        .header("X-Worker-ID", &runtime.worker_id)
1528        .header("X-Agent-Name", name)
1529        .header("X-Codebases", codebases.join(","))
1530        .header("X-Workspaces", codebases.join(","));
1531
1532    if let Some(t) = &runtime.token {
1533        req = req.bearer_auth(t);
1534    }
1535
1536    let res = req.send().await?;
1537    if !res.status().is_success() {
1538        anyhow::bail!("Failed to connect: {}", res.status());
1539    }
1540
1541    tracing::info!("Connected to A2A server");
1542
1543    let mut stream = res.bytes_stream();
1544    let mut buffer = String::new();
1545    let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
1546    poll_interval.tick().await; // consume the initial immediate tick
1547
1548    // Pin the optional receiver so we can use it in the loop
1549    let mut task_notify_rx = task_notify_rx;
1550
1551    loop {
1552        tokio::select! {
1553            // Handle task notification from CloudEvent (Knative Eventing)
1554            // Only if the channel was provided (i.e., running with HTTP server)
1555            task_id = async {
1556                if let Some(ref mut rx) = task_notify_rx {
1557                    rx.recv().await
1558                } else {
1559                    // Never ready when None - use pending to skip this branch
1560                    futures::future::pending().await
1561                }
1562            } => {
1563                if let Some(task_id) = task_id {
1564                    tracing::info!("Received task notification via CloudEvent: {}", task_id);
1565                    // Immediately poll for and process this task
1566                    if let Err(e) = poll_pending_tasks(runtime).await {
1567                        tracing::warn!("Task notification poll failed: {}", e);
1568                    }
1569                }
1570            }
1571            chunk = stream.next() => {
1572                match chunk {
1573                    Some(Ok(chunk)) => {
1574                        buffer.push_str(&String::from_utf8_lossy(&chunk));
1575
1576                        // Process SSE events
1577                        while let Some(pos) = buffer.find("\n\n") {
1578                            let event_str = buffer[..pos].to_string();
1579                            buffer = buffer[pos + 2..].to_string();
1580
1581                            if let Some(data_line) = event_str.lines().find(|l| l.starts_with("data:")) {
1582                                let data = data_line.trim_start_matches("data:").trim();
1583                                if data == "[DONE]" || data.is_empty() {
1584                                    continue;
1585                                }
1586
1587                                if let Ok(task) = serde_json::from_str::<serde_json::Value>(data) {
1588                                    spawn_task_handler(&task, runtime).await;
1589                                }
1590                            }
1591                        }
1592                    }
1593                    Some(Err(e)) => {
1594                        return Ok(StreamDisconnectReason::ReadError(e.to_string()));
1595                    }
1596                    None => {
1597                        // Stream ended
1598                        return Ok(StreamDisconnectReason::Ended);
1599                    }
1600                }
1601            }
1602            _ = poll_interval.tick() => {
1603                // Periodic poll for pending tasks the SSE stream may have missed
1604                if let Err(e) = poll_pending_tasks(runtime).await {
1605                    tracing::warn!("Periodic task poll failed: {}", e);
1606                }
1607            }
1608        }
1609    }
1610}
1611
1612async fn spawn_task_handler(task: &serde_json::Value, runtime: &WorkerTaskRuntime) {
1613    if let Some(id) = task
1614        .get("task")
1615        .and_then(|t| t["id"].as_str())
1616        .or_else(|| task["id"].as_str())
1617    {
1618        // Scope gate: reject tasks not targeted at this worker
1619        if let Err(reason) = check_task_scope(
1620            task,
1621            &runtime.worker_id,
1622            &runtime.agent_name,
1623            &runtime.workspace_ids,
1624        ) {
1625            tracing::debug!(
1626                task_id = id,
1627                reason = %reason,
1628                "SSE task skipped — out of scope"
1629            );
1630            return;
1631        }
1632
1633        match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1634            TaskReservation::Reserved => {
1635                let task_id = id.to_string();
1636                let task = task.clone();
1637                let runtime = runtime.clone();
1638
1639                tokio::spawn(async move {
1640                    if let Err(e) = handle_task(&runtime, &task).await {
1641                        tracing::error!("Task {} failed: {}", task_id, e);
1642                    }
1643                    runtime.processing.lock().await.remove(&task_id);
1644                });
1645            }
1646            TaskReservation::AlreadyProcessing => {}
1647            TaskReservation::AtCapacity => {
1648                tracing::debug!(
1649                    task_id = id,
1650                    max_concurrent_tasks = runtime.max_concurrent_tasks,
1651                    "Worker is at task capacity; task will stay pending until a slot frees up"
1652                );
1653            }
1654        }
1655    }
1656}
1657
1658async fn poll_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1659    let mut url = format!("{}/v1/agent/tasks?status=pending", runtime.server);
1660    if !runtime.agent_name.is_empty() {
1661        url = format!(
1662            "{}&agent_name={}",
1663            url,
1664            urlencoding::encode(&runtime.agent_name)
1665        );
1666    }
1667    let mut req = runtime.client.get(&url);
1668    if let Some(t) = &runtime.token {
1669        req = req.bearer_auth(t);
1670    }
1671
1672    let res = req.send().await?;
1673    if !res.status().is_success() {
1674        return Ok(());
1675    }
1676
1677    let data: serde_json::Value = res.json().await?;
1678    let tasks = if let Some(arr) = data.as_array() {
1679        arr.clone()
1680    } else {
1681        data["tasks"].as_array().cloned().unwrap_or_default()
1682    };
1683
1684    if !tasks.is_empty() {
1685        tracing::debug!("Poll found {} pending task(s)", tasks.len());
1686    }
1687
1688    for task in &tasks {
1689        // Scope gate: reject tasks not targeted at this worker
1690        let task_id_str = task_str(task, "id").unwrap_or("?");
1691        if let Err(reason) = check_task_scope(
1692            task,
1693            &runtime.worker_id,
1694            &runtime.agent_name,
1695            &runtime.workspace_ids,
1696        ) {
1697            tracing::debug!(
1698                task_id = task_id_str,
1699                reason = %reason,
1700                "Poll task skipped — out of scope"
1701            );
1702            continue;
1703        }
1704        spawn_task_handler(task, runtime).await;
1705    }
1706
1707    Ok(())
1708}
1709
1710async fn handle_task(runtime: &WorkerTaskRuntime, task: &serde_json::Value) -> Result<()> {
1711    let task_id = task_str(task, "id").ok_or_else(|| anyhow::anyhow!("No task ID"))?;
1712    let title = task_str(task, "title").unwrap_or("Untitled");
1713
1714    // Determine task timeout from the task payload/metadata or default to 1200s.
1715    let metadata = task_metadata(task);
1716    let timeout_secs = task_timeout_secs(task, &metadata);
1717
1718    let mut timeline = task_timeline::TaskTimeline::new(task_id, timeout_secs);
1719    timeline.checkpoint(task_timeline::TaskCheckpoint::TaskReceived);
1720    tracing::info!(
1721        "Handling task: {} ({}) [timeout={}s]",
1722        title,
1723        task_id,
1724        timeout_secs
1725    );
1726
1727    // Wire the timeline's progress state into the runtime so the heartbeat picks it up.
1728    // We'll sync after each checkpoint.
1729    sync_timeline_to_runtime(&timeline, runtime).await;
1730
1731    // Claim the task
1732    timeline.checkpoint(task_timeline::TaskCheckpoint::ClaimRequested);
1733    let mut req = runtime
1734        .client
1735        .post(format!("{}/v1/worker/tasks/claim", runtime.server))
1736        .header("X-Worker-ID", &runtime.worker_id);
1737    if let Some(t) = &runtime.token {
1738        req = req.bearer_auth(t);
1739    }
1740
1741    let res = req
1742        .json(&serde_json::json!({ "task_id": task_id }))
1743        .send()
1744        .await?;
1745
1746    if !res.status().is_success() {
1747        let status = res.status();
1748        let text = res.text().await?;
1749        if status == reqwest::StatusCode::CONFLICT {
1750            tracing::debug!(task_id, "Task already claimed by another worker, skipping");
1751        } else {
1752            tracing::warn!(task_id, %status, "Failed to claim task: {}", text);
1753        }
1754        return Ok(());
1755    }
1756
1757    let claim = res.json::<TaskClaimResponse>().await?;
1758    if let Some(claim_timeout_secs) = claim
1759        .task_timeout_seconds
1760        .map(|timeout_secs| timeout_secs.clamp(60, 604800))
1761    {
1762        timeline.update_timeout_secs(claim_timeout_secs);
1763    }
1764    let provider_keys = claim.provider_keys.clone();
1765    let claim_provenance = claim.into_provenance();
1766    timeline.checkpoint_with_detail(
1767        task_timeline::TaskCheckpoint::Claimed,
1768        Some(format!(
1769            "run_id={:?} attempt_id={:?}",
1770            claim_provenance.run_id, claim_provenance.attempt_id
1771        )),
1772    );
1773    tracing::info!(
1774        task_id,
1775        run_id = ?claim_provenance.run_id,
1776        attempt_id = ?claim_provenance.attempt_id,
1777        "Claimed task"
1778    );
1779
1780    // --- All post-claim work is wrapped so we always release the task ---
1781    let inner_result: Result<(&str, Option<String>, Option<String>, Option<String>)> =
1782        execute_claimed_task(
1783            runtime,
1784            task,
1785            task_id,
1786            title,
1787            &claim_provenance,
1788            provider_keys,
1789            &mut timeline,
1790        )
1791        .await;
1792
1793    let (status, result, error, session_id) = match inner_result {
1794        Ok(tuple) => tuple,
1795        Err(e) => {
1796            tracing::error!(
1797                task_id,
1798                error = %e,
1799                "Task failed after claim (releasing as failed)"
1800            );
1801            timeline.checkpoint_with_detail(
1802                task_timeline::TaskCheckpoint::Failed,
1803                Some(format!("{}", e)),
1804            );
1805            (
1806                "failed",
1807                None,
1808                Some(format!("Worker error after claim: {}", e)),
1809                None,
1810            )
1811        }
1812    };
1813
1814    // Emit final diagnostics before releasing
1815    timeline.emit_diagnostics();
1816    let diagnostics_json = timeline.diagnostics_json();
1817
1818    timeline.checkpoint(task_timeline::TaskCheckpoint::Releasing);
1819    release_task_result(
1820        &runtime.client,
1821        &runtime.server,
1822        &runtime.token,
1823        &runtime.worker_id,
1824        task_id,
1825        status,
1826        result,
1827        error,
1828        session_id,
1829        Some(diagnostics_json),
1830    )
1831    .await?;
1832
1833    timeline.checkpoint(task_timeline::TaskCheckpoint::Released);
1834
1835    // Clear task progress so heartbeat stops reporting stale state
1836    runtime.task_progress.lock().await.clear();
1837
1838    tracing::info!("Task released: {} with status: {}", task_id, status);
1839    Ok(())
1840}
1841
/// Inner logic for a claimed task. Returns (status, result, error, session_id).
/// Extracted so that `handle_task` can catch any error and always release.
///
/// `status` is either `"completed"` or `"failed"`. Dispatch is by agent type:
/// - `clone_repo` → [`handle_clone_repo_task`]. No agent session is created.
/// - forage agents (per `is_forage_agent`) → [`handle_forage_task`].
/// - Everything else runs in a [`Session`]. The session is resumed from the
///   `resume_session_id` metadata when it loads; otherwise a new one is created.
///   - Swarm agents go through `execute_swarm_with_policy`.
///   - `build` and `plan` go through `execute_session_with_policy`. Any other
///     agent type is mapped to `build` first.
///
/// Tasks whose `workspace_id` is `"global"` or empty are "virtual". They run
/// with no workspace directory and skip repository setup and the post-run
/// commit/push. For other tasks, a successful agent run is followed by
/// [`commit_and_push_task_changes`] on the session directory. A commit/push
/// failure turns the status into `"failed"`.
///
/// Clone, forage, agent, and commit failures are reported through the returned
/// tuple. `Err` is only returned when session creation or workspace resolution
/// fails.
#[allow(clippy::too_many_lines)]
async fn execute_claimed_task<'a>(
    runtime: &WorkerTaskRuntime,
    task: &'a serde_json::Value,
    task_id: &'a str,
    title: &'a str,
    claim_provenance: &ClaimProvenance,
    provider_keys: Option<serde_json::Value>,
    timeline: &mut task_timeline::TaskTimeline,
) -> Result<(&'static str, Option<String>, Option<String>, Option<String>)> {
    let metadata = task_metadata(task);
    timeline.checkpoint(task_timeline::TaskCheckpoint::MetadataParsed);
    // Optional session to resume; blank or whitespace-only ids are ignored.
    let resume_session_id = metadata
        .get("resume_session_id")
        .and_then(|v| v.as_str())
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty());
    let complexity_hint = metadata_str(&metadata, &["complexity"]);
    // An explicit `model_tier`/`tier` wins. Otherwise the tier is derived from the
    // complexity hint: "quick" → fast, "deep" → heavy, anything else → balanced.
    let model_tier = metadata_str(&metadata, &["model_tier", "tier"])
        .map(|s| s.to_ascii_lowercase())
        .or_else(|| {
            complexity_hint.as_ref().map(|complexity| {
                match complexity.to_ascii_lowercase().as_str() {
                    "quick" => "fast".to_string(),
                    "deep" => "heavy".to_string(),
                    _ => "balanced".to_string(),
                }
            })
        });
    let worker_personality = metadata_str(
        &metadata,
        &["worker_personality", "personality", "agent_personality"],
    );
    let target_agent_name = metadata_str(&metadata, &["target_agent_name", "agent_name"]);
    // Model lookup precedence: task `model_ref`, metadata `model_ref`, task `model`,
    // metadata `model`.
    let raw_model = task_str(task, "model_ref")
        .or_else(|| metadata_lookup(&metadata, "model_ref").and_then(|v| v.as_str()))
        .or_else(|| task_str(task, "model"))
        .or_else(|| metadata_lookup(&metadata, "model").and_then(|v| v.as_str()));
    let selected_model = raw_model.map(model_ref_to_provider_model);
    let raw_agent = task_str(task, "agent_type")
        .or_else(|| task_str(task, "agent"))
        .unwrap_or("build");
    // Fall back to the description, then the title, when no explicit prompt is given.
    let prompt = task_str(task, "prompt")
        .or_else(|| task_str(task, "description"))
        .unwrap_or(title);

    // Detect virtual/global workspace tasks dispatched via the API.
    let workspace_id = task_str(task, "workspace_id")
        .or_else(|| metadata_lookup(&metadata, "workspace_id").and_then(|v| v.as_str()));
    let is_virtual_task = workspace_id.map_or(false, |ws| ws == "global" || ws.is_empty());

    // Clone tasks bypass the agent session entirely; failures are reported, not raised.
    if raw_agent.eq_ignore_ascii_case("clone_repo") {
        return match handle_clone_repo_task(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            &runtime.worker_id,
            task,
            &metadata,
        )
        .await
        {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Clone task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    // Forage tasks likewise run outside a session.
    if is_forage_agent(raw_agent) {
        return match handle_forage_task(title, prompt, &metadata, selected_model.clone()).await {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Forage task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    // Resume existing session when requested; fall back to a fresh session if missing.
    let mut session = if let Some(ref sid) = resume_session_id {
        match Session::load(sid).await {
            Ok(existing) => {
                tracing::info!("Resuming session {} for task {}", sid, task_id);
                existing
            }
            Err(e) => {
                tracing::warn!(
                    "Could not load session {} for task {} ({}), starting a new session",
                    sid,
                    task_id,
                    e
                );
                Session::new().await?
            }
        }
    } else {
        Session::new().await?
    };
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::SessionReady,
        Some(format!("session_id={}", session.id)),
    );

    // Point the session at the workspace directory when one can be resolved.
    // If none is resolved, any directory already on a resumed session is kept.
    if !is_virtual_task {
        if let Some(workspace_path) = resolve_task_workspace_dir(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            workspace_id,
        )
        .await?
        {
            session.metadata.directory = Some(workspace_path);
        }
        timeline.checkpoint_with_detail(
            task_timeline::TaskCheckpoint::WorkspaceReady,
            session
                .metadata
                .directory
                .as_deref()
                .map(|d| d.display().to_string()),
        );
    }

    // For virtual/global tasks, clear the workspace directory so tools don't
    // try to operate on a workspace that doesn't exist on this worker.
    if is_virtual_task {
        tracing::info!(
            task_id,
            workspace_id = workspace_id.unwrap_or("(none)"),
            "Virtual task detected — skipping workspace setup"
        );
        session.metadata.directory = None;
    }

    // Normalize agent: only "build", "plan", and swarm types are valid.
    // Map deprecated/unknown agent types (e.g. "general", "explore") to "build".
    let agent_type = if is_swarm_agent(raw_agent) {
        raw_agent
    } else {
        match raw_agent {
            "build" | "plan" => raw_agent,
            other => {
                tracing::info!(
                    "Agent \"{}\" is not a primary agent, falling back to \"build\"",
                    other
                );
                "build"
            }
        }
    };
    session.set_agent_name(agent_type.to_string());
    session.attach_claim_provenance(claim_provenance);
    session.metadata.provider_keys = provider_keys;

    // Skip repository-local Git setup for virtual tasks (no workspace directory).
    // Setup failures are logged and tolerated; the task still runs.
    if !is_virtual_task {
        if let (Some(directory), Some(workspace_id)) =
            (session.metadata.directory.as_deref(), workspace_id)
            && directory.join(".git").exists()
            && let Err(err) = configure_repo_git_auth(directory, workspace_id)
        {
            tracing::warn!(task_id, error = %err, "Failed to configure Git credential helper");
        }
        if let Some(directory) = session.metadata.directory.as_deref()
            && let Err(err) = install_commit_msg_hook(directory)
        {
            tracing::warn!(task_id, error = %err, "Failed to install commit-msg hook");
        }
        timeline.checkpoint(task_timeline::TaskCheckpoint::GitHookInstalled);
    }

    // A task-specified model overrides whatever model the session already had.
    if let Some(model) = selected_model.clone() {
        session.metadata.model = Some(model);
    }
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::ModelSelected,
        session.metadata.model.clone(),
    );

    tracing::info!(task_id, agent_type, "Executing prompt: {}", prompt);
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::AgentStarting,
        Some(format!(
            "agent={} model={:?}",
            agent_type, session.metadata.model
        )),
    );
    sync_timeline_to_runtime(&timeline, runtime).await;

    // Set up output streaming to forward progress to the server and bus
    let stream_client = runtime.client.clone();
    let stream_server = runtime.server.clone();
    let stream_token = runtime.token.clone();
    let stream_worker_id = runtime.worker_id.clone();
    let stream_task_id = task_id.to_string();
    let stream_bus = Arc::clone(&runtime.bus);

    // Each output chunk is published to the bus synchronously. It is then POSTed
    // from its own detached tokio task.
    // NOTE(review): those POSTs are neither awaited nor ordered relative to each
    // other, and their failures are silently ignored.
    let output_callback: Arc<dyn Fn(String) + Send + Sync + 'static> =
        Arc::new(move |output: String| {
            let c = stream_client.clone();
            let s = stream_server.clone();
            let t = stream_token.clone();
            let w = stream_worker_id.clone();
            let tid = stream_task_id.clone();

            let bus_handle = stream_bus.handle("task-output");
            bus_handle.send(
                format!("task.{}", tid),
                crate::bus::BusMessage::TaskUpdate {
                    task_id: tid.clone(),
                    state: crate::a2a::types::TaskState::Working,
                    message: Some(output.clone()),
                },
            );

            tokio::spawn(async move {
                let mut req = c
                    .post(format!("{}/v1/agent/tasks/{}/output", s, tid))
                    .header("X-Worker-ID", &w);
                if let Some(tok) = &t {
                    req = req.bearer_auth(tok);
                }
                let _ = req
                    .json(&serde_json::json!({
                        "worker_id": w,
                        "output": output,
                    }))
                    .send()
                    .await;
            });
        });

    // Execute swarm tasks via SwarmExecutor; all other agents use the standard session loop.
    let (status, result, error, session_id) = if is_swarm_agent(agent_type) {
        match execute_swarm_with_policy(
            &mut session,
            prompt,
            model_tier.as_deref(),
            selected_model,
            &metadata,
            complexity_hint.as_deref(),
            worker_personality.as_deref(),
            target_agent_name.as_deref(),
            Some(&runtime.bus),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            // The boolean reports whether every swarm subtask succeeded.
            Ok((session_result, true)) => {
                tracing::info!("Swarm task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            Ok((session_result, false)) => {
                tracing::warn!("Swarm task completed with failures: {}", task_id);
                (
                    "failed",
                    Some(session_result.text),
                    Some("Swarm execution completed with failures".to_string()),
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!("Swarm task failed: {} - {}", task_id, e);
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    } else {
        match execute_session_with_policy(
            &mut session,
            prompt,
            runtime.auto_approve,
            model_tier.as_deref(),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok(session_result) => {
                tracing::info!("Task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!(task_id, error = %e, "Task execution failed");
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    };

    // Record agent completion checkpoint
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::AgentDone,
        Some(format!("status={}", status)),
    );
    sync_timeline_to_runtime(&timeline, runtime).await;

    // Check deadline before proceeding to commit/release phase.
    // This only logs and records a checkpoint; the commit/push below still runs.
    if timeline.is_near_deadline() {
        tracing::warn!(
            task_id,
            elapsed_secs = format!("{:.1}", timeline.elapsed_secs()),
            budget_pct = format!("{:.1}%", timeline.budget_pct_used()),
            "Near deadline after agent completion — will attempt graceful shutdown"
        );
        timeline.checkpoint(task_timeline::TaskCheckpoint::GracefulShutdown);
    }

    let mut status = status;
    let mut result = result;
    let mut error = error;

    // Persist the agent's work: commit and push only when the agent succeeded on
    // a real workspace. The commit summary is appended to any non-blank result.
    if status == "completed"
        && !is_virtual_task
        && let Some(directory) = session.metadata.directory.as_deref()
    {
        match commit_and_push_task_changes(
            directory,
            task_id,
            session.metadata.provenance.as_ref(),
            timeline,
        )
        .await
        {
            Ok(Some(summary)) => {
                result = Some(match result {
                    Some(existing) if !existing.trim().is_empty() => {
                        format!("{existing}\n\n{summary}")
                    }
                    _ => summary,
                });
            }
            Ok(None) => {}
            Err(err) => {
                tracing::error!(task_id, error = %err, "Failed to commit and push task changes");
                status = "failed";
                error = Some(format!("Post-agent git commit/push failed: {err}"));
            }
        }
    }

    // Sync progress state for heartbeat reporting
    timeline.sync_progress().await;

    // Prefer the session id reported by the executor; fall back to this session's id.
    Ok((
        status,
        result,
        error,
        Some(session_id.unwrap_or_else(|| session.id.clone())),
    ))
}
2205
2206async fn commit_and_push_task_changes(
2207    repo_path: &Path,
2208    task_id: &str,
2209    provenance: Option<&crate::provenance::ExecutionProvenance>,
2210    timeline: &mut task_timeline::TaskTimeline,
2211) -> Result<Option<String>> {
2212    if !repo_path.join(".git").exists() {
2213        tracing::debug!(repo_path = %repo_path.display(), "Skipping post-agent commit: not a git repo");
2214        return Ok(None);
2215    }
2216
2217    let status = run_git_command_at(
2218        Some(repo_path),
2219        vec!["status".to_string(), "--porcelain".to_string()],
2220    )
2221    .await?;
2222    if status.trim().is_empty() {
2223        tracing::info!(task_id, repo_path = %repo_path.display(), "No changes to commit after task");
2224        return Ok(None);
2225    }
2226
2227    timeline.checkpoint(task_timeline::TaskCheckpoint::CommitStaging);
2228    run_git_command_at(
2229        Some(repo_path),
2230        vec!["add".to_string(), "--all".to_string()],
2231    )
2232    .await?;
2233
2234    let staged_status = run_git_command_at(
2235        Some(repo_path),
2236        vec!["status".to_string(), "--porcelain".to_string()],
2237    )
2238    .await?;
2239    if staged_status.trim().is_empty() {
2240        return Ok(None);
2241    }
2242
2243    let commit_message = format!("CodeTether task {task_id}");
2244    let commit_output = git_commit_with_provenance(repo_path, &commit_message, provenance).await?;
2245    if !commit_output.status.success() {
2246        anyhow::bail!(
2247            "git commit failed: {}",
2248            String::from_utf8_lossy(&commit_output.stderr).trim()
2249        );
2250    }
2251    timeline.checkpoint(task_timeline::TaskCheckpoint::CommitCreated);
2252
2253    let commit_sha = run_git_command_at(
2254        Some(repo_path),
2255        vec!["rev-parse".to_string(), "HEAD".to_string()],
2256    )
2257    .await
2258    .unwrap_or_default();
2259    let branch = run_git_command_at(
2260        Some(repo_path),
2261        vec![
2262            "rev-parse".to_string(),
2263            "--abbrev-ref".to_string(),
2264            "HEAD".to_string(),
2265        ],
2266    )
2267    .await
2268    .unwrap_or_else(|_| "HEAD".to_string());
2269
2270    timeline.checkpoint(task_timeline::TaskCheckpoint::CommitPushing);
2271    run_git_command_at(
2272        Some(repo_path),
2273        vec![
2274            "push".to_string(),
2275            "origin".to_string(),
2276            format!("HEAD:{branch}"),
2277        ],
2278    )
2279    .await?;
2280    timeline.checkpoint(task_timeline::TaskCheckpoint::CommitPushed);
2281
2282    Ok(Some(format!(
2283        "Committed and pushed task changes: {} on {}",
2284        commit_sha.trim(),
2285        branch.trim()
2286    )))
2287}
2288
2289async fn handle_forage_task(
2290    title: &str,
2291    prompt: &str,
2292    metadata: &serde_json::Map<String, serde_json::Value>,
2293    selected_model: Option<String>,
2294) -> Result<String> {
2295    let forage_args = build_forage_args(prompt, title, metadata, selected_model);
2296    let summary = crate::forage::execute_with_summary(forage_args).await?;
2297    Ok(summary.render_text())
2298}
2299
2300async fn handle_clone_repo_task(
2301    client: &Client,
2302    server: &str,
2303    token: &Option<String>,
2304    worker_id: &str,
2305    task: &serde_json::Value,
2306    metadata: &serde_json::Map<String, serde_json::Value>,
2307) -> Result<String> {
2308    let workspace_id = metadata_str(metadata, &["workspace_id"])
2309        .or_else(|| task_str(task, "workspace_id").map(|value| value.to_string()))
2310        .ok_or_else(|| anyhow::anyhow!("Clone task is missing workspace_id metadata"))?;
2311    let workspace = fetch_workspace_record(client, server, token, &workspace_id).await?;
2312    let git_url = metadata_str(metadata, &["git_url"])
2313        .or_else(|| workspace.git_url.clone())
2314        .ok_or_else(|| anyhow::anyhow!("Workspace {} is missing git_url", workspace_id))?;
2315    let branch = metadata_str(metadata, &["git_branch"])
2316        .or_else(|| workspace.git_branch.clone())
2317        .unwrap_or_else(|| "main".to_string());
2318    let repo_path = resolve_workspace_clone_path(&workspace_id, &workspace.path);
2319
2320    if let Some(parent) = repo_path.parent() {
2321        tokio::fs::create_dir_all(parent)
2322            .await
2323            .with_context(|| format!("Failed to create repo parent {}", parent.display()))?;
2324    }
2325
2326    let temp_helper_path =
2327        git_clone_base_dir().join(format!(".{}-git-credential-helper", workspace_id));
2328    write_git_credential_helper_script(&temp_helper_path, &workspace_id)?;
2329    let clone_lock_path = workspace_clone_lock_path(&workspace_id);
2330    let clone_lock = acquire_workspace_clone_lock(&clone_lock_path).await?;
2331
2332    let clone_result = async {
2333        let git_dir = repo_path.join(".git");
2334        if git_dir.exists() {
2335            configure_repo_git_auth(&repo_path, &workspace_id)?;
2336            configure_repo_git_github_app_from_agent_config(
2337                &repo_path,
2338                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
2339            );
2340            refresh_existing_clone(&repo_path, &branch).await?;
2341        } else {
2342            prepare_clone_target(&repo_path).await?;
2343
2344            run_git_command_at(
2345                None,
2346                vec![
2347                    "-c".to_string(),
2348                    format!("credential.helper={}", temp_helper_path.display()),
2349                    "-c".to_string(),
2350                    "credential.useHttpPath=true".to_string(),
2351                    "clone".to_string(),
2352                    "--single-branch".to_string(),
2353                    "--branch".to_string(),
2354                    branch.clone(),
2355                    git_url.clone(),
2356                    repo_path.display().to_string(),
2357                ],
2358            )
2359            .await?;
2360            configure_repo_git_auth(&repo_path, &workspace_id)?;
2361            configure_repo_git_github_app_from_agent_config(
2362                &repo_path,
2363                Some(&serde_json::Value::Object(workspace.agent_config.clone())),
2364            );
2365        }
2366        install_commit_msg_hook(&repo_path)?;
2367
2368        register_cloned_workspace(client, server, token, worker_id, &workspace, &repo_path).await?;
2369        enqueue_post_clone_task(client, server, token, worker_id, &workspace_id, metadata).await?;
2370
2371        Ok::<String, anyhow::Error>(format!(
2372            "Repository ready at {} (branch: {})",
2373            repo_path.display(),
2374            branch
2375        ))
2376    }
2377    .await;
2378
2379    let _ = tokio::fs::remove_file(&temp_helper_path).await;
2380    release_workspace_clone_lock(clone_lock).await;
2381    clone_result
2382}
2383
2384fn workspace_clone_lock_path(workspace_id: &str) -> PathBuf {
2385    let safe_workspace_id: String = workspace_id
2386        .chars()
2387        .map(|ch| {
2388            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
2389                ch
2390            } else {
2391                '_'
2392            }
2393        })
2394        .collect();
2395    git_clone_base_dir().join(format!(".{safe_workspace_id}.clone.lock"))
2396}
2397
2398async fn acquire_workspace_clone_lock(lock_path: &Path) -> Result<tokio::fs::File> {
2399    if let Some(parent) = lock_path.parent() {
2400        tokio::fs::create_dir_all(parent).await.with_context(|| {
2401            format!("Failed to create clone lock directory {}", parent.display())
2402        })?;
2403    }
2404
2405    let stale_after = Duration::from_secs(env_u64("CODETETHER_WORKER_CLONE_LOCK_STALE_SECS", 1800));
2406    loop {
2407        match tokio::fs::OpenOptions::new()
2408            .write(true)
2409            .create_new(true)
2410            .open(lock_path)
2411            .await
2412        {
2413            Ok(file) => return Ok(file),
2414            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
2415                if workspace_clone_lock_is_stale(lock_path, stale_after).await {
2416                    tracing::warn!(lock = %lock_path.display(), "Removing stale workspace clone lock");
2417                    let _ = tokio::fs::remove_file(lock_path).await;
2418                    continue;
2419                }
2420                tokio::time::sleep(Duration::from_millis(250)).await;
2421            }
2422            Err(err) => {
2423                return Err(err).with_context(|| {
2424                    format!(
2425                        "Failed to acquire workspace clone lock {}",
2426                        lock_path.display()
2427                    )
2428                });
2429            }
2430        }
2431    }
2432}
2433
2434async fn workspace_clone_lock_is_stale(lock_path: &Path, stale_after: Duration) -> bool {
2435    let Ok(metadata) = tokio::fs::metadata(lock_path).await else {
2436        return false;
2437    };
2438    let Ok(modified) = metadata.modified() else {
2439        return false;
2440    };
2441    SystemTime::now()
2442        .duration_since(modified)
2443        .map(|age| age > stale_after)
2444        .unwrap_or(false)
2445}
2446
2447async fn release_workspace_clone_lock(lock: tokio::fs::File) {
2448    if let Ok(metadata) = lock.metadata().await {
2449        #[cfg(unix)]
2450        {
2451            use std::os::unix::fs::MetadataExt;
2452            let clone_base = git_clone_base_dir();
2453            if let Ok(mut entries) = tokio::fs::read_dir(&clone_base).await {
2454                while let Ok(Some(entry)) = entries.next_entry().await {
2455                    if let Ok(entry_metadata) = entry.metadata().await {
2456                        if entry_metadata.ino() == metadata.ino()
2457                            && entry_metadata.dev() == metadata.dev()
2458                        {
2459                            drop(lock);
2460                            let _ = tokio::fs::remove_file(entry.path()).await;
2461                            return;
2462                        }
2463                    }
2464                }
2465            }
2466        }
2467    }
2468    drop(lock);
2469}
2470
2471async fn refresh_existing_clone(repo_path: &Path, branch: &str) -> Result<()> {
2472    run_git_command_at(
2473        Some(repo_path),
2474        vec![
2475            "fetch".to_string(),
2476            "origin".to_string(),
2477            branch.to_string(),
2478        ],
2479    )
2480    .await?;
2481
2482    stash_dirty_clone_state(repo_path).await?;
2483
2484    let remote_ref = format!("origin/{branch}");
2485    run_git_command_at(
2486        Some(repo_path),
2487        vec![
2488            "checkout".to_string(),
2489            "-B".to_string(),
2490            branch.to_string(),
2491            remote_ref.clone(),
2492        ],
2493    )
2494    .await?;
2495    run_git_command_at(
2496        Some(repo_path),
2497        vec!["reset".to_string(), "--hard".to_string(), remote_ref],
2498    )
2499    .await?;
2500    run_git_command_at(
2501        Some(repo_path),
2502        vec!["clean".to_string(), "-fd".to_string()],
2503    )
2504    .await?;
2505
2506    Ok(())
2507}
2508
2509async fn stash_dirty_clone_state(repo_path: &Path) -> Result<()> {
2510    let status = run_git_command_at(
2511        Some(repo_path),
2512        vec!["status".to_string(), "--porcelain".to_string()],
2513    )
2514    .await?;
2515    if status.trim().is_empty() {
2516        return Ok(());
2517    }
2518
2519    let message = format!(
2520        "codetether auto-save before workspace refresh {}",
2521        chrono::Utc::now().to_rfc3339()
2522    );
2523    tracing::warn!(
2524        repo_path = %repo_path.display(),
2525        "Existing clone has local changes; stashing them before refreshing from origin"
2526    );
2527    run_git_command_at(
2528        Some(repo_path),
2529        vec![
2530            "stash".to_string(),
2531            "push".to_string(),
2532            "--include-untracked".to_string(),
2533            "-m".to_string(),
2534            message,
2535        ],
2536    )
2537    .await?;
2538
2539    Ok(())
2540}
2541
2542async fn register_cloned_workspace(
2543    client: &Client,
2544    server: &str,
2545    token: &Option<String>,
2546    worker_id: &str,
2547    workspace: &RegisteredWorkspaceRecord,
2548    repo_path: &Path,
2549) -> Result<()> {
2550    let mut req = client.post(format!(
2551        "{}/v1/agent/workspaces",
2552        server.trim_end_matches('/')
2553    ));
2554    if let Some(token) = token {
2555        req = req.bearer_auth(token);
2556    }
2557
2558    let response = req
2559        .json(&serde_json::json!({
2560            "workspace_id": workspace.id,
2561            "name": workspace.name,
2562            "path": repo_path.display().to_string(),
2563            "description": workspace.description,
2564            "agent_config": workspace.agent_config,
2565            "git_url": workspace.git_url,
2566            "git_branch": workspace.git_branch,
2567            "worker_id": worker_id,
2568        }))
2569        .send()
2570        .await
2571        .context("Failed to register cloned workspace with server")?;
2572    let status = response.status();
2573    if !status.is_success() {
2574        let body = response.text().await.unwrap_or_default();
2575        anyhow::bail!(
2576            "Failed to register cloned workspace {} ({}): {}",
2577            workspace.id,
2578            status,
2579            body
2580        );
2581    }
2582
2583    Ok(())
2584}
2585
2586async fn enqueue_post_clone_task(
2587    client: &Client,
2588    server: &str,
2589    token: &Option<String>,
2590    worker_id: &str,
2591    workspace_id: &str,
2592    metadata: &serde_json::Map<String, serde_json::Value>,
2593) -> Result<()> {
2594    if !worker_should_enqueue_post_clone_task(metadata) {
2595        return Ok(());
2596    }
2597
2598    let Some(post_clone_task) = metadata.get("post_clone_task") else {
2599        return Ok(());
2600    };
2601    let Some(task) = post_clone_task.as_object() else {
2602        return Ok(());
2603    };
2604    let title = task
2605        .get("title")
2606        .and_then(|value| value.as_str())
2607        .filter(|value| !value.trim().is_empty())
2608        .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing title"))?;
2609    let prompt = task
2610        .get("prompt")
2611        .and_then(|value| value.as_str())
2612        .filter(|value| !value.trim().is_empty())
2613        .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing prompt"))?;
2614    let agent_type = task
2615        .get("agent_type")
2616        .and_then(|value| value.as_str())
2617        .unwrap_or("build");
2618    let mut task_metadata = task
2619        .get("metadata")
2620        .cloned()
2621        .unwrap_or_else(|| serde_json::json!({}));
2622    if let Some(task_metadata_obj) = task_metadata.as_object_mut() {
2623        task_metadata_obj
2624            .entry("target_worker_id".to_string())
2625            .or_insert_with(|| serde_json::Value::String(worker_id.to_string()));
2626    }
2627
2628    let mut req = client.post(format!(
2629        "{}/v1/agent/workspaces/{}/tasks",
2630        server.trim_end_matches('/'),
2631        workspace_id
2632    ));
2633    if let Some(token) = token {
2634        req = req.bearer_auth(token);
2635    }
2636
2637    let response = req
2638        .json(&serde_json::json!({
2639            "title": title,
2640            "prompt": prompt,
2641            "agent_type": agent_type,
2642            "metadata": task_metadata,
2643        }))
2644        .send()
2645        .await
2646        .context("Failed to enqueue post-clone task")?;
2647    let status = response.status();
2648    if !status.is_success() {
2649        let body = response.text().await.unwrap_or_default();
2650        anyhow::bail!("Failed to enqueue post-clone task ({}): {}", status, body);
2651    }
2652    Ok(())
2653}
2654
2655fn worker_should_enqueue_post_clone_task(
2656    metadata: &serde_json::Map<String, serde_json::Value>,
2657) -> bool {
2658    metadata
2659        .get("post_clone_task")
2660        .and_then(|value| value.as_object())
2661        .is_some()
2662}
2663
2664async fn run_git_command_at(current_dir: Option<&Path>, args: Vec<String>) -> Result<String> {
2665    let mut command = Command::new("git");
2666    if let Some(dir) = current_dir {
2667        command.current_dir(dir);
2668    }
2669
2670    let output = command
2671        .args(args.iter().map(String::as_str))
2672        .output()
2673        .await
2674        .context("Failed to execute git command")?;
2675
2676    if output.status.success() {
2677        return Ok(String::from_utf8_lossy(&output.stdout).trim().to_string());
2678    }
2679
2680    Err(anyhow::anyhow!(
2681        "Git command failed: {}",
2682        String::from_utf8_lossy(&output.stderr).trim()
2683    ))
2684}
2685
2686async fn release_task_result(
2687    client: &Client,
2688    server: &str,
2689    token: &Option<String>,
2690    worker_id: &str,
2691    task_id: &str,
2692    status: &str,
2693    result: Option<String>,
2694    error: Option<String>,
2695    session_id: Option<String>,
2696    diagnostics: Option<serde_json::Value>,
2697) -> Result<()> {
2698    let mut req = client
2699        .post(format!("{}/v1/worker/tasks/release", server))
2700        .header("X-Worker-ID", worker_id);
2701    if let Some(token) = token {
2702        req = req.bearer_auth(token);
2703    }
2704
2705    let mut payload = serde_json::json!({
2706        "task_id": task_id,
2707        "status": status,
2708        "result": result,
2709        "error": error,
2710        "session_id": session_id,
2711    });
2712    if let Some(diagnostics) = diagnostics {
2713        if let Some(obj) = payload.as_object_mut() {
2714            obj.insert("diagnostics".to_string(), diagnostics);
2715        }
2716    }
2717
2718    req.json(&payload)
2719        .send()
2720        .await
2721        .context("Failed to release task result")?;
2722
2723    Ok(())
2724}
2725
2726async fn execute_swarm_with_policy(
2727    session: &mut Session,
2728    prompt: &str,
2729    model_tier: Option<&str>,
2730    explicit_model: Option<String>,
2731    metadata: &serde_json::Map<String, serde_json::Value>,
2732    complexity_hint: Option<&str>,
2733    worker_personality: Option<&str>,
2734    target_agent_name: Option<&str>,
2735    bus: Option<&Arc<AgentBus>>,
2736    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
2737) -> Result<(crate::session::SessionResult, bool)> {
2738    use crate::provider::{ContentPart, Message, Role};
2739
2740    session.add_message(Message {
2741        role: Role::User,
2742        content: vec![ContentPart::Text {
2743            text: prompt.to_string(),
2744        }],
2745    });
2746
2747    if session.title.is_none() {
2748        session.generate_title().await?;
2749    }
2750
2751    let strategy = parse_swarm_strategy(metadata);
2752    let max_subagents = metadata_usize(
2753        metadata,
2754        &["swarm_max_subagents", "max_subagents", "subagents"],
2755    )
2756    .unwrap_or(10)
2757    .clamp(1, 100);
2758    let max_steps_per_subagent = metadata_usize(
2759        metadata,
2760        &[
2761            "swarm_max_steps_per_subagent",
2762            "max_steps_per_subagent",
2763            "max_steps",
2764        ],
2765    )
2766    .unwrap_or(50)
2767    .clamp(1, 200);
2768    let timeout_secs = metadata_u64(metadata, &["swarm_timeout_secs", "timeout_secs", "timeout"])
2769        .unwrap_or(600)
2770        .clamp(30, 3600);
2771    let parallel_enabled =
2772        metadata_bool(metadata, &["swarm_parallel_enabled", "parallel_enabled"]).unwrap_or(true);
2773
2774    let model = resolve_swarm_model(explicit_model, model_tier).await;
2775    if let Some(ref selected_model) = model {
2776        session.metadata.model = Some(selected_model.clone());
2777    }
2778
2779    if let Some(ref cb) = output_callback {
2780        cb(format!(
2781            "[swarm] routing complexity={} tier={} personality={} target_agent={}",
2782            complexity_hint.unwrap_or("standard"),
2783            model_tier.unwrap_or("balanced"),
2784            worker_personality.unwrap_or("auto"),
2785            target_agent_name.unwrap_or("auto")
2786        ));
2787        cb(format!(
2788            "[swarm] config strategy={:?} max_subagents={} max_steps={} timeout={}s tier={}",
2789            strategy,
2790            max_subagents,
2791            max_steps_per_subagent,
2792            timeout_secs,
2793            model_tier.unwrap_or("balanced")
2794        ));
2795    }
2796
2797    let swarm_config = SwarmConfig {
2798        max_subagents,
2799        max_steps_per_subagent,
2800        subagent_timeout_secs: timeout_secs,
2801        parallel_enabled,
2802        model,
2803        working_dir: session
2804            .metadata
2805            .directory
2806            .as_ref()
2807            .map(|p| p.to_string_lossy().to_string()),
2808        ..Default::default()
2809    };
2810
2811    let swarm_result = if output_callback.is_some() {
2812        let (event_tx, mut event_rx) = mpsc::channel(256);
2813        let mut executor = SwarmExecutor::new(swarm_config).with_event_tx(event_tx);
2814        if let Some(bus) = bus {
2815            executor = executor.with_bus(Arc::clone(bus));
2816        }
2817        let prompt_owned = prompt.to_string();
2818        let mut exec_handle =
2819            tokio::spawn(async move { executor.execute(&prompt_owned, strategy).await });
2820
2821        let mut final_result: Option<crate::swarm::SwarmResult> = None;
2822
2823        while final_result.is_none() {
2824            tokio::select! {
2825                maybe_event = event_rx.recv() => {
2826                    if let Some(event) = maybe_event
2827                        && let Some(ref cb) = output_callback
2828                            && let Some(line) = format_swarm_event_for_output(&event) {
2829                                cb(line);
2830                            }
2831                }
2832                join_result = &mut exec_handle => {
2833                    let joined = join_result.map_err(|e| anyhow::anyhow!("Swarm join failure: {}", e))?;
2834                    final_result = Some(joined?);
2835                }
2836            }
2837        }
2838
2839        while let Ok(event) = event_rx.try_recv() {
2840            if let Some(ref cb) = output_callback
2841                && let Some(line) = format_swarm_event_for_output(&event)
2842            {
2843                cb(line);
2844            }
2845        }
2846
2847        final_result.ok_or_else(|| anyhow::anyhow!("Swarm execution returned no result"))?
2848    } else {
2849        let mut executor = SwarmExecutor::new(swarm_config);
2850        if let Some(bus) = bus {
2851            executor = executor.with_bus(Arc::clone(bus));
2852        }
2853        executor.execute(prompt, strategy).await?
2854    };
2855
2856    let final_text = if swarm_result.result.trim().is_empty() {
2857        if swarm_result.success {
2858            "Swarm completed without textual output.".to_string()
2859        } else {
2860            "Swarm finished with failures and no textual output.".to_string()
2861        }
2862    } else {
2863        swarm_result.result.clone()
2864    };
2865
2866    session.add_message(Message {
2867        role: Role::Assistant,
2868        content: vec![ContentPart::Text {
2869            text: final_text.clone(),
2870        }],
2871    });
2872    session.save().await?;
2873
2874    Ok((
2875        crate::session::SessionResult {
2876            text: final_text,
2877            session_id: session.id.clone(),
2878        },
2879        swarm_result.success,
2880    ))
2881}
2882
2883/// Execute a session with the given auto-approve policy
2884/// Optionally streams output chunks via the callback
2885async fn execute_session_with_policy(
2886    session: &mut Session,
2887    prompt: &str,
2888    auto_approve: AutoApprove,
2889    model_tier: Option<&str>,
2890    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
2891) -> Result<crate::session::SessionResult> {
2892    use crate::provider::{ContentPart, Message, ProviderRegistry, Role, parse_model_string};
2893    use std::sync::Arc;
2894
2895    let per_task_keys = session
2896        .metadata
2897        .provider_keys
2898        .as_ref()
2899        .and_then(|value| match crate::provider::PerTaskProviderKeys::from_value(value) {
2900            Ok(keys) => keys,
2901            Err(err) => {
2902                tracing::warn!(error = %err, "Invalid per-task provider key payload; falling back to platform registry");
2903                None
2904            }
2905        });
2906
2907    // Load provider registry from claim-injected tenant keys, falling back to Vault/env platform keys.
2908    let registry = if let Some(ref keys) = per_task_keys {
2909        let registry = keys.build_registry();
2910        if registry.list().is_empty() {
2911            tracing::warn!(
2912                "Per-task provider key payload produced no providers; falling back to platform registry"
2913            );
2914            (*ProviderRegistry::shared_from_vault().await.context(
2915                "Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN",
2916            )?)
2917            .clone()
2918        } else {
2919            tracing::info!(
2920                source = keys.source(),
2921                providers = ?keys.provider_names(),
2922                "Using tenant-scoped per-task provider registry"
2923            );
2924            registry
2925        }
2926    } else {
2927        (*ProviderRegistry::shared_from_vault().await.context(
2928            "Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN",
2929        )?)
2930        .clone()
2931    };
2932    let providers = registry.list();
2933    tracing::info!("Available providers: {:?}", providers);
2934
2935    if providers.is_empty() {
2936        anyhow::bail!(
2937            "No LLM providers available (0 providers loaded). \
2938             Configure API keys in HashiCorp Vault or set environment variables. \
2939             Vault address: {}",
2940            std::env::var("VAULT_ADDR").unwrap_or_else(|_| "(not set)".into())
2941        );
2942    }
2943
2944    // Parse model string
2945    let (provider_name, model_id) = if let Some(ref model_str) = session.metadata.model {
2946        let (prov, model) = parse_model_string(model_str);
2947        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
2948        if prov.is_some() {
2949            (prov.map(|s| s.to_string()), model.to_string())
2950        } else if providers.contains(&model) {
2951            (Some(model.to_string()), String::new())
2952        } else {
2953            (None, model.to_string())
2954        }
2955    } else {
2956        (None, String::new())
2957    };
2958
2959    let provider_slice = providers.as_slice();
2960    if let Some(explicit_provider) = provider_name.as_deref()
2961        && !providers.contains(&explicit_provider)
2962    {
2963        anyhow::bail!(
2964            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
2965            explicit_provider,
2966            providers.join(", ")
2967        );
2968    }
2969
2970    // Determine which provider to use, preferring explicit request first, then model tier.
2971    let selected_provider = provider_name
2972        .as_deref()
2973        .unwrap_or_else(|| choose_provider_for_tier(provider_slice, model_tier));
2974
2975    let provider = registry
2976        .get(selected_provider)
2977        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
2978
2979    // Add user message
2980    session.add_message(Message {
2981        role: Role::User,
2982        content: vec![ContentPart::Text {
2983            text: prompt.to_string(),
2984        }],
2985    });
2986
2987    // Generate title
2988    if session.title.is_none() {
2989        session.generate_title().await?;
2990    }
2991
2992    // Determine model.
2993    let model = if !model_id.is_empty() {
2994        model_id
2995    } else {
2996        default_model_for_provider(selected_provider, model_tier)
2997    };
2998
2999    // Create tool registry with filtering based on auto-approve policy
3000    let workspace_dir = session
3001        .metadata
3002        .directory
3003        .clone()
3004        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
3005    let tool_registry = create_filtered_registry(
3006        Arc::clone(&provider),
3007        model.clone(),
3008        auto_approve,
3009        &workspace_dir,
3010        output_callback.clone(),
3011    );
3012    let tool_definitions = tool_registry.definitions();
3013
3014    let temperature = if prefers_temperature_one(&model) {
3015        Some(1.0)
3016    } else {
3017        Some(0.7)
3018    };
3019
3020    tracing::info!(
3021        "Using model: {} via provider: {} (tier: {:?})",
3022        model,
3023        selected_provider,
3024        model_tier
3025    );
3026    tracing::info!(
3027        "Available tools: {} (auto_approve: {:?})",
3028        tool_definitions.len(),
3029        auto_approve
3030    );
3031
3032    // Build system prompt
3033    let system_prompt = crate::agent::builtin::build_system_prompt(&workspace_dir);
3034
3035    let mut final_output = String::new();
3036    let max_steps = 50;
3037
3038    for step in 1..=max_steps {
3039        tracing::info!(step = step, "Agent step starting");
3040
3041        let response = complete_worker_step_with_context_fallback(
3042            Arc::clone(&provider),
3043            session,
3044            &model,
3045            &system_prompt,
3046            &tool_definitions,
3047            temperature,
3048        )
3049        .await?;
3050
3051        crate::telemetry::TOKEN_USAGE.record_model_usage(
3052            &model,
3053            response.usage.prompt_tokens as u64,
3054            response.usage.completion_tokens as u64,
3055        );
3056
3057        // Extract tool calls
3058        let tool_calls: Vec<(String, String, serde_json::Value)> = response
3059            .message
3060            .content
3061            .iter()
3062            .filter_map(|part| {
3063                if let ContentPart::ToolCall {
3064                    id,
3065                    name,
3066                    arguments,
3067                    ..
3068                } = part
3069                {
3070                    let args: serde_json::Value =
3071                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
3072                    Some((id.clone(), name.clone(), args))
3073                } else {
3074                    None
3075                }
3076            })
3077            .collect();
3078
3079        // Collect text output and stream it
3080        for part in &response.message.content {
3081            if let ContentPart::Text { text } = part
3082                && !text.is_empty()
3083            {
3084                final_output.push_str(text);
3085                final_output.push('\n');
3086                if let Some(ref cb) = output_callback {
3087                    cb(text.clone());
3088                }
3089            }
3090        }
3091
3092        // If no tool calls, we're done
3093        if tool_calls.is_empty() {
3094            session.add_message(response.message.clone());
3095            break;
3096        }
3097
3098        session.add_message(response.message.clone());
3099
3100        tracing::info!(
3101            step = step,
3102            num_tools = tool_calls.len(),
3103            "Executing tool calls"
3104        );
3105
3106        // Execute each tool call
3107        for (tool_id, tool_name, tool_input) in tool_calls {
3108            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
3109
3110            // Stream tool start event
3111            if let Some(ref cb) = output_callback {
3112                cb(format!("[tool:start:{}]", tool_name));
3113            }
3114
3115            // Check if tool is allowed based on auto-approve policy
3116            if !is_tool_allowed(&tool_name, auto_approve) {
3117                let msg = format!(
3118                    "Tool '{}' requires approval but auto-approve policy is {:?}",
3119                    tool_name, auto_approve
3120                );
3121                tracing::warn!(tool = %tool_name, "Tool blocked by auto-approve policy");
3122                session.add_message(Message {
3123                    role: Role::Tool,
3124                    content: vec![ContentPart::ToolResult {
3125                        tool_call_id: tool_id,
3126                        content: msg,
3127                    }],
3128                });
3129                continue;
3130            }
3131
3132            let enriched_tool_input = enrich_tool_input_with_runtime_context(
3133                &tool_input,
3134                &workspace_dir,
3135                Some(&model),
3136                &session.id,
3137                &session.agent,
3138                session.metadata.provenance.as_ref(),
3139            );
3140
3141            let content = if let Some(tool) = tool_registry.get(&tool_name) {
3142                let tool_timeout_secs =
3143                    env_u64("CODETETHER_WORKER_TOOL_TIMEOUT_SECS", 120).clamp(1, 3600);
3144                let exec_result: Result<crate::tool::ToolResult> = match tokio::time::timeout(
3145                    Duration::from_secs(tool_timeout_secs),
3146                    tool.execute(enriched_tool_input.clone()),
3147                )
3148                .await
3149                {
3150                    Ok(result) => result,
3151                    Err(_) => Ok(crate::tool::ToolResult::structured_error(
3152                        "TOOL_TIMEOUT",
3153                        &tool_name,
3154                        &format!("tool timed out after {tool_timeout_secs}s"),
3155                        None,
3156                        Some(serde_json::json!({
3157                            "hint": "Narrow the request, set a more specific path/include filter, or retry with smaller scope."
3158                        })),
3159                    )),
3160                };
3161                match exec_result {
3162                    Ok(result) => {
3163                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
3164                        if let Some(ref cb) = output_callback {
3165                            let status = if result.success { "ok" } else { "err" };
3166                            cb(format!(
3167                                "[tool:{}:{}] {}",
3168                                tool_name,
3169                                status,
3170                                crate::util::truncate_bytes_safe(&result.output, 500)
3171                            ));
3172                        }
3173                        result.output
3174                    }
3175                    Err(e) => {
3176                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
3177                        if let Some(ref cb) = output_callback {
3178                            cb(format!("[tool:{}:err] {}", tool_name, e));
3179                        }
3180                        format!("Error: {}", e)
3181                    }
3182                }
3183            } else {
3184                tracing::warn!(tool = %tool_name, "Tool not found");
3185                format!("Error: Unknown tool '{}'", tool_name)
3186            };
3187
3188            session.add_message(Message {
3189                role: Role::Tool,
3190                content: vec![ContentPart::ToolResult {
3191                    tool_call_id: tool_id,
3192                    content,
3193                }],
3194            });
3195        }
3196    }
3197
3198    session.save().await?;
3199
3200    Ok(crate::session::SessionResult {
3201        text: final_output.trim().to_string(),
3202        session_id: session.id.clone(),
3203    })
3204}
3205
3206async fn complete_worker_step_with_context_fallback(
3207    provider: Arc<dyn crate::provider::Provider>,
3208    session: &Session,
3209    model: &str,
3210    system_prompt: &str,
3211    tool_definitions: &[crate::provider::ToolDefinition],
3212    temperature: Option<f32>,
3213) -> Result<crate::provider::CompletionResponse> {
3214    let options = crate::session::context::RequestOptions {
3215        temperature,
3216        top_p: None,
3217        max_tokens: Some(8192),
3218        force_keep_last: None,
3219    };
3220
3221    match crate::session::context::complete_with_context(
3222        Arc::clone(&provider),
3223        session,
3224        model,
3225        system_prompt,
3226        tool_definitions,
3227        options,
3228    )
3229    .await
3230    {
3231        Ok(response) => Ok(response),
3232        Err(error) if crate::session::helper::error::is_prompt_too_long_error(&error) => {
3233            let compact_tools = compact_worker_tool_definitions(tool_definitions);
3234            tracing::warn!(
3235                error = %error,
3236                tool_count = tool_definitions.len(),
3237                original_tool_schema_bytes = tool_schema_bytes(tool_definitions),
3238                compact_tool_schema_bytes = tool_schema_bytes(&compact_tools),
3239                "Provider rejected prompt after context compaction; retrying with compact tool schemas"
3240            );
3241            crate::session::context::complete_with_context(
3242                provider,
3243                session,
3244                model,
3245                system_prompt,
3246                &compact_tools,
3247                options,
3248            )
3249            .await
3250            .map_err(|retry_error| {
3251                if crate::session::helper::error::is_prompt_too_long_error(&retry_error) {
3252                    retry_error.context(
3253                        "provider rejected prompt as too long after RLM compaction and compact tool-schema fallback",
3254                    )
3255                } else {
3256                    retry_error
3257                }
3258            })
3259        }
3260        Err(error) => Err(error),
3261    }
3262}
3263
3264fn compact_worker_tool_definitions(
3265    tools: &[crate::provider::ToolDefinition],
3266) -> Vec<crate::provider::ToolDefinition> {
3267    tools
3268        .iter()
3269        .map(|tool| crate::provider::ToolDefinition {
3270            name: tool.name.clone(),
3271            description: tool.description.clone(),
3272            parameters: serde_json::json!({
3273                "type": "object",
3274                "additionalProperties": true
3275            }),
3276        })
3277        .collect()
3278}
3279
3280fn tool_schema_bytes(tools: &[crate::provider::ToolDefinition]) -> usize {
3281    let mut writer = CountingWriter::default();
3282    serde_json::to_writer(&mut writer, tools)
3283        .map(|_| writer.bytes)
3284        .unwrap_or(0)
3285}
3286
/// A `std::io::Write` sink that discards its input and only tallies how many
/// bytes were written to it.
#[derive(Default)]
struct CountingWriter {
    /// Running total of bytes accepted by `write`.
    bytes: usize,
}

impl std::io::Write for CountingWriter {
    /// Accept the entire buffer and add its length to the total.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = buf.len();
        self.bytes += accepted;
        Ok(accepted)
    }

    /// Nothing is buffered, so flushing always succeeds immediately.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
3302
3303/// Start the heartbeat background task
3304/// Returns a JoinHandle that can be used to cancel the heartbeat
3305pub fn start_heartbeat(
3306    client: Client,
3307    server: String,
3308    token: Option<String>,
3309    heartbeat_state: HeartbeatState,
3310    processing: Arc<Mutex<HashSet<String>>>,
3311    cognition_config: CognitionHeartbeatConfig,
3312    task_progress: Arc<Mutex<task_timeline::TaskProgressState>>,
3313) -> JoinHandle<()> {
3314    tokio::spawn(async move {
3315        let mut consecutive_failures = 0u32;
3316        const MAX_FAILURES: u32 = 3;
3317        const HEARTBEAT_INTERVAL_SECS: u64 = 30;
3318        const COGNITION_RETRY_COOLDOWN_SECS: u64 = 300;
3319        let mut cognition_payload_disabled_until: Option<Instant> = None;
3320        let persistent_heartbeat_enabled = persistent_worker_enabled();
3321        let persistent_lease_seconds = persistent_worker_lease_seconds();
3322
3323        let mut interval =
3324            tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
3325        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3326
3327        loop {
3328            interval.tick().await;
3329
3330            // Update task count from processing set
3331            let active_count = processing.lock().await.len();
3332            heartbeat_state.set_task_count(active_count).await;
3333
3334            // Determine status based on active tasks
3335            let status = if active_count > 0 {
3336                WorkerStatus::Processing
3337            } else {
3338                WorkerStatus::Idle
3339            };
3340            heartbeat_state.set_status(status).await;
3341
3342            // Send heartbeat
3343            let url = format!(
3344                "{}/v1/agent/workers/{}/heartbeat",
3345                server, heartbeat_state.worker_id
3346            );
3347            let mut req = client.post(&url);
3348
3349            if let Some(ref t) = token {
3350                req = req.bearer_auth(t);
3351            }
3352
3353            let status_str = heartbeat_state.status.lock().await.as_str().to_string();
3354            let sub_agents = heartbeat_state.sub_agents_snapshot().await;
3355
3356            // Include task pipeline progress in heartbeat
3357            let progress_snapshot = task_progress.lock().await.clone();
3358            let task_progress_payload = if progress_snapshot.task_id.is_some() {
3359                Some(serde_json::json!({
3360                    "task_id": progress_snapshot.task_id,
3361                    "current_checkpoint": progress_snapshot.current_checkpoint,
3362                    "elapsed_secs": format!("{:.1}", progress_snapshot.elapsed_secs),
3363                    "remaining_secs": format!("{:.1}", progress_snapshot.remaining_secs),
3364                    "budget_pct_used": format!("{:.1}%", progress_snapshot.budget_pct_used),
3365                    "checkpoints_reached": progress_snapshot.checkpoints_reached.len(),
3366                    "last_detail": progress_snapshot.last_detail,
3367                }))
3368            } else {
3369                None
3370            };
3371
3372            let base_payload = serde_json::json!({
3373                "worker_id": &heartbeat_state.worker_id,
3374                "agent_name": &heartbeat_state.agent_name,
3375                "status": status_str,
3376                "active_task_count": active_count,
3377                "sub_agents": sub_agents,
3378            });
3379            let mut payload = base_payload.clone();
3380            let mut included_cognition_payload = false;
3381            let cognition_payload_allowed = cognition_payload_disabled_until
3382                .map(|until| Instant::now() >= until)
3383                .unwrap_or(true);
3384
3385            if cognition_config.enabled
3386                && cognition_payload_allowed
3387                && let Some(cognition_payload) =
3388                    fetch_cognition_heartbeat_payload(&client, &cognition_config).await
3389                && let Some(obj) = payload.as_object_mut()
3390            {
3391                obj.insert("cognition".to_string(), cognition_payload);
3392                included_cognition_payload = true;
3393            }
3394
3395            // Include task pipeline progress if a task is active
3396            if let Some(progress_json) = task_progress_payload.clone() {
3397                if let Some(obj) = payload.as_object_mut() {
3398                    obj.insert("task_progress".to_string(), progress_json);
3399                }
3400            }
3401
3402            match req.json(&payload).send().await {
3403                Ok(res) => {
3404                    if res.status().is_success() {
3405                        consecutive_failures = 0;
3406                        tracing::debug!(
3407                            worker_id = %heartbeat_state.worker_id,
3408                            status = status_str,
3409                            active_tasks = active_count,
3410                            "Heartbeat sent successfully"
3411                        );
3412                    } else if included_cognition_payload && res.status().is_client_error() {
3413                        tracing::warn!(
3414                            worker_id = %heartbeat_state.worker_id,
3415                            status = %res.status(),
3416                            "Heartbeat cognition payload rejected, retrying without cognition payload"
3417                        );
3418
3419                        let mut retry_req = client.post(&url);
3420                        if let Some(ref t) = token {
3421                            retry_req = retry_req.bearer_auth(t);
3422                        }
3423
3424                        match retry_req.json(&base_payload).send().await {
3425                            Ok(retry_res) if retry_res.status().is_success() => {
3426                                cognition_payload_disabled_until = Some(
3427                                    Instant::now()
3428                                        + Duration::from_secs(COGNITION_RETRY_COOLDOWN_SECS),
3429                                );
3430                                consecutive_failures = 0;
3431                                tracing::warn!(
3432                                    worker_id = %heartbeat_state.worker_id,
3433                                    retry_after_secs = COGNITION_RETRY_COOLDOWN_SECS,
3434                                    "Paused cognition heartbeat payload after schema rejection"
3435                                );
3436                            }
3437                            Ok(retry_res) => {
3438                                consecutive_failures += 1;
3439                                tracing::warn!(
3440                                    worker_id = %heartbeat_state.worker_id,
3441                                    status = %retry_res.status(),
3442                                    failures = consecutive_failures,
3443                                    "Heartbeat failed even after retry without cognition payload"
3444                                );
3445                            }
3446                            Err(e) => {
3447                                consecutive_failures += 1;
3448                                tracing::warn!(
3449                                    worker_id = %heartbeat_state.worker_id,
3450                                    error = %e,
3451                                    failures = consecutive_failures,
3452                                    "Heartbeat retry without cognition payload failed"
3453                                );
3454                            }
3455                        }
3456                    } else {
3457                        consecutive_failures += 1;
3458                        tracing::warn!(
3459                            worker_id = %heartbeat_state.worker_id,
3460                            status = %res.status(),
3461                            failures = consecutive_failures,
3462                            "Heartbeat failed"
3463                        );
3464                    }
3465                }
3466                Err(e) => {
3467                    consecutive_failures += 1;
3468                    tracing::warn!(
3469                        worker_id = %heartbeat_state.worker_id,
3470                        error = %e,
3471                        failures = consecutive_failures,
3472                        "Heartbeat request failed"
3473                    );
3474                }
3475            }
3476
3477            if persistent_heartbeat_enabled
3478                && let Some(progress_json) = task_progress_payload
3479                && let Err(e) = send_extended_task_heartbeat(
3480                    &client,
3481                    &server,
3482                    &token,
3483                    &heartbeat_state.worker_id,
3484                    progress_json,
3485                    persistent_lease_seconds,
3486                )
3487                .await
3488            {
3489                tracing::warn!(
3490                    worker_id = %heartbeat_state.worker_id,
3491                    error = %e,
3492                    "Extended task heartbeat failed"
3493                );
3494            }
3495
3496            // Log error after 3 consecutive failures but do not terminate
3497            if consecutive_failures >= MAX_FAILURES {
3498                tracing::error!(
3499                    worker_id = %heartbeat_state.worker_id,
3500                    failures = consecutive_failures,
3501                    "Heartbeat failed {} consecutive times - worker will continue running and attempt reconnection via SSE loop",
3502                    MAX_FAILURES
3503                );
3504                // Reset counter to avoid spamming error logs
3505                consecutive_failures = 0;
3506            }
3507        }
3508    })
3509}
3510
3511async fn send_extended_task_heartbeat(
3512    client: &Client,
3513    server: &str,
3514    token: &Option<String>,
3515    worker_id: &str,
3516    progress: serde_json::Value,
3517    lease_seconds: u64,
3518) -> Result<()> {
3519    let task_id = progress
3520        .get("task_id")
3521        .and_then(|value| value.as_str())
3522        .filter(|value| !value.is_empty())
3523        .context("active task progress missing task_id")?
3524        .to_string();
3525    let checkpoint_seq = progress
3526        .get("checkpoints_reached")
3527        .and_then(|value| value.as_u64())
3528        .unwrap_or(1)
3529        .max(1);
3530    let status_message = progress
3531        .get("current_checkpoint")
3532        .and_then(|value| value.as_str())
3533        .map(|value| value.to_string());
3534
3535    let mut req = client.post(format!("{}/v1/worker/tasks/heartbeat-extended", server));
3536    if let Some(token) = token {
3537        req = req.bearer_auth(token);
3538    }
3539
3540    let response = req
3541        .json(&serde_json::json!({
3542            "task_id": &task_id,
3543            "worker_id": worker_id,
3544            "status_message": status_message,
3545            "checkpoint": progress,
3546            "checkpoint_seq": checkpoint_seq,
3547            "lease_extension_seconds": lease_seconds,
3548        }))
3549        .send()
3550        .await?;
3551
3552    let status = response.status();
3553    if !status.is_success() {
3554        let body = response.text().await.unwrap_or_default();
3555        anyhow::bail!("extended heartbeat rejected with {}: {}", status, body);
3556    }
3557
3558    let body = response
3559        .json::<serde_json::Value>()
3560        .await
3561        .unwrap_or_default();
3562    if body.get("success").and_then(|value| value.as_bool()) == Some(false) {
3563        tracing::debug!(
3564            task_id = %task_id,
3565            message = body
3566                .get("message")
3567                .and_then(|value| value.as_str())
3568                .unwrap_or("no active run"),
3569            "Extended task heartbeat skipped"
3570        );
3571        return Ok(());
3572    }
3573
3574    Ok(())
3575}
3576
3577async fn fetch_cognition_heartbeat_payload(
3578    client: &Client,
3579    config: &CognitionHeartbeatConfig,
3580) -> Option<serde_json::Value> {
3581    let status_url = format!("{}/v1/cognition/status", config.source_base_url);
3582    let status_res = tokio::time::timeout(
3583        Duration::from_millis(config.request_timeout_ms),
3584        client.get(status_url).send(),
3585    )
3586    .await
3587    .ok()?
3588    .ok()?;
3589
3590    if !status_res.status().is_success() {
3591        return None;
3592    }
3593
3594    let status: CognitionStatusSnapshot = status_res.json().await.ok()?;
3595    let mut payload = serde_json::json!({
3596        "running": status.running,
3597        "last_tick_at": status.last_tick_at,
3598        "active_persona_count": status.active_persona_count,
3599        "events_buffered": status.events_buffered,
3600        "snapshots_buffered": status.snapshots_buffered,
3601        "loop_interval_ms": status.loop_interval_ms,
3602    });
3603
3604    if config.include_thought_summary {
3605        let snapshot_url = format!("{}/v1/cognition/snapshots/latest", config.source_base_url);
3606        let snapshot_res = tokio::time::timeout(
3607            Duration::from_millis(config.request_timeout_ms),
3608            client.get(snapshot_url).send(),
3609        )
3610        .await
3611        .ok()
3612        .and_then(Result::ok);
3613
3614        if let Some(snapshot_res) = snapshot_res
3615            && snapshot_res.status().is_success()
3616            && let Ok(snapshot) = snapshot_res.json::<CognitionLatestSnapshot>().await
3617            && let Some(obj) = payload.as_object_mut()
3618        {
3619            obj.insert(
3620                "latest_snapshot_at".to_string(),
3621                serde_json::Value::String(snapshot.generated_at),
3622            );
3623            obj.insert(
3624                "latest_thought".to_string(),
3625                serde_json::Value::String(trim_for_heartbeat(
3626                    &snapshot.summary,
3627                    config.summary_max_chars,
3628                )),
3629            );
3630            if let Some(model) = snapshot
3631                .metadata
3632                .get("model")
3633                .and_then(serde_json::Value::as_str)
3634            {
3635                obj.insert(
3636                    "latest_thought_model".to_string(),
3637                    serde_json::Value::String(model.to_string()),
3638                );
3639            }
3640            if let Some(source) = snapshot
3641                .metadata
3642                .get("source")
3643                .and_then(serde_json::Value::as_str)
3644            {
3645                obj.insert(
3646                    "latest_thought_source".to_string(),
3647                    serde_json::Value::String(source.to_string()),
3648                );
3649            }
3650        }
3651    }
3652
3653    Some(payload)
3654}
3655
/// Normalizes free-form text for inclusion in a heartbeat payload.
///
/// Surrounding whitespace is stripped first. If the remaining text has at most
/// `max_chars` characters (Unicode scalar values, not bytes), it is returned
/// as-is. Otherwise the text is cut after its first `max_chars` characters,
/// whitespace left dangling at the cut is dropped, and `"..."` is appended. The
/// result is therefore at most `max_chars + 3` characters long.
fn trim_for_heartbeat(input: &str, max_chars: usize) -> String {
    let text = input.trim();
    // `nth(max_chars)` exists only when the text is longer than the budget; its
    // byte offset is a valid char boundary to cut at.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((cut, _)) => format!("{}...", text[..cut].trim_end()),
    }
}
3668
/// Reads a boolean flag from environment variable `name`.
///
/// Recognizes `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`. Matching is
/// case-insensitive and ignores surrounding whitespace, so `" True "` counts.
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    std::env::var(name)
        .ok()
        .and_then(|v| match v.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => Some(true),
            "0" | "false" | "no" | "off" => Some(false),
            _ => None,
        })
        .unwrap_or(default)
}
3679
/// Reads a `usize` from environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// unset, is not valid Unicode, or does not parse as a non-negative integer
/// that fits in `usize`.
fn env_usize(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .unwrap_or(default)
}
3686
/// Reads a `u64` from environment variable `name`.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// unset, is not valid Unicode, or does not parse as a non-negative integer
/// that fits in `u64`.
fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|v| v.trim().parse::<u64>().ok())
        .unwrap_or(default)
}
3693
3694fn maybe_configure_repo_git_auth_from_entry(entry: &serde_json::Value) {
3695    let Some(workspace_id) = entry.get("id").and_then(|v| v.as_str()) else {
3696        return;
3697    };
3698    let Some(path) = entry.get("path").and_then(|v| v.as_str()) else {
3699        return;
3700    };
3701    let has_git_remote = entry
3702        .get("git_url")
3703        .and_then(|v| v.as_str())
3704        .is_some_and(|value| !value.trim().is_empty());
3705    if !has_git_remote {
3706        return;
3707    }
3708
3709    let repo_path = Path::new(path);
3710    if !repo_path.join(".git").exists() {
3711        return;
3712    }
3713
3714    if let Err(error) = configure_repo_git_auth(repo_path, workspace_id) {
3715        tracing::debug!(
3716            workspace_id,
3717            path,
3718            error = %error,
3719            "Workspace sync could not install Git credential helper"
3720        );
3721    }
3722    configure_repo_git_github_app_from_agent_config(repo_path, entry.get("agent_config"));
3723}
3724
3725fn configure_repo_git_github_app_from_agent_config(
3726    repo_path: &Path,
3727    agent_config: Option<&serde_json::Value>,
3728) {
3729    let installation_id = agent_config
3730        .and_then(|value| value.get("git_auth"))
3731        .and_then(|value| value.get("github_app"))
3732        .and_then(|value| value.get("installation_id"))
3733        .and_then(|value| value.as_str());
3734    let app_id = agent_config
3735        .and_then(|value| value.get("git_auth"))
3736        .and_then(|value| value.get("github_app"))
3737        .and_then(|value| value.get("app_id"))
3738        .and_then(|value| value.as_str());
3739    if let Err(error) = configure_repo_git_github_app(repo_path, installation_id, app_id) {
3740        tracing::debug!(path = %repo_path.display(), error = %error, "Failed to persist GitHub App repo metadata");
3741    }
3742}
3743
3744/// Start a background task that periodically fetches the server's workspace list
3745/// and auto-registers any whose path exists on this machine's filesystem.
3746/// This lets workers pick up new codebases without restarting.
3747fn start_workspace_sync(
3748    client: Client,
3749    server: String,
3750    token: Option<String>,
3751    shared_codebases: Arc<Mutex<Vec<String>>>,
3752) -> JoinHandle<()> {
3753    tokio::spawn(async move {
3754        const POLL_INTERVAL_SECS: u64 = 60;
3755        let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
3756        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3757        interval.tick().await; // skip the immediate first tick -- let the worker settle first
3758
3759        loop {
3760            interval.tick().await;
3761            if let Err(e) =
3762                sync_workspaces_from_server(&client, &server, &token, &shared_codebases).await
3763            {
3764                tracing::warn!("Workspace sync failed: {}", e);
3765            }
3766        }
3767    })
3768}
3769
3770/// Fetch the server's workspace/codebase list and add any locally-present paths
3771/// that are not already in the worker's registered codebases.
3772/// New paths take effect on the next SSE reconnect (re-register re-sends X-Workspaces).
3773async fn sync_workspaces_from_server(
3774    client: &Client,
3775    server: &str,
3776    token: &Option<String>,
3777    shared_codebases: &Arc<Mutex<Vec<String>>>,
3778) -> Result<()> {
3779    let mut req = client.get(format!("{}/v1/agent/workspaces", server));
3780    if let Some(t) = token {
3781        req = req.bearer_auth(t);
3782    }
3783
3784    let res = req.send().await?;
3785    if !res.status().is_success() {
3786        tracing::debug!(
3787            status = %res.status(),
3788            "Workspace sync: server returned non-success, skipping"
3789        );
3790        return Ok(());
3791    }
3792
3793    let data: serde_json::Value = res.json().await?;
3794
3795    // Server returns { workspaces: [...] } or { codebases: [...] }
3796    let entries = if let Some(arr) = data.as_array() {
3797        arr.clone()
3798    } else {
3799        data["workspaces"]
3800            .as_array()
3801            .or_else(|| data["codebases"].as_array())
3802            .cloned()
3803            .unwrap_or_default()
3804    };
3805
3806    let mut new_paths: Vec<String> = Vec::new();
3807    {
3808        let current = shared_codebases.lock().await;
3809        for entry in &entries {
3810            maybe_configure_repo_git_auth_from_entry(entry);
3811            let path = match entry["path"].as_str().filter(|p| !p.is_empty()) {
3812                Some(p) => p,
3813                None => continue,
3814            };
3815            // Only auto-register if the path physically exists on this machine
3816            // and is not already in the codebases list
3817            if std::path::Path::new(path).exists() && !current.iter().any(|c| c.as_str() == path) {
3818                new_paths.push(path.to_string());
3819            }
3820        }
3821    }
3822
3823    if !new_paths.is_empty() {
3824        let mut current = shared_codebases.lock().await;
3825        for path in &new_paths {
3826            tracing::info!(
3827                path = %path,
3828                "Workspace sync: auto-discovered local path, adding to codebases"
3829            );
3830            current.push(path.clone());
3831        }
3832        tracing::info!(
3833            added = new_paths.len(),
3834            total = current.len(),
3835            "Workspace sync complete -- new paths take effect on next reconnect"
3836        );
3837    } else {
3838        tracing::debug!("Workspace sync: no new local paths found");
3839    }
3840
3841    Ok(())
3842}
3843
#[cfg(test)]
mod tests {
    // Unit tests for the worker helpers: forage-argument building, tool-schema
    // compaction and context-window fallback, post-clone follow-up gating,
    // worker identity/interfaces, HTTP client construction, task timeouts, and
    // task-slot reservation.
    use super::*;
    use crate::provider::{
        CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo,
        Provider, Role, StreamChunk, ToolDefinition, Usage,
    };
    use futures::stream::{self, BoxStream};
    use serde_json::json;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Mock provider that fails with a context-window error until it receives a
    /// request whose tool schemas are all "compact".
    ///
    /// A request counts as compact when every tool's `parameters` has
    /// `additionalProperties: true`; vacuously true for an empty tool list.
    /// The mock counts every `complete` call and records whether it ever saw
    /// a compact request.
    struct ContextErrorUntilCompactProvider {
        // Number of `complete` calls received so far.
        calls: AtomicUsize,
        // Set once a request with compact tool schemas has been seen.
        saw_compact_schema: AtomicBool,
    }

    impl ContextErrorUntilCompactProvider {
        fn new() -> Self {
            Self {
                calls: AtomicUsize::new(0),
                saw_compact_schema: AtomicBool::new(false),
            }
        }

        fn calls(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }

        fn saw_compact_schema(&self) -> bool {
            self.saw_compact_schema.load(Ordering::SeqCst)
        }
    }

    #[async_trait::async_trait]
    impl Provider for ContextErrorUntilCompactProvider {
        fn name(&self) -> &str {
            "mock"
        }

        async fn list_models(&self) -> Result<Vec<ModelInfo>> {
            Ok(Vec::new())
        }

        /// Returns an assistant "ok" message for compact requests; otherwise
        /// fails with a context-window-exceeded error message.
        async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            let compact = request.tools.iter().all(|tool| {
                tool.parameters.get("additionalProperties") == Some(&serde_json::Value::Bool(true))
            });
            if compact {
                self.saw_compact_schema.store(true, Ordering::SeqCst);
                return Ok(CompletionResponse {
                    message: Message {
                        role: Role::Assistant,
                        content: vec![ContentPart::Text { text: "ok".into() }],
                    },
                    usage: Usage::default(),
                    finish_reason: FinishReason::Stop,
                });
            }
            anyhow::bail!(
                "Your input exceeds the context window of this model. Please adjust your input and try again."
            )
        }

        /// Streaming is not exercised by these tests; yields an empty stream.
        async fn complete_stream(
            &self,
            _request: CompletionRequest,
        ) -> Result<BoxStream<'static, StreamChunk>> {
            Ok(Box::pin(stream::empty()))
        }
    }

    /// `metadata_bool` / `metadata_usize` find keys nested under a `forage` object.
    #[test]
    fn metadata_lookup_reads_nested_forage_keys() {
        let metadata = json!({
            "forage": {
                "execute": true,
                "top": 5,
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert_eq!(metadata_bool(&metadata, &["execute"]), Some(true));
        assert_eq!(metadata_usize(&metadata, &["top"]), Some(5));
    }

    /// With empty metadata, the prompt becomes the only moonshot, `no_s3` is
    /// set, the given model is passed through, and the engine is "run".
    #[test]
    fn build_forage_args_defaults_prompt_to_moonshot_and_local_mode() {
        let metadata = serde_json::Map::new();
        let args = build_forage_args(
            "Expand enterprise adoption in regulated markets",
            "forage task",
            &metadata,
            Some("openrouter/z-ai/glm-5".to_string()),
        );

        assert_eq!(
            args.moonshots,
            vec!["Expand enterprise adoption in regulated markets".to_string()]
        );
        assert!(args.no_s3);
        assert_eq!(args.model.as_deref(), Some("openrouter/z-ai/glm-5"));
        assert_eq!(args.execution_engine, "run");
    }

    /// Compaction keeps each tool's name and description. It replaces the
    /// schema with an open one (`additionalProperties: true`) that is less
    /// than a tenth of the original size.
    #[test]
    fn compact_worker_tool_definitions_preserve_tool_identity() {
        let tools = vec![ToolDefinition {
            name: "large_tool".to_string(),
            description: "Large tool schema".to_string(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "payload": {
                        "type": "string",
                        "description": "x".repeat(20_000)
                    }
                }
            }),
        }];

        let compact = compact_worker_tool_definitions(&tools);

        assert_eq!(compact[0].name, tools[0].name);
        assert_eq!(compact[0].description, tools[0].description);
        assert_eq!(
            compact[0].parameters.get("additionalProperties"),
            Some(&serde_json::Value::Bool(true))
        );
        assert!(tool_schema_bytes(&compact) < tool_schema_bytes(&tools) / 10);
    }

    /// When the provider keeps reporting a context-window overflow,
    /// `complete_worker_step_with_context_fallback` eventually retries with
    /// compact tool schemas and returns that successful response.
    ///
    /// The exact call count (5) pins the current fallback sequence.
    /// NOTE(review): the four failing attempts before the compact retry come
    /// from fallback steps defined outside this module; update the count if
    /// that sequence changes.
    #[tokio::test]
    async fn worker_retries_context_window_error_with_compact_tool_schema() {
        let provider = Arc::new(ContextErrorUntilCompactProvider::new());
        let provider_dyn: Arc<dyn Provider> = provider.clone();
        let mut session = Session::new().await.expect("session");
        session.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "Run cronjob \"rule:Retry Failed Conversion Forwards\".".to_string(),
            }],
        });
        let tools = vec![ToolDefinition {
            name: "large_tool".to_string(),
            description: "Large tool schema".to_string(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "payload": {
                        "type": "string",
                        "description": "x".repeat(20_000)
                    }
                }
            }),
        }];

        let response = complete_worker_step_with_context_fallback(
            provider_dyn,
            &session,
            "mock-model",
            "system",
            &tools,
            Some(0.7),
        )
        .await
        .expect("compact tool-schema retry should recover");

        assert_eq!(
            response
                .message
                .content
                .iter()
                .filter_map(|part| match part {
                    ContentPart::Text { text } => Some(text.as_str()),
                    _ => None,
                })
                .collect::<String>(),
            "ok"
        );
        assert_eq!(provider.calls(), 5);
        assert!(provider.saw_compact_schema());
    }

    /// Every option under the nested `forage` object is honored, including a
    /// string-encoded `moonshot_min_alignment` ("0.25").
    #[test]
    fn build_forage_args_honors_nested_forage_configuration() {
        let metadata = json!({
            "forage": {
                "top": 7,
                "loop": true,
                "max_cycles": 2,
                "execute": true,
                "no_s3": false,
                "moonshots": ["Ship autonomous OKR execution", "Reduce operator toil"],
                "moonshot_required": true,
                "moonshot_min_alignment": "0.25",
                "execution_engine": "swarm",
                "run_timeout_secs": 1200,
                "fail_fast": true,
                "swarm_strategy": "stage",
                "swarm_max_subagents": 4,
                "swarm_max_steps": 42,
                "swarm_subagent_timeout_secs": 180
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        let args = build_forage_args("fallback prompt", "title", &metadata, None);

        assert_eq!(args.top, 7);
        assert!(args.loop_mode);
        assert_eq!(args.max_cycles, 2);
        assert!(args.execute);
        assert!(!args.no_s3);
        assert_eq!(args.moonshots.len(), 2);
        assert!(args.moonshot_required);
        assert!((args.moonshot_min_alignment - 0.25).abs() < f64::EPSILON);
        assert_eq!(args.execution_engine, "swarm");
        assert_eq!(args.run_timeout_secs, 1200);
        assert!(args.fail_fast);
        assert_eq!(args.swarm_strategy, "stage");
        assert_eq!(args.swarm_max_subagents, 4);
        assert_eq!(args.swarm_max_steps, 42);
        assert_eq!(args.swarm_subagent_timeout_secs, 180);
    }

    /// A `post_clone_task` from a `github-app` source is enqueued.
    #[test]
    fn worker_allows_github_app_post_clone_followups() {
        let metadata = json!({
            "source": "github-app",
            "post_clone_task": {
                "title": "Work issue #76",
                "prompt": "fix it"
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert!(worker_should_enqueue_post_clone_task(&metadata));
    }

    /// A `post_clone_task` with no `source` field (legacy shape) is also enqueued.
    #[test]
    fn worker_allows_legacy_post_clone_followups() {
        let metadata = json!({
            "post_clone_task": {
                "title": "Continue after clone",
                "prompt": "run build"
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert!(worker_should_enqueue_post_clone_task(&metadata));
    }

    /// `resolve_worker_id` returns `CODETETHER_WORKER_ID` when it is set. The
    /// previous value of the variable is restored before asserting.
    ///
    /// NOTE(review): `set_var`/`remove_var` are `unsafe` because other threads
    /// may read the environment concurrently, and the test harness runs tests in
    /// parallel. This relies on no other test touching `CODETETHER_WORKER_ID`
    /// at the same time. The variable is also not restored if
    /// `resolve_worker_id` panics.
    #[test]
    fn resolve_worker_id_prefers_env() {
        let original = std::env::var("CODETETHER_WORKER_ID").ok();
        unsafe {
            std::env::set_var("CODETETHER_WORKER_ID", "harvester-test-worker");
        }
        let resolved = resolve_worker_id();
        match original {
            Some(value) => unsafe {
                std::env::set_var("CODETETHER_WORKER_ID", value);
            },
            None => unsafe {
                std::env::remove_var("CODETETHER_WORKER_ID");
            },
        }
        assert_eq!(resolved, "harvester-test-worker");
    }

    /// The advertised base URL has its trailing slash removed, and the bus
    /// stream/publish URLs are built under it.
    #[test]
    fn advertised_interfaces_include_http_and_bus_urls() {
        let interfaces = advertised_interfaces(Some("http://worker.test:8080/"));
        assert_eq!(interfaces["http"]["base_url"], "http://worker.test:8080");
        assert_eq!(
            interfaces["bus"]["stream_url"],
            "http://worker.test:8080/v1/bus/stream"
        );
        assert_eq!(
            interfaces["bus"]["publish_url"],
            "http://worker.test:8080/v1/bus/publish"
        );
    }

    /// A missing or whitespace-only public URL yields an empty interface object.
    #[test]
    fn advertised_interfaces_omit_empty_public_url() {
        assert_eq!(advertised_interfaces(None), serde_json::json!({}));
        assert_eq!(advertised_interfaces(Some("   ")), serde_json::json!({}));
    }

    /// The worker's HTTP client builder succeeds.
    #[test]
    fn worker_http_client_builds_with_stream_decoders_disabled() {
        assert!(build_worker_http_client().is_ok());
    }

    /// A top-level `task_timeout_seconds` on the task takes precedence over
    /// `metadata.timeout_secs`.
    #[test]
    fn task_timeout_prefers_fire_and_forget_payload_timeout() {
        let task = json!({
            "id": "task-1",
            "task_timeout_seconds": 604800,
            "metadata": {"timeout_secs": 1200}
        });
        let metadata = task_metadata(&task);
        assert_eq!(task_timeout_secs(&task, &metadata), 604800);
    }

    /// With capacity 2, the slot reservation behaves as follows:
    /// - a new task is reserved;
    /// - re-reserving the same id reports `AlreadyProcessing`;
    /// - a second distinct task is reserved;
    /// - a third distinct task is rejected with `AtCapacity`.
    #[tokio::test]
    async fn reserve_task_slot_enforces_capacity() {
        let processing = Arc::new(Mutex::new(HashSet::new()));

        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::AlreadyProcessing
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-2", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-3", 2).await,
            TaskReservation::AtCapacity
        );
    }

    /// A configured concurrency of 0 is raised to 1; positive values pass through.
    #[test]
    fn normalize_max_concurrent_tasks_never_returns_zero() {
        assert_eq!(normalize_max_concurrent_tasks(0), 1);
        assert_eq!(normalize_max_concurrent_tasks(3), 3);
    }
}