async_rs/implementors/
smol.rs

1//! smol implementation of async runtime definition traits
2
3use crate::{
4    Runtime,
5    sys::AsSysFd,
6    traits::{Executor, Reactor, RuntimeKit, Task},
7    util::{IOHandle, UnitFuture},
8};
9use async_trait::async_trait;
10use futures_core::Stream;
11use futures_io::{AsyncRead, AsyncWrite};
12use smol::{Async, Timer};
13use std::{
14    future::Future,
15    io::{self, Read, Write},
16    net::{SocketAddr, TcpStream},
17    pin::Pin,
18    task::{Context, Poll},
19    time::{Duration, Instant},
20};
21
22/// Type alias for the smol runtime
23pub type SmolRuntime = Runtime<Smol>;
24
25impl SmolRuntime {
26    /// Create a new SmolRuntime
27    pub fn smol() -> Self {
28        Self::new(Smol)
29    }
30}
31
32/// Dummy object implementing async common interfaces on top of smol
33#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
34pub struct Smol;
35
36struct STask<T: Send + 'static>(Option<smol::Task<T>>);
37
38impl RuntimeKit for Smol {}
39
40impl Executor for Smol {
41    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
42        smol::block_on(f)
43    }
44
45    fn spawn<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
46        &self,
47        f: F,
48    ) -> impl Task<T> + 'static {
49        STask(Some(smol::spawn(f)))
50    }
51
52    fn spawn_blocking<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
53        &self,
54        f: F,
55    ) -> impl Task<T> + 'static {
56        STask(Some(smol::unblock(f)))
57    }
58}
59
60#[async_trait]
61impl<T: Send + 'static> Task<T> for STask<T> {
62    async fn cancel(&mut self) -> Option<T> {
63        self.0.take()?.cancel().await
64    }
65}
66
67impl<T: Send + 'static> Drop for STask<T> {
68    fn drop(&mut self) {
69        if let Some(task) = self.0.take() {
70            task.detach();
71        }
72    }
73}
74
75impl<T: Send + 'static> Future for STask<T> {
76    type Output = T;
77
78    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        Pin::new(self.0.as_mut().expect("task canceled")).poll(cx)
80    }
81}
82
83impl Reactor for Smol {
84    type TcpStream = Async<TcpStream>;
85
86    fn register<H: Read + Write + AsSysFd + Send + 'static>(
87        &self,
88        socket: H,
89    ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
90        Async::new(IOHandle::new(socket))
91    }
92
93    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
94        UnitFuture(Timer::after(dur))
95    }
96
97    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
98        Timer::interval(dur)
99    }
100
101    fn tcp_connect(
102        &self,
103        addr: SocketAddr,
104    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
105        Async::<TcpStream>::connect(addr)
106    }
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112
113    #[test]
114    fn dyn_compat() {
115        struct Test {
116            _executor: Box<dyn Executor>,
117            _reactor: Box<dyn Reactor<TcpStream = Async<TcpStream>>>,
118            _kit: Box<dyn RuntimeKit<TcpStream = Async<TcpStream>>>,
119            _task: Box<dyn Task<String>>,
120        }
121
122        let _ = Test {
123            _executor: Box::new(Smol),
124            _reactor: Box::new(Smol),
125            _kit: Box::new(Smol),
126            _task: Box::new(STask(None)),
127        };
128    }
129}