Skip to main content

direct_neural_biasing/processing/
signal_processor.rs

1use super::detectors::DetectorInstance;
2use super::filters::FilterInstance;
3use super::triggers::TriggerInstance;
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7// use std::time;
8
9use colored::Colorize;
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12// Uncomment these imports as we'll need them for logging
13use crate::utils::log::log_to_file; // Assumes log_to_file is in crate::utils::log
14use std::sync::mpsc::{self, Receiver, Sender};
15use std::thread;
16// use rayon::prelude::*;
17// use std::os::raw::c_void;
18
19use crate::config::load_config;
20
21// -----------------------------------------------------------------------------
22// RUST CORE LOGIC
23// -----------------------------------------------------------------------------
24
25// SIGNAL PROCESSOR COMPONENT --------------------------------------------------
26
/// Processor-wide settings.
///
/// `SignalProcessor` keeps a copy of this in its `processor_config` field and passes a
/// reference to every filter, detector and trigger each time a sample is processed.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SignalProcessorConfig {
    /// When `true`, warnings and per-chunk diagnostics are printed to stderr.
    pub verbose: bool,
    /// Sampling rate in samples per second; a sample index divided by `fs` gives seconds.
    pub fs: f64,
    /// Channel number, published in the results map under `global:channel`.
    pub channel: usize,
    /// When `true`, a background thread is started and trigger events are logged to file.
    pub enable_debug_logging: bool,
}
35
/// Per-channel processing pipeline: filters, then detectors, then triggers, all sharing
/// one results map that is rebuilt for every sample.
pub struct SignalProcessor {
    /// Index of the next sample to process; incremented once per sample by `run_chunk`.
    pub index: usize,
    /// Registered filters, keyed by filter id.
    pub filters: HashMap<String, Box<dyn FilterInstance>>,
    /// Registered detectors, keyed by detector id.
    pub detectors: HashMap<String, Box<dyn DetectorInstance>>,
    /// Registered triggers, keyed by trigger id.
    pub triggers: HashMap<String, Box<dyn TriggerInstance>>,
    /// Copy of the `processor` section of the loaded configuration.
    pub processor_config: SignalProcessorConfig,
    /// Scratch map for the sample currently being processed; cleared at the start of
    /// every sample and filled by the globals, filters, detectors and triggers.
    pub results: HashMap<&'static str, f64>,
    /// Key strings under which the `global:*` entries are written into `results`.
    pub keys: Keys,
    /// Sending half of the channel to the logging thread; `None` when debug logging
    /// was disabled at construction time.
    log_sender: Option<Sender<(HashMap<&'static str, f64>, String)>>,
}
46
/// Key strings for the `global:*` entries that `run_chunk` writes into the results map
/// before any filter runs.
///
/// NOTE(review): the values are fixed string literals, so module-level constants could
/// replace this struct.
pub struct Keys {
    /// Key for the running sample index.
    global_index: &'static str,
    /// Key for the unfiltered input sample.
    global_raw_sample: &'static str,
    /// Key for the configured channel number.
    global_channel: &'static str,
    /// Key for the sample time in milliseconds, derived from index and sampling rate.
    global_timestamp_ms: &'static str,
}
54
55impl SignalProcessor {
56    /// Builds a new SignalProcessor instance from a configuration file.
57    pub fn from_config_file(config_path: &str) -> Result<Self, String> {
58        // Load the entire config from the file
59        let config = load_config(config_path)?;
60
61        // Setup logging
62        let log_sender = if config.processor.enable_debug_logging {
63            // Set up logging thread only if debug logging is enabled
64            let (tx, rx) = mpsc::channel();
65            // We can keep setup_logging_thread as a private helper within the impl
66            SignalProcessor::setup_logging_thread(rx);
67            Some(tx)
68        } else {
69            None
70        };
71
72        let mut processor = SignalProcessor {
73            index: 0,
74            filters: HashMap::new(),
75            detectors: HashMap::new(),
76            triggers: HashMap::new(),
77            processor_config: config.processor.clone(), // Clone the processor part of the config
78            results: HashMap::with_capacity(32),
79            keys: Keys {
80                // Consider using constants instead of this struct
81                global_index: "global:index",
82                global_raw_sample: "global:raw_sample",
83                global_channel: "global:channel",
84                global_timestamp_ms: "global:timestamp_ms",
85            },
86            log_sender,
87        };
88        // --- End of logic moved from the old `new` function ---
89
90        // --- Start of logic moved from the old `from_config` function ---
91        // Add all filters from config
92        for filter_config in &config.filters.bandpass_filters {
93            let filter = super::filters::bandpass::BandPassFilter::new(
94                super::filters::bandpass::BandPassFilterConfig {
95                    id: filter_config.id.clone(),
96                    f_low: filter_config.f_low,
97                    f_high: filter_config.f_high,
98                },
99                processor.processor_config.fs,
100            );
101            processor.add_filter(Box::new(filter));
102        }
103
104        // Add all detectors from config
105        for detector_config in &config.detectors.wave_peak_detectors {
106            let wave_detector = super::detectors::wave_peak::WavePeakDetector::new(
107                super::detectors::wave_peak::WavePeakDetectorConfig {
108                    id: detector_config.id.clone(),
109                    filter_id: detector_config.filter_id.clone(),
110                    z_score_threshold: detector_config.z_score_threshold,
111                    sinusoidness_threshold: detector_config.sinusoidness_threshold,
112                    check_sinusoidness: detector_config.check_sinusoidness,
113                    wave_polarity: detector_config.wave_polarity.clone(),
114                    min_wave_length_ms: detector_config.min_wave_length_ms,
115                    max_wave_length_ms: detector_config.max_wave_length_ms,
116                },
117            );
118            processor.add_detector(Box::new(wave_detector));
119        }
120
121        // Add all triggers from config
122        for trigger_config in &config.triggers.pulse_triggers {
123            let trigger = super::triggers::pulse::PulseTrigger::new(
124                super::triggers::pulse::PulseTriggerConfig {
125                    id: trigger_config.id.clone(),
126                    activation_detector_id: trigger_config.activation_detector_id.clone(),
127                    inhibition_detector_id: trigger_config.inhibition_detector_id.clone(),
128                    inhibition_cooldown_ms: trigger_config.inhibition_cooldown_ms,
129                    pulse_cooldown_ms: trigger_config.pulse_cooldown_ms,
130                },
131            );
132            processor.add_trigger(Box::new(trigger));
133        }
134        // --- End of logic moved from the old `from_config` function ---
135
136        Ok(processor)
137    }
138
139    // We keep setup_logging_thread as a private helper method
140    fn setup_logging_thread(rx: Receiver<(HashMap<&'static str, f64>, String)>) {
141        thread::spawn(move || {
142            let log_file_name = "trigger_debug.log";
143
144            // --- Call the new function to delete the old log file ---
145            if let Err(e) = crate::utils::log::delete_log_file(log_file_name) {
146                eprintln!(
147                    "{}",
148                    format!(
149                        "Warning: Failed to delete old log file '{}': {}",
150                        log_file_name, e
151                    )
152                    .yellow()
153                );
154                // Continue execution even if deletion failed, just warn the user.
155            }
156            // --- End of call ---
157
158            let _ = log_to_file(log_file_name, "Signal processor trigger logging started");
159
160            while let Ok((results, trigger_id)) = rx.recv() {
161                // Get current timestamp for the log
162                let timestamp = SystemTime::now()
163                    .duration_since(UNIX_EPOCH)
164                    .unwrap_or(Duration::from_secs(0))
165                    .as_secs_f64();
166
167                // Format the trigger event information
168                let mut log_entry = format!("TRIGGER EVENT [{}]\n", timestamp);
169
170                // Add trigger ID
171                log_entry.push_str(&format!("Trigger ID: {}\n\n", trigger_id));
172
173                // Add all results values in a readable format
174                log_entry.push_str("RESULTS CONTEXT:\n");
175                log_entry.push_str("----------------\n");
176
177                // Create sorted list of keys for consistent output
178                let mut keys: Vec<&&'static str> = results.keys().collect();
179                keys.sort();
180
181                for &key in keys {
182                    if let Some(value) = results.get(key) {
183                        log_entry.push_str(&format!("{} = {}\n", key, value));
184                    }
185                }
186
187                // Log the event to file
188                if let Err(e) = log_to_file(log_file_name, &log_entry) {
189                    eprintln!("{}", format!("Failed to log trigger event: {}", e).red());
190                }
191            }
192            // Log shutdown message when sender is dropped and channel is empty
193            let _ = log_to_file(log_file_name, "Signal processor trigger logging shut down.");
194        });
195    }
196
197    pub fn add_filter(&mut self, filter: Box<dyn FilterInstance>) {
198        let id = filter.id().to_string();
199        self.filters.insert(id, filter);
200    }
201
202    pub fn add_detector(&mut self, detector: Box<dyn DetectorInstance>) {
203        let id = detector.id().to_string();
204        let filter_id = detector.filter_id().to_string();
205
206        // Check if the detector references a valid filter ID
207        if !self.filters.contains_key(&filter_id) {
208            // Use the processor_config for verbose checking if needed
209            if self.processor_config.verbose {
210                eprintln!(
211                    "{}",
212                    format!(
213                        "Warning: Detector '{}' references non-existent filter ID: {}",
214                        id, filter_id
215                    )
216                    .yellow()
217                );
218            }
219            // Decide if this should be a panic or just a warning + skipping
220            // For now, let's keep the panic as it indicates a critical config error
221            panic!("Detector references non-existent filter ID: {}", filter_id);
222        }
223
224        self.detectors.insert(id, detector);
225    }
226
227    pub fn add_trigger(&mut self, trigger: Box<dyn TriggerInstance>) {
228        let id = trigger.id().to_string();
229        let activation_detector_id = trigger.activation_detector_id();
230        let inhibition_detector_id = trigger.inhibition_detector_id();
231
232        // Check if the activation detector ID is valid
233        if !self.detectors.contains_key(&activation_detector_id) {
234            if self.processor_config.verbose {
235                eprintln!(
236                    "{}",
237                    format!(
238                        "Warning: Trigger '{}' references non-existent activation detector ID: {}",
239                        id, activation_detector_id
240                    )
241                    .yellow()
242                );
243            }
244            panic!(
245                "Trigger references non-existent activation detector ID: {}",
246                activation_detector_id
247            );
248        }
249
250        // Check if the inhibition detector ID is valid (if not empty)
251        if !inhibition_detector_id.is_empty()
252            && !self.detectors.contains_key(&inhibition_detector_id)
253        {
254            if self.processor_config.verbose {
255                eprintln!(
256                    "{}",
257                    format!(
258                        "Warning: Trigger '{}' references non-existent inhibition detector ID: {}",
259                        id, inhibition_detector_id
260                    )
261                    .yellow()
262                );
263            }
264            panic!(
265                "Trigger references non-existent inhibition detector ID: {}",
266                inhibition_detector_id
267            );
268        }
269        // Handle the case where inhibition_detector_id is empty string, which is valid
270        if inhibition_detector_id.is_empty() {
271            if self.processor_config.verbose {
272                eprintln!(
273                    "{}",
274                    format!("Trigger '{}' has no inhibition detector.", id).blue()
275                );
276            }
277        }
278
279        self.triggers.insert(id, trigger);
280    }
281
282    // Helper method to send log messages if logging is enabled
283    fn send_log_event(&self, results: HashMap<&'static str, f64>, trigger_id: String) {
284        if let Some(sender) = &self.log_sender {
285            if let Err(e) = sender.send((results, trigger_id)) {
286                eprintln!("{}", format!("Failed to send log event: {}", e).red());
287            }
288        }
289    }
290
291    // Process a Vec of raw samples - chunk with defined length
292    pub fn run_chunk(
293        &mut self,
294        raw_samples: Vec<f64>,
295    ) -> (Vec<HashMap<&'static str, f64>>, Option<f64>) {
296        let mut output = Vec::with_capacity(raw_samples.len());
297        let mut trigger_timestamp_option = None;
298        let start_time_whole = Instant::now(); // Start timer before analysis
299
300        // Create a vector to collect trigger events
301        let mut trigger_events_to_log = Vec::new(); // Rename to avoid confusion with results
302
303        for sample in raw_samples {
304            // Reset and update globals
305            self.results.clear();
306            self.results
307                .insert(&self.keys.global_index, self.index as f64);
308            self.results.insert(&self.keys.global_raw_sample, sample);
309            self.results.insert(
310                &self.keys.global_channel,
311                self.processor_config.channel as f64,
312            ); // Use processor_config
313            self.results.insert(
314                &self.keys.global_timestamp_ms,
315                self.index as f64 / self.processor_config.fs * 1000.0, // Correct conversion to ms (divide by fs, multiply by 1000)
316            );
317
318            // Filters process the sample
319            for (_id, filter) in self.filters.iter_mut() {
320                filter.process_sample(&self.processor_config, &mut self.results);
321                // Use processor_config
322            }
323
324            // Detectors process the filtered results
325            for (_id, detector) in self.detectors.iter_mut() {
326                detector.process_sample(&self.processor_config, &mut self.results, self.index);
327                // Use processor_config
328            }
329
330            // Triggers evaluate based on detector outputs
331            for (id, trigger) in self.triggers.iter_mut() {
332                trigger.evaluate(&self.processor_config, &mut self.results); // Use processor_config
333
334                // Check if the trigger has been activated
335                // Note: The Box::leak approach here is a bit unusual and could lead to memory leaks
336                // if used frequently with unique strings. For fixed trigger IDs, it might be okay,
337                // but consider alternative ways to generate keys if they are dynamic.
338                let trigger_key = format!("triggers:{}:triggered", id);
339                let triggered = self
340                    .results
341                    .get(trigger_key.as_str()) // Use as_str() to get the key without leaking
342                    .cloned()
343                    .unwrap_or(0.0)
344                    > 0.0;
345
346                if triggered {
347                    let now = SystemTime::now();
348
349                    let trigger_index_key = format!("triggers:{}:trigger_index", id);
350                    let trigger_index = self
351                        .results
352                        .get(trigger_index_key.as_str()) // Use as_str()
353                        .cloned()
354                        .unwrap_or(0.0) as usize;
355
356                    // Verify the trigger index is ahead of or at the current index
357                    // A trigger occurring at the current index is valid.
358                    if trigger_index < self.index {
359                        eprintln!(
360                            "{}",
361                             format!("Error: Trigger index ({}) for trigger '{}' is behind the current index ({})!",
362                                    trigger_index, id, self.index).red()
363                        );
364                        continue; // Skip processing for this trigger instance
365                    }
366
367                    // Compute the relative time offset as a Duration
368                    let sample_diff = trigger_index as isize - self.index as isize;
369                    let time_offset =
370                        Duration::from_secs_f64(sample_diff as f64 / self.processor_config.fs); // Use processor_config
371
372                    // Add the relative time offset to the current UNIX time
373                    let future_trigger_timestamp = now + time_offset;
374
375                    // Convert SystemTime to UNIX timestamp (f64) for c++ compatibility
376                    let unix_timestamp = future_trigger_timestamp
377                        .duration_since(UNIX_EPOCH)
378                        .expect("Time went backwards") // This should generally not happen with SystemTime::now()
379                        .as_secs_f64();
380
381                    // If debug logging is enabled, collect the trigger event info for later logging
382                    if self.processor_config.enable_debug_logging {
383                        // Use processor_config
384                        // Clone both results and trigger ID for the event log
385                        trigger_events_to_log.push((self.results.clone(), id.clone()));
386                    }
387
388                    // Update the trigger timestamp option
389                    trigger_timestamp_option = Some(unix_timestamp);
390
391                    // Optional: Break after the first trigger if you only want one trigger per chunk
392                    // break;
393                }
394            }
395
396            output.push(self.results.clone());
397            self.index += 1;
398        }
399
400        // Now log all collected trigger events outside the mutable borrow scope
401        // Use the helper method
402        for (results, trigger_id) in trigger_events_to_log {
403            self.send_log_event(results, trigger_id);
404        }
405
406        // debug print timing (conditional on verbose flag)
407        if self.processor_config.verbose {
408            // Use processor_config
409            let duration_whole = start_time_whole.elapsed();
410            eprintln!(
411                "{}",
412                format!("Processed chunk in {:?}", duration_whole).blue()
413            );
414
415            // debug print trigger timestamp option
416            eprintln!(
417                "{}",
418                format!("Trigger timestamp option: {:?}", trigger_timestamp_option).color(
419                    if trigger_timestamp_option.is_some() {
420                        "green"
421                    } else {
422                        "red"
423                    }
424                )
425            );
426
427            // debug print output length
428            eprintln!(
429                "{}",
430                format!("Output length: {:?}", output.len()).color("yellow")
431            );
432        }
433
434        return (output, trigger_timestamp_option);
435    }
436}
437
438// Potential constants for Keys if you remove the Keys struct
439/*
440const GLOBAL_INDEX_KEY: &str = "global:index";
441const GLOBAL_RAW_SAMPLE_KEY: &str = "global:raw_sample";
442const GLOBAL_CHANNEL_KEY: &str = "global:channel";
443const GLOBAL_TIMESTAMP_MS_KEY: &str = "global:timestamp_ms";
444*/