Skip to main content

atomr_core/dispatch/
dispatcher.rs

1//! Dispatchers schedule actor cells onto a runtime.
2//! akka.net: `Dispatch/Dispatcher.cs`, `PinnedDispatcher.cs`.
3
4use std::future::Future;
5use std::sync::Arc;
6
7use tokio::runtime::{Handle, Runtime};
8use tokio::task::JoinHandle;
9
10/// Abstraction over "somewhere a task can run".
11pub trait Dispatcher: Send + Sync {
12    fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle;
13
14    /// akka.net: `Throughput`.
15    fn throughput(&self) -> u32 {
16        10
17    }
18}
19
20pub struct DispatcherHandle(pub(crate) JoinHandle<()>);
21
22impl DispatcherHandle {
23    pub async fn join(self) {
24        let _ = self.0.await;
25    }
26
27    pub fn abort(&self) {
28        self.0.abort();
29    }
30}
31
32/// Default dispatcher — uses the ambient Tokio runtime.
33pub struct DefaultDispatcher {
34    handle: Handle,
35    throughput: u32,
36}
37
38impl DefaultDispatcher {
39    pub fn new(handle: Handle, throughput: u32) -> Self {
40        Self { handle, throughput }
41    }
42
43    pub fn current() -> Self {
44        Self::new(Handle::current(), 10)
45    }
46}
47
48impl Dispatcher for DefaultDispatcher {
49    fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle {
50        DispatcherHandle(self.handle.spawn(task))
51    }
52
53    fn throughput(&self) -> u32 {
54        self.throughput
55    }
56}
57
58/// Dedicated single-thread runtime for actors that require strict affinity.
59/// akka.net: `PinnedDispatcher`.
60pub struct PinnedDispatcher {
61    rt: Arc<Runtime>,
62}
63
64impl PinnedDispatcher {
65    pub fn new() -> std::io::Result<Self> {
66        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
67        Ok(Self { rt: Arc::new(rt) })
68    }
69}
70
71impl Dispatcher for PinnedDispatcher {
72    fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle {
73        DispatcherHandle(self.rt.spawn(task))
74    }
75}
76
77/// Helper to run a future on the default tokio executor.
78pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
79where
80    F: Future + Send + 'static,
81    F::Output: Send + 'static,
82{
83    tokio::spawn(f)
84}
85
86/// Multi-thread dedicated runtime sized by `worker_threads`.
87/// akka.net: `ThreadPoolDispatcher`.
88pub struct ThreadPoolDispatcher {
89    rt: Arc<Runtime>,
90    throughput: u32,
91}
92
93impl ThreadPoolDispatcher {
94    pub fn new(worker_threads: usize, throughput: u32) -> std::io::Result<Self> {
95        let rt = tokio::runtime::Builder::new_multi_thread()
96            .worker_threads(worker_threads.max(1))
97            .enable_all()
98            .build()?;
99        Ok(Self { rt: Arc::new(rt), throughput })
100    }
101}
102
103impl Dispatcher for ThreadPoolDispatcher {
104    fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle {
105        DispatcherHandle(self.rt.spawn(task))
106    }
107    fn throughput(&self) -> u32 {
108        self.throughput
109    }
110}
111
112/// Dispatcher that runs the task immediately on the calling thread by
113/// using `tokio::task::spawn_blocking` to drive the future to completion
114/// inline. akka.net: `CallingThreadDispatcher`. Mostly useful in tests.
115pub struct CallingThreadDispatcher;
116
117impl Dispatcher for CallingThreadDispatcher {
118    fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle {
119        DispatcherHandle(tokio::task::spawn(task))
120    }
121    fn throughput(&self) -> u32 {
122        1
123    }
124}
125
126#[cfg(test)]
127mod tests {
128    use super::*;
129
130    #[tokio::test]
131    async fn default_dispatcher_runs_task() {
132        let d = DefaultDispatcher::current();
133        let (tx, rx) = tokio::sync::oneshot::channel();
134        let h = d.spawn_task(Box::pin(async move {
135            tx.send(42u32).unwrap();
136        }));
137        assert_eq!(rx.await.unwrap(), 42);
138        h.join().await;
139    }
140}