// assay_workflow/api/events.rs
use std::convert::Infallible;
use std::sync::Arc;

use axum::extract::State;
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::routing::get;
use axum::Router;
use serde::{Deserialize, Serialize};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;

use crate::api::AppState;
use crate::store::WorkflowStore;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct BroadcastEvent {
18 pub event_type: String,
19 pub workflow_id: String,
20 pub payload: Option<String>,
21}
22
23pub fn router<S: WorkflowStore + 'static>() -> Router<Arc<AppState<S>>> {
24 Router::new().route("/events/stream", get(event_stream))
25}
26
27async fn event_stream<S: WorkflowStore>(
28 State(state): State<Arc<AppState<S>>>,
29) -> Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>> {
30 let rx = state.event_tx.subscribe();
31
32 let stream =
33 BroadcastStream::new(rx).filter_map(|result: Result<BroadcastEvent, _>| {
34 result.ok().map(|evt| {
35 let data = serde_json::to_string(&evt).unwrap_or_default();
36 Ok(Event::default().event(&evt.event_type).data(data))
37 })
38 });
39
40 Sse::new(stream).keep_alive(KeepAlive::default())
41}