Skip to main content

scirs2_core/
logging.rs

1//! # Logging and Diagnostics
2//!
3//! This module provides structured logging and diagnostics utilities for scientific computing.
4//!
5//! ## Features
6//!
7//! * Structured logging for scientific computing
8//! * Enhanced progress tracking with multiple visualization styles
9//! * Performance metrics collection
10//! * Log filtering and formatting
11//! * Multi-progress tracking for parallel operations
12//! * Adaptive update rates and predictive ETA calculations
13//!
14//! ## Usage
15//!
16//! ```rust,no_run
17//! use scirs2_core::logging::{Logger, LogLevel, ProgressTracker};
18//! use scirs2_core::logging::progress::{ProgressBuilder, ProgressStyle};
19//!
20//! // Create a logger
21//! let logger = Logger::new("matrix_operations");
22//!
23//! // Log messages at different levels
24//! logger.info("Starting matrix multiplication");
25//! logger.debug("Using algorithm: Standard");
26//!
27//! // Create an enhanced progress tracker
28//! let mut progress = ProgressBuilder::new("Matrix multiplication", 1000)
29//!     .style(ProgressStyle::DetailedBar)
30//!     .show_statistics(true)
31//!     .build();
32//!
33//! progress.start();
34//!
35//! for i in 0..1000 {
36//!     // Perform computation
37//!
38//!     // Update progress
39//!     progress.update(i + 1);
40//!
41//!     // Log intermediate results at low frequency to avoid flooding logs
42//!     if i % 100 == 0 {
43//!         logger.debug(&format!("Completed {}/1000 iterations", i + 1));
44//!     }
45//! }
46//!
47//! // Complete the progress tracking
48//! progress.finish();
49//!
50//! logger.info("Matrix multiplication completed");
51//! ```
52
53use once_cell::sync::Lazy;
54use std::collections::HashMap;
55use std::fmt::Display;
56use std::sync::{Arc, Mutex};
57use std::time::{Duration, Instant};
58
59// Enhanced progress tracking module
60pub mod progress;
61
62/// Smart rate limiting for high-frequency log events
63pub mod rate_limiting;
64
65/// Integration bridge with the `log` crate facade, timing macros, and memory tracking
66pub mod bridge;
67
68/// Log level enumeration
/// Log level enumeration
///
/// Discriminants are ordered by increasing severity, so levels can be
/// compared directly with `<` / `>` (the derives provide `Ord`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    /// Trace level for detailed debugging
    Trace = 0,
    /// Debug level for debugging information
    Debug = 1,
    /// Info level for general information
    Info = 2,
    /// Warning level for potential issues
    Warn = 3,
    /// Error level for error conditions
    Error = 4,
    /// Critical level for critical errors
    Critical = 5,
}

impl LogLevel {
    /// Uppercase name of this level, suitable for rendered log output.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Trace => "TRACE",
            Self::Debug => "DEBUG",
            Self::Info => "INFO",
            Self::Warn => "WARN",
            Self::Error => "ERROR",
            Self::Critical => "CRITICAL",
        }
    }
}
98
/// Structured log entry
///
/// One record produced by `Logger::writelog` and delivered to every
/// registered `LogHandler`.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Timestamp of the log entry (wall-clock time at creation)
    pub timestamp: std::time::SystemTime,
    /// Log level
    pub level: LogLevel,
    /// Module or component name that emitted the entry
    pub module: String,
    /// Log message
    pub message: String,
    /// Additional context fields (copied from the emitting `Logger`)
    pub fields: HashMap<String, String>,
}
113
114/// Logger configuration
115#[derive(Debug, Clone)]
116pub struct LoggerConfig {
117    /// Minimum log level to record
118    pub min_level: LogLevel,
119    /// Enable/disable timestamps
120    pub show_timestamps: bool,
121    /// Enable/disable module names
122    pub show_modules: bool,
123    /// Module-specific log levels
124    pub module_levels: HashMap<String, LogLevel>,
125}
126
127impl Default for LoggerConfig {
128    fn default() -> Self {
129        Self {
130            min_level: LogLevel::Info,
131            show_timestamps: true,
132            show_modules: true,
133            module_levels: HashMap::new(),
134        }
135    }
136}
137
/// Global logger configuration
///
/// Read on every log call for level filtering; mutated by `configurelogger`,
/// `set_level`, and `set_module_level`.
static LOGGER_CONFIG: Lazy<Mutex<LoggerConfig>> = Lazy::new(|| Mutex::new(LoggerConfig::default()));
140
141/// Configure the global logger
142#[allow(dead_code)]
143pub fn configurelogger(config: LoggerConfig) {
144    let mut global_config = LOGGER_CONFIG.lock().expect("Operation failed");
145    *global_config = config;
146}
147
148/// Set the global minimum log level
149#[allow(dead_code)]
150pub fn set_level(level: LogLevel) {
151    let mut config = LOGGER_CONFIG.lock().expect("Operation failed");
152    config.min_level = level;
153}
154
155/// Set a module-specific log level
156#[allow(dead_code)]
157pub fn set_module_level(module: &str, level: LogLevel) {
158    let mut config = LOGGER_CONFIG.lock().expect("Operation failed");
159    config.module_levels.insert(module.to_string(), level);
160}
161
/// Handler trait for processing log entries
///
/// Implementations receive every entry that passes level filtering. Handlers
/// are invoked synchronously on the logging thread and must be `Send + Sync`
/// because the handler registry is a shared global.
pub trait LogHandler: Send + Sync {
    /// Handle a log entry
    fn handle(&self, entry: &LogEntry);
}
167
/// Console log handler
///
/// Renders entries using `format` and prints them (stdout, or stderr for
/// `Error`/`Critical` — see the `LogHandler` impl).
pub struct ConsoleLogHandler {
    /// Format string for log entries
    pub format: String,
}

impl Default for ConsoleLogHandler {
    /// The default format shows the level, module, and message.
    fn default() -> Self {
        ConsoleLogHandler {
            format: String::from("[{level}] {module}: {message}"),
        }
    }
}
181
182impl LogHandler for ConsoleLogHandler {
183    fn handle(&self, entry: &LogEntry) {
184        let mut output = self.format.clone();
185
186        // Replace placeholders in the format string
187        output = output.replace("{level}", entry.level.as_str());
188        output = output.replace("{module}", &entry.module);
189        output = output.replace("{message}", &entry.message);
190
191        if self.format.contains("{timestamp}") {
192            let datetime = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
193            output = output.replace(
194                "{timestamp}",
195                &datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
196            );
197        }
198
199        if self.format.contains("{fields}") {
200            let fields_str = entry
201                .fields
202                .iter()
203                .map(|(k, v)| format!("{k}={v}"))
204                .collect::<Vec<_>>()
205                .join(", ");
206            output = output.replace("{fields}", &fields_str);
207        }
208
209        // Print to the appropriate output stream based on level
210        match entry.level {
211            LogLevel::Error | LogLevel::Critical => eprintln!("{output}"),
212            _ => println!("{output}"),
213        }
214    }
215}
216
/// File log handler
///
/// Appends rendered entries to `file_path`, creating the file on first use
/// (see the `LogHandler` impl).
pub struct FileLogHandler {
    /// Path to the log file
    pub file_path: String,
    /// Format string for log entries
    pub format: String,
}
224
225impl LogHandler for FileLogHandler {
226    fn handle(&self, entry: &LogEntry) {
227        // This is a simplified implementation
228        // A real implementation would handle file I/O more efficiently
229
230        let mut output = self.format.clone();
231
232        // Replace placeholders in the format string
233        output = output.replace("{level}", entry.level.as_str());
234        output = output.replace("{module}", &entry.module);
235        output = output.replace("{message}", &entry.message);
236
237        if self.format.contains("{timestamp}") {
238            let datetime = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
239            output = output.replace(
240                "{timestamp}",
241                &datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
242            );
243        }
244
245        if self.format.contains("{fields}") {
246            let fields_str = entry
247                .fields
248                .iter()
249                .map(|(k, v)| format!("{k}={v}"))
250                .collect::<Vec<_>>()
251                .join(", ");
252            output = output.replace("{fields}", &fields_str);
253        }
254
255        // Append to the log file
256        // This would use proper error handling and buffering in a real implementation
257        if let Ok(mut file) = std::fs::OpenOptions::new()
258            .create(true)
259            .append(true)
260            .open(&self.file_path)
261        {
262            use std::io::Write;
263            let _ = writeln!(file, "{output}");
264        }
265    }
266}
267
/// Global log handlers
///
/// Initialized with a single `ConsoleLogHandler`; every `LogEntry` is passed
/// to each registered handler in registration order.
static LOG_HANDLERS: Lazy<Mutex<Vec<Arc<dyn LogHandler>>>> = Lazy::new(|| {
    let console_handler = Arc::new(ConsoleLogHandler::default());
    Mutex::new(vec![console_handler])
});
273
274/// Register a log handler
275#[allow(dead_code)]
276pub fn set_handler(handler: Arc<dyn LogHandler>) {
277    let mut handlers = LOG_HANDLERS.lock().expect("Operation failed");
278    handlers.push(handler);
279}
280
281/// Clear all log handlers
282#[allow(dead_code)]
283pub fn clearlog_handlers() {
284    let mut handlers = LOG_HANDLERS.lock().expect("Operation failed");
285    handlers.clear();
286}
287
288/// Reset log handlers to the default configuration
289#[allow(dead_code)]
290pub fn resetlog_handlers() {
291    let mut handlers = LOG_HANDLERS.lock().expect("Operation failed");
292    handlers.clear();
293    handlers.push(Arc::new(ConsoleLogHandler::default()));
294}
295
/// Logger for a specific module
///
/// Cloning copies the module name and context fields; configuration and
/// handlers are shared process-wide via the module-level statics.
#[derive(Clone)]
pub struct Logger {
    /// Module name used for filtering and display
    module: String,
    /// Additional context fields attached to every emitted entry
    fields: HashMap<String, String>,
}
304
305impl Logger {
306    /// Create a new logger for the specified module
307    pub fn new(module: &str) -> Self {
308        Self {
309            module: module.to_string(),
310            fields: HashMap::new(),
311        }
312    }
313
314    /// Add a context field to the logger
315    pub fn with_field<K, V>(mut self, key: K, value: V) -> Self
316    where
317        K: Into<String>,
318        V: Display,
319    {
320        self.fields.insert(key.into(), format!("{value}"));
321        self
322    }
323
324    /// Add multiple context fields to the logger
325    pub fn with_fields<K, V, I>(mut self, fields: I) -> Self
326    where
327        K: Into<String>,
328        V: Display,
329        I: IntoIterator<Item = (K, V)>,
330    {
331        for (key, value) in fields {
332            self.fields.insert(key.into(), format!("{value}"));
333        }
334        self
335    }
336
337    /// Log a message at a specific level
338    pub fn writelog(&self, level: LogLevel, message: &str) {
339        // Check if this log should be processed based on configuration
340        let config = LOGGER_CONFIG.lock().expect("Operation failed");
341        let module_level = config
342            .module_levels
343            .get(&self.module)
344            .copied()
345            .unwrap_or(config.min_level);
346
347        if level < module_level {
348            return;
349        }
350
351        // Create the log entry
352        let entry = LogEntry {
353            timestamp: std::time::SystemTime::now(),
354            level,
355            module: self.module.clone(),
356            message: message.to_string(),
357            fields: self.fields.clone(),
358        };
359
360        // Process the log entry with all registered handlers
361        let handlers = LOG_HANDLERS.lock().expect("Operation failed");
362        for handler in handlers.iter() {
363            handler.handle(&entry);
364        }
365    }
366
367    /// Log a message at trace level
368    pub fn trace(&self, message: &str) {
369        self.writelog(LogLevel::Trace, message);
370    }
371
372    /// Log a message at debug level
373    pub fn debug(&self, message: &str) {
374        self.writelog(LogLevel::Debug, message);
375    }
376
377    /// Log a message at info level
378    pub fn info(&self, message: &str) {
379        self.writelog(LogLevel::Info, message);
380    }
381
382    /// Log a message at warning level
383    pub fn warn(&self, message: &str) {
384        self.writelog(LogLevel::Warn, message);
385    }
386
387    /// Log a message at error level
388    pub fn error(&self, message: &str) {
389        self.writelog(LogLevel::Error, message);
390    }
391
392    /// Log a message at critical level
393    pub fn critical(&self, message: &str) {
394        self.writelog(LogLevel::Critical, message);
395    }
396
397    /// Create an enhanced progress tracker using the logger's context
398    pub fn track_progress(
399        &self,
400        description: &str,
401        total: u64,
402    ) -> progress::EnhancedProgressTracker {
403        use progress::{ProgressBuilder, ProgressStyle};
404
405        let builder = ProgressBuilder::new(description, total)
406            .style(ProgressStyle::DetailedBar)
407            .show_statistics(true);
408
409        let mut tracker = builder.build();
410
411        // Log the start of progress tracking
412        self.info(&format!("Starting progress tracking: {description}"));
413
414        tracker.start();
415        tracker
416    }
417
418    /// Log a message with progress update
419    pub fn info_with_progress(
420        &self,
421        message: &str,
422        progress: &mut progress::EnhancedProgressTracker,
423        update: u64,
424    ) {
425        self.info(message);
426        progress.update(update);
427    }
428
429    /// Execute an operation with progress tracking
430    pub fn with_progress<F, R>(&self, description: &str, total: u64, operation: F) -> R
431    where
432        F: FnOnce(&mut progress::EnhancedProgressTracker) -> R,
433    {
434        let mut progress = self.track_progress(description, total);
435        let result = operation(&mut progress);
436        progress.finish();
437
438        // Log completion
439        let stats = progress.stats();
440        self.info(&format!(
441            "Completed progress tracking: {description} - {elapsed:.1}s elapsed",
442            elapsed = stats.elapsed.as_secs_f64()
443        ));
444
445        result
446    }
447}
448
/// Progress tracker for long-running operations
///
/// Logs throttled progress updates through a dedicated `Logger` named
/// "progress" with an `operation` context field.
pub struct ProgressTracker {
    /// Operation name
    name: String,
    /// Total number of steps
    total: usize,
    /// Current progress
    current: usize,
    /// Start time
    start_time: Instant,
    /// Last update time (used to throttle log output)
    last_update: Instant,
    /// Minimum time between progress updates
    update_interval: Duration,
    /// Associated logger
    logger: Logger,
}
466
467impl ProgressTracker {
468    /// Create a new progress tracker
469    pub fn new(name: &str, total: usize) -> Self {
470        let now = Instant::now();
471        let logger = Logger::new("progress").with_field("operation", name);
472
473        logger.info(&format!("Starting operation: {name}"));
474
475        Self {
476            name: name.to_string(),
477            total,
478            current: 0,
479            start_time: now,
480            last_update: now,
481            update_interval: Duration::from_millis(500), // Update at most every 500ms
482            logger,
483        }
484    }
485
486    /// Set the minimum interval between progress updates
487    pub fn set_update_interval(&mut self, interval: Duration) {
488        self.update_interval = interval;
489    }
490
491    /// Update the current progress
492    pub fn update(&mut self, current: usize) {
493        self.current = current;
494
495        let now = Instant::now();
496
497        // Only log an update if enough time has passed since the last update
498        if now.duration_since(self.last_update) >= self.update_interval {
499            self.last_update = now;
500
501            let elapsed = now.duration_since(self.start_time);
502            let percent = (self.current as f64 / self.total as f64) * 100.0;
503
504            let eta = if self.current > 0 {
505                let time_per_item = elapsed.as_secs_f64() / self.current as f64;
506                let remaining = time_per_item * (self.total - self.current) as f64;
507                format!("ETA: {remaining:.1}s")
508            } else {
509                "ETA: calculating...".to_string()
510            };
511
512            self.logger.debug(&format!(
513                "{name}: {current}/{total} ({percent:.1}%) - Elapsed: {elapsed:.1}s - {eta}",
514                name = self.name,
515                current = self.current,
516                total = self.total,
517                elapsed = elapsed.as_secs_f64()
518            ));
519        }
520    }
521
522    /// Mark the operation as complete
523    pub fn complete(&mut self) {
524        let elapsed = self.start_time.elapsed();
525        self.current = self.total;
526
527        self.logger.info(&format!(
528            "{name} completed: {total}/{total} (100%) - Total time: {elapsed:.1}s",
529            name = self.name,
530            total = self.total,
531            elapsed = elapsed.as_secs_f64()
532        ));
533    }
534
535    /// Get the current progress as a percentage
536    pub fn progress_percent(&self) -> f64 {
537        (self.current as f64 / self.total as f64) * 100.0
538    }
539
540    /// Get the elapsed time
541    pub fn elapsed(&self) -> Duration {
542        self.start_time.elapsed()
543    }
544
545    /// Estimate the remaining time
546    pub fn eta(&self) -> Option<Duration> {
547        if self.current == 0 {
548            return None;
549        }
550
551        let elapsed = self.start_time.elapsed();
552        let time_per_item = elapsed.as_secs_f64() / self.current as f64;
553        let remaining_secs = time_per_item * (self.total - self.current) as f64;
554
555        Some(Duration::from_secs_f64(remaining_secs))
556    }
557}
558
559/// Initialize the default logging system
560#[allow(dead_code)]
561pub fn init() {
562    // Register the default console handler if not already done
563    let handlers = LOG_HANDLERS.lock().expect("Operation failed");
564    if handlers.is_empty() {
565        drop(handlers);
566        resetlog_handlers();
567    }
568}
569
/// Get a logger for the current module
///
/// With no arguments the logger is named after `module_path!()`; passing an
/// expression supplies an explicit logger name instead.
#[macro_export]
macro_rules! getlogger {
    () => {
        $crate::logging::Logger::new(module_path!())
    };
    ($name:expr) => {
        $crate::logging::Logger::new($name)
    };
}
580
581// # Distributed Logging and Adaptive Rate Limiting (Beta 2)
582//
583// This section provides advanced distributed logging capabilities with
584// aggregation, adaptive rate limiting, and multi-node coordination.
585
586/// Distributed logging capabilities for multi-node computations
587pub mod distributed {
588    use super::*;
589    use std::collections::{HashMap, VecDeque};
590    use std::fmt;
591    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
592    use std::sync::{Arc, Mutex, RwLock};
593    use std::thread;
594    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
595
596    /// Node identifier for distributed logging
597    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
598    pub struct NodeId {
599        name: String,
600        instance_id: String,
601    }
602
603    impl NodeId {
604        /// Create a new node identifier
605        pub fn new(name: String, instanceid: String) -> Self {
606            Self {
607                name,
608                instance_id: instanceid,
609            }
610        }
611
612        /// Create from hostname and process ID
613        pub fn from_hostname() -> Self {
614            let hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
615            let pid = std::process::id();
616            Self::new(hostname, pid.to_string())
617        }
618
619        /// Get node name
620        pub fn name(&self) -> &str {
621            &self.name
622        }
623
624        /// Get instance ID
625        pub fn instance_id(&self) -> &str {
626            &self.instance_id
627        }
628    }
629
630    impl fmt::Display for NodeId {
631        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
632            write!(f, "{}:{}", self.name, self.instance_id)
633        }
634    }
635
    /// Distributed log entry with metadata
    ///
    /// `id` and `sequence` are assigned from process-wide atomic counters
    /// (see `DistributedLogEntry::new`), so they are unique only within a
    /// single process.
    #[derive(Debug, Clone)]
    pub struct DistributedLogEntry {
        /// Unique entry ID (per process)
        pub id: u64,
        /// Source node
        #[allow(dead_code)]
        pub nodeid: NodeId,
        /// Timestamp (Unix epoch milliseconds)
        pub timestamp: u64,
        /// Log level
        pub level: LogLevel,
        /// Logger name
        pub logger: String,
        /// Message content
        pub message: String,
        /// Additional context fields
        pub context: HashMap<String, String>,
        /// Sequence number for ordering (per process)
        pub sequence: u64,
    }
657
658    impl DistributedLogEntry {
659        /// Create a new distributed log entry
660        pub fn new(
661            nodeid: NodeId,
662            level: LogLevel,
663            logger: String,
664            message: String,
665            context: HashMap<String, String>,
666        ) -> Self {
667            static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
668            static SEQ_COUNTER: AtomicU64 = AtomicU64::new(1);
669
670            Self {
671                id: ID_COUNTER.fetch_add(1, Ordering::Relaxed),
672                nodeid,
673                timestamp: SystemTime::now()
674                    .duration_since(UNIX_EPOCH)
675                    .expect("Test: operation failed")
676                    .as_millis() as u64,
677                level,
678                logger,
679                message,
680                context,
681                sequence: SEQ_COUNTER.fetch_add(1, Ordering::Relaxed),
682            }
683        }
684
685        /// Get age of this log entry
686        pub fn age(&self) -> Duration {
687            let now = SystemTime::now()
688                .duration_since(UNIX_EPOCH)
689                .expect("Test: operation failed")
690                .as_millis() as u64;
691            Duration::from_millis(now.saturating_sub(self.timestamp))
692        }
693    }
694
    /// Log aggregator that collects and processes distributed log entries
    ///
    /// Keeps a bounded, time-windowed queue of entries plus running
    /// statistics; both are behind `RwLock`s so readers do not block each
    /// other.
    #[allow(dead_code)]
    pub struct LogAggregator {
        /// Node that owns this aggregator
        #[allow(dead_code)]
        nodeid: NodeId,
        /// Windowed queue of entries, oldest at the front
        entries: Arc<RwLock<VecDeque<DistributedLogEntry>>>,
        /// Hard cap on the number of retained entries
        max_entries: usize,
        /// Entries older than this (relative to the newest entry) are evicted
        aggregation_window: Duration,
        /// Running counters, kept in sync with `entries` by `add_entry`
        stats: Arc<RwLock<AggregationStats>>,
    }

    /// Statistics for log aggregation
    #[derive(Debug, Clone, Default)]
    pub struct AggregationStats {
        /// Cumulative count of all entries ever added
        pub total_entries: u64,
        /// Current in-window entry counts, keyed by level
        pub entries_by_level: HashMap<LogLevel, u64>,
        /// Current in-window entry counts, keyed by source node
        pub entries_by_node: HashMap<NodeId, u64>,
        /// Entries evicted because the `max_entries` cap was hit
        pub dropped_entries: u64,
        /// Number of completed aggregation windows
        pub aggregation_windows: u64,
    }
715
    impl LogAggregator {
        /// Create a new log aggregator
        pub fn new(nodeid: NodeId, max_entries: usize, aggregationwindow: Duration) -> Self {
            Self {
                nodeid,
                entries: Arc::new(RwLock::new(VecDeque::new())),
                max_entries,
                aggregation_window: aggregationwindow,
                stats: Arc::new(RwLock::new(AggregationStats::default())),
            }
        }

        /// Add a log entry to the aggregator
        ///
        /// Evicts entries older than the aggregation window (measured from
        /// the NEW entry's timestamp) and enforces the `max_entries` cap
        /// before appending. Both locks are held for the whole operation so
        /// `entries` and `stats` stay consistent with each other.
        ///
        /// NOTE(review): `total_entries` is cumulative while the per-level /
        /// per-node maps are decremented on eviction, so they count only the
        /// current window — confirm this asymmetry is intentional.
        pub fn add_entry(&self, entry: DistributedLogEntry) {
            let mut entries = self.entries.write().expect("Operation failed");
            let mut stats = self.stats.write().expect("Operation failed");

            // Remove old entries beyond the window
            let cutoff = entry
                .timestamp
                .saturating_sub(self.aggregation_window.as_millis() as u64);
            while let Some(front) = entries.front() {
                if front.timestamp >= cutoff {
                    break;
                }
                let removed = entries.pop_front().expect("Operation failed");
                // Update stats for removed entry
                if let Some(count) = stats.entries_by_level.get_mut(&removed.level) {
                    *count = count.saturating_sub(1);
                }
                if let Some(count) = stats.entries_by_node.get_mut(&removed.nodeid) {
                    *count = count.saturating_sub(1);
                }
            }

            // Add new entry
            if entries.len() >= self.max_entries {
                if let Some(removed) = entries.pop_front() {
                    stats.dropped_entries += 1;
                    // Update stats for dropped entry
                    if let Some(count) = stats.entries_by_level.get_mut(&removed.level) {
                        *count = count.saturating_sub(1);
                    }
                    if let Some(count) = stats.entries_by_node.get_mut(&removed.nodeid) {
                        *count = count.saturating_sub(1);
                    }
                }
            }

            // Update stats for new entry
            stats.total_entries += 1;
            *stats.entries_by_level.entry(entry.level).or_insert(0) += 1;
            *stats
                .entries_by_node
                .entry(entry.nodeid.clone())
                .or_insert(0) += 1;

            entries.push_back(entry);
        }

        /// Get all entries within the aggregation window (cloned snapshot)
        pub fn get_entries(&self) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("Operation failed")
                .iter()
                .cloned()
                .collect()
        }

        /// Get entries filtered by level (cloned snapshot)
        // NOTE(review): the "Test: operation failed" expect message below
        // looks like a leftover from test code — consider unifying messages.
        pub fn get_entries_by_level(&self, level: LogLevel) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("Test: operation failed")
                .iter()
                .filter(|entry| entry.level == level)
                .cloned()
                .collect()
        }

        /// Get entries from specific node (cloned snapshot)
        pub fn get_entries_by_node(&self, nodeid: &NodeId) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("Test: operation failed")
                .iter()
                .filter(|entry| &entry.nodeid == nodeid)
                .cloned()
                .collect()
        }

        /// Get a clone of the current aggregation statistics
        pub fn stats(&self) -> AggregationStats {
            self.stats.read().expect("Operation failed").clone()
        }

        /// Clear all entries and reset statistics to defaults
        pub fn clear(&self) {
            self.entries.write().expect("Operation failed").clear();
            *self.stats.write().expect("Operation failed") = AggregationStats::default();
        }
    }
819
    /// Adaptive rate limiter for high-frequency logging
    ///
    /// Tracks the message rate over a sliding window and adjusts its own
    /// ceiling between `min_rate` and `max_rate_absolute` (see `adapt_rate`).
    ///
    /// NOTE(review): the `Arc` wrappers around individual fields look
    /// unnecessary if the limiter itself is shared behind an `Arc` (as
    /// `DistributedLogger` does) — confirm before simplifying.
    pub struct AdaptiveRateLimiter {
        max_rate: Arc<Mutex<f64>>, // Maximum messages per second
        /// Rate measured over the last completed window
        current_rate: Arc<Mutex<f64>>,
        /// Start of the current measurement window
        last_reset: Arc<Mutex<Instant>>,
        /// Messages observed so far in the current window
        message_count: Arc<AtomicUsize>,
        /// Length of the measurement window
        window_duration: Duration,
        /// Fractional step used when adapting `max_rate`
        adaptation_factor: f64,
        /// Lower clamp for `max_rate` (10% of the initial rate)
        min_rate: f64,
        /// Upper clamp for `max_rate` (10x the initial rate)
        max_rate_absolute: f64,
    }
831
    impl AdaptiveRateLimiter {
        /// Create a new adaptive rate limiter
        ///
        /// `initial_max_rate` seeds the ceiling; adaptation is clamped to
        /// [10%, 10x] of this initial value.
        pub fn new(
            initial_max_rate: f64,
            window_duration: Duration,
            adaptation_factor: f64,
        ) -> Self {
            Self {
                max_rate: Arc::new(Mutex::new(initial_max_rate)),
                current_rate: Arc::new(Mutex::new(0.0)),
                last_reset: Arc::new(Mutex::new(Instant::now())),
                message_count: Arc::new(AtomicUsize::new(0)),
                window_duration,
                adaptation_factor,
                min_rate: initial_max_rate * 0.1, // 10% of initial rate
                max_rate_absolute: initial_max_rate * 10.0, // 10x initial rate
            }
        }

        /// Check if a message should be allowed through
        ///
        /// Counts the attempt, then either rolls over the measurement window
        /// (always allowing the boundary message and adapting the ceiling) or
        /// compares the in-window rate against the current ceiling.
        pub fn try_acquire(&self) -> bool {
            let now = Instant::now();
            // fetch_add returns the PREVIOUS value: `count` is the number of
            // messages seen before this one in the current window.
            let count = self.message_count.fetch_add(1, Ordering::Relaxed);

            let mut last_reset = self.last_reset.lock().expect("Operation failed");
            let elapsed = now.duration_since(*last_reset);

            if elapsed >= self.window_duration {
                // Reset window and update current rate
                let actual_rate = count as f64 / elapsed.as_secs_f64();
                {
                    let mut current_rate = self.current_rate.lock().expect("Operation failed");
                    *current_rate = actual_rate;
                }

                self.message_count.store(0, Ordering::Relaxed);
                *last_reset = now;

                // Adapt max rate based on actual usage
                self.adapt_rate(actual_rate);

                true // Allow message at window boundary
            } else {
                // Check if current rate exceeds limit
                let elapsed_secs = elapsed.as_secs_f64();
                if elapsed_secs < 0.001 {
                    // For very short durations, allow the message
                    // (avoids a huge spurious rate from a near-zero divisor)
                    true
                } else {
                    let current_rate = count as f64 / elapsed_secs;
                    let max_rate = *self.max_rate.lock().expect("Operation failed");
                    current_rate <= max_rate
                }
            }
        }

        /// Adapt the maximum rate based on observed patterns
        ///
        /// Shrinks the ceiling when traffic stays below half of it, grows it
        /// when traffic reaches 90% of it; result is always clamped to
        /// [`min_rate`, `max_rate_absolute`].
        fn adapt_rate(&self, actualrate: f64) {
            let mut max_rate = self.max_rate.lock().expect("Operation failed");

            // If actual rate is consistently lower, reduce max rate
            // If actual rate hits the limit, increase max rate
            if actualrate < *max_rate * 0.5 {
                // Reduce max rate
                *max_rate = (*max_rate * (1.0 - self.adaptation_factor)).max(self.min_rate);
            } else if actualrate >= *max_rate * 0.9 {
                // Increase max rate
                *max_rate =
                    (*max_rate * (1.0 + self.adaptation_factor)).min(self.max_rate_absolute);
            }
        }

        /// Get current rate statistics (a snapshot; values may be mid-window)
        pub fn get_stats(&self) -> RateLimitStats {
            let current_rate = *self.current_rate.lock().expect("Operation failed");
            let max_rate = *self.max_rate.lock().expect("Operation failed");
            RateLimitStats {
                current_rate,
                max_rate,
                message_count: self.message_count.load(Ordering::Relaxed),
                window_duration: self.window_duration,
            }
        }

        /// Reset the rate limiter
        ///
        /// Clears the measured rate and restarts the window; the adapted
        /// `max_rate` ceiling is intentionally left unchanged.
        pub fn reset(&self) {
            *self.current_rate.lock().expect("Operation failed") = 0.0;
            *self.last_reset.lock().expect("Operation failed") = Instant::now();
            self.message_count.store(0, Ordering::Relaxed);
        }
    }
923
    /// Rate limiting statistics
    ///
    /// Snapshot returned by `AdaptiveRateLimiter::get_stats`.
    #[derive(Debug, Clone)]
    pub struct RateLimitStats {
        /// Rate (messages/second) measured over the last completed window
        pub current_rate: f64,
        /// Current adaptive ceiling (messages/second)
        pub max_rate: f64,
        /// Messages observed so far in the in-progress window
        pub message_count: usize,
        /// Length of the measurement window
        pub window_duration: Duration,
    }
932
    /// Distributed logger that coordinates with multiple nodes
    pub struct DistributedLogger {
        /// Identity of this node; embedded into every exported log entry.
        #[allow(dead_code)]
        nodeid: NodeId,
        /// Logger used for node-local output.
        locallogger: Logger,
        /// Collects entries locally for later aggregation/export.
        aggregator: Arc<LogAggregator>,
        /// One adaptive rate limiter per logger key (keyed by module name).
        rate_limiters: Arc<RwLock<HashMap<String, AdaptiveRateLimiter>>>,
        /// Initial messages-per-second budget for newly created rate limiters.
        default_rate_limit: f64,
    }
942
943    impl DistributedLogger {
944        /// Create a new distributed logger
945        pub fn new(
946            logger_name: &str,
947            nodeid: NodeId,
948            max_entries: usize,
949            aggregation_window: Duration,
950            default_rate_limit: f64,
951        ) -> Self {
952            let locallogger = Logger::new(logger_name);
953            let aggregator = Arc::new(LogAggregator::new(
954                nodeid.clone(),
955                max_entries,
956                aggregation_window,
957            ));
958
959            Self {
960                nodeid,
961                locallogger,
962                aggregator,
963                rate_limiters: Arc::new(RwLock::new(HashMap::new())),
964                default_rate_limit,
965            }
966        }
967
968        /// Log a message with adaptive rate limiting
969        pub fn log_adaptive(
970            &self,
971            level: LogLevel,
972            message: &str,
973            context: Option<HashMap<String, String>>,
974        ) {
975            let logger_key = self.locallogger.module.clone();
976
977            // Get or create rate limiter for this logger
978            let shouldlog = {
979                let rate_limiters = self.rate_limiters.read().expect("Operation failed");
980                if let Some(limiter) = rate_limiters.get(&logger_key) {
981                    limiter.try_acquire()
982                } else {
983                    drop(rate_limiters);
984
985                    // Create new rate limiter
986                    let mut rate_limiters = self.rate_limiters.write().expect("Operation failed");
987                    let limiter = AdaptiveRateLimiter::new(
988                        self.default_rate_limit,
989                        Duration::from_secs(1),
990                        0.1, // 10% adaptation factor
991                    );
992                    let shouldlog = limiter.try_acquire();
993                    rate_limiters.insert(logger_key, limiter);
994                    shouldlog
995                }
996            };
997
998            if shouldlog {
999                // Log locally
1000                self.locallogger.writelog(level, message);
1001
1002                // Create distributed log entry
1003                let entry = DistributedLogEntry::new(
1004                    self.nodeid.clone(),
1005                    level,
1006                    self.locallogger.module.clone(),
1007                    message.to_string(),
1008                    context.unwrap_or_default(),
1009                );
1010
1011                // Add to aggregator
1012                self.aggregator.add_entry(entry);
1013            }
1014        }
1015
1016        /// Convenience methods for different log levels
1017        pub fn error_adaptive(&self, message: &str) {
1018            self.log_adaptive(LogLevel::Error, message, None);
1019        }
1020
1021        pub fn warn_adaptive(&self, message: &str) {
1022            self.log_adaptive(LogLevel::Warn, message, None);
1023        }
1024
1025        pub fn info_adaptive(&self, message: &str) {
1026            self.log_adaptive(LogLevel::Info, message, None);
1027        }
1028
1029        pub fn debug_adaptive(&self, message: &str) {
1030            self.log_adaptive(LogLevel::Debug, message, None);
1031        }
1032
1033        /// Get aggregated log entries
1034        pub fn get_aggregatedlogs(&self) -> Vec<DistributedLogEntry> {
1035            self.aggregator.get_entries()
1036        }
1037
1038        /// Get rate limiting statistics for all loggers
1039        pub fn get_rate_stats(&self) -> HashMap<String, RateLimitStats> {
1040            self.rate_limiters
1041                .read()
1042                .expect("Test: operation failed")
1043                .iter()
1044                .map(|(k, v)| (k.clone(), v.get_stats()))
1045                .collect()
1046        }
1047
1048        /// Get aggregation statistics
1049        pub fn get_aggregation_stats(&self) -> AggregationStats {
1050            self.aggregator.stats()
1051        }
1052
1053        /// Export logs to JSON format
1054        pub fn exportlogs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
1055            let entries = self.get_aggregatedlogs();
1056            let stats = self.get_aggregation_stats();
1057
1058            let export_data = serde_json::json!({
1059                "nodeid": self.nodeid.to_string(),
1060                "timestamp": SystemTime::now()
1061                    .duration_since(UNIX_EPOCH)
1062                    .expect("Test: operation failed")
1063                    .as_millis(),
1064                "stats": {
1065                    "total_entries": stats.total_entries,
1066                    "dropped_entries": stats.dropped_entries,
1067                    "aggregation_windows": stats.aggregation_windows
1068                },
1069                "entries": entries.iter().map(|entry| serde_json::json!({
1070                    "id": entry.id,
1071                    "nodeid": entry.nodeid.to_string(),
1072                    "timestamp": entry.timestamp,
1073                    "level": format!("{0:?}", entry.level),
1074                    "logger": entry.logger,
1075                    "message": entry.message,
1076                    "context": entry.context,
1077                    "sequence": entry.sequence
1078                })).collect::<Vec<_>>()
1079            });
1080
1081            Ok(serde_json::to_string_pretty(&export_data)?)
1082        }
1083
1084        /// Clear all aggregated data
1085        pub fn clear_aggregated_data(&self) {
1086            self.aggregator.clear();
1087
1088            // Reset rate limiters
1089            let rate_limiters = self.rate_limiters.write().expect("Operation failed");
1090            for limiter in rate_limiters.values() {
1091                limiter.reset();
1092            }
1093        }
1094    }
1095
    /// Multi-node log coordinator for distributed systems
    pub struct MultiNodeCoordinator {
        /// Registered per-node loggers, keyed by node identity.
        nodes: Arc<RwLock<HashMap<NodeId, Arc<DistributedLogger>>>>,
        /// Aggregator that merges entries collected from all registered nodes.
        global_aggregator: Arc<LogAggregator>,
        /// How often the background thread polls the registered nodes.
        coordination_interval: Duration,
        running: Arc<AtomicUsize>, // 0 = stopped, 1 = running
    }
1103
1104    impl MultiNodeCoordinator {
1105        /// Create a new multi-node coordinator
1106        pub fn new(coordinationinterval: Duration) -> Self {
1107            let global_node = NodeId::new("global".to_string(), "coordinator".to_string());
1108            let global_aggregator = Arc::new(LogAggregator::new(
1109                global_node,
1110                100000,                    // Large capacity for global aggregation
1111                Duration::from_secs(3600), // 1 hour window
1112            ));
1113
1114            Self {
1115                nodes: Arc::new(RwLock::new(HashMap::new())),
1116                global_aggregator,
1117                coordination_interval: coordinationinterval,
1118                running: Arc::new(AtomicUsize::new(0)),
1119            }
1120        }
1121
1122        /// Register a distributed logger
1123        pub fn register_node(&self, nodeid: NodeId, logger: Arc<DistributedLogger>) {
1124            let mut nodes = self.nodes.write().expect("Operation failed");
1125            nodes.insert(nodeid, logger);
1126        }
1127
1128        /// Unregister a node
1129        pub fn unregister_node(&self, nodeid: &NodeId) {
1130            let mut nodes = self.nodes.write().expect("Operation failed");
1131            nodes.remove(nodeid);
1132        }
1133
1134        /// Start coordination process
1135        pub fn start(&self) {
1136            if self
1137                .running
1138                .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
1139                .is_ok()
1140            {
1141                let nodes = self.nodes.clone();
1142                let global_aggregator = self.global_aggregator.clone();
1143                let interval = self.coordination_interval;
1144                let running = self.running.clone();
1145
1146                thread::spawn(move || {
1147                    while running.load(Ordering::Relaxed) == 1 {
1148                        // Collect logs from all nodes
1149                        let nodes_guard = nodes.read().expect("Operation failed");
1150                        for logger in nodes_guard.values() {
1151                            let entries = logger.get_aggregatedlogs();
1152                            for entry in entries {
1153                                global_aggregator.add_entry(entry);
1154                            }
1155                        }
1156                        drop(nodes_guard);
1157
1158                        thread::sleep(interval);
1159                    }
1160                });
1161            }
1162        }
1163
1164        /// Stop coordination process
1165        pub fn stop(&self) {
1166            self.running.store(0, Ordering::Relaxed);
1167        }
1168
1169        /// Get global aggregated statistics
1170        pub fn get_global_stats(&self) -> AggregationStats {
1171            self.global_aggregator.stats()
1172        }
1173
1174        /// Get all global log entries
1175        pub fn get_global_entries(&self) -> Vec<DistributedLogEntry> {
1176            self.global_aggregator.get_entries()
1177        }
1178
1179        /// Export global logs to JSON
1180        pub fn export_globallogs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
1181            let entries = self.get_global_entries();
1182            let stats = self.get_global_stats();
1183
1184            let export_data = serde_json::json!({
1185                "coordinator": "global",
1186                "timestamp": SystemTime::now()
1187                    .duration_since(UNIX_EPOCH)
1188                    .expect("Test: operation failed")
1189                    .as_millis(),
1190                "stats": {
1191                    "total_entries": stats.total_entries,
1192                    "dropped_entries": stats.dropped_entries,
1193                    "nodes_count": self.nodes.read().expect("Operation failed").len(),
1194                    "entries_by_level": stats.entries_by_level.iter().map(|(k, v)| (format!("{k:?}"), *v)).collect::<HashMap<String, u64>>()
1195                },
1196                "entries": entries.iter().map(|entry| serde_json::json!({
1197                    "id": entry.id,
1198                    "nodeid": entry.nodeid.to_string(),
1199                    "timestamp": entry.timestamp,
1200                    "level": format!("{0:?}", entry.level),
1201                    "logger": entry.logger,
1202                    "message": entry.message,
1203                    "context": entry.context,
1204                    "sequence": entry.sequence
1205                })).collect::<Vec<_>>()
1206            });
1207
1208            Ok(serde_json::to_string_pretty(&export_data)?)
1209        }
1210    }
1211
    impl Drop for MultiNodeCoordinator {
        /// Signal the background coordination thread (if any) to stop.
        /// The thread is detached and not joined here; it exits on its own
        /// after observing the flag, within one coordination interval.
        fn drop(&mut self) {
            self.stop();
        }
    }
1217}
1218
#[cfg(test)]
mod distributed_tests {
    use super::distributed::*;
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_nodeid_creation() {
        // A node id renders as "<name>:<instance_id>".
        let node = NodeId::new("worker1".to_string(), "pid123".to_string());
        assert_eq!(node.name(), "worker1");
        assert_eq!(node.instance_id(), "pid123");
        assert_eq!(node.to_string(), "worker1:pid123");
    }

    #[test]
    fn testlog_aggregator() {
        // A single entry added to the aggregator must be retrievable and
        // reflected in the statistics.
        let node = NodeId::new("test_node".to_string(), 1.to_string());
        let aggregator = LogAggregator::new(node.clone(), 100, Duration::from_secs(60));

        aggregator.add_entry(DistributedLogEntry::new(
            node,
            LogLevel::Info,
            "testlogger".to_string(),
            "Test message".to_string(),
            HashMap::new(),
        ));

        let collected = aggregator.get_entries();
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].message, "Test message");
        assert_eq!(aggregator.stats().total_entries, 1);
    }

    #[test]
    fn test_adaptive_rate_limiter() {
        let limiter = AdaptiveRateLimiter::new(10.0, Duration::from_millis(100), 0.1);

        // The first few acquisitions fit well within the budget.
        assert!(limiter.try_acquire());
        assert!(limiter.try_acquire());

        let snapshot = limiter.get_stats();
        assert!(snapshot.current_rate >= 0.0);
        assert_eq!(snapshot.max_rate, 10.0);
    }

    #[test]
    fn test_distributedlogger() {
        let node = NodeId::new("test_node".to_string(), 1.to_string());
        let logger =
            DistributedLogger::new("testlogger", node, 1000, Duration::from_secs(60), 100.0);

        logger.info_adaptive("Test message 1");
        logger.warn_adaptive("Test message 2");

        // At least one message should pass the rate limiter and reach the
        // aggregator.
        assert!(!logger.get_aggregatedlogs().is_empty());
        assert!(logger.get_aggregation_stats().total_entries >= 1);
    }

    #[test]
    fn test_multi_node_coordinator() {
        let coordinator = MultiNodeCoordinator::new(Duration::from_millis(10));

        let node = NodeId::new("node1".to_string(), "1".to_string());
        let nodelogger = Arc::new(DistributedLogger::new(
            "node1logger",
            node.clone(),
            100,
            Duration::from_secs(10),
            50.0,
        ));
        coordinator.register_node(node, nodelogger);

        // Run the coordination loop briefly, then shut it down.
        coordinator.start();
        std::thread::sleep(Duration::from_millis(50));
        coordinator.stop();

        // Should have basic structure even if no messages were collected.
        // total_entries is u64 so always >= 0; just touch it.
        let _ = coordinator.get_global_stats().total_entries;
    }

    #[test]
    fn testlog_export() {
        let node = NodeId::new("export_test".to_string(), 1.to_string());
        let logger =
            DistributedLogger::new("exportlogger", node, 100, Duration::from_secs(60), 100.0);

        logger.info_adaptive("Export test message");

        // The JSON export must contain both the node id and the message.
        let json_export = logger.exportlogs_json().expect("Operation failed");
        assert!(json_export.contains("export_test"));
        assert!(json_export.contains("Export test message"));
    }
}