use crate::ev_formats::EventFormat;
use polars::prelude::*;
// Logging shims: `tracing` is only linked on Unix; on other targets the
// macros below expand to nothing so call sites (e.g. `debug!` in `build`)
// compile unchanged.
#[cfg(unix)]
use tracing::debug;
// No-op `debug!` replacement for non-Unix targets.
#[cfg(not(unix))]
macro_rules! debug {
    ($($args:tt)*) => {};
}
// No-op `info!` replacement for non-Unix targets.
// NOTE(review): no `info!` call site is visible in this file, and on Unix no
// `tracing::info` is imported — confirm this shim is still needed.
#[cfg(not(unix))]
macro_rules! info {
    ($($args:tt)*) => {};
}
/// Normalizes a raw timestamp to integer microseconds.
///
/// The input unit is inferred from magnitude (heuristic — unit names are
/// inferred from the conversion factors and the unit tests in this file):
/// * `>= 1e9`  — presumably nanoseconds: divided by 1 000
/// * `>= 1e3`  — presumably already microseconds: truncated as-is
/// * `< 1e3`   — presumably seconds: scaled by 1 000 000
pub fn convert_timestamp(timestamp: f64) -> i64 {
    const NANOS_THRESHOLD: f64 = 1_000_000_000.0;
    const MICROS_THRESHOLD: f64 = 1_000.0;

    let micros = if timestamp >= NANOS_THRESHOLD {
        timestamp / 1_000.0
    } else if timestamp >= MICROS_THRESHOLD {
        timestamp
    } else {
        timestamp * 1_000_000.0
    };
    micros as i64
}
/// Incrementally accumulates event-camera events (x, y, timestamp, polarity)
/// into Polars column builders, producing a `DataFrame` on `build`.
pub struct EventDataFrameBuilder {
    // Pixel coordinates, stored as Int16 (u16 inputs are cast in `add_event`).
    x_builder: PrimitiveChunkedBuilder<Int16Type>,
    y_builder: PrimitiveChunkedBuilder<Int16Type>,
    // Timestamps in microseconds (normalized by `convert_timestamp`).
    timestamp_builder: PrimitiveChunkedBuilder<Int64Type>,
    // Raw 0/1 polarity; remapped per-format in `build`.
    polarity_builder: PrimitiveChunkedBuilder<Int8Type>,
    // Source format; selects the polarity encoding applied in `build`.
    format: EventFormat,
    // Number of events appended so far.
    event_count: usize,
}
impl EventDataFrameBuilder {
pub fn new(format: EventFormat, estimated_capacity: usize) -> Self {
Self {
x_builder: PrimitiveChunkedBuilder::<Int16Type>::new("x".into(), estimated_capacity),
y_builder: PrimitiveChunkedBuilder::<Int16Type>::new("y".into(), estimated_capacity),
timestamp_builder: PrimitiveChunkedBuilder::<Int64Type>::new(
"t".into(),
estimated_capacity,
),
polarity_builder: PrimitiveChunkedBuilder::<Int8Type>::new(
"polarity".into(),
estimated_capacity,
),
format,
event_count: 0,
}
}
pub fn add_event(&mut self, x: u16, y: u16, timestamp: f64, polarity: bool) {
self.x_builder.append_value(x as i16);
self.y_builder.append_value(y as i16);
self.timestamp_builder
.append_value(convert_timestamp(timestamp));
self.polarity_builder
.append_value(if polarity { 1i8 } else { 0i8 });
self.event_count += 1;
}
pub fn add_events_batch(&mut self, events: &[(u16, u16, f64, bool)]) {
for &(x, y, timestamp, polarity) in events {
self.add_event(x, y, timestamp, polarity);
}
}
pub fn len(&self) -> usize {
self.event_count
}
pub fn is_empty(&self) -> bool {
self.event_count == 0
}
pub fn build(self) -> PolarsResult<DataFrame> {
if self.event_count == 0 {
let empty_x = Series::new("x".into(), Vec::<i16>::new());
let empty_y = Series::new("y".into(), Vec::<i16>::new());
let empty_timestamp = Series::new("t".into(), Vec::<i64>::new())
.cast(&DataType::Duration(TimeUnit::Microseconds))?;
let empty_polarity = Series::new("polarity".into(), Vec::<i8>::new());
return DataFrame::new(vec![
empty_x.into(),
empty_y.into(),
empty_timestamp.into(),
empty_polarity.into(),
]);
}
let x_series = self.x_builder.finish().into_series();
let y_series = self.y_builder.finish().into_series();
let polarity_series_raw = self.polarity_builder.finish().into_series();
let timestamp_series = self
.timestamp_builder
.finish()
.into_series()
.cast(&DataType::Duration(TimeUnit::Microseconds))?;
let df = DataFrame::new(vec![
x_series.into(),
y_series.into(),
timestamp_series.into(),
polarity_series_raw.into(),
])?;
let df = match self.format {
EventFormat::EVT2 | EventFormat::EVT21 | EventFormat::EVT3 => {
df.lazy()
.with_column(
when(col("polarity").eq(lit(0)))
.then(lit(-1i8))
.otherwise(lit(1i8))
.alias("polarity")
.cast(DataType::Int8),
)
.collect()?
}
#[cfg(not(windows))]
EventFormat::HDF5 => {
df.lazy()
.with_column(
when(col("polarity").eq(lit(0)))
.then(lit(-1i8))
.otherwise(lit(1i8))
.alias("polarity")
.cast(DataType::Int8),
)
.collect()?
}
#[cfg(windows)]
EventFormat::HDF5 => {
return Err(PolarsError::ComputeError(
"HDF5 support is disabled on Windows due to build complexity.".into(),
));
}
_ => {
df.lazy()
.with_column(col("polarity").cast(DataType::Int8))
.collect()?
}
};
debug!(events = self.event_count, format = ?self.format, "Built DataFrame directly");
Ok(df)
}
}
/// Builds an empty events `DataFrame` with the canonical schema:
/// `x` (Int16), `y` (Int16), `t` (Duration[µs]), `polarity` (Int8).
///
/// # Errors
/// Returns a `PolarsError` if the timestamp cast or frame construction fails.
pub fn create_empty_events_dataframe() -> PolarsResult<DataFrame> {
    // The Duration cast must happen eagerly so the empty frame carries the
    // same `t` dtype as populated frames.
    let timestamps = Series::new("t".into(), Vec::<i64>::new())
        .cast(&DataType::Duration(TimeUnit::Microseconds))?;

    let columns = vec![
        Series::new("x".into(), Vec::<i16>::new()).into(),
        Series::new("y".into(), Vec::<i16>::new()).into(),
        timestamps.into(),
        Series::new("polarity".into(), Vec::<i8>::new()).into(),
    ];
    DataFrame::new(columns)
}
/// Streams events into fixed-size `DataFrame` chunks: events accumulate in an
/// internal builder and a chunk is emitted whenever `chunk_size` is reached.
pub struct EventDataFrameStreamer {
    // Accumulates the current, not-yet-flushed chunk.
    builder: EventDataFrameBuilder,
    // Number of events per emitted chunk.
    chunk_size: usize,
    // Events seen across all chunks (flushed and pending).
    total_events: usize,
}
impl EventDataFrameStreamer {
    /// Creates a streamer that emits one `DataFrame` per `chunk_size` events.
    pub fn new(format: EventFormat, chunk_size: usize) -> Self {
        let builder = EventDataFrameBuilder::new(format, chunk_size);
        Self {
            builder,
            chunk_size,
            total_events: 0,
        }
    }

    /// Buffers one event and returns `Some(DataFrame)` whenever a full chunk
    /// was flushed, `None` otherwise.
    ///
    /// # Errors
    /// Propagates any `PolarsError` raised while building a flushed chunk.
    pub fn add_event(
        &mut self,
        x: u16,
        y: u16,
        timestamp: f64,
        polarity: bool,
    ) -> PolarsResult<Option<DataFrame>> {
        self.builder.add_event(x, y, timestamp, polarity);
        self.total_events += 1;

        // Guard clause: nothing to emit until the chunk fills up.
        if self.builder.len() < self.chunk_size {
            return Ok(None);
        }
        self.flush().map(Some)
    }

    /// Drains buffered events into a `DataFrame` (an empty frame if nothing
    /// is buffered) and resets the internal builder for the next chunk.
    ///
    /// # Errors
    /// Propagates any `PolarsError` from building the chunk.
    pub fn flush(&mut self) -> PolarsResult<DataFrame> {
        if self.builder.is_empty() {
            return create_empty_events_dataframe();
        }
        // Swap in a fresh builder and consume the full one.
        let fresh = EventDataFrameBuilder::new(self.builder.format, self.chunk_size);
        std::mem::replace(&mut self.builder, fresh).build()
    }

    /// Total events seen since construction, including already-flushed chunks.
    pub fn total_events(&self) -> usize {
        self.total_events
    }
}
/// Picks a streaming chunk size (in events) from the input file size and the
/// memory available to the process.
///
/// Budgets a quarter of available memory at an estimated 16 bytes per buffered
/// event, caps the result by a file-size tier, and clamps the final value to
/// `[100_000, 5_000_000]`.
pub fn calculate_optimal_chunk_size(file_size: u64, available_memory_bytes: usize) -> usize {
    // Spend at most 25% of available memory on the event buffer,
    // at ~16 bytes per event.
    let memory_based_chunk = (available_memory_bytes / 4) / 16;

    // Tiered cap: small files take small chunks; large files amortize better.
    let file_based_chunk = match file_size {
        0..=9_999_999 => 100_000,
        10_000_000..=99_999_999 => 500_000,
        _ => 2_000_000,
    };

    memory_based_chunk
        .min(file_based_chunk)
        .clamp(100_000, 5_000_000)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_dataframe_builder() {
        let events: [(u16, u16, f64, bool); 3] =
            [(100, 200, 1.5, true), (150, 250, 2.0, false), (200, 300, 2.5, true)];

        let mut builder = EventDataFrameBuilder::new(EventFormat::Text, 10);
        for &(x, y, t, p) in &events {
            builder.add_event(x, y, t, p);
        }
        assert_eq!(builder.len(), 3);

        let df = builder.build().unwrap();
        assert_eq!((df.height(), df.width()), (3, 4));

        let names: Vec<String> = df
            .get_column_names()
            .iter()
            .map(|s| s.to_string())
            .collect();
        for expected in ["x", "y", "t", "polarity"] {
            assert!(names.contains(&expected.to_string()), "missing column {expected}");
        }
    }

    #[test]
    fn test_chunk_size_calculation() {
        // Both a small and a large file must land inside the clamp range.
        for file_size in [1_000_000u64, 1_000_000_000] {
            let chunk = calculate_optimal_chunk_size(file_size, 1_000_000_000);
            assert!((100_000..=5_000_000).contains(&chunk));
        }
    }

    #[test]
    fn test_timestamp_conversion() {
        // Second-, microsecond-, and nanosecond-scale inputs all normalize
        // to the same microsecond value.
        assert_eq!(convert_timestamp(1.5), 1_500_000);
        assert_eq!(convert_timestamp(1_500_000.0), 1_500_000);
        assert_eq!(convert_timestamp(1_500_000_000.0), 1_500_000);
    }
}