palladium_runtime/
reactor.rs1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6thread_local! {
7 static CURRENT_REACTOR: RefCell<Option<Box<dyn palladium_actor::Reactor>>> = RefCell::new(None);
8}
9
10pub fn set_current_reactor(reactor: Box<dyn palladium_actor::Reactor>) {
13 CURRENT_REACTOR.with(|r| {
14 *r.borrow_mut() = Some(reactor);
15 });
16}
17
18pub fn with_reactor<R, F: FnOnce() -> R>(reactor: Box<dyn palladium_actor::Reactor>, f: F) -> R {
20 let prev = CURRENT_REACTOR.with(|r| r.borrow_mut().replace(reactor));
21 let result = f();
22 CURRENT_REACTOR.with(|r| {
23 *r.borrow_mut() = prev;
24 });
25 result
26}
27
28pub fn get_current_reactor() -> Option<Box<dyn palladium_actor::Reactor>> {
30 CURRENT_REACTOR.with(|r| r.borrow().as_ref().map(|r| r.clone_box()))
31}
32
33pub struct TokioSpawnHandle(tokio::task::JoinHandle<()>);
35
36impl palladium_actor::SpawnHandle for TokioSpawnHandle {
37 fn abort(&self) {
38 self.0.abort();
39 }
40}
41
42pub trait SpawnHandle: palladium_actor::SpawnHandle + Send + Sync + 'static {}
45
46impl SpawnHandle for TokioSpawnHandle {}
47
48pub trait Reactor: palladium_actor::Reactor + Clone + Send + Sync + 'static {
53 }
57
58pub trait Interval: palladium_actor::Interval + Send + 'static {}
60
/// Stateless reactor whose `palladium_actor::Reactor` implementation in this
/// file delegates task spawning, timers and the clock to tokio.
#[derive(Clone, Default)]
pub struct TokioReactor;
64
65struct TokioInterval(tokio::time::Interval);
66
67impl palladium_actor::Interval for TokioInterval {
68 fn tick(&mut self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
69 Box::pin(async move {
70 self.0.tick().await;
71 })
72 }
73}
74
75impl Interval for TokioInterval {}
76
77impl palladium_actor::Reactor for TokioReactor {
78 fn clone_box(&self) -> Box<dyn palladium_actor::Reactor> {
79 Box::new(self.clone())
80 }
81
82 fn spawn_local(
83 &self,
84 fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
85 ) -> Box<dyn palladium_actor::SpawnHandle> {
86 Box::new(TokioSpawnHandle(tokio::task::spawn_local(fut)))
87 }
88
89 fn spawn(
90 &self,
91 fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
92 ) -> Box<dyn palladium_actor::SpawnHandle> {
93 Box::new(TokioSpawnHandle(tokio::spawn(fut)))
94 }
95
96 fn yield_now(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
97 Box::pin(tokio::task::yield_now())
98 }
99
100 fn sleep(
101 &self,
102 duration: Duration,
103 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
104 Box::pin(tokio::time::sleep(duration))
105 }
106
107 fn now(&self) -> std::time::Instant {
108 tokio::time::Instant::now().into_std()
109 }
110
111 fn elapsed_since(&self, start: std::time::Instant) -> u64 {
112 start.elapsed().as_nanos() as u64
113 }
114
115 fn create_interval(&self, duration: Duration) -> Box<dyn palladium_actor::Interval> {
116 Box::new(TokioInterval(tokio::time::interval(duration)))
117 }
118
119 fn next_u64(&self) -> u64 {
120 use rand::RngCore;
121 rand::thread_rng().next_u64()
122 }
123}
124
125impl Reactor for TokioReactor {}