1use crate::{
7 filter::LogFilter,
8 formatter::{create_formatter, LogFormatter},
9 output::{create_output, LogOutput},
10 LogConfig, LogLevel,
11};
12use chrono::{DateTime, TimeZone, Utc};
13use hdds::{Participant, TransportMode};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::io;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
23pub enum LogSource {
24 #[default]
26 DdsTopic,
27 Telemetry,
29 Local,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct LogEntry {
36 pub timestamp: DateTime<Utc>,
38 pub level: LogLevel,
40 pub message: String,
42 pub participant_id: String,
44 pub topic: Option<String>,
46 pub node_name: Option<String>,
48 pub file: Option<String>,
50 pub line: Option<u32>,
52 pub function: Option<String>,
54}
55
56impl Default for LogEntry {
57 fn default() -> Self {
58 Self {
59 timestamp: Utc::now(),
60 level: LogLevel::Info,
61 message: String::new(),
62 participant_id: String::new(),
63 topic: None,
64 node_name: None,
65 file: None,
66 line: None,
67 function: None,
68 }
69 }
70}
71
72impl LogEntry {
73 pub fn new(level: LogLevel, message: impl Into<String>) -> Self {
75 Self {
76 timestamp: Utc::now(),
77 level,
78 message: message.into(),
79 ..Default::default()
80 }
81 }
82
83 pub fn with_participant(mut self, id: impl Into<String>) -> Self {
85 self.participant_id = id.into();
86 self
87 }
88
89 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
91 self.topic = Some(topic.into());
92 self
93 }
94
95 pub fn with_node(mut self, node: impl Into<String>) -> Self {
97 self.node_name = Some(node.into());
98 self
99 }
100
101 pub fn with_location(
103 mut self,
104 file: impl Into<String>,
105 line: u32,
106 function: impl Into<String>,
107 ) -> Self {
108 self.file = Some(file.into());
109 self.line = Some(line);
110 self.function = Some(function.into());
111 self
112 }
113}
114
115pub struct LogCollector {
117 config: LogConfig,
118 formatter: Box<dyn LogFormatter + Send + Sync>,
119 output: Box<dyn LogOutput>,
120 filter: LogFilter,
121 running: Arc<AtomicBool>,
122 stats: CollectorStats,
123}
124
/// Running counters maintained by the collector while it processes entries.
#[derive(Default, Debug)]
pub struct CollectorStats {
    /// Entries handed to the collector, counted before filtering.
    pub logs_received: u64,
    /// Entries that passed the filter and were written successfully.
    pub logs_written: u64,
    /// Entries rejected by the filter.
    pub logs_filtered: u64,
    /// Entries that passed the filter but whose write failed.
    pub write_errors: u64,
}
137
138impl LogCollector {
139 pub fn new(config: LogConfig) -> io::Result<Self> {
141 let formatter = create_formatter(config.format);
142 let output = create_output(&config.output)?;
143 let filter = config.filter.clone();
144
145 Ok(Self {
146 config,
147 formatter,
148 output,
149 filter,
150 running: Arc::new(AtomicBool::new(false)),
151 stats: CollectorStats::default(),
152 })
153 }
154
155 pub fn stats(&self) -> &CollectorStats {
157 &self.stats
158 }
159
160 pub fn process(&mut self, entry: LogEntry) -> io::Result<()> {
162 self.stats.logs_received += 1;
163
164 if !self.filter.matches(&entry) {
166 self.stats.logs_filtered += 1;
167 return Ok(());
168 }
169
170 let line = self.formatter.format(&entry);
172 match self.output.write(&line) {
173 Ok(()) => {
174 self.stats.logs_written += 1;
175 Ok(())
176 }
177 Err(e) => {
178 self.stats.write_errors += 1;
179 Err(e)
180 }
181 }
182 }
183
184 pub fn flush(&mut self) -> io::Result<()> {
186 self.output.flush()
187 }
188
189 pub fn is_running(&self) -> bool {
191 self.running.load(Ordering::SeqCst)
192 }
193
194 pub fn stop(&self) {
196 self.running.store(false, Ordering::SeqCst);
197 }
198
199 pub fn stop_handle(&self) -> StopHandle {
201 StopHandle {
202 running: self.running.clone(),
203 }
204 }
205
206 pub fn run(&mut self) -> io::Result<()> {
210 self.running.store(true, Ordering::SeqCst);
211
212 tracing::info!(
213 domain_id = self.config.domain_id,
214 topic_pattern = %self.config.topic_pattern,
215 "Starting log collector"
216 );
217
218 self.run_with_processor(|collector, entry| collector.process(entry))?;
219
220 tracing::info!(
221 logs_received = self.stats.logs_received,
222 logs_written = self.stats.logs_written,
223 "Log collector stopped"
224 );
225
226 Ok(())
227 }
228
229 pub fn run_with_callback<F>(&mut self, mut callback: F) -> io::Result<()>
231 where
232 F: FnMut(&LogEntry),
233 {
234 self.running.store(true, Ordering::SeqCst);
235
236 self.run_with_processor(|collector, entry| {
237 callback(&entry);
238 collector.process(entry)
239 })
240 }
241
242 fn run_with_processor<F>(&mut self, mut handler: F) -> io::Result<()>
243 where
244 F: FnMut(&mut LogCollector, LogEntry) -> io::Result<()>,
245 {
246 let topic_pattern = self.config.topic_pattern.clone();
247
248 let participant = Participant::builder("hdds-logger")
249 .with_transport(TransportMode::UdpMulticast)
250 .domain_id(self.config.domain_id)
251 .build()
252 .map_err(|e| io::Error::other(e.to_string()))?;
253
254 let mut readers: HashMap<String, hdds::RawDataReader> = HashMap::new();
255 let mut last_discovery = Instant::now()
256 .checked_sub(Duration::from_secs(1))
257 .unwrap_or_else(Instant::now);
258
259 while self.running.load(Ordering::SeqCst) {
260 if last_discovery.elapsed() >= Duration::from_secs(1) {
261 match participant.discover_topics() {
262 Ok(topics) => {
263 for info in topics {
264 if !topic_matches(&topic_pattern, &info.name) {
265 continue;
266 }
267
268 if readers.contains_key(&info.name) {
269 continue;
270 }
271
272 let reader = match participant.create_raw_reader_with_type(
273 &info.name,
274 &info.type_name,
275 Some(info.qos.clone()),
276 info.type_object.clone(),
277 ) {
278 Ok(reader) => reader,
279 Err(err) => {
280 tracing::warn!(
281 "Failed to create raw reader for {}: {}",
282 info.name,
283 err
284 );
285 match participant.create_raw_reader(&info.name, None) {
286 Ok(reader) => reader,
287 Err(fallback_err) => {
288 tracing::warn!(
289 "Fallback raw reader failed for {}: {}",
290 info.name,
291 fallback_err
292 );
293 continue;
294 }
295 }
296 }
297 };
298
299 readers.insert(info.name.clone(), reader);
300 }
301 }
302 Err(err) => {
303 tracing::warn!("DDS discovery failed: {}", err);
304 }
305 }
306 last_discovery = Instant::now();
307 }
308
309 for (topic, reader) in readers.iter() {
310 match reader.try_take_raw() {
311 Ok(samples) => {
312 for sample in samples {
313 if let Some(entry) =
314 parse_ros2_log(&sample.payload, "unknown", topic.as_str())
315 {
316 if let Err(err) = handler(self, entry) {
317 tracing::warn!("Log output failed: {}", err);
318 }
319 }
320 }
321 }
322 Err(err) => {
323 tracing::debug!("DDS read failed for {}: {}", topic, err);
324 }
325 }
326 }
327
328 std::thread::sleep(Duration::from_millis(20));
329 }
330
331 self.flush()
332 }
333}
334
/// Cloneable handle that can request a collector to stop.
///
/// Every clone shares the same run flag.
#[derive(Clone)]
pub struct StopHandle {
    /// Run flag shared with the collector this handle was obtained from.
    running: Arc<AtomicBool>,
}

impl StopHandle {
    /// Clears the shared run flag.
    pub fn stop(&self) {
        let flag: &AtomicBool = &self.running;
        flag.store(false, Ordering::SeqCst);
    }
}
347
348pub fn parse_ros2_log(data: &[u8], participant_id: &str, topic: &str) -> Option<LogEntry> {
359 let (payload, little_endian) = strip_cdr_encapsulation(data);
360 let mut cursor = CdrCursor::new(payload, little_endian);
361
362 let sec = cursor.read_i32()?;
363 let nanosec = cursor.read_u32()?;
364 let level = cursor.read_u8()?;
365 let name = cursor.read_string()?;
366 let msg = cursor.read_string()?;
367 let file = cursor.read_string()?;
368 let function = cursor.read_string()?;
369 let line = cursor.read_u32()?;
370
371 let timestamp = Utc
372 .timestamp_opt(sec as i64, nanosec)
373 .single()
374 .unwrap_or_else(Utc::now);
375
376 let level = match level {
377 10 => LogLevel::Debug,
378 20 => LogLevel::Info,
379 30 => LogLevel::Warn,
380 40 => LogLevel::Error,
381 50 => LogLevel::Fatal,
382 _ => LogLevel::Unset,
383 };
384
385 Some(LogEntry {
386 timestamp,
387 level,
388 message: msg,
389 participant_id: participant_id.to_string(),
390 topic: Some(topic.to_string()),
391 node_name: if name.is_empty() { None } else { Some(name) },
392 file: if file.is_empty() { None } else { Some(file) },
393 line: Some(line),
394 function: if function.is_empty() {
395 None
396 } else {
397 Some(function)
398 },
399 })
400}
401
402fn topic_matches(pattern: &str, text: &str) -> bool {
403 if pattern == text {
404 return true;
405 }
406 if !pattern.contains('*') && !pattern.contains('?') {
407 return false;
408 }
409 glob_match(pattern, text)
410}
411
412fn glob_match(pattern: &str, text: &str) -> bool {
413 let pattern_chars: Vec<char> = pattern.chars().collect();
414 let text_chars: Vec<char> = text.chars().collect();
415 glob_match_recursive(&pattern_chars, &text_chars, 0, 0)
416}
417
/// Matches `text[ti..]` against the glob `pattern[pi..]`.
///
/// `*` matches any run of characters (including none), `?` matches exactly
/// one character, and every other pattern character matches itself. The whole
/// remaining text must be consumed for the match to succeed.
///
/// The name is kept for existing callers, but the implementation is
/// iterative: it remembers only the most recent `*` and, on a mismatch, lets
/// that star absorb one more character. This bounds the work by
/// `pattern.len() * text.len()` instead of the exponential blow-up naive
/// recursion shows on patterns with several stars. Start indices past the end
/// of either slice yield `false`.
fn glob_match_recursive(pattern: &[char], text: &[char], pi: usize, ti: usize) -> bool {
    if pi > pattern.len() || ti > text.len() {
        return false;
    }

    let mut p = pi;
    let mut t = ti;
    // (pattern index just after the last `*`, text index where that star's
    // match currently ends).
    let mut backtrack: Option<(usize, usize)> = None;

    while t < text.len() {
        match pattern.get(p).copied() {
            Some('*') => {
                // Start by letting the star match nothing.
                backtrack = Some((p + 1, t));
                p += 1;
            }
            Some(c) if c == '?' || c == text[t] => {
                p += 1;
                t += 1;
            }
            _ => match backtrack {
                Some((resume_p, resume_t)) => {
                    // Mismatch after a star: widen the star by one character
                    // and retry the rest of the pattern from there.
                    let widened = resume_t + 1;
                    backtrack = Some((resume_p, widened));
                    p = resume_p;
                    t = widened;
                }
                None => return false,
            },
        }
    }

    // Text exhausted: only trailing stars may remain in the pattern.
    pattern[p..].iter().all(|&c| c == '*')
}
448
/// Splits an optional 4-byte encapsulation header off a serialized sample.
///
/// The first two bytes are read as a big-endian representation identifier:
/// `0x0000`/`0x0002` select big-endian payloads and `0x0001`/`0x0003`
/// little-endian ones; in both cases the four header bytes are removed.
/// Buffers shorter than four bytes, or with any other identifier, are
/// returned untouched and treated as little-endian.
///
/// Returns `(payload, little_endian)`.
fn strip_cdr_encapsulation(buf: &[u8]) -> (&[u8], bool) {
    match buf {
        [0x00, 0x00 | 0x02, _, _, payload @ ..] => (payload, false),
        [0x00, 0x01 | 0x03, _, _, payload @ ..] => (payload, true),
        _ => (buf, true),
    }
}
461
/// Forward-only reader over a serialized payload.
///
/// Multi-byte values are aligned relative to the start of `buf` and decoded
/// with the byte order chosen at construction. Every read returns `None`
/// instead of panicking when the buffer is too short; all offset arithmetic
/// is overflow-checked because lengths come from untrusted input.
struct CdrCursor<'a> {
    /// Payload being decoded.
    buf: &'a [u8],
    /// Offset of the next unread byte; never exceeds `buf.len()`.
    pos: usize,
    /// `true` to decode multi-byte integers as little-endian.
    little_endian: bool,
}

impl<'a> CdrCursor<'a> {
    /// Creates a cursor positioned at the start of `buf`.
    fn new(buf: &'a [u8], little_endian: bool) -> Self {
        Self {
            buf,
            pos: 0,
            little_endian,
        }
    }

    /// Advances the cursor to the next multiple of `alignment`.
    ///
    /// `alignment` is expected to be a power of two (0 and 1 are no-ops).
    /// Returns `None`, leaving the cursor untouched, when the aligned offset
    /// would lie past the end of the buffer.
    fn align(&mut self, alignment: usize) -> Option<()> {
        let mask = alignment.saturating_sub(1);
        let aligned = self.pos.checked_add(mask)? & !mask;
        if aligned > self.buf.len() {
            return None;
        }
        self.pos = aligned;
        Some(())
    }

    /// Consumes and returns the next `len` bytes, or `None` (without moving
    /// the cursor) when fewer than `len` bytes remain.
    fn take(&mut self, len: usize) -> Option<&'a [u8]> {
        let buf: &'a [u8] = self.buf;
        // `checked_add` keeps a hostile length from wrapping the offset on
        // targets with a narrow `usize`.
        let end = self.pos.checked_add(len)?;
        let bytes = buf.get(self.pos..end)?;
        self.pos = end;
        Some(bytes)
    }

    /// Consumes the next four bytes in wire order.
    fn take_word(&mut self) -> Option<[u8; 4]> {
        let bytes = self.take(4)?;
        Some([bytes[0], bytes[1], bytes[2], bytes[3]])
    }

    /// Reads one byte.
    fn read_u8(&mut self) -> Option<u8> {
        self.align(1)?;
        self.take(1).map(|bytes| bytes[0])
    }

    /// Reads a 4-byte-aligned `u32`.
    fn read_u32(&mut self) -> Option<u32> {
        self.align(4)?;
        self.read_u32_raw()
    }

    /// Reads a 4-byte-aligned `i32`.
    fn read_i32(&mut self) -> Option<i32> {
        self.align(4)?;
        self.read_i32_raw()
    }

    /// Reads a string: a 4-byte-aligned `u32` byte count followed by that
    /// many bytes.
    ///
    /// A single trailing NUL byte, if present, is dropped. Invalid UTF-8 is
    /// replaced lossily. A zero length yields an empty string.
    fn read_string(&mut self) -> Option<String> {
        self.align(4)?;
        // Widening cast on 32- and 64-bit targets.
        let len = self.read_u32_raw()? as usize;
        let raw = self.take(len)?;
        let text = match raw.split_last() {
            Some((&0, head)) => head,
            _ => raw,
        };
        Some(String::from_utf8_lossy(text).into_owned())
    }

    /// Reads a `u32` at the current offset without aligning first.
    fn read_u32_raw(&mut self) -> Option<u32> {
        let word = self.take_word()?;
        Some(if self.little_endian {
            u32::from_le_bytes(word)
        } else {
            u32::from_be_bytes(word)
        })
    }

    /// Reads an `i32` at the current offset without aligning first.
    fn read_i32_raw(&mut self) -> Option<i32> {
        let word = self.take_word()?;
        Some(if self.little_endian {
            i32::from_le_bytes(word)
        } else {
            i32::from_be_bytes(word)
        })
    }
}
562
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{OutputConfig, OutputFormat};

    /// The builder methods fill in exactly the fields they are named after.
    #[test]
    fn test_log_entry_builder() {
        let entry = LogEntry::new(LogLevel::Error, "Something went wrong")
            .with_participant("01020304-0506-0708-090a-0b0c0d0e0f10")
            .with_topic("rt/rosout")
            .with_node("/my_node")
            .with_location("main.cpp", 42, "main");

        assert_eq!(entry.level, LogLevel::Error);
        assert_eq!(entry.message, "Something went wrong");
        assert_eq!(entry.participant_id, "01020304-0506-0708-090a-0b0c0d0e0f10");
        assert_eq!(entry.topic, Some("rt/rosout".to_string()));
        assert_eq!(entry.node_name, Some("/my_node".to_string()));
        assert_eq!(entry.file, Some("main.cpp".to_string()));
        assert_eq!(entry.line, Some(42));
        assert_eq!(entry.function, Some("main".to_string()));
    }

    /// Entries below the filter level are counted as filtered; the others are
    /// written.
    #[test]
    fn test_collector_process() {
        let config = LogConfig {
            format: OutputFormat::Text,
            output: OutputConfig::Stdout,
            filter: LogFilter::min_level(LogLevel::Warn),
            ..Default::default()
        };

        let mut collector = LogCollector::new(config).unwrap();

        let info_entry =
            LogEntry::new(LogLevel::Info, "Info message").with_participant("test-participant");
        collector.process(info_entry).unwrap();
        assert_eq!(collector.stats.logs_received, 1);
        assert_eq!(collector.stats.logs_filtered, 1);
        assert_eq!(collector.stats.logs_written, 0);

        let error_entry =
            LogEntry::new(LogLevel::Error, "Error message").with_participant("test-participant");
        collector.process(error_entry).unwrap();
        assert_eq!(collector.stats.logs_received, 2);
        assert_eq!(collector.stats.logs_filtered, 1);
        assert_eq!(collector.stats.logs_written, 1);
        assert_eq!(collector.stats.write_errors, 0);
    }

    /// A stop handle clears the run flag of the collector it came from.
    #[test]
    fn test_stop_handle() {
        let config = LogConfig::default();
        let collector = LogCollector::new(config).unwrap();
        let handle = collector.stop_handle();

        assert!(!collector.is_running());

        // Raise the flag by hand (as `run` would) so that `stop` has an
        // observable effect; otherwise the test passes even if `stop` does
        // nothing.
        collector.running.store(true, Ordering::SeqCst);
        assert!(collector.is_running());

        handle.stop();
        assert!(!collector.is_running());
    }
}
618}