use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use thiserror::Error;
/// Errors produced by the distributed logging subsystem.
#[derive(Error, Debug)]
pub enum DistributedLogError {
    /// Transport-level failure while talking to another node.
    #[error("Network error: {0}")]
    NetworkError(String),
    /// A log entry could not be encoded/decoded for transport.
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// The referenced node id is not registered with this logger.
    #[error("Node not found: {0}")]
    NodeNotFound(String),
    /// Invalid or inconsistent `DistributedConfig` values.
    #[error("Configuration error: {0}")]
    ConfigurationError(String),
}
/// A single log record as it travels between nodes.
#[derive(Debug, Clone)]
pub struct DistributedLogEntry {
    /// Process-unique id (hex millisecond timestamp + counter; see `generate_id`).
    pub id: String,
    /// Id of the node that produced this entry.
    pub nodeid: String,
    /// Severity level (type defined in this crate's `logging` module).
    pub level: crate::logging::LogLevel,
    /// Human-readable message text.
    pub message: String,
    /// Arbitrary key/value metadata attached via `with_context`.
    pub context: HashMap<String, String>,
    /// Wall-clock creation time of the entry.
    pub timestamp: SystemTime,
    /// Optional id for correlating entries across services.
    pub correlation_id: Option<String>,
    /// Name of the service that emitted the entry.
    pub service: String,
}
impl DistributedLogEntry {
    /// Creates an entry stamped with a fresh unique id, the current time,
    /// empty context, and no correlation id.
    pub fn new(
        nodeid: String,
        level: crate::logging::LogLevel,
        message: String,
        service: String,
    ) -> Self {
        Self {
            id: Self::generate_id(),
            nodeid,
            level,
            message,
            context: HashMap::new(),
            timestamp: SystemTime::now(),
            correlation_id: None,
            service,
        }
    }

    /// Adds one key/value pair to the entry's context (builder style).
    pub fn with_context(mut self, key: &str, value: &str) -> Self {
        self.context.insert(key.to_string(), value.to_string());
        self
    }

    /// Sets the correlation id used to trace a request across services
    /// (builder style).
    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
        // Fix: the original referenced an undefined `correlation_id` while
        // the parameter was spelled `correlationid`.
        self.correlation_id = Some(correlation_id);
        self
    }

    /// Generates a process-unique id: hex millisecond timestamp joined with
    /// a hex value from a monotonically increasing atomic counter.
    fn generate_id() -> String {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_millis();
        let counter = COUNTER.fetch_add(1, Ordering::SeqCst);
        format!("{timestamp:x}-{counter:x}")
    }

    /// Serializes the entry to a minimal flat JSON object.
    ///
    /// Only id, nodeid, level, message, service, and the UNIX-second
    /// timestamp are included; context and correlation id are omitted.
    /// Backslashes and double quotes in the message are escaped so the
    /// output remains valid JSON.
    pub fn to_json(&self) -> String {
        // Escape backslashes first so existing ones aren't double-escaped
        // by the quote pass. (The original `replace` call was malformed.)
        let escaped_message = self.message.replace('\\', "\\\\").replace('"', "\\\"");
        format!(
            r#"{{"id":"{}","nodeid":"{}","level":"{}","message":"{}","service":"{}","timestamp":"{}"}}"#,
            self.id,
            self.nodeid,
            // Fix: `format!("{self.level:?}")` is invalid — inline format
            // captures cannot contain field accesses.
            format!("{:?}", self.level),
            escaped_message,
            self.service,
            self.timestamp
                .duration_since(UNIX_EPOCH)
                .unwrap_or(Duration::ZERO)
                .as_secs()
        )
    }
}
/// A peer node known to this logger (typically an aggregator target).
#[derive(Debug, Clone)]
pub struct LogNode {
    /// Unique node identifier (for configured aggregators this is the address).
    pub id: String,
    /// Network address used when sending entries to this node.
    pub address: String,
    /// Role the node plays in the logging topology.
    pub role: NodeRole,
    /// Last time a heartbeat was recorded for this node.
    pub last_heartbeat: SystemTime,
    /// Current health status, updated by the heartbeat task.
    pub status: NodeStatus,
}
/// Role a node plays in the distributed logging topology.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeRole {
    /// Collects entries sent by producers.
    Aggregator,
    /// Emits log entries.
    Producer,
    /// Persists collected entries.
    Storage,
    /// Relays entries toward aggregators/storage.
    Forwarder,
}
/// Health state of a known node, maintained by the heartbeat task.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeStatus {
    /// Responding within the expected heartbeat window.
    Healthy,
    /// Reachable but impaired.
    Degraded,
    /// No heartbeat for more than two heartbeat intervals.
    Unreachable,
}
/// Configuration for a `DistributedLogger` or `LogAggregator` node.
#[derive(Debug, Clone)]
pub struct DistributedConfig {
    /// This node's unique identifier (defaults to a random UUID).
    pub nodeid: String,
    /// Role this node plays in the topology.
    pub node_role: NodeRole,
    /// Addresses of aggregators that buffered entries are sent to.
    pub aggregators: Vec<String>,
    /// Buffer length that triggers a synchronous flush in `log`.
    pub buffersize: usize,
    /// Period of the background flush task.
    pub flush_interval: Duration,
    /// Number of send retries (not currently consumed by the visible code).
    pub retry_attempts: usize,
    /// Whether payload compression is enabled (not currently consumed here).
    pub enable_compression: bool,
    /// Period of the heartbeat/health-check task.
    pub heartbeat_interval: Duration,
}
impl Default for DistributedConfig {
fn default() -> Self {
Self {
nodeid: format!("{}", uuid::Uuid::new_v4()),
node_role: NodeRole::Producer,
aggregators: Vec::new(),
buffersize: 1000,
flush_interval: Duration::from_secs(5),
retry_attempts: 3,
enable_compression: false,
heartbeat_interval: Duration::from_secs(30),
}
}
}
/// How a flushed batch of entries is dispatched to aggregator nodes.
#[derive(Debug, Clone)]
pub enum AggregationStrategy {
    /// Send the whole batch to the first healthy aggregator.
    Forward,
    /// Round-robin entries across all healthy aggregators.
    LoadBalance,
    /// Send a copy of the batch to every healthy aggregator.
    Replicate,
    /// Named custom strategy; currently falls back to `Forward` behavior.
    Custom(String),
}
/// Producer-side logger: buffers entries locally and ships them to
/// aggregator nodes according to an `AggregationStrategy`.
pub struct DistributedLogger {
    // Static settings for this node.
    config: DistributedConfig,
    // Pending entries awaiting flush; shared with the background flush task.
    buffer: Arc<Mutex<Vec<DistributedLogEntry>>>,
    // Known peer nodes keyed by node id; shared with the heartbeat task.
    nodes: Arc<Mutex<HashMap<String, LogNode>>>,
    // Dispatch policy used by `flush_buffer`.
    aggregation_strategy: AggregationStrategy,
    // Flag the background threads poll to know when to exit.
    running: Arc<Mutex<bool>>,
}
impl DistributedLogger {
    /// Creates a logger for `config` with an empty buffer, no registered
    /// nodes, and the default `Forward` aggregation strategy.
    pub fn new(config: DistributedConfig) -> Self {
        Self {
            config,
            buffer: Arc::new(Mutex::new(Vec::new())),
            nodes: Arc::new(Mutex::new(HashMap::new())),
            aggregation_strategy: AggregationStrategy::Forward,
            running: Arc::new(Mutex::new(false)),
        }
    }

    /// Starts the logger: registers each configured aggregator address as a
    /// healthy node and spawns the background flush and heartbeat threads.
    ///
    /// Idempotent — calling `start` while already running is a no-op.
    pub fn start(&self) -> Result<(), DistributedLogError> {
        {
            let mut running = self.running.lock().expect("running lock poisoned");
            if *running {
                return Ok(());
            }
            *running = true;
            // Guard dropped here so the spawned tasks can take the lock
            // immediately instead of blocking until `start` returns.
        }
        for aggregator_addr in &self.config.aggregators {
            let node = LogNode {
                id: aggregator_addr.clone(),
                address: aggregator_addr.clone(),
                role: NodeRole::Aggregator,
                last_heartbeat: SystemTime::now(),
                status: NodeStatus::Healthy,
            };
            self.nodes
                .lock()
                .expect("nodes lock poisoned")
                .insert(node.id.clone(), node);
        }
        self.start_flush_task();
        self.start_heartbeat_task();
        Ok(())
    }

    /// Signals background threads to stop; each exits after its current
    /// sleep interval elapses.
    pub fn stop(&self) {
        *self.running.lock().expect("running lock poisoned") = false;
    }

    /// Buffers one entry; once the buffer reaches `config.buffersize` it is
    /// flushed synchronously on the calling thread.
    pub fn log(&self, level: crate::logging::LogLevel, message: &str, service: &str) {
        let entry = DistributedLogEntry::new(
            self.config.nodeid.clone(),
            level,
            message.to_string(),
            service.to_string(),
        );
        // Push and check the size under a single lock acquisition so the
        // size check cannot race with concurrent writers (the original
        // locked the buffer twice).
        let should_flush = {
            let mut buffer = self.buffer.lock().expect("buffer lock poisoned");
            buffer.push(entry);
            buffer.len() >= self.config.buffersize
        };
        if should_flush {
            self.flush_buffer();
        }
    }

    /// Buffers one entry tagged with `correlation_id` for cross-service
    /// tracing. Unlike `log`, this never flushes synchronously; the
    /// background flush path picks the entry up later.
    pub fn log_with_correlation(
        &self,
        level: crate::logging::LogLevel,
        message: &str,
        service: &str,
        correlation_id: &str,
    ) {
        // Fix: the original referenced an undefined `correlation_id` while
        // the parameter was spelled `correlationid`.
        let entry = DistributedLogEntry::new(
            self.config.nodeid.clone(),
            level,
            message.to_string(),
            service.to_string(),
        )
        .with_correlation_id(correlation_id.to_string());
        self.buffer
            .lock()
            .expect("buffer lock poisoned")
            .push(entry);
    }

    /// Drains every buffered entry and dispatches the batch according to
    /// the current aggregation strategy. No-op when the buffer is empty.
    pub fn flush_buffer(&self) {
        let mut buffer = self.buffer.lock().expect("buffer lock poisoned");
        if buffer.is_empty() {
            return;
        }
        let entries: Vec<_> = buffer.drain(..).collect();
        // Release the buffer lock before the (potentially slow) sends.
        drop(buffer);
        match self.aggregation_strategy {
            AggregationStrategy::Forward => self.forward_entries(&entries),
            AggregationStrategy::LoadBalance => self.load_balance_entries(&entries),
            AggregationStrategy::Replicate => self.replicate_entries(&entries),
            // Custom strategies are not implemented; fall back to Forward.
            AggregationStrategy::Custom(_) => self.forward_entries(&entries),
        }
    }

    /// Sends the whole batch to the first healthy aggregator, if any.
    /// Entries are silently dropped when no healthy aggregator exists.
    fn forward_entries(&self, entries: &[DistributedLogEntry]) {
        let nodes = self.nodes.lock().expect("nodes lock poisoned");
        let aggregator = nodes
            .values()
            .find(|node| node.role == NodeRole::Aggregator && node.status == NodeStatus::Healthy);
        if let Some(node) = aggregator {
            self.send_entries_to_node(entries, node);
        }
    }

    /// Distributes entries round-robin across all healthy aggregators.
    /// Entries are silently dropped when no healthy aggregator exists.
    fn load_balance_entries(&self, entries: &[DistributedLogEntry]) {
        let nodes = self.nodes.lock().expect("nodes lock poisoned");
        let aggregators: Vec<_> = nodes
            .values()
            .filter(|node| node.role == NodeRole::Aggregator && node.status == NodeStatus::Healthy)
            .collect();
        if aggregators.is_empty() {
            return;
        }
        for (i, entry) in entries.iter().enumerate() {
            let node = aggregators[i % aggregators.len()];
            // `from_ref` builds a one-element slice without cloning the entry.
            self.send_entries_to_node(std::slice::from_ref(entry), node);
        }
    }

    /// Sends a copy of the whole batch to every healthy aggregator.
    fn replicate_entries(&self, entries: &[DistributedLogEntry]) {
        let nodes = self.nodes.lock().expect("nodes lock poisoned");
        for node in nodes.values() {
            if node.role == NodeRole::Aggregator && node.status == NodeStatus::Healthy {
                self.send_entries_to_node(entries, node);
            }
        }
    }

    /// Transport stub: real network delivery is not implemented; entries
    /// are printed to stdout instead.
    fn send_entries_to_node(&self, entries: &[DistributedLogEntry], node: &LogNode) {
        for entry in entries {
            println!("Sending log to {}: {}", node.address, entry.to_json());
        }
    }

    /// Spawns a thread that wakes every `flush_interval` while the logger
    /// is running and reports how many entries are buffered.
    ///
    /// NOTE(review): this task only *reports* the buffer size; it does not
    /// actually flush (flushing happens in `log` when the buffer fills).
    fn start_flush_task(&self) {
        let buffer = Arc::clone(&self.buffer);
        let running = Arc::clone(&self.running);
        let flush_interval = self.config.flush_interval;
        std::thread::spawn(move || {
            while *running.lock().expect("running lock poisoned") {
                std::thread::sleep(flush_interval);
                let buffersize = buffer.lock().expect("buffer lock poisoned").len();
                if buffersize > 0 {
                    println!("Background flush: {} entries buffered", buffersize);
                }
            }
        });
    }

    /// Spawns a thread that marks nodes `Unreachable` once no heartbeat has
    /// been recorded for more than two heartbeat intervals.
    fn start_heartbeat_task(&self) {
        let nodes = Arc::clone(&self.nodes);
        let running = Arc::clone(&self.running);
        let heartbeat_interval = self.config.heartbeat_interval;
        std::thread::spawn(move || {
            while *running.lock().expect("running lock poisoned") {
                std::thread::sleep(heartbeat_interval);
                let mut nodes_guard = nodes.lock().expect("nodes lock poisoned");
                for node in nodes_guard.values_mut() {
                    // `elapsed()` errs if the clock went backwards; treat
                    // that as a fresh heartbeat rather than unreachable.
                    if node.last_heartbeat.elapsed().unwrap_or(Duration::ZERO)
                        > heartbeat_interval * 2
                    {
                        node.status = NodeStatus::Unreachable;
                    }
                }
            }
        });
    }

    /// Returns a snapshot of buffer occupancy and node health.
    pub fn get_node_stats(&self) -> NodeStats {
        let buffersize = self.buffer.lock().expect("buffer lock poisoned").len();
        let nodes = self.nodes.lock().expect("nodes lock poisoned");
        let healthy_nodes = nodes
            .values()
            .filter(|n| n.status == NodeStatus::Healthy)
            .count();
        NodeStats {
            buffersize,
            healthy_nodes,
            total_nodes: nodes.len(),
            nodeid: self.config.nodeid.clone(),
            // NOTE(review): this records the sampling time, not a start
            // time, so it is not a true uptime — confirm intent.
            uptime: SystemTime::now(),
        }
    }

    /// Replaces the strategy used by subsequent flushes.
    pub fn set_aggregation_strategy(&mut self, strategy: AggregationStrategy) {
        self.aggregation_strategy = strategy;
    }
}
impl Drop for DistributedLogger {
    /// Stops the background tasks and flushes any buffered entries so logs
    /// are not lost when the logger goes out of scope.
    fn drop(&mut self) {
        self.stop();
        self.flush_buffer();
    }
}
/// Point-in-time monitoring snapshot produced by `get_node_stats`.
#[derive(Debug)]
pub struct NodeStats {
    /// Number of entries currently buffered locally.
    pub buffersize: usize,
    /// Count of known nodes whose status is `Healthy`.
    pub healthy_nodes: usize,
    /// Total number of known nodes.
    pub total_nodes: usize,
    /// Id of the node that produced this snapshot.
    pub nodeid: String,
    /// Time the snapshot was taken (not a start time, despite the name).
    pub uptime: SystemTime,
}
/// Aggregator-side store: receives entries from producers and supports
/// simple querying and statistics over the collected logs.
pub struct LogAggregator {
    // Static settings for this aggregator node.
    config: DistributedConfig,
    // Collected entries, bounded by eviction in `receivelogs`.
    collectedlogs: Arc<Mutex<Vec<DistributedLogEntry>>>,
    // Running flag toggled by `start`/`stop`.
    running: Arc<Mutex<bool>>,
}
impl LogAggregator {
    /// Creates an aggregator for `config` with an empty log store.
    pub fn new(config: DistributedConfig) -> Self {
        Self {
            config,
            collectedlogs: Arc::new(Mutex::new(Vec::new())),
            running: Arc::new(Mutex::new(false)),
        }
    }

    /// Marks the aggregator as running. Always succeeds; the `Result`
    /// exists for interface symmetry with `DistributedLogger::start`.
    pub fn start(&self) -> Result<(), DistributedLogError> {
        *self.running.lock().expect("running lock poisoned") = true;
        println!("Log aggregator started on node: {}", self.config.nodeid);
        Ok(())
    }

    /// Clears the running flag.
    pub fn stop(&self) {
        *self.running.lock().expect("running lock poisoned") = false;
    }

    /// Appends incoming entries to the store, evicting the 1000 oldest
    /// entries whenever the store exceeds 10_000 (simple back-pressure).
    pub fn receivelogs(&self, entries: Vec<DistributedLogEntry>) {
        let mut logs = self.collectedlogs.lock().expect("logs lock poisoned");
        logs.extend(entries);
        if logs.len() > 10000 {
            logs.drain(0..1000);
        }
    }

    /// Returns clones of all stored entries matching the optional filters;
    /// a `None` filter matches everything. Level comparison is by enum
    /// variant only (any payload is ignored).
    pub fn querylogs(
        &self,
        service: Option<&str>,
        level: Option<crate::logging::LogLevel>,
    ) -> Vec<DistributedLogEntry> {
        let logs = self.collectedlogs.lock().expect("logs lock poisoned");
        logs.iter()
            .filter(|entry| {
                // `as_ref` borrows the filter level so the closure never
                // moves out of its capture (safe even if LogLevel: !Copy).
                service.map_or(true, |svc| entry.service == svc)
                    && level.as_ref().map_or(true, |lvl| {
                        std::mem::discriminant(&entry.level) == std::mem::discriminant(lvl)
                    })
            })
            .cloned()
            .collect()
    }

    /// Computes per-service and per-level entry counts over the store.
    pub fn get_stats(&self) -> AggregatorStats {
        let logs = self.collectedlogs.lock().expect("logs lock poisoned");
        let mut service_counts = HashMap::new();
        let mut level_counts = HashMap::new();
        for entry in logs.iter() {
            *service_counts.entry(entry.service.clone()).or_insert(0) += 1;
            // Fix: `format!("{entry.level:?}")` is invalid — inline format
            // captures cannot contain field accesses.
            *level_counts
                .entry(format!("{:?}", entry.level))
                .or_insert(0) += 1;
        }
        AggregatorStats {
            totallogs: logs.len(),
            service_counts,
            level_counts,
            aggregator_id: self.config.nodeid.clone(),
        }
    }
}
/// Summary statistics produced by `LogAggregator::get_stats`.
#[derive(Debug)]
pub struct AggregatorStats {
    /// Total number of entries currently stored.
    pub totallogs: usize,
    /// Entry count per service name.
    pub service_counts: HashMap<String, usize>,
    /// Entry count per level (keyed by the level's Debug representation).
    pub level_counts: HashMap<String, usize>,
    /// Node id of the aggregator that produced these stats.
    pub aggregator_id: String,
}
pub mod utils {
    use super::*;

    /// Builds a `DistributedLogger` with default settings and the given role.
    pub fn create_simplelogger(node_role: NodeRole) -> DistributedLogger {
        // Fix: the original used field-init shorthand `node_role` while the
        // parameter was spelled `noderole`, so the name did not resolve.
        let config = DistributedConfig {
            node_role,
            ..Default::default()
        };
        DistributedLogger::new(config)
    }

    /// Creates one aggregator logger per address plus a single producer
    /// configured to forward to all of them. The producer is the last
    /// element of the returned vector.
    pub fn create_cluster(aggregator_addresses: Vec<String>) -> Vec<DistributedLogger> {
        // Fix: the original iterated over `aggregator_addresses` while the
        // parameter was spelled `aggregatoraddresses`.
        let mut loggers = Vec::new();
        for addr in &aggregator_addresses {
            let config = DistributedConfig {
                nodeid: addr.clone(),
                node_role: NodeRole::Aggregator,
                ..Default::default()
            };
            loggers.push(DistributedLogger::new(config));
        }
        let producer_config = DistributedConfig {
            // Last use of the vector — move it instead of cloning.
            aggregators: aggregator_addresses,
            node_role: NodeRole::Producer,
            ..Default::default()
        };
        loggers.push(DistributedLogger::new(producer_config));
        loggers
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new entry carries the provided fields and a generated, non-empty id.
    #[test]
    fn test_distributedlog_entry() {
        let entry = DistributedLogEntry::new(
            "test-node".to_string(),
            crate::logging::LogLevel::Info,
            "Test message".to_string(),
            "test-service".to_string(),
        );
        assert_eq!(entry.nodeid, "test-node");
        assert_eq!(entry.message, "Test message");
        assert_eq!(entry.service, "test-service");
        assert!(!entry.id.is_empty());
    }

    /// A freshly constructed logger is not running until `start` is called.
    #[test]
    fn test_distributedlogger_creation() {
        let logger = DistributedLogger::new(DistributedConfig::default());
        // Idiom fix: assert!(!x) instead of assert_eq!(x, false).
        assert!(!*logger.running.lock().expect("running lock poisoned"));
    }

    /// Received entries are stored and retrievable via the service filter.
    #[test]
    fn testlog_aggregator() {
        let config = DistributedConfig {
            node_role: NodeRole::Aggregator,
            ..Default::default()
        };
        let aggregator = LogAggregator::new(config);
        let entry = DistributedLogEntry::new(
            "producer-1".to_string(),
            crate::logging::LogLevel::Error,
            "Error occurred".to_string(),
            "api-service".to_string(),
        );
        aggregator.receivelogs(vec![entry]);
        let logs = aggregator.querylogs(Some("api-service"), None);
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].message, "Error occurred");
    }

    /// Stats for a fresh logger show an empty buffer and no known nodes.
    #[test]
    fn test_node_stats() {
        let logger = DistributedLogger::new(DistributedConfig::default());
        let stats = logger.get_node_stats();
        assert_eq!(stats.buffersize, 0);
        assert_eq!(stats.total_nodes, 0);
    }
}