1#![no_std]
2
3use ach_cell as ach;
4use ach_util::Error;
5use async_ach_notify::{Listener, Notify};
6use core::future::Future;
7use core::ops::Deref;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11pub struct Ref<'a, T, const MP: usize, const MC: usize> {
12 parent: &'a Cell<T, MP, MC>,
13 val: ach::Ref<'a, T>,
14}
15impl<'a, T, const MP: usize, const MC: usize> Deref for Ref<'a, T, MP, MC> {
16 type Target = ach::Ref<'a, T>;
17 fn deref(&self) -> &Self::Target {
18 &self.val
19 }
20}
21impl<'a, T, const MP: usize, const MC: usize> Drop for Ref<'a, T, MP, MC> {
22 fn drop(&mut self) {
23 if self.val.ref_num() == Ok(1) {
24 if self.val.will_remove() {
25 self.parent.consumer.notify_waiters();
26 } else {
27 self.parent.producer.notify_waiters();
28 }
29 }
30 }
31}
32
33pub struct Cell<T, const MP: usize, const MC: usize> {
34 val: ach::Cell<T>,
35 consumer: Notify<MP>,
36 producer: Notify<MC>,
37}
38impl<T, const MP: usize, const MC: usize> Cell<T, MP, MC> {
39 pub const fn new() -> Self {
40 Self {
41 val: ach::Cell::new(),
42 consumer: Notify::new(),
43 producer: Notify::new(),
44 }
45 }
46 pub const fn new_with(val: T) -> Self {
47 Self {
48 val: ach::Cell::new_with(val),
49 consumer: Notify::new(),
50 producer: Notify::new(),
51 }
52 }
53}
54impl<T: Unpin, const MP: usize, const MC: usize> Cell<T, MP, MC> {
55 pub unsafe fn peek(&self) -> &T {
56 self.val.peek()
57 }
58 pub fn try_get(&self) -> Result<Ref<T, MP, MC>, Error<()>> {
62 self.val.try_get().map(|x| Ref {
63 parent: self,
64 val: x,
65 })
66 }
67 pub fn get(&self) -> Get<'_, T, MP, MC> {
71 Get {
72 parent: self,
73 wait_p: self.producer.listen(),
74 }
75 }
76 pub fn try_set(&self, val: T) -> Result<(), Error<T>> {
80 self.val.try_set(val).map(|x| {
81 self.producer.notify_waiters();
82 x
83 })
84 }
85 pub fn set(&self, val: T) -> Set<'_, T, MP, MC> {
89 Set {
90 parent: self,
91 wait_c: self.consumer.listen(),
92 val: Some(val),
93 }
94 }
95 pub fn try_take(&self) -> Result<Option<T>, Error<()>> {
99 self.val.try_take().map(|x| {
100 self.consumer.notify_waiters();
101 x
102 })
103 }
104 pub fn take(&self) -> Take<'_, T, MP, MC> {
106 Take {
107 parent: self,
108 wait_p: self.producer.listen(),
109 wait_c: self.consumer.listen(),
110 }
111 }
112 pub fn try_replace(&self, val: T) -> Result<Option<T>, Error<T>> {
116 self.val.try_replace(val).map(|x| {
117 self.producer.notify_waiters();
118 x
119 })
120 }
121 pub fn replace(&self, val: T) -> Replace<'_, T, MP, MC> {
123 Replace {
124 parent: self,
125 wait_p: self.producer.listen(),
126 wait_c: self.consumer.listen(),
127 val: Some(val),
128 }
129 }
130}
131
132pub struct Get<'a, T, const MP: usize, const MC: usize> {
133 parent: &'a Cell<T, MP, MC>,
134 wait_p: Listener<'a, MC>,
135}
136impl<'a, T: Unpin, const MP: usize, const MC: usize> Future for Get<'a, T, MP, MC> {
137 type Output = Result<Ref<'a, T, MP, MC>, Error<()>>;
138 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139 match self.parent.try_get() {
140 Ok(v) => Poll::Ready(Ok(v)),
141 Err(err) if err.retry => {
142 if Pin::new(&mut self.wait_p).poll(cx).is_ready() {
143 self.poll(cx)
144 } else {
145 Poll::Pending
146 }
147 }
148 Err(err) => Poll::Ready(Err(err)),
149 }
150 }
151}
152pub struct Set<'a, T, const MP: usize, const MC: usize> {
153 parent: &'a Cell<T, MP, MC>,
154 wait_c: Listener<'a, MP>,
155 val: Option<T>,
156}
157impl<'a, T: Unpin, const MP: usize, const MC: usize> Future for Set<'a, T, MP, MC> {
158 type Output = Result<(), Error<T>>;
159 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
160 let val = self.val.take().expect("resumed after completion");
161 match self.parent.try_set(val) {
162 Ok(v) => Poll::Ready(Ok(v)),
163 Err(err) if err.retry => {
164 self.val = Some(err.input);
165 if Pin::new(&mut self.wait_c).poll(cx).is_ready() {
166 self.poll(cx)
167 } else {
168 Poll::Pending
169 }
170 }
171 Err(err) => Poll::Ready(Err(err)),
172 }
173 }
174}
175
176pub struct Take<'a, T, const MP: usize, const MC: usize> {
177 parent: &'a Cell<T, MP, MC>,
178 wait_p: Listener<'a, MC>,
179 wait_c: Listener<'a, MP>,
180}
181impl<'a, T: Unpin, const MP: usize, const MC: usize> Future for Take<'a, T, MP, MC> {
182 type Output = Option<T>;
183 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184 match self.parent.try_take() {
185 Ok(v) => Poll::Ready(v),
186 Err(_) => {
187 if Pin::new(&mut self.wait_p).poll(cx).is_ready()
188 || Pin::new(&mut self.wait_c).poll(cx).is_ready()
189 {
190 self.poll(cx)
191 } else {
192 Poll::Pending
193 }
194 }
195 }
196 }
197}
198pub struct Replace<'a, T, const MP: usize, const MC: usize> {
199 parent: &'a Cell<T, MP, MC>,
200 wait_p: Listener<'a, MC>,
201 wait_c: Listener<'a, MP>,
202 val: Option<T>,
203}
204impl<'a, T: Unpin, const MP: usize, const MC: usize> Future for Replace<'a, T, MP, MC> {
205 type Output = Option<T>;
206 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207 let val = self.val.take().expect("resumed after completion");
208 match self.parent.try_replace(val) {
209 Ok(v) => Poll::Ready(v),
210 Err(err) => {
211 let _ = Pin::new(&mut self.wait_p).poll(cx);
212 let _ = Pin::new(&mut self.wait_c).poll(cx);
213 match self.parent.try_replace(err.input) {
214 Ok(v) => Poll::Ready(v),
215 Err(err) => {
216 self.val = Some(err.input);
217 Poll::Pending
218 }
219 }
220 }
221 }
222 }
223}