1use once_cell::sync::Lazy;
54use std::collections::HashMap;
55use std::fmt::Display;
56use std::sync::{Arc, Mutex};
57use std::time::{Duration, Instant};
58
59pub mod progress;
61
62pub mod rate_limiting;
64
65pub mod bridge;
67
/// Severity of a log message, ordered least (`Trace`) to most
/// (`Critical`) severe; the explicit discriminants make the derived
/// `Ord` follow that ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    Trace = 0,
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
    Critical = 5,
}
84
85impl LogLevel {
86 pub const fn as_str(&self) -> &'static str {
88 match self {
89 LogLevel::Trace => "TRACE",
90 LogLevel::Debug => "DEBUG",
91 LogLevel::Info => "INFO",
92 LogLevel::Warn => "WARN",
93 LogLevel::Error => "ERROR",
94 LogLevel::Critical => "CRITICAL",
95 }
96 }
97}
98
/// One structured log record produced by a [`Logger`] and passed to
/// every registered [`LogHandler`].
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Wall-clock time the record was created.
    pub timestamp: std::time::SystemTime,
    /// Severity of the message.
    pub level: LogLevel,
    /// Name of the logger/module that emitted the record.
    pub module: String,
    /// Message text.
    pub message: String,
    /// Structured key/value context attached via `with_field(s)`.
    pub fields: HashMap<String, String>,
}
113
/// Process-wide logging configuration stored in `LOGGER_CONFIG`.
#[derive(Debug, Clone)]
pub struct LoggerConfig {
    /// Default minimum level; records below it are discarded.
    pub min_level: LogLevel,
    /// NOTE(review): read nowhere in this file — presumably consumed by
    /// handlers elsewhere; confirm before relying on it.
    pub show_timestamps: bool,
    /// NOTE(review): likewise not read in this file.
    pub show_modules: bool,
    /// Per-module overrides of `min_level`, keyed by logger name.
    pub module_levels: HashMap<String, LogLevel>,
}
126
127impl Default for LoggerConfig {
128 fn default() -> Self {
129 Self {
130 min_level: LogLevel::Info,
131 show_timestamps: true,
132 show_modules: true,
133 module_levels: HashMap::new(),
134 }
135 }
136}
137
/// Global logger configuration shared by every `Logger` in the process.
static LOGGER_CONFIG: Lazy<Mutex<LoggerConfig>> = Lazy::new(|| Mutex::new(LoggerConfig::default()));
140
141#[allow(dead_code)]
143pub fn configurelogger(config: LoggerConfig) {
144 let mut global_config = LOGGER_CONFIG.lock().expect("Operation failed");
145 *global_config = config;
146}
147
148#[allow(dead_code)]
150pub fn set_level(level: LogLevel) {
151 let mut config = LOGGER_CONFIG.lock().expect("Operation failed");
152 config.min_level = level;
153}
154
155#[allow(dead_code)]
157pub fn set_module_level(module: &str, level: LogLevel) {
158 let mut config = LOGGER_CONFIG.lock().expect("Operation failed");
159 config.module_levels.insert(module.to_string(), level);
160}
161
/// Destination for log records. Handlers live in a global registry and
/// may be called from any thread, hence the `Send + Sync` bound.
pub trait LogHandler: Send + Sync {
    /// Emit `entry` to this handler's sink.
    fn handle(&self, entry: &LogEntry);
}
167
/// Handler that renders entries with a placeholder template and writes
/// them to stdout (stderr for `Error`/`Critical`).
pub struct ConsoleLogHandler {
    /// Template supporting `{level}`, `{module}`, `{message}`, and
    /// optionally `{timestamp}` / `{fields}` placeholders.
    pub format: String,
}
173
174impl Default for ConsoleLogHandler {
175 fn default() -> Self {
176 Self {
177 format: "[{level}] {module}: {message}".to_string(),
178 }
179 }
180}
181
182impl LogHandler for ConsoleLogHandler {
183 fn handle(&self, entry: &LogEntry) {
184 let mut output = self.format.clone();
185
186 output = output.replace("{level}", entry.level.as_str());
188 output = output.replace("{module}", &entry.module);
189 output = output.replace("{message}", &entry.message);
190
191 if self.format.contains("{timestamp}") {
192 let datetime = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
193 output = output.replace(
194 "{timestamp}",
195 &datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
196 );
197 }
198
199 if self.format.contains("{fields}") {
200 let fields_str = entry
201 .fields
202 .iter()
203 .map(|(k, v)| format!("{k}={v}"))
204 .collect::<Vec<_>>()
205 .join(", ");
206 output = output.replace("{fields}", &fields_str);
207 }
208
209 match entry.level {
211 LogLevel::Error | LogLevel::Critical => eprintln!("{output}"),
212 _ => println!("{output}"),
213 }
214 }
215}
216
/// Handler that appends rendered entries to a file; the file is opened
/// (and created if missing) on every call.
pub struct FileLogHandler {
    /// Path of the log file.
    pub file_path: String,
    /// Template with the same placeholders as `ConsoleLogHandler`.
    pub format: String,
}
224
225impl LogHandler for FileLogHandler {
226 fn handle(&self, entry: &LogEntry) {
227 let mut output = self.format.clone();
231
232 output = output.replace("{level}", entry.level.as_str());
234 output = output.replace("{module}", &entry.module);
235 output = output.replace("{message}", &entry.message);
236
237 if self.format.contains("{timestamp}") {
238 let datetime = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
239 output = output.replace(
240 "{timestamp}",
241 &datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
242 );
243 }
244
245 if self.format.contains("{fields}") {
246 let fields_str = entry
247 .fields
248 .iter()
249 .map(|(k, v)| format!("{k}={v}"))
250 .collect::<Vec<_>>()
251 .join(", ");
252 output = output.replace("{fields}", &fields_str);
253 }
254
255 if let Ok(mut file) = std::fs::OpenOptions::new()
258 .create(true)
259 .append(true)
260 .open(&self.file_path)
261 {
262 use std::io::Write;
263 let _ = writeln!(file, "{output}");
264 }
265 }
266}
267
/// Global handler registry; starts with a single console handler.
static LOG_HANDLERS: Lazy<Mutex<Vec<Arc<dyn LogHandler>>>> = Lazy::new(|| {
    let console_handler = Arc::new(ConsoleLogHandler::default());
    Mutex::new(vec![console_handler])
});
273
/// Register an additional log handler.
///
/// NOTE(review): despite the `set_` name this *appends* to the handler
/// list rather than replacing it; call `clearlog_handlers` first if a
/// replacement is intended.
#[allow(dead_code)]
pub fn set_handler(handler: Arc<dyn LogHandler>) {
    let mut handlers = LOG_HANDLERS.lock().expect("Operation failed");
    handlers.push(handler);
}
280
281#[allow(dead_code)]
283pub fn clearlog_handlers() {
284 let mut handlers = LOG_HANDLERS.lock().expect("Operation failed");
285 handlers.clear();
286}
287
288#[allow(dead_code)]
290pub fn resetlog_handlers() {
291 let mut handlers = LOG_HANDLERS.lock().expect("Operation failed");
292 handlers.clear();
293 handlers.push(Arc::new(ConsoleLogHandler::default()));
294}
295
/// Named logger carrying structured context fields; cloning copies the
/// field map, so per-task clones are cheap enough.
#[derive(Clone)]
pub struct Logger {
    /// Module name used for per-module level lookup and output.
    module: String,
    /// Context attached to every entry this logger emits.
    fields: HashMap<String, String>,
}
304
305impl Logger {
306 pub fn new(module: &str) -> Self {
308 Self {
309 module: module.to_string(),
310 fields: HashMap::new(),
311 }
312 }
313
314 pub fn with_field<K, V>(mut self, key: K, value: V) -> Self
316 where
317 K: Into<String>,
318 V: Display,
319 {
320 self.fields.insert(key.into(), format!("{value}"));
321 self
322 }
323
324 pub fn with_fields<K, V, I>(mut self, fields: I) -> Self
326 where
327 K: Into<String>,
328 V: Display,
329 I: IntoIterator<Item = (K, V)>,
330 {
331 for (key, value) in fields {
332 self.fields.insert(key.into(), format!("{value}"));
333 }
334 self
335 }
336
337 pub fn writelog(&self, level: LogLevel, message: &str) {
339 let config = LOGGER_CONFIG.lock().expect("Operation failed");
341 let module_level = config
342 .module_levels
343 .get(&self.module)
344 .copied()
345 .unwrap_or(config.min_level);
346
347 if level < module_level {
348 return;
349 }
350
351 let entry = LogEntry {
353 timestamp: std::time::SystemTime::now(),
354 level,
355 module: self.module.clone(),
356 message: message.to_string(),
357 fields: self.fields.clone(),
358 };
359
360 let handlers = LOG_HANDLERS.lock().expect("Operation failed");
362 for handler in handlers.iter() {
363 handler.handle(&entry);
364 }
365 }
366
367 pub fn trace(&self, message: &str) {
369 self.writelog(LogLevel::Trace, message);
370 }
371
372 pub fn debug(&self, message: &str) {
374 self.writelog(LogLevel::Debug, message);
375 }
376
377 pub fn info(&self, message: &str) {
379 self.writelog(LogLevel::Info, message);
380 }
381
382 pub fn warn(&self, message: &str) {
384 self.writelog(LogLevel::Warn, message);
385 }
386
387 pub fn error(&self, message: &str) {
389 self.writelog(LogLevel::Error, message);
390 }
391
392 pub fn critical(&self, message: &str) {
394 self.writelog(LogLevel::Critical, message);
395 }
396
397 pub fn track_progress(
399 &self,
400 description: &str,
401 total: u64,
402 ) -> progress::EnhancedProgressTracker {
403 use progress::{ProgressBuilder, ProgressStyle};
404
405 let builder = ProgressBuilder::new(description, total)
406 .style(ProgressStyle::DetailedBar)
407 .show_statistics(true);
408
409 let mut tracker = builder.build();
410
411 self.info(&format!("Starting progress tracking: {description}"));
413
414 tracker.start();
415 tracker
416 }
417
418 pub fn info_with_progress(
420 &self,
421 message: &str,
422 progress: &mut progress::EnhancedProgressTracker,
423 update: u64,
424 ) {
425 self.info(message);
426 progress.update(update);
427 }
428
429 pub fn with_progress<F, R>(&self, description: &str, total: u64, operation: F) -> R
431 where
432 F: FnOnce(&mut progress::EnhancedProgressTracker) -> R,
433 {
434 let mut progress = self.track_progress(description, total);
435 let result = operation(&mut progress);
436 progress.finish();
437
438 let stats = progress.stats();
440 self.info(&format!(
441 "Completed progress tracking: {description} - {elapsed:.1}s elapsed",
442 elapsed = stats.elapsed.as_secs_f64()
443 ));
444
445 result
446 }
447}
448
/// Count-based progress tracker that emits throttled progress lines
/// through a dedicated "progress" `Logger`.
pub struct ProgressTracker {
    /// Human-readable operation name.
    name: String,
    /// Expected number of items.
    total: usize,
    /// Items processed so far.
    current: usize,
    /// When the operation started.
    start_time: Instant,
    /// Last time a progress line was emitted.
    last_update: Instant,
    /// Minimum interval between progress lines.
    update_interval: Duration,
    /// Logger used for progress output.
    logger: Logger,
}
466
467impl ProgressTracker {
468 pub fn new(name: &str, total: usize) -> Self {
470 let now = Instant::now();
471 let logger = Logger::new("progress").with_field("operation", name);
472
473 logger.info(&format!("Starting operation: {name}"));
474
475 Self {
476 name: name.to_string(),
477 total,
478 current: 0,
479 start_time: now,
480 last_update: now,
481 update_interval: Duration::from_millis(500), logger,
483 }
484 }
485
486 pub fn set_update_interval(&mut self, interval: Duration) {
488 self.update_interval = interval;
489 }
490
491 pub fn update(&mut self, current: usize) {
493 self.current = current;
494
495 let now = Instant::now();
496
497 if now.duration_since(self.last_update) >= self.update_interval {
499 self.last_update = now;
500
501 let elapsed = now.duration_since(self.start_time);
502 let percent = (self.current as f64 / self.total as f64) * 100.0;
503
504 let eta = if self.current > 0 {
505 let time_per_item = elapsed.as_secs_f64() / self.current as f64;
506 let remaining = time_per_item * (self.total - self.current) as f64;
507 format!("ETA: {remaining:.1}s")
508 } else {
509 "ETA: calculating...".to_string()
510 };
511
512 self.logger.debug(&format!(
513 "{name}: {current}/{total} ({percent:.1}%) - Elapsed: {elapsed:.1}s - {eta}",
514 name = self.name,
515 current = self.current,
516 total = self.total,
517 elapsed = elapsed.as_secs_f64()
518 ));
519 }
520 }
521
522 pub fn complete(&mut self) {
524 let elapsed = self.start_time.elapsed();
525 self.current = self.total;
526
527 self.logger.info(&format!(
528 "{name} completed: {total}/{total} (100%) - Total time: {elapsed:.1}s",
529 name = self.name,
530 total = self.total,
531 elapsed = elapsed.as_secs_f64()
532 ));
533 }
534
535 pub fn progress_percent(&self) -> f64 {
537 (self.current as f64 / self.total as f64) * 100.0
538 }
539
540 pub fn elapsed(&self) -> Duration {
542 self.start_time.elapsed()
543 }
544
545 pub fn eta(&self) -> Option<Duration> {
547 if self.current == 0 {
548 return None;
549 }
550
551 let elapsed = self.start_time.elapsed();
552 let time_per_item = elapsed.as_secs_f64() / self.current as f64;
553 let remaining_secs = time_per_item * (self.total - self.current) as f64;
554
555 Some(Duration::from_secs_f64(remaining_secs))
556 }
557}
558
559#[allow(dead_code)]
561pub fn init() {
562 let handlers = LOG_HANDLERS.lock().expect("Operation failed");
564 if handlers.is_empty() {
565 drop(handlers);
566 resetlog_handlers();
567 }
568}
569
/// Convenience macro: `getlogger!()` builds a `Logger` named after the
/// calling module's path; `getlogger!("name")` uses an explicit name.
#[macro_export]
macro_rules! getlogger {
    () => {
        $crate::logging::Logger::new(module_path!())
    };
    ($name:expr) => {
        $crate::logging::Logger::new($name)
    };
}
580
581pub mod distributed {
588 use super::*;
589 use std::collections::{HashMap, VecDeque};
590 use std::fmt;
591 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
592 use std::sync::{Arc, Mutex, RwLock};
593 use std::thread;
594 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
595
    /// Identifies a logging node, displayed as `name:instance_id`
    /// (typically hostname plus process id).
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct NodeId {
        /// Logical node name (e.g. hostname).
        name: String,
        /// Disambiguates multiple instances on one node (e.g. pid).
        instance_id: String,
    }
602
603 impl NodeId {
604 pub fn new(name: String, instanceid: String) -> Self {
606 Self {
607 name,
608 instance_id: instanceid,
609 }
610 }
611
612 pub fn from_hostname() -> Self {
614 let hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
615 let pid = std::process::id();
616 Self::new(hostname, pid.to_string())
617 }
618
619 pub fn name(&self) -> &str {
621 &self.name
622 }
623
624 pub fn instance_id(&self) -> &str {
626 &self.instance_id
627 }
628 }
629
630 impl fmt::Display for NodeId {
631 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
632 write!(f, "{}:{}", self.name, self.instance_id)
633 }
634 }
635
    /// Log record tagged with its originating node plus process-wide
    /// id and sequence counters.
    #[derive(Debug, Clone)]
    pub struct DistributedLogEntry {
        /// Process-unique, monotonically increasing id.
        pub id: u64,
        /// Node that produced this entry.
        #[allow(dead_code)]
        pub nodeid: NodeId,
        /// Creation time, milliseconds since the Unix epoch.
        pub timestamp: u64,
        /// Severity.
        pub level: LogLevel,
        /// Name of the logger that emitted the entry.
        pub logger: String,
        /// Message text.
        pub message: String,
        /// Structured context attached at emit time.
        pub context: HashMap<String, String>,
        /// Process-wide emission order.
        pub sequence: u64,
    }
657
658 impl DistributedLogEntry {
659 pub fn new(
661 nodeid: NodeId,
662 level: LogLevel,
663 logger: String,
664 message: String,
665 context: HashMap<String, String>,
666 ) -> Self {
667 static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
668 static SEQ_COUNTER: AtomicU64 = AtomicU64::new(1);
669
670 Self {
671 id: ID_COUNTER.fetch_add(1, Ordering::Relaxed),
672 nodeid,
673 timestamp: SystemTime::now()
674 .duration_since(UNIX_EPOCH)
675 .expect("Test: operation failed")
676 .as_millis() as u64,
677 level,
678 logger,
679 message,
680 context,
681 sequence: SEQ_COUNTER.fetch_add(1, Ordering::Relaxed),
682 }
683 }
684
685 pub fn age(&self) -> Duration {
687 let now = SystemTime::now()
688 .duration_since(UNIX_EPOCH)
689 .expect("Test: operation failed")
690 .as_millis() as u64;
691 Duration::from_millis(now.saturating_sub(self.timestamp))
692 }
693 }
694
    /// Bounded, time-windowed buffer of `DistributedLogEntry` records
    /// with running statistics.
    #[allow(dead_code)]
    pub struct LogAggregator {
        /// Node this aggregator belongs to.
        #[allow(dead_code)]
        nodeid: NodeId,
        /// Buffered entries, oldest at the front.
        entries: Arc<RwLock<VecDeque<DistributedLogEntry>>>,
        /// Capacity bound; the oldest entry is dropped beyond this.
        max_entries: usize,
        /// Entries older than this (relative to the newest insert's
        /// timestamp) are evicted.
        aggregation_window: Duration,
        /// Running counters describing the buffer.
        stats: Arc<RwLock<AggregationStats>>,
    }
705
    /// Counters maintained by `LogAggregator`.
    #[derive(Debug, Clone, Default)]
    pub struct AggregationStats {
        /// Cumulative count of entries ever added; unlike the per-level
        /// and per-node maps it is never decremented on eviction.
        pub total_entries: u64,
        /// Currently buffered entries, per level.
        pub entries_by_level: HashMap<LogLevel, u64>,
        /// Currently buffered entries, per node.
        pub entries_by_node: HashMap<NodeId, u64>,
        /// Entries discarded because the buffer hit `max_entries`.
        pub dropped_entries: u64,
        /// NOTE(review): never incremented anywhere in this file.
        pub aggregation_windows: u64,
    }
715
    impl LogAggregator {
        /// Create an aggregator bounded to `max_entries` records and an
        /// `aggregationwindow` time horizon.
        pub fn new(nodeid: NodeId, max_entries: usize, aggregationwindow: Duration) -> Self {
            Self {
                nodeid,
                entries: Arc::new(RwLock::new(VecDeque::new())),
                max_entries,
                aggregation_window: aggregationwindow,
                stats: Arc::new(RwLock::new(AggregationStats::default())),
            }
        }

        /// Insert `entry`, first evicting records that fall outside the
        /// aggregation window (measured back from the new entry's
        /// timestamp) and then, if still at capacity, the oldest record.
        pub fn add_entry(&self, entry: DistributedLogEntry) {
            // Both locks are held for the whole insertion so the buffer
            // and its stats stay mutually consistent.
            let mut entries = self.entries.write().expect("Operation failed");
            let mut stats = self.stats.write().expect("Operation failed");

            // Window eviction: decrements the per-level/per-node counters
            // but deliberately not `total_entries` (cumulative) nor
            // `dropped_entries` (reserved for capacity overflow below).
            let cutoff = entry
                .timestamp
                .saturating_sub(self.aggregation_window.as_millis() as u64);
            while let Some(front) = entries.front() {
                if front.timestamp >= cutoff {
                    break;
                }
                let removed = entries.pop_front().expect("Operation failed");
                if let Some(count) = stats.entries_by_level.get_mut(&removed.level) {
                    *count = count.saturating_sub(1);
                }
                if let Some(count) = stats.entries_by_node.get_mut(&removed.nodeid) {
                    *count = count.saturating_sub(1);
                }
            }

            // Capacity eviction: drop the oldest entry and count it as
            // dropped.
            if entries.len() >= self.max_entries {
                if let Some(removed) = entries.pop_front() {
                    stats.dropped_entries += 1;
                    if let Some(count) = stats.entries_by_level.get_mut(&removed.level) {
                        *count = count.saturating_sub(1);
                    }
                    if let Some(count) = stats.entries_by_node.get_mut(&removed.nodeid) {
                        *count = count.saturating_sub(1);
                    }
                }
            }

            stats.total_entries += 1;
            *stats.entries_by_level.entry(entry.level).or_insert(0) += 1;
            *stats
                .entries_by_node
                .entry(entry.nodeid.clone())
                .or_insert(0) += 1;

            entries.push_back(entry);
        }

        /// Snapshot of all buffered entries, oldest first.
        pub fn get_entries(&self) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("Operation failed")
                .iter()
                .cloned()
                .collect()
        }

        /// Buffered entries at exactly `level`.
        pub fn get_entries_by_level(&self, level: LogLevel) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("Test: operation failed")
                .iter()
                .filter(|entry| entry.level == level)
                .cloned()
                .collect()
        }

        /// Buffered entries originating from `nodeid`.
        pub fn get_entries_by_node(&self, nodeid: &NodeId) -> Vec<DistributedLogEntry> {
            self.entries
                .read()
                .expect("Test: operation failed")
                .iter()
                .filter(|entry| &entry.nodeid == nodeid)
                .cloned()
                .collect()
        }

        /// Copy of the running statistics.
        pub fn stats(&self) -> AggregationStats {
            self.stats.read().expect("Operation failed").clone()
        }

        /// Drop all buffered entries and reset statistics to default.
        pub fn clear(&self) {
            self.entries.write().expect("Operation failed").clear();
            *self.stats.write().expect("Operation failed") = AggregationStats::default();
        }
    }
819
    /// Rate limiter whose admission ceiling adapts to observed traffic
    /// within a sliding measurement window.
    pub struct AdaptiveRateLimiter {
        /// Current admission ceiling in messages/second; adapted over time.
        max_rate: Arc<Mutex<f64>>,
        /// Rate measured over the last completed window.
        current_rate: Arc<Mutex<f64>>,
        /// Start of the current measurement window.
        last_reset: Arc<Mutex<Instant>>,
        /// Messages seen in the current window.
        message_count: Arc<AtomicUsize>,
        /// Length of the measurement window.
        window_duration: Duration,
        /// Fractional step used when adapting `max_rate`.
        adaptation_factor: f64,
        /// Lower clamp for `max_rate` (10% of the initial rate).
        min_rate: f64,
        /// Upper clamp for `max_rate` (10x the initial rate).
        max_rate_absolute: f64,
    }
831
    impl AdaptiveRateLimiter {
        /// Create a limiter with the given initial ceiling, measurement
        /// window, and adaptation step; the ceiling is later clamped to
        /// [0.1x, 10x] of the initial rate as it adapts.
        pub fn new(
            initial_max_rate: f64,
            window_duration: Duration,
            adaptation_factor: f64,
        ) -> Self {
            Self {
                max_rate: Arc::new(Mutex::new(initial_max_rate)),
                current_rate: Arc::new(Mutex::new(0.0)),
                last_reset: Arc::new(Mutex::new(Instant::now())),
                message_count: Arc::new(AtomicUsize::new(0)),
                window_duration,
                adaptation_factor,
                min_rate: initial_max_rate * 0.1,
                max_rate_absolute: initial_max_rate * 10.0,
            }
        }

        /// Count one message and decide whether it may be logged.
        ///
        /// When the window has elapsed, this finalizes the window's
        /// measured rate, adapts the ceiling, and admits the triggering
        /// message unconditionally. Inside a window, a message is
        /// admitted while the running rate stays at or below the ceiling.
        pub fn try_acquire(&self) -> bool {
            let now = Instant::now();
            // fetch_add returns the PREVIOUS value, so `count` excludes
            // this message.
            let count = self.message_count.fetch_add(1, Ordering::Relaxed);

            let mut last_reset = self.last_reset.lock().expect("Operation failed");
            let elapsed = now.duration_since(*last_reset);

            if elapsed >= self.window_duration {
                let actual_rate = count as f64 / elapsed.as_secs_f64();
                {
                    let mut current_rate = self.current_rate.lock().expect("Operation failed");
                    *current_rate = actual_rate;
                }

                // NOTE(review): the store(0) also discards this message's
                // own increment, so it is not counted in the new window.
                self.message_count.store(0, Ordering::Relaxed);
                *last_reset = now;

                self.adapt_rate(actual_rate);

                true
            } else {
                let elapsed_secs = elapsed.as_secs_f64();
                // Right after a reset the denominator is tiny; admit
                // unconditionally to avoid a divide-by-near-zero spike.
                if elapsed_secs < 0.001 {
                    true
                } else {
                    let current_rate = count as f64 / elapsed_secs;
                    let max_rate = *self.max_rate.lock().expect("Operation failed");
                    current_rate <= max_rate
                }
            }
        }

        /// Move the ceiling toward observed traffic: shrink it when the
        /// window ran well below it, grow it when traffic was near it.
        fn adapt_rate(&self, actualrate: f64) {
            let mut max_rate = self.max_rate.lock().expect("Operation failed");

            if actualrate < *max_rate * 0.5 {
                *max_rate = (*max_rate * (1.0 - self.adaptation_factor)).max(self.min_rate);
            } else if actualrate >= *max_rate * 0.9 {
                *max_rate =
                    (*max_rate * (1.0 + self.adaptation_factor)).min(self.max_rate_absolute);
            }
        }

        /// Snapshot the limiter's current state.
        pub fn get_stats(&self) -> RateLimitStats {
            let current_rate = *self.current_rate.lock().expect("Operation failed");
            let max_rate = *self.max_rate.lock().expect("Operation failed");
            RateLimitStats {
                current_rate,
                max_rate,
                message_count: self.message_count.load(Ordering::Relaxed),
                window_duration: self.window_duration,
            }
        }

        /// Restart the measurement window and clear counters; the
        /// adapted ceiling is intentionally preserved.
        pub fn reset(&self) {
            *self.current_rate.lock().expect("Operation failed") = 0.0;
            *self.last_reset.lock().expect("Operation failed") = Instant::now();
            self.message_count.store(0, Ordering::Relaxed);
        }
    }
923
    /// Snapshot of an `AdaptiveRateLimiter`'s state.
    #[derive(Debug, Clone)]
    pub struct RateLimitStats {
        /// Rate measured over the last completed window (msgs/sec).
        pub current_rate: f64,
        /// Current admission ceiling (msgs/sec).
        pub max_rate: f64,
        /// Messages counted in the window in progress.
        pub message_count: usize,
        /// Measurement window length.
        pub window_duration: Duration,
    }
932
    /// Node-local logger that rate-limits per logger name and mirrors
    /// admitted records into a `LogAggregator`.
    pub struct DistributedLogger {
        /// Identity of this node.
        #[allow(dead_code)]
        nodeid: NodeId,
        /// Underlying `Logger` for immediate local output.
        locallogger: Logger,
        /// Buffer of admitted entries for later export/aggregation.
        aggregator: Arc<LogAggregator>,
        /// One adaptive limiter per logger name.
        rate_limiters: Arc<RwLock<HashMap<String, AdaptiveRateLimiter>>>,
        /// Initial msgs/sec ceiling for newly created limiters.
        default_rate_limit: f64,
    }
942
943 impl DistributedLogger {
944 pub fn new(
946 logger_name: &str,
947 nodeid: NodeId,
948 max_entries: usize,
949 aggregation_window: Duration,
950 default_rate_limit: f64,
951 ) -> Self {
952 let locallogger = Logger::new(logger_name);
953 let aggregator = Arc::new(LogAggregator::new(
954 nodeid.clone(),
955 max_entries,
956 aggregation_window,
957 ));
958
959 Self {
960 nodeid,
961 locallogger,
962 aggregator,
963 rate_limiters: Arc::new(RwLock::new(HashMap::new())),
964 default_rate_limit,
965 }
966 }
967
968 pub fn log_adaptive(
970 &self,
971 level: LogLevel,
972 message: &str,
973 context: Option<HashMap<String, String>>,
974 ) {
975 let logger_key = self.locallogger.module.clone();
976
977 let shouldlog = {
979 let rate_limiters = self.rate_limiters.read().expect("Operation failed");
980 if let Some(limiter) = rate_limiters.get(&logger_key) {
981 limiter.try_acquire()
982 } else {
983 drop(rate_limiters);
984
985 let mut rate_limiters = self.rate_limiters.write().expect("Operation failed");
987 let limiter = AdaptiveRateLimiter::new(
988 self.default_rate_limit,
989 Duration::from_secs(1),
990 0.1, );
992 let shouldlog = limiter.try_acquire();
993 rate_limiters.insert(logger_key, limiter);
994 shouldlog
995 }
996 };
997
998 if shouldlog {
999 self.locallogger.writelog(level, message);
1001
1002 let entry = DistributedLogEntry::new(
1004 self.nodeid.clone(),
1005 level,
1006 self.locallogger.module.clone(),
1007 message.to_string(),
1008 context.unwrap_or_default(),
1009 );
1010
1011 self.aggregator.add_entry(entry);
1013 }
1014 }
1015
1016 pub fn error_adaptive(&self, message: &str) {
1018 self.log_adaptive(LogLevel::Error, message, None);
1019 }
1020
1021 pub fn warn_adaptive(&self, message: &str) {
1022 self.log_adaptive(LogLevel::Warn, message, None);
1023 }
1024
1025 pub fn info_adaptive(&self, message: &str) {
1026 self.log_adaptive(LogLevel::Info, message, None);
1027 }
1028
1029 pub fn debug_adaptive(&self, message: &str) {
1030 self.log_adaptive(LogLevel::Debug, message, None);
1031 }
1032
1033 pub fn get_aggregatedlogs(&self) -> Vec<DistributedLogEntry> {
1035 self.aggregator.get_entries()
1036 }
1037
1038 pub fn get_rate_stats(&self) -> HashMap<String, RateLimitStats> {
1040 self.rate_limiters
1041 .read()
1042 .expect("Test: operation failed")
1043 .iter()
1044 .map(|(k, v)| (k.clone(), v.get_stats()))
1045 .collect()
1046 }
1047
1048 pub fn get_aggregation_stats(&self) -> AggregationStats {
1050 self.aggregator.stats()
1051 }
1052
1053 pub fn exportlogs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
1055 let entries = self.get_aggregatedlogs();
1056 let stats = self.get_aggregation_stats();
1057
1058 let export_data = serde_json::json!({
1059 "nodeid": self.nodeid.to_string(),
1060 "timestamp": SystemTime::now()
1061 .duration_since(UNIX_EPOCH)
1062 .expect("Test: operation failed")
1063 .as_millis(),
1064 "stats": {
1065 "total_entries": stats.total_entries,
1066 "dropped_entries": stats.dropped_entries,
1067 "aggregation_windows": stats.aggregation_windows
1068 },
1069 "entries": entries.iter().map(|entry| serde_json::json!({
1070 "id": entry.id,
1071 "nodeid": entry.nodeid.to_string(),
1072 "timestamp": entry.timestamp,
1073 "level": format!("{0:?}", entry.level),
1074 "logger": entry.logger,
1075 "message": entry.message,
1076 "context": entry.context,
1077 "sequence": entry.sequence
1078 })).collect::<Vec<_>>()
1079 });
1080
1081 Ok(serde_json::to_string_pretty(&export_data)?)
1082 }
1083
1084 pub fn clear_aggregated_data(&self) {
1086 self.aggregator.clear();
1087
1088 let rate_limiters = self.rate_limiters.write().expect("Operation failed");
1090 for limiter in rate_limiters.values() {
1091 limiter.reset();
1092 }
1093 }
1094 }
1095
    /// Periodically sweeps registered node loggers on a background
    /// thread and merges their entries into one global aggregator.
    pub struct MultiNodeCoordinator {
        /// Registered per-node loggers keyed by node id.
        nodes: Arc<RwLock<HashMap<NodeId, Arc<DistributedLogger>>>>,
        /// Destination for merged entries.
        global_aggregator: Arc<LogAggregator>,
        /// Sleep between sweeps.
        coordination_interval: Duration,
        /// 1 while the background thread should keep running, else 0
        /// (an `AtomicUsize` used as a boolean flag).
        running: Arc<AtomicUsize>,
    }
1103
1104 impl MultiNodeCoordinator {
1105 pub fn new(coordinationinterval: Duration) -> Self {
1107 let global_node = NodeId::new("global".to_string(), "coordinator".to_string());
1108 let global_aggregator = Arc::new(LogAggregator::new(
1109 global_node,
1110 100000, Duration::from_secs(3600), ));
1113
1114 Self {
1115 nodes: Arc::new(RwLock::new(HashMap::new())),
1116 global_aggregator,
1117 coordination_interval: coordinationinterval,
1118 running: Arc::new(AtomicUsize::new(0)),
1119 }
1120 }
1121
1122 pub fn register_node(&self, nodeid: NodeId, logger: Arc<DistributedLogger>) {
1124 let mut nodes = self.nodes.write().expect("Operation failed");
1125 nodes.insert(nodeid, logger);
1126 }
1127
1128 pub fn unregister_node(&self, nodeid: &NodeId) {
1130 let mut nodes = self.nodes.write().expect("Operation failed");
1131 nodes.remove(nodeid);
1132 }
1133
1134 pub fn start(&self) {
1136 if self
1137 .running
1138 .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
1139 .is_ok()
1140 {
1141 let nodes = self.nodes.clone();
1142 let global_aggregator = self.global_aggregator.clone();
1143 let interval = self.coordination_interval;
1144 let running = self.running.clone();
1145
1146 thread::spawn(move || {
1147 while running.load(Ordering::Relaxed) == 1 {
1148 let nodes_guard = nodes.read().expect("Operation failed");
1150 for logger in nodes_guard.values() {
1151 let entries = logger.get_aggregatedlogs();
1152 for entry in entries {
1153 global_aggregator.add_entry(entry);
1154 }
1155 }
1156 drop(nodes_guard);
1157
1158 thread::sleep(interval);
1159 }
1160 });
1161 }
1162 }
1163
1164 pub fn stop(&self) {
1166 self.running.store(0, Ordering::Relaxed);
1167 }
1168
1169 pub fn get_global_stats(&self) -> AggregationStats {
1171 self.global_aggregator.stats()
1172 }
1173
1174 pub fn get_global_entries(&self) -> Vec<DistributedLogEntry> {
1176 self.global_aggregator.get_entries()
1177 }
1178
1179 pub fn export_globallogs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
1181 let entries = self.get_global_entries();
1182 let stats = self.get_global_stats();
1183
1184 let export_data = serde_json::json!({
1185 "coordinator": "global",
1186 "timestamp": SystemTime::now()
1187 .duration_since(UNIX_EPOCH)
1188 .expect("Test: operation failed")
1189 .as_millis(),
1190 "stats": {
1191 "total_entries": stats.total_entries,
1192 "dropped_entries": stats.dropped_entries,
1193 "nodes_count": self.nodes.read().expect("Operation failed").len(),
1194 "entries_by_level": stats.entries_by_level.iter().map(|(k, v)| (format!("{k:?}"), *v)).collect::<HashMap<String, u64>>()
1195 },
1196 "entries": entries.iter().map(|entry| serde_json::json!({
1197 "id": entry.id,
1198 "nodeid": entry.nodeid.to_string(),
1199 "timestamp": entry.timestamp,
1200 "level": format!("{0:?}", entry.level),
1201 "logger": entry.logger,
1202 "message": entry.message,
1203 "context": entry.context,
1204 "sequence": entry.sequence
1205 })).collect::<Vec<_>>()
1206 });
1207
1208 Ok(serde_json::to_string_pretty(&export_data)?)
1209 }
1210 }
1211
    impl Drop for MultiNodeCoordinator {
        /// Signal the background sweep thread to stop when the
        /// coordinator is dropped (the thread exits on its next wake-up;
        /// it is detached, so drop does not join it).
        fn drop(&mut self) {
            self.stop();
        }
    }
1217}
1218
#[cfg(test)]
mod distributed_tests {
    use super::distributed::*;
    use super::*;
    use std::time::Duration;

    /// NodeId exposes its parts and displays as `name:instance_id`.
    #[test]
    fn test_nodeid_creation() {
        let node = NodeId::new("worker1".to_string(), "pid123".to_string());
        assert_eq!(node.name(), "worker1");
        assert_eq!(node.instance_id(), "pid123");
        assert_eq!(node.to_string(), "worker1:pid123");
    }

    /// A single added entry is retrievable and counted in the stats.
    #[test]
    fn testlog_aggregator() {
        let nodeid = NodeId::new("test_node".to_string(), 1.to_string());
        let aggregator = LogAggregator::new(nodeid.clone(), 100, Duration::from_secs(60));

        let entry = DistributedLogEntry::new(
            nodeid,
            LogLevel::Info,
            "testlogger".to_string(),
            "Test message".to_string(),
            HashMap::new(),
        );

        aggregator.add_entry(entry);

        let entries = aggregator.get_entries();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].message, "Test message");

        let stats = aggregator.stats();
        assert_eq!(stats.total_entries, 1);
    }

    /// Early acquisitions are admitted and stats reflect the initial
    /// ceiling (no window has completed yet, so max_rate is unchanged).
    #[test]
    fn test_adaptive_rate_limiter() {
        let limiter = AdaptiveRateLimiter::new(10.0, Duration::from_millis(100), 0.1);

        assert!(limiter.try_acquire());
        assert!(limiter.try_acquire());

        let stats = limiter.get_stats();
        assert!(stats.current_rate >= 0.0);
        assert_eq!(stats.max_rate, 10.0);
    }

    /// Admitted adaptive logs show up in the node's aggregator.
    #[test]
    fn test_distributedlogger() {
        let nodeid = NodeId::new("test_node".to_string(), 1.to_string());
        let logger =
            DistributedLogger::new("testlogger", nodeid, 1000, Duration::from_secs(60), 100.0);

        logger.info_adaptive("Test message 1");
        logger.warn_adaptive("Test message 2");

        let entries = logger.get_aggregatedlogs();
        assert!(!entries.is_empty());
        let stats = logger.get_aggregation_stats();
        assert!(stats.total_entries >= 1);
    }

    /// Start/stop lifecycle smoke test; no assertion on entry counts
    /// since the registered node emitted nothing.
    #[test]
    fn test_multi_node_coordinator() {
        let coordinator = MultiNodeCoordinator::new(Duration::from_millis(10));

        let node1_id = NodeId::new("node1".to_string(), "1".to_string());
        let node1logger = Arc::new(DistributedLogger::new(
            "node1logger",
            node1_id.clone(),
            100,
            Duration::from_secs(10),
            50.0,
        ));

        coordinator.register_node(node1_id, node1logger);

        coordinator.start();

        std::thread::sleep(Duration::from_millis(50));

        coordinator.stop();

        let stats = coordinator.get_global_stats();
        let _ = stats.total_entries;
    }

    /// JSON export contains the node id and the logged message.
    #[test]
    fn testlog_export() {
        let nodeid = NodeId::new("export_test".to_string(), 1.to_string());
        let logger =
            DistributedLogger::new("exportlogger", nodeid, 100, Duration::from_secs(60), 100.0);

        logger.info_adaptive("Export test message");

        let json_export = logger.exportlogs_json().expect("Operation failed");
        assert!(json_export.contains("export_test"));
        assert!(json_export.contains("Export test message"));
    }
}
1326}