zerodds_routing_service/
forwarding.rs1use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::mpsc::{Receiver, RecvTimeoutError};
17use std::thread::JoinHandle;
18use std::time::Duration;
19
20use zerodds_dcps::runtime::{DcpsRuntime, UserSample};
21use zerodds_rtps::EntityId;
22
23use crate::metrics::RouteMetrics;
24
25pub trait SampleProcessor: Send {
35 fn process(&mut self, payload: &[u8], representation: u8) -> Option<(Vec<u8>, u8)>;
37}
38
39pub struct SessionParts {
42 pub route_name: String,
44 pub rx: Receiver<UserSample>,
46 pub output_rt: Arc<DcpsRuntime>,
48 pub writer_eid: EntityId,
50 pub keyed: bool,
52 pub processor: Option<Box<dyn SampleProcessor>>,
54 pub metrics: Arc<RouteMetrics>,
56}
57
58pub struct ForwardingSession {
60 route_name: String,
61 stop: Arc<AtomicBool>,
62 handle: Option<JoinHandle<()>>,
63}
64
65impl ForwardingSession {
66 pub fn start(parts: SessionParts) -> crate::error::Result<Self> {
71 let stop = Arc::new(AtomicBool::new(false));
72 let pump_stop = Arc::clone(&stop);
73 let route_name = parts.route_name.clone();
74 let handle = std::thread::Builder::new()
75 .name(format!("router-pump-{}", parts.route_name))
76 .spawn(move || pump(parts, &pump_stop))
77 .map_err(|e| {
78 crate::error::RoutingError::Dds(format!("spawn router pump '{route_name}': {e}"))
79 })?;
80 Ok(Self {
81 route_name,
82 stop,
83 handle: Some(handle),
84 })
85 }
86
87 #[must_use]
89 pub fn route_name(&self) -> &str {
90 &self.route_name
91 }
92
93 pub fn stop(&mut self) {
95 self.stop.store(true, Ordering::Relaxed);
96 if let Some(h) = self.handle.take() {
97 let _ = h.join();
98 }
99 }
100}
101
102impl Drop for ForwardingSession {
103 fn drop(&mut self) {
104 self.stop();
105 }
106}
107
108fn status_bits(kind: zerodds_rtps::history_cache::ChangeKind) -> u32 {
111 use zerodds_rtps::history_cache::ChangeKind;
112 use zerodds_rtps::inline_qos::status_info::{DISPOSED, UNREGISTERED};
113 match kind {
114 ChangeKind::NotAliveDisposed => DISPOSED,
115 ChangeKind::NotAliveUnregistered => UNREGISTERED,
116 ChangeKind::NotAliveDisposedUnregistered => DISPOSED | UNREGISTERED,
117 ChangeKind::Alive | ChangeKind::AliveFiltered => 0,
119 }
120}
121
122fn pump(mut parts: SessionParts, stop: &AtomicBool) {
123 let mut last_rep: u8 = 255;
126 while !stop.load(Ordering::Relaxed) {
127 let sample = match parts.rx.recv_timeout(Duration::from_millis(200)) {
128 Ok(s) => s,
129 Err(RecvTimeoutError::Timeout) => continue,
130 Err(RecvTimeoutError::Disconnected) => break,
131 };
132 match sample {
133 UserSample::Alive {
134 payload,
135 representation,
136 ..
137 } => {
138 if let Some(p) = parts.processor.as_mut() {
140 match p.process(payload.as_slice(), representation) {
141 Some((out, rep)) => {
142 set_rep(&parts, &mut last_rep, rep);
143 write_alive(&parts, &out);
144 }
145 None => parts.metrics.inc_dropped_filter(),
146 }
147 } else {
148 set_rep(&parts, &mut last_rep, representation);
149 write_alive(&parts, payload.as_slice());
150 }
151 }
152 UserSample::Lifecycle { key_hash, kind } => {
153 if parts.keyed {
157 if parts
158 .output_rt
159 .write_user_lifecycle(parts.writer_eid, key_hash, status_bits(kind))
160 .is_ok()
161 {
162 parts.metrics.inc_lifecycle();
163 } else {
164 parts.metrics.inc_errors();
165 }
166 }
167 }
168 }
169 }
170}
171
172fn set_rep(parts: &SessionParts, last_rep: &mut u8, rep: u8) {
173 if rep != *last_rep {
174 let off: i16 = if rep == 0 { 0 } else { 2 };
176 let _ = parts
177 .output_rt
178 .set_user_writer_data_rep_override(parts.writer_eid, Some(vec![off]));
179 *last_rep = rep;
180 }
181}
182
183fn write_alive(parts: &SessionParts, body: &[u8]) {
184 if parts
185 .output_rt
186 .write_user_sample_borrowed(parts.writer_eid, body)
187 .is_ok()
188 {
189 parts.metrics.inc_forwarded();
190 } else {
191 parts.metrics.inc_errors();
192 }
193}