pub mod udp_broadcast;
pub mod local_network;
pub mod udp_direct;
use crate::world_view::WorldView;
use crate::{init, config, print, ip_help_functions, world_view, };
use std::collections::VecDeque;
use serde::{Serialize, Deserialize};
use tokio::net::UdpSocket;
use tokio::time::{timeout, Duration, Instant};
use tokio::sync::{mpsc, watch};
use std::sync::atomic::{Ordering, AtomicU8, AtomicBool};
use std::sync::OnceLock;
use std::thread::sleep;
use local_ip_address::local_ip;
use std::net::IpAddr;
/// Process-wide "network is online" flag.
///
/// Lazily initialised to `false`; written by `set_network_status` and read by
/// `read_network_status`.
static ONLINE: OnceLock<AtomicBool> = OnceLock::new();
/// Process-wide ID of this node.
///
/// Lazily initialised to `config::ERROR_ID` by `read_self_id` / `set_self_id`
/// until a real ID is stored.
pub static SELF_ID: OnceLock<AtomicU8> = OnceLock::new();
/// Snapshot of the node's network connectivity, published by `watch_ethernet`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ConnectionStatus
{
/// `true` when the local IP could be read and its root matches
/// `config::NETWORK_PREFIX`.
pub on_internett: bool,
/// `true` when the packet-loss monitor reports the link as usable; always
/// `false` when `on_internett` is `false`.
pub connected_on_elevator_network: bool,
/// Packet loss as a percentage; `watch_ethernet` reports 100 when the node
/// is not on the expected network.
pub packet_loss: u8,
}
impl ConnectionStatus
{
pub fn new() -> Self
{
Self
{
on_internett: false,
connected_on_elevator_network: false,
packet_loss: 0,
}
}
fn set_packet_loss(&mut self, loss: f32)
{
self.packet_loss = (loss * 100.0) as u8;
}
}
pub async fn watch_ethernet(
wv_watch_rx: watch::Receiver<WorldView>,
network_watch_tx: watch::Sender<ConnectionStatus>,
new_wv_after_offline_tx: mpsc::Sender<WorldView>
)
{
let mut last_net_status = false;
let network_quality_rx = start_packet_loss_monitor(
1,
5,
1000 as usize,
1.0
).await;
loop
{
let ip = get_self_ip();
let mut connection_status = ConnectionStatus::new();
let net_status: bool;
match ip
{
Ok(ip) if ip_help_functions::get_root_ip(ip) == config::NETWORK_PREFIX =>
{
let (is_ok, loss) = network_quality_rx.borrow().clone();
net_status = is_ok;
connection_status.on_internett = true;
connection_status.connected_on_elevator_network = is_ok;
connection_status.set_packet_loss(loss);
let _ = network_watch_tx.send(connection_status.clone());
}
_ =>
{
connection_status.on_internett = false;
connection_status.connected_on_elevator_network = false;
connection_status.packet_loss = 100;
net_status = false;
let _ = network_watch_tx.send(connection_status.clone());
}
}
if last_net_status != net_status
{
if net_status
{
let mut wv = world_view::get_wv(wv_watch_rx.clone());
let self_elev = world_view::extract_self_elevator_container(&wv);
wv = init::initialize_worldview(self_elev).await;
let _ = new_wv_after_offline_tx.send(wv).await;
print::ok("System is online".to_string());
} else
{
print::warn("System is offline".to_string());
}
set_network_status(net_status);
last_net_status = net_status;
}
sleep(config::POLL_PERIOD);
}
}
/// Returns the global online flag, initialising it to `false` on first use.
pub fn read_network_status() -> bool {
    let online_flag = ONLINE.get_or_init(|| AtomicBool::new(false));
    online_flag.load(Ordering::SeqCst)
}
/// Returns this node's ID, or `config::ERROR_ID` if none has been stored yet.
pub fn read_self_id() -> u8 {
    let id_cell = SELF_ID.get_or_init(|| AtomicU8::new(config::ERROR_ID));
    id_cell.load(Ordering::SeqCst)
}
/// Stores `id` as this node's ID, creating the global cell on first use.
pub fn set_self_id(id: u8) {
    let id_cell = SELF_ID.get_or_init(|| AtomicU8::new(config::ERROR_ID));
    id_cell.store(id, Ordering::SeqCst);
}
/// Looks up this machine's local IP address via `local_ip_address::local_ip`,
/// passing any lookup error straight through to the caller.
fn get_self_ip() -> Result<IpAddr, local_ip_address::Error> {
    local_ip()
}
async fn wait_for_ip() -> IpAddr {
loop {
if let Ok(ip) = get_self_ip() {
return ip;
} else {
sleep(config::POLL_PERIOD);
}
}
}
/// Sends a single UDP "ping" datagram to `target` from a fresh socket bound
/// to that same address and waits for a datagram to arrive.
///
/// Returns `true` only if the socket could be created, configured and bound,
/// the datagram was sent, and a datagram was received within `timeout_ms`
/// milliseconds. Every failure is reported as `false` so the caller can count
/// it as a lost packet.
async fn send_loopback_ping(target: &str, timeout_ms: u64) -> bool {
    use socket2::{Domain, Socket, Type};

    let Ok(socket_addr) = target.parse::<std::net::SocketAddr>() else {
        return false;
    };
    let Ok(raw_socket) = Socket::new(Domain::IPV4, Type::DGRAM, None) else {
        return false;
    };
    let configured = raw_socket.set_nonblocking(true).is_ok()
        && raw_socket.set_reuse_address(true).is_ok()
        && raw_socket.set_broadcast(true).is_ok()
        && raw_socket.bind(&socket_addr.into()).is_ok();
    if !configured {
        return false;
    }
    let Ok(socket) = UdpSocket::from_std(raw_socket.into()) else {
        return false;
    };
    if socket.send_to(b"ping", socket_addr).await.is_err() {
        return false;
    }

    let mut buf = [0u8; 16];
    matches!(
        timeout(Duration::from_millis(timeout_ms), socket.recv_from(&mut buf)).await,
        Ok(Ok(_))
    )
}

/// Spawns a background task that measures packet loss and publishes it.
///
/// The task repeatedly sends a UDP datagram to this host's own address on
/// `config::BROADCAST_PORT`, records whether it came back, and keeps the
/// results in a sliding window. The receiver yields
/// `(link_is_usable, loss_rate)` with `loss_rate` in `0.0..=1.0`; its initial
/// value is `(true, 0.0)`.
///
/// A probe that fails for any reason, including socket creation or bind
/// errors, is counted as a lost packet. Monitoring continues afterwards
/// instead of stopping and leaving a stale status in the channel.
///
/// # Parameters
/// - `interval_ms`: pause between probes, in milliseconds.
/// - `timeout_ms`: how long to wait for each probe to come back.
/// - `max_window`: number of most recent probes in the sliding window.
/// - `max_loss_rate`: highest loss rate still reported as a usable link.
///
/// This function waits for a local IP address before returning.
async fn start_packet_loss_monitor(
    interval_ms: u64,
    timeout_ms: u64,
    max_window: usize,
    max_loss_rate: f32,
) -> watch::Receiver<(bool, f32)> {
    let (tx, rx) = watch::channel((true, 0.0));
    let addr = format!("{}:{}", wait_for_ip().await, config::BROADCAST_PORT);

    tokio::spawn(async move {
        let mut last_loss: f32 = 0.0;
        let mut last_status = false;
        let mut last_instant = Instant::now();
        // Start from an all-success window so the link is not reported as
        // lossy before any real measurements exist.
        let mut window: VecDeque<bool> = VecDeque::from(vec![true; max_window]);

        loop {
            let success = send_loopback_ping(&addr, timeout_ms).await;

            window.push_back(success);
            if window.len() > max_window {
                window.pop_front();
            }

            let fail_count = window.iter().filter(|&&ok| !ok).count();
            let raw_loss = fail_count as f32 / window.len() as f32;
            // NOTE(review): presumably converts round-trip loss to one-way
            // loss, since a probe only succeeds if both directions work.
            // TODO confirm.
            let loss_rate = 1.0 - (1.0 - raw_loss).sqrt();
            let new_status = loss_rate <= max_loss_rate;

            let status_changed = last_status != new_status;
            let loss_changed = (loss_rate - last_loss).abs() > 0.01;
            if status_changed || loss_changed {
                // NOTE(review): `last_status` is refreshed only when more
                // than 5 s have passed since the previous refresh, and
                // `last_loss` only otherwise. This is kept exactly as the
                // original behaved; confirm the intended rate limiting.
                let refresh_due = last_instant.elapsed() > Duration::from_secs(5);
                let _ = tx.send((new_status, loss_rate));
                if refresh_due {
                    last_instant = Instant::now();
                    last_status = new_status;
                } else {
                    last_loss = loss_rate;
                }
            }

            tokio::time::sleep(Duration::from_millis(interval_ms)).await;
        }
    });

    rx
}
/// Updates the global online flag, creating it (as `false`) on first use.
fn set_network_status(status: bool) {
    let online_flag = ONLINE.get_or_init(|| AtomicBool::new(false));
    online_flag.store(status, Ordering::SeqCst);
}