async_rs/implementors/
smol.rs1use crate::{
4 Runtime,
5 sys::AsSysFd,
6 traits::{Executor, Reactor, RuntimeKit, Task},
7 util::{IOHandle, UnitFuture},
8};
9use async_trait::async_trait;
10use futures_core::Stream;
11use futures_io::{AsyncRead, AsyncWrite};
12use smol::{Async, Timer};
13use std::{
14 future::Future,
15 io::{self, Read, Write},
16 net::{SocketAddr, TcpStream},
17 pin::Pin,
18 task::{Context, Poll},
19 time::{Duration, Instant},
20};
21
22pub type SmolRuntime = Runtime<Smol>;
24
25impl SmolRuntime {
26 pub fn smol() -> Self {
28 Self::new(Smol)
29 }
30}
31
32#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
34pub struct Smol;
35
36struct STask<T: Send + 'static>(Option<smol::Task<T>>);
37
38impl RuntimeKit for Smol {}
39
40impl Executor for Smol {
41 fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
42 smol::block_on(f)
43 }
44
45 fn spawn<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
46 &self,
47 f: F,
48 ) -> impl Task<T> + 'static {
49 STask(Some(smol::spawn(f)))
50 }
51
52 fn spawn_blocking<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
53 &self,
54 f: F,
55 ) -> impl Task<T> + 'static {
56 STask(Some(smol::unblock(f)))
57 }
58}
59
60#[async_trait]
61impl<T: Send + 'static> Task<T> for STask<T> {
62 async fn cancel(&mut self) -> Option<T> {
63 self.0.take()?.cancel().await
64 }
65}
66
67impl<T: Send + 'static> Drop for STask<T> {
68 fn drop(&mut self) {
69 if let Some(task) = self.0.take() {
70 task.detach();
71 }
72 }
73}
74
75impl<T: Send + 'static> Future for STask<T> {
76 type Output = T;
77
78 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79 Pin::new(self.0.as_mut().expect("task canceled")).poll(cx)
80 }
81}
82
83impl Reactor for Smol {
84 type TcpStream = Async<TcpStream>;
85
86 fn register<H: Read + Write + AsSysFd + Send + 'static>(
87 &self,
88 socket: H,
89 ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
90 Async::new(IOHandle::new(socket))
91 }
92
93 fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
94 UnitFuture(Timer::after(dur))
95 }
96
97 fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
98 Timer::interval(dur)
99 }
100
101 fn tcp_connect(
102 &self,
103 addr: SocketAddr,
104 ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
105 Async::<TcpStream>::connect(addr)
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use super::*;
112
113 #[test]
114 fn dyn_compat() {
115 struct Test {
116 _executor: Box<dyn Executor>,
117 _reactor: Box<dyn Reactor<TcpStream = Async<TcpStream>>>,
118 _kit: Box<dyn RuntimeKit<TcpStream = Async<TcpStream>>>,
119 _task: Box<dyn Task<String>>,
120 }
121
122 let _ = Test {
123 _executor: Box::new(Smol),
124 _reactor: Box::new(Smol),
125 _kit: Box::new(Smol),
126 _task: Box::new(STask(None)),
127 };
128 }
129}