use tokio::sync::{mpsc, watch};
use crate::{device_state::DeviceState, status::StatusNode};
pub const NOTIFY_BUFFER: usize = 128;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct NotifyWatchOpt(u64);
impl NotifyWatchOpt {
pub const fn empty() -> Self {
Self(0)
}
pub const INITIAL_STATE: Self = Self(1 << 1);
pub const INITIAL_NETMAP: Self = Self(1 << 3);
pub const fn contains(self, other: Self) -> bool {
self.0 & other.0 == other.0
}
}
impl core::ops::BitOr for NotifyWatchOpt {
type Output = Self;
fn bitor(self, rhs: Self) -> Self {
Self(self.0 | rhs.0)
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct Notify {
pub state: Option<DeviceState>,
pub net_map: Option<Vec<StatusNode>>,
pub browse_to_url: Option<url::Url>,
}
impl Notify {
fn is_empty(&self) -> bool {
self.state.is_none() && self.net_map.is_none() && self.browse_to_url.is_none()
}
}
#[derive(Debug)]
pub struct IpnBusWatcher {
rx: mpsc::Receiver<Notify>,
}
impl IpnBusWatcher {
pub async fn next(&mut self) -> Option<Notify> {
self.rx.recv().await
}
}
pub(crate) fn spawn_watcher(
mask: NotifyWatchOpt,
state_rx: watch::Receiver<DeviceState>,
peer_rx: watch::Receiver<Vec<StatusNode>>,
browser_rx: watch::Receiver<Option<url::Url>>,
shutdown_rx: watch::Receiver<bool>,
) -> IpnBusWatcher {
let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
tokio::spawn(run_bus(
mask,
state_rx,
peer_rx,
browser_rx,
shutdown_rx,
tx,
));
IpnBusWatcher { rx }
}
fn deliver(tx: &mpsc::Sender<Notify>, n: Notify) -> bool {
match tx.try_send(n) {
Ok(()) => false,
Err(mpsc::error::TrySendError::Full(_)) => false,
Err(mpsc::error::TrySendError::Closed(_)) => true,
}
}
fn browse_url_for(state: &DeviceState) -> Option<url::Url> {
match state {
DeviceState::NeedsLogin(u) => Some(u.clone()),
_ => None,
}
}
fn state_notify(state: DeviceState) -> Notify {
let browse_to_url = browse_url_for(&state);
Notify {
state: Some(state),
net_map: None,
browse_to_url,
}
}
pub(crate) async fn run_bus(
mask: NotifyWatchOpt,
mut state_rx: watch::Receiver<DeviceState>,
mut peer_rx: watch::Receiver<Vec<StatusNode>>,
mut browser_rx: watch::Receiver<Option<url::Url>>,
mut shutdown_rx: watch::Receiver<bool>,
tx: mpsc::Sender<Notify>,
) {
if *shutdown_rx.borrow_and_update() {
return;
}
let mut initial = Notify::default();
{
let state = state_rx.borrow_and_update();
if mask.contains(NotifyWatchOpt::INITIAL_STATE) {
initial.browse_to_url = browse_url_for(&state);
initial.state = Some(state.clone());
}
}
{
let peers = peer_rx.borrow_and_update();
if mask.contains(NotifyWatchOpt::INITIAL_NETMAP) {
initial.net_map = Some(peers.clone());
}
}
browser_rx.borrow_and_update();
if !initial.is_empty() && deliver(&tx, initial) {
return;
}
loop {
tokio::select! {
biased;
_ = shutdown_rx.changed() => return,
_ = tx.closed() => return,
changed = state_rx.changed() => {
if changed.is_err() {
return;
}
let state = state_rx.borrow_and_update().clone();
if deliver(&tx, state_notify(state)) {
return;
}
}
changed = peer_rx.changed() => {
if changed.is_err() {
return;
}
let peers = peer_rx.borrow_and_update().clone();
let notify = Notify {
state: None,
net_map: Some(peers),
browse_to_url: None,
};
if deliver(&tx, notify) {
return;
}
}
changed = browser_rx.changed() => {
if changed.is_err() {
return;
}
let url = browser_rx.borrow_and_update().clone();
if let Some(url) = url {
let notify = Notify {
state: None,
net_map: None,
browse_to_url: Some(url),
};
if deliver(&tx, notify) {
return;
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use core::time::Duration;
use tokio::sync::{mpsc, watch};
use super::*;
type Harness = (
watch::Sender<DeviceState>,
watch::Sender<Vec<StatusNode>>,
watch::Sender<Option<url::Url>>,
watch::Sender<bool>,
IpnBusWatcher,
);
fn harness(mask: NotifyWatchOpt, state: DeviceState, peers: Vec<StatusNode>) -> Harness {
let (state_tx, state_rx) = watch::channel(state);
let (peer_tx, peer_rx) = watch::channel(peers);
let (browser_tx, browser_rx) = watch::channel(None);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
tokio::spawn(run_bus(
mask,
state_rx,
peer_rx,
browser_rx,
shutdown_rx,
tx,
));
(
state_tx,
peer_tx,
browser_tx,
shutdown_tx,
IpnBusWatcher { rx },
)
}
fn login_url() -> url::Url {
"https://login.example/auth".parse().unwrap()
}
fn consent_url() -> url::Url {
"https://login.example/consent".parse().unwrap()
}
fn peer(id: &str) -> StatusNode {
use core::net::{IpAddr, Ipv4Addr, Ipv6Addr};
StatusNode {
stable_id: ts_control::StableNodeId(id.to_owned()),
display_name: id.to_owned(),
ipv4: IpAddr::V4(Ipv4Addr::new(100, 64, 0, 1)),
ipv6: IpAddr::V6(Ipv6Addr::LOCALHOST),
online: Some(true),
last_seen: None,
allowed_routes: Vec::new(),
is_exit_node: false,
cur_addr: None,
relay: None,
}
}
const QUIET_WINDOW: Duration = Duration::from_millis(250);
#[test]
fn mask_bitfield_semantics() {
assert!(NotifyWatchOpt::empty().contains(NotifyWatchOpt::empty()));
assert!(!NotifyWatchOpt::empty().contains(NotifyWatchOpt::INITIAL_STATE));
let both = NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP;
assert!(both.contains(NotifyWatchOpt::INITIAL_STATE));
assert!(both.contains(NotifyWatchOpt::INITIAL_NETMAP));
assert_eq!(NotifyWatchOpt::INITIAL_STATE, NotifyWatchOpt(1 << 1));
assert_eq!(NotifyWatchOpt::INITIAL_NETMAP, NotifyWatchOpt(1 << 3));
}
#[tokio::test]
async fn initial_state_snapshot_emitted_when_masked() {
let (_s, _p, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::Running,
Vec::new(),
);
let n = w.next().await.expect("initial snapshot");
assert_eq!(n.state, Some(DeviceState::Running));
assert_eq!(n.net_map, None);
assert_eq!(n.browse_to_url, None);
}
#[tokio::test]
async fn initial_netmap_snapshot_emitted_when_masked() {
let (_s, _p, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_NETMAP,
DeviceState::Running,
Vec::new(),
);
let n = w.next().await.expect("initial snapshot");
assert_eq!(n.net_map, Some(Vec::new()));
assert_eq!(n.state, None);
}
#[tokio::test]
async fn initial_snapshot_coalesces_both_fields() {
let (_s, _p, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
DeviceState::Running,
Vec::new(),
);
let n = w.next().await.expect("initial snapshot");
assert_eq!(n.state, Some(DeviceState::Running));
assert_eq!(n.net_map, Some(Vec::new()));
}
#[tokio::test]
async fn empty_mask_skips_initial_then_streams_change() {
let (state_tx, _p, _b, _sd, mut w) =
harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
assert!(
tokio::time::timeout(QUIET_WINDOW, w.next()).await.is_err(),
"empty mask must not emit an initial snapshot"
);
state_tx.send_replace(DeviceState::Running);
let n = w.next().await.expect("change after subscribe");
assert_eq!(n.state, Some(DeviceState::Running));
}
#[tokio::test]
async fn needs_login_transition_derives_browse_to_url() {
let (state_tx, _p, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::Connecting,
Vec::new(),
);
let snap = w.next().await.expect("initial snapshot");
assert_eq!(snap.state, Some(DeviceState::Connecting));
assert_eq!(snap.browse_to_url, None);
state_tx.send_replace(DeviceState::NeedsLogin(login_url()));
let n = w.next().await.expect("needs-login event");
assert_eq!(n.state, Some(DeviceState::NeedsLogin(login_url())));
assert_eq!(n.browse_to_url, Some(login_url()));
}
#[tokio::test]
async fn initial_needs_login_includes_browse_to_url() {
let (_s, _p, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::NeedsLogin(login_url()),
Vec::new(),
);
let n = w.next().await.expect("initial snapshot");
assert_eq!(n.browse_to_url, Some(login_url()));
}
#[tokio::test]
async fn peer_change_streams_netmap() {
let (_s, peer_tx, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_NETMAP,
DeviceState::Running,
Vec::new(),
);
let snap = w.next().await.expect("initial netmap snapshot");
assert_eq!(snap.net_map, Some(Vec::new()));
let peers = vec![peer("peer-a"), peer("peer-b")];
peer_tx.send_replace(peers.clone());
let n = w.next().await.expect("netmap change");
assert_eq!(n.net_map, Some(peers));
assert_eq!(n.state, None);
}
#[tokio::test]
async fn no_spurious_reemit_after_initial() {
let (state_tx, _p, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
DeviceState::Running,
Vec::new(),
);
let _initial = w.next().await.expect("initial snapshot");
assert!(
tokio::time::timeout(QUIET_WINDOW, w.next()).await.is_err(),
"no change occurred, so no further notification must arrive"
);
state_tx.send_replace(DeviceState::Expired);
let n = w
.next()
.await
.expect("watcher still live after the quiet window");
assert_eq!(n.state, Some(DeviceState::Expired));
}
#[tokio::test]
async fn shutdown_terminates_stream() {
let (_s, _p, _b, shutdown_tx, mut w) =
harness(NotifyWatchOpt::empty(), DeviceState::Running, Vec::new());
shutdown_tx.send_replace(true);
assert_eq!(w.next().await, None, "shutdown must end the stream");
}
#[tokio::test]
async fn already_shutdown_ends_immediately() {
let (state_tx, state_rx) = watch::channel(DeviceState::Running);
let (peer_tx, peer_rx) = watch::channel(Vec::new());
let (browser_tx, browser_rx) = watch::channel(None);
let (_shutdown_tx, shutdown_rx) = watch::channel(true);
let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
tokio::spawn(run_bus(
NotifyWatchOpt::INITIAL_STATE,
state_rx,
peer_rx,
browser_rx,
shutdown_rx,
tx,
));
let mut w = IpnBusWatcher { rx };
assert_eq!(w.next().await, None, "already-shutdown must emit nothing");
drop((state_tx, peer_tx, browser_tx));
}
#[tokio::test]
async fn source_sender_drop_terminates_stream() {
let (state_tx, _p, _b, _sd, mut w) =
harness(NotifyWatchOpt::empty(), DeviceState::Running, Vec::new());
drop((state_tx, _p, _b, _sd));
assert_eq!(w.next().await, None, "all senders gone must end the stream");
}
#[tokio::test]
async fn streamed_events_are_per_source_not_coalesced() {
let (state_tx, peer_tx, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::Connecting,
Vec::new(),
);
let _snap = w.next().await.expect("initial snapshot barrier");
state_tx.send_replace(DeviceState::Running);
peer_tx.send_replace(vec![peer("peer-a")]);
let first = w.next().await.expect("first event");
let second = w.next().await.expect("second event");
for n in [&first, &second] {
assert!(
n.state.is_some() ^ n.net_map.is_some(),
"each streamed Notify carries exactly one of state / net_map, got {n:?}"
);
}
assert!(
first.state.is_some() || second.state.is_some(),
"a state event arrived"
);
assert!(
first.net_map.is_some() || second.net_map.is_some(),
"a net_map event arrived"
);
}
#[tokio::test]
async fn sequential_state_transitions_stream_in_order() {
let (state_tx, _p, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::Connecting,
Vec::new(),
);
assert_eq!(
w.next().await.expect("snapshot").state,
Some(DeviceState::Connecting)
);
for next in [
DeviceState::Running,
DeviceState::NeedsLogin(login_url()),
DeviceState::Expired,
] {
state_tx.send_replace(next.clone());
let n = w.next().await.expect("transition");
assert_eq!(n.state, Some(next.clone()));
assert_eq!(n.net_map, None);
let expect_url = matches!(next, DeviceState::NeedsLogin(_)).then(login_url);
assert_eq!(n.browse_to_url, expect_url);
}
}
#[tokio::test]
async fn expired_and_failed_states_stream_without_url() {
for state in [
DeviceState::Expired,
DeviceState::Failed(crate::RegistrationError::AuthRejected("bad key".into())),
] {
let (state_tx, _p, _b, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::Connecting,
Vec::new(),
);
let _snap = w.next().await.expect("snapshot barrier");
state_tx.send_replace(state.clone());
let n = w.next().await.expect("state event");
assert_eq!(n.state, Some(state));
assert_eq!(n.browse_to_url, None);
}
}
#[tokio::test]
async fn full_buffer_drops_and_never_blocks_producer() {
let (state_tx, _p, _b, shutdown_tx, mut w) =
harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
for _ in 0..(NOTIFY_BUFFER * 2 + 16) {
state_tx.send_replace(DeviceState::Running);
state_tx.send_replace(DeviceState::Connecting);
tokio::task::yield_now().await;
}
shutdown_tx.send_replace(true);
let mut drained = 0usize;
while let Some(_n) = w.next().await {
drained += 1;
assert!(
drained <= NOTIFY_BUFFER,
"buffer must be bounded at NOTIFY_BUFFER ({NOTIFY_BUFFER}), drained {drained}"
);
}
}
#[tokio::test]
async fn consumer_drop_terminates_task() {
let (state_tx, _p, _b, _sd, w) =
harness(NotifyWatchOpt::empty(), DeviceState::Connecting, Vec::new());
assert_eq!(
state_tx.receiver_count(),
1,
"bus task holds the source receiver"
);
drop(w);
while state_tx.receiver_count() != 0 {
tokio::task::yield_now().await;
}
assert_eq!(
state_tx.receiver_count(),
0,
"bus task must reclaim (drop its source receiver) once the consumer is gone"
);
}
#[tokio::test]
async fn running_node_browser_url_streams_standalone() {
let (_s, _p, browser_tx, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::Running,
Vec::new(),
);
let snap = w.next().await.expect("initial snapshot");
assert_eq!(snap.state, Some(DeviceState::Running));
assert_eq!(
snap.browse_to_url, None,
"running-node URL is not front-loaded"
);
browser_tx.send_replace(Some(consent_url()));
let n = w.next().await.expect("browse-to-url event");
assert_eq!(n.browse_to_url, Some(consent_url()));
assert_eq!(n.state, None);
assert_eq!(n.net_map, None);
}
#[tokio::test]
async fn running_node_browser_url_not_in_initial_snapshot() {
let (state_tx, state_rx) = watch::channel(DeviceState::Running);
let (peer_tx, peer_rx) = watch::channel(Vec::new());
let (browser_tx, browser_rx) = watch::channel(Some(consent_url()));
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (tx, rx) = mpsc::channel(NOTIFY_BUFFER);
tokio::spawn(run_bus(
NotifyWatchOpt::INITIAL_STATE | NotifyWatchOpt::INITIAL_NETMAP,
state_rx,
peer_rx,
browser_rx,
shutdown_rx,
tx,
));
let mut w = IpnBusWatcher { rx };
let snap = w.next().await.expect("initial snapshot");
assert_eq!(snap.state, Some(DeviceState::Running));
assert_eq!(snap.net_map, Some(Vec::new()));
assert_eq!(
snap.browse_to_url, None,
"pre-existing running-node URL must not be front-loaded"
);
let next = consent_url();
let mut next2 = next.clone();
next2.set_path("/consent2");
browser_tx.send_replace(Some(next2.clone()));
let n = w.next().await.expect("browser-url change after subscribe");
assert_eq!(n.browse_to_url, Some(next2));
drop((state_tx, peer_tx, shutdown_tx));
}
#[tokio::test]
async fn sequential_browser_urls_stream_each() {
let (_s, _p, browser_tx, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::Running,
Vec::new(),
);
let _snap = w.next().await.expect("snapshot barrier");
let url_a = consent_url();
let mut url_b = consent_url();
url_b.set_path("/consent-b");
browser_tx.send_replace(Some(url_a.clone()));
assert_eq!(
w.next().await.expect("first url").browse_to_url,
Some(url_a)
);
browser_tx.send_replace(Some(url_b.clone()));
assert_eq!(
w.next().await.expect("second url").browse_to_url,
Some(url_b)
);
}
#[tokio::test]
async fn browser_url_and_state_change_interleave() {
let (state_tx, _p, browser_tx, _sd, mut w) = harness(
NotifyWatchOpt::INITIAL_STATE,
DeviceState::Running,
Vec::new(),
);
let _snap = w.next().await.expect("snapshot barrier");
browser_tx.send_replace(Some(consent_url()));
state_tx.send_replace(DeviceState::Expired);
let a = w.next().await.expect("first event");
let b = w.next().await.expect("second event");
for n in [&a, &b] {
assert!(
n.state.is_some() ^ n.browse_to_url.is_some(),
"each streamed event carries exactly one of state / browse_to_url, got {n:?}"
);
assert_eq!(n.net_map, None);
}
assert!(
a.browse_to_url.is_some() || b.browse_to_url.is_some(),
"a browse_to_url event arrived"
);
assert!(
a.state.is_some() || b.state.is_some(),
"a state event arrived"
);
}
}