use crate::network::{self, NetworkController, NetworkProcessor, NetEvent, Endpoint, ResourceId};
use crate::events::{self, EventSender, EventReceiver};
use crate::util::thread::{NamespacedThread, OTHER_THREAD_ERR};
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::time::{Duration};
use std::collections::{VecDeque};
lazy_static::lazy_static! {
static ref SAMPLING_TIMEOUT: Duration = Duration::from_millis(50);
}
pub enum NodeEvent<'a, S> {
Network(NetEvent<'a>),
Signal(S),
}
impl<S: std::fmt::Debug> std::fmt::Debug for NodeEvent<'_, S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NodeEvent::Network(net_event) => write!(f, "NodeEvent::Network({net_event:?})"),
NodeEvent::Signal(signal) => write!(f, "NodeEvent::Signal({signal:?})"),
}
}
}
impl<'a, S> NodeEvent<'a, S> {
pub fn network(self) -> NetEvent<'a> {
match self {
NodeEvent::Network(net_event) => net_event,
NodeEvent::Signal(..) => panic!("NodeEvent must be a NetEvent"),
}
}
pub fn signal(self) -> S {
match self {
NodeEvent::Network(..) => panic!("NodeEvent must be a Signal"),
NodeEvent::Signal(signal) => signal,
}
}
}
#[derive(Clone)]
pub enum StoredNodeEvent<S> {
Network(StoredNetEvent),
Signal(S),
}
impl<S: std::fmt::Debug> std::fmt::Debug for StoredNodeEvent<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StoredNodeEvent::Network(net_event) => write!(f, "NodeEvent::Network({net_event:?})"),
StoredNodeEvent::Signal(signal) => write!(f, "NodeEvent::Signal({signal:?})"),
}
}
}
impl<S> StoredNodeEvent<S> {
pub fn network(self) -> StoredNetEvent {
match self {
StoredNodeEvent::Network(net_event) => net_event,
StoredNodeEvent::Signal(..) => panic!("NodeEvent must be a NetEvent"),
}
}
pub fn signal(self) -> S {
match self {
StoredNodeEvent::Network(..) => panic!("NodeEvent must be a Signal"),
StoredNodeEvent::Signal(signal) => signal,
}
}
}
impl<S> From<NodeEvent<'_, S>> for StoredNodeEvent<S> {
fn from(node_event: NodeEvent<'_, S>) -> Self {
match node_event {
NodeEvent::Network(net_event) => StoredNodeEvent::Network(net_event.into()),
NodeEvent::Signal(signal) => StoredNodeEvent::Signal(signal),
}
}
}
#[derive(Debug, Clone)]
pub enum StoredNetEvent {
Connected(Endpoint, bool),
Accepted(Endpoint, ResourceId),
Message(Endpoint, Vec<u8>),
Disconnected(Endpoint),
}
impl From<NetEvent<'_>> for StoredNetEvent {
fn from(net_event: NetEvent<'_>) -> Self {
match net_event {
NetEvent::Connected(endpoint, status) => Self::Connected(endpoint, status),
NetEvent::Accepted(endpoint, id) => Self::Accepted(endpoint, id),
NetEvent::Message(endpoint, data) => Self::Message(endpoint, Vec::from(data)),
NetEvent::Disconnected(endpoint) => Self::Disconnected(endpoint),
}
}
}
impl StoredNetEvent {
pub fn borrow(&self) -> NetEvent<'_> {
match self {
Self::Connected(endpoint, status) => NetEvent::Connected(*endpoint, *status),
Self::Accepted(endpoint, id) => NetEvent::Accepted(*endpoint, *id),
Self::Message(endpoint, data) => NetEvent::Message(*endpoint, data),
Self::Disconnected(endpoint) => NetEvent::Disconnected(*endpoint),
}
}
}
pub fn split<S: Send>() -> (NodeHandler<S>, NodeListener<S>) {
let (network_controller, network_processor) = network::split();
let (signal_sender, signal_receiver) = events::split();
let running = AtomicBool::new(true);
let handler = NodeHandler(Arc::new(NodeHandlerImpl {
network: network_controller,
signals: signal_sender,
running,
}));
let listener = NodeListener::new(network_processor, signal_receiver, handler.clone());
(handler, listener)
}
struct NodeHandlerImpl<S> {
network: NetworkController,
signals: EventSender<S>,
running: AtomicBool,
}
pub struct NodeHandler<S>(Arc<NodeHandlerImpl<S>>);
impl<S> NodeHandler<S> {
pub fn network(&self) -> &NetworkController {
&self.0.network
}
pub fn signals(&self) -> &EventSender<S> {
&self.0.signals
}
pub fn stop(&self) {
self.0.running.store(false, Ordering::Relaxed);
}
pub fn is_running(&self) -> bool {
self.0.running.load(Ordering::Relaxed)
}
}
impl<S: Send + 'static> Clone for NodeHandler<S> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
pub struct NodeListener<S: Send + 'static> {
network_cache_thread: NamespacedThread<(NetworkProcessor, VecDeque<StoredNetEvent>)>,
cache_running: Arc<AtomicBool>,
signal_receiver: EventReceiver<S>,
handler: NodeHandler<S>,
}
impl<S: Send + 'static> NodeListener<S> {
fn new(
mut network_processor: NetworkProcessor,
signal_receiver: EventReceiver<S>,
handler: NodeHandler<S>,
) -> NodeListener<S> {
let cache_running = Arc::new(AtomicBool::new(true));
let network_cache_thread = {
let cache_running = cache_running.clone();
let mut cache = VecDeque::new();
NamespacedThread::spawn("node-network-cache-thread", move || {
while cache_running.load(Ordering::Relaxed) {
network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
log::trace!("Cached {:?}", net_event);
cache.push_back(net_event.into());
});
}
(network_processor, cache)
})
};
NodeListener { network_cache_thread, cache_running, signal_receiver, handler }
}
pub fn for_each(mut self, mut event_callback: impl FnMut(NodeEvent<S>)) {
self.cache_running.store(false, Ordering::Relaxed);
let (mut network_processor, mut cache) = self.network_cache_thread.join();
while let Some(event) = cache.pop_front() {
let net_event = event.borrow();
log::trace!("Read from cache {:?}", net_event);
event_callback(NodeEvent::Network(net_event));
if !self.handler.is_running() {
return;
}
}
crossbeam_utils::thread::scope(|scope| {
let multiplexed = Arc::new(Mutex::new(event_callback));
let _signal_thread = {
let mut signal_receiver = std::mem::take(&mut self.signal_receiver);
let handler = self.handler.clone();
#[allow(clippy::type_complexity)]
struct SendableEventCallback<'a, S>(Arc<Mutex<dyn FnMut(NodeEvent<S>) + 'a>>);
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<S> Send for SendableEventCallback<'_, S> {}
let multiplexed = SendableEventCallback(multiplexed.clone());
scope
.builder()
.name(String::from("node-network-thread"))
.spawn(move |_| {
while handler.is_running() {
if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT)
{
let mut event_callback =
multiplexed.0.lock().expect(OTHER_THREAD_ERR);
if handler.is_running() {
event_callback(NodeEvent::Signal(signal));
}
}
}
})
.unwrap()
};
while self.handler.is_running() {
network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
if self.handler.is_running() {
event_callback(NodeEvent::Network(net_event));
}
});
}
})
.unwrap();
}
pub fn for_each_async(
mut self,
event_callback: impl FnMut(NodeEvent<S>) + Send + 'static,
) -> NodeTask {
self.cache_running.store(false, Ordering::Relaxed);
let (mut network_processor, mut cache) = self.network_cache_thread.join();
let multiplexed = Arc::new(Mutex::new(event_callback));
let _locked = multiplexed.lock().expect(OTHER_THREAD_ERR);
let network_thread = {
let multiplexed = multiplexed.clone();
let handler = self.handler.clone();
NamespacedThread::spawn("node-network-thread", move || {
while let Some(event) = cache.pop_front() {
let net_event = event.borrow();
log::trace!("Read from cache {:?}", net_event);
let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
event_callback(NodeEvent::Network(net_event));
if !handler.is_running() {
return;
}
}
while handler.is_running() {
network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
if handler.is_running() {
event_callback(NodeEvent::Network(net_event));
}
});
}
})
};
let signal_thread = {
let multiplexed = multiplexed.clone();
let mut signal_receiver = std::mem::take(&mut self.signal_receiver);
let handler = self.handler.clone();
NamespacedThread::spawn("node-signal-thread", move || {
while handler.is_running() {
if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT) {
let mut event_callback = multiplexed.lock().expect(OTHER_THREAD_ERR);
if handler.is_running() {
event_callback(NodeEvent::Signal(signal));
}
}
}
})
};
NodeTask { network_thread, signal_thread }
}
pub fn enqueue(self) -> (NodeTask, EventReceiver<StoredNodeEvent<S>>) {
let (sender, receiver) = events::split::<StoredNodeEvent<S>>();
let task = self.for_each_async(move |node_event| sender.send(node_event.into()));
(task, receiver)
}
}
impl<S: Send + 'static> Drop for NodeListener<S> {
fn drop(&mut self) {
self.cache_running.store(false, Ordering::Relaxed);
}
}
#[must_use = "The NodeTask must be used or the asynchronous task will be dropped in return"]
pub struct NodeTask {
network_thread: NamespacedThread<()>,
signal_thread: NamespacedThread<()>,
}
impl NodeTask {
pub fn wait(&mut self) {
self.network_thread.try_join();
self.signal_thread.try_join();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration};
#[test]
fn create_node_and_drop() {
let (handler, _listener) = split::<()>();
assert!(handler.is_running());
}
#[test]
fn sync_node() {
let (handler, listener) = split();
assert!(handler.is_running());
handler.signals().send_with_timer((), Duration::from_millis(1000));
let inner_handler = handler.clone();
listener.for_each(move |_| inner_handler.stop());
assert!(!handler.is_running());
}
#[test]
fn async_node() {
let (handler, listener) = split();
assert!(handler.is_running());
handler.signals().send_with_timer("check", Duration::from_millis(250));
let checked = Arc::new(AtomicBool::new(false));
let inner_checked = checked.clone();
let inner_handler = handler.clone();
let _node_task = listener.for_each_async(move |event| match event.signal() {
"stop" => inner_handler.stop(),
"check" => inner_checked.store(true, Ordering::Relaxed),
_ => unreachable!(),
});
assert!(handler.is_running());
std::thread::sleep(Duration::from_millis(500));
assert!(checked.load(Ordering::Relaxed));
assert!(handler.is_running());
handler.signals().send("stop");
}
#[test]
fn enqueue() {
let (handler, listener) = split();
assert!(handler.is_running());
handler.signals().send_with_timer((), Duration::from_millis(1000));
let (mut task, mut receiver) = listener.enqueue();
assert!(handler.is_running());
receiver.receive_timeout(Duration::from_millis(2000)).unwrap().signal();
handler.stop();
assert!(!handler.is_running());
task.wait();
}
#[test]
fn wait_task() {
let (handler, listener) = split();
handler.signals().send_with_timer((), Duration::from_millis(1000));
let inner_handler = handler.clone();
listener.for_each_async(move |_| inner_handler.stop()).wait();
assert!(!handler.is_running());
}
#[test]
fn wait_already_waited_task() {
let (handler, listener) = split();
handler.signals().send_with_timer((), Duration::from_millis(1000));
let inner_handler = handler.clone();
let mut task = listener.for_each_async(move |_| inner_handler.stop());
assert!(handler.is_running());
task.wait();
assert!(!handler.is_running());
task.wait();
assert!(!handler.is_running());
}
}