1use std::{
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use mssf_core::runtime::executor::{BoxedCancelToken, CancelToken, EventFuture, Executor, Timer};
15use tokio::runtime::Handle;
16
17#[cfg(test)]
18mod tests;
19
20#[derive(Clone)]
21pub struct TokioExecutor {
22 rt: Handle,
23}
24
25impl TokioExecutor {
26 pub fn new(rt: Handle) -> TokioExecutor {
27 TokioExecutor { rt }
28 }
29
30 pub fn get_ref(&self) -> &Handle {
32 &self.rt
33 }
34
35 pub fn block_on_any<F: Future>(&self, future: F) -> F::Output {
40 match tokio::runtime::Handle::try_current() {
41 Ok(h) => {
42 tokio::task::block_in_place(move || h.block_on(future))
45 }
46 Err(_) => {
47 self.rt.block_on(future)
49 }
50 }
51 }
52
53 pub fn block_until_ctrlc(&self) {
56 self.rt.block_on(async {
57 tokio::signal::ctrl_c().await.expect("fail to get ctrl-c");
58 });
59 }
60}
61
62impl Executor for TokioExecutor {
63 fn spawn<F>(&self, future: F)
64 where
65 F: Future + Send + 'static,
66 F::Output: Send,
67 {
68 self.rt.spawn(future);
69 }
70}
71
72pub struct TokioTimer;
74
75impl Timer for TokioTimer {
77 fn sleep(&self, duration: std::time::Duration) -> std::pin::Pin<Box<dyn EventFuture>> {
78 Box::pin(TokioSleep::new(tokio::time::sleep(duration)))
79 }
80}
81
82pub struct TokioSleep {
84 inner: Pin<Box<tokio::time::Sleep>>,
87}
88
89impl TokioSleep {
90 pub fn new(sleep: tokio::time::Sleep) -> Self {
91 Self {
92 inner: Box::pin(sleep),
93 }
94 }
95}
96
97impl Future for TokioSleep {
98 type Output = ();
99
100 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
101 self.inner.as_mut().poll(cx)
103 }
104}
105
106#[derive(Debug, Clone)]
109pub struct TokioCancelToken {
110 token: tokio_util::sync::CancellationToken,
111}
112
113impl CancelToken for TokioCancelToken {
114 fn is_cancelled(&self) -> bool {
115 self.token.is_cancelled()
116 }
117
118 fn cancel(&self) {
119 self.token.cancel()
120 }
121
122 fn wait(&self) -> Pin<Box<dyn EventFuture>> {
123 let fut = self.token.clone().cancelled_owned();
124 Box::pin(fut) as Pin<Box<dyn EventFuture>>
125 }
126
127 fn clone_box(&self) -> BoxedCancelToken {
128 Box::new(self.clone())
129 }
130}
131
132impl TokioCancelToken {
133 pub fn new() -> Self {
134 TokioCancelToken {
135 token: tokio_util::sync::CancellationToken::new(),
136 }
137 }
138
139 pub fn new_boxed() -> BoxedCancelToken {
140 Box::new(Self::new())
141 }
142
143 pub fn get_ref(&self) -> &tokio_util::sync::CancellationToken {
144 &self.token
145 }
146}
147
148impl From<tokio_util::sync::CancellationToken> for TokioCancelToken {
149 fn from(token: tokio_util::sync::CancellationToken) -> Self {
150 TokioCancelToken { token }
151 }
152}
153
154impl Default for TokioCancelToken {
155 fn default() -> Self {
156 Self::new()
157 }
158}