mssf_util/tokio/
mod.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
//! tokio utilities
7
8use std::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use mssf_core::runtime::executor::{BoxedCancelToken, CancelToken, EventFuture, Executor, Timer};
15use tokio::runtime::Handle;
16
17#[cfg(test)]
18mod tests;
19
20#[derive(Clone)]
21pub struct TokioExecutor {
22    rt: Handle,
23}
24
25impl TokioExecutor {
26    pub fn new(rt: Handle) -> TokioExecutor {
27        TokioExecutor { rt }
28    }
29
30    /// Returns a reference to the tokio runtime handle.
31    pub fn get_ref(&self) -> &Handle {
32        &self.rt
33    }
34
35    /// Block on the current task safely.
36    /// Usually regular tokio block_on panics if it is already on the tokio task.
37    /// This allows block on tokio task, using spawn_blocking.
38    /// Note: This only works on multi-threaded runtime.
39    pub fn block_on_any<F>(&self, future: F) -> F::Output
40    where
41        F: Future + Send + 'static,
42        F::Output: Send,
43    {
44        match tokio::runtime::Handle::try_current() {
45            Ok(h) => {
46                // Currently on tokio thread.
47                // Need to block the task.
48                tokio::task::block_in_place(move || h.block_on(future))
49            }
50            Err(_) => {
51                // Not on tokio thread, safe to block it directly
52                self.rt.block_on(future)
53            }
54        }
55    }
56
57    /// Block the current thread until Ctrl+C is received.
58    /// This is typically used in SF app main function.
59    pub fn block_until_ctrlc(&self) {
60        self.rt.block_on(async {
61            tokio::signal::ctrl_c().await.expect("fail to get ctrl-c");
62        });
63    }
64}
65
66impl Executor for TokioExecutor {
67    fn spawn<F>(&self, future: F)
68    where
69        F: Future + Send + 'static,
70        F::Output: Send,
71    {
72        self.rt.spawn(future);
73    }
74}
75
/// Sleep timer implementation for tokio.
///
/// The type carries no state, so it is cheap to copy and can be created with
/// `TokioTimer` or `TokioTimer::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TokioTimer;
78
79// TODO: the return type may be simplified if using return impl
80impl Timer for TokioTimer {
81    fn sleep(&self, duration: std::time::Duration) -> std::pin::Pin<Box<dyn EventFuture>> {
82        Box::pin(TokioSleep::new(tokio::time::sleep(duration)))
83    }
84}
85
86/// Sleep future implementation for tokio
87pub struct TokioSleep {
88    // May need to use pin_project
89    // to remove the Pin because the inner is not Unpin
90    inner: Pin<Box<tokio::time::Sleep>>,
91}
92
93impl TokioSleep {
94    pub fn new(sleep: tokio::time::Sleep) -> Self {
95        Self {
96            inner: Box::pin(sleep),
97        }
98    }
99}
100
101impl Future for TokioSleep {
102    type Output = ();
103
104    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105        // Poll the inner Sleep future
106        self.inner.as_mut().poll(cx)
107    }
108}
109
110/// CancelToken implementation for tokio
111/// User can use tokio's token and integrate with mssf.
112#[derive(Debug, Clone)]
113pub struct TokioCancelToken {
114    token: tokio_util::sync::CancellationToken,
115}
116
117impl CancelToken for TokioCancelToken {
118    fn is_cancelled(&self) -> bool {
119        self.token.is_cancelled()
120    }
121
122    fn cancel(&self) {
123        self.token.cancel()
124    }
125
126    fn wait(&self) -> Pin<Box<dyn EventFuture>> {
127        let fut = self.token.clone().cancelled_owned();
128        Box::pin(fut) as Pin<Box<dyn EventFuture>>
129    }
130
131    fn clone_box(&self) -> BoxedCancelToken {
132        Box::new(self.clone())
133    }
134}
135
136impl TokioCancelToken {
137    pub fn new() -> Self {
138        TokioCancelToken {
139            token: tokio_util::sync::CancellationToken::new(),
140        }
141    }
142
143    pub fn new_boxed() -> BoxedCancelToken {
144        Box::new(Self::new())
145    }
146
147    pub fn get_ref(&self) -> &tokio_util::sync::CancellationToken {
148        &self.token
149    }
150}
151
152impl From<tokio_util::sync::CancellationToken> for TokioCancelToken {
153    fn from(token: tokio_util::sync::CancellationToken) -> Self {
154        TokioCancelToken { token }
155    }
156}
157
158impl Default for TokioCancelToken {
159    fn default() -> Self {
160        Self::new()
161    }
162}