use std::error::Error;
use std::fmt;
#[cfg(unix)]
use tracing::warn;
#[cfg(not(unix))]
macro_rules! warn {
($($args:tt)*) => {
eprintln!("[WARN] {}", format!($($args)*))
};
}
use crate::ev_filtering::{
DenoiseFilter, DownsamplingFilter, DropPixelFilter, HotPixelFilter, PolarityFilter,
SpatialFilter, TemporalFilter,
};
pub type FilterResult<T> = Result<T, FilterError>;
#[derive(Debug, Clone)]
pub enum FilterError {
InvalidConfig(String),
InvalidInput(String),
OutOfMemory(String),
Timeout(String),
ProcessingError(String),
IoError(String),
MathError(String),
PolarsError(String),
ParallelError(String),
}
impl fmt::Display for FilterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FilterError::InvalidConfig(msg) => write!(f, "Invalid filter configuration: {}", msg),
FilterError::InvalidInput(msg) => write!(f, "Invalid input data: {}", msg),
FilterError::OutOfMemory(msg) => write!(f, "Out of memory: {}", msg),
FilterError::Timeout(msg) => write!(f, "Operation timed out: {}", msg),
FilterError::ProcessingError(msg) => write!(f, "Processing error: {}", msg),
FilterError::IoError(msg) => write!(f, "I/O error: {}", msg),
FilterError::MathError(msg) => write!(f, "Mathematical error: {}", msg),
FilterError::PolarsError(msg) => write!(f, "Polars error: {}", msg),
FilterError::ParallelError(msg) => write!(f, "Parallel processing error: {}", msg),
}
}
}
impl Error for FilterError {}
impl From<std::io::Error> for FilterError {
fn from(error: std::io::Error) -> Self {
FilterError::IoError(error.to_string())
}
}
impl From<polars::error::PolarsError> for FilterError {
fn from(error: polars::error::PolarsError) -> Self {
FilterError::PolarsError(error.to_string())
}
}
#[derive(Debug, Clone, Default)]
pub struct FilterConfig {
pub temporal_filter: Option<TemporalFilter>,
pub spatial_filter: Option<SpatialFilter>,
pub polarity_filter: Option<PolarityFilter>,
pub hot_pixel_filter: Option<HotPixelFilter>,
pub denoise_filter: Option<DenoiseFilter>,
pub drop_pixel_filter: Option<DropPixelFilter>,
pub downsampling_filter: Option<DownsamplingFilter>,
pub ensure_temporal_order: bool,
pub timeout_seconds: f64,
pub enable_parallel: bool,
pub chunk_size: usize,
pub collect_stats: bool,
pub validate_input: bool,
pub memory_limit: usize,
}
impl FilterConfig {
pub fn new() -> Self {
Self {
temporal_filter: None,
spatial_filter: None,
polarity_filter: None,
hot_pixel_filter: None,
denoise_filter: None,
drop_pixel_filter: None,
downsampling_filter: None,
ensure_temporal_order: false,
timeout_seconds: 0.0,
enable_parallel: true,
chunk_size: 0,
collect_stats: false,
validate_input: true,
memory_limit: 0,
}
}
pub fn with_time_range(mut self, t_start: Option<f64>, t_end: Option<f64>) -> Self {
let filter = match (t_start, t_end) {
(Some(start), Some(end)) => TemporalFilter::time_window(start, end),
(Some(start), None) => TemporalFilter::from_time(start),
(None, Some(end)) => TemporalFilter::until_time(end),
(None, None) => return self, };
self.temporal_filter = Some(filter);
self
}
pub fn with_roi(mut self, x_min: u16, x_max: u16, y_min: u16, y_max: u16) -> Self {
self.spatial_filter = Some(SpatialFilter::roi(x_min, x_max, y_min, y_max));
self
}
pub fn with_polarity(mut self, polarities: Vec<i8>) -> Self {
self.polarity_filter = Some(PolarityFilter::from_values(polarities));
self
}
pub fn with_hot_pixel_removal(mut self, threshold_percentile: f64) -> Self {
self.hot_pixel_filter = Some(HotPixelFilter::percentile(threshold_percentile));
self
}
pub fn with_refractory_period(mut self, refractory_period_us: f64) -> Self {
self.denoise_filter = Some(DenoiseFilter::refractory(refractory_period_us));
self
}
pub fn with_downsampling(mut self, fraction: f64) -> Self {
self.downsampling_filter = Some(DownsamplingFilter::uniform(fraction));
self
}
pub fn with_excluded_pixels(mut self, excluded_pixels: Vec<(u16, u16)>) -> Self {
use std::collections::HashSet;
let pixel_set: HashSet<(u16, u16)> = excluded_pixels.into_iter().collect();
self.drop_pixel_filter = Some(DropPixelFilter::exclude(pixel_set));
self
}
pub fn with_temporal_filter(mut self, filter: TemporalFilter) -> Self {
self.temporal_filter = Some(filter);
self
}
pub fn with_spatial_filter(mut self, filter: SpatialFilter) -> Self {
self.spatial_filter = Some(filter);
self
}
pub fn with_polarity_filter(mut self, filter: PolarityFilter) -> Self {
self.polarity_filter = Some(filter);
self
}
pub fn with_hot_pixel_filter(mut self, filter: HotPixelFilter) -> Self {
self.hot_pixel_filter = Some(filter);
self
}
pub fn with_denoise_filter(mut self, filter: DenoiseFilter) -> Self {
self.denoise_filter = Some(filter);
self
}
pub fn with_drop_pixel_filter(mut self, filter: DropPixelFilter) -> Self {
self.drop_pixel_filter = Some(filter);
self
}
pub fn with_downsampling_filter(mut self, filter: DownsamplingFilter) -> Self {
self.downsampling_filter = Some(filter);
self
}
pub fn with_ensure_temporal_order(mut self, ensure: bool) -> Self {
self.ensure_temporal_order = ensure;
self
}
pub fn with_timeout(mut self, seconds: f64) -> Self {
self.timeout_seconds = seconds;
self
}
pub fn with_parallel_processing(mut self, enable: bool) -> Self {
self.enable_parallel = enable;
self
}
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size;
self
}
pub fn with_stats_collection(mut self, collect: bool) -> Self {
self.collect_stats = collect;
self
}
pub fn with_input_validation(mut self, validate: bool) -> Self {
self.validate_input = validate;
self
}
pub fn with_memory_limit(mut self, limit: usize) -> Self {
self.memory_limit = limit;
self
}
pub fn noise_removal() -> Self {
Self::new()
.with_hot_pixel_removal(99.5)
.with_refractory_period(1000.0)
.with_input_validation(true)
.with_stats_collection(true)
}
pub fn aggressive_noise_removal() -> Self {
Self::new()
.with_hot_pixel_removal(99.0)
.with_refractory_period(500.0)
.with_input_validation(true)
.with_stats_collection(true)
}
pub fn high_throughput() -> Self {
Self::new()
.with_hot_pixel_removal(98.0)
.with_refractory_period(2000.0)
.with_downsampling(0.5)
.with_parallel_processing(true)
.with_chunk_size(1_000_000)
.with_input_validation(false) }
pub fn quality_preservation() -> Self {
Self::new()
.with_hot_pixel_removal(99.9)
.with_input_validation(true)
.with_stats_collection(true)
.with_ensure_temporal_order(true)
}
pub fn debug() -> Self {
Self::new()
.with_hot_pixel_removal(99.5)
.with_refractory_period(1000.0)
.with_parallel_processing(false)
.with_chunk_size(10_000)
.with_input_validation(true)
.with_stats_collection(true)
.with_ensure_temporal_order(true)
}
pub fn has_filters(&self) -> bool {
self.temporal_filter.is_some()
|| self.spatial_filter.is_some()
|| self.polarity_filter.is_some()
|| self.hot_pixel_filter.is_some()
|| self.denoise_filter.is_some()
|| self.drop_pixel_filter.is_some()
|| self.downsampling_filter.is_some()
}
pub fn filter_count(&self) -> usize {
let mut count = 0;
if self.temporal_filter.is_some() {
count += 1;
}
if self.spatial_filter.is_some() {
count += 1;
}
if self.polarity_filter.is_some() {
count += 1;
}
if self.hot_pixel_filter.is_some() {
count += 1;
}
if self.denoise_filter.is_some() {
count += 1;
}
if self.drop_pixel_filter.is_some() {
count += 1;
}
if self.downsampling_filter.is_some() {
count += 1;
}
count
}
pub fn description(&self) -> String {
let mut parts = Vec::new();
if let Some(_filter) = &self.temporal_filter {
parts.push("Temporal(filter)".to_string());
}
if let Some(_filter) = &self.spatial_filter {
parts.push("Spatial(filter)".to_string());
}
if let Some(_filter) = &self.polarity_filter {
parts.push("Polarity(filter)".to_string());
}
if let Some(_filter) = &self.hot_pixel_filter {
parts.push("HotPixel(filter)".to_string());
}
if let Some(_filter) = &self.denoise_filter {
parts.push("Denoise(filter)".to_string());
}
if let Some(_filter) = &self.drop_pixel_filter {
parts.push("DropPixel(filter)".to_string());
}
if let Some(_filter) = &self.downsampling_filter {
parts.push("Downsample(filter)".to_string());
}
if parts.is_empty() {
"No filters configured".to_string()
} else {
format!("Filters: [{}]", parts.join(", "))
}
}
pub fn validate(&self) -> FilterResult<()> {
if self.timeout_seconds < 0.0 {
return Err(FilterError::InvalidConfig(
"Timeout cannot be negative".to_string(),
));
}
if self.chunk_size == 1 {
return Err(FilterError::InvalidConfig(
"Chunk size of 1 is inefficient, use 0 for automatic or larger value".to_string(),
));
}
if let Some(filter) = &self.temporal_filter {
filter.validate()?;
}
if let Some(filter) = &self.spatial_filter {
filter.validate()?;
}
if let Some(filter) = &self.polarity_filter {
filter.validate()?;
}
if let Some(filter) = &self.hot_pixel_filter {
filter.validate()?;
}
if let Some(filter) = &self.denoise_filter {
filter.validate()?;
}
if let Some(filter) = &self.drop_pixel_filter {
filter.validate()?;
}
if let Some(filter) = &self.downsampling_filter {
filter.validate()?;
}
if let (Some(_temporal), Some(_downsampling)) =
(&self.temporal_filter, &self.downsampling_filter)
{
warn!("Temporal filtering followed by downsampling may produce unexpected results");
}
if self.memory_limit > 0 && self.memory_limit < 1024 * 1024 {
warn!(
"Memory limit of {} bytes is very low and may cause performance issues",
self.memory_limit
);
}
if !self.enable_parallel && self.chunk_size > 1_000_000 {
warn!(
"Large chunk size ({}) with parallel processing disabled may cause memory issues",
self.chunk_size
);
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct ProcessingConfig {
pub use_streaming: bool,
pub stream_chunk_size: usize,
pub num_threads: usize,
pub enable_simd: bool,
pub memory_target: usize,
pub progress_interval: usize,
pub verbose_logging: bool,
}
impl Default for ProcessingConfig {
fn default() -> Self {
Self {
use_streaming: true,
stream_chunk_size: 1_000_000, num_threads: 0, enable_simd: true,
memory_target: 1024 * 1024 * 1024, progress_interval: 10_000_000, verbose_logging: false,
}
}
}
impl ProcessingConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_streaming(mut self, enable: bool) -> Self {
self.use_streaming = enable;
self
}
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.stream_chunk_size = size;
self
}
pub fn with_threads(mut self, num_threads: usize) -> Self {
self.num_threads = num_threads;
self
}
pub fn with_simd(mut self, enable: bool) -> Self {
self.enable_simd = enable;
self
}
pub fn with_memory_target(mut self, target: usize) -> Self {
self.memory_target = target;
self
}
pub fn with_progress_interval(mut self, interval: usize) -> Self {
self.progress_interval = interval;
self
}
pub fn with_verbose_logging(mut self, enable: bool) -> Self {
self.verbose_logging = enable;
self
}
pub fn high_performance() -> Self {
Self {
use_streaming: false, stream_chunk_size: 10_000_000, num_threads: 0, enable_simd: true,
memory_target: 4 * 1024 * 1024 * 1024, progress_interval: 50_000_000, verbose_logging: false,
}
}
pub fn memory_efficient() -> Self {
Self {
use_streaming: true,
stream_chunk_size: 100_000, num_threads: 2, enable_simd: true,
memory_target: 256 * 1024 * 1024, progress_interval: 1_000_000, verbose_logging: false,
}
}
pub fn debug() -> Self {
Self {
use_streaming: true,
stream_chunk_size: 10_000, num_threads: 1, enable_simd: false, memory_target: 100 * 1024 * 1024, progress_interval: 10_000, verbose_logging: true,
}
}
}
pub trait Validatable {
fn validate(&self) -> FilterResult<()>;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ev_filtering::{PolarityFilter, SpatialFilter, TemporalFilter};
#[test]
fn test_empty_config() {
let config = FilterConfig::new();
assert!(!config.has_filters());
assert_eq!(config.filter_count(), 0);
assert!(config.validate().is_ok());
}
#[test]
fn test_config_builder() {
let config = FilterConfig::new()
.with_temporal_filter(TemporalFilter::new(1.0, 2.0))
.with_spatial_filter(SpatialFilter::roi(0, 100, 0, 100))
.with_polarity_filter(PolarityFilter::positive_only())
.with_ensure_temporal_order(true)
.with_timeout(30.0)
.with_parallel_processing(true);
assert!(config.has_filters());
assert_eq!(config.filter_count(), 3);
assert!(config.ensure_temporal_order);
assert_eq!(config.timeout_seconds, 30.0);
assert!(config.enable_parallel);
}
#[test]
fn test_convenience_builders() {
let config = FilterConfig::new()
.with_time_range(Some(0.1), Some(0.5))
.with_roi(100, 500, 100, 400)
.with_polarity(vec![1])
.with_hot_pixel_removal(99.9)
.with_refractory_period(1000.0);
assert!(config.has_filters());
assert_eq!(config.filter_count(), 5);
assert!(config.temporal_filter.is_some());
assert!(config.spatial_filter.is_some());
assert!(config.polarity_filter.is_some());
assert!(config.hot_pixel_filter.is_some());
assert!(config.denoise_filter.is_some());
}
#[test]
fn test_preset_configurations() {
let noise_config = FilterConfig::noise_removal();
assert!(noise_config.hot_pixel_filter.is_some());
assert!(noise_config.denoise_filter.is_some());
assert!(noise_config.validate_input);
assert!(noise_config.collect_stats);
let throughput_config = FilterConfig::high_throughput();
assert!(throughput_config.hot_pixel_filter.is_some());
assert!(throughput_config.denoise_filter.is_some());
assert!(throughput_config.downsampling_filter.is_some());
assert!(throughput_config.enable_parallel);
assert!(!throughput_config.validate_input);
let quality_config = FilterConfig::quality_preservation();
assert!(quality_config.hot_pixel_filter.is_some());
assert!(quality_config.denoise_filter.is_none()); assert!(quality_config.validate_input);
assert!(quality_config.ensure_temporal_order);
let debug_config = FilterConfig::debug();
assert!(!debug_config.enable_parallel);
assert_eq!(debug_config.chunk_size, 10_000);
assert!(debug_config.validate_input);
assert!(debug_config.collect_stats);
}
#[test]
fn test_config_validation() {
let config = FilterConfig::new().with_temporal_filter(TemporalFilter::new(1.0, 2.0));
assert!(config.validate().is_ok());
let config = FilterConfig::new().with_timeout(-1.0);
assert!(config.validate().is_err());
let config = FilterConfig::new().with_chunk_size(1);
assert!(config.validate().is_err());
}
#[test]
fn test_config_description() {
let config = FilterConfig::new()
.with_temporal_filter(TemporalFilter::new(1.0, 2.0))
.with_polarity_filter(PolarityFilter::positive_only());
let description = config.description();
assert!(description.contains("Temporal"));
assert!(description.contains("Polarity"));
}
#[test]
fn test_processing_config_presets() {
let high_perf = ProcessingConfig::high_performance();
assert!(!high_perf.use_streaming);
assert!(high_perf.enable_simd);
let mem_efficient = ProcessingConfig::memory_efficient();
assert!(mem_efficient.use_streaming);
assert_eq!(mem_efficient.stream_chunk_size, 100_000);
let debug = ProcessingConfig::debug();
assert!(debug.verbose_logging);
assert_eq!(debug.num_threads, 1);
}
#[test]
fn test_processing_config_builder() {
let config = ProcessingConfig::new()
.with_streaming(false)
.with_chunk_size(500_000)
.with_threads(4)
.with_simd(true)
.with_memory_target(2 * 1024 * 1024 * 1024) .with_progress_interval(1_000_000)
.with_verbose_logging(true);
assert!(!config.use_streaming);
assert_eq!(config.stream_chunk_size, 500_000);
assert_eq!(config.num_threads, 4);
assert!(config.enable_simd);
assert_eq!(config.memory_target, 2 * 1024 * 1024 * 1024);
assert_eq!(config.progress_interval, 1_000_000);
assert!(config.verbose_logging);
}
#[test]
fn test_error_display() {
let error = FilterError::InvalidConfig("test error".to_string());
assert_eq!(
format!("{}", error),
"Invalid filter configuration: test error"
);
let error = FilterError::ProcessingError("processing failed".to_string());
assert_eq!(format!("{}", error), "Processing error: processing failed");
}
}