Skip to main content

archiver_core/
registry.rs

1//! PV metadata registry backed by SQLite.
2//!
3//! Persists PV archiving configuration and state across restarts.
4//! This is metadata only — time-series data lives in PlainPB files.
5
6use std::path::Path;
7use std::sync::Mutex;
8use std::time::{Duration, SystemTime};
9
10use chrono::{DateTime, Utc};
11use rusqlite::{Connection, OptionalExtension, params};
12use tracing::info;
13
14use crate::types::ArchDbType;
15
16/// Wire protocol the engine should use when subscribing to a PV.
17///
18/// Selected from a `pva://` / `ca://` prefix on the user-supplied name.
19/// Unprefixed names default to [`Protocol::Ca`] for Java EAA parity.
20#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
21pub enum Protocol {
22    /// Channel Access (the original EPICS protocol).
23    #[default]
24    Ca,
25    /// pvAccess (structured types, larger payloads).
26    Pva,
27}
28
29impl Protocol {
30    pub fn as_str(self) -> &'static str {
31        match self {
32            Protocol::Ca => "ca",
33            Protocol::Pva => "pva",
34        }
35    }
36
37    pub fn parse(s: &str) -> Option<Self> {
38        match s {
39            "ca" => Some(Protocol::Ca),
40            "pva" => Some(Protocol::Pva),
41            _ => None,
42        }
43    }
44}
45
46/// Canonicalize a user-supplied PV name to the form the registry stores.
47///
48/// Java archiver's `PVNames.normalizeChannelName` + `stripPrefixFromName`:
49/// - `pva://X` and `ca://X` are protocol hints, not part of the channel name
50/// - `X.VAL` and `X` refer to the same EPICS record (`.VAL` is the default field)
51///
52/// Without this, `archivePV?pv=PV1.VAL` and `archivePV?pv=PV1` would create
53/// two separate registry rows that subscribe to the same IOC channel.
54pub fn normalize_pv_name(name: &str) -> &str {
55    let name = name
56        .strip_prefix("pva://")
57        .or_else(|| name.strip_prefix("ca://"))
58        .unwrap_or(name);
59    name.strip_suffix(".VAL").unwrap_or(name)
60}
61
62/// Like [`normalize_pv_name`] but also reports the protocol the prefix
63/// asked for. Returns `(Protocol::Ca, name)` when no prefix is present.
64pub fn parse_pv_with_protocol(name: &str) -> (Protocol, &str) {
65    if let Some(rest) = name.strip_prefix("pva://") {
66        (Protocol::Pva, rest.strip_suffix(".VAL").unwrap_or(rest))
67    } else if let Some(rest) = name.strip_prefix("ca://") {
68        (Protocol::Ca, rest.strip_suffix(".VAL").unwrap_or(rest))
69    } else {
70        (Protocol::Ca, name.strip_suffix(".VAL").unwrap_or(name))
71    }
72}
73
74/// Strip a trailing `.<FIELD>` suffix (e.g. `.HIHI`, `.LOLO`, `.DESC`) from
75/// a PV name, returning the bare PV name. Java's
76/// `PVNames.stripFieldNameFromPVName` (c150faad/5b2a7cb4): a query for
77/// `BASE.HIHI` should fall back to typeinfo for `BASE` so handlers can
78/// surface the field-archived metadata. Returns `None` if there is no
79/// field suffix or the suffix contains characters that aren't valid in
80/// EPICS record-field names (uppercase letters / digits / underscores).
81pub fn strip_field_suffix(name: &str) -> Option<&str> {
82    let (base, field) = name.rsplit_once('.')?;
83    if base.is_empty() || field.is_empty() {
84        return None;
85    }
86    if !field
87        .chars()
88        .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_')
89    {
90        return None;
91    }
92    Some(base)
93}
94
95/// Reject PV names that, when mapped to a filesystem path, could escape
96/// the storage root or do other naughty things. Java archiver enforces a
97/// `[A-Za-z0-9_:.\-+\[\]<>;]` allowlist via `PVNames.isValidPVName`; we
98/// instead use a conservative blocklist (no `..`, no `/`, no NUL, no
99/// leading `.`, no leading-`-`, no whitespace) — strict enough to keep
100/// `pv_name_to_key` from generating an absolute or traversal path, but
101/// permissive enough not to break existing site naming conventions
102/// like `SIM:Sine` and `IOC[A]:val<x>`.
103///
104/// Called by every registry entry-point (register / import / add_alias)
105/// AND by `pv_name_to_key` as defense in depth, so any code path that
106/// builds a filesystem name from a PV string fails closed if the
107/// HTTP-layer validation is bypassed.
108pub fn is_valid_pv_name(name: &str) -> bool {
109    if name.is_empty() || name.len() > 256 {
110        return false;
111    }
112    // Leading separator → after pv_name_to_key the result begins with
113    // `/`, and `Path::join(root, "/abs/path")` ignores `root` entirely
114    // (Rust semantics) → escapes the storage root. Same for the `.` /
115    // `-` cases.
116    if name.starts_with('.')
117        || name.starts_with('-')
118        || name.starts_with('/')
119        || name.starts_with(':')
120    {
121        return false;
122    }
123    for component in name.split([':', '/']) {
124        // `..` / `.` are obvious traversal. An empty segment (`A::B`,
125        // `A//B`, trailing `:`/`/`) maps to a `//` in the filesystem
126        // path which most OSes collapse but some path-relative tools
127        // re-split, so just reject.
128        if component.is_empty() || component == ".." || component == "." {
129            return false;
130        }
131    }
132    !name.chars().any(|c| {
133        c == '\0'
134            || c == '\\'
135            || c.is_whitespace()
136            || c.is_control()
137            // Reject backslash and other path-shell metacharacters that
138            // could surprise downstream tools that re-parse the name.
139            || matches!(c, '|' | '&' | ';' | '`' | '$' | '"' | '\'' | '*' | '?')
140    })
141}
142
143/// PV archiving status.
144///
145/// `Alias` is a sentinel applied to alias rows so they don't appear in
146/// status-filtered queries (`pvs_by_status(Active)`, getPVCount, restore
147/// loops). Aliases are routing entries, not archive subjects.
148#[derive(Debug, Clone, Copy, PartialEq, Eq)]
149pub enum PvStatus {
150    Active,
151    Paused,
152    Error,
153    Inactive,
154    Alias,
155}
156
157impl PvStatus {
158    pub fn as_str(self) -> &'static str {
159        match self {
160            Self::Active => "active",
161            Self::Paused => "paused",
162            Self::Error => "error",
163            Self::Inactive => "inactive",
164            Self::Alias => "alias",
165        }
166    }
167}
168
169impl std::str::FromStr for PvStatus {
170    type Err = std::convert::Infallible;
171
172    fn from_str(s: &str) -> Result<Self, Self::Err> {
173        Ok(match s {
174            "active" => Self::Active,
175            "paused" => Self::Paused,
176            "error" => Self::Error,
177            "inactive" => Self::Inactive,
178            "alias" => Self::Alias,
179            _ => Self::Active,
180        })
181    }
182}
183
184/// Sampling mode stored in the registry.
185#[derive(Debug, Clone, PartialEq)]
186pub enum SampleMode {
187    Monitor,
188    Scan { period_secs: f64 },
189}
190
191impl SampleMode {
192    fn to_db(&self) -> (&str, f64) {
193        match self {
194            Self::Monitor => ("monitor", 0.0),
195            Self::Scan { period_secs } => ("scan", *period_secs),
196        }
197    }
198
199    fn from_db(mode: &str, period: f64) -> Self {
200        match mode {
201            "scan" => Self::Scan {
202                period_secs: period,
203            },
204            _ => Self::Monitor,
205        }
206    }
207}
208
209/// A PV record in the registry.
210#[derive(Debug, Clone)]
211pub struct PvRecord {
212    pub pv_name: String,
213    pub dbr_type: ArchDbType,
214    pub sample_mode: SampleMode,
215    pub status: PvStatus,
216    pub element_count: i32,
217    pub last_timestamp: Option<SystemTime>,
218    pub created_at: DateTime<Utc>,
219    pub updated_at: DateTime<Utc>,
220    pub prec: Option<String>,
221    pub egu: Option<String>,
222    /// When set, this row is an alias pointing at another PV name.
223    /// Lookups should resolve to that target before reading/writing data.
224    pub alias_for: Option<String>,
225    /// Names of EPICS metadata fields (e.g. ["HIHI","LOLO","EGU"]) that the
226    /// engine should sample alongside the main value and attach to events.
227    pub archive_fields: Vec<String>,
228    /// Name of the policy that selected this PV's sampling configuration.
229    pub policy_name: Option<String>,
230    /// Wire protocol the engine subscribes with — selected from the
231    /// `pva://` / `ca://` prefix at archive_pv time. Defaults to
232    /// [`Protocol::Ca`] for legacy rows.
233    pub protocol: Protocol,
234}
235
236/// SQLite-backed PV metadata registry.
237pub struct PvRegistry {
238    conn: Mutex<Connection>,
239}
240
241impl PvRegistry {
242    fn lock_conn(&self) -> anyhow::Result<std::sync::MutexGuard<'_, Connection>> {
243        self.conn
244            .lock()
245            .map_err(|e| anyhow::anyhow!("PV registry lock poisoned: {e}"))
246    }
247
248    /// Open (or create) the registry database at the given path.
249    pub fn open(path: &Path) -> anyhow::Result<Self> {
250        let conn = Connection::open(path)?;
251        let registry = Self {
252            conn: Mutex::new(conn),
253        };
254        registry.init_schema()?;
255        Ok(registry)
256    }
257
258    /// Create an in-memory registry (for testing).
259    pub fn in_memory() -> anyhow::Result<Self> {
260        let conn = Connection::open_in_memory()?;
261        let registry = Self {
262            conn: Mutex::new(conn),
263        };
264        registry.init_schema()?;
265        Ok(registry)
266    }
267
268    fn init_schema(&self) -> anyhow::Result<()> {
269        let conn = self.lock_conn()?;
270        // Step 1: create the table (idempotent) and indexes that don't depend
271        // on columns added by later migrations.
272        conn.execute_batch(
273            "
274            CREATE TABLE IF NOT EXISTS pv_info (
275                pv_name         TEXT PRIMARY KEY NOT NULL,
276                dbr_type        INTEGER NOT NULL,
277                sample_mode     TEXT NOT NULL DEFAULT 'monitor',
278                sample_period   REAL NOT NULL DEFAULT 0.0,
279                status          TEXT NOT NULL DEFAULT 'active',
280                element_count   INTEGER NOT NULL DEFAULT 1,
281                last_timestamp  TEXT,
282                created_at      TEXT NOT NULL,
283                updated_at      TEXT NOT NULL,
284                prec            TEXT,
285                egu             TEXT,
286                alias_for       TEXT,
287                archive_fields  TEXT,
288                policy_name     TEXT,
289                protocol        TEXT NOT NULL DEFAULT 'ca'
290            );
291
292            CREATE INDEX IF NOT EXISTS idx_pv_status ON pv_info(status);
293            CREATE INDEX IF NOT EXISTS idx_pv_prefix ON pv_info(pv_name COLLATE NOCASE);
294            ",
295        )?;
296        // Step 2: migrations for tables created with older schemas. Each statement
297        // runs independently because SQLite stops on the first duplicate-column
298        // error when batched. Only "duplicate column" errors are silently
299        // accepted; disk-full / lock / corruption errors must surface.
300        for stmt in [
301            "ALTER TABLE pv_info ADD COLUMN prec TEXT",
302            "ALTER TABLE pv_info ADD COLUMN egu TEXT",
303            "ALTER TABLE pv_info ADD COLUMN alias_for TEXT",
304            "ALTER TABLE pv_info ADD COLUMN archive_fields TEXT",
305            "ALTER TABLE pv_info ADD COLUMN policy_name TEXT",
306            "ALTER TABLE pv_info ADD COLUMN protocol TEXT NOT NULL DEFAULT 'ca'",
307        ] {
308            match conn.execute(stmt, []) {
309                Ok(_) => {}
310                Err(e) if is_duplicate_column_error(&e) => {}
311                Err(e) => return Err(e.into()),
312            }
313        }
314        // Step 3: indexes that reference newly-added columns. Done after ALTER
315        // so an upgraded database has the columns to index.
316        conn.execute_batch(
317            "CREATE INDEX IF NOT EXISTS idx_pv_alias \
318             ON pv_info(alias_for) WHERE alias_for IS NOT NULL;",
319        )?;
320        info!("PV registry schema initialized");
321        Ok(())
322    }
323
324    /// Register a new PV for archiving with default CA protocol.
325    /// Backwards-compatible wrapper around [`register_pv_with_protocol`].
326    pub fn register_pv(
327        &self,
328        pv_name: &str,
329        dbr_type: ArchDbType,
330        sample_mode: &SampleMode,
331        element_count: i32,
332    ) -> anyhow::Result<()> {
333        self.register_pv_with_protocol(
334            pv_name,
335            dbr_type,
336            sample_mode,
337            element_count,
338            Protocol::Ca,
339        )
340    }
341
342    /// Register a new PV for archiving, recording which wire protocol the
343    /// engine should use (CA vs PVA).
344    pub fn register_pv_with_protocol(
345        &self,
346        pv_name: &str,
347        dbr_type: ArchDbType,
348        sample_mode: &SampleMode,
349        element_count: i32,
350        protocol: Protocol,
351    ) -> anyhow::Result<()> {
352        if !is_valid_pv_name(pv_name) {
353            anyhow::bail!("invalid PV name: {pv_name:?}");
354        }
355        let conn = self.lock_conn()?;
356        let now = Utc::now().to_rfc3339();
357        let (mode_str, period) = sample_mode.to_db();
358
359        conn.execute(
360            "INSERT OR REPLACE INTO pv_info
361             (pv_name, dbr_type, sample_mode, sample_period, status, element_count, created_at, updated_at, protocol)
362             VALUES (?1, ?2, ?3, ?4, 'active', ?5, COALESCE((SELECT created_at FROM pv_info WHERE pv_name = ?1), ?6), ?6, ?7)",
363            params![pv_name, dbr_type as i32, mode_str, period, element_count, now, protocol.as_str()],
364        )?;
365        Ok(())
366    }
367
368    /// Update PV status (active, paused, error).
369    pub fn set_status(&self, pv_name: &str, status: PvStatus) -> anyhow::Result<bool> {
370        let conn = self.lock_conn()?;
371        let now = Utc::now().to_rfc3339();
372        let rows = conn.execute(
373            "UPDATE pv_info SET status = ?1, updated_at = ?2 WHERE pv_name = ?3",
374            params![status.as_str(), now, pv_name],
375        )?;
376        Ok(rows > 0)
377    }
378
379    /// Update the last known timestamp for a PV.
380    pub fn update_last_timestamp(
381        &self,
382        pv_name: &str,
383        timestamp: SystemTime,
384    ) -> anyhow::Result<()> {
385        let conn = self.lock_conn()?;
386        let dt = DateTime::<Utc>::from(timestamp).to_rfc3339();
387        let now = Utc::now().to_rfc3339();
388        conn.execute(
389            "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
390            params![dt, now, pv_name],
391        )?;
392        Ok(())
393    }
394
395    /// Remove a PV from the registry entirely.
396    pub fn remove_pv(&self, pv_name: &str) -> anyhow::Result<bool> {
397        let conn = self.lock_conn()?;
398        let rows = conn.execute("DELETE FROM pv_info WHERE pv_name = ?1", params![pv_name])?;
399        Ok(rows > 0)
400    }
401
402    /// Get a single PV record.
403    pub fn get_pv(&self, pv_name: &str) -> anyhow::Result<Option<PvRecord>> {
404        let conn = self.lock_conn()?;
405        conn.query_row(
406            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
407                    last_timestamp, created_at, updated_at, prec, egu,
408                    alias_for, archive_fields, policy_name, protocol
409             FROM pv_info WHERE pv_name = ?1",
410            params![pv_name],
411            row_to_record,
412        )
413        .optional()
414        .map_err(Into::into)
415    }
416
417    /// List all real PV names (alias rows excluded). Use
418    /// [`Self::expanded_pv_names`] to include aliases.
419    pub fn all_pv_names(&self) -> anyhow::Result<Vec<String>> {
420        let conn = self.lock_conn()?;
421        let mut stmt =
422            conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for IS NULL ORDER BY pv_name")?;
423        let names = stmt
424            .query_map([], |row| row.get(0))?
425            .collect::<Result<Vec<String>, _>>()?;
426        Ok(names)
427    }
428
429    /// List all real PVs with a given status. Alias rows have
430    /// `status='alias'` and are excluded from every other-status query.
431    pub fn pvs_by_status(&self, status: PvStatus) -> anyhow::Result<Vec<PvRecord>> {
432        let conn = self.lock_conn()?;
433        let mut stmt = conn.prepare(
434            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
435                    last_timestamp, created_at, updated_at, prec, egu,
436                    alias_for, archive_fields, policy_name, protocol
437             FROM pv_info WHERE status = ?1 ORDER BY pv_name",
438        )?;
439        let records = stmt
440            .query_map(params![status.as_str()], row_to_record)?
441            .collect::<Result<Vec<_>, _>>()?;
442        Ok(records)
443    }
444
445    /// Match real PV names by glob pattern (SQL GLOB). Excludes aliases.
446    pub fn matching_pvs(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
447        let conn = self.lock_conn()?;
448        let mut stmt = conn.prepare(
449            "SELECT pv_name FROM pv_info
450             WHERE pv_name GLOB ?1 AND alias_for IS NULL
451             ORDER BY pv_name",
452        )?;
453        let names = stmt
454            .query_map(params![pattern], |row| row.get(0))?
455            .collect::<Result<Vec<String>, _>>()?;
456        Ok(names)
457    }
458
459    /// Match PV names by glob pattern, INCLUDING alias rows. Java's
460    /// `getMatchingPVs` returns aliases too (c61f1579) — without this an
461    /// admin walking the inventory by glob silently misses every alias.
462    /// Internal callers that only want real PVs (engine, reports) stay
463    /// on `matching_pvs`.
464    pub fn matching_pvs_expanded(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
465        let conn = self.lock_conn()?;
466        let mut stmt =
467            conn.prepare("SELECT pv_name FROM pv_info WHERE pv_name GLOB ?1 ORDER BY pv_name")?;
468        let names = stmt
469            .query_map(params![pattern], |row| row.get(0))?
470            .collect::<Result<Vec<String>, _>>()?;
471        Ok(names)
472    }
473
474    /// Count real PVs, optionally filtered by status. Excludes aliases.
475    pub fn count(&self, status: Option<PvStatus>) -> anyhow::Result<u64> {
476        let conn = self.lock_conn()?;
477        let count: u64 = match status {
478            Some(s) => conn.query_row(
479                "SELECT COUNT(*) FROM pv_info
480                 WHERE status = ?1 AND alias_for IS NULL",
481                params![s.as_str()],
482                |row| row.get(0),
483            )?,
484            None => conn.query_row(
485                "SELECT COUNT(*) FROM pv_info WHERE alias_for IS NULL",
486                [],
487                |row| row.get(0),
488            )?,
489        };
490        Ok(count)
491    }
492
493    /// Batch update last timestamps (for periodic flush).
494    pub fn batch_update_timestamps(&self, updates: &[(&str, SystemTime)]) -> anyhow::Result<()> {
495        let mut conn = self.lock_conn()?;
496        let tx = conn.transaction()?;
497        let now = Utc::now().to_rfc3339();
498        {
499            let mut stmt = tx.prepare(
500                "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
501            )?;
502            for (pv_name, ts) in updates {
503                let dt = DateTime::<Utc>::from(*ts).to_rfc3339();
504                stmt.execute(params![dt, now, pv_name])?;
505            }
506        }
507        tx.commit()?;
508        Ok(())
509    }
510
511    /// Get real PVs added since a given time (aliases excluded).
512    pub fn recently_added_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
513        let conn = self.lock_conn()?;
514        let since_str = DateTime::<Utc>::from(since).to_rfc3339();
515        let mut stmt = conn.prepare(
516            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
517                    last_timestamp, created_at, updated_at, prec, egu,
518                    alias_for, archive_fields, policy_name, protocol
519             FROM pv_info WHERE created_at >= ?1 AND alias_for IS NULL
520             ORDER BY created_at DESC",
521        )?;
522        let records = stmt
523            .query_map(params![since_str], row_to_record)?
524            .collect::<Result<Vec<_>, _>>()?;
525        Ok(records)
526    }
527
528    /// Get real PVs modified since a given time (aliases excluded).
529    pub fn recently_modified_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
530        let conn = self.lock_conn()?;
531        let since_str = DateTime::<Utc>::from(since).to_rfc3339();
532        let mut stmt = conn.prepare(
533            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
534                    last_timestamp, created_at, updated_at, prec, egu,
535                    alias_for, archive_fields, policy_name, protocol
536             FROM pv_info WHERE updated_at >= ?1 AND alias_for IS NULL
537             ORDER BY updated_at DESC",
538        )?;
539        let records = stmt
540            .query_map(params![since_str], row_to_record)?
541            .collect::<Result<Vec<_>, _>>()?;
542        Ok(records)
543    }
544
545    /// Update the sample mode and period for a PV.
546    pub fn update_sample_mode(&self, pv_name: &str, mode: &SampleMode) -> anyhow::Result<bool> {
547        let conn = self.lock_conn()?;
548        let now = Utc::now().to_rfc3339();
549        let (mode_str, period) = mode.to_db();
550        let rows = conn.execute(
551            "UPDATE pv_info SET sample_mode = ?1, sample_period = ?2, updated_at = ?3 WHERE pv_name = ?4",
552            params![mode_str, period, now, pv_name],
553        )?;
554        Ok(rows > 0)
555    }
556
557    /// Update PREC and EGU metadata for a PV.
558    pub fn update_metadata(
559        &self,
560        pv_name: &str,
561        prec: Option<&str>,
562        egu: Option<&str>,
563    ) -> anyhow::Result<bool> {
564        let conn = self.lock_conn()?;
565        let now = Utc::now().to_rfc3339();
566        let rows = conn.execute(
567            "UPDATE pv_info SET prec = COALESCE(?1, prec), egu = COALESCE(?2, egu), updated_at = ?3 WHERE pv_name = ?4",
568            params![prec, egu, now, pv_name],
569        )?;
570        Ok(rows > 0)
571    }
572
573    /// Import a PV with default CA protocol — backwards-compatible
574    /// wrapper around [`import_pv_with_protocol`].
575    #[allow(clippy::too_many_arguments)]
576    pub fn import_pv(
577        &self,
578        pv_name: &str,
579        dbr_type: ArchDbType,
580        sample_mode: &SampleMode,
581        element_count: i32,
582        status: PvStatus,
583        created_at: Option<&str>,
584        prec: Option<&str>,
585        egu: Option<&str>,
586        alias_for: Option<&str>,
587        archive_fields: &[String],
588        policy_name: Option<&str>,
589    ) -> anyhow::Result<()> {
590        self.import_pv_with_protocol(
591            pv_name,
592            dbr_type,
593            sample_mode,
594            element_count,
595            status,
596            created_at,
597            prec,
598            egu,
599            alias_for,
600            archive_fields,
601            policy_name,
602            Protocol::Ca,
603        )
604    }
605
606    /// Import a PV with all fields in a single SQL operation.
607    /// Used during config import to atomically set status, created_at, and metadata.
608    #[allow(clippy::too_many_arguments)]
609    pub fn import_pv_with_protocol(
610        &self,
611        pv_name: &str,
612        dbr_type: ArchDbType,
613        sample_mode: &SampleMode,
614        element_count: i32,
615        status: PvStatus,
616        created_at: Option<&str>,
617        prec: Option<&str>,
618        egu: Option<&str>,
619        alias_for: Option<&str>,
620        archive_fields: &[String],
621        policy_name: Option<&str>,
622        protocol: Protocol,
623    ) -> anyhow::Result<()> {
624        if !is_valid_pv_name(pv_name) {
625            anyhow::bail!("invalid PV name: {pv_name:?}");
626        }
627        if let Some(target) = alias_for
628            && !is_valid_pv_name(target)
629        {
630            anyhow::bail!("invalid alias target: {target:?}");
631        }
632        let conn = self.lock_conn()?;
633        let now = Utc::now().to_rfc3339();
634        let (mode_str, period) = sample_mode.to_db();
635        let created = created_at.unwrap_or(&now);
636        let archive_fields_json = if archive_fields.is_empty() {
637            None
638        } else {
639            Some(serde_json::to_string(archive_fields)?)
640        };
641
642        conn.execute(
643            "INSERT OR REPLACE INTO pv_info
644             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
645              created_at, updated_at, prec, egu, alias_for, archive_fields, policy_name, protocol)
646             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
647            params![
648                pv_name,
649                dbr_type as i32,
650                mode_str,
651                period,
652                status.as_str(),
653                element_count,
654                created,
655                now,
656                prec,
657                egu,
658                alias_for,
659                archive_fields_json,
660                policy_name,
661                protocol.as_str(),
662            ],
663        )?;
664        Ok(())
665    }
666
667    /// Set or clear the archive_fields list for a PV.
668    pub fn update_archive_fields(&self, pv_name: &str, fields: &[String]) -> anyhow::Result<bool> {
669        let conn = self.lock_conn()?;
670        let now = Utc::now().to_rfc3339();
671        let json = if fields.is_empty() {
672            None
673        } else {
674            Some(serde_json::to_string(fields)?)
675        };
676        let rows = conn.execute(
677            "UPDATE pv_info SET archive_fields = ?1, updated_at = ?2 WHERE pv_name = ?3",
678            params![json, now, pv_name],
679        )?;
680        Ok(rows > 0)
681    }
682
683    /// Set or clear the policy_name on a PV.
684    pub fn update_policy_name(
685        &self,
686        pv_name: &str,
687        policy_name: Option<&str>,
688    ) -> anyhow::Result<bool> {
689        let conn = self.lock_conn()?;
690        let now = Utc::now().to_rfc3339();
691        let rows = conn.execute(
692            "UPDATE pv_info SET policy_name = ?1, updated_at = ?2 WHERE pv_name = ?3",
693            params![policy_name, now, pv_name],
694        )?;
695        Ok(rows > 0)
696    }
697
698    /// Add an alias `alias` that points at the existing PV `target`.
699    /// Fails if target does not exist or if `alias` already maps elsewhere.
700    /// The alias row mirrors target's dbr_type/sample_mode/element_count for
701    /// display, but `alias_for` distinguishes it from real PVs.
702    pub fn add_alias(&self, alias: &str, target: &str) -> anyhow::Result<()> {
703        if alias == target {
704            anyhow::bail!("alias and target must differ");
705        }
706        if !is_valid_pv_name(alias) {
707            anyhow::bail!("invalid alias name: {alias:?}");
708        }
709        if !is_valid_pv_name(target) {
710            anyhow::bail!("invalid alias target: {target:?}");
711        }
712        let conn = self.lock_conn()?;
713        // Resolve target (must be a real PV, not itself an alias).
714        let row: Option<(i32, String, f64, i32, Option<String>)> = conn
715            .query_row(
716                "SELECT dbr_type, sample_mode, sample_period, element_count, alias_for
717                 FROM pv_info WHERE pv_name = ?1",
718                params![target],
719                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
720            )
721            .optional()?;
722        let (dbr_type, mode, period, ec, target_alias) =
723            row.ok_or_else(|| anyhow::anyhow!("target PV '{target}' not found"))?;
724        if target_alias.is_some() {
725            anyhow::bail!(
726                "target PV '{target}' is itself an alias; aliases of aliases are not allowed"
727            );
728        }
729        // Reject if `alias` already exists either as a real PV or different alias.
730        let existing: Option<Option<String>> = conn
731            .query_row(
732                "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
733                params![alias],
734                |r| r.get(0),
735            )
736            .optional()?;
737        if let Some(existing_alias) = existing {
738            if existing_alias.as_deref() == Some(target) {
739                return Ok(()); // idempotent
740            }
741            anyhow::bail!("'{alias}' already exists in registry");
742        }
743        let now = Utc::now().to_rfc3339();
744        conn.execute(
745            "INSERT INTO pv_info
746             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
747              created_at, updated_at, alias_for)
748             VALUES (?1, ?2, ?3, ?4, 'alias', ?5, ?6, ?6, ?7)",
749            params![alias, dbr_type, mode, period, ec, now, target],
750        )?;
751        Ok(())
752    }
753
754    /// Remove an alias row. Returns true if removed, false if the row was not
755    /// an alias or did not exist. Real PVs are not removed by this method.
756    pub fn remove_alias(&self, alias: &str) -> anyhow::Result<bool> {
757        let conn = self.lock_conn()?;
758        let rows = conn.execute(
759            "DELETE FROM pv_info WHERE pv_name = ?1 AND alias_for IS NOT NULL",
760            params![alias],
761        )?;
762        Ok(rows > 0)
763    }
764
765    /// If `name` is an alias, return its target. Returns None if `name` is
766    /// already a real PV or does not exist.
767    pub fn resolve_alias(&self, name: &str) -> anyhow::Result<Option<String>> {
768        let conn = self.lock_conn()?;
769        let row: Option<Option<String>> = conn
770            .query_row(
771                "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
772                params![name],
773                |r| r.get(0),
774            )
775            .optional()?;
776        Ok(row.flatten())
777    }
778
779    /// Return the canonical PV name. If `name` is an alias, returns the target;
780    /// otherwise returns the input unchanged. Used by lookup/retrieval paths.
781    pub fn canonical_name(&self, name: &str) -> anyhow::Result<String> {
782        Ok(self
783            .resolve_alias(name)?
784            .unwrap_or_else(|| name.to_string()))
785    }
786
787    /// List all alias names pointing at a given target PV.
788    pub fn aliases_for(&self, target: &str) -> anyhow::Result<Vec<String>> {
789        let conn = self.lock_conn()?;
790        let mut stmt =
791            conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for = ?1 ORDER BY pv_name")?;
792        let names = stmt
793            .query_map(params![target], |row| row.get(0))?
794            .collect::<Result<Vec<String>, _>>()?;
795        Ok(names)
796    }
797
798    /// All `(alias, target)` pairs in the registry.
799    pub fn all_aliases(&self) -> anyhow::Result<Vec<(String, String)>> {
800        let conn = self.lock_conn()?;
801        let mut stmt = conn.prepare(
802            "SELECT pv_name, alias_for FROM pv_info
803             WHERE alias_for IS NOT NULL ORDER BY pv_name",
804        )?;
805        let rows = stmt
806            .query_map([], |row| {
807                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
808            })?
809            .collect::<Result<Vec<_>, _>>()?;
810        Ok(rows)
811    }
812
813    /// All PV names including aliases (`getAllExpandedPVNames`).
814    pub fn expanded_pv_names(&self) -> anyhow::Result<Vec<String>> {
815        let conn = self.lock_conn()?;
816        let mut stmt = conn.prepare("SELECT pv_name FROM pv_info ORDER BY pv_name")?;
817        let names = stmt
818            .query_map([], |row| row.get(0))?
819            .collect::<Result<Vec<String>, _>>()?;
820        Ok(names)
821    }
822
823    /// Get all PV records (for export).
824    pub fn all_records(&self) -> anyhow::Result<Vec<PvRecord>> {
825        let conn = self.lock_conn()?;
826        let mut stmt = conn.prepare(
827            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
828                    last_timestamp, created_at, updated_at, prec, egu,
829                    alias_for, archive_fields, policy_name, protocol
830             FROM pv_info ORDER BY pv_name",
831        )?;
832        let records = stmt
833            .query_map([], row_to_record)?
834            .collect::<Result<Vec<_>, _>>()?;
835        Ok(records)
836    }
837
838    /// Get real PVs that have not received events for longer than the
839    /// threshold duration. Only returns PVs that have a last_timestamp.
840    /// Aliases are excluded — they don't carry event timestamps.
841    pub fn silent_pvs(&self, threshold: Duration) -> anyhow::Result<Vec<PvRecord>> {
842        let conn = self.lock_conn()?;
843        let cutoff = SystemTime::now()
844            .checked_sub(threshold)
845            .unwrap_or(SystemTime::UNIX_EPOCH);
846        let cutoff_str = DateTime::<Utc>::from(cutoff).to_rfc3339();
847        let mut stmt = conn.prepare(
848            "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
849                    last_timestamp, created_at, updated_at, prec, egu,
850                    alias_for, archive_fields, policy_name, protocol
851             FROM pv_info WHERE last_timestamp IS NOT NULL AND last_timestamp < ?1
852                            AND alias_for IS NULL
853             ORDER BY last_timestamp ASC",
854        )?;
855        let records = stmt
856            .query_map(params![cutoff_str], row_to_record)?
857            .collect::<Result<Vec<_>, _>>()?;
858        Ok(records)
859    }
860}
861
862fn row_to_record(row: &rusqlite::Row) -> rusqlite::Result<PvRecord> {
863    let pv_name: String = row.get(0)?;
864    let dbr_type_i: i32 = row.get(1)?;
865    let sample_mode_str: String = row.get(2)?;
866    let sample_period: f64 = row.get(3)?;
867    let status_str: String = row.get(4)?;
868    let element_count: i32 = row.get(5)?;
869    let last_ts_str: Option<String> = row.get(6)?;
870    let created_str: String = row.get(7)?;
871    let updated_str: String = row.get(8)?;
872    let prec: Option<String> = row.get(9).unwrap_or(None);
873    let egu: Option<String> = row.get(10).unwrap_or(None);
874    let alias_for: Option<String> = row.get(11).unwrap_or(None);
875    let archive_fields_json: Option<String> = row.get(12).unwrap_or(None);
876    let policy_name: Option<String> = row.get(13).unwrap_or(None);
877    // Column 14 ("protocol") may be absent on a row written by an
878    // older archiver — `unwrap_or(None)` falls back to default Ca.
879    let protocol_str: Option<String> = row.get(14).unwrap_or(None);
880    let protocol = protocol_str
881        .as_deref()
882        .and_then(Protocol::parse)
883        .unwrap_or_default();
884
885    let last_timestamp = last_ts_str.and_then(|s| {
886        DateTime::parse_from_rfc3339(&s)
887            .ok()
888            .map(|dt| dt.with_timezone(&Utc).into())
889    });
890
891    let archive_fields = archive_fields_json
892        .as_deref()
893        .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
894        .unwrap_or_default();
895
896    Ok(PvRecord {
897        pv_name,
898        dbr_type: ArchDbType::from_i32(dbr_type_i).unwrap_or(ArchDbType::ScalarDouble),
899        sample_mode: SampleMode::from_db(&sample_mode_str, sample_period),
900        status: status_str.parse().unwrap_or(PvStatus::Active),
901        element_count,
902        last_timestamp,
903        created_at: DateTime::parse_from_rfc3339(&created_str)
904            .map(|dt| dt.with_timezone(&Utc))
905            .unwrap_or_else(|_| Utc::now()),
906        updated_at: DateTime::parse_from_rfc3339(&updated_str)
907            .map(|dt| dt.with_timezone(&Utc))
908            .unwrap_or_else(|_| Utc::now()),
909        prec,
910        egu,
911        alias_for,
912        archive_fields,
913        policy_name,
914        protocol,
915    })
916}
917
918/// SQLite returns generic `Error` for ALTER TABLE failures. Match on the
919/// message because rusqlite doesn't expose a structured "duplicate column"
920/// extended-code; the SQLite error string is `duplicate column name: <name>`.
921fn is_duplicate_column_error(e: &rusqlite::Error) -> bool {
922    matches!(
923        e,
924        rusqlite::Error::SqliteFailure(_, Some(msg))
925            if msg.starts_with("duplicate column name")
926    )
927}
928
929#[cfg(test)]
930mod tests {
931    use super::*;
932
933    #[test]
934    fn invalid_pv_names_rejected() {
935        // path traversal
936        assert!(!is_valid_pv_name("../etc/passwd"));
937        assert!(!is_valid_pv_name("foo/../bar"));
938        assert!(!is_valid_pv_name("foo:..:bar"));
939        assert!(!is_valid_pv_name("foo/./bar"));
940        // absolute paths via leading separator (round-10: leading ':'
941        // becomes '/' after pv_name_to_key and escapes the storage root)
942        assert!(!is_valid_pv_name("/etc/passwd"));
943        assert!(!is_valid_pv_name(":SIM:foo"));
944        assert!(!is_valid_pv_name(":foo"));
945        // empty segments (collapse oddly when split on / or :)
946        assert!(!is_valid_pv_name("foo::bar"));
947        assert!(!is_valid_pv_name("foo//bar"));
948        assert!(!is_valid_pv_name("foo:"));
949        assert!(!is_valid_pv_name("foo/"));
950        // shell metacharacters
951        assert!(!is_valid_pv_name("foo;rm -rf /"));
952        assert!(!is_valid_pv_name("foo|bar"));
953        assert!(!is_valid_pv_name("foo`x`"));
954        assert!(!is_valid_pv_name("foo$BAR"));
955        assert!(!is_valid_pv_name("foo bar"));
956        assert!(!is_valid_pv_name("foo\nbar"));
957        // edge inputs
958        assert!(!is_valid_pv_name(""));
959        assert!(!is_valid_pv_name(".hidden"));
960        assert!(!is_valid_pv_name("-leading-dash"));
961        assert!(!is_valid_pv_name(&"x".repeat(257)));
962    }
963
964    #[test]
965    fn valid_pv_names_accepted() {
966        // typical EPICS site naming conventions
967        assert!(is_valid_pv_name("SIM:Sine"));
968        assert!(is_valid_pv_name("XF:31IDA-OP{Tbl-Ax:X1}Mtr"));
969        assert!(is_valid_pv_name("ACC1-001-RFCAV-01:V<x>"));
970        assert!(is_valid_pv_name("PV.HIHI"));
971        assert!(is_valid_pv_name("BL_X+Y"));
972        assert!(is_valid_pv_name("a"));
973        assert!(is_valid_pv_name(&"x".repeat(256)));
974    }
975
976    #[test]
977    fn strip_field_suffix_basics() {
978        assert_eq!(strip_field_suffix("BASE.HIHI"), Some("BASE"));
979        assert_eq!(strip_field_suffix("BASE.LOLO"), Some("BASE"));
980        assert_eq!(strip_field_suffix("FOO.BAR_99"), Some("FOO"));
981        // no suffix
982        assert_eq!(strip_field_suffix("BASE"), None);
983        // lowercase / mixed-case rejected (not a standard EPICS field)
984        assert_eq!(strip_field_suffix("BASE.hihi"), None);
985        assert_eq!(strip_field_suffix("BASE.Hihi"), None);
986        // empty parts
987        assert_eq!(strip_field_suffix(".HIHI"), None);
988        assert_eq!(strip_field_suffix("BASE."), None);
989    }
990
991    #[test]
992    fn test_register_and_get() {
993        let reg = PvRegistry::in_memory().unwrap();
994        reg.register_pv(
995            "SIM:Sine",
996            ArchDbType::ScalarDouble,
997            &SampleMode::Monitor,
998            1,
999        )
1000        .unwrap();
1001
1002        let record = reg.get_pv("SIM:Sine").unwrap().unwrap();
1003        assert_eq!(record.pv_name, "SIM:Sine");
1004        assert_eq!(record.dbr_type, ArchDbType::ScalarDouble);
1005        assert_eq!(record.status, PvStatus::Active);
1006    }
1007
1008    #[test]
1009    fn test_status_transitions() {
1010        let reg = PvRegistry::in_memory().unwrap();
1011        reg.register_pv(
1012            "SIM:Test",
1013            ArchDbType::ScalarDouble,
1014            &SampleMode::Monitor,
1015            1,
1016        )
1017        .unwrap();
1018
1019        reg.set_status("SIM:Test", PvStatus::Paused).unwrap();
1020        let r = reg.get_pv("SIM:Test").unwrap().unwrap();
1021        assert_eq!(r.status, PvStatus::Paused);
1022
1023        reg.set_status("SIM:Test", PvStatus::Active).unwrap();
1024        let r = reg.get_pv("SIM:Test").unwrap().unwrap();
1025        assert_eq!(r.status, PvStatus::Active);
1026    }
1027
1028    #[test]
1029    fn test_pattern_matching() {
1030        let reg = PvRegistry::in_memory().unwrap();
1031        reg.register_pv(
1032            "SIM:Sine",
1033            ArchDbType::ScalarDouble,
1034            &SampleMode::Monitor,
1035            1,
1036        )
1037        .unwrap();
1038        reg.register_pv(
1039            "SIM:Cosine",
1040            ArchDbType::ScalarDouble,
1041            &SampleMode::Monitor,
1042            1,
1043        )
1044        .unwrap();
1045        reg.register_pv(
1046            "EXP:BL1:run:active",
1047            ArchDbType::ScalarEnum,
1048            &SampleMode::Monitor,
1049            1,
1050        )
1051        .unwrap();
1052        reg.register_pv(
1053            "EXP:BL1:motor:th:readback",
1054            ArchDbType::ScalarDouble,
1055            &SampleMode::Monitor,
1056            1,
1057        )
1058        .unwrap();
1059
1060        let sim = reg.matching_pvs("SIM:*").unwrap();
1061        assert_eq!(sim.len(), 2);
1062
1063        let exp = reg.matching_pvs("EXP:BL1:*").unwrap();
1064        assert_eq!(exp.len(), 2);
1065
1066        let motor = reg.matching_pvs("EXP:*:motor:*").unwrap();
1067        assert_eq!(motor.len(), 1);
1068    }
1069
1070    #[test]
1071    fn test_count_and_list() {
1072        let reg = PvRegistry::in_memory().unwrap();
1073        for i in 0..100 {
1074            reg.register_pv(
1075                &format!("PV:Test:{i:04}"),
1076                ArchDbType::ScalarDouble,
1077                &SampleMode::Monitor,
1078                1,
1079            )
1080            .unwrap();
1081        }
1082
1083        assert_eq!(reg.count(None).unwrap(), 100);
1084        assert_eq!(reg.count(Some(PvStatus::Active)).unwrap(), 100);
1085
1086        let names = reg.all_pv_names().unwrap();
1087        assert_eq!(names.len(), 100);
1088    }
1089
1090    #[test]
1091    fn test_remove_pv() {
1092        let reg = PvRegistry::in_memory().unwrap();
1093        reg.register_pv(
1094            "SIM:Gone",
1095            ArchDbType::ScalarDouble,
1096            &SampleMode::Monitor,
1097            1,
1098        )
1099        .unwrap();
1100        assert!(reg.get_pv("SIM:Gone").unwrap().is_some());
1101
1102        reg.remove_pv("SIM:Gone").unwrap();
1103        assert!(reg.get_pv("SIM:Gone").unwrap().is_none());
1104    }
1105
1106    #[test]
1107    fn test_batch_update_timestamps() {
1108        let reg = PvRegistry::in_memory().unwrap();
1109        reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1110            .unwrap();
1111        reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1112            .unwrap();
1113
1114        let now = SystemTime::now();
1115        reg.batch_update_timestamps(&[("PV:A", now), ("PV:B", now)])
1116            .unwrap();
1117
1118        let a = reg.get_pv("PV:A").unwrap().unwrap();
1119        assert!(a.last_timestamp.is_some());
1120    }
1121
1122    #[test]
1123    fn test_recently_added_pvs() {
1124        let reg = PvRegistry::in_memory().unwrap();
1125        let before = SystemTime::now() - Duration::from_secs(1);
1126        reg.register_pv("PV:New", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1127            .unwrap();
1128
1129        let recent = reg.recently_added_pvs(before).unwrap();
1130        assert_eq!(recent.len(), 1);
1131        assert_eq!(recent[0].pv_name, "PV:New");
1132
1133        let future = SystemTime::now() + Duration::from_secs(3600);
1134        let none = reg.recently_added_pvs(future).unwrap();
1135        assert!(none.is_empty());
1136    }
1137
1138    #[test]
1139    fn test_recently_modified_pvs() {
1140        let reg = PvRegistry::in_memory().unwrap();
1141        reg.register_pv("PV:Mod", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1142            .unwrap();
1143        let before = SystemTime::now() - Duration::from_secs(1);
1144
1145        // Modify status to update updated_at.
1146        reg.set_status("PV:Mod", PvStatus::Paused).unwrap();
1147
1148        let recent = reg.recently_modified_pvs(before).unwrap();
1149        assert!(recent.iter().any(|r| r.pv_name == "PV:Mod"));
1150    }
1151
1152    #[test]
1153    fn test_update_sample_mode() {
1154        let reg = PvRegistry::in_memory().unwrap();
1155        reg.register_pv("PV:Mode", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1156            .unwrap();
1157
1158        let new_mode = SampleMode::Scan { period_secs: 5.0 };
1159        assert!(reg.update_sample_mode("PV:Mode", &new_mode).unwrap());
1160
1161        let r = reg.get_pv("PV:Mode").unwrap().unwrap();
1162        assert_eq!(r.sample_mode, SampleMode::Scan { period_secs: 5.0 });
1163    }
1164
1165    #[test]
1166    fn test_archive_fields_roundtrip() {
1167        let reg = PvRegistry::in_memory().unwrap();
1168        reg.register_pv(
1169            "PV:Fields",
1170            ArchDbType::ScalarDouble,
1171            &SampleMode::Monitor,
1172            1,
1173        )
1174        .unwrap();
1175
1176        // Default is empty.
1177        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1178        assert!(r.archive_fields.is_empty());
1179
1180        // Set and read back.
1181        let fields = vec!["HIHI".to_string(), "LOLO".to_string(), "EGU".to_string()];
1182        assert!(reg.update_archive_fields("PV:Fields", &fields).unwrap());
1183        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1184        assert_eq!(r.archive_fields, fields);
1185
1186        // Clearing with [] removes the JSON entry.
1187        assert!(reg.update_archive_fields("PV:Fields", &[]).unwrap());
1188        let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1189        assert!(r.archive_fields.is_empty());
1190    }
1191
1192    #[test]
1193    fn test_policy_name_roundtrip() {
1194        let reg = PvRegistry::in_memory().unwrap();
1195        reg.register_pv("PV:Pol", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1196            .unwrap();
1197
1198        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1199        assert!(r.policy_name.is_none());
1200
1201        assert!(reg.update_policy_name("PV:Pol", Some("fast")).unwrap());
1202        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1203        assert_eq!(r.policy_name.as_deref(), Some("fast"));
1204
1205        assert!(reg.update_policy_name("PV:Pol", None).unwrap());
1206        let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1207        assert!(r.policy_name.is_none());
1208    }
1209
1210    #[test]
1211    fn test_import_pv_with_alias_and_fields() {
1212        let reg = PvRegistry::in_memory().unwrap();
1213        let fields = vec!["HIHI".to_string(), "LOLO".to_string()];
1214        reg.import_pv(
1215            "PV:Aliased",
1216            ArchDbType::ScalarDouble,
1217            &SampleMode::Monitor,
1218            1,
1219            PvStatus::Active,
1220            None,
1221            Some("3"),
1222            Some("mA"),
1223            Some("PV:Real"),
1224            &fields,
1225            Some("ring"),
1226        )
1227        .unwrap();
1228
1229        let r = reg.get_pv("PV:Aliased").unwrap().unwrap();
1230        assert_eq!(r.alias_for.as_deref(), Some("PV:Real"));
1231        assert_eq!(r.archive_fields, fields);
1232        assert_eq!(r.policy_name.as_deref(), Some("ring"));
1233        assert_eq!(r.prec.as_deref(), Some("3"));
1234        assert_eq!(r.egu.as_deref(), Some("mA"));
1235    }
1236
1237    #[test]
1238    fn test_migration_from_old_schema() {
1239        // Build a connection with the v0.1.4 schema (no alias/archive_fields/policy)
1240        // to verify ALTER TABLE migrations succeed and old rows decode cleanly.
1241        let conn = Connection::open_in_memory().unwrap();
1242        conn.execute_batch(
1243            "CREATE TABLE pv_info (
1244                pv_name        TEXT PRIMARY KEY NOT NULL,
1245                dbr_type       INTEGER NOT NULL,
1246                sample_mode    TEXT NOT NULL DEFAULT 'monitor',
1247                sample_period  REAL NOT NULL DEFAULT 0.0,
1248                status         TEXT NOT NULL DEFAULT 'active',
1249                element_count  INTEGER NOT NULL DEFAULT 1,
1250                last_timestamp TEXT,
1251                created_at     TEXT NOT NULL,
1252                updated_at     TEXT NOT NULL,
1253                prec           TEXT,
1254                egu            TEXT
1255            );",
1256        )
1257        .unwrap();
1258        let now = Utc::now().to_rfc3339();
1259        conn.execute(
1260            "INSERT INTO pv_info
1261             (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
1262              created_at, updated_at, prec, egu)
1263             VALUES (?1, ?2, 'monitor', 0.0, 'active', 1, ?3, ?3, NULL, NULL)",
1264            params!["PV:Legacy", ArchDbType::ScalarDouble as i32, now],
1265        )
1266        .unwrap();
1267
1268        let reg = PvRegistry {
1269            conn: Mutex::new(conn),
1270        };
1271        // Re-running init_schema should add the new columns without dropping data.
1272        reg.init_schema().unwrap();
1273
1274        let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1275        assert_eq!(r.pv_name, "PV:Legacy");
1276        assert!(r.alias_for.is_none());
1277        assert!(r.archive_fields.is_empty());
1278        assert!(r.policy_name.is_none());
1279
1280        // New writes work too.
1281        assert!(
1282            reg.update_archive_fields("PV:Legacy", &["HIHI".to_string()])
1283                .unwrap()
1284        );
1285        let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1286        assert_eq!(r.archive_fields, vec!["HIHI".to_string()]);
1287    }
1288
1289    #[test]
1290    fn test_aliases_basic() {
1291        let reg = PvRegistry::in_memory().unwrap();
1292        reg.register_pv(
1293            "RING:Current",
1294            ArchDbType::ScalarDouble,
1295            &SampleMode::Monitor,
1296            1,
1297        )
1298        .unwrap();
1299
1300        // Add an alias and verify resolution.
1301        reg.add_alias("DEV:Current", "RING:Current").unwrap();
1302        assert_eq!(
1303            reg.resolve_alias("DEV:Current").unwrap().as_deref(),
1304            Some("RING:Current"),
1305        );
1306        assert!(reg.resolve_alias("RING:Current").unwrap().is_none()); // real PV
1307        assert!(reg.resolve_alias("Nonexistent").unwrap().is_none());
1308
1309        assert_eq!(reg.canonical_name("DEV:Current").unwrap(), "RING:Current");
1310        assert_eq!(reg.canonical_name("RING:Current").unwrap(), "RING:Current");
1311
1312        // Aliases for / all aliases.
1313        assert_eq!(
1314            reg.aliases_for("RING:Current").unwrap(),
1315            vec!["DEV:Current".to_string()],
1316        );
1317        assert_eq!(
1318            reg.all_aliases().unwrap(),
1319            vec![("DEV:Current".to_string(), "RING:Current".to_string())],
1320        );
1321
1322        // Expanded names contains both real and alias.
1323        let expanded = reg.expanded_pv_names().unwrap();
1324        assert!(expanded.contains(&"RING:Current".to_string()));
1325        assert!(expanded.contains(&"DEV:Current".to_string()));
1326
1327        // Idempotent re-add.
1328        reg.add_alias("DEV:Current", "RING:Current").unwrap();
1329        assert_eq!(reg.aliases_for("RING:Current").unwrap().len(), 1);
1330
1331        // Remove alias.
1332        assert!(reg.remove_alias("DEV:Current").unwrap());
1333        assert!(reg.resolve_alias("DEV:Current").unwrap().is_none());
1334        assert!(!reg.remove_alias("DEV:Current").unwrap()); // already gone
1335    }
1336
1337    #[test]
1338    fn test_alias_conflicts() {
1339        let reg = PvRegistry::in_memory().unwrap();
1340        reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1341            .unwrap();
1342        reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1343            .unwrap();
1344
1345        // Cannot alias to nonexistent target.
1346        assert!(reg.add_alias("Alias:X", "Nonexistent").is_err());
1347
1348        // Cannot self-alias.
1349        assert!(reg.add_alias("PV:A", "PV:A").is_err());
1350
1351        // Alias name conflicts with existing real PV.
1352        assert!(reg.add_alias("PV:B", "PV:A").is_err());
1353
1354        // Alias of alias not allowed.
1355        reg.add_alias("Alias:A", "PV:A").unwrap();
1356        assert!(reg.add_alias("Alias:Two", "Alias:A").is_err());
1357
1358        // remove_alias does NOT delete real PVs.
1359        assert!(!reg.remove_alias("PV:A").unwrap());
1360        assert!(reg.get_pv("PV:A").unwrap().is_some());
1361    }
1362
1363    #[test]
1364    fn test_silent_pvs() {
1365        let reg = PvRegistry::in_memory().unwrap();
1366        reg.register_pv(
1367            "PV:Silent",
1368            ArchDbType::ScalarDouble,
1369            &SampleMode::Monitor,
1370            1,
1371        )
1372        .unwrap();
1373        reg.register_pv(
1374            "PV:NoData",
1375            ArchDbType::ScalarDouble,
1376            &SampleMode::Monitor,
1377            1,
1378        )
1379        .unwrap();
1380
1381        // Set PV:Silent's last_timestamp to 2 hours ago.
1382        let old_time = SystemTime::now() - Duration::from_secs(7200);
1383        reg.update_last_timestamp("PV:Silent", old_time).unwrap();
1384
1385        // PV:NoData has no last_timestamp — should not appear.
1386        let silent = reg.silent_pvs(Duration::from_secs(3600)).unwrap();
1387        assert_eq!(silent.len(), 1);
1388        assert_eq!(silent[0].pv_name, "PV:Silent");
1389    }
1390}