scirs2_core/logging.rs

//! # Logging and Diagnostics
//!
//! This module provides structured logging and diagnostics utilities for scientific computing.
//!
//! ## Features
//!
//! * Structured logging for scientific computing
//! * Enhanced progress tracking with multiple visualization styles
//! * Performance metrics collection
//! * Log filtering and formatting
//! * Multi-progress tracking for parallel operations
//! * Adaptive update rates and predictive ETA calculations
//!
//! ## Usage
//!
//! ```rust,no_run
//! use scirs2_core::logging::Logger;
//! use scirs2_core::logging::progress::{ProgressBuilder, ProgressStyle};
//!
//! // Create a logger
//! let logger = Logger::new("matrix_operations");
//!
//! // Log messages at different levels
//! logger.info("Starting matrix multiplication");
//! logger.debug("Using algorithm: Standard");
//!
//! // Create an enhanced progress tracker
//! let mut progress = ProgressBuilder::new("Matrix multiplication", 1000)
//!     .style(ProgressStyle::DetailedBar)
//!     .show_statistics(true)
//!     .build();
//!
//! progress.start();
//!
//! for i in 0..1000 {
//!     // Perform computation
//!
//!     // Update progress
//!     progress.update(i + 1);
//!
//!     // Log intermediate results at low frequency to avoid flooding logs
//!     if i % 100 == 0 {
//!         logger.debug(&format!("Completed {}/1000 iterations", i + 1));
//!     }
//! }
//!
//! // Complete the progress tracking
//! progress.finish();
//!
//! logger.info("Matrix multiplication completed");
//! ```

use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Enhanced progress tracking module
pub mod progress;

/// Smart rate limiting for high-frequency log events
pub mod rate_limiting;

/// Log level enumeration
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    /// Trace level for detailed debugging
    Trace = 0,
    /// Debug level for debugging information
    Debug = 1,
    /// Info level for general information
    Info = 2,
    /// Warning level for potential issues
    Warn = 3,
    /// Error level for error conditions
    Error = 4,
    /// Critical level for critical errors
    Critical = 5,
}

impl LogLevel {
    /// Convert a log level to a string
    pub const fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Trace => "TRACE",
            LogLevel::Debug => "DEBUG",
            LogLevel::Info => "INFO",
            LogLevel::Warn => "WARN",
            LogLevel::Error => "ERROR",
            LogLevel::Critical => "CRITICAL",
        }
    }
}

/// Structured log entry
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Timestamp of the log entry
    pub timestamp: std::time::SystemTime,
    /// Log level
    pub level: LogLevel,
    /// Module or component name
    pub module: String,
    /// Log message
    pub message: String,
    /// Additional context fields
    pub fields: HashMap<String, String>,
}

/// Logger configuration
#[derive(Debug, Clone)]
pub struct LoggerConfig {
    /// Minimum log level to record
    pub min_level: LogLevel,
    /// Enable/disable timestamps
    pub show_timestamps: bool,
    /// Enable/disable module names
    pub show_modules: bool,
    /// Module-specific log levels
    pub module_levels: HashMap<String, LogLevel>,
}

impl Default for LoggerConfig {
    fn default() -> Self {
        Self {
            min_level: LogLevel::Info,
            show_timestamps: true,
            show_modules: true,
            module_levels: HashMap::new(),
        }
    }
}

/// Global logger configuration
static LOGGER_CONFIG: Lazy<Mutex<LoggerConfig>> = Lazy::new(|| Mutex::new(LoggerConfig::default()));

/// Configure the global logger
#[allow(dead_code)]
pub fn configurelogger(config: LoggerConfig) {
    let mut global_config = LOGGER_CONFIG.lock().expect("LOGGER_CONFIG mutex poisoned");
    *global_config = config;
}

/// Set the global minimum log level
#[allow(dead_code)]
pub fn set_level(level: LogLevel) {
    let mut config = LOGGER_CONFIG.lock().expect("LOGGER_CONFIG mutex poisoned");
    config.min_level = level;
}

/// Set a module-specific log level
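///
/// # Example
///
/// A minimal sketch of per-module verbosity control; the module name
/// `"solver"` is illustrative:
///
/// ```rust,no_run
/// use scirs2_core::logging::{set_level, set_module_level, LogLevel};
///
/// // Default: record Info and above everywhere...
/// set_level(LogLevel::Info);
/// // ...but keep full debug output for one noisy module.
/// set_module_level("solver", LogLevel::Debug);
/// ```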
#[allow(dead_code)]
pub fn set_module_level(module: &str, level: LogLevel) {
    let mut config = LOGGER_CONFIG.lock().expect("LOGGER_CONFIG mutex poisoned");
    config.module_levels.insert(module.to_string(), level);
}

/// Handler trait for processing log entries
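///
/// Handlers receive every entry that passes the level filter, so they can
/// forward logs to consoles, files, or metrics sinks. A minimal sketch of a
/// custom handler that just counts entries (the counter is illustrative):
///
/// ```rust,no_run
/// use std::sync::Arc;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use scirs2_core::logging::{set_handler, LogEntry, LogHandler};
///
/// struct CountingHandler(AtomicUsize);
///
/// impl LogHandler for CountingHandler {
///     fn handle(&self, _entry: &LogEntry) {
///         self.0.fetch_add(1, Ordering::Relaxed);
///     }
/// }
///
/// set_handler(Arc::new(CountingHandler(AtomicUsize::new(0))));
/// ```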
pub trait LogHandler: Send + Sync {
    /// Handle a log entry
    fn handle(&self, entry: &LogEntry);
}

/// Console log handler
pub struct ConsoleLogHandler {
    /// Format string for log entries
    pub format: String,
}

impl Default for ConsoleLogHandler {
    fn default() -> Self {
        Self {
            format: "[{level}] {module}: {message}".to_string(),
        }
    }
}

impl LogHandler for ConsoleLogHandler {
    fn handle(&self, entry: &LogEntry) {
        let mut output = self.format.clone();

        // Replace placeholders in the format string
        output = output.replace("{level}", entry.level.as_str());
        output = output.replace("{module}", &entry.module);
        output = output.replace("{message}", &entry.message);

        if self.format.contains("{timestamp}") {
            let datetime = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
            output = output.replace(
                "{timestamp}",
                &datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
            );
        }

        if self.format.contains("{fields}") {
            let fields_str = entry
                .fields
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>()
                .join(", ");
            output = output.replace("{fields}", &fields_str);
        }

        // Print to the appropriate output stream based on level
        match entry.level {
            LogLevel::Error | LogLevel::Critical => eprintln!("{output}"),
            _ => println!("{output}"),
        }
    }
}

/// File log handler
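///
/// A minimal sketch of adding file output alongside the default console
/// handler; the file name is illustrative:
///
/// ```rust,no_run
/// use std::sync::Arc;
/// use scirs2_core::logging::{set_handler, FileLogHandler};
///
/// set_handler(Arc::new(FileLogHandler {
///     file_path: "run.log".to_string(),
///     format: "{timestamp} [{level}] {module}: {message}".to_string(),
/// }));
/// ```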
pub struct FileLogHandler {
    /// Path to the log file
    pub file_path: String,
    /// Format string for log entries
    pub format: String,
}

impl LogHandler for FileLogHandler {
    fn handle(&self, entry: &LogEntry) {
        // This is a simplified implementation
        // A real implementation would handle file I/O more efficiently

        let mut output = self.format.clone();

        // Replace placeholders in the format string
        output = output.replace("{level}", entry.level.as_str());
        output = output.replace("{module}", &entry.module);
        output = output.replace("{message}", &entry.message);

        if self.format.contains("{timestamp}") {
            let datetime = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
            output = output.replace(
                "{timestamp}",
                &datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
            );
        }

        if self.format.contains("{fields}") {
            let fields_str = entry
                .fields
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>()
                .join(", ");
            output = output.replace("{fields}", &fields_str);
        }

        // Append to the log file
        // This would use proper error handling and buffering in a real implementation
        if let Ok(mut file) = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.file_path)
        {
            use std::io::Write;
            let _ = writeln!(file, "{output}");
        }
    }
}

/// Global log handlers
static LOG_HANDLERS: Lazy<Mutex<Vec<Arc<dyn LogHandler>>>> = Lazy::new(|| {
    let console_handler = Arc::new(ConsoleLogHandler::default());
    Mutex::new(vec![console_handler])
});

/// Register an additional log handler (appended to the existing handlers)
#[allow(dead_code)]
pub fn set_handler(handler: Arc<dyn LogHandler>) {
    let mut handlers = LOG_HANDLERS.lock().expect("LOG_HANDLERS mutex poisoned");
    handlers.push(handler);
}

/// Clear all log handlers
#[allow(dead_code)]
pub fn clearlog_handlers() {
    let mut handlers = LOG_HANDLERS.lock().expect("LOG_HANDLERS mutex poisoned");
    handlers.clear();
}

/// Reset log handlers to the default configuration
#[allow(dead_code)]
pub fn resetlog_handlers() {
    let mut handlers = LOG_HANDLERS.lock().expect("LOG_HANDLERS mutex poisoned");
    handlers.clear();
    handlers.push(Arc::new(ConsoleLogHandler::default()));
}

/// Logger for a specific module
#[derive(Clone)]
pub struct Logger {
    /// Module name
    module: String,
    /// Additional context fields
    fields: HashMap<String, String>,
}

impl Logger {
    /// Create a new logger for the specified module
    pub fn new(module: &str) -> Self {
        Self {
            module: module.to_string(),
            fields: HashMap::new(),
        }
    }

    /// Add a context field to the logger
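    ///
    /// Fields are attached once and emitted with every subsequent entry. A
    /// minimal sketch (field names and values are illustrative):
    ///
    /// ```rust,no_run
    /// use scirs2_core::logging::Logger;
    ///
    /// let logger = Logger::new("solver")
    ///     .with_field("method", "conjugate_gradient")
    ///     .with_field("tolerance", 1e-8);
    /// logger.info("solver initialized");
    /// ```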
    pub fn with_field<K, V>(mut self, key: K, value: V) -> Self
    where
        K: Into<String>,
        V: Display,
    {
        self.fields.insert(key.into(), format!("{value}"));
        self
    }

    /// Add multiple context fields to the logger
    pub fn with_fields<K, V, I>(mut self, fields: I) -> Self
    where
        K: Into<String>,
        V: Display,
        I: IntoIterator<Item = (K, V)>,
    {
        for (key, value) in fields {
            self.fields.insert(key.into(), format!("{value}"));
        }
        self
    }

    /// Log a message at a specific level
    pub fn writelog(&self, level: LogLevel, message: &str) {
        // Check if this log should be processed based on configuration
        let config = LOGGER_CONFIG.lock().expect("LOGGER_CONFIG mutex poisoned");
        let module_level = config
            .module_levels
            .get(&self.module)
            .copied()
            .unwrap_or(config.min_level);

        // Release the config lock early so handlers can use the logging API
        // without risk of deadlock.
        drop(config);

        if level < module_level {
            return;
        }

        // Create the log entry
        let entry = LogEntry {
            timestamp: std::time::SystemTime::now(),
            level,
            module: self.module.clone(),
            message: message.to_string(),
            fields: self.fields.clone(),
        };

        // Process the log entry with all registered handlers
        let handlers = LOG_HANDLERS.lock().expect("LOG_HANDLERS mutex poisoned");
        for handler in handlers.iter() {
            handler.handle(&entry);
        }
    }

    /// Log a message at trace level
    pub fn trace(&self, message: &str) {
        self.writelog(LogLevel::Trace, message);
    }

    /// Log a message at debug level
    pub fn debug(&self, message: &str) {
        self.writelog(LogLevel::Debug, message);
    }

    /// Log a message at info level
    pub fn info(&self, message: &str) {
        self.writelog(LogLevel::Info, message);
    }

    /// Log a message at warning level
    pub fn warn(&self, message: &str) {
        self.writelog(LogLevel::Warn, message);
    }

    /// Log a message at error level
    pub fn error(&self, message: &str) {
        self.writelog(LogLevel::Error, message);
    }

    /// Log a message at critical level
    pub fn critical(&self, message: &str) {
        self.writelog(LogLevel::Critical, message);
    }

    /// Create an enhanced progress tracker using the logger's context
    pub fn track_progress(
        &self,
        description: &str,
        total: u64,
    ) -> progress::EnhancedProgressTracker {
        use progress::{ProgressBuilder, ProgressStyle};

        let builder = ProgressBuilder::new(description, total)
            .style(ProgressStyle::DetailedBar)
            .show_statistics(true);

        let mut tracker = builder.build();

        // Log the start of progress tracking
        self.info(&format!("Starting progress tracking: {description}"));

        tracker.start();
        tracker
    }

    /// Log a message with a progress update
    pub fn info_with_progress(
        &self,
        message: &str,
        progress: &mut progress::EnhancedProgressTracker,
        update: u64,
    ) {
        self.info(message);
        progress.update(update);
    }

    /// Execute an operation with progress tracking
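    ///
    /// The closure receives the tracker, and completion is logged with the
    /// elapsed time. A minimal sketch:
    ///
    /// ```rust,no_run
    /// use scirs2_core::logging::Logger;
    ///
    /// let logger = Logger::new("decomposition");
    /// let sum = logger.with_progress("accumulate", 100, |progress| {
    ///     let mut acc = 0u64;
    ///     for i in 0..100u64 {
    ///         acc += i;
    ///         progress.update(i + 1);
    ///     }
    ///     acc
    /// });
    /// assert_eq!(sum, 4950);
    /// ```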
    pub fn with_progress<F, R>(&self, description: &str, total: u64, operation: F) -> R
    where
        F: FnOnce(&mut progress::EnhancedProgressTracker) -> R,
    {
        let mut progress = self.track_progress(description, total);
        let result = operation(&mut progress);
        progress.finish();

        // Log completion
        let stats = progress.stats();
        self.info(&format!(
            "Completed progress tracking: {description} - {elapsed:.1}s elapsed",
            elapsed = stats.elapsed.as_secs_f64()
        ));

        result
    }
}

/// Progress tracker for long-running operations
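///
/// This is the simpler, log-based tracker; see the `progress` module for the
/// enhanced visual tracker. A minimal sketch:
///
/// ```rust,no_run
/// use scirs2_core::logging::ProgressTracker;
///
/// let mut tracker = ProgressTracker::new("data import", 10_000);
/// for i in 0..10_000 {
///     // ... process item i ...
///     tracker.update(i + 1);
/// }
/// tracker.complete();
/// ```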
pub struct ProgressTracker {
    /// Operation name
    name: String,
    /// Total number of steps
    total: usize,
    /// Current progress
    current: usize,
    /// Start time
    start_time: Instant,
    /// Last update time
    last_update: Instant,
    /// Minimum time between progress updates
    update_interval: Duration,
    /// Associated logger
    logger: Logger,
}

impl ProgressTracker {
    /// Create a new progress tracker
    pub fn new(name: &str, total: usize) -> Self {
        let now = Instant::now();
        let logger = Logger::new("progress").with_field("operation", name);

        logger.info(&format!("Starting operation: {name}"));

        Self {
            name: name.to_string(),
            total,
            current: 0,
            start_time: now,
            last_update: now,
            update_interval: Duration::from_millis(500), // Update at most every 500ms
            logger,
        }
    }

    /// Set the minimum interval between progress updates
    pub fn set_update_interval(&mut self, interval: Duration) {
        self.update_interval = interval;
    }

    /// Update the current progress
    pub fn update(&mut self, current: usize) {
        self.current = current;

        let now = Instant::now();

        // Only log an update if enough time has passed since the last update
        if now.duration_since(self.last_update) >= self.update_interval {
            self.last_update = now;

            let elapsed = now.duration_since(self.start_time);
            let percent = (self.current as f64 / self.total as f64) * 100.0;

            let eta = if self.current > 0 {
                let time_per_item = elapsed.as_secs_f64() / self.current as f64;
                let remaining = time_per_item * (self.total - self.current) as f64;
                format!("ETA: {remaining:.1}s")
            } else {
                "ETA: calculating...".to_string()
            };

            self.logger.debug(&format!(
                "{name}: {current}/{total} ({percent:.1}%) - Elapsed: {elapsed:.1}s - {eta}",
                name = self.name,
                current = self.current,
                total = self.total,
                elapsed = elapsed.as_secs_f64()
            ));
        }
    }

    /// Mark the operation as complete
    pub fn complete(&mut self) {
        let elapsed = self.start_time.elapsed();
        self.current = self.total;

        self.logger.info(&format!(
            "{name} completed: {total}/{total} (100%) - Total time: {elapsed:.1}s",
            name = self.name,
            total = self.total,
            elapsed = elapsed.as_secs_f64()
        ));
    }

    /// Get the current progress as a percentage
    pub fn progress_percent(&self) -> f64 {
        (self.current as f64 / self.total as f64) * 100.0
    }

    /// Get the elapsed time
    pub fn elapsed(&self) -> Duration {
        self.start_time.elapsed()
    }

    /// Estimate the remaining time
    pub fn eta(&self) -> Option<Duration> {
        if self.current == 0 {
            return None;
        }

        let elapsed = self.start_time.elapsed();
        let time_per_item = elapsed.as_secs_f64() / self.current as f64;
        let remaining_secs = time_per_item * (self.total - self.current) as f64;

        Some(Duration::from_secs_f64(remaining_secs))
    }
}

/// Initialize the default logging system
#[allow(dead_code)]
pub fn init() {
    // Register the default console handler if not already done
    let handlers = LOG_HANDLERS.lock().expect("LOG_HANDLERS mutex poisoned");
    if handlers.is_empty() {
        drop(handlers);
        resetlog_handlers();
    }
}

/// Get a logger for the current module
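///
/// A minimal sketch; `getlogger!()` names the logger after the current
/// module path, while `getlogger!("name")` names it explicitly:
///
/// ```rust,no_run
/// use scirs2_core::getlogger;
///
/// let logger = getlogger!();
/// logger.info("module-path logger ready");
///
/// let named = getlogger!("simulation");
/// named.debug("explicitly named logger ready");
/// ```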
#[macro_export]
macro_rules! getlogger {
    () => {
        $crate::logging::Logger::new(module_path!())
    };
    ($name:expr) => {
        $crate::logging::Logger::new($name)
    };
}

// # Distributed Logging and Adaptive Rate Limiting (Beta 2)
//
// This section provides advanced distributed logging capabilities with
// aggregation, adaptive rate limiting, and multi-node coordination.

/// Distributed logging capabilities for multi-node computations
pub mod distributed {
    use super::*;
    use std::collections::{HashMap, VecDeque};
    use std::fmt;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex, RwLock};
    use std::thread;
    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

    /// Node identifier for distributed logging
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct NodeId {
        name: String,
        instance_id: String,
    }

    impl NodeId {
        /// Create a new node identifier
        pub fn new(name: String, instanceid: String) -> Self {
            Self {
                name,
                instance_id: instanceid,
            }
        }

        /// Create from hostname and process ID
        pub fn from_hostname() -> Self {
            let hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
            let pid = std::process::id();
            Self::new(hostname, pid.to_string())
        }

        /// Get node name
        pub fn name(&self) -> &str {
            &self.name
        }

        /// Get instance ID
        pub fn instance_id(&self) -> &str {
            &self.instance_id
        }
    }

    impl fmt::Display for NodeId {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{}:{}", self.name, self.instance_id)
        }
    }

    /// Distributed log entry with metadata
    #[derive(Debug, Clone)]
    pub struct DistributedLogEntry {
        /// Unique entry ID
        pub id: u64,
        /// Source node
        #[allow(dead_code)]
        pub nodeid: NodeId,
        /// Timestamp (Unix epoch milliseconds)
        pub timestamp: u64,
        /// Log level
        pub level: LogLevel,
        /// Logger name
        pub logger: String,
        /// Message content
        pub message: String,
        /// Additional context fields
        pub context: HashMap<String, String>,
        /// Sequence number for ordering
        pub sequence: u64,
    }

    impl DistributedLogEntry {
        /// Create a new distributed log entry
        pub fn new(
            nodeid: NodeId,
            level: LogLevel,
            logger: String,
            message: String,
            context: HashMap<String, String>,
        ) -> Self {
            static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
            static SEQ_COUNTER: AtomicU64 = AtomicU64::new(1);

            Self {
                id: ID_COUNTER.fetch_add(1, Ordering::Relaxed),
                nodeid,
                timestamp: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system time before Unix epoch")
                    .as_millis() as u64,
                level,
                logger,
                message,
                context,
                sequence: SEQ_COUNTER.fetch_add(1, Ordering::Relaxed),
            }
        }

        /// Get the age of this log entry
        pub fn age(&self) -> Duration {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system time before Unix epoch")
                .as_millis() as u64;
            Duration::from_millis(now.saturating_sub(self.timestamp))
        }
    }

    /// Log aggregator that collects and processes distributed log entries
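    ///
    /// A minimal sketch of feeding an aggregator directly (logger name and
    /// message are illustrative):
    ///
    /// ```rust,no_run
    /// use std::collections::HashMap;
    /// use std::time::Duration;
    /// use scirs2_core::logging::LogLevel;
    /// use scirs2_core::logging::distributed::{DistributedLogEntry, LogAggregator, NodeId};
    ///
    /// let node = NodeId::from_hostname();
    /// let aggregator = LogAggregator::new(node.clone(), 10_000, Duration::from_secs(60));
    /// aggregator.add_entry(DistributedLogEntry::new(
    ///     node,
    ///     LogLevel::Info,
    ///     "ingest".to_string(),
    ///     "batch complete".to_string(),
    ///     HashMap::new(),
    /// ));
    /// assert_eq!(aggregator.stats().total_entries, 1);
    /// ```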
    #[allow(dead_code)]
    pub struct LogAggregator {
        #[allow(dead_code)]
        nodeid: NodeId,
        entries: Arc<RwLock<VecDeque<DistributedLogEntry>>>,
        max_entries: usize,
        aggregation_window: Duration,
        stats: Arc<RwLock<AggregationStats>>,
    }

    /// Statistics for log aggregation
    #[derive(Debug, Clone, Default)]
    pub struct AggregationStats {
        pub total_entries: u64,
        pub entries_by_level: HashMap<LogLevel, u64>,
        pub entries_by_node: HashMap<NodeId, u64>,
        pub dropped_entries: u64,
        pub aggregation_windows: u64,
    }

    impl LogAggregator {
        /// Create a new log aggregator
        pub fn new(nodeid: NodeId, max_entries: usize, aggregationwindow: Duration) -> Self {
            Self {
                nodeid,
                entries: Arc::new(RwLock::new(VecDeque::new())),
                max_entries,
                aggregation_window: aggregationwindow,
                stats: Arc::new(RwLock::new(AggregationStats::default())),
            }
        }

        /// Add a log entry to the aggregator
        pub fn add_entry(&self, entry: DistributedLogEntry) {
            let mut entries = self.entries.write().expect("entries lock poisoned");
            let mut stats = self.stats.write().expect("stats lock poisoned");

            // Remove old entries beyond the window
            let cutoff = entry
                .timestamp
                .saturating_sub(self.aggregation_window.as_millis() as u64);
            while let Some(front) = entries.front() {
                if front.timestamp >= cutoff {
                    break;
                }
                let removed = entries.pop_front().expect("front checked above");
                // Update stats for the expired entry
                if let Some(count) = stats.entries_by_level.get_mut(&removed.level) {
                    *count = count.saturating_sub(1);
                }
                if let Some(count) = stats.entries_by_node.get_mut(&removed.nodeid) {
                    *count = count.saturating_sub(1);
                }
            }

            // Enforce the capacity limit before adding the new entry
            if entries.len() >= self.max_entries {
                if let Some(removed) = entries.pop_front() {
                    stats.dropped_entries += 1;
                    // Update stats for the dropped entry
                    if let Some(count) = stats.entries_by_level.get_mut(&removed.level) {
                        *count = count.saturating_sub(1);
                    }
                    if let Some(count) = stats.entries_by_node.get_mut(&removed.nodeid) {
                        *count = count.saturating_sub(1);
                    }
                }
            }

            // Update stats for the new entry
            stats.total_entries += 1;
            *stats.entries_by_level.entry(entry.level).or_insert(0) += 1;
            *stats
                .entries_by_node
                .entry(entry.nodeid.clone())
                .or_insert(0) += 1;

            entries.push_back(entry);
        }

        /// Get all entries within the aggregation window
        pub fn get_entries(&self) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("entries lock poisoned")
                .iter()
                .cloned()
                .collect()
        }

        /// Get entries filtered by level
        pub fn get_entries_by_level(&self, level: LogLevel) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("entries lock poisoned")
                .iter()
                .filter(|entry| entry.level == level)
                .cloned()
                .collect()
        }

        /// Get entries from a specific node
        pub fn get_entries_by_node(&self, nodeid: &NodeId) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("entries lock poisoned")
                .iter()
                .filter(|entry| &entry.nodeid == nodeid)
                .cloned()
                .collect()
        }

        /// Get aggregation statistics
        pub fn stats(&self) -> AggregationStats {
            self.stats.read().expect("stats lock poisoned").clone()
        }

        /// Clear all entries
        pub fn clear(&self) {
            self.entries.write().expect("entries lock poisoned").clear();
            *self.stats.write().expect("stats lock poisoned") = AggregationStats::default();
        }
    }

    /// Adaptive rate limiter for high-frequency logging
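    ///
    /// The limiter measures the observed message rate per window and nudges
    /// its ceiling by `adaptation_factor`, clamped between 10% and 10x of the
    /// initial rate. A minimal sketch of gating a hot loop:
    ///
    /// ```rust,no_run
    /// use std::time::Duration;
    /// use scirs2_core::logging::distributed::AdaptiveRateLimiter;
    ///
    /// // Roughly 100 messages/s, re-evaluated every second, adapting by 10%.
    /// let limiter = AdaptiveRateLimiter::new(100.0, Duration::from_secs(1), 0.1);
    /// for i in 0..1_000_000u64 {
    ///     if limiter.try_acquire() {
    ///         println!("iteration {i} reached a checkpoint");
    ///     }
    /// }
    /// ```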
    pub struct AdaptiveRateLimiter {
        max_rate: Arc<Mutex<f64>>, // Maximum messages per second
        current_rate: Arc<Mutex<f64>>,
        last_reset: Arc<Mutex<Instant>>,
        message_count: Arc<AtomicUsize>,
        window_duration: Duration,
        adaptation_factor: f64,
        min_rate: f64,
        max_rate_absolute: f64,
    }

    impl AdaptiveRateLimiter {
        /// Create a new adaptive rate limiter
        pub fn new(
            initial_max_rate: f64,
            window_duration: Duration,
            adaptation_factor: f64,
        ) -> Self {
            Self {
                max_rate: Arc::new(Mutex::new(initial_max_rate)),
                current_rate: Arc::new(Mutex::new(0.0)),
                last_reset: Arc::new(Mutex::new(Instant::now())),
                message_count: Arc::new(AtomicUsize::new(0)),
                window_duration,
                adaptation_factor,
                min_rate: initial_max_rate * 0.1, // 10% of initial rate
                max_rate_absolute: initial_max_rate * 10.0, // 10x initial rate
            }
        }

        /// Check if a message should be allowed through
        pub fn try_acquire(&self) -> bool {
            let now = Instant::now();
            let count = self.message_count.fetch_add(1, Ordering::Relaxed);

            let mut last_reset = self.last_reset.lock().expect("last_reset lock poisoned");
            let elapsed = now.duration_since(*last_reset);

            if elapsed >= self.window_duration {
                // Reset window and update current rate
                let actual_rate = count as f64 / elapsed.as_secs_f64();
                {
                    let mut current_rate =
                        self.current_rate.lock().expect("current_rate lock poisoned");
                    *current_rate = actual_rate;
                }

                self.message_count.store(0, Ordering::Relaxed);
                *last_reset = now;

                // Adapt max rate based on actual usage
                self.adapt_rate(actual_rate);

                true // Allow message at window boundary
            } else {
                // Check if current rate exceeds limit
                let elapsed_secs = elapsed.as_secs_f64();
                if elapsed_secs < 0.001 {
                    // For very short durations, allow the message
                    true
                } else {
                    let current_rate = count as f64 / elapsed_secs;
                    let max_rate = *self.max_rate.lock().expect("max_rate lock poisoned");
                    current_rate <= max_rate
                }
            }
        }

        /// Adapt the maximum rate based on observed patterns
        fn adapt_rate(&self, actualrate: f64) {
            let mut max_rate = self.max_rate.lock().expect("max_rate lock poisoned");

            // If the actual rate is consistently lower, reduce the max rate;
            // if the actual rate hits the limit, increase the max rate
            if actualrate < *max_rate * 0.5 {
                // Reduce max rate
                *max_rate = (*max_rate * (1.0 - self.adaptation_factor)).max(self.min_rate);
            } else if actualrate >= *max_rate * 0.9 {
                // Increase max rate
                *max_rate =
                    (*max_rate * (1.0 + self.adaptation_factor)).min(self.max_rate_absolute);
            }
        }

        /// Get current rate statistics
        pub fn get_stats(&self) -> RateLimitStats {
            let current_rate = *self.current_rate.lock().expect("current_rate lock poisoned");
            let max_rate = *self.max_rate.lock().expect("max_rate lock poisoned");
            RateLimitStats {
                current_rate,
                max_rate,
                message_count: self.message_count.load(Ordering::Relaxed),
                window_duration: self.window_duration,
            }
        }

        /// Reset the rate limiter
        pub fn reset(&self) {
            *self.current_rate.lock().expect("current_rate lock poisoned") = 0.0;
            *self.last_reset.lock().expect("last_reset lock poisoned") = Instant::now();
            self.message_count.store(0, Ordering::Relaxed);
        }
    }

    /// Rate limiting statistics
    #[derive(Debug, Clone)]
    pub struct RateLimitStats {
        pub current_rate: f64,
        pub max_rate: f64,
        pub message_count: usize,
        pub window_duration: Duration,
    }

    /// Distributed logger that coordinates with multiple nodes
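    ///
    /// A minimal sketch; the capacity, window, and rate values are
    /// illustrative:
    ///
    /// ```rust,no_run
    /// use std::time::Duration;
    /// use scirs2_core::logging::distributed::{DistributedLogger, NodeId};
    ///
    /// let logger = DistributedLogger::new(
    ///     "worker",
    ///     NodeId::from_hostname(),
    ///     10_000,                   // max aggregated entries
    ///     Duration::from_secs(300), // aggregation window
    ///     100.0,                    // default rate limit (messages/s)
    /// );
    /// logger.info_adaptive("worker online");
    /// let json = logger.exportlogs_json().unwrap();
    /// println!("{json}");
    /// ```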
    pub struct DistributedLogger {
        #[allow(dead_code)]
        nodeid: NodeId,
        locallogger: Logger,
        aggregator: Arc<LogAggregator>,
        rate_limiters: Arc<RwLock<HashMap<String, AdaptiveRateLimiter>>>,
        default_rate_limit: f64,
    }

    impl DistributedLogger {
        /// Create a new distributed logger
        pub fn new(
            logger_name: &str,
            nodeid: NodeId,
            max_entries: usize,
            aggregation_window: Duration,
            default_rate_limit: f64,
        ) -> Self {
            let locallogger = Logger::new(logger_name);
            let aggregator = Arc::new(LogAggregator::new(
                nodeid.clone(),
                max_entries,
                aggregation_window,
            ));

            Self {
                nodeid,
                locallogger,
                aggregator,
                rate_limiters: Arc::new(RwLock::new(HashMap::new())),
                default_rate_limit,
            }
        }

        /// Log a message with adaptive rate limiting
        pub fn log_adaptive(
            &self,
            level: LogLevel,
            message: &str,
            context: Option<HashMap<String, String>>,
        ) {
            let logger_key = self.locallogger.module.clone();

            // Get or create a rate limiter for this logger
            let shouldlog = {
                let rate_limiters =
                    self.rate_limiters.read().expect("rate_limiters lock poisoned");
                if let Some(limiter) = rate_limiters.get(&logger_key) {
                    limiter.try_acquire()
                } else {
                    drop(rate_limiters);

                    // Create a new rate limiter
                    let mut rate_limiters =
                        self.rate_limiters.write().expect("rate_limiters lock poisoned");
                    let limiter = AdaptiveRateLimiter::new(
                        self.default_rate_limit,
                        Duration::from_secs(1),
                        0.1, // 10% adaptation factor
                    );
                    let shouldlog = limiter.try_acquire();
                    rate_limiters.insert(logger_key, limiter);
                    shouldlog
                }
            };

            if shouldlog {
                // Log locally
                self.locallogger.writelog(level, message);

                // Create a distributed log entry
                let entry = DistributedLogEntry::new(
                    self.nodeid.clone(),
                    level,
                    self.locallogger.module.clone(),
                    message.to_string(),
                    context.unwrap_or_default(),
                );

                // Add to the aggregator
                self.aggregator.add_entry(entry);
            }
        }

        /// Log an error message with adaptive rate limiting
        pub fn error_adaptive(&self, message: &str) {
            self.log_adaptive(LogLevel::Error, message, None);
        }

        /// Log a warning message with adaptive rate limiting
        pub fn warn_adaptive(&self, message: &str) {
            self.log_adaptive(LogLevel::Warn, message, None);
        }

        /// Log an info message with adaptive rate limiting
        pub fn info_adaptive(&self, message: &str) {
            self.log_adaptive(LogLevel::Info, message, None);
        }

        /// Log a debug message with adaptive rate limiting
        pub fn debug_adaptive(&self, message: &str) {
            self.log_adaptive(LogLevel::Debug, message, None);
        }

        /// Get aggregated log entries
        pub fn get_aggregatedlogs(&self) -> Vec<DistributedLogEntry> {
            self.aggregator.get_entries()
        }

        /// Get rate limiting statistics for all loggers
        pub fn get_rate_stats(&self) -> HashMap<String, RateLimitStats> {
            self.rate_limiters
                .read()
                .expect("rate_limiters lock poisoned")
                .iter()
                .map(|(k, v)| (k.clone(), v.get_stats()))
                .collect()
        }

        /// Get aggregation statistics
        pub fn get_aggregation_stats(&self) -> AggregationStats {
            self.aggregator.stats()
        }

        /// Export logs to JSON format
        pub fn exportlogs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
            let entries = self.get_aggregatedlogs();
            let stats = self.get_aggregation_stats();

            let export_data = serde_json::json!({
                "nodeid": self.nodeid.to_string(),
                "timestamp": SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system time before Unix epoch")
                    .as_millis(),
                "stats": {
                    "total_entries": stats.total_entries,
                    "dropped_entries": stats.dropped_entries,
                    "aggregation_windows": stats.aggregation_windows
                },
                "entries": entries.iter().map(|entry| serde_json::json!({
                    "id": entry.id,
                    "nodeid": entry.nodeid.to_string(),
                    "timestamp": entry.timestamp,
                    "level": format!("{:?}", entry.level),
                    "logger": entry.logger,
                    "message": entry.message,
                    "context": entry.context,
                    "sequence": entry.sequence
                })).collect::<Vec<_>>()
            });

            Ok(serde_json::to_string_pretty(&export_data)?)
        }

        /// Clear all aggregated data
        pub fn clear_aggregated_data(&self) {
            self.aggregator.clear();

            // Reset rate limiters
            let rate_limiters =
                self.rate_limiters.write().expect("rate_limiters lock poisoned");
            for limiter in rate_limiters.values() {
                limiter.reset();
            }
        }
    }

    /// Multi-node log coordinator for distributed systems
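    ///
    /// A minimal sketch of coordinating one node; intervals and capacities
    /// are illustrative:
    ///
    /// ```rust,no_run
    /// use std::sync::Arc;
    /// use std::time::Duration;
    /// use scirs2_core::logging::distributed::{DistributedLogger, MultiNodeCoordinator, NodeId};
    ///
    /// let coordinator = MultiNodeCoordinator::new(Duration::from_secs(1));
    /// let node = NodeId::new("node1".to_string(), "1".to_string());
    /// let logger = Arc::new(DistributedLogger::new(
    ///     "node1", node.clone(), 1_000, Duration::from_secs(60), 50.0,
    /// ));
    /// coordinator.register_node(node, logger);
    /// coordinator.start();
    /// // ... run the computation; loggers feed the global aggregator ...
    /// coordinator.stop();
    /// ```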
    pub struct MultiNodeCoordinator {
        nodes: Arc<RwLock<HashMap<NodeId, Arc<DistributedLogger>>>>,
        global_aggregator: Arc<LogAggregator>,
        coordination_interval: Duration,
        running: Arc<AtomicUsize>, // 0 = stopped, 1 = running
    }

    impl MultiNodeCoordinator {
        /// Create a new multi-node coordinator
        pub fn new(coordinationinterval: Duration) -> Self {
            let global_node = NodeId::new("global".to_string(), "coordinator".to_string());
            let global_aggregator = Arc::new(LogAggregator::new(
                global_node,
                100000,                    // Large capacity for global aggregation
                Duration::from_secs(3600), // 1 hour window
            ));

            Self {
                nodes: Arc::new(RwLock::new(HashMap::new())),
                global_aggregator,
                coordination_interval: coordinationinterval,
                running: Arc::new(AtomicUsize::new(0)),
            }
        }

        /// Register a distributed logger
        pub fn register_node(&self, nodeid: NodeId, logger: Arc<DistributedLogger>) {
            let mut nodes = self.nodes.write().expect("nodes lock poisoned");
            nodes.insert(nodeid, logger);
        }

        /// Unregister a node
        pub fn unregister_node(&self, nodeid: &NodeId) {
            let mut nodes = self.nodes.write().expect("nodes lock poisoned");
            nodes.remove(nodeid);
        }

        /// Start the coordination process
        pub fn start(&self) {
            if self
                .running
                .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                let nodes = self.nodes.clone();
                let global_aggregator = self.global_aggregator.clone();
                let interval = self.coordination_interval;
                let running = self.running.clone();

                thread::spawn(move || {
                    while running.load(Ordering::Relaxed) == 1 {
                        // Collect logs from all nodes
                        let nodes_guard = nodes.read().expect("nodes lock poisoned");
                        for logger in nodes_guard.values() {
                            let entries = logger.get_aggregatedlogs();
                            for entry in entries {
                                global_aggregator.add_entry(entry);
                            }
                        }
                        drop(nodes_guard);

                        thread::sleep(interval);
                    }
                });
            }
        }

        /// Stop the coordination process
        pub fn stop(&self) {
            self.running.store(0, Ordering::Relaxed);
        }

        /// Get global aggregated statistics
        pub fn get_global_stats(&self) -> AggregationStats {
            self.global_aggregator.stats()
        }

        /// Get all global log entries
        pub fn get_global_entries(&self) -> Vec<DistributedLogEntry> {
            self.global_aggregator.get_entries()
        }

        /// Export global logs to JSON
        pub fn export_globallogs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
            let entries = self.get_global_entries();
            let stats = self.get_global_stats();

            let export_data = serde_json::json!({
                "coordinator": "global",
                "timestamp": SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system time before Unix epoch")
                    .as_millis(),
                "stats": {
                    "total_entries": stats.total_entries,
                    "dropped_entries": stats.dropped_entries,
                    "nodes_count": self.nodes.read().expect("nodes lock poisoned").len(),
                    "entries_by_level": stats.entries_by_level.iter().map(|(k, v)| (format!("{k:?}"), *v)).collect::<HashMap<String, u64>>()
                },
                "entries": entries.iter().map(|entry| serde_json::json!({
                    "id": entry.id,
                    "nodeid": entry.nodeid.to_string(),
                    "timestamp": entry.timestamp,
                    "level": format!("{:?}", entry.level),
                    "logger": entry.logger,
                    "message": entry.message,
                    "context": entry.context,
                    "sequence": entry.sequence
                })).collect::<Vec<_>>()
            });

            Ok(serde_json::to_string_pretty(&export_data)?)
        }
    }

    impl Drop for MultiNodeCoordinator {
        fn drop(&mut self) {
            self.stop();
        }
    }
}

#[cfg(test)]
mod distributed_tests {
    use super::distributed::*;
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_nodeid_creation() {
        let node = NodeId::new("worker1".to_string(), "pid123".to_string());
        assert_eq!(node.name(), "worker1");
        assert_eq!(node.instance_id(), "pid123");
        assert_eq!(node.to_string(), "worker1:pid123");
    }

    #[test]
    fn testlog_aggregator() {
        let nodeid = NodeId::new("test_node".to_string(), "1".to_string());
        let aggregator = LogAggregator::new(nodeid.clone(), 100, Duration::from_secs(60));

        let entry = DistributedLogEntry::new(
            nodeid,
            LogLevel::Info,
            "testlogger".to_string(),
            "Test message".to_string(),
            HashMap::new(),
        );

        aggregator.add_entry(entry);

        let entries = aggregator.get_entries();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].message, "Test message");

        let stats = aggregator.stats();
        assert_eq!(stats.total_entries, 1);
    }

    #[test]
    fn test_adaptive_rate_limiter() {
        let limiter = AdaptiveRateLimiter::new(10.0, Duration::from_millis(100), 0.1);

        // Should allow initial messages
        assert!(limiter.try_acquire());
        assert!(limiter.try_acquire());

        let stats = limiter.get_stats();
        assert!(stats.current_rate >= 0.0);
        assert_eq!(stats.max_rate, 10.0);
    }

    #[test]
    fn test_distributedlogger() {
        let nodeid = NodeId::new("test_node".to_string(), "1".to_string());
        let logger =
            DistributedLogger::new("testlogger", nodeid, 1000, Duration::from_secs(60), 100.0);

        logger.info_adaptive("Test message 1");
        logger.warn_adaptive("Test message 2");

        let entries = logger.get_aggregatedlogs();
        assert!(!entries.is_empty()); // At least one message should go through

        let stats = logger.get_aggregation_stats();
        assert!(stats.total_entries >= 1);
    }

    #[test]
    fn test_multi_node_coordinator() {
        let coordinator = MultiNodeCoordinator::new(Duration::from_millis(10));

        let node1_id = NodeId::new("node1".to_string(), "1".to_string());
        let node1logger = Arc::new(DistributedLogger::new(
            "node1logger",
            node1_id.clone(),
            100,
            Duration::from_secs(10),
            50.0,
        ));

        coordinator.register_node(node1_id, node1logger);

        // Start coordination
        coordinator.start();

        // Let it run briefly
        std::thread::sleep(Duration::from_millis(50));

        coordinator.stop();

        let stats = coordinator.get_global_stats();
        // Should have a basic structure even with no messages;
        // total_entries is u64 and therefore always >= 0, so just read it
        let _ = stats.total_entries;
    }

    #[test]
    fn testlog_export() {
        let nodeid = NodeId::new("export_test".to_string(), "1".to_string());
        let logger =
            DistributedLogger::new("exportlogger", nodeid, 100, Duration::from_secs(60), 100.0);

        logger.info_adaptive("Export test message");

        let json_export = logger.exportlogs_json().expect("export should succeed");
        assert!(json_export.contains("export_test"));
        assert!(json_export.contains("Export test message"));
    }
}