use crate::config;
use crate::network;
use crate::world_view;
use crate::world_view::WorldView;
use std::net::SocketAddr;
use std::thread::sleep;
use std::time::Duration;
use tokio::net::UdpSocket;
use socket2::{Domain, Socket, Type};
use tokio::sync::mpsc;
use tokio::sync::watch;
/// Periodically broadcasts the local world view over UDP while this node is the master.
///
/// The task first waits until `network::read_network_status()` reports `true`, then binds a
/// broadcast-enabled UDP socket to `config::BC_LISTEN_ADDR` on an OS-chosen port (port `0`).
/// On every iteration it refreshes its world-view copy from `wv_watch_rx`, waits one
/// `config::UDP_PERIOD`, and, if `network::read_self_id()` equalled the world view's
/// `master_id` and the network is up, sends the message produced by `build_message` to
/// `config::BC_ADDR:config::BROADCAST_PORT`.
///
/// All waiting uses `tokio::time::sleep`, so the executor thread is never blocked.
///
/// # Errors
///
/// Returns an error if the socket cannot be created, configured, bound, or registered with
/// Tokio. Once the loop has started the function does not return.
///
/// # Panics
///
/// Panics if the addresses built from `config` do not parse as socket addresses.
pub async fn start_udp_broadcaster(
    wv_watch_rx: watch::Receiver<WorldView>,
) -> tokio::io::Result<()> {
    /// How often the network status is re-checked while waiting for the network to come up.
    const NETWORK_POLL_INTERVAL: Duration = Duration::from_millis(100);

    // Wait for the network asynchronously; an empty spin loop here would hog the executor
    // thread and prevent other tasks scheduled on it from running.
    while !network::read_network_status() {
        tokio::time::sleep(NETWORK_POLL_INTERVAL).await;
    }
    let mut prev_network_status = network::read_network_status();

    let broadcast_addr: SocketAddr = format!("{}:{}", config::BC_ADDR, config::BROADCAST_PORT)
        .parse()
        .expect("Invalid address");
    let socket_addr: SocketAddr = format!("{}:0", config::BC_LISTEN_ADDR)
        .parse()
        .expect("Invalid address");

    // Configure the raw socket through socket2 before handing it to Tokio;
    // `UdpSocket::from_std` requires the socket to already be in non-blocking mode.
    let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
    socket.set_nonblocking(true)?;
    socket.set_reuse_address(true)?;
    socket.set_broadcast(true)?;
    socket.bind(&socket_addr.into())?;
    let udp_socket = UdpSocket::from_std(socket.into())?;

    let mut wv = world_view::get_wv(wv_watch_rx.clone());
    loop {
        world_view::update_wv(wv_watch_rx.clone(), &mut wv).await;
        let is_master = network::read_self_id() == wv.master_id;

        // Pace every iteration (master or not) by one period so the loop never spins hot.
        tokio::time::sleep(config::UDP_PERIOD).await;
        if !is_master {
            continue;
        }

        if !network::read_network_status() {
            prev_network_status = false;
            continue;
        }

        if !prev_network_status {
            // The network has just come back: wait 500 ms before broadcasting again.
            tokio::time::sleep(Duration::from_millis(500)).await;
            prev_network_status = true;
        }

        let message_bytes = build_message(&wv);
        // Best effort: a failed send is ignored because the next period sends again.
        let _ = udp_socket.send_to(&message_bytes, &broadcast_addr).await;
    }
}
pub async fn start_udp_listener(
wv_watch_rx: watch::Receiver<WorldView>,
udp_wv_tx: mpsc::Sender<WorldView>
) -> tokio::io::Result<()>
{
while !network::read_network_status() {}
let self_id = network::read_self_id();
let broadcast_listen_addr = format!("{}:{}", config::BC_LISTEN_ADDR, config::BROADCAST_PORT);
let socket_addr: SocketAddr = broadcast_listen_addr.parse().expect("Invalid address");
let socket_temp = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
socket_temp.set_nonblocking(true).expect("Failed to set non-blocking");
socket_temp.set_reuse_address(true)?;
socket_temp.set_broadcast(true)?;
socket_temp.bind(&socket_addr.into())?;
let socket = UdpSocket::from_std(socket_temp.into())?;
let mut buf = [0; config::UDP_BUFFER];
let mut read_wv: Option<WorldView>;
let mut my_wv = world_view::get_wv(wv_watch_rx.clone());
loop
{
match socket.recv_from(&mut buf).await
{
Ok((len, _)) =>
{
read_wv = parse_message(&buf[..len]);
}
Err(e) =>
{
return Err(e);
}
}
match read_wv
{
Some(read_wv) =>
{
world_view::update_wv(wv_watch_rx.clone(), &mut my_wv).await;
if my_wv.master_id >= read_wv.master_id
&& self_id != read_wv.master_id
{
my_wv = read_wv;
let _ = udp_wv_tx.send(my_wv.clone()).await;
}
},
None => continue,
}
}
}
/// Builds the datagram payload for a world-view broadcast.
///
/// The payload is the serialized `config::KEY_STR` immediately followed by the serialized
/// world view, both produced by `world_view::serialize`.
fn build_message(wv: &WorldView) -> Vec<u8> {
    let key_part = world_view::serialize(&config::KEY_STR);
    let wv_part = world_view::serialize(&wv);

    let mut message = Vec::new();
    message.extend_from_slice(&key_part);
    message.extend_from_slice(&wv_part);
    message
}
/// Decodes a received datagram into a world view.
///
/// The datagram must start with the bincode encoding of `config::KEY_STR`; the bytes after
/// that prefix are passed to `world_view::deserialize`.
///
/// Returns `None` when the buffer is not longer than the key prefix, when the prefix does
/// not deserialize to a string equal to `config::KEY_STR`, or when
/// `world_view::deserialize` returns `None`.
///
/// # Panics
///
/// Panics if bincode cannot compute the serialized size of `config::KEY_STR`.
pub fn parse_message(buf: &[u8]) -> Option<WorldView> {
    let prefix_len = bincode::serialized_size(config::KEY_STR).unwrap() as usize;

    // A valid message needs at least one payload byte after the key prefix.
    if buf.len() <= prefix_len {
        return None;
    }

    let (prefix, payload) = buf.split_at(prefix_len);
    let decoded_key: Result<String, _> = bincode::deserialize(prefix);
    match decoded_key {
        Ok(key) if key == config::KEY_STR => world_view::deserialize(payload),
        _ => None,
    }
}