ruspiro_lock/async/
asyncsemaphore.rs

1/***********************************************************************************************************************
2 * Copyright (c) 2020 by the authors
3 *
4 * Author: André Borrmann <pspwizard@gmx.de>
5 * License: Apache License 2.0 / MIT
6 **********************************************************************************************************************/
7
8//! # Async Semaphore
9//!
10
11extern crate alloc;
12
13use crate::sync::{Mutex, Semaphore};
14use alloc::{collections::BTreeMap, sync::Arc};
15use core::{
16  future::Future,
17  pin::Pin,
18  task::{Context, Poll, Waker},
19};
20
21pub struct AsyncSemaphore {
22  inner: Arc<Mutex<AsyncSemaphoreInner>>,
23  sema: Arc<Semaphore>,
24}
25
26impl AsyncSemaphore {
27  pub fn new(initial: u32) -> Self {
28    Self {
29      inner: Arc::new(Mutex::new(AsyncSemaphoreInner::new())),
30      sema: Arc::new(Semaphore::new(initial)),
31    }
32  }
33
34  pub async fn down(&self) {
35    // if we cann't immediately pull the semaphore down we need to use a future to poll the
36    // result
37    if self.sema.try_down().is_err() {
38      let mut inner = self.inner.lock();
39      let current_id = inner.next_waiter;
40      inner.next_waiter += 1;
41      drop(inner);
42
43      AsyncSemaphoreFuture::new(Arc::clone(&self.inner), Arc::clone(&self.sema), current_id).await
44    }
45  }
46
47  /// when increasing the [AsyncSemaphore] we will increase the embedded [Semaphore] and notify the next waiter in the
48  /// list that previously did not got the chance to decrease the [Semaphore]
49  pub fn up(&self) {
50    self.sema.up();
51
52    let mut inner = self.inner.lock();
53    if let Some(&waiter_id) = inner.waiter.keys().next() {
54      let waiter = inner.waiter.remove(&waiter_id).unwrap();
55      waiter.wake();
56    }
57  }
58}
59
60/// The `Future` that represents an `await`able semaphore down request to an [AsyncSemaphore] and can only be created
61/// from functions of the [AsyncSemaphore]
62struct AsyncSemaphoreFuture {
63  inner: Arc<Mutex<AsyncSemaphoreInner>>,
64  sema: Arc<Semaphore>,
65  id: usize,
66}
67
68impl AsyncSemaphoreFuture {
69  fn new(inner: Arc<Mutex<AsyncSemaphoreInner>>, sema: Arc<Semaphore>, id: usize) -> Self {
70    Self { inner, sema, id }
71  }
72}
73
74impl Future for AsyncSemaphoreFuture {
75  type Output = ();
76
77  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78    let this = self.get_mut();
79
80    if this.sema.try_down().is_ok() {
81      Poll::Ready(())
82    } else {
83      let mut inner = this.inner.lock();
84      inner.waiter.insert(this.id, cx.waker().clone());
85      drop(inner);
86
87      Poll::Pending
88    }
89  }
90}
91
/// Bookkeeping of the requests that are waiting for the semaphore.
struct AsyncSemaphoreInner {
  /// The wakers of the waiting requests, keyed by the id of the request. As the map is ordered by its key the
  /// first entry belongs to the lowest id.
  waiter: BTreeMap<usize, Waker>,
  /// Running counter the ids of the requests are drawn from. It starts at 0.
  next_waiter: usize,
}

impl AsyncSemaphoreInner {
  /// Create the initial state: nobody is waiting and the first id to hand out is 0.
  fn new() -> Self {
    let waiter = BTreeMap::new();
    let next_waiter = 0;

    AsyncSemaphoreInner { waiter, next_waiter }
  }
}
108}