Skip to main content

archiver_core/
registry.rs

1//! PV metadata registry backed by SQLite.
2//!
3//! Persists PV archiving configuration and state across restarts.
4//! This is metadata only — time-series data lives in PlainPB files.
5
6use std::path::Path;
7use std::sync::Mutex;
8use std::time::{Duration, SystemTime};
9
10use chrono::{DateTime, Utc};
11use rusqlite::{Connection, OptionalExtension, params};
12use tracing::info;
13
14use crate::types::ArchDbType;
15
16/// Wire protocol the engine should use when subscribing to a PV.
17///
18/// Selected from a `pva://` / `ca://` prefix on the user-supplied name.
19/// Unprefixed names default to [`Protocol::Ca`] for Java EAA parity.
20#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
21pub enum Protocol {
22    /// Channel Access (the original EPICS protocol).
23    #[default]
24    Ca,
25    /// pvAccess (structured types, larger payloads).
26    Pva,
27}
28
29impl Protocol {
30    pub fn as_str(self) -> &'static str {
31        match self {
32            Protocol::Ca => "ca",
33            Protocol::Pva => "pva",
34        }
35    }
36
37    pub fn parse(s: &str) -> Option<Self> {
38        match s {
39            "ca" => Some(Protocol::Ca),
40            "pva" => Some(Protocol::Pva),
41            _ => None,
42        }
43    }
44}
45
46/// Canonicalize a user-supplied PV name to the form the registry stores.
47///
48/// Java archiver's `PVNames.normalizeChannelName` + `stripPrefixFromName`:
49/// - `pva://X` and `ca://X` are protocol hints, not part of the channel name
50/// - `X.VAL` and `X` refer to the same EPICS record (`.VAL` is the default field)
51///
52/// Without this, `archivePV?pv=PV1.VAL` and `archivePV?pv=PV1` would create
53/// two separate registry rows that subscribe to the same IOC channel.
54pub fn normalize_pv_name(name: &str) -> &str {
55    let name = name
56        .strip_prefix("pva://")
57        .or_else(|| name.strip_prefix("ca://"))
58        .unwrap_or(name);
59    name.strip_suffix(".VAL").unwrap_or(name)
60}
61
62/// Like [`normalize_pv_name`] but also reports the protocol the prefix
63/// asked for. Returns `(Protocol::Ca, name)` when no prefix is present.
64pub fn parse_pv_with_protocol(name: &str) -> (Protocol, &str) {
65    if let Some(rest) = name.strip_prefix("pva://") {
66        (Protocol::Pva, rest.strip_suffix(".VAL").unwrap_or(rest))
67    } else if let Some(rest) = name.strip_prefix("ca://") {
68        (Protocol::Ca, rest.strip_suffix(".VAL").unwrap_or(rest))
69    } else {
70        (Protocol::Ca, name.strip_suffix(".VAL").unwrap_or(name))
71    }
72}
73
74/// Strip a trailing `.<FIELD>` suffix (e.g. `.HIHI`, `.LOLO`, `.DESC`) from
75/// a PV name, returning the bare PV name. Java's
76/// `PVNames.stripFieldNameFromPVName` (c150faad/5b2a7cb4): a query for
77/// `BASE.HIHI` should fall back to typeinfo for `BASE` so handlers can
78/// surface the field-archived metadata. Returns `None` if there is no
79/// field suffix or the suffix contains characters that aren't valid in
80/// EPICS record-field names (uppercase letters / digits / underscores).
81pub fn strip_field_suffix(name: &str) -> Option<&str> {
82    let (base, field) = name.rsplit_once('.')?;
83    if base.is_empty() || field.is_empty() {
84        return None;
85    }
86    if !field
87        .chars()
88        .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_')
89    {
90        return None;
91    }
92    Some(base)
93}
94
95/// Reject PV names that, when mapped to a filesystem path, could escape
96/// the storage root or do other naughty things. Java archiver enforces a
97/// `[A-Za-z0-9_:.\-+\[\]<>;]` allowlist via `PVNames.isValidPVName`; we
98/// instead use a conservative blocklist (no `..`, no `/`, no NUL, no
99/// leading `.`, no leading-`-`, no whitespace) — strict enough to keep
100/// `pv_name_to_key` from generating an absolute or traversal path, but
101/// permissive enough not to break existing site naming conventions
102/// like `SIM:Sine` and `IOC[A]:val<x>`.
103///
104/// Called by every registry entry-point (register / import / add_alias)
105/// AND by `pv_name_to_key` as defense in depth, so any code path that
106/// builds a filesystem name from a PV string fails closed if the
107/// HTTP-layer validation is bypassed.
108pub fn is_valid_pv_name(name: &str) -> bool {
109    if name.is_empty() || name.len() > 256 {
110        return false;
111    }
112    // Leading separator → after pv_name_to_key the result begins with
113    // `/`, and `Path::join(root, "/abs/path")` ignores `root` entirely
114    // (Rust semantics) → escapes the storage root. Same for the `.` /
115    // `-` cases.
116    if name.starts_with('.')
117        || name.starts_with('-')
118        || name.starts_with('/')
119        || name.starts_with(':')
120    {
121        return false;
122    }
123    for component in name.split([':', '/']) {
124        // `..` / `.` are obvious traversal. An empty segment (`A::B`,
125        // `A//B`, trailing `:`/`/`) maps to a `//` in the filesystem
126        // path which most OSes collapse but some path-relative tools
127        // re-split, so just reject.
128        if component.is_empty() || component == ".." || component == "." {
129            return false;
130        }
131    }
132    !name.chars().any(|c| {
133        c == '\0'
134            || c == '\\'
135            || c.is_whitespace()
136            || c.is_control()
137            // Reject backslash and other path-shell metacharacters that
138            // could surprise downstream tools that re-parse the name.
139            || matches!(c, '|' | '&' | ';' | '`' | '$' | '"' | '\'' | '*' | '?')
140    })
141}
142
143/// PV archiving status.
144///
145/// `Alias` is a sentinel applied to alias rows so they don't appear in
146/// status-filtered queries (`pvs_by_status(Active)`, getPVCount, restore
147/// loops). Aliases are routing entries, not archive subjects.
148#[derive(Debug, Clone, Copy, PartialEq, Eq)]
149pub enum PvStatus {
150    Active,
151    Paused,
152    Error,
153    Inactive,
154    Alias,
155}
156
157impl PvStatus {
158    pub fn as_str(self) -> &'static str {
159        match self {
160            Self::Active => "active",
161            Self::Paused => "paused",
162            Self::Error => "error",
163            Self::Inactive => "inactive",
164            Self::Alias => "alias",
165        }
166    }
167}
168
169impl std::str::FromStr for PvStatus {
170    type Err = std::convert::Infallible;
171
172    fn from_str(s: &str) -> Result<Self, Self::Err> {
173        Ok(match s {
174            "active" => Self::Active,
175            "paused" => Self::Paused,
176            "error" => Self::Error,
177            "inactive" => Self::Inactive,
178            "alias" => Self::Alias,
179            _ => Self::Active,
180        })
181    }
182}
183
184/// Sampling mode stored in the registry.
185#[derive(Debug, Clone, PartialEq)]
186pub enum SampleMode {
187    Monitor,
188    Scan { period_secs: f64 },
189}
190
191impl SampleMode {
192    fn to_db(&self) -> (&str, f64) {
193        match self {
194            Self::Monitor => ("monitor", 0.0),
195            Self::Scan { period_secs } => ("scan", *period_secs),
196        }
197    }
198
199    fn from_db(mode: &str, period: f64) -> Self {
200        match mode {
201            "scan" => Self::Scan {
202                period_secs: period,
203            },
204            _ => Self::Monitor,
205        }
206    }
207}
208
209/// A PV record in the registry.
210#[derive(Debug, Clone)]
211pub struct PvRecord {
212    pub pv_name: String,
213    pub dbr_type: ArchDbType,
214    pub sample_mode: SampleMode,
215    pub status: PvStatus,
216    pub element_count: i32,
217    pub last_timestamp: Option<SystemTime>,
218    pub created_at: DateTime<Utc>,
219    pub updated_at: DateTime<Utc>,
220    pub prec: Option<String>,
221    pub egu: Option<String>,
222    /// When set, this row is an alias pointing at another PV name.
223    /// Lookups should resolve to that target before reading/writing data.
224    pub alias_for: Option<String>,
225    /// Names of EPICS metadata fields (e.g. ["HIHI","LOLO","EGU"]) that the
226    /// engine should sample alongside the main value and attach to events.
227    pub archive_fields: Vec<String>,
228    /// Name of the policy that selected this PV's sampling configuration.
229    pub policy_name: Option<String>,
230    /// Wire protocol the engine subscribes with — selected from the
231    /// `pva://` / `ca://` prefix at archive_pv time. Defaults to
232    /// [`Protocol::Ca`] for legacy rows.
233    pub protocol: Protocol,
234}
235
236/// SQLite-backed PV metadata registry.
237pub struct PvRegistry {
238    conn: Mutex<Connection>,
239}
240
241impl PvRegistry {
242    fn lock_conn(&self) -> anyhow::Result<std::sync::MutexGuard<'_, Connection>> {
243        self.conn
244            .lock()
245            .map_err(|e| anyhow::anyhow!("PV registry lock poisoned: {e}"))
246    }
247
248    /// Open (or create) the registry database at the given path.
249    pub fn open(path: &Path) -> anyhow::Result<Self> {
250        let conn = Connection::open(path)?;
251        let registry = Self {
252            conn: Mutex::new(conn),
253        };
254        registry.init_schema()?;
255        Ok(registry)
256    }
257
258    /// Create an in-memory registry (for testing).
259    pub fn in_memory() -> anyhow::Result<Self> {
260        let conn = Connection::open_in_memory()?;
261        let registry = Self {
262            conn: Mutex::new(conn),
263        };
264        registry.init_schema()?;
265        Ok(registry)
266    }
267
268    fn init_schema(&self) -> anyhow::Result<()> {
269        let conn = self.lock_conn()?;
270        // Step 1: create the table (idempotent) and indexes that don't depend
271        // on columns added by later migrations.
272        conn.execute_batch(
273            "
274            CREATE TABLE IF NOT EXISTS pv_info (
275                pv_name         TEXT PRIMARY KEY NOT NULL,
276                dbr_type        INTEGER NOT NULL,
277                sample_mode     TEXT NOT NULL DEFAULT 'monitor',
278                sample_period   REAL NOT NULL DEFAULT 0.0,
279                status          TEXT NOT NULL DEFAULT 'active',
280                element_count   INTEGER NOT NULL DEFAULT 1,
281                last_timestamp  TEXT,
282                created_at      TEXT NOT NULL,
283                updated_at      TEXT NOT NULL,
284                prec            TEXT,
285                egu             TEXT,
286                alias_for       TEXT,
287                archive_fields  TEXT,
288                policy_name     TEXT,
289                protocol        TEXT NOT NULL DEFAULT 'ca'
290            );
291
292            CREATE INDEX IF NOT EXISTS idx_pv_status ON pv_info(status);
293            CREATE INDEX IF NOT EXISTS idx_pv_prefix ON pv_info(pv_name COLLATE NOCASE);
294            ",
295        )?;
296        // Step 2: migrations for tables created with older schemas. Each statement
297        // runs independently because SQLite stops on the first duplicate-column
298        // error when batched. Only "duplicate column" errors are silently
299        // accepted; disk-full / lock / corruption errors must surface.
300        for stmt in [
301            "ALTER TABLE pv_info ADD COLUMN prec TEXT",
302            "ALTER TABLE pv_info ADD COLUMN egu TEXT",
303            "ALTER TABLE pv_info ADD COLUMN alias_for TEXT",
304            "ALTER TABLE pv_info ADD COLUMN archive_fields TEXT",
305            "ALTER TABLE pv_info ADD COLUMN policy_name TEXT",
306            "ALTER TABLE pv_info ADD COLUMN protocol TEXT NOT NULL DEFAULT 'ca'",
307        ] {
308            match conn.execute(stmt, []) {
309                Ok(_) => {}
310                Err(e) if is_duplicate_column_error(&e) => {}
311                Err(e) => return Err(e.into()),
312            }
313        }
314        // Step 3: indexes that reference newly-added columns. Done after ALTER
315        // so an upgraded database has the columns to index.
316        conn.execute_batch(
317            "CREATE INDEX IF NOT EXISTS idx_pv_alias \
318             ON pv_info(alias_for) WHERE alias_for IS NOT NULL;",
319        )?;
320        info!("PV registry schema initialized");
321        Ok(())
322    }
323
324    /// Register a new PV for archiving with default CA protocol.
325    /// Backwards-compatible wrapper around [`register_pv_with_protocol`].
326    pub fn register_pv(
327        &self,
328        pv_name: &str,
329        dbr_type: ArchDbType,
330        sample_mode: &SampleMode,
331        element_count: i32,
332    ) -> anyhow::Result<()> {
333        self.register_pv_with_protocol(pv_name, dbr_type, sample_mode, element_count, Protocol::Ca)
334    }
335
336    /// Register a new PV for archiving, recording which wire protocol the
337    /// engine should use (CA vs PVA).
338    pub fn register_pv_with_protocol(
339        &self,
340        pv_name: &str,
341        dbr_type: ArchDbType,
342        sample_mode: &SampleMode,
343        element_count: i32,
344        protocol: Protocol,
345    ) -> anyhow::Result<()> {
346        if !is_valid_pv_name(pv_name) {
347            anyhow::bail!("invalid PV name: {pv_name:?}");
348        }
349        let conn = self.lock_conn()?;
350        let now = Utc::now().to_rfc3339();
351        let (mode_str, period) = sample_mode.to_db();
352
353        conn.execute(
354            "INSERT OR REPLACE INTO pv_info
355             (pv_name, dbr_type, sample_mode, sample_period, status, element_count, created_at, updated_at, protocol)
356             VALUES (?1, ?2, ?3, ?4, 'active', ?5, COALESCE((SELECT created_at FROM pv_info WHERE pv_name = ?1), ?6), ?6, ?7)",
357            params![pv_name, dbr_type as i32, mode_str, period, element_count, now, protocol.as_str()],
358        )?;
359        Ok(())
360    }
361
362    /// Update PV status (active, paused, error).
363    pub fn set_status(&self, pv_name: &str, status: PvStatus) -> anyhow::Result<bool> {
364        let conn = self.lock_conn()?;
365        let now = Utc::now().to_rfc3339();
366        let rows = conn.execute(
367            "UPDATE pv_info SET status = ?1, updated_at = ?2 WHERE pv_name = ?3",
368            params![status.as_str(), now, pv_name],
369        )?;
370        Ok(rows > 0)
371    }
372
373    /// Update the last known timestamp for a PV.
374    pub fn update_last_timestamp(
375        &self,
376        pv_name: &str,
377        timestamp: SystemTime,
378    ) -> anyhow::Result<()> {
379        let conn = self.lock_conn()?;
380        let dt = DateTime::<Utc>::from(timestamp).to_rfc3339();
381        let now = Utc::now().to_rfc3339();
382        conn.execute(
383            "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
384            params![dt, now, pv_name],
385        )?;
386        Ok(())
387    }
388
389    /// Remove a PV from the registry entirely.
390    pub fn remove_pv(&self, pv_name: &str) -> anyhow::Result<bool> {
391        let conn = self.lock_conn()?;
392        let rows = conn.execute("DELETE FROM pv_info WHERE pv_name = ?1", params![pv_name])?;
393        Ok(rows > 0)
394    }
395
396    /// Get a single PV record.
397    pub fn get_pv(&self, pv_name: &str) -> anyhow::Result<Option<PvRecord>> {
398        let conn = self.lock_conn()?;
399        conn.query_row(
400            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
401                    last_timestamp, created_at, updated_at, prec, egu,
402                    alias_for, archive_fields, policy_name, protocol
403             FROM pv_info WHERE pv_name = ?1",
404            params![pv_name],
405            row_to_record,
406        )
407        .optional()
408        .map_err(Into::into)
409    }
410
411    /// List all real PV names (alias rows excluded). Use
412    /// [`Self::expanded_pv_names`] to include aliases.
413    pub fn all_pv_names(&self) -> anyhow::Result<Vec<String>> {
414        let conn = self.lock_conn()?;
415        let mut stmt =
416            conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for IS NULL ORDER BY pv_name")?;
417        let names = stmt
418            .query_map([], |row| row.get(0))?
419            .collect::<Result<Vec<String>, _>>()?;
420        Ok(names)
421    }
422
423    /// List all real PVs with a given status. Alias rows have
424    /// `status='alias'` and are excluded from every other-status query.
425    pub fn pvs_by_status(&self, status: PvStatus) -> anyhow::Result<Vec<PvRecord>> {
426        let conn = self.lock_conn()?;
427        let mut stmt = conn.prepare(
428            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
429                    last_timestamp, created_at, updated_at, prec, egu,
430                    alias_for, archive_fields, policy_name, protocol
431             FROM pv_info WHERE status = ?1 ORDER BY pv_name",
432        )?;
433        let records = stmt
434            .query_map(params![status.as_str()], row_to_record)?
435            .collect::<Result<Vec<_>, _>>()?;
436        Ok(records)
437    }
438
439    /// Match real PV names by glob pattern (SQL GLOB). Excludes aliases.
440    pub fn matching_pvs(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
441        let conn = self.lock_conn()?;
442        let mut stmt = conn.prepare(
443            "SELECT pv_name FROM pv_info
444             WHERE pv_name GLOB ?1 AND alias_for IS NULL
445             ORDER BY pv_name",
446        )?;
447        let names = stmt
448            .query_map(params![pattern], |row| row.get(0))?
449            .collect::<Result<Vec<String>, _>>()?;
450        Ok(names)
451    }
452
453    /// Match PV names by glob pattern, INCLUDING alias rows. Java's
454    /// `getMatchingPVs` returns aliases too (c61f1579) — without this an
455    /// admin walking the inventory by glob silently misses every alias.
456    /// Internal callers that only want real PVs (engine, reports) stay
457    /// on `matching_pvs`.
458    pub fn matching_pvs_expanded(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
459        let conn = self.lock_conn()?;
460        let mut stmt =
461            conn.prepare("SELECT pv_name FROM pv_info WHERE pv_name GLOB ?1 ORDER BY pv_name")?;
462        let names = stmt
463            .query_map(params![pattern], |row| row.get(0))?
464            .collect::<Result<Vec<String>, _>>()?;
465        Ok(names)
466    }
467
468    /// Count real PVs, optionally filtered by status. Excludes aliases.
469    pub fn count(&self, status: Option<PvStatus>) -> anyhow::Result<u64> {
470        let conn = self.lock_conn()?;
471        let count: u64 = match status {
472            Some(s) => conn.query_row(
473                "SELECT COUNT(*) FROM pv_info
474                 WHERE status = ?1 AND alias_for IS NULL",
475                params![s.as_str()],
476                |row| row.get(0),
477            )?,
478            None => conn.query_row(
479                "SELECT COUNT(*) FROM pv_info WHERE alias_for IS NULL",
480                [],
481                |row| row.get(0),
482            )?,
483        };
484        Ok(count)
485    }
486
487    /// Batch update last timestamps (for periodic flush).
488    pub fn batch_update_timestamps(&self, updates: &[(&str, SystemTime)]) -> anyhow::Result<()> {
489        let mut conn = self.lock_conn()?;
490        let tx = conn.transaction()?;
491        let now = Utc::now().to_rfc3339();
492        {
493            let mut stmt = tx.prepare(
494                "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
495            )?;
496            for (pv_name, ts) in updates {
497                let dt = DateTime::<Utc>::from(*ts).to_rfc3339();
498                stmt.execute(params![dt, now, pv_name])?;
499            }
500        }
501        tx.commit()?;
502        Ok(())
503    }
504
505    /// Get real PVs added since a given time (aliases excluded).
506    pub fn recently_added_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
507        let conn = self.lock_conn()?;
508        let since_str = DateTime::<Utc>::from(since).to_rfc3339();
509        let mut stmt = conn.prepare(
510            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
511                    last_timestamp, created_at, updated_at, prec, egu,
512                    alias_for, archive_fields, policy_name, protocol
513             FROM pv_info WHERE created_at >= ?1 AND alias_for IS NULL
514             ORDER BY created_at DESC",
515        )?;
516        let records = stmt
517            .query_map(params![since_str], row_to_record)?
518            .collect::<Result<Vec<_>, _>>()?;
519        Ok(records)
520    }
521
522    /// Get real PVs modified since a given time (aliases excluded).
523    pub fn recently_modified_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
524        let conn = self.lock_conn()?;
525        let since_str = DateTime::<Utc>::from(since).to_rfc3339();
526        let mut stmt = conn.prepare(
527            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
528                    last_timestamp, created_at, updated_at, prec, egu,
529                    alias_for, archive_fields, policy_name, protocol
530             FROM pv_info WHERE updated_at >= ?1 AND alias_for IS NULL
531             ORDER BY updated_at DESC",
532        )?;
533        let records = stmt
534            .query_map(params![since_str], row_to_record)?
535            .collect::<Result<Vec<_>, _>>()?;
536        Ok(records)
537    }
538
539    /// Update the sample mode and period for a PV.
540    pub fn update_sample_mode(&self, pv_name: &str, mode: &SampleMode) -> anyhow::Result<bool> {
541        let conn = self.lock_conn()?;
542        let now = Utc::now().to_rfc3339();
543        let (mode_str, period) = mode.to_db();
544        let rows = conn.execute(
545            "UPDATE pv_info SET sample_mode = ?1, sample_period = ?2, updated_at = ?3 WHERE pv_name = ?4",
546            params![mode_str, period, now, pv_name],
547        )?;
548        Ok(rows > 0)
549    }
550
551    /// Update PREC and EGU metadata for a PV.
552    pub fn update_metadata(
553        &self,
554        pv_name: &str,
555        prec: Option<&str>,
556        egu: Option<&str>,
557    ) -> anyhow::Result<bool> {
558        let conn = self.lock_conn()?;
559        let now = Utc::now().to_rfc3339();
560        let rows = conn.execute(
561            "UPDATE pv_info SET prec = COALESCE(?1, prec), egu = COALESCE(?2, egu), updated_at = ?3 WHERE pv_name = ?4",
562            params![prec, egu, now, pv_name],
563        )?;
564        Ok(rows > 0)
565    }
566
567    /// Import a PV with default CA protocol — backwards-compatible
568    /// wrapper around [`import_pv_with_protocol`].
569    #[allow(clippy::too_many_arguments)]
570    pub fn import_pv(
571        &self,
572        pv_name: &str,
573        dbr_type: ArchDbType,
574        sample_mode: &SampleMode,
575        element_count: i32,
576        status: PvStatus,
577        created_at: Option<&str>,
578        prec: Option<&str>,
579        egu: Option<&str>,
580        alias_for: Option<&str>,
581        archive_fields: &[String],
582        policy_name: Option<&str>,
583    ) -> anyhow::Result<()> {
584        self.import_pv_with_protocol(
585            pv_name,
586            dbr_type,
587            sample_mode,
588            element_count,
589            status,
590            created_at,
591            prec,
592            egu,
593            alias_for,
594            archive_fields,
595            policy_name,
596            Protocol::Ca,
597        )
598    }
599
600    /// Import a PV with all fields in a single SQL operation.
601    /// Used during config import to atomically set status, created_at, and metadata.
602    #[allow(clippy::too_many_arguments)]
603    pub fn import_pv_with_protocol(
604        &self,
605        pv_name: &str,
606        dbr_type: ArchDbType,
607        sample_mode: &SampleMode,
608        element_count: i32,
609        status: PvStatus,
610        created_at: Option<&str>,
611        prec: Option<&str>,
612        egu: Option<&str>,
613        alias_for: Option<&str>,
614        archive_fields: &[String],
615        policy_name: Option<&str>,
616        protocol: Protocol,
617    ) -> anyhow::Result<()> {
618        if !is_valid_pv_name(pv_name) {
619            anyhow::bail!("invalid PV name: {pv_name:?}");
620        }
621        if let Some(target) = alias_for
622            && !is_valid_pv_name(target)
623        {
624            anyhow::bail!("invalid alias target: {target:?}");
625        }
626        let conn = self.lock_conn()?;
627        let now = Utc::now().to_rfc3339();
628        let (mode_str, period) = sample_mode.to_db();
629        let created = created_at.unwrap_or(&now);
630        let archive_fields_json = if archive_fields.is_empty() {
631            None
632        } else {
633            Some(serde_json::to_string(archive_fields)?)
634        };
635
636        conn.execute(
637            "INSERT OR REPLACE INTO pv_info
638             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
639              created_at, updated_at, prec, egu, alias_for, archive_fields, policy_name, protocol)
640             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
641            params![
642                pv_name,
643                dbr_type as i32,
644                mode_str,
645                period,
646                status.as_str(),
647                element_count,
648                created,
649                now,
650                prec,
651                egu,
652                alias_for,
653                archive_fields_json,
654                policy_name,
655                protocol.as_str(),
656            ],
657        )?;
658        Ok(())
659    }
660
661    /// Set or clear the archive_fields list for a PV.
662    pub fn update_archive_fields(&self, pv_name: &str, fields: &[String]) -> anyhow::Result<bool> {
663        let conn = self.lock_conn()?;
664        let now = Utc::now().to_rfc3339();
665        let json = if fields.is_empty() {
666            None
667        } else {
668            Some(serde_json::to_string(fields)?)
669        };
670        let rows = conn.execute(
671            "UPDATE pv_info SET archive_fields = ?1, updated_at = ?2 WHERE pv_name = ?3",
672            params![json, now, pv_name],
673        )?;
674        Ok(rows > 0)
675    }
676
677    /// Set or clear the policy_name on a PV.
678    pub fn update_policy_name(
679        &self,
680        pv_name: &str,
681        policy_name: Option<&str>,
682    ) -> anyhow::Result<bool> {
683        let conn = self.lock_conn()?;
684        let now = Utc::now().to_rfc3339();
685        let rows = conn.execute(
686            "UPDATE pv_info SET policy_name = ?1, updated_at = ?2 WHERE pv_name = ?3",
687            params![policy_name, now, pv_name],
688        )?;
689        Ok(rows > 0)
690    }
691
692    /// Add an alias `alias` that points at the existing PV `target`.
693    /// Fails if target does not exist or if `alias` already maps elsewhere.
694    /// The alias row mirrors target's dbr_type/sample_mode/element_count for
695    /// display, but `alias_for` distinguishes it from real PVs.
696    pub fn add_alias(&self, alias: &str, target: &str) -> anyhow::Result<()> {
697        if alias == target {
698            anyhow::bail!("alias and target must differ");
699        }
700        if !is_valid_pv_name(alias) {
701            anyhow::bail!("invalid alias name: {alias:?}");
702        }
703        if !is_valid_pv_name(target) {
704            anyhow::bail!("invalid alias target: {target:?}");
705        }
706        let conn = self.lock_conn()?;
707        // Resolve target (must be a real PV, not itself an alias).
708        let row: Option<(i32, String, f64, i32, Option<String>)> = conn
709            .query_row(
710                "SELECT dbr_type, sample_mode, sample_period, element_count, alias_for
711                 FROM pv_info WHERE pv_name = ?1",
712                params![target],
713                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
714            )
715            .optional()?;
716        let (dbr_type, mode, period, ec, target_alias) =
717            row.ok_or_else(|| anyhow::anyhow!("target PV '{target}' not found"))?;
718        if target_alias.is_some() {
719            anyhow::bail!(
720                "target PV '{target}' is itself an alias; aliases of aliases are not allowed"
721            );
722        }
723        // Reject if `alias` already exists either as a real PV or different alias.
724        let existing: Option<Option<String>> = conn
725            .query_row(
726                "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
727                params![alias],
728                |r| r.get(0),
729            )
730            .optional()?;
731        if let Some(existing_alias) = existing {
732            if existing_alias.as_deref() == Some(target) {
733                return Ok(()); // idempotent
734            }
735            anyhow::bail!("'{alias}' already exists in registry");
736        }
737        let now = Utc::now().to_rfc3339();
738        conn.execute(
739            "INSERT INTO pv_info
740             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
741              created_at, updated_at, alias_for)
742             VALUES (?1, ?2, ?3, ?4, 'alias', ?5, ?6, ?6, ?7)",
743            params![alias, dbr_type, mode, period, ec, now, target],
744        )?;
745        Ok(())
746    }
747
748    /// Remove an alias row. Returns true if removed, false if the row was not
749    /// an alias or did not exist. Real PVs are not removed by this method.
750    pub fn remove_alias(&self, alias: &str) -> anyhow::Result<bool> {
751        let conn = self.lock_conn()?;
752        let rows = conn.execute(
753            "DELETE FROM pv_info WHERE pv_name = ?1 AND alias_for IS NOT NULL",
754            params![alias],
755        )?;
756        Ok(rows > 0)
757    }
758
759    /// If `name` is an alias, return its target. Returns None if `name` is
760    /// already a real PV or does not exist.
761    pub fn resolve_alias(&self, name: &str) -> anyhow::Result<Option<String>> {
762        let conn = self.lock_conn()?;
763        let row: Option<Option<String>> = conn
764            .query_row(
765                "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
766                params![name],
767                |r| r.get(0),
768            )
769            .optional()?;
770        Ok(row.flatten())
771    }
772
773    /// Return the canonical PV name. If `name` is an alias, returns the target;
774    /// otherwise returns the input unchanged. Used by lookup/retrieval paths.
775    pub fn canonical_name(&self, name: &str) -> anyhow::Result<String> {
776        Ok(self
777            .resolve_alias(name)?
778            .unwrap_or_else(|| name.to_string()))
779    }
780
781    /// List all alias names pointing at a given target PV.
782    pub fn aliases_for(&self, target: &str) -> anyhow::Result<Vec<String>> {
783        let conn = self.lock_conn()?;
784        let mut stmt =
785            conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for = ?1 ORDER BY pv_name")?;
786        let names = stmt
787            .query_map(params![target], |row| row.get(0))?
788            .collect::<Result<Vec<String>, _>>()?;
789        Ok(names)
790    }
791
792    /// All `(alias, target)` pairs in the registry.
793    pub fn all_aliases(&self) -> anyhow::Result<Vec<(String, String)>> {
794        let conn = self.lock_conn()?;
795        let mut stmt = conn.prepare(
796            "SELECT pv_name, alias_for FROM pv_info
797             WHERE alias_for IS NOT NULL ORDER BY pv_name",
798        )?;
799        let rows = stmt
800            .query_map([], |row| {
801                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
802            })?
803            .collect::<Result<Vec<_>, _>>()?;
804        Ok(rows)
805    }
806
807    /// All PV names including aliases (`getAllExpandedPVNames`).
808    pub fn expanded_pv_names(&self) -> anyhow::Result<Vec<String>> {
809        let conn = self.lock_conn()?;
810        let mut stmt = conn.prepare("SELECT pv_name FROM pv_info ORDER BY pv_name")?;
811        let names = stmt
812            .query_map([], |row| row.get(0))?
813            .collect::<Result<Vec<String>, _>>()?;
814        Ok(names)
815    }
816
817    /// Get all PV records (for export).
818    pub fn all_records(&self) -> anyhow::Result<Vec<PvRecord>> {
819        let conn = self.lock_conn()?;
820        let mut stmt = conn.prepare(
821            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
822                    last_timestamp, created_at, updated_at, prec, egu,
823                    alias_for, archive_fields, policy_name, protocol
824             FROM pv_info ORDER BY pv_name",
825        )?;
826        let records = stmt
827            .query_map([], row_to_record)?
828            .collect::<Result<Vec<_>, _>>()?;
829        Ok(records)
830    }
831
832    /// Get real PVs that have not received events for longer than the
833    /// threshold duration. Only returns PVs that have a last_timestamp.
834    /// Aliases are excluded — they don't carry event timestamps.
835    pub fn silent_pvs(&self, threshold: Duration) -> anyhow::Result<Vec<PvRecord>> {
836        let conn = self.lock_conn()?;
837        let cutoff = SystemTime::now()
838            .checked_sub(threshold)
839            .unwrap_or(SystemTime::UNIX_EPOCH);
840        let cutoff_str = DateTime::<Utc>::from(cutoff).to_rfc3339();
841        let mut stmt = conn.prepare(
842            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
843                    last_timestamp, created_at, updated_at, prec, egu,
844                    alias_for, archive_fields, policy_name, protocol
845             FROM pv_info WHERE last_timestamp IS NOT NULL AND last_timestamp < ?1
846                            AND alias_for IS NULL
847             ORDER BY last_timestamp ASC",
848        )?;
849        let records = stmt
850            .query_map(params![cutoff_str], row_to_record)?
851            .collect::<Result<Vec<_>, _>>()?;
852        Ok(records)
853    }
854}
855
856fn row_to_record(row: &rusqlite::Row) -> rusqlite::Result<PvRecord> {
857    let pv_name: String = row.get(0)?;
858    let dbr_type_i: i32 = row.get(1)?;
859    let sample_mode_str: String = row.get(2)?;
860    let sample_period: f64 = row.get(3)?;
861    let status_str: String = row.get(4)?;
862    let element_count: i32 = row.get(5)?;
863    let last_ts_str: Option<String> = row.get(6)?;
864    let created_str: String = row.get(7)?;
865    let updated_str: String = row.get(8)?;
866    let prec: Option<String> = row.get(9).unwrap_or(None);
867    let egu: Option<String> = row.get(10).unwrap_or(None);
868    let alias_for: Option<String> = row.get(11).unwrap_or(None);
869    let archive_fields_json: Option<String> = row.get(12).unwrap_or(None);
870    let policy_name: Option<String> = row.get(13).unwrap_or(None);
871    // Column 14 ("protocol") may be absent on a row written by an
872    // older archiver — `unwrap_or(None)` falls back to default Ca.
873    let protocol_str: Option<String> = row.get(14).unwrap_or(None);
874    let protocol = protocol_str
875        .as_deref()
876        .and_then(Protocol::parse)
877        .unwrap_or_default();
878
879    let last_timestamp = last_ts_str.and_then(|s| {
880        DateTime::parse_from_rfc3339(&s)
881            .ok()
882            .map(|dt| dt.with_timezone(&Utc).into())
883    });
884
885    let archive_fields = archive_fields_json
886        .as_deref()
887        .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
888        .unwrap_or_default();
889
890    Ok(PvRecord {
891        pv_name,
892        dbr_type: ArchDbType::from_i32(dbr_type_i).unwrap_or(ArchDbType::ScalarDouble),
893        sample_mode: SampleMode::from_db(&sample_mode_str, sample_period),
894        status: status_str.parse().unwrap_or(PvStatus::Active),
895        element_count,
896        last_timestamp,
897        created_at: DateTime::parse_from_rfc3339(&created_str)
898            .map(|dt| dt.with_timezone(&Utc))
899            .unwrap_or_else(|_| Utc::now()),
900        updated_at: DateTime::parse_from_rfc3339(&updated_str)
901            .map(|dt| dt.with_timezone(&Utc))
902            .unwrap_or_else(|_| Utc::now()),
903        prec,
904        egu,
905        alias_for,
906        archive_fields,
907        policy_name,
908        protocol,
909    })
910}
911
912/// SQLite returns generic `Error` for ALTER TABLE failures. Match on the
913/// message because rusqlite doesn't expose a structured "duplicate column"
914/// extended-code; the SQLite error string is `duplicate column name: <name>`.
915fn is_duplicate_column_error(e: &rusqlite::Error) -> bool {
916    matches!(
917        e,
918        rusqlite::Error::SqliteFailure(_, Some(msg))
919            if msg.starts_with("duplicate column name")
920    )
921}
922
923#[cfg(test)]
924mod tests {
925    use super::*;
926
927    #[test]
928    fn invalid_pv_names_rejected() {
929        // path traversal
930        assert!(!is_valid_pv_name("../etc/passwd"));
931        assert!(!is_valid_pv_name("foo/../bar"));
932        assert!(!is_valid_pv_name("foo:..:bar"));
933        assert!(!is_valid_pv_name("foo/./bar"));
934        // absolute paths via leading separator (round-10: leading ':'
935        // becomes '/' after pv_name_to_key and escapes the storage root)
936        assert!(!is_valid_pv_name("/etc/passwd"));
937        assert!(!is_valid_pv_name(":SIM:foo"));
938        assert!(!is_valid_pv_name(":foo"));
939        // empty segments (collapse oddly when split on / or :)
940        assert!(!is_valid_pv_name("foo::bar"));
941        assert!(!is_valid_pv_name("foo//bar"));
942        assert!(!is_valid_pv_name("foo:"));
943        assert!(!is_valid_pv_name("foo/"));
944        // shell metacharacters
945        assert!(!is_valid_pv_name("foo;rm -rf /"));
946        assert!(!is_valid_pv_name("foo|bar"));
947        assert!(!is_valid_pv_name("foo`x`"));
948        assert!(!is_valid_pv_name("foo$BAR"));
949        assert!(!is_valid_pv_name("foo bar"));
950        assert!(!is_valid_pv_name("foo\nbar"));
951        // edge inputs
952        assert!(!is_valid_pv_name(""));
953        assert!(!is_valid_pv_name(".hidden"));
954        assert!(!is_valid_pv_name("-leading-dash"));
955        assert!(!is_valid_pv_name(&"x".repeat(257)));
956    }
957
958    #[test]
959    fn valid_pv_names_accepted() {
960        // typical EPICS site naming conventions
961        assert!(is_valid_pv_name("SIM:Sine"));
962        assert!(is_valid_pv_name("XF:31IDA-OP{Tbl-Ax:X1}Mtr"));
963        assert!(is_valid_pv_name("ACC1-001-RFCAV-01:V<x>"));
964        assert!(is_valid_pv_name("PV.HIHI"));
965        assert!(is_valid_pv_name("BL_X+Y"));
966        assert!(is_valid_pv_name("a"));
967        assert!(is_valid_pv_name(&"x".repeat(256)));
968    }
969
970    #[test]
971    fn strip_field_suffix_basics() {
972        assert_eq!(strip_field_suffix("BASE.HIHI"), Some("BASE"));
973        assert_eq!(strip_field_suffix("BASE.LOLO"), Some("BASE"));
974        assert_eq!(strip_field_suffix("FOO.BAR_99"), Some("FOO"));
975        // no suffix
976        assert_eq!(strip_field_suffix("BASE"), None);
977        // lowercase / mixed-case rejected (not a standard EPICS field)
978        assert_eq!(strip_field_suffix("BASE.hihi"), None);
979        assert_eq!(strip_field_suffix("BASE.Hihi"), None);
980        // empty parts
981        assert_eq!(strip_field_suffix(".HIHI"), None);
982        assert_eq!(strip_field_suffix("BASE."), None);
983    }
984
985    #[test]
986    fn test_register_and_get() {
987        let reg = PvRegistry::in_memory().unwrap();
988        reg.register_pv(
989            "SIM:Sine",
990            ArchDbType::ScalarDouble,
991            &SampleMode::Monitor,
992            1,
993        )
994        .unwrap();
995
996        let record = reg.get_pv("SIM:Sine").unwrap().unwrap();
997        assert_eq!(record.pv_name, "SIM:Sine");
998        assert_eq!(record.dbr_type, ArchDbType::ScalarDouble);
999        assert_eq!(record.status, PvStatus::Active);
1000    }
1001
1002    #[test]
1003    fn test_status_transitions() {
1004        let reg = PvRegistry::in_memory().unwrap();
1005        reg.register_pv(
1006            "SIM:Test",
1007            ArchDbType::ScalarDouble,
1008            &SampleMode::Monitor,
1009            1,
1010        )
1011        .unwrap();
1012
1013        reg.set_status("SIM:Test", PvStatus::Paused).unwrap();
1014        let r = reg.get_pv("SIM:Test").unwrap().unwrap();
1015        assert_eq!(r.status, PvStatus::Paused);
1016
1017        reg.set_status("SIM:Test", PvStatus::Active).unwrap();
1018        let r = reg.get_pv("SIM:Test").unwrap().unwrap();
1019        assert_eq!(r.status, PvStatus::Active);
1020    }
1021
1022    #[test]
1023    fn test_pattern_matching() {
1024        let reg = PvRegistry::in_memory().unwrap();
1025        reg.register_pv(
1026            "SIM:Sine",
1027            ArchDbType::ScalarDouble,
1028            &SampleMode::Monitor,
1029            1,
1030        )
1031        .unwrap();
1032        reg.register_pv(
1033            "SIM:Cosine",
1034            ArchDbType::ScalarDouble,
1035            &SampleMode::Monitor,
1036            1,
1037        )
1038        .unwrap();
1039        reg.register_pv(
1040            "EXP:BL1:run:active",
1041            ArchDbType::ScalarEnum,
1042            &SampleMode::Monitor,
1043            1,
1044        )
1045        .unwrap();
1046        reg.register_pv(
1047            "EXP:BL1:motor:th:readback",
1048            ArchDbType::ScalarDouble,
1049            &SampleMode::Monitor,
1050            1,
1051        )
1052        .unwrap();
1053
1054        let sim = reg.matching_pvs("SIM:*").unwrap();
1055        assert_eq!(sim.len(), 2);
1056
1057        let exp = reg.matching_pvs("EXP:BL1:*").unwrap();
1058        assert_eq!(exp.len(), 2);
1059
1060        let motor = reg.matching_pvs("EXP:*:motor:*").unwrap();
1061        assert_eq!(motor.len(), 1);
1062    }
1063
1064    #[test]
1065    fn test_count_and_list() {
1066        let reg = PvRegistry::in_memory().unwrap();
1067        for i in 0..100 {
1068            reg.register_pv(
1069                &format!("PV:Test:{i:04}"),
1070                ArchDbType::ScalarDouble,
1071                &SampleMode::Monitor,
1072                1,
1073            )
1074            .unwrap();
1075        }
1076
1077        assert_eq!(reg.count(None).unwrap(), 100);
1078        assert_eq!(reg.count(Some(PvStatus::Active)).unwrap(), 100);
1079
1080        let names = reg.all_pv_names().unwrap();
1081        assert_eq!(names.len(), 100);
1082    }
1083
1084    #[test]
1085    fn test_remove_pv() {
1086        let reg = PvRegistry::in_memory().unwrap();
1087        reg.register_pv(
1088            "SIM:Gone",
1089            ArchDbType::ScalarDouble,
1090            &SampleMode::Monitor,
1091            1,
1092        )
1093        .unwrap();
1094        assert!(reg.get_pv("SIM:Gone").unwrap().is_some());
1095
1096        reg.remove_pv("SIM:Gone").unwrap();
1097        assert!(reg.get_pv("SIM:Gone").unwrap().is_none());
1098    }
1099
1100    #[test]
1101    fn test_batch_update_timestamps() {
1102        let reg = PvRegistry::in_memory().unwrap();
1103        reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1104            .unwrap();
1105        reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1106            .unwrap();
1107
1108        let now = SystemTime::now();
1109        reg.batch_update_timestamps(&[("PV:A", now), ("PV:B", now)])
1110            .unwrap();
1111
1112        let a = reg.get_pv("PV:A").unwrap().unwrap();
1113        assert!(a.last_timestamp.is_some());
1114    }
1115
1116    #[test]
1117    fn test_recently_added_pvs() {
1118        let reg = PvRegistry::in_memory().unwrap();
1119        let before = SystemTime::now() - Duration::from_secs(1);
1120        reg.register_pv("PV:New", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1121            .unwrap();
1122
1123        let recent = reg.recently_added_pvs(before).unwrap();
1124        assert_eq!(recent.len(), 1);
1125        assert_eq!(recent[0].pv_name, "PV:New");
1126
1127        let future = SystemTime::now() + Duration::from_secs(3600);
1128        let none = reg.recently_added_pvs(future).unwrap();
1129        assert!(none.is_empty());
1130    }
1131
1132    #[test]
1133    fn test_recently_modified_pvs() {
1134        let reg = PvRegistry::in_memory().unwrap();
1135        reg.register_pv("PV:Mod", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1136            .unwrap();
1137        let before = SystemTime::now() - Duration::from_secs(1);
1138
1139        // Modify status to update updated_at.
1140        reg.set_status("PV:Mod", PvStatus::Paused).unwrap();
1141
1142        let recent = reg.recently_modified_pvs(before).unwrap();
1143        assert!(recent.iter().any(|r| r.pv_name == "PV:Mod"));
1144    }
1145
1146    #[test]
1147    fn test_update_sample_mode() {
1148        let reg = PvRegistry::in_memory().unwrap();
1149        reg.register_pv("PV:Mode", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1150            .unwrap();
1151
1152        let new_mode = SampleMode::Scan { period_secs: 5.0 };
1153        assert!(reg.update_sample_mode("PV:Mode", &new_mode).unwrap());
1154
1155        let r = reg.get_pv("PV:Mode").unwrap().unwrap();
1156        assert_eq!(r.sample_mode, SampleMode::Scan { period_secs: 5.0 });
1157    }
1158
1159    #[test]
1160    fn test_archive_fields_roundtrip() {
1161        let reg = PvRegistry::in_memory().unwrap();
1162        reg.register_pv(
1163            "PV:Fields",
1164            ArchDbType::ScalarDouble,
1165            &SampleMode::Monitor,
1166            1,
1167        )
1168        .unwrap();
1169
1170        // Default is empty.
1171        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1172        assert!(r.archive_fields.is_empty());
1173
1174        // Set and read back.
1175        let fields = vec!["HIHI".to_string(), "LOLO".to_string(), "EGU".to_string()];
1176        assert!(reg.update_archive_fields("PV:Fields", &fields).unwrap());
1177        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1178        assert_eq!(r.archive_fields, fields);
1179
1180        // Clearing with [] removes the JSON entry.
1181        assert!(reg.update_archive_fields("PV:Fields", &[]).unwrap());
1182        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1183        assert!(r.archive_fields.is_empty());
1184    }
1185
1186    #[test]
1187    fn test_policy_name_roundtrip() {
1188        let reg = PvRegistry::in_memory().unwrap();
1189        reg.register_pv("PV:Pol", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1190            .unwrap();
1191
1192        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1193        assert!(r.policy_name.is_none());
1194
1195        assert!(reg.update_policy_name("PV:Pol", Some("fast")).unwrap());
1196        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1197        assert_eq!(r.policy_name.as_deref(), Some("fast"));
1198
1199        assert!(reg.update_policy_name("PV:Pol", None).unwrap());
1200        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1201        assert!(r.policy_name.is_none());
1202    }
1203
1204    #[test]
1205    fn test_import_pv_with_alias_and_fields() {
1206        let reg = PvRegistry::in_memory().unwrap();
1207        let fields = vec!["HIHI".to_string(), "LOLO".to_string()];
1208        reg.import_pv(
1209            "PV:Aliased",
1210            ArchDbType::ScalarDouble,
1211            &SampleMode::Monitor,
1212            1,
1213            PvStatus::Active,
1214            None,
1215            Some("3"),
1216            Some("mA"),
1217            Some("PV:Real"),
1218            &fields,
1219            Some("ring"),
1220        )
1221        .unwrap();
1222
1223        let r = reg.get_pv("PV:Aliased").unwrap().unwrap();
1224        assert_eq!(r.alias_for.as_deref(), Some("PV:Real"));
1225        assert_eq!(r.archive_fields, fields);
1226        assert_eq!(r.policy_name.as_deref(), Some("ring"));
1227        assert_eq!(r.prec.as_deref(), Some("3"));
1228        assert_eq!(r.egu.as_deref(), Some("mA"));
1229    }
1230
1231    #[test]
1232    fn test_migration_from_old_schema() {
1233        // Build a connection with the v0.1.4 schema (no alias/archive_fields/policy)
1234        // to verify ALTER TABLE migrations succeed and old rows decode cleanly.
1235        let conn = Connection::open_in_memory().unwrap();
1236        conn.execute_batch(
1237            "CREATE TABLE pv_info (
1238                pv_name        TEXT PRIMARY KEY NOT NULL,
1239                dbr_type       INTEGER NOT NULL,
1240                sample_mode    TEXT NOT NULL DEFAULT 'monitor',
1241                sample_period  REAL NOT NULL DEFAULT 0.0,
1242                status         TEXT NOT NULL DEFAULT 'active',
1243                element_count  INTEGER NOT NULL DEFAULT 1,
1244                last_timestamp TEXT,
1245                created_at     TEXT NOT NULL,
1246                updated_at     TEXT NOT NULL,
1247                prec           TEXT,
1248                egu            TEXT
1249            );",
1250        )
1251        .unwrap();
1252        let now = Utc::now().to_rfc3339();
1253        conn.execute(
1254            "INSERT INTO pv_info
1255             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
1256              created_at, updated_at, prec, egu)
1257             VALUES (?1, ?2, 'monitor', 0.0, 'active', 1, ?3, ?3, NULL, NULL)",
1258            params!["PV:Legacy", ArchDbType::ScalarDouble as i32, now],
1259        )
1260        .unwrap();
1261
1262        let reg = PvRegistry {
1263            conn: Mutex::new(conn),
1264        };
1265        // Re-running init_schema should add the new columns without dropping data.
1266        reg.init_schema().unwrap();
1267
1268        let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1269        assert_eq!(r.pv_name, "PV:Legacy");
1270        assert!(r.alias_for.is_none());
1271        assert!(r.archive_fields.is_empty());
1272        assert!(r.policy_name.is_none());
1273
1274        // New writes work too.
1275        assert!(
1276            reg.update_archive_fields("PV:Legacy", &["HIHI".to_string()])
1277                .unwrap()
1278        );
1279        let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1280        assert_eq!(r.archive_fields, vec!["HIHI".to_string()]);
1281    }
1282
1283    #[test]
1284    fn test_aliases_basic() {
1285        let reg = PvRegistry::in_memory().unwrap();
1286        reg.register_pv(
1287            "RING:Current",
1288            ArchDbType::ScalarDouble,
1289            &SampleMode::Monitor,
1290            1,
1291        )
1292        .unwrap();
1293
1294        // Add an alias and verify resolution.
1295        reg.add_alias("DEV:Current", "RING:Current").unwrap();
1296        assert_eq!(
1297            reg.resolve_alias("DEV:Current").unwrap().as_deref(),
1298            Some("RING:Current"),
1299        );
1300        assert!(reg.resolve_alias("RING:Current").unwrap().is_none()); // real PV
1301        assert!(reg.resolve_alias("Nonexistent").unwrap().is_none());
1302
1303        assert_eq!(reg.canonical_name("DEV:Current").unwrap(), "RING:Current");
1304        assert_eq!(reg.canonical_name("RING:Current").unwrap(), "RING:Current");
1305
1306        // Aliases for / all aliases.
1307        assert_eq!(
1308            reg.aliases_for("RING:Current").unwrap(),
1309            vec!["DEV:Current".to_string()],
1310        );
1311        assert_eq!(
1312            reg.all_aliases().unwrap(),
1313            vec![("DEV:Current".to_string(), "RING:Current".to_string())],
1314        );
1315
1316        // Expanded names contains both real and alias.
1317        let expanded = reg.expanded_pv_names().unwrap();
1318        assert!(expanded.contains(&"RING:Current".to_string()));
1319        assert!(expanded.contains(&"DEV:Current".to_string()));
1320
1321        // Idempotent re-add.
1322        reg.add_alias("DEV:Current", "RING:Current").unwrap();
1323        assert_eq!(reg.aliases_for("RING:Current").unwrap().len(), 1);
1324
1325        // Remove alias.
1326        assert!(reg.remove_alias("DEV:Current").unwrap());
1327        assert!(reg.resolve_alias("DEV:Current").unwrap().is_none());
1328        assert!(!reg.remove_alias("DEV:Current").unwrap()); // already gone
1329    }
1330
1331    #[test]
1332    fn test_alias_conflicts() {
1333        let reg = PvRegistry::in_memory().unwrap();
1334        reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1335            .unwrap();
1336        reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1337            .unwrap();
1338
1339        // Cannot alias to nonexistent target.
1340        assert!(reg.add_alias("Alias:X", "Nonexistent").is_err());
1341
1342        // Cannot self-alias.
1343        assert!(reg.add_alias("PV:A", "PV:A").is_err());
1344
1345        // Alias name conflicts with existing real PV.
1346        assert!(reg.add_alias("PV:B", "PV:A").is_err());
1347
1348        // Alias of alias not allowed.
1349        reg.add_alias("Alias:A", "PV:A").unwrap();
1350        assert!(reg.add_alias("Alias:Two", "Alias:A").is_err());
1351
1352        // remove_alias does NOT delete real PVs.
1353        assert!(!reg.remove_alias("PV:A").unwrap());
1354        assert!(reg.get_pv("PV:A").unwrap().is_some());
1355    }
1356
1357    #[test]
1358    fn test_silent_pvs() {
1359        let reg = PvRegistry::in_memory().unwrap();
1360        reg.register_pv(
1361            "PV:Silent",
1362            ArchDbType::ScalarDouble,
1363            &SampleMode::Monitor,
1364            1,
1365        )
1366        .unwrap();
1367        reg.register_pv(
1368            "PV:NoData",
1369            ArchDbType::ScalarDouble,
1370            &SampleMode::Monitor,
1371            1,
1372        )
1373        .unwrap();
1374
1375        // Set PV:Silent's last_timestamp to 2 hours ago.
1376        let old_time = SystemTime::now() - Duration::from_secs(7200);
1377        reg.update_last_timestamp("PV:Silent", old_time).unwrap();
1378
1379        // PV:NoData has no last_timestamp — should not appear.
1380        let silent = reg.silent_pvs(Duration::from_secs(3600)).unwrap();
1381        assert_eq!(silent.len(), 1);
1382        assert_eq!(silent[0].pv_name, "PV:Silent");
1383    }
1384}