odem_rs_sync/fork.rs
1//! Provides support for agent-internal parallelism in simulation contexts.
2//!
3//! This module implements a forking mechanism inspired by SLX, enabling
4//! concurrent execution of futures.
5//!
6//! # Features
7//! - **Join Futures**: Use [`and`] to wait for all futures to complete.
8//! - **Select Futures**: Use [`or`] to resolve when any one future completes.
9//! - Supports nesting of combinators for complex workflows.
10//!
11//! [`and`]: Fork::and
12//! [`or`]: Fork::or
13
14pub use {
15 all_of::{AllOf, Joinable},
16 any_of::{AnyOf, Pickable},
17};
18
19use all_of::NotifyQuorum;
20use any_of::NotifyOnExit;
21use core::{future::IntoFuture, panic::Location, pin::Pin};
22use odem_rs_core::{Active, config::Config, job::Job, ptr::Lease, simulator::Sim};
23
24mod all_of;
25mod any_of;
26
27/* ***************************************************** Fork Extension Trait */
28
29/// Extension trait for [simulation contexts] to support the [fork]-operation.
30///
31/// [simulation contexts]: Sim
32/// [fork]: ForkExt::fork
33pub trait ForkExt {
34 /// Associated [simulation configuration] type.
35 ///
36 /// [simulation configuration]: Config
37 type Config: ?Sized + Config;
38
39 /// Creates a future combinator that enables agent-internal parallelism
40 /// by combining futures for concurrent execution within the simulation
41 /// context.
42 ///
43 /// The `fork` method allows developers to do either:
44 ///
45 /// 1. **Wait for any one of the futures to complete** ([`or`] combinator).
46 /// 2. **Wait for all futures to complete** ([`and`] combinator).
47 ///
48 /// ## Usage Notes
49 /// - The `or` combinator requires all futures to share the same return
50 /// type. If multiple futures could complete at the same simulation time,
51 /// precedence is given to those chained earlier.
52 /// - The `and` combinator collects the results of all futures into a tuple,
53 /// preserving their order in the chain.
54 /// - Nested combinators can be used to create advanced logic.
55 ///
56 /// ## Examples
57 ///
58 /// ### Using `or` to Wait for the First Completion
59 /// ```
60 /// # use {odem_rs_core::simulator::Sim, odem_rs_sync::fork::ForkExt as _};
61 /// # async fn sim_main(sim: &Sim) {
62 /// let result = sim
63 /// .fork(async { 1 })
64 /// .or(async { 2 })
65 /// .or(async { 3 })
66 /// .await;
67 ///
68 /// assert_eq!(result, 1);
69 /// # }
70 /// ```
71 ///
72 /// ### Using `and` to Wait for All Completions
73 /// ```
74 /// # use {odem_rs_core::simulator::Sim, odem_rs_sync::fork::ForkExt as _};
75 /// # async fn sim_main(sim: &Sim) {
76 /// let results = sim
77 /// .fork(async { true })
78 /// .and(async { 2 })
79 /// .and(async { "3" })
80 /// .await;
81 ///
82 /// assert_eq!(results, (true, 2, "3"));
83 /// # }
84 /// ```
85 ///
86 /// ### Combining Nested Futures
87 /// ```
88 /// # use {odem_rs_core::simulator::Sim, odem_rs_sync::fork::ForkExt as _};
89 /// # async fn sim_main(sim: &Sim) {
90 /// let result = sim
91 /// .fork(async { (1, "One") })
92 /// .or(sim.fork(async { 2 }).and(async { "Two" }))
93 /// .await;
94 ///
95 /// assert_eq!(result, (1, "One"));
96 /// # }
97 /// ```
98 ///
99 /// [`or`]: Fork::or
100 /// [`and`]: Fork::and
101 fn fork<F: IntoFuture>(&self, future: F) -> Fork<'_, Self::Config, F>;
102}
103
104// implement the trait for all valid simulation contexts
105impl<C: ?Sized + Config> ForkExt for Sim<C> {
106 type Config = C;
107
108 #[track_caller]
109 fn fork<F: IntoFuture>(&self, fut: F) -> Fork<'_, C, F> {
110 Fork {
111 sim: self,
112 fut,
113 loc: Location::caller(),
114 }
115 }
116}
117
118/* ********************************************************************* Fork */
119
120/// A builder-type structure that allows assembling arbitrary many futures into
121/// a concurrently executed choice or collection.
122pub struct Fork<'s, C: ?Sized + Config, F> {
123 /// Associated simulation context.
124 sim: &'s Sim<C>,
125 /// A future to await.
126 fut: F,
127 /// A reference to the source location of the future.
128 loc: &'static Location<'static>,
129}
130
131impl<'s, C: ?Sized + Config, F: IntoFuture + 's> Fork<'s, C, F> {
132 /// Converts the `Fork` into a [`AnyOf`]-type of choice.
133 #[track_caller]
134 pub fn or<G: IntoFuture<Output = F::Output> + 's>(
135 self,
136 fut: G,
137 ) -> AnyOf<'s, C, impl Pickable<C, Output = F::Output> + 's> {
138 #![allow(clippy::type_complexity)]
139 AnyOf::new(
140 self.sim,
141 Cons(
142 Term(
143 Job::build()
144 .with_actions(self.fut)
145 .with_location(self.loc)
146 .with_finalizer(NotifyOnExit::<C>::new())
147 .finish()
148 .into_inner(),
149 ),
150 Job::build()
151 .with_actions(fut)
152 .with_finalizer(NotifyOnExit::<C>::new())
153 .finish()
154 .into_inner(),
155 ),
156 )
157 }
158
159 /// Converts the `Fork` into a [`AllOf`]-type of join.
160 #[track_caller]
161 pub fn and<G: IntoFuture>(
162 self,
163 fut: G,
164 ) -> AllOf<'s, C, impl Joinable<C, Output = ((F::Output,), G::Output)> + use<G, C, F>> {
165 #![allow(clippy::type_complexity)]
166 AllOf::new(
167 self.sim,
168 Cons(
169 Term(
170 Job::build()
171 .with_actions(self.fut)
172 .with_location(self.loc)
173 .with_finalizer(NotifyQuorum::<C>::new())
174 .finish()
175 .into_inner(),
176 ),
177 Job::build()
178 .with_actions(fut)
179 .with_finalizer(NotifyQuorum::<C>::new())
180 .finish()
181 .into_inner(),
182 ),
183 )
184 }
185}
186
187impl<C: ?Sized + Config, F: IntoFuture> IntoFuture for Fork<'_, C, F> {
188 type Output = F::Output;
189 type IntoFuture = F::IntoFuture;
190
191 fn into_future(self) -> Self::IntoFuture {
192 self.fut.into_future()
193 }
194}
195
196/* ******************************************************* Nested Tuple Lists */
197
198/// A heterogeneous expression list of length one, containing only a head
199/// element, but no tail.
200#[pin_project::pin_project]
201struct Term<H>(#[pin] H);
202
203/// A heterogeneous expression list, containing a single-element head and a tail
204/// list.
205#[pin_project::pin_project]
206struct Cons<H, T>(#[pin] H, #[pin] T);
207
208/* *************************************************************** ActiveList */
209
210/// Represents a list of heterogeneous expressions that are [Active].
211pub trait ActiveList<C: ?Sized + Config> {
212 /// The length of the list of active expressions.
213 const LEN: usize;
214
215 /// A method to activate the active objects.
216 fn activate(self: Pin<&mut Self>, sim: &Sim<C>);
217}
218
219impl<C, T> ActiveList<C> for Term<T>
220where
221 C: ?Sized + Config,
222 T: Active<C>,
223{
224 const LEN: usize = 1;
225
226 #[inline]
227 fn activate(self: Pin<&mut Self>, sim: &Sim<C>) {
228 let this = self.project();
229
230 // SAFETY: This pretends that we don't keep a reference outside the
231 // call, which we absolutely do and even use for later querying of the
232 // results. Once used, the outside reference invalidates all references
233 // kept by the simulator under stacked borrow rules. Despite this, it is
234 // safe (though not pretty), because we only use the outside reference
235 // once any or all of the jobs have terminated and discard of the
236 // other references immediately after.
237 // 7/10 - would not transmute again.
238 sim.activate(unsafe {
239 core::mem::transmute::<Pin<&mut T>, Pin<&mut Lease<'_, T>>>(this.0)
240 });
241 }
242}
243
244impl<C, L, R> ActiveList<C> for Cons<L, R>
245where
246 C: ?Sized + Config,
247 L: ActiveList<C>,
248 R: Active<C>,
249{
250 const LEN: usize = L::LEN + 1;
251
252 #[inline]
253 fn activate(self: Pin<&mut Self>, sim: &Sim<C>) {
254 let this = self.project();
255 this.0.activate(sim);
256
257 // SAFETY: This pretends that we don't keep a reference outside the
258 // call, which we absolutely do and even use for later querying of the
259 // results. Once used, the outside reference invalidates all references
260 // kept by the simulator under stacked borrow rules. Despite this, it is
261 // safe (though not pretty), because we only use the outside reference
262 // once any or all of the jobs have terminated and discard of the
263 // other references immediately after.
264 // 7/10 - would not transmute again.
265 sim.activate(unsafe {
266 core::mem::transmute::<Pin<&mut R>, Pin<&mut Lease<'_, R>>>(this.1)
267 });
268 }
269}