odem_rs_sync/
control.rs

1//! This module contains support for awaiting arbitrary state events in the form
2//! of control expressions.
3//!
4//! Control expressions are types that implement the [Publisher] and [Expr]
5//! trait, which allows them to [calculate] a value and [notify] dependent
6//! continuations of changes in their value.
7//! Once a continuation has subscribed, it is automatically activated by the control
8//! expression whenever its internal state changes.
9//!
10//! This is used for implementing a mechanism that allows processes to await
11//! specific conditions regarding the system state, which is made
12//! ergonomic through the [until!]-macro.
13//!
14//! The simplest control expressions are [Control]-variables that capture a
15//! single, [copyable] value and automatically notify subscribed continuations when it
16//! changes.
17//!
18//! [calculate]: Expr::get
19//! [notify]: Subscriber::notify
20//! [until!]: crate::until
21//! [copyable]: Copy
22
23use core::{
24	cell::Cell,
25	fmt,
26	future::{Future, IntoFuture},
27	pin::Pin,
28	task::Waker,
29	task::{Context, Poll},
30};
31
32use intrusive_collections::UnsafeRef;
33
34use crate::{
35	Publisher, Subscriber,
36	chain::{Chain, Link},
37};
38
39pub mod imp;
40
41/* ******************************************************* Control Expression */
42
43/// An object-safe trait for repeatedly evaluating a complex expression.
44pub trait Expr {
45	/// The result type.
46	type Output;
47
48	/// The routine used for evaluation.
49	fn get(&self) -> Self::Output;
50}
51
52/* ********************************************************* Control Variable */
53
54/// Marks a variable as a state variable which can be used in conjunction with
55/// the [until!]-macro.
56///
57/// Whenever a control variable changes its value, the runtime system checks
58/// whether any other parts of the model are currently waiting for the variable
59/// to attain a certain value.
60///
61/// [until!]: crate::until!
62#[derive(Default)]
63pub struct Control<T: ?Sized> {
64	/// A [Chain] of waiting continuations.
65	queue: Chain<UnsafeRef<Predicate>>,
66	/// The inner value.
67	value: Cell<T>,
68}
69
70impl<T> Control<T> {
71	/// Creates a new control variable and initializes its value.
72	#[inline]
73	pub fn new(value: T) -> Self {
74		Self {
75			value: Cell::new(value),
76			queue: Chain::new(),
77		}
78	}
79
80	/// Assigns a new value to the control variable.
81	///
82	/// This can lead to potential activations of waiting processes.
83	pub fn set(&self, val: T) {
84		self.value.set(val);
85		self.notify();
86	}
87
88	/// Returns a copy of the stored inner value.
89	#[inline]
90	pub fn get(&self) -> T
91	where
92		T: Copy,
93	{
94		self.value.get()
95	}
96
97	/// Swaps the values of two control variables. The difference to
98	/// [`core::mem::swap`] is that this function doesn't require an exclusive
99	/// reference.
100	pub fn swap(&self, other: &Self) {
101		self.value.swap(&other.value);
102		self.notify();
103	}
104
105	/// Replaces the current value of the control variable with a new one and
106	/// returns it.
107	///
108	/// This can lead to potential activations of waiting processes.
109	pub fn replace(&self, val: T) -> T {
110		let res = self.value.replace(val);
111		self.notify();
112		res
113	}
114
115	/// Updates the contained value using a function and returns the new value.
116	#[inline]
117	pub fn update<F>(&self, f: F)
118	where
119		F: FnOnce(T) -> T,
120		T: Copy,
121	{
122		self.set(f(self.get()))
123	}
124
125	/// Takes the value of the control variable, leaving `Default::default()` in
126	/// its place.
127	#[inline]
128	pub fn take(&self) -> T
129	where
130		T: Default,
131	{
132		self.replace(T::default())
133	}
134
135	/// Returns a mutable reference to the underlying data.
136	///
137	/// This call borrows the control variable mutably (at compile-time) which
138	/// guarantees that we possess the only reference.
139	#[inline]
140	pub fn get_mut(&mut self) -> &mut T {
141		self.value.get_mut()
142	}
143
144	/// Assigns a new value to the control variable **without notifying pending
145	/// control conditions**.
146	///
147	/// Use this method if you want to change the inner value without scheduling
148	/// continuations that depend on it. Once finished, you may call [notify] directly.
149	///
150	/// [notify]: Self::notify
151	#[inline]
152	pub fn unchecked_set(&self, val: T) {
153		self.value.set(val);
154	}
155}
156
157impl<T: ?Sized> Publisher for Control<T> {
158	type Link = Link<UnsafeRef<Predicate>>;
159
160	#[inline]
161	unsafe fn subscribe(&self, link: Pin<&Self::Link>) {
162		unsafe {
163			self.queue.subscribe(link);
164		}
165	}
166
167	#[inline]
168	unsafe fn unsubscribe(&self, link: Pin<&Self::Link>) {
169		unsafe {
170			self.queue.unsubscribe(link);
171		}
172	}
173}
174
175impl<T: ?Sized> Subscriber for Control<T> {
176	#[inline]
177	fn notify(&self) {
178		// notify the set of waiting continuations with fulfilled conditions
179		self.queue.notify_all().filter(Expr::get).go();
180	}
181}
182
183impl<T: Copy> Expr for Control<T> {
184	type Output = T;
185
186	#[inline]
187	fn get(&self) -> T {
188		self.value.get()
189	}
190}
191
192impl<T: fmt::Debug> fmt::Debug for Control<T> {
193	#[inline]
194	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195		f.debug_tuple("Control")
196			.field(unsafe { &*self.value.as_ptr() })
197			.finish()
198	}
199}
200
201impl<T: fmt::Display> fmt::Display for Control<T> {
202	#[inline]
203	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204		unsafe { &*self.value.as_ptr() }.fmt(f)
205	}
206}
207
208/* ******************************************************* Control Expression */
209
210/// Container for complex control expressions that combines a set of [Control]
211/// variables with a closure that depends on the values of the control set.
212#[derive(Copy, Clone)]
213pub struct ControlExpr<C, F> {
214	/// The set of (potential) control variables that the closure depends on.
215	depends: C,
216	/// A function closure accepting the current values of the control variables
217	/// and calculating the result.
218	closure: F,
219}
220
221impl<C, F> ControlExpr<C, F>
222where
223	C: Publisher,
224	F: Fn(&C) -> bool,
225{
226	/// Creates a new control expression given a set of [Control] variables and
227	/// a closure that depends on them.
228	#[inline]
229	pub const fn new(ctrl: C, code: F) -> Self {
230		ControlExpr {
231			depends: ctrl,
232			closure: code,
233		}
234	}
235}
236
237impl<C, F> Publisher for ControlExpr<C, F>
238where
239	C: Publisher,
240{
241	type Link = C::Link;
242
243	#[inline]
244	unsafe fn subscribe(&self, link: Pin<&Self::Link>) {
245		unsafe {
246			self.depends.subscribe(link);
247		}
248	}
249
250	#[inline]
251	unsafe fn unsubscribe(&self, link: Pin<&Self::Link>) {
252		unsafe {
253			self.depends.unsubscribe(link);
254		}
255	}
256}
257
258impl<C, F, R> Expr for ControlExpr<C, F>
259where
260	F: Fn(&C) -> R,
261{
262	type Output = R;
263
264	fn get(&self) -> R {
265		(self.closure)(&self.depends)
266	}
267}
268
269impl<C, F> IntoFuture for ControlExpr<C, F>
270where
271	C: Publisher,
272	F: Fn(&C) -> bool,
273	C::Link: From<UnsafeRef<Predicate>>,
274{
275	type Output = ();
276	type IntoFuture = Until<C, F>;
277
278	#[inline]
279	fn into_future(self) -> Self::IntoFuture {
280		Until::new(self)
281	}
282}
283
284/* ************************************************************* Until Future */
285
286/// Notify dispatcher that includes the boolean condition being awaited.
287///
288/// The basic idea is to reduce the number of spurious reactivations of waiting
289/// continuations by checking if the condition being awaited evaluated to `true` for the
290/// specific state-change triggering the re-evaluation and not re-awakening the
291/// continuation if it doesn't.
292///
293/// Even if it does, the re-awakened continuation still needs to check the condition
294/// since its truth value may have changed between the scheduling and the
295/// activation, but this technique still has the potential to cut down on a lot
296/// of superfluous scheduling operations.
297#[pin_project::pin_project(PinnedDrop)]
298pub struct Until<C: Publisher, F> {
299	/// A predicate of the boolean condition.
300	#[pin]
301	poll: Predicate<ControlExpr<C, F>>,
302	/// Space for the link needed during polling.
303	#[pin]
304	link: Option<C::Link>,
305}
306
307impl<C: Publisher, F> Until<C, F> {
308	/// Creates a new future that blocks on being awaited until the passed
309	/// condition is fulfilled.
310	#[inline]
311	pub const fn new(cond: ControlExpr<C, F>) -> Self {
312		Until {
313			poll: Predicate {
314				note: Cell::new(None),
315				cond,
316			},
317			link: None,
318		}
319	}
320}
321
322impl<C, F> Future for Until<C, F>
323where
324	C: Publisher,
325	F: Fn(&C) -> bool,
326	C::Link: From<UnsafeRef<Predicate>>,
327{
328	type Output = ();
329
330	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
331		// test if the condition evaluates to true and terminate if it does
332		if self.poll.get() {
333			return Poll::Ready(());
334		}
335
336		let mut projection = self.project();
337
338		// clone the waker
339		projection.poll.note.set(Some(cx.waker().clone()));
340
341		// unsubscribe from the control set
342		// SAFETY: we will only unsubscribe if we have been subscribed before
343		// using the original link
344		if let Some(link) = projection.link.as_ref().as_pin_ref() {
345			unsafe {
346				projection.poll.cond.unsubscribe(link);
347			}
348		}
349
350		// fill the option with the link
351		// SAFETY: the lifetime-extension is justified because we are sure to
352		// unsubscribe during drop; drop happens because of pin
353		projection.link.set(Some(
354			unsafe {
355				UnsafeRef::from_raw(&*core::mem::transmute::<
356					Pin<&Predicate<dyn Expr<Output = bool> + '_>>,
357					Pin<&Predicate<dyn Expr<Output = bool> + 'static>>,
358				>(projection.poll.as_ref()))
359			}
360			.into(),
361		));
362
363		// subscribe to the control set
364		// SAFETY: pinning guarantees that the link's address is stable
365		// until this future drops, which will also unsubscribe the link
366		unsafe {
367			projection
368				.poll
369				.cond
370				.subscribe(projection.link.as_ref().as_pin_ref().unwrap_unchecked());
371		}
372
373		// await the next reactivation
374		Poll::Pending
375	}
376}
377
378#[pin_project::pinned_drop]
379impl<C: Publisher, F> PinnedDrop for Until<C, F> {
380	#[inline]
381	fn drop(self: Pin<&mut Self>) {
382		// unsubscribe from a control set
383		if let Some(link) = self.as_ref().project_ref().link.as_pin_ref() {
384			unsafe { self.poll.cond.unsubscribe(link) }
385		}
386	}
387}
388
389/// A boolean predicate with an attached [Waker] to notify.
390pub struct Predicate<C: ?Sized = dyn Expr<Output = bool>> {
391	/// The [Waker] to notify.
392	note: Cell<Option<Waker>>,
393	/// The boolean condition as a dynamically dispatched trait.
394	cond: C,
395}
396
397impl<C: Expr<Output = bool> + ?Sized> Expr for Predicate<C> {
398	type Output = bool;
399
400	#[inline]
401	fn get(&self) -> bool {
402		self.cond.get()
403	}
404}
405
406impl<C: ?Sized> Subscriber for Predicate<C> {
407	#[inline]
408	fn notify(&self) {
409		// only wake the process once
410		if let Some(waker) = self.note.take() {
411			waker.wake();
412		}
413	}
414}
415
416/* *************************************************************** Expr Impls */
417
418// implement Expr for primitive data types
419macro_rules! impl_primitive_expr {
420	($($L:ty),*) => {$(
421		impl Expr for $L {
422			type Output = $L;
423
424			#[inline(always)]
425			fn get(&self) -> $L { *self }
426		}
427	)*};
428}
429
430impl_primitive_expr!(
431	(),
432	bool,
433	char,
434	f32,
435	f64,
436	u8,
437	u16,
438	u32,
439	u64,
440	u128,
441	usize,
442	i8,
443	i16,
444	i32,
445	i64,
446	i128,
447	isize
448);
449
450impl<T: ?Sized + Expr> Expr for &'_ T {
451	type Output = T::Output;
452
453	#[inline(always)]
454	fn get(&self) -> Self::Output {
455		(**self).get()
456	}
457}
458
459impl<T: ?Sized + Expr> Expr for UnsafeRef<T> {
460	type Output = T::Output;
461
462	#[inline(always)]
463	fn get(&self) -> Self::Output {
464		(**self).get()
465	}
466}
467
468/* ******************************************************************* Macros */
469
470#[cfg(test)]
471mod tests {
472	use super::{Control, imp::*};
473	use crate::{Publisher, until};
474	use core::pin::pin;
475	use odem_rs_core::{
476		job::Job,
477		simulator::{Sim, Simulator},
478	};
479	use std::boxed::Box;
480
481	#[derive(Publisher)]
482	struct Foo(#[subscribe] Control<bool>);
483
484	impl Foo {
485		fn is_false(&self) -> bool {
486			!self.0.get()
487		}
488	}
489
490	#[test]
491	fn simple_control_expr() {
492		async fn sim_main(sim: &Sim) -> f64 {
493			until!(true).await;
494			sim.now()
495		}
496
497		let time = Simulator::default()
498			.run(sim_main)
499			.expect("no errors during execution");
500
501		assert_eq!(time, 0.0);
502	}
503
504	#[test]
505	fn complex_control_expr() {
506		async fn sim_main(sim: &Sim) -> f64 {
507			let t1 = false;
508			let t2 = Foo(Control::new(false));
509			let t3 = Control::new(false);
510			let t4 = Box::new(Control::new(false));
511			let t5 = &t4;
512
513			until!(t1 || t2.is_false() || t3 || t4 || t5).await;
514			sim.now()
515		}
516
517		let time = Simulator::default()
518			.run(sim_main)
519			.expect("no errors during execution");
520		assert_eq!(time, 0.0);
521	}
522
523	#[test]
524	fn simple_scenario() {
525		async fn sim_main(sim: &Sim) -> f64 {
526			let t1 = Control::new(false);
527			let job = pin!(Job::new(async {
528				sim.advance(1.0).await;
529				t1.set(true);
530			}));
531
532			sim.activate(job);
533			until!(t1).await;
534			sim.now()
535		}
536
537		let time = Simulator::default()
538			.run(sim_main)
539			.expect("no errors during execution");
540		assert_eq!(time, 1.0);
541	}
542
543	#[test]
544	fn complex_scenario() {
545		async fn sim_main(sim: &Sim) -> f64 {
546			let t1 = Control::new(0);
547			let t2 = Control::new(false);
548
549			let job = pin!(Job::new(async {
550				loop {
551					sim.advance(1.0).await;
552					t1.update(|inner| inner + 1);
553					t2.update(|inner| !inner);
554				}
555			}));
556
557			sim.activate(job);
558			until!(t1 > 5 && t2).await;
559			sim.now()
560		}
561
562		let time = Simulator::default()
563			.run(sim_main)
564			.expect("no errors during execution");
565		assert_eq!(time, 7.0);
566	}
567}