use crate::config::resolved::ResolvedEntity;
use crate::config::types::{EntityEventTrigger, EventCondition};
use serde_json::Value;
use std::sync::Arc;
pub struct DecisionHubClient {
base_url: String,
client: reqwest::Client,
}
impl DecisionHubClient {
pub fn from_env() -> Option<Arc<Self>> {
let base_url = std::env::var("DECISION_HUB_URL").ok()?;
let timeout_secs: u64 = std::env::var("DECISION_HUB_TIMEOUT_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(5);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(timeout_secs))
.build()
.ok()?;
tracing::info!(url = %base_url, "decision-hub event publishing enabled");
Some(Arc::new(Self { base_url, client }))
}
async fn publish(&self, tenant_id: &str, event_type: &str, context: Value) {
let payload = serde_json::json!({
"tenant_id": tenant_id,
"event_type": event_type,
"context": context,
});
let url = format!("{}/evaluate", self.base_url);
match self.client.post(&url).json(&payload).send().await {
Ok(resp) if !resp.status().is_success() => {
tracing::warn!(
event_type = %event_type,
status = %resp.status().as_u16(),
"decision-hub rejected event"
);
}
Err(e) => {
tracing::warn!(event_type = %event_type, error = %e, "decision-hub publish failed");
}
Ok(_) => {
tracing::info!(event_type = %event_type, "decision-hub event accepted");
}
}
}
}
fn evaluate_condition(
condition: &EventCondition,
row: &Value,
pre_update_row: Option<&Value>,
) -> bool {
let new_val = row.get(&condition.field);
if let Some(target) = &condition.changed_to {
let now_matches = new_val == Some(target);
return match pre_update_row {
Some(old_row) => now_matches && old_row.get(&condition.field) != Some(target),
None => now_matches,
};
}
if let Some(target) = &condition.equals {
return new_val == Some(target);
}
if let Some(not_null) = condition.not_null {
let is_not_null = matches!(new_val, Some(v) if !v.is_null());
return is_not_null == not_null;
}
true
}
fn default_event_name(on: &str) -> &str {
match on {
"create" => "created",
"update" => "updated",
"delete" => "deleted",
"archive" => "archived",
other => other,
}
}
fn trigger_matches(
trigger: &EntityEventTrigger,
lifecycle: &str,
raw_row: &Value,
archive_field: Option<&str>,
pre_update_row: Option<&Value>,
) -> bool {
match trigger.on.as_str() {
on if on == lifecycle => {
if let Some(cond) = &trigger.condition {
evaluate_condition(cond, raw_row, pre_update_row)
} else {
true
}
}
"archive" if lifecycle == "update" => archive_field
.and_then(|f| raw_row.get(f))
.map(|v| !v.is_null())
.unwrap_or(false),
_ => false,
}
}
pub fn spawn_events(
client: Arc<DecisionHubClient>,
entity: &ResolvedEntity,
lifecycle: &'static str,
raw_row: Value,
api_row: Value,
tenant_id: String,
pre_update_row: Option<Value>,
) {
if entity.events.is_empty() {
return;
}
let triggers: Vec<EntityEventTrigger> = entity
.events
.iter()
.filter(|t| {
trigger_matches(
t,
lifecycle,
&raw_row,
entity.archive_field.as_deref(),
pre_update_row.as_ref(),
)
})
.cloned()
.collect();
if triggers.is_empty() {
return;
}
let package_id = entity.package_id.clone();
let table_name = entity.table_name.clone();
tokio::spawn(async move {
for trigger in &triggers {
let suffix = trigger
.event_name
.as_deref()
.unwrap_or_else(|| default_event_name(trigger.on.as_str()));
let event_type = format!("{}.{}:{}", package_id, table_name, suffix);
tracing::info!(
tenant_id = %tenant_id,
event_type = %event_type,
lifecycle = %lifecycle,
"publishing decision-hub event"
);
let context = serde_json::json!({
"entity": api_row,
"operation": lifecycle,
});
client.publish(&tenant_id, &event_type, context).await;
}
});
}