Skip to main content

columbo/
lib.rs

1#![doc = include_str!("../README.md")]
2
3mod cancel_on_drop;
4mod format;
5mod html;
6mod html_stream;
7mod resp_adapter;
8
9#[cfg(test)]
10mod tests;
11
12use std::{any::Any, fmt, panic::AssertUnwindSafe, sync::Arc};
13
14use futures::FutureExt;
15pub use html::Html;
16use tokio::sync::mpsc;
17use tokio_util::sync::CancellationToken;
18use tracing::{Instrument, Span, debug, instrument, trace, warn};
19
20pub use self::format::GLOBAL_SCRIPT_CONTENTS;
21use self::{cancel_on_drop::CancelOnDrop, html_stream::HtmlStream};
22
23type Id = ulid::Ulid;
24
25/// Creates a new [`SuspenseContext`] and [`SuspendedResponse`]. The context is
26/// for suspending futures, and the response turns into an output stream.
27pub fn new() -> (SuspenseContext, SuspendedResponse) {
28  new_with_opts(ColumboOptions::default())
29}
30
31/// Creates a new [`SuspenseContext`] and [`SuspendedResponse`], with the given
32/// [`ColumboOptions`]. The context is for suspending futures, and the response
33/// turns into an output stream.
34#[instrument(name = "columbo::new", skip_all)]
35pub fn new_with_opts(
36  options: ColumboOptions,
37) -> (SuspenseContext, SuspendedResponse) {
38  let (tx, rx) = mpsc::unbounded_channel();
39  let cancel = CancellationToken::new();
40  let opts = Arc::new(options);
41
42  debug!("created new suspense context and response");
43  (
44    SuspenseContext {
45      tx,
46      opts: opts.clone(),
47      cancel: cancel.clone(),
48    },
49    SuspendedResponse {
50      rx,
51      opts,
52      cancel: CancelOnDrop::new(cancel),
53    },
54  )
55}
56
57/// The context with which you can create suspense boundaries for futures.
58#[derive(Clone)]
59pub struct SuspenseContext {
60  tx:     mpsc::UnboundedSender<Html>,
61  opts:   Arc<ColumboOptions>,
62  cancel: CancellationToken,
63}
64
65impl SuspenseContext {
66  fn new_id() -> Id { ulid::Ulid::new() }
67
68  /// Suspends async work and streams the result. The placeholder is sent
69  /// immediately, while the future output is streamed and replaces the
70  /// placeholder in the browser.
71  ///
72  /// The future can return any type that implements [`Into<Html>`], including
73  /// `String`, `&str`, or types like `maud::Markup`.
74  ///
75  /// To spawn nested suspensions from within the future, clone [`self`] before
76  /// the `async` block and capture it by move.
77  ///
78  /// Suspended futures must be `Send` because they are handed to `tokio`.
79  #[instrument(name = "columbo::suspend", skip_all, fields(suspense.id))]
80  pub fn suspend<Fut, M>(
81    &self,
82    fut: Fut,
83    placeholder: impl Into<Html>,
84  ) -> Suspense
85  where
86    Fut: Future<Output = M> + Send + 'static,
87    M: Into<Html> + 'static,
88  {
89    let id = Self::new_id();
90    Span::current().record("suspense.id", id.to_string());
91
92    tokio::spawn(
93      self
94        .clone()
95        .run_suspended(id, fut)
96        .instrument(tracing::info_span!("columbo::suspended_task")),
97    );
98
99    Suspense::new(id, placeholder.into())
100  }
101
102  /// Yields if [`SuspendedResponse`] or the resulting stream type is dropped.
103  ///
104  /// Useful for exiting from suspended futures that should stop if the
105  /// connection is dropped. Suspended futures are not aborted otherwise, so
106  /// they will continue to execute if you don't listen for cancellation.
107  pub async fn cancelled(&self) { self.cancel.cancelled().await; }
108
109  /// Returns true if [`SuspendedResponse`] or the resulting stream type is
110  /// dropped.
111  ///
112  /// Useful for exiting from suspended futures that should stop if the
113  /// connection is dropped. Suspended futures are not aborted otherwise, so
114  /// they will continue to execute if you don't listen for cancellation.
115  pub fn is_cancelled(&self) -> bool { self.cancel.is_cancelled() }
116
117  async fn run_suspended<Fut, M>(self, id: Id, future: Fut)
118  where
119    Fut: Future<Output = M> + Send + 'static,
120    M: Into<Html>,
121  {
122    let auto_cancel = self.opts.auto_cancel.unwrap_or(false);
123
124    // catch panics in future
125    let future = AssertUnwindSafe(future).catch_unwind();
126    // race the future against the cancellation token
127    let result = if auto_cancel {
128      tokio::select! {
129        _ = self.cancel.cancelled() => {
130          trace!(suspense.id = %id, "task exited via auto_cancel");
131          return; // exit immediately; nothing to send
132        }
133        result = future => result,
134      }
135    } else {
136      future.await
137    };
138
139    // determine what to swap in
140    let panic_handler = self
141      .opts
142      .panic_renderer
143      .unwrap_or(crate::format::default_panic_renderer);
144    let content: Html = match result {
145      Ok(m) => m.into(),
146      Err(panic_payload) => {
147        warn!(suspense.id = %id, "suspended task panicked; rendering panic");
148        panic_handler(panic_payload)
149      }
150    };
151
152    // render the wrapper
153    let payload = format::render_replacement(&id, &content);
154
155    let _ = self.tx.send(payload).inspect_err(|_| {
156      trace!(suspense.id = %id, "future completed but receiver is dropped");
157    });
158  }
159}
160
161/// Options for configuring `columbo` suspense.
162#[derive(Clone, Debug, Default)]
163pub struct ColumboOptions {
164  /// Renders a panic fallback given the panic object.
165  pub panic_renderer: Option<fn(Box<dyn Any + Send>) -> Html>,
166  /// Whether to automatically cancel suspended futures at the next await bound
167  /// when the response is dropped. Defaults to false.
168  pub auto_cancel:    Option<bool>,
169  /// Whether to include the replacement script in the stream. If true, it will
170  /// be included after the document and before the replacements. Defaults to
171  /// true.
172  pub include_script: Option<bool>,
173}
174
175/// Contains suspended results. Can be turned into a byte stream with a
176/// prepended document.
177pub struct SuspendedResponse {
178  rx:     mpsc::UnboundedReceiver<Html>,
179  opts:   Arc<ColumboOptions>,
180  cancel: CancelOnDrop,
181}
182
183impl SuspendedResponse {
184  /// Turns the `SuspendedResponse` into a stream for sending as a response.
185  #[instrument(name = "columbo::into_stream", skip_all)]
186  pub fn into_stream(self, body: impl Into<Html>) -> HtmlStream {
187    debug!("converting suspended response into stream");
188    HtmlStream::new(self, body.into())
189  }
190}
191
192/// A suspended future. Can be interpolated into markup as the placeholder.
193pub struct Suspense {
194  id:          Id,
195  placeholder: Html,
196}
197
198impl fmt::Debug for Suspense {
199  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200    f.debug_struct("Suspense").field("id", &self.id).finish()
201  }
202}
203
204impl Suspense {
205  fn new(id: Id, placeholder: Html) -> Self { Suspense { id, placeholder } }
206
207  /// Render the placeholder HTML.
208  pub fn render_to_html(&self) -> Html {
209    format::render_placeholder(&self.id, &self.placeholder)
210  }
211}
212
213impl fmt::Display for Suspense {
214  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215    f.write_str(self.render_to_html().as_str())
216  }
217}
218
219#[cfg(feature = "maud")]
220impl maud::Render for Suspense {
221  fn render(&self) -> maud::Markup {
222    maud::PreEscaped(self.render_to_html().into_string())
223  }
224}