use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering as AtomicOrdering};
use std::time::SystemTime;
use crate::error::AsynStatus;
/// A single typed parameter value addressed by a `reason`/`addr` pair.
///
/// `RequestOp::CallParamCallbacks` carries a list of these in its `updates`
/// field. Every variant has the same `reason` and `addr` fields and differs
/// only in the type of its payload.
///
/// NOTE(review): `reason` is presumably the driver's parameter index and
/// `addr` the sub-address the value applies to — confirm against the code
/// that consumes these updates.
#[derive(Debug, Clone)]
pub enum ParamSetValue {
    /// Payload is a 32-bit signed integer.
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },

    /// Payload is a 64-bit float.
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },

    /// Payload is an owned string.
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },

    /// Payload is a vector of 64-bit floats.
    Float64Array {
        reason: usize,
        addr: i32,
        value: Vec<f64>,
    },

    /// Payload is a vector of 32-bit signed integers.
    Int32Array {
        reason: usize,
        addr: i32,
        value: Vec<i32>,
    },

    /// Payload is a 32-bit unsigned word plus two masks.
    ///
    /// NOTE(review): `mask` presumably selects the bits of `value` that are
    /// applied and `interrupt_mask` the bits that raised an interrupt —
    /// confirm with the consumer.
    UInt32Digital {
        reason: usize,
        addr: i32,
        value: u32,
        mask: u32,
        interrupt_mask: u32,
    },
}
/// The operation a request asks for, together with any input it needs.
///
/// Variants without fields carry no input. The section comments below only
/// group variants by name; the behaviour of each operation is defined by
/// whatever code matches on this enum, which is not part of this file.
#[derive(Debug, Clone)]
pub enum RequestOp {
    // ---- Octet (byte) transfers -------------------------------------------
    /// Octet write of `data`.
    OctetWrite { data: Vec<u8> },
    /// Octet read; `buf_size` is presumably the maximum number of bytes to
    /// return — confirm with the executor.
    OctetRead { buf_size: usize },
    /// Octet write of `data` combined with a read limited by `buf_size`.
    OctetWriteRead {
        data: Vec<u8>,
        buf_size: usize,
    },
    /// Binary flavour of the octet write.
    OctetWriteBinary { data: Vec<u8> },
    /// Binary flavour of the octet read.
    OctetReadBinary { buf_size: usize },

    // ---- Scalar values ----------------------------------------------------
    Int32Write { value: i32 },
    Int32Read,
    Int64Write { value: i64 },
    Int64Read,
    Float64Write { value: f64 },
    Float64Read,
    /// Digital write of `value` qualified by `mask`.
    UInt32DigitalWrite {
        value: u32,
        mask: u32,
    },
    /// Digital read qualified by `mask`.
    UInt32DigitalRead { mask: u32 },

    // ---- Port / address management ----------------------------------------
    Flush,
    Connect,
    Disconnect,
    ShutdownPort,
    ConnectAddr,
    DisconnectAddr,
    EnableAddr,
    DisableAddr,
    SetEnable { yes: bool },
    SetAutoConnect { yes: bool },
    GetBoundsInt32,
    GetBoundsInt64,
    GetEnable,
    GetAutoConnect,
    BlockProcess,
    UnblockProcess,
    /// Carries the driver-info string; `RequestResult::drv_user_create`
    /// builds a result holding a `reason` (presumably the answer to this
    /// operation — confirm).
    DrvUserCreate { drv_info: String },

    // ---- Enum values ------------------------------------------------------
    EnumRead,
    EnumWrite { index: usize },

    // ---- Array transfers --------------------------------------------------
    // Each `*ArrayRead` carries an element-count limit and each `*ArrayWrite`
    // carries the elements themselves.
    Int32ArrayRead { max_elements: usize },
    Int32ArrayWrite { data: Vec<i32> },
    Float64ArrayRead { max_elements: usize },
    Float64ArrayWrite { data: Vec<f64> },
    Int8ArrayRead { max_elements: usize },
    Int8ArrayWrite { data: Vec<i8> },
    Int16ArrayRead { max_elements: usize },
    Int16ArrayWrite { data: Vec<i16> },
    Int64ArrayRead { max_elements: usize },
    Int64ArrayWrite { data: Vec<i64> },
    Float32ArrayRead { max_elements: usize },
    Float32ArrayWrite { data: Vec<f32> },

    // ---- Parameter callbacks, options, reporting, terminators -------------
    /// A batch of parameter updates plus an `addr`.
    CallParamCallbacks {
        addr: i32,
        updates: Vec<ParamSetValue>,
    },
    GetOption { key: String },
    SetOption {
        key: String,
        value: String,
    },
    Report { level: i32 },
    /// `eos` is presumably the input end-of-string terminator — confirm.
    SetInputEos { eos: Vec<u8> },
    /// `eos` is presumably the output end-of-string terminator — confirm.
    SetOutputEos { eos: Vec<u8> },
}
/// Outcome of a request: a status plus optional, type-specific payload slots.
///
/// The constructors on this type start from a baseline in which `status` is
/// `AsynStatus::Success`, every `Option` is `None`, the string is empty and
/// every number is zero, and then fill in only the slot they are named after.
#[derive(Debug)]
pub struct RequestResult {
    /// Status of the request.
    pub status: AsynStatus,
    /// Message text; empty in results built by the constructors here.
    pub message: String,
    /// Byte count supplied to the octet-read constructors; 0 otherwise.
    pub nbytes: usize,
    /// Byte buffer supplied to the octet-read constructors.
    pub data: Option<Vec<u8>>,

    // Scalar payloads, one slot per value type.
    /// Filled by `int32_read`.
    pub int_val: Option<i32>,
    /// Filled by `int64_read`.
    pub int64_val: Option<i64>,
    /// Filled by `float64_read`.
    pub float_val: Option<f64>,
    /// Filled by `uint32_read`.
    pub uint_val: Option<u32>,
    /// Filled by `drv_user_create`.
    pub reason: Option<usize>,
    /// Filled by `enum_read`.
    pub enum_index: Option<usize>,

    // Array payloads, each filled by the `*_array_read` constructor of the
    // same element type.
    pub int32_array: Option<Vec<i32>>,
    pub float64_array: Option<Vec<f64>>,
    pub int8_array: Option<Vec<i8>>,
    pub int16_array: Option<Vec<i16>>,
    pub int64_array: Option<Vec<i64>>,
    pub float32_array: Option<Vec<f32>>,

    // Alarm metadata, overwritten together by `with_alarm`.
    pub alarm_status: u16,
    pub alarm_severity: u16,
    pub timestamp: Option<SystemTime>,

    /// Filled by `option_read`.
    pub option_value: Option<String>,
    /// `(low, high)` pair filled by `bounds_read`.
    pub bounds: Option<(i64, i64)>,
    /// Filled by `octet_read_eom`; 0 otherwise.
    /// NOTE(review): presumably end-of-message reason flags — confirm.
    pub eom_reason: u32,
}
impl RequestResult {
    /// Baseline every constructor starts from: success status, empty
    /// message, zeroed counters and no payload in any slot.
    fn base() -> Self {
        Self {
            status: AsynStatus::Success,
            message: String::new(),
            nbytes: 0,
            data: None,
            // scalar slots
            int_val: None,
            int64_val: None,
            float_val: None,
            uint_val: None,
            reason: None,
            enum_index: None,
            // array slots
            int32_array: None,
            float64_array: None,
            int8_array: None,
            int16_array: None,
            int64_array: None,
            float32_array: None,
            // alarm metadata
            alarm_status: 0,
            alarm_severity: 0,
            timestamp: None,
            // remaining slots
            option_value: None,
            bounds: None,
            eom_reason: 0,
        }
    }

    /// Successful result with no payload at all.
    pub fn write_ok() -> Self {
        Self::base()
    }

    /// Successful octet read holding `buf` and the reported `nbytes`.
    ///
    /// `nbytes` is stored as given; it is not checked against `buf.len()`.
    pub fn octet_read(buf: Vec<u8>, nbytes: usize) -> Self {
        // The baseline `eom_reason` is 0, so this is the EOM variant with 0.
        Self::octet_read_eom(buf, nbytes, 0)
    }

    /// Like [`octet_read`](Self::octet_read) but also records `eom_reason`.
    pub fn octet_read_eom(buf: Vec<u8>, nbytes: usize, eom_reason: u32) -> Self {
        let mut res = Self::base();
        res.nbytes = nbytes;
        res.data = Some(buf);
        res.eom_reason = eom_reason;
        res
    }

    /// Successful result carrying `value` in `int_val`.
    pub fn int32_read(value: i32) -> Self {
        let mut res = Self::base();
        res.int_val = Some(value);
        res
    }

    /// Successful result carrying `value` in `int64_val`.
    pub fn int64_read(value: i64) -> Self {
        let mut res = Self::base();
        res.int64_val = Some(value);
        res
    }

    /// Successful result carrying `value` in `float_val`.
    pub fn float64_read(value: f64) -> Self {
        let mut res = Self::base();
        res.float_val = Some(value);
        res
    }

    /// Successful result carrying `value` in `uint_val`.
    pub fn uint32_read(value: u32) -> Self {
        let mut res = Self::base();
        res.uint_val = Some(value);
        res
    }

    /// Successful result carrying `reason` in the `reason` slot.
    pub fn drv_user_create(reason: usize) -> Self {
        let mut res = Self::base();
        res.reason = Some(reason);
        res
    }

    /// Successful result carrying `index` in `enum_index`.
    pub fn enum_read(index: usize) -> Self {
        let mut res = Self::base();
        res.enum_index = Some(index);
        res
    }

    /// Successful result carrying `data` in `int32_array`.
    pub fn int32_array_read(data: Vec<i32>) -> Self {
        let mut res = Self::base();
        res.int32_array = Some(data);
        res
    }

    /// Successful result carrying `data` in `float64_array`.
    pub fn float64_array_read(data: Vec<f64>) -> Self {
        let mut res = Self::base();
        res.float64_array = Some(data);
        res
    }

    /// Successful result carrying `data` in `int8_array`.
    pub fn int8_array_read(data: Vec<i8>) -> Self {
        let mut res = Self::base();
        res.int8_array = Some(data);
        res
    }

    /// Successful result carrying `data` in `int16_array`.
    pub fn int16_array_read(data: Vec<i16>) -> Self {
        let mut res = Self::base();
        res.int16_array = Some(data);
        res
    }

    /// Successful result carrying `data` in `int64_array`.
    pub fn int64_array_read(data: Vec<i64>) -> Self {
        let mut res = Self::base();
        res.int64_array = Some(data);
        res
    }

    /// Successful result carrying `data` in `float32_array`.
    pub fn float32_array_read(data: Vec<f32>) -> Self {
        let mut res = Self::base();
        res.float32_array = Some(data);
        res
    }

    /// Successful result carrying `value` in `option_value`.
    pub fn option_read(value: String) -> Self {
        let mut res = Self::base();
        res.option_value = Some(value);
        res
    }

    /// Successful result carrying `(low, high)` in `bounds`.
    pub fn bounds_read(low: i64, high: i64) -> Self {
        let mut res = Self::base();
        res.bounds = Some((low, high));
        res
    }

    /// Consumes the result and returns it with the three alarm fields
    /// replaced; every other field is carried over unchanged.
    pub fn with_alarm(
        self,
        alarm_status: u16,
        alarm_severity: u16,
        timestamp: Option<SystemTime>,
    ) -> Self {
        Self {
            alarm_status,
            alarm_severity,
            timestamp,
            ..self
        }
    }
}
// Lifecycle states stored in the atomic byte of a `CancelToken`.

/// Initial state: created, not yet claimed and not cancelled.
const STATE_QUEUED: u8 = 0;
/// Claimed through `CancelToken::begin_running`.
const STATE_RUNNING: u8 = 1;
/// A running phase was closed through `CancelToken::finish`.
const STATE_DONE: u8 = 2;
/// Cancelled while still queued; no `CancelToken` method leaves this state.
const STATE_CANCELLED: u8 = 3;

/// Clonable handle to one shared state byte that arbitrates between
/// cancelling a request and executing it.
///
/// Clones share the same `Arc`, so a transition made through one clone is
/// observed by all of them.
#[derive(Clone, Debug)]
pub struct CancelToken(pub Arc<AtomicU8>);

impl CancelToken {
    /// Creates a token in the queued state.
    pub fn new() -> Self {
        CancelToken(Arc::new(AtomicU8::new(STATE_QUEUED)))
    }

    /// Tries to cancel the request.
    ///
    /// Succeeds (returns `true`) only when the token is still queued; a
    /// token that is running, done or already cancelled is left untouched
    /// and `false` is returned.
    pub fn cancel(&self) -> bool {
        let outcome = self.0.compare_exchange(
            STATE_QUEUED,
            STATE_CANCELLED,
            AtomicOrdering::AcqRel,
            AtomicOrdering::Acquire,
        );
        outcome.is_ok()
    }

    /// Claims the token for execution.
    ///
    /// Returns `false` without modifying anything when the token is
    /// cancelled. From every other state (queued, running, done) the token
    /// is moved to running and `true` is returned, which lets a finished
    /// token be claimed again for a further phase.
    pub fn begin_running(&self) -> bool {
        // `fetch_update` retries the weak compare-exchange until the closure's
        // decision has been applied to an up-to-date value.
        self.0
            .fetch_update(
                AtomicOrdering::AcqRel,
                AtomicOrdering::Acquire,
                |state| {
                    if state == STATE_CANCELLED {
                        None
                    } else {
                        Some(STATE_RUNNING)
                    }
                },
            )
            .is_ok()
    }

    /// Marks a running token as done; does nothing in any other state.
    pub fn finish(&self) {
        // A failed exchange just means the token was not running, which is
        // deliberately ignored.
        let _ = self.0.compare_exchange(
            STATE_RUNNING,
            STATE_DONE,
            AtomicOrdering::AcqRel,
            AtomicOrdering::Acquire,
        );
    }

    /// Reports whether the token is currently in the cancelled state.
    pub fn is_cancelled(&self) -> bool {
        matches!(self.0.load(AtomicOrdering::Acquire), STATE_CANCELLED)
    }
}

impl Default for CancelToken {
    /// Same as [`CancelToken::new`]: a token in the queued state.
    fn default() -> Self {
        CancelToken::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Cancelling a fresh token succeeds, after which it cannot be claimed.
    #[test]
    fn cancel_succeeds_only_while_queued() {
        let token = CancelToken::new();
        assert!(!token.is_cancelled());

        let was_queued = token.cancel();
        assert!(was_queued, "a queued request reports wasQueued==true");
        assert!(token.is_cancelled());

        let claimed = token.begin_running();
        assert!(!claimed, "a cancelled request is not claimed for execution");
    }

    /// Once claimed, a cancel attempt fails and leaves the token uncancelled.
    #[test]
    fn cancel_after_begin_running_is_noop() {
        let token = CancelToken::new();

        let claimed = token.begin_running();
        assert!(claimed, "the executor claims a queued request");

        let was_queued = token.cancel();
        assert!(
            !was_queued,
            "a cancel during execution reports wasQueued==false"
        );

        let cancelled_while_running = token.is_cancelled();
        assert!(
            !cancelled_while_running,
            "the running I/O is not treated as cancelled"
        );

        token.finish();
        let cancelled_after_finish = token.is_cancelled();
        assert!(
            !cancelled_after_finish,
            "the completed I/O applies normally"
        );
    }

    /// A cancel that arrives after completion fails as well.
    #[test]
    fn cancel_after_finish_is_noop() {
        let token = CancelToken::new();
        assert!(token.begin_running());
        token.finish();

        let was_queued = token.cancel();
        assert!(
            !was_queued,
            "a cancel after completion reports wasQueued==false"
        );
        assert!(!token.is_cancelled());
    }

    /// A finished token can be claimed again for a second phase.
    #[test]
    fn begin_running_reclaims_token_for_next_phase() {
        let token = CancelToken::new();

        let first_claim = token.begin_running();
        assert!(first_claim, "write phase claims the queued token");
        token.finish();

        let second_claim = token.begin_running();
        assert!(second_claim, "read phase re-claims the finished token");
        token.finish();

        assert!(!token.is_cancelled());
    }

    /// A cancelled token stays cancelled and is never claimed.
    #[test]
    fn cancel_is_terminal_across_phases() {
        let token = CancelToken::new();
        assert!(token.cancel());

        let claimed = token.begin_running();
        assert!(!claimed, "cancelled is terminal");
        assert!(token.is_cancelled());
    }
}