// ipfrs_transport/observability.rs

//! Observability module for structured logging and event tracking.
//!
//! This module provides utilities for tracking transport-layer events,
//! structured logging, and integration with observability platforms.
//!
//! # Example
//!
//! ```
//! use ipfrs_transport::observability::{EventLogger, LogLevel, TransportEvent};
//!
//! let mut logger = EventLogger::new();
//! logger.log_event(TransportEvent::BlockRequested {
//!     cid: "QmTest".to_string(),
//!     peer_id: "peer1".to_string(),
//!     priority: "High".to_string(),
//! });
//!
//! let events = logger.get_recent_events(10);
//! assert_eq!(events.len(), 1);
//! ```
21
22use std::collections::VecDeque;
23use std::sync::{Arc, Mutex};
24use std::time::{SystemTime, UNIX_EPOCH};
25
/// Severity attached to every logged event.
///
/// Variants are declared from least to most severe, so the derived ordering
/// gives `Debug < Info < Warn < Error`. `EventLogger::log` relies on that
/// ordering when it compares an event's level against the configured minimum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevel {
    /// Debug level - detailed information for debugging
    Debug,
    /// Info level - general informational messages
    Info,
    /// Warn level - warning messages for potentially problematic situations
    Warn,
    /// Error level - error messages for failures
    Error,
}

impl std::fmt::Display for LogLevel {
    /// Writes the upper-case level name: `DEBUG`, `INFO`, `WARN` or `ERROR`.
    ///
    /// The name is emitted through `Formatter::pad`, so width, fill,
    /// alignment and precision flags are honoured (for example `{:<5}` yields
    /// `"INFO "`), which lets callers column-align levels in log lines. With a
    /// plain `{}` the output is exactly the bare name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            LogLevel::Debug => "DEBUG",
            LogLevel::Info => "INFO",
            LogLevel::Warn => "WARN",
            LogLevel::Error => "ERROR",
        };
        // `pad` (unlike `write!` with a literal) applies the caller's format spec.
        f.pad(name)
    }
}
49
/// Events emitted by the transport layer that can be recorded by a logger.
///
/// Every variant carries plain owned data, so an event can be cloned and kept
/// after the originating operation has finished. The `Display` implementation
/// renders an event as `Name(key=value, ...)`.
#[derive(Debug, Clone)]
pub enum TransportEvent {
    /// A block was requested from a peer.
    BlockRequested {
        cid: String,
        peer_id: String,
        priority: String,
    },
    /// A block arrived from a peer.
    BlockReceived {
        cid: String,
        peer_id: String,
        bytes: usize,
        latency_ms: u64,
    },
    /// A block request did not succeed.
    BlockRequestFailed {
        cid: String,
        peer_id: String,
        error: String,
    },
    /// A peer connection was established.
    PeerConnected {
        peer_id: String,
        transport_type: String,
        address: String,
    },
    /// A peer connection ended.
    PeerDisconnected { peer_id: String, reason: String },
    /// A session began.
    SessionStarted {
        session_id: String,
        block_count: usize,
    },
    /// A session finished.
    SessionCompleted {
        session_id: String,
        duration_ms: u64,
        bytes_transferred: u64,
    },
    /// A circuit breaker opened for a peer.
    CircuitBreakerOpened {
        peer_id: String,
        failure_count: usize,
    },
    /// A network partition was detected.
    PartitionDetected {
        peer_count: usize,
        suspected_peers: Vec<String>,
    },
    /// A previously detected network partition was recovered from.
    PartitionRecovered { duration_ms: u64 },
    /// Free-form event: a type name plus ordered key-value pairs.
    Custom {
        event_type: String,
        data: Vec<(String, String)>,
    },
}

impl std::fmt::Display for TransportEvent {
    /// Renders the event on one line as `Name(key=value, ...)`.
    ///
    /// `Custom` events use their `event_type` as the name and list their pairs
    /// in insertion order; `PartitionDetected` prints the suspected peers with
    /// `Debug` formatting (a bracketed, quoted list).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BlockRequested { cid, peer_id, priority } => write!(
                f,
                "BlockRequested(cid={}, peer={}, priority={})",
                cid, peer_id, priority
            ),
            Self::BlockReceived { cid, peer_id, bytes, latency_ms } => write!(
                f,
                "BlockReceived(cid={}, peer={}, bytes={}, latency={}ms)",
                cid, peer_id, bytes, latency_ms
            ),
            Self::BlockRequestFailed { cid, peer_id, error } => write!(
                f,
                "BlockRequestFailed(cid={}, peer={}, error={})",
                cid, peer_id, error
            ),
            Self::PeerConnected { peer_id, transport_type, address } => write!(
                f,
                "PeerConnected(peer={}, transport={}, addr={})",
                peer_id, transport_type, address
            ),
            Self::PeerDisconnected { peer_id, reason } => {
                write!(f, "PeerDisconnected(peer={}, reason={})", peer_id, reason)
            }
            Self::SessionStarted { session_id, block_count } => write!(
                f,
                "SessionStarted(id={}, blocks={})",
                session_id, block_count
            ),
            Self::SessionCompleted { session_id, duration_ms, bytes_transferred } => write!(
                f,
                "SessionCompleted(id={}, duration={}ms, bytes={})",
                session_id, duration_ms, bytes_transferred
            ),
            Self::CircuitBreakerOpened { peer_id, failure_count } => write!(
                f,
                "CircuitBreakerOpened(peer={}, failures={})",
                peer_id, failure_count
            ),
            Self::PartitionDetected { peer_count, suspected_peers } => write!(
                f,
                "PartitionDetected(peers={}, suspected={:?})",
                peer_count, suspected_peers
            ),
            Self::PartitionRecovered { duration_ms } => {
                write!(f, "PartitionRecovered(duration={}ms)", duration_ms)
            }
            Self::Custom { event_type, data } => {
                write!(f, "{}(", event_type)?;
                // First pair has no separator; every later pair is prefixed with ", ".
                let mut pairs = data.iter();
                if let Some((key, value)) = pairs.next() {
                    write!(f, "{}={}", key, value)?;
                    for (key, value) in pairs {
                        write!(f, ", {}={}", key, value)?;
                    }
                }
                write!(f, ")")
            }
        }
    }
}
218
219/// Log entry with timestamp and level
220#[derive(Debug, Clone)]
221pub struct LogEntry {
222    /// Timestamp in milliseconds since UNIX epoch
223    pub timestamp_ms: u64,
224    /// Log level
225    pub level: LogLevel,
226    /// Event that occurred
227    pub event: TransportEvent,
228}
229
230impl LogEntry {
231    /// Create a new log entry with current timestamp
232    pub fn new(level: LogLevel, event: TransportEvent) -> Self {
233        let timestamp_ms = SystemTime::now()
234            .duration_since(UNIX_EPOCH)
235            .unwrap_or_default()
236            .as_millis() as u64;
237
238        Self {
239            timestamp_ms,
240            level,
241            event,
242        }
243    }
244}
245
246impl std::fmt::Display for LogEntry {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        write!(f, "[{}] {} {}", self.timestamp_ms, self.level, self.event)
249    }
250}
251
252/// Configuration for event logger
253#[derive(Debug, Clone)]
254pub struct LoggerConfig {
255    /// Maximum number of events to keep in memory
256    pub max_events: usize,
257    /// Minimum log level to record
258    pub min_level: LogLevel,
259    /// Whether to print events to stdout
260    pub print_to_stdout: bool,
261}
262
263impl Default for LoggerConfig {
264    fn default() -> Self {
265        Self {
266            max_events: 10000,
267            min_level: LogLevel::Info,
268            print_to_stdout: false,
269        }
270    }
271}
272
273/// Event logger for transport layer events
274pub struct EventLogger {
275    config: LoggerConfig,
276    events: Arc<Mutex<VecDeque<LogEntry>>>,
277}
278
279impl EventLogger {
280    /// Create a new event logger with default configuration
281    pub fn new() -> Self {
282        Self::with_config(LoggerConfig::default())
283    }
284
285    /// Create a new event logger with custom configuration
286    pub fn with_config(config: LoggerConfig) -> Self {
287        Self {
288            config,
289            events: Arc::new(Mutex::new(VecDeque::new())),
290        }
291    }
292
293    /// Log an event with specified level
294    pub fn log(&mut self, level: LogLevel, event: TransportEvent) {
295        if level < self.config.min_level {
296            return;
297        }
298
299        let entry = LogEntry::new(level, event);
300
301        if self.config.print_to_stdout {
302            println!("{}", entry);
303        }
304
305        let mut events = self.events.lock().unwrap();
306        events.push_back(entry);
307
308        // Trim to max size
309        while events.len() > self.config.max_events {
310            events.pop_front();
311        }
312    }
313
314    /// Log an event with Info level
315    pub fn log_event(&mut self, event: TransportEvent) {
316        self.log(LogLevel::Info, event);
317    }
318
319    /// Log a debug event
320    pub fn debug(&mut self, event: TransportEvent) {
321        self.log(LogLevel::Debug, event);
322    }
323
324    /// Log an info event
325    pub fn info(&mut self, event: TransportEvent) {
326        self.log(LogLevel::Info, event);
327    }
328
329    /// Log a warning event
330    pub fn warn(&mut self, event: TransportEvent) {
331        self.log(LogLevel::Warn, event);
332    }
333
334    /// Log an error event
335    pub fn error(&mut self, event: TransportEvent) {
336        self.log(LogLevel::Error, event);
337    }
338
339    /// Get recent events (most recent first)
340    pub fn get_recent_events(&self, count: usize) -> Vec<LogEntry> {
341        let events = self.events.lock().unwrap();
342        events.iter().rev().take(count).cloned().collect()
343    }
344
345    /// Get all events matching a log level
346    pub fn get_events_by_level(&self, level: LogLevel) -> Vec<LogEntry> {
347        let events = self.events.lock().unwrap();
348        events
349            .iter()
350            .filter(|e| e.level == level)
351            .cloned()
352            .collect()
353    }
354
355    /// Get events within a time range (milliseconds since UNIX epoch)
356    pub fn get_events_by_time(&self, start_ms: u64, end_ms: u64) -> Vec<LogEntry> {
357        let events = self.events.lock().unwrap();
358        events
359            .iter()
360            .filter(|e| e.timestamp_ms >= start_ms && e.timestamp_ms <= end_ms)
361            .cloned()
362            .collect()
363    }
364
365    /// Clear all logged events
366    pub fn clear(&mut self) {
367        let mut events = self.events.lock().unwrap();
368        events.clear();
369    }
370
371    /// Get total number of logged events
372    pub fn event_count(&self) -> usize {
373        let events = self.events.lock().unwrap();
374        events.len()
375    }
376
377    /// Get configuration
378    pub fn config(&self) -> &LoggerConfig {
379        &self.config
380    }
381
382    /// Update configuration
383    pub fn update_config(&mut self, config: LoggerConfig) {
384        self.config = config;
385    }
386}
387
388impl Default for EventLogger {
389    fn default() -> Self {
390        Self::new()
391    }
392}
393
394impl Clone for EventLogger {
395    fn clone(&self) -> Self {
396        Self {
397            config: self.config.clone(),
398            events: Arc::clone(&self.events),
399        }
400    }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Custom` event with the given type name and no data pairs.
    fn custom(event_type: &str) -> TransportEvent {
        TransportEvent::Custom {
            event_type: event_type.to_string(),
            data: Vec::new(),
        }
    }

    /// Builds the `BlockRequested` event shared by several tests.
    fn block_requested(cid: &str) -> TransportEvent {
        TransportEvent::BlockRequested {
            cid: cid.to_string(),
            peer_id: "peer1".to_string(),
            priority: "High".to_string(),
        }
    }

    #[test]
    fn test_event_logger_creation() {
        assert_eq!(EventLogger::new().event_count(), 0);
    }

    #[test]
    fn test_log_event() {
        let mut logger = EventLogger::new();
        logger.log_event(block_requested("QmTest"));

        assert_eq!(logger.event_count(), 1);

        let recent = logger.get_recent_events(1);
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].level, LogLevel::Info);
    }

    #[test]
    fn test_log_levels() {
        let mut logger = EventLogger::new();

        logger.debug(custom("test"));
        logger.info(custom("test"));
        logger.warn(custom("test"));
        logger.error(custom("test"));

        // The default minimum level is Info, so only the Debug event is dropped.
        assert_eq!(logger.event_count(), 3);
    }

    #[test]
    fn test_min_level_filtering() {
        let mut logger = EventLogger::with_config(LoggerConfig {
            max_events: 100,
            min_level: LogLevel::Warn,
            print_to_stdout: false,
        });

        logger.debug(custom("debug"));
        logger.info(custom("info"));
        logger.warn(custom("warn"));
        logger.error(custom("error"));

        // Debug and Info fall below the Warn threshold.
        assert_eq!(logger.event_count(), 2);
    }

    #[test]
    fn test_max_events_limit() {
        let mut logger = EventLogger::with_config(LoggerConfig {
            max_events: 5,
            min_level: LogLevel::Debug,
            print_to_stdout: false,
        });

        (0..10).for_each(|i| logger.info(custom(&format!("event{}", i))));

        assert_eq!(logger.event_count(), 5);
        assert_eq!(logger.get_recent_events(10).len(), 5);
    }

    #[test]
    fn test_get_events_by_level() {
        let mut logger = EventLogger::with_config(LoggerConfig {
            min_level: LogLevel::Debug,
            ..Default::default()
        });

        logger.info(custom("info1"));
        logger.error(custom("error1"));
        logger.info(custom("info2"));

        assert_eq!(logger.get_events_by_level(LogLevel::Info).len(), 2);
        assert_eq!(logger.get_events_by_level(LogLevel::Error).len(), 1);
    }

    #[test]
    fn test_clear_events() {
        let mut logger = EventLogger::new();

        logger.info(custom("test"));
        assert_eq!(logger.event_count(), 1);

        logger.clear();
        assert_eq!(logger.event_count(), 0);
    }

    #[test]
    fn test_event_display() {
        let rendered = block_requested("QmTest").to_string();

        for expected in ["BlockRequested", "QmTest", "peer1"] {
            assert!(rendered.contains(expected));
        }
    }

    #[test]
    fn test_log_entry_display() {
        let event = TransportEvent::Custom {
            event_type: "test".to_string(),
            data: vec![("key".to_string(), "value".to_string())],
        };
        let rendered = LogEntry::new(LogLevel::Info, event).to_string();

        assert!(rendered.contains("INFO"));
        assert!(rendered.contains("test"));
    }

    #[test]
    fn test_clone_logger() {
        let mut original = EventLogger::new();
        original.info(custom("test"));

        // The clone shares the original's buffer, so it sees the same entry.
        let copy = original.clone();
        assert_eq!(copy.event_count(), 1);
    }

    #[test]
    fn test_update_config() {
        let mut logger = EventLogger::new();

        logger.update_config(LoggerConfig {
            max_events: 50,
            min_level: LogLevel::Debug,
            print_to_stdout: true,
        });

        let applied = logger.config();
        assert_eq!(applied.max_events, 50);
        assert_eq!(applied.min_level, LogLevel::Debug);
    }

    #[test]
    fn test_all_event_types() {
        let mut logger = EventLogger::new();

        let events = vec![
            block_requested("QmTest1"),
            TransportEvent::BlockReceived {
                cid: "QmTest2".to_string(),
                peer_id: "peer2".to_string(),
                bytes: 1024,
                latency_ms: 50,
            },
            TransportEvent::PeerConnected {
                peer_id: "peer3".to_string(),
                transport_type: "QUIC".to_string(),
                address: "127.0.0.1:4001".to_string(),
            },
            TransportEvent::SessionStarted {
                session_id: "session1".to_string(),
                block_count: 100,
            },
        ];
        for event in events {
            logger.info(event);
        }

        assert_eq!(logger.event_count(), 4);
    }
}