odem_rs_sync/fork/
any_of.rs

1use super::{ActiveList, Cons, Term};
2
3use odem_rs_core::{
4	ExitStatus, Puck,
5	config::Config,
6	continuation::Puck as TaskPuck,
7	fsm::Brand,
8	job::{Job, Settle},
9	simulator::{Prec, Sim},
10};
11
12use core::{
13	future::{Future, IntoFuture},
14	pin::Pin,
15	task::{Context, Poll},
16};
17
18/* *************************************************************** AnyOf Type */
19
20/// A combinator for awaiting the first future to complete.
21///
22/// This utility resolves when the first future in its collection completes,
23/// returning that future's result.
24///
25/// # Behavior
26/// - If multiple futures terminate simultaneously, the one appearing first in
27///   the code will take precedence.
28/// - Supports chaining with the [`or`] method to combine futures.
29///
30/// [`or`]: AnyOf::or
31#[pin_project::pin_project]
32pub struct AnyOf<'s, C: ?Sized + Config, P: Pickable<C>> {
33	/// The simulation context before activation and `None` afterward.
34	sim: Option<&'s Sim<C>>,
35	/// List of futures to pick from.
36	#[pin]
37	fut: P,
38}
39
40impl<'s, C: ?Sized + Config, P: Pickable<C>> AnyOf<'s, C, P> {
41	/// Creates a new future combinator containing a set of [Pickable] futures.
42	pub(super) const fn new(sim: &'s Sim<C>, fut: P) -> Self {
43		AnyOf {
44			sim: Some(sim),
45			fut,
46		}
47	}
48
49	/// Adds another future to await to the list.
50	#[track_caller]
51	pub fn or<F: IntoFuture<Output = P::Output>>(
52		self,
53		fut: F,
54	) -> AnyOf<'s, C, impl Pickable<C, Output = P::Output> + use<F, C, P>> {
55		AnyOf {
56			sim: self.sim,
57			fut: Cons(
58				self.fut,
59				Job::build()
60					.with_actions(fut)
61					.with_finalizer(NotifyOnExit::<C>::new())
62					.finish()
63					.into_inner(),
64			),
65		}
66	}
67}
68
69impl<C: ?Sized + Config, P: Pickable<C>> Future for AnyOf<'_, C, P> {
70	type Output = P::Output;
71
72	fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
73		let mut this = self.project();
74
75		if let Some(sim) = this.sim.take() {
76			let active = sim.active();
77			let prec = active.prec();
78
79			// distribute the active continuation's puck
80			this.fut.as_mut().bind(
81				active,
82				&mut prec
83					.split(P::LEN as _)
84					.expect("ran out of bits to support this level of nesting"),
85			);
86
87			// activate the list of jobs
88			this.fut.activate(sim);
89		} else {
90			let Some(result) = this.fut.collect() else {
91				unreachable!("unexpected spurious reactivation")
92			};
93
94			// return the result
95			return Poll::Ready(result);
96		}
97
98		// await another activation
99		Poll::Pending
100	}
101}
102
103/* ************************************************************* NotifyOnExit */
104
105/// Future adapter that reactivates the parent continuation when the child
106/// continuation terminates. Doesn't reactivate the parent if the child is
107/// canceled prematurely.
108pub struct NotifyOnExit<C: ?Sized + Config> {
109	/// Handle to the parent continuation.
110	parent: Option<TaskPuck<C>>,
111}
112
113impl<C: ?Sized + Config> NotifyOnExit<C> {
114	/// Initializes the notifier from a future. The parent continuation is initialized
115	/// later during binding to ensure the reactivation of the correct continuation,
116	/// even if the job has been moved to a different continuation between creation
117	/// and activation.
118	pub const fn new() -> Self {
119		NotifyOnExit { parent: None }
120	}
121
122	/// Method initializing the parent continuation.
123	fn bind(&mut self, puck: TaskPuck<C>) {
124		self.parent = Some(puck);
125	}
126}
127
128impl<C: ?Sized + Config, R> Settle<R> for NotifyOnExit<C> {
129	fn settle(self, _: &mut R) -> ExitStatus {
130		// wake the parent; if it has already been woken up, ignore
131		if let Some(mut puck) = self.parent {
132			puck.wake().ok();
133		}
134
135		// report success
136		Ok(odem_rs_core::Success)
137	}
138}
139
140/* *************************************************************** Pick Trait */
141
142/// Represents a heterogeneous list of [`Job`], of which the earliest one in
143/// terms of model time and (secondary) ordering is to be awaited.
144pub trait Pickable<C: ?Sized + Config>: ActiveList<C> {
145	/// The shared output type of all contained futures.
146	type Output;
147
148	/// Completes initializing the `NotifyOnExit` finisher with the caller's
149	/// [`TaskPuck`] and sets [`Prec`]s according to the order of queuing.
150	fn bind(self: Pin<&mut Self>, puck: TaskPuck<C>, prec: &mut impl Iterator<Item = Prec>);
151
152	/// Collects the result of the single job that terminated.
153	fn collect(self: Pin<&mut Self>) -> Option<Self::Output>;
154}
155
156impl<C, F> Pickable<C> for Term<Job<C, F, NotifyOnExit<C>>>
157where
158	C: ?Sized + Config,
159	F: Future,
160{
161	type Output = F::Output;
162
163	fn bind(self: Pin<&mut Self>, puck: TaskPuck<C>, prec: &mut impl Iterator<Item = Prec>) {
164		let this = self.project();
165		this.0.brand(|job, once| {
166			let born = job.token(once).into_born().unwrap();
167			job.set_prec(prec.next().unwrap());
168			job.finalizer(&born).bind(puck);
169		});
170	}
171
172	fn collect(self: Pin<&mut Self>) -> Option<Self::Output> {
173		let this = self.project();
174		this.0.result().ok()
175	}
176}
177
178impl<C, L, F> Pickable<C> for Cons<L, Job<C, F, NotifyOnExit<C>>>
179where
180	C: ?Sized + Config,
181	L: Pickable<C, Output = F::Output>,
182	F: Future,
183{
184	type Output = F::Output;
185
186	fn bind(self: Pin<&mut Self>, puck: TaskPuck<C>, prec: &mut impl Iterator<Item = Prec>) {
187		let this = self.project();
188
189		this.0.bind(puck.clone(), prec);
190		this.1.brand(|job, once| {
191			let born = job.token(once).into_born().unwrap();
192			job.set_prec(prec.next().unwrap());
193			job.finalizer(&born).bind(puck);
194		});
195	}
196
197	fn collect(self: Pin<&mut Self>) -> Option<Self::Output> {
198		let this = self.project();
199		this.0.collect().or_else(|| this.1.result().ok())
200	}
201}