use futures::stream::{Stream, StreamExt};
use log::{debug, warn};
use std::collections::HashSet;
use std::pin::Pin;
use tokio::select;
use tokio::sync::watch;
use zbus::Connection;
use zvariant::OwnedObjectPath;
use crate::Result;
use crate::api::models::ConnectionError;
use crate::dbus::{NMAccessPointProxy, NMDeviceProxy, NMProxy, NMWirelessProxy};
use crate::types::constants::device_type;
/// Boxed, pinned, `Send` stream of [`NetworkChange`] events.
///
/// Type-erasing the streams lets the signal streams and the per-access-point
/// strength streams be stored in one `Vec` and merged into a single stream.
type NetworkChangeStream = Pin<Box<dyn Stream<Item = NetworkChange> + Send>>;
/// Event yielded by the streams that `monitor_network_changes` merges together.
enum NetworkChange {
/// An access point appeared; carries the object path taken from the
/// `AccessPointAdded` signal arguments.
Added(OwnedObjectPath),
/// An access point disappeared; carries the object path taken from the
/// `AccessPointRemoved` signal arguments.
Removed(OwnedObjectPath),
/// An access point's strength property changed. Also used as the fallback
/// event when the arguments of an added/removed signal fail to parse.
SignalStrengthChanged,
}
/// Watches every Wi-Fi device for access point changes and invokes `callback`
/// for each change until shutdown is requested.
///
/// For each device reported by NetworkManager whose device type is
/// `device_type::WIFI`, this subscribes to the access-point added/removed
/// signals and to the strength property of every access point currently
/// listed on the device. Access points announced later get a strength
/// subscription when their `Added` event is processed.
///
/// # Parameters
///
/// * `conn` - D-Bus connection used to build all proxies.
/// * `shutdown` - watch receiver; as soon as `shutdown.changed()` completes
///   (with either result) monitoring stops and the function returns `Ok(())`.
/// * `callback` - invoked once for every event taken from the merged stream.
///
/// # Errors
///
/// * Propagates any error from building the NetworkManager, device, or
///   wireless proxies, from querying the device list or a device type, and
///   from subscribing to the added/removed signals.
/// * `ConnectionError::NoWifiDevice` if no stream could be set up, i.e. no
///   Wi-Fi device was found.
/// * `ConnectionError::Stuck` if the merged event stream ends while shutdown
///   has not been requested.
///
/// Failures to list access points or to subscribe to an individual access
/// point's strength are only logged at debug level; monitoring continues.
pub async fn monitor_network_changes<F>(
    conn: &Connection,
    mut shutdown: watch::Receiver<()>,
    callback: F,
) -> Result<()>
where
    F: Fn() + Send + 'static,
{
    let nm = NMProxy::new(conn).await?;
    let devices = nm.get_devices().await?;

    let mut streams: Vec<NetworkChangeStream> = Vec::new();
    // Object paths (as strings) of access points that already have a strength
    // subscription, so the same access point is never subscribed twice.
    let mut monitored_access_points = HashSet::new();

    for dev_path in devices {
        let dev = NMDeviceProxy::builder(conn)
            .path(dev_path.clone())?
            .build()
            .await?;

        if dev.device_type().await? != device_type::WIFI {
            continue;
        }

        let wifi = NMWirelessProxy::builder(conn)
            .path(dev_path.clone())?
            .build()
            .await?;

        let added_stream = wifi.receive_access_point_added().await?;
        let removed_stream = wifi.receive_access_point_removed().await?;

        // A signal whose arguments cannot be parsed is still reported, as a
        // generic `SignalStrengthChanged`, so the callback fires regardless.
        streams.push(Box::pin(added_stream.map(|signal| {
            signal.args().map_or_else(
                |err| {
                    debug!("Failed to parse AccessPointAdded signal: {err}");
                    NetworkChange::SignalStrengthChanged
                },
                |args| NetworkChange::Added(args.path().clone()),
            )
        })));

        streams.push(Box::pin(removed_stream.map(|signal| {
            signal.args().map_or_else(
                |err| {
                    debug!("Failed to parse AccessPointRemoved signal: {err}");
                    NetworkChange::SignalStrengthChanged
                },
                |args| NetworkChange::Removed(args.path().clone()),
            )
        })));

        // Best effort: strength monitoring of the already-known access points
        // must not prevent added/removed monitoring of the device.
        match wifi.access_points().await {
            Ok(ap_paths) => {
                for ap_path in ap_paths {
                    if !monitored_access_points.insert(ap_path.to_string()) {
                        continue;
                    }

                    match access_point_strength_stream(conn, ap_path.clone()).await {
                        Ok(stream) => streams.push(stream),
                        Err(err) => debug!(
                            "Failed to monitor signal strength for access point {}: {}",
                            ap_path, err
                        ),
                    }
                }
            }
            Err(err) => debug!("Failed to list access points on device {dev_path}: {err}"),
        }

        debug!("Subscribed to network change signals on device: {dev_path}");
    }

    if streams.is_empty() {
        warn!("No Wi-Fi devices found to monitor");
        return Err(ConnectionError::NoWifiDevice);
    }

    debug!(
        "Monitoring {} signal streams for network changes",
        streams.len()
    );

    let mut merged = futures::stream::select_all(streams);

    loop {
        select! {
            _ = shutdown.changed() => {
                debug!("Network monitoring shutdown requested");
                // Return immediately: continuing to poll `merged` after a
                // shutdown request would keep invoking the callback and
                // defeat the purpose of the shutdown channel.
                return Ok(());
            }
            signal = merged.next() => {
                match signal {
                    Some(NetworkChange::Added(path)) => {
                        if monitored_access_points.insert(path.to_string()) {
                            match access_point_strength_stream(conn, path.clone()).await {
                                Ok(stream) => merged.push(stream),
                                Err(err) => debug!(
                                    "Failed to monitor signal strength for access point {}: {}",
                                    path, err
                                ),
                            }
                        }
                        callback();
                    }
                    Some(NetworkChange::Removed(path)) => {
                        // Only the bookkeeping entry is dropped so that a
                        // re-added access point is subscribed again; the
                        // stream already pushed into `merged` is not removed.
                        monitored_access_points.remove(path.as_str());
                        callback();
                    }
                    Some(NetworkChange::SignalStrengthChanged) => callback(),
                    None => break,
                }
            }
        }
    }

    // Reached only when every underlying stream has terminated without a
    // shutdown request.
    warn!("Network monitoring stream ended unexpectedly");
    Err(ConnectionError::Stuck("monitoring stream ended".into()))
}
/// Builds a stream that emits `NetworkChange::SignalStrengthChanged` whenever
/// the strength property of the access point at `ap_path` changes.
///
/// The first item produced by the property stream is skipped (presumably the
/// current value reported on subscription rather than a real change — confirm
/// against the zbus property stream documentation).
///
/// # Errors
///
/// Returns an error if `ap_path` is rejected by the proxy builder or if the
/// access point proxy cannot be built.
async fn access_point_strength_stream(
    conn: &Connection,
    ap_path: OwnedObjectPath,
) -> Result<NetworkChangeStream> {
    // The path is cloned for the proxy because the original is moved into the
    // logging closure below.
    let builder = NMAccessPointProxy::builder(conn).path(ap_path.clone())?;
    let proxy = builder.build().await?;

    let strength_updates = proxy.receive_strength_changed().await;
    let changes = strength_updates.skip(1).map(move |_| {
        debug!("Access point signal strength changed: {ap_path}");
        NetworkChange::SignalStrengthChanged
    });

    Ok(Box::pin(changes))
}