use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use epics_base_rs::error::{CaError, CaResult};
use epics_base_rs::runtime::sync::{mpsc, oneshot};
use epics_base_rs::types::{DbFieldType, EpicsValue};
use crate::channel::AccessRights;
use crate::client::state::ChannelState;
/// Key identifying one circuit: a socket address paired with a `u8`.
/// NOTE(review): the `u8` is presumably the circuit priority (other types in
/// this file pair `server_addr` with `priority: u8`) — confirm.
pub(crate) type CircuitKey = (SocketAddr, u8);
/// Shared concurrent map from a [`CircuitKey`] to an [`Instant`].
/// NOTE(review): by its name this is the time data was last received on that
/// circuit — confirm against the code that writes the map.
pub(crate) type ServerLastRxAt = Arc<DashMap<CircuitKey, Instant>>;
/// Pending-frame count at (or above) which `DirectServerWriter::send_frame`
/// refuses to queue another frame.
pub(crate) const SEND_BACKPRESSURE_FRAMES: usize = 4096;
/// Cloneable handle for queueing outgoing frames on a writer channel.
///
/// Clones share both the channel sender and the pending-frame counter.
#[derive(Clone)]
pub(crate) struct DirectServerWriter {
/// Unbounded channel sender carrying each frame as a `Vec<u8>`.
pub(crate) write_tx: mpsc::UnboundedSender<Vec<u8>>,
/// Count of frames accepted by `send_frame`; compared against
/// `SEND_BACKPRESSURE_FRAMES` before a new frame is queued.
/// NOTE(review): the decrement side is not visible here — presumably the
/// task draining `write_tx` lowers it; confirm.
pub(crate) pending_frames: Arc<AtomicUsize>,
}
impl DirectServerWriter {
    /// Queues `frame` on the writer channel, enforcing the
    /// [`SEND_BACKPRESSURE_FRAMES`] cap on outstanding frames.
    ///
    /// The slot is reserved with a single atomic read-modify-write, so
    /// concurrent senders cannot all observe "just below the cap" and push the
    /// counter past the limit together.
    ///
    /// # Errors
    ///
    /// Returns [`CaError::Disconnected`] when the cap has already been reached
    /// (the counter is left untouched) or when the receiving half of
    /// `write_tx` is gone (the reservation is rolled back before returning).
    pub(crate) fn send_frame(&self, frame: Vec<u8>) -> CaResult<()> {
        // Check-and-increment in one step; `fetch_update` retries internally
        // when another thread changes the counter in between.
        let reservation = self.pending_frames.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |pending| {
                if pending < SEND_BACKPRESSURE_FRAMES {
                    Some(pending + 1)
                } else {
                    None
                }
            },
        );
        if reservation.is_err() {
            return Err(CaError::Disconnected);
        }

        if self.write_tx.send(frame).is_err() {
            // Give the slot back. The subtraction saturates because the counter
            // is shared and may have been lowered by another holder meanwhile.
            // The closure always returns `Some`, so this update cannot fail.
            let _ = self.pending_frames.fetch_update(
                Ordering::Relaxed,
                Ordering::Relaxed,
                |pending| Some(pending.saturating_sub(1)),
            );
            return Err(CaError::Disconnected);
        }
        Ok(())
    }
}
/// Shared concurrent map from a [`CircuitKey`] to that circuit's
/// [`DirectServerWriter`].
pub(crate) type DirectServerWriters = Arc<DashMap<CircuitKey, DirectServerWriter>>;
/// Cloneable record of a channel's server-side identity, native shape,
/// access rights and connection state.
#[derive(Clone)]
pub(crate) struct ChannelSnapshotPublic {
/// NOTE(review): presumably the server-assigned channel id — confirm.
pub sid: u32,
/// Field type reported as native for the channel.
pub native_type: DbFieldType,
/// Number of elements in the channel's value.
pub element_count: u32,
/// Address of the server hosting the channel.
pub server_addr: SocketAddr,
/// Access rights recorded for the channel.
pub access_rights: AccessRights,
/// Channel state recorded in this snapshot.
pub state: ChannelState,
}
/// Shared concurrent map from a `u32` key to a [`ChannelSnapshotPublic`].
/// NOTE(review): the key is presumably the client channel id (cid) — confirm.
pub(crate) type ChannelSnapshots = Arc<DashMap<u32, ChannelSnapshotPublic>>;
/// Shared concurrent map from a `u32` key to an atomic `u32` counter.
/// NOTE(review): by its name, a per-channel count of search attempts keyed by
/// cid — confirm against the search engine code.
pub(crate) type SearchAttempts = Arc<DashMap<u32, std::sync::atomic::AtomicU32>>;
/// Category of a [`CaException`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CaExceptionKind {
/// NOTE(review): presumably an error message reported by the server — confirm.
ServerError,
/// NOTE(review): presumably a server-initiated channel disconnect — confirm.
ServerDisconnect,
}
/// Description of an exceptional condition handed to a [`CaExceptionHandler`].
///
/// Marked `#[non_exhaustive]`, so it cannot be constructed with a struct
/// literal outside this crate.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CaException {
/// Category of the exception.
pub kind: CaExceptionKind,
/// Human-readable description.
pub message: String,
/// Server the exception relates to, when known.
pub server_addr: Option<SocketAddr>,
/// PV name the exception relates to, when known.
pub pv_name: Option<String>,
/// Numeric status attached to the exception, when there is one.
/// NOTE(review): presumably an ECA status code — confirm.
pub status: Option<u32>,
}
/// Shareable, thread-safe callback that receives a borrowed [`CaException`].
pub type CaExceptionHandler = Arc<dyn Fn(&CaException) + Send + Sync>;
/// Shared, read-write-locked slot holding the optional exception handler;
/// `None` means no handler is installed and dispatch does nothing.
pub(crate) type CaExceptionSlot = Arc<parking_lot::RwLock<Option<CaExceptionHandler>>>;
/// Invokes the handler currently stored in `slot`, if any, with `exc`.
///
/// The handler is cloned out of the slot inside a short scope so the read
/// lock is released before the callback runs. A panic raised by the callback
/// is caught and discarded instead of unwinding into the caller.
pub(crate) fn dispatch_exception(slot: &CaExceptionSlot, exc: CaException) {
    let registered = {
        let guard = slot.read();
        guard.clone()
    };
    if let Some(callback) = registered {
        let shielded = std::panic::AssertUnwindSafe(|| callback(&exc));
        drop(std::panic::catch_unwind(shielded));
    }
}
/// User name and host name describing this client.
#[derive(Clone, Debug)]
pub(crate) struct ClientIdentity {
/// User name; `from_env` falls back to `"unknown"` when none is found.
pub(crate) user: String,
/// Host name as reported by the runtime's `hostname()` helper.
pub(crate) host: String,
}
impl ClientIdentity {
    /// Builds an identity from the process environment.
    ///
    /// The user name is taken from `USER`, then from `USERNAME`, and finally
    /// defaults to the literal `"unknown"`. The host name comes from the
    /// runtime's `hostname()` helper.
    pub(crate) fn from_env() -> Self {
        let user = match epics_base_rs::runtime::env::get("USER") {
            Some(name) => name,
            None => match epics_base_rs::runtime::env::get("USERNAME") {
                Some(name) => name,
                None => "unknown".to_string(),
            },
        };
        Self {
            user,
            host: epics_base_rs::runtime::env::hostname(),
        }
    }
}
/// Shared, read-write-locked holder of the current [`ClientIdentity`].
pub(crate) type ClientIdentitySlot = Arc<parking_lot::RwLock<ClientIdentity>>;
/// Payload delivered to a read waiter, in one of two representations.
pub(crate) enum ReadReply {
/// A typed value together with its field type.
Plain {
dbr_type: DbFieldType,
value: EpicsValue,
},
/// Payload bytes plus the numeric type code and element count that
/// accompany them.
/// NOTE(review): presumably the undecoded wire payload — confirm.
Raw {
data_type: u16,
count: u32,
data: Vec<u8>,
},
}
/// Which [`ReadReply`] representation a waiter is recorded as wanting.
/// NOTE(review): the code that acts on this mode is outside this section.
#[derive(Clone, Copy)]
pub(crate) enum ReadReplyMode {
/// Corresponds to `ReadReply::Plain`.
Plain,
/// Corresponds to `ReadReply::Raw`.
Raw,
}
/// One-shot sender used to deliver the outcome of a read.
pub(crate) type ReadReplyTx = oneshot::Sender<CaResult<ReadReply>>;
/// One-shot sender used to deliver the outcome of a write.
pub(crate) type WriteReplyTx = oneshot::Sender<CaResult<()>>;
/// Shared, mutex-guarded slot that may hold a [`ReadReplyTx`]; the sender is
/// taken out of the slot when a reply is delivered through it.
pub(crate) type WarmReplySlot = Arc<parking_lot::Mutex<Option<ReadReplyTx>>>;
/// A party waiting for a read reply, tagged with its `cid` and reply mode.
pub(crate) enum ReadWaiter {
/// Owns its reply sender directly.
OneShot {
cid: u32,
mode: ReadReplyMode,
reply_tx: ReadReplyTx,
},
/// Finds its reply sender in a shared slot at delivery time; the slot
/// may be empty, in which case the reply is dropped.
Warm {
cid: u32,
mode: ReadReplyMode,
slot: WarmReplySlot,
},
}
impl ReadWaiter {
    /// Returns the `cid` stored in this waiter, whichever variant it is.
    pub(crate) fn cid(&self) -> u32 {
        let (Self::OneShot { cid, .. } | Self::Warm { cid, .. }) = self;
        *cid
    }

    /// Returns the reply mode stored in this waiter, whichever variant it is.
    pub(crate) fn mode(&self) -> ReadReplyMode {
        let (Self::OneShot { mode, .. } | Self::Warm { mode, .. }) = self;
        *mode
    }

    /// Consumes the waiter and delivers `result` to it.
    ///
    /// A `OneShot` waiter sends on its own sender. A `Warm` waiter takes the
    /// sender out of its shared slot and sends on that; when the slot is
    /// empty the result is dropped. A failed send is ignored in both cases.
    pub(crate) fn send(self, result: CaResult<ReadReply>) {
        match self {
            Self::Warm { slot, .. } => {
                // The slot stays locked while the reply is handed over.
                let mut parked = slot.lock();
                if let Some(tx) = parked.take() {
                    let _ = tx.send(result);
                }
            }
            Self::OneShot { reply_tx, .. } => {
                let _ = reply_tx.send(result);
            }
        }
    }
}
/// Cloneable registry of outstanding read and write operations plus the
/// counter used to allocate their ids. Clones share all three pieces.
#[derive(Clone)]
pub(crate) struct InFlightOps {
/// Outstanding reads, keyed by the id handed out by `alloc_ioid`.
pub(crate) reads: Arc<DashMap<u32, ReadWaiter>>,
/// Outstanding writes, keyed by the id handed out by `alloc_ioid`.
/// NOTE(review): the first tuple element is presumably the channel's cid —
/// confirm against the code that inserts into this map.
pub(crate) writes: Arc<DashMap<u32, (u32, WriteReplyTx)>>,
/// Counter from which the next id is probed; starts at 1 in `new`.
next_ioid: Arc<AtomicU32>,
}
impl InFlightOps {
    /// Creates an empty registry whose id counter starts at 1.
    pub(crate) fn new() -> Self {
        let reads = Arc::new(DashMap::new());
        let writes = Arc::new(DashMap::new());
        let next_ioid = Arc::new(AtomicU32::new(1));
        Self {
            reads,
            writes,
            next_ioid,
        }
    }

    /// Allocates an id that is not currently a key of `reads` or `writes`.
    ///
    /// The probing itself is delegated to `crate::channel::alloc_nonzero_probe`,
    /// which is given the counter and a "still in flight" predicate. The id
    /// is not inserted into either map here; that is left to the caller.
    pub(crate) fn alloc_ioid(&self) -> u32 {
        let still_in_flight = |candidate: u32| -> bool {
            self.reads.contains_key(&candidate) || self.writes.contains_key(&candidate)
        };
        crate::channel::alloc_nonzero_probe(&self.next_ioid, still_in_flight)
    }

    /// Test-only hook: overwrites the id counter with `v`.
    #[cfg(test)]
    pub(crate) fn seed_next_ioid(&self, v: u32) {
        let counter: &AtomicU32 = &self.next_ioid;
        counter.store(v, Ordering::Relaxed);
    }
}
/// Cloneable allocator of `u32` ids that tracks which ids are still live.
/// Clones share the counter and the live set.
#[derive(Clone)]
pub(crate) struct CidAllocator {
/// Counter from which the next id is probed; starts at 1 in `new`.
next: Arc<AtomicU32>,
/// Set of ids handed out by `allocate` and not yet passed to `release`.
live: Arc<DashMap<u32, ()>>,
}
impl CidAllocator {
    /// Creates an allocator with no live ids and a counter starting at 1.
    pub(crate) fn new() -> Self {
        let next = Arc::new(AtomicU32::new(1));
        let live = Arc::new(DashMap::new());
        Self { next, live }
    }

    /// Picks an id that is not in the live set, records it as live and
    /// returns it.
    ///
    /// Candidate selection is delegated to
    /// `crate::channel::alloc_nonzero_probe` with an "is live" predicate.
    pub(crate) fn allocate(&self) -> u32 {
        let is_live = |candidate: u32| -> bool { self.live.contains_key(&candidate) };
        let cid = crate::channel::alloc_nonzero_probe(&self.next, is_live);
        self.live.insert(cid, ());
        cid
    }

    /// Removes `cid` from the live set; releasing an id that is not live is
    /// a no-op.
    pub(crate) fn release(&self, cid: u32) {
        let _ = self.live.remove(&cid);
    }

    /// Test-only hook: number of ids currently recorded as live.
    #[cfg(test)]
    pub(crate) fn live_len(&self) -> usize {
        let live: &DashMap<u32, ()> = &self.live;
        live.len()
    }

    /// Test-only hook: overwrites the id counter with `v`.
    #[cfg(test)]
    pub(crate) fn seed_next(&self, v: u32) {
        let counter: &AtomicU32 = &self.next;
        counter.store(v, Ordering::Relaxed);
    }
}
// Unit tests for id allocation in `InFlightOps` and `CidAllocator`.
#[cfg(test)]
mod id_alloc_tests {
use super::*;
// Two consecutive ioids differ and neither is zero.
// NOTE(review): the name says "monotonic", but only distinctness and
// non-zero are asserted; ordering is not checked.
#[test]
fn ioid_alloc_is_monotonic_and_distinct() {
let f = InFlightOps::new();
let a = f.alloc_ioid();
let b = f.alloc_ioid();
assert_ne!(a, b);
assert_ne!(a, 0);
assert_ne!(b, 0);
}
// With id 1 registered as an in-flight read and the counter seeded back to
// 1 (simulating a wrap), the allocator must return some other id.
#[test]
fn ioid_alloc_skips_live_id_on_wrap() {
let f = InFlightOps::new();
f.reads.insert(
1,
ReadWaiter::Warm {
cid: 1,
mode: ReadReplyMode::Plain,
slot: Arc::new(parking_lot::Mutex::new(None)),
},
);
f.seed_next_ioid(1);
let id = f.alloc_ioid();
assert_ne!(
id, 1,
"must not reissue an ioid whose read is still in flight"
);
// The returned id must be unused in both maps.
assert!(!f.reads.contains_key(&id) && !f.writes.contains_key(&id));
}
// Same as above, but the colliding id lives in the `writes` map.
#[test]
fn ioid_alloc_skips_live_write_on_wrap() {
let f = InFlightOps::new();
// `_rx` is kept alive so the sender stored in the map stays open.
let (tx, _rx) = oneshot::channel();
f.writes.insert(2, (7, tx));
f.seed_next_ioid(2);
let id = f.alloc_ioid();
assert_ne!(
id, 2,
"must not reissue an ioid whose write is still in flight"
);
}
// Allocation yields distinct non-zero cids and tracks them as live;
// releasing removes one entry and a second release of the same cid is a no-op.
#[test]
fn cid_allocator_reserves_distinct_and_releases() {
let a = CidAllocator::new();
let c1 = a.allocate();
let c2 = a.allocate();
assert_ne!(c1, c2);
assert_ne!(c1, 0);
assert_eq!(a.live_len(), 2);
a.release(c1);
assert_eq!(a.live_len(), 1);
// Double release must not disturb the remaining live entry.
a.release(c1);
assert_eq!(a.live_len(), 1);
}
// Seeding the counter back onto a live cid must not hand that cid out again.
#[test]
fn cid_allocator_skips_live_cid_on_wrap() {
let a = CidAllocator::new();
let live = a.allocate();
a.seed_next(live);
let again = a.allocate();
assert_ne!(
again, live,
"must not reissue a cid whose channel is still live"
);
assert_eq!(a.live_len(), 2);
}
// Once released, a cid may be handed out again when the counter reaches it.
#[test]
fn cid_allocator_reuses_released_cid_only_after_release() {
let a = CidAllocator::new();
let c1 = a.allocate();
a.release(c1);
a.seed_next(c1);
let c2 = a.allocate();
assert_eq!(c2, c1, "a released cid is reusable on wrap");
}
}
/// Why a search is being scheduled; carried by `SearchRequest::Schedule`.
/// NOTE(review): how each reason affects search behaviour is decided outside
/// this section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SearchReason {
/// NOTE(review): presumably the first search for a newly created channel.
Initial,
/// NOTE(review): presumably a search after a previously connected channel was lost.
Reconnect,
/// NOTE(review): presumably a search triggered by a beacon anomaly.
BeaconAnomaly,
}
/// Requests addressed to the search component.
/// NOTE(review): the receiver of these requests is outside this section;
/// variant descriptions reflect the carried fields only.
pub(crate) enum SearchRequest {
/// Schedule a search for `pv_name` on behalf of channel `cid`, tagged
/// with the reason for searching.
Schedule {
cid: u32,
pv_name: String,
reason: SearchReason,
},
/// Cancel searching for channel `cid`.
Cancel { cid: u32 },
/// Report whether connecting channel `cid` to `server_addr` succeeded.
ConnectResult {
cid: u32,
success: bool,
server_addr: SocketAddr,
},
/// Add one address.
/// NOTE(review): presumably to the search destination list — confirm.
AddAddress(SocketAddr),
/// Remove one address (counterpart of `AddAddress`).
RemoveAddress(SocketAddr),
/// Supply a complete address list.
/// NOTE(review): presumably replaces the current list — confirm.
SetAddressList(Vec<SocketAddr>),
}
/// Results reported back from the search component.
pub(crate) enum SearchResponse {
/// Channel `cid` was located at `server_addr`.
Found {
cid: u32,
server_addr: SocketAddr,
},
/// `pv_name` is associated with two different addresses: the earlier
/// `prev_addr` and the later `new_addr`.
MultiplyDefined {
pv_name: String,
prev_addr: SocketAddr,
new_addr: SocketAddr,
},
}
/// Commands addressed to the transport layer.
///
/// Every variant names the target `server_addr`; all except
/// `BeaconArrivalNotify` also carry a `priority`, the same
/// `(SocketAddr, u8)` shape as `CircuitKey`.
/// NOTE(review): the code that executes these commands is outside this
/// section; variant descriptions reflect the carried fields only.
pub(crate) enum TransportCommand {
/// Create channel `cid` for `pv_name` on the given server.
CreateChannel {
cid: u32,
pv_name: String,
server_addr: SocketAddr,
priority: u8,
},
/// Read from channel `sid`; `ioid` identifies the request.
ReadNotify {
sid: u32,
data_type: u16,
count: u32,
ioid: u32,
server_addr: SocketAddr,
priority: u8,
},
/// Write `payload` to channel `sid`; carries no `ioid`.
Write {
sid: u32,
data_type: u16,
count: u32,
payload: Vec<u8>,
server_addr: SocketAddr,
priority: u8,
},
/// Write `payload` to channel `sid`; `ioid` identifies the request.
WriteNotify {
sid: u32,
data_type: u16,
count: u32,
ioid: u32,
payload: Vec<u8>,
server_addr: SocketAddr,
priority: u8,
},
/// Start subscription `subid` on channel `sid` with the given event `mask`.
Subscribe {
sid: u32,
data_type: u16,
count: u32,
subid: u32,
mask: u16,
server_addr: SocketAddr,
priority: u8,
},
/// Stop subscription `subid` on channel `sid`.
Unsubscribe {
sid: u32,
subid: u32,
data_type: u16,
count: u32,
server_addr: SocketAddr,
priority: u8,
},
/// Clear the channel identified by `cid` / `sid`.
ClearChannel {
cid: u32,
sid: u32,
server_addr: SocketAddr,
priority: u8,
},
/// A beacon from `server_addr` arrived; `anomaly` flags it as anomalous.
BeaconArrivalNotify {
server_addr: SocketAddr,
anomaly: bool,
},
/// NOTE(review): presumably asks the server to pause event delivery on
/// this circuit — confirm.
EventsOff {
server_addr: SocketAddr,
priority: u8,
},
/// NOTE(review): presumably asks the server to resume event delivery on
/// this circuit — confirm.
EventsOn {
server_addr: SocketAddr,
priority: u8,
},
}
/// Events reported by the transport layer.
/// NOTE(review): the producer and consumer of these events are outside this
/// section; variant descriptions reflect the carried fields only.
pub(crate) enum TransportEvent {
/// Channel `cid` now exists on the server as `sid`, with the given native
/// type code, element count and access rights.
ChannelCreated {
cid: u32,
sid: u32,
data_type: u16,
element_count: u32,
access: AccessRights,
server_addr: SocketAddr,
priority: u8,
},
/// Data for subscription `subid`: a type code, an element count and the
/// payload bytes.
MonitorData {
subid: u32,
data_type: u16,
count: u32,
data: Vec<u8>,
},
/// Subscription `subid` reported the status `eca_status` instead of data.
MonitorStatusError {
subid: u32,
eca_status: u32,
},
/// Access rights for channel `cid` changed to `access`.
AccessRightsChanged {
cid: u32,
access: AccessRights,
},
/// Creating channel `cid` failed.
ChannelCreateFailed {
cid: u32,
},
/// The server at `server_addr` reported an error. `original_request` is
/// optional.
/// NOTE(review): presumably the command code of the offending request.
ServerError {
eca_status: u32,
original_request: Option<u16>,
message: String,
server_addr: SocketAddr,
},
/// The TCP connection for this `(server_addr, priority)` circuit closed.
TcpClosed {
server_addr: SocketAddr,
priority: u8,
},
/// The server disconnected channel `cid`.
ServerDisconnect {
cid: u32,
server_addr: SocketAddr,
},
/// The circuit is considered unresponsive.
/// NOTE(review): the criterion is decided outside this section.
CircuitUnresponsive {
server_addr: SocketAddr,
priority: u8,
},
/// The circuit is considered responsive again.
CircuitResponsive {
server_addr: SocketAddr,
priority: u8,
},
/// The server on this circuit announced protocol minor version
/// `minor_version`.
ServerVersion {
server_addr: SocketAddr,
priority: u8,
minor_version: u16,
},
/// A connection to `server_addr` was established.
ServerConnected {
server_addr: SocketAddr,
},
}