richat_shared/
shutdown.rs1use {
2 crate::mutex_lock,
3 slab::Slab,
4 std::{
5 future::Future,
6 pin::Pin,
7 sync::{Arc, Mutex, MutexGuard},
8 task::{Context, Poll, Waker},
9 },
10};
11
12#[derive(Debug)]
13pub struct Shutdown {
14 state: Arc<Mutex<State>>,
15 index: usize,
16}
17
18impl Shutdown {
19 pub fn new() -> Self {
20 let mut state = State {
21 shutdown: false,
22 wakers: Slab::with_capacity(64),
23 };
24 let index = state.wakers.insert(None);
25
26 Self {
27 state: Arc::new(Mutex::new(state)),
28 index,
29 }
30 }
31
32 fn state_lock(&self) -> MutexGuard<'_, State> {
33 mutex_lock(&self.state)
34 }
35
36 pub fn shutdown(&self) {
37 let mut state = self.state_lock();
38 state.shutdown = true;
39 for (_index, value) in state.wakers.iter_mut() {
40 if let Some(waker) = value.take() {
41 waker.wake();
42 }
43 }
44 }
45
46 pub fn is_set(&self) -> bool {
47 self.state_lock().shutdown
48 }
49}
50
51impl Default for Shutdown {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl Clone for Shutdown {
58 fn clone(&self) -> Self {
59 let mut state = self.state_lock();
60 let index = state.wakers.insert(None);
61
62 Self {
63 state: Arc::clone(&self.state),
64 index,
65 }
66 }
67}
68
69impl Drop for Shutdown {
70 fn drop(&mut self) {
71 let mut state = self.state_lock();
72 state.wakers.remove(self.index);
73 }
74}
75
76impl Future for Shutdown {
77 type Output = ();
78
79 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 let me = self.as_ref().get_ref();
81 let mut state = me.state_lock();
82
83 if state.shutdown {
84 return Poll::Ready(());
85 }
86
87 state.wakers[self.index] = Some(cx.waker().clone());
88 Poll::Pending
89 }
90}
91
92#[derive(Debug)]
93struct State {
94 shutdown: bool,
95 wakers: Slab<Option<Waker>>,
96}