use std::sync::Arc;
use epics_pva_rs::pvdata::{FieldDesc, PvField};
use epics_pva_rs::server_native::ChannelContext;
use epics_pva_rs::server_native::source::{ChannelSource, RawMonitorEvent};
use tokio::sync::mpsc;
pub trait Layer<S: ChannelSource>: Send + Sync + 'static {
type Wrapped: ChannelSource;
fn layer(self, inner: S) -> Self::Wrapped;
}
pub struct ReadOnlyLayer;
pub struct ReadOnly<S> {
inner: Arc<S>,
}
impl<S: ChannelSource> Layer<S> for ReadOnlyLayer {
type Wrapped = ReadOnly<S>;
fn layer(self, inner: S) -> ReadOnly<S> {
ReadOnly {
inner: Arc::new(inner),
}
}
}
impl<S: ChannelSource> ChannelSource for ReadOnly<S> {
fn access(&self) -> &epics_pva_rs::server_native::source::AccessGate {
self.inner.access()
}
async fn list_pvs(&self) -> Vec<String> {
self.inner.list_pvs().await
}
async fn has_pv(&self, name: &str) -> bool {
self.inner.has_pv(name).await
}
async fn get_introspection(&self, name: &str) -> Option<FieldDesc> {
self.inner.get_introspection(name).await
}
async fn get_value(&self, name: &str) -> Option<PvField> {
self.inner.get_value(name).await
}
async fn put_value(&self, _name: &str, _value: PvField) -> Result<(), String> {
Err("read-only mode: PUT rejected".into())
}
async fn put_value_checked(
&self,
_checked: epics_pva_rs::server_native::source::AccessChecked,
_value: PvField,
_ctx: ChannelContext,
) -> Result<(), String> {
Err("read-only mode: PUT rejected".into())
}
async fn put_delta_checked(
&self,
_checked: epics_pva_rs::server_native::source::AccessChecked,
_desc: FieldDesc,
_changed: epics_pva_rs::proto::BitSet,
_delta: PvField,
_ctx: ChannelContext,
) -> Result<(), String> {
Err("read-only mode: PUT rejected".into())
}
async fn process(&self, _name: &str) -> Result<(), String> {
Err("read-only mode: PROCESS rejected".into())
}
async fn process_checked(
&self,
_checked: epics_pva_rs::server_native::source::AccessChecked,
_ctx: ChannelContext,
) -> Result<(), String> {
Err("read-only mode: PROCESS rejected".into())
}
async fn is_writable(&self, _name: &str) -> bool {
false
}
async fn subscribe(&self, name: &str) -> Option<mpsc::Receiver<PvField>> {
self.inner.subscribe(name).await
}
async fn get_value_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Option<PvField> {
self.inner.get_value_checked(checked, ctx).await
}
async fn subscribe_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Option<mpsc::Receiver<PvField>> {
self.inner.subscribe_checked(checked, ctx).await
}
async fn subscribe_raw(&self, name: &str) -> Option<mpsc::Receiver<RawMonitorEvent>> {
self.inner.subscribe_raw(name).await
}
async fn subscribe_raw_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Option<mpsc::Receiver<RawMonitorEvent>> {
self.inner.subscribe_raw_checked(checked, ctx).await
}
async fn subscribe_checked_opts(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
opts: epics_pva_rs::server_native::MonitorOptions,
) -> Option<mpsc::Receiver<PvField>> {
self.inner.subscribe_checked_opts(checked, ctx, opts).await
}
async fn subscribe_raw_checked_opts(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
opts: epics_pva_rs::server_native::MonitorOptions,
) -> Option<mpsc::Receiver<RawMonitorEvent>> {
self.inner
.subscribe_raw_checked_opts(checked, ctx, opts)
.await
}
async fn rpc(
&self,
name: &str,
request_desc: FieldDesc,
request_value: PvField,
) -> Result<(FieldDesc, PvField), String> {
self.inner.rpc(name, request_desc, request_value).await
}
async fn rpc_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
request_desc: FieldDesc,
request_value: PvField,
ctx: ChannelContext,
) -> Result<(FieldDesc, PvField), String> {
self.inner
.rpc_checked(checked, request_desc, request_value, ctx)
.await
}
fn notify_watermark_high(&self, name: &str) {
self.inner.notify_watermark_high(name);
}
fn notify_watermark_low(&self, name: &str) {
self.inner.notify_watermark_low(name);
}
}
#[derive(Clone, Default)]
pub struct AclConfig {
pub deny: Vec<String>,
pub allow_only: Vec<String>,
deny_re: Vec<regex::Regex>,
allow_re: Vec<regex::Regex>,
}
impl AclConfig {
pub fn deny_regex(mut self, pattern: &str) -> Result<Self, regex::Error> {
self.deny_re.push(compile_anchored(pattern)?);
Ok(self)
}
pub fn allow_regex(mut self, pattern: &str) -> Result<Self, regex::Error> {
self.allow_re.push(compile_anchored(pattern)?);
Ok(self)
}
fn has_allow_list(&self) -> bool {
!self.allow_only.is_empty() || !self.allow_re.is_empty()
}
pub fn allowed(&self, name: &str) -> bool {
if self.deny.iter().any(|p| matches_pattern(p, name))
|| self.deny_re.iter().any(|re| re.is_match(name))
{
return false;
}
if self.has_allow_list() {
let allowed = self.allow_only.iter().any(|p| matches_pattern(p, name))
|| self.allow_re.iter().any(|re| re.is_match(name));
if !allowed {
return false;
}
}
true
}
}
const ACL_REGEX_SIZE_LIMIT: usize = 256 * 1024;
fn compile_anchored(pattern: &str) -> Result<regex::Regex, regex::Error> {
regex::RegexBuilder::new(&format!("^(?:{pattern})$"))
.size_limit(ACL_REGEX_SIZE_LIMIT)
.dfa_size_limit(ACL_REGEX_SIZE_LIMIT)
.build()
}
fn matches_pattern(pattern: &str, name: &str) -> bool {
if let Some(prefix) = pattern.strip_suffix('*') {
return name.starts_with(prefix);
}
if let Some(suffix) = pattern.strip_prefix('*') {
return name.ends_with(suffix);
}
name == pattern
}
pub struct AclLayer {
config: AclConfig,
}
impl AclLayer {
pub fn new(config: AclConfig) -> Self {
Self { config }
}
}
pub struct Acl<S> {
inner: Arc<S>,
config: AclConfig,
}
impl<S: ChannelSource> Layer<S> for AclLayer {
type Wrapped = Acl<S>;
fn layer(self, inner: S) -> Acl<S> {
Acl {
inner: Arc::new(inner),
config: self.config,
}
}
}
impl<S: ChannelSource> ChannelSource for Acl<S> {
async fn list_pvs(&self) -> Vec<String> {
let mut names = self.inner.list_pvs().await;
names.retain(|n| self.config.allowed(n));
names
}
async fn has_pv(&self, name: &str) -> bool {
if !self.config.allowed(name) {
return false;
}
self.inner.has_pv(name).await
}
async fn get_introspection(&self, name: &str) -> Option<FieldDesc> {
if !self.config.allowed(name) {
return None;
}
self.inner.get_introspection(name).await
}
async fn get_value(&self, name: &str) -> Option<PvField> {
if !self.config.allowed(name) {
return None;
}
self.inner.get_value(name).await
}
async fn put_value(&self, name: &str, value: PvField) -> Result<(), String> {
if !self.config.allowed(name) {
return Err(format!("ACL: PV '{name}' denied"));
}
self.inner.put_value(name, value).await
}
async fn process(&self, name: &str) -> Result<(), String> {
if !self.config.allowed(name) {
return Err(format!("ACL: PV '{name}' denied"));
}
self.inner.process(name).await
}
fn access(&self) -> &epics_pva_rs::server_native::source::AccessGate {
self.inner.access()
}
async fn get_value_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Option<PvField> {
if !self.config.allowed(checked.pv_name()) {
return None;
}
self.inner.get_value_checked(checked, ctx).await
}
async fn put_value_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
value: PvField,
ctx: ChannelContext,
) -> Result<(), String> {
if !self.config.allowed(checked.pv_name()) {
return Err(format!("ACL: PV '{}' denied", checked.pv_name()));
}
self.inner.put_value_checked(checked, value, ctx).await
}
async fn put_delta_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
desc: FieldDesc,
changed: epics_pva_rs::proto::BitSet,
delta: PvField,
ctx: ChannelContext,
) -> Result<(), String> {
if !self.config.allowed(checked.pv_name()) {
return Err(format!("ACL: PV '{}' denied", checked.pv_name()));
}
self.inner
.put_delta_checked(checked, desc, changed, delta, ctx)
.await
}
async fn is_writable(&self, name: &str) -> bool {
self.config.allowed(name) && self.inner.is_writable(name).await
}
async fn subscribe(&self, name: &str) -> Option<mpsc::Receiver<PvField>> {
if !self.config.allowed(name) {
return None;
}
self.inner.subscribe(name).await
}
async fn subscribe_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Option<mpsc::Receiver<PvField>> {
if !self.config.allowed(checked.pv_name()) {
return None;
}
self.inner.subscribe_checked(checked, ctx).await
}
async fn subscribe_raw(&self, name: &str) -> Option<mpsc::Receiver<RawMonitorEvent>> {
if !self.config.allowed(name) {
return None;
}
self.inner.subscribe_raw(name).await
}
async fn subscribe_raw_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Option<mpsc::Receiver<RawMonitorEvent>> {
if !self.config.allowed(checked.pv_name()) {
return None;
}
self.inner.subscribe_raw_checked(checked, ctx).await
}
async fn subscribe_checked_opts(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
opts: epics_pva_rs::server_native::MonitorOptions,
) -> Option<mpsc::Receiver<PvField>> {
if !self.config.allowed(checked.pv_name()) {
return None;
}
self.inner.subscribe_checked_opts(checked, ctx, opts).await
}
async fn subscribe_raw_checked_opts(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
opts: epics_pva_rs::server_native::MonitorOptions,
) -> Option<mpsc::Receiver<RawMonitorEvent>> {
if !self.config.allowed(checked.pv_name()) {
return None;
}
self.inner
.subscribe_raw_checked_opts(checked, ctx, opts)
.await
}
async fn rpc(
&self,
name: &str,
request_desc: FieldDesc,
request_value: PvField,
) -> Result<(FieldDesc, PvField), String> {
if !self.config.allowed(name) {
return Err(format!("ACL: PV '{name}' denied"));
}
self.inner.rpc(name, request_desc, request_value).await
}
async fn rpc_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
request_desc: FieldDesc,
request_value: PvField,
ctx: ChannelContext,
) -> Result<(FieldDesc, PvField), String> {
if !self.config.allowed(checked.pv_name()) {
return Err(format!("ACL: PV '{}' denied", checked.pv_name()));
}
self.inner
.rpc_checked(checked, request_desc, request_value, ctx)
.await
}
async fn process_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Result<(), String> {
if !self.config.allowed(checked.pv_name()) {
return Err(format!("ACL: PV '{}' denied", checked.pv_name()));
}
self.inner.process_checked(checked, ctx).await
}
fn notify_watermark_high(&self, name: &str) {
self.inner.notify_watermark_high(name);
}
fn notify_watermark_low(&self, name: &str) {
self.inner.notify_watermark_low(name);
}
}
pub trait AuditSink: Send + Sync + 'static {
fn record(&self, event: AuditEvent);
}
pub struct NoopAudit;
impl AuditSink for NoopAudit {
fn record(&self, _event: AuditEvent) {}
}
impl<A: AuditSink + ?Sized> AuditSink for Arc<A> {
fn record(&self, event: AuditEvent) {
(**self).record(event);
}
}
pub struct ClosureAudit<F: Fn(AuditEvent) + Send + Sync + 'static>(pub F);
impl<F: Fn(AuditEvent) + Send + Sync + 'static> AuditSink for ClosureAudit<F> {
fn record(&self, event: AuditEvent) {
(self.0)(event);
}
}
pub struct MpscAuditSink {
tx: tokio::sync::mpsc::Sender<AuditEvent>,
drops: Arc<std::sync::atomic::AtomicU64>,
}
impl MpscAuditSink {
pub fn wrap<A: AuditSink>(capacity: usize, inner: A) -> Self {
let (tx, mut rx) = tokio::sync::mpsc::channel::<AuditEvent>(capacity.max(1));
tokio::spawn(async move {
while let Some(ev) = rx.recv().await {
inner.record(ev);
}
});
Self {
tx,
drops: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
pub fn drops(&self) -> u64 {
self.drops.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl AuditSink for MpscAuditSink {
fn record(&self, event: AuditEvent) {
if self.tx.try_send(event).is_err() {
self.drops
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
}
#[derive(Debug, Clone)]
pub struct AuditEvent {
pub pv: String,
pub event: AuditEventKind,
pub user: String,
pub host: String,
pub result: AuditResult,
pub timestamp: std::time::SystemTime,
pub error: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditEventKind {
Put,
Get,
Subscribe,
Rpc,
Process,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditResult {
Ok,
Denied,
Failed,
}
fn make_audit_event(name: &str, user: &str, host: &str, result: &Result<(), String>) -> AuditEvent {
let (kind, error) = match result {
Ok(_) => (AuditResult::Ok, String::new()),
Err(msg) => {
let lower = msg.to_lowercase();
if lower.contains("deny")
|| lower.contains("denied")
|| lower.contains("acl:")
|| lower.contains("read-only")
{
(AuditResult::Denied, msg.clone())
} else {
(AuditResult::Failed, msg.clone())
}
}
};
AuditEvent {
pv: name.to_string(),
event: AuditEventKind::Put,
user: user.to_string(),
host: host.to_string(),
result: kind,
timestamp: std::time::SystemTime::now(),
error,
}
}
pub struct AuditLayer<A: AuditSink> {
sink: Arc<A>,
audit_get: bool,
audit_subscribe: bool,
audit_rpc: bool,
}
impl<A: AuditSink> AuditLayer<A> {
pub fn new(sink: A) -> Self {
Self {
sink: Arc::new(sink),
audit_get: false,
audit_subscribe: false,
audit_rpc: false,
}
}
}
impl AuditLayer<MpscAuditSink> {
pub fn with_blocking_sink<I: AuditSink>(capacity: usize, inner: I) -> Self {
Self {
sink: Arc::new(MpscAuditSink::wrap(capacity, inner)),
audit_get: false,
audit_subscribe: false,
audit_rpc: false,
}
}
}
impl<A: AuditSink> AuditLayer<A> {
pub fn with_get(mut self) -> Self {
self.audit_get = true;
self
}
pub fn with_subscribe(mut self) -> Self {
self.audit_subscribe = true;
self
}
pub fn with_rpc(mut self) -> Self {
self.audit_rpc = true;
self
}
}
pub struct Audited<S, A> {
inner: Arc<S>,
sink: Arc<A>,
audit_get: bool,
audit_subscribe: bool,
audit_rpc: bool,
}
impl<S: ChannelSource, A: AuditSink> Layer<S> for AuditLayer<A> {
type Wrapped = Audited<S, A>;
fn layer(self, inner: S) -> Audited<S, A> {
Audited {
inner: Arc::new(inner),
sink: self.sink,
audit_get: self.audit_get,
audit_subscribe: self.audit_subscribe,
audit_rpc: self.audit_rpc,
}
}
}
impl<S: ChannelSource, A: AuditSink> ChannelSource for Audited<S, A> {
fn access(&self) -> &epics_pva_rs::server_native::source::AccessGate {
self.inner.access()
}
async fn list_pvs(&self) -> Vec<String> {
self.inner.list_pvs().await
}
async fn has_pv(&self, name: &str) -> bool {
self.inner.has_pv(name).await
}
async fn get_introspection(&self, name: &str) -> Option<FieldDesc> {
self.inner.get_introspection(name).await
}
async fn get_value(&self, name: &str) -> Option<PvField> {
let result = self.inner.get_value(name).await;
if self.audit_get {
let outcome: Result<(), String> = if result.is_some() {
Ok(())
} else {
Err(format!("PV '{name}' not found"))
};
let mut ev = make_audit_event(name, "", "", &outcome);
ev.event = AuditEventKind::Get;
self.sink.record(ev);
}
result
}
async fn put_value(&self, name: &str, value: PvField) -> Result<(), String> {
let result = self.inner.put_value(name, value).await;
self.sink.record(make_audit_event(name, "", "", &result));
result
}
async fn put_value_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
value: PvField,
ctx: ChannelContext,
) -> Result<(), String> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self.inner.put_value_checked(checked, value, ctx).await;
self.sink
.record(make_audit_event(&pv, &user, &host, &result));
result
}
async fn put_delta_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
desc: FieldDesc,
changed: epics_pva_rs::proto::BitSet,
delta: PvField,
ctx: ChannelContext,
) -> Result<(), String> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self
.inner
.put_delta_checked(checked, desc, changed, delta, ctx)
.await;
self.sink
.record(make_audit_event(&pv, &user, &host, &result));
result
}
async fn process(&self, name: &str) -> Result<(), String> {
let result = self.inner.process(name).await;
let mut ev = make_audit_event(name, "", "", &result);
ev.event = AuditEventKind::Process;
self.sink.record(ev);
result
}
async fn process_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Result<(), String> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self.inner.process_checked(checked, ctx).await;
let mut ev = make_audit_event(&pv, &user, &host, &result);
ev.event = AuditEventKind::Process;
self.sink.record(ev);
result
}
async fn get_value_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Option<PvField> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self.inner.get_value_checked(checked, ctx).await;
if self.audit_get {
let outcome: Result<(), String> = if result.is_some() {
Ok(())
} else {
Err(format!("PV '{pv}' not found / denied"))
};
let mut ev = make_audit_event(&pv, &user, &host, &outcome);
ev.event = AuditEventKind::Get;
self.sink.record(ev);
}
result
}
async fn subscribe_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
) -> Option<mpsc::Receiver<PvField>> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self.inner.subscribe_checked(checked, ctx).await;
if self.audit_subscribe {
let outcome: Result<(), String> = if result.is_some() {
Ok(())
} else {
Err(format!("PV '{pv}' not subscribable"))
};
let mut ev = make_audit_event(&pv, &user, &host, &outcome);
ev.event = AuditEventKind::Subscribe;
self.sink.record(ev);
}
result
}
async fn is_writable(&self, name: &str) -> bool {
self.inner.is_writable(name).await
}
async fn subscribe(&self, name: &str) -> Option<mpsc::Receiver<PvField>> {
let result = self.inner.subscribe(name).await;
if self.audit_subscribe {
let outcome: Result<(), String> = if result.is_some() {
Ok(())
} else {
Err(format!("PV '{name}' not subscribable"))
};
let mut ev = make_audit_event(name, "", "", &outcome);
ev.event = AuditEventKind::Subscribe;
self.sink.record(ev);
}
result
}
async fn subscribe_raw(&self, name: &str) -> Option<mpsc::Receiver<RawMonitorEvent>> {
let result = self.inner.subscribe_raw(name).await;
if self.audit_subscribe {
let outcome: Result<(), String> = if result.is_some() {
Ok(())
} else {
Err(format!("PV '{name}' not subscribable (raw)"))
};
let mut ev = make_audit_event(name, "", "", &outcome);
ev.event = AuditEventKind::Subscribe;
self.sink.record(ev);
}
result
}
async fn subscribe_raw_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: epics_pva_rs::server_native::source::ChannelContext,
) -> Option<mpsc::Receiver<RawMonitorEvent>> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self.inner.subscribe_raw_checked(checked, ctx).await;
if self.audit_subscribe {
let outcome: Result<(), String> = if result.is_some() {
Ok(())
} else {
Err(format!("PV '{pv}' not subscribable (raw)"))
};
let mut ev = make_audit_event(&pv, &user, &host, &outcome);
ev.event = AuditEventKind::Subscribe;
self.sink.record(ev);
}
result
}
async fn subscribe_checked_opts(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
opts: epics_pva_rs::server_native::MonitorOptions,
) -> Option<mpsc::Receiver<PvField>> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self.inner.subscribe_checked_opts(checked, ctx, opts).await;
if self.audit_subscribe {
let outcome: Result<(), String> = if result.is_some() {
Ok(())
} else {
Err(format!("PV '{pv}' not subscribable"))
};
let mut ev = make_audit_event(&pv, &user, &host, &outcome);
ev.event = AuditEventKind::Subscribe;
self.sink.record(ev);
}
result
}
async fn subscribe_raw_checked_opts(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
ctx: ChannelContext,
opts: epics_pva_rs::server_native::MonitorOptions,
) -> Option<mpsc::Receiver<RawMonitorEvent>> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self
.inner
.subscribe_raw_checked_opts(checked, ctx, opts)
.await;
if self.audit_subscribe {
let outcome: Result<(), String> = if result.is_some() {
Ok(())
} else {
Err(format!("PV '{pv}' not subscribable (raw)"))
};
let mut ev = make_audit_event(&pv, &user, &host, &outcome);
ev.event = AuditEventKind::Subscribe;
self.sink.record(ev);
}
result
}
async fn rpc(
&self,
name: &str,
request_desc: FieldDesc,
request_value: PvField,
) -> Result<(FieldDesc, PvField), String> {
let result = self.inner.rpc(name, request_desc, request_value).await;
if self.audit_rpc {
let outcome: Result<(), String> = match &result {
Ok(_) => Ok(()),
Err(e) => Err(e.clone()),
};
let mut ev = make_audit_event(name, "", "", &outcome);
ev.event = AuditEventKind::Rpc;
self.sink.record(ev);
}
result
}
async fn rpc_checked(
&self,
checked: epics_pva_rs::server_native::source::AccessChecked,
request_desc: FieldDesc,
request_value: PvField,
ctx: epics_pva_rs::server_native::source::ChannelContext,
) -> Result<(FieldDesc, PvField), String> {
let pv = checked.pv_name().to_string();
let user = ctx.account.clone();
let host = ctx.host.clone();
let result = self
.inner
.rpc_checked(checked, request_desc, request_value, ctx)
.await;
if self.audit_rpc {
let outcome: Result<(), String> = match &result {
Ok(_) => Ok(()),
Err(e) => Err(e.clone()),
};
let mut ev = make_audit_event(&pv, &user, &host, &outcome);
ev.event = AuditEventKind::Rpc;
self.sink.record(ev);
}
result
}
fn notify_watermark_high(&self, name: &str) {
self.inner.notify_watermark_high(name);
}
fn notify_watermark_low(&self, name: &str) {
self.inner.notify_watermark_low(name);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pattern_matching() {
assert!(matches_pattern("MOTOR:*", "MOTOR:VAL"));
assert!(matches_pattern("*VAL", "MOTOR:VAL"));
assert!(matches_pattern("EXACT", "EXACT"));
assert!(!matches_pattern("EXACT", "EXACT2"));
assert!(!matches_pattern("MOTOR:*", "OTHER:VAL"));
}
#[test]
fn acl_allow_only() {
let cfg = AclConfig {
allow_only: vec!["BL10C:*".into()],
..Default::default()
};
assert!(cfg.allowed("BL10C:VG-01:PRESSURE"));
assert!(!cfg.allowed("RFP:HV"));
}
#[test]
fn acl_deny_overrides_allow() {
let cfg = AclConfig {
allow_only: vec!["MOTOR:*".into()],
deny: vec!["MOTOR:JOG:*".into()],
..Default::default()
};
assert!(cfg.allowed("MOTOR:VAL"));
assert!(!cfg.allowed("MOTOR:JOG:UP"));
assert!(!cfg.allowed("OTHER:PV"));
}
#[test]
fn acl_regex_deny_anchored() {
let cfg = AclConfig::default().deny_regex(r"BL10C:.*:HV").unwrap();
assert!(!cfg.allowed("BL10C:RFP:HV"));
assert!(!cfg.allowed("BL10C::HV"));
assert!(cfg.allowed("X:BL10C:RFP:HV"));
assert!(cfg.allowed("BL10C:RFP:HV:SETPOINT"));
assert!(cfg.allowed("BL10D:RFP:HV"));
}
#[test]
fn acl_regex_allow_only_default_denies() {
let cfg = AclConfig::default().allow_regex(r"(SR|BL)\d+:.*").unwrap();
assert!(cfg.allowed("SR01:CURRENT"));
assert!(cfg.allowed("BL10:SHUTTER"));
assert!(!cfg.allowed("RFP:HV"));
assert!(!cfg.allowed("X-SR01:CURRENT"));
}
#[test]
fn acl_glob_and_regex_mixed() {
let cfg = AclConfig {
allow_only: vec!["MOTOR:*".into()],
..Default::default()
}
.allow_regex(r"TEMP:\d+")
.unwrap()
.deny_regex(r"MOTOR:.*:JOG")
.unwrap();
assert!(cfg.allowed("MOTOR:X:VAL"));
assert!(cfg.allowed("TEMP:42"));
assert!(!cfg.allowed("TEMP:hot")); assert!(!cfg.allowed("MOTOR:X:JOG"));
assert!(!cfg.allowed("RFP:HV"));
}
#[test]
fn acl_invalid_regex_rejected_at_build() {
assert!(AclConfig::default().deny_regex(r"BL10C:[").is_err());
assert!(AclConfig::default().allow_regex(r"(unclosed").is_err());
}
#[test]
fn acl_oversized_regex_rejected_by_size_limit() {
let pathological = r"(?:a{1000}){1000}";
match compile_anchored(pathological) {
Err(regex::Error::CompiledTooBig(_)) => {}
other => panic!("expected CompiledTooBig, got {other:?}"),
}
assert!(
AclConfig::default().deny_regex(pathological).is_err(),
"oversized deny regex must be rejected"
);
assert!(
AclConfig::default().allow_regex(pathological).is_err(),
"oversized allow regex must be rejected"
);
assert!(compile_anchored(r"BL\d+C:.*:HV").is_ok());
assert!(AclConfig::default().deny_regex(r"BL\d+C:.*:HV").is_ok());
}
#[tokio::test]
async fn acl_layer_applies_regex_via_channel_source() {
use super::super::channel_cache::{ChannelCache, DEFAULT_CLEANUP_INTERVAL};
use super::super::source::GatewayChannelSource;
use epics_pva_rs::client::PvaClient;
let client = Arc::new(PvaClient::builder().build());
let cache = ChannelCache::new(client, DEFAULT_CLEANUP_INTERVAL);
let inner = GatewayChannelSource::new(cache);
let cfg = AclConfig::default().deny_regex(r"SECRET:.*").unwrap();
let acl = AclLayer::new(cfg).layer(inner);
assert!(!acl.has_pv("SECRET:KEY").await);
let err = acl
.put_value(
"SECRET:KEY",
PvField::Scalar(epics_pva_rs::pvdata::ScalarValue::Double(0.0)),
)
.await
.expect_err("regex-denied PUT must fail at the ACL layer");
assert!(err.contains("denied"));
}
#[test]
fn audit_event_classifies_results() {
let denied = make_audit_event(
"MOTOR:VAL",
"alice",
"host1",
&Err("ACL: PV 'MOTOR:VAL' denied".into()),
);
assert_eq!(denied.result, AuditResult::Denied);
assert!(!denied.error.is_empty());
let read_only = make_audit_event(
"MOTOR:VAL",
"",
"",
&Err("read-only mode: PUT rejected".into()),
);
assert_eq!(read_only.result, AuditResult::Denied);
let failed = make_audit_event(
"MOTOR:VAL",
"alice",
"host1",
&Err("upstream timeout".into()),
);
assert_eq!(failed.result, AuditResult::Failed);
let ok = make_audit_event("MOTOR:VAL", "alice", "host1", &Ok(()));
assert_eq!(ok.result, AuditResult::Ok);
assert!(ok.error.is_empty());
}
use epics_pva_rs::pvdata::{ScalarType, ScalarValue};
use epics_pva_rs::server_native::source::{AccessChecked, AccessGate};
use std::sync::atomic::{AtomicBool, Ordering};
struct RecordingSource {
delta_reached: Arc<AtomicBool>,
value_reached: Arc<AtomicBool>,
process_reached: Arc<AtomicBool>,
}
impl ChannelSource for RecordingSource {
async fn list_pvs(&self) -> Vec<String> {
vec!["X".into()]
}
async fn has_pv(&self, _name: &str) -> bool {
true
}
async fn get_introspection(&self, _name: &str) -> Option<FieldDesc> {
Some(FieldDesc::Scalar(ScalarType::Double))
}
async fn get_value(&self, _name: &str) -> Option<PvField> {
Some(PvField::Scalar(ScalarValue::Double(0.0)))
}
async fn put_value(&self, _name: &str, _value: PvField) -> Result<(), String> {
Ok(())
}
async fn put_value_checked(
&self,
_checked: AccessChecked,
_value: PvField,
_ctx: ChannelContext,
) -> Result<(), String> {
self.value_reached.store(true, Ordering::SeqCst);
Ok(())
}
async fn put_delta_checked(
&self,
_checked: AccessChecked,
_desc: FieldDesc,
_changed: epics_pva_rs::proto::BitSet,
_delta: PvField,
_ctx: ChannelContext,
) -> Result<(), String> {
self.delta_reached.store(true, Ordering::SeqCst);
Ok(())
}
async fn process(&self, _name: &str) -> Result<(), String> {
self.process_reached.store(true, Ordering::SeqCst);
Ok(())
}
async fn is_writable(&self, _name: &str) -> bool {
true
}
async fn subscribe(&self, _name: &str) -> Option<mpsc::Receiver<PvField>> {
None
}
}
fn test_ctx() -> ChannelContext {
ChannelContext {
peer: "127.0.0.1:5075".parse().unwrap(),
account: "alice".into(),
method: "anonymous".into(),
host: "host1".into(),
authority: String::new(),
roles: Vec::new(),
pv_request: None,
}
}
async fn checked_for(name: &str) -> AccessChecked {
AccessGate::open()
.check(name, "host1", "alice", "anonymous", "")
.await
}
#[tokio::test]
async fn acl_forwards_put_delta_checked_to_inner() {
let delta_reached = Arc::new(AtomicBool::new(false));
let value_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: delta_reached.clone(),
value_reached: value_reached.clone(),
process_reached: Arc::new(AtomicBool::new(false)),
};
let acl = AclLayer::new(AclConfig::default()).layer(inner);
let mut changed = epics_pva_rs::proto::BitSet::new();
changed.set(0);
let res = acl
.put_delta_checked(
checked_for("X").await,
FieldDesc::Scalar(ScalarType::Double),
changed,
PvField::Scalar(ScalarValue::Double(1.0)),
test_ctx(),
)
.await;
assert!(res.is_ok());
assert!(
delta_reached.load(Ordering::SeqCst),
"Acl must route delta PUT to inner put_delta_checked"
);
assert!(
!value_reached.load(Ordering::SeqCst),
"Acl must NOT fall back to the non-atomic put_value_checked merge"
);
}
#[tokio::test]
async fn acl_denied_put_delta_checked_short_circuits() {
let delta_reached = Arc::new(AtomicBool::new(false));
let value_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: delta_reached.clone(),
value_reached: value_reached.clone(),
process_reached: Arc::new(AtomicBool::new(false)),
};
let cfg = AclConfig::default().deny_regex(r"SECRET:.*").unwrap();
let acl = AclLayer::new(cfg).layer(inner);
let mut changed = epics_pva_rs::proto::BitSet::new();
changed.set(0);
let err = acl
.put_delta_checked(
checked_for("SECRET:KEY").await,
FieldDesc::Scalar(ScalarType::Double),
changed,
PvField::Scalar(ScalarValue::Double(1.0)),
test_ctx(),
)
.await
.expect_err("ACL-denied delta PUT must fail at the layer");
assert!(err.contains("denied"));
assert!(!delta_reached.load(Ordering::SeqCst));
assert!(!value_reached.load(Ordering::SeqCst));
}
#[tokio::test]
async fn audited_forwards_put_delta_checked_and_records() {
let delta_reached = Arc::new(AtomicBool::new(false));
let value_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: delta_reached.clone(),
value_reached: value_reached.clone(),
process_reached: Arc::new(AtomicBool::new(false)),
};
let events: Arc<std::sync::Mutex<Vec<AuditEvent>>> =
Arc::new(std::sync::Mutex::new(Vec::new()));
let events_sink = events.clone();
let audited = AuditLayer::new(ClosureAudit(move |ev| {
events_sink.lock().unwrap().push(ev);
}))
.layer(inner);
let mut changed = epics_pva_rs::proto::BitSet::new();
changed.set(0);
let res = audited
.put_delta_checked(
checked_for("X").await,
FieldDesc::Scalar(ScalarType::Double),
changed,
PvField::Scalar(ScalarValue::Double(1.0)),
test_ctx(),
)
.await;
assert!(res.is_ok());
assert!(
delta_reached.load(Ordering::SeqCst),
"Audited must route delta PUT to inner put_delta_checked"
);
assert!(
!value_reached.load(Ordering::SeqCst),
"Audited must NOT fall back to the non-atomic put_value_checked merge"
);
let recorded = events.lock().unwrap();
assert_eq!(recorded.len(), 1, "exactly one audit row per delta PUT");
assert_eq!(recorded[0].event, AuditEventKind::Put);
assert_eq!(recorded[0].result, AuditResult::Ok);
assert_eq!(recorded[0].pv, "X");
assert_eq!(recorded[0].user, "alice");
assert_eq!(recorded[0].host, "host1");
}
#[tokio::test]
async fn read_only_rejects_put_delta_checked() {
let delta_reached = Arc::new(AtomicBool::new(false));
let value_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: delta_reached.clone(),
value_reached: value_reached.clone(),
process_reached: Arc::new(AtomicBool::new(false)),
};
let ro = ReadOnlyLayer.layer(inner);
let mut changed = epics_pva_rs::proto::BitSet::new();
changed.set(0);
let err = ro
.put_delta_checked(
checked_for("X").await,
FieldDesc::Scalar(ScalarType::Double),
changed,
PvField::Scalar(ScalarValue::Double(1.0)),
test_ctx(),
)
.await
.expect_err("read-only delta PUT must be rejected");
assert!(err.contains("read-only"));
assert!(!delta_reached.load(Ordering::SeqCst));
assert!(!value_reached.load(Ordering::SeqCst));
}
#[tokio::test]
async fn acl_forwards_process_checked_to_inner() {
let process_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: Arc::new(AtomicBool::new(false)),
value_reached: Arc::new(AtomicBool::new(false)),
process_reached: process_reached.clone(),
};
let acl = AclLayer::new(AclConfig::default()).layer(inner);
let res = acl
.process_checked(checked_for("X").await, test_ctx())
.await;
assert!(res.is_ok());
assert!(
process_reached.load(Ordering::SeqCst),
"Acl must route PROCESS to the inner source's process_checked"
);
}
#[tokio::test]
async fn acl_denied_process_checked_short_circuits() {
let process_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: Arc::new(AtomicBool::new(false)),
value_reached: Arc::new(AtomicBool::new(false)),
process_reached: process_reached.clone(),
};
let cfg = AclConfig::default().deny_regex(r"SECRET:.*").unwrap();
let acl = AclLayer::new(cfg).layer(inner);
let err = acl
.process_checked(checked_for("SECRET:KEY").await, test_ctx())
.await
.expect_err("ACL-denied PROCESS must fail at the layer");
assert!(err.contains("denied"));
assert!(
!process_reached.load(Ordering::SeqCst),
"ACL-denied PROCESS must not reach the inner source"
);
}
#[tokio::test]
async fn read_only_rejects_process_checked() {
let process_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: Arc::new(AtomicBool::new(false)),
value_reached: Arc::new(AtomicBool::new(false)),
process_reached: process_reached.clone(),
};
let ro = ReadOnlyLayer.layer(inner);
let err = ro
.process_checked(checked_for("X").await, test_ctx())
.await
.expect_err("read-only PROCESS must be rejected");
assert!(err.contains("read-only"));
assert!(
!process_reached.load(Ordering::SeqCst),
"read-only PROCESS must not reach the inner source"
);
let err = ro
.process("X")
.await
.expect_err("read-only ctx-less PROCESS must be rejected");
assert!(err.contains("read-only"));
assert!(!process_reached.load(Ordering::SeqCst));
}
#[tokio::test]
async fn audited_forwards_process_checked_and_records() {
let process_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: Arc::new(AtomicBool::new(false)),
value_reached: Arc::new(AtomicBool::new(false)),
process_reached: process_reached.clone(),
};
let events: Arc<std::sync::Mutex<Vec<AuditEvent>>> =
Arc::new(std::sync::Mutex::new(Vec::new()));
let events_sink = events.clone();
let audited = AuditLayer::new(ClosureAudit(move |ev| {
events_sink.lock().unwrap().push(ev);
}))
.layer(inner);
let res = audited
.process_checked(checked_for("X").await, test_ctx())
.await;
assert!(res.is_ok());
assert!(
process_reached.load(Ordering::SeqCst),
"Audited must route PROCESS to the inner source's process_checked"
);
let recorded = events.lock().unwrap();
assert_eq!(recorded.len(), 1, "exactly one audit row per PROCESS");
assert_eq!(recorded[0].event, AuditEventKind::Process);
assert_eq!(recorded[0].result, AuditResult::Ok);
assert_eq!(recorded[0].pv, "X");
assert_eq!(recorded[0].user, "alice");
assert_eq!(recorded[0].host, "host1");
}
#[tokio::test]
async fn layered_audited_acl_forwards_process_to_inner() {
let process_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: Arc::new(AtomicBool::new(false)),
value_reached: Arc::new(AtomicBool::new(false)),
process_reached: process_reached.clone(),
};
let events: Arc<std::sync::Mutex<Vec<AuditEvent>>> =
Arc::new(std::sync::Mutex::new(Vec::new()));
let events_sink = events.clone();
let acl = AclLayer::new(AclConfig::default()).layer(inner);
let layered = AuditLayer::new(ClosureAudit(move |ev| {
events_sink.lock().unwrap().push(ev);
}))
.layer(acl);
let res = layered
.process_checked(checked_for("X").await, test_ctx())
.await;
assert!(res.is_ok());
assert!(
process_reached.load(Ordering::SeqCst),
"PROCESS through Audited(Acl(inner)) must reach the inner source"
);
assert_eq!(
events.lock().unwrap()[0].event,
AuditEventKind::Process,
"the layered PROCESS must still be audited"
);
}
#[tokio::test]
async fn layered_audited_acl_denies_process() {
let process_reached = Arc::new(AtomicBool::new(false));
let inner = RecordingSource {
delta_reached: Arc::new(AtomicBool::new(false)),
value_reached: Arc::new(AtomicBool::new(false)),
process_reached: process_reached.clone(),
};
let events: Arc<std::sync::Mutex<Vec<AuditEvent>>> =
Arc::new(std::sync::Mutex::new(Vec::new()));
let events_sink = events.clone();
let cfg = AclConfig::default().deny_regex(r"SECRET:.*").unwrap();
let acl = AclLayer::new(cfg).layer(inner);
let layered = AuditLayer::new(ClosureAudit(move |ev| {
events_sink.lock().unwrap().push(ev);
}))
.layer(acl);
let err = layered
.process_checked(checked_for("SECRET:KEY").await, test_ctx())
.await
.expect_err("ACL-denied PROCESS must fail through the layered stack");
assert!(err.contains("denied"));
assert!(
!process_reached.load(Ordering::SeqCst),
"ACL-denied PROCESS must not reach the inner source"
);
let recorded = events.lock().unwrap();
assert_eq!(recorded[0].event, AuditEventKind::Process);
assert_eq!(recorded[0].result, AuditResult::Denied);
}
struct OptsRecordingSource {
opts_reached: Arc<AtomicBool>,
}
impl ChannelSource for OptsRecordingSource {
async fn list_pvs(&self) -> Vec<String> {
vec!["X".into()]
}
async fn has_pv(&self, _name: &str) -> bool {
true
}
async fn get_introspection(&self, _name: &str) -> Option<FieldDesc> {
Some(FieldDesc::Scalar(ScalarType::Double))
}
async fn get_value(&self, _name: &str) -> Option<PvField> {
Some(PvField::Scalar(ScalarValue::Double(0.0)))
}
async fn put_value(&self, _name: &str, _value: PvField) -> Result<(), String> {
Ok(())
}
async fn is_writable(&self, _name: &str) -> bool {
false
}
async fn subscribe(&self, _name: &str) -> Option<mpsc::Receiver<PvField>> {
None
}
async fn subscribe_checked_opts(
&self,
_checked: AccessChecked,
_ctx: ChannelContext,
_opts: epics_pva_rs::server_native::MonitorOptions,
) -> Option<mpsc::Receiver<PvField>> {
self.opts_reached.store(true, Ordering::SeqCst);
None
}
async fn subscribe_raw_checked_opts(
&self,
_checked: AccessChecked,
_ctx: ChannelContext,
_opts: epics_pva_rs::server_native::MonitorOptions,
) -> Option<mpsc::Receiver<RawMonitorEvent>> {
self.opts_reached.store(true, Ordering::SeqCst);
None
}
}
#[tokio::test]
async fn middleware_layers_forward_subscribe_opts_to_inner() {
use epics_pva_rs::server_native::MonitorOptions;
for raw in [false, true] {
let opts_reached = Arc::new(AtomicBool::new(false));
let inner = OptsRecordingSource {
opts_reached: opts_reached.clone(),
};
let acl = AclLayer::new(AclConfig::default()).layer(inner);
let read_only = ReadOnlyLayer.layer(acl);
let layered = AuditLayer::new(NoopAudit).layer(read_only);
let opts = MonitorOptions {
pipeline: false,
queue_size: None,
server_filter: true,
};
if raw {
let _ = layered
.subscribe_raw_checked_opts(checked_for("X").await, test_ctx(), opts)
.await;
} else {
let _ = layered
.subscribe_checked_opts(checked_for("X").await, test_ctx(), opts)
.await;
}
assert!(
opts_reached.load(Ordering::SeqCst),
"middleware stack must forward subscribe{}_checked_opts to the inner source",
if raw { "_raw" } else { "" },
);
}
}
}