use std::{borrow::Cow, fmt, time::SystemTime};
use derive_more::Display;
use educe::Educe;
use futures::{Stream, StreamExt};
use tor_basic_utils::skip_fmt;
use tor_chanmgr::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_circmgr::{ClockSkewEvents, SkewEstimate};
use tor_dirmgr::{DirBlockage, DirBootstrapStatus};
use tracing::debug;
/// Snapshot of how far along the client's bootstrapping is.
///
/// Holds the most recently recorded connection status, directory status,
/// and (if one is available) clock skew estimate.
#[derive(Debug, Clone, Default)]
pub struct BootstrapStatus {
/// Most recently recorded connection status.
conn_status: ConnStatus,
/// Most recently recorded directory bootstrap status.
dir_status: DirBootstrapStatus,
/// Most recently recorded clock skew estimate, if any.
skew: Option<SkewEstimate>,
}
impl BootstrapStatus {
pub fn as_frac(&self) -> f32 {
self.conn_status.frac() * 0.15 + self.dir_status.frac_at(SystemTime::now()) * 0.85
}
pub fn ready_for_traffic(&self) -> bool {
let now = SystemTime::now();
self.conn_status.usable() && self.dir_status.usable_at(now)
}
pub fn blocked(&self) -> Option<Blockage> {
if let Some(b) = self.conn_status.blockage() {
let message = b.to_string().into();
let kind = b.into();
if matches!(kind, BlockageKind::ClockSkewed) && self.skew_is_noteworthy() {
Some(Blockage {
kind,
message: format!("Clock is {}", self.skew.as_ref().expect("logic error"))
.into(),
})
} else {
Some(Blockage { kind, message })
}
} else if let Some(b) = self.dir_status.blockage(SystemTime::now()) {
let message = b.to_string().into();
let kind = b.into();
Some(Blockage { kind, message })
} else {
None
}
}
fn apply_conn_status(&mut self, status: ConnStatus) {
self.conn_status = status;
}
fn apply_dir_status(&mut self, status: DirBootstrapStatus) {
self.dir_status = status;
}
fn apply_skew_estimate(&mut self, status: Option<SkewEstimate>) {
self.skew = status;
}
fn skew_is_noteworthy(&self) -> bool {
matches!(&self.skew, Some(s) if s.noteworthy())
}
}
/// A reason why the client appears to be unable to make progress.
///
/// Displays as the kind followed by the detail message in parentheses.
#[derive(Clone, Debug, Display)]
#[display(fmt = "{} ({})", "kind", "message")]
pub struct Blockage {
/// Broad category of the problem.
kind: BlockageKind,
/// Human-readable detail about the problem.
message: Cow<'static, str>,
}
/// Broad category of a [`Blockage`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Clone, Debug, Display)]
#[non_exhaustive]
pub enum BlockageKind {
/// Produced from `ConnBlockage::NoTcp`.
#[display(fmt = "We seem to be offline")]
Offline,
/// Produced from `ConnBlockage::NoHandshake`.
#[display(fmt = "Our internet connection seems filtered")]
Filtering,
/// Produced from any `ConnBlockage` that has no more specific mapping.
#[display(fmt = "Can't reach the Tor network")]
CantReachTor,
/// Produced from `ConnBlockage::CertsExpired`.
#[display(fmt = "Clock is skewed.")]
ClockSkewed,
/// Produced from any `DirBlockage`.
#[display(fmt = "Can't bootstrap a Tor directory.")]
CantBootstrap,
}
impl From<ConnBlockage> for BlockageKind {
    /// Classify a connection-level blockage.
    ///
    /// Any blockage without a dedicated mapping is reported as
    /// [`BlockageKind::CantReachTor`].
    fn from(blockage: ConnBlockage) -> Self {
        match blockage {
            ConnBlockage::NoTcp => Self::Offline,
            ConnBlockage::NoHandshake => Self::Filtering,
            ConnBlockage::CertsExpired => Self::ClockSkewed,
            _ => Self::CantReachTor,
        }
    }
}
impl From<DirBlockage> for BlockageKind {
fn from(_: DirBlockage) -> Self {
BlockageKind::CantBootstrap
}
}
impl fmt::Display for BootstrapStatus {
    /// Write a human-readable summary of this status.
    ///
    /// The summary starts with the progress percentage.  If a blockage is
    /// reported, it is described next; otherwise the connection and
    /// directory statuses are shown.  A noteworthy clock skew estimate, if
    /// present, is appended at the end.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Progress as a whole-number percentage.
        let percent = (self.as_frac() * 100.0).round() as u32;

        match self.blocked() {
            Some(problem) => write!(f, "Stuck at {}%: {}", percent, problem)?,
            None => write!(
                f,
                "{}%: {}; {}",
                percent, &self.conn_status, &self.dir_status
            )?,
        }

        match &self.skew {
            Some(skew) if skew.noteworthy() => write!(f, ". Clock is {}", skew),
            _ => Ok(()),
        }
    }
}
/// Merge the three status event streams and fold each incoming event into
/// the [`BootstrapStatus`] held by `sender`.
///
/// Runs until all of the input streams have ended.  After each event is
/// applied, the updated status is logged at debug level.
pub(crate) async fn report_status(
    mut sender: postage::watch::Sender<BootstrapStatus>,
    conn_status: ConnStatusEvents,
    dir_status: impl Stream<Item = DirBootstrapStatus> + Send + Unpin,
    skew_status: ClockSkewEvents,
) {
    /// A single update taken from any one of the input streams.
    // Variant sizes may differ considerably; values are consumed right
    // after they are produced, so the lint is silenced rather than boxing.
    #[allow(clippy::large_enum_variant)]
    enum Update {
        /// New connection status.
        Conn(ConnStatus),
        /// New directory bootstrap status.
        Dir(DirBootstrapStatus),
        /// New (possibly absent) clock skew estimate.
        Skew(Option<SkewEstimate>),
    }

    // Tag each stream's items with their origin so they can share one type.
    let sources = vec![
        conn_status.map(Update::Conn).boxed(),
        dir_status.map(Update::Dir).boxed(),
        skew_status.map(Update::Skew).boxed(),
    ];
    let mut updates = futures::stream::select_all(sources);

    while let Some(update) = updates.next().await {
        // The mutable borrow lives only for this iteration, so it is
        // released before the next `.await`.
        let mut status = sender.borrow_mut();
        match update {
            Update::Conn(conn) => status.apply_conn_status(conn),
            Update::Dir(dir) => status.apply_dir_status(dir),
            Update::Skew(skew) => status.apply_skew_estimate(skew),
        }
        debug!("{}", *status);
    }
}
/// A stream of [`BootstrapStatus`] values, backed by a `postage::watch`
/// receiver.
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct BootstrapEvents {
/// The receiver that status values are read from.
///
/// Its `Debug` output is produced by `skip_fmt` instead of the
/// receiver's own formatting.
#[educe(Debug(method = "skip_fmt"))]
pub(crate) inner: postage::watch::Receiver<BootstrapStatus>,
}
impl Stream for BootstrapEvents {
type Item = BootstrapStatus;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}