atc/libs/listener.rs
1use crate::{ChannelCommand, Frame};
2
3use log::{debug, error, info, warn};
4use std::{io::ErrorKind, sync::Arc, time::Duration};
5use tokio::{
6 io::{self, AsyncReadExt, AsyncWriteExt},
7 net::TcpListener,
8 sync::{
9 mpsc::{channel, Sender},
10 Mutex,
11 },
12 time::{timeout, Instant},
13};
14
15/// Create a TCP listener and a upstream channel sender receiver
16/// to allow stream/socket identification and registration.
17pub async fn create_listener(
18 uri: String,
19 tx_up: Sender<(String, Option<Sender<ChannelCommand>>)>,
20 flag_int: Arc<Mutex<bool>>,
21) -> io::Result<()> {
22 let listener = TcpListener::bind(uri.clone()).await?;
23
24 // Listener loop. For ever incoming socket/stream, create a new async task
25 // to handle it.
26 loop {
27 {
28 if *flag_int.lock().await == true {
29 return Ok(());
30 }
31 }
32
33 if let Ok(listen_result) = timeout(Duration::from_secs(5), listener.accept()).await {
34 if let Ok((mut stream, _)) = listen_result {
35 let tx_up = tx_up.clone();
36
37 // Time elapsed since last PING or channel message.
38 let mut last_active = Instant::now();
39
40 let flag_int_clone = flag_int.clone();
41 tokio::spawn(async move {
42 info!(target: "atc-listener", "New incomming connection");
43 let (tx_down, mut rx_down) = channel::<ChannelCommand>(1024);
44
45 let mut buffer = vec![0u8; 65535];
46
47 let mut parsing_buffer: Vec<u8> = vec![];
48 let mut write_command_buffer: Option<ChannelCommand> = None;
49
50 let mut job_id = String::new();
51 let mut read_timeout_ms = 16f32;
52
53 loop {
54 // Check for time elapsed.
55 if last_active.elapsed().as_secs() > 60 {
56 warn!(target: "atc-listener", "Connection will be reset due to 60 seconds of inactivity.");
57 return;
58 }
59
60 {
61 if *flag_int_clone.lock().await == true {
62 return;
63 }
64 }
65 match rx_down.try_recv() {
66 Ok(command) => {
67 write_command_buffer = Some(command);
68 }
69 Err(_) => (),
70 }
71
72 if let Some(command) = write_command_buffer.clone() {
73 let frame: Frame = command.into();
74 let frame_bytes: Vec<u8> = frame.into();
75 match stream.write_all(&frame_bytes).await {
76 Ok(_) => {
77 write_command_buffer = None;
78 }
79 Err(e) => {
80 if e.kind() == ErrorKind::ConnectionReset {
81 warn!(target: "atc-listener", "Connection closed by peer.");
82 break;
83 }
84 warn!(target:"atc-listener", "Unable to write to stream: {:?}", e);
85 }
86 };
87 }
88
89 if let Ok(res) = timeout(
90 Duration::from_millis(read_timeout_ms.floor() as u64),
91 stream.read(&mut buffer),
92 )
93 .await
94 {
95 {
96 if *flag_int_clone.lock().await == true {
97 return;
98 }
99 }
100
101 match res {
102 Ok(n) => {
103 if n == 0 {
104 if let Err(e) = stream.shutdown().await {
105 // Notify manager a connection is down and should be removed from btree-map.
106 tx_up.send((job_id.clone(), None)).await.unwrap();
107 warn!(target: "atc-listener", "Unable to shutdown TCP connection from server side: {:?}", e);
108 };
109 break (());
110 }
111
112 // Update elapsed time.
113 last_active = Instant::now();
114
115 // Reset read timeout.
116 read_timeout_ms = 16f32;
117
118 let (frames, remain) =
119 Frame::parse_sequence(&buffer[0..n], Some(parsing_buffer));
120 parsing_buffer = remain;
121 // let content = String::from_utf8(buffer[0..n].to_vec()).unwrap();
122
123 // debug!(target: "atc-listener", "RAW MESSAGE: `{}`", content);
124
125 buffer = vec![0u8; 65535];
126
127 for frame in frames {
128 let command = ChannelCommand::from(frame);
129
130 match command {
131 ChannelCommand::Identify(id) => {
132 job_id = id;
133 info!(target: "atc-listener", "Received identification: {}", job_id);
134 match tx_up
135 .send((job_id.clone(), Some(tx_down.clone())))
136 .await
137 {
138 Ok(_) => (),
139 Err(_) => {
140 error!(target: "atc-listener", "Unable to write to server control channel sender.")
141 }
142 }
143 }
144 ChannelCommand::Terminate(job_id) => {
145 tx_up.send((job_id.clone(), None)).await.unwrap();
146 }
147 ChannelCommand::Ping => {
148 debug!(target: "atc-listener", "Ping received from `{}`", job_id);
149 write_command_buffer = Some(ChannelCommand::Pong);
150 }
151 _ => (),
152 }
153 }
154 }
155
156 Err(ref e) if e.kind() == ErrorKind::ConnectionReset => {
157 debug!(target: "atc-listener", "Connection broken [job-id={}]", job_id);
158 if let Err(e) = tx_up.send((job_id.clone(), None)).await {
159 error!(target: "atc-listener", "Unable to send control message to channel [connection broken]: {:?}", e);
160 };
161 break;
162 }
163 Err(e) => {
164 error!(target: "atc-listener", "Error writing bytes to channel: {:?}", e);
165 break;
166 }
167 }
168 } else {
169 // Should increase
170 read_timeout_ms *= 1.25;
171 read_timeout_ms = read_timeout_ms.min(4096.0);
172 }
173 }
174
175 info!(target: "atc-listener", "Socket/stream handler for job-id=`{}` closed.", job_id);
176 });
177 }
178 } else {
179 debug!(target: "atc-listener", "No new connections in last 5 seconds");
180 }
181 }
182}