Skip to main content

camel_component_log/
lib.rs

1//! Log component for rust-camel — routes exchange body and headers to the
2//! `tracing` subsystem at a configurable log level, primarily for debugging.
3//!
4//! Main types: `LogComponent`, `LogEndpoint`, `LogProducer`, `LogLevel`.
5
6use std::future::Future;
7use std::pin::Pin;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::task::{Context, Poll};
12
13use tower::Service;
14use tracing::{debug, error, info, trace, warn};
15
16use camel_component_api::UriConfig;
17use camel_component_api::parse_uri;
18use camel_component_api::{BoxProcessor, CamelError, Exchange};
19use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
20
21// ---------------------------------------------------------------------------
22// LogLevel
23// ---------------------------------------------------------------------------
24
25/// Log level for the log component.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
27pub enum LogLevel {
28    Trace,
29    Debug,
30    #[default]
31    Info,
32    Warn,
33    Error,
34}
35
36impl FromStr for LogLevel {
37    type Err = String;
38
39    fn from_str(s: &str) -> Result<Self, Self::Err> {
40        parse_log_level(s).map_err(|e| e.to_string())
41    }
42}
43
44fn parse_log_level(s: &str) -> Result<LogLevel, CamelError> {
45    match s.to_uppercase().as_str() {
46        "TRACE" => Ok(LogLevel::Trace),
47        "DEBUG" => Ok(LogLevel::Debug),
48        "INFO" => Ok(LogLevel::Info),
49        "WARN" | "WARNING" => Ok(LogLevel::Warn),
50        "ERROR" => Ok(LogLevel::Error),
51        _ => Err(CamelError::Config(format!(
52            "unknown log level: '{}'. Valid: TRACE, DEBUG, INFO, WARN, ERROR",
53            s
54        ))),
55    }
56}
57
58// ---------------------------------------------------------------------------
59// LogConfig
60// ---------------------------------------------------------------------------
61
62/// Configuration parsed from a log URI.
63///
64/// Format: `log:category?level=info&showHeaders=true&showBody=true&logMask=false&showStreamInfo=false&groupSize=10`
65#[derive(Debug, Clone)]
66pub struct LogConfig {
67    /// Log category (the path portion of the URI).
68    pub category: String,
69    /// Log level. Default: Info.
70    pub level: LogLevel,
71    /// Whether to include headers in the log output.
72    pub show_headers: bool,
73    /// Whether to include the body in the log output.
74    pub show_body: bool,
75    /// Maximum number of characters for the body in log output.
76    /// Bodies longer than this are truncated. `None` means no limit.
77    pub max_chars: Option<usize>,
78    /// When true, redact sensitive headers and body in log output.
79    /// Headers matching `/(?i)(password|secret|token|key|auth|credential)/` → `[REDACTED]`.
80    /// Body → `[Body redacted by logMask]`.
81    pub log_mask: bool,
82    /// When true, show stream origin metadata. When false (default), show `[Stream]`.
83    pub show_stream_info: bool,
84    /// When set, only emit a log every `n` exchanges (group logging).
85    /// The log message includes the exchange count.
86    pub group_size: Option<usize>,
87}
88
89impl UriConfig for LogConfig {
90    fn scheme() -> &'static str {
91        "log"
92    }
93
94    fn from_uri(uri: &str) -> Result<Self, CamelError> {
95        let parts = parse_uri(uri)?;
96        Self::from_components(parts)
97    }
98
99    fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
100        if parts.scheme != Self::scheme() {
101            return Err(CamelError::InvalidUri(format!(
102                "expected scheme '{}' but got '{}'",
103                Self::scheme(),
104                parts.scheme
105            )));
106        }
107
108        let level = match parts.params.get("level") {
109            Some(raw) => parse_log_level(raw)?,
110            None => LogLevel::Info,
111        };
112
113        let show_headers = match parts.params.get("showHeaders") {
114            Some(raw) => raw.parse::<bool>().map_err(|_| {
115                CamelError::InvalidUri(format!("invalid boolean value for showHeaders: {raw}"))
116            })?,
117            None => false,
118        };
119
120        let show_body = match parts.params.get("showBody") {
121            Some(raw) => raw.parse::<bool>().map_err(|_| {
122                CamelError::InvalidUri(format!("invalid boolean value for showBody: {raw}"))
123            })?,
124            None => true,
125        };
126
127        let max_chars = match parts.params.get("maxChars") {
128            Some(raw) => Some(raw.parse::<usize>().map_err(|_| {
129                CamelError::InvalidUri(format!("invalid integer value for maxChars: {raw}"))
130            })?),
131            None => None,
132        };
133
134        let log_mask = match parts.params.get("logMask") {
135            Some(raw) => raw.parse::<bool>().map_err(|_| {
136                CamelError::InvalidUri(format!("invalid boolean value for logMask: {raw}"))
137            })?,
138            None => false,
139        };
140
141        let show_stream_info = match parts.params.get("showStreamInfo") {
142            Some(raw) => raw.parse::<bool>().map_err(|_| {
143                CamelError::InvalidUri(format!("invalid boolean value for showStreamInfo: {raw}"))
144            })?,
145            None => false,
146        };
147
148        let group_size = match parts.params.get("groupSize") {
149            Some(raw) => Some(raw.parse::<usize>().map_err(|_| {
150                CamelError::InvalidUri(format!("invalid integer value for groupSize: {raw}"))
151            })?),
152            None => None,
153        };
154
155        Ok(Self {
156            category: parts.path,
157            level,
158            show_headers,
159            show_body,
160            max_chars,
161            log_mask,
162            show_stream_info,
163            group_size,
164        })
165    }
166}
167
168// ---------------------------------------------------------------------------
169// LogComponent
170// ---------------------------------------------------------------------------
171
172/// The Log component logs exchange information using `tracing`.
173pub struct LogComponent;
174
175impl LogComponent {
176    pub fn new() -> Self {
177        Self
178    }
179}
180
181impl Default for LogComponent {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
187impl Component for LogComponent {
188    fn scheme(&self) -> &str {
189        "log"
190    }
191
192    fn create_endpoint(
193        &self,
194        uri: &str,
195        _ctx: &dyn camel_component_api::ComponentContext,
196    ) -> Result<Box<dyn Endpoint>, CamelError> {
197        let config = LogConfig::from_uri(uri)?;
198        Ok(Box::new(LogEndpoint {
199            uri: uri.to_string(),
200            config,
201        }))
202    }
203}
204
205// ---------------------------------------------------------------------------
206// LogEndpoint
207// ---------------------------------------------------------------------------
208
209struct LogEndpoint {
210    uri: String,
211    config: LogConfig,
212}
213
214impl Endpoint for LogEndpoint {
215    fn uri(&self) -> &str {
216        &self.uri
217    }
218
219    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
220        Err(CamelError::EndpointCreationFailed(
221            "log endpoint does not support consumers".to_string(),
222        ))
223    }
224
225    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
226        Ok(BoxProcessor::new(LogProducer::new(self.config.clone())))
227    }
228}
229
230// ---------------------------------------------------------------------------
231// LogProducer
232// ---------------------------------------------------------------------------
233
234#[derive(Clone)]
235struct LogProducer {
236    config: LogConfig,
237    exchange_count: Arc<AtomicUsize>,
238}
239
240impl LogProducer {
241    fn new(config: LogConfig) -> Self {
242        Self {
243            config,
244            exchange_count: Arc::new(AtomicUsize::new(0)),
245        }
246    }
247
248    /// Returns true if the header key matches sensitive patterns.
249    fn is_sensitive_header(key: &str) -> bool {
250        let lower = key.to_lowercase();
251        let sensitive_keywords = [
252            "password",
253            "passwd",
254            "secret",
255            "token",
256            "apikey",
257            "api-key",
258            "api_key",
259            "authorization",
260            "auth",
261            "credential",
262            "private",
263            "signature",
264        ];
265        sensitive_keywords.iter().any(|kw| {
266            lower == *kw
267                || lower.ends_with(&format!("-{kw}"))
268                || lower.ends_with(&format!("_{kw}"))
269                || lower.starts_with(&format!("{kw}-"))
270                || lower.starts_with(&format!("{kw}_"))
271        })
272    }
273
274    fn format_exchange(&self, exchange: &Exchange, count: usize) -> String {
275        let mut parts = Vec::new();
276
277        if self.config.show_body {
278            let body_str = if self.config.log_mask {
279                "[Body redacted by logMask]".to_string()
280            } else {
281                match &exchange.input.body {
282                    camel_component_api::Body::Empty => "[empty]".to_string(),
283                    camel_component_api::Body::Text(s) => s.clone(),
284                    camel_component_api::Body::Json(v) => v.to_string(),
285                    camel_component_api::Body::Xml(s) => s.clone(),
286                    camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
287                    camel_component_api::Body::Stream(s) => {
288                        if self.config.show_stream_info {
289                            format!("[Stream: origin={:?}]", s.metadata.origin)
290                        } else {
291                            "[Stream]".to_string()
292                        }
293                    }
294                }
295            };
296
297            let mut body_str = body_str;
298            if let Some(limit) = self.config.max_chars
299                && body_str.len() > limit
300            {
301                body_str.truncate(limit);
302            }
303
304            parts.push(format!("Body: {body_str}"));
305        }
306
307        if self.config.show_headers && !exchange.input.headers.is_empty() {
308            let headers: Vec<String> = exchange
309                .input
310                .headers
311                .iter()
312                .map(|(k, v)| {
313                    if self.config.log_mask && Self::is_sensitive_header(k) {
314                        format!("{k}=[REDACTED]")
315                    } else {
316                        format!("{k}={v}")
317                    }
318                })
319                .collect();
320            parts.push(format!("Headers: {{{}}}", headers.join(", ")));
321        }
322
323        if parts.is_empty() {
324            format!("[{}] Exchange received", self.config.category)
325        } else if self.config.group_size.is_some() {
326            format!(
327                "[{}] Group of {count}: {}",
328                self.config.category,
329                parts.join(" | ")
330            )
331        } else {
332            format!("[{}] {}", self.config.category, parts.join(" | "))
333        }
334    }
335}
336
337impl Service<Exchange> for LogProducer {
338    type Response = Exchange;
339    type Error = CamelError;
340    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
341
342    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
343        Poll::Ready(Ok(()))
344    }
345
346    fn call(&mut self, exchange: Exchange) -> Self::Future {
347        let count = self.exchange_count.fetch_add(1, Ordering::Relaxed) + 1;
348
349        // Group logging: only emit every group_size exchanges
350        if let Some(group_size) = self.config.group_size
351            && !count.is_multiple_of(group_size)
352        {
353            return Box::pin(async move { Ok(exchange) });
354        }
355
356        let msg = self.format_exchange(&exchange, count);
357        let level = self.config.level;
358
359        Box::pin(async move {
360            match level {
361                LogLevel::Trace => trace!("{msg}"),
362                LogLevel::Debug => debug!("{msg}"),
363                LogLevel::Info => info!("{msg}"),
364                LogLevel::Warn => warn!("{msg}"),
365                LogLevel::Error => error!("{msg}"),
366            }
367
368            Ok(exchange)
369        })
370    }
371}
372
373// ---------------------------------------------------------------------------
374// Tests
375// ---------------------------------------------------------------------------
376
377#[cfg(test)]
378mod tests {
379    use super::*;
380    use camel_component_api::Body;
381    use camel_component_api::Message;
382    use camel_component_api::NoOpComponentContext;
383    use tower::ServiceExt;
384
385    fn test_producer_ctx() -> ProducerContext {
386        ProducerContext::new()
387    }
388
389    #[test]
390    fn test_log_config_defaults() {
391        let config = LogConfig::from_uri("log:myCategory").unwrap();
392        assert_eq!(config.category, "myCategory");
393        assert_eq!(config.level, LogLevel::Info);
394        assert!(!config.show_headers);
395        assert!(config.show_body);
396    }
397
398    #[test]
399    fn test_log_config_with_params() {
400        let config =
401            LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();
402        assert_eq!(config.category, "app");
403        assert_eq!(config.level, LogLevel::Debug);
404        assert!(config.show_headers);
405        assert!(!config.show_body);
406    }
407
408    #[test]
409    fn test_log_config_wrong_scheme() {
410        let result = LogConfig::from_uri("timer:tick");
411        assert!(result.is_err());
412    }
413
414    #[test]
415    fn test_log_component_scheme() {
416        let component = LogComponent::new();
417        assert_eq!(component.scheme(), "log");
418    }
419
420    #[test]
421    fn test_log_component_default() {
422        let component = LogComponent;
423        assert_eq!(component.scheme(), "log");
424    }
425
426    #[test]
427    fn test_log_level_from_str_variants() {
428        assert_eq!("trace".parse::<LogLevel>().unwrap(), LogLevel::Trace);
429        assert_eq!("DEBUG".parse::<LogLevel>().unwrap(), LogLevel::Debug);
430        assert_eq!("Info".parse::<LogLevel>().unwrap(), LogLevel::Info);
431        assert_eq!("warning".parse::<LogLevel>().unwrap(), LogLevel::Warn);
432        assert_eq!("error".parse::<LogLevel>().unwrap(), LogLevel::Error);
433    }
434
435    #[test]
436    fn test_log_level_from_str_invalid() {
437        let err = "nope".parse::<LogLevel>().unwrap_err();
438        assert_eq!(
439            err,
440            "Configuration error: unknown log level: 'nope'. Valid: TRACE, DEBUG, INFO, WARN, ERROR"
441        );
442    }
443
444    #[test]
445    fn test_log_config_invalid_level_rejected() {
446        let err = LogConfig::from_uri("log:test?level=invalid").unwrap_err();
447        assert!(
448            err.to_string()
449                .contains("unknown log level: 'invalid'. Valid: TRACE, DEBUG, INFO, WARN, ERROR")
450        );
451    }
452
453    #[test]
454    fn test_valid_log_levels_accepted() {
455        assert!(parse_log_level("DEBUG").is_ok());
456        assert!(parse_log_level("info").is_ok());
457        assert!(parse_log_level("WARN").is_ok());
458        assert!(parse_log_level("WARNING").is_ok());
459    }
460
461    #[test]
462    fn test_invalid_log_level_rejected() {
463        assert!(parse_log_level("VERBOSE").is_err());
464        assert!(parse_log_level("").is_err());
465        assert!(parse_log_level("log").is_err());
466    }
467
468    #[test]
469    fn test_log_endpoint_uri() {
470        let component = LogComponent::new();
471        let endpoint = component
472            .create_endpoint("log:uri-check", &NoOpComponentContext)
473            .unwrap();
474        assert_eq!(endpoint.uri(), "log:uri-check");
475    }
476
477    #[test]
478    fn test_log_endpoint_no_consumer() {
479        let component = LogComponent::new();
480        let endpoint = component
481            .create_endpoint("log:info", &NoOpComponentContext)
482            .unwrap();
483        assert!(endpoint.create_consumer().is_err());
484    }
485
486    #[test]
487    fn test_log_endpoint_creates_producer() {
488        let ctx = test_producer_ctx();
489        let component = LogComponent::new();
490        let endpoint = component
491            .create_endpoint("log:info", &NoOpComponentContext)
492            .unwrap();
493        assert!(endpoint.create_producer(&ctx).is_ok());
494    }
495
496    #[tokio::test]
497    async fn test_log_producer_processes_exchange() {
498        let ctx = test_producer_ctx();
499        let component = LogComponent::new();
500        let endpoint = component
501            .create_endpoint("log:test?showHeaders=true", &NoOpComponentContext)
502            .unwrap();
503        let producer = endpoint.create_producer(&ctx).unwrap();
504
505        let mut exchange = Exchange::new(Message::new("hello world"));
506        exchange
507            .input
508            .set_header("source", serde_json::Value::String("test".into()));
509
510        let result = producer.oneshot(exchange).await.unwrap();
511        // Log producer passes exchange through unchanged
512        assert_eq!(result.input.body.as_text(), Some("hello world"));
513    }
514
515    #[test]
516    fn test_format_exchange_without_body_or_headers() {
517        let producer = LogProducer::new(LogConfig {
518            category: "cat".to_string(),
519            level: LogLevel::Info,
520            show_headers: false,
521            show_body: false,
522            max_chars: None,
523            log_mask: false,
524            show_stream_info: false,
525            group_size: None,
526        });
527        let exchange = Exchange::new(Message::new("ignored"));
528        let formatted = producer.format_exchange(&exchange, 1);
529        assert_eq!(formatted, "[cat] Exchange received");
530    }
531
532    #[test]
533    fn test_format_exchange_body_variants() {
534        let base = LogProducer::new(LogConfig {
535            category: "cat".to_string(),
536            level: LogLevel::Info,
537            show_headers: false,
538            show_body: true,
539            max_chars: None,
540            log_mask: false,
541            show_stream_info: true,
542            group_size: None,
543        });
544
545        let empty = Exchange::new(Message::default());
546        assert!(base.format_exchange(&empty, 1).contains("Body: [empty]"));
547
548        let mut json_msg = Message::new("");
549        json_msg.body = Body::Json(serde_json::json!({"k":"v"}));
550        let json_ex = Exchange::new(json_msg);
551        assert!(
552            base.format_exchange(&json_ex, 2)
553                .contains("Body: {\"k\":\"v\"}")
554        );
555
556        let mut xml_msg = Message::new("");
557        xml_msg.body = Body::Xml("<a/>".to_string());
558        let xml_ex = Exchange::new(xml_msg);
559        assert!(base.format_exchange(&xml_ex, 3).contains("Body: <a/>"));
560
561        let mut bytes_msg = Message::new("");
562        bytes_msg.body = Body::Bytes(b"abc".to_vec().into());
563        let bytes_ex = Exchange::new(bytes_msg);
564        assert!(
565            base.format_exchange(&bytes_ex, 4)
566                .contains("Body: [3 bytes]")
567        );
568    }
569
570    #[test]
571    fn test_log_truncates_large_body() {
572        let producer = LogProducer::new(LogConfig {
573            category: "trunc".to_string(),
574            level: LogLevel::Info,
575            show_headers: false,
576            show_body: true,
577            max_chars: Some(10),
578            log_mask: false,
579            show_stream_info: false,
580            group_size: None,
581        });
582
583        let long_body = "a".repeat(100);
584        let exchange = Exchange::new(Message::new(long_body));
585        let formatted = producer.format_exchange(&exchange, 1);
586
587        // Extract the body part from "[trunc] Body: ..."
588        let body_part = formatted.split_once("Body: ").unwrap().1;
589        assert!(
590            body_part.len() <= 10,
591            "expected body <= 10 chars, got {} chars: {body_part:?}",
592            body_part.len()
593        );
594    }
595
596    #[test]
597    fn test_log_no_truncation_when_max_chars_unset() {
598        let producer = LogProducer::new(LogConfig {
599            category: "notrunc".to_string(),
600            level: LogLevel::Info,
601            show_headers: false,
602            show_body: true,
603            max_chars: None,
604            log_mask: false,
605            show_stream_info: false,
606            group_size: None,
607        });
608
609        let long_body = "b".repeat(200);
610        let exchange = Exchange::new(Message::new(long_body));
611        let formatted = producer.format_exchange(&exchange, 1);
612
613        let body_part = formatted.split_once("Body: ").unwrap().1;
614        assert_eq!(body_part.len(), 200);
615    }
616
617    #[test]
618    fn test_log_config_max_chars_param() {
619        let config = LogConfig::from_uri("log:test?maxChars=50").unwrap();
620        assert_eq!(config.max_chars, Some(50));
621    }
622
623    #[test]
624    fn test_log_config_max_chars_default_unset() {
625        let config = LogConfig::from_uri("log:test").unwrap();
626        assert_eq!(config.max_chars, None);
627    }
628
629    // --- LOG-007: logMask tests ---
630
631    #[test]
632    fn test_log_config_log_mask_param() {
633        let config = LogConfig::from_uri("log:test?logMask=true").unwrap();
634        assert!(config.log_mask);
635    }
636
637    #[test]
638    fn test_log_config_log_mask_default_false() {
639        let config = LogConfig::from_uri("log:test").unwrap();
640        assert!(!config.log_mask);
641    }
642
643    #[test]
644    fn test_log_mask_redacts_sensitive_headers() {
645        let producer = LogProducer::new(LogConfig {
646            category: "cat".to_string(),
647            level: LogLevel::Info,
648            show_headers: true,
649            show_body: false,
650            max_chars: None,
651            log_mask: true,
652            show_stream_info: false,
653            group_size: None,
654        });
655
656        let mut exchange = Exchange::new(Message::new("body"));
657        exchange.input.set_header(
658            "X-Auth-Token",
659            serde_json::Value::String("secret123".into()),
660        );
661        exchange
662            .input
663            .set_header("password", serde_json::Value::String("hunter2".into()));
664        exchange
665            .input
666            .set_header("ApiKey", serde_json::Value::String("abc".into()));
667        exchange
668            .input
669            .set_header("normal-header", serde_json::Value::String("visible".into()));
670        exchange.input.set_header(
671            "user-credential",
672            serde_json::Value::String("sensitive".into()),
673        );
674        exchange
675            .input
676            .set_header("secret-value", serde_json::Value::String("hidden".into()));
677
678        let formatted = producer.format_exchange(&exchange, 1);
679        assert!(
680            formatted.contains("X-Auth-Token=[REDACTED]"),
681            "auth header must be redacted: {formatted}"
682        );
683        assert!(
684            formatted.contains("password=[REDACTED]"),
685            "password header must be redacted: {formatted}"
686        );
687        assert!(
688            formatted.contains("ApiKey=[REDACTED]"),
689            "key header must be redacted: {formatted}"
690        );
691        assert!(
692            formatted.contains("normal-header=\"visible\""),
693            "normal header must be visible: {formatted}"
694        );
695        assert!(
696            formatted.contains("user-credential=[REDACTED]"),
697            "credential header must be redacted: {formatted}"
698        );
699        assert!(
700            formatted.contains("secret-value=[REDACTED]"),
701            "secret header must be redacted: {formatted}"
702        );
703    }
704
705    #[test]
706    fn test_log_mask_redacts_body() {
707        let producer = LogProducer::new(LogConfig {
708            category: "cat".to_string(),
709            level: LogLevel::Info,
710            show_headers: false,
711            show_body: true,
712            max_chars: None,
713            log_mask: true,
714            show_stream_info: false,
715            group_size: None,
716        });
717
718        let exchange = Exchange::new(Message::new("sensitive body content"));
719        let formatted = producer.format_exchange(&exchange, 1);
720        assert!(
721            formatted.contains("[Body redacted by logMask]"),
722            "body must be redacted: {formatted}"
723        );
724        assert!(
725            !formatted.contains("sensitive body content"),
726            "body content must not appear: {formatted}"
727        );
728    }
729
730    #[test]
731    fn test_log_mask_off_shows_data() {
732        let producer = LogProducer::new(LogConfig {
733            category: "cat".to_string(),
734            level: LogLevel::Info,
735            show_headers: true,
736            show_body: true,
737            max_chars: None,
738            log_mask: false,
739            show_stream_info: false,
740            group_size: None,
741        });
742
743        let mut exchange = Exchange::new(Message::new("visible body"));
744        exchange
745            .input
746            .set_header("password", serde_json::Value::String("hunter2".into()));
747
748        let formatted = producer.format_exchange(&exchange, 1);
749        assert!(
750            formatted.contains("visible body"),
751            "body must be visible when mask off: {formatted}"
752        );
753        assert!(
754            formatted.contains("hunter2"),
755            "header value must be visible when mask off: {formatted}"
756        );
757    }
758
759    // --- LOG-002: showStreamInfo tests ---
760
761    #[test]
762    fn test_log_stream_show_info() {
763        // show_stream_info = false → just [Stream]
764        let producer_no_info = LogProducer::new(LogConfig {
765            category: "cat".to_string(),
766            level: LogLevel::Info,
767            show_headers: false,
768            show_body: true,
769            max_chars: None,
770            log_mask: false,
771            show_stream_info: false,
772            group_size: None,
773        });
774
775        let mut msg = Message::new("");
776        msg.body = Body::Stream(camel_component_api::StreamBody {
777            stream: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
778            metadata: camel_component_api::StreamMetadata {
779                origin: Some("file:///data/test.txt".to_string()),
780                ..Default::default()
781            },
782        });
783        let exchange = Exchange::new(msg);
784        let formatted = producer_no_info.format_exchange(&exchange, 1);
785        assert!(
786            formatted.contains("Body: [Stream]"),
787            "must show [Stream] when show_stream_info=false: {formatted}"
788        );
789
790        // show_stream_info = true → [Stream: origin=...]
791        let producer_with_info = LogProducer::new(LogConfig {
792            category: "cat".to_string(),
793            level: LogLevel::Info,
794            show_headers: false,
795            show_body: true,
796            max_chars: None,
797            log_mask: false,
798            show_stream_info: true,
799            group_size: None,
800        });
801
802        let formatted = producer_with_info.format_exchange(&exchange, 1);
803        assert!(
804            formatted.contains("Body: [Stream: origin=Some(\"file:///data/test.txt\")]"),
805            "must show origin when show_stream_info=true: {formatted}"
806        );
807    }
808
809    // --- LOG-008: groupSize tests ---
810
811    #[test]
812    fn test_log_group_size() {
813        let producer = LogProducer::new(LogConfig {
814            category: "cat".to_string(),
815            level: LogLevel::Info,
816            show_headers: false,
817            show_body: true,
818            max_chars: None,
819            log_mask: false,
820            show_stream_info: false,
821            group_size: Some(3),
822        });
823
824        // Count 1 and 2 should not trigger logging, count 3 should
825        let ex1 = Exchange::new(Message::new("first"));
826        let formatted1 = producer.format_exchange(&ex1, 3);
827        assert!(
828            formatted1.contains("Group of 3:"),
829            "group_size=3 must include count: {formatted1}"
830        );
831        assert!(
832            formatted1.contains("Body: first"),
833            "group log must include body: {formatted1}"
834        );
835    }
836
837    #[test]
838    fn test_log_config_group_size_param() {
839        let config = LogConfig::from_uri("log:test?groupSize=10").unwrap();
840        assert_eq!(config.group_size, Some(10));
841    }
842
843    #[test]
844    fn test_log_config_group_size_default_unset() {
845        let config = LogConfig::from_uri("log:test").unwrap();
846        assert_eq!(config.group_size, None);
847    }
848
849    #[test]
850    fn test_log_config_show_stream_info_param() {
851        let config = LogConfig::from_uri("log:test?showStreamInfo=true").unwrap();
852        assert!(config.show_stream_info);
853    }
854
855    #[test]
856    fn test_log_config_show_stream_info_default_false() {
857        let config = LogConfig::from_uri("log:test").unwrap();
858        assert!(!config.show_stream_info);
859    }
860}