Skip to main content

this/server/exposure/rest/
sse.rs

1//! Server-Sent Events (SSE) endpoint for real-time event streaming
2//!
3//! Provides a `GET /events/stream` endpoint that streams events from
4//! the EventBus as SSE (text/event-stream). Supports filtering by
5//! query parameters and sends heartbeat comments every 30 seconds.
6//!
7//! # Query parameters
8//!
9//! - `kind` — Filter by event kind: "entity" or "link"
10//! - `entity_type` — Filter by entity type (e.g., "user", "order") or link type (e.g., "follows")
11//! - `event_type` — Filter by action: "created", "updated", "deleted"
12//!
13//! All filters are optional. When absent, all events are streamed.
14//!
15//! # Example
16//!
17//! ```text
18//! GET /events/stream?kind=entity&entity_type=user
19//!
20//! data: {"kind":"entity","action":"created","entity_type":"user","entity_id":"...","data":{...},"timestamp":"..."}
21//!
22//! : heartbeat
23//!
24//! data: {"kind":"entity","action":"updated","entity_type":"user","entity_id":"...","data":{...},"timestamp":"..."}
25//! ```
26
27use crate::core::events::{EntityEvent, EventBus, EventEnvelope, FrameworkEvent, LinkEvent};
28use axum::extract::{Query, State};
29use axum::response::sse::{Event, KeepAlive, Sse};
30use futures::StreamExt;
31use futures::stream::Stream;
32use serde::Deserialize;
33use serde_json::json;
34use std::convert::Infallible;
35use std::sync::Arc;
36use std::time::Duration;
37use tokio_stream::wrappers::BroadcastStream;
38
39/// Query parameters for SSE event filtering
40#[derive(Debug, Deserialize, Default)]
41pub struct SseFilter {
42    /// Filter by event kind: "entity" or "link"
43    pub kind: Option<String>,
44
45    /// Filter by entity type (e.g., "user", "order") or link type (e.g., "follows")
46    pub entity_type: Option<String>,
47
48    /// Filter by action: "created", "updated", "deleted"
49    pub event_type: Option<String>,
50}
51
52/// SSE event stream handler
53///
54/// Subscribes to the EventBus and streams matching events as SSE.
55/// Sends heartbeat comments every 30 seconds to keep the connection alive.
56pub async fn sse_handler(
57    State(event_bus): State<Arc<EventBus>>,
58    Query(filter): Query<SseFilter>,
59) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
60    let rx = event_bus.subscribe();
61
62    let stream = BroadcastStream::new(rx).filter_map(move |result| {
63        let item = match result {
64            Ok(envelope) => {
65                if matches_filter(&envelope, &filter) {
66                    envelope_to_sse_event(&envelope).map(Ok)
67                } else {
68                    None
69                }
70            }
71            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
72                tracing::warn!(missed = n, "SSE client lagged, missed events");
73                let warning = Event::default()
74                    .event("warning")
75                    .data(format!("missed {} events due to slow consumption", n));
76                Some(Ok(warning))
77            }
78        };
79        std::future::ready(item)
80    });
81
82    Sse::new(stream).keep_alive(
83        KeepAlive::new()
84            .interval(Duration::from_secs(30))
85            .text("heartbeat"),
86    )
87}
88
89/// Check if an event envelope matches the SSE filter
90fn matches_filter(envelope: &EventEnvelope, filter: &SseFilter) -> bool {
91    // Filter by kind (entity / link)
92    if let Some(ref kind) = filter.kind
93        && envelope.event.event_kind() != kind
94    {
95        return false;
96    }
97
98    // Filter by entity_type (matches entity_type for entities, link_type for links)
99    if let Some(ref entity_type) = filter.entity_type {
100        let matches = match &envelope.event {
101            FrameworkEvent::Entity(e) => match e {
102                EntityEvent::Created {
103                    entity_type: et, ..
104                }
105                | EntityEvent::Updated {
106                    entity_type: et, ..
107                }
108                | EntityEvent::Deleted {
109                    entity_type: et, ..
110                } => et == entity_type,
111            },
112            FrameworkEvent::Link(l) => match l {
113                LinkEvent::Created { link_type: lt, .. }
114                | LinkEvent::Deleted { link_type: lt, .. } => lt == entity_type,
115            },
116        };
117        if !matches {
118            return false;
119        }
120    }
121
122    // Filter by event_type (created, updated, deleted)
123    if let Some(ref event_type) = filter.event_type
124        && envelope.event.action() != event_type
125    {
126        return false;
127    }
128
129    true
130}
131
132/// Convert an EventEnvelope to an SSE Event
133fn envelope_to_sse_event(envelope: &EventEnvelope) -> Option<Event> {
134    let data = match &envelope.event {
135        FrameworkEvent::Entity(e) => match e {
136            EntityEvent::Created {
137                entity_type,
138                entity_id,
139                data,
140            }
141            | EntityEvent::Updated {
142                entity_type,
143                entity_id,
144                data,
145            } => json!({
146                "kind": "entity",
147                "action": envelope.event.action(),
148                "entity_type": entity_type,
149                "entity_id": entity_id,
150                "data": data,
151                "timestamp": envelope.timestamp.to_rfc3339(),
152            }),
153            EntityEvent::Deleted {
154                entity_type,
155                entity_id,
156            } => json!({
157                "kind": "entity",
158                "action": "deleted",
159                "entity_type": entity_type,
160                "entity_id": entity_id,
161                "timestamp": envelope.timestamp.to_rfc3339(),
162            }),
163        },
164        FrameworkEvent::Link(l) => match l {
165            LinkEvent::Created {
166                link_type,
167                link_id,
168                source_id,
169                target_id,
170                metadata,
171            } => json!({
172                "kind": "link",
173                "action": "created",
174                "link_type": link_type,
175                "link_id": link_id,
176                "source_id": source_id,
177                "target_id": target_id,
178                "metadata": metadata,
179                "timestamp": envelope.timestamp.to_rfc3339(),
180            }),
181            LinkEvent::Deleted {
182                link_type,
183                link_id,
184                source_id,
185                target_id,
186            } => json!({
187                "kind": "link",
188                "action": "deleted",
189                "link_type": link_type,
190                "link_id": link_id,
191                "source_id": source_id,
192                "target_id": target_id,
193                "timestamp": envelope.timestamp.to_rfc3339(),
194            }),
195        },
196    };
197
198    let json_str = serde_json::to_string(&data).ok()?;
199    Some(Event::default().data(json_str))
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use uuid::Uuid;
206
207    fn make_entity_envelope(entity_type: &str, action: &str) -> EventEnvelope {
208        let event = match action {
209            "created" => FrameworkEvent::Entity(EntityEvent::Created {
210                entity_type: entity_type.to_string(),
211                entity_id: Uuid::new_v4(),
212                data: json!({"name": "test"}),
213            }),
214            "updated" => FrameworkEvent::Entity(EntityEvent::Updated {
215                entity_type: entity_type.to_string(),
216                entity_id: Uuid::new_v4(),
217                data: json!({"name": "updated"}),
218            }),
219            "deleted" => FrameworkEvent::Entity(EntityEvent::Deleted {
220                entity_type: entity_type.to_string(),
221                entity_id: Uuid::new_v4(),
222            }),
223            _ => unreachable!(),
224        };
225
226        EventEnvelope::new(event)
227    }
228
229    fn make_link_envelope(link_type: &str, action: &str) -> EventEnvelope {
230        let event = match action {
231            "created" => FrameworkEvent::Link(LinkEvent::Created {
232                link_type: link_type.to_string(),
233                link_id: Uuid::new_v4(),
234                source_id: Uuid::new_v4(),
235                target_id: Uuid::new_v4(),
236                metadata: None,
237            }),
238            "deleted" => FrameworkEvent::Link(LinkEvent::Deleted {
239                link_type: link_type.to_string(),
240                link_id: Uuid::new_v4(),
241                source_id: Uuid::new_v4(),
242                target_id: Uuid::new_v4(),
243            }),
244            _ => unreachable!(),
245        };
246
247        EventEnvelope::new(event)
248    }
249
250    #[test]
251    fn test_matches_filter_no_filter() {
252        let envelope = make_entity_envelope("user", "created");
253        let filter = SseFilter::default();
254        assert!(matches_filter(&envelope, &filter));
255    }
256
257    #[test]
258    fn test_matches_filter_kind_entity() {
259        let envelope = make_entity_envelope("user", "created");
260        let filter = SseFilter {
261            kind: Some("entity".to_string()),
262            ..Default::default()
263        };
264        assert!(matches_filter(&envelope, &filter));
265
266        let filter = SseFilter {
267            kind: Some("link".to_string()),
268            ..Default::default()
269        };
270        assert!(!matches_filter(&envelope, &filter));
271    }
272
273    #[test]
274    fn test_matches_filter_kind_link() {
275        let envelope = make_link_envelope("follows", "created");
276        let filter = SseFilter {
277            kind: Some("link".to_string()),
278            ..Default::default()
279        };
280        assert!(matches_filter(&envelope, &filter));
281
282        let filter = SseFilter {
283            kind: Some("entity".to_string()),
284            ..Default::default()
285        };
286        assert!(!matches_filter(&envelope, &filter));
287    }
288
289    #[test]
290    fn test_matches_filter_entity_type() {
291        let envelope = make_entity_envelope("user", "created");
292        let filter = SseFilter {
293            entity_type: Some("user".to_string()),
294            ..Default::default()
295        };
296        assert!(matches_filter(&envelope, &filter));
297
298        let filter = SseFilter {
299            entity_type: Some("order".to_string()),
300            ..Default::default()
301        };
302        assert!(!matches_filter(&envelope, &filter));
303    }
304
305    #[test]
306    fn test_matches_filter_event_type() {
307        let envelope = make_entity_envelope("user", "created");
308        let filter = SseFilter {
309            event_type: Some("created".to_string()),
310            ..Default::default()
311        };
312        assert!(matches_filter(&envelope, &filter));
313
314        let filter = SseFilter {
315            event_type: Some("deleted".to_string()),
316            ..Default::default()
317        };
318        assert!(!matches_filter(&envelope, &filter));
319    }
320
321    #[test]
322    fn test_matches_filter_combined() {
323        let envelope = make_entity_envelope("user", "created");
324        let filter = SseFilter {
325            kind: Some("entity".to_string()),
326            entity_type: Some("user".to_string()),
327            event_type: Some("created".to_string()),
328        };
329        assert!(matches_filter(&envelope, &filter));
330
331        // Wrong event_type
332        let filter = SseFilter {
333            kind: Some("entity".to_string()),
334            entity_type: Some("user".to_string()),
335            event_type: Some("deleted".to_string()),
336        };
337        assert!(!matches_filter(&envelope, &filter));
338    }
339
340    #[test]
341    fn test_envelope_to_sse_event_entity_created() {
342        let envelope = make_entity_envelope("user", "created");
343        let event = envelope_to_sse_event(&envelope);
344        assert!(event.is_some());
345    }
346
347    #[test]
348    fn test_envelope_to_sse_event_entity_deleted() {
349        let envelope = make_entity_envelope("user", "deleted");
350        let event = envelope_to_sse_event(&envelope);
351        assert!(event.is_some());
352    }
353
354    #[test]
355    fn test_envelope_to_sse_event_link_created() {
356        let envelope = make_link_envelope("follows", "created");
357        let event = envelope_to_sse_event(&envelope);
358        assert!(event.is_some());
359    }
360
361    #[test]
362    fn test_envelope_to_sse_event_link_deleted() {
363        let envelope = make_link_envelope("follows", "deleted");
364        let event = envelope_to_sse_event(&envelope);
365        assert!(event.is_some());
366    }
367
368    #[test]
369    fn test_link_filter_by_link_type() {
370        let envelope = make_link_envelope("follows", "created");
371
372        // Matches link_type
373        let filter = SseFilter {
374            entity_type: Some("follows".to_string()),
375            ..Default::default()
376        };
377        assert!(matches_filter(&envelope, &filter));
378
379        // Doesn't match different type
380        let filter = SseFilter {
381            entity_type: Some("owns".to_string()),
382            ..Default::default()
383        };
384        assert!(!matches_filter(&envelope, &filter));
385    }
386
387    #[test]
388    fn test_link_event_type_filter() {
389        let created = make_link_envelope("follows", "created");
390        let deleted = make_link_envelope("follows", "deleted");
391
392        let filter = SseFilter {
393            event_type: Some("created".to_string()),
394            ..Default::default()
395        };
396        assert!(matches_filter(&created, &filter));
397        assert!(!matches_filter(&deleted, &filter));
398    }
399}