use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
use epics_pva_rs::pvdata::{FieldDesc, PvField, PvStructure};
use super::provider::{
AnyChannel, BridgeProvider, Channel, ChannelProvider, ClientCreds, PvaMonitor,
};
/// Shared handle to a natively served PVA PV.
///
/// All state sits behind `Arc`s, so cloning the handle yields another view of
/// the same value slot and the same subscriber list.
#[derive(Clone)]
pub struct PvaPvHandle {
/// Most recent value of the PV. `None` is treated by the readers in this
/// file as "no value yet" and makes them fall through or report an error.
pub latest: Arc<parking_lot::Mutex<Option<PvField>>>,
/// Senders of the live monitor subscriptions. The `subscribe*` methods of
/// `QsrvPvStore` push a new sender here and prune closed ones.
pub subscribers: Arc<parking_lot::Mutex<Vec<mpsc::Sender<PvField>>>>,
/// Optional introspection descriptor. When present it is served as-is;
/// when absent, introspection is derived from the latest value.
pub descriptor: Option<FieldDesc>,
}
static PVA_PV_REGISTRY: std::sync::LazyLock<
std::sync::Mutex<std::collections::HashMap<String, PvaPvHandle>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(std::collections::HashMap::new()));
/// Stages `handle` under `pv_name` in the process-wide registry.
///
/// An existing entry with the same name is replaced.
///
/// # Panics
///
/// Panics if the handle's descriptor and current value disagree on their
/// root kind, or if the registry mutex is poisoned.
pub fn register_pva_pv_global(pv_name: &str, handle: PvaPvHandle) {
    // Validate before touching the registry so a bad handle never lands in it.
    assert_handle_root_kind(pv_name, &handle);
    let mut registry = PVA_PV_REGISTRY.lock().unwrap();
    registry.insert(pv_name.to_string(), handle);
}
/// Asserts that a handle's explicit descriptor and its current value share
/// the same root kind.
///
/// Nothing is checked when the handle has no descriptor or no value yet.
///
/// # Panics
///
/// Panics with a message naming both kinds when they differ.
fn assert_handle_root_kind(pv_name: &str, handle: &PvaPvHandle) {
    let Some(desc) = handle.descriptor.as_ref() else {
        return;
    };
    let slot = handle.latest.lock();
    let Some(value) = slot.as_ref() else {
        return;
    };
    assert!(
        root_kind_matches(value, desc),
        "PvaPvHandle for {pv_name:?}: supplied descriptor root kind \
         does not match value root kind ({value_kind} vs {desc_kind}) — \
         introspection would disagree with served values",
        value_kind = root_kind_name_value(value),
        desc_kind = root_kind_name_desc(desc),
    );
}
/// Drains the process-wide registry, returning every staged handle and
/// leaving the registry empty.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn take_registered_pva_pvs() -> std::collections::HashMap<String, PvaPvHandle> {
    let mut registry = PVA_PV_REGISTRY.lock().unwrap();
    std::mem::take(&mut *registry)
}
/// Returns `true` when `value` and `desc` agree on their root kind.
///
/// Only the outermost shape is compared (scalar vs. array vs. structure …);
/// nested members and element types are not inspected.
///
/// A `BoundedString` descriptor is accepted against a scalar value.
/// Without that pairing no value could ever match a bounded-string
/// descriptor, and registering such a PV with an initial value would
/// always trip the root-kind assertion.
/// NOTE(review): this assumes a bounded string is carried as a
/// `PvField::Scalar`; confirm against the pvdata value model.
fn root_kind_matches(value: &PvField, desc: &FieldDesc) -> bool {
    matches!(
        (value, desc),
        (
            PvField::Scalar(_),
            FieldDesc::Scalar(_) | FieldDesc::BoundedString(_)
        ) | (
            PvField::ScalarArray(_) | PvField::ScalarArrayTyped(_),
            FieldDesc::ScalarArray(_)
        ) | (PvField::Structure(_), FieldDesc::Structure { .. })
            | (PvField::StructureArray(_), FieldDesc::StructureArray { .. })
            | (PvField::Union { .. }, FieldDesc::Union { .. })
            | (PvField::UnionArray(_), FieldDesc::UnionArray { .. })
            // A `Null` value is accepted for a variant descriptor.
            | (PvField::Variant(_) | PvField::Null, FieldDesc::Variant)
            | (PvField::VariantArray(_), FieldDesc::VariantArray)
    )
}
fn root_kind_name_value(v: &PvField) -> &'static str {
match v {
PvField::Scalar(_) => "Scalar",
PvField::ScalarArray(_) | PvField::ScalarArrayTyped(_) => "ScalarArray",
PvField::Structure(_) => "Structure",
PvField::StructureArray(_) => "StructureArray",
PvField::Union { .. } => "Union",
PvField::UnionArray(_) => "UnionArray",
PvField::Variant(_) => "Variant",
PvField::VariantArray(_) => "VariantArray",
PvField::Null => "Null",
}
}
fn root_kind_name_desc(d: &FieldDesc) -> &'static str {
match d {
FieldDesc::Scalar(_) => "Scalar",
FieldDesc::ScalarArray(_) => "ScalarArray",
FieldDesc::Structure { .. } => "Structure",
FieldDesc::StructureArray { .. } => "StructureArray",
FieldDesc::Union { .. } => "Union",
FieldDesc::UnionArray { .. } => "UnionArray",
FieldDesc::Variant => "Variant",
FieldDesc::VariantArray => "VariantArray",
FieldDesc::BoundedString(_) => "BoundedString",
}
}
/// Copies the identity fields of a channel context into provider
/// credentials.
///
/// `account` becomes `user`; `host`, `method`, `authority` and `roles` are
/// cloned under the same names.
fn ctx_to_creds(ctx: &epics_pva_rs::server_native::source::ChannelContext) -> ClientCreds {
    let user = ctx.account.clone();
    let host = ctx.host.clone();
    let method = ctx.method.clone();
    let authority = ctx.authority.clone();
    let roles = ctx.roles.clone();
    ClientCreds {
        user,
        host,
        method,
        authority,
        roles,
    }
}
/// Channel source that serves PVs from two places: native PVA PVs held in
/// `pva_pvs`, and everything reachable through the wrapped `BridgeProvider`.
///
/// The `ChannelSource` methods consult `pva_pvs` first and fall through to
/// the provider for any other name.
pub struct QsrvPvStore {
/// Provider used for every name that is not a registered native PVA PV.
provider: Arc<BridgeProvider>,
/// Native PVA PVs keyed by PV name.
pva_pvs: Arc<RwLock<HashMap<String, PvaPvHandle>>>,
}
impl QsrvPvStore {
    /// Creates a store over `provider` with no native PVA PVs registered.
    pub fn new(provider: Arc<BridgeProvider>) -> Self {
        let pva_pvs = Arc::new(RwLock::new(HashMap::new()));
        Self { provider, pva_pvs }
    }

    /// Returns the wrapped provider.
    pub fn provider(&self) -> &Arc<BridgeProvider> {
        &self.provider
    }

    /// Registers a native PVA PV under `pv_name`, replacing any previous
    /// entry with that name.
    ///
    /// # Panics
    ///
    /// Panics if `descriptor` and the current content of `latest` disagree
    /// on their root kind.
    pub async fn register_pva_pv(
        &self,
        pv_name: &str,
        latest: Arc<parking_lot::Mutex<Option<PvField>>>,
        subscribers: Arc<parking_lot::Mutex<Vec<mpsc::Sender<PvField>>>>,
        descriptor: Option<FieldDesc>,
    ) {
        let handle = PvaPvHandle {
            latest,
            subscribers,
            descriptor,
        };
        // Validate before taking the write lock.
        assert_handle_root_kind(pv_name, &handle);
        let mut registry = self.pva_pvs.write().await;
        registry.insert(pv_name.to_string(), handle);
    }

    /// Opens a provider channel for `name` with empty user and host.
    async fn channel(&self, name: &str) -> Option<AnyChannel> {
        self.channel_for(name, "", "").await
    }

    /// Opens a provider channel for `name` on behalf of `user`@`host`.
    /// Any provider error is collapsed to `None`.
    async fn channel_for(&self, name: &str, user: &str, host: &str) -> Option<AnyChannel> {
        match self.provider.create_channel_for(name, user, host).await {
            Ok(channel) => Some(channel),
            Err(_) => None,
        }
    }
}
impl epics_pva_rs::server_native::ChannelSource for QsrvPvStore {
/// Access-checked GET.
///
/// Returns `None` when read access is denied, when the provider channel
/// cannot be created, or when the channel GET fails. A native PVA PV with a
/// cached value is answered from that cache. Every other name goes to the
/// provider with the caller's credentials, and the structure part of
/// `ctx.pv_request` (or an empty request) is forwarded to the channel.
fn get_value_checked(
    &self,
    checked: epics_pva_rs::server_native::source::AccessChecked,
    ctx: epics_pva_rs::server_native::source::ChannelContext,
) -> impl std::future::Future<Output = Option<PvField>> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let provider = self.provider.clone();
    async move {
        if !checked.allows_read() {
            return None;
        }
        let name = checked.pv_name().to_string();
        // Native PVA PV with a value: serve it without touching the provider.
        if let Some(handle) = pva_pvs.read().await.get(&name).cloned()
            && let Some(value) = handle.latest.lock().clone()
        {
            return Some(value);
        }
        let channel = provider
            .create_channel_with_creds(&name, ctx_to_creds(&ctx))
            .await
            .ok()?;
        let empty_request = PvStructure::new("");
        let request = match &ctx.pv_request {
            Some(PvField::Structure(s)) => s,
            _ => &empty_request,
        };
        match channel.get(request).await {
            Ok(pv) => Some(PvField::Structure(pv)),
            Err(e) => {
                // Identify the caller as account@host, like the other
                // messages in this file, and report the method separately.
                tracing::debug!(
                    "qsrv get_value_checked({name}) failed for {}@{} (method {}): {e}",
                    ctx.account,
                    ctx.host,
                    ctx.method
                );
                None
            }
        }
    }
}
fn put_value_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
value: PvField,
ctx: epics_pva_rs::server_native::source::ChannelContext,
) -> impl std::future::Future<Output = Result<(), String>> + Send {
let provider = self.provider.clone();
async move {
if !checked.allows_write() {
return Err(format!(
"PUT denied by access security: '{}' from {}@{}",
checked.pv_name(),
ctx.account,
ctx.host
));
}
let name = checked.pv_name().to_string();
let pv = match value {
PvField::Structure(s) => s,
other => return Err(format!("qsrv PUT expects a structure value, got {other}")),
};
let init_req = match ctx.pv_request {
Some(PvField::Structure(ref req)) => Some(req),
_ => None,
};
let opts = match init_req {
Some(req) => crate::qsrv::channel::PutOptions::from_pv_request(req),
None => crate::qsrv::channel::PutOptions::from_pv_request(&pv),
};
let channel = provider
.create_channel_with_creds(&name, ctx_to_creds(&ctx))
.await
.map_err(|e| e.to_string())?;
match channel {
crate::qsrv::AnyChannel::Single(single) => single
.put_with_options(&pv, opts)
.await
.map_err(|e| e.to_string()),
crate::qsrv::AnyChannel::Group(group) => {
let atomic_override = match init_req {
Some(req) => crate::qsrv::channel::atomic_from_pv_request(req),
None => crate::qsrv::channel::atomic_from_pv_request(&pv),
};
group
.put_with_options(&pv, opts, atomic_override)
.await
.map_err(|e| e.to_string())
}
}
}
}
/// Processes the record behind `name`.
///
/// Native PVA PVs and group PVs are rejected with an explanatory error.
/// For any other name the record part of the PV name is processed through
/// the provider's database.
fn process(&self, name: &str) -> impl std::future::Future<Output = Result<(), String>> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let provider = self.provider.clone();
    let name = name.to_string();
    async move {
        let is_native = pva_pvs.read().await.contains_key(&name);
        if is_native {
            return Err(format!(
                "PROCESS not supported for native PVA PV '{name}' (no processing chain)"
            ));
        }
        if provider.groups().contains_key(&name) {
            return Err(format!(
                "PROCESS not supported for group PV '{name}' (no record-level chain)"
            ));
        }
        // Only the record part matters here; the field part is discarded.
        let (record_name, _field) = epics_base_rs::server::database::parse_pv_name(&name);
        let outcome = provider.database().process_record(record_name).await;
        outcome.map_err(|e| format!("PROCESS on '{name}': {e}"))
    }
}
/// Access-checked RPC.
///
/// READ access is always required. WRITE access is additionally required
/// whenever the request carries query arguments, because `rpc` turns a
/// non-empty argument list into a PUT before reading the value back.
///
/// The argument list is derived exactly as `rpc` derives it: the fields of
/// the top-level `query` sub-structure when there is one, otherwise the
/// fields of the request structure itself. Using the same rule on both
/// sides means a request the gate judges read-only is never treated as a
/// write by `rpc`.
// The explicit `async move` block mirrors the other trait methods, which
// return `impl Future + Send`; hence the clippy allowance.
#[allow(clippy::manual_async_fn)]
fn rpc_checked(
    &self,
    checked: epics_pva_rs::server_native::source::AccessChecked,
    request_desc: FieldDesc,
    request_value: PvField,
    ctx: epics_pva_rs::server_native::source::ChannelContext,
) -> impl std::future::Future<Output = Result<(FieldDesc, PvField), String>> + Send {
    async move {
        if !checked.allows_read() {
            return Err(format!(
                "RPC denied by access security: '{}' from {}@{}",
                checked.pv_name(),
                ctx.account,
                ctx.host,
            ));
        }
        // Same extraction as `rpc`: a structure-typed `query` field wins,
        // anything else falls back to the root fields. Non-structure
        // requests never reach the PUT path in `rpc`.
        let has_writes = match &request_value {
            PvField::Structure(root) => match root.get_field("query") {
                Some(PvField::Structure(query)) => !query.fields.is_empty(),
                _ => !root.fields.is_empty(),
            },
            _ => false,
        };
        if has_writes && !checked.allows_write() {
            return Err(format!(
                "RPC write denied by access security: '{}' from {}@{} \
                 (RPC query arguments require WRITE access in QSRV)",
                checked.pv_name(),
                ctx.account,
                ctx.host,
            ));
        }
        self.rpc(checked.pv_name(), request_desc, request_value)
            .await
    }
}
/// Access-checked monitor subscription.
///
/// Returns `None` when read access is denied or when any step of setting up
/// a provider monitor fails. A native PVA PV gets a fresh 64-slot channel
/// whose sender is added to the handle's subscriber list. For provider
/// channels, a single-record channel honours the DBE mask parsed from the
/// pvRequest, and every other channel kind gets the negotiated queue size.
/// Snapshots are forwarded by a spawned task until the monitor ends or the
/// receiver is dropped.
fn subscribe_checked(
    &self,
    checked: epics_pva_rs::server_native::source::AccessChecked,
    ctx: epics_pva_rs::server_native::source::ChannelContext,
) -> impl std::future::Future<Output = Option<mpsc::Receiver<PvField>>> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let provider = self.provider.clone();
    async move {
        if !checked.allows_read() {
            return None;
        }
        let name = checked.pv_name().to_string();

        let native = pva_pvs.read().await.get(&name).cloned();
        if let Some(handle) = native {
            let (sender, receiver) = mpsc::channel::<PvField>(64);
            {
                // Drop senders whose receiver is gone before adding ours.
                let mut live = handle.subscribers.lock();
                live.retain(|s| !s.is_closed());
                live.push(sender);
            }
            return Some(receiver);
        }

        let channel = provider
            .create_channel_with_creds(&name, ctx_to_creds(&ctx))
            .await
            .ok()?;

        // Only a structure-typed pvRequest carries usable options.
        let pv_request = match &ctx.pv_request {
            Some(PvField::Structure(req)) => Some(req),
            _ => None,
        };
        let dbe_mask =
            pv_request.and_then(|req| crate::qsrv::channel::dbe_mask_from_pv_request(req));
        let queue_size = pv_request.map_or(crate::qsrv::group::GROUP_DEFAULT_QUEUE_SIZE, |req| {
            crate::qsrv::group::negotiated_queue_size(req)
        });

        let mut monitor = match channel {
            crate::qsrv::AnyChannel::Single(single) => {
                single.create_monitor_with_value_mask(dbe_mask).await.ok()?
            }
            other => other
                .create_monitor()
                .await
                .ok()?
                .with_queue_size(queue_size),
        };
        monitor.start().await.ok()?;

        let (tx, rx) = mpsc::channel::<PvField>(64);
        tokio::spawn(async move {
            loop {
                let Some(snapshot) = monitor.poll().await else {
                    break;
                };
                if tx.send(PvField::Structure(snapshot)).await.is_err() {
                    break;
                }
            }
            monitor.stop().await;
        });
        Some(rx)
    }
}
/// Lists every PV name known to the provider plus every native PVA PV not
/// already in that list, sorted.
fn list_pvs(&self) -> impl std::future::Future<Output = Vec<String>> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let provider = self.provider.clone();
    async move {
        let mut names = provider.channel_list().await;
        let native = pva_pvs.read().await;
        // Map keys are unique, so filtering against the provider list alone
        // is enough to avoid adding a name twice.
        let missing: Vec<String> = native
            .keys()
            .filter(|key| !names.contains(*key))
            .cloned()
            .collect();
        drop(native);
        names.extend(missing);
        names.sort();
        names
    }
}
/// Reports whether `name` is served, either as a native PVA PV or by the
/// provider. The provider is only asked when the name is not native.
fn has_pv(&self, name: &str) -> impl std::future::Future<Output = bool> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let provider = self.provider.clone();
    let name = name.to_string();
    async move {
        let is_native = pva_pvs.read().await.contains_key(&name);
        if is_native {
            true
        } else {
            provider.channel_find(&name).await
        }
    }
}
/// Returns the type description of `name`.
///
/// For a native PVA PV the explicitly supplied descriptor wins; without
/// one, the descriptor is derived from the latest value. If the PV is not
/// native, or is native but has neither descriptor nor value, the provider
/// channel's field description is used. `None` when nothing can be found.
fn get_introspection(
    &self,
    name: &str,
) -> impl std::future::Future<Output = Option<epics_pva_rs::pvdata::FieldDesc>> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let name_owned = name.to_string();
    async move {
        let native = pva_pvs.read().await.get(&name_owned).cloned();
        if let Some(handle) = native {
            if let Some(desc) = &handle.descriptor {
                return Some(desc.clone());
            }
            let cached = handle.latest.lock().clone();
            if let Some(value) = cached {
                return Some(value.descriptor());
            }
        }
        let channel = self.channel(&name_owned).await?;
        channel.get_field().await.ok()
    }
}
/// Unchecked GET of `name`.
///
/// A native PVA PV with a cached value is answered from the cache. Every
/// other name is read through a provider channel with an empty request.
/// Returns `None` when the channel cannot be opened or the read fails.
fn get_value(&self, name: &str) -> impl std::future::Future<Output = Option<PvField>> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let name_owned = name.to_string();
    async move {
        let native = pva_pvs.read().await.get(&name_owned).cloned();
        if let Some(handle) = native {
            let cached = handle.latest.lock().clone();
            if cached.is_some() {
                return cached;
            }
        }
        let channel = self.channel(&name_owned).await?;
        let empty_request = PvStructure::new("");
        let fetched = channel.get(&empty_request).await;
        match fetched {
            Ok(pv) => Some(PvField::Structure(pv)),
            Err(e) => {
                tracing::debug!("qsrv get_value({name_owned}) failed: {e}");
                None
            }
        }
    }
}
/// Unchecked PUT of a structure value to the provider channel for `name`.
///
/// Fails when no provider channel can be opened, when `value` is not a
/// structure, or when the channel PUT fails.
fn put_value(
    &self,
    name: &str,
    value: PvField,
) -> impl std::future::Future<Output = Result<(), String>> + Send {
    let name_owned = name.to_string();
    async move {
        let Some(channel) = self.channel(&name_owned).await else {
            return Err(format!("PV not found: {name_owned}"));
        };
        let pv = match value {
            PvField::Structure(s) => s,
            other => return Err(format!("qsrv PUT expects a structure value, got {other}")),
        };
        let outcome = channel.put(&pv).await;
        outcome.map_err(|e| e.to_string())
    }
}
/// Reports whether `name` accepts writes. Native PVA PVs are never
/// writable; every other name is decided by the provider.
fn is_writable(&self, name: &str) -> impl std::future::Future<Output = bool> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let provider = self.provider.clone();
    let name = name.to_string();
    async move {
        let is_native = pva_pvs.read().await.contains_key(&name);
        if is_native {
            false
        } else {
            provider.is_writable(&name).await
        }
    }
}
/// Unchecked RPC on `name`.
///
/// The request's query arguments are written to the PV (if there are any)
/// and the PV's current value is then returned together with its
/// descriptor. `request_desc` is accepted but not used.
///
/// Errors are returned when the request is neither a structure nor `Null`,
/// when a native PVA PV is given arguments or has no value yet, when no
/// provider channel exists for `name`, when a single-record channel is
/// given a query field other than `value`, or when the put, the read-back
/// or the introspection fails.
fn rpc(
&self,
name: &str,
request_desc: FieldDesc,
request_value: PvField,
) -> impl std::future::Future<Output = Result<(FieldDesc, PvField), String>> + Send {
let name_owned = name.to_string();
let pva_pvs = self.pva_pvs.clone();
async move {
// The request descriptor carries nothing this implementation needs.
let _ = request_desc;
// Arguments come from the `query` sub-structure when present, otherwise
// from the request structure's own fields; `Null` means "no arguments".
let query: Vec<(String, PvField)> = match &request_value {
PvField::Structure(s) => match s.get_field("query") {
Some(PvField::Structure(q)) => q.fields.clone(),
_ => s.fields.clone(),
},
PvField::Null => Vec::new(),
other => {
return Err(format!(
"qsrv RPC on {name_owned:?}: expected an NTURI or structure \
request, got {other}"
));
}
};
// Native PVA PVs are read-only: serve the cached value, refuse arguments.
if let Some(handle) = pva_pvs.read().await.get(&name_owned).cloned() {
if !query.is_empty() {
return Err(format!(
"qsrv RPC on {name_owned:?}: native PVA PV is read-only, \
RPC query arguments are not accepted"
));
}
let value = handle.latest.lock().clone().ok_or_else(|| {
format!("qsrv RPC on {name_owned:?}: native PVA PV has no value yet")
})?;
// Prefer the supplied descriptor; otherwise derive one from the value.
let desc = handle
.descriptor
.clone()
.unwrap_or_else(|| value.descriptor());
return Ok((desc, value));
}
let channel = self
.channel(&name_owned)
.await
.ok_or_else(|| format!("qsrv RPC: PV not found: {name_owned}"))?;
if !query.is_empty() {
// Single-record channels accept only `value` (or a dotted path under it).
if let AnyChannel::Single(_) = &channel {
for (field, _) in &query {
let top = field.split('.').next().unwrap_or(field);
if top != "value" {
return Err(format!(
"qsrv RPC on {name_owned:?}: single-record channel \
accepts only the `value` query field, got {field:?}"
));
}
}
}
// Assemble the put structure from the query arguments and write it.
let mut put = PvStructure::new("epics:nt/NTScalar:1.0");
for (field, field_value) in &query {
super::group::set_nested_field(&mut put, field, field_value.clone());
}
channel
.put(&put)
.await
.map_err(|e| format!("qsrv RPC put on {name_owned} failed: {e}"))?;
}
// Read the value back (after any write) and fetch its descriptor.
let empty_request = PvStructure::new("");
let value = channel
.get(&empty_request)
.await
.map_err(|e| format!("qsrv RPC get-back on {name_owned} failed: {e}"))?;
let desc = channel
.get_field()
.await
.map_err(|e| format!("qsrv RPC introspection on {name_owned} failed: {e}"))?;
Ok((desc, PvField::Structure(value)))
}
}
/// Unchecked monitor subscription on `name`.
///
/// A native PVA PV gets a fresh 64-slot channel whose sender is added to
/// the handle's subscriber list. Any other name gets a provider monitor
/// whose snapshots are forwarded by a spawned task until the monitor ends
/// or the receiver is dropped. Returns `None` when the channel or monitor
/// cannot be set up.
fn subscribe(
    &self,
    name: &str,
) -> impl std::future::Future<Output = Option<mpsc::Receiver<PvField>>> + Send {
    let pva_pvs = self.pva_pvs.clone();
    let name_owned = name.to_string();
    async move {
        let native = pva_pvs.read().await.get(&name_owned).cloned();
        if let Some(handle) = native {
            let (sender, receiver) = mpsc::channel::<PvField>(64);
            {
                // Drop senders whose receiver is gone before adding ours.
                let mut live = handle.subscribers.lock();
                live.retain(|s| !s.is_closed());
                live.push(sender);
            }
            return Some(receiver);
        }

        let channel = self.channel(&name_owned).await?;
        let mut monitor = channel.create_monitor().await.ok()?;
        monitor.start().await.ok()?;

        let (tx, rx) = mpsc::channel::<PvField>(64);
        tokio::spawn(async move {
            loop {
                let Some(snapshot) = monitor.poll().await else {
                    break;
                };
                if tx.send(PvField::Structure(snapshot)).await.is_err() {
                    break;
                }
            }
            monitor.stop().await;
        });
        Some(rx)
    }
}
/// Forwards to the provider's `group_is_pure_self_trigger` answer for
/// `name`.
fn monitor_emits_partial(&self, name: &str) -> bool {
    let provider = &self.provider;
    provider.group_is_pure_self_trigger(name)
}
}
/// Runs an IOC that serves one database over both CA and PVA.
///
/// Steps, in order: build the `BridgeProvider` (with ACF access control if
/// configured), move every globally registered native PVA PV into a fresh
/// `QsrvPvStore`, install the optional `calink` / `pvalink` link sets,
/// spawn the CA server as a background task, then run the PVA server with
/// the store as its source until it returns.
///
/// The PVA port is read from `EPICS_PVA_SERVER_PORT`; a missing or
/// unparsable value falls back to 5075.
///
/// # Errors
///
/// A PVA server failure is returned as `CaError::InvalidValue` carrying the
/// error text. CA server failures are only printed to stderr.
pub async fn run_ca_pva_qsrv_ioc(
config: epics_base_rs::server::ioc_app::IocRunConfig,
) -> epics_base_rs::error::CaResult<()> {
use epics_base_rs::error::CaError;
let db = config.db.clone();
let ca_port = config.port;
// Invalid or absent environment values silently select the default port.
let pva_port: u16 = std::env::var("EPICS_PVA_SERVER_PORT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(5075);
let provider = Arc::new(BridgeProvider::new(db.clone()));
if let Some(acf_cfg) = config.acf.clone() {
let acf = Arc::new(super::provider::AcfAccessControl::new(db.clone(), acf_cfg));
provider.set_access_control(acf);
tracing::info!("qsrv: ACF installed on BridgeProvider");
}
let store = Arc::new(QsrvPvStore::new(provider));
// Drain the process-wide staging registry into this store.
let pva_pvs = take_registered_pva_pvs();
for (pv_name, handle) in pva_pvs {
tracing::info!(pv = %pv_name, "registering native PVA PV");
store
.register_pva_pv(
&pv_name,
handle.latest,
handle.subscribers,
handle.descriptor,
)
.await;
}
// The command list only needs to be mutable when a link-set feature
// extends it below.
#[cfg(any(feature = "calink", feature = "pvalink"))]
let mut shell_commands = config.shell_commands;
#[cfg(not(any(feature = "calink", feature = "pvalink")))]
let shell_commands = config.shell_commands;
#[cfg(feature = "calink")]
{
// A failed CA link-set install is logged and startup continues.
match crate::calink::install_calink_resolver(&db, tokio::runtime::Handle::current()).await {
Ok(resolver) => {
shell_commands.extend(crate::calink::register_calink_commands(resolver));
tracing::info!("calink: `ca` link set installed");
}
Err(e) => {
tracing::warn!("calink: CA link set NOT installed: {e}");
}
}
}
#[cfg(feature = "pvalink")]
{
let resolver =
crate::pvalink::install_pvalink_resolver(&db, tokio::runtime::Handle::current()).await;
shell_commands.extend(crate::pvalink::register_pvalink_commands(resolver));
tracing::info!("pvalink: `pva` link set installed");
}
let ca_server = epics_ca_rs::server::CaServer::from_parts(
db.clone(),
ca_port,
None,
config.acf.clone(),
config.autosave_config.clone(),
config.autosave_manager.clone(),
);
// The CA server runs detached; its errors do not stop the PVA server.
epics_base_rs::runtime::task::spawn(async move {
if let Err(e) = ca_server.run().await {
eprintln!("CA server error: {e}");
}
});
let pva_server = epics_pva_rs::server::PvaServer::from_parts(
db,
pva_port,
config.acf,
config.autosave_config,
config.autosave_manager,
);
// Register all shell commands, then serve until the PVA server returns.
pva_server
.run_with_source_and_shell(store, move |shell| {
for cmd in shell_commands {
shell.register(cmd);
}
})
.await
.map_err(|e| CaError::InvalidValue(e.to_string()))
}
#[cfg(test)]
mod tests {
use super::*;
/// `has_pv` must ask the provider when the name is not a native PVA PV.
#[tokio::test]
async fn has_pv_falls_through_to_provider() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_pva_rs::server_native::ChannelSource;

    let database = Arc::new(PvDatabase::new());
    let initial = epics_base_rs::types::EpicsValue::Double(1.0);
    database.add_pv("TEST:X", initial).await.unwrap();
    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(database)));

    let known = store.has_pv("TEST:X").await;
    assert!(known);
    let unknown = store.has_pv("NOT:THERE").await;
    assert!(!unknown);
}
/// A descriptor supplied at registration must be returned unchanged by
/// `get_introspection`.
#[tokio::test]
async fn get_introspection_uses_supplied_descriptor_for_union_array() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_pva_rs::pvdata::{FieldDesc, PvField, ScalarType, ScalarValue, UnionItem};
    use epics_pva_rs::server_native::ChannelSource;

    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(Arc::new(PvDatabase::new()))));

    let variants = vec![
        ("as_int".into(), FieldDesc::Scalar(ScalarType::Int)),
        ("as_double".into(), FieldDesc::Scalar(ScalarType::Double)),
    ];
    let canonical = FieldDesc::UnionArray {
        struct_id: String::new(),
        variants,
    };
    let item = UnionItem {
        selector: 0,
        variant_name: "as_int".into(),
        value: PvField::Scalar(ScalarValue::Int(7)),
    };
    let latest = Arc::new(parking_lot::Mutex::new(Some(PvField::UnionArray(vec![item]))));
    let subscribers = Arc::new(parking_lot::Mutex::new(Vec::new()));

    store
        .register_pva_pv("TEST:UARR", latest, subscribers, Some(canonical.clone()))
        .await;

    let served = store.get_introspection("TEST:UARR").await.unwrap();
    assert_eq!(served, canonical, "supplied descriptor must round-trip");
}
/// Without a supplied descriptor, introspection is derived from the value;
/// for a union array that derived descriptor has an empty variants list.
#[tokio::test]
async fn get_introspection_falls_back_to_value_descriptor_when_unset() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_pva_rs::pvdata::{FieldDesc, PvField, UnionItem};
    use epics_pva_rs::server_native::ChannelSource;

    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(Arc::new(PvDatabase::new()))));

    let item = UnionItem {
        selector: 0,
        variant_name: "as_int".into(),
        value: PvField::Scalar(epics_pva_rs::pvdata::ScalarValue::Int(7)),
    };
    let latest = Arc::new(parking_lot::Mutex::new(Some(PvField::UnionArray(vec![item]))));
    let subscribers = Arc::new(parking_lot::Mutex::new(Vec::new()));
    store
        .register_pva_pv("TEST:UARR_LOSSY", latest, subscribers, None)
        .await;

    let expected = FieldDesc::UnionArray {
        struct_id: String::new(),
        variants: Vec::new(),
    };
    let served = store.get_introspection("TEST:UARR_LOSSY").await.unwrap();
    assert_eq!(
        served, expected,
        "documented lossy recovery: variants list must be empty"
    );
}
/// Global registration must panic when a structure value is paired with a
/// union-array descriptor.
#[test]
#[should_panic(expected = "root kind")]
fn register_pva_pv_global_panics_on_root_mismatch() {
    use epics_pva_rs::pvdata::{FieldDesc, PvField, PvStructure, ScalarType};

    let structure_value = PvField::Structure(PvStructure::new("x"));
    let mismatched = FieldDesc::UnionArray {
        struct_id: String::new(),
        variants: vec![("as_int".into(), FieldDesc::Scalar(ScalarType::Int))],
    };
    let handle = PvaPvHandle {
        latest: Arc::new(parking_lot::Mutex::new(Some(structure_value))),
        subscribers: Arc::new(parking_lot::Mutex::new(Vec::new())),
        descriptor: Some(mismatched),
    };
    register_pva_pv_global("TEST:BOGUS_GLOBAL", handle);
}
/// Store-level registration must panic when a structure value is paired
/// with a union-array descriptor.
#[tokio::test]
#[should_panic(expected = "root kind")]
async fn register_pva_pv_panics_on_descriptor_value_root_mismatch() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_pva_rs::pvdata::{FieldDesc, PvField, PvStructure, ScalarType};

    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(Arc::new(PvDatabase::new()))));

    let structure_value = PvField::Structure(PvStructure::new("x"));
    let mismatched = FieldDesc::UnionArray {
        struct_id: String::new(),
        variants: vec![("as_int".into(), FieldDesc::Scalar(ScalarType::Int))],
    };
    let latest = Arc::new(parking_lot::Mutex::new(Some(structure_value)));
    let subscribers = Arc::new(parking_lot::Mutex::new(Vec::new()));

    store
        .register_pva_pv("TEST:BOGUS", latest, subscribers, Some(mismatched))
        .await;
}
/// An argument-less RPC (`Null` request) on a record must return the
/// record's structure with its current `value`.
#[tokio::test]
async fn rpc_on_qsrv_record_without_args_returns_current_value() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_base_rs::server::records::ai::AiRecord;
    use epics_pva_rs::pvdata::{PvField, ScalarValue};
    use epics_pva_rs::server_native::ChannelSource;

    let database = Arc::new(PvDatabase::new());
    database
        .add_record("RPC:AI", Box::new(AiRecord::new(2.5)))
        .await
        .unwrap();
    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(database)));

    let reply = store.rpc("RPC:AI", FieldDesc::Variant, PvField::Null).await;
    let (desc, value) = reply.expect("RPC on a QSRV record must succeed");

    assert!(
        matches!(desc, FieldDesc::Structure { .. }),
        "RPC response descriptor must be the record's NT structure"
    );
    let s = match value {
        PvField::Structure(s) => s,
        other => panic!("RPC response must be a structure, got {other}"),
    };
    match s.get_field("value") {
        Some(PvField::Scalar(ScalarValue::Double(v))) => {
            assert_eq!(*v, 2.5, "RPC must read back the record's current value")
        }
        other => panic!("expected scalar value field, got {other:?}"),
    }
}
/// An RPC whose NTURI request carries `query { value = 7.0 }` must write
/// that value to the record and return it in the response.
#[tokio::test]
async fn rpc_on_qsrv_record_with_query_args_writes_then_reads() {
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::records::ao::AoRecord;
use epics_pva_rs::pvdata::{PvField, PvStructure, ScalarValue};
use epics_pva_rs::server_native::ChannelSource;
let db = Arc::new(PvDatabase::new());
db.add_record("RPC:AO", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let provider = Arc::new(BridgeProvider::new(db));
let store = QsrvPvStore::new(provider);
// Query arguments: the single field the record is asked to take.
let mut query = PvStructure::new("");
query
.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Double(7.0))));
// NTURI envelope: scheme, path and the query sub-structure.
let mut nturi = PvStructure::new("epics:nt/NTURI:1.0");
nturi.fields.push((
"scheme".into(),
PvField::Scalar(ScalarValue::String("pva".into())),
));
nturi.fields.push((
"path".into(),
PvField::Scalar(ScalarValue::String("RPC:AO".into())),
));
nturi
.fields
.push(("query".into(), PvField::Structure(query)));
let request = PvField::Structure(nturi);
let (_desc, value) = store
.rpc("RPC:AO", request.descriptor(), request)
.await
.expect("RPC with query args must succeed");
let s = match value {
PvField::Structure(s) => s,
other => panic!("RPC response must be a structure, got {other}"),
};
// The read-back must show the value that was just written.
match s.get_field("value") {
Some(PvField::Scalar(ScalarValue::Double(v))) => {
assert_eq!(
*v, 7.0,
"RPC query arg must have been written to the record"
)
}
other => panic!("expected scalar value field, got {other:?}"),
}
}
/// RPC on a name nobody serves must fail with a "PV not found" error.
#[tokio::test]
async fn rpc_on_unknown_pv_errors() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_pva_rs::server_native::ChannelSource;

    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(Arc::new(PvDatabase::new()))));

    let reply = store
        .rpc("NOPE:NOPV", FieldDesc::Variant, PvField::Null)
        .await;
    let err = reply.expect_err("RPC on a missing PV must error");
    assert!(
        err.contains("PV not found"),
        "error must name the missing PV: {err}"
    );
}
/// A single-record channel must refuse a query field other than `value`,
/// and the error must mention both the offending and the accepted field.
#[tokio::test]
async fn rpc_single_record_rejects_non_value_query_field() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_base_rs::server::records::ao::AoRecord;
    use epics_pva_rs::pvdata::{PvField, PvStructure, ScalarValue};
    use epics_pva_rs::server_native::ChannelSource;

    let database = Arc::new(PvDatabase::new());
    database
        .add_record("RPC:AO2", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(database)));

    let mut args = PvStructure::new("");
    let bad_field = ("freq".into(), PvField::Scalar(ScalarValue::Double(1.0)));
    args.fields.push(bad_field);
    let mut envelope = PvStructure::new("epics:nt/NTURI:1.0");
    envelope
        .fields
        .push(("query".into(), PvField::Structure(args)));
    let request = PvField::Structure(envelope);

    let reply = store.rpc("RPC:AO2", request.descriptor(), request).await;
    let err = reply.expect_err("non-`value` query field must be rejected");
    assert!(
        err.contains("freq") && err.contains("value"),
        "error must name the bad field and the accepted one: {err}"
    );
}
/// An RPC on a group PV must accept query fields named after group members,
/// write them to the member records and return the updated members.
#[tokio::test]
async fn rpc_on_group_pv_writes_members() {
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::records::ai::AiRecord;
use epics_base_rs::server::records::longin::LonginRecord;
use epics_pva_rs::pvdata::{PvField, PvStructure, ScalarValue};
use epics_pva_rs::server_native::ChannelSource;
// Group definition with two members, each mapped to a record's VAL field.
const GROUP_JSON: &str = r#"{
"RPC:GRP": {
"+id": "epics:nt/NTGroup:1.0",
"+atomic": true,
"level": { "+channel": "RPC:GRP:level.VAL", "+type": "plain", "+putorder": 0 },
"count": { "+channel": "RPC:GRP:count.VAL", "+type": "plain", "+putorder": 1 }
}
}"#;
let db = Arc::new(PvDatabase::new());
db.add_record("RPC:GRP:level", Box::new(AiRecord::new(1.0)))
.await
.unwrap();
db.add_record("RPC:GRP:count", Box::new(LonginRecord::new(2)))
.await
.unwrap();
let provider = Arc::new(BridgeProvider::new(db));
provider.load_group_config(GROUP_JSON).expect("load group");
provider.process_groups();
let store = QsrvPvStore::new(provider);
// One query argument per group member.
let mut query = PvStructure::new("");
query
.fields
.push(("level".into(), PvField::Scalar(ScalarValue::Double(9.0))));
query
.fields
.push(("count".into(), PvField::Scalar(ScalarValue::Long(8))));
let mut nturi = PvStructure::new("epics:nt/NTURI:1.0");
nturi
.fields
.push(("query".into(), PvField::Structure(query)));
let request = PvField::Structure(nturi);
let (_desc, value) = store
.rpc("RPC:GRP", request.descriptor(), request)
.await
.expect("RPC on a group PV must accept member query fields");
let s = match value {
PvField::Structure(s) => s,
other => panic!("group RPC response must be a structure, got {other}"),
};
assert_eq!(
s.get_field("level"),
Some(&PvField::Scalar(ScalarValue::Double(9.0))),
"group member `level` must reflect the RPC write"
);
// `count` is accepted as either a Long or an Int scalar.
match s.get_field("count") {
Some(PvField::Scalar(ScalarValue::Long(v))) => assert_eq!(*v, 8),
Some(PvField::Scalar(ScalarValue::Int(v))) => assert_eq!(*v as i64, 8),
other => panic!("group member `count` mismatch: {other:?}"),
}
}
/// End-to-end check: a client's `pvinfo` against a running test server must
/// return the exact union-array descriptor supplied at registration.
#[tokio::test]
async fn pva_server_serves_canonical_union_array_descriptor_over_wire() {
use std::time::Duration;
use epics_base_rs::server::database::PvDatabase;
use epics_pva_rs::pvdata::{FieldDesc, PvField, ScalarType, ScalarValue, UnionItem};
use epics_pva_rs::server_native::{PvaServer, PvaServerConfig};
let db = Arc::new(PvDatabase::new());
let provider = Arc::new(BridgeProvider::new(db));
let store = Arc::new(QsrvPvStore::new(provider));
// Descriptor with two variants; the value below uses only the first.
let canonical = FieldDesc::UnionArray {
struct_id: String::new(),
variants: vec![
("as_int".into(), FieldDesc::Scalar(ScalarType::Int)),
("as_double".into(), FieldDesc::Scalar(ScalarType::Double)),
],
};
let value = PvField::UnionArray(vec![UnionItem {
selector: 0,
variant_name: "as_int".into(),
value: PvField::Scalar(ScalarValue::Int(7)),
}]);
store
.register_pva_pv(
"TEST:WIRE:UARR",
Arc::new(parking_lot::Mutex::new(Some(value))),
Arc::new(parking_lot::Mutex::new(Vec::new())),
Some(canonical.clone()),
)
.await;
let server =
PvaServer::start(store, PvaServerConfig::isolated()).expect("test server must start");
let client = server.client_config();
// Bound the wait so a hung server fails the test instead of stalling it.
let got = tokio::time::timeout(Duration::from_secs(5), client.pvinfo("TEST:WIRE:UARR"))
.await
.expect("pvinfo timeout")
.expect("pvinfo failed");
assert_eq!(
got, canonical,
"client-side introspection must recover the producer's UnionArray variants over the wire"
);
}
/// `put_value_checked` with a pvRequest of `record._options.process = "true"`
/// must succeed and leave the record's VAL at the written value.
///
/// NOTE(review): the assertions only check VAL before and after the put;
/// they do not observe whether the record was actually processed.
#[tokio::test]
async fn put_value_checked_honors_pv_request_process_force() {
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::records::ai::AiRecord;
use epics_base_rs::types::EpicsValue;
use epics_pva_rs::pvdata::{PvField, PvStructure, ScalarValue};
use epics_pva_rs::server_native::ChannelSource;
use epics_pva_rs::server_native::source::{AccessGate, ChannelContext};
let db = Arc::new(PvDatabase::new());
db.add_record("TEST:proc", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
let provider = Arc::new(BridgeProvider::new(db.clone()));
let store = QsrvPvStore::new(provider);
// Value to write: an NTScalar with `value = 2.5`.
let mut value = PvStructure::new("epics:nt/NTScalar:1.0");
value
.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Double(2.5))));
// pvRequest: record { _options { process = "true" } }.
let mut opts = PvStructure::new("");
opts.fields.push((
"process".into(),
PvField::Scalar(ScalarValue::String("true".into())),
));
let mut record = PvStructure::new("");
record
.fields
.push(("_options".into(), PvField::Structure(opts)));
let mut req = PvStructure::new("");
req.fields
.push(("record".into(), PvField::Structure(record)));
let ctx = ChannelContext {
peer: "127.0.0.1:5075".parse().unwrap(),
account: "anonymous".into(),
method: "anonymous".into(),
host: "127.0.0.1".into(),
authority: String::new(),
roles: Vec::new(),
pv_request: Some(PvField::Structure(req)),
};
let checked = AccessGate::open()
.check("TEST:proc", "127.0.0.1", "anonymous", "anonymous", "")
.await;
// Baseline: VAL starts at 0.0.
let val0 = {
let rec = db.get_record("TEST:proc").await.unwrap();
let inst = rec.read().await;
inst.snapshot_for_field("VAL").map(|s| s.value)
};
assert!(matches!(val0, Some(EpicsValue::Double(v)) if v == 0.0));
store
.put_value_checked(checked, PvField::Structure(value), ctx)
.await
.expect("put_value_checked must succeed");
// After the put: VAL must equal the written value.
let val1 = {
let rec = db.get_record("TEST:proc").await.unwrap();
let inst = rec.read().await;
inst.snapshot_for_field("VAL").map(|s| s.value)
};
assert!(
matches!(val1, Some(EpicsValue::Double(v)) if (v - 2.5).abs() < 1e-9),
"post-put VAL must be 2.5, got {val1:?}"
);
}
/// `get_value_checked` must forward the pvRequest to the channel: a request
/// of `field { value }` must yield a structure with only the `value` field.
#[tokio::test]
async fn mr_r13_get_value_checked_forwards_pv_request() {
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::records::ai::AiRecord;
use epics_pva_rs::pvdata::{PvField, PvStructure};
use epics_pva_rs::server_native::ChannelSource;
use epics_pva_rs::server_native::source::{AccessGate, ChannelContext};
let db = Arc::new(PvDatabase::new());
db.add_record("TEST:mrr13", Box::new(AiRecord::new(1.5)))
.await
.unwrap();
let provider = Arc::new(BridgeProvider::new(db.clone()));
let store = QsrvPvStore::new(provider);
// pvRequest: field { value { } } — select the `value` field only.
let value_sel = PvStructure::new("");
let mut field_spec = PvStructure::new("");
field_spec
.fields
.push(("value".into(), PvField::Structure(value_sel)));
let mut req = PvStructure::new("");
req.fields
.push(("field".into(), PvField::Structure(field_spec)));
let checked = AccessGate::open()
.check("TEST:mrr13", "127.0.0.1", "anonymous", "anonymous", "")
.await;
let ctx = ChannelContext {
peer: "127.0.0.1:5075".parse().unwrap(),
account: "anonymous".into(),
method: "anonymous".into(),
host: "127.0.0.1".into(),
authority: String::new(),
roles: Vec::new(),
pv_request: Some(PvField::Structure(req)),
};
let got = store
.get_value_checked(checked, ctx)
.await
.expect("get_value_checked must return a value");
let PvField::Structure(s) = got else {
panic!("expected a structure result");
};
assert!(
s.get_field("value").is_some(),
"projected `value` field must be present"
);
// Exactly one field proves the projection was applied, not ignored.
assert_eq!(
s.fields.len(),
1,
"pvRequest `field {{ value }}` must filter the GET to one field, got: {:?}",
s.fields.iter().map(|(n, _)| n).collect::<Vec<_>>()
);
}
/// MR-R11: `ctx_to_creds` must copy the roles of a `ChannelContext` into the
/// credentials it produces.
///
/// Also checks that `creds.user` equals the context's `account` value
/// ("alice") and that `creds.method` equals the context's `method` ("ca").
#[test]
fn mr_r11_ctx_to_creds_forwards_roles() {
    use epics_pva_rs::server_native::source::ChannelContext;

    let expected_roles = vec!["operators".to_string(), "experts".to_string()];

    let context = ChannelContext {
        peer: "127.0.0.1:5075".parse().unwrap(),
        account: "alice".into(),
        method: "ca".into(),
        host: "ws01".into(),
        authority: String::new(),
        roles: vec!["operators".into(), "experts".into()],
        pv_request: None,
    };

    let creds = ctx_to_creds(&context);

    assert_eq!(
        creds.roles, expected_roles,
        "ctx_to_creds must forward ChannelContext.roles into ClientCreds"
    );
    assert_eq!(creds.user, "alice");
    assert_eq!(creds.method, "ca");
}
/// `process` on a single-record PV must update the record's `common.time`.
///
/// The timestamp is sampled before and after the call, with a 10 ms pause in
/// between so a fresh timestamp is distinguishable from the old one. Two
/// assertions follow: the time must not go backwards, and it must strictly
/// advance (the failure message ties the strict check to a fix labelled
/// BR-R38).
#[tokio::test]
async fn process_runs_record_processing_for_single_record_pvs() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_base_rs::server::records::ai::AiRecord;
    use epics_pva_rs::server_native::ChannelSource;

    const PV: &str = "TEST:proc_call";

    // Reads the record's current `common.time` under a read lock.
    let record_time = |db: Arc<PvDatabase>| async move {
        let record = db.get_record(PV).await.unwrap();
        let guard = record.read().await;
        guard.common.time
    };

    let db = Arc::new(PvDatabase::new());
    db.add_record(PV, Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(Arc::clone(&db))));

    let before = record_time(Arc::clone(&db)).await;
    // Let the clock move so a new timestamp cannot equal the old one by accident.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    store.process(PV).await.expect("PROCESS must run");
    let after = record_time(Arc::clone(&db)).await;

    assert!(
        after >= before,
        "PROCESS must touch the record's TIME (post-process \
         timestamp must be >= pre): before={before:?}, after={after:?}"
    );
    assert!(
        after > before,
        "TIME must strictly advance after PROCESS (clock too \
         coarse to discriminate the BR-R38 fix): {before:?} -> {after:?}"
    );
}
/// `process` on a name that is not in the database must fail, and the error
/// text must contain the PV name.
///
/// NOTE(review): the test name also mentions group PVs, but only an unknown
/// name is exercised here — confirm whether a group case belongs in this test.
#[tokio::test]
async fn process_rejects_unknown_or_group_pv() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_pva_rs::server_native::ChannelSource;

    // Fresh database with nothing added to it.
    let empty_db = Arc::new(PvDatabase::new());
    let store = QsrvPvStore::new(Arc::new(BridgeProvider::new(empty_db)));

    let outcome = store.process("UNKNOWN:PV").await;
    let err = outcome.expect_err("PROCESS on unknown PV must error");

    assert!(
        err.contains("UNKNOWN:PV"),
        "error must name the PV; got: {err}"
    );
}
/// MR-R10: a group PUT must honor `record._options.process` from the
/// pvRequest carried on the `ChannelContext`.
///
/// Setup: group `MRR10:grp` (declared `+atomic: false`) maps fields `a` and
/// `b` onto the `VAL` of two `longin` records. The PUT writes `a = 11`,
/// `b = 22` with `process = "false"`. The test then asserts that member `a`
/// holds 11 and that neither member's `common.time` changed across the PUT.
#[tokio::test]
async fn mr_r10_group_put_honors_init_pv_request_process() {
    use epics_base_rs::server::database::PvDatabase;
    use epics_base_rs::server::records::longin::LonginRecord;
    use epics_pva_rs::pvdata::{PvField, PvStructure, ScalarValue};
    use epics_pva_rs::server_native::ChannelSource;
    use epics_pva_rs::server_native::source::{AccessGate, ChannelContext};

    const GROUP_JSON: &str = r#"{
"MRR10:grp": {
"+atomic": false,
"a": { "+channel": "MRR10:a.VAL", "+type": "plain", "+putorder": 0 },
"b": { "+channel": "MRR10:b.VAL", "+type": "plain", "+putorder": 1 }
}
}"#;

    // Two `longin` members, both starting at 0, plus the group definition.
    let db = Arc::new(PvDatabase::new());
    for member in ["MRR10:a", "MRR10:b"] {
        db.add_record(member, Box::new(LonginRecord::new(0)))
            .await
            .unwrap();
    }
    let provider = Arc::new(BridgeProvider::new(Arc::clone(&db)));
    provider.load_group_config(GROUP_JSON).expect("load group");
    let store = QsrvPvStore::new(provider);

    // Reads a member record's current `common.time` under a read lock.
    let member_time = |db: Arc<PvDatabase>, rec_name: &'static str| async move {
        let record = db.get_record(rec_name).await.unwrap();
        let guard = record.read().await;
        guard.common.time
    };

    // Value to PUT: { a: 11, b: 22 }.
    let mut value = PvStructure::new("structure");
    for (field_name, new_val) in [("a", 11), ("b", 22)] {
        value
            .fields
            .push((field_name.into(), PvField::Scalar(ScalarValue::Long(new_val))));
    }

    // pvRequest: record -> _options -> process = "false".
    let nest = |name: &'static str, inner: PvStructure| {
        let mut outer = PvStructure::new("");
        outer.fields.push((name.into(), PvField::Structure(inner)));
        outer
    };
    let mut opts = PvStructure::new("");
    opts.fields.push((
        "process".into(),
        PvField::Scalar(ScalarValue::String("false".into())),
    ));
    let req = nest("record", nest("_options", opts));

    // Baseline timestamps, then a pause so any processing would yield a
    // visibly different timestamp.
    let a_before = member_time(Arc::clone(&db), "MRR10:a").await;
    let b_before = member_time(Arc::clone(&db), "MRR10:b").await;
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;

    let ctx = ChannelContext {
        peer: "127.0.0.1:5075".parse().unwrap(),
        account: "anonymous".into(),
        method: "anonymous".into(),
        host: "127.0.0.1".into(),
        authority: String::new(),
        roles: Vec::new(),
        pv_request: Some(PvField::Structure(req)),
    };
    let checked = AccessGate::open()
        .check("MRR10:grp", "127.0.0.1", "anonymous", "anonymous", "")
        .await;
    store
        .put_value_checked(checked, PvField::Structure(value), ctx)
        .await
        .expect("group put_value_checked must succeed");

    // The write itself must have landed on member `a`.
    let a_val = {
        let record = db.get_record("MRR10:a").await.unwrap();
        let guard = record.read().await;
        guard.snapshot_for_field("VAL").map(|snap| snap.value)
    };
    assert!(
        matches!(a_val, Some(epics_base_rs::types::EpicsValue::Long(11))),
        "member a VAL must be 11, got {a_val:?}"
    );

    // Neither member may have been processed.
    let a_after = member_time(Arc::clone(&db), "MRR10:a").await;
    let b_after = member_time(Arc::clone(&db), "MRR10:b").await;
    assert_eq!(
        a_after, a_before,
        "member a TIME must NOT advance: process=false in the INIT \
         pvRequest must suppress member processing (got {a_before:?} \
         -> {a_after:?})"
    );
    assert_eq!(
        b_after, b_before,
        "member b TIME must NOT advance: process=false in the INIT \
         pvRequest must suppress member processing (got {b_before:?} \
         -> {b_after:?})"
    );
}
}