Skip to main content

shaperail_runtime/events/
emitter.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4use shaperail_core::{EventSubscriber, EventTarget, EventsConfig, ShaperailError};
5
6use crate::jobs::{JobPriority, JobQueue};
7
8/// An emitted event payload.
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct EventPayload {
11    /// Event name (e.g., "users.created").
12    pub event: String,
13    /// Resource name.
14    pub resource: String,
15    /// Action that triggered the event.
16    pub action: String,
17    /// The record data associated with the event.
18    pub data: serde_json::Value,
19    /// Timestamp of emission.
20    pub timestamp: String,
21    /// Unique event ID.
22    pub event_id: String,
23}
24
25/// Non-blocking event emitter that dispatches events via the job queue.
26///
27/// Events never block the HTTP response — they are always enqueued as jobs.
28#[derive(Clone)]
29pub struct EventEmitter {
30    job_queue: JobQueue,
31    subscribers: Arc<Vec<EventSubscriber>>,
32}
33
34impl EventEmitter {
35    /// Creates a new event emitter.
36    pub fn new(job_queue: JobQueue, config: Option<&EventsConfig>) -> Self {
37        let subscribers = config.map(|c| c.subscribers.clone()).unwrap_or_default();
38        Self {
39            job_queue,
40            subscribers: Arc::new(subscribers),
41        }
42    }
43
44    /// Emits an event non-blockingly via the job queue.
45    ///
46    /// This enqueues processing jobs for:
47    /// 1. The event log (always)
48    /// 2. Each matching subscriber target
49    pub async fn emit(
50        &self,
51        event_name: &str,
52        resource: &str,
53        action: &str,
54        data: serde_json::Value,
55    ) -> Result<String, ShaperailError> {
56        let event_id = uuid::Uuid::new_v4().to_string();
57        let timestamp = chrono::Utc::now().to_rfc3339();
58
59        let payload = EventPayload {
60            event: event_name.to_string(),
61            resource: resource.to_string(),
62            action: action.to_string(),
63            data,
64            timestamp,
65            event_id: event_id.clone(),
66        };
67
68        let payload_json = serde_json::to_value(&payload)
69            .map_err(|e| ShaperailError::Internal(format!("Failed to serialize event: {e}")))?;
70
71        // Always log the event
72        self.job_queue
73            .enqueue(
74                "shaperail:event_log",
75                payload_json.clone(),
76                JobPriority::Normal,
77            )
78            .await?;
79
80        // Dispatch to matching subscribers
81        let matching = self.find_matching_subscribers(event_name);
82        for subscriber in matching {
83            for target in &subscriber.targets {
84                self.dispatch_to_target(target, &payload_json).await?;
85            }
86        }
87
88        tracing::info!(
89            event = event_name,
90            event_id = %event_id,
91            resource = resource,
92            action = action,
93            "Event emitted"
94        );
95
96        Ok(event_id)
97    }
98
99    /// Finds subscribers whose event pattern matches the given event name.
100    fn find_matching_subscribers(&self, event_name: &str) -> Vec<&EventSubscriber> {
101        self.subscribers
102            .iter()
103            .filter(|s| event_matches(&s.event, event_name))
104            .collect()
105    }
106
107    /// Dispatches an event payload to a single target via the job queue.
108    async fn dispatch_to_target(
109        &self,
110        target: &EventTarget,
111        payload: &serde_json::Value,
112    ) -> Result<(), ShaperailError> {
113        match target {
114            EventTarget::Job { name } => {
115                self.job_queue
116                    .enqueue(name, payload.clone(), JobPriority::Normal)
117                    .await?;
118            }
119            EventTarget::Webhook { url } => {
120                let webhook_payload = serde_json::json!({
121                    "url": url,
122                    "payload": payload,
123                });
124                self.job_queue
125                    .enqueue(
126                        "shaperail:webhook_deliver",
127                        webhook_payload,
128                        JobPriority::High,
129                    )
130                    .await?;
131            }
132            EventTarget::Channel { name, room } => {
133                let channel_payload = serde_json::json!({
134                    "channel": name,
135                    "room": room,
136                    "payload": payload,
137                });
138                self.job_queue
139                    .enqueue(
140                        "shaperail:channel_broadcast",
141                        channel_payload,
142                        JobPriority::High,
143                    )
144                    .await?;
145            }
146            EventTarget::Hook { name } => {
147                let hook_payload = serde_json::json!({
148                    "hook": name,
149                    "payload": payload,
150                });
151                self.job_queue
152                    .enqueue("shaperail:hook_execute", hook_payload, JobPriority::Normal)
153                    .await?;
154            }
155        }
156        Ok(())
157    }
158}
159
160/// Matches an event name against a pattern.
161///
162/// Supports:
163/// - Exact match: "users.created" matches "users.created"
164/// - Wildcard prefix: "*.created" matches "users.created"
165/// - Wildcard suffix: "users.*" matches "users.created"
166/// - Full wildcard: "*" matches everything
167fn event_matches(pattern: &str, event_name: &str) -> bool {
168    if pattern == "*" {
169        return true;
170    }
171    if pattern == event_name {
172        return true;
173    }
174    if let Some(suffix) = pattern.strip_prefix("*.") {
175        return event_name.ends_with(&format!(".{suffix}"));
176    }
177    if let Some(prefix) = pattern.strip_suffix(".*") {
178        return event_name.starts_with(&format!("{prefix}."));
179    }
180    false
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186
187    #[test]
188    fn exact_match() {
189        assert!(event_matches("users.created", "users.created"));
190        assert!(!event_matches("users.created", "users.updated"));
191    }
192
193    #[test]
194    fn wildcard_all() {
195        assert!(event_matches("*", "users.created"));
196        assert!(event_matches("*", "orders.deleted"));
197    }
198
199    #[test]
200    fn wildcard_prefix() {
201        assert!(event_matches("*.created", "users.created"));
202        assert!(event_matches("*.created", "orders.created"));
203        assert!(!event_matches("*.created", "users.deleted"));
204    }
205
206    #[test]
207    fn wildcard_suffix() {
208        assert!(event_matches("users.*", "users.created"));
209        assert!(event_matches("users.*", "users.deleted"));
210        assert!(!event_matches("users.*", "orders.created"));
211    }
212
213    #[test]
214    fn no_partial_match() {
215        assert!(!event_matches("user", "users.created"));
216        assert!(!event_matches("users.create", "users.created"));
217    }
218
219    #[test]
220    fn event_payload_serde_roundtrip() {
221        let payload = EventPayload {
222            event: "users.created".to_string(),
223            resource: "users".to_string(),
224            action: "created".to_string(),
225            data: serde_json::json!({"id": "123", "name": "Alice"}),
226            timestamp: "2026-01-01T00:00:00Z".to_string(),
227            event_id: "evt-001".to_string(),
228        };
229        let json = serde_json::to_string(&payload).unwrap();
230        let back: EventPayload = serde_json::from_str(&json).unwrap();
231        assert_eq!(back.event, "users.created");
232        assert_eq!(back.event_id, "evt-001");
233    }
234}