use crate::node::Node; use crate::routing::TopicPath;
use crate::services::service_registry::EventHandler;
use crate::services::NodeDelegate;
use crate::services::{EventRegistrationOptions, PublishOptions};
use anyhow::Result;
use runar_common::logging::{Component, Logger, LoggingContext};
use runar_macros_common::{log_debug, log_error, log_info, log_warn};
use runar_serializer::arc_value::AsArcValue;
use runar_serializer::ArcValue;
use std::{collections::HashMap, fmt, sync::Arc};
pub struct RequestContext {
pub topic_path: TopicPath,
pub metadata: Option<ArcValue>,
pub logger: Arc<Logger>,
pub path_params: HashMap<String, String>,
pub user_profile_public_key: Vec<u8>,
pub(crate) node_delegate: Arc<Node>,
}
impl fmt::Debug for RequestContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RequestContext")
.field("network_id", &self.network_id())
.field("service_path", &self.service_path())
.field("topic_path", &self.topic_path)
.field("metadata", &self.metadata)
.field("logger", &"<Logger>") .field("path_params", &self.path_params)
.finish()
}
}
impl Clone for RequestContext {
fn clone(&self) -> Self {
Self {
topic_path: self.topic_path.clone(),
metadata: self.metadata.clone(),
logger: self.logger.clone(),
path_params: self.path_params.clone(),
node_delegate: self.node_delegate.clone(),
user_profile_public_key: self.user_profile_public_key.clone(),
}
}
}
impl Default for RequestContext {
fn default() -> Self {
panic!("RequestContext should not be created with default. Use new instead");
}
}
impl RequestContext {
pub fn new(topic_path: &TopicPath, node_delegate: Arc<Node>, logger: Arc<Logger>) -> Self {
let action_path = topic_path.action_path();
let action_logger = if !action_path.is_empty() {
Arc::new(logger.with_action_path(action_path))
} else {
logger
};
Self {
topic_path: topic_path.clone(),
metadata: None,
logger: action_logger,
node_delegate,
path_params: HashMap::new(),
user_profile_public_key: vec![],
}
}
pub fn with_metadata(mut self, metadata: ArcValue) -> Self {
self.metadata = Some(metadata);
self
}
pub fn with_user_profile_public_key(mut self, user_profile_public_key: Vec<u8>) -> Self {
self.user_profile_public_key = user_profile_public_key;
self
}
pub fn network_id(&self) -> String {
self.topic_path.network_id()
}
pub fn service_path(&self) -> String {
self.topic_path.service_path()
}
pub fn debug(&self, message: impl Into<String>) {
log_debug!(self.logger, "{}", message.into());
}
pub fn info(&self, message: impl Into<String>) {
log_info!(self.logger, "{}", message.into());
}
pub fn warn(&self, message: impl Into<String>) {
log_warn!(self.logger, "{}", message.into());
}
pub fn error(&self, message: impl Into<String>) {
log_error!(self.logger, "{}", message.into());
}
pub async fn publish(&self, topic: &str, data: Option<ArcValue>) -> Result<()> {
let topic_string = topic.to_string();
let full_topic = if topic_string.contains(':') {
topic_string
} else if topic_string.contains('/') {
let first_seg = topic_string.split('/').next().unwrap_or("");
if first_seg == self.topic_path.service_path() {
format!(
"{network_id}:{topic}",
network_id = self.topic_path.network_id(),
topic = topic_string,
)
} else {
format!(
"{network_id}:{service}/{topic}",
network_id = self.topic_path.network_id(),
service = self.topic_path.service_path(),
topic = topic_string,
)
}
} else {
format!(
"{}:{}/{}",
self.topic_path.network_id(),
self.topic_path.service_path(),
topic_string
)
};
log_debug!(self.logger, "Publishing to processed topic: {full_topic}");
self.node_delegate.publish(&full_topic, data).await
}
pub async fn publish_with_options(
&self,
topic: &str,
data: Option<ArcValue>,
options: PublishOptions,
) -> Result<()> {
let topic_string = topic.to_string();
let full_topic = if topic_string.contains(':') {
topic_string
} else if topic_string.contains('/') {
let first_seg = topic_string.split('/').next().unwrap_or("");
if first_seg == self.topic_path.service_path() {
format!(
"{network_id}:{topic}",
network_id = self.topic_path.network_id(),
topic = topic_string,
)
} else {
format!(
"{network_id}:{service}/{topic}",
network_id = self.topic_path.network_id(),
service = self.topic_path.service_path(),
topic = topic_string,
)
}
} else {
format!(
"{}:{}/{}",
self.topic_path.network_id(),
self.topic_path.service_path(),
topic_string
)
};
log_debug!(self.logger, "Publishing (with options) to: {full_topic}");
self.node_delegate
.publish_with_options(&full_topic, data, options)
.await
}
pub async fn remote_request<P>(
&self,
path: impl AsRef<str>,
payload: Option<P>,
) -> Result<ArcValue>
where
P: AsArcValue + Send + Sync,
{
let path_string = path.as_ref();
let full_path = if path_string.contains(':') {
path_string.to_string()
} else if path_string.contains('/') {
format!(
"{network_id}:{path_string}",
network_id = self.topic_path.network_id()
)
} else {
format!(
"{}:{}/{}",
self.topic_path.network_id(),
self.topic_path.service_path(),
path_string
)
};
self.logger
.debug(format!("Making request to processed path: {full_path}"));
self.node_delegate
.remote_request::<P>(&full_path, payload)
.await
}
pub async fn request<P>(&self, path: &str, payload: Option<P>) -> Result<ArcValue>
where
P: AsArcValue + Send + Sync,
{
let path_string = path;
let full_path = if path_string.contains(':') {
path_string.to_string()
} else if path_string.contains('/') {
format!(
"{network_id}:{path_string}",
network_id = self.topic_path.network_id()
)
} else {
format!(
"{}:{}/{}",
self.topic_path.network_id(),
self.topic_path.service_path(),
path_string
)
};
log_debug!(self.logger, "Making request to processed path: {full_path}");
self.node_delegate.request::<P>(&full_path, payload).await
}
pub async fn on(
&self,
topic: impl Into<String>,
options: Option<crate::services::OnOptions>,
) -> Result<Option<ArcValue>> {
let handle = self.node_delegate.on(topic, options);
handle.await.map_err(|e| anyhow::anyhow!(e))?
}
pub async fn subscribe(
&self,
topic: impl Into<String>,
callback: EventHandler,
options: Option<EventRegistrationOptions>,
) -> Result<String> {
let topic_string = topic.into();
let full_topic = if topic_string.contains(':') {
topic_string
} else if topic_string.contains('/') {
format!(
"{network_id}:{topic}",
network_id = self.topic_path.network_id(),
topic = topic_string
)
} else {
format!(
"{}:{}/{}",
self.topic_path.network_id(),
self.topic_path.service_path(),
topic_string
)
};
self.node_delegate
.subscribe(&full_topic, callback, options)
.await
}
}
impl LoggingContext for RequestContext {
fn component(&self) -> Component {
Component::Service
}
fn service_path(&self) -> Option<&str> {
let path = self.topic_path.service_path();
Some(Box::leak(path.into_boxed_str()))
}
fn action_path(&self) -> Option<&str> {
let path = self.topic_path.action_path();
Some(Box::leak(path.into_boxed_str()))
}
fn logger(&self) -> &Logger {
&self.logger
}
}