use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use bytes::Bytes;
use futures_core::Stream;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
/// Error produced by a suspense future; wraps a human-readable message.
///
/// Implements [`std::fmt::Debug`] and [`std::error::Error`] so it can be unwrapped,
/// logged with `{:?}`, and converted into `Box<dyn std::error::Error>` via `?`.
#[derive(Debug)]
pub struct StreamError(pub String);

impl std::fmt::Display for StreamError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(&self.0)
    }
}

// No underlying source error is tracked, so the trait's default methods suffice.
impl std::error::Error for StreamError {}
/// One piece of a streamed HTML document.
pub enum HtmlChunk {
/// Bytes that are forwarded to the output stream unchanged.
Static(Bytes),
/// A placeholder slot whose final markup is produced later by `future`.
Suspense {
/// Identifier used to build the `S:{slot_id}` slot id and the `T:{slot_id}` template id.
slot_id: u64,
/// Markup rendered inside the slot up front; it is also used as the replacement body
/// when `future` resolves to an error.
fallback_html: Bytes,
/// Resolves to the HTML that replaces the fallback, or to an error.
future: Pin<Box<dyn Future<Output = Result<String, StreamError>> + Send>>,
},
}
/// Boxed future that hands back the slot id and fallback markup together with the
/// suspense result, so the code draining the futures can build the replacement chunk
/// without any side table.
type PendingFuture =
Pin<Box<dyn Future<Output = (u64, Bytes, Result<String, StreamError>)> + Send>>;
/// A suspense boundary that can be registered on a [`RenderStreamContext`] and is
/// resolved after the body HTML has been sent.
pub struct PendingSuspense {
/// Identifier used in the `S:{slot_id}` / `T:{slot_id}` ids of the replacement chunk.
pub slot_id: u64,
/// Markup used as the replacement body when `future` resolves to an error.
pub fallback_html: Bytes,
/// Resolves to the HTML for this slot, or to an error.
pub future: Pin<Box<dyn Future<Output = Result<String, StreamError>> + Send>>,
}
/// State collected while a body is being built: a slot id counter plus the suspense
/// boundaries registered so far.
pub struct RenderStreamContext {
// Most recently issued slot id; starts at 0, so the first id handed out is 1.
next_slot_id: u64,
// Suspense boundaries registered so far, in registration order.
pending: Vec<PendingSuspense>,
}
impl RenderStreamContext {
    /// Creates an empty context: no slot ids issued and nothing registered.
    fn new() -> Self {
        let pending = Vec::new();
        Self {
            next_slot_id: 0,
            pending,
        }
    }

    /// Issues the next slot id. Ids start at 1 and grow by one per call.
    pub fn next_slot_id(&mut self) -> u64 {
        let issued = self.next_slot_id + 1;
        self.next_slot_id = issued;
        issued
    }

    /// Appends a suspense boundary to the list of pending ones.
    pub fn register(&mut self, pending: PendingSuspense) {
        let queue = &mut self.pending;
        queue.push(pending);
    }

    /// Removes and returns every registered boundary, leaving the list empty.
    fn take_pending(&mut self) -> Vec<PendingSuspense> {
        let Self { pending, .. } = self;
        std::mem::take(pending)
    }
}
// Per-thread slot holding the context that is currently collecting suspense
// registrations. It is `None` initially; `render_streaming_with` installs a context
// around its `build_body` call and then puts the previous value back.
thread_local! {
static RENDER_STREAM_CONTEXT: RefCell<Option<Rc<RefCell<RenderStreamContext>>>> =
const { RefCell::new(None) };
}
/// Runs `callback` with mutable access to this thread's active render context and
/// returns whatever the callback returns.
///
/// # Panics
///
/// Panics with `"RenderStreamContext not active"` when no context is installed on the
/// current thread. It also panics if the context is already mutably borrowed, which
/// happens when `with_active` is called again from inside `callback`.
pub fn with_active<R>(callback: impl FnOnce(&mut RenderStreamContext) -> R) -> R {
    // Clone the shared handle out first so the thread-local slot itself is no longer
    // borrowed while the callback runs.
    let shared = RENDER_STREAM_CONTEXT
        .with(|slot| slot.borrow().clone())
        .expect("RenderStreamContext not active");
    let mut guard = shared.borrow_mut();
    callback(&mut guard)
}
/// Reports whether a render context is currently installed on this thread.
pub fn is_active() -> bool {
    RENDER_STREAM_CONTEXT.with(|slot| {
        let installed = slot.borrow();
        installed.is_some()
    })
}
/// Turns an ordered list of HTML chunks into a byte stream.
///
/// This is a thin public entry point: all of the work is delegated to `async_gen`.
pub fn render_streaming(
chunks: Vec<HtmlChunk>,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send {
async_gen(chunks)
}
pub fn render_streaming_with(
build_body: impl FnOnce() -> String,
tail: Bytes,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send {
let context = Rc::new(RefCell::new(RenderStreamContext::new()));
let body_html = RENDER_STREAM_CONTEXT.with(|cell| {
let previous = cell.borrow_mut().replace(context.clone());
let result = build_body();
*cell.borrow_mut() = previous;
result
});
let pending = context.borrow_mut().take_pending();
rsx_stream(Bytes::from(body_html), pending, tail)
}
/// Streams `body_html`, then `tail`, then one replacement chunk per pending suspense
/// boundary in the order the boundaries resolve.
///
/// The work runs on a task started with `tokio::spawn`, which feeds an unbounded
/// channel; the returned stream is the receiving end of that channel. Each replacement
/// chunk is a `<template id="T:{slot_id}">` followed by a script that calls
/// `$ISLANDS_REPLACE("S:{slot_id}", "T:{slot_id}")`. When a suspense future fails, the
/// error is logged and the fallback markup is used as the template body.
///
/// A failed send means the receiving stream was dropped, so the task stops right away
/// and the unresolved futures are dropped instead of being driven to completion for
/// nobody.
///
/// NOTE(review): `tokio::spawn` needs an active Tokio runtime; confirm every caller
/// runs inside one.
fn rsx_stream(
    body_html: Bytes,
    pending: Vec<PendingSuspense>,
    tail: Bytes,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send {
    let (sender, receiver) =
        tokio::sync::mpsc::unbounded_channel::<Result<Bytes, std::io::Error>>();
    tokio::spawn(async move {
        if sender.send(Ok(body_html)).is_err() || sender.send(Ok(tail)).is_err() {
            return;
        }

        // Carry the slot id and fallback alongside each result so the loop below can
        // build the chunk from the future's output alone.
        let mut futures: FuturesUnordered<PendingFuture> = FuturesUnordered::new();
        for PendingSuspense {
            slot_id,
            fallback_html,
            future,
        } in pending
        {
            futures.push(Box::pin(async move {
                let result = future.await;
                (slot_id, fallback_html, result)
            }));
        }

        while let Some((slot_id, fallback_html, result)) = futures.next().await {
            let body = match result {
                Ok(html) => html,
                Err(error) => {
                    tracing::error!(slot_id, error = %error, "suspense future failed; keeping fallback");
                    // Fallback bytes that are not valid UTF-8 yield an empty body.
                    std::str::from_utf8(&fallback_html).unwrap_or("").to_owned()
                }
            };
            let chunk = Bytes::from(format!(
                "<template id=\"T:{slot_id}\">{body}</template>\
                 <script>$ISLANDS_REPLACE(\"S:{slot_id}\",\"T:{slot_id}\")</script>"
            ));
            if sender.send(Ok(chunk)).is_err() {
                return;
            }
        }
    });
    tokio_stream::wrappers::UnboundedReceiverStream::new(receiver)
}
/// Streams `chunks` in order, then one replacement chunk per suspense boundary in the
/// order the boundaries resolve.
///
/// Static chunks are forwarded unchanged. A suspense chunk is first emitted as
/// `<div data-suspense-slot id="S:{slot_id}">` wrapping its fallback markup, and its
/// future is queued. Once every input chunk has been sent, the queued futures are
/// awaited concurrently; each one produces a `<template id="T:{slot_id}">` followed by
/// a script that calls `$ISLANDS_REPLACE("S:{slot_id}", "T:{slot_id}")`. When a future
/// fails, the error is logged and the fallback markup is used as the template body.
///
/// The work runs on a task started with `tokio::spawn`, which feeds an unbounded
/// channel; the returned stream is the receiving end of that channel. A failed send
/// means the receiving stream was dropped, so the task stops right away and the
/// unresolved futures are dropped instead of being driven to completion for nobody.
///
/// NOTE(review): `tokio::spawn` needs an active Tokio runtime; confirm every caller
/// runs inside one.
fn async_gen(
    chunks: Vec<HtmlChunk>,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send {
    let (sender, receiver) =
        tokio::sync::mpsc::unbounded_channel::<Result<Bytes, std::io::Error>>();
    tokio::spawn(async move {
        let mut pending: FuturesUnordered<PendingFuture> = FuturesUnordered::new();
        for chunk in chunks {
            let outgoing = match chunk {
                HtmlChunk::Static(bytes) => bytes,
                HtmlChunk::Suspense {
                    slot_id,
                    fallback_html,
                    future,
                } => {
                    // Fallback bytes that are not valid UTF-8 render as an empty slot.
                    let slot_html = Bytes::from(format!(
                        "<div data-suspense-slot id=\"S:{slot_id}\">{}</div>",
                        std::str::from_utf8(&fallback_html).unwrap_or("")
                    ));
                    // Queued futures are not polled until the drain loop below.
                    pending.push(Box::pin(async move {
                        let result = future.await;
                        (slot_id, fallback_html, result)
                    }));
                    slot_html
                }
            };
            if sender.send(Ok(outgoing)).is_err() {
                return;
            }
        }

        while let Some((slot_id, fallback_html, result)) = pending.next().await {
            let body = match result {
                Ok(html) => html,
                Err(error) => {
                    tracing::error!(slot_id, error = %error, "suspense future failed; keeping fallback");
                    std::str::from_utf8(&fallback_html).unwrap_or("").to_owned()
                }
            };
            let chunk = Bytes::from(format!(
                "<template id=\"T:{slot_id}\">{body}</template>\
                 <script>$ISLANDS_REPLACE(\"S:{slot_id}\",\"T:{slot_id}\")</script>"
            ));
            if sender.send(Ok(chunk)).is_err() {
                return;
            }
        }
    });
    tokio_stream::wrappers::UnboundedReceiverStream::new(receiver)
}