use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

pub mod progress;

pub mod rate_limiting;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    Trace = 0,
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
    Critical = 5,
}

impl LogLevel {
    pub const fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Trace => "TRACE",
            LogLevel::Debug => "DEBUG",
            LogLevel::Info => "INFO",
            LogLevel::Warn => "WARN",
            LogLevel::Error => "ERROR",
            LogLevel::Critical => "CRITICAL",
        }
    }
}

#[derive(Debug, Clone)]
pub struct LogEntry {
    pub timestamp: std::time::SystemTime,
    pub level: LogLevel,
    pub module: String,
    pub message: String,
    pub fields: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub struct LoggerConfig {
    pub min_level: LogLevel,
    pub show_timestamps: bool,
    pub show_modules: bool,
    pub module_levels: HashMap<String, LogLevel>,
}

impl Default for LoggerConfig {
    fn default() -> Self {
        Self {
            min_level: LogLevel::Info,
            show_timestamps: true,
            show_modules: true,
            module_levels: HashMap::new(),
        }
    }
}
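
// A minimal configuration sketch (assuming this module is mounted at
// `crate::logging`, as the `get_logger!` macro below expects); the module
// name is made up for illustration:
//
//     use crate::logging::{configure_logger, LoggerConfig, LogLevel};
//
//     let mut config = LoggerConfig::default();
//     config.min_level = LogLevel::Debug;
//     // Per-module overrides beat the global minimum level.
//     config.module_levels.insert("my_app::network".to_string(), LogLevel::Trace);
//     configure_logger(config);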

static LOGGER_CONFIG: Lazy<Mutex<LoggerConfig>> = Lazy::new(|| Mutex::new(LoggerConfig::default()));

#[allow(dead_code)]
pub fn configure_logger(config: LoggerConfig) {
    let mut global_config = LOGGER_CONFIG.lock().expect("logger config lock poisoned");
    *global_config = config;
}

#[allow(dead_code)]
pub fn set_level(level: LogLevel) {
    let mut config = LOGGER_CONFIG.lock().expect("logger config lock poisoned");
    config.min_level = level;
}

#[allow(dead_code)]
pub fn set_module_level(module: &str, level: LogLevel) {
    let mut config = LOGGER_CONFIG.lock().expect("logger config lock poisoned");
    config.module_levels.insert(module.to_string(), level);
}

pub trait LogHandler: Send + Sync {
    fn handle(&self, entry: &LogEntry);
}

pub struct ConsoleLogHandler {
    pub format: String,
}

impl Default for ConsoleLogHandler {
    fn default() -> Self {
        Self {
            format: "[{level}] {module}: {message}".to_string(),
        }
    }
}

impl LogHandler for ConsoleLogHandler {
    fn handle(&self, entry: &LogEntry) {
        let output = format_entry(&self.format, entry);

        // Errors go to stderr so they stay visible when stdout is redirected.
        match entry.level {
            LogLevel::Error | LogLevel::Critical => eprintln!("{output}"),
            _ => println!("{output}"),
        }
    }
}

/// Expands the `{level}`, `{module}`, `{message}`, `{timestamp}`, and
/// `{fields}` placeholders of a format string for the given entry.
fn format_entry(format: &str, entry: &LogEntry) -> String {
    let mut output = format.to_string();

    output = output.replace("{level}", entry.level.as_str());
    output = output.replace("{module}", &entry.module);
    output = output.replace("{message}", &entry.message);

    if format.contains("{timestamp}") {
        let datetime = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
        output = output.replace(
            "{timestamp}",
            &datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
        );
    }

    if format.contains("{fields}") {
        let fields_str = entry
            .fields
            .iter()
            .map(|(k, v)| format!("{k}={v}"))
            .collect::<Vec<_>>()
            .join(", ");
        output = output.replace("{fields}", &fields_str);
    }

    output
}

pub struct FileLogHandler {
    pub file_path: String,
    pub format: String,
}

impl LogHandler for FileLogHandler {
    fn handle(&self, entry: &LogEntry) {
        let output = format_entry(&self.format, entry);

        // Append to the log file, creating it on first use; write errors are
        // ignored so that logging never crashes the caller.
        if let Ok(mut file) = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.file_path)
        {
            use std::io::Write;
            let _ = writeln!(file, "{output}");
        }
    }
}

static LOG_HANDLERS: Lazy<Mutex<Vec<Arc<dyn LogHandler>>>> = Lazy::new(|| {
    let console_handler = Arc::new(ConsoleLogHandler::default());
    Mutex::new(vec![console_handler])
});

/// Appends a handler to the handler list; existing handlers keep running.
#[allow(dead_code)]
pub fn add_handler(handler: Arc<dyn LogHandler>) {
    let mut handlers = LOG_HANDLERS.lock().expect("log handler lock poisoned");
    handlers.push(handler);
}

#[allow(dead_code)]
pub fn clear_log_handlers() {
    let mut handlers = LOG_HANDLERS.lock().expect("log handler lock poisoned");
    handlers.clear();
}

/// Restores the default console-only handler setup.
#[allow(dead_code)]
pub fn reset_log_handlers() {
    let mut handlers = LOG_HANDLERS.lock().expect("log handler lock poisoned");
    handlers.clear();
    handlers.push(Arc::new(ConsoleLogHandler::default()));
}
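
// A sketch of adding file output alongside the console (the path and format
// string are illustrative; any writable location works):
//
//     use std::sync::Arc;
//
//     clear_log_handlers();
//     add_handler(Arc::new(ConsoleLogHandler::default()));
//     add_handler(Arc::new(FileLogHandler {
//         file_path: "/tmp/app.log".to_string(),
//         format: "{timestamp} [{level}] {module}: {message} {fields}".to_string(),
//     }));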

#[derive(Clone)]
pub struct Logger {
    module: String,
    fields: HashMap<String, String>,
}

impl Logger {
    pub fn new(module: &str) -> Self {
        Self {
            module: module.to_string(),
            fields: HashMap::new(),
        }
    }

    pub fn with_field<K, V>(mut self, key: K, value: V) -> Self
    where
        K: Into<String>,
        V: Display,
    {
        self.fields.insert(key.into(), format!("{value}"));
        self
    }

    pub fn with_fields<K, V, I>(mut self, fields: I) -> Self
    where
        K: Into<String>,
        V: Display,
        I: IntoIterator<Item = (K, V)>,
    {
        for (key, value) in fields {
            self.fields.insert(key.into(), format!("{value}"));
        }
        self
    }

    pub fn write_log(&self, level: LogLevel, message: &str) {
        let config = LOGGER_CONFIG.lock().expect("logger config lock poisoned");
        // A per-module override takes precedence over the global minimum level.
        let module_level = config
            .module_levels
            .get(&self.module)
            .copied()
            .unwrap_or(config.min_level);

        if level < module_level {
            return;
        }
        drop(config); // Release the config lock before invoking handlers.

        let entry = LogEntry {
            timestamp: std::time::SystemTime::now(),
            level,
            module: self.module.clone(),
            message: message.to_string(),
            fields: self.fields.clone(),
        };

        let handlers = LOG_HANDLERS.lock().expect("log handler lock poisoned");
        for handler in handlers.iter() {
            handler.handle(&entry);
        }
    }

    pub fn trace(&self, message: &str) {
        self.write_log(LogLevel::Trace, message);
    }

    pub fn debug(&self, message: &str) {
        self.write_log(LogLevel::Debug, message);
    }

    pub fn info(&self, message: &str) {
        self.write_log(LogLevel::Info, message);
    }

    pub fn warn(&self, message: &str) {
        self.write_log(LogLevel::Warn, message);
    }

    pub fn error(&self, message: &str) {
        self.write_log(LogLevel::Error, message);
    }

    pub fn critical(&self, message: &str) {
        self.write_log(LogLevel::Critical, message);
    }

    pub fn track_progress(
        &self,
        description: &str,
        total: u64,
    ) -> progress::EnhancedProgressTracker {
        use progress::{ProgressBuilder, ProgressStyle};

        let builder = ProgressBuilder::new(description, total)
            .style(ProgressStyle::DetailedBar)
            .show_statistics(true);

        let mut tracker = builder.build();

        self.info(&format!("Starting progress tracking: {description}"));

        tracker.start();
        tracker
    }

    pub fn info_with_progress(
        &self,
        message: &str,
        progress: &mut progress::EnhancedProgressTracker,
        update: u64,
    ) {
        self.info(message);
        progress.update(update);
    }

    pub fn with_progress<F, R>(&self, description: &str, total: u64, operation: F) -> R
    where
        F: FnOnce(&mut progress::EnhancedProgressTracker) -> R,
    {
        let mut progress = self.track_progress(description, total);
        let result = operation(&mut progress);
        progress.finish();

        let stats = progress.stats();
        self.info(&format!(
            "Completed progress tracking: {description} - {elapsed:.1}s elapsed",
            elapsed = stats.elapsed.as_secs_f64()
        ));

        result
    }
}
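
// Typical structured-logging usage, sketched in comments (`job_id` and the
// module name are made up for illustration):
//
//     let logger = Logger::new("my_app::worker").with_field("job_id", 42);
//     logger.info("job started");
//     logger.with_progress("indexing", 1_000, |progress| {
//         for i in 0..1_000u64 {
//             // ... do one unit of work ...
//             progress.update(i + 1);
//         }
//     });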

pub struct ProgressTracker {
    name: String,
    total: usize,
    current: usize,
    start_time: Instant,
    last_update: Instant,
    update_interval: Duration,
    logger: Logger,
}

impl ProgressTracker {
    pub fn new(name: &str, total: usize) -> Self {
        let now = Instant::now();
        let logger = Logger::new("progress").with_field("operation", name);

        logger.info(&format!("Starting operation: {name}"));

        Self {
            name: name.to_string(),
            total,
            current: 0,
            start_time: now,
            last_update: now,
            update_interval: Duration::from_millis(500),
            logger,
        }
    }

    pub fn set_update_interval(&mut self, interval: Duration) {
        self.update_interval = interval;
    }

    pub fn update(&mut self, current: usize) {
        self.current = current;

        let now = Instant::now();

        // Throttle progress output so hot loops do not flood the log.
        if now.duration_since(self.last_update) >= self.update_interval {
            self.last_update = now;

            let elapsed = now.duration_since(self.start_time);
            let percent = (self.current as f64 / self.total as f64) * 100.0;

            let eta = if self.current > 0 {
                let time_per_item = elapsed.as_secs_f64() / self.current as f64;
                let remaining = time_per_item * (self.total - self.current) as f64;
                format!("ETA: {remaining:.1}s")
            } else {
                "ETA: calculating...".to_string()
            };

            self.logger.debug(&format!(
                "{name}: {current}/{total} ({percent:.1}%) - Elapsed: {elapsed:.1}s - {eta}",
                name = self.name,
                current = self.current,
                total = self.total,
                elapsed = elapsed.as_secs_f64()
            ));
        }
    }

    pub fn complete(&mut self) {
        let elapsed = self.start_time.elapsed();
        self.current = self.total;

        self.logger.info(&format!(
            "{name} completed: {total}/{total} (100%) - Total time: {elapsed:.1}s",
            name = self.name,
            total = self.total,
            elapsed = elapsed.as_secs_f64()
        ));
    }

    pub fn progress_percent(&self) -> f64 {
        (self.current as f64 / self.total as f64) * 100.0
    }

    pub fn elapsed(&self) -> Duration {
        self.start_time.elapsed()
    }

    pub fn eta(&self) -> Option<Duration> {
        if self.current == 0 {
            return None;
        }

        let elapsed = self.start_time.elapsed();
        let time_per_item = elapsed.as_secs_f64() / self.current as f64;
        let remaining_secs = time_per_item * (self.total - self.current) as f64;

        Some(Duration::from_secs_f64(remaining_secs))
    }
}
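
// The ETA above is the simple linear estimate
// `remaining = elapsed / current * (total - current)`. A usage sketch
// (item counts and the interval are illustrative):
//
//     let mut tracker = ProgressTracker::new("resample", 10_000);
//     tracker.set_update_interval(Duration::from_millis(250));
//     for i in 0..10_000 {
//         // ... process item i ...
//         tracker.update(i + 1);
//     }
//     tracker.complete();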

#[allow(dead_code)]
pub fn init() {
    let handlers = LOG_HANDLERS.lock().expect("log handler lock poisoned");
    if handlers.is_empty() {
        // Drop the guard before reacquiring the lock inside the reset call.
        drop(handlers);
        reset_log_handlers();
    }
}

#[macro_export]
macro_rules! get_logger {
    () => {
        $crate::logging::Logger::new(module_path!())
    };
    ($name:expr) => {
        $crate::logging::Logger::new($name)
    };
}
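
// Macro usage sketch; with no argument the logger is named after the calling
// module via `module_path!()` (the names shown are illustrative):
//
//     let log = get_logger!();              // named after the current module
//     let db_log = get_logger!("database"); // explicit name
//     log.debug("module-scoped logger ready");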

pub mod distributed {
    use super::*;
    use std::collections::{HashMap, VecDeque};
    use std::fmt;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex, RwLock};
    use std::thread;
    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct NodeId {
        name: String,
        instance_id: String,
    }

    impl NodeId {
        pub fn new(name: String, instance_id: String) -> Self {
            Self { name, instance_id }
        }

        pub fn from_hostname() -> Self {
            let hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
            let pid = std::process::id();
            Self::new(hostname, pid.to_string())
        }

        pub fn name(&self) -> &str {
            &self.name
        }

        pub fn instance_id(&self) -> &str {
            &self.instance_id
        }
    }

    impl fmt::Display for NodeId {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{}:{}", self.name, self.instance_id)
        }
    }

    #[derive(Debug, Clone)]
    pub struct DistributedLogEntry {
        pub id: u64,
        pub node_id: NodeId,
        pub timestamp: u64,
        pub level: LogLevel,
        pub logger: String,
        pub message: String,
        pub context: HashMap<String, String>,
        pub sequence: u64,
    }

    impl DistributedLogEntry {
        pub fn new(
            node_id: NodeId,
            level: LogLevel,
            logger: String,
            message: String,
            context: HashMap<String, String>,
        ) -> Self {
            // Process-wide counters give every entry a unique id and a
            // monotonically increasing sequence number.
            static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
            static SEQ_COUNTER: AtomicU64 = AtomicU64::new(1);

            Self {
                id: ID_COUNTER.fetch_add(1, Ordering::Relaxed),
                node_id,
                timestamp: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system clock before UNIX epoch")
                    .as_millis() as u64,
                level,
                logger,
                message,
                context,
                sequence: SEQ_COUNTER.fetch_add(1, Ordering::Relaxed),
            }
        }

        pub fn age(&self) -> Duration {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system clock before UNIX epoch")
                .as_millis() as u64;
            Duration::from_millis(now.saturating_sub(self.timestamp))
        }
    }

    pub struct LogAggregator {
        #[allow(dead_code)]
        node_id: NodeId,
        entries: Arc<RwLock<VecDeque<DistributedLogEntry>>>,
        max_entries: usize,
        aggregation_window: Duration,
        stats: Arc<RwLock<AggregationStats>>,
    }

    #[derive(Debug, Clone, Default)]
    pub struct AggregationStats {
        pub total_entries: u64,
        pub entries_by_level: HashMap<LogLevel, u64>,
        pub entries_by_node: HashMap<NodeId, u64>,
        pub dropped_entries: u64,
        pub aggregation_windows: u64,
    }

    /// Removes one evicted entry from the per-level and per-node counts.
    fn decrement_counts(stats: &mut AggregationStats, removed: &DistributedLogEntry) {
        if let Some(count) = stats.entries_by_level.get_mut(&removed.level) {
            *count = count.saturating_sub(1);
        }
        if let Some(count) = stats.entries_by_node.get_mut(&removed.node_id) {
            *count = count.saturating_sub(1);
        }
    }

    impl LogAggregator {
        pub fn new(node_id: NodeId, max_entries: usize, aggregation_window: Duration) -> Self {
            Self {
                node_id,
                entries: Arc::new(RwLock::new(VecDeque::new())),
                max_entries,
                aggregation_window,
                stats: Arc::new(RwLock::new(AggregationStats::default())),
            }
        }

        pub fn add_entry(&self, entry: DistributedLogEntry) {
            let mut entries = self.entries.write().expect("aggregator entries lock poisoned");
            let mut stats = self.stats.write().expect("aggregator stats lock poisoned");

            // Evict entries that have aged out of the aggregation window,
            // measured against the newest entry's timestamp.
            let cutoff = entry
                .timestamp
                .saturating_sub(self.aggregation_window.as_millis() as u64);
            while let Some(front) = entries.front() {
                if front.timestamp >= cutoff {
                    break;
                }
                let removed = entries.pop_front().expect("front was just checked");
                decrement_counts(&mut stats, &removed);
            }

            // Evict the oldest entry if the buffer is at capacity.
            if entries.len() >= self.max_entries {
                if let Some(removed) = entries.pop_front() {
                    stats.dropped_entries += 1;
                    decrement_counts(&mut stats, &removed);
                }
            }

            stats.total_entries += 1;
            *stats.entries_by_level.entry(entry.level).or_insert(0) += 1;
            *stats
                .entries_by_node
                .entry(entry.node_id.clone())
                .or_insert(0) += 1;

            entries.push_back(entry);
        }

        pub fn get_entries(&self) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("aggregator entries lock poisoned")
                .iter()
                .cloned()
                .collect()
        }

        pub fn get_entries_by_level(&self, level: LogLevel) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("aggregator entries lock poisoned")
                .iter()
                .filter(|entry| entry.level == level)
                .cloned()
                .collect()
        }

        pub fn get_entries_by_node(&self, node_id: &NodeId) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("aggregator entries lock poisoned")
                .iter()
                .filter(|entry| &entry.node_id == node_id)
                .cloned()
                .collect()
        }

        pub fn stats(&self) -> AggregationStats {
            self.stats.read().expect("aggregator stats lock poisoned").clone()
        }

        pub fn clear(&self) {
            self.entries.write().expect("aggregator entries lock poisoned").clear();
            *self.stats.write().expect("aggregator stats lock poisoned") = AggregationStats::default();
        }
    }
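
    // Aggregator behavior in brief: entries older than the window are evicted
    // relative to the newest entry's timestamp, and the buffer is capped at
    // `max_entries`. A small sketch (names and sizes are illustrative):
    //
    //     let node = NodeId::new("worker".to_string(), "1".to_string());
    //     let agg = LogAggregator::new(node.clone(), 1_000, Duration::from_secs(60));
    //     agg.add_entry(DistributedLogEntry::new(
    //         node, LogLevel::Info, "ingest".to_string(),
    //         "batch done".to_string(), HashMap::new(),
    //     ));
    //     assert_eq!(agg.stats().total_entries, 1);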

    pub struct AdaptiveRateLimiter {
        max_rate: Arc<Mutex<f64>>,
        current_rate: Arc<Mutex<f64>>,
        last_reset: Arc<Mutex<Instant>>,
        message_count: Arc<AtomicUsize>,
        window_duration: Duration,
        adaptation_factor: f64,
        min_rate: f64,
        max_rate_absolute: f64,
    }

    impl AdaptiveRateLimiter {
        pub fn new(
            initial_max_rate: f64,
            window_duration: Duration,
            adaptation_factor: f64,
        ) -> Self {
            Self {
                max_rate: Arc::new(Mutex::new(initial_max_rate)),
                current_rate: Arc::new(Mutex::new(0.0)),
                last_reset: Arc::new(Mutex::new(Instant::now())),
                message_count: Arc::new(AtomicUsize::new(0)),
                window_duration,
                adaptation_factor,
                // The adaptive limit may drift within one order of magnitude
                // of the initial rate in either direction.
                min_rate: initial_max_rate * 0.1,
                max_rate_absolute: initial_max_rate * 10.0,
            }
        }

        pub fn try_acquire(&self) -> bool {
            let now = Instant::now();
            // `fetch_add` returns the previous value, so add one to count
            // this message as well.
            let count = self.message_count.fetch_add(1, Ordering::Relaxed) + 1;

            let mut last_reset = self.last_reset.lock().expect("rate limiter lock poisoned");
            let elapsed = now.duration_since(*last_reset);

            if elapsed >= self.window_duration {
                // The window has rolled over: record the observed rate, reset
                // the counter, and adapt the limit toward the actual traffic.
                let actual_rate = count as f64 / elapsed.as_secs_f64();
                {
                    let mut current_rate = self.current_rate.lock().expect("rate limiter lock poisoned");
                    *current_rate = actual_rate;
                }

                self.message_count.store(0, Ordering::Relaxed);
                *last_reset = now;

                self.adapt_rate(actual_rate);

                true
            } else {
                let elapsed_secs = elapsed.as_secs_f64();
                if elapsed_secs < 0.001 {
                    // Too little time has passed to estimate a rate.
                    true
                } else {
                    let current_rate = count as f64 / elapsed_secs;
                    let max_rate = *self.max_rate.lock().expect("rate limiter lock poisoned");
                    current_rate <= max_rate
                }
            }
        }

        fn adapt_rate(&self, actual_rate: f64) {
            let mut max_rate = self.max_rate.lock().expect("rate limiter lock poisoned");

            if actual_rate < *max_rate * 0.5 {
                // Traffic is far below the ceiling: tighten the limit.
                *max_rate = (*max_rate * (1.0 - self.adaptation_factor)).max(self.min_rate);
            } else if actual_rate >= *max_rate * 0.9 {
                // Traffic is pressing against the ceiling: loosen the limit.
                *max_rate =
                    (*max_rate * (1.0 + self.adaptation_factor)).min(self.max_rate_absolute);
            }
        }

        pub fn get_stats(&self) -> RateLimitStats {
            let current_rate = *self.current_rate.lock().expect("rate limiter lock poisoned");
            let max_rate = *self.max_rate.lock().expect("rate limiter lock poisoned");
            RateLimitStats {
                current_rate,
                max_rate,
                message_count: self.message_count.load(Ordering::Relaxed),
                window_duration: self.window_duration,
            }
        }

        pub fn reset(&self) {
            *self.current_rate.lock().expect("rate limiter lock poisoned") = 0.0;
            *self.last_reset.lock().expect("rate limiter lock poisoned") = Instant::now();
            self.message_count.store(0, Ordering::Relaxed);
        }
    }
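
    // With `adaptation_factor = 0.1`, each completed window nudges the ceiling
    // by 10%: a quiet window (actual < 0.5 * max) shrinks it, a busy window
    // (actual >= 0.9 * max) grows it, clamped to [0.1x, 10x] of the initial
    // rate. Usage sketch (the numbers are illustrative):
    //
    //     let limiter = AdaptiveRateLimiter::new(100.0, Duration::from_secs(1), 0.1);
    //     if limiter.try_acquire() {
    //         // emit the log message
    //     } // else: over the current ceiling, drop or buffer it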

    #[derive(Debug, Clone)]
    pub struct RateLimitStats {
        pub current_rate: f64,
        pub max_rate: f64,
        pub message_count: usize,
        pub window_duration: Duration,
    }

    pub struct DistributedLogger {
        node_id: NodeId,
        local_logger: Logger,
        aggregator: Arc<LogAggregator>,
        rate_limiters: Arc<RwLock<HashMap<String, AdaptiveRateLimiter>>>,
        default_rate_limit: f64,
    }

    impl DistributedLogger {
        pub fn new(
            logger_name: &str,
            node_id: NodeId,
            max_entries: usize,
            aggregation_window: Duration,
            default_rate_limit: f64,
        ) -> Self {
            let local_logger = Logger::new(logger_name);
            let aggregator = Arc::new(LogAggregator::new(
                node_id.clone(),
                max_entries,
                aggregation_window,
            ));

            Self {
                node_id,
                local_logger,
                aggregator,
                rate_limiters: Arc::new(RwLock::new(HashMap::new())),
                default_rate_limit,
            }
        }

        pub fn log_adaptive(
            &self,
            level: LogLevel,
            message: &str,
            context: Option<HashMap<String, String>>,
        ) {
            let logger_key = self.local_logger.module.clone();

            // Fast path: reuse an existing limiter under the read lock;
            // otherwise upgrade to the write lock and create one.
            let should_log = {
                let rate_limiters = self.rate_limiters.read().expect("rate limiter map lock poisoned");
                if let Some(limiter) = rate_limiters.get(&logger_key) {
                    limiter.try_acquire()
                } else {
                    drop(rate_limiters);

                    let mut rate_limiters =
                        self.rate_limiters.write().expect("rate limiter map lock poisoned");
                    // `entry` avoids clobbering a limiter that another thread
                    // created between the read and write locks.
                    rate_limiters
                        .entry(logger_key)
                        .or_insert_with(|| {
                            AdaptiveRateLimiter::new(
                                self.default_rate_limit,
                                Duration::from_secs(1),
                                0.1,
                            )
                        })
                        .try_acquire()
                }
            };

            if should_log {
                self.local_logger.write_log(level, message);

                let entry = DistributedLogEntry::new(
                    self.node_id.clone(),
                    level,
                    self.local_logger.module.clone(),
                    message.to_string(),
                    context.unwrap_or_default(),
                );

                self.aggregator.add_entry(entry);
            }
        }

        pub fn error_adaptive(&self, message: &str) {
            self.log_adaptive(LogLevel::Error, message, None);
        }

        pub fn warn_adaptive(&self, message: &str) {
            self.log_adaptive(LogLevel::Warn, message, None);
        }

        pub fn info_adaptive(&self, message: &str) {
            self.log_adaptive(LogLevel::Info, message, None);
        }

        pub fn debug_adaptive(&self, message: &str) {
            self.log_adaptive(LogLevel::Debug, message, None);
        }

        pub fn get_aggregated_logs(&self) -> Vec<DistributedLogEntry> {
            self.aggregator.get_entries()
        }

        pub fn get_rate_stats(&self) -> HashMap<String, RateLimitStats> {
            self.rate_limiters
                .read()
                .expect("rate limiter map lock poisoned")
                .iter()
                .map(|(k, v)| (k.clone(), v.get_stats()))
                .collect()
        }

        pub fn get_aggregation_stats(&self) -> AggregationStats {
            self.aggregator.stats()
        }

        pub fn export_logs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
            let entries = self.get_aggregated_logs();
            let stats = self.get_aggregation_stats();

            let export_data = serde_json::json!({
                "node_id": self.node_id.to_string(),
                "timestamp": SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system clock before UNIX epoch")
                    .as_millis(),
                "stats": {
                    "total_entries": stats.total_entries,
                    "dropped_entries": stats.dropped_entries,
                    "aggregation_windows": stats.aggregation_windows
                },
                "entries": entries.iter().map(|entry| serde_json::json!({
                    "id": entry.id,
                    "node_id": entry.node_id.to_string(),
                    "timestamp": entry.timestamp,
                    "level": format!("{:?}", entry.level),
                    "logger": entry.logger,
                    "message": entry.message,
                    "context": entry.context,
                    "sequence": entry.sequence
                })).collect::<Vec<_>>()
            });

            Ok(serde_json::to_string_pretty(&export_data)?)
        }

        pub fn clear_aggregated_data(&self) {
            self.aggregator.clear();

            let rate_limiters = self.rate_limiters.read().expect("rate limiter map lock poisoned");
            for limiter in rate_limiters.values() {
                limiter.reset();
            }
        }
    }
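
    // End-to-end sketch of the adaptive path (rate limit and window values
    // are illustrative):
    //
    //     let node = NodeId::from_hostname();
    //     let logger = DistributedLogger::new(
    //         "ingest", node, 10_000, Duration::from_secs(300), 100.0,
    //     );
    //     logger.info_adaptive("pipeline started");
    //     println!("{}", logger.export_logs_json().unwrap());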

    pub struct MultiNodeCoordinator {
        nodes: Arc<RwLock<HashMap<NodeId, Arc<DistributedLogger>>>>,
        global_aggregator: Arc<LogAggregator>,
        coordination_interval: Duration,
        // 0 = stopped, 1 = running; shared with the background thread.
        running: Arc<AtomicUsize>,
    }

    impl MultiNodeCoordinator {
        pub fn new(coordination_interval: Duration) -> Self {
            let global_node = NodeId::new("global".to_string(), "coordinator".to_string());
            let global_aggregator = Arc::new(LogAggregator::new(
                global_node,
                100_000,
                Duration::from_secs(3600),
            ));

            Self {
                nodes: Arc::new(RwLock::new(HashMap::new())),
                global_aggregator,
                coordination_interval,
                running: Arc::new(AtomicUsize::new(0)),
            }
        }

        pub fn register_node(&self, node_id: NodeId, logger: Arc<DistributedLogger>) {
            let mut nodes = self.nodes.write().expect("node map lock poisoned");
            nodes.insert(node_id, logger);
        }

        pub fn unregister_node(&self, node_id: &NodeId) {
            let mut nodes = self.nodes.write().expect("node map lock poisoned");
            nodes.remove(node_id);
        }

        pub fn start(&self) {
            // Spawn the polling thread only on the stopped-to-running
            // transition.
            if self
                .running
                .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                let nodes = self.nodes.clone();
                let global_aggregator = self.global_aggregator.clone();
                let interval = self.coordination_interval;
                let running = self.running.clone();

                thread::spawn(move || {
                    // Track the highest sequence already forwarded per node so
                    // entries are not re-aggregated on every polling pass.
                    let mut forwarded: HashMap<NodeId, u64> = HashMap::new();
                    while running.load(Ordering::Relaxed) == 1 {
                        let nodes_guard = nodes.read().expect("node map lock poisoned");
                        for (node_id, logger) in nodes_guard.iter() {
                            let last_seq = forwarded.entry(node_id.clone()).or_insert(0);
                            for entry in logger.get_aggregated_logs() {
                                if entry.sequence > *last_seq {
                                    *last_seq = entry.sequence;
                                    global_aggregator.add_entry(entry);
                                }
                            }
                        }
                        drop(nodes_guard);

                        thread::sleep(interval);
                    }
                });
            }
        }

        pub fn stop(&self) {
            self.running.store(0, Ordering::Relaxed);
        }

        pub fn get_global_stats(&self) -> AggregationStats {
            self.global_aggregator.stats()
        }

        pub fn get_global_entries(&self) -> Vec<DistributedLogEntry> {
            self.global_aggregator.get_entries()
        }

        pub fn export_global_logs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
            let entries = self.get_global_entries();
            let stats = self.get_global_stats();

            let export_data = serde_json::json!({
                "coordinator": "global",
                "timestamp": SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system clock before UNIX epoch")
                    .as_millis(),
                "stats": {
                    "total_entries": stats.total_entries,
                    "dropped_entries": stats.dropped_entries,
                    "nodes_count": self.nodes.read().expect("node map lock poisoned").len(),
                    "entries_by_level": stats.entries_by_level.iter().map(|(k, v)| (format!("{k:?}"), *v)).collect::<HashMap<String, u64>>()
                },
                "entries": entries.iter().map(|entry| serde_json::json!({
                    "id": entry.id,
                    "node_id": entry.node_id.to_string(),
                    "timestamp": entry.timestamp,
                    "level": format!("{:?}", entry.level),
                    "logger": entry.logger,
                    "message": entry.message,
                    "context": entry.context,
                    "sequence": entry.sequence
                })).collect::<Vec<_>>()
            });

            Ok(serde_json::to_string_pretty(&export_data)?)
        }
    }
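
    // Coordinator lifecycle sketch: register node loggers, start the polling
    // thread, and stop it before shutdown (`node_id`, `logger`, and the
    // interval are illustrative):
    //
    //     let coordinator = MultiNodeCoordinator::new(Duration::from_secs(5));
    //     coordinator.register_node(node_id.clone(), logger.clone());
    //     coordinator.start();
    //     // ... workload runs, nodes log via `log_adaptive` ...
    //     coordinator.stop();
    //     println!("{}", coordinator.export_global_logs_json().unwrap());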

    impl Drop for MultiNodeCoordinator {
        fn drop(&mut self) {
            self.stop();
        }
    }
}

#[cfg(test)]
mod distributed_tests {
    use super::distributed::*;
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_node_id_creation() {
        let node = NodeId::new("worker1".to_string(), "pid123".to_string());
        assert_eq!(node.name(), "worker1");
        assert_eq!(node.instance_id(), "pid123");
        assert_eq!(node.to_string(), "worker1:pid123");
    }

    #[test]
    fn test_log_aggregator() {
        let node_id = NodeId::new("test_node".to_string(), "1".to_string());
        let aggregator = LogAggregator::new(node_id.clone(), 100, Duration::from_secs(60));

        let entry = DistributedLogEntry::new(
            node_id,
            LogLevel::Info,
            "test_logger".to_string(),
            "Test message".to_string(),
            HashMap::new(),
        );

        aggregator.add_entry(entry);

        let entries = aggregator.get_entries();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].message, "Test message");

        let stats = aggregator.stats();
        assert_eq!(stats.total_entries, 1);
    }

    #[test]
    fn test_adaptive_rate_limiter() {
        let limiter = AdaptiveRateLimiter::new(10.0, Duration::from_millis(100), 0.1);

        assert!(limiter.try_acquire());
        assert!(limiter.try_acquire());

        let stats = limiter.get_stats();
        assert!(stats.current_rate >= 0.0);
        assert_eq!(stats.max_rate, 10.0);
    }

    #[test]
    fn test_distributed_logger() {
        let node_id = NodeId::new("test_node".to_string(), "1".to_string());
        let logger =
            DistributedLogger::new("test_logger", node_id, 1000, Duration::from_secs(60), 100.0);

        logger.info_adaptive("Test message 1");
        logger.warn_adaptive("Test message 2");

        let entries = logger.get_aggregated_logs();
        assert!(!entries.is_empty());

        let stats = logger.get_aggregation_stats();
        assert!(stats.total_entries >= 1);
    }

    #[test]
    fn test_multi_node_coordinator() {
        let coordinator = MultiNodeCoordinator::new(Duration::from_millis(10));

        let node1_id = NodeId::new("node1".to_string(), "1".to_string());
        let node1_logger = Arc::new(DistributedLogger::new(
            "node1_logger",
            node1_id.clone(),
            100,
            Duration::from_secs(10),
            50.0,
        ));

        coordinator.register_node(node1_id, node1_logger);

        coordinator.start();

        std::thread::sleep(Duration::from_millis(50));

        coordinator.stop();

        // Whether entries were forwarded depends on thread timing; just make
        // sure the stats are readable.
        let stats = coordinator.get_global_stats();
        let _ = stats.total_entries;
    }

    #[test]
    fn test_log_export() {
        let node_id = NodeId::new("export_test".to_string(), "1".to_string());
        let logger =
            DistributedLogger::new("export_logger", node_id, 100, Duration::from_secs(60), 100.0);

        logger.info_adaptive("Export test message");

        let json_export = logger.export_logs_json().expect("export should serialize");
        assert!(json_export.contains("export_test"));
        assert!(json_export.contains("Export test message"));
    }
}