1use std::net::SocketAddr;
2use std::result::Result::Ok;
3
4use futures_util::{future, pin_mut, SinkExt, StreamExt};
5use log::*;
6use tokio::net::{TcpListener, TcpStream};
7use tokio::sync;
8use tokio_socketcan::{CANFrame, CANSocket};
9use tokio_tungstenite::tungstenite::Message;
10
11use can::CanFrame;
12
13mod can;
14
15type Sender<T = CANFrame> = sync::mpsc::Sender<T>;
16type Receiver<T = CANFrame> = sync::mpsc::Receiver<T>;
17
18async fn handle_ws_connection(
19 raw_stream: TcpStream,
20 addr: SocketAddr,
21 in_tx: &Sender,
22 mut out_rx: Receiver,
23) {
24 info!("Incoming TCP connection from: {}", addr);
25
26 let stream = tokio_tungstenite::accept_async(raw_stream)
27 .await
28 .expect("Error during the websocket handshake occurred");
29 info!("WebSocket connection established: {}", addr);
30
31 let (mut outgoing, incoming) = stream.split();
32
33 let ws_receiver = incoming.for_each(|msg| async move {
34 match msg {
35 Ok(msg) => {
36 if let Ok(msg) = msg.to_text() {
37 if let Ok(can_frame) = CanFrame::from_json(msg) {
38 info!("WS(in): {}", msg);
39 if let Err(e) = in_tx.clone().send(can_frame.to_linux_frame()).await {
40 error!(
41 "Error occurred while sending frame from WS to CAN channel: {:?}",
42 e
43 );
44 }
45 } else {
46 error!("Couldn't parse received can frame json: {}", msg);
47 }
48 }
49 }
50 Err(e) => {
51 error!("Error occurred while WS receiving a message: {:?}", e);
52 }
53 }
54 });
55
56 let ws_transmitter = tokio::spawn(async move {
57 while let Some(f) = out_rx.recv().await {
58 let j = CanFrame::from_linux_frame(f).to_json();
59 info!("WS(out): {}", j);
60 let msg = Message::Text(j);
61 if let Err(e) = outgoing.send(msg).await {
62 error!("Error occurred while sending WS message: {:?}", e);
63 }
64 }
65 });
66
67 pin_mut!(ws_receiver, ws_transmitter);
68 future::select(ws_receiver, ws_transmitter).await;
69 info!("WS disconnected!");
70}
71
72async fn start_ws(addr: &str, in_tx: &Sender, out_rx: Receiver) {
73 let listener = TcpListener::bind(addr)
74 .await
75 .expect(&format!("Can't bind websocket address {}", addr));
76
77 info!("Listening on: {}", addr);
78
79 if let Ok((stream, addr)) = listener.accept().await {
80 handle_ws_connection(stream, addr, &in_tx, out_rx).await;
81 }
82}
83
84async fn start_can(can_addr: &str, out_tx: &Sender, mut in_rx: Receiver) {
85 let can_socket = CANSocket::open(&can_addr).unwrap();
86 let (mut outgoing, incoming) = can_socket.split();
87
88 let can_receiver = incoming.for_each(|msg| async move {
89 match msg {
90 Ok(msg) => {
91 info!("CAN(in): {:?}", msg);
92 if let Err(e) = out_tx.clone().send(msg).await {
93 error!(
94 "Error occurred while sending frame from CAN to WS channel: {:?}",
95 e
96 );
97 }
98 }
99 Err(e) => {
100 error!("Error occurred while CAN receiving a message: {:?}", e);
101 }
102 }
103 });
104
105 let can_transmitter = tokio::spawn(async move {
106 while let Some(frame) = in_rx.recv().await {
107 info!("CAN(out): {:?}", frame);
108 if let Err(e) = outgoing.send(frame).await {
109 error!("Error occurred while sending CAN frame: {:?}", e);
110 }
111 }
112 });
113
114 if let (Err(e), _) = tokio::join!(can_transmitter, can_receiver) {
115 error!("Error occurred in can_transmitter task: {:?}", e);
116 }
117}
118
119pub async fn run(ws_addr: &str, can_addr: &str) -> Result<(), Box<dyn std::error::Error>> {
125 let (in_tx, in_rx) = sync::mpsc::channel(100);
126 let (out_tx, out_rx) = sync::mpsc::channel(100);
127
128 let ws = start_ws(&ws_addr, &in_tx, out_rx);
129 let can = start_can(&can_addr, &out_tx, in_rx);
130
131 tokio::join!(ws, can);
132
133 Ok(())
134}