1#![allow(unused_variables)] use std::collections::HashMap;
27use std::fmt::{Debug, Display, Formatter};
28use std::os::unix::io::{AsRawFd, RawFd};
29use std::thread::JoinHandle;
30use std::time::Duration;
31use std::{io, thread};
32
33use crossbeam_channel as chan;
34
35use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend};
36use crate::resource::WriteError;
37use crate::{Resource, ResourceId, ResourceType, Timer, Timestamp, WriteAtomic};
38
39const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60);
41
42#[derive(Error, Display, From)]
44#[display(doc_comments)]
45pub enum Error<L: Resource, T: Resource> {
46 ListenerDisconnect(ResourceId, L),
48
49 TransportDisconnect(ResourceId, T),
51
52 Poll(io::Error),
54}
55
56impl<L: Resource, T: Resource> Debug for Error<L, T> {
57 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { Display::fmt(self, f) }
58}
59
60#[derive(Display)]
64pub enum Action<L: Resource, T: Resource> {
65 #[display("register_listener")]
70 RegisterListener(L),
71
72 #[display("register_transport")]
77 RegisterTransport(T),
78
79 #[display("unregister_listener")]
86 UnregisterListener(ResourceId),
87
88 #[display("unregister_transport")]
95 UnregisterTransport(ResourceId),
96
97 #[display("send_to({0})")]
99 Send(ResourceId, Vec<u8>),
100
101 #[display("set_timer({0:?})")]
105 SetTimer(Duration),
106}
107
108pub trait Handler: Send + Iterator<Item = Action<Self::Listener, Self::Transport>> {
110 type Listener: Resource;
116
117 type Transport: Resource;
122
123 type Command: Debug + Send;
133
134 fn tick(&mut self, time: Timestamp);
136
137 fn handle_timer(&mut self);
141
142 fn handle_listener_event(
147 &mut self,
148 id: ResourceId,
149 event: <Self::Listener as Resource>::Event,
150 time: Timestamp,
151 );
152
153 fn handle_transport_event(
155 &mut self,
156 id: ResourceId,
157 event: <Self::Transport as Resource>::Event,
158 time: Timestamp,
159 );
160
161 fn handle_registered(&mut self, fd: RawFd, id: ResourceId, ty: ResourceType);
168
169 fn handle_command(&mut self, cmd: Self::Command);
174
175 fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>);
180
181 fn handover_listener(&mut self, id: ResourceId, listener: Self::Listener);
187
188 fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport);
194}
195
196pub struct Reactor<C, P: Poll> {
202 thread: JoinHandle<()>,
203 controller: Controller<C, <P::Waker as Waker>::Send>,
204}
205
206impl<C, P: Poll> Reactor<C, P> {
207 pub fn new<H: Handler<Command = C>>(service: H, poller: P) -> Result<Self, io::Error>
217 where
218 H: 'static,
219 P: 'static,
220 C: 'static + Send,
221 {
222 Reactor::with(service, poller, thread::Builder::new())
223 }
224
225 pub fn named<H: Handler<Command = C>>(
236 service: H,
237 poller: P,
238 thread_name: String,
239 ) -> Result<Self, io::Error>
240 where
241 H: 'static,
242 P: 'static,
243 C: 'static + Send,
244 {
245 Reactor::with(service, poller, thread::Builder::new().name(thread_name))
246 }
247
248 pub fn with<H: Handler<Command = C>>(
259 service: H,
260 mut poller: P,
261 builder: thread::Builder,
262 ) -> Result<Self, io::Error>
263 where
264 H: 'static,
265 P: 'static,
266 C: 'static + Send,
267 {
268 let (ctl_send, ctl_recv) = chan::unbounded();
269
270 let (waker_writer, waker_reader) = P::Waker::pair()?;
271
272 let controller = Controller {
273 ctl_send,
274 waker: waker_writer,
275 };
276
277 #[cfg(feature = "log")]
278 log::debug!(target: "reactor-controller", "Initializing reactor thread...");
279
280 let runtime_controller = controller.clone();
281 let thread = builder.spawn(move || {
282 #[cfg(feature = "log")]
283 log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd());
284 poller.register_waker(&waker_reader);
285
286 let runtime = Runtime {
287 service,
288 poller,
289 controller: runtime_controller,
290 ctl_recv,
291 listeners: empty!(),
292 transports: empty!(),
293 waker: waker_reader,
294 timeouts: Timer::new(),
295 };
296
297 #[cfg(feature = "log")]
298 log::info!(target: "reactor", "Entering reactor event loop");
299
300 runtime.run();
301 })?;
302
303 controller.wake()?;
305 Ok(Self { thread, controller })
306 }
307
308 pub fn controller(&self) -> Controller<C, <P::Waker as Waker>::Send> { self.controller.clone() }
313
314 pub fn join(self) -> thread::Result<()> { self.thread.join() }
316}
317
318enum Ctl<C> {
319 Cmd(C),
320 Shutdown,
321}
322
323pub struct Controller<C, W: WakerSend> {
330 ctl_send: chan::Sender<Ctl<C>>,
331 waker: W,
332}
333
334impl<C, W: WakerSend> Clone for Controller<C, W> {
335 fn clone(&self) -> Self {
336 Controller {
337 ctl_send: self.ctl_send.clone(),
338 waker: self.waker.clone(),
339 }
340 }
341}
342
343impl<C, W: WakerSend> Controller<C, W> {
344 #[allow(unused_mut)] pub fn cmd(&self, mut command: C) -> Result<(), io::Error>
347 where C: 'static {
348 #[cfg(feature = "log")]
349 {
350 use std::any::Any;
351
352 let cmd = Box::new(command);
353 let any = cmd as Box<dyn Any>;
354 let any = match any.downcast::<Box<dyn Debug>>() {
355 Err(any) => {
356 log::debug!(target: "reactor-controller", "Sending command to the reactor");
357 any
358 }
359 Ok(debug) => {
360 log::debug!(target: "reactor-controller", "Sending command {debug:?} to the reactor");
361 debug
362 }
363 };
364 command = *any.downcast().expect("from upcast");
365 }
366
367 self.ctl_send.send(Ctl::Cmd(command)).map_err(|_| io::ErrorKind::BrokenPipe)?;
368 self.wake()?;
369 Ok(())
370 }
371
372 pub fn shutdown(self) -> Result<(), Self> {
374 #[cfg(feature = "log")]
375 log::info!(target: "reactor-controller", "Initiating reactor shutdown...");
376
377 let res1 = self.ctl_send.send(Ctl::Shutdown);
378 let res2 = self.wake();
379 res1.or(res2).map_err(|_| self)
380 }
381
382 fn wake(&self) -> io::Result<()> {
383 #[cfg(feature = "log")]
384 log::trace!(target: "reactor-controller", "Wakening the reactor");
385 self.waker.wake()
386 }
387}
388
389pub struct Runtime<H: Handler, P: Poll> {
397 service: H,
398 poller: P,
399 controller: Controller<H::Command, <P::Waker as Waker>::Send>,
400 ctl_recv: chan::Receiver<Ctl<H::Command>>,
401 listeners: HashMap<ResourceId, H::Listener>,
402 transports: HashMap<ResourceId, H::Transport>,
403 waker: <P::Waker as Waker>::Recv,
404 timeouts: Timer,
405}
406
407impl<H: Handler, P: Poll> Runtime<H, P> {
408 pub fn with(service: H, mut poller: P) -> io::Result<Self> {
411 let (ctl_send, ctl_recv) = chan::unbounded();
412
413 let (waker_writer, waker_reader) = P::Waker::pair()?;
414
415 #[cfg(feature = "log")]
416 log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd());
417 poller.register_waker(&waker_reader);
418
419 let controller = Controller {
420 ctl_send,
421 waker: waker_writer,
422 };
423
424 Ok(Runtime {
425 service,
426 poller,
427 controller,
428 ctl_recv,
429 listeners: empty!(),
430 transports: empty!(),
431 waker: waker_reader,
432 timeouts: Timer::new(),
433 })
434 }
435
436 pub fn controller(&self) -> Controller<H::Command, <P::Waker as Waker>::Send> {
441 self.controller.clone()
442 }
443
444 fn run(mut self) {
445 loop {
446 let before_poll = Timestamp::now();
447 let timeout = self.timeouts.next_expiring_from(before_poll).unwrap_or(WAIT_TIMEOUT);
448
449 for (id, res) in &self.listeners {
450 self.poller.set_interest(*id, res.interests());
451 }
452 for (id, res) in &self.transports {
453 self.poller.set_interest(*id, res.interests());
454 }
455
456 #[cfg(feature = "log")]
458 log::trace!(target: "reactor", "Polling with timeout {timeout:?}");
459
460 let res = self.poller.poll(Some(timeout));
461 let now = Timestamp::now();
462 self.service.tick(now);
463
464 let timers_fired = self.timeouts.remove_expired_by(now);
467 if timers_fired > 0 {
468 #[cfg(feature = "log")]
469 log::trace!(target: "reactor", "Timer has fired");
470 self.service.handle_timer();
471 }
472
473 match res {
474 Ok(0) if timers_fired == 0 => {
475 #[cfg(feature = "log")]
476 log::trace!(target: "reactor", "Poll timeout; no I/O events had happened");
477 }
478 Err(err) => {
479 #[cfg(feature = "log")]
480 log::error!(target: "reactor", "Error during polling: {err}");
481 self.service.handle_error(Error::Poll(err));
482 }
483 _ => {}
484 }
485
486 let awoken = self.handle_events(now);
487
488 if awoken {
490 loop {
491 match self.ctl_recv.try_recv() {
492 Err(chan::TryRecvError::Empty) => break,
493 Err(chan::TryRecvError::Disconnected) => {
494 panic!("control channel is broken")
495 }
496 Ok(Ctl::Shutdown) => return self.handle_shutdown(),
497 Ok(Ctl::Cmd(cmd)) => self.service.handle_command(cmd),
498 }
499 }
500 }
501
502 self.handle_actions(now);
503 }
504 }
505
506 fn handle_events(&mut self, time: Timestamp) -> bool {
510 let mut awoken = false;
511
512 while let Some((id, res)) = self.poller.next() {
513 if id == ResourceId::WAKER {
514 if let Err(err) = res {
515 #[cfg(feature = "log")]
516 log::error!(target: "reactor", "Polling waker has failed: {err}");
517 panic!("waker failure: {err}");
518 };
519
520 #[cfg(feature = "log")]
521 log::trace!(target: "reactor", "Awoken by the controller");
522
523 self.waker.reset();
524 awoken = true;
525 } else if self.listeners.contains_key(&id) {
526 match res {
527 Ok(io) => {
528 #[cfg(feature = "log")]
529 log::trace!(target: "reactor", "Got `{io}` event from listener {id}");
530
531 let listener = self.listeners.get_mut(&id).expect("resource disappeared");
532 for io in io {
533 if let Some(event) = listener.handle_io(io) {
534 self.service.handle_listener_event(id, event, time);
535 }
536 }
537 }
538 Err(err) => {
539 #[cfg(feature = "log")]
540 log::trace!(target: "reactor", "Listener {id} {err}");
541 let listener =
542 self.unregister_listener(id).expect("listener has disappeared");
543 self.service.handle_error(Error::ListenerDisconnect(id, listener));
544 }
545 }
546 } else if self.transports.contains_key(&id) {
547 match res {
548 Ok(io) => {
549 #[cfg(feature = "log")]
550 log::trace!(target: "reactor", "Got `{io}` event from transport {id}");
551
552 let transport = self.transports.get_mut(&id).expect("resource disappeared");
553 for io in io {
554 if let Some(event) = transport.handle_io(io) {
555 self.service.handle_transport_event(id, event, time);
556 }
557 }
558 }
559 Err(err) => {
560 #[cfg(feature = "log")]
561 log::trace!(target: "reactor", "Transport {id} {err}");
562 let transport =
563 self.unregister_transport(id).expect("transport has disappeared");
564 self.service.handle_error(Error::TransportDisconnect(id, transport));
565 }
566 }
567 } else {
568 panic!(
569 "file descriptor in reactor which is not a known waker, listener or transport"
570 )
571 }
572 }
573
574 awoken
575 }
576
577 fn handle_actions(&mut self, time: Timestamp) {
578 while let Some(action) = self.service.next() {
579 #[cfg(feature = "log")]
580 log::trace!(target: "reactor", "Handling action {action} from the service");
581
582 if let Err(err) = self.handle_action(action, time) {
585 #[cfg(feature = "log")]
586 log::error!(target: "reactor", "Error: {err}");
587 self.service.handle_error(err);
588 }
589 }
590 }
591
592 fn handle_action(
598 &mut self,
599 action: Action<H::Listener, H::Transport>,
600 time: Timestamp,
601 ) -> Result<(), Error<H::Listener, H::Transport>> {
602 match action {
603 Action::RegisterListener(listener) => {
604 let fd = listener.as_raw_fd();
605
606 #[cfg(feature = "log")]
607 log::debug!(target: "reactor", "Registering listener with fd={fd}");
608
609 let id = self.poller.register(&listener, IoType::read_only());
610 self.listeners.insert(id, listener);
611 self.service.handle_registered(fd, id, ResourceType::Listener);
612 }
613 Action::RegisterTransport(transport) => {
614 let fd = transport.as_raw_fd();
615
616 #[cfg(feature = "log")]
617 log::debug!(target: "reactor", "Registering transport with fd={fd}");
618
619 let id = self.poller.register(&transport, IoType::read_only());
620 self.transports.insert(id, transport);
621 self.service.handle_registered(fd, id, ResourceType::Transport);
622 }
623 Action::UnregisterListener(id) => {
624 let Some(listener) = self.unregister_listener(id) else {
625 return Ok(());
626 };
627 #[cfg(feature = "log")]
628 log::debug!(target: "reactor", "Handling over listener {id}");
629 self.service.handover_listener(id, listener);
630 }
631 Action::UnregisterTransport(id) => {
632 let Some(transport) = self.unregister_transport(id) else {
633 return Ok(());
634 };
635 #[cfg(feature = "log")]
636 log::debug!(target: "reactor", "Handling over transport {id}");
637 self.service.handover_transport(id, transport);
638 }
639 Action::Send(id, data) => {
640 #[cfg(feature = "log")]
641 log::trace!(target: "reactor", "Sending {} bytes to {id}", data.len());
642
643 let Some(transport) = self.transports.get_mut(&id) else {
644 #[cfg(feature = "log")]
645 log::error!(target: "reactor", "Transport {id} is not in the reactor");
646
647 return Ok(());
648 };
649 match transport.write_atomic(&data) {
650 Err(WriteError::NotReady) => {
651 #[cfg(feature = "log")]
652 log::error!(target: "reactor", internal = true;
653 "An attempt to write to transport {id} before it got ready");
654 panic!(
655 "application business logic error: write to transport {id} which is \
656 read-only or not ready for a write operation"
657 );
658 }
659 Err(WriteError::Io(e)) => {
660 #[cfg(feature = "log")]
661 log::error!(target: "reactor", "Fatal error writing to transport {id}, disconnecting. Error details: {e:?}");
662 if let Some(transport) = self.unregister_transport(id) {
663 return Err(Error::TransportDisconnect(id, transport));
664 }
665 }
666 Ok(_) => {}
667 }
668 }
669 Action::SetTimer(duration) => {
670 #[cfg(feature = "log")]
671 log::debug!(target: "reactor", "Adding timer {duration:?} from now");
672
673 self.timeouts.set_timeout(duration, time);
674 }
675 }
676 Ok(())
677 }
678
679 fn handle_shutdown(self) {
680 #[cfg(feature = "log")]
681 log::info!(target: "reactor", "Shutdown");
682
683 }
685
686 fn unregister_listener(&mut self, id: ResourceId) -> Option<H::Listener> {
687 let Some(listener) = self.listeners.remove(&id) else {
688 #[cfg(feature = "log")]
689 log::warn!(target: "reactor", "Unregistering non-registered listener {id}");
690 return None;
691 };
692
693 #[cfg(feature = "log")]
694 log::debug!(target: "reactor", "Handling over listener {id} (fd={})", listener.as_raw_fd());
695
696 self.poller.unregister(id);
697
698 Some(listener)
699 }
700
701 fn unregister_transport(&mut self, id: ResourceId) -> Option<H::Transport> {
702 let Some(transport) = self.transports.remove(&id) else {
703 #[cfg(feature = "log")]
704 log::warn!(target: "reactor", "Unregistering non-registered transport {id}");
705 return None;
706 };
707
708 #[cfg(feature = "log")]
709 log::debug!(target: "reactor", "Unregistering over transport {id} (fd={})", transport.as_raw_fd());
710
711 self.poller.unregister(id);
712
713 Some(transport)
714 }
715}
716
717#[cfg(test)]
718mod test {
719 use std::io::stdout;
720 use std::thread::sleep;
721
722 use super::*;
723 use crate::{poller, Io};
724
725 pub struct DumbRes(Box<dyn AsRawFd + Send>);
726 impl DumbRes {
727 pub fn new() -> DumbRes { DumbRes(Box::new(stdout())) }
728 }
729 impl AsRawFd for DumbRes {
730 fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() }
731 }
732 impl io::Write for DumbRes {
733 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { Ok(buf.len()) }
734 fn flush(&mut self) -> io::Result<()> { Ok(()) }
735 }
736 impl WriteAtomic for DumbRes {
737 fn is_ready_to_write(&self) -> bool { true }
738 fn empty_write_buf(&mut self) -> io::Result<bool> { Ok(true) }
739 fn write_or_buf(&mut self, _buf: &[u8]) -> io::Result<()> { Ok(()) }
740 }
741 impl Resource for DumbRes {
742 type Event = ();
743 fn interests(&self) -> IoType { IoType::read_write() }
744 fn handle_io(&mut self, _io: Io) -> Option<Self::Event> { None }
745 }
746
747 #[test]
748 fn timer() {
749 #[derive(Clone, Eq, PartialEq, Debug)]
750 enum Cmd {
751 Init,
752 Expect(Vec<Event>),
753 }
754 #[derive(Clone, Eq, PartialEq, Debug)]
755 enum Event {
756 Timer,
757 }
758 #[derive(Clone, Debug, Default)]
759 struct DumbService {
760 pub add_resource: bool,
761 pub set_timer: bool,
762 pub log: Vec<Event>,
763 }
764 impl Iterator for DumbService {
765 type Item = Action<DumbRes, DumbRes>;
766 fn next(&mut self) -> Option<Self::Item> {
767 if self.add_resource {
768 self.add_resource = false;
769 Some(Action::RegisterTransport(DumbRes::new()))
770 } else if self.set_timer {
771 self.set_timer = false;
772 Some(Action::SetTimer(Duration::from_millis(3)))
773 } else {
774 None
775 }
776 }
777 }
778 impl Handler for DumbService {
779 type Listener = DumbRes;
780 type Transport = DumbRes;
781 type Command = Cmd;
782
783 fn tick(&mut self, _time: Timestamp) {}
784 fn handle_timer(&mut self) {
785 self.log.push(Event::Timer);
786 self.set_timer = true;
787 }
788 fn handle_listener_event(
789 &mut self,
790 _d: ResourceId,
791 _event: <Self::Listener as Resource>::Event,
792 _time: Timestamp,
793 ) {
794 unreachable!()
795 }
796 fn handle_transport_event(
797 &mut self,
798 _id: ResourceId,
799 _event: <Self::Transport as Resource>::Event,
800 _time: Timestamp,
801 ) {
802 unreachable!()
803 }
804 fn handle_registered(&mut self, _fd: RawFd, _id: ResourceId, _ty: ResourceType) {}
805 fn handle_command(&mut self, cmd: Self::Command) {
806 match cmd {
807 Cmd::Init => {
808 self.add_resource = true;
809 self.set_timer = true;
810 }
811 Cmd::Expect(expected) => {
812 assert_eq!(expected, self.log);
813 }
814 }
815 }
816 fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>) {
817 panic!("{err}")
818 }
819 fn handover_listener(&mut self, _id: ResourceId, _listener: Self::Listener) {
820 unreachable!()
821 }
822 fn handover_transport(&mut self, _id: ResourceId, _transport: Self::Transport) {
823 unreachable!()
824 }
825 }
826
827 let reactor = Reactor::new(DumbService::default(), poller::popol::Poller::new()).unwrap();
828 reactor.controller().cmd(Cmd::Init).unwrap();
829 sleep(Duration::from_secs(2));
830 reactor.controller().cmd(Cmd::Expect(vec![Event::Timer; 6])).unwrap();
831 }
832}