#![allow(clippy::expect_used, clippy::useless_vec)]
use kode_bridge::{IpcStreamServer, JsonDataSource, Result, StreamMessage, StreamServerConfig};
use rand::RngExt as _;
use serde_json::json;
use std::env;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::signal;
use tracing::info;
/// One traffic sample, as built by `generate_traffic_data`.
///
/// No serde rename attributes are applied, so the serialized keys are the
/// field names as written here.
#[derive(serde::Serialize)]
struct TrafficData {
    /// Whole seconds since the Unix epoch at the time the sample was built.
    timestamp: u64,
    /// Upstream traffic figure (unit is not stated in this file).
    up: u64,
    /// Downstream traffic figure (unit is not stated in this file).
    down: u64,
    /// Number of connections reported for this sample.
    connections: u32,
}
/// One snapshot of system metrics, as built by `generate_system_metrics`.
///
/// No serde rename attributes are applied, so the serialized keys are the
/// field names as written here.
#[derive(serde::Serialize)]
struct SystemMetrics {
    /// Whole seconds since the Unix epoch at the time the snapshot was built.
    timestamp: u64,
    /// CPU usage reading.
    cpu_usage: f64,
    /// Memory usage reading.
    memory_usage: f64,
    /// Disk usage reading.
    disk_usage: f64,
    /// Network receive counter (unit is not stated in this file).
    network_rx: u64,
    /// Network transmit counter (unit is not stated in this file).
    network_tx: u64,
}
/// One log event, as built by `generate_event_log`.
///
/// No serde rename attributes are applied, so the serialized keys are the
/// field names as written here.
#[derive(serde::Serialize)]
struct EventLog {
    /// Whole seconds since the Unix epoch at the time the event was built.
    timestamp: u64,
    /// Severity label such as `"INFO"`, `"WARN"`, `"ERROR"` or `"DEBUG"`.
    level: String,
    /// Human-readable description of the event.
    message: String,
    /// Name of the subsystem the event is attributed to (e.g. `"auth"`).
    source: String,
}
/// Builds one randomized [`TrafficData`] sample and converts it to JSON.
///
/// `up` is drawn from `0..1_000_000`, `down` from `0..5_000_000` and
/// `connections` from `10..110`; the timestamp is the current wall-clock time
/// in whole seconds since the Unix epoch.
///
/// # Errors
///
/// Propagates any error reported by `serde_json::to_value`.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn generate_traffic_data() -> Result<serde_json::Value> {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime before UNIX_EPOCH");

    let mut rng = rand::rng();
    let up: u64 = rng.random_range(0..1_000_000);
    let down: u64 = rng.random_range(0..5_000_000);
    let connections: u32 = rng.random_range(10..110);

    let sample = TrafficData {
        timestamp: since_epoch.as_secs(),
        up,
        down,
        connections,
    };

    let value = serde_json::to_value(sample)?;
    Ok(value)
}
/// Builds one randomized [`SystemMetrics`] snapshot and converts it to JSON.
///
/// The three usage fields are percentages in `0.0..=100.0`, rounded to two
/// decimal places. `network_rx` and `network_tx` are drawn from
/// `0..1_000_000`. The timestamp is the current wall-clock time in whole
/// seconds since the Unix epoch.
///
/// # Errors
///
/// Propagates any error reported by `serde_json::to_value`.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn generate_system_metrics() -> Result<serde_json::Value> {
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime before UNIX_EPOCH")
        .as_secs();

    let mut rng = rand::rng();

    // The previous expression, `(random * 100.0).round() / 100.0`, produced
    // values in [0.0, 1.0] although the value is logged as a percentage
    // ("CPU {:.1}%"). Scale to a percentage first, then round to two decimals.
    let mut random_percent = || {
        let percent = rng.random::<f64>() * 100.0;
        (percent * 100.0).round() / 100.0
    };
    let cpu_usage = random_percent();
    let memory_usage = random_percent();
    let disk_usage = random_percent();

    let network_rx: u64 = rng.random_range(0..1_000_000);
    let network_tx: u64 = rng.random_range(0..1_000_000);

    let metrics = SystemMetrics {
        timestamp,
        cpu_usage,
        memory_usage,
        disk_usage,
        network_rx,
        network_tx,
    };
    Ok(serde_json::to_value(metrics)?)
}
/// Builds one [`EventLog`] entry chosen uniformly at random from a fixed
/// table of canned events and converts it to JSON.
///
/// The timestamp is the current wall-clock time in whole seconds since the
/// Unix epoch.
///
/// # Errors
///
/// Propagates any error reported by `serde_json::to_value`.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn generate_event_log() -> Result<serde_json::Value> {
    // (level, message, source) triples the generator picks from.
    const CANNED_EVENTS: &[(&str, &str, &str)] = &[
        ("INFO", "User logged in successfully", "auth"),
        ("WARN", "High memory usage detected", "system"),
        ("ERROR", "Failed to connect to database", "database"),
        ("INFO", "Backup completed successfully", "backup"),
        ("DEBUG", "Cache refresh initiated", "cache"),
        ("INFO", "New client connected", "network"),
        ("WARN", "Rate limit exceeded", "api"),
    ];

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime before UNIX_EPOCH");

    // The index range is derived from the table length, so it is always in bounds.
    let picked = rand::rng().random_range(0..CANNED_EVENTS.len());
    let (level, message, source) = CANNED_EVENTS[picked];

    let entry = EventLog {
        timestamp: since_epoch.as_secs(),
        level: level.to_owned(),
        message: message.to_owned(),
        source: source.to_owned(),
    };

    let value = serde_json::to_value(entry)?;
    Ok(value)
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
println!("🌊 Streaming IPC Server Example");
println!("===============================");
#[cfg(unix)]
let ipc_path = env::var("CUSTOM_SOCK").unwrap_or_else(|_| "/tmp/stream_server.sock".to_string());
#[cfg(windows)]
let ipc_path = env::var("CUSTOM_PIPE").unwrap_or_else(|_| r"\\.\pipe\stream_server".to_string());
println!("📡 Server will listen on: {}", ipc_path);
let config = StreamServerConfig {
max_connections: 100,
buffer_size: 65536,
write_timeout: Duration::from_secs(10),
max_message_size: 1024 * 1024, enable_logging: true,
shutdown_timeout: Duration::from_secs(5),
broadcast_capacity: 1000,
keepalive_interval: Duration::from_secs(30),
};
let traffic_source = JsonDataSource::new(generate_traffic_data, Duration::from_secs(2));
#[cfg(unix)]
let mut server = IpcStreamServer::with_config(&ipc_path, config)?.with_listener_mode(0o666);
#[cfg(windows)]
let mut server =
IpcStreamServer::with_config(&ipc_path, config)?.with_listener_security_descriptor("D:(A;;GA;;;WD)");
println!("🌟 Server configured for streaming:");
println!(" 📊 Traffic data every 2 seconds");
println!(" 💾 System metrics");
println!(" 📝 Event logs");
println!(" 🔄 Keep-alive pings");
println!();
let server_broadcast = {
println!("📈 Starting additional data generators...");
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
if let Ok(metrics) = generate_system_metrics() {
info!(
"Generated system metrics: CPU {:.1}%",
metrics
.get("cpu_usage")
.and_then(|v| v.as_f64())
.unwrap_or(0.0)
);
}
}
})
};
let event_generator = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(8));
loop {
interval.tick().await;
if let Ok(event) = generate_event_log() {
info!(
"Generated event: {} - {}",
event
.get("level")
.and_then(|v| v.as_str())
.unwrap_or("UNKNOWN"),
event
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("No message")
);
}
}
});
let server_task = tokio::spawn(async move {
if let Err(e) = server.serve_with_source(traffic_source).await {
eprintln!("Server error: {}", e);
}
});
println!("✅ Server started successfully!");
println!("📊 Data streams active:");
println!(" • Traffic data: Every 2 seconds");
println!(" • System metrics: Every 5 seconds");
println!(" • Event logs: Every 8 seconds");
println!(" • Keep-alive: Every 30 seconds");
println!();
println!("📱 Client connection info:");
#[cfg(unix)]
{
println!(
"CUSTOM_SOCK={} cargo run --features=client --example elegant_stream",
ipc_path
);
}
#[cfg(windows)]
{
println!("set CUSTOM_PIPE={}", ipc_path);
println!("cargo run --features=client --example elegant_stream");
}
println!();
let stats_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
interval.tick().await;
info!("📊 Server stats: {} connections, broadcasting data streams", 0);
}
});
println!("🎯 Server is running. Press Ctrl+C to shutdown...");
match signal::ctrl_c().await {
Ok(()) => {
println!("🛑 Shutdown signal received");
}
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
}
}
println!("🔄 Shutting down server...");
server_task.abort();
server_broadcast.abort();
event_generator.abort();
stats_task.abort();
tokio::time::sleep(Duration::from_millis(500)).await;
println!("✅ Server stopped");
Ok(())
}
#[allow(dead_code)]
fn manual_broadcast_example() {
let _data = json!({
"type": "notification",
"message": "Manual broadcast message",
"timestamp": SystemTime::now().duration_since(UNIX_EPOCH).expect("SystemTime before UNIX_EPOCH").as_secs()
});
info!("Manual broadcast sent");
}
/// Constructs one [`StreamMessage`] through each of the `json`, `text` and
/// `binary` constructors, then logs that it did so.
///
/// The constructed messages are discarded; nothing is sent.
// Kept as illustrative code; it is not referenced elsewhere in the visible file.
#[allow(dead_code)]
fn demonstrate_message_types() {
    // JSON payload.
    let payload = json!({
        "type": "data",
        "value": 42
    });
    let _json_msg = StreamMessage::json(&payload);

    // Plain text payload.
    let _text_msg = StreamMessage::text("Hello, streaming clients!");

    // Raw bytes payload.
    let raw_bytes = vec![0x01, 0x02, 0x03, 0x04];
    let _binary_msg = StreamMessage::binary(raw_bytes);

    info!("Demonstrated different message types");
}