Skip to main content

acts_next/
signal.rs

1use std::sync::{Arc, Mutex};
2use tokio::sync::Notify;
3
4#[derive(Clone)]
5pub struct Signal<T> {
6    data: Arc<Mutex<T>>,
7    sig: Arc<Notify>,
8    is_closed: Arc<Mutex<bool>>,
9}
10
11impl<T: Clone> Signal<T> {
12    pub fn new(v: T) -> Self {
13        let sig = Arc::new(Notify::new());
14        let data = Arc::new(Mutex::new(v));
15        Self {
16            sig,
17            data,
18            is_closed: Arc::new(Mutex::new(false)),
19        }
20    }
21
22    pub fn send(&self, v: T) {
23        if *self.is_closed.lock().unwrap() {
24            return;
25        }
26        *self.data.lock().unwrap() = v;
27        self.close();
28    }
29
30    pub fn close(&self) {
31        *self.is_closed.lock().unwrap() = true;
32        self.sig.notify_one();
33    }
34    pub fn data(&self) -> T {
35        let data = self.data.lock().unwrap();
36        data.clone()
37    }
38
39    pub fn update<F: Fn(&mut T)>(&self, f: F) {
40        if *self.is_closed.lock().unwrap() {
41            return;
42        }
43        let mut data = self.data.lock().unwrap();
44        f(&mut data);
45    }
46
47    pub fn double(&self) -> (Self, Self) {
48        (self.clone(), self.clone())
49    }
50
51    pub fn triple(&self) -> (Self, Self, Self) {
52        (self.clone(), self.clone(), self.clone())
53    }
54
55    pub async fn recv(&self) -> T {
56        self.sig.notified().await;
57        let v = self.data.lock().unwrap();
58        v.clone()
59    }
60
61    pub async fn timeout(&self, millis: u64) -> T {
62        tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await;
63        let v = self.data.lock().unwrap();
64        v.clone()
65    }
66}
67
68#[cfg(test)]
69mod tests {
70    use crate::Signal;
71
72    #[test]
73    fn engine_signal_new() {
74        let s = Signal::new(5);
75        assert_eq!(s.data(), 5);
76
77        let s = Signal::new("abc");
78        assert_eq!(s.data(), "abc");
79    }
80
81    #[tokio::test]
82    async fn engine_signal_send() {
83        let s = Signal::new(0);
84        let s2 = s.clone();
85        tokio::spawn(async move {
86            s.send(10);
87        });
88        let ret = s2.recv().await;
89        assert_eq!(ret, 10);
90    }
91
92    #[tokio::test]
93    async fn engine_signal_close() {
94        let s = Signal::new(0);
95        let s2 = s.clone();
96        tokio::spawn(async move {
97            s.close();
98        });
99        let ret = s2.recv().await;
100        assert_eq!(ret, 0);
101    }
102
103    #[tokio::test]
104    async fn engine_signal_update() {
105        let s = Signal::new(0);
106        let s2 = s.clone();
107        tokio::spawn(async move {
108            s.update(|data| *data = 100);
109            s.close();
110        });
111        let ret = s2.recv().await;
112        assert_eq!(ret, 100);
113    }
114
115    #[tokio::test]
116    async fn engine_signal_timeout() {
117        let s = Signal::new(0);
118        let s2 = s.clone();
119        tokio::spawn(async move {
120            s.update(|data| *data = 100);
121        });
122        let ret = s2.timeout(10).await;
123        assert_eq!(ret, 100);
124    }
125
126    #[tokio::test]
127    async fn engine_signal_double() {
128        let (s1, s2) = Signal::new(0).double();
129        tokio::spawn(async move {
130            s1.send(10);
131        });
132        let ret = s2.recv().await;
133        assert_eq!(ret, 10);
134    }
135
136    #[tokio::test]
137    async fn engine_signal_triple() {
138        let (s1, s2, s3) = Signal::new(0).triple();
139        tokio::spawn(async move {
140            s1.update(|data| *data = 100);
141            s2.close();
142        });
143        let ret = s3.recv().await;
144        assert_eq!(ret, 100);
145    }
146}