Skip to main content

columbo/
lib.rs

1#![doc = include_str!("../README.md")]
2
3mod cancel_on_drop;
4mod format;
5mod html;
6mod html_stream;
7mod resp_adapter;
8
9#[cfg(test)]
10mod tests;
11
12use std::{
13  any::Any,
14  fmt,
15  panic::AssertUnwindSafe,
16  sync::{
17    Arc,
18    atomic::{AtomicUsize, Ordering},
19  },
20};
21
22use futures::FutureExt;
23pub use html::Html;
24use tokio::sync::mpsc;
25use tokio_util::sync::CancellationToken;
26use tracing::{Instrument, Span, debug, instrument, trace, warn};
27
28pub use self::format::GLOBAL_SCRIPT_CONTENTS;
29use self::{cancel_on_drop::CancelOnDrop, html_stream::HtmlStream};
30
31type Id = usize;
32
33/// Creates a new [`SuspenseContext`] and [`SuspendedResponse`]. The context is
34/// for suspending futures, and the response turns into an output stream.
35pub fn new() -> (SuspenseContext, SuspendedResponse) {
36  new_with_opts(ColumboOptions::default())
37}
38
39/// Creates a new [`SuspenseContext`] and [`SuspendedResponse`], with the given
40/// [`ColumboOptions`]. The context is for suspending futures, and the response
41/// turns into an output stream.
42#[instrument(name = "columbo::new", skip_all)]
43pub fn new_with_opts(
44  options: ColumboOptions,
45) -> (SuspenseContext, SuspendedResponse) {
46  let (tx, rx) = mpsc::unbounded_channel();
47  let cancel = CancellationToken::new();
48  let opts = Arc::new(options);
49
50  debug!("created new suspense context and response");
51  (
52    SuspenseContext {
53      next_id: Arc::new(AtomicUsize::new(0)),
54      tx,
55      opts: opts.clone(),
56      cancel: cancel.clone(),
57    },
58    SuspendedResponse {
59      rx,
60      opts,
61      cancel: CancelOnDrop::new(cancel),
62    },
63  )
64}
65
66/// The context with which you can create suspense boundaries for futures.
67#[derive(Clone)]
68pub struct SuspenseContext {
69  next_id: Arc<AtomicUsize>,
70  tx:      mpsc::UnboundedSender<Html>,
71  opts:    Arc<ColumboOptions>,
72  cancel:  CancellationToken,
73}
74
75impl SuspenseContext {
76  fn new_id(&self) -> Id {
77    // IDs don't need to be sequential, only unique
78    self.next_id.fetch_add(1, Ordering::Relaxed)
79  }
80
81  /// Suspends async work and streams the result. The placeholder is sent
82  /// immediately, while the future output is streamed and replaces the
83  /// placeholder in the browser.
84  ///
85  /// The future can return any type that implements [`Into<Html>`], including
86  /// `String`, `&str`, or types like `maud::Markup`.
87  ///
88  /// To spawn nested suspensions from within the future, clone [`self`] before
89  /// the `async` block and capture it by move.
90  ///
91  /// Suspended futures must be `Send` because they are handed to `tokio`.
92  #[instrument(name = "columbo::suspend", skip_all, fields(suspense.id))]
93  pub fn suspend<Fut, M>(
94    &self,
95    fut: Fut,
96    placeholder: impl Into<Html>,
97  ) -> Suspense
98  where
99    Fut: Future<Output = M> + Send + 'static,
100    M: Into<Html> + 'static,
101  {
102    let id = self.new_id();
103    Span::current().record("suspense.id", id.to_string());
104
105    tokio::spawn(
106      self
107        .clone()
108        .run_suspended(id, fut)
109        .instrument(tracing::info_span!("columbo::suspended_task")),
110    );
111
112    Suspense::new(id, placeholder.into())
113  }
114
115  /// Yields if [`SuspendedResponse`] or the resulting stream type is dropped.
116  ///
117  /// Useful for exiting from suspended futures that should stop if the
118  /// connection is dropped. Suspended futures are not aborted otherwise, so
119  /// they will continue to execute if you don't listen for cancellation.
120  pub async fn cancelled(&self) { self.cancel.cancelled().await; }
121
122  /// Returns true if [`SuspendedResponse`] or the resulting stream type is
123  /// dropped.
124  ///
125  /// Useful for exiting from suspended futures that should stop if the
126  /// connection is dropped. Suspended futures are not aborted otherwise, so
127  /// they will continue to execute if you don't listen for cancellation.
128  pub fn is_cancelled(&self) -> bool { self.cancel.is_cancelled() }
129
130  async fn run_suspended<Fut, M>(self, id: Id, future: Fut)
131  where
132    Fut: Future<Output = M> + Send + 'static,
133    M: Into<Html>,
134  {
135    let auto_cancel = self.opts.auto_cancel.unwrap_or(false);
136
137    // catch panics in future
138    let future = AssertUnwindSafe(future).catch_unwind();
139    // race the future against the cancellation token
140    let result = if auto_cancel {
141      tokio::select! {
142        _ = self.cancel.cancelled() => {
143          trace!(suspense.id = %id, "task exited via auto_cancel");
144          return; // exit immediately; nothing to send
145        }
146        result = future => result,
147      }
148    } else {
149      future.await
150    };
151
152    // determine what to swap in
153    let panic_handler = self
154      .opts
155      .panic_renderer
156      .unwrap_or(crate::format::default_panic_renderer);
157    let content: Html = match result {
158      Ok(m) => m.into(),
159      Err(panic_payload) => {
160        warn!(suspense.id = %id, "suspended task panicked; rendering panic");
161        panic_handler(panic_payload)
162      }
163    };
164
165    // render the wrapper
166    let payload = format::render_replacement(&id, &content);
167
168    let _ = self.tx.send(payload).inspect_err(|_| {
169      trace!(suspense.id = %id, "future completed but receiver is dropped");
170    });
171  }
172}
173
174/// Options for configuring `columbo` suspense.
175#[derive(Clone, Debug, Default)]
176pub struct ColumboOptions {
177  /// Renders a panic fallback given the panic object.
178  pub panic_renderer: Option<fn(Box<dyn Any + Send>) -> Html>,
179  /// Whether to automatically cancel suspended futures at the next await bound
180  /// when the response is dropped. Defaults to false.
181  pub auto_cancel:    Option<bool>,
182  /// Whether to include the replacement script in the stream. If true, it will
183  /// be included after the document and before the replacements. Defaults to
184  /// true.
185  pub include_script: Option<bool>,
186}
187
188/// Contains suspended results. Can be turned into a byte stream with a
189/// prepended document.
190pub struct SuspendedResponse {
191  rx:     mpsc::UnboundedReceiver<Html>,
192  opts:   Arc<ColumboOptions>,
193  cancel: CancelOnDrop,
194}
195
196impl SuspendedResponse {
197  /// Turns the `SuspendedResponse` into a stream for sending as a response.
198  #[instrument(name = "columbo::into_stream", skip_all)]
199  pub fn into_stream(self, body: impl Into<Html>) -> HtmlStream {
200    debug!("converting suspended response into stream");
201    HtmlStream::new(self, body.into())
202  }
203}
204
205/// A suspended future. Can be interpolated into markup as the placeholder.
206pub struct Suspense {
207  id:          Id,
208  placeholder: Html,
209}
210
211impl fmt::Debug for Suspense {
212  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213    f.debug_struct("Suspense").field("id", &self.id).finish()
214  }
215}
216
217impl Suspense {
218  fn new(id: Id, placeholder: Html) -> Self { Suspense { id, placeholder } }
219
220  /// Render the placeholder HTML.
221  pub fn render_to_html(&self) -> Html {
222    format::render_placeholder(&self.id, &self.placeholder)
223  }
224}
225
226impl fmt::Display for Suspense {
227  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228    f.write_str(self.render_to_html().as_str())
229  }
230}
231
#[cfg(feature = "maud")]
impl maud::Render for Suspense {
  /// Renders the placeholder as pre-escaped markup so `maud` inserts it
  /// without escaping it again.
  fn render(&self) -> maud::Markup {
    let rendered = self.render_to_html().into_string();
    maud::PreEscaped(rendered)
  }
}
237}