1use std::pin::Pin;
60use std::task::{Context, Poll};
61use std::time::Duration;
62
63use asupersync::stream::Stream;
64
65use crate::response::{Response, ResponseBody, StatusCode};
66
/// A single Server-Sent Events message, assembled field by field.
///
/// Every field is optional; only the fields that are set are written when the
/// event is encoded. `PartialEq`/`Eq` are derived so events can be compared by
/// value (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseEvent {
    /// Payload text, encoded as `data:` lines.
    data: Option<String>,
    /// Value of the `event:` field.
    event_type: Option<String>,
    /// Value of the `id:` field.
    id: Option<String>,
    /// Value of the `retry:` field, in milliseconds.
    retry: Option<u64>,
    /// Comment text, encoded as `: ` lines.
    comment: Option<String>,
}
104
105impl SseEvent {
106 #[must_use]
110 pub fn new(data: impl Into<String>) -> Self {
111 Self {
112 data: Some(data.into()),
113 event_type: None,
114 id: None,
115 retry: None,
116 comment: None,
117 }
118 }
119
120 #[must_use]
124 pub fn message(data: impl Into<String>) -> Self {
125 Self::new(data)
126 }
127
128 #[must_use]
139 pub fn comment(comment: impl Into<String>) -> Self {
140 Self {
141 data: None,
142 event_type: None,
143 id: None,
144 retry: None,
145 comment: Some(comment.into()),
146 }
147 }
148
149 #[must_use]
160 pub fn event_type(mut self, event_type: impl Into<String>) -> Self {
161 self.event_type = Some(event_type.into());
162 self
163 }
164
165 #[must_use]
177 pub fn id(mut self, id: impl Into<String>) -> Self {
178 self.id = Some(id.into());
179 self
180 }
181
182 #[must_use]
194 pub fn retry_ms(mut self, milliseconds: u64) -> Self {
195 self.retry = Some(milliseconds);
196 self
197 }
198
199 #[must_use]
210 pub fn retry(self, duration: Duration) -> Self {
211 self.retry_ms(duration.as_millis() as u64)
212 }
213
214 #[must_use]
218 pub fn to_bytes(&self) -> Vec<u8> {
219 let mut output = Vec::with_capacity(256);
220
221 if let Some(ref comment) = self.comment {
223 for line in comment.lines() {
224 output.extend_from_slice(b": ");
225 output.extend_from_slice(line.as_bytes());
226 output.push(b'\n');
227 }
228 }
229
230 if let Some(ref event_type) = self.event_type {
232 output.extend_from_slice(b"event: ");
233 output.extend_from_slice(event_type.as_bytes());
234 output.push(b'\n');
235 }
236
237 if let Some(ref id) = self.id {
239 output.extend_from_slice(b"id: ");
240 output.extend_from_slice(id.as_bytes());
241 output.push(b'\n');
242 }
243
244 if let Some(retry) = self.retry {
246 output.extend_from_slice(b"retry: ");
247 output.extend_from_slice(retry.to_string().as_bytes());
248 output.push(b'\n');
249 }
250
251 if let Some(ref data) = self.data {
253 for line in data.lines() {
254 output.extend_from_slice(b"data: ");
255 output.extend_from_slice(line.as_bytes());
256 output.push(b'\n');
257 }
258 if data.is_empty() {
260 output.extend_from_slice(b"data: \n");
261 }
262 }
263
264 output.push(b'\n');
266
267 output
268 }
269}
270
271impl From<&str> for SseEvent {
272 fn from(data: &str) -> Self {
273 Self::new(data)
274 }
275}
276
277impl From<String> for SseEvent {
278 fn from(data: String) -> Self {
279 Self::new(data)
280 }
281}
282
/// Adapter around a stream of [`SseEvent`]s that yields each event's encoded
/// bytes.
///
/// `Debug` is derived (available whenever `S: Debug`) so the wrapper can be
/// inspected like the other public types in this module.
#[derive(Debug)]
pub struct SseStream<S> {
    /// The wrapped event stream.
    inner: S,
}
289
290impl<S> SseStream<S> {
291 pub fn new(stream: S) -> Self {
293 Self { inner: stream }
294 }
295}
296
297impl<S> Stream for SseStream<S>
298where
299 S: Stream<Item = SseEvent> + Unpin,
300{
301 type Item = Vec<u8>;
302
303 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
304 match Pin::new(&mut self.inner).poll_next(cx) {
305 Poll::Ready(Some(event)) => Poll::Ready(Some(event.to_bytes())),
306 Poll::Ready(None) => Poll::Ready(None),
307 Poll::Pending => Poll::Pending,
308 }
309 }
310}
311
/// Keep-alive settings for an SSE response.
#[derive(Debug, Clone)]
pub struct SseConfig {
    /// Keep-alive interval in seconds (`disable_keep_alive` sets it to `0`).
    pub keep_alive_secs: u64,
    /// Comment text configured for keep-alive messages.
    pub keep_alive_comment: String,
}
320
321impl Default for SseConfig {
322 fn default() -> Self {
323 Self {
324 keep_alive_secs: 30,
325 keep_alive_comment: "keep-alive".to_string(),
326 }
327 }
328}
329
330impl SseConfig {
331 #[must_use]
333 pub fn new() -> Self {
334 Self::default()
335 }
336
337 #[must_use]
339 pub fn keep_alive_secs(mut self, seconds: u64) -> Self {
340 self.keep_alive_secs = seconds;
341 self
342 }
343
344 #[must_use]
346 pub fn disable_keep_alive(mut self) -> Self {
347 self.keep_alive_secs = 0;
348 self
349 }
350
351 #[must_use]
353 pub fn keep_alive_comment(mut self, comment: impl Into<String>) -> Self {
354 self.keep_alive_comment = comment.into();
355 self
356 }
357}
358
359pub struct SseResponse<S> {
378 stream: S,
379 _config: SseConfig,
380}
381
382impl<S> SseResponse<S>
383where
384 S: Stream<Item = SseEvent> + Send + Unpin + 'static,
385{
386 pub fn new(stream: S) -> Self {
388 Self {
389 stream,
390 _config: SseConfig::default(),
391 }
392 }
393
394 pub fn with_config(stream: S, config: SseConfig) -> Self {
396 Self {
397 stream,
398 _config: config,
399 }
400 }
401
402 #[must_use]
409 pub fn into_response(self) -> Response {
410 let sse_stream = SseStream::new(self.stream);
411
412 Response::with_status(StatusCode::OK)
413 .header("content-type", b"text/event-stream".to_vec())
414 .header("cache-control", b"no-cache".to_vec())
415 .header("connection", b"keep-alive".to_vec())
416 .header("x-accel-buffering", b"no".to_vec()) .body(ResponseBody::stream(sse_stream))
418 }
419}
420
421pub fn sse_response<S>(stream: S) -> Response
436where
437 S: Stream<Item = SseEvent> + Send + Unpin + 'static,
438{
439 SseResponse::new(stream).into_response()
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `event` and returns the wire text (lossily decoded) so the
    /// tests can make substring assertions on it.
    fn wire(event: &SseEvent) -> String {
        String::from_utf8_lossy(&event.to_bytes()).into_owned()
    }

    #[test]
    fn event_simple_message() {
        let output = wire(&SseEvent::message("Hello, World!"));
        assert!(output.contains("data: Hello, World!\n"));
        assert!(output.ends_with("\n\n"));
    }

    #[test]
    fn event_with_type() {
        let output = wire(&SseEvent::new("user joined").event_type("join"));
        assert!(output.contains("event: join\n"));
        assert!(output.contains("data: user joined\n"));
    }

    #[test]
    fn event_with_id() {
        let output = wire(&SseEvent::new("data").id("12345"));
        assert!(output.contains("id: 12345\n"));
    }

    #[test]
    fn event_with_retry() {
        let output = wire(&SseEvent::new("data").retry_ms(5000));
        assert!(output.contains("retry: 5000\n"));
    }

    #[test]
    fn event_multiline_data() {
        let output = wire(&SseEvent::new("line1\nline2\nline3"));
        assert!(output.contains("data: line1\n"));
        assert!(output.contains("data: line2\n"));
        assert!(output.contains("data: line3\n"));
    }

    #[test]
    fn event_comment() {
        let output = wire(&SseEvent::comment("keep-alive"));
        assert!(output.contains(": keep-alive\n"));
    }

    #[test]
    fn event_full_format() {
        let event = SseEvent::new("payload")
            .event_type("update")
            .id("42")
            .retry_ms(3000);
        let output = wire(&event);

        // Fields must appear in the order event, id, retry, data.
        let event_pos = output.find("event:").unwrap();
        let id_pos = output.find("id:").unwrap();
        let retry_pos = output.find("retry:").unwrap();
        let data_pos = output.find("data:").unwrap();

        assert!(event_pos < id_pos);
        assert!(id_pos < retry_pos);
        assert!(retry_pos < data_pos);
    }

    #[test]
    fn event_from_str() {
        let event: SseEvent = "Hello".into();
        assert!(wire(&event).contains("data: Hello\n"));
    }

    #[test]
    fn event_from_string() {
        let event: SseEvent = String::from("World").into();
        assert!(wire(&event).contains("data: World\n"));
    }

    #[test]
    fn config_defaults() {
        let config = SseConfig::default();
        assert_eq!(config.keep_alive_secs, 30);
        assert_eq!(config.keep_alive_comment, "keep-alive");
    }

    #[test]
    fn config_custom() {
        let config = SseConfig::new()
            .keep_alive_secs(60)
            .keep_alive_comment("heartbeat");
        assert_eq!(config.keep_alive_secs, 60);
        assert_eq!(config.keep_alive_comment, "heartbeat");
    }

    #[test]
    fn config_disable_keepalive() {
        let config = SseConfig::new().disable_keep_alive();
        assert_eq!(config.keep_alive_secs, 0);
    }

    #[test]
    fn event_empty_data() {
        let output = wire(&SseEvent::new(""));
        assert!(output.contains("data: \n"));
    }

    #[test]
    fn retry_from_duration() {
        let output = wire(&SseEvent::new("data").retry(Duration::from_secs(10)));
        assert!(output.contains("retry: 10000\n"));
    }
}