use std::sync::Mutex;
use std::time::Duration;
use crate::error::{AsynError, AsynResult, AsynStatus};
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::user::AsynUser;
pub const EOT_MARKER: u8 = 0xEF;
pub const DEFAULT_TCP_PORT: u16 = 1234;
pub const DEFAULT_BUF_CAPACITY: usize = 4096;
struct State {
last_primary: i32,
last_secondary: i32,
eos: Option<u8>,
version: String,
read_carry: Vec<u8>,
}
pub struct DrvAsynPrologixPort {
base: PortDriverBase,
inner: super::ip_port::DrvAsynIPPort,
state: Mutex<State>,
}
impl DrvAsynPrologixPort {
pub fn new(port_name: &str, host: &str, no_auto_connect: bool) -> AsynResult<Self> {
let ip_spec = if host.contains(':') {
if host.to_ascii_uppercase().ends_with(" TCP") {
host.to_string()
} else {
format!("{host} TCP")
}
} else {
format!("{host}:{DEFAULT_TCP_PORT} TCP")
};
let inner = super::ip_port::DrvAsynIPPort::new(&format!("{port_name}_TCP"), &ip_spec)?;
let mut base = PortDriverBase::new(
port_name,
31,
PortFlags {
multi_device: true,
can_block: true,
destructible: true,
},
);
base.connected = false;
base.auto_connect = !no_auto_connect;
Ok(Self {
base,
inner,
state: Mutex::new(State {
last_primary: -1,
last_secondary: -1,
eos: None,
version: String::new(),
read_carry: Vec::new(),
}),
})
}
pub fn version(&self) -> String {
self.state.lock().unwrap().version.clone()
}
pub fn eos(&self) -> Option<u8> {
self.state.lock().unwrap().eos
}
pub fn set_eos(&self, eos: Option<u8>) -> AsynResult<()> {
let mut s = self.state.lock().unwrap();
if s.eos == eos {
return Ok(());
}
s.eos = eos;
Ok(())
}
fn decode_addr(addr: i32) -> AsynResult<(i32, i32)> {
let (primary, secondary) = if addr < 100 {
(addr, -1)
} else {
let p = addr / 100;
let s = addr % 100;
if !(0..31).contains(&s) {
return Err(AsynError::Status {
status: AsynStatus::Error,
message: format!("Invalid GPIB secondary address {s}"),
});
}
(p, s)
};
if !(0..31).contains(&primary) {
return Err(AsynError::Status {
status: AsynStatus::Error,
message: format!("Invalid GPIB primary address {primary}"),
});
}
Ok((primary, secondary))
}
fn addr_line(primary: i32, secondary: i32) -> String {
if secondary < 0 {
format!("++addr {primary}\n")
} else {
format!("++addr {primary} {}\n", secondary + 96)
}
}
pub fn stash_char(buf: &mut Vec<u8>, c: u8) {
if matches!(c, b'\r' | b'\n' | 0x1B | b'+') {
buf.push(0x1B);
}
buf.push(c);
}
fn set_address(&mut self, user: &AsynUser) -> AsynResult<()> {
let (primary, secondary) = Self::decode_addr(user.addr)?;
{
let s = self.state.lock().unwrap();
if s.last_primary == primary && s.last_secondary == secondary {
return Ok(());
}
}
let cmd = Self::addr_line(primary, secondary);
let mut bridge_user = AsynUser::default().with_timeout(Duration::from_secs(1));
match self.inner.write_octet(&mut bridge_user, cmd.as_bytes()) {
Ok(()) => {
let mut s = self.state.lock().unwrap();
s.last_primary = primary;
s.last_secondary = secondary;
Ok(())
}
Err(e) => {
let mut s = self.state.lock().unwrap();
s.last_primary = -1;
s.last_secondary = -1;
Err(e)
}
}
}
}
impl PortDriver for DrvAsynPrologixPort {
fn base(&self) -> &PortDriverBase {
&self.base
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.base
}
fn connect(&mut self, user: &AsynUser) -> AsynResult<()> {
{
let mut s = self.state.lock().unwrap();
s.last_primary = -1;
s.last_secondary = -1;
}
if user.addr < 0 {
self.inner.connect(user)?;
let init = format!(
"++savecfg 0\n++mode 1\n++ifc\n++eos 3\n++eoi 1\n\
++eot_char {EOT_MARKER}\n++eot_enable 1\n++ver\n",
);
let mut tu = AsynUser::default().with_timeout(Duration::from_secs(1));
self.inner.write_octet(&mut tu, init.as_bytes())?;
let mut acc = Vec::with_capacity(64);
let mut buf = [0u8; 64];
loop {
let ru = AsynUser::default().with_timeout(Duration::from_millis(500));
let n = self.inner.read_octet(&ru, &mut buf)?;
if n == 0 {
return Err(AsynError::Status {
status: AsynStatus::Error,
message: "Prologix: bridge closed during version handshake".into(),
});
}
acc.extend_from_slice(&buf[..n]);
if acc.len() > 200 {
return Err(AsynError::Status {
status: AsynStatus::Error,
message: "Prologix: version string too long".into(),
});
}
if acc.len() >= 2 && acc[acc.len() - 2] == b'\r' && acc[acc.len() - 1] == b'\n' {
let v = String::from_utf8_lossy(&acc[..acc.len() - 2]).to_string();
self.state.lock().unwrap().version = v;
break;
}
}
}
self.base.set_connected(true);
Ok(())
}
fn disconnect(&mut self, user: &AsynUser) -> AsynResult<()> {
if user.addr < 0 {
self.inner.disconnect(user)?;
}
self.state.lock().unwrap().read_carry.clear();
self.base.set_connected(false);
Ok(())
}
fn io_flush(&mut self, user: &mut AsynUser) -> AsynResult<()> {
self.state.lock().unwrap().read_carry.clear();
self.inner.io_flush(user)
}
fn write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
self.base.check_ready()?;
self.set_address(user)?;
let eos = self.state.lock().unwrap().eos;
let mut out: Vec<u8> = Vec::with_capacity(data.len() + 4);
for &c in data {
Self::stash_char(&mut out, c);
}
if let Some(c) = eos {
Self::stash_char(&mut out, c);
}
out.push(b'\n');
self.inner.write_octet(user, &out)
}
fn read_octet(&mut self, user: &AsynUser, buf: &mut [u8]) -> AsynResult<usize> {
self.base.check_ready()?;
{
let mut st = self.state.lock().unwrap();
if !st.read_carry.is_empty() {
let n = st.read_carry.len().min(buf.len());
buf[..n].copy_from_slice(&st.read_carry[..n]);
st.read_carry.drain(..n);
return Ok(n);
}
}
self.set_address(user)?;
let eos = self.state.lock().unwrap().eos;
let cmd = match eos {
Some(c) => format!("++read {c}\n"),
None => "++read eoi\n".to_string(),
};
let mut bridge_user = AsynUser::default().with_timeout(Duration::from_secs(1));
self.inner.write_octet(&mut bridge_user, cmd.as_bytes())?;
let terminator = eos.unwrap_or(EOT_MARKER);
let mut acc: Vec<u8> = Vec::with_capacity(4096);
let mut chunk = vec![0u8; 4096];
let user_timeout = if user.timeout.is_zero() {
Duration::from_secs(1)
} else {
user.timeout
};
let mut read_timeout = user_timeout;
let mut at_eot = false;
loop {
let ru = AsynUser::default().with_timeout(read_timeout);
match self.inner.read_octet(&ru, &mut chunk) {
Ok(0) => break,
Ok(n) => {
acc.extend_from_slice(&chunk[..n]);
if let Some(&last) = acc.last() {
if last == terminator {
if eos.is_some() {
break;
}
read_timeout = Duration::from_millis(5);
at_eot = true;
continue;
}
}
read_timeout = user_timeout;
at_eot = false;
}
Err(AsynError::Status {
status: AsynStatus::Timeout,
..
}) if at_eot => break,
Err(e) => return Err(e),
}
}
if eos.is_none() && acc.last() == Some(&EOT_MARKER) {
acc.pop();
}
let n = acc.len().min(buf.len());
buf[..n].copy_from_slice(&acc[..n]);
if n < acc.len() {
self.state.lock().unwrap().read_carry = acc.split_off(n);
}
Ok(n)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::{Read, Write};
use std::net::TcpListener;
use std::sync::mpsc;
use std::thread;
#[test]
fn decode_addr_primary_only() {
assert_eq!(DrvAsynPrologixPort::decode_addr(0).unwrap(), (0, -1));
assert_eq!(DrvAsynPrologixPort::decode_addr(15).unwrap(), (15, -1));
assert_eq!(DrvAsynPrologixPort::decode_addr(30).unwrap(), (30, -1));
}
#[test]
fn decode_addr_secondary() {
assert_eq!(DrvAsynPrologixPort::decode_addr(512).unwrap(), (5, 12));
assert_eq!(DrvAsynPrologixPort::decode_addr(2030).unwrap(), (20, 30));
}
#[test]
fn decode_addr_rejects_oob_primary() {
assert!(DrvAsynPrologixPort::decode_addr(31).is_err());
assert!(DrvAsynPrologixPort::decode_addr(-1).is_err());
}
#[test]
fn decode_addr_rejects_oob_secondary() {
assert!(DrvAsynPrologixPort::decode_addr(531).is_err());
}
#[test]
fn addr_line_primary_only_format() {
assert_eq!(DrvAsynPrologixPort::addr_line(7, -1), "++addr 7\n");
}
#[test]
fn addr_line_secondary_adds_96() {
assert_eq!(DrvAsynPrologixPort::addr_line(5, 12), "++addr 5 108\n");
}
#[test]
fn stash_char_escapes_special_bytes() {
let mut buf = Vec::new();
for c in [b'\r', b'\n', 0x1B, b'+'] {
buf.clear();
DrvAsynPrologixPort::stash_char(&mut buf, c);
assert_eq!(buf, vec![0x1B, c], "byte 0x{c:02X} not escaped");
}
}
#[test]
fn stash_char_passes_normal_bytes() {
let mut buf = Vec::new();
for c in [b'A', b'0', b' ', 0x00, 0xFF] {
buf.clear();
DrvAsynPrologixPort::stash_char(&mut buf, c);
assert_eq!(buf, vec![c], "byte 0x{c:02X} unexpectedly escaped");
}
}
fn start_mock_bridge() -> (u16, mpsc::Receiver<Vec<u8>>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let (mut stream, _) = listener.accept().unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.unwrap();
let mut acc = Vec::new();
let mut buf = [0u8; 4096];
let mut version_sent = false;
loop {
match stream.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
acc.extend_from_slice(&buf[..n]);
if !version_sent && acc.windows(6).any(|w| w == b"++ver\n") {
stream.write_all(b"Prologix Test 1.0\r\n").unwrap();
version_sent = true;
}
}
Err(_) => break,
}
}
let _ = tx.send(acc);
});
(port, rx)
}
#[test]
fn connect_sends_init_burst_and_captures_version() {
let (port, rx) = start_mock_bridge();
let mut drv = DrvAsynPrologixPort::new("p", &format!("127.0.0.1:{port}"), false).unwrap();
let user = AsynUser::default().with_addr(-1);
drv.connect(&user).unwrap();
assert!(drv.base.connected);
assert_eq!(drv.version(), "Prologix Test 1.0");
drv.disconnect(&user).unwrap();
let captured = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let s = String::from_utf8_lossy(&captured);
let expected_init = format!(
"++savecfg 0\n++mode 1\n++ifc\n++eos 3\n++eoi 1\n\
++eot_char {EOT_MARKER}\n++eot_enable 1\n++ver\n",
);
assert!(
s.starts_with(&expected_init),
"init burst mismatch — got: {s:?}"
);
}
#[test]
fn write_emits_addr_only_when_changed() {
let (port, rx) = start_mock_bridge();
let mut drv = DrvAsynPrologixPort::new("p", &format!("127.0.0.1:{port}"), false).unwrap();
let user_connect = AsynUser::default().with_addr(-1);
drv.connect(&user_connect).unwrap();
let mut user_w = AsynUser::default()
.with_addr(7)
.with_timeout(Duration::from_secs(2));
drv.write_octet(&mut user_w, b"*IDN?").unwrap();
drv.write_octet(&mut user_w, b"*IDN?").unwrap();
let mut user_w2 = AsynUser::default()
.with_addr(12)
.with_timeout(Duration::from_secs(2));
drv.write_octet(&mut user_w2, b"VAL?").unwrap();
drv.disconnect(&AsynUser::default().with_addr(-1)).unwrap();
let captured = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let s = String::from_utf8_lossy(&captured).to_string();
let init_end = s.find("++ver\n").unwrap() + "++ver\n".len();
let post = &s[init_end..];
assert_eq!(
post, "++addr 7\n*IDN?\n*IDN?\n++addr 12\nVAL?\n",
"post-init wire bytes wrong: {post:?}"
);
}
#[test]
fn read_strips_eot_marker_in_eoi_mode() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
thread::spawn(move || {
let (mut stream, _) = listener.accept().unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.unwrap();
let mut acc = Vec::new();
let mut buf = [0u8; 4096];
let mut version_sent = false;
let mut read_replied = false;
loop {
match stream.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
acc.extend_from_slice(&buf[..n]);
if !version_sent && acc.windows(6).any(|w| w == b"++ver\n") {
stream.write_all(b"Prologix Test 1.0\r\n").unwrap();
version_sent = true;
}
if !read_replied && acc.windows(11).any(|w| w == b"++read eoi\n") {
stream.write_all(b"42.5\n\xEF").unwrap();
read_replied = true;
}
}
Err(_) => break,
}
}
});
let mut drv = DrvAsynPrologixPort::new("p", &format!("127.0.0.1:{port}"), false).unwrap();
drv.connect(&AsynUser::default().with_addr(-1)).unwrap();
let user = AsynUser::default()
.with_addr(0)
.with_timeout(Duration::from_secs(2));
let mut buf = [0u8; 64];
let n = drv.read_octet(&user, &mut buf).unwrap();
assert_eq!(
&buf[..n],
b"42.5\n",
"EOT marker should be stripped, leaving `42.5\\n`"
);
drv.disconnect(&AsynUser::default().with_addr(-1)).unwrap();
}
#[test]
fn read_with_eos_keeps_terminator_byte() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
thread::spawn(move || {
let (mut stream, _) = listener.accept().unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.unwrap();
let mut acc = Vec::new();
let mut buf = [0u8; 4096];
let mut version_sent = false;
let mut read_replied = false;
loop {
match stream.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
acc.extend_from_slice(&buf[..n]);
if !version_sent && acc.windows(6).any(|w| w == b"++ver\n") {
stream.write_all(b"Prologix Test 1.0\r\n").unwrap();
version_sent = true;
}
if !read_replied && acc.windows(10).any(|w| w == b"++read 10\n") {
stream.write_all(b"OK\n").unwrap();
read_replied = true;
}
}
Err(_) => break,
}
}
});
let mut drv = DrvAsynPrologixPort::new("p", &format!("127.0.0.1:{port}"), false).unwrap();
drv.connect(&AsynUser::default().with_addr(-1)).unwrap();
drv.set_eos(Some(b'\n')).unwrap();
let user = AsynUser::default()
.with_addr(0)
.with_timeout(Duration::from_secs(2));
let mut buf = [0u8; 64];
let n = drv.read_octet(&user, &mut buf).unwrap();
assert_eq!(
&buf[..n],
b"OK\n",
"eos char must be preserved as part of payload"
);
drv.disconnect(&AsynUser::default().with_addr(-1)).unwrap();
}
#[test]
fn write_escapes_special_chars_on_wire() {
let (port, rx) = start_mock_bridge();
let mut drv = DrvAsynPrologixPort::new("p", &format!("127.0.0.1:{port}"), false).unwrap();
drv.connect(&AsynUser::default().with_addr(-1)).unwrap();
let mut user_w = AsynUser::default()
.with_addr(0)
.with_timeout(Duration::from_secs(2));
drv.write_octet(&mut user_w, b"A+B").unwrap();
drv.disconnect(&AsynUser::default().with_addr(-1)).unwrap();
let captured = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let post_init_idx = captured.windows(6).position(|w| w == b"++ver\n").unwrap() + 6;
let post = &captured[post_init_idx..];
assert!(
post.windows(5).any(|w| w == b"A\x1B+B\n"),
"expected escaped payload `A\\033+B\\n` — got: {:?}",
String::from_utf8_lossy(post)
);
}
}