mod instant_policy_fn;
mod meta;
mod sliding_policy_fn;
mod time_range;
mod tumbling_policy_fn;
mod validation;
pub(crate) use instant_policy_fn::apply_instant_policy;
pub(crate) use instant_policy_fn::apply_instant_policy_from_parts;
pub(crate) use meta::SignalPolicyStats;
use polars::prelude::*;
pub(crate) use sliding_policy_fn::apply_sliding_policy;
pub(crate) use time_range::{extract_time_range, extract_time_range_from_parts};
use tracing::info;
use tracing::instrument;
pub(crate) use tumbling_policy_fn::apply_tumbling_policy;
use crate::{EtlResult, signal_policy::WindowStrategy, unit::MeasurementUnit};
/// Applies the measurement's configured signal policy to `df` and gathers statistics
/// about the run.
///
/// If the measurement carries no signal policy, `df` is handed back unchanged together
/// with `None`. Otherwise the frame is routed to the instant, sliding or tumbling policy
/// function according to the policy's [`WindowStrategy`], and a [`SignalPolicyStats`]
/// record is built from the input row count, the output row count, the null count of the
/// measurement's value column in the output, the policy TTL and the extracted time span.
///
/// # Arguments
/// * `df` - input frame; consumed by the selected policy function.
/// * `measurement` - measurement whose `signal_policy` (if any) is applied.
/// * `source_name` - forwarded verbatim into the resulting statistics.
///
/// # Errors
/// Propagates any error from the time-range extraction, from the selected policy
/// function, or from looking up the measurement's value column in the result.
#[instrument(skip(df, measurement), fields(measurement = %measurement.name))]
pub(crate) fn apply_signal_policy(
    df: DataFrame,
    measurement: &MeasurementUnit,
    source_name: &str,
) -> EtlResult<(DataFrame, Option<SignalPolicyStats>)> {
    info!(
        "🟢 ROOT Applying signal policy to measurement '{}'",
        measurement.name
    );
    let Some(ref policy) = measurement.signal_policy else {
        info!("🟡 Empty policy; Return early");
        return Ok((df, None));
    };

    // Capture everything derived from the input frame before `df` is moved into the
    // policy function below.
    let input_points = df.height();
    let policy_type = policy.windowing.name();
    // Saturate rather than cast with `as`: a millisecond count that does not fit in
    // `u64` would otherwise be silently truncated to an unrelated small value.
    let ttl_ms = u64::try_from(policy.ttl().as_millis()).unwrap_or(u64::MAX);
    let time_range = extract_time_range(&df, measurement)?;
    let time_span_ms = time_range.duration_ms;

    let result_df = match &policy.windowing {
        WindowStrategy::Instant => {
            crate::polars_fns::signal_policy::apply_instant_policy(df, measurement)?
        }
        WindowStrategy::Sliding {
            duration,
            min_samples,
        } => apply_sliding_policy(df, measurement, *duration, *min_samples)?,
        WindowStrategy::Tumbling {
            duration,
            min_samples,
        } => apply_tumbling_policy(df, measurement, *duration, *min_samples)?,
    };

    // Output-side metrics: row count of the result and the number of nulls in the
    // column named after the measurement.
    let grid_points = result_df.height();
    let value_col = result_df.column(measurement.name.as_str())?;
    let null_observations = value_col.null_count();

    let stats = SignalPolicyStats::new(
        measurement.name.clone(),
        source_name,
        policy_type,
        input_points,
        grid_points,
        null_observations,
        ttl_ms,
        time_span_ms,
    );
    Ok((result_df, Some(stats)))
}
/// Returns the name of the column that holds the measurement's values.
///
/// This is the measurement's own `name`, borrowed for the lifetime of `measurement`.
#[inline]
pub(crate) fn get_value_column(measurement: &MeasurementUnit) -> &str {
    measurement.name.as_str()
}
/// Returns the measurement's component names as borrowed string slices.
///
/// The slices appear in the same order as `measurement.components` and borrow from
/// `measurement`; no component string is copied.
#[inline]
pub(crate) fn get_component_names(measurement: &MeasurementUnit) -> Vec<&str> {
    let mut names: Vec<&str> = Vec::new();
    for component in measurement.components.iter() {
        names.push(component.as_str());
    }
    names
}