1use crate::{ChannelCommand, Frame};
2
3use log::{debug, error, info, warn};
4use std::{io::ErrorKind, sync::Arc, time::Duration};
5use tokio::{
6 io::{self, AsyncReadExt, AsyncWriteExt},
7 net::TcpStream,
8 sync::{
9 mpsc::{error::TryRecvError, Receiver, Sender},
10 Mutex,
11 },
12 time::timeout,
13};
14
15pub async fn create_connector(
18 uri: String,
19 mut identity: String,
20 rx_ctrl: Arc<Mutex<Receiver<ChannelCommand>>>,
21 tx_msg: Sender<ChannelCommand>,
22 flag_int: Arc<Mutex<bool>>,
23) -> io::Result<()> {
24 let mut stream = TcpStream::connect(uri.clone()).await?;
25
26 let initial_command = ChannelCommand::Identify(identity.clone());
27
28 let mut command = Some(initial_command);
29 let mut buffer = vec![0; 1024];
30 let mut read_timeout_ms = 16f32;
31 let mut parsing_buffer: Vec<u8> = vec![];
32 loop {
33 {
34 if *flag_int.lock().await == true {
35 break Ok(());
36 }
37 }
38 if !command.is_none() {
39 let command_clone = command.clone().unwrap();
40 let frame: Frame = Into::<Frame>::into(command_clone);
41 let frame_bytes: Vec<u8> = frame.clone().into();
42 match stream.write(&frame_bytes).await {
43 Ok(_) => {
44 debug!(target: "atc-connector", "Message sent to remote endpoint: {}", frame);
45 command = None;
46 }
47 Err(e) => {
48 if e.kind() == ErrorKind::ConnectionReset
49 || e.kind() == ErrorKind::ConnectionAborted
50 {
51 error!(target: "atc-connector", "Connection to remote endpoint broken, message cannot be sent: {:?}", e);
52 break Ok(());
53 } else {
54 warn!(target: "atc-connector", "Error writing message to TCP socket: {:?}", e);
55 }
56 }
57 };
58 }
59
60 {
61 match rx_ctrl.lock().await.try_recv() {
62 Ok(cmd) => {
63 command = match cmd.clone() {
64 ChannelCommand::Terminate(_) => {
65 info!(target: "atc-connector", "User requested job termination, current job will be discarded.");
66 Some(cmd)
67 }
68 ChannelCommand::Identify(id) => {
69 info!(target: "atc-connector", "User re-identification: {}.", id);
70 identity = id;
71 Some(cmd)
72 }
73 ChannelCommand::Ping => Some(cmd),
74 _ => {
75 warn!(target: "atc-connector", "User requested to send command other than `[ChannelMessage]`, `[Identify]` and `[Terminate]`.");
76 panic!(
77 "You should ONLY send `[ChannelMessage]`, `[Identify]` or `[Terminate]` command."
78 );
79 }
80 };
81 }
82 Err(e) => {
83 if e == TryRecvError::Disconnected {
84 error!(target: "atc-connector", "Command channel receiver disconnected: {:?}", e);
85 break Ok(());
86 }
87 }
88 }
89 }
90
91 if let Ok(res) = timeout(
92 Duration::from_millis(read_timeout_ms.floor() as u64),
93 stream.read(&mut buffer),
94 )
95 .await
96 {
97 match res {
98 Ok(n) => {
99 if n > 0 {
100 read_timeout_ms = 16f32;
102
103 let (frames, remain) =
104 Frame::parse_sequence(&buffer[0..n], Some(parsing_buffer));
105 parsing_buffer = remain;
106 buffer = vec![0; 1024];
107
108 for frame in frames {
111 let command: ChannelCommand = frame.into();
112 if let ChannelCommand::ChannelMessage((id, _)) = command.clone() {
113 if id == identity {
114 tx_msg.send(command).await.unwrap();
115 }
116 }
117 }
118 }
119 }
120 Err(e) => {
121 if e.kind() == ErrorKind::ConnectionReset
122 || e.kind() == ErrorKind::ConnectionAborted
123 {
124 error!(target: "atc-connector", "Error reading from TCP socket, possible because of server unavailable or broken: {:?}", e);
125 break Ok(());
126 }
127 warn!(target: "atc-connector", "Error reading message from TCP socket: {:?}", e);
128 }
129 }
130 } else {
131 read_timeout_ms *= 1.25;
133 read_timeout_ms = read_timeout_ms.min(4096.0);
134 }
135 }
136}