1use std::borrow::Borrow;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::future::Future;
5use std::io;
6use std::marker::PhantomData;
7use std::mem;
8use std::panic;
9use std::pin::Pin;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
12use std::task::{ready, Context, Poll, Waker};
13use std::time::{Duration, Instant};
14
15use concurrent_queue::ConcurrentQueue;
16use polling::{Event, Events, Poller};
17use slab::Slab;
18
19cfg_if::cfg_if! {
21 if #[cfg(windows)] {
22 mod windows;
23 pub use windows::Registration;
24 } else if #[cfg(any(
25 target_vendor = "apple",
26 target_os = "freebsd",
27 target_os = "netbsd",
28 target_os = "openbsd",
29 target_os = "dragonfly",
30 ))] {
31 mod kqueue;
32 pub use kqueue::Registration;
33 } else if #[cfg(unix)] {
34 mod unix;
35 pub use unix::Registration;
36 } else {
37 compile_error!("unsupported platform");
38 }
39}
40
41#[cfg(not(target_os = "espidf"))]
42const TIMER_QUEUE_SIZE: usize = 1000;
43
44#[cfg(target_os = "espidf")]
47const TIMER_QUEUE_SIZE: usize = 100;
48
49const READ: usize = 0;
50const WRITE: usize = 1;
51
52pub(crate) struct Reactor {
56 pub(crate) poller: Poller,
60
61 ticker: AtomicUsize,
68
69 sources: Mutex<Slab<Arc<Source>>>,
71
72 events: Mutex<Events>,
76
77 timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
83
84 timer_ops: ConcurrentQueue<TimerOp>,
89}
90
91impl Reactor {
92 pub(crate) fn get() -> &'static Reactor {
94 static REACTOR: OnceLock<Reactor> = OnceLock::new();
95
96 REACTOR.get_or_init(|| {
97 crate::driver::init();
98 Reactor {
99 poller: Poller::new().expect("cannot initialize I/O event notification"),
100 ticker: AtomicUsize::new(0),
101 sources: Mutex::new(Slab::new()),
102 events: Mutex::new(Events::new()),
103 timers: Mutex::new(BTreeMap::new()),
104 timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
105 }
106 })
107 }
108
109 pub(crate) fn ticker(&self) -> usize {
111 self.ticker.load(Ordering::SeqCst)
112 }
113
114 pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
116 let source = {
118 let mut sources = self.sources.lock().unwrap();
119 let key = sources.vacant_entry().key();
120 let source = Arc::new(Source {
121 registration: raw,
122 key,
123 state: Default::default(),
124 });
125 sources.insert(source.clone());
126 source
127 };
128
129 if let Err(err) = source.registration.add(&self.poller, source.key) {
131 let mut sources = self.sources.lock().unwrap();
132 sources.remove(source.key);
133 return Err(err);
134 }
135
136 Ok(source)
137 }
138
139 pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
141 let mut sources = self.sources.lock().unwrap();
142 sources.remove(source.key);
143 source.registration.delete(&self.poller)
144 }
145
146 pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
150 static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
152 let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
153
154 while self
156 .timer_ops
157 .push(TimerOp::Insert(when, id, waker.clone()))
158 .is_err()
159 {
160 let mut timers = self.timers.lock().unwrap();
162 self.process_timer_ops(&mut timers);
163 }
164
165 self.notify();
167
168 id
169 }
170
171 pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
173 while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
175 let mut timers = self.timers.lock().unwrap();
177 self.process_timer_ops(&mut timers);
178 }
179 }
180
181 pub(crate) fn notify(&self) {
183 self.poller.notify().expect("failed to notify reactor");
184 }
185
186 pub(crate) fn lock(&self) -> ReactorLock<'_> {
188 let reactor = self;
189 let events = self.events.lock().unwrap();
190 ReactorLock { reactor, events }
191 }
192
193 pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
195 self.events.try_lock().ok().map(|events| {
196 let reactor = self;
197 ReactorLock { reactor, events }
198 })
199 }
200
201 fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
205 #[cfg(feature = "tracing")]
206 let span = tracing::trace_span!("process_timers");
207 #[cfg(feature = "tracing")]
208 let _enter = span.enter();
209
210 let mut timers = self.timers.lock().unwrap();
211 self.process_timer_ops(&mut timers);
212
213 let now = Instant::now();
214
215 let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
220 let ready = mem::replace(&mut *timers, pending);
221
222 let dur = if ready.is_empty() {
224 timers
226 .keys()
227 .next()
228 .map(|(when, _)| when.saturating_duration_since(now))
229 } else {
230 Some(Duration::from_secs(0))
232 };
233
234 drop(timers);
236
237 #[cfg(feature = "tracing")]
239 tracing::trace!("{} ready wakers", ready.len());
240
241 for (_, waker) in ready {
242 wakers.push(waker);
243 }
244
245 dur
246 }
247
248 fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
250 self.timer_ops
253 .try_iter()
254 .take(self.timer_ops.capacity().unwrap())
255 .for_each(|op| match op {
256 TimerOp::Insert(when, id, waker) => {
257 timers.insert((when, id), waker);
258 }
259 TimerOp::Remove(when, id) => {
260 timers.remove(&(when, id));
261 }
262 });
263 }
264}
265
266pub(crate) struct ReactorLock<'a> {
268 reactor: &'a Reactor,
269 events: MutexGuard<'a, Events>,
270}
271
272impl ReactorLock<'_> {
273 pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
275 #[cfg(feature = "tracing")]
276 let span = tracing::trace_span!("react");
277 #[cfg(feature = "tracing")]
278 let _enter = span.enter();
279
280 let mut wakers = Vec::new();
281
282 let next_timer = self.reactor.process_timers(&mut wakers);
284
285 let timeout = match (next_timer, timeout) {
287 (None, None) => None,
288 (Some(t), None) | (None, Some(t)) => Some(t),
289 (Some(a), Some(b)) => Some(a.min(b)),
290 };
291
292 let tick = self
294 .reactor
295 .ticker
296 .fetch_add(1, Ordering::SeqCst)
297 .wrapping_add(1);
298
299 self.events.clear();
300
301 let res = match self.reactor.poller.wait(&mut self.events, timeout) {
303 Ok(0) => {
305 if timeout != Some(Duration::from_secs(0)) {
306 self.reactor.process_timers(&mut wakers);
308 }
309 Ok(())
310 }
311
312 Ok(_) => {
314 let sources = self.reactor.sources.lock().unwrap();
316
317 for ev in self.events.iter() {
318 if let Some(source) = sources.get(ev.key) {
320 let mut state = source.state.lock().unwrap();
321
322 for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
324 if emitted {
325 state[dir].tick = tick;
326 state[dir].drain_into(&mut wakers);
327 }
328 }
329
330 if !state[READ].is_empty() || !state[WRITE].is_empty() {
334 let event = {
336 let mut event = Event::none(source.key);
337 event.readable = !state[READ].is_empty();
338 event.writable = !state[WRITE].is_empty();
339 event
340 };
341
342 source.registration.modify(&self.reactor.poller, event)?;
344 }
345 }
346 }
347
348 Ok(())
349 }
350
351 Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
353
354 Err(err) => Err(err),
356 };
357
358 #[cfg(feature = "tracing")]
360 tracing::trace!("{} ready wakers", wakers.len());
361 for waker in wakers {
362 panic::catch_unwind(|| waker.wake()).ok();
364 }
365
366 res
367 }
368}
369
370enum TimerOp {
372 Insert(Instant, usize, Waker),
373 Remove(Instant, usize),
374}
375
376#[derive(Debug)]
378pub(crate) struct Source {
379 registration: Registration,
381
382 key: usize,
384
385 state: Mutex<[Direction; 2]>,
387}
388
389#[derive(Debug, Default)]
391struct Direction {
392 tick: usize,
394
395 ticks: Option<(usize, usize)>,
397
398 waker: Option<Waker>,
400
401 wakers: Slab<Option<Waker>>,
405}
406
407impl Direction {
408 fn is_empty(&self) -> bool {
410 self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
411 }
412
413 fn drain_into(&mut self, dst: &mut Vec<Waker>) {
415 if let Some(w) = self.waker.take() {
416 dst.push(w);
417 }
418 for (_, opt) in self.wakers.iter_mut() {
419 if let Some(w) = opt.take() {
420 dst.push(w);
421 }
422 }
423 }
424}
425
426impl Source {
427 pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
429 self.poll_ready(READ, cx)
430 }
431
432 pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
434 self.poll_ready(WRITE, cx)
435 }
436
437 fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
441 let mut state = self.state.lock().unwrap();
442
443 if let Some((a, b)) = state[dir].ticks {
445 if state[dir].tick != a && state[dir].tick != b {
448 state[dir].ticks = None;
449 return Poll::Ready(Ok(()));
450 }
451 }
452
453 let was_empty = state[dir].is_empty();
454
455 if let Some(w) = state[dir].waker.take() {
457 if w.will_wake(cx.waker()) {
458 state[dir].waker = Some(w);
459 return Poll::Pending;
460 }
461 panic::catch_unwind(|| w.wake()).ok();
463 }
464 state[dir].waker = Some(cx.waker().clone());
465 state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
466
467 if was_empty {
469 let event = {
471 let mut event = Event::none(self.key);
472 event.readable = !state[READ].is_empty();
473 event.writable = !state[WRITE].is_empty();
474 event
475 };
476
477 self.registration.modify(&Reactor::get().poller, event)?;
479 }
480
481 Poll::Pending
482 }
483
484 pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
486 Readable(Self::ready(handle, READ))
487 }
488
489 pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
491 ReadableOwned(Self::ready(handle, READ))
492 }
493
494 pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
496 Writable(Self::ready(handle, WRITE))
497 }
498
499 pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
501 WritableOwned(Self::ready(handle, WRITE))
502 }
503
504 fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
506 Ready {
507 handle,
508 dir,
509 ticks: None,
510 index: None,
511 _capture: PhantomData,
512 }
513 }
514}
515
516#[must_use = "futures do nothing unless you `.await` or poll them"]
518pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);
519
520impl<T> Future for Readable<'_, T> {
521 type Output = io::Result<()>;
522
523 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
524 ready!(Pin::new(&mut self.0).poll(cx))?;
525 #[cfg(feature = "tracing")]
526 tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
527 Poll::Ready(Ok(()))
528 }
529}
530
531impl<T> fmt::Debug for Readable<'_, T> {
532 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
533 f.debug_struct("Readable").finish()
534 }
535}
536
537#[must_use = "futures do nothing unless you `.await` or poll them"]
539pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
540
541impl<T> Future for ReadableOwned<T> {
542 type Output = io::Result<()>;
543
544 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
545 ready!(Pin::new(&mut self.0).poll(cx))?;
546 #[cfg(feature = "tracing")]
547 tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
548 Poll::Ready(Ok(()))
549 }
550}
551
552impl<T> fmt::Debug for ReadableOwned<T> {
553 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
554 f.debug_struct("ReadableOwned").finish()
555 }
556}
557
558#[must_use = "futures do nothing unless you `.await` or poll them"]
560pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);
561
562impl<T> Future for Writable<'_, T> {
563 type Output = io::Result<()>;
564
565 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
566 ready!(Pin::new(&mut self.0).poll(cx))?;
567 #[cfg(feature = "tracing")]
568 tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
569 Poll::Ready(Ok(()))
570 }
571}
572
573impl<T> fmt::Debug for Writable<'_, T> {
574 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575 f.debug_struct("Writable").finish()
576 }
577}
578
579#[must_use = "futures do nothing unless you `.await` or poll them"]
581pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
582
583impl<T> Future for WritableOwned<T> {
584 type Output = io::Result<()>;
585
586 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
587 ready!(Pin::new(&mut self.0).poll(cx))?;
588 #[cfg(feature = "tracing")]
589 tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
590 Poll::Ready(Ok(()))
591 }
592}
593
594impl<T> fmt::Debug for WritableOwned<T> {
595 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
596 f.debug_struct("WritableOwned").finish()
597 }
598}
599
600struct Ready<H: Borrow<crate::Async<T>>, T> {
601 handle: H,
602 dir: usize,
603 ticks: Option<(usize, usize)>,
604 index: Option<usize>,
605 _capture: PhantomData<fn() -> T>,
606}
607
608impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}
609
610impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
611 type Output = io::Result<()>;
612
613 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
614 let Self {
615 ref handle,
616 dir,
617 ticks,
618 index,
619 ..
620 } = &mut *self;
621
622 let mut state = handle.borrow().source.state.lock().unwrap();
623
624 if let Some((a, b)) = *ticks {
626 if state[*dir].tick != a && state[*dir].tick != b {
629 return Poll::Ready(Ok(()));
630 }
631 }
632
633 let was_empty = state[*dir].is_empty();
634
635 let i = match *index {
637 Some(i) => i,
638 None => {
639 let i = state[*dir].wakers.insert(None);
640 *index = Some(i);
641 *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
642 i
643 }
644 };
645 state[*dir].wakers[i] = Some(cx.waker().clone());
646
647 if was_empty {
649 let event = {
651 let mut event = Event::none(handle.borrow().source.key);
652 event.readable = !state[READ].is_empty();
653 event.writable = !state[WRITE].is_empty();
654 event
655 };
656
657 handle
659 .borrow()
660 .source
661 .registration
662 .modify(&Reactor::get().poller, event)?;
663 }
664
665 Poll::Pending
666 }
667}
668
669impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
670 fn drop(&mut self) {
671 if let Some(key) = self.index {
673 let mut state = self.handle.borrow().source.state.lock().unwrap();
674 let wakers = &mut state[self.dir].wakers;
675 if wakers.contains(key) {
676 wakers.remove(key);
677 }
678 }
679 }
680}