Skip to main content

tap_http/
event.rs

1//! Event system for TAP HTTP server.
2//!
3//! This module provides an event tracking and logging system for monitoring HTTP server activities.
4//! It allows tracking request/response activity, message processing, and server lifecycle events.
5//! Events can be logged to a configurable location (console, file, or custom handler).
6//!
7//! # Features
8//!
9//! - Track and log HTTP server events
10//! - Request and response monitoring
11//! - Configurable logging destination
12//! - JSON structured logging support
13//! - File rotation capabilities
14//! - Integration with the TAP Node event system
15//!
16//! # Example
17//!
18//! ```no_run
19//! use tap_http::{TapHttpConfig, TapHttpServer};
20//! use tap_http::event::{EventLogger, EventLoggerConfig, LogDestination};
21//! use tap_node::{NodeConfig, TapNode};
22//!
23//! #[tokio::main]
24//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
25//!     // Create a TAP Node with default config
26//!     let node = TapNode::new(NodeConfig::default());
27//!     
28//!     // Configure the HTTP server with event logging
29//!     let mut config = TapHttpConfig::default();
30//!     config.event_logger = Some(EventLoggerConfig {
31//!         destination: LogDestination::File {
32//!             path: "./logs/tap-http.log".to_string(),
33//!             max_size: Some(10 * 1024 * 1024), // 10 MB
34//!             rotate: true,
35//!         },
36//!         structured: true, // Use JSON format
37//!         log_level: tracing::Level::INFO,
38//!     });
39//!     
40//!     // Create and start the server
41//!     let mut server = TapHttpServer::new(config, node);
42//!     server.start().await?;
43//!     
44//!     Ok(())
45//! }
46//! ```
47
48use std::fmt;
49use std::fs::{File, OpenOptions};
50use std::io::{self, Write};
51use std::path::Path;
52use std::sync::{Arc, Mutex};
53use std::time::SystemTime;
54
55use async_trait::async_trait;
56use chrono::{DateTime, Utc};
57use serde_json::json;
58use tracing::{debug, error, info, trace, warn, Level};
59use warp::hyper::StatusCode;
60
61/// HTTP server event types
62///
63/// Represents the various events that can occur within the TAP HTTP server,
64/// including request handling, message processing, and server lifecycle events.
65#[derive(Debug, Clone)]
66pub enum HttpEvent {
67    /// Server started event
68    ServerStarted {
69        /// The address the server is bound to
70        address: String,
71    },
72
73    /// Server stopped event
74    ServerStopped,
75
76    /// Request received event
77    RequestReceived {
78        /// The HTTP method
79        method: String,
80        /// The request path
81        path: String,
82        /// The client IP address
83        client_ip: Option<String>,
84        /// The timestamp when the request was received
85        timestamp: DateTime<Utc>,
86    },
87
88    /// Response sent event
89    ResponseSent {
90        /// The HTTP status code
91        status: StatusCode,
92        /// The response size in bytes
93        size: usize,
94        /// The time it took to process the request in milliseconds
95        duration_ms: u64,
96    },
97
98    /// DIDComm message received event
99    MessageReceived {
100        /// The message ID
101        id: String,
102        /// The message type
103        type_: String,
104        /// The message sender's DID (if available)
105        from: Option<String>,
106        /// The message recipient's DID (if available)
107        to: Option<String>,
108    },
109
110    /// DIDComm message processing error event
111    MessageError {
112        /// The error type
113        error_type: String,
114        /// The error message
115        message: String,
116        /// The message ID (if available)
117        message_id: Option<String>,
118    },
119}
120
121/// Configuration for where event logs should be sent
122#[derive(Clone)]
123pub enum LogDestination {
124    /// Log to the console via the standard logging framework
125    Console,
126
127    /// Log to a file with optional rotation
128    File {
129        /// Path to the log file
130        path: String,
131
132        /// Maximum file size before rotation (in bytes)
133        max_size: Option<usize>,
134
135        /// Whether to rotate log files when they reach max_size
136        rotate: bool,
137    },
138
139    /// Custom logging function
140    Custom(Arc<dyn Fn(&str) + Send + Sync>),
141}
142
143// Custom Debug implementation that doesn't try to print the function pointer
144impl fmt::Debug for LogDestination {
145    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146        match self {
147            LogDestination::Console => write!(f, "LogDestination::Console"),
148            LogDestination::File {
149                path,
150                max_size,
151                rotate,
152            } => f
153                .debug_struct("LogDestination::File")
154                .field("path", path)
155                .field("max_size", max_size)
156                .field("rotate", rotate)
157                .finish(),
158            LogDestination::Custom(_) => write!(f, "LogDestination::Custom(<function>)"),
159        }
160    }
161}
162
163use serde::{Deserialize, Deserializer, Serialize, Serializer};
164
165/// Configuration for the event logger
166#[derive(Debug, Clone)]
167pub struct EventLoggerConfig {
168    /// Where to send the log output
169    pub destination: LogDestination,
170
171    /// Whether to use structured (JSON) logging
172    pub structured: bool,
173
174    /// The log level to use
175    pub log_level: Level,
176}
177
178// Custom serialization/deserialization for EventLoggerConfig
179impl Serialize for EventLoggerConfig {
180    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
181    where
182        S: Serializer,
183    {
184        use serde::ser::SerializeStruct;
185        let mut state = serializer.serialize_struct("EventLoggerConfig", 3)?;
186
187        // For destination, serialize a type and path if it's a file
188        match &self.destination {
189            LogDestination::Console => {
190                state.serialize_field("destination_type", "console")?;
191                state.serialize_field("destination_path", "")?;
192            }
193            LogDestination::File { path, .. } => {
194                state.serialize_field("destination_type", "file")?;
195                state.serialize_field("destination_path", path)?;
196            }
197            LogDestination::Custom(_) => {
198                state.serialize_field("destination_type", "custom")?;
199                state.serialize_field("destination_path", "")?;
200            }
201        }
202
203        state.serialize_field("structured", &self.structured)?;
204        state.serialize_field("log_level", &format!("{:?}", self.log_level))?;
205        state.end()
206    }
207}
208
209impl<'de> Deserialize<'de> for EventLoggerConfig {
210    fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
211    where
212        D: Deserializer<'de>,
213    {
214        // For deserialization, we'll create a default config
215        // The actual destination will be set programmatically
216        Ok(EventLoggerConfig::default())
217    }
218}
219
220impl EventLoggerConfig {
221    // Remove unused method as we now use the log_level field directly
222}
223
224impl Default for EventLoggerConfig {
225    fn default() -> Self {
226        Self {
227            destination: LogDestination::File {
228                path: "./logs/tap-http.log".to_string(),
229                max_size: Some(10 * 1024 * 1024), // 10 MB
230                rotate: true,
231            },
232            structured: true,
233            log_level: Level::INFO,
234        }
235    }
236}
237
238/// Event subscriber trait for TAP HTTP events
239///
240/// Note: This trait is not object-safe because of the async fn.
241/// For dynamic dispatch, we use a type-erased wrapper.
242pub trait EventSubscriber: Send + Sync {
243    /// Handle an HTTP event
244    fn handle_event(&self, event: HttpEvent) -> futures::future::BoxFuture<'_, ()>;
245}
246
247/// Implementation for async handlers
248impl<T> EventSubscriber for T
249where
250    T: Send + Sync + 'static,
251    T: for<'a> HandleEvent<'a>,
252{
253    fn handle_event(&self, event: HttpEvent) -> futures::future::BoxFuture<'_, ()> {
254        Box::pin(self.handle_event_async(event))
255    }
256}
257
258/// Helper trait for async handling
259#[async_trait]
260pub trait HandleEvent<'a>: Send + Sync {
261    async fn handle_event_async(&self, event: HttpEvent);
262}
263
264/// Event bus for TAP HTTP server
265pub struct EventBus {
266    /// Subscribers
267    subscribers: Mutex<Vec<Arc<Box<dyn EventSubscriber>>>>,
268}
269
270impl EventBus {
271    /// Create a new event bus
272    pub fn new() -> Self {
273        Self {
274            subscribers: Mutex::new(Vec::new()),
275        }
276    }
277
278    /// Subscribe to HTTP events with a boxed subscriber
279    pub fn subscribe<S>(&self, subscriber: S)
280    where
281        S: EventSubscriber + 'static,
282    {
283        let boxed = Arc::new(Box::new(subscriber) as Box<dyn EventSubscriber>);
284        let mut subscribers = self.subscribers.lock().unwrap();
285        subscribers.push(boxed);
286    }
287
288    /// Remove a subscriber from the event bus
289    pub fn unsubscribe(&self, subscriber: &Arc<Box<dyn EventSubscriber>>) {
290        let mut subscribers = self.subscribers.lock().unwrap();
291        subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
292    }
293
294    /// Publish a server started event
295    pub async fn publish_server_started(&self, address: String) {
296        let event = HttpEvent::ServerStarted { address };
297        self.publish_event(event).await;
298    }
299
300    /// Publish a server stopped event
301    pub async fn publish_server_stopped(&self) {
302        let event = HttpEvent::ServerStopped;
303        self.publish_event(event).await;
304    }
305
306    /// Publish a request received event
307    pub async fn publish_request_received(
308        &self,
309        method: String,
310        path: String,
311        client_ip: Option<String>,
312    ) {
313        let event = HttpEvent::RequestReceived {
314            method,
315            path,
316            client_ip,
317            timestamp: Utc::now(),
318        };
319        self.publish_event(event).await;
320    }
321
322    /// Publish a response sent event
323    pub async fn publish_response_sent(&self, status: StatusCode, size: usize, duration_ms: u64) {
324        let event = HttpEvent::ResponseSent {
325            status,
326            size,
327            duration_ms,
328        };
329        self.publish_event(event).await;
330    }
331
332    /// Publish a DIDComm message received event
333    pub async fn publish_message_received(
334        &self,
335        id: String,
336        type_: String,
337        from: Option<String>,
338        to: Option<String>,
339    ) {
340        let event = HttpEvent::MessageReceived {
341            id,
342            type_,
343            from,
344            to,
345        };
346        self.publish_event(event).await;
347    }
348
349    /// Publish a DIDComm message processing error event
350    pub async fn publish_message_error(
351        &self,
352        error_type: String,
353        message: String,
354        message_id: Option<String>,
355    ) {
356        let event = HttpEvent::MessageError {
357            error_type,
358            message,
359            message_id,
360        };
361        self.publish_event(event).await;
362    }
363
364    /// Publish an event to all subscribers
365    async fn publish_event(&self, event: HttpEvent) {
366        // Notify subscribers
367        let subscribers = self.subscribers.lock().unwrap().clone();
368        for subscriber in subscribers.iter() {
369            let fut = subscriber.handle_event(event.clone());
370            fut.await;
371        }
372    }
373
374    /// Get the number of subscribers (for testing)
375    pub fn subscriber_count(&self) -> usize {
376        self.subscribers.lock().unwrap().len()
377    }
378}
379
380impl Default for EventBus {
381    fn default() -> Self {
382        Self::new()
383    }
384}
385
386/// Event logger for TAP HTTP server
387///
388/// This component subscribes to the server's event bus and logs all events
389/// to the configured destination. It supports both plain text and structured
390/// (JSON) logging, and can output to the console or files.
391pub struct EventLogger {
392    /// Configuration for the logger
393    config: EventLoggerConfig,
394
395    /// File handle if using file destination
396    file: Option<Arc<Mutex<File>>>,
397}
398
399impl EventLogger {
400    /// Create a new event logger with the given configuration
401    pub fn new(config: EventLoggerConfig) -> Self {
402        let file = match &config.destination {
403            LogDestination::File { path, .. } => match Self::open_log_file(path) {
404                Ok(file) => Some(Arc::new(Mutex::new(file))),
405                Err(err) => {
406                    error!("Failed to open log file {}: {}", path, err);
407                    None
408                }
409            },
410            _ => None,
411        };
412
413        Self { config, file }
414    }
415
416    /// Open or create a log file
417    fn open_log_file(path: &str) -> io::Result<File> {
418        // Ensure directory exists
419        if let Some(parent) = Path::new(path).parent() {
420            std::fs::create_dir_all(parent)?;
421        }
422
423        // Open or create the file
424        OpenOptions::new().create(true).append(true).open(path)
425    }
426
427    /// Log an event to the configured destination
428    fn log_event(&self, event: &HttpEvent) -> crate::error::Result<()> {
429        let log_message = if self.config.structured {
430            self.format_structured_log(event)?
431        } else {
432            self.format_plain_log(event)
433        };
434
435        match &self.config.destination {
436            LogDestination::Console => {
437                // Use the standard logging framework
438                match self.config.log_level {
439                    Level::ERROR => error!("{}", log_message),
440                    Level::WARN => warn!("{}", log_message),
441                    Level::INFO => info!("{}", log_message),
442                    Level::DEBUG => debug!("{}", log_message),
443                    Level::TRACE => trace!("{}", log_message),
444                }
445                Ok(())
446            }
447            LogDestination::File { .. } => {
448                if let Some(file) = &self.file {
449                    let mut file_guard = file.lock().map_err(|_| {
450                        crate::error::Error::Config("Failed to acquire log file lock".to_string())
451                    })?;
452
453                    // Write to the file with newline
454                    writeln!(file_guard, "{}", log_message).map_err(|err| {
455                        crate::error::Error::Config(format!("Failed to write to log file: {}", err))
456                    })?;
457
458                    // Ensure the log is flushed
459                    file_guard.flush().map_err(|err| {
460                        crate::error::Error::Config(format!("Failed to flush log file: {}", err))
461                    })?;
462
463                    Ok(())
464                } else {
465                    // Fall back to console logging if file isn't available
466                    error!("{}", log_message);
467                    Ok(())
468                }
469            }
470            LogDestination::Custom(func) => {
471                // Call the custom logging function
472                func(&log_message);
473                Ok(())
474            }
475        }
476    }
477
478    /// Format an event as a plain text log message
479    fn format_plain_log(&self, event: &HttpEvent) -> String {
480        let timestamp = DateTime::<Utc>::from(SystemTime::now()).format("%Y-%m-%dT%H:%M:%S%.3fZ");
481
482        match event {
483            HttpEvent::ServerStarted { address } => {
484                format!("[{}] SERVER STARTED: address={}", timestamp, address)
485            }
486            HttpEvent::ServerStopped => {
487                format!("[{}] SERVER STOPPED", timestamp)
488            }
489            HttpEvent::RequestReceived {
490                method,
491                path,
492                client_ip,
493                timestamp,
494            } => {
495                format!(
496                    "[{}] REQUEST RECEIVED: method={}, path={}, client_ip={}, timestamp={}",
497                    timestamp,
498                    method,
499                    path,
500                    client_ip.as_deref().unwrap_or("unknown"),
501                    timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ")
502                )
503            }
504            HttpEvent::ResponseSent {
505                status,
506                size,
507                duration_ms,
508            } => {
509                format!(
510                    "[{}] RESPONSE SENT: status={}, size={}, duration_ms={}",
511                    timestamp,
512                    status.as_u16(),
513                    size,
514                    duration_ms
515                )
516            }
517            HttpEvent::MessageReceived {
518                id,
519                type_,
520                from,
521                to,
522            } => {
523                format!(
524                    "[{}] MESSAGE RECEIVED: id={}, type={}, from={}, to={}",
525                    timestamp,
526                    id,
527                    type_,
528                    from.as_deref().unwrap_or("unknown"),
529                    to.as_deref().unwrap_or("unknown")
530                )
531            }
532            HttpEvent::MessageError {
533                error_type,
534                message,
535                message_id,
536            } => {
537                format!(
538                    "[{}] MESSAGE ERROR: type={}, message={}, message_id={}",
539                    timestamp,
540                    error_type,
541                    message,
542                    message_id.as_deref().unwrap_or("unknown")
543                )
544            }
545        }
546    }
547
548    /// Format an event as a structured (JSON) log message
549    fn format_structured_log(&self, event: &HttpEvent) -> crate::error::Result<String> {
550        // Create common fields for all event types
551        let timestamp = DateTime::<Utc>::from(SystemTime::now()).to_rfc3339();
552
553        // Create event-specific fields
554        let (event_type, event_data) = match event {
555            HttpEvent::ServerStarted { address } => (
556                "server_started",
557                json!({
558                    "address": address,
559                }),
560            ),
561            HttpEvent::ServerStopped => ("server_stopped", json!({})),
562            HttpEvent::RequestReceived {
563                method,
564                path,
565                client_ip,
566                timestamp,
567            } => (
568                "request_received",
569                json!({
570                    "method": method,
571                    "path": path,
572                    "client_ip": client_ip,
573                    "request_timestamp": timestamp.to_rfc3339(),
574                }),
575            ),
576            HttpEvent::ResponseSent {
577                status,
578                size,
579                duration_ms,
580            } => (
581                "response_sent",
582                json!({
583                    "status": status.as_u16(),
584                    "size": size,
585                    "duration_ms": duration_ms,
586                }),
587            ),
588            HttpEvent::MessageReceived {
589                id,
590                type_,
591                from,
592                to,
593            } => (
594                "message_received",
595                json!({
596                    "id": id,
597                    "type": type_,
598                    "from": from,
599                    "to": to,
600                }),
601            ),
602            HttpEvent::MessageError {
603                error_type,
604                message,
605                message_id,
606            } => (
607                "message_error",
608                json!({
609                    "error_type": error_type,
610                    "message": message,
611                    "message_id": message_id,
612                }),
613            ),
614        };
615
616        // Combine into a single JSON object
617        let log_entry = json!({
618            "timestamp": timestamp,
619            "event_type": event_type,
620            "data": event_data,
621        });
622
623        // Serialize to a string
624        serde_json::to_string(&log_entry).map_err(|err| crate::error::Error::Json(err.to_string()))
625    }
626}
627
628#[async_trait]
629impl HandleEvent<'_> for EventLogger {
630    async fn handle_event_async(&self, event: HttpEvent) {
631        if let Err(err) = self.log_event(&event) {
632            error!("Failed to log event: {}", err);
633        }
634    }
635}
636
637impl fmt::Debug for EventLogger {
638    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
639        f.debug_struct("EventLogger")
640            .field("config", &self.config)
641            .field("file", &self.file.is_some())
642            .finish()
643    }
644}
645
646#[cfg(test)]
647mod tests {
648    use super::*;
649
650    #[tokio::test]
651    async fn test_event_bus_publish() {
652        // Create a custom event subscriber for testing
653        struct TestSubscriber {
654            events: Arc<Mutex<Vec<HttpEvent>>>,
655        }
656
657        #[async_trait]
658        impl HandleEvent<'_> for TestSubscriber {
659            async fn handle_event_async(&self, event: HttpEvent) {
660                self.events.lock().unwrap().push(event);
661            }
662        }
663
664        // Create event bus and subscriber
665        let event_bus = EventBus::new();
666        let events = Arc::new(Mutex::new(Vec::new()));
667        let subscriber = TestSubscriber {
668            events: events.clone(),
669        };
670        event_bus.subscribe(subscriber);
671
672        // Publish some events
673        event_bus
674            .publish_server_started("127.0.0.1:8000".to_string())
675            .await;
676        event_bus
677            .publish_request_received(
678                "GET".to_string(),
679                "/didcomm".to_string(),
680                Some("192.168.1.1".to_string()),
681            )
682            .await;
683
684        // Check that the events were received
685        let received_events = events.lock().unwrap();
686        assert_eq!(received_events.len(), 2);
687
688        match &received_events[0] {
689            HttpEvent::ServerStarted { address } => {
690                assert_eq!(address, "127.0.0.1:8000");
691            }
692            _ => panic!("Expected ServerStarted event"),
693        }
694
695        match &received_events[1] {
696            HttpEvent::RequestReceived {
697                method,
698                path,
699                client_ip,
700                ..
701            } => {
702                assert_eq!(method, "GET");
703                assert_eq!(path, "/didcomm");
704                assert_eq!(client_ip, &Some("192.168.1.1".to_string()));
705            }
706            _ => panic!("Expected RequestReceived event"),
707        }
708    }
709}