use crate::transcoder::event_pixel_tree::Mode::Continuous;
use crate::transcoder::source::video::SourceError::BufferEmpty;
use crate::transcoder::source::video::{
integrate_for_px, show_display, Source, SourceError, Video, VideoBuilder,
};
use crate::{Codec, DeltaT, Event, PlaneSize, SourceCamera, SourceType, TimeMode};
use aedat::events_generated::Event as DvsEvent;
use davis_edi_rs::util::reconstructor::{IterVal, ReconstructionError, Reconstructor};
use rayon::iter::ParallelIterator;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator};
use opencv::core::{Mat, CV_8U};
use opencv::prelude::*;
use bumpalo::Bump;
use ndarray::{Array3, Axis};
use rayon::iter::IntoParallelIterator;
use rayon::{current_num_threads, ThreadPool};
use std::cmp::max;
use std::error::Error;
use std::time::Instant;
use crate::framer::scale_intensity::FrameValue;
use crate::raw::stream::Error as StreamError;
use crate::transcoder::event_pixel_tree::Intensity32;
use tokio::runtime::Runtime;
pub struct Framed {}
pub struct Raw {}
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TranscoderMode {
Framed,
RawDavis,
RawDvs,
}
pub struct Davis {
reconstructor: Reconstructor,
pub(crate) input_frame_scaled: Mat,
pub(crate) video: Video,
image_8u: Mat,
thread_pool_edi: ThreadPool,
dvs_c: f64,
dvs_events_before: Option<Vec<DvsEvent>>,
dvs_events_after: Option<Vec<DvsEvent>>,
pub start_of_frame_timestamp: Option<i64>,
pub end_of_frame_timestamp: Option<i64>,
pub rt: Runtime,
pub dvs_last_timestamps: Array3<i64>,
pub dvs_last_ln_val: Array3<f64>,
optimize_adder_controller: bool,
pub mode: TranscoderMode,
pub time_mode: TimeMode,
}
unsafe impl Sync for Davis {}
impl Davis {
pub fn new(reconstructor: Reconstructor, rt: Runtime) -> Result<Self, Box<dyn Error>> {
let plane = PlaneSize::new(reconstructor.width, reconstructor.height, 1)?;
let video = Video::new(plane.clone(), Continuous)?.chunk_rows(plane.h_usize() / 4);
let thread_pool_edi = rayon::ThreadPoolBuilder::new()
.num_threads(max(current_num_threads() - 4, 1))
.build()?;
let plane = &video.state.plane;
let timestamps = vec![0_i64; video.state.plane.volume()];
let dvs_last_timestamps: Array3<i64> = Array3::from_shape_vec(
(
plane.height.into(),
plane.width.into(),
plane.channels.into(),
),
timestamps,
)?;
let timestamps = vec![0.0_f64; video.state.plane.volume()];
let dvs_last_ln_val: Array3<f64> = Array3::from_shape_vec(
(
plane.height as usize,
plane.width as usize,
plane.channels as usize,
),
timestamps,
)?;
let davis_source = Davis {
reconstructor,
input_frame_scaled: Mat::default(),
video,
image_8u: Mat::default(),
thread_pool_edi,
dvs_c: 0.15,
dvs_events_before: None,
dvs_events_after: None,
start_of_frame_timestamp: None,
end_of_frame_timestamp: None,
rt,
dvs_last_timestamps,
dvs_last_ln_val,
optimize_adder_controller: false,
mode: TranscoderMode::Framed,
time_mode: TimeMode::DeltaT,
};
Ok(davis_source)
}
pub fn optimize_adder_controller(mut self, optimize: bool) -> Self {
self.optimize_adder_controller = optimize;
self
}
pub fn mode(mut self, mode: TranscoderMode) -> Self {
self.mode = mode;
self
}
pub fn time_mode(mut self, time_mode: TimeMode) -> Self {
self.time_mode = time_mode;
self
}
#[allow(clippy::cast_sign_loss)]
pub fn integrate_dvs_events<F: Fn(i64, i64) -> bool + Send + 'static + std::marker::Sync>(
&mut self,
dvs_events: &Vec<DvsEvent>,
frame_timestamp: &i64,
event_check: F,
) -> Result<(), StreamError> {
let mut dvs_chunks: [Vec<DvsEvent>; 4] = [
Vec::with_capacity(100_000),
Vec::with_capacity(100_000),
Vec::with_capacity(100_000),
Vec::with_capacity(100_000),
];
let mut chunk_idx;
for dvs_event in dvs_events {
chunk_idx = dvs_event.y() as usize / (self.video.state.plane.h_usize() / 4);
dvs_chunks[chunk_idx].push(*dvs_event);
}
let chunk_rows = self.video.state.chunk_rows;
let big_buffer: Vec<Vec<Event>> = self
.video
.event_pixel_trees
.axis_chunks_iter_mut(Axis(0), chunk_rows)
.into_par_iter()
.zip(
self.dvs_last_ln_val
.axis_chunks_iter_mut(Axis(0), chunk_rows)
.into_par_iter()
.zip(
self.dvs_last_timestamps
.axis_chunks_iter_mut(Axis(0), chunk_rows)
.into_par_iter(),
),
)
.enumerate()
.map(
|(
chunk_idx,
(mut px_chunk, (mut dvs_last_ln_val_chunk, mut dvs_last_timestamps_chunk)),
)| {
let mut buffer: Vec<Event> = Vec::with_capacity(100_000);
for event in &dvs_chunks[chunk_idx] {
if event_check(event.t(), *frame_timestamp) {
let px = &mut px_chunk
[[(event.y() as usize) % chunk_rows, event.x() as usize, 0]];
let base_val = px.base_val;
let last_val_ln = &mut dvs_last_ln_val_chunk
[[(event.y() as usize) % chunk_rows, event.x() as usize, 0]];
let last_val = (last_val_ln.exp() - 1.0) * 255.0;
let delta_t_micro = event.t()
- dvs_last_timestamps_chunk
[[event.y() as usize % chunk_rows, event.x() as usize, 0]];
if delta_t_micro == event.t() {
continue;
}
let ticks_per_micro = self.video.state.tps as f32 / 1e6;
let delta_t_ticks = delta_t_micro as f32 * ticks_per_micro;
if delta_t_ticks <= 0.0 {
continue; }
assert!(delta_t_ticks > 0.0);
let first_integration = ((last_val as Intensity32)
/ self.video.state.ref_time as f32
* delta_t_ticks)
.max(0.0);
if px.need_to_pop_top {
buffer.push(px.pop_top_event(
first_integration,
Continuous,
self.video.state.ref_time,
));
}
px.integrate(
first_integration,
delta_t_ticks.into(),
Continuous,
self.video.state.delta_t_max,
self.video.state.ref_time,
);
if px.need_to_pop_top {
buffer.push(px.pop_top_event(
first_integration,
Continuous,
self.video.state.ref_time,
));
}
*last_val_ln += if event.on() { self.dvs_c } else { -self.dvs_c };
let mut frame_val = (last_val_ln.exp() - 1.0) * 255.0;
clamp_u8(&mut frame_val, last_val_ln);
let frame_val_u8 = frame_val as u8;
if frame_val_u8 < base_val.saturating_sub(self.video.state.c_thresh_neg)
|| frame_val_u8
> base_val.saturating_add(self.video.state.c_thresh_pos)
{
px.pop_best_events(
&mut buffer,
Continuous,
self.video.state.ref_time,
);
px.base_val = frame_val_u8;
match px.set_d_for_continuous(frame_val as Intensity32) {
None => {}
Some(event) => buffer.push(event),
};
}
dvs_last_timestamps_chunk
[[event.y() as usize % chunk_rows, event.x() as usize, 0]] =
event.t();
}
}
buffer
},
)
.collect();
if self.video.state.write_out {
self.video.stream.encode_events_events(&big_buffer)?;
}
Ok(())
}
#[allow(clippy::cast_possible_truncation)]
fn integrate_frame_gaps(&mut self) -> Result<(), SourceError> {
let px_per_chunk: usize = self.video.state.chunk_rows * self.video.state.plane.area_wc();
let start_of_frame_timestamp = match self.start_of_frame_timestamp {
Some(t) => t,
None => return Err(SourceError::UninitializedData),
};
let big_buffer: Vec<Vec<Event>> = self
.video
.event_pixel_trees
.axis_chunks_iter_mut(Axis(0), self.video.state.chunk_rows)
.into_par_iter()
.zip(
self.dvs_last_ln_val
.axis_chunks_iter_mut(Axis(0), self.video.state.chunk_rows)
.into_par_iter(),
)
.enumerate()
.map(|(chunk_idx, (mut chunk_px, mut chunk_ln_val))| {
let mut buffer: Vec<Event> = Vec::with_capacity(px_per_chunk);
let bump = Bump::new();
let base_val = bump.alloc(0);
let px_idx = bump.alloc(0);
let frame_val = bump.alloc(0);
for (chunk_px_idx, (px, last_val_ln)) in
chunk_px.iter_mut().zip(chunk_ln_val.iter_mut()).enumerate()
{
*px_idx = chunk_px_idx + px_per_chunk * chunk_idx;
let last_val = (last_val_ln.exp() - 1.0) * 255.0;
*base_val = px.base_val;
*frame_val = last_val as u8;
let ticks_per_micro = self.video.state.tps as f32 / 1e6;
let delta_t_micro = start_of_frame_timestamp
- self.dvs_last_timestamps[[px.coord.y as usize, px.coord.x as usize, 0]];
if delta_t_micro == start_of_frame_timestamp {
continue;
}
let delta_t_ticks = delta_t_micro as f32 * ticks_per_micro;
if delta_t_ticks <= 0.0 {
continue;
}
assert!(delta_t_ticks > 0.0);
let integration = ((last_val / f64::from(self.video.state.ref_time))
* f64::from(delta_t_ticks))
.max(0.0);
assert!(integration >= 0.0);
integrate_for_px(
px,
base_val,
frame_val,
integration as f32,
delta_t_ticks,
&mut buffer,
&self.video.state,
);
if px.need_to_pop_top {
buffer.push(px.pop_top_event(
integration as f32,
self.video.state.pixel_tree_mode,
self.video.state.ref_time,
));
}
}
buffer
})
.collect();
if self.video.state.write_out {
self.video.stream.encode_events_events(&big_buffer)?;
}
let db = match self.video.instantaneous_frame.data_bytes_mut() {
Ok(db) => db,
Err(e) => return Err(SourceError::OpencvError(e)),
};
let practical_d_max = fast_math::log2_raw(
255.0 * (self.video.state.delta_t_max / self.video.state.ref_time) as f32,
);
db.par_iter_mut().enumerate().for_each(|(idx, val)| {
let y = idx / self.video.state.plane.area_wc();
let x = (idx % self.video.state.plane.area_wc()) / self.video.state.plane.c_usize();
let c = idx % self.video.state.plane.c_usize();
*val = match self.video.event_pixel_trees[[y, x, c]].arena[0].best_event {
Some(event) => u8::get_frame_value(
&event.into(),
SourceType::U8,
self.video.state.ref_time as DeltaT,
practical_d_max,
self.video.state.delta_t_max,
self.video.instantaneous_view_mode,
),
None => *val,
};
});
if self.video.state.show_live {
show_display("instance", &self.video.instantaneous_frame, 1, &self.video)?;
}
Ok(())
}
#[allow(clippy::cast_precision_loss)]
fn control_latency(&mut self, opt_timestamp: Option<Instant>) {
if self.optimize_adder_controller {
match opt_timestamp {
None => {}
Some(timestamp) => {
let latency = timestamp.elapsed().as_millis();
if latency as f64 >= self.reconstructor.target_latency * 3.0 {
self.video.state.c_thresh_pos =
self.video.state.c_thresh_pos.saturating_add(1);
self.video.state.c_thresh_neg =
self.video.state.c_thresh_neg.saturating_add(1);
} else {
self.video.state.c_thresh_pos =
self.video.state.c_thresh_pos.saturating_sub(1);
self.video.state.c_thresh_neg =
self.video.state.c_thresh_neg.saturating_sub(1);
}
eprintln!(
" adder latency = {}, adder c = {}",
latency, self.video.state.c_thresh_pos
);
}
}
}
}
pub fn get_reconstructor(&self) -> &Reconstructor {
&self.reconstructor
}
pub fn get_reconstructor_mut(&mut self) -> &mut Reconstructor {
&mut self.reconstructor
}
}
impl Source for Davis {
fn consume(
&mut self,
view_interval: u32,
thread_pool: &ThreadPool,
) -> std::result::Result<Vec<Vec<Event>>, SourceError> {
let with_events = match self.mode {
TranscoderMode::Framed => false,
TranscoderMode::RawDavis | TranscoderMode::RawDvs => true,
};
let mat_opt = self.rt.block_on(get_next_image(
&mut self.reconstructor,
&self.thread_pool_edi,
with_events,
));
match mat_opt {
Ok(None) => {
println!("Popping remaining events");
let px_per_chunk: usize =
self.video.state.chunk_rows * self.video.state.plane.area_wc();
let big_buffer: Vec<Vec<Event>> = self
.video
.event_pixel_trees
.axis_chunks_iter_mut(Axis(0), self.video.state.chunk_rows)
.into_par_iter()
.enumerate()
.map(|(_chunk_idx, mut chunk)| {
let mut buffer: Vec<Event> = Vec::with_capacity(px_per_chunk);
for (_, px) in chunk.iter_mut().enumerate() {
px.pop_best_events(
&mut buffer,
self.video.state.pixel_tree_mode,
self.video.state.ref_time,
);
}
buffer
})
.collect();
if self.video.state.write_out {
self.video.stream.encode_events_events(&big_buffer)?;
}
return Err(SourceError::NoData);
}
Ok(Some((
mat,
opt_timestamp,
Some((c, events_before, events_after, img_start_ts, img_end_ts)),
))) => {
self.control_latency(opt_timestamp);
self.input_frame_scaled = mat;
self.dvs_c = c;
self.dvs_events_before = Some(events_before);
self.dvs_events_after = Some(events_after);
self.start_of_frame_timestamp = Some(img_start_ts);
self.end_of_frame_timestamp = Some(img_end_ts);
self.video.state.ref_time_divisor =
(img_end_ts - img_start_ts) as f64 / f64::from(self.video.state.ref_time);
}
Ok(Some((mat, opt_timestamp, None))) => {
self.control_latency(opt_timestamp);
self.input_frame_scaled = mat;
}
Err(e) => return Err(SourceError::EdiError(e)),
}
let start_of_frame_timestamp = match self.start_of_frame_timestamp {
Some(t) => t,
None => return Err(SourceError::UninitializedData),
};
let end_of_frame_timestamp = match self.end_of_frame_timestamp {
Some(t) => t,
None => return Err(SourceError::UninitializedData),
};
if with_events {
if self.video.state.in_interval_count == 0 {
self.dvs_last_timestamps.par_map_inplace(|ts| {
*ts = start_of_frame_timestamp;
});
} else {
let dvs_events_before = match &self.dvs_events_before {
Some(events) => events.clone(),
None => return Err(SourceError::UninitializedData),
};
self.integrate_dvs_events(
&dvs_events_before,
&start_of_frame_timestamp,
check_dvs_before,
)?;
self.integrate_frame_gaps()?;
}
}
if self.input_frame_scaled.empty() {
return Err(BufferEmpty);
}
match self
.input_frame_scaled
.convert_to(&mut self.image_8u, CV_8U, 255.0, 0.0)
{
Ok(_) => {}
Err(e) => {
return Err(SourceError::OpencvError(e));
}
}
let mut tmp = self.image_8u.clone();
let mat_integration_time = match self.mode {
TranscoderMode::Framed => self.video.state.ref_time as f32,
TranscoderMode::RawDavis => (end_of_frame_timestamp - start_of_frame_timestamp) as f32,
TranscoderMode::RawDvs => {
self.dvs_c = 0.15;
match tmp.data_bytes_mut() {
Ok(bytes) => {
for byte in bytes {
*byte = 0;
}
}
Err(e) => {
return Err(SourceError::OpencvError(e));
}
}
0.0
}
};
let ret = thread_pool.install(|| {
self.video
.integrate_matrix(tmp, mat_integration_time, view_interval)
});
#[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
unsafe {
for (idx, val) in self.dvs_last_ln_val.iter_mut().enumerate() {
let px = match self.input_frame_scaled.at_unchecked::<f64>(idx as i32) {
Ok(px) => px,
Err(e) => {
return Err(SourceError::OpencvError(e));
}
};
match self.mode {
TranscoderMode::RawDavis | TranscoderMode::Framed => {
*val = px.ln_1p();
}
TranscoderMode::RawDvs => {
*val = 0.5_f64.ln_1p();
}
}
}
}
if with_events {
let dvs_events_after = match &self.dvs_events_after {
Some(events) => events.clone(),
None => return Err(SourceError::UninitializedData),
};
self.dvs_last_timestamps.par_map_inplace(|ts| {
*ts = end_of_frame_timestamp;
});
self.integrate_dvs_events(&dvs_events_after, &end_of_frame_timestamp, check_dvs_after)?;
}
ret
}
fn get_video_mut(&mut self) -> &mut Video {
&mut self.video
}
fn get_video_ref(&self) -> &Video {
&self.video
}
fn get_video(self) -> Video {
self.video
}
}
impl VideoBuilder for Davis {
fn contrast_thresholds(mut self, c_thresh_pos: u8, c_thresh_neg: u8) -> Self {
self.video = self.video.c_thresh_pos(c_thresh_pos);
self.video = self.video.c_thresh_neg(c_thresh_neg);
self
}
fn c_thresh_pos(mut self, c_thresh_pos: u8) -> Self {
self.video = self.video.c_thresh_pos(c_thresh_pos);
self
}
fn c_thresh_neg(mut self, c_thresh_neg: u8) -> Self {
self.video = self.video.c_thresh_neg(c_thresh_neg);
self
}
fn chunk_rows(mut self, chunk_rows: usize) -> Self {
self.video = self.video.chunk_rows(chunk_rows);
self
}
fn time_parameters(
mut self,
tps: crate::transcoder::event_pixel_tree::DeltaT,
ref_time: crate::transcoder::event_pixel_tree::DeltaT,
delta_t_max: crate::transcoder::event_pixel_tree::DeltaT,
) -> Result<Self, Box<dyn Error>> {
self.video = self.video.time_parameters(tps, ref_time, delta_t_max)?;
Ok(self)
}
fn write_out(
mut self,
output_filename: String,
source_camera: SourceCamera,
time_mode: TimeMode,
) -> Result<Box<Self>, Box<dyn Error>> {
self.video = self
.video
.write_out(output_filename, Some(source_camera), Some(time_mode))?;
Ok(Box::new(self))
}
fn show_display(mut self, show_display: bool) -> Self {
self.video = self.video.show_display(show_display);
self
}
}
fn check_dvs_before(dvs_event_t: i64, timestamp_before: i64) -> bool {
dvs_event_t < timestamp_before
}
fn check_dvs_after(dvs_event_t: i64, timestamp_after: i64) -> bool {
dvs_event_t > timestamp_after
}
fn clamp_u8(frame_val: &mut f64, last_val_ln: &mut f64) {
if *frame_val <= 0.0 {
*frame_val = 0.0;
*last_val_ln = 0.0; } else if *frame_val > 255.0 {
*frame_val = 255.0;
*last_val_ln = 255.0_f64.ln_1p();
}
}
pub async fn get_next_image(
reconstructor: &mut Reconstructor,
thread_pool: &ThreadPool,
with_events: bool,
) -> Result<Option<IterVal>, ReconstructionError> {
thread_pool
.install(|| async {
match reconstructor.next(with_events).await {
None => {
println!("\nFinished!");
Ok(None)
}
Some(res) => match res {
Ok(a) => Ok(Some(a)),
Err(e) => Err(e),
},
}
})
.await
}