#![doc = include_str!("../README.md")]
mod cancel_on_drop;
mod format;
mod html;
mod html_stream;
mod resp_adapter;
#[cfg(test)]
mod tests;
use std::{any::Any, fmt, panic::AssertUnwindSafe, sync::Arc};
use futures::FutureExt;
pub use html::Html;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Span, debug, instrument, trace, warn};
pub use self::format::GLOBAL_SCRIPT_CONTENTS;
use self::{cancel_on_drop::CancelOnDrop, html_stream::HtmlStream};
type Id = ulid::Ulid;
pub fn new() -> (SuspenseContext, SuspendedResponse) {
new_with_opts(ColumboOptions::default())
}
#[instrument(name = "columbo::new", skip_all)]
pub fn new_with_opts(
options: ColumboOptions,
) -> (SuspenseContext, SuspendedResponse) {
let (tx, rx) = mpsc::unbounded_channel();
let cancel = CancellationToken::new();
let opts = Arc::new(options);
debug!("created new suspense context and response");
(
SuspenseContext {
tx,
opts: opts.clone(),
cancel: cancel.clone(),
},
SuspendedResponse {
rx,
opts,
cancel: CancelOnDrop::new(cancel),
},
)
}
#[derive(Clone)]
pub struct SuspenseContext {
tx: mpsc::UnboundedSender<Html>,
opts: Arc<ColumboOptions>,
cancel: CancellationToken,
}
impl SuspenseContext {
fn new_id() -> Id { ulid::Ulid::new() }
#[instrument(name = "columbo::suspend", skip_all, fields(suspense.id))]
pub fn suspend<Fut, M>(
&self,
fut: Fut,
placeholder: impl Into<Html>,
) -> Suspense
where
Fut: Future<Output = M> + Send + 'static,
M: Into<Html> + 'static,
{
let id = Self::new_id();
Span::current().record("suspense.id", id.to_string());
tokio::spawn(
self
.clone()
.run_suspended(id, fut)
.instrument(tracing::info_span!("columbo::suspended_task")),
);
Suspense::new(id, placeholder.into())
}
pub async fn cancelled(&self) { self.cancel.cancelled().await; }
pub fn is_cancelled(&self) -> bool { self.cancel.is_cancelled() }
async fn run_suspended<Fut, M>(self, id: Id, future: Fut)
where
Fut: Future<Output = M> + Send + 'static,
M: Into<Html>,
{
let auto_cancel = self.opts.auto_cancel.unwrap_or(false);
let future = AssertUnwindSafe(future).catch_unwind();
let result = if auto_cancel {
tokio::select! {
_ = self.cancel.cancelled() => {
trace!(suspense.id = %id, "task exited via auto_cancel");
return; }
result = future => result,
}
} else {
future.await
};
let panic_handler = self
.opts
.panic_renderer
.unwrap_or(crate::format::default_panic_renderer);
let content: Html = match result {
Ok(m) => m.into(),
Err(panic_payload) => {
warn!(suspense.id = %id, "suspended task panicked; rendering panic");
panic_handler(panic_payload)
}
};
let payload = format::render_replacement(&id, &content);
let _ = self.tx.send(payload).inspect_err(|_| {
trace!(suspense.id = %id, "future completed but receiver is dropped");
});
}
}
#[derive(Clone, Debug, Default)]
pub struct ColumboOptions {
pub panic_renderer: Option<fn(Box<dyn Any + Send>) -> Html>,
pub auto_cancel: Option<bool>,
pub include_script: Option<bool>,
}
pub struct SuspendedResponse {
rx: mpsc::UnboundedReceiver<Html>,
opts: Arc<ColumboOptions>,
cancel: CancelOnDrop,
}
impl SuspendedResponse {
#[instrument(name = "columbo::into_stream", skip_all)]
pub fn into_stream(self, body: impl Into<Html>) -> HtmlStream {
debug!("converting suspended response into stream");
HtmlStream::new(self, body.into())
}
}
pub struct Suspense {
id: Id,
placeholder: Html,
}
impl fmt::Debug for Suspense {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Suspense").field("id", &self.id).finish()
}
}
impl Suspense {
fn new(id: Id, placeholder: Html) -> Self { Suspense { id, placeholder } }
pub fn render_to_html(&self) -> Html {
format::render_placeholder(&self.id, &self.placeholder)
}
}
impl fmt::Display for Suspense {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.render_to_html().as_str())
}
}
#[cfg(feature = "maud")]
impl maud::Render for Suspense {
fn render(&self) -> maud::Markup {
maud::PreEscaped(self.render_to_html().into_string())
}
}