use axonml_autograd::Variable;
use axonml_tensor::Tensor;
/// Tunable parameters for [`EventEncoder`].
#[derive(Debug, Clone)]
pub struct EventConfig {
/// Change threshold. `EventEncoder::encode` rescales it as
/// `threshold / 255.0 * 4.0` and keeps only absolute grayscale differences
/// strictly greater than that value.
/// NOTE(review): the `255.0` / `4.0` factors suggest an 8-bit threshold mapped
/// onto a normalized input range — confirm against the caller's preprocessing.
pub threshold: f32,
/// Side length, in pixels, of the square cells the frame is tiled into.
/// Must be non-zero: `encode` divides the frame dimensions by it.
pub cell_size: usize,
/// Number of cells by which every active cell is grown in each direction
/// (`0` disables dilation).
pub dilation: usize,
/// Minimum fraction of event pixels inside a cell (compared with `>=`) for the
/// cell to be considered active.
pub min_density: f32,
}
impl Default for EventConfig {
fn default() -> Self {
Self {
threshold: 15.0,
cell_size: 16,
dilation: 1,
min_density: 0.05,
}
}
}
/// Stateful encoder that compares each frame with the previously encoded one
/// and reports where the image changed.
pub struct EventEncoder {
/// Threshold and cell-grid parameters used by `encode`.
config: EventConfig,
/// Grayscale copy of the first batch element of the most recently encoded
/// frame; `None` before the first `encode` call and after `reset`.
prev_gray: Option<Vec<f32>>,
/// Width of the frame stored in `prev_gray` (`0` until a frame is encoded).
prev_width: usize,
/// Height of the frame stored in `prev_gray` (`0` until a frame is encoded).
prev_height: usize,
}
impl EventEncoder {
    /// Creates an encoder that uses [`EventConfig::default`] and has no
    /// reference frame yet.
    pub fn new() -> Self {
        Self::with_config(EventConfig::default())
    }

    /// Creates an encoder with a caller-supplied configuration and no
    /// reference frame yet.
    pub fn with_config(config: EventConfig) -> Self {
        Self {
            config,
            prev_gray: None,
            prev_width: 0,
            prev_height: 0,
        }
    }

    /// Forgets the reference frame, so the next [`encode`](Self::encode) call
    /// behaves like the very first one.
    pub fn reset(&mut self) {
        self.prev_gray = None;
    }

    /// Returns `true` while no reference frame is stored, i.e. before the
    /// first [`encode`](Self::encode) call or right after [`reset`](Self::reset).
    pub fn is_cold_start(&self) -> bool {
        self.prev_gray.is_none()
    }

    /// Encodes `frame` against the previously encoded frame.
    ///
    /// `frame` must be laid out as `[batch, channels, height, width]` with at
    /// least three channels; the first three are read as R, G and B and any
    /// further channels (e.g. alpha) are ignored.
    ///
    /// Returns `(density, active_cells, events)`:
    /// * `density` — `[batch, 1, cell_h, cell_w]`, the fraction of pixels in
    ///   each cell whose change exceeded the threshold;
    /// * `active_cells` — `(row, col)` cell coordinates, in row-major order,
    ///   whose density in the **first** batch element is at least
    ///   `min_density`, grown by `dilation` cells in every direction;
    /// * `events` — `[batch, 1, height, width]`, the absolute grayscale
    ///   difference where it exceeds the threshold and `0.0` elsewhere.
    ///
    /// Every batch element is compared with the first batch element of the
    /// previous frame, and that first element of the current frame becomes the
    /// new reference. On a cold start, or when the resolution changed since
    /// the previous call, no events are produced. An empty batch yields empty
    /// outputs and leaves the reference frame untouched.
    ///
    /// # Panics
    ///
    /// Panics if `frame` is not 4-D, has fewer than three channels, if
    /// `cell_size` is zero, or if the output tensors cannot be constructed.
    pub fn encode(&mut self, frame: &Variable) -> (Variable, Vec<(usize, usize)>, Variable) {
        let shape = frame.shape();
        assert_eq!(
            shape.len(),
            4,
            "EventEncoder::encode expects a 4-D [batch, channels, height, width] frame"
        );
        let (batch, channels, h, w) = (shape[0], shape[1], shape[2], shape[3]);
        assert!(
            channels >= 3,
            "EventEncoder::encode expects at least 3 (RGB) channels, got {channels}"
        );
        let cell = self.config.cell_size;
        assert!(cell > 0, "EventConfig::cell_size must be non-zero");

        let data = frame.data().to_vec();
        let gray = Self::to_grayscale(&data, batch, channels, h, w);
        let event_mag = self.event_magnitude(&gray, h, w);

        let cell_h = h.div_ceil(cell);
        let cell_w = w.div_ceil(cell);
        let density = Self::cell_density(&event_mag, batch, h, w, cell);

        // Only the first batch element decides which cells are active.
        let first_density = density.get(..cell_h * cell_w).unwrap_or(&[]);
        let active_cells = self.active_cells(first_density, cell_h, cell_w);

        // The first batch element becomes the reference for the next call;
        // `get` is `None` only for an empty batch, which keeps the old reference.
        if let Some(first_gray) = gray.get(..h * w) {
            self.prev_gray = Some(first_gray.to_vec());
            self.prev_width = w;
            self.prev_height = h;
        }

        let density_tensor = Tensor::from_vec(density, &[batch, 1, cell_h, cell_w])
            .expect("density buffer length matches [batch, 1, cell_h, cell_w]");
        let event_tensor = Tensor::from_vec(event_mag, &[batch, 1, h, w])
            .expect("event buffer length matches [batch, 1, h, w]");
        (
            Variable::new(density_tensor, false),
            active_cells,
            Variable::new(event_tensor, false),
        )
    }

    /// Returns `active_count / total_cells`, or `0.0` when `total_cells` is zero.
    pub fn active_ratio(&self, total_cells: usize, active_count: usize) -> f32 {
        if total_cells == 0 {
            return 0.0;
        }
        active_count as f32 / total_cells as f32
    }

    /// Converts planar `[batch, channels, h, w]` data to `[batch, h, w]` luma
    /// using the weights 0.299 / 0.587 / 0.114 on the first three channels.
    fn to_grayscale(data: &[f32], batch: usize, channels: usize, h: usize, w: usize) -> Vec<f32> {
        let plane = h * w;
        let mut gray = Vec::with_capacity(batch * plane);
        for b in 0..batch {
            // Stride by the real channel count so extra channels are skipped
            // instead of shifting every later batch element.
            let base = b * channels * plane;
            let red = &data[base..base + plane];
            let green = &data[base + plane..base + 2 * plane];
            let blue = &data[base + 2 * plane..base + 3 * plane];
            gray.extend(
                red.iter()
                    .zip(green)
                    .zip(blue)
                    .map(|((&r, &g), &b_val)| 0.299 * r + 0.587 * g + 0.114 * b_val),
            );
        }
        gray
    }

    /// Computes the thresholded absolute difference between every batch element
    /// of `gray` and the stored reference frame (all zeros when there is no
    /// usable reference).
    fn event_magnitude(&self, gray: &[f32], h: usize, w: usize) -> Vec<f32> {
        let mut events = vec![0.0f32; gray.len()];
        let Some(prev) = self.prev_gray.as_deref() else {
            return events;
        };
        let plane = h * w;
        if self.prev_width != w || self.prev_height != h || plane == 0 {
            return events;
        }
        // NOTE(review): the 255 / 4 rescaling presumably maps an 8-bit threshold
        // onto the normalized input range — confirm with the preprocessing code.
        let thresh = self.config.threshold / 255.0 * 4.0;
        let frames = events.chunks_exact_mut(plane).zip(gray.chunks_exact(plane));
        for (out_frame, gray_frame) in frames {
            for ((out, &now), &before) in out_frame.iter_mut().zip(gray_frame).zip(prev) {
                let diff = (now - before).abs();
                if diff > thresh {
                    *out = diff;
                }
            }
        }
        events
    }

    /// Computes, per batch element and per `cell` x `cell` tile, the fraction of
    /// pixels with a non-zero event magnitude. Tiles on the right/bottom border
    /// may be smaller than `cell`.
    fn cell_density(events: &[f32], batch: usize, h: usize, w: usize, cell: usize) -> Vec<f32> {
        let plane = h * w;
        let cell_h = h.div_ceil(cell);
        let cell_w = w.div_ceil(cell);
        let mut density = Vec::with_capacity(batch * cell_h * cell_w);
        for b in 0..batch {
            let frame = &events[b * plane..(b + 1) * plane];
            for cy in 0..cell_h {
                let (y0, y1) = (cy * cell, ((cy + 1) * cell).min(h));
                for cx in 0..cell_w {
                    let (x0, x1) = (cx * cell, ((cx + 1) * cell).min(w));
                    let total = (y1 - y0) * (x1 - x0);
                    let fired: usize = (y0..y1)
                        .map(|y| {
                            frame[y * w + x0..y * w + x1]
                                .iter()
                                .filter(|&&m| m > 0.0)
                                .count()
                        })
                        .sum();
                    density.push(if total > 0 {
                        fired as f32 / total as f32
                    } else {
                        0.0
                    });
                }
            }
        }
        density
    }

    /// Marks every cell whose density reaches `min_density`, dilates the marks
    /// by `dilation` cells, and lists the marked cells in row-major order.
    fn active_cells(&self, density: &[f32], cell_h: usize, cell_w: usize) -> Vec<(usize, usize)> {
        let reach = self.config.dilation;
        let min_density = self.config.min_density;
        let mut mask = vec![false; cell_h * cell_w];
        for (i, _) in density
            .iter()
            .enumerate()
            .filter(|&(_, &d)| d >= min_density)
        {
            let (cy, cx) = (i / cell_w, i % cell_w);
            let y_lo = cy.saturating_sub(reach);
            // Saturating adds keep a huge dilation from overflowing.
            let y_hi = cy.saturating_add(reach).saturating_add(1).min(cell_h);
            let x_lo = cx.saturating_sub(reach);
            let x_hi = cx.saturating_add(reach).saturating_add(1).min(cell_w);
            for row in y_lo..y_hi {
                mask[row * cell_w + x_lo..row * cell_w + x_hi].fill(true);
            }
        }
        mask.iter()
            .enumerate()
            .filter(|&(_, &on)| on)
            .map(|(i, _)| (i / cell_w, i % cell_w))
            .collect()
    }
}
impl Default for EventEncoder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-image `[1, 3, h, w]` frame filled with `val`.
    fn make_frame(h: usize, w: usize, val: f32) -> Variable {
        let tensor = Tensor::from_vec(vec![val; 3 * h * w], &[1, 3, h, w]).unwrap();
        Variable::new(tensor, false)
    }

    /// The first frame has nothing to be compared with: no active cells.
    #[test]
    fn test_event_encoder_cold_start() {
        let mut encoder = EventEncoder::new();
        assert!(encoder.is_cold_start());

        let first = make_frame(32, 32, 0.5);
        let (density, active, events) = encoder.encode(&first);
        assert!(!encoder.is_cold_start());

        // 32 pixels at the default 16-pixel cell size give a 2 x 2 grid.
        assert_eq!(density.shape()[2], 2);
        assert_eq!(density.shape()[3], 2);
        assert!(active.is_empty());
        assert_eq!(events.shape(), vec![1, 1, 32, 32]);
    }

    /// Encoding the same frame twice must not report any change.
    #[test]
    fn test_identical_frames_zero_events() {
        let mut encoder = EventEncoder::new();
        let still = make_frame(32, 32, 0.5);
        encoder.encode(&still);

        let (_density, active, events) = encoder.encode(&still);
        assert!(active.is_empty());

        let magnitudes = events.data().to_vec();
        assert!(magnitudes.iter().all(|&m| m == 0.0));
    }

    /// A large uniform brightness change must produce events and active cells.
    #[test]
    fn test_changed_frames_produce_events() {
        let mut encoder = EventEncoder::new();
        let dark = make_frame(32, 32, 0.0);
        let bright = make_frame(32, 32, 2.0);
        encoder.encode(&dark);

        let (_density, active, events) = encoder.encode(&bright);
        assert!(!active.is_empty());

        let magnitudes = events.data().to_vec();
        assert!(magnitudes.iter().any(|&m| m > 0.0));
    }

    /// Changing only the top-left quadrant must leave the bottom-right cell idle.
    #[test]
    fn test_partial_change_sparse_events() {
        let mut encoder = EventEncoder::with_config(EventConfig {
            cell_size: 16,
            dilation: 0,
            ..EventConfig::default()
        });
        let h = 32;
        let w = 32;
        let background = make_frame(h, w, 0.0);
        encoder.encode(&background);

        // Paint the top-left quadrant of every channel plane.
        let mut pixels = vec![0.0f32; 3 * h * w];
        for plane in pixels.chunks_mut(h * w) {
            for row in plane.chunks_mut(w).take(h / 2) {
                row[..w / 2].fill(2.0);
            }
        }
        let changed = Variable::new(Tensor::from_vec(pixels, &[1, 3, h, w]).unwrap(), false);

        let (_density, active, _events) = encoder.encode(&changed);
        assert!(!active.is_empty());
        assert!(active.contains(&(0, 0)));

        let cell_h = h / 16;
        let cell_w = w / 16;
        assert!(!active.contains(&(cell_h - 1, cell_w - 1)));
    }

    /// `reset` must bring the encoder back to the cold-start state.
    #[test]
    fn test_event_encoder_reset() {
        let mut encoder = EventEncoder::new();
        encoder.encode(&make_frame(16, 16, 1.0));
        assert!(!encoder.is_cold_start());

        encoder.reset();
        assert!(encoder.is_cold_start());
    }

    /// `active_ratio` is a plain fraction that is guarded against a zero total.
    #[test]
    fn test_active_ratio() {
        let encoder = EventEncoder::new();
        let quarter = encoder.active_ratio(100, 25);
        assert!((quarter - 0.25).abs() < 1e-5);
        assert_eq!(encoder.active_ratio(0, 0), 0.0);
    }
}