1use std::borrow::Borrow;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::future::Future;
5use std::io;
6use std::marker::PhantomData;
7use std::mem;
8use std::panic;
9use std::pin::Pin;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Mutex, MutexGuard};
12use std::task::{Context, Poll, Waker};
13use std::time::{Duration, Instant};
14
15use async_lock::OnceCell;
16use concurrent_queue::ConcurrentQueue;
17use futures_lite::ready;
18use polling::{Event, Events, Poller};
19use slab::Slab;
20
// Select the platform-specific module that provides `Registration` and re-export that type from
// this module. Targets that match none of the branches are rejected at compile time.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        mod windows;
        pub use windows::Registration;
    } else if #[cfg(any(
        target_vendor = "apple",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        // Apple targets and the listed BSDs use the `kqueue` module.
        mod kqueue;
        pub use kqueue::Registration;
    } else if #[cfg(unix)] {
        // Every remaining Unix target uses the `unix` module.
        mod unix;
        pub use unix::Registration;
    } else {
        compile_error!("unsupported platform");
    }
}
42
/// Capacity of the bounded queue that buffers pending timer operations
/// (see `Reactor::timer_ops`).
#[cfg(not(target_os = "espidf"))]
const TIMER_QUEUE_SIZE: usize = 1000;

/// Capacity of the bounded timer-operation queue on ESP-IDF targets.
///
/// NOTE(review): presumably smaller to reduce memory use on embedded targets — confirm.
#[cfg(target_os = "espidf")]
const TIMER_QUEUE_SIZE: usize = 100;
50
/// Index of the read direction in a source's `[Direction; 2]` state array.
const READ: usize = 0;
/// Index of the write direction in a source's `[Direction; 2]` state array.
const WRITE: usize = 1;
53
/// The reactor: owns the poller, the table of registered I/O sources and the timer map.
///
/// A single global instance exists; it is created lazily by `Reactor::get()`.
pub(crate) struct Reactor {
    /// The poller that I/O sources are registered with and that `ReactorLock::react()` waits on.
    pub(crate) poller: Poller,

    /// Counter incremented by `ReactorLock::react()` right before it waits on the poller.
    ///
    /// `Source::poll_ready()` and the `Ready` future snapshot this value so they can tell
    /// whether an event was delivered by a later round than the one they registered in.
    ticker: AtomicUsize,

    /// Registered I/O sources, indexed by `Source::key`.
    sources: Mutex<Slab<Arc<Source>>>,

    /// Buffer the poller writes I/O events into.
    ///
    /// A `ReactorLock` holds the guard of this mutex, so owning that guard is what grants the
    /// right to call `react()`.
    events: Mutex<Events>,

    /// Registered timers, ordered by `(deadline, timer id)`, each mapped to the waker to wake
    /// when the deadline passes.
    timers: Mutex<BTreeMap<(Instant, usize), Waker>>,

    /// Bounded queue of timer insertions and removals that have not been applied to `timers`
    /// yet; it is drained by `process_timer_ops()`.
    timer_ops: ConcurrentQueue<TimerOp>,
}
92
93impl Reactor {
94 pub(crate) fn get() -> &'static Reactor {
96 static REACTOR: OnceCell<Reactor> = OnceCell::new();
97
98 REACTOR.get_or_init_blocking(|| {
99 crate::driver::init();
100 Reactor {
101 poller: Poller::new().expect("cannot initialize I/O event notification"),
102 ticker: AtomicUsize::new(0),
103 sources: Mutex::new(Slab::new()),
104 events: Mutex::new(Events::new()),
105 timers: Mutex::new(BTreeMap::new()),
106 timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
107 }
108 })
109 }
110
111 pub(crate) fn ticker(&self) -> usize {
113 self.ticker.load(Ordering::SeqCst)
114 }
115
116 pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
118 let source = {
120 let mut sources = self.sources.lock().unwrap();
121 let key = sources.vacant_entry().key();
122 let source = Arc::new(Source {
123 registration: raw,
124 key,
125 state: Default::default(),
126 });
127 sources.insert(source.clone());
128 source
129 };
130
131 if let Err(err) = source.registration.add(&self.poller, source.key) {
133 let mut sources = self.sources.lock().unwrap();
134 sources.remove(source.key);
135 return Err(err);
136 }
137
138 Ok(source)
139 }
140
141 pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
143 let mut sources = self.sources.lock().unwrap();
144 sources.remove(source.key);
145 source.registration.delete(&self.poller)
146 }
147
148 pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
152 static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
154 let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
155
156 while self
158 .timer_ops
159 .push(TimerOp::Insert(when, id, waker.clone()))
160 .is_err()
161 {
162 let mut timers = self.timers.lock().unwrap();
164 self.process_timer_ops(&mut timers);
165 }
166
167 self.notify();
169
170 id
171 }
172
173 pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
175 while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
177 let mut timers = self.timers.lock().unwrap();
179 self.process_timer_ops(&mut timers);
180 }
181 }
182
183 pub(crate) fn notify(&self) {
185 self.poller.notify().expect("failed to notify reactor");
186 }
187
188 pub(crate) fn lock(&self) -> ReactorLock<'_> {
190 let reactor = self;
191 let events = self.events.lock().unwrap();
192 ReactorLock { reactor, events }
193 }
194
195 pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
197 self.events.try_lock().ok().map(|events| {
198 let reactor = self;
199 ReactorLock { reactor, events }
200 })
201 }
202
203 fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
207 #[cfg(feature = "tracing")]
208 let span = tracing::trace_span!("process_timers");
209 #[cfg(feature = "tracing")]
210 let _enter = span.enter();
211
212 let mut timers = self.timers.lock().unwrap();
213 self.process_timer_ops(&mut timers);
214
215 let now = Instant::now();
216
217 let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
222 let ready = mem::replace(&mut *timers, pending);
223
224 let dur = if ready.is_empty() {
226 timers
228 .keys()
229 .next()
230 .map(|(when, _)| when.saturating_duration_since(now))
231 } else {
232 Some(Duration::from_secs(0))
234 };
235
236 drop(timers);
238
239 #[cfg(feature = "tracing")]
241 tracing::trace!("{} ready wakers", ready.len());
242
243 for (_, waker) in ready {
244 wakers.push(waker);
245 }
246
247 dur
248 }
249
250 fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
252 self.timer_ops
255 .try_iter()
256 .take(self.timer_ops.capacity().unwrap())
257 .for_each(|op| match op {
258 TimerOp::Insert(when, id, waker) => {
259 timers.insert((when, id), waker);
260 }
261 TimerOp::Remove(when, id) => {
262 timers.remove(&(when, id));
263 }
264 });
265 }
266}
267
/// Exclusive handle for driving the reactor, obtained from `Reactor::lock()` or
/// `Reactor::try_lock()`.
pub(crate) struct ReactorLock<'a> {
    /// The reactor this lock belongs to.
    reactor: &'a Reactor,
    /// Guard over the reactor's event buffer; the poller writes delivered events into it.
    events: MutexGuard<'a, Events>,
}
273
274impl ReactorLock<'_> {
275 pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
277 #[cfg(feature = "tracing")]
278 let span = tracing::trace_span!("react");
279 #[cfg(feature = "tracing")]
280 let _enter = span.enter();
281
282 let mut wakers = Vec::new();
283
284 let next_timer = self.reactor.process_timers(&mut wakers);
286
287 let timeout = match (next_timer, timeout) {
289 (None, None) => None,
290 (Some(t), None) | (None, Some(t)) => Some(t),
291 (Some(a), Some(b)) => Some(a.min(b)),
292 };
293
294 let tick = self
296 .reactor
297 .ticker
298 .fetch_add(1, Ordering::SeqCst)
299 .wrapping_add(1);
300
301 self.events.clear();
302
303 let res = match self.reactor.poller.wait(&mut self.events, timeout) {
305 Ok(0) => {
307 if timeout != Some(Duration::from_secs(0)) {
308 self.reactor.process_timers(&mut wakers);
310 }
311 Ok(())
312 }
313
314 Ok(_) => {
316 let sources = self.reactor.sources.lock().unwrap();
318
319 for ev in self.events.iter() {
320 if let Some(source) = sources.get(ev.key) {
322 let mut state = source.state.lock().unwrap();
323
324 for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
326 if emitted {
327 state[dir].tick = tick;
328 state[dir].drain_into(&mut wakers);
329 }
330 }
331
332 if !state[READ].is_empty() || !state[WRITE].is_empty() {
336 let event = {
338 let mut event = Event::none(source.key);
339 event.readable = !state[READ].is_empty();
340 event.writable = !state[WRITE].is_empty();
341 event
342 };
343
344 source.registration.modify(&self.reactor.poller, event)?;
346 }
347 }
348 }
349
350 Ok(())
351 }
352
353 Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
355
356 Err(err) => Err(err),
358 };
359
360 #[cfg(feature = "tracing")]
362 tracing::trace!("{} ready wakers", wakers.len());
363 for waker in wakers {
364 panic::catch_unwind(|| waker.wake()).ok();
366 }
367
368 res
369 }
370}
371
/// A timer operation waiting in `Reactor::timer_ops` to be applied to the timer map.
enum TimerOp {
    /// Insert a timer keyed by `(deadline, id)` that wakes the given waker.
    Insert(Instant, usize, Waker),
    /// Remove the timer keyed by `(deadline, id)`.
    Remove(Instant, usize),
}
377
/// An I/O source registered in the reactor.
#[derive(Debug)]
pub(crate) struct Source {
    /// Platform-specific registration used to add, modify and delete this source in the poller.
    registration: Registration,

    /// Key of this source in the reactor's source table; also the key it is registered under
    /// in the poller.
    key: usize,

    /// Per-direction waker state, indexed by `READ` and `WRITE`.
    state: Mutex<[Direction; 2]>,
}
390
/// Waker bookkeeping for one direction (read or write) of a `Source`.
#[derive(Debug, Default)]
struct Direction {
    /// Reactor tick of the most recent round that delivered an event for this direction.
    tick: usize,

    /// Snapshot taken by `Source::poll_ready()` when it registers its waker: the reactor's
    /// ticker and this direction's `tick` at that moment. Reset to `None` once readiness has
    /// been reported.
    ticks: Option<(usize, usize)>,

    /// Waker registered through `Source::poll_ready()`.
    waker: Option<Waker>,

    /// Wakers registered by `Ready` futures.
    ///
    /// Each future owns one slot: it inserts the slot on its first pending poll and removes it
    /// when dropped. Draining leaves the slots in place and only empties them.
    wakers: Slab<Option<Waker>>,
}
408
409impl Direction {
410 fn is_empty(&self) -> bool {
412 self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
413 }
414
415 fn drain_into(&mut self, dst: &mut Vec<Waker>) {
417 if let Some(w) = self.waker.take() {
418 dst.push(w);
419 }
420 for (_, opt) in self.wakers.iter_mut() {
421 if let Some(w) = opt.take() {
422 dst.push(w);
423 }
424 }
425 }
426}
427
428impl Source {
429 pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
431 self.poll_ready(READ, cx)
432 }
433
434 pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
436 self.poll_ready(WRITE, cx)
437 }
438
439 fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
443 let mut state = self.state.lock().unwrap();
444
445 if let Some((a, b)) = state[dir].ticks {
447 if state[dir].tick != a && state[dir].tick != b {
450 state[dir].ticks = None;
451 return Poll::Ready(Ok(()));
452 }
453 }
454
455 let was_empty = state[dir].is_empty();
456
457 if let Some(w) = state[dir].waker.take() {
459 if w.will_wake(cx.waker()) {
460 state[dir].waker = Some(w);
461 return Poll::Pending;
462 }
463 panic::catch_unwind(|| w.wake()).ok();
465 }
466 state[dir].waker = Some(cx.waker().clone());
467 state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
468
469 if was_empty {
471 let event = {
473 let mut event = Event::none(self.key);
474 event.readable = !state[READ].is_empty();
475 event.writable = !state[WRITE].is_empty();
476 event
477 };
478
479 self.registration.modify(&Reactor::get().poller, event)?;
481 }
482
483 Poll::Pending
484 }
485
486 pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
488 Readable(Self::ready(handle, READ))
489 }
490
491 pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
493 ReadableOwned(Self::ready(handle, READ))
494 }
495
496 pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
498 Writable(Self::ready(handle, WRITE))
499 }
500
501 pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
503 WritableOwned(Self::ready(handle, WRITE))
504 }
505
506 fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
508 Ready {
509 handle,
510 dir,
511 ticks: None,
512 index: None,
513 _capture: PhantomData,
514 }
515 }
516}
517
/// Future that resolves once a readability event has been delivered for a borrowed handle.
///
/// Built by `Source::readable`; it wraps the internal `Ready` future with the direction fixed
/// to `READ`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);
521
522impl<T> Future for Readable<'_, T> {
523 type Output = io::Result<()>;
524
525 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
526 ready!(Pin::new(&mut self.0).poll(cx))?;
527 #[cfg(feature = "tracing")]
528 tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
529 Poll::Ready(Ok(()))
530 }
531}
532
533impl<T> fmt::Debug for Readable<'_, T> {
534 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
535 f.debug_struct("Readable").finish()
536 }
537}
538
/// Future that resolves once a readability event has been delivered for a shared
/// (`Arc`-owned) handle.
///
/// Built by `Source::readable_owned`; it wraps the internal `Ready` future with the direction
/// fixed to `READ`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
542
543impl<T> Future for ReadableOwned<T> {
544 type Output = io::Result<()>;
545
546 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
547 ready!(Pin::new(&mut self.0).poll(cx))?;
548 #[cfg(feature = "tracing")]
549 tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
550 Poll::Ready(Ok(()))
551 }
552}
553
554impl<T> fmt::Debug for ReadableOwned<T> {
555 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556 f.debug_struct("ReadableOwned").finish()
557 }
558}
559
/// Future that resolves once a writability event has been delivered for a borrowed handle.
///
/// Built by `Source::writable`; it wraps the internal `Ready` future with the direction fixed
/// to `WRITE`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);
563
564impl<T> Future for Writable<'_, T> {
565 type Output = io::Result<()>;
566
567 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
568 ready!(Pin::new(&mut self.0).poll(cx))?;
569 #[cfg(feature = "tracing")]
570 tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
571 Poll::Ready(Ok(()))
572 }
573}
574
575impl<T> fmt::Debug for Writable<'_, T> {
576 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
577 f.debug_struct("Writable").finish()
578 }
579}
580
/// Future that resolves once a writability event has been delivered for a shared
/// (`Arc`-owned) handle.
///
/// Built by `Source::writable_owned`; it wraps the internal `Ready` future with the direction
/// fixed to `WRITE`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
584
585impl<T> Future for WritableOwned<T> {
586 type Output = io::Result<()>;
587
588 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
589 ready!(Pin::new(&mut self.0).poll(cx))?;
590 #[cfg(feature = "tracing")]
591 tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
592 Poll::Ready(Ok(()))
593 }
594}
595
596impl<T> fmt::Debug for WritableOwned<T> {
597 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598 f.debug_struct("WritableOwned").finish()
599 }
600}
601
/// Future shared by `Readable`, `ReadableOwned`, `Writable` and `WritableOwned`: waits until an
/// event for one direction of a source has been delivered.
struct Ready<H: Borrow<crate::Async<T>>, T> {
    /// Handle (borrowed or `Arc`-owned) giving access to the source being waited on.
    handle: H,
    /// Direction being waited on: `READ` or `WRITE`.
    dir: usize,
    /// Snapshot taken on the first pending poll: the reactor's ticker and the direction's
    /// `tick` at that moment. `None` until then.
    ticks: Option<(usize, usize)>,
    /// Slot this future occupies in the direction's `wakers` slab, once it has registered.
    index: Option<usize>,
    /// Marker that uses the type parameter `T` without storing a value of it.
    _capture: PhantomData<fn() -> T>,
}
609
// `Ready` is declared `Unpin` for every `H` and `T`; the wrapper futures and `Ready::poll` rely
// on this to obtain `&mut` access through `Pin`.
impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}
611
612impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
613 type Output = io::Result<()>;
614
615 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
616 let Self {
617 ref handle,
618 dir,
619 ticks,
620 index,
621 ..
622 } = &mut *self;
623
624 let mut state = handle.borrow().source.state.lock().unwrap();
625
626 if let Some((a, b)) = *ticks {
628 if state[*dir].tick != a && state[*dir].tick != b {
631 return Poll::Ready(Ok(()));
632 }
633 }
634
635 let was_empty = state[*dir].is_empty();
636
637 let i = match *index {
639 Some(i) => i,
640 None => {
641 let i = state[*dir].wakers.insert(None);
642 *index = Some(i);
643 *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
644 i
645 }
646 };
647 state[*dir].wakers[i] = Some(cx.waker().clone());
648
649 if was_empty {
651 let event = {
653 let mut event = Event::none(handle.borrow().source.key);
654 event.readable = !state[READ].is_empty();
655 event.writable = !state[WRITE].is_empty();
656 event
657 };
658
659 handle
661 .borrow()
662 .source
663 .registration
664 .modify(&Reactor::get().poller, event)?;
665 }
666
667 Poll::Pending
668 }
669}
670
671impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
672 fn drop(&mut self) {
673 if let Some(key) = self.index {
675 let mut state = self.handle.borrow().source.state.lock().unwrap();
676 let wakers = &mut state[self.dir].wakers;
677 if wakers.contains(key) {
678 wakers.remove(key);
679 }
680 }
681 }
682}