odem_rs_core/continuation/
mod.rs

1//! The `continuation` module provides the implementation of a type-erased,
2//! intrusively linked futures abstraction used in the scheduler of the
3//! simulation library.
4//!
5//! Continuations are held by a calendar, allowing them to be scheduled and
6//! managed based on model time.
7//!
8//! The [`Continuation`] type represents a continuation that can be scheduled
9//! and executed within the simulation library. It is designed to handle
10//! type-erased futures and supports branding to tie [token witnesses] to their
11//! runtime-state, enabling statically checked validity of state transitions.
12//!
13//! [token witnesses]: token::State
14
15use core::{
16	any::Any,
17	cell::Cell,
18	fmt,
19	hint::unreachable_unchecked,
20	panic::Location,
21	pin::Pin,
22	ptr::NonNull,
23	sync::atomic::{AtomicUsize, Ordering},
24	task::{Context, Poll},
25};
26use intrusive_collections::{LinkedList, RBTreeLink};
27
28pub use adapter::{Adapter, PointerOps};
29pub use puck::Puck;
30pub use share::{Label, Share};
31
32use crate::{
33	Dispatch, ExitStatus,
34	calendar::{PlanState, Scheduler},
35	config::Config,
36	error::NotIdle,
37	fsm::*,
38	ptr::{IntrusivelyCounted, Irc, IrcBox, IrcBoxed},
39	simulator::{Mark, Prec},
40};
41
42mod adapter;
43mod puck;
44mod share;
45
46/* ********************************************************************* Continuation */
47
48/// The type of intrusive link stored in every [Continuation].
49pub type Link = RBTreeLink;
50
51/// A structure of type-erased, intrusively linked [futures].
52///
53/// Pointers to instances of this type are used in the scheduler of the
54/// simulation library. They are oblivious to lifetime-restrictions and
55/// erase the concrete types of the future and the process instance.
56/// The latter types are recovered during dynamic lookup, but the
57/// lifetime-restrictions have to be observed by the abstractions provided
58/// in the simulation library.
59///
60/// Continuations may be *branded* using an additional lifetime that ties
61/// [Token] to specific instances. This allows composition and chaining of
62/// method calls that depend on specific [States] without having to
63/// continuously test the current states within the method implementations.
64///
65/// [futures]: Future
66/// [Token]: token
67/// [States]: State
68pub struct Continuation<'brand, C: ?Sized + Config> {
69	/// An intrusive link for insertion of a continuation into the calendar.
70	hook: Link,
71	/// Intrusive reference counter and raw pointer to the most specialized
72	/// version of the type containing this continuation.
73	///
74	/// This is needed to provide run-time polymorphism over the type
75	/// of future being executed as well as what code is run on dropping. See
76	/// [Dispatch] for further information.
77	task_box: IrcBox<ContBox>,
78	/// Pointer to data shared across different continuations.
79	share: NonNull<Share<C>>,
80	/// A list of continuations waiting for this continuation to terminate.
81	pending: Cell<LinkedList<Adapter<C>>>,
82	/// Precedence of this continuation relative to other continuations from the
83	/// same agent.
84	prec: Cell<Prec>,
85	/// Current state of this continuation instance.
86	state: StateMachine<'brand, State<C>>,
87	/// Span information for the current continuation.
88	#[cfg(feature = "tracing")]
89	span: tracing::Span,
90}
91
92impl<C: ?Sized + Config> Continuation<'static, C> {
93	/// Creates a new continuation initialized in [State::Born] with specific
94	/// location information attached.
95	///
96	/// [State::Born]: erased::State::Born
97	pub(crate) fn new(prec: Prec, location: &'static Location<'static>) -> Self {
98		Self {
99			hook: Link::new(),
100			task_box: IrcBox::with_location(ContBox::new(), location),
101			share: NonNull::dangling(),
102			pending: Cell::new(LinkedList::new(Adapter::NEW)),
103			prec: Cell::new(prec),
104			state: StateMachine::default(),
105			#[cfg(feature = "tracing")]
106			span: tracing::Span::current(),
107		}
108	}
109
110	/// Returns the exit code of the continuation, or `None` if it hasn't
111	/// terminated.
112	pub(crate) fn result(&self) -> Option<ExitStatus> {
113		self.brand(|task, once| {
114			task.token(once)
115				.into_term()
116				.map(|state| task.branded_result(&state))
117				.ok()
118		})
119	}
120
121	/// Reactivates the continuation at the current model-time.
122	pub(crate) fn wake(this: Irc<Self>) -> Result<(), NotIdle> {
123		this.clone().brand(|task, once| {
124			let idle = task.token(once).into_idle()?;
125
126			task.branded_share(&idle)
127				.sim()
128				.calendar()
129				.activate(task.clone(), idle);
130
131			Ok(())
132		})
133	}
134
135	/// Returns the next model time this continuation is scheduled for, or
136	/// `None` if it isn't scheduled.
137	pub(crate) fn time(&self) -> Option<C::Time> {
138		self.brand(|task, once| {
139			task.token(once)
140				.into_next()
141				.ok()
142				.map(|next| task.next_time(&next))
143		})
144	}
145}
146
147impl<'brand, C: ?Sized + Config> Continuation<'brand, C> {
148	/// Enters the [`Span`] associated with the agent owning the shared data.
149	///
150	/// [`Span`]: tracing::Span
151	#[cfg(feature = "tracing")]
152	pub fn enter_span(&self) -> tracing::span::Entered<'_> {
153		self.span.enter()
154	}
155
156	#[cfg(not(feature = "tracing"))]
157	pub const fn enter_span(&self) {}
158
159	/// Sets a new virtual function pointer for this continuation.
160	///
161	/// # Safety
162	/// The caller is responsible to ensure that the pointer stays valid for
163	/// the duration of the continuation's life.
164	pub(crate) unsafe fn set_vptr(&self, vptr: NonNull<dyn Dispatch + '_>) {
165		unsafe {
166			self.task_box.set_vptr(vptr);
167		}
168	}
169
170	/// Clears the virtual function pointer for this continuation, preventing any
171	/// methods from being called.
172	///
173	/// This can be used to prevent [`Dispatch::reclaim`] from being called,
174	/// even if the reference counter reaches zero.
175	pub(crate) fn clear_vptr(&self) {
176		self.task_box.clear_vptr();
177	}
178
179	/// Returns the number of references to this continuation.
180	pub(crate) fn use_count(&self) -> usize {
181		self.task_box.refs.load(Ordering::Relaxed)
182	}
183
184	/// Returns a copy of the internal [State](erased::State).
185	pub(crate) fn state(&self) -> &StateMachine<'brand, State<C>> {
186		&self.state
187	}
188
189	/// Returns the current [Prec] of this continuation.
190	pub(crate) fn prec(&self) -> Prec {
191		self.prec.get()
192	}
193
194	/// Sets the new [Prec] of this continuation.
195	pub(crate) fn set_prec(&self, prec: Prec) {
196		self.prec.set(prec);
197	}
198
199	/// Adds another continuation to the list of continuations to be awoken upon terminating.
200	#[inline]
201	pub(crate) fn insert_pending(&self, other: Irc<Continuation<'static, C>>) {
202		let mut list = self.pending.take();
203		list.push_back(other);
204		self.pending.set(list);
205	}
206
207	/// Removes a previously added continuation from the list of pending continuations.
208	///
209	/// # Safety
210	/// It is the caller's responsibility to ensure that the continuation had been
211	/// added previously via [Self::insert_pending].
212	pub(crate) unsafe fn remove_pending(&self, other: &Continuation<'static, C>) {
213		// Only unlink if the other continuation is actually linked right now.
214		// This can happen during panics, when the `drop` impl of a `Join`
215		// attempts to remove a stored continuation from the pending list, but
216		// it has already been removed by a prior reactivation.
217		if other.hook.is_linked() {
218			let mut list = self.pending.take();
219
220			unsafe {
221				list.cursor_mut_from_ptr(other.detach()).remove();
222			}
223
224			self.pending.set(list);
225		}
226	}
227
228	/// Awakens all pending continuations and clears the list.
229	pub(crate) fn wake_pending(&self) {
230		for task in self.pending.take() {
231			Continuation::wake(task).ok();
232		}
233	}
234
235	/// Converts the specific brand into a generic brand, breaking the
236	/// connection with the equally branded token.
237	pub(crate) fn detach(&self) -> &Continuation<'static, C> {
238		unsafe { core::mem::transmute(self) }
239	}
240
241	/// Returns the [`Location`] information for this `Continuation`.
242	pub(crate) fn location(&self) -> &'static Location<'static> {
243		IrcBox::location(&self.task_box)
244	}
245
246	/// Performs a runtime-check if this continuation has been dereferenced on the
247	/// same thread as the one the executor is running and panicks if that is
248	/// not the case.
249	///
250	/// # Safety
251	/// This method can only be called after the `Continuation` has been
252	/// activated. Calling it is thread-safe.
253	unsafe fn is_same_thread(&self) -> bool {
254		// Extract the shared data from the task.
255		let share = unsafe { self.share.as_ref() };
256
257		// Compare the pointer-address of this continuation's simulation context
258		// to the pointer-address of the thread-local simulation context.
259		crate::erased::with(|sim| core::ptr::addr_eq(&**share.sim(), sim)).unwrap_or(false)
260	}
261
262	/// Returns an enumeration copy of the internal [State](token::State).
263	pub(crate) fn token(&self, once: Ephemeral<'brand>) -> token::State<'brand> {
264		self.state.token(once)
265	}
266
267	/// Binds a continuation to [shared data].
268	///
269	/// # Safety
270	/// The caller is responsible to ensure that the shared-data-reference
271	/// outlives the (active) part of the continuation's life.
272	///
273	/// [shared data]: Share
274	pub(crate) unsafe fn bind(
275		mut self: Pin<&mut Self>,
276		born: token::Born<'brand>,
277		share: &Share<C>,
278	) -> token::Idle<'brand> {
279		// assign the reference to the shared data
280		self.share = NonNull::from(share);
281
282		// enter our `Span` to properly record the transition
283		let _span = self.enter_span();
284
285		// transition into state `Idle`
286		self.state.transition(born, ())
287	}
288
289	/// Purges the continuation from the calendar.
290	pub(crate) fn deschedule(&self, next: token::Next<'brand>) -> token::Idle<'brand> {
291		let share = self.branded_share(&next);
292
293		// remove the continuation from the calendar
294		share.sim().calendar().remove(self, next)
295	}
296
297	/// Removes the active state from the continuation.
298	pub(crate) fn deactivate(&self, busy: token::Busy<'brand>) -> token::Idle<'brand> {
299		let share = self.branded_share(&busy);
300
301		// deregister the continuation from the active cell
302		share.sim().unslot(self, busy)
303	}
304
305	/// Polls the underlying future of this continuation.
306	pub(crate) fn poll(&self, busy: token::Busy<'brand>, cx: &mut Context<'_>) -> Poll<()> {
307		// read the virtual function from the table
308
309		// SAFETY: the busy-token testifies that the continuation has been
310		// bound, which ensures that the vptr to the virtual function table has
311		// been set; pinning is ensured by the binding routine requiring it
312		let vtab = unsafe { Pin::new_unchecked(self.task_box.vptr.get().unwrap().as_ref()) };
313
314		// temporarily escape the branding to allow `Future::poll()` to rebrand
315		// without accidentally creating two tokens for the same instance
316		let (once, res) = self.state.debrand(busy, move |_| vtab.poll(cx));
317
318		// analyze the resulting state
319		match self.token(once).into_busy() {
320			Ok(busy) => match res {
321				Poll::Ready(result) => {
322					// busy -> done
323					let _: token::Done<'_> = self.state.transition(busy, result);
324
325					// reactivate pending continuations on completion
326					self.wake_pending();
327
328					Poll::Ready(())
329				}
330				Poll::Pending => {
331					// busy -> idle
332					let _: token::Idle<'_> = self.state.transition(busy, ());
333					Poll::Pending
334				}
335			},
336			Err(_err) => {
337				debug_assert!(
338					res.is_pending(),
339					"task in state `{:?}` should not have been able to terminate",
340					_err.0
341				);
342				Poll::Pending
343			}
344		}
345	}
346
347	/// Returns a reference to the shared data for this continuation.
348	pub(crate) fn branded_share<'s, I>(&'s self, init: &I) -> &'s Share<C>
349	where
350		I: Into<token::Init<'brand>>,
351	{
352		let _ = init;
353
354		// SAFETY: the token witness testifies that the shared data is
355		// initialized, which only happens during binding
356		unsafe { self.share.as_ref() }
357	}
358
359	/// Returns a reference to the shared data if initialization of this continuation
360	/// has been completed.
361	pub(crate) fn share(&self) -> Option<&Share<C>> {
362		if self.state().erased().is_init() {
363			// SAFETY: once bound, the pointer stays valid
364			Some(unsafe { self.share.as_ref() })
365		} else {
366			None
367		}
368	}
369
370	/// Grants access to the calendar state if in state [`Next`](State::Next).
371	pub(crate) fn next_state<F, R>(&self, next: &token::Next<'brand>, f: F) -> R
372	where
373		F: FnOnce(&<C::Plan as Scheduler>::State) -> R,
374	{
375		let _ = next;
376
377		// SAFETY: the `Next` token guarantees that this continuation is in the
378		// correct state
379		match &*self.state.borrow() {
380			State::Next(state) => f(state),
381			_ => unsafe { unreachable_unchecked() },
382		}
383	}
384
385	/// Returns the model time that the continuation will be activated.
386	pub(crate) fn next_time(&self, next: &token::Next<'brand>) -> C::Time {
387		self.next_state(next, |s| s.time())
388	}
389
390	/// Returns the [Cell] containing the current mark of the shared data
391	/// associated with this continuation.
392	///
393	/// This value is used to organize different continuations with identical shared
394	/// data in the calendar such that they are executed in a contiguous batch.
395	pub(crate) fn mark<'s, I>(&'s self, init: &I) -> &'s Cell<Mark>
396	where
397		I: Into<token::Init<'brand>>,
398	{
399		self.branded_share(init).mark()
400	}
401
402	/// Returns the exit code of this continuation.
403	pub(crate) fn branded_result<T>(&self, _: &T) -> ExitStatus
404	where
405		T: Into<token::Term<'brand>>,
406	{
407		// SAFETY: the continuation is in state `Done` or `Gone` per the token witness
408		match &*self.state.borrow() {
409			State::Done(rc) | State::Gone(rc) => *rc,
410			_ => unsafe { unreachable_unchecked() },
411		}
412	}
413}
414
415// Continuations offer unique methods for Branded variants
416impl<'b, C: ?Sized + Config> Stateful for Continuation<'b, C> {
417	type Brand = &'b ();
418
419	unsafe fn enter(&self) {
420		unsafe {
421			self.state.enter();
422		}
423	}
424
425	unsafe fn leave(&self) {
426		unsafe {
427			self.state.leave();
428		}
429	}
430}
431
432impl<'b, C: ?Sized + Config> Rebrand<'b> for Continuation<'b, C> {
433	type Kind<'a> = Continuation<'a, C>;
434}
435
436// Continuations only contain pointers to pinned data and are not pinned themselves
437impl<C: ?Sized + Config> Unpin for Continuation<'_, C> {}
438
439impl<C: ?Sized + Config> fmt::Debug for Continuation<'_, C> {
440	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441		let mut s = f.debug_struct("Continuation");
442
443		s.field("state", &self.state.borrow());
444
445		self.brand(|task, once| {
446			struct PrettyName(Label);
447
448			impl fmt::Debug for PrettyName {
449				fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450					write!(f, "\"{}\"", &self.0)
451				}
452			}
453
454			let state = task.token(once);
455			let init = state.as_init()?;
456
457			s.field("label", &PrettyName(task.branded_share(init).label()))
458				.field("rank", &task.branded_share(init).rank());
459
460			None::<()>
461		});
462
463		s.field("prec", &self.prec().float_range())
464			.field("refs", &self.use_count())
465			.finish()
466	}
467}
468
469impl<C: Config> crate::erased::Continuation for Continuation<'static, C> {
470	fn subject(&self) -> &dyn Any {
471		self.share().map_or(&(), |shared| shared.subject())
472	}
473
474	fn label(&self) -> Label {
475		self.share()
476			.map_or(Label::default(), |shared| shared.label())
477	}
478
479	fn prec(&self) -> Prec {
480		self.prec()
481	}
482
483	fn state(&self) -> erased::State {
484		self.state().borrow().erased()
485	}
486}
487
488unsafe impl<C: ?Sized + Config> IntrusivelyCounted for Continuation<'_, C> {
489	fn irc_box(&self) -> &IrcBox<dyn IrcBoxed> {
490		&self.task_box
491	}
492}
493
494/* ************************************************************** Irc Support */
495
496/// Container type used to support [intrusive reference counting](Irc).
497pub struct ContBox {
498	/// Contains a counter for the number of references to the outer
499	/// [`Continuation`].
500	refs: AtomicUsize,
501	/// Pointer to the most-specialized version of the instance.
502	/// Used to recover type information during dynamic dispatch.
503	vptr: Cell<Option<NonNull<dyn Dispatch>>>,
504}
505
506impl ContBox {
507	/// Creates a new irc box from a location with zero references and no
508	/// dispatcher.
509	const fn new() -> Self {
510		ContBox {
511			refs: AtomicUsize::new(0),
512			vptr: Cell::new(None),
513		}
514	}
515
516	/// Sets the pointer to the vtable.
517	///
518	/// # Safety
519	/// It is the callers' responsibility to ensure that the `vptr` pointer to
520	/// the `dyn Dispatch` object outlives the continuation box.
521	unsafe fn set_vptr(&self, vptr: NonNull<dyn Dispatch + '_>) {
522		unsafe {
523			use core::mem::transmute;
524
525			self.vptr.set(Some(transmute::<
526				NonNull<dyn Dispatch + '_>,
527				NonNull<dyn Dispatch + 'static>,
528			>(vptr)));
529		}
530	}
531
532	/// Clears the pointer to the vtable.
533	///
534	/// This can be useful to prevent [`IrcBoxed::release`] from returning
535	/// a destructor function in case the reference counter reaches zero.
536	fn clear_vptr(&self) {
537		self.vptr.set(None);
538	}
539}
540
541unsafe impl IrcBoxed for ContBox {
542	fn ref_count(&self) -> usize {
543		self.refs.load(Ordering::SeqCst)
544	}
545
546	fn acquire(&self, _: crate::ptr::Private) {
547		self.refs.fetch_add(1, Ordering::SeqCst);
548	}
549
550	fn release(&self, _: crate::ptr::Private) {
551		self.refs.fetch_sub(1, Ordering::SeqCst);
552	}
553
554	fn reclaim(&self, _: crate::ptr::Private) -> Option<fn(NonNull<dyn IrcBoxed>)> {
555		self.vptr.get().is_some().then_some(
556			// return a function reclaiming the outer type if no references point
557			// to the Continuation anymore; this separation is necessary to prevent
558			// overlapping references to this Continuation from self and (indirectly) from
559			// the inner dyn object
560			|this| unsafe {
561				// restore the pointer to the virtual table
562				let this = this.cast::<Self>().as_ref();
563				let mut vptr = this.vptr.get().unwrap_unchecked();
564
565				// call the reclaim-method on an exclusive reference to the dyn
566				// object; the temporary reference to the continuation has been dropped
567				// previously, and no other active references exist to the continuation
568				// which allows us to take this exclusive reference
569				Pin::new_unchecked(vptr.as_mut()).reclaim();
570			},
571		)
572	}
573}
574
575/* ************************************************************** Continuation States */
576
577fsm! {
578	/// Represents the state of a [`Continuation`] throughout its lifecycle.
579	///
580	/// A `Continuation` starts in the [`Born`] state without references to
581	/// shared data or its future, as provided by its owning [Job] or [Agent].
582	/// After binding pinned references to it, the `Continuation` moves to the
583	/// [`Idle`] state, indicating it is unscheduled.
584	///
585	/// From the [`Idle`] state, the `Continuation` can be scheduled by
586	/// inserting it into the event calendar at a specific model time,
587	/// transitioning it to the [`Next`] state. When the model time reaches this
588	/// point and the `Continuation` is activated, it enters the [`Busy`] state.
589	///
590	/// Upon completion, the `Continuation` transitions to the [`Done`] state,
591	/// indicating that a return value is available for extraction. At any point
592	/// before normal termination, the `Continuation` can be aborted, moving it
593	/// to the [`Gone`] state and dropping its bound future. The [`Gone`] state
594	/// is the final state in a `Continuation`'s lifecycle.
595	///
596	/// [Job]: crate::job::Job
597	/// [Agent]: crate::Agent
598	/// [`Born`]: State::Born
599	/// [`Idle`]: State::Idle
600	/// [`Next`]: State::Next
601	/// [`Busy`]: State::Busy
602	/// [`Done`]: State::Done
603	/// [`Gone`]: State::Gone
604	#[derive(Default)]
605	pub enum State<C: Config> {
606		/// State of a continuation signifying a non-bound future and shared data.
607		#[default]
608		Born -> {Idle, Gone},
609		/// State of a continuation that is waiting for external reactivation.
610		Idle -> {Next, Gone},
611		/// State of the continuation that is currently active. At most one
612		/// continuation may be in this state during a simulation run at any
613		/// time.
614		Busy -> {Idle, Done},
615		/// State of a continuation that is managed by the calendar.
616		Next(<C::Plan as Scheduler>::State) -> {Idle, Busy},
617		/// State of a completed continuation with a result available for extraction.
618		Done(ExitStatus) -> {Gone},
619		/// State of a continuation that cannot be scheduled anymore.
620		Gone(ExitStatus) -> {}
621	}
622
623	/// Meta-state for all non-[`Born`] states.
624	///
625	/// [`Born`]: State::Born
626	pub Init = {Idle, Busy, Next, Done, Gone};
627
628	/// Meta-state for [`Done`] and [`Gone`] states and a subset of the [`Init`]
629	/// meta-state.
630	///
631	/// [`Done`]: State::Done
632	/// [`Gone`]: State::Gone
633	/// [`Init`]: token::Init
634	pub Term: Init = {Done, Gone};
635}
636
637impl<C: ?Sized + Config> fmt::Debug for State<C> {
638	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
639		let mut debug = f.debug_tuple(self.label());
640
641		match self {
642			State::Next(plan) => debug.field(plan),
643			State::Done(exit) => debug.field(exit),
644			State::Gone(exit) => debug.field(exit),
645			_ => &mut debug,
646		}
647		.finish()
648	}
649}