scirs2_core/advanced_ecosystem_integration/
communication.rs1use super::types::*;
4use crate::error::{CoreError, CoreResult, ErrorContext};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
/// In-process hub that queues and routes messages exchanged between modules.
///
/// Messages wait in a per-destination queue until the destination drains
/// them. The routing table lists, for each source module, the destinations
/// that a broadcast from that source is delivered to.
#[allow(dead_code)]
#[derive(Debug)]
pub struct ModuleCommunicationHub {
    /// Pending messages, keyed by the destination module's name.
    message_queues: HashMap<String, Vec<InterModuleMessage>>,
    /// Running traffic counters and aggregates for this hub.
    #[allow(dead_code)]
    comm_stats: CommunicationStatistics,
    /// Broadcast destinations, keyed by the source module's name.
    routing_table: HashMap<String, Vec<String>>,
}
21
/// Aggregate traffic statistics maintained by [`ModuleCommunicationHub`].
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CommunicationStatistics {
    /// Number of messages accepted by `send_message` (broadcast copies included).
    pub messages_sent: u64,
    /// Number of messages handed out by `receive_messages`.
    pub messages_received: u64,
    /// Smoothed latency maintained by `update_latency`; zero until the first sample.
    pub avg_latency: Duration,
    /// Running error fraction maintained by `update_error_rate`; the health
    /// check compares it against the 0.05 and 0.1 thresholds.
    pub error_rate: f64,
}
35
/// A serializable record describing one candidate optimization for a module.
///
/// NOTE(review): nothing in this file constructs or consumes this type, so the
/// field descriptions below are inferred from the names — confirm against the
/// code that produces these records.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationOpportunity {
    /// Name of the module the opportunity applies to.
    pub modulename: String,
    /// Free-form category of the opportunity (presumably e.g. memory or compute).
    pub opportunity_type: String,
    /// Human-readable explanation of the opportunity.
    pub description: String,
    /// Expected gain; the unit/scale is not established here — TODO confirm.
    pub potential_improvement: f64,
    /// Relative priority, using the shared `Priority` type.
    pub priority: Priority,
}
51
52impl Default for ModuleCommunicationHub {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl ModuleCommunicationHub {
59 pub fn new() -> Self {
61 Self {
62 message_queues: HashMap::new(),
63 comm_stats: CommunicationStatistics {
64 messages_sent: 0,
65 messages_received: 0,
66 avg_latency: Duration::default(),
67 error_rate: 0.0,
68 },
69 routing_table: HashMap::new(),
70 }
71 }
72
73 pub fn send_message(&mut self, message: InterModuleMessage) -> CoreResult<()> {
75 if message.from.is_empty() || message.to.is_empty() {
77 return Err(CoreError::InvalidInput(ErrorContext::new(
78 "Message must have valid source and destination",
79 )));
80 }
81
82 if !self.message_queues.contains_key(&message.to) {
84 self.message_queues.insert(message.to.clone(), Vec::new());
85 }
86
87 if let Some(queue) = self.message_queues.get_mut(&message.to) {
89 queue.push(message);
90 }
91
92 self.comm_stats.messages_sent += 1;
94
95 Ok(())
96 }
97
98 pub fn receive_messages(&mut self, module_name: &str) -> Vec<InterModuleMessage> {
100 if let Some(queue) = self.message_queues.get_mut(module_name) {
101 let message_count = queue.len();
102 let messages = std::mem::take(queue);
103 self.comm_stats.messages_received += message_count as u64;
104 messages
105 } else {
106 Vec::new()
107 }
108 }
109
110 pub fn get_pending_count(&self, module_name: &str) -> usize {
112 self.message_queues
113 .get(module_name)
114 .map_or(0, |queue| queue.len())
115 }
116
117 pub fn optimize_routing(&mut self) -> CoreResult<()> {
119 self.message_queues.clear();
121
122 for (source, destinations) in &mut self.routing_table {
124 destinations.sort();
126 println!(" ๐ Optimized routing for module: {source}");
127 }
128
129 Ok(())
130 }
131
132 pub fn enable_compression(&mut self) -> CoreResult<()> {
134 println!(" ๐๏ธ Enabled message compression");
135 Ok(())
136 }
137
138 pub fn add_route(&mut self, source: String, destination: String) {
140 self.routing_table
141 .entry(source)
142 .or_default()
143 .push(destination);
144 }
145
146 pub fn remove_route(&mut self, source: &str, destination: &str) {
148 if let Some(destinations) = self.routing_table.get_mut(source) {
149 destinations.retain(|dest| dest != destination);
150 }
151 }
152
153 pub fn get_routes(&self, source: &str) -> Vec<String> {
155 self.routing_table.get(source).cloned().unwrap_or_default()
156 }
157
158 pub fn broadcast_message(&mut self, message: InterModuleMessage) -> CoreResult<()> {
160 let destinations = self
161 .routing_table
162 .get(&message.from)
163 .cloned()
164 .unwrap_or_default();
165 for destination in destinations {
166 let mut broadcast_message = message.clone();
167 broadcast_message.to = destination;
168 self.send_message(broadcast_message)?;
169 }
170 Ok(())
171 }
172
173 pub fn get_statistics(&self) -> &CommunicationStatistics {
175 &self.comm_stats
176 }
177
178 pub fn reset_statistics(&mut self) {
180 self.comm_stats = CommunicationStatistics {
181 messages_sent: 0,
182 messages_received: 0,
183 avg_latency: Duration::default(),
184 error_rate: 0.0,
185 };
186 }
187
188 pub fn update_latency(&mut self, latency: Duration) {
190 if self.comm_stats.avg_latency.is_zero() {
192 self.comm_stats.avg_latency = latency;
193 } else {
194 let current_nanos = self.comm_stats.avg_latency.as_nanos();
195 let new_nanos = latency.as_nanos();
196 let avg_nanos = (current_nanos + new_nanos) / 2;
197 self.comm_stats.avg_latency = Duration::from_nanos(avg_nanos as u64);
198 }
199 }
200
201 pub fn update_error_rate(&mut self, error_occurred: bool) {
203 let total_messages = self.comm_stats.messages_sent + self.comm_stats.messages_received;
204 if total_messages > 0 {
205 if error_occurred {
206 self.comm_stats.error_rate =
207 (self.comm_stats.error_rate * (total_messages - 1) as f64 + 1.0)
208 / total_messages as f64;
209 } else {
210 self.comm_stats.error_rate = (self.comm_stats.error_rate
211 * (total_messages - 1) as f64)
212 / total_messages as f64;
213 }
214 }
215 }
216
217 pub fn clear_queues(&mut self) {
219 self.message_queues.clear();
220 println!(" ๐งน Cleared all message queues");
221 }
222
223 pub fn clear_module_queue(&mut self, module_name: &str) {
225 if let Some(queue) = self.message_queues.get_mut(module_name) {
226 queue.clear();
227 println!(" ๐งน Cleared message queue for module: {}", module_name);
228 }
229 }
230
231 pub fn get_memory_usage(&self) -> usize {
233 self.message_queues
234 .values()
235 .map(|queue| queue.len() * std::mem::size_of::<InterModuleMessage>())
236 .sum()
237 }
238
239 pub fn cleanup_expired_messages(&mut self, max_age: Duration) {
241 let now = Instant::now();
242 let mut cleaned_count = 0;
243
244 for queue in self.message_queues.values_mut() {
245 let original_len = queue.len();
246 queue.retain(|msg| now.duration_since(msg.timestamp) < max_age);
247 cleaned_count += original_len - queue.len();
248 }
249
250 if cleaned_count > 0 {
251 println!(" ๐งน Cleaned up {} expired messages", cleaned_count);
252 }
253 }
254
255 pub fn get_health_status(&self) -> CommunicationHealth {
257 let total_queue_size: usize = self.message_queues.values().map(|q| q.len()).sum();
258 let memory_usage = self.get_memory_usage();
259
260 if self.comm_stats.error_rate > 0.1 {
261 CommunicationHealth::Critical
262 } else if total_queue_size > 10000 || memory_usage > 100 * 1024 * 1024 {
263 CommunicationHealth::Warning
264 } else if self.comm_stats.error_rate > 0.05 {
265 CommunicationHealth::Degraded
266 } else {
267 CommunicationHealth::Healthy
268 }
269 }
270
271 pub fn create_optimized_pipeline(
273 &self,
274 input: &AdvancedInput,
275 config: &CrossModuleOptimizationConfig,
276 ) -> CoreResult<OptimizedPipeline> {
277 let stages = vec![
278 PipelineStage {
279 name: "preprocessing".to_string(),
280 module: "core".to_string(),
281 config: HashMap::from([("operation".to_string(), "normalize".to_string())]),
282 dependencies: vec![],
283 },
284 PipelineStage {
285 name: "processing".to_string(),
286 module: input.context.operationtype.clone(),
287 config: HashMap::from([("operation".to_string(), "advanced_process".to_string())]),
288 dependencies: vec!["preprocessing".to_string()],
289 },
290 ];
291
292 Ok(OptimizedPipeline {
293 stages,
294 optimization_level: config.optimization_level.clone(),
295 estimated_performance: PerformanceMetrics {
296 throughput: 1000.0,
297 latency: Duration::from_millis(100),
298 cpu_usage: 50.0,
299 memory_usage: 1024 * 1024,
300 gpu_usage: 30.0,
301 },
302 })
303 }
304}
305
/// Health classification reported by `ModuleCommunicationHub::get_health_status`.
#[derive(Debug, Clone, PartialEq)]
pub enum CommunicationHealth {
    /// No threshold is exceeded.
    Healthy,
    /// Queue backlog is large: more than 10 000 queued messages or more than
    /// 100 MiB of estimated queue memory (checked before `Degraded`).
    Warning,
    /// Error rate is above 0.05 but not above 0.1.
    Degraded,
    /// Error rate is above 0.1.
    Critical,
}
314
/// Outcome record for the delivery of a single message.
///
/// NOTE(review): nothing in this file creates or reads this type; the field
/// descriptions are inferred from names — confirm against the producer.
#[derive(Debug, Clone)]
pub struct DeliveryConfirmation {
    /// Identifier of the message this confirmation refers to.
    pub message_id: String,
    /// Whether the message was delivered.
    pub delivered: bool,
    /// Time the delivery took (presumably send-to-receive — TODO confirm).
    pub delivery_time: Duration,
    /// Failure description, presumably set only when `delivered` is false.
    pub error_message: Option<String>,
}
323
/// Tunable settings for a communication channel.
///
/// NOTE(review): `ModuleCommunicationHub` in this file does not read these
/// settings, so where they take effect must be confirmed elsewhere.
#[derive(Debug, Clone)]
pub struct ChannelConfig {
    /// Upper bound on queued messages (default 1000).
    pub max_queue_size: usize,
    /// Whether message compression is requested (default off).
    pub compression_enabled: bool,
    /// Whether message encryption is requested (default off).
    pub encryption_enabled: bool,
    /// Whether priority handling is requested (default on).
    pub priority_enabled: bool,
}
332
333impl Default for ChannelConfig {
334 fn default() -> Self {
335 Self {
336 max_queue_size: 1000,
337 compression_enabled: false,
338 encryption_enabled: false,
339 priority_enabled: true,
340 }
341 }
342}