1#![no_std]
2
3use ach_cell as ach;
4use ach_util::Error;
5use async_ach_notify::{Listener, Notify};
6use core::future::Future;
7use core::ops::Deref;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11pub struct Ref<'a, T, const MP: usize, const MC: usize> {
12 parent: &'a Cell<T, MP, MC>,
13 val: ach::Ref<'a, T>,
14}
15impl<'a, T, const MP: usize, const MC: usize> Deref for Ref<'a, T, MP, MC> {
16 type Target = ach::Ref<'a, T>;
17 fn deref(&self) -> &Self::Target {
18 &self.val
19 }
20}
21impl<'a, T, const MP: usize, const MC: usize> Drop for Ref<'a, T, MP, MC> {
22 fn drop(&mut self) {
23 if self.val.ref_num() == Ok(1) {
24 if self.val.will_remove() {
25 self.parent.consumer.notify_one();
26 } else {
27 self.parent.producer.notify_one();
28 }
29 }
30 }
31}
32
33pub struct Cell<T, const MP: usize, const MC: usize> {
34 val: ach::Cell<T>,
35 consumer: Notify<MP>,
36 producer: Notify<MC>,
37}
38impl<T, const MP: usize, const MC: usize> Cell<T, MP, MC> {
39 pub const fn new() -> Self {
40 Self {
41 val: ach::Cell::new(),
42 consumer: Notify::new(),
43 producer: Notify::new(),
44 }
45 }
46 pub const fn new_with(val: T) -> Self {
47 Self {
48 val: ach::Cell::new_with(val),
49 consumer: Notify::new(),
50 producer: Notify::new(),
51 }
52 }
53}
54impl<T: Unpin, const MP: usize, const MC: usize> Cell<T, MP, MC> {
55 pub unsafe fn peek(&self) -> &T {
56 self.val.peek()
57 }
58 pub fn try_get(&'_ self) -> Result<Ref<'_, T, MP, MC>, Error<()>> {
62 self.val.try_get().map(|x| Ref {
63 parent: self,
64 val: x,
65 })
66 }
67 pub fn get(&self) -> Get<'_, T, MP, MC> {
71 Get {
72 parent: self,
73 wait_p: self.producer.listen(),
74 }
75 }
76 pub fn try_set(&self, val: T) -> Result<(), Error<T>> {
80 self.val.try_set(val).inspect(|_x| {
81 self.producer.notify_one();
82 })
83 }
84 pub fn set(&self, val: T) -> Set<'_, T, MP, MC> {
88 Set {
89 parent: self,
90 wait_c: self.consumer.listen(),
91 val: Some(val),
92 }
93 }
94 pub fn try_take(&self) -> Result<Option<T>, Error<()>> {
98 self.val.try_take().inspect(|_x| {
99 self.consumer.notify_one();
100 })
101 }
102 pub fn take(&self) -> Take<'_, T, MP, MC> {
104 Take {
105 parent: self,
106 wait_p: self.producer.listen(),
107 wait_c: self.consumer.listen(),
108 }
109 }
110 pub fn try_replace(&self, val: T) -> Result<Option<T>, Error<T>> {
114 self.val.try_replace(val).inspect(|_x| {
115 self.producer.notify_one();
116 })
117 }
118 pub fn replace(&self, val: T) -> Replace<'_, T, MP, MC> {
120 Replace {
121 parent: self,
122 wait_p: self.producer.listen(),
123 wait_c: self.consumer.listen(),
124 val: Some(val),
125 }
126 }
127}
128
129pub struct Get<'a, T, const MP: usize, const MC: usize> {
130 parent: &'a Cell<T, MP, MC>,
131 wait_p: Listener<'a, MC>,
132}
133impl<'a, T: Unpin, const MP: usize, const MC: usize> Future for Get<'a, T, MP, MC> {
134 type Output = Result<Ref<'a, T, MP, MC>, Error<()>>;
135 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136 match self.parent.try_get() {
137 Ok(v) => Poll::Ready(Ok(v)),
138 Err(err) if err.retry => {
139 if Pin::new(&mut self.wait_p).poll(cx).is_ready() {
140 self.poll(cx)
141 } else {
142 Poll::Pending
143 }
144 }
145 Err(err) => Poll::Ready(Err(err)),
146 }
147 }
148}
149pub struct Set<'a, T, const MP: usize, const MC: usize> {
150 parent: &'a Cell<T, MP, MC>,
151 wait_c: Listener<'a, MP>,
152 val: Option<T>,
153}
154impl<'a, T: Unpin, const MP: usize, const MC: usize> Future for Set<'a, T, MP, MC> {
155 type Output = Result<(), Error<T>>;
156 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157 let val = self.val.take().expect("resumed after completion");
158 match self.parent.try_set(val) {
159 Ok(v) => Poll::Ready(Ok(v)),
160 Err(err) if err.retry => {
161 self.val = Some(err.input);
162 if Pin::new(&mut self.wait_c).poll(cx).is_ready() {
163 self.poll(cx)
164 } else {
165 Poll::Pending
166 }
167 }
168 Err(err) => Poll::Ready(Err(err)),
169 }
170 }
171}
172
173pub struct Take<'a, T, const MP: usize, const MC: usize> {
174 parent: &'a Cell<T, MP, MC>,
175 wait_p: Listener<'a, MC>,
176 wait_c: Listener<'a, MP>,
177}
178impl<'a, T: Unpin, const MP: usize, const MC: usize> Future for Take<'a, T, MP, MC> {
179 type Output = Option<T>;
180 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181 match self.parent.try_take() {
182 Ok(v) => Poll::Ready(v),
183 Err(_) => {
184 if Pin::new(&mut self.wait_p).poll(cx).is_ready()
185 || Pin::new(&mut self.wait_c).poll(cx).is_ready()
186 {
187 self.poll(cx)
188 } else {
189 Poll::Pending
190 }
191 }
192 }
193 }
194}
195pub struct Replace<'a, T, const MP: usize, const MC: usize> {
196 parent: &'a Cell<T, MP, MC>,
197 wait_p: Listener<'a, MC>,
198 wait_c: Listener<'a, MP>,
199 val: Option<T>,
200}
201impl<'a, T: Unpin, const MP: usize, const MC: usize> Future for Replace<'a, T, MP, MC> {
202 type Output = Option<T>;
203 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
204 let val = self.val.take().expect("resumed after completion");
205 match self.parent.try_replace(val) {
206 Ok(v) => Poll::Ready(v),
207 Err(err) => {
208 let _ = Pin::new(&mut self.wait_p).poll(cx);
209 let _ = Pin::new(&mut self.wait_c).poll(cx);
210 match self.parent.try_replace(err.input) {
211 Ok(v) => Poll::Ready(v),
212 Err(err) => {
213 self.val = Some(err.input);
214 Poll::Pending
215 }
216 }
217 }
218 }
219 }
220}