direct_neural_biasing/processing/
signal_processor.rs1use super::detectors::DetectorInstance;
2use super::filters::FilterInstance;
3use super::triggers::TriggerInstance;
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use colored::Colorize;
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use crate::utils::log::log_to_file; use std::sync::mpsc::{self, Receiver, Sender};
15use std::thread;
16use crate::config::load_config;
20
21#[derive(Debug, Serialize, Deserialize, Clone)]
29pub struct SignalProcessorConfig {
30 pub verbose: bool,
31 pub fs: f64,
32 pub channel: usize,
33 pub enable_debug_logging: bool, }
35
36pub struct SignalProcessor {
37 pub index: usize,
38 pub filters: HashMap<String, Box<dyn FilterInstance>>,
39 pub detectors: HashMap<String, Box<dyn DetectorInstance>>,
40 pub triggers: HashMap<String, Box<dyn TriggerInstance>>,
41 pub processor_config: SignalProcessorConfig, pub results: HashMap<&'static str, f64>,
43 pub keys: Keys, log_sender: Option<Sender<(HashMap<&'static str, f64>, String)>>, }
46
47pub struct Keys {
49 global_index: &'static str,
50 global_raw_sample: &'static str,
51 global_channel: &'static str,
52 global_timestamp_ms: &'static str,
53}
54
55impl SignalProcessor {
56 pub fn from_config_file(config_path: &str) -> Result<Self, String> {
58 let config = load_config(config_path)?;
60
61 let log_sender = if config.processor.enable_debug_logging {
63 let (tx, rx) = mpsc::channel();
65 SignalProcessor::setup_logging_thread(rx);
67 Some(tx)
68 } else {
69 None
70 };
71
72 let mut processor = SignalProcessor {
73 index: 0,
74 filters: HashMap::new(),
75 detectors: HashMap::new(),
76 triggers: HashMap::new(),
77 processor_config: config.processor.clone(), results: HashMap::with_capacity(32),
79 keys: Keys {
80 global_index: "global:index",
82 global_raw_sample: "global:raw_sample",
83 global_channel: "global:channel",
84 global_timestamp_ms: "global:timestamp_ms",
85 },
86 log_sender,
87 };
88 for filter_config in &config.filters.bandpass_filters {
93 let filter = super::filters::bandpass::BandPassFilter::new(
94 super::filters::bandpass::BandPassFilterConfig {
95 id: filter_config.id.clone(),
96 f_low: filter_config.f_low,
97 f_high: filter_config.f_high,
98 },
99 processor.processor_config.fs,
100 );
101 processor.add_filter(Box::new(filter));
102 }
103
104 for detector_config in &config.detectors.wave_peak_detectors {
106 let wave_detector = super::detectors::wave_peak::WavePeakDetector::new(
107 super::detectors::wave_peak::WavePeakDetectorConfig {
108 id: detector_config.id.clone(),
109 filter_id: detector_config.filter_id.clone(),
110 z_score_threshold: detector_config.z_score_threshold,
111 sinusoidness_threshold: detector_config.sinusoidness_threshold,
112 check_sinusoidness: detector_config.check_sinusoidness,
113 wave_polarity: detector_config.wave_polarity.clone(),
114 min_wave_length_ms: detector_config.min_wave_length_ms,
115 max_wave_length_ms: detector_config.max_wave_length_ms,
116 },
117 );
118 processor.add_detector(Box::new(wave_detector));
119 }
120
121 for trigger_config in &config.triggers.pulse_triggers {
123 let trigger = super::triggers::pulse::PulseTrigger::new(
124 super::triggers::pulse::PulseTriggerConfig {
125 id: trigger_config.id.clone(),
126 activation_detector_id: trigger_config.activation_detector_id.clone(),
127 inhibition_detector_id: trigger_config.inhibition_detector_id.clone(),
128 inhibition_cooldown_ms: trigger_config.inhibition_cooldown_ms,
129 pulse_cooldown_ms: trigger_config.pulse_cooldown_ms,
130 },
131 );
132 processor.add_trigger(Box::new(trigger));
133 }
134 Ok(processor)
137 }
138
139 fn setup_logging_thread(rx: Receiver<(HashMap<&'static str, f64>, String)>) {
141 thread::spawn(move || {
142 let log_file_name = "trigger_debug.log";
143
144 if let Err(e) = crate::utils::log::delete_log_file(log_file_name) {
146 eprintln!(
147 "{}",
148 format!(
149 "Warning: Failed to delete old log file '{}': {}",
150 log_file_name, e
151 )
152 .yellow()
153 );
154 }
156 let _ = log_to_file(log_file_name, "Signal processor trigger logging started");
159
160 while let Ok((results, trigger_id)) = rx.recv() {
161 let timestamp = SystemTime::now()
163 .duration_since(UNIX_EPOCH)
164 .unwrap_or(Duration::from_secs(0))
165 .as_secs_f64();
166
167 let mut log_entry = format!("TRIGGER EVENT [{}]\n", timestamp);
169
170 log_entry.push_str(&format!("Trigger ID: {}\n\n", trigger_id));
172
173 log_entry.push_str("RESULTS CONTEXT:\n");
175 log_entry.push_str("----------------\n");
176
177 let mut keys: Vec<&&'static str> = results.keys().collect();
179 keys.sort();
180
181 for &key in keys {
182 if let Some(value) = results.get(key) {
183 log_entry.push_str(&format!("{} = {}\n", key, value));
184 }
185 }
186
187 if let Err(e) = log_to_file(log_file_name, &log_entry) {
189 eprintln!("{}", format!("Failed to log trigger event: {}", e).red());
190 }
191 }
192 let _ = log_to_file(log_file_name, "Signal processor trigger logging shut down.");
194 });
195 }
196
197 pub fn add_filter(&mut self, filter: Box<dyn FilterInstance>) {
198 let id = filter.id().to_string();
199 self.filters.insert(id, filter);
200 }
201
202 pub fn add_detector(&mut self, detector: Box<dyn DetectorInstance>) {
203 let id = detector.id().to_string();
204 let filter_id = detector.filter_id().to_string();
205
206 if !self.filters.contains_key(&filter_id) {
208 if self.processor_config.verbose {
210 eprintln!(
211 "{}",
212 format!(
213 "Warning: Detector '{}' references non-existent filter ID: {}",
214 id, filter_id
215 )
216 .yellow()
217 );
218 }
219 panic!("Detector references non-existent filter ID: {}", filter_id);
222 }
223
224 self.detectors.insert(id, detector);
225 }
226
227 pub fn add_trigger(&mut self, trigger: Box<dyn TriggerInstance>) {
228 let id = trigger.id().to_string();
229 let activation_detector_id = trigger.activation_detector_id();
230 let inhibition_detector_id = trigger.inhibition_detector_id();
231
232 if !self.detectors.contains_key(&activation_detector_id) {
234 if self.processor_config.verbose {
235 eprintln!(
236 "{}",
237 format!(
238 "Warning: Trigger '{}' references non-existent activation detector ID: {}",
239 id, activation_detector_id
240 )
241 .yellow()
242 );
243 }
244 panic!(
245 "Trigger references non-existent activation detector ID: {}",
246 activation_detector_id
247 );
248 }
249
250 if !inhibition_detector_id.is_empty()
252 && !self.detectors.contains_key(&inhibition_detector_id)
253 {
254 if self.processor_config.verbose {
255 eprintln!(
256 "{}",
257 format!(
258 "Warning: Trigger '{}' references non-existent inhibition detector ID: {}",
259 id, inhibition_detector_id
260 )
261 .yellow()
262 );
263 }
264 panic!(
265 "Trigger references non-existent inhibition detector ID: {}",
266 inhibition_detector_id
267 );
268 }
269 if inhibition_detector_id.is_empty() {
271 if self.processor_config.verbose {
272 eprintln!(
273 "{}",
274 format!("Trigger '{}' has no inhibition detector.", id).blue()
275 );
276 }
277 }
278
279 self.triggers.insert(id, trigger);
280 }
281
282 fn send_log_event(&self, results: HashMap<&'static str, f64>, trigger_id: String) {
284 if let Some(sender) = &self.log_sender {
285 if let Err(e) = sender.send((results, trigger_id)) {
286 eprintln!("{}", format!("Failed to send log event: {}", e).red());
287 }
288 }
289 }
290
291 pub fn run_chunk(
293 &mut self,
294 raw_samples: Vec<f64>,
295 ) -> (Vec<HashMap<&'static str, f64>>, Option<f64>) {
296 let mut output = Vec::with_capacity(raw_samples.len());
297 let mut trigger_timestamp_option = None;
298 let start_time_whole = Instant::now(); let mut trigger_events_to_log = Vec::new(); for sample in raw_samples {
304 self.results.clear();
306 self.results
307 .insert(&self.keys.global_index, self.index as f64);
308 self.results.insert(&self.keys.global_raw_sample, sample);
309 self.results.insert(
310 &self.keys.global_channel,
311 self.processor_config.channel as f64,
312 ); self.results.insert(
314 &self.keys.global_timestamp_ms,
315 self.index as f64 / self.processor_config.fs * 1000.0, );
317
318 for (_id, filter) in self.filters.iter_mut() {
320 filter.process_sample(&self.processor_config, &mut self.results);
321 }
323
324 for (_id, detector) in self.detectors.iter_mut() {
326 detector.process_sample(&self.processor_config, &mut self.results, self.index);
327 }
329
330 for (id, trigger) in self.triggers.iter_mut() {
332 trigger.evaluate(&self.processor_config, &mut self.results); let trigger_key = format!("triggers:{}:triggered", id);
339 let triggered = self
340 .results
341 .get(trigger_key.as_str()) .cloned()
343 .unwrap_or(0.0)
344 > 0.0;
345
346 if triggered {
347 let now = SystemTime::now();
348
349 let trigger_index_key = format!("triggers:{}:trigger_index", id);
350 let trigger_index = self
351 .results
352 .get(trigger_index_key.as_str()) .cloned()
354 .unwrap_or(0.0) as usize;
355
356 if trigger_index < self.index {
359 eprintln!(
360 "{}",
361 format!("Error: Trigger index ({}) for trigger '{}' is behind the current index ({})!",
362 trigger_index, id, self.index).red()
363 );
364 continue; }
366
367 let sample_diff = trigger_index as isize - self.index as isize;
369 let time_offset =
370 Duration::from_secs_f64(sample_diff as f64 / self.processor_config.fs); let future_trigger_timestamp = now + time_offset;
374
375 let unix_timestamp = future_trigger_timestamp
377 .duration_since(UNIX_EPOCH)
378 .expect("Time went backwards") .as_secs_f64();
380
381 if self.processor_config.enable_debug_logging {
383 trigger_events_to_log.push((self.results.clone(), id.clone()));
386 }
387
388 trigger_timestamp_option = Some(unix_timestamp);
390
391 }
394 }
395
396 output.push(self.results.clone());
397 self.index += 1;
398 }
399
400 for (results, trigger_id) in trigger_events_to_log {
403 self.send_log_event(results, trigger_id);
404 }
405
406 if self.processor_config.verbose {
408 let duration_whole = start_time_whole.elapsed();
410 eprintln!(
411 "{}",
412 format!("Processed chunk in {:?}", duration_whole).blue()
413 );
414
415 eprintln!(
417 "{}",
418 format!("Trigger timestamp option: {:?}", trigger_timestamp_option).color(
419 if trigger_timestamp_option.is_some() {
420 "green"
421 } else {
422 "red"
423 }
424 )
425 );
426
427 eprintln!(
429 "{}",
430 format!("Output length: {:?}", output.len()).color("yellow")
431 );
432 }
433
434 return (output, trigger_timestamp_option);
435 }
436}
437
438