use crate::network::ConnectionStatus;
use crate::world_view::{self, WorldView, serialize};
use crate::config;
use crate::init;
use crate::network;
use crate::print;
use serde::{Serialize, Deserialize};
use socket2::{Socket, Domain, Type, Protocol};
use std::env;
use std::process::Command;
use std::net::ToSocketAddrs;
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::watch;
use tokio::time::{sleep, timeout};
#[derive(Serialize, Deserialize, Clone, Debug)]
struct BackupPayload
{
pub worldview: world_view::WorldView,
pub network_status: ConnectionStatus,
}
static BACKUP_STARTED: AtomicBool = AtomicBool::new(false);
pub async fn start_backup_server(
wv_watch_rx: watch::Receiver<WorldView>,
network_watch_rx: watch::Receiver<network::ConnectionStatus>,
)
{
print::info(format!("Backup-server starting..."));
let listener = create_reusable_listener(config::BCU_PORT);
let wv = world_view::get_wv(wv_watch_rx.clone());
let initial_payload = BackupPayload
{
worldview: wv.clone(),
network_status: ConnectionStatus::new(),
};
let (tx, rx) = watch::channel(initial_payload);
start_backup_terminal();
tokio::spawn(async move {
loop
{
let (socket, _) = listener
.accept()
.await
.expect("Failed to accept backup-connection");
handle_backup_client(socket, rx.clone()).await;
}
});
let tx_clone = tx.clone();
let wv_rx_clone = wv_watch_rx.clone();
tokio::spawn(async move {
loop
{
let new_wv = world_view::get_wv(wv_rx_clone.clone());
let status = network_watch_rx.borrow().clone();
let payload = BackupPayload
{
worldview: new_wv,
network_status: status,
};
if tx_clone.send(payload).is_err()
{
print::err(format!("Failed to sen payload to backup"));
}
sleep(config::BACKUP_WORLDVIEW_REFRESH_INTERVAL).await;
}
});
}
pub async fn run_as_backup() -> Option<world_view::ElevatorContainer>
{
println!("Starting backup-client...");
let mut current_wv = init::initialize_worldview(None).await;
let mut retries = 0;
loop
{
match timeout(
config::MASTER_TIMEOUT,
TcpStream::connect(format!("localhost:{}", config::BCU_PORT))
).await
{
Ok(Ok(mut stream)) =>
{
retries = 0;
let mut buf = vec![0u8; 1024];
loop
{
match stream.read(&mut buf).await
{
Ok(0) =>
{
print::err(format!("Master connection has ended."));
break;
},
Ok(n) =>
{
let raw = &buf[..n];
let payload: Option<BackupPayload> = bincode::deserialize(raw).ok();
if let Some(payload) = payload
{
current_wv = payload.worldview;
let status = payload.network_status;
print!("\x1B[2J\x1B[H");
io::stdout().flush().unwrap();
print::worldview(¤t_wv, Some(status));
} else
{
print::warn(format!("Klarte ikkje deserialisere payload."));
continue;
}
},
Err(e) =>
{
print::err(format!("Error while reading from master: {}", e));
break;
}
}
}
},
_ =>
{
retries += 1;
print::err(format!("Failed to connect to master, retry {}.", retries));
if retries > config::BACKUP_FAILOVER_THRESHOLD
{
print::err(format!("Master failed, promoting backup to master!"));
match world_view::extract_self_elevator_container(¤t_wv).to_owned()
{
Some(container) => return Some(container.to_owned()),
None =>
{
print::warn(format!("Failed to extract self elevator container"));
return None;
}
}
}
}
}
sleep(config::BACKUP_RETRY_DELAY).await;
}
}
fn create_reusable_listener(
port: u16
) -> TcpListener {
let addr_str = format!("localhost:{}", port);
let addr_iter = addr_str
.to_socket_addrs()
.expect("Klarte ikkje resolve 'localhost'");
let addr = addr_iter
.filter(|a| a.is_ipv4())
.next()
.expect("Fann ingen IPv4-adresse for localhost");
let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))
.expect("Couldnt create socket");
socket.set_nonblocking(true)
.expect("Couldnt set non blocking");
socket.set_reuse_address(true)
.expect("Couldnt set reuse_address");
socket.bind(&addr.into())
.expect("Couldnt bind the socket");
socket.listen(128)
.expect("Couldnt listen on the socket");
TcpListener::from_std(socket.into())
.expect("Couldnt create TcpListener")
}
fn start_backup_terminal() {
if !BACKUP_STARTED.load(Ordering::SeqCst) {
let current_exe = env::current_exe().expect("Couldnt extract the executable");
let _child = Command::new("gnome-terminal")
.arg("--geometry=400x24")
.arg("--")
.arg(current_exe.to_str().unwrap())
.arg("backup")
.spawn()
.expect("Feil ved å starte backupterminalen");
BACKUP_STARTED.store(true, Ordering::SeqCst);
}
}
async fn handle_backup_client(
mut stream: TcpStream,
rx: watch::Receiver<BackupPayload>
) {
loop {
let payload = rx.borrow().clone();
let serialized = serialize(&payload);
if let Err(e) = stream.write_all(&serialized).await {
print::err(format!("Backup send error: {}", e));
print::warn(format!("Prøver igjen om {:?}", config::BACKUP_TIMEOUT));
sleep(config::BACKUP_TIMEOUT).await;
BACKUP_STARTED.store(false, Ordering::SeqCst);
start_backup_terminal();
break;
}
sleep(config::BACKUP_SEND_INTERVAL).await;
}
}