1use std::sync::{Arc, Mutex};
2use tokio::sync::Notify;
3
4#[derive(Clone)]
5pub struct Signal<T> {
6 data: Arc<Mutex<T>>,
7 sig: Arc<Notify>,
8 is_closed: Arc<Mutex<bool>>,
9}
10
11impl<T: Clone> Signal<T> {
12 pub fn new(v: T) -> Self {
13 let sig = Arc::new(Notify::new());
14 let data = Arc::new(Mutex::new(v));
15 Self {
16 sig,
17 data,
18 is_closed: Arc::new(Mutex::new(false)),
19 }
20 }
21
22 pub fn send(&self, v: T) {
23 if *self.is_closed.lock().unwrap() {
24 return;
25 }
26 *self.data.lock().unwrap() = v;
27 self.close();
28 }
29
30 pub fn close(&self) {
31 *self.is_closed.lock().unwrap() = true;
32 self.sig.notify_one();
33 }
34 pub fn data(&self) -> T {
35 let data = self.data.lock().unwrap();
36 data.clone()
37 }
38
39 pub fn update<F: Fn(&mut T)>(&self, f: F) {
40 if *self.is_closed.lock().unwrap() {
41 return;
42 }
43 let mut data = self.data.lock().unwrap();
44 f(&mut data);
45 }
46
47 pub fn double(&self) -> (Self, Self) {
48 (self.clone(), self.clone())
49 }
50
51 pub fn triple(&self) -> (Self, Self, Self) {
52 (self.clone(), self.clone(), self.clone())
53 }
54
55 pub async fn recv(&self) -> T {
56 self.sig.notified().await;
57 let v = self.data.lock().unwrap();
58 v.clone()
59 }
60
61 pub async fn timeout(&self, millis: u64) -> T {
62 tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await;
63 let v = self.data.lock().unwrap();
64 v.clone()
65 }
66}
67
#[cfg(test)]
mod tests {
    use crate::Signal;

    /// A new signal exposes its initial value through `data`.
    #[test]
    fn engine_signal_new() {
        let number = Signal::new(5);
        assert_eq!(number.data(), 5);

        let text = Signal::new("abc");
        assert_eq!(text.data(), "abc");
    }

    /// A value sent from a spawned task is observed by `recv`.
    #[tokio::test]
    async fn engine_signal_send() {
        let sender = Signal::new(0);
        let receiver = sender.clone();
        tokio::spawn(async move { sender.send(10) });
        assert_eq!(receiver.recv().await, 10);
    }

    /// Closing without sending wakes `recv` with the initial value.
    #[tokio::test]
    async fn engine_signal_close() {
        let closer = Signal::new(0);
        let receiver = closer.clone();
        tokio::spawn(async move { closer.close() });
        assert_eq!(receiver.recv().await, 0);
    }

    /// An update made before closing is visible to `recv`.
    #[tokio::test]
    async fn engine_signal_update() {
        let writer = Signal::new(0);
        let receiver = writer.clone();
        tokio::spawn(async move {
            writer.update(|value| *value = 100);
            writer.close();
        });
        assert_eq!(receiver.recv().await, 100);
    }

    /// `timeout` returns the updated value even though the signal is never closed.
    #[tokio::test]
    async fn engine_signal_timeout() {
        let writer = Signal::new(0);
        let reader = writer.clone();
        tokio::spawn(async move {
            writer.update(|value| *value = 100);
        });
        assert_eq!(reader.timeout(10).await, 100);
    }

    /// Both handles returned by `double` share the same state.
    #[tokio::test]
    async fn engine_signal_double() {
        let (sender, receiver) = Signal::new(0).double();
        tokio::spawn(async move { sender.send(10) });
        assert_eq!(receiver.recv().await, 10);
    }

    /// All three handles returned by `triple` share the same state.
    #[tokio::test]
    async fn engine_signal_triple() {
        let (writer, closer, receiver) = Signal::new(0).triple();
        tokio::spawn(async move {
            writer.update(|value| *value = 100);
            closer.close();
        });
        assert_eq!(receiver.recv().await, 100);
    }
}