1#![deny(unsafe_code)]
2use core::{
3 fmt::{self, Debug, Formatter},
4 future::Future,
5 pin::Pin,
6 sync::atomic::{AtomicBool, Ordering},
7 task::{Context, Poll, Waker},
8};
9use std::{
10 error::Error,
11 mem,
12 sync::{Condvar, Mutex},
13};
14use std::{sync::Arc, thread};
15
16#[cfg(feature = "compact")]
17mod compact;
18#[cfg(feature = "compact")]
19pub use compact::*;
20
21use error::Cause;
22pub mod error;
23enum TypeOpt<S, R>
24where
25 S: Send,
26 R: Send,
27{
28 Channel(S),
29 Success(R),
30 Error(Cause),
31 None,
32}
33
34impl<S, R> Default for TypeOpt<S, R>
35where
36 S: Send,
37 R: Send,
38{
39 fn default() -> Self {
40 Self::None
41 }
42}
43
44impl<S, R> Debug for TypeOpt<S, R>
45where
46 S: Send + Debug,
47 R: Send + Debug,
48{
49 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
50 match self {
51 Self::Channel(s) => f.debug_tuple("Channel").field(s).finish(),
52 Self::Success(r) => f.debug_tuple("Success").field(r).finish(),
53 Self::Error(e) => f.debug_tuple("Error").field(e).finish(),
54 Self::None => write!(f, "None"),
55 }
56 }
57}
58
59impl<S, R> TypeOpt<S, R>
60where
61 S: Send,
62 R: Send,
63{
64 fn take(&mut self) -> Self {
65 mem::take(self)
66 }
67}
68
69struct InnerState<S, R>
70where
71 S: Send,
72 R: Send,
73{
74 activated: AtomicBool,
75 result_ready: AtomicBool,
76 channel_present: AtomicBool,
77 mtx: Mutex<TypeOpt<S, R>>,
78 cvar: Condvar,
79 canceled: AtomicBool,
80}
81
82impl<S, R> Debug for InnerState<S, R>
83where
84 S: Send + Debug,
85 R: Send + Debug,
86{
87 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
88 f.debug_struct("InnerState")
89 .field("result_ready", &self.result_ready)
90 .field("channel_present", &self.channel_present)
91 .field("mtx", &self.mtx)
92 .field("cvar", &self.cvar)
93 .field("canceled", &self.canceled)
94 .field("activated", &self.activated)
95 .finish()
96 }
97}
98
99impl<S, R> Drop for InnerState<S, R>
100where
101 S: Send,
102 R: Send,
103{
104 fn drop(&mut self) {}
105}
106
107pub struct FlowerState<S, R>
109where
110 S: Send,
111 R: Send,
112{
113 state: Arc<InnerState<S, R>>,
114 async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
115 id: usize,
116}
117
118impl<S, R> Debug for FlowerState<S, R>
119where
120 S: Send + Debug,
121 R: Send + Debug,
122{
123 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
124 f.debug_struct("FlowerState")
125 .field("state", &self.state)
126 .field("async_suspender", &self.async_suspender)
127 .field("id", &self.id)
128 .finish()
129 }
130}
131
132impl<S, R> FlowerState<S, R>
133where
134 S: Send,
135 R: Send,
136{
137 pub fn id(&self) -> usize {
139 self.id
140 }
141
142 pub fn cancel(&self) {
146 self.state.canceled.store(true, Ordering::Relaxed);
147 }
148
149 pub fn is_canceled(&self) -> bool {
151 self.state.canceled.load(Ordering::Relaxed)
152 }
153
154 pub fn is_active(&self) -> bool {
156 self.state.activated.load(Ordering::Relaxed)
157 }
158}
159
160impl<S, R> Clone for FlowerState<S, R>
161where
162 S: Send,
163 R: Send,
164{
165 fn clone(&self) -> Self {
166 Self {
167 state: Clone::clone(&self.state),
168 async_suspender: Clone::clone(&self.async_suspender),
169 id: self.id,
170 }
171 }
172}
173
174impl<S, R> Drop for FlowerState<S, R>
175where
176 S: Send,
177 R: Send,
178{
179 fn drop(&mut self) {}
180}
181
182struct AsyncSuspender {
183 inner: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
184}
185
186impl Future for AsyncSuspender {
187 type Output = ();
188 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189 let mut mtx = self.inner.0.lock().unwrap();
190 if !self.inner.1.load(Ordering::Relaxed) {
191 Poll::Ready(())
192 } else {
193 *mtx = Some(cx.waker().clone());
194 Poll::Pending
195 }
196 }
197}
198
199pub struct Handle<S, R>
201where
202 S: Send,
203 R: Send,
204{
205 state: Arc<InnerState<S, R>>,
206 async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
207 id: usize,
208}
209
210impl<S, R> Handle<S, R>
211where
212 S: Send,
213 R: Send,
214{
215 pub fn id(&self) -> usize {
217 self.id
218 }
219
220 pub fn activate(&self) {
222 self.state.activated.store(true, Ordering::Relaxed);
223 }
224
225 pub fn is_active(&self) -> bool {
227 self.state.activated.load(Ordering::Relaxed)
228 }
229
230 pub fn should_cancel(&self) -> bool {
232 self.state.canceled.load(Ordering::Relaxed)
233 }
234
235 pub fn send(&self, s: S) {
237 let mut mtx = self.state.mtx.lock().unwrap();
238 {
239 *mtx = TypeOpt::Channel(s);
240 self.state.channel_present.store(true, Ordering::Relaxed);
241 self.async_suspender.1.store(false, Ordering::Relaxed);
242 }
243 drop(self.state.cvar.wait(mtx));
244 }
245
246 pub async fn send_async(&self, s: S) {
248 {
249 *self.state.mtx.lock().unwrap() = TypeOpt::Channel(s);
250 self.async_suspender.1.store(true, Ordering::Relaxed);
251 self.state.channel_present.store(true, Ordering::Relaxed);
252 }
253 AsyncSuspender {
254 inner: self.async_suspender.clone(),
255 }
256 .await
257 }
258
259 pub fn set_result(&self, r: Result<R, Box<dyn Error>>) {
263 match r {
264 Ok(val) => self.success(val),
265 Err(e) => self.error_verbose(e),
266 }
267 }
268
269 pub fn set_result_no_verbose(&self, r: Result<R, Box<dyn Error>>) {
271 match r {
272 Ok(val) => self.success(val),
273 Err(e) => self.error(e),
274 }
275 }
276
277 pub fn success(&self, r: R) {
279 *self.state.mtx.lock().unwrap() = TypeOpt::Success(r);
280 self.state.result_ready.store(true, Ordering::Relaxed);
281 }
282
283 pub fn error(&self, e: impl ToString) {
285 *self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Suppose(e.to_string()));
286 self.state.result_ready.store(true, Ordering::Relaxed);
287 }
288
289 pub fn error_verbose(&self, e: Box<dyn Error>) {
291 let err_kind = format!("{:?}", e);
292 *self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Suppose(err_kind));
293 self.state.result_ready.store(true, Ordering::Relaxed);
294 }
295}
296
297impl<S, R> Drop for Handle<S, R>
298where
299 S: Send,
300 R: Send,
301{
302 fn drop(&mut self) {
303 if thread::panicking() && !self.state.result_ready.load(Ordering::Relaxed) {
304 self.state.channel_present.store(false, Ordering::Relaxed);
305 let err = format!("the flower handle with id: {} error panicked!", self.id);
306 *self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Panicked(err));
307 self.state.result_ready.store(true, Ordering::Relaxed);
308 }
309 }
310}
311
312impl<S, R> Debug for Handle<S, R>
313where
314 S: Send + Debug,
315 R: Send + Debug,
316{
317 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
318 f.debug_struct("Handle")
319 .field("state", &self.state)
320 .field("awaiting", &self.async_suspender)
321 .field("id", &self.id)
322 .finish()
323 }
324}
325
326pub enum Finalizer<'a, S: Send, R: Send> {
327 Try(&'a Flower<S, R>),
328}
329
330impl<S, R> Finalizer<'_, S, R>
331where
332 S: Send,
333 R: Send,
334{
335 pub fn finalize(self, f: impl FnOnce(Result<R, Cause>)) {
337 let Self::Try(flower) = self;
338 if flower.state.result_ready.load(Ordering::Relaxed) {
339 let result = move || {
340 let result = flower.state.mtx.lock().unwrap().take();
341 flower.state.result_ready.store(false, Ordering::Relaxed);
342 flower.state.activated.store(false, Ordering::Relaxed);
343 result
344 };
345 let result = result();
346 if let TypeOpt::Success(value) = result {
347 f(Ok(value))
348 } else if let TypeOpt::Error(err_type) = result {
349 f(Err(err_type))
350 }
351 }
352 }
353}
354
355pub struct Flower<S, R>
430where
431 S: Send,
432 R: Send,
433{
434 state: Arc<InnerState<S, R>>,
435 async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
436 id: usize,
437}
438
439impl<S, R> Flower<S, R>
440where
441 S: Send,
442 R: Send,
443{
444 pub fn new(id: usize) -> Self {
445 Self {
446 state: Arc::new(InnerState {
447 activated: AtomicBool::new(false),
448 result_ready: AtomicBool::new(false),
449 channel_present: AtomicBool::new(false),
450 mtx: Mutex::new(TypeOpt::None),
451 cvar: Condvar::new(),
452 canceled: AtomicBool::new(false),
453 }),
454 async_suspender: Arc::new((Mutex::new(None), AtomicBool::new(false))),
455 id,
456 }
457 }
458
459 pub fn id(&self) -> usize {
461 self.id
462 }
463
464 pub fn handle(&self) -> Handle<S, R> {
466 self.state.canceled.store(false, Ordering::Relaxed);
467 Handle {
468 state: Clone::clone(&self.state),
469 async_suspender: Clone::clone(&self.async_suspender),
470 id: self.id,
471 }
472 }
473
474 pub fn state(&self) -> FlowerState<S, R> {
478 self.state.canceled.store(false, Ordering::Relaxed);
479 FlowerState {
480 state: Clone::clone(&self.state),
481 async_suspender: Clone::clone(&self.async_suspender),
482 id: self.id,
483 }
484 }
485
486 pub fn cancel(&self) {
490 self.state.canceled.store(true, Ordering::Relaxed);
491 }
492
493 pub fn is_canceled(&self) -> bool {
495 self.state.canceled.load(Ordering::Relaxed)
496 }
497
498 pub fn is_active(&self) -> bool {
500 self.state.activated.load(Ordering::Relaxed)
501 }
502
503 pub fn result_is_ready(&self) -> bool {
505 self.state.result_ready.load(Ordering::Relaxed)
506 }
507
508 pub fn channel_is_present(&self) -> bool {
510 self.state.channel_present.load(Ordering::Relaxed)
511 }
512
513 pub fn try_result(&self, f: impl FnOnce(Result<R, Cause>)) {
519 if self.state.channel_present.load(Ordering::Relaxed) {
520 self.state.cvar.notify_all();
521 self.state.channel_present.store(false, Ordering::Relaxed)
522 }
523 if self.state.result_ready.load(Ordering::Relaxed) {
524 let result = move || {
525 let result = self.state.mtx.lock().unwrap().take();
526 self.state.result_ready.store(false, Ordering::Relaxed);
527 self.state.activated.store(false, Ordering::Relaxed);
528 result
529 };
530 let result = result();
531 if let TypeOpt::Success(value) = result {
532 f(Ok(value))
533 } else if let TypeOpt::Error(err_type) = result {
534 f(Err(err_type))
535 }
536 }
537 }
538
539 pub fn extract(&self, f: impl FnOnce(S)) -> Finalizer<'_, S, R> {
543 if self.state.channel_present.load(Ordering::Relaxed) {
544 let channel = move || {
545 let channel = self.state.mtx.lock().unwrap().take();
546 self.state.channel_present.store(false, Ordering::Relaxed);
547 if self.async_suspender.1.load(Ordering::Relaxed) {
548 let mut mg_opt_waker = self.async_suspender.0.lock().unwrap();
549 self.async_suspender.1.store(false, Ordering::Relaxed);
550 if let Some(waker) = mg_opt_waker.take() {
551 waker.wake();
552 }
553 } else {
554 self.state.cvar.notify_all();
555 }
556 channel
557 };
558
559 if let TypeOpt::Channel(value) = channel() {
560 f(value)
561 }
562 }
563
564 Finalizer::Try(self)
565 }
566
567 pub fn poll(&self, f: impl FnOnce(Option<S>)) -> Finalizer<'_, S, R> {
569 if self.state.channel_present.load(Ordering::Relaxed) {
570 let channel = move || {
571 let channel = self.state.mtx.lock().unwrap().take();
572 self.state.channel_present.store(false, Ordering::Relaxed);
573 if self.async_suspender.1.load(Ordering::Relaxed) {
574 let mut mg_opt_waker = self.async_suspender.0.lock().unwrap();
575 self.async_suspender.1.store(false, Ordering::Relaxed);
576 if let Some(waker) = mg_opt_waker.take() {
577 waker.wake();
578 }
579 } else {
580 self.state.cvar.notify_all();
581 }
582 if let TypeOpt::Channel(value) = channel {
583 Some(value)
584 } else {
585 None
586 }
587 };
588 let channel = channel();
589 f(channel)
590 } else {
591 f(None)
592 }
593
594 Finalizer::Try(self)
595 }
596}
597
598impl<S, R> Debug for Flower<S, R>
599where
600 S: Send + Debug,
601 R: Send + Debug,
602{
603 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
604 f.debug_struct("Flower")
605 .field("state", &self.state)
606 .field("async_suspender", &self.async_suspender)
607 .field("id", &self.id)
608 .finish()
609 }
610}
611
612impl<S, R> Drop for Flower<S, R>
613where
614 S: Send,
615 R: Send,
616{
617 fn drop(&mut self) {}
618}
619
620pub trait IntoResult<T> {
622 fn catch(self, error_msg: impl ToString) -> Result<T, Box<dyn Error>>;
624}
625
626impl<T> IntoResult<T> for Option<T> {
627 fn catch(self, error_msg: impl ToString) -> Result<T, Box<dyn Error>> {
628 let message: String = error_msg.to_string();
629 match self {
630 Some(val) => Ok(val),
631 None => Err(message.into()),
632 }
633 }
634}