Skip to main content

dynomite/runtime/
sidejob.rs

1//! Bounded-mailbox actor wrapper, modelled on Riak's `sidejob` library.
2//!
3//! A [`Sidejob`] owns a tokio task that drains a fixed-capacity mpsc
4//! channel and runs a user-supplied async handler against each
5//! request. The capacity is the back-pressure knob: when callers
6//! [`Sidejob::submit`] requests faster than the handler can complete
7//! them, the channel saturates and new submissions return
8//! [`SidejobError::Overloaded`] without blocking the caller.
9//!
//! The handler runs serially inside the actor, so it sees one request
//! at a time. The future the handler returns for each request is
//! driven in its own `tokio::spawn`ed task, so a panic inside that
//! future kills only the per-request task: the actor loop keeps
//! draining the mailbox and subsequent submits succeed normally.
//! Callers waiting on a panicked request observe
//! [`SidejobError::Stopped`] when the reply channel is dropped.
17//!
18//! # Examples
19//!
20//! ```
21//! use dynomite::runtime::{Sidejob, SidejobError};
22//! # tokio::runtime::Builder::new_current_thread()
23//! #     .enable_all()
24//! #     .build()
25//! #     .unwrap()
26//! #     .block_on(async {
27//! let job: Sidejob<u32, u32> =
28//!     Sidejob::spawn("doubler", 4, |n| async move { n * 2 });
29//! assert_eq!(job.submit(21).await.unwrap(), 42);
30//! # let _: Result<u32, SidejobError> = job.submit(0).await;
31//! # });
32//! ```
33
34use std::future::Future;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37
38use thiserror::Error;
39use tokio::sync::{mpsc, oneshot};
40
41use crate::runtime::metrics;
42
43/// Reasons a submit may fail.
44#[derive(Debug, Error, PartialEq, Eq)]
45pub enum SidejobError {
46    /// The mailbox is full. Drop the request and let the caller
47    /// retry against another stage or report a 503-equivalent.
48    #[error("sidejob mailbox is full")]
49    Overloaded,
50    /// The actor task is no longer running, so no further requests
51    /// can be served. This also covers the case where a handler
52    /// panicked on a specific request and dropped the reply channel.
53    #[error("sidejob actor has stopped")]
54    Stopped,
55}
56
57/// Handle to a sidejob actor.
58///
59/// `Sidejob` is cheap to clone: clones share the same underlying
60/// channel and overload counter, so spreading the handle across
61/// dispatcher tasks does not change the back-pressure semantics.
62pub struct Sidejob<Req, Reply> {
63    /// Static name used as the `name` label on
64    /// `sidejob_overload_total`.
65    name: &'static str,
66    /// Sender half of the mailbox. The receiver lives inside the
67    /// spawned actor task. When all senders are dropped, the actor
68    /// loop exits cleanly.
69    tx: mpsc::Sender<(Req, oneshot::Sender<Reply>)>,
70    /// Process-local counter of `Overloaded` rejections. The
71    /// Prometheus counter is the cluster-wide rollup; this field
72    /// gives unit tests a deterministic value to assert against.
73    full_failures: Arc<AtomicU64>,
74}
75
76impl<Req, Reply> Clone for Sidejob<Req, Reply> {
77    fn clone(&self) -> Self {
78        Self {
79            name: self.name,
80            tx: self.tx.clone(),
81            full_failures: Arc::clone(&self.full_failures),
82        }
83    }
84}
85
86impl<Req, Reply> Sidejob<Req, Reply>
87where
88    Req: Send + 'static,
89    Reply: Send + 'static,
90{
91    /// Spawn a new sidejob actor.
92    ///
93    /// `name` is recorded as the `name` label on
94    /// `sidejob_overload_total` and used in tracing log lines.
95    /// `capacity` is the maximum number of in-flight requests the
96    /// mailbox will hold before it starts rejecting submits with
97    /// [`SidejobError::Overloaded`].
98    ///
99    /// The handler is `FnMut` so callers may close over mutable
100    /// state; because the actor loop is serial, the handler is
101    /// called against one request at a time.
102    ///
103    /// # Panics
104    ///
105    /// Panics if `capacity` is zero. A zero-capacity sidejob would
106    /// reject every submit and is almost certainly a configuration
107    /// mistake.
108    pub fn spawn<F, Fut>(name: &'static str, capacity: usize, mut handler: F) -> Self
109    where
110        F: FnMut(Req) -> Fut + Send + 'static,
111        Fut: Future<Output = Reply> + Send + 'static,
112    {
113        assert!(capacity > 0, "sidejob capacity must be > 0");
114        // Touch the metric family eagerly so the first overload
115        // event does not have to acquire the registry lock under
116        // contention.
117        let _ = metrics::sidejob_overload().with_label_values(&[name]);
118
119        let (tx, mut rx) = mpsc::channel::<(Req, oneshot::Sender<Reply>)>(capacity);
120        tokio::spawn(async move {
121            while let Some((req, reply_tx)) = rx.recv().await {
122                let fut = handler(req);
123                // Run the handler future inside its own task so a
124                // panic only kills the per-request task. The actor
125                // loop keeps running and subsequent submits succeed.
126                match tokio::spawn(fut).await {
127                    Ok(reply) => {
128                        // Receiver may have given up; that is not a
129                        // sidejob bug, so we silently drop.
130                        let _ = reply_tx.send(reply);
131                    }
132                    Err(join_err) => {
133                        if join_err.is_panic() {
134                            tracing::warn!(
135                                sidejob = name,
136                                "handler panicked; reply channel dropped"
137                            );
138                        }
139                        // Dropping `reply_tx` surfaces
140                        // SidejobError::Stopped to the caller for
141                        // this one request, while the actor stays
142                        // alive for future requests.
143                        drop(reply_tx);
144                    }
145                }
146            }
147            tracing::debug!(sidejob = name, "actor loop exited (channel closed)");
148        });
149
150        Self {
151            name,
152            tx,
153            full_failures: Arc::new(AtomicU64::new(0)),
154        }
155    }
156
157    /// Submit a request and await the handler's reply.
158    ///
159    /// Returns [`SidejobError::Overloaded`] immediately if the
160    /// mailbox is full, or [`SidejobError::Stopped`] if the actor
161    /// (or this specific request's per-request task) has gone away
162    /// before producing a reply.
163    pub async fn submit(&self, req: Req) -> Result<Reply, SidejobError> {
164        let rx = self.try_submit(req)?;
165        rx.await.map_err(|_| SidejobError::Stopped)
166    }
167
168    /// Submit a request without awaiting the reply.
169    ///
170    /// On success the caller receives the [`oneshot::Receiver`] for
171    /// the reply and may await it (or drop it) at leisure. Like
172    /// [`Sidejob::submit`], a full mailbox returns
173    /// [`SidejobError::Overloaded`] immediately.
174    pub fn try_submit(&self, req: Req) -> Result<oneshot::Receiver<Reply>, SidejobError> {
175        let (reply_tx, reply_rx) = oneshot::channel();
176        match self.tx.try_send((req, reply_tx)) {
177            Ok(()) => Ok(reply_rx),
178            Err(mpsc::error::TrySendError::Full(_)) => {
179                self.full_failures.fetch_add(1, Ordering::Relaxed);
180                metrics::sidejob_overload()
181                    .with_label_values(&[self.name])
182                    .inc();
183                Err(SidejobError::Overloaded)
184            }
185            Err(mpsc::error::TrySendError::Closed(_)) => Err(SidejobError::Stopped),
186        }
187    }
188
189    /// Number of submits this handle has observed being rejected
190    /// with [`SidejobError::Overloaded`] over its entire lifetime.
191    /// Cloned handles share the same counter.
192    pub fn full_failures(&self) -> u64 {
193        self.full_failures.load(Ordering::Relaxed)
194    }
195
196    /// Static name used for metric labels.
197    pub fn name(&self) -> &'static str {
198        self.name
199    }
200}
201
202impl<Req, Reply> std::fmt::Debug for Sidejob<Req, Reply> {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.debug_struct("Sidejob")
205            .field("name", &self.name)
206            .field("capacity", &self.tx.max_capacity())
207            .field("full_failures", &self.full_failures.load(Ordering::Relaxed))
208            .finish()
209    }
210}