dataflow_rs/engine/functions/
log.rs1use crate::engine::error::Result;
2use crate::engine::message::{Change, Message};
3use datalogic_rs::{CompiledLogic, DataLogic};
4use log::{debug, error, info, trace, warn};
5use serde::Deserialize;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::Arc;
9
10#[derive(Debug, Clone, Default, Deserialize)]
12#[serde(rename_all = "lowercase")]
13pub enum LogLevel {
14 Trace,
15 Debug,
16 #[default]
17 Info,
18 Warn,
19 Error,
20}
21
22#[derive(Debug, Clone, Deserialize)]
26pub struct LogConfig {
27 #[serde(default)]
29 pub level: LogLevel,
30
31 pub message: Value,
33
34 #[serde(default)]
36 pub fields: HashMap<String, Value>,
37
38 #[serde(skip)]
40 pub message_index: Option<usize>,
41
42 #[serde(skip)]
44 pub field_indices: Vec<(String, Option<usize>)>,
45}
46
47impl LogConfig {
48 pub fn execute(
50 &self,
51 message: &mut Message,
52 datalogic: &Arc<DataLogic>,
53 logic_cache: &[Arc<CompiledLogic>],
54 ) -> Result<(usize, Vec<Change>)> {
55 let context_arc = message.get_context_arc();
56
57 let log_message = match self.message_index {
59 Some(idx) if idx < logic_cache.len() => {
60 match datalogic.evaluate(&logic_cache[idx], Arc::clone(&context_arc)) {
61 Ok(Value::String(s)) => s,
62 Ok(other) => other.to_string(),
63 Err(e) => {
64 error!("Log: Failed to evaluate message expression: {:?}", e);
65 "<message eval error>".to_string()
66 }
67 }
68 }
69 _ => "<uncompiled message>".to_string(),
70 };
71
72 let mut field_parts = Vec::new();
74 for (key, idx_opt) in &self.field_indices {
75 let val = match idx_opt {
76 Some(idx) if *idx < logic_cache.len() => {
77 match datalogic.evaluate(&logic_cache[*idx], Arc::clone(&context_arc)) {
78 Ok(Value::String(s)) => s,
79 Ok(v) => v.to_string(),
80 Err(_) => "<eval error>".to_string(),
81 }
82 }
83 _ => "<uncompiled>".to_string(),
84 };
85 field_parts.push(format!("{}={}", key, val));
86 }
87
88 let full_message = if field_parts.is_empty() {
89 log_message
90 } else {
91 format!("{} [{}]", log_message, field_parts.join(", "))
92 };
93
94 match self.level {
95 LogLevel::Trace => trace!(target: "dataflow::log", "{}", full_message),
96 LogLevel::Debug => debug!(target: "dataflow::log", "{}", full_message),
97 LogLevel::Info => info!(target: "dataflow::log", "{}", full_message),
98 LogLevel::Warn => warn!(target: "dataflow::log", "{}", full_message),
99 LogLevel::Error => error!(target: "dataflow::log", "{}", full_message),
100 }
101
102 Ok((200, vec![]))
104 }
105}