message_io/
node.rs

1use crate::network::{self, NetworkController, NetworkProcessor, NetEvent, Endpoint, ResourceId};
2use crate::events::{self, EventSender, EventReceiver};
3use crate::util::thread::{NamespacedThread, OTHER_THREAD_ERR};
4
5use std::sync::{
6    Arc, Mutex,
7    atomic::{AtomicBool, Ordering},
8};
9use std::time::{Duration};
10use std::collections::{VecDeque};
11
12lazy_static::lazy_static! {
13    static ref SAMPLING_TIMEOUT: Duration = Duration::from_millis(50);
14}
15
16/// Event returned by [`NodeListener::for_each()`] and [`NodeListener::for_each_async()`]
17/// when some network event or signal is received.
18pub enum NodeEvent<'a, S> {
19    /// The `NodeEvent` is an event that comes from the network.
20    /// See [`NetEvent`] to know about the different network events.
21    Network(NetEvent<'a>),
22
23    /// The `NodeEvent` is a signal.
24    /// A signal is an event produced by the own node to itself.
25    /// You can send signals with timers or priority.
26    /// See [`EventSender`] to know about how to send signals.
27    Signal(S),
28}
29
30impl<S: std::fmt::Debug> std::fmt::Debug for NodeEvent<'_, S> {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            NodeEvent::Network(net_event) => write!(f, "NodeEvent::Network({net_event:?})"),
34            NodeEvent::Signal(signal) => write!(f, "NodeEvent::Signal({signal:?})"),
35        }
36    }
37}
38
39impl<'a, S> NodeEvent<'a, S> {
40    /// Assume the event is a [`NodeEvent::Network`], panics if not.
41    pub fn network(self) -> NetEvent<'a> {
42        match self {
43            NodeEvent::Network(net_event) => net_event,
44            NodeEvent::Signal(..) => panic!("NodeEvent must be a NetEvent"),
45        }
46    }
47
48    /// Assume the event is a [`NodeEvent::Signal`], panics if not.
49    pub fn signal(self) -> S {
50        match self {
51            NodeEvent::Network(..) => panic!("NodeEvent must be a Signal"),
52            NodeEvent::Signal(signal) => signal,
53        }
54    }
55}
56
57/// Analogous to [`NodeEvent`] but without reference the data.
58/// This kind of event is dispatched by `NodeListener::to_event_queue()`.
59/// It is useful when you need to move an [`NodeEvent`]
60#[derive(Clone)]
61pub enum StoredNodeEvent<S> {
62    /// The `StoredNodeEvent` is an event that comes from the network.
63    /// See [`NetEvent`] to know about the different network events.
64    Network(StoredNetEvent),
65
66    /// The `StoredNodeEvent` is a signal.
67    /// A signal is an event produced by the own node to itself.
68    /// You can send signals with timers or priority.
69    /// See [`EventSender`] to know about how to send signals.
70    Signal(S),
71}
72
73impl<S: std::fmt::Debug> std::fmt::Debug for StoredNodeEvent<S> {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        match self {
76            StoredNodeEvent::Network(net_event) => write!(f, "NodeEvent::Network({net_event:?})"),
77            StoredNodeEvent::Signal(signal) => write!(f, "NodeEvent::Signal({signal:?})"),
78        }
79    }
80}
81
82impl<S> StoredNodeEvent<S> {
83    /// Assume the event is a [`StoredNodeEvent::Network`], panics if not.
84    pub fn network(self) -> StoredNetEvent {
85        match self {
86            StoredNodeEvent::Network(net_event) => net_event,
87            StoredNodeEvent::Signal(..) => panic!("NodeEvent must be a NetEvent"),
88        }
89    }
90
91    /// Assume the event is a [`StoredNodeEvent::Signal`], panics if not.
92    pub fn signal(self) -> S {
93        match self {
94            StoredNodeEvent::Network(..) => panic!("NodeEvent must be a Signal"),
95            StoredNodeEvent::Signal(signal) => signal,
96        }
97    }
98}
99
100impl<S> From<NodeEvent<'_, S>> for StoredNodeEvent<S> {
101    fn from(node_event: NodeEvent<'_, S>) -> Self {
102        match node_event {
103            NodeEvent::Network(net_event) => StoredNodeEvent::Network(net_event.into()),
104            NodeEvent::Signal(signal) => StoredNodeEvent::Signal(signal),
105        }
106    }
107}
108
109/// Analogous to [`NetEvent`] but with static lifetime (without reference the data).
110/// This kind of event is dispatched by `NodeListener::to_event_queue()`
111/// and can be easily stored in any container.
112#[derive(Debug, Clone)]
113pub enum StoredNetEvent {
114    Connected(Endpoint, bool),
115    Accepted(Endpoint, ResourceId),
116    Message(Endpoint, Vec<u8>),
117    Disconnected(Endpoint),
118}
119
120impl From<NetEvent<'_>> for StoredNetEvent {
121    fn from(net_event: NetEvent<'_>) -> Self {
122        match net_event {
123            NetEvent::Connected(endpoint, status) => Self::Connected(endpoint, status),
124            NetEvent::Accepted(endpoint, id) => Self::Accepted(endpoint, id),
125            NetEvent::Message(endpoint, data) => Self::Message(endpoint, Vec::from(data)),
126            NetEvent::Disconnected(endpoint) => Self::Disconnected(endpoint),
127        }
128    }
129}
130
131impl StoredNetEvent {
132    /// Use this `StoredNetEvent` as a `NetEvent` referencing its data.
133    pub fn borrow(&self) -> NetEvent<'_> {
134        match self {
135            Self::Connected(endpoint, status) => NetEvent::Connected(*endpoint, *status),
136            Self::Accepted(endpoint, id) => NetEvent::Accepted(*endpoint, *id),
137            Self::Message(endpoint, data) => NetEvent::Message(*endpoint, data),
138            Self::Disconnected(endpoint) => NetEvent::Disconnected(*endpoint),
139        }
140    }
141}
142
143/// Creates a node already working.
144/// This function offers two instances: a [`NodeHandler`] to perform network and signals actions
145/// and a [`NodeListener`] to receive the events the node receives.
146///
147/// Note that [`NodeListener`] is already listen for events from its creation.
148/// In order to get the listened events you can call [`NodeListener::for_each()`]
149/// Any event happened before `for_each()` call will be also dispatched.
150///
151/// # Examples
152/// ```rust
153/// use message_io::node::{self, NodeEvent};
154///
155/// enum Signal {
156///     Close,
157///     Tick,
158///     //Other signals here.
159/// }
160///
161/// let (handler, listener) = node::split();
162///
163/// handler.signals().send_with_timer(Signal::Close, std::time::Duration::from_secs(1));
164///
165/// listener.for_each(move |event| match event {
166///     NodeEvent::Network(_) => { /* ... */ },
167///     NodeEvent::Signal(signal) => match signal {
168///         Signal::Close => handler.stop(), //Received after 1 sec
169///         Signal::Tick => { /* ... */ },
170///     },
171/// });
172/// ```
173///
174/// In case you don't use signals, specify the node type with an unit (`()`) type.
175/// ```
176/// use message_io::node::{self};
177///
178/// let (handler, listener) = node::split::<()>();
179/// ```
180pub fn split<S: Send>() -> (NodeHandler<S>, NodeListener<S>) {
181    let (network_controller, network_processor) = network::split();
182    let (signal_sender, signal_receiver) = events::split();
183    let running = AtomicBool::new(true);
184
185    let handler = NodeHandler(Arc::new(NodeHandlerImpl {
186        network: network_controller,
187        signals: signal_sender,
188        running,
189    }));
190
191    let listener = NodeListener::new(network_processor, signal_receiver, handler.clone());
192
193    (handler, listener)
194}
195
196struct NodeHandlerImpl<S> {
197    network: NetworkController,
198    signals: EventSender<S>,
199    running: AtomicBool,
200}
201
202/// A shareable and clonable entity that allows to deal with
203/// the network, send signals and stop the node.
204pub struct NodeHandler<S>(Arc<NodeHandlerImpl<S>>);
205
206impl<S> NodeHandler<S> {
207    /// Returns a reference to the NetworkController to deal with the network.
208    /// See [`NetworkController`]
209    pub fn network(&self) -> &NetworkController {
210        &self.0.network
211    }
212
213    /// Returns a reference to the EventSender to send signals to the node.
214    /// Signals are events that the node send to itself useful in situation where you need
215    /// to "wake up" the [`NodeListener`] to perform some action.
216    /// See [`EventSender`].
217    pub fn signals(&self) -> &EventSender<S> {
218        &self.0.signals
219    }
220
221    /// Finalizes the [`NodeListener`].
222    /// After this call, no more events will be processed by [`NodeListener::for_each()`].
223    pub fn stop(&self) {
224        self.0.running.store(false, Ordering::Relaxed);
225    }
226
227    /// Check if the node is running.
228    /// Note that the node is running and listening events from its creation,
229    /// not only once you call to [`NodeListener::for_each()`].
230    pub fn is_running(&self) -> bool {
231        self.0.running.load(Ordering::Relaxed)
232    }
233}
234
235impl<S: Send + 'static> Clone for NodeHandler<S> {
236    fn clone(&self) -> Self {
237        Self(self.0.clone())
238    }
239}
240
241/// Listen events for network and signal events.
242pub struct NodeListener<S: Send + 'static> {
243    network_cache_thread: NamespacedThread<(NetworkProcessor, VecDeque<StoredNetEvent>)>,
244    cache_running: Arc<AtomicBool>,
245    signal_receiver: EventReceiver<S>,
246    handler: NodeHandler<S>,
247}
248
249impl<S: Send + 'static> NodeListener<S> {
250    fn new(
251        mut network_processor: NetworkProcessor,
252        signal_receiver: EventReceiver<S>,
253        handler: NodeHandler<S>,
254    ) -> NodeListener<S> {
255        // Spawn the network thread to be able to perform correctly any network action before
256        // for_each() call. Any generated event would be cached and offered to the user when they
257        // call for_each().
258        let cache_running = Arc::new(AtomicBool::new(true));
259        let network_cache_thread = {
260            let cache_running = cache_running.clone();
261            let mut cache = VecDeque::new();
262            NamespacedThread::spawn("node-network-cache-thread", move || {
263                while cache_running.load(Ordering::Relaxed) {
264                    network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
265                        log::trace!("Cached {:?}", net_event);
266                        cache.push_back(net_event.into());
267                    });
268                }
269                (network_processor, cache)
270            })
271        };
272
273        NodeListener { network_cache_thread, cache_running, signal_receiver, handler }
274    }
275
276    /// Iterate indefinitely over all generated `NetEvent`.
277    /// This function will work until [`NodeHandler::stop()`] is called.
278    ///
279    /// Note that any events generated before calling this function (e.g. some connection was done)
280    /// will be stored and offered once you call `for_each()`.
281    /// # Example
282    /// ```
283    /// use message_io::node::{self, NodeEvent};
284    /// use message_io::network::Transport;
285    ///
286    /// let (handler, listener) = node::split();
287    /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
288    /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap();
289    ///
290    /// listener.for_each(move |event| match event {
291    ///     NodeEvent::Network(net_event) => { /* Your logic here */ },
292    ///     NodeEvent::Signal(_) => handler.stop(),
293    /// });
294    /// // Blocked here until handler.stop() is called (1 sec).
295    /// println!("Node is stopped");
296    /// ```
297    pub fn for_each(mut self, mut event_callback: impl FnMut(NodeEvent<S>)) {
298        // Stop cache events
299        self.cache_running.store(false, Ordering::Relaxed);
300        let (mut network_processor, mut cache) = self.network_cache_thread.join();
301
302        // Dispatch the catched events first.
303        while let Some(event) = cache.pop_front() {
304            let net_event = event.borrow();
305            log::trace!("Read from cache {:?}", net_event);
306            event_callback(NodeEvent::Network(net_event));
307            if !self.handler.is_running() {
308                return;
309            }
310        }
311
312        crossbeam_utils::thread::scope(|scope| {
313            let multiplexed = Arc::new(Mutex::new(event_callback));
314
315            let _signal_thread = {
316                let mut signal_receiver = std::mem::take(&mut self.signal_receiver);
317                let handler = self.handler.clone();
318
319                // This struct is used to allow passing the no sendable event_callback
320                // into the signal thread.
321                // It is safe because the thread are scoped and the callback is managed by a lock,
322                // so only one call is performed at the same time.
323                // It implies that any object moved into the callback do not have
324                // any concurrence issues.
325                #[allow(clippy::type_complexity)]
326                struct SendableEventCallback<'a, S>(Arc<Mutex<dyn FnMut(NodeEvent<S>) + 'a>>);
327                #[allow(clippy::non_send_fields_in_send_ty)]
328                unsafe impl<S> Send for SendableEventCallback<'_, S> {}
329
330                let multiplexed = SendableEventCallback(multiplexed.clone());
331
332                scope
333                    .builder()
334                    .name(String::from("node-network-thread"))
335                    .spawn(move |_| {
336                        while handler.is_running() {
337                            if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT)
338                            {
339                                let mut event_callback =
340                                    multiplexed.0.lock().expect(OTHER_THREAD_ERR);
341                                if handler.is_running() {
342                                    event_callback(NodeEvent::Signal(signal));
343                                }
344                            }
345                        }
346                    })
347                    .unwrap()
348            };
349
350            while self.handler.is_running() {
351                network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
352                    let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
353                    if self.handler.is_running() {
354                        event_callback(NodeEvent::Network(net_event));
355                    }
356                });
357            }
358        })
359        .unwrap();
360    }
361
362    /// Similar to [`NodeListener::for_each()`] but it returns the control to the user
363    /// after calling it. The events will be processed asynchronously.
364    /// A `NodeTask` representing this asynchronous job is returned.
365    /// Destroying this object will result in blocking the current thread until
366    /// [`NodeHandler::stop()`] is called.
367    ///
368    /// In order to allow the node working asynchronously, you can move the `NodeTask` to a
369    /// an object with a longer lifetime.
370    ///
371    /// # Example
372    /// ```
373    /// use message_io::node::{self, NodeEvent};
374    /// use message_io::network::Transport;
375    ///
376    /// let (handler, listener) = node::split();
377    /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
378    /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap();
379    ///
380    /// let task = listener.for_each_async(move |event| match event {
381    ///      NodeEvent::Network(net_event) => { /* Your logic here */ },
382    ///      NodeEvent::Signal(_) => handler.stop(),
383    /// });
384    /// // for_each_async() will act asynchronous during 'task' lifetime.
385    ///
386    /// // ...
387    /// println!("Node is running");
388    /// // ...
389    ///
390    /// drop(task); // Blocked here until handler.stop() is called (1 sec).
391    /// // Also task.wait(); can be called doing the same (but taking a mutable reference).
392    ///
393    /// println!("Node is stopped");
394    /// ```
395    pub fn for_each_async(
396        mut self,
397        event_callback: impl FnMut(NodeEvent<S>) + Send + 'static,
398    ) -> NodeTask {
399        // Stop cache events
400        self.cache_running.store(false, Ordering::Relaxed);
401        let (mut network_processor, mut cache) = self.network_cache_thread.join();
402
403        let multiplexed = Arc::new(Mutex::new(event_callback));
404
405        // To avoid processing stops while the node is configuring,
406        // the user callback locked until the function ends.
407        let _locked = multiplexed.lock().expect(OTHER_THREAD_ERR);
408
409        let network_thread = {
410            let multiplexed = multiplexed.clone();
411            let handler = self.handler.clone();
412
413            NamespacedThread::spawn("node-network-thread", move || {
414                while let Some(event) = cache.pop_front() {
415                    let net_event = event.borrow();
416                    log::trace!("Read from cache {:?}", net_event);
417                    let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
418                    event_callback(NodeEvent::Network(net_event));
419                    if !handler.is_running() {
420                        return;
421                    }
422                }
423
424                while handler.is_running() {
425                    network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
426                        let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
427                        if handler.is_running() {
428                            event_callback(NodeEvent::Network(net_event));
429                        }
430                    });
431                }
432            })
433        };
434
435        let signal_thread = {
436            let multiplexed = multiplexed.clone();
437            let mut signal_receiver = std::mem::take(&mut self.signal_receiver);
438            let handler = self.handler.clone();
439
440            NamespacedThread::spawn("node-signal-thread", move || {
441                while handler.is_running() {
442                    if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT) {
443                        let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
444                        if handler.is_running() {
445                            event_callback(NodeEvent::Signal(signal));
446                        }
447                    }
448                }
449            })
450        };
451
452        NodeTask { network_thread, signal_thread }
453    }
454
455    /// Consumes the listener to create a `NodeTask` and an `EventReceiver` where the events
456    /// of this node will be sent.
457    /// The events will be sent to the `EventReceiver` during the `NodeTask` lifetime.
458    /// The aim of this method is to offer a synchronous way of working with a *node*,
459    /// without using a clousure.
460    /// This easier API management has a performance cost.
461    /// Compared to [`NodeListener::for_each()`], this function adds latency because the
462    /// node event must be copied and no longer reference data from the internal socket buffer.
463    ///
464    /// # Example
465    /// ```
466    /// use message_io::node::{self, StoredNodeEvent as NodeEvent};
467    /// use message_io::network::Transport;
468    ///
469    /// let (handler, listener) = node::split();
470    /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
471    /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap();
472    ///
473    /// let (task, mut receiver) = listener.enqueue();
474    ///
475    /// loop {
476    ///     match receiver.receive() {
477    ///         NodeEvent::Network(net_event) => { /* Your logic here */ },
478    ///         NodeEvent::Signal(_) => break handler.stop(),
479    ///     }
480    /// }
481    /// ```
482    pub fn enqueue(self) -> (NodeTask, EventReceiver<StoredNodeEvent<S>>) {
483        let (sender, receiver) = events::split::<StoredNodeEvent<S>>();
484        let task = self.for_each_async(move |node_event| sender.send(node_event.into()));
485        (task, receiver)
486    }
487}
488
489impl<S: Send + 'static> Drop for NodeListener<S> {
490    fn drop(&mut self) {
491        self.cache_running.store(false, Ordering::Relaxed);
492    }
493}
494
495/// Entity used to ensure the lifetime of [`NodeListener::for_each_async()`] call.
496/// The node will process events asynchronously while this entity lives.
497/// The destruction of this entity will block until the task is finished.
498/// If you want to "unblock" the thread that drops this entity call to
499/// [`NodeHandler::stop()`] before or from another thread.
500#[must_use = "The NodeTask must be used or the asynchronous task will be dropped in return"]
501pub struct NodeTask {
502    network_thread: NamespacedThread<()>,
503    signal_thread: NamespacedThread<()>,
504}
505
506impl NodeTask {
507    /// Block the current thread until the task finalizes.
508    /// Similar to call `drop(node_task)` but more verbose and without take the ownership.
509    /// To finalize the task call [`NodeHandler::stop()`].
510    /// Calling `wait()` over an already finished task do not block.
511    pub fn wait(&mut self) {
512        self.network_thread.try_join();
513        self.signal_thread.try_join();
514    }
515}
516
517#[cfg(test)]
518mod tests {
519    use super::*;
520    use std::time::{Duration};
521
522    #[test]
523    fn create_node_and_drop() {
524        let (handler, _listener) = split::<()>();
525        assert!(handler.is_running());
526        // listener dropped here.
527    }
528
529    #[test]
530    fn sync_node() {
531        let (handler, listener) = split();
532        assert!(handler.is_running());
533        handler.signals().send_with_timer((), Duration::from_millis(1000));
534
535        let inner_handler = handler.clone();
536        listener.for_each(move |_| inner_handler.stop());
537
538        assert!(!handler.is_running());
539    }
540
541    #[test]
542    fn async_node() {
543        let (handler, listener) = split();
544        assert!(handler.is_running());
545        handler.signals().send_with_timer("check", Duration::from_millis(250));
546
547        let checked = Arc::new(AtomicBool::new(false));
548        let inner_checked = checked.clone();
549        let inner_handler = handler.clone();
550        let _node_task = listener.for_each_async(move |event| match event.signal() {
551            "stop" => inner_handler.stop(),
552            "check" => inner_checked.store(true, Ordering::Relaxed),
553            _ => unreachable!(),
554        });
555
556        // Since here `NodeTask` is living, the node is considered running.
557        assert!(handler.is_running());
558        std::thread::sleep(Duration::from_millis(500));
559        assert!(checked.load(Ordering::Relaxed));
560        assert!(handler.is_running());
561        handler.signals().send("stop");
562    }
563
564    #[test]
565    fn enqueue() {
566        let (handler, listener) = split();
567        assert!(handler.is_running());
568        handler.signals().send_with_timer((), Duration::from_millis(1000));
569
570        let (mut task, mut receiver) = listener.enqueue();
571        assert!(handler.is_running());
572
573        receiver.receive_timeout(Duration::from_millis(2000)).unwrap().signal();
574        handler.stop();
575
576        assert!(!handler.is_running());
577        task.wait();
578    }
579
580    #[test]
581    fn wait_task() {
582        let (handler, listener) = split();
583
584        handler.signals().send_with_timer((), Duration::from_millis(1000));
585
586        let inner_handler = handler.clone();
587        listener.for_each_async(move |_| inner_handler.stop()).wait();
588
589        assert!(!handler.is_running());
590    }
591
592    #[test]
593    fn wait_already_waited_task() {
594        let (handler, listener) = split();
595
596        handler.signals().send_with_timer((), Duration::from_millis(1000));
597
598        let inner_handler = handler.clone();
599        let mut task = listener.for_each_async(move |_| inner_handler.stop());
600        assert!(handler.is_running());
601        task.wait();
602        assert!(!handler.is_running());
603        task.wait();
604        assert!(!handler.is_running());
605    }
606}