1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
use crate::{ChannelCommand, Frame};
use log::{debug, error, info, warn};
use std::{io::ErrorKind, sync::Arc, time::Duration};
use tokio::{
io::{self, AsyncReadExt, AsyncWriteExt},
net::TcpListener,
sync::{
mpsc::{channel, Sender},
Mutex,
},
time::{timeout, Instant},
};
/// Create a TCP listener and a upstream channel sender receiver
/// to allow stream/socket identification and registration.
pub async fn create_listener(
uri: String,
tx_up: Sender<(String, Option<Sender<ChannelCommand>>)>,
flag_int: Arc<Mutex<bool>>,
) -> io::Result<()> {
let listener = TcpListener::bind(uri.clone()).await?;
// Listener loop. For ever incoming socket/stream, create a new async task
// to handle it.
loop {
{
if *flag_int.lock().await == true {
return Ok(());
}
}
if let Ok(listen_result) = timeout(Duration::from_secs(5), listener.accept()).await {
if let Ok((mut stream, _)) = listen_result {
let tx_up = tx_up.clone();
// Time elapsed since last PING or channel message.
let mut last_active = Instant::now();
let flag_int_clone = flag_int.clone();
tokio::spawn(async move {
info!(target: "atc-listener", "New incomming connection");
let (tx_down, mut rx_down) = channel::<ChannelCommand>(1024);
let mut buffer = vec![0u8; 65535];
let mut parsing_buffer: Vec<u8> = vec![];
let mut write_command_buffer: Option<ChannelCommand> = None;
let mut job_id = String::new();
let mut read_timeout_ms = 16f32;
loop {
// Check for time elapsed.
if last_active.elapsed().as_secs() > 60 {
warn!(target: "atc-listener", "Connection will be reset due to 60 seconds of inactivity.");
return;
}
{
if *flag_int_clone.lock().await == true {
return;
}
}
match rx_down.try_recv() {
Ok(command) => {
write_command_buffer = Some(command);
}
Err(_) => (),
}
if let Some(command) = write_command_buffer.clone() {
let frame: Frame = command.into();
let frame_bytes: Vec<u8> = frame.into();
match stream.write_all(&frame_bytes).await {
Ok(_) => {
write_command_buffer = None;
}
Err(e) => {
if e.kind() == ErrorKind::ConnectionReset {
warn!(target: "atc-listener", "Connection closed by peer.");
break;
}
warn!(target:"atc-listener", "Unable to write to stream: {:?}", e);
}
};
}
if let Ok(res) = timeout(
Duration::from_millis(read_timeout_ms.floor() as u64),
stream.read(&mut buffer),
)
.await
{
{
if *flag_int_clone.lock().await == true {
return;
}
}
match res {
Ok(n) => {
if n == 0 {
if let Err(e) = stream.shutdown().await {
// Notify manager a connection is down and should be removed from btree-map.
tx_up.send((job_id.clone(), None)).await.unwrap();
warn!(target: "atc-listener", "Unable to shutdown TCP connection from server side: {:?}", e);
};
break (());
}
// Update elapsed time.
last_active = Instant::now();
// Reset read timeout.
read_timeout_ms = 16f32;
let (frames, remain) =
Frame::parse_sequence(&buffer[0..n], Some(parsing_buffer));
parsing_buffer = remain;
// let content = String::from_utf8(buffer[0..n].to_vec()).unwrap();
// debug!(target: "atc-listener", "RAW MESSAGE: `{}`", content);
buffer = vec![0u8; 65535];
for frame in frames {
let command = ChannelCommand::from(frame);
match command {
ChannelCommand::Identify(id) => {
job_id = id;
info!(target: "atc-listener", "Received identification: {}", job_id);
match tx_up
.send((job_id.clone(), Some(tx_down.clone())))
.await
{
Ok(_) => (),
Err(_) => {
error!(target: "atc-listener", "Unable to write to server control channel sender.")
}
}
}
ChannelCommand::Terminate(job_id) => {
tx_up.send((job_id.clone(), None)).await.unwrap();
}
ChannelCommand::Ping => {
debug!(target: "atc-listener", "Ping received from `{}`", job_id);
write_command_buffer = Some(ChannelCommand::Pong);
}
_ => (),
}
}
}
Err(ref e) if e.kind() == ErrorKind::ConnectionReset => {
debug!(target: "atc-listener", "Connection broken [job-id={}]", job_id);
if let Err(e) = tx_up.send((job_id.clone(), None)).await {
error!(target: "atc-listener", "Unable to send control message to channel [connection broken]: {:?}", e);
};
break;
}
Err(e) => {
error!(target: "atc-listener", "Error writing bytes to channel: {:?}", e);
break;
}
}
} else {
// Should increase
read_timeout_ms *= 1.25;
read_timeout_ms = read_timeout_ms.min(4096.0);
}
}
info!(target: "atc-listener", "Socket/stream handler for job-id=`{}` closed.", job_id);
});
}
} else {
debug!(target: "atc-listener", "No new connections in last 5 seconds");
}
}
}