1use anyhow::{anyhow, Result};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, VecDeque};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{broadcast, RwLock};
16use tokio::time::interval;
17use tracing::{debug, error, info, warn};
18use uuid::Uuid;
19
20pub struct MessageBridgeManager {
22 bridges: Arc<RwLock<HashMap<String, MessageBridge>>>,
24 configs: Arc<RwLock<HashMap<String, BridgeConfig>>>,
26 transformers: Arc<RwLock<HashMap<String, Box<dyn MessageTransformer + Send + Sync>>>>,
28 router: Arc<RoutingEngine>,
30 stats: Arc<RwLock<BridgeStats>>,
32 event_notifier: broadcast::Sender<BridgeNotification>,
34}
35
36#[derive(Clone)]
38struct MessageBridge {
39 id: String,
41 bridge_type: BridgeType,
43 source: ExternalSystemConfig,
45 target: ExternalSystemConfig,
47 transformer: String,
49 routing_rules: Vec<RoutingRule>,
51 status: BridgeStatus,
53 stats: BridgeStatistics,
55 created_at: Instant,
57 last_activity: Option<Instant>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum BridgeType {
64 Bidirectional,
66 SourceToTarget,
68 TargetToSource,
70 Fanout,
72 Fanin,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ExternalSystemConfig {
79 pub system_type: ExternalSystemType,
81 pub connection: ConnectionConfig,
83 pub format: FormatConfig,
85 pub security: SecurityConfig,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub enum ExternalSystemType {
92 Kafka {
94 brokers: Vec<String>,
95 topics: Vec<String>,
96 consumer_group: Option<String>,
97 },
98 RabbitMQ {
100 url: String,
101 exchange: String,
102 routing_key: String,
103 queue: Option<String>,
104 },
105 AmazonSQS {
107 region: String,
108 queue_url: String,
109 credentials: AwsCredentials,
110 },
111 AzureServiceBus {
113 connection_string: String,
114 queue_name: String,
115 },
116 GooglePubSub {
118 project_id: String,
119 topic: String,
120 subscription: Option<String>,
121 },
122 Pulsar {
124 service_url: String,
125 topics: Vec<String>,
126 subscription: Option<String>,
127 },
128 RedisPubSub { url: String, channels: Vec<String> },
130 HttpRest {
132 base_url: String,
133 endpoints: HashMap<String, String>,
134 headers: HashMap<String, String>,
135 },
136 WebSocket { url: String, protocols: Vec<String> },
138 FileSystem {
140 directory: String,
141 pattern: String,
142 watch_mode: bool,
143 },
144 Mqtt {
146 broker_url: String,
147 client_id: String,
148 topic_subscriptions: Vec<String>,
149 qos: u8,
150 username: Option<String>,
151 password: Option<String>,
152 },
153 OpcUa {
155 endpoint_url: String,
156 security_policy: String,
157 user_identity: String,
158 node_subscriptions: Vec<String>,
159 },
160 SparkplugB {
162 broker_url: String,
163 group_id: String,
164 edge_node_id: String,
165 device_ids: Vec<String>,
166 },
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct AwsCredentials {
172 pub access_key_id: String,
173 pub secret_access_key: String,
174 pub session_token: Option<String>,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct ConnectionConfig {
180 pub timeout: Duration,
182 pub keep_alive: Duration,
184 pub retry: RetryConfig,
186 pub tls: Option<TlsConfig>,
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct TlsConfig {
193 pub enabled: bool,
194 pub verify_certificate: bool,
195 pub certificate_path: Option<String>,
196 pub private_key_path: Option<String>,
197 pub ca_certificate_path: Option<String>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct RetryConfig {
203 pub max_attempts: u32,
204 pub initial_delay: Duration,
205 pub max_delay: Duration,
206 pub exponential_backoff: bool,
207}
208
209#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct FormatConfig {
212 pub format: MessageFormat,
214 pub encoding: String,
216 pub compression: Option<CompressionType>,
218 pub schema_validation: bool,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
224pub enum MessageFormat {
225 Json,
227 Avro { schema: String },
229 Protobuf { schema: String },
231 Xml,
233 Text,
235 Binary,
237 Custom { transformer: String },
239}
240
241#[derive(Debug, Clone, Serialize, Deserialize)]
243pub enum CompressionType {
244 Gzip,
245 Snappy,
246 Lz4,
247 Zstd,
248}
249
250#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct SecurityConfig {
253 pub auth: AuthenticationMethod,
255 pub encryption: EncryptionConfig,
257 pub access_control: AccessControlConfig,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
263pub enum AuthenticationMethod {
264 None,
265 BasicAuth {
266 username: String,
267 password: String,
268 },
269 BearerToken {
270 token: String,
271 },
272 ApiKey {
273 key: String,
274 header: String,
275 },
276 OAuth2 {
277 client_id: String,
278 client_secret: String,
279 token_url: String,
280 },
281 SaslPlain {
282 username: String,
283 password: String,
284 },
285 SaslScramSha256 {
286 username: String,
287 password: String,
288 },
289 Certificate {
290 cert_path: String,
291 key_path: String,
292 },
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
297pub struct EncryptionConfig {
298 pub enabled: bool,
299 pub algorithm: Option<String>,
300 pub key_id: Option<String>,
301}
302
303#[derive(Debug, Clone, Serialize, Deserialize)]
305pub struct AccessControlConfig {
306 pub read_permissions: Vec<String>,
307 pub write_permissions: Vec<String>,
308 pub admin_permissions: Vec<String>,
309}
310
311#[derive(Debug, Clone, Serialize, Deserialize)]
313pub struct RoutingRule {
314 pub name: String,
316 pub condition: RuleCondition,
318 pub action: RuleAction,
320 pub priority: u32,
322 pub enabled: bool,
324}
325
326#[derive(Debug, Clone, Serialize, Deserialize)]
328pub enum RuleCondition {
329 Always,
331 EventType { types: Vec<String> },
333 Graph { patterns: Vec<String> },
335 SubjectPattern { regex: String },
337 Predicate { predicates: Vec<String> },
339 Expression { expr: String },
341 Composite {
343 operator: LogicalOperator,
344 conditions: Vec<RuleCondition>,
345 },
346}
347
348#[derive(Debug, Clone, Serialize, Deserialize)]
350pub enum LogicalOperator {
351 And,
352 Or,
353 Not,
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize)]
358pub enum RuleAction {
359 Forward,
361 Drop,
363 Transform { transformer: String },
365 Route { target: String },
367 Duplicate { targets: Vec<String> },
369}
370
/// Lifecycle state of a bridge.
///
/// The background processing loop keeps running only while the state is
/// `Active`; any other state makes it exit.
#[derive(Clone, Debug, PartialEq)]
enum BridgeStatus {
    /// Bridge is started and its processing loop may run.
    Active,
    // Not constructed anywhere in this file, hence the lint exemption.
    #[allow(dead_code)]
    Paused,
    /// Initial state after creation, and the state set by stopping.
    Stopped,
    // Not constructed anywhere in this file, hence the lint exemption.
    #[allow(dead_code)]
    Failed { reason: String },
}
383
/// Tuning knobs for a bridge's background processing loop.
#[derive(Clone, Debug)]
pub struct BridgeConfig {
    /// Upper bound on buffered messages; when exceeded the oldest buffered
    /// message is discarded.
    pub max_queue_size: usize,
    /// Maximum number of buffered messages processed per tick.
    pub batch_size: usize,
    /// Period of the processing loop's timer.
    pub processing_interval: Duration,
    /// NOTE(review): not read by the code visible in this file.
    pub enable_monitoring: bool,
    /// When set, a failed message is reported as sent to the dead letter
    /// queue (the visible code only logs this).
    pub enable_dlq: bool,
    /// NOTE(review): not read by the code visible in this file.
    pub message_ttl: Duration,
}

impl Default for BridgeConfig {
    /// Defaults: queue of 10000, batches of 100, 100 ms tick, monitoring and
    /// DLQ enabled, 24 hour TTL.
    fn default() -> Self {
        const ONE_DAY_SECS: u64 = 24 * 60 * 60;

        BridgeConfig {
            max_queue_size: 10_000,
            batch_size: 100,
            processing_interval: Duration::from_millis(100),
            enable_monitoring: true,
            enable_dlq: true,
            message_ttl: Duration::from_secs(ONE_DAY_SECS),
        }
    }
}
413
/// Per-bridge counters. All fields start at zero / `None`.
#[derive(Clone, Debug, Default)]
pub struct BridgeStatistics {
    /// Messages received from the source.
    pub messages_received: u64,
    /// Messages processed successfully.
    pub messages_sent: u64,
    /// Messages discarded.
    pub messages_dropped: u64,
    /// Messages whose processing returned an error.
    pub messages_failed: u64,
    /// Transformer failures.
    pub transform_errors: u64,
    /// Running mean of per-message processing time.
    pub avg_processing_time: Duration,
    /// Moment of the most recent activity, if any.
    pub last_activity: Option<Instant>,
}
432
/// Aggregate counters across every bridge owned by the manager.
#[derive(Clone, Debug, Default)]
pub struct BridgeStats {
    /// Number of bridges created.
    pub total_bridges: usize,
    /// Number of bridges currently active.
    pub active_bridges: usize,
    /// Messages processed successfully, over all bridges.
    pub total_messages: u64,
    /// Messages whose processing failed, over all bridges.
    pub failed_messages: u64,
    /// Mean processing time.
    pub avg_processing_time: Duration,
}
447
448#[derive(Debug, Clone)]
450pub enum BridgeNotification {
451 BridgeCreated { id: String, bridge_type: BridgeType },
453 BridgeStarted { id: String },
455 BridgeStopped { id: String },
457 BridgeFailed { id: String, reason: String },
459 MessageProcessed {
461 bridge_id: String,
462 message_id: String,
463 duration: Duration,
464 },
465 MessageFailed {
467 bridge_id: String,
468 message_id: String,
469 error: String,
470 },
471}
472
473pub trait MessageTransformer {
475 fn transform(&self, message: &ExternalMessage) -> Result<ExternalMessage>;
477
478 fn name(&self) -> &str;
480
481 fn supported_formats(&self) -> (MessageFormat, MessageFormat);
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
487pub struct ExternalMessage {
488 pub id: String,
490 pub headers: HashMap<String, String>,
492 pub payload: Vec<u8>,
494 pub format: MessageFormat,
496 pub timestamp: chrono::DateTime<chrono::Utc>,
498 pub source: String,
500 pub metadata: HashMap<String, String>,
502}
503
504struct RoutingEngine {
506 _global_rules: Arc<RwLock<Vec<RoutingRule>>>,
508 _rule_cache: Arc<RwLock<HashMap<String, Vec<RoutingRule>>>>,
510}
511
512impl MessageBridgeManager {
513 pub async fn new() -> Result<Self> {
515 let (tx, _) = broadcast::channel(1000);
516
517 Ok(Self {
518 bridges: Arc::new(RwLock::new(HashMap::new())),
519 configs: Arc::new(RwLock::new(HashMap::new())),
520 transformers: Arc::new(RwLock::new(HashMap::new())),
521 router: Arc::new(RoutingEngine::new()),
522 stats: Arc::new(RwLock::new(BridgeStats::default())),
523 event_notifier: tx,
524 })
525 }
526
527 pub async fn register_transformer(
529 &self,
530 transformer: Box<dyn MessageTransformer + Send + Sync>,
531 ) {
532 let name = transformer.name().to_string();
533 self.transformers.write().await.insert(name, transformer);
534 info!("Registered message transformer");
535 }
536
537 pub async fn create_bridge(
539 &self,
540 bridge_type: BridgeType,
541 source: ExternalSystemConfig,
542 target: ExternalSystemConfig,
543 transformer: String,
544 routing_rules: Vec<RoutingRule>,
545 config: BridgeConfig,
546 ) -> Result<String> {
547 if !self.transformers.read().await.contains_key(&transformer) {
549 return Err(anyhow!("Transformer not found: {}", transformer));
550 }
551
552 let bridge_id = Uuid::new_v4().to_string();
554
555 let bridge = MessageBridge {
557 id: bridge_id.clone(),
558 bridge_type: bridge_type.clone(),
559 source,
560 target,
561 transformer,
562 routing_rules,
563 status: BridgeStatus::Stopped,
564 stats: BridgeStatistics::default(),
565 created_at: Instant::now(),
566 last_activity: None,
567 };
568
569 self.bridges.write().await.insert(bridge_id.clone(), bridge);
571 self.configs.write().await.insert(bridge_id.clone(), config);
572
573 let mut stats = self.stats.write().await;
575 stats.total_bridges += 1;
576 drop(stats);
577
578 let _ = self.event_notifier.send(BridgeNotification::BridgeCreated {
580 id: bridge_id.clone(),
581 bridge_type,
582 });
583
584 info!("Created message bridge: {}", bridge_id);
585 Ok(bridge_id)
586 }
587
588 pub async fn start_bridge(&self, bridge_id: &str) -> Result<()> {
590 let bridge_exists = {
591 let mut bridges = self.bridges.write().await;
592 if let Some(bridge) = bridges.get_mut(bridge_id) {
593 bridge.status = BridgeStatus::Active;
594 true
595 } else {
596 false
597 }
598 };
599
600 if !bridge_exists {
601 return Err(anyhow!("Bridge not found"));
602 }
603
604 self.start_bridge_processing(bridge_id).await?;
606
607 self.stats.write().await.active_bridges += 1;
609
610 let _ = self.event_notifier.send(BridgeNotification::BridgeStarted {
612 id: bridge_id.to_string(),
613 });
614
615 info!("Started bridge: {}", bridge_id);
616 Ok(())
617 }
618
619 pub async fn stop_bridge(&self, bridge_id: &str) -> Result<()> {
621 let mut bridges = self.bridges.write().await;
622 let bridge = bridges
623 .get_mut(bridge_id)
624 .ok_or_else(|| anyhow!("Bridge not found"))?;
625
626 bridge.status = BridgeStatus::Stopped;
627
628 self.stats.write().await.active_bridges = bridges
630 .values()
631 .filter(|b| b.status == BridgeStatus::Active)
632 .count();
633
634 let _ = self.event_notifier.send(BridgeNotification::BridgeStopped {
636 id: bridge_id.to_string(),
637 });
638
639 info!("Stopped bridge: {}", bridge_id);
640 Ok(())
641 }
642
643 async fn start_bridge_processing(&self, bridge_id: &str) -> Result<()> {
645 let bridge = {
647 let bridges_guard = self.bridges.read().await;
648 bridges_guard
649 .get(bridge_id)
650 .ok_or_else(|| anyhow!("Bridge not found"))?
651 .clone()
652 };
653
654 let config = {
655 let configs_guard = self.configs.read().await;
656 configs_guard
657 .get(bridge_id)
658 .ok_or_else(|| anyhow!("Bridge config not found"))?
659 .clone()
660 };
661
662 let bridges = self.bridges.clone();
663 let transformers = self.transformers.clone();
664 let router = self.router.clone();
665 let stats = self.stats.clone();
666 let event_notifier = self.event_notifier.clone();
667 let bridge_id = bridge_id.to_string();
668
669 tokio::spawn(async move {
670 let mut interval = interval(config.processing_interval);
671 let mut message_queue = VecDeque::new();
672
673 loop {
674 interval.tick().await;
675
676 let status = {
678 let bridges_guard = bridges.read().await;
679 bridges_guard.get(&bridge_id).map(|b| b.status.clone())
680 };
681
682 if let Some(BridgeStatus::Active) = status {
683 match MessageBridgeManager::receive_messages(&bridge.source, &config).await {
685 Ok(messages) => {
686 for message in messages {
687 message_queue.push_back(message);
688
689 if message_queue.len() > config.max_queue_size {
691 message_queue.pop_front();
692 warn!("Bridge queue full, dropping oldest message");
693 }
694 }
695 }
696 Err(e) => {
697 error!("Failed to receive messages for bridge {}: {}", bridge_id, e);
698 }
699 }
700
701 let batch_size = config.batch_size.min(message_queue.len());
703 if batch_size > 0 {
704 let batch: Vec<_> = message_queue.drain(..batch_size).collect();
705
706 for message in batch {
707 let start_time = Instant::now();
708
709 match MessageBridgeManager::process_message(
710 &bridge,
711 &message,
712 &transformers,
713 &router,
714 )
715 .await
716 {
717 Ok(_) => {
718 let duration = start_time.elapsed();
719
720 MessageBridgeManager::update_bridge_stats(
722 &bridges, &bridge_id, true, duration,
723 )
724 .await;
725 stats.write().await.total_messages += 1;
726
727 let _ =
728 event_notifier.send(BridgeNotification::MessageProcessed {
729 bridge_id: bridge_id.clone(),
730 message_id: message.id.clone(),
731 duration,
732 });
733 }
734 Err(e) => {
735 let duration = start_time.elapsed();
736
737 error!(
738 "Failed to process message {} in bridge {}: {}",
739 message.id, bridge_id, e
740 );
741
742 MessageBridgeManager::update_bridge_stats(
744 &bridges, &bridge_id, false, duration,
745 )
746 .await;
747 stats.write().await.failed_messages += 1;
748
749 let _ =
750 event_notifier.send(BridgeNotification::MessageFailed {
751 bridge_id: bridge_id.clone(),
752 message_id: message.id.clone(),
753 error: e.to_string(),
754 });
755
756 if config.enable_dlq {
758 warn!("Message sent to dead letter queue: {}", message.id);
760 }
761 }
762 }
763 }
764 }
765 } else {
766 break;
768 }
769 }
770 });
771
772 Ok(())
773 }
774
775 async fn receive_messages(
777 source: &ExternalSystemConfig,
778 config: &BridgeConfig,
779 ) -> Result<Vec<ExternalMessage>> {
780 match &source.system_type {
781 ExternalSystemType::Kafka {
782 brokers,
783 topics,
784 consumer_group,
785 } => Self::receive_kafka_messages(brokers, topics, consumer_group, config).await,
786 ExternalSystemType::RabbitMQ {
787 url,
788 exchange,
789 routing_key,
790 queue,
791 } => Self::receive_rabbitmq_messages(url, exchange, routing_key, queue, config).await,
792 ExternalSystemType::RedisPubSub { url, channels } => {
793 Self::receive_redis_messages(url, channels, config).await
794 }
795 ExternalSystemType::HttpRest {
796 base_url,
797 endpoints,
798 headers,
799 } => Self::receive_http_messages(base_url, endpoints, headers, config).await,
800 ExternalSystemType::FileSystem {
801 directory,
802 pattern,
803 watch_mode,
804 } => Self::receive_file_messages(directory, pattern, *watch_mode, config).await,
805 _ => {
806 warn!("Message receiving not implemented for this system type");
807 Ok(vec![])
808 }
809 }
810 }
811
812 async fn receive_kafka_messages(
814 _brokers: &[String],
815 _topics: &[String],
816 _consumer_group: &Option<String>,
817 _config: &BridgeConfig,
818 ) -> Result<Vec<ExternalMessage>> {
819 Ok(vec![])
822 }
823
824 async fn receive_rabbitmq_messages(
826 _url: &str,
827 _exchange: &str,
828 _routing_key: &str,
829 _queue: &Option<String>,
830 _config: &BridgeConfig,
831 ) -> Result<Vec<ExternalMessage>> {
832 Ok(vec![])
834 }
835
836 async fn receive_redis_messages(
838 _url: &str,
839 _channels: &[String],
840 _config: &BridgeConfig,
841 ) -> Result<Vec<ExternalMessage>> {
842 Ok(vec![])
844 }
845
846 async fn receive_http_messages(
848 _base_url: &str,
849 _endpoints: &HashMap<String, String>,
850 _headers: &HashMap<String, String>,
851 _config: &BridgeConfig,
852 ) -> Result<Vec<ExternalMessage>> {
853 Ok(vec![])
855 }
856
857 async fn receive_file_messages(
859 _directory: &str,
860 _pattern: &str,
861 _watch_mode: bool,
862 _config: &BridgeConfig,
863 ) -> Result<Vec<ExternalMessage>> {
864 Ok(vec![])
866 }
867
868 async fn process_message(
870 bridge: &MessageBridge,
871 message: &ExternalMessage,
872 transformers: &Arc<RwLock<HashMap<String, Box<dyn MessageTransformer + Send + Sync>>>>,
873 router: &Arc<RoutingEngine>,
874 ) -> Result<()> {
875 let action = router
877 .evaluate_rules(&bridge.routing_rules, message)
878 .await?;
879
880 match action {
881 RuleAction::Drop => {
882 debug!("Message dropped by routing rule: {}", message.id);
883 return Ok(());
884 }
885 RuleAction::Forward => {
886 }
888 RuleAction::Transform { transformer } => {
889 let transformed = {
891 let transformers_guard = transformers.read().await;
892 let transformer = transformers_guard
893 .get(&transformer)
894 .ok_or_else(|| anyhow!("Transformer not found: {}", transformer))?;
895 transformer.transform(message)?
896 };
897
898 return Self::send_message(&bridge.target, &transformed).await;
899 }
900 _ => {
901 warn!("Routing action not implemented: {:?}", action);
903 }
904 }
905
906 let transformed = {
908 let transformers_guard = transformers.read().await;
909 let transformer = transformers_guard
910 .get(&bridge.transformer)
911 .ok_or_else(|| anyhow!("Transformer not found: {}", bridge.transformer))?;
912 transformer.transform(message)?
913 };
914
915 Self::send_message(&bridge.target, &transformed).await
917 }
918
919 async fn send_message(target: &ExternalSystemConfig, message: &ExternalMessage) -> Result<()> {
921 match &target.system_type {
922 ExternalSystemType::Kafka {
923 brokers, topics, ..
924 } => Self::send_kafka_message(brokers, topics, message).await,
925 ExternalSystemType::RabbitMQ {
926 url,
927 exchange,
928 routing_key,
929 ..
930 } => Self::send_rabbitmq_message(url, exchange, routing_key, message).await,
931 ExternalSystemType::RedisPubSub { url, channels } => {
932 Self::send_redis_message(url, channels, message).await
933 }
934 ExternalSystemType::HttpRest {
935 base_url,
936 endpoints,
937 headers,
938 } => Self::send_http_message(base_url, endpoints, headers, message).await,
939 ExternalSystemType::FileSystem { directory, .. } => {
940 Self::send_file_message(directory, message).await
941 }
942 _ => {
943 warn!("Message sending not implemented for this system type");
944 Ok(())
945 }
946 }
947 }
948
949 async fn send_kafka_message(
951 _brokers: &[String],
952 _topics: &[String],
953 _message: &ExternalMessage,
954 ) -> Result<()> {
955 Ok(())
957 }
958
959 async fn send_rabbitmq_message(
961 _url: &str,
962 _exchange: &str,
963 _routing_key: &str,
964 _message: &ExternalMessage,
965 ) -> Result<()> {
966 Ok(())
968 }
969
970 async fn send_redis_message(
972 _url: &str,
973 _channels: &[String],
974 _message: &ExternalMessage,
975 ) -> Result<()> {
976 Ok(())
978 }
979
980 async fn send_http_message(
982 _base_url: &str,
983 _endpoints: &HashMap<String, String>,
984 _headers: &HashMap<String, String>,
985 _message: &ExternalMessage,
986 ) -> Result<()> {
987 Ok(())
989 }
990
991 async fn send_file_message(_directory: &str, _message: &ExternalMessage) -> Result<()> {
993 Ok(())
995 }
996
997 async fn update_bridge_stats(
999 bridges: &Arc<RwLock<HashMap<String, MessageBridge>>>,
1000 bridge_id: &str,
1001 success: bool,
1002 duration: Duration,
1003 ) {
1004 let mut bridges_guard = bridges.write().await;
1005 if let Some(bridge) = bridges_guard.get_mut(bridge_id) {
1006 bridge.last_activity = Some(Instant::now());
1007
1008 if success {
1009 bridge.stats.messages_sent += 1;
1010 } else {
1011 bridge.stats.messages_failed += 1;
1012 }
1013
1014 let total_messages = bridge.stats.messages_sent + bridge.stats.messages_failed;
1016 if total_messages > 0 {
1017 let avg_nanos = bridge.stats.avg_processing_time.as_nanos() as u64;
1018 let duration_nanos = duration.as_nanos() as u64;
1019 let new_avg_nanos =
1020 (avg_nanos * (total_messages - 1) + duration_nanos) / total_messages;
1021 bridge.stats.avg_processing_time = Duration::from_nanos(new_avg_nanos);
1022 }
1023 }
1024 }
1025
1026 pub async fn get_bridge_stats(&self, bridge_id: &str) -> Result<BridgeStatistics> {
1028 let bridges = self.bridges.read().await;
1029 let bridge = bridges
1030 .get(bridge_id)
1031 .ok_or_else(|| anyhow!("Bridge not found"))?;
1032
1033 Ok(bridge.stats.clone())
1034 }
1035
1036 pub async fn get_stats(&self) -> BridgeStats {
1038 self.stats.read().await.clone()
1039 }
1040
1041 pub async fn list_bridges(&self) -> Vec<BridgeInfo> {
1043 let bridges = self.bridges.read().await;
1044 bridges
1045 .values()
1046 .map(|b| BridgeInfo {
1047 id: b.id.clone(),
1048 bridge_type: b.bridge_type.clone(),
1049 status: format!("{:?}", b.status),
1050 created_at: b.created_at.elapsed(),
1051 last_activity: b.last_activity.map(|t| t.elapsed()),
1052 messages_processed: b.stats.messages_sent + b.stats.messages_failed,
1053 success_rate: if b.stats.messages_sent + b.stats.messages_failed > 0 {
1054 b.stats.messages_sent as f64
1055 / (b.stats.messages_sent + b.stats.messages_failed) as f64
1056 } else {
1057 0.0
1058 },
1059 })
1060 .collect()
1061 }
1062
1063 pub fn subscribe(&self) -> broadcast::Receiver<BridgeNotification> {
1065 self.event_notifier.subscribe()
1066 }
1067}
1068
1069#[derive(Debug, Clone, Serialize, Deserialize)]
1071pub struct BridgeInfo {
1072 pub id: String,
1073 pub bridge_type: BridgeType,
1074 pub status: String,
1075 pub created_at: Duration,
1076 pub last_activity: Option<Duration>,
1077 pub messages_processed: u64,
1078 pub success_rate: f64,
1079}
1080
1081impl RoutingEngine {
1082 fn new() -> Self {
1084 Self {
1085 _global_rules: Arc::new(RwLock::new(Vec::new())),
1086 _rule_cache: Arc::new(RwLock::new(HashMap::new())),
1087 }
1088 }
1089
1090 async fn evaluate_rules(
1092 &self,
1093 rules: &[RoutingRule],
1094 message: &ExternalMessage,
1095 ) -> Result<RuleAction> {
1096 let mut sorted_rules = rules.to_vec();
1098 sorted_rules.sort_by_key(|r| r.priority);
1099
1100 for rule in sorted_rules.iter().filter(|r| r.enabled) {
1102 if self.evaluate_condition(&rule.condition, message).await? {
1103 return Ok(rule.action.clone());
1104 }
1105 }
1106
1107 Ok(RuleAction::Forward)
1109 }
1110
1111 #[allow(clippy::only_used_in_recursion)]
1113 fn evaluate_condition<'a>(
1114 &'a self,
1115 condition: &'a RuleCondition,
1116 message: &'a ExternalMessage,
1117 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
1118 Box::pin(async move {
1119 match condition {
1120 RuleCondition::Always => Ok(true),
1121 RuleCondition::EventType { types } => {
1122 let unknown = "unknown".to_string();
1123 let event_type = message
1124 .headers
1125 .get("event_type")
1126 .or_else(|| message.metadata.get("event_type"))
1127 .unwrap_or(&unknown);
1128 Ok(types.contains(event_type))
1129 }
1130 RuleCondition::Graph { patterns } => {
1131 let graph = message
1132 .headers
1133 .get("graph")
1134 .or_else(|| message.metadata.get("graph"));
1135 if let Some(g) = graph {
1136 Ok(patterns.iter().any(|p| g.contains(p)))
1137 } else {
1138 Ok(false)
1139 }
1140 }
1141 RuleCondition::SubjectPattern { regex } => {
1142 let subject = message
1143 .headers
1144 .get("subject")
1145 .or_else(|| message.metadata.get("subject"));
1146 if let Some(s) = subject {
1147 let re = regex::Regex::new(regex)
1148 .map_err(|e| anyhow!("Invalid regex: {}", e))?;
1149 Ok(re.is_match(s))
1150 } else {
1151 Ok(false)
1152 }
1153 }
1154 RuleCondition::Predicate { predicates } => {
1155 let predicate = message
1156 .headers
1157 .get("predicate")
1158 .or_else(|| message.metadata.get("predicate"));
1159 if let Some(p) = predicate {
1160 Ok(predicates.contains(p))
1161 } else {
1162 Ok(false)
1163 }
1164 }
1165 RuleCondition::Expression { expr } => {
1166 warn!("Expression evaluation not implemented: {}", expr);
1168 Ok(false)
1169 }
1170 RuleCondition::Composite {
1171 operator,
1172 conditions,
1173 } => match operator {
1174 LogicalOperator::And => {
1175 for cond in conditions {
1176 if !self.evaluate_condition(cond, message).await? {
1177 return Ok(false);
1178 }
1179 }
1180 Ok(true)
1181 }
1182 LogicalOperator::Or => {
1183 for cond in conditions {
1184 if self.evaluate_condition(cond, message).await? {
1185 return Ok(true);
1186 }
1187 }
1188 Ok(false)
1189 }
1190 LogicalOperator::Not => {
1191 if conditions.len() != 1 {
1192 return Err(anyhow!("NOT operator requires exactly one condition"));
1193 }
1194 Ok(!self.evaluate_condition(&conditions[0], message).await?)
1195 }
1196 },
1197 }
1198 })
1199 }
1200}
1201
1202pub struct JsonTransformer;
1204
1205impl MessageTransformer for JsonTransformer {
1206 fn transform(&self, message: &ExternalMessage) -> Result<ExternalMessage> {
1207 Ok(message.clone())
1209 }
1210
1211 fn name(&self) -> &str {
1212 "json"
1213 }
1214
1215 fn supported_formats(&self) -> (MessageFormat, MessageFormat) {
1216 (MessageFormat::Json, MessageFormat::Json)
1217 }
1218}
1219
1220pub struct RdfToJsonTransformer;
1222
1223impl MessageTransformer for RdfToJsonTransformer {
1224 fn transform(&self, message: &ExternalMessage) -> Result<ExternalMessage> {
1225 let mut transformed = message.clone();
1227 transformed.format = MessageFormat::Json;
1228
1229 Ok(transformed)
1233 }
1234
1235 fn name(&self) -> &str {
1236 "rdf-to-json"
1237 }
1238
1239 fn supported_formats(&self) -> (MessageFormat, MessageFormat) {
1240 (MessageFormat::Text, MessageFormat::Json) }
1242}
1243
#[cfg(test)]
mod tests {
    use super::*;

    /// Kafka endpoint fixture with no TLS, no auth and JSON payloads.
    fn kafka_endpoint() -> ExternalSystemConfig {
        let system_type = ExternalSystemType::Kafka {
            brokers: vec!["localhost:9092".to_string()],
            topics: vec!["source-topic".to_string()],
            consumer_group: Some("test-group".to_string()),
        };

        let connection = ConnectionConfig {
            timeout: Duration::from_secs(30),
            keep_alive: Duration::from_secs(60),
            retry: RetryConfig {
                max_attempts: 3,
                initial_delay: Duration::from_millis(100),
                max_delay: Duration::from_secs(10),
                exponential_backoff: true,
            },
            tls: None,
        };

        let format = FormatConfig {
            format: MessageFormat::Json,
            encoding: "utf-8".to_string(),
            compression: None,
            schema_validation: false,
        };

        let security = SecurityConfig {
            auth: AuthenticationMethod::None,
            encryption: EncryptionConfig {
                enabled: false,
                algorithm: None,
                key_id: None,
            },
            access_control: AccessControlConfig {
                read_permissions: vec![],
                write_permissions: vec![],
                admin_permissions: vec![],
            },
        };

        ExternalSystemConfig {
            system_type,
            connection,
            format,
            security,
        }
    }

    /// Creating a bridge with a registered transformer yields a listed bridge.
    #[tokio::test]
    async fn test_bridge_creation() {
        let manager = MessageBridgeManager::new().await.unwrap();

        let source = kafka_endpoint();
        let target = source.clone();

        manager
            .register_transformer(Box::new(JsonTransformer))
            .await;

        let bridge_id = manager
            .create_bridge(
                BridgeType::SourceToTarget,
                source,
                target,
                "json".to_string(),
                vec![],
                BridgeConfig::default(),
            )
            .await
            .unwrap();

        assert!(!bridge_id.is_empty());

        let bridges = manager.list_bridges().await;
        assert_eq!(bridges.len(), 1);
        assert_eq!(bridges[0].id, bridge_id);
    }

    /// An `EventType` rule matches a message carrying that header.
    #[tokio::test]
    async fn test_routing_rules() {
        let engine = RoutingEngine::new();

        let rule = RoutingRule {
            name: "test-rule".to_string(),
            condition: RuleCondition::EventType {
                types: vec!["triple_added".to_string()],
            },
            action: RuleAction::Forward,
            priority: 1,
            enabled: true,
        };

        let headers = HashMap::from([("event_type".to_string(), "triple_added".to_string())]);
        let message = ExternalMessage {
            id: "test".to_string(),
            headers,
            payload: vec![],
            format: MessageFormat::Json,
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: HashMap::new(),
        };

        let action = engine.evaluate_rules(&[rule], &message).await.unwrap();
        assert!(matches!(action, RuleAction::Forward));
    }
}