use std::io::ErrorKind;
use std::net::{Ipv4Addr, UdpSocket};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle, sleep};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use nexosim::model::{BuildContext, Context, Model, ProtoModel, schedulable};
use nexosim::ports::{EventSinkReader, Output, SinkState, event_queue};
use nexosim::simulation::{Mailbox, SimInit, SimulationError};
use nexosim::time::{AutoSystemClock, MonotonicTime, PeriodicTicker};
const DELTA: Duration = Duration::from_millis(2);
const PERIOD: Duration = Duration::from_millis(20);
const N: usize = 10;
const SHUTDOWN_SIGNAL: &str = "<SHUTDOWN>";
const SENDER: (Ipv4Addr, u16) = (Ipv4Addr::new(127, 0, 0, 1), 8000);
const RECEIVER: (Ipv4Addr, u16) = (Ipv4Addr::new(127, 0, 0, 1), 9000);
pub struct ProtoListener {
pub message: Output<String>,
start: Notifier,
}
impl ProtoListener {
fn new(start: Notifier) -> Self {
Self {
message: Output::default(),
start,
}
}
}
impl ProtoModel for ProtoListener {
type Model = Listener;
fn build(self, _: &mut BuildContext<Self>) -> (Listener, ListenerEnv) {
let (tx, rx) = channel();
let external_handle = thread::spawn(move || {
Listener::listen(tx, self.start);
});
(
Listener::new(self.message),
ListenerEnv::new(rx, external_handle),
)
}
}
#[derive(Serialize, Deserialize)]
pub struct Listener {
message: Output<String>,
}
#[Model(type Env=ListenerEnv)]
impl Listener {
pub fn new(message: Output<String>) -> Self {
Self { message }
}
#[nexosim(init)]
async fn init(&mut self, cx: &Context<Self>) {
cx.schedule_periodic_event(DELTA, PERIOD, schedulable!(Self::process), ())
.unwrap();
}
#[nexosim(schedulable)]
async fn process(&mut self, _: (), _: &Context<Self>, env: &mut ListenerEnv) {
while let Ok(message) = env.rx.try_recv() {
self.message.send(message).await;
}
}
fn listen(tx: Sender<String>, start: Notifier) {
let socket = UdpSocket::bind(RECEIVER).unwrap();
let mut buf = [0; 1 << 16];
start.notify();
loop {
match socket.recv_from(&mut buf) {
Ok((packet_size, _)) => {
if let Ok(message) = std::str::from_utf8(&buf[..packet_size]) {
if message == SHUTDOWN_SIGNAL {
break;
}
if tx.send(message.into()).is_err() {
break;
}
};
}
Err(e) if e.kind() == ErrorKind::Interrupted => {
continue;
}
_ => {
break;
}
}
}
}
}
pub struct ListenerEnv {
rx: Receiver<String>,
server_handle: Option<JoinHandle<()>>,
}
impl ListenerEnv {
pub fn new(rx: Receiver<String>, server_handle: JoinHandle<()>) -> Self {
Self {
rx,
server_handle: Some(server_handle),
}
}
}
impl Drop for ListenerEnv {
fn drop(&mut self) {
if let Some(handle) = self.server_handle.take() {
let _ = handle.join();
};
}
}
struct WaitBarrier(Arc<(Mutex<bool>, Condvar)>);
impl WaitBarrier {
fn new() -> Self {
Self(Arc::new((Mutex::new(false), Condvar::new())))
}
fn notifier(&self) -> Notifier {
Notifier(self.0.clone())
}
fn wait(self) {
let _unused = self
.0
.1
.wait_while(self.0.0.lock().unwrap(), |pending| *pending)
.unwrap();
}
}
struct Notifier(Arc<(Mutex<bool>, Condvar)>);
impl Notifier {
fn notify(self) {
*self.0.0.lock().unwrap() = false;
self.0.1.notify_one();
}
}
fn main() -> Result<(), SimulationError> {
let start = WaitBarrier::new();
let mut listener = ProtoListener::new(start.notifier());
let listener_mbox = Mailbox::new();
let (sink, mut message) = event_queue(SinkState::Enabled);
listener.message.connect_sink(sink);
let t0 = MonotonicTime::EPOCH; let mut simu = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.with_clock(
AutoSystemClock::new(),
PeriodicTicker::new(Duration::from_millis(100)),
)
.init(t0)?;
let sender_handle = thread::spawn(move || {
let socket = UdpSocket::bind(SENDER).unwrap();
start.wait();
for i in 0..N {
socket.send_to(i.to_string().as_bytes(), RECEIVER).unwrap();
if i % 3 == 0 {
sleep(PERIOD * i as u32)
}
}
socket
});
simu.step_until(Duration::from_secs(2))?;
let socket = sender_handle.join().unwrap();
socket
.send_to(SHUTDOWN_SIGNAL.as_bytes(), RECEIVER)
.unwrap();
let mut packets = 0_u32;
for _ in 0..N {
packets |= 1 << message.try_read().unwrap().parse::<u8>().unwrap();
}
assert_eq!(packets, u32::MAX >> 22);
assert_eq!(message.try_read(), None);
Ok(())
}