use std::sync::mpsc::{Receiver, channel};
use std::thread::{self, sleep};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use nexosim::model::{Context, Model, ProtoModel, schedulable};
use nexosim::ports::{EventSinkReader, Output, SinkState, event_queue};
use nexosim::simulation::{ExecutionError, Mailbox, SimInit, SimulationError};
use nexosim::time::{AutoSystemClock, MonotonicTime, PeriodicTicker};
const DELTA: Duration = Duration::from_millis(2);
const PERIOD: Duration = Duration::from_millis(20);
const N: usize = 10;
#[derive(Serialize, Deserialize)]
pub struct Listener {
pub message: Output<String>,
}
#[Model(type Env=ListenerEnv)]
impl Listener {
fn new(message: Output<String>) -> Self {
Self { message }
}
#[nexosim(init)]
async fn init(&mut self, cx: &Context<Self>) {
cx.schedule_periodic_event(DELTA, PERIOD, schedulable!(Self::process), ())
.unwrap();
}
#[nexosim(schedulable)]
async fn process(&mut self, _: (), _: &Context<Self>, env: &mut ListenerEnv) {
while let Ok(message) = env.external.try_recv() {
self.message.send(message).await;
}
}
}
pub struct ListenerEnv {
external: Receiver<String>,
}
impl ListenerEnv {
fn new(external: Receiver<String>) -> Self {
Self { external }
}
}
struct ProtoListener {
external: Receiver<String>,
pub message: Output<String>,
}
impl ProtoListener {
pub fn new(external: Receiver<String>) -> Self {
Self {
external,
message: Output::default(),
}
}
}
impl ProtoModel for ProtoListener {
type Model = Listener;
fn build(
self,
_: &mut nexosim::model::BuildContext<Self>,
) -> (Self::Model, <Self::Model as Model>::Env) {
(Listener::new(self.message), ListenerEnv::new(self.external))
}
}
fn main() -> Result<(), SimulationError> {
let (tx, rx) = channel();
let mut listener = ProtoListener::new(rx);
let listener_mbox = Mailbox::new();
let (sink, mut message) = event_queue(SinkState::Enabled);
listener.message.connect_sink(sink);
let t0 = MonotonicTime::EPOCH; let mut simu = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.with_clock(
AutoSystemClock::new(),
PeriodicTicker::new(Duration::from_millis(100)),
)
.init(t0)?;
let scheduler = simu.scheduler();
let simulation_handle = thread::spawn(move || {
simu.run()
});
for i in 0..N {
tx.send(i.to_string()).unwrap();
if i % 3 == 0 {
sleep(PERIOD * i as u32)
}
}
for i in 0..N {
assert_eq!(message.try_read().unwrap(), i.to_string());
}
assert_eq!(message.try_read(), None);
scheduler.halt();
match simulation_handle.join().unwrap() {
Err(ExecutionError::Halted) => Ok(()),
Err(e) => Err(e.into()),
_ => Ok(()),
}
}