runtime_bridge/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Sync→async runtime bridge for Heddle.
3//!
4//! [`RuntimeBridge`] is a worker thread that owns a private current-thread
5//! Tokio runtime. Synchronous code hands futures to the worker over an
6//! async mpsc channel; the worker `tokio::spawn`s each future on its own
7//! runtime and the caller blocks on a per-request reply channel until the
8//! task completes. The caller's runtime (if any) is never re-entered.
9//!
10//! ## Why
11//!
12//! Heddle has several `ObjectStore` / `OpLogBackend` / `RefBackend`
13//! implementations whose trait surface is synchronous but whose underlying
14//! I/O is async (`aws-sdk-s3`, `sqlx`, etc.). A naive bridge —
15//! `Handle::current().block_on(...)` or
16//! `tokio::task::block_in_place(|| Handle::current().block_on(...))` —
17//! breaks in caller-flavor-dependent ways:
18//!
19//! * `Handle::current().block_on(...)` panics with "Cannot start a runtime
20//! from within a runtime" when the caller is already on a Tokio runtime.
21//! * `block_in_place(...)` panics with "can call blocking only when running
22//! on the multi-threaded runtime" when the caller is on a current-thread
23//! runtime (e.g. `#[tokio::test(flavor = "current_thread")]`).
24//! * Neither works at all when the caller is on a non-Tokio thread.
25//!
26//! Routing through this bridge sidesteps all three: the future runs on the
27//! bridge's private runtime regardless of who calls it, and the caller's
28//! thread simply blocks on a reply channel.
29//!
30//! ## Concurrency
31//!
32//! The worker dispatches each request via [`tokio::spawn`] rather than
33//! awaiting it inline, so concurrent callers sharing one bridge can
34//! progress in parallel on the worker's runtime. This preserves the
35//! connection-level parallelism of pools like `sqlx::PgPool` instead of
36//! head-of-line blocking every caller behind the slowest in-flight query.
37//!
38//! ## Error recovery
39//!
40//! [`RuntimeBridge::block_on`] returns [`Result<T, BridgeError>`] so a dead
41//! worker surfaces as a recoverable error in the caller's `Result`-typed
42//! API rather than escalating into a process-level panic. A bridged task
43//! that panics aborts only that task: its reply channel is dropped and the
44//! waiting caller observes [`BridgeError::ResponseLost`]; the worker keeps
45//! serving other requests.
46//!
47//! ## Shutdown
48//!
//! Dropping the bridge drops the `Sender`; the worker's `Receiver::recv`
//! then returns `None`, the loop breaks, and the runtime is dropped on
//! the worker thread, cancelling any task that is still pending. The
//! `JoinHandle` is stored on the bridge but never joined, so dropping the
//! bridge detaches the worker instead of waiting for it to exit.
53
54use std::{fmt, future::Future, io, pin::Pin, sync::mpsc, thread};
55
56use tokio::sync::mpsc as tokio_mpsc;
57
58/// Worker thread + private current-thread Tokio runtime used to drive
59/// async work off the caller's runtime.
60///
61/// See the crate-level docs for the design rationale. Construct with
62/// [`RuntimeBridge::new`]; submit work with [`RuntimeBridge::block_on`].
63pub struct RuntimeBridge {
64 tx: tokio_mpsc::UnboundedSender<BridgedTask>,
65 _worker: thread::JoinHandle<()>,
66}
67
68type BridgedTask = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
69
/// Errors returned by [`RuntimeBridge::block_on`] when the bridge cannot
/// complete a request.
///
/// Either variant means this one call produced no result. Whether the
/// bridge is still usable afterwards depends on the variant: after
/// [`BridgeError::WorkerDead`] it is not; after
/// [`BridgeError::ResponseLost`] caused by a panicking task it is, because
/// only that task died and the worker keeps running.
///
/// The enum carries no payload, so it is cheap to copy and can be compared
/// directly (`err == BridgeError::WorkerDead`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BridgeError {
    /// The worker thread is gone — sending the task failed because the
    /// receiver side of the dispatch channel was dropped. This happens only
    /// after the worker thread has exited (for example because it
    /// panicked); every later call will return the same error.
    WorkerDead,
    /// The task was accepted but no reply ever arrived. The future panicked
    /// mid-poll (so its reply sender was dropped without sending) or the
    /// worker's runtime was shut down underneath an in-flight task. A task
    /// panic leaves other in-flight requests on the same bridge unaffected.
    ResponseLost,
}
88
89impl fmt::Display for BridgeError {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 match self {
92 Self::WorkerDead => {
93 f.write_str("runtime bridge: worker thread terminated; dispatch channel closed")
94 }
95 Self::ResponseLost => f.write_str(
96 "runtime bridge: task accepted but no reply received (task panicked \
97 or runtime shut down mid-flight)",
98 ),
99 }
100 }
101}
102
103impl std::error::Error for BridgeError {}
104
105impl RuntimeBridge {
106 /// Spawn a worker thread with a private current-thread Tokio runtime.
107 ///
108 /// Returns the worker's `io::Error` if the OS refuses the thread spawn.
109 /// The thread name is `heddle-runtime-bridge` so it's identifiable in
110 /// stack traces and process listings; pick a more specific wrapper at
111 /// the call site if you need per-consumer naming.
112 pub fn new() -> io::Result<Self> {
113 Self::with_thread_name("heddle-runtime-bridge")
114 }
115
116 /// Same as [`RuntimeBridge::new`] but uses a custom worker thread name.
117 pub fn with_thread_name(thread_name: impl Into<String>) -> io::Result<Self> {
118 // tokio mpsc lets the worker `await` on `recv` so the executor
119 // stays scheduling spawned tasks between dispatches. A std mpsc
120 // would block the runtime thread on `recv`, defeating the spawn
121 // concurrency fix.
122 let (tx, mut rx) = tokio_mpsc::unbounded_channel::<BridgedTask>();
123 let worker = thread::Builder::new()
124 .name(thread_name.into())
125 .spawn(move || {
126 let runtime = tokio::runtime::Builder::new_current_thread()
127 .enable_all()
128 .build()
129 .expect("build heddle-runtime-bridge worker runtime");
130 runtime.block_on(async move {
131 // `tokio::spawn` runs each task concurrently on the
132 // current-thread runtime; the loop returns to `recv`
133 // immediately so subsequent callers don't queue behind
134 // the slowest in-flight task. Task `JoinHandle`s are
135 // dropped (tasks are detached); each task signals its
136 // caller through its private reply channel.
137 while let Some(task) = rx.recv().await {
138 tokio::spawn(task);
139 }
140 });
141 })?;
142 Ok(Self {
143 tx,
144 _worker: worker,
145 })
146 }
147
148 /// Run `future` on the worker's runtime and block the caller until it
149 /// completes, returning the future's output or a [`BridgeError`] when
150 /// the worker cannot deliver a reply.
151 ///
152 /// `future` must be `Send + 'static`; compose it from owned data
153 /// (`Arc` clones, owned `String` keys, serialized bodies), not borrows
154 /// of `&self`.
155 ///
156 /// ## Errors
157 ///
158 /// - [`BridgeError::WorkerDead`] when the worker thread has terminated
159 /// (its receive channel was dropped). Subsequent calls will keep
160 /// returning this error; the bridge cannot recover.
161 /// - [`BridgeError::ResponseLost`] when the worker accepted the task
162 /// but no reply arrived — the future panicked, or the worker's
163 /// runtime was dropped while the task was in flight. The bridge
164 /// itself remains usable; other in-flight callers are unaffected.
165 pub fn block_on<F, T>(&self, future: F) -> Result<T, BridgeError>
166 where
167 F: Future<Output = T> + Send + 'static,
168 T: Send + 'static,
169 {
170 // Capacity 1: each call sends exactly one value and then drops.
171 let (reply_tx, reply_rx) = mpsc::sync_channel::<T>(1);
172 let task: BridgedTask = Box::pin(async move {
173 let value = future.await;
174 // The receiver is held on the caller's stack until `recv`
175 // returns, so `send` is infallible in practice.
176 let _ = reply_tx.send(value);
177 });
178 self.tx.send(task).map_err(|_| BridgeError::WorkerDead)?;
179 reply_rx.recv().map_err(|_| BridgeError::ResponseLost)
180 }
181}
182
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc, Barrier,
            atomic::{AtomicUsize, Ordering},
            mpsc,
        },
        time::{Duration, Instant},
    };

    use super::*;

    /// A plain OS thread with no Tokio context can drive a future.
    #[test]
    fn block_on_from_non_tokio_thread() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let value = bridge.block_on(async { 1 + 2 }).expect("ok");
        assert_eq!(value, 3);
    }

    /// The whole point of this crate: code running on a current-thread
    /// Tokio runtime can drive sync work through the bridge. Calling
    /// `Handle::current().block_on(...)` here would panic with "Cannot
    /// start a runtime from within a runtime", and
    /// `tokio::task::block_in_place` would panic because it requires the
    /// multi-threaded runtime.
    #[tokio::test(flavor = "current_thread")]
    async fn block_on_from_current_thread_runtime() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let value = bridge.block_on(async { "ok".to_string() }).expect("ok");
        assert_eq!(value, "ok");
    }

    /// Same call from a multi-threaded runtime.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn block_on_from_multi_thread_runtime() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let value = bridge.block_on(async { 42_u64 }).expect("ok");
        assert_eq!(value, 42);
    }

    /// Multiple sequential calls share one worker thread.
    #[test]
    fn block_on_sequential_calls() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        for i in 0..5 {
            let got: u32 = bridge.block_on(async move { i * 2 }).expect("ok");
            assert_eq!(got, i * 2);
        }
    }

    /// Dropping the bridge shuts down the worker: the worker's `recv`
    /// returns `None`, the loop exits, the runtime drops.
    ///
    /// Dropping the bridge does not join the worker, so the shutdown is
    /// observed indirectly: a detached task that never completes holds a
    /// channel sender, and that sender can only be released when the
    /// worker's runtime is dropped and the task with it.
    #[test]
    fn drop_shuts_down_worker() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let (guard_tx, guard_rx) = mpsc::channel::<()>();
        bridge
            .block_on(async move {
                tokio::spawn(async move {
                    let _guard = guard_tx;
                    std::future::pending::<()>().await;
                });
            })
            .expect("ok");
        drop(bridge);
        assert!(
            matches!(
                guard_rx.recv_timeout(Duration::from_secs(5)),
                Err(mpsc::RecvTimeoutError::Disconnected)
            ),
            "worker runtime must be dropped once the bridge is dropped",
        );
    }

    /// Codex P2 #1 regression guard: concurrent callers on one bridge must
    /// progress in parallel rather than head-of-line block on a single
    /// `task.await`.
    ///
    /// Each of N caller threads calls `block_on` with a future that sleeps
    /// for `delay`. Total wall time is measured by an external clock. If
    /// the worker serialized requests (the pre-fix behaviour), total time
    /// would be ≥ `N * delay`. With concurrent dispatch, every call sleeps
    /// in parallel, so the worst case is ~`delay` plus a small scheduling
    /// margin. The assertion bounds at `N * delay / 2`: tight enough to
    /// fail the serial implementation, loose enough to survive slow CI.
    #[test]
    fn concurrent_callers_run_in_parallel() {
        const N: usize = 4;
        const DELAY: Duration = Duration::from_millis(250);

        let bridge = Arc::new(RuntimeBridge::new().expect("spawn bridge"));
        let barrier = Arc::new(Barrier::new(N));
        let started = Instant::now();

        let mut handles = Vec::with_capacity(N);
        for _ in 0..N {
            let bridge = Arc::clone(&bridge);
            let barrier = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                // Sync the dispatch instant across threads so the bridge
                // sees all N requests at roughly the same time.
                barrier.wait();
                bridge
                    .block_on(async move {
                        tokio::time::sleep(DELAY).await;
                    })
                    .expect("ok")
            }));
        }
        for h in handles {
            h.join().expect("worker thread joined");
        }
        let elapsed = started.elapsed();
        let serial_floor = DELAY * (N as u32);
        assert!(
            elapsed < serial_floor / 2,
            "concurrent dispatch regressed to serial behaviour: \
             elapsed {elapsed:?} >= serial_floor/2 ({:?}); \
             N={N}, per-call delay={DELAY:?}",
            serial_floor / 2,
        );
    }

    /// Codex P2 #2 regression guard: a bridged task that panics must
    /// surface as `BridgeError::ResponseLost` to its caller, and the
    /// bridge must remain usable for subsequent calls. The pre-fix
    /// `.expect(...)` path would have escalated into a process-level
    /// panic in the caller's thread on the *next* call.
    #[test]
    fn panicking_task_returns_response_lost_and_bridge_stays_alive() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let result: Result<(), _> = bridge.block_on(async {
            panic!("simulated task panic");
        });
        assert!(
            matches!(result, Err(BridgeError::ResponseLost)),
            "panicking task must return ResponseLost; got {result:?}",
        );
        // The bridge must still be usable — the worker thread is alive,
        // only the panicking task died.
        let next: u64 = bridge.block_on(async { 7 }).expect("ok");
        assert_eq!(next, 7, "bridge must keep serving after a task panic");
    }

    /// Sanity check the `tokio::spawn` path inside the worker: when the
    /// bridged future spawns its own tasks they must complete normally
    /// (i.e. the runtime is still running the executor, not parked on a
    /// blocking recv).
    #[test]
    fn worker_runtime_runs_inner_spawns() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_for_task = Arc::clone(&counter);
        bridge
            .block_on(async move {
                let handles: Vec<_> = (0..8)
                    .map(|_| {
                        let c = Arc::clone(&counter_for_task);
                        tokio::spawn(async move {
                            c.fetch_add(1, Ordering::SeqCst);
                        })
                    })
                    .collect();
                for h in handles {
                    h.await.expect("inner spawn joined");
                }
            })
            .expect("ok");
        assert_eq!(counter.load(Ordering::SeqCst), 8);
    }
}