1use std::{
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use mssf_core::runtime::executor::{BoxedCancelToken, CancelToken, EventFuture, Executor, Timer};
15use tokio::runtime::Handle;
16
17#[cfg(test)]
18mod tests;
19
20#[derive(Clone)]
21pub struct TokioExecutor {
22 rt: Handle,
23}
24
25impl TokioExecutor {
26 pub fn new(rt: Handle) -> TokioExecutor {
27 TokioExecutor { rt }
28 }
29
30 pub fn get_ref(&self) -> &Handle {
32 &self.rt
33 }
34
35 pub fn block_on_any<F>(&self, future: F) -> F::Output
40 where
41 F: Future + Send + 'static,
42 F::Output: Send,
43 {
44 match tokio::runtime::Handle::try_current() {
45 Ok(h) => {
46 tokio::task::block_in_place(move || h.block_on(future))
49 }
50 Err(_) => {
51 self.rt.block_on(future)
53 }
54 }
55 }
56
57 pub fn block_until_ctrlc(&self) {
60 self.rt.block_on(async {
61 tokio::signal::ctrl_c().await.expect("fail to get ctrl-c");
62 });
63 }
64}
65
66impl Executor for TokioExecutor {
67 fn spawn<F>(&self, future: F)
68 where
69 F: Future + Send + 'static,
70 F::Output: Send,
71 {
72 self.rt.spawn(future);
73 }
74}
75
76pub struct TokioTimer;
78
79impl Timer for TokioTimer {
81 fn sleep(&self, duration: std::time::Duration) -> std::pin::Pin<Box<dyn EventFuture>> {
82 Box::pin(TokioSleep::new(tokio::time::sleep(duration)))
83 }
84}
85
86pub struct TokioSleep {
88 inner: Pin<Box<tokio::time::Sleep>>,
91}
92
93impl TokioSleep {
94 pub fn new(sleep: tokio::time::Sleep) -> Self {
95 Self {
96 inner: Box::pin(sleep),
97 }
98 }
99}
100
101impl Future for TokioSleep {
102 type Output = ();
103
104 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105 self.inner.as_mut().poll(cx)
107 }
108}
109
110#[derive(Debug, Clone)]
113pub struct TokioCancelToken {
114 token: tokio_util::sync::CancellationToken,
115}
116
117impl CancelToken for TokioCancelToken {
118 fn is_cancelled(&self) -> bool {
119 self.token.is_cancelled()
120 }
121
122 fn cancel(&self) {
123 self.token.cancel()
124 }
125
126 fn wait(&self) -> Pin<Box<dyn EventFuture>> {
127 let fut = self.token.clone().cancelled_owned();
128 Box::pin(fut) as Pin<Box<dyn EventFuture>>
129 }
130
131 fn clone_box(&self) -> BoxedCancelToken {
132 Box::new(self.clone())
133 }
134}
135
136impl TokioCancelToken {
137 pub fn new() -> Self {
138 TokioCancelToken {
139 token: tokio_util::sync::CancellationToken::new(),
140 }
141 }
142
143 pub fn new_boxed() -> BoxedCancelToken {
144 Box::new(Self::new())
145 }
146
147 pub fn get_ref(&self) -> &tokio_util::sync::CancellationToken {
148 &self.token
149 }
150}
151
152impl From<tokio_util::sync::CancellationToken> for TokioCancelToken {
153 fn from(token: tokio_util::sync::CancellationToken) -> Self {
154 TokioCancelToken { token }
155 }
156}
157
158impl Default for TokioCancelToken {
159 fn default() -> Self {
160 Self::new()
161 }
162}