Skip to main content

zerodds_corba_dds_bridge/
sync.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Lifecycle-Sync — Spec-Verhalten-Mapping zwischen CORBA-POA-State
5//! und DDS-Discovery.
6//!
7//! Ein CORBA-Object wird mit `activate_object` aktiv und mit
8//! `deactivate_object` inaktiv. Auf der DDS-Seite entspricht das einem
9//! `register_instance` / `unregister_instance` (Spec OMG DDS 1.4
10//! §2.2.2.2.1). Der `LifecycleSync` propagiert diese Events
11//! bidirektional.
12
13use alloc::collections::BTreeMap;
14use alloc::vec::Vec;
15
16use std::sync::Mutex;
17
18/// Lifecycle-Event.
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub enum LifecycleEvent {
21    /// CORBA-Object wurde aktiviert; wir registrieren auf DDS-Seite
22    /// die zugehoerige Instance.
23    CorbaActivated {
24        /// Repository-ID.
25        repository_id: alloc::string::String,
26        /// Object-Key.
27        object_key: Vec<u8>,
28    },
29    /// CORBA-Object wurde deaktiviert; wir unregistern die Instance.
30    CorbaDeactivated {
31        /// Repository-ID.
32        repository_id: alloc::string::String,
33        /// Object-Key.
34        object_key: Vec<u8>,
35    },
36    /// DDS-Reader hat eine Instance entdeckt; wir koennen einen
37    /// CORBA-Forwarder-Servant aktivieren.
38    DdsInstanceDiscovered {
39        /// Topic.
40        topic: alloc::string::String,
41        /// Instance-Handle (Caller-Layer-Type).
42        instance_handle: u64,
43    },
44    /// DDS-Reader meldet `NOT_ALIVE_DISPOSED` — entsprechender
45    /// Forwarder-Servant wird deaktiviert.
46    DdsInstanceDisposed {
47        /// Topic.
48        topic: alloc::string::String,
49        /// Instance-Handle.
50        instance_handle: u64,
51    },
52}
53
54/// Lifecycle-Sync — sammelt Events und liefert sie an Caller.
55#[derive(Debug, Default)]
56pub struct LifecycleSync {
57    queue: Mutex<Vec<LifecycleEvent>>,
58    instance_handles: Mutex<BTreeMap<(alloc::string::String, u64), bool>>,
59}
60
61impl LifecycleSync {
62    /// Konstruktor.
63    #[must_use]
64    pub fn new() -> Self {
65        Self::default()
66    }
67
68    /// Notify-Hook fuer eingehende Events.
69    pub fn notify(&self, event: LifecycleEvent) {
70        if let LifecycleEvent::DdsInstanceDiscovered {
71            topic,
72            instance_handle,
73        } = &event
74        {
75            if let Ok(mut h) = self.instance_handles.lock() {
76                h.insert((topic.clone(), *instance_handle), true);
77            }
78        }
79        if let LifecycleEvent::DdsInstanceDisposed {
80            topic,
81            instance_handle,
82        } = &event
83        {
84            if let Ok(mut h) = self.instance_handles.lock() {
85                h.insert((topic.clone(), *instance_handle), false);
86            }
87        }
88        if let Ok(mut q) = self.queue.lock() {
89            q.push(event);
90        }
91    }
92
93    /// Drain alle pending Events.
94    #[must_use]
95    pub fn drain(&self) -> Vec<LifecycleEvent> {
96        self.queue
97            .lock()
98            .ok()
99            .map(|mut q| core::mem::take(&mut *q))
100            .unwrap_or_default()
101    }
102
103    /// Anzahl pending Events.
104    #[must_use]
105    pub fn pending(&self) -> usize {
106        self.queue.lock().map(|q| q.len()).unwrap_or(0)
107    }
108
109    /// Pruefe ob eine DDS-Instance momentan alive ist.
110    #[must_use]
111    pub fn is_dds_instance_alive(&self, topic: &str, handle: u64) -> bool {
112        self.instance_handles
113            .lock()
114            .map(|h| {
115                h.get(&(topic.to_string(), handle))
116                    .copied()
117                    .unwrap_or(false)
118            })
119            .unwrap_or(false)
120    }
121}
122
123#[cfg(test)]
124#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
125mod tests {
126    use super::*;
127
128    #[test]
129    fn notify_and_drain_round_trip() {
130        let s = LifecycleSync::new();
131        s.notify(LifecycleEvent::CorbaActivated {
132            repository_id: "IDL:demo/Echo:1.0".into(),
133            object_key: alloc::vec![1],
134        });
135        s.notify(LifecycleEvent::CorbaDeactivated {
136            repository_id: "IDL:demo/Echo:1.0".into(),
137            object_key: alloc::vec![1],
138        });
139        assert_eq!(s.pending(), 2);
140        let events = s.drain();
141        assert_eq!(events.len(), 2);
142        assert_eq!(s.pending(), 0);
143    }
144
145    #[test]
146    fn dds_instance_alive_tracking() {
147        let s = LifecycleSync::new();
148        s.notify(LifecycleEvent::DdsInstanceDiscovered {
149            topic: "T".into(),
150            instance_handle: 42,
151        });
152        assert!(s.is_dds_instance_alive("T", 42));
153        s.notify(LifecycleEvent::DdsInstanceDisposed {
154            topic: "T".into(),
155            instance_handle: 42,
156        });
157        assert!(!s.is_dds_instance_alive("T", 42));
158    }
159
160    #[test]
161    fn unknown_instance_handle_is_not_alive() {
162        let s = LifecycleSync::new();
163        assert!(!s.is_dds_instance_alive("T", 99));
164    }
165
166    #[test]
167    fn multiple_topics_tracked_independently() {
168        let s = LifecycleSync::new();
169        s.notify(LifecycleEvent::DdsInstanceDiscovered {
170            topic: "T1".into(),
171            instance_handle: 1,
172        });
173        s.notify(LifecycleEvent::DdsInstanceDiscovered {
174            topic: "T2".into(),
175            instance_handle: 1,
176        });
177        s.notify(LifecycleEvent::DdsInstanceDisposed {
178            topic: "T1".into(),
179            instance_handle: 1,
180        });
181        assert!(!s.is_dds_instance_alive("T1", 1));
182        assert!(s.is_dds_instance_alive("T2", 1));
183    }
184}