odem_rs_sync/
facility.rs

1//! This module contains support for a GPSS-like facility.
2
3use core::{
4	cell::Cell,
5	future::Future,
6	pin::Pin,
7	task::Waker,
8	task::{Context, Poll},
9};
10
11use crate::{
12	Publisher,
13	chain::{Chain, Link},
14	error::FacilityOccupied,
15};
16
17/* ***************************************************************** Facility */
18
/// GPSS-inspired facility that houses one exclusive resource to be used by
/// arbitrarily many processes.
///
/// The facility is occupied through [Facility::try_seize] or
/// [Facility::seize]; both hand out a [SeizeGuard] that vacates the facility
/// again once it is dropped.
pub struct Facility {
	/// A queue of waiting-to-be-activated processes, represented by the
	/// wakers they registered while the facility was occupied.
	queue: Chain<Waker>,
	/// A boolean flag indicating whether the facility is in use or not.
	/// Stored in a [Cell] so it can be toggled through a shared reference.
	in_use: Cell<bool>,
}
27
28impl Facility {
29	/// Creates a new facility.
30	pub fn new() -> Self {
31		Facility {
32			queue: Chain::new(),
33			in_use: Cell::new(false),
34		}
35	}
36
37	/// Attempts to seize the facility without blocking, returning `Err` if
38	/// it failed and `Ok` otherwise.
39	pub fn try_seize(&self) -> Result<SeizeGuard<'_>, FacilityOccupied> {
40		if self.in_use.replace(true) {
41			Err(FacilityOccupied)
42		} else {
43			Ok(SeizeGuard(self))
44		}
45	}
46
47	/// Attempts to seize the facility, blocking until it's possible.
48	pub fn seize(&self) -> impl Future<Output = SeizeGuard<'_>> + '_ {
49		SeizeFuture::new(self)
50	}
51
52	/// Releases the facility and activates the next waiting process.
53	fn release(&self) {
54		self.in_use.set(false);
55		self.queue.notify_one().go();
56	}
57}
58
59impl Default for Facility {
60	fn default() -> Self {
61		Facility::new()
62	}
63}
64
65/* ************************************************************* Seize Future */
66
/// Future that blocks until the underlying [Facility] can be seized.
///
/// The future is pin-projected because it embeds an intrusive [Link] that is
/// handed to the facility's queue while waiting.
#[pin_project::pin_project(PinnedDrop)]
struct SeizeFuture<'f> {
	/// Shared reference to the facility.
	facility: &'f Facility,
	/// Intrusive [Link] into a [Chain]. Initially `None`; it is only
	/// populated by `poll` when the facility is found occupied and the
	/// future has to subscribe to the facility's queue.
	#[pin]
	link: Option<Link<Waker>>,
}
76
77impl<'f> SeizeFuture<'f> {
78	/// Creates the future from the [Facility].
79	const fn new(facility: &'f Facility) -> Self {
80		SeizeFuture {
81			facility,
82			link: None,
83		}
84	}
85}
86
87impl<'f> Future for SeizeFuture<'f> {
88	type Output = SeizeGuard<'f>;
89
90	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91		let mut inner = self.project();
92
93		// test if the link has been initialized previously without moving it
94		if let Some(_link) = inner.link.as_ref().as_pin_ref() {
95			// we must have been reactivated because the facility is free again
96
97			// note that it is not necessary to unsubscribe here, since the
98			// reactivation unsubscribed us automatically
99			debug_assert!(
100				!_link.is_linked(),
101				"reactivation should not be possible without being \
102				 unsubscribed from the chain"
103			);
104
105			// drop the old link
106			inner.link.set(None);
107
108			debug_assert!(
109				!inner.facility.in_use.get(),
110				"reactivation should not be possible if the facility is \
111				 still in use"
112			);
113
114			// seize the facility
115
116			// note that we only get reactivated if the facility is free
117			inner.facility.in_use.set(true);
118		} else if inner.facility.in_use.replace(true) {
119			// the facility is currently in use and cannot be seized
120
121			// create the pinned link
122			inner.link.set(Some(Link::new(cx.waker().clone())));
123
124			// initialize the corresponding attribute
125			let link = unsafe { inner.link.as_ref().as_pin_ref().unwrap_unchecked() };
126
127			// register for reactivation
128			unsafe {
129				inner.facility.queue.subscribe(link);
130			}
131
132			// await reactivation
133			return Poll::Pending;
134		}
135
136		// the facility has successfully been seized by us, so return the guard
137		Poll::Ready(SeizeGuard(inner.facility))
138	}
139}
140
141#[pin_project::pinned_drop]
142impl PinnedDrop for SeizeFuture<'_> {
143	fn drop(self: Pin<&mut Self>) {
144		// make sure to unsubscribe
145		if let Some(link) = self.as_ref().project_ref().link.as_pin_ref() {
146			unsafe {
147				self.facility.queue.unsubscribe(link);
148			}
149		}
150	}
151}
152
153/* ************************************************************** Seize Guard */
154
/// Guards the referenced [Facility] to ensure that it is vacated at the end of
/// the Guard's scope.
///
/// Guards are handed out by [Facility::try_seize] and [Facility::seize];
/// dropping the guard (or calling [SeizeGuard::release]) releases the
/// facility.
#[must_use = "the lifetime of this guard restricts the time this facility is occupied for"]
pub struct SeizeGuard<'f>(&'f Facility);
159
160impl SeizeGuard<'_> {
161	/// Releases the facility managed by this guard.
162	pub fn release(self) {}
163}
164
165impl Drop for SeizeGuard<'_> {
166	fn drop(&mut self) {
167		self.0.release();
168	}
169}