Skip to main content

architect_sdk/events/
mod.rs

1//! Decision-hub event publishing. Active only when DECISION_HUB_URL env var is set.
2//!
3//! After a successful CRUD operation the handler calls `spawn_events()` which
4//! evaluates configured triggers against the saved row and fires matching events
5//! to the decision-hub `/evaluate` endpoint inside a detached tokio task — the
6//! HTTP response is already on the wire before the publish begins.
7//!
8//! Event type format: `{package_id}.{table_name}:{event_name}`
9//! Example: `manufacturing_core.materials:published`
10
11use crate::config::resolved::ResolvedEntity;
12use crate::config::types::{EntityEventTrigger, EventCondition};
13use serde_json::Value;
14use std::sync::Arc;
15
16pub struct DecisionHubClient {
17    base_url: String,
18    client: reqwest::Client,
19}
20
21impl DecisionHubClient {
22    pub fn from_env() -> Option<Arc<Self>> {
23        let base_url = std::env::var("DECISION_HUB_URL").ok()?;
24        let timeout_secs: u64 = std::env::var("DECISION_HUB_TIMEOUT_SECS")
25            .ok()
26            .and_then(|s| s.parse().ok())
27            .unwrap_or(5);
28        let client = reqwest::Client::builder()
29            .timeout(std::time::Duration::from_secs(timeout_secs))
30            .build()
31            .ok()?;
32        tracing::info!(url = %base_url, "decision-hub event publishing enabled");
33        Some(Arc::new(Self { base_url, client }))
34    }
35
36    async fn publish(&self, tenant_id: &str, event_type: &str, context: Value) {
37        let payload = serde_json::json!({
38            "tenant_id": tenant_id,
39            "event_type": event_type,
40            "context": context,
41        });
42        let url = format!("{}/evaluate", self.base_url);
43        match self.client.post(&url).json(&payload).send().await {
44            Ok(resp) if !resp.status().is_success() => {
45                tracing::warn!(
46                    event_type = %event_type,
47                    status = %resp.status().as_u16(),
48                    "decision-hub rejected event"
49                );
50            }
51            Err(e) => {
52                tracing::warn!(event_type = %event_type, error = %e, "decision-hub publish failed");
53            }
54            Ok(_) => {
55                tracing::info!(event_type = %event_type, "decision-hub event accepted");
56            }
57        }
58    }
59}
60
61/// Returns true when the trigger's condition is satisfied.
62///
63/// `row` is the post-operation snake_case row (new state).
64/// `pre_update_row` is the row fetched from DB *before* the update — only supplied for the
65/// "update" lifecycle when the entity has `changed_to` conditions. When present, `changed_to`
66/// requires a genuine transition: the field must have been a different value before the update.
67fn evaluate_condition(
68    condition: &EventCondition,
69    row: &Value,
70    pre_update_row: Option<&Value>,
71) -> bool {
72    let new_val = row.get(&condition.field);
73    if let Some(target) = &condition.changed_to {
74        let now_matches = new_val == Some(target);
75        return match pre_update_row {
76            // With old state: require old ≠ target AND new == target (real transition).
77            Some(old_row) => now_matches && old_row.get(&condition.field) != Some(target),
78            // Without old state: fall back to checking the new value only.
79            None => now_matches,
80        };
81    }
82    if let Some(target) = &condition.equals {
83        return new_val == Some(target);
84    }
85    if let Some(not_null) = condition.not_null {
86        let is_not_null = matches!(new_val, Some(v) if !v.is_null());
87        return is_not_null == not_null;
88    }
89    true
90}
91
/// Maps a lifecycle verb to the event-name suffix used when a trigger sets no explicit name.
///
/// The four known verbs become their past-tense form; any other value is returned unchanged.
fn default_event_name(on: &str) -> &str {
    const PAST_TENSE: [(&str, &str); 4] = [
        ("create", "created"),
        ("update", "updated"),
        ("delete", "deleted"),
        ("archive", "archived"),
    ];

    PAST_TENSE
        .iter()
        .find(|entry| entry.0 == on)
        .map_or(on, |entry| entry.1)
}
101
102/// Check whether a trigger matches the current lifecycle + row state.
103fn trigger_matches(
104    trigger: &EntityEventTrigger,
105    lifecycle: &str,
106    raw_row: &Value,
107    archive_field: Option<&str>,
108    pre_update_row: Option<&Value>,
109) -> bool {
110    match trigger.on.as_str() {
111        on if on == lifecycle => {
112            if let Some(cond) = &trigger.condition {
113                evaluate_condition(cond, raw_row, pre_update_row)
114            } else {
115                true
116            }
117        }
118        // "archive" triggers fire during an update when archive_field transitions to non-null.
119        "archive" if lifecycle == "update" => archive_field
120            .and_then(|f| raw_row.get(f))
121            .map(|v| !v.is_null())
122            .unwrap_or(false),
123        _ => false,
124    }
125}
126
127/// Spawn a background task that publishes matching event triggers to decision-hub.
128///
129/// - `lifecycle`: `"create"` | `"update"` | `"delete"`
130/// - `raw_row`: snake_case row used for condition evaluation (post-operation state)
131/// - `api_row`: camelCase row sent as the event context (sensitive columns already stripped)
132/// - `pre_update_row`: snake_case row fetched from DB *before* the update; pass `Some` for the
133///   "update" lifecycle when `changed_to` conditions are present so transitions are detected
134///   accurately. `None` for create/delete or when no `changed_to` conditions exist.
135///
136/// Returns immediately; the HTTP publish happens after the response is sent.
137pub fn spawn_events(
138    client: Arc<DecisionHubClient>,
139    entity: &ResolvedEntity,
140    lifecycle: &'static str,
141    raw_row: Value,
142    api_row: Value,
143    tenant_id: String,
144    pre_update_row: Option<Value>,
145) {
146    if entity.events.is_empty() {
147        return;
148    }
149
150    let triggers: Vec<EntityEventTrigger> = entity
151        .events
152        .iter()
153        .filter(|t| {
154            trigger_matches(
155                t,
156                lifecycle,
157                &raw_row,
158                entity.archive_field.as_deref(),
159                pre_update_row.as_ref(),
160            )
161        })
162        .cloned()
163        .collect();
164
165    if triggers.is_empty() {
166        return;
167    }
168
169    let package_id = entity.package_id.clone();
170    let table_name = entity.table_name.clone();
171
172    tokio::spawn(async move {
173        for trigger in &triggers {
174            let suffix = trigger
175                .event_name
176                .as_deref()
177                .unwrap_or_else(|| default_event_name(trigger.on.as_str()));
178            let event_type = format!("{}.{}:{}", package_id, table_name, suffix);
179            tracing::info!(
180                tenant_id = %tenant_id,
181                event_type = %event_type,
182                lifecycle = %lifecycle,
183                "publishing decision-hub event"
184            );
185            let context = serde_json::json!({
186                "entity": api_row,
187                "operation": lifecycle,
188            });
189            client.publish(&tenant_id, &event_type, context).await;
190        }
191    });
192}