islands-core 0.1.2

Server-side SSR primitives for islands.rs: island markers, the page shell, the asset manifest, and streaming Suspense.
Documentation
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;

use bytes::Bytes;
use futures_core::Stream;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;

pub struct StreamError(pub String);

impl std::fmt::Display for StreamError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(&self.0)
    }
}

pub enum HtmlChunk {
    Static(Bytes),
    Suspense {
        slot_id: u64,
        fallback_html: Bytes,
        future: Pin<Box<dyn Future<Output = Result<String, StreamError>> + Send>>,
    },
}

type PendingFuture =
    Pin<Box<dyn Future<Output = (u64, Bytes, Result<String, StreamError>)> + Send>>;

/// A pending Suspense boundary registered through the rsx `<Suspense>` component.
///
/// Carries enough state to flush as an `HtmlChunk::Suspense` after the body
/// closure completes.
pub struct PendingSuspense {
    pub slot_id: u64,
    pub fallback_html: Bytes,
    pub future: Pin<Box<dyn Future<Output = Result<String, StreamError>> + Send>>,
}

/// Task-local state held by `render_streaming_with` (and any future helper that
/// composes Suspense boundaries from rsx-rendered HTML).
///
/// `<Suspense>` reads this through `with_active(...)` to allocate a slot ID and
/// register its future. Reading the context while it is not active panics with
/// the exact message `RenderStreamContext not active` — see AC-S4.
pub struct RenderStreamContext {
    next_slot_id: u64,
    pending: Vec<PendingSuspense>,
}

impl RenderStreamContext {
    fn new() -> Self {
        Self {
            next_slot_id: 0,
            pending: Vec::new(),
        }
    }

    /// Allocate a fresh slot ID. IDs are 1-based to match the existing
    /// `S:1`/`T:1` markers emitted by `render_streaming`.
    pub fn next_slot_id(&mut self) -> u64 {
        self.next_slot_id += 1;
        self.next_slot_id
    }

    pub fn register(&mut self, pending: PendingSuspense) {
        self.pending.push(pending);
    }

    fn take_pending(&mut self) -> Vec<PendingSuspense> {
        std::mem::take(&mut self.pending)
    }
}

thread_local! {
    static RENDER_STREAM_CONTEXT: RefCell<Option<Rc<RefCell<RenderStreamContext>>>> =
        const { RefCell::new(None) };
}

/// Invoke `callback` with mutable access to the active `RenderStreamContext`.
///
/// Panics with `RenderStreamContext not active` if no context has been
/// established (i.e. the caller is not running inside `render_streaming_with`).
/// This is the AC-S4 panic mechanism.
pub fn with_active<R>(callback: impl FnOnce(&mut RenderStreamContext) -> R) -> R {
    RENDER_STREAM_CONTEXT.with(|cell| {
        let context = cell
            .borrow()
            .as_ref()
            .cloned()
            .expect("RenderStreamContext not active");
        let mut state = context.borrow_mut();
        callback(&mut state)
    })
}

/// Returns true when a `RenderStreamContext` is currently established on this
/// thread. Useful for testing and for components that want to fall back to a
/// non-streaming path when not inside `render_streaming_with`.
pub fn is_active() -> bool {
    RENDER_STREAM_CONTEXT.with(|cell| cell.borrow().is_some())
}

/// Streams HTML chunks, resolving Suspense boundaries asynchronously.
///
/// Algorithm:
/// 1. Walk chunks linearly. Static → yield immediately. Suspense → emit the
///    slot placeholder, push the future to FuturesUnordered.
/// 2. After all chunks walked, drain FuturesUnordered. Each resolved future
///    emits a `<template>` + inline replace script. Errors preserve the fallback
///    and are logged via `tracing::error!`.
pub fn render_streaming(
    chunks: Vec<HtmlChunk>,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send {
    async_gen(chunks)
}

/// Streams HTML chunks produced by `build_body`, allowing rsx-side
/// `<Suspense>` components to register their futures through the task-local
/// `RenderStreamContext`.
///
/// The callback receives no arguments and returns the head + body HTML to emit
/// before the Suspense boundaries are drained. Any `<Suspense>` rendered inside
/// the callback installs a slot placeholder in the returned string AND
/// registers its future for later resolution.
///
/// `tail` is appended after the body and before the Suspense drain (typical
/// usage: `</body></html>` minus any prefixes already in the body).
pub fn render_streaming_with(
    build_body: impl FnOnce() -> String,
    tail: Bytes,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send {
    let context = Rc::new(RefCell::new(RenderStreamContext::new()));
    let body_html = RENDER_STREAM_CONTEXT.with(|cell| {
        let previous = cell.borrow_mut().replace(context.clone());
        let result = build_body();
        *cell.borrow_mut() = previous;
        result
    });
    let pending = context.borrow_mut().take_pending();
    rsx_stream(Bytes::from(body_html), pending, tail)
}

/// Stream a pre-rendered body (whose Suspense placeholders are already
/// inlined via `Suspense::render`) followed by the resolution of each pending
/// future as `<template>` + replace script chunks.
///
/// Distinct from `async_gen` (which is keyed by `HtmlChunk::Suspense` and
/// re-emits each placeholder slot): callers of `render_streaming_with` have
/// already embedded the slot HTML in `body_html` via `Suspense::render()`,
/// so this path emits ONLY the future-resolution side of each boundary.
fn rsx_stream(
    body_html: Bytes,
    pending: Vec<PendingSuspense>,
    tail: Bytes,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send {
    let (sender, receiver) =
        tokio::sync::mpsc::unbounded_channel::<Result<Bytes, std::io::Error>>();

    tokio::spawn(async move {
        let _ = sender.send(Ok(body_html));
        let _ = sender.send(Ok(tail));

        let mut futures: FuturesUnordered<PendingFuture> = FuturesUnordered::new();
        for PendingSuspense {
            slot_id,
            fallback_html,
            future,
        } in pending
        {
            futures.push(Box::pin(async move {
                let result = future.await;
                (slot_id, fallback_html, result)
            }));
        }

        while let Some((slot_id, fallback_html, result)) = futures.next().await {
            let body = match result {
                Ok(html) => html,
                Err(error) => {
                    tracing::error!(slot_id, error = %error, "suspense future failed; keeping fallback");
                    std::str::from_utf8(&fallback_html).unwrap_or("").to_owned()
                }
            };
            let chunk = Bytes::from(format!(
                "<template id=\"T:{slot_id}\">{body}</template>\
<script>$ISLANDS_REPLACE(\"S:{slot_id}\",\"T:{slot_id}\")</script>"
            ));
            let _ = sender.send(Ok(chunk));
        }
    });

    tokio_stream::wrappers::UnboundedReceiverStream::new(receiver)
}

fn async_gen(
    chunks: Vec<HtmlChunk>,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send {
    // Phase 1: walk static chunks, collecting Suspense slots into pending.
    // Phase 2: drain pending futures, emitting template+script per slot.
    // We implement this as a hand-rolled stream using poll_fn over an async
    // block that yields into a channel.

    let (sender, receiver) =
        tokio::sync::mpsc::unbounded_channel::<Result<Bytes, std::io::Error>>();

    tokio::spawn(async move {
        let mut pending: FuturesUnordered<PendingFuture> = FuturesUnordered::new();

        for chunk in chunks {
            match chunk {
                HtmlChunk::Static(bytes) => {
                    let _ = sender.send(Ok(bytes));
                }
                HtmlChunk::Suspense {
                    slot_id,
                    fallback_html,
                    future,
                } => {
                    let slot_html = Bytes::from(format!(
                        "<div data-suspense-slot id=\"S:{slot_id}\">{}</div>",
                        std::str::from_utf8(&fallback_html).unwrap_or("")
                    ));
                    let _ = sender.send(Ok(slot_html));
                    let future_pending: PendingFuture = Box::pin(async move {
                        let result = future.await;
                        (slot_id, fallback_html, result)
                    });
                    pending.push(future_pending);
                }
            }
        }

        // Drain all pending suspense futures.
        while let Some((slot_id, fallback_html, result)) = pending.next().await {
            let body = match result {
                Ok(html) => html,
                Err(error) => {
                    tracing::error!(slot_id, error = %error, "suspense future failed; keeping fallback");
                    std::str::from_utf8(&fallback_html).unwrap_or("").to_owned()
                }
            };
            let chunk = Bytes::from(format!(
                "<template id=\"T:{slot_id}\">{body}</template>\
<script>$ISLANDS_REPLACE(\"S:{slot_id}\",\"T:{slot_id}\")</script>"
            ));
            let _ = sender.send(Ok(chunk));
        }
        // sender drops here, closing the channel.
    });

    tokio_stream::wrappers::UnboundedReceiverStream::new(receiver)
}