//! # modo::sse
//!
//! Server-Sent Events (SSE) — keyed broadcast channels, event builders, and
//! reconnection helpers for real-time streaming over HTTP.
//!
//! Provides:
//! - [`Broadcaster`] — keyed broadcast channel registry; produces SSE responses
//! - [`BroadcastStream`] — stream of values from a broadcast channel with configurable lag policy
//! - [`LagPolicy`] — `End` or `Skip` behavior when a subscriber falls behind
//! - [`Sender`] — imperative event sender for [`Broadcaster::channel()`] closures
//! - [`SseStreamExt`] — `.cast_events()` combinator to map a stream to SSE events
//! - [`Event`] — builder for a single SSE event (id, event name, data, retry)
//! - [`LastEventId`] — axum extractor for the `Last-Event-ID` reconnection header
//! - [`SseConfig`] — keep-alive interval configuration
//! - [`replay()`] — converts a `Vec<T>` into a stream for missed-event replay on reconnect
//!
//! # Quick start
//!
//! ## Stream from a broadcast channel
//!
//! ```
//! use modo::sse::{Broadcaster, Event, LagPolicy, SseConfig, SseStreamExt};
//! use modo::service::Service;
//!
//! # #[derive(Clone, serde::Serialize)]
//! # struct Notification { msg: String }
//! // Register a broadcaster as a service in main()
//! let notifications: Broadcaster<String, Notification> =
//!     Broadcaster::new(64, SseConfig::default());
//! # let mut registry = modo::service::Registry::new();
//! registry.add(notifications);
//!
//! // Subscribe in a handler
//! async fn events(
//!     Service(bc): Service<Broadcaster<String, Notification>>,
//! ) -> axum::response::Response {
//!     let topic = "topic".to_string();
//!     let stream = bc.subscribe(&topic)
//!         .on_lag(LagPolicy::Skip)
//!         .cast_events(|n| {
//!             Event::new(modo::id::short(), "notification")?.json(&n)
//!         });
//!     bc.response(stream)
//! }
//! ```
//!
//! ## Imperative channel (monitoring)
//!
//! ```
//! use modo::sse::{Broadcaster, Event};
//! use modo::service::Service;
//! use std::time::Duration;
//!
//! # #[derive(Clone, serde::Serialize)]
//! # struct Status { ok: bool }
//! # async fn check_health() -> Status { Status { ok: true } }
//! async fn health(
//!     Service(bc): Service<Broadcaster<String, Status>>,
//! ) -> axum::response::Response {
//!     bc.channel(|tx| async move {
//!         loop {
//!             let status = check_health().await;
//!             tx.send(Event::new(modo::id::short(), "health")?.json(&status)?).await?;
//!             tokio::time::sleep(Duration::from_secs(5)).await;
//!         }
//!     })
//! }
//! ```
//!
//! ## HTML partials (HTMX)
//!
//! ```
//! use modo::sse::{Broadcaster, Event, LagPolicy, SseStreamExt};
//! use modo::service::Service;
//! # use axum::extract::Path;
//!
//! # #[derive(Clone, serde::Serialize)]
//! # struct ChatMessage { text: String }
//! # struct Renderer;
//! # impl Renderer { fn render(&self, _tpl: &str, _data: &ChatMessage) -> modo::Result<String> { Ok(String::new()) } }
//! async fn chat(
//!     Path(room_id): Path<String>,
//!     Service(bc): Service<Broadcaster<String, ChatMessage>>,
//!     Service(renderer): Service<Renderer>,
//! ) -> axum::response::Response {
//!     let stream = bc.subscribe(&room_id)
//!         .on_lag(LagPolicy::End)
//!         .cast_events(move |msg| {
//!             let html = renderer.render("chat/message.html", &msg)?;
//!             Ok(Event::new(modo::id::short(), "message")?.html(html))
//!         });
//!     bc.response(stream)
//! }
//! ```
//!
//! # Architecture
//!
//! | Type | Purpose |
//! |------|---------|
//! | [`Event`] | Builder for a single event (id + event name + data + retry) |
//! | [`Broadcaster`] | Keyed broadcast channels, owns config, produces responses |
//! | [`BroadcastStream`] | Stream from a broadcast channel with lag policy |
//! | [`LagPolicy`] | `End` or `Skip` — controls behavior when subscriber lags |
//! | [`Sender`] | Imperative sender for [`Broadcaster::channel()`] closures |
//! | [`SseStreamExt`] | `.cast_events()` combinator for stream-to-event conversion |
//! | [`LastEventId`] | Standalone extractor for the `Last-Event-ID` header |
//! | [`SseConfig`] | Keep-alive configuration |
//! | [`replay()`] | Convert a `Vec<T>` into a stream for reconnection replay |
//!
//! # Gotchas
//!
//! ## Request timeout
//!
//! A global request timeout layer will terminate SSE connections once the
//! timeout elapses. SSE connections are long-lived, so either set a long
//! timeout or exclude SSE routes from the timeout layer.
//!
//! ## Reverse proxy buffering (nginx)
//!
//! Nginx buffers proxied responses by default, which holds events back
//! instead of delivering them as they are sent. The module automatically
//! sets `X-Accel-Buffering: no` on all SSE responses to disable this.
//! Other proxies may need manual configuration.
//!
//! ## HTTP compression
//!
//! `CompressionLayer` buffers response data before sending, preventing
//! real-time event flushing. Disable compression for SSE routes using
//! per-route layer overrides or the predicate option — prefer per-route
//! disabling over turning compression off globally.
//!
//! ## Multi-line HTML
//!
//! Multi-line data (including HTML partials) is handled automatically per
//! the SSE spec. Keep partials small — send individual components, not
//! entire page sections.
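//!
//! As a rough illustration of what "handled automatically" means on the
//! wire, the sketch below frames a multi-line payload the way the SSE spec
//! describes: one `data:` field per payload line, then a blank line to
//! dispatch the event. The `frame_data` helper is hypothetical and is not
//! part of this module's API; the module's actual encoding may differ in
//! detail.
//!
//! ```rust
//! // Hypothetical helper (not part of `modo::sse`): frame a payload as
//! // SSE `data:` fields, assuming lines are separated by `\n`.
//! fn frame_data(payload: &str) -> String {
//!     let mut out = String::new();
//!     // Each payload line becomes its own `data:` field.
//!     for line in payload.split('\n') {
//!         out.push_str("data: ");
//!         out.push_str(line);
//!         out.push('\n');
//!     }
//!     // A blank line terminates the event.
//!     out.push('\n');
//!     out
//! }
//! ```
//!
//! Under that framing, a three-line HTML partial goes out as three `data:`
//! lines followed by a blank line, and the client joins them back together
//! with newlines.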
pub use ;
pub use SseConfig;
pub use Event;
pub use LastEventId;
pub use Sender;
pub use SseStreamExt;