1use crate::DdsResult;
2use cyclonedds_rust_sys::*;
3use std::ffi::c_void;
4use std::sync::Arc;
5
6type DataAvailableCb = Arc<dyn Fn(i32) + Send + Sync>;
8type PublicationMatchedCb = Arc<dyn Fn(i32, dds_publication_matched_status_t) + Send + Sync>;
9type SubscriptionMatchedCb = Arc<dyn Fn(i32, dds_subscription_matched_status_t) + Send + Sync>;
10type LivelinessChangedCb = Arc<dyn Fn(i32, dds_liveliness_changed_status_t) + Send + Sync>;
11
12type InconsistentTopicCb = Arc<dyn Fn(i32, dds_inconsistent_topic_status_t) + Send + Sync>;
14type LivelinessLostCb = Arc<dyn Fn(i32, dds_liveliness_lost_status_t) + Send + Sync>;
15type OfferedDeadlineMissedCb = Arc<dyn Fn(i32, dds_offered_deadline_missed_status_t) + Send + Sync>;
16type OfferedIncompatibleQosCb =
17 Arc<dyn Fn(i32, dds_offered_incompatible_qos_status_t) + Send + Sync>;
18type DataOnReadersCb = Arc<dyn Fn(i32) + Send + Sync>;
19type SampleLostCb = Arc<dyn Fn(i32, dds_sample_lost_status_t) + Send + Sync>;
20type SampleRejectedCb = Arc<dyn Fn(i32, dds_sample_rejected_status_t) + Send + Sync>;
21type RequestedDeadlineMissedCb =
22 Arc<dyn Fn(i32, dds_requested_deadline_missed_status_t) + Send + Sync>;
23type RequestedIncompatibleQosCb =
24 Arc<dyn Fn(i32, dds_requested_incompatible_qos_status_t) + Send + Sync>;
25
26struct ListenerClosures {
27 on_data_available: Option<DataAvailableCb>,
29 on_publication_matched: Option<PublicationMatchedCb>,
30 on_subscription_matched: Option<SubscriptionMatchedCb>,
31 on_liveliness_changed: Option<LivelinessChangedCb>,
32 on_inconsistent_topic: Option<InconsistentTopicCb>,
34 on_liveliness_lost: Option<LivelinessLostCb>,
35 on_offered_deadline_missed: Option<OfferedDeadlineMissedCb>,
36 on_offered_incompatible_qos: Option<OfferedIncompatibleQosCb>,
37 on_data_on_readers: Option<DataOnReadersCb>,
38 on_sample_lost: Option<SampleLostCb>,
39 on_sample_rejected: Option<SampleRejectedCb>,
40 on_requested_deadline_missed: Option<RequestedDeadlineMissedCb>,
41 on_requested_incompatible_qos: Option<RequestedIncompatibleQosCb>,
42}
43
44pub struct Listener {
45 ptr: *mut dds_listener_t,
46 _closures: Box<ListenerClosures>,
47}
48
49unsafe extern "C" fn trampoline_data_available(reader: dds_entity_t, arg: *mut c_void) {
52 if arg.is_null() {
53 return;
54 }
55 let closures = &*(arg as *const ListenerClosures);
56 if let Some(ref cb) = closures.on_data_available {
57 cb(reader);
58 }
59}
60
61unsafe extern "C" fn trampoline_publication_matched(
62 writer: dds_entity_t,
63 status: dds_publication_matched_status_t,
64 arg: *mut c_void,
65) {
66 if arg.is_null() {
67 return;
68 }
69 let closures = &*(arg as *const ListenerClosures);
70 if let Some(ref cb) = closures.on_publication_matched {
71 cb(writer, status);
72 }
73}
74
75unsafe extern "C" fn trampoline_subscription_matched(
76 reader: dds_entity_t,
77 status: dds_subscription_matched_status_t,
78 arg: *mut c_void,
79) {
80 if arg.is_null() {
81 return;
82 }
83 let closures = &*(arg as *const ListenerClosures);
84 if let Some(ref cb) = closures.on_subscription_matched {
85 cb(reader, status);
86 }
87}
88
89unsafe extern "C" fn trampoline_liveliness_changed(
90 reader: dds_entity_t,
91 status: dds_liveliness_changed_status_t,
92 arg: *mut c_void,
93) {
94 if arg.is_null() {
95 return;
96 }
97 let closures = &*(arg as *const ListenerClosures);
98 if let Some(ref cb) = closures.on_liveliness_changed {
99 cb(reader, status);
100 }
101}
102
103unsafe extern "C" fn trampoline_inconsistent_topic(
106 topic: dds_entity_t,
107 status: dds_inconsistent_topic_status_t,
108 arg: *mut c_void,
109) {
110 if arg.is_null() {
111 return;
112 }
113 let closures = &*(arg as *const ListenerClosures);
114 if let Some(ref cb) = closures.on_inconsistent_topic {
115 cb(topic, status);
116 }
117}
118
119unsafe extern "C" fn trampoline_liveliness_lost(
120 writer: dds_entity_t,
121 status: dds_liveliness_lost_status_t,
122 arg: *mut c_void,
123) {
124 if arg.is_null() {
125 return;
126 }
127 let closures = &*(arg as *const ListenerClosures);
128 if let Some(ref cb) = closures.on_liveliness_lost {
129 cb(writer, status);
130 }
131}
132
133unsafe extern "C" fn trampoline_offered_deadline_missed(
134 writer: dds_entity_t,
135 status: dds_offered_deadline_missed_status_t,
136 arg: *mut c_void,
137) {
138 if arg.is_null() {
139 return;
140 }
141 let closures = &*(arg as *const ListenerClosures);
142 if let Some(ref cb) = closures.on_offered_deadline_missed {
143 cb(writer, status);
144 }
145}
146
147unsafe extern "C" fn trampoline_offered_incompatible_qos(
148 writer: dds_entity_t,
149 status: dds_offered_incompatible_qos_status_t,
150 arg: *mut c_void,
151) {
152 if arg.is_null() {
153 return;
154 }
155 let closures = &*(arg as *const ListenerClosures);
156 if let Some(ref cb) = closures.on_offered_incompatible_qos {
157 cb(writer, status);
158 }
159}
160
161unsafe extern "C" fn trampoline_data_on_readers(subscriber: dds_entity_t, arg: *mut c_void) {
162 if arg.is_null() {
163 return;
164 }
165 let closures = &*(arg as *const ListenerClosures);
166 if let Some(ref cb) = closures.on_data_on_readers {
167 cb(subscriber);
168 }
169}
170
171unsafe extern "C" fn trampoline_sample_lost(
172 reader: dds_entity_t,
173 status: dds_sample_lost_status_t,
174 arg: *mut c_void,
175) {
176 if arg.is_null() {
177 return;
178 }
179 let closures = &*(arg as *const ListenerClosures);
180 if let Some(ref cb) = closures.on_sample_lost {
181 cb(reader, status);
182 }
183}
184
185unsafe extern "C" fn trampoline_sample_rejected(
186 reader: dds_entity_t,
187 status: dds_sample_rejected_status_t,
188 arg: *mut c_void,
189) {
190 if arg.is_null() {
191 return;
192 }
193 let closures = &*(arg as *const ListenerClosures);
194 if let Some(ref cb) = closures.on_sample_rejected {
195 cb(reader, status);
196 }
197}
198
199unsafe extern "C" fn trampoline_requested_deadline_missed(
200 reader: dds_entity_t,
201 status: dds_requested_deadline_missed_status_t,
202 arg: *mut c_void,
203) {
204 if arg.is_null() {
205 return;
206 }
207 let closures = &*(arg as *const ListenerClosures);
208 if let Some(ref cb) = closures.on_requested_deadline_missed {
209 cb(reader, status);
210 }
211}
212
213unsafe extern "C" fn trampoline_requested_incompatible_qos(
214 reader: dds_entity_t,
215 status: dds_requested_incompatible_qos_status_t,
216 arg: *mut c_void,
217) {
218 if arg.is_null() {
219 return;
220 }
221 let closures = &*(arg as *const ListenerClosures);
222 if let Some(ref cb) = closures.on_requested_incompatible_qos {
223 cb(reader, status);
224 }
225}
226
227pub struct ListenerBuilder {
230 on_data_available: Option<DataAvailableCb>,
232 on_publication_matched: Option<PublicationMatchedCb>,
233 on_subscription_matched: Option<SubscriptionMatchedCb>,
234 on_liveliness_changed: Option<LivelinessChangedCb>,
235 on_inconsistent_topic: Option<InconsistentTopicCb>,
237 on_liveliness_lost: Option<LivelinessLostCb>,
238 on_offered_deadline_missed: Option<OfferedDeadlineMissedCb>,
239 on_offered_incompatible_qos: Option<OfferedIncompatibleQosCb>,
240 on_data_on_readers: Option<DataOnReadersCb>,
241 on_sample_lost: Option<SampleLostCb>,
242 on_sample_rejected: Option<SampleRejectedCb>,
243 on_requested_deadline_missed: Option<RequestedDeadlineMissedCb>,
244 on_requested_incompatible_qos: Option<RequestedIncompatibleQosCb>,
245}
246
247impl ListenerBuilder {
248 pub fn new() -> Self {
249 ListenerBuilder {
250 on_data_available: None,
251 on_publication_matched: None,
252 on_subscription_matched: None,
253 on_liveliness_changed: None,
254 on_inconsistent_topic: None,
255 on_liveliness_lost: None,
256 on_offered_deadline_missed: None,
257 on_offered_incompatible_qos: None,
258 on_data_on_readers: None,
259 on_sample_lost: None,
260 on_sample_rejected: None,
261 on_requested_deadline_missed: None,
262 on_requested_incompatible_qos: None,
263 }
264 }
265
266 pub fn on_data_available(mut self, cb: impl Fn(i32) + Send + Sync + 'static) -> Self {
269 self.on_data_available = Some(Arc::new(cb));
270 self
271 }
272
273 pub fn on_publication_matched(
274 mut self,
275 cb: impl Fn(i32, dds_publication_matched_status_t) + Send + Sync + 'static,
276 ) -> Self {
277 self.on_publication_matched = Some(Arc::new(cb));
278 self
279 }
280
281 pub fn on_subscription_matched(
282 mut self,
283 cb: impl Fn(i32, dds_subscription_matched_status_t) + Send + Sync + 'static,
284 ) -> Self {
285 self.on_subscription_matched = Some(Arc::new(cb));
286 self
287 }
288
289 pub fn on_liveliness_changed(
290 mut self,
291 cb: impl Fn(i32, dds_liveliness_changed_status_t) + Send + Sync + 'static,
292 ) -> Self {
293 self.on_liveliness_changed = Some(Arc::new(cb));
294 self
295 }
296
297 pub fn on_inconsistent_topic(
300 mut self,
301 cb: impl Fn(i32, dds_inconsistent_topic_status_t) + Send + Sync + 'static,
302 ) -> Self {
303 self.on_inconsistent_topic = Some(Arc::new(cb));
304 self
305 }
306
307 pub fn on_liveliness_lost(
308 mut self,
309 cb: impl Fn(i32, dds_liveliness_lost_status_t) + Send + Sync + 'static,
310 ) -> Self {
311 self.on_liveliness_lost = Some(Arc::new(cb));
312 self
313 }
314
315 pub fn on_offered_deadline_missed(
316 mut self,
317 cb: impl Fn(i32, dds_offered_deadline_missed_status_t) + Send + Sync + 'static,
318 ) -> Self {
319 self.on_offered_deadline_missed = Some(Arc::new(cb));
320 self
321 }
322
323 pub fn on_offered_incompatible_qos(
324 mut self,
325 cb: impl Fn(i32, dds_offered_incompatible_qos_status_t) + Send + Sync + 'static,
326 ) -> Self {
327 self.on_offered_incompatible_qos = Some(Arc::new(cb));
328 self
329 }
330
331 pub fn on_data_on_readers(mut self, cb: impl Fn(i32) + Send + Sync + 'static) -> Self {
332 self.on_data_on_readers = Some(Arc::new(cb));
333 self
334 }
335
336 pub fn on_sample_lost(
337 mut self,
338 cb: impl Fn(i32, dds_sample_lost_status_t) + Send + Sync + 'static,
339 ) -> Self {
340 self.on_sample_lost = Some(Arc::new(cb));
341 self
342 }
343
344 pub fn on_sample_rejected(
345 mut self,
346 cb: impl Fn(i32, dds_sample_rejected_status_t) + Send + Sync + 'static,
347 ) -> Self {
348 self.on_sample_rejected = Some(Arc::new(cb));
349 self
350 }
351
352 pub fn on_requested_deadline_missed(
353 mut self,
354 cb: impl Fn(i32, dds_requested_deadline_missed_status_t) + Send + Sync + 'static,
355 ) -> Self {
356 self.on_requested_deadline_missed = Some(Arc::new(cb));
357 self
358 }
359
360 pub fn on_requested_incompatible_qos(
361 mut self,
362 cb: impl Fn(i32, dds_requested_incompatible_qos_status_t) + Send + Sync + 'static,
363 ) -> Self {
364 self.on_requested_incompatible_qos = Some(Arc::new(cb));
365 self
366 }
367
368 pub fn build(self) -> DdsResult<Listener> {
371 let closures = Box::new(ListenerClosures {
372 on_data_available: self.on_data_available,
373 on_publication_matched: self.on_publication_matched,
374 on_subscription_matched: self.on_subscription_matched,
375 on_liveliness_changed: self.on_liveliness_changed,
376 on_inconsistent_topic: self.on_inconsistent_topic,
377 on_liveliness_lost: self.on_liveliness_lost,
378 on_offered_deadline_missed: self.on_offered_deadline_missed,
379 on_offered_incompatible_qos: self.on_offered_incompatible_qos,
380 on_data_on_readers: self.on_data_on_readers,
381 on_sample_lost: self.on_sample_lost,
382 on_sample_rejected: self.on_sample_rejected,
383 on_requested_deadline_missed: self.on_requested_deadline_missed,
384 on_requested_incompatible_qos: self.on_requested_incompatible_qos,
385 });
386
387 let arg_ptr = &*closures as *const ListenerClosures as *mut c_void;
388 let ptr = unsafe { dds_create_listener(arg_ptr) };
389 if ptr.is_null() {
390 return Err(crate::DdsError::OutOfResources);
391 }
392
393 unsafe {
394 if closures.on_data_available.is_some() {
396 dds_lset_data_available(ptr, Some(trampoline_data_available));
397 }
398 if closures.on_publication_matched.is_some() {
399 dds_lset_publication_matched(ptr, Some(trampoline_publication_matched));
400 }
401 if closures.on_subscription_matched.is_some() {
402 dds_lset_subscription_matched(ptr, Some(trampoline_subscription_matched));
403 }
404 if closures.on_liveliness_changed.is_some() {
405 dds_lset_liveliness_changed(ptr, Some(trampoline_liveliness_changed));
406 }
407 if closures.on_inconsistent_topic.is_some() {
409 dds_lset_inconsistent_topic(ptr, Some(trampoline_inconsistent_topic));
410 }
411 if closures.on_liveliness_lost.is_some() {
412 dds_lset_liveliness_lost(ptr, Some(trampoline_liveliness_lost));
413 }
414 if closures.on_offered_deadline_missed.is_some() {
415 dds_lset_offered_deadline_missed(ptr, Some(trampoline_offered_deadline_missed));
416 }
417 if closures.on_offered_incompatible_qos.is_some() {
418 dds_lset_offered_incompatible_qos(ptr, Some(trampoline_offered_incompatible_qos));
419 }
420 if closures.on_data_on_readers.is_some() {
421 dds_lset_data_on_readers(ptr, Some(trampoline_data_on_readers));
422 }
423 if closures.on_sample_lost.is_some() {
424 dds_lset_sample_lost(ptr, Some(trampoline_sample_lost));
425 }
426 if closures.on_sample_rejected.is_some() {
427 dds_lset_sample_rejected(ptr, Some(trampoline_sample_rejected));
428 }
429 if closures.on_requested_deadline_missed.is_some() {
430 dds_lset_requested_deadline_missed(ptr, Some(trampoline_requested_deadline_missed));
431 }
432 if closures.on_requested_incompatible_qos.is_some() {
433 dds_lset_requested_incompatible_qos(
434 ptr,
435 Some(trampoline_requested_incompatible_qos),
436 );
437 }
438 }
439
440 Ok(Listener {
441 ptr,
442 _closures: closures,
443 })
444 }
445}
446
447impl Default for ListenerBuilder {
448 fn default() -> Self {
449 Self::new()
450 }
451}
452
453impl Listener {
454 pub fn builder() -> ListenerBuilder {
455 ListenerBuilder::new()
456 }
457
458 pub fn as_ptr(&self) -> *mut dds_listener_t {
459 self.ptr
460 }
461}
462
463impl Drop for Listener {
464 fn drop(&mut self) {
465 unsafe {
466 dds_delete_listener(self.ptr);
467 }
468 }
469}