Skip to main content

rustapi_core/
sse.rs

//! Server-Sent Events (SSE) response types for RustAPI
//!
//! This module provides types for streaming Server-Sent Events to clients.
//! SSE is ideal for real-time updates like notifications, live feeds, and progress updates.
//!
//! # Example
//!
//! ```rust,ignore
//! use rustapi_core::sse::{Sse, SseEvent, KeepAlive};
//! use futures_util::{stream, Stream};
//! use std::time::Duration;
//!
//! async fn events() -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
//!     let stream = stream::iter(vec![
//!         Ok(SseEvent::new("Hello")),
//!         Ok(SseEvent::new("World").event("greeting")),
//!     ]);
//!     Sse::new(stream)
//!         .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
//! }
//! ```
//!
//! # Keep-Alive Support
//!
//! SSE connections can be kept alive by sending periodic comments:
//!
//! ```rust,ignore
//! use rustapi_core::sse::{Sse, SseEvent, KeepAlive};
//! use std::time::Duration;
//!
//! async fn events() -> impl IntoResponse {
//!     let stream = async_stream::stream! {
//!         for i in 0..10 {
//!             yield Ok::<_, std::convert::Infallible>(
//!                 SseEvent::new(format!("Event {}", i))
//!             );
//!             tokio::time::sleep(Duration::from_secs(1)).await;
//!         }
//!     };
//!
//!     Sse::new(stream)
//!         .keep_alive(KeepAlive::new()
//!             .interval(Duration::from_secs(30))
//!             .text("ping"))
//! }
//! ```
47
48use bytes::Bytes;
49use futures_util::Stream;
50use http::{header, StatusCode};
51
52use pin_project_lite::pin_project;
53use rustapi_openapi::{MediaType, Operation, ResponseModifier, ResponseSpec, SchemaRef};
54use std::collections::BTreeMap;
55use std::fmt::Write;
56use std::pin::Pin;
57use std::task::{Context, Poll};
58use std::time::Duration;
59
60use crate::response::{IntoResponse, Response};
61
62/// A Server-Sent Event
63///
64/// SSE events follow the format specified in the W3C Server-Sent Events specification.
65/// Each event can have:
66/// - `data`: The event data (required)
67/// - `event`: The event type/name (optional)
68/// - `id`: The event ID for reconnection (optional)
69/// - `retry`: Reconnection time in milliseconds (optional)
70/// - `comment`: A comment line (optional, not visible to most clients)
71#[derive(Debug, Clone, Default)]
72pub struct SseEvent {
73    /// The event data
74    pub data: String,
75    /// The event type/name
76    pub event: Option<String>,
77    /// The event ID
78    pub id: Option<String>,
79    /// Reconnection time in milliseconds
80    pub retry: Option<u64>,
81    /// Comment line
82    comment: Option<String>,
83}
84
85impl SseEvent {
86    /// Create a new SSE event with the given data
87    pub fn new(data: impl Into<String>) -> Self {
88        Self {
89            data: data.into(),
90            event: None,
91            id: None,
92            retry: None,
93            comment: None,
94        }
95    }
96
97    /// Create an SSE comment (keep-alive)
98    ///
99    /// Comments are lines starting with `:` and are typically used for keep-alive.
100    pub fn comment(text: impl Into<String>) -> Self {
101        Self {
102            data: String::new(),
103            event: None,
104            id: None,
105            retry: None,
106            comment: Some(text.into()),
107        }
108    }
109
110    /// Set the event type/name
111    pub fn event(mut self, event: impl Into<String>) -> Self {
112        self.event = Some(event.into());
113        self
114    }
115
116    /// Set the event ID
117    pub fn id(mut self, id: impl Into<String>) -> Self {
118        self.id = Some(id.into());
119        self
120    }
121
122    /// Set the retry time in milliseconds
123    pub fn retry(mut self, retry: u64) -> Self {
124        self.retry = Some(retry);
125        self
126    }
127
128    /// Set JSON data (serializes the value)
129    pub fn json_data<T: serde::Serialize>(data: &T) -> Result<Self, serde_json::Error> {
130        Ok(Self::new(serde_json::to_string(data)?))
131    }
132
133    /// Format the event as an SSE message
134    ///
135    /// The format follows the SSE specification:
136    /// - Lines starting with "event:" specify the event type
137    /// - Lines starting with "id:" specify the event ID
138    /// - Lines starting with "retry:" specify the reconnection time
139    /// - Lines starting with "data:" contain the event data
140    /// - Lines starting with ":" are comments
141    /// - Events are terminated with a blank line
142    pub fn to_sse_string(&self) -> String {
143        let mut output = String::new();
144
145        // Comment (for keep-alive)
146        if let Some(ref comment) = self.comment {
147            writeln!(output, ": {}", comment).unwrap();
148            output.push('\n');
149            return output;
150        }
151
152        // Event type
153        if let Some(ref event) = self.event {
154            writeln!(output, "event: {}", event).unwrap();
155        }
156
157        // Event ID
158        if let Some(ref id) = self.id {
159            writeln!(output, "id: {}", id).unwrap();
160        }
161
162        // Retry time
163        if let Some(retry) = self.retry {
164            writeln!(output, "retry: {}", retry).unwrap();
165        }
166
167        // Data - handle multi-line data by prefixing each line with "data: "
168        for line in self.data.lines() {
169            writeln!(output, "data: {}", line).unwrap();
170        }
171
172        // If data is empty, still send an empty data line
173        if self.data.is_empty() && self.comment.is_none() {
174            writeln!(output, "data:").unwrap();
175        }
176
177        // Empty line to terminate the event
178        output.push('\n');
179
180        output
181    }
182
183    /// Convert the event to bytes
184    pub fn to_bytes(&self) -> Bytes {
185        Bytes::from(self.to_sse_string())
186    }
187}
188
189/// Keep-alive configuration for SSE connections
190///
191/// Keep-alive sends periodic comments to prevent connection timeouts.
192///
193/// # Example
194///
195/// ```rust,ignore
196/// use rustapi_core::sse::KeepAlive;
197/// use std::time::Duration;
198///
199/// let keep_alive = KeepAlive::new()
200///     .interval(Duration::from_secs(30))
201///     .text("ping");
202/// ```
203#[derive(Debug, Clone)]
204pub struct KeepAlive {
205    /// Interval between keep-alive messages
206    interval: Duration,
207    /// Text to send as keep-alive comment
208    text: String,
209}
210
211impl Default for KeepAlive {
212    fn default() -> Self {
213        Self {
214            interval: Duration::from_secs(15),
215            text: "keep-alive".to_string(),
216        }
217    }
218}
219
220impl KeepAlive {
221    /// Create a new keep-alive configuration with default settings
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    /// Set the keep-alive interval
227    pub fn interval(mut self, interval: Duration) -> Self {
228        self.interval = interval;
229        self
230    }
231
232    /// Set the keep-alive text
233    pub fn text(mut self, text: impl Into<String>) -> Self {
234        self.text = text.into();
235        self
236    }
237
238    /// Get the interval
239    pub fn get_interval(&self) -> Duration {
240        self.interval
241    }
242
243    /// Create the keep-alive event
244    pub fn event(&self) -> SseEvent {
245        SseEvent::comment(&self.text)
246    }
247}
248
249/// Server-Sent Events response wrapper
250///
251/// Wraps a stream of `SseEvent` items and converts them to an SSE response.
252///
253/// # Example
254///
255/// ```rust,ignore
256/// use rustapi_core::sse::{Sse, SseEvent, KeepAlive};
257/// use futures_util::stream;
258/// use std::time::Duration;
259///
260/// async fn events() -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
261///     let stream = stream::iter(vec![
262///         Ok(SseEvent::new("Hello")),
263///         Ok(SseEvent::new("World").event("greeting")),
264///     ]);
265///     Sse::new(stream)
266///         .keep_alive(KeepAlive::new().interval(Duration::from_secs(30)))
267/// }
268/// ```
269pub struct Sse<S> {
270    stream: S,
271    keep_alive: Option<KeepAlive>,
272}
273
274impl<S> Sse<S> {
275    /// Create a new SSE response from a stream
276    pub fn new(stream: S) -> Self {
277        Self {
278            stream,
279            keep_alive: None,
280        }
281    }
282
283    /// Set the keep-alive configuration
284    ///
285    /// When set, the server will send periodic comments to keep the connection alive.
286    ///
287    /// # Example
288    ///
289    /// ```rust,ignore
290    /// use rustapi_core::sse::{Sse, KeepAlive};
291    /// use std::time::Duration;
292    ///
293    /// Sse::new(stream)
294    ///     .keep_alive(KeepAlive::new().interval(Duration::from_secs(30)))
295    /// ```
296    pub fn keep_alive(mut self, config: KeepAlive) -> Self {
297        self.keep_alive = Some(config);
298        self
299    }
300
301    /// Get the keep-alive configuration
302    pub fn get_keep_alive(&self) -> Option<&KeepAlive> {
303        self.keep_alive.as_ref()
304    }
305}
306
307// Stream that merges SSE events with keep-alive events
308pin_project! {
309    /// A stream that combines SSE events with keep-alive messages
310    pub struct SseStream<S> {
311        #[pin]
312        inner: S,
313        keep_alive: Option<KeepAlive>,
314        #[pin]
315        keep_alive_timer: Option<tokio::time::Interval>,
316    }
317}
318
319impl<S, E> Stream for SseStream<S>
320where
321    S: Stream<Item = Result<SseEvent, E>>,
322{
323    type Item = Result<Bytes, E>;
324
325    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
326        let this = self.project();
327
328        // First, check if there's an event ready from the inner stream
329        match this.inner.poll_next(cx) {
330            Poll::Ready(Some(Ok(event))) => {
331                return Poll::Ready(Some(Ok(event.to_bytes())));
332            }
333            Poll::Ready(Some(Err(e))) => {
334                return Poll::Ready(Some(Err(e)));
335            }
336            Poll::Ready(None) => {
337                return Poll::Ready(None);
338            }
339            Poll::Pending => {}
340        }
341
342        // Check keep-alive timer
343        if let Some(mut timer) = this.keep_alive_timer.as_pin_mut() {
344            if timer.poll_tick(cx).is_ready() {
345                if let Some(keep_alive) = this.keep_alive {
346                    let event = keep_alive.event();
347                    return Poll::Ready(Some(Ok(event.to_bytes())));
348                }
349            }
350        }
351
352        Poll::Pending
353    }
354}
355
356// For now, we'll implement IntoResponse by collecting the stream into a single response
357// This is a simplified implementation that works with the current Response type (Full<Bytes>)
358// A full streaming implementation would require changes to the Response type
359impl<S, E> IntoResponse for Sse<S>
360where
361    S: Stream<Item = Result<SseEvent, E>> + Send + 'static,
362    E: std::error::Error + Send + Sync + 'static,
363{
364    fn into_response(self) -> Response {
365        let timer = self.keep_alive.as_ref().map(|k| {
366            let mut interval = tokio::time::interval(k.interval);
367            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
368            interval
369        });
370
371        let stream = SseStream {
372            inner: self.stream,
373            keep_alive: self.keep_alive,
374            keep_alive_timer: timer,
375        };
376
377        use futures_util::StreamExt;
378        let stream =
379            stream.map(|res| res.map_err(|e| crate::error::ApiError::internal(e.to_string())));
380        let body = crate::response::Body::from_stream(stream);
381
382        http::Response::builder()
383            .status(StatusCode::OK)
384            .header(header::CONTENT_TYPE, "text/event-stream")
385            .header(header::CACHE_CONTROL, "no-cache")
386            .header(header::CONNECTION, "keep-alive")
387            .header("X-Accel-Buffering", "no") // Disable nginx buffering
388            .body(body)
389            .unwrap()
390    }
391}
392
393// OpenAPI support: ResponseModifier for SSE streams
394impl<S> ResponseModifier for Sse<S> {
395    fn update_response(op: &mut Operation) {
396        let mut content = BTreeMap::new();
397        content.insert(
398            "text/event-stream".to_string(),
399            MediaType {
400                schema: Some(SchemaRef::Inline(serde_json::json!({
401                    "type": "string",
402                    "description": "Server-Sent Events stream. Events follow the SSE format: 'event: <type>\\ndata: <json>\\n\\n'",
403                }))),
404                example: Some(serde_json::json!("event: message\ndata: {\"id\": 1, \"text\": \"Hello\"}\n\n")),
405            },
406        );
407
408        let response = ResponseSpec {
409            description: "Server-Sent Events stream for real-time updates".to_string(),
410            content,
411            headers: BTreeMap::new(),
412        };
413        op.responses.insert("200".to_string(), response);
414    }
415}
416
417/// Collect all SSE events from a stream into a single response body
418///
419/// This is useful for testing or when you know the stream is finite.
420pub async fn collect_sse_events<S, E>(stream: S) -> Result<Bytes, E>
421where
422    S: Stream<Item = Result<SseEvent, E>> + Send,
423{
424    use futures_util::StreamExt;
425
426    let mut buffer = Vec::new();
427    futures_util::pin_mut!(stream);
428
429    while let Some(result) = stream.next().await {
430        let event = result?;
431        buffer.extend_from_slice(&event.to_bytes());
432    }
433
434    Ok(Bytes::from(buffer))
435}
436
437/// Create an SSE response from a synchronous iterator of events
438///
439/// This is a convenience function for simple cases with pre-computed events.
440///
441/// # Example
442///
443/// ```rust,ignore
444/// use rustapi_core::sse::{sse_response, SseEvent};
445///
446/// async fn handler() -> Response {
447///     sse_response(vec![
448///         SseEvent::new("Hello"),
449///         SseEvent::new("World").event("greeting"),
450///     ])
451/// }
452/// ```
453pub fn sse_response<I>(events: I) -> Response
454where
455    I: IntoIterator<Item = SseEvent>,
456{
457    let mut buffer = String::new();
458    for event in events {
459        buffer.push_str(&event.to_sse_string());
460    }
461
462    http::Response::builder()
463        .status(StatusCode::OK)
464        .header(header::CONTENT_TYPE, "text/event-stream")
465        .header(header::CACHE_CONTROL, "no-cache")
466        .header(header::CONNECTION, "keep-alive")
467        .header("X-Accel-Buffering", "no")
468        .body(crate::response::Body::from(buffer))
469        .unwrap()
470}
471
472/// Helper function to create an SSE response from an iterator of events
473///
474/// This is useful for simple cases where you have a fixed set of events.
475pub fn sse_from_iter<I, E>(
476    events: I,
477) -> Sse<futures_util::stream::Iter<std::vec::IntoIter<Result<SseEvent, E>>>>
478where
479    I: IntoIterator<Item = Result<SseEvent, E>>,
480{
481    use futures_util::stream;
482    let vec: Vec<_> = events.into_iter().collect();
483    Sse::new(stream::iter(vec))
484}
485
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// Assert that `rendered` contains every one of `fragments`.
    #[track_caller]
    fn assert_contains_all(rendered: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(rendered.contains(*fragment));
        }
    }

    #[test]
    fn test_sse_event_basic() {
        let rendered = SseEvent::new("Hello, World!").to_sse_string();
        assert_eq!(rendered, "data: Hello, World!\n\n");
    }

    #[test]
    fn test_sse_event_with_event_type() {
        let rendered = SseEvent::new("Hello").event("greeting").to_sse_string();
        assert_contains_all(&rendered, &["event: greeting\n", "data: Hello\n"]);
    }

    #[test]
    fn test_sse_event_with_id() {
        let rendered = SseEvent::new("Hello").id("123").to_sse_string();
        assert_contains_all(&rendered, &["id: 123\n", "data: Hello\n"]);
    }

    #[test]
    fn test_sse_event_with_retry() {
        let rendered = SseEvent::new("Hello").retry(5000).to_sse_string();
        assert_contains_all(&rendered, &["retry: 5000\n", "data: Hello\n"]);
    }

    #[test]
    fn test_sse_event_multiline_data() {
        let rendered = SseEvent::new("Line 1\nLine 2\nLine 3").to_sse_string();
        assert_contains_all(
            &rendered,
            &["data: Line 1\n", "data: Line 2\n", "data: Line 3\n"],
        );
    }

    #[test]
    fn test_sse_event_full() {
        let rendered = SseEvent::new("Hello")
            .event("message")
            .id("1")
            .retry(3000)
            .to_sse_string();

        // Every configured field shows up on its own line
        assert_contains_all(
            &rendered,
            &[
                "event: message\n",
                "id: 1\n",
                "retry: 3000\n",
                "data: Hello\n",
            ],
        );

        // The event is terminated by a blank line
        assert!(rendered.ends_with("\n\n"));
    }

    #[test]
    fn test_sse_response_headers() {
        use futures_util::stream;

        let items: Vec<Result<SseEvent, std::convert::Infallible>> =
            vec![Ok(SseEvent::new("test"))];
        let response = Sse::new(stream::iter(items)).into_response();

        assert_eq!(response.status(), StatusCode::OK);

        let headers = response.headers();
        assert_eq!(
            headers.get(header::CONTENT_TYPE).unwrap(),
            "text/event-stream"
        );
        assert_eq!(headers.get(header::CACHE_CONTROL).unwrap(), "no-cache");
        assert_eq!(headers.get(header::CONNECTION).unwrap(), "keep-alive");
    }

    // **Feature: phase3-batteries-included, Property 20: SSE response format**
    //
    // For any stream of SseEvent items, `Sse<S>` SHALL produce a response with
    // `Content-Type: text/event-stream` and body formatted according to SSE specification.
    //
    // **Validates: Requirements 6.3**
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100))]

        #[test]
        fn prop_sse_response_format(
            // Random payload (alphanumeric plus space, so no special characters)
            data in "[a-zA-Z0-9 ]{1,50}",
            // Optional event type
            event_type in proptest::option::of("[a-zA-Z][a-zA-Z0-9_]{0,20}"),
            // Optional event ID
            event_id in proptest::option::of("[a-zA-Z0-9]{1,10}"),
            // Optional retry time
            retry_time in proptest::option::of(1000u64..60000u64),
        ) {
            use futures_util::stream;

            // Assemble the event, applying only the fields that were generated
            let mut event = SseEvent::new(data.clone());
            if let Some(name) = event_type.as_ref() {
                event = event.event(name.clone());
            }
            if let Some(identifier) = event_id.as_ref() {
                event = event.id(identifier.clone());
            }
            if let Some(millis) = retry_time {
                event = event.retry(millis);
            }

            let sse_string = event.to_sse_string();

            // Property 1: SSE string must end with double newline (event terminator)
            prop_assert!(
                sse_string.ends_with("\n\n"),
                "SSE event must end with double newline, got: {:?}",
                sse_string
            );

            // Property 2: Data must be present with "data: " prefix
            let data_line = format!("data: {}", data);
            prop_assert!(
                sse_string.contains(&data_line),
                "SSE event must contain data field with 'data: ' prefix"
            );

            // Property 3: If event type is set, it must be present with "event: " prefix
            if let Some(name) = event_type.as_ref() {
                let event_line = format!("event: {}", name);
                prop_assert!(
                    sse_string.contains(&event_line),
                    "SSE event must contain event type with 'event: ' prefix"
                );
            }

            // Property 4: If ID is set, it must be present with "id: " prefix
            if let Some(identifier) = event_id.as_ref() {
                let id_line = format!("id: {}", identifier);
                prop_assert!(
                    sse_string.contains(&id_line),
                    "SSE event must contain ID with 'id: ' prefix"
                );
            }

            // Property 5: If retry is set, it must be present with "retry: " prefix
            if let Some(millis) = retry_time {
                let retry_line = format!("retry: {}", millis);
                prop_assert!(
                    sse_string.contains(&retry_line),
                    "SSE event must contain retry with 'retry: ' prefix"
                );
            }

            // Property 6: Verify response headers are correct
            let items: Vec<Result<SseEvent, std::convert::Infallible>> = vec![Ok(event)];
            let response = Sse::new(stream::iter(items)).into_response();
            let headers = response.headers();

            prop_assert_eq!(
                headers.get(header::CONTENT_TYPE).map(|v| v.to_str().unwrap()),
                Some("text/event-stream"),
                "SSE response must have Content-Type: text/event-stream"
            );

            prop_assert_eq!(
                headers.get(header::CACHE_CONTROL).map(|v| v.to_str().unwrap()),
                Some("no-cache"),
                "SSE response must have Cache-Control: no-cache"
            );

            prop_assert_eq!(
                headers.get(header::CONNECTION).map(|v| v.to_str().unwrap()),
                Some("keep-alive"),
                "SSE response must have Connection: keep-alive"
            );
        }

        #[test]
        fn prop_sse_multiline_data_format(
            // Between one and four lines of payload
            lines in proptest::collection::vec("[a-zA-Z0-9 ]{1,30}", 1..5),
        ) {
            let sse_string = SseEvent::new(lines.join("\n")).to_sse_string();

            // Property: Each line of data must be prefixed with "data: "
            for line in &lines {
                let data_line = format!("data: {}", line);
                prop_assert!(
                    sse_string.contains(&data_line),
                    "Each line of multiline data must be prefixed with 'data: '"
                );
            }

            // Property: Must end with double newline
            prop_assert!(
                sse_string.ends_with("\n\n"),
                "SSE event must end with double newline"
            );
        }
    }
}