tokio_go/
lib.rs

1pub mod prelude {
2    pub use std::sync::{Arc, RwLock};
3    pub use tokio::runtime::Runtime;
4    pub use tokio::sync::oneshot::{channel, Sender};
5    pub use tokio::time::{interval, sleep, Duration};
6    const RUNTIME_INIT: Option<Runtime> = None;
7    lazy_static! {
8        pub static ref RUNTIMES: Arc<RwLock<[Option<Runtime>; 256]>> =
9            Arc::new(RwLock::new([RUNTIME_INIT; 256]));
10    }
11    #[derive(Debug)]
12    pub struct Context {
13        pub profile: u8,
14        pub timeout: Duration,
15    }
16
17    pub fn init_runtime(profile: u8) {
18        {
19            let r = RUNTIMES.read().unwrap();
20            if r[profile as usize].is_some() {
21                return;
22            }
23        }
24        let mut w = RUNTIMES.write().unwrap();
25        if w[profile as usize].is_none() {
26            w[profile as usize] = Some(Runtime::new().unwrap());
27        }
28    }
29}
/// Runs an async closure on the default or on a specified tokio runtime and
/// returns a future that resolves to the value the closure sends back.
///
/// The closure receives a oneshot `Sender<T>` and reports its result by
/// calling `send` on it. The expansion refers to `channel`, `init_runtime`
/// and `RUNTIMES` unqualified and to `tokio::time::timeout` by path, so the
/// call site needs `use tokio_go::prelude::*;` and the `tokio` crate.
///
/// Two forms are accepted:
///
/// * `go!(|tx: Sender<T>| body)` spawns on runtime slot `0` and waits
///   without a time limit.
/// * `go!(|tx: Sender<T>| body, context)` spawns on the slot selected by
///   `context.profile` and waits at most `context.timeout`. A zero timeout
///   means "no time limit". The context expression is evaluated once.
///
/// The resulting future yields `Result<T, &'static str>`:
///
/// * `Ok(value)` when the closure sent `value`;
/// * `Err("unknown error")` when the sender was dropped without sending;
/// * `Err("timeout")` when the time limit elapsed first. The join handle of
///   the spawned task is discarded, so the task itself is not aborted.
///
/// # Panics
///
/// The future panics when polled if the runtime registry lock is poisoned
/// or the runtime for the requested slot cannot be created.
///
/// # Examples
///
/// Using the default runtime, without timeout:
///
/// ```
/// use tokio_go::prelude::*;
/// use tokio_go::go;
/// #[tokio::main]
/// async fn main() {
///     let r = go!(|tx: Sender<i32>| async move {
///         let _ = tx.send(1);
///     })
///     .await;
///     assert_eq!(r.unwrap(), 1);
/// }
/// ```
///
/// Using a specified runtime (identified by `context.profile`), with a
/// timeout:
///
/// ```
/// use tokio_go::prelude::*;
/// use tokio_go::go;
/// #[tokio::main]
/// async fn main() {
///     let r = go!(
///         |tx: Sender<String>| async move {
///             sleep(Duration::from_secs(2)).await;
///             let _ = tx.send("whocares".to_string());
///         },
///         Context {
///             profile: 1,
///             timeout: Duration::from_secs(1),
///         }
///     )
///     .await;
///     assert_eq!(r.is_ok(), false);
/// }
/// ```
#[macro_export]
macro_rules! go {
    (|$x:ident : Sender<$t:ty>|$y:expr) => {
        async {
            let (sender, receiver) = channel::<$t>();
            init_runtime(0);
            {
                // Keep the registry guard in its own scope: a std RwLock
                // guard must not stay alive across the `.await` below, or
                // the future becomes `!Send` and can block writers in
                // `init_runtime` for as long as the task is pending.
                let rts = RUNTIMES
                    .read()
                    .expect("tokio_go: runtime registry lock poisoned");
                let runtime = rts[0]
                    .as_ref()
                    .expect("tokio_go: runtime 0 was just initialised");
                runtime.spawn((|$x: Sender<$t>| $y)(sender));
            }
            receiver.await.map_err(|_| "unknown error")
        }
    };
    (|$x:ident : Sender<$t:ty>|$y:expr,$c:expr) => {
        async {
            // Evaluate the context expression exactly once and copy out the
            // two `Copy` fields, so the expression is neither re-run nor
            // moved out of a caller-owned variable.
            let (profile, limit) = {
                let ctx = &$c;
                (ctx.profile, ctx.timeout)
            };
            let (sender, receiver) = channel::<$t>();
            init_runtime(profile);
            {
                // See the first arm: the guard must be dropped before any
                // `.await`.
                let rts = RUNTIMES
                    .read()
                    .expect("tokio_go: runtime registry lock poisoned");
                let runtime = rts[usize::from(profile)]
                    .as_ref()
                    .expect("tokio_go: runtime was just initialised");
                runtime.spawn((|$x: Sender<$t>| $y)(sender));
            }
            if limit.is_zero() {
                // A zero duration means "no time limit".
                receiver.await.map_err(|_| "unknown error")
            } else {
                // `tokio::time::interval` must not be used here: its first
                // tick completes immediately, which would report a timeout
                // without waiting for `limit`.
                match tokio::time::timeout(limit, receiver).await {
                    Ok(Ok(v)) => Ok(v),
                    Ok(Err(_)) => Err("unknown error"),
                    Err(_) => Err("timeout"),
                }
            }
        }
    };
}
107
108#[macro_use]
109extern crate lazy_static;
110
#[cfg(test)]
mod tests {
    use super::prelude::*;
    use super::*;
    use std::thread;

    /// Smoke test covering both forms of `go!`: the default runtime, a
    /// profile with a one-second timeout, a profile with a zero timeout,
    /// and a unit payload.
    #[tokio::test]
    async fn it_works() {
        // One-argument form: default runtime, integer payload.
        let number = go!(|tx: Sender<i32>| async move {
            println!("Thread id: {:?}", thread::current().id());
            if tx.send(2).is_err() {
                println!("the receiver dropped");
            }
        })
        .await
        .unwrap();
        assert_eq!(number, 2);

        // Two-argument form: runtime profile 1 with a one-second limit.
        let bounded = Context {
            profile: 1,
            timeout: Duration::from_secs(1),
        };
        let with_limit = go!(
            |tx: Sender<String>| async move {
                println!("Thread id: {:?}", thread::current().id());
                if tx.send("whocares".to_string()).is_err() {
                    println!("the receiver dropped");
                }
            },
            bounded
        )
        .await
        .unwrap();
        assert_eq!(with_limit, "whocares");

        // Two-argument form: same profile, zero timeout.
        let unbounded = Context {
            profile: 1,
            timeout: Duration::ZERO,
        };
        let without_limit = go!(
            |tx: Sender<String>| async move {
                println!("Thread id: {:?}", thread::current().id());
                if tx.send("whocares".to_string()).is_err() {
                    println!("the receiver dropped");
                }
            },
            unbounded
        )
        .await
        .unwrap();
        assert_eq!(without_limit, "whocares");

        // One-argument form again, this time with a unit payload.
        let unit = go!(|tx: Sender<()>| async move {
            println!("Thread id: {:?}", thread::current().id());
            if tx.send(()).is_err() {
                println!("the receiver dropped");
            }
        })
        .await
        .unwrap();
        assert_eq!(unit, ());
    }
}
168}