Skip to main content

scud_task_core/
publisher.rs

1//! Event publishing system using ZMQ.
2//!
3//! This module provides ZMQ-based event publishing for SCUD task management.
4//! It supports publishing task events via PUB socket and handling requests via REP socket.
5
6use anyhow::{anyhow, Result};
7use zmq;
8
9/// Event publisher for SCUD task management events.
10///
11/// Manages ZMQ sockets for publishing events and handling requests.
12pub struct EventPublisher {
13    /// ZMQ context (kept alive for socket lifetime)
14    #[allow(dead_code)]
15    context: zmq::Context,
16    /// PUB socket for broadcasting task events
17    pub_socket: zmq::Socket,
18    /// REP socket for handling requests
19    rep_socket: zmq::Socket,
20}
21
22impl EventPublisher {
23    /// Create a new EventPublisher with ZMQ sockets.
24    ///
25    /// # Arguments
26    /// * `pub_endpoint` - Endpoint for the PUB socket (e.g., "tcp://*:5555")
27    /// * `rep_endpoint` - Endpoint for the REP socket (e.g., "tcp://*:5556")
28    ///
29    /// # Returns
30    /// A Result containing the EventPublisher or an error
31    pub fn new(pub_endpoint: &str, rep_endpoint: &str) -> Result<Self> {
32        let context = zmq::Context::new();
33
34        // Create PUB socket for publishing events
35        let pub_socket = context
36            .socket(zmq::PUB)
37            .map_err(|e| anyhow!("Failed to create PUB socket: {}", e))?;
38
39        // Create REP socket for handling requests
40        let rep_socket = context
41            .socket(zmq::REP)
42            .map_err(|e| anyhow!("Failed to create REP socket: {}", e))?;
43
44        // Bind sockets to endpoints
45        pub_socket
46            .bind(pub_endpoint)
47            .map_err(|e| anyhow!("Failed to bind PUB socket to {}: {}", pub_endpoint, e))?;
48
49        rep_socket
50            .bind(rep_endpoint)
51            .map_err(|e| anyhow!("Failed to bind REP socket to {}: {}", rep_endpoint, e))?;
52
53        Ok(EventPublisher {
54            context,
55            pub_socket,
56            rep_socket,
57        })
58    }
59
60    /// Publish an event message.
61    ///
62    /// # Arguments
63    /// * `topic` - The topic string for the message
64    /// * `message` - The message content
65    ///
66    /// # Returns
67    /// A Result indicating success or failure
68    pub fn publish(&self, topic: &str, message: &str) -> Result<()> {
69        let full_message = format!("{} {}", topic, message);
70        self.pub_socket
71            .send(&full_message, 0)
72            .map_err(|e| anyhow!("Failed to send message: {}", e))?;
73        Ok(())
74    }
75
76    /// Receive a request message.
77    ///
78    /// This is a blocking call that waits for a request.
79    ///
80    /// # Returns
81    /// A Result containing the received message or an error
82    pub fn receive_request(&self) -> Result<String> {
83        let mut msg = zmq::Message::new();
84        self.rep_socket
85            .recv(&mut msg, 0)
86            .map_err(|e| anyhow!("Failed to receive request: {}", e))?;
87        Ok(msg.as_str().unwrap_or("").to_string())
88    }
89
90    /// Send a reply message.
91    ///
92    /// # Arguments
93    /// * `reply` - The reply message to send
94    ///
95    /// # Returns
96    /// A Result indicating success or failure
97    pub fn send_reply(&self, reply: &str) -> Result<()> {
98        self.rep_socket
99            .send(reply, 0)
100            .map_err(|e| anyhow!("Failed to send reply: {}", e))?;
101        Ok(())
102    }
103}
104
105impl Drop for EventPublisher {
106    fn drop(&mut self) {
107        // ZMQ sockets and context will be cleaned up automatically
108    }
109}