1use crate::{
11 mosaik_protocol::{self, Response},
12 MosaikApi,
13};
14
15use async_std::{
16 net::{TcpListener, TcpStream},
17 prelude::*,
18 task,
19};
20use futures::{
21 channel::{
22 mpsc,
23 oneshot::{self, Canceled},
24 },
25 select,
26 sink::SinkExt,
27 FutureExt,
28};
29use log::{debug, error, info, trace};
30use std::{future::Future, net::SocketAddr, sync::Arc};
31
/// Module-local result type: any error that is `Send + Sync` can be boxed into the error
/// variant, which lets `?` be used on I/O, UTF-8 and channel errors alike.
type Result<T> =
    std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
33
/// Selects how the TCP connection is established.
///
/// The variants are consumed by `build_connection`, which either dials out or waits for a
/// single incoming connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionDirection {
    /// Actively connect to the given address.
    ConnectToAddress(SocketAddr),
    /// Bind to the given address and accept one incoming connection.
    ListenOnAddress(SocketAddr),
}
41
42pub(crate) async fn build_connection<T: MosaikApi>(
44 addr: ConnectionDirection,
45 simulator: T,
46) -> Result<()> {
47 let stream: TcpStream = match addr {
49 ConnectionDirection::ListenOnAddress(addr) => {
51 let listener = TcpListener::bind(addr).await?;
52 let (stream, _addr) = listener.accept().await?;
53 info!("Accepting from: {}", stream.peer_addr()?);
54 stream
55 }
56 ConnectionDirection::ConnectToAddress(addr) => TcpStream::connect(addr).await?,
58 };
59
60 let stream = Arc::new(stream);
62
63 let (receiver2broker_tx, receiver2broker_rx) = mpsc::unbounded();
65 let (shutdown_signal_tx, shutdown_signal_rx) = oneshot::channel::<bool>();
68 let (broker2sender_tx, broker2sender_rx) = mpsc::unbounded();
70
71 let receiver_handle = spawn_and_log_error(tcp_receiver(
74 receiver2broker_tx,
75 shutdown_signal_rx,
76 Arc::clone(&stream),
77 ));
78 let broker_handle = task::spawn(broker_loop(
80 receiver2broker_rx,
81 broker2sender_tx,
82 simulator,
83 shutdown_signal_tx,
84 ));
85 spawn_and_log_error(async move {
87 tcp_sender(broker2sender_rx, Arc::clone(&stream)).await
89 });
90
91 receiver_handle.await;
92 broker_handle.await;
93 info!("Finished TCP");
94 Ok(())
95}
96
97async fn tcp_receiver(
99 mut broker: mpsc::UnboundedSender<String>,
100 shutdown_signal_rx: oneshot::Receiver<bool>,
101 stream: Arc<TcpStream>,
102) -> Result<()> {
103 info!("Started connection loop");
104 let mut stream = &*stream;
105 let mut size_data = [0u8; 4]; let mut rx = shutdown_signal_rx.fuse();
108
109 loop {
111 select! {
112 msg = stream.read_exact(&mut size_data).fuse() => {
113 read_complete_message(msg, &size_data, stream, &mut broker).await?;
115 },
116 shutdown_signal = rx => {
117 shutdown_msg(shutdown_signal);
118 break;
119 },
120 }
121 }
122
123 info!("Receiver finished.");
125 Ok(())
126}
127
128fn shutdown_msg(shutdown_signal: std::result::Result<bool, Canceled>) {
130 if shutdown_signal.is_ok() {
131 info!("TCP Receiver received shutdown signal.");
132 } else {
133 info!("TCP Receivers shutdown signal channel closed. Shutting down...");
134 }
135}
136
137async fn read_complete_message(
139 msg: std::result::Result<(), std::io::Error>,
140 size_data: &[u8; 4],
141 mut stream: &TcpStream,
142 broker: &mut mpsc::UnboundedSender<String>,
143) -> Result<()> {
144 debug!("Received a new message");
146 msg?;
147
148 let size = u32::from_be_bytes(*size_data) as usize;
149 debug!("New message contains {} Bytes", size);
150
151 let mut full_package = vec![0; size];
153 stream.read_exact(&mut full_package).await?;
154
155 debug!("Parsing string as utf8");
156 let json_string = String::from_utf8(full_package[0..size].to_vec())?;
157
158 debug!("Sending message to broker: {:?}", json_string);
159 broker.send(json_string).await?;
160
161 Ok(())
162}
163
164async fn tcp_sender(
166 mut messages: mpsc::UnboundedReceiver<Vec<u8>>,
167 stream: Arc<TcpStream>,
168) -> Result<()> {
169 let mut stream = &*stream;
170
171 while let Some(msg) = messages.next().await {
174 stream.write_all(&msg).await?; }
176
177 info!("Sender finished.");
178 Ok(())
179}
180
181async fn broker_loop<T: MosaikApi>(
183 mut received_requests: mpsc::UnboundedReceiver<String>,
184 mut response_sender: mpsc::UnboundedSender<Vec<u8>>,
185 mut simulator: T,
186 shutdown_signal_tx: oneshot::Sender<bool>,
187) {
188 'event_loop: while let Some(json_string) = received_requests.next().await {
190 debug!("Received event: {:?}", json_string);
191 match mosaik_protocol::parse_json_request(&json_string) {
194 Ok(request) => {
195 trace!("The request: {:?}", request);
197 match mosaik_protocol::handle_request(&mut simulator, request) {
198 Response::Reply(mosaik_msg) => {
199 let response = mosaik_msg.to_network_message();
200
201 if let Err(e) = response_sender.send(response).await {
204 error!("error sending response to peer: {}", e);
205 }
207 }
208 Response::Stop => {
209 info!("Received stop signal. Closing all connections ...");
210 drop(response_sender);
212 drop(received_requests);
213 if let Err(e) = shutdown_signal_tx.send(true) {
215 error!("error sending to the shutdown channel: {}", e);
216 }
217 break 'event_loop;
218 }
219 }
220 }
221 Err(e) => {
222 error!("Error while parsing the request: {:?}", e);
224 todo!("send a failure message")
225 }
226 }
227 }
228 info!("Broker finished.");
229}
230
231fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
233where
234 F: Future<Output = Result<()>> + Send + 'static,
235{
236 trace!("Spawn task");
237 task::spawn(async move {
238 trace!("Task Spawned");
239 if let Err(e) = fut.await {
240 error!("{}", e);
241 }
242 })
243}