architect_sdk/events/
mod.rs1use crate::config::resolved::ResolvedEntity;
12use crate::config::types::{EntityEventTrigger, EventCondition};
13use serde_json::Value;
14use std::sync::Arc;
15
16pub struct DecisionHubClient {
17 base_url: String,
18 client: reqwest::Client,
19}
20
21impl DecisionHubClient {
22 pub fn from_env() -> Option<Arc<Self>> {
23 let base_url = std::env::var("DECISION_HUB_URL").ok()?;
24 let timeout_secs: u64 = std::env::var("DECISION_HUB_TIMEOUT_SECS")
25 .ok()
26 .and_then(|s| s.parse().ok())
27 .unwrap_or(5);
28 let client = reqwest::Client::builder()
29 .timeout(std::time::Duration::from_secs(timeout_secs))
30 .build()
31 .ok()?;
32 tracing::info!(url = %base_url, "decision-hub event publishing enabled");
33 Some(Arc::new(Self { base_url, client }))
34 }
35
36 async fn publish(&self, tenant_id: &str, event_type: &str, context: Value) {
37 let payload = serde_json::json!({
38 "tenant_id": tenant_id,
39 "event_type": event_type,
40 "context": context,
41 });
42 let url = format!("{}/evaluate", self.base_url);
43 match self.client.post(&url).json(&payload).send().await {
44 Ok(resp) if !resp.status().is_success() => {
45 tracing::warn!(
46 event_type = %event_type,
47 status = %resp.status().as_u16(),
48 "decision-hub rejected event"
49 );
50 }
51 Err(e) => {
52 tracing::warn!(event_type = %event_type, error = %e, "decision-hub publish failed");
53 }
54 Ok(_) => {
55 tracing::info!(event_type = %event_type, "decision-hub event accepted");
56 }
57 }
58 }
59}
60
61fn evaluate_condition(
68 condition: &EventCondition,
69 row: &Value,
70 pre_update_row: Option<&Value>,
71) -> bool {
72 let new_val = row.get(&condition.field);
73 if let Some(target) = &condition.changed_to {
74 let now_matches = new_val == Some(target);
75 return match pre_update_row {
76 Some(old_row) => now_matches && old_row.get(&condition.field) != Some(target),
78 None => now_matches,
80 };
81 }
82 if let Some(target) = &condition.equals {
83 return new_val == Some(target);
84 }
85 if let Some(not_null) = condition.not_null {
86 let is_not_null = matches!(new_val, Some(v) if !v.is_null());
87 return is_not_null == not_null;
88 }
89 true
90}
91
92fn default_event_name(on: &str) -> &str {
93 match on {
94 "create" => "created",
95 "update" => "updated",
96 "delete" => "deleted",
97 "archive" => "archived",
98 other => other,
99 }
100}
101
102fn trigger_matches(
104 trigger: &EntityEventTrigger,
105 lifecycle: &str,
106 raw_row: &Value,
107 archive_field: Option<&str>,
108 pre_update_row: Option<&Value>,
109) -> bool {
110 match trigger.on.as_str() {
111 on if on == lifecycle => {
112 if let Some(cond) = &trigger.condition {
113 evaluate_condition(cond, raw_row, pre_update_row)
114 } else {
115 true
116 }
117 }
118 "archive" if lifecycle == "update" => archive_field
120 .and_then(|f| raw_row.get(f))
121 .map(|v| !v.is_null())
122 .unwrap_or(false),
123 _ => false,
124 }
125}
126
127pub fn spawn_events(
138 client: Arc<DecisionHubClient>,
139 entity: &ResolvedEntity,
140 lifecycle: &'static str,
141 raw_row: Value,
142 api_row: Value,
143 tenant_id: String,
144 pre_update_row: Option<Value>,
145) {
146 if entity.events.is_empty() {
147 return;
148 }
149
150 let triggers: Vec<EntityEventTrigger> = entity
151 .events
152 .iter()
153 .filter(|t| {
154 trigger_matches(
155 t,
156 lifecycle,
157 &raw_row,
158 entity.archive_field.as_deref(),
159 pre_update_row.as_ref(),
160 )
161 })
162 .cloned()
163 .collect();
164
165 if triggers.is_empty() {
166 return;
167 }
168
169 let package_id = entity.package_id.clone();
170 let table_name = entity.table_name.clone();
171
172 tokio::spawn(async move {
173 for trigger in &triggers {
174 let suffix = trigger
175 .event_name
176 .as_deref()
177 .unwrap_or_else(|| default_event_name(trigger.on.as_str()));
178 let event_type = format!("{}.{}:{}", package_id, table_name, suffix);
179 tracing::info!(
180 tenant_id = %tenant_id,
181 event_type = %event_type,
182 lifecycle = %lifecycle,
183 "publishing decision-hub event"
184 );
185 let context = serde_json::json!({
186 "entity": api_row,
187 "operation": lifecycle,
188 });
189 client.publish(&tenant_id, &event_type, context).await;
190 }
191 });
192}