odem-rs 0.3.0

Object-based Discrete-Event Modelling in Rust using async/await
Documentation
//! This example demonstrates waker-based inter-process communication by
//! building a custom single-message async channel (`Uplink`).
//!
//! An operator process creates and activates a sleeping agent, then sends it a
//! message after a delay. The agent suspends until a message arrives, using the
//! low-level `waker()` and `sleep()` primitives to register and wake the
//! receiver. This pattern is useful for implementing bespoke synchronization
//! mechanisms on top of the odem-rs runtime.

use odem_rs::{
	core::ops::waker,
	prelude::*,
	tracing::{Level, info},
};
use std::{cell::Cell, pin::pin, task::Waker};

use Message::*;

// Internal state of the Uplink channel.
enum Message<T> {
	// Channel is ready, and may hold a message sent by the operator.
	Primed(Option<T>),
	// A receiver is suspended, waiting for a message.
	Linked(Waker),
}

/// A single-message async channel for "interrupting" an Agent.
struct Uplink<T>(Cell<Message<T>>);

impl<T> Uplink<T> {
	/// Creates a new, empty `Uplink` channel.
	pub fn new() -> Self {
		Self(Cell::new(Primed(None)))
	}

	/// Sends a message to the channel, waking a waiting receiver if one exists.
	pub fn interrupt(&self, value: T) -> Result<(), T> {
		match self.0.replace(Primed(Some(value))) {
			Primed(None) => Ok(()),
			Primed(Some(old_val)) => Err(old_val), // Channel was already full
			Linked(waker) => {
				waker.wake();
				Ok(())
			}
		}
	}

	/// Asynchronously waits for and returns a message from the channel.
	pub async fn interruption(&self) -> T {
		loop {
			// Check for a message first. If present, take it and return.
			if let Primed(Some(v)) = self.0.replace(Primed(None)) {
				break v;
			}
			// If no message, store our waker and suspend.
			self.0.set(Linked(waker().await));
			sleep().await;
		}
	}
}

// The Agent's state includes the communication channel.
struct Sleeper {
	pub link: Uplink<i32>,
}

// The Agent's behavior is to simply wait for a message.
impl Behavior for Sleeper {
	type Output = ();

	async fn actions(&self, _: &Sim<()>) -> Self::Output {
		info!("Order: {}", self.link.interruption().await);
	}
}

// An external task that creates and interrupts the Agent.
async fn operator(sim: &Sim) {
	let smith = pin!(Agent::new(Sleeper {
		link: Uplink::new()
	}));
	let puck = sim.activate(smith);

	sim.advance(1.0).await;
	puck.subject().link.interrupt(66).expect("awaiting orders");
	sim.advance(1.0).await;
}

fn main() {
	tracing_subscriber::fmt()
		.with_max_level(Level::INFO)
		.with_target(false)
		.with_timer(model_time!("[{time}]"))
		.init();

	Simulator::default().run(operator).unwrap();
}