// signald_rust/socket/signaldsocket.rs

use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use bus::{Bus, BusReader};

use crate::signaldrequest::SignaldRequest;
use crate::signaldresponse::ResponseType::BusUpdate;
use crate::signaldresponse::SignaldResponse;
use crate::socket::Socket;

12#[allow(dead_code)]
13pub struct SignaldSocket {
14    socket_path: String,
15    socket: UnixStream,
16    bus: Arc<Mutex<Bus<SignaldResponse>>>,
17}
18impl SignaldSocket {
19    pub fn connect(socket_path: String, bus_size: usize) -> SignaldSocket {
20
21        // Connect the socket
22        let socket = match UnixStream::connect(socket_path.to_string()) {
23            Ok(stream) => {
24                stream
25            }
26            Err(_) => {
27                panic!("Failed to connect socket");
28            }
29        };
30        let socket_clone = socket.try_clone().unwrap();
31
32        // Create a bus
33        let bus = Arc::new(Mutex::new(Bus::new(bus_size)));
34
35        // Broadcast on the bus in a new thread
36        let bus_tx = bus.clone();
37        thread::spawn(move || {
38            let reader = BufReader::new(socket);
39            for line in reader.lines() {
40                match line {
41                    Ok(l) => {
42                        let val = serde_json::from_str(&l).unwrap();
43                        let res: SignaldResponse = SignaldResponse::from_value(val);
44                        bus_tx.lock().unwrap().broadcast(res);
45                    },
46                    Err(_) => {}
47                }
48            }
49        });
50
51        // An update message every second to make sure that the receivers can verify the time they're waiting
52        // When there are no messages on the bus the receivers would otherwise be stuck waiting
53        // This is a hacky implementation and should be changed once recv_deadline can be implemented
54        let bus_tx_seconds = bus.clone();
55        let update_response = SignaldResponse {
56            id: None,
57            data: BusUpdate
58        };
59        thread::spawn(move || {
60            loop {
61                thread::sleep(Duration::from_secs(1));
62                bus_tx_seconds.lock().unwrap().broadcast(update_response.clone());
63            }
64        });
65
66
67        Self {
68            socket_path: socket_path,
69            socket: socket_clone,
70            bus: bus,
71        }
72    }
73}
74impl Socket for SignaldSocket {
75    fn send_request(&mut self, request: &SignaldRequest) {
76        let formatted_request = request.to_json_string() + "\n";
77        match self.socket.write_all(formatted_request.as_bytes()) {
78            Err(_) => panic!("Failed to send message"),
79            Ok(_) => {
80                //println!("mesg sent {}", formatted_request);
81            }
82        }
83    }
84
85    fn get_rx(&mut self) -> BusReader<SignaldResponse> {
86        self.bus.lock().unwrap().add_rx()
87    }
88}