use std::fmt;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_json::json;
use tracing::{debug, error, info, trace, warn};
use crate::error::{Error, Result};
use crate::event::{EventSubscriber, NodeEvent};
/// Where formatted event lines are delivered.
#[derive(Clone)]
pub enum LogDestination {
    /// Emit each line through the `tracing` macros.
    Console,
    /// Append each line to a file on disk.
    File {
        /// Location of the log file.
        path: String,
        /// Size limit for the file.
        /// NOTE(review): not read anywhere in the visible code — confirm whether
        /// size limiting is implemented elsewhere.
        max_size: Option<usize>,
        /// Rotation flag.
        /// NOTE(review): not read anywhere in the visible code — confirm intent.
        rotate: bool,
    },
    /// Hand each line to a caller-supplied callback.
    Custom(Arc<dyn Fn(&str) + Send + Sync>),
}

/// Manual `Debug` because the `Custom` callback has no `Debug` representation;
/// it is shown as a fixed placeholder instead.
impl fmt::Debug for LogDestination {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Console => f.write_str("LogDestination::Console"),
            Self::Custom(_) => f.write_str("LogDestination::Custom(<function>)"),
            Self::File {
                path,
                max_size,
                rotate,
            } => {
                let mut builder = f.debug_struct("LogDestination::File");
                builder.field("path", path);
                builder.field("max_size", max_size);
                builder.field("rotate", rotate);
                builder.finish()
            }
        }
    }
}
/// Settings that control where and how an `EventLogger` writes events.
#[derive(Debug, Clone)]
pub struct EventLoggerConfig {
/// Sink that receives each formatted log line.
pub destination: LogDestination,
/// When `true`, events are rendered as JSON objects; otherwise as plain text lines.
pub structured: bool,
/// Severity used to pick the `tracing` macro when the destination is
/// `LogDestination::Console`; the file and custom destinations do not consult it.
pub log_level: log::Level,
}
impl Default for EventLoggerConfig {
fn default() -> Self {
Self {
destination: LogDestination::Console,
structured: false,
log_level: log::Level::Info,
}
}
}
/// Event subscriber that formats each received `NodeEvent` and writes it to
/// the configured destination.
pub struct EventLogger {
/// Formatting and destination settings supplied at construction.
config: EventLoggerConfig,
/// Shared handle to the open log file; `None` unless the destination is a
/// file and it was opened successfully.
file: Option<Arc<Mutex<File>>>,
}
impl EventLogger {
pub fn new(config: EventLoggerConfig) -> Self {
let file = match &config.destination {
LogDestination::File { path, .. } => match Self::open_log_file(path) {
Ok(file) => Some(Arc::new(Mutex::new(file))),
Err(err) => {
error!("Failed to open log file {}: {}", path, err);
None
}
},
_ => None,
};
Self { config, file }
}
/// Opens `path` for appending, creating the file and any missing parent
/// directories first.
///
/// # Errors
/// Returns the underlying I/O error if the directories cannot be created or
/// the file cannot be opened.
fn open_log_file(path: &str) -> io::Result<File> {
    let target = Path::new(path);

    // Make sure the containing directory exists before touching the file.
    target.parent().map(std::fs::create_dir_all).transpose()?;

    let mut options = OpenOptions::new();
    options.create(true).append(true);
    options.open(target)
}
fn log_event(&self, event: &NodeEvent) -> Result<()> {
let log_message = if self.config.structured {
self.format_structured_log(event)?
} else {
self.format_plain_log(event)
};
match &self.config.destination {
LogDestination::Console => {
match self.config.log_level {
log::Level::Error => error!("{}", log_message),
log::Level::Warn => warn!("{}", log_message),
log::Level::Info => info!("{}", log_message),
log::Level::Debug => debug!("{}", log_message),
log::Level::Trace => trace!("{}", log_message),
}
Ok(())
}
LogDestination::File { .. } => {
if let Some(file) = &self.file {
let mut file_guard = file.lock().map_err(|_| {
Error::Configuration("Failed to acquire log file lock".to_string())
})?;
writeln!(file_guard, "{}", log_message).map_err(|err| {
Error::Configuration(format!("Failed to write to log file: {}", err))
})?;
file_guard.flush().map_err(|err| {
Error::Configuration(format!("Failed to flush log file: {}", err))
})?;
Ok(())
} else {
error!("{}", log_message);
Ok(())
}
}
LogDestination::Custom(func) => {
func(&log_message);
Ok(())
}
}
}
fn format_plain_log(&self, event: &NodeEvent) -> String {
let timestamp = DateTime::<Utc>::from(SystemTime::now()).format("%Y-%m-%dT%H:%M:%S%.3fZ");
match event {
NodeEvent::PlainMessageReceived { message } => {
format!("[{}] MESSAGE RECEIVED: {}", timestamp, message)
}
NodeEvent::PlainMessageSent { message, from, to } => {
format!(
"[{}] MESSAGE SENT: from={}, to={}, message={}",
timestamp, from, to, message
)
}
NodeEvent::AgentRegistered { did } => {
format!("[{}] AGENT REGISTERED: {}", timestamp, did)
}
NodeEvent::AgentUnregistered { did } => {
format!("[{}] AGENT UNREGISTERED: {}", timestamp, did)
}
NodeEvent::DidResolved { did, success } => {
format!(
"[{}] DID RESOLVED: did={}, success={}",
timestamp, did, success
)
}
NodeEvent::AgentPlainMessage { did, message } => {
format!(
"[{}] AGENT MESSAGE: did={}, message_length={}",
timestamp,
did,
message.len()
)
}
NodeEvent::MessageRejected {
message_id,
reason,
from,
to,
} => {
format!(
"[{}] MESSAGE REJECTED: id={}, from={}, to={}, reason={}",
timestamp, message_id, from, to, reason
)
}
NodeEvent::MessageAccepted {
message_id,
message_type,
from,
to,
} => {
format!(
"[{}] MESSAGE ACCEPTED: id={}, type={}, from={}, to={}",
timestamp, message_id, message_type, from, to
)
}
NodeEvent::ReplyReceived {
original_message_id,
..
} => {
format!(
"[{}] REPLY RECEIVED: original_id={}",
timestamp, original_message_id
)
}
NodeEvent::TransactionStateChanged {
transaction_id,
old_state,
new_state,
agent_did,
} => match agent_did {
Some(did) => format!(
"[{}] TRANSACTION STATE CHANGED: id={}, {} -> {} (by {})",
timestamp, transaction_id, old_state, new_state, did
),
None => format!(
"[{}] TRANSACTION STATE CHANGED: id={}, {} -> {}",
timestamp, transaction_id, old_state, new_state
),
},
NodeEvent::MessageReceived { message, source } => {
format!(
"[{}] MESSAGE RECEIVED: source={}, type={}, id={}",
timestamp, source, message.type_, message.id
)
}
NodeEvent::MessageSent {
message,
destination,
} => {
format!(
"[{}] MESSAGE SENT: destination={}, type={}, id={}",
timestamp, destination, message.type_, message.id
)
}
NodeEvent::TransactionCreated {
transaction,
agent_did,
} => {
format!(
"[{}] TRANSACTION CREATED: id={}, agent={}",
timestamp, transaction.id, agent_did
)
}
NodeEvent::CustomerUpdated {
customer_id,
agent_did,
update_type,
} => {
format!(
"[{}] CUSTOMER UPDATED: id={}, agent={}, type={}",
timestamp, customer_id, agent_did, update_type
)
}
NodeEvent::DecisionRequired {
transaction_id,
transaction_state,
pending_agents,
..
} => {
format!(
"[{}] DECISION REQUIRED: tx={}, state={}, pending_agents={}",
timestamp,
transaction_id,
transaction_state,
pending_agents.join(", ")
)
}
}
}
fn format_structured_log(&self, event: &NodeEvent) -> Result<String> {
let timestamp = DateTime::<Utc>::from(SystemTime::now()).to_rfc3339();
let (event_type, event_data) = match event {
NodeEvent::PlainMessageReceived { message } => (
"message_received",
json!({
"message": message,
}),
),
NodeEvent::PlainMessageSent { message, from, to } => (
"message_sent",
json!({
"from": from,
"to": to,
"message": message,
}),
),
NodeEvent::AgentRegistered { did } => (
"agent_registered",
json!({
"did": did,
}),
),
NodeEvent::AgentUnregistered { did } => (
"agent_unregistered",
json!({
"did": did,
}),
),
NodeEvent::DidResolved { did, success } => (
"did_resolved",
json!({
"did": did,
"success": success,
}),
),
NodeEvent::AgentPlainMessage { did, message } => (
"agent_message",
json!({
"did": did,
"message_length": message.len(),
}),
),
NodeEvent::MessageRejected {
message_id,
reason,
from,
to,
} => (
"message_rejected",
json!({
"message_id": message_id,
"reason": reason,
"from": from,
"to": to,
}),
),
NodeEvent::MessageAccepted {
message_id,
message_type,
from,
to,
} => (
"message_accepted",
json!({
"message_id": message_id,
"message_type": message_type,
"from": from,
"to": to,
}),
),
NodeEvent::ReplyReceived {
original_message_id,
reply_message,
original_message,
} => (
"reply_received",
json!({
"original_message_id": original_message_id,
"reply_message": serde_json::to_value(reply_message).unwrap_or(json!(null)),
"original_message": serde_json::to_value(original_message).unwrap_or(json!(null)),
}),
),
NodeEvent::TransactionStateChanged {
transaction_id,
old_state,
new_state,
agent_did,
} => (
"transaction_state_changed",
json!({
"transaction_id": transaction_id,
"old_state": old_state,
"new_state": new_state,
"agent_did": agent_did,
}),
),
NodeEvent::MessageReceived { message, source } => (
"message_received_new",
json!({
"message": serde_json::to_value(message).unwrap_or(json!(null)),
"source": source,
}),
),
NodeEvent::MessageSent {
message,
destination,
} => (
"message_sent_new",
json!({
"message": serde_json::to_value(message).unwrap_or(json!(null)),
"destination": destination,
}),
),
NodeEvent::TransactionCreated {
transaction,
agent_did,
} => (
"transaction_created",
json!({
"transaction_id": transaction.id,
"agent_did": agent_did,
}),
),
NodeEvent::CustomerUpdated {
customer_id,
agent_did,
update_type,
} => (
"customer_updated",
json!({
"customer_id": customer_id,
"agent_did": agent_did,
"update_type": update_type,
}),
),
NodeEvent::DecisionRequired {
transaction_id,
transaction_state,
decision,
pending_agents,
} => (
"decision_required",
json!({
"transaction_id": transaction_id,
"transaction_state": transaction_state,
"decision": decision,
"pending_agents": pending_agents,
}),
),
};
let log_entry = json!({
"timestamp": timestamp,
"event_type": event_type,
"data": event_data,
});
serde_json::to_string(&log_entry).map_err(|e| Error::Serialization(e.to_string()))
}
}
#[async_trait]
impl EventSubscriber for EventLogger {
async fn handle_event(&self, event: NodeEvent) {
if let Err(err) = self.log_event(&event) {
error!("Failed to log event: {}", err);
}
}
}
/// Manual `Debug`: shows the configuration and only whether a log file
/// handle is present, not the handle itself.
impl fmt::Debug for EventLogger {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_file = self.file.is_some();

        let mut out = f.debug_struct("EventLogger");
        out.field("config", &self.config);
        out.field("file", &has_file);
        out.finish()
    }
}