use clap::Args;
use console::style;
use futures::StreamExt;
use serde_json::Value;
use tokio::pin;
use tokio::signal;
use zenoh::{self, key_expr::KeyExpr};
use crate::error::Result;
// Command-line arguments for the topic monitor command.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into help text, so switching to doc comments would
// change the program's `--help` output.
#[derive(Args)]
pub struct MonitorArgs {
// Topic name to monitor (no clap attribute, so it is parsed as a positional
// argument). `execute` subscribes to the key `zenobuf/topic/<topic>`.
topic: String,
// `-t` / `--timestamps`: prefix every printed message with the local
// wall-clock time at which it was received.
#[clap(short, long)]
timestamps: bool,
// `-j` / `--json`: try to parse each payload as JSON and pretty-print it;
// payloads that are not valid JSON are printed as lossy UTF-8 text instead.
#[clap(short, long)]
json: bool,
}
pub async fn execute(args: MonitorArgs) -> Result<()> {
println!(
"{label} {topic}",
label = style("Monitoring topic:").bold(),
topic = args.topic
);
println!("Press Ctrl+C to exit");
let session = zenoh::open(zenoh::config::Config::default()).await?;
let topic_path = format!("zenobuf/topic/{topic}", topic = args.topic);
let key_expr = KeyExpr::try_from(topic_path)?;
let subscriber = session.declare_subscriber(key_expr).await?;
let mut stream = subscriber.stream();
let interrupt = signal::ctrl_c();
pin!(interrupt);
loop {
tokio::select! {
_ = &mut interrupt => {
println!("\nMonitoring stopped");
break;
}
sample = stream.next() => {
if let Some(sample) = sample {
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f");
let payload = sample.payload().to_bytes();
if args.json {
if let Ok(json) = serde_json::from_slice::<Value>(&payload) {
if args.timestamps {
println!("{timestamp} {json}", timestamp = timestamp, json = serde_json::to_string_pretty(&json)?);
} else {
println!("{}", serde_json::to_string_pretty(&json)?);
}
} else {
let payload_str = String::from_utf8_lossy(&payload);
if args.timestamps {
println!("{timestamp} {payload_str}");
} else {
println!("{payload_str}");
}
}
} else {
let payload_str = String::from_utf8_lossy(&payload);
if args.timestamps {
println!("{timestamp} {payload_str}");
} else {
println!("{payload_str}");
}
}
}
}
}
}
Ok(())
}