use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use dashmap::DashMap;
use epics_base_rs::error::{CaError, CaResult};
use epics_base_rs::runtime::sync::{mpsc, oneshot};
use epics_base_rs::types::{DbFieldType, EpicsValue};
use crate::channel::AccessRights;
use crate::client::state::ChannelState;
pub(crate) type ServerLastRxAt = Arc<DashMap<SocketAddr, Instant>>;
pub(crate) const SEND_BACKPRESSURE_FRAMES: usize = 4096;
#[derive(Clone)]
pub(crate) struct DirectServerWriter {
pub(crate) write_tx: mpsc::UnboundedSender<Vec<u8>>,
pub(crate) pending_frames: Arc<AtomicUsize>,
}
impl DirectServerWriter {
pub(crate) fn send_frame(&self, frame: Vec<u8>) -> CaResult<()> {
let pending = self.pending_frames.load(Ordering::Relaxed);
if pending >= SEND_BACKPRESSURE_FRAMES {
return Err(CaError::Disconnected);
}
self.pending_frames.fetch_add(1, Ordering::Relaxed);
if self.write_tx.send(frame).is_err() {
let prev = self.pending_frames.load(Ordering::Relaxed);
self.pending_frames
.store(prev.saturating_sub(1), Ordering::Relaxed);
return Err(CaError::Disconnected);
}
Ok(())
}
}
pub(crate) type DirectServerWriters = Arc<DashMap<SocketAddr, DirectServerWriter>>;
#[derive(Clone)]
pub(crate) struct ChannelSnapshotPublic {
pub sid: u32,
pub native_type: DbFieldType,
pub element_count: u32,
pub server_addr: SocketAddr,
pub access_rights: AccessRights,
pub state: ChannelState,
}
pub(crate) type ChannelSnapshots = Arc<DashMap<u32, ChannelSnapshotPublic>>;
pub(crate) type SearchAttempts = Arc<DashMap<u32, std::sync::atomic::AtomicU32>>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CaExceptionKind {
ServerError,
ServerDisconnect,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CaException {
pub kind: CaExceptionKind,
pub message: String,
pub server_addr: Option<SocketAddr>,
pub pv_name: Option<String>,
pub status: Option<u32>,
}
pub type CaExceptionHandler = Arc<dyn Fn(&CaException) + Send + Sync>;
pub(crate) type CaExceptionSlot = Arc<parking_lot::RwLock<Option<CaExceptionHandler>>>;
pub(crate) fn dispatch_exception(slot: &CaExceptionSlot, exc: CaException) {
let handler = slot.read().clone();
if let Some(h) = handler {
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| h(&exc)));
}
}
pub(crate) enum ReadReply {
Plain {
dbr_type: DbFieldType,
value: EpicsValue,
},
Raw {
data_type: u16,
count: u32,
data: Vec<u8>,
},
}
#[derive(Clone, Copy)]
pub(crate) enum ReadReplyMode {
Plain,
Raw,
}
pub(crate) type ReadReplyTx = oneshot::Sender<CaResult<ReadReply>>;
pub(crate) type WriteReplyTx = oneshot::Sender<CaResult<()>>;
pub(crate) type WarmReplySlot = Arc<parking_lot::Mutex<Option<ReadReplyTx>>>;
pub(crate) enum ReadWaiter {
OneShot {
cid: u32,
mode: ReadReplyMode,
reply_tx: ReadReplyTx,
},
Warm {
cid: u32,
mode: ReadReplyMode,
slot: WarmReplySlot,
},
}
impl ReadWaiter {
pub(crate) fn cid(&self) -> u32 {
match self {
Self::OneShot { cid, .. } => *cid,
Self::Warm { cid, .. } => *cid,
}
}
pub(crate) fn mode(&self) -> ReadReplyMode {
match self {
Self::OneShot { mode, .. } => *mode,
Self::Warm { mode, .. } => *mode,
}
}
pub(crate) fn send(self, result: CaResult<ReadReply>) {
match self {
Self::OneShot { reply_tx, .. } => {
let _ = reply_tx.send(result);
}
Self::Warm { slot, .. } => {
if let Some(tx) = slot.lock().take() {
let _ = tx.send(result);
}
}
}
}
}
#[derive(Clone, Default)]
pub(crate) struct InFlightOps {
pub(crate) reads: Arc<DashMap<u32, ReadWaiter>>,
pub(crate) writes: Arc<DashMap<u32, (u32, WriteReplyTx)>>,
}
impl InFlightOps {
pub(crate) fn new() -> Self {
Self::default()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SearchReason {
Initial,
Reconnect,
BeaconAnomaly,
}
pub(crate) enum SearchRequest {
Schedule {
cid: u32,
pv_name: String,
reason: SearchReason,
},
Cancel { cid: u32 },
ConnectResult {
cid: u32,
success: bool,
server_addr: SocketAddr,
},
AddAddress(SocketAddr),
SetAddressList(Vec<SocketAddr>),
}
pub(crate) enum SearchResponse {
Found { cid: u32, server_addr: SocketAddr },
}
pub(crate) enum TransportCommand {
CreateChannel {
cid: u32,
pv_name: String,
server_addr: SocketAddr,
},
ReadNotify {
sid: u32,
data_type: u16,
count: u32,
ioid: u32,
server_addr: SocketAddr,
},
Write {
sid: u32,
data_type: u16,
count: u32,
payload: Vec<u8>,
server_addr: SocketAddr,
},
WriteNotify {
sid: u32,
data_type: u16,
count: u32,
ioid: u32,
payload: Vec<u8>,
server_addr: SocketAddr,
},
Subscribe {
sid: u32,
data_type: u16,
count: u32,
subid: u32,
mask: u16,
server_addr: SocketAddr,
},
Unsubscribe {
sid: u32,
subid: u32,
data_type: u16,
server_addr: SocketAddr,
},
ClearChannel {
cid: u32,
sid: u32,
server_addr: SocketAddr,
},
BeaconArrivalNotify {
server_addr: SocketAddr,
anomaly: bool,
},
EventsOff {
server_addr: SocketAddr,
},
EventsOn {
server_addr: SocketAddr,
},
}
pub(crate) enum TransportEvent {
ChannelCreated {
cid: u32,
sid: u32,
data_type: u16,
element_count: u32,
access: AccessRights,
server_addr: SocketAddr,
},
MonitorData {
subid: u32,
data_type: u16,
count: u32,
data: Vec<u8>,
},
AccessRightsChanged {
cid: u32,
access: AccessRights,
},
ChannelCreateFailed {
cid: u32,
},
ServerError {
eca_status: u32,
original_request: Option<u16>,
message: String,
server_addr: SocketAddr,
},
TcpClosed {
server_addr: SocketAddr,
},
ServerDisconnect {
cid: u32,
server_addr: SocketAddr,
},
CircuitUnresponsive {
server_addr: SocketAddr,
},
CircuitResponsive {
server_addr: SocketAddr,
},
ServerVersion {
server_addr: SocketAddr,
minor_version: u16,
},
ServerConnected {
server_addr: SocketAddr,
},
}