mosaik_api/
tcp.rs

1//! The async TCP-Manager for the communication between Mosaik and the simulators.
2//!
3//! It consists of 3 loops:
4//! 1. The `tcp_receiver` reads the requests from the TCP-Stream and sends them to the `broker_loop`.
5//! 2. The `broker_loop` receives the requests, parses them, calls the API and sends the response to the `tcp_sender`.
6//! 3. The `tcp_sender` receives the responses from the `broker_loop` and writes them to the TCP-Stream.
7//!
8//! The `build_connection` function is the entry point for the TCP-Manager. It creates a TCP-Stream and spawns the 3 loops.
9
10use crate::{
11    mosaik_protocol::{self, Response},
12    MosaikApi,
13};
14
15use async_std::{
16    net::{TcpListener, TcpStream},
17    prelude::*,
18    task,
19};
20use futures::{
21    channel::{
22        mpsc,
23        oneshot::{self, Canceled},
24    },
25    select,
26    sink::SinkExt,
27    FutureExt,
28};
29use log::{debug, error, info, trace};
30use std::{future::Future, net::SocketAddr, sync::Arc};
31
/// Result alias used throughout this module: the error is any boxed, thread-safe error,
/// so different error types can be propagated with `?`.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
33
/// The direction of the connection together with the address of the socket.
///
/// Either we listen on an address or we connect to an address.
/// This is used in the `run_simulation` function.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionDirection {
    /// Actively connect to the given address.
    ConnectToAddress(SocketAddr),
    /// Bind to the given address and wait for an incoming connection.
    ListenOnAddress(SocketAddr),
}
41
42/// Build the connection between Mosaik and us. 2 cases, we connect to them or they connect to us.
43pub(crate) async fn build_connection<T: MosaikApi>(
44    addr: ConnectionDirection,
45    simulator: T,
46) -> Result<()> {
47    // Create a TCP Stream
48    let stream: TcpStream = match addr {
49        // Case: We need to listen for a possible connector
50        ConnectionDirection::ListenOnAddress(addr) => {
51            let listener = TcpListener::bind(addr).await?;
52            let (stream, _addr) = listener.accept().await?;
53            info!("Accepting from: {}", stream.peer_addr()?);
54            stream
55        }
56        // Case: We need to connect to a stream
57        ConnectionDirection::ConnectToAddress(addr) => TcpStream::connect(addr).await?,
58    };
59
60    // Wrap the stream in an Arc to share it between the tasks
61    let stream = Arc::new(stream);
62
63    // Create the channels for the communication between the tasks
64    let (receiver2broker_tx, receiver2broker_rx) = mpsc::unbounded();
65    // The broker_loop needs to be able to shutdown the receiver_loop
66    // the tcp_sender will be shutdown by dropping the channel to it in the broker_loop
67    let (shutdown_signal_tx, shutdown_signal_rx) = oneshot::channel::<bool>();
68    // Channel to the writer loop, gets shutdown by dropping the channel in the broker_loop
69    let (broker2sender_tx, broker2sender_rx) = mpsc::unbounded();
70
71    // Spawn the tasks
72    // 1. Read the requests from the TCP-Stream and send them to the broker_loop
73    let receiver_handle = spawn_and_log_error(tcp_receiver(
74        receiver2broker_tx,
75        shutdown_signal_rx,
76        Arc::clone(&stream),
77    ));
78    // 2. Connect broker_loop with the receiver_loop, simulator and sender_loop, add a connection to
79    let broker_handle = task::spawn(broker_loop(
80        receiver2broker_rx,
81        broker2sender_tx,
82        simulator,
83        shutdown_signal_tx,
84    ));
85    // 3. Connect broker loop to the TCP sender loop.
86    spawn_and_log_error(async move {
87        //spawn a connection writer with the message received over the channel
88        tcp_sender(broker2sender_rx, Arc::clone(&stream)).await
89    });
90
91    receiver_handle.await;
92    broker_handle.await;
93    info!("Finished TCP");
94    Ok(())
95}
96
97/// Receive the Requests, send them to the `broker_loop`.
98async fn tcp_receiver(
99    mut broker: mpsc::UnboundedSender<String>,
100    shutdown_signal_rx: oneshot::Receiver<bool>,
101    stream: Arc<TcpStream>,
102) -> Result<()> {
103    info!("Started connection loop");
104    let mut stream = &*stream;
105    let mut size_data = [0u8; 4]; // use 4 byte buffer for the big_endian number in front of the request.
106
107    let mut rx = shutdown_signal_rx.fuse();
108
109    // Loop until no incoming message stream gets closed or shutdown signal is received
110    loop {
111        select! {
112            msg = stream.read_exact(&mut size_data).fuse() => {
113                //Read the rest of the data and send it to the broker_loop
114                read_complete_message(msg, &size_data, stream, &mut broker).await?;
115            },
116            shutdown_signal = rx => {
117                shutdown_msg(shutdown_signal);
118                break;
119            },
120        }
121    }
122
123    // Receiver finished
124    info!("Receiver finished.");
125    Ok(())
126}
127
128// Helper for the tcp receiver shutdown messages
129fn shutdown_msg(shutdown_signal: std::result::Result<bool, Canceled>) {
130    if shutdown_signal.is_ok() {
131        info!("TCP Receiver received shutdown signal.");
132    } else {
133        info!("TCP Receivers shutdown signal channel closed. Shutting down...");
134    }
135}
136
137// Helper to read messages in the tcp receiver messages
138async fn read_complete_message(
139    msg: std::result::Result<(), std::io::Error>,
140    size_data: &[u8; 4],
141    mut stream: &TcpStream,
142    broker: &mut mpsc::UnboundedSender<String>,
143) -> Result<()> {
144    //Check if there was an error reading the size data
145    debug!("Received a new message");
146    msg?;
147
148    let size = u32::from_be_bytes(*size_data) as usize;
149    debug!("New message contains {} Bytes", size);
150
151    // Read the rest of the data
152    let mut full_package = vec![0; size];
153    stream.read_exact(&mut full_package).await?;
154
155    debug!("Parsing string as utf8");
156    let json_string = String::from_utf8(full_package[0..size].to_vec())?;
157
158    debug!("Sending message to broker: {:?}", json_string);
159    broker.send(json_string).await?;
160
161    Ok(())
162}
163
164// Receive the Response from the broker_loop and write it in the stream.
165async fn tcp_sender(
166    mut messages: mpsc::UnboundedReceiver<Vec<u8>>,
167    stream: Arc<TcpStream>,
168) -> Result<()> {
169    let mut stream = &*stream;
170
171    // loop for the messages
172    // messages will be None when the broker_loop is closed -> which ends the loop
173    while let Some(msg) = messages.next().await {
174        stream.write_all(&msg).await?; //write the message
175    }
176
177    info!("Sender finished.");
178    Ok(())
179}
180
181/// Receive requests from the `connection_loop`, parse them, get the values from the API and send the finished response to the `connection_writer_loop`
182async fn broker_loop<T: MosaikApi>(
183    mut received_requests: mpsc::UnboundedReceiver<String>,
184    mut response_sender: mpsc::UnboundedSender<Vec<u8>>,
185    mut simulator: T,
186    shutdown_signal_tx: oneshot::Sender<bool>,
187) {
188    //loop for the different events.
189    'event_loop: while let Some(json_string) = received_requests.next().await {
190        debug!("Received event: {:?}", json_string);
191        //The event that will happen the rest of the time, because the only connector is mosaik.
192        //parse the request
193        match mosaik_protocol::parse_json_request(&json_string) {
194            Ok(request) => {
195                //Handle the request -> simulations calls etc.
196                trace!("The request: {:?}", request);
197                match mosaik_protocol::handle_request(&mut simulator, request) {
198                    Response::Reply(mosaik_msg) => {
199                        let response = mosaik_msg.to_network_message();
200
201                        //get the second argument in the tuple of peer
202                        //-> send the message to mosaik channel receiver
203                        if let Err(e) = response_sender.send(response).await {
204                            error!("error sending response to peer: {}", e);
205                            // FIXME what to send in this case? Failure?
206                        }
207                    }
208                    Response::Stop => {
209                        info!("Received stop signal. Closing all connections ...");
210                        // shutdown sender loop
211                        drop(response_sender);
212                        drop(received_requests);
213                        // shutdown receiver loop
214                        if let Err(e) = shutdown_signal_tx.send(true) {
215                            error!("error sending to the shutdown channel: {}", e);
216                        }
217                        break 'event_loop;
218                    }
219                }
220            }
221            Err(e) => {
222                //if let Err(e) = peer.1.send()
223                error!("Error while parsing the request: {:?}", e);
224                todo!("send a failure message")
225            }
226        }
227    }
228    info!("Broker finished.");
229}
230
231/// Spawns the tasks and handles errors.
232fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
233where
234    F: Future<Output = Result<()>> + Send + 'static,
235{
236    trace!("Spawn task");
237    task::spawn(async move {
238        trace!("Task Spawned");
239        if let Err(e) = fut.await {
240            error!("{}", e);
241        }
242    })
243}