use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use axum::body::Bytes;
use axum::extract::State;
use axum::http::{HeaderMap, HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::Router;
use secrecy::SecretString;
use serde::Serialize;
use tokio::sync::Mutex as AsyncMutex;
use tower_http::limit::RequestBodyLimitLayer;
use tracing::{error, info, warn};
use crate::auth::binding_secrets::BindingSecretStore;
use crate::error::{
OlError, OL_4220_HMAC_FAILED, OL_4221_MALFORMED_BODY, OL_4222_BINDING_NOT_CONFIGURED,
};
use crate::runtime::audit_log::{AuditTx, Record};
use crate::runtime::deadline::Deadline;
use crate::runtime::multi_tool::LocalRoute;
use crate::runtime::proxy;
use crate::runtime::reload::SharedRoutes;
use crate::runtime::replay_cache::{CachedVerdict, ReplayCache};
use crate::runtime::verdict;
use crate::runtime::webhook::{self, VerifyFailure};
use crate::telemetry::{self, Event};
/// Largest inbound request body, in bytes, that the router's body-limit
/// layer accepts (1 MiB).
pub const MAX_INBOUND_BYTES: usize = 1 << 20;
/// Default port value exported for callers.
// NOTE(review): not referenced in this file; presumably the default listen
// port for the runtime — confirm against the CLI/config code.
pub const DEFAULT_PORT: u16 = 8_443;
/// Shared state handed to every HTTP handler (wrapped in an `Arc` by
/// `build_router`).
///
/// Only some of the fields are used by the handlers in this file; the
/// remaining ones are documented conservatively and flagged for review.
pub struct RuntimeContext {
/// Route table; handlers call `snapshot()` on it once per request and look
/// bindings up in that snapshot.
pub routes: SharedRoutes,
/// Store from which the per-binding secret is retrieved by binding id.
pub secrets: Arc<dyn BindingSecretStore>,
/// Cache of already-signed verdicts, keyed by the inbound webhook id.
pub replay_cache: ReplayCache,
/// HTTP client passed to `proxy::proxy` when forwarding an event.
pub proxy_client: reqwest::Client,
/// Optional audit sink; when `None`, audit records are dropped.
pub audit_tx: Option<AuditTx>,
// NOTE(review): not used in this file; presumably the admin API token — confirm.
pub admin_token: Arc<AsyncMutex<String>>,
/// Reference instant for uptime reporting.
pub started_at: Instant,
/// Number of events answered with a verdict (fresh or replayed).
pub events_processed: AtomicU64,
/// Number of events rejected after header parsing succeeded.
pub events_failed: AtomicU64,
// NOTE(review): not used in this file; presumably the on-disk manifest location — confirm.
pub manifest_path: std::path::PathBuf,
// NOTE(review): not used in this file; looks like state for the editor API — confirm.
pub live_bindings: AsyncMutex<Vec<crate::api::editor::EditorBindingRow>>,
// NOTE(review): not used in this file; presumably secret ids named by the manifest — confirm.
pub manifest_secret_ids: AsyncMutex<Vec<String>>,
// NOTE(review): not used in this file; presumably guards against concurrent self-updates — confirm.
pub update_in_progress: std::sync::atomic::AtomicBool,
// NOTE(review): not used in this file; presumably the latest self-update status — confirm.
pub update_status: std::sync::Mutex<crate::update::UpdateStatusSnapshot>,
// NOTE(review): not used in this file; presumably signalled when an admin requests shutdown — confirm.
pub admin_shutdown_request: tokio::sync::Notify,
/// Optional supervisor whose `snapshot()` supplies the per-binding status
/// list reported by the health endpoint.
pub supervisor: Option<Arc<crate::runtime::supervisor::Supervisor>>,
/// Forwarded to `observability::verdict_display` when logging a delivered
/// event.
pub log_ansi: bool,
}
impl RuntimeContext {
    /// Milliseconds elapsed since `started_at`.
    ///
    /// `Duration::as_millis` yields a `u128`; the value is clamped to
    /// `u64::MAX` before narrowing so the conversion saturates instead of
    /// silently wrapping.
    pub fn uptime_ms(&self) -> u64 {
        let elapsed_ms = self.started_at.elapsed().as_millis();
        // Lossless after the clamp: the value is guaranteed to fit in a u64.
        elapsed_ms.min(u128::from(u64::MAX)) as u64
    }
}
/// Maps the `no_tls` flag to its static tag: `"proxy-fronted"` when TLS is
/// disabled on this listener, `"direct"` otherwise.
pub fn tls_mode_tag(no_tls: bool) -> &'static str {
    match no_tls {
        true => "proxy-fronted",
        false => "direct",
    }
}
/// JSON body produced by `handle_health`; field names are the serialized keys.
#[derive(Debug, Serialize)]
struct Health {
/// Always `"ok"` when built by `handle_health`.
status: &'static str,
/// Whole seconds elapsed since the runtime context's `started_at`.
uptime_secs: u64,
/// Snapshot of the context's `events_processed` counter.
events_processed: u64,
/// Snapshot of the context's `events_failed` counter.
events_failed: u64,
/// Number of entries in the current route snapshot.
binding_count: usize,
/// Per-binding status from the supervisor; empty when no supervisor is set.
bindings: Vec<crate::runtime::supervisor::ToolStatus>,
}
/// Health endpoint: reports uptime, event counters, the number of configured
/// routes, and (when a supervisor is present) the supervisor's status list.
/// Always answers `200 OK` with a JSON body.
async fn handle_health(State(ctx): State<Arc<RuntimeContext>>) -> Response {
    let routes = ctx.routes.snapshot();
    let bindings = match ctx.supervisor.as_ref() {
        Some(supervisor) => supervisor.snapshot().await,
        None => Vec::new(),
    };

    let uptime_secs = ctx.started_at.elapsed().as_secs();
    let events_processed = ctx.events_processed.load(Ordering::Relaxed);
    let events_failed = ctx.events_failed.load(Ordering::Relaxed);

    let report = Health {
        status: "ok",
        uptime_secs,
        events_processed,
        events_failed,
        binding_count: routes.len(),
        bindings,
    };
    (StatusCode::OK, axum::Json(report)).into_response()
}
/// Handles one inbound event on `/v1/event`.
///
/// Pipeline, in order:
/// 1. Parse the required webhook headers and the binding id.
/// 2. Resolve the local route and the binding secret.
/// 3. Verify the webhook signature over the raw body.
/// 4. Return a cached verdict if this webhook id was already answered.
/// 5. Check the body is JSON, proxy it to the local tool under a deadline.
/// 6. Size-check and sign the tool's response, cache it, audit it, and
///    return it with signed headers.
///
/// Every failure after header parsing increments `events_failed` and writes
/// an audit record; the early header-parse rejections do neither.
async fn handle_event(
State(ctx): State<Arc<RuntimeContext>>,
headers: HeaderMap,
body: Bytes,
) -> Response {
// Start of the end-to-end processing time reported as `processing_ms`.
let started = Instant::now();
// One route snapshot is used for the whole request.
let snap = ctx.routes.snapshot();
// --- Required webhook headers: any missing/unreadable one is rejected
// with the OL-4220 response (401), before any counter or audit update.
let webhook_id = match header_str(&headers, "webhook-id") {
Some(v) => v.to_string(),
None => return ol_4220_response("missing webhook-id header"),
};
// The timestamp must parse as an i64; a non-numeric value is treated the
// same as a missing header.
let webhook_ts =
match header_str(&headers, "webhook-timestamp").and_then(|s| s.parse::<i64>().ok()) {
Some(v) => v,
None => return ol_4220_response("missing or malformed webhook-timestamp header"),
};
let signature = match header_str(&headers, "webhook-signature") {
Some(v) => v.to_string(),
None => return ol_4220_response("missing webhook-signature header"),
};
// A missing binding id is reported with the OL-4222 response (404).
let binding_id = match header_str(&headers, "x-openlatch-binding-id") {
Some(v) => v.to_string(),
None => return ol_4222_response("missing X-OpenLatch-Binding-Id header"),
};
// The event id is optional: when absent, a fresh `evt_<uuid v7>` id is
// generated so audit records and logs still have an identifier.
let event_id = header_str(&headers, "x-openlatch-event-id")
.map(str::to_string)
.unwrap_or_else(|| format!("evt_{}", uuid::Uuid::now_v7().simple()));
// Deadline budget in milliseconds; falls back to 200 when the header is
// absent or not a valid u64.
let deadline_ms: u64 = header_str(&headers, "x-openlatch-deadline-ms")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(200);
// --- Route lookup: an unknown binding is a failure (counted + audited).
let route: LocalRoute = match snap.lookup(&binding_id) {
Some(r) => r.clone(),
None => {
ctx.events_failed.fetch_add(1, Ordering::Relaxed);
audit(
ctx.as_ref(),
Record::now(&event_id, &binding_id, "not_configured"),
);
return ol_4222_response(&format!("binding `{binding_id}` is not configured locally"));
}
};
// --- Secret lookup: any retrieval error is reported as "no local secret";
// the underlying error is discarded.
let secret: SecretString = match ctx.secrets.retrieve(&binding_id) {
Ok(s) => s,
Err(_) => {
ctx.events_failed.fetch_add(1, Ordering::Relaxed);
audit(
ctx.as_ref(),
Record::now(&event_id, &binding_id, "secret_missing"),
);
return ol_4222_response(&format!("no local secret for binding `{binding_id}`"));
}
};
// --- Signature verification over the raw body.
if let Err(err) = webhook::verify(&secret, &webhook_id, webhook_ts, &body, &signature) {
// Telemetry distinguishes timestamp failures (code "OL-4226") from all
// other verification failures, which are reported as HMAC failures.
let kind = match err.code.code {
"OL-4226" => VerifyFailure::Timestamp,
_ => VerifyFailure::Hmac,
};
warn!(
binding_id = %binding_id,
webhook_id = %webhook_id,
code = %err.code,
"webhook verification failed"
);
telemetry::capture_global(Event::webhook_verify_failed(
&binding_id,
kind.telemetry_kind(),
));
ctx.events_failed.fetch_add(1, Ordering::Relaxed);
// Note: the audit outcome is "hmac_failed" for both failure kinds.
audit(
ctx.as_ref(),
Record::now(&event_id, &binding_id, "hmac_failed"),
);
return error_response(StatusCode::UNAUTHORIZED, &err);
}
// --- Replay handling: a verified request whose webhook id is already in
// the cache gets the stored verdict back without contacting the tool.
// A replay counts as a processed event.
if let Some(cached) = ctx.replay_cache.lookup(&webhook_id) {
info!(
binding_id = %binding_id,
webhook_id = %webhook_id,
"replay cache hit; returning cached verdict"
);
ctx.events_processed.fetch_add(1, Ordering::Relaxed);
audit(
ctx.as_ref(),
Record::now(&event_id, &binding_id, "replay_cache_hit"),
);
return cached_response(&cached);
}
// --- Body must pass the JSON check before it is forwarded.
if let Err(e) = verdict::ensure_json(&body) {
ctx.events_failed.fetch_add(1, Ordering::Relaxed);
audit(
ctx.as_ref(),
Record::now(&event_id, &binding_id, "malformed_body"),
);
return error_response(StatusCode::BAD_REQUEST, &e);
}
// --- Forward to the local tool under the requested deadline; the proxy
// call is timed separately so `tool_ms` can be reported.
let deadline = Deadline::from_budget_ms(deadline_ms);
let proxy_started = Instant::now();
let proxy_outcome = match proxy::proxy(&ctx.proxy_client, &route, body.clone(), deadline).await
{
Ok(o) => o,
Err(err) => {
let kind = proxy::telemetry_kind(&err);
warn!(
binding_id = %binding_id,
code = %err.code,
"localhost proxy call failed"
);
telemetry::capture_global(Event::proxy_call_failed(&binding_id, kind));
ctx.events_failed.fetch_add(1, Ordering::Relaxed);
// Translate the error code into the audit outcome label; unknown
// codes are recorded as "tool_unreachable".
let outcome = match err.code.code {
"OL-4223" => "oversize",
"OL-4224" => "tool_unreachable",
"OL-4225" => "tool_5xx",
"OL-4228" => "timeout",
_ => "tool_unreachable",
};
audit(ctx.as_ref(), Record::now(&event_id, &binding_id, outcome));
// Only the timeout code maps to 504; every other proxy failure is 502.
let status = if err.code.code == "OL-4228" {
StatusCode::GATEWAY_TIMEOUT
} else {
StatusCode::BAD_GATEWAY
};
return error_response(status, &err);
}
};
let tool_ms = proxy_started.elapsed().as_millis() as u64;
// --- The tool's response body is size-checked before signing.
if let Err(e) = verdict::validate_body_size(&proxy_outcome.body) {
ctx.events_failed.fetch_add(1, Ordering::Relaxed);
audit(
ctx.as_ref(),
Record::now(&event_id, &binding_id, "oversize"),
);
return error_response(StatusCode::BAD_GATEWAY, &e);
}
// --- Sign the response body with the binding secret.
let signed = match verdict::sign(&secret, &proxy_outcome.body) {
Ok(h) => h,
Err(e) => {
ctx.events_failed.fetch_add(1, Ordering::Relaxed);
audit(
ctx.as_ref(),
Record::now(&event_id, &binding_id, "sign_failed"),
);
return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e);
}
};
let processing_ms = started.elapsed().as_millis() as u64;
// --- Store the signed verdict under the *inbound* webhook id so a replay
// of the same request is answered from the cache (see lookup above).
let cached = CachedVerdict {
status: 200,
body: proxy_outcome.body.clone(),
webhook_id: signed.webhook_id.clone(),
webhook_timestamp: signed.webhook_timestamp,
webhook_signature: signed.webhook_signature.clone(),
processing_ms,
};
ctx.replay_cache.insert(webhook_id.clone(), cached);
ctx.events_processed.fetch_add(1, Ordering::Relaxed);
// --- Audit + log. The body is parsed leniently only for reporting; the
// bytes returned to the caller are the tool's original response.
let parsed_for_log = verdict::parse_lossy(&proxy_outcome.body);
let parsed_value = parsed_for_log
.as_ref()
.and_then(|v| serde_json::to_value(v).ok());
let record = crate::runtime::audit_log::record_from_verdict(
&event_id,
&binding_id,
parsed_value.as_ref(),
"delivered",
processing_ms,
tool_ms,
);
audit(ctx.as_ref(), record);
let verdict_str = crate::observability::verdict_display(
parsed_for_log.as_ref().and_then(|v| v.verdict_hint),
ctx.log_ansi,
);
info!(
event_id = %event_id,
binding_id = %binding_id,
verdict = %verdict_str,
processing_ms,
tool_ms,
"event delivered"
);
success_response(&proxy_outcome.body, &signed, processing_ms)
}
/// Returns the value of header `name` as a string slice.
///
/// Yields `None` both when the header is absent and when its value cannot be
/// represented as a `&str` (i.e. `HeaderValue::to_str` fails).
fn header_str<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> {
    let raw = headers.get(name)?;
    raw.to_str().ok()
}
/// Builds a `401 Unauthorized` error response carrying the OL-4220 code and
/// the given message.
fn ol_4220_response(message: &str) -> Response {
    error_response(
        StatusCode::UNAUTHORIZED,
        &OlError::new(OL_4220_HMAC_FAILED, message),
    )
}
/// Builds a `404 Not Found` error response carrying the OL-4222 code and the
/// given message.
fn ol_4222_response(message: &str) -> Response {
    error_response(
        StatusCode::NOT_FOUND,
        &OlError::new(OL_4222_BINDING_NOT_CONFIGURED, message),
    )
}
/// Builds a `400 Bad Request` error response carrying the OL-4221 code and
/// the given message.
// No caller is visible in this file, hence the lint suppression.
#[allow(dead_code)]
fn ol_4221_response(message: &str) -> Response {
    error_response(
        StatusCode::BAD_REQUEST,
        &OlError::new(OL_4221_MALFORMED_BODY, message),
    )
}
fn error_response(status: StatusCode, err: &OlError) -> Response {
let body = serde_json::json!({
"error": {
"code": err.code.code,
"message": err.message,
"docs_url": err.code.docs_url,
}
});
(status, axum::Json(body)).into_response()
}
fn cached_response(cached: &CachedVerdict) -> Response {
let mut resp = (StatusCode::OK, cached.body.clone()).into_response();
apply_signed_headers(
resp.headers_mut(),
&cached.webhook_id,
cached.webhook_timestamp,
&cached.webhook_signature,
cached.processing_ms,
);
resp
}
/// Builds the `200 OK` response for a freshly delivered event: the tool's
/// body plus the signature headers from `signed` and the processing time.
fn success_response(body: &Bytes, signed: &webhook::SignedHeaders, processing_ms: u64) -> Response {
    let mut response = body.clone().into_response();
    *response.status_mut() = StatusCode::OK;

    let header_map = response.headers_mut();
    apply_signed_headers(
        header_map,
        &signed.webhook_id,
        signed.webhook_timestamp,
        &signed.webhook_signature,
        processing_ms,
    );
    response
}
/// Writes the signed-verdict headers onto `headers`.
///
/// Sets `webhook-id`, `webhook-timestamp`, `webhook-signature`,
/// `content-type: application/json` and
/// `X-OpenLatch-Provider-Processing-Ms`, in that order. A value that is not
/// a valid header value is skipped rather than reported.
fn apply_signed_headers(
    headers: &mut HeaderMap,
    webhook_id: &str,
    webhook_timestamp: i64,
    signature: &str,
    processing_ms: u64,
) {
    // Table of (header name, candidate value); order matches insertion order.
    let candidates = [
        ("webhook-id", HeaderValue::from_str(webhook_id)),
        (
            "webhook-timestamp",
            HeaderValue::from_str(&webhook_timestamp.to_string()),
        ),
        ("webhook-signature", HeaderValue::from_str(signature)),
        (
            "content-type",
            Ok(HeaderValue::from_static("application/json")),
        ),
        (
            "X-OpenLatch-Provider-Processing-Ms",
            HeaderValue::from_str(&processing_ms.to_string()),
        ),
    ];

    for (name, candidate) in candidates {
        if let Ok(value) = candidate {
            headers.insert(name, value);
        }
    }
}
/// Appends `record` to the audit sink when one is configured; otherwise the
/// record is dropped.
fn audit(ctx: &RuntimeContext, record: Record) {
    let Some(sink) = ctx.audit_tx.as_ref() else {
        return;
    };
    sink.append(record);
}
/// Assembles the HTTP router: `POST /v1/event` and `GET /v1/health`, both
/// behind a request-body limit of `MAX_INBOUND_BYTES`, sharing `ctx` as state.
pub fn build_router(ctx: Arc<RuntimeContext>) -> Router {
    let body_limit = RequestBodyLimitLayer::new(MAX_INBOUND_BYTES);

    let routes = Router::new()
        .route("/v1/event", post(handle_event))
        .route("/v1/health", get(handle_health));

    // The limit layer is applied after both routes are registered so it
    // covers each of them.
    let limited = routes.layer(body_limit);
    limited.with_state(ctx)
}
/// Binds a plain (non-TLS) TCP listener on `bind_addr` and serves the router
/// until `shutdown` resolves.
///
/// Returns the bound local address, but only after the server has stopped.
///
/// # Errors
///
/// Returns an `OlError` when binding, reading the local address, or serving
/// fails.
// NOTE(review): all three failure paths reuse the OL-4220 (HMAC failed) code,
// which looks like a placeholder — confirm whether a dedicated code exists.
pub async fn serve_plain(
    bind_addr: SocketAddr,
    ctx: Arc<RuntimeContext>,
    shutdown: impl std::future::Future<Output = ()> + Send + 'static,
) -> Result<SocketAddr, OlError> {
    let listener = match tokio::net::TcpListener::bind(bind_addr).await {
        Ok(bound) => bound,
        Err(e) => {
            return Err(OlError::new(
                OL_4220_HMAC_FAILED,
                format!("bind {bind_addr}: {e}"),
            ));
        }
    };
    let local = match listener.local_addr() {
        Ok(addr) => addr,
        Err(e) => {
            return Err(OlError::new(
                OL_4220_HMAC_FAILED,
                format!("local_addr: {e}"),
            ));
        }
    };

    let app = build_router(ctx);
    info!(addr = %local, "runtime listening (plain)");

    let outcome = axum::serve(listener, app)
        .with_graceful_shutdown(shutdown)
        .await;
    match outcome {
        Ok(()) => Ok(local),
        Err(e) => {
            error!(error = %e, "axum serve error");
            Err(OlError::new(OL_4220_HMAC_FAILED, format!("serve: {e}")))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each value of the `no_tls` flag maps to its expected tag.
    #[test]
    fn tls_mode_tag_branches() {
        let expectations = [(true, "proxy-fronted"), (false, "direct")];
        for &(no_tls, expected_tag) in &expectations {
            assert_eq!(tls_mode_tag(no_tls), expected_tag);
        }
    }
}