Skip to main content

camel_component_log/
lib.rs

1//! Log component for rust-camel — routes exchange body and headers to the
2//! `tracing` subsystem at a configurable log level, primarily for debugging.
3//!
4//! Main types: `LogComponent`, `LogEndpoint`, `LogProducer`, `LogLevel`.
5
6use std::future::Future;
7use std::pin::Pin;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::task::{Context, Poll};
12
13use tower::Service;
14use tracing::{debug, error, info, trace, warn};
15
16use camel_component_api::UriConfig;
17use camel_component_api::parse_uri;
18use camel_component_api::{BoxProcessor, CamelError, Exchange};
19use camel_component_api::{Component, Consumer, Endpoint, ProducerContext, RuntimeObservability};
20
21// ---------------------------------------------------------------------------
22// LogLevel
23// ---------------------------------------------------------------------------
24
25/// Log level for the log component.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
27pub enum LogLevel {
28    Trace,
29    Debug,
30    #[default]
31    Info,
32    Warn,
33    Error,
34}
35
36impl FromStr for LogLevel {
37    type Err = String;
38
39    fn from_str(s: &str) -> Result<Self, Self::Err> {
40        parse_log_level(s).map_err(|e| e.to_string())
41    }
42}
43
44fn parse_log_level(s: &str) -> Result<LogLevel, CamelError> {
45    match s.to_uppercase().as_str() {
46        "TRACE" => Ok(LogLevel::Trace),
47        "DEBUG" => Ok(LogLevel::Debug),
48        "INFO" => Ok(LogLevel::Info),
49        "WARN" | "WARNING" => Ok(LogLevel::Warn),
50        "ERROR" => Ok(LogLevel::Error),
51        _ => Err(CamelError::Config(format!(
52            "unknown log level: '{}'. Valid: TRACE, DEBUG, INFO, WARN, ERROR",
53            s
54        ))),
55    }
56}
57
58// ---------------------------------------------------------------------------
59// LogConfig
60// ---------------------------------------------------------------------------
61
62/// Configuration parsed from a log URI.
63///
64/// Format: `log:category?level=info&showHeaders=true&showBody=true&logMask=false&showStreamInfo=false&groupSize=10`
65#[derive(Debug, Clone)]
66pub struct LogConfig {
67    /// Log category (the path portion of the URI).
68    pub category: String,
69    /// Log level. Default: Info.
70    pub level: LogLevel,
71    /// Whether to include headers in the log output.
72    pub show_headers: bool,
73    /// Whether to include the body in the log output.
74    pub show_body: bool,
75    /// Maximum number of characters for the body in log output.
76    /// Bodies longer than this are truncated. `None` means no limit.
77    pub max_chars: Option<usize>,
78    /// When true, redact sensitive headers and body in log output.
79    /// Headers matching `/(?i)(password|secret|token|key|auth|credential)/` → `[REDACTED]`.
80    /// Body → `[Body redacted by logMask]`.
81    pub log_mask: bool,
82    /// When true, show stream origin metadata. When false (default), show `[Stream]`.
83    pub show_stream_info: bool,
84    /// When set, only emit a log every `n` exchanges (group logging).
85    /// The log message includes the exchange count.
86    pub group_size: Option<usize>,
87}
88
89impl UriConfig for LogConfig {
90    fn scheme() -> &'static str {
91        "log"
92    }
93
94    fn from_uri(uri: &str) -> Result<Self, CamelError> {
95        let parts = parse_uri(uri)?;
96        Self::from_components(parts)
97    }
98
99    fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
100        if parts.scheme != Self::scheme() {
101            return Err(CamelError::InvalidUri(format!(
102                "expected scheme '{}' but got '{}'",
103                Self::scheme(),
104                parts.scheme
105            )));
106        }
107
108        let level = match parts.params.get("level") {
109            Some(raw) => parse_log_level(raw)?,
110            None => LogLevel::Info,
111        };
112
113        let show_headers = match parts.params.get("showHeaders") {
114            Some(raw) => raw.parse::<bool>().map_err(|_| {
115                CamelError::InvalidUri(format!("invalid boolean value for showHeaders: {raw}"))
116            })?,
117            None => false,
118        };
119
120        let show_body = match parts.params.get("showBody") {
121            Some(raw) => raw.parse::<bool>().map_err(|_| {
122                CamelError::InvalidUri(format!("invalid boolean value for showBody: {raw}"))
123            })?,
124            None => true,
125        };
126
127        let max_chars = match parts.params.get("maxChars") {
128            Some(raw) => Some(raw.parse::<usize>().map_err(|_| {
129                CamelError::InvalidUri(format!("invalid integer value for maxChars: {raw}"))
130            })?),
131            None => None,
132        };
133
134        let log_mask = match parts.params.get("logMask") {
135            Some(raw) => raw.parse::<bool>().map_err(|_| {
136                CamelError::InvalidUri(format!("invalid boolean value for logMask: {raw}"))
137            })?,
138            None => false,
139        };
140
141        let show_stream_info = match parts.params.get("showStreamInfo") {
142            Some(raw) => raw.parse::<bool>().map_err(|_| {
143                CamelError::InvalidUri(format!("invalid boolean value for showStreamInfo: {raw}"))
144            })?,
145            None => false,
146        };
147
148        let group_size = match parts.params.get("groupSize") {
149            Some(raw) => Some(raw.parse::<usize>().map_err(|_| {
150                CamelError::InvalidUri(format!("invalid integer value for groupSize: {raw}"))
151            })?),
152            None => None,
153        };
154
155        Ok(Self {
156            category: parts.path,
157            level,
158            show_headers,
159            show_body,
160            max_chars,
161            log_mask,
162            show_stream_info,
163            group_size,
164        })
165    }
166}
167
168// ---------------------------------------------------------------------------
169// LogComponent
170// ---------------------------------------------------------------------------
171
172/// The Log component logs exchange information using `tracing`.
173pub struct LogComponent;
174
175impl LogComponent {
176    pub fn new() -> Self {
177        Self
178    }
179}
180
181impl Default for LogComponent {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
187impl Component for LogComponent {
188    fn scheme(&self) -> &str {
189        "log"
190    }
191
192    fn create_endpoint(
193        &self,
194        uri: &str,
195        _ctx: &dyn camel_component_api::ComponentContext,
196    ) -> Result<Box<dyn Endpoint>, CamelError> {
197        let config = LogConfig::from_uri(uri)?;
198        Ok(Box::new(LogEndpoint {
199            uri: uri.to_string(),
200            config,
201        }))
202    }
203}
204
205// ---------------------------------------------------------------------------
206// LogEndpoint
207// ---------------------------------------------------------------------------
208
209struct LogEndpoint {
210    uri: String,
211    config: LogConfig,
212}
213
214impl Endpoint for LogEndpoint {
215    fn uri(&self) -> &str {
216        &self.uri
217    }
218
219    fn create_consumer(
220        &self,
221        _rt: Arc<dyn RuntimeObservability>,
222    ) -> Result<Box<dyn Consumer>, CamelError> {
223        Err(CamelError::EndpointCreationFailed(
224            "log endpoint does not support consumers".to_string(),
225        ))
226    }
227
228    fn create_producer(
229        &self,
230        _rt: Arc<dyn RuntimeObservability>,
231        _ctx: &ProducerContext,
232    ) -> Result<BoxProcessor, CamelError> {
233        Ok(BoxProcessor::new(LogProducer::new(self.config.clone())))
234    }
235}
236
237// ---------------------------------------------------------------------------
238// LogProducer
239// ---------------------------------------------------------------------------
240
241#[derive(Clone)]
242struct LogProducer {
243    config: LogConfig,
244    exchange_count: Arc<AtomicUsize>,
245}
246
247impl LogProducer {
248    fn new(config: LogConfig) -> Self {
249        Self {
250            config,
251            exchange_count: Arc::new(AtomicUsize::new(0)),
252        }
253    }
254
255    /// Returns true if the header key matches sensitive patterns.
256    fn is_sensitive_header(key: &str) -> bool {
257        let lower = key.to_lowercase();
258        let sensitive_keywords = [
259            "password",
260            "passwd",
261            "secret",
262            "token",
263            "apikey",
264            "api-key",
265            "api_key",
266            "authorization",
267            "auth",
268            "credential",
269            "private",
270            "signature",
271        ];
272        sensitive_keywords.iter().any(|kw| {
273            lower == *kw
274                || lower.ends_with(&format!("-{kw}"))
275                || lower.ends_with(&format!("_{kw}"))
276                || lower.starts_with(&format!("{kw}-"))
277                || lower.starts_with(&format!("{kw}_"))
278        })
279    }
280
281    fn format_exchange(&self, exchange: &Exchange, count: usize) -> String {
282        let mut parts = Vec::new();
283
284        if self.config.show_body {
285            let body_str = if self.config.log_mask {
286                "[Body redacted by logMask]".to_string()
287            } else {
288                match &exchange.input.body {
289                    camel_component_api::Body::Empty => "[empty]".to_string(),
290                    camel_component_api::Body::Text(s) => s.clone(),
291                    camel_component_api::Body::Json(v) => v.to_string(),
292                    camel_component_api::Body::Xml(s) => s.clone(),
293                    camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
294                    camel_component_api::Body::Stream(s) => {
295                        if self.config.show_stream_info {
296                            format!("[Stream: origin={:?}]", s.metadata.origin)
297                        } else {
298                            "[Stream]".to_string()
299                        }
300                    }
301                }
302            };
303
304            let mut body_str = body_str;
305            if let Some(limit) = self.config.max_chars
306                && body_str.len() > limit
307            {
308                body_str.truncate(limit);
309            }
310
311            parts.push(format!("Body: {body_str}"));
312        }
313
314        if self.config.show_headers && !exchange.input.headers.is_empty() {
315            let headers: Vec<String> = exchange
316                .input
317                .headers
318                .iter()
319                .map(|(k, v)| {
320                    if self.config.log_mask && Self::is_sensitive_header(k) {
321                        format!("{k}=[REDACTED]")
322                    } else {
323                        format!("{k}={v}")
324                    }
325                })
326                .collect();
327            parts.push(format!("Headers: {{{}}}", headers.join(", ")));
328        }
329
330        if parts.is_empty() {
331            format!("[{}] Exchange received", self.config.category)
332        } else if self.config.group_size.is_some() {
333            format!(
334                "[{}] Group of {count}: {}",
335                self.config.category,
336                parts.join(" | ")
337            )
338        } else {
339            format!("[{}] {}", self.config.category, parts.join(" | "))
340        }
341    }
342}
343
344impl Service<Exchange> for LogProducer {
345    type Response = Exchange;
346    type Error = CamelError;
347    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
348
349    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
350        Poll::Ready(Ok(()))
351    }
352
353    fn call(&mut self, exchange: Exchange) -> Self::Future {
354        let count = self.exchange_count.fetch_add(1, Ordering::Relaxed) + 1;
355
356        // Group logging: only emit every group_size exchanges
357        if let Some(group_size) = self.config.group_size
358            && !count.is_multiple_of(group_size)
359        {
360            return Box::pin(async move { Ok(exchange) });
361        }
362
363        let msg = self.format_exchange(&exchange, count);
364        let level = self.config.level;
365
366        Box::pin(async move {
367            match level {
368                LogLevel::Trace => trace!("{msg}"),
369                LogLevel::Debug => debug!("{msg}"),
370                LogLevel::Info => info!("{msg}"),
371                LogLevel::Warn => warn!("{msg}"),
372                LogLevel::Error => error!("{msg}"),
373            }
374
375            Ok(exchange)
376        })
377    }
378}
379
380// ---------------------------------------------------------------------------
381// Tests
382// ---------------------------------------------------------------------------
383
384#[cfg(test)]
385mod tests {
386    use camel_component_api::test_support::PanicRuntimeObservability;
387    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
388        std::sync::Arc::new(PanicRuntimeObservability)
389    }
390    use super::*;
391    use camel_component_api::Body;
392    use camel_component_api::Message;
393    use camel_component_api::NoOpComponentContext;
394    use tower::ServiceExt;
395
396    fn test_producer_ctx() -> ProducerContext {
397        ProducerContext::new()
398    }
399
400    #[test]
401    fn test_log_config_defaults() {
402        let config = LogConfig::from_uri("log:myCategory").unwrap();
403        assert_eq!(config.category, "myCategory");
404        assert_eq!(config.level, LogLevel::Info);
405        assert!(!config.show_headers);
406        assert!(config.show_body);
407    }
408
409    #[test]
410    fn test_log_config_with_params() {
411        let config =
412            LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();
413        assert_eq!(config.category, "app");
414        assert_eq!(config.level, LogLevel::Debug);
415        assert!(config.show_headers);
416        assert!(!config.show_body);
417    }
418
419    #[test]
420    fn test_log_config_wrong_scheme() {
421        let result = LogConfig::from_uri("timer:tick");
422        assert!(result.is_err());
423    }
424
425    #[test]
426    fn test_log_component_scheme() {
427        let component = LogComponent::new();
428        assert_eq!(component.scheme(), "log");
429    }
430
431    #[test]
432    fn test_log_component_default() {
433        let component = LogComponent;
434        assert_eq!(component.scheme(), "log");
435    }
436
437    #[test]
438    fn test_log_level_from_str_variants() {
439        assert_eq!("trace".parse::<LogLevel>().unwrap(), LogLevel::Trace);
440        assert_eq!("DEBUG".parse::<LogLevel>().unwrap(), LogLevel::Debug);
441        assert_eq!("Info".parse::<LogLevel>().unwrap(), LogLevel::Info);
442        assert_eq!("warning".parse::<LogLevel>().unwrap(), LogLevel::Warn);
443        assert_eq!("error".parse::<LogLevel>().unwrap(), LogLevel::Error);
444    }
445
446    #[test]
447    fn test_log_level_from_str_invalid() {
448        let err = "nope".parse::<LogLevel>().unwrap_err();
449        assert_eq!(
450            err,
451            "Configuration error: unknown log level: 'nope'. Valid: TRACE, DEBUG, INFO, WARN, ERROR"
452        );
453    }
454
455    #[test]
456    fn test_log_config_invalid_level_rejected() {
457        let err = LogConfig::from_uri("log:test?level=invalid").unwrap_err();
458        assert!(
459            err.to_string()
460                .contains("unknown log level: 'invalid'. Valid: TRACE, DEBUG, INFO, WARN, ERROR")
461        );
462    }
463
464    #[test]
465    fn test_valid_log_levels_accepted() {
466        assert!(parse_log_level("DEBUG").is_ok());
467        assert!(parse_log_level("info").is_ok());
468        assert!(parse_log_level("WARN").is_ok());
469        assert!(parse_log_level("WARNING").is_ok());
470    }
471
472    #[test]
473    fn test_invalid_log_level_rejected() {
474        assert!(parse_log_level("VERBOSE").is_err());
475        assert!(parse_log_level("").is_err());
476        assert!(parse_log_level("log").is_err());
477    }
478
479    #[test]
480    fn test_log_endpoint_uri() {
481        let component = LogComponent::new();
482        let endpoint = component
483            .create_endpoint("log:uri-check", &NoOpComponentContext)
484            .unwrap();
485        assert_eq!(endpoint.uri(), "log:uri-check");
486    }
487
488    #[test]
489    fn test_log_endpoint_no_consumer() {
490        let component = LogComponent::new();
491        let endpoint = component
492            .create_endpoint("log:info", &NoOpComponentContext)
493            .unwrap();
494        assert!(endpoint.create_consumer(rt()).is_err());
495    }
496
497    #[test]
498    fn test_log_endpoint_creates_producer() {
499        let ctx = test_producer_ctx();
500        let component = LogComponent::new();
501        let endpoint = component
502            .create_endpoint("log:info", &NoOpComponentContext)
503            .unwrap();
504        assert!(endpoint.create_producer(rt(), &ctx).is_ok());
505    }
506
507    #[tokio::test]
508    async fn test_log_producer_processes_exchange() {
509        let ctx = test_producer_ctx();
510        let component = LogComponent::new();
511        let endpoint = component
512            .create_endpoint("log:test?showHeaders=true", &NoOpComponentContext)
513            .unwrap();
514        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
515
516        let mut exchange = Exchange::new(Message::new("hello world"));
517        exchange
518            .input
519            .set_header("source", serde_json::Value::String("test".into()));
520
521        let result = producer.oneshot(exchange).await.unwrap();
522        // Log producer passes exchange through unchanged
523        assert_eq!(result.input.body.as_text(), Some("hello world"));
524    }
525
526    #[test]
527    fn test_format_exchange_without_body_or_headers() {
528        let producer = LogProducer::new(LogConfig {
529            category: "cat".to_string(),
530            level: LogLevel::Info,
531            show_headers: false,
532            show_body: false,
533            max_chars: None,
534            log_mask: false,
535            show_stream_info: false,
536            group_size: None,
537        });
538        let exchange = Exchange::new(Message::new("ignored"));
539        let formatted = producer.format_exchange(&exchange, 1);
540        assert_eq!(formatted, "[cat] Exchange received");
541    }
542
543    #[test]
544    fn test_format_exchange_body_variants() {
545        let base = LogProducer::new(LogConfig {
546            category: "cat".to_string(),
547            level: LogLevel::Info,
548            show_headers: false,
549            show_body: true,
550            max_chars: None,
551            log_mask: false,
552            show_stream_info: true,
553            group_size: None,
554        });
555
556        let empty = Exchange::new(Message::default());
557        assert!(base.format_exchange(&empty, 1).contains("Body: [empty]"));
558
559        let mut json_msg = Message::new("");
560        json_msg.body = Body::Json(serde_json::json!({"k":"v"}));
561        let json_ex = Exchange::new(json_msg);
562        assert!(
563            base.format_exchange(&json_ex, 2)
564                .contains("Body: {\"k\":\"v\"}")
565        );
566
567        let mut xml_msg = Message::new("");
568        xml_msg.body = Body::Xml("<a/>".to_string());
569        let xml_ex = Exchange::new(xml_msg);
570        assert!(base.format_exchange(&xml_ex, 3).contains("Body: <a/>"));
571
572        let mut bytes_msg = Message::new("");
573        bytes_msg.body = Body::Bytes(b"abc".to_vec().into());
574        let bytes_ex = Exchange::new(bytes_msg);
575        assert!(
576            base.format_exchange(&bytes_ex, 4)
577                .contains("Body: [3 bytes]")
578        );
579    }
580
581    #[test]
582    fn test_log_truncates_large_body() {
583        let producer = LogProducer::new(LogConfig {
584            category: "trunc".to_string(),
585            level: LogLevel::Info,
586            show_headers: false,
587            show_body: true,
588            max_chars: Some(10),
589            log_mask: false,
590            show_stream_info: false,
591            group_size: None,
592        });
593
594        let long_body = "a".repeat(100);
595        let exchange = Exchange::new(Message::new(long_body));
596        let formatted = producer.format_exchange(&exchange, 1);
597
598        // Extract the body part from "[trunc] Body: ..."
599        let body_part = formatted.split_once("Body: ").unwrap().1;
600        assert!(
601            body_part.len() <= 10,
602            "expected body <= 10 chars, got {} chars: {body_part:?}",
603            body_part.len()
604        );
605    }
606
607    #[test]
608    fn test_log_no_truncation_when_max_chars_unset() {
609        let producer = LogProducer::new(LogConfig {
610            category: "notrunc".to_string(),
611            level: LogLevel::Info,
612            show_headers: false,
613            show_body: true,
614            max_chars: None,
615            log_mask: false,
616            show_stream_info: false,
617            group_size: None,
618        });
619
620        let long_body = "b".repeat(200);
621        let exchange = Exchange::new(Message::new(long_body));
622        let formatted = producer.format_exchange(&exchange, 1);
623
624        let body_part = formatted.split_once("Body: ").unwrap().1;
625        assert_eq!(body_part.len(), 200);
626    }
627
628    #[test]
629    fn test_log_config_max_chars_param() {
630        let config = LogConfig::from_uri("log:test?maxChars=50").unwrap();
631        assert_eq!(config.max_chars, Some(50));
632    }
633
634    #[test]
635    fn test_log_config_max_chars_default_unset() {
636        let config = LogConfig::from_uri("log:test").unwrap();
637        assert_eq!(config.max_chars, None);
638    }
639
640    // --- LOG-007: logMask tests ---
641
642    #[test]
643    fn test_log_config_log_mask_param() {
644        let config = LogConfig::from_uri("log:test?logMask=true").unwrap();
645        assert!(config.log_mask);
646    }
647
648    #[test]
649    fn test_log_config_log_mask_default_false() {
650        let config = LogConfig::from_uri("log:test").unwrap();
651        assert!(!config.log_mask);
652    }
653
654    #[test]
655    fn test_log_mask_redacts_sensitive_headers() {
656        let producer = LogProducer::new(LogConfig {
657            category: "cat".to_string(),
658            level: LogLevel::Info,
659            show_headers: true,
660            show_body: false,
661            max_chars: None,
662            log_mask: true,
663            show_stream_info: false,
664            group_size: None,
665        });
666
667        let mut exchange = Exchange::new(Message::new("body"));
668        exchange.input.set_header(
669            "X-Auth-Token",
670            serde_json::Value::String("secret123".into()),
671        );
672        exchange
673            .input
674            .set_header("password", serde_json::Value::String("hunter2".into()));
675        exchange
676            .input
677            .set_header("ApiKey", serde_json::Value::String("abc".into()));
678        exchange
679            .input
680            .set_header("normal-header", serde_json::Value::String("visible".into()));
681        exchange.input.set_header(
682            "user-credential",
683            serde_json::Value::String("sensitive".into()),
684        );
685        exchange
686            .input
687            .set_header("secret-value", serde_json::Value::String("hidden".into()));
688
689        let formatted = producer.format_exchange(&exchange, 1);
690        assert!(
691            formatted.contains("X-Auth-Token=[REDACTED]"),
692            "auth header must be redacted: {formatted}"
693        );
694        assert!(
695            formatted.contains("password=[REDACTED]"),
696            "password header must be redacted: {formatted}"
697        );
698        assert!(
699            formatted.contains("ApiKey=[REDACTED]"),
700            "key header must be redacted: {formatted}"
701        );
702        assert!(
703            formatted.contains("normal-header=\"visible\""),
704            "normal header must be visible: {formatted}"
705        );
706        assert!(
707            formatted.contains("user-credential=[REDACTED]"),
708            "credential header must be redacted: {formatted}"
709        );
710        assert!(
711            formatted.contains("secret-value=[REDACTED]"),
712            "secret header must be redacted: {formatted}"
713        );
714    }
715
716    #[test]
717    fn test_log_mask_redacts_body() {
718        let producer = LogProducer::new(LogConfig {
719            category: "cat".to_string(),
720            level: LogLevel::Info,
721            show_headers: false,
722            show_body: true,
723            max_chars: None,
724            log_mask: true,
725            show_stream_info: false,
726            group_size: None,
727        });
728
729        let exchange = Exchange::new(Message::new("sensitive body content"));
730        let formatted = producer.format_exchange(&exchange, 1);
731        assert!(
732            formatted.contains("[Body redacted by logMask]"),
733            "body must be redacted: {formatted}"
734        );
735        assert!(
736            !formatted.contains("sensitive body content"),
737            "body content must not appear: {formatted}"
738        );
739    }
740
741    #[test]
742    fn test_log_mask_off_shows_data() {
743        let producer = LogProducer::new(LogConfig {
744            category: "cat".to_string(),
745            level: LogLevel::Info,
746            show_headers: true,
747            show_body: true,
748            max_chars: None,
749            log_mask: false,
750            show_stream_info: false,
751            group_size: None,
752        });
753
754        let mut exchange = Exchange::new(Message::new("visible body"));
755        exchange
756            .input
757            .set_header("password", serde_json::Value::String("hunter2".into()));
758
759        let formatted = producer.format_exchange(&exchange, 1);
760        assert!(
761            formatted.contains("visible body"),
762            "body must be visible when mask off: {formatted}"
763        );
764        assert!(
765            formatted.contains("hunter2"),
766            "header value must be visible when mask off: {formatted}"
767        );
768    }
769
770    // --- LOG-002: showStreamInfo tests ---
771
772    #[test]
773    fn test_log_stream_show_info() {
774        // show_stream_info = false → just [Stream]
775        let producer_no_info = LogProducer::new(LogConfig {
776            category: "cat".to_string(),
777            level: LogLevel::Info,
778            show_headers: false,
779            show_body: true,
780            max_chars: None,
781            log_mask: false,
782            show_stream_info: false,
783            group_size: None,
784        });
785
786        let mut msg = Message::new("");
787        msg.body = Body::Stream(camel_component_api::StreamBody {
788            stream: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
789            metadata: camel_component_api::StreamMetadata {
790                origin: Some("file:///data/test.txt".to_string()),
791                ..Default::default()
792            },
793        });
794        let exchange = Exchange::new(msg);
795        let formatted = producer_no_info.format_exchange(&exchange, 1);
796        assert!(
797            formatted.contains("Body: [Stream]"),
798            "must show [Stream] when show_stream_info=false: {formatted}"
799        );
800
801        // show_stream_info = true → [Stream: origin=...]
802        let producer_with_info = LogProducer::new(LogConfig {
803            category: "cat".to_string(),
804            level: LogLevel::Info,
805            show_headers: false,
806            show_body: true,
807            max_chars: None,
808            log_mask: false,
809            show_stream_info: true,
810            group_size: None,
811        });
812
813        let formatted = producer_with_info.format_exchange(&exchange, 1);
814        assert!(
815            formatted.contains("Body: [Stream: origin=Some(\"file:///data/test.txt\")]"),
816            "must show origin when show_stream_info=true: {formatted}"
817        );
818    }
819
820    // --- LOG-008: groupSize tests ---
821
822    #[test]
823    fn test_log_group_size() {
824        let producer = LogProducer::new(LogConfig {
825            category: "cat".to_string(),
826            level: LogLevel::Info,
827            show_headers: false,
828            show_body: true,
829            max_chars: None,
830            log_mask: false,
831            show_stream_info: false,
832            group_size: Some(3),
833        });
834
835        // Count 1 and 2 should not trigger logging, count 3 should
836        let ex1 = Exchange::new(Message::new("first"));
837        let formatted1 = producer.format_exchange(&ex1, 3);
838        assert!(
839            formatted1.contains("Group of 3:"),
840            "group_size=3 must include count: {formatted1}"
841        );
842        assert!(
843            formatted1.contains("Body: first"),
844            "group log must include body: {formatted1}"
845        );
846    }
847
848    #[test]
849    fn test_log_config_group_size_param() {
850        let config = LogConfig::from_uri("log:test?groupSize=10").unwrap();
851        assert_eq!(config.group_size, Some(10));
852    }
853
854    #[test]
855    fn test_log_config_group_size_default_unset() {
856        let config = LogConfig::from_uri("log:test").unwrap();
857        assert_eq!(config.group_size, None);
858    }
859
860    #[test]
861    fn test_log_config_show_stream_info_param() {
862        let config = LogConfig::from_uri("log:test?showStreamInfo=true").unwrap();
863        assert!(config.show_stream_info);
864    }
865
866    #[test]
867    fn test_log_config_show_stream_info_default_false() {
868        let config = LogConfig::from_uri("log:test").unwrap();
869        assert!(!config.show_stream_info);
870    }
871}