odem_rs_sync/
chain.rs

1//! A module containing a data structure for intrusively linked lists of
2//! [notifyable] objects that is used to construct [Chains] of [Continuations] on the
3//! stack.
4//!
5//! [Chains]: Chain
6//! [notifyable]: Subscriber
7//! [Continuations]: odem_rs_core::continuation::Continuation
8
9use core::{
10	cell::Cell,
11	fmt,
12	marker::PhantomData,
13	pin::{Pin, pin},
14	task::Waker,
15};
16use intrusive_collections::{LinkedList, LinkedListLink, UnsafeRef, intrusive_adapter};
17
18use crate::{Publisher, Subscriber};
19
20/* ******************************************************************** Chain */
21
22/// A shared list of intrusively [linked] objects that can be [notified].
23///
24/// [linked]: Link
25/// [notified]: Subscriber
26pub struct Chain<N = Waker> {
27	inner: Cell<LinkedList<LinkAdapter<N>>>,
28}
29
30impl<N> Chain<N> {
31	/// Creates a new, empty chain.
32	pub fn new() -> Self {
33		Chain {
34			inner: Cell::new(LinkedList::new(LinkAdapter::NEW)),
35		}
36	}
37
38	/// Removes all the queued-up objects without notifying them.
39	pub fn clear(&mut self) {
40		self.inner.get_mut().clear();
41	}
42
43	/// Returns `true` if this chain is empty.
44	pub fn is_empty(&self) -> bool {
45		let inner = self.inner.take();
46		let res = inner.is_empty();
47		self.inner.set(inner);
48		res
49	}
50
51	/// Constructs a Notifier for the first linked object.
52	///
53	/// At most one object is notified once the [`go`]-method is called and the
54	/// chain isn't empty, but the caller may execute additional methods that
55	/// change which object is notified and what happens in that case.
56	///
57	/// The notified object (if any) is automatically removed from the chain.
58	///
59	/// [`go`]: NotifyOne::go
60	pub fn notify_one(&self) -> NotifyOne<'_, N>
61	where
62		N: Subscriber,
63	{
64		NotifyOne::new(self)
65	}
66
67	/// Constructs a Notifier for all the linked objects.
68	///
69	/// Potentially all the objects in the chain are notified when the [`go`]
70	/// method is called, but the caller may execute additional methods that
71	/// change which objects are notified and what happens in that case.
72	///
73	/// All notified objects are automatically removed from the chain.
74	///
75	/// [`go`]: NotifyAll::go
76	pub fn notify_all(&self) -> NotifyAll<'_, N>
77	where
78		N: Subscriber,
79	{
80		NotifyAll::new(self)
81	}
82
83	/// Removes the first element of the chain.
84	///
85	/// Returns `None` if the chain is empty.
86	fn pop_front(&self) -> Option<UnsafeRef<Link<N>>> {
87		let mut inner = self.inner.take();
88		let res = inner.pop_front();
89		self.inner.set(inner);
90		res
91	}
92
93	/// Removes the last element of the chain.
94	///
95	/// Returns `None` if the chain is empty.
96	fn pop_back(&self) -> Option<UnsafeRef<Link<N>>> {
97		let mut inner = self.inner.take();
98		let res = inner.pop_back();
99		self.inner.set(inner);
100		res
101	}
102}
103
104// Extension methods for `Waker`-based chains.
105impl Chain<Waker> {
106	/// Enters the current process into a FIFO-queue, to be activated later.
107	///
108	/// The reactivation follows queue-ordering, i.e. first-in first-out.
109	pub async fn fifo(&self) {
110		use odem_rs_core::ops::{sleep, waker};
111
112		// Construct a link containing the currently active waker.
113		let link = pin!(Link::new(waker().await));
114
115		// Subscribe to the chain in queue-order.
116		let mut inner = self.inner.take();
117		unsafe {
118			inner.push_back(UnsafeRef::from_raw(&*link));
119		}
120		self.inner.set(inner);
121
122		// Make sure to unsubscribe, even when canceled early.
123		scopeguard::defer! {
124			unsafe {
125				self.unsubscribe(link.as_ref());
126			}
127		}
128
129		// Await reactivation.
130		sleep().await;
131	}
132
133	/// Enters the current process into a LIFO-queue, to be activated later.
134	///
135	/// The reactivation follows stack-ordering, i.e. last-in first-out.
136	pub async fn lifo(&self) {
137		use odem_rs_core::ops::{sleep, waker};
138
139		// Construct a link containing the currently active waker.
140		let link = pin!(Link::new(waker().await));
141
142		// Subscribe to the chain in stack-order.
143		let mut inner = self.inner.take();
144		unsafe {
145			inner.push_front(UnsafeRef::from_raw(&*link));
146		}
147		self.inner.set(inner);
148
149		// Make sure to unsubscribe, even when panicking.
150		scopeguard::defer! {
151			unsafe {
152				self.unsubscribe(link.as_ref());
153			}
154		}
155
156		// Await reactivation.
157		sleep().await;
158	}
159}
160
161impl<N: Subscriber> Subscriber for Chain<N> {
162	/// Notifies all the linked objects, removing them all from the chain.
163	fn notify(&self) {
164		self.notify_all().go();
165	}
166}
167
168impl<N: Subscriber> Publisher for Chain<N> {
169	type Link = Link<N>;
170
171	unsafe fn subscribe(&self, link: Pin<&Self::Link>) {
172		let mut inner = self.inner.take();
173		unsafe {
174			inner.push_back(UnsafeRef::from_raw(&*link));
175		}
176		self.inner.set(inner);
177	}
178
179	unsafe fn unsubscribe(&self, link: Pin<&Self::Link>) {
180		if link.hook.is_linked() {
181			let mut inner = self.inner.take();
182			unsafe {
183				inner.cursor_mut_from_ptr(&*link).remove();
184			}
185			self.inner.set(inner);
186		}
187	}
188}
189
190impl<N: fmt::Debug> fmt::Debug for Chain<N> {
191	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192		let inner = self.inner.take();
193		let res = inner.fmt(f);
194		self.inner.set(inner);
195		res
196	}
197}
198
199impl<N> Default for Chain<N> {
200	fn default() -> Self {
201		Self::new()
202	}
203}
204
205impl<N> Drop for Chain<N> {
206	fn drop(&mut self) {
207		self.clear();
208	}
209}
210
211/* *********************************************************** Intrusive Link */
212
213/// A wrapper type to augment objects that may be notified into intrusive
214/// links that can be stored in intrusive collections.
215#[pin_project::pin_project]
216pub struct Link<N: ?Sized> {
217	/// Marks the structure as non-Send and non-Sync.
218	_mark: PhantomData<*const ()>,
219	/// Contains the intrusive link.
220	#[pin]
221	hook: LinkedListLink,
222	/// The object to be notified.
223	pub note: N,
224}
225
226impl<N> Link<N> {
227	/// Initializes a new link with the notifiable object.
228	pub const fn new(waker: N) -> Self {
229		Link {
230			hook: LinkedListLink::new(),
231			note: waker,
232			_mark: PhantomData,
233		}
234	}
235
236	/// Returns whether this link is in use or not.
237	pub fn is_linked(&self) -> bool {
238		self.hook.is_linked()
239	}
240
241	/// Strips the outer link and returns the inner `N`.
242	pub fn into_inner(self) -> N {
243		self.note
244	}
245}
246
247impl<N> From<N> for Link<N> {
248	fn from(value: N) -> Self {
249		Self::new(value)
250	}
251}
252
253impl<N: ?Sized + Subscriber> Subscriber for Link<N> {
254	fn notify(&self) {
255		self.note.notify();
256	}
257}
258
259impl<N: fmt::Debug> fmt::Debug for Link<N> {
260	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261		f.debug_struct("Link")
262			.field("hook", &self.hook)
263			.field("note", &self.note)
264			.finish()
265	}
266}
267
268impl fmt::Debug for Link<dyn Subscriber> {
269	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270		f.debug_struct("Link").field("hook", &self.hook).finish()
271	}
272}
273
274#[doc(hidden)]
275mod adapter {
276	#![allow(missing_docs)]
277	use super::*;
278
279	intrusive_adapter! {
280		/// An adapter for the [Link]-structure, allowing it to be inserted into an
281		/// intrusive [LinkedList].
282		pub LinkAdapter<N> = UnsafeRef<Link<N>>: Link<N> {
283			hook: LinkedListLink
284		}
285	}
286}
287
288pub use adapter::*;
289
290/* ***************************************************** Notification Helpers */
291
292/// Trait used to trigger the notification agent after the
293/// notify-configuration is complete.
294pub trait Trigger {
295	/// The result-type of the triggered notification operation.
296	type Output;
297
298	/// The notification operation itself.
299	fn trigger(self) -> Self::Output;
300}
301
302/* *************************************************************** Notify One */
303
304/// Notification builder that seeks, notifies and removes the first fitting
305/// object in a [Chain].
306#[must_use = "value must be consumed with go() in order to run"]
307pub struct NotifyOne<'c, N, P = (), F = (), const FIRST: bool = true> {
308	/// A reference to the [Chain].
309	chain: &'c Chain<N>,
310	/// A boolean function that is used to skip elements in the chain that are
311	/// not ready to be notified.
312	filter: P,
313	/// A function that is called at most once for the selected chain element.
314	///
315	/// This function is called before the actual notification is submitted.
316	action: F,
317}
318
319impl<'c, N: Subscriber> NotifyOne<'c, N, (), ()> {
320	/// Creates a notification configuration that notifies the first object in
321	/// the chain.
322	const fn new(chain: &'c Chain<N>) -> Self {
323		NotifyOne {
324			chain,
325			filter: (),
326			action: (),
327		}
328	}
329}
330
331impl<'c, N: Subscriber, P, F, const FIRST: bool> NotifyOne<'c, N, P, F, FIRST> {
332	/// Configures the notification configuration with a boolean predicate that
333	/// allows skipping the first sequence of objects that aren't ready to be
334	/// notified.
335	pub fn filter<X: Fn(&N) -> bool>(self, filter: X) -> NotifyOne<'c, N, X, F> {
336		NotifyOne {
337			chain: self.chain,
338			filter,
339			action: self.action,
340		}
341	}
342
343	/// Configures what happens after a suitable object has been found for
344	/// notification but before it is actually notified.
345	pub fn then<X: FnOnce(&N) -> R, R>(self, action: X) -> NotifyOne<'c, N, P, X> {
346		NotifyOne {
347			chain: self.chain,
348			filter: self.filter,
349			action,
350		}
351	}
352
353	/// Selects the first item in the chain to be notified.
354	pub fn first(self) -> NotifyOne<'c, N, P, F, true> {
355		NotifyOne {
356			chain: self.chain,
357			filter: self.filter,
358			action: self.action,
359		}
360	}
361
362	/// Selects the last item in the chain to be notified.
363	pub fn last(self) -> NotifyOne<'c, N, P, F, false> {
364		NotifyOne {
365			chain: self.chain,
366			filter: self.filter,
367			action: self.action,
368		}
369	}
370
371	/// Triggers the notification.
372	pub fn go(self) -> <Self as Trigger>::Output
373	where
374		Self: Trigger,
375	{
376		Trigger::trigger(self)
377	}
378}
379
380impl<N, P, F, R, const FIRST: bool> Trigger for NotifyOne<'_, N, P, F, FIRST>
381where
382	N: Subscriber,
383	P: Fn(&N) -> bool,
384	F: FnOnce(&N) -> R,
385{
386	type Output = Option<R>;
387
388	fn trigger(self) -> Self::Output {
389		// take a local copy of the chain and replace it with an empty list
390		let mut chain = self.chain.inner.replace(LinkedList::new(LinkAdapter::NEW));
391
392		// get a mutable cursor for iteration and removal
393		let mut cur = if FIRST {
394			chain.front_mut()
395		} else {
396			chain.back_mut()
397		};
398		let mut out = None;
399
400		// iterate over the elements
401		while let Some(link) = cur.get() {
402			// check the predicate
403			if (self.filter)(&link.note) {
404				// found the element: call the function, notify and remove
405				let res = (self.action)(&link.note);
406				link.notify();
407				cur.remove();
408
409				// store the result and break off the loop
410				out = Some(res);
411				break;
412			}
413
414			// move to the next/prev element
415			if FIRST {
416				cur.move_next();
417			} else {
418				cur.move_prev();
419			}
420		}
421
422		// restore the chain
423		self.chain.inner.replace(chain);
424
425		out
426	}
427}
428
429impl<N, P, const FIRST: bool> Trigger for NotifyOne<'_, N, P, (), FIRST>
430where
431	N: Subscriber,
432	P: Fn(&N) -> bool,
433{
434	type Output = ();
435
436	fn trigger(self) -> Self::Output {
437		// take a local copy of the chain and replace it with an empty list
438		let mut chain = self.chain.inner.replace(LinkedList::new(LinkAdapter::NEW));
439
440		// get a mutable cursor for iteration and removal
441		let mut cur = if FIRST {
442			chain.front_mut()
443		} else {
444			chain.back_mut()
445		};
446
447		// iterate over the elements
448		while let Some(link) = cur.get() {
449			// check the predicate
450			if (self.filter)(&link.note) {
451				// found the element: notify and remove
452				link.notify();
453				cur.remove();
454				break;
455			}
456
457			// move to the next/prev element
458			if FIRST {
459				cur.move_next();
460			} else {
461				cur.move_prev();
462			}
463		}
464
465		// restore the chain
466		self.chain.inner.replace(chain);
467	}
468}
469
470impl<N, F, R, const FIRST: bool> Trigger for NotifyOne<'_, N, (), F, FIRST>
471where
472	N: Subscriber,
473	F: FnOnce(&N) -> R,
474{
475	type Output = Option<R>;
476
477	fn trigger(self) -> Self::Output {
478		// immediately select the first link to notifiy
479		if FIRST {
480			self.chain.pop_front()
481		} else {
482			self.chain.pop_back()
483		}
484		.map(move |link| {
485			// run the provided closure before notifying
486			let res = (self.action)(&link.note);
487			link.notify();
488			res
489		})
490	}
491}
492
493impl<N, const FIRST: bool> Trigger for NotifyOne<'_, N, (), (), FIRST>
494where
495	N: Subscriber,
496{
497	type Output = ();
498
499	fn trigger(self) -> Self::Output {
500		// immediately select the first link to notify
501		if let Some(link) = if FIRST {
502			self.chain.pop_front()
503		} else {
504			self.chain.pop_back()
505		} {
506			link.notify();
507		}
508	}
509}
510
511/* *************************************************************** Notify All */
512
513/// Notification builder that locates, notifies and removes all fitting objects
514/// in a [Chain].
515#[must_use = "value must be consumed with go() in order to run"]
516pub struct NotifyAll<'c, N, P = (), F = (), const REV: bool = false> {
517	/// A reference to the [Chain].
518	chain: &'c Chain<N>,
519	/// A boolean function that is used to skip elements that are not ready to
520	/// be notified.
521	filter: P,
522	/// A function that is called for every fitting element immediately prior
523	/// to its notification.
524	///
525	/// This function may be used to aggregate data over the sequence of fitting
526	/// objects.
527	action: F,
528}
529
530impl<'c, N: Subscriber> NotifyAll<'c, N, (), ()> {
531	/// Creates a notification configuration that notifies all the objects
532	/// contained in the chain.
533	///
534	/// All successfully notified objects are removed from the chain.
535	const fn new(chain: &'c Chain<N>) -> Self {
536		NotifyAll {
537			chain,
538			filter: (),
539			action: (),
540		}
541	}
542}
543
544impl<'c, N: Subscriber, P, F, const REV: bool> NotifyAll<'c, N, P, F, REV> {
545	/// Configures the notification configuration with a boolean predicate that
546	/// allows skipping the objects that aren't ready to be notified.
547	pub fn filter<X: Fn(&N) -> bool>(self, filter: X) -> NotifyAll<'c, N, X, F> {
548		NotifyAll {
549			chain: self.chain,
550			filter,
551			action: self.action,
552		}
553	}
554
555	/// Configures what happens for every suitable object that has been found
556	/// for notification right before it is actually notified.
557	pub fn then<X: FnMut(&N)>(self, action: X) -> NotifyAll<'c, N, P, X> {
558		NotifyAll {
559			chain: self.chain,
560			filter: self.filter,
561			action,
562		}
563	}
564
565	/// Triggers the notification.
566	pub fn go(self) -> <Self as Trigger>::Output
567	where
568		Self: Trigger,
569	{
570		Trigger::trigger(self)
571	}
572}
573
574impl<'c, N: Subscriber, P, F> NotifyAll<'c, N, P, F, true> {
575	/// Reverses the order when triggering.
576	pub fn rev(self) -> NotifyAll<'c, N, P, F, false> {
577		NotifyAll {
578			chain: self.chain,
579			filter: self.filter,
580			action: self.action,
581		}
582	}
583}
584
585impl<'c, N: Subscriber, P, F> NotifyAll<'c, N, P, F, false> {
586	/// Reverses the order when triggering.
587	pub fn rev(self) -> NotifyAll<'c, N, P, F, true> {
588		NotifyAll {
589			chain: self.chain,
590			filter: self.filter,
591			action: self.action,
592		}
593	}
594}
595
596impl<N, P, F, const REV: bool> Trigger for NotifyAll<'_, N, P, F, REV>
597where
598	N: Subscriber,
599	P: Fn(&N) -> bool,
600	F: FnMut(&N),
601{
602	type Output = ();
603
604	fn trigger(mut self) -> Self::Output {
605		// take a local copy of the chain and replace it with an empty list
606		let mut chain = self.chain.inner.replace(LinkedList::new(LinkAdapter::NEW));
607
608		// get a mutable cursor for iteration and removal
609		let mut cur = if REV {
610			chain.back_mut()
611		} else {
612			chain.front_mut()
613		};
614
615		// iterate over the elements
616		while let Some(link) = cur.get() {
617			// check the predicate
618			if (self.filter)(&link.note) {
619				// located an element: call the function, notify and remove
620				(self.action)(&link.note);
621				link.notify();
622				cur.remove();
623			} else if !REV {
624				cur.move_next();
625			}
626
627			if REV {
628				cur.move_prev();
629			}
630		}
631
632		// restore the chain
633		self.chain.inner.replace(chain);
634	}
635}
636
637impl<N, P, const REV: bool> Trigger for NotifyAll<'_, N, P, (), REV>
638where
639	N: Subscriber,
640	P: Fn(&N) -> bool,
641{
642	type Output = ();
643
644	fn trigger(self) -> Self::Output {
645		// take a local copy of the chain and replace it with an empty list
646		let mut chain = self.chain.inner.replace(LinkedList::new(LinkAdapter::NEW));
647
648		// get a mutable cursor for iteration and removal
649		let mut cur = if REV {
650			chain.back_mut()
651		} else {
652			chain.front_mut()
653		};
654
655		// iterate over the elements
656		while let Some(link) = cur.get() {
657			// check the predicate
658			if (self.filter)(&link.note) {
659				// located an element: remove and notify
660				cur.remove().unwrap().notify();
661			} else if !REV {
662				cur.move_next();
663			}
664
665			if REV {
666				cur.move_prev();
667			}
668		}
669
670		// restore the chain
671		self.chain.inner.replace(chain);
672	}
673}
674
675impl<N, F, const REV: bool> Trigger for NotifyAll<'_, N, (), F, REV>
676where
677	N: Subscriber,
678	F: FnMut(&N),
679{
680	type Output = ();
681
682	fn trigger(mut self) -> Self::Output {
683		while let Some(link) = if REV {
684			self.chain.pop_back()
685		} else {
686			self.chain.pop_front()
687		} {
688			(self.action)(&link.note);
689			link.notify();
690		}
691	}
692}
693
694impl<N: Subscriber, const REV: bool> Trigger for NotifyAll<'_, N, (), (), REV> {
695	type Output = ();
696
697	fn trigger(self) -> Self::Output {
698		while let Some(link) = if REV {
699			self.chain.pop_back()
700		} else {
701			self.chain.pop_front()
702		} {
703			link.notify();
704		}
705	}
706}
707
708/* *********************************************************** Waker Notifier */
709
710impl Subscriber for Waker {
711	fn notify(&self) {
712		self.wake_by_ref();
713	}
714}