1#![doc = include_str!("../README.md")]
2
3mod cancel_on_drop;
4mod format;
5mod html;
6mod html_stream;
7mod resp_adapter;
8
9#[cfg(test)]
10mod tests;
11
12use std::{
13 any::Any,
14 fmt,
15 panic::AssertUnwindSafe,
16 sync::{
17 Arc,
18 atomic::{AtomicUsize, Ordering},
19 },
20};
21
22use futures::FutureExt;
23pub use html::Html;
24use tokio::sync::mpsc;
25use tokio_util::sync::CancellationToken;
26use tracing::{Instrument, Span, debug, instrument, trace, warn};
27
28pub use self::format::GLOBAL_SCRIPT_CONTENTS;
29use self::{cancel_on_drop::CancelOnDrop, html_stream::HtmlStream};
30
31type Id = usize;
32
33pub fn new() -> (SuspenseContext, SuspendedResponse) {
36 new_with_opts(ColumboOptions::default())
37}
38
39#[instrument(name = "columbo::new", skip_all)]
43pub fn new_with_opts(
44 options: ColumboOptions,
45) -> (SuspenseContext, SuspendedResponse) {
46 let (tx, rx) = mpsc::unbounded_channel();
47 let cancel = CancellationToken::new();
48 let opts = Arc::new(options);
49
50 debug!("created new suspense context and response");
51 (
52 SuspenseContext {
53 next_id: Arc::new(AtomicUsize::new(0)),
54 tx,
55 opts: opts.clone(),
56 cancel: cancel.clone(),
57 },
58 SuspendedResponse {
59 rx,
60 opts,
61 cancel: CancelOnDrop::new(cancel),
62 },
63 )
64}
65
66#[derive(Clone)]
68pub struct SuspenseContext {
69 next_id: Arc<AtomicUsize>,
70 tx: mpsc::UnboundedSender<Html>,
71 opts: Arc<ColumboOptions>,
72 cancel: CancellationToken,
73}
74
75impl SuspenseContext {
76 fn new_id(&self) -> Id {
77 self.next_id.fetch_add(1, Ordering::Relaxed)
79 }
80
81 #[instrument(name = "columbo::suspend", skip_all, fields(suspense.id))]
93 pub fn suspend<Fut, M>(
94 &self,
95 fut: Fut,
96 placeholder: impl Into<Html>,
97 ) -> Suspense
98 where
99 Fut: Future<Output = M> + Send + 'static,
100 M: Into<Html> + 'static,
101 {
102 let id = self.new_id();
103 Span::current().record("suspense.id", id.to_string());
104
105 tokio::spawn(
106 self
107 .clone()
108 .run_suspended(id, fut)
109 .instrument(tracing::info_span!("columbo::suspended_task")),
110 );
111
112 Suspense::new(id, placeholder.into())
113 }
114
115 pub async fn cancelled(&self) { self.cancel.cancelled().await; }
121
122 pub fn is_cancelled(&self) -> bool { self.cancel.is_cancelled() }
129
130 async fn run_suspended<Fut, M>(self, id: Id, future: Fut)
131 where
132 Fut: Future<Output = M> + Send + 'static,
133 M: Into<Html>,
134 {
135 let auto_cancel = self.opts.auto_cancel.unwrap_or(false);
136
137 let future = AssertUnwindSafe(future).catch_unwind();
139 let result = if auto_cancel {
141 tokio::select! {
142 _ = self.cancel.cancelled() => {
143 trace!(suspense.id = %id, "task exited via auto_cancel");
144 return; }
146 result = future => result,
147 }
148 } else {
149 future.await
150 };
151
152 let panic_handler = self
154 .opts
155 .panic_renderer
156 .unwrap_or(crate::format::default_panic_renderer);
157 let content: Html = match result {
158 Ok(m) => m.into(),
159 Err(panic_payload) => {
160 warn!(suspense.id = %id, "suspended task panicked; rendering panic");
161 panic_handler(panic_payload)
162 }
163 };
164
165 let payload = format::render_replacement(&id, &content);
167
168 let _ = self.tx.send(payload).inspect_err(|_| {
169 trace!(suspense.id = %id, "future completed but receiver is dropped");
170 });
171 }
172}
173
174#[derive(Clone, Debug, Default)]
176pub struct ColumboOptions {
177 pub panic_renderer: Option<fn(Box<dyn Any + Send>) -> Html>,
179 pub auto_cancel: Option<bool>,
182 pub include_script: Option<bool>,
186}
187
188pub struct SuspendedResponse {
191 rx: mpsc::UnboundedReceiver<Html>,
192 opts: Arc<ColumboOptions>,
193 cancel: CancelOnDrop,
194}
195
196impl SuspendedResponse {
197 #[instrument(name = "columbo::into_stream", skip_all)]
199 pub fn into_stream(self, body: impl Into<Html>) -> HtmlStream {
200 debug!("converting suspended response into stream");
201 HtmlStream::new(self, body.into())
202 }
203}
204
205pub struct Suspense {
207 id: Id,
208 placeholder: Html,
209}
210
211impl fmt::Debug for Suspense {
212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213 f.debug_struct("Suspense").field("id", &self.id).finish()
214 }
215}
216
217impl Suspense {
218 fn new(id: Id, placeholder: Html) -> Self { Suspense { id, placeholder } }
219
220 pub fn render_to_html(&self) -> Html {
222 format::render_placeholder(&self.id, &self.placeholder)
223 }
224}
225
226impl fmt::Display for Suspense {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 f.write_str(self.render_to_html().as_str())
229 }
230}
231
232#[cfg(feature = "maud")]
233impl maud::Render for Suspense {
234 fn render(&self) -> maud::Markup {
235 maud::PreEscaped(self.render_to_html().into_string())
236 }
237}