#[cfg(feature = "http-input")]
use std::sync::Arc;
#[cfg(feature = "http-input")]
use axum::{
extract::{DefaultBodyLimit, State},
http::{HeaderMap, StatusCode},
middleware,
response::{IntoResponse, Response},
routing::post,
Json, Router,
};
#[cfg(feature = "http-input")]
use serde_json::Value;
#[cfg(feature = "http-input")]
use tokio::sync::{RwLock, Semaphore};
#[cfg(feature = "http-input")]
use tower_http::cors::CorsLayer;
#[cfg(feature = "http-input")]
use super::config::{HttpInputConfig, ResponseControlConfig, RouteMatch};
#[cfg(feature = "http-input")]
use crate::secrets::{new_secret_store, SecretStore, SecretsConfig};
#[cfg(feature = "http-input")]
use crate::types::{AgentId, RuntimeError};
#[cfg(feature = "http-input")]
pub struct HttpInputServer {
config: Arc<RwLock<HttpInputConfig>>,
runtime: Option<Arc<crate::AgentRuntime>>,
secret_store: Option<Arc<dyn SecretStore + Send + Sync>>,
concurrency_limiter: Arc<Semaphore>,
resolved_auth_header: Arc<RwLock<Option<String>>>,
}
#[cfg(feature = "http-input")]
impl HttpInputServer {
pub fn new(config: HttpInputConfig) -> Self {
let concurrency_limiter = Arc::new(Semaphore::new(config.concurrency));
Self {
config: Arc::new(RwLock::new(config)),
runtime: None,
secret_store: None,
concurrency_limiter,
resolved_auth_header: Arc::new(RwLock::new(None)),
}
}
pub fn with_runtime(mut self, runtime: Arc<crate::AgentRuntime>) -> Self {
self.runtime = Some(runtime);
self
}
pub fn with_secret_store(mut self, secret_store: Arc<dyn SecretStore + Send + Sync>) -> Self {
self.secret_store = Some(secret_store);
self
}
pub async fn start(&self) -> Result<(), RuntimeError> {
let config = self.config.read().await;
let addr = format!("{}:{}", config.bind_address, config.port);
if let Some(auth_header) = &config.auth_header {
if let Some(secret_store) = &self.secret_store {
let resolved = resolve_secret_reference(secret_store.as_ref(), auth_header).await?;
*self.resolved_auth_header.write().await = Some(resolved);
} else {
*self.resolved_auth_header.write().await = Some(auth_header.clone());
}
}
let server_state = ServerState {
config: self.config.clone(),
runtime: self.runtime.clone(),
secret_store: self.secret_store.clone(),
concurrency_limiter: self.concurrency_limiter.clone(),
resolved_auth_header: self.resolved_auth_header.clone(),
};
let mut app = Router::new();
let path = config.path.clone();
app = app.route(&path, post(webhook_handler));
app = app.layer(middleware::from_fn_with_state(
server_state.clone(),
auth_middleware,
));
app = app.layer(DefaultBodyLimit::max(config.max_body_bytes));
if config.cors_enabled {
app = app.layer(CorsLayer::permissive());
}
tracing::info!("Starting HTTP Input server on {}", addr);
let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
RuntimeError::Internal(format!("Failed to bind to address {}: {}", addr, e))
})?;
let app_with_state = app.with_state(server_state);
axum::serve(listener, app_with_state.into_make_service())
.await
.map_err(|e| RuntimeError::Internal(format!("Server error: {}", e)))?;
Ok(())
}
pub async fn stop(&self) -> Result<(), RuntimeError> {
tracing::info!("HTTP Input server stopping");
Ok(())
}
pub async fn update_config(&self, new_config: HttpInputConfig) -> Result<(), RuntimeError> {
*self.config.write().await = new_config;
Ok(())
}
pub async fn get_config(&self) -> HttpInputConfig {
self.config.read().await.clone()
}
}
#[cfg(feature = "http-input")]
#[derive(Clone)]
struct ServerState {
config: Arc<RwLock<HttpInputConfig>>,
runtime: Option<Arc<crate::AgentRuntime>>,
secret_store: Option<Arc<dyn SecretStore + Send + Sync>>,
concurrency_limiter: Arc<Semaphore>,
resolved_auth_header: Arc<RwLock<Option<String>>>,
}
#[cfg(feature = "http-input")]
async fn auth_middleware(
State(state): State<ServerState>,
headers: HeaderMap,
req: axum::extract::Request,
next: axum::middleware::Next,
) -> Result<Response, StatusCode> {
let resolved_auth = state.resolved_auth_header.read().await;
if let Some(expected_auth) = resolved_auth.as_ref() {
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
match auth_header {
Some(provided_auth) => {
if provided_auth != expected_auth {
tracing::warn!("Authentication failed: invalid authorization header");
return Err(StatusCode::UNAUTHORIZED);
}
}
None => {
tracing::warn!("Authentication failed: missing authorization header");
return Err(StatusCode::UNAUTHORIZED);
}
}
}
Ok(next.run(req).await)
}
#[cfg(feature = "http-input")]
async fn webhook_handler(
State(state): State<ServerState>,
headers: HeaderMap,
Json(payload): Json<Value>,
) -> Result<Response, StatusCode> {
let _permit = state.concurrency_limiter.try_acquire().map_err(|_| {
tracing::warn!("Concurrency limit exceeded");
StatusCode::TOO_MANY_REQUESTS
})?;
let config = state.config.read().await;
if config.audit_enabled {
tracing::info!(
"HTTP Input: Received request with {} headers",
headers.len()
);
}
let agent_id = route_request(&config, &payload, &headers).await;
if let Some(runtime) = &state.runtime {
match invoke_agent(runtime, agent_id, payload).await {
Ok(result) => {
let response_config = config.response_control.as_ref();
format_success_response(result, response_config)
}
Err(e) => {
tracing::error!("Agent invocation failed: {:?}", e);
let response_config = config.response_control.as_ref();
format_error_response(e, response_config)
}
}
} else {
tracing::warn!("No runtime available for agent invocation");
let response_config = config.response_control.as_ref();
format_success_response(
serde_json::json!({"status": "received", "agent": agent_id.to_string()}),
response_config,
)
}
}
#[cfg(feature = "http-input")]
async fn route_request(config: &HttpInputConfig, payload: &Value, headers: &HeaderMap) -> AgentId {
if let Some(routing_rules) = &config.routing_rules {
for rule in routing_rules {
if matches_route_condition(&rule.condition, payload, headers).await {
tracing::debug!("Request routed to agent {} via rule", rule.agent);
return rule.agent;
}
}
}
tracing::debug!("Request routed to default agent {}", config.agent);
config.agent
}
#[cfg(feature = "http-input")]
async fn matches_route_condition(
condition: &RouteMatch,
payload: &Value,
headers: &HeaderMap,
) -> bool {
match condition {
RouteMatch::PathPrefix(_path) => {
false
}
RouteMatch::HeaderEquals(header_name, expected_value) => headers
.get(header_name)
.and_then(|h| h.to_str().ok())
.map(|value| value == expected_value)
.unwrap_or(false),
RouteMatch::JsonFieldEquals(field_name, expected_value) => payload
.get(field_name)
.and_then(|v| v.as_str())
.map(|value| value == expected_value)
.unwrap_or(false),
}
}
#[cfg(feature = "http-input")]
async fn invoke_agent(
_runtime: &crate::AgentRuntime,
agent_id: AgentId,
input_data: Value,
) -> Result<Value, RuntimeError> {
use crate::types::{
EncryptedPayload, EncryptionAlgorithm, MessageId, MessageSignature, MessageType, RequestId,
SecureMessage, SignatureAlgorithm,
};
use bytes::Bytes;
let _message = SecureMessage {
id: MessageId::new(),
sender: AgentId::new(), recipient: Some(agent_id),
topic: None,
payload: EncryptedPayload {
data: Bytes::from(serde_json::to_vec(&input_data).unwrap_or_default()),
encryption_algorithm: EncryptionAlgorithm::None, nonce: vec![],
},
signature: MessageSignature {
signature: vec![],
algorithm: SignatureAlgorithm::None, public_key: vec![],
},
timestamp: std::time::SystemTime::now(),
ttl: std::time::Duration::from_secs(300),
message_type: MessageType::Request(RequestId::new()),
};
tracing::info!("Would invoke agent {} with input data", agent_id);
Ok(serde_json::json!({
"status": "invoked",
"agent_id": agent_id.to_string(),
"timestamp": chrono::Utc::now().to_rfc3339()
}))
}
#[cfg(feature = "http-input")]
fn format_success_response(
result: Value,
response_config: Option<&ResponseControlConfig>,
) -> Result<Response, StatusCode> {
let default_config = ResponseControlConfig::default();
let config = response_config.unwrap_or(&default_config);
let status = StatusCode::from_u16(config.default_status).unwrap_or(StatusCode::OK);
if config.agent_output_to_json {
Ok((status, Json(result)).into_response())
} else {
Ok((status, "OK").into_response())
}
}
#[cfg(feature = "http-input")]
fn format_error_response(
error: RuntimeError,
response_config: Option<&ResponseControlConfig>,
) -> Result<Response, StatusCode> {
let default_config = ResponseControlConfig::default();
let config = response_config.unwrap_or(&default_config);
let status =
StatusCode::from_u16(config.error_status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let error_body = serde_json::json!({
"error": error.to_string(),
"timestamp": chrono::Utc::now().to_rfc3339()
});
Ok((status, Json(error_body)).into_response())
}
#[cfg(feature = "http-input")]
async fn resolve_secret_reference(
secret_store: &dyn SecretStore,
reference: &str,
) -> Result<String, RuntimeError> {
if reference.starts_with("vault://") || reference.starts_with("file://") {
let key = reference.split("://").nth(1).ok_or_else(|| {
RuntimeError::Configuration(crate::types::ConfigError::Invalid(
"Invalid secret reference format".to_string(),
))
})?;
let secret = secret_store
.get_secret(key)
.await
.map_err(|e| RuntimeError::Internal(format!("Secret resolution failed: {}", e)))?;
Ok(secret.value)
} else {
Ok(reference.to_string())
}
}
#[cfg(feature = "http-input")]
#[derive(Debug, Clone)]
struct RequestData {
path: String,
method: String,
headers: Vec<(String, String)>,
body: Vec<u8>,
query_params: Vec<(String, String)>,
}
#[cfg(feature = "http-input")]
#[derive(Debug, Clone)]
struct WebhookResponse {
status: u16,
body: String,
headers: Vec<(String, String)>,
}
#[cfg(feature = "http-input")]
pub async fn start_http_input(
config: HttpInputConfig,
runtime: Option<Arc<crate::AgentRuntime>>,
secrets_config: Option<SecretsConfig>,
) -> Result<(), RuntimeError> {
let mut server = HttpInputServer::new(config);
if let Some(runtime) = runtime {
server = server.with_runtime(runtime);
}
if let Some(secrets_config) = secrets_config {
let secret_store = new_secret_store(&secrets_config, "http_input")
.await
.map_err(|e| {
RuntimeError::Internal(format!("Failed to initialize secret store: {}", e))
})?;
server = server.with_secret_store(Arc::from(secret_store));
}
server.start().await
}