flowync/
lib.rs

1#![deny(unsafe_code)]
2use core::{
3    fmt::{self, Debug, Formatter},
4    future::Future,
5    pin::Pin,
6    sync::atomic::{AtomicBool, Ordering},
7    task::{Context, Poll, Waker},
8};
9use std::{
10    error::Error,
11    mem,
12    sync::{Condvar, Mutex},
13};
14use std::{sync::Arc, thread};
15
16#[cfg(feature = "compact")]
17mod compact;
18#[cfg(feature = "compact")]
19pub use compact::*;
20
21use error::Cause;
22pub mod error;
23enum TypeOpt<S, R>
24where
25    S: Send,
26    R: Send,
27{
28    Channel(S),
29    Success(R),
30    Error(Cause),
31    None,
32}
33
34impl<S, R> Default for TypeOpt<S, R>
35where
36    S: Send,
37    R: Send,
38{
39    fn default() -> Self {
40        Self::None
41    }
42}
43
44impl<S, R> Debug for TypeOpt<S, R>
45where
46    S: Send + Debug,
47    R: Send + Debug,
48{
49    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
50        match self {
51            Self::Channel(s) => f.debug_tuple("Channel").field(s).finish(),
52            Self::Success(r) => f.debug_tuple("Success").field(r).finish(),
53            Self::Error(e) => f.debug_tuple("Error").field(e).finish(),
54            Self::None => write!(f, "None"),
55        }
56    }
57}
58
59impl<S, R> TypeOpt<S, R>
60where
61    S: Send,
62    R: Send,
63{
64    fn take(&mut self) -> Self {
65        mem::take(self)
66    }
67}
68
69struct InnerState<S, R>
70where
71    S: Send,
72    R: Send,
73{
74    activated: AtomicBool,
75    result_ready: AtomicBool,
76    channel_present: AtomicBool,
77    mtx: Mutex<TypeOpt<S, R>>,
78    cvar: Condvar,
79    canceled: AtomicBool,
80}
81
82impl<S, R> Debug for InnerState<S, R>
83where
84    S: Send + Debug,
85    R: Send + Debug,
86{
87    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
88        f.debug_struct("InnerState")
89            .field("result_ready", &self.result_ready)
90            .field("channel_present", &self.channel_present)
91            .field("mtx", &self.mtx)
92            .field("cvar", &self.cvar)
93            .field("canceled", &self.canceled)
94            .field("activated", &self.activated)
95            .finish()
96    }
97}
98
99impl<S, R> Drop for InnerState<S, R>
100where
101    S: Send,
102    R: Send,
103{
104    fn drop(&mut self) {}
105}
106
107/// State of the `Flower`
108pub struct FlowerState<S, R>
109where
110    S: Send,
111    R: Send,
112{
113    state: Arc<InnerState<S, R>>,
114    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
115    id: usize,
116}
117
118impl<S, R> Debug for FlowerState<S, R>
119where
120    S: Send + Debug,
121    R: Send + Debug,
122{
123    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
124        f.debug_struct("FlowerState")
125            .field("state", &self.state)
126            .field("async_suspender", &self.async_suspender)
127            .field("id", &self.id)
128            .finish()
129    }
130}
131
132impl<S, R> FlowerState<S, R>
133where
134    S: Send,
135    R: Send,
136{
137    /// Get ID of the `Flower`.
138    pub fn id(&self) -> usize {
139        self.id
140    }
141
142    /// Cancel `Flower`.
143    ///
144    /// will do nothing if not explicitly configured on the `Handle`.
145    pub fn cancel(&self) {
146        self.state.canceled.store(true, Ordering::Relaxed);
147    }
148
149    /// Check if the `Flower` is canceled
150    pub fn is_canceled(&self) -> bool {
151        self.state.canceled.load(Ordering::Relaxed)
152    }
153
154    /// Check if the current `Flower` is active
155    pub fn is_active(&self) -> bool {
156        self.state.activated.load(Ordering::Relaxed)
157    }
158}
159
160impl<S, R> Clone for FlowerState<S, R>
161where
162    S: Send,
163    R: Send,
164{
165    fn clone(&self) -> Self {
166        Self {
167            state: Clone::clone(&self.state),
168            async_suspender: Clone::clone(&self.async_suspender),
169            id: self.id,
170        }
171    }
172}
173
174impl<S, R> Drop for FlowerState<S, R>
175where
176    S: Send,
177    R: Send,
178{
179    fn drop(&mut self) {}
180}
181
182struct AsyncSuspender {
183    inner: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
184}
185
186impl Future for AsyncSuspender {
187    type Output = ();
188    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189        let mut mtx = self.inner.0.lock().unwrap();
190        if !self.inner.1.load(Ordering::Relaxed) {
191            Poll::Ready(())
192        } else {
193            *mtx = Some(cx.waker().clone());
194            Poll::Pending
195        }
196    }
197}
198
199/// A handle for the Flower
200pub struct Handle<S, R>
201where
202    S: Send,
203    R: Send,
204{
205    state: Arc<InnerState<S, R>>,
206    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
207    id: usize,
208}
209
210impl<S, R> Handle<S, R>
211where
212    S: Send,
213    R: Send,
214{
215    /// Get ID of the `Flower`.
216    pub fn id(&self) -> usize {
217        self.id
218    }
219
220    /// Activate current `Flower`
221    pub fn activate(&self) {
222        self.state.activated.store(true, Ordering::Relaxed);
223    }
224
225    /// Check if the current `Flower` is active
226    pub fn is_active(&self) -> bool {
227        self.state.activated.load(Ordering::Relaxed)
228    }
229
230    /// Check if the current `Flower` should be canceled
231    pub fn should_cancel(&self) -> bool {
232        self.state.canceled.load(Ordering::Relaxed)
233    }
234
235    /// Send current progress value
236    pub fn send(&self, s: S) {
237        let mut mtx = self.state.mtx.lock().unwrap();
238        {
239            *mtx = TypeOpt::Channel(s);
240            self.state.channel_present.store(true, Ordering::Relaxed);
241            self.async_suspender.1.store(false, Ordering::Relaxed);
242        }
243        drop(self.state.cvar.wait(mtx));
244    }
245
246    /// Send current progress value asynchronously.
247    pub async fn send_async(&self, s: S) {
248        {
249            *self.state.mtx.lock().unwrap() = TypeOpt::Channel(s);
250            self.async_suspender.1.store(true, Ordering::Relaxed);
251            self.state.channel_present.store(true, Ordering::Relaxed);
252        }
253        AsyncSuspender {
254            inner: self.async_suspender.clone(),
255        }
256        .await
257    }
258
259    /// Set `Result` value with verboser error message.
260    ///
261    /// (for more easier to keep in track with the real cause of the error)
262    pub fn set_result(&self, r: Result<R, Box<dyn Error>>) {
263        match r {
264            Ok(val) => self.success(val),
265            Err(e) => self.error_verbose(e),
266        }
267    }
268
269    /// Set `Result` value with no verbose (simpler error message)
270    pub fn set_result_no_verbose(&self, r: Result<R, Box<dyn Error>>) {
271        match r {
272            Ok(val) => self.success(val),
273            Err(e) => self.error(e),
274        }
275    }
276
277    /// Set the `Ok` value of the `Result`.
278    pub fn success(&self, r: R) {
279        *self.state.mtx.lock().unwrap() = TypeOpt::Success(r);
280        self.state.result_ready.store(true, Ordering::Relaxed);
281    }
282
283    /// Set the `Err` value of the `Result`.
284    pub fn error(&self, e: impl ToString) {
285        *self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Suppose(e.to_string()));
286        self.state.result_ready.store(true, Ordering::Relaxed);
287    }
288
289    /// Set the `Err` value of the `Result` with more verboser error message.
290    pub fn error_verbose(&self, e: Box<dyn Error>) {
291        let err_kind = format!("{:?}", e);
292        *self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Suppose(err_kind));
293        self.state.result_ready.store(true, Ordering::Relaxed);
294    }
295}
296
297impl<S, R> Drop for Handle<S, R>
298where
299    S: Send,
300    R: Send,
301{
302    fn drop(&mut self) {
303        if thread::panicking() && !self.state.result_ready.load(Ordering::Relaxed) {
304            self.state.channel_present.store(false, Ordering::Relaxed);
305            let err = format!("the flower handle with id: {} error panicked!", self.id);
306            *self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Panicked(err));
307            self.state.result_ready.store(true, Ordering::Relaxed);
308        }
309    }
310}
311
312impl<S, R> Debug for Handle<S, R>
313where
314    S: Send + Debug,
315    R: Send + Debug,
316{
317    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
318        f.debug_struct("Handle")
319            .field("state", &self.state)
320            .field("awaiting", &self.async_suspender)
321            .field("id", &self.id)
322            .finish()
323    }
324}
325
326pub enum Finalizer<'a, S: Send, R: Send> {
327    Try(&'a Flower<S, R>),
328}
329
330impl<S, R> Finalizer<'_, S, R>
331where
332    S: Send,
333    R: Send,
334{
335    /// Try finalize `Result` of the `Flower` (this fn will be called if only `Result` is available).
336    pub fn finalize(self, f: impl FnOnce(Result<R, Cause>)) {
337        let Self::Try(flower) = self;
338        if flower.state.result_ready.load(Ordering::Relaxed) {
339            let result = move || {
340                let result = flower.state.mtx.lock().unwrap().take();
341                flower.state.result_ready.store(false, Ordering::Relaxed);
342                flower.state.activated.store(false, Ordering::Relaxed);
343                result
344            };
345            let result = result();
346            if let TypeOpt::Success(value) = result {
347                f(Ok(value))
348            } else if let TypeOpt::Error(err_type) = result {
349                f(Err(err_type))
350            }
351        }
352    }
353}
354
355/// Flow loosely and gracefully.
356///
357/// Where:
358///
359/// `S` = type of the sender spsc channel value
360///
361/// `R` = type of `Ok` value of the `Result` (`Result<R, Cause>`) and `Cause` is the `Error` cause.
362///
363/// # Quick Example:
364///
365///```
366/// use flowync::{error::{Cause, IOError}, Flower};
367/// type TestFlower = Flower<u32, String>;
368///
369/// fn fetch_things(id: usize) -> Result<String, IOError> {
370///     let result =
371///         Ok::<String, IOError>(format!("the flower with id: {} successfully completed fetching.", id));
372///     let success = result?;
373///     Ok(success)
374/// }
375///
376/// fn main() {
377///     let flower: TestFlower = Flower::new(1);
378///     std::thread::spawn({
379///         let handle = flower.handle();
380///         // Activate
381///         handle.activate();
382///         move || {
383///             for i in 0..10 {
384///                 // Send current value through channel, will block the spawned thread
385///                 // until the option value successfully being polled in the main thread.
386///                 handle.send(i);
387///                 // or handle.send_async(i).await; can be used from any multithreaded async runtime,
388///             }
389///             let result = fetch_things(handle.id());
390///             // Set result and then extract later.
391///             handle.set_result(result)
392///         }
393///     });
394///
395///     let mut exit = false;
396///
397///     loop {
398///         // Check if the flower is_active()
399///         // and will deactivate itself if the result value successfully received.
400///         if flower.is_active() {
401///             // another logic goes here...
402///             // e.g:
403///             // notify_loading_fn();
404///
405///             flower
406///                 .extract(|value| println!("{}", value))
407///                 .finalize(|result| {
408///                     match result {
409///                         Ok(value) => println!("{}", value),
410///                         Err(Cause::Suppose(msg)) => {
411///                             println!("{}", msg)
412///                         }
413///                         Err(Cause::Panicked(_msg)) => {
414///                             // Handle things if stuff unexpectedly panicked at runtime.
415///                         }
416///                     }
417///
418///                     // Exit if finalized
419///                     exit = true;
420///                 });
421///         }
422///
423///         if exit {
424///             break;
425///         }
426///     }
427/// }
428/// ```
429pub struct Flower<S, R>
430where
431    S: Send,
432    R: Send,
433{
434    state: Arc<InnerState<S, R>>,
435    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
436    id: usize,
437}
438
439impl<S, R> Flower<S, R>
440where
441    S: Send,
442    R: Send,
443{
444    pub fn new(id: usize) -> Self {
445        Self {
446            state: Arc::new(InnerState {
447                activated: AtomicBool::new(false),
448                result_ready: AtomicBool::new(false),
449                channel_present: AtomicBool::new(false),
450                mtx: Mutex::new(TypeOpt::None),
451                cvar: Condvar::new(),
452                canceled: AtomicBool::new(false),
453            }),
454            async_suspender: Arc::new((Mutex::new(None), AtomicBool::new(false))),
455            id,
456        }
457    }
458
459    /// Get the ID.
460    pub fn id(&self) -> usize {
461        self.id
462    }
463
464    /// Get the handle.
465    pub fn handle(&self) -> Handle<S, R> {
466        self.state.canceled.store(false, Ordering::Relaxed);
467        Handle {
468            state: Clone::clone(&self.state),
469            async_suspender: Clone::clone(&self.async_suspender),
470            id: self.id,
471        }
472    }
473
474    /// Get the state
475    ///
476    /// Since `Flower` itself is uncloneable to avoid data races, this is an alternative `fn` for `self.clone()`
477    pub fn state(&self) -> FlowerState<S, R> {
478        self.state.canceled.store(false, Ordering::Relaxed);
479        FlowerState {
480            state: Clone::clone(&self.state),
481            async_suspender: Clone::clone(&self.async_suspender),
482            id: self.id,
483        }
484    }
485
486    /// Cancel `Flower`.
487    ///
488    /// will do nothing if not explicitly configured on the `Handle`.
489    pub fn cancel(&self) {
490        self.state.canceled.store(true, Ordering::Relaxed);
491    }
492
493    /// Check if the `Flower` is canceled
494    pub fn is_canceled(&self) -> bool {
495        self.state.canceled.load(Ordering::Relaxed)
496    }
497
498    /// Check if the current `Flower` is active
499    pub fn is_active(&self) -> bool {
500        self.state.activated.load(Ordering::Relaxed)
501    }
502
503    /// Check if `Result` value of the `Flower` is ready
504    pub fn result_is_ready(&self) -> bool {
505        self.state.result_ready.load(Ordering::Relaxed)
506    }
507
508    /// Check if channel value of the `Flower` is present
509    pub fn channel_is_present(&self) -> bool {
510        self.state.channel_present.load(Ordering::Relaxed)
511    }
512
513    /// Try get the `Result` of the `Flower` and ignore channel value (if any).
514    ///
515    /// Note: (this fn will be called if only `Result` is available)
516    ///
517    /// **Warning!** don't use this fn if channel value is important, use `extract fn` and then use `finalize fn` instead.
518    pub fn try_result(&self, f: impl FnOnce(Result<R, Cause>)) {
519        if self.state.channel_present.load(Ordering::Relaxed) {
520            self.state.cvar.notify_all();
521            self.state.channel_present.store(false, Ordering::Relaxed)
522        }
523        if self.state.result_ready.load(Ordering::Relaxed) {
524            let result = move || {
525                let result = self.state.mtx.lock().unwrap().take();
526                self.state.result_ready.store(false, Ordering::Relaxed);
527                self.state.activated.store(false, Ordering::Relaxed);
528                result
529            };
530            let result = result();
531            if let TypeOpt::Success(value) = result {
532                f(Ok(value))
533            } else if let TypeOpt::Error(err_type) = result {
534                f(Err(err_type))
535            }
536        }
537    }
538
539    /// Try extract channel value of the `Flower` (this fn will be called if only channel value is available),
540    ///
541    /// and then `finalize` (must_use)
542    pub fn extract(&self, f: impl FnOnce(S)) -> Finalizer<'_, S, R> {
543        if self.state.channel_present.load(Ordering::Relaxed) {
544            let channel = move || {
545                let channel = self.state.mtx.lock().unwrap().take();
546                self.state.channel_present.store(false, Ordering::Relaxed);
547                if self.async_suspender.1.load(Ordering::Relaxed) {
548                    let mut mg_opt_waker = self.async_suspender.0.lock().unwrap();
549                    self.async_suspender.1.store(false, Ordering::Relaxed);
550                    if let Some(waker) = mg_opt_waker.take() {
551                        waker.wake();
552                    }
553                } else {
554                    self.state.cvar.notify_all();
555                }
556                channel
557            };
558
559            if let TypeOpt::Channel(value) = channel() {
560                f(value)
561            }
562        }
563
564        Finalizer::Try(self)
565    }
566
567    /// Poll channel value of the `Flower`, and then `finalize` (must_use)
568    pub fn poll(&self, f: impl FnOnce(Option<S>)) -> Finalizer<'_, S, R> {
569        if self.state.channel_present.load(Ordering::Relaxed) {
570            let channel = move || {
571                let channel = self.state.mtx.lock().unwrap().take();
572                self.state.channel_present.store(false, Ordering::Relaxed);
573                if self.async_suspender.1.load(Ordering::Relaxed) {
574                    let mut mg_opt_waker = self.async_suspender.0.lock().unwrap();
575                    self.async_suspender.1.store(false, Ordering::Relaxed);
576                    if let Some(waker) = mg_opt_waker.take() {
577                        waker.wake();
578                    }
579                } else {
580                    self.state.cvar.notify_all();
581                }
582                if let TypeOpt::Channel(value) = channel {
583                    Some(value)
584                } else {
585                    None
586                }
587            };
588            let channel = channel();
589            f(channel)
590        } else {
591            f(None)
592        }
593
594        Finalizer::Try(self)
595    }
596}
597
598impl<S, R> Debug for Flower<S, R>
599where
600    S: Send + Debug,
601    R: Send + Debug,
602{
603    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
604        f.debug_struct("Flower")
605            .field("state", &self.state)
606            .field("async_suspender", &self.async_suspender)
607            .field("id", &self.id)
608            .finish()
609    }
610}
611
612impl<S, R> Drop for Flower<S, R>
613where
614    S: Send,
615    R: Send,
616{
617    fn drop(&mut self) {}
618}
619
620/// A converter to convert `Option<T>` into `Result<T, E>` using `catch` fn.
621pub trait IntoResult<T> {
622    /// Convert `Option<T>` into `Result<T, E>`
623    fn catch(self, error_msg: impl ToString) -> Result<T, Box<dyn Error>>;
624}
625
626impl<T> IntoResult<T> for Option<T> {
627    fn catch(self, error_msg: impl ToString) -> Result<T, Box<dyn Error>> {
628        let message: String = error_msg.to_string();
629        match self {
630            Some(val) => Ok(val),
631            None => Err(message.into()),
632        }
633    }
634}