1mod controller;
2mod listener;
3mod session;
4mod timer;
5mod token;
6mod transport;
7
8use std::collections::HashMap;
9use std::fmt::{Debug, Display, Formatter};
10use std::io::ErrorKind;
11use std::sync::Arc;
12use std::thread::JoinHandle;
13use std::time::{Duration, Instant};
14use std::{io, thread};
15
16use crossbeam_channel::{unbounded, Receiver, TryRecvError};
17use mio::event::{Event, Source};
18use mio::{Events, Interest, Poll, Waker};
19use thiserror::Error;
20
21use timer::Timer;
22use token::WAKER;
23
24use crate::wire;
25
26pub(crate) use self::controller::{ControlMessage, Controller};
27pub(crate) use listener::Listener;
28pub use session::{NoiseSession, ProtocolArtifact, Socks5Session};
29pub(crate) use token::{Token, Tokens};
30pub(crate) use transport::{SessionEvent, Transport};
31
32const SECONDS_IN_AN_HOUR: u64 = 60 * 60;
33
34const WAIT_TIMEOUT: Duration = Duration::from_secs(SECONDS_IN_AN_HOUR);
36
37const LAG_TIMEOUT: Duration = Duration::from_millis(100);
42
43pub trait EventHandler {
45 type Reaction;
49
50 fn interests(&self) -> Option<Interest>;
52
53 fn handle(&mut self, event: &Event) -> Vec<Self::Reaction>;
56}
57
58pub trait WriteAtomic: std::io::Write {
63 fn write_atomic(&mut self, buf: &[u8]) -> io::Result<()> {
72 use ErrorKind::*;
73
74 if !self.is_ready_to_write() {
75 panic!("WriteAtomic::write_atomic was called when the resource is not ready to write");
76 }
77
78 let result = self.write_or_buf(buf);
79
80 debug_assert!(
81 !matches!(
82 result.as_ref().err().map(|err| err.kind()),
83 Some(Interrupted | WouldBlock | WriteZero)
84 ),
85 "WriteAtomic::write_or_buf must handle errors of kind {Interrupted:?}, {WouldBlock:?}, {WriteZero:?} by buffering",
86 );
87
88 result
89 }
90
91 fn is_ready_to_write(&self) -> bool;
93
94 fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>;
103}
104
105#[derive(Error)]
107pub enum Error<L: EventHandler, T: EventHandler> {
108 #[error("listener {0:?} got disconnected during poll operation")]
109 ListenerDisconnect(Token, L),
110
111 #[error("transport {0:?} got disconnected during poll operation")]
112 TransportDisconnect(Token, T),
113
114 #[error("registration of a resource has failed: {0}")]
115 Poll(io::Error),
116
117 #[error("registration of a resource has failed: {0}")]
118 Registration(io::Error),
119}
120
121impl<L: EventHandler, T: EventHandler> Debug for Error<L, T> {
122 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
123 Display::fmt(self, f)
124 }
125}
126
127pub enum Action<L, T> {
131 RegisterListener(Token, L),
136
137 RegisterTransport(Token, T),
142
143 #[allow(dead_code)] UnregisterListener(Token),
151
152 UnregisterTransport(Token),
159
160 Send(Token, Vec<u8>),
162
163 SetTimer(Duration),
168}
169
170impl<L: EventHandler, T: EventHandler> Display for Action<L, T> {
171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172 match self {
173 Action::RegisterListener(token, _listener) => f
174 .debug_struct("RegisterListener")
175 .field("token", token)
176 .field("listener", &"<omitted>")
177 .finish(),
178 Action::RegisterTransport(token, _transport) => f
179 .debug_struct("RegisterTransport")
180 .field("token", token)
181 .field("transport", &"<omitted>")
182 .finish(),
183 Action::UnregisterListener(token) => f
184 .debug_struct("UnregisterListener")
185 .field("token", token)
186 .finish(),
187 Action::UnregisterTransport(token) => f
188 .debug_struct("UnregisterTransport")
189 .field("token", token)
190 .finish(),
191 Action::Send(token, _data) => f
192 .debug_struct("Send")
193 .field("token", token)
194 .field("data", &"<omitted>")
195 .finish(),
196 Action::SetTimer(duration) => f
197 .debug_struct("SetTimer")
198 .field("duration", duration)
199 .finish(),
200 }
201 }
202}
203
204pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::Transport>> {
206 type Listener: EventHandler + Source + Send + Debug;
212
213 type Transport: EventHandler + Source + Send + Debug + WriteAtomic;
218
219 fn tick(&mut self);
221
222 fn timer_reacted(&mut self);
226
227 fn listener_reacted(
232 &mut self,
233 token: Token,
234 reaction: <Self::Listener as EventHandler>::Reaction,
235 instant: Instant,
236 );
237
238 fn transport_reacted(
240 &mut self,
241 token: Token,
242 reaction: <Self::Transport as EventHandler>::Reaction,
243 instant: Instant,
244 );
245
246 fn listener_registered(&mut self, token: Token, listener: &Self::Listener);
252
253 fn transport_registered(&mut self, token: Token, transport: &Self::Transport);
259
260 fn handle_command(&mut self, cmd: wire::Control);
266
267 fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>);
272
273 fn handover_listener(&mut self, token: Token, listener: Self::Listener);
279
280 fn handover_transport(&mut self, token: Token, transport: Self::Transport);
286}
287
288pub struct Reactor {
294 thread: JoinHandle<()>,
295 controller: Controller,
296}
297
298impl Reactor {
299 pub fn new<H>(service: H, thread_name: String) -> Result<Self, io::Error>
305 where
306 H: 'static + ReactionHandler,
307 {
308 let builder = thread::Builder::new().name(thread_name);
309 let (sender, receiver) = unbounded();
310 let poll = Poll::new()?;
311 let controller = Controller::new(sender, Arc::new(Waker::new(poll.registry(), WAKER)?));
312
313 log::debug!(target: "reactor-controller", "Initializing reactor thread...");
314 let thread = builder.spawn(move || {
315 let runtime = Runtime {
316 service,
317 poll,
318 receiver,
319 listeners: HashMap::new(),
320 transports: HashMap::new(),
321 timeouts: Timer::new(),
322 };
323
324 log::info!(target: "reactor", "Entering reactor event loop");
325
326 runtime.run();
327 })?;
328
329 controller.wake()?;
331
332 Ok(Self { thread, controller })
333 }
334
335 pub fn controller(&self) -> Controller {
338 self.controller.clone()
339 }
340
341 pub fn join(self) -> thread::Result<()> {
343 self.thread.join()
344 }
345}
346
347pub struct Runtime<H: ReactionHandler> {
352 service: H,
353 poll: Poll,
354 receiver: Receiver<ControlMessage>,
355 listeners: HashMap<Token, H::Listener>,
356 transports: HashMap<Token, H::Transport>,
357 timeouts: Timer,
358}
359
360impl<H: ReactionHandler> Runtime<H> {
361 fn register_interests(&mut self) -> io::Result<()> {
362 let registry = self.poll.registry();
363 for (id, res) in self.listeners.iter_mut() {
364 match res.interests() {
365 None => registry.deregister(res)?,
366 Some(interests) => registry.reregister(res, *id, interests)?,
367 };
368 }
369 for (id, res) in self.transports.iter_mut() {
370 match res.interests() {
371 None => registry.deregister(res)?,
372 Some(interests) => registry.reregister(res, *id, interests)?,
373 };
374 }
375 Ok(())
376 }
377
378 fn run(mut self) {
379 loop {
380 let timeout = self
381 .timeouts
382 .next_expiring_from(Instant::now())
383 .unwrap_or(WAIT_TIMEOUT);
384
385 self.register_interests()
386 .expect("registering interests must work to ensure correct operation");
387
388 log::trace!(target: "reactor", "Polling with timeout {timeout:?}");
389
390 let mut events = Events::with_capacity(1024);
391
392 let res = self.poll.poll(&mut events, Some(timeout));
394
395 let tick = Instant::now();
398
399 self.service.tick();
401
402 if let Err(err) = res {
404 log::warn!(target: "reactor", "Failure during polling: {err}");
405 self.service.handle_error(Error::Poll(err));
406 }
407
408 let timers_fired = self.timeouts.remove_expired_by(tick);
413 if timers_fired > 0 {
414 log::trace!(target: "reactor", "Timer has fired");
415 self.service.timer_reacted();
416 }
417
418 if self.handle_events(tick, events) {
419 loop {
421 use ControlMessage::*;
422 use TryRecvError::*;
423
424 match self.receiver.try_recv() {
425 Ok(Command(cmd)) => self.service.handle_command(*cmd),
426 Ok(Shutdown) => return self.handle_shutdown(),
427 Err(Empty) => break,
428 Err(Disconnected) => panic!("control channel disconnected unexpectedly"),
429 }
430 }
431 }
432
433 let duration = Instant::now().duration_since(tick);
434 if duration > LAG_TIMEOUT {
435 log::warn!(target: "reactor", "Service was busy {:?} which exceeds the timeout of {:?}", duration, LAG_TIMEOUT);
436 }
437
438 self.handle_actions(tick);
439 }
440 }
441
442 fn handle_events(&mut self, instant: Instant, events: Events) -> bool {
446 log::trace!(target: "reactor", "Handling events");
447 let mut awoken = false;
448 let mut deregistered = Vec::new();
449
450 for event in events.into_iter() {
451 let token = event.token();
452
453 if token == WAKER {
454 log::trace!(target: "reactor", "Awoken by the controller");
455 awoken = true;
456 } else if self.listeners.contains_key(&token) {
457 log::trace!(target: "reactor", token=token.0; "Event from listener with token {}: {:?}", token.0, event);
458 if !event.is_error() {
459 let listener = self
460 .listeners
461 .get_mut(&token)
462 .expect("resource disappeared");
463 listener
464 .handle(event)
465 .into_iter()
466 .for_each(|service_event| {
467 self.service.listener_reacted(token, service_event, instant);
468 });
469 } else {
470 let listener = self.deregister_listener(token).unwrap_or_else(|| {
471 panic!("listener with token {} has disappeared", token.0)
472 });
473 self.service
474 .handle_error(Error::ListenerDisconnect(token, listener));
475 deregistered.push(token);
476 }
477 } else if self.transports.contains_key(&token) {
478 log::trace!(target: "reactor", token=token.0; "Event from transport with token {}: {:?}", token.0, event);
479 if !event.is_error() {
480 let transport = self
481 .transports
482 .get_mut(&token)
483 .expect("resource disappeared");
484 transport
485 .handle(event)
486 .into_iter()
487 .for_each(|service_event| {
488 self.service
489 .transport_reacted(token, service_event, instant);
490 });
491 } else {
492 let transport = self.deregister_transport(token).unwrap_or_else(|| {
493 panic!("transport with token {} has disappeared", token.0)
494 });
495 self.service
496 .handle_error(Error::TransportDisconnect(token, transport));
497 deregistered.push(token);
498 }
499 } else if !deregistered.contains(&token) {
500 log::debug!(target: "reactor", token=token.0; "Event from unknown token {}: {:?}", token.0, event);
501 }
502 }
503
504 awoken
505 }
506
507 fn handle_actions(&mut self, instant: Instant) {
508 while let Some(action) = self.service.next() {
509 log::trace!(target: "reactor", "Handling action {action} from the service");
510
511 if let Err(err) = self.handle_action(action, instant) {
514 log::warn!(target: "reactor", "Failure: {err}");
515 self.service.handle_error(err);
516 }
517 }
518 }
519
520 fn handle_action(
521 &mut self,
522 action: Action<H::Listener, H::Transport>,
523 instant: Instant,
524 ) -> Result<(), Error<H::Listener, H::Transport>> {
525 match action {
526 Action::RegisterListener(token, mut listener) => {
527 log::trace!(target: "reactor", token=token.0; "Registering listener {:?} with token {}", listener, token.0);
528
529 self.poll
530 .registry()
531 .register(&mut listener, token, Interest::READABLE)
532 .map_err(Error::Registration)?;
533 self.listeners.insert(token, listener);
534 self.service
535 .listener_registered(token, &self.listeners[&token]);
536 }
537 Action::RegisterTransport(token, mut transport) => {
538 log::debug!(target: "reactor", token=token.0; "Registering transport");
539
540 self.poll
541 .registry()
542 .register(&mut transport, token, Interest::READABLE)
543 .map_err(Error::Registration)?;
544 self.transports.insert(token, transport);
545 self.service
546 .transport_registered(token, &self.transports[&token]);
547 }
548 Action::UnregisterListener(token) => {
549 let Some(listener) = self.deregister_listener(token) else {
550 return Ok(());
551 };
552
553 log::debug!(target: "reactor", token=token.0; "Handing over listener {listener:?} with token {}", token.0);
554 self.service.handover_listener(token, listener);
555 }
556 Action::UnregisterTransport(token) => {
557 let Some(transport) = self.deregister_transport(token) else {
558 return Ok(());
559 };
560
561 log::debug!(target: "reactor", token=token.0; "Handing over transport {transport:?} with token {}", token.0);
562 self.service.handover_transport(token, transport);
563 }
564 Action::Send(token, data) => {
565 log::trace!(target: "reactor", token=token.0; "Sending {} bytes to {token:?}", data.len());
566
567 if let Some(transport) = self.transports.get_mut(&token) {
568 if let Err(e) = transport.write_atomic(&data) {
569 log::error!(target: "reactor", "Fatal error writing to transport {token:?}, disconnecting. Error details: {e:?}");
570 if let Some(transport) = self.deregister_transport(token) {
571 return Err(Error::TransportDisconnect(token, transport));
572 }
573 }
574 } else {
575 log::debug!(target: "reactor", token=token.0; "No transport with token {token:?} is known!");
576 }
577 }
578 Action::SetTimer(duration) => {
579 log::trace!(target: "reactor", "Adding timer {duration:?} from now");
580
581 self.timeouts.set_timeout(duration, instant);
582 }
583 }
584 Ok(())
585 }
586
587 fn handle_shutdown(self) {
588 log::info!(target: "reactor", "Shutdown");
589 }
590
591 fn deregister_listener(&mut self, token: Token) -> Option<H::Listener> {
592 let Some(mut source) = self.listeners.remove(&token) else {
593 log::debug!(target: "reactor", token=token.0; "Deregistering non-registered listener with token {}", token.0);
594 return None;
595 };
596
597 if let Err(err) = self.poll.registry().deregister(&mut source) {
598 log::debug!(target: "reactor", token=token.0; "Failed to deregister listener with token {} from mio: {err}", token.0);
599 }
600
601 Some(source)
602 }
603
604 fn deregister_transport(&mut self, token: Token) -> Option<H::Transport> {
605 let Some(mut source) = self.transports.remove(&token) else {
606 log::debug!(target: "reactor", token=token.0; "Deregistering non-registered transport with token {}", token.0);
607 return None;
608 };
609
610 if let Err(err) = self.poll.registry().deregister(&mut source) {
611 log::debug!(target: "reactor", token=token.0; "Failed to deregister transport with token {} from mio: {err}", token.0);
612 }
613
614 Some(source)
615 }
616}