pub mod db_access;
mod field_io;
mod link_set;
mod links;
mod processing;
mod scan_index;
pub use link_set::{DynLinkSet, LinkSet, LinkSetRegistry};
use crate::runtime::sync::RwLock;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use crate::server::pv::ProcessVariable;
use crate::server::record::{Record, RecordInstance, ScanType};
use crate::types::EpicsValue;
/// Splits a PV name into `(base, field)` at its last `.`.
///
/// When `name` contains no `.`, the whole string is returned as the base and
/// the field defaults to `"VAL"`. Only the final `.` is significant, so
/// `"a.b.c"` yields `("a.b", "c")`; a trailing `.` yields an empty field.
pub fn parse_pv_name(name: &str) -> (&str, &str) {
    name.rsplit_once('.').unwrap_or((name, "VAL"))
}
/// Refreshes `common.time` according to the value of `common.tse`.
///
/// * `0`  — overwrite with `general_time::get_current()`.
/// * `-1` — only if `time` is still `UNIX_EPOCH`, set it from
///   `general_time::get_event(-1)`; otherwise keep the existing value.
/// * `-2` — leave `time` untouched.
/// * anything else — set it from `general_time::get_event(tse)`.
///
/// `_is_soft` is accepted for the callers' benefit but is not consulted here.
fn apply_timestamp(common: &mut super::record::CommonFields, _is_soft: bool) {
    let tse = common.tse;

    if tse == 0 {
        common.time = crate::runtime::general_time::get_current();
    } else if tse == -1 {
        // Fill in a time only when none has been recorded yet.
        if common.time == std::time::SystemTime::UNIX_EPOCH {
            common.time = crate::runtime::general_time::get_event(-1);
        }
    } else if tse != -2 {
        // Every remaining value is passed to the event lookup as-is.
        common.time = crate::runtime::general_time::get_event(tse as i32);
    }
    // tse == -2: intentionally nothing to do.
}
/// Result of a name lookup in [`PvDatabase`]: either a standalone process
/// variable or a record instance.
pub enum PvEntry {
/// A simple PV, shared by reference count.
Simple(Arc<ProcessVariable>),
/// A record instance, shared by reference count and guarded by an `RwLock`.
Record(Arc<RwLock<RecordInstance>>),
}
/// Synchronous callback that maps a PV name to a value, or `None` when it
/// cannot supply one. `PvDatabase::resolve_external_pv` calls it as the last
/// fallback, after the registered link sets have been tried.
pub type ExternalPvResolver = Arc<dyn Fn(&str) -> Option<EpicsValue> + Send + Sync>;
/// Asynchronous callback consulted by `PvDatabase::find_entry` and
/// `PvDatabase::has_name` when a name is not found locally. It receives the
/// requested name and resolves to a `bool`; on `true` the database repeats its
/// local lookup.
// NOTE(review): presumably the resolver adds the PV to the database before
// returning `true` — confirm against the implementations that register one.
pub type SearchResolver = Arc<
dyn Fn(String) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>>
+ Send
+ Sync,
>;
/// Shared state behind every [`PvDatabase`] handle.
struct PvDatabaseInner {
/// Standalone PVs, keyed by the exact name passed to `add_pv`.
simple_pvs: RwLock<HashMap<String, Arc<ProcessVariable>>>,
/// Record instances, keyed by record name (without any `.FIELD` suffix).
records: RwLock<HashMap<String, Arc<RwLock<RecordInstance>>>>,
/// For each scan type, the `(phas, record name)` pairs registered by
/// `add_record` for records whose scan is not `Passive`.
scan_index: RwLock<HashMap<ScanType, BTreeSet<(i16, String)>>>,
/// Name -> list of names.
// NOTE(review): presumably source PV -> records to process on change
// ("CP" links); not used in this part of the file — confirm in `links`.
cp_links: RwLock<HashMap<String, Vec<String>>>,
/// Optional fallback used by `resolve_external_pv`.
external_resolver: RwLock<Option<ExternalPvResolver>>,
/// Optional callback used by `find_entry` / `has_name` on a local miss.
search_resolver: RwLock<Option<SearchResolver>>,
/// Link sets registered per scheme via `register_link_set`.
link_sets: RwLock<link_set::LinkSetRegistry>,
/// Set to `true` by the first successful `try_claim_scan_start`.
scan_started: std::sync::atomic::AtomicBool,
/// Set to `true` by `mark_pini_done`.
pini_done: std::sync::atomic::AtomicBool,
/// Wakes tasks parked in `wait_for_pini` when `mark_pini_done` runs.
pini_notify: tokio::sync::Notify,
}
/// Handle to the PV/record database.
///
/// Cloning is cheap: every clone holds the same `Arc<PvDatabaseInner>`, so all
/// handles observe and mutate the same state.
#[derive(Clone)]
pub struct PvDatabase {
inner: Arc<PvDatabaseInner>,
}
/// Returns the indices in `0..count` selected by the `selm` / `seln` pair.
///
/// * `selm == 1` — only index `seln`, provided it lies in `0..count`
///   (negative or too-large values select nothing).
/// * `selm == 2` — `seln` is read as a 16-bit mask; index `i` is selected when
///   bit `i` is set. Indices at or beyond 16 have no corresponding bit and are
///   never selected.
/// * any other `selm` (including `0`) — every index.
///
/// The result is always in ascending order.
fn select_link_indices(selm: i16, seln: i16, count: usize) -> Vec<usize> {
    /// Number of usable bits in the `seln` mask.
    const MASK_BITS: usize = 16;

    match selm {
        1 => {
            // Reject negatives explicitly instead of relying on the cast
            // wrapping to a huge value.
            if seln >= 0 && (seln as usize) < count {
                vec![seln as usize]
            } else {
                Vec::new()
            }
        }
        2 => {
            // Reinterpret the bits so a negative `seln` keeps its high bit.
            let mask = seln as u16;
            // Stop at the mask width: shifting a u16 by >= 16 overflows
            // (panic in debug builds, wrapped shift amount in release).
            (0..count.min(MASK_BITS))
                .filter(|&i| (mask >> i) & 1 != 0)
                .collect()
        }
        _ => (0..count).collect(),
    }
}
impl PvDatabase {
/// Creates an empty database: no PVs, no records, no resolvers, an empty
/// link-set registry, and both the scan-start and PINI flags cleared.
pub fn new() -> Self {
    use std::sync::atomic::AtomicBool;

    let state = PvDatabaseInner {
        simple_pvs: RwLock::new(HashMap::new()),
        records: RwLock::new(HashMap::new()),
        scan_index: RwLock::new(HashMap::new()),
        cp_links: RwLock::new(HashMap::new()),
        external_resolver: RwLock::new(None),
        search_resolver: RwLock::new(None),
        link_sets: RwLock::new(link_set::LinkSetRegistry::new()),
        scan_started: AtomicBool::new(false),
        pini_done: AtomicBool::new(false),
        pini_notify: tokio::sync::Notify::new(),
    };

    Self {
        inner: Arc::new(state),
    }
}
/// Atomically flips the `scan_started` flag from `false` to `true`.
///
/// Returns `true` for the one caller that performs the flip and `false` for
/// every caller that finds the flag already set.
pub fn try_claim_scan_start(&self) -> bool {
    use std::sync::atomic::Ordering;

    let claim = self.inner.scan_started.compare_exchange(
        false,
        true,
        Ordering::AcqRel,
        Ordering::Acquire,
    );
    claim.is_ok()
}
/// Records that PINI processing is finished and wakes every task currently
/// waiting in [`wait_for_pini`](Self::wait_for_pini).
pub fn mark_pini_done(&self) {
    use std::sync::atomic::Ordering;

    let state = &self.inner;
    // Publish the flag first so woken (and late) waiters observe it.
    state.pini_done.store(true, Ordering::Release);
    state.pini_notify.notify_waiters();
}
/// Completes once `mark_pini_done` has been called; returns immediately if it
/// already was.
pub async fn wait_for_pini(&self) {
// Fast path: the flag is already set.
if self
.inner
.pini_done
.load(std::sync::atomic::Ordering::Acquire)
{
return;
}
// Create the `Notified` future BEFORE re-checking the flag, so that a
// `mark_pini_done` landing between the check below and the `.await` is
// not lost. Do not reorder these statements.
// NOTE(review): relies on tokio's documented guarantee that a `Notified`
// future receives `notify_waiters()` wakeups from the moment it is created,
// even before it is first polled — confirm for the tokio version in use.
let notified = self.inner.pini_notify.notified();
// Second check closes the window between the first check and `notified()`.
if self
.inner
.pini_done
.load(std::sync::atomic::Ordering::Acquire)
{
return;
}
notified.await;
}
/// Installs `resolver` as the search resolver, replacing any previous one.
pub async fn set_search_resolver(&self, resolver: SearchResolver) {
    let mut slot = self.inner.search_resolver.write().await;
    *slot = Some(resolver);
}
/// Removes the search resolver, if one is installed.
pub async fn clear_search_resolver(&self) {
    let mut slot = self.inner.search_resolver.write().await;
    *slot = None;
}
/// Installs `resolver` as the external-PV fallback, replacing any previous one.
pub async fn set_external_resolver(&self, resolver: ExternalPvResolver) {
    let mut slot = self.inner.external_resolver.write().await;
    *slot = Some(resolver);
}
/// Registers `lset` in the link-set registry under `scheme`.
pub async fn register_link_set(&self, scheme: &str, lset: link_set::DynLinkSet) {
    let mut registry = self.inner.link_sets.write().await;
    registry.register(scheme, lset);
}
/// Looks up the link set registered under `scheme`, if any.
pub async fn link_set(&self, scheme: &str) -> Option<link_set::DynLinkSet> {
    let registry = self.inner.link_sets.read().await;
    registry.get(scheme)
}
/// Returns the names of all registered link schemes in ascending order.
pub async fn registered_link_schemes(&self) -> Vec<String> {
    // The read guard is a temporary, so it is released before sorting.
    let mut names = self.inner.link_sets.read().await.schemes();
    // Equal strings are indistinguishable, so an unstable sort gives the
    // same result as a stable one.
    names.sort_unstable();
    names
}
/// Lists the link-valued fields of the record named `record_name`.
///
/// Every field whose declared type is `DbFieldType::String`, whose current
/// value is a non-empty `EpicsValue::String`, and which `parse_link_v2` does
/// not classify as `ParsedLink::None` contributes one
/// `(field name, raw text, parsed link)` tuple, in `field_list()` order.
/// An unknown record yields an empty vector.
pub async fn record_link_fields(
    &self,
    record_name: &str,
) -> Vec<(String, String, crate::server::record::ParsedLink)> {
    let mut links = Vec::new();

    if let Some(handle) = self.get_record(record_name).await {
        let guard = handle.read().await;
        for desc in guard.record.field_list() {
            // Only string-typed fields can hold link text.
            if !matches!(desc.dbf_type, crate::types::DbFieldType::String) {
                continue;
            }
            if let Some(EpicsValue::String(text)) = guard.record.get_field(desc.name) {
                if text.is_empty() {
                    continue;
                }
                let link = crate::server::record::parse_link_v2(&text);
                if matches!(link, crate::server::record::ParsedLink::None) {
                    continue;
                }
                links.push((desc.name.to_string(), text, link));
            }
        }
    }

    links
}
/// Resolves `name` to a value through link sets, then the external resolver.
///
/// * `pva://…` / `ca://…` names: the link set registered for that scheme is
///   asked for the part after the prefix.
/// * any other name: every registered link set is asked, in `schemes()`
///   order, for the full name; the first value wins.
///
/// If no link set produces a value, the external resolver (when installed) is
/// called with the original, unstripped `name`.
pub(crate) async fn resolve_external_pv(&self, name: &str) -> Option<EpicsValue> {
    let prefixed = name
        .strip_prefix("pva://")
        .map(|rest| ("pva", rest))
        .or_else(|| name.strip_prefix("ca://").map(|rest| ("ca", rest)));

    // The registry guard lives only inside the match arm, so it is released
    // before the resolver lock is taken below.
    match prefixed {
        Some((scheme, body)) => {
            let registry = self.inner.link_sets.read().await;
            let hit = registry.get(scheme).and_then(|lset| lset.get_value(body));
            if hit.is_some() {
                return hit;
            }
        }
        None => {
            let registry = self.inner.link_sets.read().await;
            let hit = registry
                .schemes()
                .into_iter()
                .find_map(|s| registry.get(&s).and_then(|lset| lset.get_value(name)));
            if hit.is_some() {
                return hit;
            }
        }
    }

    let resolver = self.inner.external_resolver.read().await;
    resolver.as_ref().and_then(|r| r(name))
}
/// Adds a simple PV called `name` holding `initial`, replacing any simple PV
/// already stored under that name.
pub async fn add_pv(&self, name: &str, initial: EpicsValue) {
    let key = name.to_string();
    let pv = Arc::new(ProcessVariable::new(key.clone(), initial));
    let mut table = self.inner.simple_pvs.write().await;
    table.insert(key, pv);
}
/// Like [`add_pv`](Self::add_pv), but installs `hook` on the new PV via
/// `set_write_hook` before it is published in the database.
pub async fn add_pv_with_hook(
    &self,
    name: &str,
    initial: EpicsValue,
    hook: crate::server::pv::WriteHook,
) {
    let key = name.to_string();
    let pv = Arc::new(ProcessVariable::new(key.clone(), initial));
    pv.set_write_hook(hook);
    let mut table = self.inner.simple_pvs.write().await;
    table.insert(key, pv);
}
/// Removes the simple PV stored under `name` and returns it, or `None` if no
/// such PV exists.
pub async fn remove_simple_pv(&self, name: &str) -> Option<Arc<ProcessVariable>> {
    let mut table = self.inner.simple_pvs.write().await;
    table.remove(name)
}
/// Wraps `record` in a `RecordInstance` named `name` and stores it, replacing
/// any record already stored under that name.
///
/// If the new instance's scan type is not `Passive`, `(phas, name)` is also
/// inserted into the scan index for that scan type.
pub async fn add_record(&self, name: &str, record: Box<dyn Record>) {
    let instance = RecordInstance::new_boxed(name.to_string(), record);
    // Capture the scheduling fields before the instance moves behind the lock.
    let (scan, phas) = (instance.common.scan, instance.common.phas);
    let shared = Arc::new(RwLock::new(instance));

    {
        let mut records = self.inner.records.write().await;
        records.insert(name.to_string(), shared);
    }

    if scan == ScanType::Passive {
        return;
    }

    let mut index = self.inner.scan_index.write().await;
    index
        .entry(scan)
        .or_default()
        .insert((phas, name.to_string()));
}
async fn find_entry_no_resolve(&self, name: &str) -> Option<PvEntry> {
let (base, _field) = parse_pv_name(name);
if let Some(pv) = self.inner.simple_pvs.read().await.get(name) {
return Some(PvEntry::Simple(pv.clone()));
}
if let Some(rec) = self.inner.records.read().await.get(base) {
return Some(PvEntry::Record(rec.clone()));
}
None
}
/// Local-only existence check: never consults the search resolver.
///
/// True when `name` is a simple PV, or when the part of `name` before its last
/// `.` is a record. The field part is not validated.
async fn has_name_no_resolve(&self, name: &str) -> bool {
    // Separate statement so the simple-PV lock is released before the
    // record lock is requested.
    let is_simple = self.inner.simple_pvs.read().await.contains_key(name);
    if is_simple {
        return true;
    }

    let (base, _) = parse_pv_name(name);
    self.inner.records.read().await.contains_key(base)
}
/// Looks up `name`, falling back to the search resolver on a local miss.
///
/// If the local lookup fails and a search resolver is installed, the resolver
/// is awaited with `name`; when it reports `true` the local lookup is retried
/// once and its result returned. Otherwise the result is `None`.
pub async fn find_entry(&self, name: &str) -> Option<PvEntry> {
    if let Some(entry) = self.find_entry_no_resolve(name).await {
        return Some(entry);
    }

    // Clone the callback out so the lock is released before awaiting it.
    let resolver = self.inner.search_resolver.read().await.clone();
    let resolved = match resolver {
        Some(resolve) => resolve(name.to_string()).await,
        None => false,
    };

    if resolved {
        self.find_entry_no_resolve(name).await
    } else {
        None
    }
}
/// Reports whether `name` exists, falling back to the search resolver on a
/// local miss.
///
/// If the name is not known locally and a search resolver is installed, the
/// resolver is awaited with `name`; when it reports `true` the local check is
/// repeated once and its result returned. Otherwise the result is `false`.
pub async fn has_name(&self, name: &str) -> bool {
    if self.has_name_no_resolve(name).await {
        return true;
    }

    // Clone the callback out so the lock is released before awaiting it.
    let resolver = self.inner.search_resolver.read().await.clone();
    let resolved = match resolver {
        Some(resolve) => resolve(name.to_string()).await,
        None => false,
    };

    resolved && self.has_name_no_resolve(name).await
}
/// Returns the simple PV stored under exactly `name`, if any. Records and the
/// search resolver are not consulted.
pub async fn find_pv(&self, name: &str) -> Option<Arc<ProcessVariable>> {
    let table = self.inner.simple_pvs.read().await;
    table.get(name).cloned()
}
/// Returns the record stored under exactly `name` (no `.FIELD` suffix is
/// stripped), if any.
pub async fn get_record(&self, name: &str) -> Option<Arc<RwLock<RecordInstance>>> {
    let records = self.inner.records.read().await;
    records.get(name).map(Arc::clone)
}
/// Returns the names of all records, in the map's (unspecified) iteration
/// order.
pub async fn all_record_names(&self) -> Vec<String> {
    let records = self.inner.records.read().await;
    records.keys().map(String::clone).collect()
}
/// Returns the names of all simple PVs, in the map's (unspecified) iteration
/// order.
pub async fn all_simple_pv_names(&self) -> Vec<String> {
    let table = self.inner.simple_pvs.read().await;
    table.keys().map(String::clone).collect()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // `select_link_indices` is synchronous, so a plain `#[test]` suffices;
    // no async runtime needs to be started.
    #[test]
    fn test_select_link_indices() {
        // selm 0 (and any unrecognised mode): every index.
        assert_eq!(select_link_indices(0, 0, 6), vec![0, 1, 2, 3, 4, 5]);
        assert_eq!(select_link_indices(7, 0, 2), vec![0, 1]);
        assert_eq!(select_link_indices(0, 0, 0), Vec::<usize>::new());

        // selm 1: only index `seln`, when it is in range.
        assert_eq!(select_link_indices(1, 2, 6), vec![2]);
        assert_eq!(select_link_indices(1, 10, 6), Vec::<usize>::new());
        assert_eq!(select_link_indices(1, -1, 6), Vec::<usize>::new());

        // selm 2: `seln` is a bit mask over the indices.
        assert_eq!(select_link_indices(2, 5, 6), vec![0, 2]);
        assert_eq!(select_link_indices(2, 0, 6), Vec::<usize>::new());
    }

    #[test]
    fn test_parse_pv_name() {
        assert_eq!(parse_pv_name("REC.DESC"), ("REC", "DESC"));
        assert_eq!(parse_pv_name("REC"), ("REC", "VAL"));
        // Only the last dot separates the field.
        assert_eq!(parse_pv_name("a.b.c"), ("a.b", "c"));
    }
}