use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::thread::JoinHandle;
use std::time::Duration;
use zerodds_dcps::runtime::{DcpsRuntime, UserSample};
use zerodds_rtps::EntityId;
use crate::metrics::RouteMetrics;
pub trait SampleProcessor: Send {
fn process(&mut self, payload: &[u8], representation: u8) -> Option<(Vec<u8>, u8)>;
}
pub struct SessionParts {
pub route_name: String,
pub rx: Receiver<UserSample>,
pub output_rt: Arc<DcpsRuntime>,
pub writer_eid: EntityId,
pub keyed: bool,
pub processor: Option<Box<dyn SampleProcessor>>,
pub metrics: Arc<RouteMetrics>,
}
pub struct ForwardingSession {
route_name: String,
stop: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
}
impl ForwardingSession {
pub fn start(parts: SessionParts) -> crate::error::Result<Self> {
let stop = Arc::new(AtomicBool::new(false));
let pump_stop = Arc::clone(&stop);
let route_name = parts.route_name.clone();
let handle = std::thread::Builder::new()
.name(format!("router-pump-{}", parts.route_name))
.spawn(move || pump(parts, &pump_stop))
.map_err(|e| {
crate::error::RoutingError::Dds(format!("spawn router pump '{route_name}': {e}"))
})?;
Ok(Self {
route_name,
stop,
handle: Some(handle),
})
}
#[must_use]
pub fn route_name(&self) -> &str {
&self.route_name
}
pub fn stop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
if let Some(h) = self.handle.take() {
let _ = h.join();
}
}
}
impl Drop for ForwardingSession {
fn drop(&mut self) {
self.stop();
}
}
fn status_bits(kind: zerodds_rtps::history_cache::ChangeKind) -> u32 {
use zerodds_rtps::history_cache::ChangeKind;
use zerodds_rtps::inline_qos::status_info::{DISPOSED, UNREGISTERED};
match kind {
ChangeKind::NotAliveDisposed => DISPOSED,
ChangeKind::NotAliveUnregistered => UNREGISTERED,
ChangeKind::NotAliveDisposedUnregistered => DISPOSED | UNREGISTERED,
ChangeKind::Alive | ChangeKind::AliveFiltered => 0,
}
}
fn pump(mut parts: SessionParts, stop: &AtomicBool) {
let mut last_rep: u8 = 255;
while !stop.load(Ordering::Relaxed) {
let sample = match parts.rx.recv_timeout(Duration::from_millis(200)) {
Ok(s) => s,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => break,
};
match sample {
UserSample::Alive {
payload,
representation,
..
} => {
if let Some(p) = parts.processor.as_mut() {
match p.process(payload.as_slice(), representation) {
Some((out, rep)) => {
set_rep(&parts, &mut last_rep, rep);
write_alive(&parts, &out);
}
None => parts.metrics.inc_dropped_filter(),
}
} else {
set_rep(&parts, &mut last_rep, representation);
write_alive(&parts, payload.as_slice());
}
}
UserSample::Lifecycle { key_hash, kind } => {
if parts.keyed {
if parts
.output_rt
.write_user_lifecycle(parts.writer_eid, key_hash, status_bits(kind))
.is_ok()
{
parts.metrics.inc_lifecycle();
} else {
parts.metrics.inc_errors();
}
}
}
}
}
}
fn set_rep(parts: &SessionParts, last_rep: &mut u8, rep: u8) {
if rep != *last_rep {
let off: i16 = if rep == 0 { 0 } else { 2 };
let _ = parts
.output_rt
.set_user_writer_data_rep_override(parts.writer_eid, Some(vec![off]));
*last_rep = rep;
}
}
fn write_alive(parts: &SessionParts, body: &[u8]) {
if parts
.output_rt
.write_user_sample_borrowed(parts.writer_eid, body)
.is_ok()
{
parts.metrics.inc_forwarded();
} else {
parts.metrics.inc_errors();
}
}