Skip to main content

liminal/durability/
bridge.rs

1//! Synchronous bridge for driving durable-store futures to completion.
2//!
3//! # Why a hand-rolled executor and why it cannot deadlock
4//!
5//! The runtime channel API ([`crate::channel::ChannelHandle::publish`] and
6//! [`crate::channel::ChannelHandle::flush`]) is synchronous: it is called from a
7//! beamr scheduler worker thread that drives a connection process. The durable
8//! storage surface ([`crate::durability::DurableStore`]) is `async`. This helper
9//! polls a single store future to completion on the *calling* thread without
10//! spawning a runtime or a thread.
11//!
12//! It is deadlock-free **because the current store backend is synchronous
13//! underneath**: the vendored haematite [`crate::durability::HaematiteStore`]
14//! completes every `append`/`cas`/`read`/`flush` under an in-memory lock and
15//! returns `Poll::Ready` on the first poll — there is no external I/O await, no
16//! cross-task channel, and nothing that hands control to another task that would
17//! need *this* thread to make progress. Concretely:
18//!
19//! * No runtime is created, so no other task contends for an executor.
20//! * No thread is spawned and no lock is held across the poll by this helper, so
21//!   it cannot form a wait cycle with another thread.
22//! * The future returns immediately, so the calling scheduler worker is not
23//!   blocked for any meaningful time.
24//!
25//! If a future-yielding, real-I/O store backend is ever introduced (the
26//! downstream mock -> on-disk-haematite swap), this synchronous bridge must be
27//! replaced with a real executor on a dedicated I/O thread.
28//!
29//! # Bounded, loud failure on a suspending future
30//!
31//! A synchronous-underneath future returns `Poll::Ready` on the FIRST poll. To
32//! make the unreachable suspending-backend case fail loudly instead of spinning
33//! forever, the poll loop is **bounded** to [`MAX_POLLS`] (a tiny margin above
34//! the single poll a correct future needs). If the future is still `Pending`
35//! after that bound, [`block_on`] returns [`BridgeError::DidNotComplete`] rather
36//! than busy-yielding indefinitely: a still-pending future here is a contract
37//! violation (a suspending `DurableStore` backend was wired without swapping in
38//! a real executor), and a loud, bounded error is far safer than a silent
39//! worker-starving hang. Production callers surface that error up their normal
40//! error path (no panic in production code). The happy path is unchanged:
41//! `Ready` on the first poll returns immediately with zero extra overhead.
42use std::future::Future;
43use std::pin::pin;
44use std::sync::Arc;
45use std::task::{Context, Poll, Wake, Waker};
46
/// Upper bound on how many times [`block_on`] polls a future before giving up.
///
/// One poll is enough for a store future that is synchronous underneath. The
/// extra headroom lets a trivially-chained adapter future finish, while keeping
/// the loop finite so that a future which really suspends is reported quickly
/// rather than spun on.
const MAX_POLLS: usize = 8;
53
54/// Failure raised when a future driven by [`block_on`] does not complete
55/// synchronously within the bounded poll budget.
56#[derive(Debug, thiserror::Error)]
57pub enum BridgeError {
58    /// The future was still `Poll::Pending` after [`MAX_POLLS`] polls.
59    #[error(
60        "durable bridge future did not complete synchronously after {polls} polls; \
61         a suspending DurableStore backend requires a real executor on a dedicated I/O \
62         thread — see bridge module docs"
63    )]
64    DidNotComplete {
65        /// Number of polls performed before giving up.
66        polls: usize,
67    },
68}
69
70/// Drives `future` to completion on the calling thread.
71///
72/// See the module documentation for the deadlock-freedom contract: this is only
73/// sound for store backends whose futures complete synchronously without
74/// awaiting external I/O.
75///
76/// # Errors
77///
78/// Returns [`BridgeError::DidNotComplete`] if `future` is still `Poll::Pending`
79/// after [`MAX_POLLS`] polls. This never happens for a synchronous-underneath
80/// store backend (which completes on the first poll); it signals that a
81/// suspending backend was introduced and requires a real executor on a
82/// dedicated I/O thread (see the module docs).
83pub fn block_on<F: Future>(future: F) -> Result<F::Output, BridgeError> {
84    let waker = Waker::from(Arc::new(NoopWaker));
85    let mut context = Context::from_waker(&waker);
86    let mut future = pin!(future);
87    for _ in 0..MAX_POLLS {
88        if let Poll::Ready(output) = future.as_mut().poll(&mut context) {
89            return Ok(output);
90        }
91        std::thread::yield_now();
92    }
93    Err(BridgeError::DidNotComplete { polls: MAX_POLLS })
94}
95
/// Waker whose wake-ups do nothing.
///
/// [`block_on`] re-polls the future in a bounded loop rather than parking until
/// it is woken, so wake notifications carry no information for it and are
/// simply discarded.
struct NoopWaker;

impl Wake for NoopWaker {
    /// Consumes the handle and does nothing else.
    fn wake(self: Arc<Self>) {}

    /// Does nothing. Overridden because the trait's default implementation
    /// clones the `Arc` and forwards to [`Wake::wake`], which would cost a
    /// reference-count round trip for no effect.
    fn wake_by_ref(self: &Arc<Self>) {}
}
101
#[cfg(test)]
mod tests {
    use std::future;

    use super::{BridgeError, MAX_POLLS, block_on};

    /// A future that is already resolved yields its value through the bridge.
    #[test]
    fn block_on_returns_value_from_ready_future() -> Result<(), BridgeError> {
        let value = block_on(future::ready(42_u32))?;
        assert_eq!(value, 42);
        Ok(())
    }

    /// A future that never resolves must hit the poll bound and report it,
    /// rather than hanging the test.
    #[test]
    fn block_on_fails_loudly_and_fast_on_always_pending_future() {
        let outcome = block_on(future::pending::<()>());
        let hit_bound = matches!(
            outcome,
            Err(BridgeError::DidNotComplete { polls }) if polls == MAX_POLLS
        );
        assert!(hit_bound);
    }
}