1#![doc = include_str!("../README.md")]
2
3mod cancel_on_drop;
4mod format;
5mod html;
6mod html_stream;
7mod resp_adapter;
8
9#[cfg(test)]
10mod tests;
11
12use std::{any::Any, fmt, panic::AssertUnwindSafe, sync::Arc};
13
14use futures::FutureExt;
15pub use html::Html;
16use tokio::sync::mpsc;
17use tokio_util::sync::CancellationToken;
18use tracing::{Instrument, Span, debug, instrument, trace, warn};
19
20pub use self::format::GLOBAL_SCRIPT_CONTENTS;
21use self::{cancel_on_drop::CancelOnDrop, html_stream::HtmlStream};
22
23type Id = ulid::Ulid;
24
25pub fn new() -> (SuspenseContext, SuspendedResponse) {
28 new_with_opts(ColumboOptions::default())
29}
30
31#[instrument(name = "columbo::new", skip_all)]
35pub fn new_with_opts(
36 options: ColumboOptions,
37) -> (SuspenseContext, SuspendedResponse) {
38 let (tx, rx) = mpsc::unbounded_channel();
39 let cancel = CancellationToken::new();
40 let opts = Arc::new(options);
41
42 debug!("created new suspense context and response");
43 (
44 SuspenseContext {
45 tx,
46 opts: opts.clone(),
47 cancel: cancel.clone(),
48 },
49 SuspendedResponse {
50 rx,
51 opts,
52 cancel: CancelOnDrop::new(cancel),
53 },
54 )
55}
56
57#[derive(Clone)]
59pub struct SuspenseContext {
60 tx: mpsc::UnboundedSender<Html>,
61 opts: Arc<ColumboOptions>,
62 cancel: CancellationToken,
63}
64
65impl SuspenseContext {
66 fn new_id() -> Id { ulid::Ulid::new() }
67
68 #[instrument(name = "columbo::suspend", skip_all, fields(suspense.id))]
80 pub fn suspend<Fut, M>(
81 &self,
82 fut: Fut,
83 placeholder: impl Into<Html>,
84 ) -> Suspense
85 where
86 Fut: Future<Output = M> + Send + 'static,
87 M: Into<Html> + 'static,
88 {
89 let id = Self::new_id();
90 Span::current().record("suspense.id", id.to_string());
91
92 tokio::spawn(
93 self
94 .clone()
95 .run_suspended(id, fut)
96 .instrument(tracing::info_span!("columbo::suspended_task")),
97 );
98
99 Suspense::new(id, placeholder.into())
100 }
101
102 pub async fn cancelled(&self) { self.cancel.cancelled().await; }
108
109 pub fn is_cancelled(&self) -> bool { self.cancel.is_cancelled() }
116
117 async fn run_suspended<Fut, M>(self, id: Id, future: Fut)
118 where
119 Fut: Future<Output = M> + Send + 'static,
120 M: Into<Html>,
121 {
122 let auto_cancel = self.opts.auto_cancel.unwrap_or(false);
123
124 let future = AssertUnwindSafe(future).catch_unwind();
126 let result = if auto_cancel {
128 tokio::select! {
129 _ = self.cancel.cancelled() => {
130 trace!(suspense.id = %id, "task exited via auto_cancel");
131 return; }
133 result = future => result,
134 }
135 } else {
136 future.await
137 };
138
139 let panic_handler = self
141 .opts
142 .panic_renderer
143 .unwrap_or(crate::format::default_panic_renderer);
144 let content: Html = match result {
145 Ok(m) => m.into(),
146 Err(panic_payload) => {
147 warn!(suspense.id = %id, "suspended task panicked; rendering panic");
148 panic_handler(panic_payload)
149 }
150 };
151
152 let payload = format::render_replacement(&id, &content);
154
155 let _ = self.tx.send(payload).inspect_err(|_| {
156 trace!(suspense.id = %id, "future completed but receiver is dropped");
157 });
158 }
159}
160
161#[derive(Clone, Debug, Default)]
163pub struct ColumboOptions {
164 pub panic_renderer: Option<fn(Box<dyn Any + Send>) -> Html>,
166 pub auto_cancel: Option<bool>,
169 pub include_script: Option<bool>,
173}
174
175pub struct SuspendedResponse {
178 rx: mpsc::UnboundedReceiver<Html>,
179 opts: Arc<ColumboOptions>,
180 cancel: CancelOnDrop,
181}
182
183impl SuspendedResponse {
184 #[instrument(name = "columbo::into_stream", skip_all)]
186 pub fn into_stream(self, body: impl Into<Html>) -> HtmlStream {
187 debug!("converting suspended response into stream");
188 HtmlStream::new(self, body.into())
189 }
190}
191
192pub struct Suspense {
194 id: Id,
195 placeholder: Html,
196}
197
198impl fmt::Debug for Suspense {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 f.debug_struct("Suspense").field("id", &self.id).finish()
201 }
202}
203
204impl Suspense {
205 fn new(id: Id, placeholder: Html) -> Self { Suspense { id, placeholder } }
206
207 pub fn render_to_html(&self) -> Html {
209 format::render_placeholder(&self.id, &self.placeholder)
210 }
211}
212
213impl fmt::Display for Suspense {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 f.write_str(self.render_to_html().as_str())
216 }
217}
218
219#[cfg(feature = "maud")]
220impl maud::Render for Suspense {
221 fn render(&self) -> maud::Markup {
222 maud::PreEscaped(self.render_to_html().into_string())
223 }
224}