odem_rs_core/continuation/mod.rs
1//! The `continuation` module provides the implementation of a type-erased,
2//! intrusively linked futures abstraction used in the scheduler of the
3//! simulation library.
4//!
5//! Continuations are held by a calendar, allowing them to be scheduled and
6//! managed based on model time.
7//!
8//! The [`Continuation`] type represents a continuation that can be scheduled
9//! and executed within the simulation library. It is designed to handle
10//! type-erased futures and supports branding to tie [token witnesses] to their
11//! runtime-state, enabling statically checked validity of state transitions.
12//!
13//! [token witnesses]: token::State
14
15use core::{
16 any::Any,
17 cell::Cell,
18 fmt,
19 hint::unreachable_unchecked,
20 panic::Location,
21 pin::Pin,
22 ptr::NonNull,
23 sync::atomic::{AtomicUsize, Ordering},
24 task::{Context, Poll},
25};
26use intrusive_collections::{LinkedList, RBTreeLink};
27
28pub use adapter::{Adapter, PointerOps};
29pub use puck::Puck;
30pub use share::{Label, Share};
31
32use crate::{
33 Dispatch, ExitStatus,
34 calendar::{PlanState, Scheduler},
35 config::Config,
36 error::NotIdle,
37 fsm::*,
38 ptr::{IntrusivelyCounted, Irc, IrcBox, IrcBoxed},
39 simulator::{Mark, Prec},
40};
41
42mod adapter;
43mod puck;
44mod share;
45
46/* ********************************************************************* Continuation */
47
48/// The type of intrusive link stored in every [Continuation].
49pub type Link = RBTreeLink;
50
51/// A structure of type-erased, intrusively linked [futures].
52///
53/// Pointers to instances of this type are used in the scheduler of the
54/// simulation library. They are oblivious to lifetime-restrictions and
55/// erase the concrete types of the future and the process instance.
56/// The latter types are recovered during dynamic lookup, but the
57/// lifetime-restrictions have to be observed by the abstractions provided
58/// in the simulation library.
59///
60/// Continuations may be *branded* using an additional lifetime that ties
61/// [Token] to specific instances. This allows composition and chaining of
62/// method calls that depend on specific [States] without having to
63/// continuously test the current states within the method implementations.
64///
65/// [futures]: Future
66/// [Token]: token
67/// [States]: State
68pub struct Continuation<'brand, C: ?Sized + Config> {
69 /// An intrusive link for insertion of a continuation into the calendar.
70 hook: Link,
71 /// Intrusive reference counter and raw pointer to the most specialized
72 /// version of the type containing this continuation.
73 ///
74 /// This is needed to provide run-time polymorphism over the type
75 /// of future being executed as well as what code is run on dropping. See
76 /// [Dispatch] for further information.
77 task_box: IrcBox<ContBox>,
78 /// Pointer to data shared across different continuations.
79 share: NonNull<Share<C>>,
80 /// A list of continuations waiting for this continuation to terminate.
81 pending: Cell<LinkedList<Adapter<C>>>,
82 /// Precedence of this continuation relative to other continuations from the
83 /// same agent.
84 prec: Cell<Prec>,
85 /// Current state of this continuation instance.
86 state: StateMachine<'brand, State<C>>,
87 /// Span information for the current continuation.
88 #[cfg(feature = "tracing")]
89 span: tracing::Span,
90}
91
92impl<C: ?Sized + Config> Continuation<'static, C> {
93 /// Creates a new continuation initialized in [State::Born] with specific
94 /// location information attached.
95 ///
96 /// [State::Born]: erased::State::Born
97 pub(crate) fn new(prec: Prec, location: &'static Location<'static>) -> Self {
98 Self {
99 hook: Link::new(),
100 task_box: IrcBox::with_location(ContBox::new(), location),
101 share: NonNull::dangling(),
102 pending: Cell::new(LinkedList::new(Adapter::NEW)),
103 prec: Cell::new(prec),
104 state: StateMachine::default(),
105 #[cfg(feature = "tracing")]
106 span: tracing::Span::current(),
107 }
108 }
109
110 /// Returns the exit code of the continuation, or `None` if it hasn't
111 /// terminated.
112 pub(crate) fn result(&self) -> Option<ExitStatus> {
113 self.brand(|task, once| {
114 task.token(once)
115 .into_term()
116 .map(|state| task.branded_result(&state))
117 .ok()
118 })
119 }
120
121 /// Reactivates the continuation at the current model-time.
122 pub(crate) fn wake(this: Irc<Self>) -> Result<(), NotIdle> {
123 this.clone().brand(|task, once| {
124 let idle = task.token(once).into_idle()?;
125
126 task.branded_share(&idle)
127 .sim()
128 .calendar()
129 .activate(task.clone(), idle);
130
131 Ok(())
132 })
133 }
134
135 /// Returns the next model time this continuation is scheduled for, or
136 /// `None` if it isn't scheduled.
137 pub(crate) fn time(&self) -> Option<C::Time> {
138 self.brand(|task, once| {
139 task.token(once)
140 .into_next()
141 .ok()
142 .map(|next| task.next_time(&next))
143 })
144 }
145}
146
147impl<'brand, C: ?Sized + Config> Continuation<'brand, C> {
148 /// Enters the [`Span`] associated with the agent owning the shared data.
149 ///
150 /// [`Span`]: tracing::Span
151 #[cfg(feature = "tracing")]
152 pub fn enter_span(&self) -> tracing::span::Entered<'_> {
153 self.span.enter()
154 }
155
156 #[cfg(not(feature = "tracing"))]
157 pub const fn enter_span(&self) {}
158
159 /// Sets a new virtual function pointer for this continuation.
160 ///
161 /// # Safety
162 /// The caller is responsible to ensure that the pointer stays valid for
163 /// the duration of the continuation's life.
164 pub(crate) unsafe fn set_vptr(&self, vptr: NonNull<dyn Dispatch + '_>) {
165 unsafe {
166 self.task_box.set_vptr(vptr);
167 }
168 }
169
170 /// Clears the virtual function pointer for this continuation, preventing any
171 /// methods from being called.
172 ///
173 /// This can be used to prevent [`Dispatch::reclaim`] from being called,
174 /// even if the reference counter reaches zero.
175 pub(crate) fn clear_vptr(&self) {
176 self.task_box.clear_vptr();
177 }
178
179 /// Returns the number of references to this continuation.
180 pub(crate) fn use_count(&self) -> usize {
181 self.task_box.refs.load(Ordering::Relaxed)
182 }
183
184 /// Returns a copy of the internal [State](erased::State).
185 pub(crate) fn state(&self) -> &StateMachine<'brand, State<C>> {
186 &self.state
187 }
188
189 /// Returns the current [Prec] of this continuation.
190 pub(crate) fn prec(&self) -> Prec {
191 self.prec.get()
192 }
193
194 /// Sets the new [Prec] of this continuation.
195 pub(crate) fn set_prec(&self, prec: Prec) {
196 self.prec.set(prec);
197 }
198
199 /// Adds another continuation to the list of continuations to be awoken upon terminating.
200 #[inline]
201 pub(crate) fn insert_pending(&self, other: Irc<Continuation<'static, C>>) {
202 let mut list = self.pending.take();
203 list.push_back(other);
204 self.pending.set(list);
205 }
206
207 /// Removes a previously added continuation from the list of pending continuations.
208 ///
209 /// # Safety
210 /// It is the caller's responsibility to ensure that the continuation had been
211 /// added previously via [Self::insert_pending].
212 pub(crate) unsafe fn remove_pending(&self, other: &Continuation<'static, C>) {
213 // Only unlink if the other continuation is actually linked right now.
214 // This can happen during panics, when the `drop` impl of a `Join`
215 // attempts to remove a stored continuation from the pending list, but
216 // it has already been removed by a prior reactivation.
217 if other.hook.is_linked() {
218 let mut list = self.pending.take();
219
220 unsafe {
221 list.cursor_mut_from_ptr(other.detach()).remove();
222 }
223
224 self.pending.set(list);
225 }
226 }
227
228 /// Awakens all pending continuations and clears the list.
229 pub(crate) fn wake_pending(&self) {
230 for task in self.pending.take() {
231 Continuation::wake(task).ok();
232 }
233 }
234
235 /// Converts the specific brand into a generic brand, breaking the
236 /// connection with the equally branded token.
237 pub(crate) fn detach(&self) -> &Continuation<'static, C> {
238 unsafe { core::mem::transmute(self) }
239 }
240
241 /// Returns the [`Location`] information for this `Continuation`.
242 pub(crate) fn location(&self) -> &'static Location<'static> {
243 IrcBox::location(&self.task_box)
244 }
245
246 /// Performs a runtime-check if this continuation has been dereferenced on the
247 /// same thread as the one the executor is running and panicks if that is
248 /// not the case.
249 ///
250 /// # Safety
251 /// This method can only be called after the `Continuation` has been
252 /// activated. Calling it is thread-safe.
253 unsafe fn is_same_thread(&self) -> bool {
254 // Extract the shared data from the task.
255 let share = unsafe { self.share.as_ref() };
256
257 // Compare the pointer-address of this continuation's simulation context
258 // to the pointer-address of the thread-local simulation context.
259 crate::erased::with(|sim| core::ptr::addr_eq(&**share.sim(), sim)).unwrap_or(false)
260 }
261
262 /// Returns an enumeration copy of the internal [State](token::State).
263 pub(crate) fn token(&self, once: Ephemeral<'brand>) -> token::State<'brand> {
264 self.state.token(once)
265 }
266
267 /// Binds a continuation to [shared data].
268 ///
269 /// # Safety
270 /// The caller is responsible to ensure that the shared-data-reference
271 /// outlives the (active) part of the continuation's life.
272 ///
273 /// [shared data]: Share
274 pub(crate) unsafe fn bind(
275 mut self: Pin<&mut Self>,
276 born: token::Born<'brand>,
277 share: &Share<C>,
278 ) -> token::Idle<'brand> {
279 // assign the reference to the shared data
280 self.share = NonNull::from(share);
281
282 // enter our `Span` to properly record the transition
283 let _span = self.enter_span();
284
285 // transition into state `Idle`
286 self.state.transition(born, ())
287 }
288
289 /// Purges the continuation from the calendar.
290 pub(crate) fn deschedule(&self, next: token::Next<'brand>) -> token::Idle<'brand> {
291 let share = self.branded_share(&next);
292
293 // remove the continuation from the calendar
294 share.sim().calendar().remove(self, next)
295 }
296
297 /// Removes the active state from the continuation.
298 pub(crate) fn deactivate(&self, busy: token::Busy<'brand>) -> token::Idle<'brand> {
299 let share = self.branded_share(&busy);
300
301 // deregister the continuation from the active cell
302 share.sim().unslot(self, busy)
303 }
304
305 /// Polls the underlying future of this continuation.
306 pub(crate) fn poll(&self, busy: token::Busy<'brand>, cx: &mut Context<'_>) -> Poll<()> {
307 // read the virtual function from the table
308
309 // SAFETY: the busy-token testifies that the continuation has been
310 // bound, which ensures that the vptr to the virtual function table has
311 // been set; pinning is ensured by the binding routine requiring it
312 let vtab = unsafe { Pin::new_unchecked(self.task_box.vptr.get().unwrap().as_ref()) };
313
314 // temporarily escape the branding to allow `Future::poll()` to rebrand
315 // without accidentally creating two tokens for the same instance
316 let (once, res) = self.state.debrand(busy, move |_| vtab.poll(cx));
317
318 // analyze the resulting state
319 match self.token(once).into_busy() {
320 Ok(busy) => match res {
321 Poll::Ready(result) => {
322 // busy -> done
323 let _: token::Done<'_> = self.state.transition(busy, result);
324
325 // reactivate pending continuations on completion
326 self.wake_pending();
327
328 Poll::Ready(())
329 }
330 Poll::Pending => {
331 // busy -> idle
332 let _: token::Idle<'_> = self.state.transition(busy, ());
333 Poll::Pending
334 }
335 },
336 Err(_err) => {
337 debug_assert!(
338 res.is_pending(),
339 "task in state `{:?}` should not have been able to terminate",
340 _err.0
341 );
342 Poll::Pending
343 }
344 }
345 }
346
347 /// Returns a reference to the shared data for this continuation.
348 pub(crate) fn branded_share<'s, I>(&'s self, init: &I) -> &'s Share<C>
349 where
350 I: Into<token::Init<'brand>>,
351 {
352 let _ = init;
353
354 // SAFETY: the token witness testifies that the shared data is
355 // initialized, which only happens during binding
356 unsafe { self.share.as_ref() }
357 }
358
359 /// Returns a reference to the shared data if initialization of this continuation
360 /// has been completed.
361 pub(crate) fn share(&self) -> Option<&Share<C>> {
362 if self.state().erased().is_init() {
363 // SAFETY: once bound, the pointer stays valid
364 Some(unsafe { self.share.as_ref() })
365 } else {
366 None
367 }
368 }
369
370 /// Grants access to the calendar state if in state [`Next`](State::Next).
371 pub(crate) fn next_state<F, R>(&self, next: &token::Next<'brand>, f: F) -> R
372 where
373 F: FnOnce(&<C::Plan as Scheduler>::State) -> R,
374 {
375 let _ = next;
376
377 // SAFETY: the `Next` token guarantees that this continuation is in the
378 // correct state
379 match &*self.state.borrow() {
380 State::Next(state) => f(state),
381 _ => unsafe { unreachable_unchecked() },
382 }
383 }
384
385 /// Returns the model time that the continuation will be activated.
386 pub(crate) fn next_time(&self, next: &token::Next<'brand>) -> C::Time {
387 self.next_state(next, |s| s.time())
388 }
389
390 /// Returns the [Cell] containing the current mark of the shared data
391 /// associated with this continuation.
392 ///
393 /// This value is used to organize different continuations with identical shared
394 /// data in the calendar such that they are executed in a contiguous batch.
395 pub(crate) fn mark<'s, I>(&'s self, init: &I) -> &'s Cell<Mark>
396 where
397 I: Into<token::Init<'brand>>,
398 {
399 self.branded_share(init).mark()
400 }
401
402 /// Returns the exit code of this continuation.
403 pub(crate) fn branded_result<T>(&self, _: &T) -> ExitStatus
404 where
405 T: Into<token::Term<'brand>>,
406 {
407 // SAFETY: the continuation is in state `Done` or `Gone` per the token witness
408 match &*self.state.borrow() {
409 State::Done(rc) | State::Gone(rc) => *rc,
410 _ => unsafe { unreachable_unchecked() },
411 }
412 }
413}
414
415// Continuations offer unique methods for Branded variants
416impl<'b, C: ?Sized + Config> Stateful for Continuation<'b, C> {
417 type Brand = &'b ();
418
419 unsafe fn enter(&self) {
420 unsafe {
421 self.state.enter();
422 }
423 }
424
425 unsafe fn leave(&self) {
426 unsafe {
427 self.state.leave();
428 }
429 }
430}
431
432impl<'b, C: ?Sized + Config> Rebrand<'b> for Continuation<'b, C> {
433 type Kind<'a> = Continuation<'a, C>;
434}
435
436// Continuations only contain pointers to pinned data and are not pinned themselves
437impl<C: ?Sized + Config> Unpin for Continuation<'_, C> {}
438
439impl<C: ?Sized + Config> fmt::Debug for Continuation<'_, C> {
440 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441 let mut s = f.debug_struct("Continuation");
442
443 s.field("state", &self.state.borrow());
444
445 self.brand(|task, once| {
446 struct PrettyName(Label);
447
448 impl fmt::Debug for PrettyName {
449 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450 write!(f, "\"{}\"", &self.0)
451 }
452 }
453
454 let state = task.token(once);
455 let init = state.as_init()?;
456
457 s.field("label", &PrettyName(task.branded_share(init).label()))
458 .field("rank", &task.branded_share(init).rank());
459
460 None::<()>
461 });
462
463 s.field("prec", &self.prec().float_range())
464 .field("refs", &self.use_count())
465 .finish()
466 }
467}
468
469impl<C: Config> crate::erased::Continuation for Continuation<'static, C> {
470 fn subject(&self) -> &dyn Any {
471 self.share().map_or(&(), |shared| shared.subject())
472 }
473
474 fn label(&self) -> Label {
475 self.share()
476 .map_or(Label::default(), |shared| shared.label())
477 }
478
479 fn prec(&self) -> Prec {
480 self.prec()
481 }
482
483 fn state(&self) -> erased::State {
484 self.state().borrow().erased()
485 }
486}
487
488unsafe impl<C: ?Sized + Config> IntrusivelyCounted for Continuation<'_, C> {
489 fn irc_box(&self) -> &IrcBox<dyn IrcBoxed> {
490 &self.task_box
491 }
492}
493
494/* ************************************************************** Irc Support */
495
496/// Container type used to support [intrusive reference counting](Irc).
497pub struct ContBox {
498 /// Contains a counter for the number of references to the outer
499 /// [`Continuation`].
500 refs: AtomicUsize,
501 /// Pointer to the most-specialized version of the instance.
502 /// Used to recover type information during dynamic dispatch.
503 vptr: Cell<Option<NonNull<dyn Dispatch>>>,
504}
505
506impl ContBox {
507 /// Creates a new irc box from a location with zero references and no
508 /// dispatcher.
509 const fn new() -> Self {
510 ContBox {
511 refs: AtomicUsize::new(0),
512 vptr: Cell::new(None),
513 }
514 }
515
516 /// Sets the pointer to the vtable.
517 ///
518 /// # Safety
519 /// It is the callers' responsibility to ensure that the `vptr` pointer to
520 /// the `dyn Dispatch` object outlives the continuation box.
521 unsafe fn set_vptr(&self, vptr: NonNull<dyn Dispatch + '_>) {
522 unsafe {
523 use core::mem::transmute;
524
525 self.vptr.set(Some(transmute::<
526 NonNull<dyn Dispatch + '_>,
527 NonNull<dyn Dispatch + 'static>,
528 >(vptr)));
529 }
530 }
531
532 /// Clears the pointer to the vtable.
533 ///
534 /// This can be useful to prevent [`IrcBoxed::release`] from returning
535 /// a destructor function in case the reference counter reaches zero.
536 fn clear_vptr(&self) {
537 self.vptr.set(None);
538 }
539}
540
541unsafe impl IrcBoxed for ContBox {
542 fn ref_count(&self) -> usize {
543 self.refs.load(Ordering::SeqCst)
544 }
545
546 fn acquire(&self, _: crate::ptr::Private) {
547 self.refs.fetch_add(1, Ordering::SeqCst);
548 }
549
550 fn release(&self, _: crate::ptr::Private) {
551 self.refs.fetch_sub(1, Ordering::SeqCst);
552 }
553
554 fn reclaim(&self, _: crate::ptr::Private) -> Option<fn(NonNull<dyn IrcBoxed>)> {
555 self.vptr.get().is_some().then_some(
556 // return a function reclaiming the outer type if no references point
557 // to the Continuation anymore; this separation is necessary to prevent
558 // overlapping references to this Continuation from self and (indirectly) from
559 // the inner dyn object
560 |this| unsafe {
561 // restore the pointer to the virtual table
562 let this = this.cast::<Self>().as_ref();
563 let mut vptr = this.vptr.get().unwrap_unchecked();
564
565 // call the reclaim-method on an exclusive reference to the dyn
566 // object; the temporary reference to the continuation has been dropped
567 // previously, and no other active references exist to the continuation
568 // which allows us to take this exclusive reference
569 Pin::new_unchecked(vptr.as_mut()).reclaim();
570 },
571 )
572 }
573}
574
575/* ************************************************************** Continuation States */
576
577fsm! {
578 /// Represents the state of a [`Continuation`] throughout its lifecycle.
579 ///
580 /// A `Continuation` starts in the [`Born`] state without references to
581 /// shared data or its future, as provided by its owning [Job] or [Agent].
582 /// After binding pinned references to it, the `Continuation` moves to the
583 /// [`Idle`] state, indicating it is unscheduled.
584 ///
585 /// From the [`Idle`] state, the `Continuation` can be scheduled by
586 /// inserting it into the event calendar at a specific model time,
587 /// transitioning it to the [`Next`] state. When the model time reaches this
588 /// point and the `Continuation` is activated, it enters the [`Busy`] state.
589 ///
590 /// Upon completion, the `Continuation` transitions to the [`Done`] state,
591 /// indicating that a return value is available for extraction. At any point
592 /// before normal termination, the `Continuation` can be aborted, moving it
593 /// to the [`Gone`] state and dropping its bound future. The [`Gone`] state
594 /// is the final state in a `Continuation`'s lifecycle.
595 ///
596 /// [Job]: crate::job::Job
597 /// [Agent]: crate::Agent
598 /// [`Born`]: State::Born
599 /// [`Idle`]: State::Idle
600 /// [`Next`]: State::Next
601 /// [`Busy`]: State::Busy
602 /// [`Done`]: State::Done
603 /// [`Gone`]: State::Gone
604 #[derive(Default)]
605 pub enum State<C: Config> {
606 /// State of a continuation signifying a non-bound future and shared data.
607 #[default]
608 Born -> {Idle, Gone},
609 /// State of a continuation that is waiting for external reactivation.
610 Idle -> {Next, Gone},
611 /// State of the continuation that is currently active. At most one
612 /// continuation may be in this state during a simulation run at any
613 /// time.
614 Busy -> {Idle, Done},
615 /// State of a continuation that is managed by the calendar.
616 Next(<C::Plan as Scheduler>::State) -> {Idle, Busy},
617 /// State of a completed continuation with a result available for extraction.
618 Done(ExitStatus) -> {Gone},
619 /// State of a continuation that cannot be scheduled anymore.
620 Gone(ExitStatus) -> {}
621 }
622
623 /// Meta-state for all non-[`Born`] states.
624 ///
625 /// [`Born`]: State::Born
626 pub Init = {Idle, Busy, Next, Done, Gone};
627
628 /// Meta-state for [`Done`] and [`Gone`] states and a subset of the [`Init`]
629 /// meta-state.
630 ///
631 /// [`Done`]: State::Done
632 /// [`Gone`]: State::Gone
633 /// [`Init`]: token::Init
634 pub Term: Init = {Done, Gone};
635}
636
637impl<C: ?Sized + Config> fmt::Debug for State<C> {
638 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
639 let mut debug = f.debug_tuple(self.label());
640
641 match self {
642 State::Next(plan) => debug.field(plan),
643 State::Done(exit) => debug.field(exit),
644 State::Gone(exit) => debug.field(exit),
645 _ => &mut debug,
646 }
647 .finish()
648 }
649}