1use std::io;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::thread;
5
6use crate::coroutine_impl::CoroutineImpl;
7use crate::io::cancel::CancelIoImpl;
8use crate::scheduler::get_scheduler;
9use crate::std::sync::AtomicOption;
10use crate::yield_now::{get_co_para, set_co_para};
11use generator::Error;
12
13#[inline]
19pub fn trigger_cancel_panic() -> ! {
20 if thread::panicking() {
21 eprintln!("trigger another panic while panicking");
22 }
23
24 std::panic::panic_any(Error::Cancel);
29}
30
31pub trait CancelIo {
32 type Data;
33 fn new() -> Self;
34 fn set(&self, _: Self::Data);
35 fn clear(&self);
36 fn cancel(&self) -> Result<(),std::io::Error>;
37}
38
39pub struct CancelImpl<T: CancelIo> {
41 state: AtomicUsize,
44 io: T,
46 co: AtomicOption<Arc<AtomicOption<CoroutineImpl>>>,
50}
51
52impl<T: CancelIo> Default for CancelImpl<T> {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl<T: CancelIo> CancelImpl<T> {
60 pub fn new() -> Self {
61 CancelImpl {
62 state: AtomicUsize::new(0),
63 io: T::new(),
64 co: AtomicOption::none(),
65 }
66 }
67
68 pub fn is_canceled(&self) -> bool {
70 self.state.load(Ordering::Acquire) == 1
71 }
72
73 pub fn is_disabled(&self) -> bool {
75 self.state.load(Ordering::Acquire) >= 2
76 }
77
78 pub fn disable_cancel(&self) {
80 self.state.fetch_add(2, Ordering::Release);
81 }
82
83 pub fn enable_cancel(&self) {
85 self.state.fetch_sub(2, Ordering::Release);
86 }
87
88 pub fn check_cancel(&self) {
90 if self.state.load(Ordering::Acquire) == 1 {
91 {
92 get_co_para();
95 trigger_cancel_panic();
96 }
97 }
98 }
99
100 pub fn cancel(&self) -> Result<(),std::io::Error> {
102 self.state.fetch_or(1, Ordering::Release);
103 match self.co.take() {
104 Some(co) => {
105 co.take()
106 .map(|mut co| {
107 set_co_para(&mut co, io::Error::new(io::ErrorKind::Other, "Canceled"));
109 get_scheduler().schedule(co);
110 })
111 .unwrap_or(());
112 Ok(())
113 }
114 None => Ok(self.io.cancel()?),
115 }
116 }
117
118 #[cfg(unix)]
120 pub fn clear_cancel_bit(&self) {
121 self.state.fetch_and(!1, Ordering::Release);
122 }
123
124 pub fn set_io(&self, io: T::Data) {
127 self.io.set(io)
128 }
129
130 pub fn set_co(&self, co: Arc<AtomicOption<CoroutineImpl>>) {
133 self.co.swap(co);
134 }
135
136 pub fn clear(&self) {
139 if self.co.take().is_none() {
140 self.io.clear();
141 }
142 }
143}
144
145pub type Cancel = CancelImpl<CancelIoImpl>;