use super::{Packet, PeerState, RequestSenderForBundle, ResponseSenderForBundle, ServiceState};
use crate::cla::ConvergenceLayerAgent;
use crate::routing::erouting::Error;
use crate::{
cla_names, lazy_static, service_add, BundlePack, ClaSenderTask, RoutingNotifcation, CLAS,
DTNCORE, PEERS,
};
use axum::extract::ws::{Message, WebSocket};
use futures_util::{future, SinkExt, StreamExt, TryStreamExt};
use log::{error, info, trace};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio::time::timeout;
/// Upper bound, in milliseconds, that `sender_for_bundle` waits for the external
/// routing agent to answer a request before giving up.
const EROUTING_RESPONSE_TIMEOUT_MS: u64 = 250;
/// Handle to the currently registered external routing agent.
struct Connection {
/// Sending half of the channel whose messages are forwarded to the agent's websocket.
tx: Sender<Message>,
}
/// Pending `sender_for_bundle` requests: maps the string form of a bundle pack to the
/// oneshot sender on which the matching response is delivered.
type ResponseMap = Arc<Mutex<HashMap<String, oneshot::Sender<Packet>>>>;
lazy_static! {
// `None` while no external routing agent is connected; at most one connection is stored.
static ref CONNECTION: Arc<Mutex<Option<Connection>>> = Arc::new(Mutex::new(None));
// Response channels of requests that are still waiting for an answer.
static ref RESPONSES: ResponseMap = ResponseMap::new(Mutex::new(HashMap::new()));
}
fn send_peer_state() {
let peer_state: Packet = Packet::PeerState(PeerState {
peers: PEERS.lock().clone(),
});
send_packet(&peer_state);
}
fn send_service_state() {
let service_state: Packet = Packet::ServiceState(ServiceState {
service_list: DTNCORE.lock().service_list.clone(),
});
send_packet(&service_state);
}
pub async fn handle_connection(ws: WebSocket) {
let (tx, mut rx) = mpsc::channel(100);
let (mut outgoing, incoming) = ws.split();
if CONNECTION.lock().unwrap().is_some() {
info!("Websocket connection closed because external routing agent is already connected");
if let Ok(data) = serde_json::to_string(&Packet::Error(Error {
reason: String::from("external routing agent already registered"),
})) {
if let Err(err) = outgoing.send(Message::Text(data)).await {
error!("Error while sending closing reason: {}", err);
}
}
if let Err(err) = outgoing.close().await {
error!("Error while closing websocket: {}", err);
}
return;
}
*CONNECTION.lock().unwrap() = Some(Connection { tx });
send_peer_state();
send_service_state();
let broadcast_incoming = incoming.try_for_each(|msg| {
trace!(
"Received a external routing message: {}",
msg.to_text().unwrap().trim()
);
let packet: serde_json::Result<Packet> = serde_json::from_str(msg.to_text().unwrap());
match packet {
Ok(packet) => match packet {
Packet::ResponseSenderForBundle(packet) => {
trace!(
"sender_for_bundle response: {}",
msg.to_text().unwrap().trim()
);
if let Some(tx) = RESPONSES
.lock()
.unwrap()
.remove(packet.bp.to_string().as_str())
{
if tx.send(Packet::ResponseSenderForBundle(packet)).is_err() {
error!("sender_for_bundle response could not be passed to channel")
}
} else {
info!("sender_for_bundle no response channel available")
}
}
Packet::ServiceAdd(packet) => {
info!(
"adding service via erouting {}:{}",
packet.tag, packet.service
);
service_add(packet.tag, packet.service);
}
_ => {}
},
Err(err) => {
info!("err decoding external routing packet: {}", err);
}
}
future::ok(())
});
let receive_from_others = tokio::spawn(async move {
while let Some(cmd) = rx.recv().await {
if let Err(err) = outgoing.send(cmd).await {
error!("err while sending to outgoing channel: {}", err);
}
}
});
future::select(broadcast_incoming, receive_from_others).await;
info!("External routing disconnected");
disconnect();
}
/// Clears the registered external routing connection, dropping its stored sender.
fn disconnect() {
    let mut slot = CONNECTION.lock().unwrap();
    *slot = None;
}
/// Serializes `p` to JSON and queues it for the connected external routing agent.
///
/// Nothing happens when serialization fails or when no agent is connected. A failure to
/// queue the message is logged and otherwise ignored.
fn send_packet(p: &Packet) {
    let payload = match serde_json::to_string(p) {
        Ok(json) => json,
        Err(_) => return,
    };

    let connection = CONNECTION.lock().unwrap();
    if let Some(con) = connection.as_ref() {
        // `try_send` never waits; it returns an error instead of blocking.
        if let Err(err) = con.tx.try_send(Message::Text(payload)) {
            error!("couldn't send packet {}", err)
        }
    }
}
/// Forwards a routing notification to the external routing agent.
///
/// The notification is converted into a `Packet` and handed to `send_packet`.
pub fn notify(notification: RoutingNotifcation) {
    send_packet(&notification.into());
}
/// Drops the pending response channel registered under `id`, if there is one.
fn remove_response_channel(id: &str) {
    let mut pending = RESPONSES.lock().unwrap();
    pending.remove(id);
}
/// Registers `tx` as the response channel for `id`, replacing any channel already stored
/// under the same key.
fn create_response_channel(id: &str, tx: oneshot::Sender<Packet>) {
    let mut pending = RESPONSES.lock().unwrap();
    pending.insert(id.to_owned(), tx);
}
/// Turns the agent's answer into concrete sender tasks.
///
/// For each entry in `packet.clas`, the first CLA in `CLAS` whose name equals the
/// entry's `agent` produces a `ClaSenderTask`; entries without a match are skipped. The
/// destination is `remote:port`, using the CLA's own port when the entry gives none.
///
/// Returns the collected tasks together with the packet's `delete_afterwards` flag.
fn unpack_sender_for_bundle(packet: ResponseSenderForBundle) -> (Vec<ClaSenderTask>, bool) {
    let mut tasks = Vec::new();

    for sender in packet.clas.iter() {
        // The lock is taken per entry and released at the end of each iteration.
        let registry = CLAS.lock();
        let matched = (&*registry)
            .into_iter()
            .find(|cla| sender.agent == cla.name());

        if let Some(cla) = matched {
            let port = sender.port.unwrap_or_else(|| cla.port());
            tasks.push(ClaSenderTask {
                tx: cla.channel(),
                dest: format!("{}:{}", sender.remote, port),
                cla_name: cla.name().into(),
                next_hop: sender.next_hop.clone(),
            });
        }
    }

    (tasks, packet.delete_afterwards)
}
pub async fn sender_for_bundle(bp: &BundlePack) -> (Vec<ClaSenderTask>, bool) {
trace!("external sender_for_bundle initiated: {}", bp);
if CONNECTION.lock().unwrap().is_none() {
return (vec![], false);
}
let (tx, rx) = oneshot::channel();
create_response_channel(bp.to_string().as_str(), tx);
let packet: Packet = Packet::RequestSenderForBundle(RequestSenderForBundle {
clas: cla_names(),
bp: bp.clone(),
});
send_packet(&packet);
let res = timeout(
time::Duration::from_millis(EROUTING_RESPONSE_TIMEOUT_MS),
rx,
)
.await;
if let Ok(Ok(Packet::ResponseSenderForBundle(packet))) = res {
remove_response_channel(bp.to_string().as_str());
if packet.bp.to_string() != bp.to_string() {
error!("got a wrong bundle pack! {} != {}", bp, packet.bp);
return (vec![], false);
}
return unpack_sender_for_bundle(packet);
}
send_packet(&Packet::Timeout(super::Timeout { bp: bp.clone() }));
info!("timeout while waiting for sender_for_bundle");
remove_response_channel(bp.to_string().as_str());
(vec![], false)
}