ws2can/
lib.rs

1use std::net::SocketAddr;
2use std::result::Result::Ok;
3
4use futures_util::{future, pin_mut, SinkExt, StreamExt};
5use log::*;
6use tokio::net::{TcpListener, TcpStream};
7use tokio::sync;
8use tokio_socketcan::{CANFrame, CANSocket};
9use tokio_tungstenite::tungstenite::Message;
10
11use can::CanFrame;
12
13mod can;
14
15type Sender<T = CANFrame> = sync::mpsc::Sender<T>;
16type Receiver<T = CANFrame> = sync::mpsc::Receiver<T>;
17
18async fn handle_ws_connection(
19    raw_stream: TcpStream,
20    addr: SocketAddr,
21    in_tx: &Sender,
22    mut out_rx: Receiver,
23) {
24    info!("Incoming TCP connection from: {}", addr);
25
26    let stream = tokio_tungstenite::accept_async(raw_stream)
27        .await
28        .expect("Error during the websocket handshake occurred");
29    info!("WebSocket connection established: {}", addr);
30
31    let (mut outgoing, incoming) = stream.split();
32
33    let ws_receiver = incoming.for_each(|msg| async move {
34        match msg {
35            Ok(msg) => {
36                if let Ok(msg) = msg.to_text() {
37                    if let Ok(can_frame) = CanFrame::from_json(msg) {
38                        info!("WS(in): {}", msg);
39                        if let Err(e) = in_tx.clone().send(can_frame.to_linux_frame()).await {
40                            error!(
41                                "Error occurred while sending frame from WS to CAN channel: {:?}",
42                                e
43                            );
44                        }
45                    } else {
46                        error!("Couldn't parse received can frame json: {}", msg);
47                    }
48                }
49            }
50            Err(e) => {
51                error!("Error occurred while WS receiving a message: {:?}", e);
52            }
53        }
54    });
55
56    let ws_transmitter = tokio::spawn(async move {
57        while let Some(f) = out_rx.recv().await {
58            let j = CanFrame::from_linux_frame(f).to_json();
59            info!("WS(out): {}", j);
60            let msg = Message::Text(j);
61            if let Err(e) = outgoing.send(msg).await {
62                error!("Error occurred while sending WS message: {:?}", e);
63            }
64        }
65    });
66
67    pin_mut!(ws_receiver, ws_transmitter);
68    future::select(ws_receiver, ws_transmitter).await;
69    info!("WS disconnected!");
70}
71
72async fn start_ws(addr: &str, in_tx: &Sender, out_rx: Receiver) {
73    let listener = TcpListener::bind(addr)
74        .await
75        .expect(&format!("Can't bind websocket address {}", addr));
76
77    info!("Listening on: {}", addr);
78
79    if let Ok((stream, addr)) = listener.accept().await {
80        handle_ws_connection(stream, addr, &in_tx, out_rx).await;
81    }
82}
83
84async fn start_can(can_addr: &str, out_tx: &Sender, mut in_rx: Receiver) {
85    let can_socket = CANSocket::open(&can_addr).unwrap();
86    let (mut outgoing, incoming) = can_socket.split();
87
88    let can_receiver = incoming.for_each(|msg| async move {
89        match msg {
90            Ok(msg) => {
91                info!("CAN(in): {:?}", msg);
92                if let Err(e) = out_tx.clone().send(msg).await {
93                    error!(
94                        "Error occurred while sending frame from CAN to WS channel: {:?}",
95                        e
96                    );
97                }
98            }
99            Err(e) => {
100                error!("Error occurred while CAN receiving a message: {:?}", e);
101            }
102        }
103    });
104
105    let can_transmitter = tokio::spawn(async move {
106        while let Some(frame) = in_rx.recv().await {
107            info!("CAN(out): {:?}", frame);
108            if let Err(e) = outgoing.send(frame).await {
109                error!("Error occurred while sending CAN frame: {:?}", e);
110            }
111        }
112    });
113
114    if let (Err(e), _) = tokio::join!(can_transmitter, can_receiver) {
115        error!("Error occurred in can_transmitter task: {:?}", e);
116    }
117}
118
119///       (in_tx   ,   in_rx )      = sync::mpsc::channel();
120///       (out_tx  ,   out_rx)      = sync::mpsc::channel();
121///     -> in_tx ----> in_rx ->
122///  WS                          CAN
123///     <- out_tx <---- out_rx <-
124pub async fn run(ws_addr: &str, can_addr: &str) -> Result<(), Box<dyn std::error::Error>> {
125    let (in_tx, in_rx) = sync::mpsc::channel(100);
126    let (out_tx, out_rx) = sync::mpsc::channel(100);
127
128    let ws = start_ws(&ws_addr, &in_tx, out_rx);
129    let can = start_can(&can_addr, &out_tx, in_rx);
130
131    tokio::join!(ws, can);
132
133    Ok(())
134}