use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, RwLock};
use tokio::time::interval;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
/// Central registry and lifecycle manager for message bridges between
/// external systems (Kafka, RabbitMQ, HTTP, files, ...).
///
/// All shared state is behind `Arc<RwLock<...>>` so the manager can be cloned
/// into spawned per-bridge processing tasks.
pub struct MessageBridgeManager {
    // Bridge id -> live bridge state (status, stats, endpoints).
    bridges: Arc<RwLock<HashMap<String, MessageBridge>>>,
    // Bridge id -> per-bridge runtime tuning (queue size, batch size, ...).
    configs: Arc<RwLock<HashMap<String, BridgeConfig>>>,
    // Transformer name -> payload transformer; looked up by `MessageBridge::transformer`.
    transformers: Arc<RwLock<HashMap<String, Box<dyn MessageTransformer + Send + Sync>>>>,
    // Shared routing-rule evaluator used by all bridges.
    router: Arc<RoutingEngine>,
    // Manager-wide aggregate counters.
    stats: Arc<RwLock<BridgeStats>>,
    // Broadcast channel for lifecycle/processing notifications; subscribers
    // attach via `subscribe()`.
    event_notifier: broadcast::Sender<BridgeNotification>,
}
/// Internal state of a single bridge instance. Cloned into the spawned
/// processing task, so endpoint/rule changes after start are not picked up by
/// a running task.
#[derive(Clone)]
struct MessageBridge {
    // UUID assigned at creation time.
    id: String,
    bridge_type: BridgeType,
    // System messages are received from.
    source: ExternalSystemConfig,
    // System messages are delivered to.
    target: ExternalSystemConfig,
    // Name of the default transformer (key into the manager's registry).
    transformer: String,
    // Per-bridge routing rules evaluated before the default transformer runs.
    routing_rules: Vec<RoutingRule>,
    status: BridgeStatus,
    // Per-bridge counters, updated by the processing task.
    stats: BridgeStatistics,
    created_at: Instant,
    // Set each time a message is processed (success or failure).
    last_activity: Option<Instant>,
}
/// Topology of a bridge. NOTE(review): only `SourceToTarget`-style forwarding
/// is implemented by the processing loop in this module; the other variants
/// are declared but not given distinct behavior here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BridgeType {
    /// Messages flow both ways between source and target.
    Bidirectional,
    /// One-way: source -> target.
    SourceToTarget,
    /// One-way: target -> source.
    TargetToSource,
    /// One source fanned out to many targets.
    Fanout,
    /// Many sources funneled into one target.
    Fanin,
}
/// Complete description of one external endpoint: what it is, how to connect,
/// how its messages are encoded, and how access is secured.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalSystemConfig {
    pub system_type: ExternalSystemType,
    pub connection: ConnectionConfig,
    pub format: FormatConfig,
    pub security: SecurityConfig,
}
/// Supported external system kinds with their connection parameters.
/// NOTE(review): the receive/send paths in this module only handle Kafka,
/// RabbitMQ, Redis pub/sub, HTTP REST, and FileSystem (and those are stubs);
/// the remaining variants currently fall through to a "not implemented" log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExternalSystemType {
    Kafka {
        brokers: Vec<String>,
        topics: Vec<String>,
        /// Consumer-group id for receiving; `None` presumably means an
        /// auto-assigned group — TODO confirm once the Kafka path is implemented.
        consumer_group: Option<String>,
    },
    RabbitMQ {
        url: String,
        exchange: String,
        routing_key: String,
        queue: Option<String>,
    },
    AmazonSQS {
        region: String,
        queue_url: String,
        credentials: AwsCredentials,
    },
    AzureServiceBus {
        connection_string: String,
        queue_name: String,
    },
    GooglePubSub {
        project_id: String,
        topic: String,
        subscription: Option<String>,
    },
    Pulsar {
        service_url: String,
        topics: Vec<String>,
        subscription: Option<String>,
    },
    RedisPubSub { url: String, channels: Vec<String> },
    HttpRest {
        base_url: String,
        /// Logical endpoint name -> path, joined with `base_url`.
        endpoints: HashMap<String, String>,
        /// Extra headers sent with every request.
        headers: HashMap<String, String>,
    },
    WebSocket { url: String, protocols: Vec<String> },
    FileSystem {
        directory: String,
        /// Filename glob/pattern to match within `directory`.
        pattern: String,
        /// When true, watch the directory for new files instead of one-shot reads.
        watch_mode: bool,
    },
    Mqtt {
        broker_url: String,
        client_id: String,
        topic_subscriptions: Vec<String>,
        /// MQTT quality-of-service level (0, 1, or 2) — not validated here.
        qos: u8,
        username: Option<String>,
        password: Option<String>,
    },
    OpcUa {
        endpoint_url: String,
        security_policy: String,
        user_identity: String,
        node_subscriptions: Vec<String>,
    },
    SparkplugB {
        broker_url: String,
        group_id: String,
        edge_node_id: String,
        device_ids: Vec<String>,
    },
}
/// Static AWS credentials for SQS access.
/// NOTE(review): `Debug` and `Serialize` derives will expose
/// `secret_access_key` in logs/serialized output — consider redacting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsCredentials {
    pub access_key_id: String,
    pub secret_access_key: String,
    /// Present for temporary (STS) credentials.
    pub session_token: Option<String>,
}
/// Transport-level connection settings shared by all system types.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionConfig {
    /// Connect/request timeout.
    pub timeout: Duration,
    /// Keep-alive interval for long-lived connections.
    pub keep_alive: Duration,
    pub retry: RetryConfig,
    /// `None` disables TLS entirely.
    pub tls: Option<TlsConfig>,
}
/// TLS settings; certificate paths are filesystem paths in PEM/DER form —
/// format is not enforced here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsConfig {
    pub enabled: bool,
    /// When false, server certificates are not verified (insecure; test only).
    pub verify_certificate: bool,
    /// Client certificate for mutual TLS.
    pub certificate_path: Option<String>,
    pub private_key_path: Option<String>,
    /// Custom CA bundle; `None` presumably falls back to system roots — TODO confirm.
    pub ca_certificate_path: Option<String>,
}
/// Retry policy for failed connection attempts / requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    pub max_attempts: u32,
    /// Delay before the first retry.
    pub initial_delay: Duration,
    /// Upper bound on the delay when backing off.
    pub max_delay: Duration,
    /// When true, the delay grows exponentially up to `max_delay`.
    pub exponential_backoff: bool,
}
/// Wire-format description for messages on one endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FormatConfig {
    pub format: MessageFormat,
    /// Character encoding name, e.g. "utf-8".
    pub encoding: String,
    /// Optional payload compression; `None` means uncompressed.
    pub compression: Option<CompressionType>,
    /// Whether payloads should be validated against the format's schema.
    pub schema_validation: bool,
}
/// Serialization format of a message payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessageFormat {
    Json,
    /// Avro with its schema carried inline as a string.
    Avro { schema: String },
    /// Protobuf with its schema carried inline as a string.
    Protobuf { schema: String },
    Xml,
    Text,
    Binary,
    /// Format handled entirely by a named registered transformer.
    Custom { transformer: String },
}
/// Supported payload compression codecs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionType {
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
/// Security settings for one endpoint: how we authenticate, whether payloads
/// are encrypted, and who may read/write/administer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    pub auth: AuthenticationMethod,
    pub encryption: EncryptionConfig,
    pub access_control: AccessControlConfig,
}
/// Authentication mechanisms for external systems.
/// NOTE(review): `Debug`/`Serialize` derives will expose passwords, tokens,
/// and secrets in logs or serialized output — consider redacting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuthenticationMethod {
    /// No authentication.
    None,
    BasicAuth {
        username: String,
        password: String,
    },
    BearerToken {
        token: String,
    },
    ApiKey {
        key: String,
        /// Header name the key is sent under.
        header: String,
    },
    OAuth2 {
        client_id: String,
        client_secret: String,
        /// Token endpoint used to obtain access tokens.
        token_url: String,
    },
    SaslPlain {
        username: String,
        password: String,
    },
    SaslScramSha256 {
        username: String,
        password: String,
    },
    /// Client-certificate (mutual TLS) authentication.
    Certificate {
        cert_path: String,
        key_path: String,
    },
}
/// Payload-level encryption settings (distinct from transport TLS).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptionConfig {
    pub enabled: bool,
    /// Cipher/algorithm identifier; semantics not defined in this module.
    pub algorithm: Option<String>,
    /// Key identifier, presumably resolved by an external key store — TODO confirm.
    pub key_id: Option<String>,
}
/// Permission lists for an endpoint. The entries are opaque principal/role
/// identifiers; nothing in this module enforces them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccessControlConfig {
    pub read_permissions: Vec<String>,
    pub write_permissions: Vec<String>,
    pub admin_permissions: Vec<String>,
}
/// One routing rule: when `condition` matches a message, `action` is applied.
/// Rules are evaluated in ascending `priority` order (lower value first) and
/// the first matching enabled rule wins.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingRule {
    pub name: String,
    pub condition: RuleCondition,
    pub action: RuleAction,
    /// Lower value = evaluated earlier.
    pub priority: u32,
    /// Disabled rules are skipped during evaluation.
    pub enabled: bool,
}
/// Predicate over an [`ExternalMessage`]. Header-based variants look up the
/// relevant key in `headers` first, then fall back to `metadata`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RuleCondition {
    /// Matches every message.
    Always,
    /// Matches when the message's "event_type" is in `types`.
    EventType { types: Vec<String> },
    /// Matches when the message's "graph" value contains any of `patterns`
    /// as a substring.
    Graph { patterns: Vec<String> },
    /// Matches when the message's "subject" value matches `regex`.
    SubjectPattern { regex: String },
    /// Matches when the message's "predicate" is in `predicates`.
    Predicate { predicates: Vec<String> },
    /// Free-form expression — evaluation is not implemented and always
    /// yields false.
    Expression { expr: String },
    /// Logical combination of nested conditions.
    Composite {
        operator: LogicalOperator,
        conditions: Vec<RuleCondition>,
    },
}
/// Combinator for `RuleCondition::Composite`. `Not` requires exactly one
/// nested condition (enforced at evaluation time).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogicalOperator {
    And,
    Or,
    Not,
}
/// What to do with a message whose condition matched.
/// NOTE(review): only `Forward`, `Drop`, and `Transform` are implemented in
/// `process_message`; `Route` and `Duplicate` fall through to a warning.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RuleAction {
    /// Send to the bridge's target via the bridge's default transformer.
    Forward,
    /// Discard the message silently (debug-logged only).
    Drop,
    /// Transform with the named transformer, then send to the target.
    Transform { transformer: String },
    Route { target: String },
    Duplicate { targets: Vec<String> },
}
/// Lifecycle state of a bridge. `Paused` and `Failed` are never assigned by
/// this module (hence the `dead_code` allows) but are part of the intended
/// state machine.
#[derive(Debug, Clone, PartialEq)]
enum BridgeStatus {
    /// The processing task polls, transforms, and forwards messages.
    Active,
    /// Temporarily not processing.
    #[allow(dead_code)]
    Paused,
    /// Halted; the processing task exits on its next tick.
    Stopped,
    /// Terminated due to an unrecoverable error.
    #[allow(dead_code)]
    Failed {
        reason: String,
    },
}
/// Per-bridge runtime tuning. See `Default` for the stock values.
#[derive(Debug, Clone)]
pub struct BridgeConfig {
    /// Maximum buffered messages; when exceeded the oldest is dropped.
    pub max_queue_size: usize,
    /// Maximum messages processed per tick.
    pub batch_size: usize,
    /// Interval between processing ticks.
    pub processing_interval: Duration,
    pub enable_monitoring: bool,
    /// Dead-letter queue flag. NOTE(review): currently only produces a
    /// warning log on failure — no actual DLQ delivery is implemented.
    pub enable_dlq: bool,
    /// Time-to-live for messages. NOTE(review): not enforced anywhere in
    /// this module yet.
    pub message_ttl: Duration,
}
impl Default for BridgeConfig {
fn default() -> Self {
Self {
max_queue_size: 10000,
batch_size: 100,
processing_interval: Duration::from_millis(100),
enable_monitoring: true,
enable_dlq: true,
message_ttl: Duration::from_secs(24 * 60 * 60),
}
}
}
/// Per-bridge counters, maintained by the processing task.
/// NOTE(review): `messages_received`, `messages_dropped`, and
/// `transform_errors` are declared but never incremented in this module.
#[derive(Debug, Clone, Default)]
pub struct BridgeStatistics {
    pub messages_received: u64,
    /// Successfully processed and delivered messages.
    pub messages_sent: u64,
    pub messages_dropped: u64,
    /// Messages whose processing returned an error.
    pub messages_failed: u64,
    pub transform_errors: u64,
    /// Running mean over all processed messages (success and failure).
    pub avg_processing_time: Duration,
    pub last_activity: Option<Instant>,
}
/// Manager-wide aggregate counters across all bridges.
/// NOTE(review): `avg_processing_time` is declared but never updated here.
#[derive(Debug, Clone, Default)]
pub struct BridgeStats {
    pub total_bridges: usize,
    pub active_bridges: usize,
    /// Total successfully processed messages across all bridges.
    pub total_messages: u64,
    pub failed_messages: u64,
    pub avg_processing_time: Duration,
}
/// Events broadcast to subscribers of [`MessageBridgeManager::subscribe`].
/// Delivery is best-effort: sends with no subscribers are ignored.
#[derive(Debug, Clone)]
pub enum BridgeNotification {
    BridgeCreated { id: String, bridge_type: BridgeType },
    BridgeStarted { id: String },
    BridgeStopped { id: String },
    /// NOTE(review): declared but never emitted in this module.
    BridgeFailed { id: String, reason: String },
    /// A message was successfully processed; `duration` is wall-clock
    /// processing time for that message.
    MessageProcessed {
        bridge_id: String,
        message_id: String,
        duration: Duration,
    },
    MessageFailed {
        bridge_id: String,
        message_id: String,
        error: String,
    },
}
/// Converts messages between wire formats. Implementations are registered by
/// name via `MessageBridgeManager::register_transformer` and must be
/// `Send + Sync` to be stored in the shared registry.
pub trait MessageTransformer {
    /// Produces a transformed copy of `message`; must not mutate the input.
    fn transform(&self, message: &ExternalMessage) -> Result<ExternalMessage>;
    /// Registry key; used by bridges and `Transform` routing actions.
    fn name(&self) -> &str;
    /// (input format, output format) this transformer handles.
    fn supported_formats(&self) -> (MessageFormat, MessageFormat);
}
/// A message moving through a bridge, independent of the external system it
/// came from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalMessage {
    pub id: String,
    /// Transport-level headers; routing conditions check these first.
    pub headers: HashMap<String, String>,
    /// Raw payload bytes, interpreted according to `format`.
    pub payload: Vec<u8>,
    pub format: MessageFormat,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Identifier of the originating system.
    pub source: String,
    /// Application-level metadata; routing conditions fall back to these
    /// when a key is absent from `headers`.
    pub metadata: HashMap<String, String>,
}
/// Evaluates routing rules against messages. The underscore-prefixed fields
/// are reserved for manager-wide rules and per-bridge rule caching, neither
/// of which is wired up yet.
struct RoutingEngine {
    _global_rules: Arc<RwLock<Vec<RoutingRule>>>,
    _rule_cache: Arc<RwLock<HashMap<String, Vec<RoutingRule>>>>,
}
impl MessageBridgeManager {
    /// Creates a manager with empty registries and a 1000-slot broadcast
    /// channel for bridge notifications. The initial receiver is discarded;
    /// consumers attach later via [`subscribe`](Self::subscribe).
    pub async fn new() -> Result<Self> {
        let (tx, _) = broadcast::channel(1000);
        Ok(Self {
            bridges: Arc::new(RwLock::new(HashMap::new())),
            configs: Arc::new(RwLock::new(HashMap::new())),
            transformers: Arc::new(RwLock::new(HashMap::new())),
            router: Arc::new(RoutingEngine::new()),
            stats: Arc::new(RwLock::new(BridgeStats::default())),
            event_notifier: tx,
        })
    }

    /// Registers a transformer under its own `name()`. A transformer with the
    /// same name silently replaces the previous one.
    // NOTE(review): the log line omits the transformer name — consider
    // including it for traceability.
    pub async fn register_transformer(
        &self,
        transformer: Box<dyn MessageTransformer + Send + Sync>,
    ) {
        let name = transformer.name().to_string();
        self.transformers.write().await.insert(name, transformer);
        info!("Registered message transformer");
    }

    /// Creates a new (stopped) bridge and returns its generated UUID.
    ///
    /// # Errors
    /// Fails if `transformer` is not a registered transformer name.
    pub async fn create_bridge(
        &self,
        bridge_type: BridgeType,
        source: ExternalSystemConfig,
        target: ExternalSystemConfig,
        transformer: String,
        routing_rules: Vec<RoutingRule>,
        config: BridgeConfig,
    ) -> Result<String> {
        // Validate the transformer reference up front so a bad name fails fast
        // instead of at message-processing time.
        if !self.transformers.read().await.contains_key(&transformer) {
            return Err(anyhow!("Transformer not found: {}", transformer));
        }
        let bridge_id = Uuid::new_v4().to_string();
        let bridge = MessageBridge {
            id: bridge_id.clone(),
            bridge_type: bridge_type.clone(),
            source,
            target,
            transformer,
            routing_rules,
            status: BridgeStatus::Stopped, // bridges must be started explicitly
            stats: BridgeStatistics::default(),
            created_at: Instant::now(),
            last_activity: None,
        };
        self.bridges.write().await.insert(bridge_id.clone(), bridge);
        self.configs.write().await.insert(bridge_id.clone(), config);
        let mut stats = self.stats.write().await;
        stats.total_bridges += 1;
        drop(stats); // release before broadcasting to keep the lock scope minimal
        // Best-effort notification: errors (no subscribers) are ignored.
        let _ = self.event_notifier.send(BridgeNotification::BridgeCreated {
            id: bridge_id.clone(),
            bridge_type,
        });
        info!("Created message bridge: {}", bridge_id);
        Ok(bridge_id)
    }

    /// Marks the bridge `Active` and spawns its background processing task.
    ///
    /// NOTE(review): there is no already-running check — calling this twice
    /// for the same bridge spawns a second processing task and increments
    /// `active_bridges` again. Confirm whether double-start should be a no-op.
    ///
    /// # Errors
    /// Fails if the bridge id is unknown.
    pub async fn start_bridge(&self, bridge_id: &str) -> Result<()> {
        // Flip the status inside a tight write-lock scope, then release
        // before spawning so the processing task can acquire the lock.
        let bridge_exists = {
            let mut bridges = self.bridges.write().await;
            if let Some(bridge) = bridges.get_mut(bridge_id) {
                bridge.status = BridgeStatus::Active;
                true
            } else {
                false
            }
        };
        if !bridge_exists {
            return Err(anyhow!("Bridge not found"));
        }
        self.start_bridge_processing(bridge_id).await?;
        self.stats.write().await.active_bridges += 1;
        let _ = self.event_notifier.send(BridgeNotification::BridgeStarted {
            id: bridge_id.to_string(),
        });
        info!("Started bridge: {}", bridge_id);
        Ok(())
    }

    /// Marks the bridge `Stopped`; its processing task notices on the next
    /// tick and exits.
    ///
    /// # Errors
    /// Fails if the bridge id is unknown.
    pub async fn stop_bridge(&self, bridge_id: &str) -> Result<()> {
        let mut bridges = self.bridges.write().await;
        let bridge = bridges
            .get_mut(bridge_id)
            .ok_or_else(|| anyhow!("Bridge not found"))?;
        bridge.status = BridgeStatus::Stopped;
        // Recompute rather than decrement, which self-heals any drift from
        // double starts. NOTE(review): the `stats` lock is taken while the
        // `bridges` write lock is held — keep this ordering consistent
        // everywhere to avoid deadlocks.
        self.stats.write().await.active_bridges = bridges
            .values()
            .filter(|b| b.status == BridgeStatus::Active)
            .count();
        let _ = self.event_notifier.send(BridgeNotification::BridgeStopped {
            id: bridge_id.to_string(),
        });
        info!("Stopped bridge: {}", bridge_id);
        Ok(())
    }

    /// Spawns the per-bridge processing loop: every `processing_interval` it
    /// pulls messages from the source, queues them (dropping the oldest past
    /// `max_queue_size`), and processes up to `batch_size` per tick.
    ///
    /// The loop works on a snapshot (`clone`) of the bridge taken here, so
    /// endpoint/rule changes after start are not observed; only `status` is
    /// re-read each tick. The task exits as soon as the status is anything
    /// other than `Active` — NOTE(review): that means `Paused` currently
    /// terminates the task just like `Stopped`.
    async fn start_bridge_processing(&self, bridge_id: &str) -> Result<()> {
        let bridge = {
            let bridges_guard = self.bridges.read().await;
            bridges_guard
                .get(bridge_id)
                .ok_or_else(|| anyhow!("Bridge not found"))?
                .clone()
        };
        let config = {
            let configs_guard = self.configs.read().await;
            configs_guard
                .get(bridge_id)
                .ok_or_else(|| anyhow!("Bridge config not found"))?
                .clone()
        };
        // Clone the shared handles the detached task needs.
        let bridges = self.bridges.clone();
        let transformers = self.transformers.clone();
        let router = self.router.clone();
        let stats = self.stats.clone();
        let event_notifier = self.event_notifier.clone();
        let bridge_id = bridge_id.to_string();
        tokio::spawn(async move {
            let mut interval = interval(config.processing_interval);
            let mut message_queue = VecDeque::new();
            loop {
                interval.tick().await;
                // Re-read only the status each tick so stop/pause takes effect.
                let status = {
                    let bridges_guard = bridges.read().await;
                    bridges_guard.get(&bridge_id).map(|b| b.status.clone())
                };
                if let Some(BridgeStatus::Active) = status {
                    // Ingest: pull whatever the source has, bounded by the queue cap.
                    match MessageBridgeManager::receive_messages(&bridge.source, &config).await {
                        Ok(messages) => {
                            for message in messages {
                                message_queue.push_back(message);
                                // Back-pressure policy: drop the OLDEST message
                                // when the queue overflows.
                                if message_queue.len() > config.max_queue_size {
                                    message_queue.pop_front();
                                    warn!("Bridge queue full, dropping oldest message");
                                }
                            }
                        }
                        Err(e) => {
                            error!("Failed to receive messages for bridge {}: {}", bridge_id, e);
                        }
                    }
                    // Process: drain up to one batch from the front of the queue.
                    let batch_size = config.batch_size.min(message_queue.len());
                    if batch_size > 0 {
                        let batch: Vec<_> = message_queue.drain(..batch_size).collect();
                        for message in batch {
                            let start_time = Instant::now();
                            match MessageBridgeManager::process_message(
                                &bridge,
                                &message,
                                &transformers,
                                &router,
                            )
                            .await
                            {
                                Ok(_) => {
                                    let duration = start_time.elapsed();
                                    MessageBridgeManager::update_bridge_stats(
                                        &bridges, &bridge_id, true, duration,
                                    )
                                    .await;
                                    stats.write().await.total_messages += 1;
                                    let _ =
                                        event_notifier.send(BridgeNotification::MessageProcessed {
                                            bridge_id: bridge_id.clone(),
                                            message_id: message.id.clone(),
                                            duration,
                                        });
                                }
                                Err(e) => {
                                    let duration = start_time.elapsed();
                                    error!(
                                        "Failed to process message {} in bridge {}: {}",
                                        message.id, bridge_id, e
                                    );
                                    MessageBridgeManager::update_bridge_stats(
                                        &bridges, &bridge_id, false, duration,
                                    )
                                    .await;
                                    stats.write().await.failed_messages += 1;
                                    let _ =
                                        event_notifier.send(BridgeNotification::MessageFailed {
                                            bridge_id: bridge_id.clone(),
                                            message_id: message.id.clone(),
                                            error: e.to_string(),
                                        });
                                    // NOTE(review): "DLQ" is log-only — the failed
                                    // message is not actually persisted anywhere.
                                    if config.enable_dlq {
                                        warn!("Message sent to dead letter queue: {}", message.id);
                                    }
                                }
                            }
                        }
                    }
                } else {
                    // Bridge removed or no longer Active: terminate the task.
                    break;
                }
            }
        });
        Ok(())
    }

    /// Dispatches to a per-system receive implementation. Unsupported system
    /// types log a warning and yield no messages (non-fatal by design).
    async fn receive_messages(
        source: &ExternalSystemConfig,
        config: &BridgeConfig,
    ) -> Result<Vec<ExternalMessage>> {
        match &source.system_type {
            ExternalSystemType::Kafka {
                brokers,
                topics,
                consumer_group,
            } => Self::receive_kafka_messages(brokers, topics, consumer_group, config).await,
            ExternalSystemType::RabbitMQ {
                url,
                exchange,
                routing_key,
                queue,
            } => Self::receive_rabbitmq_messages(url, exchange, routing_key, queue, config).await,
            ExternalSystemType::RedisPubSub { url, channels } => {
                Self::receive_redis_messages(url, channels, config).await
            }
            ExternalSystemType::HttpRest {
                base_url,
                endpoints,
                headers,
            } => Self::receive_http_messages(base_url, endpoints, headers, config).await,
            ExternalSystemType::FileSystem {
                directory,
                pattern,
                watch_mode,
            } => Self::receive_file_messages(directory, pattern, *watch_mode, config).await,
            _ => {
                warn!("Message receiving not implemented for this system type");
                Ok(vec![])
            }
        }
    }

    /// Stub: Kafka consumption is not implemented; always yields no messages.
    async fn receive_kafka_messages(
        _brokers: &[String],
        _topics: &[String],
        _consumer_group: &Option<String>,
        _config: &BridgeConfig,
    ) -> Result<Vec<ExternalMessage>> {
        Ok(vec![])
    }

    /// Stub: RabbitMQ consumption is not implemented; always yields no messages.
    async fn receive_rabbitmq_messages(
        _url: &str,
        _exchange: &str,
        _routing_key: &str,
        _queue: &Option<String>,
        _config: &BridgeConfig,
    ) -> Result<Vec<ExternalMessage>> {
        Ok(vec![])
    }

    /// Stub: Redis pub/sub consumption is not implemented; always yields no messages.
    async fn receive_redis_messages(
        _url: &str,
        _channels: &[String],
        _config: &BridgeConfig,
    ) -> Result<Vec<ExternalMessage>> {
        Ok(vec![])
    }

    /// Stub: HTTP polling is not implemented; always yields no messages.
    async fn receive_http_messages(
        _base_url: &str,
        _endpoints: &HashMap<String, String>,
        _headers: &HashMap<String, String>,
        _config: &BridgeConfig,
    ) -> Result<Vec<ExternalMessage>> {
        Ok(vec![])
    }

    /// Stub: file-system ingestion is not implemented; always yields no messages.
    async fn receive_file_messages(
        _directory: &str,
        _pattern: &str,
        _watch_mode: bool,
        _config: &BridgeConfig,
    ) -> Result<Vec<ExternalMessage>> {
        Ok(vec![])
    }

    /// Applies routing rules to one message, then transforms and delivers it.
    ///
    /// Flow: first matching rule decides — `Drop` discards, `Transform` uses
    /// the named transformer, `Forward` (and unimplemented actions, after a
    /// warning) falls through to the bridge's default transformer, then the
    /// result is sent to the bridge target.
    async fn process_message(
        bridge: &MessageBridge,
        message: &ExternalMessage,
        transformers: &Arc<RwLock<HashMap<String, Box<dyn MessageTransformer + Send + Sync>>>>,
        router: &Arc<RoutingEngine>,
    ) -> Result<()> {
        let action = router
            .evaluate_rules(&bridge.routing_rules, message)
            .await?;
        match action {
            RuleAction::Drop => {
                debug!("Message dropped by routing rule: {}", message.id);
                return Ok(());
            }
            RuleAction::Forward => {
                // Fall through to the default transformer below.
            }
            RuleAction::Transform { transformer } => {
                // Rule-specified transformer overrides the bridge default.
                let transformed = {
                    let transformers_guard = transformers.read().await;
                    let transformer = transformers_guard
                        .get(&transformer)
                        .ok_or_else(|| anyhow!("Transformer not found: {}", transformer))?;
                    transformer.transform(message)?
                };
                return Self::send_message(&bridge.target, &transformed).await;
            }
            _ => {
                // Route/Duplicate: not implemented — message still takes the
                // default forward path below.
                warn!("Routing action not implemented: {:?}", action);
            }
        }
        // Default path: bridge-level transformer, then deliver to target.
        // The transformer lock is released before the (potentially slow) send.
        let transformed = {
            let transformers_guard = transformers.read().await;
            let transformer = transformers_guard
                .get(&bridge.transformer)
                .ok_or_else(|| anyhow!("Transformer not found: {}", bridge.transformer))?;
            transformer.transform(message)?
        };
        Self::send_message(&bridge.target, &transformed).await
    }

    /// Dispatches to a per-system send implementation. Unsupported system
    /// types log a warning and report success (the message is effectively
    /// discarded).
    async fn send_message(target: &ExternalSystemConfig, message: &ExternalMessage) -> Result<()> {
        match &target.system_type {
            ExternalSystemType::Kafka {
                brokers, topics, ..
            } => Self::send_kafka_message(brokers, topics, message).await,
            ExternalSystemType::RabbitMQ {
                url,
                exchange,
                routing_key,
                ..
            } => Self::send_rabbitmq_message(url, exchange, routing_key, message).await,
            ExternalSystemType::RedisPubSub { url, channels } => {
                Self::send_redis_message(url, channels, message).await
            }
            ExternalSystemType::HttpRest {
                base_url,
                endpoints,
                headers,
            } => Self::send_http_message(base_url, endpoints, headers, message).await,
            ExternalSystemType::FileSystem { directory, .. } => {
                Self::send_file_message(directory, message).await
            }
            _ => {
                warn!("Message sending not implemented for this system type");
                Ok(())
            }
        }
    }

    /// Stub: Kafka delivery is not implemented; reports success without sending.
    async fn send_kafka_message(
        _brokers: &[String],
        _topics: &[String],
        _message: &ExternalMessage,
    ) -> Result<()> {
        Ok(())
    }

    /// Stub: RabbitMQ delivery is not implemented; reports success without sending.
    async fn send_rabbitmq_message(
        _url: &str,
        _exchange: &str,
        _routing_key: &str,
        _message: &ExternalMessage,
    ) -> Result<()> {
        Ok(())
    }

    /// Stub: Redis delivery is not implemented; reports success without sending.
    async fn send_redis_message(
        _url: &str,
        _channels: &[String],
        _message: &ExternalMessage,
    ) -> Result<()> {
        Ok(())
    }

    /// Stub: HTTP delivery is not implemented; reports success without sending.
    async fn send_http_message(
        _base_url: &str,
        _endpoints: &HashMap<String, String>,
        _headers: &HashMap<String, String>,
        _message: &ExternalMessage,
    ) -> Result<()> {
        Ok(())
    }

    /// Stub: file delivery is not implemented; reports success without writing.
    async fn send_file_message(_directory: &str, _message: &ExternalMessage) -> Result<()> {
        Ok(())
    }

    /// Records one processed message in the bridge's statistics and updates
    /// the running mean processing time.
    async fn update_bridge_stats(
        bridges: &Arc<RwLock<HashMap<String, MessageBridge>>>,
        bridge_id: &str,
        success: bool,
        duration: Duration,
    ) {
        let mut bridges_guard = bridges.write().await;
        if let Some(bridge) = bridges_guard.get_mut(bridge_id) {
            bridge.last_activity = Some(Instant::now());
            if success {
                bridge.stats.messages_sent += 1;
            } else {
                bridge.stats.messages_failed += 1;
            }
            // Incremental mean: new_avg = (old_avg * (n - 1) + d) / n.
            // `total_messages` is >= 1 here (a counter was just incremented),
            // so `checked_div` only guards hypothetical zero.
            // NOTE(review): the multiplication `avg_nanos * (total_messages - 1)`
            // is unchecked u64 arithmetic and could overflow for extremely
            // large message counts.
            let total_messages = bridge.stats.messages_sent + bridge.stats.messages_failed;
            let avg_nanos = bridge.stats.avg_processing_time.as_nanos() as u64;
            let duration_nanos = duration.as_nanos() as u64;
            if let Some(new_avg_nanos) =
                (avg_nanos * (total_messages - 1) + duration_nanos).checked_div(total_messages)
            {
                bridge.stats.avg_processing_time = Duration::from_nanos(new_avg_nanos);
            }
        }
    }

    /// Returns a snapshot of one bridge's statistics.
    ///
    /// # Errors
    /// Fails if the bridge id is unknown.
    pub async fn get_bridge_stats(&self, bridge_id: &str) -> Result<BridgeStatistics> {
        let bridges = self.bridges.read().await;
        let bridge = bridges
            .get(bridge_id)
            .ok_or_else(|| anyhow!("Bridge not found"))?;
        Ok(bridge.stats.clone())
    }

    /// Returns a snapshot of the manager-wide aggregate statistics.
    pub async fn get_stats(&self) -> BridgeStats {
        self.stats.read().await.clone()
    }

    /// Returns summaries of all bridges. Note that `created_at` and
    /// `last_activity` in the result are ages (elapsed durations), not
    /// timestamps.
    pub async fn list_bridges(&self) -> Vec<BridgeInfo> {
        let bridges = self.bridges.read().await;
        bridges
            .values()
            .map(|b| BridgeInfo {
                id: b.id.clone(),
                bridge_type: b.bridge_type.clone(),
                status: format!("{:?}", b.status),
                created_at: b.created_at.elapsed(),
                last_activity: b.last_activity.map(|t| t.elapsed()),
                messages_processed: b.stats.messages_sent + b.stats.messages_failed,
                // Success rate over processed messages; 0.0 when nothing
                // has been processed yet.
                success_rate: if b.stats.messages_sent + b.stats.messages_failed > 0 {
                    b.stats.messages_sent as f64
                        / (b.stats.messages_sent + b.stats.messages_failed) as f64
                } else {
                    0.0
                },
            })
            .collect()
    }

    /// Creates a new subscriber to bridge notifications. Slow subscribers can
    /// lag and miss events once the 1000-slot channel wraps.
    pub fn subscribe(&self) -> broadcast::Receiver<BridgeNotification> {
        self.event_notifier.subscribe()
    }
}
/// Read-only summary of a bridge as returned by
/// `MessageBridgeManager::list_bridges`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BridgeInfo {
    pub id: String,
    pub bridge_type: BridgeType,
    /// Debug-formatted `BridgeStatus`, e.g. "Active" or "Stopped".
    pub status: String,
    /// Age of the bridge (time since creation), not a timestamp.
    pub created_at: Duration,
    /// Time since the last processed message, if any.
    pub last_activity: Option<Duration>,
    /// Successes plus failures.
    pub messages_processed: u64,
    /// Fraction of processed messages that succeeded; 0.0 if none processed.
    pub success_rate: f64,
}
impl RoutingEngine {
    /// Creates an engine with no global rules and an empty rule cache (both
    /// currently unused placeholders).
    fn new() -> Self {
        Self {
            _global_rules: Arc::new(RwLock::new(Vec::new())),
            _rule_cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Evaluates `rules` against `message` and returns the action of the
    /// first enabled rule whose condition matches, trying rules in ascending
    /// `priority` order (lower value first). Defaults to `RuleAction::Forward`
    /// when nothing matches.
    ///
    /// # Errors
    /// Propagates condition-evaluation failures (e.g. an invalid
    /// `SubjectPattern` regex).
    async fn evaluate_rules(
        &self,
        rules: &[RoutingRule],
        message: &ExternalMessage,
    ) -> Result<RuleAction> {
        // Sort *references* instead of cloning every rule: conditions can be
        // deeply nested `Composite` trees and this runs once per message.
        // Filtering before the stable sort preserves the original relative
        // order of equal-priority enabled rules, so matching behavior is
        // unchanged.
        let mut enabled_rules: Vec<&RoutingRule> = rules.iter().filter(|r| r.enabled).collect();
        enabled_rules.sort_by_key(|r| r.priority);
        for rule in enabled_rules {
            if self.evaluate_condition(&rule.condition, message).await? {
                return Ok(rule.action.clone());
            }
        }
        Ok(RuleAction::Forward)
    }

    /// Recursively evaluates one routing condition against `message`.
    ///
    /// Returns a boxed future because `Composite` recurses into this async
    /// body, and `async fn` cannot be directly recursive. Header-based
    /// conditions look the key up in `message.headers` first, then fall back
    /// to `message.metadata`.
    #[allow(clippy::only_used_in_recursion)]
    fn evaluate_condition<'a>(
        &'a self,
        condition: &'a RuleCondition,
        message: &'a ExternalMessage,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
        Box::pin(async move {
            match condition {
                RuleCondition::Always => Ok(true),
                RuleCondition::EventType { types } => {
                    // Missing event_type is treated as the literal "unknown".
                    let unknown = "unknown".to_string();
                    let event_type = message
                        .headers
                        .get("event_type")
                        .or_else(|| message.metadata.get("event_type"))
                        .unwrap_or(&unknown);
                    Ok(types.contains(event_type))
                }
                RuleCondition::Graph { patterns } => {
                    // Substring match against any pattern; absent graph = no match.
                    let graph = message
                        .headers
                        .get("graph")
                        .or_else(|| message.metadata.get("graph"));
                    if let Some(g) = graph {
                        Ok(patterns.iter().any(|p| g.contains(p)))
                    } else {
                        Ok(false)
                    }
                }
                RuleCondition::SubjectPattern { regex } => {
                    let subject = message
                        .headers
                        .get("subject")
                        .or_else(|| message.metadata.get("subject"));
                    if let Some(s) = subject {
                        // NOTE(review): the regex is recompiled on every
                        // evaluation; `_rule_cache` looks intended for
                        // memoizing compiled patterns — wire it up eventually.
                        let re = regex::Regex::new(regex)
                            .map_err(|e| anyhow!("Invalid regex: {}", e))?;
                        Ok(re.is_match(s))
                    } else {
                        Ok(false)
                    }
                }
                RuleCondition::Predicate { predicates } => {
                    let predicate = message
                        .headers
                        .get("predicate")
                        .or_else(|| message.metadata.get("predicate"));
                    if let Some(p) = predicate {
                        Ok(predicates.contains(p))
                    } else {
                        Ok(false)
                    }
                }
                RuleCondition::Expression { expr } => {
                    // Expression language not implemented: never matches.
                    warn!("Expression evaluation not implemented: {}", expr);
                    Ok(false)
                }
                RuleCondition::Composite {
                    operator,
                    conditions,
                } => match operator {
                    // Short-circuiting AND over the nested conditions.
                    LogicalOperator::And => {
                        for cond in conditions {
                            if !self.evaluate_condition(cond, message).await? {
                                return Ok(false);
                            }
                        }
                        Ok(true)
                    }
                    // Short-circuiting OR over the nested conditions.
                    LogicalOperator::Or => {
                        for cond in conditions {
                            if self.evaluate_condition(cond, message).await? {
                                return Ok(true);
                            }
                        }
                        Ok(false)
                    }
                    // NOT is only defined for exactly one nested condition.
                    LogicalOperator::Not => {
                        if conditions.len() != 1 {
                            return Err(anyhow!("NOT operator requires exactly one condition"));
                        }
                        Ok(!self.evaluate_condition(&conditions[0], message).await?)
                    }
                },
            }
        })
    }
}
/// Pass-through transformer for JSON payloads: messages are forwarded
/// unchanged.
pub struct JsonTransformer;

impl MessageTransformer for JsonTransformer {
    /// JSON-to-JSON needs no conversion; return an identical copy.
    fn transform(&self, message: &ExternalMessage) -> Result<ExternalMessage> {
        let passthrough = message.clone();
        Ok(passthrough)
    }

    /// Registry key for this transformer.
    fn name(&self) -> &str {
        "json"
    }

    /// Consumes JSON, produces JSON.
    fn supported_formats(&self) -> (MessageFormat, MessageFormat) {
        (MessageFormat::Json, MessageFormat::Json)
    }
}
/// Relabels a textual RDF message as JSON.
///
/// NOTE(review): only the `format` tag is rewritten — the payload bytes pass
/// through untouched, so actual RDF-to-JSON serialization presumably happens
/// elsewhere or is not yet implemented.
pub struct RdfToJsonTransformer;

impl MessageTransformer for RdfToJsonTransformer {
    /// Copies the message and retags its format as JSON.
    fn transform(&self, message: &ExternalMessage) -> Result<ExternalMessage> {
        let mut retagged = message.clone();
        retagged.format = MessageFormat::Json;
        Ok(retagged)
    }

    /// Registry key for this transformer.
    fn name(&self) -> &str {
        "rdf-to-json"
    }

    /// Consumes text (RDF), produces JSON.
    fn supported_formats(&self) -> (MessageFormat, MessageFormat) {
        (MessageFormat::Text, MessageFormat::Json)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the minimal Kafka-backed endpoint config shared by the tests:
    /// no TLS, no auth, JSON payloads.
    fn kafka_test_config() -> ExternalSystemConfig {
        ExternalSystemConfig {
            system_type: ExternalSystemType::Kafka {
                brokers: vec!["localhost:9092".to_string()],
                topics: vec!["source-topic".to_string()],
                consumer_group: Some("test-group".to_string()),
            },
            connection: ConnectionConfig {
                timeout: Duration::from_secs(30),
                keep_alive: Duration::from_secs(60),
                retry: RetryConfig {
                    max_attempts: 3,
                    initial_delay: Duration::from_millis(100),
                    max_delay: Duration::from_secs(10),
                    exponential_backoff: true,
                },
                tls: None,
            },
            format: FormatConfig {
                format: MessageFormat::Json,
                encoding: "utf-8".to_string(),
                compression: None,
                schema_validation: false,
            },
            security: SecurityConfig {
                auth: AuthenticationMethod::None,
                encryption: EncryptionConfig {
                    enabled: false,
                    algorithm: None,
                    key_id: None,
                },
                access_control: AccessControlConfig {
                    read_permissions: vec![],
                    write_permissions: vec![],
                    admin_permissions: vec![],
                },
            },
        }
    }

    /// Creating a bridge with a registered transformer yields a non-empty id
    /// and the bridge shows up in `list_bridges`.
    #[tokio::test]
    async fn test_bridge_creation() {
        let manager = MessageBridgeManager::new().await.unwrap();
        let source = kafka_test_config();
        let target = source.clone();
        manager
            .register_transformer(Box::new(JsonTransformer))
            .await;
        let bridge_id = manager
            .create_bridge(
                BridgeType::SourceToTarget,
                source,
                target,
                "json".to_string(),
                vec![],
                BridgeConfig::default(),
            )
            .await
            .unwrap();
        assert!(!bridge_id.is_empty());
        let listed = manager.list_bridges().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, bridge_id);
    }

    /// A matching `EventType` rule returns its `Forward` action.
    #[tokio::test]
    async fn test_routing_rules() {
        let engine = RoutingEngine::new();
        let rule = RoutingRule {
            name: "test-rule".to_string(),
            condition: RuleCondition::EventType {
                types: vec!["triple_added".to_string()],
            },
            action: RuleAction::Forward,
            priority: 1,
            enabled: true,
        };
        let message = ExternalMessage {
            id: "test".to_string(),
            headers: HashMap::from([(
                "event_type".to_string(),
                "triple_added".to_string(),
            )]),
            payload: vec![],
            format: MessageFormat::Json,
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: HashMap::new(),
        };
        let action = engine.evaluate_rules(&[rule], &message).await.unwrap();
        assert!(matches!(action, RuleAction::Forward));
    }
}