shaperail_runtime/events/
emitter.rs1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4use shaperail_core::{EventSubscriber, EventTarget, EventsConfig, ShaperailError};
5
6use crate::jobs::{JobPriority, JobQueue};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct EventPayload {
11 pub event: String,
13 pub resource: String,
15 pub action: String,
17 pub data: serde_json::Value,
19 pub timestamp: String,
21 pub event_id: String,
23}
24
25#[derive(Clone)]
29pub struct EventEmitter {
30 job_queue: JobQueue,
31 subscribers: Arc<Vec<EventSubscriber>>,
32}
33
34impl EventEmitter {
35 pub fn new(job_queue: JobQueue, config: Option<&EventsConfig>) -> Self {
37 let subscribers = config.map(|c| c.subscribers.clone()).unwrap_or_default();
38 Self {
39 job_queue,
40 subscribers: Arc::new(subscribers),
41 }
42 }
43
44 pub async fn emit(
50 &self,
51 event_name: &str,
52 resource: &str,
53 action: &str,
54 data: serde_json::Value,
55 ) -> Result<String, ShaperailError> {
56 let event_id = uuid::Uuid::new_v4().to_string();
57 let timestamp = chrono::Utc::now().to_rfc3339();
58
59 let payload = EventPayload {
60 event: event_name.to_string(),
61 resource: resource.to_string(),
62 action: action.to_string(),
63 data,
64 timestamp,
65 event_id: event_id.clone(),
66 };
67
68 let payload_json = serde_json::to_value(&payload)
69 .map_err(|e| ShaperailError::Internal(format!("Failed to serialize event: {e}")))?;
70
71 self.job_queue
73 .enqueue(
74 "shaperail:event_log",
75 payload_json.clone(),
76 JobPriority::Normal,
77 )
78 .await?;
79
80 let matching = self.find_matching_subscribers(event_name);
82 for subscriber in matching {
83 for target in &subscriber.targets {
84 self.dispatch_to_target(target, &payload_json).await?;
85 }
86 }
87
88 tracing::info!(
89 event = event_name,
90 event_id = %event_id,
91 resource = resource,
92 action = action,
93 "Event emitted"
94 );
95
96 Ok(event_id)
97 }
98
99 fn find_matching_subscribers(&self, event_name: &str) -> Vec<&EventSubscriber> {
101 self.subscribers
102 .iter()
103 .filter(|s| event_matches(&s.event, event_name))
104 .collect()
105 }
106
107 async fn dispatch_to_target(
109 &self,
110 target: &EventTarget,
111 payload: &serde_json::Value,
112 ) -> Result<(), ShaperailError> {
113 match target {
114 EventTarget::Job { name } => {
115 self.job_queue
116 .enqueue(name, payload.clone(), JobPriority::Normal)
117 .await?;
118 }
119 EventTarget::Webhook { url } => {
120 let webhook_payload = serde_json::json!({
121 "url": url,
122 "payload": payload,
123 });
124 self.job_queue
125 .enqueue(
126 "shaperail:webhook_deliver",
127 webhook_payload,
128 JobPriority::High,
129 )
130 .await?;
131 }
132 EventTarget::Channel { name, room } => {
133 let channel_payload = serde_json::json!({
134 "channel": name,
135 "room": room,
136 "payload": payload,
137 });
138 self.job_queue
139 .enqueue(
140 "shaperail:channel_broadcast",
141 channel_payload,
142 JobPriority::High,
143 )
144 .await?;
145 }
146 EventTarget::Hook { name } => {
147 let hook_payload = serde_json::json!({
148 "hook": name,
149 "payload": payload,
150 });
151 self.job_queue
152 .enqueue("shaperail:hook_execute", hook_payload, JobPriority::Normal)
153 .await?;
154 }
155 }
156 Ok(())
157 }
158}
159
160fn event_matches(pattern: &str, event_name: &str) -> bool {
168 if pattern == "*" {
169 return true;
170 }
171 if pattern == event_name {
172 return true;
173 }
174 if let Some(suffix) = pattern.strip_prefix("*.") {
175 return event_name.ends_with(&format!(".{suffix}"));
176 }
177 if let Some(prefix) = pattern.strip_suffix(".*") {
178 return event_name.starts_with(&format!("{prefix}."));
179 }
180 false
181}
182
183#[cfg(test)]
184mod tests {
185 use super::*;
186
187 #[test]
188 fn exact_match() {
189 assert!(event_matches("users.created", "users.created"));
190 assert!(!event_matches("users.created", "users.updated"));
191 }
192
193 #[test]
194 fn wildcard_all() {
195 assert!(event_matches("*", "users.created"));
196 assert!(event_matches("*", "orders.deleted"));
197 }
198
199 #[test]
200 fn wildcard_prefix() {
201 assert!(event_matches("*.created", "users.created"));
202 assert!(event_matches("*.created", "orders.created"));
203 assert!(!event_matches("*.created", "users.deleted"));
204 }
205
206 #[test]
207 fn wildcard_suffix() {
208 assert!(event_matches("users.*", "users.created"));
209 assert!(event_matches("users.*", "users.deleted"));
210 assert!(!event_matches("users.*", "orders.created"));
211 }
212
213 #[test]
214 fn no_partial_match() {
215 assert!(!event_matches("user", "users.created"));
216 assert!(!event_matches("users.create", "users.created"));
217 }
218
219 #[test]
220 fn event_payload_serde_roundtrip() {
221 let payload = EventPayload {
222 event: "users.created".to_string(),
223 resource: "users".to_string(),
224 action: "created".to_string(),
225 data: serde_json::json!({"id": "123", "name": "Alice"}),
226 timestamp: "2026-01-01T00:00:00Z".to_string(),
227 event_id: "evt-001".to_string(),
228 };
229 let json = serde_json::to_string(&payload).unwrap();
230 let back: EventPayload = serde_json::from_str(&json).unwrap();
231 assert_eq!(back.event, "users.created");
232 assert_eq!(back.event_id, "evt-001");
233 }
234}