Skip to main content

middleware_core/stack/
discovery_registry.rs

1use std::time::Duration;
2
3use transport_core::Endpoint;
4
5use crate::config::MiddlewareRuntimeConfig;
6use crate::discovery::{
7    DiscoveryEndpoint, DiscoveryEntry, DiscoveryPruneReport, DiscoverySnapshot,
8};
9use crate::qos::QosProfile;
10
11use super::MiddlewareStack;
12
13impl MiddlewareStack {
14    fn topic_subscriber_kind_label() -> &'static str {
15        "kind:topic-subscriber"
16    }
17
18    fn topic_subscriber_topic_label(topic: &str) -> String {
19        format!("topic:{topic}")
20    }
21
22    fn topic_subscriber_qos_label(qos: QosProfile) -> &'static str {
23        if qos.reliable {
24            "qos:reliable"
25        } else {
26            "qos:best_effort"
27        }
28    }
29
30    fn topic_subscriber_reliable_label() -> &'static str {
31        "qos:reliable"
32    }
33
34    fn topic_subscriber_best_effort_label() -> &'static str {
35        "qos:best_effort"
36    }
37
38    fn topic_subscriber_origin_local_label() -> &'static str {
39        "origin:local"
40    }
41
42    fn topic_subscriber_acked_seq_prefix() -> &'static str {
43        "acked_seq:"
44    }
45
46    fn parse_topic_subscriber_acked_seq(labels: &[String]) -> Option<u64> {
47        labels.iter().find_map(|label| {
48            label
49                .strip_prefix(Self::topic_subscriber_acked_seq_prefix())
50                .and_then(|value| value.parse::<u64>().ok())
51        })
52    }
53
54    pub fn apply_runtime_config(&mut self, config: MiddlewareRuntimeConfig) {
55        self.route_rules = config.route_rules;
56        self.namespace_isolation = config.namespace_isolation;
57        self.topic_bus
58            .set_reliability_policy(config.topic_reliability_policy);
59        for item in config.topic_qos_overrides {
60            self.qos.set_topic_qos(item.topic, item.profile);
61        }
62    }
63
64    pub fn register_topic(&mut self, topic: impl Into<String>) {
65        self.discovery.register_topic(topic);
66    }
67
68    pub fn register_topic_with_ttl(&mut self, topic: impl Into<String>, ttl: Duration) {
69        self.discovery.register_topic_with_ttl(topic, ttl);
70    }
71
72    pub fn set_topic_qos(&mut self, topic: impl Into<String>, profile: QosProfile) {
73        self.qos.set_topic_qos(topic, profile);
74    }
75
76    pub fn set_topic_qos_if_absent(&mut self, topic: impl Into<String>, profile: QosProfile) {
77        self.qos.set_topic_qos_if_absent(topic, profile);
78    }
79
80    pub fn topic_qos(&self, topic: &str) -> Option<QosProfile> {
81        self.qos.topic_qos(topic)
82    }
83
84    pub fn register_service(&mut self, service: impl Into<String>) {
85        self.discovery.register_service(service);
86    }
87
88    pub fn register_service_with_ttl(&mut self, service: impl Into<String>, ttl: Duration) {
89        self.discovery.register_service_with_ttl(service, ttl);
90    }
91
92    pub fn register_mission(&mut self, mission: impl Into<String>) {
93        self.discovery.register_mission(mission);
94    }
95
96    pub fn register_mission_with_ttl(&mut self, mission: impl Into<String>, ttl: Duration) {
97        self.discovery.register_mission_with_ttl(mission, ttl);
98    }
99
100    pub fn register_endpoint(&mut self, name: impl Into<String>, endpoint: Endpoint) {
101        self.discovery.register_endpoint(name, endpoint);
102    }
103
104    pub fn register_endpoint_with_ttl(
105        &mut self,
106        name: impl Into<String>,
107        endpoint: Endpoint,
108        ttl: Duration,
109    ) {
110        self.discovery
111            .register_endpoint_with_ttl(name, endpoint, ttl);
112    }
113
114    pub fn unregister_endpoint(&mut self, name: &str) -> bool {
115        self.discovery.unregister_endpoint(name)
116    }
117
118    pub fn register_topic_subscriber_endpoint_with_ttl(
119        &mut self,
120        name: impl Into<String>,
121        topic: impl Into<String>,
122        mut endpoint: Endpoint,
123        qos: QosProfile,
124        ttl: Duration,
125    ) {
126        let name = name.into();
127        let topic = topic.into();
128
129        let kind_label = Self::topic_subscriber_kind_label().to_string();
130        if !endpoint.labels.contains(&kind_label) {
131            endpoint.labels.push(kind_label);
132        }
133        let topic_label = Self::topic_subscriber_topic_label(&topic);
134        if !endpoint.labels.contains(&topic_label) {
135            endpoint.labels.push(topic_label.clone());
136        }
137        let qos_label = Self::topic_subscriber_qos_label(qos).to_string();
138        if !endpoint.labels.contains(&qos_label) {
139            endpoint.labels.push(qos_label.clone());
140        }
141
142        self.register_topic_with_ttl(topic, ttl);
143        self.register_endpoint_with_ttl(name.clone(), endpoint, ttl);
144        self.discovery.add_labels(
145            name,
146            vec![
147                Self::topic_subscriber_kind_label().to_string(),
148                topic_label,
149                qos_label,
150            ],
151        );
152    }
153
154    pub fn topic_subscriber_endpoints(&self, topic: &str) -> Vec<DiscoveryEndpoint> {
155        let topic_label = Self::topic_subscriber_topic_label(topic);
156        self.discovery
157            .endpoint_entries()
158            .into_iter()
159            .filter(|entry| {
160                entry
161                    .endpoint
162                    .labels
163                    .iter()
164                    .any(|label| label == Self::topic_subscriber_kind_label())
165                    && entry
166                        .endpoint
167                        .labels
168                        .iter()
169                        .any(|label| label == &topic_label)
170            })
171            .collect()
172    }
173
174    pub fn topic_subscriber_counts(&self, topic: &str) -> (usize, usize) {
175        let mut reliable = 0usize;
176        let mut best_effort = 0usize;
177        for entry in self.topic_subscriber_endpoints(topic) {
178            if entry
179                .endpoint
180                .labels
181                .iter()
182                .any(|label| label == Self::topic_subscriber_reliable_label())
183            {
184                reliable += 1;
185            } else if entry
186                .endpoint
187                .labels
188                .iter()
189                .any(|label| label == Self::topic_subscriber_best_effort_label())
190            {
191                best_effort += 1;
192            }
193        }
194        (reliable, best_effort)
195    }
196
197    pub fn topic_subscriber_count(&self, topic: &str) -> usize {
198        let (reliable, best_effort) = self.topic_subscriber_counts(topic);
199        reliable + best_effort
200    }
201
202    fn topic_reliable_subscriber_acks_by_origin(
203        &self,
204        topic: &str,
205        expect_local_origin: bool,
206    ) -> Vec<(String, Option<u64>)> {
207        self.topic_subscriber_endpoints(topic)
208            .into_iter()
209            .filter(|entry| {
210                let is_reliable = entry
211                    .endpoint
212                    .labels
213                    .iter()
214                    .any(|label| label == Self::topic_subscriber_reliable_label());
215                if !is_reliable {
216                    return false;
217                }
218
219                let is_local = entry
220                    .endpoint
221                    .labels
222                    .iter()
223                    .any(|label| label == Self::topic_subscriber_origin_local_label());
224                is_local == expect_local_origin
225            })
226            .map(|entry| {
227                let acked_seq = Self::parse_topic_subscriber_acked_seq(&entry.endpoint.labels);
228                (entry.name, acked_seq)
229            })
230            .collect()
231    }
232
233    pub fn topic_local_reliable_subscriber_acks(&self, topic: &str) -> Vec<(String, Option<u64>)> {
234        self.topic_reliable_subscriber_acks_by_origin(topic, true)
235    }
236
237    pub fn topic_reliable_subscriber_acks(&self, topic: &str) -> Vec<(String, Option<u64>)> {
238        self.topic_reliable_subscriber_acks_by_origin(topic, false)
239    }
240
241    pub fn update_topic_subscriber_ack(
242        &mut self,
243        endpoint_name: &str,
244        acked_seq: Option<u64>,
245    ) -> bool {
246        let Some(entry) = self.find_endpoint(endpoint_name) else {
247            return false;
248        };
249
250        let is_topic_subscriber = entry
251            .endpoint
252            .labels
253            .iter()
254            .any(|label| label == Self::topic_subscriber_kind_label());
255        let is_reliable = entry
256            .endpoint
257            .labels
258            .iter()
259            .any(|label| label == Self::topic_subscriber_reliable_label());
260        if !is_topic_subscriber || !is_reliable {
261            return false;
262        }
263
264        let mut labels = entry.endpoint.labels;
265        let existing_acked_seq = Self::parse_topic_subscriber_acked_seq(&labels);
266        labels.retain(|label| !label.starts_with(Self::topic_subscriber_acked_seq_prefix()));
267        if let Some(seq) = acked_seq {
268            let seq = existing_acked_seq.map_or(seq, |existing| existing.max(seq));
269            labels.push(format!("{}{}", Self::topic_subscriber_acked_seq_prefix(), seq));
270        }
271
272        self.discovery.update_endpoint_labels(endpoint_name, labels)
273    }
274
275    pub fn find_endpoint(&self, name: &str) -> Option<DiscoveryEndpoint> {
276        self.discovery.find_endpoint(name)
277    }
278
279    pub fn endpoint_entries(&self) -> Vec<DiscoveryEndpoint> {
280        self.discovery.endpoint_entries()
281    }
282
283    pub fn renew_topic_lease(&mut self, topic: &str, ttl: Duration) -> bool {
284        self.discovery.renew_topic_lease(topic, ttl)
285    }
286
287    pub fn renew_service_lease(&mut self, service: &str, ttl: Duration) -> bool {
288        self.discovery.renew_service_lease(service, ttl)
289    }
290
291    pub fn renew_mission_lease(&mut self, mission: &str, ttl: Duration) -> bool {
292        self.discovery.renew_mission_lease(mission, ttl)
293    }
294
295    pub fn renew_endpoint_lease(&mut self, endpoint: &str, ttl: Duration) -> bool {
296        self.discovery.renew_endpoint_lease(endpoint, ttl)
297    }
298
299    pub fn set_topic_health(&mut self, topic: &str, healthy: bool) -> bool {
300        self.discovery.set_topic_health(topic, healthy)
301    }
302
303    pub fn set_service_health(&mut self, service: &str, healthy: bool) -> bool {
304        self.discovery.set_service_health(service, healthy)
305    }
306
307    pub fn set_mission_health(&mut self, mission: &str, healthy: bool) -> bool {
308        self.discovery.set_mission_health(mission, healthy)
309    }
310
311    pub fn set_endpoint_health(&mut self, endpoint: &str, healthy: bool) -> bool {
312        self.discovery.set_endpoint_health(endpoint, healthy)
313    }
314
315    pub fn prune_discovery_inactive(&mut self) -> DiscoveryPruneReport {
316        self.discovery.prune_inactive()
317    }
318
319    pub fn snapshot(&self) -> DiscoverySnapshot {
320        self.discovery.snapshot()
321    }
322
323    pub fn topic_entries(&self) -> Vec<DiscoveryEntry> {
324        self.discovery.topic_entries()
325    }
326
327    pub fn service_entries(&self) -> Vec<DiscoveryEntry> {
328        self.discovery.service_entries()
329    }
330
331    pub fn mission_entries(&self) -> Vec<DiscoveryEntry> {
332        self.discovery.mission_entries()
333    }
334}