Skip to main content

zerodds_routing_service/
forwarding.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! The forwarding engine: a pump that moves samples from one user reader to one
5//! user writer (possibly on a different domain), type-agnostically.
6//!
7//! Built on the runtime user-entity byte path (`register_user_reader_kind` →
8//! `mpsc::Receiver<UserSample>` → `write_user_sample_borrowed`), the same
9//! primitive the durability service uses. The engine adds: per-sample loop
10//! guard, representation-faithful encapsulation override, keyed-instance
11//! lifecycle propagation, and a pluggable [`SampleProcessor`] hook for content
12//! filtering / field transformation (wired in v3).
13
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::mpsc::{Receiver, RecvTimeoutError};
17use std::thread::JoinHandle;
18use std::time::Duration;
19
20use zerodds_dcps::runtime::{DcpsRuntime, UserSample};
21use zerodds_rtps::EntityId;
22
23use crate::metrics::RouteMetrics;
24
25/// Per-sample processor: content filter and/or field transformation.
26///
27/// `process` receives the input CDR body (no encapsulation header) and its
28/// representation (`0` = XCDR1, `1`/`2` = XCDR2). It returns:
29/// * `Some((payload, representation))` — forward this (possibly transformed)
30///   body with the given representation.
31/// * `None` — drop the sample (filtered out).
32///
33/// Byte pass-through routes use no processor (the body is forwarded verbatim).
34pub trait SampleProcessor: Send {
35    /// Process one sample body.
36    fn process(&mut self, payload: &[u8], representation: u8) -> Option<(Vec<u8>, u8)>;
37}
38
39/// Inputs needed to build a [`ForwardingSession`]. The engine registers the
40/// reader/writer and hands the pieces over.
41pub struct SessionParts {
42    /// Route name (for metrics + thread name).
43    pub route_name: String,
44    /// Channel of samples from the input reader.
45    pub rx: Receiver<UserSample>,
46    /// Output runtime (the participant on the output domain).
47    pub output_rt: Arc<DcpsRuntime>,
48    /// Output writer entity id.
49    pub writer_eid: EntityId,
50    /// Whether the topic is keyed (governs lifecycle forwarding).
51    pub keyed: bool,
52    /// Optional content-filter / transform processor (v3).
53    pub processor: Option<Box<dyn SampleProcessor>>,
54    /// Metrics sink for this route.
55    pub metrics: Arc<RouteMetrics>,
56}
57
58/// A running forward pump for one route.
59pub struct ForwardingSession {
60    route_name: String,
61    stop: Arc<AtomicBool>,
62    handle: Option<JoinHandle<()>>,
63}
64
65impl ForwardingSession {
66    /// Starts the pump thread.
67    ///
68    /// # Errors
69    /// [`RoutingError::Dds`] if the OS refuses to spawn the pump thread.
70    pub fn start(parts: SessionParts) -> crate::error::Result<Self> {
71        let stop = Arc::new(AtomicBool::new(false));
72        let pump_stop = Arc::clone(&stop);
73        let route_name = parts.route_name.clone();
74        let handle = std::thread::Builder::new()
75            .name(format!("router-pump-{}", parts.route_name))
76            .spawn(move || pump(parts, &pump_stop))
77            .map_err(|e| {
78                crate::error::RoutingError::Dds(format!("spawn router pump '{route_name}': {e}"))
79            })?;
80        Ok(Self {
81            route_name,
82            stop,
83            handle: Some(handle),
84        })
85    }
86
87    /// Route name this session forwards.
88    #[must_use]
89    pub fn route_name(&self) -> &str {
90        &self.route_name
91    }
92
93    /// Signals the pump to stop and joins it.
94    pub fn stop(&mut self) {
95        self.stop.store(true, Ordering::Relaxed);
96        if let Some(h) = self.handle.take() {
97            let _ = h.join();
98        }
99    }
100}
101
102impl Drop for ForwardingSession {
103    fn drop(&mut self) {
104        self.stop();
105    }
106}
107
108/// Maps a lifecycle [`ChangeKind`](zerodds_rtps::history_cache::ChangeKind) to
109/// the RTPS `PID_STATUS_INFO` bits (Spec §9.6.3.9).
110fn status_bits(kind: zerodds_rtps::history_cache::ChangeKind) -> u32 {
111    use zerodds_rtps::history_cache::ChangeKind;
112    use zerodds_rtps::inline_qos::status_info::{DISPOSED, UNREGISTERED};
113    match kind {
114        ChangeKind::NotAliveDisposed => DISPOSED,
115        ChangeKind::NotAliveUnregistered => UNREGISTERED,
116        ChangeKind::NotAliveDisposedUnregistered => DISPOSED | UNREGISTERED,
117        // Alive / AliveFiltered never reach here (handled as data).
118        ChangeKind::Alive | ChangeKind::AliveFiltered => 0,
119    }
120}
121
122fn pump(mut parts: SessionParts, stop: &AtomicBool) {
123    // Track the last representation written so the encapsulation override is
124    // set only on change (a topic is representation-consistent in practice).
125    let mut last_rep: u8 = 255;
126    while !stop.load(Ordering::Relaxed) {
127        let sample = match parts.rx.recv_timeout(Duration::from_millis(200)) {
128            Ok(s) => s,
129            Err(RecvTimeoutError::Timeout) => continue,
130            Err(RecvTimeoutError::Disconnected) => break,
131        };
132        match sample {
133            UserSample::Alive {
134                payload,
135                representation,
136                ..
137            } => {
138                // Filter / transform, or byte pass-through.
139                if let Some(p) = parts.processor.as_mut() {
140                    match p.process(payload.as_slice(), representation) {
141                        Some((out, rep)) => {
142                            set_rep(&parts, &mut last_rep, rep);
143                            write_alive(&parts, &out);
144                        }
145                        None => parts.metrics.inc_dropped_filter(),
146                    }
147                } else {
148                    set_rep(&parts, &mut last_rep, representation);
149                    write_alive(&parts, payload.as_slice());
150                }
151            }
152            UserSample::Lifecycle { key_hash, kind } => {
153                // Lifecycle (dispose/unregister) only carries meaning for keyed
154                // topics; forward it so a downstream reader sees the instance
155                // state transition.
156                if parts.keyed {
157                    if parts
158                        .output_rt
159                        .write_user_lifecycle(parts.writer_eid, key_hash, status_bits(kind))
160                        .is_ok()
161                    {
162                        parts.metrics.inc_lifecycle();
163                    } else {
164                        parts.metrics.inc_errors();
165                    }
166                }
167            }
168        }
169    }
170}
171
172fn set_rep(parts: &SessionParts, last_rep: &mut u8, rep: u8) {
173    if rep != *last_rep {
174        // representation: 0 = XCDR1, 1/2 = XCDR2 → data_representation id 0 / 2.
175        let off: i16 = if rep == 0 { 0 } else { 2 };
176        let _ = parts
177            .output_rt
178            .set_user_writer_data_rep_override(parts.writer_eid, Some(vec![off]));
179        *last_rep = rep;
180    }
181}
182
183fn write_alive(parts: &SessionParts, body: &[u8]) {
184    if parts
185        .output_rt
186        .write_user_sample_borrowed(parts.writer_eid, body)
187        .is_ok()
188    {
189        parts.metrics.inc_forwarded();
190    } else {
191        parts.metrics.inc_errors();
192    }
193}