use crate::ev_filtering::config::Validatable;
use crate::ev_filtering::{FilterError, FilterResult};
use polars::prelude::*;
use std::collections::HashSet;
#[cfg(unix)]
use tracing::{debug, instrument, warn};
#[cfg(not(unix))]
macro_rules! debug {
($($args:tt)*) => {};
}
#[cfg(not(unix))]
macro_rules! info {
($($args:tt)*) => {};
}
#[cfg(not(unix))]
macro_rules! warn {
($($args:tt)*) => {
eprintln!("[WARN] {}", format!($($args)*))
};
}
#[cfg(not(unix))]
macro_rules! instrument {
($($args:tt)*) => {};
}
pub const COL_X: &str = "x";
pub const COL_Y: &str = "y";
pub const COL_T: &str = "t";
pub const COL_POLARITY: &str = "polarity";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Point {
pub x: u16,
pub y: u16,
}
impl Point {
pub fn new(x: u16, y: u16) -> Self {
Self { x, y }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RegionOfInterest {
pub min_x: u16,
pub max_x: u16,
pub min_y: u16,
pub max_y: u16,
}
impl RegionOfInterest {
pub fn new(min_x: u16, max_x: u16, min_y: u16, max_y: u16) -> Result<Self, FilterError> {
if min_x >= max_x {
return Err(FilterError::InvalidConfig(format!(
"min_x ({}) must be less than max_x ({})",
min_x, max_x
)));
}
if min_y >= max_y {
return Err(FilterError::InvalidConfig(format!(
"min_y ({}) must be less than max_y ({})",
min_y, max_y
)));
}
Ok(Self {
min_x,
max_x,
min_y,
max_y,
})
}
pub fn from_center(
center_x: u16,
center_y: u16,
width: u16,
height: u16,
) -> Result<Self, FilterError> {
let half_width = width / 2;
let half_height = height / 2;
let min_x = center_x.saturating_sub(half_width);
let max_x = center_x.saturating_add(half_width);
let min_y = center_y.saturating_sub(half_height);
let max_y = center_y.saturating_add(half_height);
Self::new(min_x, max_x, min_y, max_y)
}
pub fn full_sensor(width: u16, height: u16) -> Self {
Self {
min_x: 0,
max_x: width - 1,
min_y: 0,
max_y: height - 1,
}
}
pub fn width(&self) -> u16 {
self.max_x - self.min_x + 1
}
pub fn height(&self) -> u16 {
self.max_y - self.min_y + 1
}
pub fn area(&self) -> u32 {
self.width() as u32 * self.height() as u32
}
pub fn center(&self) -> (u16, u16) {
let center_x = (self.min_x + self.max_x) / 2;
let center_y = (self.min_y + self.max_y) / 2;
(center_x, center_y)
}
pub fn to_polars_expr(&self) -> Expr {
col(COL_X)
.gt_eq(lit(self.min_x as i64))
.and(col(COL_X).lt_eq(lit(self.max_x as i64)))
.and(col(COL_Y).gt_eq(lit(self.min_y as i64)))
.and(col(COL_Y).lt_eq(lit(self.max_y as i64)))
}
#[inline]
pub fn contains(&self, x: u16, y: u16) -> bool {
x >= self.min_x && x <= self.max_x && y >= self.min_y && y <= self.max_y
}
pub fn overlaps_with(&self, other: &RegionOfInterest) -> bool {
!(self.max_x < other.min_x
|| self.min_x > other.max_x
|| self.max_y < other.min_y
|| self.min_y > other.max_y)
}
pub fn intersection(&self, other: &RegionOfInterest) -> Option<RegionOfInterest> {
if !self.overlaps_with(other) {
return None;
}
let min_x = self.min_x.max(other.min_x);
let max_x = self.max_x.min(other.max_x);
let min_y = self.min_y.max(other.min_y);
let max_y = self.max_y.min(other.max_y);
Some(RegionOfInterest {
min_x,
max_x,
min_y,
max_y,
})
}
pub fn scale(&self, scale_x: f64, scale_y: f64) -> Result<RegionOfInterest, FilterError> {
let min_x = (self.min_x as f64 * scale_x).round() as u16;
let max_x = (self.max_x as f64 * scale_x).round() as u16;
let min_y = (self.min_y as f64 * scale_y).round() as u16;
let max_y = (self.max_y as f64 * scale_y).round() as u16;
Self::new(min_x, max_x, min_y, max_y)
}
pub fn description(&self) -> String {
format!(
"{}x{} to {}x{} ({}x{} pixels)",
self.min_x,
self.min_y,
self.max_x,
self.max_y,
self.width(),
self.height()
)
}
}
impl std::fmt::Display for RegionOfInterest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.description())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CircularROI {
pub center_x: u16,
pub center_y: u16,
pub radius: u16,
}
impl CircularROI {
pub fn new(center_x: u16, center_y: u16, radius: u16) -> Self {
Self {
center_x,
center_y,
radius,
}
}
pub fn to_polars_expr(&self) -> Expr {
let dx = col(COL_X).cast(DataType::Float64) - lit(self.center_x as f64);
let dy = col(COL_Y).cast(DataType::Float64) - lit(self.center_y as f64);
let distance_squared = dx.clone() * dx + dy.clone() * dy;
let radius_squared = lit((self.radius as f64).powi(2));
distance_squared.lt_eq(radius_squared)
}
#[inline]
pub fn contains(&self, x: u16, y: u16) -> bool {
let dx = (x as i32 - self.center_x as i32) as f64;
let dy = (y as i32 - self.center_y as i32) as f64;
let distance_squared = dx * dx + dy * dy;
distance_squared <= (self.radius as f64).powi(2)
}
pub fn bounding_box(&self) -> RegionOfInterest {
let min_x = self.center_x.saturating_sub(self.radius);
let max_x = self.center_x.saturating_add(self.radius);
let min_y = self.center_y.saturating_sub(self.radius);
let max_y = self.center_y.saturating_add(self.radius);
RegionOfInterest::new(min_x, max_x, min_y, max_y)
.unwrap_or_else(|_| RegionOfInterest::new(0, u16::MAX, 0, u16::MAX).unwrap())
}
pub fn area(&self) -> f64 {
std::f64::consts::PI * (self.radius as f64).powi(2)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PolygonROI {
pub vertices: Vec<Point>,
bounding_box: Option<RegionOfInterest>,
}
impl PolygonROI {
pub fn new(vertices: Vec<Point>) -> Result<Self, FilterError> {
if vertices.len() < 3 {
return Err(FilterError::InvalidConfig(
"Polygon must have at least 3 vertices".to_string(),
));
}
let mut polygon = Self {
vertices,
bounding_box: None,
};
polygon.update_bounding_box();
Ok(polygon)
}
pub fn triangle(p1: Point, p2: Point, p3: Point) -> Result<Self, FilterError> {
Self::new(vec![p1, p2, p3])
}
pub fn rectangle(min_x: u16, max_x: u16, min_y: u16, max_y: u16) -> Result<Self, FilterError> {
if min_x >= max_x || min_y >= max_y {
return Err(FilterError::InvalidConfig(
"Invalid rectangle bounds".to_string(),
));
}
let vertices = vec![
Point::new(min_x, min_y),
Point::new(max_x, min_y),
Point::new(max_x, max_y),
Point::new(min_x, max_y),
];
Self::new(vertices)
}
fn update_bounding_box(&mut self) {
if self.vertices.is_empty() {
self.bounding_box = None;
return;
}
let mut min_x = self.vertices[0].x;
let mut max_x = self.vertices[0].x;
let mut min_y = self.vertices[0].y;
let mut max_y = self.vertices[0].y;
for vertex in &self.vertices {
min_x = min_x.min(vertex.x);
max_x = max_x.max(vertex.x);
min_y = min_y.min(vertex.y);
max_y = max_y.max(vertex.y);
}
self.bounding_box = Some(
RegionOfInterest::new(min_x, max_x, min_y, max_y)
.unwrap_or_else(|_| RegionOfInterest::new(0, u16::MAX, 0, u16::MAX).unwrap()),
);
}
pub fn bounding_box(&self) -> Option<&RegionOfInterest> {
self.bounding_box.as_ref()
}
pub fn to_polars_expr(&self) -> Expr {
match &self.bounding_box {
Some(bbox) => bbox.to_polars_expr(),
None => lit(false), }
}
#[inline]
pub fn contains(&self, x: u16, y: u16) -> bool {
if let Some(bbox) = &self.bounding_box {
if !bbox.contains(x, y) {
return false;
}
}
let mut inside = false;
let test_x = x as f64;
let test_y = y as f64;
let mut j = self.vertices.len() - 1;
for i in 0..self.vertices.len() {
let vi = &self.vertices[i];
let vj = &self.vertices[j];
let vi_x = vi.x as f64;
let vi_y = vi.y as f64;
let vj_x = vj.x as f64;
let vj_y = vj.y as f64;
if ((vi_y > test_y) != (vj_y > test_y))
&& (test_x < (vj_x - vi_x) * (test_y - vi_y) / (vj_y - vi_y) + vi_x)
{
inside = !inside;
}
j = i;
}
inside
}
pub fn area(&self) -> f64 {
if self.vertices.len() < 3 {
return 0.0;
}
let mut area = 0.0;
let n = self.vertices.len();
for i in 0..n {
let j = (i + 1) % n;
let xi = self.vertices[i].x as f64;
let yi = self.vertices[i].y as f64;
let xj = self.vertices[j].x as f64;
let yj = self.vertices[j].y as f64;
area += xi * yj - xj * yi;
}
(area / 2.0).abs()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ROICombination {
Union,
Intersection,
Difference,
SymmetricDifference,
}
#[derive(Debug, Clone)]
pub struct MultipleROIs {
pub rois: Vec<RegionOfInterest>,
pub circular_rois: Vec<CircularROI>,
pub polygon_rois: Vec<PolygonROI>,
pub combination: ROICombination,
}
impl MultipleROIs {
pub fn new(combination: ROICombination) -> Self {
Self {
rois: Vec::new(),
circular_rois: Vec::new(),
polygon_rois: Vec::new(),
combination,
}
}
pub fn add_roi(mut self, roi: RegionOfInterest) -> Self {
self.rois.push(roi);
self
}
pub fn add_circular_roi(mut self, circular_roi: CircularROI) -> Self {
self.circular_rois.push(circular_roi);
self
}
pub fn add_polygon_roi(mut self, polygon_roi: PolygonROI) -> Self {
self.polygon_rois.push(polygon_roi);
self
}
pub fn to_polars_expr(&self) -> Option<Expr> {
let mut expressions = Vec::new();
for roi in &self.rois {
expressions.push(roi.to_polars_expr());
}
for circular_roi in &self.circular_rois {
expressions.push(circular_roi.to_polars_expr());
}
for polygon_roi in &self.polygon_rois {
expressions.push(polygon_roi.to_polars_expr());
}
if expressions.is_empty() {
return None;
}
match self.combination {
ROICombination::Union => Some(
expressions
.into_iter()
.reduce(|acc, expr| acc.or(expr))
.unwrap(),
),
ROICombination::Intersection => Some(
expressions
.into_iter()
.reduce(|acc, expr| acc.and(expr))
.unwrap(),
),
ROICombination::Difference => {
if expressions.is_empty() {
None
} else {
let first = expressions[0].clone();
if expressions.len() == 1 {
Some(first)
} else {
let others = expressions
.into_iter()
.skip(1)
.reduce(|acc, expr| acc.or(expr))
.unwrap();
Some(first.and(others.not()))
}
}
}
ROICombination::SymmetricDifference => {
warn!("SymmetricDifference ROI combination approximated as Union in Polars expressions");
Some(
expressions
.into_iter()
.reduce(|acc, expr| acc.or(expr))
.unwrap(),
)
}
}
}
pub fn contains(&self, x: u16, y: u16) -> bool {
let mut matches = Vec::new();
for roi in &self.rois {
matches.push(roi.contains(x, y));
}
for circular_roi in &self.circular_rois {
matches.push(circular_roi.contains(x, y));
}
for polygon_roi in &self.polygon_rois {
matches.push(polygon_roi.contains(x, y));
}
if matches.is_empty() {
return false;
}
match self.combination {
ROICombination::Union => matches.iter().any(|&m| m),
ROICombination::Intersection => matches.iter().all(|&m| m),
ROICombination::Difference => {
matches.first().copied().unwrap_or(false) && !matches.iter().skip(1).any(|&m| m)
}
ROICombination::SymmetricDifference => matches.iter().filter(|&&m| m).count() % 2 == 1,
}
}
pub fn total_rois(&self) -> usize {
self.rois.len() + self.circular_rois.len() + self.polygon_rois.len()
}
}
#[derive(Debug, Clone)]
pub struct SpatialFilter {
pub roi: Option<RegionOfInterest>,
pub excluded_pixels: Option<HashSet<(u16, u16)>>,
pub included_pixels: Option<HashSet<(u16, u16)>>,
pub validate_coordinates: bool,
pub max_coordinate: Option<(u16, u16)>,
pub circular_roi: Option<CircularROI>,
pub polygon_roi: Option<PolygonROI>,
pub multiple_rois: Option<MultipleROIs>,
}
impl SpatialFilter {
pub fn roi(min_x: u16, max_x: u16, min_y: u16, max_y: u16) -> Self {
Self {
roi: Some(RegionOfInterest::new(min_x, max_x, min_y, max_y).unwrap()),
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: None,
multiple_rois: None,
}
}
pub fn from_roi(roi: RegionOfInterest) -> Self {
Self {
roi: Some(roi),
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: None,
multiple_rois: None,
}
}
pub fn exclude_pixels(pixels: HashSet<(u16, u16)>) -> Self {
warn!("Excluded pixels filtering is not optimized for Polars - consider using ROI-based filtering");
Self {
roi: None,
excluded_pixels: Some(pixels),
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: None,
multiple_rois: None,
}
}
pub fn include_only_pixels(pixels: HashSet<(u16, u16)>) -> Self {
warn!("Included pixels filtering is not optimized for Polars - consider using ROI-based filtering");
Self {
roi: None,
excluded_pixels: None,
included_pixels: Some(pixels),
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: None,
multiple_rois: None,
}
}
pub fn sensor_bounds(width: u16, height: u16) -> Self {
Self {
roi: Some(RegionOfInterest::full_sensor(width, height)),
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: Some((width - 1, height - 1)),
circular_roi: None,
polygon_roi: None,
multiple_rois: None,
}
}
pub fn circular(center_x: u16, center_y: u16, radius: u16) -> Self {
Self {
roi: None,
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: Some(CircularROI::new(center_x, center_y, radius)),
polygon_roi: None,
multiple_rois: None,
}
}
pub fn from_circular_roi(circular_roi: CircularROI) -> Self {
Self {
roi: None,
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: Some(circular_roi),
polygon_roi: None,
multiple_rois: None,
}
}
pub fn polygon(vertices: Vec<Point>) -> Result<Self, FilterError> {
let polygon_roi = PolygonROI::new(vertices)?;
Ok(Self {
roi: None,
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: Some(polygon_roi),
multiple_rois: None,
})
}
pub fn triangle(p1: Point, p2: Point, p3: Point) -> Result<Self, FilterError> {
let polygon_roi = PolygonROI::triangle(p1, p2, p3)?;
Ok(Self {
roi: None,
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: Some(polygon_roi),
multiple_rois: None,
})
}
pub fn from_polygon_roi(polygon_roi: PolygonROI) -> Self {
Self {
roi: None,
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: Some(polygon_roi),
multiple_rois: None,
}
}
pub fn multiple_rois(multiple_rois: MultipleROIs) -> Self {
Self {
roi: None,
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: None,
multiple_rois: Some(multiple_rois),
}
}
pub fn union() -> Self {
Self::multiple_rois(MultipleROIs::new(ROICombination::Union))
}
pub fn intersection() -> Self {
Self::multiple_rois(MultipleROIs::new(ROICombination::Intersection))
}
pub fn with_excluded_pixels(mut self, pixels: HashSet<(u16, u16)>) -> Self {
warn!(
"with_excluded_pixels is not optimized for Polars - consider using ROI-based filtering"
);
self.excluded_pixels = Some(pixels);
self
}
pub fn with_included_pixels(mut self, pixels: HashSet<(u16, u16)>) -> Self {
warn!(
"with_included_pixels is not optimized for Polars - consider using ROI-based filtering"
);
self.included_pixels = Some(pixels);
self
}
pub fn with_coordinate_validation(mut self, validate: bool) -> Self {
self.validate_coordinates = validate;
self
}
pub fn with_max_coordinates(mut self, max_x: u16, max_y: u16) -> Self {
self.max_coordinate = Some((max_x, max_y));
self
}
pub fn to_polars_expr(&self) -> PolarsResult<Option<Expr>> {
let mut conditions = Vec::new();
if let Some(roi) = &self.roi {
conditions.push(roi.to_polars_expr());
}
if let Some(circular_roi) = &self.circular_roi {
conditions.push(circular_roi.to_polars_expr());
}
if let Some(polygon_roi) = &self.polygon_roi {
conditions.push(polygon_roi.to_polars_expr());
}
if let Some(multiple_rois) = &self.multiple_rois {
if let Some(expr) = multiple_rois.to_polars_expr() {
conditions.push(expr);
}
}
if self.validate_coordinates {
if let Some((max_x, max_y)) = self.max_coordinate {
conditions.push(
col(COL_X)
.lt_eq(lit(max_x as i64))
.and(col(COL_Y).lt_eq(lit(max_y as i64)))
.and(col(COL_X).gt_eq(lit(0)))
.and(col(COL_Y).gt_eq(lit(0))),
);
}
}
match conditions.len() {
0 => Ok(None), 1 => Ok(Some(conditions.into_iter().next().unwrap())),
_ => {
let combined = conditions
.into_iter()
.reduce(|acc, cond| acc.and(cond))
.unwrap();
Ok(Some(combined))
}
}
}
#[inline]
pub fn passes_filter(&self, x: u16, y: u16) -> bool {
if let Some(roi) = &self.roi {
if !roi.contains(x, y) {
return false;
}
}
if let Some(circular_roi) = &self.circular_roi {
if !circular_roi.contains(x, y) {
return false;
}
}
if let Some(polygon_roi) = &self.polygon_roi {
if !polygon_roi.contains(x, y) {
return false;
}
}
if let Some(multiple_rois) = &self.multiple_rois {
if !multiple_rois.contains(x, y) {
return false;
}
}
if let Some(excluded) = &self.excluded_pixels {
if excluded.contains(&(x, y)) {
return false;
}
}
if let Some(included) = &self.included_pixels {
if !included.contains(&(x, y)) {
return false;
}
}
if self.validate_coordinates {
if let Some((max_x, max_y)) = self.max_coordinate {
if x > max_x || y > max_y {
return false;
}
}
}
true
}
pub fn description(&self) -> String {
let mut parts = Vec::new();
if let Some(roi) = &self.roi {
parts.push(format!("ROI: {}", roi.description()));
}
if let Some(circular_roi) = &self.circular_roi {
parts.push(format!(
"Circular ROI: center({}, {}) radius={}",
circular_roi.center_x, circular_roi.center_y, circular_roi.radius
));
}
if let Some(polygon_roi) = &self.polygon_roi {
parts.push(format!(
"Polygon ROI: {} vertices, area={:.1}",
polygon_roi.vertices.len(),
polygon_roi.area()
));
}
if let Some(multiple_rois) = &self.multiple_rois {
parts.push(format!(
"Multiple ROIs: {} ROIs with {:?} combination",
multiple_rois.total_rois(),
multiple_rois.combination
));
}
if let Some(excluded) = &self.excluded_pixels {
parts.push(format!("Excluded: {} pixels", excluded.len()));
}
if let Some(included) = &self.included_pixels {
parts.push(format!("Included: {} pixels", included.len()));
}
if let Some((max_x, max_y)) = self.max_coordinate {
parts.push(format!("Max coords: {}x{}", max_x, max_y));
}
if parts.is_empty() {
"No spatial constraints".to_string()
} else {
parts.join(", ")
}
}
pub fn apply_to_dataframe(&self, df: LazyFrame) -> PolarsResult<LazyFrame> {
apply_spatial_filter(df, self)
}
pub fn apply_to_dataframe_eager(&self, df: DataFrame) -> PolarsResult<DataFrame> {
apply_spatial_filter(df.lazy(), self)?.collect()
}
}
impl Default for SpatialFilter {
fn default() -> Self {
Self {
roi: None,
excluded_pixels: None,
included_pixels: None,
validate_coordinates: true,
max_coordinate: None,
circular_roi: None,
polygon_roi: None,
multiple_rois: None,
}
}
}
impl Validatable for SpatialFilter {
fn validate(&self) -> FilterResult<()> {
if let Some(roi) = &self.roi {
if roi.width() == 0 || roi.height() == 0 {
return Err(FilterError::InvalidConfig(
"ROI must have non-zero width and height".to_string(),
));
}
}
if let (Some(included), Some(excluded)) = (&self.included_pixels, &self.excluded_pixels) {
let intersection: Vec<_> = included.intersection(excluded).collect();
if !intersection.is_empty() {
warn!(
"Spatial filter has {} pixels in both included and excluded sets",
intersection.len()
);
}
}
if let Some((max_x, max_y)) = self.max_coordinate {
if max_x == 0 || max_y == 0 {
return Err(FilterError::InvalidConfig(
"Maximum coordinates must be positive".to_string(),
));
}
}
Ok(())
}
}
#[cfg_attr(unix, instrument(skip(df), fields(filter = ?filter.description())))]
pub fn apply_spatial_filter(df: LazyFrame, filter: &SpatialFilter) -> PolarsResult<LazyFrame> {
debug!("Applying spatial filter: {}", filter.description());
let mut filtered_df = df;
if let Some(expr) = filter.to_polars_expr()? {
debug!("Applying ROI and coordinate-based filtering with Polars expressions");
filtered_df = filtered_df.filter(expr);
}
filtered_df = apply_pixel_based_filter_polars(filtered_df, filter)?;
debug!("Spatial filtering completed using pure Polars operations");
Ok(filtered_df)
}
#[cfg_attr(unix, instrument(skip(df, filter)))]
fn apply_pixel_based_filter_polars(
df: LazyFrame,
filter: &SpatialFilter,
) -> PolarsResult<LazyFrame> {
let mut filtered_df = df;
if let Some(excluded_pixels) = &filter.excluded_pixels {
if !excluded_pixels.is_empty() {
debug!(
"Applying excluded pixels filter using Polars expressions ({} pixels)",
excluded_pixels.len()
);
let excluded_coords: Vec<(i64, i64)> = excluded_pixels
.iter()
.map(|(x, y)| (*x as i64, *y as i64))
.collect();
if excluded_coords.len() <= 100 {
let mut exclusion_expr: Option<Expr> = None;
for (x, y) in excluded_coords {
let pixel_expr = col(COL_X).eq(lit(x)).and(col(COL_Y).eq(lit(y)));
exclusion_expr = match exclusion_expr {
None => Some(pixel_expr),
Some(existing) => Some(existing.or(pixel_expr)),
};
}
if let Some(expr) = exclusion_expr {
filtered_df = filtered_df.filter(expr.not());
}
} else {
let excluded_x: Vec<i64> = excluded_coords.iter().map(|(x, _)| *x).collect();
let excluded_y: Vec<i64> = excluded_coords.iter().map(|(_, y)| *y).collect();
let excluded_df = df![
"excluded_x" => excluded_x,
"excluded_y" => excluded_y,
]?
.lazy();
filtered_df = filtered_df
.join(
excluded_df,
[col(COL_X), col(COL_Y)],
[col("excluded_x"), col("excluded_y")],
JoinArgs::new(JoinType::Left).with_suffix(Some("_right".into())),
)
.filter(col("excluded_x_right").is_null());
}
}
}
if let Some(included_pixels) = &filter.included_pixels {
if !included_pixels.is_empty() {
debug!(
"Applying included pixels filter using Polars expressions ({} pixels)",
included_pixels.len()
);
let included_coords: Vec<(i64, i64)> = included_pixels
.iter()
.map(|(x, y)| (*x as i64, *y as i64))
.collect();
if included_coords.len() <= 100 {
let mut inclusion_expr: Option<Expr> = None;
for (x, y) in included_coords {
let pixel_expr = col(COL_X).eq(lit(x)).and(col(COL_Y).eq(lit(y)));
inclusion_expr = match inclusion_expr {
None => Some(pixel_expr),
Some(existing) => Some(existing.or(pixel_expr)),
};
}
if let Some(expr) = inclusion_expr {
filtered_df = filtered_df.filter(expr);
}
} else {
let included_x: Vec<i64> = included_coords.iter().map(|(x, _)| *x).collect();
let included_y: Vec<i64> = included_coords.iter().map(|(_, y)| *y).collect();
let included_df = df![
"included_x" => included_x,
"included_y" => included_y,
]?
.lazy();
filtered_df = filtered_df
.join(
included_df,
[col(COL_X), col(COL_Y)],
[col("included_x"), col("included_y")],
JoinArgs::new(JoinType::Inner),
)
.select([col(COL_X), col(COL_Y), col(COL_T), col(COL_POLARITY)]);
}
} else {
debug!("Empty included pixels set - filtering out all events");
filtered_df = filtered_df.filter(lit(false));
}
}
Ok(filtered_df)
}
#[cfg_attr(unix, instrument(skip(df)))]
pub fn filter_roi_polars(
df: LazyFrame,
min_x: u16,
max_x: u16,
min_y: u16,
max_y: u16,
) -> PolarsResult<LazyFrame> {
let roi = RegionOfInterest::new(min_x, max_x, min_y, max_y)
.map_err(|e| PolarsError::ComputeError(format!("Invalid ROI: {}", e).into()))?;
Ok(df.filter(roi.to_polars_expr()))
}
#[cfg_attr(unix, instrument(skip(df)))]
pub fn filter_circular_roi_polars(
df: LazyFrame,
center_x: u16,
center_y: u16,
radius: u16,
) -> PolarsResult<LazyFrame> {
let circular_roi = CircularROI::new(center_x, center_y, radius);
Ok(df.filter(circular_roi.to_polars_expr()))
}
#[cfg_attr(unix, instrument(skip(df)))]
pub fn get_spatial_statistics(df: LazyFrame) -> PolarsResult<DataFrame> {
df.select([
col(COL_X).min().alias("min_x"),
col(COL_X).max().alias("max_x"),
col(COL_X).mean().alias("mean_x"),
col(COL_X).std(1).alias("std_x"),
col(COL_Y).min().alias("min_y"),
col(COL_Y).max().alias("max_y"),
col(COL_Y).mean().alias("mean_y"),
col(COL_Y).std(1).alias("std_y"),
len().alias("total_events"),
(col(COL_X).max() - col(COL_X).min()).alias("width"),
(col(COL_Y).max() - col(COL_Y).min()).alias("height"),
len().alias("unique_pixels"),
])
.collect()
}
#[cfg_attr(unix, instrument(skip(df)))]
pub fn create_spatial_histogram(
df: LazyFrame,
bin_size_x: u16,
bin_size_y: u16,
) -> PolarsResult<DataFrame> {
df.with_columns([
((col(COL_X).cast(DataType::Float64) / lit(bin_size_x as f64))
.cast(DataType::Int64)
.cast(DataType::Float64)
* lit(bin_size_x as f64))
.alias("bin_x"),
((col(COL_Y).cast(DataType::Float64) / lit(bin_size_y as f64))
.cast(DataType::Int64)
.cast(DataType::Float64)
* lit(bin_size_y as f64))
.alias("bin_y"),
])
.group_by([col("bin_x"), col("bin_y")])
.agg([
len().alias("event_count"),
col(COL_POLARITY).sum().alias("positive_events"),
col(COL_T).min().alias("first_event_time"),
col(COL_T).max().alias("last_event_time"),
])
.with_columns([
(col("event_count") - col("positive_events")).alias("negative_events"),
(col("last_event_time") - col("first_event_time")).alias("temporal_span"),
])
.sort(["bin_x", "bin_y"], SortMultipleOptions::default())
.collect()
}
#[cfg_attr(unix, instrument(skip(df)))]
pub fn find_spatial_hotspots_polars(
df: LazyFrame,
grid_size: u16,
threshold_percentile: f64,
) -> PolarsResult<DataFrame> {
let grid_df = df
.with_columns([
((col(COL_X) / lit(grid_size as i64)) * lit(grid_size as i64)).alias("grid_x"),
((col(COL_Y) / lit(grid_size as i64)) * lit(grid_size as i64)).alias("grid_y"),
])
.group_by([col("grid_x"), col("grid_y")])
.agg([
len().alias("event_count"),
col(COL_T).min().alias("first_event"),
col(COL_T).max().alias("last_event"),
])
.with_columns([(col("last_event") - col("first_event")).alias("temporal_span")]);
let grid_with_threshold = grid_df.with_columns([col("event_count")
.quantile(lit(threshold_percentile / 100.0), QuantileMethod::Linear)
.alias("threshold")]);
grid_with_threshold
.filter(col("event_count").gt(col("threshold")))
.with_columns([
(col("grid_x") + lit(grid_size as i64 / 2)).alias("center_x"),
(col("grid_y") + lit(grid_size as i64 / 2)).alias("center_y"),
])
.sort(
["event_count"],
SortMultipleOptions::default().with_order_descending(true),
)
.collect()
}
#[cfg_attr(unix, instrument(skip(df)))]
pub fn split_by_spatial_grid_polars(
df: LazyFrame,
grid_width: u16,
grid_height: u16,
sensor_width: u16,
sensor_height: u16,
) -> PolarsResult<DataFrame> {
if grid_width == 0 || grid_height == 0 {
return Err(PolarsError::ComputeError(
"Grid dimensions must be positive".into(),
));
}
let cell_width = sensor_width / grid_width;
let cell_height = sensor_height / grid_height;
df.with_columns([
(col(COL_X) / lit(cell_width as i64)).alias("grid_x"),
(col(COL_Y) / lit(cell_height as i64)).alias("grid_y"),
])
.group_by([col("grid_x"), col("grid_y")])
.agg([
len().alias("event_count"),
col(COL_T).min().alias("first_event"),
col(COL_T).max().alias("last_event"),
col(COL_POLARITY).sum().alias("positive_events"),
])
.with_columns([
(col("event_count") - col("positive_events")).alias("negative_events"),
(col("last_event") - col("first_event")).alias("temporal_span"),
])
.sort(["grid_x", "grid_y"], SortMultipleOptions::default())
.collect()
}
#[cfg_attr(unix, instrument(skip(df)))]
pub fn create_pixel_mask_polars(
df: LazyFrame,
_sensor_width: u16,
_sensor_height: u16,
) -> PolarsResult<DataFrame> {
df.select([col(COL_X), col(COL_Y)])
.unique(None, UniqueKeepStrategy::Any)
.with_columns([lit(true).alias("has_events")])
.collect()
}
#[cfg_attr(unix, instrument(skip(df)))]
pub fn find_spatial_clusters_polars(
df: LazyFrame,
cluster_distance: u16,
min_cluster_size: usize,
) -> PolarsResult<DataFrame> {
let grid_size = cluster_distance;
df.with_columns([
((col(COL_X) / lit(grid_size as i64)) * lit(grid_size as i64)).alias("cluster_x"),
((col(COL_Y) / lit(grid_size as i64)) * lit(grid_size as i64)).alias("cluster_y"),
])
.group_by([col("cluster_x"), col("cluster_y")])
.agg([
len().alias("cluster_size"),
col(COL_T).min().alias("first_event"),
col(COL_T).max().alias("last_event"),
col(COL_X).mean().alias("center_x"),
col(COL_Y).mean().alias("center_y"),
])
.filter(col("cluster_size").gt_eq(lit(min_cluster_size as u32)))
.sort(
["cluster_size"],
SortMultipleOptions::default().with_order_descending(true),
)
.collect()
}
pub fn filter_by_roi_df(
df: LazyFrame,
min_x: u16,
max_x: u16,
min_y: u16,
max_y: u16,
) -> PolarsResult<LazyFrame> {
let filter = SpatialFilter::roi(min_x, max_x, min_y, max_y);
filter.apply_to_dataframe(df)
}
pub fn filter_by_circular_roi_df(
df: LazyFrame,
center_x: u16,
center_y: u16,
radius: u16,
) -> PolarsResult<LazyFrame> {
let filter = SpatialFilter::circular(center_x, center_y, radius);
filter.apply_to_dataframe(df)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{events_to_dataframe, Event};
fn create_test_events() -> Events {
vec![
Event {
t: 1.0,
x: 100,
y: 200,
polarity: true,
},
Event {
t: 2.0,
x: 150,
y: 250,
polarity: false,
},
Event {
t: 3.0,
x: 200,
y: 300,
polarity: true,
},
Event {
t: 4.0,
x: 50,
y: 100,
polarity: false,
},
Event {
t: 5.0,
x: 300,
y: 400,
polarity: true,
},
]
}
#[test]
fn test_roi_creation() {
let roi = RegionOfInterest::new(100, 200, 150, 250).unwrap();
assert_eq!(roi.min_x, 100);
assert_eq!(roi.max_x, 200);
assert_eq!(roi.min_y, 150);
assert_eq!(roi.max_y, 250);
assert_eq!(roi.width(), 101);
assert_eq!(roi.height(), 101);
assert_eq!(roi.area(), 101 * 101);
assert!(RegionOfInterest::new(200, 100, 150, 250).is_err());
}
#[test]
fn test_roi_polars_filtering() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let filtered = filter_roi_polars(df, 80, 180, 180, 280)?;
let result = filtered.collect()?;
assert_eq!(result.height(), 2);
Ok(())
}
#[test]
fn test_circular_roi_polars_filtering() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let filtered = filter_circular_roi_polars(df, 150, 250, 100)?;
let result = filtered.collect()?;
assert!(result.height() >= 1);
Ok(())
}
#[test]
fn test_spatial_filter_polars() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let filter = SpatialFilter::roi(80, 180, 180, 280);
let filtered = apply_spatial_filter(df, &filter)?;
let result = filtered.collect()?;
assert_eq!(result.height(), 2);
Ok(())
}
#[test]
fn test_spatial_filter_dataframe_native() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let filter = SpatialFilter::roi(80, 180, 180, 280);
let filtered = filter.apply_to_dataframe(df)?;
let result = filtered.collect()?;
assert_eq!(result.height(), 2);
Ok(())
}
#[test]
fn test_filter_by_roi_dataframe() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let filtered = filter_by_roi_df(df, 80, 180, 180, 280)?;
let result = filtered.collect()?;
assert_eq!(result.height(), 2);
Ok(())
}
#[test]
fn test_filter_by_circular_roi_dataframe() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let filtered = filter_by_circular_roi_df(df, 150, 250, 100)?;
let result = filtered.collect()?;
assert!(result.height() >= 1);
Ok(())
}
#[test]
fn test_spatial_filter_expressions() -> PolarsResult<()> {
let filter = SpatialFilter::roi(100, 200, 150, 250);
let expr = filter.to_polars_expr()?;
assert!(expr.is_some());
Ok(())
}
#[test]
fn test_spatial_statistics() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let stats = get_spatial_statistics(df)?;
assert_eq!(stats.height(), 1);
assert!(stats.width() >= 9);
Ok(())
}
#[test]
fn test_spatial_histogram() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let histogram = create_spatial_histogram(df, 50, 50)?;
assert!(histogram.height() > 0);
assert!(histogram.column("event_count").is_ok());
Ok(())
}
#[test]
fn test_hotspot_detection() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let hotspots = find_spatial_hotspots_polars(df, 100, 50.0)?;
assert!(hotspots.height() >= 0);
Ok(())
}
#[test]
fn test_spatial_grid_splitting() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let grid = split_by_spatial_grid_polars(df, 2, 2, 640, 480)?;
assert!(grid.height() > 0);
assert!(grid.column("event_count").is_ok());
Ok(())
}
#[test]
fn test_circular_roi_expressions() {
let circular_roi = CircularROI::new(100, 100, 50);
let _expr = circular_roi.to_polars_expr();
assert!(true); }
#[test]
fn test_multiple_rois_expressions() {
let roi1 = RegionOfInterest::new(100, 150, 100, 150).unwrap();
let roi2 = RegionOfInterest::new(200, 250, 200, 250).unwrap();
let multiple_rois = MultipleROIs::new(ROICombination::Union)
.add_roi(roi1)
.add_roi(roi2);
let expr = multiple_rois.to_polars_expr();
assert!(expr.is_some());
}
#[test]
fn test_polygon_roi_creation() {
let vertices = vec![
Point::new(100, 100),
Point::new(200, 100),
Point::new(150, 200),
];
let polygon_roi = PolygonROI::new(vertices).unwrap();
assert_eq!(polygon_roi.vertices.len(), 3);
assert!(polygon_roi.bounding_box().is_some());
}
#[test]
fn test_legacy_compatibility() {
let events = create_test_events();
let filtered = filter_by_roi(&events, 80, 180, 180, 280).unwrap();
assert_eq!(filtered.len(), 2);
let circular_filtered = filter_by_circular_roi(&events, 150, 250, 100).unwrap();
assert!(circular_filtered.len() >= 1);
}
#[test]
fn test_filter_validation() {
let filter = SpatialFilter::roi(100, 200, 150, 250);
assert!(filter.validate().is_ok());
let mut included = HashSet::new();
included.insert((100, 200));
let mut excluded = HashSet::new();
excluded.insert((100, 200));
let filter = SpatialFilter::default()
.with_included_pixels(included)
.with_excluded_pixels(excluded);
assert!(filter.validate().is_ok());
}
#[test]
fn test_empty_events() -> PolarsResult<()> {
let events = Vec::new();
let df = events_to_dataframe(&events)?.lazy();
let filter = SpatialFilter::roi(100, 200, 150, 250);
let filtered = apply_spatial_filter(df, &filter)?;
let result = filtered.collect()?;
assert_eq!(result.height(), 0);
Ok(())
}
#[test]
fn test_pixel_mask_creation_polars() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let mask = create_pixel_mask_polars(df, 640, 480)?;
assert_eq!(mask.height(), 5); assert!(mask.column("has_events").is_ok());
Ok(())
}
#[test]
fn test_spatial_clustering_polars() -> PolarsResult<()> {
let events = create_test_events();
let df = events_to_dataframe(&events)?.lazy();
let clusters = find_spatial_clusters_polars(df, 50, 1)?;
assert!(clusters.height() > 0);
assert!(clusters.column("cluster_size").is_ok());
Ok(())
}
}