1use std::{
42 sync::{Arc, Mutex},
43 time::Duration,
44};
45
46pub use crossbeam::channel::RecvError;
47pub use crossbeam::channel::RecvTimeoutError;
48pub use crossbeam::channel::SendError;
49pub use crossbeam::channel::TryRecvError;
50
51pub struct Signal {
52 tx: crossbeam::channel::Sender<i32>,
53 rx: crossbeam::channel::Receiver<i32>,
54}
55
56impl Signal {
57 fn new() -> Self {
58 let (tx, rx) = crossbeam::channel::unbounded();
59 Self { tx, rx }
60 }
61}
62
63pub struct Sender<T> {
64 init: Mutex<bool>,
65 producer: Mutex<Option<SignalSender>>,
66 signal: ArcMutex2<OptionSignal>,
67 tx: crossbeam::channel::Sender<T>,
68 id: i32,
69}
70
71pub type SignalSender = crossbeam::channel::Sender<i32>;
72pub type OptionSignal = Option<Signal>;
73pub type ArcMutex<T> = Arc<Mutex<T>>;
74pub type ArcMutex2<T> = ArcMutex<ArcMutex<T>>;
75static UID: Mutex<i32> = Mutex::new(0);
76
77pub struct Receiver<T> {
78 signal: ArcMutex2<OptionSignal>,
79 rx: crossbeam::channel::Receiver<T>,
80 id: i32,
81}
82
83pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
84 let inner = Arc::new(Mutex::new(None));
85 let signal = Arc::new(Mutex::new(inner));
86 let (tx, rx) = crossbeam::channel::unbounded();
87 let mut id = UID.lock().unwrap();
88 let next = *id;
89 *id += 1;
90 let receiver = Receiver {
91 signal,
92 rx,
93 id: next,
94 };
95 let sender = Sender {
96 producer: Mutex::new(None),
97 signal: receiver.signal.clone(),
98 tx,
99 id: next,
100 init: Mutex::new(false),
101 };
102 (sender, receiver)
103}
104
105impl<T> Clone for Sender<T> {
106 fn clone(&self) -> Self {
107 Self {
108 init: Mutex::new(false),
109 producer: Mutex::new(None),
110 signal: self.signal.clone(),
111 tx: self.tx.clone(),
112 id: self.id,
113 }
114 }
115}
116
117impl<T> Sender<T> {
118 pub fn send(&self, data: T) -> Result<(), SendError<T>> {
119 let mut init = self.init.lock().unwrap();
121 let mut producer = self.producer.lock().unwrap();
122 if !*init {
123 *init = true;
124 let inner = self.signal.lock().unwrap();
125 let signal = inner.lock().unwrap();
126 if signal.is_some() {
127 let tx = signal.as_ref().unwrap().tx.clone();
128 *producer = Some(tx);
129 }
130 }
131 let result = self.tx.send(data);
132 if let Some(signal) = &*producer {
133 let _ = signal.send(self.id);
134 }
135 return result;
136 }
137}
138
139impl<T> Receiver<T> {
140 pub fn id(&self) -> i32 {
142 self.id
143 }
144
145 pub fn recv(&self) -> Result<T, RecvError> {
146 self.rx.recv()
147 }
148
149 pub fn recv_timeout(
150 &self,
151 timeout: Duration,
152 ) -> Result<T, crossbeam::channel::RecvTimeoutError> {
153 self.rx.recv_timeout(timeout)
154 }
155
156 pub fn try_recv(&self) -> Result<T, TryRecvError> {
157 self.rx.try_recv()
158 }
159
160 pub fn len(&self) -> usize {
161 self.rx.len()
162 }
163}
164
165pub trait Pollable {
166 fn signal(&self) -> ArcMutex2<OptionSignal>;
168 fn id(&self) -> i32;
170}
171
172impl<T> Pollable for Receiver<T> {
173 fn signal(&self) -> ArcMutex2<OptionSignal> {
174 self.signal.clone()
175 }
176
177 fn id(&self) -> i32 {
178 self.id
179 }
180}
181
182pub struct Poll {
183 signal: ArcMutex<OptionSignal>,
184}
185
186impl Poll {
187 pub fn new() -> Self {
188 let instance = Signal::new();
189 let inner = Arc::new(Mutex::new(Some(instance)));
190 Self { signal: inner }
191 }
192
193 pub fn append<T: Pollable>(&self, receivers: &[&T]) {
195 for i in receivers {
196 self.add(*i);
197 }
198 }
199
200 pub fn add<T: Pollable>(&self, receiver: &T) {
202 let outer = receiver.signal();
203 let mut inner = outer.lock().unwrap();
204 *inner = self.signal.clone();
205 }
206
207 pub fn poll(&self, timeout: f32) -> i32 {
209 let timeout = Duration::from_nanos((timeout * 1e9) as u64);
210 let signal = self.signal.lock().unwrap();
212 signal
213 .as_ref()
214 .unwrap()
215 .rx
216 .recv_timeout(timeout)
217 .unwrap_or(-1)
218 }
219}