cogo/
cancel.rs

1use std::io;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::thread;
5
6use crate::coroutine_impl::CoroutineImpl;
7use crate::io::cancel::CancelIoImpl;
8use crate::scheduler::get_scheduler;
9use crate::std::sync::AtomicOption;
10use crate::yield_now::{get_co_para, set_co_para};
11use generator::Error;
12
13// the cancel is implemented by triggering a Cancel panic
14// if drop is called due to a Cancel panic, it's not safe
15// to call Any coroutine API in the drop any more because
16// it would trigger another Cancel panic so here we check
17// the thread panicking status
18#[inline]
19pub fn trigger_cancel_panic() -> ! {
20    if thread::panicking() {
21        eprintln!("trigger another panic while panicking");
22    }
23
24    // should we clear the cancel flag to let other API continue?
25    // so that we can avoid the re-panic problem?
26    // currently this is not used in any drop implementation
27    // current_cancel_data().state.store(0, Ordering::Release);
28    std::panic::panic_any(Error::Cancel);
29}
30
/// Abstraction over the I/O side of coroutine cancellation.
///
/// An implementor keeps track of the I/O request a suspended coroutine is
/// waiting on, so that the request can be cancelled from elsewhere.
pub trait CancelIo {
    /// Payload describing the in-flight I/O request, handed to [`CancelIo::set`].
    type Data;

    /// Build an instance with no I/O data registered.
    fn new() -> Self;

    /// Register the data of the current I/O request.
    fn set(&self, data: Self::Data);

    /// Remove any registered I/O data.
    fn clear(&self);

    /// Cancel the registered I/O request.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the implementation reports.
    fn cancel(&self) -> io::Result<()>;
}
38
39// each coroutine has it's own Cancel data
40pub struct CancelImpl<T: CancelIo> {
41    // first bit is used when need to cancel the coroutine
42    // higher bits are used to disable the cancel
43    state: AtomicUsize,
44    // the io data when the coroutine is suspended
45    io: T,
46    // other suspended type would register the co itself
47    // can't set io and co at the same time!
48    // most of the time this is park based API
49    co: AtomicOption<Arc<AtomicOption<CoroutineImpl>>>,
50}
51
52impl<T: CancelIo> Default for CancelImpl<T> {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58// real io cancel impl is in io module
59impl<T: CancelIo> CancelImpl<T> {
60    pub fn new() -> Self {
61        CancelImpl {
62            state: AtomicUsize::new(0),
63            io: T::new(),
64            co: AtomicOption::none(),
65        }
66    }
67
68    // judge if the coroutine cancel flag is set
69    pub fn is_canceled(&self) -> bool {
70        self.state.load(Ordering::Acquire) == 1
71    }
72
73    // return if the coroutine cancel is disabled
74    pub fn is_disabled(&self) -> bool {
75        self.state.load(Ordering::Acquire) >= 2
76    }
77
78    // disable the cancel bit
79    pub fn disable_cancel(&self) {
80        self.state.fetch_add(2, Ordering::Release);
81    }
82
83    // enable the cancel bit again
84    pub fn enable_cancel(&self) {
85        self.state.fetch_sub(2, Ordering::Release);
86    }
87
88    // panic if cancel was set
89    pub fn check_cancel(&self) {
90        if self.state.load(Ordering::Acquire) == 1 {
91            {
92                // before panic clear the last coroutine error
93                // this would affect future new coroutine that reuse the instance
94                get_co_para();
95                trigger_cancel_panic();
96            }
97        }
98    }
99
100    // async cancel for a coroutine
101    pub fn cancel(&self) -> Result<(),std::io::Error> {
102        self.state.fetch_or(1, Ordering::Release);
103        match self.co.take() {
104            Some(co) => {
105                co.take()
106                    .map(|mut co| {
107                        // set the cancel result for the coroutine
108                        set_co_para(&mut co, io::Error::new(io::ErrorKind::Other, "Canceled"));
109                        get_scheduler().schedule(co);
110                    })
111                    .unwrap_or(());
112                Ok(())
113            }
114            None => Ok(self.io.cancel()?),
115        }
116    }
117
118    // clear the cancel bit so that we can reuse the cancel
119    #[cfg(unix)]
120    pub fn clear_cancel_bit(&self) {
121        self.state.fetch_and(!1, Ordering::Release);
122    }
123
124    // set the cancel io data
125    // should be called after register io request
126    pub fn set_io(&self, io: T::Data) {
127        self.io.set(io)
128    }
129
130    // set the cancel co data
131    // can't both set_io and set_co
132    pub fn set_co(&self, co: Arc<AtomicOption<CoroutineImpl>>) {
133        self.co.swap(co);
134    }
135
136    // clear the cancel io data
137    // should be called after io completion
138    pub fn clear(&self) {
139        if self.co.take().is_none() {
140            self.io.clear();
141        }
142    }
143}
144
/// The cancel data type used by coroutines: [`CancelImpl`] specialized with
/// the io module's [`CancelIoImpl`].
pub type Cancel = CancelImpl<CancelIoImpl>;