Skip to main content

hdds_logger/
collector.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! Log collector - subscribes to DDS log topics and aggregates logs.
5
6use crate::{
7    filter::LogFilter,
8    formatter::{create_formatter, LogFormatter},
9    output::{create_output, LogOutput},
10    LogConfig, LogLevel,
11};
12use chrono::{DateTime, TimeZone, Utc};
13use hdds::{Participant, TransportMode};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::io;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
/// Source of a log entry.
///
/// Identifies which ingestion path produced an entry. The type derives
/// `Serialize`/`Deserialize`, and `DdsTopic` is the `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum LogSource {
    /// From DDS log topic (e.g., rt/rosout).
    #[default]
    DdsTopic,
    /// From internal telemetry.
    Telemetry,
    /// From local application.
    Local,
}
32
/// A collected log entry.
///
/// This is the normalized record handed to the filter and the formatter.
/// Only `timestamp`, `level`, `message` and `participant_id` are always
/// present; the remaining fields are `None` when the source did not supply
/// them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Timestamp when log was generated.
    pub timestamp: DateTime<Utc>,
    /// Log severity level.
    pub level: LogLevel,
    /// Log message.
    pub message: String,
    /// Participant GUID (hex string).
    ///
    /// Empty for entries built through `Default`/`new` until
    /// `with_participant` is called.
    pub participant_id: String,
    /// Topic name (if from DDS topic).
    pub topic: Option<String>,
    /// Node name (for ROS 2 logs).
    pub node_name: Option<String>,
    /// Source file name.
    pub file: Option<String>,
    /// Source line number.
    pub line: Option<u32>,
    /// Source function name.
    pub function: Option<String>,
}
55
56impl Default for LogEntry {
57    fn default() -> Self {
58        Self {
59            timestamp: Utc::now(),
60            level: LogLevel::Info,
61            message: String::new(),
62            participant_id: String::new(),
63            topic: None,
64            node_name: None,
65            file: None,
66            line: None,
67            function: None,
68        }
69    }
70}
71
72impl LogEntry {
73    /// Create a new log entry with message.
74    pub fn new(level: LogLevel, message: impl Into<String>) -> Self {
75        Self {
76            timestamp: Utc::now(),
77            level,
78            message: message.into(),
79            ..Default::default()
80        }
81    }
82
83    /// Set participant ID.
84    pub fn with_participant(mut self, id: impl Into<String>) -> Self {
85        self.participant_id = id.into();
86        self
87    }
88
89    /// Set topic name.
90    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
91        self.topic = Some(topic.into());
92        self
93    }
94
95    /// Set node name.
96    pub fn with_node(mut self, node: impl Into<String>) -> Self {
97        self.node_name = Some(node.into());
98        self
99    }
100
101    /// Set source location.
102    pub fn with_location(
103        mut self,
104        file: impl Into<String>,
105        line: u32,
106        function: impl Into<String>,
107    ) -> Self {
108        self.file = Some(file.into());
109        self.line = Some(line);
110        self.function = Some(function.into());
111        self
112    }
113}
114
/// Log collector that subscribes to DDS log topics.
///
/// Each entry is checked against `filter`, rendered by `formatter` and
/// written to `output`, with counters kept in `stats`.
pub struct LogCollector {
    /// Full configuration; the run loop reads `domain_id` and `topic_pattern` from it.
    config: LogConfig,
    /// Renders a `LogEntry` into the line handed to `output`.
    formatter: Box<dyn LogFormatter + Send + Sync>,
    /// Destination sink for formatted lines.
    output: Box<dyn LogOutput>,
    /// Copy of `config.filter` taken at construction time.
    filter: LogFilter,
    /// Shared run flag; `StopHandle` clones this `Arc` to stop the loop remotely.
    running: Arc<AtomicBool>,
    /// Counters updated by `process`.
    stats: CollectorStats,
}
124
/// Collector statistics.
///
/// As maintained by `LogCollector::process`, every processed entry increments
/// `logs_received` and exactly one of `logs_written`, `logs_filtered` or
/// `write_errors`.
#[derive(Debug, Default)]
pub struct CollectorStats {
    /// Total logs received.
    pub logs_received: u64,
    /// Logs written (after filtering).
    pub logs_written: u64,
    /// Logs filtered out.
    pub logs_filtered: u64,
    /// Write errors.
    pub write_errors: u64,
}
137
138impl LogCollector {
139    /// Create a new log collector.
140    pub fn new(config: LogConfig) -> io::Result<Self> {
141        let formatter = create_formatter(config.format);
142        let output = create_output(&config.output)?;
143        let filter = config.filter.clone();
144
145        Ok(Self {
146            config,
147            formatter,
148            output,
149            filter,
150            running: Arc::new(AtomicBool::new(false)),
151            stats: CollectorStats::default(),
152        })
153    }
154
155    /// Get collector statistics.
156    pub fn stats(&self) -> &CollectorStats {
157        &self.stats
158    }
159
160    /// Process a single log entry.
161    pub fn process(&mut self, entry: LogEntry) -> io::Result<()> {
162        self.stats.logs_received += 1;
163
164        // Apply filter
165        if !self.filter.matches(&entry) {
166            self.stats.logs_filtered += 1;
167            return Ok(());
168        }
169
170        // Format and write
171        let line = self.formatter.format(&entry);
172        match self.output.write(&line) {
173            Ok(()) => {
174                self.stats.logs_written += 1;
175                Ok(())
176            }
177            Err(e) => {
178                self.stats.write_errors += 1;
179                Err(e)
180            }
181        }
182    }
183
184    /// Flush output.
185    pub fn flush(&mut self) -> io::Result<()> {
186        self.output.flush()
187    }
188
189    /// Check if collector is running.
190    pub fn is_running(&self) -> bool {
191        self.running.load(Ordering::SeqCst)
192    }
193
194    /// Stop the collector.
195    pub fn stop(&self) {
196        self.running.store(false, Ordering::SeqCst);
197    }
198
199    /// Get a handle to stop the collector from another thread.
200    pub fn stop_handle(&self) -> StopHandle {
201        StopHandle {
202            running: self.running.clone(),
203        }
204    }
205
206    /// Run the collector (blocking).
207    ///
208    /// This would subscribe to DDS topics and process incoming logs.
209    pub fn run(&mut self) -> io::Result<()> {
210        self.running.store(true, Ordering::SeqCst);
211
212        tracing::info!(
213            domain_id = self.config.domain_id,
214            topic_pattern = %self.config.topic_pattern,
215            "Starting log collector"
216        );
217
218        self.run_with_processor(|collector, entry| collector.process(entry))?;
219
220        tracing::info!(
221            logs_received = self.stats.logs_received,
222            logs_written = self.stats.logs_written,
223            "Log collector stopped"
224        );
225
226        Ok(())
227    }
228
229    /// Run with a callback for each log entry (for testing/integration).
230    pub fn run_with_callback<F>(&mut self, mut callback: F) -> io::Result<()>
231    where
232        F: FnMut(&LogEntry),
233    {
234        self.running.store(true, Ordering::SeqCst);
235
236        self.run_with_processor(|collector, entry| {
237            callback(&entry);
238            collector.process(entry)
239        })
240    }
241
242    fn run_with_processor<F>(&mut self, mut handler: F) -> io::Result<()>
243    where
244        F: FnMut(&mut LogCollector, LogEntry) -> io::Result<()>,
245    {
246        let topic_pattern = self.config.topic_pattern.clone();
247
248        let participant = Participant::builder("hdds-logger")
249            .with_transport(TransportMode::UdpMulticast)
250            .domain_id(self.config.domain_id)
251            .build()
252            .map_err(|e| io::Error::other(e.to_string()))?;
253
254        let mut readers: HashMap<String, hdds::RawDataReader> = HashMap::new();
255        let mut last_discovery = Instant::now()
256            .checked_sub(Duration::from_secs(1))
257            .unwrap_or_else(Instant::now);
258
259        while self.running.load(Ordering::SeqCst) {
260            if last_discovery.elapsed() >= Duration::from_secs(1) {
261                match participant.discover_topics() {
262                    Ok(topics) => {
263                        for info in topics {
264                            if !topic_matches(&topic_pattern, &info.name) {
265                                continue;
266                            }
267
268                            if readers.contains_key(&info.name) {
269                                continue;
270                            }
271
272                            let reader = match participant.create_raw_reader_with_type(
273                                &info.name,
274                                &info.type_name,
275                                Some(info.qos.clone()),
276                                info.type_object.clone(),
277                            ) {
278                                Ok(reader) => reader,
279                                Err(err) => {
280                                    tracing::warn!(
281                                        "Failed to create raw reader for {}: {}",
282                                        info.name,
283                                        err
284                                    );
285                                    match participant.create_raw_reader(&info.name, None) {
286                                        Ok(reader) => reader,
287                                        Err(fallback_err) => {
288                                            tracing::warn!(
289                                                "Fallback raw reader failed for {}: {}",
290                                                info.name,
291                                                fallback_err
292                                            );
293                                            continue;
294                                        }
295                                    }
296                                }
297                            };
298
299                            readers.insert(info.name.clone(), reader);
300                        }
301                    }
302                    Err(err) => {
303                        tracing::warn!("DDS discovery failed: {}", err);
304                    }
305                }
306                last_discovery = Instant::now();
307            }
308
309            for (topic, reader) in readers.iter() {
310                match reader.try_take_raw() {
311                    Ok(samples) => {
312                        for sample in samples {
313                            if let Some(entry) =
314                                parse_ros2_log(&sample.payload, "unknown", topic.as_str())
315                            {
316                                if let Err(err) = handler(self, entry) {
317                                    tracing::warn!("Log output failed: {}", err);
318                                }
319                            }
320                        }
321                    }
322                    Err(err) => {
323                        tracing::debug!("DDS read failed for {}: {}", topic, err);
324                    }
325                }
326            }
327
328            std::thread::sleep(Duration::from_millis(20));
329        }
330
331        self.flush()
332    }
333}
334
/// Cloneable handle that can stop a running collector.
///
/// It shares the collector's run flag, so it stays usable while the collector
/// itself is borrowed by its blocking run loop.
#[derive(Clone)]
pub struct StopHandle {
    /// Run flag shared with the collector this handle was created from.
    running: Arc<AtomicBool>,
}

impl StopHandle {
    /// Clears the shared run flag, asking the collector to stop.
    pub fn stop(&self) {
        let flag: &AtomicBool = &self.running;
        flag.store(false, Ordering::SeqCst);
    }
}
347
348/// Parse ROS 2 rcl_interfaces/Log message.
349///
350/// ROS 2 Log message structure:
351/// - stamp: builtin_interfaces/Time
352/// - level: uint8 (DEBUG=10, INFO=20, WARN=30, ERROR=40, FATAL=50)
353/// - name: string (logger name)
354/// - msg: string (log message)
355/// - file: string (source file)
356/// - function: string (function name)
357/// - line: uint32 (line number)
358pub fn parse_ros2_log(data: &[u8], participant_id: &str, topic: &str) -> Option<LogEntry> {
359    let (payload, little_endian) = strip_cdr_encapsulation(data);
360    let mut cursor = CdrCursor::new(payload, little_endian);
361
362    let sec = cursor.read_i32()?;
363    let nanosec = cursor.read_u32()?;
364    let level = cursor.read_u8()?;
365    let name = cursor.read_string()?;
366    let msg = cursor.read_string()?;
367    let file = cursor.read_string()?;
368    let function = cursor.read_string()?;
369    let line = cursor.read_u32()?;
370
371    let timestamp = Utc
372        .timestamp_opt(sec as i64, nanosec)
373        .single()
374        .unwrap_or_else(Utc::now);
375
376    let level = match level {
377        10 => LogLevel::Debug,
378        20 => LogLevel::Info,
379        30 => LogLevel::Warn,
380        40 => LogLevel::Error,
381        50 => LogLevel::Fatal,
382        _ => LogLevel::Unset,
383    };
384
385    Some(LogEntry {
386        timestamp,
387        level,
388        message: msg,
389        participant_id: participant_id.to_string(),
390        topic: Some(topic.to_string()),
391        node_name: if name.is_empty() { None } else { Some(name) },
392        file: if file.is_empty() { None } else { Some(file) },
393        line: Some(line),
394        function: if function.is_empty() {
395            None
396        } else {
397            Some(function)
398        },
399    })
400}
401
402fn topic_matches(pattern: &str, text: &str) -> bool {
403    if pattern == text {
404        return true;
405    }
406    if !pattern.contains('*') && !pattern.contains('?') {
407        return false;
408    }
409    glob_match(pattern, text)
410}
411
412fn glob_match(pattern: &str, text: &str) -> bool {
413    let pattern_chars: Vec<char> = pattern.chars().collect();
414    let text_chars: Vec<char> = text.chars().collect();
415    glob_match_recursive(&pattern_chars, &text_chars, 0, 0)
416}
417
/// Glob-matches `text[ti..]` against `pattern[pi..]`.
///
/// `*` matches any run of characters (including none), `?` matches exactly
/// one character, and every other character matches itself.
///
/// The name is kept for existing callers, but the implementation is
/// iterative. It remembers only the most recent `*` and, on a mismatch, lets
/// that star absorb one more character. This bounds the work by
/// `pattern.len() * text.len()` instead of the exponential blow-up that naive
/// recursion suffers on patterns such as `*a*a*a*b`.
///
/// Returns `false` when `ti` or `pi` lies beyond the end of its slice.
fn glob_match_recursive(pattern: &[char], text: &[char], pi: usize, ti: usize) -> bool {
    if ti > text.len() {
        return false;
    }

    let mut p = pi;
    let mut t = ti;
    // (pattern index just after the last `*`, text index where that star's
    // current match ends).
    let mut last_star: Option<(usize, usize)> = None;

    while t < text.len() {
        match pattern.get(p).copied() {
            Some('*') => {
                // Start by letting the star match nothing.
                last_star = Some((p + 1, t));
                p += 1;
            }
            Some(c) if c == '?' || c == text[t] => {
                p += 1;
                t += 1;
            }
            _ => match last_star {
                // Mismatch: widen the last star by one character and retry.
                Some((after_star, star_end)) => {
                    p = after_star;
                    t = star_end + 1;
                    last_star = Some((after_star, star_end + 1));
                }
                None => return false,
            },
        }
    }

    // Text is exhausted: only trailing stars may remain in the pattern.
    pattern
        .get(p..)
        .map_or(false, |rest| rest.iter().all(|&c| c == '*'))
}
448
/// Splits off a 4-byte CDR encapsulation header, if one is recognised.
///
/// Returns the remaining payload and whether it is little-endian. The first
/// two bytes are read as a big-endian representation identifier: `0x0000` and
/// `0x0002` select big-endian, `0x0001` and `0x0003` select little-endian.
/// Buffers shorter than four bytes, or with any other identifier, are
/// returned untouched and treated as little-endian.
fn strip_cdr_encapsulation(buf: &[u8]) -> (&[u8], bool) {
    match buf {
        [0x00, 0x00 | 0x02, _, _, body @ ..] => (body, false),
        [0x00, 0x01 | 0x03, _, _, body @ ..] => (body, true),
        _ => (buf, true),
    }
}
461
/// Forward-only reader for the CDR primitives used by the log parser.
///
/// Alignment is computed relative to the start of `buf`, so the buffer is
/// expected to begin right after any encapsulation header. Every read returns
/// `None` instead of panicking when the buffer is too short.
struct CdrCursor<'a> {
    /// Payload being decoded.
    buf: &'a [u8],
    /// Offset of the next unread byte; never exceeds `buf.len()`.
    pos: usize,
    /// Byte order of multi-byte values.
    little_endian: bool,
}

impl<'a> CdrCursor<'a> {
    /// Creates a cursor positioned at the start of `buf`.
    fn new(buf: &'a [u8], little_endian: bool) -> Self {
        Self {
            buf,
            pos: 0,
            little_endian,
        }
    }

    /// Advances to the next multiple of `alignment` (a power of two).
    ///
    /// Returns `None`, leaving the position untouched, if the aligned offset
    /// would lie past the end of the buffer.
    fn align(&mut self, alignment: usize) -> Option<()> {
        let mask = alignment.saturating_sub(1);
        let aligned = self.pos.checked_add(mask)? & !mask;
        if aligned > self.buf.len() {
            return None;
        }
        self.pos = aligned;
        Some(())
    }

    /// Consumes the next `len` bytes.
    ///
    /// `len` may come straight from the wire, so the end offset is computed
    /// with overflow checking and the slice is taken with a checked lookup.
    fn take(&mut self, len: usize) -> Option<&'a [u8]> {
        let buf: &'a [u8] = self.buf;
        let end = self.pos.checked_add(len)?;
        let bytes = buf.get(self.pos..end)?;
        self.pos = end;
        Some(bytes)
    }

    /// Consumes four bytes without aligning first.
    fn take_word(&mut self) -> Option<[u8; 4]> {
        match *self.take(4)? {
            [a, b, c, d] => Some([a, b, c, d]),
            _ => None,
        }
    }

    /// Reads one byte; single bytes need no alignment.
    fn read_u8(&mut self) -> Option<u8> {
        let byte = *self.buf.get(self.pos)?;
        self.pos += 1;
        Some(byte)
    }

    /// Reads a 4-byte-aligned `u32`.
    fn read_u32(&mut self) -> Option<u32> {
        self.align(4)?;
        self.read_u32_raw()
    }

    /// Reads a 4-byte-aligned `i32`.
    fn read_i32(&mut self) -> Option<i32> {
        self.align(4)?;
        self.read_i32_raw()
    }

    /// Reads a CDR string: an aligned `u32` byte count followed by that many
    /// bytes.
    ///
    /// A single trailing NUL (the CDR terminator) is dropped when present,
    /// and invalid UTF-8 is replaced rather than rejected. Returns `None` if
    /// the declared length exceeds the remaining bytes.
    fn read_string(&mut self) -> Option<String> {
        self.align(4)?;
        // Lossless widening on 32- and 64-bit targets.
        let len = self.read_u32_raw()? as usize;
        let raw = self.take(len)?;
        let text = match raw.split_last() {
            Some((&0, head)) => head,
            _ => raw,
        };
        Some(String::from_utf8_lossy(text).into_owned())
    }

    /// Reads a `u32` at the current position in the cursor's byte order.
    fn read_u32_raw(&mut self) -> Option<u32> {
        let word = self.take_word()?;
        Some(if self.little_endian {
            u32::from_le_bytes(word)
        } else {
            u32::from_be_bytes(word)
        })
    }

    /// Reads an `i32` at the current position in the cursor's byte order.
    fn read_i32_raw(&mut self) -> Option<i32> {
        let word = self.take_word()?;
        Some(if self.little_endian {
            i32::from_le_bytes(word)
        } else {
            i32::from_be_bytes(word)
        })
    }
}
562
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{OutputConfig, OutputFormat};

    /// CDR_LE encapsulation header used by the hand-built payloads below.
    const CDR_LE_HEADER: [u8; 4] = [0x00, 0x01, 0x00, 0x00];

    /// Pads `buf` so the payload (everything after the header) is 4-byte aligned.
    fn pad_to_word(buf: &mut Vec<u8>) {
        while (buf.len() - CDR_LE_HEADER.len()) % 4 != 0 {
            buf.push(0);
        }
    }

    /// Appends a little-endian CDR string: aligned length, bytes, NUL terminator.
    fn push_cdr_string(buf: &mut Vec<u8>, text: &str) {
        pad_to_word(buf);
        let len = (text.len() + 1) as u32;
        buf.extend_from_slice(&len.to_le_bytes());
        buf.extend_from_slice(text.as_bytes());
        buf.push(0);
    }

    /// Builds a little-endian `rcl_interfaces/Log` payload:
    /// stamp 100 s + 5 ns, level ERROR, name "node", msg "hi", empty file,
    /// function "main", line 42.
    fn sample_rosout_payload() -> Vec<u8> {
        let mut buf = CDR_LE_HEADER.to_vec();
        buf.extend_from_slice(&100_i32.to_le_bytes());
        buf.extend_from_slice(&5_u32.to_le_bytes());
        buf.push(40);
        for text in ["node", "hi", "", "main"] {
            push_cdr_string(&mut buf, text);
        }
        pad_to_word(&mut buf);
        buf.extend_from_slice(&42_u32.to_le_bytes());
        buf
    }

    #[test]
    fn test_log_entry_builder() {
        let entry = LogEntry::new(LogLevel::Error, "Something went wrong")
            .with_participant("01020304-0506-0708-090a-0b0c0d0e0f10")
            .with_topic("rt/rosout")
            .with_node("/my_node")
            .with_location("main.cpp", 42, "main");

        assert_eq!(entry.level, LogLevel::Error);
        assert_eq!(entry.message, "Something went wrong");
        assert_eq!(
            entry.participant_id,
            "01020304-0506-0708-090a-0b0c0d0e0f10"
        );
        assert_eq!(entry.topic, Some("rt/rosout".to_string()));
        assert_eq!(entry.node_name, Some("/my_node".to_string()));
        assert_eq!(entry.file, Some("main.cpp".to_string()));
        assert_eq!(entry.line, Some(42));
        assert_eq!(entry.function, Some("main".to_string()));
    }

    #[test]
    fn test_collector_process() {
        let config = LogConfig {
            format: OutputFormat::Text,
            output: OutputConfig::Stdout,
            filter: LogFilter::min_level(LogLevel::Warn),
            ..Default::default()
        };

        let mut collector = LogCollector::new(config).unwrap();

        // Info should be filtered
        let info_entry =
            LogEntry::new(LogLevel::Info, "Info message").with_participant("test-participant");
        collector.process(info_entry).unwrap();
        assert_eq!(collector.stats.logs_filtered, 1);
        assert_eq!(collector.stats.logs_written, 0);

        // Error should pass
        let error_entry =
            LogEntry::new(LogLevel::Error, "Error message").with_participant("test-participant");
        collector.process(error_entry).unwrap();
        assert_eq!(collector.stats.logs_written, 1);

        // Both entries were counted as received, and nothing failed to write.
        assert_eq!(collector.stats.logs_received, 2);
        assert_eq!(collector.stats.write_errors, 0);
    }

    #[test]
    fn test_stop_handle() {
        let config = LogConfig::default();
        let collector = LogCollector::new(config).unwrap();
        let handle = collector.stop_handle();

        assert!(!collector.is_running());

        // Simulate a running collector so that `stop` has something to clear;
        // otherwise the assertions below could never fail.
        collector.running.store(true, Ordering::SeqCst);
        assert!(collector.is_running());

        handle.stop();
        assert!(!collector.is_running());
    }

    #[test]
    fn test_parse_ros2_log_little_endian() {
        let payload = sample_rosout_payload();
        let entry = parse_ros2_log(&payload, "participant-1", "rt/rosout")
            .expect("well-formed payload must decode");

        assert_eq!(
            entry.timestamp,
            Utc.timestamp_opt(100, 5).single().unwrap()
        );
        assert_eq!(entry.level, LogLevel::Error);
        assert_eq!(entry.message, "hi");
        assert_eq!(entry.participant_id, "participant-1");
        assert_eq!(entry.topic, Some("rt/rosout".to_string()));
        assert_eq!(entry.node_name, Some("node".to_string()));
        // An empty source file is reported as absent.
        assert_eq!(entry.file, None);
        assert_eq!(entry.function, Some("main".to_string()));
        assert_eq!(entry.line, Some(42));
    }

    #[test]
    fn test_parse_ros2_log_truncated() {
        let payload = sample_rosout_payload();

        assert!(parse_ros2_log(&payload[..payload.len() - 1], "p", "rt/rosout").is_none());
        assert!(parse_ros2_log(&payload[..CDR_LE_HEADER.len()], "p", "rt/rosout").is_none());
        assert!(parse_ros2_log(&[], "p", "rt/rosout").is_none());
    }

    #[test]
    fn test_topic_matches() {
        assert!(topic_matches("rt/rosout", "rt/rosout"));
        assert!(!topic_matches("rt/rosout", "rt/rosout2"));
        assert!(topic_matches("rt/*", "rt/rosout"));
        assert!(!topic_matches("rt/*", "rq/rosout"));
        assert!(topic_matches("rt/ros?ut", "rt/rosout"));
        assert!(!topic_matches("rt/ros?ut", "rt/rosut"));
        assert!(topic_matches("*", ""));
    }
}