1use std::fmt;
43use std::fs::{File, OpenOptions};
44use std::io::{self, Write};
45use std::path::Path;
46use std::sync::{Arc, Mutex};
47use std::time::SystemTime;
48
49use async_trait::async_trait;
50use chrono::{DateTime, Utc};
51use serde_json::json;
52use tracing::{debug, error, info, trace, warn};
53
54use crate::error::{Error, Result};
55use crate::event::{EventSubscriber, NodeEvent};
56
/// Where an event logger delivers each formatted log line.
///
/// `Debug` is implemented by hand for this type because the `Custom` variant
/// wraps a closure, which has no `Debug` representation of its own.
#[derive(Clone)]
pub enum LogDestination {
    /// Emit each line through the `tracing` macros.
    Console,

    /// Append each line to a file on disk.
    File {
        /// Path of the log file.
        path: String,

        /// Optional size limit for the log file.
        /// NOTE(review): presumably a byte count that triggers rotation —
        /// confirm against the code that consumes it.
        max_size: Option<usize>,

        /// Rotation flag for the log file.
        /// NOTE(review): presumably enables rotation once `max_size` is
        /// reached — confirm against the code that consumes it.
        rotate: bool,
    },

    /// Hand each line to a caller-supplied, thread-safe callback.
    Custom(Arc<dyn Fn(&str) + Send + Sync>),
}
78
79impl fmt::Debug for LogDestination {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 match self {
83 LogDestination::Console => write!(f, "LogDestination::Console"),
84 LogDestination::File {
85 path,
86 max_size,
87 rotate,
88 } => f
89 .debug_struct("LogDestination::File")
90 .field("path", path)
91 .field("max_size", max_size)
92 .field("rotate", rotate)
93 .finish(),
94 LogDestination::Custom(_) => write!(f, "LogDestination::Custom(<function>)"),
95 }
96 }
97}
98
99#[derive(Debug, Clone)]
101pub struct EventLoggerConfig {
102 pub destination: LogDestination,
104
105 pub structured: bool,
107
108 pub log_level: log::Level,
110}
111
112impl Default for EventLoggerConfig {
113 fn default() -> Self {
114 Self {
115 destination: LogDestination::Console,
116 structured: false,
117 log_level: log::Level::Info,
118 }
119 }
120}
121
122pub struct EventLogger {
128 config: EventLoggerConfig,
130
131 file: Option<Arc<Mutex<File>>>,
133}
134
135impl EventLogger {
136 pub fn new(config: EventLoggerConfig) -> Self {
138 let file = match &config.destination {
139 LogDestination::File { path, .. } => match Self::open_log_file(path) {
140 Ok(file) => Some(Arc::new(Mutex::new(file))),
141 Err(err) => {
142 error!("Failed to open log file {}: {}", path, err);
143 None
144 }
145 },
146 _ => None,
147 };
148
149 Self { config, file }
150 }
151
152 fn open_log_file(path: &str) -> io::Result<File> {
154 if let Some(parent) = Path::new(path).parent() {
156 std::fs::create_dir_all(parent)?;
157 }
158
159 OpenOptions::new().create(true).append(true).open(path)
161 }
162
163 fn log_event(&self, event: &NodeEvent) -> Result<()> {
165 let log_message = if self.config.structured {
166 self.format_structured_log(event)?
167 } else {
168 self.format_plain_log(event)
169 };
170
171 match &self.config.destination {
172 LogDestination::Console => {
173 match self.config.log_level {
175 log::Level::Error => error!("{}", log_message),
176 log::Level::Warn => warn!("{}", log_message),
177 log::Level::Info => info!("{}", log_message),
178 log::Level::Debug => debug!("{}", log_message),
179 log::Level::Trace => trace!("{}", log_message),
180 }
181 Ok(())
182 }
183 LogDestination::File { .. } => {
184 if let Some(file) = &self.file {
185 let mut file_guard = file.lock().map_err(|_| {
186 Error::Configuration("Failed to acquire log file lock".to_string())
187 })?;
188
189 writeln!(file_guard, "{}", log_message).map_err(|err| {
191 Error::Configuration(format!("Failed to write to log file: {}", err))
192 })?;
193
194 file_guard.flush().map_err(|err| {
196 Error::Configuration(format!("Failed to flush log file: {}", err))
197 })?;
198
199 Ok(())
200 } else {
201 error!("{}", log_message);
203 Ok(())
204 }
205 }
206 LogDestination::Custom(func) => {
207 func(&log_message);
209 Ok(())
210 }
211 }
212 }
213
214 fn format_plain_log(&self, event: &NodeEvent) -> String {
216 let timestamp = DateTime::<Utc>::from(SystemTime::now()).format("%Y-%m-%dT%H:%M:%S%.3fZ");
217
218 match event {
219 NodeEvent::PlainMessageReceived { message } => {
220 format!("[{}] MESSAGE RECEIVED: {}", timestamp, message)
221 }
222 NodeEvent::PlainMessageSent { message, from, to } => {
223 format!(
224 "[{}] MESSAGE SENT: from={}, to={}, message={}",
225 timestamp, from, to, message
226 )
227 }
228 NodeEvent::AgentRegistered { did } => {
229 format!("[{}] AGENT REGISTERED: {}", timestamp, did)
230 }
231 NodeEvent::AgentUnregistered { did } => {
232 format!("[{}] AGENT UNREGISTERED: {}", timestamp, did)
233 }
234 NodeEvent::DidResolved { did, success } => {
235 format!(
236 "[{}] DID RESOLVED: did={}, success={}",
237 timestamp, did, success
238 )
239 }
240 NodeEvent::AgentPlainMessage { did, message } => {
241 format!(
242 "[{}] AGENT MESSAGE: did={}, message_length={}",
243 timestamp,
244 did,
245 message.len()
246 )
247 }
248 NodeEvent::MessageRejected {
249 message_id,
250 reason,
251 from,
252 to,
253 } => {
254 format!(
255 "[{}] MESSAGE REJECTED: id={}, from={}, to={}, reason={}",
256 timestamp, message_id, from, to, reason
257 )
258 }
259 NodeEvent::MessageAccepted {
260 message_id,
261 message_type,
262 from,
263 to,
264 } => {
265 format!(
266 "[{}] MESSAGE ACCEPTED: id={}, type={}, from={}, to={}",
267 timestamp, message_id, message_type, from, to
268 )
269 }
270 NodeEvent::ReplyReceived {
271 original_message_id,
272 ..
273 } => {
274 format!(
275 "[{}] REPLY RECEIVED: original_id={}",
276 timestamp, original_message_id
277 )
278 }
279 NodeEvent::TransactionStateChanged {
280 transaction_id,
281 old_state,
282 new_state,
283 agent_did,
284 } => match agent_did {
285 Some(did) => format!(
286 "[{}] TRANSACTION STATE CHANGED: id={}, {} -> {} (by {})",
287 timestamp, transaction_id, old_state, new_state, did
288 ),
289 None => format!(
290 "[{}] TRANSACTION STATE CHANGED: id={}, {} -> {}",
291 timestamp, transaction_id, old_state, new_state
292 ),
293 },
294 NodeEvent::MessageReceived { message, source } => {
295 format!(
296 "[{}] MESSAGE RECEIVED: source={}, type={}, id={}",
297 timestamp, source, message.type_, message.id
298 )
299 }
300 NodeEvent::MessageSent {
301 message,
302 destination,
303 } => {
304 format!(
305 "[{}] MESSAGE SENT: destination={}, type={}, id={}",
306 timestamp, destination, message.type_, message.id
307 )
308 }
309 NodeEvent::TransactionCreated {
310 transaction,
311 agent_did,
312 } => {
313 format!(
314 "[{}] TRANSACTION CREATED: id={}, agent={}",
315 timestamp, transaction.id, agent_did
316 )
317 }
318 NodeEvent::CustomerUpdated {
319 customer_id,
320 agent_did,
321 update_type,
322 } => {
323 format!(
324 "[{}] CUSTOMER UPDATED: id={}, agent={}, type={}",
325 timestamp, customer_id, agent_did, update_type
326 )
327 }
328 NodeEvent::DecisionRequired {
329 transaction_id,
330 transaction_state,
331 pending_agents,
332 ..
333 } => {
334 format!(
335 "[{}] DECISION REQUIRED: tx={}, state={}, pending_agents={}",
336 timestamp,
337 transaction_id,
338 transaction_state,
339 pending_agents.join(", ")
340 )
341 }
342 }
343 }
344
345 fn format_structured_log(&self, event: &NodeEvent) -> Result<String> {
347 let timestamp = DateTime::<Utc>::from(SystemTime::now()).to_rfc3339();
349
350 let (event_type, event_data) = match event {
352 NodeEvent::PlainMessageReceived { message } => (
353 "message_received",
354 json!({
355 "message": message,
356 }),
357 ),
358 NodeEvent::PlainMessageSent { message, from, to } => (
359 "message_sent",
360 json!({
361 "from": from,
362 "to": to,
363 "message": message,
364 }),
365 ),
366 NodeEvent::AgentRegistered { did } => (
367 "agent_registered",
368 json!({
369 "did": did,
370 }),
371 ),
372 NodeEvent::AgentUnregistered { did } => (
373 "agent_unregistered",
374 json!({
375 "did": did,
376 }),
377 ),
378 NodeEvent::DidResolved { did, success } => (
379 "did_resolved",
380 json!({
381 "did": did,
382 "success": success,
383 }),
384 ),
385 NodeEvent::AgentPlainMessage { did, message } => (
386 "agent_message",
387 json!({
388 "did": did,
389 "message_length": message.len(),
390 }),
391 ),
392 NodeEvent::MessageRejected {
393 message_id,
394 reason,
395 from,
396 to,
397 } => (
398 "message_rejected",
399 json!({
400 "message_id": message_id,
401 "reason": reason,
402 "from": from,
403 "to": to,
404 }),
405 ),
406 NodeEvent::MessageAccepted {
407 message_id,
408 message_type,
409 from,
410 to,
411 } => (
412 "message_accepted",
413 json!({
414 "message_id": message_id,
415 "message_type": message_type,
416 "from": from,
417 "to": to,
418 }),
419 ),
420 NodeEvent::ReplyReceived {
421 original_message_id,
422 reply_message,
423 original_message,
424 } => (
425 "reply_received",
426 json!({
427 "original_message_id": original_message_id,
428 "reply_message": serde_json::to_value(reply_message).unwrap_or(json!(null)),
429 "original_message": serde_json::to_value(original_message).unwrap_or(json!(null)),
430 }),
431 ),
432 NodeEvent::TransactionStateChanged {
433 transaction_id,
434 old_state,
435 new_state,
436 agent_did,
437 } => (
438 "transaction_state_changed",
439 json!({
440 "transaction_id": transaction_id,
441 "old_state": old_state,
442 "new_state": new_state,
443 "agent_did": agent_did,
444 }),
445 ),
446 NodeEvent::MessageReceived { message, source } => (
447 "message_received_new",
448 json!({
449 "message": serde_json::to_value(message).unwrap_or(json!(null)),
450 "source": source,
451 }),
452 ),
453 NodeEvent::MessageSent {
454 message,
455 destination,
456 } => (
457 "message_sent_new",
458 json!({
459 "message": serde_json::to_value(message).unwrap_or(json!(null)),
460 "destination": destination,
461 }),
462 ),
463 NodeEvent::TransactionCreated {
464 transaction,
465 agent_did,
466 } => (
467 "transaction_created",
468 json!({
469 "transaction_id": transaction.id,
470 "agent_did": agent_did,
471 }),
472 ),
473 NodeEvent::CustomerUpdated {
474 customer_id,
475 agent_did,
476 update_type,
477 } => (
478 "customer_updated",
479 json!({
480 "customer_id": customer_id,
481 "agent_did": agent_did,
482 "update_type": update_type,
483 }),
484 ),
485 NodeEvent::DecisionRequired {
486 transaction_id,
487 transaction_state,
488 decision,
489 pending_agents,
490 } => (
491 "decision_required",
492 json!({
493 "transaction_id": transaction_id,
494 "transaction_state": transaction_state,
495 "decision": decision,
496 "pending_agents": pending_agents,
497 }),
498 ),
499 };
500
501 let log_entry = json!({
503 "timestamp": timestamp,
504 "event_type": event_type,
505 "data": event_data,
506 });
507
508 serde_json::to_string(&log_entry).map_err(|e| Error::Serialization(e.to_string()))
510 }
511}
512
513#[async_trait]
514impl EventSubscriber for EventLogger {
515 async fn handle_event(&self, event: NodeEvent) {
516 if let Err(err) = self.log_event(&event) {
517 error!("Failed to log event: {}", err);
518 }
519 }
520}
521
522impl fmt::Debug for EventLogger {
523 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
524 f.debug_struct("EventLogger")
525 .field("config", &self.config)
526 .field("file", &self.file.is_some())
527 .finish()
528 }
529}