1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
mod prelude {
    pub use std::sync::{Arc, RwLock};
    pub use tokio::runtime::Runtime;
    pub use tokio::sync::oneshot::{channel, Sender};
    pub use tokio::time::{sleep, timeout, Duration};
}
use prelude::*;

#[macro_use]
extern crate lazy_static;

const RUNTIME_INIT: Option<Runtime> = None;

lazy_static! {
    static ref RUNTIMES: Arc<RwLock<[Option<Runtime>; 256]>> =
        Arc::new(RwLock::new([RUNTIME_INIT; 256]));
}

#[derive(Debug)]
pub struct Context {
    pub profile: u8,
    pub timeout: Duration,
}

pub fn init_runtime(profile: u8) {
    {
        let r = RUNTIMES.read().unwrap();
        if r[profile as usize].is_some() {
            return;
        }
    }
    let mut w = RUNTIMES.write().unwrap();
    if w[profile as usize].is_none() {
        w[profile as usize] = Some(Runtime::new().unwrap());
    }
}

#[macro_export]
macro_rules! go {
    (|$x:ident : Sender<$t:ty>|$y:expr) => {
        async {
            let (sender, receiver) = channel::<$t>();
            init_runtime(0);
            let rts = RUNTIMES.read().unwrap();
            let runtime = rts[0].as_ref().unwrap();
            runtime.spawn((|$x: Sender<$t>| $y)(sender));
            match receiver.await {
                Ok(v) => Ok(v),
                Err(_) => Err("unknown error"),
            }
        }
    };
    (|$x:ident : Sender<$t:ty>|$y:expr,$c:expr) => {
        async {
            let (sender, receiver) = channel::<$t>();
            init_runtime($c.profile);
            let rts = RUNTIMES.read().unwrap();
            let runtime = rts[$c.profile as usize].as_ref().unwrap();
            runtime.spawn((|$x: Sender<$t>| $y)(sender));
            match timeout($c.timeout, receiver).await {
                Err(_) => Err("timeout"),
                Ok(v) => Ok(v.unwrap()),
            }
        }
    };
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[tokio::test]
    async fn it_works() {
        let r1 = go!(|sender: Sender<i32>| async move {
            println!("Thread id: {:?}", thread::current().id());
            if let Err(_) = sender.send(2) {
                println!("the receiver dropped");
            }
        })
        .await
        .unwrap();
        assert_eq!(r1, 2);
        let r2 = go!(
            |sender: Sender<String>| async move {
                println!("Thread id: {:?}", thread::current().id());
                if let Err(_) = sender.send("shit".to_string()) {
                    println!("the receiver dropped");
                }
            },
            Context {
                profile: 1,
                timeout: Duration::from_secs(1)
            }
        )
        .await
        .unwrap();
        assert_eq!(r2, "shit");
        let r3 = go!(|sender: Sender<()>| async move {
            println!("Thread id: {:?}", thread::current().id());
            if let Err(_) = sender.send(()) {
                println!("the receiver dropped");
            }
        })
        .await
        .unwrap();
        assert_eq!(r3, ());
    }
}