Skip to main content

zerodds_xrce_agent/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
//! XRCE Agent — DDS participant wrapper with a pull model (spec §7.3).
//!
//! Crate `zerodds-xrce-agent`.
//!
//! # Spec mapping
//!
//! OMG DDS-XRCE 1.0 §7.3 (paraphrased): the XRCE Agent represents the XRCE
//! Client in the DDS data space; client-pull model for disconnected devices.
//!
//! We provide an in-process agent state machine [`XrceAgent`] with:
//!
//! - `register_client(client_key)` — the client is registered by its
//!   ClientKey; the agent creates one object slot per client.
//! - `create_object(...)` — spec §7.8.3 CREATE path.
//! - `delete_object(...)` — spec §7.8.3 DELETE path.
//! - `submit_sample(client_key, reader, payload)` — DDS-side push: places a
//!   sample into the pull queue of a DataReader.
//! - `pull_sample(client_key, reader)` — spec §7.3 client-pull model.
//!
//! The agent itself does not encapsulate a DDS stack — that is the job of
//! the integrating application. We only provide the object management and
//! the pull queue.
//!
//! Safety classification: **STANDARD**.
28
29#![cfg_attr(not(feature = "std"), no_std)]
30#![deny(unsafe_code)]
31#![warn(missing_docs)]
32
33extern crate alloc;
34
35use alloc::collections::{BTreeMap, VecDeque};
36use alloc::vec::Vec;
37
38use alloc::string::String;
39
40use zerodds_xrce::header::{CLIENT_KEY_LEN, ClientKey};
41use zerodds_xrce::object_id::ObjectId;
42use zerodds_xrce::object_repr::ObjectVariant;
43use zerodds_xrce::object_store::{CreateOutcome, CreationMode, ObjectStore};
44
45/// BTreeMap-faehiger Key aus ClientKey (ClientKey selbst hat kein
46/// `Ord`-Derive, weil ein Hash-Container im Original-Crate vorgesehen
47/// war; wir mappen auf das underlying `[u8; CLIENT_KEY_LEN]`).
48type ClientKeyOrd = [u8; CLIENT_KEY_LEN];
49
50fn ord_of(key: ClientKey) -> ClientKeyOrd {
51    key.0
52}
53
/// Error classes reported by the agent.
///
/// Implements [`core::fmt::Display`] and [`core::error::Error`] so the error
/// can be printed for users and propagated through `dyn Error` based error
/// handling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentError {
    /// The client is unknown — the caller must call `register_client` first.
    UnknownClient,
    /// The DataReader object does not exist.
    UnknownReader,
    /// The pull queue is full (DoS protection).
    QueueFull,
    /// The wire layer rejected the operation (e.g. `ObjectId.kind` does not
    /// match the operation).
    WireRejected,
}

impl core::fmt::Display for AgentError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Exhaustive on purpose: a new variant must get its own message.
        let message = match self {
            Self::UnknownClient => "unknown client (register_client must be called first)",
            Self::UnknownReader => "unknown DataReader object",
            Self::QueueFull => "pull queue is full",
            Self::WireRejected => "operation rejected by the wire layer",
        };
        f.write_str(message)
    }
}

impl core::error::Error for AgentError {}
67
68/// Trace-Event fuer Operation-Tracing (Spec §8.5).
69///
70/// Wird pro CREATE/DELETE/PULL-Operation generiert und kann via
71/// [`XrceAgent::set_trace_sink`] an einen externen Logger geleitet
72/// werden (`tracing`-Crate, structured logger, Test-Sink).
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct TraceEvent {
75    /// Operation-Name ("CREATE", "DELETE", "SUBMIT", "PULL").
76    pub operation: String,
77    /// Client-Key (8 bytes).
78    pub client_key: ClientKeyOrd,
79    /// Object-Id (2 bytes wire form).
80    pub object_id: [u8; 2],
81}
82
83/// Trace-Sink-Trait. Eine Implementation logged jedes Event in ein
84/// externes System (e.g. `tracing::info!`).
85pub trait TraceSink {
86    /// Loggt ein Trace-Event.
87    fn record(&mut self, event: TraceEvent);
88}
89
90/// In-Memory XRCE-Agent.
91pub struct XrceAgent {
92    /// Object-Stores pro Client.
93    clients: BTreeMap<ClientKeyOrd, ObjectStore>,
94    /// Pull-Queue pro (Client, Reader). Spec §7.3 Pull-Modell.
95    samples: BTreeMap<(ClientKeyOrd, [u8; 2]), VecDeque<Vec<u8>>>,
96    /// DoS-Cap: maximal gepufferte Samples pro Reader.
97    max_pending_samples: usize,
98    /// Optionaler Trace-Sink (Spec §8.5 Operation-Tracing).
99    trace_sink: Option<alloc::boxed::Box<dyn TraceSink + Send>>,
100}
101
102fn oid_key(oid: ObjectId) -> [u8; 2] {
103    oid.to_bytes()
104}
105
106impl XrceAgent {
107    /// Konstruktor mit Default-DoS-Caps (256 Samples pro Reader).
108    #[must_use]
109    pub fn new() -> Self {
110        Self::with_max_pending_samples(256)
111    }
112
113    /// Konstruktor mit Custom-DoS-Cap.
114    #[must_use]
115    pub fn with_max_pending_samples(max: usize) -> Self {
116        Self {
117            clients: BTreeMap::new(),
118            samples: BTreeMap::new(),
119            max_pending_samples: max,
120            trace_sink: None,
121        }
122    }
123
124    /// Spec §8.5 — registriert einen Operation-Trace-Sink. Pro
125    /// CREATE/DELETE/SUBMIT/PULL-Operation wird ein
126    /// [`TraceEvent`] erzeugt und an den Sink delegiert.
127    pub fn set_trace_sink(&mut self, sink: alloc::boxed::Box<dyn TraceSink + Send>) {
128        self.trace_sink = Some(sink);
129    }
130
131    fn trace(&mut self, op: &str, client_key: ClientKeyOrd, oid: [u8; 2]) {
132        if let Some(sink) = self.trace_sink.as_mut() {
133            sink.record(TraceEvent {
134                operation: String::from(op),
135                client_key,
136                object_id: oid,
137            });
138        }
139    }
140
141    /// Registriert einen neuen Client. Idempotent — mehrfaches
142    /// Registrieren mit derselben ClientKey ueberschreibt den
143    /// existierenden Slot **nicht**.
144    pub fn register_client(&mut self, client_key: ClientKey) {
145        self.clients.entry(ord_of(client_key)).or_default();
146    }
147
148    /// `true` wenn Client registriert ist.
149    #[must_use]
150    pub fn has_client(&self, client_key: ClientKey) -> bool {
151        self.clients.contains_key(&ord_of(client_key))
152    }
153
154    /// Anzahl registrierter Clients.
155    #[must_use]
156    pub fn client_count(&self) -> usize {
157        self.clients.len()
158    }
159
160    /// Spec §7.8.3 CREATE — registriert ein Objekt im Slot des Clients.
161    ///
162    /// # Errors
163    /// `UnknownClient`.
164    pub fn create_object(
165        &mut self,
166        client_key: ClientKey,
167        object_id: ObjectId,
168        representation: ObjectVariant,
169        mode: CreationMode,
170    ) -> Result<CreateOutcome, AgentError> {
171        let ord = ord_of(client_key);
172        let store = self
173            .clients
174            .get_mut(&ord)
175            .ok_or(AgentError::UnknownClient)?;
176        let kind = object_id.kind().map_err(|_| AgentError::WireRejected)?;
177        let outcome = store
178            .create(object_id, kind, representation, mode)
179            .map_err(|_| AgentError::WireRejected)?;
180        self.trace("CREATE", ord, oid_key(object_id));
181        Ok(outcome)
182    }
183
184    /// Spec §7.8.3 DELETE — entfernt ein Objekt + zugehoerige
185    /// Pull-Queue.
186    ///
187    /// # Errors
188    /// `UnknownClient`.
189    pub fn delete_object(
190        &mut self,
191        client_key: ClientKey,
192        object_id: ObjectId,
193    ) -> Result<bool, AgentError> {
194        let ord = ord_of(client_key);
195        let store = self
196            .clients
197            .get_mut(&ord)
198            .ok_or(AgentError::UnknownClient)?;
199        let removed = store.delete(object_id);
200        // Pull-Queue mit aufraeumen.
201        self.samples.remove(&(ord, oid_key(object_id)));
202        self.trace("DELETE", ord, oid_key(object_id));
203        Ok(removed)
204    }
205
206    /// DDS-Side-Push: ein neues Sample wird einer Reader-Queue
207    /// hinzugefuegt. Der Client holt es spaeter via [`pull_sample`].
208    ///
209    /// # Errors
210    /// `UnknownClient`, `UnknownReader`, `QueueFull`.
211    pub fn submit_sample(
212        &mut self,
213        client_key: ClientKey,
214        reader_id: ObjectId,
215        payload: Vec<u8>,
216    ) -> Result<(), AgentError> {
217        let ord = ord_of(client_key);
218        let store = self.clients.get(&ord).ok_or(AgentError::UnknownClient)?;
219        if store.get(reader_id).is_none() {
220            return Err(AgentError::UnknownReader);
221        }
222        let queue = self.samples.entry((ord, oid_key(reader_id))).or_default();
223        if queue.len() >= self.max_pending_samples {
224            return Err(AgentError::QueueFull);
225        }
226        queue.push_back(payload);
227        self.trace("SUBMIT", ord, oid_key(reader_id));
228        Ok(())
229    }
230
231    /// Spec §7.3 Client-Pull-Modell: liefert das aelteste gepufferte
232    /// Sample fuer den (Client, Reader) ab. Liefert `Ok(None)` wenn
233    /// keine Daten anliegen.
234    ///
235    /// # Errors
236    /// `UnknownClient`.
237    pub fn pull_sample(
238        &mut self,
239        client_key: ClientKey,
240        reader_id: ObjectId,
241    ) -> Result<Option<Vec<u8>>, AgentError> {
242        let ord = ord_of(client_key);
243        if !self.clients.contains_key(&ord) {
244            return Err(AgentError::UnknownClient);
245        }
246        let sample = self
247            .samples
248            .get_mut(&(ord, oid_key(reader_id)))
249            .and_then(VecDeque::pop_front);
250        if sample.is_some() {
251            self.trace("PULL", ord, oid_key(reader_id));
252        }
253        Ok(sample)
254    }
255
256    /// Anzahl gepufferter Samples fuer (Client, Reader).
257    #[must_use]
258    pub fn pending_samples(&self, client_key: ClientKey, reader_id: ObjectId) -> usize {
259        self.samples
260            .get(&(ord_of(client_key), oid_key(reader_id)))
261            .map_or(0, VecDeque::len)
262    }
263}
264
265impl Default for XrceAgent {
266    fn default() -> Self {
267        Self::new()
268    }
269}
270
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use zerodds_xrce::header::CLIENT_KEY_LEN;
    use zerodds_xrce::object_kind::{OBJK_DATAREADER, ObjectKind};

    /// Builds a client key whose bytes are all `b`.
    fn key(b: u8) -> ClientKey {
        ClientKey([b; CLIENT_KEY_LEN])
    }

    /// Builds a DataReader object id from `raw`.
    fn reader_id(raw: u16) -> ObjectId {
        let kind = ObjectKind::from_u8(OBJK_DATAREADER).unwrap();
        ObjectId::new(raw, kind).unwrap()
    }

    /// Creates `reader` for client `k` by reference `"R"`; panics with
    /// `tag` if the agent refuses.
    fn create_reader(agent: &mut XrceAgent, k: ClientKey, reader: ObjectId, tag: &str) {
        agent
            .create_object(
                k,
                reader,
                ObjectVariant::ByReference("R".into()),
                CreationMode::default(),
            )
            .expect(tag);
    }

    /// Registers client `0x01` on `agent` and creates DataReader `0x010`
    /// for it; returns the (client key, reader id) pair.
    fn setup_reader(agent: &mut XrceAgent) -> (ClientKey, ObjectId) {
        let k = key(0x01);
        agent.register_client(k);
        let reader = reader_id(0x010);
        create_reader(agent, k, reader, "create");
        (k, reader)
    }

    #[test]
    fn agent_starts_empty() {
        assert_eq!(XrceAgent::new().client_count(), 0);
    }

    #[test]
    fn register_client_idempotent() {
        let mut agent = XrceAgent::new();
        let k = key(0x01);
        for _ in 0..2 {
            agent.register_client(k);
        }
        assert!(agent.has_client(k));
        assert_eq!(agent.client_count(), 1);
    }

    #[test]
    fn create_object_for_unknown_client_rejected() {
        let mut agent = XrceAgent::new();
        let result = agent.create_object(
            key(0x99),
            reader_id(0x010),
            ObjectVariant::ByReference("r".into()),
            CreationMode::default(),
        );
        assert_eq!(result.expect_err("unknown client"), AgentError::UnknownClient);
    }

    #[test]
    fn pull_sample_unknown_client_rejected() {
        let mut agent = XrceAgent::new();
        let result = agent.pull_sample(key(0x99), reader_id(0x010));
        assert_eq!(result.expect_err("unknown"), AgentError::UnknownClient);
    }

    #[test]
    fn client_pull_empty_returns_none() {
        let mut agent = XrceAgent::new();
        let k = key(0x01);
        agent.register_client(k);
        let pulled = agent.pull_sample(k, reader_id(0x010)).expect("ok");
        assert!(pulled.is_none());
    }

    #[test]
    fn after_submit_pull_returns_sample_in_fifo_order() {
        let mut agent = XrceAgent::new();
        let (k, reader) = setup_reader(&mut agent);
        agent
            .submit_sample(k, reader, alloc::vec![1, 2, 3])
            .expect("s1");
        agent
            .submit_sample(k, reader, alloc::vec![4, 5, 6])
            .expect("s2");
        assert_eq!(agent.pending_samples(k, reader), 2);

        let first = agent.pull_sample(k, reader).expect("ok1").expect("some1");
        assert_eq!(first, alloc::vec![1, 2, 3]);
        let second = agent.pull_sample(k, reader).expect("ok2").expect("some2");
        assert_eq!(second, alloc::vec![4, 5, 6]);
        assert!(agent.pull_sample(k, reader).expect("ok3").is_none());
    }

    #[test]
    fn submit_to_unknown_reader_rejected() {
        let mut agent = XrceAgent::new();
        let k = key(0x01);
        agent.register_client(k);
        let result = agent.submit_sample(k, reader_id(0x010), alloc::vec![0]);
        assert_eq!(result.expect_err("unknown reader"), AgentError::UnknownReader);
    }

    #[test]
    fn dos_cap_max_pending_samples_enforced() {
        let mut agent = XrceAgent::with_max_pending_samples(2);
        let (k, reader) = setup_reader(&mut agent);
        agent.submit_sample(k, reader, alloc::vec![1]).expect("s1");
        agent.submit_sample(k, reader, alloc::vec![2]).expect("s2");
        let result = agent.submit_sample(k, reader, alloc::vec![3]);
        assert_eq!(result.expect_err("full"), AgentError::QueueFull);
    }

    #[test]
    fn delete_object_removes_pull_queue() {
        let mut agent = XrceAgent::new();
        let (k, reader) = setup_reader(&mut agent);
        agent.submit_sample(k, reader, alloc::vec![1]).expect("s1");
        assert_eq!(agent.pending_samples(k, reader), 1);
        assert!(agent.delete_object(k, reader).expect("delete"));
        assert_eq!(agent.pending_samples(k, reader), 0);
    }

    /// Spec §8.5 — operation tracing.
    #[test]
    fn trace_sink_captures_create_delete_submit_pull() {
        use alloc::sync::Arc;
        use std::sync::Mutex;

        type Log = Arc<Mutex<Vec<TraceEvent>>>;

        /// Test sink that appends every event to a shared log.
        struct CaptureSink(Log);
        impl TraceSink for CaptureSink {
            fn record(&mut self, event: TraceEvent) {
                self.0.lock().unwrap().push(event);
            }
        }

        let log: Log = Arc::new(Mutex::new(Vec::new()));
        let mut agent = XrceAgent::new();
        agent.set_trace_sink(alloc::boxed::Box::new(CaptureSink(Arc::clone(&log))));
        let (k, reader) = setup_reader(&mut agent);
        agent.submit_sample(k, reader, alloc::vec![1]).expect("submit");
        agent.pull_sample(k, reader).expect("pull");
        agent.delete_object(k, reader).expect("delete");

        let events = log.lock().unwrap();
        let ops: Vec<&str> = events.iter().map(|e| e.operation.as_str()).collect();
        assert_eq!(ops, ["CREATE", "SUBMIT", "PULL", "DELETE"]);
        for event in events.iter() {
            assert_eq!(event.client_key, k.0);
            assert_eq!(event.object_id, reader.to_bytes());
        }
    }

    /// Spec §8.4.6 — CREATE(OBJK_APPLICATION) as a top-level container.
    #[test]
    fn create_application_object_via_objk_application() {
        use zerodds_xrce::object_kind::{OBJK_APPLICATION, ObjectKind};

        let mut agent = XrceAgent::new();
        let k = key(0x01);
        agent.register_client(k);
        let app_kind = ObjectKind::from_u8(OBJK_APPLICATION).unwrap();
        let app_oid = ObjectId::new(0x100, app_kind).unwrap();
        let outcome = agent
            .create_object(
                k,
                app_oid,
                ObjectVariant::ByXmlString("<application name=\"App1\"/>".into()),
                CreationMode::default(),
            )
            .expect("create app");
        assert_eq!(outcome, CreateOutcome::Created);
    }

    /// Spec §8.4.1 — logical actions performance.
    ///
    /// Unit test that measures the total wall-clock time of 1000
    /// CREATE+DELETE operation pairs and requires it to stay below 100 ms
    /// (described by the original author as the spec performance floor,
    /// with standard hardware expected to be far below it).
    ///
    /// NOTE(review): wall-clock assertions can be flaky on slow or heavily
    /// loaded machines — confirm this is acceptable for CI.
    #[test]
    fn agent_create_delete_latency_under_spec_floor() {
        use std::time::Instant;
        use zerodds_xrce::object_kind::{OBJK_PARTICIPANT, ObjectKind};

        let mut agent = XrceAgent::new();
        let k = key(0x01);
        agent.register_client(k);
        let kind = ObjectKind::from_u8(OBJK_PARTICIPANT).unwrap();
        let started = Instant::now();
        for raw in 0..1000u16 {
            let oid = ObjectId::new(raw, kind).unwrap();
            agent
                .create_object(
                    k,
                    oid,
                    ObjectVariant::ByReference("p".into()),
                    CreationMode::default(),
                )
                .expect("create");
            agent.delete_object(k, oid).expect("delete");
        }
        let millis = started.elapsed().as_millis();
        assert!(
            millis < 100,
            "1000 CREATE+DELETE-Ops dauerten {} ms (Spec-Floor 100ms)",
            millis
        );
    }

    #[test]
    fn multiple_clients_isolated() {
        let mut agent = XrceAgent::new();
        let k1 = key(0x01);
        let k2 = key(0x02);
        agent.register_client(k1);
        agent.register_client(k2);
        let reader = reader_id(0x010);
        create_reader(&mut agent, k1, reader, "c1");
        create_reader(&mut agent, k2, reader, "c2");
        agent.submit_sample(k1, reader, alloc::vec![100]).expect("s1");
        // Client 2 must NOT see the sample submitted for client 1.
        assert_eq!(agent.pending_samples(k2, reader), 0);
        assert_eq!(agent.pending_samples(k1, reader), 1);
    }
}