monolake_services/common/cancel/
mod.rs

1use std::{
2    cell::UnsafeCell,
3    future::Future,
4    rc::{Rc, Weak},
5    task::Waker,
6};
7
8use linked_list::LinkedList;
9
10pub mod linked_list;
11
12struct CancelHandler {
13    cancelled: bool,
14    waiters: LinkedList<Waker>,
15}
16
17#[derive(Clone)]
18pub struct Canceller {
19    handler: Rc<UnsafeCell<CancelHandler>>,
20}
21
22impl Default for Canceller {
23    #[inline]
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl Canceller {
30    pub fn new() -> Self {
31        Self {
32            handler: Rc::new(UnsafeCell::new(CancelHandler {
33                cancelled: false,
34                waiters: LinkedList::new(),
35            })),
36        }
37    }
38
39    pub fn waiter(&self) -> Waiter {
40        Waiter {
41            index: UnsafeCell::new(None),
42            handler: Rc::downgrade(&self.handler),
43        }
44    }
45
46    pub fn cancel(&self) {
47        let handler = unsafe { &mut *self.handler.get() };
48        if !handler.cancelled {
49            handler.cancelled = true;
50            let waiters: LinkedList<Waker> =
51                std::mem::replace(&mut handler.waiters, LinkedList::new());
52            for waker in waiters.into_iter() {
53                waker.wake();
54            }
55        }
56    }
57
58    pub const fn dropper(self) -> CancellerDropper {
59        CancellerDropper(self)
60    }
61}
62
63pub struct CancellerDropper(Canceller);
64
65impl Drop for CancellerDropper {
66    fn drop(&mut self) {
67        self.0.cancel();
68    }
69}
70
71pub struct Waiter {
72    index: UnsafeCell<Option<usize>>,
73    handler: Weak<UnsafeCell<CancelHandler>>,
74}
75
76impl Clone for Waiter {
77    fn clone(&self) -> Self {
78        Self {
79            index: UnsafeCell::new(None),
80            handler: self.handler.clone(),
81        }
82    }
83}
84
85impl Waiter {
86    pub fn cancelled(&self) -> bool {
87        self.handler
88            .upgrade()
89            .map_or(true, |handler| unsafe { &*handler.get() }.cancelled)
90    }
91}
92
93impl Future for Waiter {
94    type Output = ();
95
96    fn poll(
97        self: std::pin::Pin<&mut Self>,
98        cx: &mut std::task::Context<'_>,
99    ) -> std::task::Poll<Self::Output> {
100        let handler = match self.handler.upgrade() {
101            Some(handler) => handler,
102            None => return std::task::Poll::Ready(()),
103        };
104        let handler = unsafe { &mut *handler.get() };
105        if handler.cancelled {
106            return std::task::Poll::Ready(());
107        }
108        match unsafe { *self.index.get() } {
109            Some(idx) => {
110                let val = handler.waiters.get_mut(idx).unwrap();
111                val.clone_from(cx.waker());
112            }
113            None => {
114                let index = handler.waiters.push_back(cx.waker().clone());
115                unsafe { *self.index.get() = Some(index) };
116            }
117        }
118        std::task::Poll::Pending
119    }
120}
121
122impl Drop for Waiter {
123    fn drop(&mut self) {
124        if let Some(index) = unsafe { *self.index.get() } {
125            if let Some(handler) = self.handler.upgrade() {
126                let handler = unsafe { &mut *handler.get() };
127                if !handler.cancelled {
128                    handler.waiters.remove(index);
129                }
130            }
131        }
132    }
133}