pub mod abstract_service;
pub mod event_context;
pub mod keys_service;
pub mod load_balancing;
pub mod registry_service;
pub mod remote_service;
pub mod request_context;
pub mod service_registry;
use crate::node::Node; use crate::routing::TopicPath;
use crate::services::service_registry::{EventHandler, RemoteEventHandler};
use anyhow::{anyhow, Result};
use runar_common::logging::{Component, Logger, LoggingContext};
use runar_macros_common::{log_debug, log_error, log_info, log_warn};
use runar_schemas::{ActionMetadata, FieldSchema};
use runar_serializer::arc_value::AsArcValue;
use runar_serializer::ArcValue;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::services::abstract_service::ServiceState;
use crate::services::remote_service::RemoteService;
use runar_schemas::ServiceMetadata;
pub use crate::services::event_context::EventContext;
pub use crate::services::request_context::RequestContext;
/// Shared, thread-safe callback that handles an action request.
///
/// The callback receives an optional `ArcValue` payload together with a
/// `RequestContext` and returns a boxed `ServiceFuture`.
pub type ActionHandler =
Arc<dyn Fn(Option<ArcValue>, RequestContext) -> ServiceFuture + Send + Sync>;
/// Shared, thread-safe callback used to register an action handler.
///
/// It is called with the action's `TopicPath`, the `ActionHandler` to install
/// and optional `ActionMetadata`, and returns a boxed future that resolves to
/// `Result<()>`.
pub type ActionRegistrar = Arc<
dyn Fn(
&TopicPath,
ActionHandler,
Option<ActionMetadata>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
+ Send
+ Sync,
>;
/// Context object whose methods forward requests, publishes, subscriptions
/// and action registrations to the owning `Node`.
///
/// Cloning is cheap for the logger and node handle because both are held
/// behind `Arc`.
#[derive(Clone)]
pub struct LifecycleContext {
/// Network id, copied from the `TopicPath` passed to `new`.
pub network_id: String,
/// Service path, copied from the `TopicPath` passed to `new`.
pub service_path: String,
/// Optional configuration value; `None` until `with_config` is called.
pub config: Option<ArcValue>,
/// Logger used by the logging helpers on this context.
pub logger: Arc<Logger>,
// Node that all request/publish/subscribe/register calls are forwarded to.
node_delegate: Arc<Node>,
}
impl LifecycleContext {
    /// Builds a context for the service identified by `topic_path`.
    ///
    /// `network_id` and `service_path` are taken from `topic_path`; `config`
    /// starts as `None` (see [`Self::with_config`]).
    pub fn new(topic_path: &TopicPath, node_delegate: Arc<Node>, logger: Arc<Logger>) -> Self {
        Self {
            network_id: topic_path.network_id(),
            service_path: topic_path.service_path(),
            config: None,
            logger,
            node_delegate,
        }
    }

    /// Returns the context with `config` set to `Some(config)`.
    pub fn with_config(mut self, config: ArcValue) -> Self {
        self.config = Some(config);
        self
    }

    /// Logs `message` at debug level through this context's logger.
    pub fn debug(&self, message: impl Into<String>) {
        log_debug!(self.logger, "{}", message.into());
    }

    /// Logs `message` at info level through this context's logger.
    pub fn info(&self, message: impl Into<String>) {
        log_info!(self.logger, "{}", message.into());
    }

    /// Logs `message` at warn level through this context's logger.
    pub fn warn(&self, message: impl Into<String>) {
        log_warn!(self.logger, "{}", message.into());
    }

    /// Logs `message` at error level through this context's logger.
    pub fn error(&self, message: impl Into<String>) {
        log_error!(self.logger, "{}", message.into());
    }

    /// Expands a possibly abbreviated topic into its fully qualified form.
    ///
    /// * A topic containing `:` is used verbatim.
    /// * A topic containing `/` (but no `:`) is prefixed with `"<network_id>:"`.
    /// * Any other topic becomes `"<network_id>:<service_path>/<topic>"`.
    ///
    /// Every topic-taking method on this context shares this single rule so
    /// they cannot drift apart.
    fn qualify_topic(&self, topic: &str) -> String {
        if topic.contains(':') {
            topic.to_string()
        } else if topic.contains('/') {
            format!("{0}:{1}", self.network_id, topic)
        } else {
            format!("{0}:{1}/{2}", self.network_id, self.service_path, topic)
        }
    }

    /// Qualifies `topic` (see `qualify_topic`) and forwards the call to
    /// `Node::remote_request`.
    ///
    /// # Errors
    /// Returns whatever error the node reports.
    pub async fn remote_request<P>(
        &self,
        topic: impl AsRef<str>,
        payload: Option<P>,
    ) -> Result<ArcValue>
    where
        P: AsArcValue + Send + Sync,
    {
        let full_topic = self.qualify_topic(topic.as_ref());
        self.logger
            .debug(format!("Making request to processed path: {full_topic}"));
        self.node_delegate
            .remote_request::<P>(&full_topic, payload)
            .await
    }

    /// Qualifies `topic` (see `qualify_topic`) and forwards the call to
    /// `Node::request`.
    ///
    /// # Errors
    /// Returns whatever error the node reports.
    pub async fn request<P>(&self, topic: impl AsRef<str>, payload: Option<P>) -> Result<ArcValue>
    where
        P: AsArcValue + Send + Sync,
    {
        let full_topic = self.qualify_topic(topic.as_ref());
        self.logger.debug(format!(
            "LifecycleContext making request to processed path: {full_topic}"
        ));
        self.node_delegate.request::<P>(&full_topic, payload).await
    }

    /// Qualifies `topic` (see `qualify_topic`) and publishes `data` to it via
    /// `Node::publish`.
    ///
    /// # Errors
    /// Returns whatever error the node reports.
    pub async fn publish(&self, topic: &str, data: Option<ArcValue>) -> Result<()> {
        let full_topic = self.qualify_topic(topic);
        self.logger.debug(format!(
            "LifecycleContext publishing to processed topic: {full_topic}"
        ));
        self.node_delegate.publish(&full_topic, data).await
    }

    /// Same as [`Self::publish`], but passes `options` through to
    /// `Node::publish_with_options`.
    ///
    /// # Errors
    /// Returns whatever error the node reports.
    pub async fn publish_with_options(
        &self,
        topic: impl AsRef<str>,
        data: Option<ArcValue>,
        options: PublishOptions,
    ) -> Result<()> {
        let full_topic = self.qualify_topic(topic.as_ref());
        self.logger.debug(format!(
            "LifecycleContext publishing (with options) to processed topic: {full_topic}"
        ));
        self.node_delegate
            .publish_with_options(&full_topic, data, options)
            .await
    }

    /// Qualifies `topic` (see `qualify_topic`), calls `Node::on` and awaits
    /// the handle it returns.
    ///
    /// # Errors
    /// A failure of the awaited handle is converted into an `anyhow` error;
    /// otherwise the handle's own result is returned unchanged.
    pub async fn on(
        &self,
        topic: impl AsRef<str>,
        options: Option<OnOptions>,
    ) -> Result<Option<ArcValue>> {
        let full_topic = self.qualify_topic(topic.as_ref());
        let handle = self.node_delegate.on(&full_topic, options);
        // The outer `?` unwraps the handle's failure; the inner result is the
        // function's return value.
        handle.await.map_err(|e| anyhow!(e))?
    }

    /// Registers `handler` for `"<service_path>/<action_name>"` on this
    /// context's network, with auto-generated metadata (a default description
    /// and no input/output schema).
    ///
    /// # Errors
    /// Fails with `"Invalid action path: …"` if the path is rejected by
    /// `TopicPath::new`, or with whatever error the node reports.
    pub async fn register_action(
        &self,
        action_name: impl Into<String>,
        handler: ActionHandler,
    ) -> Result<()> {
        let action_name_string = action_name.into();
        let action_path = format!(
            "{service_path}/{action_name}",
            service_path = self.service_path,
            action_name = action_name_string
        );
        self.logger.debug(format!(
            "register_action name={}, service_path={}, action_path={}",
            action_name_string, self.service_path, action_path
        ));

        let topic_path: TopicPath = TopicPath::new(&action_path, &self.network_id)
            .map_err(|e| anyhow!("Invalid action path: {e}"))?;
        self.logger
            .debug(format!("register_action: created TopicPath {topic_path}"));

        let metadata = ActionMetadata {
            name: action_name_string.clone(),
            description: format!(
                "Action {} for service {}",
                action_name_string, self.service_path
            ),
            input_schema: None,
            output_schema: None,
        };
        self.node_delegate
            .register_action_handler(topic_path, handler, Some(metadata))
            .await
    }

    /// Registers `handler` for `"<service_path>/<action_name>"` using the
    /// description and schemas supplied in `options`.
    ///
    /// A missing description becomes the empty string.
    ///
    /// # Errors
    /// Fails with `"Invalid action path: …"` if the path is rejected by
    /// `TopicPath::new`, or with whatever error the node reports.
    pub async fn register_action_with_options(
        &self,
        action_name: impl Into<String>,
        handler: ActionHandler,
        options: ActionRegistrationOptions,
    ) -> Result<()> {
        let action_name_string = action_name.into();
        let metadata = ActionMetadata {
            name: action_name_string.clone(),
            description: options.description.unwrap_or_default(),
            input_schema: options.input_schema,
            output_schema: options.output_schema,
        };
        let action_path = format!(
            "{service_path}/{action_name}",
            service_path = self.service_path,
            action_name = action_name_string
        );
        let topic_path = TopicPath::new(&action_path, &self.network_id)
            .map_err(|e| anyhow!("Invalid action path: {e}"))?;
        self.node_delegate
            .register_action_handler(topic_path, handler, Some(metadata))
            .await
    }

    /// Qualifies `topic` (see `qualify_topic`) and subscribes `callback` to
    /// it via `Node::subscribe`, returning the `String` the node produces.
    ///
    /// # Errors
    /// Returns whatever error the node reports.
    pub async fn subscribe(
        &self,
        topic: &str,
        callback: EventHandler,
        options: Option<EventRegistrationOptions>,
    ) -> Result<String> {
        let full_topic = self.qualify_topic(topic);
        self.node_delegate
            .subscribe(&full_topic, callback, options)
            .await
    }

    /// Forwards `subscription_id` to `Node::unsubscribe`.
    ///
    /// # Errors
    /// Returns whatever error the node reports.
    pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
        self.node_delegate.unsubscribe(subscription_id).await
    }
}
/// Lets a `LifecycleContext` be used wherever a `LoggingContext` is expected.
impl LoggingContext for LifecycleContext {
    /// Always reports `Component::Service`.
    fn component(&self) -> Component {
        Component::Service
    }

    /// Returns this context's service path; never `None`.
    fn service_path(&self) -> Option<&str> {
        Some(self.service_path.as_str())
    }

    /// Borrows the logger held by this context.
    fn logger(&self) -> &Logger {
        self.logger.as_ref()
    }
}
/// A request addressed to a service action or event, bundled with its
/// payload and request context.
pub struct ServiceRequest {
/// Fully resolved path the request is addressed to.
pub topic_path: TopicPath,
/// Request payload.
pub data: ArcValue,
/// Shared context the request is executed with.
pub context: Arc<RequestContext>,
}
impl ServiceRequest {
    /// Builds a request addressed to `"<service_path>/<action_or_event>"` on
    /// the network reported by `context`.
    ///
    /// # Panics
    /// Panics with `"Invalid path format"` if `TopicPath::new` rejects the
    /// combined path.
    pub fn new(
        service_path: impl AsRef<str>,
        action_or_event: impl AsRef<str>,
        data: ArcValue,
        context: Arc<RequestContext>,
    ) -> Self {
        let joined = format!("{}/{}", service_path.as_ref(), action_or_event.as_ref());
        let topic_path =
            TopicPath::new(&joined, &context.network_id()).expect("Invalid path format");
        Self {
            topic_path,
            data,
            context,
        }
    }

    /// Builds a request from an already constructed `TopicPath`.
    pub fn new_with_topic_path(
        topic_path: TopicPath,
        data: ArcValue,
        context: Arc<RequestContext>,
    ) -> Self {
        Self {
            topic_path,
            data,
            context,
        }
    }

    /// Like [`Self::new`], but a missing payload is replaced by
    /// `ArcValue::null()`.
    ///
    /// # Panics
    /// Panics with `"Invalid path format"` under the same condition as
    /// [`Self::new`].
    pub fn new_with_optional(
        service_path: impl AsRef<str>,
        action_or_event: impl AsRef<str>,
        data: Option<ArcValue>,
        context: Arc<RequestContext>,
    ) -> Self {
        let payload = data.unwrap_or_else(ArcValue::null);
        Self::new(service_path, action_or_event, payload, context)
    }

    /// Returns the service path component of the request's topic path.
    pub fn path(&self) -> String {
        self.topic_path.service_path()
    }

    /// Returns the text after the last `/` of the topic's action path, or the
    /// whole action path when it contains no `/`.
    pub fn action_or_event(&self) -> String {
        let action_path = self.topic_path.action_path();
        match action_path.rsplit_once('/') {
            Some((_, last_segment)) => last_segment.to_string(),
            None => action_path,
        }
    }
}
/// Options accepted by `publish_with_options`.
///
/// The derived `Default` has both flags `false` and both optional fields
/// `None`.
#[derive(Debug, Clone, Default)]
pub struct PublishOptions {
    /// NOTE(review): presumably asks for delivery beyond the local node —
    /// confirm against the node's publish implementation.
    pub broadcast: bool,
    /// NOTE(review): delivery-guarantee flag; exact semantics are defined by
    /// the consumer of these options — confirm.
    pub guaranteed_delivery: bool,
    /// Optional retention duration; `None` unless set, e.g. through
    /// [`PublishOptions::with_retain_for`].
    pub retain_for: Option<std::time::Duration>,
    /// Optional target identifier; how it is interpreted is not visible here.
    pub target: Option<String>,
}

impl PublishOptions {
    /// Returns options with `broadcast` and `guaranteed_delivery` off and no
    /// retention or target — the same value the derived `Default` produces.
    pub fn local_only() -> Self {
        Self::default()
    }

    /// Returns the options with `retain_for` set to `Some(duration)`; all
    /// other fields are carried over unchanged.
    pub fn with_retain_for(self, duration: std::time::Duration) -> Self {
        Self {
            retain_for: Some(duration),
            ..self
        }
    }
}
/// Options that can be supplied when subscribing to an event.
///
/// The derived `Default` leaves `include_past` as `None`.
#[derive(Clone, Debug, Default)]
pub struct EventRegistrationOptions {
/// NOTE(review): presumably a look-back window for delivering events that
/// were published before the subscription — confirm against the subscriber
/// implementation.
pub include_past: Option<std::time::Duration>,
}
/// Options passed to the `on` calls in this module.
///
/// There is no `Default` implementation, so `timeout` must always be given.
#[derive(Clone, Debug)]
pub struct OnOptions {
/// NOTE(review): presumably the maximum time to wait for an event —
/// confirm against `Node::on`.
pub timeout: std::time::Duration,
/// NOTE(review): presumably a look-back window for already published
/// events — confirm against `Node::on`.
pub include_past: Option<std::time::Duration>,
}
/// Optional metadata supplied to `register_action_with_options`.
///
/// Every field is optional, so `ActionRegistrationOptions::default()` (all
/// `None`) is a valid value and can be combined with struct-update syntax,
/// matching the other option structs in this module.
#[derive(Default)]
pub struct ActionRegistrationOptions {
    /// Human-readable description; a missing description is stored as the
    /// empty string by `register_action_with_options`.
    pub description: Option<String>,
    /// Optional schema for the action's input.
    pub input_schema: Option<FieldSchema>,
    /// Optional schema for the action's output.
    pub output_schema: Option<FieldSchema>,
}
/// Asynchronous request/publish/subscribe interface taking owned `String`
/// paths and topics.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait NodeRequestHandler: Send + Sync {
/// Sends a request to `path` with optional `params` and resolves to the
/// response value.
async fn request(&self, path: String, params: Option<ArcValue>) -> Result<ArcValue>;
/// Publishes optional `data` to `topic`.
async fn publish(&self, topic: String, data: Option<ArcValue>) -> Result<()>;
/// Subscribes `callback` to `topic` and resolves to a `String`
/// (presumably the subscription id — confirm with implementors).
///
/// The callback receives the event context and the event value and returns
/// a boxed future resolving to `Result<()>`.
async fn subscribe(
&self,
topic: String,
callback: Box<
dyn Fn(Arc<EventContext>, ArcValue) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
+ Send
+ Sync,
>,
options: Option<EventRegistrationOptions>,
) -> Result<String>;
/// Removes a subscription on `topic`.
///
/// NOTE(review): the meaning of `subscription_id == None` is defined by
/// the implementor — confirm.
async fn unsubscribe(&self, topic: String, subscription_id: Option<&str>) -> Result<()>;
}
/// Level-based logging helpers for context handles.
///
/// Each method accepts anything convertible into a `String`.
pub trait ArcContextLogging {
/// Logs `message` at debug level.
fn debug(&self, message: impl Into<String>);
/// Logs `message` at info level.
fn info(&self, message: impl Into<String>);
/// Logs `message` at warn level.
fn warn(&self, message: impl Into<String>);
/// Logs `message` at error level.
fn error(&self, message: impl Into<String>);
}
/// Forwards each logging call to the `logger` of the wrapped
/// `RequestContext`.
impl ArcContextLogging for Arc<RequestContext> {
/// Delegates to the context logger's `debug`.
fn debug(&self, message: impl Into<String>) {
self.logger.debug(message);
}
/// Delegates to the context logger's `info`.
fn info(&self, message: impl Into<String>) {
self.logger.info(message);
}
/// Delegates to the context logger's `warn`.
fn warn(&self, message: impl Into<String>) {
self.logger.warn(message);
}
/// Delegates to the context logger's `error`.
fn error(&self, message: impl Into<String>) {
self.logger.error(message);
}
}
/// Boxed, pinned, `Send` future resolving to `Result<ArcValue>`; the return
/// type of an `ActionHandler`.
pub type ServiceFuture = Pin<Box<dyn Future<Output = Result<ArcValue>> + Send>>;
/// Asynchronous interface for publishing an event.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait EventDispatcher: Send + Sync {
/// Publishes optional `data` for `event` on `topic` within `network_id`.
///
/// NOTE(review): how `topic` and `event` are combined is up to the
/// implementor — confirm.
async fn publish(
&self,
topic: &str,
event: &str,
data: Option<ArcValue>,
network_id: &str,
) -> Result<()>;
}
/// Asynchronous node-facing interface covering requests, events and action
/// registration.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait NodeDelegate: Send + Sync {
/// Sends a request to `path` with an optional payload convertible to
/// `ArcValue` and resolves to the response value.
async fn request<P>(&self, path: &str, payload: Option<P>) -> Result<ArcValue>
where
P: AsArcValue + Send + Sync;
/// Publishes optional `data` to `topic`.
async fn publish(&self, topic: &str, data: Option<ArcValue>) -> Result<()>;
/// Subscribes `callback` to `topic` and resolves to a `String`
/// (presumably the subscription id accepted by `unsubscribe` — confirm).
async fn subscribe(
&self,
topic: &str,
callback: EventHandler,
options: Option<EventRegistrationOptions>, // None means default behavior
) -> Result<String>;
/// Cancels the subscription identified by `subscription_id`.
async fn unsubscribe(&self, subscription_id: &str) -> Result<()>;
/// Registers `handler` for the action at `topic_path`, with optional
/// metadata.
async fn register_action_handler(
&self,
topic_path: TopicPath,
handler: ActionHandler,
metadata: Option<ActionMetadata>,
) -> Result<()>;
/// Resolves to an optional value for `topic`.
///
/// NOTE(review): presumably waits for the next event on `topic` subject to
/// `options` — confirm against the implementation.
async fn on(&self, topic: &str, options: Option<OnOptions>) -> Result<Option<ArcValue>>;
}
/// Asynchronous interface for key management operations.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait KeysDelegate: Send + Sync {
/// Resolves to an `ArcValue` for the symmetric key named `key_name`.
///
/// NOTE(review): the name suggests the key is created when missing —
/// confirm against the implementation.
async fn ensure_symmetric_key(&self, key_name: &str) -> Result<ArcValue>;
}
/// Asynchronous interface to the service registry: state and metadata
/// lookups, remote handler management and state transitions.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait RegistryDelegate: Send + Sync {
/// Looks up the state of the local service at `service_path`, if any.
async fn get_local_service_state(&self, service_path: &TopicPath) -> Option<ServiceState>;
/// Looks up the state of the remote service at `service_path`, if any.
async fn get_remote_service_state(&self, service_path: &TopicPath) -> Option<ServiceState>;
/// Looks up the metadata of the service at `service_path`, if any.
async fn get_service_metadata(&self, service_path: &TopicPath) -> Option<ServiceMetadata>;
/// Resolves to a map of service metadata keyed by `String`
/// (presumably the service path — confirm). `include_internal_services`
/// selects whether internal services are part of the result.
async fn get_all_service_metadata(
&self,
include_internal_services: bool,
) -> Result<HashMap<String, ServiceMetadata>>;
/// Lists the action metadata for the service at `service_topic_path`.
async fn get_actions_metadata(&self, service_topic_path: &TopicPath) -> Vec<ActionMetadata>;
/// Registers `handler` as the remote action handler for `topic_path`.
async fn register_remote_action_handler(
&self,
topic_path: &TopicPath,
handler: ActionHandler,
) -> Result<()>;
/// Removes the remote action handler registered for `topic_path`.
async fn remove_remote_action_handler(&self, topic_path: &TopicPath) -> Result<()>;
/// Registers `handler` as a remote event handler for `topic_path` and
/// resolves to a `String` (presumably a subscription id — confirm).
async fn register_remote_event_handler(
&self,
topic_path: &TopicPath,
handler: RemoteEventHandler,
) -> Result<String>;
/// Removes the remote event handler registered for `topic_path`.
async fn remove_remote_event_handler(&self, topic_path: &TopicPath) -> Result<()>;
/// Moves the local service at `service_path` to `new_state`.
///
/// NOTE(review): presumably the change is applied only when the
/// transition from `current_state` is valid — confirm.
async fn update_local_service_state_if_valid(
&self,
service_path: &TopicPath,
new_state: ServiceState,
current_state: ServiceState,
) -> Result<()>;
/// Checks whether the service at `service_path` may be paused.
async fn validate_pause_transition(&self, service_path: &TopicPath) -> Result<()>;
/// Checks whether the service at `service_path` may be resumed.
async fn validate_resume_transition(&self, service_path: &TopicPath) -> Result<()>;
}
/// Context object that can manage remote action handlers through an optional
/// `RegistryDelegate`.
pub struct RemoteLifecycleContext {
/// Network id, copied from the `TopicPath` passed to `new`.
pub network_id: String,
/// Service path, copied from the `TopicPath` passed to `new`.
pub service_path: String,
/// Optional configuration value; `None` until `with_config` is called.
pub config: Option<ArcValue>,
/// Logger used by the logging helpers on this context.
pub logger: Arc<Logger>,
// Registry used for handler management; `None` until
// `with_registry_delegate` is called.
registry_delegate: Option<Arc<dyn RegistryDelegate + Send + Sync>>,
}
impl RemoteLifecycleContext {
    /// Builds a context for the service identified by `topic_path`, with no
    /// configuration and no registry delegate attached.
    pub fn new(topic_path: &TopicPath, logger: Arc<Logger>) -> Self {
        let network_id = topic_path.network_id().to_string();
        let service_path = topic_path.service_path();
        Self {
            network_id,
            service_path,
            config: None,
            logger,
            registry_delegate: None,
        }
    }

    /// Returns the context with `config` set to `Some(config)`.
    pub fn with_config(self, config: ArcValue) -> Self {
        Self {
            config: Some(config),
            ..self
        }
    }

    /// Returns the context with `delegate` installed as its registry.
    pub fn with_registry_delegate(
        self,
        delegate: Arc<dyn RegistryDelegate + Send + Sync>,
    ) -> Self {
        Self {
            registry_delegate: Some(delegate),
            ..self
        }
    }

    /// Logs `message` at debug level through this context's logger.
    pub fn debug(&self, message: impl Into<String>) {
        self.logger.debug(message);
    }

    /// Logs `message` at info level through this context's logger.
    pub fn info(&self, message: impl Into<String>) {
        self.logger.info(message);
    }

    /// Logs `message` at warn level through this context's logger.
    pub fn warn(&self, message: impl Into<String>) {
        self.logger.warn(message);
    }

    /// Logs `message` at error level through this context's logger.
    pub fn error(&self, message: impl Into<String>) {
        self.logger.error(message);
    }

    /// Borrows the registry delegate, or fails with
    /// `"No registry delegate available"` when none has been installed.
    fn registry(&self) -> Result<&Arc<dyn RegistryDelegate + Send + Sync>> {
        self.registry_delegate
            .as_ref()
            .ok_or_else(|| anyhow!("No registry delegate available"))
    }

    /// Removes the remote action handler registered for `topic_path` via the
    /// registry delegate.
    ///
    /// # Errors
    /// Fails when no registry delegate is installed, or with whatever error
    /// the delegate reports.
    pub async fn remove_remote_action_handler(&self, topic_path: &TopicPath) -> Result<()> {
        let registry = self.registry()?;
        registry.remove_remote_action_handler(topic_path).await
    }

    /// Registers `handler` as the remote action handler for `topic_path` via
    /// the registry delegate.
    ///
    /// # Errors
    /// Fails when no registry delegate is installed, or with whatever error
    /// the delegate reports.
    pub async fn register_remote_action_handler(
        &self,
        topic_path: &TopicPath,
        handler: ActionHandler,
    ) -> Result<()> {
        let registry = self.registry()?;
        registry
            .register_remote_action_handler(topic_path, handler)
            .await
    }
}
/// Boxed, thread-safe callback taking an optional `ArcValue` payload and a
/// shared `RequestContext`.
///
/// Unlike `ActionHandler`, the returned future resolves to
/// `Option<ArcValue>` rather than a `Result`.
pub type ServiceHandler = Box<
dyn Fn(
Option<ArcValue>,
Arc<RequestContext>,
) -> Pin<Box<dyn Future<Output = Option<ArcValue>> + Send>>
+ Send
+ Sync,
>;