use std::sync::atomic::Ordering;
use std::sync::Arc;
use axum::{
body::Bytes,
extract::State,
http::{header::CONTENT_TYPE, HeaderMap, StatusCode},
Json,
};
use crate::daemon::AppState;
use crate::envelope::{
new_event_id, AgentType, EventEnvelope, HookEventType, Verdict, VerdictResponse,
};
use crate::privacy;
const CT_CLOUDEVENTS_SINGLE: &str = "application/cloudevents+json";
const CT_CLOUDEVENTS_BATCH: &str = "application/cloudevents-batch+json";
const CT_JSON: &str = "application/json";
/// HTTP handler that ingests a single CloudEvent or a CloudEvents batch.
///
/// Content negotiation: `application/cloudevents+json` and `application/json`
/// are handled as one event; `application/cloudevents-batch+json` as a
/// non-empty JSON array of events. Any other Content-Type yields 415.
///
/// Every envelope is processed independently. The response verdict reflects
/// the first `Stop` event seen (approve) or, absent one, the first event
/// (allow). If any envelope was suppressed as a duplicate, the
/// `x-openlatch-dedup: true` response header is set.
pub async fn ingest_cloudevent(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    body: Bytes,
) -> (StatusCode, HeaderMap, Json<VerdictResponse>) {
    let started = std::time::Instant::now();
    let content_type = headers
        .get(CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .unwrap_or("");
    let batch_mode = content_type.starts_with(CT_CLOUDEVENTS_BATCH);
    let single_mode =
        content_type.starts_with(CT_CLOUDEVENTS_SINGLE) || content_type.starts_with(CT_JSON);
    if !batch_mode && !single_mode {
        return reject(
            StatusCode::UNSUPPORTED_MEDIA_TYPE,
            "unsupported Content-Type; expected application/cloudevents+json or application/cloudevents-batch+json",
            started,
        );
    }
    // Decode the payload into a uniform list of envelopes.
    let envelopes: Vec<EventEnvelope> = if batch_mode {
        match serde_json::from_slice::<Vec<EventEnvelope>>(&body) {
            Ok(batch) if !batch.is_empty() => batch,
            Ok(_) => {
                return reject(
                    StatusCode::BAD_REQUEST,
                    "cloudevents-batch body must not be empty",
                    started,
                );
            }
            Err(e) => {
                return reject(
                    StatusCode::BAD_REQUEST,
                    &format!("invalid cloudevents batch body: {e}"),
                    started,
                );
            }
        }
    } else {
        match serde_json::from_slice::<EventEnvelope>(&body) {
            Ok(envelope) => vec![envelope],
            Err(e) => {
                return reject(
                    StatusCode::BAD_REQUEST,
                    &format!("invalid cloudevent body: {e}"),
                    started,
                );
            }
        }
    };
    // Track the event whose verdict wins: the first Stop, else the first event.
    let mut winning: Option<(HookEventType, String)> = None;
    let mut response_headers = HeaderMap::new();
    for envelope in envelopes {
        let (ev_type, ev_id, dedup_hit) =
            process_envelope(state.clone(), envelope, started).await;
        if dedup_hit {
            response_headers.insert(
                "x-openlatch-dedup",
                "true".parse().expect("static header value is valid"),
            );
        }
        let should_replace = match &winning {
            None => true,
            Some((HookEventType::Stop, _)) => false,
            Some(_) => matches!(ev_type, HookEventType::Stop),
        };
        if should_replace {
            winning = Some((ev_type, ev_id));
        }
    }
    // Unreachable in practice (an empty batch is rejected above), but keep a
    // safe fallback rather than a panic path.
    let (verdict_ev_type, verdict_ev_id) =
        winning.unwrap_or_else(|| (HookEventType::Unknown(String::new()), new_event_id()));
    let latency_ms = started.elapsed().as_secs_f64() * 1000.0;
    let response = if matches!(verdict_ev_type, HookEventType::Stop) {
        VerdictResponse::approve(verdict_ev_id, latency_ms)
    } else {
        VerdictResponse::allow(verdict_ev_id, latency_ms)
    };
    (StatusCode::OK, response_headers, Json(response))
}
/// Deduplicates, scrubs, enriches, logs, and (optionally) cloud-forwards one
/// event envelope.
///
/// Returns `(event_type, event_id, dedup_hit)`. When `dedup_hit` is `true`
/// the event was an exact duplicate within the dedup TTL window and nothing
/// was logged or forwarded.
///
/// Fix: the third parameter was named `_start`, signalling "unused" by Rust
/// convention, yet it is read for the latency snapshot below — renamed to
/// `start` so the name matches its actual use.
async fn process_envelope(
    state: Arc<AppState>,
    mut envelope: EventEnvelope,
    start: std::time::Instant,
) -> (HookEventType, String, bool) {
    let event_type = envelope.type_.clone();
    let event_id = envelope.id.clone();
    let subject = envelope.subject.clone().unwrap_or_else(|| "unknown".into());
    let type_str = envelope.type_.as_str().to_string();
    // Dedup key is (subject, type, data); a missing payload dedups as Null.
    let data_for_dedup = envelope.data.clone().unwrap_or(serde_json::Value::Null);
    let is_duplicate = state
        .dedup
        .check_and_insert(&subject, &type_str, &data_for_dedup);
    if is_duplicate {
        tracing::debug!(
            code = crate::error::ERR_EVENT_DEDUPED,
            session_id = %subject,
            event_type = %type_str,
            "Event deduplicated within TTL window"
        );
        return (event_type, event_id, true);
    }
    // Telemetry for unrecognized sources/types so new agents get noticed.
    if matches!(envelope.source, AgentType::Unknown(_)) {
        crate::telemetry::capture_hook_source_unknown(envelope.source.as_str());
    }
    if matches!(envelope.type_, HookEventType::Unknown(_)) {
        crate::telemetry::capture_hook_type_unknown(envelope.type_.as_str());
    }
    // Scrub sensitive fields from the payload before anything is persisted.
    if let Some(data) = envelope.data.as_mut() {
        privacy::filter_event_with(data, &state.privacy_filter);
    }
    // Enrich with host metadata, filling only fields the sender left empty;
    // clientversion is always stamped with this daemon's own version.
    if envelope.os.is_none() {
        envelope.os = Some(crate::envelope::os_string().to_string());
    }
    if envelope.arch.is_none() {
        envelope.arch = Some(crate::envelope::arch_string().to_string());
    }
    envelope.clientversion = Some(env!("CARGO_PKG_VERSION").to_string());
    if envelope.datacontenttype.is_none() {
        envelope.datacontenttype = Some("application/json".to_string());
    }
    if envelope.localipv4.is_none() {
        envelope.localipv4 = state.local_ipv4;
    }
    if envelope.localipv6.is_none() {
        envelope.localipv6 = state.local_ipv6;
    }
    if envelope.publicipv4.is_none() {
        envelope.publicipv4 = state.public_ipv4;
    }
    if envelope.publicipv6.is_none() {
        envelope.publicipv6 = state.public_ipv6;
    }
    // Build the local log record: the enriched envelope plus verdict/latency
    // annotations (only the local log gets these; the cloud copy does not).
    let mut envelope_value = serde_json::to_value(&envelope).unwrap_or(serde_json::Value::Null);
    let verdict_for_stored = match event_type {
        HookEventType::Stop => Verdict::Approve,
        _ => Verdict::Allow,
    };
    let latency_snapshot_ms = start.elapsed().as_millis() as u64;
    if let Some(obj) = envelope_value.as_object_mut() {
        obj.insert(
            "olverdict".to_string(),
            serde_json::json!(match verdict_for_stored {
                Verdict::Allow => "allow",
                Verdict::Approve => "approve",
                Verdict::Deny => "deny",
            }),
        );
        obj.insert(
            "ollatencyms".to_string(),
            serde_json::json!(latency_snapshot_ms),
        );
    }
    let envelope_json = serde_json::to_string(&envelope_value).unwrap_or_default();
    state.event_logger.log(envelope_json);
    state.event_counter.fetch_add(1, Ordering::Relaxed);
    // Best-effort cloud forwarding: never blocks the request path, and drops
    // (with a warning) when the channel is full or the worker has exited.
    if let Some(tx) = &state.cloud_tx {
        let agent_id = state.config.agent_id.as_deref().unwrap_or_default();
        if agent_id.is_empty() {
            tracing::warn!(
                code = "OL-1200",
                "cloud forward skipped: [daemon].agent_id is empty in config.toml — run 'openlatch init' to populate it"
            );
        } else {
            let cloud_event = crate::cloud::CloudEvent {
                envelope: serde_json::to_value(&envelope).unwrap_or_default(),
                agent_id: agent_id.to_string(),
            };
            match tx.try_send(cloud_event) {
                Ok(()) => {}
                Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
                    tracing::warn!(code = "OL-1200", "cloud channel full - event dropped");
                }
                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                    tracing::warn!("cloud channel closed - worker has exited");
                }
            }
        }
    }
    (event_type, event_id, false)
}
/// Logs a rejected ingest request and builds its error response.
///
/// Note the verdict is still `allow` (fail-open): a malformed request must
/// not block the agent, only be reported via the HTTP status.
fn reject(
    status: StatusCode,
    message: &str,
    start: std::time::Instant,
) -> (StatusCode, HeaderMap, Json<VerdictResponse>) {
    tracing::warn!(%message, http_status = %status, "cloudevents ingest rejected");
    let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
    (
        status,
        HeaderMap::new(),
        Json(VerdictResponse::allow(new_event_id(), elapsed_ms)),
    )
}
/// Liveness endpoint: reports daemon status, build version, and uptime.
pub async fn health(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
    Json(serde_json::json!({
        "status": "ok",
        "version": env!("CARGO_PKG_VERSION"),
        "uptime_secs": state.started_at.elapsed().as_secs(),
    }))
}
/// Metrics endpoint: event counters, uptime, update availability, and the
/// cloud-forwarding status snapshot.
pub async fn metrics(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
    let processed = state.event_counter.load(Ordering::Relaxed);
    let uptime = state.started_at.elapsed().as_secs();
    let update_available = state.get_available_update();
    // Defaults cover the case where cloud forwarding is not configured.
    let mut cloud_status = "not_configured";
    let mut forwarded = 0u64;
    let mut last_sync = 0u64;
    let mut drops = 0u64;
    let mut api_url = "";
    if let Some(ref cloud) = state.cloud_state {
        forwarded = cloud.forwarded_count();
        last_sync = cloud.last_sync_secs();
        drops = cloud.drop_count();
        // Status priority: auth failure > missing credential > recent drops
        // > healthy.
        cloud_status = if cloud.is_auth_error() {
            "auth_error"
        } else if cloud.is_no_credential() {
            "no_credential"
        } else if cloud.consecutive_drops() > 0 {
            "network_error"
        } else {
            "connected"
        };
        api_url = state.config.cloud.api_url.as_str();
    }
    Json(serde_json::json!({
        "events_processed": processed,
        "uptime_secs": uptime,
        "update_available": update_available,
        "cloud_status": cloud_status,
        "cloud_forwarded_count": forwarded,
        "cloud_last_sync_secs": last_sync,
        "cloud_drop_count": drops,
        "cloud_api_url": api_url,
    }))
}
/// Triggers daemon shutdown by firing the one-shot shutdown sender.
///
/// Returns 200 on the first successful trigger; 410 (Gone) if the sender was
/// already consumed by an earlier shutdown request.
pub async fn shutdown_handler(State(state): State<Arc<AppState>>) -> StatusCode {
    match state.shutdown_tx.lock().await.take() {
        Some(sender) => {
            // The receiver may already be gone; a send failure is harmless.
            let _ = sender.send(());
            StatusCode::OK
        }
        None => StatusCode::GONE,
    }
}