use crate::ev_filtering::config::Validatable;
use crate::ev_filtering::{FilterError, FilterResult};
use polars::prelude::*;
#[cfg(unix)]
use tracing::{debug, instrument, warn};
// Fallback logging shims for non-Unix targets, where the `tracing` import above is
// compiled out. `debug!`, `info!` and `trace!` discard their arguments; `warn!` and
// `error!` print a tagged line to stderr.
#[cfg(not(unix))]
macro_rules! debug {
    ($($args:tt)*) => {};
}
#[cfg(not(unix))]
macro_rules! info {
    ($($args:tt)*) => {};
}
// `format_args!` is used instead of `format!` so no intermediate `String` is
// allocated just to be printed; the emitted text is unchanged.
#[cfg(not(unix))]
macro_rules! warn {
    ($($args:tt)*) => {
        eprintln!("[WARN] {}", format_args!($($args)*))
    };
}
#[cfg(not(unix))]
macro_rules! trace {
    ($($args:tt)*) => {};
}
#[cfg(not(unix))]
macro_rules! error {
    ($($args:tt)*) => {
        eprintln!("[ERROR] {}", format_args!($($args)*))
    };
}
// Function-like placeholder only: the `instrument` attribute itself is applied via
// `cfg_attr(unix, ...)` in this file, so it is never expanded on non-Unix targets.
#[cfg(not(unix))]
macro_rules! instrument {
    ($($args:tt)*) => {};
}
/// Name of the column holding the event x coordinate (cast to `Int64` by the
/// spatial filters in this file).
pub const COL_X: &str = "x";
/// Name of the column holding the event y coordinate (cast to `Int64` by the
/// spatial filters in this file).
pub const COL_Y: &str = "y";
/// Name of the timestamp column. The filters in this file read it as `Float64`
/// and multiply differences by 1_000_000 to obtain microseconds, i.e. they treat
/// the values as seconds.
pub const COL_T: &str = "t";
/// Name of the polarity column, used for per-polarity grouping and for
/// polarity matching between neighbouring events.
pub const COL_POLARITY: &str = "polarity";
/// Denoising strategy selected by a `DenoiseFilter`.
///
/// `apply_denoise_filter_polars` dispatches on this value to pick the filter
/// function that is run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DenoiseMethod {
    /// Runs the refractory-period filter.
    RefractoryPeriod,
    /// Runs the temporal-correlation filter.
    TemporalCorrelation,
    /// Runs the spatial-temporal correlation filter.
    SpatialTemporalCorrelation,
    /// Runs the per-pixel background-activity filter.
    BackgroundActivity,
    /// Runs the multi-stage pipeline built from the configured sub-filters.
    MultiScale,
}

impl DenoiseMethod {
    /// Returns a short, lowercase, human-readable label for the method.
    pub fn description(&self) -> &'static str {
        match *self {
            Self::RefractoryPeriod => "refractory period",
            Self::TemporalCorrelation => "temporal correlation",
            Self::SpatialTemporalCorrelation => "spatial-temporal correlation",
            Self::BackgroundActivity => "background activity",
            Self::MultiScale => "multi-scale",
        }
    }
}
/// Parameters of the refractory-period filter.
#[derive(Debug, Clone)]
pub struct RefractoryFilter {
    /// Minimum gap, in microseconds, required between an event and the previous
    /// event of the same group for the later event to be kept.
    pub period_us: f64,
    /// When `true`, events are grouped by (x, y, polarity); otherwise by (x, y).
    pub per_polarity: bool,
    /// Stored flag; `apply_refractory_filter_polars` in this file does not read it.
    pub absolute_refractory: bool,
}

impl RefractoryFilter {
    /// Creates a filter with the given period and both flags enabled.
    pub fn new(period_us: f64) -> Self {
        let (per_polarity, absolute_refractory) = (true, true);
        Self {
            period_us,
            per_polarity,
            absolute_refractory,
        }
    }

    /// Returns the filter with `per_polarity` replaced.
    pub fn with_per_polarity(self, per_polarity: bool) -> Self {
        Self {
            per_polarity,
            ..self
        }
    }

    /// Returns the filter with `absolute_refractory` replaced.
    pub fn with_absolute_refractory(self, absolute: bool) -> Self {
        Self {
            absolute_refractory: absolute,
            ..self
        }
    }
}
/// Parameters of the temporal-correlation filter.
#[derive(Debug, Clone)]
pub struct TemporalCorrelationFilter {
    /// Half-width, in microseconds, of the time window searched around each event.
    pub time_window_us: f64,
    /// Minimum number of other events that must fall inside the window.
    pub min_events: usize,
    /// Initialised to `time_window_us` by [`TemporalCorrelationFilter::new`];
    /// `apply_temporal_correlation_filter_polars` in this file does not read it.
    pub max_time_distance_us: f64,
}

impl TemporalCorrelationFilter {
    /// Creates a filter whose maximum time distance equals its time window.
    pub fn new(time_window_us: f64, min_events: usize) -> Self {
        let max_time_distance_us = time_window_us;
        Self {
            time_window_us,
            min_events,
            max_time_distance_us,
        }
    }
}
/// Parameters of the spatial-temporal correlation filter.
#[derive(Debug, Clone)]
pub struct SpatialTemporalFilter {
    /// Maximum per-axis pixel distance between an event and a neighbour.
    pub spatial_radius: u16,
    /// Maximum absolute time difference to a neighbour, in microseconds.
    pub time_window_us: f64,
    /// Minimum number of neighbours an event needs in order to be kept.
    pub min_neighbors: usize,
    /// When `true`, only events of the same polarity count as neighbours.
    pub consider_polarity: bool,
}

impl SpatialTemporalFilter {
    /// Creates a filter that ignores polarity when matching neighbours.
    pub fn new(spatial_radius: u16, time_window_us: f64, min_neighbors: usize) -> Self {
        let consider_polarity = false;
        Self {
            spatial_radius,
            time_window_us,
            min_neighbors,
            consider_polarity,
        }
    }

    /// Returns the filter with `consider_polarity` replaced.
    pub fn with_polarity_consideration(self, consider: bool) -> Self {
        Self {
            consider_polarity: consider,
            ..self
        }
    }
}
/// Performance tuning knobs carried by a `DenoiseFilter`.
///
/// NOTE(review): none of the filter functions visible in this file read these
/// fields; the units below are inferred from the names and should be confirmed.
#[derive(Debug, Clone)]
pub struct DenoisePerformanceConfig {
    /// Chunk size (presumably a number of events — TODO confirm).
    pub chunk_size: usize,
    /// Whether parallel execution is requested.
    pub enable_parallel: bool,
    /// Memory budget (presumably in bytes — TODO confirm).
    pub memory_limit: usize,
}

impl Default for DenoisePerformanceConfig {
    /// 100_000-element chunks, parallelism on, limit of 1_000_000_000.
    fn default() -> Self {
        Self::preset(100_000, true, 1_000_000_000)
    }
}

impl DenoisePerformanceConfig {
    /// Assembles a configuration from its three raw values.
    const fn preset(chunk_size: usize, enable_parallel: bool, memory_limit: usize) -> Self {
        Self {
            chunk_size,
            enable_parallel,
            memory_limit,
        }
    }

    /// 500_000-element chunks, parallelism on, limit of 4_000_000_000.
    pub fn high_performance() -> Self {
        Self::preset(500_000, true, 4_000_000_000)
    }

    /// 50_000-element chunks, parallelism off, limit of 500_000_000.
    pub fn memory_efficient() -> Self {
        Self::preset(50_000, false, 500_000_000)
    }
}
/// Complete denoising configuration: the selected method plus the parameters of
/// every sub-filter that method may need.
#[derive(Debug, Clone)]
pub struct DenoiseFilter {
/// Algorithm that `apply_denoise_filter_polars` dispatches to.
pub method: DenoiseMethod,
/// Required by `DenoiseMethod::RefractoryPeriod`; optional first stage of
/// the multi-scale pipeline.
pub refractory_filter: Option<RefractoryFilter>,
/// Required by `DenoiseMethod::TemporalCorrelation`.
pub temporal_correlation_filter: Option<TemporalCorrelationFilter>,
/// Required by `DenoiseMethod::SpatialTemporalCorrelation`; optional second
/// stage of the multi-scale pipeline.
pub spatial_temporal_filter: Option<SpatialTemporalFilter>,
/// Minimum per-pixel event rate in events per second. Required by
/// `DenoiseMethod::BackgroundActivity`; optional last stage of the
/// multi-scale pipeline.
pub background_threshold: Option<f64>,
/// Set to `true` by every constructor.
/// NOTE(review): not read by any function visible in this file.
pub preserve_order: bool,
/// Set to `true` by every constructor.
/// NOTE(review): not read by any function visible in this file.
pub validate_results: bool,
/// Performance tuning settings.
/// NOTE(review): not read by any function visible in this file.
pub performance_config: DenoisePerformanceConfig,
}
impl DenoiseFilter {
pub fn refractory(period_us: f64) -> Self {
Self {
method: DenoiseMethod::RefractoryPeriod,
refractory_filter: Some(RefractoryFilter::new(period_us)),
temporal_correlation_filter: None,
spatial_temporal_filter: None,
background_threshold: None,
preserve_order: true,
validate_results: true,
performance_config: DenoisePerformanceConfig::default(),
}
}
pub fn temporal_correlation(time_window_us: f64, min_events: usize) -> Self {
Self {
method: DenoiseMethod::TemporalCorrelation,
refractory_filter: None,
temporal_correlation_filter: Some(TemporalCorrelationFilter::new(
time_window_us,
min_events,
)),
spatial_temporal_filter: None,
background_threshold: None,
preserve_order: true,
validate_results: true,
performance_config: DenoisePerformanceConfig::default(),
}
}
pub fn spatial_temporal(
spatial_radius: u16,
time_window_us: f64,
min_neighbors: usize,
) -> Self {
Self {
method: DenoiseMethod::SpatialTemporalCorrelation,
refractory_filter: None,
temporal_correlation_filter: None,
spatial_temporal_filter: Some(SpatialTemporalFilter::new(
spatial_radius,
time_window_us,
min_neighbors,
)),
background_threshold: None,
preserve_order: true,
validate_results: true,
performance_config: DenoisePerformanceConfig::default(),
}
}
pub fn background_activity(threshold_events_per_sec: f64) -> Self {
Self {
method: DenoiseMethod::BackgroundActivity,
refractory_filter: None,
temporal_correlation_filter: None,
spatial_temporal_filter: None,
background_threshold: Some(threshold_events_per_sec),
preserve_order: true,
validate_results: true,
performance_config: DenoisePerformanceConfig::default(),
}
}
pub fn multi_scale(
refractory_period_us: f64,
spatial_radius: u16,
time_window_us: f64,
) -> Self {
Self {
method: DenoiseMethod::MultiScale,
refractory_filter: Some(RefractoryFilter::new(refractory_period_us)),
temporal_correlation_filter: None,
spatial_temporal_filter: Some(SpatialTemporalFilter::new(
spatial_radius,
time_window_us,
2,
)),
background_threshold: None,
preserve_order: true,
validate_results: true,
performance_config: DenoisePerformanceConfig::default(),
}
}
pub fn with_order_preservation(mut self, preserve: bool) -> Self {
self.preserve_order = preserve;
self
}
pub fn with_validation(mut self, validate: bool) -> Self {
self.validate_results = validate;
self
}
pub fn with_performance_config(mut self, config: DenoisePerformanceConfig) -> Self {
self.performance_config = config;
self
}
pub fn with_high_performance(mut self) -> Self {
self.performance_config = DenoisePerformanceConfig::high_performance();
self
}
pub fn with_memory_efficient(mut self) -> Self {
self.performance_config = DenoisePerformanceConfig::memory_efficient();
self
}
pub fn description(&self) -> String {
let mut parts = vec![self.method.description().to_string()];
if let Some(ref_filter) = &self.refractory_filter {
parts.push(format!("refractory: {:.1}µs", ref_filter.period_us));
}
if let Some(temp_filter) = &self.temporal_correlation_filter {
parts.push(format!(
"temporal: {:.1}µs window, {} events",
temp_filter.time_window_us, temp_filter.min_events
));
}
if let Some(st_filter) = &self.spatial_temporal_filter {
parts.push(format!(
"spatial-temporal: r={}, {:.1}µs, {} neighbors",
st_filter.spatial_radius, st_filter.time_window_us, st_filter.min_neighbors
));
}
if let Some(bg_threshold) = self.background_threshold {
parts.push(format!("background: {:.1} events/s", bg_threshold));
}
parts.join(", ")
}
}
impl Default for DenoiseFilter {
fn default() -> Self {
Self::refractory(1000.0) }
}
impl Validatable for DenoiseFilter {
fn validate(&self) -> FilterResult<()> {
match self.method {
DenoiseMethod::RefractoryPeriod => {
if let Some(ref_filter) = &self.refractory_filter {
if ref_filter.period_us <= 0.0 {
return Err(FilterError::InvalidConfig(
"Refractory period must be positive".to_string(),
));
}
} else {
return Err(FilterError::InvalidConfig(
"Refractory period method requires refractory_filter".to_string(),
));
}
}
DenoiseMethod::TemporalCorrelation => {
if let Some(temp_filter) = &self.temporal_correlation_filter {
if temp_filter.time_window_us <= 0.0 {
return Err(FilterError::InvalidConfig(
"Temporal correlation time window must be positive".to_string(),
));
}
if temp_filter.min_events == 0 {
return Err(FilterError::InvalidConfig(
"Temporal correlation min_events must be positive".to_string(),
));
}
} else {
return Err(FilterError::InvalidConfig(
"Temporal correlation method requires temporal_correlation_filter"
.to_string(),
));
}
}
DenoiseMethod::SpatialTemporalCorrelation => {
if let Some(st_filter) = &self.spatial_temporal_filter {
if st_filter.time_window_us <= 0.0 {
return Err(FilterError::InvalidConfig(
"Spatial-temporal time window must be positive".to_string(),
));
}
if st_filter.min_neighbors == 0 {
return Err(FilterError::InvalidConfig(
"Spatial-temporal min_neighbors must be positive".to_string(),
));
}
} else {
return Err(FilterError::InvalidConfig(
"Spatial-temporal method requires spatial_temporal_filter".to_string(),
));
}
}
DenoiseMethod::BackgroundActivity => {
if let Some(threshold) = self.background_threshold {
if threshold <= 0.0 {
return Err(FilterError::InvalidConfig(
"Background activity threshold must be positive".to_string(),
));
}
} else {
return Err(FilterError::InvalidConfig(
"Background activity method requires background_threshold".to_string(),
));
}
}
DenoiseMethod::MultiScale => {
if self.refractory_filter.is_none() && self.spatial_temporal_filter.is_none() {
return Err(FilterError::InvalidConfig(
"Multi-scale method requires at least one sub-filter".to_string(),
));
}
}
}
Ok(())
}
}
/// Drops events that follow the previous event of the same group too closely.
///
/// The frame is sorted by `t`; each event is compared with the immediately
/// preceding event in its group, where a group is (x, y, polarity) when
/// `filter.per_polarity` is set and (x, y) otherwise. The timestamp difference is
/// multiplied by 1_000_000 to obtain microseconds, and an event is kept when it
/// has no predecessor or when that gap is at least `filter.period_us`.
///
/// `filter.absolute_refractory` is not consulted. The comparison is always
/// against the previous event in the input, whether or not that event is kept.
///
/// The returned plan is lazy; nothing is executed here.
#[cfg_attr(unix, instrument(skip(df), fields(filter = ?filter)))]
pub fn apply_refractory_filter_polars(
    df: LazyFrame,
    filter: &RefractoryFilter,
) -> PolarsResult<LazyFrame> {
    debug!("Applying refractory period filter: {:?}", filter);

    let mut partition_keys = vec![col(COL_X), col(COL_Y)];
    if filter.per_polarity {
        partition_keys.push(col(COL_POLARITY));
    }

    // Gap to the previous event of the same group; null for a group's first event.
    let previous_t = col(COL_T).shift(lit(1)).over(partition_keys);
    let gap_seconds = (col(COL_T) - previous_t).alias("time_diff_s");
    let gap_micros = (col("time_diff_s") * lit(1_000_000.0)).alias("time_diff_us");

    let far_enough = col("time_diff_us").gt_eq(lit(filter.period_us));
    let keep_event = col("time_diff_us").is_null().or(far_enough);

    let survivors = df
        .sort([COL_T], SortMultipleOptions::default())
        .with_columns([gap_seconds])
        .with_columns([gap_micros])
        .filter(keep_event)
        .drop(["time_diff_s", "time_diff_us"]);

    debug!("Refractory filter applied using Polars window functions");
    Ok(survivors)
}
/// For every timestamp, counts how many *other* non-null timestamps lie within
/// `[t - window, t + window]` (bounds inclusive). Null inputs yield `None`.
///
/// The values are sorted once and each count is obtained with two binary
/// searches, so the cost is O(n log n) rather than a comparison of every pair.
/// NaN timestamps never match anything, and a NaN or inverted range counts zero,
/// which is what direct `>=` / `<=` comparisons produce.
fn count_temporal_neighbors(times: &[Option<f64>], window: f64) -> Vec<Option<i64>> {
    let mut sorted: Vec<f64> = times
        .iter()
        .flatten()
        .copied()
        .filter(|t| !t.is_nan())
        .collect();
    sorted.sort_unstable_by(f64::total_cmp);

    times
        .iter()
        .map(|slot| {
            slot.map(|t| {
                let lo = t - window;
                let hi = t + window;
                if lo.is_nan() || hi.is_nan() || lo > hi {
                    return 0;
                }
                let first_inside = sorted.partition_point(|&v| v < lo);
                let first_beyond = sorted.partition_point(|&v| v <= hi);
                let inside = first_beyond.saturating_sub(first_inside);
                // The event is part of `sorted` itself and must not count as
                // its own neighbour.
                let counts_itself = t >= lo && t <= hi;
                (inside - usize::from(counts_itself)) as i64
            })
        })
        .collect()
}

/// Keeps events that have at least `filter.min_events` other events within
/// `filter.time_window_us` microseconds on either side of their timestamp.
///
/// The window is converted to seconds (divided by 1_000_000) before being
/// compared with `t`, which must be a `Float64` column. Neighbours are counted
/// over the whole frame, regardless of pixel position or polarity.
/// `filter.max_time_distance_us` is not consulted. Rows with a null `t` get a
/// null count and are therefore removed by the final comparison.
///
/// The result is sorted by `t`. The returned plan is lazy.
#[cfg_attr(unix, instrument(skip(df), fields(filter = ?filter)))]
pub fn apply_temporal_correlation_filter_polars(
    df: LazyFrame,
    filter: &TemporalCorrelationFilter,
) -> PolarsResult<LazyFrame> {
    debug!("Applying temporal correlation filter: {:?}", filter);
    let time_window_sec = filter.time_window_us / 1_000_000.0;
    let filtered_df = df
        .sort([COL_T], SortMultipleOptions::default())
        .with_columns([
            col(COL_T)
                .map(
                    move |s| {
                        let times = s.f64()?;
                        let samples: Vec<Option<f64>> =
                            (0..times.len()).map(|i| times.get(i)).collect();
                        let neighbor_counts =
                            count_temporal_neighbors(&samples, time_window_sec);
                        Ok(Some(
                            Int64Chunked::from_iter_options(
                                "neighbor_count".into(),
                                neighbor_counts.into_iter(),
                            )
                            .into_series()
                            .into(),
                        ))
                    },
                    GetOutput::from_type(DataType::Int64),
                )
                .alias("neighbor_count"),
        ])
        .filter(col("neighbor_count").gt_eq(lit(filter.min_events as i64)))
        .drop(["neighbor_count"]);
    debug!("Temporal correlation filter applied using Polars window functions");
    Ok(filtered_df)
}
/// Keeps events that have at least `filter.min_neighbors` other events nearby in
/// both space and time.
///
/// Another event is a neighbour when
/// - its x and y each differ by at most `filter.spatial_radius` pixels,
/// - its timestamp differs by at most `filter.time_window_us` microseconds
///   (converted to seconds before comparison with `t`), and
/// - when `filter.consider_polarity` is set, its polarity is equal.
///
/// Candidate pairs come from a grid join. Cells are `max(radius, 1)` pixels
/// wide, so two events within the radius are at most one cell apart on each
/// axis. Every event is therefore offered as a candidate to its own cell and
/// to the eight surrounding cells, and the exact distance test runs afterwards.
/// The join does not bin by time: all events sharing a cell neighbourhood are
/// paired before the time test removes the distant ones.
///
/// Only the `x`, `y`, `t` and `polarity` columns are returned, sorted by `t`.
/// The returned plan is lazy.
///
/// # Errors
///
/// Propagates the error from `concat` if the candidate frames cannot be
/// combined.
#[cfg_attr(unix, instrument(skip(df), fields(filter = ?filter)))]
pub fn apply_spatial_temporal_filter_polars(
    df: LazyFrame,
    filter: &SpatialTemporalFilter,
) -> PolarsResult<LazyFrame> {
    debug!("Applying spatial-temporal correlation filter: {:?}", filter);

    // With no neighbour requirement every event qualifies. The join below
    // would still drop events that have no candidate at all, so skip it.
    if filter.min_neighbors == 0 {
        return Ok(df
            .select([col(COL_X), col(COL_Y), col(COL_T), col(COL_POLARITY)])
            .sort([COL_T], SortMultipleOptions::default()));
    }

    let time_window_sec = filter.time_window_us / 1_000_000.0;
    let radius = i64::from(filter.spatial_radius);
    let cell_size = radius.max(1);

    let indexed = df
        .sort([COL_T], SortMultipleOptions::default())
        .with_row_index("event_idx", None)
        .with_columns([
            (col(COL_X).cast(DataType::Int64) / lit(cell_size)).alias("grid_x"),
            (col(COL_Y).cast(DataType::Int64) / lit(cell_size)).alias("grid_y"),
        ]);

    // One copy of the events per cell of the 3x3 neighbourhood. Each copy keeps
    // the grid key columns, which the join below needs on its right-hand side.
    let mut shifted = Vec::with_capacity(9);
    for offset_x in -1i64..=1 {
        for offset_y in -1i64..=1 {
            shifted.push(indexed.clone().select([
                (col("grid_x") + lit(offset_x)).alias("grid_x"),
                (col("grid_y") + lit(offset_y)).alias("grid_y"),
                col("event_idx").alias("neighbor_idx"),
                col(COL_X).alias("neighbor_x"),
                col(COL_Y).alias("neighbor_y"),
                col(COL_T).alias("neighbor_t"),
                col(COL_POLARITY).alias("neighbor_polarity"),
            ]));
        }
    }
    let candidates = concat(shifted, UnionArgs::default())?;

    let result_df = indexed
        .join(
            candidates,
            [col("grid_x"), col("grid_y")],
            [col("grid_x"), col("grid_y")],
            JoinArgs::new(JoinType::Inner),
        )
        // An event is not its own neighbour.
        .filter(col("event_idx").neq(col("neighbor_idx")))
        .with_columns([
            (col(COL_X).cast(DataType::Int64) - col("neighbor_x").cast(DataType::Int64))
                .alias("dx_raw"),
            (col(COL_Y).cast(DataType::Int64) - col("neighbor_y").cast(DataType::Int64))
                .alias("dy_raw"),
            (col(COL_T) - col("neighbor_t")).alias("dt_raw"),
        ])
        .with_columns([
            when(col("dx_raw").lt(lit(0)))
                .then(-col("dx_raw"))
                .otherwise(col("dx_raw"))
                .alias("dx_abs"),
            when(col("dy_raw").lt(lit(0)))
                .then(-col("dy_raw"))
                .otherwise(col("dy_raw"))
                .alias("dy_abs"),
            when(col("dt_raw").lt(lit(0.0)))
                .then(-col("dt_raw"))
                .otherwise(col("dt_raw"))
                .alias("dt_abs"),
        ])
        // Spatial distance is the larger of the two per-axis distances.
        .with_columns([
            when(col("dx_abs").gt(col("dy_abs")))
                .then(col("dx_abs"))
                .otherwise(col("dy_abs"))
                .alias("spatial_distance"),
        ])
        .filter(col("spatial_distance").lt_eq(lit(radius)))
        .filter(col("dt_abs").lt_eq(lit(time_window_sec)))
        .filter(if filter.consider_polarity {
            col(COL_POLARITY).eq(col("neighbor_polarity"))
        } else {
            lit(true)
        })
        .group_by([col("event_idx")])
        .agg([
            col(COL_X).first(),
            col(COL_Y).first(),
            col(COL_T).first(),
            col(COL_POLARITY).first(),
            col("neighbor_idx").count().alias("neighbor_count"),
        ])
        // `usize` to `u64` cannot truncate, unlike a cast to `u32`.
        .filter(col("neighbor_count").gt_eq(lit(filter.min_neighbors as u64)))
        .select([col(COL_X), col(COL_Y), col(COL_T), col(COL_POLARITY)])
        .sort([COL_T], SortMultipleOptions::default());

    debug!("Spatial-temporal correlation filter applied using Polars grid binning");
    Ok(result_df)
}
#[cfg_attr(unix, instrument(skip(df), fields(threshold = threshold_events_per_sec)))]
pub fn apply_background_activity_filter_polars(
df: LazyFrame,
threshold_events_per_sec: f64,
) -> PolarsResult<LazyFrame> {
debug!(
"Applying background activity filter with threshold: {:.1} events/s",
threshold_events_per_sec
);
let time_stats = df
.clone()
.select([
col(COL_T).min().alias("min_time"),
col(COL_T).max().alias("max_time"),
])
.collect()?;
let min_time = time_stats.column("min_time")?.f64()?.get(0).unwrap_or(0.0);
let max_time = time_stats.column("max_time")?.f64()?.get(0).unwrap_or(0.0);
let time_span = (max_time - min_time).max(1e-6);
let filtered_df = df
.with_columns([
(col(COL_X).cast(DataType::String) + lit(",") + col(COL_Y).cast(DataType::String))
.alias("pixel_key"),
])
.with_columns([
lit(time_span).alias("time_span"),
])
.group_by([col("pixel_key")])
.agg([
col(COL_T).count().alias("event_count"),
col("time_span").first().alias("time_span"),
col(COL_X).first().alias("pixel_x"),
col(COL_Y).first().alias("pixel_y"),
col(COL_X).alias("all_x"),
col(COL_Y).alias("all_y"),
col(COL_T).alias("all_t"),
col(COL_POLARITY).alias("all_polarity"),
])
.with_columns([
(col("event_count").cast(DataType::Float64) / col("time_span")).alias("event_rate"),
])
.filter(
col("event_rate").gt_eq(lit(threshold_events_per_sec)),
)
.select([
col("all_x").explode().alias(COL_X),
col("all_y").explode().alias(COL_Y),
col("all_t").explode().alias(COL_T),
col("all_polarity").explode().alias(COL_POLARITY),
])
.sort([COL_T], SortMultipleOptions::default());
debug!("Background activity filter applied using Polars group_by operations");
Ok(filtered_df)
}
/// Runs the configured stages of the multi-scale pipeline in a fixed order:
/// refractory period, then spatial-temporal correlation, then background
/// activity. A stage whose parameters are `None` is skipped, and
/// `filter.temporal_correlation_filter` is never applied here.
///
/// # Errors
///
/// Returns the first error produced by any stage.
#[cfg_attr(unix, instrument(skip(df), fields(filter = ?filter)))]
pub fn apply_multi_scale_filter_polars(
    df: LazyFrame,
    filter: &DenoiseFilter,
) -> PolarsResult<LazyFrame> {
    debug!("Applying multi-scale filter: {:?}", filter);

    let after_refractory = match filter.refractory_filter.as_ref() {
        Some(stage) => {
            let out = apply_refractory_filter_polars(df, stage)?;
            debug!("Multi-scale: applied refractory filter");
            out
        }
        None => df,
    };

    let after_spatial = match filter.spatial_temporal_filter.as_ref() {
        Some(stage) => {
            let out = apply_spatial_temporal_filter_polars(after_refractory, stage)?;
            debug!("Multi-scale: applied spatial-temporal filter");
            out
        }
        None => after_refractory,
    };

    let after_background = match filter.background_threshold {
        Some(rate) => {
            let out = apply_background_activity_filter_polars(after_spatial, rate)?;
            debug!("Multi-scale: applied background activity filter");
            out
        }
        None => after_spatial,
    };

    debug!("Multi-scale filter pipeline completed using Polars operations");
    Ok(after_background)
}
#[cfg_attr(unix, instrument(skip(df), fields(method = ?filter.method)))]
pub fn apply_denoise_filter_polars(
df: LazyFrame,
filter: &DenoiseFilter,
) -> PolarsResult<LazyFrame> {
debug!("Applying Polars-first denoise filter: {:?}", filter.method);
match filter.method {
DenoiseMethod::RefractoryPeriod => {
apply_refractory_filter_polars(df, filter.refractory_filter.as_ref().unwrap())
}
DenoiseMethod::TemporalCorrelation => apply_temporal_correlation_filter_polars(
df,
filter.temporal_correlation_filter.as_ref().unwrap(),
),
DenoiseMethod::SpatialTemporalCorrelation => apply_spatial_temporal_filter_polars(
df,
filter.spatial_temporal_filter.as_ref().unwrap(),
),
DenoiseMethod::BackgroundActivity => {
apply_background_activity_filter_polars(df, filter.background_threshold.unwrap())
}
DenoiseMethod::MultiScale => apply_multi_scale_filter_polars(df, filter),
}
}
/// Convenience wrapper: applies the refractory-period filter with the settings
/// produced by `RefractoryFilter::new(period_us)`.
pub fn apply_refractory_period_polars(df: LazyFrame, period_us: f64) -> PolarsResult<LazyFrame> {
    apply_refractory_filter_polars(df, &RefractoryFilter::new(period_us))
}
/// Convenience wrapper: applies the temporal-correlation filter with the
/// settings produced by `TemporalCorrelationFilter::new(time_window_us, min_events)`.
pub fn apply_temporal_correlation_polars(
    df: LazyFrame,
    time_window_us: f64,
    min_events: usize,
) -> PolarsResult<LazyFrame> {
    apply_temporal_correlation_filter_polars(
        df,
        &TemporalCorrelationFilter::new(time_window_us, min_events),
    )
}
/// Convenience wrapper: applies the spatial-temporal correlation filter with
/// the settings produced by
/// `SpatialTemporalFilter::new(spatial_radius, time_window_us, min_neighbors)`.
pub fn apply_spatial_temporal_polars(
    df: LazyFrame,
    spatial_radius: u16,
    time_window_us: f64,
    min_neighbors: usize,
) -> PolarsResult<LazyFrame> {
    apply_spatial_temporal_filter_polars(
        df,
        &SpatialTemporalFilter::new(spatial_radius, time_window_us, min_neighbors),
    )
}
/// Convenience wrapper that forwards both arguments unchanged to
/// `apply_background_activity_filter_polars` and returns its result.
pub fn apply_background_activity_polars(
df: LazyFrame,
threshold_events_per_sec: f64,
) -> PolarsResult<LazyFrame> {
apply_background_activity_filter_polars(df, threshold_events_per_sec)
}