zerodds_dcps/entity.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Entity-Lifecycle (DDS DCPS 1.4 §2.2.2.1) — gemeinsame Basis fuer
4//! `DomainParticipant`, `Publisher`, `Subscriber`, `Topic`,
5//! `DataWriter`, `DataReader`.
6//!
7//! Spec-Verhalten (§2.2.2.1.1 Entity-Base):
8//! 1. **Lifecycle:** `create_*` → `enable()` → operational → `delete_*`.
9//! Pre-`enable()` ist die Entity inert (kein Discovery, keine Wire-
10//! Aktivitaet); set_qos auf alle Felder erlaubt.
11//! 2. **set_qos** post-`enable()`: nur Felder mit "Changeable=YES"
12//! duerfen geaendert werden — sonst [`DdsError::ImmutablePolicy`]
13//! (§2.2.3 Tab. 2.13 Spalte "Changeable").
14//! 3. **enable()** ist idempotent. Wenn das Parent-Entity (Participant)
15//! `entity_factory.autoenable_created_entities=TRUE` hat, werden
16//! Children bei Erzeugung automatisch enabled.
17//! 4. **StatusCondition** ist der Hook fuer den `WaitSet` —
18//! `trigger_value()` liefert true, wenn ein Status mit Bit in der
19//! `enabled_statuses`-Mask aktiv ist.
20//! 5. **InstanceHandle** ist eindeutig pro Entity (lokaler 64-Bit-Counter,
21//! nicht auf der Wire — siehe [`crate::instance_handle`]).
22//!
23//! .1 liefert die low-level [`Entity`]-Trait + [`EntityState`]
24//! als Building-Block. Die Implementierungen (Publisher, DataWriter, ...)
25//! halten ein `Arc<EntityState>` und delegieren die Trait-Methoden.
26
27extern crate alloc;
28
29use alloc::sync::Arc;
30use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
31
32use crate::error::{DdsError, Result};
33use crate::instance_handle::{InstanceHandle, InstanceHandleAllocator};
34
35/// Globaler Allocator fuer Entity-InstanceHandles. Eine Instanz
36/// pro Process — Handles sind innerhalb des Process eindeutig.
37static ENTITY_HANDLE_ALLOCATOR: InstanceHandleAllocator = InstanceHandleAllocator::new();
38
39/// `StatusMask` — 32-bit Bitmask der Status-Kinds (DCPS §2.2.4.1).
40/// Werte aus [`crate::psm_constants::status`].
41pub type StatusMask = u32;
42
43/// Atomic-Container fuer den Entity-Lifecycle.
44#[derive(Debug)]
45pub struct EntityState {
46 enabled: AtomicBool,
47 /// `true` nach erfolgreichem `delete_*()` — Spec §2.2.1.1.5
48 /// (RC ALREADY_DELETED). Public-Ops MUESSEN
49 /// [`Self::check_not_deleted`] vor jedem Effekt aufrufen.
50 deleted: AtomicBool,
51 instance_handle: InstanceHandle,
52 /// Bitmask der **seit letztem `get_status_changes()` Read**
53 /// geaenderten Status-Bits.
54 status_changes: AtomicU32,
55 /// Bitmask der vom Listener abgedeckten Status-Bits (zur
56 /// Bubble-Up-Logik in ).
57 listener_mask: AtomicU32,
58}
59
60impl EntityState {
61 /// Neuer State, initial **disabled** (Spec-Default fuer alle
62 /// Entities ausser DomainParticipantFactory).
63 #[must_use]
64 pub fn new() -> Arc<Self> {
65 Arc::new(Self {
66 enabled: AtomicBool::new(false),
67 deleted: AtomicBool::new(false),
68 instance_handle: ENTITY_HANDLE_ALLOCATOR.allocate(),
69 status_changes: AtomicU32::new(0),
70 listener_mask: AtomicU32::new(0),
71 })
72 }
73
74 /// Neuer State, **bereits enabled** — fuer DomainParticipantFactory
75 /// (Spec §2.2.2.1.4: Factory ist immer enabled).
76 #[must_use]
77 pub fn new_enabled() -> Arc<Self> {
78 Arc::new(Self {
79 enabled: AtomicBool::new(true),
80 deleted: AtomicBool::new(false),
81 instance_handle: ENTITY_HANDLE_ALLOCATOR.allocate(),
82 status_changes: AtomicU32::new(0),
83 listener_mask: AtomicU32::new(0),
84 })
85 }
86
87 /// True wenn die Entity enabled ist.
88 #[must_use]
89 pub fn is_enabled(&self) -> bool {
90 self.enabled.load(Ordering::Acquire)
91 }
92
93 /// Setzt enabled=true (idempotent). Liefert `true` wenn der Aufruf
94 /// die Transition false→true vollzogen hat (fuer Cascade-Logik).
95 pub fn enable(&self) -> bool {
96 !self.enabled.swap(true, Ordering::AcqRel)
97 }
98
99 /// Lokaler 64-Bit-Identifier dieser Entity.
100 #[must_use]
101 pub fn instance_handle(&self) -> InstanceHandle {
102 self.instance_handle
103 }
104
105 /// Aktuelle Status-Changes-Mask. Lesen leert NICHT — der Caller
106 /// nimmt entscheidende Bits selbst zurueck via
107 /// [`Self::clear_status_changes`].
108 #[must_use]
109 pub fn status_changes(&self) -> StatusMask {
110 self.status_changes.load(Ordering::Acquire)
111 }
112
113 /// Setzt zusaetzliche Status-Bits (vom Discovery/Runtime-Layer
114 /// gerufen, wenn ein Status-Event eintrifft).
115 pub fn set_status_bits(&self, bits: StatusMask) {
116 self.status_changes.fetch_or(bits, Ordering::AcqRel);
117 }
118
119 /// Loescht die uebergebenen Bits aus der Status-Changes-Mask
120 /// (nach Caller's Read).
121 pub fn clear_status_changes(&self, bits: StatusMask) {
122 self.status_changes.fetch_and(!bits, Ordering::AcqRel);
123 }
124
125 /// Listener-Maske setzen — beeinflusst Bubble-Up.
126 pub fn set_listener_mask(&self, mask: StatusMask) {
127 self.listener_mask.store(mask, Ordering::Release);
128 }
129
130 /// Listener-Maske lesen.
131 #[must_use]
132 pub fn listener_mask(&self) -> StatusMask {
133 self.listener_mask.load(Ordering::Acquire)
134 }
135
136 /// `true` wenn die Entity bereits `delete_*` durchlaufen hat.
137 #[must_use]
138 pub fn is_deleted(&self) -> bool {
139 self.deleted.load(Ordering::Acquire)
140 }
141
142 /// Markiert die Entity als geloescht (idempotent). Liefert `true`
143 /// beim ersten Aufruf (Transition false→true), `false` bei
144 /// nachfolgenden Aufrufen.
145 pub fn mark_deleted(&self) -> bool {
146 !self.deleted.swap(true, Ordering::AcqRel)
147 }
148
149 /// Guard-Helper fuer Public-Ops: liefert `Err(AlreadyDeleted)`
150 /// wenn die Entity bereits geloescht wurde, sonst `Ok(())`.
151 /// Nutzungs-Pattern:
152 /// ```ignore
153 /// pub fn write(&self, sample: T) -> Result<()> {
154 /// self.entity_state().check_not_deleted()?;
155 /// // ... eigentliche Logik ...
156 /// }
157 /// ```
158 ///
159 /// # Errors
160 /// `DdsError::AlreadyDeleted` wenn `is_deleted() == true`.
161 pub fn check_not_deleted(&self) -> crate::error::Result<()> {
162 if self.is_deleted() {
163 Err(crate::error::DdsError::AlreadyDeleted)
164 } else {
165 Ok(())
166 }
167 }
168
169 /// Guard-Helper: liefert `Err(NotEnabled)` wenn die Entity nicht
170 /// enabled ist (Spec §2.2.2.1.1.7 RC NOT_ENABLED).
171 ///
172 /// # Errors
173 /// `DdsError::NotEnabled` wenn `is_enabled() == false`.
174 pub fn check_enabled(&self) -> crate::error::Result<()> {
175 if !self.is_enabled() {
176 Err(crate::error::DdsError::NotEnabled)
177 } else {
178 Ok(())
179 }
180 }
181}
182
183/// `StatusCondition` — Spec §2.2.2.1.6, der primaere WaitSet-Hook.
184///
185/// In minimal: traegt eine `enabled_statuses`-Mask + delegiert
186/// `trigger_value()` an [`EntityState::status_changes`]. In wird
187/// das Object voll integriert (set_enabled_statuses, attach to WaitSet).
188#[derive(Debug, Clone)]
189pub struct StatusCondition {
190 state: Arc<EntityState>,
191 enabled_statuses: Arc<AtomicU32>,
192}
193
194impl StatusCondition {
195 /// Konstruktor (intern; vom Entity erzeugt).
196 #[must_use]
197 pub fn new(state: Arc<EntityState>) -> Self {
198 Self {
199 state,
200 enabled_statuses: Arc::new(AtomicU32::new(crate::psm_constants::status::ANY)),
201 }
202 }
203
204 /// Setzt die `enabled_statuses`-Mask. Spec §2.2.2.1.6.
205 pub fn set_enabled_statuses(&self, mask: StatusMask) {
206 self.enabled_statuses.store(mask, Ordering::Release);
207 }
208
209 /// Liefert die aktuelle `enabled_statuses`-Mask.
210 #[must_use]
211 pub fn enabled_statuses(&self) -> StatusMask {
212 self.enabled_statuses.load(Ordering::Acquire)
213 }
214
215 /// True wenn (status_changes & enabled_statuses) != 0.
216 /// Spec §2.2.2.1.6 trigger_value.
217 #[must_use]
218 pub fn trigger_value(&self) -> bool {
219 let enabled = self.enabled_statuses.load(Ordering::Acquire);
220 let changes = self.state.status_changes();
221 (enabled & changes) != 0
222 }
223
224 /// Liefert das `InstanceHandle` der Entity, an die diese
225 /// StatusCondition gebunden ist. Spec DCPS 1.4 §2.2.2.1.9
226 /// `get_entity()` — die Rust-API liefert den Handle anstelle eines
227 /// `&dyn Entity`-Pointers, weil dieselbe `Arc<EntityState>` von
228 /// mehreren Entity-Wrappern (DataReader/DataWriter/...) gehalten
229 /// werden kann; der Handle ist die einzige Identitaet, die ueber
230 /// die Wrapper-Granularitaet hinaus stabil ist.
231 #[must_use]
232 pub fn get_entity_handle(&self) -> InstanceHandle {
233 self.state.instance_handle()
234 }
235
236 /// Liefert eine geteilte Referenz auf den zugrunde liegenden
237 /// `EntityState` (Spec §2.2.2.1.9 — direkter Pfad). Erlaubt
238 /// Caller-Code, Status-Mask und Lifecycle-Flags der Entity zu
239 /// inspizieren, ohne durch den Entity-Wrapper gehen zu muessen.
240 #[must_use]
241 pub fn entity_state(&self) -> &Arc<EntityState> {
242 &self.state
243 }
244}
245
246/// Entity-Trait — gemeinsame Lifecycle-API der 6 Entity-Typen
247/// (DCPS §2.2.2.1).
248///
249/// Nicht-blocking, Send+Sync — alle Methoden delegieren auf
250/// `Arc<EntityState>`.
251pub trait Entity {
252 /// QoS-Typ fuer diese Entity (z.B. `DomainParticipantQos`,
253 /// `DataWriterQos`, ...).
254 type Qos: Clone;
255
256 /// Liefert die aktuelle QoS (clone).
257 /// Spec §2.2.2.1.2 `get_qos`.
258 fn get_qos(&self) -> Self::Qos;
259
260 /// Aendert QoS. Pre-enable: alles erlaubt. Post-enable: nur
261 /// Felder mit "Changeable=YES" — sonst `ImmutablePolicy`-Error.
262 /// Spec §2.2.2.1.2 `set_qos`.
263 ///
264 /// # Errors
265 /// * [`DdsError::ImmutablePolicy`] wenn ein immutables Feld nach
266 /// `enable()` geaendert werden soll.
267 /// * [`DdsError::InconsistentPolicy`] wenn die neue QoS-Kombination
268 /// inkonsistent ist.
269 fn set_qos(&self, qos: Self::Qos) -> Result<()>;
270
271 /// Enabled die Entity (idempotent). Spec §2.2.2.1.4 `enable`.
272 ///
273 /// # Errors
274 /// [`DdsError::PreconditionNotMet`] wenn das Parent-Entity nicht
275 /// enabled ist (Spec: Children koennen nicht vor Parent enabled
276 /// werden — ausser Factory selbst).
277 fn enable(&self) -> Result<()>;
278
279 /// True wenn die Entity bereits enabled ist.
280 fn is_enabled(&self) -> bool {
281 self.entity_state().is_enabled()
282 }
283
284 /// `StatusCondition` dieser Entity.
285 /// Spec §2.2.2.1.6 `get_status_condition`.
286 fn get_status_condition(&self) -> StatusCondition {
287 StatusCondition::new(self.entity_state())
288 }
289
290 /// Bitmask der Status-Kinds, die seit letztem Read geaendert haben.
291 /// Spec §2.2.2.1.5 `get_status_changes`.
292 fn get_status_changes(&self) -> StatusMask {
293 self.entity_state().status_changes()
294 }
295
296 /// Lokaler 64-Bit-Identifier. Spec §2.2.2.1.7 `get_instance_handle`.
297 fn get_instance_handle(&self) -> InstanceHandle {
298 self.entity_state().instance_handle()
299 }
300
301 /// Interner Accessor — jede Impl liefert ihren `Arc<EntityState>`.
302 fn entity_state(&self) -> Arc<EntityState>;
303}
304
305/// Hilfsfunktion: validiert dass ein QoS-Feld `policy_name` post-enable
306/// nicht geaendert wurde. Verwendung in `set_qos`-Impls:
307///
308/// ```ignore
309/// if state.is_enabled() && new.durability != old.durability {
310/// return Err(immutable_if_enabled("DURABILITY"));
311/// }
312/// ```
313#[must_use]
314pub fn immutable_if_enabled(policy_name: &'static str) -> DdsError {
315 DdsError::ImmutablePolicy {
316 policy: policy_name,
317 }
318}
319
320#[cfg(test)]
321#[allow(clippy::expect_used)]
322mod tests {
323 use super::*;
324
325 #[test]
326 fn entity_state_starts_disabled() {
327 let s = EntityState::new();
328 assert!(!s.is_enabled());
329 }
330
331 #[test]
332 fn entity_state_factory_starts_enabled() {
333 let s = EntityState::new_enabled();
334 assert!(s.is_enabled());
335 }
336
337 #[test]
338 fn enable_is_idempotent_and_reports_first_transition() {
339 let s = EntityState::new();
340 assert!(s.enable(), "first enable returns true");
341 assert!(!s.enable(), "second enable returns false");
342 assert!(s.is_enabled());
343 }
344
345 #[test]
346 fn instance_handles_are_unique_per_entity() {
347 let a = EntityState::new();
348 let b = EntityState::new();
349 assert_ne!(a.instance_handle(), b.instance_handle());
350 }
351
352 #[test]
353 fn status_bits_or_in_and_clear() {
354 let s = EntityState::new();
355 s.set_status_bits(0b0011);
356 s.set_status_bits(0b1100);
357 assert_eq!(s.status_changes(), 0b1111);
358 s.clear_status_changes(0b0101);
359 assert_eq!(s.status_changes(), 0b1010);
360 }
361
362 #[test]
363 fn status_condition_trigger_value() {
364 let s = EntityState::new();
365 let cond = StatusCondition::new(s.clone());
366 cond.set_enabled_statuses(0b1010);
367
368 // Keine Status-Aenderung → kein Trigger.
369 assert!(!cond.trigger_value());
370
371 // Status mit nicht-enabled Bit → kein Trigger.
372 s.set_status_bits(0b0001);
373 assert!(!cond.trigger_value());
374
375 // Status mit enabled Bit → Trigger.
376 s.set_status_bits(0b0010);
377 assert!(cond.trigger_value());
378 }
379
380 #[test]
381 fn listener_mask_is_round_tripped() {
382 let s = EntityState::new();
383 s.set_listener_mask(0xABCD);
384 assert_eq!(s.listener_mask(), 0xABCD);
385 }
386
387 #[test]
388 fn immutable_if_enabled_returns_correct_error() {
389 let e = immutable_if_enabled("DURABILITY");
390 assert!(matches!(
391 e,
392 DdsError::ImmutablePolicy {
393 policy: "DURABILITY"
394 }
395 ));
396 }
397
398 // ---- §2.2.1.1.5 ALREADY_DELETED ----
399
400 #[test]
401 fn check_not_deleted_passes_for_fresh_entity() {
402 let s = EntityState::new();
403 assert!(s.check_not_deleted().is_ok());
404 assert!(!s.is_deleted());
405 }
406
407 #[test]
408 fn check_not_deleted_returns_already_deleted_after_mark() {
409 let s = EntityState::new();
410 let first = s.mark_deleted();
411 assert!(first, "first mark_deleted should return true");
412 assert!(s.is_deleted());
413 let res = s.check_not_deleted();
414 assert!(matches!(res, Err(DdsError::AlreadyDeleted)));
415 }
416
417 #[test]
418 fn mark_deleted_is_idempotent() {
419 let s = EntityState::new();
420 assert!(s.mark_deleted());
421 // Second call returns false (already-deleted state).
422 assert!(!s.mark_deleted());
423 assert!(s.is_deleted());
424 }
425
426 // ---- §2.2.1.1.7 NOT_ENABLED ----
427
428 #[test]
429 fn check_enabled_returns_not_enabled_for_disabled_entity() {
430 let s = EntityState::new();
431 assert!(!s.is_enabled());
432 let res = s.check_enabled();
433 assert!(matches!(res, Err(DdsError::NotEnabled)));
434 }
435
436 #[test]
437 fn check_enabled_passes_after_enable() {
438 let s = EntityState::new();
439 let _ = s.enable();
440 assert!(s.check_enabled().is_ok());
441 }
442
443 #[test]
444 fn check_enabled_passes_for_factory_entity() {
445 // DomainParticipantFactory ist immer enabled (Spec §2.2.2.1.4).
446 let s = EntityState::new_enabled();
447 assert!(s.check_enabled().is_ok());
448 }
449
450 // ---- §2.2.2.1.9 StatusCondition.get_entity ----
451
452 #[test]
453 fn status_condition_get_entity_handle_matches_owner_state() {
454 let state = EntityState::new();
455 let cond = StatusCondition::new(state.clone());
456 // Handle der Condition == Handle der Entity, an die sie gebunden ist.
457 assert_eq!(cond.get_entity_handle(), state.instance_handle());
458 }
459
460 #[test]
461 fn status_condition_get_entity_handle_unique_per_entity() {
462 // Zwei verschiedene Entities → zwei verschiedene Handles ueber
463 // ihre StatusConditions.
464 let s1 = EntityState::new();
465 let s2 = EntityState::new();
466 let c1 = StatusCondition::new(s1);
467 let c2 = StatusCondition::new(s2);
468 assert_ne!(c1.get_entity_handle(), c2.get_entity_handle());
469 }
470
471 #[test]
472 fn status_condition_entity_state_returns_same_arc() {
473 let state = EntityState::new();
474 let cond = StatusCondition::new(state.clone());
475 // Identitaet via Arc::ptr_eq — die Condition haelt genau diesen Arc,
476 // keinen Clone der Inner.
477 assert!(Arc::ptr_eq(&state, cond.entity_state()));
478 }
479
480 #[test]
481 fn status_condition_entity_state_reflects_lifecycle_changes() {
482 // get_entity-Pfad muss Lifecycle-Aenderungen sichtbar machen
483 // (z.B. enable, mark_deleted), damit Caller den State direkt
484 // inspizieren koennen.
485 let state = EntityState::new();
486 let cond = StatusCondition::new(state.clone());
487 assert!(!cond.entity_state().is_enabled());
488 let _ = state.enable();
489 assert!(cond.entity_state().is_enabled());
490 let _ = state.mark_deleted();
491 assert!(cond.entity_state().is_deleted());
492 }
493}