1use std::path::Path;
7use std::sync::Mutex;
8use std::time::{Duration, SystemTime};
9
10use chrono::{DateTime, Utc};
11use rusqlite::{Connection, OptionalExtension, params};
12use tracing::info;
13
14use crate::types::ArchDbType;
15
/// Protocol tag stored alongside each registry row.
///
/// The textual forms (`"ca"` / `"pva"`) are what ends up in the `protocol`
/// column and match the `ca://` / `pva://` name prefixes handled elsewhere in
/// this file.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Protocol {
    /// Serialised as `"ca"`; this is the default variant.
    #[default]
    Ca,
    /// Serialised as `"pva"`.
    Pva,
}

impl Protocol {
    /// Returns the lowercase tag used when persisting this protocol.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Ca => "ca",
            Self::Pva => "pva",
        }
    }

    /// Converts a persisted tag back into a [`Protocol`].
    ///
    /// The comparison is exact (case-sensitive); anything other than the two
    /// known tags yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        [Self::Ca, Self::Pva]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }
}
45
/// Reduces a user-supplied PV name to its bare form.
///
/// One leading `pva://` or `ca://` scheme is removed (`pva://` is tried
/// first), then a single trailing `.VAL` is removed. Any other field suffix is
/// left in place. The result borrows from `name`; nothing is allocated.
pub fn normalize_pv_name(name: &str) -> &str {
    let without_scheme = match name.strip_prefix("pva://") {
        Some(rest) => rest,
        None => match name.strip_prefix("ca://") {
            Some(rest) => rest,
            None => name,
        },
    };
    match without_scheme.strip_suffix(".VAL") {
        Some(base) => base,
        None => without_scheme,
    }
}
61
62pub fn parse_pv_with_protocol(name: &str) -> (Protocol, &str) {
65 if let Some(rest) = name.strip_prefix("pva://") {
66 (Protocol::Pva, rest.strip_suffix(".VAL").unwrap_or(rest))
67 } else if let Some(rest) = name.strip_prefix("ca://") {
68 (Protocol::Ca, rest.strip_suffix(".VAL").unwrap_or(rest))
69 } else {
70 (Protocol::Ca, name.strip_suffix(".VAL").unwrap_or(name))
71 }
72}
73
/// Returns the part of `name` before its last `.` when the part after it looks
/// like a field name.
///
/// The suffix qualifies only if it is non-empty and consists solely of ASCII
/// uppercase letters, ASCII digits and `_`. The prefix must be non-empty too.
/// `None` is returned when there is no `.` or either condition fails.
pub fn strip_field_suffix(name: &str) -> Option<&str> {
    name.rsplit_once('.')
        .filter(|(record, suffix)| !record.is_empty() && !suffix.is_empty())
        .filter(|(_, suffix)| {
            // Non-ASCII bytes never match, so byte-wise checking is equivalent
            // to checking each char.
            suffix
                .bytes()
                .all(|b| matches!(b, b'A'..=b'Z' | b'0'..=b'9' | b'_'))
        })
        .map(|(record, _)| record)
}
94
/// Decides whether `name` is acceptable as a registry key.
///
/// A name is rejected when any of the following holds:
/// - it is empty or longer than 256 bytes;
/// - it starts with `.`, `-`, `/` or `:`;
/// - splitting on `:` and `/` yields an empty, `.` or `..` component;
/// - it contains a backslash, whitespace, a control character, or one of
///   `| & ; ` $ " ' * ?`.
pub fn is_valid_pv_name(name: &str) -> bool {
    const MAX_BYTES: usize = 256;

    if name.is_empty() || name.len() > MAX_BYTES {
        return false;
    }
    if name.starts_with(['.', '-', '/', ':']) {
        return false;
    }

    let has_bad_component = name
        .split([':', '/'])
        .any(|part| matches!(part, "" | "." | ".."));
    if has_bad_component {
        return false;
    }

    name.chars().all(|c| {
        let forbidden = c.is_whitespace()
            || c.is_control()
            || matches!(
                c,
                '\0' | '\\' | '|' | '&' | ';' | '`' | '$' | '"' | '\'' | '*' | '?'
            );
        !forbidden
    })
}
142
/// Lifecycle state recorded in the `status` column of a registry row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PvStatus {
    /// Persisted as `"active"`.
    Active,
    /// Persisted as `"paused"`.
    Paused,
    /// Persisted as `"error"`.
    Error,
    /// Persisted as `"inactive"`.
    Inactive,
    /// Persisted as `"alias"`.
    Alias,
}

impl PvStatus {
    /// Returns the lowercase text stored in the database for this status.
    pub fn as_str(self) -> &'static str {
        match self {
            PvStatus::Active => "active",
            PvStatus::Paused => "paused",
            PvStatus::Error => "error",
            PvStatus::Inactive => "inactive",
            PvStatus::Alias => "alias",
        }
    }
}

impl std::str::FromStr for PvStatus {
    type Err = std::convert::Infallible;

    /// Parses the stored text form of a status.
    ///
    /// Parsing never fails: any unrecognised string is treated as
    /// [`PvStatus::Active`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let known = [
            Self::Active,
            Self::Paused,
            Self::Error,
            Self::Inactive,
            Self::Alias,
        ];
        let status = known
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .unwrap_or(Self::Active);
        Ok(status)
    }
}
183
/// How samples for a PV are acquired, as persisted in the `sample_mode` and
/// `sample_period` columns.
#[derive(Debug, Clone, PartialEq)]
pub enum SampleMode {
    /// Persisted as mode `"monitor"` with a period of `0.0`.
    Monitor,
    /// Persisted as mode `"scan"` together with `period_secs`.
    Scan { period_secs: f64 },
}

impl SampleMode {
    /// Flattens the mode into the `(sample_mode, sample_period)` column pair.
    fn to_db(&self) -> (&str, f64) {
        match *self {
            SampleMode::Scan { period_secs } => ("scan", period_secs),
            SampleMode::Monitor => ("monitor", 0.0),
        }
    }

    /// Rebuilds a mode from its column pair.
    ///
    /// Only the exact string `"scan"` produces [`SampleMode::Scan`]; every
    /// other value falls back to [`SampleMode::Monitor`] and `period` is
    /// ignored.
    fn from_db(mode: &str, period: f64) -> Self {
        if mode == "scan" {
            SampleMode::Scan {
                period_secs: period,
            }
        } else {
            SampleMode::Monitor
        }
    }
}
208
/// One row of the `pv_info` table, decoded into Rust types.
#[derive(Debug, Clone)]
pub struct PvRecord {
    /// Registry key (`pv_name`, the table's primary key).
    pub pv_name: String,
    /// Value type, stored in the database as an integer code.
    pub dbr_type: ArchDbType,
    /// Sampling configuration (`sample_mode` + `sample_period` columns).
    pub sample_mode: SampleMode,
    /// Current lifecycle status.
    pub status: PvStatus,
    /// Stored `element_count` column (table default is 1).
    pub element_count: i32,
    /// Last recorded sample time; `None` when the column is NULL or does not
    /// parse as RFC 3339.
    pub last_timestamp: Option<SystemTime>,
    /// When the row was first created.
    pub created_at: DateTime<Utc>,
    /// When the row was last written.
    pub updated_at: DateTime<Utc>,
    /// Optional `prec` text (presumably display precision — TODO confirm).
    pub prec: Option<String>,
    /// Optional `egu` text (presumably engineering units — TODO confirm).
    pub egu: Option<String>,
    /// Name of the PV this row is an alias for; `None` for ordinary PVs.
    pub alias_for: Option<String>,
    /// Extra field names, persisted as a JSON array; empty when the column is
    /// NULL.
    pub archive_fields: Vec<String>,
    /// Optional name of the policy associated with this PV.
    pub policy_name: Option<String>,
    /// Protocol tag stored in the `protocol` column (table default `'ca'`).
    pub protocol: Protocol,
}
235
/// SQLite-backed store of PV rows (`pv_info` table).
pub struct PvRegistry {
    /// The single SQLite connection; every method locks this mutex before
    /// touching the database.
    conn: Mutex<Connection>,
}
240
241impl PvRegistry {
242 fn lock_conn(&self) -> anyhow::Result<std::sync::MutexGuard<'_, Connection>> {
243 self.conn
244 .lock()
245 .map_err(|e| anyhow::anyhow!("PV registry lock poisoned: {e}"))
246 }
247
248 pub fn open(path: &Path) -> anyhow::Result<Self> {
250 let conn = Connection::open(path)?;
251 let registry = Self {
252 conn: Mutex::new(conn),
253 };
254 registry.init_schema()?;
255 Ok(registry)
256 }
257
258 pub fn in_memory() -> anyhow::Result<Self> {
260 let conn = Connection::open_in_memory()?;
261 let registry = Self {
262 conn: Mutex::new(conn),
263 };
264 registry.init_schema()?;
265 Ok(registry)
266 }
267
268 fn init_schema(&self) -> anyhow::Result<()> {
269 let conn = self.lock_conn()?;
270 conn.execute_batch(
273 "
274 CREATE TABLE IF NOT EXISTS pv_info (
275 pv_name TEXT PRIMARY KEY NOT NULL,
276 dbr_type INTEGER NOT NULL,
277 sample_mode TEXT NOT NULL DEFAULT 'monitor',
278 sample_period REAL NOT NULL DEFAULT 0.0,
279 status TEXT NOT NULL DEFAULT 'active',
280 element_count INTEGER NOT NULL DEFAULT 1,
281 last_timestamp TEXT,
282 created_at TEXT NOT NULL,
283 updated_at TEXT NOT NULL,
284 prec TEXT,
285 egu TEXT,
286 alias_for TEXT,
287 archive_fields TEXT,
288 policy_name TEXT,
289 protocol TEXT NOT NULL DEFAULT 'ca'
290 );
291
292 CREATE INDEX IF NOT EXISTS idx_pv_status ON pv_info(status);
293 CREATE INDEX IF NOT EXISTS idx_pv_prefix ON pv_info(pv_name COLLATE NOCASE);
294 ",
295 )?;
296 for stmt in [
301 "ALTER TABLE pv_info ADD COLUMN prec TEXT",
302 "ALTER TABLE pv_info ADD COLUMN egu TEXT",
303 "ALTER TABLE pv_info ADD COLUMN alias_for TEXT",
304 "ALTER TABLE pv_info ADD COLUMN archive_fields TEXT",
305 "ALTER TABLE pv_info ADD COLUMN policy_name TEXT",
306 "ALTER TABLE pv_info ADD COLUMN protocol TEXT NOT NULL DEFAULT 'ca'",
307 ] {
308 match conn.execute(stmt, []) {
309 Ok(_) => {}
310 Err(e) if is_duplicate_column_error(&e) => {}
311 Err(e) => return Err(e.into()),
312 }
313 }
314 conn.execute_batch(
317 "CREATE INDEX IF NOT EXISTS idx_pv_alias \
318 ON pv_info(alias_for) WHERE alias_for IS NOT NULL;",
319 )?;
320 info!("PV registry schema initialized");
321 Ok(())
322 }
323
324 pub fn register_pv(
327 &self,
328 pv_name: &str,
329 dbr_type: ArchDbType,
330 sample_mode: &SampleMode,
331 element_count: i32,
332 ) -> anyhow::Result<()> {
333 self.register_pv_with_protocol(
334 pv_name,
335 dbr_type,
336 sample_mode,
337 element_count,
338 Protocol::Ca,
339 )
340 }
341
342 pub fn register_pv_with_protocol(
345 &self,
346 pv_name: &str,
347 dbr_type: ArchDbType,
348 sample_mode: &SampleMode,
349 element_count: i32,
350 protocol: Protocol,
351 ) -> anyhow::Result<()> {
352 if !is_valid_pv_name(pv_name) {
353 anyhow::bail!("invalid PV name: {pv_name:?}");
354 }
355 let conn = self.lock_conn()?;
356 let now = Utc::now().to_rfc3339();
357 let (mode_str, period) = sample_mode.to_db();
358
359 conn.execute(
360 "INSERT OR REPLACE INTO pv_info
361 (pv_name, dbr_type, sample_mode, sample_period, status, element_count, created_at, updated_at, protocol)
362 VALUES (?1, ?2, ?3, ?4, 'active', ?5, COALESCE((SELECT created_at FROM pv_info WHERE pv_name = ?1), ?6), ?6, ?7)",
363 params![pv_name, dbr_type as i32, mode_str, period, element_count, now, protocol.as_str()],
364 )?;
365 Ok(())
366 }
367
368 pub fn set_status(&self, pv_name: &str, status: PvStatus) -> anyhow::Result<bool> {
370 let conn = self.lock_conn()?;
371 let now = Utc::now().to_rfc3339();
372 let rows = conn.execute(
373 "UPDATE pv_info SET status = ?1, updated_at = ?2 WHERE pv_name = ?3",
374 params![status.as_str(), now, pv_name],
375 )?;
376 Ok(rows > 0)
377 }
378
379 pub fn update_last_timestamp(
381 &self,
382 pv_name: &str,
383 timestamp: SystemTime,
384 ) -> anyhow::Result<()> {
385 let conn = self.lock_conn()?;
386 let dt = DateTime::<Utc>::from(timestamp).to_rfc3339();
387 let now = Utc::now().to_rfc3339();
388 conn.execute(
389 "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
390 params![dt, now, pv_name],
391 )?;
392 Ok(())
393 }
394
395 pub fn remove_pv(&self, pv_name: &str) -> anyhow::Result<bool> {
397 let conn = self.lock_conn()?;
398 let rows = conn.execute("DELETE FROM pv_info WHERE pv_name = ?1", params![pv_name])?;
399 Ok(rows > 0)
400 }
401
402 pub fn get_pv(&self, pv_name: &str) -> anyhow::Result<Option<PvRecord>> {
404 let conn = self.lock_conn()?;
405 conn.query_row(
406 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
407 last_timestamp, created_at, updated_at, prec, egu,
408 alias_for, archive_fields, policy_name, protocol
409 FROM pv_info WHERE pv_name = ?1",
410 params![pv_name],
411 row_to_record,
412 )
413 .optional()
414 .map_err(Into::into)
415 }
416
417 pub fn all_pv_names(&self) -> anyhow::Result<Vec<String>> {
420 let conn = self.lock_conn()?;
421 let mut stmt =
422 conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for IS NULL ORDER BY pv_name")?;
423 let names = stmt
424 .query_map([], |row| row.get(0))?
425 .collect::<Result<Vec<String>, _>>()?;
426 Ok(names)
427 }
428
429 pub fn pvs_by_status(&self, status: PvStatus) -> anyhow::Result<Vec<PvRecord>> {
432 let conn = self.lock_conn()?;
433 let mut stmt = conn.prepare(
434 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
435 last_timestamp, created_at, updated_at, prec, egu,
436 alias_for, archive_fields, policy_name, protocol
437 FROM pv_info WHERE status = ?1 ORDER BY pv_name",
438 )?;
439 let records = stmt
440 .query_map(params![status.as_str()], row_to_record)?
441 .collect::<Result<Vec<_>, _>>()?;
442 Ok(records)
443 }
444
445 pub fn matching_pvs(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
447 let conn = self.lock_conn()?;
448 let mut stmt = conn.prepare(
449 "SELECT pv_name FROM pv_info
450 WHERE pv_name GLOB ?1 AND alias_for IS NULL
451 ORDER BY pv_name",
452 )?;
453 let names = stmt
454 .query_map(params![pattern], |row| row.get(0))?
455 .collect::<Result<Vec<String>, _>>()?;
456 Ok(names)
457 }
458
459 pub fn matching_pvs_expanded(&self, pattern: &str) -> anyhow::Result<Vec<String>> {
465 let conn = self.lock_conn()?;
466 let mut stmt =
467 conn.prepare("SELECT pv_name FROM pv_info WHERE pv_name GLOB ?1 ORDER BY pv_name")?;
468 let names = stmt
469 .query_map(params![pattern], |row| row.get(0))?
470 .collect::<Result<Vec<String>, _>>()?;
471 Ok(names)
472 }
473
474 pub fn count(&self, status: Option<PvStatus>) -> anyhow::Result<u64> {
476 let conn = self.lock_conn()?;
477 let count: u64 = match status {
478 Some(s) => conn.query_row(
479 "SELECT COUNT(*) FROM pv_info
480 WHERE status = ?1 AND alias_for IS NULL",
481 params![s.as_str()],
482 |row| row.get(0),
483 )?,
484 None => conn.query_row(
485 "SELECT COUNT(*) FROM pv_info WHERE alias_for IS NULL",
486 [],
487 |row| row.get(0),
488 )?,
489 };
490 Ok(count)
491 }
492
493 pub fn batch_update_timestamps(&self, updates: &[(&str, SystemTime)]) -> anyhow::Result<()> {
495 let mut conn = self.lock_conn()?;
496 let tx = conn.transaction()?;
497 let now = Utc::now().to_rfc3339();
498 {
499 let mut stmt = tx.prepare(
500 "UPDATE pv_info SET last_timestamp = ?1, updated_at = ?2 WHERE pv_name = ?3",
501 )?;
502 for (pv_name, ts) in updates {
503 let dt = DateTime::<Utc>::from(*ts).to_rfc3339();
504 stmt.execute(params![dt, now, pv_name])?;
505 }
506 }
507 tx.commit()?;
508 Ok(())
509 }
510
511 pub fn recently_added_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
513 let conn = self.lock_conn()?;
514 let since_str = DateTime::<Utc>::from(since).to_rfc3339();
515 let mut stmt = conn.prepare(
516 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
517 last_timestamp, created_at, updated_at, prec, egu,
518 alias_for, archive_fields, policy_name, protocol
519 FROM pv_info WHERE created_at >= ?1 AND alias_for IS NULL
520 ORDER BY created_at DESC",
521 )?;
522 let records = stmt
523 .query_map(params![since_str], row_to_record)?
524 .collect::<Result<Vec<_>, _>>()?;
525 Ok(records)
526 }
527
528 pub fn recently_modified_pvs(&self, since: SystemTime) -> anyhow::Result<Vec<PvRecord>> {
530 let conn = self.lock_conn()?;
531 let since_str = DateTime::<Utc>::from(since).to_rfc3339();
532 let mut stmt = conn.prepare(
533 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
534 last_timestamp, created_at, updated_at, prec, egu,
535 alias_for, archive_fields, policy_name, protocol
536 FROM pv_info WHERE updated_at >= ?1 AND alias_for IS NULL
537 ORDER BY updated_at DESC",
538 )?;
539 let records = stmt
540 .query_map(params![since_str], row_to_record)?
541 .collect::<Result<Vec<_>, _>>()?;
542 Ok(records)
543 }
544
545 pub fn update_sample_mode(&self, pv_name: &str, mode: &SampleMode) -> anyhow::Result<bool> {
547 let conn = self.lock_conn()?;
548 let now = Utc::now().to_rfc3339();
549 let (mode_str, period) = mode.to_db();
550 let rows = conn.execute(
551 "UPDATE pv_info SET sample_mode = ?1, sample_period = ?2, updated_at = ?3 WHERE pv_name = ?4",
552 params![mode_str, period, now, pv_name],
553 )?;
554 Ok(rows > 0)
555 }
556
557 pub fn update_metadata(
559 &self,
560 pv_name: &str,
561 prec: Option<&str>,
562 egu: Option<&str>,
563 ) -> anyhow::Result<bool> {
564 let conn = self.lock_conn()?;
565 let now = Utc::now().to_rfc3339();
566 let rows = conn.execute(
567 "UPDATE pv_info SET prec = COALESCE(?1, prec), egu = COALESCE(?2, egu), updated_at = ?3 WHERE pv_name = ?4",
568 params![prec, egu, now, pv_name],
569 )?;
570 Ok(rows > 0)
571 }
572
573 #[allow(clippy::too_many_arguments)]
576 pub fn import_pv(
577 &self,
578 pv_name: &str,
579 dbr_type: ArchDbType,
580 sample_mode: &SampleMode,
581 element_count: i32,
582 status: PvStatus,
583 created_at: Option<&str>,
584 prec: Option<&str>,
585 egu: Option<&str>,
586 alias_for: Option<&str>,
587 archive_fields: &[String],
588 policy_name: Option<&str>,
589 ) -> anyhow::Result<()> {
590 self.import_pv_with_protocol(
591 pv_name,
592 dbr_type,
593 sample_mode,
594 element_count,
595 status,
596 created_at,
597 prec,
598 egu,
599 alias_for,
600 archive_fields,
601 policy_name,
602 Protocol::Ca,
603 )
604 }
605
606 #[allow(clippy::too_many_arguments)]
609 pub fn import_pv_with_protocol(
610 &self,
611 pv_name: &str,
612 dbr_type: ArchDbType,
613 sample_mode: &SampleMode,
614 element_count: i32,
615 status: PvStatus,
616 created_at: Option<&str>,
617 prec: Option<&str>,
618 egu: Option<&str>,
619 alias_for: Option<&str>,
620 archive_fields: &[String],
621 policy_name: Option<&str>,
622 protocol: Protocol,
623 ) -> anyhow::Result<()> {
624 if !is_valid_pv_name(pv_name) {
625 anyhow::bail!("invalid PV name: {pv_name:?}");
626 }
627 if let Some(target) = alias_for
628 && !is_valid_pv_name(target)
629 {
630 anyhow::bail!("invalid alias target: {target:?}");
631 }
632 let conn = self.lock_conn()?;
633 let now = Utc::now().to_rfc3339();
634 let (mode_str, period) = sample_mode.to_db();
635 let created = created_at.unwrap_or(&now);
636 let archive_fields_json = if archive_fields.is_empty() {
637 None
638 } else {
639 Some(serde_json::to_string(archive_fields)?)
640 };
641
642 conn.execute(
643 "INSERT OR REPLACE INTO pv_info
644 (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
645 created_at, updated_at, prec, egu, alias_for, archive_fields, policy_name, protocol)
646 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
647 params![
648 pv_name,
649 dbr_type as i32,
650 mode_str,
651 period,
652 status.as_str(),
653 element_count,
654 created,
655 now,
656 prec,
657 egu,
658 alias_for,
659 archive_fields_json,
660 policy_name,
661 protocol.as_str(),
662 ],
663 )?;
664 Ok(())
665 }
666
667 pub fn update_archive_fields(&self, pv_name: &str, fields: &[String]) -> anyhow::Result<bool> {
669 let conn = self.lock_conn()?;
670 let now = Utc::now().to_rfc3339();
671 let json = if fields.is_empty() {
672 None
673 } else {
674 Some(serde_json::to_string(fields)?)
675 };
676 let rows = conn.execute(
677 "UPDATE pv_info SET archive_fields = ?1, updated_at = ?2 WHERE pv_name = ?3",
678 params![json, now, pv_name],
679 )?;
680 Ok(rows > 0)
681 }
682
683 pub fn update_policy_name(
685 &self,
686 pv_name: &str,
687 policy_name: Option<&str>,
688 ) -> anyhow::Result<bool> {
689 let conn = self.lock_conn()?;
690 let now = Utc::now().to_rfc3339();
691 let rows = conn.execute(
692 "UPDATE pv_info SET policy_name = ?1, updated_at = ?2 WHERE pv_name = ?3",
693 params![policy_name, now, pv_name],
694 )?;
695 Ok(rows > 0)
696 }
697
698 pub fn add_alias(&self, alias: &str, target: &str) -> anyhow::Result<()> {
703 if alias == target {
704 anyhow::bail!("alias and target must differ");
705 }
706 if !is_valid_pv_name(alias) {
707 anyhow::bail!("invalid alias name: {alias:?}");
708 }
709 if !is_valid_pv_name(target) {
710 anyhow::bail!("invalid alias target: {target:?}");
711 }
712 let conn = self.lock_conn()?;
713 let row: Option<(i32, String, f64, i32, Option<String>)> = conn
715 .query_row(
716 "SELECT dbr_type, sample_mode, sample_period, element_count, alias_for
717 FROM pv_info WHERE pv_name = ?1",
718 params![target],
719 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
720 )
721 .optional()?;
722 let (dbr_type, mode, period, ec, target_alias) =
723 row.ok_or_else(|| anyhow::anyhow!("target PV '{target}' not found"))?;
724 if target_alias.is_some() {
725 anyhow::bail!(
726 "target PV '{target}' is itself an alias; aliases of aliases are not allowed"
727 );
728 }
729 let existing: Option<Option<String>> = conn
731 .query_row(
732 "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
733 params![alias],
734 |r| r.get(0),
735 )
736 .optional()?;
737 if let Some(existing_alias) = existing {
738 if existing_alias.as_deref() == Some(target) {
739 return Ok(()); }
741 anyhow::bail!("'{alias}' already exists in registry");
742 }
743 let now = Utc::now().to_rfc3339();
744 conn.execute(
745 "INSERT INTO pv_info
746 (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
747 created_at, updated_at, alias_for)
748 VALUES (?1, ?2, ?3, ?4, 'alias', ?5, ?6, ?6, ?7)",
749 params![alias, dbr_type, mode, period, ec, now, target],
750 )?;
751 Ok(())
752 }
753
754 pub fn remove_alias(&self, alias: &str) -> anyhow::Result<bool> {
757 let conn = self.lock_conn()?;
758 let rows = conn.execute(
759 "DELETE FROM pv_info WHERE pv_name = ?1 AND alias_for IS NOT NULL",
760 params![alias],
761 )?;
762 Ok(rows > 0)
763 }
764
765 pub fn resolve_alias(&self, name: &str) -> anyhow::Result<Option<String>> {
768 let conn = self.lock_conn()?;
769 let row: Option<Option<String>> = conn
770 .query_row(
771 "SELECT alias_for FROM pv_info WHERE pv_name = ?1",
772 params![name],
773 |r| r.get(0),
774 )
775 .optional()?;
776 Ok(row.flatten())
777 }
778
779 pub fn canonical_name(&self, name: &str) -> anyhow::Result<String> {
782 Ok(self
783 .resolve_alias(name)?
784 .unwrap_or_else(|| name.to_string()))
785 }
786
787 pub fn aliases_for(&self, target: &str) -> anyhow::Result<Vec<String>> {
789 let conn = self.lock_conn()?;
790 let mut stmt =
791 conn.prepare("SELECT pv_name FROM pv_info WHERE alias_for = ?1 ORDER BY pv_name")?;
792 let names = stmt
793 .query_map(params![target], |row| row.get(0))?
794 .collect::<Result<Vec<String>, _>>()?;
795 Ok(names)
796 }
797
798 pub fn all_aliases(&self) -> anyhow::Result<Vec<(String, String)>> {
800 let conn = self.lock_conn()?;
801 let mut stmt = conn.prepare(
802 "SELECT pv_name, alias_for FROM pv_info
803 WHERE alias_for IS NOT NULL ORDER BY pv_name",
804 )?;
805 let rows = stmt
806 .query_map([], |row| {
807 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
808 })?
809 .collect::<Result<Vec<_>, _>>()?;
810 Ok(rows)
811 }
812
813 pub fn expanded_pv_names(&self) -> anyhow::Result<Vec<String>> {
815 let conn = self.lock_conn()?;
816 let mut stmt = conn.prepare("SELECT pv_name FROM pv_info ORDER BY pv_name")?;
817 let names = stmt
818 .query_map([], |row| row.get(0))?
819 .collect::<Result<Vec<String>, _>>()?;
820 Ok(names)
821 }
822
823 pub fn all_records(&self) -> anyhow::Result<Vec<PvRecord>> {
825 let conn = self.lock_conn()?;
826 let mut stmt = conn.prepare(
827 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
828 last_timestamp, created_at, updated_at, prec, egu,
829 alias_for, archive_fields, policy_name, protocol
830 FROM pv_info ORDER BY pv_name",
831 )?;
832 let records = stmt
833 .query_map([], row_to_record)?
834 .collect::<Result<Vec<_>, _>>()?;
835 Ok(records)
836 }
837
838 pub fn silent_pvs(&self, threshold: Duration) -> anyhow::Result<Vec<PvRecord>> {
842 let conn = self.lock_conn()?;
843 let cutoff = SystemTime::now()
844 .checked_sub(threshold)
845 .unwrap_or(SystemTime::UNIX_EPOCH);
846 let cutoff_str = DateTime::<Utc>::from(cutoff).to_rfc3339();
847 let mut stmt = conn.prepare(
848 "SELECT pv_name, dbr_type, sample_mode, sample_period, status, element_count,
849 last_timestamp, created_at, updated_at, prec, egu,
850 alias_for, archive_fields, policy_name, protocol
851 FROM pv_info WHERE last_timestamp IS NOT NULL AND last_timestamp < ?1
852 AND alias_for IS NULL
853 ORDER BY last_timestamp ASC",
854 )?;
855 let records = stmt
856 .query_map(params![cutoff_str], row_to_record)?
857 .collect::<Result<Vec<_>, _>>()?;
858 Ok(records)
859 }
860}
861
862fn row_to_record(row: &rusqlite::Row) -> rusqlite::Result<PvRecord> {
863 let pv_name: String = row.get(0)?;
864 let dbr_type_i: i32 = row.get(1)?;
865 let sample_mode_str: String = row.get(2)?;
866 let sample_period: f64 = row.get(3)?;
867 let status_str: String = row.get(4)?;
868 let element_count: i32 = row.get(5)?;
869 let last_ts_str: Option<String> = row.get(6)?;
870 let created_str: String = row.get(7)?;
871 let updated_str: String = row.get(8)?;
872 let prec: Option<String> = row.get(9).unwrap_or(None);
873 let egu: Option<String> = row.get(10).unwrap_or(None);
874 let alias_for: Option<String> = row.get(11).unwrap_or(None);
875 let archive_fields_json: Option<String> = row.get(12).unwrap_or(None);
876 let policy_name: Option<String> = row.get(13).unwrap_or(None);
877 let protocol_str: Option<String> = row.get(14).unwrap_or(None);
880 let protocol = protocol_str
881 .as_deref()
882 .and_then(Protocol::parse)
883 .unwrap_or_default();
884
885 let last_timestamp = last_ts_str.and_then(|s| {
886 DateTime::parse_from_rfc3339(&s)
887 .ok()
888 .map(|dt| dt.with_timezone(&Utc).into())
889 });
890
891 let archive_fields = archive_fields_json
892 .as_deref()
893 .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok())
894 .unwrap_or_default();
895
896 Ok(PvRecord {
897 pv_name,
898 dbr_type: ArchDbType::from_i32(dbr_type_i).unwrap_or(ArchDbType::ScalarDouble),
899 sample_mode: SampleMode::from_db(&sample_mode_str, sample_period),
900 status: status_str.parse().unwrap_or(PvStatus::Active),
901 element_count,
902 last_timestamp,
903 created_at: DateTime::parse_from_rfc3339(&created_str)
904 .map(|dt| dt.with_timezone(&Utc))
905 .unwrap_or_else(|_| Utc::now()),
906 updated_at: DateTime::parse_from_rfc3339(&updated_str)
907 .map(|dt| dt.with_timezone(&Utc))
908 .unwrap_or_else(|_| Utc::now()),
909 prec,
910 egu,
911 alias_for,
912 archive_fields,
913 policy_name,
914 protocol,
915 })
916}
917
918fn is_duplicate_column_error(e: &rusqlite::Error) -> bool {
922 matches!(
923 e,
924 rusqlite::Error::SqliteFailure(_, Some(msg))
925 if msg.starts_with("duplicate column name")
926 )
927}
928
929#[cfg(test)]
930mod tests {
931 use super::*;
932
933 #[test]
934 fn invalid_pv_names_rejected() {
935 assert!(!is_valid_pv_name("../etc/passwd"));
937 assert!(!is_valid_pv_name("foo/../bar"));
938 assert!(!is_valid_pv_name("foo:..:bar"));
939 assert!(!is_valid_pv_name("foo/./bar"));
940 assert!(!is_valid_pv_name("/etc/passwd"));
943 assert!(!is_valid_pv_name(":SIM:foo"));
944 assert!(!is_valid_pv_name(":foo"));
945 assert!(!is_valid_pv_name("foo::bar"));
947 assert!(!is_valid_pv_name("foo//bar"));
948 assert!(!is_valid_pv_name("foo:"));
949 assert!(!is_valid_pv_name("foo/"));
950 assert!(!is_valid_pv_name("foo;rm -rf /"));
952 assert!(!is_valid_pv_name("foo|bar"));
953 assert!(!is_valid_pv_name("foo`x`"));
954 assert!(!is_valid_pv_name("foo$BAR"));
955 assert!(!is_valid_pv_name("foo bar"));
956 assert!(!is_valid_pv_name("foo\nbar"));
957 assert!(!is_valid_pv_name(""));
959 assert!(!is_valid_pv_name(".hidden"));
960 assert!(!is_valid_pv_name("-leading-dash"));
961 assert!(!is_valid_pv_name(&"x".repeat(257)));
962 }
963
964 #[test]
965 fn valid_pv_names_accepted() {
966 assert!(is_valid_pv_name("SIM:Sine"));
968 assert!(is_valid_pv_name("XF:31IDA-OP{Tbl-Ax:X1}Mtr"));
969 assert!(is_valid_pv_name("ACC1-001-RFCAV-01:V<x>"));
970 assert!(is_valid_pv_name("PV.HIHI"));
971 assert!(is_valid_pv_name("BL_X+Y"));
972 assert!(is_valid_pv_name("a"));
973 assert!(is_valid_pv_name(&"x".repeat(256)));
974 }
975
976 #[test]
977 fn strip_field_suffix_basics() {
978 assert_eq!(strip_field_suffix("BASE.HIHI"), Some("BASE"));
979 assert_eq!(strip_field_suffix("BASE.LOLO"), Some("BASE"));
980 assert_eq!(strip_field_suffix("FOO.BAR_99"), Some("FOO"));
981 assert_eq!(strip_field_suffix("BASE"), None);
983 assert_eq!(strip_field_suffix("BASE.hihi"), None);
985 assert_eq!(strip_field_suffix("BASE.Hihi"), None);
986 assert_eq!(strip_field_suffix(".HIHI"), None);
988 assert_eq!(strip_field_suffix("BASE."), None);
989 }
990
991 #[test]
992 fn test_register_and_get() {
993 let reg = PvRegistry::in_memory().unwrap();
994 reg.register_pv(
995 "SIM:Sine",
996 ArchDbType::ScalarDouble,
997 &SampleMode::Monitor,
998 1,
999 )
1000 .unwrap();
1001
1002 let record = reg.get_pv("SIM:Sine").unwrap().unwrap();
1003 assert_eq!(record.pv_name, "SIM:Sine");
1004 assert_eq!(record.dbr_type, ArchDbType::ScalarDouble);
1005 assert_eq!(record.status, PvStatus::Active);
1006 }
1007
1008 #[test]
1009 fn test_status_transitions() {
1010 let reg = PvRegistry::in_memory().unwrap();
1011 reg.register_pv(
1012 "SIM:Test",
1013 ArchDbType::ScalarDouble,
1014 &SampleMode::Monitor,
1015 1,
1016 )
1017 .unwrap();
1018
1019 reg.set_status("SIM:Test", PvStatus::Paused).unwrap();
1020 let r = reg.get_pv("SIM:Test").unwrap().unwrap();
1021 assert_eq!(r.status, PvStatus::Paused);
1022
1023 reg.set_status("SIM:Test", PvStatus::Active).unwrap();
1024 let r = reg.get_pv("SIM:Test").unwrap().unwrap();
1025 assert_eq!(r.status, PvStatus::Active);
1026 }
1027
1028 #[test]
1029 fn test_pattern_matching() {
1030 let reg = PvRegistry::in_memory().unwrap();
1031 reg.register_pv(
1032 "SIM:Sine",
1033 ArchDbType::ScalarDouble,
1034 &SampleMode::Monitor,
1035 1,
1036 )
1037 .unwrap();
1038 reg.register_pv(
1039 "SIM:Cosine",
1040 ArchDbType::ScalarDouble,
1041 &SampleMode::Monitor,
1042 1,
1043 )
1044 .unwrap();
1045 reg.register_pv(
1046 "EXP:BL1:run:active",
1047 ArchDbType::ScalarEnum,
1048 &SampleMode::Monitor,
1049 1,
1050 )
1051 .unwrap();
1052 reg.register_pv(
1053 "EXP:BL1:motor:th:readback",
1054 ArchDbType::ScalarDouble,
1055 &SampleMode::Monitor,
1056 1,
1057 )
1058 .unwrap();
1059
1060 let sim = reg.matching_pvs("SIM:*").unwrap();
1061 assert_eq!(sim.len(), 2);
1062
1063 let exp = reg.matching_pvs("EXP:BL1:*").unwrap();
1064 assert_eq!(exp.len(), 2);
1065
1066 let motor = reg.matching_pvs("EXP:*:motor:*").unwrap();
1067 assert_eq!(motor.len(), 1);
1068 }
1069
1070 #[test]
1071 fn test_count_and_list() {
1072 let reg = PvRegistry::in_memory().unwrap();
1073 for i in 0..100 {
1074 reg.register_pv(
1075 &format!("PV:Test:{i:04}"),
1076 ArchDbType::ScalarDouble,
1077 &SampleMode::Monitor,
1078 1,
1079 )
1080 .unwrap();
1081 }
1082
1083 assert_eq!(reg.count(None).unwrap(), 100);
1084 assert_eq!(reg.count(Some(PvStatus::Active)).unwrap(), 100);
1085
1086 let names = reg.all_pv_names().unwrap();
1087 assert_eq!(names.len(), 100);
1088 }
1089
1090 #[test]
1091 fn test_remove_pv() {
1092 let reg = PvRegistry::in_memory().unwrap();
1093 reg.register_pv(
1094 "SIM:Gone",
1095 ArchDbType::ScalarDouble,
1096 &SampleMode::Monitor,
1097 1,
1098 )
1099 .unwrap();
1100 assert!(reg.get_pv("SIM:Gone").unwrap().is_some());
1101
1102 reg.remove_pv("SIM:Gone").unwrap();
1103 assert!(reg.get_pv("SIM:Gone").unwrap().is_none());
1104 }
1105
1106 #[test]
1107 fn test_batch_update_timestamps() {
1108 let reg = PvRegistry::in_memory().unwrap();
1109 reg.register_pv("PV:A", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1110 .unwrap();
1111 reg.register_pv("PV:B", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1112 .unwrap();
1113
1114 let now = SystemTime::now();
1115 reg.batch_update_timestamps(&[("PV:A", now), ("PV:B", now)])
1116 .unwrap();
1117
1118 let a = reg.get_pv("PV:A").unwrap().unwrap();
1119 assert!(a.last_timestamp.is_some());
1120 }
1121
1122 #[test]
1123 fn test_recently_added_pvs() {
1124 let reg = PvRegistry::in_memory().unwrap();
1125 let before = SystemTime::now() - Duration::from_secs(1);
1126 reg.register_pv("PV:New", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1127 .unwrap();
1128
1129 let recent = reg.recently_added_pvs(before).unwrap();
1130 assert_eq!(recent.len(), 1);
1131 assert_eq!(recent[0].pv_name, "PV:New");
1132
1133 let future = SystemTime::now() + Duration::from_secs(3600);
1134 let none = reg.recently_added_pvs(future).unwrap();
1135 assert!(none.is_empty());
1136 }
1137
1138 #[test]
1139 fn test_recently_modified_pvs() {
1140 let reg = PvRegistry::in_memory().unwrap();
1141 reg.register_pv("PV:Mod", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1142 .unwrap();
1143 let before = SystemTime::now() - Duration::from_secs(1);
1144
1145 reg.set_status("PV:Mod", PvStatus::Paused).unwrap();
1147
1148 let recent = reg.recently_modified_pvs(before).unwrap();
1149 assert!(recent.iter().any(|r| r.pv_name == "PV:Mod"));
1150 }
1151
1152 #[test]
1153 fn test_update_sample_mode() {
1154 let reg = PvRegistry::in_memory().unwrap();
1155 reg.register_pv("PV:Mode", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1156 .unwrap();
1157
1158 let new_mode = SampleMode::Scan { period_secs: 5.0 };
1159 assert!(reg.update_sample_mode("PV:Mode", &new_mode).unwrap());
1160
1161 let r = reg.get_pv("PV:Mode").unwrap().unwrap();
1162 assert_eq!(r.sample_mode, SampleMode::Scan { period_secs: 5.0 });
1163 }
1164
1165 #[test]
1166 fn test_archive_fields_roundtrip() {
1167 let reg = PvRegistry::in_memory().unwrap();
1168 reg.register_pv(
1169 "PV:Fields",
1170 ArchDbType::ScalarDouble,
1171 &SampleMode::Monitor,
1172 1,
1173 )
1174 .unwrap();
1175
1176 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1178 assert!(r.archive_fields.is_empty());
1179
1180 let fields = vec!["HIHI".to_string(), "LOLO".to_string(), "EGU".to_string()];
1182 assert!(reg.update_archive_fields("PV:Fields", &fields).unwrap());
1183 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1184 assert_eq!(r.archive_fields, fields);
1185
1186 assert!(reg.update_archive_fields("PV:Fields", &[]).unwrap());
1188 let r = reg.get_pv("PV:Fields").unwrap().unwrap();
1189 assert!(r.archive_fields.is_empty());
1190 }
1191
1192 #[test]
1193 fn test_policy_name_roundtrip() {
1194 let reg = PvRegistry::in_memory().unwrap();
1195 reg.register_pv("PV:Pol", ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
1196 .unwrap();
1197
1198 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1199 assert!(r.policy_name.is_none());
1200
1201 assert!(reg.update_policy_name("PV:Pol", Some("fast")).unwrap());
1202 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1203 assert_eq!(r.policy_name.as_deref(), Some("fast"));
1204
1205 assert!(reg.update_policy_name("PV:Pol", None).unwrap());
1206 let r = reg.get_pv("PV:Pol").unwrap().unwrap();
1207 assert!(r.policy_name.is_none());
1208 }
1209
#[test]
fn test_import_pv_with_alias_and_fields() {
    // Imports a PV with every optional attribute populated and checks that
    // each one is reported back by `get_pv`.
    let registry = PvRegistry::in_memory().unwrap();
    let archive_fields: Vec<String> = ["HIHI", "LOLO"]
        .iter()
        .map(|field| field.to_string())
        .collect();

    registry
        .import_pv(
            "PV:Aliased",
            ArchDbType::ScalarDouble,
            &SampleMode::Monitor,
            1,
            PvStatus::Active,
            // NOTE(review): the meaning of this positional `None` is not
            // visible from the test; confirm against `import_pv`'s signature.
            None,
            Some("3"),       // read back below as `prec`
            Some("mA"),      // read back below as `egu`
            Some("PV:Real"), // read back below as `alias_for`
            &archive_fields, // read back below as `archive_fields`
            Some("ring"),    // read back below as `policy_name`
        )
        .unwrap();

    let imported = registry.get_pv("PV:Aliased").unwrap().unwrap();

    // Display metadata.
    assert_eq!(imported.prec.as_deref(), Some("3"));
    assert_eq!(imported.egu.as_deref(), Some("mA"));

    // Alias, archive fields and policy.
    assert_eq!(imported.alias_for.as_deref(), Some("PV:Real"));
    assert_eq!(imported.archive_fields, archive_fields);
    assert_eq!(imported.policy_name.as_deref(), Some("ring"));
}
1236
#[test]
fn test_migration_from_old_schema() {
    // Build a database by hand whose `pv_info` table contains only the
    // columns listed below, insert one row, then let `init_schema` run over
    // it and verify the row is still readable and updatable.
    let legacy_db = Connection::open_in_memory().unwrap();
    legacy_db
        .execute_batch(
            "CREATE TABLE pv_info (
                pv_name TEXT PRIMARY KEY NOT NULL,
                dbr_type INTEGER NOT NULL,
                sample_mode TEXT NOT NULL DEFAULT 'monitor',
                sample_period REAL NOT NULL DEFAULT 0.0,
                status TEXT NOT NULL DEFAULT 'active',
                element_count INTEGER NOT NULL DEFAULT 1,
                last_timestamp TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                prec TEXT,
                egu TEXT
            );",
        )
        .unwrap();

    // The same RFC 3339 timestamp is used for both created_at and updated_at
    // (`?3` appears twice in the statement).
    let stamp = Utc::now().to_rfc3339();
    legacy_db
        .execute(
            "INSERT INTO pv_info
                (pv_name, dbr_type, sample_mode, sample_period, status, element_count,
                 created_at, updated_at, prec, egu)
             VALUES (?1, ?2, 'monitor', 0.0, 'active', 1, ?3, ?3, NULL, NULL)",
            params!["PV:Legacy", ArchDbType::ScalarDouble as i32, stamp],
        )
        .unwrap();

    // Wrap the pre-populated connection directly instead of going through a
    // constructor, then run schema initialisation on top of the old table.
    let registry = PvRegistry {
        conn: Mutex::new(legacy_db),
    };
    registry.init_schema().unwrap();

    let fetch = || registry.get_pv("PV:Legacy").unwrap().unwrap();

    // The legacy row survives, and the attributes the old table had no
    // column for come back empty.
    let migrated = fetch();
    assert_eq!(migrated.pv_name, "PV:Legacy");
    assert!(migrated.alias_for.is_none());
    assert!(migrated.archive_fields.is_empty());
    assert!(migrated.policy_name.is_none());

    // Archive fields can be written to and read back from the legacy row.
    let hihi_only = vec![String::from("HIHI")];
    assert!(
        registry
            .update_archive_fields("PV:Legacy", &hihi_only)
            .unwrap()
    );
    assert_eq!(fetch().archive_fields, vec!["HIHI".to_string()]);
}
1288
#[test]
fn test_aliases_basic() {
    // Walks one alias through its lifecycle: add, resolve, list, re-add,
    // remove.
    const REAL: &str = "RING:Current";
    const ALIAS: &str = "DEV:Current";

    let registry = PvRegistry::in_memory().unwrap();
    registry
        .register_pv(REAL, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
        .unwrap();

    registry.add_alias(ALIAS, REAL).unwrap();

    // Only the alias resolves; the real PV and an unknown name yield `None`.
    assert_eq!(
        registry.resolve_alias(ALIAS).unwrap().as_deref(),
        Some(REAL),
    );
    assert!(registry.resolve_alias(REAL).unwrap().is_none());
    assert!(registry.resolve_alias("Nonexistent").unwrap().is_none());

    // Both the alias and the real name canonicalise to the real name.
    assert_eq!(registry.canonical_name(ALIAS).unwrap(), REAL);
    assert_eq!(registry.canonical_name(REAL).unwrap(), REAL);

    // Listing: per-target and global views.
    assert_eq!(
        registry.aliases_for(REAL).unwrap(),
        vec![ALIAS.to_string()],
    );
    assert_eq!(
        registry.all_aliases().unwrap(),
        vec![(ALIAS.to_string(), REAL.to_string())],
    );

    // The expanded name list contains the real PV and its alias.
    let expanded = registry.expanded_pv_names().unwrap();
    for name in &[REAL, ALIAS] {
        assert!(expanded.contains(&name.to_string()));
    }

    // Adding the identical alias again succeeds and does not duplicate it.
    registry.add_alias(ALIAS, REAL).unwrap();
    assert_eq!(registry.aliases_for(REAL).unwrap().len(), 1);

    // First removal reports true; afterwards the alias no longer resolves
    // and a second removal reports false.
    assert!(registry.remove_alias(ALIAS).unwrap());
    assert!(registry.resolve_alias(ALIAS).unwrap().is_none());
    assert!(!registry.remove_alias(ALIAS).unwrap());
}
1336
#[test]
fn test_alias_conflicts() {
    // Checks the alias requests that must be refused, and that
    // `remove_alias` does not touch a real PV.
    let registry = PvRegistry::in_memory().unwrap();
    for &pv in &["PV:A", "PV:B"] {
        registry
            .register_pv(pv, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
            .unwrap();
    }

    // (alias, target) pairs that are expected to be rejected, in order:
    //   - target was never registered in this test,
    //   - alias and target are the same name,
    //   - alias name is already a registered PV.
    let rejected = [
        ("Alias:X", "Nonexistent"),
        ("PV:A", "PV:A"),
        ("PV:B", "PV:A"),
    ];
    for &(alias, target) in rejected.iter() {
        assert!(registry.add_alias(alias, target).is_err());
    }

    // Once a valid alias exists, pointing a second alias at it is rejected.
    registry.add_alias("Alias:A", "PV:A").unwrap();
    assert!(registry.add_alias("Alias:Two", "Alias:A").is_err());

    // `remove_alias` on a real PV name reports false and leaves the PV
    // registered.
    assert!(!registry.remove_alias("PV:A").unwrap());
    assert!(registry.get_pv("PV:A").unwrap().is_some());
}
1362
#[test]
fn test_silent_pvs() {
    // Two PVs are registered; only one receives a last-timestamp, and that
    // timestamp is older than the threshold passed to `silent_pvs`.
    let registry = PvRegistry::in_memory().unwrap();
    for &pv in &["PV:Silent", "PV:NoData"] {
        registry
            .register_pv(pv, ArchDbType::ScalarDouble, &SampleMode::Monitor, 1)
            .unwrap();
    }

    // Stamp "PV:Silent" two hours in the past; "PV:NoData" gets no timestamp.
    let two_hours = Duration::from_secs(2 * 60 * 60);
    let stale_stamp = SystemTime::now() - two_hours;
    registry
        .update_last_timestamp("PV:Silent", stale_stamp)
        .unwrap();

    // With a one-hour threshold exactly one PV is reported, and it is the
    // one that was given the stale timestamp.
    let one_hour = Duration::from_secs(60 * 60);
    let silent = registry.silent_pvs(one_hour).unwrap();
    assert_eq!(silent.len(), 1);
    let reported = &silent[0];
    assert_eq!(reported.pv_name, "PV:Silent");
}
1390}