async_rs/implementors/
smol.rs

1//! smol implementation of async runtime definition traits
2
3use crate::{
4    AsyncIOHandle, Executor, IOHandle, Reactor, Runtime, RuntimeKit, Task, TimerTask, sys::IO,
5};
6use async_trait::async_trait;
7use futures_core::Stream;
8use smol::{Async, Timer};
9use std::{
10    future::Future,
11    io,
12    net::{SocketAddr, TcpStream},
13    pin::Pin,
14    task::{Context, Poll},
15    time::{Duration, Instant},
16};
17
18/// Type alias for the smol runtime
19pub type SmolRuntime = Runtime<Smol>;
20
21impl SmolRuntime {
22    /// Create a new SmolRuntime
23    pub fn smol() -> Self {
24        Self::new(Smol)
25    }
26}
27
/// Zero-sized marker type on which the common async interfaces are implemented using smol
#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
pub struct Smol;
31
32struct STask<T: Send>(Option<smol::Task<T>>);
33
34impl RuntimeKit for Smol {}
35
36impl Executor for Smol {
37    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
38        smol::block_on(f)
39    }
40
41    fn spawn<T: Send + 'static>(
42        &self,
43        f: impl Future<Output = T> + Send + 'static,
44    ) -> impl Task<T> {
45        STask(Some(smol::spawn(f)))
46    }
47
48    fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(
49        &self,
50        f: F,
51    ) -> impl Task<T> {
52        STask(Some(smol::unblock(f)))
53    }
54}
55
56#[async_trait(?Send)]
57impl<T: Send> Task<T> for STask<T> {
58    async fn cancel(&mut self) -> Option<T> {
59        self.0.take()?.cancel().await
60    }
61}
62
63impl<T: Send> Drop for STask<T> {
64    fn drop(&mut self) {
65        if let Some(task) = self.0.take() {
66            task.detach();
67        }
68    }
69}
70
71impl<T: Send> Future for STask<T> {
72    type Output = T;
73
74    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75        Pin::new(self.0.as_mut().expect("task canceled")).poll(cx)
76    }
77}
78
79#[async_trait]
80impl Reactor for Smol {
81    fn register<H: IO + Send + 'static>(
82        &self,
83        socket: IOHandle<H>,
84    ) -> io::Result<impl AsyncIOHandle + Send> {
85        Async::new(socket)
86    }
87
88    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
89        TimerTask(Timer::after(dur))
90    }
91
92    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
93        Timer::interval(dur)
94    }
95
96    async fn tcp_connect(&self, addr: SocketAddr) -> io::Result<impl AsyncIOHandle + Send> {
97        Async::<TcpStream>::connect(addr).await
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check: each trait can be used as a boxed trait object
    /// backed by the smol types defined in this module.
    #[test]
    fn dyn_compat() {
        struct DynHolder {
            _executor: Box<dyn Executor>,
            _reactor: Box<dyn Reactor>,
            _kit: Box<dyn RuntimeKit>,
            _task: Box<dyn Task<String>>,
        }

        let executor: Box<dyn Executor> = Box::new(Smol);
        let reactor: Box<dyn Reactor> = Box::new(Smol);
        let kit: Box<dyn RuntimeKit> = Box::new(Smol);
        let task: Box<dyn Task<String>> = Box::new(STask::<String>(None));

        let _ = DynHolder {
            _executor: executor,
            _reactor: reactor,
            _kit: kit,
            _task: task,
        };
    }
}