#![allow(dead_code, unreachable_pub)]
use educe::Educe;
use futures::{Stream, StreamExt};
use postage::watch;
use std::{
fmt,
time::{Duration, Instant},
};
use tor_basic_utils::skip_fmt;
#[derive(Default, Debug, Clone)]
pub struct ConnStatus {
online: Option<bool>,
auth_works: Option<bool>,
handshake_works: Option<bool>,
}
#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
#[non_exhaustive]
pub enum ConnBlockage {
#[display("unable to connect to the internet")]
NoTcp,
#[display("our internet connection seems to be filtered")]
NoHandshake,
#[display("relays all seem to be using expired certificates")]
CertsExpired,
}
impl ConnStatus {
fn eq(&self, other: &ConnStatus) -> bool {
self.online == other.online && self.handshake_works == other.handshake_works
}
pub fn usable(&self) -> bool {
self.online == Some(true) && self.handshake_works == Some(true)
}
pub fn frac(&self) -> f32 {
match self {
Self {
online: Some(true),
auth_works: Some(true),
handshake_works: Some(true),
} => 1.0,
Self {
online: Some(true), ..
} => 0.5,
_ => 0.0,
}
}
pub fn blockage(&self) -> Option<ConnBlockage> {
match self {
Self {
online: Some(false),
..
} => Some(ConnBlockage::NoTcp),
Self {
auth_works: Some(false),
..
} => Some(ConnBlockage::NoHandshake),
Self {
handshake_works: Some(false),
..
} => Some(ConnBlockage::CertsExpired),
_ => None,
}
}
}
impl fmt::Display for ConnStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
ConnStatus {
online: Some(false),
..
} => write!(f, "unable to connect to the internet"),
ConnStatus {
handshake_works: None,
..
} => write!(f, "handshaking with Tor relays"),
ConnStatus {
auth_works: Some(true),
handshake_works: Some(false),
..
} => write!(
f,
"unable to handshake with Tor relays, possibly due to clock skew"
),
ConnStatus {
handshake_works: Some(false),
..
} => write!(f, "unable to handshake with Tor relays"),
ConnStatus {
online: Some(true),
handshake_works: Some(true),
..
} => write!(f, "connecting successfully"),
}
}
}
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct ConnStatusEvents {
#[educe(Debug(method = "skip_fmt"))]
inner: watch::Receiver<ConnStatus>,
}
impl Stream for ConnStatusEvents {
type Item = ConnStatus;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
#[derive(Debug, Clone)]
struct ChanMgrStatus {
startup: Instant,
n_attempts: usize,
last_tcp_success: Option<Instant>,
last_tls_success: Option<Instant>,
last_chan_auth_success: Option<Instant>,
last_chan_success: Option<Instant>,
}
impl ChanMgrStatus {
fn new_at(now: Instant) -> ChanMgrStatus {
ChanMgrStatus {
startup: now,
n_attempts: 0,
last_tcp_success: None,
last_tls_success: None,
last_chan_auth_success: None,
last_chan_success: None,
}
}
fn conn_status_at(&self, now: Instant) -> ConnStatus {
const MIN_DURATION: Duration = Duration::from_secs(60);
const MIN_ATTEMPTS: usize = 6;
let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
let online = match (self.last_tcp_success.is_some(), early) {
(true, _) => Some(true),
(_, true) => None,
(false, false) => Some(false),
};
let auth_works = match (self.last_chan_auth_success.is_some(), early) {
(true, _) => Some(true),
(_, true) => None,
(false, false) => Some(false),
};
let handshake_works = match (self.last_chan_success.is_some(), early) {
(true, _) => Some(true),
(_, true) => None,
(false, false) => Some(false),
};
ConnStatus {
online,
auth_works,
handshake_works,
}
}
fn record_attempt(&mut self) {
self.n_attempts += 1;
}
fn record_tcp_success(&mut self, now: Instant) {
self.last_tcp_success = Some(now);
}
fn record_tls_finished(&mut self, now: Instant) {
self.last_tls_success = Some(now);
}
fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) {
self.last_chan_auth_success = Some(now);
}
fn record_handshake_done(&mut self, now: Instant) {
self.last_chan_auth_success = Some(now);
self.last_chan_success = Some(now);
}
}
pub(crate) struct ChanMgrEventSender {
last_conn_status: ConnStatus,
mgr_status: ChanMgrStatus,
sender: watch::Sender<ConnStatus>,
}
impl ChanMgrEventSender {
fn push_at(&mut self, now: Instant) {
let status = self.mgr_status.conn_status_at(now);
if !status.eq(&self.last_conn_status) {
self.last_conn_status = status.clone();
let mut b = self.sender.borrow_mut();
*b = status;
}
}
pub(crate) fn record_attempt(&mut self) {
self.mgr_status.record_attempt();
self.push_at(Instant::now());
}
pub(crate) fn record_tcp_success(&mut self) {
let now = Instant::now();
self.mgr_status.record_tcp_success(now);
self.push_at(now);
}
pub(crate) fn record_tls_finished(&mut self) {
let now = Instant::now();
self.mgr_status.record_tls_finished(now);
self.push_at(now);
}
pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) {
let now = Instant::now();
self.mgr_status.record_handshake_done_with_skewed_clock(now);
self.push_at(now);
}
pub(crate) fn record_handshake_done(&mut self) {
let now = Instant::now();
self.mgr_status.record_handshake_done(now);
self.push_at(now);
}
}
pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
let (sender, receiver) = watch::channel();
let receiver = ConnStatusEvents { inner: receiver };
let sender = ChanMgrEventSender {
last_conn_status: ConnStatus::default(),
mgr_status: ChanMgrStatus::new_at(Instant::now()),
sender,
};
(sender, receiver)
}
#[cfg(test)]
#[allow(clippy::cognitive_complexity)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use super::*;
use float_eq::assert_float_eq;
const TOL: f32 = 0.00001;
#[test]
fn status_basics() {
let s1 = ConnStatus::default();
assert_eq!(s1.to_string(), "connecting to the internet");
assert_float_eq!(s1.frac(), 0.0, abs <= TOL);
assert!(s1.eq(&s1));
assert!(s1.blockage().is_none());
assert!(!s1.usable());
let s2 = ConnStatus {
online: Some(false),
auth_works: None,
handshake_works: None,
};
assert_eq!(s2.to_string(), "unable to connect to the internet");
assert_float_eq!(s2.frac(), 0.0, abs <= TOL);
assert!(s2.eq(&s2));
assert!(!s2.eq(&s1));
assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp));
assert_eq!(
s2.blockage().unwrap().to_string(),
"unable to connect to the internet"
);
assert!(!s2.usable());
let s3 = ConnStatus {
online: Some(true),
auth_works: None,
handshake_works: None,
};
assert_eq!(s3.to_string(), "handshaking with Tor relays");
assert_float_eq!(s3.frac(), 0.5, abs <= TOL);
assert_eq!(s3.blockage(), None);
assert!(!s3.eq(&s1));
assert!(!s3.usable());
let s4 = ConnStatus {
online: Some(true),
auth_works: Some(false),
handshake_works: Some(false),
};
assert_eq!(s4.to_string(), "unable to handshake with Tor relays");
assert_float_eq!(s4.frac(), 0.5, abs <= TOL);
assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake));
assert_eq!(
s4.blockage().unwrap().to_string(),
"our internet connection seems to be filtered"
);
assert!(!s4.eq(&s1));
assert!(!s4.eq(&s2));
assert!(!s4.eq(&s3));
assert!(s4.eq(&s4));
assert!(!s4.usable());
let s5 = ConnStatus {
online: Some(true),
auth_works: Some(true),
handshake_works: Some(true),
};
assert_eq!(s5.to_string(), "connecting successfully");
assert_float_eq!(s5.frac(), 1.0, abs <= TOL);
assert!(s5.blockage().is_none());
assert!(s5.eq(&s5));
assert!(!s5.eq(&s4));
assert!(s5.usable());
}
#[test]
fn derive_status() {
let start = Instant::now();
let sec = Duration::from_secs(1);
let hour = Duration::from_secs(3600);
let mut ms = ChanMgrStatus::new_at(start);
let s0 = ms.conn_status_at(start);
assert!(s0.online.is_none());
assert!(s0.handshake_works.is_none());
let s = ms.conn_status_at(start + hour);
assert!(s.eq(&s0));
for _ in 0..10 {
ms.record_attempt();
}
let s = ms.conn_status_at(start);
assert!(s.eq(&s0));
let s = ms.conn_status_at(start + hour);
assert_eq!(s.online, Some(false));
assert_eq!(s.handshake_works, Some(false));
ms.record_tcp_success(start + sec);
let s = ms.conn_status_at(start + sec * 2);
assert_eq!(s.online, Some(true));
assert!(s.handshake_works.is_none());
let s = ms.conn_status_at(start + hour);
assert_eq!(s.online, Some(true));
assert_eq!(s.handshake_works, Some(false));
ms.record_handshake_done(start + sec * 2);
let s = ms.conn_status_at(start + sec * 3);
assert_eq!(s.online, Some(true));
assert_eq!(s.handshake_works, Some(true));
}
#[test]
fn sender() {
let (mut snd, rcv) = channel();
{
let s = rcv.inner.borrow().clone();
assert_float_eq!(s.frac(), 0.0, abs <= TOL);
}
snd.record_attempt();
snd.record_tcp_success();
snd.record_tls_finished();
snd.record_handshake_done();
{
let s = rcv.inner.borrow().clone();
assert_float_eq!(s.frac(), 1.0, abs <= TOL);
}
}
}