use crate::GuardMgrInner;
use crate::pending::{GuardStatus, RequestId};
use futures::{channel::mpsc, stream::StreamExt};
#[cfg(test)]
use oneshot_fused_workaround as oneshot;
use tor_proto::ClockSkew;
use tracing::instrument;
use std::sync::{Mutex, Weak};
#[derive(Debug)]
pub(crate) enum Msg {
Status(RequestId, GuardStatus, Option<ClockSkew>),
#[cfg(test)]
Ping(oneshot::Sender<()>),
}
#[instrument(skip_all, level = "trace")]
pub(crate) async fn report_status_events(
runtime: impl tor_rtcompat::SleepProvider,
inner: Weak<Mutex<GuardMgrInner>>,
mut events: mpsc::UnboundedReceiver<Msg>,
) {
loop {
match events.next().await {
Some(Msg::Status(id, status, skew)) => {
if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock().expect("Poisoned lock");
inner.handle_msg(id, status, skew, &runtime);
} else {
return;
}
}
#[cfg(test)]
Some(Msg::Ping(sender)) => {
let _ignore = sender.send(());
}
None => return,
}
}
}
#[instrument(skip_all, level = "trace")]
pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
runtime: R,
inner: Weak<Mutex<GuardMgrInner>>,
) {
loop {
let delay = if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock().expect("Poisoned lock");
let wallclock = runtime.wallclock();
let now = runtime.now();
inner.run_periodic_events(wallclock, now)
} else {
return;
};
runtime.sleep(delay).await;
}
}
#[instrument(skip_all, level = "trace")]
pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
runtime: RT,
inner: Weak<Mutex<GuardMgrInner>>,
netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
) {
use tor_netdir::DirEvent;
let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
Some(s) => s,
None => return,
};
while let Some(event) = event_stream.next().await {
match event {
DirEvent::NewConsensus | DirEvent::NewDescriptors => {
if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock().expect("Poisoned lock");
inner.update(runtime.wallclock(), runtime.now());
} else {
return;
}
}
_ => {}
}
}
}
#[cfg(feature = "bridge-client")]
#[instrument(level = "trace", skip_all)]
pub(crate) async fn keep_bridge_descs_updated<RT: tor_rtcompat::Runtime>(
runtime: RT,
inner: Weak<Mutex<GuardMgrInner>>,
bridge_desc_provider: Weak<dyn crate::bridge::BridgeDescProvider>,
) {
use crate::bridge::BridgeDescEvent as E;
let mut event_stream = match bridge_desc_provider.upgrade().map(|p| p.events()) {
Some(s) => s,
None => return,
};
while let Some(event) = event_stream.next().await {
match event {
E::SomethingChanged => {
if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock().expect("Poisoned lock");
inner.update(runtime.wallclock(), runtime.now());
} else {
return;
}
}
}
}
}