Skip to main content

cyclonedds/
listener.rs

1use crate::DdsResult;
2use cyclonedds_rust_sys::*;
3use std::ffi::c_void;
4use std::sync::Arc;
5
6// Existing callback types (4)
7type DataAvailableCb = Arc<dyn Fn(i32) + Send + Sync>;
8type PublicationMatchedCb = Arc<dyn Fn(i32, dds_publication_matched_status_t) + Send + Sync>;
9type SubscriptionMatchedCb = Arc<dyn Fn(i32, dds_subscription_matched_status_t) + Send + Sync>;
10type LivelinessChangedCb = Arc<dyn Fn(i32, dds_liveliness_changed_status_t) + Send + Sync>;
11
// New callback types (9: 8 with status, 1 without)
13type InconsistentTopicCb = Arc<dyn Fn(i32, dds_inconsistent_topic_status_t) + Send + Sync>;
14type LivelinessLostCb = Arc<dyn Fn(i32, dds_liveliness_lost_status_t) + Send + Sync>;
15type OfferedDeadlineMissedCb = Arc<dyn Fn(i32, dds_offered_deadline_missed_status_t) + Send + Sync>;
16type OfferedIncompatibleQosCb =
17    Arc<dyn Fn(i32, dds_offered_incompatible_qos_status_t) + Send + Sync>;
18type DataOnReadersCb = Arc<dyn Fn(i32) + Send + Sync>;
19type SampleLostCb = Arc<dyn Fn(i32, dds_sample_lost_status_t) + Send + Sync>;
20type SampleRejectedCb = Arc<dyn Fn(i32, dds_sample_rejected_status_t) + Send + Sync>;
21type RequestedDeadlineMissedCb =
22    Arc<dyn Fn(i32, dds_requested_deadline_missed_status_t) + Send + Sync>;
23type RequestedIncompatibleQosCb =
24    Arc<dyn Fn(i32, dds_requested_incompatible_qos_status_t) + Send + Sync>;
25
26struct ListenerClosures {
27    // Existing
28    on_data_available: Option<DataAvailableCb>,
29    on_publication_matched: Option<PublicationMatchedCb>,
30    on_subscription_matched: Option<SubscriptionMatchedCb>,
31    on_liveliness_changed: Option<LivelinessChangedCb>,
32    // New
33    on_inconsistent_topic: Option<InconsistentTopicCb>,
34    on_liveliness_lost: Option<LivelinessLostCb>,
35    on_offered_deadline_missed: Option<OfferedDeadlineMissedCb>,
36    on_offered_incompatible_qos: Option<OfferedIncompatibleQosCb>,
37    on_data_on_readers: Option<DataOnReadersCb>,
38    on_sample_lost: Option<SampleLostCb>,
39    on_sample_rejected: Option<SampleRejectedCb>,
40    on_requested_deadline_missed: Option<RequestedDeadlineMissedCb>,
41    on_requested_incompatible_qos: Option<RequestedIncompatibleQosCb>,
42}
43
44pub struct Listener {
45    ptr: *mut dds_listener_t,
46    _closures: Box<ListenerClosures>,
47}
48
49// ── Existing trampolines ────────────────────────────────────────────
50
51unsafe extern "C" fn trampoline_data_available(reader: dds_entity_t, arg: *mut c_void) {
52    if arg.is_null() {
53        return;
54    }
55    let closures = &*(arg as *const ListenerClosures);
56    if let Some(ref cb) = closures.on_data_available {
57        cb(reader);
58    }
59}
60
61unsafe extern "C" fn trampoline_publication_matched(
62    writer: dds_entity_t,
63    status: dds_publication_matched_status_t,
64    arg: *mut c_void,
65) {
66    if arg.is_null() {
67        return;
68    }
69    let closures = &*(arg as *const ListenerClosures);
70    if let Some(ref cb) = closures.on_publication_matched {
71        cb(writer, status);
72    }
73}
74
75unsafe extern "C" fn trampoline_subscription_matched(
76    reader: dds_entity_t,
77    status: dds_subscription_matched_status_t,
78    arg: *mut c_void,
79) {
80    if arg.is_null() {
81        return;
82    }
83    let closures = &*(arg as *const ListenerClosures);
84    if let Some(ref cb) = closures.on_subscription_matched {
85        cb(reader, status);
86    }
87}
88
89unsafe extern "C" fn trampoline_liveliness_changed(
90    reader: dds_entity_t,
91    status: dds_liveliness_changed_status_t,
92    arg: *mut c_void,
93) {
94    if arg.is_null() {
95        return;
96    }
97    let closures = &*(arg as *const ListenerClosures);
98    if let Some(ref cb) = closures.on_liveliness_changed {
99        cb(reader, status);
100    }
101}
102
103// ── New trampolines ─────────────────────────────────────────────────
104
105unsafe extern "C" fn trampoline_inconsistent_topic(
106    topic: dds_entity_t,
107    status: dds_inconsistent_topic_status_t,
108    arg: *mut c_void,
109) {
110    if arg.is_null() {
111        return;
112    }
113    let closures = &*(arg as *const ListenerClosures);
114    if let Some(ref cb) = closures.on_inconsistent_topic {
115        cb(topic, status);
116    }
117}
118
119unsafe extern "C" fn trampoline_liveliness_lost(
120    writer: dds_entity_t,
121    status: dds_liveliness_lost_status_t,
122    arg: *mut c_void,
123) {
124    if arg.is_null() {
125        return;
126    }
127    let closures = &*(arg as *const ListenerClosures);
128    if let Some(ref cb) = closures.on_liveliness_lost {
129        cb(writer, status);
130    }
131}
132
133unsafe extern "C" fn trampoline_offered_deadline_missed(
134    writer: dds_entity_t,
135    status: dds_offered_deadline_missed_status_t,
136    arg: *mut c_void,
137) {
138    if arg.is_null() {
139        return;
140    }
141    let closures = &*(arg as *const ListenerClosures);
142    if let Some(ref cb) = closures.on_offered_deadline_missed {
143        cb(writer, status);
144    }
145}
146
147unsafe extern "C" fn trampoline_offered_incompatible_qos(
148    writer: dds_entity_t,
149    status: dds_offered_incompatible_qos_status_t,
150    arg: *mut c_void,
151) {
152    if arg.is_null() {
153        return;
154    }
155    let closures = &*(arg as *const ListenerClosures);
156    if let Some(ref cb) = closures.on_offered_incompatible_qos {
157        cb(writer, status);
158    }
159}
160
161unsafe extern "C" fn trampoline_data_on_readers(subscriber: dds_entity_t, arg: *mut c_void) {
162    if arg.is_null() {
163        return;
164    }
165    let closures = &*(arg as *const ListenerClosures);
166    if let Some(ref cb) = closures.on_data_on_readers {
167        cb(subscriber);
168    }
169}
170
171unsafe extern "C" fn trampoline_sample_lost(
172    reader: dds_entity_t,
173    status: dds_sample_lost_status_t,
174    arg: *mut c_void,
175) {
176    if arg.is_null() {
177        return;
178    }
179    let closures = &*(arg as *const ListenerClosures);
180    if let Some(ref cb) = closures.on_sample_lost {
181        cb(reader, status);
182    }
183}
184
185unsafe extern "C" fn trampoline_sample_rejected(
186    reader: dds_entity_t,
187    status: dds_sample_rejected_status_t,
188    arg: *mut c_void,
189) {
190    if arg.is_null() {
191        return;
192    }
193    let closures = &*(arg as *const ListenerClosures);
194    if let Some(ref cb) = closures.on_sample_rejected {
195        cb(reader, status);
196    }
197}
198
199unsafe extern "C" fn trampoline_requested_deadline_missed(
200    reader: dds_entity_t,
201    status: dds_requested_deadline_missed_status_t,
202    arg: *mut c_void,
203) {
204    if arg.is_null() {
205        return;
206    }
207    let closures = &*(arg as *const ListenerClosures);
208    if let Some(ref cb) = closures.on_requested_deadline_missed {
209        cb(reader, status);
210    }
211}
212
213unsafe extern "C" fn trampoline_requested_incompatible_qos(
214    reader: dds_entity_t,
215    status: dds_requested_incompatible_qos_status_t,
216    arg: *mut c_void,
217) {
218    if arg.is_null() {
219        return;
220    }
221    let closures = &*(arg as *const ListenerClosures);
222    if let Some(ref cb) = closures.on_requested_incompatible_qos {
223        cb(reader, status);
224    }
225}
226
227// ── ListenerBuilder ─────────────────────────────────────────────────
228
229pub struct ListenerBuilder {
230    // Existing
231    on_data_available: Option<DataAvailableCb>,
232    on_publication_matched: Option<PublicationMatchedCb>,
233    on_subscription_matched: Option<SubscriptionMatchedCb>,
234    on_liveliness_changed: Option<LivelinessChangedCb>,
235    // New
236    on_inconsistent_topic: Option<InconsistentTopicCb>,
237    on_liveliness_lost: Option<LivelinessLostCb>,
238    on_offered_deadline_missed: Option<OfferedDeadlineMissedCb>,
239    on_offered_incompatible_qos: Option<OfferedIncompatibleQosCb>,
240    on_data_on_readers: Option<DataOnReadersCb>,
241    on_sample_lost: Option<SampleLostCb>,
242    on_sample_rejected: Option<SampleRejectedCb>,
243    on_requested_deadline_missed: Option<RequestedDeadlineMissedCb>,
244    on_requested_incompatible_qos: Option<RequestedIncompatibleQosCb>,
245}
246
247impl ListenerBuilder {
248    pub fn new() -> Self {
249        ListenerBuilder {
250            on_data_available: None,
251            on_publication_matched: None,
252            on_subscription_matched: None,
253            on_liveliness_changed: None,
254            on_inconsistent_topic: None,
255            on_liveliness_lost: None,
256            on_offered_deadline_missed: None,
257            on_offered_incompatible_qos: None,
258            on_data_on_readers: None,
259            on_sample_lost: None,
260            on_sample_rejected: None,
261            on_requested_deadline_missed: None,
262            on_requested_incompatible_qos: None,
263        }
264    }
265
266    // ── Existing builder methods ────────────────────────────────────
267
268    pub fn on_data_available(mut self, cb: impl Fn(i32) + Send + Sync + 'static) -> Self {
269        self.on_data_available = Some(Arc::new(cb));
270        self
271    }
272
273    pub fn on_publication_matched(
274        mut self,
275        cb: impl Fn(i32, dds_publication_matched_status_t) + Send + Sync + 'static,
276    ) -> Self {
277        self.on_publication_matched = Some(Arc::new(cb));
278        self
279    }
280
281    pub fn on_subscription_matched(
282        mut self,
283        cb: impl Fn(i32, dds_subscription_matched_status_t) + Send + Sync + 'static,
284    ) -> Self {
285        self.on_subscription_matched = Some(Arc::new(cb));
286        self
287    }
288
289    pub fn on_liveliness_changed(
290        mut self,
291        cb: impl Fn(i32, dds_liveliness_changed_status_t) + Send + Sync + 'static,
292    ) -> Self {
293        self.on_liveliness_changed = Some(Arc::new(cb));
294        self
295    }
296
297    // ── New builder methods ─────────────────────────────────────────
298
299    pub fn on_inconsistent_topic(
300        mut self,
301        cb: impl Fn(i32, dds_inconsistent_topic_status_t) + Send + Sync + 'static,
302    ) -> Self {
303        self.on_inconsistent_topic = Some(Arc::new(cb));
304        self
305    }
306
307    pub fn on_liveliness_lost(
308        mut self,
309        cb: impl Fn(i32, dds_liveliness_lost_status_t) + Send + Sync + 'static,
310    ) -> Self {
311        self.on_liveliness_lost = Some(Arc::new(cb));
312        self
313    }
314
315    pub fn on_offered_deadline_missed(
316        mut self,
317        cb: impl Fn(i32, dds_offered_deadline_missed_status_t) + Send + Sync + 'static,
318    ) -> Self {
319        self.on_offered_deadline_missed = Some(Arc::new(cb));
320        self
321    }
322
323    pub fn on_offered_incompatible_qos(
324        mut self,
325        cb: impl Fn(i32, dds_offered_incompatible_qos_status_t) + Send + Sync + 'static,
326    ) -> Self {
327        self.on_offered_incompatible_qos = Some(Arc::new(cb));
328        self
329    }
330
331    pub fn on_data_on_readers(mut self, cb: impl Fn(i32) + Send + Sync + 'static) -> Self {
332        self.on_data_on_readers = Some(Arc::new(cb));
333        self
334    }
335
336    pub fn on_sample_lost(
337        mut self,
338        cb: impl Fn(i32, dds_sample_lost_status_t) + Send + Sync + 'static,
339    ) -> Self {
340        self.on_sample_lost = Some(Arc::new(cb));
341        self
342    }
343
344    pub fn on_sample_rejected(
345        mut self,
346        cb: impl Fn(i32, dds_sample_rejected_status_t) + Send + Sync + 'static,
347    ) -> Self {
348        self.on_sample_rejected = Some(Arc::new(cb));
349        self
350    }
351
352    pub fn on_requested_deadline_missed(
353        mut self,
354        cb: impl Fn(i32, dds_requested_deadline_missed_status_t) + Send + Sync + 'static,
355    ) -> Self {
356        self.on_requested_deadline_missed = Some(Arc::new(cb));
357        self
358    }
359
360    pub fn on_requested_incompatible_qos(
361        mut self,
362        cb: impl Fn(i32, dds_requested_incompatible_qos_status_t) + Send + Sync + 'static,
363    ) -> Self {
364        self.on_requested_incompatible_qos = Some(Arc::new(cb));
365        self
366    }
367
368    // ── Build ───────────────────────────────────────────────────────
369
370    pub fn build(self) -> DdsResult<Listener> {
371        let closures = Box::new(ListenerClosures {
372            on_data_available: self.on_data_available,
373            on_publication_matched: self.on_publication_matched,
374            on_subscription_matched: self.on_subscription_matched,
375            on_liveliness_changed: self.on_liveliness_changed,
376            on_inconsistent_topic: self.on_inconsistent_topic,
377            on_liveliness_lost: self.on_liveliness_lost,
378            on_offered_deadline_missed: self.on_offered_deadline_missed,
379            on_offered_incompatible_qos: self.on_offered_incompatible_qos,
380            on_data_on_readers: self.on_data_on_readers,
381            on_sample_lost: self.on_sample_lost,
382            on_sample_rejected: self.on_sample_rejected,
383            on_requested_deadline_missed: self.on_requested_deadline_missed,
384            on_requested_incompatible_qos: self.on_requested_incompatible_qos,
385        });
386
387        let arg_ptr = &*closures as *const ListenerClosures as *mut c_void;
388        let ptr = unsafe { dds_create_listener(arg_ptr) };
389        if ptr.is_null() {
390            return Err(crate::DdsError::OutOfResources);
391        }
392
393        unsafe {
394            // Existing
395            if closures.on_data_available.is_some() {
396                dds_lset_data_available(ptr, Some(trampoline_data_available));
397            }
398            if closures.on_publication_matched.is_some() {
399                dds_lset_publication_matched(ptr, Some(trampoline_publication_matched));
400            }
401            if closures.on_subscription_matched.is_some() {
402                dds_lset_subscription_matched(ptr, Some(trampoline_subscription_matched));
403            }
404            if closures.on_liveliness_changed.is_some() {
405                dds_lset_liveliness_changed(ptr, Some(trampoline_liveliness_changed));
406            }
407            // New
408            if closures.on_inconsistent_topic.is_some() {
409                dds_lset_inconsistent_topic(ptr, Some(trampoline_inconsistent_topic));
410            }
411            if closures.on_liveliness_lost.is_some() {
412                dds_lset_liveliness_lost(ptr, Some(trampoline_liveliness_lost));
413            }
414            if closures.on_offered_deadline_missed.is_some() {
415                dds_lset_offered_deadline_missed(ptr, Some(trampoline_offered_deadline_missed));
416            }
417            if closures.on_offered_incompatible_qos.is_some() {
418                dds_lset_offered_incompatible_qos(ptr, Some(trampoline_offered_incompatible_qos));
419            }
420            if closures.on_data_on_readers.is_some() {
421                dds_lset_data_on_readers(ptr, Some(trampoline_data_on_readers));
422            }
423            if closures.on_sample_lost.is_some() {
424                dds_lset_sample_lost(ptr, Some(trampoline_sample_lost));
425            }
426            if closures.on_sample_rejected.is_some() {
427                dds_lset_sample_rejected(ptr, Some(trampoline_sample_rejected));
428            }
429            if closures.on_requested_deadline_missed.is_some() {
430                dds_lset_requested_deadline_missed(ptr, Some(trampoline_requested_deadline_missed));
431            }
432            if closures.on_requested_incompatible_qos.is_some() {
433                dds_lset_requested_incompatible_qos(
434                    ptr,
435                    Some(trampoline_requested_incompatible_qos),
436                );
437            }
438        }
439
440        Ok(Listener {
441            ptr,
442            _closures: closures,
443        })
444    }
445}
446
447impl Default for ListenerBuilder {
448    fn default() -> Self {
449        Self::new()
450    }
451}
452
453impl Listener {
454    pub fn builder() -> ListenerBuilder {
455        ListenerBuilder::new()
456    }
457
458    pub fn as_ptr(&self) -> *mut dds_listener_t {
459        self.ptr
460    }
461}
462
463impl Drop for Listener {
464    fn drop(&mut self) {
465        unsafe {
466            dds_delete_listener(self.ptr);
467        }
468    }
469}