scirs2_core/advanced_ecosystem_integration/
communication.rs1use super::types::*;
4use crate::error::{CoreError, CoreResult, ErrorContext};
5#[cfg(feature = "serialization")]
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
10#[allow(dead_code)]
12#[derive(Debug)]
13pub struct ModuleCommunicationHub {
14 message_queues: HashMap<String, Vec<InterModuleMessage>>,
16 #[allow(dead_code)]
18 comm_stats: CommunicationStatistics,
19 routing_table: HashMap<String, Vec<String>>,
21}
22
23#[allow(dead_code)]
25#[derive(Debug, Clone)]
26pub struct CommunicationStatistics {
27 pub messages_sent: u64,
29 pub messages_received: u64,
31 pub avg_latency: Duration,
33 pub error_rate: f64,
35}
36
37#[allow(dead_code)]
39#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
40#[derive(Debug, Clone)]
41pub struct OptimizationOpportunity {
42 pub modulename: String,
44 pub opportunity_type: String,
46 pub description: String,
48 pub potential_improvement: f64,
50 pub priority: Priority,
52}
53
54impl Default for ModuleCommunicationHub {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl ModuleCommunicationHub {
61 pub fn new() -> Self {
63 Self {
64 message_queues: HashMap::new(),
65 comm_stats: CommunicationStatistics {
66 messages_sent: 0,
67 messages_received: 0,
68 avg_latency: Duration::default(),
69 error_rate: 0.0,
70 },
71 routing_table: HashMap::new(),
72 }
73 }
74
75 pub fn send_message(&mut self, message: InterModuleMessage) -> CoreResult<()> {
77 if message.from.is_empty() || message.to.is_empty() {
79 return Err(CoreError::InvalidInput(ErrorContext::new(
80 "Message must have valid source and destination",
81 )));
82 }
83
84 if !self.message_queues.contains_key(&message.to) {
86 self.message_queues.insert(message.to.clone(), Vec::new());
87 }
88
89 if let Some(queue) = self.message_queues.get_mut(&message.to) {
91 queue.push(message);
92 }
93
94 self.comm_stats.messages_sent += 1;
96
97 Ok(())
98 }
99
100 pub fn receive_messages(&mut self, module_name: &str) -> Vec<InterModuleMessage> {
102 if let Some(queue) = self.message_queues.get_mut(module_name) {
103 let message_count = queue.len();
104 let messages = std::mem::take(queue);
105 self.comm_stats.messages_received += message_count as u64;
106 messages
107 } else {
108 Vec::new()
109 }
110 }
111
112 pub fn get_pending_count(&self, module_name: &str) -> usize {
114 self.message_queues
115 .get(module_name)
116 .map_or(0, |queue| queue.len())
117 }
118
119 pub fn optimize_routing(&mut self) -> CoreResult<()> {
121 self.message_queues.clear();
123
124 for (source, destinations) in &mut self.routing_table {
126 destinations.sort();
128 println!(" ๐ Optimized routing for module: {source}");
129 }
130
131 Ok(())
132 }
133
134 pub fn enable_compression(&mut self) -> CoreResult<()> {
136 println!(" ๐๏ธ Enabled message compression");
137 Ok(())
138 }
139
140 pub fn add_route(&mut self, source: String, destination: String) {
142 self.routing_table
143 .entry(source)
144 .or_default()
145 .push(destination);
146 }
147
148 pub fn remove_route(&mut self, source: &str, destination: &str) {
150 if let Some(destinations) = self.routing_table.get_mut(source) {
151 destinations.retain(|dest| dest != destination);
152 }
153 }
154
155 pub fn get_routes(&self, source: &str) -> Vec<String> {
157 self.routing_table.get(source).cloned().unwrap_or_default()
158 }
159
160 pub fn broadcast_message(&mut self, message: InterModuleMessage) -> CoreResult<()> {
162 let destinations = self
163 .routing_table
164 .get(&message.from)
165 .cloned()
166 .unwrap_or_default();
167 for destination in destinations {
168 let mut broadcast_message = message.clone();
169 broadcast_message.to = destination;
170 self.send_message(broadcast_message)?;
171 }
172 Ok(())
173 }
174
175 pub fn get_statistics(&self) -> &CommunicationStatistics {
177 &self.comm_stats
178 }
179
180 pub fn reset_statistics(&mut self) {
182 self.comm_stats = CommunicationStatistics {
183 messages_sent: 0,
184 messages_received: 0,
185 avg_latency: Duration::default(),
186 error_rate: 0.0,
187 };
188 }
189
190 pub fn update_latency(&mut self, latency: Duration) {
192 if self.comm_stats.avg_latency.is_zero() {
194 self.comm_stats.avg_latency = latency;
195 } else {
196 let current_nanos = self.comm_stats.avg_latency.as_nanos();
197 let new_nanos = latency.as_nanos();
198 let avg_nanos = (current_nanos + new_nanos) / 2;
199 self.comm_stats.avg_latency = Duration::from_nanos(avg_nanos as u64);
200 }
201 }
202
203 pub fn update_error_rate(&mut self, error_occurred: bool) {
205 let total_messages = self.comm_stats.messages_sent + self.comm_stats.messages_received;
206 if total_messages > 0 {
207 if error_occurred {
208 self.comm_stats.error_rate =
209 (self.comm_stats.error_rate * (total_messages - 1) as f64 + 1.0)
210 / total_messages as f64;
211 } else {
212 self.comm_stats.error_rate = (self.comm_stats.error_rate
213 * (total_messages - 1) as f64)
214 / total_messages as f64;
215 }
216 }
217 }
218
219 pub fn clear_queues(&mut self) {
221 self.message_queues.clear();
222 println!(" ๐งน Cleared all message queues");
223 }
224
225 pub fn clear_module_queue(&mut self, module_name: &str) {
227 if let Some(queue) = self.message_queues.get_mut(module_name) {
228 queue.clear();
229 println!(" ๐งน Cleared message queue for module: {}", module_name);
230 }
231 }
232
233 pub fn get_memory_usage(&self) -> usize {
235 self.message_queues
236 .values()
237 .map(|queue| queue.len() * std::mem::size_of::<InterModuleMessage>())
238 .sum()
239 }
240
241 pub fn cleanup_expired_messages(&mut self, max_age: Duration) {
243 let now = Instant::now();
244 let mut cleaned_count = 0;
245
246 for queue in self.message_queues.values_mut() {
247 let original_len = queue.len();
248 queue.retain(|msg| now.duration_since(msg.timestamp) < max_age);
249 cleaned_count += original_len - queue.len();
250 }
251
252 if cleaned_count > 0 {
253 println!(" ๐งน Cleaned up {} expired messages", cleaned_count);
254 }
255 }
256
257 pub fn get_health_status(&self) -> CommunicationHealth {
259 let total_queue_size: usize = self.message_queues.values().map(|q| q.len()).sum();
260 let memory_usage = self.get_memory_usage();
261
262 if self.comm_stats.error_rate > 0.1 {
263 CommunicationHealth::Critical
264 } else if total_queue_size > 10000 || memory_usage > 100 * 1024 * 1024 {
265 CommunicationHealth::Warning
266 } else if self.comm_stats.error_rate > 0.05 {
267 CommunicationHealth::Degraded
268 } else {
269 CommunicationHealth::Healthy
270 }
271 }
272
273 pub fn create_optimized_pipeline(
275 &self,
276 input: &AdvancedInput,
277 config: &CrossModuleOptimizationConfig,
278 ) -> CoreResult<OptimizedPipeline> {
279 let stages = vec![
280 PipelineStage {
281 name: "preprocessing".to_string(),
282 module: "core".to_string(),
283 config: HashMap::from([("operation".to_string(), "normalize".to_string())]),
284 dependencies: vec![],
285 },
286 PipelineStage {
287 name: "processing".to_string(),
288 module: input.context.operationtype.clone(),
289 config: HashMap::from([("operation".to_string(), "advanced_process".to_string())]),
290 dependencies: vec!["preprocessing".to_string()],
291 },
292 ];
293
294 Ok(OptimizedPipeline {
295 stages,
296 optimization_level: config.optimization_level.clone(),
297 estimated_performance: PerformanceMetrics {
298 throughput: 1000.0,
299 latency: Duration::from_millis(100),
300 cpu_usage: 50.0,
301 memory_usage: 1024 * 1024,
302 gpu_usage: 30.0,
303 },
304 })
305 }
306}
307
308#[derive(Debug, Clone, PartialEq)]
310pub enum CommunicationHealth {
311 Healthy,
312 Warning,
313 Degraded,
314 Critical,
315}
316
317#[derive(Debug, Clone)]
319pub struct DeliveryConfirmation {
320 pub message_id: String,
321 pub delivered: bool,
322 pub delivery_time: Duration,
323 pub error_message: Option<String>,
324}
325
326#[derive(Debug, Clone)]
328pub struct ChannelConfig {
329 pub max_queue_size: usize,
330 pub compression_enabled: bool,
331 pub encryption_enabled: bool,
332 pub priority_enabled: bool,
333}
334
335impl Default for ChannelConfig {
336 fn default() -> Self {
337 Self {
338 max_queue_size: 1000,
339 compression_enabled: false,
340 encryption_enabled: false,
341 priority_enabled: true,
342 }
343 }
344}