use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
pub mod progress;
pub mod rate_limiting;
pub mod bridge;
/// Severity of a log message, ordered from most verbose (`Trace`) to most
/// severe (`Critical`). The derived `Ord` follows the explicit discriminants,
/// which is what the threshold comparison in `Logger::writelog` relies on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    Trace = 0,
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
    Critical = 5,
}
impl LogLevel {
pub const fn as_str(&self) -> &'static str {
match self {
LogLevel::Trace => "TRACE",
LogLevel::Debug => "DEBUG",
LogLevel::Info => "INFO",
LogLevel::Warn => "WARN",
LogLevel::Error => "ERROR",
LogLevel::Critical => "CRITICAL",
}
}
}
/// A single log record as handed to every registered `LogHandler`.
#[derive(Debug, Clone)]
pub struct LogEntry {
    // Wall-clock time the entry was created (see `Logger::writelog`).
    pub timestamp: std::time::SystemTime,
    pub level: LogLevel,
    // Module/logger name, e.g. from `module_path!()` via the `getlogger!` macro.
    pub module: String,
    pub message: String,
    // Structured key/value context attached via `Logger::with_field(s)`.
    pub fields: HashMap<String, String>,
}
/// Global logger configuration stored in `LOGGER_CONFIG`.
#[derive(Debug, Clone)]
pub struct LoggerConfig {
    // Default threshold applied when a module has no override.
    pub min_level: LogLevel,
    // NOTE(review): these two flags are stored but not consulted by the
    // handlers visible in this file (they key off their format strings) —
    // confirm whether anything else reads them.
    pub show_timestamps: bool,
    pub show_modules: bool,
    // Per-module threshold overrides, keyed by module name.
    pub module_levels: HashMap<String, LogLevel>,
}
impl Default for LoggerConfig {
fn default() -> Self {
Self {
min_level: LogLevel::Info,
show_timestamps: true,
show_modules: true,
module_levels: HashMap::new(),
}
}
}
static LOGGER_CONFIG: Lazy<Mutex<LoggerConfig>> = Lazy::new(|| Mutex::new(LoggerConfig::default()));
/// Replaces the entire global logger configuration atomically.
#[allow(dead_code)]
pub fn configurelogger(config: LoggerConfig) {
    *LOGGER_CONFIG.lock().expect("Operation failed") = config;
}
/// Sets the global minimum level (per-module overrides still take precedence).
#[allow(dead_code)]
pub fn set_level(level: LogLevel) {
    LOGGER_CONFIG.lock().expect("Operation failed").min_level = level;
}
/// Overrides the minimum level for one specific module name.
#[allow(dead_code)]
pub fn set_module_level(module: &str, level: LogLevel) {
    LOGGER_CONFIG
        .lock()
        .expect("Operation failed")
        .module_levels
        .insert(module.to_string(), level);
}
/// Sink for log entries. Implementations must be `Send + Sync` because they
/// are shared across threads via `Arc` in the global `LOG_HANDLERS` list.
pub trait LogHandler: Send + Sync {
    // Called once per emitted entry; implementations decide how to render it.
    fn handle(&self, entry: &LogEntry);
}
/// Handler that writes formatted entries to stdout/stderr.
pub struct ConsoleLogHandler {
    // Template with `{level}`, `{module}`, `{message}`, and optionally
    // `{timestamp}` / `{fields}` placeholders.
    pub format: String,
}
impl Default for ConsoleLogHandler {
fn default() -> Self {
Self {
format: "[{level}] {module}: {message}".to_string(),
}
}
}
impl LogHandler for ConsoleLogHandler {
fn handle(&self, entry: &LogEntry) {
let mut output = self.format.clone();
output = output.replace("{level}", entry.level.as_str());
output = output.replace("{module}", &entry.module);
output = output.replace("{message}", &entry.message);
if self.format.contains("{timestamp}") {
let datetime = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
output = output.replace(
"{timestamp}",
&datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
);
}
if self.format.contains("{fields}") {
let fields_str = entry
.fields
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join(", ");
output = output.replace("{fields}", &fields_str);
}
match entry.level {
LogLevel::Error | LogLevel::Critical => eprintln!("{output}"),
_ => println!("{output}"),
}
}
}
/// Handler that appends formatted entries to a file.
pub struct FileLogHandler {
    // Target file; created on first write if it does not exist.
    pub file_path: String,
    // Same placeholder template as `ConsoleLogHandler::format`.
    pub format: String,
}
impl LogHandler for FileLogHandler {
    /// Renders the entry through the configured template and appends it to
    /// `file_path`. I/O failures are deliberately ignored: logging must not
    /// take the process down.
    fn handle(&self, entry: &LogEntry) {
        let mut line = self
            .format
            .replace("{level}", entry.level.as_str())
            .replace("{module}", &entry.module)
            .replace("{message}", &entry.message);
        // Placeholders below are only rendered when the template requests them.
        if self.format.contains("{timestamp}") {
            let when = chrono::DateTime::<chrono::Utc>::from(entry.timestamp);
            line = line.replace(
                "{timestamp}",
                &when.format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
            );
        }
        if self.format.contains("{fields}") {
            let rendered: Vec<String> = entry
                .fields
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect();
            line = line.replace("{fields}", &rendered.join(", "));
        }
        let opened = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.file_path);
        if let Ok(mut file) = opened {
            use std::io::Write;
            let _ = writeln!(file, "{line}");
        }
    }
}
// Global list of active handlers; starts with a single console handler.
static LOG_HANDLERS: Lazy<Mutex<Vec<Arc<dyn LogHandler>>>> = Lazy::new(|| {
    let console_handler = Arc::new(ConsoleLogHandler::default());
    Mutex::new(vec![console_handler])
});
// NOTE(review): despite the name, this APPENDS a handler to the global list;
// it does not replace the existing ones. Use `clearlog_handlers` first for
// replace semantics. Renaming would break callers, so only documenting.
#[allow(dead_code)]
pub fn set_handler(handler: Arc<dyn LogHandler>) {
    let mut handlers = LOG_HANDLERS.lock().expect("Operation failed");
    handlers.push(handler);
}
/// Removes every registered handler; subsequent log calls emit nothing until
/// a handler is added again.
#[allow(dead_code)]
pub fn clearlog_handlers() {
    LOG_HANDLERS.lock().expect("Operation failed").clear();
}
#[allow(dead_code)]
pub fn resetlog_handlers() {
let mut handlers = LOG_HANDLERS.lock().expect("Operation failed");
handlers.clear();
handlers.push(Arc::new(ConsoleLogHandler::default()));
}
/// A named logger with optional structured fields, cloned cheaply and used
/// through the global config/handler state.
#[derive(Clone)]
pub struct Logger {
    // Module name matched against `LoggerConfig::module_levels`.
    module: String,
    // Key/value context copied into every `LogEntry` this logger emits.
    fields: HashMap<String, String>,
}
impl Logger {
pub fn new(module: &str) -> Self {
Self {
module: module.to_string(),
fields: HashMap::new(),
}
}
pub fn with_field<K, V>(mut self, key: K, value: V) -> Self
where
K: Into<String>,
V: Display,
{
self.fields.insert(key.into(), format!("{value}"));
self
}
pub fn with_fields<K, V, I>(mut self, fields: I) -> Self
where
K: Into<String>,
V: Display,
I: IntoIterator<Item = (K, V)>,
{
for (key, value) in fields {
self.fields.insert(key.into(), format!("{value}"));
}
self
}
pub fn writelog(&self, level: LogLevel, message: &str) {
let config = LOGGER_CONFIG.lock().expect("Operation failed");
let module_level = config
.module_levels
.get(&self.module)
.copied()
.unwrap_or(config.min_level);
if level < module_level {
return;
}
let entry = LogEntry {
timestamp: std::time::SystemTime::now(),
level,
module: self.module.clone(),
message: message.to_string(),
fields: self.fields.clone(),
};
let handlers = LOG_HANDLERS.lock().expect("Operation failed");
for handler in handlers.iter() {
handler.handle(&entry);
}
}
pub fn trace(&self, message: &str) {
self.writelog(LogLevel::Trace, message);
}
pub fn debug(&self, message: &str) {
self.writelog(LogLevel::Debug, message);
}
pub fn info(&self, message: &str) {
self.writelog(LogLevel::Info, message);
}
pub fn warn(&self, message: &str) {
self.writelog(LogLevel::Warn, message);
}
pub fn error(&self, message: &str) {
self.writelog(LogLevel::Error, message);
}
pub fn critical(&self, message: &str) {
self.writelog(LogLevel::Critical, message);
}
pub fn track_progress(
&self,
description: &str,
total: u64,
) -> progress::EnhancedProgressTracker {
use progress::{ProgressBuilder, ProgressStyle};
let builder = ProgressBuilder::new(description, total)
.style(ProgressStyle::DetailedBar)
.show_statistics(true);
let mut tracker = builder.build();
self.info(&format!("Starting progress tracking: {description}"));
tracker.start();
tracker
}
pub fn info_with_progress(
&self,
message: &str,
progress: &mut progress::EnhancedProgressTracker,
update: u64,
) {
self.info(message);
progress.update(update);
}
pub fn with_progress<F, R>(&self, description: &str, total: u64, operation: F) -> R
where
F: FnOnce(&mut progress::EnhancedProgressTracker) -> R,
{
let mut progress = self.track_progress(description, total);
let result = operation(&mut progress);
progress.finish();
let stats = progress.stats();
self.info(&format!(
"Completed progress tracking: {description} - {elapsed:.1}s elapsed",
elapsed = stats.elapsed.as_secs_f64()
));
result
}
}
/// Simple log-based progress tracker (distinct from `progress::EnhancedProgressTracker`).
pub struct ProgressTracker {
    // Operation name used in log lines.
    name: String,
    // Expected number of items.
    total: usize,
    // Items processed so far.
    current: usize,
    start_time: Instant,
    // Last time a progress line was emitted; throttled by `update_interval`.
    last_update: Instant,
    update_interval: Duration,
    logger: Logger,
}
impl ProgressTracker {
    /// Creates a tracker for `total` items and logs the start of the
    /// operation. Progress lines are throttled to one per 500 ms by default.
    pub fn new(name: &str, total: usize) -> Self {
        let now = Instant::now();
        let logger = Logger::new("progress").with_field("operation", name);
        logger.info(&format!("Starting operation: {name}"));
        Self {
            name: name.to_string(),
            total,
            current: 0,
            start_time: now,
            last_update: now,
            update_interval: Duration::from_millis(500),
            logger,
        }
    }
    /// Changes the minimum interval between emitted progress lines.
    pub fn set_update_interval(&mut self, interval: Duration) {
        self.update_interval = interval;
    }
    /// Records the new position and, if the throttle interval has elapsed,
    /// logs a progress line with percentage and ETA.
    ///
    /// Robustness fixes vs. the original: a zero `total` no longer produces
    /// NaN percentages, and `current > total` no longer underflow-panics in
    /// the ETA computation.
    pub fn update(&mut self, current: usize) {
        self.current = current;
        let now = Instant::now();
        if now.duration_since(self.last_update) >= self.update_interval {
            self.last_update = now;
            let elapsed = now.duration_since(self.start_time);
            let percent = self.progress_percent();
            let eta = if self.current > 0 {
                let time_per_item = elapsed.as_secs_f64() / self.current as f64;
                // saturating_sub: guards against callers overshooting `total`.
                let remaining = time_per_item * self.total.saturating_sub(self.current) as f64;
                format!("ETA: {remaining:.1}s")
            } else {
                "ETA: calculating...".to_string()
            };
            self.logger.debug(&format!(
                "{name}: {current}/{total} ({percent:.1}%) - Elapsed: {elapsed:.1}s - {eta}",
                name = self.name,
                current = self.current,
                total = self.total,
                elapsed = elapsed.as_secs_f64()
            ));
        }
    }
    /// Marks the operation finished and logs the total elapsed time.
    pub fn complete(&mut self) {
        let elapsed = self.start_time.elapsed();
        self.current = self.total;
        self.logger.info(&format!(
            "{name} completed: {total}/{total} (100%) - Total time: {elapsed:.1}s",
            name = self.name,
            total = self.total,
            elapsed = elapsed.as_secs_f64()
        ));
    }
    /// Percentage complete; an empty operation (`total == 0`) reports 100%
    /// instead of NaN.
    pub fn progress_percent(&self) -> f64 {
        if self.total == 0 {
            100.0
        } else {
            (self.current as f64 / self.total as f64) * 100.0
        }
    }
    /// Time since the tracker was created.
    pub fn elapsed(&self) -> Duration {
        self.start_time.elapsed()
    }
    /// Estimated time remaining, extrapolated linearly from progress so far.
    /// `None` until at least one item has been recorded.
    pub fn eta(&self) -> Option<Duration> {
        if self.current == 0 {
            return None;
        }
        let elapsed = self.start_time.elapsed();
        let time_per_item = elapsed.as_secs_f64() / self.current as f64;
        // saturating_sub: guards against `current > total`.
        let remaining_secs = time_per_item * self.total.saturating_sub(self.current) as f64;
        Some(Duration::from_secs_f64(remaining_secs))
    }
}
/// Ensures at least one handler is registered (installs the default console
/// handler if the list is empty). Idempotent.
#[allow(dead_code)]
pub fn init() {
    let handlers = LOG_HANDLERS.lock().expect("Operation failed");
    if handlers.is_empty() {
        // Must release the lock first: `resetlog_handlers` re-acquires it,
        // and the std Mutex is not reentrant.
        drop(handlers);
        resetlog_handlers();
    }
}
/// Creates a `Logger` named after the calling module (`getlogger!()`) or an
/// explicit name (`getlogger!("name")`).
// NOTE(review): the expansion path assumes this file is mounted as the
// crate's `logging` module — verify against the crate layout.
#[macro_export]
macro_rules! getlogger {
    () => {
        $crate::logging::Logger::new(module_path!())
    };
    ($name:expr) => {
        $crate::logging::Logger::new($name)
    };
}
pub mod distributed {
use super::*;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// Identifies one logging node: a host/name plus an instance discriminator
/// (e.g. a PID), so several processes on one host stay distinct.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeId {
    name: String,
    instance_id: String,
}
impl NodeId {
pub fn new(name: String, instanceid: String) -> Self {
Self {
name,
instance_id: instanceid,
}
}
pub fn from_hostname() -> Self {
let hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
let pid = std::process::id();
Self::new(hostname, pid.to_string())
}
pub fn name(&self) -> &str {
&self.name
}
pub fn instance_id(&self) -> &str {
&self.instance_id
}
}
impl fmt::Display for NodeId {
    /// Renders as `name:instance_id`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.name)?;
        f.write_str(":")?;
        f.write_str(&self.instance_id)
    }
}
/// A log record enriched for cross-node aggregation.
#[derive(Debug, Clone)]
pub struct DistributedLogEntry {
    // Process-wide monotonically increasing id (see `new`).
    pub id: u64,
    #[allow(dead_code)]
    pub nodeid: NodeId,
    // Milliseconds since the Unix epoch at creation time.
    pub timestamp: u64,
    pub level: LogLevel,
    // Name of the logger that produced the entry.
    pub logger: String,
    pub message: String,
    pub context: HashMap<String, String>,
    // Process-wide emission sequence number (separate counter from `id`).
    pub sequence: u64,
}
impl DistributedLogEntry {
    /// Creates an entry stamped with the current wall-clock time and fresh
    /// values from two process-wide atomic counters (`id` and `sequence`
    /// both start at 1 and increase monotonically).
    pub fn new(
        nodeid: NodeId,
        level: LogLevel,
        logger: String,
        message: String,
        context: HashMap<String, String>,
    ) -> Self {
        static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
        static SEQ_COUNTER: AtomicU64 = AtomicU64::new(1);
        Self {
            id: ID_COUNTER.fetch_add(1, Ordering::Relaxed),
            // Panics only if the system clock is before the Unix epoch.
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("Test: operation failed")
                .as_millis() as u64,
            nodeid,
            level,
            logger,
            message,
            context,
            sequence: SEQ_COUNTER.fetch_add(1, Ordering::Relaxed),
        }
    }
    /// Wall-clock age of this entry; saturates to zero if the clock moved
    /// backwards since creation.
    pub fn age(&self) -> Duration {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Test: operation failed")
            .as_millis() as u64;
        Duration::from_millis(now.saturating_sub(self.timestamp))
    }
}
/// Bounded, time-windowed buffer of `DistributedLogEntry`s with running stats.
#[allow(dead_code)]
pub struct LogAggregator {
    #[allow(dead_code)]
    nodeid: NodeId,
    // Entries ordered oldest-first; pruned by age and capacity in `add_entry`.
    entries: Arc<RwLock<VecDeque<DistributedLogEntry>>>,
    // Hard capacity; the oldest entry is dropped when it is reached.
    max_entries: usize,
    // Entries older than this (relative to the newest entry) are expired.
    aggregation_window: Duration,
    stats: Arc<RwLock<AggregationStats>>,
}
/// Running counters maintained by `LogAggregator`.
#[derive(Debug, Clone, Default)]
pub struct AggregationStats {
    // Cumulative count of entries ever added (never decremented).
    pub total_entries: u64,
    // Counts of entries CURRENTLY buffered, by level / by node.
    pub entries_by_level: HashMap<LogLevel, u64>,
    pub entries_by_node: HashMap<NodeId, u64>,
    // Entries evicted due to the capacity bound (not window expiry).
    pub dropped_entries: u64,
    // NOTE(review): never incremented anywhere in this file — confirm intent.
    pub aggregation_windows: u64,
}
impl LogAggregator {
    /// Creates an aggregator bounded by `max_entries` and `aggregationwindow`.
    pub fn new(nodeid: NodeId, max_entries: usize, aggregationwindow: Duration) -> Self {
        Self {
            nodeid,
            entries: Arc::new(RwLock::new(VecDeque::new())),
            max_entries,
            aggregation_window: aggregationwindow,
            stats: Arc::new(RwLock::new(AggregationStats::default())),
        }
    }
    /// Removes `removed`'s contribution from the per-level / per-node tallies.
    /// (`total_entries` is cumulative and deliberately left untouched.)
    /// Extracted because the original duplicated this verbatim in both
    /// eviction paths of `add_entry`.
    fn discount(stats: &mut AggregationStats, removed: &DistributedLogEntry) {
        if let Some(count) = stats.entries_by_level.get_mut(&removed.level) {
            *count = count.saturating_sub(1);
        }
        if let Some(count) = stats.entries_by_node.get_mut(&removed.nodeid) {
            *count = count.saturating_sub(1);
        }
    }
    /// Appends `entry`, first expiring entries older than the aggregation
    /// window (relative to the new entry's timestamp) and then enforcing the
    /// capacity bound. Capacity evictions count as `dropped_entries`;
    /// window expiries do not.
    pub fn add_entry(&self, entry: DistributedLogEntry) {
        let mut entries = self.entries.write().expect("Operation failed");
        let mut stats = self.stats.write().expect("Operation failed");
        let cutoff = entry
            .timestamp
            .saturating_sub(self.aggregation_window.as_millis() as u64);
        // Expire entries that fell out of the window.
        while let Some(front) = entries.front() {
            if front.timestamp >= cutoff {
                break;
            }
            let removed = entries.pop_front().expect("Operation failed");
            Self::discount(&mut stats, &removed);
        }
        // Enforce the hard capacity bound.
        if entries.len() >= self.max_entries {
            if let Some(removed) = entries.pop_front() {
                stats.dropped_entries += 1;
                Self::discount(&mut stats, &removed);
            }
        }
        stats.total_entries += 1;
        *stats.entries_by_level.entry(entry.level).or_insert(0) += 1;
        *stats
            .entries_by_node
            .entry(entry.nodeid.clone())
            .or_insert(0) += 1;
        entries.push_back(entry);
    }
    /// Snapshot of all buffered entries, oldest first.
    pub fn get_entries(&self) -> Vec<DistributedLogEntry> {
        self.entries
            .read()
            .expect("Operation failed")
            .iter()
            .cloned()
            .collect()
    }
    /// Snapshot of buffered entries at exactly `level`.
    pub fn get_entries_by_level(&self, level: LogLevel) -> Vec<DistributedLogEntry> {
        self.entries
            .read()
            .expect("Test: operation failed")
            .iter()
            .filter(|entry| entry.level == level)
            .cloned()
            .collect()
    }
    /// Snapshot of buffered entries originating from `nodeid`.
    pub fn get_entries_by_node(&self, nodeid: &NodeId) -> Vec<DistributedLogEntry> {
        self.entries
            .read()
            .expect("Test: operation failed")
            .iter()
            .filter(|entry| &entry.nodeid == nodeid)
            .cloned()
            .collect()
    }
    /// Copy of the running statistics.
    pub fn stats(&self) -> AggregationStats {
        self.stats.read().expect("Operation failed").clone()
    }
    /// Drops all buffered entries and resets the statistics.
    pub fn clear(&self) {
        self.entries.write().expect("Operation failed").clear();
        *self.stats.write().expect("Operation failed") = AggregationStats::default();
    }
}
/// Rate limiter that measures the observed message rate over a sliding window
/// and nudges the allowed maximum up or down (see `adapt_rate`).
pub struct AdaptiveRateLimiter {
    // Currently allowed messages/sec; adapted between min_rate and
    // max_rate_absolute.
    max_rate: Arc<Mutex<f64>>,
    // Rate measured over the most recently completed window.
    current_rate: Arc<Mutex<f64>>,
    // Start of the current measurement window.
    last_reset: Arc<Mutex<Instant>>,
    // Messages seen in the current window.
    message_count: Arc<AtomicUsize>,
    window_duration: Duration,
    // Fractional step used when adapting max_rate.
    adaptation_factor: f64,
    // Floor for downward adaptation.
    min_rate: f64,
    // Ceiling for upward adaptation.
    max_rate_absolute: f64,
}
impl AdaptiveRateLimiter {
    /// Creates a limiter starting at `initial_max_rate` msgs/sec. The
    /// adaptive range is fixed at 0.1x..10x of the initial rate.
    pub fn new(
        initial_max_rate: f64,
        window_duration: Duration,
        adaptation_factor: f64,
    ) -> Self {
        Self {
            max_rate: Arc::new(Mutex::new(initial_max_rate)),
            current_rate: Arc::new(Mutex::new(0.0)),
            last_reset: Arc::new(Mutex::new(Instant::now())),
            message_count: Arc::new(AtomicUsize::new(0)),
            window_duration,
            adaptation_factor,
            min_rate: initial_max_rate * 0.1,
            max_rate_absolute: initial_max_rate * 10.0,
        }
    }
    /// Returns whether the caller may emit a message.
    ///
    /// Every call counts toward the window (even rejected ones — `count` is
    /// the value BEFORE this call's increment). When the window has elapsed,
    /// the measured rate is recorded, the window restarts, the limit adapts,
    /// and the call is admitted. Within a window, the call is admitted if
    /// the running rate is within the current limit; sub-millisecond elapsed
    /// times are admitted unconditionally to avoid a noisy division.
    pub fn try_acquire(&self) -> bool {
        let now = Instant::now();
        let count = self.message_count.fetch_add(1, Ordering::Relaxed);
        let mut last_reset = self.last_reset.lock().expect("Operation failed");
        let elapsed = now.duration_since(*last_reset);
        if elapsed >= self.window_duration {
            let actual_rate = count as f64 / elapsed.as_secs_f64();
            {
                let mut current_rate = self.current_rate.lock().expect("Operation failed");
                *current_rate = actual_rate;
            }
            self.message_count.store(0, Ordering::Relaxed);
            *last_reset = now;
            self.adapt_rate(actual_rate);
            true
        } else {
            let elapsed_secs = elapsed.as_secs_f64();
            if elapsed_secs < 0.001 {
                true
            } else {
                let current_rate = count as f64 / elapsed_secs;
                let max_rate = *self.max_rate.lock().expect("Operation failed");
                current_rate <= max_rate
            }
        }
    }
    /// Adapts the limit once per completed window: shrink it when usage is
    /// below 50% of the limit, grow it when usage reaches 90%, clamped to
    /// [min_rate, max_rate_absolute].
    fn adapt_rate(&self, actualrate: f64) {
        let mut max_rate = self.max_rate.lock().expect("Operation failed");
        if actualrate < *max_rate * 0.5 {
            *max_rate = (*max_rate * (1.0 - self.adaptation_factor)).max(self.min_rate);
        } else if actualrate >= *max_rate * 0.9 {
            *max_rate =
                (*max_rate * (1.0 + self.adaptation_factor)).min(self.max_rate_absolute);
        }
    }
    /// Snapshot of the limiter's current state.
    pub fn get_stats(&self) -> RateLimitStats {
        let current_rate = *self.current_rate.lock().expect("Operation failed");
        let max_rate = *self.max_rate.lock().expect("Operation failed");
        RateLimitStats {
            current_rate,
            max_rate,
            message_count: self.message_count.load(Ordering::Relaxed),
            window_duration: self.window_duration,
        }
    }
    /// Restarts the window and clears the measured rate (the adapted
    /// `max_rate` is intentionally kept).
    pub fn reset(&self) {
        *self.current_rate.lock().expect("Operation failed") = 0.0;
        *self.last_reset.lock().expect("Operation failed") = Instant::now();
        self.message_count.store(0, Ordering::Relaxed);
    }
}
/// Point-in-time snapshot of an `AdaptiveRateLimiter`.
#[derive(Debug, Clone)]
pub struct RateLimitStats {
    // Rate measured over the last completed window (0.0 before the first).
    pub current_rate: f64,
    // Currently allowed rate after adaptation.
    pub max_rate: f64,
    // Messages seen so far in the current (incomplete) window.
    pub message_count: usize,
    pub window_duration: Duration,
}
/// A `Logger` wrapper that also feeds a `LogAggregator` and applies per-logger
/// adaptive rate limiting.
pub struct DistributedLogger {
    #[allow(dead_code)]
    nodeid: NodeId,
    // Local delegate that performs the actual console/file logging.
    locallogger: Logger,
    aggregator: Arc<LogAggregator>,
    // One limiter per logger key, created lazily in `log_adaptive`.
    rate_limiters: Arc<RwLock<HashMap<String, AdaptiveRateLimiter>>>,
    // Initial msgs/sec for newly created limiters.
    default_rate_limit: f64,
}
impl DistributedLogger {
    /// Creates a distributed logger with its own bounded aggregator.
    pub fn new(
        logger_name: &str,
        nodeid: NodeId,
        max_entries: usize,
        aggregation_window: Duration,
        default_rate_limit: f64,
    ) -> Self {
        let locallogger = Logger::new(logger_name);
        let aggregator = Arc::new(LogAggregator::new(
            nodeid.clone(),
            max_entries,
            aggregation_window,
        ));
        Self {
            nodeid,
            locallogger,
            aggregator,
            rate_limiters: Arc::new(RwLock::new(HashMap::new())),
            default_rate_limit,
        }
    }
    /// Logs locally and into the aggregator, subject to the per-logger
    /// adaptive rate limiter (created lazily on first use).
    pub fn log_adaptive(
        &self,
        level: LogLevel,
        message: &str,
        context: Option<HashMap<String, String>>,
    ) {
        let logger_key = self.locallogger.module.clone();
        // Fast path: the limiter already exists, only a read lock is needed.
        let fast_path = {
            let rate_limiters = self.rate_limiters.read().expect("Operation failed");
            rate_limiters.get(&logger_key).map(|l| l.try_acquire())
        };
        let shouldlog = match fast_path {
            Some(decision) => decision,
            None => {
                // Slow path: create the limiter under the write lock. The
                // `entry` API keeps a limiter inserted by a racing thread
                // between our read-lock release and write-lock acquisition
                // (the original unconditionally inserted a fresh limiter,
                // discarding the racer's accumulated state).
                let mut rate_limiters = self.rate_limiters.write().expect("Operation failed");
                rate_limiters
                    .entry(logger_key)
                    .or_insert_with(|| {
                        AdaptiveRateLimiter::new(
                            self.default_rate_limit,
                            Duration::from_secs(1),
                            0.1,
                        )
                    })
                    .try_acquire()
            }
        };
        if shouldlog {
            self.locallogger.writelog(level, message);
            let entry = DistributedLogEntry::new(
                self.nodeid.clone(),
                level,
                self.locallogger.module.clone(),
                message.to_string(),
                context.unwrap_or_default(),
            );
            self.aggregator.add_entry(entry);
        }
    }
    /// Rate-limited log at `Error` level.
    pub fn error_adaptive(&self, message: &str) {
        self.log_adaptive(LogLevel::Error, message, None);
    }
    /// Rate-limited log at `Warn` level.
    pub fn warn_adaptive(&self, message: &str) {
        self.log_adaptive(LogLevel::Warn, message, None);
    }
    /// Rate-limited log at `Info` level.
    pub fn info_adaptive(&self, message: &str) {
        self.log_adaptive(LogLevel::Info, message, None);
    }
    /// Rate-limited log at `Debug` level.
    pub fn debug_adaptive(&self, message: &str) {
        self.log_adaptive(LogLevel::Debug, message, None);
    }
    /// Snapshot of all entries currently buffered in the aggregator.
    pub fn get_aggregatedlogs(&self) -> Vec<DistributedLogEntry> {
        self.aggregator.get_entries()
    }
    /// Per-logger-key rate limiter snapshots.
    pub fn get_rate_stats(&self) -> HashMap<String, RateLimitStats> {
        self.rate_limiters
            .read()
            .expect("Test: operation failed")
            .iter()
            .map(|(k, v)| (k.clone(), v.get_stats()))
            .collect()
    }
    /// Copy of the aggregator's running statistics.
    pub fn get_aggregation_stats(&self) -> AggregationStats {
        self.aggregator.stats()
    }
    /// Serializes the buffered entries plus summary statistics to pretty
    /// JSON. Log levels are rendered via their `Debug` representation.
    pub fn exportlogs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
        let entries = self.get_aggregatedlogs();
        let stats = self.get_aggregation_stats();
        let export_data = serde_json::json!({
            "nodeid": self.nodeid.to_string(),
            "timestamp": SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("Test: operation failed")
                .as_millis(),
            "stats": {
                "total_entries": stats.total_entries,
                "dropped_entries": stats.dropped_entries,
                "aggregation_windows": stats.aggregation_windows
            },
            "entries": entries.iter().map(|entry| serde_json::json!({
                "id": entry.id,
                "nodeid": entry.nodeid.to_string(),
                "timestamp": entry.timestamp,
                "level": format!("{0:?}", entry.level),
                "logger": entry.logger,
                "message": entry.message,
                "context": entry.context,
                "sequence": entry.sequence
            })).collect::<Vec<_>>()
        });
        Ok(serde_json::to_string_pretty(&export_data)?)
    }
    /// Clears the aggregator and resets every rate limiter's window.
    pub fn clear_aggregated_data(&self) {
        self.aggregator.clear();
        // `reset` takes `&self`, so a read lock suffices (the original took
        // an exclusive write lock it never needed).
        let rate_limiters = self.rate_limiters.read().expect("Operation failed");
        for limiter in rate_limiters.values() {
            limiter.reset();
        }
    }
}
/// Periodically copies entries from every registered node's aggregator into
/// one global aggregator via a background thread.
pub struct MultiNodeCoordinator {
    nodes: Arc<RwLock<HashMap<NodeId, Arc<DistributedLogger>>>>,
    global_aggregator: Arc<LogAggregator>,
    // Sleep between coordination passes.
    coordination_interval: Duration,
    // 1 while the background thread should run, 0 otherwise
    // (used as a boolean flag).
    running: Arc<AtomicUsize>,
}
impl MultiNodeCoordinator {
    /// Creates a coordinator whose global aggregator holds up to 100k entries
    /// over a one-hour window.
    pub fn new(coordinationinterval: Duration) -> Self {
        let global_node = NodeId::new("global".to_string(), "coordinator".to_string());
        let global_aggregator = Arc::new(LogAggregator::new(
            global_node,
            100000,
            Duration::from_secs(3600),
        ));
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            global_aggregator,
            coordination_interval: coordinationinterval,
            running: Arc::new(AtomicUsize::new(0)),
        }
    }
    /// Adds (or replaces) a node whose logs should be collected.
    pub fn register_node(&self, nodeid: NodeId, logger: Arc<DistributedLogger>) {
        let mut nodes = self.nodes.write().expect("Operation failed");
        nodes.insert(nodeid, logger);
    }
    /// Stops collecting from the given node.
    pub fn unregister_node(&self, nodeid: &NodeId) {
        let mut nodes = self.nodes.write().expect("Operation failed");
        nodes.remove(nodeid);
    }
    /// Starts the background collection thread. Idempotent: the CAS ensures
    /// only one thread is spawned while running.
    pub fn start(&self) {
        if self
            .running
            .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
        {
            let nodes = self.nodes.clone();
            let global_aggregator = self.global_aggregator.clone();
            let interval = self.coordination_interval;
            let running = self.running.clone();
            thread::spawn(move || {
                // Per-node high-water mark of the largest entry id already
                // forwarded. Entry ids come from a process-wide monotonic
                // counter, so forwarding only ids above the watermark copies
                // each entry exactly once. (The original re-added EVERY entry
                // on every pass — `get_aggregatedlogs` does not drain — which
                // duplicated entries in the global aggregator without bound.)
                let mut last_forwarded: HashMap<NodeId, u64> = HashMap::new();
                while running.load(Ordering::Relaxed) == 1 {
                    {
                        let nodes_guard = nodes.read().expect("Operation failed");
                        for (node_id, logger) in nodes_guard.iter() {
                            let watermark =
                                last_forwarded.get(node_id).copied().unwrap_or(0);
                            let mut max_id = watermark;
                            for entry in logger.get_aggregatedlogs() {
                                if entry.id > watermark {
                                    max_id = max_id.max(entry.id);
                                    global_aggregator.add_entry(entry);
                                }
                            }
                            last_forwarded.insert(node_id.clone(), max_id);
                        }
                    }
                    thread::sleep(interval);
                }
            });
        }
    }
    /// Signals the background thread to exit after its current sleep.
    pub fn stop(&self) {
        self.running.store(0, Ordering::Relaxed);
    }
    /// Copy of the global aggregator's running statistics.
    pub fn get_global_stats(&self) -> AggregationStats {
        self.global_aggregator.stats()
    }
    /// Snapshot of all globally aggregated entries.
    pub fn get_global_entries(&self) -> Vec<DistributedLogEntry> {
        self.global_aggregator.get_entries()
    }
    /// Serializes global entries plus summary statistics to pretty JSON.
    pub fn export_globallogs_json(&self) -> Result<String, Box<dyn std::error::Error>> {
        let entries = self.get_global_entries();
        let stats = self.get_global_stats();
        let export_data = serde_json::json!({
            "coordinator": "global",
            "timestamp": SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("Test: operation failed")
                .as_millis(),
            "stats": {
                "total_entries": stats.total_entries,
                "dropped_entries": stats.dropped_entries,
                "nodes_count": self.nodes.read().expect("Operation failed").len(),
                "entries_by_level": stats.entries_by_level.iter().map(|(k, v)| (format!("{k:?}"), *v)).collect::<HashMap<String, u64>>()
            },
            "entries": entries.iter().map(|entry| serde_json::json!({
                "id": entry.id,
                "nodeid": entry.nodeid.to_string(),
                "timestamp": entry.timestamp,
                "level": format!("{0:?}", entry.level),
                "logger": entry.logger,
                "message": entry.message,
                "context": entry.context,
                "sequence": entry.sequence
            })).collect::<Vec<_>>()
        });
        Ok(serde_json::to_string_pretty(&export_data)?)
    }
}
impl Drop for MultiNodeCoordinator {
    // Ensures the background collection thread is signalled to stop when the
    // coordinator goes out of scope (the thread exits after its next sleep).
    fn drop(&mut self) {
        self.stop();
    }
}
}
#[cfg(test)]
mod distributed_tests {
    use super::distributed::*;
    use super::*;
    use std::time::Duration;
    // NodeId accessors and Display format ("name:instance").
    #[test]
    fn test_nodeid_creation() {
        let node = NodeId::new("worker1".to_string(), "pid123".to_string());
        assert_eq!(node.name(), "worker1");
        assert_eq!(node.instance_id(), "pid123");
        assert_eq!(node.to_string(), "worker1:pid123");
    }
    // A single added entry is retrievable and counted in the stats.
    #[test]
    fn testlog_aggregator() {
        let nodeid = NodeId::new("test_node".to_string(), 1.to_string());
        let aggregator = LogAggregator::new(nodeid.clone(), 100, Duration::from_secs(60));
        let entry = DistributedLogEntry::new(
            nodeid,
            LogLevel::Info,
            "testlogger".to_string(),
            "Test message".to_string(),
            HashMap::new(),
        );
        aggregator.add_entry(entry);
        let entries = aggregator.get_entries();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].message, "Test message");
        let stats = aggregator.stats();
        assert_eq!(stats.total_entries, 1);
    }
    // Fresh limiter admits early calls and reports its initial max rate.
    #[test]
    fn test_adaptive_rate_limiter() {
        let limiter = AdaptiveRateLimiter::new(10.0, Duration::from_millis(100), 0.1);
        assert!(limiter.try_acquire());
        assert!(limiter.try_acquire());
        let stats = limiter.get_stats();
        assert!(stats.current_rate >= 0.0);
        assert_eq!(stats.max_rate, 10.0);
    }
    // Adaptive logging lands entries in the aggregator.
    #[test]
    fn test_distributedlogger() {
        let nodeid = NodeId::new("test_node".to_string(), 1.to_string());
        let logger =
            DistributedLogger::new("testlogger", nodeid, 1000, Duration::from_secs(60), 100.0);
        logger.info_adaptive("Test message 1");
        logger.warn_adaptive("Test message 2");
        let entries = logger.get_aggregatedlogs();
        assert!(!entries.is_empty());
        let stats = logger.get_aggregation_stats();
        assert!(stats.total_entries >= 1);
    }
    // Smoke test: coordinator thread starts, runs one or more passes, stops.
    #[test]
    fn test_multi_node_coordinator() {
        let coordinator = MultiNodeCoordinator::new(Duration::from_millis(10));
        let node1_id = NodeId::new("node1".to_string(), "1".to_string());
        let node1logger = Arc::new(DistributedLogger::new(
            "node1logger",
            node1_id.clone(),
            100,
            Duration::from_secs(10),
            50.0,
        ));
        coordinator.register_node(node1_id, node1logger);
        coordinator.start();
        std::thread::sleep(Duration::from_millis(50));
        coordinator.stop();
        let stats = coordinator.get_global_stats();
        let _ = stats.total_entries;
    }
    // JSON export contains the node id and the logged message.
    #[test]
    fn testlog_export() {
        let nodeid = NodeId::new("export_test".to_string(), 1.to_string());
        let logger =
            DistributedLogger::new("exportlogger", nodeid, 100, Duration::from_secs(60), 100.0);
        logger.info_adaptive("Export test message");
        let json_export = logger.exportlogs_json().expect("Operation failed");
        assert!(json_export.contains("export_test"));
        assert!(json_export.contains("Export test message"));
    }
}