use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::error::WorkerError;
use crate::llm::{
ChatMessage, CompletionRequest, CompletionResponse, FinishReason, ToolCall,
ToolCompletionRequest, ToolCompletionResponse, ToolDefinition,
};
/// HTTP client a worker uses to talk back to the orchestrator on behalf of
/// a single job. Every request is bearer-authenticated with `token` and
/// addressed under `/worker/{job_id}/...` (see `url`).
pub struct WorkerHttpClient {
client: reqwest::Client,
// Base URL with any trailing '/' stripped at construction time.
orchestrator_url: String,
// Job this client is bound to; embedded in every request path.
job_id: Uuid,
// Bearer token attached to every request.
token: String,
}
/// Progress report POSTed to the orchestrator's `status` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct StatusUpdate {
// Current worker state as a free-form string; the accepted values are
// defined by the orchestrator, not visible here.
pub state: String,
// Optional human-readable detail accompanying the state.
pub message: Option<String>,
// Iteration counter reported alongside the state.
pub iteration: u32,
}
/// Job details fetched from the orchestrator via `GET /worker/{job_id}/job`.
#[derive(Debug, Serialize, Deserialize)]
pub struct JobDescription {
pub title: String,
pub description: String,
// Working directory for the job, when one is specified.
pub project_dir: Option<String>,
}
/// Wire format for the `llm/complete` proxy endpoint; built from a
/// [`CompletionRequest`] in `llm_complete`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProxyCompletionRequest {
pub messages: Vec<ChatMessage>,
pub max_tokens: Option<u32>,
pub temperature: Option<f32>,
pub stop_sequences: Option<Vec<String>>,
}
/// Wire format of the `llm/complete` response; converted back into a
/// [`CompletionResponse`] in `llm_complete`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProxyCompletionResponse {
pub content: String,
pub input_tokens: u32,
pub output_tokens: u32,
// Raw finish-reason string; mapped to `FinishReason` by
// `parse_finish_reason`.
pub finish_reason: String,
}
/// Wire format for the `llm/complete_with_tools` proxy endpoint; built from
/// a [`ToolCompletionRequest`] in `llm_complete_with_tools`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProxyToolCompletionRequest {
pub messages: Vec<ChatMessage>,
pub tools: Vec<ToolDefinition>,
pub max_tokens: Option<u32>,
pub temperature: Option<f32>,
pub tool_choice: Option<String>,
}
/// Wire format of the `llm/complete_with_tools` response; converted back
/// into a [`ToolCompletionResponse`] in `llm_complete_with_tools`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProxyToolCompletionResponse {
pub content: Option<String>,
pub tool_calls: Vec<ToolCall>,
pub input_tokens: u32,
pub output_tokens: u32,
// Raw finish-reason string; mapped to `FinishReason` by
// `parse_finish_reason`.
pub finish_reason: String,
}
/// Final job outcome POSTed to the orchestrator's `complete` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct CompletionReport {
pub success: bool,
pub message: Option<String>,
// Total iterations the worker ran before finishing.
pub iterations: u32,
}
/// Best-effort event POSTed to the orchestrator's `event` endpoint
/// (failures are logged, never surfaced — see `post_event`).
#[derive(Debug, Serialize, Deserialize)]
pub struct JobEventPayload {
pub event_type: String,
// Arbitrary event-specific payload; schema depends on `event_type`.
pub data: serde_json::Value,
}
/// A user prompt fetched from the orchestrator's `prompt` endpoint.
#[derive(Debug, Deserialize)]
pub struct PromptResponse {
pub content: String,
// `false` when the field is absent from the JSON body.
#[serde(default)]
pub done: bool,
}
impl WorkerHttpClient {
    /// Build a client whose bearer token is read from the
    /// `IRONCLAW_WORKER_TOKEN` environment variable.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError::MissingToken`] if the variable is not set.
    pub fn from_env(orchestrator_url: String, job_id: Uuid) -> Result<Self, WorkerError> {
        let token =
            std::env::var("IRONCLAW_WORKER_TOKEN").map_err(|_| WorkerError::MissingToken)?;
        // Delegate to `new` so client construction and URL normalization
        // live in exactly one place.
        Ok(Self::new(orchestrator_url, job_id, token))
    }

    /// Build a client with an explicitly supplied bearer token.
    ///
    /// Trailing slashes are stripped from `orchestrator_url` so that
    /// [`Self::url`] never produces a double slash.
    pub fn new(orchestrator_url: String, job_id: Uuid, token: String) -> Self {
        Self {
            client: reqwest::Client::new(),
            orchestrator_url: orchestrator_url.trim_end_matches('/').to_string(),
            job_id,
            token,
        }
    }

    /// The normalized (no trailing slash) orchestrator base URL.
    pub fn orchestrator_url(&self) -> &str {
        &self.orchestrator_url
    }

    /// Compose a full endpoint URL: `{base}/worker/{job_id}/{path}`.
    fn url(&self, path: &str) -> String {
        format!("{}/worker/{}/{}", self.orchestrator_url, self.job_id, path)
    }

    /// Authenticated GET returning a JSON-decoded body.
    ///
    /// `context` is a short label (e.g. `"GET /job"`) interpolated into
    /// error messages.
    ///
    /// # Errors
    ///
    /// * [`WorkerError::ConnectionFailed`] if the request cannot be sent.
    /// * [`WorkerError::OrchestratorRejected`] on a non-2xx status.
    /// * [`WorkerError::LlmProxyFailed`] if the body fails to decode.
    ///   NOTE(review): this variant is reused here for non-LLM endpoints;
    ///   a dedicated decode-error variant may read better.
    async fn get_json<T: serde::de::DeserializeOwned>(
        &self,
        path: &str,
        context: &str,
    ) -> Result<T, WorkerError> {
        let resp = self
            .client
            .get(self.url(path))
            .bearer_auth(&self.token)
            .send()
            .await
            .map_err(|e| WorkerError::ConnectionFailed {
                url: self.orchestrator_url.clone(),
                reason: e.to_string(),
            })?;
        if !resp.status().is_success() {
            return Err(WorkerError::OrchestratorRejected {
                job_id: self.job_id,
                reason: format!("{} returned {}", context, resp.status()),
            });
        }
        resp.json().await.map_err(|e| WorkerError::LlmProxyFailed {
            reason: format!("{}: failed to parse response: {}", context, e),
        })
    }

    /// Authenticated POST of a JSON body, returning a JSON-decoded response.
    ///
    /// On a non-2xx status the response body text is included in the error
    /// to aid debugging. All failure modes map to
    /// [`WorkerError::LlmProxyFailed`] with `context` in the message.
    async fn post_json<B: Serialize, T: serde::de::DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
        context: &str,
    ) -> Result<T, WorkerError> {
        let resp = self
            .client
            .post(self.url(path))
            .bearer_auth(&self.token)
            .json(body)
            .send()
            .await
            .map_err(|e| WorkerError::LlmProxyFailed {
                reason: format!("{}: {}", context, e),
            })?;
        if !resp.status().is_success() {
            let status = resp.status();
            // Capture the body (best effort) so the error says *why*.
            let body = resp.text().await.unwrap_or_default();
            return Err(WorkerError::LlmProxyFailed {
                reason: format!("{}: orchestrator returned {}: {}", context, status, body),
            });
        }
        resp.json().await.map_err(|e| WorkerError::LlmProxyFailed {
            reason: format!("{}: failed to parse response: {}", context, e),
        })
    }

    /// Fetch this job's description from the orchestrator.
    pub async fn get_job(&self) -> Result<JobDescription, WorkerError> {
        self.get_json("job", "GET /job").await
    }

    /// Run an LLM completion through the orchestrator's `llm/complete`
    /// proxy endpoint, translating to/from the proxy wire types.
    pub async fn llm_complete(
        &self,
        request: &CompletionRequest,
    ) -> Result<CompletionResponse, WorkerError> {
        let proxy_req = ProxyCompletionRequest {
            messages: request.messages.clone(),
            max_tokens: request.max_tokens,
            temperature: request.temperature,
            stop_sequences: request.stop_sequences.clone(),
        };
        let proxy_resp: ProxyCompletionResponse = self
            .post_json("llm/complete", &proxy_req, "LLM complete")
            .await?;
        Ok(CompletionResponse {
            content: proxy_resp.content,
            input_tokens: proxy_resp.input_tokens,
            output_tokens: proxy_resp.output_tokens,
            finish_reason: parse_finish_reason(&proxy_resp.finish_reason),
            // The proxy wire format carries no response id.
            response_id: None,
        })
    }

    /// Run a tool-enabled LLM completion through the orchestrator's
    /// `llm/complete_with_tools` proxy endpoint.
    pub async fn llm_complete_with_tools(
        &self,
        request: &ToolCompletionRequest,
    ) -> Result<ToolCompletionResponse, WorkerError> {
        let proxy_req = ProxyToolCompletionRequest {
            messages: request.messages.clone(),
            tools: request.tools.clone(),
            max_tokens: request.max_tokens,
            temperature: request.temperature,
            tool_choice: request.tool_choice.clone(),
        };
        let proxy_resp: ProxyToolCompletionResponse = self
            .post_json("llm/complete_with_tools", &proxy_req, "LLM tool complete")
            .await?;
        Ok(ToolCompletionResponse {
            content: proxy_resp.content,
            tool_calls: proxy_resp.tool_calls,
            input_tokens: proxy_resp.input_tokens,
            output_tokens: proxy_resp.output_tokens,
            finish_reason: parse_finish_reason(&proxy_resp.finish_reason),
            // The proxy wire format carries no response id.
            response_id: None,
        })
    }

    /// POST a status update. A non-2xx response is logged and swallowed
    /// (status reporting is best-effort); only a failure to *send* the
    /// request is surfaced as [`WorkerError::ConnectionFailed`].
    pub async fn report_status(&self, update: &StatusUpdate) -> Result<(), WorkerError> {
        let resp = self
            .client
            .post(self.url("status"))
            .bearer_auth(&self.token)
            .json(update)
            .send()
            .await
            .map_err(|e| WorkerError::ConnectionFailed {
                url: self.orchestrator_url.clone(),
                reason: e.to_string(),
            })?;
        if !resp.status().is_success() {
            tracing::warn!(
                "Status report failed with {}: {}",
                resp.status(),
                resp.text().await.unwrap_or_default()
            );
        }
        Ok(())
    }

    /// POST a job event, entirely best-effort: send failures and rejected
    /// statuses are logged at debug level and never returned to the caller.
    pub async fn post_event(&self, payload: &JobEventPayload) {
        let resp = self
            .client
            .post(self.url("event"))
            .bearer_auth(&self.token)
            .json(payload)
            .send()
            .await;
        match resp {
            Ok(r) if !r.status().is_success() => {
                tracing::debug!(
                    job_id = %self.job_id,
                    event_type = %payload.event_type,
                    status = %r.status(),
                    "Job event POST rejected"
                );
            }
            Err(e) => {
                tracing::debug!(
                    job_id = %self.job_id,
                    event_type = %payload.event_type,
                    "Job event POST failed: {}", e
                );
            }
            _ => {}
        }
    }

    /// Poll the orchestrator for a pending user prompt.
    ///
    /// Returns `Ok(None)` when the orchestrator answers `204 No Content`
    /// (no prompt waiting); not handled by `get_json`, hence the bespoke
    /// request here.
    pub async fn poll_prompt(&self) -> Result<Option<PromptResponse>, WorkerError> {
        let resp = self
            .client
            .get(self.url("prompt"))
            .bearer_auth(&self.token)
            .send()
            .await
            .map_err(|e| WorkerError::ConnectionFailed {
                url: self.orchestrator_url.clone(),
                reason: e.to_string(),
            })?;
        if resp.status() == reqwest::StatusCode::NO_CONTENT {
            return Ok(None);
        }
        if !resp.status().is_success() {
            return Err(WorkerError::OrchestratorRejected {
                job_id: self.job_id,
                reason: format!("prompt endpoint returned {}", resp.status()),
            });
        }
        let prompt: PromptResponse =
            resp.json().await.map_err(|e| WorkerError::LlmProxyFailed {
                reason: format!("failed to parse prompt response: {}", e),
            })?;
        Ok(Some(prompt))
    }

    /// Report the final job outcome. The response body is decoded as JSON
    /// and discarded.
    /// NOTE(review): an empty 200 body would fail the JSON parse here —
    /// confirm the orchestrator always returns a JSON body on `complete`.
    pub async fn report_complete(&self, report: &CompletionReport) -> Result<(), WorkerError> {
        let _: serde_json::Value = self
            .post_json("complete", report, "report complete")
            .await?;
        Ok(())
    }
}
/// Translate the orchestrator's finish-reason string into the worker's
/// [`FinishReason`] enum. Unrecognized strings map to `Unknown` rather
/// than an error, so new reasons from the orchestrator degrade gracefully.
fn parse_finish_reason(s: &str) -> FinishReason {
    match s {
        "content_filter" => FinishReason::ContentFilter,
        "length" => FinishReason::Length,
        "stop" => FinishReason::Stop,
        "tool_calls" | "tool_use" => FinishReason::ToolUse,
        _ => FinishReason::Unknown,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `url()` joins base URL, job id, and path under `/worker/`.
    #[test]
    fn test_url_construction() {
        let base = "http://host.docker.internal:50051";
        let client =
            WorkerHttpClient::new(base.to_string(), Uuid::nil(), "test-token".to_string());
        let expected = format!("{}/worker/{}/llm/complete", base, Uuid::nil());
        assert_eq!(client.url("llm/complete"), expected);
    }

    /// Known strings map to their variants; anything else is `Unknown`.
    #[test]
    fn test_parse_finish_reason() {
        let cases = [
            ("stop", FinishReason::Stop),
            ("tool_use", FinishReason::ToolUse),
            ("unknown", FinishReason::Unknown),
        ];
        for (raw, expected) in cases {
            assert_eq!(parse_finish_reason(raw), expected);
        }
    }
}