utils_box_connections/
zmq_client.rs1use anyhow::{Result, bail};
6use zmq::{Context, Socket};
7
8use utils_box_logger::{log_error, log_info, log_trace};
9
10pub struct ZmqClient {
11 server_ip: String,
12 server_port: u16,
13 socket: Socket,
14}
15
16impl ZmqClient {
17 pub fn new(server_ip: String, server_port: u16) -> Result<Self> {
19 let ctx = Context::new();
20
21 let socket = ctx.socket(zmq::REQ)?;
22 socket.connect(&format!("tcp://{server_ip}:{server_port}"))?;
23
24 Ok(Self {
25 server_ip,
26 server_port,
27 socket,
28 })
29 }
30
31 pub fn server_info(&self) -> (String, u16) {
33 (self.server_ip.clone(), self.server_port)
34 }
35
36 pub fn send(&self, data: &[u8]) -> Result<()> {
37 match self.socket.send(data, 0) {
38 Ok(_) => {
39 log_trace!(
40 "[ZmqClient][{}:{}] Send [{} Bytes] SUCCESSFULLY!",
41 self.server_ip,
42 self.server_port,
43 data.len()
44 );
45 Ok(())
46 }
47 Err(e) => {
48 log_error!(
49 "[ZmqClient][{}:{}] Send FAILED with [{}]",
50 self.server_ip,
51 self.server_port,
52 e
53 );
54 bail!(e)
55 }
56 }
57 }
58
59 pub fn receive(&self) -> Result<Vec<u8>> {
60 match self.socket.recv_bytes(0) {
61 Ok(data) => {
62 log_trace!(
63 "[ZmqClient][{}:{}] Received [{} Bytes] SUCCESSFULLY!",
64 self.server_ip,
65 self.server_port,
66 data.len()
67 );
68 Ok(data)
69 }
70 Err(e) => {
71 log_error!(
72 "[ZmqClient][{}:{}] Receive FAILED with [{}]",
73 self.server_ip,
74 self.server_port,
75 e
76 );
77 bail!(e)
78 }
79 }
80 }
81}
82
83impl Drop for ZmqClient {
84 fn drop(&mut self) {
85 match self
86 .socket
87 .disconnect(&format!("tcp://{}:{}", self.server_ip, self.server_port))
88 {
89 Ok(_) => log_info!(
90 "[ZmqClient] Disconnected from [{}:{}]",
91 self.server_ip,
92 self.server_port
93 ),
94 Err(_) => log_error!(
95 "[ZmqClient] FAILED to Grecefully Disconnect from [{}:{}]. Dropping...",
96 self.server_ip,
97 self.server_port
98 ),
99 }
100 }
101}