Skip to main content

tap_node/event/
logger.rs

1//! # Event Logger for TAP Node
2//!
3//! This module provides an event logging system that captures all node events
4//! and logs them to a configurable location. It implements the `EventSubscriber`
5//! trait to receive events via callbacks from the event bus.
6//!
7//! The event logger supports different output formats and destinations, including:
8//! - Console logging via the standard logging framework
//! - File-based logging (the `max_size` / `rotate` settings are accepted, but the code in this module does not yet act on them)
10//! - Structured JSON logging for machine readability
11//!
12//! ## Usage
13//!
14//! ```no_run
15//! use std::sync::Arc;
16//! use tap_node::{NodeConfig, TapNode};
17//! use tap_node::event::logger::{EventLogger, EventLoggerConfig, LogDestination};
18//!
19//! async fn example() {
20//!     // Create a new TAP node
21//!     let node = TapNode::new(NodeConfig::default());
22//!
23//!     // Configure the event logger
24//!     let logger_config = EventLoggerConfig {
25//!         destination: LogDestination::File {
26//!             path: "/var/log/tap-node/events.log".to_string(),
27//!             max_size: Some(10 * 1024 * 1024), // 10 MB
28//!             rotate: true,
29//!         },
30//!         structured: true, // Use JSON format
31//!         log_level: log::Level::Info,
32//!     };
33//!
34//!     // Create and subscribe the event logger
35//!     let event_logger = Arc::new(EventLogger::new(logger_config));
36//!     node.event_bus().subscribe(event_logger).await;
37//!
38//!     // The event logger will now receive and log all events
39//! }
40//! ```
41
42use std::fmt;
43use std::fs::{File, OpenOptions};
44use std::io::{self, Write};
45use std::path::Path;
46use std::sync::{Arc, Mutex};
47use std::time::SystemTime;
48
49use async_trait::async_trait;
50use chrono::{DateTime, Utc};
51use serde_json::json;
52use tracing::{debug, error, info, trace, warn};
53
54use crate::error::{Error, Result};
55use crate::event::{EventSubscriber, NodeEvent};
56
/// Destination that formatted event log lines are delivered to.
#[derive(Clone)]
pub enum LogDestination {
    /// Emit each line through the logging macros.
    Console,

    /// Append each line to a file on disk.
    File {
        /// Location of the log file.
        path: String,

        /// Size threshold, in bytes, at which the file is meant to be rotated.
        ///
        /// NOTE(review): the logger code visible in this file never reads this
        /// value — confirm whether rotation is implemented elsewhere.
        max_size: Option<usize>,

        /// Whether the file should be rotated once it reaches `max_size`.
        ///
        /// NOTE(review): the logger code visible in this file never reads this
        /// flag — confirm whether rotation is implemented elsewhere.
        rotate: bool,
    },

    /// Hand each line to a caller-supplied callback.
    Custom(Arc<dyn Fn(&str) + Send + Sync>),
}

// `Debug` is written by hand because the `Custom` variant wraps a closure,
// which has no `Debug` representation of its own.
impl fmt::Debug for LogDestination {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Console => f.write_str("LogDestination::Console"),
            Self::Custom(_) => f.write_str("LogDestination::Custom(<function>)"),
            Self::File {
                path,
                max_size,
                rotate,
            } => {
                let mut out = f.debug_struct("LogDestination::File");
                out.field("path", path);
                out.field("max_size", max_size);
                out.field("rotate", rotate);
                out.finish()
            }
        }
    }
}
98
/// Settings that control how an [`EventLogger`] formats and delivers events.
#[derive(Debug, Clone)]
pub struct EventLoggerConfig {
    /// Where formatted log lines are sent.
    pub destination: LogDestination,

    /// When `true`, each event is rendered as a single JSON object;
    /// otherwise it is rendered as a plain-text line.
    pub structured: bool,

    /// Level at which events are emitted when they are written to the console.
    pub log_level: log::Level,
}
111
112impl Default for EventLoggerConfig {
113    fn default() -> Self {
114        Self {
115            destination: LogDestination::Console,
116            structured: false,
117            log_level: log::Level::Info,
118        }
119    }
120}
121
/// Event logger for TAP Node
///
/// Receives node events (through its `EventSubscriber` implementation),
/// formats each one as either a plain-text line or a JSON object, and sends
/// the result to the configured destination: the console, a file, or a
/// caller-supplied callback.
pub struct EventLogger {
    /// Formatting and destination settings supplied at construction time.
    config: EventLoggerConfig,

    /// Open handle to the log file.
    ///
    /// `Some` only when the destination is `LogDestination::File` and the
    /// file was opened successfully in `new`; `None` for every other
    /// destination or when opening failed.
    file: Option<Arc<Mutex<File>>>,
}
134
135impl EventLogger {
136    /// Create a new event logger with the given configuration
137    pub fn new(config: EventLoggerConfig) -> Self {
138        let file = match &config.destination {
139            LogDestination::File { path, .. } => match Self::open_log_file(path) {
140                Ok(file) => Some(Arc::new(Mutex::new(file))),
141                Err(err) => {
142                    error!("Failed to open log file {}: {}", path, err);
143                    None
144                }
145            },
146            _ => None,
147        };
148
149        Self { config, file }
150    }
151
152    /// Open or create a log file
153    fn open_log_file(path: &str) -> io::Result<File> {
154        // Ensure directory exists
155        if let Some(parent) = Path::new(path).parent() {
156            std::fs::create_dir_all(parent)?;
157        }
158
159        // Open or create the file
160        OpenOptions::new().create(true).append(true).open(path)
161    }
162
163    /// Log an event to the configured destination
164    fn log_event(&self, event: &NodeEvent) -> Result<()> {
165        let log_message = if self.config.structured {
166            self.format_structured_log(event)?
167        } else {
168            self.format_plain_log(event)
169        };
170
171        match &self.config.destination {
172            LogDestination::Console => {
173                // Use the standard logging framework
174                match self.config.log_level {
175                    log::Level::Error => error!("{}", log_message),
176                    log::Level::Warn => warn!("{}", log_message),
177                    log::Level::Info => info!("{}", log_message),
178                    log::Level::Debug => debug!("{}", log_message),
179                    log::Level::Trace => trace!("{}", log_message),
180                }
181                Ok(())
182            }
183            LogDestination::File { .. } => {
184                if let Some(file) = &self.file {
185                    let mut file_guard = file.lock().map_err(|_| {
186                        Error::Configuration("Failed to acquire log file lock".to_string())
187                    })?;
188
189                    // Write to the file with newline
190                    writeln!(file_guard, "{}", log_message).map_err(|err| {
191                        Error::Configuration(format!("Failed to write to log file: {}", err))
192                    })?;
193
194                    // Ensure the log is flushed
195                    file_guard.flush().map_err(|err| {
196                        Error::Configuration(format!("Failed to flush log file: {}", err))
197                    })?;
198
199                    Ok(())
200                } else {
201                    // Fall back to console logging if file isn't available
202                    error!("{}", log_message);
203                    Ok(())
204                }
205            }
206            LogDestination::Custom(func) => {
207                // Call the custom logging function
208                func(&log_message);
209                Ok(())
210            }
211        }
212    }
213
214    /// Format an event as a plain text log message
215    fn format_plain_log(&self, event: &NodeEvent) -> String {
216        let timestamp = DateTime::<Utc>::from(SystemTime::now()).format("%Y-%m-%dT%H:%M:%S%.3fZ");
217
218        match event {
219            NodeEvent::PlainMessageReceived { message } => {
220                format!("[{}] MESSAGE RECEIVED: {}", timestamp, message)
221            }
222            NodeEvent::PlainMessageSent { message, from, to } => {
223                format!(
224                    "[{}] MESSAGE SENT: from={}, to={}, message={}",
225                    timestamp, from, to, message
226                )
227            }
228            NodeEvent::AgentRegistered { did } => {
229                format!("[{}] AGENT REGISTERED: {}", timestamp, did)
230            }
231            NodeEvent::AgentUnregistered { did } => {
232                format!("[{}] AGENT UNREGISTERED: {}", timestamp, did)
233            }
234            NodeEvent::DidResolved { did, success } => {
235                format!(
236                    "[{}] DID RESOLVED: did={}, success={}",
237                    timestamp, did, success
238                )
239            }
240            NodeEvent::AgentPlainMessage { did, message } => {
241                format!(
242                    "[{}] AGENT MESSAGE: did={}, message_length={}",
243                    timestamp,
244                    did,
245                    message.len()
246                )
247            }
248            NodeEvent::MessageRejected {
249                message_id,
250                reason,
251                from,
252                to,
253            } => {
254                format!(
255                    "[{}] MESSAGE REJECTED: id={}, from={}, to={}, reason={}",
256                    timestamp, message_id, from, to, reason
257                )
258            }
259            NodeEvent::MessageAccepted {
260                message_id,
261                message_type,
262                from,
263                to,
264            } => {
265                format!(
266                    "[{}] MESSAGE ACCEPTED: id={}, type={}, from={}, to={}",
267                    timestamp, message_id, message_type, from, to
268                )
269            }
270            NodeEvent::ReplyReceived {
271                original_message_id,
272                ..
273            } => {
274                format!(
275                    "[{}] REPLY RECEIVED: original_id={}",
276                    timestamp, original_message_id
277                )
278            }
279            NodeEvent::TransactionStateChanged {
280                transaction_id,
281                old_state,
282                new_state,
283                agent_did,
284            } => match agent_did {
285                Some(did) => format!(
286                    "[{}] TRANSACTION STATE CHANGED: id={}, {} -> {} (by {})",
287                    timestamp, transaction_id, old_state, new_state, did
288                ),
289                None => format!(
290                    "[{}] TRANSACTION STATE CHANGED: id={}, {} -> {}",
291                    timestamp, transaction_id, old_state, new_state
292                ),
293            },
294            NodeEvent::MessageReceived { message, source } => {
295                format!(
296                    "[{}] MESSAGE RECEIVED: source={}, type={}, id={}",
297                    timestamp, source, message.type_, message.id
298                )
299            }
300            NodeEvent::MessageSent {
301                message,
302                destination,
303            } => {
304                format!(
305                    "[{}] MESSAGE SENT: destination={}, type={}, id={}",
306                    timestamp, destination, message.type_, message.id
307                )
308            }
309            NodeEvent::TransactionCreated {
310                transaction,
311                agent_did,
312            } => {
313                format!(
314                    "[{}] TRANSACTION CREATED: id={}, agent={}",
315                    timestamp, transaction.id, agent_did
316                )
317            }
318            NodeEvent::CustomerUpdated {
319                customer_id,
320                agent_did,
321                update_type,
322            } => {
323                format!(
324                    "[{}] CUSTOMER UPDATED: id={}, agent={}, type={}",
325                    timestamp, customer_id, agent_did, update_type
326                )
327            }
328            NodeEvent::DecisionRequired {
329                transaction_id,
330                transaction_state,
331                pending_agents,
332                ..
333            } => {
334                format!(
335                    "[{}] DECISION REQUIRED: tx={}, state={}, pending_agents={}",
336                    timestamp,
337                    transaction_id,
338                    transaction_state,
339                    pending_agents.join(", ")
340                )
341            }
342        }
343    }
344
345    /// Format an event as a structured (JSON) log message
346    fn format_structured_log(&self, event: &NodeEvent) -> Result<String> {
347        // Create common fields for all event types
348        let timestamp = DateTime::<Utc>::from(SystemTime::now()).to_rfc3339();
349
350        // Create event-specific fields
351        let (event_type, event_data) = match event {
352            NodeEvent::PlainMessageReceived { message } => (
353                "message_received",
354                json!({
355                    "message": message,
356                }),
357            ),
358            NodeEvent::PlainMessageSent { message, from, to } => (
359                "message_sent",
360                json!({
361                    "from": from,
362                    "to": to,
363                    "message": message,
364                }),
365            ),
366            NodeEvent::AgentRegistered { did } => (
367                "agent_registered",
368                json!({
369                    "did": did,
370                }),
371            ),
372            NodeEvent::AgentUnregistered { did } => (
373                "agent_unregistered",
374                json!({
375                    "did": did,
376                }),
377            ),
378            NodeEvent::DidResolved { did, success } => (
379                "did_resolved",
380                json!({
381                    "did": did,
382                    "success": success,
383                }),
384            ),
385            NodeEvent::AgentPlainMessage { did, message } => (
386                "agent_message",
387                json!({
388                    "did": did,
389                    "message_length": message.len(),
390                }),
391            ),
392            NodeEvent::MessageRejected {
393                message_id,
394                reason,
395                from,
396                to,
397            } => (
398                "message_rejected",
399                json!({
400                    "message_id": message_id,
401                    "reason": reason,
402                    "from": from,
403                    "to": to,
404                }),
405            ),
406            NodeEvent::MessageAccepted {
407                message_id,
408                message_type,
409                from,
410                to,
411            } => (
412                "message_accepted",
413                json!({
414                    "message_id": message_id,
415                    "message_type": message_type,
416                    "from": from,
417                    "to": to,
418                }),
419            ),
420            NodeEvent::ReplyReceived {
421                original_message_id,
422                reply_message,
423                original_message,
424            } => (
425                "reply_received",
426                json!({
427                    "original_message_id": original_message_id,
428                    "reply_message": serde_json::to_value(reply_message).unwrap_or(json!(null)),
429                    "original_message": serde_json::to_value(original_message).unwrap_or(json!(null)),
430                }),
431            ),
432            NodeEvent::TransactionStateChanged {
433                transaction_id,
434                old_state,
435                new_state,
436                agent_did,
437            } => (
438                "transaction_state_changed",
439                json!({
440                    "transaction_id": transaction_id,
441                    "old_state": old_state,
442                    "new_state": new_state,
443                    "agent_did": agent_did,
444                }),
445            ),
446            NodeEvent::MessageReceived { message, source } => (
447                "message_received_new",
448                json!({
449                    "message": serde_json::to_value(message).unwrap_or(json!(null)),
450                    "source": source,
451                }),
452            ),
453            NodeEvent::MessageSent {
454                message,
455                destination,
456            } => (
457                "message_sent_new",
458                json!({
459                    "message": serde_json::to_value(message).unwrap_or(json!(null)),
460                    "destination": destination,
461                }),
462            ),
463            NodeEvent::TransactionCreated {
464                transaction,
465                agent_did,
466            } => (
467                "transaction_created",
468                json!({
469                    "transaction_id": transaction.id,
470                    "agent_did": agent_did,
471                }),
472            ),
473            NodeEvent::CustomerUpdated {
474                customer_id,
475                agent_did,
476                update_type,
477            } => (
478                "customer_updated",
479                json!({
480                    "customer_id": customer_id,
481                    "agent_did": agent_did,
482                    "update_type": update_type,
483                }),
484            ),
485            NodeEvent::DecisionRequired {
486                transaction_id,
487                transaction_state,
488                decision,
489                pending_agents,
490            } => (
491                "decision_required",
492                json!({
493                    "transaction_id": transaction_id,
494                    "transaction_state": transaction_state,
495                    "decision": decision,
496                    "pending_agents": pending_agents,
497                }),
498            ),
499        };
500
501        // Combine into a single JSON object
502        let log_entry = json!({
503            "timestamp": timestamp,
504            "event_type": event_type,
505            "data": event_data,
506        });
507
508        // Serialize to a string
509        serde_json::to_string(&log_entry).map_err(|e| Error::Serialization(e.to_string()))
510    }
511}
512
513#[async_trait]
514impl EventSubscriber for EventLogger {
515    async fn handle_event(&self, event: NodeEvent) {
516        if let Err(err) = self.log_event(&event) {
517            error!("Failed to log event: {}", err);
518        }
519    }
520}
521
522impl fmt::Debug for EventLogger {
523    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
524        f.debug_struct("EventLogger")
525            .field("config", &self.config)
526            .field("file", &self.file.is_some())
527            .finish()
528    }
529}