atc/libs/
connector.rs

1use crate::{ChannelCommand, Frame};
2
3use log::{debug, error, info, warn};
4use std::{io::ErrorKind, sync::Arc, time::Duration};
5use tokio::{
6    io::{self, AsyncReadExt, AsyncWriteExt},
7    net::TcpStream,
8    sync::{
9        mpsc::{error::TryRecvError, Receiver, Sender},
10        Mutex,
11    },
12    time::timeout,
13};
14
15/// Connect to remote endpoint with given command receiver as controller and
16/// a sender as remote message proxy.
17pub async fn create_connector(
18    uri: String,
19    mut identity: String,
20    rx_ctrl: Arc<Mutex<Receiver<ChannelCommand>>>,
21    tx_msg: Sender<ChannelCommand>,
22    flag_int: Arc<Mutex<bool>>,
23) -> io::Result<()> {
24    let mut stream = TcpStream::connect(uri.clone()).await?;
25
26    let initial_command = ChannelCommand::Identify(identity.clone());
27
28    let mut command = Some(initial_command);
29    let mut buffer = vec![0; 1024];
30    let mut read_timeout_ms = 16f32;
31    let mut parsing_buffer: Vec<u8> = vec![];
32    loop {
33        {
34            if *flag_int.lock().await == true {
35                break Ok(());
36            }
37        }
38        if !command.is_none() {
39            let command_clone = command.clone().unwrap();
40            let frame: Frame = Into::<Frame>::into(command_clone);
41            let frame_bytes: Vec<u8> = frame.clone().into();
42            match stream.write(&frame_bytes).await {
43                Ok(_) => {
44                    debug!(target: "atc-connector", "Message sent to remote endpoint: {}", frame);
45                    command = None;
46                }
47                Err(e) => {
48                    if e.kind() == ErrorKind::ConnectionReset
49                        || e.kind() == ErrorKind::ConnectionAborted
50                    {
51                        error!(target: "atc-connector", "Connection to remote endpoint broken, message cannot be sent: {:?}", e);
52                        break Ok(());
53                    } else {
54                        warn!(target: "atc-connector", "Error writing message to TCP socket: {:?}", e);
55                    }
56                }
57            };
58        }
59
60        {
61            match rx_ctrl.lock().await.try_recv() {
62                Ok(cmd) => {
63                    command = match cmd.clone() {
64                        ChannelCommand::Terminate(_) => {
65                            info!(target: "atc-connector", "User requested job termination, current job will be discarded.");
66                            Some(cmd)
67                        }
68                        ChannelCommand::Identify(id) => {
69                            info!(target: "atc-connector", "User re-identification: {}.", id);
70                            identity = id;
71                            Some(cmd)
72                        }
73                        ChannelCommand::Ping => Some(cmd),
74                        _ => {
75                            warn!(target: "atc-connector", "User requested to send command other than `[ChannelMessage]`, `[Identify]` and `[Terminate]`.");
76                            panic!(
77                                "You should ONLY send `[ChannelMessage]`, `[Identify]` or `[Terminate]` command."
78                            );
79                        }
80                    };
81                }
82                Err(e) => {
83                    if e == TryRecvError::Disconnected {
84                        error!(target: "atc-connector", "Command channel receiver disconnected: {:?}", e);
85                        break Ok(());
86                    }
87                }
88            }
89        }
90
91        if let Ok(res) = timeout(
92            Duration::from_millis(read_timeout_ms.floor() as u64),
93            stream.read(&mut buffer),
94        )
95        .await
96        {
97            match res {
98                Ok(n) => {
99                    if n > 0 {
100                        // Reset read timeout
101                        read_timeout_ms = 16f32;
102
103                        let (frames, remain) =
104                            Frame::parse_sequence(&buffer[0..n], Some(parsing_buffer));
105                        parsing_buffer = remain;
106                        buffer = vec![0; 1024];
107
108                        // let command =
109                        // AiTcpCommand::from(String::from_utf8(buffer[0..n].to_vec()).unwrap());
110                        for frame in frames {
111                            let command: ChannelCommand = frame.into();
112                            if let ChannelCommand::ChannelMessage((id, _)) = command.clone() {
113                                if id == identity {
114                                    tx_msg.send(command).await.unwrap();
115                                }
116                            }
117                        }
118                    }
119                }
120                Err(e) => {
121                    if e.kind() == ErrorKind::ConnectionReset
122                        || e.kind() == ErrorKind::ConnectionAborted
123                    {
124                        error!(target: "atc-connector", "Error reading from TCP socket, possible because of server unavailable or broken: {:?}", e);
125                        break Ok(());
126                    }
127                    warn!(target: "atc-connector", "Error reading message from TCP socket: {:?}", e);
128                }
129            }
130        } else {
131            // Should increase
132            read_timeout_ms *= 1.25;
133            read_timeout_ms = read_timeout_ms.min(4096.0);
134        }
135    }
136}