use log::{debug, warn};
use std::borrow::Cow;
use std::collections::HashMap;
use std::str;
use zbus::Connection;
use zvariant::OwnedObjectPath;
use crate::Result;
use crate::api::models::ConnectionStateReason;
use crate::dbus::{
NMAccessPointProxy, NMActiveConnectionProxy, NMDeviceProxy, NMProxy, NMWirelessProxy,
};
use crate::types::constants::{device_type, frequency, signal_strength, wifi_mode};
/// Maps a radio frequency in MHz to its Wi-Fi channel number.
///
/// The band limits and the channel spacing come from the `frequency`
/// constants:
/// - inside `BAND_2_4_START..=BAND_2_4_END`, channels count up from 1 at
///   `BAND_2_4_START`;
/// - `BAND_2_4_CH14` is always channel 14;
/// - inside `BAND_5_START..=BAND_5_END`, the channel is the number of
///   spacing steps above 5000 MHz;
/// - inside `BAND_6_START..=BAND_6_END`, channels count up from 1 at
///   `BAND_6_START`.
///
/// Frequencies that are not an exact channel centre are rounded down by the
/// integer division. Returns `None` for anything outside the ranges above.
pub(crate) fn channel_from_freq(mhz: u32) -> Option<u16> {
    // Whole channel-spacing steps between `base` and the requested frequency.
    // Only invoked from a branch that has already range-checked `mhz`.
    let steps_above = |base: u32| (mhz - base) / frequency::CHANNEL_SPACING;

    let channel = if (frequency::BAND_2_4_START..=frequency::BAND_2_4_END).contains(&mhz) {
        steps_above(frequency::BAND_2_4_START) + 1
    } else if mhz == frequency::BAND_2_4_CH14 {
        14
    } else if (frequency::BAND_5_START..=frequency::BAND_5_END).contains(&mhz) {
        steps_above(5000)
    } else if (frequency::BAND_6_START..=frequency::BAND_6_END).contains(&mhz) {
        steps_above(frequency::BAND_6_START) + 1
    } else {
        return None;
    };

    Some(channel as u16)
}
/// Renders a signal-strength value as a four-cell bar glyph string.
///
/// The thresholds come from the `signal_strength` constants: values up to
/// `BAR_1_MAX` show one bar, `BAR_2_MIN..=BAR_2_MAX` two bars,
/// `BAR_3_MIN..=BAR_3_MAX` three bars, and every other value four bars.
pub(crate) fn bars_from_strength(s: u8) -> &'static str {
    if s <= signal_strength::BAR_1_MAX {
        return "▂___";
    }
    if (signal_strength::BAR_2_MIN..=signal_strength::BAR_2_MAX).contains(&s) {
        return "▂▄__";
    }
    if (signal_strength::BAR_3_MIN..=signal_strength::BAR_3_MAX).contains(&s) {
        return "▂▄▆_";
    }
    "▂▄▆█"
}
/// Returns a short display label for a Wi-Fi mode code.
///
/// The codes `wifi_mode::ADHOC`, `wifi_mode::INFRA` and `wifi_mode::AP` map
/// to `"Adhoc"`, `"Infra"` and `"AP"`; any other value yields `"Unknown"`.
pub(crate) fn mode_to_string(m: u32) -> &'static str {
    // Checked in order, so the first matching code wins.
    const LABELS: [(u32, &str); 3] = [
        (wifi_mode::ADHOC, "Adhoc"),
        (wifi_mode::INFRA, "Infra"),
        (wifi_mode::AP, "AP"),
    ];

    LABELS
        .iter()
        .find(|&&(code, _)| code == m)
        .map_or("Unknown", |&(_, label)| label)
}
/// Decodes raw SSID bytes into text, substituting a placeholder when there is
/// no displayable name.
///
/// Non-empty, valid UTF-8 input is returned as an owned string. An empty
/// slice yields `"<Hidden Network>"`; so does input that is not valid UTF-8,
/// in which case a warning is logged as well.
pub(crate) fn decode_ssid_or_hidden(bytes: &[u8]) -> Cow<'static, str> {
    const PLACEHOLDER: &str = "<Hidden Network>";

    if bytes.is_empty() {
        return Cow::Borrowed(PLACEHOLDER);
    }

    str::from_utf8(bytes)
        .map(|ssid| Cow::Owned(ssid.to_owned()))
        .unwrap_or_else(|e| {
            warn!("Invalid UTF-8 in SSID during comparison: {e}");
            Cow::Borrowed(PLACEHOLDER)
        })
}
/// Decodes raw SSID bytes into text, falling back to an empty string.
///
/// Non-empty, valid UTF-8 input is returned as an owned string. An empty
/// slice yields `""`; so does input that is not valid UTF-8, in which case a
/// warning is logged as well.
pub(crate) fn decode_ssid_or_empty(bytes: &[u8]) -> Cow<'static, str> {
    const FALLBACK: &str = "";

    if bytes.is_empty() {
        return Cow::Borrowed(FALLBACK);
    }

    str::from_utf8(bytes)
        .map(|ssid| Cow::Owned(ssid.to_owned()))
        .unwrap_or_else(|e| {
            warn!("Invalid UTF-8 in SSID during comparison: {e}");
            Cow::Borrowed(FALLBACK)
        })
}
/// Returns the contained signal strength, or `0` when none is available.
pub(crate) fn strength_or_zero(strength: Option<u8>) -> u8 {
    // `u8::default()` is 0, so a missing reading collapses to zero.
    strength.unwrap_or_default()
}
/// Runs `func` on every access point of every Wi-Fi device and collects the
/// values it produces.
///
/// Devices are listed through `NMProxy::get_devices`; those whose
/// `device_type` is not `device_type::WIFI` are skipped. For each remaining
/// device, one `NMAccessPointProxy` is built per path returned by
/// `access_points()` and passed to `func`.
///
/// `func` returns a boxed future resolving to `Result<Option<T>>`:
/// `Ok(Some(v))` appends `v` to the output, `Ok(None)` contributes nothing.
///
/// # Errors
///
/// The first error from building a proxy, from a D-Bus call, or from `func`
/// aborts the traversal and is returned; results collected so far are dropped.
pub(crate) async fn for_each_access_point<F, T>(conn: &Connection, mut func: F) -> Result<Vec<T>>
where
    F: for<'a> FnMut(
        &'a NMAccessPointProxy<'a>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Option<T>>> + Send + 'a>,
    >,
{
    let manager = NMProxy::new(conn).await?;
    let device_paths = manager.get_devices().await?;
    let mut collected = Vec::new();

    for device_path in device_paths {
        let device = NMDeviceProxy::builder(conn)
            .path(device_path.clone())?
            .build()
            .await?;
        let is_wifi = device.device_type().await? == device_type::WIFI;
        if !is_wifi {
            continue;
        }

        // The device path is not needed after this point, so it is moved
        // into the wireless proxy rather than cloned again.
        let wireless = NMWirelessProxy::builder(conn)
            .path(device_path)?
            .build()
            .await?;

        for ap_path in wireless.access_points().await? {
            let access_point = NMAccessPointProxy::builder(conn)
                .path(ap_path)?
                .build()
                .await?;
            // An `Option` iterates over zero or one item, so `None` adds nothing.
            collected.extend(func(&access_point).await?);
        }
    }

    Ok(collected)
}
/// Builds a generic `zbus::Proxy` for `interface` at `path` on the
/// `org.freedesktop.NetworkManager` destination.
///
/// `path` may be anything convertible into an `OwnedObjectPath`.
///
/// # Errors
///
/// Returns an error if `path` cannot be converted, if the destination, path
/// or interface is rejected by the proxy builder, or if building the proxy
/// fails.
pub(crate) async fn nm_proxy<'a, P>(
    conn: &'a Connection,
    path: P,
    interface: &'a str,
) -> Result<zbus::Proxy<'a>>
where
    P: TryInto<OwnedObjectPath>,
    P::Error: Into<zbus::Error>,
{
    const NM_DESTINATION: &str = "org.freedesktop.NetworkManager";

    // Convert the caller's path first so a malformed path fails before any
    // builder work is done.
    let object_path: OwnedObjectPath = path.try_into().map_err(Into::into)?;

    let proxy: zbus::Proxy<'a> = zbus::proxy::Builder::new(conn)
        .destination(NM_DESTINATION)?
        .path(object_path)?
        .interface(interface)?
        .build()
        .await?;
    Ok(proxy)
}
/// Builds a proxy for the `org.freedesktop.NetworkManager.Settings` interface
/// at `/org/freedesktop/NetworkManager/Settings`.
///
/// # Errors
///
/// Propagates any error returned by [`nm_proxy`].
pub(crate) async fn settings_proxy(conn: &Connection) -> Result<zbus::Proxy<'_>> {
    const SETTINGS_PATH: &str = "/org/freedesktop/NetworkManager/Settings";
    const SETTINGS_INTERFACE: &str = "org.freedesktop.NetworkManager.Settings";

    nm_proxy(conn, SETTINGS_PATH, SETTINGS_INTERFACE).await
}
/// Builds a proxy for the
/// `org.freedesktop.NetworkManager.Settings.Connection` interface at
/// `connection_path`.
///
/// `connection_path` may be anything convertible into an `OwnedObjectPath`.
///
/// # Errors
///
/// Propagates any error returned by [`nm_proxy`].
pub(crate) async fn connection_settings_proxy<'a, P>(
    conn: &'a Connection,
    connection_path: P,
) -> Result<zbus::Proxy<'a>>
where
    P: TryInto<OwnedObjectPath>,
    P::Error: Into<zbus::Error>,
{
    const CONNECTION_INTERFACE: &str = "org.freedesktop.NetworkManager.Settings.Connection";

    nm_proxy(conn, connection_path, CONNECTION_INTERFACE).await
}
/// Attempts to look up why an active connection is in its current state.
///
/// As written, this always returns `ConnectionStateReason::Unknown`: the
/// active-connection proxy is built and its `state` is queried, but the
/// outcome is only logged. A successful query logs the state at debug level;
/// a failure at any step logs a warning.
pub(crate) async fn extract_connection_state_reason(
    conn: &Connection,
    active_conn_path: &OwnedObjectPath,
) -> ConnectionStateReason {
    let builder = match NMActiveConnectionProxy::builder(conn).path(active_conn_path.clone()) {
        Ok(builder) => builder,
        Err(e) => {
            warn!("Failed to create active connection proxy builder: {}", e);
            return ConnectionStateReason::Unknown;
        }
    };

    let active = match builder.build().await {
        Ok(proxy) => proxy,
        Err(e) => {
            warn!("Failed to build active connection proxy: {}", e);
            return ConnectionStateReason::Unknown;
        }
    };

    match active.state().await {
        Ok(state) => {
            debug!(
                "Active connection state: {}, but reason not available as property",
                state
            );
        }
        Err(e) => {
            warn!("Failed to query active connection state: {}", e);
        }
    }

    // No branch above can obtain an actual reason.
    ConnectionStateReason::Unknown
}
/// Builds the BlueZ D-Bus object path for the device with address `bdaddr`.
///
/// Every `:` in the address becomes `_`, ASCII letters are upper-cased, and
/// the result is appended to `/org/bluez/hci0/dev_`. For example
/// `"aa:bb:cc:dd:ee:ff"` becomes `/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF`.
///
/// The adapter is fixed to `hci0`. The address is not validated; all other
/// characters are copied through unchanged.
pub(crate) fn bluez_device_path(bdaddr: &str) -> String {
    const DEVICE_PREFIX: &str = "/org/bluez/hci0/dev_";

    let mut path = String::with_capacity(DEVICE_PREFIX.len() + bdaddr.len());
    path.push_str(DEVICE_PREFIX);
    // NOTE(review): BlueZ is understood to publish device objects with the
    // address in upper-case hex, so a lower-case address would otherwise give
    // a path that matches no object — confirm against the BlueZ Device API.
    path.extend(bdaddr.chars().map(|c| match c {
        ':' => '_',
        other => other.to_ascii_uppercase(),
    }));
    path
}
/// Unwraps a `Result`, or logs the error and returns `None` from the
/// enclosing function.
///
/// `try_log!(result, context)` evaluates to the `Ok` value. On `Err`, it
/// emits `log::warn!` with `context` followed by the error's `Debug` output,
/// then executes `return None;`. It can therefore only be used where
/// `return None` type-checks, and the calling crate must be able to resolve
/// `log::warn!`.
#[macro_export]
macro_rules! try_log {
    ($outcome:expr, $what:expr) => {
        match $outcome {
            Err(err) => {
                log::warn!("{}: {:?}", $what, err);
                return None;
            }
            Ok(unwrapped) => unwrapped,
        }
    };
}
/// Reads the first address of an IP configuration object as
/// `"<address>/<prefix>"`.
///
/// A proxy for `interface` at `config_path` is built via [`nm_proxy`] and its
/// `AddressData` property is fetched as a list of string-keyed maps. Only the
/// first entry is used; any further entries are ignored.
///
/// Returns `None` if the proxy cannot be built, the property cannot be read,
/// the list is empty, or the first entry lacks a string `"address"` or a
/// `u32` `"prefix"`.
async fn extract_ip_address(
    conn: &Connection,
    config_path: OwnedObjectPath,
    interface: &str,
) -> Option<String> {
    let config = nm_proxy(conn, config_path, interface).await.ok()?;
    let entries: Vec<HashMap<String, zvariant::Value>> =
        config.get_property("AddressData").await.ok()?;

    let first = entries.first()?;
    match (first.get("address")?, first.get("prefix")?) {
        (zvariant::Value::Str(address), zvariant::Value::U32(prefix)) => {
            Some(format!("{}/{}", address.as_str(), prefix))
        }
        // Either key holds a value of an unexpected variant type.
        _ => None,
    }
}
/// Looks up the IPv4 and IPv6 addresses of an active connection.
///
/// Returns `(ip4, ip6)`, each formatted by [`extract_ip_address`]. An entry
/// is `None` when its config path cannot be read, when that path is `"/"`,
/// or when extraction fails. If the active-connection proxy itself cannot be
/// built, both entries are `None`. All failures are silent; nothing is
/// logged here.
pub(crate) async fn get_ip_addresses_from_active_connection(
    conn: &Connection,
    active_conn_path: &OwnedObjectPath,
) -> (Option<String>, Option<String>) {
    let builder = match NMActiveConnectionProxy::builder(conn).path(active_conn_path.clone()) {
        Ok(builder) => builder,
        Err(_) => return (None, None),
    };
    let active = match builder.build().await {
        Ok(proxy) => proxy,
        Err(_) => return (None, None),
    };

    // A config path of "/" is skipped without attempting any lookup.
    let ip4_path = active
        .ip4_config()
        .await
        .ok()
        .filter(|path| path.as_str() != "/");
    let ip4_address = match ip4_path {
        Some(path) => {
            extract_ip_address(conn, path, "org.freedesktop.NetworkManager.IP4Config").await
        }
        None => None,
    };

    let ip6_path = active
        .ip6_config()
        .await
        .ok()
        .filter(|path| path.as_str() != "/");
    let ip6_address = match ip6_path {
        Some(path) => {
            extract_ip_address(conn, path, "org.freedesktop.NetworkManager.IP6Config").await
        }
        None => None,
    };

    (ip4_address, ip6_address)
}
/// Unit tests for the pure helpers in this module. The D-Bus backed
/// functions need a live bus and are not covered here.
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `channel_from_freq` against each `(MHz, expected)` pair.
    fn assert_channels(cases: &[(u32, Option<u16>)]) {
        for &(mhz, expected) in cases {
            assert_eq!(channel_from_freq(mhz), expected, "frequency {mhz} MHz");
        }
    }

    #[test]
    fn test_channel_from_freq_2_4ghz() {
        assert_channels(&[
            (2412, Some(1)),
            (2437, Some(6)),
            (2472, Some(13)),
            (2484, Some(14)),
        ]);
    }

    #[test]
    fn test_channel_from_freq_5ghz() {
        assert_channels(&[(5180, Some(36)), (5220, Some(44)), (5500, Some(100))]);
    }

    #[test]
    fn test_channel_from_freq_6ghz() {
        assert_channels(&[(5955, Some(1)), (6115, Some(33))]);
    }

    #[test]
    fn test_channel_from_freq_invalid() {
        assert_channels(&[(1000, None), (9999, None)]);
    }

    #[test]
    fn test_bars_from_strength() {
        // Each pair of rows brackets one bar level.
        let cases: [(u8, &str); 8] = [
            (0, "▂___"),
            (24, "▂___"),
            (25, "▂▄__"),
            (49, "▂▄__"),
            (50, "▂▄▆_"),
            (74, "▂▄▆_"),
            (75, "▂▄▆█"),
            (100, "▂▄▆█"),
        ];
        for (strength, expected) in cases.iter().copied() {
            assert_eq!(bars_from_strength(strength), expected, "strength {strength}");
        }
    }

    #[test]
    fn test_mode_to_string() {
        assert_eq!(mode_to_string(1), "Adhoc");
        assert_eq!(mode_to_string(2), "Infra");
        assert_eq!(mode_to_string(3), "AP");
        assert_eq!(mode_to_string(99), "Unknown");
    }

    #[test]
    fn test_decode_ssid_or_hidden() {
        assert_eq!(decode_ssid_or_hidden(b"MyNetwork"), "MyNetwork");
        assert_eq!(decode_ssid_or_hidden(b""), "<Hidden Network>");
        assert_eq!(decode_ssid_or_hidden(b"Test_SSID-123"), "Test_SSID-123");
    }

    #[test]
    fn test_decode_ssid_or_empty() {
        assert_eq!(decode_ssid_or_empty(b"MyNetwork"), "MyNetwork");
        assert_eq!(decode_ssid_or_empty(b""), "");
        assert_eq!(decode_ssid_or_empty("café".as_bytes()), "café");
    }

    #[test]
    fn test_decode_ssid_invalid_utf8() {
        // 0xFF never occurs in well-formed UTF-8, so both decoders must take
        // their fallback branch rather than panic or return garbage.
        let invalid: &[u8] = b"\xff\xfe\xfd";
        assert_eq!(decode_ssid_or_hidden(invalid), "<Hidden Network>");
        assert_eq!(decode_ssid_or_empty(invalid), "");

        // A valid prefix followed by a truncated multi-byte sequence is
        // rejected as a whole.
        let truncated: &[u8] = b"caf\xc3";
        assert_eq!(decode_ssid_or_hidden(truncated), "<Hidden Network>");
        assert_eq!(decode_ssid_or_empty(truncated), "");
    }

    #[test]
    fn test_strength_or_zero() {
        assert_eq!(strength_or_zero(Some(75)), 75);
        assert_eq!(strength_or_zero(Some(0)), 0);
        assert_eq!(strength_or_zero(Some(100)), 100);
        assert_eq!(strength_or_zero(None), 0);
    }

    #[test]
    fn test_bluez_device_path() {
        assert_eq!(
            bluez_device_path("AA:BB:CC:DD:EE:FF"),
            "/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF"
        );
        assert_eq!(bluez_device_path(""), "/org/bluez/hci0/dev_");
    }
}