atc/libs/
listener.rs

1use crate::{ChannelCommand, Frame};
2
3use log::{debug, error, info, warn};
4use std::{io::ErrorKind, sync::Arc, time::Duration};
5use tokio::{
6    io::{self, AsyncReadExt, AsyncWriteExt},
7    net::TcpListener,
8    sync::{
9        mpsc::{channel, Sender},
10        Mutex,
11    },
12    time::{timeout, Instant},
13};
14
15/// Create a TCP listener and a upstream channel sender receiver
16/// to allow stream/socket identification and registration.
17pub async fn create_listener(
18    uri: String,
19    tx_up: Sender<(String, Option<Sender<ChannelCommand>>)>,
20    flag_int: Arc<Mutex<bool>>,
21) -> io::Result<()> {
22    let listener = TcpListener::bind(uri.clone()).await?;
23
24    // Listener loop. For ever incoming socket/stream, create a new async task
25    // to handle it.
26    loop {
27        {
28            if *flag_int.lock().await == true {
29                return Ok(());
30            }
31        }
32
33        if let Ok(listen_result) = timeout(Duration::from_secs(5), listener.accept()).await {
34            if let Ok((mut stream, _)) = listen_result {
35                let tx_up = tx_up.clone();
36
37                // Time elapsed since last PING or channel message.
38                let mut last_active = Instant::now();
39
40                let flag_int_clone = flag_int.clone();
41                tokio::spawn(async move {
42                    info!(target: "atc-listener", "New incomming connection");
43                    let (tx_down, mut rx_down) = channel::<ChannelCommand>(1024);
44
45                    let mut buffer = vec![0u8; 65535];
46                    
47                    let mut parsing_buffer: Vec<u8> = vec![];
48                    let mut write_command_buffer: Option<ChannelCommand> = None;
49
50                    let mut job_id = String::new();
51                    let mut read_timeout_ms = 16f32;
52
53                    loop {
54                        // Check for time elapsed.
55                        if last_active.elapsed().as_secs() > 60 {
56                            warn!(target: "atc-listener", "Connection will be reset due to 60 seconds of inactivity.");
57                            return;
58                        }
59
60                        {
61                            if *flag_int_clone.lock().await == true {
62                                return;
63                            }
64                        }
65                        match rx_down.try_recv() {
66                            Ok(command) => {
67                                write_command_buffer = Some(command);
68                            }
69                            Err(_) => (),
70                        }
71
72                        if let Some(command) = write_command_buffer.clone() {
73                            let frame: Frame = command.into();
74                            let frame_bytes: Vec<u8> = frame.into();
75                            match stream.write_all(&frame_bytes).await {
76                                Ok(_) => {
77                                    write_command_buffer = None;
78                                }
79                                Err(e) => {
80                                    if e.kind() == ErrorKind::ConnectionReset {
81                                        warn!(target: "atc-listener", "Connection closed by peer.");
82                                        break;
83                                    }
84                                    warn!(target:"atc-listener", "Unable to write to stream: {:?}", e);
85                                }
86                            };
87                        }
88
89                        if let Ok(res) = timeout(
90                            Duration::from_millis(read_timeout_ms.floor() as u64),
91                            stream.read(&mut buffer),
92                        )
93                        .await
94                        {
95                            {
96                                if *flag_int_clone.lock().await == true {
97                                    return;
98                                }
99                            }
100
101                            match res {
102                                Ok(n) => {
103                                    if n == 0 {
104                                        if let Err(e) = stream.shutdown().await {
105                                            // Notify manager a connection is down and should be removed from btree-map.
106                                            tx_up.send((job_id.clone(), None)).await.unwrap();
107                                            warn!(target: "atc-listener", "Unable to shutdown TCP connection from server side: {:?}", e);
108                                        };
109                                        break (());
110                                    }
111
112                                    // Update elapsed time.
113                                    last_active = Instant::now();
114
115                                    // Reset read timeout.
116                                    read_timeout_ms = 16f32;
117
118                                    let (frames, remain) =
119                                        Frame::parse_sequence(&buffer[0..n], Some(parsing_buffer));
120                                    parsing_buffer = remain;
121                                    // let content = String::from_utf8(buffer[0..n].to_vec()).unwrap();
122
123                                    // debug!(target: "atc-listener", "RAW MESSAGE: `{}`", content);
124
125                                    buffer = vec![0u8; 65535];
126
127                                    for frame in frames {
128                                        let command = ChannelCommand::from(frame);
129
130                                        match command {
131                                            ChannelCommand::Identify(id) => {
132                                                job_id = id;
133                                                info!(target: "atc-listener", "Received identification: {}", job_id);
134                                                match tx_up
135                                                    .send((job_id.clone(), Some(tx_down.clone())))
136                                                    .await
137                                                {
138                                                    Ok(_) => (),
139                                                    Err(_) => {
140                                                        error!(target: "atc-listener", "Unable to write to server control channel sender.")
141                                                    }
142                                                }
143                                            }
144                                            ChannelCommand::Terminate(job_id) => {
145                                                tx_up.send((job_id.clone(), None)).await.unwrap();
146                                            }
147                                            ChannelCommand::Ping => {
148                                                debug!(target: "atc-listener", "Ping received from `{}`", job_id);
149                                                write_command_buffer = Some(ChannelCommand::Pong);
150                                            }
151                                            _ => (),
152                                        }
153                                    }
154                                }
155
156                                Err(ref e) if e.kind() == ErrorKind::ConnectionReset => {
157                                    debug!(target: "atc-listener", "Connection broken [job-id={}]", job_id);
158                                    if let Err(e) = tx_up.send((job_id.clone(), None)).await {
159                                        error!(target: "atc-listener", "Unable to send control message to channel [connection broken]: {:?}", e);
160                                    };
161                                    break;
162                                }
163                                Err(e) => {
164                                    error!(target: "atc-listener", "Error writing bytes to channel: {:?}", e);
165                                    break;
166                                }
167                            }
168                        } else {
169                            // Should increase
170                            read_timeout_ms *= 1.25;
171                            read_timeout_ms = read_timeout_ms.min(4096.0);
172                        }
173                    }
174
175                    info!(target: "atc-listener", "Socket/stream handler for job-id=`{}` closed.", job_id);
176                });
177            }
178        } else {
179            debug!(target: "atc-listener", "No new connections in last 5 seconds");
180        }
181    }
182}