dynomite/runtime/sidejob.rs
1//! Bounded-mailbox actor wrapper, modelled on Riak's `sidejob` library.
2//!
3//! A [`Sidejob`] owns a tokio task that drains a fixed-capacity mpsc
4//! channel and runs a user-supplied async handler against each
5//! request. The capacity is the back-pressure knob: when callers
6//! [`Sidejob::submit`] requests faster than the handler can complete
7//! them, the channel saturates and new submissions return
8//! [`SidejobError::Overloaded`] without blocking the caller.
9//!
//! The handler is invoked serially inside the actor, so it sees one
//! request at a time. The future it returns for each request is driven
//! inside its own `tokio::spawn`, so a panic while that future runs
//! only kills the per-request task: the actor loop keeps draining the
//! mailbox and subsequent submits succeed normally. Callers waiting on
//! a panicked request observe [`SidejobError::Stopped`] when the reply
//! channel is dropped.
17//!
18//! # Examples
19//!
20//! ```
21//! use dynomite::runtime::{Sidejob, SidejobError};
22//! # tokio::runtime::Builder::new_current_thread()
23//! # .enable_all()
24//! # .build()
25//! # .unwrap()
26//! # .block_on(async {
27//! let job: Sidejob<u32, u32> =
28//! Sidejob::spawn("doubler", 4, |n| async move { n * 2 });
29//! assert_eq!(job.submit(21).await.unwrap(), 42);
30//! # let _: Result<u32, SidejobError> = job.submit(0).await;
31//! # });
32//! ```
33
34use std::future::Future;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37
38use thiserror::Error;
39use tokio::sync::{mpsc, oneshot};
40
41use crate::runtime::metrics;
42
43/// Reasons a submit may fail.
44#[derive(Debug, Error, PartialEq, Eq)]
45pub enum SidejobError {
46 /// The mailbox is full. Drop the request and let the caller
47 /// retry against another stage or report a 503-equivalent.
48 #[error("sidejob mailbox is full")]
49 Overloaded,
50 /// The actor task is no longer running, so no further requests
51 /// can be served. This also covers the case where a handler
52 /// panicked on a specific request and dropped the reply channel.
53 #[error("sidejob actor has stopped")]
54 Stopped,
55}
56
/// Handle to a sidejob actor.
///
/// `Sidejob` is cheap to clone: clones share the same underlying
/// channel and overload counter, so spreading the handle across
/// dispatcher tasks does not change the back-pressure semantics.
/// `Clone` is implemented by hand rather than derived, so neither
/// `Req` nor `Reply` has to be `Clone` for the handle to be cloned.
pub struct Sidejob<Req, Reply> {
    /// Static name used as the `name` label on
    /// `sidejob_overload_total` and as the `sidejob` field on the
    /// actor's tracing events.
    name: &'static str,
    /// Sender half of the mailbox. Each message pairs a request with
    /// the oneshot sender on which its reply is delivered. The
    /// receiver lives inside the spawned actor task. When all senders
    /// are dropped, the actor loop exits cleanly.
    tx: mpsc::Sender<(Req, oneshot::Sender<Reply>)>,
    /// Process-local counter of `Overloaded` rejections, bumped each
    /// time a submit finds the mailbox full. The Prometheus counter
    /// is the cluster-wide rollup; this field gives unit tests a
    /// deterministic value to assert against.
    full_failures: Arc<AtomicU64>,
}
75
76impl<Req, Reply> Clone for Sidejob<Req, Reply> {
77 fn clone(&self) -> Self {
78 Self {
79 name: self.name,
80 tx: self.tx.clone(),
81 full_failures: Arc::clone(&self.full_failures),
82 }
83 }
84}
85
86impl<Req, Reply> Sidejob<Req, Reply>
87where
88 Req: Send + 'static,
89 Reply: Send + 'static,
90{
91 /// Spawn a new sidejob actor.
92 ///
93 /// `name` is recorded as the `name` label on
94 /// `sidejob_overload_total` and used in tracing log lines.
95 /// `capacity` is the maximum number of in-flight requests the
96 /// mailbox will hold before it starts rejecting submits with
97 /// [`SidejobError::Overloaded`].
98 ///
99 /// The handler is `FnMut` so callers may close over mutable
100 /// state; because the actor loop is serial, the handler is
101 /// called against one request at a time.
102 ///
103 /// # Panics
104 ///
105 /// Panics if `capacity` is zero. A zero-capacity sidejob would
106 /// reject every submit and is almost certainly a configuration
107 /// mistake.
108 pub fn spawn<F, Fut>(name: &'static str, capacity: usize, mut handler: F) -> Self
109 where
110 F: FnMut(Req) -> Fut + Send + 'static,
111 Fut: Future<Output = Reply> + Send + 'static,
112 {
113 assert!(capacity > 0, "sidejob capacity must be > 0");
114 // Touch the metric family eagerly so the first overload
115 // event does not have to acquire the registry lock under
116 // contention.
117 let _ = metrics::sidejob_overload().with_label_values(&[name]);
118
119 let (tx, mut rx) = mpsc::channel::<(Req, oneshot::Sender<Reply>)>(capacity);
120 tokio::spawn(async move {
121 while let Some((req, reply_tx)) = rx.recv().await {
122 let fut = handler(req);
123 // Run the handler future inside its own task so a
124 // panic only kills the per-request task. The actor
125 // loop keeps running and subsequent submits succeed.
126 match tokio::spawn(fut).await {
127 Ok(reply) => {
128 // Receiver may have given up; that is not a
129 // sidejob bug, so we silently drop.
130 let _ = reply_tx.send(reply);
131 }
132 Err(join_err) => {
133 if join_err.is_panic() {
134 tracing::warn!(
135 sidejob = name,
136 "handler panicked; reply channel dropped"
137 );
138 }
139 // Dropping `reply_tx` surfaces
140 // SidejobError::Stopped to the caller for
141 // this one request, while the actor stays
142 // alive for future requests.
143 drop(reply_tx);
144 }
145 }
146 }
147 tracing::debug!(sidejob = name, "actor loop exited (channel closed)");
148 });
149
150 Self {
151 name,
152 tx,
153 full_failures: Arc::new(AtomicU64::new(0)),
154 }
155 }
156
157 /// Submit a request and await the handler's reply.
158 ///
159 /// Returns [`SidejobError::Overloaded`] immediately if the
160 /// mailbox is full, or [`SidejobError::Stopped`] if the actor
161 /// (or this specific request's per-request task) has gone away
162 /// before producing a reply.
163 pub async fn submit(&self, req: Req) -> Result<Reply, SidejobError> {
164 let rx = self.try_submit(req)?;
165 rx.await.map_err(|_| SidejobError::Stopped)
166 }
167
168 /// Submit a request without awaiting the reply.
169 ///
170 /// On success the caller receives the [`oneshot::Receiver`] for
171 /// the reply and may await it (or drop it) at leisure. Like
172 /// [`Sidejob::submit`], a full mailbox returns
173 /// [`SidejobError::Overloaded`] immediately.
174 pub fn try_submit(&self, req: Req) -> Result<oneshot::Receiver<Reply>, SidejobError> {
175 let (reply_tx, reply_rx) = oneshot::channel();
176 match self.tx.try_send((req, reply_tx)) {
177 Ok(()) => Ok(reply_rx),
178 Err(mpsc::error::TrySendError::Full(_)) => {
179 self.full_failures.fetch_add(1, Ordering::Relaxed);
180 metrics::sidejob_overload()
181 .with_label_values(&[self.name])
182 .inc();
183 Err(SidejobError::Overloaded)
184 }
185 Err(mpsc::error::TrySendError::Closed(_)) => Err(SidejobError::Stopped),
186 }
187 }
188
189 /// Number of submits this handle has observed being rejected
190 /// with [`SidejobError::Overloaded`] over its entire lifetime.
191 /// Cloned handles share the same counter.
192 pub fn full_failures(&self) -> u64 {
193 self.full_failures.load(Ordering::Relaxed)
194 }
195
196 /// Static name used for metric labels.
197 pub fn name(&self) -> &'static str {
198 self.name
199 }
200}
201
202impl<Req, Reply> std::fmt::Debug for Sidejob<Req, Reply> {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("Sidejob")
205 .field("name", &self.name)
206 .field("capacity", &self.tx.max_capacity())
207 .field("full_failures", &self.full_failures.load(Ordering::Relaxed))
208 .finish()
209 }
210}