ruspiro_lock/async/
asyncsemaphore.rs1extern crate alloc;
12
13use crate::sync::{Mutex, Semaphore};
14use alloc::{collections::BTreeMap, sync::Arc};
15use core::{
16 future::Future,
17 pin::Pin,
18 task::{Context, Poll, Waker},
19};
20
21pub struct AsyncSemaphore {
22 inner: Arc<Mutex<AsyncSemaphoreInner>>,
23 sema: Arc<Semaphore>,
24}
25
26impl AsyncSemaphore {
27 pub fn new(initial: u32) -> Self {
28 Self {
29 inner: Arc::new(Mutex::new(AsyncSemaphoreInner::new())),
30 sema: Arc::new(Semaphore::new(initial)),
31 }
32 }
33
34 pub async fn down(&self) {
35 if self.sema.try_down().is_err() {
38 let mut inner = self.inner.lock();
39 let current_id = inner.next_waiter;
40 inner.next_waiter += 1;
41 drop(inner);
42
43 AsyncSemaphoreFuture::new(Arc::clone(&self.inner), Arc::clone(&self.sema), current_id).await
44 }
45 }
46
47 pub fn up(&self) {
50 self.sema.up();
51
52 let mut inner = self.inner.lock();
53 if let Some(&waiter_id) = inner.waiter.keys().next() {
54 let waiter = inner.waiter.remove(&waiter_id).unwrap();
55 waiter.wake();
56 }
57 }
58}
59
60struct AsyncSemaphoreFuture {
63 inner: Arc<Mutex<AsyncSemaphoreInner>>,
64 sema: Arc<Semaphore>,
65 id: usize,
66}
67
68impl AsyncSemaphoreFuture {
69 fn new(inner: Arc<Mutex<AsyncSemaphoreInner>>, sema: Arc<Semaphore>, id: usize) -> Self {
70 Self { inner, sema, id }
71 }
72}
73
74impl Future for AsyncSemaphoreFuture {
75 type Output = ();
76
77 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78 let this = self.get_mut();
79
80 if this.sema.try_down().is_ok() {
81 Poll::Ready(())
82 } else {
83 let mut inner = this.inner.lock();
84 inner.waiter.insert(this.id, cx.waker().clone());
85 drop(inner);
86
87 Poll::Pending
88 }
89 }
90}
91
/// Waiter bookkeeping shared between an `AsyncSemaphore` and its pending futures.
struct AsyncSemaphoreInner {
    /// Wakers of the tasks currently waiting, keyed by waiter id. The map is
    /// ordered by key, so its first entry is the waiter with the lowest id.
    waiter: BTreeMap<usize, Waker>,
    /// Id that will be handed to the next waiter.
    next_waiter: usize,
}
100
101impl AsyncSemaphoreInner {
102 fn new() -> Self {
103 Self {
104 waiter: BTreeMap::new(),
105 next_waiter: 0,
106 }
107 }
108}