this/server/exposure/rest/
sse.rs1use crate::core::events::{EntityEvent, EventBus, EventEnvelope, FrameworkEvent, LinkEvent};
28use axum::extract::{Query, State};
29use axum::response::sse::{Event, KeepAlive, Sse};
30use futures::StreamExt;
31use futures::stream::Stream;
32use serde::Deserialize;
33use serde_json::json;
34use std::convert::Infallible;
35use std::sync::Arc;
36use std::time::Duration;
37use tokio_stream::wrappers::BroadcastStream;
38
39#[derive(Debug, Deserialize, Default)]
41pub struct SseFilter {
42 pub kind: Option<String>,
44
45 pub entity_type: Option<String>,
47
48 pub event_type: Option<String>,
50}
51
52pub async fn sse_handler(
57 State(event_bus): State<Arc<EventBus>>,
58 Query(filter): Query<SseFilter>,
59) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
60 let rx = event_bus.subscribe();
61
62 let stream = BroadcastStream::new(rx).filter_map(move |result| {
63 let item = match result {
64 Ok(envelope) => {
65 if matches_filter(&envelope, &filter) {
66 envelope_to_sse_event(&envelope).map(Ok)
67 } else {
68 None
69 }
70 }
71 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
72 tracing::warn!(missed = n, "SSE client lagged, missed events");
73 let warning = Event::default()
74 .event("warning")
75 .data(format!("missed {} events due to slow consumption", n));
76 Some(Ok(warning))
77 }
78 };
79 std::future::ready(item)
80 });
81
82 Sse::new(stream).keep_alive(
83 KeepAlive::new()
84 .interval(Duration::from_secs(30))
85 .text("heartbeat"),
86 )
87}
88
89fn matches_filter(envelope: &EventEnvelope, filter: &SseFilter) -> bool {
91 if let Some(ref kind) = filter.kind
93 && envelope.event.event_kind() != kind
94 {
95 return false;
96 }
97
98 if let Some(ref entity_type) = filter.entity_type {
100 let matches = match &envelope.event {
101 FrameworkEvent::Entity(e) => match e {
102 EntityEvent::Created {
103 entity_type: et, ..
104 }
105 | EntityEvent::Updated {
106 entity_type: et, ..
107 }
108 | EntityEvent::Deleted {
109 entity_type: et, ..
110 } => et == entity_type,
111 },
112 FrameworkEvent::Link(l) => match l {
113 LinkEvent::Created { link_type: lt, .. }
114 | LinkEvent::Deleted { link_type: lt, .. } => lt == entity_type,
115 },
116 };
117 if !matches {
118 return false;
119 }
120 }
121
122 if let Some(ref event_type) = filter.event_type
124 && envelope.event.action() != event_type
125 {
126 return false;
127 }
128
129 true
130}
131
132fn envelope_to_sse_event(envelope: &EventEnvelope) -> Option<Event> {
134 let data = match &envelope.event {
135 FrameworkEvent::Entity(e) => match e {
136 EntityEvent::Created {
137 entity_type,
138 entity_id,
139 data,
140 }
141 | EntityEvent::Updated {
142 entity_type,
143 entity_id,
144 data,
145 } => json!({
146 "kind": "entity",
147 "action": envelope.event.action(),
148 "entity_type": entity_type,
149 "entity_id": entity_id,
150 "data": data,
151 "timestamp": envelope.timestamp.to_rfc3339(),
152 }),
153 EntityEvent::Deleted {
154 entity_type,
155 entity_id,
156 } => json!({
157 "kind": "entity",
158 "action": "deleted",
159 "entity_type": entity_type,
160 "entity_id": entity_id,
161 "timestamp": envelope.timestamp.to_rfc3339(),
162 }),
163 },
164 FrameworkEvent::Link(l) => match l {
165 LinkEvent::Created {
166 link_type,
167 link_id,
168 source_id,
169 target_id,
170 metadata,
171 } => json!({
172 "kind": "link",
173 "action": "created",
174 "link_type": link_type,
175 "link_id": link_id,
176 "source_id": source_id,
177 "target_id": target_id,
178 "metadata": metadata,
179 "timestamp": envelope.timestamp.to_rfc3339(),
180 }),
181 LinkEvent::Deleted {
182 link_type,
183 link_id,
184 source_id,
185 target_id,
186 } => json!({
187 "kind": "link",
188 "action": "deleted",
189 "link_type": link_type,
190 "link_id": link_id,
191 "source_id": source_id,
192 "target_id": target_id,
193 "timestamp": envelope.timestamp.to_rfc3339(),
194 }),
195 },
196 };
197
198 let json_str = serde_json::to_string(&data).ok()?;
199 Some(Event::default().data(json_str))
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Wraps an entity event of the given type in an envelope.
    ///
    /// `action` must be `"created"`, `"updated"` or `"deleted"`.
    fn make_entity_envelope(entity_type: &str, action: &str) -> EventEnvelope {
        let entity_type = entity_type.to_string();
        let entity_id = Uuid::new_v4();

        let entity_event = match action {
            "created" => EntityEvent::Created {
                entity_type,
                entity_id,
                data: json!({"name": "test"}),
            },
            "updated" => EntityEvent::Updated {
                entity_type,
                entity_id,
                data: json!({"name": "updated"}),
            },
            "deleted" => EntityEvent::Deleted {
                entity_type,
                entity_id,
            },
            _ => unreachable!(),
        };

        EventEnvelope::new(FrameworkEvent::Entity(entity_event))
    }

    /// Wraps a link event of the given type in an envelope.
    ///
    /// `action` must be `"created"` or `"deleted"`.
    fn make_link_envelope(link_type: &str, action: &str) -> EventEnvelope {
        let link_type = link_type.to_string();
        let link_id = Uuid::new_v4();
        let source_id = Uuid::new_v4();
        let target_id = Uuid::new_v4();

        let link_event = match action {
            "created" => LinkEvent::Created {
                link_type,
                link_id,
                source_id,
                target_id,
                metadata: None,
            },
            "deleted" => LinkEvent::Deleted {
                link_type,
                link_id,
                source_id,
                target_id,
            },
            _ => unreachable!(),
        };

        EventEnvelope::new(FrameworkEvent::Link(link_event))
    }

    /// Builds a filter from borrowed criteria; `None` leaves a criterion unset.
    fn filter_for(
        kind: Option<&str>,
        entity_type: Option<&str>,
        event_type: Option<&str>,
    ) -> SseFilter {
        SseFilter {
            kind: kind.map(String::from),
            entity_type: entity_type.map(String::from),
            event_type: event_type.map(String::from),
        }
    }

    #[test]
    fn test_matches_filter_no_filter() {
        let user_created = make_entity_envelope("user", "created");
        assert!(matches_filter(&user_created, &SseFilter::default()));
    }

    #[test]
    fn test_matches_filter_kind_entity() {
        let user_created = make_entity_envelope("user", "created");

        let accepts = filter_for(Some("entity"), None, None);
        assert!(matches_filter(&user_created, &accepts));

        let rejects = filter_for(Some("link"), None, None);
        assert!(!matches_filter(&user_created, &rejects));
    }

    #[test]
    fn test_matches_filter_kind_link() {
        let follows_created = make_link_envelope("follows", "created");

        let accepts = filter_for(Some("link"), None, None);
        assert!(matches_filter(&follows_created, &accepts));

        let rejects = filter_for(Some("entity"), None, None);
        assert!(!matches_filter(&follows_created, &rejects));
    }

    #[test]
    fn test_matches_filter_entity_type() {
        let user_created = make_entity_envelope("user", "created");

        let accepts = filter_for(None, Some("user"), None);
        assert!(matches_filter(&user_created, &accepts));

        let rejects = filter_for(None, Some("order"), None);
        assert!(!matches_filter(&user_created, &rejects));
    }

    #[test]
    fn test_matches_filter_event_type() {
        let user_created = make_entity_envelope("user", "created");

        let accepts = filter_for(None, None, Some("created"));
        assert!(matches_filter(&user_created, &accepts));

        let rejects = filter_for(None, None, Some("deleted"));
        assert!(!matches_filter(&user_created, &rejects));
    }

    #[test]
    fn test_matches_filter_combined() {
        let user_created = make_entity_envelope("user", "created");

        // All three criteria agree with the event.
        let accepts = filter_for(Some("entity"), Some("user"), Some("created"));
        assert!(matches_filter(&user_created, &accepts));

        // A single mismatching criterion is enough to reject.
        let rejects = filter_for(Some("entity"), Some("user"), Some("deleted"));
        assert!(!matches_filter(&user_created, &rejects));
    }

    #[test]
    fn test_envelope_to_sse_event_entity_created() {
        let user_created = make_entity_envelope("user", "created");
        assert!(envelope_to_sse_event(&user_created).is_some());
    }

    #[test]
    fn test_envelope_to_sse_event_entity_deleted() {
        let user_deleted = make_entity_envelope("user", "deleted");
        assert!(envelope_to_sse_event(&user_deleted).is_some());
    }

    #[test]
    fn test_envelope_to_sse_event_link_created() {
        let follows_created = make_link_envelope("follows", "created");
        assert!(envelope_to_sse_event(&follows_created).is_some());
    }

    #[test]
    fn test_envelope_to_sse_event_link_deleted() {
        let follows_deleted = make_link_envelope("follows", "deleted");
        assert!(envelope_to_sse_event(&follows_deleted).is_some());
    }

    #[test]
    fn test_link_filter_by_link_type() {
        let follows_created = make_link_envelope("follows", "created");

        // For link events the `entity_type` criterion is checked against the link type.
        let accepts = filter_for(None, Some("follows"), None);
        assert!(matches_filter(&follows_created, &accepts));

        let rejects = filter_for(None, Some("owns"), None);
        assert!(!matches_filter(&follows_created, &rejects));
    }

    #[test]
    fn test_link_event_type_filter() {
        let follows_created = make_link_envelope("follows", "created");
        let follows_deleted = make_link_envelope("follows", "deleted");

        let only_created = filter_for(None, None, Some("created"));
        assert!(matches_filter(&follows_created, &only_created));
        assert!(!matches_filter(&follows_deleted, &only_created));
    }
}