use std::{convert::Infallible, fmt::Display};
use axum::response::{
IntoResponse, Response, Sse,
sse::{self, KeepAlive},
};
use futures::StreamExt;
mod js_script;
mod patch_elements;
mod patch_signals;
pub use js_script::JsScript;
pub use patch_elements::{PatchElements, PatchElementsMode, ViewTransition};
pub use patch_signals::PatchSignals;
/// Receiving half of the event channel created by [`events`].
///
/// Wraps a Tokio unbounded receiver of axum SSE events. The `IntoResponse`
/// implementation in this module turns it into a streaming SSE response.
pub struct EventReceiver(tokio::sync::mpsc::UnboundedReceiver<sse::Event>);
/// Creates a connected sender/receiver pair backed by an unbounded Tokio
/// channel of axum SSE events.
///
/// The returned [`EventSender`] pushes events into the channel; the returned
/// [`EventReceiver`] owns the receiving end.
pub fn events() -> (EventSender, EventReceiver) {
    let (sender_half, receiver_half) = tokio::sync::mpsc::unbounded_channel();
    let sender = EventSender { tx: sender_half };
    let receiver = EventReceiver(receiver_half);
    (sender, receiver)
}
impl IntoResponse for EventReceiver {
    /// Converts the receiver into an SSE response that streams every event
    /// arriving on the channel, with axum's default keep-alive settings.
    fn into_response(self) -> Response {
        let Self(receiver) = self;

        // `Sse` expects a stream of `Result`s; pulling from the channel cannot
        // fail, so every item is wrapped in `Ok` with an `Infallible` error.
        let events = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver)
            .map(Ok::<sse::Event, Infallible>);

        let sse = Sse::new(events).keep_alive(KeepAlive::default());
        sse.into_response()
    }
}
/// Newtype around axum's `sse::Event`.
///
/// This is the conversion target required by `EventSender::send`
/// (`T: TryInto<Event>`). The inner event is visible to the parent module only.
pub struct Event(pub(super) sse::Event);
/// Errors reported by this module.
#[derive(Debug)]
pub enum Error {
    /// Pushing an event into the channel failed.
    ///
    /// Produced by `EventSender::send` when the underlying channel send
    /// returns an error.
    ReceiverHang,
    /// A signal patch was rejected; the payload is a human-readable reason.
    // NOTE(review): not constructed in this file — presumably produced by the
    // `patch_signals` conversion; confirm there.
    InvalidSignalPatch(String),
}

impl Display for Error {
    /// Writes the human-readable message for the error variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ReceiverHang => f.write_str("receiver hang"),
            Self::InvalidSignalPatch(reason) => {
                write!(f, "invalid signal patch: {reason}")
            }
        }
    }
}

impl std::error::Error for Error {}

/// Lets infallible conversions satisfy an `Error: From<T::Error>` bound.
impl From<Infallible> for Error {
    fn from(never: Infallible) -> Self {
        // `Infallible` has no values, so this match has no arms to write.
        match never {}
    }
}
#[derive(Debug, Clone)]
pub struct EventSender {
tx: tokio::sync::mpsc::UnboundedSender<sse::Event>,
}
impl EventSender {
pub fn send<T>(&self, ev: T) -> Result<(), Error>
where
T: TryInto<Event>,
Error: From<T::Error>,
{
let ev = ev.try_into().map_err(Error::from)?;
self.tx.send(ev.0).map_err(|_| Error::ReceiverHang)
}
}
/// Normalizes line endings in `data` so that no carriage return remains:
/// each `\r\n` pair and each lone `\r` becomes a single `\n`.
// NOTE(review): presumably needed because axum's SSE event builder rejects
// carriage returns in data — confirm against the call sites.
fn sanitize_axum_sse_data(data: String) -> String {
    let mut normalized = String::with_capacity(data.len());
    let mut chars = data.chars().peekable();

    while let Some(ch) = chars.next() {
        if ch != '\r' {
            normalized.push(ch);
            continue;
        }
        // A CR followed by LF collapses into one newline: swallow the LF.
        chars.next_if_eq(&'\n');
        normalized.push('\n');
    }

    normalized
}
// NOTE(review): these look like the SSE event-type names used for the
// Datastar "patch elements" and "patch signals" events; they are not used in
// this part of the file — presumably consumed by the `patch_elements` and
// `patch_signals` submodules. Confirm there.
const DATASTAR_PATCH_ELEMENTS: &str = "datastar-patch-elements";
const DATASTAR_PATCH_SIGNALS: &str = "datastar-patch-signals";
/// Test helper: builds an SSE response whose stream is fed with `event`.
///
/// A fresh channel is created, `event` is sent from a spawned Tokio task, and
/// the receiving half is returned as a response. The sender is moved into the
/// task, so it is dropped once the task finishes.
///
/// # Panics
///
/// The spawned task panics if sending `event` returns an error.
#[cfg(test)]
fn sse_response<E>(event: E) -> Response
where
    E: TryInto<Event> + Send + 'static,
    Error: From<E::Error>,
{
    let (sender, receiver) = events();

    let deliver = async move {
        sender
            .send(event)
            .expect("event receiver should still be open");
    };
    tokio::spawn(deliver);

    receiver.into_response()
}
/// Test helper: sends `event` through a fresh SSE response and returns the
/// response body as read by `read_axum_body`.
///
/// # Panics
///
/// Panics if the response status is not `200 OK`, if the `content-type`
/// header is missing, or if it is not `text/event-stream`.
#[cfg(test)]
async fn read_sse_body<E>(event: E) -> String
where
    E: TryInto<Event> + Send + 'static,
    Error: From<E::Error>,
{
    use crate::test_utils::read_axum_body;

    let response = sse_response(event);
    assert_eq!(response.status(), axum::http::StatusCode::OK);

    // Check the header before the response is consumed by the body reader.
    let content_type = response
        .headers()
        .get("content-type")
        .expect("stream response should set content-type header");
    assert_eq!(content_type, "text/event-stream");

    read_axum_body(response).await
}