Skip to main content

dataflow_rs/engine/functions/
log.rs

1use crate::engine::error::Result;
2use crate::engine::executor::{ArenaContext, with_arena};
3use crate::engine::message::{Change, Message};
4use crate::engine::task_outcome::TaskOutcome;
5use datalogic_rs::{Engine, Logic};
6use datavalue::DataValue;
7use log::{debug, error, info, trace, warn};
8use serde::Deserialize;
9use serde_json::Value;
10use std::collections::HashMap;
11use std::sync::Arc;
12
13/// Log levels supported by the log function
14#[derive(Debug, Clone, Default, Deserialize)]
15#[serde(rename_all = "lowercase")]
16pub enum LogLevel {
17    Trace,
18    Debug,
19    #[default]
20    Info,
21    Warn,
22    Error,
23}
24
/// Configuration for the log function.
///
/// `level`, `message` and `fields` are deserialized. The two `compiled_*`
/// members are marked `#[serde(skip)]`, so after deserialization they hold
/// their default values (`None` / an empty `Vec`) until something fills
/// them in.
///
/// The message and field expressions are pre-compiled at startup.
/// Execution reads only the compiled forms: a missing compiled message is
/// rendered as `<uncompiled message>`, a `compiled_fields` entry holding
/// `None` is rendered as `<uncompiled>`, and a key that is present in
/// `fields` but absent from `compiled_fields` is not logged at all.
#[derive(Debug, Clone, Deserialize)]
pub struct LogConfig {
    /// Log level to emit at. Falls back to `LogLevel::default()` when the
    /// key is absent from the input.
    #[serde(default)]
    pub level: LogLevel,

    /// JSONLogic expression to produce the log message string.
    /// This member is required: it carries no `#[serde(default)]`.
    pub message: Value,

    /// Additional structured fields: each value is a JSONLogic expression.
    /// Defaults to an empty map when the key is absent from the input.
    #[serde(default)]
    pub fields: HashMap<String, Value>,

    /// Pre-compiled `message` JSONLogic, populated by `LogicCompiler`.
    /// Never deserialized; `None` until populated.
    #[serde(skip)]
    pub compiled_message: Option<Arc<Logic>>,

    /// Pre-compiled JSONLogic for each `fields` entry, populated by
    /// `LogicCompiler`. The inner `Option` is `None` for fields whose logic
    /// failed to compile (logged at engine construction).
    ///
    /// Stored as `(field name, compiled logic)` pairs; the log output lists
    /// the fields in this vector's order. Never deserialized; empty until
    /// populated.
    #[serde(skip)]
    pub compiled_fields: Vec<(String, Option<Arc<Logic>>)>,
}
51
52impl LogConfig {
53    /// Execute the log function, opening a fresh thread-local arena scope.
54    ///
55    /// Use this entry point when calling `LogConfig` outside an existing
56    /// `with_arena` scope (direct API users, tests). Inside a workflow sync
57    /// stretch the dispatch goes through [`Self::execute_in_arena`] to reuse
58    /// the cached `ArenaContext` and avoid a redundant `to_arena` walk.
59    pub fn execute(
60        &self,
61        message: &mut Message,
62        engine: &Arc<Engine>,
63    ) -> Result<(TaskOutcome, Vec<Change>)> {
64        with_arena(|arena| {
65            let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
66            self.execute_in_arena(message, &mut arena_ctx, engine)
67        })
68    }
69
70    /// Execute against an externally-provided `ArenaContext` so the cached
71    /// arena form of `message.context` (built once at the top of the workflow
72    /// sync stretch) is reused across every JSONLogic eval performed here.
73    pub(crate) fn execute_in_arena(
74        &self,
75        _message: &mut Message,
76        arena_ctx: &mut ArenaContext<'_>,
77        engine: &Arc<Engine>,
78    ) -> Result<(TaskOutcome, Vec<Change>)> {
79        let arena = arena_ctx.arena();
80        let ctx_av = arena_ctx.as_data_value();
81
82        // Stringify a single eval result. For the common String case we copy
83        // the str directly; otherwise the JSON emitter writes straight from
84        // `&DataValue<'_>` without an intermediate `to_owned()` deep clone.
85        let stringify = |compiled: &Logic| -> String {
86            match engine.evaluate(compiled, ctx_av, arena) {
87                Ok(DataValue::String(s)) => (*s).to_string(),
88                Ok(other) => other.to_string(),
89                Err(e) => {
90                    error!("Log: Failed to evaluate expression: {:?}", e);
91                    "<eval error>".to_string()
92                }
93            }
94        };
95
96        let log_message = match &self.compiled_message {
97            Some(compiled) => stringify(compiled),
98            None => "<uncompiled message>".to_string(),
99        };
100
101        let mut field_parts = Vec::with_capacity(self.compiled_fields.len());
102        for (key, compiled_opt) in &self.compiled_fields {
103            let val = match compiled_opt {
104                Some(compiled) => stringify(compiled),
105                None => "<uncompiled>".to_string(),
106            };
107            field_parts.push(format!("{}={}", key, val));
108        }
109
110        let full_message = if field_parts.is_empty() {
111            log_message
112        } else {
113            format!("{} [{}]", log_message, field_parts.join(", "))
114        };
115
116        match self.level {
117            LogLevel::Trace => trace!(target: "dataflow::log", "{}", full_message),
118            LogLevel::Debug => debug!(target: "dataflow::log", "{}", full_message),
119            LogLevel::Info => info!(target: "dataflow::log", "{}", full_message),
120            LogLevel::Warn => warn!(target: "dataflow::log", "{}", full_message),
121            LogLevel::Error => error!(target: "dataflow::log", "{}", full_message),
122        }
123
124        // Log function never modifies message, never fails
125        Ok((TaskOutcome::Success, vec![]))
126    }
127}