use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use dashmap::DashMap;
use epics_rs::base::server::snapshot::DbrClass;
use epics_rs::base::types::{DbFieldType, EpicsValue};
use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
use epics_rs::pva::client_native::PvaClient;
use epics_rs::pva::proto::ByteOrder;
use epics_rs::pva::pvdata::encode::{encode_pv_field, encode_type_desc};
use epics_rs::pva::pvdata::{PvField, ScalarType, ScalarValue, TypedScalarArray};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use archiver_core::registry::{Protocol, PvRecord, PvRegistry, PvStatus, SampleMode};
use archiver_core::storage::traits::{AppendMeta, IngestFlushResult, StoragePlugin};
use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
use crate::policy::PolicyConfig;
/// Upper bound for the initial connection phase when a PV is added. Despite the `CA_` prefix it
/// is applied to the CA `wait_connected` call as well as to the PVA connect and initial get.
const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Timeout for re-establishing a lost connection.
/// NOTE(review): not referenced in the part of the file reviewed here — presumably used by the
/// CA monitor/scan loops further down; confirm.
const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Pause between attempts when a subscribe / introspection call fails and is retried.
const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
/// Oldest IOC timestamp that is still accepted, in whole seconds since the Unix epoch
/// (662_688_000 s corresponds to 1991-01-01T00:00:00Z).
const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000;

/// Decides whether an IOC-supplied timestamp is plausible enough to be archived.
///
/// `ts` is accepted when both of the following hold (all comparisons use whole seconds):
/// * it is not earlier than [`PAST_CUTOFF_UNIX_SECS`]; timestamps before the Unix epoch are
///   always rejected, and
/// * it lies within `drift_secs` seconds of `now`, in either direction.
///
/// A `now` that is itself before the Unix epoch is treated as second 0.
fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
    // Whole seconds since the Unix epoch; `None` when the instant predates the epoch.
    let epoch_secs = |instant: SystemTime| {
        instant
            .duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            .map(|elapsed| elapsed.as_secs() as i64)
    };

    // A pre-epoch sample maps to `i64::MIN`, which the cutoff check below always rejects.
    let sample_secs = epoch_secs(ts).unwrap_or(i64::MIN);
    if sample_secs < PAST_CUTOFF_UNIX_SECS {
        return false;
    }

    let now_secs = epoch_secs(now).unwrap_or(0);
    let distance = (sample_secs - now_secs).unsigned_abs();
    distance <= drift_secs
}
/// Coarse connection lifecycle state recorded for each archived PV.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PvConnectionState {
    /// Starting state of a freshly created [`ConnectionInfo`]; this is the `Default` variant.
    #[default]
    Idle,
    /// A connection attempt is in progress and no data has been received yet.
    Connecting,
    /// Data is being received from the PV.
    Connected,
    /// The PV was connected earlier but is currently unreachable.
    Disconnected,
}

impl PvConnectionState {
    /// Returns the variant name as a static string (`"Idle"`, `"Connecting"`, `"Connected"`
    /// or `"Disconnected"`).
    pub fn as_str(self) -> &'static str {
        match self {
            PvConnectionState::Idle => "Idle",
            PvConnectionState::Connecting => "Connecting",
            PvConnectionState::Connected => "Connected",
            PvConnectionState::Disconnected => "Disconnected",
        }
    }
}
/// Connection bookkeeping for one archived PV, shared between the manager and the PV's
/// background tasks behind an `Arc<Mutex<_>>`.
#[derive(Debug, Clone, Default)]
pub struct ConnectionInfo {
    /// Local time at which the first event was observed; `None` until then. The event handlers
    /// in this file only ever set it when it is still `None`.
    pub connected_since: Option<SystemTime>,
    /// Local receive time of the most recent event. The PVA scan loop resets it to `None` when
    /// a get fails, which makes the next successful event count as "first after connect".
    pub last_event_time: Option<SystemTime>,
    /// `true` while the PV is considered connected.
    pub is_connected: bool,
    /// Finer-grained lifecycle state; starts as [`PvConnectionState::Idle`].
    pub state: PvConnectionState,
}
/// Per-PV statistics kept in atomics so they can be bumped from the sampling tasks without
/// taking a lock.
///
/// Two sentinel conventions apply and are translated to `Option` by [`PvCountersSnapshot`]:
/// the `*_unix_secs` fields use `0` for "never", and `latest_observed_dbr` uses `-1` for
/// "none observed".
#[derive(Debug)]
pub struct PvCounters {
    /// Number of events received from the PV, counted before any filtering.
    pub events_received: AtomicU64,
    /// Number of events stored. NOTE(review): not updated in the part of the file reviewed
    /// here — presumably maintained by the storage writer; confirm.
    pub events_stored: AtomicU64,
    /// Unix seconds of the first event; `0` until one is recorded.
    pub first_event_unix_secs: AtomicI64,
    /// Samples dropped because the sample channel was full.
    pub buffer_overflow_drops: AtomicU64,
    /// Samples dropped because their timestamp was outside the accepted window.
    pub timestamp_drops: AtomicU64,
    /// Samples dropped because the value could not be mapped to an archiver value.
    pub type_change_drops: AtomicU64,
    /// Number of connected-to-disconnected transitions.
    pub disconnect_count: AtomicU64,
    /// Unix seconds of the most recent disconnect; `0` when there has been none.
    pub last_disconnect_unix_secs: AtomicI64,
    /// Failed calls that are retried (subscribe, introspection, scan gets).
    pub transient_error_count: AtomicU64,
    /// Most recently observed DBR type code; `-1` when none has been observed.
    pub latest_observed_dbr: AtomicI32,
    /// Failed or timed-out metadata (PREC/EGU) fetches.
    pub metadata_fetch_failures: AtomicU64,
    /// Storage appends that timed out. NOTE(review): not updated in the part of the file
    /// reviewed here; confirm against the writer.
    pub storage_append_timeouts: AtomicU64,
}

impl Default for PvCounters {
    /// All counters start at zero except `latest_observed_dbr`, which starts at the `-1`
    /// sentinel (the reason this impl cannot be derived).
    fn default() -> Self {
        let zero_u64 = || AtomicU64::new(0);
        let zero_i64 = || AtomicI64::new(0);
        Self {
            events_received: zero_u64(),
            events_stored: zero_u64(),
            first_event_unix_secs: zero_i64(),
            buffer_overflow_drops: zero_u64(),
            timestamp_drops: zero_u64(),
            type_change_drops: zero_u64(),
            disconnect_count: zero_u64(),
            last_disconnect_unix_secs: zero_i64(),
            transient_error_count: zero_u64(),
            latest_observed_dbr: AtomicI32::new(-1),
            metadata_fetch_failures: zero_u64(),
            storage_append_timeouts: zero_u64(),
        }
    }
}

/// Plain-value copy of [`PvCounters`] with the sentinel values turned into `Option`s.
#[derive(Debug, Clone)]
pub struct PvCountersSnapshot {
    pub events_received: u64,
    pub events_stored: u64,
    /// `None` when no first event has been recorded.
    pub first_event_unix_secs: Option<i64>,
    pub buffer_overflow_drops: u64,
    pub timestamp_drops: u64,
    pub type_change_drops: u64,
    pub disconnect_count: u64,
    /// `None` when no disconnect has been recorded.
    pub last_disconnect_unix_secs: Option<i64>,
    pub transient_error_count: u64,
    /// `None` when no DBR type has been observed.
    pub latest_observed_dbr: Option<i32>,
    pub metadata_fetch_failures: u64,
    pub storage_append_timeouts: u64,
}

impl From<&PvCounters> for PvCountersSnapshot {
    /// Reads every counter with relaxed ordering. The fields are loaded one after another,
    /// so the snapshot is not an atomic view across fields.
    fn from(c: &PvCounters) -> Self {
        let count = |cell: &AtomicU64| cell.load(Ordering::Relaxed);
        // `0` is the "never happened" sentinel for the unix-seconds fields.
        let unix_secs = |cell: &AtomicI64| Some(cell.load(Ordering::Relaxed)).filter(|&s| s != 0);

        let first_event_unix_secs = unix_secs(&c.first_event_unix_secs);
        let last_disconnect_unix_secs = unix_secs(&c.last_disconnect_unix_secs);

        Self {
            events_received: count(&c.events_received),
            events_stored: count(&c.events_stored),
            first_event_unix_secs,
            buffer_overflow_drops: count(&c.buffer_overflow_drops),
            timestamp_drops: count(&c.timestamp_drops),
            type_change_drops: count(&c.type_change_drops),
            disconnect_count: count(&c.disconnect_count),
            last_disconnect_unix_secs,
            transient_error_count: count(&c.transient_error_count),
            // `-1` is the "none observed" sentinel.
            latest_observed_dbr: Some(c.latest_observed_dbr.load(Ordering::Relaxed))
                .filter(|&code| code != -1),
            metadata_fetch_failures: count(&c.metadata_fetch_failures),
            storage_append_timeouts: count(&c.storage_append_timeouts),
        }
    }
}
/// Everything the manager keeps for one actively archived PV. Removing the handle from
/// `ChannelManager::channels` and cancelling `cancel_token` is how archiving is stopped.
struct PvHandle {
    /// The CA channel for CA PVs; `None` for PVs archived over PVA.
    channel: Option<CaChannel>,
    /// Root cancellation token of all background tasks spawned for this PV; per-field tokens
    /// are created from it with `child_token()`.
    cancel_token: CancellationToken,
    /// Archiver type the PV was registered with (stored but not read back, hence the allow).
    #[allow(dead_code)]
    dbr_type: ArchDbType,
    /// Connection bookkeeping shared with the sampling tasks.
    conn_info: Arc<Mutex<ConnectionInfo>>,
    /// Latest values of the extra archived fields, shared with the sampling tasks.
    extras: Arc<ExtraFieldsCache>,
    /// Cancellation token of each running extra-field monitor, keyed by field name.
    field_tokens: Arc<DashMap<String, CancellationToken>>,
    /// Serializes changes to the set of extra-field monitors (initial setup vs.
    /// `update_archive_fields`).
    update_lock: Arc<tokio::sync::Mutex<()>>,
    /// Statistics shared with the sampling tasks.
    counters: Arc<PvCounters>,
}
/// Latest stringified value of each extra archived field, keyed by field name.
type ExtraFieldsCache = DashMap<String, String>;
/// Capacity of the bounded channel that carries samples from the sampling tasks to the
/// receiver returned by `ChannelManager::new`.
const SAMPLE_CHANNEL_CAPACITY: usize = 500_000;
/// Scope guard that removes `key` from `map` when dropped, so an in-flight marker is cleared on
/// every exit path (including early returns via `?`).
struct PendingGuard<'a> {
    /// Map holding the in-flight markers.
    map: &'a DashMap<String, ()>,
    /// Marker to remove on drop.
    key: String,
}
impl Drop for PendingGuard<'_> {
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}
/// Owns the CA and PVA clients and the set of actively archived PVs, and spawns the background
/// tasks that turn PV updates into [`PvSample`]s on the sample channel.
pub struct ChannelManager {
    /// Channel Access client used for CA PVs and extra-field monitors.
    ca_client: CaClient,
    /// PV Access client used for PVA PVs.
    pva_client: PvaClient,
    /// Handles of the PVs that currently have sampling tasks running, keyed by PV name.
    channels: DashMap<String, PvHandle>,
    /// PV names with an `archive_pv` call currently in flight.
    pending_archives: DashMap<String, ()>,
    /// Per-PV async mutexes that serialize archive / pause / resume / stop / destroy for one
    /// PV name. Entries are created on demand and are not removed in the code reviewed here.
    op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
    /// Storage backend (stored but not read in the part of the file reviewed here).
    #[allow(dead_code)]
    storage: Arc<dyn StoragePlugin>,
    /// Persistent registry of PV records (status, type, sample mode, metadata).
    registry: Arc<PvRegistry>,
    /// Sending half of the sample channel; cloned into every sampling task.
    sample_tx: mpsc::Sender<PvSample>,
    /// Optional policy configuration; a matching policy overrides the requested sample mode.
    policy: Option<PolicyConfig>,
    /// Maximum accepted difference, in seconds, between an IOC timestamp and local time.
    server_ioc_drift_secs: u64,
}
/// One sample travelling from a sampling task to the consumer of the sample channel.
pub struct PvSample {
    /// Name of the PV the sample belongs to.
    pub pv_name: String,
    /// Archiver type the PV is registered with.
    pub dbr_type: ArchDbType,
    /// Timestamped value, including any attached extra fields.
    pub sample: ArchiverSample,
    /// Element count observed for this sample, when known.
    pub element_count: Option<i32>,
    /// Counters of the originating PV, when available to the producer.
    /// NOTE(review): presumably so the consumer can update per-PV statistics — confirm.
    pub counters: Option<Arc<PvCounters>>,
}
impl ChannelManager {
/// Creates a manager with the default server/IOC clock-drift tolerance of 30 minutes.
///
/// Returns the manager together with the receiving half of the sample channel.
///
/// # Errors
/// Propagates any error from [`Self::new_with_drift`].
pub async fn new(
    storage: Arc<dyn StoragePlugin>,
    registry: Arc<PvRegistry>,
    policy: Option<PolicyConfig>,
) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
    const DEFAULT_DRIFT_SECS: u64 = 30 * 60;
    Self::new_with_drift(storage, registry, policy, DEFAULT_DRIFT_SECS).await
}
/// Creates a manager with an explicit server/IOC clock-drift tolerance.
///
/// Builds the CA and PVA clients and the bounded sample channel; no PV is archived yet.
/// Returns the manager together with the receiving half of the sample channel.
///
/// # Errors
/// Fails when either protocol client cannot be created; the client's error text is carried
/// over into the returned error.
pub async fn new_with_drift(
    storage: Arc<dyn StoragePlugin>,
    registry: Arc<PvRegistry>,
    policy: Option<PolicyConfig>,
    server_ioc_drift_secs: u64,
) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
    let ca_client = match CaClient::new().await {
        Ok(client) => client,
        Err(e) => return Err(anyhow::anyhow!("{e}")),
    };
    let pva_client = match PvaClient::new() {
        Ok(client) => client,
        Err(e) => return Err(anyhow::anyhow!("{e}")),
    };
    let (sample_tx, sample_rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);

    let manager = Self {
        ca_client,
        pva_client,
        channels: DashMap::new(),
        pending_archives: DashMap::new(),
        op_locks: DashMap::new(),
        storage,
        registry,
        sample_tx,
        policy,
        server_ioc_drift_secs,
    };
    Ok((manager, sample_rx))
}
/// Returns the per-PV operation mutex for `pv_name`, creating it on first use.
///
/// The lookup is tried first so the common case needs no `String` allocation for the key.
fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
    // The map guard from `get` is released at the end of this statement, before `entry`
    // below takes a write lock on the same map.
    let cached = self.op_locks.get(pv_name).map(|slot| slot.value().clone());
    match cached {
        Some(lock) => lock,
        None => {
            let slot = self
                .op_locks
                .entry(pv_name.to_string())
                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
            slot.value().clone()
        }
    }
}
/// Restarts archiving for every registry record whose status is `Active`.
///
/// Alias rows (records with `alias_for` set) are skipped on purpose. A record that fails to
/// start is logged and marked `PvStatus::Error` in the registry; the remaining records are
/// still processed. Afterwards the `archiver_pvs_active` gauge is set to the number of PVs
/// that were restored.
///
/// Returns the number of PVs restored.
///
/// # Errors
/// Fails when the registry cannot be queried, or when marking a failed PV as `Error` fails
/// (which aborts the restore at that record).
pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
    let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
    let total = active_pvs.len() as u64;
    info!(total, "Restoring PVs from registry");

    let mut restored = 0u64;
    // Counted explicitly: deriving it from `total - restored` would report intentionally
    // skipped alias rows as restore failures.
    let mut failed = 0u64;
    for record in active_pvs {
        if record.alias_for.is_some() {
            warn!(
                pv = record.pv_name,
                target = record.alias_for.as_deref(),
                "Skipping alias row in restore; aliases are routed, not archived"
            );
            continue;
        }
        match self.start_archiving_internal(&record).await {
            Ok(()) => restored += 1,
            Err(e) => {
                failed += 1;
                warn!(pv = record.pv_name, "Failed to restore PV: {e}");
                self.registry.set_status(&record.pv_name, PvStatus::Error)?;
            }
        }
    }

    metrics::gauge!("archiver_pvs_active").set(restored as f64);
    if failed > 0 {
        warn!(restored, failed, "Some PVs failed to restore");
    }
    Ok(restored)
}
/// Starts archiving `pv_name` over the given protocol.
///
/// The call is serialized with every other operation on the same PV name through the
/// per-PV operation lock, and marks the PV as pending for its whole duration.
///
/// # Errors
/// Fails when the PV is already archived, when another archive operation for it is marked
/// as in progress, or when the protocol-specific setup fails.
pub async fn archive_pv(
    &self,
    pv_name: &str,
    sample_mode: &SampleMode,
    protocol: Protocol,
) -> anyhow::Result<()> {
    let op_lock = self.op_lock(pv_name);
    let _serialized = op_lock.lock().await;

    if self.channels.contains_key(pv_name) {
        anyhow::bail!("PV {pv_name} is already being archived");
    }

    let already_pending = self
        .pending_archives
        .insert(pv_name.to_string(), ())
        .is_some();
    if already_pending {
        anyhow::bail!("PV {pv_name} archive operation already in progress");
    }
    // Clears the pending marker on every exit path from here on.
    let _pending = PendingGuard {
        map: &self.pending_archives,
        key: pv_name.to_string(),
    };

    match protocol {
        Protocol::Ca => self.archive_pv_inner(pv_name, sample_mode).await,
        Protocol::Pva => self.archive_pv_inner_pva(pv_name, sample_mode).await,
    }
}
async fn archive_pv_inner(
&self,
pv_name: &str,
sample_mode: &SampleMode,
) -> anyhow::Result<()> {
if self.channels.contains_key(pv_name) {
anyhow::bail!("PV {pv_name} is already being archived");
}
let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
if let Some(p) = policy.find_policy(pv_name) {
(p.to_sample_mode(), Some(p.policy_name().to_string()))
} else {
(sample_mode.clone(), None)
}
} else {
(sample_mode.clone(), None)
};
let channel = self.ca_client.create_channel(pv_name);
channel
.wait_connected(CA_CONNECT_TIMEOUT)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
let info = self
.ca_client
.cainfo(pv_name)
.await
.map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
let dbr_type = dbr_field_to_arch_type(info.native_type);
let element_count = info.element_count as i32;
self.registry
.register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
if let Some(ref name) = matched_policy_name {
self.registry.update_policy_name(pv_name, Some(name))?;
}
let record = self
.registry
.get_pv(pv_name)?
.ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
self.start_archiving_internal(&record).await?;
metrics::gauge!("archiver_pvs_active").increment(1.0);
info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
Ok(())
}
async fn archive_pv_inner_pva(
&self,
pv_name: &str,
sample_mode: &SampleMode,
) -> anyhow::Result<()> {
if self.channels.contains_key(pv_name) {
anyhow::bail!("PV {pv_name} is already being archived");
}
let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
if let Some(p) = policy.find_policy(pv_name) {
(p.to_sample_mode(), Some(p.policy_name().to_string()))
} else {
(sample_mode.clone(), None)
}
} else {
(sample_mode.clone(), None)
};
let connect = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvconnect(pv_name))
.await
.map_err(|_| anyhow::anyhow!("PVA connect to {pv_name} timed out"))?
.map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name} via PVA: {e}"))?;
debug!(pv = pv_name, server = %connect, "PVA channel connected");
let initial = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvget_full(pv_name))
.await
.map_err(|_| anyhow::anyhow!("PVA pvget for {pv_name} timed out"))?
.map_err(|e| anyhow::anyhow!("Failed to pvget {pv_name}: {e}"))?;
let (dbr_type, element_count) =
pv_field_to_arch_db_type(&initial.value).ok_or_else(|| {
anyhow::anyhow!("PV {pv_name}: empty PVA scalar-array; cannot infer element type")
})?;
self.registry.register_pv_with_protocol(
pv_name,
dbr_type,
&effective_mode,
element_count,
Protocol::Pva,
)?;
if let Some(ref name) = matched_policy_name {
self.registry.update_policy_name(pv_name, Some(name))?;
}
let (prec, egu) = pv_field_extract_display(&initial.value);
if (prec.is_some() || egu.is_some())
&& let Err(e) = self
.registry
.update_metadata(pv_name, prec.as_deref(), egu.as_deref())
{
debug!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
}
let record = self
.registry
.get_pv(pv_name)?
.ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
self.start_archiving_internal(&record).await?;
metrics::gauge!("archiver_pvs_active").increment(1.0);
info!(
pv = pv_name,
?dbr_type,
element_count,
protocol = "pva",
"Started archiving"
);
Ok(())
}
/// Creates the runtime state for `record` and spawns its sampling task.
///
/// PVA records are delegated to `start_archiving_internal_pva`. For CA records this inserts
/// a fresh `PvHandle` into `channels` (replacing any entry with the same name), starts one
/// extra-field monitor per entry in `record.archive_fields`, and spawns either the monitor
/// loop or the scan loop depending on `record.sample_mode`.
///
/// This function does not touch the `archiver_pvs_active` gauge; callers account for it.
/// In the CA path shown here there is no failing step, so the result is always `Ok(())`.
async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
    if record.protocol == Protocol::Pva {
        return self.start_archiving_internal_pva(record).await;
    }
    let pv_name = record.pv_name.clone();
    let dbr_type = record.dbr_type;
    let element_count = record.element_count;
    let channel = self.ca_client.create_channel(&pv_name);
    let cancel_token = CancellationToken::new();
    let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
    let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
    let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
    let update_lock = Arc::new(tokio::sync::Mutex::new(()));
    let counters = Arc::new(PvCounters::default());
    // Taken before the handle becomes visible in `channels`, so an `update_archive_fields`
    // call that finds the handle waits until the initial field monitors are registered.
    let _guard = update_lock.lock().await;
    self.channels.insert(
        pv_name.clone(),
        PvHandle {
            channel: Some(channel.clone()),
            cancel_token: cancel_token.clone(),
            dbr_type,
            conn_info: conn_info.clone(),
            extras: extras.clone(),
            field_tokens: field_tokens.clone(),
            update_lock: update_lock.clone(),
            counters: counters.clone(),
        },
    );
    // One child token per extra field: a single field can be stopped individually, while
    // cancelling the parent token stops all of them.
    for field in &record.archive_fields {
        let child = cancel_token.child_token();
        field_tokens.insert(field.clone(), child.clone());
        spawn_extra_field_monitor(
            &self.ca_client,
            &pv_name,
            field,
            extras.clone(),
            child,
            counters.clone(),
        );
    }
    metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
    drop(_guard);
    // Clones moved into the spawned sampling task.
    let tx = self.sample_tx.clone();
    let token = cancel_token.clone();
    let ci = conn_info.clone();
    let extras_for_loop = extras.clone();
    let counters_for_loop = counters.clone();
    let registry_for_loop = self.registry.clone();
    let drift = self.server_ioc_drift_secs;
    match &record.sample_mode {
        SampleMode::Monitor => {
            tokio::spawn(async move {
                monitor_loop(
                    pv_name,
                    dbr_type,
                    element_count,
                    channel,
                    tx,
                    token,
                    ci,
                    registry_for_loop,
                    extras_for_loop,
                    counters_for_loop,
                    drift,
                )
                .await;
            });
        }
        SampleMode::Scan { period_secs } => {
            let period = *period_secs;
            // Note: the scan loop is not given the drift tolerance.
            tokio::spawn(async move {
                scan_loop(
                    pv_name,
                    dbr_type,
                    element_count,
                    channel,
                    tx,
                    token,
                    period,
                    ci,
                    registry_for_loop,
                    extras_for_loop,
                    counters_for_loop,
                )
                .await;
            });
        }
    }
    Ok(())
}
/// PVA counterpart of `start_archiving_internal`.
///
/// Inserts a `PvHandle` without a CA channel into `channels` (replacing any entry with the
/// same name) and spawns three detached tasks that all stop when the handle's cancellation
/// token is cancelled: the sampling loop (monitor or scan, per `record.sample_mode`), the
/// periodic metadata refresh loop, and the connection-state watchdog.
///
/// Extra archive fields are handed to the sampling loop as a list; unlike the CA path no
/// per-field monitor is spawned here and `field_tokens` starts out empty.
///
/// There is no failing step, so the result is always `Ok(())`.
async fn start_archiving_internal_pva(&self, record: &PvRecord) -> anyhow::Result<()> {
    let pv_name = record.pv_name.clone();
    let dbr_type = record.dbr_type;
    let element_count = record.element_count;
    let cancel_token = CancellationToken::new();
    let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
    let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
    let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
    let update_lock = Arc::new(tokio::sync::Mutex::new(()));
    let counters = Arc::new(PvCounters::default());
    self.channels.insert(
        pv_name.clone(),
        PvHandle {
            // `None` marks the handle as a PVA PV (see `live_value`).
            channel: None,
            cancel_token: cancel_token.clone(),
            dbr_type,
            conn_info: conn_info.clone(),
            extras: extras.clone(),
            field_tokens: field_tokens.clone(),
            update_lock,
            counters: counters.clone(),
        },
    );
    // Clones moved into the spawned sampling task.
    let tx = self.sample_tx.clone();
    let pva_client = self.pva_client.clone();
    let token = cancel_token.clone();
    let ci = conn_info.clone();
    let counters_for_loop = counters.clone();
    let drift = self.server_ioc_drift_secs;
    match &record.sample_mode {
        SampleMode::Monitor => {
            let pv_name_loop = pv_name.clone();
            let archive_fields_loop = record.archive_fields.clone();
            let extras_for_loop = extras.clone();
            tokio::spawn(async move {
                monitor_loop_pva(
                    pv_name_loop,
                    dbr_type,
                    element_count,
                    pva_client,
                    tx,
                    token,
                    ci,
                    counters_for_loop,
                    drift,
                    archive_fields_loop,
                    extras_for_loop,
                )
                .await;
            });
        }
        SampleMode::Scan { period_secs } => {
            let period = *period_secs;
            let pv_name_loop = pv_name.clone();
            let archive_fields_loop = record.archive_fields.clone();
            let extras_for_loop = extras.clone();
            tokio::spawn(async move {
                scan_loop_pva(
                    pv_name_loop,
                    dbr_type,
                    pva_client,
                    tx,
                    token,
                    period,
                    ci,
                    counters_for_loop,
                    drift,
                    archive_fields_loop,
                    extras_for_loop,
                )
                .await;
            });
        }
    }
    // Periodic metadata refresh task.
    {
        let pv_name = pv_name.clone();
        let pva_client = self.pva_client.clone();
        let registry = self.registry.clone();
        let cancel = cancel_token.clone();
        let counters_for_refresh = counters.clone();
        tokio::spawn(async move {
            pva_metadata_refresh_loop(
                pv_name,
                pva_client,
                registry,
                cancel,
                counters_for_refresh,
            )
            .await;
        });
    }
    // Connection-state watchdog task.
    {
        let pv_name = pv_name.clone();
        let conn_info = conn_info.clone();
        let counters_for_watch = counters.clone();
        let cancel = cancel_token.clone();
        let sample_mode = record.sample_mode.clone();
        tokio::spawn(async move {
            pva_state_watchdog(pv_name, conn_info, counters_for_watch, cancel, sample_mode)
                .await;
        });
    }
    Ok(())
}
/// Replaces the set of extra archived fields of `pv_name` with `fields`.
///
/// The registry is updated first. If the PV currently has a live handle, monitors for
/// fields no longer wanted are cancelled (and their cached values dropped), monitors for
/// newly wanted fields are spawned, and the `archiver_extra_field_tasks` gauge is adjusted
/// by the net change. If the PV has no live handle, only the registry is updated.
///
/// NOTE(review): new monitors are always spawned on the CA client, also for handles
/// created by the PVA path — confirm that this is intended.
///
/// # Errors
/// Fails only when the registry update fails.
pub async fn update_archive_fields(
    &self,
    pv_name: &str,
    fields: &[String],
) -> anyhow::Result<()> {
    self.registry.update_archive_fields(pv_name, fields)?;

    // Copy what is needed out of the handle so the map guard is gone before awaiting.
    let live = self.channels.get(pv_name).map(|handle| {
        (
            handle.cancel_token.clone(),
            handle.extras.clone(),
            handle.field_tokens.clone(),
            handle.update_lock.clone(),
            handle.counters.clone(),
        )
    });
    let Some((parent_token, extras, field_tokens, update_lock, counters)) = live else {
        return Ok(());
    };
    let _serialized = update_lock.lock().await;

    let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();

    // Collect first, remove afterwards: the map must not be modified while it is iterated.
    let mut stale: Vec<String> = Vec::new();
    for entry in field_tokens.iter() {
        if !wanted.contains(entry.key().as_str()) {
            stale.push(entry.key().clone());
        }
    }
    let removed_count = stale.len();
    for key in stale {
        if let Some((_, token)) = field_tokens.remove(&key) {
            token.cancel();
        }
        extras.remove(&key);
    }

    let mut added_count = 0usize;
    for field in fields {
        if field_tokens.contains_key(field) {
            continue;
        }
        let child = parent_token.child_token();
        field_tokens.insert(field.clone(), child.clone());
        spawn_extra_field_monitor(
            &self.ca_client,
            pv_name,
            field,
            extras.clone(),
            child,
            counters.clone(),
        );
        added_count += 1;
    }

    let net = added_count as i64 - removed_count as i64;
    if net != 0 {
        metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
    }
    Ok(())
}
/// Stops sampling `pv_name` and marks it `Paused` in the registry.
///
/// If the PV has a live handle, it is removed, its cancellation token is cancelled, and
/// the `archiver_pvs_active` / `archiver_extra_field_tasks` gauges are reduced accordingly.
/// The registry status is updated whether or not a handle existed.
///
/// # Errors
/// Fails when the registry status update fails.
pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
    let op_lock = self.op_lock(pv_name);
    let _serialized = op_lock.lock().await;

    let removed = self.channels.remove(pv_name);
    if let Some((_, handle)) = removed {
        let extra_tasks = handle.field_tokens.len();
        handle.cancel_token.cancel();
        metrics::gauge!("archiver_pvs_active").decrement(1.0);
        if extra_tasks > 0 {
            metrics::gauge!("archiver_extra_field_tasks").decrement(extra_tasks as f64);
        }
    }

    self.registry.set_status(pv_name, PvStatus::Paused)?;
    info!(pv = pv_name, "Paused archiving");
    Ok(())
}
/// Restarts sampling for a PV that is known to the registry and marks it `Active`.
///
/// A PV that is already `Active` and has a live handle is left untouched. A leftover handle
/// in any other situation is torn down before the PV is started again.
///
/// # Errors
/// Fails when the PV is not in the registry, when starting the sampling tasks fails, or
/// when the registry status update fails.
pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
    let lock = self.op_lock(pv_name);
    let _g = lock.lock().await;
    let record = self
        .registry
        .get_pv(pv_name)?
        .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
    if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
        info!(
            pv = pv_name,
            "PV is already actively archived, skipping resume"
        );
        return Ok(());
    }
    if let Some((_key, handle)) = self.channels.remove(pv_name) {
        let extra_count = handle.field_tokens.len() as f64;
        handle.cancel_token.cancel();
        // Every handle in `channels` was counted in `archiver_pvs_active` when it was
        // started, so removing one must give that count back (as pause/stop/destroy do);
        // otherwise the increment below counts this PV twice.
        metrics::gauge!("archiver_pvs_active").decrement(1.0);
        if extra_count > 0.0 {
            metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
        }
    }
    self.start_archiving_internal(&record).await?;
    self.registry.set_status(pv_name, PvStatus::Active)?;
    metrics::gauge!("archiver_pvs_active").increment(1.0);
    info!(pv = pv_name, "Resumed archiving");
    Ok(())
}
/// Stops sampling `pv_name` and marks it `Inactive` in the registry.
///
/// If the PV has a live handle, it is removed, its cancellation token is cancelled, and
/// the `archiver_pvs_active` / `archiver_extra_field_tasks` gauges are reduced accordingly.
/// The registry status is updated whether or not a handle existed.
///
/// # Errors
/// Fails when the registry status update fails.
pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
    let op_lock = self.op_lock(pv_name);
    let _serialized = op_lock.lock().await;

    let removed = self.channels.remove(pv_name);
    if let Some((_, handle)) = removed {
        let extra_tasks = handle.field_tokens.len();
        handle.cancel_token.cancel();
        metrics::gauge!("archiver_pvs_active").decrement(1.0);
        if extra_tasks > 0 {
            metrics::gauge!("archiver_extra_field_tasks").decrement(extra_tasks as f64);
        }
    }

    self.registry.set_status(pv_name, PvStatus::Inactive)?;
    info!(pv = pv_name, "Stopped archiving (inactive)");
    Ok(())
}
/// Stops sampling `pv_name` and removes its record from the registry.
///
/// If the PV has a live handle, it is removed, its cancellation token is cancelled, and
/// the `archiver_pvs_active` / `archiver_extra_field_tasks` gauges are reduced accordingly.
/// The registry removal is attempted whether or not a handle existed.
///
/// # Errors
/// Fails when the registry removal fails.
pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
    let op_lock = self.op_lock(pv_name);
    let _serialized = op_lock.lock().await;

    let removed = self.channels.remove(pv_name);
    if let Some((_, handle)) = removed {
        let extra_tasks = handle.field_tokens.len();
        handle.cancel_token.cancel();
        metrics::gauge!("archiver_pvs_active").decrement(1.0);
        if extra_tasks > 0 {
            metrics::gauge!("archiver_extra_field_tasks").decrement(extra_tasks as f64);
        }
    }

    self.registry.remove_pv(pv_name)?;
    info!(pv = pv_name, "Destroyed archiving channel");
    Ok(())
}
/// Returns the names of all PVs known to the registry.
///
/// A registry error is logged as a warning and reported as an empty list.
pub fn list_pvs(&self) -> Vec<String> {
    match self.registry.all_pv_names() {
        Ok(names) => names,
        Err(e) => {
            warn!("Failed to list PVs: {e}");
            Vec::new()
        }
    }
}
/// Returns the names of the registry PVs that match `pattern` (pattern syntax is defined by
/// the registry).
///
/// A registry error is logged as a warning and reported as an empty list.
pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
    match self.registry.matching_pvs(pattern) {
        Ok(names) => names,
        Err(e) => {
            warn!("Failed to match PVs: {e}");
            Vec::new()
        }
    }
}
/// Returns the shared PV registry used by this manager.
pub fn registry(&self) -> &Arc<PvRegistry> {
    &self.registry
}
/// Returns a copy of the connection bookkeeping for `pv`, or `None` when the PV has no
/// live handle.
///
/// A poisoned mutex is tolerated: the data is read as-is.
pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
    let handle = self.channels.get(pv)?;
    let info = handle
        .conn_info
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    Some((*info).clone())
}
/// Returns the names of the live PVs for which no connection has ever been recorded
/// (`connected_since` is still `None`).
///
/// A poisoned mutex is tolerated: the data is read as-is.
pub fn get_never_connected_pvs(&self) -> Vec<String> {
    let mut names = Vec::new();
    for entry in self.channels.iter() {
        let never_connected = entry
            .value()
            .conn_info
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .connected_since
            .is_none();
        if never_connected {
            names.push(entry.key().clone());
        }
    }
    names
}
/// Returns the live counters of `pv_name`, or `None` when the PV has no live handle.
///
/// The returned `Arc` shares the atomics with the sampling tasks, so values keep changing.
pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
    let handle = self.channels.get(pv_name)?;
    Some(Arc::clone(&handle.counters))
}
/// Returns a point-in-time snapshot of the counters of every live PV, paired with the PV
/// name. The order follows the map's iteration order.
pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
    let mut snapshots = Vec::new();
    for entry in self.channels.iter() {
        let name = entry.key().clone();
        let snapshot = PvCountersSnapshot::from(&*entry.value().counters);
        snapshots.push((name, snapshot));
    }
    snapshots
}
/// Reads the current value of an archived PV directly from its source.
///
/// Returns `None` when `pv_name` has no live handle. Otherwise returns the outcome of the
/// read: PVA PVs (handles without a CA channel) are read with a PVA get, CA PVs by waiting
/// for the channel to be connected and then issuing a CA get.
///
/// `timeout` bounds each step separately, so on the CA path the connection wait and the
/// get may together take up to twice `timeout`.
pub async fn live_value(
    &self,
    pv_name: &str,
    timeout: Duration,
) -> Option<anyhow::Result<ArchiverValue>> {
    // The map guard is released at the end of this statement, before any await.
    let ca_channel = self.channels.get(pv_name)?.channel.clone();

    let outcome = match ca_channel {
        None => {
            let pva = self.pva_client.clone();
            let name = pv_name.to_string();
            match tokio::time::timeout(timeout, pva.pvget_full(&name)).await {
                Ok(Ok(result)) => {
                    pv_field_scalar_to_archiver(&result.value, &result.introspection)
                        .ok_or_else(|| anyhow::anyhow!("PVA value has no archiver mapping"))
                }
                Ok(Err(e)) => Err(anyhow::anyhow!("PVA get failed: {e}")),
                Err(_) => Err(anyhow::anyhow!("PVA get timed out after {timeout:?}")),
            }
        }
        Some(channel) => {
            if channel.wait_connected(timeout).await.is_err() {
                Err(anyhow::anyhow!("channel not connected within {timeout:?}"))
            } else {
                match tokio::time::timeout(timeout, channel.get()).await {
                    Ok(Ok((_dbr_type, val))) => Ok(epics_value_to_archiver(&val)),
                    Ok(Err(e)) => Err(anyhow::anyhow!("CA get failed: {e}")),
                    Err(_) => Err(anyhow::anyhow!("CA get timed out after {timeout:?}")),
                }
            }
        }
    };
    Some(outcome)
}
/// Returns a copy of the cached extra-field values of `pv_name` (field name to stringified
/// value). The map is empty when the PV has no live handle.
pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
    let Some(handle) = self.channels.get(pv_name) else {
        return std::collections::HashMap::new();
    };
    let mut snapshot = std::collections::HashMap::new();
    for entry in handle.extras.iter() {
        snapshot.insert(entry.key().clone(), entry.value().clone());
    }
    snapshot
}
/// Returns the names of the live PVs whose `is_connected` flag is currently `false`. This
/// includes PVs that have never connected.
///
/// A poisoned mutex is tolerated: the data is read as-is.
pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
    let mut names = Vec::new();
    for entry in self.channels.iter() {
        let connected = entry
            .value()
            .conn_info
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .is_connected;
        if !connected {
            names.push(entry.key().clone());
        }
    }
    names
}
}
/// Fetches the control metadata of a CA channel and persists PREC / EGU to the registry
/// when they differ from what is stored.
///
/// Only values that are present are ever written: a negative precision or blank units are
/// treated as "not provided" and never clear a stored value. A failed or timed-out fetch
/// increments `metadata_fetch_failures`. All failures are logged and otherwise swallowed,
/// since this is a best-effort refresh.
async fn refresh_ctrl_metadata(
    channel: &CaChannel,
    registry: &PvRegistry,
    pv_name: &str,
    counters: &PvCounters,
) {
    const FETCH_TIMEOUT: Duration = Duration::from_secs(15);

    let count_failure = || {
        counters
            .metadata_fetch_failures
            .fetch_add(1, Ordering::Relaxed);
    };

    let fetched =
        tokio::time::timeout(FETCH_TIMEOUT, channel.get_with_metadata(DbrClass::Ctrl)).await;
    let snapshot = match fetched {
        Ok(Ok(s)) => s,
        Ok(Err(e)) => {
            count_failure();
            debug!(pv = pv_name, "Ctrl metadata fetch failed: {e}");
            return;
        }
        Err(_) => {
            count_failure();
            debug!(pv = pv_name, "Ctrl metadata fetch timed out");
            return;
        }
    };
    let Some(display) = snapshot.display else {
        return;
    };

    // Normalise "not provided" to `None`.
    let fresh_prec: Option<String> = if display.precision < 0 {
        None
    } else {
        Some(display.precision.to_string())
    };
    let fresh_egu: Option<&str> = Some(display.units.trim()).filter(|units| !units.is_empty());

    let stored = match registry.get_pv(pv_name) {
        Ok(Some(record)) => record,
        Ok(None) => return,
        Err(e) => {
            debug!(
                pv = pv_name,
                "Registry read for metadata compare failed: {e}"
            );
            return;
        }
    };

    // A value counts as changed only when a fresh value exists and differs from the stored
    // one (or nothing is stored yet).
    let differs = |old: Option<&str>, fresh: Option<&str>| fresh.is_some_and(|n| old != Some(n));
    let prec_changed = differs(stored.prec.as_deref(), fresh_prec.as_deref());
    let egu_changed = differs(stored.egu.as_deref(), fresh_egu);
    if !(prec_changed || egu_changed) {
        return;
    }

    // Pass only the values that actually changed.
    let prec_arg = fresh_prec.as_deref().filter(|_| prec_changed);
    let egu_arg = fresh_egu.filter(|_| egu_changed);
    match registry.update_metadata(pv_name, prec_arg, egu_arg) {
        Err(e) => warn!(pv = pv_name, "Failed to persist PREC/EGU: {e}"),
        Ok(_) => debug!(
            pv = pv_name,
            prec = ?prec_arg, egu = ?egu_arg,
            "Refreshed PREC/EGU from DBR_CTRL"
        ),
    }
}
/// Processes one PVA monitor update: updates connection state and counters, converts the
/// value, refreshes the extra-field cache and forwards the sample without blocking.
///
/// * `field` – the update as delivered by the PVA client.
/// * `canonical_desc` – type description used to convert `field` to an archiver value.
/// * `extras_paths` – pairs of (extra field name, path inside `field`) to cache.
/// * `drift_secs` – tolerated difference between the sample timestamp and local time.
///
/// An update is dropped (and counted) when it has no archiver mapping or when its timestamp
/// is outside the accepted window; the very first update after connect is exempt from the
/// timestamp check. When the sample channel is full the sample is dropped and counted; a
/// closed channel is ignored.
#[allow(clippy::too_many_arguments)]
fn pva_handle_event(
    field: &PvField,
    canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
    pv_name: &str,
    dbr_type: ArchDbType,
    tx: &mpsc::Sender<PvSample>,
    conn_info: &Mutex<ConnectionInfo>,
    counters: &Arc<PvCounters>,
    extras: &ExtraFieldsCache,
    extras_paths: &[(String, &'static str)],
    drift_secs: u64,
) {
    let now = SystemTime::now();
    // Any update proves the PV is connected. "First" means no event time is recorded, i.e.
    // either the very first update or the first one after the time was reset elsewhere.
    let first_after_connect = {
        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
        let first = ci.last_event_time.is_none();
        if ci.connected_since.is_none() {
            ci.connected_since = Some(now);
        }
        ci.is_connected = true;
        ci.last_event_time = Some(now);
        ci.state = PvConnectionState::Connected;
        first
    };
    if first_after_connect {
        // Only the first writer wins: the value is set while it still holds the 0 sentinel.
        counters
            .first_event_unix_secs
            .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
            .ok();
    }
    counters.events_received.fetch_add(1, Ordering::Relaxed);
    let Some(value) = pv_field_scalar_to_archiver(field, canonical_desc) else {
        counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
        debug!(pv = pv_name, "PVA event has no archiver mapping; dropping");
        return;
    };
    let ts = pv_field_extract_timestamp(field);
    // The first update after connect is accepted regardless of its timestamp.
    if !first_after_connect && !ioc_timestamp_in_window(ts, now, drift_secs) {
        counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
        debug!(
            pv = pv_name,
            ?ts,
            "Dropping PVA sample with out-of-window timestamp"
        );
        return;
    }
    // A reported count of 0 is recorded as 1.
    let elem_count = match pv_field_element_count(field) {
        0 => 1,
        n => n,
    };
    refresh_nt_enum_extras(field, extras);
    // Cache the requested extra fields; only scalar values are stored.
    for (field_name, path) in extras_paths {
        if let Some(PvField::Scalar(s)) = pv_field_walk_path(field, path) {
            extras.insert(field_name.clone(), scalar_value_to_string(s));
        }
    }
    let mut sample = ArchiverSample::new(ts, value);
    attach_extras(extras, &mut sample);
    let pv_sample = PvSample {
        pv_name: pv_name.to_string(),
        dbr_type,
        sample,
        element_count: Some(elem_count),
        counters: Some(counters.clone()),
    };
    // Non-blocking send: this runs inside the client's callback and must not wait.
    if let Err(tokio::sync::mpsc::error::TrySendError::Full(_)) = tx.try_send(pv_sample) {
        counters
            .buffer_overflow_drops
            .fetch_add(1, Ordering::Relaxed);
    }
}
/// Sampling task for a PVA PV in monitor mode. Runs until `cancel_token` is cancelled.
///
/// Two subscription strategies are used:
/// * With extra archive fields that map to a PVA path, a custom request (value, alarm,
///   timeStamp plus those paths) is subscribed with `pvmonitor_with_request`. The type
///   description is fetched once per subscription attempt with `pvinfo`, and the
///   subscription is re-established after `CA_RETRY_DELAY` whenever it ends or fails.
/// * Otherwise a default monitor is opened with `pvmonitor_handle`; once it is active the
///   task only waits for cancellation and then drops the subscription.
///
/// Every update is processed by `pva_handle_event`. Failed calls increment
/// `transient_error_count` and are retried after `CA_RETRY_DELAY`.
///
/// Every await in this function races the cancellation token, including the `pvinfo` and
/// `pvmonitor_handle` calls, so a pause/stop takes effect without waiting for them.
///
/// `element_count` is accepted for signature parity and not used.
#[allow(clippy::too_many_arguments)]
async fn monitor_loop_pva(
    pv_name: String,
    dbr_type: ArchDbType,
    element_count: i32,
    pva_client: PvaClient,
    tx: mpsc::Sender<PvSample>,
    cancel_token: CancellationToken,
    conn_info: Arc<Mutex<ConnectionInfo>>,
    counters: Arc<PvCounters>,
    server_ioc_drift_secs: u64,
    archive_fields: Vec<String>,
    extras: Arc<ExtraFieldsCache>,
) {
    let drift_secs = server_ioc_drift_secs;
    let _ = element_count;
    // Extra fields that have a PVA equivalent, paired with their path in the structure.
    let extras_paths: Vec<(String, &'static str)> = archive_fields
        .iter()
        .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
        .collect();
    // A custom request is only needed when extra paths must be delivered with each update.
    let request_expr = if extras_paths.is_empty() {
        None
    } else {
        let mut builder = epics_rs::pva::pv_request::PvRequestBuilder::new()
            .field("value")
            .field("alarm")
            .field("timeStamp");
        for (_, path) in &extras_paths {
            builder = builder.field(*path);
        }
        Some(builder.build())
    };
    loop {
        if let Some(ref req) = request_expr {
            // Race the introspection call against cancellation so a stop request is not
            // held up by it.
            let info = tokio::select! {
                _ = cancel_token.cancelled() => {
                    debug!(pv = pv_name, "PVA monitor (custom request) cancelled");
                    return;
                }
                res = pva_client.pvinfo(&pv_name) => res,
            };
            let canonical = match info {
                Ok(d) => Arc::new(d),
                Err(e) => {
                    counters
                        .transient_error_count
                        .fetch_add(1, Ordering::Relaxed);
                    warn!(
                        pv = pv_name,
                        "PVA pvinfo failed: {e}; cannot capture canonical descriptor — retrying subscribe"
                    );
                    tokio::select! {
                        _ = cancel_token.cancelled() => return,
                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
                    }
                }
            };
            let pv_name_cb = pv_name.clone();
            let tx_cb = tx.clone();
            let conn_info_cb = conn_info.clone();
            let counters_cb = counters.clone();
            let extras_cb = extras.clone();
            let extras_paths_cb = extras_paths.clone();
            let canonical_cb = canonical.clone();
            let cb = move |field: &PvField| {
                pva_handle_event(
                    field,
                    &canonical_cb,
                    &pv_name_cb,
                    dbr_type,
                    &tx_cb,
                    &conn_info_cb,
                    &counters_cb,
                    &extras_cb,
                    &extras_paths_cb,
                    drift_secs,
                );
            };
            tokio::select! {
                _ = cancel_token.cancelled() => {
                    debug!(pv = pv_name, "PVA monitor (custom request) cancelled");
                    return;
                }
                res = pva_client.pvmonitor_with_request(&pv_name, req, cb) => {
                    match res {
                        Ok(()) => {
                            debug!(pv = pv_name, "PVA pvmonitor_with_request returned Ok; resubscribing");
                        }
                        Err(e) => {
                            counters
                                .transient_error_count
                                .fetch_add(1, Ordering::Relaxed);
                            warn!(pv = pv_name, "PVA pvmonitor_with_request failed: {e}; retrying");
                        }
                    }
                    tokio::select! {
                        _ = cancel_token.cancelled() => return,
                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
                    }
                }
            }
        } else {
            let pv_name_cb = pv_name.clone();
            let tx_cb = tx.clone();
            let conn_info_cb = conn_info.clone();
            let counters_cb = counters.clone();
            let extras_cb = extras.clone();
            let extras_paths_cb = extras_paths.clone();
            let cb = move |desc: &epics_rs::pva::pvdata::FieldDesc, field: &PvField| {
                pva_handle_event(
                    field,
                    desc,
                    &pv_name_cb,
                    dbr_type,
                    &tx_cb,
                    &conn_info_cb,
                    &counters_cb,
                    &extras_cb,
                    &extras_paths_cb,
                    drift_secs,
                );
            };
            // Race the subscribe call against cancellation as well.
            let subscribed = tokio::select! {
                _ = cancel_token.cancelled() => return,
                res = pva_client.pvmonitor_handle(&pv_name, cb) => res,
            };
            let handle = match subscribed {
                Ok(h) => h,
                Err(e) => {
                    counters
                        .transient_error_count
                        .fetch_add(1, Ordering::Relaxed);
                    warn!(pv = pv_name, "PVA pvmonitor failed: {e}; retrying");
                    tokio::select! {
                        _ = cancel_token.cancelled() => return,
                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
                    }
                }
            };
            debug!(pv = pv_name, "PVA monitor active");
            // The subscription lives as long as `handle`; hold it until cancellation.
            cancel_token.cancelled().await;
            debug!(pv = pv_name, "PVA monitor cancelled; dropping subscription");
            drop(handle);
            return;
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn scan_loop_pva(
pv_name: String,
dbr_type: ArchDbType,
pva_client: PvaClient,
tx: mpsc::Sender<PvSample>,
cancel_token: CancellationToken,
period_secs: f64,
conn_info: Arc<Mutex<ConnectionInfo>>,
counters: Arc<PvCounters>,
server_ioc_drift_secs: u64,
archive_fields: Vec<String>,
extras: Arc<ExtraFieldsCache>,
) {
let extras_paths: Vec<(String, &'static str)> = archive_fields
.iter()
.filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
.collect();
let period = Duration::from_secs_f64(period_secs);
let mut interval = tokio::time::interval(period);
let drift_secs = server_ioc_drift_secs;
let pvget_timeout = period.max(Duration::from_secs(5));
loop {
tokio::select! {
_ = cancel_token.cancelled() => return,
_ = interval.tick() => {}
}
let res = tokio::time::timeout(pvget_timeout, pva_client.pvget_full(&pv_name)).await;
let (field, canonical) = match res {
Ok(Ok(r)) => (r.value, r.introspection),
Ok(Err(e)) => {
let was_connected = {
let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
let prev = ci.is_connected;
ci.is_connected = false;
ci.last_event_time = None;
ci.state = match ci.state {
PvConnectionState::Connected => PvConnectionState::Disconnected,
PvConnectionState::Disconnected => PvConnectionState::Disconnected,
_ => PvConnectionState::Connecting,
};
prev
};
if was_connected {
counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
counters
.last_disconnect_unix_secs
.store(unix_secs(SystemTime::now()), Ordering::Relaxed);
}
counters
.transient_error_count
.fetch_add(1, Ordering::Relaxed);
debug!(pv = pv_name, "PVA scan pvget failed: {e}");
continue;
}
Err(_) => {
counters
.transient_error_count
.fetch_add(1, Ordering::Relaxed);
debug!(pv = pv_name, "PVA scan pvget timed out");
continue;
}
};
let now = SystemTime::now();
let first_after_connect = {
let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
let first = ci.last_event_time.is_none();
if ci.connected_since.is_none() {
ci.connected_since = Some(now);
}
ci.is_connected = true;
ci.last_event_time = Some(now);
ci.state = PvConnectionState::Connected;
first
};
counters.events_received.fetch_add(1, Ordering::Relaxed);
if first_after_connect {
counters
.first_event_unix_secs
.compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
.ok();
}
let Some(value) = pv_field_scalar_to_archiver(&field, &canonical) else {
counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
debug!(
pv = pv_name,
"PVA scan value has no archiver mapping; dropping"
);
continue;
};
let _ = drift_secs; let elem_count = match pv_field_element_count(&field) {
0 => 1,
n => n,
};
refresh_nt_enum_extras(&field, &extras);
for (field_name, path) in &extras_paths {
if let Some(PvField::Scalar(s)) = pv_field_walk_path(&field, path) {
extras.insert(field_name.clone(), scalar_value_to_string(s));
}
}
let mut sample = ArchiverSample::new(now, value);
attach_extras(&extras, &mut sample);
let pv_sample = PvSample {
pv_name: pv_name.clone(),
dbr_type,
sample,
element_count: Some(elem_count),
counters: Some(counters.clone()),
};
if let Err(rejected) = try_send_with_overflow_count(&tx, pv_sample, &counters).await {
let _ = rejected;
return;
}
}
}
/// Periodically re-reads a PVA channel and persists changed PREC/EGU display metadata.
///
/// Every `REFRESH_INTERVAL` the PV is fetched with `pvget` (bounded by
/// `FETCH_TIMEOUT`). The precision/units pair extracted from the result is
/// compared with the registry record, and only the values that differ are
/// passed to `registry.update_metadata`. A freshly fetched `None` never
/// overwrites a stored value.
///
/// Fetch failures and timeouts bump `counters.metadata_fetch_failures`.
/// The loop returns when `cancel_token` is cancelled.
async fn pva_metadata_refresh_loop(
    pv_name: String,
    pva_client: PvaClient,
    registry: Arc<PvRegistry>,
    cancel_token: CancellationToken,
    counters: Arc<PvCounters>,
) {
    const REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
    const FETCH_TIMEOUT: Duration = Duration::from_secs(15);

    let mut ticker = tokio::time::interval(REFRESH_INTERVAL);
    // Consume one tick before entering the loop so the first fetch is not
    // issued straight away.
    ticker.tick().await;

    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => return,
            _ = ticker.tick() => {}
        }

        let fetched = tokio::time::timeout(FETCH_TIMEOUT, pva_client.pvget(&pv_name)).await;
        let Ok(Ok(field)) = fetched else {
            // Either the fetch itself failed or the timeout elapsed.
            counters
                .metadata_fetch_failures
                .fetch_add(1, Ordering::Relaxed);
            continue;
        };
        let (fresh_prec, fresh_egu) = pv_field_extract_display(&field);

        let Ok(Some(record)) = registry.get_pv(&pv_name) else {
            continue;
        };

        // `Some(new)` only when a new value exists and differs from the stored one.
        let prec_update = fresh_prec
            .as_deref()
            .filter(|fresh| record.prec.as_deref() != Some(*fresh));
        let egu_update = fresh_egu
            .as_deref()
            .filter(|fresh| record.egu.as_deref() != Some(*fresh));
        if prec_update.is_none() && egu_update.is_none() {
            continue;
        }

        match registry.update_metadata(&pv_name, prec_update, egu_update) {
            Ok(_) => {
                debug!(pv = pv_name, prec = ?prec_update, egu = ?egu_update, "Refreshed PVA display");
            }
            Err(e) => {
                warn!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
            }
        }
    }
}
/// Marks a PVA channel as disconnected when no event has been seen for too long.
///
/// Every `POLL_INTERVAL` the shared `ConnectionInfo` is inspected. If the PV
/// is flagged connected and its `last_event_time` is older than the stale
/// threshold, the watchdog clears `is_connected`, sets the state to
/// `Disconnected`, bumps `disconnect_count` and records
/// `last_disconnect_unix_secs`.
///
/// The stale threshold is 60 s for monitor mode and `max(3 * period, 60 s)`
/// for scan mode. The loop returns when `cancel_token` is cancelled.
///
/// The staleness decision and the state change happen under a single lock
/// acquisition, so an event that arrives between "decide" and "mark" cannot
/// be overwritten with a spurious disconnect.
async fn pva_state_watchdog(
    pv_name: String,
    conn_info: Arc<Mutex<ConnectionInfo>>,
    counters: Arc<PvCounters>,
    cancel_token: CancellationToken,
    sample_mode: SampleMode,
) {
    const POLL_INTERVAL: Duration = Duration::from_secs(5);
    let stale_threshold = match sample_mode {
        SampleMode::Monitor => Duration::from_secs(60),
        SampleMode::Scan { period_secs } => Duration::from_secs_f64((period_secs * 3.0).max(60.0)),
    };
    let mut interval = tokio::time::interval(POLL_INTERVAL);
    // Consume one tick before entering the loop so the first check is not
    // performed straight away.
    interval.tick().await;
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => return,
            _ = interval.tick() => {}
        }
        let marked_disconnected = {
            // A poisoned mutex is recovered rather than propagated.
            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
            let stale = ci.is_connected
                && match ci.last_event_time {
                    // A clock that moved backwards (`duration_since` error)
                    // is treated as "not stale".
                    Some(t) => SystemTime::now()
                        .duration_since(t)
                        .map(|d| d > stale_threshold)
                        .unwrap_or(false),
                    None => false,
                };
            if stale {
                // NOTE(review): unlike the other disconnect paths in this
                // file, `last_event_time` is left untouched here — confirm
                // that this is intentional.
                ci.is_connected = false;
                ci.state = PvConnectionState::Disconnected;
            }
            stale
        };
        if marked_disconnected {
            counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
            counters
                .last_disconnect_unix_secs
                .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
            debug!(
                pv = pv_name,
                "PVA watchdog: marking disconnected after stale heartbeat"
            );
        }
    }
}
/// Drives the Channel Access monitor subscription for one PV.
///
/// Each pass of the outer loop:
/// 1. waits for the channel to connect; on timeout it records the
///    disconnect and blocks on connection events until reconnected,
/// 2. spawns a background `refresh_ctrl_metadata` call (raced against
///    cancellation),
/// 3. subscribes and forwards every snapshot as a `PvSample` through `tx`,
/// 4. when the monitor stream ends, records the disconnect and starts over.
///
/// Samples whose IOC timestamp falls outside the accepted window
/// (`server_ioc_drift_secs`) are dropped and counted in `timestamp_drops`,
/// except for the first sample after a (re)connect, which is always kept.
/// That first sample also carries the connection-lost headers.
///
/// Returns when `cancel_token` is cancelled, when the connection-event
/// stream is closed, or when `try_send_with_overflow_count` reports that
/// the sample could not be delivered.
#[allow(clippy::too_many_arguments)]
async fn monitor_loop(
    pv_name: String,
    dbr_type: ArchDbType,
    element_count: i32,
    channel: CaChannel,
    tx: mpsc::Sender<PvSample>,
    cancel_token: CancellationToken,
    conn_info: Arc<Mutex<ConnectionInfo>>,
    registry: Arc<PvRegistry>,
    extras: Arc<ExtraFieldsCache>,
    counters: Arc<PvCounters>,
    server_ioc_drift_secs: u64,
) {
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => return,
            result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
                if result.is_err() {
                    // Connection wait timed out: publish the disconnected state.
                    let was_connected = {
                        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                        let prev_connected = ci.is_connected;
                        ci.is_connected = false;
                        // Clearing this makes the next sample count as
                        // "first after connect".
                        ci.last_event_time = None;
                        ci.state = match ci.state {
                            PvConnectionState::Connected => PvConnectionState::Disconnected,
                            PvConnectionState::Disconnected => PvConnectionState::Disconnected,
                            _ => PvConnectionState::Connecting,
                        };
                        prev_connected
                    };
                    // Only a connected -> disconnected transition counts.
                    if was_connected {
                        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
                        counters
                            .last_disconnect_unix_secs
                            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
                    }
                    // Subscribe to connection events first, then re-check the
                    // connection briefly, so a connect that happens in
                    // between is not missed.
                    let mut conn_rx = channel.connection_events();
                    if channel
                        .wait_connected(Duration::from_millis(100))
                        .await
                        .is_err()
                    {
                        loop {
                            tokio::select! {
                                _ = cancel_token.cancelled() => return,
                                event = conn_rx.recv() => {
                                    use tokio::sync::broadcast::error::RecvError;
                                    match event {
                                        Ok(ConnectionEvent::Connected) => break,
                                        Ok(_) => continue,
                                        Err(RecvError::Lagged(_)) => {
                                            // Events were skipped; poll the
                                            // channel state directly instead.
                                            if channel
                                                .wait_connected(Duration::from_millis(100))
                                                .await
                                                .is_ok()
                                            {
                                                break;
                                            }
                                            continue;
                                        }
                                        Err(RecvError::Closed) => return,
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        {
            // Refresh control metadata in the background so it cannot delay
            // the subscription; the task ends early on cancellation.
            let channel = channel.clone();
            let registry = registry.clone();
            let counters = counters.clone();
            let pv_name = pv_name.clone();
            let cancel = cancel_token.clone();
            tokio::spawn(async move {
                tokio::select! {
                    _ = cancel.cancelled() => {}
                    _ = refresh_ctrl_metadata(&channel, &registry, &pv_name, &counters) => {}
                }
            });
        }
        let mut monitor = match channel.subscribe().await {
            Ok(m) => m,
            Err(e) => {
                counters
                    .transient_error_count
                    .fetch_add(1, Ordering::Relaxed);
                warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
                tokio::select! {
                    _ = cancel_token.cancelled() => return,
                    _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
                }
            }
        };
        debug!(pv = pv_name, "Monitor subscription active");
        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => return,
                result = monitor.recv() => {
                    match result {
                        Some(Ok(snapshot)) => {
                            let now = SystemTime::now();
                            let first_after_connect = {
                                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                                let first = ci.last_event_time.is_none();
                                if ci.connected_since.is_none() {
                                    ci.connected_since = Some(now);
                                }
                                ci.is_connected = true;
                                ci.last_event_time = Some(now);
                                ci.state = PvConnectionState::Connected;
                                first
                            };
                            // The first sample after a (re)connect bypasses
                            // the timestamp window check.
                            if !first_after_connect
                                && !ioc_timestamp_in_window(
                                    snapshot.timestamp,
                                    now,
                                    server_ioc_drift_secs,
                                )
                            {
                                counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    pv = pv_name,
                                    ?snapshot.timestamp,
                                    "Dropping sample with out-of-window IOC timestamp"
                                );
                                continue;
                            }
                            counters.events_received.fetch_add(1, Ordering::Relaxed);
                            let now_secs = unix_secs(now);
                            // Only the very first event sets this (0 = unset).
                            let _ = counters.first_event_unix_secs.compare_exchange(
                                0,
                                now_secs,
                                Ordering::Relaxed,
                                Ordering::Relaxed,
                            );
                            let archiver_val = epics_value_to_archiver(&snapshot.value);
                            let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
                            attach_extras(&extras, &mut sample);
                            if first_after_connect {
                                let lost_secs = counters
                                    .last_disconnect_unix_secs
                                    .load(Ordering::Relaxed);
                                attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
                            }
                            let pv_sample = PvSample {
                                pv_name: pv_name.clone(),
                                dbr_type,
                                sample,
                                element_count: Some(element_count),
                                counters: Some(counters.clone()),
                            };
                            if let Err(pv_sample) = try_send_with_overflow_count(
                                &tx,
                                pv_sample,
                                &counters,
                            )
                            .await
                            {
                                // The sample could not be delivered; stop.
                                let _ = pv_sample;
                                return;
                            }
                        }
                        Some(Err(e)) => {
                            counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                            warn!(pv = pv_name, "Monitor error: {e}");
                        }
                        // Monitor stream ended: fall through to the
                        // disconnect bookkeeping and reconnect.
                        None => break,
                    }
                }
            }
        }
        {
            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
            ci.is_connected = false;
            ci.last_event_time = None;
            ci.state = PvConnectionState::Disconnected;
        }
        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
        counters
            .last_disconnect_unix_secs
            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
        debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
    }
}
/// Converts a `SystemTime` to whole seconds since the Unix epoch.
///
/// Sub-second precision is discarded. Times before the epoch yield `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
async fn try_send_with_overflow_count(
tx: &mpsc::Sender<PvSample>,
pv_sample: PvSample,
counters: &PvCounters,
) -> Result<(), PvSample> {
match tx.try_send(pv_sample) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
counters
.buffer_overflow_drops
.fetch_add(1, Ordering::Relaxed);
tx.send(pv_sample).await.map_err(|e| e.0)
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
}
}
/// Polls a Channel Access PV at a fixed period and forwards each reading.
///
/// On every tick the loop:
/// 1. waits (up to `CA_RETRY_DELAY`) for the channel to be connected; on
///    failure it records the disconnect and skips the tick,
/// 2. once per connection, spawns a background `refresh_ctrl_metadata`
///    call (raced against cancellation),
/// 3. performs a `get`, stamps the value with the local receive time and
///    sends it through `tx`.
///
/// The first sample after a (re)connect carries the connection-lost headers.
/// Read errors bump `transient_error_count` and are otherwise ignored.
///
/// Returns when `cancel_token` is cancelled or the sample could not be
/// delivered through `tx`.
// NOTE(review): `Duration::from_secs_f64` and `tokio::time::interval` reject
// negative/non-finite and zero periods respectively — presumably
// `period_secs` is validated by the caller; confirm.
#[allow(clippy::too_many_arguments)]
async fn scan_loop(
    pv_name: String,
    dbr_type: ArchDbType,
    element_count: i32,
    channel: CaChannel,
    tx: mpsc::Sender<PvSample>,
    cancel_token: CancellationToken,
    period_secs: f64,
    conn_info: Arc<Mutex<ConnectionInfo>>,
    registry: Arc<PvRegistry>,
    extras: Arc<ExtraFieldsCache>,
    counters: Arc<PvCounters>,
) {
    let period = Duration::from_secs_f64(period_secs);
    let mut interval = tokio::time::interval(period);
    // Reset on every disconnect so metadata is refreshed once per connection.
    let mut metadata_done = false;
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => return,
            _ = interval.tick() => {}
        }
        if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
            let was_connected = {
                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                let prev = ci.is_connected;
                ci.is_connected = false;
                // Clearing this makes the next sample count as
                // "first after connect".
                ci.last_event_time = None;
                ci.state = match ci.state {
                    PvConnectionState::Connected => PvConnectionState::Disconnected,
                    PvConnectionState::Disconnected => PvConnectionState::Disconnected,
                    _ => PvConnectionState::Connecting,
                };
                prev
            };
            // Only a connected -> disconnected transition counts.
            if was_connected {
                counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
                counters
                    .last_disconnect_unix_secs
                    .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
            }
            metadata_done = false;
            continue;
        }
        if !metadata_done {
            // Refresh control metadata in the background so it cannot delay
            // the scan; the task ends early on cancellation.
            let channel = channel.clone();
            let registry = registry.clone();
            let counters = counters.clone();
            let pv_name = pv_name.clone();
            let cancel = cancel_token.clone();
            tokio::spawn(async move {
                tokio::select! {
                    _ = cancel.cancelled() => {}
                    _ = refresh_ctrl_metadata(&channel, &registry, &pv_name, &counters) => {}
                }
            });
            metadata_done = true;
        }
        match channel.get().await {
            Ok((_dbr_type, epics_val)) => {
                let now = SystemTime::now();
                let first_after_connect = {
                    let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                    let first = ci.last_event_time.is_none();
                    if ci.connected_since.is_none() {
                        ci.connected_since = Some(now);
                    }
                    ci.is_connected = true;
                    ci.last_event_time = Some(now);
                    ci.state = PvConnectionState::Connected;
                    first
                };
                counters.events_received.fetch_add(1, Ordering::Relaxed);
                let now_secs = unix_secs(now);
                // Only the very first event sets this (0 = unset).
                let _ = counters.first_event_unix_secs.compare_exchange(
                    0,
                    now_secs,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                );
                let archiver_val = epics_value_to_archiver(&epics_val);
                // Scanned samples are stamped with the local receive time.
                let mut sample = ArchiverSample::new(now, archiver_val);
                attach_extras(&extras, &mut sample);
                if first_after_connect {
                    let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
                    attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
                }
                let pv_sample = PvSample {
                    pv_name: pv_name.clone(),
                    dbr_type,
                    sample,
                    element_count: Some(element_count),
                    counters: Some(counters.clone()),
                };
                if try_send_with_overflow_count(&tx, pv_sample, &counters)
                    .await
                    .is_err()
                {
                    // The sample could not be delivered; stop.
                    return;
                }
            }
            Err(e) => {
                counters
                    .transient_error_count
                    .fetch_add(1, Ordering::Relaxed);
                debug!(pv = pv_name, "Scan read error: {e}");
            }
        }
    }
}
/// Appends the connection-lost/regained header fields to a sample.
///
/// Three entries are pushed onto `sample.field_values`, in this order:
/// `cnxlostepsecs` (`lost_secs`), `cnxregainedepsecs` (`now_secs`) and
/// `startup` (always `"true"`).
fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
    let headers = [
        ("cnxlostepsecs".to_string(), lost_secs.to_string()),
        ("cnxregainedepsecs".to_string(), now_secs.to_string()),
        ("startup".to_string(), "true".to_string()),
    ];
    sample.field_values.extend(headers);
}
/// Copies the cached extra fields onto a sample, sorted by field name.
///
/// When the cache is empty the sample is left untouched. Otherwise
/// `sample.field_values` is *replaced* by the cache contents, so callers
/// must add any further entries afterwards.
fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
    if !extras.is_empty() {
        let mut pairs = Vec::new();
        for entry in extras.iter() {
            pairs.push((entry.key().clone(), entry.value().clone()));
        }
        pairs.sort_by(|(left, _), (right, _)| left.cmp(right));
        sample.field_values = pairs;
    }
}
/// Renders an `EpicsValue` as the string stored in the extra-fields cache.
///
/// Scalars use their `Display` form, numeric and string arrays their `Debug`
/// form, and a char array is decoded as (lossy) UTF-8 text.
fn epics_value_to_field_string(val: &EpicsValue) -> String {
    match val {
        // Text-like values.
        EpicsValue::String(text) => text.clone(),
        EpicsValue::CharArray(bytes) => String::from_utf8_lossy(bytes).into_owned(),

        // Scalars: plain `Display` output.
        EpicsValue::Char(n) => n.to_string(),
        EpicsValue::Short(n) => n.to_string(),
        EpicsValue::Enum(n) => n.to_string(),
        EpicsValue::Long(n) => n.to_string(),
        EpicsValue::Int64(n) => n.to_string(),
        EpicsValue::Float(n) => n.to_string(),
        EpicsValue::Double(n) => n.to_string(),

        // Arrays: `Debug` output of the whole collection.
        EpicsValue::ShortArray(items) => format!("{:?}", items),
        EpicsValue::EnumArray(items) => format!("{:?}", items),
        EpicsValue::LongArray(items) => format!("{:?}", items),
        EpicsValue::Int64Array(items) => format!("{:?}", items),
        EpicsValue::FloatArray(items) => format!("{:?}", items),
        EpicsValue::DoubleArray(items) => format!("{:?}", items),
        EpicsValue::StringArray(items) => format!("{:?}", items),
    }
}
/// Spawns a task that monitors `<pv_name>.<field>` and feeds the extras cache.
///
/// The channel is created immediately; the monitoring itself runs in
/// `extra_field_monitor_body` on a spawned task. A panic inside that body is
/// caught and logged instead of being propagated.
fn spawn_extra_field_monitor(
    ca_client: &CaClient,
    pv_name: &str,
    field: &str,
    extras: Arc<ExtraFieldsCache>,
    parent_token: CancellationToken,
    counters: Arc<PvCounters>,
) {
    let channel_name = format!("{pv_name}.{field}");
    let channel = ca_client.create_channel(&channel_name);

    // One copy of each name goes to the monitor body, another is kept for
    // the panic report.
    let (body_pv, log_pv) = (pv_name.to_string(), pv_name.to_string());
    let (body_field, log_field) = (field.to_string(), field.to_string());

    tokio::spawn(async move {
        let guarded = std::panic::AssertUnwindSafe(extra_field_monitor_body(
            channel,
            body_pv,
            body_field,
            extras,
            parent_token,
            counters,
        ));
        match futures::FutureExt::catch_unwind(guarded).await {
            Ok(()) => {}
            Err(payload) => {
                let msg = panic_payload_msg(&payload);
                error!(
                    pv = log_pv,
                    field = log_field,
                    "Extra-field monitor panicked: {msg}"
                );
            }
        }
    });
}
async fn extra_field_monitor_body(
channel: CaChannel,
pv_owned: String,
field_owned: String,
extras: Arc<ExtraFieldsCache>,
parent_token: CancellationToken,
counters: Arc<PvCounters>,
) {
if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
debug!(
pv = pv_owned,
field = field_owned,
"Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
);
}
let mut backoff = CA_RETRY_DELAY;
let max_backoff = Duration::from_secs(60);
let mut warned_at_cap = false;
loop {
tokio::select! {
_ = parent_token.cancelled() => return,
sub = channel.subscribe() => {
let mut monitor = match sub {
Ok(m) => m,
Err(e) => {
counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
debug!(
pv = pv_owned,
field = field_owned,
?backoff,
"Extra-field subscribe failed: {e}; retrying"
);
if backoff >= max_backoff && !warned_at_cap {
warn!(
pv = pv_owned,
field = field_owned,
"Extra-field repeatedly fails to subscribe; \
check archive_fields config (now retrying every 60s)"
);
warned_at_cap = true;
}
let sleep_for = backoff;
backoff = (backoff * 2).min(max_backoff);
tokio::select! {
_ = parent_token.cancelled() => return,
_ = tokio::time::sleep(sleep_for) => continue,
}
}
};
backoff = CA_RETRY_DELAY;
warned_at_cap = false;
loop {
tokio::select! {
_ = parent_token.cancelled() => return,
ev = monitor.recv() => match ev {
Some(Ok(snapshot)) => {
extras.insert(
field_owned.clone(),
epics_value_to_field_string(&snapshot.value),
);
}
Some(Err(e)) => {
counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
debug!(
pv = pv_owned,
field = field_owned,
"Extra-field monitor error: {e}"
);
}
None => break, }
}
}
}
}
}
}
/// Extracts a human-readable message from a caught panic payload.
///
/// Handles the two payload types produced by `panic!` (`&'static str` and
/// `String`); anything else yields a fixed placeholder.
fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
/// Returns the `value` member of a structure, or the field itself.
///
/// Non-structure fields and structures without a `value` member are
/// returned unchanged.
fn pv_value_field(field: &PvField) -> &PvField {
    if let PvField::Structure(structure) = field {
        if let Some(inner) = structure.get_field("value") {
            return inner;
        }
    }
    field
}
/// Maps a PVA scalar onto the matching archiver scalar value.
///
/// Booleans become enum indices, 8-bit values become a one-byte vector, and
/// 16-bit values are widened to `i32`. `UInt`, `Long` and `ULong` are
/// converted with `as i32`, which wraps/truncates values outside the `i32`
/// range.
fn scalar_value_to_archiver(s: &ScalarValue) -> ArchiverValue {
    match s {
        // Floating point and text pass through unchanged.
        ScalarValue::Double(x) => ArchiverValue::ScalarDouble(*x),
        ScalarValue::Float(x) => ArchiverValue::ScalarFloat(*x),
        ScalarValue::String(text) => ArchiverValue::ScalarString(text.clone()),

        ScalarValue::Boolean(flag) => ArchiverValue::ScalarEnum(*flag as i32),

        // 8-bit integers are stored as a single byte.
        ScalarValue::UByte(b) => ArchiverValue::ScalarByte(vec![*b]),
        ScalarValue::Byte(b) => ArchiverValue::ScalarByte(vec![*b as u8]),

        // 16-bit integers are widened.
        ScalarValue::Short(n) => ArchiverValue::ScalarShort(*n as i32),
        ScalarValue::UShort(n) => ArchiverValue::ScalarShort(*n as i32),

        // Everything wider lands in the 32-bit integer slot.
        ScalarValue::Int(n) => ArchiverValue::ScalarInt(*n),
        ScalarValue::UInt(n) => ArchiverValue::ScalarInt(*n as i32),
        ScalarValue::Long(n) => ArchiverValue::ScalarInt(*n as i32),
        ScalarValue::ULong(n) => ArchiverValue::ScalarInt(*n as i32),
    }
}
/// Converts a fetched PVA field into an archiver value.
///
/// An NTEnum is reduced to its index. Otherwise the `value` member (or the
/// field itself) is converted: scalars and arrays map to their archiver
/// counterparts, and any other shape is stored as self-describing generic
/// bytes built from `canonical_desc`.
///
/// Returns `None` for an untyped scalar array that is empty or that
/// `TypedScalarArray::from_scalar_values` rejects.
fn pv_field_scalar_to_archiver(
    field: &PvField,
    canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
) -> Option<ArchiverValue> {
    if let Some((index, _choices)) = nt_enum_parts(field) {
        return Some(ArchiverValue::ScalarEnum(index));
    }

    let converted = match pv_value_field(field) {
        PvField::Scalar(scalar) => scalar_value_to_archiver(scalar),
        PvField::ScalarArrayTyped(typed) => typed_scalar_array_to_archiver(typed),
        PvField::ScalarArray(items) => {
            // The element type is taken from the first item.
            let element_type = items.first()?.scalar_type();
            let typed = TypedScalarArray::from_scalar_values(items, element_type)?;
            typed_scalar_array_to_archiver(&typed)
        }
        _ => ArchiverValue::V4GenericBytes(encode_pv_field_self_describing(
            field,
            canonical_desc,
        )),
    };
    Some(converted)
}
/// Determines the archiver DB type and element count for a PVA field.
///
/// An NTEnum is reported as a scalar enum. Otherwise the `value` member (or
/// the field itself) decides: scalars have count 1, arrays report their
/// length, and any other shape is typed as generic bytes with count 1.
///
/// Returns `None` for an empty untyped scalar array, whose element type
/// cannot be determined.
fn pv_field_to_arch_db_type(field: &PvField) -> Option<(ArchDbType, i32)> {
    if nt_enum_parts(field).is_some() {
        return Some((ArchDbType::ScalarEnum, 1));
    }

    let typed = match pv_value_field(field) {
        PvField::Scalar(scalar) => (scalar_value_to_arch_db_type(scalar), 1),
        PvField::ScalarArrayTyped(typed) => (
            scalar_type_to_waveform(typed.scalar_type()),
            typed.len() as i32,
        ),
        PvField::ScalarArray(items) => {
            let first = items.first()?;
            (
                scalar_type_to_waveform(first.scalar_type()),
                items.len() as i32,
            )
        }
        _ => (ArchDbType::V4GenericBytes, 1),
    };
    Some(typed)
}
/// Maps a PVA scalar to the archiver's scalar DB type.
///
/// Signed and unsigned variants share a type, and all 32/64-bit integers
/// map to `ScalarInt`.
fn scalar_value_to_arch_db_type(s: &ScalarValue) -> ArchDbType {
    match s {
        ScalarValue::String(_) => ArchDbType::ScalarString,
        ScalarValue::Double(_) => ArchDbType::ScalarDouble,
        ScalarValue::Float(_) => ArchDbType::ScalarFloat,
        ScalarValue::Boolean(_) => ArchDbType::ScalarEnum,
        ScalarValue::UByte(_) | ScalarValue::Byte(_) => ArchDbType::ScalarByte,
        ScalarValue::UShort(_) | ScalarValue::Short(_) => ArchDbType::ScalarShort,
        ScalarValue::ULong(_)
        | ScalarValue::Long(_)
        | ScalarValue::UInt(_)
        | ScalarValue::Int(_) => ArchDbType::ScalarInt,
    }
}
/// Maps a PVA element type to the archiver's waveform (array) DB type.
///
/// Signed and unsigned variants share a type, and all 32/64-bit integers
/// map to `WaveformInt`.
fn scalar_type_to_waveform(st: ScalarType) -> ArchDbType {
    match st {
        ScalarType::String => ArchDbType::WaveformString,
        ScalarType::Double => ArchDbType::WaveformDouble,
        ScalarType::Float => ArchDbType::WaveformFloat,
        ScalarType::Boolean => ArchDbType::WaveformEnum,
        ScalarType::UByte | ScalarType::Byte => ArchDbType::WaveformByte,
        ScalarType::UShort | ScalarType::Short => ArchDbType::WaveformShort,
        ScalarType::ULong | ScalarType::Long | ScalarType::UInt | ScalarType::Int => {
            ArchDbType::WaveformInt
        }
    }
}
/// Converts a typed PVA array into the matching archiver vector value.
///
/// Booleans become enum indices, 8-bit elements become chars, and 16-bit
/// elements are widened to `i32`. `UInt`, `Long` and `ULong` elements are
/// converted with `as i32`, which wraps/truncates values outside the `i32`
/// range.
fn typed_scalar_array_to_archiver(arr: &TypedScalarArray) -> ArchiverValue {
    match arr {
        // Element types that are copied through unchanged.
        TypedScalarArray::String(items) => ArchiverValue::VectorString(items.to_vec()),
        TypedScalarArray::Double(items) => ArchiverValue::VectorDouble(items.to_vec()),
        TypedScalarArray::Float(items) => ArchiverValue::VectorFloat(items.to_vec()),
        TypedScalarArray::Int(items) => ArchiverValue::VectorInt(items.to_vec()),
        TypedScalarArray::UByte(items) => ArchiverValue::VectorChar(items.to_vec()),

        // Element types that need a per-element cast.
        TypedScalarArray::Byte(items) => {
            ArchiverValue::VectorChar(items.iter().copied().map(|x| x as u8).collect())
        }
        TypedScalarArray::Boolean(items) => {
            ArchiverValue::VectorEnum(items.iter().copied().map(|x| x as i32).collect())
        }
        TypedScalarArray::Short(items) => {
            ArchiverValue::VectorShort(items.iter().copied().map(|x| x as i32).collect())
        }
        TypedScalarArray::UShort(items) => {
            ArchiverValue::VectorShort(items.iter().copied().map(|x| x as i32).collect())
        }
        TypedScalarArray::UInt(items) => {
            ArchiverValue::VectorInt(items.iter().copied().map(|x| x as i32).collect())
        }
        TypedScalarArray::Long(items) => {
            ArchiverValue::VectorInt(items.iter().copied().map(|x| x as i32).collect())
        }
        TypedScalarArray::ULong(items) => {
            ArchiverValue::VectorInt(items.iter().copied().map(|x| x as i32).collect())
        }
    }
}
/// Extracts the index and (optional) choice list from an NTEnum structure.
///
/// Returns `None` unless `field` is a structure whose id starts with
/// `epics:nt/NTEnum`, whose `value` member is a structure, and whose
/// `index` is an integer scalar. 64-bit and unsigned indices are converted
/// with `as i32`.
///
/// A missing `choices` member yields `Some((index, None))`. A `choices`
/// member that is not an all-string array makes the whole call return
/// `None`.
fn nt_enum_parts(field: &PvField) -> Option<(i32, Option<Vec<String>>)> {
    let root = match field {
        PvField::Structure(root) if root.struct_id.starts_with("epics:nt/NTEnum") => root,
        _ => return None,
    };
    let inner = match root.get_field("value")? {
        PvField::Structure(inner) => inner,
        _ => return None,
    };

    let index = match inner.get_field("index")? {
        PvField::Scalar(ScalarValue::Int(v)) => *v,
        PvField::Scalar(ScalarValue::Short(v)) => *v as i32,
        PvField::Scalar(ScalarValue::Long(v)) => *v as i32,
        PvField::Scalar(ScalarValue::UShort(v)) => *v as i32,
        PvField::Scalar(ScalarValue::UInt(v)) => *v as i32,
        PvField::Scalar(ScalarValue::ULong(v)) => *v as i32,
        _ => return None,
    };

    let choices = match inner.get_field("choices") {
        None => None,
        Some(PvField::ScalarArrayTyped(TypedScalarArray::String(labels))) => {
            Some(labels.to_vec())
        }
        Some(PvField::ScalarArray(items)) => {
            // A single non-string element invalidates the whole enum.
            let labels = items
                .iter()
                .map(|item| match item {
                    ScalarValue::String(label) => Some(label.clone()),
                    _ => None,
                })
                .collect::<Option<Vec<String>>>()?;
            Some(labels)
        }
        Some(_) => return None,
    };

    Some((index, choices))
}
/// Serialises enum choice labels as a compact JSON array of strings.
///
/// Quotes, backslashes, `\n`, `\r` and `\t` use their short escapes; any
/// other character below U+0020 is written as `\u00XX`. All other
/// characters are emitted verbatim.
fn nt_enum_choices_to_extras(choices: &[String]) -> String {
    use std::fmt::Write;

    // Rough size estimate: brackets plus quotes and a comma per label.
    let estimate = 2 + choices.iter().map(|label| label.len() + 3).sum::<usize>();
    let mut json = String::with_capacity(estimate);

    json.push('[');
    let mut needs_comma = false;
    for label in choices {
        if needs_comma {
            json.push(',');
        }
        needs_comma = true;

        json.push('"');
        for ch in label.chars() {
            match ch {
                '"' => json.push_str("\\\""),
                '\\' => json.push_str("\\\\"),
                '\n' => json.push_str("\\n"),
                '\r' => json.push_str("\\r"),
                '\t' => json.push_str("\\t"),
                control if (control as u32) < 0x20 => {
                    // Writing into a `String` cannot fail.
                    let _ = write!(json, "\\u{:04x}", control as u32);
                }
                plain => json.push(plain),
            }
        }
        json.push('"');
    }
    json.push(']');
    json
}
/// Stores an NTEnum's choice list in the extras cache under `enum_strs`.
///
/// Does nothing when `field` is not an NTEnum or carries no choices.
fn refresh_nt_enum_extras(field: &PvField, extras: &ExtraFieldsCache) {
    let Some((_index, Some(choices))) = nt_enum_parts(field) else {
        return;
    };
    let encoded = nt_enum_choices_to_extras(&choices);
    extras.insert("enum_strs".to_string(), encoded);
}
/// Encodes a PVA field as a self-describing byte buffer.
///
/// The type description is written first, followed by the field's value,
/// both in big-endian byte order.
fn encode_pv_field_self_describing(
    field: &PvField,
    canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
) -> Vec<u8> {
    const INITIAL_CAPACITY: usize = 256;
    let byte_order = ByteOrder::Big;

    let mut buffer = Vec::with_capacity(INITIAL_CAPACITY);
    encode_type_desc(canonical_desc, byte_order, &mut buffer);
    encode_pv_field(field, canonical_desc, byte_order, &mut buffer);
    buffer
}
/// Translates a CA record field name into the dotted path of the
/// corresponding member in a PVA structure.
///
/// The match is exact and case-sensitive; unknown names yield `None`.
fn ca_archive_field_to_pva_path(field: &str) -> Option<&'static str> {
    const CA_TO_PVA: &[(&str, &str)] = &[
        // display
        ("EGU", "display.units"),
        ("PREC", "display.precision"),
        ("DESC", "display.description"),
        ("HOPR", "display.limitHigh"),
        ("LOPR", "display.limitLow"),
        // valueAlarm
        ("HIHI", "valueAlarm.highAlarmLimit"),
        ("LOLO", "valueAlarm.lowAlarmLimit"),
        ("HIGH", "valueAlarm.highWarningLimit"),
        ("LOW", "valueAlarm.lowWarningLimit"),
        ("HHSV", "valueAlarm.highAlarmSeverity"),
        ("LLSV", "valueAlarm.lowAlarmSeverity"),
        ("HSV", "valueAlarm.highWarningSeverity"),
        ("LSV", "valueAlarm.lowWarningSeverity"),
        ("HYST", "valueAlarm.hysteresis"),
        // control
        ("DRVH", "control.limitHigh"),
        ("DRVL", "control.limitLow"),
    ];

    CA_TO_PVA
        .iter()
        .find(|(ca_name, _)| *ca_name == field)
        .map(|&(_, pva_path)| pva_path)
}
/// Follows a dot-separated member path through nested PVA structures.
///
/// Returns `None` as soon as a segment is missing or an intermediate node
/// is not a structure.
fn pv_field_walk_path<'a>(root: &'a PvField, path: &str) -> Option<&'a PvField> {
    path.split('.').try_fold(root, |node, segment| match node {
        PvField::Structure(structure) => structure.get_field(segment),
        _ => None,
    })
}
/// Renders a PVA scalar as text using its `Display` form.
///
/// Strings are returned as-is (cloned).
fn scalar_value_to_string(s: &ScalarValue) -> String {
    match s {
        ScalarValue::String(text) => text.clone(),
        ScalarValue::Boolean(flag) => flag.to_string(),
        // Signed integers.
        ScalarValue::Byte(n) => n.to_string(),
        ScalarValue::Short(n) => n.to_string(),
        ScalarValue::Int(n) => n.to_string(),
        ScalarValue::Long(n) => n.to_string(),
        // Unsigned integers.
        ScalarValue::UByte(n) => n.to_string(),
        ScalarValue::UShort(n) => n.to_string(),
        ScalarValue::UInt(n) => n.to_string(),
        ScalarValue::ULong(n) => n.to_string(),
        // Floating point.
        ScalarValue::Float(x) => x.to_string(),
        ScalarValue::Double(x) => x.to_string(),
    }
}
/// Returns the number of elements in a field's value.
///
/// Scalars count as 1, arrays report their length, and any other shape
/// yields 0.
fn pv_field_element_count(field: &PvField) -> i32 {
    let value = pv_value_field(field);
    if let PvField::ScalarArray(items) = value {
        return items.len() as i32;
    }
    if let PvField::ScalarArrayTyped(typed) = value {
        return typed.len() as i32;
    }
    if matches!(value, PvField::Scalar(_)) {
        1
    } else {
        0
    }
}
/// Extracts `(precision, units)` from a structure's `display` member.
///
/// Precision is returned as a decimal string when it is a non-negative
/// `Int`, `Short` or `Long`. Units are trimmed, and blank units count as
/// absent. Both are `None` when `field` has no `display` structure.
fn pv_field_extract_display(field: &PvField) -> (Option<String>, Option<String>) {
    let display = match field {
        PvField::Structure(root) => root.get_field("display"),
        _ => None,
    };
    let Some(PvField::Structure(disp)) = display else {
        return (None, None);
    };

    let prec = disp.get_field("precision").and_then(|raw| match raw {
        PvField::Scalar(ScalarValue::Int(p)) if *p >= 0 => Some(p.to_string()),
        PvField::Scalar(ScalarValue::Short(p)) if *p >= 0 => Some(p.to_string()),
        PvField::Scalar(ScalarValue::Long(p)) if *p >= 0 => Some(p.to_string()),
        _ => None,
    });

    let egu = disp.get_field("units").and_then(|raw| match raw {
        PvField::Scalar(ScalarValue::String(units)) => {
            let trimmed = units.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        }
        _ => None,
    });

    (prec, egu)
}
/// Reads the `timeStamp` member of a PVA structure as a `SystemTime`.
///
/// Falls back to `SystemTime::now()` when `field` is not a structure, has
/// no `timeStamp` structure, or carries a `secondsPastEpoch` that is
/// missing, of an unexpected type, negative, or too large to represent.
/// A missing, mistyped or negative `nanoseconds` value is treated as 0.
///
/// The timestamp comes from a remote server, so the arithmetic is checked:
/// a negative or absurdly large value must not panic the calling task.
fn pv_field_extract_timestamp(field: &PvField) -> SystemTime {
    let PvField::Structure(s) = field else {
        return SystemTime::now();
    };
    let Some(PvField::Structure(ts)) = s.get_field("timeStamp") else {
        return SystemTime::now();
    };
    let secs = match ts.get_field("secondsPastEpoch") {
        // The guards keep negative values from being reinterpreted as huge
        // unsigned second counts.
        Some(PvField::Scalar(ScalarValue::Long(v))) if *v >= 0 => *v as u64,
        Some(PvField::Scalar(ScalarValue::ULong(v))) => *v,
        Some(PvField::Scalar(ScalarValue::Int(v))) if *v >= 0 => *v as u64,
        Some(PvField::Scalar(ScalarValue::UInt(v))) => *v as u64,
        _ => return SystemTime::now(),
    };
    let nanos = match ts.get_field("nanoseconds") {
        Some(PvField::Scalar(ScalarValue::Int(v))) if *v >= 0 => *v as u32,
        Some(PvField::Scalar(ScalarValue::UInt(v))) => *v,
        _ => 0,
    };
    // `Duration::new` and `SystemTime + Duration` panic on overflow; the
    // checked variants let an unrepresentable time fall back to "now".
    Duration::from_secs(secs)
        .checked_add(Duration::from_nanos(nanos as u64))
        .and_then(|since_epoch| SystemTime::UNIX_EPOCH.checked_add(since_epoch))
        .unwrap_or_else(SystemTime::now)
}
/// Maps a CA DBR field type to the archiver's scalar DB type.
///
/// `Long` and `Int64` both map to `ScalarInt`.
fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
    match field_type {
        DbFieldType::Long | DbFieldType::Int64 => ArchDbType::ScalarInt,
        DbFieldType::Short => ArchDbType::ScalarShort,
        DbFieldType::Char => ArchDbType::ScalarByte,
        DbFieldType::Enum => ArchDbType::ScalarEnum,
        DbFieldType::Double => ArchDbType::ScalarDouble,
        DbFieldType::Float => ArchDbType::ScalarFloat,
        DbFieldType::String => ArchDbType::ScalarString,
    }
}
fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
match val {
EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
EpicsValue::Int64(v) => ArchiverValue::ScalarInt(*v as i32),
EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
EpicsValue::ShortArray(v) => {
ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
}
EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
EpicsValue::EnumArray(v) => {
ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
}
EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
EpicsValue::Int64Array(v) => {
ArchiverValue::VectorInt(v.iter().map(|x| *x as i32).collect())
}
EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
}
}
/// Timing parameters for the sample write loop.
#[derive(Debug, Clone)]
pub struct WriteLoopConfig {
    /// Period between flushes (default 10 s).
    pub flush_period: Duration,
    /// Timeout for an append (default 30 s).
    pub append_timeout: Duration,
    /// Timeout for a flush (default 30 s).
    pub flush_timeout: Duration,
    /// Per-sample timeout while draining (default 5 s).
    pub drain_per_sample_timeout: Duration,
    /// Total time budget for draining (default 30 s).
    pub drain_total_budget: Duration,
    /// Timeout for the flush performed at shutdown (default 15 s).
    pub shutdown_flush_timeout: Duration,
}

impl Default for WriteLoopConfig {
    /// Returns the built-in timing defaults documented on each field.
    fn default() -> Self {
        let secs = Duration::from_secs;
        Self {
            flush_period: secs(10),
            append_timeout: secs(30),
            flush_timeout: secs(30),
            drain_per_sample_timeout: secs(5),
            drain_total_budget: secs(30),
            shutdown_flush_timeout: secs(15),
        }
    }
}
/// Configuration for the sharded write pool.
#[derive(Debug, Clone)]
pub struct ShardedWritePoolConfig {
    /// Number of append shards (default 1).
    pub shards: usize,
    /// Capacity of each shard's sample channel (default 4096).
    pub per_shard_buffer: usize,
    /// Timing parameters handed to each shard and to the flush owner.
    pub write_loop: WriteLoopConfig,
}

impl Default for ShardedWritePoolConfig {
    /// Returns a single-shard configuration with default write-loop timings.
    fn default() -> Self {
        const DEFAULT_SHARDS: usize = 1;
        const DEFAULT_PER_SHARD_BUFFER: usize = 4096;
        Self {
            shards: DEFAULT_SHARDS,
            per_shard_buffer: DEFAULT_PER_SHARD_BUFFER,
            write_loop: WriteLoopConfig::default(),
        }
    }
}
use archiver_core::storage::plainpb::shard_for_pv;
/// Runs the write pipeline: one flush-owner task plus one or more append shards.
///
/// The flush owner is always spawned. With a single shard (`cfg.shards`
/// of 0 or 1) the append loop runs inline on `rx`. With more shards, each
/// gets its own bounded channel and spawned append loop, and
/// `dispatch_loop` routes samples from `rx` to them.
///
/// Returns once the append side has finished and the flush owner has been
/// joined. Join errors are ignored.
pub async fn run_sharded_write_pool(
    storage: Arc<dyn StoragePlugin>,
    registry: Arc<PvRegistry>,
    rx: mpsc::Receiver<PvSample>,
    shutdown: tokio::sync::watch::Receiver<bool>,
    cfg: ShardedWritePoolConfig,
) {
    let shard_count = cfg.shards.max(1);
    let pending = Arc::new(PendingReports::new());

    let flush_owner = tokio::spawn(flush_owner_loop(
        storage.clone(),
        registry.clone(),
        pending.clone(),
        shutdown.clone(),
        cfg.write_loop.clone(),
    ));

    if shard_count > 1 {
        // One channel and one append task per shard.
        let (shard_txs, shard_tasks): (Vec<_>, Vec<_>) = (0..shard_count)
            .map(|shard_idx| {
                let (shard_tx, shard_rx) = mpsc::channel::<PvSample>(cfg.per_shard_buffer);
                let task = tokio::spawn(shard_append_loop(
                    shard_idx,
                    storage.clone(),
                    shard_rx,
                    pending.clone(),
                    shutdown.clone(),
                    cfg.write_loop.clone(),
                ));
                (shard_tx, task)
            })
            .unzip();

        // The dispatcher owns the shard senders.
        dispatch_loop(rx, shard_txs, shutdown.clone()).await;
        for task in shard_tasks {
            let _ = task.await;
        }
    } else {
        // Single shard: no dispatcher, append directly from `rx`.
        shard_append_loop(
            0,
            storage.clone(),
            rx,
            pending.clone(),
            shutdown.clone(),
            cfg.write_loop.clone(),
        )
        .await;
    }

    let _ = flush_owner.await;
}
/// Routes incoming samples to per-shard channels.
///
/// Each sample is sent with `try_send` to the shard chosen by
/// `shard_for_pv`. When that shard's channel is full the sample is dropped
/// and counted (per-PV `buffer_overflow_drops` plus the
/// `archiver_dispatcher_shard_overflow_drops_total` metric), so a slow
/// shard cannot stall the others. A closed shard channel also drops the
/// sample, with a warning.
///
/// The loop ends when `rx` is closed, or when `shutdown` becomes `true`.
/// In the shutdown case, whatever is already queued in `rx` is drained to
/// the shards first (without waiting).
///
/// If the shutdown sender is dropped, `changed()` fails immediately on
/// every call. That branch is then disabled so the loop keeps dispatching
/// until `rx` closes, instead of spinning on the failed watch and starving
/// `rx`.
async fn dispatch_loop(
    mut rx: mpsc::Receiver<PvSample>,
    shard_txs: Vec<mpsc::Sender<PvSample>>,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
    let n = shard_txs.len();
    info!(shards = n, "Sharded write dispatcher started");
    // Cleared once the shutdown sender has gone away.
    let mut shutdown_watch_open = true;
    loop {
        tokio::select! {
            // Shutdown is checked first so it is not starved by a busy `rx`.
            biased;
            changed = shutdown.changed(), if shutdown_watch_open => {
                if changed.is_err() {
                    // Sender dropped: no shutdown signal can arrive any more.
                    shutdown_watch_open = false;
                } else if *shutdown.borrow() {
                    // Forward whatever is already queued, then stop.
                    while let Ok(sample) = rx.try_recv() {
                        let idx = shard_for_pv(&sample.pv_name, n);
                        match shard_txs[idx].try_send(sample) {
                            Ok(()) => {}
                            Err(mpsc::error::TrySendError::Full(s)) => {
                                if let Some(c) = s.counters.as_ref() {
                                    c.buffer_overflow_drops
                                        .fetch_add(1, Ordering::Relaxed);
                                }
                                metrics::counter!(
                                    "archiver_dispatcher_shard_overflow_drops_total",
                                    "shard" => idx.to_string(),
                                    "phase" => "shutdown_drain",
                                )
                                .increment(1);
                                debug!(
                                    shard = idx,
                                    pv = s.pv_name,
                                    "Shutdown-drain shard channel full; sample dropped"
                                );
                            }
                            Err(mpsc::error::TrySendError::Closed(s)) => {
                                warn!(
                                    shard = idx,
                                    pv = s.pv_name,
                                    "Shutdown-drain shard channel closed; sample dropped"
                                );
                            }
                        }
                    }
                    break;
                }
            }
            maybe = rx.recv() => {
                match maybe {
                    Some(sample) => {
                        let idx = shard_for_pv(&sample.pv_name, n);
                        match shard_txs[idx].try_send(sample) {
                            Ok(()) => {}
                            Err(mpsc::error::TrySendError::Full(s)) => {
                                if let Some(c) = s.counters.as_ref() {
                                    c.buffer_overflow_drops
                                        .fetch_add(1, Ordering::Relaxed);
                                }
                                metrics::counter!(
                                    "archiver_dispatcher_shard_overflow_drops_total",
                                    "shard" => idx.to_string(),
                                )
                                .increment(1);
                                debug!(
                                    shard = idx,
                                    pv = s.pv_name,
                                    "Shard channel full; sample dropped \
                                     (per-shard isolation)"
                                );
                            }
                            Err(mpsc::error::TrySendError::Closed(s)) => {
                                warn!(
                                    shard = idx,
                                    pv = s.pv_name,
                                    "Shard channel closed; sample dropped"
                                );
                            }
                        }
                    }
                    // All producers are gone.
                    None => break,
                }
            }
        }
    }
    info!("Sharded write dispatcher exiting");
}
pub async fn write_loop(
storage: Arc<dyn StoragePlugin>,
registry: Arc<PvRegistry>,
rx: mpsc::Receiver<PvSample>,
shutdown: tokio::sync::watch::Receiver<bool>,
flush_period: Duration,
) {
let cfg = WriteLoopConfig {
flush_period,
..Default::default()
};
write_loop_with_config(storage, registry, rx, shutdown, cfg).await
}
/// Drop guard that clears a shared "flush in flight" flag.
///
/// The flag is reset to `false` when the guard is dropped, whichever way
/// the owning scope is left.
struct FlushInFlightGuard {
    /// Flag to clear on drop.
    flag: Arc<std::sync::atomic::AtomicBool>,
}

impl Drop for FlushInFlightGuard {
    fn drop(&mut self) {
        let Self { flag } = self;
        flag.store(false, Ordering::Release);
    }
}
/// Flushes buffered ingest writes and commits the corresponding timestamps.
///
/// Returns `false` without doing anything when a flush is already in
/// flight or nothing is pending; returns `true` once a flush was attempted.
///
/// Steps:
/// 1. Take a snapshot of the pending per-PV timestamps.
/// 2. Run `storage.flush_ingest_writes()` on the blocking pool, bounded by
///    `flush_timeout`.
/// 3. On success, commit to the registry every snapshot entry that was
///    neither reported failed nor deferred. Failed PVs are removed from
///    `pending`; deferred ones stay for the next cycle. Committed entries
///    are removed from `pending` only if the registry update succeeds.
/// 4. On a flush error or a panicked flush task, everything stays pending.
/// 5. On timeout, the snapshot entries are removed from `pending` without
///    being committed.
///
/// The in-flight flag is claimed with a compare-and-swap so two concurrent
/// callers cannot both start a flush. The guard that clears it is built
/// before the blocking task is spawned, so the flag is released even if the
/// closure is dropped without ever running.
async fn run_flush_and_commit(
    storage: &Arc<dyn StoragePlugin>,
    registry: &Arc<PvRegistry>,
    pending: &Arc<PendingReports>,
    flush_timeout: Duration,
    in_flight: &Arc<std::sync::atomic::AtomicBool>,
) -> bool {
    // Cheap early-out before paying for a snapshot.
    if in_flight.load(Ordering::Acquire) {
        return false;
    }
    let snapshot = pending.snapshot();
    if snapshot.is_empty() {
        return false;
    }
    // Atomically claim the flag; losing the race means another flush has
    // started since the check above.
    if in_flight
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_err()
    {
        return false;
    }
    let flush_guard = FlushInFlightGuard {
        flag: in_flight.clone(),
    };
    let storage_for_flush = storage.clone();
    let flush_join = tokio::task::spawn_blocking(move || {
        // The flag is cleared when the blocking task finishes, which may be
        // after the timeout below has already fired.
        let _guard = flush_guard;
        let rt = tokio::runtime::Handle::current();
        rt.block_on(storage_for_flush.flush_ingest_writes())
    });
    match tokio::time::timeout(flush_timeout, flush_join).await {
        Ok(Ok(Ok(IngestFlushResult { failed, deferred }))) => {
            if !failed.is_empty() {
                pending.remove_failed(&failed);
                error!(
                    "STS flush dropped {} PV(s) from timestamp commit \
                     (bytes never reached disk): {:?}",
                    failed.len(),
                    failed
                );
            }
            if !deferred.is_empty() {
                debug!(
                    "STS flush deferred {} PV(s); keeping in pending \
                     for next cycle: {:?}",
                    deferred.len(),
                    deferred
                );
            }
            let failed_set: std::collections::HashSet<&str> =
                failed.iter().map(|s| s.as_str()).collect();
            let deferred_set: std::collections::HashSet<&str> =
                deferred.iter().map(|s| s.as_str()).collect();
            // Only PVs that were actually flushed may have their timestamp
            // committed.
            let to_commit: Vec<(&str, SystemTime)> = snapshot
                .iter()
                .filter(|(pv, _)| {
                    !failed_set.contains(pv.as_str()) && !deferred_set.contains(pv.as_str())
                })
                .map(|(pv, ts)| (pv.as_str(), *ts))
                .collect();
            if to_commit.is_empty() {
                return true;
            }
            match registry.batch_update_timestamps(&to_commit) {
                Ok(()) => {
                    let committed_map: std::collections::HashMap<String, SystemTime> = to_commit
                        .iter()
                        .map(|(pv, ts)| ((*pv).to_string(), *ts))
                        .collect();
                    pending.remove_committed(&committed_map);
                }
                Err(e) => {
                    error!(
                        "Registry timestamp commit failed; \
                         keeping {} pending for retry: {e}",
                        snapshot.len()
                    );
                }
            }
        }
        Ok(Ok(Err(e))) => {
            error!(
                "STS ingest flush errored; deferring all {} \
                 pending timestamp commits: {e}",
                snapshot.len()
            );
        }
        Ok(Err(join_err)) => {
            error!(
                "STS ingest flush task panicked; deferring all {} \
                 pending timestamp commits: {join_err}",
                snapshot.len()
            );
        }
        Err(_) => {
            metrics::counter!("archiver_storage_flush_timeouts_total").increment(1);
            let dropped = snapshot.len();
            // The flush outcome is unknown, so these entries are discarded
            // rather than committed.
            pending.remove_committed(&snapshot);
            error!(
                "STS ingest flush timed out after {flush_timeout:?}; \
                 conservatively dropped {dropped} pending timestamp \
                 commit(s) (task remains on blocking pool; \
                 timestamps will be rebuilt from subsequent samples)"
            );
        }
    }
    true
}
/// Concurrent map from PV name to the newest timestamp that has been reported
/// for it and not yet removed again.
#[derive(Default)]
pub struct PendingReports {
    inner: dashmap::DashMap<String, SystemTime>,
}

impl PendingReports {
    /// Creates an empty set of pending reports.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records `ts` for `pv`, keeping whichever of the stored and the new
    /// timestamp is later.
    pub fn report(&self, pv: &str, ts: SystemTime) {
        let mut slot = self.inner.entry(pv.to_string()).or_insert(ts);
        if *slot < ts {
            *slot = ts;
        }
    }

    /// Returns an owned copy of every `(pv, timestamp)` pair currently held.
    pub fn snapshot(&self) -> std::collections::HashMap<String, SystemTime> {
        let mut copy = std::collections::HashMap::new();
        for entry in self.inner.iter() {
            copy.insert(entry.key().clone(), *entry.value());
        }
        copy
    }

    /// Removes each listed PV, but only when its stored timestamp still
    /// equals the committed one; entries that changed since are kept.
    pub fn remove_committed(&self, committed: &std::collections::HashMap<String, SystemTime>) {
        committed.iter().for_each(|(pv, committed_ts)| {
            let _ = self.inner.remove_if(pv, |_, current| current == committed_ts);
        });
    }

    /// Removes each listed PV unconditionally.
    pub fn remove_failed(&self, failed: &[String]) {
        failed.iter().for_each(|pv| {
            let _ = self.inner.remove(pv);
        });
    }

    /// Reports whether no PV is currently pending.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns the number of PVs currently pending.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
}
/// Runs the write pipeline as a sharded pool with exactly one shard.
///
/// The supplied `cfg` is wrapped in a [`ShardedWritePoolConfig`] with
/// `shards = 1` and a per-shard buffer of 4096, and the work is delegated to
/// `run_sharded_write_pool`, which is awaited to completion.
pub async fn write_loop_with_config(
    storage: Arc<dyn StoragePlugin>,
    registry: Arc<PvRegistry>,
    rx: mpsc::Receiver<PvSample>,
    shutdown: tokio::sync::watch::Receiver<bool>,
    cfg: WriteLoopConfig,
) {
    run_sharded_write_pool(
        storage,
        registry,
        rx,
        shutdown,
        ShardedWritePoolConfig {
            shards: 1,
            per_shard_buffer: 4096,
            write_loop: cfg,
        },
    )
    .await;
}
/// Per-shard worker: appends incoming samples to storage until shutdown.
///
/// Keeps two shard-local maps keyed by PV name (last timestamp seen and last
/// DBR type seen) and hands them to [`shard_handle_sample`] for every sample
/// received. When the shutdown watch fires, the samples still buffered in the
/// channel are processed by [`shard_drain_on_shutdown`] and the loop ends.
async fn shard_append_loop(
    shard_idx: usize,
    storage: Arc<dyn StoragePlugin>,
    mut sample_rx: mpsc::Receiver<PvSample>,
    pending: Arc<PendingReports>,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
    cfg: WriteLoopConfig,
) {
    let per_append_timeout = cfg.append_timeout;
    info!(shard = shard_idx, "Shard append loop started");

    // Gate state owned by this shard only.
    let mut last_ts = std::collections::HashMap::<String, SystemTime>::new();
    let mut last_dbr_type = std::collections::HashMap::<String, ArchDbType>::new();

    loop {
        tokio::select! {
            // A closed channel yields `None`, which does not match this
            // pattern; the branch is then disabled and only the shutdown
            // branch is awaited.
            Some(pv_sample) = sample_rx.recv() => {
                shard_handle_sample(
                    shard_idx,
                    &storage,
                    &pending,
                    pv_sample,
                    &mut last_ts,
                    &mut last_dbr_type,
                    per_append_timeout,
                )
                .await;
            }
            _ = shutdown.changed() => {
                shard_drain_on_shutdown(
                    shard_idx,
                    &storage,
                    &pending,
                    &mut sample_rx,
                    &mut last_ts,
                    &mut last_dbr_type,
                    &cfg,
                )
                .await;
                break;
            }
        }
    }

    info!(shard = shard_idx, "Shard append loop exited");
}
/// Validates one sample against the shard's per-PV gates and appends it to
/// storage on the blocking pool.
///
/// Gates, in order:
/// * a sample whose timestamp is strictly older than the last one recorded
///   for the PV is dropped (`timestamp_drops` is incremented);
/// * a sample whose DBR type differs from the one first recorded for the PV
///   is dropped (`type_change_drops` is incremented and the new type is
///   published in `latest_observed_dbr`); the recorded type is left as is.
///
/// A sample that passes both gates is appended with a bound of
/// `append_timeout`. Its timestamp is recorded in `last_ts` regardless of how
/// the append ends. On success the blocking task bumps the stored-event
/// counters and reports the timestamp to `pending`.
async fn shard_handle_sample(
    shard_idx: usize,
    storage: &Arc<dyn StoragePlugin>,
    pending: &Arc<PendingReports>,
    pv_sample: PvSample,
    last_ts: &mut std::collections::HashMap<String, SystemTime>,
    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
    append_timeout: Duration,
) {
    let ts = pv_sample.sample.timestamp;

    // Gate 1: timestamps must not go backwards (equal timestamps pass).
    let regressed_from = last_ts
        .get(&pv_sample.pv_name)
        .copied()
        .filter(|recorded| ts < *recorded);
    if let Some(prev_ts) = regressed_from {
        if let Some(c) = pv_sample.counters.as_ref() {
            c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
        }
        debug!(
            shard = shard_idx,
            pv = pv_sample.pv_name,
            ?ts,
            ?prev_ts,
            "Dropping out-of-order sample"
        );
        return;
    }

    // Gate 2: the DBR type recorded for a PV never changes; the first sample
    // seen for a PV establishes it.
    match last_dbr_type.get(&pv_sample.pv_name).copied() {
        Some(prev) if prev != pv_sample.dbr_type => {
            if let Some(c) = pv_sample.counters.as_ref() {
                c.type_change_drops.fetch_add(1, Ordering::Relaxed);
                c.latest_observed_dbr
                    .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
            }
            debug!(
                shard = shard_idx,
                pv = pv_sample.pv_name,
                ?prev,
                new = ?pv_sample.dbr_type,
                "Dropping type-changed sample"
            );
            return;
        }
        Some(_) => {}
        None => {
            last_dbr_type.insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
        }
    }

    let meta = AppendMeta {
        element_count: pv_sample.element_count,
        ..Default::default()
    };
    // Copies kept on this side of the task boundary for logging/accounting.
    let pv_name_for_post = pv_sample.pv_name.clone();
    let counters_for_post = pv_sample.counters.clone();
    let storage_for_task = Arc::clone(storage);
    let pending_for_task = Arc::clone(pending);

    let join = tokio::task::spawn_blocking(move || {
        let rt = tokio::runtime::Handle::current();
        let res = rt.block_on(storage_for_task.append_event_with_meta(
            &pv_sample.pv_name,
            pv_sample.dbr_type,
            &pv_sample.sample,
            &meta,
        ));
        // Success accounting happens inside the task so that an append which
        // completes after the caller's timeout is still counted and reported.
        if res.is_ok() {
            metrics::counter!("archiver_events_stored_total").increment(1);
            if let Some(c) = pv_sample.counters.as_ref() {
                c.events_stored.fetch_add(1, Ordering::Relaxed);
            }
            pending_for_task.report(&pv_sample.pv_name, ts);
        }
        res
    });

    let outcome = tokio::time::timeout(append_timeout, join).await;
    last_ts.insert(pv_name_for_post.clone(), ts);

    match outcome {
        Ok(Ok(Ok(()))) => {}
        Ok(Ok(Err(e))) => {
            error!(shard = shard_idx, pv = pv_name_for_post, "Write error: {e}");
        }
        Ok(Err(join_err)) => {
            error!(
                shard = shard_idx,
                pv = pv_name_for_post,
                "Storage write task panicked: {join_err}"
            );
        }
        Err(_) => {
            error!(
                shard = shard_idx,
                pv = pv_name_for_post,
                "Storage append timed out after {append_timeout:?}; \
                 shard abandoning (task remains on blocking pool, \
                 may still succeed late)"
            );
            if let Some(c) = counters_for_post.as_ref() {
                c.storage_append_timeouts.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}
/// Processes the samples still buffered in a shard's channel at shutdown.
///
/// Samples are pulled with `try_recv` until the channel reports nothing more.
/// While the elapsed time is within `cfg.drain_total_budget`, each sample goes
/// through [`shard_handle_sample`] with `cfg.drain_per_sample_timeout` as its
/// append bound. Once the budget is exceeded the remaining samples are only
/// counted, and a single warning reports how many were abandoned.
async fn shard_drain_on_shutdown(
    shard_idx: usize,
    storage: &Arc<dyn StoragePlugin>,
    pending: &Arc<PendingReports>,
    sample_rx: &mut mpsc::Receiver<PvSample>,
    last_ts: &mut std::collections::HashMap<String, SystemTime>,
    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
    cfg: &WriteLoopConfig,
) {
    let drain_total_budget = cfg.drain_total_budget;
    let per_sample_timeout = cfg.drain_per_sample_timeout;
    let started = std::time::Instant::now();
    let mut drained_skipped = 0usize;

    loop {
        let Ok(pv_sample) = sample_rx.try_recv() else {
            break;
        };
        // Past the budget: keep emptying the channel, but only count.
        if started.elapsed() > drain_total_budget {
            drained_skipped += 1;
            continue;
        }
        shard_handle_sample(
            shard_idx,
            storage,
            pending,
            pv_sample,
            last_ts,
            last_dbr_type,
            per_sample_timeout,
        )
        .await;
    }

    if drained_skipped != 0 {
        warn!(
            shard = shard_idx,
            "Shutdown drain budget ({drain_total_budget:?}) exhausted; \
             {drained_skipped} buffered sample(s) abandoned without \
             attempting append"
        );
    }
}
/// Single owner of periodic storage flushes and registry timestamp commits.
///
/// Phase 1: every `cfg.flush_period` (clamped to one second when zero), runs
/// [`run_flush_and_commit`] if anything is pending, until shutdown is
/// signalled. Shutdown means either the watch value becoming `true` or the
/// watch sender being dropped.
///
/// Phase 2: waits a short grace period (at most 200 ms, bounded by
/// `cfg.drain_total_budget`), then polls every 50 ms until no flush is in
/// flight or the drain budget is used up, and finally attempts one last
/// flush + commit bounded by `cfg.shutdown_flush_timeout`. If a flush is
/// still in flight at that point, the final attempt is skipped by
/// `run_flush_and_commit` itself.
async fn flush_owner_loop(
    storage: Arc<dyn StoragePlugin>,
    registry: Arc<PvRegistry>,
    pending: Arc<PendingReports>,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
    cfg: WriteLoopConfig,
) {
    let flush_period = cfg.flush_period;
    let flush_timeout = cfg.flush_timeout;
    let shutdown_flush_timeout = cfg.shutdown_flush_timeout;
    let drain_total_budget = cfg.drain_total_budget;
    let flush_period = if flush_period.is_zero() {
        warn!(
            "flush_owner: flush_period was Duration::ZERO; \
             clamping to 1s to avoid tokio::time::interval panic"
        );
        Duration::from_secs(1)
    } else {
        flush_period
    };
    info!("Flush owner started");

    let mut flush_ticker = tokio::time::interval(flush_period);
    flush_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    // An interval's first tick completes immediately; consume it so the first
    // real flush happens one full period after start.
    let _ = flush_ticker.tick().await;
    let flush_in_flight = Arc::new(std::sync::atomic::AtomicBool::new(false));

    loop {
        tokio::select! {
            biased;
            changed = shutdown.changed() => {
                // `changed()` resolves to `Err` immediately, and on every
                // later call, once the sender has been dropped. Ignoring that
                // would make this biased branch win every iteration: the
                // loop would spin, the ticker would starve, and the final
                // flush below would never run. A dropped sender is therefore
                // treated as a shutdown request.
                if changed.is_err() {
                    warn!(
                        "Flush owner: shutdown channel closed without a \
                         shutdown signal; treating as shutdown"
                    );
                    break;
                }
                if *shutdown.borrow() {
                    break;
                }
            }
            _ = flush_ticker.tick() => {
                if !pending.is_empty() {
                    run_flush_and_commit(
                        &storage,
                        &registry,
                        &pending,
                        flush_timeout,
                        &flush_in_flight,
                    )
                    .await;
                }
            }
        }
    }

    // Phase 2: give an in-flight flush a bounded chance to finish before the
    // final flush is attempted.
    let phase2_deadline = std::time::Instant::now() + drain_total_budget;
    let min_grace = std::cmp::min(drain_total_budget, Duration::from_millis(200));
    if !min_grace.is_zero() {
        tokio::time::sleep(min_grace).await;
    }
    while flush_in_flight.load(Ordering::Acquire) && std::time::Instant::now() < phase2_deadline {
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    if flush_in_flight.load(Ordering::Acquire) {
        warn!(
            "Shutdown grace exhausted while a flush is still in flight; \
             final flush will be skipped (pending entries left for next \
             process restart to rebuild from re-archived samples)"
        );
    }
    info!(
        pending_at_shutdown = pending.len(),
        "Flush owner running final flush + commit"
    );
    if !pending.is_empty() {
        run_flush_and_commit(
            &storage,
            &registry,
            &pending,
            shutdown_flush_timeout,
            &flush_in_flight,
        )
        .await;
    }
    info!("Flush owner exiting");
}
#[cfg(test)]
mod pva_mapping_tests {
use super::*;
use epics_rs::pva::pvdata::PvStructure;
use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
use std::io::Cursor;
/// Test fixture: builds a structure tagged `epics:nt/NTTable:1.0` with a
/// two-entry `labels` string array and a `value` sub-structure holding two
/// three-element double columns named `x` and `y`.
fn make_nttable() -> PvField {
let mut table = PvStructure::new("epics:nt/NTTable:1.0");
// Column labels; they match the field names of the `value` sub-structure.
let labels =
TypedScalarArray::String(vec!["x".to_string(), "y".to_string()].into_iter().collect());
table
.fields
.push(("labels".into(), PvField::ScalarArrayTyped(labels)));
// The column data lives in a nested structure with an empty struct id.
let mut value_inner = PvStructure::new("");
value_inner.fields.push((
"x".into(),
PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![1.0, 2.0, 3.0].into())),
));
value_inner.fields.push((
"y".into(),
PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![4.0, 5.0, 6.0].into())),
));
table
.fields
.push(("value".into(), PvField::Structure(value_inner)));
PvField::Structure(table)
}
/// The NTTable fixture must classify as `V4GenericBytes` with an element
/// count of one.
#[test]
fn nttable_classifies_as_v4_generic_bytes() {
    let table = make_nttable();
    let (db_type, element_count) = pv_field_to_arch_db_type(&table).expect("classified");
    assert_eq!(db_type, ArchDbType::V4GenericBytes);
    assert_eq!(element_count, 1);
}
/// Converting the NTTable fixture must yield `V4GenericBytes` whose payload
/// decodes (type descriptor first, then value, both big-endian) back into a
/// structure with the same struct id, a two-entry `labels` string array and a
/// two-field `value` sub-structure.
#[test]
fn nttable_round_trips_through_self_describing_bytes() {
let pv = make_nttable();
let av = pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted");
let bytes = match av {
ArchiverValue::V4GenericBytes(b) => b,
other => panic!("expected V4GenericBytes, got {:?}", other.db_type()),
};
// The payload is self-describing: the descriptor is read from the same
// cursor immediately before the value it describes.
let mut cur = Cursor::new(bytes.as_slice());
let desc = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc decode");
let decoded = decode_pv_field(&desc, &mut cur, ByteOrder::Big).expect("value decode");
let PvField::Structure(s) = decoded else {
panic!("expected Structure after decode");
};
assert_eq!(s.struct_id, "epics:nt/NTTable:1.0");
// `labels` must still be a string array with both entries.
let labels = s
.fields
.iter()
.find_map(|(n, f)| if n == "labels" { Some(f) } else { None })
.expect("labels");
let Some(PvField::ScalarArrayTyped(TypedScalarArray::String(arr))) = Some(labels) else {
panic!("labels not a string array");
};
assert_eq!(arr.len(), 2);
// `value` must still be a nested structure carrying both columns.
let value = s
.fields
.iter()
.find_map(|(n, f)| {
if n == "value" {
if let PvField::Structure(v) = f {
Some(v)
} else {
None
}
} else {
None
}
})
.expect("value substruct");
assert_eq!(value.fields.len(), 2);
}
/// A structure tagged `epics:nt/NTScalarArray:1.0` whose `value` is a
/// three-element double array must classify as `WaveformDouble` with element
/// count 3 and convert to a `VectorDouble` holding the same numbers.
#[test]
fn nt_scalar_array_double_classifies_as_waveform() {
let mut wrapper = PvStructure::new("epics:nt/NTScalarArray:1.0");
wrapper.fields.push((
"value".into(),
PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![1.5, 2.5, 3.5].into())),
));
let pv = PvField::Structure(wrapper);
// Classification: type plus element count.
let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
assert_eq!(db, ArchDbType::WaveformDouble);
assert_eq!(ec, 3);
// Conversion: the array contents must come through unchanged.
let av = pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted");
match av {
ArchiverValue::VectorDouble(v) => assert_eq!(v, vec![1.5, 2.5, 3.5]),
other => panic!("expected VectorDouble, got {:?}", other.db_type()),
}
}
/// An NTEnum fixture must classify as `ScalarEnum` with an element count of
/// one.
#[test]
fn nt_enum_classifies_as_scalar_enum() {
    let nt_enum = make_nt_enum(2, &["Zero", "One", "Two"]);
    let (db_type, element_count) = pv_field_to_arch_db_type(&nt_enum).expect("classified");
    assert_eq!(db_type, ArchDbType::ScalarEnum);
    assert_eq!(element_count, 1);
}
/// Converting an NTEnum fixture must yield `ScalarEnum` carrying the index
/// the fixture was built with (here 2).
#[test]
fn nt_enum_value_is_index() {
    let nt_enum = make_nt_enum(2, &["Off", "On"]);
    let converted = pv_field_scalar_to_archiver(&nt_enum, &nt_enum.descriptor());
    match converted.expect("converted") {
        ArchiverValue::ScalarEnum(index) => assert_eq!(index, 2),
        other => panic!("expected ScalarEnum, got {:?}", other.db_type()),
    }
}
/// Refreshing the extras cache from an NTEnum fixture must store the choice
/// labels under `enum_strs` as a JSON array of strings.
#[test]
fn nt_enum_extras_carries_json_choices() {
    let nt_enum = make_nt_enum(1, &["Off", "On", "Trip"]);
    let extras: ExtraFieldsCache = DashMap::new();
    refresh_nt_enum_extras(&nt_enum, &extras);
    // Clone the cached value out so the map guard is released before the
    // comparison.
    let stored = match extras.get("enum_strs") {
        Some(entry) => entry.value().clone(),
        None => panic!("enum_strs cached"),
    };
    assert_eq!(stored, r#"["Off","On","Trip"]"#);
}
/// Encoding a two-item union array together with a caller-supplied canonical
/// descriptor must decode back (big-endian) into the same two items, and the
/// decoded descriptor must list both declared variants in order.
#[test]
fn union_array_roundtrips_with_canonical_descriptor() {
use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
use epics_rs::pva::pvdata::{FieldDesc, UnionItem};
use std::io::Cursor;
// Canonical descriptor: a union array with an int and a double variant.
let variants_desc = vec![
("intVal".to_string(), FieldDesc::Scalar(ScalarType::Int)),
("dblVal".to_string(), FieldDesc::Scalar(ScalarType::Double)),
];
let canonical = FieldDesc::UnionArray {
struct_id: String::new(),
variants: variants_desc.clone(),
};
// One item per variant, with selectors matching the descriptor order.
let items = vec![
UnionItem {
selector: 0,
variant_name: "intVal".into(),
value: PvField::Scalar(ScalarValue::Int(42)),
},
UnionItem {
selector: 1,
variant_name: "dblVal".into(),
value: PvField::Scalar(ScalarValue::Double(1.5)),
},
];
let field = PvField::UnionArray(items);
let wire = encode_pv_field_self_describing(&field, &canonical);
// Decode descriptor and value back from the same byte stream.
let mut cur = Cursor::new(wire.as_slice());
let desc_back = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc");
let val_back = decode_pv_field(&desc_back, &mut cur, ByteOrder::Big).expect("value");
let PvField::UnionArray(items_back) = val_back else {
panic!("expected UnionArray, got {val_back:?}");
};
// Values: selectors and payloads must match what was encoded.
assert_eq!(items_back.len(), 2);
assert_eq!(items_back[0].selector, 0);
assert!(matches!(
items_back[0].value,
PvField::Scalar(ScalarValue::Int(42))
));
assert_eq!(items_back[1].selector, 1);
match &items_back[1].value {
PvField::Scalar(ScalarValue::Double(d)) => assert!((*d - 1.5).abs() < 1e-9),
other => panic!("variant 1 not Double, got {other:?}"),
}
// Descriptor: both variants must be present, in the declared order.
match desc_back {
FieldDesc::UnionArray { variants, .. } => {
assert_eq!(variants.len(), 2);
assert_eq!(variants[0].0, "intVal");
assert!(matches!(variants[0].1, FieldDesc::Scalar(ScalarType::Int)));
assert_eq!(variants[1].0, "dblVal");
assert!(matches!(
variants[1].1,
FieldDesc::Scalar(ScalarType::Double)
));
}
other => panic!("expected UnionArray descriptor, got {other:?}"),
}
}
/// Converting an NTNDArray-style structure whose `value` union has only one
/// variant selected, using a canonical descriptor that declares eleven
/// variants, must produce bytes whose decoded descriptor still lists all
/// eleven variants and whose decoded value keeps the selected one.
#[test]
fn nt_nd_array_union_preserves_all_variants_via_canonical_desc() {
use epics_rs::pva::pvdata::FieldDesc;
use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
use std::io::Cursor;
// Variant names and element types declared by the canonical descriptor;
// `doubleValue` sits at index 10.
let variant_specs: &[(&str, ScalarType)] = &[
("booleanValue", ScalarType::Boolean),
("byteValue", ScalarType::Byte),
("shortValue", ScalarType::Short),
("intValue", ScalarType::Int),
("longValue", ScalarType::Long),
("ubyteValue", ScalarType::UByte),
("ushortValue", ScalarType::UShort),
("uintValue", ScalarType::UInt),
("ulongValue", ScalarType::ULong),
("floatValue", ScalarType::Float),
("doubleValue", ScalarType::Double),
];
let union_variants: Vec<(String, FieldDesc)> = variant_specs
.iter()
.map(|(n, st)| (n.to_string(), FieldDesc::ScalarArray(*st)))
.collect();
let canonical = FieldDesc::Structure {
struct_id: "epics:nt/NTNDArray:1.0".into(),
fields: vec![(
"value".into(),
FieldDesc::Union {
struct_id: String::new(),
variants: union_variants.clone(),
},
)],
};
// The value itself only carries the selected `doubleValue` variant.
let mut root = PvStructure::new("epics:nt/NTNDArray:1.0");
root.fields.push((
"value".into(),
PvField::Union {
selector: 10,
variant_name: "doubleValue".into(),
value: Box::new(PvField::ScalarArrayTyped(TypedScalarArray::Double(
vec![1.0, 2.0, 3.0].into(),
))),
},
));
let pv = PvField::Structure(root);
let av = pv_field_scalar_to_archiver(&pv, &canonical).expect("converted");
let bytes = match av {
ArchiverValue::V4GenericBytes(b) => b,
other => panic!("expected V4GenericBytes, got {:?}", other.db_type()),
};
// Decode descriptor and value back from the stored bytes.
let mut cur = Cursor::new(bytes.as_slice());
let desc_back = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc");
let decoded = decode_pv_field(&desc_back, &mut cur, ByteOrder::Big).expect("value");
let PvField::Structure(s) = decoded else {
panic!("expected Structure root");
};
// Value side: selector, variant name and payload must be unchanged.
let value = s
.fields
.iter()
.find_map(|(n, f)| if n == "value" { Some(f) } else { None })
.expect("value");
let PvField::Union {
selector,
variant_name,
value: inner,
} = value
else {
panic!("expected Union, got {value:?}");
};
assert_eq!(*selector, 10);
assert_eq!(variant_name, "doubleValue");
let PvField::ScalarArrayTyped(TypedScalarArray::Double(arr)) = inner.as_ref() else {
panic!("expected Double[]");
};
assert_eq!(arr.as_ref(), &[1.0, 2.0, 3.0]);
// Descriptor side: every declared variant must be present, in order.
let value_field_desc = match &desc_back {
FieldDesc::Structure { fields, .. } => fields
.iter()
.find_map(|(n, f)| if n == "value" { Some(f) } else { None })
.expect("value field in desc"),
other => panic!("expected Structure descriptor, got {other:?}"),
};
let FieldDesc::Union {
variants: v_back, ..
} = value_field_desc
else {
panic!("expected Union descriptor for value");
};
assert_eq!(
v_back.len(),
variant_specs.len(),
"all sibling Union variants must survive the wire round-trip — \
this is what value-recovery descriptor would have dropped"
);
for (i, (expected_name, expected_st)) in variant_specs.iter().enumerate() {
assert_eq!(&v_back[i].0, expected_name);
assert!(matches!(v_back[i].1, FieldDesc::ScalarArray(st) if st == *expected_st));
}
}
/// A structure that has the `index`/`choices` layout under `value` but whose
/// struct ids are not the NTEnum ones (`my:custom/Thing:1.0` with an
/// unnamed inner structure) must classify as `V4GenericBytes`.
#[test]
fn structure_without_nt_enum_id_is_v4_bytes() {
// Inner structure shaped like an enum payload, but with an empty struct id.
let mut inner = PvStructure::new("");
inner
.fields
.push(("index".into(), PvField::Scalar(ScalarValue::Int(1))));
inner.fields.push((
"choices".into(),
PvField::ScalarArrayTyped(TypedScalarArray::String(
vec!["a".to_string(), "b".to_string()].into_iter().collect(),
)),
));
// Root structure carries a custom, non-NT struct id.
let mut root = PvStructure::new("my:custom/Thing:1.0");
root.fields
.push(("value".into(), PvField::Structure(inner)));
let pv = PvField::Structure(root);
let (db, _) = pv_field_to_arch_db_type(&pv).expect("classified");
assert_eq!(db, ArchDbType::V4GenericBytes);
}
/// Test fixture: builds a structure tagged `epics:nt/NTEnum:1.0` whose
/// `value` field is an `enum_t` structure holding the given `index` and the
/// given `choices` as a string array.
fn make_nt_enum(index: i32, choices: &[&str]) -> PvField {
// Inner `enum_t` structure: selected index plus the list of labels.
let mut inner = PvStructure::new("enum_t");
inner
.fields
.push(("index".into(), PvField::Scalar(ScalarValue::Int(index))));
let arr = TypedScalarArray::String(
choices
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.into(),
);
inner
.fields
.push(("choices".into(), PvField::ScalarArrayTyped(arr)));
// Outer NTEnum wrapper exposing the inner structure as `value`.
let mut root = PvStructure::new("epics:nt/NTEnum:1.0");
root.fields
.push(("value".into(), PvField::Structure(inner)));
PvField::Structure(root)
}
/// A structure tagged `epics:nt/NTScalar:1.0` whose `value` is a double must
/// classify as `ScalarDouble` with element count 1 and convert to a
/// `ScalarDouble` carrying the same number.
#[test]
fn nt_scalar_double_still_scalar() {
let mut wrapper = PvStructure::new("epics:nt/NTScalar:1.0");
wrapper
.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Double(42.5))));
let pv = PvField::Structure(wrapper);
let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
assert_eq!(db, ArchDbType::ScalarDouble);
assert_eq!(ec, 1);
// Compare with a tolerance rather than exact float equality.
match pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted") {
ArchiverValue::ScalarDouble(v) => assert!((v - 42.5).abs() < 1e-9),
other => panic!("expected ScalarDouble, got {:?}", other.db_type()),
}
}
}