Skip to main content

dataflow_rs/engine/functions/
log.rs

1use crate::engine::error::Result;
2use crate::engine::message::{Change, Message};
3use datalogic_rs::{CompiledLogic, DataLogic};
4use log::{debug, error, info, trace, warn};
5use serde::Deserialize;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::Arc;
9
10/// Log levels supported by the log function
11#[derive(Debug, Clone, Default, Deserialize)]
12#[serde(rename_all = "lowercase")]
13pub enum LogLevel {
14    Trace,
15    Debug,
16    #[default]
17    Info,
18    Warn,
19    Error,
20}
21
22/// Configuration for the log function.
23///
24/// The message and field expressions are pre-compiled at startup.
25#[derive(Debug, Clone, Deserialize)]
26pub struct LogConfig {
27    /// Log level to emit at
28    #[serde(default)]
29    pub level: LogLevel,
30
31    /// JSONLogic expression to produce the log message string
32    pub message: Value,
33
34    /// Additional structured fields: each value is a JSONLogic expression
35    #[serde(default)]
36    pub fields: HashMap<String, Value>,
37
38    /// Cache index for the compiled message expression
39    #[serde(skip)]
40    pub message_index: Option<usize>,
41
42    /// Cache indices for the compiled field expressions
43    #[serde(skip)]
44    pub field_indices: Vec<(String, Option<usize>)>,
45}
46
47impl LogConfig {
48    /// Execute the log function. Always returns Ok((200, [])).
49    pub fn execute(
50        &self,
51        message: &mut Message,
52        datalogic: &Arc<DataLogic>,
53        logic_cache: &[Arc<CompiledLogic>],
54    ) -> Result<(usize, Vec<Change>)> {
55        let context_arc = message.get_context_arc();
56
57        // Evaluate message expression
58        let log_message = match self.message_index {
59            Some(idx) if idx < logic_cache.len() => {
60                match datalogic.evaluate(&logic_cache[idx], Arc::clone(&context_arc)) {
61                    Ok(Value::String(s)) => s,
62                    Ok(other) => other.to_string(),
63                    Err(e) => {
64                        error!("Log: Failed to evaluate message expression: {:?}", e);
65                        "<message eval error>".to_string()
66                    }
67                }
68            }
69            _ => "<uncompiled message>".to_string(),
70        };
71
72        // Evaluate field expressions
73        let mut field_parts = Vec::new();
74        for (key, idx_opt) in &self.field_indices {
75            let val = match idx_opt {
76                Some(idx) if *idx < logic_cache.len() => {
77                    match datalogic.evaluate(&logic_cache[*idx], Arc::clone(&context_arc)) {
78                        Ok(Value::String(s)) => s,
79                        Ok(v) => v.to_string(),
80                        Err(_) => "<eval error>".to_string(),
81                    }
82                }
83                _ => "<uncompiled>".to_string(),
84            };
85            field_parts.push(format!("{}={}", key, val));
86        }
87
88        let full_message = if field_parts.is_empty() {
89            log_message
90        } else {
91            format!("{} [{}]", log_message, field_parts.join(", "))
92        };
93
94        match self.level {
95            LogLevel::Trace => trace!(target: "dataflow::log", "{}", full_message),
96            LogLevel::Debug => debug!(target: "dataflow::log", "{}", full_message),
97            LogLevel::Info => info!(target: "dataflow::log", "{}", full_message),
98            LogLevel::Warn => warn!(target: "dataflow::log", "{}", full_message),
99            LogLevel::Error => error!(target: "dataflow::log", "{}", full_message),
100        }
101
102        // Log function never modifies message, never fails
103        Ok((200, vec![]))
104    }
105}