atomr_core/dispatch/
dispatcher.rs1use std::future::Future;
5use std::sync::Arc;
6
7use tokio::runtime::{Handle, Runtime};
8use tokio::task::JoinHandle;
9
10pub trait Dispatcher: Send + Sync {
12 fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle;
13
14 fn throughput(&self) -> u32 {
16 10
17 }
18}
19
20pub struct DispatcherHandle(pub(crate) JoinHandle<()>);
21
22impl DispatcherHandle {
23 pub async fn join(self) {
24 let _ = self.0.await;
25 }
26
27 pub fn abort(&self) {
28 self.0.abort();
29 }
30}
31
32pub struct DefaultDispatcher {
34 handle: Handle,
35 throughput: u32,
36}
37
38impl DefaultDispatcher {
39 pub fn new(handle: Handle, throughput: u32) -> Self {
40 Self { handle, throughput }
41 }
42
43 pub fn current() -> Self {
44 Self::new(Handle::current(), 10)
45 }
46}
47
48impl Dispatcher for DefaultDispatcher {
49 fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle {
50 DispatcherHandle(self.handle.spawn(task))
51 }
52
53 fn throughput(&self) -> u32 {
54 self.throughput
55 }
56}
57
58pub struct PinnedDispatcher {
61 rt: Arc<Runtime>,
62}
63
64impl PinnedDispatcher {
65 pub fn new() -> std::io::Result<Self> {
66 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
67 Ok(Self { rt: Arc::new(rt) })
68 }
69}
70
71impl Dispatcher for PinnedDispatcher {
72 fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle {
73 DispatcherHandle(self.rt.spawn(task))
74 }
75}
76
77pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
79where
80 F: Future + Send + 'static,
81 F::Output: Send + 'static,
82{
83 tokio::spawn(f)
84}
85
86pub struct ThreadPoolDispatcher {
89 rt: Arc<Runtime>,
90 throughput: u32,
91}
92
93impl ThreadPoolDispatcher {
94 pub fn new(worker_threads: usize, throughput: u32) -> std::io::Result<Self> {
95 let rt = tokio::runtime::Builder::new_multi_thread()
96 .worker_threads(worker_threads.max(1))
97 .enable_all()
98 .build()?;
99 Ok(Self { rt: Arc::new(rt), throughput })
100 }
101}
102
103impl Dispatcher for ThreadPoolDispatcher {
104 fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle {
105 DispatcherHandle(self.rt.spawn(task))
106 }
107 fn throughput(&self) -> u32 {
108 self.throughput
109 }
110}
111
112pub struct CallingThreadDispatcher;
116
117impl Dispatcher for CallingThreadDispatcher {
118 fn spawn_task(&self, task: futures_util::future::BoxFuture<'static, ()>) -> DispatcherHandle {
119 DispatcherHandle(tokio::task::spawn(task))
120 }
121 fn throughput(&self) -> u32 {
122 1
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    /// A task spawned through `DefaultDispatcher` runs and can report back
    /// over a oneshot channel; the handle can then be joined.
    #[tokio::test]
    async fn default_dispatcher_runs_task() {
        let (sender, receiver) = tokio::sync::oneshot::channel::<u32>();
        let dispatcher = DefaultDispatcher::current();

        let task: futures_util::future::BoxFuture<'static, ()> = Box::pin(async move {
            sender.send(42u32).unwrap();
        });
        let handle = dispatcher.spawn_task(task);

        let received = receiver.await.unwrap();
        assert_eq!(received, 42);
        handle.join().await;
    }
}