use std::time::Duration;
use transport_core::Endpoint;
use crate::config::MiddlewareRuntimeConfig;
use crate::discovery::{
DiscoveryEndpoint, DiscoveryEntry, DiscoveryPruneReport, DiscoverySnapshot,
};
use crate::qos::QosProfile;
use super::MiddlewareStack;
impl MiddlewareStack {
fn topic_subscriber_kind_label() -> &'static str {
"kind:topic-subscriber"
}
fn topic_subscriber_topic_label(topic: &str) -> String {
format!("topic:{topic}")
}
fn topic_subscriber_qos_label(qos: QosProfile) -> &'static str {
if qos.reliable {
"qos:reliable"
} else {
"qos:best_effort"
}
}
fn topic_subscriber_reliable_label() -> &'static str {
"qos:reliable"
}
fn topic_subscriber_best_effort_label() -> &'static str {
"qos:best_effort"
}
fn topic_subscriber_origin_local_label() -> &'static str {
"origin:local"
}
fn topic_subscriber_acked_seq_prefix() -> &'static str {
"acked_seq:"
}
fn parse_topic_subscriber_acked_seq(labels: &[String]) -> Option<u64> {
labels.iter().find_map(|label| {
label
.strip_prefix(Self::topic_subscriber_acked_seq_prefix())
.and_then(|value| value.parse::<u64>().ok())
})
}
pub fn apply_runtime_config(&mut self, config: MiddlewareRuntimeConfig) {
self.route_rules = config.route_rules;
self.namespace_isolation = config.namespace_isolation;
self.topic_bus
.set_reliability_policy(config.topic_reliability_policy);
for item in config.topic_qos_overrides {
self.qos.set_topic_qos(item.topic, item.profile);
}
}
pub fn register_topic(&mut self, topic: impl Into<String>) {
self.discovery.register_topic(topic);
}
pub fn register_topic_with_ttl(&mut self, topic: impl Into<String>, ttl: Duration) {
self.discovery.register_topic_with_ttl(topic, ttl);
}
pub fn set_topic_qos(&mut self, topic: impl Into<String>, profile: QosProfile) {
self.qos.set_topic_qos(topic, profile);
}
pub fn set_topic_qos_if_absent(&mut self, topic: impl Into<String>, profile: QosProfile) {
self.qos.set_topic_qos_if_absent(topic, profile);
}
pub fn topic_qos(&self, topic: &str) -> Option<QosProfile> {
self.qos.topic_qos(topic)
}
pub fn register_service(&mut self, service: impl Into<String>) {
self.discovery.register_service(service);
}
pub fn register_service_with_ttl(&mut self, service: impl Into<String>, ttl: Duration) {
self.discovery.register_service_with_ttl(service, ttl);
}
pub fn register_mission(&mut self, mission: impl Into<String>) {
self.discovery.register_mission(mission);
}
pub fn register_mission_with_ttl(&mut self, mission: impl Into<String>, ttl: Duration) {
self.discovery.register_mission_with_ttl(mission, ttl);
}
pub fn register_endpoint(&mut self, name: impl Into<String>, endpoint: Endpoint) {
self.discovery.register_endpoint(name, endpoint);
}
pub fn register_endpoint_with_ttl(
&mut self,
name: impl Into<String>,
endpoint: Endpoint,
ttl: Duration,
) {
self.discovery
.register_endpoint_with_ttl(name, endpoint, ttl);
}
pub fn unregister_endpoint(&mut self, name: &str) -> bool {
self.discovery.unregister_endpoint(name)
}
pub fn register_topic_subscriber_endpoint_with_ttl(
&mut self,
name: impl Into<String>,
topic: impl Into<String>,
mut endpoint: Endpoint,
qos: QosProfile,
ttl: Duration,
) {
let name = name.into();
let topic = topic.into();
let kind_label = Self::topic_subscriber_kind_label().to_string();
if !endpoint.labels.contains(&kind_label) {
endpoint.labels.push(kind_label);
}
let topic_label = Self::topic_subscriber_topic_label(&topic);
if !endpoint.labels.contains(&topic_label) {
endpoint.labels.push(topic_label.clone());
}
let qos_label = Self::topic_subscriber_qos_label(qos).to_string();
if !endpoint.labels.contains(&qos_label) {
endpoint.labels.push(qos_label.clone());
}
self.register_topic_with_ttl(topic, ttl);
self.register_endpoint_with_ttl(name.clone(), endpoint, ttl);
self.discovery.add_labels(
name,
vec![
Self::topic_subscriber_kind_label().to_string(),
topic_label,
qos_label,
],
);
}
pub fn topic_subscriber_endpoints(&self, topic: &str) -> Vec<DiscoveryEndpoint> {
let topic_label = Self::topic_subscriber_topic_label(topic);
self.discovery
.endpoint_entries()
.into_iter()
.filter(|entry| {
entry
.endpoint
.labels
.iter()
.any(|label| label == Self::topic_subscriber_kind_label())
&& entry
.endpoint
.labels
.iter()
.any(|label| label == &topic_label)
})
.collect()
}
pub fn topic_subscriber_counts(&self, topic: &str) -> (usize, usize) {
let mut reliable = 0usize;
let mut best_effort = 0usize;
for entry in self.topic_subscriber_endpoints(topic) {
if entry
.endpoint
.labels
.iter()
.any(|label| label == Self::topic_subscriber_reliable_label())
{
reliable += 1;
} else if entry
.endpoint
.labels
.iter()
.any(|label| label == Self::topic_subscriber_best_effort_label())
{
best_effort += 1;
}
}
(reliable, best_effort)
}
pub fn topic_subscriber_count(&self, topic: &str) -> usize {
let (reliable, best_effort) = self.topic_subscriber_counts(topic);
reliable + best_effort
}
fn topic_reliable_subscriber_acks_by_origin(
&self,
topic: &str,
expect_local_origin: bool,
) -> Vec<(String, Option<u64>)> {
self.topic_subscriber_endpoints(topic)
.into_iter()
.filter(|entry| {
let is_reliable = entry
.endpoint
.labels
.iter()
.any(|label| label == Self::topic_subscriber_reliable_label());
if !is_reliable {
return false;
}
let is_local = entry
.endpoint
.labels
.iter()
.any(|label| label == Self::topic_subscriber_origin_local_label());
is_local == expect_local_origin
})
.map(|entry| {
let acked_seq = Self::parse_topic_subscriber_acked_seq(&entry.endpoint.labels);
(entry.name, acked_seq)
})
.collect()
}
pub fn topic_local_reliable_subscriber_acks(&self, topic: &str) -> Vec<(String, Option<u64>)> {
self.topic_reliable_subscriber_acks_by_origin(topic, true)
}
pub fn topic_reliable_subscriber_acks(&self, topic: &str) -> Vec<(String, Option<u64>)> {
self.topic_reliable_subscriber_acks_by_origin(topic, false)
}
pub fn update_topic_subscriber_ack(
&mut self,
endpoint_name: &str,
acked_seq: Option<u64>,
) -> bool {
let Some(entry) = self.find_endpoint(endpoint_name) else {
return false;
};
let is_topic_subscriber = entry
.endpoint
.labels
.iter()
.any(|label| label == Self::topic_subscriber_kind_label());
let is_reliable = entry
.endpoint
.labels
.iter()
.any(|label| label == Self::topic_subscriber_reliable_label());
if !is_topic_subscriber || !is_reliable {
return false;
}
let mut labels = entry.endpoint.labels;
let existing_acked_seq = Self::parse_topic_subscriber_acked_seq(&labels);
labels.retain(|label| !label.starts_with(Self::topic_subscriber_acked_seq_prefix()));
if let Some(seq) = acked_seq {
let seq = existing_acked_seq.map_or(seq, |existing| existing.max(seq));
labels.push(format!("{}{}", Self::topic_subscriber_acked_seq_prefix(), seq));
}
self.discovery.update_endpoint_labels(endpoint_name, labels)
}
pub fn find_endpoint(&self, name: &str) -> Option<DiscoveryEndpoint> {
self.discovery.find_endpoint(name)
}
pub fn endpoint_entries(&self) -> Vec<DiscoveryEndpoint> {
self.discovery.endpoint_entries()
}
pub fn renew_topic_lease(&mut self, topic: &str, ttl: Duration) -> bool {
self.discovery.renew_topic_lease(topic, ttl)
}
pub fn renew_service_lease(&mut self, service: &str, ttl: Duration) -> bool {
self.discovery.renew_service_lease(service, ttl)
}
pub fn renew_mission_lease(&mut self, mission: &str, ttl: Duration) -> bool {
self.discovery.renew_mission_lease(mission, ttl)
}
pub fn renew_endpoint_lease(&mut self, endpoint: &str, ttl: Duration) -> bool {
self.discovery.renew_endpoint_lease(endpoint, ttl)
}
pub fn set_topic_health(&mut self, topic: &str, healthy: bool) -> bool {
self.discovery.set_topic_health(topic, healthy)
}
pub fn set_service_health(&mut self, service: &str, healthy: bool) -> bool {
self.discovery.set_service_health(service, healthy)
}
pub fn set_mission_health(&mut self, mission: &str, healthy: bool) -> bool {
self.discovery.set_mission_health(mission, healthy)
}
pub fn set_endpoint_health(&mut self, endpoint: &str, healthy: bool) -> bool {
self.discovery.set_endpoint_health(endpoint, healthy)
}
pub fn prune_discovery_inactive(&mut self) -> DiscoveryPruneReport {
self.discovery.prune_inactive()
}
pub fn snapshot(&self) -> DiscoverySnapshot {
self.discovery.snapshot()
}
pub fn topic_entries(&self) -> Vec<DiscoveryEntry> {
self.discovery.topic_entries()
}
pub fn service_entries(&self) -> Vec<DiscoveryEntry> {
self.discovery.service_entries()
}
pub fn mission_entries(&self) -> Vec<DiscoveryEntry> {
self.discovery.mission_entries()
}
}