use std::path::Path;
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
use chrono::{DateTime, Utc};
use rusqlite::{Connection, OptionalExtension, params};
use tracing::info;
use crate::types::ArchDbType;
/// Protocol tag attached to a PV.
///
/// The two variants map to the lowercase strings `"ca"` and `"pva"`;
/// `Ca` is the `Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Protocol {
    #[default]
    Ca,
    Pva,
}

impl Protocol {
    /// Returns the lowercase string form of this protocol (`"ca"` or `"pva"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Pva => "pva",
            Self::Ca => "ca",
        }
    }

    /// Parses the exact string produced by [`Protocol::as_str`].
    ///
    /// Matching is case-sensitive; any other input yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        [Self::Ca, Self::Pva]
            .into_iter()
            .find(|candidate| candidate.as_str() == s)
    }
}
/// Strips one leading `pva://` or `ca://` scheme and one trailing `.VAL`
/// from `name`, returning a slice of the input.
///
/// Only a single prefix is removed (`pva://` is tried first), and any other
/// field suffix is left untouched.
pub fn normalize_pv_name(name: &str) -> &str {
    let without_scheme = match name.strip_prefix("pva://") {
        Some(rest) => rest,
        None => name.strip_prefix("ca://").unwrap_or(name),
    };
    match without_scheme.strip_suffix(".VAL") {
        Some(base) => base,
        None => without_scheme,
    }
}
/// Splits a PV name into its protocol and bare name.
///
/// A `pva://` prefix selects [`Protocol::Pva`]; a `ca://` prefix or no prefix
/// selects [`Protocol::Ca`]. In every case a trailing `.VAL` is removed from
/// the returned name.
pub fn parse_pv_with_protocol(name: &str) -> (Protocol, &str) {
    let (protocol, rest) = match name.strip_prefix("pva://") {
        Some(rest) => (Protocol::Pva, rest),
        None => (Protocol::Ca, name.strip_prefix("ca://").unwrap_or(name)),
    };
    (protocol, rest.strip_suffix(".VAL").unwrap_or(rest))
}
/// Returns the part of `name` before its last `.` when the text after that
/// dot looks like a field name.
///
/// A field is non-empty and consists only of ASCII uppercase letters, ASCII
/// digits and `_`. `None` is returned when there is no dot, when either side
/// of the last dot is empty, or when the field contains any other character.
pub fn strip_field_suffix(name: &str) -> Option<&str> {
    let dot = name.rfind('.')?;
    // '.' is a single byte, so both slices fall on char boundaries.
    let (base, field) = (&name[..dot], &name[dot + 1..]);
    // Non-ASCII characters encode to bytes >= 0x80, which fail this predicate.
    let is_field_byte = |b: u8| b.is_ascii_uppercase() || b.is_ascii_digit() || b == b'_';
    let looks_like_field =
        !base.is_empty() && !field.is_empty() && field.bytes().all(is_field_byte);
    looks_like_field.then_some(base)
}
/// Checks whether `name` is acceptable as a PV name.
///
/// A name is rejected when it:
/// - is empty or longer than 256 bytes,
/// - starts with `.`, `-`, `/` or `:`,
/// - has an empty, `.` or `..` component when split on `:` and `/`,
/// - contains whitespace, a control character (including NUL), a backslash,
///   or any of `| & ; ` $ " ' * ?`.
pub fn is_valid_pv_name(name: &str) -> bool {
    const MAX_LEN: usize = 256;
    const FORBIDDEN: &[char] = &['\\', '|', '&', ';', '`', '$', '"', '\'', '*', '?'];

    if name.is_empty() || name.len() > MAX_LEN {
        return false;
    }
    if matches!(name.chars().next(), Some('.' | '-' | '/' | ':')) {
        return false;
    }
    let has_bad_component = name
        .split(|c: char| c == ':' || c == '/')
        .any(|part| matches!(part, "" | "." | ".."));
    if has_bad_component {
        return false;
    }
    // NUL is itself a control character, so `is_control` covers it.
    name.chars()
        .all(|c| !(c.is_whitespace() || c.is_control() || FORBIDDEN.contains(&c)))
}
/// Status value stored in the `status` column of a registry row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PvStatus {
    Active,
    Paused,
    Error,
    Inactive,
    Alias,
}

impl PvStatus {
    /// Returns the lowercase string stored in the database for this status.
    pub fn as_str(self) -> &'static str {
        match self {
            PvStatus::Active => "active",
            PvStatus::Paused => "paused",
            PvStatus::Error => "error",
            PvStatus::Inactive => "inactive",
            PvStatus::Alias => "alias",
        }
    }
}

impl std::str::FromStr for PvStatus {
    type Err = std::convert::Infallible;

    /// Parses the string form produced by [`PvStatus::as_str`].
    ///
    /// This never fails: any unrecognized string maps to `PvStatus::Active`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const KNOWN: [PvStatus; 5] = [
            PvStatus::Active,
            PvStatus::Paused,
            PvStatus::Error,
            PvStatus::Inactive,
            PvStatus::Alias,
        ];
        let status = KNOWN
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .unwrap_or(PvStatus::Active);
        Ok(status)
    }
}
/// Sampling mode of a PV: `Monitor`, or `Scan` with a period in seconds.
#[derive(Debug, Clone, PartialEq)]
pub enum SampleMode {
    Monitor,
    Scan { period_secs: f64 },
}

impl SampleMode {
    /// Converts to the `(sample_mode, sample_period)` pair stored in the
    /// database. `Monitor` always stores a period of `0.0`.
    fn to_db(&self) -> (&str, f64) {
        match self {
            SampleMode::Scan { period_secs } => ("scan", *period_secs),
            SampleMode::Monitor => ("monitor", 0.0),
        }
    }

    /// Rebuilds a mode from its stored columns.
    ///
    /// Only the exact string `"scan"` yields `Scan`; every other mode string
    /// is treated as `Monitor` and `period` is ignored.
    fn from_db(mode: &str, period: f64) -> Self {
        if mode == "scan" {
            SampleMode::Scan {
                period_secs: period,
            }
        } else {
            SampleMode::Monitor
        }
    }
}
/// One row of the `pv_info` table, decoded by `row_to_record`.
#[derive(Debug, Clone)]
pub struct PvRecord {
/// PV name; the primary key of `pv_info`.
pub pv_name: String,
/// Data type, stored as an integer and decoded with `ArchDbType::from_i32`
/// (unknown codes decode as `ArchDbType::ScalarDouble`).
pub dbr_type: ArchDbType,
/// Sampling mode rebuilt from the `sample_mode` / `sample_period` columns.
pub sample_mode: SampleMode,
/// Status parsed from the `status` column.
pub status: PvStatus,
/// Value of the `element_count` column (the schema default is 1).
pub element_count: i32,
/// Parsed `last_timestamp` column; `None` when the column is NULL or does
/// not parse as RFC 3339.
pub last_timestamp: Option<SystemTime>,
/// Parsed `created_at` column; falls back to the current time when the
/// stored text does not parse as RFC 3339.
pub created_at: DateTime<Utc>,
/// Parsed `updated_at` column; same fallback as `created_at`.
pub updated_at: DateTime<Utc>,
/// Optional text from the `prec` column.
/// NOTE(review): presumably the display precision — confirm.
pub prec: Option<String>,
/// Optional text from the `egu` column.
/// NOTE(review): presumably the engineering units — confirm.
pub egu: Option<String>,
/// Name of the PV this row is an alias for; `None` for non-alias rows.
pub alias_for: Option<String>,
/// Field names decoded from the JSON text in `archive_fields`; empty when
/// the column is NULL or not a valid JSON string array.
pub archive_fields: Vec<String>,
/// Optional text from the `policy_name` column.
pub policy_name: Option<String>,
/// Protocol parsed from the `protocol` column; unknown or missing values
/// become the default (`Protocol::Ca`).
pub protocol: Protocol,
}
/// Registry of PVs backed by a SQLite database (table `pv_info`).
pub struct PvRegistry {
/// The SQLite connection; every method takes this mutex before touching
/// the database.
conn: Mutex<Connection>,
}
impl PvRegistry {
/// Acquires the connection mutex.
///
/// # Errors
/// Returns an error (rather than panicking) when the mutex is poisoned.
fn lock_conn(&self) -> anyhow::Result<std::sync::MutexGuard<'_, Connection>> {
    match self.conn.lock() {
        Ok(guard) => Ok(guard),
        Err(e) => Err(anyhow::anyhow!("PV registry lock poisoned: {e}")),
    }
}
/// Opens the SQLite database at `path` and makes sure the schema exists.
///
/// # Errors
/// Fails if the connection cannot be opened or schema setup fails.
pub fn open(path: &Path) -> anyhow::Result<Self> {
    let registry = Self {
        conn: Mutex::new(Connection::open(path)?),
    };
    match registry.init_schema() {
        Ok(()) => Ok(registry),
        Err(e) => Err(e),
    }
}
/// Creates a registry on a fresh in-memory SQLite database with the schema
/// already initialized.
///
/// # Errors
/// Fails if the connection cannot be created or schema setup fails.
pub fn in_memory() -> anyhow::Result<Self> {
    let registry = Self {
        conn: Mutex::new(Connection::open_in_memory()?),
    };
    match registry.init_schema() {
        Ok(()) => Ok(registry),
        Err(e) => Err(e),
    }
}
/// Creates the `pv_info` table and its indexes when missing, then adds any
/// columns an older database lacks.
///
/// # Errors
/// Fails when the lock is poisoned or a statement fails for any reason
/// other than the column already existing.
fn init_schema(&self) -> anyhow::Result<()> {
    // These columns also appear in CREATE TABLE below. On an up-to-date
    // database each ALTER fails with "duplicate column name", which is
    // tolerated.
    const COLUMN_MIGRATIONS: [&str; 6] = [
        "ALTER TABLE pv_info ADD COLUMN prec TEXT",
        "ALTER TABLE pv_info ADD COLUMN egu TEXT",
        "ALTER TABLE pv_info ADD COLUMN alias_for TEXT",
        "ALTER TABLE pv_info ADD COLUMN archive_fields TEXT",
        "ALTER TABLE pv_info ADD COLUMN policy_name TEXT",
        "ALTER TABLE pv_info ADD COLUMN protocol TEXT NOT NULL DEFAULT 'ca'",
    ];

    let db = self.lock_conn()?;
    db.execute_batch(
        "
CREATE TABLE IF NOT EXISTS pv_info (
pv_name TEXT PRIMARY KEY NOT NULL,
dbr_type INTEGER NOT NULL,
sample_mode TEXT NOT NULL DEFAULT 'monitor',
sample_period REAL NOT NULL DEFAULT 0.0,
status TEXT NOT NULL DEFAULT 'active',
element_count INTEGER NOT NULL DEFAULT 1,
last_timestamp TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
prec TEXT,
egu TEXT,
alias_for TEXT,
archive_fields TEXT,
policy_name TEXT,
protocol TEXT NOT NULL DEFAULT 'ca'
);
CREATE INDEX IF NOT EXISTS idx_pv_status ON pv_info(status);
CREATE INDEX IF NOT EXISTS idx_pv_prefix ON pv_info(pv_name COLLATE NOCASE);
",
    )?;

    for ddl in COLUMN_MIGRATIONS {
        if let Err(err) = db.execute(ddl, []) {
            if !is_duplicate_column_error(&err) {
                return Err(err.into());
            }
        }
    }

    // Created after the migrations so `alias_for` is guaranteed to exist.
    db.execute_batch(
        "CREATE INDEX IF NOT EXISTS idx_pv_alias ON pv_info(alias_for) WHERE alias_for IS NOT NULL;",
    )?;
    info!("PV registry schema initialized");
    Ok(())
}
/// Registers a PV with the `Protocol::Ca` protocol.
///
/// Convenience wrapper around `register_pv_with_protocol`; see that method
/// for validation and error behavior.
pub fn register_pv(
    &self,
    pv_name: &str,
    dbr_type: ArchDbType,
    sample_mode: &SampleMode,
    element_count: i32,
) -> anyhow::Result<()> {
    let protocol = Protocol::Ca;
    self.register_pv_with_protocol(pv_name, dbr_type, sample_mode, element_count, protocol)
}
/// Inserts or replaces the registry row for `pv_name` with status `active`.
///
/// An existing row's `created_at` is preserved; `updated_at` is set to now.
///
/// NOTE(review): because this is `INSERT OR REPLACE`, columns not listed in
/// the statement (`last_timestamp`, `prec`, `egu`, `alias_for`,
/// `archive_fields`, `policy_name`) are reset when an existing row is
/// replaced — confirm that is intended for re-registration.
///
/// # Errors
/// Fails when `pv_name` is rejected by `is_valid_pv_name`, the lock is
/// poisoned, or the statement fails.
pub fn register_pv_with_protocol(
    &self,
    pv_name: &str,
    dbr_type: ArchDbType,
    sample_mode: &SampleMode,
    element_count: i32,
    protocol: Protocol,
) -> anyhow::Result<()> {
    const UPSERT: &str = "INSERT OR REPLACE INTO pv_info
(pv_name, dbr_type, sample_mode, sample_period, status, element_count, created_at, updated_at, protocol)
VALUES (?1, ?2, ?3, ?4, 'active', ?5, COALESCE((SELECT created_at FROM pv_info WHERE pv_name = ?1), ?6), ?6, ?7)";

    if !is_valid_pv_name(pv_name) {
        anyhow::bail!("invalid PV name: {pv_name:?}");
    }
    let db = self.lock_conn()?;
    let stamp = Utc::now().to_rfc3339();
    let (mode_tag, period_secs) = sample_mode.to_db();
    let type_code = dbr_type as i32;
    db.execute(
        UPSERT,
        params![
            pv_name,
            type_code,
            mode_tag,
            period_secs,
            element_count,
            stamp,
            protocol.as_str()
        ],
    )?;
    Ok(())
}
/// Sets the status of `pv_name` and bumps `updated_at`.
///
/// Returns `true` when a row was updated, `false` when no row has that name.
pub fn set_status(&self, pv_name: &str, status: PvStatus) -> anyhow::Result<bool> {
    const SQL: &str = "UPDATE pv_info SET status = ?1, updated_at = ?2 WHERE pv_name = ?3";
    let db = self.lock_conn()?;
    let stamp = Utc::now().to_rfc3339();
    let changed = db.execute(SQL, params![status.as_str(), stamp, pv_name])?;
    Ok(changed != 0)
}
/// Stores `timestamp` (as RFC 3339 UTC text) in `last_timestamp` for
/// `pv_name` and bumps `updated_at`.
///
/// Succeeds without effect when no row has that name.
pub fn update_last_timestamp(
    &self,
    pv_name: &str,
    timestamp: SystemTime,
) -> anyhow::Result<()> {
    const SQL: &str =
        "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3";
    let db = self.lock_conn()?;
    let sample_time = DateTime::<Utc>::from(timestamp).to_rfc3339();
    let stamp = Utc::now().to_rfc3339();
    db.execute(SQL, params![sample_time, stamp, pv_name])?;
    Ok(())
}
/// Deletes the row named `pv_name`, whether or not it is an alias.
///
/// Returns `true` when a row was deleted.
pub fn remove_pv(&self, pv_name: &str) -> anyhow::Result<bool> {
    let db = self.lock_conn()?;
    let deleted = db.execute("DELETE FROM pv_info WHERE pv_name = ?1", params![pv_name])?;
    Ok(deleted != 0)
}
/// Looks up the row named exactly `pv_name`.
///
/// Returns `Ok(None)` when no such row exists. Alias rows are returned
/// as-is; no alias resolution happens here.
pub fn get_pv(&self, pv_name: &str) -> anyhow::Result<Option<PvRecord>> {
    const SQL: &str = "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
last_timestamp, created_at, updated_at, prec, egu,
alias_for, archive_fields, policy_name, protocol
FROM pv_info WHERE pv_name = ?1";
    let db = self.lock_conn()?;
    let found = db
        .query_row(SQL, params![pv_name], row_to_record)
        .optional()?;
    Ok(found)
}
/// Lists the names of all non-alias rows, sorted by name.
pub fn all_pv_names(&self) -> anyhow::Result<Vec<String>> {
    let db = self.lock_conn()?;
    let mut stmt =
        db.prepare("SELECT pv_name FROM pv_info WHERE alias_for IS NULL ORDER BY pv_name")?;
    let mut names = Vec::new();
    for name in stmt.query_map([], |row| row.get::<_, String>(0))? {
        names.push(name?);
    }
    Ok(names)
}
/// Returns every row whose status equals `status`, sorted by name.
///
/// Alias rows are not filtered out by this query.
pub fn pvs_by_status(&self, status: PvStatus) -> anyhow::Result<Vec<PvRecord>> {
    const SQL: &str = "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
last_timestamp, created_at, updated_at, prec, egu,
alias_for, archive_fields, policy_name, protocol
FROM pv_info WHERE status = ?1 ORDER BY pv_name";
    let db = self.lock_conn()?;
    let mut stmt = db.prepare(SQL)?;
    let mut records = Vec::new();
    for record in stmt.query_map(params![status.as_str()], row_to_record)? {
        records.push(record?);
    }
    Ok(records)
}
/// Lists non-alias PV names matching the SQLite `GLOB` pattern `pattern`,
/// sorted by name.
pub fn matching_pvs(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
    const SQL: &str = "SELECT pv_name FROM pv_info
WHERE pv_name GLOB ?1 AND alias_for IS NULL
ORDER BY pv_name";
    let db = self.lock_conn()?;
    let mut stmt = db.prepare(SQL)?;
    let mut names = Vec::new();
    for name in stmt.query_map(params![pattern], |row| row.get::<_, String>(0))? {
        names.push(name?);
    }
    Ok(names)
}
/// Lists all row names — aliases included — matching the SQLite `GLOB`
/// pattern `pattern`, sorted by name.
pub fn matching_pvs_expanded(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
    let db = self.lock_conn()?;
    let mut stmt =
        db.prepare("SELECT pv_name FROM pv_info WHERE pv_name GLOB ?1 ORDER BY pv_name")?;
    let mut names = Vec::new();
    for name in stmt.query_map(params![pattern], |row| row.get::<_, String>(0))? {
        names.push(name?);
    }
    Ok(names)
}
/// Counts non-alias rows, optionally restricted to one status.
///
/// With `None` every non-alias row is counted.
pub fn count(&self, status: Option<PvStatus>) -> anyhow::Result<u64> {
    let db = self.lock_conn()?;
    let total: u64 = if let Some(wanted) = status {
        db.query_row(
            "SELECT COUNT(*) FROM pv_info
WHERE status = ?1 AND alias_for IS NULL",
            params![wanted.as_str()],
            |row| row.get(0),
        )?
    } else {
        db.query_row(
            "SELECT COUNT(*) FROM pv_info WHERE alias_for IS NULL",
            [],
            |row| row.get(0),
        )?
    };
    Ok(total)
}
/// Applies many `last_timestamp` updates inside one transaction.
///
/// Every updated row gets the same `updated_at` value. Names with no
/// matching row are silently skipped. If any statement fails, the
/// transaction is dropped without being committed.
pub fn batch_update_timestamps(&self, updates: &[(&str, SystemTime)]) -> anyhow::Result<()> {
    let mut db = self.lock_conn()?;
    let tx = db.transaction()?;
    let stamp = Utc::now().to_rfc3339();
    let mut stmt = tx.prepare(
        "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
    )?;
    for &(name, when) in updates {
        let sample_time = DateTime::<Utc>::from(when).to_rfc3339();
        stmt.execute(params![sample_time, stamp, name])?;
    }
    // The statement borrows the transaction; release it before committing.
    drop(stmt);
    tx.commit()?;
    Ok(())
}
/// Returns non-alias rows whose `created_at` is at or after `since`,
/// newest first.
///
/// The comparison is done on the stored RFC 3339 text.
pub fn recently_added_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
    const SQL: &str = "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
last_timestamp, created_at, updated_at, prec, egu,
alias_for, archive_fields, policy_name, protocol
FROM pv_info WHERE created_at >= ?1 AND alias_for IS NULL
ORDER BY created_at DESC";
    let db = self.lock_conn()?;
    let threshold = DateTime::<Utc>::from(since).to_rfc3339();
    let mut stmt = db.prepare(SQL)?;
    let mut records = Vec::new();
    for record in stmt.query_map(params![threshold], row_to_record)? {
        records.push(record?);
    }
    Ok(records)
}
/// Returns non-alias rows whose `updated_at` is at or after `since`,
/// most recently updated first.
///
/// The comparison is done on the stored RFC 3339 text.
pub fn recently_modified_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
    const SQL: &str = "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
last_timestamp, created_at, updated_at, prec, egu,
alias_for, archive_fields, policy_name, protocol
FROM pv_info WHERE updated_at >= ?1 AND alias_for IS NULL
ORDER BY updated_at DESC";
    let db = self.lock_conn()?;
    let threshold = DateTime::<Utc>::from(since).to_rfc3339();
    let mut stmt = db.prepare(SQL)?;
    let mut records = Vec::new();
    for record in stmt.query_map(params![threshold], row_to_record)? {
        records.push(record?);
    }
    Ok(records)
}
/// Replaces the sample mode and period of `pv_name` and bumps `updated_at`.
///
/// Returns `true` when a row was updated.
pub fn update_sample_mode(&self, pv_name: &str, mode: &SampleMode) -> anyhow::Result<bool> {
    const SQL: &str = "UPDATE pv_info SET sample_mode = ?1, sample_period = ?2, updated_at = ?3 WHERE pv_name = ?4";
    let db = self.lock_conn()?;
    let stamp = Utc::now().to_rfc3339();
    let (mode_tag, period_secs) = mode.to_db();
    let changed = db.execute(SQL, params![mode_tag, period_secs, stamp, pv_name])?;
    Ok(changed != 0)
}
/// Updates the `prec` and/or `egu` columns of `pv_name`.
///
/// A `None` argument leaves the stored value unchanged (the statement uses
/// `COALESCE`), so this method cannot clear a value. `updated_at` is bumped
/// whenever the row exists. Returns `true` when a row was updated.
pub fn update_metadata(
    &self,
    pv_name: &str,
    prec: Option<&str>,
    egu: Option<&str>,
) -> anyhow::Result<bool> {
    const SQL: &str = "UPDATE pv_info SET prec = COALESCE(?1, prec), egu = COALESCE(?2, egu), updated_at = ?3 WHERE pv_name = ?4";
    let db = self.lock_conn()?;
    let stamp = Utc::now().to_rfc3339();
    let changed = db.execute(SQL, params![prec, egu, stamp, pv_name])?;
    Ok(changed != 0)
}
/// Imports a fully specified row using the `Protocol::Ca` protocol.
///
/// Convenience wrapper around `import_pv_with_protocol`; see that method for
/// validation and error behavior.
// The argument list mirrors the table's columns one-to-one.
#[allow(clippy::too_many_arguments)]
pub fn import_pv(
    &self,
    pv_name: &str,
    dbr_type: ArchDbType,
    sample_mode: &SampleMode,
    element_count: i32,
    status: PvStatus,
    created_at: Option<&str>,
    prec: Option<&str>,
    egu: Option<&str>,
    alias_for: Option<&str>,
    archive_fields: &[String],
    policy_name: Option<&str>,
) -> anyhow::Result<()> {
    let protocol = Protocol::Ca;
    self.import_pv_with_protocol(
        pv_name,
        dbr_type,
        sample_mode,
        element_count,
        status,
        created_at,
        prec,
        egu,
        alias_for,
        archive_fields,
        policy_name,
        protocol,
    )
}
/// Inserts or replaces a fully specified registry row.
///
/// `created_at` is stored verbatim when given, otherwise the current time is
/// used; `updated_at` is always set to now. An empty `archive_fields` slice
/// is stored as NULL, otherwise as a JSON array. Because the statement is
/// `INSERT OR REPLACE`, an existing row is replaced entirely and its
/// `last_timestamp` is reset.
///
/// # Errors
/// Fails when `pv_name` or `alias_for` is rejected by `is_valid_pv_name`,
/// when `alias_for` equals `pv_name` (a row may not alias itself, matching
/// the rule enforced by `add_alias`), when the lock is poisoned, when the
/// field list cannot be serialized, or when the statement fails.
// The argument list mirrors the table's columns one-to-one.
#[allow(clippy::too_many_arguments)]
pub fn import_pv_with_protocol(
    &self,
    pv_name: &str,
    dbr_type: ArchDbType,
    sample_mode: &SampleMode,
    element_count: i32,
    status: PvStatus,
    created_at: Option<&str>,
    prec: Option<&str>,
    egu: Option<&str>,
    alias_for: Option<&str>,
    archive_fields: &[String],
    policy_name: Option<&str>,
    protocol: Protocol,
) -> anyhow::Result<()> {
    if !is_valid_pv_name(pv_name) {
        anyhow::bail!("invalid PV name: {pv_name:?}");
    }
    if let Some(target) = alias_for {
        if !is_valid_pv_name(target) {
            anyhow::bail!("invalid alias target: {target:?}");
        }
        // A self-referencing alias would hide the PV from every
        // `alias_for IS NULL` listing while resolving to itself.
        if target == pv_name {
            anyhow::bail!("alias and target must differ");
        }
    }
    let conn = self.lock_conn()?;
    let now = Utc::now().to_rfc3339();
    let (mode_str, period) = sample_mode.to_db();
    let created = created_at.unwrap_or(&now);
    let archive_fields_json = match archive_fields {
        [] => None,
        fields => Some(serde_json::to_string(fields)?),
    };
    conn.execute(
        "INSERT OR REPLACE INTO pv_info
(pv_name, dbr_type, sample_mode, sample_period, status, element_count,
created_at, updated_at, prec, egu, alias_for, archive_fields, policy_name, protocol)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
        params![
            pv_name,
            dbr_type as i32,
            mode_str,
            period,
            status.as_str(),
            element_count,
            created,
            now,
            prec,
            egu,
            alias_for,
            archive_fields_json,
            policy_name,
            protocol.as_str(),
        ],
    )?;
    Ok(())
}
/// Replaces the archived field list of `pv_name` and bumps `updated_at`.
///
/// An empty slice stores NULL; otherwise the list is stored as a JSON
/// array. Returns `true` when a row was updated.
pub fn update_archive_fields(&self, pv_name: &str, fields: &[String]) -> anyhow::Result<bool> {
    const SQL: &str = "UPDATE pv_info SET archive_fields = ?1, updated_at = ?2 WHERE pv_name = ?3";
    let db = self.lock_conn()?;
    let stamp = Utc::now().to_rfc3339();
    let encoded = match fields {
        [] => None,
        _ => Some(serde_json::to_string(fields)?),
    };
    let changed = db.execute(SQL, params![encoded, stamp, pv_name])?;
    Ok(changed != 0)
}
/// Sets (or, with `None`, clears) the policy name of `pv_name` and bumps
/// `updated_at`.
///
/// Returns `true` when a row was updated.
pub fn update_policy_name(
    &self,
    pv_name: &str,
    policy_name: Option<&str>,
) -> anyhow::Result<bool> {
    const SQL: &str = "UPDATE pv_info SET policy_name = ?1, updated_at = ?2 WHERE pv_name = ?3";
    let db = self.lock_conn()?;
    let stamp = Utc::now().to_rfc3339();
    let changed = db.execute(SQL, params![policy_name, stamp, pv_name])?;
    Ok(changed != 0)
}
pub fn add_alias(&self, alias: &str, target: &str) -> anyhow::Result<()> {
if alias == target {
anyhow::bail!("alias and target must differ");
}
if !is_valid_pv_name(alias) {
anyhow::bail!("invalid alias name: {alias:?}");
}
if !is_valid_pv_name(target) {
anyhow::bail!("invalid alias target: {target:?}");
}
let conn = self.lock_conn()?;
let row: Option<(i32, String, f64, i32, Option<String>)> = conn
.query_row(
"SELECT dbr_type, sample_mode, sample_period, element_count, alias_for
FROM pv_info WHERE pv_name = ?1",
params![target],
|r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
)
.optional()?;
let (dbr_type, mode, period, ec, target_alias) =
row.ok_or_else(|| anyhow::anyhow!("target PV '{target}' not found"))?;
if target_alias.is_some() {
anyhow::bail!(
"target PV '{target}' is itself an alias; aliases of aliases are not allowed"
);
}
let existing: Option<Option<String>> = conn
.query_row(
"SELECT alias_for FROM pv_info WHERE pv_name = ?1",
params![alias],
|r| r.get(0),
)
.optional()?;
if let Some(existing_alias) = existing {
if existing_alias.as_deref() == Some(target) {
return Ok(()); }
anyhow::bail!("'{alias}' already exists in registry");
}
let now = Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO pv_info
(pv_name, dbr_type, sample_mode, sample_period, status, element_count,
created_at, updated_at, alias_for)
VALUES (?1, ?2, ?3, ?4, 'alias', ?5, ?6, ?6, ?7)",
params![alias, dbr_type, mode, period, ec, now, target],
)?;
Ok(())
}
/// Deletes the row named `alias`, but only if it is an alias row.
///
/// Returns `true` when an alias was deleted; non-alias rows are never
/// removed by this method.
pub fn remove_alias(&self, alias: &str) -> anyhow::Result<bool> {
    const SQL: &str = "DELETE FROM pv_info WHERE pv_name = ?1 AND alias_for IS NOT NULL";
    let db = self.lock_conn()?;
    let deleted = db.execute(SQL, params![alias])?;
    Ok(deleted != 0)
}
/// Returns the target that `name` is an alias for.
///
/// Yields `None` both when `name` is not in the registry and when it is a
/// non-alias row.
pub fn resolve_alias(&self, name: &str) -> anyhow::Result<Option<String>> {
    let db = self.lock_conn()?;
    let lookup = db
        .query_row(
            "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
            params![name],
            |r| r.get::<_, Option<String>>(0),
        )
        .optional()?;
    // Outer None: no such row. Inner None: the row is not an alias.
    Ok(match lookup {
        Some(target) => target,
        None => None,
    })
}
/// Returns the alias target of `name`, or `name` itself when it is not an
/// alias (including when it is not registered at all).
pub fn canonical_name(&self, name: &str) -> anyhow::Result<String> {
    match self.resolve_alias(name)? {
        Some(target) => Ok(target),
        None => Ok(name.to_string()),
    }
}
/// Lists the names of all aliases pointing at `target`, sorted by name.
pub fn aliases_for(&self, target: &str) -> anyhow::Result<Vec<String>> {
    let db = self.lock_conn()?;
    let mut stmt =
        db.prepare("SELECT pv_name FROM pv_info WHERE alias_for = ?1 ORDER BY pv_name")?;
    let mut names = Vec::new();
    for name in stmt.query_map(params![target], |row| row.get::<_, String>(0))? {
        names.push(name?);
    }
    Ok(names)
}
/// Lists every `(alias, target)` pair in the registry, sorted by alias name.
pub fn all_aliases(&self) -> anyhow::Result<Vec<(String, String)>> {
    const SQL: &str = "SELECT pv_name, alias_for FROM pv_info
WHERE alias_for IS NOT NULL ORDER BY pv_name";
    let db = self.lock_conn()?;
    let mut stmt = db.prepare(SQL)?;
    let pairs = stmt
        .query_map([], |row| {
            let alias: String = row.get(0)?;
            let target: String = row.get(1)?;
            Ok((alias, target))
        })?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(pairs)
}
/// Lists every row name in the registry — aliases included — sorted by name.
pub fn expanded_pv_names(&self) -> anyhow::Result<Vec<String>> {
    let db = self.lock_conn()?;
    let mut stmt = db.prepare("SELECT pv_name FROM pv_info ORDER BY pv_name")?;
    let mut names = Vec::new();
    for name in stmt.query_map([], |row| row.get::<_, String>(0))? {
        names.push(name?);
    }
    Ok(names)
}
/// Returns every row in the registry — aliases included — sorted by name.
pub fn all_records(&self) -> anyhow::Result<Vec<PvRecord>> {
    const SQL: &str = "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
last_timestamp, created_at, updated_at, prec, egu,
alias_for, archive_fields, policy_name, protocol
FROM pv_info ORDER BY pv_name";
    let db = self.lock_conn()?;
    let mut stmt = db.prepare(SQL)?;
    let mut records = Vec::new();
    for record in stmt.query_map([], row_to_record)? {
        records.push(record?);
    }
    Ok(records)
}
/// Returns non-alias rows whose `last_timestamp` is older than
/// `now - threshold`, oldest first.
///
/// Rows that have never recorded a timestamp are not reported. When the
/// subtraction underflows, the Unix epoch is used as the cutoff.
pub fn silent_pvs(&self, threshold: Duration) -> anyhow::Result<Vec<PvRecord>> {
    const SQL: &str = "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
last_timestamp, created_at, updated_at, prec, egu,
alias_for, archive_fields, policy_name, protocol
FROM pv_info WHERE last_timestamp IS NOT NULL AND last_timestamp < ?1
AND alias_for IS NULL
ORDER BY last_timestamp ASC";
    let db = self.lock_conn()?;
    let cutoff = match SystemTime::now().checked_sub(threshold) {
        Some(instant) => instant,
        None => SystemTime::UNIX_EPOCH,
    };
    let cutoff_text = DateTime::<Utc>::from(cutoff).to_rfc3339();
    let mut stmt = db.prepare(SQL)?;
    let mut records = Vec::new();
    for record in stmt.query_map(params![cutoff_text], row_to_record)? {
        records.push(record?);
    }
    Ok(records)
}
}
fn row_to_record(row: &rusqlite::Row) -> rusqlite::Result<PvRecord> {
let pv_name: String = row.get(0)?;
let dbr_type_i: i32 = row.get(1)?;
let sample_mode_str: String = row.get(2)?;
let sample_period: f64 = row.get(3)?;
let status_str: String = row.get(4)?;
let element_count: i32 = row.get(5)?;
let last_ts_str: Option<String> = row.get(6)?;
let created_str: String = row.get(7)?;
let updated_str: String = row.get(8)?;
let prec: Option<String> = row.get(9).unwrap_or(None);
let egu: Option<String> = row.get(10).unwrap_or(None);
let alias_for: Option<String> = row.get(11).unwrap_or(None);
let archive_fields_json: Option<String> = row.get(12).unwrap_or(None);
let policy_name: Option<String> = row.get(13).unwrap_or(None);
let protocol_str: Option<String> = row.get(14).unwrap_or(None);
let protocol = protocol_str
.as_deref()
.and_then(Protocol::parse)
.unwrap_or_default();
let last_timestamp = last_ts_str.and_then(|s| {
DateTime::parse_from_rfc3339(&s)
.ok()
.map(|dt| dt.with_timezone(&Utc).into())
});
let archive_fields = archive_fields_json
.as_deref()
.and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
.unwrap_or_default();
Ok(PvRecord {
pv_name,
dbr_type: ArchDbType::from_i32(dbr_type_i).unwrap_or(ArchDbType::ScalarDouble),
sample_mode: SampleMode::from_db(&sample_mode_str, sample_period),
status: status_str.parse().unwrap_or(PvStatus::Active),
element_count,
last_timestamp,
created_at: DateTime::parse_from_rfc3339(&created_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now()),
updated_at: DateTime::parse_from_rfc3339(&updated_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now()),
prec,
egu,
alias_for,
archive_fields,
policy_name,
protocol,
})
}
fn is_duplicate_column_error(e: &rusqlite::Error) -> bool {
matches!(
e,
rusqlite::Error::SqliteFailure(_, Some(msg))
if msg.starts_with("duplicate column name")
)
}
#[cfg(test)]
mod tests {
use super::*;
/// Names with traversal components, bad leading characters, empty
/// components, shell metacharacters, whitespace or excess length are invalid.
#[test]
fn invalid_pv_names_rejected() {
    let too_long = "x".repeat(257);
    let rejected = [
        "../etc/passwd",
        "foo/../bar",
        "foo:..:bar",
        "foo/./bar",
        "/etc/passwd",
        ":SIM:foo",
        ":foo",
        "foo::bar",
        "foo//bar",
        "foo:",
        "foo/",
        "foo;rm -rf /",
        "foo|bar",
        "foo`x`",
        "foo$BAR",
        "foo bar",
        "foo\nbar",
        "",
        ".hidden",
        "-leading-dash",
        too_long.as_str(),
    ];
    for name in rejected {
        assert!(!is_valid_pv_name(name), "{name:?} should be rejected");
    }
}
/// Typical PV names, including braces, angle brackets, dots and a
/// 256-byte name, are valid.
#[test]
fn valid_pv_names_accepted() {
    let longest = "x".repeat(256);
    let accepted = [
        "SIM:Sine",
        "XF:31IDA-OP{Tbl-Ax:X1}Mtr",
        "ACC1-001-RFCAV-01:V<x>",
        "PV.HIHI",
        "BL_X+Y",
        "a",
        longest.as_str(),
    ];
    for name in accepted {
        assert!(is_valid_pv_name(name), "{name:?} should be accepted");
    }
}
/// `strip_field_suffix` strips uppercase/digit/underscore fields only.
#[test]
fn strip_field_suffix_basics() {
    let cases = [
        ("BASE.HIHI", Some("BASE")),
        ("BASE.LOLO", Some("BASE")),
        ("FOO.BAR_99", Some("FOO")),
        ("BASE", None),
        ("BASE.hihi", None),
        ("BASE.Hihi", None),
        (".HIHI", None),
        ("BASE.", None),
    ];
    for (input, expected) in cases {
        assert_eq!(strip_field_suffix(input), expected);
    }
}
/// A registered PV can be read back with its type and `Active` status.
#[test]
fn test_register_and_get() {
    let registry = PvRegistry::in_memory().unwrap();
    let name = "SIM:Sine";
    registry
        .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();

    let fetched = registry.get_pv(name).unwrap().unwrap();
    assert_eq!(fetched.pv_name, "SIM:Sine");
    assert_eq!(fetched.dbr_type, ArchDbType::ScalarDouble);
    assert_eq!(fetched.status, PvStatus::Active);
}
/// `set_status` changes are visible through `get_pv`.
#[test]
fn test_status_transitions() {
    let registry = PvRegistry::in_memory().unwrap();
    let name = "SIM:Test";
    registry
        .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();

    for status in [PvStatus::Paused, PvStatus::Active] {
        registry.set_status(name, status).unwrap();
        let record = registry.get_pv(name).unwrap().unwrap();
        assert_eq!(record.status, status);
    }
}
/// `matching_pvs` applies GLOB patterns, including an inner wildcard.
#[test]
fn test_pattern_matching() {
    let registry = PvRegistry::in_memory().unwrap();
    let fixtures = [
        ("SIM:Sine", ArchDbType::ScalarDouble),
        ("SIM:Cosine", ArchDbType::ScalarDouble),
        ("EXP:BL1:run:active", ArchDbType::ScalarEnum),
        ("EXP:BL1:motor:th:readback", ArchDbType::ScalarDouble),
    ];
    for (name, dbr_type) in fixtures {
        registry
            .register_pv(name, dbr_type, &SampleMode::Monitor, 1)
            .unwrap();
    }

    let expectations = [("SIM:*", 2), ("EXP:BL1:*", 2), ("EXP:*:motor:*", 1)];
    for (pattern, expected_len) in expectations {
        let matched = registry.matching_pvs(pattern).unwrap();
        assert_eq!(matched.len(), expected_len);
    }
}
/// Counting and listing agree after registering 100 PVs.
#[test]
fn test_count_and_list() {
    let registry = PvRegistry::in_memory().unwrap();
    (0..100).for_each(|i| {
        let name = format!("PV:Test:{i:04}");
        registry
            .register_pv(&name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
            .unwrap();
    });

    assert_eq!(registry.count(None).unwrap(), 100);
    assert_eq!(registry.count(Some(PvStatus::Active)).unwrap(), 100);
    assert_eq!(registry.all_pv_names().unwrap().len(), 100);
}
/// A removed PV is no longer returned by `get_pv`.
#[test]
fn test_remove_pv() {
    let registry = PvRegistry::in_memory().unwrap();
    let name = "SIM:Gone";
    registry
        .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();
    assert!(registry.get_pv(name).unwrap().is_some());

    registry.remove_pv(name).unwrap();
    assert!(registry.get_pv(name).unwrap().is_none());
}
/// A batch timestamp update populates `last_timestamp`.
#[test]
fn test_batch_update_timestamps() {
    let registry = PvRegistry::in_memory().unwrap();
    for name in ["PV:A", "PV:B"] {
        registry
            .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
            .unwrap();
    }

    let stamp = SystemTime::now();
    let updates = [("PV:A", stamp), ("PV:B", stamp)];
    registry.batch_update_timestamps(&updates).unwrap();

    let first = registry.get_pv("PV:A").unwrap().unwrap();
    assert!(first.last_timestamp.is_some());
}
/// `recently_added_pvs` includes a new PV for a past cutoff and nothing for
/// a future cutoff.
#[test]
fn test_recently_added_pvs() {
    let registry = PvRegistry::in_memory().unwrap();
    let one_second_ago = SystemTime::now() - Duration::from_secs(1);
    registry
        .register_pv("PV:New", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();

    let added = registry.recently_added_pvs(one_second_ago).unwrap();
    assert_eq!(added.len(), 1);
    assert_eq!(added[0].pv_name, "PV:New");

    let in_an_hour = SystemTime::now() + Duration::from_secs(3600);
    assert!(registry.recently_added_pvs(in_an_hour).unwrap().is_empty());
}
/// A status change makes the PV show up in `recently_modified_pvs`.
#[test]
fn test_recently_modified_pvs() {
    let registry = PvRegistry::in_memory().unwrap();
    let name = "PV:Mod";
    registry
        .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();
    let one_second_ago = SystemTime::now() - Duration::from_secs(1);
    registry.set_status(name, PvStatus::Paused).unwrap();

    let modified = registry.recently_modified_pvs(one_second_ago).unwrap();
    let found = modified.iter().any(|record| record.pv_name == "PV:Mod");
    assert!(found);
}
/// Switching to scan mode persists both the mode and its period.
#[test]
fn test_update_sample_mode() {
    let registry = PvRegistry::in_memory().unwrap();
    let name = "PV:Mode";
    registry
        .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();

    let scan = SampleMode::Scan { period_secs: 5.0 };
    assert!(registry.update_sample_mode(name, &scan).unwrap());

    let record = registry.get_pv(name).unwrap().unwrap();
    assert_eq!(record.sample_mode, SampleMode::Scan { period_secs: 5.0 });
}
/// Archive fields start empty, round-trip through the database, and can be
/// cleared with an empty slice.
#[test]
fn test_archive_fields_roundtrip() {
    let registry = PvRegistry::in_memory().unwrap();
    let name = "PV:Fields";
    registry
        .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();
    let stored_fields = || registry.get_pv(name).unwrap().unwrap().archive_fields;

    assert!(stored_fields().is_empty());

    let fields = vec!["HIHI".to_string(), "LOLO".to_string(), "EGU".to_string()];
    assert!(registry.update_archive_fields(name, &fields).unwrap());
    assert_eq!(stored_fields(), fields);

    assert!(registry.update_archive_fields(name, &[]).unwrap());
    assert!(stored_fields().is_empty());
}
/// The policy name starts unset, can be set, and can be cleared again.
#[test]
fn test_policy_name_roundtrip() {
    let registry = PvRegistry::in_memory().unwrap();
    let name = "PV:Pol";
    registry
        .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();
    let stored_policy = || registry.get_pv(name).unwrap().unwrap().policy_name;

    assert!(stored_policy().is_none());

    assert!(registry.update_policy_name(name, Some("fast")).unwrap());
    assert_eq!(stored_policy().as_deref(), Some("fast"));

    assert!(registry.update_policy_name(name, None).unwrap());
    assert!(stored_policy().is_none());
}
/// `import_pv` stores alias target, archive fields, policy and metadata.
#[test]
fn test_import_pv_with_alias_and_fields() {
    let registry = PvRegistry::in_memory().unwrap();
    let fields = vec!["HIHI".to_string(), "LOLO".to_string()];
    registry
        .import_pv(
            "PV:Aliased",
            ArchDbType::ScalarDouble,
            &SampleMode::Monitor,
            1,
            PvStatus::Active,
            None,
            Some("3"),
            Some("mA"),
            Some("PV:Real"),
            &fields,
            Some("ring"),
        )
        .unwrap();

    let imported = registry.get_pv("PV:Aliased").unwrap().unwrap();
    assert_eq!(imported.alias_for.as_deref(), Some("PV:Real"));
    assert_eq!(imported.archive_fields, fields);
    assert_eq!(imported.policy_name.as_deref(), Some("ring"));
    assert_eq!(imported.prec.as_deref(), Some("3"));
    assert_eq!(imported.egu.as_deref(), Some("mA"));
}
/// A database created with the older column set is upgraded by
/// `init_schema`, and existing rows stay readable and writable.
#[test]
fn test_migration_from_old_schema() {
    // Table layout without alias_for / archive_fields / policy_name / protocol.
    let legacy = Connection::open_in_memory().unwrap();
    legacy
        .execute_batch(
            "CREATE TABLE pv_info (
pv_name TEXT PRIMARY KEY NOT NULL,
dbr_type INTEGER NOT NULL,
sample_mode TEXT NOT NULL DEFAULT 'monitor',
sample_period REAL NOT NULL DEFAULT 0.0,
status TEXT NOT NULL DEFAULT 'active',
element_count INTEGER NOT NULL DEFAULT 1,
last_timestamp TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
prec TEXT,
egu TEXT
);",
        )
        .unwrap();
    let stamp = Utc::now().to_rfc3339();
    legacy
        .execute(
            "INSERT INTO pv_info
(pv_name, dbr_type, sample_mode, sample_period, status, element_count,
created_at, updated_at, prec, egu)
VALUES (?1, ?2, 'monitor', 0.0, 'active', 1, ?3, ?3, NULL, NULL)",
            params!["PV:Legacy", ArchDbType::ScalarDouble as i32, stamp],
        )
        .unwrap();

    let registry = PvRegistry {
        conn: Mutex::new(legacy),
    };
    registry.init_schema().unwrap();

    let before = registry.get_pv("PV:Legacy").unwrap().unwrap();
    assert_eq!(before.pv_name, "PV:Legacy");
    assert!(before.alias_for.is_none());
    assert!(before.archive_fields.is_empty());
    assert!(before.policy_name.is_none());

    let new_fields = ["HIHI".to_string()];
    assert!(
        registry
            .update_archive_fields("PV:Legacy", &new_fields)
            .unwrap()
    );
    let after = registry.get_pv("PV:Legacy").unwrap().unwrap();
    assert_eq!(after.archive_fields, vec!["HIHI".to_string()]);
}
/// End-to-end alias lifecycle: add, resolve, list, re-add, remove.
#[test]
fn test_aliases_basic() {
    let registry = PvRegistry::in_memory().unwrap();
    let (alias, target) = ("DEV:Current", "RING:Current");
    registry
        .register_pv(target, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();
    registry.add_alias(alias, target).unwrap();

    // Resolution.
    assert_eq!(
        registry.resolve_alias(alias).unwrap().as_deref(),
        Some("RING:Current"),
    );
    assert!(registry.resolve_alias(target).unwrap().is_none());
    assert!(registry.resolve_alias("Nonexistent").unwrap().is_none());
    assert_eq!(registry.canonical_name(alias).unwrap(), "RING:Current");
    assert_eq!(registry.canonical_name(target).unwrap(), "RING:Current");

    // Listings.
    assert_eq!(
        registry.aliases_for(target).unwrap(),
        vec!["DEV:Current".to_string()],
    );
    assert_eq!(
        registry.all_aliases().unwrap(),
        vec![("DEV:Current".to_string(), "RING:Current".to_string())],
    );
    let everything = registry.expanded_pv_names().unwrap();
    assert!(everything.contains(&"RING:Current".to_string()));
    assert!(everything.contains(&"DEV:Current".to_string()));

    // Adding the same alias again is a no-op.
    registry.add_alias(alias, target).unwrap();
    assert_eq!(registry.aliases_for(target).unwrap().len(), 1);

    // Removal succeeds once and reports false the second time.
    assert!(registry.remove_alias(alias).unwrap());
    assert!(registry.resolve_alias(alias).unwrap().is_none());
    assert!(!registry.remove_alias(alias).unwrap());
}
/// Invalid alias operations are rejected and non-alias rows survive
/// `remove_alias`.
#[test]
fn test_alias_conflicts() {
    let registry = PvRegistry::in_memory().unwrap();
    for name in ["PV:A", "PV:B"] {
        registry
            .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
            .unwrap();
    }

    // Missing target, self-alias, and alias name colliding with a PV.
    assert!(registry.add_alias("Alias:X", "Nonexistent").is_err());
    assert!(registry.add_alias("PV:A", "PV:A").is_err());
    assert!(registry.add_alias("PV:B", "PV:A").is_err());

    // Aliases of aliases are not allowed.
    registry.add_alias("Alias:A", "PV:A").unwrap();
    assert!(registry.add_alias("Alias:Two", "Alias:A").is_err());

    // remove_alias must not delete a non-alias row.
    assert!(!registry.remove_alias("PV:A").unwrap());
    assert!(registry.get_pv("PV:A").unwrap().is_some());
}
/// Only PVs with a recorded timestamp older than the threshold are silent;
/// PVs that never recorded a timestamp are not reported.
#[test]
fn test_silent_pvs() {
    let registry = PvRegistry::in_memory().unwrap();
    for name in ["PV:Silent", "PV:NoData"] {
        registry
            .register_pv(name, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
            .unwrap();
    }

    let two_hours_ago = SystemTime::now() - Duration::from_secs(7200);
    registry
        .update_last_timestamp("PV:Silent", two_hours_ago)
        .unwrap();

    let quiet = registry.silent_pvs(Duration::from_secs(3600)).unwrap();
    assert_eq!(quiet.len(), 1);
    assert_eq!(quiet[0].pv_name, "PV:Silent");
}
}