1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
use crate::message::{Message, OutMessage, SendMessage}; use crate::utils::write_msg; use std::net::TcpStream; use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::Duration; use std::{io, thread}; pub struct EventBusPublisher { socket: TcpStream, tx: Sender<()>, } impl EventBusPublisher { pub fn new(socket: TcpStream) -> io::Result<Self> { let (tx, rx) = channel::<()>(); let mut created = EventBusPublisher { socket, tx }; created.send_heartbeat_periodically(rx)?; Ok(created) } pub fn send(&mut self, msg: SendMessage) -> io::Result<&mut Self> { write_msg(&self.socket, &OutMessage::Send(msg)).map(|_| self) } pub fn publish(&mut self, msg: Message) -> io::Result<&mut Self> { write_msg(&self.socket, &OutMessage::Publish(msg)).map(|_| self) } pub fn ping(&mut self) -> io::Result<&mut Self> { write_msg(&self.socket, &OutMessage::Ping).map(|_| self) } fn send_heartbeat_periodically(&mut self, rx: Receiver<()>) -> io::Result<()> { let heartbeat_socket = self.socket.try_clone()?; thread::spawn(move || loop { if write_msg(&heartbeat_socket, &OutMessage::Ping).is_err() { println!("Could not send periodic heartbeat to TCP server") } if rx.try_recv().ok().is_none() { thread::sleep(Duration::from_secs(10)); } else { break; } }); Ok(()) } } impl Drop for EventBusPublisher { fn drop(&mut self) { if self.tx.send(()).is_err() {} } }