use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::SystemTime;
use chrono::{DateTime, Local};
use clap::Parser;
use epics_base_rs::types::WallTime;
use epics_ca_rs::cli::{
FloatFormat, FloatStyle, IntStyle, PV_NAME_WIDTH, ValueFormat, format_value,
};
use epics_ca_rs::client::{CaChannel, CaClient, ConnectionEvent, EnumReadback};
const VERSION_INFO: &str = concat!(
"\nEPICS Version epics-rs ",
env!("CARGO_PKG_VERSION"),
", CA Protocol version 4.13"
);
#[derive(Parser)]
#[command(
name = "camonitor-rs",
about = "Monitor EPICS PVs for changes",
disable_version_flag = true
)]
struct Args {
#[arg(short = 'V', long, hide = true)]
version: bool,
#[arg(short = 'w', long = "wait")]
timeout: Option<f64>,
#[arg(short = 'm', long, value_name = "MASK")]
event_mask: Option<String>,
#[arg(short = 'p', long)]
priority: Option<u8>,
#[arg(short = 't', long = "timestamp", value_name = "KEY")]
timestamp_key: Option<String>,
#[arg(short = 'n', long = "num-enum")]
enum_as_number: bool,
#[arg(short = '#', long = "max-elements", value_name = "COUNT")]
max_elements: Option<usize>,
#[arg(short = 'S', long = "char-as-string")]
char_array_as_string: bool,
#[arg(short = 'e', long = "format-e", value_name = "PRECISION")]
fmt_e: Option<u32>,
#[arg(short = 'f', long = "format-f", value_name = "PRECISION")]
fmt_f: Option<u32>,
#[arg(short = 'g', long = "format-g", value_name = "PRECISION")]
fmt_g: Option<u32>,
#[arg(short = 's', long = "string-format")]
string_format: bool,
#[arg(long = "lx", conflicts_with_all = ["lo_flag", "lb_flag", "ix_flag", "io_flag", "ib_flag"])]
lx_flag: bool,
#[arg(long = "lo", conflicts_with_all = ["lx_flag", "lb_flag", "ix_flag", "io_flag", "ib_flag"])]
lo_flag: bool,
#[arg(long = "lb", conflicts_with_all = ["lx_flag", "lo_flag", "ix_flag", "io_flag", "ib_flag"])]
lb_flag: bool,
#[arg(long = "0x", conflicts_with_all = ["io_flag", "ib_flag"])]
ix_flag: bool,
#[arg(long = "0o", conflicts_with_all = ["ix_flag", "ib_flag"])]
io_flag: bool,
#[arg(long = "0b", conflicts_with_all = ["ix_flag", "io_flag"])]
ib_flag: bool,
#[arg(short = 'F', long = "field-separator", value_name = "OFS")]
field_separator: Option<char>,
#[arg(required_unless_present_any = ["version"])]
pv_names: Vec<String>,
}
impl Args {
fn value_format(&self) -> ValueFormat {
let mut fmt = ValueFormat::default();
if let Some(p) = self.fmt_e {
fmt.float = FloatFormat {
style: FloatStyle::E,
precision: p,
};
} else if let Some(p) = self.fmt_f {
fmt.float = FloatFormat {
style: FloatStyle::F,
precision: p,
};
} else if let Some(p) = self.fmt_g {
fmt.float = FloatFormat {
style: FloatStyle::G,
precision: p,
};
}
if self.ix_flag || self.lx_flag {
fmt.int_style = IntStyle::Hex;
} else if self.io_flag || self.lo_flag {
fmt.int_style = IntStyle::Oct;
} else if self.ib_flag || self.lb_flag {
fmt.int_style = IntStyle::Bin;
}
fmt.float_as_int = self.lx_flag || self.lo_flag || self.lb_flag;
fmt.enum_as_number = self.enum_as_number;
fmt.char_array_as_string = self.char_array_as_string;
fmt.max_elements = self.max_elements;
if let Some(c) = self.field_separator {
fmt.field_separator = c;
}
fmt
}
}
#[tokio::main]
async fn main() {
let args = Args::parse();
if args.version {
println!("{VERSION_INFO}");
return;
}
let client = CaClient::new().await.expect("failed to create CA client");
let priority = args.priority.unwrap_or(0);
let connected_flags: Vec<Arc<AtomicBool>> = args
.pv_names
.iter()
.map(|_| Arc::new(AtomicBool::new(false)))
.collect();
let fmt = Arc::new(args.value_format());
let req_elems_present = args.max_elements.is_some();
let mask = parse_event_mask(args.event_mask.as_deref());
let spec = parse_timestamp_spec(args.timestamp_key.as_deref());
let float_as_string = args.string_format;
let req_count = args.max_elements.map(|n| n as u32);
let start = SystemTime::now();
let first_server = Arc::new(std::sync::Mutex::new(None::<SystemTime>));
let prev_all_server = Arc::new(std::sync::Mutex::new(None::<SystemTime>));
let prev_all_client = Arc::new(std::sync::Mutex::new(None::<SystemTime>));
let mut handles = Vec::new();
for (i, pv_name) in args.pv_names.iter().enumerate() {
let channel = client.create_channel_with_priority(pv_name, priority);
let pv = pv_name.clone();
let flag = connected_flags[i].clone();
let fmt = fmt.clone();
let first_server = first_server.clone();
let prev_all_server = prev_all_server.clone();
let prev_all_client = prev_all_client.clone();
handles.push(tokio::spawn(async move {
monitor_pv(
channel,
pv,
flag,
fmt,
float_as_string,
req_elems_present,
req_count,
mask,
spec,
start,
first_server,
prev_all_server,
prev_all_client,
)
.await;
}));
}
let timeout_secs = args
.timeout
.unwrap_or_else(epics_ca_rs::cli::env_default_timeout);
tokio::time::sleep(epics_ca_rs::cli::timeout_duration(timeout_secs)).await;
let sep = args.field_separator.unwrap_or(' ');
for (i, pv_name) in args.pv_names.iter().enumerate() {
if !connected_flags[i].load(Ordering::Acquire) {
let name_col = if sep == ' ' {
format!("{pv_name:<width$}", width = PV_NAME_WIDTH)
} else {
pv_name.clone()
};
println!("{name_col}{sep}*** Not connected (PV not found)");
}
}
for handle in handles {
let _ = handle.await;
}
}
fn format_server_timestamp(ts: SystemTime) -> String {
let dt: DateTime<Local> = ts.into();
dt.format("%Y-%m-%d %H:%M:%S%.6f").to_string()
}
fn sevr_to_str(sevr: u16) -> &'static str {
match sevr {
0 => "NO_ALARM",
1 => "MINOR",
2 => "MAJOR",
3 => "INVALID",
_ => "Illegal value",
}
}
fn stat_to_str(stat: u16) -> &'static str {
match stat {
0 => "NO_ALARM",
1 => "READ",
2 => "WRITE",
3 => "HIHI",
4 => "HIGH",
5 => "LOLO",
6 => "LOW",
7 => "STATE",
8 => "COS",
9 => "COMM",
10 => "TIMEOUT",
11 => "HW_LIMIT",
12 => "CALC",
13 => "SCAN",
14 => "LINK",
15 => "SOFT",
16 => "BAD_SUB",
17 => "UDF",
18 => "DISABLE",
19 => "SIMM",
20 => "READ_ACCESS",
21 => "WRITE_ACCESS",
_ => "Illegal value",
}
}
#[allow(clippy::too_many_arguments)]
async fn monitor_pv(
channel: CaChannel,
pv_name: String,
connected_flag: Arc<AtomicBool>,
fmt: Arc<ValueFormat>,
float_as_string: bool,
req_elems_present: bool,
req_count: Option<u32>,
mask: u16,
spec: TimestampSpec,
start: SystemTime,
first_server: Arc<std::sync::Mutex<Option<SystemTime>>>,
prev_all_server: Arc<std::sync::Mutex<Option<SystemTime>>>,
prev_all_client: Arc<std::sync::Mutex<Option<SystemTime>>>,
) {
let mut prev_chan_server: Option<SystemTime> = None;
let mut prev_chan_client: Option<SystemTime> = None;
let mut first_printed = false;
let mut conn_rx = channel.connection_events();
let pv = pv_name.clone();
let flag = connected_flag.clone();
let sep = fmt.field_separator;
tokio::spawn(async move {
while let Ok(evt) = conn_rx.recv().await {
match evt {
ConnectionEvent::Connected => {
flag.store(true, Ordering::Release);
}
ConnectionEvent::Disconnected => {
let now = Local::now().format("%Y-%m-%d %H:%M:%S%.6f");
let name_col = if sep == ' ' {
format!("{pv:<width$}", width = PV_NAME_WIDTH)
} else {
pv.clone()
};
println!("{name_col}{sep}{now} *** disconnected");
}
_ => {}
}
}
});
let enum_readback = if fmt.enum_as_number {
EnumReadback::Numeric
} else {
EnumReadback::Label
};
let Ok(mut monitor) = channel
.subscribe_with_mask_readback_count(0.0, mask, enum_readback, float_as_string, req_count)
.await
else {
return;
};
while let Some(result) = monitor.recv().await {
match result {
Ok(snap) => {
let recv_time = SystemTime::now();
let mut fs = first_server.lock().unwrap();
let time_seg = if spec.kind == TimestampKind::IncrAll {
let mut ps = prev_all_server.lock().unwrap();
let mut pc = prev_all_client.lock().unwrap();
let mut st = TimestampState {
first_server: &mut fs,
first_printed: &mut first_printed,
prev_server: &mut ps,
prev_client: &mut pc,
};
render_timestamp(spec, snap.timestamp, recv_time, start, &mut st)
} else {
let mut st = TimestampState {
first_server: &mut fs,
first_printed: &mut first_printed,
prev_server: &mut prev_chan_server,
prev_client: &mut prev_chan_client,
};
render_timestamp(spec, snap.timestamp, recv_time, start, &mut st)
}
.map(|ts| format!("{ts}{sep}"))
.unwrap_or_default();
drop(fs);
let enum_strings = snap.enums.as_ref().map(|e| e.strings.as_slice());
let rendered = format_value(&snap.value, &fmt, enum_strings, req_elems_present);
let is_scalar = snap.value.count() == 1;
let name_col = if is_scalar && sep == ' ' {
format!("{pv_name:<width$}", width = PV_NAME_WIDTH)
} else {
pv_name.clone()
};
let stat = snap.alarm.status;
let sevr = snap.alarm.severity;
if stat == 0 && sevr == 0 {
println!("{name_col}{sep}{time_seg}{rendered}{sep}{sep}");
} else {
println!(
"{name_col}{sep}{time_seg}{rendered}{sep}{stat_str}{sep}{sevr_str}",
stat_str = stat_to_str(stat),
sevr_str = sevr_to_str(sevr),
);
}
}
Err(e) => {
eprintln!("{pv_name}: {e}");
}
}
}
}
fn parse_event_mask(m: Option<&str>) -> u16 {
const DBE_VALUE: u16 = 1;
const DBE_LOG: u16 = 2;
const DBE_ALARM: u16 = 4;
const DBE_PROPERTY: u16 = 8;
const DEFAULT: u16 = DBE_VALUE | DBE_ALARM;
let Some(s) = m else { return DEFAULT };
let mut mask = 0u16;
for c in s.chars() {
match c {
'v' => mask |= DBE_VALUE,
'a' => mask |= DBE_ALARM,
'l' => mask |= DBE_LOG,
'p' => mask |= DBE_PROPERTY,
_ => {
eprintln!("Invalid argument '{s}' for option '-m' - ignored.");
return DEFAULT;
}
}
}
mask
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum TimestampKind {
Absolute,
Relative,
IncrAll,
IncrChan,
}
#[derive(Clone, Copy)]
struct TimestampSpec {
server: bool,
client: bool,
kind: TimestampKind,
}
fn parse_timestamp_spec(k: Option<&str>) -> TimestampSpec {
let Some(k) = k else {
return TimestampSpec {
server: true,
client: false,
kind: TimestampKind::Absolute,
};
};
let mut spec = TimestampSpec {
server: false,
client: false,
kind: TimestampKind::Absolute,
};
for c in k.chars() {
match c {
's' => spec.server = true,
'c' => spec.client = true,
'r' => spec.kind = TimestampKind::Relative,
'i' => spec.kind = TimestampKind::IncrAll,
'I' => spec.kind = TimestampKind::IncrChan,
_ => {} }
}
spec
}
struct TimestampState<'a> {
first_server: &'a mut Option<SystemTime>,
first_printed: &'a mut bool,
prev_server: &'a mut Option<SystemTime>,
prev_client: &'a mut Option<SystemTime>,
}
fn render_timestamp(
spec: TimestampSpec,
server_ts: WallTime,
client_ts: SystemTime,
start: SystemTime,
state: &mut TimestampState<'_>,
) -> Option<String> {
let server_ts: SystemTime = server_ts.into();
fn secs_between(a: SystemTime, b: SystemTime) -> f64 {
match a.duration_since(b) {
Ok(d) => d.as_secs_f64(),
Err(e) => -e.duration().as_secs_f64(),
}
}
if state.first_server.is_none() {
*state.first_server = Some(server_ts);
}
let print_abs = spec.kind == TimestampKind::Absolute || !*state.first_printed;
let server_ref = state.first_server.unwrap_or(server_ts);
let render_one = |ts: SystemTime, is_server: bool, prev: Option<SystemTime>| -> String {
if print_abs {
return format_server_timestamp(ts);
}
match spec.kind {
TimestampKind::Absolute => format_server_timestamp(ts),
TimestampKind::Relative => {
let r = if is_server { server_ref } else { start };
format!("{:+12.6}", secs_between(ts, r))
}
TimestampKind::IncrAll | TimestampKind::IncrChan => {
format!("{:+12.6}", secs_between(ts, prev.unwrap_or(ts)))
}
}
};
const DIFF_PREFIX: &str = " "; let prefix = if print_abs { "" } else { DIFF_PREFIX };
let mut out = String::new();
if spec.server {
out.push_str(prefix);
out.push_str(&render_one(server_ts, true, *state.prev_server));
}
if spec.client {
out.push_str(prefix);
out.push('(');
out.push_str(&render_one(client_ts, false, *state.prev_client));
out.push(')');
}
*state.prev_server = Some(server_ts);
*state.prev_client = Some(client_ts);
if print_abs {
*state.first_printed = true;
}
if out.is_empty() { None } else { Some(out) }
}
#[cfg(test)]
mod tests {
use super::{
TimestampKind, TimestampSpec, TimestampState, parse_event_mask, parse_timestamp_spec,
render_timestamp,
};
use std::time::{Duration, SystemTime};
#[test]
fn mask_default_is_value_alarm() {
assert_eq!(parse_event_mask(None), 1 | 4);
}
#[test]
fn mask_invalid_letter_reverts_to_value_alarm() {
assert_eq!(parse_event_mask(Some("xyz")), 1 | 4);
assert_eq!(parse_event_mask(Some("vx")), 1 | 4);
}
#[test]
fn mask_empty_selects_no_events() {
assert_eq!(parse_event_mask(Some("")), 0);
}
#[test]
fn mask_parses_dbe_letters() {
assert_eq!(parse_event_mask(Some("a")), 4, "alarm-only");
assert_eq!(parse_event_mask(Some("v")), 1, "value-only");
assert_eq!(parse_event_mask(Some("p")), 8, "property-only");
assert_eq!(parse_event_mask(Some("val")), 1 | 4 | 2, "value+alarm+log");
}
#[test]
fn timestamp_spec_parses_keys() {
let s = parse_timestamp_spec(None);
assert!(s.server && !s.client && matches!(s.kind, TimestampKind::Absolute));
let s = parse_timestamp_spec(Some("s"));
assert!(s.server && !s.client);
let s = parse_timestamp_spec(Some("c"));
assert!(!s.server && s.client);
let s = parse_timestamp_spec(Some("sc"));
assert!(s.server && s.client);
let s = parse_timestamp_spec(Some("n"));
assert!(!s.server && !s.client, "n selects no source");
let s = parse_timestamp_spec(Some("cr"));
assert!(!s.server && s.client && matches!(s.kind, TimestampKind::Relative));
assert!(matches!(
parse_timestamp_spec(Some("i")).kind,
TimestampKind::IncrAll
));
assert!(matches!(
parse_timestamp_spec(Some("I")).kind,
TimestampKind::IncrChan
));
}
fn ts_state<'a>(
first_server: &'a mut Option<SystemTime>,
first_printed: &'a mut bool,
prev_server: &'a mut Option<SystemTime>,
prev_client: &'a mut Option<SystemTime>,
) -> TimestampState<'a> {
TimestampState {
first_server,
first_printed,
prev_server,
prev_client,
}
}
fn srv_diff(d: f64) -> String {
format!(" {d:+12.6}")
}
fn cli_diff(d: f64) -> String {
format!(" ({d:+12.6})")
}
#[test]
fn timestamp_first_event_is_absolute_then_diffs() {
let start = SystemTime::UNIX_EPOCH;
let t1 = start + Duration::from_secs(10);
let t2 = start + Duration::from_secs(13);
let srv = |kind| TimestampSpec {
server: true,
client: false,
kind,
};
let none = TimestampSpec {
server: false,
client: false,
kind: TimestampKind::Absolute,
};
let (mut fsv, mut fp, mut ps, mut pc) = (None, false, None, None);
let mut st = ts_state(&mut fsv, &mut fp, &mut ps, &mut pc);
assert!(render_timestamp(none, t1.into(), t1, start, &mut st).is_none());
let (mut fsv, mut fp, mut ps, mut pc) = (None, false, None, None);
let mut st = ts_state(&mut fsv, &mut fp, &mut ps, &mut pc);
let first = render_timestamp(srv(TimestampKind::Relative), t1.into(), t1, start, &mut st);
assert_eq!(
first.as_deref(),
Some(super::format_server_timestamp(t1).as_str()),
"first event must render the absolute server stamp"
);
let second = render_timestamp(srv(TimestampKind::Relative), t2.into(), t2, start, &mut st);
assert_eq!(second.as_deref(), Some(srv_diff(3.0).as_str()));
}
#[test]
fn timestamp_server_incremental_diffs_against_prev() {
let start = SystemTime::UNIX_EPOCH;
let t1 = start + Duration::from_secs(10);
let t2 = start + Duration::from_secs(13);
let srv = TimestampSpec {
server: true,
client: false,
kind: TimestampKind::IncrAll,
};
let (mut fsv, mut fp, mut ps, mut pc) = (None, false, None, None);
let mut st = ts_state(&mut fsv, &mut fp, &mut ps, &mut pc);
assert_eq!(
render_timestamp(srv, t1.into(), t1, start, &mut st).as_deref(),
Some(super::format_server_timestamp(t1).as_str()),
"leading incremental event is absolute"
);
assert_eq!(
render_timestamp(srv, t2.into(), t2, start, &mut st).as_deref(),
Some(srv_diff(3.0).as_str()),
"second incremental event diffs against the prior stamp"
);
}
#[test]
fn timestamp_backward_step_renders_negative_delta() {
let start = SystemTime::UNIX_EPOCH;
let t1 = start + Duration::from_secs(10);
let t2 = start + Duration::from_secs(7); let srv = TimestampSpec {
server: true,
client: false,
kind: TimestampKind::IncrAll,
};
let (mut fsv, mut fp, mut ps, mut pc) = (None, false, None, None);
let mut st = ts_state(&mut fsv, &mut fp, &mut ps, &mut pc);
render_timestamp(srv, t1.into(), t1, start, &mut st);
let second = render_timestamp(srv, t2.into(), t2, start, &mut st);
assert_eq!(
second.as_deref(),
Some(srv_diff(-3.0).as_str()),
"backward step must render as a negative delta, not +3"
);
assert!(
second.as_deref().unwrap().contains("-3.000000"),
"delta carries a minus sign: {second:?}"
);
}
#[test]
fn timestamp_client_relative_uses_program_start_after_first() {
let start = SystemTime::UNIX_EPOCH;
let c1 = start + Duration::from_secs(4);
let c2 = start + Duration::from_secs(10);
let cr = TimestampSpec {
server: false,
client: true,
kind: TimestampKind::Relative,
};
let (mut fsv, mut fp, mut ps, mut pc) = (None, false, None, None);
let mut st = ts_state(&mut fsv, &mut fp, &mut ps, &mut pc);
let first = render_timestamp(cr, start.into(), c1, start, &mut st);
assert_eq!(
first.as_deref(),
Some(format!("({})", super::format_server_timestamp(c1)).as_str())
);
let second = render_timestamp(cr, start.into(), c2, start, &mut st);
assert_eq!(second.as_deref(), Some(cli_diff(10.0).as_str()));
}
#[test]
fn timestamp_both_sources_render_independently_after_first() {
let start = SystemTime::UNIX_EPOCH;
let s1 = start + Duration::from_secs(5);
let c1 = start + Duration::from_secs(4);
let s2 = start + Duration::from_secs(8);
let c2 = start + Duration::from_secs(10);
let both = TimestampSpec {
server: true,
client: true,
kind: TimestampKind::Relative,
};
let (mut fsv, mut fp, mut ps, mut pc) = (None, false, None, None);
let mut st = ts_state(&mut fsv, &mut fp, &mut ps, &mut pc);
let first = render_timestamp(both, s1.into(), c1, start, &mut st);
assert_eq!(
first.as_deref(),
Some(
format!(
"{}({})",
super::format_server_timestamp(s1),
super::format_server_timestamp(c1)
)
.as_str()
)
);
let second = render_timestamp(both, s2.into(), c2, start, &mut st);
assert_eq!(
second.as_deref(),
Some(format!("{}{}", srv_diff(3.0), cli_diff(10.0)).as_str())
);
}
}