mssf_util/tokio/
mod.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6//! tokio utilites
7
8use std::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use mssf_core::runtime::executor::{BoxedCancelToken, CancelToken, EventFuture, Executor, Timer};
15use tokio::runtime::Handle;
16
17#[cfg(test)]
18mod tests;
19
20#[derive(Clone)]
21pub struct TokioExecutor {
22    rt: Handle,
23}
24
25impl TokioExecutor {
26    pub fn new(rt: Handle) -> TokioExecutor {
27        TokioExecutor { rt }
28    }
29
30    /// Returns a reference to the tokio runtime handle.
31    pub fn get_ref(&self) -> &Handle {
32        &self.rt
33    }
34
35    /// Block on the current task safely.
36    /// Usually regular tokio block_on panics if it is already on the tokio task.
37    /// This allows block on tokio task, using spawn_blocking.
38    /// Note: This only works on multi-threaded runtime.
39    pub fn block_on_any<F: Future>(&self, future: F) -> F::Output {
40        match tokio::runtime::Handle::try_current() {
41            Ok(h) => {
42                // Currently on tokio thread.
43                // Need to block the task.
44                tokio::task::block_in_place(move || h.block_on(future))
45            }
46            Err(_) => {
47                // Not on tokio thread, safe to block it directly
48                self.rt.block_on(future)
49            }
50        }
51    }
52
53    /// Block the current thread until Ctrl+C is received.
54    /// This is typically used in SF app main function.
55    pub fn block_until_ctrlc(&self) {
56        self.rt.block_on(async {
57            tokio::signal::ctrl_c().await.expect("fail to get ctrl-c");
58        });
59    }
60}
61
62impl Executor for TokioExecutor {
63    fn spawn<F>(&self, future: F)
64    where
65        F: Future + Send + 'static,
66        F::Output: Send,
67    {
68        self.rt.spawn(future);
69    }
70}
71
72/// Sleep timer implementation for tokio
73pub struct TokioTimer;
74
75// TODO: the return type may be simplified if using return impl
76impl Timer for TokioTimer {
77    fn sleep(&self, duration: std::time::Duration) -> std::pin::Pin<Box<dyn EventFuture>> {
78        Box::pin(TokioSleep::new(tokio::time::sleep(duration)))
79    }
80}
81
82/// Sleep future implementation for tokio
83pub struct TokioSleep {
84    // May need to use pin_project
85    // to remove the Pin because the inner is not Unpin
86    inner: Pin<Box<tokio::time::Sleep>>,
87}
88
89impl TokioSleep {
90    pub fn new(sleep: tokio::time::Sleep) -> Self {
91        Self {
92            inner: Box::pin(sleep),
93        }
94    }
95}
96
97impl Future for TokioSleep {
98    type Output = ();
99
100    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
101        // Poll the inner Sleep future
102        self.inner.as_mut().poll(cx)
103    }
104}
105
106/// CancelToken implementation for tokio
107/// User can use tokio's token and integrate with mssf.
108#[derive(Debug, Clone)]
109pub struct TokioCancelToken {
110    token: tokio_util::sync::CancellationToken,
111}
112
113impl CancelToken for TokioCancelToken {
114    fn is_cancelled(&self) -> bool {
115        self.token.is_cancelled()
116    }
117
118    fn cancel(&self) {
119        self.token.cancel()
120    }
121
122    fn wait(&self) -> Pin<Box<dyn EventFuture>> {
123        let fut = self.token.clone().cancelled_owned();
124        Box::pin(fut) as Pin<Box<dyn EventFuture>>
125    }
126
127    fn clone_box(&self) -> BoxedCancelToken {
128        Box::new(self.clone())
129    }
130}
131
132impl TokioCancelToken {
133    pub fn new() -> Self {
134        TokioCancelToken {
135            token: tokio_util::sync::CancellationToken::new(),
136        }
137    }
138
139    pub fn new_boxed() -> BoxedCancelToken {
140        Box::new(Self::new())
141    }
142
143    pub fn get_ref(&self) -> &tokio_util::sync::CancellationToken {
144        &self.token
145    }
146}
147
148impl From<tokio_util::sync::CancellationToken> for TokioCancelToken {
149    fn from(token: tokio_util::sync::CancellationToken) -> Self {
150        TokioCancelToken { token }
151    }
152}
153
154impl Default for TokioCancelToken {
155    fn default() -> Self {
156        Self::new()
157    }
158}