use super::{PipelineContext, ScriptResult, ScriptStage};
use crate::config::TimestampFilterConfig;
use crate::engine::RhaiEngine;
use crate::event::Event;
use anyhow::Result;
pub struct FilterStage {
compiled_filter: crate::engine::CompiledExpression,
}
impl FilterStage {
pub fn new(filter: String, engine: &mut RhaiEngine) -> Result<Self> {
let compiled_filter = engine.compile_filter(&filter)?;
Ok(Self { compiled_filter })
}
}
impl ScriptStage for FilterStage {
fn apply(&mut self, event: Event, ctx: &mut PipelineContext) -> ScriptResult {
let result = if ctx.window.is_empty() {
ctx.rhai
.execute_compiled_filter(&self.compiled_filter, &event, &mut ctx.tracker)
} else {
ctx.rhai.execute_compiled_filter_with_window(
&self.compiled_filter,
&event,
&ctx.window,
&mut ctx.tracker,
)
};
match result {
Ok(result) => {
if result {
ScriptResult::Emit(event)
} else {
ScriptResult::Skip
}
}
Err(e) => {
crate::rhai_functions::tracking::track_error(
"rhai",
ctx.meta.line_number,
&format!("Filter error: {}", e),
ctx.config.verbose,
ctx.config.quiet,
Some(&ctx.config),
);
if ctx.config.strict {
ScriptResult::Error(format!("Filter error: {}", e))
} else {
ScriptResult::Skip
}
}
}
}
}
pub struct ExecStage {
compiled_exec: crate::engine::CompiledExpression,
}
impl ExecStage {
pub fn new(exec: String, engine: &mut RhaiEngine) -> Result<Self> {
let compiled_exec = engine.compile_exec(&exec)?;
Ok(Self { compiled_exec })
}
}
impl ScriptStage for ExecStage {
fn apply(&mut self, event: Event, ctx: &mut PipelineContext) -> ScriptResult {
let mut event_copy = event.clone();
let result = if ctx.window.is_empty() {
ctx.rhai
.execute_compiled_exec(&self.compiled_exec, &mut event_copy, &mut ctx.tracker)
} else {
ctx.rhai.execute_compiled_exec_with_window(
&self.compiled_exec,
&mut event_copy,
&ctx.window,
&mut ctx.tracker,
)
};
match result {
Ok(()) => {
ScriptResult::Emit(event_copy)
}
Err(e) => {
crate::rhai_functions::tracking::track_error(
"rhai",
ctx.meta.line_number,
&format!("Exec error: {}", e),
ctx.config.verbose,
ctx.config.quiet,
Some(&ctx.config),
);
if ctx.config.strict {
ScriptResult::Error(format!("Exec error: {}", e))
} else {
ScriptResult::Emit(event)
}
}
}
}
}
#[allow(dead_code)] pub struct BeginStage {
compiled_begin: Option<crate::engine::CompiledExpression>,
}
impl BeginStage {
#[allow(dead_code)] pub fn new(begin: Option<String>, engine: &mut RhaiEngine) -> Result<Self> {
let compiled_begin = if let Some(begin_expr) = begin {
Some(engine.compile_begin(&begin_expr)?)
} else {
None
};
Ok(Self { compiled_begin })
}
#[allow(dead_code)] pub fn execute(&self, ctx: &mut PipelineContext) -> Result<()> {
if let Some(ref compiled) = self.compiled_begin {
let _init_map = ctx
.rhai
.execute_compiled_begin(compiled, &mut ctx.tracker)?;
Ok(())
} else {
Ok(())
}
}
}
#[allow(dead_code)] pub struct EndStage {
compiled_end: Option<crate::engine::CompiledExpression>,
}
impl EndStage {
#[allow(dead_code)] pub fn new(end: Option<String>, engine: &mut RhaiEngine) -> Result<Self> {
let compiled_end = if let Some(end_expr) = end {
Some(engine.compile_end(&end_expr)?)
} else {
None
};
Ok(Self { compiled_end })
}
#[allow(dead_code)] pub fn execute(&self, ctx: &PipelineContext) -> Result<()> {
if let Some(ref compiled) = self.compiled_end {
ctx.rhai.execute_compiled_end(compiled, &ctx.tracker)
} else {
Ok(())
}
}
}
pub struct LevelFilterStage {
levels: Vec<String>,
exclude_levels: Vec<String>,
}
impl LevelFilterStage {
pub fn new(levels: Vec<String>, exclude_levels: Vec<String>) -> Self {
Self {
levels,
exclude_levels,
}
}
pub fn is_active(&self) -> bool {
!self.levels.is_empty() || !self.exclude_levels.is_empty()
}
}
impl ScriptStage for LevelFilterStage {
fn apply(&mut self, event: Event, _ctx: &mut PipelineContext) -> ScriptResult {
if !self.is_active() {
return ScriptResult::Emit(event);
}
let event_level = {
let mut found_level: Option<String> = None;
for level_field_name in crate::event::LEVEL_FIELD_NAMES {
if let Some(value) = event.fields.get(*level_field_name) {
if let Ok(level_str) = value.clone().into_string() {
found_level = Some(level_str);
break;
}
}
}
match found_level {
Some(level) => level,
None => {
if self.levels.is_empty() {
return ScriptResult::Emit(event);
} else {
return ScriptResult::Skip;
}
}
}
};
if !self.exclude_levels.is_empty() {
for exclude_level in &self.exclude_levels {
if event_level.eq_ignore_ascii_case(exclude_level) {
return ScriptResult::Skip;
}
}
}
if !self.levels.is_empty() {
for level in &self.levels {
if event_level.eq_ignore_ascii_case(level) {
return ScriptResult::Emit(event);
}
}
return ScriptResult::Skip;
}
ScriptResult::Emit(event)
}
}
pub struct KeyFilterStage {
keys: Vec<String>,
exclude_keys: Vec<String>,
}
impl KeyFilterStage {
pub fn new(keys: Vec<String>, exclude_keys: Vec<String>) -> Self {
Self { keys, exclude_keys }
}
pub fn is_active(&self) -> bool {
!self.keys.is_empty() || !self.exclude_keys.is_empty()
}
}
impl ScriptStage for KeyFilterStage {
fn apply(&mut self, mut event: Event, _ctx: &mut PipelineContext) -> ScriptResult {
if !self.is_active() {
return ScriptResult::Emit(event);
}
let available_keys: Vec<String> = event.fields.keys().cloned().collect();
let effective_keys = {
let mut result_keys = if self.keys.is_empty() {
available_keys
} else {
available_keys
.iter()
.filter(|key| self.keys.contains(key))
.cloned()
.collect()
};
result_keys.retain(|key| !self.exclude_keys.contains(key));
result_keys
};
event.filter_keys(&effective_keys);
if self.is_active() && event.fields.is_empty() {
ScriptResult::Skip
} else {
ScriptResult::Emit(event)
}
}
}
pub struct TimestampFilterStage {
config: TimestampFilterConfig,
}
impl TimestampFilterStage {
pub fn new(config: TimestampFilterConfig) -> Self {
Self { config }
}
}
impl ScriptStage for TimestampFilterStage {
fn apply(&mut self, event: Event, ctx: &mut PipelineContext) -> ScriptResult {
let event_timestamp = match event.parsed_ts {
Some(ts) => ts,
None => {
if ctx.config.strict {
return ScriptResult::Error(
"Event has no valid timestamp for --since/--until filtering".to_string(),
);
} else {
return ScriptResult::Skip;
}
}
};
if let Some(since) = self.config.since {
if event_timestamp < since {
return ScriptResult::Skip;
}
}
if let Some(until) = self.config.until {
if event_timestamp > until {
return ScriptResult::Skip;
}
}
ScriptResult::Emit(event)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::TimestampFilterConfig;
use crate::pipeline::{MetaData, PipelineConfig};
use chrono::{Duration, Utc};
#[test]
fn test_timestamp_filter_stage_since() {
let since = Utc::now() - Duration::hours(1);
let config = TimestampFilterConfig {
since: Some(since),
until: None,
};
let mut stage = TimestampFilterStage::new(config);
let mut ctx = PipelineContext {
config: PipelineConfig {
error_report: crate::config::ErrorReportConfig {
style: crate::config::ErrorReportStyle::Summary,
file: None,
},
brief: false,
color_mode: crate::config::ColorMode::Auto,
timestamp_formatting: crate::config::TimestampFormatConfig::default(),
strict: false,
verbose: false,
quiet: false,
no_emoji: false,
},
tracker: std::collections::HashMap::new(),
window: Vec::new(),
rhai: crate::engine::RhaiEngine::new(),
meta: MetaData::default(),
};
let old_event = crate::event::Event {
parsed_ts: Some(since - Duration::minutes(30)),
..Default::default()
};
let result = stage.apply(old_event, &mut ctx);
matches!(result, ScriptResult::Skip);
let new_event = crate::event::Event {
parsed_ts: Some(since + Duration::minutes(30)),
..Default::default()
};
let result = stage.apply(new_event, &mut ctx);
matches!(result, ScriptResult::Emit(_));
}
#[test]
fn test_timestamp_filter_stage_until() {
let until = Utc::now() - Duration::hours(1);
let config = TimestampFilterConfig {
since: None,
until: Some(until),
};
let mut stage = TimestampFilterStage::new(config);
let mut ctx = PipelineContext {
config: PipelineConfig {
error_report: crate::config::ErrorReportConfig {
style: crate::config::ErrorReportStyle::Summary,
file: None,
},
brief: false,
color_mode: crate::config::ColorMode::Auto,
timestamp_formatting: crate::config::TimestampFormatConfig::default(),
strict: false,
verbose: false,
quiet: false,
no_emoji: false,
},
tracker: std::collections::HashMap::new(),
window: Vec::new(),
rhai: crate::engine::RhaiEngine::new(),
meta: MetaData::default(),
};
let old_event = crate::event::Event {
parsed_ts: Some(until - Duration::minutes(30)),
..Default::default()
};
let result = stage.apply(old_event, &mut ctx);
matches!(result, ScriptResult::Emit(_));
let new_event = crate::event::Event {
parsed_ts: Some(until + Duration::minutes(30)),
..Default::default()
};
let result = stage.apply(new_event, &mut ctx);
matches!(result, ScriptResult::Skip);
}
#[test]
fn test_timestamp_filter_stage_no_timestamp() {
let config = TimestampFilterConfig {
since: Some(Utc::now() - Duration::hours(1)),
until: Some(Utc::now() + Duration::hours(1)),
};
let mut stage = TimestampFilterStage::new(config);
let mut ctx = PipelineContext {
config: PipelineConfig {
error_report: crate::config::ErrorReportConfig {
style: crate::config::ErrorReportStyle::Summary,
file: None,
},
brief: false,
color_mode: crate::config::ColorMode::Auto,
timestamp_formatting: crate::config::TimestampFormatConfig::default(),
strict: false,
verbose: false,
quiet: false,
no_emoji: false,
},
tracker: std::collections::HashMap::new(),
window: Vec::new(),
rhai: crate::engine::RhaiEngine::new(),
meta: MetaData::default(),
};
let event_no_ts = crate::event::Event {
parsed_ts: None,
..Default::default()
};
let result = stage.apply(event_no_ts, &mut ctx);
matches!(result, ScriptResult::Emit(_));
}
}