use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::agent::agentic_loop::{
AgenticLoopConfig, LoopDelegate, LoopOutcome, LoopSignal, TextAction, truncate_for_preview,
};
use crate::config::SafetyConfig;
use crate::context::JobContext;
use crate::error::WorkerError;
use crate::llm::{ChatMessage, LlmProvider, Reasoning, ReasoningContext};
use crate::safety::SafetyLayer;
use crate::tools::ToolRegistry;
use crate::tools::execute::{execute_tool_simple, process_tool_result};
use crate::worker::api::{CompletionReport, JobEventPayload, StatusUpdate, WorkerHttpClient};
use crate::worker::proxy_llm::ProxyLlmProvider;
/// Settings for a single worker run.
///
/// Consumed by [`WorkerRuntime::new`], which uses `orchestrator_url` and `job_id` to build the
/// orchestrator HTTP client, and by [`WorkerRuntime::run`], which applies `max_iterations` and
/// `timeout` to the agentic loop.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Identifier of the job this worker executes.
    pub job_id: Uuid,
    /// Base URL handed to the orchestrator HTTP client.
    pub orchestrator_url: String,
    /// Iteration cap passed to the agentic loop configuration.
    pub max_iterations: u32,
    /// Wall-clock limit applied to the agentic loop as a whole.
    pub timeout: Duration,
}
impl Default for WorkerConfig {
fn default() -> Self {
Self {
job_id: Uuid::nil(),
orchestrator_url: String::new(),
max_iterations: 50,
timeout: Duration::from_secs(600),
}
}
}
/// Executes one job inside the worker: fetches the job and its credentials from the
/// orchestrator, drives the agentic loop, and reports the outcome back.
pub struct WorkerRuntime {
    /// Settings supplied to [`WorkerRuntime::new`].
    config: WorkerConfig,
    /// HTTP client used for every orchestrator call (job, credentials, status, events).
    client: Arc<WorkerHttpClient>,
    /// LLM provider handed to the reasoning engine; `new` installs a `ProxyLlmProvider`
    /// built on top of `client`.
    llm: Arc<dyn LlmProvider>,
    /// Safety layer passed along to tool execution and tool-result processing.
    safety: Arc<SafetyLayer>,
    /// Tool registry; `new` registers the container tools on it.
    tools: Arc<ToolRegistry>,
    /// Environment variable name -> value map, filled in `run` from the fetched credentials
    /// and forwarded to tools through the job context.
    extra_env: Arc<HashMap<String, String>>,
}
impl WorkerRuntime {
pub fn new(config: WorkerConfig) -> Result<Self, WorkerError> {
let client = Arc::new(WorkerHttpClient::from_env(
config.orchestrator_url.clone(),
config.job_id,
)?);
let llm: Arc<dyn LlmProvider> = Arc::new(ProxyLlmProvider::new(
Arc::clone(&client),
"proxied".to_string(),
));
let safety = Arc::new(SafetyLayer::new(&SafetyConfig {
max_output_length: 100_000,
injection_check_enabled: true,
}));
let tools = Arc::new(ToolRegistry::new());
tools.register_container_tools();
Ok(Self {
config,
client,
llm,
safety,
tools,
extra_env: Arc::new(HashMap::new()),
})
}
pub async fn run(mut self) -> Result<(), WorkerError> {
tracing::info!("Worker starting for job {}", self.config.job_id);
let job = self.client.get_job().await?;
tracing::info!(
"Received job: {} - {}",
job.title,
truncate_for_preview(&job.description, 100)
);
let credentials = self.client.fetch_credentials().await?;
{
let mut env_map = HashMap::new();
for cred in &credentials {
env_map.insert(cred.env_var.clone(), cred.value.clone());
}
self.extra_env = Arc::new(env_map);
}
if !credentials.is_empty() {
tracing::info!(
"Fetched {} credential(s) for child process injection",
credentials.len()
);
}
self.client
.report_status(&StatusUpdate {
state: "in_progress".to_string(),
message: Some("Worker started, beginning execution".to_string()),
iteration: 0,
})
.await?;
let reasoning = Reasoning::new(self.llm.clone());
let mut reason_ctx = ReasoningContext::new().with_job(&job.description);
reason_ctx.messages.push(ChatMessage::system(format!(
r#"You are an autonomous agent running inside a Docker container.
Job: {}
Description: {}
You have tools for shell commands, file operations, and code editing.
Work independently to complete this job. When finished, your final message MUST include the phrase "The job is complete" to signal termination."#,
job.title, job.description
)));
reason_ctx.available_tools = self.tools.tool_definitions().await;
let iteration_tracker = Arc::new(Mutex::new(0u32));
let result = tokio::time::timeout(self.config.timeout, async {
let delegate = ContainerDelegate {
client: self.client.clone(),
safety: self.safety.clone(),
tools: self.tools.clone(),
extra_env: self.extra_env.clone(),
last_output: Mutex::new(String::new()),
iteration_tracker: iteration_tracker.clone(),
};
let config = AgenticLoopConfig {
max_iterations: self.config.max_iterations as usize,
enable_tool_intent_nudge: true,
max_tool_intent_nudges: 2,
};
crate::agent::agentic_loop::run_agentic_loop(
&delegate,
&reasoning,
&mut reason_ctx,
&config,
)
.await
})
.await;
let iterations = *iteration_tracker.lock().await;
match result {
Ok(Ok(LoopOutcome::Response(output))) => {
tracing::info!("Worker completed job {} successfully", self.config.job_id);
self.post_event(
"result",
serde_json::json!({
"success": true,
"message": truncate_for_preview(&output, 2000),
}),
)
.await;
self.client
.report_complete(&CompletionReport {
success: true,
message: Some(output),
iterations,
})
.await?;
}
Ok(Ok(LoopOutcome::MaxIterations)) => {
let msg = format!("max iterations ({}) exceeded", self.config.max_iterations);
tracing::warn!("Worker failed for job {}: {}", self.config.job_id, msg);
self.post_event(
"result",
serde_json::json!({
"success": false,
"message": format!("Execution failed: {}", msg),
}),
)
.await;
self.client
.report_complete(&CompletionReport {
success: false,
message: Some(format!("Execution failed: {}", msg)),
iterations,
})
.await?;
}
Ok(Ok(LoopOutcome::Stopped | LoopOutcome::NeedApproval(_))) => {
tracing::info!("Worker for job {} stopped", self.config.job_id);
self.client
.report_complete(&CompletionReport {
success: false,
message: Some("Execution stopped".to_string()),
iterations,
})
.await?;
}
Ok(Err(e)) => {
tracing::error!("Worker failed for job {}: {}", self.config.job_id, e);
self.post_event(
"result",
serde_json::json!({
"success": false,
"message": format!("Execution failed: {}", e),
}),
)
.await;
self.client
.report_complete(&CompletionReport {
success: false,
message: Some(format!("Execution failed: {}", e)),
iterations,
})
.await?;
}
Err(_) => {
tracing::warn!("Worker timed out for job {}", self.config.job_id);
self.post_event(
"result",
serde_json::json!({
"success": false,
"message": "Execution timed out",
}),
)
.await;
self.client
.report_complete(&CompletionReport {
success: false,
message: Some("Execution timed out".to_string()),
iterations,
})
.await?;
}
}
Ok(())
}
async fn post_event(&self, event_type: &str, data: serde_json::Value) {
self.client
.post_event(&JobEventPayload {
event_type: event_type.to_string(),
data,
})
.await;
}
}
/// Loop delegate used by the worker: it supplies the agentic loop with tool execution,
/// event reporting and follow-up prompt polling.
struct ContainerDelegate {
    /// HTTP client for status updates, job events and prompt polling.
    client: Arc<WorkerHttpClient>,
    /// Safety layer forwarded to tool execution and tool-result processing.
    safety: Arc<SafetyLayer>,
    /// Registry the tool definitions are read from and tools are executed against.
    tools: Arc<ToolRegistry>,
    /// Environment variables placed into each tool call's job context.
    extra_env: Arc<HashMap<String, String>>,
    /// Output of the most recent successful tool call; empty until one succeeds.
    last_output: Mutex<String>,
    /// Latest iteration number, shared with the runtime for the completion report.
    iteration_tracker: Arc<Mutex<u32>>,
}
impl ContainerDelegate {
    /// Wraps `event_type` and `data` in a `JobEventPayload` and hands it to the HTTP client.
    async fn post_event(&self, event_type: &str, data: serde_json::Value) {
        let payload = JobEventPayload {
            event_type: event_type.to_string(),
            data,
        };
        self.client.post_event(&payload).await;
    }

    /// Polls the orchestrator for a follow-up prompt and, if there is one, appends it to the
    /// conversation as a user message.
    ///
    /// The prompt is also echoed as a `message` event with its content truncated to 2000.
    /// A polling failure is logged at debug level and otherwise ignored.
    async fn poll_and_inject_prompt(&self, reason_ctx: &mut ReasoningContext) {
        let prompt = match self.client.poll_prompt().await {
            Ok(Some(pending)) => pending,
            Ok(None) => return,
            Err(e) => {
                tracing::debug!("Failed to poll for prompt: {}", e);
                return;
            }
        };

        tracing::info!(
            "Received follow-up prompt: {}",
            truncate_for_preview(&prompt.content, 100)
        );
        let event = serde_json::json!({
            "role": "user",
            "content": truncate_for_preview(&prompt.content, 2000),
        });
        self.post_event("message", event).await;
        reason_ctx.messages.push(ChatMessage::user(&prompt.content));
    }
}
#[async_trait]
impl LoopDelegate for ContainerDelegate {
async fn check_signals(&self) -> LoopSignal {
LoopSignal::Continue
}
async fn before_llm_call(
&self,
reason_ctx: &mut ReasoningContext,
iteration: usize,
) -> Option<LoopOutcome> {
let iteration = iteration as u32;
*self.iteration_tracker.lock().await = iteration;
if iteration % 5 == 1 {
let _ = self
.client
.report_status(&StatusUpdate {
state: "in_progress".to_string(),
message: Some(format!("Iteration {}", iteration)),
iteration,
})
.await;
}
self.poll_and_inject_prompt(reason_ctx).await;
crate::util::ensure_ends_with_user_message(&mut reason_ctx.messages);
reason_ctx.available_tools = self.tools.tool_definitions().await;
None
}
async fn call_llm(
&self,
reasoning: &Reasoning,
reason_ctx: &mut ReasoningContext,
_iteration: usize,
) -> Result<crate::llm::RespondOutput, crate::error::Error> {
reasoning
.respond_with_tools(reason_ctx)
.await
.map_err(Into::into)
}
async fn handle_text_response(
&self,
text: &str,
reason_ctx: &mut ReasoningContext,
) -> TextAction {
self.post_event(
"message",
serde_json::json!({
"role": "assistant",
"content": truncate_for_preview(text, 2000),
}),
)
.await;
if crate::util::llm_signals_completion(text) {
let last = self.last_output.lock().await;
let output = if last.is_empty() {
text.to_string()
} else {
last.clone()
};
return TextAction::Return(LoopOutcome::Response(output));
}
reason_ctx.messages.push(ChatMessage::assistant(text));
TextAction::Continue
}
async fn execute_tool_calls(
&self,
tool_calls: Vec<crate::llm::ToolCall>,
content: Option<String>,
reason_ctx: &mut ReasoningContext,
) -> Result<Option<LoopOutcome>, crate::error::Error> {
if let Some(ref text) = content {
self.post_event(
"message",
serde_json::json!({
"role": "assistant",
"content": truncate_for_preview(text, 2000),
}),
)
.await;
}
reason_ctx
.messages
.push(ChatMessage::assistant_with_tool_calls(
content,
tool_calls.clone(),
));
for tc in tool_calls {
self.post_event(
"tool_use",
serde_json::json!({
"tool_name": tc.name,
"input": truncate_for_preview(&tc.arguments.to_string(), 500),
}),
)
.await;
let job_ctx = JobContext {
extra_env: self.extra_env.clone(),
..Default::default()
};
let result = execute_tool_simple(
&self.tools,
&self.safety,
&tc.name,
tc.arguments.clone(),
&job_ctx,
)
.await;
self.post_event(
"tool_result",
serde_json::json!({
"tool_name": tc.name,
"output": match &result {
Ok(output) => truncate_for_preview(output, 2000),
Err(e) => format!("Error: {}", truncate_for_preview(e, 500)).into(),
},
"success": result.is_ok(),
}),
)
.await;
if let Ok(ref output) = result {
*self.last_output.lock().await = output.clone();
}
let (_, message) = process_tool_result(&self.safety, &tc.name, &tc.id, &result);
reason_ctx.messages.push(message);
}
Ok(None)
}
async fn on_tool_intent_nudge(&self, text: &str, _reason_ctx: &mut ReasoningContext) {
self.post_event(
"message",
serde_json::json!({
"role": "assistant",
"content": truncate_for_preview(text, 2000),
"nudge": true,
}),
)
.await;
}
async fn after_iteration(&self, _iteration: usize) {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
#[cfg(test)]
mod tests {
    use crate::agent::agentic_loop::truncate_for_preview;

    /// Input shorter than the limit comes back unchanged.
    #[test]
    fn test_truncate_within_limit() {
        let preview = truncate_for_preview("hello", 10);
        assert_eq!(preview, "hello");
    }

    /// Input exactly as long as the limit comes back unchanged.
    #[test]
    fn test_truncate_at_limit() {
        let preview = truncate_for_preview("hello", 5);
        assert_eq!(preview, "hello");
    }

    /// Input longer than the limit is cut at the limit and gets a "..." suffix.
    #[test]
    fn test_truncate_beyond_limit() {
        assert_eq!(truncate_for_preview("hello world", 5), "hello...");
    }

    /// A limit of 1 is too small to hold the leading multi-byte character, so the expected
    /// preview keeps none of the input and consists of the suffix only.
    #[test]
    fn test_truncate_multibyte_safe() {
        assert_eq!(truncate_for_preview("é is fancy", 1), "...");
    }
}