1mod clone_location;
4mod clone_target;
5mod model_defaults;
6mod model_preferences;
7pub(crate) mod task_timeline;
8pub(super) mod workspace_resolve;
9
10use crate::a2a::claim::TaskClaimResponse;
11use crate::a2a::git_credentials::{
12 configure_repo_git_auth, configure_repo_git_github_app, write_git_credential_helper_script,
13};
14use crate::a2a::worker_workspace_record::{RegisteredWorkspaceRecord, fetch_workspace_record};
15use crate::bus::AgentBus;
16use crate::cli::{A2aArgs, ForageArgs};
17use crate::provenance::{ClaimProvenance, install_commit_msg_hook};
18use crate::provider::ProviderRegistry;
19use crate::session::Session;
20use crate::swarm::{DecompositionStrategy, SwarmConfig, SwarmExecutor};
21use crate::tui::swarm_view::SwarmEvent;
22use anyhow::{Context, Result};
23use futures::StreamExt;
24use reqwest::Client;
25use serde::Deserialize;
26use std::collections::HashMap;
27use std::collections::HashSet;
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::process::Command;
32use tokio::sync::{Mutex, mpsc};
33use tokio::task::JoinHandle;
34use tokio::time::Instant;
35
36use self::model_defaults::{default_model_for_provider, prefers_temperature_one};
37use self::model_preferences::choose_provider_for_tier;
38use crate::a2a::task_scope::check_task_scope;
39use crate::a2a::worker_tool_registry::{create_filtered_registry, is_tool_allowed};
40use crate::a2a::worker_workspace_context::resolve_task_workspace_dir;
41use clone_location::{git_clone_base_dir, resolve_workspace_clone_path};
42use clone_target::prepare_clone_target;
43
/// Coarse activity state of the worker, stored in [`HeartbeatState::status`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkerStatus {
    /// No task is being worked on.
    Idle,
    /// At least one task is being worked on.
    Processing,
}

impl WorkerStatus {
    /// Returns the lowercase wire label for this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Processing => "processing",
            Self::Idle => "idle",
        }
    }
}
59
60#[derive(Clone)]
62pub struct HeartbeatState {
63 worker_id: String,
64 pub agent_name: String,
65 pub status: Arc<Mutex<WorkerStatus>>,
66 pub active_task_count: Arc<Mutex<usize>>,
67 pub sub_agents: Arc<Mutex<HashSet<String>>>,
68}
69
70impl HeartbeatState {
71 pub fn new(worker_id: String, agent_name: String) -> Self {
72 Self {
73 worker_id,
74 agent_name,
75 status: Arc::new(Mutex::new(WorkerStatus::Idle)),
76 active_task_count: Arc::new(Mutex::new(0)),
77 sub_agents: Arc::new(Mutex::new(HashSet::new())),
78 }
79 }
80
81 pub async fn set_status(&self, status: WorkerStatus) {
82 *self.status.lock().await = status;
83 }
84
85 pub async fn set_task_count(&self, count: usize) {
86 *self.active_task_count.lock().await = count;
87 }
88
89 pub async fn register_sub_agent(&self, agent_name: String) {
90 self.sub_agents.lock().await.insert(agent_name);
91 }
92
93 pub async fn deregister_sub_agent(&self, agent_name: &str) {
94 self.sub_agents.lock().await.remove(agent_name);
95 }
96
97 pub async fn sub_agents_snapshot(&self) -> Vec<String> {
98 let mut names = self
99 .sub_agents
100 .lock()
101 .await
102 .iter()
103 .cloned()
104 .collect::<Vec<_>>();
105 names.sort();
106 names
107 }
108}
109
/// Settings for sharing cognition state from the worker, loaded from
/// `CODETETHER_WORKER_COGNITION_*` environment variables by `from_env`.
#[derive(Clone, Debug)]
// NOTE(review): the reason for this allow is not visible here; presumably not
// every field is read in all builds — confirm, then narrow or remove it.
#[allow(dead_code)]
pub struct CognitionHeartbeatConfig {
    /// Whether cognition sharing is turned on.
    pub enabled: bool,
    /// Base URL of the cognition source; `from_env` strips any trailing `/`.
    pub source_base_url: String,
    /// Optional credential for the cognition source. NOTE(review): presumably a
    /// bearer token — confirm against the code that uses it.
    pub token: Option<String>,
    /// Provider name; `from_env` defaults it to `"cognition"`.
    pub provider_name: String,
    /// Interval in seconds; `from_env` floors it at 5.
    pub interval_secs: u64,
    /// Whether a thought summary should be included.
    pub include_thought_summary: bool,
    /// Maximum number of characters kept from the thought summary.
    pub summary_max_chars: usize,
    /// Request timeout in milliseconds; `from_env` floors it at 250.
    pub request_timeout_ms: u64,
}
122
123impl CognitionHeartbeatConfig {
124 pub fn from_env() -> Self {
125 let source_base_url = std::env::var("CODETETHER_WORKER_COGNITION_SOURCE_URL")
126 .unwrap_or_else(|_| "http://127.0.0.1:4096".to_string())
127 .trim_end_matches('/')
128 .to_string();
129
130 Self {
131 enabled: env_bool("CODETETHER_WORKER_COGNITION_SHARE_ENABLED", true),
132 source_base_url,
133 include_thought_summary: env_bool("CODETETHER_WORKER_COGNITION_INCLUDE_THOUGHTS", true),
134 summary_max_chars: env_usize("CODETETHER_WORKER_COGNITION_THOUGHT_MAX_CHARS", 480),
135 request_timeout_ms: env_u64("CODETETHER_WORKER_COGNITION_TIMEOUT_MS", 2_500).max(250),
136 interval_secs: env_u64("CODETETHER_WORKER_COGNITION_INTERVAL_SECS", 30).max(5),
137 provider_name: std::env::var("CODETETHER_WORKER_COGNITION_PROVIDER")
138 .unwrap_or_else(|_| "cognition".to_string()),
139 token: std::env::var("CODETETHER_WORKER_COGNITION_TOKEN").ok(),
140 }
141 }
142}
143
/// Status payload deserialized from the cognition source.
///
/// NOTE(review): presumably the body of a status endpoint under
/// `CognitionHeartbeatConfig::source_base_url` — confirm against its consumer.
#[derive(Debug, Deserialize)]
struct CognitionStatusSnapshot {
    /// Required: deserialization fails if this field is missing.
    running: bool,
    /// Timestamp of the last tick as a raw string; `None` when absent.
    #[serde(default)]
    last_tick_at: Option<String>,
    /// Defaults to 0 when absent.
    #[serde(default)]
    active_persona_count: usize,
    /// Defaults to 0 when absent.
    #[serde(default)]
    events_buffered: usize,
    /// Defaults to 0 when absent.
    #[serde(default)]
    snapshots_buffered: usize,
    /// Loop interval in milliseconds; defaults to 0 when absent.
    #[serde(default)]
    loop_interval_ms: u64,
}
158
/// Latest-snapshot payload deserialized from the cognition source.
#[derive(Debug, Deserialize)]
struct CognitionLatestSnapshot {
    /// Required generation timestamp, kept as a raw string.
    generated_at: String,
    /// Required summary text.
    summary: String,
    /// Arbitrary extra fields; defaults to an empty map when absent.
    #[serde(default)]
    metadata: HashMap<String, serde_json::Value>,
}
166
/// Outcome of `reserve_task_slot`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskReservation {
    /// The task id was added to the processing set.
    Reserved,
    /// The task id was already in the processing set; nothing changed.
    AlreadyProcessing,
    /// The processing set already held `max_concurrent_tasks` ids; nothing changed.
    AtCapacity,
}
173
/// Cloneable context shared by task-handling code.
///
/// Built once in `run` / `run_with_state` and passed to `fetch_pending_tasks`
/// and `connect_stream`.
#[derive(Clone)]
struct WorkerTaskRuntime {
    /// HTTP client for control-plane requests.
    client: Client,
    /// Control-plane base URL; callers pass it with trailing `/` removed.
    server: String,
    /// Optional control-plane token (sent as bearer auth by `register_worker`).
    token: Option<String>,
    /// This worker's id, from `resolve_worker_id`.
    worker_id: String,
    /// Worker name; `run` defaults it to `codetether-<pid>`.
    agent_name: String,
    /// Ids of tasks currently reserved by this worker; see `reserve_task_slot`.
    processing: Arc<Mutex<HashSet<String>>>,
    /// Capacity of `processing`; callers normalize it to at least 1.
    max_concurrent_tasks: usize,
    /// Tool auto-approval policy.
    auto_approve: AutoApprove,
    /// Agent bus shared with the rest of the worker.
    bus: Arc<AgentBus>,
    /// Latest task-progress snapshot; overwritten by `sync_timeline_to_runtime`.
    task_progress: Arc<Mutex<task_timeline::TaskProgressState>>,
    /// Server-side workspace ids. NOTE(review): `run`/`run_with_state` start
    /// this empty — confirm where, if anywhere, it is populated.
    workspace_ids: Vec<String>,
}
190
191pub async fn run(args: A2aArgs) -> Result<()> {
193 let server = args.server.trim_end_matches('/');
194 let name = args
195 .name
196 .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
197 let worker_id = resolve_worker_id();
198 export_worker_runtime_env(server, &args.token, &worker_id);
199 let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
200
201 let codebases: Vec<String> = args
202 .workspaces
203 .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
204 .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
205
206 tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
207 tracing::info!("Server: {}", server);
208 tracing::info!("Workspaces: {:?}", codebases);
209 tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
210
211 let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
213
214 let client = build_worker_http_client()?;
215 let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
216 let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
217 if cognition_heartbeat.enabled {
218 tracing::info!(
219 source = %cognition_heartbeat.source_base_url,
220 include_thoughts = cognition_heartbeat.include_thought_summary,
221 max_chars = cognition_heartbeat.summary_max_chars,
222 timeout_ms = cognition_heartbeat.request_timeout_ms,
223 "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
224 );
225 } else {
226 tracing::warn!(
227 "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
228 );
229 }
230
231 let auto_approve = match args.auto_approve.as_str() {
232 "all" => AutoApprove::All,
233 "safe" => AutoApprove::Safe,
234 _ => AutoApprove::None,
235 };
236
237 let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
239
240 let bus = AgentBus::new().into_arc();
242
243 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
245
246 {
247 let handle = bus.handle(&worker_id);
248 handle.announce_ready(worker_capabilities());
249 }
250
251 let task_progress: Arc<Mutex<task_timeline::TaskProgressState>> =
252 Arc::new(Mutex::new(task_timeline::TaskProgressState::new()));
253
254 let task_runtime = WorkerTaskRuntime {
255 client: client.clone(),
256 server: server.to_string(),
257 token: args.token.clone(),
258 worker_id: worker_id.clone(),
259 agent_name: name.clone(),
260 processing: processing.clone(),
261 max_concurrent_tasks,
262 auto_approve,
263 bus: bus.clone(),
264 task_progress: task_progress.clone(),
265 workspace_ids: Vec::new(),
266 };
267
268 {
270 let codebases = shared_codebases.lock().await.clone();
271 register_worker(
272 &client,
273 server,
274 &args.token,
275 &worker_id,
276 &name,
277 &codebases,
278 args.public_url.as_deref(),
279 )
280 .await?;
281 }
282
283 if let Err(e) =
284 sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
285 {
286 tracing::warn!(error = %e, "Initial workspace sync failed");
287 }
288
289 fetch_pending_tasks(&task_runtime).await?;
291
292 let _workspace_sync_handle = start_workspace_sync(
294 client.clone(),
295 server.to_string(),
296 args.token.clone(),
297 shared_codebases.clone(),
298 );
299
300 loop {
302 let codebases = shared_codebases.lock().await.clone();
304
305 if let Err(e) = register_worker(
307 &client,
308 server,
309 &args.token,
310 &worker_id,
311 &name,
312 &codebases,
313 args.public_url.as_deref(),
314 )
315 .await
316 {
317 tracing::warn!("Failed to re-register worker on reconnection: {}", e);
318 }
319 if let Err(e) = fetch_pending_tasks(&task_runtime).await {
320 tracing::warn!("Reconnect task fetch failed: {}", e);
321 }
322
323 let heartbeat_handle = start_heartbeat(
325 client.clone(),
326 server.to_string(),
327 args.token.clone(),
328 heartbeat_state.clone(),
329 processing.clone(),
330 cognition_heartbeat.clone(),
331 task_progress.clone(),
332 );
333
334 match connect_stream(&task_runtime, &name, &codebases, None).await {
335 Ok(StreamDisconnectReason::Ended) => {
336 tracing::warn!("Stream ended, reconnecting...");
337 }
338 Ok(StreamDisconnectReason::ReadError(error)) => {
339 tracing::warn!(error = %error, "Stream read failed, reconnecting...");
340 }
341 Err(e) => {
342 tracing::error!("Stream error: {}, reconnecting...", e);
343 }
344 }
345
346 heartbeat_handle.abort();
348 tracing::debug!("Heartbeat cancelled for reconnection");
349
350 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
351 }
352}
353
354pub async fn run_with_state(
357 args: A2aArgs,
358 server_state: crate::worker_server::WorkerServerState,
359) -> Result<()> {
360 let server = args.server.trim_end_matches('/');
361 let name = args
362 .name
363 .unwrap_or_else(|| format!("codetether-{}", std::process::id()));
364 let worker_id = resolve_worker_id();
365 export_worker_runtime_env(server, &args.token, &worker_id);
366 let max_concurrent_tasks = normalize_max_concurrent_tasks(args.max_concurrent_tasks);
367
368 server_state.set_worker_id(worker_id.clone()).await;
370
371 let codebases: Vec<String> = args
372 .workspaces
373 .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
374 .unwrap_or_else(|| vec![std::env::current_dir().unwrap().display().to_string()]);
375
376 tracing::info!("Starting A2A worker: {} ({})", name, worker_id);
377 tracing::info!("Server: {}", server);
378 tracing::info!("Workspaces: {:?}", codebases);
379 tracing::info!(max_concurrent_tasks, "Worker task concurrency configured");
380
381 let shared_codebases: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(codebases));
383
384 let client = build_worker_http_client()?;
385 let processing = Arc::new(Mutex::new(HashSet::<String>::new()));
386 let cognition_heartbeat = CognitionHeartbeatConfig::from_env();
387 if cognition_heartbeat.enabled {
388 tracing::info!(
389 source = %cognition_heartbeat.source_base_url,
390 include_thoughts = cognition_heartbeat.include_thought_summary,
391 max_chars = cognition_heartbeat.summary_max_chars,
392 timeout_ms = cognition_heartbeat.request_timeout_ms,
393 "Cognition heartbeat sharing enabled (set CODETETHER_WORKER_COGNITION_SHARE_ENABLED=false to disable)"
394 );
395 } else {
396 tracing::warn!(
397 "Cognition heartbeat sharing disabled; worker thought state will not be shared upstream"
398 );
399 }
400
401 let auto_approve = match args.auto_approve.as_str() {
402 "all" => AutoApprove::All,
403 "safe" => AutoApprove::Safe,
404 _ => AutoApprove::None,
405 };
406
407 let heartbeat_state = HeartbeatState::new(worker_id.clone(), name.clone());
409
410 server_state
412 .set_heartbeat_state(Arc::new(heartbeat_state.clone()))
413 .await;
414
415 let bus = AgentBus::new().into_arc();
417 server_state.set_bus(bus.clone()).await;
418
419 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
421
422 {
423 let handle = bus.handle(&worker_id);
424 handle.announce_ready(worker_capabilities());
425 }
426
427 let task_progress_ws: Arc<Mutex<task_timeline::TaskProgressState>> =
428 Arc::new(Mutex::new(task_timeline::TaskProgressState::new()));
429
430 let task_runtime = WorkerTaskRuntime {
431 client: client.clone(),
432 server: server.to_string(),
433 token: args.token.clone(),
434 worker_id: worker_id.clone(),
435 agent_name: name.clone(),
436 processing: processing.clone(),
437 max_concurrent_tasks,
438 auto_approve,
439 bus: bus.clone(),
440 task_progress: task_progress_ws.clone(),
441 workspace_ids: Vec::new(),
442 };
443
444 {
446 let codebases = shared_codebases.lock().await.clone();
447 register_worker(
448 &client,
449 server,
450 &args.token,
451 &worker_id,
452 &name,
453 &codebases,
454 args.public_url.as_deref(),
455 )
456 .await?;
457 }
458
459 if let Err(e) =
460 sync_workspaces_from_server(&client, server, &args.token, &shared_codebases).await
461 {
462 tracing::warn!(error = %e, "Initial workspace sync failed");
463 }
464
465 server_state.set_connected(true).await;
467
468 fetch_pending_tasks(&task_runtime).await?;
470
471 let _workspace_sync_handle = start_workspace_sync(
473 client.clone(),
474 server.to_string(),
475 args.token.clone(),
476 shared_codebases.clone(),
477 );
478
479 loop {
481 let codebases = shared_codebases.lock().await.clone();
483
484 let (task_notify_tx, task_notify_rx) = mpsc::channel::<String>(32);
487 server_state
488 .set_task_notification_channel(task_notify_tx)
489 .await;
490
491 server_state.set_connected(true).await;
493
494 if let Err(e) = register_worker(
496 &client,
497 server,
498 &args.token,
499 &worker_id,
500 &name,
501 &codebases,
502 args.public_url.as_deref(),
503 )
504 .await
505 {
506 tracing::warn!("Failed to re-register worker on reconnection: {}", e);
507 }
508 if let Err(e) = fetch_pending_tasks(&task_runtime).await {
509 tracing::warn!("Reconnect task fetch failed: {}", e);
510 }
511
512 let heartbeat_handle = start_heartbeat(
514 client.clone(),
515 server.to_string(),
516 args.token.clone(),
517 heartbeat_state.clone(),
518 processing.clone(),
519 cognition_heartbeat.clone(),
520 task_progress_ws.clone(),
521 );
522
523 match connect_stream(&task_runtime, &name, &codebases, Some(task_notify_rx)).await {
524 Ok(StreamDisconnectReason::Ended) => {
525 tracing::warn!("Stream ended, reconnecting...");
526 }
527 Ok(StreamDisconnectReason::ReadError(error)) => {
528 tracing::warn!(error = %error, "Stream read failed, reconnecting...");
529 }
530 Err(e) => {
531 tracing::error!("Stream error: {}, reconnecting...", e);
532 }
533 }
534
535 server_state.set_connected(false).await;
537
538 heartbeat_handle.abort();
540 tracing::debug!("Heartbeat cancelled for reconnection");
541
542 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
543 }
544}
545
546pub fn generate_worker_id() -> String {
547 format!(
548 "wrk_{}_{:x}",
549 chrono::Utc::now().timestamp(),
550 rand::random::<u64>()
551 )
552}
553
554fn build_worker_http_client() -> Result<Client> {
555 Client::builder()
556 .no_gzip()
557 .no_brotli()
558 .no_zstd()
559 .no_deflate()
560 .build()
561 .context("Failed to build worker HTTP client")
562}
563
564fn resolve_worker_id() -> String {
565 for key in ["CODETETHER_WORKER_ID", "A2A_WORKER_ID"] {
566 if let Ok(value) = std::env::var(key) {
567 let trimmed = value.trim();
568 if !trimmed.is_empty() {
569 return trimmed.to_string();
570 }
571 }
572 }
573 generate_worker_id()
574}
575
/// Floors the configured concurrency at one; a value of zero would mean the
/// worker could never reserve a task slot.
fn normalize_max_concurrent_tasks(max_concurrent_tasks: usize) -> usize {
    if max_concurrent_tasks == 0 {
        1
    } else {
        max_concurrent_tasks
    }
}
579
/// Reports whether `PERSISTENT_WORKER_ENABLED` is set to a truthy value.
///
/// Accepts `1`, `true`, `yes` and `on`, case-insensitively and ignoring
/// surrounding whitespace. An unset or unreadable variable counts as `false`.
fn persistent_worker_enabled() -> bool {
    let Ok(raw) = std::env::var("PERSISTENT_WORKER_ENABLED") else {
        return false;
    };
    let normalized = raw.trim().to_ascii_lowercase();
    ["1", "true", "yes", "on"].contains(&normalized.as_str())
}
590
/// Returns the persistent-worker lease length in seconds.
///
/// Reads `PERSISTENT_WORKER_LEASE_SECONDS` (trimmed). The value is used only if
/// it parses as a positive integer; otherwise 3600 is returned.
fn persistent_worker_lease_seconds() -> u64 {
    const DEFAULT_LEASE_SECONDS: u64 = 3600;
    match std::env::var("PERSISTENT_WORKER_LEASE_SECONDS") {
        Ok(raw) => match raw.trim().parse::<u64>() {
            Ok(seconds) if seconds > 0 => seconds,
            _ => DEFAULT_LEASE_SECONDS,
        },
        Err(_) => DEFAULT_LEASE_SECONDS,
    }
}
598
599async fn reserve_task_slot(
600 processing: &Arc<Mutex<HashSet<String>>>,
601 task_id: &str,
602 max_concurrent_tasks: usize,
603) -> TaskReservation {
604 let mut proc = processing.lock().await;
605 if proc.contains(task_id) {
606 TaskReservation::AlreadyProcessing
607 } else if proc.len() >= max_concurrent_tasks {
608 TaskReservation::AtCapacity
609 } else {
610 proc.insert(task_id.to_string());
611 TaskReservation::Reserved
612 }
613}
614
/// Publishes the worker's connection details as process environment variables.
///
/// The following variables are set, in this order:
/// - `CODETETHER_SERVER`
/// - `CODETETHER_WORKER_ID`
/// - `CODETETHER_TOKEN`, only when `token` is `Some`
///
/// When `token` is `None`, any existing `CODETETHER_TOKEN` is left untouched.
fn export_worker_runtime_env(server: &str, token: &Option<String>, worker_id: &str) {
    let mut exports: Vec<(&str, &str)> = vec![
        ("CODETETHER_SERVER", server),
        ("CODETETHER_WORKER_ID", worker_id),
    ];
    if let Some(token) = token.as_deref() {
        exports.push(("CODETETHER_TOKEN", token));
    }
    for (key, value) in exports {
        // SAFETY: `set_var` is only sound when no other thread reads or writes
        // the environment concurrently. This runs at the start of
        // `run`/`run_with_state`, before the worker spawns its own tasks.
        // NOTE(review): async runtime threads may already exist — confirm that
        // nothing touches the environment concurrently.
        unsafe { std::env::set_var(key, value) };
    }
}
627
628fn advertised_interfaces(public_url: Option<&str>) -> serde_json::Value {
629 let Some(base_url) = public_url
630 .map(str::trim)
631 .filter(|value| !value.is_empty())
632 .map(|value| value.trim_end_matches('/').to_string())
633 else {
634 return serde_json::json!({});
635 };
636
637 serde_json::json!({
638 "http": {
639 "base_url": base_url,
640 },
641 "bus": {
642 "stream_url": format!("{base_url}/v1/bus/stream"),
643 "publish_url": format!("{base_url}/v1/bus/publish"),
644 },
645 })
646}
647
/// Tool auto-approval policy.
///
/// `run`/`run_with_state` map `A2aArgs::auto_approve` as follows: `"all"` →
/// `All`, `"safe"` → `Safe`, any other string → `None`.
#[derive(Debug, Clone, Copy)]
pub enum AutoApprove {
    /// Approve every tool call.
    All,
    /// Approve only calls considered safe. NOTE(review): the safety criteria
    /// live outside this chunk — confirm.
    Safe,
    /// Approve nothing automatically.
    None,
}
664
/// Default A2A control-plane server URL.
pub const DEFAULT_A2A_SERVER_URL: &str = "https://api.codetether.run";

/// Capabilities every worker advertises. `worker_capabilities` may append
/// environment-dependent entries after these.
const BASE_WORKER_CAPABILITIES: &[&str] = &[
    "forage",
    "ralph",
    "swarm",
    "rlm",
    "a2a",
    "mcp",
    "grpc",
    "grpc-web",
    "jsonrpc",
];
672
673fn worker_capabilities() -> Vec<String> {
674 let mut capabilities: Vec<String> = BASE_WORKER_CAPABILITIES
675 .iter()
676 .map(|capability| capability.to_string())
677 .collect();
678
679 let is_knative = std::env::var("KNATIVE_SERVICE")
680 .map(|value| {
681 let normalized = value.trim().to_lowercase();
682 normalized == "1" || normalized == "true" || normalized == "yes"
683 })
684 .unwrap_or(false);
685 if is_knative {
686 capabilities.push("knative".to_string());
687 }
688
689 capabilities
690}
691
692async fn sync_timeline_to_runtime(
694 timeline: &task_timeline::TaskTimeline,
695 runtime: &WorkerTaskRuntime,
696) {
697 timeline.sync_progress().await;
698 let state = timeline.progress_handle().lock().await.clone();
699 *runtime.task_progress.lock().await = state;
700}
701
702async fn resolve_and_log_workspace_ids(
704 client: &Client,
705 server: &str,
706 token: &Option<String>,
707 workspace_roots: &[String],
708) -> Vec<String> {
709 match workspace_resolve::resolve_workspace_ids(client, server, token, workspace_roots).await {
710 Ok(resolved) => {
711 if resolved.is_empty() {
712 tracing::warn!(
713 roots = ?workspace_roots,
714 "No server-side workspace IDs resolved for configured roots \
715 — the server may not route tasks to this worker. \
716 Ensure workspaces are registered with the control plane \
717 and that their paths fall under the configured roots."
718 );
719 } else {
720 let ids: Vec<String> = resolved.iter().map(|ws| ws.id.clone()).collect();
721 tracing::info!(
722 workspace_ids = ?ids,
723 count = resolved.len(),
724 "Resolved server-side workspace IDs for configured roots"
725 );
726 }
727 resolved.iter().map(|ws| ws.id.clone()).collect()
728 }
729 Err(e) => {
730 tracing::warn!(error = %e, "Failed to resolve workspace IDs from server");
731 Vec::new()
732 }
733 }
734}
735
736fn task_value<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a serde_json::Value> {
737 task.get("task")
738 .and_then(|t| t.get(key))
739 .or_else(|| task.get(key))
740}
741
742fn task_str<'a>(task: &'a serde_json::Value, key: &str) -> Option<&'a str> {
743 task_value(task, key).and_then(|v| v.as_str())
744}
745
746fn task_u64(task: &serde_json::Value, key: &str) -> Option<u64> {
747 let value = task_value(task, key)?;
748 if let Some(v) = value.as_u64() {
749 return Some(v);
750 }
751 if let Some(v) = value.as_i64()
752 && v >= 0
753 {
754 return Some(v as u64);
755 }
756 if let Some(v) = value.as_str()
757 && let Ok(parsed) = v.trim().parse::<u64>()
758 {
759 return Some(parsed);
760 }
761 None
762}
763
764fn task_metadata(task: &serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
765 task_value(task, "metadata")
766 .and_then(|m| m.as_object())
767 .cloned()
768 .unwrap_or_default()
769}
770
771fn task_timeout_secs(
772 task: &serde_json::Value,
773 metadata: &serde_json::Map<String, serde_json::Value>,
774) -> u64 {
775 task_u64(task, "task_timeout_seconds")
776 .or_else(|| task_u64(task, "timeout_secs"))
777 .or_else(|| {
778 metadata_u64(
779 metadata,
780 &["task_timeout_seconds", "timeout_secs", "timeout"],
781 )
782 })
783 .unwrap_or(1200)
784 .clamp(60, 604800)
785}
786
/// Converts a `provider:model` reference into `provider/model` form.
///
/// Only the first `:` is replaced. Strings that already contain `/`, or that
/// contain no `:`, are returned unchanged.
fn model_ref_to_provider_model(model: &str) -> String {
    let already_slashed = model.contains('/');
    match model.split_once(':') {
        Some((provider, rest)) if !already_slashed => format!("{provider}/{rest}"),
        _ => model.to_owned(),
    }
}
797
/// Reports whether `agent_type` names a swarm-style agent (`swarm`,
/// `parallel` or `multi-agent`, ASCII case-insensitive, trimmed).
fn is_swarm_agent(agent_type: &str) -> bool {
    let kind = agent_type.trim();
    ["swarm", "parallel", "multi-agent"]
        .iter()
        .any(|alias| kind.eq_ignore_ascii_case(alias))
}
804
/// Reports whether `agent_type` is `forage` (ASCII case-insensitive, trimmed).
fn is_forage_agent(agent_type: &str) -> bool {
    agent_type.trim().to_ascii_lowercase() == "forage"
}
808
809fn metadata_lookup<'a>(
810 metadata: &'a serde_json::Map<String, serde_json::Value>,
811 key: &str,
812) -> Option<&'a serde_json::Value> {
813 metadata
814 .get(key)
815 .or_else(|| {
816 metadata
817 .get("routing")
818 .and_then(|v| v.as_object())
819 .and_then(|obj| obj.get(key))
820 })
821 .or_else(|| {
822 metadata
823 .get("swarm")
824 .and_then(|v| v.as_object())
825 .and_then(|obj| obj.get(key))
826 })
827 .or_else(|| {
828 metadata
829 .get("forage")
830 .and_then(|v| v.as_object())
831 .and_then(|obj| obj.get(key))
832 })
833}
834
835fn metadata_str(
836 metadata: &serde_json::Map<String, serde_json::Value>,
837 keys: &[&str],
838) -> Option<String> {
839 for key in keys {
840 if let Some(value) = metadata_lookup(metadata, key).and_then(|v| v.as_str()) {
841 let trimmed = value.trim();
842 if !trimmed.is_empty() {
843 return Some(trimmed.to_string());
844 }
845 }
846 }
847 None
848}
849
850fn metadata_usize(
851 metadata: &serde_json::Map<String, serde_json::Value>,
852 keys: &[&str],
853) -> Option<usize> {
854 for key in keys {
855 if let Some(value) = metadata_lookup(metadata, key) {
856 if let Some(v) = value.as_u64() {
857 return usize::try_from(v).ok();
858 }
859 if let Some(v) = value.as_i64()
860 && v >= 0
861 {
862 return usize::try_from(v as u64).ok();
863 }
864 if let Some(v) = value.as_str()
865 && let Ok(parsed) = v.trim().parse::<usize>()
866 {
867 return Some(parsed);
868 }
869 }
870 }
871 None
872}
873
874fn metadata_u64(
875 metadata: &serde_json::Map<String, serde_json::Value>,
876 keys: &[&str],
877) -> Option<u64> {
878 for key in keys {
879 if let Some(value) = metadata_lookup(metadata, key) {
880 if let Some(v) = value.as_u64() {
881 return Some(v);
882 }
883 if let Some(v) = value.as_i64()
884 && v >= 0
885 {
886 return Some(v as u64);
887 }
888 if let Some(v) = value.as_str()
889 && let Ok(parsed) = v.trim().parse::<u64>()
890 {
891 return Some(parsed);
892 }
893 }
894 }
895 None
896}
897
898fn metadata_bool(
899 metadata: &serde_json::Map<String, serde_json::Value>,
900 keys: &[&str],
901) -> Option<bool> {
902 for key in keys {
903 if let Some(value) = metadata_lookup(metadata, key) {
904 if let Some(v) = value.as_bool() {
905 return Some(v);
906 }
907 if let Some(v) = value.as_str() {
908 match v.trim().to_ascii_lowercase().as_str() {
909 "1" | "true" | "yes" | "on" => return Some(true),
910 "0" | "false" | "no" | "off" => return Some(false),
911 _ => {}
912 }
913 }
914 }
915 }
916 None
917}
918
919fn metadata_f64(
920 metadata: &serde_json::Map<String, serde_json::Value>,
921 keys: &[&str],
922) -> Option<f64> {
923 for key in keys {
924 if let Some(value) = metadata_lookup(metadata, key) {
925 if let Some(v) = value.as_f64() {
926 return Some(v);
927 }
928 if let Some(v) = value.as_i64() {
929 return Some(v as f64);
930 }
931 if let Some(v) = value.as_u64() {
932 return Some(v as f64);
933 }
934 if let Some(v) = value.as_str()
935 && let Ok(parsed) = v.trim().parse::<f64>()
936 {
937 return Some(parsed);
938 }
939 }
940 }
941 None
942}
943
944fn metadata_string_list(
945 metadata: &serde_json::Map<String, serde_json::Value>,
946 keys: &[&str],
947) -> Vec<String> {
948 for key in keys {
949 let Some(value) = metadata_lookup(metadata, key) else {
950 continue;
951 };
952
953 if let Some(items) = value.as_array() {
954 let parsed = items
955 .iter()
956 .filter_map(|item| item.as_str())
957 .map(str::trim)
958 .filter(|item| !item.is_empty())
959 .map(ToString::to_string)
960 .collect::<Vec<_>>();
961 if !parsed.is_empty() {
962 return parsed;
963 }
964 }
965
966 if let Some(value) = value.as_str() {
967 let parsed = value
968 .split(['\n', ','])
969 .map(str::trim)
970 .filter(|item| !item.is_empty())
971 .map(ToString::to_string)
972 .collect::<Vec<_>>();
973 if !parsed.is_empty() {
974 return parsed;
975 }
976 }
977 }
978
979 Vec::new()
980}
981
/// Maps a requested forage execution engine to `"swarm"`, `"go"` or `"run"`.
///
/// Matching is case-insensitive and trimmed. Missing or unrecognized values
/// map to `"run"`.
fn normalize_forage_execution_engine(value: Option<String>) -> String {
    let requested = value.map(|raw| raw.trim().to_ascii_lowercase());
    let engine = match requested.as_deref() {
        Some("swarm") => "swarm",
        Some("go") => "go",
        _ => "run",
    };
    engine.to_string()
}
995
/// Maps a requested forage swarm strategy to `"domain"`, `"data"`, `"stage"`,
/// `"none"` or `"auto"`.
///
/// Matching is case-insensitive and trimmed. Missing or unrecognized values
/// map to `"auto"`.
fn normalize_forage_swarm_strategy(value: Option<String>) -> String {
    const RECOGNIZED: [&str; 4] = ["domain", "data", "stage", "none"];
    value
        .map(|raw| raw.trim().to_ascii_lowercase())
        .filter(|candidate| RECOGNIZED.contains(&candidate.as_str()))
        .unwrap_or_else(|| String::from("auto"))
}
1011
1012fn build_forage_args(
1013 prompt: &str,
1014 title: &str,
1015 metadata: &serde_json::Map<String, serde_json::Value>,
1016 selected_model: Option<String>,
1017) -> ForageArgs {
1018 let mut moonshots = metadata_string_list(
1019 metadata,
1020 &["moonshots", "moonshot", "goals", "mission", "missions"],
1021 );
1022 if moonshots.is_empty() {
1023 let fallback = prompt.trim();
1024 if !fallback.is_empty() {
1025 moonshots.push(fallback.to_string());
1026 } else if !title.trim().is_empty() {
1027 moonshots.push(title.trim().to_string());
1028 }
1029 }
1030
1031 ForageArgs {
1032 top: metadata_usize(metadata, &["top"]).unwrap_or(3),
1033 loop_mode: metadata_bool(metadata, &["loop", "loop_mode"]).unwrap_or(false),
1034 interval_secs: metadata_u64(metadata, &["interval_secs", "interval"]).unwrap_or(120),
1035 max_cycles: metadata_usize(metadata, &["max_cycles"]).unwrap_or(0),
1036 execute: metadata_bool(metadata, &["execute"]).unwrap_or(false),
1037 no_s3: metadata_bool(metadata, &["no_s3", "local_only"]).unwrap_or(true),
1039 moonshots,
1040 moonshot_file: metadata_str(metadata, &["moonshot_file"]).map(PathBuf::from),
1041 moonshot_required: metadata_bool(metadata, &["moonshot_required"]).unwrap_or(false),
1042 moonshot_min_alignment: metadata_f64(metadata, &["moonshot_min_alignment"]).unwrap_or(0.10),
1043 execution_engine: normalize_forage_execution_engine(metadata_str(
1044 metadata,
1045 &["execution_engine", "engine"],
1046 )),
1047 run_timeout_secs: metadata_u64(metadata, &["run_timeout_secs", "timeout_secs", "timeout"])
1048 .unwrap_or(900),
1049 fail_fast: metadata_bool(metadata, &["fail_fast"]).unwrap_or(false),
1050 swarm_strategy: normalize_forage_swarm_strategy(metadata_str(
1051 metadata,
1052 &["swarm_strategy", "strategy"],
1053 )),
1054 swarm_max_subagents: metadata_usize(metadata, &["swarm_max_subagents"]).unwrap_or(8),
1055 swarm_max_steps: metadata_usize(metadata, &["swarm_max_steps"]).unwrap_or(100),
1056 swarm_subagent_timeout_secs: metadata_u64(metadata, &["swarm_subagent_timeout_secs"])
1057 .unwrap_or(300),
1058 model: selected_model,
1059 json: metadata_bool(metadata, &["json"]).unwrap_or(false),
1060 }
1061}
1062
1063fn parse_swarm_strategy(
1064 metadata: &serde_json::Map<String, serde_json::Value>,
1065) -> DecompositionStrategy {
1066 match metadata_str(
1067 metadata,
1068 &[
1069 "decomposition_strategy",
1070 "swarm_strategy",
1071 "strategy",
1072 "swarm_decomposition",
1073 ],
1074 )
1075 .as_deref()
1076 .map(|s| s.to_ascii_lowercase())
1077 .as_deref()
1078 {
1079 Some("none") | Some("single") => DecompositionStrategy::None,
1080 Some("domain") | Some("by_domain") => DecompositionStrategy::ByDomain,
1081 Some("data") | Some("by_data") => DecompositionStrategy::ByData,
1082 Some("stage") | Some("by_stage") => DecompositionStrategy::ByStage,
1083 _ => DecompositionStrategy::Automatic,
1084 }
1085}
1086
1087async fn resolve_swarm_model(
1088 explicit_model: Option<String>,
1089 model_tier: Option<&str>,
1090) -> Option<String> {
1091 if let Some(model) = explicit_model
1092 && !model.trim().is_empty()
1093 {
1094 return Some(model);
1095 }
1096
1097 let registry = ProviderRegistry::shared_from_vault().await.ok()?;
1098 let providers = registry.list();
1099 if providers.is_empty() {
1100 return None;
1101 }
1102 let provider = choose_provider_for_tier(providers.as_slice(), model_tier);
1103 let model = default_model_for_provider(provider, model_tier);
1104 Some(format!("{}/{}", provider, model))
1105}
1106
1107fn format_swarm_event_for_output(event: &SwarmEvent) -> Option<String> {
1108 match event {
1109 SwarmEvent::Started {
1110 task,
1111 total_subtasks,
1112 } => Some(format!(
1113 "[swarm] started task={} planned_subtasks={}",
1114 task, total_subtasks
1115 )),
1116 SwarmEvent::StageComplete {
1117 stage,
1118 completed,
1119 failed,
1120 } => Some(format!(
1121 "[swarm] stage={} completed={} failed={}",
1122 stage, completed, failed
1123 )),
1124 SwarmEvent::SubTaskUpdate { id, status, .. } => Some(format!(
1125 "[swarm] subtask id={} status={}",
1126 &id.chars().take(8).collect::<String>(),
1127 format!("{status:?}").to_ascii_lowercase()
1128 )),
1129 SwarmEvent::AgentToolCall {
1130 subtask_id,
1131 tool_name,
1132 } => Some(format!(
1133 "[swarm] subtask id={} tool={}",
1134 &subtask_id.chars().take(8).collect::<String>(),
1135 tool_name
1136 )),
1137 SwarmEvent::AgentError { subtask_id, error } => Some(format!(
1138 "[swarm] subtask id={} error={}",
1139 &subtask_id.chars().take(8).collect::<String>(),
1140 error
1141 )),
1142 SwarmEvent::Complete { success, stats } => Some(format!(
1143 "[swarm] complete success={} subtasks={} speedup={:.2}",
1144 success,
1145 stats.subagents_completed + stats.subagents_failed,
1146 stats.speedup_factor
1147 )),
1148 SwarmEvent::Error(err) => Some(format!("[swarm] error message={}", err)),
1149 _ => None,
1150 }
1151}
1152
1153pub async fn register_worker(
1154 client: &Client,
1155 server: &str,
1156 token: &Option<String>,
1157 worker_id: &str,
1158 name: &str,
1159 codebases: &[String],
1160 public_url: Option<&str>,
1161) -> Result<()> {
1162 let models = match load_provider_models().await {
1164 Ok(m) => m,
1165 Err(e) => {
1166 tracing::warn!(
1167 "Failed to load provider models: {}, proceeding without model info",
1168 e
1169 );
1170 HashMap::new()
1171 }
1172 };
1173
1174 let mut req = client.post(format!("{}/v1/agent/workers/register", server));
1176
1177 if let Some(t) = token {
1178 req = req.bearer_auth(t);
1179 }
1180
1181 let models_array: Vec<serde_json::Value> = models
1184 .iter()
1185 .flat_map(|(provider, model_infos)| {
1186 model_infos.iter().map(move |m| {
1187 let mut obj = serde_json::json!({
1188 "id": format!("{}/{}", provider, m.id),
1189 "name": &m.id,
1190 "provider": provider,
1191 "provider_id": provider,
1192 });
1193 if let Some(input_cost) = m.input_cost_per_million {
1194 obj["input_cost_per_million"] = serde_json::json!(input_cost);
1195 }
1196 if let Some(output_cost) = m.output_cost_per_million {
1197 obj["output_cost_per_million"] = serde_json::json!(output_cost);
1198 }
1199 obj
1200 })
1201 })
1202 .collect();
1203
1204 tracing::info!(
1205 "Registering worker with {} models from {} providers",
1206 models_array.len(),
1207 models.len()
1208 );
1209
1210 let hostname = std::env::var("HOSTNAME")
1211 .or_else(|_| std::env::var("COMPUTERNAME"))
1212 .unwrap_or_else(|_| "unknown".to_string());
1213 let k8s_node_name = std::env::var("K8S_NODE_NAME")
1214 .ok()
1215 .map(|value| value.trim().to_string())
1216 .filter(|value| !value.is_empty());
1217
1218 let registry = crate::agent::AgentRegistry::with_builtins();
1220 let agent_defs: Vec<serde_json::Value> = registry
1221 .list()
1222 .iter()
1223 .map(|info| {
1224 serde_json::json!({
1225 "name": info.name,
1226 "description": info.description,
1227 "mode": format!("{:?}", info.mode).to_lowercase(),
1228 "native": info.native,
1229 "hidden": info.hidden,
1230 "model": info.model,
1231 "temperature": info.temperature,
1232 "top_p": info.top_p,
1233 "max_steps": info.max_steps,
1234 })
1235 })
1236 .collect();
1237
1238 let workspace_ids = resolve_and_log_workspace_ids(client, server, token, codebases).await;
1240
1241 let res = req
1242 .json(&serde_json::json!({
1243 "worker_id": worker_id,
1244 "name": name,
1245 "capabilities": worker_capabilities(),
1246 "hostname": hostname,
1247 "k8s_node_name": k8s_node_name,
1248 "models": models_array,
1249 "workspaces": codebases,
1250 "workspace_ids": workspace_ids,
1251 "interfaces": advertised_interfaces(public_url),
1252 "agents": agent_defs,
1253 }))
1254 .send()
1255 .await?;
1256
1257 if res.status().is_success() {
1258 tracing::info!("Worker registered successfully");
1259 } else {
1260 tracing::warn!("Failed to register worker: {}", res.status());
1261 }
1262
1263 Ok(())
1264}
1265
1266async fn load_provider_models() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>> {
1275 use tokio::sync::OnceCell;
1276 static CACHE: OnceCell<HashMap<String, Vec<crate::provider::ModelInfo>>> =
1277 OnceCell::const_new();
1278 CACHE
1279 .get_or_try_init(|| async { load_provider_models_uncached().await })
1280 .await
1281 .cloned()
1282}
1283
1284async fn load_provider_models_uncached() -> Result<HashMap<String, Vec<crate::provider::ModelInfo>>>
1285{
1286 let registry = match ProviderRegistry::shared_from_vault().await {
1288 Ok(r) if !r.list().is_empty() => {
1289 tracing::info!("Loaded {} providers from Vault", r.list().len());
1290 (*r).clone()
1291 }
1292 Ok(_) => {
1293 tracing::warn!("Vault returned 0 providers, falling back to config/env vars");
1294 fallback_registry().await?
1295 }
1296 Err(e) => {
1297 tracing::warn!("Vault unreachable ({}), falling back to config/env vars", e);
1298 fallback_registry().await?
1299 }
1300 };
1301
1302 let mut models_by_provider: HashMap<String, Vec<crate::provider::ModelInfo>> = HashMap::new();
1303
1304 for provider_name in registry.list() {
1305 if let Some(provider) = registry.get(provider_name) {
1306 match provider.list_models().await {
1307 Ok(models) => {
1308 if !models.is_empty() {
1309 tracing::debug!("Provider {}: {} models", provider_name, models.len());
1310 models_by_provider.insert(provider_name.to_string(), models);
1311 }
1312 }
1313 Err(e) => {
1314 tracing::debug!("Failed to list models for {}: {}", provider_name, e);
1315 }
1316 }
1317 }
1318 }
1319
1320 Ok(models_by_provider)
1321}
1322
1323async fn fallback_registry() -> Result<ProviderRegistry> {
1325 let config = crate::config::Config::load().await.unwrap_or_default();
1326 ProviderRegistry::from_config(&config).await
1327}
1328
1329async fn fetch_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1330 tracing::info!("Checking for pending tasks...");
1331
1332 let mut url = format!("{}/v1/agent/tasks?status=pending", runtime.server);
1333 if !runtime.agent_name.is_empty() {
1334 url = format!(
1335 "{}&agent_name={}",
1336 url,
1337 urlencoding::encode(&runtime.agent_name)
1338 );
1339 }
1340 let mut req = runtime.client.get(&url);
1341 if let Some(t) = &runtime.token {
1342 req = req.bearer_auth(t);
1343 }
1344
1345 let res = req.send().await?;
1346 if !res.status().is_success() {
1347 return Ok(());
1348 }
1349
1350 let data: serde_json::Value = res.json().await?;
1351 let tasks = if let Some(arr) = data.as_array() {
1353 arr.clone()
1354 } else {
1355 data["tasks"].as_array().cloned().unwrap_or_default()
1356 };
1357
1358 tracing::info!("Found {} pending task(s)", tasks.len());
1359
1360 for task in tasks {
1361 if let Some(id) = task["id"].as_str() {
1362 if let Err(reason) = check_task_scope(
1364 &task,
1365 &runtime.worker_id,
1366 &runtime.agent_name,
1367 &runtime.workspace_ids,
1368 ) {
1369 tracing::debug!(
1370 task_id = id,
1371 reason = %reason,
1372 "Pending task skipped — out of scope"
1373 );
1374 continue;
1375 }
1376
1377 match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1378 TaskReservation::Reserved => {
1379 let task_id = id.to_string();
1380 let runtime = runtime.clone();
1381
1382 tokio::spawn(async move {
1383 if let Err(e) = handle_task(&runtime, &task).await {
1384 tracing::error!("Task {} failed: {}", task_id, e);
1385 }
1386 runtime.processing.lock().await.remove(&task_id);
1387 });
1388 }
1389 TaskReservation::AlreadyProcessing => {}
1390 TaskReservation::AtCapacity => {
1391 tracing::debug!(
1392 max_concurrent_tasks = runtime.max_concurrent_tasks,
1393 "Worker is at task capacity; leaving remaining tasks pending"
1394 );
1395 break;
1396 }
1397 }
1398 }
1399 }
1400
1401 Ok(())
1402}
1403
/// Why `connect_stream` stopped reading the SSE task stream without an error.
#[derive(PartialEq, Eq, Debug)]
enum StreamDisconnectReason {
    /// The response byte stream ended (no further chunks).
    Ended,
    /// Reading the next chunk failed; carries the error's text.
    ReadError(String),
}
1409
1410async fn connect_stream(
1411 runtime: &WorkerTaskRuntime,
1412 name: &str,
1413 codebases: &[String],
1414 task_notify_rx: Option<mpsc::Receiver<String>>,
1415) -> Result<StreamDisconnectReason> {
1416 let url = format!(
1417 "{}/v1/worker/tasks/stream?agent_name={}&worker_id={}",
1418 runtime.server,
1419 urlencoding::encode(name),
1420 urlencoding::encode(&runtime.worker_id)
1421 );
1422
1423 let mut req = runtime
1424 .client
1425 .get(&url)
1426 .header("Accept", "text/event-stream")
1427 .header("Accept-Encoding", "identity")
1428 .header("Cache-Control", "no-cache, no-transform")
1429 .header("X-Worker-ID", &runtime.worker_id)
1430 .header("X-Agent-Name", name)
1431 .header("X-Codebases", codebases.join(","))
1432 .header("X-Workspaces", codebases.join(","));
1433
1434 if let Some(t) = &runtime.token {
1435 req = req.bearer_auth(t);
1436 }
1437
1438 let res = req.send().await?;
1439 if !res.status().is_success() {
1440 anyhow::bail!("Failed to connect: {}", res.status());
1441 }
1442
1443 tracing::info!("Connected to A2A server");
1444
1445 let mut stream = res.bytes_stream();
1446 let mut buffer = String::new();
1447 let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
1448 poll_interval.tick().await; let mut task_notify_rx = task_notify_rx;
1452
1453 loop {
1454 tokio::select! {
1455 task_id = async {
1458 if let Some(ref mut rx) = task_notify_rx {
1459 rx.recv().await
1460 } else {
1461 futures::future::pending().await
1463 }
1464 } => {
1465 if let Some(task_id) = task_id {
1466 tracing::info!("Received task notification via CloudEvent: {}", task_id);
1467 if let Err(e) = poll_pending_tasks(runtime).await {
1469 tracing::warn!("Task notification poll failed: {}", e);
1470 }
1471 }
1472 }
1473 chunk = stream.next() => {
1474 match chunk {
1475 Some(Ok(chunk)) => {
1476 buffer.push_str(&String::from_utf8_lossy(&chunk));
1477
1478 while let Some(pos) = buffer.find("\n\n") {
1480 let event_str = buffer[..pos].to_string();
1481 buffer = buffer[pos + 2..].to_string();
1482
1483 if let Some(data_line) = event_str.lines().find(|l| l.starts_with("data:")) {
1484 let data = data_line.trim_start_matches("data:").trim();
1485 if data == "[DONE]" || data.is_empty() {
1486 continue;
1487 }
1488
1489 if let Ok(task) = serde_json::from_str::<serde_json::Value>(data) {
1490 spawn_task_handler(&task, runtime).await;
1491 }
1492 }
1493 }
1494 }
1495 Some(Err(e)) => {
1496 return Ok(StreamDisconnectReason::ReadError(e.to_string()));
1497 }
1498 None => {
1499 return Ok(StreamDisconnectReason::Ended);
1501 }
1502 }
1503 }
1504 _ = poll_interval.tick() => {
1505 if let Err(e) = poll_pending_tasks(runtime).await {
1507 tracing::warn!("Periodic task poll failed: {}", e);
1508 }
1509 }
1510 }
1511 }
1512}
1513
1514async fn spawn_task_handler(task: &serde_json::Value, runtime: &WorkerTaskRuntime) {
1515 if let Some(id) = task
1516 .get("task")
1517 .and_then(|t| t["id"].as_str())
1518 .or_else(|| task["id"].as_str())
1519 {
1520 if let Err(reason) = check_task_scope(
1522 task,
1523 &runtime.worker_id,
1524 &runtime.agent_name,
1525 &runtime.workspace_ids,
1526 ) {
1527 tracing::debug!(
1528 task_id = id,
1529 reason = %reason,
1530 "SSE task skipped — out of scope"
1531 );
1532 return;
1533 }
1534
1535 match reserve_task_slot(&runtime.processing, id, runtime.max_concurrent_tasks).await {
1536 TaskReservation::Reserved => {
1537 let task_id = id.to_string();
1538 let task = task.clone();
1539 let runtime = runtime.clone();
1540
1541 tokio::spawn(async move {
1542 if let Err(e) = handle_task(&runtime, &task).await {
1543 tracing::error!("Task {} failed: {}", task_id, e);
1544 }
1545 runtime.processing.lock().await.remove(&task_id);
1546 });
1547 }
1548 TaskReservation::AlreadyProcessing => {}
1549 TaskReservation::AtCapacity => {
1550 tracing::debug!(
1551 task_id = id,
1552 max_concurrent_tasks = runtime.max_concurrent_tasks,
1553 "Worker is at task capacity; task will stay pending until a slot frees up"
1554 );
1555 }
1556 }
1557 }
1558}
1559
1560async fn poll_pending_tasks(runtime: &WorkerTaskRuntime) -> Result<()> {
1561 let mut url = format!("{}/v1/agent/tasks?status=pending", runtime.server);
1562 if !runtime.agent_name.is_empty() {
1563 url = format!(
1564 "{}&agent_name={}",
1565 url,
1566 urlencoding::encode(&runtime.agent_name)
1567 );
1568 }
1569 let mut req = runtime.client.get(&url);
1570 if let Some(t) = &runtime.token {
1571 req = req.bearer_auth(t);
1572 }
1573
1574 let res = req.send().await?;
1575 if !res.status().is_success() {
1576 return Ok(());
1577 }
1578
1579 let data: serde_json::Value = res.json().await?;
1580 let tasks = if let Some(arr) = data.as_array() {
1581 arr.clone()
1582 } else {
1583 data["tasks"].as_array().cloned().unwrap_or_default()
1584 };
1585
1586 if !tasks.is_empty() {
1587 tracing::debug!("Poll found {} pending task(s)", tasks.len());
1588 }
1589
1590 for task in &tasks {
1591 let task_id_str = task_str(task, "id").unwrap_or("?");
1593 if let Err(reason) = check_task_scope(
1594 task,
1595 &runtime.worker_id,
1596 &runtime.agent_name,
1597 &runtime.workspace_ids,
1598 ) {
1599 tracing::debug!(
1600 task_id = task_id_str,
1601 reason = %reason,
1602 "Poll task skipped — out of scope"
1603 );
1604 continue;
1605 }
1606 spawn_task_handler(task, runtime).await;
1607 }
1608
1609 Ok(())
1610}
1611
1612async fn handle_task(runtime: &WorkerTaskRuntime, task: &serde_json::Value) -> Result<()> {
1613 let task_id = task_str(task, "id").ok_or_else(|| anyhow::anyhow!("No task ID"))?;
1614 let title = task_str(task, "title").unwrap_or("Untitled");
1615
1616 let metadata = task_metadata(task);
1618 let timeout_secs = task_timeout_secs(task, &metadata);
1619
1620 let mut timeline = task_timeline::TaskTimeline::new(task_id, timeout_secs);
1621 timeline.checkpoint(task_timeline::TaskCheckpoint::TaskReceived);
1622 tracing::info!(
1623 "Handling task: {} ({}) [timeout={}s]",
1624 title,
1625 task_id,
1626 timeout_secs
1627 );
1628
1629 sync_timeline_to_runtime(&timeline, runtime).await;
1632
1633 timeline.checkpoint(task_timeline::TaskCheckpoint::ClaimRequested);
1635 let mut req = runtime
1636 .client
1637 .post(format!("{}/v1/worker/tasks/claim", runtime.server))
1638 .header("X-Worker-ID", &runtime.worker_id);
1639 if let Some(t) = &runtime.token {
1640 req = req.bearer_auth(t);
1641 }
1642
1643 let res = req
1644 .json(&serde_json::json!({ "task_id": task_id }))
1645 .send()
1646 .await?;
1647
1648 if !res.status().is_success() {
1649 let status = res.status();
1650 let text = res.text().await?;
1651 if status == reqwest::StatusCode::CONFLICT {
1652 tracing::debug!(task_id, "Task already claimed by another worker, skipping");
1653 } else {
1654 tracing::warn!(task_id, %status, "Failed to claim task: {}", text);
1655 }
1656 return Ok(());
1657 }
1658
1659 let claim = res.json::<TaskClaimResponse>().await?;
1660 if let Some(claim_timeout_secs) = claim
1661 .task_timeout_seconds
1662 .map(|timeout_secs| timeout_secs.clamp(60, 604800))
1663 {
1664 timeline.update_timeout_secs(claim_timeout_secs);
1665 }
1666 let provider_keys = claim.provider_keys.clone();
1667 let claim_provenance = claim.into_provenance();
1668 timeline.checkpoint_with_detail(
1669 task_timeline::TaskCheckpoint::Claimed,
1670 Some(format!(
1671 "run_id={:?} attempt_id={:?}",
1672 claim_provenance.run_id, claim_provenance.attempt_id
1673 )),
1674 );
1675 tracing::info!(
1676 task_id,
1677 run_id = ?claim_provenance.run_id,
1678 attempt_id = ?claim_provenance.attempt_id,
1679 "Claimed task"
1680 );
1681
1682 let inner_result: Result<(&str, Option<String>, Option<String>, Option<String>)> =
1684 execute_claimed_task(
1685 runtime,
1686 task,
1687 task_id,
1688 title,
1689 &claim_provenance,
1690 provider_keys,
1691 &mut timeline,
1692 )
1693 .await;
1694
1695 let (status, result, error, session_id) = match inner_result {
1696 Ok(tuple) => tuple,
1697 Err(e) => {
1698 tracing::error!(
1699 task_id,
1700 error = %e,
1701 "Task failed after claim (releasing as failed)"
1702 );
1703 timeline.checkpoint_with_detail(
1704 task_timeline::TaskCheckpoint::Failed,
1705 Some(format!("{}", e)),
1706 );
1707 (
1708 "failed",
1709 None,
1710 Some(format!("Worker error after claim: {}", e)),
1711 None,
1712 )
1713 }
1714 };
1715
1716 timeline.emit_diagnostics();
1718 let diagnostics_json = timeline.diagnostics_json();
1719
1720 timeline.checkpoint(task_timeline::TaskCheckpoint::Releasing);
1721 release_task_result(
1722 &runtime.client,
1723 &runtime.server,
1724 &runtime.token,
1725 &runtime.worker_id,
1726 task_id,
1727 status,
1728 result,
1729 error,
1730 session_id,
1731 Some(diagnostics_json),
1732 )
1733 .await?;
1734
1735 timeline.checkpoint(task_timeline::TaskCheckpoint::Released);
1736
1737 runtime.task_progress.lock().await.clear();
1739
1740 tracing::info!("Task released: {} with status: {}", task_id, status);
1741 Ok(())
1742}
1743
/// Runs a task this worker has already claimed and returns its outcome as
/// `(status, result, error, session_id)`, where `status` is `"completed"` or `"failed"`.
///
/// Special agents are dispatched first and never create a session:
/// - `clone_repo` (case-insensitive) clones or refreshes a workspace repository;
/// - forage agents run the forage pipeline.
///
/// All other tasks run in a `Session`. The session is resumed from metadata
/// `resume_session_id` when it can be loaded, and is new otherwise. It runs either as a swarm
/// or as a single `build`/`plan` agent.
///
/// Agent failures are reported through the returned status rather than as `Err`. `Err` is
/// returned only for setup failures such as creating a session or resolving the workspace
/// directory.
///
/// `timeline` receives a checkpoint at each stage and is synced to `runtime` before and
/// after the agent runs.
// The function is long on purpose: it walks one task through every stage recorded on
// `timeline`, in order.
#[allow(clippy::too_many_lines)]
async fn execute_claimed_task<'a>(
    runtime: &WorkerTaskRuntime,
    task: &'a serde_json::Value,
    task_id: &'a str,
    title: &'a str,
    claim_provenance: &ClaimProvenance,
    provider_keys: Option<serde_json::Value>,
    timeline: &mut task_timeline::TaskTimeline,
) -> Result<(&'static str, Option<String>, Option<String>, Option<String>)> {
    let metadata = task_metadata(task);
    timeline.checkpoint(task_timeline::TaskCheckpoint::MetadataParsed);
    // A blank or whitespace-only `resume_session_id` is treated as absent.
    let resume_session_id = metadata
        .get("resume_session_id")
        .and_then(|v| v.as_str())
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty());
    let complexity_hint = metadata_str(&metadata, &["complexity"]);
    // An explicit `model_tier`/`tier` wins. Otherwise the tier is derived from the complexity
    // hint: quick -> fast, deep -> heavy, anything else -> balanced.
    let model_tier = metadata_str(&metadata, &["model_tier", "tier"])
        .map(|s| s.to_ascii_lowercase())
        .or_else(|| {
            complexity_hint.as_ref().map(|complexity| {
                match complexity.to_ascii_lowercase().as_str() {
                    "quick" => "fast".to_string(),
                    "deep" => "heavy".to_string(),
                    _ => "balanced".to_string(),
                }
            })
        });
    let worker_personality = metadata_str(
        &metadata,
        &["worker_personality", "personality", "agent_personality"],
    );
    let target_agent_name = metadata_str(&metadata, &["target_agent_name", "agent_name"]);
    // Model precedence: task `model_ref`, metadata `model_ref`, task `model`, metadata `model`.
    let raw_model = task_str(task, "model_ref")
        .or_else(|| metadata_lookup(&metadata, "model_ref").and_then(|v| v.as_str()))
        .or_else(|| task_str(task, "model"))
        .or_else(|| metadata_lookup(&metadata, "model").and_then(|v| v.as_str()));
    let selected_model = raw_model.map(model_ref_to_provider_model);
    let raw_agent = task_str(task, "agent_type")
        .or_else(|| task_str(task, "agent"))
        .unwrap_or("build");
    // The prompt falls back to the description, then to the title.
    let prompt = task_str(task, "prompt")
        .or_else(|| task_str(task, "description"))
        .unwrap_or(title);

    let workspace_id = task_str(task, "workspace_id")
        .or_else(|| metadata_lookup(&metadata, "workspace_id").and_then(|v| v.as_str()));
    // A `global` or empty workspace id marks a task that runs without workspace setup.
    let is_virtual_task = workspace_id.map_or(false, |ws| ws == "global" || ws.is_empty());

    // Special agents short-circuit here, before any session exists, and report no session id.
    if raw_agent.eq_ignore_ascii_case("clone_repo") {
        return match handle_clone_repo_task(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            &runtime.worker_id,
            task,
            &metadata,
        )
        .await
        {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Clone task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    if is_forage_agent(raw_agent) {
        return match handle_forage_task(title, prompt, &metadata, selected_model.clone()).await {
            Ok(message) => Ok(("completed", Some(message), None, None)),
            Err(err) => {
                tracing::error!(task_id, error = %err, "Forage task failed");
                Ok(("failed", None, Some(format!("Error: {}", err)), None))
            }
        };
    }

    // Resume the requested session when it loads; otherwise start fresh rather than failing.
    let mut session = if let Some(ref sid) = resume_session_id {
        match Session::load(sid).await {
            Ok(existing) => {
                tracing::info!("Resuming session {} for task {}", sid, task_id);
                existing
            }
            Err(e) => {
                tracing::warn!(
                    "Could not load session {} for task {} ({}), starting a new session",
                    sid,
                    task_id,
                    e
                );
                Session::new().await?
            }
        }
    } else {
        Session::new().await?
    };
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::SessionReady,
        Some(format!("session_id={}", session.id)),
    );

    if !is_virtual_task {
        if let Some(workspace_path) = resolve_task_workspace_dir(
            &runtime.client,
            &runtime.server,
            &runtime.token,
            workspace_id,
        )
        .await?
        {
            session.metadata.directory = Some(workspace_path);
        }
        timeline.checkpoint_with_detail(
            task_timeline::TaskCheckpoint::WorkspaceReady,
            session
                .metadata
                .directory
                .as_deref()
                .map(|d| d.display().to_string()),
        );
    }

    if is_virtual_task {
        tracing::info!(
            task_id,
            workspace_id = workspace_id.unwrap_or("(none)"),
            "Virtual task detected — skipping workspace setup"
        );
        session.metadata.directory = None;
    }

    // Swarm agents pass through unchanged. Among the others only `build` and `plan` are
    // accepted, and anything else runs as `build`.
    let agent_type = if is_swarm_agent(raw_agent) {
        raw_agent
    } else {
        match raw_agent {
            "build" | "plan" => raw_agent,
            other => {
                tracing::info!(
                    "Agent \"{}\" is not a primary agent, falling back to \"build\"",
                    other
                );
                "build"
            }
        }
    };
    session.set_agent_name(agent_type.to_string());
    session.attach_claim_provenance(claim_provenance);
    session.metadata.provider_keys = provider_keys;

    // Git setup is best-effort: failures are logged and the task still runs. The
    // `GitHookInstalled` checkpoint is recorded even when hook installation failed.
    if !is_virtual_task {
        if let (Some(directory), Some(workspace_id)) =
            (session.metadata.directory.as_deref(), workspace_id)
            && directory.join(".git").exists()
            && let Err(err) = configure_repo_git_auth(directory, workspace_id)
        {
            tracing::warn!(task_id, error = %err, "Failed to configure Git credential helper");
        }
        if let Some(directory) = session.metadata.directory.as_deref()
            && let Err(err) = install_commit_msg_hook(directory)
        {
            tracing::warn!(task_id, error = %err, "Failed to install commit-msg hook");
        }
        timeline.checkpoint(task_timeline::TaskCheckpoint::GitHookInstalled);
    }

    // An explicit task model overrides whatever model the (possibly resumed) session had.
    if let Some(model) = selected_model.clone() {
        session.metadata.model = Some(model);
    }
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::ModelSelected,
        session.metadata.model.clone(),
    );

    tracing::info!(task_id, agent_type, "Executing prompt: {}", prompt);
    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::AgentStarting,
        Some(format!(
            "agent={} model={:?}",
            agent_type, session.metadata.model
        )),
    );
    sync_timeline_to_runtime(&timeline, runtime).await;

    let stream_client = runtime.client.clone();
    let stream_server = runtime.server.clone();
    let stream_token = runtime.token.clone();
    let stream_worker_id = runtime.worker_id.clone();
    let stream_task_id = task_id.to_string();
    let stream_bus = Arc::clone(&runtime.bus);

    // Each output chunk is published on the agent bus as a `Working` task update and POSTed
    // to `/v1/agent/tasks/{id}/output`.
    // NOTE(review): every chunk gets its own spawned request, so chunks can reach the server
    // out of order, and POST failures are ignored. Confirm the server tolerates this.
    let output_callback: Arc<dyn Fn(String) + Send + Sync + 'static> =
        Arc::new(move |output: String| {
            let c = stream_client.clone();
            let s = stream_server.clone();
            let t = stream_token.clone();
            let w = stream_worker_id.clone();
            let tid = stream_task_id.clone();

            let bus_handle = stream_bus.handle("task-output");
            bus_handle.send(
                format!("task.{}", tid),
                crate::bus::BusMessage::TaskUpdate {
                    task_id: tid.clone(),
                    state: crate::a2a::types::TaskState::Working,
                    message: Some(output.clone()),
                },
            );

            tokio::spawn(async move {
                let mut req = c
                    .post(format!("{}/v1/agent/tasks/{}/output", s, tid))
                    .header("X-Worker-ID", &w);
                if let Some(tok) = &t {
                    req = req.bearer_auth(tok);
                }
                let _ = req
                    .json(&serde_json::json!({
                        "worker_id": w,
                        "output": output,
                    }))
                    .send()
                    .await;
            });
        });

    // Agent errors become a "failed" status here instead of propagating as `Err`.
    let (status, result, error, session_id) = if is_swarm_agent(agent_type) {
        match execute_swarm_with_policy(
            &mut session,
            prompt,
            model_tier.as_deref(),
            selected_model,
            &metadata,
            complexity_hint.as_deref(),
            worker_personality.as_deref(),
            target_agent_name.as_deref(),
            Some(&runtime.bus),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok((session_result, true)) => {
                tracing::info!("Swarm task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            Ok((session_result, false)) => {
                tracing::warn!("Swarm task completed with failures: {}", task_id);
                (
                    "failed",
                    Some(session_result.text),
                    Some("Swarm execution completed with failures".to_string()),
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!("Swarm task failed: {} - {}", task_id, e);
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    } else {
        match execute_session_with_policy(
            &mut session,
            prompt,
            runtime.auto_approve,
            model_tier.as_deref(),
            Some(Arc::clone(&output_callback)),
        )
        .await
        {
            Ok(session_result) => {
                tracing::info!("Task completed successfully: {}", task_id);
                (
                    "completed",
                    Some(session_result.text),
                    None,
                    Some(session_result.session_id),
                )
            }
            Err(e) => {
                tracing::error!(task_id, error = %e, "Task execution failed");
                ("failed", None, Some(format!("Error: {}", e)), None)
            }
        }
    };

    timeline.checkpoint_with_detail(
        task_timeline::TaskCheckpoint::AgentDone,
        Some(format!("status={}", status)),
    );
    sync_timeline_to_runtime(&timeline, runtime).await;

    if timeline.is_near_deadline() {
        tracing::warn!(
            task_id,
            elapsed_secs = format!("{:.1}", timeline.elapsed_secs()),
            budget_pct = format!("{:.1}%", timeline.budget_pct_used()),
            "Near deadline after agent completion — will attempt graceful shutdown"
        );
        timeline.checkpoint(task_timeline::TaskCheckpoint::GracefulShutdown);
    }

    timeline.sync_progress().await;

    // When the agent reported no session id (e.g. on error), fall back to this session's id.
    Ok((
        status,
        result,
        error,
        Some(session_id.unwrap_or_else(|| session.id.clone())),
    ))
}
2074
2075async fn handle_forage_task(
2076 title: &str,
2077 prompt: &str,
2078 metadata: &serde_json::Map<String, serde_json::Value>,
2079 selected_model: Option<String>,
2080) -> Result<String> {
2081 let forage_args = build_forage_args(prompt, title, metadata, selected_model);
2082 let summary = crate::forage::execute_with_summary(forage_args).await?;
2083 Ok(summary.render_text())
2084}
2085
2086async fn handle_clone_repo_task(
2087 client: &Client,
2088 server: &str,
2089 token: &Option<String>,
2090 worker_id: &str,
2091 task: &serde_json::Value,
2092 metadata: &serde_json::Map<String, serde_json::Value>,
2093) -> Result<String> {
2094 let workspace_id = metadata_str(metadata, &["workspace_id"])
2095 .or_else(|| task_str(task, "workspace_id").map(|value| value.to_string()))
2096 .ok_or_else(|| anyhow::anyhow!("Clone task is missing workspace_id metadata"))?;
2097 let workspace = fetch_workspace_record(client, server, token, &workspace_id).await?;
2098 let git_url = metadata_str(metadata, &["git_url"])
2099 .or_else(|| workspace.git_url.clone())
2100 .ok_or_else(|| anyhow::anyhow!("Workspace {} is missing git_url", workspace_id))?;
2101 let branch = metadata_str(metadata, &["git_branch"])
2102 .or_else(|| workspace.git_branch.clone())
2103 .unwrap_or_else(|| "main".to_string());
2104 let repo_path = resolve_workspace_clone_path(&workspace_id, &workspace.path);
2105
2106 if let Some(parent) = repo_path.parent() {
2107 tokio::fs::create_dir_all(parent)
2108 .await
2109 .with_context(|| format!("Failed to create repo parent {}", parent.display()))?;
2110 }
2111
2112 let temp_helper_path =
2113 git_clone_base_dir().join(format!(".{}-git-credential-helper", workspace_id));
2114 write_git_credential_helper_script(&temp_helper_path, &workspace_id)?;
2115
2116 let clone_result = async {
2117 let git_dir = repo_path.join(".git");
2118 if git_dir.exists() {
2119 configure_repo_git_auth(&repo_path, &workspace_id)?;
2120 configure_repo_git_github_app_from_agent_config(
2121 &repo_path,
2122 Some(&serde_json::Value::Object(workspace.agent_config.clone())),
2123 );
2124 run_git_command_at(
2125 Some(&repo_path),
2126 vec!["fetch".to_string(), "origin".to_string(), branch.clone()],
2127 )
2128 .await?;
2129 run_git_command_at(
2130 Some(&repo_path),
2131 vec!["checkout".to_string(), branch.clone()],
2132 )
2133 .await?;
2134 run_git_command_at(
2135 Some(&repo_path),
2136 vec![
2137 "pull".to_string(),
2138 "--ff-only".to_string(),
2139 "origin".to_string(),
2140 branch.clone(),
2141 ],
2142 )
2143 .await?;
2144 } else {
2145 prepare_clone_target(&repo_path).await?;
2146
2147 run_git_command_at(
2148 None,
2149 vec![
2150 "-c".to_string(),
2151 format!("credential.helper={}", temp_helper_path.display()),
2152 "-c".to_string(),
2153 "credential.useHttpPath=true".to_string(),
2154 "clone".to_string(),
2155 "--single-branch".to_string(),
2156 "--branch".to_string(),
2157 branch.clone(),
2158 git_url.clone(),
2159 repo_path.display().to_string(),
2160 ],
2161 )
2162 .await?;
2163 configure_repo_git_auth(&repo_path, &workspace_id)?;
2164 configure_repo_git_github_app_from_agent_config(
2165 &repo_path,
2166 Some(&serde_json::Value::Object(workspace.agent_config.clone())),
2167 );
2168 }
2169 install_commit_msg_hook(&repo_path)?;
2170
2171 register_cloned_workspace(client, server, token, worker_id, &workspace, &repo_path).await?;
2172 enqueue_post_clone_task(client, server, token, worker_id, &workspace_id, metadata).await?;
2173
2174 Ok::<String, anyhow::Error>(format!(
2175 "Repository ready at {} (branch: {})",
2176 repo_path.display(),
2177 branch
2178 ))
2179 }
2180 .await;
2181
2182 let _ = tokio::fs::remove_file(&temp_helper_path).await;
2183 clone_result
2184}
2185
2186async fn register_cloned_workspace(
2187 client: &Client,
2188 server: &str,
2189 token: &Option<String>,
2190 worker_id: &str,
2191 workspace: &RegisteredWorkspaceRecord,
2192 repo_path: &Path,
2193) -> Result<()> {
2194 let mut req = client.post(format!(
2195 "{}/v1/agent/workspaces",
2196 server.trim_end_matches('/')
2197 ));
2198 if let Some(token) = token {
2199 req = req.bearer_auth(token);
2200 }
2201
2202 let response = req
2203 .json(&serde_json::json!({
2204 "workspace_id": workspace.id,
2205 "name": workspace.name,
2206 "path": repo_path.display().to_string(),
2207 "description": workspace.description,
2208 "agent_config": workspace.agent_config,
2209 "git_url": workspace.git_url,
2210 "git_branch": workspace.git_branch,
2211 "worker_id": worker_id,
2212 }))
2213 .send()
2214 .await
2215 .context("Failed to register cloned workspace with server")?;
2216 let status = response.status();
2217 if !status.is_success() {
2218 let body = response.text().await.unwrap_or_default();
2219 anyhow::bail!(
2220 "Failed to register cloned workspace {} ({}): {}",
2221 workspace.id,
2222 status,
2223 body
2224 );
2225 }
2226
2227 Ok(())
2228}
2229
2230async fn enqueue_post_clone_task(
2231 client: &Client,
2232 server: &str,
2233 token: &Option<String>,
2234 worker_id: &str,
2235 workspace_id: &str,
2236 metadata: &serde_json::Map<String, serde_json::Value>,
2237) -> Result<()> {
2238 if !worker_should_enqueue_post_clone_task(metadata) {
2239 return Ok(());
2240 }
2241
2242 let Some(post_clone_task) = metadata.get("post_clone_task") else {
2243 return Ok(());
2244 };
2245 let Some(task) = post_clone_task.as_object() else {
2246 return Ok(());
2247 };
2248 let title = task
2249 .get("title")
2250 .and_then(|value| value.as_str())
2251 .filter(|value| !value.trim().is_empty())
2252 .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing title"))?;
2253 let prompt = task
2254 .get("prompt")
2255 .and_then(|value| value.as_str())
2256 .filter(|value| !value.trim().is_empty())
2257 .ok_or_else(|| anyhow::anyhow!("post_clone_task is missing prompt"))?;
2258 let agent_type = task
2259 .get("agent_type")
2260 .and_then(|value| value.as_str())
2261 .unwrap_or("build");
2262 let mut task_metadata = task
2263 .get("metadata")
2264 .cloned()
2265 .unwrap_or_else(|| serde_json::json!({}));
2266 if let Some(task_metadata_obj) = task_metadata.as_object_mut() {
2267 task_metadata_obj
2268 .entry("target_worker_id".to_string())
2269 .or_insert_with(|| serde_json::Value::String(worker_id.to_string()));
2270 }
2271
2272 let mut req = client.post(format!(
2273 "{}/v1/agent/workspaces/{}/tasks",
2274 server.trim_end_matches('/'),
2275 workspace_id
2276 ));
2277 if let Some(token) = token {
2278 req = req.bearer_auth(token);
2279 }
2280
2281 let response = req
2282 .json(&serde_json::json!({
2283 "title": title,
2284 "prompt": prompt,
2285 "agent_type": agent_type,
2286 "metadata": task_metadata,
2287 }))
2288 .send()
2289 .await
2290 .context("Failed to enqueue post-clone task")?;
2291 let status = response.status();
2292 if !status.is_success() {
2293 let body = response.text().await.unwrap_or_default();
2294 anyhow::bail!("Failed to enqueue post-clone task ({}): {}", status, body);
2295 }
2296 Ok(())
2297}
2298
2299fn worker_should_enqueue_post_clone_task(
2300 metadata: &serde_json::Map<String, serde_json::Value>,
2301) -> bool {
2302 metadata
2303 .get("post_clone_task")
2304 .and_then(|value| value.as_object())
2305 .is_some()
2306}
2307
2308async fn run_git_command_at(current_dir: Option<&Path>, args: Vec<String>) -> Result<String> {
2309 let mut command = Command::new("git");
2310 if let Some(dir) = current_dir {
2311 command.current_dir(dir);
2312 }
2313
2314 let output = command
2315 .args(args.iter().map(String::as_str))
2316 .output()
2317 .await
2318 .context("Failed to execute git command")?;
2319
2320 if output.status.success() {
2321 return Ok(String::from_utf8_lossy(&output.stdout).trim().to_string());
2322 }
2323
2324 Err(anyhow::anyhow!(
2325 "Git command failed: {}",
2326 String::from_utf8_lossy(&output.stderr).trim()
2327 ))
2328}
2329
2330async fn release_task_result(
2331 client: &Client,
2332 server: &str,
2333 token: &Option<String>,
2334 worker_id: &str,
2335 task_id: &str,
2336 status: &str,
2337 result: Option<String>,
2338 error: Option<String>,
2339 session_id: Option<String>,
2340 diagnostics: Option<serde_json::Value>,
2341) -> Result<()> {
2342 let mut req = client
2343 .post(format!("{}/v1/worker/tasks/release", server))
2344 .header("X-Worker-ID", worker_id);
2345 if let Some(token) = token {
2346 req = req.bearer_auth(token);
2347 }
2348
2349 let mut payload = serde_json::json!({
2350 "task_id": task_id,
2351 "status": status,
2352 "result": result,
2353 "error": error,
2354 "session_id": session_id,
2355 });
2356 if let Some(diagnostics) = diagnostics {
2357 if let Some(obj) = payload.as_object_mut() {
2358 obj.insert("diagnostics".to_string(), diagnostics);
2359 }
2360 }
2361
2362 req.json(&payload)
2363 .send()
2364 .await
2365 .context("Failed to release task result")?;
2366
2367 Ok(())
2368}
2369
2370async fn execute_swarm_with_policy(
2371 session: &mut Session,
2372 prompt: &str,
2373 model_tier: Option<&str>,
2374 explicit_model: Option<String>,
2375 metadata: &serde_json::Map<String, serde_json::Value>,
2376 complexity_hint: Option<&str>,
2377 worker_personality: Option<&str>,
2378 target_agent_name: Option<&str>,
2379 bus: Option<&Arc<AgentBus>>,
2380 output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
2381) -> Result<(crate::session::SessionResult, bool)> {
2382 use crate::provider::{ContentPart, Message, Role};
2383
2384 session.add_message(Message {
2385 role: Role::User,
2386 content: vec![ContentPart::Text {
2387 text: prompt.to_string(),
2388 }],
2389 });
2390
2391 if session.title.is_none() {
2392 session.generate_title().await?;
2393 }
2394
2395 let strategy = parse_swarm_strategy(metadata);
2396 let max_subagents = metadata_usize(
2397 metadata,
2398 &["swarm_max_subagents", "max_subagents", "subagents"],
2399 )
2400 .unwrap_or(10)
2401 .clamp(1, 100);
2402 let max_steps_per_subagent = metadata_usize(
2403 metadata,
2404 &[
2405 "swarm_max_steps_per_subagent",
2406 "max_steps_per_subagent",
2407 "max_steps",
2408 ],
2409 )
2410 .unwrap_or(50)
2411 .clamp(1, 200);
2412 let timeout_secs = metadata_u64(metadata, &["swarm_timeout_secs", "timeout_secs", "timeout"])
2413 .unwrap_or(600)
2414 .clamp(30, 3600);
2415 let parallel_enabled =
2416 metadata_bool(metadata, &["swarm_parallel_enabled", "parallel_enabled"]).unwrap_or(true);
2417
2418 let model = resolve_swarm_model(explicit_model, model_tier).await;
2419 if let Some(ref selected_model) = model {
2420 session.metadata.model = Some(selected_model.clone());
2421 }
2422
2423 if let Some(ref cb) = output_callback {
2424 cb(format!(
2425 "[swarm] routing complexity={} tier={} personality={} target_agent={}",
2426 complexity_hint.unwrap_or("standard"),
2427 model_tier.unwrap_or("balanced"),
2428 worker_personality.unwrap_or("auto"),
2429 target_agent_name.unwrap_or("auto")
2430 ));
2431 cb(format!(
2432 "[swarm] config strategy={:?} max_subagents={} max_steps={} timeout={}s tier={}",
2433 strategy,
2434 max_subagents,
2435 max_steps_per_subagent,
2436 timeout_secs,
2437 model_tier.unwrap_or("balanced")
2438 ));
2439 }
2440
2441 let swarm_config = SwarmConfig {
2442 max_subagents,
2443 max_steps_per_subagent,
2444 subagent_timeout_secs: timeout_secs,
2445 parallel_enabled,
2446 model,
2447 working_dir: session
2448 .metadata
2449 .directory
2450 .as_ref()
2451 .map(|p| p.to_string_lossy().to_string()),
2452 ..Default::default()
2453 };
2454
2455 let swarm_result = if output_callback.is_some() {
2456 let (event_tx, mut event_rx) = mpsc::channel(256);
2457 let mut executor = SwarmExecutor::new(swarm_config).with_event_tx(event_tx);
2458 if let Some(bus) = bus {
2459 executor = executor.with_bus(Arc::clone(bus));
2460 }
2461 let prompt_owned = prompt.to_string();
2462 let mut exec_handle =
2463 tokio::spawn(async move { executor.execute(&prompt_owned, strategy).await });
2464
2465 let mut final_result: Option<crate::swarm::SwarmResult> = None;
2466
2467 while final_result.is_none() {
2468 tokio::select! {
2469 maybe_event = event_rx.recv() => {
2470 if let Some(event) = maybe_event
2471 && let Some(ref cb) = output_callback
2472 && let Some(line) = format_swarm_event_for_output(&event) {
2473 cb(line);
2474 }
2475 }
2476 join_result = &mut exec_handle => {
2477 let joined = join_result.map_err(|e| anyhow::anyhow!("Swarm join failure: {}", e))?;
2478 final_result = Some(joined?);
2479 }
2480 }
2481 }
2482
2483 while let Ok(event) = event_rx.try_recv() {
2484 if let Some(ref cb) = output_callback
2485 && let Some(line) = format_swarm_event_for_output(&event)
2486 {
2487 cb(line);
2488 }
2489 }
2490
2491 final_result.ok_or_else(|| anyhow::anyhow!("Swarm execution returned no result"))?
2492 } else {
2493 let mut executor = SwarmExecutor::new(swarm_config);
2494 if let Some(bus) = bus {
2495 executor = executor.with_bus(Arc::clone(bus));
2496 }
2497 executor.execute(prompt, strategy).await?
2498 };
2499
2500 let final_text = if swarm_result.result.trim().is_empty() {
2501 if swarm_result.success {
2502 "Swarm completed without textual output.".to_string()
2503 } else {
2504 "Swarm finished with failures and no textual output.".to_string()
2505 }
2506 } else {
2507 swarm_result.result.clone()
2508 };
2509
2510 session.add_message(Message {
2511 role: Role::Assistant,
2512 content: vec![ContentPart::Text {
2513 text: final_text.clone(),
2514 }],
2515 });
2516 session.save().await?;
2517
2518 Ok((
2519 crate::session::SessionResult {
2520 text: final_text,
2521 session_id: session.id.clone(),
2522 },
2523 swarm_result.success,
2524 ))
2525}
2526
/// Runs `prompt` through a single-agent, tool-calling loop on `session` and returns
/// the assistant text collected along the way.
///
/// Provider registry:
/// - If `session.metadata.provider_keys` parses as per-task provider keys, a
///   tenant-scoped registry is built from them.
/// - An invalid payload, or one that yields no providers, falls back to the shared
///   Vault-backed registry.
///
/// Provider and model selection:
/// - If `parse_model_string` extracts a provider from `session.metadata.model`, that
///   provider must be available. `zhipuai` is accepted as an alias for `zai`.
/// - A model string without a provider that equals an available provider name selects
///   that provider with its default model.
/// - Otherwise `choose_provider_for_tier` picks a provider for `model_tier`.
///
/// The loop runs at most 50 steps. Each step:
/// - requests a completion (with the compact-tool-schema fallback);
/// - forwards non-empty text parts to `output_callback`;
/// - executes the requested tools subject to `auto_approve`.
///
/// The loop stops at the first response that requests no tools. The session is saved
/// before returning.
///
/// # Errors
/// Fails when:
/// - no provider is available, or the explicitly selected provider is missing;
/// - the Vault registry cannot be loaded;
/// - title generation, a completion request, or the session save fails.
///
/// Tool failures, timeouts and policy blocks do not abort the loop; they are fed back
/// to the model as tool results.
async fn execute_session_with_policy(
    session: &mut Session,
    prompt: &str,
    auto_approve: AutoApprove,
    model_tier: Option<&str>,
    output_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
) -> Result<crate::session::SessionResult> {
    use crate::provider::{ContentPart, Message, ProviderRegistry, Role, parse_model_string};
    use std::sync::Arc;

    // A malformed per-task key payload is logged and treated as "no per-task keys".
    let per_task_keys = session
        .metadata
        .provider_keys
        .as_ref()
        .and_then(|value| match crate::provider::PerTaskProviderKeys::from_value(value) {
            Ok(keys) => keys,
            Err(err) => {
                tracing::warn!(error = %err, "Invalid per-task provider key payload; falling back to platform registry");
                None
            }
        });

    let registry = if let Some(ref keys) = per_task_keys {
        let registry = keys.build_registry();
        if registry.list().is_empty() {
            tracing::warn!(
                "Per-task provider key payload produced no providers; falling back to platform registry"
            );
            (*ProviderRegistry::shared_from_vault().await.context(
                "Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN",
            )?)
            .clone()
        } else {
            tracing::info!(
                source = keys.source(),
                providers = ?keys.provider_names(),
                "Using tenant-scoped per-task provider registry"
            );
            registry
        }
    } else {
        (*ProviderRegistry::shared_from_vault().await.context(
            "Failed to load provider registry from Vault — check VAULT_ADDR/VAULT_TOKEN",
        )?)
        .clone()
    };
    let providers = registry.list();
    tracing::info!("Available providers: {:?}", providers);

    if providers.is_empty() {
        anyhow::bail!(
            "No LLM providers available (0 providers loaded). \
             Configure API keys in HashiCorp Vault or set environment variables. \
             Vault address: {}",
            std::env::var("VAULT_ADDR").unwrap_or_else(|_| "(not set)".into())
        );
    }

    // Resolve (explicit provider, model id) from the session's model string.
    // An empty model id means "use the provider's default model" further down.
    let (provider_name, model_id) = if let Some(ref model_str) = session.metadata.model {
        let (prov, model) = parse_model_string(model_str);
        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
        if prov.is_some() {
            (prov.map(|s| s.to_string()), model.to_string())
        } else if providers.contains(&model) {
            // A bare string naming a provider selects that provider, not a model.
            (Some(model.to_string()), String::new())
        } else {
            (None, model.to_string())
        }
    } else {
        (None, String::new())
    };

    let provider_slice = providers.as_slice();
    // An explicitly requested provider is never silently replaced by another one.
    if let Some(explicit_provider) = provider_name.as_deref()
        && !providers.contains(&explicit_provider)
    {
        anyhow::bail!(
            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
            explicit_provider,
            providers.join(", ")
        );
    }

    let selected_provider = provider_name
        .as_deref()
        .unwrap_or_else(|| choose_provider_for_tier(provider_slice, model_tier));

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: prompt.to_string(),
        }],
    });

    if session.title.is_none() {
        session.generate_title().await?;
    }

    let model = if !model_id.is_empty() {
        model_id
    } else {
        default_model_for_provider(selected_provider, model_tier)
    };

    // NOTE(review): if the session has no directory and current_dir() fails, this
    // falls back to an empty PathBuf — confirm downstream tools tolerate that.
    let workspace_dir = session
        .metadata
        .directory
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    let tool_registry = create_filtered_registry(
        Arc::clone(&provider),
        model.clone(),
        auto_approve,
        &workspace_dir,
        output_callback.clone(),
    );
    let tool_definitions = tool_registry.definitions();

    // Models flagged by `prefers_temperature_one` get 1.0; everything else uses 0.7.
    let temperature = if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!(
        "Using model: {} via provider: {} (tier: {:?})",
        model,
        selected_provider,
        model_tier
    );
    tracing::info!(
        "Available tools: {} (auto_approve: {:?})",
        tool_definitions.len(),
        auto_approve
    );

    let system_prompt = crate::agent::builtin::build_system_prompt(&workspace_dir);

    let mut final_output = String::new();
    // NOTE(review): when all 50 steps request tools, the loop simply ends and whatever
    // text was collected is returned as success — there is no "step limit hit" signal.
    let max_steps = 50;

    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");

        let response = complete_worker_step_with_context_fallback(
            Arc::clone(&provider),
            session,
            &model,
            &system_prompt,
            &tool_definitions,
            temperature,
        )
        .await?;

        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        // Collect (id, name, parsed args). Unparseable argument JSON becomes `{}`
        // rather than failing the step.
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } = part
                {
                    let args: serde_json::Value =
                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                    Some((id.clone(), name.clone(), args))
                } else {
                    None
                }
            })
            .collect();

        // Accumulate and stream every non-empty text part of this response.
        for part in &response.message.content {
            if let ContentPart::Text { text } = part
                && !text.is_empty()
            {
                final_output.push_str(text);
                final_output.push('\n');
                if let Some(ref cb) = output_callback {
                    cb(text.clone());
                }
            }
        }

        // A response without tool calls is the final answer.
        if tool_calls.is_empty() {
            session.add_message(response.message.clone());
            break;
        }

        session.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        for (tool_id, tool_name, tool_input) in tool_calls {
            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            if let Some(ref cb) = output_callback {
                cb(format!("[tool:start:{}]", tool_name));
            }

            // Tools blocked by the approval policy still get a tool result, so every
            // tool call in the assistant message is answered.
            if !is_tool_allowed(&tool_name, auto_approve) {
                let msg = format!(
                    "Tool '{}' requires approval but auto-approve policy is {:?}",
                    tool_name, auto_approve
                );
                tracing::warn!(tool = %tool_name, "Tool blocked by auto-approve policy");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: msg,
                    }],
                });
                continue;
            }

            let content = if let Some(tool) = tool_registry.get(&tool_name) {
                // Per-call timeout, re-read from the environment for every tool call
                // (default 120s, clamped to 1..=3600).
                let tool_timeout_secs =
                    env_u64("CODETETHER_WORKER_TOOL_TIMEOUT_SECS", 120).clamp(1, 3600);
                // A timeout is turned into a structured TOOL_TIMEOUT tool result instead
                // of an error, so the model can react to it.
                let exec_result: Result<crate::tool::ToolResult> = match tokio::time::timeout(
                    Duration::from_secs(tool_timeout_secs),
                    tool.execute(tool_input.clone()),
                )
                .await
                {
                    Ok(result) => result,
                    Err(_) => Ok(crate::tool::ToolResult::structured_error(
                        "TOOL_TIMEOUT",
                        &tool_name,
                        &format!("tool timed out after {tool_timeout_secs}s"),
                        None,
                        Some(serde_json::json!({
                            "hint": "Narrow the request, set a more specific path/include filter, or retry with smaller scope."
                        })),
                    )),
                };
                match exec_result {
                    Ok(result) => {
                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                        // The callback only gets a 500-byte preview; the session keeps
                        // the full output.
                        if let Some(ref cb) = output_callback {
                            let status = if result.success { "ok" } else { "err" };
                            cb(format!(
                                "[tool:{}:{}] {}",
                                tool_name,
                                status,
                                crate::util::truncate_bytes_safe(&result.output, 500)
                            ));
                        }
                        result.output
                    }
                    Err(e) => {
                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                        if let Some(ref cb) = output_callback {
                            cb(format!("[tool:{}:err] {}", tool_name, e));
                        }
                        format!("Error: {}", e)
                    }
                }
            } else {
                tracing::warn!(tool = %tool_name, "Tool not found");
                format!("Error: Unknown tool '{}'", tool_name)
            };

            session.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });
        }
    }

    session.save().await?;

    Ok(crate::session::SessionResult {
        text: final_output.trim().to_string(),
        session_id: session.id.clone(),
    })
}
2840
2841async fn complete_worker_step_with_context_fallback(
2842 provider: Arc<dyn crate::provider::Provider>,
2843 session: &Session,
2844 model: &str,
2845 system_prompt: &str,
2846 tool_definitions: &[crate::provider::ToolDefinition],
2847 temperature: Option<f32>,
2848) -> Result<crate::provider::CompletionResponse> {
2849 let options = crate::session::context::RequestOptions {
2850 temperature,
2851 top_p: None,
2852 max_tokens: Some(8192),
2853 force_keep_last: None,
2854 };
2855
2856 match crate::session::context::complete_with_context(
2857 Arc::clone(&provider),
2858 session,
2859 model,
2860 system_prompt,
2861 tool_definitions,
2862 options,
2863 )
2864 .await
2865 {
2866 Ok(response) => Ok(response),
2867 Err(error) if crate::session::helper::error::is_prompt_too_long_error(&error) => {
2868 let compact_tools = compact_worker_tool_definitions(tool_definitions);
2869 tracing::warn!(
2870 error = %error,
2871 tool_count = tool_definitions.len(),
2872 original_tool_schema_bytes = tool_schema_bytes(tool_definitions),
2873 compact_tool_schema_bytes = tool_schema_bytes(&compact_tools),
2874 "Provider rejected prompt after context compaction; retrying with compact tool schemas"
2875 );
2876 crate::session::context::complete_with_context(
2877 provider,
2878 session,
2879 model,
2880 system_prompt,
2881 &compact_tools,
2882 options,
2883 )
2884 .await
2885 .map_err(|retry_error| {
2886 if crate::session::helper::error::is_prompt_too_long_error(&retry_error) {
2887 retry_error.context(
2888 "provider rejected prompt as too long after RLM compaction and compact tool-schema fallback",
2889 )
2890 } else {
2891 retry_error
2892 }
2893 })
2894 }
2895 Err(error) => Err(error),
2896 }
2897}
2898
2899fn compact_worker_tool_definitions(
2900 tools: &[crate::provider::ToolDefinition],
2901) -> Vec<crate::provider::ToolDefinition> {
2902 tools
2903 .iter()
2904 .map(|tool| crate::provider::ToolDefinition {
2905 name: tool.name.clone(),
2906 description: tool.description.clone(),
2907 parameters: serde_json::json!({
2908 "type": "object",
2909 "additionalProperties": true
2910 }),
2911 })
2912 .collect()
2913}
2914
2915fn tool_schema_bytes(tools: &[crate::provider::ToolDefinition]) -> usize {
2916 let mut writer = CountingWriter::default();
2917 serde_json::to_writer(&mut writer, tools)
2918 .map(|_| writer.bytes)
2919 .unwrap_or(0)
2920}
2921
/// A `std::io::Write` sink that discards all data and only records how many bytes
/// were written to it.
#[derive(Default)]
struct CountingWriter {
    /// Running total of bytes accepted by `write`.
    bytes: usize,
}

impl std::io::Write for CountingWriter {
    /// Accepts the entire buffer and adds its length to the running total.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = buf.len();
        self.bytes += accepted;
        Ok(accepted)
    }

    /// Nothing is buffered, so there is nothing to flush.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
2937
/// Spawns the worker's periodic heartbeat task and returns its `JoinHandle`.
///
/// Every 30 seconds the task does the following. The first tick fires immediately,
/// and missed ticks are skipped.
/// 1. Counts the task ids in `processing` and stores the count and the derived
///    `Idle`/`Processing` status in `heartbeat_state`.
/// 2. POSTs `{server}/v1/agent/workers/{worker_id}/heartbeat`. The payload contains:
///    - worker id, agent name, status, active task count and sub-agent snapshot;
///    - a `cognition` object, when enabled and not in cooldown;
///    - a `task_progress` object, when a task id is recorded in `task_progress`.
/// 3. If `persistent_worker_enabled()` returned true when the task started and a task
///    is in progress, also sends an extended task heartbeat carrying
///    `persistent_worker_lease_seconds()`.
///
/// Rejected cognition payloads:
/// - If a payload that included cognition data is rejected with a 4xx, the heartbeat
///   is retried once with only the base fields.
/// - A successful retry pauses cognition data for 300 seconds.
///
/// After 3 consecutive failures an error is logged and the counter resets. The loop
/// never exits on its own.
pub fn start_heartbeat(
    client: Client,
    server: String,
    token: Option<String>,
    heartbeat_state: HeartbeatState,
    processing: Arc<Mutex<HashSet<String>>>,
    cognition_config: CognitionHeartbeatConfig,
    task_progress: Arc<Mutex<task_timeline::TaskProgressState>>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut consecutive_failures = 0u32;
        const MAX_FAILURES: u32 = 3;
        const HEARTBEAT_INTERVAL_SECS: u64 = 30;
        const COGNITION_RETRY_COOLDOWN_SECS: u64 = 300;
        // `Some(t)`: cognition data is withheld until `t` after a schema rejection.
        let mut cognition_payload_disabled_until: Option<Instant> = None;
        // Read once at startup; later environment changes are not picked up.
        let persistent_heartbeat_enabled = persistent_worker_enabled();
        let persistent_lease_seconds = persistent_worker_lease_seconds();

        let mut interval =
            tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            interval.tick().await;

            // Derive task count and status from the set of in-flight task ids.
            let active_count = processing.lock().await.len();
            heartbeat_state.set_task_count(active_count).await;

            let status = if active_count > 0 {
                WorkerStatus::Processing
            } else {
                WorkerStatus::Idle
            };
            heartbeat_state.set_status(status).await;

            let url = format!(
                "{}/v1/agent/workers/{}/heartbeat",
                server, heartbeat_state.worker_id
            );
            let mut req = client.post(&url);

            if let Some(ref t) = token {
                req = req.bearer_auth(t);
            }

            let status_str = heartbeat_state.status.lock().await.as_str().to_string();
            let sub_agents = heartbeat_state.sub_agents_snapshot().await;

            // Snapshot progress so the lock is not held across the network calls below.
            let progress_snapshot = task_progress.lock().await.clone();
            let task_progress_payload = if progress_snapshot.task_id.is_some() {
                Some(serde_json::json!({
                    "task_id": progress_snapshot.task_id,
                    "current_checkpoint": progress_snapshot.current_checkpoint,
                    "elapsed_secs": format!("{:.1}", progress_snapshot.elapsed_secs),
                    "remaining_secs": format!("{:.1}", progress_snapshot.remaining_secs),
                    "budget_pct_used": format!("{:.1}%", progress_snapshot.budget_pct_used),
                    "checkpoints_reached": progress_snapshot.checkpoints_reached.len(),
                    "last_detail": progress_snapshot.last_detail,
                }))
            } else {
                None
            };

            // Minimal payload; this is also what the cognition-free retry sends.
            let base_payload = serde_json::json!({
                "worker_id": &heartbeat_state.worker_id,
                "agent_name": &heartbeat_state.agent_name,
                "status": status_str,
                "active_task_count": active_count,
                "sub_agents": sub_agents,
            });
            let mut payload = base_payload.clone();
            let mut included_cognition_payload = false;
            let cognition_payload_allowed = cognition_payload_disabled_until
                .map(|until| Instant::now() >= until)
                .unwrap_or(true);

            // The cognition fetch is awaited inline, so a slow cognition endpoint
            // delays this heartbeat.
            if cognition_config.enabled
                && cognition_payload_allowed
                && let Some(cognition_payload) =
                    fetch_cognition_heartbeat_payload(&client, &cognition_config).await
                && let Some(obj) = payload.as_object_mut()
            {
                obj.insert("cognition".to_string(), cognition_payload);
                included_cognition_payload = true;
            }

            if let Some(progress_json) = task_progress_payload.clone() {
                if let Some(obj) = payload.as_object_mut() {
                    obj.insert("task_progress".to_string(), progress_json);
                }
            }

            match req.json(&payload).send().await {
                Ok(res) => {
                    if res.status().is_success() {
                        consecutive_failures = 0;
                        tracing::debug!(
                            worker_id = %heartbeat_state.worker_id,
                            status = status_str,
                            active_tasks = active_count,
                            "Heartbeat sent successfully"
                        );
                    } else if included_cognition_payload && res.status().is_client_error() {
                        // Assume the 4xx was caused by the cognition field and retry with
                        // the base payload.
                        // NOTE(review): `base_payload` also omits `task_progress`, so the
                        // retry drops progress data as well — confirm that is intended.
                        tracing::warn!(
                            worker_id = %heartbeat_state.worker_id,
                            status = %res.status(),
                            "Heartbeat cognition payload rejected, retrying without cognition payload"
                        );

                        let mut retry_req = client.post(&url);
                        if let Some(ref t) = token {
                            retry_req = retry_req.bearer_auth(t);
                        }

                        match retry_req.json(&base_payload).send().await {
                            Ok(retry_res) if retry_res.status().is_success() => {
                                // The retry confirmed the cognition field was the problem:
                                // stop sending it for the cooldown period.
                                cognition_payload_disabled_until = Some(
                                    Instant::now()
                                        + Duration::from_secs(COGNITION_RETRY_COOLDOWN_SECS),
                                );
                                consecutive_failures = 0;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    retry_after_secs = COGNITION_RETRY_COOLDOWN_SECS,
                                    "Paused cognition heartbeat payload after schema rejection"
                                );
                            }
                            Ok(retry_res) => {
                                consecutive_failures += 1;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    status = %retry_res.status(),
                                    failures = consecutive_failures,
                                    "Heartbeat failed even after retry without cognition payload"
                                );
                            }
                            Err(e) => {
                                consecutive_failures += 1;
                                tracing::warn!(
                                    worker_id = %heartbeat_state.worker_id,
                                    error = %e,
                                    failures = consecutive_failures,
                                    "Heartbeat retry without cognition payload failed"
                                );
                            }
                        }
                    } else {
                        consecutive_failures += 1;
                        tracing::warn!(
                            worker_id = %heartbeat_state.worker_id,
                            status = %res.status(),
                            failures = consecutive_failures,
                            "Heartbeat failed"
                        );
                    }
                }
                Err(e) => {
                    consecutive_failures += 1;
                    tracing::warn!(
                        worker_id = %heartbeat_state.worker_id,
                        error = %e,
                        failures = consecutive_failures,
                        "Heartbeat request failed"
                    );
                }
            }

            // Lease extension for the active task. Failures are only logged and do
            // not count towards `consecutive_failures`.
            if persistent_heartbeat_enabled
                && let Some(progress_json) = task_progress_payload
                && let Err(e) = send_extended_task_heartbeat(
                    &client,
                    &server,
                    &token,
                    &heartbeat_state.worker_id,
                    progress_json,
                    persistent_lease_seconds,
                )
                .await
            {
                tracing::warn!(
                    worker_id = %heartbeat_state.worker_id,
                    error = %e,
                    "Extended task heartbeat failed"
                );
            }

            // Escalate to an error log, then reset; the heartbeat loop keeps running.
            if consecutive_failures >= MAX_FAILURES {
                tracing::error!(
                    worker_id = %heartbeat_state.worker_id,
                    failures = consecutive_failures,
                    "Heartbeat failed {} consecutive times - worker will continue running and attempt reconnection via SSE loop",
                    MAX_FAILURES
                );
                consecutive_failures = 0;
            }
        }
    })
}
3145
3146async fn send_extended_task_heartbeat(
3147 client: &Client,
3148 server: &str,
3149 token: &Option<String>,
3150 worker_id: &str,
3151 progress: serde_json::Value,
3152 lease_seconds: u64,
3153) -> Result<()> {
3154 let task_id = progress
3155 .get("task_id")
3156 .and_then(|value| value.as_str())
3157 .filter(|value| !value.is_empty())
3158 .context("active task progress missing task_id")?
3159 .to_string();
3160 let checkpoint_seq = progress
3161 .get("checkpoints_reached")
3162 .and_then(|value| value.as_u64())
3163 .unwrap_or(1)
3164 .max(1);
3165 let status_message = progress
3166 .get("current_checkpoint")
3167 .and_then(|value| value.as_str())
3168 .map(|value| value.to_string());
3169
3170 let mut req = client.post(format!("{}/v1/worker/tasks/heartbeat-extended", server));
3171 if let Some(token) = token {
3172 req = req.bearer_auth(token);
3173 }
3174
3175 let response = req
3176 .json(&serde_json::json!({
3177 "task_id": &task_id,
3178 "worker_id": worker_id,
3179 "status_message": status_message,
3180 "checkpoint": progress,
3181 "checkpoint_seq": checkpoint_seq,
3182 "lease_extension_seconds": lease_seconds,
3183 }))
3184 .send()
3185 .await?;
3186
3187 let status = response.status();
3188 if !status.is_success() {
3189 let body = response.text().await.unwrap_or_default();
3190 anyhow::bail!("extended heartbeat rejected with {}: {}", status, body);
3191 }
3192
3193 let body = response
3194 .json::<serde_json::Value>()
3195 .await
3196 .unwrap_or_default();
3197 if body.get("success").and_then(|value| value.as_bool()) == Some(false) {
3198 tracing::debug!(
3199 task_id = %task_id,
3200 message = body
3201 .get("message")
3202 .and_then(|value| value.as_str())
3203 .unwrap_or("no active run"),
3204 "Extended task heartbeat skipped"
3205 );
3206 return Ok(());
3207 }
3208
3209 Ok(())
3210}
3211
3212async fn fetch_cognition_heartbeat_payload(
3213 client: &Client,
3214 config: &CognitionHeartbeatConfig,
3215) -> Option<serde_json::Value> {
3216 let status_url = format!("{}/v1/cognition/status", config.source_base_url);
3217 let status_res = tokio::time::timeout(
3218 Duration::from_millis(config.request_timeout_ms),
3219 client.get(status_url).send(),
3220 )
3221 .await
3222 .ok()?
3223 .ok()?;
3224
3225 if !status_res.status().is_success() {
3226 return None;
3227 }
3228
3229 let status: CognitionStatusSnapshot = status_res.json().await.ok()?;
3230 let mut payload = serde_json::json!({
3231 "running": status.running,
3232 "last_tick_at": status.last_tick_at,
3233 "active_persona_count": status.active_persona_count,
3234 "events_buffered": status.events_buffered,
3235 "snapshots_buffered": status.snapshots_buffered,
3236 "loop_interval_ms": status.loop_interval_ms,
3237 });
3238
3239 if config.include_thought_summary {
3240 let snapshot_url = format!("{}/v1/cognition/snapshots/latest", config.source_base_url);
3241 let snapshot_res = tokio::time::timeout(
3242 Duration::from_millis(config.request_timeout_ms),
3243 client.get(snapshot_url).send(),
3244 )
3245 .await
3246 .ok()
3247 .and_then(Result::ok);
3248
3249 if let Some(snapshot_res) = snapshot_res
3250 && snapshot_res.status().is_success()
3251 && let Ok(snapshot) = snapshot_res.json::<CognitionLatestSnapshot>().await
3252 && let Some(obj) = payload.as_object_mut()
3253 {
3254 obj.insert(
3255 "latest_snapshot_at".to_string(),
3256 serde_json::Value::String(snapshot.generated_at),
3257 );
3258 obj.insert(
3259 "latest_thought".to_string(),
3260 serde_json::Value::String(trim_for_heartbeat(
3261 &snapshot.summary,
3262 config.summary_max_chars,
3263 )),
3264 );
3265 if let Some(model) = snapshot
3266 .metadata
3267 .get("model")
3268 .and_then(serde_json::Value::as_str)
3269 {
3270 obj.insert(
3271 "latest_thought_model".to_string(),
3272 serde_json::Value::String(model.to_string()),
3273 );
3274 }
3275 if let Some(source) = snapshot
3276 .metadata
3277 .get("source")
3278 .and_then(serde_json::Value::as_str)
3279 {
3280 obj.insert(
3281 "latest_thought_source".to_string(),
3282 serde_json::Value::String(source.to_string()),
3283 );
3284 }
3285 }
3286 }
3287
3288 Some(payload)
3289}
3290
/// Trims surrounding whitespace from `input` and caps the result at `max_chars`
/// Unicode scalar values, appending `"..."` when text was cut.
///
/// Whitespace is trimmed before measuring, so padding no longer counts against the
/// limit. Any whitespace left at the cut point is dropped before the ellipsis, so the
/// result reads `"hello..."` rather than `"hello ..."`. When truncated, the result is
/// at most `max_chars + 3` characters long.
fn trim_for_heartbeat(input: &str, max_chars: usize) -> String {
    let text = input.trim();
    // `nth(max_chars)` is `Some` exactly when `text` has more than `max_chars` chars;
    // its byte offset is a valid char boundary to cut at.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((cut, _)) => {
            let head = text[..cut].trim_end();
            let mut trimmed = String::with_capacity(head.len() + 3);
            trimmed.push_str(head);
            trimmed.push_str("...");
            trimmed
        }
    }
}
3303
/// Reads environment variable `name` as a boolean, case-insensitively.
///
/// Accepted values:
/// - `1`/`true`/`yes`/`on` → `true`;
/// - `0`/`false`/`no`/`off` → `false`.
///
/// Unset, non-Unicode or unrecognised values (whitespace is not trimmed) give
/// `default`.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
3314
/// Reads environment variable `name` as a `usize`.
///
/// Unset, non-Unicode or unparseable values give `default`. Surrounding whitespace is
/// not trimmed, so it makes the value unparseable.
fn env_usize(name: &str, default: usize) -> usize {
    if let Ok(raw) = std::env::var(name) {
        if let Ok(parsed) = raw.parse::<usize>() {
            return parsed;
        }
    }
    default
}
3321
/// Reads environment variable `name` as a `u64`.
///
/// Unset, non-Unicode or unparseable values give `default`. Surrounding whitespace is
/// not trimmed, so it makes the value unparseable.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
3328
3329fn maybe_configure_repo_git_auth_from_entry(entry: &serde_json::Value) {
3330 let Some(workspace_id) = entry.get("id").and_then(|v| v.as_str()) else {
3331 return;
3332 };
3333 let Some(path) = entry.get("path").and_then(|v| v.as_str()) else {
3334 return;
3335 };
3336 let has_git_remote = entry
3337 .get("git_url")
3338 .and_then(|v| v.as_str())
3339 .is_some_and(|value| !value.trim().is_empty());
3340 if !has_git_remote {
3341 return;
3342 }
3343
3344 let repo_path = Path::new(path);
3345 if !repo_path.join(".git").exists() {
3346 return;
3347 }
3348
3349 if let Err(error) = configure_repo_git_auth(repo_path, workspace_id) {
3350 tracing::debug!(
3351 workspace_id,
3352 path,
3353 error = %error,
3354 "Workspace sync could not install Git credential helper"
3355 );
3356 }
3357 configure_repo_git_github_app_from_agent_config(repo_path, entry.get("agent_config"));
3358}
3359
3360fn configure_repo_git_github_app_from_agent_config(
3361 repo_path: &Path,
3362 agent_config: Option<&serde_json::Value>,
3363) {
3364 let installation_id = agent_config
3365 .and_then(|value| value.get("git_auth"))
3366 .and_then(|value| value.get("github_app"))
3367 .and_then(|value| value.get("installation_id"))
3368 .and_then(|value| value.as_str());
3369 let app_id = agent_config
3370 .and_then(|value| value.get("git_auth"))
3371 .and_then(|value| value.get("github_app"))
3372 .and_then(|value| value.get("app_id"))
3373 .and_then(|value| value.as_str());
3374 if let Err(error) = configure_repo_git_github_app(repo_path, installation_id, app_id) {
3375 tracing::debug!(path = %repo_path.display(), error = %error, "Failed to persist GitHub App repo metadata");
3376 }
3377}
3378
3379fn start_workspace_sync(
3383 client: Client,
3384 server: String,
3385 token: Option<String>,
3386 shared_codebases: Arc<Mutex<Vec<String>>>,
3387) -> JoinHandle<()> {
3388 tokio::spawn(async move {
3389 const POLL_INTERVAL_SECS: u64 = 60;
3390 let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
3391 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3392 interval.tick().await; loop {
3395 interval.tick().await;
3396 if let Err(e) =
3397 sync_workspaces_from_server(&client, &server, &token, &shared_codebases).await
3398 {
3399 tracing::warn!("Workspace sync failed: {}", e);
3400 }
3401 }
3402 })
3403}
3404
3405async fn sync_workspaces_from_server(
3409 client: &Client,
3410 server: &str,
3411 token: &Option<String>,
3412 shared_codebases: &Arc<Mutex<Vec<String>>>,
3413) -> Result<()> {
3414 let mut req = client.get(format!("{}/v1/agent/workspaces", server));
3415 if let Some(t) = token {
3416 req = req.bearer_auth(t);
3417 }
3418
3419 let res = req.send().await?;
3420 if !res.status().is_success() {
3421 tracing::debug!(
3422 status = %res.status(),
3423 "Workspace sync: server returned non-success, skipping"
3424 );
3425 return Ok(());
3426 }
3427
3428 let data: serde_json::Value = res.json().await?;
3429
3430 let entries = if let Some(arr) = data.as_array() {
3432 arr.clone()
3433 } else {
3434 data["workspaces"]
3435 .as_array()
3436 .or_else(|| data["codebases"].as_array())
3437 .cloned()
3438 .unwrap_or_default()
3439 };
3440
3441 let mut new_paths: Vec<String> = Vec::new();
3442 {
3443 let current = shared_codebases.lock().await;
3444 for entry in &entries {
3445 maybe_configure_repo_git_auth_from_entry(entry);
3446 let path = match entry["path"].as_str().filter(|p| !p.is_empty()) {
3447 Some(p) => p,
3448 None => continue,
3449 };
3450 if std::path::Path::new(path).exists() && !current.iter().any(|c| c.as_str() == path) {
3453 new_paths.push(path.to_string());
3454 }
3455 }
3456 }
3457
3458 if !new_paths.is_empty() {
3459 let mut current = shared_codebases.lock().await;
3460 for path in &new_paths {
3461 tracing::info!(
3462 path = %path,
3463 "Workspace sync: auto-discovered local path, adding to codebases"
3464 );
3465 current.push(path.clone());
3466 }
3467 tracing::info!(
3468 added = new_paths.len(),
3469 total = current.len(),
3470 "Workspace sync complete -- new paths take effect on next reconnect"
3471 );
3472 } else {
3473 tracing::debug!("Workspace sync: no new local paths found");
3474 }
3475
3476 Ok(())
3477}
3478
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{
        CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo,
        Provider, Role, StreamChunk, ToolDefinition, Usage,
    };
    use futures::stream::{self, BoxStream};
    use serde_json::json;
    use std::ffi::OsString;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Mock provider that rejects every completion with a context-window error
    /// until it receives a request whose tool schemas all carry
    /// `"additionalProperties": true`, which is what the compaction produces.
    /// That request, or one with no tools at all (since `all` on an empty list
    /// is `true`), gets an assistant reply of `"ok"`.
    struct ContextErrorUntilCompactProvider {
        /// Number of `complete` calls received so far.
        calls: AtomicUsize,
        /// Set once a compact-schema request has been observed.
        saw_compact_schema: AtomicBool,
    }

    impl ContextErrorUntilCompactProvider {
        fn new() -> Self {
            Self {
                calls: AtomicUsize::new(0),
                saw_compact_schema: AtomicBool::new(false),
            }
        }

        fn calls(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }

        fn saw_compact_schema(&self) -> bool {
            self.saw_compact_schema.load(Ordering::SeqCst)
        }
    }

    #[async_trait::async_trait]
    impl Provider for ContextErrorUntilCompactProvider {
        fn name(&self) -> &str {
            "mock"
        }

        async fn list_models(&self) -> Result<Vec<ModelInfo>> {
            Ok(Vec::new())
        }

        async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            let compact = request.tools.iter().all(|tool| {
                tool.parameters.get("additionalProperties") == Some(&serde_json::Value::Bool(true))
            });
            if compact {
                self.saw_compact_schema.store(true, Ordering::SeqCst);
                return Ok(CompletionResponse {
                    message: Message {
                        role: Role::Assistant,
                        content: vec![ContentPart::Text { text: "ok".into() }],
                    },
                    usage: Usage::default(),
                    finish_reason: FinishReason::Stop,
                });
            }
            anyhow::bail!(
                "Your input exceeds the context window of this model. Please adjust your input and try again."
            )
        }

        async fn complete_stream(
            &self,
            _request: CompletionRequest,
        ) -> Result<BoxStream<'static, StreamChunk>> {
            Ok(Box::pin(stream::empty()))
        }
    }

    /// Sets an environment variable for the lifetime of the guard. On drop it
    /// restores the exact previous value, or removes the variable if it was
    /// unset, so the process environment is restored even if the test panics.
    struct EnvVarGuard {
        key: &'static str,
        original: Option<OsString>,
    }

    impl EnvVarGuard {
        fn set(key: &'static str, value: &str) -> Self {
            // `var_os` preserves non-UTF-8 values that `var(..).ok()` would drop.
            let original = std::env::var_os(key);
            // SAFETY: mutating the process environment is only sound while no other
            // thread reads or writes it. NOTE(review): the test harness runs tests in
            // parallel; this relies on no other test touching this variable concurrently.
            unsafe {
                std::env::set_var(key, value);
            }
            Self { key, original }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.original.take() {
                // SAFETY: same precondition as in `EnvVarGuard::set`.
                Some(value) => unsafe { std::env::set_var(self.key, value) },
                // SAFETY: same precondition as in `EnvVarGuard::set`.
                None => unsafe { std::env::remove_var(self.key) },
            }
        }
    }

    /// One tool whose schema embeds a 20,000-byte description. Used to check
    /// that compaction shrinks the schema while keeping the tool's identity.
    fn large_tool_definitions() -> Vec<ToolDefinition> {
        vec![ToolDefinition {
            name: "large_tool".to_string(),
            description: "Large tool schema".to_string(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "payload": {
                        "type": "string",
                        "description": "x".repeat(20_000)
                    }
                }
            }),
        }]
    }

    #[test]
    fn metadata_lookup_reads_nested_forage_keys() {
        let metadata = json!({
            "forage": {
                "execute": true,
                "top": 5,
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert_eq!(metadata_bool(&metadata, &["execute"]), Some(true));
        assert_eq!(metadata_usize(&metadata, &["top"]), Some(5));
    }

    #[test]
    fn build_forage_args_defaults_prompt_to_moonshot_and_local_mode() {
        let metadata = serde_json::Map::new();
        let args = build_forage_args(
            "Expand enterprise adoption in regulated markets",
            "forage task",
            &metadata,
            Some("openrouter/z-ai/glm-5".to_string()),
        );

        assert_eq!(
            args.moonshots,
            vec!["Expand enterprise adoption in regulated markets".to_string()]
        );
        assert!(args.no_s3);
        assert_eq!(args.model.as_deref(), Some("openrouter/z-ai/glm-5"));
        assert_eq!(args.execution_engine, "run");
    }

    #[test]
    fn compact_worker_tool_definitions_preserve_tool_identity() {
        let tools = large_tool_definitions();

        let compact = compact_worker_tool_definitions(&tools);

        assert_eq!(compact[0].name, tools[0].name);
        assert_eq!(compact[0].description, tools[0].description);
        assert_eq!(
            compact[0].parameters.get("additionalProperties"),
            Some(&serde_json::Value::Bool(true))
        );
        assert!(tool_schema_bytes(&compact) < tool_schema_bytes(&tools) / 10);
    }

    #[tokio::test]
    async fn worker_retries_context_window_error_with_compact_tool_schema() {
        let provider = Arc::new(ContextErrorUntilCompactProvider::new());
        let provider_dyn: Arc<dyn Provider> = provider.clone();
        let mut session = Session::new().await.expect("session");
        session.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "Run cronjob \"rule:Retry Failed Conversion Forwards\".".to_string(),
            }],
        });
        let tools = large_tool_definitions();

        let response = complete_worker_step_with_context_fallback(
            provider_dyn,
            &session,
            "mock-model",
            "system",
            &tools,
            Some(0.7),
        )
        .await
        .expect("compact tool-schema retry should recover");

        assert_eq!(
            response
                .message
                .content
                .iter()
                .filter_map(|part| match part {
                    ContentPart::Text { text } => Some(text.as_str()),
                    _ => None,
                })
                .collect::<String>(),
            "ok"
        );
        // NOTE(review): 5 is the total number of provider calls the fallback chain
        // makes up to and including the successful compact-schema retry. Confirm
        // against `complete_worker_step_with_context_fallback` if its retry ladder changes.
        assert_eq!(provider.calls(), 5);
        assert!(provider.saw_compact_schema());
    }

    #[test]
    fn build_forage_args_honors_nested_forage_configuration() {
        let metadata = json!({
            "forage": {
                "top": 7,
                "loop": true,
                "max_cycles": 2,
                "execute": true,
                "no_s3": false,
                "moonshots": ["Ship autonomous OKR execution", "Reduce operator toil"],
                "moonshot_required": true,
                "moonshot_min_alignment": "0.25",
                "execution_engine": "swarm",
                "run_timeout_secs": 1200,
                "fail_fast": true,
                "swarm_strategy": "stage",
                "swarm_max_subagents": 4,
                "swarm_max_steps": 42,
                "swarm_subagent_timeout_secs": 180
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        let args = build_forage_args("fallback prompt", "title", &metadata, None);

        assert_eq!(args.top, 7);
        assert!(args.loop_mode);
        assert_eq!(args.max_cycles, 2);
        assert!(args.execute);
        assert!(!args.no_s3);
        assert_eq!(args.moonshots.len(), 2);
        assert!(args.moonshot_required);
        assert!((args.moonshot_min_alignment - 0.25).abs() < f64::EPSILON);
        assert_eq!(args.execution_engine, "swarm");
        assert_eq!(args.run_timeout_secs, 1200);
        assert!(args.fail_fast);
        assert_eq!(args.swarm_strategy, "stage");
        assert_eq!(args.swarm_max_subagents, 4);
        assert_eq!(args.swarm_max_steps, 42);
        assert_eq!(args.swarm_subagent_timeout_secs, 180);
    }

    #[test]
    fn worker_allows_github_app_post_clone_followups() {
        let metadata = json!({
            "source": "github-app",
            "post_clone_task": {
                "title": "Work issue #76",
                "prompt": "fix it"
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert!(worker_should_enqueue_post_clone_task(&metadata));
    }

    #[test]
    fn worker_allows_legacy_post_clone_followups() {
        let metadata = json!({
            "post_clone_task": {
                "title": "Continue after clone",
                "prompt": "run build"
            }
        })
        .as_object()
        .cloned()
        .unwrap();

        assert!(worker_should_enqueue_post_clone_task(&metadata));
    }

    #[test]
    fn resolve_worker_id_prefers_env() {
        // The guard is dropped at the end of this block, so the environment is
        // restored before the assertion, and also if `resolve_worker_id` panics.
        let resolved = {
            let _guard = EnvVarGuard::set("CODETETHER_WORKER_ID", "harvester-test-worker");
            resolve_worker_id()
        };
        assert_eq!(resolved, "harvester-test-worker");
    }

    #[test]
    fn advertised_interfaces_include_http_and_bus_urls() {
        let interfaces = advertised_interfaces(Some("http://worker.test:8080/"));
        assert_eq!(interfaces["http"]["base_url"], "http://worker.test:8080");
        assert_eq!(
            interfaces["bus"]["stream_url"],
            "http://worker.test:8080/v1/bus/stream"
        );
        assert_eq!(
            interfaces["bus"]["publish_url"],
            "http://worker.test:8080/v1/bus/publish"
        );
    }

    #[test]
    fn advertised_interfaces_omit_empty_public_url() {
        assert_eq!(advertised_interfaces(None), serde_json::json!({}));
        assert_eq!(advertised_interfaces(Some("   ")), serde_json::json!({}));
    }

    #[test]
    fn worker_http_client_builds_with_stream_decoders_disabled() {
        assert!(build_worker_http_client().is_ok());
    }

    #[test]
    fn task_timeout_prefers_fire_and_forget_payload_timeout() {
        let task = json!({
            "id": "task-1",
            "task_timeout_seconds": 604800,
            "metadata": {"timeout_secs": 1200}
        });
        let metadata = task_metadata(&task);
        assert_eq!(task_timeout_secs(&task, &metadata), 604800);
    }

    #[tokio::test]
    async fn reserve_task_slot_enforces_capacity() {
        let processing = Arc::new(Mutex::new(HashSet::new()));

        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-1", 2).await,
            TaskReservation::AlreadyProcessing
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-2", 2).await,
            TaskReservation::Reserved
        );
        assert_eq!(
            reserve_task_slot(&processing, "task-3", 2).await,
            TaskReservation::AtCapacity
        );
    }

    #[test]
    fn normalize_max_concurrent_tasks_never_returns_zero() {
        assert_eq!(normalize_max_concurrent_tasks(0), 1);
        assert_eq!(normalize_max_concurrent_tasks(3), 3);
    }
}