use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use mssf_core::runtime::executor::{BoxedCancelToken, CancelToken, EventFuture, Executor, Timer};
use tokio::runtime::Handle;
#[cfg(test)]
mod tests;
/// Executor that wraps a tokio runtime [`Handle`].
///
/// Cloning the executor clones the wrapped handle.
#[derive(Clone)]
pub struct TokioExecutor {
/// Handle of the tokio runtime used for spawning and blocking.
rt: Handle,
}
impl TokioExecutor {
pub fn new(rt: Handle) -> TokioExecutor {
TokioExecutor { rt }
}
pub fn get_ref(&self) -> &Handle {
&self.rt
}
pub fn block_on_any<F: Future>(&self, future: F) -> F::Output {
match tokio::runtime::Handle::try_current() {
Ok(h) => {
tokio::task::block_in_place(move || h.block_on(future))
}
Err(_) => {
self.rt.block_on(future)
}
}
}
pub fn block_until_ctrlc(&self) {
self.rt.block_on(async {
tokio::signal::ctrl_c().await.expect("fail to get ctrl-c");
});
}
}
impl Executor for TokioExecutor {
    /// Spawns `future` on the wrapped runtime handle.
    ///
    /// The value returned by the handle's `spawn` is discarded, so this call
    /// gives the caller no way to await the task's result.
    fn spawn<F>(&self, future: F)
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        drop(self.rt.spawn(future));
    }
}
/// Stateless timer whose [`Timer`] implementation creates sleeps with `tokio::time::sleep`.
pub struct TokioTimer;
impl Timer for TokioTimer {
    /// Returns a boxed future wrapping a `tokio::time::sleep(duration)` call.
    fn sleep(&self, duration: std::time::Duration) -> std::pin::Pin<Box<dyn EventFuture>> {
        let delay = tokio::time::sleep(duration);
        let wrapped = TokioSleep::new(delay);
        Box::pin(wrapped)
    }
}
/// Future wrapper around a heap-pinned `tokio::time::Sleep`.
///
/// Its `Future` implementation forwards polling to the inner sleep and
/// yields `()`.
pub struct TokioSleep {
/// The tokio sleep being driven, pinned on the heap.
inner: Pin<Box<tokio::time::Sleep>>,
}
impl TokioSleep {
    /// Wraps a tokio `Sleep`, pinning it on the heap.
    pub fn new(sleep: tokio::time::Sleep) -> Self {
        let inner = Box::pin(sleep);
        Self { inner }
    }
}
impl Future for TokioSleep {
    type Output = ();

    /// Forwards the poll to the inner pinned sleep.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The only field is a `Pin<Box<_>>`, so a plain mutable reference to
        // the wrapper is enough to re-pin the inner sleep.
        let this = self.get_mut();
        this.inner.as_mut().poll(cx)
    }
}
/// Cancellation token that wraps a `tokio_util::sync::CancellationToken`.
///
/// Cloning clones the wrapped token.
#[derive(Debug, Clone)]
pub struct TokioCancelToken {
/// The wrapped tokio-util cancellation token.
token: tokio_util::sync::CancellationToken,
}
impl CancelToken for TokioCancelToken {
    /// Reports whether the wrapped token has been cancelled.
    fn is_cancelled(&self) -> bool {
        let token = &self.token;
        token.is_cancelled()
    }

    /// Cancels the wrapped token.
    fn cancel(&self) {
        self.token.cancel();
    }

    /// Returns a boxed future built from a clone of the wrapped token via
    /// `cancelled_owned`, so the future does not borrow from `self`.
    fn wait(&self) -> Pin<Box<dyn EventFuture>> {
        let owned = self.token.clone();
        Box::pin(owned.cancelled_owned())
    }

    /// Boxes a clone of this token as a [`BoxedCancelToken`].
    fn clone_box(&self) -> BoxedCancelToken {
        let copy: TokioCancelToken = self.clone();
        Box::new(copy)
    }
}
impl TokioCancelToken {
    /// Creates a token backed by a freshly constructed
    /// `tokio_util::sync::CancellationToken`.
    pub fn new() -> Self {
        let token = tokio_util::sync::CancellationToken::new();
        Self { token }
    }

    /// Creates a new token and returns it boxed as a [`BoxedCancelToken`].
    pub fn new_boxed() -> BoxedCancelToken {
        let fresh = Self::new();
        Box::new(fresh)
    }

    /// Borrows the wrapped tokio-util cancellation token.
    pub fn get_ref(&self) -> &tokio_util::sync::CancellationToken {
        &self.token
    }
}
impl From<tokio_util::sync::CancellationToken> for TokioCancelToken {
    /// Wraps an existing tokio-util cancellation token without creating a new one.
    fn from(token: tokio_util::sync::CancellationToken) -> Self {
        Self { token }
    }
}
impl Default for TokioCancelToken {
fn default() -> Self {
Self::new()
}
}