1use std::path::Path;
7use std::sync::Mutex;
8use std::time::{Duration, SystemTime};
9
10use chrono::{DateTime, Utc};
11use rusqlite::{Connection, OptionalExtension, params};
12use tracing::info;
13
14use crate::types::ArchDbType;
15
16pub fn normalize_pv_name(name: &str) -> &str {
25 let name = name
26 .strip_prefix("pva://")
27 .or_else(|| name.strip_prefix("ca://"))
28 .unwrap_or(name);
29 name.strip_suffix(".VAL").unwrap_or(name)
30}
31
32pub fn strip_field_suffix(name: &str) -> Option<&str> {
40 let (base, field) = name.rsplit_once('.')?;
41 if base.is_empty() || field.is_empty() {
42 return None;
43 }
44 if !field
45 .chars()
46 .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_')
47 {
48 return None;
49 }
50 Some(base)
51}
52
53pub fn is_valid_pv_name(name: &str) -> bool {
67 if name.is_empty() || name.len() > 256 {
68 return false;
69 }
70 if name.starts_with('.')
75 || name.starts_with('-')
76 || name.starts_with('/')
77 || name.starts_with(':')
78 {
79 return false;
80 }
81 for component in name.split([':', '/']) {
82 if component.is_empty() || component == ".." || component == "." {
87 return false;
88 }
89 }
90 !name.chars().any(|c| {
91 c == '\0'
92 || c == '\\'
93 || c.is_whitespace()
94 || c.is_control()
95 || matches!(c, '|' | '&' | ';' | '`' | '$' | '"' | '\'' | '*' | '?')
98 })
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum PvStatus {
108 Active,
109 Paused,
110 Error,
111 Inactive,
112 Alias,
113}
114
115impl PvStatus {
116 pub fn as_str(self) -> &'static str {
117 match self {
118 Self::Active => "active",
119 Self::Paused => "paused",
120 Self::Error => "error",
121 Self::Inactive => "inactive",
122 Self::Alias => "alias",
123 }
124 }
125}
126
127impl std::str::FromStr for PvStatus {
128 type Err = std::convert::Infallible;
129
130 fn from_str(s: &str) -> Result<Self, Self::Err> {
131 Ok(match s {
132 "active" => Self::Active,
133 "paused" => Self::Paused,
134 "error" => Self::Error,
135 "inactive" => Self::Inactive,
136 "alias" => Self::Alias,
137 _ => Self::Active,
138 })
139 }
140}
141
142#[derive(Debug, Clone, PartialEq)]
144pub enum SampleMode {
145 Monitor,
146 Scan { period_secs: f64 },
147}
148
149impl SampleMode {
150 fn to_db(&self) -> (&str, f64) {
151 match self {
152 Self::Monitor => ("monitor", 0.0),
153 Self::Scan { period_secs } => ("scan", *period_secs),
154 }
155 }
156
157 fn from_db(mode: &str, period: f64) -> Self {
158 match mode {
159 "scan" => Self::Scan {
160 period_secs: period,
161 },
162 _ => Self::Monitor,
163 }
164 }
165}
166
167#[derive(Debug, Clone)]
169pub struct PvRecord {
170 pub pv_name: String,
171 pub dbr_type: ArchDbType,
172 pub sample_mode: SampleMode,
173 pub status: PvStatus,
174 pub element_count: i32,
175 pub last_timestamp: Option<SystemTime>,
176 pub created_at: DateTime<Utc>,
177 pub updated_at: DateTime<Utc>,
178 pub prec: Option<String>,
179 pub egu: Option<String>,
180 pub alias_for: Option<String>,
183 pub archive_fields: Vec<String>,
186 pub policy_name: Option<String>,
188}
189
190pub struct PvRegistry {
192 conn: Mutex<Connection>,
193}
194
195impl PvRegistry {
196 fn lock_conn(&self) -> anyhow::Result<std::sync::MutexGuard<'_, Connection>> {
197 self.conn
198 .lock()
199 .map_err(|e| anyhow::anyhow!("PV registry lock poisoned: {e}"))
200 }
201
202 pub fn open(path: &Path) -> anyhow::Result<Self> {
204 let conn = Connection::open(path)?;
205 let registry = Self {
206 conn: Mutex::new(conn),
207 };
208 registry.init_schema()?;
209 Ok(registry)
210 }
211
212 pub fn in_memory() -> anyhow::Result<Self> {
214 let conn = Connection::open_in_memory()?;
215 let registry = Self {
216 conn: Mutex::new(conn),
217 };
218 registry.init_schema()?;
219 Ok(registry)
220 }
221
222 fn init_schema(&self) -> anyhow::Result<()> {
223 let conn = self.lock_conn()?;
224 conn.execute_batch(
227 "
228 CREATE TABLE IF NOT EXISTS pv_info (
229 pv_name TEXT PRIMARY KEY NOT NULL,
230 dbr_type INTEGER NOT NULL,
231 sample_mode TEXT NOT NULL DEFAULT 'monitor',
232 sample_period REAL NOT NULL DEFAULT 0.0,
233 status TEXT NOT NULL DEFAULT 'active',
234 element_count INTEGER NOT NULL DEFAULT 1,
235 last_timestamp TEXT,
236 created_at TEXT NOT NULL,
237 updated_at TEXT NOT NULL,
238 prec TEXT,
239 egu TEXT,
240 alias_for TEXT,
241 archive_fields TEXT,
242 policy_name TEXT
243 );
244
245 CREATE INDEX IF NOT EXISTS idx_pv_status ON pv_info(status);
246 CREATE INDEX IF NOT EXISTS idx_pv_prefix ON pv_info(pv_name COLLATE NOCASE);
247 ",
248 )?;
249 for stmt in [
254 "ALTER TABLE pv_info ADD COLUMN prec TEXT",
255 "ALTER TABLE pv_info ADD COLUMN egu TEXT",
256 "ALTER TABLE pv_info ADD COLUMN alias_for TEXT",
257 "ALTER TABLE pv_info ADD COLUMN archive_fields TEXT",
258 "ALTER TABLE pv_info ADD COLUMN policy_name TEXT",
259 ] {
260 match conn.execute(stmt, []) {
261 Ok(_) => {}
262 Err(e) if is_duplicate_column_error(&e) => {}
263 Err(e) => return Err(e.into()),
264 }
265 }
266 conn.execute_batch(
269 "CREATE INDEX IF NOT EXISTS idx_pv_alias \
270 ON pv_info(alias_for) WHERE alias_for IS NOT NULL;",
271 )?;
272 info!("PV registry schema initialized");
273 Ok(())
274 }
275
276 pub fn register_pv(
278 &self,
279 pv_name: &str,
280 dbr_type: ArchDbType,
281 sample_mode: &SampleMode,
282 element_count: i32,
283 ) -> anyhow::Result<()> {
284 if !is_valid_pv_name(pv_name) {
285 anyhow::bail!("invalid PV name: {pv_name:?}");
286 }
287 let conn = self.lock_conn()?;
288 let now = Utc::now().to_rfc3339();
289 let (mode_str, period) = sample_mode.to_db();
290
291 conn.execute(
292 "INSERT OR REPLACE INTO pv_info
293 (pv_name, dbr_type, sample_mode, sample_period, status, element_count, created_at, updated_at)
294 VALUES (?1, ?2, ?3, ?4, 'active', ?5, COALESCE((SELECT created_at FROM pv_info WHERE pv_name = ?1), ?6), ?6)",
295 params![pv_name, dbr_type as i32, mode_str, period, element_count, now],
296 )?;
297 Ok(())
298 }
299
300 pub fn set_status(&self, pv_name: &str, status: PvStatus) -> anyhow::Result<bool> {
302 let conn = self.lock_conn()?;
303 let now = Utc::now().to_rfc3339();
304 let rows = conn.execute(
305 "UPDATE pv_info SET status = ?1, updated_at = ?2 WHERE pv_name = ?3",
306 params![status.as_str(), now, pv_name],
307 )?;
308 Ok(rows > 0)
309 }
310
311 pub fn update_last_timestamp(
313 &self,
314 pv_name: &str,
315 timestamp: SystemTime,
316 ) -> anyhow::Result<()> {
317 let conn = self.lock_conn()?;
318 let dt = DateTime::<Utc>::from(timestamp).to_rfc3339();
319 let now = Utc::now().to_rfc3339();
320 conn.execute(
321 "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
322 params![dt, now, pv_name],
323 )?;
324 Ok(())
325 }
326
327 pub fn remove_pv(&self, pv_name: &str) -> anyhow::Result<bool> {
329 let conn = self.lock_conn()?;
330 let rows = conn.execute("DELETE FROM pv_info WHERE pv_name = ?1", params![pv_name])?;
331 Ok(rows > 0)
332 }
333
334 pub fn get_pv(&self, pv_name: &str) -> anyhow::Result<Option<PvRecord>> {
336 let conn = self.lock_conn()?;
337 conn.query_row(
338 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
339 last_timestamp, created_at, updated_at, prec, egu,
340 alias_for, archive_fields, policy_name
341 FROM pv_info WHERE pv_name = ?1",
342 params![pv_name],
343 row_to_record,
344 )
345 .optional()
346 .map_err(Into::into)
347 }
348
349 pub fn all_pv_names(&self) -> anyhow::Result<Vec<String>> {
352 let conn = self.lock_conn()?;
353 let mut stmt =
354 conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for IS NULL ORDER BY pv_name")?;
355 let names = stmt
356 .query_map([], |row| row.get(0))?
357 .collect::<Result<Vec<String>, _>>()?;
358 Ok(names)
359 }
360
361 pub fn pvs_by_status(&self, status: PvStatus) -> anyhow::Result<Vec<PvRecord>> {
364 let conn = self.lock_conn()?;
365 let mut stmt = conn.prepare(
366 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
367 last_timestamp, created_at, updated_at, prec, egu,
368 alias_for, archive_fields, policy_name
369 FROM pv_info WHERE status = ?1 ORDER BY pv_name",
370 )?;
371 let records = stmt
372 .query_map(params![status.as_str()], row_to_record)?
373 .collect::<Result<Vec<_>, _>>()?;
374 Ok(records)
375 }
376
377 pub fn matching_pvs(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
379 let conn = self.lock_conn()?;
380 let mut stmt = conn.prepare(
381 "SELECT pv_name FROM pv_info
382 WHERE pv_name GLOB ?1 AND alias_for IS NULL
383 ORDER BY pv_name",
384 )?;
385 let names = stmt
386 .query_map(params![pattern], |row| row.get(0))?
387 .collect::<Result<Vec<String>, _>>()?;
388 Ok(names)
389 }
390
391 pub fn matching_pvs_expanded(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
397 let conn = self.lock_conn()?;
398 let mut stmt =
399 conn.prepare("SELECT pv_name FROM pv_info WHERE pv_name GLOB ?1 ORDER BY pv_name")?;
400 let names = stmt
401 .query_map(params![pattern], |row| row.get(0))?
402 .collect::<Result<Vec<String>, _>>()?;
403 Ok(names)
404 }
405
406 pub fn count(&self, status: Option<PvStatus>) -> anyhow::Result<u64> {
408 let conn = self.lock_conn()?;
409 let count: u64 = match status {
410 Some(s) => conn.query_row(
411 "SELECT COUNT(*) FROM pv_info
412 WHERE status = ?1 AND alias_for IS NULL",
413 params![s.as_str()],
414 |row| row.get(0),
415 )?,
416 None => conn.query_row(
417 "SELECT COUNT(*) FROM pv_info WHERE alias_for IS NULL",
418 [],
419 |row| row.get(0),
420 )?,
421 };
422 Ok(count)
423 }
424
425 pub fn batch_update_timestamps(&self, updates: &[(&str, SystemTime)]) -> anyhow::Result<()> {
427 let mut conn = self.lock_conn()?;
428 let tx = conn.transaction()?;
429 let now = Utc::now().to_rfc3339();
430 {
431 let mut stmt = tx.prepare(
432 "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
433 )?;
434 for (pv_name, ts) in updates {
435 let dt = DateTime::<Utc>::from(*ts).to_rfc3339();
436 stmt.execute(params![dt, now, pv_name])?;
437 }
438 }
439 tx.commit()?;
440 Ok(())
441 }
442
443 pub fn recently_added_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
445 let conn = self.lock_conn()?;
446 let since_str = DateTime::<Utc>::from(since).to_rfc3339();
447 let mut stmt = conn.prepare(
448 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
449 last_timestamp, created_at, updated_at, prec, egu,
450 alias_for, archive_fields, policy_name
451 FROM pv_info WHERE created_at >= ?1 AND alias_for IS NULL
452 ORDER BY created_at DESC",
453 )?;
454 let records = stmt
455 .query_map(params![since_str], row_to_record)?
456 .collect::<Result<Vec<_>, _>>()?;
457 Ok(records)
458 }
459
460 pub fn recently_modified_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
462 let conn = self.lock_conn()?;
463 let since_str = DateTime::<Utc>::from(since).to_rfc3339();
464 let mut stmt = conn.prepare(
465 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
466 last_timestamp, created_at, updated_at, prec, egu,
467 alias_for, archive_fields, policy_name
468 FROM pv_info WHERE updated_at >= ?1 AND alias_for IS NULL
469 ORDER BY updated_at DESC",
470 )?;
471 let records = stmt
472 .query_map(params![since_str], row_to_record)?
473 .collect::<Result<Vec<_>, _>>()?;
474 Ok(records)
475 }
476
477 pub fn update_sample_mode(&self, pv_name: &str, mode: &SampleMode) -> anyhow::Result<bool> {
479 let conn = self.lock_conn()?;
480 let now = Utc::now().to_rfc3339();
481 let (mode_str, period) = mode.to_db();
482 let rows = conn.execute(
483 "UPDATE pv_info SET sample_mode = ?1, sample_period = ?2, updated_at = ?3 WHERE pv_name = ?4",
484 params![mode_str, period, now, pv_name],
485 )?;
486 Ok(rows > 0)
487 }
488
489 pub fn update_metadata(
491 &self,
492 pv_name: &str,
493 prec: Option<&str>,
494 egu: Option<&str>,
495 ) -> anyhow::Result<bool> {
496 let conn = self.lock_conn()?;
497 let now = Utc::now().to_rfc3339();
498 let rows = conn.execute(
499 "UPDATE pv_info SET prec = COALESCE(?1, prec), egu = COALESCE(?2, egu), updated_at = ?3 WHERE pv_name = ?4",
500 params![prec, egu, now, pv_name],
501 )?;
502 Ok(rows > 0)
503 }
504
505 #[allow(clippy::too_many_arguments)]
508 pub fn import_pv(
509 &self,
510 pv_name: &str,
511 dbr_type: ArchDbType,
512 sample_mode: &SampleMode,
513 element_count: i32,
514 status: PvStatus,
515 created_at: Option<&str>,
516 prec: Option<&str>,
517 egu: Option<&str>,
518 alias_for: Option<&str>,
519 archive_fields: &[String],
520 policy_name: Option<&str>,
521 ) -> anyhow::Result<()> {
522 if !is_valid_pv_name(pv_name) {
523 anyhow::bail!("invalid PV name: {pv_name:?}");
524 }
525 if let Some(target) = alias_for
526 && !is_valid_pv_name(target)
527 {
528 anyhow::bail!("invalid alias target: {target:?}");
529 }
530 let conn = self.lock_conn()?;
531 let now = Utc::now().to_rfc3339();
532 let (mode_str, period) = sample_mode.to_db();
533 let created = created_at.unwrap_or(&now);
534 let archive_fields_json = if archive_fields.is_empty() {
535 None
536 } else {
537 Some(serde_json::to_string(archive_fields)?)
538 };
539
540 conn.execute(
541 "INSERT OR REPLACE INTO pv_info
542 (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
543 created_at, updated_at, prec, egu, alias_for, archive_fields, policy_name)
544 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
545 params![
546 pv_name,
547 dbr_type as i32,
548 mode_str,
549 period,
550 status.as_str(),
551 element_count,
552 created,
553 now,
554 prec,
555 egu,
556 alias_for,
557 archive_fields_json,
558 policy_name,
559 ],
560 )?;
561 Ok(())
562 }
563
564 pub fn update_archive_fields(&self, pv_name: &str, fields: &[String]) -> anyhow::Result<bool> {
566 let conn = self.lock_conn()?;
567 let now = Utc::now().to_rfc3339();
568 let json = if fields.is_empty() {
569 None
570 } else {
571 Some(serde_json::to_string(fields)?)
572 };
573 let rows = conn.execute(
574 "UPDATE pv_info SET archive_fields = ?1, updated_at = ?2 WHERE pv_name = ?3",
575 params![json, now, pv_name],
576 )?;
577 Ok(rows > 0)
578 }
579
580 pub fn update_policy_name(
582 &self,
583 pv_name: &str,
584 policy_name: Option<&str>,
585 ) -> anyhow::Result<bool> {
586 let conn = self.lock_conn()?;
587 let now = Utc::now().to_rfc3339();
588 let rows = conn.execute(
589 "UPDATE pv_info SET policy_name = ?1, updated_at = ?2 WHERE pv_name = ?3",
590 params![policy_name, now, pv_name],
591 )?;
592 Ok(rows > 0)
593 }
594
595 pub fn add_alias(&self, alias: &str, target: &str) -> anyhow::Result<()> {
600 if alias == target {
601 anyhow::bail!("alias and target must differ");
602 }
603 if !is_valid_pv_name(alias) {
604 anyhow::bail!("invalid alias name: {alias:?}");
605 }
606 if !is_valid_pv_name(target) {
607 anyhow::bail!("invalid alias target: {target:?}");
608 }
609 let conn = self.lock_conn()?;
610 let row: Option<(i32, String, f64, i32, Option<String>)> = conn
612 .query_row(
613 "SELECT dbr_type, sample_mode, sample_period, element_count, alias_for
614 FROM pv_info WHERE pv_name = ?1",
615 params![target],
616 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
617 )
618 .optional()?;
619 let (dbr_type, mode, period, ec, target_alias) =
620 row.ok_or_else(|| anyhow::anyhow!("target PV '{target}' not found"))?;
621 if target_alias.is_some() {
622 anyhow::bail!(
623 "target PV '{target}' is itself an alias; aliases of aliases are not allowed"
624 );
625 }
626 let existing: Option<Option<String>> = conn
628 .query_row(
629 "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
630 params![alias],
631 |r| r.get(0),
632 )
633 .optional()?;
634 if let Some(existing_alias) = existing {
635 if existing_alias.as_deref() == Some(target) {
636 return Ok(()); }
638 anyhow::bail!("'{alias}' already exists in registry");
639 }
640 let now = Utc::now().to_rfc3339();
641 conn.execute(
642 "INSERT INTO pv_info
643 (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
644 created_at, updated_at, alias_for)
645 VALUES (?1, ?2, ?3, ?4, 'alias', ?5, ?6, ?6, ?7)",
646 params![alias, dbr_type, mode, period, ec, now, target],
647 )?;
648 Ok(())
649 }
650
651 pub fn remove_alias(&self, alias: &str) -> anyhow::Result<bool> {
654 let conn = self.lock_conn()?;
655 let rows = conn.execute(
656 "DELETE FROM pv_info WHERE pv_name = ?1 AND alias_for IS NOT NULL",
657 params![alias],
658 )?;
659 Ok(rows > 0)
660 }
661
662 pub fn resolve_alias(&self, name: &str) -> anyhow::Result<Option<String>> {
665 let conn = self.lock_conn()?;
666 let row: Option<Option<String>> = conn
667 .query_row(
668 "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
669 params![name],
670 |r| r.get(0),
671 )
672 .optional()?;
673 Ok(row.flatten())
674 }
675
676 pub fn canonical_name(&self, name: &str) -> anyhow::Result<String> {
679 Ok(self
680 .resolve_alias(name)?
681 .unwrap_or_else(|| name.to_string()))
682 }
683
684 pub fn aliases_for(&self, target: &str) -> anyhow::Result<Vec<String>> {
686 let conn = self.lock_conn()?;
687 let mut stmt =
688 conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for = ?1 ORDER BY pv_name")?;
689 let names = stmt
690 .query_map(params![target], |row| row.get(0))?
691 .collect::<Result<Vec<String>, _>>()?;
692 Ok(names)
693 }
694
695 pub fn all_aliases(&self) -> anyhow::Result<Vec<(String, String)>> {
697 let conn = self.lock_conn()?;
698 let mut stmt = conn.prepare(
699 "SELECT pv_name, alias_for FROM pv_info
700 WHERE alias_for IS NOT NULL ORDER BY pv_name",
701 )?;
702 let rows = stmt
703 .query_map([], |row| {
704 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
705 })?
706 .collect::<Result<Vec<_>, _>>()?;
707 Ok(rows)
708 }
709
710 pub fn expanded_pv_names(&self) -> anyhow::Result<Vec<String>> {
712 let conn = self.lock_conn()?;
713 let mut stmt = conn.prepare("SELECT pv_name FROM pv_info ORDER BY pv_name")?;
714 let names = stmt
715 .query_map([], |row| row.get(0))?
716 .collect::<Result<Vec<String>, _>>()?;
717 Ok(names)
718 }
719
720 pub fn all_records(&self) -> anyhow::Result<Vec<PvRecord>> {
722 let conn = self.lock_conn()?;
723 let mut stmt = conn.prepare(
724 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
725 last_timestamp, created_at, updated_at, prec, egu,
726 alias_for, archive_fields, policy_name
727 FROM pv_info ORDER BY pv_name",
728 )?;
729 let records = stmt
730 .query_map([], row_to_record)?
731 .collect::<Result<Vec<_>, _>>()?;
732 Ok(records)
733 }
734
735 pub fn silent_pvs(&self, threshold: Duration) -> anyhow::Result<Vec<PvRecord>> {
739 let conn = self.lock_conn()?;
740 let cutoff = SystemTime::now()
741 .checked_sub(threshold)
742 .unwrap_or(SystemTime::UNIX_EPOCH);
743 let cutoff_str = DateTime::<Utc>::from(cutoff).to_rfc3339();
744 let mut stmt = conn.prepare(
745 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
746 last_timestamp, created_at, updated_at, prec, egu,
747 alias_for, archive_fields, policy_name
748 FROM pv_info WHERE last_timestamp IS NOT NULL AND last_timestamp < ?1
749 AND alias_for IS NULL
750 ORDER BY last_timestamp ASC",
751 )?;
752 let records = stmt
753 .query_map(params![cutoff_str], row_to_record)?
754 .collect::<Result<Vec<_>, _>>()?;
755 Ok(records)
756 }
757}
758
759fn row_to_record(row: &rusqlite::Row) -> rusqlite::Result<PvRecord> {
760 let pv_name: String = row.get(0)?;
761 let dbr_type_i: i32 = row.get(1)?;
762 let sample_mode_str: String = row.get(2)?;
763 let sample_period: f64 = row.get(3)?;
764 let status_str: String = row.get(4)?;
765 let element_count: i32 = row.get(5)?;
766 let last_ts_str: Option<String> = row.get(6)?;
767 let created_str: String = row.get(7)?;
768 let updated_str: String = row.get(8)?;
769 let prec: Option<String> = row.get(9).unwrap_or(None);
770 let egu: Option<String> = row.get(10).unwrap_or(None);
771 let alias_for: Option<String> = row.get(11).unwrap_or(None);
772 let archive_fields_json: Option<String> = row.get(12).unwrap_or(None);
773 let policy_name: Option<String> = row.get(13).unwrap_or(None);
774
775 let last_timestamp = last_ts_str.and_then(|s| {
776 DateTime::parse_from_rfc3339(&s)
777 .ok()
778 .map(|dt| dt.with_timezone(&Utc).into())
779 });
780
781 let archive_fields = archive_fields_json
782 .as_deref()
783 .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
784 .unwrap_or_default();
785
786 Ok(PvRecord {
787 pv_name,
788 dbr_type: ArchDbType::from_i32(dbr_type_i).unwrap_or(ArchDbType::ScalarDouble),
789 sample_mode: SampleMode::from_db(&sample_mode_str, sample_period),
790 status: status_str.parse().unwrap_or(PvStatus::Active),
791 element_count,
792 last_timestamp,
793 created_at: DateTime::parse_from_rfc3339(&created_str)
794 .map(|dt| dt.with_timezone(&Utc))
795 .unwrap_or_else(|_| Utc::now()),
796 updated_at: DateTime::parse_from_rfc3339(&updated_str)
797 .map(|dt| dt.with_timezone(&Utc))
798 .unwrap_or_else(|_| Utc::now()),
799 prec,
800 egu,
801 alias_for,
802 archive_fields,
803 policy_name,
804 })
805}
806
807fn is_duplicate_column_error(e: &rusqlite::Error) -> bool {
811 matches!(
812 e,
813 rusqlite::Error::SqliteFailure(_, Some(msg))
814 if msg.starts_with("duplicate column name")
815 )
816}
817
818#[cfg(test)]
819mod tests {
820 use super::*;
821
822 #[test]
823 fn invalid_pv_names_rejected() {
824 assert!(!is_valid_pv_name("../etc/passwd"));
826 assert!(!is_valid_pv_name("foo/../bar"));
827 assert!(!is_valid_pv_name("foo:..:bar"));
828 assert!(!is_valid_pv_name("foo/./bar"));
829 assert!(!is_valid_pv_name("/etc/passwd"));
832 assert!(!is_valid_pv_name(":SIM:foo"));
833 assert!(!is_valid_pv_name(":foo"));
834 assert!(!is_valid_pv_name("foo::bar"));
836 assert!(!is_valid_pv_name("foo//bar"));
837 assert!(!is_valid_pv_name("foo:"));
838 assert!(!is_valid_pv_name("foo/"));
839 assert!(!is_valid_pv_name("foo;rm -rf /"));
841 assert!(!is_valid_pv_name("foo|bar"));
842 assert!(!is_valid_pv_name("foo`x`"));
843 assert!(!is_valid_pv_name("foo$BAR"));
844 assert!(!is_valid_pv_name("foo bar"));
845 assert!(!is_valid_pv_name("foo\nbar"));
846 assert!(!is_valid_pv_name(""));
848 assert!(!is_valid_pv_name(".hidden"));
849 assert!(!is_valid_pv_name("-leading-dash"));
850 assert!(!is_valid_pv_name(&"x".repeat(257)));
851 }
852
853 #[test]
854 fn valid_pv_names_accepted() {
855 assert!(is_valid_pv_name("SIM:Sine"));
857 assert!(is_valid_pv_name("XF:31IDA-OP{Tbl-Ax:X1}Mtr"));
858 assert!(is_valid_pv_name("ACC1-001-RFCAV-01:V<x>"));
859 assert!(is_valid_pv_name("PV.HIHI"));
860 assert!(is_valid_pv_name("BL_X+Y"));
861 assert!(is_valid_pv_name("a"));
862 assert!(is_valid_pv_name(&"x".repeat(256)));
863 }
864
865 #[test]
866 fn strip_field_suffix_basics() {
867 assert_eq!(strip_field_suffix("BASE.HIHI"), Some("BASE"));
868 assert_eq!(strip_field_suffix("BASE.LOLO"), Some("BASE"));
869 assert_eq!(strip_field_suffix("FOO.BAR_99"), Some("FOO"));
870 assert_eq!(strip_field_suffix("BASE"), None);
872 assert_eq!(strip_field_suffix("BASE.hihi"), None);
874 assert_eq!(strip_field_suffix("BASE.Hihi"), None);
875 assert_eq!(strip_field_suffix(".HIHI"), None);
877 assert_eq!(strip_field_suffix("BASE."), None);
878 }
879
880 #[test]
881 fn test_register_and_get() {
882 let reg = PvRegistry::in_memory().unwrap();
883 reg.register_pv(
884 "SIM:Sine",
885 ArchDbType::ScalarDouble,
886 &SampleMode::Monitor,
887 1,
888 )
889 .unwrap();
890
891 let record = reg.get_pv("SIM:Sine").unwrap().unwrap();
892 assert_eq!(record.pv_name, "SIM:Sine");
893 assert_eq!(record.dbr_type, ArchDbType::ScalarDouble);
894 assert_eq!(record.status, PvStatus::Active);
895 }
896
897 #[test]
898 fn test_status_transitions() {
899 let reg = PvRegistry::in_memory().unwrap();
900 reg.register_pv(
901 "SIM:Test",
902 ArchDbType::ScalarDouble,
903 &SampleMode::Monitor,
904 1,
905 )
906 .unwrap();
907
908 reg.set_status("SIM:Test", PvStatus::Paused).unwrap();
909 let r = reg.get_pv("SIM:Test").unwrap().unwrap();
910 assert_eq!(r.status, PvStatus::Paused);
911
912 reg.set_status("SIM:Test", PvStatus::Active).unwrap();
913 let r = reg.get_pv("SIM:Test").unwrap().unwrap();
914 assert_eq!(r.status, PvStatus::Active);
915 }
916
917 #[test]
918 fn test_pattern_matching() {
919 let reg = PvRegistry::in_memory().unwrap();
920 reg.register_pv(
921 "SIM:Sine",
922 ArchDbType::ScalarDouble,
923 &SampleMode::Monitor,
924 1,
925 )
926 .unwrap();
927 reg.register_pv(
928 "SIM:Cosine",
929 ArchDbType::ScalarDouble,
930 &SampleMode::Monitor,
931 1,
932 )
933 .unwrap();
934 reg.register_pv(
935 "EXP:BL1:run:active",
936 ArchDbType::ScalarEnum,
937 &SampleMode::Monitor,
938 1,
939 )
940 .unwrap();
941 reg.register_pv(
942 "EXP:BL1:motor:th:readback",
943 ArchDbType::ScalarDouble,
944 &SampleMode::Monitor,
945 1,
946 )
947 .unwrap();
948
949 let sim = reg.matching_pvs("SIM:*").unwrap();
950 assert_eq!(sim.len(), 2);
951
952 let exp = reg.matching_pvs("EXP:BL1:*").unwrap();
953 assert_eq!(exp.len(), 2);
954
955 let motor = reg.matching_pvs("EXP:*:motor:*").unwrap();
956 assert_eq!(motor.len(), 1);
957 }
958
959 #[test]
960 fn test_count_and_list() {
961 let reg = PvRegistry::in_memory().unwrap();
962 for i in 0..100 {
963 reg.register_pv(
964 &format!("PV:Test:{i:04}"),
965 ArchDbType::ScalarDouble,
966 &SampleMode::Monitor,
967 1,
968 )
969 .unwrap();
970 }
971
972 assert_eq!(reg.count(None).unwrap(), 100);
973 assert_eq!(reg.count(Some(PvStatus::Active)).unwrap(), 100);
974
975 let names = reg.all_pv_names().unwrap();
976 assert_eq!(names.len(), 100);
977 }
978
979 #[test]
980 fn test_remove_pv() {
981 let reg = PvRegistry::in_memory().unwrap();
982 reg.register_pv(
983 "SIM:Gone",
984 ArchDbType::ScalarDouble,
985 &SampleMode::Monitor,
986 1,
987 )
988 .unwrap();
989 assert!(reg.get_pv("SIM:Gone").unwrap().is_some());
990
991 reg.remove_pv("SIM:Gone").unwrap();
992 assert!(reg.get_pv("SIM:Gone").unwrap().is_none());
993 }
994
995 #[test]
996 fn test_batch_update_timestamps() {
997 let reg = PvRegistry::in_memory().unwrap();
998 reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
999 .unwrap();
1000 reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1001 .unwrap();
1002
1003 let now = SystemTime::now();
1004 reg.batch_update_timestamps(&[("PV:A", now), ("PV:B", now)])
1005 .unwrap();
1006
1007 let a = reg.get_pv("PV:A").unwrap().unwrap();
1008 assert!(a.last_timestamp.is_some());
1009 }
1010
1011 #[test]
1012 fn test_recently_added_pvs() {
1013 let reg = PvRegistry::in_memory().unwrap();
1014 let before = SystemTime::now() - Duration::from_secs(1);
1015 reg.register_pv("PV:New", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1016 .unwrap();
1017
1018 let recent = reg.recently_added_pvs(before).unwrap();
1019 assert_eq!(recent.len(), 1);
1020 assert_eq!(recent[0].pv_name, "PV:New");
1021
1022 let future = SystemTime::now() + Duration::from_secs(3600);
1023 let none = reg.recently_added_pvs(future).unwrap();
1024 assert!(none.is_empty());
1025 }
1026
1027 #[test]
1028 fn test_recently_modified_pvs() {
1029 let reg = PvRegistry::in_memory().unwrap();
1030 reg.register_pv("PV:Mod", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1031 .unwrap();
1032 let before = SystemTime::now() - Duration::from_secs(1);
1033
1034 reg.set_status("PV:Mod", PvStatus::Paused).unwrap();
1036
1037 let recent = reg.recently_modified_pvs(before).unwrap();
1038 assert!(recent.iter().any(|r| r.pv_name == "PV:Mod"));
1039 }
1040
1041 #[test]
1042 fn test_update_sample_mode() {
1043 let reg = PvRegistry::in_memory().unwrap();
1044 reg.register_pv("PV:Mode", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1045 .unwrap();
1046
1047 let new_mode = SampleMode::Scan { period_secs: 5.0 };
1048 assert!(reg.update_sample_mode("PV:Mode", &new_mode).unwrap());
1049
1050 let r = reg.get_pv("PV:Mode").unwrap().unwrap();
1051 assert_eq!(r.sample_mode, SampleMode::Scan { period_secs: 5.0 });
1052 }
1053
1054 #[test]
1055 fn test_archive_fields_roundtrip() {
1056 let reg = PvRegistry::in_memory().unwrap();
1057 reg.register_pv(
1058 "PV:Fields",
1059 ArchDbType::ScalarDouble,
1060 &SampleMode::Monitor,
1061 1,
1062 )
1063 .unwrap();
1064
1065 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1067 assert!(r.archive_fields.is_empty());
1068
1069 let fields = vec!["HIHI".to_string(), "LOLO".to_string(), "EGU".to_string()];
1071 assert!(reg.update_archive_fields("PV:Fields", &fields).unwrap());
1072 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1073 assert_eq!(r.archive_fields, fields);
1074
1075 assert!(reg.update_archive_fields("PV:Fields", &[]).unwrap());
1077 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1078 assert!(r.archive_fields.is_empty());
1079 }
1080
1081 #[test]
1082 fn test_policy_name_roundtrip() {
1083 let reg = PvRegistry::in_memory().unwrap();
1084 reg.register_pv("PV:Pol", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1085 .unwrap();
1086
1087 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1088 assert!(r.policy_name.is_none());
1089
1090 assert!(reg.update_policy_name("PV:Pol", Some("fast")).unwrap());
1091 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1092 assert_eq!(r.policy_name.as_deref(), Some("fast"));
1093
1094 assert!(reg.update_policy_name("PV:Pol", None).unwrap());
1095 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1096 assert!(r.policy_name.is_none());
1097 }
1098
1099 #[test]
1100 fn test_import_pv_with_alias_and_fields() {
1101 let reg = PvRegistry::in_memory().unwrap();
1102 let fields = vec!["HIHI".to_string(), "LOLO".to_string()];
1103 reg.import_pv(
1104 "PV:Aliased",
1105 ArchDbType::ScalarDouble,
1106 &SampleMode::Monitor,
1107 1,
1108 PvStatus::Active,
1109 None,
1110 Some("3"),
1111 Some("mA"),
1112 Some("PV:Real"),
1113 &fields,
1114 Some("ring"),
1115 )
1116 .unwrap();
1117
1118 let r = reg.get_pv("PV:Aliased").unwrap().unwrap();
1119 assert_eq!(r.alias_for.as_deref(), Some("PV:Real"));
1120 assert_eq!(r.archive_fields, fields);
1121 assert_eq!(r.policy_name.as_deref(), Some("ring"));
1122 assert_eq!(r.prec.as_deref(), Some("3"));
1123 assert_eq!(r.egu.as_deref(), Some("mA"));
1124 }
1125
1126 #[test]
1127 fn test_migration_from_old_schema() {
1128 let conn = Connection::open_in_memory().unwrap();
1131 conn.execute_batch(
1132 "CREATE TABLE pv_info (
1133 pv_name TEXT PRIMARY KEY NOT NULL,
1134 dbr_type INTEGER NOT NULL,
1135 sample_mode TEXT NOT NULL DEFAULT 'monitor',
1136 sample_period REAL NOT NULL DEFAULT 0.0,
1137 status TEXT NOT NULL DEFAULT 'active',
1138 element_count INTEGER NOT NULL DEFAULT 1,
1139 last_timestamp TEXT,
1140 created_at TEXT NOT NULL,
1141 updated_at TEXT NOT NULL,
1142 prec TEXT,
1143 egu TEXT
1144 );",
1145 )
1146 .unwrap();
1147 let now = Utc::now().to_rfc3339();
1148 conn.execute(
1149 "INSERT INTO pv_info
1150 (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
1151 created_at, updated_at, prec, egu)
1152 VALUES (?1, ?2, 'monitor', 0.0, 'active', 1, ?3, ?3, NULL, NULL)",
1153 params!["PV:Legacy", ArchDbType::ScalarDouble as i32, now],
1154 )
1155 .unwrap();
1156
1157 let reg = PvRegistry {
1158 conn: Mutex::new(conn),
1159 };
1160 reg.init_schema().unwrap();
1162
1163 let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1164 assert_eq!(r.pv_name, "PV:Legacy");
1165 assert!(r.alias_for.is_none());
1166 assert!(r.archive_fields.is_empty());
1167 assert!(r.policy_name.is_none());
1168
1169 assert!(
1171 reg.update_archive_fields("PV:Legacy", &["HIHI".to_string()])
1172 .unwrap()
1173 );
1174 let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1175 assert_eq!(r.archive_fields, vec!["HIHI".to_string()]);
1176 }
1177
1178 #[test]
1179 fn test_aliases_basic() {
1180 let reg = PvRegistry::in_memory().unwrap();
1181 reg.register_pv(
1182 "RING:Current",
1183 ArchDbType::ScalarDouble,
1184 &SampleMode::Monitor,
1185 1,
1186 )
1187 .unwrap();
1188
1189 reg.add_alias("DEV:Current", "RING:Current").unwrap();
1191 assert_eq!(
1192 reg.resolve_alias("DEV:Current").unwrap().as_deref(),
1193 Some("RING:Current"),
1194 );
1195 assert!(reg.resolve_alias("RING:Current").unwrap().is_none()); assert!(reg.resolve_alias("Nonexistent").unwrap().is_none());
1197
1198 assert_eq!(reg.canonical_name("DEV:Current").unwrap(), "RING:Current");
1199 assert_eq!(reg.canonical_name("RING:Current").unwrap(), "RING:Current");
1200
1201 assert_eq!(
1203 reg.aliases_for("RING:Current").unwrap(),
1204 vec!["DEV:Current".to_string()],
1205 );
1206 assert_eq!(
1207 reg.all_aliases().unwrap(),
1208 vec![("DEV:Current".to_string(), "RING:Current".to_string())],
1209 );
1210
1211 let expanded = reg.expanded_pv_names().unwrap();
1213 assert!(expanded.contains(&"RING:Current".to_string()));
1214 assert!(expanded.contains(&"DEV:Current".to_string()));
1215
1216 reg.add_alias("DEV:Current", "RING:Current").unwrap();
1218 assert_eq!(reg.aliases_for("RING:Current").unwrap().len(), 1);
1219
1220 assert!(reg.remove_alias("DEV:Current").unwrap());
1222 assert!(reg.resolve_alias("DEV:Current").unwrap().is_none());
1223 assert!(!reg.remove_alias("DEV:Current").unwrap()); }
1225
1226 #[test]
1227 fn test_alias_conflicts() {
1228 let reg = PvRegistry::in_memory().unwrap();
1229 reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1230 .unwrap();
1231 reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1232 .unwrap();
1233
1234 assert!(reg.add_alias("Alias:X", "Nonexistent").is_err());
1236
1237 assert!(reg.add_alias("PV:A", "PV:A").is_err());
1239
1240 assert!(reg.add_alias("PV:B", "PV:A").is_err());
1242
1243 reg.add_alias("Alias:A", "PV:A").unwrap();
1245 assert!(reg.add_alias("Alias:Two", "Alias:A").is_err());
1246
1247 assert!(!reg.remove_alias("PV:A").unwrap());
1249 assert!(reg.get_pv("PV:A").unwrap().is_some());
1250 }
1251
1252 #[test]
1253 fn test_silent_pvs() {
1254 let reg = PvRegistry::in_memory().unwrap();
1255 reg.register_pv(
1256 "PV:Silent",
1257 ArchDbType::ScalarDouble,
1258 &SampleMode::Monitor,
1259 1,
1260 )
1261 .unwrap();
1262 reg.register_pv(
1263 "PV:NoData",
1264 ArchDbType::ScalarDouble,
1265 &SampleMode::Monitor,
1266 1,
1267 )
1268 .unwrap();
1269
1270 let old_time = SystemTime::now() - Duration::from_secs(7200);
1272 reg.update_last_timestamp("PV:Silent", old_time).unwrap();
1273
1274 let silent = reg.silent_pvs(Duration::from_secs(3600)).unwrap();
1276 assert_eq!(silent.len(), 1);
1277 assert_eq!(silent[0].pv_name, "PV:Silent");
1278 }
1279}