use crate::{
html::attribute::any_attribute::AnyAttribute,
view::{Position, RenderHtml},
};
use futures::Stream;
use std::{
collections::VecDeque,
fmt::{Debug, Write},
future::Future,
mem,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
/// Incrementally builds a stream of HTML made of synchronous string pieces,
/// in-order async chunks, and out-of-order async chunks.
///
/// Synchronous output accumulates in `sync_buf` and is moved into `chunks`
/// whenever an async chunk is pushed or the chunks are taken.
#[derive(Default)]
pub struct StreamBuilder {
/// Synchronously rendered HTML that has not yet been turned into a chunk.
pub(crate) sync_buf: String,
/// Queue of chunks, in the order they were pushed.
pub(crate) chunks: VecDeque<StreamChunk>,
/// The in-order async chunk currently being awaited while streaming, if any.
pending: Option<ChunkFuture>,
/// Out-of-order chunk futures that have been reached while streaming but have
/// not resolved yet.
pending_ooo: VecDeque<PinnedFuture<OooChunk>>,
/// Path of numeric ids written into the `<!--s-…-->` chunk markers; when
/// `None`, no markers are written.
id: Option<Vec<u16>>,
}
/// A boxed, pinned, `Send` future resolving to `T`.
type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// A boxed future resolving to a queue of stream chunks.
type ChunkFuture = PinnedFuture<VecDeque<StreamChunk>>;
impl StreamBuilder {
pub fn new(id: Option<Vec<u16>>) -> Self {
Self::with_capacity(0, id)
}
pub fn with_capacity(capacity: usize, id: Option<Vec<u16>>) -> Self {
Self {
id,
sync_buf: String::with_capacity(capacity),
..Default::default()
}
}
pub fn reserve(&mut self, additional: usize) {
self.sync_buf.reserve(additional);
}
pub fn push_sync(&mut self, string: &str) {
self.sync_buf.push_str(string);
}
pub fn push_async(
&mut self,
fut: impl Future<Output = VecDeque<StreamChunk>> + Send + 'static,
) {
let sync = mem::take(&mut self.sync_buf);
if !sync.is_empty() {
self.chunks.push_back(StreamChunk::Sync(sync));
}
self.chunks.push_back(StreamChunk::Async {
chunks: Box::pin(fut) as PinnedFuture<VecDeque<StreamChunk>>,
});
}
pub fn with_buf(&mut self, fun: impl FnOnce(&mut String)) {
fun(&mut self.sync_buf)
}
pub fn take_chunks(&mut self) -> VecDeque<StreamChunk> {
let sync = mem::take(&mut self.sync_buf);
if !sync.is_empty() {
self.chunks.push_back(StreamChunk::Sync(sync));
}
mem::take(&mut self.chunks)
}
pub fn append(&mut self, mut other: StreamBuilder) {
if !self.sync_buf.is_empty() {
self.chunks
.push_back(StreamChunk::Sync(mem::take(&mut self.sync_buf)));
}
self.chunks.append(&mut other.chunks);
self.sync_buf.push_str(&other.sync_buf);
}
pub fn finish(mut self) -> Self {
let sync_buf_remaining = mem::take(&mut self.sync_buf);
if sync_buf_remaining.is_empty() {
return self;
} else if let Some(StreamChunk::Sync(buf)) = self.chunks.back_mut() {
buf.push_str(&sync_buf_remaining);
} else {
self.chunks.push_back(StreamChunk::Sync(sync_buf_remaining));
}
self
}
pub fn push_fallback<View>(
&mut self,
fallback: View,
position: &mut Position,
mark_branches: bool,
extra_attrs: Vec<AnyAttribute>,
) where
View: RenderHtml,
{
self.write_chunk_marker(true);
fallback.to_html_with_buf(
&mut self.sync_buf,
position,
true,
mark_branches,
extra_attrs,
);
self.write_chunk_marker(false);
*position = Position::NextChild;
}
pub fn next_id(&mut self) {
if let Some(last) = self.id.as_mut().and_then(|ids| ids.last_mut()) {
*last += 1;
}
}
pub fn clone_id(&self) -> Option<Vec<u16>> {
self.id.clone()
}
pub fn child_id(&self) -> Option<Vec<u16>> {
let mut child = self.id.clone();
if let Some(child) = child.as_mut() {
child.push(0);
}
child
}
pub fn write_chunk_marker(&mut self, opening: bool) {
if let Some(id) = &self.id {
self.sync_buf.reserve(11 + (id.len() * 2));
self.sync_buf.push_str("<!--s-");
for piece in id {
write!(&mut self.sync_buf, "{piece}-").unwrap();
}
if opening {
self.sync_buf.push_str("o-->");
} else {
self.sync_buf.push_str("c-->");
}
}
}
pub fn push_async_out_of_order<View>(
&mut self,
view: impl Future<Output = Option<View>> + Send + 'static,
position: &mut Position,
mark_branches: bool,
extra_attrs: Vec<AnyAttribute>,
) where
View: RenderHtml,
{
self.push_async_out_of_order_with_nonce(
view,
position,
mark_branches,
None,
extra_attrs,
);
}
pub fn push_async_out_of_order_with_nonce<View>(
&mut self,
view: impl Future<Output = Option<View>> + Send + 'static,
position: &mut Position,
mark_branches: bool,
nonce: Option<Arc<str>>,
extra_attrs: Vec<AnyAttribute>,
) where
View: RenderHtml,
{
let id = self.clone_id();
let mut position = *position;
self.chunks.push_back(StreamChunk::OutOfOrder {
chunks: Box::pin(async move {
let view = view.await;
let mut subbuilder = StreamBuilder::new(id);
let mut id = String::new();
if let Some(ids) = &subbuilder.id {
for piece in ids {
write!(&mut id, "{piece}-").unwrap();
}
}
if let Some(id) = subbuilder.id.as_mut() {
id.push(0);
}
let replace = view.is_some();
view.to_html_async_with_buf::<true>(
&mut subbuilder,
&mut position,
true,
mark_branches,
extra_attrs,
);
let chunks = subbuilder.finish().take_chunks();
let mut flattened_chunks =
VecDeque::with_capacity(chunks.len());
for chunk in chunks {
if let StreamChunk::Async { chunks } = chunk {
flattened_chunks.extend(chunks.await);
} else {
flattened_chunks.push_back(chunk);
}
}
OooChunk {
id,
chunks: flattened_chunks,
replace,
nonce,
}
}),
});
}
}
impl Debug for StreamBuilder {
    /// Formats the builder for diagnostics.
    ///
    /// The futures held by the builder cannot be printed, so `pending` is
    /// shown as whether an in-order future is being awaited and
    /// `pending_ooo` as the number of outstanding out-of-order futures.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Use the type's real name so Debug output matches the struct.
        f.debug_struct("StreamBuilder")
            .field("sync_buf", &self.sync_buf)
            .field("chunks", &self.chunks)
            .field("pending", &self.pending.is_some())
            .field("pending_ooo", &self.pending_ooo.len())
            .field("id", &self.id)
            .finish()
    }
}
/// One unit of output queued in a `StreamBuilder`.
pub enum StreamChunk {
/// HTML that is already rendered.
Sync(String),
/// A future resolving to further chunks, which are spliced into the queue
/// at this chunk's position.
Async {
/// The future producing the replacement chunks.
chunks: PinnedFuture<VecDeque<StreamChunk>>,
},
/// A future resolving to an [`OooChunk`]; it is set aside while streaming so
/// that later chunks are not held up behind it.
OutOfOrder {
/// The future producing the out-of-order chunk.
chunks: PinnedFuture<OooChunk>,
},
}
/// The resolved content of an out-of-order chunk.
#[derive(Debug)]
pub struct OooChunk {
/// Marker id string: each id segment followed by `-` (empty when the
/// builder had no id).
id: String,
/// The chunks rendered for this out-of-order section.
chunks: VecDeque<StreamChunk>,
/// Whether the awaited view was `Some`; selects which script body
/// `push_end_with_nonce` emits.
replace: bool,
/// Optional nonce written onto the emitted `<script>` tag.
nonce: Option<Arc<str>>,
}
impl OooChunk {
    /// Writes the opening `<template>` tag for the chunk with the given id.
    ///
    /// The template's `id` attribute is `id` followed by `f`.
    pub fn push_start(id: &str, buf: &mut String) {
        buf.push_str("<template id=\"");
        buf.extend([id, "f"]);
        buf.push_str("\">");
    }

    /// Closes the template and writes the swap script, without a nonce.
    pub fn push_end(replace: bool, id: &str, buf: &mut String) {
        Self::push_end_with_nonce(replace, id, buf, None);
    }

    /// Closes the `<template>` and writes an inline script that locates the
    /// `s-{id}o` / `s-{id}c` comment markers in the document.
    ///
    /// When `replace` is true the script deletes the content between the
    /// markers, inserts a clone of the template's content before the closing
    /// marker, and removes the closing marker. Otherwise it only removes
    /// both markers. When `nonce` is given it is written verbatim as the
    /// script's `nonce` attribute.
    pub fn push_end_with_nonce(
        replace: bool,
        id: &str,
        buf: &mut String,
        nonce: Option<&str>,
    ) {
        // Finds both marker comments and builds a range between them.
        const LOCATE_MARKERS: &str =
            "\";let open = undefined;let close = undefined;let walker = \
             document.createTreeWalker(document.body, \
             NodeFilter.SHOW_COMMENT);while(walker.nextNode()) \
             {if(walker.currentNode.textContent == `s-${id}o`){ \
             open=walker.currentNode; } else \
             if(walker.currentNode.textContent == `s-${id}c`) { close = \
             walker.currentNode;}}let range = new Range(); \
             range.setStartBefore(open); range.setEndBefore(close);";
        // Replaces the marked range with the template's content.
        const SWAP_IN_TEMPLATE: &str = "range.deleteContents(); let tpl = \
             document.getElementById(`${id}f`); \
             close.parentNode.insertBefore(tpl.content.cloneNode(true), \
             close);close.remove();";
        // Keeps the existing content and only drops the markers.
        const DROP_MARKERS: &str = "close.remove();open.remove();";

        buf.push_str("</template>");
        match nonce {
            Some(nonce) => {
                buf.push_str("<script nonce=\"");
                buf.push_str(nonce);
                buf.push_str(r#"">(function() { let id = ""#);
            }
            None => buf.push_str(r#"<script>(function() { let id = ""#),
        }
        buf.push_str(id);
        buf.push_str(LOCATE_MARKERS);
        buf.push_str(if replace {
            SWAP_IN_TEMPLATE
        } else {
            DROP_MARKERS
        });
        buf.push_str("})()</script>");
    }

    /// Consumes the chunk and returns the chunks it contains.
    pub fn take_chunks(self) -> VecDeque<StreamChunk> {
        let Self { chunks, .. } = self;
        chunks
    }
}
impl Debug for StreamChunk {
    /// Prints `Sync` chunks with their HTML; the future-carrying variants
    /// are printed by name only, as `Async { .. }` and `OutOfOrder { .. }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let opaque_variant = match self {
            Self::Sync(html) => {
                return f.debug_tuple("Sync").field(html).finish()
            }
            Self::Async { .. } => "Async",
            Self::OutOfOrder { .. } => "OutOfOrder",
        };
        f.debug_struct(opaque_variant).finish_non_exhaustive()
    }
}
/// Streams the queued chunks as `String` pieces of HTML.
///
/// In-order async chunks are awaited before anything queued after them is
/// emitted. Out-of-order chunks are set aside in `pending_ooo` and only polled
/// once the chunk queue is empty.
impl Stream for StreamBuilder {
type Item = String;
/// Produces the next piece of HTML, `Poll::Pending` while waiting on a
/// future with nothing buffered to emit, or `None` once the chunk queue, the
/// out-of-order futures and the synchronous buffer are all exhausted.
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut();
// An in-order async chunk in flight blocks everything queued behind it.
let pending = this.pending.take();
if let Some(mut pending) = pending {
match pending.as_mut().poll(cx) {
Poll::Pending => {
// Not resolved yet: put the future back for the next poll.
this.pending = Some(pending);
Poll::Pending
}
Poll::Ready(chunks) => {
// Pushing to the front while iterating in reverse keeps the resolved
// chunks in their original order, ahead of the rest of the queue.
for chunk in chunks.into_iter().rev() {
this.chunks.push_front(chunk);
}
self.poll_next(cx)
}
}
} else {
let next_chunk = this.chunks.pop_front();
match next_chunk {
None => {
if this.pending_ooo.is_empty() {
// Nothing queued and nothing outstanding: emit whatever is still
// buffered, or end the stream.
if this.sync_buf.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(mem::take(&mut this.sync_buf)))
}
} else {
// Poll every outstanding out-of-order future once; the ones that are
// still pending are re-queued below.
for mut chunk in mem::take(&mut this.pending_ooo) {
match chunk.as_mut().poll(cx) {
Poll::Ready(OooChunk {
id,
chunks,
replace,
nonce,
}) => {
let opening = format!("<!--s-{id}o-->");
let placeholder_at =
this.sync_buf.find(&opening);
// If the opening marker for this chunk is still in the unsent buffer,
// the marked region is replaced in place instead of emitting a
// template and script.
if let Some(start) = placeholder_at {
let closing = format!("<!--s-{id}c-->");
// NOTE(review): panics if the opening marker is buffered but the
// closing marker is not — confirm that cannot happen.
let end = this
.sync_buf
.find(&closing)
.unwrap();
let chunks_iter =
chunks.into_iter().rev();
// Split the buffer into the text before the opening marker and the
// text after the closing marker; everything in between is dropped.
let (before, replaced) =
this.sync_buf.split_at(start);
let (_, after) = replaced.split_at(
end - start + closing.len(),
);
let mut buf = String::new();
buf.push_str(before);
let mut held_chunks = VecDeque::new();
// NOTE(review): the chunks are visited in reverse, so when there is
// more than one `Sync` chunk their text is appended in reverse of the
// original order — confirm this is intended.
for chunk in chunks_iter {
if let StreamChunk::Sync(ready) =
chunk
{
buf.push_str(&ready);
} else {
held_chunks.push_front(chunk);
}
}
buf.push_str(after);
this.sync_buf = buf;
// Non-`Sync` chunks go back onto the front of the queue, one by one.
// NOTE(review): `held_chunks` is in original order here, so pushing
// each to the front leaves them reversed in the queue — confirm.
for chunk in held_chunks {
this.chunks.push_front(chunk);
}
} else {
// No marker in the unsent buffer: emit the content inside a
// `<template>` followed by the script that moves it into place.
OooChunk::push_start(
&id,
&mut this.sync_buf,
);
for chunk in chunks.into_iter().rev() {
if let StreamChunk::Sync(ready) =
chunk
{
this.sync_buf.push_str(&ready);
} else {
this.chunks.push_front(chunk);
}
}
OooChunk::push_end_with_nonce(
replace,
&id,
&mut this.sync_buf,
nonce.as_deref(),
);
}
}
Poll::Pending => {
this.pending_ooo.push_back(chunk);
}
}
}
// Emit whatever the resolved chunks produced; otherwise keep waiting.
if this.sync_buf.is_empty() {
Poll::Pending
} else {
Poll::Ready(Some(mem::take(&mut this.sync_buf)))
}
}
}
Some(StreamChunk::Sync(value)) => {
this.sync_buf.push_str(&value);
// Coalesce the run of `Sync` chunks that follows, stopping at the first
// async chunk of either kind.
loop {
match this.chunks.pop_front() {
None => break,
Some(StreamChunk::Async { chunks }) => {
// Put it back; the recursive call below handles it.
this.chunks
.push_front(StreamChunk::Async { chunks });
break;
}
Some(StreamChunk::OutOfOrder {
chunks, ..
}) => {
this.pending_ooo.push_back(chunks);
break;
}
Some(StreamChunk::Sync(next)) => {
this.sync_buf.push_str(&next);
}
}
}
this.poll_next(cx)
}
Some(StreamChunk::Async { chunks, .. }) => {
// Start awaiting this chunk, but first emit anything already buffered.
this.pending = Some(chunks);
if this.sync_buf.is_empty() {
self.poll_next(cx)
} else {
Poll::Ready(Some(mem::take(&mut this.sync_buf)))
}
}
Some(StreamChunk::OutOfOrder { chunks, .. }) => {
// Set the future aside and carry on; buffered output is emitted first.
this.pending_ooo.push_back(chunks);
if this.sync_buf.is_empty() {
self.poll_next(cx)
} else {
Poll::Ready(Some(mem::take(&mut this.sync_buf)))
}
}
}
}
}
}