scud_task_core/publisher.rs
1//! Event publishing system using ZMQ.
2//!
3//! This module provides ZMQ-based event publishing for SCUD task management.
4//! It supports publishing task events via PUB socket and handling requests via REP socket.
5
6use anyhow::{anyhow, Result};
7use zmq;
8
9/// Event publisher for SCUD task management events.
10///
11/// Manages ZMQ sockets for publishing events and handling requests.
12pub struct EventPublisher {
13 /// ZMQ context (kept alive for socket lifetime)
14 #[allow(dead_code)]
15 context: zmq::Context,
16 /// PUB socket for broadcasting task events
17 pub_socket: zmq::Socket,
18 /// REP socket for handling requests
19 rep_socket: zmq::Socket,
20}
21
22impl EventPublisher {
23 /// Create a new EventPublisher with ZMQ sockets.
24 ///
25 /// # Arguments
26 /// * `pub_endpoint` - Endpoint for the PUB socket (e.g., "tcp://*:5555")
27 /// * `rep_endpoint` - Endpoint for the REP socket (e.g., "tcp://*:5556")
28 ///
29 /// # Returns
30 /// A Result containing the EventPublisher or an error
31 pub fn new(pub_endpoint: &str, rep_endpoint: &str) -> Result<Self> {
32 let context = zmq::Context::new();
33
34 // Create PUB socket for publishing events
35 let pub_socket = context
36 .socket(zmq::PUB)
37 .map_err(|e| anyhow!("Failed to create PUB socket: {}", e))?;
38
39 // Create REP socket for handling requests
40 let rep_socket = context
41 .socket(zmq::REP)
42 .map_err(|e| anyhow!("Failed to create REP socket: {}", e))?;
43
44 // Bind sockets to endpoints
45 pub_socket
46 .bind(pub_endpoint)
47 .map_err(|e| anyhow!("Failed to bind PUB socket to {}: {}", pub_endpoint, e))?;
48
49 rep_socket
50 .bind(rep_endpoint)
51 .map_err(|e| anyhow!("Failed to bind REP socket to {}: {}", rep_endpoint, e))?;
52
53 Ok(EventPublisher {
54 context,
55 pub_socket,
56 rep_socket,
57 })
58 }
59
60 /// Publish an event message.
61 ///
62 /// # Arguments
63 /// * `topic` - The topic string for the message
64 /// * `message` - The message content
65 ///
66 /// # Returns
67 /// A Result indicating success or failure
68 pub fn publish(&self, topic: &str, message: &str) -> Result<()> {
69 let full_message = format!("{} {}", topic, message);
70 self.pub_socket
71 .send(&full_message, 0)
72 .map_err(|e| anyhow!("Failed to send message: {}", e))?;
73 Ok(())
74 }
75
76 /// Receive a request message.
77 ///
78 /// This is a blocking call that waits for a request.
79 ///
80 /// # Returns
81 /// A Result containing the received message or an error
82 pub fn receive_request(&self) -> Result<String> {
83 let mut msg = zmq::Message::new();
84 self.rep_socket
85 .recv(&mut msg, 0)
86 .map_err(|e| anyhow!("Failed to receive request: {}", e))?;
87 Ok(msg.as_str().unwrap_or("").to_string())
88 }
89
90 /// Send a reply message.
91 ///
92 /// # Arguments
93 /// * `reply` - The reply message to send
94 ///
95 /// # Returns
96 /// A Result indicating success or failure
97 pub fn send_reply(&self, reply: &str) -> Result<()> {
98 self.rep_socket
99 .send(reply, 0)
100 .map_err(|e| anyhow!("Failed to send reply: {}", e))?;
101 Ok(())
102 }
103}
104
105impl Drop for EventPublisher {
106 fn drop(&mut self) {
107 // ZMQ sockets and context will be cleaned up automatically
108 }
109}