reactor/poller/
popol.rs

1// Library for concurrent I/O resource management using reactor pattern.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Written in 2021-2023 by
6//     Dr. Maxim Orlovsky <orlovsky@ubideco.org>
7//     Alexis Sellier <alexis@cloudhead.io>
8//
9// Copyright 2022-2023 UBIDECO Institute, Switzerland
10// Copyright 2021 Alexis Sellier <alexis@cloudhead.io>
11//
12// Licensed under the Apache License, Version 2.0 (the "License");
13// you may not use this file except in compliance with the License.
14// You may obtain a copy of the License at
15//
16//     http://www.apache.org/licenses/LICENSE-2.0
17//
18// Unless required by applicable law or agreed to in writing, software
19// distributed under the License is distributed on an "AS IS" BASIS,
20// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21// See the License for the specific language governing permissions and
22// limitations under the License.
23
24//! Poll engine provided by the [`popol`] crate.
25
26use std::collections::VecDeque;
27use std::io::{self, Error};
28use std::os::unix::io::{AsRawFd, RawFd};
29use std::sync::Arc;
30use std::time::Duration;
31
32use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
33use crate::{ResourceId, ResourceIdGenerator};
34
35/// Manager for a set of reactor which are polled for an event loop by the
36/// re-actor by using [`popol`] library.
37pub struct Poller {
38    poll: popol::Sources<ResourceId>,
39    events: VecDeque<popol::Event<ResourceId>>,
40    id_gen: ResourceIdGenerator,
41}
42
43impl Default for Poller {
44    fn default() -> Self { Self::new() }
45}
46
47impl Poller {
48    /// Constructs new [`popol`]-backed poll engine.
49    pub fn new() -> Self {
50        Self {
51            poll: popol::Sources::new(),
52            events: empty!(),
53            id_gen: ResourceIdGenerator::default(),
54        }
55    }
56
57    /// Constructs new [`popol`]-backed poll engine and reserves certain capacity for the resources
58    /// which later will be registered for a poll operation.
59    pub fn with_capacity(capacity: usize) -> Self {
60        Self {
61            poll: popol::Sources::with_capacity(capacity),
62            events: VecDeque::with_capacity(capacity),
63            id_gen: ResourceIdGenerator::default(),
64        }
65    }
66}
67
68impl Poll for Poller {
69    type Waker = PopolWaker;
70
71    fn register_waker(&mut self, fd: &impl AsRawFd) {
72        let id = ResourceId::WAKER;
73        if self.poll.get(&id).is_some() {
74            #[cfg(feature = "log")]
75            log::error!(target: "popol", "Reactor waker is already registered, terminating");
76            panic!("Reactor waker is already registered");
77        }
78
79        self.poll.register(id, fd, popol::interest::READ);
80    }
81
82    fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId {
83        let id = self.id_gen.next();
84
85        #[cfg(feature = "log")]
86        log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id);
87
88        self.poll.register(id, fd, interest.into());
89        id
90    }
91
92    fn unregister(&mut self, id: ResourceId) {
93        #[cfg(feature = "log")]
94        log::trace!(target: "popol", "Unregistering {}", id);
95        self.poll.unregister(&id);
96    }
97
98    fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool {
99        #[cfg(feature = "log")]
100        log::trace!(target: "popol", "Setting interest `{interest}` on {}", id);
101
102        self.poll.unset(&id, (!interest).into());
103        self.poll.set(&id, interest.into())
104    }
105
106    fn poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
107        #[cfg(feature = "log")]
108        log::trace!(target: "popol",
109            "Polling {} reactor resources with timeout {timeout:?} (pending event queue is {})",
110            self.poll.len(), self.events.len()
111        );
112
113        // Blocking call
114        match self.poll.poll(&mut self.events, timeout) {
115            Ok(count) => {
116                #[cfg(feature = "log")]
117                log::trace!(target: "popol", "Poll resulted in {} new event(s)", count);
118                Ok(count)
119            }
120            Err(err) if err.kind() == io::ErrorKind::TimedOut => {
121                #[cfg(feature = "log")]
122                log::trace!(target: "popol", "Poll timed out with zero events generated");
123                Ok(0)
124            }
125            Err(err) => {
126                #[cfg(feature = "log")]
127                log::trace!(target: "popol", "Poll resulted in error: {err}");
128                Err(err)
129            }
130        }
131    }
132}
133
134impl Iterator for Poller {
135    type Item = (ResourceId, Result<IoType, IoFail>);
136
137    fn next(&mut self) -> Option<Self::Item> {
138        let event = self.events.pop_front()?;
139
140        let id = event.key;
141        let fired = event.raw_events();
142        let res = if event.is_hangup() {
143            #[cfg(feature = "log")]
144            log::trace!(target: "popol", "Hangup on {id}");
145
146            Err(IoFail::Connectivity(fired))
147        } else if event.is_error() || event.is_invalid() {
148            #[cfg(feature = "log")]
149            log::trace!(target: "popol", "OS error on {id} (fired events {fired:#b})");
150
151            Err(IoFail::Os(fired))
152        } else {
153            let io = IoType {
154                read: event.is_readable(),
155                write: event.is_writable(),
156            };
157
158            #[cfg(feature = "log")]
159            log::trace!(target: "popol", "I/O event on {id}: {io}");
160
161            Ok(io)
162        };
163        Some((id, res))
164    }
165}
166
167impl From<IoType> for popol::Interest {
168    fn from(ev: IoType) -> Self {
169        let mut e = popol::interest::NONE;
170        if ev.read {
171            e |= popol::interest::READ;
172        }
173        if ev.write {
174            e |= popol::interest::WRITE;
175        }
176        e
177    }
178}
179
180/// Wrapper type around the waker provided by `popol` crate.
181#[derive(Clone)]
182pub struct PopolWaker(Arc<popol::Waker>);
183
184impl Waker for PopolWaker {
185    type Send = Self;
186    type Recv = Self;
187
188    fn pair() -> Result<(Self::Send, Self::Recv), Error> {
189        let waker = Arc::new(popol::Waker::new()?);
190        Ok((PopolWaker(waker.clone()), PopolWaker(waker)))
191    }
192}
193
194impl io::Read for PopolWaker {
195    fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
196        self.reset();
197        // Waker reads only when there is something which was sent.
198        // That's why we just return here.
199        Ok(0)
200    }
201}
202
203impl AsRawFd for PopolWaker {
204    fn as_raw_fd(&self) -> RawFd { self.0.as_ref().as_raw_fd() }
205}
206
207impl WakerRecv for PopolWaker {
208    fn reset(&self) {
209        if let Err(e) = popol::Waker::reset(self.0.as_ref()) {
210            #[cfg(feature = "log")]
211            log::error!(target: "reactor-controller", "Unable to reset waker queue: {e}");
212            panic!("unable to reset waker queue. Details: {e}");
213        }
214    }
215}
216
217impl WakerSend for PopolWaker {
218    fn wake(&self) -> io::Result<()> { self.0.wake() }
219}