use crate::network::{self, NetworkController, NetworkProcessor, NetEvent, Endpoint, ResourceId};
use crate::events::{self, EventSender, EventReceiver};
use crate::util::thread::{NamespacedThread, OTHER_THREAD_ERR};
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::time::{Duration};
use std::collections::{VecDeque};
lazy_static::lazy_static! {
static ref SAMPLING_TIMEOUT: Duration = Duration::from_millis(50);
}
pub enum NodeEvent<'a, S> {
Network(NetEvent<'a>),
Signal(S),
}
impl<'a, S: std::fmt::Debug> std::fmt::Debug for NodeEvent<'a, S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NodeEvent::Network(net_event) => write!(f, "NodeEvent::Network({:?})", net_event),
NodeEvent::Signal(signal) => write!(f, "NodeEvent::Signal({:?})", signal),
}
}
}
impl<'a, S> NodeEvent<'a, S> {
pub fn network(self) -> NetEvent<'a> {
match self {
NodeEvent::Network(net_event) => net_event,
NodeEvent::Signal(..) => panic!("NodeEvent must be a NetEvent"),
}
}
pub fn signal(self) -> S {
match self {
NodeEvent::Network(..) => panic!("NodeEvent must be a Signal"),
NodeEvent::Signal(signal) => signal,
}
}
}
pub fn split<S: Send>() -> (NodeHandler<S>, NodeListener<S>) {
let (network_controller, network_processor) = network::split();
let (signal_sender, signal_receiver) = events::split();
let running = Arc::new(AtomicBool::new(true));
let handler = NodeHandler::new(network_controller, signal_sender, running.clone());
let listener = NodeListener::new(network_processor, signal_receiver, running);
(handler, listener)
}
pub struct NodeHandler<S> {
network: Arc<NetworkController>,
signals: EventSender<S>,
running: Arc<AtomicBool>,
}
impl<S> NodeHandler<S> {
fn new(network: NetworkController, signals: EventSender<S>, running: Arc<AtomicBool>) -> Self {
Self { network: Arc::new(network), signals, running }
}
pub fn network(&self) -> &NetworkController {
&self.network
}
pub fn signals(&self) -> &EventSender<S> {
&self.signals
}
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed);
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
}
impl<S: Send + 'static> Clone for NodeHandler<S> {
fn clone(&self) -> Self {
Self {
network: self.network.clone(),
signals: self.signals.clone(),
running: self.running.clone(),
}
}
}
#[derive(Debug)]
enum StoredNetEvent {
Connected(Endpoint, ResourceId),
Message(Endpoint, Vec<u8>),
Disconnected(Endpoint),
}
impl From<NetEvent<'_>> for StoredNetEvent {
fn from(net_event: NetEvent<'_>) -> Self {
match net_event {
NetEvent::Connected(endpoint, id) => Self::Connected(endpoint, id),
NetEvent::Message(endpoint, data) => Self::Message(endpoint, Vec::from(data)),
NetEvent::Disconnected(endpoint) => Self::Disconnected(endpoint),
}
}
}
impl StoredNetEvent {
fn borrow(&self) -> NetEvent<'_> {
match self {
Self::Connected(endpoint, id) => NetEvent::Connected(*endpoint, *id),
Self::Message(endpoint, data) => NetEvent::Message(*endpoint, &data),
Self::Disconnected(endpoint) => NetEvent::Disconnected(*endpoint),
}
}
}
struct SendableEventCallback<S>(Arc<Mutex<dyn FnMut(NodeEvent<S>)>>);
unsafe impl<S> Send for SendableEventCallback<S> {}
impl<S> Clone for SendableEventCallback<S> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
pub struct NodeListener<S: Send + 'static> {
network_cache_thread: NamespacedThread<(NetworkProcessor, VecDeque<StoredNetEvent>)>,
cache_running: Arc<AtomicBool>,
signal_receiver: EventReceiver<S>,
running: Arc<AtomicBool>,
}
impl<S: Send + 'static> NodeListener<S> {
fn new(
mut network_processor: NetworkProcessor,
signal_receiver: EventReceiver<S>,
running: Arc<AtomicBool>,
) -> NodeListener<S> {
let cache_running = Arc::new(AtomicBool::new(true));
let network_cache_thread = {
let cache_running = cache_running.clone();
let mut cache = VecDeque::new();
NamespacedThread::spawn("node-network-cache-thread", move || {
while cache_running.load(Ordering::Relaxed) {
network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
log::trace!("Cached {:?}", net_event);
cache.push_back(net_event.into());
});
}
(network_processor, cache)
})
};
NodeListener { network_cache_thread, cache_running, signal_receiver, running }
}
pub fn for_each(self, event_callback: impl FnMut(NodeEvent<S>) + 'static) {
let sendable_callback = SendableEventCallback(Arc::new(Mutex::new(event_callback)));
let mut task = self.for_each_impl(sendable_callback);
task.wait();
}
pub fn for_each_async(
self,
event_callback: impl FnMut(NodeEvent<S>) + Send + 'static,
) -> NodeTask {
let sendable_callback = SendableEventCallback(Arc::new(Mutex::new(event_callback)));
self.for_each_impl(sendable_callback)
}
fn for_each_impl(mut self, multiplexed: SendableEventCallback<S>) -> NodeTask {
self.cache_running.store(false, Ordering::Relaxed);
let (mut network_processor, mut cache) = self.network_cache_thread.join();
let _locked = multiplexed.0.lock().expect(OTHER_THREAD_ERR);
let network_thread = {
let multiplexed = multiplexed.clone();
let running = self.running.clone();
NamespacedThread::spawn("node-network-thread", move || {
while let Some(event) = cache.pop_front() {
let mut event_callback = multiplexed.0.lock().expect(OTHER_THREAD_ERR);
let net_event = event.borrow();
log::trace!("Read from cache {:?}", net_event);
event_callback(NodeEvent::Network(net_event));
if !running.load(Ordering::Relaxed) {
return
}
}
while running.load(Ordering::Relaxed) {
network_processor.process_poll_event(Some(*SAMPLING_TIMEOUT), |net_event| {
let mut event_callback = multiplexed.0.lock().expect(OTHER_THREAD_ERR);
if running.load(Ordering::Relaxed) {
event_callback(NodeEvent::Network(net_event));
}
});
}
})
};
let signal_thread = {
let multiplexed = multiplexed.clone();
let mut signal_receiver = std::mem::take(&mut self.signal_receiver);
let running = self.running.clone();
NamespacedThread::spawn("node-signal-thread", move || {
while running.load(Ordering::Relaxed) {
if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT) {
let mut event_callback = multiplexed.0.lock().expect(OTHER_THREAD_ERR);
if running.load(Ordering::Relaxed) {
event_callback(NodeEvent::Signal(signal));
}
}
}
})
};
NodeTask { network_thread, signal_thread }
}
}
impl<S: Send + 'static> Drop for NodeListener<S> {
fn drop(&mut self) {
self.cache_running.store(false, Ordering::Relaxed);
}
}
pub struct NodeTask {
network_thread: NamespacedThread<()>,
signal_thread: NamespacedThread<()>,
}
impl NodeTask {
pub fn wait(&mut self) {
self.network_thread.try_join();
self.signal_thread.try_join();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration};
#[test]
fn create_node_and_drop() {
let (handler, _listener) = split::<()>();
assert!(handler.is_running());
}
#[test]
fn sync_node() {
let (handler, listener) = split();
assert!(handler.is_running());
handler.signals().send_with_timer((), Duration::from_millis(1000));
let inner_handler = handler.clone();
listener.for_each(move |_| inner_handler.stop());
assert!(!handler.is_running());
}
#[test]
fn async_node() {
let (handler, listener) = split();
assert!(handler.is_running());
handler.signals().send_with_timer("check", Duration::from_millis(250));
let checked = Arc::new(AtomicBool::new(false));
let inner_checked = checked.clone();
let inner_handler = handler.clone();
let _node_task = listener.for_each_async(move |event| match event.signal() {
"stop" => inner_handler.stop(),
"check" => inner_checked.store(true, Ordering::Relaxed),
_ => unreachable!(),
});
assert!(handler.is_running());
std::thread::sleep(Duration::from_millis(500));
assert!(checked.load(Ordering::Relaxed));
assert!(handler.is_running());
handler.signals().send("stop");
}
#[test]
fn wait_task() {
let (handler, listener) = split();
handler.signals().send_with_timer((), Duration::from_millis(1000));
let inner_handler = handler.clone();
listener.for_each_async(move |_| inner_handler.stop()).wait();
assert!(!handler.is_running());
}
#[test]
fn wait_already_waited_task() {
let (handler, listener) = split();
handler.signals().send_with_timer((), Duration::from_millis(1000));
let inner_handler = handler.clone();
let mut task = listener.for_each_async(move |_| inner_handler.stop());
assert!(handler.is_running());
task.wait();
assert!(!handler.is_running());
task.wait();
assert!(!handler.is_running());
}
}