Skip to main content

archiver_core/
registry.rs

1//! PV metadata registry backed by SQLite.
2//!
3//! Persists PV archiving configuration and state across restarts.
4//! This is metadata only — time-series data lives in PlainPB files.
5
6use std::path::Path;
7use std::sync::Mutex;
8use std::time::{Duration, SystemTime};
9
10use chrono::{DateTime, Utc};
11use rusqlite::{Connection, OptionalExtension, params};
12use tracing::info;
13
14use crate::types::ArchDbType;
15
16/// Canonicalize a user-supplied PV name to the form the registry stores.
17///
18/// Java archiver's `PVNames.normalizeChannelName` + `stripPrefixFromName`:
19/// - `pva://X` and `ca://X` are protocol hints, not part of the channel name
20/// - `X.VAL` and `X` refer to the same EPICS record (`.VAL` is the default field)
21///
22/// Without this, `archivePV?pv=PV1.VAL` and `archivePV?pv=PV1` would create
23/// two separate registry rows that subscribe to the same IOC channel.
24pub fn normalize_pv_name(name: &str) -> &str {
25    let name = name
26        .strip_prefix("pva://")
27        .or_else(|| name.strip_prefix("ca://"))
28        .unwrap_or(name);
29    name.strip_suffix(".VAL").unwrap_or(name)
30}
31
32/// Strip a trailing `.<FIELD>` suffix (e.g. `.HIHI`, `.LOLO`, `.DESC`) from
33/// a PV name, returning the bare PV name. Java's
34/// `PVNames.stripFieldNameFromPVName` (c150faad/5b2a7cb4): a query for
35/// `BASE.HIHI` should fall back to typeinfo for `BASE` so handlers can
36/// surface the field-archived metadata. Returns `None` if there is no
37/// field suffix or the suffix contains characters that aren't valid in
38/// EPICS record-field names (uppercase letters / digits / underscores).
39pub fn strip_field_suffix(name: &str) -> Option<&str> {
40    let (base, field) = name.rsplit_once('.')?;
41    if base.is_empty() || field.is_empty() {
42        return None;
43    }
44    if !field
45        .chars()
46        .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_')
47    {
48        return None;
49    }
50    Some(base)
51}
52
53/// Reject PV names that, when mapped to a filesystem path, could escape
54/// the storage root or do other naughty things. Java archiver enforces a
55/// `[A-Za-z0-9_:.\-+\[\]<>;]` allowlist via `PVNames.isValidPVName`; we
56/// instead use a conservative blocklist (no `..`, no `/`, no NUL, no
57/// leading `.`, no leading-`-`, no whitespace) — strict enough to keep
58/// `pv_name_to_key` from generating an absolute or traversal path, but
59/// permissive enough not to break existing site naming conventions
60/// like `SIM:Sine` and `IOC[A]:val<x>`.
61///
62/// Called by every registry entry-point (register / import / add_alias)
63/// AND by `pv_name_to_key` as defense in depth, so any code path that
64/// builds a filesystem name from a PV string fails closed if the
65/// HTTP-layer validation is bypassed.
66pub fn is_valid_pv_name(name: &str) -> bool {
67    if name.is_empty() || name.len() > 256 {
68        return false;
69    }
70    // Leading separator → after pv_name_to_key the result begins with
71    // `/`, and `Path::join(root, "/abs/path")` ignores `root` entirely
72    // (Rust semantics) → escapes the storage root. Same for the `.` /
73    // `-` cases.
74    if name.starts_with('.')
75        || name.starts_with('-')
76        || name.starts_with('/')
77        || name.starts_with(':')
78    {
79        return false;
80    }
81    for component in name.split([':', '/']) {
82        // `..` / `.` are obvious traversal. An empty segment (`A::B`,
83        // `A//B`, trailing `:`/`/`) maps to a `//` in the filesystem
84        // path which most OSes collapse but some path-relative tools
85        // re-split, so just reject.
86        if component.is_empty() || component == ".." || component == "." {
87            return false;
88        }
89    }
90    !name.chars().any(|c| {
91        c == '\0'
92            || c == '\\'
93            || c.is_whitespace()
94            || c.is_control()
95            // Reject backslash and other path-shell metacharacters that
96            // could surprise downstream tools that re-parse the name.
97            || matches!(c, '|' | '&' | ';' | '`' | '$' | '"' | '\'' | '*' | '?')
98    })
99}
100
101/// PV archiving status.
102///
103/// `Alias` is a sentinel applied to alias rows so they don't appear in
104/// status-filtered queries (`pvs_by_status(Active)`, getPVCount, restore
105/// loops). Aliases are routing entries, not archive subjects.
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum PvStatus {
108    Active,
109    Paused,
110    Error,
111    Inactive,
112    Alias,
113}
114
115impl PvStatus {
116    pub fn as_str(self) -> &'static str {
117        match self {
118            Self::Active => "active",
119            Self::Paused => "paused",
120            Self::Error => "error",
121            Self::Inactive => "inactive",
122            Self::Alias => "alias",
123        }
124    }
125}
126
127impl std::str::FromStr for PvStatus {
128    type Err = std::convert::Infallible;
129
130    fn from_str(s: &str) -> Result<Self, Self::Err> {
131        Ok(match s {
132            "active" => Self::Active,
133            "paused" => Self::Paused,
134            "error" => Self::Error,
135            "inactive" => Self::Inactive,
136            "alias" => Self::Alias,
137            _ => Self::Active,
138        })
139    }
140}
141
142/// Sampling mode stored in the registry.
143#[derive(Debug, Clone, PartialEq)]
144pub enum SampleMode {
145    Monitor,
146    Scan { period_secs: f64 },
147}
148
149impl SampleMode {
150    fn to_db(&self) -> (&str, f64) {
151        match self {
152            Self::Monitor => ("monitor", 0.0),
153            Self::Scan { period_secs } => ("scan", *period_secs),
154        }
155    }
156
157    fn from_db(mode: &str, period: f64) -> Self {
158        match mode {
159            "scan" => Self::Scan {
160                period_secs: period,
161            },
162            _ => Self::Monitor,
163        }
164    }
165}
166
167/// A PV record in the registry.
168#[derive(Debug, Clone)]
169pub struct PvRecord {
170    pub pv_name: String,
171    pub dbr_type: ArchDbType,
172    pub sample_mode: SampleMode,
173    pub status: PvStatus,
174    pub element_count: i32,
175    pub last_timestamp: Option<SystemTime>,
176    pub created_at: DateTime<Utc>,
177    pub updated_at: DateTime<Utc>,
178    pub prec: Option<String>,
179    pub egu: Option<String>,
180    /// When set, this row is an alias pointing at another PV name.
181    /// Lookups should resolve to that target before reading/writing data.
182    pub alias_for: Option<String>,
183    /// Names of EPICS metadata fields (e.g. ["HIHI","LOLO","EGU"]) that the
184    /// engine should sample alongside the main value and attach to events.
185    pub archive_fields: Vec<String>,
186    /// Name of the policy that selected this PV's sampling configuration.
187    pub policy_name: Option<String>,
188}
189
190/// SQLite-backed PV metadata registry.
191pub struct PvRegistry {
192    conn: Mutex<Connection>,
193}
194
195impl PvRegistry {
196    fn lock_conn(&self) -> anyhow::Result<std::sync::MutexGuard<'_, Connection>> {
197        self.conn
198            .lock()
199            .map_err(|e| anyhow::anyhow!("PV registry lock poisoned: {e}"))
200    }
201
202    /// Open (or create) the registry database at the given path.
203    pub fn open(path: &Path) -> anyhow::Result<Self> {
204        let conn = Connection::open(path)?;
205        let registry = Self {
206            conn: Mutex::new(conn),
207        };
208        registry.init_schema()?;
209        Ok(registry)
210    }
211
212    /// Create an in-memory registry (for testing).
213    pub fn in_memory() -> anyhow::Result<Self> {
214        let conn = Connection::open_in_memory()?;
215        let registry = Self {
216            conn: Mutex::new(conn),
217        };
218        registry.init_schema()?;
219        Ok(registry)
220    }
221
222    fn init_schema(&self) -> anyhow::Result<()> {
223        let conn = self.lock_conn()?;
224        // Step 1: create the table (idempotent) and indexes that don't depend
225        // on columns added by later migrations.
226        conn.execute_batch(
227            "
228            CREATE TABLE IF NOT EXISTS pv_info (
229                pv_name         TEXT PRIMARY KEY NOT NULL,
230                dbr_type        INTEGER NOT NULL,
231                sample_mode     TEXT NOT NULL DEFAULT 'monitor',
232                sample_period   REAL NOT NULL DEFAULT 0.0,
233                status          TEXT NOT NULL DEFAULT 'active',
234                element_count   INTEGER NOT NULL DEFAULT 1,
235                last_timestamp  TEXT,
236                created_at      TEXT NOT NULL,
237                updated_at      TEXT NOT NULL,
238                prec            TEXT,
239                egu             TEXT,
240                alias_for       TEXT,
241                archive_fields  TEXT,
242                policy_name     TEXT
243            );
244
245            CREATE INDEX IF NOT EXISTS idx_pv_status ON pv_info(status);
246            CREATE INDEX IF NOT EXISTS idx_pv_prefix ON pv_info(pv_name COLLATE NOCASE);
247            ",
248        )?;
249        // Step 2: migrations for tables created with older schemas. Each statement
250        // runs independently because SQLite stops on the first duplicate-column
251        // error when batched. Only "duplicate column" errors are silently
252        // accepted; disk-full / lock / corruption errors must surface.
253        for stmt in [
254            "ALTER TABLE pv_info ADD COLUMN prec TEXT",
255            "ALTER TABLE pv_info ADD COLUMN egu TEXT",
256            "ALTER TABLE pv_info ADD COLUMN alias_for TEXT",
257            "ALTER TABLE pv_info ADD COLUMN archive_fields TEXT",
258            "ALTER TABLE pv_info ADD COLUMN policy_name TEXT",
259        ] {
260            match conn.execute(stmt, []) {
261                Ok(_) => {}
262                Err(e) if is_duplicate_column_error(&e) => {}
263                Err(e) => return Err(e.into()),
264            }
265        }
266        // Step 3: indexes that reference newly-added columns. Done after ALTER
267        // so an upgraded database has the columns to index.
268        conn.execute_batch(
269            "CREATE INDEX IF NOT EXISTS idx_pv_alias \
270             ON pv_info(alias_for) WHERE alias_for IS NOT NULL;",
271        )?;
272        info!("PV registry schema initialized");
273        Ok(())
274    }
275
276    /// Register a new PV for archiving.
277    pub fn register_pv(
278        &self,
279        pv_name: &str,
280        dbr_type: ArchDbType,
281        sample_mode: &SampleMode,
282        element_count: i32,
283    ) -> anyhow::Result<()> {
284        if !is_valid_pv_name(pv_name) {
285            anyhow::bail!("invalid PV name: {pv_name:?}");
286        }
287        let conn = self.lock_conn()?;
288        let now = Utc::now().to_rfc3339();
289        let (mode_str, period) = sample_mode.to_db();
290
291        conn.execute(
292            "INSERT OR REPLACE INTO pv_info
293             (pv_name, dbr_type, sample_mode, sample_period, status, element_count, created_at, updated_at)
294             VALUES (?1, ?2, ?3, ?4, 'active', ?5, COALESCE((SELECT created_at FROM pv_info WHERE pv_name = ?1), ?6), ?6)",
295            params![pv_name, dbr_type as i32, mode_str, period, element_count, now],
296        )?;
297        Ok(())
298    }
299
300    /// Update PV status (active, paused, error).
301    pub fn set_status(&self, pv_name: &str, status: PvStatus) -> anyhow::Result<bool> {
302        let conn = self.lock_conn()?;
303        let now = Utc::now().to_rfc3339();
304        let rows = conn.execute(
305            "UPDATE pv_info SET status = ?1, updated_at = ?2 WHERE pv_name = ?3",
306            params![status.as_str(), now, pv_name],
307        )?;
308        Ok(rows > 0)
309    }
310
311    /// Update the last known timestamp for a PV.
312    pub fn update_last_timestamp(
313        &self,
314        pv_name: &str,
315        timestamp: SystemTime,
316    ) -> anyhow::Result<()> {
317        let conn = self.lock_conn()?;
318        let dt = DateTime::<Utc>::from(timestamp).to_rfc3339();
319        let now = Utc::now().to_rfc3339();
320        conn.execute(
321            "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
322            params![dt, now, pv_name],
323        )?;
324        Ok(())
325    }
326
327    /// Remove a PV from the registry entirely.
328    pub fn remove_pv(&self, pv_name: &str) -> anyhow::Result<bool> {
329        let conn = self.lock_conn()?;
330        let rows = conn.execute("DELETE FROM pv_info WHERE pv_name = ?1", params![pv_name])?;
331        Ok(rows > 0)
332    }
333
334    /// Get a single PV record.
335    pub fn get_pv(&self, pv_name: &str) -> anyhow::Result<Option<PvRecord>> {
336        let conn = self.lock_conn()?;
337        conn.query_row(
338            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
339                    last_timestamp, created_at, updated_at, prec, egu,
340                    alias_for, archive_fields, policy_name
341             FROM pv_info WHERE pv_name = ?1",
342            params![pv_name],
343            row_to_record,
344        )
345        .optional()
346        .map_err(Into::into)
347    }
348
349    /// List all real PV names (alias rows excluded). Use
350    /// [`Self::expanded_pv_names`] to include aliases.
351    pub fn all_pv_names(&self) -> anyhow::Result<Vec<String>> {
352        let conn = self.lock_conn()?;
353        let mut stmt =
354            conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for IS NULL ORDER BY pv_name")?;
355        let names = stmt
356            .query_map([], |row| row.get(0))?
357            .collect::<Result<Vec<String>, _>>()?;
358        Ok(names)
359    }
360
361    /// List all real PVs with a given status. Alias rows have
362    /// `status='alias'` and are excluded from every other-status query.
363    pub fn pvs_by_status(&self, status: PvStatus) -> anyhow::Result<Vec<PvRecord>> {
364        let conn = self.lock_conn()?;
365        let mut stmt = conn.prepare(
366            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
367                    last_timestamp, created_at, updated_at, prec, egu,
368                    alias_for, archive_fields, policy_name
369             FROM pv_info WHERE status = ?1 ORDER BY pv_name",
370        )?;
371        let records = stmt
372            .query_map(params![status.as_str()], row_to_record)?
373            .collect::<Result<Vec<_>, _>>()?;
374        Ok(records)
375    }
376
377    /// Match real PV names by glob pattern (SQL GLOB). Excludes aliases.
378    pub fn matching_pvs(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
379        let conn = self.lock_conn()?;
380        let mut stmt = conn.prepare(
381            "SELECT pv_name FROM pv_info
382             WHERE pv_name GLOB ?1 AND alias_for IS NULL
383             ORDER BY pv_name",
384        )?;
385        let names = stmt
386            .query_map(params![pattern], |row| row.get(0))?
387            .collect::<Result<Vec<String>, _>>()?;
388        Ok(names)
389    }
390
391    /// Match PV names by glob pattern, INCLUDING alias rows. Java's
392    /// `getMatchingPVs` returns aliases too (c61f1579) — without this an
393    /// admin walking the inventory by glob silently misses every alias.
394    /// Internal callers that only want real PVs (engine, reports) stay
395    /// on `matching_pvs`.
396    pub fn matching_pvs_expanded(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
397        let conn = self.lock_conn()?;
398        let mut stmt =
399            conn.prepare("SELECT pv_name FROM pv_info WHERE pv_name GLOB ?1 ORDER BY pv_name")?;
400        let names = stmt
401            .query_map(params![pattern], |row| row.get(0))?
402            .collect::<Result<Vec<String>, _>>()?;
403        Ok(names)
404    }
405
406    /// Count real PVs, optionally filtered by status. Excludes aliases.
407    pub fn count(&self, status: Option<PvStatus>) -> anyhow::Result<u64> {
408        let conn = self.lock_conn()?;
409        let count: u64 = match status {
410            Some(s) => conn.query_row(
411                "SELECT COUNT(*) FROM pv_info
412                 WHERE status = ?1 AND alias_for IS NULL",
413                params![s.as_str()],
414                |row| row.get(0),
415            )?,
416            None => conn.query_row(
417                "SELECT COUNT(*) FROM pv_info WHERE alias_for IS NULL",
418                [],
419                |row| row.get(0),
420            )?,
421        };
422        Ok(count)
423    }
424
425    /// Batch update last timestamps (for periodic flush).
426    pub fn batch_update_timestamps(&self, updates: &[(&str, SystemTime)]) -> anyhow::Result<()> {
427        let mut conn = self.lock_conn()?;
428        let tx = conn.transaction()?;
429        let now = Utc::now().to_rfc3339();
430        {
431            let mut stmt = tx.prepare(
432                "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
433            )?;
434            for (pv_name, ts) in updates {
435                let dt = DateTime::<Utc>::from(*ts).to_rfc3339();
436                stmt.execute(params![dt, now, pv_name])?;
437            }
438        }
439        tx.commit()?;
440        Ok(())
441    }
442
443    /// Get real PVs added since a given time (aliases excluded).
444    pub fn recently_added_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
445        let conn = self.lock_conn()?;
446        let since_str = DateTime::<Utc>::from(since).to_rfc3339();
447        let mut stmt = conn.prepare(
448            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
449                    last_timestamp, created_at, updated_at, prec, egu,
450                    alias_for, archive_fields, policy_name
451             FROM pv_info WHERE created_at >= ?1 AND alias_for IS NULL
452             ORDER BY created_at DESC",
453        )?;
454        let records = stmt
455            .query_map(params![since_str], row_to_record)?
456            .collect::<Result<Vec<_>, _>>()?;
457        Ok(records)
458    }
459
460    /// Get real PVs modified since a given time (aliases excluded).
461    pub fn recently_modified_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
462        let conn = self.lock_conn()?;
463        let since_str = DateTime::<Utc>::from(since).to_rfc3339();
464        let mut stmt = conn.prepare(
465            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
466                    last_timestamp, created_at, updated_at, prec, egu,
467                    alias_for, archive_fields, policy_name
468             FROM pv_info WHERE updated_at >= ?1 AND alias_for IS NULL
469             ORDER BY updated_at DESC",
470        )?;
471        let records = stmt
472            .query_map(params![since_str], row_to_record)?
473            .collect::<Result<Vec<_>, _>>()?;
474        Ok(records)
475    }
476
477    /// Update the sample mode and period for a PV.
478    pub fn update_sample_mode(&self, pv_name: &str, mode: &SampleMode) -> anyhow::Result<bool> {
479        let conn = self.lock_conn()?;
480        let now = Utc::now().to_rfc3339();
481        let (mode_str, period) = mode.to_db();
482        let rows = conn.execute(
483            "UPDATE pv_info SET sample_mode = ?1, sample_period = ?2, updated_at = ?3 WHERE pv_name = ?4",
484            params![mode_str, period, now, pv_name],
485        )?;
486        Ok(rows > 0)
487    }
488
489    /// Update PREC and EGU metadata for a PV.
490    pub fn update_metadata(
491        &self,
492        pv_name: &str,
493        prec: Option<&str>,
494        egu: Option<&str>,
495    ) -> anyhow::Result<bool> {
496        let conn = self.lock_conn()?;
497        let now = Utc::now().to_rfc3339();
498        let rows = conn.execute(
499            "UPDATE pv_info SET prec = COALESCE(?1, prec), egu = COALESCE(?2, egu), updated_at = ?3 WHERE pv_name = ?4",
500            params![prec, egu, now, pv_name],
501        )?;
502        Ok(rows > 0)
503    }
504
505    /// Import a PV with all fields in a single SQL operation.
506    /// Used during config import to atomically set status, created_at, and metadata.
507    #[allow(clippy::too_many_arguments)]
508    pub fn import_pv(
509        &self,
510        pv_name: &str,
511        dbr_type: ArchDbType,
512        sample_mode: &SampleMode,
513        element_count: i32,
514        status: PvStatus,
515        created_at: Option<&str>,
516        prec: Option<&str>,
517        egu: Option<&str>,
518        alias_for: Option<&str>,
519        archive_fields: &[String],
520        policy_name: Option<&str>,
521    ) -> anyhow::Result<()> {
522        if !is_valid_pv_name(pv_name) {
523            anyhow::bail!("invalid PV name: {pv_name:?}");
524        }
525        if let Some(target) = alias_for
526            && !is_valid_pv_name(target)
527        {
528            anyhow::bail!("invalid alias target: {target:?}");
529        }
530        let conn = self.lock_conn()?;
531        let now = Utc::now().to_rfc3339();
532        let (mode_str, period) = sample_mode.to_db();
533        let created = created_at.unwrap_or(&now);
534        let archive_fields_json = if archive_fields.is_empty() {
535            None
536        } else {
537            Some(serde_json::to_string(archive_fields)?)
538        };
539
540        conn.execute(
541            "INSERT OR REPLACE INTO pv_info
542             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
543              created_at, updated_at, prec, egu, alias_for, archive_fields, policy_name)
544             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
545            params![
546                pv_name,
547                dbr_type as i32,
548                mode_str,
549                period,
550                status.as_str(),
551                element_count,
552                created,
553                now,
554                prec,
555                egu,
556                alias_for,
557                archive_fields_json,
558                policy_name,
559            ],
560        )?;
561        Ok(())
562    }
563
564    /// Set or clear the archive_fields list for a PV.
565    pub fn update_archive_fields(&self, pv_name: &str, fields: &[String]) -> anyhow::Result<bool> {
566        let conn = self.lock_conn()?;
567        let now = Utc::now().to_rfc3339();
568        let json = if fields.is_empty() {
569            None
570        } else {
571            Some(serde_json::to_string(fields)?)
572        };
573        let rows = conn.execute(
574            "UPDATE pv_info SET archive_fields = ?1, updated_at = ?2 WHERE pv_name = ?3",
575            params![json, now, pv_name],
576        )?;
577        Ok(rows > 0)
578    }
579
580    /// Set or clear the policy_name on a PV.
581    pub fn update_policy_name(
582        &self,
583        pv_name: &str,
584        policy_name: Option<&str>,
585    ) -> anyhow::Result<bool> {
586        let conn = self.lock_conn()?;
587        let now = Utc::now().to_rfc3339();
588        let rows = conn.execute(
589            "UPDATE pv_info SET policy_name = ?1, updated_at = ?2 WHERE pv_name = ?3",
590            params![policy_name, now, pv_name],
591        )?;
592        Ok(rows > 0)
593    }
594
595    /// Add an alias `alias` that points at the existing PV `target`.
596    /// Fails if target does not exist or if `alias` already maps elsewhere.
597    /// The alias row mirrors target's dbr_type/sample_mode/element_count for
598    /// display, but `alias_for` distinguishes it from real PVs.
599    pub fn add_alias(&self, alias: &str, target: &str) -> anyhow::Result<()> {
600        if alias == target {
601            anyhow::bail!("alias and target must differ");
602        }
603        if !is_valid_pv_name(alias) {
604            anyhow::bail!("invalid alias name: {alias:?}");
605        }
606        if !is_valid_pv_name(target) {
607            anyhow::bail!("invalid alias target: {target:?}");
608        }
609        let conn = self.lock_conn()?;
610        // Resolve target (must be a real PV, not itself an alias).
611        let row: Option<(i32, String, f64, i32, Option<String>)> = conn
612            .query_row(
613                "SELECT dbr_type, sample_mode, sample_period, element_count, alias_for
614                 FROM pv_info WHERE pv_name = ?1",
615                params![target],
616                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
617            )
618            .optional()?;
619        let (dbr_type, mode, period, ec, target_alias) =
620            row.ok_or_else(|| anyhow::anyhow!("target PV '{target}' not found"))?;
621        if target_alias.is_some() {
622            anyhow::bail!(
623                "target PV '{target}' is itself an alias; aliases of aliases are not allowed"
624            );
625        }
626        // Reject if `alias` already exists either as a real PV or different alias.
627        let existing: Option<Option<String>> = conn
628            .query_row(
629                "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
630                params![alias],
631                |r| r.get(0),
632            )
633            .optional()?;
634        if let Some(existing_alias) = existing {
635            if existing_alias.as_deref() == Some(target) {
636                return Ok(()); // idempotent
637            }
638            anyhow::bail!("'{alias}' already exists in registry");
639        }
640        let now = Utc::now().to_rfc3339();
641        conn.execute(
642            "INSERT INTO pv_info
643             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
644              created_at, updated_at, alias_for)
645             VALUES (?1, ?2, ?3, ?4, 'alias', ?5, ?6, ?6, ?7)",
646            params![alias, dbr_type, mode, period, ec, now, target],
647        )?;
648        Ok(())
649    }
650
651    /// Remove an alias row. Returns true if removed, false if the row was not
652    /// an alias or did not exist. Real PVs are not removed by this method.
653    pub fn remove_alias(&self, alias: &str) -> anyhow::Result<bool> {
654        let conn = self.lock_conn()?;
655        let rows = conn.execute(
656            "DELETE FROM pv_info WHERE pv_name = ?1 AND alias_for IS NOT NULL",
657            params![alias],
658        )?;
659        Ok(rows > 0)
660    }
661
662    /// If `name` is an alias, return its target. Returns None if `name` is
663    /// already a real PV or does not exist.
664    pub fn resolve_alias(&self, name: &str) -> anyhow::Result<Option<String>> {
665        let conn = self.lock_conn()?;
666        let row: Option<Option<String>> = conn
667            .query_row(
668                "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
669                params![name],
670                |r| r.get(0),
671            )
672            .optional()?;
673        Ok(row.flatten())
674    }
675
676    /// Return the canonical PV name. If `name` is an alias, returns the target;
677    /// otherwise returns the input unchanged. Used by lookup/retrieval paths.
678    pub fn canonical_name(&self, name: &str) -> anyhow::Result<String> {
679        Ok(self
680            .resolve_alias(name)?
681            .unwrap_or_else(|| name.to_string()))
682    }
683
684    /// List all alias names pointing at a given target PV.
685    pub fn aliases_for(&self, target: &str) -> anyhow::Result<Vec<String>> {
686        let conn = self.lock_conn()?;
687        let mut stmt =
688            conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for = ?1 ORDER BY pv_name")?;
689        let names = stmt
690            .query_map(params![target], |row| row.get(0))?
691            .collect::<Result<Vec<String>, _>>()?;
692        Ok(names)
693    }
694
695    /// All `(alias, target)` pairs in the registry.
696    pub fn all_aliases(&self) -> anyhow::Result<Vec<(String, String)>> {
697        let conn = self.lock_conn()?;
698        let mut stmt = conn.prepare(
699            "SELECT pv_name, alias_for FROM pv_info
700             WHERE alias_for IS NOT NULL ORDER BY pv_name",
701        )?;
702        let rows = stmt
703            .query_map([], |row| {
704                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
705            })?
706            .collect::<Result<Vec<_>, _>>()?;
707        Ok(rows)
708    }
709
710    /// All PV names including aliases (`getAllExpandedPVNames`).
711    pub fn expanded_pv_names(&self) -> anyhow::Result<Vec<String>> {
712        let conn = self.lock_conn()?;
713        let mut stmt = conn.prepare("SELECT pv_name FROM pv_info ORDER BY pv_name")?;
714        let names = stmt
715            .query_map([], |row| row.get(0))?
716            .collect::<Result<Vec<String>, _>>()?;
717        Ok(names)
718    }
719
720    /// Get all PV records (for export).
721    pub fn all_records(&self) -> anyhow::Result<Vec<PvRecord>> {
722        let conn = self.lock_conn()?;
723        let mut stmt = conn.prepare(
724            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
725                    last_timestamp, created_at, updated_at, prec, egu,
726                    alias_for, archive_fields, policy_name
727             FROM pv_info ORDER BY pv_name",
728        )?;
729        let records = stmt
730            .query_map([], row_to_record)?
731            .collect::<Result<Vec<_>, _>>()?;
732        Ok(records)
733    }
734
735    /// Get real PVs that have not received events for longer than the
736    /// threshold duration. Only returns PVs that have a last_timestamp.
737    /// Aliases are excluded — they don't carry event timestamps.
738    pub fn silent_pvs(&self, threshold: Duration) -> anyhow::Result<Vec<PvRecord>> {
739        let conn = self.lock_conn()?;
740        let cutoff = SystemTime::now()
741            .checked_sub(threshold)
742            .unwrap_or(SystemTime::UNIX_EPOCH);
743        let cutoff_str = DateTime::<Utc>::from(cutoff).to_rfc3339();
744        let mut stmt = conn.prepare(
745            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
746                    last_timestamp, created_at, updated_at, prec, egu,
747                    alias_for, archive_fields, policy_name
748             FROM pv_info WHERE last_timestamp IS NOT NULL AND last_timestamp < ?1
749                            AND alias_for IS NULL
750             ORDER BY last_timestamp ASC",
751        )?;
752        let records = stmt
753            .query_map(params![cutoff_str], row_to_record)?
754            .collect::<Result<Vec<_>, _>>()?;
755        Ok(records)
756    }
757}
758
759fn row_to_record(row: &rusqlite::Row) -> rusqlite::Result<PvRecord> {
760    let pv_name: String = row.get(0)?;
761    let dbr_type_i: i32 = row.get(1)?;
762    let sample_mode_str: String = row.get(2)?;
763    let sample_period: f64 = row.get(3)?;
764    let status_str: String = row.get(4)?;
765    let element_count: i32 = row.get(5)?;
766    let last_ts_str: Option<String> = row.get(6)?;
767    let created_str: String = row.get(7)?;
768    let updated_str: String = row.get(8)?;
769    let prec: Option<String> = row.get(9).unwrap_or(None);
770    let egu: Option<String> = row.get(10).unwrap_or(None);
771    let alias_for: Option<String> = row.get(11).unwrap_or(None);
772    let archive_fields_json: Option<String> = row.get(12).unwrap_or(None);
773    let policy_name: Option<String> = row.get(13).unwrap_or(None);
774
775    let last_timestamp = last_ts_str.and_then(|s| {
776        DateTime::parse_from_rfc3339(&s)
777            .ok()
778            .map(|dt| dt.with_timezone(&Utc).into())
779    });
780
781    let archive_fields = archive_fields_json
782        .as_deref()
783        .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
784        .unwrap_or_default();
785
786    Ok(PvRecord {
787        pv_name,
788        dbr_type: ArchDbType::from_i32(dbr_type_i).unwrap_or(ArchDbType::ScalarDouble),
789        sample_mode: SampleMode::from_db(&sample_mode_str, sample_period),
790        status: status_str.parse().unwrap_or(PvStatus::Active),
791        element_count,
792        last_timestamp,
793        created_at: DateTime::parse_from_rfc3339(&created_str)
794            .map(|dt| dt.with_timezone(&Utc))
795            .unwrap_or_else(|_| Utc::now()),
796        updated_at: DateTime::parse_from_rfc3339(&updated_str)
797            .map(|dt| dt.with_timezone(&Utc))
798            .unwrap_or_else(|_| Utc::now()),
799        prec,
800        egu,
801        alias_for,
802        archive_fields,
803        policy_name,
804    })
805}
806
807/// SQLite returns generic `Error` for ALTER TABLE failures. Match on the
808/// message because rusqlite doesn't expose a structured "duplicate column"
809/// extended-code; the SQLite error string is `duplicate column name: <name>`.
810fn is_duplicate_column_error(e: &rusqlite::Error) -> bool {
811    matches!(
812        e,
813        rusqlite::Error::SqliteFailure(_, Some(msg))
814            if msg.starts_with("duplicate column name")
815    )
816}
817
818#[cfg(test)]
819mod tests {
820    use super::*;
821
822    #[test]
823    fn invalid_pv_names_rejected() {
824        // path traversal
825        assert!(!is_valid_pv_name("../etc/passwd"));
826        assert!(!is_valid_pv_name("foo/../bar"));
827        assert!(!is_valid_pv_name("foo:..:bar"));
828        assert!(!is_valid_pv_name("foo/./bar"));
829        // absolute paths via leading separator (round-10: leading ':'
830        // becomes '/' after pv_name_to_key and escapes the storage root)
831        assert!(!is_valid_pv_name("/etc/passwd"));
832        assert!(!is_valid_pv_name(":SIM:foo"));
833        assert!(!is_valid_pv_name(":foo"));
834        // empty segments (collapse oddly when split on / or :)
835        assert!(!is_valid_pv_name("foo::bar"));
836        assert!(!is_valid_pv_name("foo//bar"));
837        assert!(!is_valid_pv_name("foo:"));
838        assert!(!is_valid_pv_name("foo/"));
839        // shell metacharacters
840        assert!(!is_valid_pv_name("foo;rm -rf /"));
841        assert!(!is_valid_pv_name("foo|bar"));
842        assert!(!is_valid_pv_name("foo`x`"));
843        assert!(!is_valid_pv_name("foo$BAR"));
844        assert!(!is_valid_pv_name("foo bar"));
845        assert!(!is_valid_pv_name("foo\nbar"));
846        // edge inputs
847        assert!(!is_valid_pv_name(""));
848        assert!(!is_valid_pv_name(".hidden"));
849        assert!(!is_valid_pv_name("-leading-dash"));
850        assert!(!is_valid_pv_name(&"x".repeat(257)));
851    }
852
853    #[test]
854    fn valid_pv_names_accepted() {
855        // typical EPICS site naming conventions
856        assert!(is_valid_pv_name("SIM:Sine"));
857        assert!(is_valid_pv_name("XF:31IDA-OP{Tbl-Ax:X1}Mtr"));
858        assert!(is_valid_pv_name("ACC1-001-RFCAV-01:V<x>"));
859        assert!(is_valid_pv_name("PV.HIHI"));
860        assert!(is_valid_pv_name("BL_X+Y"));
861        assert!(is_valid_pv_name("a"));
862        assert!(is_valid_pv_name(&"x".repeat(256)));
863    }
864
865    #[test]
866    fn strip_field_suffix_basics() {
867        assert_eq!(strip_field_suffix("BASE.HIHI"), Some("BASE"));
868        assert_eq!(strip_field_suffix("BASE.LOLO"), Some("BASE"));
869        assert_eq!(strip_field_suffix("FOO.BAR_99"), Some("FOO"));
870        // no suffix
871        assert_eq!(strip_field_suffix("BASE"), None);
872        // lowercase / mixed-case rejected (not a standard EPICS field)
873        assert_eq!(strip_field_suffix("BASE.hihi"), None);
874        assert_eq!(strip_field_suffix("BASE.Hihi"), None);
875        // empty parts
876        assert_eq!(strip_field_suffix(".HIHI"), None);
877        assert_eq!(strip_field_suffix("BASE."), None);
878    }
879
880    #[test]
881    fn test_register_and_get() {
882        let reg = PvRegistry::in_memory().unwrap();
883        reg.register_pv(
884            "SIM:Sine",
885            ArchDbType::ScalarDouble,
886            &SampleMode::Monitor,
887            1,
888        )
889        .unwrap();
890
891        let record = reg.get_pv("SIM:Sine").unwrap().unwrap();
892        assert_eq!(record.pv_name, "SIM:Sine");
893        assert_eq!(record.dbr_type, ArchDbType::ScalarDouble);
894        assert_eq!(record.status, PvStatus::Active);
895    }
896
897    #[test]
898    fn test_status_transitions() {
899        let reg = PvRegistry::in_memory().unwrap();
900        reg.register_pv(
901            "SIM:Test",
902            ArchDbType::ScalarDouble,
903            &SampleMode::Monitor,
904            1,
905        )
906        .unwrap();
907
908        reg.set_status("SIM:Test", PvStatus::Paused).unwrap();
909        let r = reg.get_pv("SIM:Test").unwrap().unwrap();
910        assert_eq!(r.status, PvStatus::Paused);
911
912        reg.set_status("SIM:Test", PvStatus::Active).unwrap();
913        let r = reg.get_pv("SIM:Test").unwrap().unwrap();
914        assert_eq!(r.status, PvStatus::Active);
915    }
916
917    #[test]
918    fn test_pattern_matching() {
919        let reg = PvRegistry::in_memory().unwrap();
920        reg.register_pv(
921            "SIM:Sine",
922            ArchDbType::ScalarDouble,
923            &SampleMode::Monitor,
924            1,
925        )
926        .unwrap();
927        reg.register_pv(
928            "SIM:Cosine",
929            ArchDbType::ScalarDouble,
930            &SampleMode::Monitor,
931            1,
932        )
933        .unwrap();
934        reg.register_pv(
935            "EXP:BL1:run:active",
936            ArchDbType::ScalarEnum,
937            &SampleMode::Monitor,
938            1,
939        )
940        .unwrap();
941        reg.register_pv(
942            "EXP:BL1:motor:th:readback",
943            ArchDbType::ScalarDouble,
944            &SampleMode::Monitor,
945            1,
946        )
947        .unwrap();
948
949        let sim = reg.matching_pvs("SIM:*").unwrap();
950        assert_eq!(sim.len(), 2);
951
952        let exp = reg.matching_pvs("EXP:BL1:*").unwrap();
953        assert_eq!(exp.len(), 2);
954
955        let motor = reg.matching_pvs("EXP:*:motor:*").unwrap();
956        assert_eq!(motor.len(), 1);
957    }
958
959    #[test]
960    fn test_count_and_list() {
961        let reg = PvRegistry::in_memory().unwrap();
962        for i in 0..100 {
963            reg.register_pv(
964                &format!("PV:Test:{i:04}"),
965                ArchDbType::ScalarDouble,
966                &SampleMode::Monitor,
967                1,
968            )
969            .unwrap();
970        }
971
972        assert_eq!(reg.count(None).unwrap(), 100);
973        assert_eq!(reg.count(Some(PvStatus::Active)).unwrap(), 100);
974
975        let names = reg.all_pv_names().unwrap();
976        assert_eq!(names.len(), 100);
977    }
978
979    #[test]
980    fn test_remove_pv() {
981        let reg = PvRegistry::in_memory().unwrap();
982        reg.register_pv(
983            "SIM:Gone",
984            ArchDbType::ScalarDouble,
985            &SampleMode::Monitor,
986            1,
987        )
988        .unwrap();
989        assert!(reg.get_pv("SIM:Gone").unwrap().is_some());
990
991        reg.remove_pv("SIM:Gone").unwrap();
992        assert!(reg.get_pv("SIM:Gone").unwrap().is_none());
993    }
994
995    #[test]
996    fn test_batch_update_timestamps() {
997        let reg = PvRegistry::in_memory().unwrap();
998        reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
999            .unwrap();
1000        reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1001            .unwrap();
1002
1003        let now = SystemTime::now();
1004        reg.batch_update_timestamps(&[("PV:A", now), ("PV:B", now)])
1005            .unwrap();
1006
1007        let a = reg.get_pv("PV:A").unwrap().unwrap();
1008        assert!(a.last_timestamp.is_some());
1009    }
1010
1011    #[test]
1012    fn test_recently_added_pvs() {
1013        let reg = PvRegistry::in_memory().unwrap();
1014        let before = SystemTime::now() - Duration::from_secs(1);
1015        reg.register_pv("PV:New", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1016            .unwrap();
1017
1018        let recent = reg.recently_added_pvs(before).unwrap();
1019        assert_eq!(recent.len(), 1);
1020        assert_eq!(recent[0].pv_name, "PV:New");
1021
1022        let future = SystemTime::now() + Duration::from_secs(3600);
1023        let none = reg.recently_added_pvs(future).unwrap();
1024        assert!(none.is_empty());
1025    }
1026
1027    #[test]
1028    fn test_recently_modified_pvs() {
1029        let reg = PvRegistry::in_memory().unwrap();
1030        reg.register_pv("PV:Mod", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1031            .unwrap();
1032        let before = SystemTime::now() - Duration::from_secs(1);
1033
1034        // Modify status to update updated_at.
1035        reg.set_status("PV:Mod", PvStatus::Paused).unwrap();
1036
1037        let recent = reg.recently_modified_pvs(before).unwrap();
1038        assert!(recent.iter().any(|r| r.pv_name == "PV:Mod"));
1039    }
1040
1041    #[test]
1042    fn test_update_sample_mode() {
1043        let reg = PvRegistry::in_memory().unwrap();
1044        reg.register_pv("PV:Mode", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1045            .unwrap();
1046
1047        let new_mode = SampleMode::Scan { period_secs: 5.0 };
1048        assert!(reg.update_sample_mode("PV:Mode", &new_mode).unwrap());
1049
1050        let r = reg.get_pv("PV:Mode").unwrap().unwrap();
1051        assert_eq!(r.sample_mode, SampleMode::Scan { period_secs: 5.0 });
1052    }
1053
1054    #[test]
1055    fn test_archive_fields_roundtrip() {
1056        let reg = PvRegistry::in_memory().unwrap();
1057        reg.register_pv(
1058            "PV:Fields",
1059            ArchDbType::ScalarDouble,
1060            &SampleMode::Monitor,
1061            1,
1062        )
1063        .unwrap();
1064
1065        // Default is empty.
1066        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1067        assert!(r.archive_fields.is_empty());
1068
1069        // Set and read back.
1070        let fields = vec!["HIHI".to_string(), "LOLO".to_string(), "EGU".to_string()];
1071        assert!(reg.update_archive_fields("PV:Fields", &fields).unwrap());
1072        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1073        assert_eq!(r.archive_fields, fields);
1074
1075        // Clearing with [] removes the JSON entry.
1076        assert!(reg.update_archive_fields("PV:Fields", &[]).unwrap());
1077        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1078        assert!(r.archive_fields.is_empty());
1079    }
1080
1081    #[test]
1082    fn test_policy_name_roundtrip() {
1083        let reg = PvRegistry::in_memory().unwrap();
1084        reg.register_pv("PV:Pol", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1085            .unwrap();
1086
1087        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1088        assert!(r.policy_name.is_none());
1089
1090        assert!(reg.update_policy_name("PV:Pol", Some("fast")).unwrap());
1091        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1092        assert_eq!(r.policy_name.as_deref(), Some("fast"));
1093
1094        assert!(reg.update_policy_name("PV:Pol", None).unwrap());
1095        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1096        assert!(r.policy_name.is_none());
1097    }
1098
1099    #[test]
1100    fn test_import_pv_with_alias_and_fields() {
1101        let reg = PvRegistry::in_memory().unwrap();
1102        let fields = vec!["HIHI".to_string(), "LOLO".to_string()];
1103        reg.import_pv(
1104            "PV:Aliased",
1105            ArchDbType::ScalarDouble,
1106            &SampleMode::Monitor,
1107            1,
1108            PvStatus::Active,
1109            None,
1110            Some("3"),
1111            Some("mA"),
1112            Some("PV:Real"),
1113            &fields,
1114            Some("ring"),
1115        )
1116        .unwrap();
1117
1118        let r = reg.get_pv("PV:Aliased").unwrap().unwrap();
1119        assert_eq!(r.alias_for.as_deref(), Some("PV:Real"));
1120        assert_eq!(r.archive_fields, fields);
1121        assert_eq!(r.policy_name.as_deref(), Some("ring"));
1122        assert_eq!(r.prec.as_deref(), Some("3"));
1123        assert_eq!(r.egu.as_deref(), Some("mA"));
1124    }
1125
1126    #[test]
1127    fn test_migration_from_old_schema() {
1128        // Build a connection with the v0.1.4 schema (no alias/archive_fields/policy)
1129        // to verify ALTER TABLE migrations succeed and old rows decode cleanly.
1130        let conn = Connection::open_in_memory().unwrap();
1131        conn.execute_batch(
1132            "CREATE TABLE pv_info (
1133                pv_name        TEXT PRIMARY KEY NOT NULL,
1134                dbr_type       INTEGER NOT NULL,
1135                sample_mode    TEXT NOT NULL DEFAULT 'monitor',
1136                sample_period  REAL NOT NULL DEFAULT 0.0,
1137                status         TEXT NOT NULL DEFAULT 'active',
1138                element_count  INTEGER NOT NULL DEFAULT 1,
1139                last_timestamp TEXT,
1140                created_at     TEXT NOT NULL,
1141                updated_at     TEXT NOT NULL,
1142                prec           TEXT,
1143                egu            TEXT
1144            );",
1145        )
1146        .unwrap();
1147        let now = Utc::now().to_rfc3339();
1148        conn.execute(
1149            "INSERT INTO pv_info
1150             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
1151              created_at, updated_at, prec, egu)
1152             VALUES (?1, ?2, 'monitor', 0.0, 'active', 1, ?3, ?3, NULL, NULL)",
1153            params!["PV:Legacy", ArchDbType::ScalarDouble as i32, now],
1154        )
1155        .unwrap();
1156
1157        let reg = PvRegistry {
1158            conn: Mutex::new(conn),
1159        };
1160        // Re-running init_schema should add the new columns without dropping data.
1161        reg.init_schema().unwrap();
1162
1163        let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1164        assert_eq!(r.pv_name, "PV:Legacy");
1165        assert!(r.alias_for.is_none());
1166        assert!(r.archive_fields.is_empty());
1167        assert!(r.policy_name.is_none());
1168
1169        // New writes work too.
1170        assert!(
1171            reg.update_archive_fields("PV:Legacy", &["HIHI".to_string()])
1172                .unwrap()
1173        );
1174        let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1175        assert_eq!(r.archive_fields, vec!["HIHI".to_string()]);
1176    }
1177
1178    #[test]
1179    fn test_aliases_basic() {
1180        let reg = PvRegistry::in_memory().unwrap();
1181        reg.register_pv(
1182            "RING:Current",
1183            ArchDbType::ScalarDouble,
1184            &SampleMode::Monitor,
1185            1,
1186        )
1187        .unwrap();
1188
1189        // Add an alias and verify resolution.
1190        reg.add_alias("DEV:Current", "RING:Current").unwrap();
1191        assert_eq!(
1192            reg.resolve_alias("DEV:Current").unwrap().as_deref(),
1193            Some("RING:Current"),
1194        );
1195        assert!(reg.resolve_alias("RING:Current").unwrap().is_none()); // real PV
1196        assert!(reg.resolve_alias("Nonexistent").unwrap().is_none());
1197
1198        assert_eq!(reg.canonical_name("DEV:Current").unwrap(), "RING:Current");
1199        assert_eq!(reg.canonical_name("RING:Current").unwrap(), "RING:Current");
1200
1201        // Aliases for / all aliases.
1202        assert_eq!(
1203            reg.aliases_for("RING:Current").unwrap(),
1204            vec!["DEV:Current".to_string()],
1205        );
1206        assert_eq!(
1207            reg.all_aliases().unwrap(),
1208            vec![("DEV:Current".to_string(), "RING:Current".to_string())],
1209        );
1210
1211        // Expanded names contains both real and alias.
1212        let expanded = reg.expanded_pv_names().unwrap();
1213        assert!(expanded.contains(&"RING:Current".to_string()));
1214        assert!(expanded.contains(&"DEV:Current".to_string()));
1215
1216        // Idempotent re-add.
1217        reg.add_alias("DEV:Current", "RING:Current").unwrap();
1218        assert_eq!(reg.aliases_for("RING:Current").unwrap().len(), 1);
1219
1220        // Remove alias.
1221        assert!(reg.remove_alias("DEV:Current").unwrap());
1222        assert!(reg.resolve_alias("DEV:Current").unwrap().is_none());
1223        assert!(!reg.remove_alias("DEV:Current").unwrap()); // already gone
1224    }
1225
1226    #[test]
1227    fn test_alias_conflicts() {
1228        let reg = PvRegistry::in_memory().unwrap();
1229        reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1230            .unwrap();
1231        reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1232            .unwrap();
1233
1234        // Cannot alias to nonexistent target.
1235        assert!(reg.add_alias("Alias:X", "Nonexistent").is_err());
1236
1237        // Cannot self-alias.
1238        assert!(reg.add_alias("PV:A", "PV:A").is_err());
1239
1240        // Alias name conflicts with existing real PV.
1241        assert!(reg.add_alias("PV:B", "PV:A").is_err());
1242
1243        // Alias of alias not allowed.
1244        reg.add_alias("Alias:A", "PV:A").unwrap();
1245        assert!(reg.add_alias("Alias:Two", "Alias:A").is_err());
1246
1247        // remove_alias does NOT delete real PVs.
1248        assert!(!reg.remove_alias("PV:A").unwrap());
1249        assert!(reg.get_pv("PV:A").unwrap().is_some());
1250    }
1251
1252    #[test]
1253    fn test_silent_pvs() {
1254        let reg = PvRegistry::in_memory().unwrap();
1255        reg.register_pv(
1256            "PV:Silent",
1257            ArchDbType::ScalarDouble,
1258            &SampleMode::Monitor,
1259            1,
1260        )
1261        .unwrap();
1262        reg.register_pv(
1263            "PV:NoData",
1264            ArchDbType::ScalarDouble,
1265            &SampleMode::Monitor,
1266            1,
1267        )
1268        .unwrap();
1269
1270        // Set PV:Silent's last_timestamp to 2 hours ago.
1271        let old_time = SystemTime::now() - Duration::from_secs(7200);
1272        reg.update_last_timestamp("PV:Silent", old_time).unwrap();
1273
1274        // PV:NoData has no last_timestamp — should not appear.
1275        let silent = reg.silent_pvs(Duration::from_secs(3600)).unwrap();
1276        assert_eq!(silent.len(), 1);
1277        assert_eq!(silent[0].pv_name, "PV:Silent");
1278    }
1279}