1use std::cell::RefCell;
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::mpsc;
13use std::sync::Arc;
14use std::time::Duration;
15
/// Shared interface for handles that collect events arriving over a channel.
///
/// The implementors in this file use interior mutability, which is why every
/// method takes `&self`.
pub trait ChannelDrain {
    /// Pulls everything currently pending on the underlying channel and
    /// records it on the handle.
    fn drain(&self);

    /// Discards whatever earlier `drain` calls recorded.
    fn clear(&self);

    /// Returns `true` when a previous `drain` recorded something that has not
    /// been cleared since.
    fn has_messages(&self) -> bool;
}
26
/// Sending half of an `mpsc` channel paired with a shared wake flag.
///
/// Every successful [`send`](WakingSender::send) sets the flag to `true`;
/// presumably a polling loop elsewhere watches it to know when to drain.
pub struct WakingSender<T> {
    /// Underlying channel sender.
    tx: mpsc::Sender<T>,
    /// Flag raised after each successful send.
    wake: Arc<AtomicBool>,
}

impl<T> Clone for WakingSender<T> {
    /// Produces another sender for the same channel that shares the same flag.
    fn clone(&self) -> Self {
        let tx = self.tx.clone();
        let wake = Arc::clone(&self.wake);
        Self { tx, wake }
    }
}

impl<T> WakingSender<T> {
    /// Sends `value` on the channel and, if that succeeds, raises the wake flag.
    ///
    /// # Errors
    ///
    /// Returns the `mpsc::SendError` from the underlying sender (which hands
    /// `value` back) when the receiving side is gone. The flag is left
    /// untouched in that case.
    pub fn send(&self, value: T) -> Result<(), mpsc::SendError<T>> {
        self.tx.send(value)?;
        self.wake.store(true, Ordering::Release);
        Ok(())
    }
}
79
/// Reference-counted handle to a channel plus a buffer of received messages.
///
/// Clones are cheap and all point at the same underlying state.
pub struct ChannelHandle<T> {
    inner: Rc<ChannelInner<T>>,
}

/// State shared by every clone of a [`ChannelHandle`].
struct ChannelInner<T> {
    /// Sender kept so new producers can be created on demand.
    tx: mpsc::Sender<T>,
    /// Flag handed to producers together with the sender.
    wake: Arc<AtomicBool>,
    /// Receiving end of the channel.
    rx: RefCell<mpsc::Receiver<T>>,
    /// Messages already pulled off the channel.
    messages: RefCell<Vec<T>>,
}

impl<T> Clone for ChannelHandle<T> {
    /// Returns a new handle to the same shared state (bumps the `Rc` count only).
    fn clone(&self) -> Self {
        let inner = Rc::clone(&self.inner);
        Self { inner }
    }
}
98
99impl<T: 'static> ChannelHandle<T> {
100 pub fn new(wake: Arc<AtomicBool>) -> Self {
102 let (tx, rx) = mpsc::channel();
103 Self {
104 inner: Rc::new(ChannelInner {
105 tx,
106 wake,
107 rx: RefCell::new(rx),
108 messages: RefCell::new(Vec::new()),
109 }),
110 }
111 }
112
113 pub fn tx(&self) -> WakingSender<T> {
115 WakingSender {
116 tx: self.inner.tx.clone(),
117 wake: Arc::clone(&self.inner.wake),
118 }
119 }
120
121 pub fn get(&self) -> Vec<T>
123 where
124 T: Clone,
125 {
126 self.inner.messages.borrow().clone()
127 }
128
129 pub fn len(&self) -> usize {
131 self.inner.messages.borrow().len()
132 }
133
134 pub fn is_empty(&self) -> bool {
136 self.inner.messages.borrow().is_empty()
137 }
138}
139
140impl<T: 'static> ChannelDrain for ChannelHandle<T> {
141 fn drain(&self) {
142 let rx = self.inner.rx.borrow();
143 let mut messages = self.inner.messages.borrow_mut();
144 loop {
145 match rx.try_recv() {
146 Ok(msg) => messages.push(msg),
147 Err(mpsc::TryRecvError::Empty) => break,
148 Err(mpsc::TryRecvError::Disconnected) => break,
149 }
150 }
151 }
152
153 fn clear(&self) {
154 self.inner.messages.borrow_mut().clear();
155 }
156
157 fn has_messages(&self) -> bool {
158 !self.inner.messages.borrow().is_empty()
159 }
160}
161
162pub struct PortHandle<In, Out> {
183 pub rx: ChannelHandle<In>,
185 outbound_tx: mpsc::Sender<Out>,
187 outbound_rx: Rc<RefCell<Option<mpsc::Receiver<Out>>>>,
189}
190
191impl<In, Out> Clone for PortHandle<In, Out> {
192 fn clone(&self) -> Self {
193 Self {
194 rx: self.rx.clone(),
195 outbound_tx: self.outbound_tx.clone(),
196 outbound_rx: Rc::clone(&self.outbound_rx),
197 }
198 }
199}
200
201impl<In: 'static, Out: 'static> PortHandle<In, Out> {
202 pub fn new(wake: Arc<AtomicBool>) -> Self {
204 let (outbound_tx, outbound_rx) = mpsc::channel();
205 Self {
206 rx: ChannelHandle::new(wake),
207 outbound_tx,
208 outbound_rx: Rc::new(RefCell::new(Some(outbound_rx))),
209 }
210 }
211
212 pub fn tx(&self) -> mpsc::Sender<Out> {
214 self.outbound_tx.clone()
215 }
216
217 pub fn take_outbound_rx(&self) -> Option<mpsc::Receiver<Out>> {
222 self.outbound_rx.borrow_mut().take()
223 }
224}
225
/// Reference-counted handle to a periodic tick source.
///
/// Clones share the same underlying state.
pub(crate) struct IntervalHandle {
    inner: Rc<IntervalInner>,
}

/// State shared by every clone of an [`IntervalHandle`].
struct IntervalInner {
    /// Receives one `()` per tick.
    rx: RefCell<mpsc::Receiver<()>>,
    /// Callback to run when ticks are observed, if one is set.
    callback: RefCell<Option<Rc<dyn Fn()>>>,
    /// Whether a tick has been observed since the flag was last reset.
    ticked: RefCell<bool>,
}

impl Clone for IntervalHandle {
    /// Returns a new handle to the same shared state (bumps the `Rc` count only).
    fn clone(&self) -> Self {
        let inner = Rc::clone(&self.inner);
        Self { inner }
    }
}
245
246impl IntervalHandle {
247 pub(crate) fn new(duration: Duration, callback: Rc<dyn Fn()>, wake: Arc<AtomicBool>) -> Self {
249 let (tx, rx) = mpsc::channel();
250
251 std::thread::spawn(move || loop {
253 std::thread::sleep(duration);
254 if tx.send(()).is_err() {
255 break; }
257 wake.store(true, Ordering::Release);
258 });
259
260 Self {
261 inner: Rc::new(IntervalInner {
262 rx: RefCell::new(rx),
263 callback: RefCell::new(Some(callback)),
264 ticked: RefCell::new(false),
265 }),
266 }
267 }
268}
269
270impl ChannelDrain for IntervalHandle {
271 fn drain(&self) {
272 let rx = self.inner.rx.borrow();
273 let mut ticked = false;
274 loop {
275 match rx.try_recv() {
276 Ok(()) => ticked = true,
277 Err(mpsc::TryRecvError::Empty) => break,
278 Err(mpsc::TryRecvError::Disconnected) => break,
279 }
280 }
281 if ticked {
282 *self.inner.ticked.borrow_mut() = true;
283 if let Some(cb) = self.inner.callback.borrow().as_ref() {
284 cb();
285 }
286 }
287 }
288
289 fn clear(&self) {
290 *self.inner.ticked.borrow_mut() = false;
291 }
292
293 fn has_messages(&self) -> bool {
294 *self.inner.ticked.borrow()
295 }
296}