async_rs/implementors/
smol.rs

1//! smol implementation of async runtime definition traits
2
3use crate::{
4    Runtime,
5    sys::IO,
6    traits::{AsyncIOHandle, Executor, Reactor, RuntimeKit, Task},
7    util::{IOHandle, UnitFuture},
8};
9use async_trait::async_trait;
10use futures_core::Stream;
11use smol::{Async, Timer};
12use std::{
13    future::Future,
14    io,
15    net::{SocketAddr, TcpStream},
16    pin::Pin,
17    task::{Context, Poll},
18    time::{Duration, Instant},
19};
20
21/// Type alias for the smol runtime
22pub type SmolRuntime = Runtime<Smol>;
23
24impl SmolRuntime {
25    /// Create a new SmolRuntime
26    pub fn smol() -> Self {
27        Self::new(Smol)
28    }
29}
30
31/// Dummy object implementing async common interfaces on top of smol
32#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
33pub struct Smol;
34
35struct STask<T: Send>(Option<smol::Task<T>>);
36
37impl RuntimeKit for Smol {}
38
39impl Executor for Smol {
40    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
41        smol::block_on(f)
42    }
43
44    fn spawn<T: Send + 'static>(
45        &self,
46        f: impl Future<Output = T> + Send + 'static,
47    ) -> impl Task<T> {
48        STask(Some(smol::spawn(f)))
49    }
50
51    fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(
52        &self,
53        f: F,
54    ) -> impl Task<T> {
55        STask(Some(smol::unblock(f)))
56    }
57}
58
59#[async_trait(?Send)]
60impl<T: Send> Task<T> for STask<T> {
61    async fn cancel(&mut self) -> Option<T> {
62        self.0.take()?.cancel().await
63    }
64}
65
66impl<T: Send> Drop for STask<T> {
67    fn drop(&mut self) {
68        if let Some(task) = self.0.take() {
69            task.detach();
70        }
71    }
72}
73
74impl<T: Send> Future for STask<T> {
75    type Output = T;
76
77    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78        Pin::new(self.0.as_mut().expect("task canceled")).poll(cx)
79    }
80}
81
82impl Reactor for Smol {
83    fn register<H: IO + Send + 'static>(
84        &self,
85        socket: IOHandle<H>,
86    ) -> io::Result<impl AsyncIOHandle + Send> {
87        Async::new(socket)
88    }
89
90    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
91        UnitFuture(Timer::after(dur))
92    }
93
94    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
95        Timer::interval(dur)
96    }
97
98    fn tcp_connect(
99        &self,
100        addr: SocketAddr,
101    ) -> impl Future<Output = io::Result<impl AsyncIOHandle + Send>> + Send {
102        Async::<TcpStream>::connect(addr)
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109
110    #[test]
111    fn dyn_compat() {
112        struct Test {
113            _executor: Box<dyn Executor>,
114            _reactor: Box<dyn Reactor>,
115            _kit: Box<dyn RuntimeKit>,
116            _task: Box<dyn Task<String>>,
117        }
118
119        let _ = Test {
120            _executor: Box::new(Smol),
121            _reactor: Box::new(Smol),
122            _kit: Box::new(Smol),
123            _task: Box::new(STask(None)),
124        };
125    }
126}