1use std::time::Duration;
2
3use transport_core::Endpoint;
4
5use crate::config::MiddlewareRuntimeConfig;
6use crate::discovery::{
7 DiscoveryEndpoint, DiscoveryEntry, DiscoveryPruneReport, DiscoverySnapshot,
8};
9use crate::qos::QosProfile;
10
11use super::MiddlewareStack;
12
13impl MiddlewareStack {
14 fn topic_subscriber_kind_label() -> &'static str {
15 "kind:topic-subscriber"
16 }
17
18 fn topic_subscriber_topic_label(topic: &str) -> String {
19 format!("topic:{topic}")
20 }
21
22 fn topic_subscriber_qos_label(qos: QosProfile) -> &'static str {
23 if qos.reliable {
24 "qos:reliable"
25 } else {
26 "qos:best_effort"
27 }
28 }
29
30 fn topic_subscriber_reliable_label() -> &'static str {
31 "qos:reliable"
32 }
33
34 fn topic_subscriber_best_effort_label() -> &'static str {
35 "qos:best_effort"
36 }
37
38 fn topic_subscriber_origin_local_label() -> &'static str {
39 "origin:local"
40 }
41
42 fn topic_subscriber_acked_seq_prefix() -> &'static str {
43 "acked_seq:"
44 }
45
46 fn parse_topic_subscriber_acked_seq(labels: &[String]) -> Option<u64> {
47 labels.iter().find_map(|label| {
48 label
49 .strip_prefix(Self::topic_subscriber_acked_seq_prefix())
50 .and_then(|value| value.parse::<u64>().ok())
51 })
52 }
53
54 pub fn apply_runtime_config(&mut self, config: MiddlewareRuntimeConfig) {
55 self.route_rules = config.route_rules;
56 self.namespace_isolation = config.namespace_isolation;
57 self.topic_bus
58 .set_reliability_policy(config.topic_reliability_policy);
59 for item in config.topic_qos_overrides {
60 self.qos.set_topic_qos(item.topic, item.profile);
61 }
62 }
63
64 pub fn register_topic(&mut self, topic: impl Into<String>) {
65 self.discovery.register_topic(topic);
66 }
67
68 pub fn register_topic_with_ttl(&mut self, topic: impl Into<String>, ttl: Duration) {
69 self.discovery.register_topic_with_ttl(topic, ttl);
70 }
71
72 pub fn set_topic_qos(&mut self, topic: impl Into<String>, profile: QosProfile) {
73 self.qos.set_topic_qos(topic, profile);
74 }
75
76 pub fn set_topic_qos_if_absent(&mut self, topic: impl Into<String>, profile: QosProfile) {
77 self.qos.set_topic_qos_if_absent(topic, profile);
78 }
79
80 pub fn topic_qos(&self, topic: &str) -> Option<QosProfile> {
81 self.qos.topic_qos(topic)
82 }
83
84 pub fn register_service(&mut self, service: impl Into<String>) {
85 self.discovery.register_service(service);
86 }
87
88 pub fn register_service_with_ttl(&mut self, service: impl Into<String>, ttl: Duration) {
89 self.discovery.register_service_with_ttl(service, ttl);
90 }
91
92 pub fn register_mission(&mut self, mission: impl Into<String>) {
93 self.discovery.register_mission(mission);
94 }
95
96 pub fn register_mission_with_ttl(&mut self, mission: impl Into<String>, ttl: Duration) {
97 self.discovery.register_mission_with_ttl(mission, ttl);
98 }
99
100 pub fn register_endpoint(&mut self, name: impl Into<String>, endpoint: Endpoint) {
101 self.discovery.register_endpoint(name, endpoint);
102 }
103
104 pub fn register_endpoint_with_ttl(
105 &mut self,
106 name: impl Into<String>,
107 endpoint: Endpoint,
108 ttl: Duration,
109 ) {
110 self.discovery
111 .register_endpoint_with_ttl(name, endpoint, ttl);
112 }
113
114 pub fn unregister_endpoint(&mut self, name: &str) -> bool {
115 self.discovery.unregister_endpoint(name)
116 }
117
118 pub fn register_topic_subscriber_endpoint_with_ttl(
119 &mut self,
120 name: impl Into<String>,
121 topic: impl Into<String>,
122 mut endpoint: Endpoint,
123 qos: QosProfile,
124 ttl: Duration,
125 ) {
126 let name = name.into();
127 let topic = topic.into();
128
129 let kind_label = Self::topic_subscriber_kind_label().to_string();
130 if !endpoint.labels.contains(&kind_label) {
131 endpoint.labels.push(kind_label);
132 }
133 let topic_label = Self::topic_subscriber_topic_label(&topic);
134 if !endpoint.labels.contains(&topic_label) {
135 endpoint.labels.push(topic_label.clone());
136 }
137 let qos_label = Self::topic_subscriber_qos_label(qos).to_string();
138 if !endpoint.labels.contains(&qos_label) {
139 endpoint.labels.push(qos_label.clone());
140 }
141
142 self.register_topic_with_ttl(topic, ttl);
143 self.register_endpoint_with_ttl(name.clone(), endpoint, ttl);
144 self.discovery.add_labels(
145 name,
146 vec![
147 Self::topic_subscriber_kind_label().to_string(),
148 topic_label,
149 qos_label,
150 ],
151 );
152 }
153
154 pub fn topic_subscriber_endpoints(&self, topic: &str) -> Vec<DiscoveryEndpoint> {
155 let topic_label = Self::topic_subscriber_topic_label(topic);
156 self.discovery
157 .endpoint_entries()
158 .into_iter()
159 .filter(|entry| {
160 entry
161 .endpoint
162 .labels
163 .iter()
164 .any(|label| label == Self::topic_subscriber_kind_label())
165 && entry
166 .endpoint
167 .labels
168 .iter()
169 .any(|label| label == &topic_label)
170 })
171 .collect()
172 }
173
174 pub fn topic_subscriber_counts(&self, topic: &str) -> (usize, usize) {
175 let mut reliable = 0usize;
176 let mut best_effort = 0usize;
177 for entry in self.topic_subscriber_endpoints(topic) {
178 if entry
179 .endpoint
180 .labels
181 .iter()
182 .any(|label| label == Self::topic_subscriber_reliable_label())
183 {
184 reliable += 1;
185 } else if entry
186 .endpoint
187 .labels
188 .iter()
189 .any(|label| label == Self::topic_subscriber_best_effort_label())
190 {
191 best_effort += 1;
192 }
193 }
194 (reliable, best_effort)
195 }
196
197 pub fn topic_subscriber_count(&self, topic: &str) -> usize {
198 let (reliable, best_effort) = self.topic_subscriber_counts(topic);
199 reliable + best_effort
200 }
201
202 fn topic_reliable_subscriber_acks_by_origin(
203 &self,
204 topic: &str,
205 expect_local_origin: bool,
206 ) -> Vec<(String, Option<u64>)> {
207 self.topic_subscriber_endpoints(topic)
208 .into_iter()
209 .filter(|entry| {
210 let is_reliable = entry
211 .endpoint
212 .labels
213 .iter()
214 .any(|label| label == Self::topic_subscriber_reliable_label());
215 if !is_reliable {
216 return false;
217 }
218
219 let is_local = entry
220 .endpoint
221 .labels
222 .iter()
223 .any(|label| label == Self::topic_subscriber_origin_local_label());
224 is_local == expect_local_origin
225 })
226 .map(|entry| {
227 let acked_seq = Self::parse_topic_subscriber_acked_seq(&entry.endpoint.labels);
228 (entry.name, acked_seq)
229 })
230 .collect()
231 }
232
233 pub fn topic_local_reliable_subscriber_acks(&self, topic: &str) -> Vec<(String, Option<u64>)> {
234 self.topic_reliable_subscriber_acks_by_origin(topic, true)
235 }
236
237 pub fn topic_reliable_subscriber_acks(&self, topic: &str) -> Vec<(String, Option<u64>)> {
238 self.topic_reliable_subscriber_acks_by_origin(topic, false)
239 }
240
241 pub fn update_topic_subscriber_ack(
242 &mut self,
243 endpoint_name: &str,
244 acked_seq: Option<u64>,
245 ) -> bool {
246 let Some(entry) = self.find_endpoint(endpoint_name) else {
247 return false;
248 };
249
250 let is_topic_subscriber = entry
251 .endpoint
252 .labels
253 .iter()
254 .any(|label| label == Self::topic_subscriber_kind_label());
255 let is_reliable = entry
256 .endpoint
257 .labels
258 .iter()
259 .any(|label| label == Self::topic_subscriber_reliable_label());
260 if !is_topic_subscriber || !is_reliable {
261 return false;
262 }
263
264 let mut labels = entry.endpoint.labels;
265 let existing_acked_seq = Self::parse_topic_subscriber_acked_seq(&labels);
266 labels.retain(|label| !label.starts_with(Self::topic_subscriber_acked_seq_prefix()));
267 if let Some(seq) = acked_seq {
268 let seq = existing_acked_seq.map_or(seq, |existing| existing.max(seq));
269 labels.push(format!("{}{}", Self::topic_subscriber_acked_seq_prefix(), seq));
270 }
271
272 self.discovery.update_endpoint_labels(endpoint_name, labels)
273 }
274
275 pub fn find_endpoint(&self, name: &str) -> Option<DiscoveryEndpoint> {
276 self.discovery.find_endpoint(name)
277 }
278
279 pub fn endpoint_entries(&self) -> Vec<DiscoveryEndpoint> {
280 self.discovery.endpoint_entries()
281 }
282
283 pub fn renew_topic_lease(&mut self, topic: &str, ttl: Duration) -> bool {
284 self.discovery.renew_topic_lease(topic, ttl)
285 }
286
287 pub fn renew_service_lease(&mut self, service: &str, ttl: Duration) -> bool {
288 self.discovery.renew_service_lease(service, ttl)
289 }
290
291 pub fn renew_mission_lease(&mut self, mission: &str, ttl: Duration) -> bool {
292 self.discovery.renew_mission_lease(mission, ttl)
293 }
294
295 pub fn renew_endpoint_lease(&mut self, endpoint: &str, ttl: Duration) -> bool {
296 self.discovery.renew_endpoint_lease(endpoint, ttl)
297 }
298
299 pub fn set_topic_health(&mut self, topic: &str, healthy: bool) -> bool {
300 self.discovery.set_topic_health(topic, healthy)
301 }
302
303 pub fn set_service_health(&mut self, service: &str, healthy: bool) -> bool {
304 self.discovery.set_service_health(service, healthy)
305 }
306
307 pub fn set_mission_health(&mut self, mission: &str, healthy: bool) -> bool {
308 self.discovery.set_mission_health(mission, healthy)
309 }
310
311 pub fn set_endpoint_health(&mut self, endpoint: &str, healthy: bool) -> bool {
312 self.discovery.set_endpoint_health(endpoint, healthy)
313 }
314
315 pub fn prune_discovery_inactive(&mut self) -> DiscoveryPruneReport {
316 self.discovery.prune_inactive()
317 }
318
319 pub fn snapshot(&self) -> DiscoverySnapshot {
320 self.discovery.snapshot()
321 }
322
323 pub fn topic_entries(&self) -> Vec<DiscoveryEntry> {
324 self.discovery.topic_entries()
325 }
326
327 pub fn service_entries(&self) -> Vec<DiscoveryEntry> {
328 self.discovery.service_entries()
329 }
330
331 pub fn mission_entries(&self) -> Vec<DiscoveryEntry> {
332 self.discovery.mission_entries()
333 }
334}