robotrt-middleware-core 0.1.0-beta.2

RobotRT modular robotics runtime and middleware components.
Documentation
use std::time::Duration;

use transport_core::Endpoint;

use crate::config::MiddlewareRuntimeConfig;
use crate::discovery::{
    DiscoveryEndpoint, DiscoveryEntry, DiscoveryPruneReport, DiscoverySnapshot,
};
use crate::qos::QosProfile;

use super::MiddlewareStack;

impl MiddlewareStack {
    fn topic_subscriber_kind_label() -> &'static str {
        "kind:topic-subscriber"
    }

    fn topic_subscriber_topic_label(topic: &str) -> String {
        format!("topic:{topic}")
    }

    fn topic_subscriber_qos_label(qos: QosProfile) -> &'static str {
        if qos.reliable {
            "qos:reliable"
        } else {
            "qos:best_effort"
        }
    }

    fn topic_subscriber_reliable_label() -> &'static str {
        "qos:reliable"
    }

    fn topic_subscriber_best_effort_label() -> &'static str {
        "qos:best_effort"
    }

    fn topic_subscriber_origin_local_label() -> &'static str {
        "origin:local"
    }

    fn topic_subscriber_acked_seq_prefix() -> &'static str {
        "acked_seq:"
    }

    fn parse_topic_subscriber_acked_seq(labels: &[String]) -> Option<u64> {
        labels.iter().find_map(|label| {
            label
                .strip_prefix(Self::topic_subscriber_acked_seq_prefix())
                .and_then(|value| value.parse::<u64>().ok())
        })
    }

    pub fn apply_runtime_config(&mut self, config: MiddlewareRuntimeConfig) {
        self.route_rules = config.route_rules;
        self.namespace_isolation = config.namespace_isolation;
        self.topic_bus
            .set_reliability_policy(config.topic_reliability_policy);
        for item in config.topic_qos_overrides {
            self.qos.set_topic_qos(item.topic, item.profile);
        }
    }

    pub fn register_topic(&mut self, topic: impl Into<String>) {
        self.discovery.register_topic(topic);
    }

    pub fn register_topic_with_ttl(&mut self, topic: impl Into<String>, ttl: Duration) {
        self.discovery.register_topic_with_ttl(topic, ttl);
    }

    pub fn set_topic_qos(&mut self, topic: impl Into<String>, profile: QosProfile) {
        self.qos.set_topic_qos(topic, profile);
    }

    pub fn set_topic_qos_if_absent(&mut self, topic: impl Into<String>, profile: QosProfile) {
        self.qos.set_topic_qos_if_absent(topic, profile);
    }

    pub fn topic_qos(&self, topic: &str) -> Option<QosProfile> {
        self.qos.topic_qos(topic)
    }

    pub fn register_service(&mut self, service: impl Into<String>) {
        self.discovery.register_service(service);
    }

    pub fn register_service_with_ttl(&mut self, service: impl Into<String>, ttl: Duration) {
        self.discovery.register_service_with_ttl(service, ttl);
    }

    pub fn register_mission(&mut self, mission: impl Into<String>) {
        self.discovery.register_mission(mission);
    }

    pub fn register_mission_with_ttl(&mut self, mission: impl Into<String>, ttl: Duration) {
        self.discovery.register_mission_with_ttl(mission, ttl);
    }

    pub fn register_endpoint(&mut self, name: impl Into<String>, endpoint: Endpoint) {
        self.discovery.register_endpoint(name, endpoint);
    }

    pub fn register_endpoint_with_ttl(
        &mut self,
        name: impl Into<String>,
        endpoint: Endpoint,
        ttl: Duration,
    ) {
        self.discovery
            .register_endpoint_with_ttl(name, endpoint, ttl);
    }

    pub fn unregister_endpoint(&mut self, name: &str) -> bool {
        self.discovery.unregister_endpoint(name)
    }

    pub fn register_topic_subscriber_endpoint_with_ttl(
        &mut self,
        name: impl Into<String>,
        topic: impl Into<String>,
        mut endpoint: Endpoint,
        qos: QosProfile,
        ttl: Duration,
    ) {
        let name = name.into();
        let topic = topic.into();

        let kind_label = Self::topic_subscriber_kind_label().to_string();
        if !endpoint.labels.contains(&kind_label) {
            endpoint.labels.push(kind_label);
        }
        let topic_label = Self::topic_subscriber_topic_label(&topic);
        if !endpoint.labels.contains(&topic_label) {
            endpoint.labels.push(topic_label.clone());
        }
        let qos_label = Self::topic_subscriber_qos_label(qos).to_string();
        if !endpoint.labels.contains(&qos_label) {
            endpoint.labels.push(qos_label.clone());
        }

        self.register_topic_with_ttl(topic, ttl);
        self.register_endpoint_with_ttl(name.clone(), endpoint, ttl);
        self.discovery.add_labels(
            name,
            vec![
                Self::topic_subscriber_kind_label().to_string(),
                topic_label,
                qos_label,
            ],
        );
    }

    pub fn topic_subscriber_endpoints(&self, topic: &str) -> Vec<DiscoveryEndpoint> {
        let topic_label = Self::topic_subscriber_topic_label(topic);
        self.discovery
            .endpoint_entries()
            .into_iter()
            .filter(|entry| {
                entry
                    .endpoint
                    .labels
                    .iter()
                    .any(|label| label == Self::topic_subscriber_kind_label())
                    && entry
                        .endpoint
                        .labels
                        .iter()
                        .any(|label| label == &topic_label)
            })
            .collect()
    }

    pub fn topic_subscriber_counts(&self, topic: &str) -> (usize, usize) {
        let mut reliable = 0usize;
        let mut best_effort = 0usize;
        for entry in self.topic_subscriber_endpoints(topic) {
            if entry
                .endpoint
                .labels
                .iter()
                .any(|label| label == Self::topic_subscriber_reliable_label())
            {
                reliable += 1;
            } else if entry
                .endpoint
                .labels
                .iter()
                .any(|label| label == Self::topic_subscriber_best_effort_label())
            {
                best_effort += 1;
            }
        }
        (reliable, best_effort)
    }

    pub fn topic_subscriber_count(&self, topic: &str) -> usize {
        let (reliable, best_effort) = self.topic_subscriber_counts(topic);
        reliable + best_effort
    }

    fn topic_reliable_subscriber_acks_by_origin(
        &self,
        topic: &str,
        expect_local_origin: bool,
    ) -> Vec<(String, Option<u64>)> {
        self.topic_subscriber_endpoints(topic)
            .into_iter()
            .filter(|entry| {
                let is_reliable = entry
                    .endpoint
                    .labels
                    .iter()
                    .any(|label| label == Self::topic_subscriber_reliable_label());
                if !is_reliable {
                    return false;
                }

                let is_local = entry
                    .endpoint
                    .labels
                    .iter()
                    .any(|label| label == Self::topic_subscriber_origin_local_label());
                is_local == expect_local_origin
            })
            .map(|entry| {
                let acked_seq = Self::parse_topic_subscriber_acked_seq(&entry.endpoint.labels);
                (entry.name, acked_seq)
            })
            .collect()
    }

    pub fn topic_local_reliable_subscriber_acks(&self, topic: &str) -> Vec<(String, Option<u64>)> {
        self.topic_reliable_subscriber_acks_by_origin(topic, true)
    }

    pub fn topic_reliable_subscriber_acks(&self, topic: &str) -> Vec<(String, Option<u64>)> {
        self.topic_reliable_subscriber_acks_by_origin(topic, false)
    }

    pub fn update_topic_subscriber_ack(
        &mut self,
        endpoint_name: &str,
        acked_seq: Option<u64>,
    ) -> bool {
        let Some(entry) = self.find_endpoint(endpoint_name) else {
            return false;
        };

        let is_topic_subscriber = entry
            .endpoint
            .labels
            .iter()
            .any(|label| label == Self::topic_subscriber_kind_label());
        let is_reliable = entry
            .endpoint
            .labels
            .iter()
            .any(|label| label == Self::topic_subscriber_reliable_label());
        if !is_topic_subscriber || !is_reliable {
            return false;
        }

        let mut labels = entry.endpoint.labels;
        let existing_acked_seq = Self::parse_topic_subscriber_acked_seq(&labels);
        labels.retain(|label| !label.starts_with(Self::topic_subscriber_acked_seq_prefix()));
        if let Some(seq) = acked_seq {
            let seq = existing_acked_seq.map_or(seq, |existing| existing.max(seq));
            labels.push(format!("{}{}", Self::topic_subscriber_acked_seq_prefix(), seq));
        }

        self.discovery.update_endpoint_labels(endpoint_name, labels)
    }

    pub fn find_endpoint(&self, name: &str) -> Option<DiscoveryEndpoint> {
        self.discovery.find_endpoint(name)
    }

    pub fn endpoint_entries(&self) -> Vec<DiscoveryEndpoint> {
        self.discovery.endpoint_entries()
    }

    pub fn renew_topic_lease(&mut self, topic: &str, ttl: Duration) -> bool {
        self.discovery.renew_topic_lease(topic, ttl)
    }

    pub fn renew_service_lease(&mut self, service: &str, ttl: Duration) -> bool {
        self.discovery.renew_service_lease(service, ttl)
    }

    pub fn renew_mission_lease(&mut self, mission: &str, ttl: Duration) -> bool {
        self.discovery.renew_mission_lease(mission, ttl)
    }

    pub fn renew_endpoint_lease(&mut self, endpoint: &str, ttl: Duration) -> bool {
        self.discovery.renew_endpoint_lease(endpoint, ttl)
    }

    pub fn set_topic_health(&mut self, topic: &str, healthy: bool) -> bool {
        self.discovery.set_topic_health(topic, healthy)
    }

    pub fn set_service_health(&mut self, service: &str, healthy: bool) -> bool {
        self.discovery.set_service_health(service, healthy)
    }

    pub fn set_mission_health(&mut self, mission: &str, healthy: bool) -> bool {
        self.discovery.set_mission_health(mission, healthy)
    }

    pub fn set_endpoint_health(&mut self, endpoint: &str, healthy: bool) -> bool {
        self.discovery.set_endpoint_health(endpoint, healthy)
    }

    pub fn prune_discovery_inactive(&mut self) -> DiscoveryPruneReport {
        self.discovery.prune_inactive()
    }

    pub fn snapshot(&self) -> DiscoverySnapshot {
        self.discovery.snapshot()
    }

    pub fn topic_entries(&self) -> Vec<DiscoveryEntry> {
        self.discovery.topic_entries()
    }

    pub fn service_entries(&self) -> Vec<DiscoveryEntry> {
        self.discovery.service_entries()
    }

    pub fn mission_entries(&self) -> Vec<DiscoveryEntry> {
        self.discovery.mission_entries()
    }
}