Skip to main content

assay_workflow/api/
events.rs

1use std::convert::Infallible;
2use std::sync::Arc;
3
4use axum::extract::State;
5use axum::response::sse::{Event, KeepAlive, Sse};
6use axum::routing::get;
7use axum::Router;
8use serde::{Deserialize, Serialize};
9use tokio_stream::wrappers::BroadcastStream;
10use tokio_stream::StreamExt;
11
12use crate::api::AppState;
13use crate::store::WorkflowStore;
14
15/// Event broadcast to SSE subscribers.
16#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct BroadcastEvent {
18    pub event_type: String,
19    pub workflow_id: String,
20    pub payload: Option<String>,
21}
22
23pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
24    Router::new().route("/events/stream", get(event_stream))
25}
26
27async fn event_stream<S: WorkflowStore>(
28    State(state): State<Arc<AppState<S>>>,
29) -> Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>> {
30    let rx = state.event_tx.subscribe();
31
32    let stream =
33        BroadcastStream::new(rx).filter_map(|result: Result<BroadcastEvent, _>| {
34            result.ok().map(|evt| {
35                let data = serde_json::to_string(&evt).unwrap_or_default();
36                Ok(Event::default().event(&evt.event_type).data(data))
37            })
38        });
39
40    Sse::new(stream).keep_alive(KeepAlive::default())
41}