utils_box_connections/
zmq_client.rs

1//! # Zero-MQ Client utility
2//! A small ZMQ utility to connect and manipulate ZMQ connections to a server.
3//! Useful for sending and receiving data via the ZMQ transport layer.
4
5use anyhow::{Result, bail};
6use zmq::{Context, Socket};
7
8use utils_box_logger::{log_error, log_info, log_trace};
9
10pub struct ZmqClient {
11    server_ip: String,
12    server_port: u16,
13    socket: Socket,
14}
15
16impl ZmqClient {
17    /// Create a new ZMQ connection to the specified server and connect to it
18    pub fn new(server_ip: String, server_port: u16) -> Result<Self> {
19        let ctx = Context::new();
20
21        let socket = ctx.socket(zmq::REQ)?;
22        socket.connect(&format!("tcp://{server_ip}:{server_port}"))?;
23
24        Ok(Self {
25            server_ip,
26            server_port,
27            socket,
28        })
29    }
30
31    /// Get the server IP and port information
32    pub fn server_info(&self) -> (String, u16) {
33        (self.server_ip.clone(), self.server_port)
34    }
35
36    pub fn send(&self, data: &[u8]) -> Result<()> {
37        match self.socket.send(data, 0) {
38            Ok(_) => {
39                log_trace!(
40                    "[ZmqClient][{}:{}] Send [{} Bytes] SUCCESSFULLY!",
41                    self.server_ip,
42                    self.server_port,
43                    data.len()
44                );
45                Ok(())
46            }
47            Err(e) => {
48                log_error!(
49                    "[ZmqClient][{}:{}] Send FAILED with [{}]",
50                    self.server_ip,
51                    self.server_port,
52                    e
53                );
54                bail!(e)
55            }
56        }
57    }
58
59    pub fn receive(&self) -> Result<Vec<u8>> {
60        match self.socket.recv_bytes(0) {
61            Ok(data) => {
62                log_trace!(
63                    "[ZmqClient][{}:{}] Received [{} Bytes] SUCCESSFULLY!",
64                    self.server_ip,
65                    self.server_port,
66                    data.len()
67                );
68                Ok(data)
69            }
70            Err(e) => {
71                log_error!(
72                    "[ZmqClient][{}:{}] Receive FAILED with [{}]",
73                    self.server_ip,
74                    self.server_port,
75                    e
76                );
77                bail!(e)
78            }
79        }
80    }
81}
82
83impl Drop for ZmqClient {
84    fn drop(&mut self) {
85        match self
86            .socket
87            .disconnect(&format!("tcp://{}:{}", self.server_ip, self.server_port))
88        {
89            Ok(_) => log_info!(
90                "[ZmqClient] Disconnected from [{}:{}]",
91                self.server_ip,
92                self.server_port
93            ),
94            Err(_) => log_error!(
95                "[ZmqClient] FAILED to Grecefully Disconnect from [{}:{}]. Dropping...",
96                self.server_ip,
97                self.server_port
98            ),
99        }
100    }
101}