mod update_wv;
use update_wv::{
join_wv_from_udp,
abort_network,
join_wv_from_container,
remove_container,
clear_from_sent_data,
distribute_tasks,
update_elev_states,
merge_wv_after_offline,
};
use crate::print;
use crate::world_view::{ElevatorContainer, WorldView};
use crate::world_view::{self};
use tokio::sync::{mpsc, watch};
use std::collections::HashMap;
#[allow(non_snake_case)]
pub async fn update_wv_watch(
mut mpsc_rxs: MpscRxs,
worldview_watch_tx: watch::Sender<WorldView>,
mut worldview: &mut WorldView
)
{
let _ = worldview_watch_tx.send(worldview.clone());
let mut wv_edited_I = false;
let mut master_container_updated_I = false;
let (master_container_tx, mut master_container_rx) = mpsc::channel::<ElevatorContainer>(100);
loop
{
match mpsc_rxs.sent_container.try_recv()
{
Ok(msg) =>
{
wv_edited_I = clear_from_sent_data(&mut worldview, msg);
},
Err(_) => {},
}
match mpsc_rxs.udp_wv.try_recv()
{
Ok(mut master_wv) =>
{
wv_edited_I = join_wv_from_udp(&mut worldview, &mut master_wv);
},
Err(_) => {},
}
match mpsc_rxs.connection_to_master_failed.try_recv()
{
Ok(_) =>
{
wv_edited_I = abort_network(&mut worldview);
},
Err(_) => {},
}
match master_container_rx.try_recv()
{
Ok(container) =>
{
wv_edited_I = join_wv_from_container(&mut worldview, &container).await;
},
Err(_) => {},
}
match mpsc_rxs.container.try_recv()
{
Ok(container) =>
{
wv_edited_I = join_wv_from_container(&mut worldview, &container).await;
},
Err(_) => {},
}
match mpsc_rxs.remove_container.try_recv()
{
Ok(id) =>
{
print::master(format!("Removing ID: {}", id));
wv_edited_I = remove_container(&mut worldview, id);
},
Err(_) => {},
}
match mpsc_rxs.delegated_tasks.try_recv()
{
Ok(map) =>
{
wv_edited_I = distribute_tasks(&mut worldview, map);
},
Err(_) => {},
}
match mpsc_rxs.elevator_states.try_recv()
{
Ok(container) =>
{
wv_edited_I = update_elev_states(&mut worldview, container);
master_container_updated_I = world_view::is_master(&worldview);
},
Err(_) => {},
}
match mpsc_rxs.new_wv_after_offline.try_recv()
{
Ok(mut read_wv) =>
{
merge_wv_after_offline(&mut worldview, &mut read_wv);
let _ = worldview_watch_tx.send(worldview.clone());
},
Err(_) => {},
}
if master_container_updated_I
{
if let Some(container) = world_view::extract_self_elevator_container(&worldview)
{
let _ = master_container_tx.send(container.clone()).await;
} else
{
print::warn(format!("Failed to extract self elevator container – skipping update"));
}
master_container_updated_I = false;
}
if wv_edited_I
{
let _ = worldview_watch_tx.send(worldview.clone());
wv_edited_I = false;
}
}
}
/// Sending halves of the update channels.
///
/// Each field is the counterpart of the same-named receiver in `MpscRxs`; the pairs are
/// created together in `Mpscs::new`. The struct is `Clone`, so every producer can hold
/// its own copy of the senders.
#[derive(Clone)]
pub struct MpscTxs {
    /// Worldviews that `update_wv_watch` merges with `join_wv_from_udp`.
    pub udp_wv: mpsc::Sender<WorldView>,
    /// Signal that makes `update_wv_watch` call `abort_network`; the `bool` payload is
    /// not inspected there.
    pub connection_to_master_failed: mpsc::Sender<bool>,
    /// Elevator containers that `update_wv_watch` merges with `join_wv_from_container`.
    pub container: mpsc::Sender<ElevatorContainer>,
    /// IDs that `update_wv_watch` passes to `remove_container`.
    pub remove_container: mpsc::Sender<u8>,
    /// Containers that `update_wv_watch` passes to `clear_from_sent_data`.
    pub sent_container: mpsc::Sender<ElevatorContainer>,
    /// Task maps that `update_wv_watch` passes to `distribute_tasks`.
    pub delegated_tasks: mpsc::Sender<HashMap<u8, Vec<[bool; 2]>>>,
    /// Containers that `update_wv_watch` passes to `update_elev_states`.
    pub elevator_states: mpsc::Sender<ElevatorContainer>,
    /// Worldviews that `update_wv_watch` passes to `merge_wv_after_offline`.
    pub new_wv_after_offline: mpsc::Sender<WorldView>,
    /// Byte-buffer channel that `update_wv_watch` does not read; presumably a spare.
    pub mpsc_buffer_ch6: mpsc::Sender<Vec<u8>>,
    /// Byte-buffer channel that `update_wv_watch` does not read; presumably a spare.
    pub mpsc_buffer_ch7: mpsc::Sender<Vec<u8>>,
    /// Byte-buffer channel that `update_wv_watch` does not read; presumably a spare.
    pub mpsc_buffer_ch8: mpsc::Sender<Vec<u8>>,
    /// Byte-buffer channel that `update_wv_watch` does not read; presumably a spare.
    pub mpsc_buffer_ch9: mpsc::Sender<Vec<u8>>,
}
/// Receiving halves of the update channels.
///
/// Each field is the counterpart of the same-named sender in `MpscTxs`; the pairs are
/// created together in `Mpscs::new`. `update_wv_watch` takes this struct by value and
/// polls the receivers with `try_recv`.
pub struct MpscRxs {
    /// Worldviews merged with `join_wv_from_udp`.
    pub udp_wv: mpsc::Receiver<WorldView>,
    /// Any message here triggers `abort_network`; the `bool` payload is not inspected.
    pub connection_to_master_failed: mpsc::Receiver<bool>,
    /// Elevator containers merged with `join_wv_from_container`.
    pub container: mpsc::Receiver<ElevatorContainer>,
    /// IDs passed to `remove_container`.
    pub remove_container: mpsc::Receiver<u8>,
    /// Containers passed to `clear_from_sent_data`.
    pub sent_container: mpsc::Receiver<ElevatorContainer>,
    /// Task maps passed to `distribute_tasks`.
    pub delegated_tasks: mpsc::Receiver<HashMap<u8, Vec<[bool; 2]>>>,
    /// Containers passed to `update_elev_states`.
    pub elevator_states: mpsc::Receiver<ElevatorContainer>,
    /// Worldviews passed to `merge_wv_after_offline`.
    pub new_wv_after_offline: mpsc::Receiver<WorldView>,
    /// Byte-buffer channel that `update_wv_watch` does not poll; presumably a spare.
    pub mpsc_buffer_ch6: mpsc::Receiver<Vec<u8>>,
    /// Byte-buffer channel that `update_wv_watch` does not poll; presumably a spare.
    pub mpsc_buffer_ch7: mpsc::Receiver<Vec<u8>>,
    /// Byte-buffer channel that `update_wv_watch` does not poll; presumably a spare.
    pub mpsc_buffer_ch8: mpsc::Receiver<Vec<u8>>,
    /// Byte-buffer channel that `update_wv_watch` does not poll; presumably a spare.
    pub mpsc_buffer_ch9: mpsc::Receiver<Vec<u8>>,
}
/// Both halves of every update channel, as produced by `Mpscs::new`.
pub struct Mpscs {
    /// Sending halves, handed out to the producers.
    pub txs: MpscTxs,
    /// Receiving halves, one per sender in `txs`.
    pub rxs: MpscRxs,
}

impl Mpscs {
    /// Creates all twelve channels, each with a buffer of 300 messages, and sorts their
    /// ends into `txs` and `rxs` so that same-named fields are the two ends of one channel.
    pub fn new() -> Self {
        // Buffer size shared by every channel created here.
        const CAPACITY: usize = 300;

        let (udp_wv_tx, udp_wv_rx) = mpsc::channel(CAPACITY);
        let (master_failed_tx, master_failed_rx) = mpsc::channel(CAPACITY);
        let (container_tx, container_rx) = mpsc::channel(CAPACITY);
        let (remove_tx, remove_rx) = mpsc::channel(CAPACITY);
        let (sent_tx, sent_rx) = mpsc::channel(CAPACITY);
        let (delegated_tx, delegated_rx) = mpsc::channel(CAPACITY);
        let (states_tx, states_rx) = mpsc::channel(CAPACITY);
        let (offline_wv_tx, offline_wv_rx) = mpsc::channel(CAPACITY);
        let (ch6_tx, ch6_rx) = mpsc::channel(CAPACITY);
        let (ch7_tx, ch7_rx) = mpsc::channel(CAPACITY);
        let (ch8_tx, ch8_rx) = mpsc::channel(CAPACITY);
        let (ch9_tx, ch9_rx) = mpsc::channel(CAPACITY);

        let txs = MpscTxs {
            udp_wv: udp_wv_tx,
            connection_to_master_failed: master_failed_tx,
            container: container_tx,
            remove_container: remove_tx,
            sent_container: sent_tx,
            delegated_tasks: delegated_tx,
            elevator_states: states_tx,
            new_wv_after_offline: offline_wv_tx,
            mpsc_buffer_ch6: ch6_tx,
            mpsc_buffer_ch7: ch7_tx,
            mpsc_buffer_ch8: ch8_tx,
            mpsc_buffer_ch9: ch9_tx,
        };

        let rxs = MpscRxs {
            udp_wv: udp_wv_rx,
            connection_to_master_failed: master_failed_rx,
            container: container_rx,
            remove_container: remove_rx,
            sent_container: sent_rx,
            delegated_tasks: delegated_rx,
            elevator_states: states_rx,
            new_wv_after_offline: offline_wv_rx,
            mpsc_buffer_ch6: ch6_rx,
            mpsc_buffer_ch7: ch7_rx,
            mpsc_buffer_ch8: ch8_rx,
            mpsc_buffer_ch9: ch9_rx,
        };

        Self { txs, rxs }
    }
}