pub mod registry;
pub use registry::{
PortEntry, PortRegistry, asyn_record_factory, get_port, register_asyn_record_type,
register_port,
};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use epics_base_rs::error::{CaError, CaResult};
use epics_base_rs::server::database::AsyncDbHandle;
use epics_base_rs::server::record::{FieldDesc, ProcessOutcome, Record};
use epics_base_rs::types::{DbFieldType, EpicsValue};
use crate::error::AsynResult;
use crate::exception::{AsynException, ExceptionCallbackId, ExceptionManager};
use crate::port_handle::PortHandle;
use crate::request::{CancelToken, RequestOp, RequestResult};
use crate::trace::{TraceFile, TraceInfoMask, TraceIoMask, TraceManager, TraceMask};
use crate::user::AsynUser;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
enum TransferMode {
WriteRead = 0,
Write = 1,
Read = 2,
Flush = 3,
NoIo = 4,
}
impl TransferMode {
fn from_u16(v: u16) -> Self {
match v {
0 => Self::WriteRead,
1 => Self::Write,
2 => Self::Read,
3 => Self::Flush,
4 => Self::NoIo,
_ => Self::WriteRead,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
enum InterfaceType {
Octet = 0,
Int32 = 1,
UInt32Digital = 2,
Float64 = 3,
}
impl InterfaceType {
fn from_u16(v: u16) -> Self {
match v {
0 => Self::Octet,
1 => Self::Int32,
2 => Self::UInt32Digital,
3 => Self::Float64,
_ => Self::Octet,
}
}
}
fn baud_rate_to_menu_index(baud: i32) -> i32 {
match baud {
300 => 1,
600 => 2,
1200 => 3,
2400 => 4,
4800 => 5,
9600 => 6,
19200 => 7,
38400 => 8,
57600 => 9,
115200 => 10,
230400 => 11,
460800 => 12,
576000 => 13,
921600 => 14,
1152000 => 15,
_ => 0, }
}
fn menu_index_to_baud_rate(idx: i32) -> i32 {
match idx {
1 => 300,
2 => 600,
3 => 1200,
4 => 2400,
5 => 4800,
6 => 9600,
7 => 19200,
8 => 38400,
9 => 57600,
10 => 115200,
11 => 230400,
12 => 460800,
13 => 576000,
14 => 921600,
15 => 1152000,
_ => 0,
}
}
const ASYN_FMT_ASCII: i32 = 0;
const ASYN_FMT_HYBRID: i32 = 1;
const ASYN_FMT_BINARY: i32 = 2;
fn translate_escape(s: &str) -> Vec<u8> {
translate_escape_bytes(s.as_bytes())
}
fn translate_escape_bytes(input: &[u8]) -> Vec<u8> {
let mut out = Vec::with_capacity(input.len());
let mut chars = input.iter().copied().peekable();
while let Some(c) = chars.next() {
if c != b'\\' {
out.push(c);
continue;
}
let Some(next) = chars.next() else {
out.push(b'\\');
break;
};
let decoded = match next {
b'r' => 0x0D,
b'n' => 0x0A,
b't' => 0x09,
b'\\' => b'\\',
b'"' => b'"',
b'\'' => b'\'',
b'0'..=b'7' => {
let mut val = u32::from(next - b'0');
for _ in 0..2 {
match chars.peek() {
Some(&d) if (b'0'..=b'7').contains(&d) => {
val = val * 8 + u32::from(d - b'0');
chars.next();
}
_ => break,
}
}
out.push((val & 0xFF) as u8);
continue;
}
b'a' => 0x07,
b'b' => 0x08,
b'f' => 0x0C,
b'v' => 0x0B,
other => {
out.push(b'\\');
out.push(other);
continue;
}
};
out.push(decoded);
}
out
}
fn open_trace_file(tfil: &str) -> std::io::Result<TraceFile> {
match tfil {
"" | "<stdout>" => Ok(TraceFile::Stdout),
"<stderr>" => Ok(TraceFile::Stderr),
"<errlog>" => Ok(TraceFile::Errlog),
path => std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(path)
.map(|f| TraceFile::File(Arc::new(std::sync::Mutex::new(f)))),
}
}
pub struct AsynRecord {
pub port: String,
pub addr: i32,
pub pcnct: i32, pub drvinfo: String,
pub reason: i32,
pub tmod: i32, pub tmot: f64, pub iface: i32, pub octetiv: i32, pub optioniv: i32, pub gpibiv: i32, pub i32iv: i32, pub ui32iv: i32, pub f64iv: i32,
pub aout: String,
pub oeos: String,
pub bout: Vec<u8>,
pub omax: i32,
pub nowt: i32,
pub nawt: i32,
pub ofmt: i32,
pub ainp: String,
pub tinp: String,
pub ieos: String,
pub binp: Vec<u8>,
pub imax: i32,
pub nrrd: i32,
pub nord: i32,
pub ifmt: i32, pub eomr: i32,
pub i32inp: i32,
pub i32out: i32,
pub ui32inp: u32,
pub ui32out: u32,
pub ui32mask: u32,
pub f64inp: f64,
pub f64out: f64,
pub baud: i32,
pub lbaud: i32,
pub prty: i32,
pub dbit: i32,
pub sbit: i32,
pub mctl: i32,
pub fctl: i32,
pub ixon: i32,
pub ixoff: i32,
pub ixany: i32,
pub hostinfo: String,
pub drto: i32,
pub ucmd: i32,
pub acmd: i32,
pub spr: i32,
pub tmsk: i32,
pub tb0: i32,
pub tb1: i32,
pub tb2: i32,
pub tb3: i32,
pub tb4: i32,
pub tb5: i32,
pub tiom: i32,
pub tib0: i32,
pub tib1: i32,
pub tib2: i32,
pub tinm: i32,
pub tinb0: i32,
pub tinb1: i32,
pub tinb2: i32,
pub tinb3: i32,
pub tsiz: i32,
pub tfil: String,
pub auct: i32, pub cnct: i32, pub enbl: i32,
pub val: i32,
pub errs: String,
pub aqr: i32,
port_entry: Option<PortEntry>,
resolved_reason: usize,
async_ctx: Option<(String, AsyncDbHandle)>,
io_inflight: Option<IoInFlight>,
trace_status_dirty: Arc<AtomicBool>,
trace_except_cb: Option<(Arc<ExceptionManager>, ExceptionCallbackId)>,
}
impl Default for AsynRecord {
fn default() -> Self {
Self {
port: String::new(),
addr: 0,
pcnct: 0,
drvinfo: String::new(),
reason: 0,
tmod: 0,
tmot: 1.0,
iface: 0,
octetiv: 0,
optioniv: 0,
gpibiv: 0,
i32iv: 0,
ui32iv: 0,
f64iv: 0,
aout: String::new(),
oeos: String::new(),
bout: Vec::new(),
omax: 80,
nowt: 80,
nawt: 0,
ofmt: 0,
ainp: String::new(),
tinp: String::new(),
ieos: String::new(),
binp: Vec::new(),
imax: 80,
nrrd: 0,
nord: 0,
ifmt: 0,
eomr: 0,
i32inp: 0,
i32out: 0,
ui32inp: 0,
ui32out: 0,
ui32mask: 0xFFFFFFFF,
f64inp: 0.0,
f64out: 0.0,
baud: 0,
lbaud: 0,
prty: 0,
dbit: 0,
sbit: 0,
mctl: 0,
fctl: 0,
ixon: 0,
ixoff: 0,
ixany: 0,
hostinfo: String::new(),
drto: 0,
ucmd: 0,
acmd: 0,
spr: 0,
tmsk: 0,
tb0: 0,
tb1: 0,
tb2: 0,
tb3: 0,
tb4: 0,
tb5: 0,
tiom: 0,
tib0: 0,
tib1: 0,
tib2: 0,
tinm: 0,
tinb0: 0,
tinb1: 0,
tinb2: 0,
tinb3: 0,
tsiz: 80,
tfil: String::new(),
auct: 1,
cnct: 0,
enbl: 1,
val: 0,
errs: String::new(),
aqr: 0,
port_entry: None,
resolved_reason: 0,
async_ctx: None,
io_inflight: None,
trace_status_dirty: Arc::new(AtomicBool::new(false)),
trace_except_cb: None,
}
}
}
struct IoInFlight {
cancel: CancelToken,
result: Arc<Mutex<Option<IoOutcome>>>,
}
struct IoPlan {
tmod: TransferMode,
iface: InterfaceType,
reason: usize,
addr: i32,
timeout: std::time::Duration,
octet_out: Vec<u8>,
octet_out_len: usize,
ofmt: i32,
i32out: i32,
ui32out: u32,
ui32mask: u32,
f64out: f64,
octet_buf_size: usize,
ifmt: i32,
}
#[derive(Default)]
struct IoOutcome {
nawt: Option<i32>,
eomr: Option<i32>,
nord: Option<i32>,
tinp: Option<String>,
ainp: Option<String>,
binp: Option<Vec<u8>>,
i32inp: Option<i32>,
ui32inp: Option<u32>,
f64inp: Option<f64>,
errs: Option<String>,
}
fn io_user(plan: &IoPlan) -> AsynUser {
AsynUser::new(plan.reason)
.with_addr(plan.addr)
.with_timeout(plan.timeout)
}
fn flush_user(plan: &IoPlan) -> AsynUser {
AsynUser::new(plan.reason).with_addr(plan.addr)
}
fn io_write_op(plan: &IoPlan) -> RequestOp {
match plan.iface {
InterfaceType::Octet => {
if plan.ofmt == ASYN_FMT_BINARY {
RequestOp::OctetWriteBinary {
data: plan.octet_out.clone(),
}
} else {
RequestOp::OctetWrite {
data: plan.octet_out.clone(),
}
}
}
InterfaceType::Int32 => RequestOp::Int32Write { value: plan.i32out },
InterfaceType::UInt32Digital => RequestOp::UInt32DigitalWrite {
value: plan.ui32out,
mask: plan.ui32mask,
},
InterfaceType::Float64 => RequestOp::Float64Write { value: plan.f64out },
}
}
fn io_read_op(plan: &IoPlan) -> RequestOp {
match plan.iface {
InterfaceType::Octet => {
if plan.ifmt == ASYN_FMT_BINARY {
RequestOp::OctetReadBinary {
buf_size: plan.octet_buf_size,
}
} else {
RequestOp::OctetRead {
buf_size: plan.octet_buf_size,
}
}
}
InterfaceType::Int32 => RequestOp::Int32Read,
InterfaceType::UInt32Digital => RequestOp::UInt32DigitalRead {
mask: plan.ui32mask,
},
InterfaceType::Float64 => RequestOp::Float64Read,
}
}
fn record_write_result(plan: &IoPlan, out: &mut IoOutcome, res: AsynResult<RequestResult>) {
match res {
Ok(_) => {
if plan.iface == InterfaceType::Octet {
out.nawt = Some(plan.octet_out_len as i32);
}
}
Err(e) => out.errs = Some(format!("write: {e}")),
}
}
fn record_read_result(plan: &IoPlan, out: &mut IoOutcome, res: AsynResult<RequestResult>) {
match plan.iface {
InterfaceType::Octet => match res {
Ok(result) => {
out.eomr = Some(result.eom_reason as i32);
if let Some(data) = result.data {
out.nord = Some(data.len() as i32);
out.tinp = Some(crate::trace::format_io_data(&data, TraceIoMask::ESCAPE));
if plan.ifmt == ASYN_FMT_ASCII {
out.ainp = Some(String::from_utf8_lossy(&data).to_string());
} else {
out.binp = Some(data);
}
}
}
Err(e) => out.errs = Some(format!("read: {e}")),
},
InterfaceType::Int32 => match res {
Ok(result) => match result.int_val {
Some(v) => out.i32inp = Some(v),
None => out.errs = Some("read: int32 read returned no value".to_string()),
},
Err(e) => out.errs = Some(format!("read: {e}")),
},
InterfaceType::UInt32Digital => match res {
Ok(result) => {
if let Some(v) = result.uint_val {
out.ui32inp = Some(v);
}
}
Err(e) => out.errs = Some(format!("read: {e}")),
},
InterfaceType::Float64 => match res {
Ok(result) => match result.float_val {
Some(v) => out.f64inp = Some(v),
None => out.errs = Some("read: float64 read returned no value".to_string()),
},
Err(e) => out.errs = Some(format!("read: {e}")),
},
}
}
async fn run_io_plan(handle: PortHandle, plan: IoPlan, cancel: CancelToken) -> IoOutcome {
let mut out = IoOutcome::default();
if cancel.is_cancelled() {
out.errs = Some(CANCELED_MSG.to_string());
return out;
}
if matches!(plan.tmod, TransferMode::Write | TransferMode::WriteRead) {
let res = handle
.submit_cancellable(io_write_op(&plan), io_user(&plan), cancel.clone())
.await;
if cancel.is_cancelled() {
out.errs = Some(CANCELED_MSG.to_string());
return out;
}
record_write_result(&plan, &mut out, res);
}
if matches!(plan.tmod, TransferMode::Read | TransferMode::WriteRead) {
let res = handle
.submit_cancellable(io_read_op(&plan), io_user(&plan), cancel.clone())
.await;
if cancel.is_cancelled() {
out.errs = Some(CANCELED_MSG.to_string());
return out;
}
record_read_result(&plan, &mut out, res);
}
if matches!(plan.tmod, TransferMode::Flush) {
let res = handle
.submit_cancellable(RequestOp::Flush, flush_user(&plan), cancel.clone())
.await;
if cancel.is_cancelled() {
out.errs = Some(CANCELED_MSG.to_string());
return out;
}
if let Err(e) = res {
out.errs = Some(format!("flush: {e}"));
}
}
out
}
const CANCELED_MSG: &str = "I/O request canceled";
fn trace_readback_fields(
trace_mask: u32,
io_mask: u32,
info_mask: u32,
) -> Vec<(String, EpicsValue)> {
let bit = |mask: u32, flag: u32| EpicsValue::Short(i16::from(mask & flag != 0));
vec![
("TMSK".to_string(), EpicsValue::Long(trace_mask as i32)),
("TB0".to_string(), bit(trace_mask, TraceMask::ERROR.bits())),
(
"TB1".to_string(),
bit(trace_mask, TraceMask::IO_DEVICE.bits()),
),
(
"TB2".to_string(),
bit(trace_mask, TraceMask::IO_FILTER.bits()),
),
(
"TB3".to_string(),
bit(trace_mask, TraceMask::IO_DRIVER.bits()),
),
("TB4".to_string(), bit(trace_mask, TraceMask::FLOW.bits())),
(
"TB5".to_string(),
bit(trace_mask, TraceMask::WARNING.bits()),
),
("TIOM".to_string(), EpicsValue::Long(io_mask as i32)),
("TIB0".to_string(), bit(io_mask, TraceIoMask::ASCII.bits())),
("TIB1".to_string(), bit(io_mask, TraceIoMask::ESCAPE.bits())),
("TIB2".to_string(), bit(io_mask, TraceIoMask::HEX.bits())),
("TINM".to_string(), EpicsValue::Long(info_mask as i32)),
(
"TINB0".to_string(),
bit(info_mask, TraceInfoMask::TIME.bits()),
),
(
"TINB1".to_string(),
bit(info_mask, TraceInfoMask::PORT.bits()),
),
(
"TINB2".to_string(),
bit(info_mask, TraceInfoMask::SOURCE.bits()),
),
(
"TINB3".to_string(),
bit(info_mask, TraceInfoMask::THREAD.bits()),
),
]
}
static FIELD_LIST: &[FieldDesc] = &[
FieldDesc {
name: "PORT",
dbf_type: DbFieldType::String,
read_only: false,
},
FieldDesc {
name: "ADDR",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "PCNCT",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "DRVINFO",
dbf_type: DbFieldType::String,
read_only: false,
},
FieldDesc {
name: "REASON",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "TMOD",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TMOT",
dbf_type: DbFieldType::Double,
read_only: false,
},
FieldDesc {
name: "IFACE",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "OCTETIV",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "OPTIONIV",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "GPIBIV",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "I32IV",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "UI32IV",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "F64IV",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "AOUT",
dbf_type: DbFieldType::String,
read_only: false,
},
FieldDesc {
name: "OEOS",
dbf_type: DbFieldType::String,
read_only: false,
},
FieldDesc {
name: "BOUT",
dbf_type: DbFieldType::Char,
read_only: false,
},
FieldDesc {
name: "OMAX",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "NOWT",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "NAWT",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "OFMT",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "AINP",
dbf_type: DbFieldType::String,
read_only: true,
},
FieldDesc {
name: "TINP",
dbf_type: DbFieldType::String,
read_only: true,
},
FieldDesc {
name: "IEOS",
dbf_type: DbFieldType::String,
read_only: false,
},
FieldDesc {
name: "BINP",
dbf_type: DbFieldType::Char,
read_only: true,
},
FieldDesc {
name: "IMAX",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "NRRD",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "NORD",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "IFMT",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "EOMR",
dbf_type: DbFieldType::Short,
read_only: true,
},
FieldDesc {
name: "I32INP",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "I32OUT",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "UI32INP",
dbf_type: DbFieldType::Long,
read_only: true,
},
FieldDesc {
name: "UI32OUT",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "UI32MASK",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "F64INP",
dbf_type: DbFieldType::Double,
read_only: true,
},
FieldDesc {
name: "F64OUT",
dbf_type: DbFieldType::Double,
read_only: false,
},
FieldDesc {
name: "BAUD",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "LBAUD",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "PRTY",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "DBIT",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "SBIT",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "MCTL",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "FCTL",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "IXON",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "IXOFF",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "IXANY",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "HOSTINFO",
dbf_type: DbFieldType::String,
read_only: false,
},
FieldDesc {
name: "DRTO",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "UCMD",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "ACMD",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "SPR",
dbf_type: DbFieldType::Char,
read_only: true,
},
FieldDesc {
name: "TMSK",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "TB0",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TB1",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TB2",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TB3",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TB4",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TB5",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TIOM",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "TIB0",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TIB1",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TIB2",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TINM",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "TINB0",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TINB1",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TINB2",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TINB3",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "TSIZ",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "TFIL",
dbf_type: DbFieldType::String,
read_only: false,
},
FieldDesc {
name: "AUCT",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "CNCT",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "ENBL",
dbf_type: DbFieldType::Short,
read_only: false,
},
FieldDesc {
name: "VAL",
dbf_type: DbFieldType::Long,
read_only: false,
},
FieldDesc {
name: "ERRS",
dbf_type: DbFieldType::String,
read_only: true,
},
FieldDesc {
name: "AQR",
dbf_type: DbFieldType::Char,
read_only: false,
},
];
impl AsynRecord {
fn update_trace_bits_from_mask(&mut self) {
let mask = self.tmsk as u32;
self.tb0 = if mask & TraceMask::ERROR.bits() != 0 {
1
} else {
0
};
self.tb1 = if mask & TraceMask::IO_DEVICE.bits() != 0 {
1
} else {
0
};
self.tb2 = if mask & TraceMask::IO_FILTER.bits() != 0 {
1
} else {
0
};
self.tb3 = if mask & TraceMask::IO_DRIVER.bits() != 0 {
1
} else {
0
};
self.tb4 = if mask & TraceMask::FLOW.bits() != 0 {
1
} else {
0
};
self.tb5 = if mask & TraceMask::WARNING.bits() != 0 {
1
} else {
0
};
}
fn update_mask_from_trace_bits(&mut self) {
let mut mask: u32 = 0;
if self.tb0 != 0 {
mask |= TraceMask::ERROR.bits();
}
if self.tb1 != 0 {
mask |= TraceMask::IO_DEVICE.bits();
}
if self.tb2 != 0 {
mask |= TraceMask::IO_FILTER.bits();
}
if self.tb3 != 0 {
mask |= TraceMask::IO_DRIVER.bits();
}
if self.tb4 != 0 {
mask |= TraceMask::FLOW.bits();
}
if self.tb5 != 0 {
mask |= TraceMask::WARNING.bits();
}
self.tmsk = mask as i32;
}
fn update_io_bits_from_mask(&mut self) {
let mask = self.tiom as u32;
self.tib0 = if mask & TraceIoMask::ASCII.bits() != 0 {
1
} else {
0
};
self.tib1 = if mask & TraceIoMask::ESCAPE.bits() != 0 {
1
} else {
0
};
self.tib2 = if mask & TraceIoMask::HEX.bits() != 0 {
1
} else {
0
};
}
fn update_mask_from_io_bits(&mut self) {
let mut mask: u32 = 0;
if self.tib0 != 0 {
mask |= TraceIoMask::ASCII.bits();
}
if self.tib1 != 0 {
mask |= TraceIoMask::ESCAPE.bits();
}
if self.tib2 != 0 {
mask |= TraceIoMask::HEX.bits();
}
self.tiom = mask as i32;
}
fn update_info_bits_from_mask(&mut self) {
let mask = self.tinm as u32;
self.tinb0 = if mask & TraceInfoMask::TIME.bits() != 0 {
1
} else {
0
};
self.tinb1 = if mask & TraceInfoMask::PORT.bits() != 0 {
1
} else {
0
};
self.tinb2 = if mask & TraceInfoMask::SOURCE.bits() != 0 {
1
} else {
0
};
self.tinb3 = if mask & TraceInfoMask::THREAD.bits() != 0 {
1
} else {
0
};
}
fn update_mask_from_info_bits(&mut self) {
let mut mask: u32 = 0;
if self.tinb0 != 0 {
mask |= TraceInfoMask::TIME.bits();
}
if self.tinb1 != 0 {
mask |= TraceInfoMask::PORT.bits();
}
if self.tinb2 != 0 {
mask |= TraceInfoMask::SOURCE.bits();
}
if self.tinb3 != 0 {
mask |= TraceInfoMask::THREAD.bits();
}
self.tinm = mask as i32;
}
fn trace_addr_target(&self) -> Option<i32> {
match self.port_entry {
Some(ref entry) if self.addr >= 0 && entry.handle.is_multi_device() => Some(self.addr),
_ => None,
}
}
fn apply_trace_mask(&self) {
if let Some(ref entry) = self.port_entry {
let mask = TraceMask::from_bits_truncate(self.tmsk as u32);
match self.trace_addr_target() {
Some(addr) => entry.trace.set_device_trace_mask(&self.port, addr, mask),
None => entry.trace.set_trace_mask(Some(&self.port), mask),
}
}
}
fn apply_trace_io_mask(&self) {
if let Some(ref entry) = self.port_entry {
let mask = TraceIoMask::from_bits_truncate(self.tiom as u32);
match self.trace_addr_target() {
Some(addr) => entry.trace.set_device_trace_io_mask(&self.port, addr, mask),
None => entry.trace.set_trace_io_mask(Some(&self.port), mask),
}
}
}
fn apply_trace_info_mask(&self) {
if let Some(ref entry) = self.port_entry {
let mask = TraceInfoMask::from_bits_truncate(self.tinm as u32);
match self.trace_addr_target() {
Some(addr) => entry
.trace
.set_device_trace_info_mask(&self.port, addr, mask),
None => entry.trace.set_trace_info_mask(Some(&self.port), mask),
}
}
}
fn apply_trace_truncate_size(&self) {
if let Some(ref entry) = self.port_entry {
let size = self.tsiz as usize;
match self.trace_addr_target() {
Some(addr) => entry
.trace
.set_device_io_truncate_size(&self.port, addr, size),
None => entry.trace.set_io_truncate_size(Some(&self.port), size),
}
}
}
fn apply_trace_file(&mut self) {
let Some(entry) = self.port_entry.clone() else {
return;
};
let tfil = self.tfil.clone();
let file = match open_trace_file(&tfil) {
Ok(file) => file,
Err(e) => {
self.errs = format!("Error opening trace file: {tfil}: {e}");
return;
}
};
match self.trace_addr_target() {
Some(addr) => entry.trace.set_device_trace_file(&self.port, addr, file),
None => entry.trace.set_trace_file(Some(&self.port), file),
}
}
fn read_trace_state(&mut self) {
let (trace_mask, io_mask, info_mask) = match self.port_entry {
Some(ref entry) => {
let port = &self.port;
(
entry.trace.get_trace_mask(Some(port)).bits(),
entry.trace.get_trace_io_mask(Some(port)).bits(),
entry.trace.get_trace_info_mask(Some(port)).bits(),
)
}
None => return,
};
self.tmsk = trace_mask as i32;
self.update_trace_bits_from_mask();
self.tiom = io_mask as i32;
self.update_io_bits_from_mask();
self.tinm = info_mask as i32;
self.update_info_bits_from_mask();
}
fn register_trace_exception_callback(&mut self) {
self.clear_trace_exception_callback();
let Some(ref entry) = self.port_entry else {
return;
};
let Some(mgr) = entry.trace.exception_manager() else {
return;
};
let port = self.port.clone();
let trace = entry.trace.clone();
let dirty = Arc::clone(&self.trace_status_dirty);
let immediate = match (
self.async_ctx.clone(),
tokio::runtime::Handle::try_current().ok(),
) {
(Some((name, db)), Some(handle)) => Some((name, db, handle)),
_ => None,
};
let last_posted: Arc<Mutex<HashMap<String, EpicsValue>>> = Arc::new(Mutex::new(
trace_readback_fields(
trace.get_trace_mask(Some(&port)).bits(),
trace.get_trace_io_mask(Some(&port)).bits(),
trace.get_trace_info_mask(Some(&port)).bits(),
)
.into_iter()
.collect(),
));
let id = mgr.add_callback(move |ev| {
if ev.port_name != port
|| !matches!(
ev.exception,
AsynException::TraceMask
| AsynException::TraceIoMask
| AsynException::TraceInfoMask
)
{
return;
}
match &immediate {
Some((name, db, handle)) => {
let changed: Vec<(String, EpicsValue)> = {
let mut cache = last_posted.lock().unwrap();
let mut changed = Vec::new();
for (field, value) in trace_readback_fields(
trace.get_trace_mask(Some(&port)).bits(),
trace.get_trace_io_mask(Some(&port)).bits(),
trace.get_trace_info_mask(Some(&port)).bits(),
) {
if cache.get(&field) != Some(&value) {
cache.insert(field.clone(), value.clone());
changed.push((field, value));
}
}
changed
};
if changed.is_empty() {
return;
}
let (name, db) = (name.clone(), db.clone());
handle.spawn(async move {
let _ = db.post_fields(&name, changed).await;
});
}
None => {
dirty.store(true, Ordering::Release);
}
}
});
self.trace_except_cb = Some((mgr, id));
}
fn clear_trace_exception_callback(&mut self) {
if let Some((mgr, id)) = self.trace_except_cb.take() {
mgr.remove_callback(id);
}
}
fn read_options_from_driver(&mut self, handle: &PortHandle) {
if let Ok(val) = handle.get_option_blocking("baud") {
self.lbaud = val.parse::<i32>().unwrap_or(0);
self.baud = baud_rate_to_menu_index(self.lbaud);
}
if let Ok(val) = handle.get_option_blocking("parity") {
self.prty = match val.as_str() {
"none" => 1,
"even" => 2,
"odd" => 3,
_ => 0, };
}
if let Ok(val) = handle.get_option_blocking("csize") {
self.dbit = match val.as_str() {
"5" => 1,
"6" => 2,
"7" => 3,
"8" => 4,
_ => 0,
};
}
if let Ok(val) = handle.get_option_blocking("stop") {
self.sbit = match val.as_str() {
"1" => 1,
"2" => 2,
_ => 0,
};
}
if let Ok(val) = handle.get_option_blocking("crtscts") {
self.fctl = match val.as_str() {
"Y" | "Yes" => 2, "N" | "No" | "none" => 1, _ => 0,
};
}
if let Ok(val) = handle.get_option_blocking("clocal") {
self.mctl = match val.as_str() {
"Y" | "Yes" => 1, "N" | "No" => 2, _ => 0,
};
}
if let Ok(val) = handle.get_option_blocking("ixon") {
self.ixon = match val.as_str() {
"Y" | "Yes" => 2,
"N" | "No" => 1,
_ => 0,
};
}
if let Ok(val) = handle.get_option_blocking("ixoff") {
self.ixoff = match val.as_str() {
"Y" | "Yes" => 2,
"N" | "No" => 1,
_ => 0,
};
}
if let Ok(val) = handle.get_option_blocking("ixany") {
self.ixany = match val.as_str() {
"Y" | "Yes" => 2,
"N" | "No" => 1,
_ => 0,
};
}
if let Ok(val) = handle.get_option_blocking("hostinfo") {
self.hostinfo = val;
}
if let Ok(val) = handle.get_option_blocking("disconnectOnReadTimeout") {
self.drto = match val.as_str() {
"Y" | "Yes" => 2,
"N" | "No" => 1,
_ => 0,
};
}
}
fn write_option(&mut self, key: &str, value: &str) {
if let Some(ref entry) = self.port_entry {
if let Err(e) = entry.handle.set_option_blocking(key, value) {
self.errs = format!("set_option({key}): {e}");
}
}
}
fn connect_device(&mut self) {
if self.port.is_empty() {
self.pcnct = 0;
self.cnct = 0;
self.port_entry = None;
self.clear_trace_exception_callback();
return;
}
match registry::get_port(&self.port) {
Some(entry) => {
if !self.drvinfo.is_empty() {
match entry.handle.drv_user_create_blocking(&self.drvinfo) {
Ok(r) => {
self.resolved_reason = r;
self.reason = r as i32;
}
Err(e) => {
self.errs = format!("drvUserCreate failed: {e}");
self.resolved_reason = 0;
}
}
} else {
self.resolved_reason = self.reason as usize;
}
self.octetiv = 1;
self.i32iv = 1;
self.ui32iv = 1;
self.f64iv = 1;
self.optioniv = 1;
self.gpibiv = 0;
self.port_entry = Some(entry.clone());
self.read_trace_state();
self.register_trace_exception_callback();
self.read_options_from_driver(&entry.handle);
self.pcnct = 1;
self.cnct = 1;
if let Ok(enabled) = entry.handle.is_enabled_blocking() {
self.enbl = i32::from(enabled);
}
if let Ok(auto) = entry.handle.is_auto_connect_blocking() {
self.auct = i32::from(auto);
}
if self.errs.is_empty() || self.resolved_reason != 0 || self.drvinfo.is_empty() {
self.errs.clear();
}
}
None => {
self.errs = format!("port '{}' not found", self.port);
self.pcnct = 0;
self.cnct = 0;
self.port_entry = None;
self.clear_trace_exception_callback();
}
}
}
fn octet_output_buffer(&self) -> Vec<u8> {
match self.ofmt {
ASYN_FMT_BINARY => {
let omax = self.omax.max(0) as usize;
let nowt = (self.nowt.max(0) as usize).min(omax);
self.bout[..nowt.min(self.bout.len())].to_vec()
}
ASYN_FMT_HYBRID => {
let end = self
.bout
.iter()
.position(|&b| b == 0)
.unwrap_or(self.bout.len());
translate_escape_bytes(&self.bout[..end])
}
_ => translate_escape(&self.aout),
}
}
fn build_io_plan(&self) -> IoPlan {
let octet_out = self.octet_output_buffer();
let octet_out_len = octet_out.len();
let imax = self.imax.max(0) as usize;
let octet_buf_size = if self.nrrd > 0 {
(self.nrrd as usize).min(imax)
} else {
imax
};
let timeout = if self.tmot > 0.0 {
std::time::Duration::from_secs_f64(self.tmot)
} else {
std::time::Duration::from_secs(1)
};
IoPlan {
tmod: TransferMode::from_u16(self.tmod as u16),
iface: InterfaceType::from_u16(self.iface as u16),
reason: self.resolved_reason,
addr: self.addr,
timeout,
octet_out,
octet_out_len,
ofmt: self.ofmt,
i32out: self.i32out,
ui32out: self.ui32out,
ui32mask: self.ui32mask,
f64out: self.f64out,
octet_buf_size,
ifmt: self.ifmt,
}
}
fn apply_io_outcome(&mut self, out: IoOutcome) {
if let Some(v) = out.nawt {
self.nawt = v;
}
if let Some(v) = out.eomr {
self.eomr = v;
}
if let Some(v) = out.nord {
self.nord = v;
}
if let Some(v) = out.tinp {
self.tinp = v;
}
if let Some(v) = out.ainp {
self.ainp = v;
}
if let Some(v) = out.binp {
self.binp = v;
}
if let Some(v) = out.i32inp {
self.i32inp = v;
}
if let Some(v) = out.ui32inp {
self.ui32inp = v;
}
if let Some(v) = out.f64inp {
self.f64inp = v;
}
if let Some(v) = out.errs {
self.errs = v;
}
}
fn perform_io(&mut self) -> CaResult<()> {
let entry = match &self.port_entry {
Some(e) => e.clone(),
None => {
self.errs = "not connected".to_string();
return Ok(());
}
};
let plan = self.build_io_plan();
let mut out = IoOutcome::default();
if matches!(plan.tmod, TransferMode::Write | TransferMode::WriteRead) {
let res = entry
.handle
.submit_blocking(io_write_op(&plan), io_user(&plan));
record_write_result(&plan, &mut out, res);
}
if matches!(plan.tmod, TransferMode::Read | TransferMode::WriteRead) {
let res = entry
.handle
.submit_blocking(io_read_op(&plan), io_user(&plan));
record_read_result(&plan, &mut out, res);
}
if matches!(plan.tmod, TransferMode::Flush) {
if let Err(e) = entry
.handle
.submit_blocking(RequestOp::Flush, flush_user(&plan))
{
out.errs = Some(format!("flush: {e}"));
}
}
self.apply_io_outcome(out);
Ok(())
}
fn spawn_async_io(
&mut self,
handle: PortHandle,
name: String,
db: AsyncDbHandle,
) -> ProcessOutcome {
let plan = self.build_io_plan();
let cancel = CancelToken::new();
let slot: Arc<Mutex<Option<IoOutcome>>> = Arc::new(Mutex::new(None));
let cancel_task = cancel.clone();
let slot_task = slot.clone();
tokio::spawn(async move {
let outcome = run_io_plan(handle, plan, cancel_task).await;
*slot_task.lock().unwrap() = Some(outcome);
if let Some(token) = db.mint_async_token(&name).await {
let (waitset, completion) = AsyncDbHandle::new_put_notify();
waitset.leave();
let _ = db.reprocess_on_notify(token, completion);
}
});
self.io_inflight = Some(IoInFlight {
cancel,
result: slot,
});
ProcessOutcome::async_pending()
}
}
impl Record for AsynRecord {
fn record_type(&self) -> &'static str {
"asyn"
}
fn field_list(&self) -> &'static [FieldDesc] {
FIELD_LIST
}
fn set_async_context(&mut self, name: String, db: AsyncDbHandle) {
self.async_ctx = Some((name, db));
}
fn get_field(&self, name: &str) -> Option<EpicsValue> {
match name {
"PORT" => Some(EpicsValue::String(self.port.clone().into())),
"ADDR" => Some(EpicsValue::Long(self.addr)),
"PCNCT" => Some(EpicsValue::Short(self.pcnct as i16)),
"DRVINFO" => Some(EpicsValue::String(self.drvinfo.clone().into())),
"REASON" => Some(EpicsValue::Long(self.reason)),
"TMOD" => Some(EpicsValue::Short(self.tmod as i16)),
"TMOT" => Some(EpicsValue::Double(self.tmot)),
"IFACE" => Some(EpicsValue::Short(self.iface as i16)),
"OCTETIV" => Some(EpicsValue::Long(self.octetiv)),
"OPTIONIV" => Some(EpicsValue::Long(self.optioniv)),
"GPIBIV" => Some(EpicsValue::Long(self.gpibiv)),
"I32IV" => Some(EpicsValue::Long(self.i32iv)),
"UI32IV" => Some(EpicsValue::Long(self.ui32iv)),
"F64IV" => Some(EpicsValue::Long(self.f64iv)),
"AOUT" => Some(EpicsValue::String(self.aout.clone().into())),
"OEOS" => Some(EpicsValue::String(self.oeos.clone().into())),
"BOUT" => Some(EpicsValue::CharArray(self.bout.clone())),
"OMAX" => Some(EpicsValue::Long(self.omax)),
"NOWT" => Some(EpicsValue::Long(self.nowt)),
"NAWT" => Some(EpicsValue::Long(self.nawt)),
"OFMT" => Some(EpicsValue::Short(self.ofmt as i16)),
"AINP" => Some(EpicsValue::String(self.ainp.clone().into())),
"TINP" => Some(EpicsValue::String(self.tinp.clone().into())),
"IEOS" => Some(EpicsValue::String(self.ieos.clone().into())),
"BINP" => Some(EpicsValue::CharArray(self.binp.clone())),
"IMAX" => Some(EpicsValue::Long(self.imax)),
"NRRD" => Some(EpicsValue::Long(self.nrrd)),
"NORD" => Some(EpicsValue::Long(self.nord)),
"IFMT" => Some(EpicsValue::Short(self.ifmt as i16)),
"EOMR" => Some(EpicsValue::Short(self.eomr as i16)),
"I32INP" => Some(EpicsValue::Long(self.i32inp)),
"I32OUT" => Some(EpicsValue::Long(self.i32out)),
"UI32INP" => Some(EpicsValue::Long(self.ui32inp as i32)),
"UI32OUT" => Some(EpicsValue::Long(self.ui32out as i32)),
"UI32MASK" => Some(EpicsValue::Long(self.ui32mask as i32)),
"F64INP" => Some(EpicsValue::Double(self.f64inp)),
"F64OUT" => Some(EpicsValue::Double(self.f64out)),
"BAUD" => Some(EpicsValue::Short(self.baud as i16)),
"LBAUD" => Some(EpicsValue::Long(self.lbaud)),
"PRTY" => Some(EpicsValue::Short(self.prty as i16)),
"DBIT" => Some(EpicsValue::Short(self.dbit as i16)),
"SBIT" => Some(EpicsValue::Short(self.sbit as i16)),
"MCTL" => Some(EpicsValue::Short(self.mctl as i16)),
"FCTL" => Some(EpicsValue::Short(self.fctl as i16)),
"IXON" => Some(EpicsValue::Short(self.ixon as i16)),
"IXOFF" => Some(EpicsValue::Short(self.ixoff as i16)),
"IXANY" => Some(EpicsValue::Short(self.ixany as i16)),
"HOSTINFO" => Some(EpicsValue::String(self.hostinfo.clone().into())),
"DRTO" => Some(EpicsValue::Short(self.drto as i16)),
"UCMD" => Some(EpicsValue::Short(self.ucmd as i16)),
"ACMD" => Some(EpicsValue::Short(self.acmd as i16)),
"SPR" => Some(EpicsValue::Char(self.spr as u8)),
"TMSK" => Some(EpicsValue::Long(self.tmsk)),
"TB0" => Some(EpicsValue::Short(self.tb0 as i16)),
"TB1" => Some(EpicsValue::Short(self.tb1 as i16)),
"TB2" => Some(EpicsValue::Short(self.tb2 as i16)),
"TB3" => Some(EpicsValue::Short(self.tb3 as i16)),
"TB4" => Some(EpicsValue::Short(self.tb4 as i16)),
"TB5" => Some(EpicsValue::Short(self.tb5 as i16)),
"TIOM" => Some(EpicsValue::Long(self.tiom)),
"TIB0" => Some(EpicsValue::Short(self.tib0 as i16)),
"TIB1" => Some(EpicsValue::Short(self.tib1 as i16)),
"TIB2" => Some(EpicsValue::Short(self.tib2 as i16)),
"TINM" => Some(EpicsValue::Long(self.tinm)),
"TINB0" => Some(EpicsValue::Short(self.tinb0 as i16)),
"TINB1" => Some(EpicsValue::Short(self.tinb1 as i16)),
"TINB2" => Some(EpicsValue::Short(self.tinb2 as i16)),
"TINB3" => Some(EpicsValue::Short(self.tinb3 as i16)),
"TSIZ" => Some(EpicsValue::Long(self.tsiz)),
"TFIL" => Some(EpicsValue::String(self.tfil.clone().into())),
"AUCT" => Some(EpicsValue::Short(self.auct as i16)),
"CNCT" => Some(EpicsValue::Short(self.cnct as i16)),
"ENBL" => Some(EpicsValue::Short(self.enbl as i16)),
"VAL" => Some(EpicsValue::Long(self.val)),
"ERRS" => Some(EpicsValue::String(self.errs.clone().into())),
"AQR" => Some(EpicsValue::Char(self.aqr as u8)),
_ => None,
}
}
fn put_field(&mut self, name: &str, value: EpicsValue) -> CaResult<()> {
let to_i32 = |v: &EpicsValue| -> i32 { v.to_f64().unwrap_or(0.0) as i32 };
let to_u32 = |v: &EpicsValue| -> u32 { v.to_f64().unwrap_or(0.0) as u32 };
let to_f64 = |v: &EpicsValue| -> f64 { v.to_f64().unwrap_or(0.0) };
let to_str = |v: &EpicsValue| -> String { format!("{v}") };
let to_bytes = |v: &EpicsValue| -> Vec<u8> {
match v {
EpicsValue::CharArray(b) => b.clone(),
EpicsValue::String(s) => s.as_bytes().to_vec(),
_ => Vec::new(),
}
};
match name {
"PORT" => {
self.port = to_str(&value);
}
"ADDR" => {
self.addr = to_i32(&value);
}
"PCNCT" => {
self.pcnct = to_i32(&value);
}
"DRVINFO" => {
self.drvinfo = to_str(&value);
}
"REASON" => {
self.reason = to_i32(&value);
}
"TMOD" => {
self.tmod = to_i32(&value);
}
"TMOT" => {
self.tmot = to_f64(&value);
}
"IFACE" => {
self.iface = to_i32(&value);
}
"OCTETIV" => {
self.octetiv = to_i32(&value);
}
"OPTIONIV" => {
self.optioniv = to_i32(&value);
}
"GPIBIV" => {
self.gpibiv = to_i32(&value);
}
"I32IV" => {
self.i32iv = to_i32(&value);
}
"UI32IV" => {
self.ui32iv = to_i32(&value);
}
"F64IV" => {
self.f64iv = to_i32(&value);
}
"AOUT" => {
self.aout = to_str(&value);
}
"OEOS" => {
self.oeos = to_str(&value);
}
"BOUT" => {
self.bout = to_bytes(&value);
}
"OMAX" => {
self.omax = to_i32(&value);
}
"NOWT" => {
self.nowt = to_i32(&value);
}
"NAWT" => {
self.nawt = to_i32(&value);
}
"OFMT" => {
self.ofmt = to_i32(&value);
}
"AINP" => {
self.ainp = to_str(&value);
}
"TINP" => {
self.tinp = to_str(&value);
}
"IEOS" => {
self.ieos = to_str(&value);
}
"BINP" => {
self.binp = to_bytes(&value);
}
"IMAX" => {
self.imax = to_i32(&value);
}
"NRRD" => {
self.nrrd = to_i32(&value);
}
"NORD" => {
self.nord = to_i32(&value);
}
"IFMT" => {
self.ifmt = to_i32(&value);
}
"EOMR" => {
self.eomr = to_i32(&value);
}
"I32INP" => {
self.i32inp = to_i32(&value);
}
"I32OUT" => {
self.i32out = to_i32(&value);
}
"UI32INP" => {
self.ui32inp = to_u32(&value);
}
"UI32OUT" => {
self.ui32out = to_u32(&value);
}
"UI32MASK" => {
self.ui32mask = to_u32(&value);
}
"F64INP" => {
self.f64inp = to_f64(&value);
}
"F64OUT" => {
self.f64out = to_f64(&value);
}
"BAUD" => {
self.baud = to_i32(&value);
}
"LBAUD" => {
self.lbaud = to_i32(&value);
}
"PRTY" => {
self.prty = to_i32(&value);
}
"DBIT" => {
self.dbit = to_i32(&value);
}
"SBIT" => {
self.sbit = to_i32(&value);
}
"MCTL" => {
self.mctl = to_i32(&value);
}
"FCTL" => {
self.fctl = to_i32(&value);
}
"IXON" => {
self.ixon = to_i32(&value);
}
"IXOFF" => {
self.ixoff = to_i32(&value);
}
"IXANY" => {
self.ixany = to_i32(&value);
}
"HOSTINFO" => {
self.hostinfo = to_str(&value);
}
"DRTO" => {
self.drto = to_i32(&value);
}
"UCMD" => {
self.ucmd = to_i32(&value);
}
"ACMD" => {
self.acmd = to_i32(&value);
}
"SPR" => {
self.spr = to_i32(&value);
}
"TMSK" => {
self.tmsk = to_i32(&value);
}
"TB0" => {
self.tb0 = to_i32(&value);
}
"TB1" => {
self.tb1 = to_i32(&value);
}
"TB2" => {
self.tb2 = to_i32(&value);
}
"TB3" => {
self.tb3 = to_i32(&value);
}
"TB4" => {
self.tb4 = to_i32(&value);
}
"TB5" => {
self.tb5 = to_i32(&value);
}
"TIOM" => {
self.tiom = to_i32(&value);
}
"TIB0" => {
self.tib0 = to_i32(&value);
}
"TIB1" => {
self.tib1 = to_i32(&value);
}
"TIB2" => {
self.tib2 = to_i32(&value);
}
"TINM" => {
self.tinm = to_i32(&value);
}
"TINB0" => {
self.tinb0 = to_i32(&value);
}
"TINB1" => {
self.tinb1 = to_i32(&value);
}
"TINB2" => {
self.tinb2 = to_i32(&value);
}
"TINB3" => {
self.tinb3 = to_i32(&value);
}
"TSIZ" => {
self.tsiz = to_i32(&value);
}
"TFIL" => {
self.tfil = to_str(&value);
}
"AUCT" => {
self.auct = to_i32(&value);
}
"CNCT" => {
self.cnct = to_i32(&value);
}
"ENBL" => {
self.enbl = to_i32(&value);
}
"VAL" => {
self.val = to_i32(&value);
}
"ERRS" => {
self.errs = to_str(&value);
}
"AQR" => {
self.aqr = to_i32(&value);
}
_ => {
return Err(CaError::InvalidValue(format!("unknown field: {name}")));
}
}
Ok(())
}
fn init_record(&mut self, pass: u8) -> CaResult<()> {
if pass == 1 && !self.port.is_empty() {
self.connect_device();
}
Ok(())
}
fn special(&mut self, field: &str, after: bool) -> CaResult<()> {
if !after {
return Ok(());
}
match field {
"PORT" | "ADDR" | "DRVINFO" => {
self.connect_device();
}
"TMSK" => {
self.update_trace_bits_from_mask();
self.apply_trace_mask();
}
"TB0" | "TB1" | "TB2" | "TB3" | "TB4" | "TB5" => {
self.update_mask_from_trace_bits();
self.apply_trace_mask();
}
"TIOM" => {
self.update_io_bits_from_mask();
self.apply_trace_io_mask();
}
"TIB0" | "TIB1" | "TIB2" => {
self.update_mask_from_io_bits();
self.apply_trace_io_mask();
}
"TINM" => {
self.update_info_bits_from_mask();
self.apply_trace_info_mask();
}
"TINB0" | "TINB1" | "TINB2" | "TINB3" => {
self.update_mask_from_info_bits();
self.apply_trace_info_mask();
}
"TSIZ" => {
self.apply_trace_truncate_size();
}
"TFIL" => {
self.apply_trace_file();
}
"ENBL" => {
if let Some(ref entry) = self.port_entry {
let _ = entry.handle.set_enable_blocking(self.enbl != 0);
}
}
"AUCT" => {
if let Some(ref entry) = self.port_entry {
let _ = entry.handle.set_auto_connect_blocking(self.auct != 0);
}
}
"CNCT" => {
if self.cnct != 0 {
self.connect_device();
} else {
self.pcnct = 0;
self.port_entry = None;
self.clear_trace_exception_callback();
}
}
"PCNCT" => {
if self.pcnct != 0 {
self.connect_device();
} else {
self.cnct = 0;
self.port_entry = None;
self.clear_trace_exception_callback();
}
}
"IFACE" => {
}
"REASON" => {
self.resolved_reason = self.reason as usize;
}
"BAUD" => {
let rate = menu_index_to_baud_rate(self.baud);
if rate > 0 {
self.lbaud = rate;
self.write_option("baud", &rate.to_string());
}
}
"LBAUD" => {
if self.lbaud > 0 {
self.baud = baud_rate_to_menu_index(self.lbaud);
self.write_option("baud", &self.lbaud.to_string());
}
}
"PRTY" => {
let val = match self.prty {
1 => "none",
2 => "even",
3 => "odd",
_ => return Ok(()),
};
self.write_option("parity", val);
}
"DBIT" => {
let val = match self.dbit {
1 => "5",
2 => "6",
3 => "7",
4 => "8",
_ => return Ok(()),
};
self.write_option("bits", val);
}
"SBIT" => {
let val = match self.sbit {
1 => "1",
2 => "2",
_ => return Ok(()),
};
self.write_option("stop", val);
}
"MCTL" => {
let val = match self.mctl {
1 => "Y", 2 => "N", _ => return Ok(()),
};
self.write_option("clocal", val);
}
"FCTL" => {
let val = match self.fctl {
1 => "N", 2 => "Y", _ => return Ok(()),
};
self.write_option("crtscts", val);
}
"IXON" => {
let val = match self.ixon {
1 => "N",
2 => "Y",
_ => return Ok(()),
};
self.write_option("ixon", val);
}
"IXOFF" => {
let val = match self.ixoff {
1 => "N",
2 => "Y",
_ => return Ok(()),
};
self.write_option("ixoff", val);
}
"IXANY" => {
let val = match self.ixany {
1 => "N",
2 => "Y",
_ => return Ok(()),
};
self.write_option("ixany", val);
}
"HOSTINFO" => {
if !self.hostinfo.is_empty() {
self.write_option("hostinfo", &self.hostinfo.clone());
}
}
"DRTO" => {
let val = match self.drto {
1 => "N",
2 => "Y",
_ => return Ok(()),
};
self.write_option("disconnectOnReadTimeout", val);
}
"AQR" => {
if let Some(inflight) = &self.io_inflight {
inflight.cancel.cancel();
}
}
"OEOS" => {
let bytes = translate_escape(&self.oeos);
if let Some(ref entry) = self.port_entry {
if let Err(e) = entry.handle.set_output_eos_blocking(&bytes) {
self.errs = format!("set_output_eos: {e}");
}
}
}
"IEOS" => {
let bytes = translate_escape(&self.ieos);
if let Some(ref entry) = self.port_entry {
if let Err(e) = entry.handle.set_input_eos_blocking(&bytes) {
self.errs = format!("set_input_eos: {e}");
}
}
}
"UI32MASK" => {
}
_ => {}
}
Ok(())
}
fn process(&mut self) -> CaResult<ProcessOutcome> {
if let Some(inflight) = self.io_inflight.take() {
let outcome = inflight.result.lock().unwrap().take().unwrap_or_default();
self.apply_io_outcome(outcome);
return Ok(ProcessOutcome::complete());
}
if self.trace_status_dirty.swap(false, Ordering::AcqRel) {
self.read_trace_state();
}
if self.ucmd != 0 {
self.errs = "No asynGpib interface".to_string();
self.ucmd = 0;
return Ok(ProcessOutcome::complete());
}
if self.acmd != 0 {
self.errs = "No asynGpib interface".to_string();
self.acmd = 0;
return Ok(ProcessOutcome::complete());
}
let tmod = TransferMode::from_u16(self.tmod as u16);
if tmod == TransferMode::NoIo {
return Ok(ProcessOutcome::complete());
}
self.errs.clear();
let blocking_handle = self
.port_entry
.as_ref()
.and_then(|e| e.handle.can_block().then(|| e.handle.clone()));
if let (Some(handle), Some((name, db))) = (blocking_handle, self.async_ctx.clone()) {
return Ok(self.spawn_async_io(handle, name, db));
}
self.perform_io()?;
Ok(ProcessOutcome::complete())
}
fn clears_udf(&self) -> bool {
true
}
}
impl Drop for AsynRecord {
fn drop(&mut self) {
self.clear_trace_exception_callback();
}
}
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
use super::*;
use epics_base_rs::server::record::RecordProcessResult;
#[test]
fn test_default_fields() {
let rec = AsynRecord::default();
assert_eq!(rec.record_type(), "asyn");
assert_eq!(rec.cnct, 0);
assert_eq!(rec.tmot, 1.0);
assert_eq!(rec.omax, 80);
assert_eq!(rec.imax, 80);
assert_eq!(rec.tsiz, 80);
assert_eq!(rec.ui32mask, 0xFFFFFFFF);
assert_eq!(rec.auct, 1);
assert_eq!(rec.enbl, 1);
}
#[test]
fn test_field_list_count() {
let rec = AsynRecord::default();
assert_eq!(rec.field_list().len(), 76);
}
#[test]
fn test_get_put_roundtrip() {
let mut rec = AsynRecord::default();
rec.put_field("PORT", EpicsValue::String("SIM1".into()))
.unwrap();
assert_eq!(
rec.get_field("PORT"),
Some(EpicsValue::String("SIM1".into()))
);
rec.put_field("ADDR", EpicsValue::Long(3)).unwrap();
assert_eq!(rec.get_field("ADDR"), Some(EpicsValue::Long(3)));
rec.put_field("TMOT", EpicsValue::Double(2.5)).unwrap();
assert_eq!(rec.get_field("TMOT"), Some(EpicsValue::Double(2.5)));
rec.put_field("F64OUT", EpicsValue::Double(3.14)).unwrap();
assert_eq!(rec.get_field("F64OUT"), Some(EpicsValue::Double(3.14)));
}
#[test]
fn test_trace_bit_sync() {
let mut rec = AsynRecord::default();
rec.tmsk = (TraceMask::ERROR | TraceMask::FLOW).bits() as i32;
rec.update_trace_bits_from_mask();
assert_eq!(rec.tb0, 1); assert_eq!(rec.tb4, 1); assert_eq!(rec.tb1, 0);
assert_eq!(rec.tb2, 0);
assert_eq!(rec.tb3, 0);
assert_eq!(rec.tb5, 0);
rec.tb0 = 1;
rec.tb1 = 1;
rec.tb2 = 0;
rec.tb3 = 0;
rec.tb4 = 0;
rec.tb5 = 1;
rec.update_mask_from_trace_bits();
let expected = TraceMask::ERROR | TraceMask::IO_DEVICE | TraceMask::WARNING;
assert_eq!(rec.tmsk, expected.bits() as i32);
}
#[test]
fn test_io_bit_sync() {
let mut rec = AsynRecord::default();
rec.tiom = (TraceIoMask::ASCII | TraceIoMask::HEX).bits() as i32;
rec.update_io_bits_from_mask();
assert_eq!(rec.tib0, 1); assert_eq!(rec.tib1, 0); assert_eq!(rec.tib2, 1); }
#[test]
fn test_info_bit_sync() {
let mut rec = AsynRecord::default();
rec.tinm = (TraceInfoMask::TIME | TraceInfoMask::THREAD).bits() as i32;
rec.update_info_bits_from_mask();
assert_eq!(rec.tinb0, 1); assert_eq!(rec.tinb1, 0); assert_eq!(rec.tinb2, 0); assert_eq!(rec.tinb3, 1); }
#[test]
fn test_connect_nonexistent_port() {
let mut rec = AsynRecord::default();
rec.port = "NONEXISTENT".to_string();
rec.connect_device();
assert_eq!(rec.cnct, 0);
assert!(rec.errs.contains("not found"));
}
#[test]
fn test_connect_empty_port() {
let mut rec = AsynRecord::default();
rec.connect_device();
assert_eq!(rec.cnct, 0);
assert!(rec.port_entry.is_none());
}
#[test]
fn test_process_no_io_mode() {
let mut rec = AsynRecord::default();
rec.tmod = TransferMode::NoIo as i32;
let result = rec.process().unwrap();
assert_eq!(result.result, RecordProcessResult::Complete);
}
#[test]
fn test_process_not_connected() {
let mut rec = AsynRecord::default();
rec.tmod = TransferMode::Read as i32;
rec.process().unwrap();
assert_eq!(rec.errs, "not connected");
}
#[test]
fn process_ucmd_with_no_gpib_interface_errors_and_resets() {
let mut rec = AsynRecord::default();
rec.tmod = TransferMode::Read as i32;
rec.ucmd = 1; rec.process().unwrap();
assert_eq!(rec.errs, "No asynGpib interface");
assert_eq!(rec.ucmd, 0, "UCMD must reset to None after dispatch");
}
#[test]
fn process_acmd_with_no_gpib_interface_errors_and_resets() {
let mut rec = AsynRecord::default();
rec.tmod = TransferMode::Read as i32;
rec.acmd = 1; rec.process().unwrap();
assert_eq!(rec.errs, "No asynGpib interface");
assert_eq!(rec.acmd, 0, "ACMD must reset to None after dispatch");
}
#[test]
fn process_ucmd_takes_priority_over_acmd() {
let mut rec = AsynRecord::default();
rec.ucmd = 1;
rec.acmd = 1;
rec.process().unwrap();
assert_eq!(rec.ucmd, 0, "UCMD consumed first");
assert_eq!(rec.acmd, 1, "ACMD left pending while UCMD was set");
}
#[test]
fn test_special_trace_mask() {
let mut rec = AsynRecord::default();
rec.tmsk = (TraceMask::ERROR | TraceMask::WARNING | TraceMask::FLOW).bits() as i32;
rec.special("TMSK", true).unwrap();
assert_eq!(rec.tb0, 1); assert_eq!(rec.tb4, 1); assert_eq!(rec.tb5, 1); }
#[test]
fn test_special_trace_bits() {
let mut rec = AsynRecord::default();
rec.tb0 = 1;
rec.tb3 = 1;
rec.special("TB0", true).unwrap();
assert_eq!(
rec.tmsk as u32,
(TraceMask::ERROR | TraceMask::IO_DRIVER).bits()
);
}
#[test]
fn test_register_and_get_port() {
use crate::interrupt::InterruptManager;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use tokio::sync::mpsc;
struct TestDriver(PortDriverBase);
impl TestDriver {
fn new() -> Self {
Self(PortDriverBase::new(
"test_asyn_rec",
1,
PortFlags::default(),
))
}
}
impl PortDriver for TestDriver {
fn base(&self) -> &PortDriverBase {
&self.0
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.0
}
}
let interrupts = Arc::new(InterruptManager::new(256));
let (tx, rx) = mpsc::channel(256);
let actor = PortActor::new(Box::new(TestDriver::new()), rx);
std::thread::spawn(move || actor.run());
let handle = PortHandle::new(tx, "test_asyn_rec".into(), interrupts);
let trace = Arc::new(TraceManager::new());
register_port("test_asyn_rec", handle, trace);
let entry = registry::get_port("test_asyn_rec");
assert!(entry.is_some());
assert_eq!(entry.unwrap().handle.port_name(), "test_asyn_rec");
}
#[test]
fn trace_controls_route_to_device_on_multi_device_port() {
use crate::interrupt::InterruptManager;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use tokio::sync::mpsc;
struct MdDriver(PortDriverBase);
impl PortDriver for MdDriver {
fn base(&self) -> &PortDriverBase {
&self.0
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.0
}
}
let port_name = "test_trace_addr_md";
let flags = PortFlags {
multi_device: true,
..PortFlags::default()
};
let interrupts = Arc::new(InterruptManager::new(256));
let (tx, rx) = mpsc::channel(256);
let actor = PortActor::new(
Box::new(MdDriver(PortDriverBase::new(port_name, 4, flags))),
rx,
);
std::thread::spawn(move || actor.run());
let mut handle = PortHandle::new(tx, port_name.into(), interrupts);
handle.set_capabilities(true, 4);
let trace = Arc::new(TraceManager::new());
register_port(port_name, handle, trace.clone());
let mut rec = AsynRecord::default();
rec.port = port_name.to_string();
rec.addr = 3;
rec.connect_device();
assert_eq!(rec.cnct, 1);
assert!(rec.trace_addr_target() == Some(3));
rec.tmsk = (TraceMask::ERROR | TraceMask::FLOW).bits() as i32;
rec.apply_trace_mask();
rec.tsiz = 17;
rec.apply_trace_truncate_size();
assert!(trace.is_enabled_device(port_name, 3, TraceMask::FLOW));
assert!(!trace.is_enabled_device(port_name, 4, TraceMask::FLOW));
assert!(!trace.is_enabled(port_name, TraceMask::FLOW));
let mut rec0 = AsynRecord::default();
rec0.port = port_name.to_string();
rec0.addr = -1; rec0.connect_device();
assert!(rec0.trace_addr_target().is_none());
}
#[test]
fn read_trace_state_imports_info_mask_on_connect() {
use crate::interrupt::InterruptManager;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use tokio::sync::mpsc;
struct D(PortDriverBase);
impl PortDriver for D {
fn base(&self) -> &PortDriverBase {
&self.0
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.0
}
}
let port_name = "test_trace_info_sync";
let interrupts = Arc::new(InterruptManager::new(256));
let (tx, rx) = mpsc::channel(256);
let actor = PortActor::new(
Box::new(D(PortDriverBase::new(port_name, 1, PortFlags::default()))),
rx,
);
std::thread::spawn(move || actor.run());
let handle = PortHandle::new(tx, port_name.into(), interrupts);
let trace = Arc::new(TraceManager::new());
trace.set_trace_info_mask(
Some(port_name),
TraceInfoMask::SOURCE | TraceInfoMask::THREAD,
);
register_port(port_name, handle, trace);
let mut rec = AsynRecord::default();
rec.port = port_name.to_string();
rec.connect_device();
assert_eq!(rec.cnct, 1);
assert_eq!(
rec.tinm as u32,
(TraceInfoMask::SOURCE | TraceInfoMask::THREAD).bits()
);
assert_eq!(rec.tinb0, 0); assert_eq!(rec.tinb1, 0); assert_eq!(rec.tinb2, 1); assert_eq!(rec.tinb3, 1); }
#[test]
fn external_trace_info_mask_reflected_after_process() {
use crate::exception::ExceptionManager;
use crate::interrupt::InterruptManager;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use tokio::sync::mpsc;
struct D(PortDriverBase);
impl PortDriver for D {
fn base(&self) -> &PortDriverBase {
&self.0
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.0
}
}
let port_name = "test_trace_info_live";
let interrupts = Arc::new(InterruptManager::new(256));
let (tx, rx) = mpsc::channel(256);
let actor = PortActor::new(
Box::new(D(PortDriverBase::new(port_name, 1, PortFlags::default()))),
rx,
);
std::thread::spawn(move || actor.run());
let handle = PortHandle::new(tx, port_name.into(), interrupts);
let trace = Arc::new(TraceManager::new());
trace.set_exception_sink(Arc::new(ExceptionManager::new()));
trace.set_trace_info_mask(Some(port_name), TraceInfoMask::TIME);
register_port(port_name, handle, trace.clone());
let mut rec = AsynRecord::default();
rec.port = port_name.to_string();
rec.tmod = TransferMode::NoIo as i32; rec.connect_device();
assert_eq!(rec.cnct, 1);
assert_eq!(rec.tinm as u32, TraceInfoMask::TIME.bits());
assert_eq!(rec.tinb0, 1); assert_eq!(rec.tinb1, 0);
trace.set_trace_info_mask(Some(port_name), TraceInfoMask::PORT | TraceInfoMask::THREAD);
assert_eq!(rec.tinm as u32, TraceInfoMask::TIME.bits());
rec.process().unwrap();
assert_eq!(
rec.tinm as u32,
(TraceInfoMask::PORT | TraceInfoMask::THREAD).bits()
);
assert_eq!(rec.tinb0, 0); assert_eq!(rec.tinb1, 1); assert_eq!(rec.tinb3, 1); }
#[test]
fn octet_read_updates_only_ifmt_selected_field() {
use crate::interpose::EomReason;
use crate::interrupt::InterruptManager;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use crate::user::AsynUser;
use tokio::sync::mpsc;
const PAYLOAD: &[u8] = &[0xFF, b'A', b'B'];
struct OctetDriver(PortDriverBase);
impl PortDriver for OctetDriver {
fn base(&self) -> &PortDriverBase {
&self.0
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.0
}
fn io_read_octet_eom(
&mut self,
_user: &AsynUser,
buf: &mut [u8],
) -> crate::error::AsynResult<(usize, EomReason)> {
let n = PAYLOAD.len().min(buf.len());
buf[..n].copy_from_slice(&PAYLOAD[..n]);
Ok((n, EomReason::END))
}
}
let port_name = "test_ifmt_octet_read";
let interrupts = Arc::new(InterruptManager::new(256));
let (tx, rx) = mpsc::channel(256);
let actor = PortActor::new(
Box::new(OctetDriver(PortDriverBase::new(
port_name,
1,
PortFlags::default(),
))),
rx,
);
std::thread::spawn(move || actor.run());
let handle = PortHandle::new(tx, port_name.into(), interrupts);
register_port(port_name, handle, Arc::new(TraceManager::new()));
let mut ascii = AsynRecord::default();
ascii.port = port_name.to_string();
ascii.connect_device();
ascii.iface = 0; ascii.tmod = TransferMode::Read as i32;
ascii.imax = 256;
ascii.ifmt = ASYN_FMT_ASCII;
ascii.binp = b"SENTINEL".to_vec();
ascii.process().unwrap();
assert_eq!(ascii.errs, "");
assert_eq!(ascii.nord, PAYLOAD.len() as i32);
assert_eq!(ascii.ainp, String::from_utf8_lossy(PAYLOAD));
assert_eq!(
ascii.binp,
b"SENTINEL".to_vec(),
"ASCII read must not touch BINP"
);
assert!(!ascii.tinp.is_empty(), "TINP is posted for every read mode");
let mut binary = AsynRecord::default();
binary.port = port_name.to_string();
binary.connect_device();
binary.iface = 0;
binary.tmod = TransferMode::Read as i32;
binary.imax = 256;
binary.ifmt = ASYN_FMT_BINARY;
binary.ainp = "SENTINEL".to_string();
binary.process().unwrap();
assert_eq!(binary.errs, "");
assert_eq!(binary.nord, PAYLOAD.len() as i32);
assert_eq!(binary.binp, PAYLOAD.to_vec());
assert_eq!(binary.ainp, "SENTINEL", "Binary read must not touch AINP");
}
#[test]
fn test_register_asyn_record_type() {
register_asyn_record_type();
let rec = epics_base_rs::server::db_loader::create_record("asyn").unwrap();
assert_eq!(rec.record_type(), "asyn");
assert!(rec.field_list().len() > 3);
}
#[test]
fn test_translate_escape_standard_sequences() {
assert_eq!(translate_escape("\\r\\n"), vec![0x0D, 0x0A]);
assert_eq!(translate_escape("\\t"), vec![0x09]);
assert_eq!(translate_escape("\\\\"), vec![b'\\']);
assert_eq!(translate_escape("\\0"), vec![0x00]);
assert_eq!(translate_escape("abc"), vec![b'a', b'b', b'c']);
assert_eq!(translate_escape("\\x"), vec![b'\\', b'x']);
assert_eq!(translate_escape("a\\"), vec![b'a', b'\\']);
}
#[test]
fn test_translate_escape_octal() {
assert_eq!(translate_escape("\\033"), vec![0x1B]); assert_eq!(translate_escape("\\7"), vec![0x07]); assert_eq!(translate_escape("\\101"), vec![b'A']); assert_eq!(translate_escape("\\0119"), vec![0x09, b'9']);
assert_eq!(translate_escape("\\0"), vec![0x00]);
assert_eq!(translate_escape("\\015\\012"), vec![0x0D, 0x0A]);
}
#[test]
fn test_octet_output_buffer_by_ofmt() {
let mut rec = AsynRecord::default();
rec.ofmt = ASYN_FMT_ASCII;
rec.aout = "hi\\r\\n".to_string();
rec.nowt = 2; assert_eq!(
rec.octet_output_buffer(),
vec![b'h', b'i', 0x0D, 0x0A],
"ASCII must escape-translate AOUT and ignore NOWT"
);
rec.ofmt = ASYN_FMT_HYBRID;
rec.bout = b"x\\t".to_vec();
assert_eq!(
rec.octet_output_buffer(),
vec![b'x', 0x09],
"Hybrid must escape-translate the BOUT buffer"
);
rec.bout = b"ab\0cd".to_vec();
assert_eq!(rec.octet_output_buffer(), vec![b'a', b'b']);
rec.ofmt = ASYN_FMT_BINARY;
rec.bout = vec![b'\\', b'r', 0x00, 0x01, 0x02];
rec.omax = 80;
rec.nowt = 4;
assert_eq!(
rec.octet_output_buffer(),
vec![b'\\', b'r', 0x00, 0x01],
"Binary writes raw BOUT untranslated, NOWT bytes"
);
rec.omax = 3;
rec.nowt = 10;
assert_eq!(rec.octet_output_buffer(), vec![b'\\', b'r', 0x00]);
}
#[test]
fn test_tfil_special_targets() {
assert!(matches!(open_trace_file("").unwrap(), TraceFile::Stdout));
assert!(matches!(
open_trace_file("<stdout>").unwrap(),
TraceFile::Stdout
));
assert!(matches!(
open_trace_file("<stderr>").unwrap(),
TraceFile::Stderr
));
assert!(matches!(
open_trace_file("<errlog>").unwrap(),
TraceFile::Errlog
));
}
#[test]
fn test_tfil_bare_names_are_file_paths() {
let dir = std::env::temp_dir().join(format!("asynrec_tfil_bare_{}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
for name in ["stdout", "stderr"] {
let path = dir.join(name);
let p = path.to_str().unwrap();
assert!(
matches!(open_trace_file(p).unwrap(), TraceFile::File(_)),
"bare {name} must resolve to a file path, not a console sink"
);
}
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn test_tfil_path_appends_not_truncates() {
let path = std::env::temp_dir().join(format!("asynrec_tfil_append_{}", std::process::id()));
let p = path.to_str().unwrap();
let _ = std::fs::remove_file(&path);
open_trace_file(p).unwrap().write_line("first\n");
open_trace_file(p).unwrap().write_line("second\n");
let contents = std::fs::read_to_string(&path).unwrap();
assert_eq!(
contents, "first\nsecond\n",
"re-opening a trace file must append, not truncate"
);
let _ = std::fs::remove_file(&path);
}
fn canblock_int32_entry(value: i32) -> super::registry::PortEntry {
use crate::interrupt::InterruptManager;
use crate::param::ParamType;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use crate::trace::TraceManager;
use tokio::sync::mpsc;
struct ReadDriver {
base: PortDriverBase,
}
impl PortDriver for ReadDriver {
fn base(&self) -> &PortDriverBase {
&self.base
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.base
}
}
let mut base = PortDriverBase::new("ASYNIO", 1, PortFlags::default());
let val = base.create_param("VAL", ParamType::Int32).unwrap();
base.set_int32_param(val, 0, value).unwrap();
let (tx, rx) = mpsc::channel(16);
let actor = PortActor::new(Box::new(ReadDriver { base }), rx);
std::thread::Builder::new()
.name("asynio-test-actor".into())
.spawn(move || actor.run())
.unwrap();
let mut handle = PortHandle::new(tx, "ASYNIO".into(), Arc::new(InterruptManager::new(16)));
handle.set_can_block(true);
super::registry::PortEntry {
handle,
trace: Arc::new(TraceManager::new()),
}
}
#[tokio::test]
async fn nonblocking_canblock_port_defers_then_applies_on_reentry() {
use epics_base_rs::server::database::PvDatabase;
let mut rec = AsynRecord::default();
rec.port_entry = Some(canblock_int32_entry(7));
rec.tmod = TransferMode::Read as i32;
rec.iface = InterfaceType::Int32 as i32;
rec.resolved_reason = 0;
let db = PvDatabase::new();
rec.async_ctx = Some(("ASYNIO_REC".to_string(), db.async_handle()));
let out = rec.process().unwrap();
assert_eq!(
out.result,
RecordProcessResult::AsyncPending,
"a can_block port with async context must defer, not run inline"
);
assert!(
rec.io_inflight.is_some(),
"the deferred request is held in io_inflight until completion"
);
assert_eq!(
rec.i32inp, 0,
"the scan thread returned before the read value landed"
);
let slot = rec.io_inflight.as_ref().unwrap().result.clone();
let mut filled = false;
for _ in 0..2000 {
if slot.lock().unwrap().is_some() {
filled = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
assert!(filled, "the orchestration must fill the result slot");
let out2 = rec.process().unwrap();
assert_eq!(out2.result, RecordProcessResult::Complete);
assert!(
rec.io_inflight.is_none(),
"completion re-entry clears the in-flight slot"
);
assert_eq!(rec.i32inp, 7, "the read value is applied on re-entry");
}
#[tokio::test]
async fn aqr_after_driver_dequeue_runs_to_completion() {
use crate::interrupt::InterruptManager;
use crate::param::ParamType;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use crate::trace::TraceManager;
use epics_base_rs::server::database::PvDatabase;
use std::sync::Barrier;
use std::sync::atomic::AtomicBool;
use tokio::sync::mpsc;
struct BlockingReadDriver {
base: PortDriverBase,
value: i32,
entered: Arc<AtomicBool>,
release: Arc<Barrier>,
}
impl PortDriver for BlockingReadDriver {
fn base(&self) -> &PortDriverBase {
&self.base
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.base
}
fn read_int32(&mut self, _user: &AsynUser) -> AsynResult<i32> {
self.entered.store(true, Ordering::SeqCst);
self.release.wait();
Ok(self.value)
}
}
let entered = Arc::new(AtomicBool::new(false));
let release = Arc::new(Barrier::new(2));
let mut base = PortDriverBase::new("ASYNAQR", 1, PortFlags::default());
base.create_param("VAL", ParamType::Int32).unwrap();
let driver = BlockingReadDriver {
base,
value: 7,
entered: entered.clone(),
release: release.clone(),
};
let (tx, rx) = mpsc::channel(16);
let actor = PortActor::new(Box::new(driver), rx);
std::thread::Builder::new()
.name("asynaqr-test-actor".into())
.spawn(move || actor.run())
.unwrap();
let mut handle = PortHandle::new(tx, "ASYNAQR".into(), Arc::new(InterruptManager::new(16)));
handle.set_can_block(true);
let entry = super::registry::PortEntry {
handle,
trace: Arc::new(TraceManager::new()),
};
let mut rec = AsynRecord::default();
rec.port_entry = Some(entry);
rec.tmod = TransferMode::Read as i32;
rec.iface = InterfaceType::Int32 as i32;
rec.resolved_reason = 0;
let db = PvDatabase::new();
rec.async_ctx = Some(("ASYNAQR_REC".to_string(), db.async_handle()));
rec.special("AQR", true).unwrap();
assert!(rec.errs.is_empty(), "AQR with no in-flight request is idle");
let out = rec.process().unwrap();
assert_eq!(out.result, RecordProcessResult::AsyncPending);
for _ in 0..2000 {
if entered.load(Ordering::SeqCst) {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
assert!(
entered.load(Ordering::SeqCst),
"the off-thread read must reach the driver before AQR"
);
rec.special("AQR", true).unwrap();
release.wait();
let slot = rec.io_inflight.as_ref().unwrap().result.clone();
let mut filled = false;
for _ in 0..2000 {
if slot.lock().unwrap().is_some() {
filled = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
assert!(filled, "the running request still produces an outcome");
let out2 = rec.process().unwrap();
assert_eq!(out2.result, RecordProcessResult::Complete);
assert!(rec.io_inflight.is_none(), "completion re-entry leaves idle");
assert!(
rec.errs.is_empty(),
"a cancel that lost the race does not report CANCELED (wasQueued==0)"
);
assert_eq!(
rec.i32inp, 7,
"the device read value applies normally when the cancel loses the race"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn trace_change_posts_readback_fields_immediately() {
use crate::exception::ExceptionManager;
use crate::interrupt::InterruptManager;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use crate::trace::TraceManager;
use epics_base_rs::server::database::PvDatabase;
use tokio::sync::mpsc;
struct TraceDriver(PortDriverBase);
impl PortDriver for TraceDriver {
fn base(&self) -> &PortDriverBase {
&self.0
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.0
}
}
let port_name = "test_trace_immediate_post";
let (tx, rx) = mpsc::channel(256);
let actor = PortActor::new(
Box::new(TraceDriver(PortDriverBase::new(
port_name,
1,
PortFlags::default(),
))),
rx,
);
std::thread::spawn(move || actor.run());
let handle = PortHandle::new(tx, port_name.into(), Arc::new(InterruptManager::new(256)));
let trace = Arc::new(TraceManager::new());
trace.set_exception_sink(Arc::new(ExceptionManager::new()));
trace.set_trace_mask(Some(port_name), TraceMask::empty());
super::registry::register_port(port_name, handle, trace.clone());
let db = PvDatabase::new();
let rec_name = "TRACE_IMM_REC";
let mut rec = AsynRecord::default();
rec.port = port_name.to_string();
rec.set_async_context(rec_name.to_string(), db.async_handle());
rec.connect_device();
assert_eq!(rec.cnct, 1, "record must connect to the registered port");
assert_eq!(rec.tmsk, 0, "baseline trace mask is empty");
db.add_record(rec_name, Box::new(rec)).await.unwrap();
let new_mask = TraceMask::ERROR | TraceMask::FLOW;
{
let tm = trace.clone();
let pn = port_name.to_string();
std::thread::spawn(move || tm.set_trace_mask(Some(&pn), new_mask))
.join()
.unwrap();
}
let want = new_mask.bits() as i32;
let mut posted = false;
for _ in 0..2000 {
let inst = db.get_record(rec_name).await.unwrap();
let got = inst.read().await.record.get_field("TMSK");
if got == Some(EpicsValue::Long(want)) {
posted = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
assert!(
posted,
"trace change must post TMSK immediately, no process()"
);
let inst = db.get_record(rec_name).await.unwrap();
let g = inst.read().await;
assert_eq!(g.record.get_field("TMSK"), Some(EpicsValue::Long(want)));
assert_eq!(
g.record.get_field("TB0"),
Some(EpicsValue::Short(1)),
"ERROR bit posted"
);
assert_eq!(
g.record.get_field("TB4"),
Some(EpicsValue::Short(1)),
"FLOW bit posted"
);
assert_eq!(
g.record.get_field("TB1"),
Some(EpicsValue::Short(0)),
"IO_DEVICE bit stays clear"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unchanged_trace_field_is_not_reposted() {
use crate::exception::ExceptionManager;
use crate::interrupt::InterruptManager;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::port_actor::PortActor;
use crate::trace::{TraceIoMask, TraceManager};
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::database::db_access::DbSubscription;
use std::time::Duration;
use tokio::sync::mpsc;
struct TraceDriver(PortDriverBase);
impl PortDriver for TraceDriver {
fn base(&self) -> &PortDriverBase {
&self.0
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.0
}
}
let port_name = "test_trace_post_if_new";
let (tx, rx) = mpsc::channel(256);
let actor = PortActor::new(
Box::new(TraceDriver(PortDriverBase::new(
port_name,
1,
PortFlags::default(),
))),
rx,
);
std::thread::spawn(move || actor.run());
let handle = PortHandle::new(tx, port_name.into(), Arc::new(InterruptManager::new(256)));
let trace = Arc::new(TraceManager::new());
trace.set_exception_sink(Arc::new(ExceptionManager::new()));
trace.set_trace_mask(Some(port_name), TraceMask::ERROR);
trace.set_trace_io_mask(Some(port_name), TraceIoMask::empty());
super::registry::register_port(port_name, handle, trace.clone());
let db = PvDatabase::new();
let rec_name = "TRACE_POSTIFNEW_REC";
let mut rec = AsynRecord::default();
rec.port = port_name.to_string();
rec.set_async_context(rec_name.to_string(), db.async_handle());
rec.connect_device();
assert_eq!(rec.cnct, 1, "record must connect to the registered port");
db.add_record(rec_name, Box::new(rec)).await.unwrap();
let mut tmsk_sub = DbSubscription::subscribe(&db, &format!("{rec_name}.TMSK"))
.await
.expect("subscribe TMSK");
let mut tiom_sub = DbSubscription::subscribe(&db, &format!("{rec_name}.TIOM"))
.await
.expect("subscribe TIOM");
let new_tmsk = TraceMask::ERROR | TraceMask::FLOW;
{
let (tm, pn) = (trace.clone(), port_name.to_string());
std::thread::spawn(move || tm.set_trace_mask(Some(&pn), new_tmsk))
.join()
.unwrap();
}
assert_eq!(
tmsk_sub.recv().await,
Some(EpicsValue::Long(new_tmsk.bits() as i32)),
"a changed TMSK is posted to its monitor"
);
{
let (tm, pn) = (trace.clone(), port_name.to_string());
std::thread::spawn(move || tm.set_trace_io_mask(Some(&pn), TraceIoMask::ASCII))
.join()
.unwrap();
}
assert_eq!(
tiom_sub.recv().await,
Some(EpicsValue::Long(TraceIoMask::ASCII.bits() as i32)),
"a changed TIOM is posted to its monitor"
);
let reposted = tokio::time::timeout(Duration::from_millis(500), tmsk_sub.recv()).await;
assert!(
reposted.is_err(),
"unchanged TMSK must not be re-posted on an IO-mask-only change, got {reposted:?}"
);
}
}