//! XSUB Dynamic Subscription Example
//!
//! This example demonstrates the XSUB socket's ability to send subscription
//! messages upstream to publishers, enabling:
//! - Dynamic topic subscription at runtime
//! - Subscription forwarding in message brokers
//! - Cascading publish-subscribe networks
//!
//! Run this example:
//! ```bash
//! cargo run --example xsub_dynamic_subscription
//! ```
//!
//! Start a publisher before running the example:
//! ```bash
//! # Using pyzmq (Python bindings for libzmq)
//! python3 -c "
//! import zmq
//! import time
//! ctx = zmq.Context()
//! pub = ctx.socket(zmq.PUB)
//! pub.bind('tcp://127.0.0.1:5556')
//! time.sleep(1) # Let subscribers connect
//! while True:
//!     pub.send_multipart([b'events.login', b'User logged in'])
//!     pub.send_multipart([b'events.logout', b'User logged out'])
//!     pub.send_multipart([b'alerts.error', b'System error'])
//!     time.sleep(1)
//! "
//! ```
use bytes::Bytes;
use monocoque::zmq::prelude::*;
use std::io;
use tracing::{info, Level};
/// Entry point of the XSUB dynamic-subscription demo.
///
/// Connects an `XSubSocket` to `127.0.0.1:5556`, then repeatedly:
/// 1. applies the next entry of a timed subscription plan once its trigger
///    second has been reached,
/// 2. tries to receive one message and logs its first two frames,
/// 3. stops once more than 10 seconds had elapsed at the start of the
///    iteration.
///
/// # Errors
///
/// Propagates any error returned by connecting, by changing a subscription,
/// or by receiving.
#[compio::main]
async fn main() -> io::Result<()> {
    /// What a plan entry does with its topic prefix.
    ///
    /// Keeping the action inside the plan (instead of keying on the entry's
    /// position) means entries can be added or reordered without silently
    /// turning an unsubscribe into a subscribe.
    #[derive(Clone, Copy)]
    enum Action {
        Subscribe,
        Unsubscribe,
    }

    // Initialize logging
    tracing_subscriber::fmt().with_max_level(Level::INFO).init();
    info!("Starting XSUB dynamic subscription example");

    // Connect to publisher
    let mut xsub = XSubSocket::connect("127.0.0.1:5556").await?;
    info!("Connected to publisher at tcp://127.0.0.1:5556");
    info!("Socket type: {:?}", xsub.socket_type());
    info!("");

    // Timed plan: (trigger second, action, topic prefix, log description).
    // An empty prefix is the "subscribe to everything" subscription.
    let subscription_plan: [(u64, Action, &'static str, &'static str); 4] = [
        (2, Action::Subscribe, "events.", "Subscribing to all events"),
        (4, Action::Subscribe, "alerts.", "Adding alerts subscription"),
        (6, Action::Unsubscribe, "events.", "Unsubscribing from events"),
        (8, Action::Subscribe, "", "Subscribing to ALL topics"),
    ];

    let start = std::time::Instant::now();
    let mut next_action_idx = 0;

    loop {
        let elapsed = start.elapsed().as_secs();

        // Apply the next planned action once its trigger time has passed.
        // At most one action is applied per loop iteration.
        if let Some(&(trigger_sec, action, topic, description)) =
            subscription_plan.get(next_action_idx)
        {
            if elapsed >= trigger_sec {
                info!("⏰ [{}s] {}", elapsed, description);

                let prefix = Bytes::from(topic);
                match action {
                    Action::Subscribe => {
                        xsub.subscribe(prefix).await?;
                    }
                    Action::Unsubscribe => {
                        xsub.unsubscribe(prefix).await?;
                    }
                }

                info!(" Active subscriptions: {}", xsub.subscription_count());
                next_action_idx += 1;
            }
        }

        // Try to receive messages
        match xsub.recv().await? {
            Some(msg) => {
                // A message without frames carries nothing to print.
                if msg.is_empty() {
                    continue;
                }
                // Frame 0 is logged as the topic, frame 1 (if any) as the payload.
                let topic = String::from_utf8_lossy(&msg[0]);
                let payload = if msg.len() > 1 {
                    String::from_utf8_lossy(&msg[1])
                } else {
                    "".into()
                };
                info!("📨 Received: [{}] {}", topic, payload);
            }
            None => {
                // No message available, small delay
                compio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
        }

        // Exit after demonstration; `elapsed` is the value sampled at the top
        // of this iteration.
        if elapsed > 10 {
            info!("");
            info!("✅ Demonstration complete!");
            info!(" Final subscription count: {}", xsub.subscription_count());
            break;
        }
    }

    Ok(())
}