use std::sync::Arc;
use std::time::Duration;
use futures::future::join_all;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::agent::scheduler::WorkerMessage;
use crate::agent::task::TaskOutput;
use crate::context::{ContextManager, JobState};
use crate::db::Database;
use crate::error::Error;
use crate::hooks::HookRegistry;
use crate::llm::{
ActionPlan, ChatMessage, LlmProvider, Reasoning, ReasoningContext, RespondResult, ToolSelection,
};
use crate::safety::SafetyLayer;
use crate::tools::ToolRegistry;
/// Dependencies shared by every worker; cheap to clone — everything
/// heavyweight is behind an `Arc`.
#[derive(Clone)]
pub struct WorkerDeps {
    // Per-job state, memory, and context storage.
    pub context_manager: Arc<ContextManager>,
    // LLM backend used for planning, tool selection, and responses.
    pub llm: Arc<dyn LlmProvider>,
    // Output sanitization and tool-parameter validation.
    pub safety: Arc<SafetyLayer>,
    // Registry of tools the agent may invoke.
    pub tools: Arc<ToolRegistry>,
    // Optional database; when `None`, all persistence is skipped.
    pub store: Option<Arc<dyn Database>>,
    // Hooks run before each tool call (may reject or rewrite parameters).
    pub hooks: Arc<HookRegistry>,
    // Overall wall-clock budget for a job's execution loop.
    pub timeout: Duration,
    // When true, ask the LLM for an up-front plan before looping.
    pub use_planning: bool,
}
/// Executes a single job: consumes control messages from the scheduler
/// and drives the reasoning/tool-execution loop for `job_id`.
pub struct Worker {
    // Identifier of the job this worker owns.
    job_id: Uuid,
    // Shared services (LLM, tools, safety, persistence, hooks).
    deps: WorkerDeps,
}
/// Outcome of one tool invocation from the parallel execution path,
/// wrapped in a struct so `execute_tools_parallel` returns an ordered
/// `Vec` that zips back up with the originating selections.
struct ToolExecResult {
    // Serialized tool output on success, or the execution error.
    result: Result<String, Error>,
}
impl Worker {
pub fn new(job_id: Uuid, deps: WorkerDeps) -> Self {
Self { job_id, deps }
}
// --- Thin accessors over `deps`, so call sites read `self.llm()`
// --- instead of `self.deps.llm`.
fn context_manager(&self) -> &Arc<ContextManager> {
    &self.deps.context_manager
}
fn llm(&self) -> &Arc<dyn LlmProvider> {
    &self.deps.llm
}
fn safety(&self) -> &Arc<SafetyLayer> {
    &self.deps.safety
}
fn tools(&self) -> &Arc<ToolRegistry> {
    &self.deps.tools
}
// `None` when no database is configured.
fn store(&self) -> Option<&Arc<dyn Database>> {
    self.deps.store.as_ref()
}
// Overall execution budget for this job's loop.
fn timeout(&self) -> Duration {
    self.deps.timeout
}
fn use_planning(&self) -> bool {
    self.deps.use_planning
}
/// Fire-and-forget persistence of a job status change.
///
/// No-op when no database is configured. Runs on a detached task and
/// only logs DB errors, so status persistence can never fail the worker.
fn persist_status(&self, status: JobState, reason: Option<String>) {
    let Some(store) = self.store().cloned() else {
        return;
    };
    let job_id = self.job_id;
    tokio::spawn(async move {
        let outcome = store
            .update_job_status(job_id, status, reason.as_deref())
            .await;
        if let Err(e) = outcome {
            tracing::warn!("Failed to persist status for job {}: {}", job_id, e);
        }
    });
}
/// Drive the job to completion.
///
/// Waits for an explicit `Start` message (ignoring keep-alive pings),
/// seeds the reasoning context with a system prompt describing the job,
/// then runs the execution loop under the configured timeout. Failures
/// are persisted via `mark_failed`; a timeout marks the job stuck so it
/// can be resumed or re-planned. A `Stop` before starting, or a closed
/// channel, returns `Ok(())` without doing any work.
pub async fn run(self, mut rx: mpsc::Receiver<WorkerMessage>) -> Result<(), Error> {
    tracing::info!("Worker starting for job {}", self.job_id);
    // BUGFIX: previously a Ping that arrived before Start fell through the
    // match and began execution. Keep waiting until Start (or Stop) arrives.
    loop {
        match rx.recv().await {
            Some(WorkerMessage::Start) => break,
            Some(WorkerMessage::Ping) => {
                tracing::trace!("Worker for job {} received ping before start", self.job_id);
            }
            Some(WorkerMessage::Stop) | None => {
                tracing::debug!("Worker for job {} stopped before starting", self.job_id);
                return Ok(());
            }
        }
    }
    let job_ctx = self.context_manager().get_context(self.job_id).await?;
    let reasoning = Reasoning::new(self.llm().clone(), self.safety().clone());
    let mut reason_ctx = ReasoningContext::new().with_job(&job_ctx.description);
    // System prompt framing the job for the LLM.
    reason_ctx.messages.push(ChatMessage::system(format!(
        r#"You are an autonomous agent working on a job.
Job: {}
Description: {}
You have access to tools to complete this job. Plan your approach and execute tools as needed.
You may request multiple tools at once if they can be executed in parallel.
Report when the job is complete or if you encounter issues you cannot resolve."#,
        job_ctx.title, job_ctx.description
    )));
    // Bound total execution time (no redundant inner async block needed —
    // the loop's future can be passed to timeout directly).
    let result = tokio::time::timeout(
        self.timeout(),
        self.execution_loop(&mut rx, &reasoning, &mut reason_ctx),
    )
    .await;
    match result {
        Ok(Ok(())) => {
            tracing::info!("Worker for job {} completed successfully", self.job_id);
        }
        Ok(Err(e)) => {
            tracing::error!("Worker for job {} failed: {}", self.job_id, e);
            self.mark_failed(&e.to_string()).await?;
        }
        Err(_) => {
            // Timeout: stuck (recoverable) rather than failed.
            tracing::warn!("Worker for job {} timed out", self.job_id);
            self.mark_stuck("Execution timeout").await?;
        }
    }
    Ok(())
}
/// Core per-iteration reasoning/execution loop for a job.
///
/// Flow: refresh the tool catalog, optionally build an up-front plan
/// (delegating to `execute_plan`), otherwise repeatedly ask the LLM to
/// select tools and execute them until the LLM signals completion, a
/// Stop/cancellation arrives, or `max_iterations` is exceeded (which
/// marks the job stuck and returns cleanly).
async fn execution_loop(
    &self,
    rx: &mut mpsc::Receiver<WorkerMessage>,
    reasoning: &Reasoning,
    reason_ctx: &mut ReasoningContext,
) -> Result<(), Error> {
    // Hard cap so a non-converging LLM conversation cannot spin forever.
    let max_iterations = 50;
    let mut iteration = 0;
    reason_ctx.available_tools = self.tools().tool_definitions().await;
    // Up-front planning is optional; on planner failure fall back to
    // per-iteration tool selection below.
    let plan = if self.use_planning() {
        match reasoning.plan(reason_ctx).await {
            Ok(p) => {
                tracing::info!(
                    "Created plan for job {}: {} actions, {:.0}% confidence",
                    self.job_id,
                    p.actions.len(),
                    p.confidence * 100.0
                );
                // Echo the plan into the conversation so later turns can
                // reference the intended steps.
                reason_ctx.messages.push(ChatMessage::assistant(format!(
                    "I've created a plan to accomplish this goal: {}\n\nSteps:\n{}",
                    p.goal,
                    p.actions
                        .iter()
                        .enumerate()
                        .map(|(i, a)| format!("{}. {} - {}", i + 1, a.tool_name, a.reasoning))
                        .collect::<Vec<_>>()
                        .join("\n")
                )));
                Some(p)
            }
            Err(e) => {
                tracing::warn!(
                    "Planning failed for job {}, falling back to direct selection: {}",
                    self.job_id,
                    e
                );
                None
            }
        }
    } else {
        None
    };
    if let Some(ref plan) = plan {
        return self.execute_plan(rx, reasoning, reason_ctx, plan).await;
    }
    loop {
        // Non-blocking drain of at most one control message per iteration.
        if let Ok(msg) = rx.try_recv() {
            match msg {
                WorkerMessage::Stop => {
                    tracing::debug!("Worker for job {} received stop signal", self.job_id);
                    return Ok(());
                }
                WorkerMessage::Ping => {
                    tracing::trace!("Worker for job {} received ping", self.job_id);
                }
                WorkerMessage::Start => {}
            }
        }
        // Honor out-of-band cancellation recorded in the job context.
        if let Ok(ctx) = self.context_manager().get_context(self.job_id).await
            && ctx.state == JobState::Cancelled
        {
            tracing::info!("Worker for job {} detected cancellation", self.job_id);
            return Ok(());
        }
        iteration += 1;
        if iteration > max_iterations {
            self.mark_stuck("Maximum iterations exceeded").await?;
            return Ok(());
        }
        // Refresh every iteration — the tool registry may change at runtime.
        reason_ctx.available_tools = self.tools().tool_definitions().await;
        let selections = reasoning.select_tools(reason_ctx).await?;
        if selections.is_empty() {
            // No explicit selection: let the model respond freely (it may
            // still emit tool calls through the response API).
            let respond_output = reasoning.respond_with_tools(reason_ctx).await?;
            match respond_output.result {
                RespondResult::Text(response) => {
                    if crate::util::llm_signals_completion(&response) {
                        self.mark_completed().await?;
                        return Ok(());
                    }
                    reason_ctx.messages.push(ChatMessage::assistant(&response));
                    // Periodic nudge in case the conversation is stalling.
                    if iteration > 3 && iteration % 5 == 0 {
                        reason_ctx.messages.push(ChatMessage::user(
                            "Are you stuck? Do you need help completing this job?",
                        ));
                    }
                }
                RespondResult::ToolCalls {
                    tool_calls,
                    content,
                } => {
                    tracing::debug!(
                        "Job {} respond_with_tools returned {} tool calls",
                        self.job_id,
                        tool_calls.len()
                    );
                    reason_ctx
                        .messages
                        .push(ChatMessage::assistant_with_tool_calls(
                            content,
                            tool_calls.clone(),
                        ));
                    // Execute sequentially, preserving provider call ids so
                    // results pair with the assistant message pushed above.
                    for tc in tool_calls {
                        let result = self.execute_tool(&tc.name, &tc.arguments).await;
                        let selection = ToolSelection {
                            tool_name: tc.name.clone(),
                            parameters: tc.arguments.clone(),
                            reasoning: String::new(),
                            alternatives: vec![],
                            tool_call_id: tc.id.clone(),
                        };
                        self.process_tool_result(reason_ctx, &selection, result)
                            .await?;
                    }
                }
            }
        } else if selections.len() == 1 {
            let selection = &selections[0];
            tracing::debug!(
                "Job {} selecting tool: {} - {}",
                self.job_id,
                selection.tool_name,
                selection.reasoning
            );
            let result = self
                .execute_tool(&selection.tool_name, &selection.parameters)
                .await;
            self.process_tool_result(reason_ctx, selection, result)
                .await?;
        } else {
            // Multiple independent selections run concurrently; results come
            // back in selection order and are zipped below.
            tracing::debug!(
                "Job {} executing {} tools in parallel",
                self.job_id,
                selections.len()
            );
            let results = self.execute_tools_parallel(&selections).await;
            for (selection, result) in selections.iter().zip(results) {
                self.process_tool_result(reason_ctx, selection, result.result)
                    .await?;
            }
        }
        // Small pacing delay between iterations.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
/// Run every selected tool concurrently and return results in the same
/// order as `selections` (so callers can zip them back together).
async fn execute_tools_parallel(&self, selections: &[ToolSelection]) -> Vec<ToolExecResult> {
    // Each future captures cloned deps so it owns everything it needs.
    join_all(selections.iter().map(|sel| {
        let name = sel.tool_name.clone();
        let args = sel.parameters.clone();
        let deps = self.deps.clone();
        let job = self.job_id;
        async move {
            ToolExecResult {
                result: Self::execute_tool_inner(&deps, job, &name, &args).await,
            }
        }
    }))
    .await
}
/// Execute one tool end to end: lookup, approval gate, pre-call hooks,
/// cancellation check, parameter validation, timed execution, memory
/// recording, optional persistence, and serialization of the result.
///
/// Associated fn (no `&self`) so the parallel path can capture cloned
/// deps without borrowing the worker.
async fn execute_tool_inner(
    deps: &WorkerDeps,
    job_id: Uuid,
    tool_name: &str,
    params: &serde_json::Value,
) -> Result<String, Error> {
    let tool =
        deps.tools
            .get(tool_name)
            .await
            .ok_or_else(|| crate::error::ToolError::NotFound {
                name: tool_name.to_string(),
            })?;
    // An autonomous worker cannot satisfy interactive approval; reject early.
    if tool.requires_approval() {
        return Err(crate::error::ToolError::AuthRequired {
            name: tool_name.to_string(),
        }
        .into());
    }
    let job_ctx = deps.context_manager.get_context(job_id).await?;
    // Run ToolCall hooks: they may reject the call outright or rewrite the
    // parameters. A non-JSON rewrite is logged and ignored (original kept).
    let params = {
        use crate::hooks::{HookError, HookEvent, HookOutcome};
        let event = HookEvent::ToolCall {
            tool_name: tool_name.to_string(),
            parameters: params.clone(),
            user_id: job_ctx.user_id.clone(),
            context: format!("job:{}", job_id),
        };
        match deps.hooks.run(&event).await {
            Err(HookError::Rejected { reason }) => {
                return Err(crate::error::ToolError::ExecutionFailed {
                    name: tool_name.to_string(),
                    reason: format!("Blocked by hook: {}", reason),
                }
                .into());
            }
            // Any other hook error also blocks the call (fail-closed).
            Err(err) => {
                return Err(crate::error::ToolError::ExecutionFailed {
                    name: tool_name.to_string(),
                    reason: format!("Blocked by hook failure mode: {}", err),
                }
                .into());
            }
            Ok(HookOutcome::Continue {
                modified: Some(new_params),
            }) => serde_json::from_str(&new_params).unwrap_or_else(|e| {
                tracing::warn!(
                    tool = %tool_name,
                    "Hook returned non-JSON modification for ToolCall, ignoring: {}",
                    e
                );
                params.clone()
            }),
            _ => params.clone(),
        }
    };
    // NOTE(review): this checks the context snapshot taken before the hooks
    // ran; a cancellation landing mid-hook is only caught on the next pass.
    if job_ctx.state == JobState::Cancelled {
        return Err(crate::error::ToolError::ExecutionFailed {
            name: tool_name.to_string(),
            reason: "Job is cancelled".to_string(),
        }
        .into());
    }
    // Validate the (possibly hook-modified) parameters.
    let validation = deps.safety.validator().validate_tool_params(&params);
    if !validation.is_valid {
        let details = validation
            .errors
            .iter()
            .map(|e| format!("{}: {}", e.field, e.message))
            .collect::<Vec<_>>()
            .join("; ");
        return Err(crate::error::ToolError::InvalidParameters {
            name: tool_name.to_string(),
            reason: format!("Invalid tool parameters: {}", details),
        }
        .into());
    }
    tracing::debug!(
        tool = %tool_name,
        params = %params,
        job = %job_id,
        "Tool call started"
    );
    // Per-tool timeout; elapsed time feeds the action log below.
    let tool_timeout = tool.execution_timeout();
    let start = std::time::Instant::now();
    let result = tokio::time::timeout(tool_timeout, async {
        tool.execute(params.clone(), &job_ctx).await
    })
    .await;
    let elapsed = start.elapsed();
    // Outcome logging: success / tool error / timeout.
    match &result {
        Ok(Ok(output)) => {
            let result_str = serde_json::to_string(&output.result)
                .unwrap_or_else(|_| "<serialize error>".to_string());
            tracing::debug!(
                tool = %tool_name,
                elapsed_ms = elapsed.as_millis() as u64,
                result = %result_str,
                "Tool call succeeded"
            );
        }
        Ok(Err(e)) => {
            tracing::debug!(
                tool = %tool_name,
                elapsed_ms = elapsed.as_millis() as u64,
                error = %e,
                "Tool call failed"
            );
        }
        Err(_) => {
            tracing::debug!(
                tool = %tool_name,
                elapsed_ms = elapsed.as_millis() as u64,
                timeout_secs = tool_timeout.as_secs(),
                "Tool call timed out"
            );
        }
    }
    // Record the action (success, failure, or timeout) in job memory.
    // Recording failures are swallowed with `.ok()` — memory is best-effort.
    let action = match &result {
        Ok(Ok(output)) => {
            // Sanitize the serialized output before storing it in memory.
            let output_str = serde_json::to_string_pretty(&output.result)
                .ok()
                .map(|s| deps.safety.sanitize_tool_output(tool_name, &s).content);
            deps.context_manager
                .update_memory(job_id, |mem| {
                    let rec = mem.create_action(tool_name, params.clone()).succeed(
                        output_str.clone(),
                        output.result.clone(),
                        elapsed,
                    );
                    mem.record_action(rec.clone());
                    rec
                })
                .await
                .ok()
        }
        Ok(Err(e)) => deps
            .context_manager
            .update_memory(job_id, |mem| {
                let rec = mem
                    .create_action(tool_name, params.clone())
                    .fail(e.to_string(), elapsed);
                mem.record_action(rec.clone());
                rec
            })
            .await
            .ok(),
        Err(_) => deps
            .context_manager
            .update_memory(job_id, |mem| {
                let rec = mem
                    .create_action(tool_name, params.clone())
                    .fail("Execution timeout", elapsed);
                mem.record_action(rec.clone());
                rec
            })
            .await
            .ok(),
    };
    // Fire-and-forget persistence of the action record; DB errors only log.
    if let (Some(action), Some(store)) = (action, deps.store.clone()) {
        tokio::spawn(async move {
            if let Err(e) = store.save_action(job_id, &action).await {
                tracing::warn!("Failed to persist action for job {}: {}", job_id, e);
            }
        });
    }
    // Unwrap the nested result: outer Err = timeout, inner Err = tool error.
    let output = result
        .map_err(|_| crate::error::ToolError::Timeout {
            name: tool_name.to_string(),
            timeout: tool_timeout,
        })?
        .map_err(|e| crate::error::ToolError::ExecutionFailed {
            name: tool_name.to_string(),
            reason: e.to_string(),
        })?;
    serde_json::to_string_pretty(&output.result).map_err(|e| {
        crate::error::ToolError::ExecutionFailed {
            name: tool_name.to_string(),
            reason: format!("Failed to serialize result: {}", e),
        }
        .into()
    })
}
/// Feed one tool invocation's outcome back into the conversation.
///
/// Success pushes the sanitized, LLM-wrapped output as a tool-result
/// message; failure logs a warning, best-effort persists the failure,
/// and pushes an error message instead. Always returns `Ok(false)` —
/// a tool result alone never marks the job complete.
async fn process_tool_result(
    &self,
    reason_ctx: &mut ReasoningContext,
    selection: &ToolSelection,
    result: Result<String, Error>,
) -> Result<bool, Error> {
    match result {
        Ok(raw_output) => {
            // Sanitize, then wrap so the LLM sees a clearly delimited result.
            let clean = self
                .safety()
                .sanitize_tool_output(&selection.tool_name, &raw_output);
            let wrapped = self.safety().wrap_for_llm(
                &selection.tool_name,
                &clean.content,
                clean.was_modified,
            );
            reason_ctx.messages.push(ChatMessage::tool_result(
                &selection.tool_call_id,
                &selection.tool_name,
                wrapped,
            ));
        }
        Err(err) => {
            tracing::warn!(
                "Tool {} failed for job {}: {}",
                selection.tool_name,
                self.job_id,
                err
            );
            // Best-effort failure bookkeeping; DB errors only log.
            if let Some(store) = self.store() {
                let store = store.clone();
                let failed_tool = selection.tool_name.clone();
                let failure = err.to_string();
                tokio::spawn(async move {
                    if let Err(db_err) = store.record_tool_failure(&failed_tool, &failure).await {
                        tracing::warn!("Failed to record tool failure: {}", db_err);
                    }
                });
            }
            // Surface the error to the LLM so it can adapt its approach.
            reason_ctx.messages.push(ChatMessage::tool_result(
                &selection.tool_call_id,
                &selection.tool_name,
                format!("Error: {}", err),
            ));
        }
    }
    Ok(false)
}
/// Execute a pre-computed [`ActionPlan`] step by step.
///
/// Between steps it drains at most one control message (honoring Stop),
/// runs the planned tool, and feeds the result into the conversation via
/// the shared `process_tool_result` path. After the final step the LLM is
/// asked whether the job is done; if it does not clearly signal
/// completion the job is marked stuck so the scheduler can re-plan.
async fn execute_plan(
    &self,
    rx: &mut mpsc::Receiver<WorkerMessage>,
    reasoning: &Reasoning,
    reason_ctx: &mut ReasoningContext,
    plan: &ActionPlan,
) -> Result<(), Error> {
    for (i, action) in plan.actions.iter().enumerate() {
        // Non-blocking check for control messages before each step.
        if let Ok(msg) = rx.try_recv() {
            match msg {
                WorkerMessage::Stop => {
                    tracing::debug!(
                        "Worker for job {} received stop signal during plan execution",
                        self.job_id
                    );
                    return Ok(());
                }
                WorkerMessage::Ping => {
                    tracing::trace!("Worker for job {} received ping", self.job_id);
                }
                WorkerMessage::Start => {}
            }
        }
        tracing::debug!(
            "Job {} executing planned action {}/{}: {} - {}",
            self.job_id,
            i + 1,
            plan.actions.len(),
            action.tool_name,
            action.reasoning
        );
        let result = self
            .execute_tool(&action.tool_name, &action.parameters)
            .await;
        // Synthesize a ToolSelection so result processing is shared with the
        // direct-selection path; the call id is deterministic per plan step.
        let selection = ToolSelection {
            tool_name: action.tool_name.clone(),
            parameters: action.parameters.clone(),
            reasoning: action.reasoning.clone(),
            alternatives: vec![],
            tool_call_id: format!("plan_{}_{}", self.job_id, i),
        };
        let completed = self
            .process_tool_result(reason_ctx, &selection, result)
            .await?;
        if completed {
            return Ok(());
        }
        // Small pacing delay between planned steps.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    reason_ctx.messages.push(ChatMessage::user(
        "All planned actions have been executed. Is the job complete? If not, what else needs to be done?",
    ));
    let response = reasoning.respond(reason_ctx).await?;
    reason_ctx.messages.push(ChatMessage::assistant(&response));
    if crate::util::llm_signals_completion(&response) {
        self.mark_completed().await?;
    } else {
        // BUGFIX: the log previously claimed "falling back to direct
        // selection", but the code marks the job stuck for re-planning —
        // the message now describes what actually happens.
        tracing::info!(
            "Job {} plan completed but work remains, marking stuck for re-planning",
            self.job_id
        );
        self.mark_stuck("Plan completed but job incomplete - needs re-planning")
            .await?;
    }
    Ok(())
}
/// Execute a single tool for this worker's job.
///
/// Thin wrapper over [`Self::execute_tool_inner`] that binds this
/// worker's dependencies and job id.
async fn execute_tool(
    &self,
    tool_name: &str,
    params: &serde_json::Value,
) -> Result<String, Error> {
    Self::execute_tool_inner(&self.deps, self.job_id, tool_name, params).await
}
/// Transition the job to `Completed` in the in-memory context and
/// fire-and-forget persist the new status.
async fn mark_completed(&self) -> Result<(), Error> {
    // Single source of truth for the completion reason — the literal was
    // previously duplicated in the transition and the persistence call.
    const REASON: &str = "Job completed successfully";
    self.context_manager()
        .update_context(self.job_id, |ctx| {
            ctx.transition_to(JobState::Completed, Some(REASON.to_string()))
        })
        .await?
        .map_err(|s| crate::error::JobError::ContextError {
            id: self.job_id,
            reason: s,
        })?;
    self.persist_status(JobState::Completed, Some(REASON.to_string()));
    Ok(())
}
async fn mark_failed(&self, reason: &str) -> Result<(), Error> {
self.context_manager()
.update_context(self.job_id, |ctx| {
ctx.transition_to(JobState::Failed, Some(reason.to_string()))
})
.await?
.map_err(|s| crate::error::JobError::ContextError {
id: self.job_id,
reason: s,
})?;
self.persist_status(JobState::Failed, Some(reason.to_string()));
Ok(())
}
async fn mark_stuck(&self, reason: &str) -> Result<(), Error> {
self.context_manager()
.update_context(self.job_id, |ctx| ctx.mark_stuck(reason))
.await?
.map_err(|s| crate::error::JobError::ContextError {
id: self.job_id,
reason: s,
})?;
self.persist_status(JobState::Stuck, Some(reason.to_string()));
Ok(())
}
}
impl From<TaskOutput> for Result<String, Error> {
fn from(output: TaskOutput) -> Self {
serde_json::to_string_pretty(&output.result).map_err(|e| {
crate::error::ToolError::ExecutionFailed {
name: "task".to_string(),
reason: format!("Failed to serialize result: {}", e),
}
.into()
})
}
}
#[cfg(test)]
mod tests {
    use crate::llm::ToolSelection;
    use crate::util::llm_signals_completion;
    /// A selection must carry the provider-issued call id so tool results
    /// can pair up with the assistant's tool-call message.
    #[test]
    fn test_tool_selection_preserves_call_id() {
        let selection = ToolSelection {
            tool_name: "memory_search".to_string(),
            parameters: serde_json::json!({"query": "test"}),
            reasoning: "Need to search memory".to_string(),
            alternatives: vec![],
            tool_call_id: "call_abc123".to_string(),
        };
        assert_eq!(selection.tool_call_id, "call_abc123");
        assert_ne!(
            selection.tool_call_id, "tool_call_id",
            "tool_call_id must not be the hardcoded placeholder string"
        );
    }
    /// Phrases that clearly state the job is done must be detected.
    #[test]
    fn test_completion_positive_signals() {
        assert!(llm_signals_completion("The job is complete."));
        assert!(llm_signals_completion(
            "I have completed the task successfully."
        ));
        assert!(llm_signals_completion("The task is done."));
        assert!(llm_signals_completion("The task is finished."));
        assert!(llm_signals_completion(
            "All steps are complete and verified."
        ));
        assert!(llm_signals_completion(
            "I've done all the work. The work is done."
        ));
        assert!(llm_signals_completion(
            "Successfully completed the migration."
        ));
    }
    /// Negated phrases ("not complete", "isn't done") must not count as done.
    #[test]
    fn test_completion_negative_signals_block_false_positives() {
        assert!(!llm_signals_completion("The task is not complete yet."));
        assert!(!llm_signals_completion("This is not done."));
        assert!(!llm_signals_completion("The work is incomplete."));
        assert!(!llm_signals_completion(
            "The migration is not yet finished."
        ));
        assert!(!llm_signals_completion("The job isn't done yet."));
        assert!(!llm_signals_completion("This remains unfinished."));
    }
    /// Completion keywords appearing mid-sentence as bare substrings must
    /// not trigger a completion signal.
    #[test]
    fn test_completion_does_not_match_bare_substrings() {
        assert!(!llm_signals_completion(
            "I need to complete more work first."
        ));
        assert!(!llm_signals_completion(
            "Let me finish the remaining steps."
        ));
        assert!(!llm_signals_completion(
            "I'm done analyzing, now let me fix it."
        ));
        assert!(!llm_signals_completion(
            "I completed step 1 but step 2 remains."
        ));
    }
    /// Tool output echoed into the conversation must not be able to forge a
    /// completion signal (prompt-injection resistance).
    #[test]
    fn test_completion_tool_output_injection() {
        assert!(!llm_signals_completion("TASK_COMPLETE"));
        assert!(!llm_signals_completion("JOB_DONE"));
        assert!(!llm_signals_completion(
            "The tool returned: TASK_COMPLETE signal"
        ));
    }
}