1use std::path::Path;
7use std::sync::Mutex;
8use std::time::{Duration, SystemTime};
9
10use chrono::{DateTime, Utc};
11use rusqlite::{Connection, OptionalExtension, params};
12use tracing::info;
13
14use crate::types::ArchDbType;
15
/// Transport protocol recorded for a PV.
///
/// The textual forms are `"ca"` and `"pva"`; `Ca` is the default variant.
// NOTE(review): presumably EPICS Channel Access / PV Access — confirm.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Protocol {
    /// Stored as `"ca"`.
    #[default]
    Ca,
    /// Stored as `"pva"`.
    Pva,
}

impl Protocol {
    /// Returns the lowercase identifier used when the protocol is persisted.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Pva => "pva",
            Self::Ca => "ca",
        }
    }

    /// Parses the identifier produced by [`Protocol::as_str`].
    ///
    /// Matching is exact (case-sensitive); anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        [Self::Ca, Self::Pva]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }
}
45
/// Reduces a user-supplied PV name to its bare form.
///
/// At most one leading `pva://` or `ca://` scheme is removed (`pva://` is
/// tried first), then a single trailing `.VAL` is dropped. The result borrows
/// from `name`; nothing is allocated.
pub fn normalize_pv_name(name: &str) -> &str {
    let without_scheme = ["pva://", "ca://"]
        .iter()
        .find_map(|scheme| name.strip_prefix(*scheme))
        .unwrap_or(name);
    match without_scheme.strip_suffix(".VAL") {
        Some(base) => base,
        None => without_scheme,
    }
}
61
62pub fn parse_pv_with_protocol(name: &str) -> (Protocol, &str) {
65 if let Some(rest) = name.strip_prefix("pva://") {
66 (Protocol::Pva, rest.strip_suffix(".VAL").unwrap_or(rest))
67 } else if let Some(rest) = name.strip_prefix("ca://") {
68 (Protocol::Ca, rest.strip_suffix(".VAL").unwrap_or(rest))
69 } else {
70 (Protocol::Ca, name.strip_suffix(".VAL").unwrap_or(name))
71 }
72}
73
/// Returns the part of `name` before its last `.` when what follows looks
/// like a field name.
///
/// The suffix qualifies only if it is non-empty and consists solely of ASCII
/// uppercase letters, ASCII digits and `_`; the base must be non-empty too.
/// `None` is returned when there is no `.` or either condition fails.
pub fn strip_field_suffix(name: &str) -> Option<&str> {
    let dot = name.rfind('.')?;
    // '.' is a single byte, so slicing around it stays on char boundaries.
    let (base, field) = (&name[..dot], &name[dot + 1..]);
    let is_field_char = |c: char| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_';
    let well_formed = !base.is_empty() && !field.is_empty() && field.chars().all(is_field_char);
    well_formed.then_some(base)
}
94
/// Decides whether `name` is acceptable as a PV name.
///
/// A name is rejected when any of the following holds:
/// - it is empty or longer than 256 bytes;
/// - it starts with `.`, `-`, `/` or `:`;
/// - splitting on `:` and `/` yields an empty, `.` or `..` component;
/// - it contains NUL, a backslash, whitespace, a control character, or one
///   of `| & ; \` $ " ' * ?`.
pub fn is_valid_pv_name(name: &str) -> bool {
    const MAX_LEN: usize = 256;
    const FORBIDDEN_PUNCTUATION: [char; 9] = ['|', '&', ';', '`', '$', '"', '\'', '*', '?'];

    if !(1..=MAX_LEN).contains(&name.len()) {
        return false;
    }
    if matches!(name.chars().next(), Some('.' | '-' | '/' | ':')) {
        return false;
    }
    let components_ok = name
        .split([':', '/'])
        .all(|part| !matches!(part, "" | "." | ".."));
    if !components_ok {
        return false;
    }
    name.chars().all(|c| {
        let forbidden = c == '\0'
            || c == '\\'
            || c.is_whitespace()
            || c.is_control()
            || FORBIDDEN_PUNCTUATION.contains(&c);
        !forbidden
    })
}
142
/// Status value stored in the `status` column for a PV.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PvStatus {
    /// Stored as `"active"`.
    Active,
    /// Stored as `"paused"`.
    Paused,
    /// Stored as `"error"`.
    Error,
    /// Stored as `"inactive"`.
    Inactive,
    /// Stored as `"alias"`.
    Alias,
}

impl PvStatus {
    /// Returns the lowercase string used to persist this status.
    pub fn as_str(self) -> &'static str {
        match self {
            PvStatus::Active => "active",
            PvStatus::Paused => "paused",
            PvStatus::Error => "error",
            PvStatus::Inactive => "inactive",
            PvStatus::Alias => "alias",
        }
    }
}

impl std::str::FromStr for PvStatus {
    type Err = std::convert::Infallible;

    /// Parses the string form produced by [`PvStatus::as_str`].
    ///
    /// Parsing never fails: any unrecognised input maps to
    /// [`PvStatus::Active`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const KNOWN: [PvStatus; 5] = [
            PvStatus::Active,
            PvStatus::Paused,
            PvStatus::Error,
            PvStatus::Inactive,
            PvStatus::Alias,
        ];
        let parsed = KNOWN
            .iter()
            .copied()
            .find(|status| status.as_str() == s)
            .unwrap_or(PvStatus::Active);
        Ok(parsed)
    }
}
183
/// How a PV is sampled.
#[derive(Debug, Clone, PartialEq)]
pub enum SampleMode {
    /// Stored as `"monitor"` with a period of `0.0`.
    Monitor,
    /// Stored as `"scan"` together with `period_secs`.
    Scan { period_secs: f64 },
}

impl SampleMode {
    /// Converts the mode into the `(sample_mode, sample_period)` column pair.
    fn to_db(&self) -> (&str, f64) {
        match *self {
            SampleMode::Scan { period_secs } => ("scan", period_secs),
            SampleMode::Monitor => ("monitor", 0.0),
        }
    }

    /// Rebuilds a mode from the stored column pair.
    ///
    /// Only the exact string `"scan"` produces [`SampleMode::Scan`]; every
    /// other value, including unknown ones, yields [`SampleMode::Monitor`]
    /// and `period` is ignored.
    fn from_db(mode: &str, period: f64) -> Self {
        if mode == "scan" {
            Self::Scan {
                period_secs: period,
            }
        } else {
            Self::Monitor
        }
    }
}
208
/// One row of the `pv_info` table decoded into typed fields.
#[derive(Debug, Clone)]
pub struct PvRecord {
    /// Primary key of the row (`pv_name` column).
    pub pv_name: String,
    /// Value type, decoded from the integer `dbr_type` column.
    pub dbr_type: ArchDbType,
    /// Sampling mode, rebuilt from the `sample_mode` and `sample_period` columns.
    pub sample_mode: SampleMode,
    /// Status decoded from the `status` column.
    pub status: PvStatus,
    /// Value of the `element_count` column.
    // NOTE(review): presumably the number of elements per sample — confirm.
    pub element_count: i32,
    /// Parsed `last_timestamp` column; `None` when the column is NULL or the
    /// stored text is not valid RFC 3339.
    pub last_timestamp: Option<SystemTime>,
    /// Parsed `created_at` column.
    pub created_at: DateTime<Utc>,
    /// Parsed `updated_at` column.
    pub updated_at: DateTime<Utc>,
    /// Optional `prec` column.
    // NOTE(review): presumably display precision — confirm.
    pub prec: Option<String>,
    /// Optional `egu` column.
    // NOTE(review): presumably engineering units — confirm.
    pub egu: Option<String>,
    /// Name of the target PV when this row is an alias; `None` otherwise.
    pub alias_for: Option<String>,
    /// Field names decoded from the JSON text in `archive_fields`; empty when
    /// the column is NULL.
    pub archive_fields: Vec<String>,
    /// Optional `policy_name` column.
    pub policy_name: Option<String>,
    /// Protocol decoded from the `protocol` column.
    pub protocol: Protocol,
}
235
/// Registry of PVs backed by a SQLite database (table `pv_info`).
pub struct PvRegistry {
    /// The SQLite connection; every method locks this mutex before touching
    /// the database.
    conn: Mutex<Connection>,
}
240
241impl PvRegistry {
242 fn lock_conn(&self) -> anyhow::Result<std::sync::MutexGuard<'_, Connection>> {
243 self.conn
244 .lock()
245 .map_err(|e| anyhow::anyhow!("PV registry lock poisoned: {e}"))
246 }
247
248 pub fn open(path: &Path) -> anyhow::Result<Self> {
250 let conn = Connection::open(path)?;
251 let registry = Self {
252 conn: Mutex::new(conn),
253 };
254 registry.init_schema()?;
255 Ok(registry)
256 }
257
258 pub fn in_memory() -> anyhow::Result<Self> {
260 let conn = Connection::open_in_memory()?;
261 let registry = Self {
262 conn: Mutex::new(conn),
263 };
264 registry.init_schema()?;
265 Ok(registry)
266 }
267
268 fn init_schema(&self) -> anyhow::Result<()> {
269 let conn = self.lock_conn()?;
270 conn.execute_batch(
273 "
274 CREATE TABLE IF NOT EXISTS pv_info (
275 pv_name TEXT PRIMARY KEY NOT NULL,
276 dbr_type INTEGER NOT NULL,
277 sample_mode TEXT NOT NULL DEFAULT 'monitor',
278 sample_period REAL NOT NULL DEFAULT 0.0,
279 status TEXT NOT NULL DEFAULT 'active',
280 element_count INTEGER NOT NULL DEFAULT 1,
281 last_timestamp TEXT,
282 created_at TEXT NOT NULL,
283 updated_at TEXT NOT NULL,
284 prec TEXT,
285 egu TEXT,
286 alias_for TEXT,
287 archive_fields TEXT,
288 policy_name TEXT,
289 protocol TEXT NOT NULL DEFAULT 'ca'
290 );
291
292 CREATE INDEX IF NOT EXISTS idx_pv_status ON pv_info(status);
293 CREATE INDEX IF NOT EXISTS idx_pv_prefix ON pv_info(pv_name COLLATE NOCASE);
294 ",
295 )?;
296 for stmt in [
301 "ALTER TABLE pv_info ADD COLUMN prec TEXT",
302 "ALTER TABLE pv_info ADD COLUMN egu TEXT",
303 "ALTER TABLE pv_info ADD COLUMN alias_for TEXT",
304 "ALTER TABLE pv_info ADD COLUMN archive_fields TEXT",
305 "ALTER TABLE pv_info ADD COLUMN policy_name TEXT",
306 "ALTER TABLE pv_info ADD COLUMN protocol TEXT NOT NULL DEFAULT 'ca'",
307 ] {
308 match conn.execute(stmt, []) {
309 Ok(_) => {}
310 Err(e) if is_duplicate_column_error(&e) => {}
311 Err(e) => return Err(e.into()),
312 }
313 }
314 conn.execute_batch(
317 "CREATE INDEX IF NOT EXISTS idx_pv_alias \
318 ON pv_info(alias_for) WHERE alias_for IS NOT NULL;",
319 )?;
320 info!("PV registry schema initialized");
321 Ok(())
322 }
323
324 pub fn register_pv(
327 &self,
328 pv_name: &str,
329 dbr_type: ArchDbType,
330 sample_mode: &SampleMode,
331 element_count: i32,
332 ) -> anyhow::Result<()> {
333 self.register_pv_with_protocol(pv_name, dbr_type, sample_mode, element_count, Protocol::Ca)
334 }
335
336 pub fn register_pv_with_protocol(
339 &self,
340 pv_name: &str,
341 dbr_type: ArchDbType,
342 sample_mode: &SampleMode,
343 element_count: i32,
344 protocol: Protocol,
345 ) -> anyhow::Result<()> {
346 if !is_valid_pv_name(pv_name) {
347 anyhow::bail!("invalid PV name: {pv_name:?}");
348 }
349 let conn = self.lock_conn()?;
350 let now = Utc::now().to_rfc3339();
351 let (mode_str, period) = sample_mode.to_db();
352
353 conn.execute(
354 "INSERT OR REPLACE INTO pv_info
355 (pv_name, dbr_type, sample_mode, sample_period, status, element_count, created_at, updated_at, protocol)
356 VALUES (?1, ?2, ?3, ?4, 'active', ?5, COALESCE((SELECT created_at FROM pv_info WHERE pv_name = ?1), ?6), ?6, ?7)",
357 params![pv_name, dbr_type as i32, mode_str, period, element_count, now, protocol.as_str()],
358 )?;
359 Ok(())
360 }
361
362 pub fn set_status(&self, pv_name: &str, status: PvStatus) -> anyhow::Result<bool> {
364 let conn = self.lock_conn()?;
365 let now = Utc::now().to_rfc3339();
366 let rows = conn.execute(
367 "UPDATE pv_info SET status = ?1, updated_at = ?2 WHERE pv_name = ?3",
368 params![status.as_str(), now, pv_name],
369 )?;
370 Ok(rows > 0)
371 }
372
373 pub fn update_last_timestamp(
375 &self,
376 pv_name: &str,
377 timestamp: SystemTime,
378 ) -> anyhow::Result<()> {
379 let conn = self.lock_conn()?;
380 let dt = DateTime::<Utc>::from(timestamp).to_rfc3339();
381 let now = Utc::now().to_rfc3339();
382 conn.execute(
383 "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
384 params![dt, now, pv_name],
385 )?;
386 Ok(())
387 }
388
389 pub fn remove_pv(&self, pv_name: &str) -> anyhow::Result<bool> {
391 let conn = self.lock_conn()?;
392 let rows = conn.execute("DELETE FROM pv_info WHERE pv_name = ?1", params![pv_name])?;
393 Ok(rows > 0)
394 }
395
396 pub fn get_pv(&self, pv_name: &str) -> anyhow::Result<Option<PvRecord>> {
398 let conn = self.lock_conn()?;
399 conn.query_row(
400 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
401 last_timestamp, created_at, updated_at, prec, egu,
402 alias_for, archive_fields, policy_name, protocol
403 FROM pv_info WHERE pv_name = ?1",
404 params![pv_name],
405 row_to_record,
406 )
407 .optional()
408 .map_err(Into::into)
409 }
410
411 pub fn all_pv_names(&self) -> anyhow::Result<Vec<String>> {
414 let conn = self.lock_conn()?;
415 let mut stmt =
416 conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for IS NULL ORDER BY pv_name")?;
417 let names = stmt
418 .query_map([], |row| row.get(0))?
419 .collect::<Result<Vec<String>, _>>()?;
420 Ok(names)
421 }
422
423 pub fn pvs_by_status(&self, status: PvStatus) -> anyhow::Result<Vec<PvRecord>> {
426 let conn = self.lock_conn()?;
427 let mut stmt = conn.prepare(
428 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
429 last_timestamp, created_at, updated_at, prec, egu,
430 alias_for, archive_fields, policy_name, protocol
431 FROM pv_info WHERE status = ?1 ORDER BY pv_name",
432 )?;
433 let records = stmt
434 .query_map(params![status.as_str()], row_to_record)?
435 .collect::<Result<Vec<_>, _>>()?;
436 Ok(records)
437 }
438
439 pub fn matching_pvs(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
441 let conn = self.lock_conn()?;
442 let mut stmt = conn.prepare(
443 "SELECT pv_name FROM pv_info
444 WHERE pv_name GLOB ?1 AND alias_for IS NULL
445 ORDER BY pv_name",
446 )?;
447 let names = stmt
448 .query_map(params![pattern], |row| row.get(0))?
449 .collect::<Result<Vec<String>, _>>()?;
450 Ok(names)
451 }
452
453 pub fn matching_pvs_expanded(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
459 let conn = self.lock_conn()?;
460 let mut stmt =
461 conn.prepare("SELECT pv_name FROM pv_info WHERE pv_name GLOB ?1 ORDER BY pv_name")?;
462 let names = stmt
463 .query_map(params![pattern], |row| row.get(0))?
464 .collect::<Result<Vec<String>, _>>()?;
465 Ok(names)
466 }
467
468 pub fn count(&self, status: Option<PvStatus>) -> anyhow::Result<u64> {
470 let conn = self.lock_conn()?;
471 let count: u64 = match status {
472 Some(s) => conn.query_row(
473 "SELECT COUNT(*) FROM pv_info
474 WHERE status = ?1 AND alias_for IS NULL",
475 params![s.as_str()],
476 |row| row.get(0),
477 )?,
478 None => conn.query_row(
479 "SELECT COUNT(*) FROM pv_info WHERE alias_for IS NULL",
480 [],
481 |row| row.get(0),
482 )?,
483 };
484 Ok(count)
485 }
486
487 pub fn batch_update_timestamps(&self, updates: &[(&str, SystemTime)]) -> anyhow::Result<()> {
489 let mut conn = self.lock_conn()?;
490 let tx = conn.transaction()?;
491 let now = Utc::now().to_rfc3339();
492 {
493 let mut stmt = tx.prepare(
494 "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
495 )?;
496 for (pv_name, ts) in updates {
497 let dt = DateTime::<Utc>::from(*ts).to_rfc3339();
498 stmt.execute(params![dt, now, pv_name])?;
499 }
500 }
501 tx.commit()?;
502 Ok(())
503 }
504
505 pub fn recently_added_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
507 let conn = self.lock_conn()?;
508 let since_str = DateTime::<Utc>::from(since).to_rfc3339();
509 let mut stmt = conn.prepare(
510 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
511 last_timestamp, created_at, updated_at, prec, egu,
512 alias_for, archive_fields, policy_name, protocol
513 FROM pv_info WHERE created_at >= ?1 AND alias_for IS NULL
514 ORDER BY created_at DESC",
515 )?;
516 let records = stmt
517 .query_map(params![since_str], row_to_record)?
518 .collect::<Result<Vec<_>, _>>()?;
519 Ok(records)
520 }
521
522 pub fn recently_modified_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
524 let conn = self.lock_conn()?;
525 let since_str = DateTime::<Utc>::from(since).to_rfc3339();
526 let mut stmt = conn.prepare(
527 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
528 last_timestamp, created_at, updated_at, prec, egu,
529 alias_for, archive_fields, policy_name, protocol
530 FROM pv_info WHERE updated_at >= ?1 AND alias_for IS NULL
531 ORDER BY updated_at DESC",
532 )?;
533 let records = stmt
534 .query_map(params![since_str], row_to_record)?
535 .collect::<Result<Vec<_>, _>>()?;
536 Ok(records)
537 }
538
539 pub fn update_sample_mode(&self, pv_name: &str, mode: &SampleMode) -> anyhow::Result<bool> {
541 let conn = self.lock_conn()?;
542 let now = Utc::now().to_rfc3339();
543 let (mode_str, period) = mode.to_db();
544 let rows = conn.execute(
545 "UPDATE pv_info SET sample_mode = ?1, sample_period = ?2, updated_at = ?3 WHERE pv_name = ?4",
546 params![mode_str, period, now, pv_name],
547 )?;
548 Ok(rows > 0)
549 }
550
551 pub fn update_metadata(
553 &self,
554 pv_name: &str,
555 prec: Option<&str>,
556 egu: Option<&str>,
557 ) -> anyhow::Result<bool> {
558 let conn = self.lock_conn()?;
559 let now = Utc::now().to_rfc3339();
560 let rows = conn.execute(
561 "UPDATE pv_info SET prec = COALESCE(?1, prec), egu = COALESCE(?2, egu), updated_at = ?3 WHERE pv_name = ?4",
562 params![prec, egu, now, pv_name],
563 )?;
564 Ok(rows > 0)
565 }
566
567 #[allow(clippy::too_many_arguments)]
570 pub fn import_pv(
571 &self,
572 pv_name: &str,
573 dbr_type: ArchDbType,
574 sample_mode: &SampleMode,
575 element_count: i32,
576 status: PvStatus,
577 created_at: Option<&str>,
578 prec: Option<&str>,
579 egu: Option<&str>,
580 alias_for: Option<&str>,
581 archive_fields: &[String],
582 policy_name: Option<&str>,
583 ) -> anyhow::Result<()> {
584 self.import_pv_with_protocol(
585 pv_name,
586 dbr_type,
587 sample_mode,
588 element_count,
589 status,
590 created_at,
591 prec,
592 egu,
593 alias_for,
594 archive_fields,
595 policy_name,
596 Protocol::Ca,
597 )
598 }
599
600 #[allow(clippy::too_many_arguments)]
603 pub fn import_pv_with_protocol(
604 &self,
605 pv_name: &str,
606 dbr_type: ArchDbType,
607 sample_mode: &SampleMode,
608 element_count: i32,
609 status: PvStatus,
610 created_at: Option<&str>,
611 prec: Option<&str>,
612 egu: Option<&str>,
613 alias_for: Option<&str>,
614 archive_fields: &[String],
615 policy_name: Option<&str>,
616 protocol: Protocol,
617 ) -> anyhow::Result<()> {
618 if !is_valid_pv_name(pv_name) {
619 anyhow::bail!("invalid PV name: {pv_name:?}");
620 }
621 if let Some(target) = alias_for
622 && !is_valid_pv_name(target)
623 {
624 anyhow::bail!("invalid alias target: {target:?}");
625 }
626 let conn = self.lock_conn()?;
627 let now = Utc::now().to_rfc3339();
628 let (mode_str, period) = sample_mode.to_db();
629 let created = created_at.unwrap_or(&now);
630 let archive_fields_json = if archive_fields.is_empty() {
631 None
632 } else {
633 Some(serde_json::to_string(archive_fields)?)
634 };
635
636 conn.execute(
637 "INSERT OR REPLACE INTO pv_info
638 (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
639 created_at, updated_at, prec, egu, alias_for, archive_fields, policy_name, protocol)
640 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
641 params![
642 pv_name,
643 dbr_type as i32,
644 mode_str,
645 period,
646 status.as_str(),
647 element_count,
648 created,
649 now,
650 prec,
651 egu,
652 alias_for,
653 archive_fields_json,
654 policy_name,
655 protocol.as_str(),
656 ],
657 )?;
658 Ok(())
659 }
660
661 pub fn update_archive_fields(&self, pv_name: &str, fields: &[String]) -> anyhow::Result<bool> {
663 let conn = self.lock_conn()?;
664 let now = Utc::now().to_rfc3339();
665 let json = if fields.is_empty() {
666 None
667 } else {
668 Some(serde_json::to_string(fields)?)
669 };
670 let rows = conn.execute(
671 "UPDATE pv_info SET archive_fields = ?1, updated_at = ?2 WHERE pv_name = ?3",
672 params![json, now, pv_name],
673 )?;
674 Ok(rows > 0)
675 }
676
677 pub fn update_policy_name(
679 &self,
680 pv_name: &str,
681 policy_name: Option<&str>,
682 ) -> anyhow::Result<bool> {
683 let conn = self.lock_conn()?;
684 let now = Utc::now().to_rfc3339();
685 let rows = conn.execute(
686 "UPDATE pv_info SET policy_name = ?1, updated_at = ?2 WHERE pv_name = ?3",
687 params![policy_name, now, pv_name],
688 )?;
689 Ok(rows > 0)
690 }
691
692 pub fn add_alias(&self, alias: &str, target: &str) -> anyhow::Result<()> {
697 if alias == target {
698 anyhow::bail!("alias and target must differ");
699 }
700 if !is_valid_pv_name(alias) {
701 anyhow::bail!("invalid alias name: {alias:?}");
702 }
703 if !is_valid_pv_name(target) {
704 anyhow::bail!("invalid alias target: {target:?}");
705 }
706 let conn = self.lock_conn()?;
707 let row: Option<(i32, String, f64, i32, Option<String>)> = conn
709 .query_row(
710 "SELECT dbr_type, sample_mode, sample_period, element_count, alias_for
711 FROM pv_info WHERE pv_name = ?1",
712 params![target],
713 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
714 )
715 .optional()?;
716 let (dbr_type, mode, period, ec, target_alias) =
717 row.ok_or_else(|| anyhow::anyhow!("target PV '{target}' not found"))?;
718 if target_alias.is_some() {
719 anyhow::bail!(
720 "target PV '{target}' is itself an alias; aliases of aliases are not allowed"
721 );
722 }
723 let existing: Option<Option<String>> = conn
725 .query_row(
726 "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
727 params![alias],
728 |r| r.get(0),
729 )
730 .optional()?;
731 if let Some(existing_alias) = existing {
732 if existing_alias.as_deref() == Some(target) {
733 return Ok(()); }
735 anyhow::bail!("'{alias}' already exists in registry");
736 }
737 let now = Utc::now().to_rfc3339();
738 conn.execute(
739 "INSERT INTO pv_info
740 (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
741 created_at, updated_at, alias_for)
742 VALUES (?1, ?2, ?3, ?4, 'alias', ?5, ?6, ?6, ?7)",
743 params![alias, dbr_type, mode, period, ec, now, target],
744 )?;
745 Ok(())
746 }
747
748 pub fn remove_alias(&self, alias: &str) -> anyhow::Result<bool> {
751 let conn = self.lock_conn()?;
752 let rows = conn.execute(
753 "DELETE FROM pv_info WHERE pv_name = ?1 AND alias_for IS NOT NULL",
754 params![alias],
755 )?;
756 Ok(rows > 0)
757 }
758
759 pub fn resolve_alias(&self, name: &str) -> anyhow::Result<Option<String>> {
762 let conn = self.lock_conn()?;
763 let row: Option<Option<String>> = conn
764 .query_row(
765 "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
766 params![name],
767 |r| r.get(0),
768 )
769 .optional()?;
770 Ok(row.flatten())
771 }
772
773 pub fn canonical_name(&self, name: &str) -> anyhow::Result<String> {
776 Ok(self
777 .resolve_alias(name)?
778 .unwrap_or_else(|| name.to_string()))
779 }
780
781 pub fn aliases_for(&self, target: &str) -> anyhow::Result<Vec<String>> {
783 let conn = self.lock_conn()?;
784 let mut stmt =
785 conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for = ?1 ORDER BY pv_name")?;
786 let names = stmt
787 .query_map(params![target], |row| row.get(0))?
788 .collect::<Result<Vec<String>, _>>()?;
789 Ok(names)
790 }
791
792 pub fn all_aliases(&self) -> anyhow::Result<Vec<(String, String)>> {
794 let conn = self.lock_conn()?;
795 let mut stmt = conn.prepare(
796 "SELECT pv_name, alias_for FROM pv_info
797 WHERE alias_for IS NOT NULL ORDER BY pv_name",
798 )?;
799 let rows = stmt
800 .query_map([], |row| {
801 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
802 })?
803 .collect::<Result<Vec<_>, _>>()?;
804 Ok(rows)
805 }
806
807 pub fn expanded_pv_names(&self) -> anyhow::Result<Vec<String>> {
809 let conn = self.lock_conn()?;
810 let mut stmt = conn.prepare("SELECT pv_name FROM pv_info ORDER BY pv_name")?;
811 let names = stmt
812 .query_map([], |row| row.get(0))?
813 .collect::<Result<Vec<String>, _>>()?;
814 Ok(names)
815 }
816
817 pub fn all_records(&self) -> anyhow::Result<Vec<PvRecord>> {
819 let conn = self.lock_conn()?;
820 let mut stmt = conn.prepare(
821 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
822 last_timestamp, created_at, updated_at, prec, egu,
823 alias_for, archive_fields, policy_name, protocol
824 FROM pv_info ORDER BY pv_name",
825 )?;
826 let records = stmt
827 .query_map([], row_to_record)?
828 .collect::<Result<Vec<_>, _>>()?;
829 Ok(records)
830 }
831
832 pub fn silent_pvs(&self, threshold: Duration) -> anyhow::Result<Vec<PvRecord>> {
836 let conn = self.lock_conn()?;
837 let cutoff = SystemTime::now()
838 .checked_sub(threshold)
839 .unwrap_or(SystemTime::UNIX_EPOCH);
840 let cutoff_str = DateTime::<Utc>::from(cutoff).to_rfc3339();
841 let mut stmt = conn.prepare(
842 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
843 last_timestamp, created_at, updated_at, prec, egu,
844 alias_for, archive_fields, policy_name, protocol
845 FROM pv_info WHERE last_timestamp IS NOT NULL AND last_timestamp < ?1
846 AND alias_for IS NULL
847 ORDER BY last_timestamp ASC",
848 )?;
849 let records = stmt
850 .query_map(params![cutoff_str], row_to_record)?
851 .collect::<Result<Vec<_>, _>>()?;
852 Ok(records)
853 }
854}
855
856fn row_to_record(row: &rusqlite::Row) -> rusqlite::Result<PvRecord> {
857 let pv_name: String = row.get(0)?;
858 let dbr_type_i: i32 = row.get(1)?;
859 let sample_mode_str: String = row.get(2)?;
860 let sample_period: f64 = row.get(3)?;
861 let status_str: String = row.get(4)?;
862 let element_count: i32 = row.get(5)?;
863 let last_ts_str: Option<String> = row.get(6)?;
864 let created_str: String = row.get(7)?;
865 let updated_str: String = row.get(8)?;
866 let prec: Option<String> = row.get(9).unwrap_or(None);
867 let egu: Option<String> = row.get(10).unwrap_or(None);
868 let alias_for: Option<String> = row.get(11).unwrap_or(None);
869 let archive_fields_json: Option<String> = row.get(12).unwrap_or(None);
870 let policy_name: Option<String> = row.get(13).unwrap_or(None);
871 let protocol_str: Option<String> = row.get(14).unwrap_or(None);
874 let protocol = protocol_str
875 .as_deref()
876 .and_then(Protocol::parse)
877 .unwrap_or_default();
878
879 let last_timestamp = last_ts_str.and_then(|s| {
880 DateTime::parse_from_rfc3339(&s)
881 .ok()
882 .map(|dt| dt.with_timezone(&Utc).into())
883 });
884
885 let archive_fields = archive_fields_json
886 .as_deref()
887 .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
888 .unwrap_or_default();
889
890 Ok(PvRecord {
891 pv_name,
892 dbr_type: ArchDbType::from_i32(dbr_type_i).unwrap_or(ArchDbType::ScalarDouble),
893 sample_mode: SampleMode::from_db(&sample_mode_str, sample_period),
894 status: status_str.parse().unwrap_or(PvStatus::Active),
895 element_count,
896 last_timestamp,
897 created_at: DateTime::parse_from_rfc3339(&created_str)
898 .map(|dt| dt.with_timezone(&Utc))
899 .unwrap_or_else(|_| Utc::now()),
900 updated_at: DateTime::parse_from_rfc3339(&updated_str)
901 .map(|dt| dt.with_timezone(&Utc))
902 .unwrap_or_else(|_| Utc::now()),
903 prec,
904 egu,
905 alias_for,
906 archive_fields,
907 policy_name,
908 protocol,
909 })
910}
911
912fn is_duplicate_column_error(e: &rusqlite::Error) -> bool {
916 matches!(
917 e,
918 rusqlite::Error::SqliteFailure(_, Some(msg))
919 if msg.starts_with("duplicate column name")
920 )
921}
922
923#[cfg(test)]
924mod tests {
925 use super::*;
926
927 #[test]
928 fn invalid_pv_names_rejected() {
929 assert!(!is_valid_pv_name("../etc/passwd"));
931 assert!(!is_valid_pv_name("foo/../bar"));
932 assert!(!is_valid_pv_name("foo:..:bar"));
933 assert!(!is_valid_pv_name("foo/./bar"));
934 assert!(!is_valid_pv_name("/etc/passwd"));
937 assert!(!is_valid_pv_name(":SIM:foo"));
938 assert!(!is_valid_pv_name(":foo"));
939 assert!(!is_valid_pv_name("foo::bar"));
941 assert!(!is_valid_pv_name("foo//bar"));
942 assert!(!is_valid_pv_name("foo:"));
943 assert!(!is_valid_pv_name("foo/"));
944 assert!(!is_valid_pv_name("foo;rm -rf /"));
946 assert!(!is_valid_pv_name("foo|bar"));
947 assert!(!is_valid_pv_name("foo`x`"));
948 assert!(!is_valid_pv_name("foo$BAR"));
949 assert!(!is_valid_pv_name("foo bar"));
950 assert!(!is_valid_pv_name("foo\nbar"));
951 assert!(!is_valid_pv_name(""));
953 assert!(!is_valid_pv_name(".hidden"));
954 assert!(!is_valid_pv_name("-leading-dash"));
955 assert!(!is_valid_pv_name(&"x".repeat(257)));
956 }
957
958 #[test]
959 fn valid_pv_names_accepted() {
960 assert!(is_valid_pv_name("SIM:Sine"));
962 assert!(is_valid_pv_name("XF:31IDA-OP{Tbl-Ax:X1}Mtr"));
963 assert!(is_valid_pv_name("ACC1-001-RFCAV-01:V<x>"));
964 assert!(is_valid_pv_name("PV.HIHI"));
965 assert!(is_valid_pv_name("BL_X+Y"));
966 assert!(is_valid_pv_name("a"));
967 assert!(is_valid_pv_name(&"x".repeat(256)));
968 }
969
970 #[test]
971 fn strip_field_suffix_basics() {
972 assert_eq!(strip_field_suffix("BASE.HIHI"), Some("BASE"));
973 assert_eq!(strip_field_suffix("BASE.LOLO"), Some("BASE"));
974 assert_eq!(strip_field_suffix("FOO.BAR_99"), Some("FOO"));
975 assert_eq!(strip_field_suffix("BASE"), None);
977 assert_eq!(strip_field_suffix("BASE.hihi"), None);
979 assert_eq!(strip_field_suffix("BASE.Hihi"), None);
980 assert_eq!(strip_field_suffix(".HIHI"), None);
982 assert_eq!(strip_field_suffix("BASE."), None);
983 }
984
985 #[test]
986 fn test_register_and_get() {
987 let reg = PvRegistry::in_memory().unwrap();
988 reg.register_pv(
989 "SIM:Sine",
990 ArchDbType::ScalarDouble,
991 &SampleMode::Monitor,
992 1,
993 )
994 .unwrap();
995
996 let record = reg.get_pv("SIM:Sine").unwrap().unwrap();
997 assert_eq!(record.pv_name, "SIM:Sine");
998 assert_eq!(record.dbr_type, ArchDbType::ScalarDouble);
999 assert_eq!(record.status, PvStatus::Active);
1000 }
1001
1002 #[test]
1003 fn test_status_transitions() {
1004 let reg = PvRegistry::in_memory().unwrap();
1005 reg.register_pv(
1006 "SIM:Test",
1007 ArchDbType::ScalarDouble,
1008 &SampleMode::Monitor,
1009 1,
1010 )
1011 .unwrap();
1012
1013 reg.set_status("SIM:Test", PvStatus::Paused).unwrap();
1014 let r = reg.get_pv("SIM:Test").unwrap().unwrap();
1015 assert_eq!(r.status, PvStatus::Paused);
1016
1017 reg.set_status("SIM:Test", PvStatus::Active).unwrap();
1018 let r = reg.get_pv("SIM:Test").unwrap().unwrap();
1019 assert_eq!(r.status, PvStatus::Active);
1020 }
1021
1022 #[test]
1023 fn test_pattern_matching() {
1024 let reg = PvRegistry::in_memory().unwrap();
1025 reg.register_pv(
1026 "SIM:Sine",
1027 ArchDbType::ScalarDouble,
1028 &SampleMode::Monitor,
1029 1,
1030 )
1031 .unwrap();
1032 reg.register_pv(
1033 "SIM:Cosine",
1034 ArchDbType::ScalarDouble,
1035 &SampleMode::Monitor,
1036 1,
1037 )
1038 .unwrap();
1039 reg.register_pv(
1040 "EXP:BL1:run:active",
1041 ArchDbType::ScalarEnum,
1042 &SampleMode::Monitor,
1043 1,
1044 )
1045 .unwrap();
1046 reg.register_pv(
1047 "EXP:BL1:motor:th:readback",
1048 ArchDbType::ScalarDouble,
1049 &SampleMode::Monitor,
1050 1,
1051 )
1052 .unwrap();
1053
1054 let sim = reg.matching_pvs("SIM:*").unwrap();
1055 assert_eq!(sim.len(), 2);
1056
1057 let exp = reg.matching_pvs("EXP:BL1:*").unwrap();
1058 assert_eq!(exp.len(), 2);
1059
1060 let motor = reg.matching_pvs("EXP:*:motor:*").unwrap();
1061 assert_eq!(motor.len(), 1);
1062 }
1063
1064 #[test]
1065 fn test_count_and_list() {
1066 let reg = PvRegistry::in_memory().unwrap();
1067 for i in 0..100 {
1068 reg.register_pv(
1069 &format!("PV:Test:{i:04}"),
1070 ArchDbType::ScalarDouble,
1071 &SampleMode::Monitor,
1072 1,
1073 )
1074 .unwrap();
1075 }
1076
1077 assert_eq!(reg.count(None).unwrap(), 100);
1078 assert_eq!(reg.count(Some(PvStatus::Active)).unwrap(), 100);
1079
1080 let names = reg.all_pv_names().unwrap();
1081 assert_eq!(names.len(), 100);
1082 }
1083
1084 #[test]
1085 fn test_remove_pv() {
1086 let reg = PvRegistry::in_memory().unwrap();
1087 reg.register_pv(
1088 "SIM:Gone",
1089 ArchDbType::ScalarDouble,
1090 &SampleMode::Monitor,
1091 1,
1092 )
1093 .unwrap();
1094 assert!(reg.get_pv("SIM:Gone").unwrap().is_some());
1095
1096 reg.remove_pv("SIM:Gone").unwrap();
1097 assert!(reg.get_pv("SIM:Gone").unwrap().is_none());
1098 }
1099
1100 #[test]
1101 fn test_batch_update_timestamps() {
1102 let reg = PvRegistry::in_memory().unwrap();
1103 reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1104 .unwrap();
1105 reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1106 .unwrap();
1107
1108 let now = SystemTime::now();
1109 reg.batch_update_timestamps(&[("PV:A", now), ("PV:B", now)])
1110 .unwrap();
1111
1112 let a = reg.get_pv("PV:A").unwrap().unwrap();
1113 assert!(a.last_timestamp.is_some());
1114 }
1115
1116 #[test]
1117 fn test_recently_added_pvs() {
1118 let reg = PvRegistry::in_memory().unwrap();
1119 let before = SystemTime::now() - Duration::from_secs(1);
1120 reg.register_pv("PV:New", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1121 .unwrap();
1122
1123 let recent = reg.recently_added_pvs(before).unwrap();
1124 assert_eq!(recent.len(), 1);
1125 assert_eq!(recent[0].pv_name, "PV:New");
1126
1127 let future = SystemTime::now() + Duration::from_secs(3600);
1128 let none = reg.recently_added_pvs(future).unwrap();
1129 assert!(none.is_empty());
1130 }
1131
1132 #[test]
1133 fn test_recently_modified_pvs() {
1134 let reg = PvRegistry::in_memory().unwrap();
1135 reg.register_pv("PV:Mod", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1136 .unwrap();
1137 let before = SystemTime::now() - Duration::from_secs(1);
1138
1139 reg.set_status("PV:Mod", PvStatus::Paused).unwrap();
1141
1142 let recent = reg.recently_modified_pvs(before).unwrap();
1143 assert!(recent.iter().any(|r| r.pv_name == "PV:Mod"));
1144 }
1145
1146 #[test]
1147 fn test_update_sample_mode() {
1148 let reg = PvRegistry::in_memory().unwrap();
1149 reg.register_pv("PV:Mode", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1150 .unwrap();
1151
1152 let new_mode = SampleMode::Scan { period_secs: 5.0 };
1153 assert!(reg.update_sample_mode("PV:Mode", &new_mode).unwrap());
1154
1155 let r = reg.get_pv("PV:Mode").unwrap().unwrap();
1156 assert_eq!(r.sample_mode, SampleMode::Scan { period_secs: 5.0 });
1157 }
1158
1159 #[test]
1160 fn test_archive_fields_roundtrip() {
1161 let reg = PvRegistry::in_memory().unwrap();
1162 reg.register_pv(
1163 "PV:Fields",
1164 ArchDbType::ScalarDouble,
1165 &SampleMode::Monitor,
1166 1,
1167 )
1168 .unwrap();
1169
1170 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1172 assert!(r.archive_fields.is_empty());
1173
1174 let fields = vec!["HIHI".to_string(), "LOLO".to_string(), "EGU".to_string()];
1176 assert!(reg.update_archive_fields("PV:Fields", &fields).unwrap());
1177 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1178 assert_eq!(r.archive_fields, fields);
1179
1180 assert!(reg.update_archive_fields("PV:Fields", &[]).unwrap());
1182 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1183 assert!(r.archive_fields.is_empty());
1184 }
1185
1186 #[test]
1187 fn test_policy_name_roundtrip() {
1188 let reg = PvRegistry::in_memory().unwrap();
1189 reg.register_pv("PV:Pol", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1190 .unwrap();
1191
1192 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1193 assert!(r.policy_name.is_none());
1194
1195 assert!(reg.update_policy_name("PV:Pol", Some("fast")).unwrap());
1196 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1197 assert_eq!(r.policy_name.as_deref(), Some("fast"));
1198
1199 assert!(reg.update_policy_name("PV:Pol", None).unwrap());
1200 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1201 assert!(r.policy_name.is_none());
1202 }
1203
1204 #[test]
1205 fn test_import_pv_with_alias_and_fields() {
1206 let reg = PvRegistry::in_memory().unwrap();
1207 let fields = vec!["HIHI".to_string(), "LOLO".to_string()];
1208 reg.import_pv(
1209 "PV:Aliased",
1210 ArchDbType::ScalarDouble,
1211 &SampleMode::Monitor,
1212 1,
1213 PvStatus::Active,
1214 None,
1215 Some("3"),
1216 Some("mA"),
1217 Some("PV:Real"),
1218 &fields,
1219 Some("ring"),
1220 )
1221 .unwrap();
1222
1223 let r = reg.get_pv("PV:Aliased").unwrap().unwrap();
1224 assert_eq!(r.alias_for.as_deref(), Some("PV:Real"));
1225 assert_eq!(r.archive_fields, fields);
1226 assert_eq!(r.policy_name.as_deref(), Some("ring"));
1227 assert_eq!(r.prec.as_deref(), Some("3"));
1228 assert_eq!(r.egu.as_deref(), Some("mA"));
1229 }
1230
1231 #[test]
1232 fn test_migration_from_old_schema() {
1233 let conn = Connection::open_in_memory().unwrap();
1236 conn.execute_batch(
1237 "CREATE TABLE pv_info (
1238 pv_name TEXT PRIMARY KEY NOT NULL,
1239 dbr_type INTEGER NOT NULL,
1240 sample_mode TEXT NOT NULL DEFAULT 'monitor',
1241 sample_period REAL NOT NULL DEFAULT 0.0,
1242 status TEXT NOT NULL DEFAULT 'active',
1243 element_count INTEGER NOT NULL DEFAULT 1,
1244 last_timestamp TEXT,
1245 created_at TEXT NOT NULL,
1246 updated_at TEXT NOT NULL,
1247 prec TEXT,
1248 egu TEXT
1249 );",
1250 )
1251 .unwrap();
1252 let now = Utc::now().to_rfc3339();
1253 conn.execute(
1254 "INSERT INTO pv_info
1255 (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
1256 created_at, updated_at, prec, egu)
1257 VALUES (?1, ?2, 'monitor', 0.0, 'active', 1, ?3, ?3, NULL, NULL)",
1258 params!["PV:Legacy", ArchDbType::ScalarDouble as i32, now],
1259 )
1260 .unwrap();
1261
1262 let reg = PvRegistry {
1263 conn: Mutex::new(conn),
1264 };
1265 reg.init_schema().unwrap();
1267
1268 let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1269 assert_eq!(r.pv_name, "PV:Legacy");
1270 assert!(r.alias_for.is_none());
1271 assert!(r.archive_fields.is_empty());
1272 assert!(r.policy_name.is_none());
1273
1274 assert!(
1276 reg.update_archive_fields("PV:Legacy", &["HIHI".to_string()])
1277 .unwrap()
1278 );
1279 let r = reg.get_pv("PV:Legacy").unwrap().unwrap();
1280 assert_eq!(r.archive_fields, vec!["HIHI".to_string()]);
1281 }
1282
1283 #[test]
1284 fn test_aliases_basic() {
1285 let reg = PvRegistry::in_memory().unwrap();
1286 reg.register_pv(
1287 "RING:Current",
1288 ArchDbType::ScalarDouble,
1289 &SampleMode::Monitor,
1290 1,
1291 )
1292 .unwrap();
1293
1294 reg.add_alias("DEV:Current", "RING:Current").unwrap();
1296 assert_eq!(
1297 reg.resolve_alias("DEV:Current").unwrap().as_deref(),
1298 Some("RING:Current"),
1299 );
1300 assert!(reg.resolve_alias("RING:Current").unwrap().is_none()); assert!(reg.resolve_alias("Nonexistent").unwrap().is_none());
1302
1303 assert_eq!(reg.canonical_name("DEV:Current").unwrap(), "RING:Current");
1304 assert_eq!(reg.canonical_name("RING:Current").unwrap(), "RING:Current");
1305
1306 assert_eq!(
1308 reg.aliases_for("RING:Current").unwrap(),
1309 vec!["DEV:Current".to_string()],
1310 );
1311 assert_eq!(
1312 reg.all_aliases().unwrap(),
1313 vec![("DEV:Current".to_string(), "RING:Current".to_string())],
1314 );
1315
1316 let expanded = reg.expanded_pv_names().unwrap();
1318 assert!(expanded.contains(&"RING:Current".to_string()));
1319 assert!(expanded.contains(&"DEV:Current".to_string()));
1320
1321 reg.add_alias("DEV:Current", "RING:Current").unwrap();
1323 assert_eq!(reg.aliases_for("RING:Current").unwrap().len(), 1);
1324
1325 assert!(reg.remove_alias("DEV:Current").unwrap());
1327 assert!(reg.resolve_alias("DEV:Current").unwrap().is_none());
1328 assert!(!reg.remove_alias("DEV:Current").unwrap()); }
1330
1331 #[test]
1332 fn test_alias_conflicts() {
1333 let reg = PvRegistry::in_memory().unwrap();
1334 reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1335 .unwrap();
1336 reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1337 .unwrap();
1338
1339 assert!(reg.add_alias("Alias:X", "Nonexistent").is_err());
1341
1342 assert!(reg.add_alias("PV:A", "PV:A").is_err());
1344
1345 assert!(reg.add_alias("PV:B", "PV:A").is_err());
1347
1348 reg.add_alias("Alias:A", "PV:A").unwrap();
1350 assert!(reg.add_alias("Alias:Two", "Alias:A").is_err());
1351
1352 assert!(!reg.remove_alias("PV:A").unwrap());
1354 assert!(reg.get_pv("PV:A").unwrap().is_some());
1355 }
1356
1357 #[test]
1358 fn test_silent_pvs() {
1359 let reg = PvRegistry::in_memory().unwrap();
1360 reg.register_pv(
1361 "PV:Silent",
1362 ArchDbType::ScalarDouble,
1363 &SampleMode::Monitor,
1364 1,
1365 )
1366 .unwrap();
1367 reg.register_pv(
1368 "PV:NoData",
1369 ArchDbType::ScalarDouble,
1370 &SampleMode::Monitor,
1371 1,
1372 )
1373 .unwrap();
1374
1375 let old_time = SystemTime::now() - Duration::from_secs(7200);
1377 reg.update_last_timestamp("PV:Silent", old_time).unwrap();
1378
1379 let silent = reg.silent_pvs(Duration::from_secs(3600)).unwrap();
1381 assert_eq!(silent.len(), 1);
1382 assert_eq!(silent[0].pv_name, "PV:Silent");
1383 }
1384}