use std::{
fmt::Display,
io::{self, Write},
sync::{
mpsc::{self, Sender},
Arc, Barrier,
},
thread,
};
use crate::{internal::common::ForceLock, Request};
/// Handle used to push server-sent events to a single client.
///
/// Messages are queued on a channel; the handle itself never touches the socket.
#[derive(Debug)]
pub struct ServerSentEventStream {
    /// Sending half of the channel that carries queued messages.
    stream: Sender<EventType>,
    /// Value of the request's `Last-Event-ID` header, when it was present and
    /// parsed as a `u32`.
    pub last_index: Option<u32>,
}
/// A single server-sent event: an optional id, an event type and a data payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// Optional event id, emitted as the `id:` field when set.
    id: Option<u32>,
    /// Event type, emitted as the `event:` field.
    event: String,
    /// Payload, emitted as one `data:` field per line.
    data: String,
}
/// Messages sent over the channel to the SSE worker thread.
enum EventType {
/// An event to be serialized and written to the client.
Event(Event),
/// A value to be written to the client as a `retry:` field.
SetRetry(u32),
/// Request to stop the worker; the barrier is waited on by both the caller
/// that requested the close and the worker itself.
Close(Arc<Barrier>),
}
impl ServerSentEventStream {
    /// Queues an event of type `event_type` carrying `data`, without an id.
    ///
    /// Delivery is best-effort: if the worker has already stopped, the event
    /// is silently discarded.
    pub fn send(&self, event_type: impl AsRef<str>, data: impl Display) {
        self.send_event(Event::new(event_type).data(data));
    }

    /// Queues an event of type `event_type` carrying `data`, tagged with `id`.
    ///
    /// Delivery is best-effort, exactly like [`ServerSentEventStream::send`].
    pub fn send_id(&self, event_type: impl AsRef<str>, id: u32, data: impl Display) {
        self.send_event(Event::new(event_type).id(id).data(data));
    }

    /// Queues an already-built [`Event`].
    ///
    /// A failed send only means the worker is gone, so the error is ignored.
    pub fn send_event(&self, event: Event) {
        let _ = self.stream.send(event.into());
    }

    /// Queues a `retry:` field with the given value for the client.
    pub fn set_retry(&self, retry: u32) {
        let _ = self.stream.send(EventType::SetRetry(retry));
    }

    /// Asks the worker to stop and blocks until it has acknowledged.
    ///
    /// Everything queued before this call is written before the worker stops.
    /// If the worker is already gone (for example after an earlier `close`),
    /// this returns immediately instead of waiting for a rendezvous that can
    /// never happen.
    ///
    /// Calling `close` concurrently from several threads is not coordinated:
    /// only one request is acknowledged, so other callers may block.
    pub fn close(&self) {
        let barrier = Arc::new(Barrier::new(2));
        let request = EventType::Close(Arc::clone(&barrier));
        // Only rendezvous when the request was actually queued; a failed send
        // means the receiver was dropped and nobody would ever reach the barrier.
        if self.stream.send(request).is_ok() {
            barrier.wait();
        }
    }

    /// Starts an event stream on the socket of `this`.
    ///
    /// Reads the `Last-Event-ID` header (ignored unless it parses as a `u32`),
    /// writes the `text/event-stream` response head to the socket and spawns a
    /// thread named `"SSE worker"` that writes queued messages to the socket.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the response head cannot be written or if the
    /// worker thread cannot be spawned.
    pub fn from_request(this: &Request) -> io::Result<Self> {
        let last_index = this
            .headers
            .get("Last-Event-ID")
            .and_then(|id| id.parse::<u32>().ok());

        let socket = this.socket.clone();
        socket.force_lock().write_all(b"HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nCache-Control: no-cache\r\n\r\n")?;

        let (tx, rx) = mpsc::channel::<EventType>();
        // Spawning can fail (e.g. resource exhaustion); surface that to the
        // caller through the existing `io::Result` instead of panicking.
        thread::Builder::new()
            .name("SSE worker".to_owned())
            .spawn(move || {
                // Runs until a close request arrives or every sender is dropped.
                while let Ok(message) = rx.recv() {
                    match message {
                        EventType::Event(e) => {
                            let _ = socket.force_lock().write_all(e.to_string().as_bytes());
                        }
                        EventType::SetRetry(retry) => {
                            let _ = socket
                                .force_lock()
                                .write_all(format!("retry: {retry}\n\n").as_bytes());
                        }
                        EventType::Close(done) => {
                            // Drop the receiver before acknowledging so that any
                            // send issued after `close` returns is guaranteed to fail.
                            drop(rx);
                            done.wait();
                            break;
                        }
                    }
                }
            })?;

        Ok(Self {
            stream: tx,
            last_index,
        })
    }
}
impl Event {
    /// Creates an event of the given type with no id and an empty payload.
    pub fn new(event_type: impl AsRef<str>) -> Self {
        let event = String::from(event_type.as_ref());
        Self {
            id: None,
            event,
            data: String::new(),
        }
    }

    /// Returns the event with its id set to `id`, replacing any earlier id.
    pub fn id(self, id: u32) -> Self {
        Self {
            id: Some(id),
            ..self
        }
    }

    /// Returns the event with the `Display` rendering of `data` appended to
    /// its payload; repeated calls concatenate.
    pub fn data(mut self, data: impl Display) -> Self {
        let rendered = data.to_string();
        self.data.push_str(&rendered);
        self
    }
}
impl ToString for Event {
fn to_string(&self) -> String {
let mut out = String::new();
if let Some(id) = self.id {
out.push_str(&format!("id: {id}\n"));
}
let event = &self.event;
out.push_str(&format!("event: {event}\n"));
for i in self.data.split('\n') {
out.push_str(&format!("data: {i}\n"));
}
out.push('\n');
out
}
}
/// Extension trait that adds an `sse` method returning a [`ServerSentEventStream`].
pub trait ServerSentEventsExt {
/// Creates a [`ServerSentEventStream`] from `self`, or returns the I/O error
/// that prevented it.
fn sse(&self) -> io::Result<ServerSentEventStream>;
}
/// Lets a [`Request`] be turned into an event stream with `request.sse()`.
impl ServerSentEventsExt for Request {
/// Delegates to [`ServerSentEventStream::from_request`].
fn sse(&self) -> io::Result<ServerSentEventStream> {
ServerSentEventStream::from_request(self)
}
}
impl From<Event> for EventType {
fn from(event: Event) -> Self {
Self::Event(event)
}
}
#[cfg(test)]
mod test {
    use super::Event;

    /// Checks the serialized form of an event with default fields and of one
    /// with an id and a payload.
    #[test]
    fn test_sse_event_format() {
        let bare = Event::new("event").to_string();
        assert_eq!(bare, "event: event\ndata: \n\n");

        let with_id = Event::new("update").id(1).data("Hello").to_string();
        assert_eq!(with_id, "id: 1\nevent: update\ndata: Hello\n\n");
    }
}