1use serde::{Deserialize, Serialize};
2use std::{
3 collections::{HashMap, HashSet},
4 sync::{Arc, Mutex},
5 task::{Context, RawWaker, RawWakerVTable, Waker},
6};
7
8#[derive(Debug, Clone, Serialize, Deserialize, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
9pub enum InterestType {
10 Readable,
11 Writable,
12 Closed,
13 Error,
14}
15
16#[derive(Debug)]
17pub struct WakerInterestHandler {
18 set: HashSet<InterestType>,
19 waker: Waker,
20}
21impl WakerInterestHandler {
22 pub fn new(waker: &Waker) -> Box<Self> {
23 Box::new(WakerInterestHandler {
24 set: Default::default(),
25 waker: waker.clone(),
26 })
27 }
28}
29impl InterestHandler for WakerInterestHandler {
30 fn push_interest(&mut self, interest: InterestType) {
31 self.set.insert(interest);
32 self.waker.wake_by_ref();
33 }
34
35 fn pop_interest(&mut self, interest: InterestType) -> bool {
36 self.set.remove(&interest)
37 }
38
39 fn has_interest(&self, interest: InterestType) -> bool {
40 self.set.contains(&interest)
41 }
42}
43
44#[derive(Debug, Clone)]
45pub struct SharedWakerInterestHandler {
46 inner: Arc<Mutex<Box<WakerInterestHandler>>>,
47}
48impl SharedWakerInterestHandler {
49 pub fn new(waker: &Waker) -> Box<Self> {
50 Box::new(Self {
51 inner: Arc::new(Mutex::new(WakerInterestHandler::new(waker))),
52 })
53 }
54}
55impl InterestHandler for SharedWakerInterestHandler {
56 fn push_interest(&mut self, interest: InterestType) {
57 let mut inner = self.inner.lock().unwrap();
58 inner.push_interest(interest);
59 }
60
61 fn pop_interest(&mut self, interest: InterestType) -> bool {
62 let mut inner = self.inner.lock().unwrap();
63 inner.pop_interest(interest)
64 }
65
66 fn has_interest(&self, interest: InterestType) -> bool {
67 let inner = self.inner.lock().unwrap();
68 inner.has_interest(interest)
69 }
70}
71
72pub trait InterestHandler: Send + Sync + std::fmt::Debug {
73 fn push_interest(&mut self, interest: InterestType);
74
75 fn pop_interest(&mut self, interest: InterestType) -> bool;
76
77 fn has_interest(&self, interest: InterestType) -> bool;
78}
79
80impl From<&Waker> for Box<dyn InterestHandler + Send + Sync> {
81 fn from(waker: &Waker) -> Self {
82 WakerInterestHandler::new(waker)
83 }
84}
85
86impl From<&Context<'_>> for Box<dyn InterestHandler + Send + Sync> {
87 fn from(cx: &Context) -> Self {
88 cx.waker().into()
89 }
90}
91
92pub fn handler_into_waker(
93 handler: Box<dyn InterestHandler + Send + Sync>,
94 interest: InterestType,
95) -> Arc<InterestHandlerWaker> {
96 Arc::new(InterestHandlerWaker {
97 handler: Arc::new(Mutex::new(handler)),
98 interest,
99 })
100}
101
102#[derive(Debug, Clone)]
103pub struct InterestHandlerWaker {
104 handler: Arc<Mutex<Box<dyn InterestHandler + Send + Sync>>>,
105 interest: InterestType,
106}
107impl InterestHandlerWaker {
108 pub fn wake_now(&self) {
109 let mut handler = self.handler.lock().unwrap();
110 handler.push_interest(self.interest);
111 }
112 pub fn set_interest(self: &Arc<Self>, interest: InterestType) -> Arc<Self> {
113 let mut next = self.as_ref().clone();
114 next.interest = interest;
115 Arc::new(next)
116 }
117 pub fn as_waker(self: &Arc<Self>) -> Waker {
118 let s: *const Self = Arc::into_raw(Arc::clone(self));
119 let raw_waker = RawWaker::new(s as *const (), &VTABLE);
120 unsafe { Waker::from_raw(raw_waker) }
121 }
122}
123
124fn handler_waker_wake(s: &InterestHandlerWaker) {
125 let waker_arc = unsafe { Arc::from_raw(s) };
126 waker_arc.wake_now();
127}
128
129fn handler_waker_clone(s: &InterestHandlerWaker) -> RawWaker {
130 let arc = unsafe { Arc::from_raw(s) };
131 std::mem::forget(arc.clone());
132 RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
133}
134
135const VTABLE: RawWakerVTable = unsafe {
136 RawWakerVTable::new(
137 |s| handler_waker_clone(&*(s as *const InterestHandlerWaker)), |s| handler_waker_wake(&*(s as *const InterestHandlerWaker)), |s| (*(s as *const InterestHandlerWaker)).wake_now(), |s| drop(Arc::from_raw(s as *const InterestHandlerWaker)), )
142};
143
144#[derive(Debug, Clone, Default)]
145struct InterestWakerMapState {
146 wakers: HashMap<InterestType, Vec<Waker>>,
147 triggered: HashSet<InterestType>,
148}
149
150#[derive(Debug, Clone, Default)]
151pub struct InterestWakerMap {
152 state: Arc<Mutex<InterestWakerMapState>>,
153}
154
155impl InterestWakerMap {
156 pub fn add(&self, interest: InterestType, waker: &Waker) {
157 let mut state = self.state.lock().unwrap();
158 let entries = state.wakers.entry(interest).or_default();
159 if !entries.iter().any(|w| w.will_wake(waker)) {
160 entries.push(waker.clone());
161 }
162 }
163
164 pub fn pop(&self, interest: InterestType) -> bool {
165 let mut state = self.state.lock().unwrap();
166 state.triggered.remove(&interest)
167 }
168
169 pub fn push(&self, interest: InterestType) -> bool {
170 let mut state = self.state.lock().unwrap();
171 state.triggered.insert(interest)
172 }
173}
174
175impl InterestHandler for InterestWakerMap {
176 fn push_interest(&mut self, interest: InterestType) {
177 let mut state = self.state.lock().unwrap();
178 if let Some(wakers) = state.wakers.remove(&interest) {
179 for waker in wakers {
180 waker.wake();
181 }
182 } else {
183 state.triggered.insert(interest);
184 }
185 }
186
187 fn pop_interest(&mut self, interest: InterestType) -> bool {
188 let mut state = self.state.lock().unwrap();
189 state.triggered.remove(&interest)
190 }
191
192 fn has_interest(&self, interest: InterestType) -> bool {
193 let state = self.state.lock().unwrap();
194 state.triggered.contains(&interest)
195 }
196}