1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//! XPUB/XSUB Subscription Logging Example
//!
//! This example demonstrates the XPUB socket's ability to receive subscription
//! events from subscribers, which is useful for:
//! - Monitoring what topics subscribers are interested in
//! - Building last value caches (LVC)
//! - Implementing smart message brokers
//! - Subscription auditing and analytics
//!
//! Run this example:
//! ```bash
//! cargo run --example xpub_subscription_logging
//! ```
//!
//! Then in another terminal, run a subscriber:
//! ```bash
//! # Using zmq.rs or libzmq
//! python3 -c "
//! import zmq
//! ctx = zmq.Context()
//! sub = ctx.socket(zmq.SUB)
//! sub.connect('tcp://127.0.0.1:5555')
//! sub.subscribe(b'events.')
//! sub.subscribe(b'alerts.')
//! while True:
//! msg = sub.recv_multipart()
//! print(f'Received: {msg}')
//! "
//! ```
use bytes::Bytes;
use monocoque::zmq::prelude::*;
use std::io;
use tracing::{info, Level};
#[compio::main]
async fn main() -> io::Result<()> {
// Initialize logging
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
info!("Starting XPUB subscription logging example");
// Create XPUB socket with verbose mode enabled
let mut xpub = XPubSocket::bind("127.0.0.1:5555").await?;
xpub.set_verbose(true); // Report all subscription messages
let addr = xpub.local_addr()?;
info!("XPUB socket listening on {}", addr);
info!("Socket type: {:?}", xpub.socket_type());
info!("");
info!("Connect subscribers to tcp://127.0.0.1:{}", addr.port());
info!("Waiting for subscription events...");
info!("");
// Track subscription statistics
let mut total_subscribes = 0u64;
let mut total_unsubscribes = 0u64;
loop {
// Accept new subscribers (non-blocking)
if let Err(e) = xpub.accept().await {
if e.kind() != io::ErrorKind::WouldBlock {
eprintln!("Error accepting connection: {}", e);
}
}
// Check for subscription events
match xpub.recv_subscription().await? {
Some(event) => {
use monocoque_core::subscription::SubscriptionEvent;
match event {
SubscriptionEvent::Subscribe(topic) => {
total_subscribes += 1;
info!(
"📥 SUBSCRIBE: {:?} (total subscribes: {})",
String::from_utf8_lossy(&topic),
total_subscribes
);
}
SubscriptionEvent::Unsubscribe(topic) => {
total_unsubscribes += 1;
info!(
"📤 UNSUBSCRIBE: {:?} (total unsubscribes: {})",
String::from_utf8_lossy(&topic),
total_unsubscribes
);
}
}
}
None => {
// No subscription events, publish some test data
if xpub.subscriber_count() > 0 {
// Send test messages to all subscribers
let topics = vec!["events.login", "events.logout", "alerts.error"];
for topic in topics {
let msg = vec![
Bytes::from(topic),
Bytes::from(format!("Test message for {}", topic)),
];
if let Err(e) = xpub.send(msg).await {
eprintln!("Error sending message: {}", e);
}
}
}
// Small delay to avoid busy-waiting
compio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
}