use std::sync::Arc;
use std::time::{Duration, SystemTime};
use epics_base_rs::error::{CaError, CaResult};
use epics_base_rs::server::device_support::{DeviceReadOutcome, DeviceSupport, WriteCompletion};
use epics_base_rs::server::record::{Record, ScanType};
use epics_base_rs::types::EpicsValue;
use crate::error::AsynError;
use crate::interfaces::InterfaceType;
use crate::interrupt::{InterruptFilter, InterruptSubscription};
use crate::port_handle::{AsyncCompletionHandle, PortHandle};
use crate::request::{RequestOp, RequestResult};
use crate::user::AsynUser;
/// Parsed form of an `@asyn(...)` record link string.
///
/// Produced by `parse_asyn_link`; the factory in this file also builds one
/// from an `@asynMask(...)` link by dropping the mask.
#[derive(Debug, Clone)]
pub struct AsynLink {
    /// First argument inside the parentheses.
    pub port_name: String,
    /// Second argument inside the parentheses (0 when the link omits it).
    pub addr: i32,
    /// Third argument, given in seconds in the link (one second when omitted).
    pub timeout: Duration,
    /// Trimmed text that follows the closing parenthesis.
    pub drv_info: String,
}
/// Parses an `@asyn(portName[, addr[, timeout]]) drvInfo` link string.
///
/// Arguments inside the parentheses may be comma-separated or, when no comma
/// is present, whitespace-separated. `addr` defaults to 0 and `timeout`
/// (seconds, fractional allowed) defaults to one second. Everything after the
/// first `)` is trimmed and returned as `drv_info`.
///
/// # Errors
///
/// Returns `AsynError::InvalidLinkSyntax` when the `@asyn(` prefix or the
/// closing parenthesis is missing, when the port name is empty, when `addr`
/// is not an `i32`, or when `timeout` is not a number that a `Duration` can
/// represent (negative, NaN, infinite or too large).
pub fn parse_asyn_link(s: &str) -> Result<AsynLink, AsynError> {
    let s = s.trim();
    let rest = s
        .strip_prefix("@asyn(")
        .ok_or_else(|| AsynError::InvalidLinkSyntax(format!("must start with @asyn(: {s}")))?;
    let paren_end = rest
        .find(')')
        .ok_or_else(|| AsynError::InvalidLinkSyntax(format!("missing closing paren: {s}")))?;
    let args_str = &rest[..paren_end];
    let drv_info = rest[paren_end + 1..].trim().to_string();

    // A comma anywhere selects comma-separated parsing; otherwise fall back to
    // whitespace-separated arguments.
    let parts: Vec<&str> = if args_str.contains(',') {
        args_str.split(',').map(str::trim).collect()
    } else {
        args_str.split_whitespace().collect()
    };

    let port_name = match parts.first() {
        Some(name) if !name.is_empty() => name.to_string(),
        _ => return Err(AsynError::InvalidLinkSyntax("portName is required".into())),
    };

    let addr = match parts.get(1) {
        Some(raw) => raw
            .parse::<i32>()
            .map_err(|_| AsynError::InvalidLinkSyntax(format!("invalid addr: {raw}")))?,
        None => 0,
    };

    let timeout = match parts.get(2) {
        // `Duration::from_secs_f64` panics on negative, non-finite or
        // overflowing input; the fallible constructor turns those into a
        // regular syntax error instead.
        Some(raw) => raw
            .parse::<f64>()
            .ok()
            .and_then(|secs| Duration::try_from_secs_f64(secs).ok())
            .ok_or_else(|| AsynError::InvalidLinkSyntax(format!("invalid timeout: {raw}")))?,
        None => Duration::from_secs(1),
    };

    Ok(AsynLink {
        port_name,
        addr,
        timeout,
        drv_info,
    })
}
/// Parsed form of an `@asynMask(...)` record link string.
///
/// Same fields as `AsynLink` plus the bit mask given as the third argument.
#[derive(Debug, Clone)]
pub struct AsynMaskLink {
    /// First argument inside the parentheses.
    pub port_name: String,
    /// Second argument inside the parentheses.
    pub addr: i32,
    /// Third argument; accepted in decimal or with a `0x`/`0X` hex prefix.
    pub mask: u32,
    /// Optional fourth argument, given in seconds (one second when omitted).
    pub timeout: Duration,
    /// Trimmed text that follows the closing parenthesis.
    pub drv_info: String,
}
/// Parses an `@asynMask(portName, addr, mask[, timeout]) drvInfo` link string.
///
/// Arguments are comma-separated. `mask` may be decimal or hexadecimal with a
/// `0x`/`0X` prefix. `timeout` (seconds, fractional allowed) defaults to one
/// second. Everything after the first `)` is trimmed and returned as
/// `drv_info`.
///
/// # Errors
///
/// Returns `AsynError::InvalidLinkSyntax` when the `@asynMask(` prefix or the
/// closing parenthesis is missing, when fewer than three arguments are given,
/// when the port name is empty, when `addr` or `mask` do not parse, or when
/// `timeout` is not a number that a `Duration` can represent (negative, NaN,
/// infinite or too large).
pub fn parse_asyn_mask_link(s: &str) -> Result<AsynMaskLink, AsynError> {
    let s = s.trim();
    let rest = s
        .strip_prefix("@asynMask(")
        .ok_or_else(|| AsynError::InvalidLinkSyntax(format!("must start with @asynMask(: {s}")))?;
    let paren_end = rest
        .find(')')
        .ok_or_else(|| AsynError::InvalidLinkSyntax(format!("missing closing paren: {s}")))?;
    let args_str = &rest[..paren_end];
    let drv_info = rest[paren_end + 1..].trim().to_string();

    let parts: Vec<&str> = args_str.split(',').map(str::trim).collect();
    if parts.len() < 3 {
        return Err(AsynError::InvalidLinkSyntax(
            "asynMask requires at least 3 arguments: portName, addr, mask".into(),
        ));
    }

    // Mirror `@asyn(...)` parsing: an empty port name can never resolve.
    if parts[0].is_empty() {
        return Err(AsynError::InvalidLinkSyntax("portName is required".into()));
    }
    let port_name = parts[0].to_string();

    let addr = parts[1]
        .parse::<i32>()
        .map_err(|_| AsynError::InvalidLinkSyntax(format!("invalid addr: {}", parts[1])))?;

    let mask_str = parts[2];
    let hex_digits = mask_str
        .strip_prefix("0x")
        .or_else(|| mask_str.strip_prefix("0X"));
    let mask = match hex_digits {
        Some(hex) => u32::from_str_radix(hex, 16),
        None => mask_str.parse::<u32>(),
    }
    .map_err(|_| AsynError::InvalidLinkSyntax(format!("invalid mask: {mask_str}")))?;

    let timeout = match parts.get(3) {
        // `Duration::from_secs_f64` panics on negative, non-finite or
        // overflowing input; the fallible constructor turns those into a
        // regular syntax error instead.
        Some(raw) => raw
            .parse::<f64>()
            .ok()
            .and_then(|secs| Duration::try_from_secs_f64(secs).ok())
            .ok_or_else(|| AsynError::InvalidLinkSyntax(format!("invalid timeout: {raw}")))?,
        None => Duration::from_secs(1),
    };

    Ok(AsynMaskLink {
        port_name,
        addr,
        mask,
        timeout,
        drv_info,
    })
}
/// Device-support adapter that connects one record to one asyn port
/// parameter through a `PortHandle`.
pub struct AsynDeviceSupport {
    /// Handle used to submit requests to the port and to reach its interrupt manager.
    handle: PortHandle,
    /// Address copied from the parsed link; attached to every `AsynUser`.
    addr: i32,
    /// Timeout copied from the parsed link; attached to every `AsynUser`.
    timeout: Duration,
    /// Driver info string handed to `drv_user_create_blocking` during `init`.
    drv_info: String,
    /// Reason value, either resolved in `init` or supplied through `set_reason`.
    reason: usize,
    /// Whether `reason` has been resolved; `read` and `write` do nothing while this is false.
    reason_set: bool,
    /// asyn interface name (for example "asynInt32") that selects the request operations.
    iface_type: String,
    /// Result of `InterfaceType::from_asyn_name` for `iface_type`.
    iface: Option<InterfaceType>,
    /// Bit mask passed to UInt32Digital read/write requests; starts as all ones.
    mask: u32,
    /// Alarm status recorded by the most recent `read`.
    last_alarm_status: u16,
    /// Alarm severity recorded by the most recent `read`.
    last_alarm_severity: u16,
    /// Timestamp recorded by the most recent `read`, if any.
    last_ts: Option<SystemTime>,
    /// Record name supplied through `set_record_info`.
    record_name: String,
    /// Scan type supplied through `set_record_info`.
    scan: ScanType,
    /// Element limit for array reads; replaced by the record's NELM in `init` when positive.
    max_array_elements: usize,
    /// Buffer size for octet reads; replaced by the record's SIZV in `init` when positive.
    octet_max_size: usize,
    /// When true, `init` performs one read and stores the result in the record.
    initial_readback: bool,
    /// When true, `io_intr_receiver` subscribes to interrupts even if the scan is not I/O Intr.
    asyn_readback: bool,
    /// When true, `read` submits a write of the record value instead of reading.
    write_only: bool,
    /// Subscription returned by `register_interrupt_user`, stored here once registered.
    interrupt_sub: Option<InterruptSubscription>,
    /// Interrupt queue shared between the forwarding task and `read`.
    interrupt_fifo: Arc<std::sync::Mutex<InterruptFifo>>,
}
/// One interrupt callback value queued until the record processes it.
struct CachedInterrupt {
    /// Value delivered with the interrupt.
    value: crate::param::ParamValue,
    /// Timestamp delivered with the interrupt.
    timestamp: SystemTime,
}
/// Bounded first-in/first-out queue of interrupt values that drops the
/// oldest entry when full.
struct InterruptFifo {
    /// Queued values, oldest at the front.
    entries: std::collections::VecDeque<CachedInterrupt>,
    /// Maximum number of entries kept before the oldest is discarded.
    ring_size: usize,
    /// Number of entries discarded since the counter was last taken.
    overflows: u64,
}
/// Initial `ring_size` (and pre-allocated capacity) of a new `InterruptFifo`.
const DEFAULT_RING_BUFFER_SIZE: usize = 10;
impl InterruptFifo {
    /// Builds an empty queue sized to `DEFAULT_RING_BUFFER_SIZE`.
    fn new() -> Self {
        let capacity = DEFAULT_RING_BUFFER_SIZE;
        Self {
            entries: std::collections::VecDeque::with_capacity(capacity),
            ring_size: capacity,
            overflows: 0,
        }
    }

    /// Appends `entry`, discarding the oldest element first when the queue
    /// is already at `ring_size`.
    ///
    /// Returns `true` when the entry was added without discarding anything,
    /// `false` when an older entry had to be dropped (which also bumps the
    /// overflow counter).
    fn push_with_overflow(&mut self, entry: CachedInterrupt) -> bool {
        let had_room = self.entries.len() < self.ring_size;
        if !had_room {
            self.entries.pop_front();
            self.overflows += 1;
        }
        self.entries.push_back(entry);
        had_room
    }

    /// Removes and returns the oldest queued entry, if any.
    fn pop(&mut self) -> Option<CachedInterrupt> {
        self.entries.pop_front()
    }

    /// Returns the overflow count accumulated so far and resets it to zero.
    fn take_overflows(&mut self) -> u64 {
        std::mem::replace(&mut self.overflows, 0)
    }
}
impl AsynDeviceSupport {
    /// Builds an adapter for `handle` from a parsed link and an asyn
    /// interface name such as "asynInt32".
    ///
    /// The reason starts unresolved, the mask is all ones, and the array and
    /// octet limits start at 307200 elements and 256 bytes.
    pub fn from_handle(handle: PortHandle, link: AsynLink, iface_type: &str) -> Self {
        let resolved_iface = InterfaceType::from_asyn_name(iface_type);
        let fifo = Arc::new(std::sync::Mutex::new(InterruptFifo::new()));
        Self {
            handle,
            addr: link.addr,
            timeout: link.timeout,
            drv_info: link.drv_info,
            reason: 0,
            reason_set: false,
            iface_type: iface_type.to_string(),
            iface: resolved_iface,
            mask: 0xFFFFFFFF,
            last_alarm_status: 0,
            last_alarm_severity: 0,
            last_ts: None,
            record_name: String::new(),
            scan: ScanType::Passive,
            max_array_elements: 307200,
            octet_max_size: 256,
            initial_readback: false,
            asyn_readback: false,
            write_only: false,
            interrupt_sub: None,
            interrupt_fifo: fifo,
        }
    }

    /// Same as `from_handle`, taking the interface as an `InterfaceType`.
    pub fn with_interface_handle(handle: PortHandle, link: AsynLink, iface: InterfaceType) -> Self {
        let iface_name = iface.asyn_name();
        Self::from_handle(handle, link, iface_name)
    }

    /// Builder-style setter for the UInt32Digital bit mask.
    pub fn with_mask(self, mask: u32) -> Self {
        Self { mask, ..self }
    }

    /// Builder-style switch that makes `init` read the current value once.
    pub fn with_initial_readback(self) -> Self {
        Self {
            initial_readback: true,
            ..self
        }
    }

    /// Enables or disables interrupt-driven readback.
    pub fn set_asyn_readback(&mut self, on: bool) {
        self.asyn_readback = on;
    }

    /// Enables or disables the one-off read performed by `init`.
    pub fn set_initial_readback(&mut self, on: bool) {
        self.initial_readback = on;
    }

    /// Resizes the interrupt queue to `n` entries (at least one).
    ///
    /// Entries beyond the new size are discarded oldest-first and each one is
    /// counted as an overflow.
    pub fn set_fifo_size(&mut self, n: usize) {
        let mut fifo = self.interrupt_fifo.lock().unwrap();
        let capacity = n.max(1);
        fifo.ring_size = capacity;
        let excess = fifo.entries.len().saturating_sub(capacity);
        if excess > 0 {
            fifo.entries.drain(..excess);
            fifo.overflows = fifo.overflows.saturating_add(excess as u64);
        }
    }

    /// Replaces the driver info string used to resolve the reason.
    pub fn set_drv_info(&mut self, drv_info: &str) {
        self.drv_info = drv_info.to_string();
    }

    /// Replaces the interface name and re-derives the matching `InterfaceType`.
    pub fn set_iface_type(&mut self, iface_type: &str) {
        self.iface = InterfaceType::from_asyn_name(iface_type);
        self.iface_type = iface_type.to_string();
    }

    /// Sets the reason directly and marks it as resolved.
    pub fn set_reason(&mut self, reason: usize) {
        self.reason_set = true;
        self.reason = reason;
    }

    /// Current reason value.
    pub fn reason(&self) -> usize {
        self.reason
    }

    /// Address taken from the link.
    pub fn addr(&self) -> i32 {
        self.addr
    }

    /// Port handle this adapter submits requests through.
    pub fn handle(&self) -> &PortHandle {
        &self.handle
    }

    /// Public entry point to the private value-to-write-request mapping.
    pub fn write_op_pub(&self, val: &EpicsValue) -> Option<RequestOp> {
        self.write_op(val)
    }
}
/// Returns the bit position of the lowest set bit in `mask`, or 32 when
/// `mask` is zero.
fn compute_mask_shift(mask: u32) -> u32 {
    // `trailing_zeros` already yields the bit width (32) for a zero input,
    // so no special case is needed.
    mask.trailing_zeros()
}
/// Interprets a record info-tag value as a boolean.
///
/// After trimming, the empty string and the case-insensitive words "0",
/// "no" and "false" are false; anything else is true.
fn parse_info_bool(raw: &str) -> bool {
    const FALSY: [&str; 3] = ["0", "no", "false"];
    let text = raw.trim();
    if text.is_empty() {
        return false;
    }
    !FALSY.iter().any(|word| text.eq_ignore_ascii_case(word))
}
/// Converts an `AsynError` into `CaError::Protocol`, carrying the error's
/// display text as the message.
fn asyn_to_ca_error(e: AsynError) -> CaError {
    CaError::Protocol(e.to_string())
}
fn asyn_error_to_alarm(e: &AsynError) -> (u16, u16) {
match e {
AsynError::Status {
status: crate::error::AsynStatus::Timeout,
..
} => (7, 2), AsynError::Status {
status: crate::error::AsynStatus::Disconnected,
..
}
| AsynError::Status {
status: crate::error::AsynStatus::Disabled,
..
} => (9, 3), _ => (7, 2), }
}
/// Converts a driver parameter value into the `EpicsValue` stored in a record.
///
/// 64-bit integers become doubles (scalar) or are narrowed to 32-bit
/// (arrays); unsigned 32-bit and signed 8-bit values are reinterpreted with
/// `as` casts. Variants without a mapping yield `None`.
fn param_value_to_epics_value(pv: &crate::param::ParamValue) -> Option<EpicsValue> {
    use crate::param::ParamValue;
    let converted = match pv {
        // Scalars.
        ParamValue::Int32(n) => EpicsValue::Long(*n),
        ParamValue::Int64(n) => EpicsValue::Double(*n as f64),
        ParamValue::Float64(x) => EpicsValue::Double(*x),
        ParamValue::Octet(text) => EpicsValue::String(text.clone()),
        ParamValue::UInt32Digital(bits) => EpicsValue::Long(*bits as i32),
        ParamValue::Enum { index, .. } => EpicsValue::Enum(*index as u16),
        // Arrays.
        ParamValue::Int8Array(items) => {
            EpicsValue::CharArray(items.iter().map(|&x| x as u8).collect())
        }
        ParamValue::Int16Array(items) => EpicsValue::ShortArray(items.to_vec()),
        ParamValue::Int32Array(items) => EpicsValue::LongArray(items.to_vec()),
        ParamValue::Int64Array(items) => {
            EpicsValue::LongArray(items.iter().map(|&x| x as i32).collect())
        }
        ParamValue::Float32Array(items) => EpicsValue::FloatArray(items.to_vec()),
        ParamValue::Float64Array(items) => EpicsValue::DoubleArray(items.to_vec()),
        _ => return None,
    };
    Some(converted)
}
/// `WriteCompletion` wrapper around the completion handle of a write that
/// was submitted without blocking.
struct AsynAsyncWriteCompletion {
    /// Pending completion handle; taken (left as `None`) by the first `wait`.
    handle: parking_lot::Mutex<Option<AsyncCompletionHandle>>,
}
impl WriteCompletion for AsynAsyncWriteCompletion {
    /// Waits up to `timeout` for the submitted write to finish.
    ///
    /// The completion handle is consumed by the first call; later calls find
    /// nothing pending and return `Ok(())`. A failed wait is reported as
    /// `CaError::Protocol` with the underlying error's text.
    fn wait(&self, timeout: Duration) -> CaResult<()> {
        // The lock guard is a temporary of the match scrutinee, so it stays
        // alive for the whole match, exactly like the `if let` form.
        match self.handle.lock().take() {
            None => Ok(()),
            Some(pending) => pending
                .wait_blocking(timeout)
                .map(|_| ())
                .map_err(|err| CaError::Protocol(err.to_string())),
        }
    }
}
impl AsynDeviceSupport {
    /// Derives the record's ESLO/EOFF from the driver's integer bounds and
    /// the record's EGUF/EGUL fields.
    ///
    /// Uses `eslo = (EGUF - EGUL) / (high - low)` and
    /// `eoff = (high * EGUL - low * EGUF) / (high - low)`. The record is left
    /// untouched when the bounds request fails, returns no bounds, the bounds
    /// are equal, or EGUF/EGUL are not available as doubles.
    fn apply_linear_eslo_eoff(&self, record: &mut dyn Record) {
        // The 64-bit interface has its own bounds request; everything else
        // asks for 32-bit bounds.
        let op = if self.iface_type == "asynInt64" {
            RequestOp::GetBoundsInt64
        } else {
            RequestOp::GetBoundsInt32
        };
        let user = AsynUser::new(self.reason)
            .with_addr(self.addr)
            .with_timeout(self.timeout);
        let result = match self.handle.submit_blocking(op, user) {
            Ok(r) => r,
            Err(_) => return,
        };
        let (low, high) = match result.bounds {
            Some((l, h)) => (l as f64, h as f64),
            None => return,
        };
        // Equal bounds would make the denominator below zero.
        if (high - low).abs() < f64::EPSILON {
            return;
        }
        let eguf = match record.get_field("EGUF") {
            Some(EpicsValue::Double(v)) => v,
            _ => return,
        };
        let egul = match record.get_field("EGUL") {
            Some(EpicsValue::Double(v)) => v,
            _ => return,
        };
        let denom = high - low;
        let eslo = (eguf - egul) / denom;
        let eoff = (high * egul - low * eguf) / denom;
        // Failures to store the fields are deliberately ignored.
        let _ = record.put_field("ESLO", EpicsValue::Double(eslo));
        let _ = record.put_field("EOFF", EpicsValue::Double(eoff));
    }

    /// Returns the read request matching `iface_type`, or `None` for an
    /// interface name this adapter does not know.
    ///
    /// Octet reads carry `octet_max_size`, UInt32Digital reads carry `mask`,
    /// and array reads carry `max_array_elements`.
    fn read_op(&self) -> Option<RequestOp> {
        match self.iface_type.as_str() {
            "asynInt32" => Some(RequestOp::Int32Read),
            "asynInt64" => Some(RequestOp::Int64Read),
            "asynFloat64" => Some(RequestOp::Float64Read),
            "asynOctet" => Some(RequestOp::OctetRead {
                buf_size: self.octet_max_size,
            }),
            "asynUInt32Digital" => Some(RequestOp::UInt32DigitalRead { mask: self.mask }),
            "asynEnum" => Some(RequestOp::EnumRead),
            "asynInt8Array" => Some(RequestOp::Int8ArrayRead {
                max_elements: self.max_array_elements,
            }),
            "asynInt16Array" => Some(RequestOp::Int16ArrayRead {
                max_elements: self.max_array_elements,
            }),
            "asynInt32Array" => Some(RequestOp::Int32ArrayRead {
                max_elements: self.max_array_elements,
            }),
            "asynInt64Array" => Some(RequestOp::Int64ArrayRead {
                max_elements: self.max_array_elements,
            }),
            "asynFloat32Array" => Some(RequestOp::Float32ArrayRead {
                max_elements: self.max_array_elements,
            }),
            "asynFloat64Array" => Some(RequestOp::Float64ArrayRead {
                max_elements: self.max_array_elements,
            }),
            _ => None,
        }
    }

    /// Extracts the value for `iface_type` from a request result and wraps
    /// it in the corresponding `EpicsValue`.
    ///
    /// Returns `None` when the interface is unknown or the result does not
    /// carry the expected field.
    fn result_to_value(&self, result: &RequestResult) -> Option<EpicsValue> {
        match self.iface_type.as_str() {
            "asynInt32" => result.int_val.map(EpicsValue::Long),
            // 64-bit integers are surfaced as doubles.
            "asynInt64" => result.int64_val.map(|v| EpicsValue::Double(v as f64)),
            "asynFloat64" => result.float_val.map(EpicsValue::Double),
            "asynOctet" => result.data.as_ref().map(|d| {
                // Only the first `nbytes` bytes are used, clamped to the
                // buffer length; invalid UTF-8 is replaced lossily.
                let n = result.nbytes.min(d.len());
                EpicsValue::String(String::from_utf8_lossy(&d[..n]).into_owned())
            }),
            "asynUInt32Digital" => result.uint_val.map(|v| EpicsValue::Long(v as i32)),
            "asynEnum" => result.enum_index.map(|v| EpicsValue::Enum(v as u16)),
            "asynInt8Array" => result
                .int8_array
                .clone()
                .map(|v| EpicsValue::CharArray(v.iter().map(|&x| x as u8).collect())),
            "asynInt16Array" => result.int16_array.clone().map(EpicsValue::ShortArray),
            "asynInt32Array" => result.int32_array.clone().map(EpicsValue::LongArray),
            // 64-bit array elements are narrowed to 32 bits with `as`.
            "asynInt64Array" => result
                .int64_array
                .clone()
                .map(|v| EpicsValue::LongArray(v.iter().map(|&x| x as i32).collect())),
            "asynFloat32Array" => result.float32_array.clone().map(EpicsValue::FloatArray),
            "asynFloat64Array" => result.float64_array.clone().map(EpicsValue::DoubleArray),
            _ => None,
        }
    }

    /// Builds the write request for `val` on the current interface.
    ///
    /// Returns `None` when the interface/value combination has no mapping.
    /// Numeric conversions use `as` casts.
    fn write_op(&self, val: &EpicsValue) -> Option<RequestOp> {
        match (self.iface_type.as_str(), val) {
            ("asynInt32", EpicsValue::Long(v)) => Some(RequestOp::Int32Write { value: *v }),
            ("asynInt32", EpicsValue::Enum(v)) => Some(RequestOp::Int32Write { value: *v as i32 }),
            ("asynInt32", EpicsValue::Short(v)) => Some(RequestOp::Int32Write { value: *v as i32 }),
            ("asynInt32", EpicsValue::Double(v)) => {
                Some(RequestOp::Int32Write { value: *v as i32 })
            }
            ("asynInt32", EpicsValue::Float(v)) => Some(RequestOp::Int32Write { value: *v as i32 }),
            ("asynInt64", EpicsValue::Long(v)) => Some(RequestOp::Int64Write { value: *v as i64 }),
            ("asynInt64", EpicsValue::Double(v)) => {
                Some(RequestOp::Int64Write { value: *v as i64 })
            }
            ("asynFloat64", EpicsValue::Double(v)) => Some(RequestOp::Float64Write { value: *v }),
            ("asynFloat64", EpicsValue::Long(v)) => {
                Some(RequestOp::Float64Write { value: *v as f64 })
            }
            ("asynFloat64", EpicsValue::Float(v)) => {
                Some(RequestOp::Float64Write { value: *v as f64 })
            }
            ("asynFloat64", EpicsValue::Short(v)) => {
                Some(RequestOp::Float64Write { value: *v as f64 })
            }
            ("asynFloat64", EpicsValue::Enum(v)) => {
                Some(RequestOp::Float64Write { value: *v as f64 })
            }
            ("asynOctet", EpicsValue::String(s)) => Some(RequestOp::OctetWrite {
                data: s.as_bytes().to_vec(),
            }),
            ("asynOctet", EpicsValue::CharArray(data)) => {
                // Send only the bytes before the first NUL, if there is one.
                let len = data.iter().position(|&b| b == 0).unwrap_or(data.len());
                Some(RequestOp::OctetWrite {
                    data: data[..len].to_vec(),
                })
            }
            // Catch-all for octet: any other value is sent as its display
            // text. Must stay after the specific asynOctet arms above.
            ("asynOctet", v) => {
                let s = format!("{v}");
                Some(RequestOp::OctetWrite {
                    data: s.as_bytes().to_vec(),
                })
            }
            ("asynUInt32Digital", EpicsValue::Long(v)) => Some(RequestOp::UInt32DigitalWrite {
                value: *v as u32,
                mask: self.mask,
            }),
            ("asynUInt32Digital", EpicsValue::Enum(v)) => Some(RequestOp::UInt32DigitalWrite {
                value: *v as u32,
                mask: self.mask,
            }),
            ("asynEnum", EpicsValue::Long(v)) => Some(RequestOp::EnumWrite { index: *v as usize }),
            ("asynEnum", EpicsValue::Enum(v)) => Some(RequestOp::EnumWrite { index: *v as usize }),
            ("asynInt8Array", EpicsValue::CharArray(data)) => Some(RequestOp::Int8ArrayWrite {
                data: data.iter().map(|&x| x as i8).collect(),
            }),
            ("asynInt16Array", EpicsValue::ShortArray(data)) => {
                Some(RequestOp::Int16ArrayWrite { data: data.clone() })
            }
            ("asynInt32Array", EpicsValue::LongArray(data)) => {
                Some(RequestOp::Int32ArrayWrite { data: data.clone() })
            }
            ("asynInt64Array", EpicsValue::LongArray(data)) => Some(RequestOp::Int64ArrayWrite {
                data: data.iter().map(|&x| x as i64).collect(),
            }),
            ("asynFloat32Array", EpicsValue::FloatArray(data)) => {
                Some(RequestOp::Float32ArrayWrite { data: data.clone() })
            }
            ("asynFloat64Array", EpicsValue::DoubleArray(data)) => {
                Some(RequestOp::Float64ArrayWrite { data: data.clone() })
            }
            _ => None,
        }
    }
}
impl DeviceSupport for AsynDeviceSupport {
fn init(&mut self, record: &mut dyn Record) -> CaResult<()> {
if !self.reason_set {
match self.handle.drv_user_create_blocking(&self.drv_info) {
Ok(reason) => {
self.reason = reason;
}
Err(e) => {
eprintln!(
"[asyn] init FAILED: port='{}' drv_info='{}' err={e}",
self.handle.port_name(),
self.drv_info
);
self.reason_set = false;
return Ok(());
}
}
self.reason_set = true;
}
if let Some(EpicsValue::Long(nelm)) = record.get_field("NELM") {
if nelm > 0 {
self.max_array_elements = nelm as usize;
}
}
if let Some(EpicsValue::Short(sizv)) = record.get_field("SIZV") {
if sizv > 0 {
self.octet_max_size = sizv as usize;
}
}
if self.iface_type == "asynUInt32Digital" && self.mask != 0 {
let shft = compute_mask_shift(self.mask);
let _ = record.put_field("MASK", EpicsValue::Long(self.mask as i32));
let _ = record.put_field("SHFT", EpicsValue::Short(shft as i16));
}
if (self.iface_type == "asynInt32" || self.iface_type == "asynInt64")
&& record.get_field("ESLO").is_some()
{
self.apply_linear_eslo_eoff(record);
}
if self.initial_readback {
if let Some(op) = self.read_op() {
let user = AsynUser::new(self.reason)
.with_addr(self.addr)
.with_timeout(self.timeout);
if let Ok(result) = self.handle.submit_blocking(op, user) {
if let Some(val) = self.result_to_value(&result) {
let _ = record.set_val(val);
}
}
}
}
Ok(())
}
fn read(&mut self, record: &mut dyn Record) -> CaResult<DeviceReadOutcome> {
if !self.reason_set {
return Ok(DeviceReadOutcome::ok());
}
if self.write_only {
if let Some(val) = record.val() {
if let Some(op) = self.write_op(&val) {
let user = AsynUser::new(self.reason)
.with_addr(self.addr)
.with_timeout(self.timeout);
let _ = self.handle.submit_blocking(op, user);
}
}
return Ok(DeviceReadOutcome::ok());
}
if self.scan == ScanType::IoIntr {
let (entry, overflows) = {
let mut fifo = self.interrupt_fifo.lock().unwrap();
(fifo.pop(), fifo.take_overflows())
};
if overflows > 0 {
tracing::warn!(
target: "asyn_rs::adapter",
port = %self.handle.port_name(),
record = %self.record_name,
overflows = overflows,
"ring buffer overflows (C asyn ASYN_TRACE_WARNING)"
);
}
if let Some(ci) = entry {
if let Some(val) = param_value_to_epics_value(&ci.value) {
let _ = record.set_val(val);
}
self.last_ts = Some(ci.timestamp);
}
return Ok(DeviceReadOutcome::computed());
}
if let Some(op) = self.read_op() {
let user = AsynUser::new(self.reason)
.with_addr(self.addr)
.with_timeout(self.timeout);
match self.handle.submit_blocking(op, user) {
Ok(result) => {
if let Some(val) = self.result_to_value(&result) {
let _ = record.set_val(val);
}
self.last_alarm_status = result.alarm_status;
self.last_alarm_severity = result.alarm_severity;
self.last_ts = result.timestamp;
}
Err(e) => {
let (alarm_status, alarm_severity) = asyn_error_to_alarm(&e);
self.last_alarm_status = alarm_status;
self.last_alarm_severity = alarm_severity;
}
}
}
Ok(DeviceReadOutcome::computed())
}
fn write(&mut self, record: &mut dyn Record) -> CaResult<()> {
if !self.reason_set {
return Ok(());
}
if let Some(val) = record.val() {
if let Some(op) = self.write_op(&val) {
let user = AsynUser::new(self.reason)
.with_addr(self.addr)
.with_timeout(self.timeout);
self.handle
.submit_blocking(op, user)
.map_err(asyn_to_ca_error)?;
}
}
Ok(())
}
fn dtyp(&self) -> &str {
&self.iface_type
}
fn last_alarm(&self) -> Option<(u16, u16)> {
if self.last_alarm_status == 0 && self.last_alarm_severity == 0 {
None
} else {
Some((self.last_alarm_status, self.last_alarm_severity))
}
}
fn last_timestamp(&self) -> Option<SystemTime> {
self.last_ts
}
fn set_record_info(&mut self, name: &str, scan: ScanType) {
self.record_name = name.to_string();
self.scan = scan;
}
fn apply_record_info(&mut self, info: &std::collections::HashMap<String, String>) {
if let Some(raw) = info.get("asyn:READBACK") {
self.set_asyn_readback(parse_info_bool(raw));
}
if let Some(raw) = info.get("asyn:INITIAL_READBACK") {
self.set_initial_readback(parse_info_bool(raw));
}
if let Some(raw) = info.get("asyn:FIFO") {
if let Ok(n) = raw.trim().parse::<i64>() {
if n > 0 {
self.set_fifo_size(n as usize);
}
}
}
}
fn write_begin(
&mut self,
record: &mut dyn Record,
) -> CaResult<Option<Box<dyn WriteCompletion>>> {
let val = match record.val() {
Some(v) => v,
None => return Ok(None),
};
let op = match self.write_op(&val) {
Some(op) => op,
None => return Ok(None),
};
let user = AsynUser::new(self.reason)
.with_addr(self.addr)
.with_timeout(self.timeout);
if !self.handle.can_block() {
let _ = self
.handle
.submit_blocking(op, user)
.map_err(asyn_to_ca_error)?;
return Ok(None); }
let completion = self.handle.try_submit(op, user).map_err(asyn_to_ca_error)?;
Ok(Some(Box::new(AsynAsyncWriteCompletion {
handle: parking_lot::Mutex::new(Some(completion)),
})))
}
fn io_intr_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<()>> {
if !self.reason_set {
return None;
}
if self.scan != ScanType::IoIntr && !self.asyn_readback {
return None;
}
let filter = InterruptFilter {
reason: Some(self.reason),
addr: Some(self.addr),
uint32_mask: None,
};
let (sub, mut intr_rx) = self.handle.interrupts().register_interrupt_user(filter);
self.interrupt_sub = Some(sub);
let (tx, rx) = tokio::sync::mpsc::channel(16);
let fifo = self.interrupt_fifo.clone();
tokio::spawn(async move {
while let Some(iv) = intr_rx.recv().await {
let entry = CachedInterrupt {
value: iv.value,
timestamp: iv.timestamp,
};
let was_fresh_add = {
let mut g = fifo.lock().unwrap();
g.push_with_overflow(entry)
};
if was_fresh_add && tx.send(()).await.is_err() {
break;
}
}
});
Some(rx)
}
}
/// Reduces a record DTYP to the asyn interface name it refers to.
///
/// Array types lose a trailing `In`/`Out` (for example `asynInt32ArrayIn`
/// becomes `asynInt32Array`), `asynOctetRead`/`asynOctetWrite` become
/// `asynOctet`, and everything else is returned unchanged.
fn normalize_asyn_dtyp(dtyp: &str) -> String {
    for direction in ["In", "Out"] {
        if let Some(stem) = dtyp.strip_suffix(direction) {
            if stem.ends_with("Array") {
                return stem.to_string();
            }
        }
    }
    let normalized = match dtyp {
        "asynOctetRead" | "asynOctetWrite" => "asynOctet",
        other => other,
    };
    normalized.to_string()
}
/// Dynamic device-support factory for records whose OUT or INP link is an
/// `@asyn(...)` or `@asynMask(...)` link.
///
/// Returns `None` when neither link mentions `@asyn`, when the link fails to
/// parse, or when the named port is not registered. OUT links take
/// precedence over INP links. Output records get an initial readback, except
/// for DTYP `asynOctetWrite`, which is put in write-only mode; an INP link
/// is treated as output only for DTYP `asynOctetWrite`. The mask of an
/// `@asynMask` link is applied to the adapter.
pub fn universal_asyn_factory(
    ctx: &epics_base_rs::server::ioc_app::DeviceSupportContext,
) -> Option<Box<dyn DeviceSupport>> {
    let is_octet_write = ctx.dtyp == "asynOctetWrite";

    // "@asynMask" contains "@asyn", so one substring test covers both forms.
    let (link_str, is_output) = if ctx.out.contains("@asyn") {
        (ctx.out, true)
    } else if ctx.inp.contains("@asyn") {
        (ctx.inp, is_octet_write)
    } else {
        return None;
    };

    // Parse once and carry the mask along instead of re-parsing later.
    let (link, mask) = if link_str.contains("@asynMask") {
        let ml = parse_asyn_mask_link(link_str).ok()?;
        let link = AsynLink {
            port_name: ml.port_name,
            addr: ml.addr,
            timeout: ml.timeout,
            drv_info: ml.drv_info,
        };
        (link, Some(ml.mask))
    } else {
        (parse_asyn_link(link_str).ok()?, None)
    };

    let entry = crate::asyn_record::get_port(&link.port_name)?;
    let dtyp = normalize_asyn_dtyp(ctx.dtyp);
    let mut adapter = AsynDeviceSupport::from_handle(entry.handle, link, &dtyp);

    if is_output {
        if is_octet_write {
            adapter.write_only = true;
        } else {
            adapter = adapter.with_initial_readback();
        }
    }
    if let Some(mask) = mask {
        adapter = adapter.with_mask(mask);
    }
    Some(Box::new(adapter))
}
/// Registers `universal_asyn_factory` as a dynamic device-support factory on
/// `app` and returns the result of that registration call.
pub fn register_asyn_device_support(
    app: epics_base_rs::server::ioc_app::IocApplication,
) -> epics_base_rs::server::ioc_app::IocApplication {
    app.register_dynamic_device_support(universal_asyn_factory)
}
/// Registers `universal_asyn_factory` as a dynamic device-support factory on
/// `builder` and returns the result of that registration call.
pub fn register_asyn_device_support_for_builder(
    builder: epics_base_rs::server::ioc_builder::IocBuilder,
) -> epics_base_rs::server::ioc_builder::IocBuilder {
    builder.register_dynamic_device_support(universal_asyn_factory)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn register_asyn_device_support_for_builder_compiles_and_attaches() {
use epics_base_rs::server::device_support::{DeviceReadOutcome, DeviceSupport};
use epics_base_rs::server::ioc_builder::IocBuilder;
use epics_base_rs::server::record::ScanType;
use epics_base_rs::types::EpicsValue;
let _ = (
ScanType::Passive,
EpicsValue::Double(0.0),
std::any::type_name::<dyn DeviceSupport>(),
std::any::type_name::<DeviceReadOutcome>(),
);
let _builder = register_asyn_device_support_for_builder(IocBuilder::new());
}
#[test]
fn test_parse_full() {
let link = parse_asyn_link("@asyn(myPort, 0, 1.0) TEMPERATURE").unwrap();
assert_eq!(link.port_name, "myPort");
assert_eq!(link.addr, 0);
assert_eq!(link.timeout, Duration::from_secs_f64(1.0));
assert_eq!(link.drv_info, "TEMPERATURE");
}
#[test]
fn test_parse_port_only() {
let link = parse_asyn_link("@asyn(port1) PARAM").unwrap();
assert_eq!(link.port_name, "port1");
assert_eq!(link.addr, 0);
assert_eq!(link.timeout, Duration::from_secs(1));
assert_eq!(link.drv_info, "PARAM");
}
#[test]
fn test_parse_port_and_addr() {
let link = parse_asyn_link("@asyn(port2, 3) VALUE").unwrap();
assert_eq!(link.port_name, "port2");
assert_eq!(link.addr, 3);
assert_eq!(link.drv_info, "VALUE");
}
#[test]
fn test_parse_fractional_timeout() {
let link = parse_asyn_link("@asyn(dev, 1, 0.5) CMD").unwrap();
assert_eq!(link.timeout, Duration::from_secs_f64(0.5));
}
#[test]
fn test_parse_no_drv_info() {
let link = parse_asyn_link("@asyn(port1)").unwrap();
assert_eq!(link.drv_info, "");
}
#[test]
fn test_parse_invalid_prefix() {
assert!(parse_asyn_link("@wrong(port)").is_err());
}
#[test]
fn test_parse_missing_paren() {
assert!(parse_asyn_link("@asyn(port").is_err());
}
#[test]
fn test_parse_invalid_addr() {
assert!(parse_asyn_link("@asyn(port, abc) X").is_err());
}
#[test]
fn test_parse_invalid_timeout() {
assert!(parse_asyn_link("@asyn(port, 0, xyz) X").is_err());
}
#[test]
fn test_parse_space_separated() {
let link = parse_asyn_link("@asyn(CB1 0)CIRC_BUFF_CONTROL").unwrap();
assert_eq!(link.port_name, "CB1");
assert_eq!(link.addr, 0);
assert_eq!(link.drv_info, "CIRC_BUFF_CONTROL");
}
#[test]
fn test_parse_space_separated_with_timeout() {
let link = parse_asyn_link("@asyn(PORT1 2 1.5) PARAM").unwrap();
assert_eq!(link.port_name, "PORT1");
assert_eq!(link.addr, 2);
assert_eq!(link.timeout, Duration::from_secs_f64(1.5));
assert_eq!(link.drv_info, "PARAM");
}
#[test]
fn test_parse_mask_link_full() {
let link = parse_asyn_mask_link("@asynMask(port1, 0, 0xFF, 2.0) BITS").unwrap();
assert_eq!(link.port_name, "port1");
assert_eq!(link.addr, 0);
assert_eq!(link.mask, 0xFF);
assert_eq!(link.timeout, Duration::from_secs_f64(2.0));
assert_eq!(link.drv_info, "BITS");
}
#[test]
fn test_parse_mask_link_no_timeout() {
let link = parse_asyn_mask_link("@asynMask(port1, 0, 255) BITS").unwrap();
assert_eq!(link.mask, 255);
assert_eq!(link.timeout, Duration::from_secs(1));
}
#[test]
fn test_parse_mask_link_hex_upper() {
let link = parse_asyn_mask_link("@asynMask(p, 0, 0XFF00) X").unwrap();
assert_eq!(link.mask, 0xFF00);
}
#[test]
fn test_parse_mask_link_too_few_args() {
assert!(parse_asyn_mask_link("@asynMask(port1, 0) BITS").is_err());
}
#[test]
fn test_parse_mask_link_invalid_prefix() {
assert!(parse_asyn_mask_link("@asyn(port1, 0, 0xFF) BITS").is_err());
}
use crate::error::AsynResult;
use crate::interrupt::InterruptManager;
use crate::param::ParamType;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use std::sync::Arc;
struct TestPort {
base: PortDriverBase,
}
impl TestPort {
fn new() -> Self {
let mut base = PortDriverBase::new("test", 1, PortFlags::default());
base.create_param("VAL", ParamType::Int32).unwrap();
Self { base }
}
}
impl PortDriver for TestPort {
fn base(&self) -> &PortDriverBase {
&self.base
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.base
}
}
struct BoundedPort {
base: PortDriverBase,
low32: i32,
high32: i32,
}
impl BoundedPort {
fn new(low32: i32, high32: i32) -> Self {
let mut base = PortDriverBase::new("test_bounds", 1, PortFlags::default());
base.create_param("VAL", ParamType::Int32).unwrap();
Self {
base,
low32,
high32,
}
}
}
impl PortDriver for BoundedPort {
fn base(&self) -> &PortDriverBase {
&self.base
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.base
}
fn get_bounds_int32(&self, _user: &AsynUser) -> AsynResult<(i32, i32)> {
Ok((self.low32, self.high32))
}
}
fn make_bounded_adapter(low: i32, high: i32, iface: &str) -> AsynDeviceSupport {
let driver = BoundedPort::new(low, high);
let interrupts = Arc::new(InterruptManager::new(256));
let (tx, rx) = tokio::sync::mpsc::channel(256);
let actor = PortActor::new(Box::new(driver), rx);
std::thread::Builder::new()
.name("test-bounds-actor".into())
.spawn(move || actor.run())
.unwrap();
let handle = PortHandle::new(tx, "test_bounds".into(), interrupts);
let link = AsynLink {
port_name: "test_bounds".into(),
addr: 0,
timeout: Duration::from_secs(1),
drv_info: "VAL".into(),
};
let mut ads = AsynDeviceSupport::from_handle(handle, link, iface);
ads.set_record_info("TEST:AI", ScanType::Passive);
ads
}
fn make_adapter(scan: ScanType) -> AsynDeviceSupport {
let driver = TestPort::new();
let interrupts = Arc::new(InterruptManager::new(256));
let (tx, rx) = tokio::sync::mpsc::channel(256);
let actor = PortActor::new(Box::new(driver), rx);
std::thread::Builder::new()
.name("test-adapter-actor".into())
.spawn(move || actor.run())
.unwrap();
let handle = PortHandle::new(tx, "test".into(), interrupts);
let link = AsynLink {
port_name: "test".into(),
addr: 0,
timeout: Duration::from_secs(1),
drv_info: "VAL".into(),
};
let mut ads = AsynDeviceSupport::from_handle(handle, link, "asynInt32");
ads.set_record_info("TEST:REC", scan);
ads
}
#[test]
fn test_io_intr_receiver_none_when_passive() {
let mut ads = make_adapter(ScanType::Passive);
assert!(ads.io_intr_receiver().is_none());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_io_intr_receiver_some_when_io_intr() {
let mut ads = make_adapter(ScanType::IoIntr);
use epics_base_rs::server::records::longin::LonginRecord;
let mut rec = LonginRecord::new(0);
ads.init(&mut rec).unwrap();
let rx = ads.io_intr_receiver();
assert!(rx.is_some());
}
#[test]
fn test_adapter_init_resolves_reason() {
let mut ads = make_adapter(ScanType::Passive);
use epics_base_rs::server::records::longin::LonginRecord;
let mut rec = LonginRecord::new(0);
ads.init(&mut rec).unwrap();
assert_eq!(ads.reason, 0); }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ai_linear_eslo_eoff_filled_from_get_bounds_int32() {
let mut ads = make_bounded_adapter(0, 4095, "asynInt32");
use epics_base_rs::server::records::ai::AiRecord;
let mut rec = AiRecord::new(0.0);
rec.eguf = 10.0;
rec.egul = 0.0;
rec.linr = 2; ads.init(&mut rec).unwrap();
let eslo = match rec.get_field("ESLO").unwrap() {
EpicsValue::Double(v) => v,
_ => panic!(),
};
let eoff = match rec.get_field("EOFF").unwrap() {
EpicsValue::Double(v) => v,
_ => panic!(),
};
assert!(
(eslo - 10.0 / 4095.0).abs() < 1e-9,
"ESLO must equal (EGUF-EGUL)/(high-low): got {eslo}"
);
assert!(
eoff.abs() < 1e-9,
"EOFF must equal 0 for symmetric range: got {eoff}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ai_linear_eslo_eoff_signed_range() {
let mut ads = make_bounded_adapter(-2048, 2047, "asynInt32");
use epics_base_rs::server::records::ai::AiRecord;
let mut rec = AiRecord::new(0.0);
rec.eguf = 10.0;
rec.egul = -10.0;
rec.linr = 2;
ads.init(&mut rec).unwrap();
let eslo = match rec.get_field("ESLO").unwrap() {
EpicsValue::Double(v) => v,
_ => panic!(),
};
let eoff = match rec.get_field("EOFF").unwrap() {
EpicsValue::Double(v) => v,
_ => panic!(),
};
let denom = (2047 - -2048) as f64;
assert!((eslo - 20.0 / denom).abs() < 1e-9);
let expected_eoff = (2047.0 * -10.0 - -2048.0 * 10.0) / denom;
assert!(
(eoff - expected_eoff).abs() < 1e-9,
"EOFF expected {expected_eoff} got {eoff}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ai_linear_skip_when_bounds_equal() {
let mut ads = make_bounded_adapter(0, 0, "asynInt32");
use epics_base_rs::server::records::ai::AiRecord;
let mut rec = AiRecord::new(0.0);
rec.eguf = 10.0;
rec.egul = 0.0;
rec.linr = 2;
rec.eslo = 123.456;
rec.eoff = 42.0;
ads.init(&mut rec).unwrap();
assert!((rec.eslo - 123.456).abs() < 1e-9);
assert!((rec.eoff - 42.0).abs() < 1e-9);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn longin_skips_linear_wiring() {
let mut ads = make_bounded_adapter(0, 4095, "asynInt32");
use epics_base_rs::server::records::longin::LonginRecord;
let mut rec = LonginRecord::new(0);
ads.init(&mut rec).unwrap();
}
fn intr_entry(v: i32, t_ms: u64) -> CachedInterrupt {
CachedInterrupt {
value: crate::param::ParamValue::Int32(v),
timestamp: SystemTime::UNIX_EPOCH + Duration::from_millis(t_ms),
}
}
#[test]
fn fifo_default_size_matches_c_constant() {
let f = InterruptFifo::new();
assert_eq!(f.ring_size, 10);
assert!(f.entries.is_empty());
assert_eq!(f.overflows, 0);
}
#[test]
fn fifo_push_pop_fifo_order() {
let mut f = InterruptFifo::new();
assert!(f.push_with_overflow(intr_entry(1, 1)));
assert!(f.push_with_overflow(intr_entry(2, 2)));
assert!(f.push_with_overflow(intr_entry(3, 3)));
let popped: Vec<_> = std::iter::from_fn(|| f.pop())
.map(|c| match c.value {
crate::param::ParamValue::Int32(v) => v,
_ => panic!(),
})
.collect();
assert_eq!(popped, vec![1, 2, 3], "FIFO order, not LIFO");
assert_eq!(f.take_overflows(), 0);
}
/// With the ring size forced to 3, the fourth and fifth pushes report
/// `false`, bump the overflow counter, and evict the oldest entries so that
/// only the newest three remain.
#[test]
fn fifo_overflow_drops_oldest_and_counts() {
    let mut fifo = InterruptFifo::new();
    fifo.ring_size = 3;

    // Pushes that fit within the ring.
    for (value, t_ms) in [(1, 1), (2, 2), (3, 3)] {
        assert!(fifo.push_with_overflow(intr_entry(value, t_ms)));
    }
    // Pushes that exceed the ring and must signal an overflow.
    for (value, t_ms) in [(4, 4), (5, 5)] {
        assert!(!fifo.push_with_overflow(intr_entry(value, t_ms)));
    }
    assert_eq!(fifo.overflows, 2);

    let mut survivors = Vec::new();
    while let Some(entry) = fifo.pop() {
        match entry.value {
            crate::param::ParamValue::Int32(v) => survivors.push(v),
            _ => panic!(),
        }
    }
    assert_eq!(survivors, [3, 4, 5]);
}
/// `take_overflows` returns the accumulated overflow count and resets it:
/// a second call immediately afterwards yields zero.
#[test]
fn fifo_take_overflows_resets() {
    let mut fifo = InterruptFifo::new();
    fifo.ring_size = 1;

    // With a ring of one slot, the second and third pushes both overflow.
    for (value, t_ms) in [(1, 1), (2, 2), (3, 3)] {
        fifo.push_with_overflow(intr_entry(value, t_ms));
    }

    let first_read = fifo.take_overflows();
    assert_eq!(first_read, 2);

    let second_read = fifo.take_overflows();
    assert_eq!(second_read, 0);
}
/// Shrinking the FIFO via `set_fifo_size` while it holds more entries than
/// the new size leaves only that many queued and counts the rest as
/// overflows.
#[test]
fn set_fifo_size_truncates_existing_entries() {
    let mut adapter = make_adapter(ScanType::IoIntr);

    // Pre-load four entries into a ring of ten.
    let mut fifo = adapter.interrupt_fifo.lock().unwrap();
    fifo.ring_size = 10;
    for (value, t_ms) in [(1, 1), (2, 2), (3, 3), (4, 4)] {
        fifo.push_with_overflow(intr_entry(value, t_ms));
    }
    // Release the guard before calling back into the adapter.
    drop(fifo);

    adapter.set_fifo_size(2);

    let fifo = adapter.interrupt_fifo.lock().unwrap();
    assert_eq!(fifo.entries.len(), 2);
    assert_eq!(fifo.overflows, 2);
}
/// The `asyn:FIFO` info tag sets the interrupt FIFO ring size when it holds
/// a usable number; a non-numeric value or `"0"` leaves the previous size
/// in place.
#[test]
fn apply_record_info_parses_asyn_fifo() {
    use std::collections::HashMap;

    let mut adapter = make_adapter(ScanType::IoIntr);
    let mut tags = HashMap::new();

    // A valid number is applied.
    tags.insert(String::from("asyn:FIFO"), String::from("32"));
    adapter.apply_record_info(&tags);
    let size_after_valid = adapter.interrupt_fifo.lock().unwrap().ring_size;
    assert_eq!(size_after_valid, 32);

    // Garbage is ignored.
    tags.insert(String::from("asyn:FIFO"), String::from("garbage"));
    adapter.apply_record_info(&tags);
    let size_after_garbage = adapter.interrupt_fifo.lock().unwrap().ring_size;
    assert_eq!(size_after_garbage, 32, "non-numeric must not clobber size");

    // Zero is ignored as well.
    tags.insert(String::from("asyn:FIFO"), String::from("0"));
    adapter.apply_record_info(&tags);
    let size_after_zero = adapter.interrupt_fifo.lock().unwrap().ring_size;
    assert_eq!(size_after_zero, 32);
}
/// Table of `(mask, expected shift)` pairs for `compute_mask_shift`.
/// For every non-zero mask listed, the expected value is the index of the
/// lowest set bit; an all-zero mask is expected to yield 32.
#[test]
fn compute_mask_shift_matches_c() {
    let cases = [
        (0x0001, 0),
        (0x0002, 1),
        (0x0080, 7),
        (0x0F00, 8),
        (0xFF00, 8),
        (0x8000_0000, 31),
        (0, 32),
    ];

    for (mask, expected_shift) in cases {
        assert_eq!(compute_mask_shift(mask), expected_shift);
    }
}
/// For an `asynUInt32Digital` adapter configured with mask `0xFF00`, `init`
/// must write the mask to the record's MASK field and the matching shift
/// (8) to its SHFT field.
#[test]
fn uint32_digital_init_propagates_mask_and_shft_to_record() {
    use epics_base_rs::server::records::mbbi::MbbiRecord;

    let mut adapter = {
        let mut base = make_adapter(ScanType::Passive);
        base.set_iface_type("asynUInt32Digital");
        base.with_mask(0xFF00)
    };

    let mut record = MbbiRecord::default();
    adapter.init(&mut record).unwrap();

    let mask_field = record.get_field("MASK");
    assert_eq!(
        mask_field,
        Some(EpicsValue::Long(0xFF00)),
        "MASK must propagate"
    );

    let shft_field = record.get_field("SHFT");
    assert_eq!(
        shft_field,
        Some(EpicsValue::Short(8)),
        "SHFT must equal computeShift(0xFF00) = 8"
    );
}
/// The adapter's `octet_max_size` starts at 256 and is still 256 after
/// `init` on a freshly constructed `LsiRecord`; once the record's `sizv`
/// is raised to 1024, a further `init` adopts that value.
#[test]
fn octet_buffer_picks_up_sizv_from_record() {
    use epics_base_rs::server::records::lsi::LsiRecord;

    let mut adapter = make_adapter(ScanType::Passive);
    adapter.set_iface_type("asynOctet");
    // Size before any record has been seen.
    assert_eq!(adapter.octet_max_size, 256);

    let mut record = LsiRecord::new("");

    // First init: record left exactly as constructed.
    adapter.init(&mut record).unwrap();
    assert_eq!(adapter.octet_max_size, 256);

    // Second init: record now asks for a larger buffer.
    record.sizv = 1024;
    adapter.init(&mut record).unwrap();
    assert_eq!(adapter.octet_max_size, 1024);
}
/// A Passive adapter exposes no I/O-interrupt receiver after `init`;
/// applying the info tag `asyn:READBACK=1` must make one available.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn apply_record_info_enables_readback_for_truthy_value() {
    use epics_base_rs::server::records::longin::LonginRecord;
    use std::collections::HashMap;

    let mut adapter = make_adapter(ScanType::Passive);
    let mut record = LonginRecord::new(0);
    adapter.init(&mut record).unwrap();

    // Baseline: no receiver before the tag is applied.
    assert!(adapter.io_intr_receiver().is_none());

    let tags = HashMap::from([(String::from("asyn:READBACK"), String::from("1"))]);
    adapter.apply_record_info(&tags);

    assert!(
        adapter.io_intr_receiver().is_some(),
        "asyn:READBACK=1 must enable readback path"
    );
}
/// None of the listed "falsey" spellings of `asyn:READBACK` (including the
/// empty string) may cause the adapter to expose an I/O-interrupt receiver.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn apply_record_info_falsey_values_do_not_enable_readback() {
    use epics_base_rs::server::records::longin::LonginRecord;
    use std::collections::HashMap;

    let mut adapter = make_adapter(ScanType::Passive);
    let mut record = LonginRecord::new(0);
    adapter.init(&mut record).unwrap();

    let rejected = ["0", "no", "NO", "false", "False", ""];
    for falsey in rejected {
        // Fresh single-entry map per candidate value.
        let tags = HashMap::from([(String::from("asyn:READBACK"), falsey.to_owned())]);
        adapter.apply_record_info(&tags);
        assert!(
            adapter.io_intr_receiver().is_none(),
            "value {falsey:?} must not enable readback"
        );
    }
}
/// The `asyn:INITIAL_READBACK` info tag toggles the adapter's
/// `initial_readback` flag: off by default, on for `"1"`, off again for `"0"`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn apply_record_info_handles_initial_readback_tag() {
    use std::collections::HashMap;

    let mut adapter = make_adapter(ScanType::Passive);
    assert!(!adapter.initial_readback);

    let enable = HashMap::from([(String::from("asyn:INITIAL_READBACK"), String::from("1"))]);
    adapter.apply_record_info(&enable);
    assert!(adapter.initial_readback, "info tag must enable readback");

    let disable = HashMap::from([(String::from("asyn:INITIAL_READBACK"), String::from("0"))]);
    adapter.apply_record_info(&disable);
    assert!(!adapter.initial_readback, "value '0' must disable readback");
}
/// Round trip through the adapter: a value written from one `LonginRecord`
/// must be observed when reading into a second, freshly created record.
#[test]
fn test_adapter_write_read() {
    use epics_base_rs::server::records::longin::LonginRecord;

    let mut adapter = make_adapter(ScanType::Passive);

    // Write side: initialise with the source record, then push 42 out.
    let mut source = LonginRecord::new(0);
    adapter.init(&mut source).unwrap();
    source.set_val(EpicsValue::Long(42)).unwrap();
    adapter.write(&mut source).unwrap();

    // Read side: a separate record that starts at 0.
    let mut sink = LonginRecord::new(0);
    adapter.read(&mut sink).unwrap();

    let observed = sink.val();
    assert_eq!(observed, Some(EpicsValue::Long(42)));
}
/// `normalize_asyn_dtyp` strips the trailing `In` / `Out` direction from
/// each of the array DTYP names listed below.
#[test]
fn dtyp_normalize_aai_aao_array_in_out() {
    // (input DTYP, expected normalized DTYP)
    let cases = [
        ("asynFloat64ArrayIn", "asynFloat64Array"),
        ("asynFloat64ArrayOut", "asynFloat64Array"),
        ("asynInt32ArrayIn", "asynInt32Array"),
        ("asynInt32ArrayOut", "asynInt32Array"),
        ("asynInt8ArrayIn", "asynInt8Array"),
        ("asynInt16ArrayOut", "asynInt16Array"),
        ("asynInt64ArrayIn", "asynInt64Array"),
        ("asynFloat32ArrayOut", "asynFloat32Array"),
    ];

    for (dtyp, normalized) in cases {
        assert_eq!(normalize_asyn_dtyp(dtyp), normalized);
    }
}
/// Scalar DTYPs pass through `normalize_asyn_dtyp` unchanged, while
/// `asynOctetRead` / `asynOctetWrite` both map to `asynOctet`.
#[test]
fn dtyp_normalize_preserves_non_array_dtyps() {
    // (input DTYP, expected normalized DTYP)
    let cases = [
        ("asynInt32", "asynInt32"),
        ("asynFloat64", "asynFloat64"),
        ("asynOctetRead", "asynOctet"),
        ("asynOctetWrite", "asynOctet"),
    ];

    for (dtyp, normalized) in cases {
        assert_eq!(normalize_asyn_dtyp(dtyp), normalized);
    }
}
}