1mod clone_location;
4mod clone_target;
5mod model_defaults;
6mod model_preferences;
7pub(crate) mod task_timeline;
8pub(super) mod workspace_resolve;
9
10use crate::a2a::claim::TaskClaimResponse;
11use crate::a2a::git_credentials::{
12 configure_repo_git_auth, configure_repo_git_github_app, write_git_credential_helper_script,
13};
14use crate::a2a::spawn::{A2APeerHandle, SpawnOptions};
15use crate::a2a::worker_workspace_record::{RegisteredWorkspaceRecord, fetch_workspace_record};
16use crate::bus::AgentBus;
17use crate::cli::{A2aArgs, ForageArgs};
18use crate::provenance::{ClaimProvenance, git_commit_with_provenance, install_commit_msg_hook};
19use crate::provider::ProviderRegistry;
20use crate::session::Session;
21use crate::session::helper::runtime::enrich_tool_input_with_runtime_context;
22use crate::swarm::{DecompositionStrategy, SwarmConfig, SwarmExecutor};
23use crate::tui::swarm_view::SwarmEvent;
24use anyhow::{Context, Result};
25use futures::StreamExt;
26use reqwest::Client;
27use serde::Deserialize;
28use std::collections::HashMap;
29use std::collections::HashSet;
30use std::path::{Path, PathBuf};
31use std::sync::Arc;
32use std::time::{Duration, SystemTime};
33use tokio::process::Command;
34use tokio::sync::{Mutex, mpsc};
35use tokio::task::JoinHandle;
36use tokio::time::Instant;
37
38use self::model_defaults::{default_model_for_provider, prefers_temperature_one};
39use self::model_preferences::choose_provider_for_tier;
40use crate::a2a::task_scope::check_task_scope;
41use crate::a2a::worker_tool_registry::{create_filtered_registry, is_tool_allowed};
42use crate::a2a::worker_workspace_context::resolve_task_workspace_dir;
43use clone_location::{git_clone_base_dir, resolve_workspace_clone_path};
44use clone_target::prepare_clone_target;
45
/// Coarse activity state a worker reports about itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStatus {
    /// Reported on the wire as `"idle"`.
    Idle,
    /// Reported on the wire as `"processing"`.
    Processing,
}

impl WorkerStatus {
    /// Returns the lowercase label used when reporting this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Idle => "idle",
            Self::Processing => "processing",
        }
    }
}
61
62#[derive(Clone)]
64pub struct HeartbeatState {
65 worker_id: String,
66 pub agent_name: String,
67 pub status: Arc<Mutex<WorkerStatus>>,
68 pub active_task_count: Arc<Mutex<usize>>,
69 pub sub_agents: Arc<Mutex<HashSet<String>>>,
70}
71
72impl HeartbeatState {
73 pub fn new(worker_id: String, agent_name: String) -> Self {
74 Self {
75 worker_id,
76 agent_name,
77 status: Arc::new(Mutex::new(WorkerStatus::Idle)),
78 active_task_count: Arc::new(Mutex::new(0)),
79 sub_agents: Arc::new(Mutex::new(HashSet::new())),
80 }
81 }
82
83 pub async fn set_status(&self, status: WorkerStatus) {
84 *self.status.lock().await = status;
85 }
86
87 pub async fn set_task_count(&self, count: usize) {
88 *self.active_task_count.lock().await = count;
89 }
90
91 pub async fn register_sub_agent(&self, agent_name: String) {
92 self.sub_agents.lock().await.insert(agent_name);
93 }
94
95 pub async fn deregister_sub_agent(&self, agent_name: &str) {
96 self.sub_agents.lock().await.remove(agent_name);
97 }
98
99 pub async fn sub_agents_snapshot(&self) -> Vec<String> {
100 let mut names = self
101 .sub_agents
102 .lock()
103 .await
104 .iter()
105 .cloned()
106 .collect::<Vec<_>>();
107 names.sort();
108 names
109 }
110}
111
112#[derive(Clone, Debug)]
113#[allow(dead_code)]
114pub struct CognitionHeartbeatConfig {
115 pub enabled: bool,
116 pub source_base_url: String,
117 pub token: Option<String>,
118 pub provider_name: String,
119 pub interval_secs: u64,
120 pub include_thought_summary: bool,
121 pub summary_max_chars: usize,
122 pub request_timeout_ms: u64,
123}
124
125impl CognitionHeartbeatConfig {
126 pub fn from_env() -> Self {
127 let source_base_url = std::env::var("CODETETHER_WORKER_COGNITION_SOURCE_URL")
128 .unwrap_or_else(|_| "http://127.0.0.1:4096".to_string())
129 .trim_end_matches('/')
130 .to_string();
131
132 Self {
133 enabled: env_bool("CODETETHER_WORKER_COGNITION_SHARE_ENABLED", true),
134 source_base_url,
135 include_thought_summary: env_bool("CODETETHER_WORKER_COGNITION_INCLUDE_THOUGHTS", true),
136 summary_max_chars: env_usize("CODETETHER_WORKER_COGNITION_THOUGHT_MAX_CHARS", 480),
137 request_timeout_ms: env_u64("CODETETHER_WORKER_COGNITION_TIMEOUT_MS", 2_500).max(250),
138 interval_secs: env_u64("CODETETHER_WORKER_COGNITION_INTERVAL_SECS", 30).max(5),
139 provider_name: std::env::var("CODETETHER_WORKER_COGNITION_PROVIDER")
140 .unwrap_or_else(|_| "cognition".to_string()),
141 token: std::env::var("CODETETHER_WORKER_COGNITION_TOKEN").ok(),
142 }
143 }
144}
145
146#[derive(Debug, Deserialize)]
147struct CognitionStatusSnapshot {
148 running: bool,
149 #[serde(default)]
150 last_tick_at: Option<String>,
151 #[serde(default)]
152 active_persona_count: usize,
153 #[serde(default)]
154 events_buffered: usize,
155 #[serde(default)]
156 snapshots_buffered: usize,
157 #[serde(default)]
158 loop_interval_ms: u64,
159}
160
161#[derive(Debug, Deserialize)]
162struct CognitionLatestSnapshot {
163 generated_at: String,
164 summary: String,
165 #[serde(default)]
166 metadata: HashMap<String, serde_json::Value>,
167}
168
/// Outcome of trying to claim a processing slot for a task (see `reserve_task_slot`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskReservation {
    /// The task id was added to the in-flight set.
    Reserved,
    /// The task id was already in the in-flight set.
    AlreadyProcessing,
    /// The in-flight set was already at the concurrency limit.
    AtCapacity,
}
175
176#[derive(Clone)]
177struct WorkerTaskRuntime {
178 client: Client,
179 server: String,
180 token: Option<String>,
181 worker_id: String,
182 agent_name: String,
183 processing: Arc<Mutex<HashSet<String>>>,
184 max_concurrent_tasks: usize,
185 auto_approve: AutoApprove,
186 bus: Arc<AgentBus>,
187 task_progress: Arc<Mutex<task_timeline::TaskProgressState>>,
189 workspace_ids: Vec<String>,
191}
192
193fn worker_a2a_mdns_enabled() -> bool {
194 env_bool("CODETETHER_WORKER_A2A_MDNS", false)
195}
196
/// Returns the port for the worker's A2A peer.
///
/// Reads `CODETETHER_WORKER_A2A_PORT` (surrounding whitespace ignored) and
/// falls back to 8081 when the variable is unset or is not a valid `u16`.
fn worker_a2a_port() -> u16 {
    const DEFAULT_PORT: u16 = 8081;
    match std::env::var("CODETETHER_WORKER_A2A_PORT") {
        Ok(raw) => raw.trim().parse::<u16>().unwrap_or(DEFAULT_PORT),
        Err(_) => DEFAULT_PORT,
    }
}
203
/// Collects A2A peer seed URLs from the environment.
///
/// Behaviour:
/// - Reads `CODETETHER_WORKER_A2A_PEERS`, then `CODETETHER_A2A_PEERS`.
/// - Splits each value on commas, trims every entry, and drops blank entries.
/// - Removes duplicates, keeping each seed's first occurrence, so a peer listed
///   in both variables (or twice in one) is not introduced repeatedly.
/// - Returns an empty list when neither variable is set.
fn worker_a2a_peer_seeds() -> Vec<String> {
    let mut seen: HashSet<String> = HashSet::new();
    let mut peers = Vec::new();

    for name in ["CODETETHER_WORKER_A2A_PEERS", "CODETETHER_A2A_PEERS"] {
        let Ok(value) = std::env::var(name) else {
            continue;
        };
        for peer in value.split(',').map(str::trim).filter(|peer| !peer.is_empty()) {
            if seen.insert(peer.to_string()) {
                peers.push(peer.to_string());
            }
        }
    }

    peers
}
218
219fn worker_public_url_for_port(args: &A2aArgs, port: u16) -> Option<String> {
220 let configured = args
221 .public_url
222 .as_deref()
223 .map(str::trim)
224 .filter(|value| !value.is_empty())?;
225
226 if let Ok(mut url) = reqwest::Url::parse(configured) {
227 let _ = url.set_port(Some(port));
228 return Some(url.as_str().trim_end_matches('/').to_string());
229 }
230
231 Some(configured.trim_end_matches('/').to_string())
232}
233
234async fn maybe_start_worker_a2a_peer(
235 args: &A2aArgs,
236 name: &str,
237 bus: Arc<AgentBus>,
238) -> Option<A2APeerHandle> {
239 if !worker_a2a_mdns_enabled() {
240 tracing::info!(
241 "Worker A2A mDNS peer disabled; set CODETETHER_WORKER_A2A_MDNS=true to enable"
242 );
243 return None;
244 }
245
246 let port = worker_a2a_port();
247 let public_url = worker_public_url_for_port(args, port);
248 let peer = worker_a2a_peer_seeds();
249 if !peer.is_empty() {
250 tracing::info!(peer_count = peer.len(), "Worker A2A peer seeds configured");
251 }
252 let opts = SpawnOptions {
253 name: Some(format!("{name}-a2a")),
254 hostname: args.hostname.clone(),
255 port,
256 public_url,
257 description: Some(format!("CodeTether harvester worker A2A peer for {name}")),
258 peer,
259 discovery_interval_secs: 15,
260 auto_introduce: true,
261 mdns: true,
262 };
263
264 match crate::a2a::spawn::start_a2a_in_background(opts, bus).await {
265 Ok(handle) => {
266 tracing::info!(agent = %handle.agent_name, public_url = %handle.public_url, "Worker A2A mDNS peer started");
267 Some(handle)
268 }
269 Err(error) => {
270 tracing::warn!(%error, "Worker A2A mDNS peer failed to start; continuing SSE worker path");
271 None
272 }
273 }
274}
275
276pub async fn run(args: A2aArgs) -> Result<()> {
278 let server = args.server.trim_end_matches('/');
279 let name = args
280 .name
281 .as_deref()
282 .map(ToString::to_string)
283 .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
284 let worker_id = resolve_worker_id();
285 export_worker_runtime_env(server, &args.token, &worker_id);
286 let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
287
288 let codebases: Vec<String> = args
289 .workspaces
290 .as_deref()
291 .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
292 .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
293
294 tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
295 tracing::info!("Server: {}", server);
296 tracing::info!("Workspaces: {:?}", codebases);
297 tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
298
299 let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
301
302 let client = build_worker_http_client()?;
303 let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
304 let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
305 if cognition_heartbeat.enabled {
306 tracing::info!(
307 source = %cognition_heartbeat.source_base_url,
308 include_thoughts = cognition_heartbeat.include_thought_summary,
309 max_chars = cognition_heartbeat.summary_max_chars,
310 timeout_ms = cognition_heartbeat.request_timeout_ms,
311 "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
312 );
313 } else {
314 tracing::warn!(
315 "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
316 );
317 }
318
319 let auto_approve = match args.auto_approve.as_str() {
320 "all" => AutoApprove::All,
321 "safe" => AutoApprove::Safe,
322 _ => AutoApprove::None,
323 };
324
325 let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
327
328 let bus = AgentBus::new().into_arc();
330
331 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
333
334 {
335 let handle = bus.handle(&worker_id);
336 handle.announce_ready(worker_capabilities());
337 }
338
339 let _a2a_peer = maybe_start_worker_a2a_peer(&args, &name, bus.clone()).await;
340
341 let task_progress: Arc<Mutex<task_timeline::TaskProgressState>> =
342 Arc::new(Mutex::new(task_timeline::TaskProgressState::new()));
343
344 let task_runtime = WorkerTaskRuntime {
345 client: client.clone(),
346 server: server.to_string(),
347 token: args.token.clone(),
348 worker_id: worker_id.clone(),
349 agent_name: name.clone(),
350 processing: processing.clone(),
351 max_concurrent_tasks,
352 auto_approve,
353 bus: bus.clone(),
354 task_progress: task_progress.clone(),
355 workspace_ids: Vec::new(),
356 };
357
358 {
360 let codebases = shared_codebases.lock().await.clone();
361 register_worker(
362 &client,
363 server,
364 &args.token,
365 &worker_id,
366 &name,
367 &codebases,
368 args.public_url.as_deref(),
369 )
370 .await?;
371 }
372
373 if let Err(e) =
374 sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
375 {
376 tracing::warn!(error = %e, "Initial workspace sync failed");
377 }
378
379 fetch_pending_tasks(&task_runtime).await?;
381
382 let _workspace_sync_handle = start_workspace_sync(
384 client.clone(),
385 server.to_string(),
386 args.token.clone(),
387 shared_codebases.clone(),
388 );
389
390 loop {
392 let codebases = shared_codebases.lock().await.clone();
394
395 if let Err(e) = register_worker(
397 &client,
398 server,
399 &args.token,
400 &worker_id,
401 &name,
402 &codebases,
403 args.public_url.as_deref(),
404 )
405 .await
406 {
407 tracing::warn!("Failed to re-register worker on reconnection: {}", e);
408 }
409 if let Err(e) = fetch_pending_tasks(&task_runtime).await {
410 tracing::warn!("Reconnect task fetch failed: {}", e);
411 }
412
413 let heartbeat_handle = start_heartbeat(
415 client.clone(),
416 server.to_string(),
417 args.token.clone(),
418 heartbeat_state.clone(),
419 processing.clone(),
420 cognition_heartbeat.clone(),
421 task_progress.clone(),
422 );
423
424 match connect_stream(&task_runtime, &name, &codebases, None).await {
425 Ok(StreamDisconnectReason::Ended) => {
426 tracing::warn!("Stream ended, reconnecting...");
427 }
428 Ok(StreamDisconnectReason::ReadError(error)) => {
429 tracing::warn!(error = %error, "Stream read failed, reconnecting...");
430 }
431 Err(e) => {
432 tracing::error!("Stream error: {}, reconnecting...", e);
433 }
434 }
435
436 heartbeat_handle.abort();
438 tracing::debug!("Heartbeat cancelled for reconnection");
439
440 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
441 }
442}
443
444pub async fn run_with_state(
447 args: A2aArgs,
448 server_state: crate::worker_server::WorkerServerState,
449) -> Result<()> {
450 let server = args.server.trim_end_matches('/');
451 let name = args
452 .name
453 .as_deref()
454 .map(ToString::to_string)
455 .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
456 let worker_id = resolve_worker_id();
457 export_worker_runtime_env(server, &args.token, &worker_id);
458 let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
459
460 server_state.set_worker_id(worker_id.clone()).await;
462
463 let codebases: Vec<String> = args
464 .workspaces
465 .as_deref()
466 .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
467 .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
468
469 tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
470 tracing::info!("Server: {}", server);
471 tracing::info!("Workspaces: {:?}", codebases);
472 tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
473
474 let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
476
477 let client = build_worker_http_client()?;
478 let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
479 let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
480 if cognition_heartbeat.enabled {
481 tracing::info!(
482 source = %cognition_heartbeat.source_base_url,
483 include_thoughts = cognition_heartbeat.include_thought_summary,
484 max_chars = cognition_heartbeat.summary_max_chars,
485 timeout_ms = cognition_heartbeat.request_timeout_ms,
486 "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
487 );
488 } else {
489 tracing::warn!(
490 "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
491 );
492 }
493
494 let auto_approve = match args.auto_approve.as_str() {
495 "all" => AutoApprove::All,
496 "safe" => AutoApprove::Safe,
497 _ => AutoApprove::None,
498 };
499
500 let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
502
503 server_state
505 .set_heartbeat_state(Arc::new(heartbeat_state.clone()))
506 .await;
507
508 let bus = AgentBus::new().into_arc();
510 server_state.set_bus(bus.clone()).await;
511
512 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
514
515 {
516 let handle = bus.handle(&worker_id);
517 handle.announce_ready(worker_capabilities());
518 }
519
520 let _a2a_peer = maybe_start_worker_a2a_peer(&args, &name, bus.clone()).await;
521
522 let task_progress_ws: Arc<Mutex<task_timeline::TaskProgressState>> =
523 Arc::new(Mutex::new(task_timeline::TaskProgressState::new()));
524
525 let task_runtime = WorkerTaskRuntime {
526 client: client.clone(),
527 server: server.to_string(),
528 token: args.token.clone(),
529 worker_id: worker_id.clone(),
530 agent_name: name.clone(),
531 processing: processing.clone(),
532 max_concurrent_tasks,
533 auto_approve,
534 bus: bus.clone(),
535 task_progress: task_progress_ws.clone(),
536 workspace_ids: Vec::new(),
537 };
538
539 {
541 let codebases = shared_codebases.lock().await.clone();
542 register_worker(
543 &client,
544 server,
545 &args.token,
546 &worker_id,
547 &name,
548 &codebases,
549 args.public_url.as_deref(),
550 )
551 .await?;
552 }
553
554 if let Err(e) =
555 sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
556 {
557 tracing::warn!(error = %e, "Initial workspace sync failed");
558 }
559
560 server_state.set_connected(true).await;
562
563 fetch_pending_tasks(&task_runtime).await?;
565
566 let _workspace_sync_handle = start_workspace_sync(
568 client.clone(),
569 server.to_string(),
570 args.token.clone(),
571 shared_codebases.clone(),
572 );
573
574 loop {
576 let codebases = shared_codebases.lock().await.clone();
578
579 let (task_notify_tx, task_notify_rx) = mpsc::channel::<String>(32);
582 server_state
583 .set_task_notification_channel(task_notify_tx)
584 .await;
585
586 server_state.set_connected(true).await;
588
589 if let Err(e) = register_worker(
591 &client,
592 server,
593 &args.token,
594 &worker_id,
595 &name,
596 &codebases,
597 args.public_url.as_deref(),
598 )
599 .await
600 {
601 tracing::warn!("Failed to re-register worker on reconnection: {}", e);
602 }
603 if let Err(e) = fetch_pending_tasks(&task_runtime).await {
604 tracing::warn!("Reconnect task fetch failed: {}", e);
605 }
606
607 let heartbeat_handle = start_heartbeat(
609 client.clone(),
610 server.to_string(),
611 args.token.clone(),
612 heartbeat_state.clone(),
613 processing.clone(),
614 cognition_heartbeat.clone(),
615 task_progress_ws.clone(),
616 );
617
618 match connect_stream(&task_runtime, &name, &codebases, Some(task_notify_rx)).await {
619 Ok(StreamDisconnectReason::Ended) => {
620 tracing::warn!("Stream ended, reconnecting...");
621 }
622 Ok(StreamDisconnectReason::ReadError(error)) => {
623 tracing::warn!(error = %error, "Stream read failed, reconnecting...");
624 }
625 Err(e) => {
626 tracing::error!("Stream error: {}, reconnecting...", e);
627 }
628 }
629
630 server_state.set_connected(false).await;
632
633 heartbeat_handle.abort();
635 tracing::debug!("Heartbeat cancelled for reconnection");
636
637 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
638 }
639}
640
641pub fn generate_worker_id() -> String {
642 format!(
643 "wrk_{}_{:x}",
644 chrono::Utc::now().timestamp(),
645 rand::random::<u64>()
646 )
647}
648
649fn build_worker_http_client() -> Result<Client> {
650 Client::builder()
651 .no_gzip()
652 .no_brotli()
653 .no_zstd()
654 .no_deflate()
655 .build()
656 .context("Failed to build worker HTTP client")
657}
658
659fn resolve_worker_id() -> String {
660 for key in ["CODETETHER_WORKER_ID", "A2A_WORKER_ID"] {
661 if let Ok(value) = std::env::var(key) {
662 let trimmed = value.trim();
663 if !trimmed.is_empty() {
664 return trimmed.to_string();
665 }
666 }
667 }
668 generate_worker_id()
669}
670
/// Clamps the configured task concurrency so that it is at least 1.
fn normalize_max_concurrent_tasks(max_concurrent_tasks: usize) -> usize {
    if max_concurrent_tasks == 0 {
        1
    } else {
        max_concurrent_tasks
    }
}
674
/// Reports whether `PERSISTENT_WORKER_ENABLED` holds a truthy value.
///
/// Accepted values are `1`, `true`, `yes` and `on`, case-insensitive, with
/// surrounding whitespace ignored. Anything else, or an unset variable, is `false`.
fn persistent_worker_enabled() -> bool {
    let Ok(raw) = std::env::var("PERSISTENT_WORKER_ENABLED") else {
        return false;
    };
    let normalized = raw.trim().to_ascii_lowercase();
    ["1", "true", "yes", "on"].contains(&normalized.as_str())
}
685
/// Returns the persistent-worker lease length in seconds.
///
/// Reads `PERSISTENT_WORKER_LEASE_SECONDS`. The default is 3600 when the
/// variable is unset, unparsable, or zero.
fn persistent_worker_lease_seconds() -> u64 {
    const DEFAULT_LEASE_SECONDS: u64 = 3600;
    let configured = std::env::var("PERSISTENT_WORKER_LEASE_SECONDS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok());
    match configured {
        Some(seconds) if seconds > 0 => seconds,
        _ => DEFAULT_LEASE_SECONDS,
    }
}
693
694async fn reserve_task_slot(
695 processing: &Arc<Mutex<HashSet<String>>>,
696 task_id: &str,
697 max_concurrent_tasks: usize,
698) -> TaskReservation {
699 let mut proc = processing.lock().await;
700 if proc.contains(task_id) {
701 TaskReservation::AlreadyProcessing
702 } else if proc.len() >= max_concurrent_tasks {
703 TaskReservation::AtCapacity
704 } else {
705 proc.insert(task_id.to_string());
706 TaskReservation::Reserved
707 }
708}
709
/// Publishes the worker's runtime identity to the process environment.
///
/// - Sets `CODETETHER_SERVER` and `CODETETHER_WORKER_ID`.
/// - Sets `CODETETHER_TOKEN` only when a token is supplied; otherwise any
///   existing value is left untouched.
fn export_worker_runtime_env(server: &str, token: &Option<String>, worker_id: &str) {
    let exports = [
        ("CODETETHER_SERVER", server),
        ("CODETETHER_WORKER_ID", worker_id),
    ]
    .into_iter()
    .chain(token.as_deref().map(|value| ("CODETETHER_TOKEN", value)));

    for (key, value) in exports {
        // SAFETY: `set_var` is unsafe because another thread reading or writing
        // the environment at the same time is undefined behaviour on some platforms.
        // NOTE(review): `run` and `run_with_state` call this first thing at startup,
        // but inside an async runtime; confirm that no other thread touches the
        // environment concurrently.
        unsafe {
            std::env::set_var(key, value);
        }
    }
}
722
723fn advertised_interfaces(public_url: Option<&str>) -> serde_json::Value {
724 let Some(base_url) = public_url
725 .map(str::trim)
726 .filter(|value| !value.is_empty())
727 .map(|value| value.trim_end_matches('/').to_string())
728 else {
729 return serde_json::json!({});
730 };
731
732 serde_json::json!({
733 "http": {
734 "base_url": base_url,
735 },
736 "bus": {
737 "stream_url": format!("{base_url}/v1/bus/stream"),
738 "publish_url": format!("{base_url}/v1/bus/publish"),
739 },
740 })
741}
742
/// Tool auto-approval policy for a worker.
///
/// The worker entry points map the `auto_approve` argument `"all"` to `All`,
/// `"safe"` to `Safe`, and any other string to `None`.
#[derive(Debug, Clone, Copy)]
pub enum AutoApprove {
    /// Selected by `"all"`.
    All,
    /// Selected by `"safe"`.
    Safe,
    /// Selected by any other value.
    None,
}
759
/// Default A2A server base URL (no trailing slash).
pub const DEFAULT_A2A_SERVER_URL: &str = "https://api.codetether.run";
762
/// Capabilities advertised by a long-running (non-Knative) worker.
/// `worker_capabilities` appends `"persistent"` after these.
const PERSISTENT_WORKER_CAPABILITIES: &[&str] = &[
    "forage", "ralph", "swarm", "rlm", "a2a", "mcp", "grpc", "grpc-web", "jsonrpc",
];

/// Returns the capability list this worker announces on the bus.
///
/// `KNATIVE_SERVICE` is truthy when it holds `1`, `true`, `yes` or `on`
/// (case-insensitive, surrounding whitespace ignored). The truthy spellings
/// match `persistent_worker_enabled`.
/// - Truthy: only `"knative"` is advertised.
/// - Otherwise: every entry of `PERSISTENT_WORKER_CAPABILITIES` is advertised,
///   followed by `"persistent"`.
fn worker_capabilities() -> Vec<String> {
    let is_knative = std::env::var("KNATIVE_SERVICE")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false);

    if is_knative {
        return vec!["knative".to_string()];
    }

    PERSISTENT_WORKER_CAPABILITIES
        .iter()
        .copied()
        .chain(std::iter::once("persistent"))
        .map(str::to_string)
        .collect()
}
789
790async fn sync_timeline_to_runtime(
792 timeline: &task_timeline::TaskTimeline,
793 runtime: &WorkerTaskRuntime,
794) {
795 timeline.sync_progress().await;
796 let state = timeline.progress_handle().lock().await.clone();
797 *runtime.task_progress.lock().await = state;
798}
799
800async fn resolve_and_log_workspace_ids(
802 client: &Client,
803 server: &str,
804 token: &Option<String>,
805 workspace_roots: &[String],
806) -> Vec<String> {
807 match workspace_resolve::resolve_workspace_ids(client, server, token, workspace_roots).await {
808 Ok(resolved) => {
809 if resolved.is_empty() {
810 tracing::warn!(
811 roots = ?workspace_roots,
812 "No server-side workspace IDs resolved for configured roots \
813 — the server may not route tasks to this worker. \
814 Ensure workspaces are registered with the control plane \
815 and that their paths fall under the configured roots."
816 );
817 } else {
818 let ids: Vec<String> = resolved.iter().map(|ws| ws.id.clone()).collect();
819 tracing::info!(
820 workspace_ids = ?ids,
821 count = resolved.len(),
822 "Resolved server-side workspace IDs for configured roots"
823 );
824 }
825 resolved.iter().map(|ws| ws.id.clone()).collect()
826 }
827 Err(e) => {
828 tracing::warn!(error = %e, "Failed to resolve workspace IDs from server");
829 Vec::new()
830 }
831 }
832}
833
834fn task_value<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a serde_json::Value> {
835 task.get("task")
836 .and_then(|t| t.get(key))
837 .or_else(|| task.get(key))
838}
839
840fn task_str<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a str> {
841 task_value(task, key).and_then(|v| v.as_str())
842}
843
844fn task_u64(task: &serde_json::Value, key: &str) -> Option<u64> {
845 let value = task_value(task, key)?;
846 if let Some(v) = value.as_u64() {
847 return Some(v);
848 }
849 if let Some(v) = value.as_i64()
850 && v >= 0
851 {
852 return Some(v as u64);
853 }
854 if let Some(v) = value.as_str()
855 && let Ok(parsed) = v.trim().parse::<u64>()
856 {
857 return Some(parsed);
858 }
859 None
860}
861
862fn task_metadata(task: &serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
863 task_value(task, "metadata")
864 .and_then(|m| m.as_object())
865 .cloned()
866 .unwrap_or_default()
867}
868
869fn task_timeout_secs(
870 task: &serde_json::Value,
871 metadata: &serde_json::Map<String, serde_json::Value>,
872) -> u64 {
873 task_u64(task, "task_timeout_seconds")
874 .or_else(|| task_u64(task, "timeout_secs"))
875 .or_else(|| {
876 metadata_u64(
877 metadata,
878 &["task_timeout_seconds", "timeout_secs", "timeout"],
879 )
880 })
881 .unwrap_or(1200)
882 .clamp(60, 604800)
883}
884
/// Converts a `provider:model` reference to `provider/model`.
///
/// Only the first `:` is rewritten, and only when the string contains no `/`.
/// Any other input is returned unchanged.
fn model_ref_to_provider_model(model: &str) -> String {
    let uses_colon_separator = model.contains(':') && !model.contains('/');
    if !uses_colon_separator {
        return model.to_string();
    }
    model.replacen(':', "/", 1)
}
895
/// Reports whether `agent_type` names the swarm agent.
///
/// Recognised aliases are `swarm`, `parallel` and `multi-agent`, compared
/// ASCII case-insensitively after trimming.
fn is_swarm_agent(agent_type: &str) -> bool {
    const SWARM_ALIASES: [&str; 3] = ["swarm", "parallel", "multi-agent"];
    let candidate = agent_type.trim();
    SWARM_ALIASES
        .iter()
        .any(|alias| alias.eq_ignore_ascii_case(candidate))
}
902
/// Reports whether `agent_type` is `forage` (trimmed, ASCII case-insensitive).
fn is_forage_agent(agent_type: &str) -> bool {
    "forage".eq_ignore_ascii_case(agent_type.trim())
}
906
907fn metadata_lookup<'a>(
908 metadata: &'a serde_json::Map<String, serde_json::Value>,
909 key: &str,
910) -> Option<&'a serde_json::Value> {
911 metadata
912 .get(key)
913 .or_else(|| {
914 metadata
915 .get("routing")
916 .and_then(|v| v.as_object())
917 .and_then(|obj| obj.get(key))
918 })
919 .or_else(|| {
920 metadata
921 .get("swarm")
922 .and_then(|v| v.as_object())
923 .and_then(|obj| obj.get(key))
924 })
925 .or_else(|| {
926 metadata
927 .get("forage")
928 .and_then(|v| v.as_object())
929 .and_then(|obj| obj.get(key))
930 })
931}
932
933fn metadata_str(
934 metadata: &serde_json::Map<String, serde_json::Value>,
935 keys: &[&str],
936) -> Option<String> {
937 for key in keys {
938 if let Some(value) = metadata_lookup(metadata, key).and_then(|v| v.as_str()) {
939 let trimmed = value.trim();
940 if !trimmed.is_empty() {
941 return Some(trimmed.to_string());
942 }
943 }
944 }
945 None
946}
947
948fn metadata_usize(
949 metadata: &serde_json::Map<String, serde_json::Value>,
950 keys: &[&str],
951) -> Option<usize> {
952 for key in keys {
953 if let Some(value) = metadata_lookup(metadata, key) {
954 if let Some(v) = value.as_u64() {
955 return usize::try_from(v).ok();
956 }
957 if let Some(v) = value.as_i64()
958 && v >= 0
959 {
960 return usize::try_from(v as u64).ok();
961 }
962 if let Some(v) = value.as_str()
963 && let Ok(parsed) = v.trim().parse::<usize>()
964 {
965 return Some(parsed);
966 }
967 }
968 }
969 None
970}
971
972fn metadata_u64(
973 metadata: &serde_json::Map<String, serde_json::Value>,
974 keys: &[&str],
975) -> Option<u64> {
976 for key in keys {
977 if let Some(value) = metadata_lookup(metadata, key) {
978 if let Some(v) = value.as_u64() {
979 return Some(v);
980 }
981 if let Some(v) = value.as_i64()
982 && v >= 0
983 {
984 return Some(v as u64);
985 }
986 if let Some(v) = value.as_str()
987 && let Ok(parsed) = v.trim().parse::<u64>()
988 {
989 return Some(parsed);
990 }
991 }
992 }
993 None
994}
995
996fn metadata_bool(
997 metadata: &serde_json::Map<String, serde_json::Value>,
998 keys: &[&str],
999) -> Option<bool> {
1000 for key in keys {
1001 if let Some(value) = metadata_lookup(metadata, key) {
1002 if let Some(v) = value.as_bool() {
1003 return Some(v);
1004 }
1005 if let Some(v) = value.as_str() {
1006 match v.trim().to_ascii_lowercase().as_str() {
1007 "1" | "true" | "yes" | "on" => return Some(true),
1008 "0" | "false" | "no" | "off" => return Some(false),
1009 _ => {}
1010 }
1011 }
1012 }
1013 }
1014 None
1015}
1016
1017fn metadata_f64(
1018 metadata: &serde_json::Map<String, serde_json::Value>,
1019 keys: &[&str],
1020) -> Option<f64> {
1021 for key in keys {
1022 if let Some(value) = metadata_lookup(metadata, key) {
1023 if let Some(v) = value.as_f64() {
1024 return Some(v);
1025 }
1026 if let Some(v) = value.as_i64() {
1027 return Some(v as f64);
1028 }
1029 if let Some(v) = value.as_u64() {
1030 return Some(v as f64);
1031 }
1032 if let Some(v) = value.as_str()
1033 && let Ok(parsed) = v.trim().parse::<f64>()
1034 {
1035 return Some(parsed);
1036 }
1037 }
1038 }
1039 None
1040}
1041
1042fn metadata_string_list(
1043 metadata: &serde_json::Map<String, serde_json::Value>,
1044 keys: &[&str],
1045) -> Vec<String> {
1046 for key in keys {
1047 let Some(value) = metadata_lookup(metadata, key) else {
1048 continue;
1049 };
1050
1051 if let Some(items) = value.as_array() {
1052 let parsed = items
1053 .iter()
1054 .filter_map(|item| item.as_str())
1055 .map(str::trim)
1056 .filter(|item| !item.is_empty())
1057 .map(ToString::to_string)
1058 .collect::<Vec<_>>();
1059 if !parsed.is_empty() {
1060 return parsed;
1061 }
1062 }
1063
1064 if let Some(value) = value.as_str() {
1065 let parsed = value
1066 .split(['\n', ','])
1067 .map(str::trim)
1068 .filter(|item| !item.is_empty())
1069 .map(ToString::to_string)
1070 .collect::<Vec<_>>();
1071 if !parsed.is_empty() {
1072 return parsed;
1073 }
1074 }
1075 }
1076
1077 Vec::new()
1078}
1079
/// Normalizes a forage execution-engine name to `swarm`, `go` or `run`.
///
/// The input is trimmed and ASCII-lowercased. A missing or unrecognised value
/// becomes `run`.
fn normalize_forage_execution_engine(value: Option<String>) -> String {
    let requested = value.map(|raw| raw.trim().to_ascii_lowercase());
    let engine = match requested.as_deref() {
        Some("swarm") => "swarm",
        Some("go") => "go",
        _ => "run",
    };
    engine.to_string()
}
1093
/// Normalizes a forage swarm strategy to `domain`, `data`, `stage`, `none` or `auto`.
///
/// The input is trimmed and ASCII-lowercased. A missing or unrecognised value
/// becomes `auto`.
fn normalize_forage_swarm_strategy(value: Option<String>) -> String {
    const KNOWN_STRATEGIES: [&str; 4] = ["domain", "data", "stage", "none"];
    let normalized = value
        .as_deref()
        .map(|raw| raw.trim().to_ascii_lowercase())
        .unwrap_or_default();
    if KNOWN_STRATEGIES.contains(&normalized.as_str()) {
        normalized
    } else {
        "auto".to_string()
    }
}
1109
1110fn build_forage_args(
1111 prompt: &str,
1112 title: &str,
1113 metadata: &serde_json::Map<String, serde_json::Value>,
1114 selected_model: Option<String>,
1115) -> ForageArgs {
1116 let mut moonshots = metadata_string_list(
1117 metadata,
1118 &["moonshots", "moonshot", "goals", "mission", "missions"],
1119 );
1120 if moonshots.is_empty() {
1121 let fallback = prompt.trim();
1122 if !fallback.is_empty() {
1123 moonshots.push(fallback.to_string());
1124 } else if !title.trim().is_empty() {
1125 moonshots.push(title.trim().to_string());
1126 }
1127 }
1128
1129 ForageArgs {
1130 top: metadata_usize(metadata, &["top"]).unwrap_or(3),
1131 loop_mode: metadata_bool(metadata, &["loop", "loop_mode"]).unwrap_or(false),
1132 interval_secs: metadata_u64(metadata, &["interval_secs", "interval"]).unwrap_or(120),
1133 max_cycles: metadata_usize(metadata, &["max_cycles"]).unwrap_or(0),
1134 execute: metadata_bool(metadata, &["execute"]).unwrap_or(false),
1135 no_s3: metadata_bool(metadata, &["no_s3", "local_only"]).unwrap_or(true),
1137 moonshots,
1138 moonshot_file: metadata_str(metadata, &["moonshot_file"]).map(PathBuf::from),
1139 moonshot_required: metadata_bool(metadata, &["moonshot_required"]).unwrap_or(false),
1140 moonshot_min_alignment: metadata_f64(metadata, &["moonshot_min_alignment"]).unwrap_or(0.10),
1141 execution_engine: normalize_forage_execution_engine(metadata_str(
1142 metadata,
1143 &["execution_engine", "engine"],
1144 )),
1145 run_timeout_secs: metadata_u64(metadata, &["run_timeout_secs", "timeout_secs", "timeout"])
1146 .unwrap_or(900),
1147 fail_fast: metadata_bool(metadata, &["fail_fast"]).unwrap_or(false),
1148 swarm_strategy: normalize_forage_swarm_strategy(metadata_str(
1149 metadata,
1150 &["swarm_strategy", "strategy"],
1151 )),
1152 swarm_max_subagents: metadata_usize(metadata, &["swarm_max_subagents"]).unwrap_or(8),
1153 swarm_max_steps: metadata_usize(metadata, &["swarm_max_steps"]).unwrap_or(100),
1154 swarm_subagent_timeout_secs: metadata_u64(metadata, &["swarm_subagent_timeout_secs"])
1155 .unwrap_or(300),
1156 model: selected_model,
1157 json: metadata_bool(metadata, &["json"]).unwrap_or(false),
1158 }
1159}
1160
1161fn parse_swarm_strategy(
1162 metadata: &serde_json::Map<String, serde_json::Value>,
1163) -> DecompositionStrategy {
1164 match metadata_str(
1165 metadata,
1166 &[
1167 "decomposition_strategy",
1168 "swarm_strategy",
1169 "strategy",
1170 "swarm_decomposition",
1171 ],
1172 )
1173 .as_deref()
1174 .map(|s| s.to_ascii_lowercase())
1175 .as_deref()
1176 {
1177 Some("none") | Some("single") => DecompositionStrategy::None,
1178 Some("domain") | Some("by_domain") => DecompositionStrategy::ByDomain,
1179 Some("data") | Some("by_data") => DecompositionStrategy::ByData,
1180 Some("stage") | Some("by_stage") => DecompositionStrategy::ByStage,
1181 _ => DecompositionStrategy::Automatic,
1182 }
1183}
1184
1185async fn resolve_swarm_model(
1186 explicit_model: Option<String>,
1187 model_tier: Option<&str>,
1188) -> Option<String> {
1189 if let Some(model) = explicit_model
1190 && !model.trim().is_empty()
1191 {
1192 return Some(model);
1193 }
1194
1195 let registry = ProviderRegistry::shared_from_vault().await.ok()?;
1196 let providers = registry.list();
1197 if providers.is_empty() {
1198 return None;
1199 }
1200 let provider = choose_provider_for_tier(providers.as_slice(), model_tier);
1201 let model = default_model_for_provider(provider, model_tier);
1202 Some(format!("{}/{}", provider, model))
1203}
1204
1205fn format_swarm_event_for_output(event: &SwarmEvent) -> Option<String> {
1206 match event {
1207 SwarmEvent::Started {
1208 task,
1209 total_subtasks,
1210 } => Some(format!(
1211 "[swarm] started task={} planned_subtasks={}",
1212 task, total_subtasks
1213 )),
1214 SwarmEvent::StageComplete {
1215 stage,
1216 completed,
1217 failed,
1218 } => Some(format!(
1219 "[swarm] stage={} completed={} failed={}",
1220 stage, completed, failed
1221 )),
1222 SwarmEvent::SubTaskUpdate { id, status, .. } => Some(format!(
1223 "[swarm] subtask id={} status={}",
1224 &id.chars().take(8).collect::<String>(),
1225 format!("{status:?}").to_ascii_lowercase()
1226 )),
1227 SwarmEvent::AgentToolCall {
1228 subtask_id,
1229 tool_name,
1230 } => Some(format!(
1231 "[swarm] subtask id={} tool={}",
1232 &subtask_id.chars().take(8).collect::<String>(),
1233 tool_name
1234 )),
1235 SwarmEvent::AgentError { subtask_id, error } => Some(format!(
1236 "[swarm] subtask id={} error={}",
1237 &subtask_id.chars().take(8).collect::<String>(),
1238 error
1239 )),
1240 SwarmEvent::Complete { success, stats } => Some(format!(
1241 "[swarm] complete success={} subtasks={} speedup={:.2}",
1242 success,
1243 stats.subagents_completed + stats.subagents_failed,
1244 stats.speedup_factor
1245 )),
1246 SwarmEvent::Error(err) => Some(format!("[swarm] error message={}", err)),
1247 _ => None,
1248 }
1249}
1250
1251pub async fn register_worker(
1252 client: &Client,
1253 server: &str,
1254 token: &Option<String>,
1255 worker_id: &str,
1256 name: &str,
1257 codebases: &[String],
1258 public_url: Option<&str>,
1259) -> Result<()> {
1260 let models = match load_provider_models().await {
1262 Ok(m) => m,
1263 Err(e) => {
1264 tracing::warn!(
1265 "Failed to load provider models: {}, proceeding without model info",
1266 e
1267 );
1268 HashMap::new()
1269 }
1270 };
1271
1272 let mut req = client.post(format!("{}/v1/agent/workers/register", server));
1274
1275 if let Some(t) = token {
1276 req = req.bearer_auth(t);
1277 }
1278
1279 let models_array: Vec<serde_json::Value> = models
1282 .iter()
1283 .flat_map(|(provider, model_infos)| {
1284 model_infos.iter().map(move |m| {
1285 let mut obj = serde_json::json!({
1286 "id": format!("{}/{}", provider, m.id),
1287 "name": &m.id,
1288 "provider": provider,
1289 "provider_id": provider,
1290 });
1291 if let Some(input_cost) = m.input_cost_per_million {
1292 obj["input_cost_per_million"] = serde_json::json!(input_cost);
1293 }
1294 if let Some(output_cost) = m.output_cost_per_million {
1295 obj["output_cost_per_million"] = serde_json::json!(output_cost);
1296 }
1297 obj
1298 })
1299 })
1300 .collect();
1301
1302 tracing::info!(
1303 "Registering worker with {} models from {} providers",
1304 models_array.len(),
1305 models.len()
1306 );
1307
1308 let hostname = std::env::var("HOSTNAME")
1309 .or_else(|_| std::env::var("COMPUTERNAME"))
1310 .unwrap_or_else(|_| "unknown".to_string());
1311 let k8s_node_name = std::env::var("K8S_NODE_NAME")
1312 .ok()
1313 .map(|value| value.trim().to_string())
1314 .filter(|value| !value.is_empty());
1315
1316 let registry = crate::agent::AgentRegistry::with_builtins();
1318 let agent_defs: Vec<serde_json::Value> = registry
1319 .list()
1320 .iter()
1321 .map(|info| {
1322 serde_json::json!({
1323 "name": info.name,
1324 "description": info.description,
1325 "mode": format!("{:?}", info.mode).to_lowercase(),
1326 "native": info.native,
1327 "hidden": info.hidden,
1328 "model": info.model,
1329 "temperature": info.temperature,
1330 "top_p": info.top_p,
1331 "max_steps": info.max_steps,
1332 })
1333 })
1334 .collect();
1335
1336 let workspace_ids = resolve_and_log_workspace_ids(client, server, token, codebases).await;
1338
1339 let res = req
1340 .json(&serde_json::json!({
1341 "worker_id": worker_id,
1342 "name": name,
1343 "capabilities": worker_capabilities(),
1344 "hostname": hostname,
1345 "k8s_node_name": k8s_node_name,
1346 "models": models_array,
1347 "workspaces": codebases,
1348 "workspace_ids": workspace_ids,
1349 "interfaces": advertised_interfaces(public_url),
1350 "agents": agent_defs,
1351 }))
1352 .send()
1353 .await?;
1354
1355 if res.status().is_success() {
1356 tracing::info!("Worker registered successfully");
1357 } else {
1358 tracing::warn!("Failed to register worker: {}", res.status());
1359 }
1360
1361 Ok(())
1362}
1363
1364async fn load_provider_models() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>> {
1373 use tokio::sync::OnceCell;
1374 static CACHE: OnceCell<HashMap<String, Vec<crate::provider::ModelInfo>>> =
1375 OnceCell::const_new();
1376 CACHE
1377 .get_or_try_init(|| async { load_provider_models_uncached().await })
1378 .await
1379 .cloned()
1380}
1381
1382async fn load_provider_models_uncached() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>>
1383{
1384 let registry = match ProviderRegistry::shared_from_vault().await {
1386 Ok(r) if !r.list().is_empty() => {
1387 tracing::info!("Loaded {} providers from Vault", r.list().len());
1388 (*r).clone()
1389 }
1390 Ok(_) => {
1391 tracing::warn!("Vault returned 0 providers, falling back to config/env vars");
1392 fallback_registry().await?
1393 }
1394 Err(e) => {
1395 tracing::warn!("Vault unreachable ({}), falling back to config/env vars", e);
1396 fallback_registry().await?
1397 }
1398 };
1399
1400 let mut models_by_provider: HashMap<String, Vec<crate::provider::ModelInfo>> = HashMap::new();
1401
1402 for provider_name in registry.list() {
1403 if let Some(provider) = registry.get(provider_name) {
1404 match provider.list_models().await {
1405 Ok(models) => {
1406 if !models.is_empty() {
1407 tracing::debug!("Provider {}: {} models", provider_name, models.len());
1408 models_by_provider.insert(provider_name.to_string(), models);
1409 }
1410 }
1411 Err(e) => {
1412 tracing::debug!("Failed to list models for {}: {}", provider_name, e);
1413 }
1414 }
1415 }
1416 }
1417
1418 Ok(models_by_provider)
1419}
1420
1421async fn fallback_registry() -> Result<ProviderRegistry> {
1423 let config = crate::config::Config::load().await.unwrap_or_default();
1424 ProviderRegistry::from_config(&config).await
1425}
1426
1427async fn fetch_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1428 tracing::info!("Checking for pending tasks...");
1429
1430 let mut url = format!("{}/v1/agent/tasks?status=pending", runtime.server);
1431 if !runtime.agent_name.is_empty() {
1432 url = format!(
1433 "{}&agent_name={}",
1434 url,
1435 urlencoding::encode(&runtime.agent_name)
1436 );
1437 }
1438 let mut req = runtime.client.get(&url);
1439 if let Some(t) = &runtime.token {
1440 req = req.bearer_auth(t);
1441 }
1442
1443 let res = req.send().await?;
1444 if !res.status().is_success() {
1445 return Ok(());
1446 }
1447
1448 let data: serde_json::Value = res.json().await?;
1449 let tasks = if let Some(arr) = data.as_array() {
1451 arr.clone()
1452 } else {
1453 data["tasks"].as_array().cloned().unwrap_or_default()
1454 };
1455
1456 tracing::info!("Found {} pending task(s)", tasks.len());
1457
1458 for task in tasks {
1459 if let Some(id) = task["id"].as_str() {
1460 if let Err(reason) = check_task_scope(
1462 &task,
1463 &runtime.worker_id,
1464 &runtime.agent_name,
1465 &runtime.workspace_ids,
1466 ) {
1467 tracing::debug!(
1468 task_id = id,
1469 reason = %reason,
1470 "Pending task skipped — out of scope"
1471 );
1472 continue;
1473 }
1474
1475 match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1476 TaskReservation::Reserved => {
1477 let task_id = id.to_string();
1478 let runtime = runtime.clone();
1479
1480 tokio::spawn(async move {
1481 if let Err(e) = handle_task(&runtime, &task).await {
1482 tracing::error!("Task {} failed: {}", task_id, e);
1483 }
1484 runtime.processing.lock().await.remove(&task_id);
1485 });
1486 }
1487 TaskReservation::AlreadyProcessing => {}
1488 TaskReservation::AtCapacity => {
1489 tracing::debug!(
1490 max_concurrent_tasks = runtime.max_concurrent_tasks,
1491 "Worker is at task capacity; leaving remaining tasks pending"
1492 );
1493 break;
1494 }
1495 }
1496 }
1497 }
1498
1499 Ok(())
1500}
1501
/// Why the SSE task stream stopped delivering events.
#[derive(PartialEq, Eq, Debug)]
enum StreamDisconnectReason {
    /// The byte stream finished (yielded `None`).
    Ended,
    /// Reading the next chunk failed; carries the error's message.
    ReadError(String),
}
1507
1508async fn connect_stream(
1509 runtime: &WorkerTaskRuntime,
1510 name: &str,
1511 codebases: &[String],
1512 task_notify_rx: Option<mpsc::Receiver<String>>,
1513) -> Result<StreamDisconnectReason> {
1514 let url = format!(
1515 "{}/v1/worker/tasks/stream?agent_name={}&worker_id={}",
1516 runtime.server,
1517 urlencoding::encode(name),
1518 urlencoding::encode(&runtime.worker_id)
1519 );
1520
1521 let mut req = runtime
1522 .client
1523 .get(&url)
1524 .header("Accept", "text/event-stream")
1525 .header("Accept-Encoding", "identity")
1526 .header("Cache-Control", "no-cache, no-transform")
1527 .header("X-Worker-ID", &runtime.worker_id)
1528 .header("X-Agent-Name", name)
1529 .header("X-Codebases", codebases.join(","))
1530 .header("X-Workspaces", codebases.join(","));
1531
1532 if let Some(t) = &runtime.token {
1533 req = req.bearer_auth(t);
1534 }
1535
1536 let res = req.send().await?;
1537 if !res.status().is_success() {
1538 anyhow::bail!("Failed to connect: {}", res.status());
1539 }
1540
1541 tracing::info!("Connected to A2A server");
1542
1543 let mut stream = res.bytes_stream();
1544 let mut buffer = String::new();
1545 let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
1546 poll_interval.tick().await; let mut task_notify_rx = task_notify_rx;
1550
1551 loop {
1552 tokio::select! {
1553 task_id = async {
1556 if let Some(ref mut rx) = task_notify_rx {
1557 rx.recv().await
1558 } else {
1559 futures::future::pending().await
1561 }
1562 } => {
1563 if let Some(task_id) = task_id {
1564 tracing::info!("Received task notification via CloudEvent: {}", task_id);
1565 if let Err(e) = poll_pending_tasks(runtime).await {
1567 tracing::warn!("Task notification poll failed: {}", e);
1568 }
1569 }
1570 }
1571 chunk = stream.next() => {
1572 match chunk {
1573 Some(Ok(chunk)) => {
1574 buffer.push_str(&String::from_utf8_lossy(&chunk));
1575
1576 while let Some(pos) = buffer.find("\n\n") {
1578 let event_str = buffer[..pos].to_string();
1579 buffer = buffer[pos + 2..].to_string();
1580
1581 if let Some(data_line) = event_str.lines().find(|l| l.starts_with("data:")) {
1582 let data = data_line.trim_start_matches("data:").trim();
1583 if data == "[DONE]" || data.is_empty() {
1584 continue;
1585 }
1586
1587 if let Ok(task) = serde_json::from_str::<serde_json::Value>(data) {
1588 spawn_task_handler(&task, runtime).await;
1589 }
1590 }
1591 }
1592 }
1593 Some(Err(e)) => {
1594 return Ok(StreamDisconnectReason::ReadError(e.to_string()));
1595 }
1596 None => {
1597 return Ok(StreamDisconnectReason::Ended);
1599 }
1600 }
1601 }
1602 _ = poll_interval.tick() => {
1603 if let Err(e) = poll_pending_tasks(runtime).await {
1605 tracing::warn!("Periodic task poll failed: {}", e);
1606 }
1607 }
1608 }
1609 }
1610}
1611
1612async fn spawn_task_handler(task: &serde_json::Value, runtime: &WorkerTaskRuntime) {
1613 if let Some(id) = task
1614 .get("task")
1615 .and_then(|t| t["id"].as_str())
1616 .or_else(|| task["id"].as_str())
1617 {
1618 if let Err(reason) = check_task_scope(
1620 task,
1621 &runtime.worker_id,
1622 &runtime.agent_name,
1623 &runtime.workspace_ids,
1624 ) {
1625 tracing::debug!(
1626 task_id = id,
1627 reason = %reason,
1628 "SSE task skipped — out of scope"
1629 );
1630 return;
1631 }
1632
1633 match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1634 TaskReservation::Reserved => {
1635 let task_id = id.to_string();
1636 let task = task.clone();
1637 let runtime = runtime.clone();
1638
1639 tokio::spawn(async move {
1640 if let Err(e) = handle_task(&runtime, &task).await {
1641 tracing::error!("Task {} failed: {}", task_id, e);
1642 }
1643 runtime.processing.lock().await.remove(&task_id);
1644 });
1645 }
1646 TaskReservation::AlreadyProcessing => {}
1647 TaskReservation::AtCapacity => {
1648 tracing::debug!(
1649 task_id = id,
1650 max_concurrent_tasks = runtime.max_concurrent_tasks,
1651 "Worker is at task capacity; task will stay pending until a slot frees up"
1652 );
1653 }
1654 }
1655 }
1656}
1657
1658async fn poll_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1659 let mut url = format!("{}/v1/agent/tasks?status=pending", runtime.server);
1660 if !runtime.agent_name.is_empty() {
1661 url = format!(
1662 "{}&agent_name={}",
1663 url,
1664 urlencoding::encode(&runtime.agent_name)
1665 );
1666 }
1667 let mut req = runtime.client.get(&url);
1668 if let Some(t) = &runtime.token {
1669 req = req.bearer_auth(t);
1670 }
1671
1672 let res = req.send().await?;
1673 if !res.status().is_success() {
1674 return Ok(());
1675 }
1676
1677 let data: serde_json::Value = res.json().await?;
1678 let tasks = if let Some(arr) = data.as_array() {
1679 arr.clone()
1680 } else {
1681 data["tasks"].as_array().cloned().unwrap_or_default()
1682 };
1683
1684 if !tasks.is_empty() {
1685 tracing::debug!("Poll found {} pending task(s)", tasks.len());
1686 }
1687
1688 for task in &tasks {
1689 let task_id_str = task_str(task, "id").unwrap_or("?");
1691 if let Err(reason) = check_task_scope(
1692 task,
1693 &runtime.worker_id,
1694 &runtime.agent_name,
1695 &runtime.workspace_ids,
1696 ) {
1697 tracing::debug!(
1698 task_id = task_id_str,
1699 reason = %reason,
1700 "Poll task skipped — out of scope"
1701 );
1702 continue;
1703 }
1704 spawn_task_handler(task, runtime).await;
1705 }
1706
1707 Ok(())
1708}
1709
1710async fn handle_task(runtime: &WorkerTaskRuntime, task: &serde_json::Value) -> Result<()> {
1711 let task_id = task_str(task, "id").ok_or_else(|| anyhow::anyhow!("No task ID"))?;
1712 let title = task_str(task, "title").unwrap_or("Untitled");
1713
1714 let metadata = task_metadata(task);
1716 let timeout_secs = task_timeout_secs(task, &metadata);
1717
1718 let mut timeline = task_timeline::TaskTimeline::new(task_id, timeout_secs);
1719 timeline.checkpoint(task_timeline::TaskCheckpoint::TaskReceived);
1720 tracing::info!(
1721 "Handling task: {} ({}) [timeout={}s]",
1722 title,
1723 task_id,
1724 timeout_secs
1725 );
1726
1727 sync_timeline_to_runtime(&timeline, runtime).await;
1730
1731 timeline.checkpoint(task_timeline::TaskCheckpoint::ClaimRequested);
1733 let mut req = runtime
1734 .client
1735 .post(format!("{}/v1/worker/tasks/claim", runtime.server))
1736 .header("X-Worker-ID", &runtime.worker_id);
1737 if let Some(t) = &runtime.token {
1738 req = req.bearer_auth(t);
1739 }
1740
1741 let res = req
1742 .json(&serde_json::json!({ "task_id": task_id }))
1743 .send()
1744 .await?;
1745
1746 if !res.status().is_success() {
1747 let status = res.status();
1748 let text = res.text().await?;
1749 if status == reqwest::StatusCode::CONFLICT {
1750 tracing::debug!(task_id, "Task already claimed by another worker, skipping");
1751 } else {
1752 tracing::warn!(task_id, %status, "Failed to claim task: {}", text);
1753 }
1754 return Ok(());
1755 }
1756
1757 let claim = res.json::<TaskClaimResponse>().await?;
1758 if let Some(claim_timeout_secs) = claim
1759 .task_timeout_seconds
1760 .map(|timeout_secs| timeout_secs.clamp(60, 604800))
1761 {
1762 timeline.update_timeout_secs(claim_timeout_secs);
1763 }
1764 let provider_keys = claim.provider_keys.clone();
1765 let claim_provenance = claim.into_provenance();
1766 timeline.checkpoint_with_detail(
1767 task_timeline::TaskCheckpoint::Claimed,
1768 Some(format!(
1769 "run_id={:?} attempt_id={:?}",
1770 claim_provenance.run_id, claim_provenance.attempt_id
1771 )),
1772 );
1773 tracing::info!(
1774 task_id,
1775 run_id = ?claim_provenance.run_id,
1776 attempt_id = ?claim_provenance.attempt_id,
1777 "Claimed task"
1778 );
1779
1780 let inner_result: Result<(&str, Option<String>, Option<String>, Option<String>)> =
1782 execute_claimed_task(
1783 runtime,
1784 task,
1785 task_id,
1786 title,
1787 &claim_provenance,
1788 provider_keys,
1789 &mut timeline,
1790 )
1791 .await;
1792
1793 let (status, result, error, session_id) = match inner_result {
1794 Ok(tuple) => tuple,
1795 Err(e) => {
1796 tracing::error!(
1797 task_id,
1798 error = %e,
1799 "Task failed after claim (releasing as failed)"
1800 );
1801 timeline.checkpoint_with_detail(
1802 task_timeline::TaskCheckpoint::Failed,
1803 Some(format!("{}", e)),
1804 );
1805 (
1806 "failed",
1807 None,
1808 Some(format!("Worker error after claim: {}", e)),
1809 None,
1810 )
1811 }
1812 };
1813
1814 timeline.emit_diagnostics();
1816 let diagnostics_json = timeline.diagnostics_json();
1817
1818 timeline.checkpoint(task_timeline::TaskCheckpoint::Releasing);
1819 release_task_result(
1820 &runtime.client,
1821 &runtime.server,
1822 &runtime.token,
1823 &runtime.worker_id,
1824 task_id,
1825 status,
1826 result,
1827 error,
1828 session_id,
1829 Some(diagnostics_json),
1830 )
1831 .await?;
1832
1833 timeline.checkpoint(task_timeline::TaskCheckpoint::Released);
1834
1835 runtime.task_progress.lock().await.clear();
1837
1838 tracing::info!("Task released: {} with status: {}", task_id, status);
1839 Ok(())
1840}
1841
/// Runs a task this worker has already claimed and returns its outcome.
///
/// The returned tuple is `(status, result, error, session_id)`, where `status`
/// is `"completed"` or `"failed"`. Dispatch is by agent type:
/// - `clone_repo` (case-insensitive) is delegated to `handle_clone_repo_task`.
/// - Forage agents (per `is_forage_agent`) are delegated to `handle_forage_task`.
/// - Everything else runs in a `Session`. The session is loaded from
///   `resume_session_id` metadata when that succeeds, otherwise a new one is
///   created. It runs either through `execute_swarm_with_policy` (per
///   `is_swarm_agent`) or `execute_session_with_policy`.
///
/// A task is "virtual" when its `workspace_id` is present and is `"global"` or
/// empty. For non-virtual tasks:
/// - the workspace directory is resolved;
/// - Git auth is configured (if the directory has `.git`);
/// - a commit-msg hook is installed;
/// - after a `"completed"` run, changes are committed and pushed. A
///   commit/push error turns the status into `"failed"`.
///
/// Agent execution failures are reported through the tuple, not as `Err`.
/// `Err` is returned for failures such as session creation or workspace
/// resolution (the `?` sites below).
#[allow(clippy::too_many_lines)]
async fn execute_claimed_task<'a>(
    runtime: &WorkerTaskRuntime,
    task: &'a serde_json::Value,
    task_id: &'a str,
    title: &'a str,
    claim_provenance: &ClaimProvenance,
    provider_keys: Option<serde_json::Value>,
    timeline: &mut task_timeline::TaskTimeline,
) -> Result<(&'static str, Option<String>, Option<String>, Option<String>)> {
    let metadata = task_metadata(task);
    timeline.checkpoint(task_timeline::TaskCheckpoint::MetadataParsed);
    let resume_session_id = metadata
        .get("resume_session_id")
        .and_then(|v| v.as_str())
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty());
    let complexity_hint = metadata_str(&metadata, &["complexity"]);
    // An explicit `model_tier`/`tier` wins; otherwise the tier is derived from
    // the complexity hint: quick -> fast, deep -> heavy, anything else -> balanced.
    let model_tier = metadata_str(&metadata, &["model_tier", "tier"])
        .map(|s| s.to_ascii_lowercase())
        .or_else(|| {
            complexity_hint.as_ref().map(|complexity| {
                match complexity.to_ascii_lowercase().as_str() {
                    "quick" => "fast".to_string(),
                    "deep" => "heavy".to_string(),
                    _ => "balanced".to_string(),
                }
            })
        });
    let worker_personality = metadata_str(
        &metadata,
        &["worker_personality", "personality", "agent_personality"],
    );
    let target_agent_name = metadata_str(&metadata, &["target_agent_name", "agent_name"]);
    // Model lookup order: task.model_ref, metadata.model_ref, task.model, metadata.model.
    let raw_model = task_str(task, "model_ref")
        .or_else(|| metadata_lookup(&metadata, "model_ref").and_then(|v| v.as_str()))
        .or_else(|| task_str(task, "model"))
        .or_else(|| metadata_lookup(&metadata, "model").and_then(|v| v.as_str()));
    let selected_model = raw_model.map(model_ref_to_provider_model);
    let raw_agent = task_str(task, "agent_type")
        .or_else(|| task_str(task, "agent"))
        .unwrap_or("build");
    let prompt = task_str(task, "prompt")
        .or_else(|| task_str(task, "description"))
        .unwrap_or(title);

    let workspace_id = task_str(task, "workspace_id")
        .or_else(|| metadata_lookup(&metadata, "workspace_id").and_then(|v| v.as_str()));
    // A missing workspace_id is NOT virtual; only "global" or "" are.
    let is_virtual_task = workspace_id.map_or(false, |ws| ws == "global" || ws.is_empty());

    // Special-purpose agents return before any session is created.
    if raw_agent.eq_ignore_ascii_case("clone_repo") {
        return match handle_clone_repo_task(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            &runtime.worker_id,
            task,
            &metadata,
        )
        .await
        {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Clone task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    if is_forage_agent(raw_agent) {
        return match handle_forage_task(title, prompt, &metadata, selected_model.clone()).await {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Forage task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    // Resume the requested session when it loads; otherwise start fresh.
    let mut session = if let Some(ref sid) = resume_session_id {
        match Session::load(sid).await {
            Ok(existing) => {
                tracing::info!("Resuming session {} for task {}", sid, task_id);
                existing
            }
            Err(e) => {
                tracing::warn!(
                    "Could not load session {} for task {} ({}), starting a new session",
                    sid,
                    task_id,
                    e
                );
                Session::new().await?
            }
        }
    } else {
        Session::new().await?
    };
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::SessionReady,
        Some(format!("session_id={}", session.id)),
    );

    if !is_virtual_task {
        if let Some(workspace_path) = resolve_task_workspace_dir(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            workspace_id,
        )
        .await?
        {
            session.metadata.directory = Some(workspace_path);
        }
        timeline.checkpoint_with_detail(
            task_timeline::TaskCheckpoint::WorkspaceReady,
            session
                .metadata
                .directory
                .as_deref()
                .map(|d| d.display().to_string()),
        );
    }

    if is_virtual_task {
        // Virtual tasks run without a working directory, even if a resumed
        // session carried one.
        tracing::info!(
            task_id,
            workspace_id = workspace_id.unwrap_or("(none)"),
            "Virtual task detected — skipping workspace setup"
        );
        session.metadata.directory = None;
    }

    // Swarm agents keep their name; otherwise only "build" and "plan" are
    // accepted, and any other agent falls back to "build".
    let agent_type = if is_swarm_agent(raw_agent) {
        raw_agent
    } else {
        match raw_agent {
            "build" | "plan" => raw_agent,
            other => {
                tracing::info!(
                    "Agent \"{}\" is not a primary agent, falling back to \"build\"",
                    other
                );
                "build"
            }
        }
    };
    session.set_agent_name(agent_type.to_string());
    session.attach_claim_provenance(claim_provenance);
    session.metadata.provider_keys = provider_keys;

    if !is_virtual_task {
        // Git setup failures are logged but do not fail the task.
        if let (Some(directory), Some(workspace_id)) =
            (session.metadata.directory.as_deref(), workspace_id)
            && directory.join(".git").exists()
            && let Err(err) = configure_repo_git_auth(directory, workspace_id)
        {
            tracing::warn!(task_id, error = %err, "Failed to configure Git credential helper");
        }
        if let Some(directory) = session.metadata.directory.as_deref()
            && let Err(err) = install_commit_msg_hook(directory)
        {
            tracing::warn!(task_id, error = %err, "Failed to install commit-msg hook");
        }
        timeline.checkpoint(task_timeline::TaskCheckpoint::GitHookInstalled);
    }

    // An explicitly selected model overrides whatever the session already had.
    if let Some(model) = selected_model.clone() {
        session.metadata.model = Some(model);
    }
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::ModelSelected,
        session.metadata.model.clone(),
    );

    tracing::info!(task_id, agent_type, "Executing prompt: {}", prompt);
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::AgentStarting,
        Some(format!(
            "agent={} model={:?}",
            agent_type, session.metadata.model
        )),
    );
    sync_timeline_to_runtime(&timeline, runtime).await;

    let stream_client = runtime.client.clone();
    let stream_server = runtime.server.clone();
    let stream_token = runtime.token.clone();
    let stream_worker_id = runtime.worker_id.clone();
    let stream_task_id = task_id.to_string();
    let stream_bus = Arc::clone(&runtime.bus);

    // Called with each chunk of agent output. It publishes a `TaskUpdate` on
    // the local bus (topic `task.<id>`) and POSTs the chunk to
    // `/v1/agent/tasks/<id>/output` from a spawned task. The HTTP result is
    // discarded (`let _ =`), so remote output streaming is best-effort.
    let output_callback: Arc<dyn Fn(String) + Send + Sync + 'static> =
        Arc::new(move |output: String| {
            let c = stream_client.clone();
            let s = stream_server.clone();
            let t = stream_token.clone();
            let w = stream_worker_id.clone();
            let tid = stream_task_id.clone();

            let bus_handle = stream_bus.handle("task-output");
            bus_handle.send(
                format!("task.{}", tid),
                crate::bus::BusMessage::TaskUpdate {
                    task_id: tid.clone(),
                    state: crate::a2a::types::TaskState::Working,
                    message: Some(output.clone()),
                },
            );

            tokio::spawn(async move {
                let mut req = c
                    .post(format!("{}/v1/agent/tasks/{}/output", s, tid))
                    .header("X-Worker-ID", &w);
                if let Some(tok) = &t {
                    req = req.bearer_auth(tok);
                }
                let _ = req
                    .json(&serde_json::json!({
                        "worker_id": w,
                        "output": output,
                    }))
                    .send()
                    .await;
            });
        });

    // Execution errors become a "failed" tuple here rather than an Err.
    let (status, result, error, session_id) = if is_swarm_agent(agent_type) {
        match execute_swarm_with_policy(
            &mut session,
            prompt,
            model_tier.as_deref(),
            selected_model,
            &metadata,
            complexity_hint.as_deref(),
            worker_personality.as_deref(),
            target_agent_name.as_deref(),
            Some(&runtime.bus),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok((session_result, true)) => {
                tracing::info!("Swarm task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            Ok((session_result, false)) => {
                tracing::warn!("Swarm task completed with failures: {}", task_id);
                (
                    "failed",
                    Some(session_result.text),
                    Some("Swarm execution completed with failures".to_string()),
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!("Swarm task failed: {} - {}", task_id, e);
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    } else {
        match execute_session_with_policy(
            &mut session,
            prompt,
            runtime.auto_approve,
            model_tier.as_deref(),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok(session_result) => {
                tracing::info!("Task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!(task_id, error = %e, "Task execution failed");
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    };

    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::AgentDone,
        Some(format!("status={}", status)),
    );
    sync_timeline_to_runtime(&timeline, runtime).await;

    // NOTE(review): despite the log text, this block only logs and records a
    // `GracefulShutdown` checkpoint; it does not itself stop or skip any of the
    // steps below.
    if timeline.is_near_deadline() {
        tracing::warn!(
            task_id,
            elapsed_secs = format!("{:.1}", timeline.elapsed_secs()),
            budget_pct = format!("{:.1}%", timeline.budget_pct_used()),
            "Near deadline after agent completion — will attempt graceful shutdown"
        );
        timeline.checkpoint(task_timeline::TaskCheckpoint::GracefulShutdown);
    }

    let mut status = status;
    let mut result = result;
    let mut error = error;

    // Only successful, non-virtual runs with a working directory get their
    // changes committed and pushed. The commit summary is appended to the result.
    if status == "completed"
        && !is_virtual_task
        && let Some(directory) = session.metadata.directory.as_deref()
    {
        match commit_and_push_task_changes(
            directory,
            task_id,
            session.metadata.provenance.as_ref(),
            timeline,
        )
        .await
        {
            Ok(Some(summary)) => {
                result = Some(match result {
                    Some(existing) if !existing.trim().is_empty() => {
                        format!("{existing}\n\n{summary}")
                    }
                    _ => summary,
                });
            }
            Ok(None) => {}
            Err(err) => {
                tracing::error!(task_id, error = %err, "Failed to commit and push task changes");
                status = "failed";
                error = Some(format!("Post-agent git commit/push failed: {err}"));
            }
        }
    }

    timeline.sync_progress().await;

    // Fall back to the local session id when the executor did not report one.
    Ok((
        status,
        result,
        error,
        Some(session_id.unwrap_or_else(|| session.id.clone())),
    ))
}
2205
2206async fn commit_and_push_task_changes(
2207 repo_path: &Path,
2208 task_id: &str,
2209 provenance: Option<&crate::provenance::ExecutionProvenance>,
2210 timeline: &mut task_timeline::TaskTimeline,
2211) -> Result<Option<String>> {
2212 if !repo_path.join(".git").exists() {
2213 tracing::debug!(repo_path = %repo_path.display(), "Skipping post-agent commit: not a git repo");
2214 return Ok(None);
2215 }
2216
2217 let status = run_git_command_at(
2218 Some(repo_path),
2219 vec!["status".to_string(), "--porcelain".to_string()],
2220 )
2221 .await?;
2222 if status.trim().is_empty() {
2223 tracing::info!(task_id, repo_path = %repo_path.display(), "No changes to commit after task");
2224 return Ok(None);
2225 }
2226
2227 timeline.checkpoint(task_timeline::TaskCheckpoint::CommitStaging);
2228 run_git_command_at(
2229 Some(repo_path),
2230 vec!["add".to_string(), "--all".to_string()],
2231 )
2232 .await?;
2233
2234 let staged_status = run_git_command_at(
2235 Some(repo_path),
2236 vec!["status".to_string(), "--porcelain".to_string()],
2237 )
2238 .await?;
2239 if staged_status.trim().is_empty() {
2240 return Ok(None);
2241 }
2242
2243 let commit_message = format!("CodeTether task {task_id}");
2244 let commit_output = git_commit_with_provenance(repo_path, &commit_message, provenance).await?;
2245 if !commit_output.status.success() {
2246 anyhow::bail!(
2247 "git commit failed: {}",
2248 String::from_utf8_lossy(&commit_output.stderr).trim()
2249 );
2250 }
2251 timeline.checkpoint(task_timeline::TaskCheckpoint::CommitCreated);
2252
2253 let commit_sha = run_git_command_at(
2254 Some(repo_path),
2255 vec!["rev-parse".to_string(), "HEAD".to_string()],
2256 )
2257 .await
2258 .unwrap_or_default();
2259 let branch = run_git_command_at(
2260 Some(repo_path),
2261 vec![
2262 "rev-parse".to_string(),
2263 "--abbrev-ref".to_string(),
2264 "HEAD".to_string(),
2265 ],
2266 )
2267 .await
2268 .unwrap_or_else(|_| "HEAD".to_string());
2269
2270 timeline.checkpoint(task_timeline::TaskCheckpoint::CommitPushing);
2271 run_git_command_at(
2272 Some(repo_path),
2273 vec![
2274 "push".to_string(),
2275 "origin".to_string(),
2276 format!("HEAD:{branch}"),
2277 ],
2278 )
2279 .await?;
2280 timeline.checkpoint(task_timeline::TaskCheckpoint::CommitPushed);
2281
2282 Ok(Some(format!(
2283 "Committed and pushed task changes: {} on {}",
2284 commit_sha.trim(),
2285 branch.trim()
2286 )))
2287}
2288
2289async fn handle_forage_task(
2290 title: &str,
2291 prompt: &str,
2292 metadata: &serde_json::Map<String, serde_json::Value>,
2293 selected_model: Option<String>,
2294) -> Result<String> {
2295 let forage_args = build_forage_args(prompt, title, metadata, selected_model);
2296 let summary = crate::forage::execute_with_summary(forage_args).await?;
2297 Ok(summary.render_text())
2298}
2299
2300async fn handle_clone_repo_task(
2301 client: &Client,
2302 server: &str,
2303 token: &Option<String>,
2304 worker_id: &str,
2305 task: &serde_json::Value,
2306 metadata: &serde_json::Map<String, serde_json::Value>,
2307) -> Result<String> {
2308 let workspace_id = metadata_str(metadata, &["workspace_id"])
2309 .or_else(|| task_str(task, "workspace_id").map(|value| value.to_string()))
2310 .ok_or_else(|| anyhow::anyhow!("Clone task is missing workspace_id metadata"))?;
2311 let workspace = fetch_workspace_record(client, server, token, &workspace_id).await?;
2312 let git_url = metadata_str(metadata, &["git_url"])
2313 .or_else(|| workspace.git_url.clone())
2314 .ok_or_else(|| anyhow::anyhow!("Workspace {} is missing git_url", workspace_id))?;
2315 let branch = metadata_str(metadata, &["git_branch"])
2316 .or_else(|| workspace.git_branch.clone())
2317 .unwrap_or_else(|| "main".to_string());
2318 let repo_path = resolve_workspace_clone_path(&workspace_id, &workspace.path);
2319
2320 if let Some(parent) = repo_path.parent() {
2321 tokio::fs::create_dir_all(parent)
2322 .await
2323 .with_context(|| format!("Failed to create repo parent {}", parent.display()))?;
2324 }
2325
2326 let temp_helper_path =
2327 git_clone_base_dir().join(format!(".{}-git-credential-helper", workspace_id));
2328 write_git_credential_helper_script(&temp_helper_path, &workspace_id)?;
2329 let clone_lock_path = workspace_clone_lock_path(&workspace_id);
2330 let clone_lock = acquire_workspace_clone_lock(&clone_lock_path).await?;
2331
2332 let clone_result = async {
2333 let git_dir = repo_path.join(".git");
2334 if git_dir.exists() {
2335 configure_repo_git_auth(&repo_path, &workspace_id)?;
2336 configure_repo_git_github_app_from_agent_config(
2337 &repo_path,
2338 Some(&serde_json::Value::Object(workspace.agent_config.clone())),
2339 );
2340 refresh_existing_clone(&repo_path, &branch).await?;
2341 } else {
2342 prepare_clone_target(&repo_path).await?;
2343
2344 run_git_command_at(
2345 None,
2346 vec![
2347 "-c".to_string(),
2348 format!("credential.helper={}", temp_helper_path.display()),
2349 "-c".to_string(),
2350 "credential.useHttpPath=true".to_string(),
2351 "clone".to_string(),
2352 "--single-branch".to_string(),
2353 "--branch".to_string(),
2354 branch.clone(),
2355 git_url.clone(),
2356 repo_path.display().to_string(),
2357 ],
2358 )
2359 .await?;
2360 configure_repo_git_auth(&repo_path, &workspace_id)?;
2361 configure_repo_git_github_app_from_agent_config(
2362 &repo_path,
2363 Some(&serde_json::Value::Object(workspace.agent_config.clone())),
2364 );
2365 }
2366 install_commit_msg_hook(&repo_path)?;
2367
2368 register_cloned_workspace(client, server, token, worker_id, &workspace, &repo_path).await?;
2369 enqueue_post_clone_task(client, server, token, worker_id, &workspace_id, metadata).await?;
2370
2371 Ok::<String, anyhow::Error>(format!(
2372 "Repository ready at {} (branch: {})",
2373 repo_path.display(),
2374 branch
2375 ))
2376 }
2377 .await;
2378
2379 let _ = tokio::fs::remove_file(&temp_helper_path).await;
2380 release_workspace_clone_lock(clone_lock).await;
2381 clone_result
2382}
2383
2384fn workspace_clone_lock_path(workspace_id: &str) -> PathBuf {
2385 let safe_workspace_id: String = workspace_id
2386 .chars()
2387 .map(|ch| {
2388 if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
2389 ch
2390 } else {
2391 '_'
2392 }
2393 })
2394 .collect();
2395 git_clone_base_dir().join(format!(".{safe_workspace_id}.clone.lock"))
2396}
2397
2398async fn acquire_workspace_clone_lock(lock_path: &Path) -> Result<tokio::fs::File> {
2399 if let Some(parent) = lock_path.parent() {
2400 tokio::fs::create_dir_all(parent).await.with_context(|| {
2401 format!("Failed to create clone lock directory {}", parent.display())
2402 })?;
2403 }
2404
2405 let stale_after = Duration::from_secs(env_u64("CODETETHER_WORKER_CLONE_LOCK_STALE_SECS", 1800));
2406 loop {
2407 match tokio::fs::OpenOptions::new()
2408 .write(true)
2409 .create_new(true)
2410 .open(lock_path)
2411 .await
2412 {
2413 Ok(file) => return Ok(file),
2414 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
2415 if workspace_clone_lock_is_stale(lock_path, stale_after).await {
2416 tracing::warn!(lock = %lock_path.display(), "Removing stale workspace clone lock");
2417 let _ = tokio::fs::remove_file(lock_path).await;
2418 continue;
2419 }
2420 tokio::time::sleep(Duration::from_millis(250)).await;
2421 }
2422 Err(err) => {
2423 return Err(err).with_context(|| {
2424 format!(
2425 "Failed to acquire workspace clone lock {}",
2426 lock_path.display()
2427 )
2428 });
2429 }
2430 }
2431 }
2432}
2433
2434async fn workspace_clone_lock_is_stale(lock_path: &Path, stale_after: Duration) -> bool {
2435 let Ok(metadata) = tokio::fs::metadata(lock_path).await else {
2436 return false;
2437 };
2438 let Ok(modified) = metadata.modified() else {
2439 return false;
2440 };
2441 SystemTime::now()
2442 .duration_since(modified)
2443 .map(|age| age > stale_after)
2444 .unwrap_or(false)
2445}
2446
2447async fn release_workspace_clone_lock(lock: tokio::fs::File) {
2448 if let Ok(metadata) = lock.metadata().await {
2449 #[cfg(unix)]
2450 {
2451 use std::os::unix::fs::MetadataExt;
2452 let clone_base = git_clone_base_dir();
2453 if let Ok(mut entries) = tokio::fs::read_dir(&clone_base).await {
2454 while let Ok(Some(entry)) = entries.next_entry().await {
2455 if let Ok(entry_metadata) = entry.metadata().await {
2456 if entry_metadata.ino() == metadata.ino()
2457 && entry_metadata.dev() == metadata.dev()
2458 {
2459 drop(lock);
2460 let _ = tokio::fs::remove_file(entry.path()).await;
2461 return;
2462 }
2463 }
2464 }
2465 }
2466 }
2467 }
2468 drop(lock);
2469}
2470
2471async fn refresh_existing_clone(repo_path: &Path, branch: &str) -> Result<()> {
2472 run_git_command_at(
2473 Some(repo_path),
2474 vec![
2475 "fetch".to_string(),
2476 "origin".to_string(),
2477 branch.to_string(),
2478 ],
2479 )
2480 .await?;
2481
2482 stash_dirty_clone_state(repo_path).await?;
2483
2484 let remote_ref = format!("origin/{branch}");
2485 run_git_command_at(
2486 Some(repo_path),
2487 vec![
2488 "checkout".to_string(),
2489 "-B".to_string(),
2490 branch.to_string(),
2491 remote_ref.clone(),
2492 ],
2493 )
2494 .await?;
2495 run_git_command_at(
2496 Some(repo_path),
2497 vec!["reset".to_string(), "--hard".to_string(), remote_ref],
2498 )
2499 .await?;
2500 run_git_command_at(
2501 Some(repo_path),
2502 vec!["clean".to_string(), "-fd".to_string()],
2503 )
2504 .await?;
2505
2506 Ok(())
2507}
2508
2509async fn stash_dirty_clone_state(repo_path: &Path) -> Result<()> {
2510 let status = run_git_command_at(
2511 Some(repo_path),
2512 vec!["status".to_string(), "--porcelain".to_string()],
2513 )
2514 .await?;
2515 if status.trim().is_empty() {
2516 return Ok(());
2517 }
2518
2519 let message = format!(
2520 "codetether auto-save before workspace refresh {}",
2521 chrono::Utc::now().to_rfc3339()
2522 );
2523 tracing::warn!(
2524 repo_path = %repo_path.display(),
2525 "Existing clone has local changes; stashing them before refreshing from origin"
2526 );
2527 run_git_command_at(
2528 Some(repo_path),
2529 vec![
2530 "stash".to_string(),
2531 "push".to_string(),
2532 "--include-untracked".to_string(),
2533 "-m".to_string(),
2534 message,
2535 ],
2536 )
2537 .await?;
2538
2539 Ok(())
2540}
2541
2542async fn register_cloned_workspace(
2543 client: &Client,
2544 server: &str,
2545 token: &Option<String>,
2546 worker_id: &str,
2547 workspace: &RegisteredWorkspaceRecord,
2548 repo_path: &Path,
2549) -> Result<()> {
2550 let mut req = client.post(format!(
2551 "{}/v1/agent/workspaces",
2552 server.trim_end_matches('/')
2553 ));
2554 if let Some(token) = token {
2555 req = req.bearer_auth(token);
2556 }
2557
2558 let response = req
2559 .json(&serde_json::json!({
2560 "workspace_id": workspace.id,
2561 "name": workspace.name,
2562 "path": repo_path.display().to_string(),
2563 "description": workspace.description,
2564 "agent_config": workspace.agent_config,
2565 "git_url": workspace.git_url,
2566 "git_branch": workspace.git_branch,
2567 "worker_id": worker_id,
2568 }))
2569 .send()
2570 .await
2571 .context("Failed to register cloned workspace with server")?;
2572 let status = response.status();
2573 if !status.is_success() {
2574 let body = response.text().await.unwrap_or_default();
2575 anyhow::bail!(
2576 "Failed to register cloned workspace {} ({}): {}",
2577 workspace.id,
2578 status,
2579 body
2580 );
2581 }
2582
2583 Ok(())
2584}
2585
2586async fn enqueue_post_clone_task(
2587 client: &Client,
2588 server: &str,
2589 token: &Option<String>,
2590 worker_id: &str,
2591 workspace_id: &str,
2592 metadata: &serde_json::Map<String, serde_json::Value>,
2593) -> Result<()> {
2594 if !worker_should_enqueue_post_clone_task(metadata) {
2595 return Ok(());
2596 }
2597
2598 let Some(post_clone_task) = metadata.get("post_clone_task") else {
2599 return Ok(());
2600 };
2601 let Some(task) = post_clone_task.as_object() else {
2602 return Ok(());
2603 };
2604 let title = task
2605 .get("title")
2606 .and_then(|value| value.as_str())
2607 .filter(|value| !value.trim().is_empty())
2608 .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing title"))?;
2609 let prompt = task
2610 .get("prompt")
2611 .and_then(|value| value.as_str())
2612 .filter(|value| !value.trim().is_empty())
2613 .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing prompt"))?;
2614 let agent_type = task
2615 .get("agent_type")
2616 .and_then(|value| value.as_str())
2617 .unwrap_or("build");
2618 let mut task_metadata = task
2619 .get("metadata")
2620 .cloned()
2621 .unwrap_or_else(|| serde_json::json!({}));
2622 if let Some(task_metadata_obj) = task_metadata.as_object_mut() {
2623 task_metadata_obj
2624 .entry("target_worker_id".to_string())
2625 .or_insert_with(|| serde_json::Value::String(worker_id.to_string()));
2626 }
2627
2628 let mut req = client.post(format!(
2629 "{}/v1/agent/workspaces/{}/tasks",
2630 server.trim_end_matches('/'),
2631 workspace_id
2632 ));
2633 if let Some(token) = token {
2634 req = req.bearer_auth(token);
2635 }
2636
2637 let response = req
2638 .json(&serde_json::json!({
2639 "title": title,
2640 "prompt": prompt,
2641 "agent_type": agent_type,
2642 "metadata": task_metadata,
2643 }))
2644 .send()
2645 .await
2646 .context("Failed to enqueue post-clone task")?;
2647 let status = response.status();
2648 if !status.is_success() {
2649 let body = response.text().await.unwrap_or_default();
2650 anyhow::bail!("Failed to enqueue post-clone task ({}): {}", status, body);
2651 }
2652 Ok(())
2653}
2654
2655fn worker_should_enqueue_post_clone_task(
2656 metadata: &serde_json::Map<String, serde_json::Value>,
2657) -> bool {
2658 metadata
2659 .get("post_clone_task")
2660 .and_then(|value| value.as_object())
2661 .is_some()
2662}
2663
2664async fn run_git_command_at(current_dir: Option<&Path>, args: Vec<String>) -> Result<String> {
2665 let mut command = Command::new("git");
2666 if let Some(dir) = current_dir {
2667 command.current_dir(dir);
2668 }
2669
2670 let output = command
2671 .args(args.iter().map(String::as_str))
2672 .output()
2673 .await
2674 .context("Failed to execute git command")?;
2675
2676 if output.status.success() {
2677 return Ok(String::from_utf8_lossy(&output.stdout).trim().to_string());
2678 }
2679
2680 Err(anyhow::anyhow!(
2681 "Git command failed: {}",
2682 String::from_utf8_lossy(&output.stderr).trim()
2683 ))
2684}
2685
2686async fn release_task_result(
2687 client: &Client,
2688 server: &str,
2689 token: &Option<String>,
2690 worker_id: &str,
2691 task_id: &str,
2692 status: &str,
2693 result: Option<String>,
2694 error: Option<String>,
2695 session_id: Option<String>,
2696 diagnostics: Option<serde_json::Value>,
2697) -> Result<()> {
2698 let mut req = client
2699 .post(format!("{}/v1/worker/tasks/release", server))
2700 .header("X-Worker-ID", worker_id);
2701 if let Some(token) = token {
2702 req = req.bearer_auth(token);
2703 }
2704
2705 let mut payload = serde_json::json!({
2706 "task_id": task_id,
2707 "status": status,
2708 "result": result,
2709 "error": error,
2710 "session_id": session_id,
2711 });
2712 if let Some(diagnostics) = diagnostics {
2713 if let Some(obj) = payload.as_object_mut() {
2714 obj.insert("diagnostics".to_string(), diagnostics);
2715 }
2716 }
2717
2718 req.json(&payload)
2719 .send()
2720 .await
2721 .context("Failed to release task result")?;
2722
2723 Ok(())
2724}
2725
2726async fn execute_swarm_with_policy(
2727 session: &mut Session,
2728 prompt: &str,
2729 model_tier: Option<&str>,
2730 explicit_model: Option<String>,
2731 metadata: &serde_json::Map<String, serde_json::Value>,
2732 complexity_hint: Option<&str>,
2733 worker_personality: Option<&str>,
2734 target_agent_name: Option<&str>,
2735 bus: Option<&Arc<AgentBus>>,
2736 output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
2737) -> Result<(crate::session::SessionResult, bool)> {
2738 use crate::provider::{ContentPart, Message, Role};
2739
2740 session.add_message(Message {
2741 role: Role::User,
2742 content: vec![ContentPart::Text {
2743 text: prompt.to_string(),
2744 }],
2745 });
2746
2747 if session.title.is_none() {
2748 session.generate_title().await?;
2749 }
2750
2751 let strategy = parse_swarm_strategy(metadata);
2752 let max_subagents = metadata_usize(
2753 metadata,
2754 &["swarm_max_subagents", "max_subagents", "subagents"],
2755 )
2756 .unwrap_or(10)
2757 .clamp(1, 100);
2758 let max_steps_per_subagent = metadata_usize(
2759 metadata,
2760 &[
2761 "swarm_max_steps_per_subagent",
2762 "max_steps_per_subagent",
2763 "max_steps",
2764 ],
2765 )
2766 .unwrap_or(50)
2767 .clamp(1, 200);
2768 let timeout_secs = metadata_u64(metadata, &["swarm_timeout_secs", "timeout_secs", "timeout"])
2769 .unwrap_or(600)
2770 .clamp(30, 3600);
2771 let parallel_enabled =
2772 metadata_bool(metadata, &["swarm_parallel_enabled", "parallel_enabled"]).unwrap_or(true);
2773
2774 let model = resolve_swarm_model(explicit_model, model_tier).await;
2775 if let Some(ref selected_model) = model {
2776 session.metadata.model = Some(selected_model.clone());
2777 }
2778
2779 if let Some(ref cb) = output_callback {
2780 cb(format!(
2781 "[swarm] routing complexity={} tier={} personality={} target_agent={}",
2782 complexity_hint.unwrap_or("standard"),
2783 model_tier.unwrap_or("balanced"),
2784 worker_personality.unwrap_or("auto"),
2785 target_agent_name.unwrap_or("auto")
2786 ));
2787 cb(format!(
2788 "[swarm] config strategy={:?} max_subagents={} max_steps={} timeout={}s tier={}",
2789 strategy,
2790 max_subagents,
2791 max_steps_per_subagent,
2792 timeout_secs,
2793 model_tier.unwrap_or("balanced")
2794 ));
2795 }
2796
2797 let swarm_config = SwarmConfig {
2798 max_subagents,
2799 max_steps_per_subagent,
2800 subagent_timeout_secs: timeout_secs,
2801 parallel_enabled,
2802 model,
2803 working_dir: session
2804 .metadata
2805 .directory
2806 .as_ref()
2807 .map(|p| p.to_string_lossy().to_string()),
2808 ..Default::default()
2809 };
2810
2811 let swarm_result = if output_callback.is_some() {
2812 let (event_tx, mut event_rx) = mpsc::channel(256);
2813 let mut executor = SwarmExecutor::new(swarm_config).with_event_tx(event_tx);
2814 if let Some(bus) = bus {
2815 executor = executor.with_bus(Arc::clone(bus));
2816 }
2817 let prompt_owned = prompt.to_string();
2818 let mut exec_handle =
2819 tokio::spawn(async move { executor.execute(&prompt_owned, strategy).await });
2820
2821 let mut final_result: Option<crate::swarm::SwarmResult> = None;
2822
2823 while final_result.is_none() {
2824 tokio::select! {
2825 maybe_event = event_rx.recv() => {
2826 if let Some(event) = maybe_event
2827 && let Some(ref cb) = output_callback
2828 && let Some(line) = format_swarm_event_for_output(&event) {
2829 cb(line);
2830 }
2831 }
2832 join_result = &mut exec_handle => {
2833 let joined = join_result.map_err(|e| anyhow::anyhow!("Swarm join failure: {}", e))?;
2834 final_result = Some(joined?);
2835 }
2836 }
2837 }
2838
2839 while let Ok(event) = event_rx.try_recv() {
2840 if let Some(ref cb) = output_callback
2841 && let Some(line) = format_swarm_event_for_output(&event)
2842 {
2843 cb(line);
2844 }
2845 }
2846
2847 final_result.ok_or_else(|| anyhow::anyhow!("Swarm execution returned no result"))?
2848 } else {
2849 let mut executor = SwarmExecutor::new(swarm_config);
2850 if let Some(bus) = bus {
2851 executor = executor.with_bus(Arc::clone(bus));
2852 }
2853 executor.execute(prompt, strategy).await?
2854 };
2855
2856 let final_text = if swarm_result.result.trim().is_empty() {
2857 if swarm_result.success {
2858 "Swarm completed without textual output.".to_string()
2859 } else {
2860 "Swarm finished with failures and no textual output.".to_string()
2861 }
2862 } else {
2863 swarm_result.result.clone()
2864 };
2865
2866 session.add_message(Message {
2867 role: Role::Assistant,
2868 content: vec![ContentPart::Text {
2869 text: final_text.clone(),
2870 }],
2871 });
2872 session.save().await?;
2873
2874 Ok((
2875 crate::session::SessionResult {
2876 text: final_text,
2877 session_id: session.id.clone(),
2878 },
2879 swarm_result.success,
2880 ))
2881}
2882
/// Runs a single-agent, tool-calling loop for `prompt` on `session`.
///
/// Provider registry:
/// - If `session.metadata.provider_keys` decodes to per-task keys, a
///   tenant-scoped registry is built from them.
/// - Otherwise, or if that registry has no providers, the shared
///   Vault-backed registry is used.
///
/// Provider and model:
/// - `session.metadata.model` may name a provider explicitly; `zhipuai` is
///   mapped to `zai`.
/// - A bare value that matches an available provider name is treated as the
///   provider.
/// - With no explicit provider, `choose_provider_for_tier` picks one.
/// - An empty model id falls back to `default_model_for_provider`.
///
/// Each step:
/// - sends the conversation to the provider;
/// - forwards non-empty text parts to `output_callback`;
/// - executes the requested tools. Tools rejected by `is_tool_allowed` for
///   `auto_approve` are skipped. Each tool call is bounded by
///   `CODETETHER_WORKER_TOOL_TIMEOUT_SECS` (default 120s, clamped to
///   1..=3600).
///
/// The loop ends when a response contains no tool calls, or after 50 steps.
///
/// Returns the concatenated assistant text (trimmed) and the session id. The
/// session is saved before returning.
///
/// # Errors
/// Fails if:
/// - no providers are available;
/// - an explicitly named provider is unavailable;
/// - the Vault registry cannot be loaded;
/// - title generation, a completion request or the session save fails.
///
/// Tool failures and timeouts are returned to the model as tool results, not
/// as errors.
async fn execute_session_with_policy(
    session: &mut Session,
    prompt: &str,
    auto_approve: AutoApprove,
    model_tier: Option<&str>,
    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
) -> Result<crate::session::SessionResult> {
    use crate::provider::{ContentPart, Message, ProviderRegistry, Role, parse_model_string};
    use std::sync::Arc;

    // Decode tenant-supplied provider keys, if any. A malformed payload is
    // logged and ignored rather than failing the task.
    let per_task_keys = session
        .metadata
        .provider_keys
        .as_ref()
        .and_then(|value| match crate::provider::PerTaskProviderKeys::from_value(value) {
            Ok(keys) => keys,
            Err(err) => {
                tracing::warn!(error = %err, "Invalid per-task provider key payload; falling back to platform registry");
                None
            }
        });

    // Prefer the per-task registry. Fall back to the shared Vault registry
    // when there are no per-task keys or they produced no providers.
    let registry = if let Some(ref keys) = per_task_keys {
        let registry = keys.build_registry();
        if registry.list().is_empty() {
            tracing::warn!(
                "Per-task provider key payload produced no providers; falling back to platform registry"
            );
            (*ProviderRegistry::shared_from_vault().await.context(
                "Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN",
            )?)
            .clone()
        } else {
            tracing::info!(
                source = keys.source(),
                providers = ?keys.provider_names(),
                "Using tenant-scoped per-task provider registry"
            );
            registry
        }
    } else {
        (*ProviderRegistry::shared_from_vault().await.context(
            "Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN",
        )?)
        .clone()
    };
    let providers = registry.list();
    tracing::info!("Available providers: {:?}", providers);

    if providers.is_empty() {
        anyhow::bail!(
            "No LLM providers available (0 providers loaded). \
             Configure API keys in HashiCorp Vault or set environment variables. \
             Vault address: {}",
            std::env::var("VAULT_ADDR").unwrap_or_else(|_| "(not set)".into())
        );
    }

    // Split the session's model string into (provider, model id). A bare value
    // that matches an available provider name is treated as the provider,
    // leaving the model id empty.
    let (provider_name, model_id) = if let Some(ref model_str) = session.metadata.model {
        let (prov, model) = parse_model_string(model_str);
        // `zhipuai` is accepted as an alias for the `zai` provider.
        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
        if prov.is_some() {
            (prov.map(|s| s.to_string()), model.to_string())
        } else if providers.contains(&model) {
            (Some(model.to_string()), String::new())
        } else {
            (None, model.to_string())
        }
    } else {
        (None, String::new())
    };

    let provider_slice = providers.as_slice();
    // An explicitly named provider must exist. Fail instead of substituting
    // another one.
    if let Some(explicit_provider) = provider_name.as_deref()
        && !providers.contains(&explicit_provider)
    {
        anyhow::bail!(
            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
            explicit_provider,
            providers.join(", ")
        );
    }

    let selected_provider = provider_name
        .as_deref()
        .unwrap_or_else(|| choose_provider_for_tier(provider_slice, model_tier));

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: prompt.to_string(),
        }],
    });

    if session.title.is_none() {
        session.generate_title().await?;
    }

    // An empty model id means none was given, or only a provider was named.
    let model = if !model_id.is_empty() {
        model_id
    } else {
        default_model_for_provider(selected_provider, model_tier)
    };

    // Tools run relative to the session directory, or the process cwd when
    // the session has none.
    let workspace_dir = session
        .metadata
        .directory
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    let tool_registry = create_filtered_registry(
        Arc::clone(&provider),
        model.clone(),
        auto_approve,
        &workspace_dir,
        output_callback.clone(),
    );
    let tool_definitions = tool_registry.definitions();

    let temperature = if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!(
        "Using model: {} via provider: {} (tier: {:?})",
        model,
        selected_provider,
        model_tier
    );
    tracing::info!(
        "Available tools: {} (auto_approve: {:?})",
        tool_definitions.len(),
        auto_approve
    );

    let system_prompt = crate::agent::builtin::build_system_prompt(&workspace_dir);

    let mut final_output = String::new();
    // Hard cap on model round-trips. NOTE(review): hitting the cap ends the
    // loop and returns normally, without signalling that work was cut short.
    let max_steps = 50;

    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");

        let response = complete_worker_step_with_context_fallback(
            Arc::clone(&provider),
            session,
            &model,
            &system_prompt,
            &tool_definitions,
            temperature,
        )
        .await?;

        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        // Collect (id, name, parsed arguments) for each tool call. Arguments
        // that are not valid JSON are replaced with an empty object.
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } = part
                {
                    let args: serde_json::Value =
                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                    Some((id.clone(), name.clone(), args))
                } else {
                    None
                }
            })
            .collect();

        // Accumulate non-empty text parts and forward them to the caller.
        for part in &response.message.content {
            if let ContentPart::Text { text } = part
                && !text.is_empty()
            {
                final_output.push_str(text);
                final_output.push('\n');
                if let Some(ref cb) = output_callback {
                    cb(text.clone());
                }
            }
        }

        // A response without tool calls is the model's final answer.
        if tool_calls.is_empty() {
            session.add_message(response.message.clone());
            break;
        }

        session.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        for (tool_id, tool_name, tool_input) in tool_calls {
            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            if let Some(ref cb) = output_callback {
                cb(format!("[tool:start:{}]", tool_name));
            }

            // A blocked tool still gets a tool result, so every tool call the
            // model issued has a matching response.
            if !is_tool_allowed(&tool_name, auto_approve) {
                let msg = format!(
                    "Tool '{}' requires approval but auto-approve policy is {:?}",
                    tool_name, auto_approve
                );
                tracing::warn!(tool = %tool_name, "Tool blocked by auto-approve policy");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: msg,
                    }],
                });
                continue;
            }

            let enriched_tool_input = enrich_tool_input_with_runtime_context(
                &tool_input,
                &workspace_dir,
                Some(&model),
                &session.id,
                &session.agent,
                session.metadata.provenance.as_ref(),
            );

            let content = if let Some(tool) = tool_registry.get(&tool_name) {
                let tool_timeout_secs =
                    env_u64("CODETETHER_WORKER_TOOL_TIMEOUT_SECS", 120).clamp(1, 3600);
                // A timeout becomes a structured error result, not an `Err`.
                let exec_result: Result<crate::tool::ToolResult> = match tokio::time::timeout(
                    Duration::from_secs(tool_timeout_secs),
                    tool.execute(enriched_tool_input.clone()),
                )
                .await
                {
                    Ok(result) => result,
                    Err(_) => Ok(crate::tool::ToolResult::structured_error(
                        "TOOL_TIMEOUT",
                        &tool_name,
                        &format!("tool timed out after {tool_timeout_secs}s"),
                        None,
                        Some(serde_json::json!({
                            "hint": "Narrow the request, set a more specific path/include filter, or retry with smaller scope."
                        })),
                    )),
                };
                match exec_result {
                    Ok(result) => {
                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                        if let Some(ref cb) = output_callback {
                            let status = if result.success { "ok" } else { "err" };
                            cb(format!(
                                "[tool:{}:{}] {}",
                                tool_name,
                                status,
                                crate::util::truncate_bytes_safe(&result.output, 500)
                            ));
                        }
                        result.output
                    }
                    Err(e) => {
                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                        if let Some(ref cb) = output_callback {
                            cb(format!("[tool:{}:err] {}", tool_name, e));
                        }
                        format!("Error: {}", e)
                    }
                }
            } else {
                tracing::warn!(tool = %tool_name, "Tool not found");
                format!("Error: Unknown tool '{}'", tool_name)
            };

            session.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });
        }
    }

    session.save().await?;

    Ok(crate::session::SessionResult {
        text: final_output.trim().to_string(),
        session_id: session.id.clone(),
    })
}
3205
3206async fn complete_worker_step_with_context_fallback(
3207 provider: Arc<dyn crate::provider::Provider>,
3208 session: &Session,
3209 model: &str,
3210 system_prompt: &str,
3211 tool_definitions: &[crate::provider::ToolDefinition],
3212 temperature: Option<f32>,
3213) -> Result<crate::provider::CompletionResponse> {
3214 let options = crate::session::context::RequestOptions {
3215 temperature,
3216 top_p: None,
3217 max_tokens: Some(8192),
3218 force_keep_last: None,
3219 };
3220
3221 match crate::session::context::complete_with_context(
3222 Arc::clone(&provider),
3223 session,
3224 model,
3225 system_prompt,
3226 tool_definitions,
3227 options,
3228 )
3229 .await
3230 {
3231 Ok(response) => Ok(response),
3232 Err(error) if crate::session::helper::error::is_prompt_too_long_error(&error) => {
3233 let compact_tools = compact_worker_tool_definitions(tool_definitions);
3234 tracing::warn!(
3235 error = %error,
3236 tool_count = tool_definitions.len(),
3237 original_tool_schema_bytes = tool_schema_bytes(tool_definitions),
3238 compact_tool_schema_bytes = tool_schema_bytes(&compact_tools),
3239 "Provider rejected prompt after context compaction; retrying with compact tool schemas"
3240 );
3241 crate::session::context::complete_with_context(
3242 provider,
3243 session,
3244 model,
3245 system_prompt,
3246 &compact_tools,
3247 options,
3248 )
3249 .await
3250 .map_err(|retry_error| {
3251 if crate::session::helper::error::is_prompt_too_long_error(&retry_error) {
3252 retry_error.context(
3253 "provider rejected prompt as too long after RLM compaction and compact tool-schema fallback",
3254 )
3255 } else {
3256 retry_error
3257 }
3258 })
3259 }
3260 Err(error) => Err(error),
3261 }
3262}
3263
3264fn compact_worker_tool_definitions(
3265 tools: &[crate::provider::ToolDefinition],
3266) -> Vec<crate::provider::ToolDefinition> {
3267 tools
3268 .iter()
3269 .map(|tool| crate::provider::ToolDefinition {
3270 name: tool.name.clone(),
3271 description: tool.description.clone(),
3272 parameters: serde_json::json!({
3273 "type": "object",
3274 "additionalProperties": true
3275 }),
3276 })
3277 .collect()
3278}
3279
3280fn tool_schema_bytes(tools: &[crate::provider::ToolDefinition]) -> usize {
3281 let mut writer = CountingWriter::default();
3282 serde_json::to_writer(&mut writer, tools)
3283 .map(|_| writer.bytes)
3284 .unwrap_or(0)
3285}
3286
/// An `io::Write` sink that discards everything written to it and only
/// tallies the number of bytes.
#[derive(Default)]
struct CountingWriter {
    /// Total number of bytes accepted so far.
    bytes: usize,
}

impl std::io::Write for CountingWriter {
    /// Accepts the whole buffer and adds its length to the tally.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = buf.len();
        self.bytes += accepted;
        Ok(accepted)
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
3302
3303pub fn start_heartbeat(
3306 client: Client,
3307 server: String,
3308 token: Option<String>,
3309 heartbeat_state: HeartbeatState,
3310 processing: Arc<Mutex<HashSet<String>>>,
3311 cognition_config: CognitionHeartbeatConfig,
3312 task_progress: Arc<Mutex<task_timeline::TaskProgressState>>,
3313) -> JoinHandle<()> {
3314 tokio::spawn(async move {
3315 let mut consecutive_failures = 0u32;
3316 const MAX_FAILURES: u32 = 3;
3317 const HEARTBEAT_INTERVAL_SECS: u64 = 30;
3318 const COGNITION_RETRY_COOLDOWN_SECS: u64 = 300;
3319 let mut cognition_payload_disabled_until: Option<Instant> = None;
3320 let persistent_heartbeat_enabled = persistent_worker_enabled();
3321 let persistent_lease_seconds = persistent_worker_lease_seconds();
3322
3323 let mut interval =
3324 tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
3325 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3326
3327 loop {
3328 interval.tick().await;
3329
3330 let active_count = processing.lock().await.len();
3332 heartbeat_state.set_task_count(active_count).await;
3333
3334 let status = if active_count > 0 {
3336 WorkerStatus::Processing
3337 } else {
3338 WorkerStatus::Idle
3339 };
3340 heartbeat_state.set_status(status).await;
3341
3342 let url = format!(
3344 "{}/v1/agent/workers/{}/heartbeat",
3345 server, heartbeat_state.worker_id
3346 );
3347 let mut req = client.post(&url);
3348
3349 if let Some(ref t) = token {
3350 req = req.bearer_auth(t);
3351 }
3352
3353 let status_str = heartbeat_state.status.lock().await.as_str().to_string();
3354 let sub_agents = heartbeat_state.sub_agents_snapshot().await;
3355
3356 let progress_snapshot = task_progress.lock().await.clone();
3358 let task_progress_payload = if progress_snapshot.task_id.is_some() {
3359 Some(serde_json::json!({
3360 "task_id": progress_snapshot.task_id,
3361 "current_checkpoint": progress_snapshot.current_checkpoint,
3362 "elapsed_secs": format!("{:.1}", progress_snapshot.elapsed_secs),
3363 "remaining_secs": format!("{:.1}", progress_snapshot.remaining_secs),
3364 "budget_pct_used": format!("{:.1}%", progress_snapshot.budget_pct_used),
3365 "checkpoints_reached": progress_snapshot.checkpoints_reached.len(),
3366 "last_detail": progress_snapshot.last_detail,
3367 }))
3368 } else {
3369 None
3370 };
3371
3372 let base_payload = serde_json::json!({
3373 "worker_id": &heartbeat_state.worker_id,
3374 "agent_name": &heartbeat_state.agent_name,
3375 "status": status_str,
3376 "active_task_count": active_count,
3377 "sub_agents": sub_agents,
3378 });
3379 let mut payload = base_payload.clone();
3380 let mut included_cognition_payload = false;
3381 let cognition_payload_allowed = cognition_payload_disabled_until
3382 .map(|until| Instant::now() >= until)
3383 .unwrap_or(true);
3384
3385 if cognition_config.enabled
3386 && cognition_payload_allowed
3387 && let Some(cognition_payload) =
3388 fetch_cognition_heartbeat_payload(&client, &cognition_config).await
3389 && let Some(obj) = payload.as_object_mut()
3390 {
3391 obj.insert("cognition".to_string(), cognition_payload);
3392 included_cognition_payload = true;
3393 }
3394
3395 if let Some(progress_json) = task_progress_payload.clone() {
3397 if let Some(obj) = payload.as_object_mut() {
3398 obj.insert("task_progress".to_string(), progress_json);
3399 }
3400 }
3401
3402 match req.json(&payload).send().await {
3403 Ok(res) => {
3404 if res.status().is_success() {
3405 consecutive_failures = 0;
3406 tracing::debug!(
3407 worker_id = %heartbeat_state.worker_id,
3408 status = status_str,
3409 active_tasks = active_count,
3410 "Heartbeat sent successfully"
3411 );
3412 } else if included_cognition_payload && res.status().is_client_error() {
3413 tracing::warn!(
3414 worker_id = %heartbeat_state.worker_id,
3415 status = %res.status(),
3416 "Heartbeat cognition payload rejected, retrying without cognition payload"
3417 );
3418
3419 let mut retry_req = client.post(&url);
3420 if let Some(ref t) = token {
3421 retry_req = retry_req.bearer_auth(t);
3422 }
3423
3424 match retry_req.json(&base_payload).send().await {
3425 Ok(retry_res) if retry_res.status().is_success() => {
3426 cognition_payload_disabled_until = Some(
3427 Instant::now()
3428 + Duration::from_secs(COGNITION_RETRY_COOLDOWN_SECS),
3429 );
3430 consecutive_failures = 0;
3431 tracing::warn!(
3432 worker_id = %heartbeat_state.worker_id,
3433 retry_after_secs = COGNITION_RETRY_COOLDOWN_SECS,
3434 "Paused cognition heartbeat payload after schema rejection"
3435 );
3436 }
3437 Ok(retry_res) => {
3438 consecutive_failures += 1;
3439 tracing::warn!(
3440 worker_id = %heartbeat_state.worker_id,
3441 status = %retry_res.status(),
3442 failures = consecutive_failures,
3443 "Heartbeat failed even after retry without cognition payload"
3444 );
3445 }
3446 Err(e) => {
3447 consecutive_failures += 1;
3448 tracing::warn!(
3449 worker_id = %heartbeat_state.worker_id,
3450 error = %e,
3451 failures = consecutive_failures,
3452 "Heartbeat retry without cognition payload failed"
3453 );
3454 }
3455 }
3456 } else {
3457 consecutive_failures += 1;
3458 tracing::warn!(
3459 worker_id = %heartbeat_state.worker_id,
3460 status = %res.status(),
3461 failures = consecutive_failures,
3462 "Heartbeat failed"
3463 );
3464 }
3465 }
3466 Err(e) => {
3467 consecutive_failures += 1;
3468 tracing::warn!(
3469 worker_id = %heartbeat_state.worker_id,
3470 error = %e,
3471 failures = consecutive_failures,
3472 "Heartbeat request failed"
3473 );
3474 }
3475 }
3476
3477 if persistent_heartbeat_enabled
3478 && let Some(progress_json) = task_progress_payload
3479 && let Err(e) = send_extended_task_heartbeat(
3480 &client,
3481 &server,
3482 &token,
3483 &heartbeat_state.worker_id,
3484 progress_json,
3485 persistent_lease_seconds,
3486 )
3487 .await
3488 {
3489 tracing::warn!(
3490 worker_id = %heartbeat_state.worker_id,
3491 error = %e,
3492 "Extended task heartbeat failed"
3493 );
3494 }
3495
3496 if consecutive_failures >= MAX_FAILURES {
3498 tracing::error!(
3499 worker_id = %heartbeat_state.worker_id,
3500 failures = consecutive_failures,
3501 "Heartbeat failed {} consecutive times - worker will continue running and attempt reconnection via SSE loop",
3502 MAX_FAILURES
3503 );
3504 consecutive_failures = 0;
3506 }
3507 }
3508 })
3509}
3510
3511async fn send_extended_task_heartbeat(
3512 client: &Client,
3513 server: &str,
3514 token: &Option<String>,
3515 worker_id: &str,
3516 progress: serde_json::Value,
3517 lease_seconds: u64,
3518) -> Result<()> {
3519 let task_id = progress
3520 .get("task_id")
3521 .and_then(|value| value.as_str())
3522 .filter(|value| !value.is_empty())
3523 .context("active task progress missing task_id")?
3524 .to_string();
3525 let checkpoint_seq = progress
3526 .get("checkpoints_reached")
3527 .and_then(|value| value.as_u64())
3528 .unwrap_or(1)
3529 .max(1);
3530 let status_message = progress
3531 .get("current_checkpoint")
3532 .and_then(|value| value.as_str())
3533 .map(|value| value.to_string());
3534
3535 let mut req = client.post(format!("{}/v1/worker/tasks/heartbeat-extended", server));
3536 if let Some(token) = token {
3537 req = req.bearer_auth(token);
3538 }
3539
3540 let response = req
3541 .json(&serde_json::json!({
3542 "task_id": &task_id,
3543 "worker_id": worker_id,
3544 "status_message": status_message,
3545 "checkpoint": progress,
3546 "checkpoint_seq": checkpoint_seq,
3547 "lease_extension_seconds": lease_seconds,
3548 }))
3549 .send()
3550 .await?;
3551
3552 let status = response.status();
3553 if !status.is_success() {
3554 let body = response.text().await.unwrap_or_default();
3555 anyhow::bail!("extended heartbeat rejected with {}: {}", status, body);
3556 }
3557
3558 let body = response
3559 .json::<serde_json::Value>()
3560 .await
3561 .unwrap_or_default();
3562 if body.get("success").and_then(|value| value.as_bool()) == Some(false) {
3563 tracing::debug!(
3564 task_id = %task_id,
3565 message = body
3566 .get("message")
3567 .and_then(|value| value.as_str())
3568 .unwrap_or("no active run"),
3569 "Extended task heartbeat skipped"
3570 );
3571 return Ok(());
3572 }
3573
3574 Ok(())
3575}
3576
3577async fn fetch_cognition_heartbeat_payload(
3578 client: &Client,
3579 config: &CognitionHeartbeatConfig,
3580) -> Option<serde_json::Value> {
3581 let status_url = format!("{}/v1/cognition/status", config.source_base_url);
3582 let status_res = tokio::time::timeout(
3583 Duration::from_millis(config.request_timeout_ms),
3584 client.get(status_url).send(),
3585 )
3586 .await
3587 .ok()?
3588 .ok()?;
3589
3590 if !status_res.status().is_success() {
3591 return None;
3592 }
3593
3594 let status: CognitionStatusSnapshot = status_res.json().await.ok()?;
3595 let mut payload = serde_json::json!({
3596 "running": status.running,
3597 "last_tick_at": status.last_tick_at,
3598 "active_persona_count": status.active_persona_count,
3599 "events_buffered": status.events_buffered,
3600 "snapshots_buffered": status.snapshots_buffered,
3601 "loop_interval_ms": status.loop_interval_ms,
3602 });
3603
3604 if config.include_thought_summary {
3605 let snapshot_url = format!("{}/v1/cognition/snapshots/latest", config.source_base_url);
3606 let snapshot_res = tokio::time::timeout(
3607 Duration::from_millis(config.request_timeout_ms),
3608 client.get(snapshot_url).send(),
3609 )
3610 .await
3611 .ok()
3612 .and_then(Result::ok);
3613
3614 if let Some(snapshot_res) = snapshot_res
3615 && snapshot_res.status().is_success()
3616 && let Ok(snapshot) = snapshot_res.json::<CognitionLatestSnapshot>().await
3617 && let Some(obj) = payload.as_object_mut()
3618 {
3619 obj.insert(
3620 "latest_snapshot_at".to_string(),
3621 serde_json::Value::String(snapshot.generated_at),
3622 );
3623 obj.insert(
3624 "latest_thought".to_string(),
3625 serde_json::Value::String(trim_for_heartbeat(
3626 &snapshot.summary,
3627 config.summary_max_chars,
3628 )),
3629 );
3630 if let Some(model) = snapshot
3631 .metadata
3632 .get("model")
3633 .and_then(serde_json::Value::as_str)
3634 {
3635 obj.insert(
3636 "latest_thought_model".to_string(),
3637 serde_json::Value::String(model.to_string()),
3638 );
3639 }
3640 if let Some(source) = snapshot
3641 .metadata
3642 .get("source")
3643 .and_then(serde_json::Value::as_str)
3644 {
3645 obj.insert(
3646 "latest_thought_source".to_string(),
3647 serde_json::Value::String(source.to_string()),
3648 );
3649 }
3650 }
3651 }
3652
3653 Some(payload)
3654}
3655
/// Shortens `input` for inclusion in a heartbeat payload.
///
/// Surrounding whitespace is stripped first, so it never counts against
/// `max_chars`. If the stripped text has at most `max_chars` characters (Unicode
/// scalar values), it is returned unchanged. Otherwise the first `max_chars`
/// characters are kept, trailing whitespace is removed from that prefix, and
/// `"..."` is appended. The ellipsis is extra, so a truncated result can be up
/// to `max_chars + 3` characters long.
fn trim_for_heartbeat(input: &str, max_chars: usize) -> String {
    let input = input.trim();
    // `nth(max_chars)` is `Some` exactly when there are more than `max_chars`
    // characters; its byte offset is the end of the kept prefix (always a char
    // boundary, so slicing cannot panic).
    match input.char_indices().nth(max_chars) {
        None => input.to_string(),
        Some((cut, _)) => {
            let head = input[..cut].trim_end();
            let mut trimmed = String::with_capacity(head.len() + 3);
            trimmed.push_str(head);
            trimmed.push_str("...");
            trimmed
        }
    }
}
3668
/// Reads environment variable `name` as a boolean flag.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`, compared
/// ASCII-case-insensitively. Returns `default` when the variable is unset, is
/// not valid Unicode, or holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
3679
/// Reads environment variable `name` as a `usize`, returning `default` when it
/// is unset, not valid Unicode, or does not parse as an unsigned integer.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name).map(|raw| raw.parse::<usize>()) {
        Ok(Ok(value)) => value,
        _ => default,
    }
}
3686
/// Reads environment variable `name` as a `u64`, returning `default` when it is
/// unset, not valid Unicode, or does not parse as an unsigned integer.
fn env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name).map_or(default, |raw| raw.parse::<u64>().unwrap_or(default))
}
3693
3694fn maybe_configure_repo_git_auth_from_entry(entry: &serde_json::Value) {
3695 let Some(workspace_id) = entry.get("id").and_then(|v| v.as_str()) else {
3696 return;
3697 };
3698 let Some(path) = entry.get("path").and_then(|v| v.as_str()) else {
3699 return;
3700 };
3701 let has_git_remote = entry
3702 .get("git_url")
3703 .and_then(|v| v.as_str())
3704 .is_some_and(|value| !value.trim().is_empty());
3705 if !has_git_remote {
3706 return;
3707 }
3708
3709 let repo_path = Path::new(path);
3710 if !repo_path.join(".git").exists() {
3711 return;
3712 }
3713
3714 if let Err(error) = configure_repo_git_auth(repo_path, workspace_id) {
3715 tracing::debug!(
3716 workspace_id,
3717 path,
3718 error = %error,
3719 "Workspace sync could not install Git credential helper"
3720 );
3721 }
3722 configure_repo_git_github_app_from_agent_config(repo_path, entry.get("agent_config"));
3723}
3724
3725fn configure_repo_git_github_app_from_agent_config(
3726 repo_path: &Path,
3727 agent_config: Option<&serde_json::Value>,
3728) {
3729 let installation_id = agent_config
3730 .and_then(|value| value.get("git_auth"))
3731 .and_then(|value| value.get("github_app"))
3732 .and_then(|value| value.get("installation_id"))
3733 .and_then(|value| value.as_str());
3734 let app_id = agent_config
3735 .and_then(|value| value.get("git_auth"))
3736 .and_then(|value| value.get("github_app"))
3737 .and_then(|value| value.get("app_id"))
3738 .and_then(|value| value.as_str());
3739 if let Err(error) = configure_repo_git_github_app(repo_path, installation_id, app_id) {
3740 tracing::debug!(path = %repo_path.display(), error = %error, "Failed to persist GitHub App repo metadata");
3741 }
3742}
3743
3744fn start_workspace_sync(
3748 client: Client,
3749 server: String,
3750 token: Option<String>,
3751 shared_codebases: Arc<Mutex<Vec<String>>>,
3752) -> JoinHandle<()> {
3753 tokio::spawn(async move {
3754 const POLL_INTERVAL_SECS: u64 = 60;
3755 let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
3756 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3757 interval.tick().await; loop {
3760 interval.tick().await;
3761 if let Err(e) =
3762 sync_workspaces_from_server(&client, &server, &token, &shared_codebases).await
3763 {
3764 tracing::warn!("Workspace sync failed: {}", e);
3765 }
3766 }
3767 })
3768}
3769
3770async fn sync_workspaces_from_server(
3774 client: &Client,
3775 server: &str,
3776 token: &Option<String>,
3777 shared_codebases: &Arc<Mutex<Vec<String>>>,
3778) -> Result<()> {
3779 let mut req = client.get(format!("{}/v1/agent/workspaces", server));
3780 if let Some(t) = token {
3781 req = req.bearer_auth(t);
3782 }
3783
3784 let res = req.send().await?;
3785 if !res.status().is_success() {
3786 tracing::debug!(
3787 status = %res.status(),
3788 "Workspace sync: server returned non-success, skipping"
3789 );
3790 return Ok(());
3791 }
3792
3793 let data: serde_json::Value = res.json().await?;
3794
3795 let entries = if let Some(arr) = data.as_array() {
3797 arr.clone()
3798 } else {
3799 data["workspaces"]
3800 .as_array()
3801 .or_else(|| data["codebases"].as_array())
3802 .cloned()
3803 .unwrap_or_default()
3804 };
3805
3806 let mut new_paths: Vec<String> = Vec::new();
3807 {
3808 let current = shared_codebases.lock().await;
3809 for entry in &entries {
3810 maybe_configure_repo_git_auth_from_entry(entry);
3811 let path = match entry["path"].as_str().filter(|p| !p.is_empty()) {
3812 Some(p) => p,
3813 None => continue,
3814 };
3815 if std::path::Path::new(path).exists() && !current.iter().any(|c| c.as_str() == path) {
3818 new_paths.push(path.to_string());
3819 }
3820 }
3821 }
3822
3823 if !new_paths.is_empty() {
3824 let mut current = shared_codebases.lock().await;
3825 for path in &new_paths {
3826 tracing::info!(
3827 path = %path,
3828 "Workspace sync: auto-discovered local path, adding to codebases"
3829 );
3830 current.push(path.clone());
3831 }
3832 tracing::info!(
3833 added = new_paths.len(),
3834 total = current.len(),
3835 "Workspace sync complete -- new paths take effect on next reconnect"
3836 );
3837 } else {
3838 tracing::debug!("Workspace sync: no new local paths found");
3839 }
3840
3841 Ok(())
3842}
3843
// Unit tests for the worker module. Most tests call private helpers from the
// parent module through `use super::*`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{
        CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo,
        Provider, Role, StreamChunk, ToolDefinition, Usage,
    };
    use futures::stream::{self, BoxStream};
    use serde_json::json;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Mock provider for exercising the worker's context-window fallback.
    ///
    /// `complete` fails with a "context window exceeded" error unless every tool
    /// in the request has `additionalProperties: true` in its parameter schema
    /// (vacuously true for a request with no tools). In that case it records the
    /// compact schema and answers with the text "ok".
    struct ContextErrorUntilCompactProvider {
        // Number of `complete` invocations so far.
        calls: AtomicUsize,
        // Set once a request with all-compact tool schemas has been seen.
        saw_compact_schema: AtomicBool,
    }

    impl ContextErrorUntilCompactProvider {
        fn new() -> Self {
            Self {
                calls: AtomicUsize::new(0),
                saw_compact_schema: AtomicBool::new(false),
            }
        }

        /// Total `complete` calls observed.
        fn calls(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }

        /// Whether any request carried only compacted tool schemas.
        fn saw_compact_schema(&self) -> bool {
            self.saw_compact_schema.load(Ordering::SeqCst)
        }
    }

    #[async_trait::async_trait]
    impl Provider for ContextErrorUntilCompactProvider {
        fn name(&self) -> &str {
            "mock"
        }

        async fn list_models(&self) -> Result<Vec<ModelInfo>> {
            Ok(Vec::new())
        }

        async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            // "Compact" means every tool schema was replaced by an open
            // `additionalProperties: true` schema.
            let compact = request.tools.iter().all(|tool| {
                tool.parameters.get("additionalProperties") == Some(&serde_json::Value::Bool(true))
            });
            if compact {
                self.saw_compact_schema.store(true, Ordering::SeqCst);
                return Ok(CompletionResponse {
                    message: Message {
                        role: Role::Assistant,
                        content: vec![ContentPart::Text { text: "ok".into() }],
                    },
                    usage: Usage::default(),
                    finish_reason: FinishReason::Stop,
                });
            }
            // Error text chosen to look like a real provider's context-window
            // rejection, so the worker's fallback logic treats it as one.
            anyhow::bail!(
                "Your input exceeds the context window of this model. Please adjust your input and try again."
            )
        }

        // Streaming is not exercised by these tests; yield nothing.
        async fn complete_stream(
            &self,
            _request: CompletionRequest,
        ) -> Result<BoxStream<'static, StreamChunk>> {
            Ok(Box::pin(stream::empty()))
        }
    }

    /// `metadata_bool` / `metadata_usize` find keys nested under a `forage` object.
    #[test]
    fn metadata_lookup_reads_nested_forage_keys() {
        let metadata = json!({
            "forage": {
                "execute": true,
                "top": 5,
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert_eq!(metadata_bool(&metadata, &["execute"]), Some(true));
        assert_eq!(metadata_usize(&metadata, &["top"]), Some(5));
    }

    /// With empty metadata, the prompt becomes the sole moonshot, S3 is off,
    /// the given model is passed through, and the execution engine is "run".
    #[test]
    fn build_forage_args_defaults_prompt_to_moonshot_and_local_mode() {
        let metadata = serde_json::Map::new();
        let args = build_forage_args(
            "Expand enterprise adoption in regulated markets",
            "forage task",
            &metadata,
            Some("openrouter/z-ai/glm-5".to_string()),
        );

        assert_eq!(
            args.moonshots,
            vec!["Expand enterprise adoption in regulated markets".to_string()]
        );
        assert!(args.no_s3);
        assert_eq!(args.model.as_deref(), Some("openrouter/z-ai/glm-5"));
        assert_eq!(args.execution_engine, "run");
    }

    /// Compaction keeps name and description, opens the schema with
    /// `additionalProperties: true`, and shrinks a ~20 KB schema by more than 10x.
    #[test]
    fn compact_worker_tool_definitions_preserve_tool_identity() {
        let tools = vec![ToolDefinition {
            name: "large_tool".to_string(),
            description: "Large tool schema".to_string(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "payload": {
                        "type": "string",
                        "description": "x".repeat(20_000)
                    }
                }
            }),
        }];

        let compact = compact_worker_tool_definitions(&tools);

        assert_eq!(compact[0].name, tools[0].name);
        assert_eq!(compact[0].description, tools[0].description);
        assert_eq!(
            compact[0].parameters.get("additionalProperties"),
            Some(&serde_json::Value::Bool(true))
        );
        assert!(tool_schema_bytes(&compact) < tool_schema_bytes(&tools) / 10);
    }

    /// The worker step recovers from repeated context-window errors by
    /// eventually retrying with compacted tool schemas.
    #[tokio::test]
    async fn worker_retries_context_window_error_with_compact_tool_schema() {
        let provider = Arc::new(ContextErrorUntilCompactProvider::new());
        let provider_dyn: Arc<dyn Provider> = provider.clone();
        let mut session = Session::new().await.expect("session");
        session.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "Run cronjob \"rule:Retry Failed Conversion Forwards\".".to_string(),
            }],
        });
        let tools = vec![ToolDefinition {
            name: "large_tool".to_string(),
            description: "Large tool schema".to_string(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "payload": {
                        "type": "string",
                        "description": "x".repeat(20_000)
                    }
                }
            }),
        }];

        let response = complete_worker_step_with_context_fallback(
            provider_dyn,
            &session,
            "mock-model",
            "system",
            &tools,
            Some(0.7),
        )
        .await
        .expect("compact tool-schema retry should recover");

        assert_eq!(
            response
                .message
                .content
                .iter()
                .filter_map(|part| match part {
                    ContentPart::Text { text } => Some(text.as_str()),
                    _ => None,
                })
                .collect::<String>(),
            "ok"
        );
        // NOTE(review): 5 pins the exact number of provider attempts made by
        // `complete_worker_step_with_context_fallback` before and including the
        // compact retry; update it if that function's retry ladder changes.
        assert_eq!(provider.calls(), 5);
        assert!(provider.saw_compact_schema());
    }

    /// Every nested `forage` key is honored, including a string-typed
    /// `moonshot_min_alignment` ("0.25") parsed as a float.
    #[test]
    fn build_forage_args_honors_nested_forage_configuration() {
        let metadata = json!({
            "forage": {
                "top": 7,
                "loop": true,
                "max_cycles": 2,
                "execute": true,
                "no_s3": false,
                "moonshots": ["Ship autonomous OKR execution", "Reduce operator toil"],
                "moonshot_required": true,
                "moonshot_min_alignment": "0.25",
                "execution_engine": "swarm",
                "run_timeout_secs": 1200,
                "fail_fast": true,
                "swarm_strategy": "stage",
                "swarm_max_subagents": 4,
                "swarm_max_steps": 42,
                "swarm_subagent_timeout_secs": 180
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        let args = build_forage_args("fallback prompt", "title", &metadata, None);

        assert_eq!(args.top, 7);
        assert!(args.loop_mode);
        assert_eq!(args.max_cycles, 2);
        assert!(args.execute);
        assert!(!args.no_s3);
        assert_eq!(args.moonshots.len(), 2);
        assert!(args.moonshot_required);
        assert!((args.moonshot_min_alignment - 0.25).abs() < f64::EPSILON);
        assert_eq!(args.execution_engine, "swarm");
        assert_eq!(args.run_timeout_secs, 1200);
        assert!(args.fail_fast);
        assert_eq!(args.swarm_strategy, "stage");
        assert_eq!(args.swarm_max_subagents, 4);
        assert_eq!(args.swarm_max_steps, 42);
        assert_eq!(args.swarm_subagent_timeout_secs, 180);
    }

    /// A `post_clone_task` from a `github-app` source is enqueued.
    #[test]
    fn worker_allows_github_app_post_clone_followups() {
        let metadata = json!({
            "source": "github-app",
            "post_clone_task": {
                "title": "Work issue #76",
                "prompt": "fix it"
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert!(worker_should_enqueue_post_clone_task(&metadata));
    }

    /// A `post_clone_task` with no `source` field is still enqueued.
    #[test]
    fn worker_allows_legacy_post_clone_followups() {
        let metadata = json!({
            "post_clone_task": {
                "title": "Continue after clone",
                "prompt": "run build"
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert!(worker_should_enqueue_post_clone_task(&metadata));
    }

    /// `CODETETHER_WORKER_ID` from the environment wins when set.
    ///
    /// This mutates process-global environment state and restores the previous
    /// value before asserting. The restore does not run if `resolve_worker_id`
    /// panics. NOTE(review): other tests running in parallel that read this
    /// variable could observe the temporary value.
    #[test]
    fn resolve_worker_id_prefers_env() {
        let original = std::env::var("CODETETHER_WORKER_ID").ok();
        unsafe {
            std::env::set_var("CODETETHER_WORKER_ID", "harvester-test-worker");
        }
        let resolved = resolve_worker_id();
        match original {
            Some(value) => unsafe {
                std::env::set_var("CODETETHER_WORKER_ID", value);
            },
            None => unsafe {
                std::env::remove_var("CODETETHER_WORKER_ID");
            },
        }
        assert_eq!(resolved, "harvester-test-worker");
    }

    /// The public URL's trailing slash is dropped, and the bus stream/publish
    /// endpoints are derived from it.
    #[test]
    fn advertised_interfaces_include_http_and_bus_urls() {
        let interfaces = advertised_interfaces(Some("http://worker.test:8080/"));
        assert_eq!(interfaces["http"]["base_url"], "http://worker.test:8080");
        assert_eq!(
            interfaces["bus"]["stream_url"],
            "http://worker.test:8080/v1/bus/stream"
        );
        assert_eq!(
            interfaces["bus"]["publish_url"],
            "http://worker.test:8080/v1/bus/publish"
        );
    }

    /// Missing or whitespace-only public URLs advertise no interfaces.
    #[test]
    fn advertised_interfaces_omit_empty_public_url() {
        assert_eq!(advertised_interfaces(None), serde_json::json!({}));
        assert_eq!(advertised_interfaces(Some("   ")), serde_json::json!({}));
    }

    #[test]
    fn worker_http_client_builds_with_stream_decoders_disabled() {
        assert!(build_worker_http_client().is_ok());
    }

    /// A top-level `task_timeout_seconds` takes precedence over
    /// `metadata.timeout_secs`.
    #[test]
    fn task_timeout_prefers_fire_and_forget_payload_timeout() {
        let task = json!({
            "id": "task-1",
            "task_timeout_seconds": 604800,
            "metadata": {"timeout_secs": 1200}
        });
        let metadata = task_metadata(&task);
        assert_eq!(task_timeout_secs(&task, &metadata), 604800);
    }

    /// With capacity 2, the following sequence holds:
    /// - a new id is reserved;
    /// - a repeated id reports `AlreadyProcessing`;
    /// - a third distinct id reports `AtCapacity`.
    #[tokio::test]
    async fn reserve_task_slot_enforces_capacity() {
        let processing = Arc::new(Mutex::new(HashSet::new()));

        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::AlreadyProcessing
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-2", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-3", 2).await,
            TaskReservation::AtCapacity
        );
    }

    /// Zero is clamped up to one; positive values pass through.
    #[test]
    fn normalize_max_concurrent_tasks_never_returns_zero() {
        assert_eq!(normalize_max_concurrent_tasks(0), 1);
        assert_eq!(normalize_max_concurrent_tasks(3), 3);
    }
}