Skip to main content

palladium_runtime/
reactor.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6thread_local! {
7    static CURRENT_REACTOR: RefCell<Option<Box<dyn palladium_actor::Reactor>>> = RefCell::new(None);
8}
9
10/// Set the reactor for the current thread. Used by simulation to allow
11/// libraries with static-method dependencies (like OpenRaft) to find the reactor.
12pub fn set_current_reactor(reactor: Box<dyn palladium_actor::Reactor>) {
13    CURRENT_REACTOR.with(|r| {
14        *r.borrow_mut() = Some(reactor);
15    });
16}
17
18/// Execute a closure with the given reactor set as current for the duration of the call.
19pub fn with_reactor<R, F: FnOnce() -> R>(reactor: Box<dyn palladium_actor::Reactor>, f: F) -> R {
20    let prev = CURRENT_REACTOR.with(|r| r.borrow_mut().replace(reactor));
21    let result = f();
22    CURRENT_REACTOR.with(|r| {
23        *r.borrow_mut() = prev;
24    });
25    result
26}
27
28/// Access the current thread's reactor, if set.
29pub fn get_current_reactor() -> Option<Box<dyn palladium_actor::Reactor>> {
30    CURRENT_REACTOR.with(|r| r.borrow().as_ref().map(|r| r.clone_box()))
31}
32
33/// Handle to a spawned task.
34pub struct TokioSpawnHandle(tokio::task::JoinHandle<()>);
35
36impl palladium_actor::SpawnHandle for TokioSpawnHandle {
37    fn abort(&self) {
38        self.0.abort();
39    }
40}
41
42/// Type-erased handle to a spawned task.  Allows aborting a task regardless
43/// of which reactor spawned it.
44pub trait SpawnHandle: palladium_actor::SpawnHandle + Send + Sync + 'static {}
45
46impl SpawnHandle for TokioSpawnHandle {}
47
48/// Abstraction over the async task scheduler (ADR-001).
49///
50/// Production code uses `TokioReactor`; simulation tests swap in `SimReactor`
51/// (Phase 3) to control scheduling and time deterministically.
52pub trait Reactor: palladium_actor::Reactor + Clone + Send + Sync + 'static {
53    // Methods are inherited from palladium_actor::Reactor.
54    // Use fully qualified syntax or unambiguous naming if needed for
55    // internal performance generic versions.
56}
57
58/// A periodic timer that can be polled or awaited.
59pub trait Interval: palladium_actor::Interval + Send + 'static {}
60
/// Production reactor for use with a Tokio `current_thread` runtime.
///
/// A field-less marker type: it holds no state of its own.
#[derive(Default, Clone)]
pub struct TokioReactor;
64
65struct TokioInterval(tokio::time::Interval);
66
67impl palladium_actor::Interval for TokioInterval {
68    fn tick(&mut self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
69        Box::pin(async move {
70            self.0.tick().await;
71        })
72    }
73}
74
75impl Interval for TokioInterval {}
76
77impl palladium_actor::Reactor for TokioReactor {
78    fn clone_box(&self) -> Box<dyn palladium_actor::Reactor> {
79        Box::new(self.clone())
80    }
81
82    fn spawn_local(
83        &self,
84        fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
85    ) -> Box<dyn palladium_actor::SpawnHandle> {
86        Box::new(TokioSpawnHandle(tokio::task::spawn_local(fut)))
87    }
88
89    fn spawn(
90        &self,
91        fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
92    ) -> Box<dyn palladium_actor::SpawnHandle> {
93        Box::new(TokioSpawnHandle(tokio::spawn(fut)))
94    }
95
96    fn yield_now(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
97        Box::pin(tokio::task::yield_now())
98    }
99
100    fn sleep(
101        &self,
102        duration: Duration,
103    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
104        Box::pin(tokio::time::sleep(duration))
105    }
106
107    fn now(&self) -> std::time::Instant {
108        tokio::time::Instant::now().into_std()
109    }
110
111    fn elapsed_since(&self, start: std::time::Instant) -> u64 {
112        start.elapsed().as_nanos() as u64
113    }
114
115    fn create_interval(&self, duration: Duration) -> Box<dyn palladium_actor::Interval> {
116        Box::new(TokioInterval(tokio::time::interval(duration)))
117    }
118
119    fn next_u64(&self) -> u64 {
120        use rand::RngCore;
121        rand::thread_rng().next_u64()
122    }
123}
124
125impl Reactor for TokioReactor {}