candid_server/
client_handler.rs

1use std::io::{BufReader, Read, Write};
2use std::net::TcpStream;
3use std::sync::mpsc;
4use std::thread;
5
6use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
7
8use socketcan::{CANFrame, CANSocket};
9
10/// Spawns two threads: one to read incoming frames from the client (and write them to the
11/// CANSocket), and one to read incoming frames from the `CANReader` (and write them to the client)
12pub fn new_client_handler(
13    mut connection: TcpStream,
14    can_interface: String,
15    can_reader: mpsc::Receiver<CANFrame>,
16) -> (thread::JoinHandle<()>, thread::JoinHandle<()>) {
17    let connection_clone = connection.try_clone().unwrap();
18    // Thread to pass frames from the server to the client
19    let tx_handle = thread::spawn(move || {
20        let ip = connection.peer_addr().unwrap();
21
22        loop {
23            // Get an incoming frame from the reader
24            let frame = can_reader.recv().unwrap();
25
26            // Relay to the client
27            if let Err(_) = write_frame(&mut connection, frame) {
28                println!("Connection to client {:?} dropped.", ip);
29                break;
30            }
31        }
32    });
33
34    // Thread to pass frames from the client to the CAN socket
35    let rx_handle = thread::spawn(move || {
36        let ip = connection_clone.peer_addr().unwrap();
37        let mut connection = BufReader::new(connection_clone);
38
39        // Possible issue: There is a new socket for each thread
40        let can_socket = CANSocket::open(&can_interface).unwrap();
41
42        loop {
43            // Get an incoming frame from the client
44            let id;
45            if let Ok(inner) = connection.read_u32::<NetworkEndian>() {
46                id = inner;
47            } else {
48                println!("Connection to client {:?} dropped.", ip);
49                break;
50            }
51
52            let mut data = [0 as u8; 8];
53            if let Err(_) = connection.read_exact(&mut data) {
54                println!("Connection to client {:?} dropped.", ip);
55                break;
56            }
57
58            // Write the frame to the CAN Socket
59            let frame = CANFrame::new(id, &data, false, false).unwrap();
60            can_socket.write_frame_insist(&frame).unwrap();
61        }
62    });
63
64    (tx_handle, rx_handle)
65}
66
67fn write_frame(connection: &mut TcpStream, frame: CANFrame) -> Result<(), std::io::Error> {
68    connection.write_u32::<NetworkEndian>(frame.id())?;
69    connection.write(&frame.data())?;
70    connection.flush().unwrap();
71    Ok(())
72}