use bytes::Bytes;
use monocoque::zmq::proxy::{proxy_steerable, ProxyCommand};
use monocoque::zmq::{DealerSocket, ReqSocket, RouterSocket};
use monocoque_zmtp::pair::PairSocket;
use std::time::Duration;
use tracing::{error, info};
/// Simulated backend worker.
///
/// Waits 500 ms, connects a `DealerSocket` to `127.0.0.1:5556`, then loops
/// forever: receive a multipart message, drop a leading empty frame if there
/// is one, log the last frame as the request, pause 100 ms to simulate work,
/// and send back `[empty frame, every frame except the last, reply text]`.
///
/// Receives that yield an error or no message are skipped without logging.
/// Every loop iteration ends with a 10 ms pause.
///
/// # Errors
/// Returns only when the initial connect or a send fails; otherwise the loop
/// never exits.
async fn worker(id: u32) -> std::io::Result<()> {
    info!("[Worker-{}] Starting", id);

    // Startup delay before dialing the backend endpoint.
    compio::runtime::time::sleep(Duration::from_millis(500)).await;
    let mut socket = DealerSocket::connect("127.0.0.1:5556").await?;

    loop {
        match socket.recv().await {
            Ok(Some(mut frames)) => {
                // Strip the leading empty delimiter frame when present.
                if matches!(frames.first(), Some(head) if head.is_empty()) {
                    frames.remove(0);
                }

                // The final frame is treated as the request payload.
                if let Some(payload) = frames.last() {
                    info!(
                        "[Worker-{}] Processing: {}",
                        id,
                        String::from_utf8_lossy(payload)
                    );
                }

                // Simulated processing time.
                compio::runtime::time::sleep(Duration::from_millis(100)).await;

                // Reply layout: empty frame, the frames that preceded the
                // payload, then the reply text.
                let envelope_len = frames.len().saturating_sub(1);
                let mut response = Vec::with_capacity(envelope_len + 2);
                response.push(Bytes::new());
                response.extend(frames[..envelope_len].iter().cloned());
                response.push(Bytes::from(format!("Processed by worker-{}", id)));
                socket.send(response).await?;
            }
            // Errors and empty receives are deliberately ignored here.
            _ => {}
        }

        // Short pause between receive attempts.
        compio::runtime::time::sleep(Duration::from_millis(10)).await;
    }
}
/// Simulated request/reply client.
///
/// Waits one second, connects a `ReqSocket` to `127.0.0.1:5555`, then sends
/// `requests` numbered requests. After each send it attempts one receive and
/// logs the first frame of the reply if one arrived, then pauses 500 ms
/// before the next request.
///
/// A receive that yields an error or no message is skipped without logging.
///
/// # Errors
/// Returns the error from the connect or from any send.
async fn client(id: u32, requests: u32) -> std::io::Result<()> {
    info!("[Client-{}] Starting", id);

    // Startup delay before dialing the frontend endpoint.
    compio::runtime::time::sleep(Duration::from_secs(1)).await;
    let mut socket = ReqSocket::connect("127.0.0.1:5555").await?;

    for seq in 1..=requests {
        let payload = format!("Request {} from client-{}", seq, id);
        info!("[Client-{}] Sending: {}", id, payload);
        socket.send(vec![Bytes::from(payload)]).await?;

        match socket.recv().await {
            Ok(Some(frames)) => {
                // Only the first frame of the reply is reported.
                if let Some(first) = frames.first() {
                    info!(
                        "[Client-{}] Received: {}",
                        id,
                        String::from_utf8_lossy(first)
                    );
                }
            }
            // Errors and empty receives are deliberately ignored here.
            _ => {}
        }

        // Pace the requests.
        compio::runtime::time::sleep(Duration::from_millis(500)).await;
    }

    info!("[Client-{}] Done", id);
    Ok(())
}
/// Runs the steerable proxy between the client-facing and worker-facing
/// sockets.
///
/// Binds a `RouterSocket` on `127.0.0.1:5555`, a `DealerSocket` on
/// `127.0.0.1:5556` and a `PairSocket` on `127.0.0.1:5557`, then hands all
/// three to `proxy_steerable` with no capture socket and waits for it to
/// finish.
///
/// # Errors
/// Returns the error from any bind or from `proxy_steerable`.
async fn broker() -> std::io::Result<()> {
    info!("🚀 Starting Steerable Broker");

    // Each bind yields a pair; only the socket half is kept.
    let (_, mut clients_side) = RouterSocket::bind("127.0.0.1:5555").await?;
    info!("📡 Frontend (clients): 127.0.0.1:5555");

    let (_, mut workers_side) = DealerSocket::bind("127.0.0.1:5556").await?;
    info!("📡 Backend (workers): 127.0.0.1:5556");

    let (_, mut steering) = PairSocket::bind("127.0.0.1:5557").await?;
    info!("🎮 Control socket: 127.0.0.1:5557");
    info!(" Send commands: PAUSE, RESUME, TERMINATE, STATISTICS\n");

    // No capture socket; the explicit type spells out what `None` stands for.
    let capture: Option<&mut DealerSocket> = None;
    proxy_steerable(&mut clients_side, &mut workers_side, capture, &mut steering).await?;

    Ok(())
}
/// Steers the broker by sending a timed script of commands over the control
/// socket.
///
/// Waits 800 ms, connects a `PairSocket` to `127.0.0.1:5557`, then for each
/// script entry: waits the listed number of seconds, logs the announcement,
/// and sends the command as a single-frame message. The order is `PAUSE`,
/// `RESUME`, `STATISTICS`, `TERMINATE`.
///
/// # Errors
/// Returns the error from the connect or from any send.
async fn controller() -> std::io::Result<()> {
    // (seconds to wait before sending, log announcement, command frame)
    const SCRIPT: [(u64, &str, &str); 4] = [
        (3, "\n[Controller] 🛑 Sending PAUSE command\n", "PAUSE"),
        (2, "\n[Controller] ▶️ Sending RESUME command\n", "RESUME"),
        (3, "\n[Controller] 📊 Sending STATISTICS command\n", "STATISTICS"),
        (1, "\n[Controller] 🛑 Sending TERMINATE command\n", "TERMINATE"),
    ];

    info!("[Controller] Starting");

    // Startup delay before dialing the control endpoint.
    compio::runtime::time::sleep(Duration::from_millis(800)).await;
    let mut control = PairSocket::connect("127.0.0.1:5557").await?;

    for (delay_secs, announcement, command) in SCRIPT {
        compio::runtime::time::sleep(Duration::from_secs(delay_secs)).await;
        info!("{}", announcement);
        control.send(vec![Bytes::from(command)]).await?;
    }

    Ok(())
}
#[compio::main]
async fn main() -> std::io::Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.init();
info!("🎬 Steerable Proxy Demo");
info!("========================");
info!("Demonstrates:");
info!(" • Steerable proxy with control socket");
info!(" • PAUSE/RESUME/TERMINATE commands");
info!(" • STATISTICS reporting");
info!("========================\n");
compio::runtime::spawn(async {
if let Err(e) = broker().await {
error!("Broker: {}", e);
}
})
.detach();
compio::runtime::time::sleep(Duration::from_millis(500)).await;
compio::runtime::spawn(async {
let _ = worker(1).await;
})
.detach();
compio::runtime::spawn(async {
let _ = worker(2).await;
})
.detach();
compio::runtime::time::sleep(Duration::from_millis(500)).await;
let _ = compio::runtime::spawn(async {
let _ = client(1, 10).await;
});
let controller_task = compio::runtime::spawn(async { controller().await });
let _ = controller_task.await;
compio::runtime::time::sleep(Duration::from_secs(1)).await;
info!("\n✅ Demo Complete!");
info!("\nKey Points:");
info!(" • Proxy can be controlled via control socket");
info!(" • PAUSE stops forwarding (messages dropped)");
info!(" • RESUME restarts forwarding");
info!(" • TERMINATE gracefully stops proxy");
info!(" • STATISTICS reports message count");
Ok(())
}