1use crate::network::{self, NetworkController, NetworkProcessor, NetEvent, Endpoint, ResourceId};
2use crate::events::{self, EventSender, EventReceiver};
3use crate::util::thread::{NamespacedThread, OTHER_THREAD_ERR};
4
5use std::sync::{
6 Arc, Mutex,
7 atomic::{AtomicBool, Ordering},
8};
9use std::time::{Duration};
10use std::collections::{VecDeque};
11
// Timeout handed to every network poll and signal wait in this file. Each of those
// calls sits inside a loop that re-checks a running flag, so this value bounds how long
// a loop can take to notice that it has been asked to stop.
lazy_static::lazy_static! {
    static ref SAMPLING_TIMEOUT: Duration = Duration::from_millis(50);
}
15
/// Event handed to the callback of a [`NodeListener`].
///
/// It is either an event produced by the network layer or a signal of the
/// user-chosen type `S`.
pub enum NodeEvent<'a, S> {
    /// An event coming from the network. It may borrow data for the lifetime `'a`.
    Network(NetEvent<'a>),

    /// A signal value taken from the node's signal receiver.
    Signal(S),
}
29
30impl<S: std::fmt::Debug> std::fmt::Debug for NodeEvent<'_, S> {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 NodeEvent::Network(net_event) => write!(f, "NodeEvent::Network({net_event:?})"),
34 NodeEvent::Signal(signal) => write!(f, "NodeEvent::Signal({signal:?})"),
35 }
36 }
37}
38
39impl<'a, S> NodeEvent<'a, S> {
40 pub fn network(self) -> NetEvent<'a> {
42 match self {
43 NodeEvent::Network(net_event) => net_event,
44 NodeEvent::Signal(..) => panic!("NodeEvent must be a NetEvent"),
45 }
46 }
47
48 pub fn signal(self) -> S {
50 match self {
51 NodeEvent::Network(..) => panic!("NodeEvent must be a Signal"),
52 NodeEvent::Signal(signal) => signal,
53 }
54 }
55}
56
/// Owned counterpart of [`NodeEvent`].
///
/// It carries a [`StoredNetEvent`] instead of a borrowed `NetEvent`, so it has no
/// lifetime parameter and can be cloned or sent elsewhere. It is built from a
/// [`NodeEvent`] through the `From` implementation in this file.
#[derive(Clone)]
pub enum StoredNodeEvent<S> {
    /// An owned copy of a network event.
    Network(StoredNetEvent),

    /// A signal value.
    Signal(S),
}
72
73impl<S: std::fmt::Debug> std::fmt::Debug for StoredNodeEvent<S> {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 match self {
76 StoredNodeEvent::Network(net_event) => write!(f, "NodeEvent::Network({net_event:?})"),
77 StoredNodeEvent::Signal(signal) => write!(f, "NodeEvent::Signal({signal:?})"),
78 }
79 }
80}
81
82impl<S> StoredNodeEvent<S> {
83 pub fn network(self) -> StoredNetEvent {
85 match self {
86 StoredNodeEvent::Network(net_event) => net_event,
87 StoredNodeEvent::Signal(..) => panic!("NodeEvent must be a NetEvent"),
88 }
89 }
90
91 pub fn signal(self) -> S {
93 match self {
94 StoredNodeEvent::Network(..) => panic!("NodeEvent must be a Signal"),
95 StoredNodeEvent::Signal(signal) => signal,
96 }
97 }
98}
99
100impl<S> From<NodeEvent<'_, S>> for StoredNodeEvent<S> {
101 fn from(node_event: NodeEvent<'_, S>) -> Self {
102 match node_event {
103 NodeEvent::Network(net_event) => StoredNodeEvent::Network(net_event.into()),
104 NodeEvent::Signal(signal) => StoredNodeEvent::Signal(signal),
105 }
106 }
107}
108
/// Owned counterpart of `NetEvent`.
///
/// Each variant mirrors the `NetEvent` variant of the same name (see the `From`
/// implementation and [`StoredNetEvent::borrow`] in this file). The only difference
/// visible here is that `Message` owns its bytes.
#[derive(Debug, Clone)]
pub enum StoredNetEvent {
    /// Mirrors `NetEvent::Connected`; the `bool` is that event's status value.
    Connected(Endpoint, bool),
    /// Mirrors `NetEvent::Accepted`; the `ResourceId` is that event's id value.
    Accepted(Endpoint, ResourceId),
    /// Mirrors `NetEvent::Message`, holding an owned copy of the message data.
    Message(Endpoint, Vec<u8>),
    /// Mirrors `NetEvent::Disconnected`.
    Disconnected(Endpoint),
}
119
120impl From<NetEvent<'_>> for StoredNetEvent {
121 fn from(net_event: NetEvent<'_>) -> Self {
122 match net_event {
123 NetEvent::Connected(endpoint, status) => Self::Connected(endpoint, status),
124 NetEvent::Accepted(endpoint, id) => Self::Accepted(endpoint, id),
125 NetEvent::Message(endpoint, data) => Self::Message(endpoint, Vec::from(data)),
126 NetEvent::Disconnected(endpoint) => Self::Disconnected(endpoint),
127 }
128 }
129}
130
131impl StoredNetEvent {
132 pub fn borrow(&self) -> NetEvent<'_> {
134 match self {
135 Self::Connected(endpoint, status) => NetEvent::Connected(*endpoint, *status),
136 Self::Accepted(endpoint, id) => NetEvent::Accepted(*endpoint, *id),
137 Self::Message(endpoint, data) => NetEvent::Message(*endpoint, data),
138 Self::Disconnected(endpoint) => NetEvent::Disconnected(*endpoint),
139 }
140 }
141}
142
143pub fn split<S: Send>() -> (NodeHandler<S>, NodeListener<S>) {
181 let (network_controller, network_processor) = network::split();
182 let (signal_sender, signal_receiver) = events::split();
183 let running = AtomicBool::new(true);
184
185 let handler = NodeHandler(Arc::new(NodeHandlerImpl {
186 network: network_controller,
187 signals: signal_sender,
188 running,
189 }));
190
191 let listener = NodeListener::new(network_processor, signal_receiver, handler.clone());
192
193 (handler, listener)
194}
195
/// State shared by every clone of a [`NodeHandler`].
struct NodeHandlerImpl<S> {
    /// Controller half returned by `network::split()`.
    network: NetworkController,
    /// Sender half of the node's signal queue.
    signals: EventSender<S>,
    /// `true` while the node is running; cleared by `NodeHandler::stop`.
    running: AtomicBool,
}
201
/// Cloneable handle to a node. All clones point at the same shared state through an `Arc`.
pub struct NodeHandler<S>(Arc<NodeHandlerImpl<S>>);
205
206impl<S> NodeHandler<S> {
207 pub fn network(&self) -> &NetworkController {
210 &self.0.network
211 }
212
213 pub fn signals(&self) -> &EventSender<S> {
218 &self.0.signals
219 }
220
221 pub fn stop(&self) {
224 self.0.running.store(false, Ordering::Relaxed);
225 }
226
227 pub fn is_running(&self) -> bool {
231 self.0.running.load(Ordering::Relaxed)
232 }
233}
234
235impl<S: Send + 'static> Clone for NodeHandler<S> {
236 fn clone(&self) -> Self {
237 Self(self.0.clone())
238 }
239}
240
/// Reading side of a node.
///
/// From construction until it is consumed, a background thread polls the network and
/// stores the resulting events so they can be dispatched later.
pub struct NodeListener<S: Send + 'static> {
    /// Thread that caches network events; joining it yields the processor and the cache.
    network_cache_thread: NamespacedThread<(NetworkProcessor, VecDeque<StoredNetEvent>)>,
    /// Flag the caching thread polls; storing `false` makes its loop exit.
    cache_running: Arc<AtomicBool>,
    /// Receiver half of the node's signal queue.
    signal_receiver: EventReceiver<S>,
    /// Handler used to check whether the node is still running.
    handler: NodeHandler<S>,
}
248
249impl<S: Send + 'static> NodeListener<S> {
250 fn new(
251 mut network_processor: NetworkProcessor,
252 signal_receiver: EventReceiver<S>,
253 handler: NodeHandler<S>,
254 ) -> NodeListener<S> {
255 let cache_running = Arc::new(AtomicBool::new(true));
259 let network_cache_thread = {
260 let cache_running = cache_running.clone();
261 let mut cache = VecDeque::new();
262 NamespacedThread::spawn("node-network-cache-thread", move || {
263 while cache_running.load(Ordering::Relaxed) {
264 network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
265 log::trace!("Cached {:?}", net_event);
266 cache.push_back(net_event.into());
267 });
268 }
269 (network_processor, cache)
270 })
271 };
272
273 NodeListener { network_cache_thread, cache_running, signal_receiver, handler }
274 }
275
276 pub fn for_each(mut self, mut event_callback: impl FnMut(NodeEvent<S>)) {
298 self.cache_running.store(false, Ordering::Relaxed);
300 let (mut network_processor, mut cache) = self.network_cache_thread.join();
301
302 while let Some(event) = cache.pop_front() {
304 let net_event = event.borrow();
305 log::trace!("Read from cache {:?}", net_event);
306 event_callback(NodeEvent::Network(net_event));
307 if !self.handler.is_running() {
308 return;
309 }
310 }
311
312 crossbeam_utils::thread::scope(|scope| {
313 let multiplexed = Arc::new(Mutex::new(event_callback));
314
315 let _signal_thread = {
316 let mut signal_receiver = std::mem::take(&mut self.signal_receiver);
317 let handler = self.handler.clone();
318
319 #[allow(clippy::type_complexity)]
326 struct SendableEventCallback<'a, S>(Arc<Mutex<dyn FnMut(NodeEvent<S>) + 'a>>);
327 #[allow(clippy::non_send_fields_in_send_ty)]
328 unsafe impl<S> Send for SendableEventCallback<'_, S> {}
329
330 let multiplexed = SendableEventCallback(multiplexed.clone());
331
332 scope
333 .builder()
334 .name(String::from("node-network-thread"))
335 .spawn(move |_| {
336 while handler.is_running() {
337 if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT)
338 {
339 let mut event_callback =
340 multiplexed.0.lock().expect(OTHER_THREAD_ERR);
341 if handler.is_running() {
342 event_callback(NodeEvent::Signal(signal));
343 }
344 }
345 }
346 })
347 .unwrap()
348 };
349
350 while self.handler.is_running() {
351 network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
352 let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
353 if self.handler.is_running() {
354 event_callback(NodeEvent::Network(net_event));
355 }
356 });
357 }
358 })
359 .unwrap();
360 }
361
362 pub fn for_each_async(
396 mut self,
397 event_callback: impl FnMut(NodeEvent<S>) + Send + 'static,
398 ) -> NodeTask {
399 self.cache_running.store(false, Ordering::Relaxed);
401 let (mut network_processor, mut cache) = self.network_cache_thread.join();
402
403 let multiplexed = Arc::new(Mutex::new(event_callback));
404
405 let _locked = multiplexed.lock().expect(OTHER_THREAD_ERR);
408
409 let network_thread = {
410 let multiplexed = multiplexed.clone();
411 let handler = self.handler.clone();
412
413 NamespacedThread::spawn("node-network-thread", move || {
414 while let Some(event) = cache.pop_front() {
415 let net_event = event.borrow();
416 log::trace!("Read from cache {:?}", net_event);
417 let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
418 event_callback(NodeEvent::Network(net_event));
419 if !handler.is_running() {
420 return;
421 }
422 }
423
424 while handler.is_running() {
425 network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
426 let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
427 if handler.is_running() {
428 event_callback(NodeEvent::Network(net_event));
429 }
430 });
431 }
432 })
433 };
434
435 let signal_thread = {
436 let multiplexed = multiplexed.clone();
437 let mut signal_receiver = std::mem::take(&mut self.signal_receiver);
438 let handler = self.handler.clone();
439
440 NamespacedThread::spawn("node-signal-thread", move || {
441 while handler.is_running() {
442 if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT) {
443 let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
444 if handler.is_running() {
445 event_callback(NodeEvent::Signal(signal));
446 }
447 }
448 }
449 })
450 };
451
452 NodeTask { network_thread, signal_thread }
453 }
454
455 pub fn enqueue(self) -> (NodeTask, EventReceiver<StoredNodeEvent<S>>) {
483 let (sender, receiver) = events::split::<StoredNodeEvent<S>>();
484 let task = self.for_each_async(move |node_event| sender.send(node_event.into()));
485 (task, receiver)
486 }
487}
488
489impl<S: Send + 'static> Drop for NodeListener<S> {
490 fn drop(&mut self) {
491 self.cache_running.store(false, Ordering::Relaxed);
492 }
493}
494
/// Handle to the two threads started by `NodeListener::for_each_async`.
///
/// NOTE(review): per the `must_use` message, dropping this value ends the asynchronous
/// task; the mechanism lives in `NamespacedThread` and is not visible in this file.
#[must_use = "The NodeTask must be used or the asynchronous task will be dropped in return"]
pub struct NodeTask {
    /// Thread that dispatches cached and polled network events.
    network_thread: NamespacedThread<()>,
    /// Thread that dispatches signals.
    signal_thread: NamespacedThread<()>,
}
505
506impl NodeTask {
507 pub fn wait(&mut self) {
512 self.network_thread.try_join();
513 self.signal_thread.try_join();
514 }
515}
516
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration};

    // A freshly split node reports itself as running; the listener is dropped unused.
    #[test]
    fn create_node_and_drop() {
        let (handler, _listener) = split::<()>();
        assert!(handler.is_running());
    }

    // `for_each` blocks until the callback stops the node on the first (timed) signal.
    #[test]
    fn sync_node() {
        let (handler, listener) = split();
        assert!(handler.is_running());
        handler.signals().send_with_timer((), Duration::from_millis(1000));

        let inner_handler = handler.clone();
        listener.for_each(move |_| inner_handler.stop());

        assert!(!handler.is_running());
    }

    // `for_each_async` returns immediately; the timed "check" signal is observed in the
    // background and the node keeps running until "stop" is sent.
    #[test]
    fn async_node() {
        let (handler, listener) = split();
        assert!(handler.is_running());
        handler.signals().send_with_timer("check", Duration::from_millis(250));

        let checked = Arc::new(AtomicBool::new(false));
        let inner_checked = checked.clone();
        let inner_handler = handler.clone();
        let _node_task = listener.for_each_async(move |event| match event.signal() {
            "stop" => inner_handler.stop(),
            "check" => inner_checked.store(true, Ordering::Relaxed),
            _ => unreachable!(),
        });

        assert!(handler.is_running());
        // Sleep past the 250 ms timer so the "check" signal has been dispatched.
        std::thread::sleep(Duration::from_millis(500));
        assert!(checked.load(Ordering::Relaxed));
        assert!(handler.is_running());
        handler.signals().send("stop");
    }

    // `enqueue` forwards the timed signal to the returned receiver as a stored event.
    #[test]
    fn enqueue() {
        let (handler, listener) = split();
        assert!(handler.is_running());
        handler.signals().send_with_timer((), Duration::from_millis(1000));

        let (mut task, mut receiver) = listener.enqueue();
        assert!(handler.is_running());

        receiver.receive_timeout(Duration::from_millis(2000)).unwrap().signal();
        handler.stop();

        assert!(!handler.is_running());
        task.wait();
    }

    // `wait` blocks until the callback has stopped the node.
    #[test]
    fn wait_task() {
        let (handler, listener) = split();

        handler.signals().send_with_timer((), Duration::from_millis(1000));

        let inner_handler = handler.clone();
        listener.for_each_async(move |_| inner_handler.stop()).wait();

        assert!(!handler.is_running());
    }

    // Calling `wait` a second time on a finished task is harmless.
    #[test]
    fn wait_already_waited_task() {
        let (handler, listener) = split();

        handler.signals().send_with_timer((), Duration::from_millis(1000));

        let inner_handler = handler.clone();
        let mut task = listener.for_each_async(move |_| inner_handler.stop());
        assert!(handler.is_running());
        task.wait();
        assert!(!handler.is_running());
        task.wait();
        assert!(!handler.is_running());
    }
}