Skip to main content

zerodds_rpc/
endpoint.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! RPC-Endpoint-Helper — Spec §7.6.2.
5//!
6//! Ein RPC-Endpoint ist ein Tupel von zwei DDS-Endpoints:
7//!
8//! * **Requester**: Request-Writer (sendet Anfragen) + Reply-Reader (liest
9//!   Antworten).
10//! * **Replier**: Request-Reader (empfaengt Anfragen) + Reply-Writer
11//!   (sendet Antworten).
12//!
13//! Spec §7.6.2 fordert, dass die zwei zusammengehoerigen Endpoints in der
14//! SEDP-Discovery als logisches Bundle erkennbar sind. Wir setzen dafuer:
15//!
16//! * `PID_SERVICE_INSTANCE_NAME` (0x0080) auf beide Endpoints — Service-
17//!   Identifikation.
18//! * `PID_RELATED_ENTITY_GUID` (0x0081) — auf jeden Endpoint zeigt diese
19//!   PID auf den GUID des Pendant-Endpoints (Request-Writer ↔ Reply-Reader
20//!   beim Requester; Request-Reader ↔ Reply-Writer beim Replier).
21//! * `PID_TOPIC_ALIASES` (0x0082) optional — alternative Topic-Namen.
22//!
23//! Diese Stufe (C6.1.B) baut nur die **statische Konstruktion** auf:
24//! GUIDs zuweisen, Topic-Namen ableiten, Discovery-PIDs in
25//! [`PublicationBuiltinTopicData`] / [`SubscriptionBuiltinTopicData`]
26//! eintragen. Threading, History-Caches und Korrelation per
27//! `PID_RELATED_SAMPLE_IDENTITY` sind C6.1.C.
28
29extern crate alloc;
30
31use alloc::string::String;
32use alloc::vec::Vec;
33
34use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
35use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
36use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
37
38use crate::error::{RpcError, RpcResult};
39use crate::service_mapping::ServiceDef;
40use crate::topic_naming::ServiceTopicNames;
41
42// ---------------------------------------------------------------------
43// Builder
44// ---------------------------------------------------------------------
45
46/// Builder fuer ein RPC-Endpoint-Pair (Requester _oder_ Replier).
47#[derive(Debug, Clone)]
48pub struct RpcEndpointBuilder {
49    service: ServiceDef,
50    instance_name: Option<String>,
51    topic_aliases: Vec<String>,
52    participant_prefix: GuidPrefix,
53    request_entity: EntityId,
54    reply_entity: EntityId,
55    type_name_request: String,
56    type_name_reply: String,
57}
58
59impl RpcEndpointBuilder {
60    /// Neuer Builder fuer einen Service.
61    ///
62    /// `participant_prefix` ist der GUID-Prefix des hostenden Participants.
63    /// `request_entity`/`reply_entity` sind die EntityIds, die intern fuer
64    /// den Request- bzw. Reply-Endpoint vergeben werden — der Aufrufer
65    /// (DCPS-Layer) muss sicherstellen, dass sie eindeutig sind.
66    ///
67    /// # Errors
68    /// `RpcError::InvalidServiceName` wenn der Service-Name keinem
69    /// IDL-Identifier entspricht.
70    pub fn new(
71        service: ServiceDef,
72        participant_prefix: GuidPrefix,
73        request_entity: EntityId,
74        reply_entity: EntityId,
75    ) -> RpcResult<Self> {
76        // Validierungs-Round-Trip ueber `topic_names()` — wirft
77        // InvalidServiceName, falls der Name leer/illegal ist.
78        let _ = service.topic_names()?;
79        let type_name_request = alloc::format!("{}_Request", service.name);
80        let type_name_reply = alloc::format!("{}_Reply", service.name);
81        Ok(Self {
82            service,
83            instance_name: None,
84            topic_aliases: Vec::new(),
85            participant_prefix,
86            request_entity,
87            reply_entity,
88            type_name_request,
89            type_name_reply,
90        })
91    }
92
93    /// Setzt den `PID_SERVICE_INSTANCE_NAME` (DDS-RPC §7.8.2). Erlaubt
94    /// es, mehrere Instanzen desselben Service-Typs auf einem Participant
95    /// disjunkt zu adressieren (z.B. `"calc-A"` vs. `"calc-B"`).
96    #[must_use]
97    pub fn instance_name(mut self, name: impl Into<String>) -> Self {
98        self.instance_name = Some(name.into());
99        self
100    }
101
102    /// Setzt `PID_TOPIC_ALIASES`. Reihenfolge bleibt erhalten.
103    #[must_use]
104    pub fn topic_aliases(mut self, aliases: Vec<String>) -> Self {
105        self.topic_aliases = aliases;
106        self
107    }
108
109    /// Override des per-Default abgeleiteten Type-Names der Request-
110    /// Wire-Struktur (`<Service>_Request`).
111    #[must_use]
112    pub fn request_type_name(mut self, n: impl Into<String>) -> Self {
113        self.type_name_request = n.into();
114        self
115    }
116
117    /// Override des per-Default abgeleiteten Type-Names der Reply-
118    /// Wire-Struktur.
119    #[must_use]
120    pub fn reply_type_name(mut self, n: impl Into<String>) -> Self {
121        self.type_name_reply = n.into();
122        self
123    }
124
125    /// Baut das Requester-Pair: Request-**Writer**-Discovery + Reply-
126    /// **Reader**-Discovery, mit korrekt ueberkreuzten `RELATED_ENTITY_GUID`-
127    /// Verweisen.
128    ///
129    /// # Errors
130    /// `RpcError::EmptyService` wenn der Service keine Methoden hat
131    /// (nichts zu transportieren).
132    pub fn build_requester(&self) -> RpcResult<RequesterEndpoint> {
133        self.check_non_empty()?;
134        let topics = self.service.topic_names()?;
135        let req_writer_guid = Guid::new(self.participant_prefix, self.request_entity);
136        let rep_reader_guid = Guid::new(self.participant_prefix, self.reply_entity);
137
138        let request_writer = self.publication(
139            &topics,
140            req_writer_guid,
141            /*related=*/ rep_reader_guid,
142            /*request_side=*/ true,
143        );
144        let reply_reader = self.subscription(
145            &topics,
146            rep_reader_guid,
147            /*related=*/ req_writer_guid,
148            /*request_side=*/ false,
149        );
150        Ok(RequesterEndpoint {
151            request_writer,
152            reply_reader,
153        })
154    }
155
156    /// Baut das Replier-Pair: Request-**Reader**-Discovery + Reply-
157    /// **Writer**-Discovery.
158    ///
159    /// # Errors
160    /// Siehe [`Self::build_requester`].
161    pub fn build_replier(&self) -> RpcResult<ReplierEndpoint> {
162        self.check_non_empty()?;
163        let topics = self.service.topic_names()?;
164        let req_reader_guid = Guid::new(self.participant_prefix, self.request_entity);
165        let rep_writer_guid = Guid::new(self.participant_prefix, self.reply_entity);
166
167        let request_reader = self.subscription(
168            &topics,
169            req_reader_guid,
170            /*related=*/ rep_writer_guid,
171            /*request_side=*/ true,
172        );
173        let reply_writer = self.publication(
174            &topics,
175            rep_writer_guid,
176            /*related=*/ req_reader_guid,
177            /*request_side=*/ false,
178        );
179        Ok(ReplierEndpoint {
180            request_reader,
181            reply_writer,
182        })
183    }
184
185    fn check_non_empty(&self) -> RpcResult<()> {
186        if self.service.methods.is_empty() {
187            return Err(RpcError::EmptyService(self.service.name.clone()));
188        }
189        Ok(())
190    }
191
192    fn publication(
193        &self,
194        topics: &ServiceTopicNames,
195        my_guid: Guid,
196        related: Guid,
197        request_side: bool,
198    ) -> PublicationBuiltinTopicData {
199        let (topic_name, type_name) = if request_side {
200            (topics.request.clone(), self.type_name_request.clone())
201        } else {
202            (topics.reply.clone(), self.type_name_reply.clone())
203        };
204        PublicationBuiltinTopicData {
205            key: my_guid,
206            participant_key: Guid::new(self.participant_prefix, EntityId::PARTICIPANT),
207            topic_name,
208            type_name,
209            durability: zerodds_rtps::publication_data::DurabilityKind::default(),
210            reliability: zerodds_rtps::publication_data::ReliabilityQos::default(),
211            ownership: zerodds_qos::OwnershipKind::Shared,
212            ownership_strength: 0,
213            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
214            deadline: zerodds_qos::DeadlineQosPolicy::default(),
215            lifespan: zerodds_qos::LifespanQosPolicy::default(),
216            partition: Vec::new(),
217            user_data: Vec::new(),
218            topic_data: Vec::new(),
219            group_data: Vec::new(),
220            type_information: None,
221            data_representation: Vec::new(),
222            security_info: None,
223            service_instance_name: self.instance_name.clone(),
224            related_entity_guid: Some(related),
225            topic_aliases: if self.topic_aliases.is_empty() {
226                None
227            } else {
228                Some(self.topic_aliases.clone())
229            },
230            type_identifier: zerodds_types::TypeIdentifier::None,
231        }
232    }
233
234    fn subscription(
235        &self,
236        topics: &ServiceTopicNames,
237        my_guid: Guid,
238        related: Guid,
239        request_side: bool,
240    ) -> SubscriptionBuiltinTopicData {
241        let (topic_name, type_name) = if request_side {
242            (topics.request.clone(), self.type_name_request.clone())
243        } else {
244            (topics.reply.clone(), self.type_name_reply.clone())
245        };
246        SubscriptionBuiltinTopicData {
247            key: my_guid,
248            participant_key: Guid::new(self.participant_prefix, EntityId::PARTICIPANT),
249            topic_name,
250            type_name,
251            durability: zerodds_rtps::publication_data::DurabilityKind::default(),
252            reliability: zerodds_rtps::publication_data::ReliabilityQos::default(),
253            ownership: zerodds_qos::OwnershipKind::Shared,
254            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
255            deadline: zerodds_qos::DeadlineQosPolicy::default(),
256            partition: Vec::new(),
257            user_data: Vec::new(),
258            topic_data: Vec::new(),
259            group_data: Vec::new(),
260            type_information: None,
261            data_representation: Vec::new(),
262            content_filter: None,
263            security_info: None,
264            service_instance_name: self.instance_name.clone(),
265            related_entity_guid: Some(related),
266            topic_aliases: if self.topic_aliases.is_empty() {
267                None
268            } else {
269                Some(self.topic_aliases.clone())
270            },
271            type_identifier: zerodds_types::TypeIdentifier::None,
272        }
273    }
274}
275
276// ---------------------------------------------------------------------
277// Endpoint-Pair-Strukturen
278// ---------------------------------------------------------------------
279
280/// Requester-Pair (Request-Writer + Reply-Reader).
281#[derive(Debug, Clone, PartialEq)]
282pub struct RequesterEndpoint {
283    /// SEDP-Daten des Request-Writers.
284    pub request_writer: PublicationBuiltinTopicData,
285    /// SEDP-Daten des Reply-Readers.
286    pub reply_reader: SubscriptionBuiltinTopicData,
287}
288
289/// Replier-Pair (Request-Reader + Reply-Writer).
290#[derive(Debug, Clone, PartialEq)]
291pub struct ReplierEndpoint {
292    /// SEDP-Daten des Request-Readers.
293    pub request_reader: SubscriptionBuiltinTopicData,
294    /// SEDP-Daten des Reply-Writers.
295    pub reply_writer: PublicationBuiltinTopicData,
296}
297
298#[cfg(test)]
299#[allow(clippy::unwrap_used, clippy::expect_used)]
300mod tests {
301    use super::*;
302    use crate::annotations::lower_rpc_annotations;
303    use crate::service_mapping::lower_service;
304    use zerodds_idl::ast::{
305        Annotation, AnnotationParams, Export, Identifier, IntegerType, InterfaceDef, InterfaceKind,
306        OpDecl, ParamAttribute, ParamDecl, PrimitiveType, ScopedName, TypeSpec,
307    };
308    use zerodds_idl::errors::Span;
309
310    fn sp() -> Span {
311        Span::SYNTHETIC
312    }
313
314    fn ident(t: &str) -> Identifier {
315        Identifier::new(t, sp())
316    }
317
318    fn long_t() -> TypeSpec {
319        TypeSpec::Primitive(PrimitiveType::Integer(IntegerType::Long))
320    }
321
322    fn ann_simple(name: &str) -> Annotation {
323        Annotation {
324            name: ScopedName {
325                absolute: false,
326                parts: alloc::vec![ident(name)],
327                span: sp(),
328            },
329            params: AnnotationParams::None,
330            span: sp(),
331        }
332    }
333
334    fn calc_service() -> ServiceDef {
335        let add = OpDecl {
336            name: ident("add"),
337            oneway: false,
338            return_type: Some(long_t()),
339            params: alloc::vec![ParamDecl {
340                attribute: ParamAttribute::In,
341                type_spec: long_t(),
342                name: ident("a"),
343                annotations: Vec::new(),
344                span: sp(),
345            }],
346            raises: Vec::new(),
347            annotations: Vec::new(),
348            span: sp(),
349        };
350        let i = InterfaceDef {
351            kind: InterfaceKind::Plain,
352            name: ident("Calculator"),
353            bases: Vec::new(),
354            exports: alloc::vec![Export::Op(add)],
355            annotations: alloc::vec![ann_simple("service")],
356            span: sp(),
357        };
358        let lowered = lower_rpc_annotations(&i.annotations);
359        lower_service(&i, &lowered).unwrap()
360    }
361
362    fn pp() -> GuidPrefix {
363        GuidPrefix::from_bytes([0x99; 12])
364    }
365
366    fn req_eid() -> EntityId {
367        EntityId::user_writer_with_key([0xA0, 0xA1, 0xA2])
368    }
369
370    fn rep_eid() -> EntityId {
371        EntityId::user_reader_with_key([0xB0, 0xB1, 0xB2])
372    }
373
374    #[test]
375    fn requester_topic_names_match_service() {
376        let svc = calc_service();
377        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap();
378        let r = b.build_requester().unwrap();
379        assert_eq!(r.request_writer.topic_name, "Calculator_Request");
380        assert_eq!(r.reply_reader.topic_name, "Calculator_Reply");
381    }
382
383    #[test]
384    fn requester_related_entity_guids_cross_link() {
385        let svc = calc_service();
386        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap();
387        let r = b.build_requester().unwrap();
388        // Request-Writer.related → Reply-Reader.guid
389        assert_eq!(
390            r.request_writer.related_entity_guid,
391            Some(r.reply_reader.key)
392        );
393        // Reply-Reader.related → Request-Writer.guid
394        assert_eq!(
395            r.reply_reader.related_entity_guid,
396            Some(r.request_writer.key)
397        );
398    }
399
400    #[test]
401    fn replier_topic_names_match_service() {
402        let svc = calc_service();
403        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap();
404        let r = b.build_replier().unwrap();
405        assert_eq!(r.request_reader.topic_name, "Calculator_Request");
406        assert_eq!(r.reply_writer.topic_name, "Calculator_Reply");
407    }
408
409    #[test]
410    fn replier_related_entity_guids_cross_link() {
411        let svc = calc_service();
412        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap();
413        let r = b.build_replier().unwrap();
414        assert_eq!(
415            r.request_reader.related_entity_guid,
416            Some(r.reply_writer.key)
417        );
418        assert_eq!(
419            r.reply_writer.related_entity_guid,
420            Some(r.request_reader.key)
421        );
422    }
423
424    #[test]
425    fn instance_name_propagated_to_both_sides() {
426        let svc = calc_service();
427        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid())
428            .unwrap()
429            .instance_name("calc-A");
430        let r = b.build_requester().unwrap();
431        assert_eq!(
432            r.request_writer.service_instance_name.as_deref(),
433            Some("calc-A")
434        );
435        assert_eq!(
436            r.reply_reader.service_instance_name.as_deref(),
437            Some("calc-A")
438        );
439        let p = b.build_replier().unwrap();
440        assert_eq!(
441            p.request_reader.service_instance_name.as_deref(),
442            Some("calc-A")
443        );
444        assert_eq!(
445            p.reply_writer.service_instance_name.as_deref(),
446            Some("calc-A")
447        );
448    }
449
450    #[test]
451    fn topic_aliases_propagated_to_both_sides() {
452        let svc = calc_service();
453        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid())
454            .unwrap()
455            .topic_aliases(alloc::vec!["LegacyCalc_Request".into()]);
456        let r = b.build_requester().unwrap();
457        assert_eq!(
458            r.request_writer.topic_aliases.as_deref(),
459            Some(alloc::vec!["LegacyCalc_Request".to_string()].as_slice())
460        );
461    }
462
463    #[test]
464    fn topic_aliases_empty_yields_none() {
465        let svc = calc_service();
466        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap();
467        let r = b.build_requester().unwrap();
468        assert!(r.request_writer.topic_aliases.is_none());
469    }
470
471    #[test]
472    fn type_names_default_to_service_request_reply() {
473        let svc = calc_service();
474        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap();
475        let r = b.build_requester().unwrap();
476        assert_eq!(r.request_writer.type_name, "Calculator_Request");
477        assert_eq!(r.reply_reader.type_name, "Calculator_Reply");
478    }
479
480    #[test]
481    fn type_name_overrides_apply() {
482        let svc = calc_service();
483        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid())
484            .unwrap()
485            .request_type_name("X")
486            .reply_type_name("Y");
487        let r = b.build_requester().unwrap();
488        assert_eq!(r.request_writer.type_name, "X");
489        assert_eq!(r.reply_reader.type_name, "Y");
490    }
491
492    #[test]
493    fn empty_service_yields_error() {
494        let svc = ServiceDef {
495            name: "Empty".into(),
496            methods: Vec::new(),
497        };
498        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap();
499        let err = b.build_requester().unwrap_err();
500        assert!(matches!(err, RpcError::EmptyService(ref n) if n == "Empty"));
501        let err = b.build_replier().unwrap_err();
502        assert!(matches!(err, RpcError::EmptyService(_)));
503    }
504
505    #[test]
506    fn invalid_service_name_rejected_in_builder_new() {
507        let svc = ServiceDef {
508            name: String::new(),
509            methods: Vec::new(),
510        };
511        let err = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap_err();
512        assert!(matches!(err, RpcError::InvalidServiceName(_)));
513    }
514
515    #[test]
516    fn participant_key_uses_participant_entity_id() {
517        let svc = calc_service();
518        let b = RpcEndpointBuilder::new(svc, pp(), req_eid(), rep_eid()).unwrap();
519        let r = b.build_requester().unwrap();
520        assert_eq!(
521            r.request_writer.participant_key.entity_id,
522            EntityId::PARTICIPANT
523        );
524        assert_eq!(r.request_writer.participant_key.prefix, pp());
525    }
526}