dataflow_rs/engine/functions/
log.rs1use crate::engine::error::Result;
2use crate::engine::executor::{ArenaContext, with_arena};
3use crate::engine::message::{Change, Message};
4use crate::engine::task_outcome::TaskOutcome;
5use datalogic_rs::{Engine, Logic};
6use datavalue::DataValue;
7use log::{debug, error, info, trace, warn};
8use serde::Deserialize;
9use serde_json::Value;
10use std::collections::HashMap;
11use std::sync::Arc;
12
13#[derive(Debug, Clone, Default, Deserialize)]
15#[serde(rename_all = "lowercase")]
16pub enum LogLevel {
17 Trace,
18 Debug,
19 #[default]
20 Info,
21 Warn,
22 Error,
23}
24
25#[derive(Debug, Clone, Deserialize)]
29pub struct LogConfig {
30 #[serde(default)]
32 pub level: LogLevel,
33
34 pub message: Value,
36
37 #[serde(default)]
39 pub fields: HashMap<String, Value>,
40
41 #[serde(skip)]
43 pub compiled_message: Option<Arc<Logic>>,
44
45 #[serde(skip)]
49 pub compiled_fields: Vec<(String, Option<Arc<Logic>>)>,
50}
51
52impl LogConfig {
53 pub fn execute(
60 &self,
61 message: &mut Message,
62 engine: &Arc<Engine>,
63 ) -> Result<(TaskOutcome, Vec<Change>)> {
64 with_arena(|arena| {
65 let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
66 self.execute_in_arena(message, &mut arena_ctx, engine)
67 })
68 }
69
70 pub(crate) fn execute_in_arena(
74 &self,
75 _message: &mut Message,
76 arena_ctx: &mut ArenaContext<'_>,
77 engine: &Arc<Engine>,
78 ) -> Result<(TaskOutcome, Vec<Change>)> {
79 let arena = arena_ctx.arena();
80 let ctx_av = arena_ctx.as_data_value();
81
82 let stringify = |compiled: &Logic| -> String {
86 match engine.evaluate(compiled, ctx_av, arena) {
87 Ok(DataValue::String(s)) => (*s).to_string(),
88 Ok(other) => other.to_string(),
89 Err(e) => {
90 error!("Log: Failed to evaluate expression: {:?}", e);
91 "<eval error>".to_string()
92 }
93 }
94 };
95
96 let log_message = match &self.compiled_message {
97 Some(compiled) => stringify(compiled),
98 None => "<uncompiled message>".to_string(),
99 };
100
101 let mut field_parts = Vec::with_capacity(self.compiled_fields.len());
102 for (key, compiled_opt) in &self.compiled_fields {
103 let val = match compiled_opt {
104 Some(compiled) => stringify(compiled),
105 None => "<uncompiled>".to_string(),
106 };
107 field_parts.push(format!("{}={}", key, val));
108 }
109
110 let full_message = if field_parts.is_empty() {
111 log_message
112 } else {
113 format!("{} [{}]", log_message, field_parts.join(", "))
114 };
115
116 match self.level {
117 LogLevel::Trace => trace!(target: "dataflow::log", "{}", full_message),
118 LogLevel::Debug => debug!(target: "dataflow::log", "{}", full_message),
119 LogLevel::Info => info!(target: "dataflow::log", "{}", full_message),
120 LogLevel::Warn => warn!(target: "dataflow::log", "{}", full_message),
121 LogLevel::Error => error!(target: "dataflow::log", "{}", full_message),
122 }
123
124 Ok((TaskOutcome::Success, vec![]))
126 }
127}