odem_rs_sync/
fork.rs

1//! Provides support for agent-internal parallelism in simulation contexts.
2//!
3//! This module implements a forking mechanism inspired by SLX, enabling
4//! concurrent execution of futures.
5//!
6//! # Features
7//! - **Join Futures**: Use [`and`] to wait for all futures to complete.
8//! - **Select Futures**: Use [`or`] to resolve when any one future completes.
9//! - Supports nesting of combinators for complex workflows.
10//!
11//! [`and`]: Fork::and
12//! [`or`]: Fork::or
13
14pub use {
15	all_of::{AllOf, Joinable},
16	any_of::{AnyOf, Pickable},
17};
18
19use all_of::NotifyQuorum;
20use any_of::NotifyOnExit;
21use core::{future::IntoFuture, panic::Location, pin::Pin};
22use odem_rs_core::{Active, config::Config, job::Job, ptr::Lease, simulator::Sim};
23
24mod all_of;
25mod any_of;
26
27/* ***************************************************** Fork Extension Trait */
28
29/// Extension trait for [simulation contexts] to support the [fork]-operation.
30///
31/// [simulation contexts]: Sim
32/// [fork]: ForkExt::fork
33pub trait ForkExt {
34	/// Associated [simulation configuration] type.
35	///
36	/// [simulation configuration]: Config
37	type Config: ?Sized + Config;
38
39	/// Creates a future combinator that enables agent-internal parallelism
40	/// by combining futures for concurrent execution within the simulation
41	/// context.
42	///
43	/// The `fork` method allows developers to do either:
44	///
45	/// 1. **Wait for any one of the futures to complete** ([`or`] combinator).
46	/// 2. **Wait for all futures to complete** ([`and`] combinator).
47	///
48	/// ## Usage Notes
49	/// - The `or` combinator requires all futures to share the same return
50	///   type. If multiple futures could complete at the same simulation time,
51	///   precedence is given to those chained earlier.
52	/// - The `and` combinator collects the results of all futures into a tuple,
53	///   preserving their order in the chain.
54	/// - Nested combinators can be used to create advanced logic.
55	///
56	/// ## Examples
57	///
58	/// ### Using `or` to Wait for the First Completion
59	/// ```
60	/// # use {odem_rs_core::simulator::Sim, odem_rs_sync::fork::ForkExt as _};
61	/// # async fn sim_main(sim: &Sim) {
62	/// let result = sim
63	///     .fork(async { 1 })
64	///     .or(async { 2 })
65	///     .or(async { 3 })
66	///     .await;
67	///
68	/// assert_eq!(result, 1);
69	/// # }
70	/// ```
71	///
72	/// ### Using `and` to Wait for All Completions
73	/// ```
74	/// # use {odem_rs_core::simulator::Sim, odem_rs_sync::fork::ForkExt as _};
75	/// # async fn sim_main(sim: &Sim) {
76	/// let results = sim
77	///     .fork(async { true })
78	///     .and(async { 2 })
79	///     .and(async { "3" })
80	///     .await;
81	///
82	/// assert_eq!(results, (true, 2, "3"));
83	/// # }
84	/// ```
85	///
86	/// ### Combining Nested Futures
87	/// ```
88	/// # use {odem_rs_core::simulator::Sim, odem_rs_sync::fork::ForkExt as _};
89	/// # async fn sim_main(sim: &Sim) {
90	/// let result = sim
91	///     .fork(async { (1, "One") })
92	///     .or(sim.fork(async { 2 }).and(async { "Two" }))
93	///     .await;
94	///
95	/// assert_eq!(result, (1, "One"));
96	/// # }
97	/// ```
98	///
99	/// [`or`]: Fork::or
100	/// [`and`]: Fork::and
101	fn fork<F: IntoFuture>(&self, future: F) -> Fork<'_, Self::Config, F>;
102}
103
104// implement the trait for all valid simulation contexts
105impl<C: ?Sized + Config> ForkExt for Sim<C> {
106	type Config = C;
107
108	#[track_caller]
109	fn fork<F: IntoFuture>(&self, fut: F) -> Fork<'_, C, F> {
110		Fork {
111			sim: self,
112			fut,
113			loc: Location::caller(),
114		}
115	}
116}
117
118/* ********************************************************************* Fork */
119
120/// A builder-type structure that allows assembling arbitrary many futures into
121/// a concurrently executed choice or collection.
122pub struct Fork<'s, C: ?Sized + Config, F> {
123	/// Associated simulation context.
124	sim: &'s Sim<C>,
125	/// A future to await.
126	fut: F,
127	/// A reference to the source location of the future.
128	loc: &'static Location<'static>,
129}
130
131impl<'s, C: ?Sized + Config, F: IntoFuture + 's> Fork<'s, C, F> {
132	/// Converts the `Fork` into a [`AnyOf`]-type of choice.
133	#[track_caller]
134	pub fn or<G: IntoFuture<Output = F::Output> + 's>(
135		self,
136		fut: G,
137	) -> AnyOf<'s, C, impl Pickable<C, Output = F::Output> + 's> {
138		#![allow(clippy::type_complexity)]
139		AnyOf::new(
140			self.sim,
141			Cons(
142				Term(
143					Job::build()
144						.with_actions(self.fut)
145						.with_location(self.loc)
146						.with_finalizer(NotifyOnExit::<C>::new())
147						.finish()
148						.into_inner(),
149				),
150				Job::build()
151					.with_actions(fut)
152					.with_finalizer(NotifyOnExit::<C>::new())
153					.finish()
154					.into_inner(),
155			),
156		)
157	}
158
159	/// Converts the `Fork` into a [`AllOf`]-type of join.
160	#[track_caller]
161	pub fn and<G: IntoFuture>(
162		self,
163		fut: G,
164	) -> AllOf<'s, C, impl Joinable<C, Output = ((F::Output,), G::Output)> + use<G, C, F>> {
165		#![allow(clippy::type_complexity)]
166		AllOf::new(
167			self.sim,
168			Cons(
169				Term(
170					Job::build()
171						.with_actions(self.fut)
172						.with_location(self.loc)
173						.with_finalizer(NotifyQuorum::<C>::new())
174						.finish()
175						.into_inner(),
176				),
177				Job::build()
178					.with_actions(fut)
179					.with_finalizer(NotifyQuorum::<C>::new())
180					.finish()
181					.into_inner(),
182			),
183		)
184	}
185}
186
187impl<C: ?Sized + Config, F: IntoFuture> IntoFuture for Fork<'_, C, F> {
188	type Output = F::Output;
189	type IntoFuture = F::IntoFuture;
190
191	fn into_future(self) -> Self::IntoFuture {
192		self.fut.into_future()
193	}
194}
195
196/* ******************************************************* Nested Tuple Lists */
197
198/// A heterogeneous expression list of length one, containing only a head
199/// element, but no tail.
200#[pin_project::pin_project]
201struct Term<H>(#[pin] H);
202
203/// A heterogeneous expression list, containing a single-element head and a tail
204/// list.
205#[pin_project::pin_project]
206struct Cons<H, T>(#[pin] H, #[pin] T);
207
208/* *************************************************************** ActiveList */
209
210/// Represents a list of heterogeneous expressions that are [Active].
211pub trait ActiveList<C: ?Sized + Config> {
212	/// The length of the list of active expressions.
213	const LEN: usize;
214
215	/// A method to activate the active objects.
216	fn activate(self: Pin<&mut Self>, sim: &Sim<C>);
217}
218
219impl<C, T> ActiveList<C> for Term<T>
220where
221	C: ?Sized + Config,
222	T: Active<C>,
223{
224	const LEN: usize = 1;
225
226	#[inline]
227	fn activate(self: Pin<&mut Self>, sim: &Sim<C>) {
228		let this = self.project();
229
230		// SAFETY: This pretends that we don't keep a reference outside the
231		// call, which we absolutely do and even use for later querying of the
232		// results. Once used, the outside reference invalidates all references
233		// kept by the simulator under stacked borrow rules. Despite this, it is
234		// safe (though not pretty), because we only use the outside reference
235		// once any or all of the jobs have terminated and discard of the
236		// other references immediately after.
237		// 7/10 - would not transmute again.
238		sim.activate(unsafe {
239			core::mem::transmute::<Pin<&mut T>, Pin<&mut Lease<'_, T>>>(this.0)
240		});
241	}
242}
243
244impl<C, L, R> ActiveList<C> for Cons<L, R>
245where
246	C: ?Sized + Config,
247	L: ActiveList<C>,
248	R: Active<C>,
249{
250	const LEN: usize = L::LEN + 1;
251
252	#[inline]
253	fn activate(self: Pin<&mut Self>, sim: &Sim<C>) {
254		let this = self.project();
255		this.0.activate(sim);
256
257		// SAFETY: This pretends that we don't keep a reference outside the
258		// call, which we absolutely do and even use for later querying of the
259		// results. Once used, the outside reference invalidates all references
260		// kept by the simulator under stacked borrow rules. Despite this, it is
261		// safe (though not pretty), because we only use the outside reference
262		// once any or all of the jobs have terminated and discard of the
263		// other references immediately after.
264		// 7/10 - would not transmute again.
265		sim.activate(unsafe {
266			core::mem::transmute::<Pin<&mut R>, Pin<&mut Lease<'_, R>>>(this.1)
267		});
268	}
269}