1use core::{
24 cell::Cell,
25 fmt,
26 future::{Future, IntoFuture},
27 pin::Pin,
28 task::Waker,
29 task::{Context, Poll},
30};
31
32use intrusive_collections::UnsafeRef;
33
34use crate::{
35 Publisher, Subscriber,
36 chain::{Chain, Link},
37};
38
39pub mod imp;
40
/// An expression that can be evaluated on demand to produce a value.
///
/// Implemented in this file for [`Control`], [`ControlExpr`], [`Predicate`],
/// the primitive types, shared references and `UnsafeRef`.
pub trait Expr {
    /// The type of value produced by evaluating the expression.
    type Output;

    /// Evaluates the expression and returns its current value.
    fn get(&self) -> Self::Output;
}
51
/// A `Cell`-backed value whose notifying accessors (`set`, `swap`,
/// `replace`, `update`, `take`) inform the predicates subscribed to it.
#[derive(Default)]
pub struct Control<T: ?Sized> {
    /// Chain of subscribed predicates; walked by the `Subscriber::notify`
    /// implementation for `Control`.
    queue: Chain<UnsafeRef<Predicate>>,
    /// The stored value. Must remain the last field because `T` may be unsized.
    value: Cell<T>,
}
69
70impl<T> Control<T> {
71 #[inline]
73 pub fn new(value: T) -> Self {
74 Self {
75 value: Cell::new(value),
76 queue: Chain::new(),
77 }
78 }
79
80 pub fn set(&self, val: T) {
84 self.value.set(val);
85 self.notify();
86 }
87
88 #[inline]
90 pub fn get(&self) -> T
91 where
92 T: Copy,
93 {
94 self.value.get()
95 }
96
97 pub fn swap(&self, other: &Self) {
101 self.value.swap(&other.value);
102 self.notify();
103 }
104
105 pub fn replace(&self, val: T) -> T {
110 let res = self.value.replace(val);
111 self.notify();
112 res
113 }
114
115 #[inline]
117 pub fn update<F>(&self, f: F)
118 where
119 F: FnOnce(T) -> T,
120 T: Copy,
121 {
122 self.set(f(self.get()))
123 }
124
125 #[inline]
128 pub fn take(&self) -> T
129 where
130 T: Default,
131 {
132 self.replace(T::default())
133 }
134
135 #[inline]
140 pub fn get_mut(&mut self) -> &mut T {
141 self.value.get_mut()
142 }
143
144 #[inline]
152 pub fn unchecked_set(&self, val: T) {
153 self.value.set(val);
154 }
155}
156
157impl<T: ?Sized> Publisher for Control<T> {
158 type Link = Link<UnsafeRef<Predicate>>;
159
160 #[inline]
161 unsafe fn subscribe(&self, link: Pin<&Self::Link>) {
162 unsafe {
163 self.queue.subscribe(link);
164 }
165 }
166
167 #[inline]
168 unsafe fn unsubscribe(&self, link: Pin<&Self::Link>) {
169 unsafe {
170 self.queue.unsubscribe(link);
171 }
172 }
173}
174
175impl<T: ?Sized> Subscriber for Control<T> {
176 #[inline]
177 fn notify(&self) {
178 self.queue.notify_all().filter(Expr::get).go();
180 }
181}
182
183impl<T: Copy> Expr for Control<T> {
184 type Output = T;
185
186 #[inline]
187 fn get(&self) -> T {
188 self.value.get()
189 }
190}
191
192impl<T: fmt::Debug> fmt::Debug for Control<T> {
193 #[inline]
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 f.debug_tuple("Control")
196 .field(unsafe { &*self.value.as_ptr() })
197 .finish()
198 }
199}
200
201impl<T: fmt::Display> fmt::Display for Control<T> {
202 #[inline]
203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 unsafe { &*self.value.as_ptr() }.fmt(f)
205 }
206}
207
/// Pairs a dependency of type `C` with a closure `F` that is evaluated
/// against it.
///
/// Subscriptions are forwarded to `depends` (see the `Publisher`
/// implementation), while `Expr::get` calls `closure` with a reference to
/// `depends`.
#[derive(Copy, Clone)]
pub struct ControlExpr<C, F> {
    /// The value the closure reads from; also the target of subscriptions.
    depends: C,
    /// The closure computing the expression's value from `&depends`.
    closure: F,
}
220
221impl<C, F> ControlExpr<C, F>
222where
223 C: Publisher,
224 F: Fn(&C) -> bool,
225{
226 #[inline]
229 pub const fn new(ctrl: C, code: F) -> Self {
230 ControlExpr {
231 depends: ctrl,
232 closure: code,
233 }
234 }
235}
236
237impl<C, F> Publisher for ControlExpr<C, F>
238where
239 C: Publisher,
240{
241 type Link = C::Link;
242
243 #[inline]
244 unsafe fn subscribe(&self, link: Pin<&Self::Link>) {
245 unsafe {
246 self.depends.subscribe(link);
247 }
248 }
249
250 #[inline]
251 unsafe fn unsubscribe(&self, link: Pin<&Self::Link>) {
252 unsafe {
253 self.depends.unsubscribe(link);
254 }
255 }
256}
257
258impl<C, F, R> Expr for ControlExpr<C, F>
259where
260 F: Fn(&C) -> R,
261{
262 type Output = R;
263
264 fn get(&self) -> R {
265 (self.closure)(&self.depends)
266 }
267}
268
269impl<C, F> IntoFuture for ControlExpr<C, F>
270where
271 C: Publisher,
272 F: Fn(&C) -> bool,
273 C::Link: From<UnsafeRef<Predicate>>,
274{
275 type Output = ();
276 type IntoFuture = Until<C, F>;
277
278 #[inline]
279 fn into_future(self) -> Self::IntoFuture {
280 Until::new(self)
281 }
282}
283
/// Future produced by `ControlExpr::into_future`; it resolves to `()` once
/// the wrapped condition evaluates to `true`.
#[pin_project::pin_project(PinnedDrop)]
pub struct Until<C: Publisher, F> {
    /// The condition together with the waker slot. Pinned because `poll`
    /// stores a reference to it inside `link`.
    #[pin]
    poll: Predicate<ControlExpr<C, F>>,
    /// The link subscribed to the dependency; `None` until the first poll
    /// that returns `Poll::Pending`.
    #[pin]
    link: Option<C::Link>,
}
306
307impl<C: Publisher, F> Until<C, F> {
308 #[inline]
311 pub const fn new(cond: ControlExpr<C, F>) -> Self {
312 Until {
313 poll: Predicate {
314 note: Cell::new(None),
315 cond,
316 },
317 link: None,
318 }
319 }
320}
321
impl<C, F> Future for Until<C, F>
where
    C: Publisher,
    F: Fn(&C) -> bool,
    C::Link: From<UnsafeRef<Predicate>>,
{
    type Output = ();

    /// Completes with `()` as soon as the condition evaluates to `true`.
    ///
    /// Otherwise the current waker is stored, a link referring to the
    /// pinned predicate is (re-)created and subscribed to the dependency,
    /// and `Poll::Pending` is returned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Fast path: the condition already holds, nothing to register.
        if self.poll.get() {
            return Poll::Ready(());
        }

        let mut projection = self.project();

        // Remember the most recent waker; `Predicate::notify` takes it out
        // of the cell and wakes it.
        projection.poll.note.set(Some(cx.waker().clone()));

        // A link left over from an earlier poll has to be unsubscribed
        // before it is overwritten (and thereby dropped) below.
        if let Some(link) = projection.link.as_ref().as_pin_ref() {
            // SAFETY (NOTE(review)): `link` is the value created by a
            // previous call of this function and has been subscribed to this
            // very `cond`; confirm against the `Publisher::unsubscribe`
            // contract.
            unsafe {
                projection.poll.cond.unsubscribe(link);
            }
        }

        projection.link.set(Some(
            // SAFETY (NOTE(review)): the transmute only changes the trait
            // object's lifetime bound to `'static`. The resulting `UnsafeRef`
            // points into `self.poll`, which is pinned; the `PinnedDrop`
            // implementation unsubscribes the link before the predicate goes
            // away. Confirm that the publisher keeps no reference after
            // `unsubscribe`.
            unsafe {
                UnsafeRef::from_raw(&*core::mem::transmute::<
                    Pin<&Predicate<dyn Expr<Output = bool> + '_>>,
                    Pin<&Predicate<dyn Expr<Output = bool> + 'static>>,
                >(projection.poll.as_ref()))
            }
            .into(),
        ));

        // SAFETY: `link` was set to `Some(..)` by the statement directly
        // above, so `unwrap_unchecked` cannot observe `None`. The `subscribe`
        // obligations are assumed to be met by the pinned link; TODO confirm
        // against the `Publisher::subscribe` contract.
        unsafe {
            projection
                .poll
                .cond
                .subscribe(projection.link.as_ref().as_pin_ref().unwrap_unchecked());
        }

        Poll::Pending
    }
}
377
// Unsubscribes the link created by a pending `poll`, if there is one, so
// that the dependency is not left with a subscription referring to this
// future after it is dropped.
#[pin_project::pinned_drop]
impl<C: Publisher, F> PinnedDrop for Until<C, F> {
    #[inline]
    fn drop(self: Pin<&mut Self>) {
        // `link` is `None` when the future was never polled to `Pending`.
        if let Some(link) = self.as_ref().project_ref().link.as_pin_ref() {
            // SAFETY (NOTE(review)): `link` was created by `poll` and
            // subscribed to this very `cond`; confirm against the
            // `Publisher::unsubscribe` contract.
            unsafe { self.poll.cond.unsubscribe(link) }
        }
    }
}
388
/// A condition bundled with the waker that should be woken on notification.
///
/// The type parameter defaults to the trait object
/// `dyn Expr<Output = bool>`, which is the erased form referenced by the
/// `UnsafeRef<Predicate>` links used throughout this file.
pub struct Predicate<C: ?Sized = dyn Expr<Output = bool>> {
    /// Waker stored by the most recent pending poll; taken (and thereby
    /// cleared) by `notify`.
    note: Cell<Option<Waker>>,
    /// The condition evaluated by `Expr::get`. Must remain the last field
    /// because `C` may be unsized.
    cond: C,
}
396
397impl<C: Expr<Output = bool> + ?Sized> Expr for Predicate<C> {
398 type Output = bool;
399
400 #[inline]
401 fn get(&self) -> bool {
402 self.cond.get()
403 }
404}
405
406impl<C: ?Sized> Subscriber for Predicate<C> {
407 #[inline]
408 fn notify(&self) {
409 if let Some(waker) = self.note.take() {
411 waker.wake();
412 }
413 }
414}
415
416macro_rules! impl_primitive_expr {
420 ($($L:ty),*) => {$(
421 impl Expr for $L {
422 type Output = $L;
423
424 #[inline(always)]
425 fn get(&self) -> $L { *self }
426 }
427 )*};
428}
429
430impl_primitive_expr!(
431 (),
432 bool,
433 char,
434 f32,
435 f64,
436 u8,
437 u16,
438 u32,
439 u64,
440 u128,
441 usize,
442 i8,
443 i16,
444 i32,
445 i64,
446 i128,
447 isize
448);
449
450impl<T: ?Sized + Expr> Expr for &'_ T {
451 type Output = T::Output;
452
453 #[inline(always)]
454 fn get(&self) -> Self::Output {
455 (**self).get()
456 }
457}
458
459impl<T: ?Sized + Expr> Expr for UnsafeRef<T> {
460 type Output = T::Output;
461
462 #[inline(always)]
463 fn get(&self) -> Self::Output {
464 (**self).get()
465 }
466}
467
// Tests that drive `until!(..)` expressions inside a simulation and check
// the model time at which the awaiting process resumes.
#[cfg(test)]
mod tests {
    use super::{Control, imp::*};
    use crate::{Publisher, until};
    use core::pin::pin;
    use odem_rs_core::{
        job::Job,
        simulator::{Sim, Simulator},
    };
    use std::boxed::Box;

    // Wrapper type with a derived `Publisher` that forwards to the wrapped
    // control (field marked `#[subscribe]`).
    #[derive(Publisher)]
    struct Foo(#[subscribe] Control<bool>);

    impl Foo {
        // Returns `true` while the wrapped control holds `false`.
        fn is_false(&self) -> bool {
            !self.0.get()
        }
    }

    // A constant `true` condition must complete without advancing model time.
    #[test]
    fn simple_control_expr() {
        async fn sim_main(sim: &Sim) -> f64 {
            until!(true).await;
            sim.now()
        }

        let time = Simulator::default()
            .run(sim_main)
            .expect("no errors during execution");

        assert_eq!(time, 0.0);
    }

    // Mixes a plain bool, a method call on a derived publisher, a control,
    // a boxed control and a reference to the boxed control in one condition.
    // `t2.is_false()` is `true` from the start, so the wait ends at time 0.
    #[test]
    fn complex_control_expr() {
        async fn sim_main(sim: &Sim) -> f64 {
            let t1 = false;
            let t2 = Foo(Control::new(false));
            let t3 = Control::new(false);
            let t4 = Box::new(Control::new(false));
            let t5 = &t4;

            until!(t1 || t2.is_false() || t3 || t4 || t5).await;
            sim.now()
        }

        let time = Simulator::default()
            .run(sim_main)
            .expect("no errors during execution");
        assert_eq!(time, 0.0);
    }

    // A second job sets the control to `true` after advancing the clock by
    // 1.0; the waiting process is expected to resume at exactly that time.
    #[test]
    fn simple_scenario() {
        async fn sim_main(sim: &Sim) -> f64 {
            let t1 = Control::new(false);
            let job = pin!(Job::new(async {
                sim.advance(1.0).await;
                t1.set(true);
            }));

            sim.activate(job);
            until!(t1).await;
            sim.now()
        }

        let time = Simulator::default()
            .run(sim_main)
            .expect("no errors during execution");
        assert_eq!(time, 1.0);
    }

    // Every 1.0 time units the job increments `t1` and toggles `t2`.
    // `t1 > 5` first holds at time 6, where `t2` is `false` (toggled an even
    // number of times); both hold at time 7 (`t1 == 7`, `t2 == true`).
    #[test]
    fn complex_scenario() {
        async fn sim_main(sim: &Sim) -> f64 {
            let t1 = Control::new(0);
            let t2 = Control::new(false);

            let job = pin!(Job::new(async {
                loop {
                    sim.advance(1.0).await;
                    t1.update(|inner| inner + 1);
                    t2.update(|inner| !inner);
                }
            }));

            sim.activate(job);
            until!(t1 > 5 && t2).await;
            sim.now()
        }

        let time = Simulator::default()
            .run(sim_main)
            .expect("no errors during execution");
        assert_eq!(time, 7.0);
    }
}