#![allow(unused_variables)]
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::os::unix::io::{AsRawFd, RawFd};
use std::thread::JoinHandle;
use std::time::Duration;
use std::{io, thread};
use crossbeam_channel as chan;
use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend};
use crate::resource::WriteError;
use crate::{Resource, ResourceId, ResourceType, Timer, Timestamp, WriteAtomic};
const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60);
#[derive(Error, Display, From)]
#[display(doc_comments)]
pub enum Error<L: Resource, T: Resource> {
ListenerDisconnect(ResourceId, L),
TransportDisconnect(ResourceId, T),
Poll(io::Error),
}
impl<L: Resource, T: Resource> Debug for Error<L, T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { Display::fmt(self, f) }
}
#[derive(Display)]
pub enum Action<L: Resource, T: Resource> {
#[display("register_listener")]
RegisterListener(L),
#[display("register_transport")]
RegisterTransport(T),
#[display("unregister_listener")]
UnregisterListener(ResourceId),
#[display("unregister_transport")]
UnregisterTransport(ResourceId),
#[display("send_to({0})")]
Send(ResourceId, Vec<u8>),
#[display("set_timer({0:?})")]
SetTimer(Duration),
}
pub trait Handler: Send + Iterator<Item = Action<Self::Listener, Self::Transport>> {
type Listener: Resource;
type Transport: Resource;
type Command: Debug + Send;
fn tick(&mut self, time: Timestamp);
fn handle_timer(&mut self);
fn handle_listener_event(
&mut self,
id: ResourceId,
event: <Self::Listener as Resource>::Event,
time: Timestamp,
);
fn handle_transport_event(
&mut self,
id: ResourceId,
event: <Self::Transport as Resource>::Event,
time: Timestamp,
);
fn handle_registered(&mut self, fd: RawFd, id: ResourceId, ty: ResourceType);
fn handle_command(&mut self, cmd: Self::Command);
fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>);
fn handover_listener(&mut self, id: ResourceId, listener: Self::Listener);
fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport);
}
pub struct Reactor<C, P: Poll> {
thread: JoinHandle<()>,
controller: Controller<C, <P::Waker as Waker>::Send>,
}
impl<C, P: Poll> Reactor<C, P> {
pub fn new<H: Handler<Command = C>>(service: H, poller: P) -> Result<Self, io::Error>
where
H: 'static,
P: 'static,
C: 'static + Send,
{
Reactor::with(service, poller, thread::Builder::new())
}
pub fn named<H: Handler<Command = C>>(
service: H,
poller: P,
thread_name: String,
) -> Result<Self, io::Error>
where
H: 'static,
P: 'static,
C: 'static + Send,
{
Reactor::with(service, poller, thread::Builder::new().name(thread_name))
}
pub fn with<H: Handler<Command = C>>(
service: H,
mut poller: P,
builder: thread::Builder,
) -> Result<Self, io::Error>
where
H: 'static,
P: 'static,
C: 'static + Send,
{
let (ctl_send, ctl_recv) = chan::unbounded();
let (waker_writer, waker_reader) = P::Waker::pair()?;
let controller = Controller {
ctl_send,
waker: waker_writer,
};
#[cfg(feature = "log")]
log::debug!(target: "reactor-controller", "Initializing reactor thread...");
let runtime_controller = controller.clone();
let thread = builder.spawn(move || {
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd());
poller.register_waker(&waker_reader);
let runtime = Runtime {
service,
poller,
controller: runtime_controller,
ctl_recv,
listeners: empty!(),
transports: empty!(),
waker: waker_reader,
timeouts: Timer::new(),
};
#[cfg(feature = "log")]
log::info!(target: "reactor", "Entering reactor event loop");
runtime.run();
})?;
controller.wake()?;
Ok(Self { thread, controller })
}
pub fn controller(&self) -> Controller<C, <P::Waker as Waker>::Send> { self.controller.clone() }
pub fn join(self) -> thread::Result<()> { self.thread.join() }
}
enum Ctl<C> {
Cmd(C),
Shutdown,
}
pub struct Controller<C, W: WakerSend> {
ctl_send: chan::Sender<Ctl<C>>,
waker: W,
}
impl<C, W: WakerSend> Clone for Controller<C, W> {
fn clone(&self) -> Self {
Controller {
ctl_send: self.ctl_send.clone(),
waker: self.waker.clone(),
}
}
}
impl<C, W: WakerSend> Controller<C, W> {
#[allow(unused_mut)] pub fn cmd(&self, mut command: C) -> Result<(), io::Error>
where C: 'static {
#[cfg(feature = "log")]
{
use std::any::Any;
let cmd = Box::new(command);
let any = cmd as Box<dyn Any>;
let any = match any.downcast::<Box<dyn Debug>>() {
Err(any) => {
log::debug!(target: "reactor-controller", "Sending command to the reactor");
any
}
Ok(debug) => {
log::debug!(target: "reactor-controller", "Sending command {debug:?} to the reactor");
debug
}
};
command = *any.downcast().expect("from upcast");
}
self.ctl_send.send(Ctl::Cmd(command)).map_err(|_| io::ErrorKind::BrokenPipe)?;
self.wake()?;
Ok(())
}
pub fn shutdown(self) -> Result<(), Self> {
#[cfg(feature = "log")]
log::info!(target: "reactor-controller", "Initiating reactor shutdown...");
let res1 = self.ctl_send.send(Ctl::Shutdown);
let res2 = self.wake();
res1.or(res2).map_err(|_| self)
}
fn wake(&self) -> io::Result<()> {
#[cfg(feature = "log")]
log::trace!(target: "reactor-controller", "Wakening the reactor");
self.waker.wake()
}
}
pub struct Runtime<H: Handler, P: Poll> {
service: H,
poller: P,
controller: Controller<H::Command, <P::Waker as Waker>::Send>,
ctl_recv: chan::Receiver<Ctl<H::Command>>,
listeners: HashMap<ResourceId, H::Listener>,
transports: HashMap<ResourceId, H::Transport>,
waker: <P::Waker as Waker>::Recv,
timeouts: Timer,
}
impl<H: Handler, P: Poll> Runtime<H, P> {
pub fn with(service: H, mut poller: P) -> io::Result<Self> {
let (ctl_send, ctl_recv) = chan::unbounded();
let (waker_writer, waker_reader) = P::Waker::pair()?;
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd());
poller.register_waker(&waker_reader);
let controller = Controller {
ctl_send,
waker: waker_writer,
};
Ok(Runtime {
service,
poller,
controller,
ctl_recv,
listeners: empty!(),
transports: empty!(),
waker: waker_reader,
timeouts: Timer::new(),
})
}
pub fn controller(&self) -> Controller<H::Command, <P::Waker as Waker>::Send> {
self.controller.clone()
}
fn run(mut self) {
loop {
let before_poll = Timestamp::now();
let timeout = self.timeouts.next_expiring_from(before_poll).unwrap_or(WAIT_TIMEOUT);
for (id, res) in &self.listeners {
self.poller.set_interest(*id, res.interests());
}
for (id, res) in &self.transports {
self.poller.set_interest(*id, res.interests());
}
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Polling with timeout {timeout:?}");
let res = self.poller.poll(Some(timeout));
let now = Timestamp::now();
self.service.tick(now);
let timers_fired = self.timeouts.remove_expired_by(now);
if timers_fired > 0 {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Timer has fired");
self.service.handle_timer();
}
match res {
Ok(0) if timers_fired == 0 => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Poll timeout; no I/O events had happened");
}
Err(err) => {
#[cfg(feature = "log")]
log::error!(target: "reactor", "Error during polling: {err}");
self.service.handle_error(Error::Poll(err));
}
_ => {}
}
let awoken = self.handle_events(now);
if awoken {
loop {
match self.ctl_recv.try_recv() {
Err(chan::TryRecvError::Empty) => break,
Err(chan::TryRecvError::Disconnected) => {
panic!("control channel is broken")
}
Ok(Ctl::Shutdown) => return self.handle_shutdown(),
Ok(Ctl::Cmd(cmd)) => self.service.handle_command(cmd),
}
}
}
self.handle_actions(now);
}
}
fn handle_events(&mut self, time: Timestamp) -> bool {
let mut awoken = false;
while let Some((id, res)) = self.poller.next() {
if id == ResourceId::WAKER {
if let Err(err) = res {
#[cfg(feature = "log")]
log::error!(target: "reactor", "Polling waker has failed: {err}");
panic!("waker failure: {err}");
};
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Awoken by the controller");
self.waker.reset();
awoken = true;
} else if self.listeners.contains_key(&id) {
match res {
Ok(io) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Got `{io}` event from listener {id}");
let listener = self.listeners.get_mut(&id).expect("resource disappeared");
for io in io {
if let Some(event) = listener.handle_io(io) {
self.service.handle_listener_event(id, event, time);
}
}
}
Err(err) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Listener {id} {err}");
let listener =
self.unregister_listener(id).expect("listener has disappeared");
self.service.handle_error(Error::ListenerDisconnect(id, listener));
}
}
} else if self.transports.contains_key(&id) {
match res {
Ok(io) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Got `{io}` event from transport {id}");
let transport = self.transports.get_mut(&id).expect("resource disappeared");
for io in io {
if let Some(event) = transport.handle_io(io) {
self.service.handle_transport_event(id, event, time);
}
}
}
Err(err) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Transport {id} {err}");
let transport =
self.unregister_transport(id).expect("transport has disappeared");
self.service.handle_error(Error::TransportDisconnect(id, transport));
}
}
} else {
panic!(
"file descriptor in reactor which is not a known waker, listener or transport"
)
}
}
awoken
}
fn handle_actions(&mut self, time: Timestamp) {
while let Some(action) = self.service.next() {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Handling action {action} from the service");
if let Err(err) = self.handle_action(action, time) {
#[cfg(feature = "log")]
log::error!(target: "reactor", "Error: {err}");
self.service.handle_error(err);
}
}
}
fn handle_action(
&mut self,
action: Action<H::Listener, H::Transport>,
time: Timestamp,
) -> Result<(), Error<H::Listener, H::Transport>> {
match action {
Action::RegisterListener(listener) => {
let fd = listener.as_raw_fd();
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Registering listener with fd={fd}");
let id = self.poller.register(&listener, IoType::read_only());
self.listeners.insert(id, listener);
self.service.handle_registered(fd, id, ResourceType::Listener);
}
Action::RegisterTransport(transport) => {
let fd = transport.as_raw_fd();
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Registering transport with fd={fd}");
let id = self.poller.register(&transport, IoType::read_only());
self.transports.insert(id, transport);
self.service.handle_registered(fd, id, ResourceType::Transport);
}
Action::UnregisterListener(id) => {
let Some(listener) = self.unregister_listener(id) else {
return Ok(());
};
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Handling over listener {id}");
self.service.handover_listener(id, listener);
}
Action::UnregisterTransport(id) => {
let Some(transport) = self.unregister_transport(id) else {
return Ok(());
};
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Handling over transport {id}");
self.service.handover_transport(id, transport);
}
Action::Send(id, data) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Sending {} bytes to {id}", data.len());
let Some(transport) = self.transports.get_mut(&id) else {
#[cfg(feature = "log")]
log::error!(target: "reactor", "Transport {id} is not in the reactor");
return Ok(());
};
match transport.write_atomic(&data) {
Err(WriteError::NotReady) => {
#[cfg(feature = "log")]
log::error!(target: "reactor", internal = true;
"An attempt to write to transport {id} before it got ready");
panic!(
"application business logic error: write to transport {id} which is \
read-only or not ready for a write operation"
);
}
Err(WriteError::Io(e)) => {
#[cfg(feature = "log")]
log::error!(target: "reactor", "Fatal error writing to transport {id}, disconnecting. Error details: {e:?}");
if let Some(transport) = self.unregister_transport(id) {
return Err(Error::TransportDisconnect(id, transport));
}
}
Ok(_) => {}
}
}
Action::SetTimer(duration) => {
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Adding timer {duration:?} from now");
self.timeouts.set_timeout(duration, time);
}
}
Ok(())
}
fn handle_shutdown(self) {
#[cfg(feature = "log")]
log::info!(target: "reactor", "Shutdown");
}
fn unregister_listener(&mut self, id: ResourceId) -> Option<H::Listener> {
let Some(listener) = self.listeners.remove(&id) else {
#[cfg(feature = "log")]
log::warn!(target: "reactor", "Unregistering non-registered listener {id}");
return None;
};
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Handling over listener {id} (fd={})", listener.as_raw_fd());
self.poller.unregister(id);
Some(listener)
}
fn unregister_transport(&mut self, id: ResourceId) -> Option<H::Transport> {
let Some(transport) = self.transports.remove(&id) else {
#[cfg(feature = "log")]
log::warn!(target: "reactor", "Unregistering non-registered transport {id}");
return None;
};
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Unregistering over transport {id} (fd={})", transport.as_raw_fd());
self.poller.unregister(id);
Some(transport)
}
}
#[cfg(test)]
mod test {
use std::io::stdout;
use std::thread::sleep;
use super::*;
use crate::{poller, Io};
pub struct DumbRes(Box<dyn AsRawFd + Send>);
impl DumbRes {
pub fn new() -> DumbRes { DumbRes(Box::new(stdout())) }
}
impl AsRawFd for DumbRes {
fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() }
}
impl io::Write for DumbRes {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { Ok(buf.len()) }
fn flush(&mut self) -> io::Result<()> { Ok(()) }
}
impl WriteAtomic for DumbRes {
fn is_ready_to_write(&self) -> bool { true }
fn empty_write_buf(&mut self) -> io::Result<bool> { Ok(true) }
fn write_or_buf(&mut self, _buf: &[u8]) -> io::Result<()> { Ok(()) }
}
impl Resource for DumbRes {
type Event = ();
fn interests(&self) -> IoType { IoType::read_write() }
fn handle_io(&mut self, _io: Io) -> Option<Self::Event> { None }
}
#[test]
fn timer() {
#[derive(Clone, Eq, PartialEq, Debug)]
enum Cmd {
Init,
Expect(Vec<Event>),
}
#[derive(Clone, Eq, PartialEq, Debug)]
enum Event {
Timer,
}
#[derive(Clone, Debug, Default)]
struct DumbService {
pub add_resource: bool,
pub set_timer: bool,
pub log: Vec<Event>,
}
impl Iterator for DumbService {
type Item = Action<DumbRes, DumbRes>;
fn next(&mut self) -> Option<Self::Item> {
if self.add_resource {
self.add_resource = false;
Some(Action::RegisterTransport(DumbRes::new()))
} else if self.set_timer {
self.set_timer = false;
Some(Action::SetTimer(Duration::from_millis(3)))
} else {
None
}
}
}
impl Handler for DumbService {
type Listener = DumbRes;
type Transport = DumbRes;
type Command = Cmd;
fn tick(&mut self, _time: Timestamp) {}
fn handle_timer(&mut self) {
self.log.push(Event::Timer);
self.set_timer = true;
}
fn handle_listener_event(
&mut self,
_d: ResourceId,
_event: <Self::Listener as Resource>::Event,
_time: Timestamp,
) {
unreachable!()
}
fn handle_transport_event(
&mut self,
_id: ResourceId,
_event: <Self::Transport as Resource>::Event,
_time: Timestamp,
) {
unreachable!()
}
fn handle_registered(&mut self, _fd: RawFd, _id: ResourceId, _ty: ResourceType) {}
fn handle_command(&mut self, cmd: Self::Command) {
match cmd {
Cmd::Init => {
self.add_resource = true;
self.set_timer = true;
}
Cmd::Expect(expected) => {
assert_eq!(expected, self.log);
}
}
}
fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>) {
panic!("{err}")
}
fn handover_listener(&mut self, _id: ResourceId, _listener: Self::Listener) {
unreachable!()
}
fn handover_transport(&mut self, _id: ResourceId, _transport: Self::Transport) {
unreachable!()
}
}
let reactor = Reactor::new(DumbService::default(), poller::popol::Poller::new()).unwrap();
reactor.controller().cmd(Cmd::Init).unwrap();
sleep(Duration::from_secs(2));
reactor.controller().cmd(Cmd::Expect(vec![Event::Timer; 6])).unwrap();
}
}