use anyhow::{anyhow, Result};
use dashmap::DashMap;
use runar_macros_common::{log_debug, log_error, log_info, log_warn};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::routing::{PathTrie, TopicPath};
use crate::services::abstract_service::{AbstractService, ServiceState};
use crate::services::{ActionHandler, EventContext, EventRegistrationOptions, RemoteService};
use runar_common::logging::Logger;
use runar_schemas::{ActionMetadata, ServiceMetadata, SubscriptionMetadata};
use runar_serializer::ArcValue;
pub type EventHandler = Arc<
dyn Fn(Arc<EventContext>, Option<ArcValue>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
+ Send
+ Sync,
>;
pub type RemoteEventHandler =
Arc<dyn Fn(Option<ArcValue>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
use std::future::Future;
pub type ServiceFuture = Pin<Box<dyn Future<Output = Result<ArcValue>> + Send>>;
pub type EventSubscriber = Arc<
dyn Fn(Arc<EventContext>, Option<ArcValue>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
+ Send
+ Sync,
>;
pub type ActionRegistrar = Arc<
dyn Fn(
&str,
&str,
ActionHandler,
Option<ActionMetadata>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
+ Send
+ Sync,
>;
pub enum LocationType {
Local,
Remote,
}
#[derive(Clone)]
pub enum SubscriberKind {
Local(EventHandler),
Remote(RemoteEventHandler),
}
#[derive(Clone)]
pub struct ServiceEntry {
pub service: Arc<dyn AbstractService>,
pub service_topic: TopicPath,
pub service_state: ServiceState,
pub registration_time: u64,
pub last_start_time: Option<u64>,
}
impl std::fmt::Debug for ServiceEntry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ServiceEntry")
.field("name", &self.service.name())
.field("path", &self.service.path())
.field("version", &self.service.version())
.field("description", &self.service.description())
.field("state", &self.service_state)
.field("topic", &self.service_topic)
.field("registration_time", &self.registration_time)
.field("last_start_time", &self.last_start_time)
.finish()
}
}
pub type LocalActionEntryValue = (ActionHandler, TopicPath, Option<ActionMetadata>);
pub type SubscriptionEntry = (String, SubscriberKind, SubscriptionMetadata);
pub type SubscriptionVec = Vec<SubscriptionEntry>;
pub const INTERNAL_SERVICES: [&str; 2] = ["$registry", "$keys"];
pub struct ServiceRegistry {
local_action_handlers: Arc<RwLock<PathTrie<LocalActionEntryValue>>>,
remote_action_handlers: Arc<RwLock<PathTrie<Vec<ActionHandler>>>>,
event_subscriptions: Arc<RwLock<PathTrie<SubscriptionVec>>>,
subscription_id_to_topic_path: Arc<DashMap<String, TopicPath>>,
subscription_id_to_service_topic_path: Arc<DashMap<String, TopicPath>>,
local_services: Arc<RwLock<PathTrie<Arc<ServiceEntry>>>>,
local_services_list: Arc<DashMap<TopicPath, Arc<ServiceEntry>>>,
remote_services: Arc<RwLock<PathTrie<Arc<RemoteService>>>>,
local_service_states: Arc<DashMap<String, ServiceState>>,
remote_service_states: Arc<DashMap<String, ServiceState>>,
remote_peer_subscriptions: Arc<DashMap<String, DashMap<String, String>>>,
logger: Arc<Logger>,
}
impl Clone for ServiceRegistry {
fn clone(&self) -> Self {
ServiceRegistry {
local_action_handlers: self.local_action_handlers.clone(),
remote_action_handlers: self.remote_action_handlers.clone(),
event_subscriptions: self.event_subscriptions.clone(),
subscription_id_to_topic_path: self.subscription_id_to_topic_path.clone(),
subscription_id_to_service_topic_path: self
.subscription_id_to_service_topic_path
.clone(),
local_services: self.local_services.clone(),
local_services_list: self.local_services_list.clone(),
remote_services: self.remote_services.clone(),
local_service_states: self.local_service_states.clone(),
remote_service_states: self.remote_service_states.clone(),
remote_peer_subscriptions: self.remote_peer_subscriptions.clone(),
logger: self.logger.clone(),
}
}
}
impl ServiceRegistry {
pub fn new(logger: Arc<Logger>) -> Self {
Self {
local_action_handlers: Arc::new(RwLock::new(PathTrie::new())),
remote_action_handlers: Arc::new(RwLock::new(PathTrie::new())),
event_subscriptions: Arc::new(RwLock::new(PathTrie::new())),
subscription_id_to_topic_path: Arc::new(DashMap::new()),
subscription_id_to_service_topic_path: Arc::new(DashMap::new()),
local_services: Arc::new(RwLock::new(PathTrie::new())),
local_services_list: Arc::new(DashMap::new()),
remote_services: Arc::new(RwLock::new(PathTrie::new())),
local_service_states: Arc::new(DashMap::new()),
remote_service_states: Arc::new(DashMap::new()),
remote_peer_subscriptions: Arc::new(DashMap::new()),
logger,
}
}
pub async fn register_local_service(&self, service: Arc<ServiceEntry>) -> Result<()> {
let service_entry = service.clone();
let service_topic = service_entry.service_topic.clone();
log_info!(self.logger, "Registering local service: {service_topic}");
self.local_services
.write()
.await
.set_value(service_topic.clone(), service);
self.local_services_list
.insert(service_topic, service_entry.clone());
Ok(())
}
pub async fn remove_remote_service(&self, service_topic: &TopicPath) -> Result<()> {
let services = self.remote_services.read().await.find(service_topic);
if services.is_empty() {
return Err(anyhow!("Service not found for topic: {}", service_topic));
}
let registry_delegate = Arc::new(self.clone());
for service in services {
let context =
super::RemoteLifecycleContext::new(&service.service_topic, self.logger.clone())
.with_registry_delegate(registry_delegate.clone());
if let Err(e) = service.stop(context).await {
log_error!(
self.logger,
"Failed to stop remote service '{}' error: {}",
service.path(),
e
);
}
}
self.remote_services
.write()
.await
.remove_values(service_topic);
self.remove_remote_service_state(service_topic).await?;
Ok(())
}
pub async fn register_remote_service(&self, service: Arc<RemoteService>) -> bool {
let service_topic = service.service_topic.clone();
let service_path = service.path().to_string();
let peer_node_id = service.peer_node_id().clone();
log_info!(
self.logger,
"Registering remote service: {service_path} from peer: {peer_node_id}"
);
{
let mut services = self.remote_services.write().await;
let matches = services.find_matches(&service_topic);
if matches.is_empty() {
services.set_value(service_topic, service);
} else {
log_warn!(
self.logger,
"Service already exists for topic: {service_topic}"
);
return false;
}
}
true
}
pub async fn register_local_action_handler(
&self,
topic_path: &TopicPath,
handler: ActionHandler,
metadata: Option<ActionMetadata>,
) -> Result<()> {
log_debug!(
self.logger,
"Registering local action handler for: {topic_path}"
);
self.local_action_handlers
.write()
.await
.set_value(topic_path.clone(), (handler, topic_path.clone(), metadata));
Ok(())
}
pub async fn remove_remote_action_handler(&self, topic_path: &TopicPath) -> Result<()> {
log_debug!(
self.logger,
"Removing remote action handler for: {topic_path}"
);
self.remote_action_handlers
.write()
.await
.remove_values(topic_path);
Ok(())
}
pub async fn register_remote_action_handler(
&self,
topic_path: &TopicPath,
handler: ActionHandler,
) -> Result<()> {
log_debug!(
self.logger,
"Registering remote action handler for: {}",
topic_path.as_str()
);
{
let mut handlers_trie = self.remote_action_handlers.write().await;
let matches = handlers_trie.find_matches(topic_path);
if matches.is_empty() {
handlers_trie.set_value(topic_path.clone(), vec![handler.clone()]);
} else {
let mut existing_handlers = matches[0].content.clone();
existing_handlers.push(handler.clone());
handlers_trie.set_value(topic_path.clone(), existing_handlers);
}
}
Ok(())
}
pub async fn get_local_action_handler(
&self,
topic_path: &TopicPath,
) -> Option<(ActionHandler, TopicPath)> {
let handlers_trie = self.local_action_handlers.read().await;
let matches = handlers_trie.find_matches(topic_path);
if !matches.is_empty() {
let (handler, topic_path, _metadata) = matches[0].content.clone();
Some((handler, topic_path))
} else {
None
}
}
pub async fn get_remote_action_handlers(&self, topic_path: &TopicPath) -> Vec<ActionHandler> {
let handlers_trie = self.remote_action_handlers.read().await;
let matches = handlers_trie.find_matches(topic_path);
matches
.into_iter()
.flat_map(|mat| mat.content.clone())
.collect()
}
pub async fn get_action_handler(&self, topic_path: &TopicPath) -> Option<ActionHandler> {
if let Some((handler, _)) = self.get_local_action_handler(topic_path).await {
return Some(handler);
}
let remote_handlers = self.get_remote_action_handlers(topic_path).await;
if !remote_handlers.is_empty() {
return Some(remote_handlers[0].clone());
}
None
}
pub async fn register_local_event_subscription(
&self,
topic_path: &TopicPath,
callback: EventHandler,
_options: &EventRegistrationOptions,
) -> Result<String> {
let subscription_id = Uuid::new_v4().to_string();
{
let mut trie = self.event_subscriptions.write().await;
let mut list = trie
.find_matches(topic_path)
.first()
.map(|m| m.content.clone())
.unwrap_or_default();
list.push((
subscription_id.clone(),
SubscriberKind::Local(callback),
SubscriptionMetadata {
path: topic_path.as_str().to_string(),
},
));
trie.set_value(topic_path.clone(), list);
}
self.subscription_id_to_topic_path
.insert(subscription_id.clone(), topic_path.clone());
let service_topic =
TopicPath::new(&topic_path.service_path(), &topic_path.network_id()).unwrap();
self.subscription_id_to_service_topic_path
.insert(subscription_id.clone(), service_topic);
Ok(subscription_id)
}
pub async fn register_remote_event_subscription(
&self,
topic_path: &TopicPath,
callback: RemoteEventHandler,
_options: EventRegistrationOptions,
) -> Result<String> {
let subscription_id = Uuid::new_v4().to_string();
{
let mut trie = self.event_subscriptions.write().await;
let mut list = trie
.find_matches(topic_path)
.first()
.map(|m| m.content.clone())
.unwrap_or_default();
list.push((
subscription_id.clone(),
SubscriberKind::Remote(callback),
SubscriptionMetadata {
path: topic_path.as_str().to_string(),
},
));
trie.set_value(topic_path.clone(), list);
}
self.subscription_id_to_topic_path
.insert(subscription_id.clone(), topic_path.clone());
Ok(subscription_id)
}
pub async fn remove_remote_event_subscription(&self, topic_path: &TopicPath) -> Result<()> {
let mut trie = self.event_subscriptions.write().await;
let matches = trie.find_matches(topic_path);
let mut ids_to_remove = Vec::new();
for m in &matches {
for (id, kind, _) in &m.content {
if matches!(kind, SubscriberKind::Remote(_)) {
ids_to_remove.push(id.clone());
}
}
}
if ids_to_remove.is_empty() {
return Ok(());
}
for m in matches {
let remaining: Vec<(String, SubscriberKind, SubscriptionMetadata)> = m
.content
.into_iter()
.filter(|(id, kind, _)| {
!(ids_to_remove.contains(id) && matches!(kind, SubscriberKind::Remote(_)))
})
.collect();
if remaining.is_empty() {
trie.remove_values(topic_path);
} else {
trie.set_value(topic_path.clone(), remaining);
}
}
for id in ids_to_remove {
self.subscription_id_to_topic_path.remove(&id);
self.subscription_id_to_service_topic_path.remove(&id);
}
Ok(())
}
pub async fn get_local_event_subscribers(
&self,
topic_path: &TopicPath,
) -> Vec<(String, EventHandler, SubscriptionMetadata)> {
let trie = self.event_subscriptions.read().await;
let matches = trie.find_matches(topic_path);
let estimated: usize = matches.iter().map(|m| m.content.len()).sum();
let mut result = Vec::with_capacity(estimated);
let mut seen_ids = std::collections::HashSet::new();
for m in matches {
for (id, kind, meta) in m.content.clone() {
if seen_ids.contains(&id) {
continue;
}
if let SubscriberKind::Local(handler) = kind {
seen_ids.insert(id.clone());
result.push((id, handler, meta));
}
}
}
result
}
pub async fn get_remote_event_subscribers(
&self,
topic_path: &TopicPath,
) -> Vec<(String, RemoteEventHandler, SubscriptionMetadata)> {
let trie = self.event_subscriptions.read().await;
let matches = trie.find_matches(topic_path);
let estimated: usize = matches.iter().map(|m| m.content.len()).sum();
let mut result = Vec::with_capacity(estimated);
let mut seen_ids = std::collections::HashSet::new();
for m in matches {
for (id, kind, meta) in m.content.clone() {
if seen_ids.contains(&id) {
continue;
}
if let SubscriberKind::Remote(handler) = kind {
seen_ids.insert(id.clone());
result.push((id, handler, meta));
}
}
}
result
}
pub async fn update_local_service_state(
&self,
service_topic: &TopicPath,
state: ServiceState,
) -> Result<()> {
log_debug!(
self.logger,
"Updating local service state for {}: {:?}",
service_topic,
state
);
self.local_service_states
.insert(service_topic.as_str().to_string(), state);
Ok(())
}
pub async fn update_remote_service_state(
&self,
service_topic: &TopicPath,
state: ServiceState,
) -> Result<()> {
log_debug!(
self.logger,
"Updating remote service state for {}: {:?}",
service_topic,
state
);
self.remote_service_states
.insert(service_topic.as_str().to_string(), state);
Ok(())
}
pub async fn remove_remote_service_state(&self, service_topic: &TopicPath) -> Result<()> {
self.remote_service_states.remove(service_topic.as_str());
Ok(())
}
pub async fn get_local_service_state(&self, service_path: &TopicPath) -> Option<ServiceState> {
self.local_service_states
.get(service_path.as_str())
.map(|entry| *entry.value())
}
pub async fn get_remote_service_state(&self, service_path: &TopicPath) -> Option<ServiceState> {
self.remote_service_states
.get(service_path.as_str())
.map(|entry| *entry.value())
}
pub async fn get_subscriptions_metadata(
&self,
search_path: &TopicPath,
) -> Vec<SubscriptionMetadata> {
let events = self.event_subscriptions.read().await;
let matches = events.find_matches(search_path);
let estimated: usize = matches.iter().map(|m| m.content.len()).sum();
let mut result = Vec::with_capacity(estimated);
for match_item in matches {
let event_topic_list = &match_item.content;
for (_, _, metadata) in event_topic_list {
result.push(metadata.clone());
}
}
result
}
pub async fn get_actions_metadata(&self, search_path: &TopicPath) -> Vec<ActionMetadata> {
let actions = self.local_action_handlers.read().await;
let matches = actions.find_matches(search_path);
let mut result = Vec::with_capacity(matches.len());
for match_item in matches {
let (_, _, metadata) = &match_item.content;
if let Some(metadata) = metadata {
result.push(metadata.clone());
}
}
result
}
pub async fn get_local_services(&self) -> HashMap<TopicPath, Arc<ServiceEntry>> {
let mut result = HashMap::with_capacity(self.local_services_list.len());
for entry in self.local_services_list.iter() {
result.insert(entry.key().clone(), entry.value().clone());
}
result
}
pub async fn get_local_services_ref(&self) -> &DashMap<TopicPath, Arc<ServiceEntry>> {
&self.local_services_list
}
pub async fn unsubscribe_local(&self, subscription_id: &str) -> Result<()> {
log_debug!(
self.logger,
"Attempting to unsubscribe local subscription ID: {subscription_id}"
);
let topic_path_option = self
.subscription_id_to_topic_path
.get(subscription_id)
.map(|entry| entry.value().clone());
if let Some(topic_path) = topic_path_option {
log_debug!(
self.logger,
"Found topic path '{}' for subscription ID: {}",
topic_path.as_str(),
subscription_id
);
let mut trie = self.event_subscriptions.write().await;
let matches = trie.find_matches(&topic_path);
if !matches.is_empty() {
for m in matches {
let filtered: Vec<_> = m
.content
.into_iter()
.filter(|(id, kind, _)| {
!(id == subscription_id && matches!(kind, SubscriberKind::Local(_)))
})
.collect();
if filtered.is_empty() {
trie.remove_values(&topic_path);
} else {
trie.set_value(topic_path.clone(), filtered);
}
}
self.subscription_id_to_topic_path.remove(subscription_id);
self.subscription_id_to_service_topic_path
.remove(subscription_id);
log_debug!(
self.logger,
"Successfully unsubscribed from topic: {} with ID: {}",
topic_path.as_str(),
subscription_id
);
Ok(())
} else {
let msg = format!(
"No subscriptions found for topic path {topic_path} and ID {subscription_id}",
);
log_warn!(self.logger, "{}", msg);
Err(anyhow!(msg))
}
} else {
let msg = format!(
"No topic path found mapping to subscription ID: {subscription_id}. Cannot unsubscribe."
);
log_warn!(self.logger, "{}", msg);
Err(anyhow!(msg))
}
}
pub async fn upsert_remote_peer_subscription(
&self,
peer_id: &str,
path: &TopicPath,
sub_id: String,
) {
let peer_subscriptions = self
.remote_peer_subscriptions
.entry(peer_id.to_string())
.or_default();
peer_subscriptions.insert(path.as_str().to_string(), sub_id);
}
pub async fn upsert_remote_peer_subscription_owned(
&self,
peer_id: String,
path: &TopicPath,
sub_id: String,
) {
let peer_subscriptions = self.remote_peer_subscriptions.entry(peer_id).or_default();
peer_subscriptions.insert(path.as_str().to_string(), sub_id);
}
pub async fn remove_remote_peer_subscription(
&self,
peer_id: &str,
path: &TopicPath,
) -> Option<String> {
self.remote_peer_subscriptions
.get(peer_id)
.and_then(|peer_entry| {
peer_entry
.value()
.remove(path.as_str())
.map(|(_, sub_id)| sub_id)
})
}
pub async fn drain_remote_peer_subscriptions(&self, peer_id: &str) -> Vec<String> {
self.remote_peer_subscriptions
.remove(peer_id)
.map(|(_, peer_subscriptions)| {
peer_subscriptions
.into_iter()
.map(|entry| entry.1)
.collect()
})
.unwrap_or_default()
}
pub async fn remote_subscription_paths(
&self,
peer_id: &str,
) -> std::collections::HashSet<String> {
self.remote_peer_subscriptions
.get(peer_id)
.map(|peer_entry| {
peer_entry
.value()
.iter()
.map(|path_entry| path_entry.key().clone())
.collect()
})
.unwrap_or_default()
}
pub async fn unsubscribe_remote(&self, subscription_id: &str) -> Result<()> {
log_debug!(
self.logger,
"Attempting to unsubscribe remote subscription ID: {subscription_id}"
);
let topic_path_option = self
.subscription_id_to_topic_path
.get(subscription_id)
.map(|entry| entry.value().clone());
if let Some(topic_path) = topic_path_option {
log_debug!(
self.logger,
"Found topic path '{}' for subscription ID: {}",
topic_path.as_str(),
subscription_id
);
let mut trie = self.event_subscriptions.write().await;
let matches = trie.find_matches(&topic_path);
if !matches.is_empty() {
let mut removed_flag = false;
for m in matches {
let filtered: Vec<_> = m
.content
.into_iter()
.filter(|(id, kind, _)| {
!(id == subscription_id && matches!(kind, SubscriberKind::Remote(_)))
})
.collect();
if filtered.is_empty() {
trie.remove_values(&topic_path);
} else {
trie.set_value(topic_path.clone(), filtered);
}
removed_flag = true;
}
if removed_flag {
self.subscription_id_to_topic_path.remove(subscription_id);
log_debug!(
self.logger,
"Successfully unsubscribed from remote topic: {} with ID: {}",
topic_path.as_str(),
subscription_id
);
Ok(())
} else {
let msg = format!(
"Subscription handler not found for remote topic path {topic_path} and ID {subscription_id}, although ID was mapped. Potential race condition?"
);
log_warn!(self.logger, "{}", msg);
Err(anyhow!(msg))
}
} else {
let msg = format!(
"No subscriptions found for remote topic path {topic_path} and ID {subscription_id}",
);
log_warn!(self.logger, "{}", msg);
Err(anyhow!(msg))
}
} else {
let msg = format!(
"No topic path found mapping to remote subscription ID: {subscription_id}. Cannot unsubscribe."
);
log_warn!(self.logger, "{}", msg);
Err(anyhow!(msg))
}
}
async fn get_service_metadata(&self, topic_path: &TopicPath) -> Option<ServiceMetadata> {
let services = self.local_services.read().await;
let matches = services.find_matches(topic_path);
if !matches.is_empty() {
let service_entry = &matches[0].content;
let service = &service_entry.service; let search_path = format!("{service_path}/*", service_path = service.path());
let network_id_string = topic_path.network_id();
let service_topic_path =
TopicPath::new(search_path.as_str(), &network_id_string).unwrap();
let actions = self.get_actions_metadata(&service_topic_path).await;
return Some(ServiceMetadata {
network_id: network_id_string,
service_path: service.path().to_string(),
name: service.name().to_string(),
version: service.version().to_string(),
description: service.description().to_string(),
actions,
registration_time: service_entry.registration_time,
last_start_time: service_entry.last_start_time,
});
}
None
}
pub async fn get_all_subscriptions(
&self,
include_internal_services: bool,
) -> Result<Vec<SubscriptionMetadata>> {
let subscriptions = self.event_subscriptions.read().await;
let all_values = subscriptions.get_all_values();
let mut result = Vec::new();
for subscription_vec in all_values {
for (_, _, metadata) in subscription_vec {
if !include_internal_services {
let tp = TopicPath::from_full_path(&metadata.path).map_err(|e| {
anyhow!("Invalid subscription topic path {}: {e}", metadata.path)
})?;
let service_path = tp.service_path();
if service_path.starts_with('$')
|| INTERNAL_SERVICES.contains(&service_path.as_str())
{
continue;
}
}
result.push(metadata);
}
}
Ok(result)
}
pub async fn get_all_subscriptions_optimized(
&self,
include_internal_services: bool,
) -> Result<Vec<SubscriptionMetadata>> {
let subscriptions = self.event_subscriptions.read().await;
let all_values = subscriptions.get_all_values();
let estimated_capacity = all_values.iter().map(|vec| vec.len()).sum();
let mut result = Vec::with_capacity(estimated_capacity);
for subscription_vec in all_values {
for (_, _, metadata) in subscription_vec {
if !include_internal_services {
let tp = TopicPath::from_full_path(&metadata.path).map_err(|e| {
anyhow!("Invalid subscription topic path {}: {e}", metadata.path)
})?;
let service_path = tp.service_path();
if service_path.starts_with('$')
|| INTERNAL_SERVICES.contains(&service_path.as_str())
{
continue;
}
}
result.push(metadata);
}
}
Ok(result)
}
pub async fn get_all_service_metadata(
&self,
include_internal_services: bool,
) -> Result<HashMap<String, ServiceMetadata>> {
let mut result = HashMap::with_capacity(self.local_services_list.len());
let local_services = self.get_local_services().await;
for (_, service_entry) in local_services {
let service = &service_entry.service;
let path_str = service.path();
if !include_internal_services && INTERNAL_SERVICES.contains(&path_str) {
continue;
}
let search_path = format!("{path_str}/*");
let search_topic = TopicPath::new(
&search_path,
&service_entry.service_topic.network_id().to_string(),
)
.map_err(|e| anyhow!("Failed to create topic path: {e}"))?;
let service_metadata = self
.get_service_metadata(&search_topic)
.await
.ok_or_else(|| anyhow!("Service metadata not found for topic: {}", search_topic))?;
result.insert(path_str.to_string(), service_metadata);
}
Ok(result)
}
pub async fn get_all_service_metadata_ref(
&self,
include_internal_services: bool,
) -> Result<HashMap<String, ServiceMetadata>> {
let mut result = HashMap::new();
for entry in self.local_services_list.iter() {
let service_entry = entry.value();
let service = &service_entry.service;
let path_str = service.path();
if !include_internal_services && INTERNAL_SERVICES.contains(&path_str) {
continue;
}
let search_path = format!("{path_str}/*");
let search_topic = TopicPath::new(
&search_path,
&service_entry.service_topic.network_id().to_string(),
)
.map_err(|e| anyhow!("Failed to create topic path: {e}"))?;
let service_metadata = self
.get_service_metadata(&search_topic)
.await
.ok_or_else(|| anyhow!("Service metadata not found for topic: {}", search_topic))?;
result.insert(path_str.to_string(), service_metadata);
}
Ok(result)
}
}
#[async_trait::async_trait]
impl crate::services::RegistryDelegate for ServiceRegistry {
async fn get_local_service_state(&self, service_path: &TopicPath) -> Option<ServiceState> {
self.get_local_service_state(service_path).await
}
async fn get_remote_service_state(&self, service_path: &TopicPath) -> Option<ServiceState> {
self.get_remote_service_state(service_path).await
}
async fn get_actions_metadata(&self, service_topic_path: &TopicPath) -> Vec<ActionMetadata> {
self.get_actions_metadata(service_topic_path).await
}
async fn get_service_metadata(&self, topic_path: &TopicPath) -> Option<ServiceMetadata> {
self.get_service_metadata(topic_path).await
}
async fn get_all_service_metadata(
&self,
include_internal_services: bool,
) -> Result<HashMap<String, ServiceMetadata>> {
self.get_all_service_metadata(include_internal_services)
.await
}
async fn register_remote_action_handler(
&self,
topic_path: &TopicPath,
handler: ActionHandler,
) -> Result<()> {
self.register_remote_action_handler(topic_path, handler)
.await
}
async fn remove_remote_action_handler(&self, topic_path: &TopicPath) -> Result<()> {
self.remove_remote_action_handler(topic_path).await
}
async fn register_remote_event_handler(
&self,
topic_path: &TopicPath,
handler: RemoteEventHandler,
) -> Result<String> {
self.register_remote_event_subscription(
topic_path,
handler,
EventRegistrationOptions::default(),
)
.await
}
async fn remove_remote_event_handler(&self, topic_path: &TopicPath) -> Result<()> {
self.remove_remote_event_subscription(topic_path).await
}
async fn update_local_service_state_if_valid(
&self,
service_path: &TopicPath,
new_state: ServiceState,
current_state: ServiceState,
) -> Result<()> {
match (current_state, new_state) {
(ServiceState::Running, ServiceState::Paused) => {
self.update_local_service_state(service_path, new_state)
.await
}
(ServiceState::Paused, ServiceState::Running) => {
self.update_local_service_state(service_path, new_state)
.await
}
_ => {
Err(anyhow!(
"Invalid state transition from {:?} to {:?}",
current_state,
new_state
))
}
}
}
async fn validate_pause_transition(&self, service_path: &TopicPath) -> Result<()> {
let current_state = self.get_local_service_state(service_path).await;
match current_state {
Some(ServiceState::Running) => {
Ok(())
}
Some(state) => {
Err(anyhow!(
"Cannot pause service in {:?} state. Service must be in Running state.",
state
))
}
None => {
Err(anyhow!("Service not found: {}", service_path.as_str()))
}
}
}
async fn validate_resume_transition(&self, service_path: &TopicPath) -> Result<()> {
let current_state = self.get_local_service_state(service_path).await;
match current_state {
Some(ServiceState::Paused) => {
Ok(())
}
Some(state) => {
Err(anyhow!(
"Cannot resume service in {:?} state. Service must be in Paused state.",
state
))
}
None => {
Err(anyhow!("Service not found: {}", service_path.as_str()))
}
}
}
}