use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use tokio::sync::RwLock;
use crate::{
AgentSpec, ArtifactId, ExecutionContext, ExecutionHandle, ExecutionMetrics, ExecutionResult,
ResourceLimits, Run, RunError, RunId, RunStatus, RuntimeAdapter, RuntimeKind, StatusResult,
};
/// Runtime-wide settings for [`HttpRuntime`].
#[derive(Debug, Clone)]
pub struct HttpConfig {
/// Fallback base URL, used by `execute` when the agent spec's runtime config has no `url` entry.
pub base_url: Option<String>,
/// Optional API key; when present it is sent as an `Authorization: Bearer <key>` header.
pub api_key: Option<String>,
/// Timeout in seconds; `Default` sets it to 300.
// NOTE(review): `HttpRuntime::build_client` hard-codes a 300 second timeout and does not
// read this field, so changing it currently has no effect on outgoing requests.
pub timeout_secs: u64,
/// Resource limits attached to every execution context produced by `create`.
pub default_limits: ResourceLimits,
}
impl Default for HttpConfig {
    /// Builds a configuration with no base URL, no API key, a 300 second timeout
    /// and the default resource limits.
    fn default() -> Self {
        let default_limits = ResourceLimits::default();
        Self {
            base_url: None,
            api_key: None,
            timeout_secs: 300,
            default_limits,
        }
    }
}
/// Bookkeeping record for one run, stored in `HttpRuntime::active_executions`
/// under the run id's string form.
// NOTE(review): every field below is read somewhere in this file, so the
// `dead_code` allowance looks like a leftover — confirm before removing it.
#[allow(dead_code)]
struct ActiveHttpExecution {
/// Identifier of the run this record belongs to.
run_id: RunId,
/// Last recorded status; written by `execute` and overwritten by `cancel`.
status: RunStatus,
/// Instant captured just before the HTTP request was sent; used for elapsed-time reporting.
started_at: Instant,
/// Artifacts reported for the run; `execute` always stores an empty list.
artifacts: Vec<ArtifactId>,
/// Response value of a successful request, `None` when the request failed.
response: Option<serde_json::Value>,
/// Stringified error of a failed request, `None` on success.
error_message: Option<String>,
/// Base URL the request was sent to; reused to build the cancel URL.
base_url: String,
/// Endpoint path joined onto `base_url` when cancelling.
cancel_endpoint: String,
/// API key captured at execution time; sent as a bearer token on cancel.
api_key: Option<String>,
}
/// Runtime adapter that executes agents by POSTing JSON to an HTTP endpoint.
pub struct HttpRuntime {
/// Settings supplied at construction time.
config: HttpConfig,
/// Records of executions started through this runtime, keyed by run id string.
active_executions: Arc<RwLock<HashMap<String, ActiveHttpExecution>>>,
}
impl HttpRuntime {
pub fn new() -> Self {
HttpRuntime::with_config(HttpConfig::default())
}
pub fn with_config(config: HttpConfig) -> Self {
HttpRuntime {
config,
active_executions: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Returns the configured fallback base URL, if any.
pub fn base_url(&self) -> Option<&str> {
    self.config.base_url.as_ref().map(String::as_str)
}
/// Builds a `reqwest` client with a fixed 300 second timeout.
///
/// If the builder fails, a client from `reqwest::Client::new()` is used instead.
// NOTE(review): the timeout is a literal here; `HttpConfig::timeout_secs` is not consulted.
fn build_client() -> reqwest::Client {
    let timeout = std::time::Duration::from_secs(300);
    match reqwest::Client::builder().timeout(timeout).build() {
        Ok(client) => client,
        Err(_) => reqwest::Client::new(),
    }
}
/// POSTs `payload` as JSON to `base_url` joined with `endpoint` and returns the response as JSON.
///
/// Exactly one `/` separates the two URL parts regardless of how they were written.
/// When `api_key` is given it is sent as an `Authorization: Bearer <key>` header.
/// A successful response whose body is not valid JSON is wrapped as `{"result": <body>}`.
///
/// # Errors
///
/// * `RunError::StartupFailed` when the request cannot be sent.
/// * `RunError::ExecutionFailed` when the body cannot be read or the status is not a success.
async fn execute_http_request(
    base_url: &str,
    endpoint: &str,
    payload: &serde_json::Value,
    api_key: Option<&str>,
) -> Result<serde_json::Value, RunError> {
    let host_part = base_url.trim_end_matches('/');
    let path_part = endpoint.trim_start_matches('/');
    let url = format!("{}/{}", host_part, path_part);

    let client = Self::build_client();
    let builder = client.post(&url).json(payload);
    let builder = match api_key {
        Some(token) => builder.header("Authorization", format!("Bearer {}", token)),
        None => builder,
    };

    let response = match builder.send().await {
        Ok(resp) => resp,
        Err(e) => {
            return Err(RunError::StartupFailed {
                message: format!("HTTP request failed: {}", e),
            })
        }
    };

    // Capture the status first: reading the body consumes the response.
    let status = response.status();
    let body = match response.text().await {
        Ok(text) => text,
        Err(e) => {
            return Err(RunError::ExecutionFailed {
                message: format!("Failed to read response: {}", e),
            })
        }
    };

    if status.is_success() {
        let parsed = match serde_json::from_str::<serde_json::Value>(&body) {
            Ok(value) => value,
            Err(_) => serde_json::json!({ "result": body }),
        };
        Ok(parsed)
    } else {
        Err(RunError::ExecutionFailed {
            message: format!("HTTP error {}: {}", status, body),
        })
    }
}
/// Resolves the agent spec for `run`.
///
/// For an `Agent` target, a relative spec path is joined onto the context's working
/// directory (or `.` when none is set). If the resulting file exists it is loaded via
/// `AgentSpec::from_yaml_file`; otherwise a fresh HTTP spec is created, named after the
/// path's file stem (or `"agent"` when the stem is unavailable).
///
/// # Errors
///
/// Returns `RunError::InvalidConfig` for `Swarm` and `A2AAgent` targets, and propagates
/// whatever `AgentSpec::from_yaml_file` returns for an existing file.
fn load_spec_from_run(&self, run: &Run, ctx: &ExecutionContext) -> Result<AgentSpec, RunError> {
    let spec_path = match &run.target {
        crate::RunTarget::Agent { spec_path } => spec_path,
        crate::RunTarget::Swarm { swarmfile_path: _ } => {
            return Err(RunError::InvalidConfig {
                message: "Swarm execution requires worker loading".into(),
            });
        }
        crate::RunTarget::A2AAgent { .. } => {
            return Err(RunError::InvalidConfig {
                message: "A2A targets should use A2ARuntime, not HttpRuntime".into(),
            });
        }
    };

    let full_path = if spec_path.is_absolute() {
        spec_path.clone()
    } else {
        match ctx.working_dir.as_ref() {
            Some(dir) => dir.join(spec_path),
            None => std::path::PathBuf::from(".").join(spec_path),
        }
    };

    if !full_path.exists() {
        // No spec file on disk: fall back to a spec named after the file stem.
        let name = full_path
            .file_stem()
            .and_then(|stem| stem.to_str())
            .unwrap_or("agent");
        return Ok(AgentSpec::new(name, RuntimeKind::Http));
    }
    AgentSpec::from_yaml_file(&full_path)
}
}
impl Default for HttpRuntime {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl RuntimeAdapter for HttpRuntime {
/// Identifies this adapter as the HTTP runtime.
fn kind(&self) -> RuntimeKind {
RuntimeKind::Http
}
/// Creates an execution context named `http-<spec id>` carrying the configured default limits.
async fn create(&self, spec: &AgentSpec) -> Result<ExecutionContext, RunError> {
    let context_id = format!("http-{}", spec.id.as_str());
    let limits = self.config.default_limits.clone();
    Ok(ExecutionContext::new(context_id, RuntimeKind::Http).with_limits(limits))
}
/// Sends the run's request to the agent's HTTP endpoint and records the outcome.
///
/// The agent spec is resolved through `load_spec_from_run`. Its runtime config supplies:
///
/// * `url` — base URL, falling back to `HttpConfig::base_url`;
/// * `endpoint` — request path, default `/execute`;
/// * `cancel_endpoint` — path stored for a later `cancel`, default `/cancel`;
/// * `payload` — a JSON string used as the request body, default `{"agent": <spec id>}`.
///
/// When the run carries an input, it is attached to the payload under the `input` key.
///
/// The request is awaited before this method returns, so the stored record already
/// holds `Completed` or `Failed` by the time the handle is handed back.
///
/// # Errors
///
/// Propagates any error from `load_spec_from_run`. Returns `RunError::InvalidConfig`
/// when no base URL is available, or when an input has to be attached to a configured
/// payload that is neither a JSON object nor `null`. A failed HTTP call is *not*
/// returned as an error; it is recorded on the execution with status `Failed`.
async fn execute(
    &self,
    ctx: &ExecutionContext,
    run: &Run,
) -> Result<ExecutionHandle, RunError> {
    let spec = self.load_spec_from_run(run, ctx)?;
    let runtime_config = &spec.runtime.config;

    let base_url = runtime_config
        .get("url")
        .map(|s| s.as_str())
        .or(self.config.base_url.as_deref())
        .ok_or_else(|| RunError::InvalidConfig {
            message: "No 'url' specified in AgentSpec runtime.config or HttpConfig.base_url"
                .into(),
        })?;
    let endpoint = runtime_config
        .get("endpoint")
        .map(|s| s.as_str())
        .unwrap_or("/execute");
    let cancel_endpoint = runtime_config
        .get("cancel_endpoint")
        .map(|s| s.as_str())
        .unwrap_or("/cancel");

    // A `payload` entry that fails to parse as JSON is ignored and the default body is used.
    // The default is built lazily so nothing is allocated when a payload is configured.
    let mut payload: serde_json::Value = runtime_config
        .get("payload")
        .and_then(|s| serde_json::from_str(s).ok())
        .unwrap_or_else(|| serde_json::json!({ "agent": spec.id.as_str() }));

    if let Some(input) = &run.input {
        // Indexing a `serde_json::Value` by string key panics unless the value is an
        // object or null, so reject any other configured payload shape up front.
        if !(payload.is_object() || payload.is_null()) {
            return Err(RunError::InvalidConfig {
                message: "Cannot attach run input: 'payload' in AgentSpec runtime.config must be a JSON object"
                    .into(),
            });
        }
        payload["input"] = input.clone();
    }

    let started_at = Instant::now();
    let result = Self::execute_http_request(
        base_url,
        endpoint,
        &payload,
        self.config.api_key.as_deref(),
    )
    .await;

    // Request failures are captured in the record rather than returned to the caller.
    let (status, response, error_message) = match result {
        Ok(json) => (RunStatus::Completed, Some(json), None),
        Err(e) => (RunStatus::Failed, None, Some(e.to_string())),
    };

    let handle = ExecutionHandle::new(
        run.id.clone(),
        RuntimeKind::Http,
        format!("http:{}", ctx.id),
    );
    let execution = ActiveHttpExecution {
        run_id: run.id.clone(),
        status,
        started_at,
        artifacts: Vec::new(),
        response,
        error_message,
        base_url: base_url.to_string(),
        cancel_endpoint: cancel_endpoint.to_string(),
        api_key: self.config.api_key.clone(),
    };
    {
        // Keep the write lock only for the insertion itself.
        let mut executions = self.active_executions.write().await;
        executions.insert(run.id.as_str().to_string(), execution);
    }
    Ok(handle)
}
/// Delegates to `execute`.
///
/// No task is spawned here: the returned future resolves only once `execute` has
/// finished, i.e. after the HTTP request has been awaited.
async fn execute_background(
    &self,
    ctx: &ExecutionContext,
    run: &Run,
) -> Result<ExecutionHandle, RunError> {
    let handle = self.execute(ctx, run).await?;
    Ok(handle)
}
/// Reports the recorded status of the execution identified by `handle`.
///
/// `progress` is always reported as 0 and `current_step` as `None`.
///
/// # Errors
///
/// Returns `RunError::InvalidConfig` when no record exists for the handle's run id.
async fn status(&self, handle: &ExecutionHandle) -> Result<StatusResult, RunError> {
    let run_key = handle.run_id.as_str();
    let table = self.active_executions.read().await;
    match table.get(run_key) {
        Some(entry) => {
            let elapsed_ms = entry.started_at.elapsed().as_millis() as u64;
            Ok(StatusResult {
                run_id: entry.run_id.clone(),
                status: entry.status,
                current_step: None,
                progress: 0,
                elapsed_ms,
                artifacts: entry.artifacts.clone(),
            })
        }
        None => Err(RunError::InvalidConfig {
            message: format!("Execution not found: {}", run_key),
        }),
    }
}
/// No-op: the context is ignored and `Ok(())` is always returned.
/// Records in `active_executions` are not removed here.
async fn destroy(&self, _ctx: &ExecutionContext) -> Result<(), RunError> {
Ok(())
}
/// Asks the remote service to cancel the run and marks the local record as `Cancelled`.
///
/// A JSON body `{"run_id": <id>, "action": "cancel"}` is POSTed to the base URL and
/// cancel endpoint captured when the run was executed, with the stored API key (if any)
/// sent as a bearer token. The remote call is best-effort: a transport failure or a
/// non-success HTTP status is reported on stderr and does not fail this method.
///
/// # Errors
///
/// Returns `RunError::InvalidConfig` when no record exists for the handle's run id.
async fn cancel(&self, handle: &ExecutionHandle) -> Result<(), RunError> {
    let run_key = handle.run_id.as_str();

    // Copy what the request needs inside a scope so the read lock is released
    // before the network call is awaited.
    let (cancel_url, api_key) = {
        let executions = self.active_executions.read().await;
        let execution = executions
            .get(run_key)
            .ok_or_else(|| RunError::InvalidConfig {
                message: format!("Execution not found: {}", run_key),
            })?;
        let url = format!(
            "{}/{}",
            execution.base_url.trim_end_matches('/'),
            execution.cancel_endpoint.trim_start_matches('/')
        );
        (url, execution.api_key.clone())
    };

    let cancel_payload = serde_json::json!({
        "run_id": run_key,
        "action": "cancel"
    });
    let client = Self::build_client();
    let mut request = client.post(&cancel_url).json(&cancel_payload);
    if let Some(key) = &api_key {
        request = request.header("Authorization", format!("Bearer {}", key));
    }
    let outcome = request.send().await;

    {
        // NOTE(review): the status is overwritten with `Cancelled` whatever it was before,
        // including `Completed` or `Failed` — confirm this is intended.
        let mut executions = self.active_executions.write().await;
        if let Some(exec) = executions.get_mut(run_key) {
            exec.status = RunStatus::Cancelled;
        }
    }

    match outcome {
        // A reachable server that rejects the cancel must not be mistaken for success.
        Ok(response) if !response.status().is_success() => {
            eprintln!(
                "Warning: HTTP cancel request returned status {}",
                response.status()
            );
        }
        Ok(_) => {}
        Err(e) => eprintln!("Warning: HTTP cancel request failed: {}", e),
    }
    Ok(())
}
/// Returns the recorded result of the execution identified by `handle`.
///
/// Nothing is awaited beyond the table lock: the method reads whatever the record
/// currently holds. A stored error message is surfaced as `RunError::ExecutionFailed`
/// inside the result, and the wall time is measured from the record's start instant.
///
/// # Errors
///
/// Returns `RunError::InvalidConfig` when no record exists for the handle's run id.
async fn wait(&self, handle: &ExecutionHandle) -> Result<ExecutionResult, RunError> {
    let run_key = handle.run_id.as_str();
    let table = self.active_executions.read().await;
    let entry = match table.get(run_key) {
        Some(entry) => entry,
        None => {
            return Err(RunError::InvalidConfig {
                message: format!("Execution not found: {}", run_key),
            })
        }
    };

    let wall_time_ms = entry.started_at.elapsed().as_millis() as u64;
    let error = entry
        .error_message
        .clone()
        .map(|message| RunError::ExecutionFailed { message });
    let metrics = ExecutionMetrics {
        wall_time_ms,
        ..Default::default()
    };

    Ok(ExecutionResult {
        run_id: entry.run_id.clone(),
        status: entry.status,
        artifacts: entry.artifacts.clone(),
        error,
        metrics,
        output: entry.response.clone(),
    })
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A default-constructed runtime reports the HTTP runtime kind.
    #[test]
    fn test_http_runtime_creation() {
        let kind = HttpRuntime::new().kind();
        assert_eq!(kind, RuntimeKind::Http);
    }

    /// Values passed through `with_config` are stored unchanged.
    #[test]
    fn test_http_config() {
        let runtime = HttpRuntime::with_config(HttpConfig {
            base_url: Some("https://api.example.com".into()),
            api_key: Some("secret-key".into()),
            timeout_secs: 600,
            ..Default::default()
        });
        let expected_url = Some(String::from("https://api.example.com"));
        assert_eq!(runtime.config.base_url, expected_url);
        assert_eq!(runtime.config.timeout_secs, 600);
    }

    /// `create` yields a context whose id is prefixed with `http-` and whose kind is HTTP.
    #[tokio::test]
    async fn test_http_runtime_create() {
        let agent = AgentSpec::new("test-agent", RuntimeKind::Http);
        let runtime = HttpRuntime::new();
        let context = runtime.create(&agent).await.unwrap();
        assert!(context.id.starts_with("http-"));
        assert_eq!(context.runtime_kind, RuntimeKind::Http);
    }

    /// Live round trip against httpbin; ignored by default.
    // NOTE(review): `execute` resolves its spec through `load_spec_from_run`, so the
    // `url`/`endpoint` entries inserted into the local `agent` below are only seen by
    // `create`. Unless an `agent.yaml` exists in the working directory, the request
    // presumably goes to the default `/execute` path — verify before un-ignoring.
    #[tokio::test]
    #[ignore = "Requires a live HTTP server"]
    async fn test_http_real_execute() {
        let runtime = HttpRuntime::with_config(HttpConfig {
            base_url: Some("https://httpbin.org".into()),
            ..Default::default()
        });

        let mut agent = AgentSpec::new("test-agent", RuntimeKind::Http);
        agent
            .runtime
            .config
            .insert("url".into(), "https://httpbin.org".into());
        agent
            .runtime
            .config
            .insert("endpoint".into(), "/post".into());
        let context = runtime.create(&agent).await.unwrap();

        let target = crate::RunTarget::Agent {
            spec_path: std::path::PathBuf::from("agent.yaml"),
        };
        let run = Run::new(target, RuntimeKind::Http);

        let handle = runtime.execute(&context, &run).await.unwrap();
        assert_eq!(handle.runtime_kind, RuntimeKind::Http);

        let report = runtime.status(&handle).await.unwrap();
        assert_eq!(report.status, RunStatus::Completed);
    }
}