zerodds_corba_dds_bridge/
sync.rs1use alloc::collections::BTreeMap;
14use alloc::vec::Vec;
15
16use std::sync::Mutex;
17
18#[derive(Debug, Clone, PartialEq, Eq)]
20pub enum LifecycleEvent {
21 CorbaActivated {
24 repository_id: alloc::string::String,
26 object_key: Vec<u8>,
28 },
29 CorbaDeactivated {
31 repository_id: alloc::string::String,
33 object_key: Vec<u8>,
35 },
36 DdsInstanceDiscovered {
39 topic: alloc::string::String,
41 instance_handle: u64,
43 },
44 DdsInstanceDisposed {
47 topic: alloc::string::String,
49 instance_handle: u64,
51 },
52}
53
54#[derive(Debug, Default)]
56pub struct LifecycleSync {
57 queue: Mutex<Vec<LifecycleEvent>>,
58 instance_handles: Mutex<BTreeMap<(alloc::string::String, u64), bool>>,
59}
60
61impl LifecycleSync {
62 #[must_use]
64 pub fn new() -> Self {
65 Self::default()
66 }
67
68 pub fn notify(&self, event: LifecycleEvent) {
70 if let LifecycleEvent::DdsInstanceDiscovered {
71 topic,
72 instance_handle,
73 } = &event
74 {
75 if let Ok(mut h) = self.instance_handles.lock() {
76 h.insert((topic.clone(), *instance_handle), true);
77 }
78 }
79 if let LifecycleEvent::DdsInstanceDisposed {
80 topic,
81 instance_handle,
82 } = &event
83 {
84 if let Ok(mut h) = self.instance_handles.lock() {
85 h.insert((topic.clone(), *instance_handle), false);
86 }
87 }
88 if let Ok(mut q) = self.queue.lock() {
89 q.push(event);
90 }
91 }
92
93 #[must_use]
95 pub fn drain(&self) -> Vec<LifecycleEvent> {
96 self.queue
97 .lock()
98 .ok()
99 .map(|mut q| core::mem::take(&mut *q))
100 .unwrap_or_default()
101 }
102
103 #[must_use]
105 pub fn pending(&self) -> usize {
106 self.queue.lock().map(|q| q.len()).unwrap_or(0)
107 }
108
109 #[must_use]
111 pub fn is_dds_instance_alive(&self, topic: &str, handle: u64) -> bool {
112 self.instance_handles
113 .lock()
114 .map(|h| {
115 h.get(&(topic.to_string(), handle))
116 .copied()
117 .unwrap_or(false)
118 })
119 .unwrap_or(false)
120 }
121}
122
123#[cfg(test)]
124#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
125mod tests {
126 use super::*;
127
128 #[test]
129 fn notify_and_drain_round_trip() {
130 let s = LifecycleSync::new();
131 s.notify(LifecycleEvent::CorbaActivated {
132 repository_id: "IDL:demo/Echo:1.0".into(),
133 object_key: alloc::vec![1],
134 });
135 s.notify(LifecycleEvent::CorbaDeactivated {
136 repository_id: "IDL:demo/Echo:1.0".into(),
137 object_key: alloc::vec![1],
138 });
139 assert_eq!(s.pending(), 2);
140 let events = s.drain();
141 assert_eq!(events.len(), 2);
142 assert_eq!(s.pending(), 0);
143 }
144
145 #[test]
146 fn dds_instance_alive_tracking() {
147 let s = LifecycleSync::new();
148 s.notify(LifecycleEvent::DdsInstanceDiscovered {
149 topic: "T".into(),
150 instance_handle: 42,
151 });
152 assert!(s.is_dds_instance_alive("T", 42));
153 s.notify(LifecycleEvent::DdsInstanceDisposed {
154 topic: "T".into(),
155 instance_handle: 42,
156 });
157 assert!(!s.is_dds_instance_alive("T", 42));
158 }
159
160 #[test]
161 fn unknown_instance_handle_is_not_alive() {
162 let s = LifecycleSync::new();
163 assert!(!s.is_dds_instance_alive("T", 99));
164 }
165
166 #[test]
167 fn multiple_topics_tracked_independently() {
168 let s = LifecycleSync::new();
169 s.notify(LifecycleEvent::DdsInstanceDiscovered {
170 topic: "T1".into(),
171 instance_handle: 1,
172 });
173 s.notify(LifecycleEvent::DdsInstanceDiscovered {
174 topic: "T2".into(),
175 instance_handle: 1,
176 });
177 s.notify(LifecycleEvent::DdsInstanceDisposed {
178 topic: "T1".into(),
179 instance_handle: 1,
180 });
181 assert!(!s.is_dds_instance_alive("T1", 1));
182 assert!(s.is_dds_instance_alive("T2", 1));
183 }
184}