Skip to main content

runtime_bridge/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Sync→async runtime bridge for Heddle.
3//!
4//! [`RuntimeBridge`] is a worker thread that owns a private current-thread
5//! Tokio runtime. Synchronous code hands futures to the worker over an
6//! async mpsc channel; the worker `tokio::spawn`s each future on its own
7//! runtime and the caller blocks on a per-request reply channel until the
8//! task completes. The caller's runtime (if any) is never re-entered.
9//!
10//! ## Why
11//!
12//! Heddle has several `ObjectStore` / `OpLogBackend` / `RefBackend`
13//! implementations whose trait surface is synchronous but whose underlying
14//! I/O is async (`aws-sdk-s3`, `sqlx`, etc.). A naive bridge —
15//! `Handle::current().block_on(...)` or
16//! `tokio::task::block_in_place(|| Handle::current().block_on(...))` —
17//! breaks in caller-flavor-dependent ways:
18//!
19//! * `Handle::current().block_on(...)` panics with "Cannot start a runtime
20//!   from within a runtime" when the caller is already on a Tokio runtime.
21//! * `block_in_place(...)` panics with "can call blocking only when running
22//!   on the multi-threaded runtime" when the caller is on a current-thread
23//!   runtime (e.g. `#[tokio::test(flavor = "current_thread")]`).
24//! * Neither works at all when the caller is on a non-Tokio thread.
25//!
26//! Routing through this bridge sidesteps all three: the future runs on the
27//! bridge's private runtime regardless of who calls it, and the caller's
28//! thread simply blocks on a reply channel.
29//!
30//! ## Concurrency
31//!
32//! The worker dispatches each request via [`tokio::spawn`] rather than
33//! awaiting it inline, so concurrent callers sharing one bridge can
34//! progress in parallel on the worker's runtime. This preserves the
35//! connection-level parallelism of pools like `sqlx::PgPool` instead of
36//! head-of-line blocking every caller behind the slowest in-flight query.
37//!
38//! ## Error recovery
39//!
40//! [`RuntimeBridge::block_on`] returns [`Result<T, BridgeError>`] so a dead
41//! worker surfaces as a recoverable error in the caller's `Result`-typed
42//! API rather than escalating into a process-level panic. A bridged task
43//! that panics aborts only that task: its reply channel is dropped and the
44//! waiting caller observes [`BridgeError::ResponseLost`]; the worker keeps
45//! serving other requests.
46//!
47//! ## Shutdown
48//!
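//! Dropping the bridge drops the `Sender`; the worker's `Receiver::recv`
//! then returns `None`, the loop breaks, and the runtime is dropped on
//! the worker thread, which cancels any detached tasks still parked on it.
//! No caller can be mid-`block_on` at that point because `block_on` borrows
//! the bridge for the whole call. The worker's `JoinHandle` is retained on
//! the bridge but never joined, so dropping the bridge does not wait for
//! the worker thread to exit.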
53
54use std::{fmt, future::Future, io, pin::Pin, sync::mpsc, thread};
55
56use tokio::sync::mpsc as tokio_mpsc;
57
/// Worker thread + private current-thread Tokio runtime used to drive
/// async work off the caller's runtime.
///
/// See the crate-level docs for the design rationale. Construct with
/// [`RuntimeBridge::new`]; submit work with [`RuntimeBridge::block_on`].
pub struct RuntimeBridge {
    /// Dispatch side of the channel the worker loop receives tasks from.
    /// Dropping it (when the bridge is dropped) is what ends the worker's
    /// receive loop.
    tx: tokio_mpsc::UnboundedSender<BridgedTask>,
    /// Handle to the worker thread. It is stored but never joined by the
    /// code in this file, so dropping the bridge does not block on the
    /// worker exiting.
    _worker: thread::JoinHandle<()>,
}
67
/// Type-erased unit of work sent to the worker: a pinned, boxed,
/// `Send + 'static` future with no output. `block_on` wraps the caller's
/// future so its result travels back over a per-request reply channel
/// instead of through the future's output.
type BridgedTask = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
69
/// Errors returned by [`RuntimeBridge::block_on`] when the bridge cannot
/// complete a request.
///
/// Both variants mean this one call produced no value. The bridge as a
/// whole remains usable for subsequent calls only after
/// [`BridgeError::ResponseLost`] (the worker is still alive, just this one
/// task died); [`BridgeError::WorkerDead`] is permanent.
///
/// The type is a plain fieldless enum, so it is `Copy` and comparable with
/// `==`, which keeps call sites and tests free of `matches!` boilerplate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BridgeError {
    /// The worker thread is gone — sending the task failed because the
    /// receiver side of the dispatch channel was dropped. In practice this
    /// means the worker thread panicked or exited; every later call will
    /// return the same error.
    WorkerDead,
    /// The task was accepted but no reply ever arrived. The future panicked
    /// mid-poll (so its reply sender was dropped without sending) or the
    /// worker's runtime was shut down underneath an in-flight task. Other
    /// in-flight requests on the same bridge are unaffected.
    ResponseLost,
}
88
89impl fmt::Display for BridgeError {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        match self {
92            Self::WorkerDead => {
93                f.write_str("runtime bridge: worker thread terminated; dispatch channel closed")
94            }
95            Self::ResponseLost => f.write_str(
96                "runtime bridge: task accepted but no reply received (task panicked \
97                 or runtime shut down mid-flight)",
98            ),
99        }
100    }
101}
102
// `BridgeError` wraps no underlying cause, so the trait's default methods
// (including `source()` returning `None`) are used as-is.
impl std::error::Error for BridgeError {}
104
105impl RuntimeBridge {
106    /// Spawn a worker thread with a private current-thread Tokio runtime.
107    ///
108    /// Returns the worker's `io::Error` if the OS refuses the thread spawn.
109    /// The thread name is `heddle-runtime-bridge` so it's identifiable in
110    /// stack traces and process listings; pick a more specific wrapper at
111    /// the call site if you need per-consumer naming.
112    pub fn new() -> io::Result<Self> {
113        Self::with_thread_name("heddle-runtime-bridge")
114    }
115
116    /// Same as [`RuntimeBridge::new`] but uses a custom worker thread name.
117    pub fn with_thread_name(thread_name: impl Into<String>) -> io::Result<Self> {
118        // tokio mpsc lets the worker `await` on `recv` so the executor
119        // stays scheduling spawned tasks between dispatches. A std mpsc
120        // would block the runtime thread on `recv`, defeating the spawn
121        // concurrency fix.
122        let (tx, mut rx) = tokio_mpsc::unbounded_channel::<BridgedTask>();
123        let worker = thread::Builder::new()
124            .name(thread_name.into())
125            .spawn(move || {
126                let runtime = tokio::runtime::Builder::new_current_thread()
127                    .enable_all()
128                    .build()
129                    .expect("build heddle-runtime-bridge worker runtime");
130                runtime.block_on(async move {
131                    // `tokio::spawn` runs each task concurrently on the
132                    // current-thread runtime; the loop returns to `recv`
133                    // immediately so subsequent callers don't queue behind
134                    // the slowest in-flight task. Task `JoinHandle`s are
135                    // dropped (tasks are detached); each task signals its
136                    // caller through its private reply channel.
137                    while let Some(task) = rx.recv().await {
138                        tokio::spawn(task);
139                    }
140                });
141            })?;
142        Ok(Self {
143            tx,
144            _worker: worker,
145        })
146    }
147
148    /// Run `future` on the worker's runtime and block the caller until it
149    /// completes, returning the future's output or a [`BridgeError`] when
150    /// the worker cannot deliver a reply.
151    ///
152    /// `future` must be `Send + 'static`; compose it from owned data
153    /// (`Arc` clones, owned `String` keys, serialized bodies), not borrows
154    /// of `&self`.
155    ///
156    /// ## Errors
157    ///
158    /// - [`BridgeError::WorkerDead`] when the worker thread has terminated
159    ///   (its receive channel was dropped). Subsequent calls will keep
160    ///   returning this error; the bridge cannot recover.
161    /// - [`BridgeError::ResponseLost`] when the worker accepted the task
162    ///   but no reply arrived — the future panicked, or the worker's
163    ///   runtime was dropped while the task was in flight. The bridge
164    ///   itself remains usable; other in-flight callers are unaffected.
165    pub fn block_on<F, T>(&self, future: F) -> Result<T, BridgeError>
166    where
167        F: Future<Output = T> + Send + 'static,
168        T: Send + 'static,
169    {
170        // Capacity 1: each call sends exactly one value and then drops.
171        let (reply_tx, reply_rx) = mpsc::sync_channel::<T>(1);
172        let task: BridgedTask = Box::pin(async move {
173            let value = future.await;
174            // The receiver is held on the caller's stack until `recv`
175            // returns, so `send` is infallible in practice.
176            let _ = reply_tx.send(value);
177        });
178        self.tx.send(task).map_err(|_| BridgeError::WorkerDead)?;
179        reply_rx.recv().map_err(|_| BridgeError::ResponseLost)
180    }
181}
182
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc, Barrier,
            atomic::{AtomicUsize, Ordering},
            mpsc,
        },
        time::{Duration, Instant},
    };

    use super::*;

    #[test]
    fn block_on_from_non_tokio_thread() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let value = bridge.block_on(async { 1 + 2 }).expect("ok");
        assert_eq!(value, 3);
    }

    /// The whole point of this crate: a current-thread Tokio runtime can
    /// drive sync work through the bridge without the panics that
    /// `Handle::current().block_on(...)` ("Cannot start a runtime from
    /// within a runtime") or `tokio::task::block_in_place` (multi-threaded
    /// runtime only) would trigger.
    #[tokio::test(flavor = "current_thread")]
    async fn block_on_from_current_thread_runtime() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let value = bridge.block_on(async { "ok".to_string() }).expect("ok");
        assert_eq!(value, "ok");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn block_on_from_multi_thread_runtime() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let value = bridge.block_on(async { 42_u64 }).expect("ok");
        assert_eq!(value, 42);
    }

    /// Multiple sequential calls share one worker thread.
    #[test]
    fn block_on_sequential_calls() {
        // No `Arc` needed: `block_on` takes `&self` and nothing here
        // crosses a thread boundary.
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        for i in 0..5 {
            let got: u32 = bridge.block_on(async move { i * 2 }).expect("ok");
            assert_eq!(got, i * 2);
        }
    }

    /// Dropping the bridge shuts down the worker: the worker's `recv`
    /// returns `None`, the loop exits, the runtime drops.
    ///
    /// The shutdown is observed through a detached task that never
    /// completes and owns a channel sender. The sender can only be dropped
    /// when the runtime tears the task down, at which point the receiver
    /// reports `Disconnected`. A timeout instead of a bare `recv` turns a
    /// shutdown regression into a failure rather than a hung test.
    #[test]
    fn drop_shuts_down_worker() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let (alive_tx, alive_rx) = mpsc::channel::<()>();
        bridge
            .block_on(async move {
                tokio::spawn(async move {
                    let _alive = alive_tx;
                    std::future::pending::<()>().await;
                });
            })
            .expect("ok");
        drop(bridge);
        let outcome = alive_rx.recv_timeout(Duration::from_secs(5));
        assert!(
            matches!(outcome, Err(mpsc::RecvTimeoutError::Disconnected)),
            "worker runtime must drop its tasks after the bridge is dropped; got {outcome:?}",
        );
    }

    /// Codex P2 #1 regression guard: concurrent callers on one bridge must
    /// progress in parallel rather than head-of-line block on a single
    /// `task.await`.
    ///
    /// Each of N caller threads calls `block_on` with a future that sleeps
    /// for `delay`. Total wall time is measured by an external clock. If
    /// the worker serialized requests (the pre-fix behaviour), total time
    /// would be ≥ `N * delay`. With concurrent dispatch, every call sleeps
    /// in parallel, so the worst case is ~`delay` plus a small scheduling
    /// margin. The assertion bounds at `N * delay / 2`: tight enough to
    /// fail the serial implementation, loose enough to survive slow CI.
    #[test]
    fn concurrent_callers_run_in_parallel() {
        const N: usize = 4;
        const DELAY: Duration = Duration::from_millis(250);

        let bridge = Arc::new(RuntimeBridge::new().expect("spawn bridge"));
        let barrier = Arc::new(Barrier::new(N));
        let started = Instant::now();

        let mut handles = Vec::with_capacity(N);
        for _ in 0..N {
            let bridge = Arc::clone(&bridge);
            let barrier = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                // Sync the dispatch instant across threads so the bridge
                // sees all N requests at roughly the same time.
                barrier.wait();
                bridge
                    .block_on(async move {
                        tokio::time::sleep(DELAY).await;
                    })
                    .expect("ok")
            }));
        }
        for h in handles {
            h.join().expect("worker thread joined");
        }
        let elapsed = started.elapsed();
        let serial_floor = DELAY * (N as u32);
        assert!(
            elapsed < serial_floor / 2,
            "concurrent dispatch regressed to serial behaviour: \
             elapsed {elapsed:?} >= serial_floor/2 ({:?}); \
             N={N}, per-call delay={DELAY:?}",
            serial_floor / 2,
        );
    }

    /// Codex P2 #2 regression guard: a bridged task that panics must
    /// surface as `BridgeError::ResponseLost` to its caller, and the
    /// bridge must remain usable for subsequent calls. The pre-fix
    /// `.expect(...)` path would have escalated into a process-level
    /// panic in the caller's thread on the *next* call.
    #[test]
    fn panicking_task_returns_response_lost_and_bridge_stays_alive() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let result: Result<(), _> = bridge.block_on(async {
            panic!("simulated task panic");
        });
        assert!(
            matches!(result, Err(BridgeError::ResponseLost)),
            "panicking task must return ResponseLost; got {result:?}",
        );
        // The bridge must still be usable — the worker thread is alive,
        // only the panicking task died.
        let next: u64 = bridge.block_on(async { 7 }).expect("ok");
        assert_eq!(next, 7, "bridge must keep serving after a task panic");
    }

    /// Sanity check the `tokio::spawn` path inside the worker: when the
    /// bridged future spawns its own tasks they must complete normally
    /// (i.e. the runtime is still running the executor, not parked on a
    /// blocking recv).
    #[test]
    fn worker_runtime_runs_inner_spawns() {
        let bridge = RuntimeBridge::new().expect("spawn bridge");
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_for_task = Arc::clone(&counter);
        bridge
            .block_on(async move {
                let handles: Vec<_> = (0..8)
                    .map(|_| {
                        let c = Arc::clone(&counter_for_task);
                        tokio::spawn(async move {
                            c.fetch_add(1, Ordering::SeqCst);
                        })
                    })
                    .collect();
                for h in handles {
                    h.await.expect("inner spawn joined");
                }
            })
            .expect("ok");
        assert_eq!(counter.load(Ordering::SeqCst), 8);
    }
}