use dotenvy::dotenv;
use kode_bridge::{ClientConfig, IpcHttpClient, IpcStreamClient, Result, StreamClientConfig};
use std::env;
use std::time::Duration;
/// One traffic sample, as decoded from the JSON produced by the `/traffic` endpoint.
///
/// NOTE(review): `main` renders both fields through `format_bytes` and labels the
/// derived values with a `/s` suffix, so they are presumably bytes-per-second
/// rates — confirm against the server that emits them.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct TrafficData {
/// Value of the `up` JSON field (presumably upload traffic — confirm).
pub up: u64,
/// Value of the `down` JSON field (presumably download traffic — confirm).
pub down: u64,
}
/// Extension trait that adds a one-call traffic collection helper to a client type.
pub trait TrafficMonitorExt {
/// Returns a future that resolves to the collected [`TrafficData`] samples, or to an error.
///
/// `timeout` is chosen by the caller; the implementation in this file forwards it to
/// the request's `timeout(..)` setting. The returned future is required to be `Send`,
/// which is why the signature spells out `impl Future + Send` instead of `async fn`.
fn monitor_traffic(&self, timeout: Duration) -> impl std::future::Future<Output = Result<Vec<TrafficData>>> + Send;
}
/// Implements traffic collection for the streaming client by issuing a `GET /traffic`
/// request and decoding its JSON results into [`TrafficData`] values.
impl TrafficMonitorExt for IpcStreamClient {
    /// Requests `/traffic` with the given `timeout` and returns every decoded sample.
    ///
    /// Any error reported by the request is returned to the caller unchanged.
    async fn monitor_traffic(&self, timeout: Duration) -> Result<Vec<TrafficData>> {
        let samples: Vec<TrafficData> = self
            .get("/traffic")
            .timeout(timeout)
            .json_results()
            .await?;
        Ok(samples)
    }
}
/// Demonstrates both `kode_bridge` client types against a single IPC endpoint.
///
/// Steps, in order:
/// 1. `IpcHttpClient`: `GET /proxies`, then `POST /configs` with a JSON body.
/// 2. `IpcStreamClient`: collect `/traffic` samples in one batch, then handle
///    `/traffic` lines one at a time until five samples have been printed.
/// 3. Print connection-pool statistics (when the client reports any) and a
///    comparison table of the two clients.
///
/// The endpoint path is read from `CUSTOM_SOCK` (Unix) or `CUSTOM_PIPE` (Windows),
/// falling back to a hard-coded default. `dotenv()` is attempted first and its
/// failure is ignored.
///
/// # Errors
///
/// Returns the first error propagated by a client constructor or by a request.
#[tokio::main]
async fn main() -> Result<()> {
    // Message used by the line callback to end the live stream early; the same text
    // is matched afterwards so that this deliberate stop is not reported as a failure.
    const STOP_SENTINEL: &str = "Reached limit";

    dotenv().ok();

    println!("🏗️ Clean Architecture: Two Client Types");
    println!("========================================");

    #[cfg(unix)]
    let ipc_path = env::var("CUSTOM_SOCK").unwrap_or_else(|_| "/tmp/example.sock".to_string());
    #[cfg(windows)]
    let ipc_path = env::var("CUSTOM_PIPE").unwrap_or_else(|_| r"\\.\pipe\example".to_string());

    // ---- Request/response client -------------------------------------------------
    println!("🔧 Testing IpcHttpClient (Request/Response)");
    let http_config = ClientConfig {
        default_timeout: Duration::from_secs(15),
        enable_pooling: true,
        max_retries: 3,
        retry_delay: Duration::from_millis(200),
        ..Default::default()
    };
    let http_client = IpcHttpClient::with_config(&ipc_path, http_config)?;

    // The per-request timeout below is set explicitly instead of relying on the
    // client's `default_timeout`.
    let response = http_client
        .get("/proxies")
        .timeout(Duration::from_secs(5))
        .send()
        .await?;
    println!("✅ Status: {}", response.status());
    println!("📄 Response length: {} bytes", response.content_length());
    println!("✨ Is success: {}", response.is_success());
    println!("🔍 Is client error: {}", response.is_client_error());
    println!("💥 Is server error: {}", response.is_server_error());

    if response.is_success() {
        match response.json_value() {
            Ok(json_data) => {
                if let Some(proxies_obj) = json_data.as_object() {
                    println!("📊 Found {} proxy groups", proxies_obj.len());
                    // Only the first three names are listed to keep the output short.
                    for (i, (name, _)) in proxies_obj.iter().take(3).enumerate() {
                        println!(" {}. {}", i + 1, name);
                    }
                }
            }
            Err(e) => {
                println!("⚠️ JSON parsing failed: {}", e);
                println!("📄 Raw response: {:?}", response.body()?);
            }
        }
    }

    println!("\n📤 Testing POST request with JSON body");
    let update_data = serde_json::json!({
        "allow-lan": true,
        "bind-address": "*",
        "port": 7890
    });
    let post_response = http_client
        .post("/configs")
        .json_body(&update_data)
        .timeout(Duration::from_secs(5))
        .send()
        .await?;
    println!("✅ POST Status: {}", post_response.status());

    // ---- Streaming client --------------------------------------------------------
    println!("\n🌊 Testing IpcStreamClient (Streaming)");
    let stream_config = StreamClientConfig {
        default_timeout: Duration::from_secs(30),
        max_retries: 3,
        retry_delay: Duration::from_millis(100),
        buffer_size: 16384,
    };
    let stream_client = IpcStreamClient::with_config(&ipc_path, stream_config)?;

    let traffic_data: Vec<TrafficData> = stream_client
        .monitor_traffic(Duration::from_secs(6))
        .await?;
    println!("✅ Collected {} traffic samples", traffic_data.len());

    if let Some(latest) = traffic_data.last() {
        println!(
            "📈 Latest: ⬆️ {} ⬇️ {}",
            format_bytes(latest.up),
            format_bytes(latest.down)
        );
    }

    if traffic_data.len() > 1 {
        // Accumulate in u128 so that many large samples cannot overflow the sum.
        let sample_total = traffic_data.len() as u128;
        let total_up: u128 = traffic_data.iter().map(|t| u128::from(t.up)).sum();
        let total_down: u128 = traffic_data.iter().map(|t| u128::from(t.down)).sum();
        // The mean of u64 values always fits in u64, so narrowing cannot truncate.
        let avg_up = (total_up / sample_total) as u64;
        let avg_down = (total_down / sample_total) as u64;
        println!(
            "📊 Average: ⬆️ {}/s ⬇️ {}/s",
            format_bytes(avg_up),
            format_bytes(avg_down)
        );
    }

    println!("\n🔄 Testing real-time stream processing");
    let mut sample_count = 0;
    stream_client
        .get("/traffic")
        .timeout(Duration::from_secs(3))
        .process_lines(|line| {
            if line.trim().is_empty() {
                return Ok(());
            }
            // Lines that are not valid `TrafficData` JSON are skipped silently.
            if let Ok(traffic) = serde_json::from_str::<TrafficData>(line) {
                sample_count += 1;
                println!(
                    "⚡ Live #{}: {} total/s",
                    sample_count,
                    // Saturate instead of overflowing on pathological counters.
                    format_bytes(traffic.up.saturating_add(traffic.down))
                );
            }
            if sample_count >= 5 {
                Err(STOP_SENTINEL.into())
            } else {
                Ok(())
            }
        })
        .await
        .or_else(|e| {
            // The sentinel marks our own early stop; every other error is real.
            if e.to_string().contains(STOP_SENTINEL) {
                Ok(())
            } else {
                Err(e)
            }
        })?;

    if let Some(stats) = http_client.pool_stats() {
        println!("\n📊 HTTP Client Pool Stats: {}", stats);
    }

    print_architecture_comparison();

    println!("\n🎉 Both clients work together seamlessly!");
    println!("📚 Use IpcHttpClient for: Configuration, API calls, one-time requests");
    println!("🌊 Use IpcStreamClient for: Monitoring, logs, real-time data streams");
    Ok(())
}

/// Prints the fixed table comparing `IpcHttpClient` with `IpcStreamClient`.
fn print_architecture_comparison() {
    // Width, in terminal cells, of each of the three columns.
    const WIDTHS: [usize; 3] = [17, 21, 21];

    // Builds one horizontal border from its left corner, column joint and right corner.
    let rule = |left: &str, joint: &str, right: &str| -> String {
        let spans: Vec<String> = WIDTHS.iter().map(|width| "─".repeat(*width)).collect();
        format!("{}{}{}", left, spans.join(joint), right)
    };

    println!("\n🎯 Architecture Comparison:");
    println!("{}", rule("┌", "┬", "┐"));
    println!("│ Feature         │ IpcHttpClient       │ IpcStreamClient     │");
    println!("{}", rule("├", "┼", "┤"));
    // Rows are padded by hand, counting each emoji as two terminal cells.
    println!("│ Use Case        │ API calls, configs  │ Real-time monitoring│");
    println!("│ Response Type   │ Complete HTTP resp  │ Streaming data      │");
    println!("│ Connection Pool │ ✅ Built-in         │ ❌ Direct connects  │");
    println!("│ JSON Parsing    │ ✅ Type-safe        │ ✅ Stream-optimized │");
    println!("│ Timeout Control │ ✅ Per-request      │ ✅ Per-stream       │");
    println!("│ Error Handling  │ ✅ Rich status info │ ✅ Stream-aware     │");
    // The last cell is one character wider than its column; the text is kept as written.
    println!("│ Best For        │ GET, POST, PUT, etc │ Logs, metrics, events│");
    println!("{}", rule("└", "┴", "┘"));
}
/// Renders a byte count as a compact string using 1024-based units.
///
/// The value is divided by 1024 until it drops below 1024 or the largest unit
/// (`GB`) is reached. Plain bytes are printed without decimals (`"512B"`); every
/// larger unit is printed with one decimal place (`"1.5KB"`).
fn format_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB"];

    let mut scaled = bytes as f64;
    let mut step = 0usize;
    // At most one division per unit above "B"; stop early once the value fits.
    for _ in 1..UNITS.len() {
        if scaled < 1024.0 {
            break;
        }
        scaled /= 1024.0;
        step += 1;
    }

    match step {
        0 => format!("{:.0}{}", scaled, UNITS[step]),
        _ => format!("{:.1}{}", scaled, UNITS[step]),
    }
}