Skip to main content

operation_queue/
line_token.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5//! Helpers for synchronizing operations (e.g. error handling) across futures.
6//!
7//! This module revolves around the [`Line`] struct, which is an asynchronous
8//! flow control structure that behaves a bit like a mutex, with the exception
9//! that consumers waiting for the [`Line`] to be released do not subsequently
10//! lock it.
11//!
12//! The design of a [`Line`] is inspired by that of a [one-track railway
13//! line](https://en.wikipedia.org/wiki/Token_(railway_signalling)). To avoid
14//! collisions, conductors must acquire a token at the entrance to the line that
15//! ensures they're the only one on it. If the token is being held, traffic
16//! around this line stops until it's released again.
17//!
18//! Similarly, in a context with multiple parallel [`Future`]s, it might be
19//! necessary to ensure only one takes care of a given operation. For example,
20//! if multiple requests are being performed against the same service, and one
21//! of them hits an authentication error, it is likely the others will as well.
22//! In this case, it is preferable to only let one future handle the error than
23//! let every request re-authenticate independently (in this example,
24//! credentials are the same across requests, and multiple simultaneous
25//! authentication attempts might cause issues with complex flows).
26//!
27//! Each future holds a shared reference to a [`Line`] (e.g. wrapped in an [`Rc`] or an
28//! [`Arc`]). Whenever a future needs to perform an operation that should only
29//! be performed once at a time, it attempts to acquire the line's token with
30//! [`Line::try_acquire_token`]. This function returns an enum
31//! ([`AcquireOutcome`]) describing one of two cases:
32//!
33//! * The line's token is available and has been acquired, and the future can
34//!   start performing the operation immediately. It is granted the line's
35//!   [`Token`], which it must hold in scope for the duration of the operation,
36//!   as dropping it releases the line.
37//! * The line's token has already been acquired by another future, in which
38//!   case the future must wait for the line to become available again. When the
39//!   line becomes available again, the future does not need to acquire another
40//!   token, as another future should have taken care of performing the
41//!   operation.
42//!
43//! [`OperationQueue`]: crate::operation_queue::OperationQueue
44//! [`Future`]: std::future::Future
45//! [`Rc`]: std::rc::Rc
46//! [`Arc`]: std::sync::Arc
47
48use std::cell::RefCell;
49
50use futures::{FutureExt, future::Shared};
51use oneshot::{Receiver, Sender};
52
53/// A oneshot channel used internally by a [`Line`] that's been acquired to
54/// communicate that the token has been dropped and the line was released.
55///
56/// The channel's [`Receiver`] is wrapped in a [`Shared`] that can be cloned
57/// when a new consumer tries and fails to acquire a token for the line.
58struct ReleaseChannel {
59    sender: Sender<()>,
60    receiver: Shared<Receiver<()>>,
61}
62
63/// A [`Line`] from which a [`Token`] can be acquired.
64#[derive(Default)]
65pub struct Line {
66    // TODO: We should look into replacing this `RefCell` with a `Mutex` from
67    // `async_lock` to make `Line` thread-safe.
68    // https://github.com/thunderbird/operation-queue-rs/issues/2
69    channel: RefCell<Option<ReleaseChannel>>,
70}
71
72impl Line {
73    /// Instantiates a new [`Line`].
74    pub fn new() -> Line {
75        Line {
76            channel: Default::default(),
77        }
78    }
79
80    /// Attempts to acquire a [`Token`] for this line.
81    ///
82    /// The [`Token`] automatically releases the line upon leaving the current
83    /// scope and getting dropped.
84    ///
85    /// If a [`Token`] has already been acquired for this line, a future to
86    /// `await` is returned instead. It resolves when the current token holder
87    /// has finished handling the current error and releases the line.
88    pub fn try_acquire_token<'l>(&'l self) -> AcquireOutcome<'l> {
89        if let Some(channel) = self.channel.borrow().as_ref() {
90            // Since the oneshot `Receiver` is wrapped in a `Shared`, cloning it
91            // will return a new handle on the `Shared` which will resolve at
92            // the same time as the others.
93            return AcquireOutcome::Failure(channel.receiver.clone());
94        }
95
96        // The line is currently available, create a new channel and give the
97        // consumer their token.
98        let (sender, receiver) = oneshot::channel();
99        self.channel.replace(Some(ReleaseChannel {
100            sender,
101            receiver: receiver.shared(),
102        }));
103
104        AcquireOutcome::Success(Token { line: self })
105    }
106
107    /// Releases the line, and resolves the [`Shared`] future other consumers
108    /// might be awaiting.
109    pub(self) fn release(&self) {
110        // "Take" the channel out of the `RefCell`; on top of letting us access
111        // its content, we're also making sure that even if something bad
112        // happens then the line can be acquired again.
113        match self.channel.take() {
114            Some(channel) => match channel.sender.send(()) {
115                Ok(_) => (),
116                Err(_) => log::error!("trying to release using a closed channel"),
117            },
118            None => log::error!("trying to release before acquiring"),
119        };
120    }
121}
122
123/// The outcome from trying to acquire a [`Token`] for a [`Line`].
124#[must_use = "if the token is unused the line will immediately release again"]
125pub enum AcquireOutcome<'ao> {
126    /// The line could be acquired and returned a token to hold on to.
127    ///
128    /// The token must remain in scope, as it will release the line when
129    /// dropped.
130    Success(Token<'ao>),
131
132    /// The line could not be acquired as another consumer is holding a token
133    /// for it.
134    ///
135    /// This variant includes a [`Shared`] future that resolves when the current
136    /// token holder drops it and releases the line.
137    Failure(Shared<Receiver<()>>),
138}
139
140impl<'ao> AcquireOutcome<'ao> {
141    /// Returns the [`AcquireOutcome`] if it's a success, otherwise returns a
142    /// success with the provided token if it's not [`None`].
143    ///
144    /// If the current [`AcquireOutcome`] is a failure, and the provided token
145    /// is [`None`], the failure is returned.
146    ///
147    /// # Design considerations
148    ///
149    /// One way to make this method more straightforward could have been to make
150    /// `token` be a [`Token`], not an [`Option`], but the current signature was
151    /// picked to simplify the consumers (which store the token, if any, in an
152    /// [`Option`]).
153    pub fn or_token(self, token: Option<Token<'ao>>) -> Self {
154        match self {
155            AcquireOutcome::Success(_) => self,
156            AcquireOutcome::Failure(_) => match token {
157                Some(token) => AcquireOutcome::Success(token),
158                None => self,
159            },
160        }
161    }
162}
163
164/// A token that symbolizes the current consumer holds exclusive access to the
165/// corresponding [`Line`].
166///
167/// The [`Line`] is automatically released when this token goes out of scope and
168/// is dropped.
169#[must_use = "if unused the line will immediately release again"]
170pub struct Token<'t> {
171    line: &'t Line,
172}
173
174impl Drop for Token<'_> {
175    fn drop(&mut self) {
176        self.line.release();
177    }
178}
179
#[cfg(test)]
mod tests {
    use tokio::time::Duration;

    use super::*;

    /// Acquires the token of a line that is expected to be free, and panics if
    /// the line turns out to be held.
    fn get_token(line: &Line) -> Token<'_> {
        let AcquireOutcome::Success(token) = line.try_acquire_token() else {
            panic!("expected a token from try_acquire_token()");
        };

        token
    }

    /// A second acquisition attempt fails while the first token is alive.
    #[test]
    fn acquire_token() {
        let line = Line::new();

        let _token = get_token(&line);

        assert!(
            matches!(line.try_acquire_token(), AcquireOutcome::Failure(_)),
            "should not be able to acquire the line while the token is in scope"
        );
    }

    /// The line can be acquired again once the first token has been dropped.
    #[test]
    fn token_out_of_scope() {
        let line = Line::new();

        {
            let _token = get_token(&line);

            assert!(
                matches!(line.try_acquire_token(), AcquireOutcome::Failure(_)),
                "should not be able to acquire the line while the token is in scope"
            );
        }

        assert!(
            matches!(line.try_acquire_token(), AcquireOutcome::Success(_)),
            "expected a token now that the previous token has been dropped"
        );
    }

    /// A failed acquisition falls back on the token that is already held.
    #[test]
    fn or_token() {
        let line = Line::new();

        let token = get_token(&line);

        let outcome = line.try_acquire_token().or_token(Some(token));
        assert!(
            matches!(outcome, AcquireOutcome::Success(_)),
            "we should have kept our token"
        );
    }

    /// Dropping the token resolves the future handed to waiting consumers.
    #[tokio::test(flavor = "current_thread")]
    async fn line_release_on_drop() {
        let line = Line::new();

        // Success flag for the test; only set once the waiting future has
        // observed the release of the line.
        let mut success = false;

        // Holds the line's token for a little while (10ms) before dropping it.
        // The sleep gives `wait_for_line` the time to try (and fail) to
        // acquire the token before it is dropped.
        async fn acquire_sleep_and_drop(line: &Line) {
            let _token = get_token(line);
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        // Tries (and fails) to acquire the token, waits for the line to become
        // available again, then sets the success flag.
        async fn wait_for_line(line: &Line, success: &mut bool) {
            let AcquireOutcome::Failure(shared) = line.try_acquire_token() else {
                panic!("should not be able to acquire the line while the token is in scope")
            };

            shared.await.unwrap();
            *success = true;
        }

        // Run both futures concurrently. `biased;` makes sure they are polled
        // in order, so `acquire_sleep_and_drop` gets to run first.
        tokio::join! {
            biased;
            acquire_sleep_and_drop(&line),
            wait_for_line(&line, &mut success),
        };

        assert!(success)
    }
}