Skip to main content

synheart_sensor_agent/flux/
processor.rs

1//! Sensor-aware Flux processor for baseline tracking and HSI enrichment.
2//!
3//! This module wraps the synheart-flux BehaviorProcessor to provide
4//! baseline tracking and HSI enrichment for sensor agent data.
5
6use crate::core::features::WindowFeatures;
7use crate::core::hsi::HsiSnapshot;
8use crate::core::windowing::EventWindow;
9use crate::flux::adapter::SensorBehaviorAdapter;
10use serde::{Deserialize, Serialize};
11use synheart_flux::behavior::BehaviorProcessor;
12use synheart_flux::ComputeError;
13
14/// Enriched snapshot with baseline-adjusted metrics.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct EnrichedSnapshot {
17    /// Original sensor snapshot
18    pub base: HsiSnapshot,
19    /// Flux-computed behavioral metrics (if available)
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub flux_behavior: Option<FluxBehaviorMetrics>,
22    /// Baseline information (if available)
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub baseline: Option<BaselineInfo>,
25}
26
27/// Flux-computed behavioral metrics.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct FluxBehaviorMetrics {
30    /// Distraction score (0.0 to 1.0)
31    pub distraction_score: f64,
32    /// Focus hint (inverse of distraction)
33    pub focus_hint: f64,
34    /// Task switch rate (normalized)
35    pub task_switch_rate: f64,
36    /// Notification load (normalized)
37    pub notification_load: f64,
38    /// Burstiness index (Barabasi formula)
39    pub burstiness: f64,
40    /// Scroll jitter rate
41    pub scroll_jitter_rate: f64,
42    /// Interaction intensity
43    pub interaction_intensity: f64,
44    /// Deep focus block count
45    pub deep_focus_blocks: u32,
46}
47
48/// Baseline information for the current session.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct BaselineInfo {
51    /// Baseline distraction score
52    pub distraction: Option<f64>,
53    /// Baseline focus score
54    pub focus: Option<f64>,
55    /// Deviation from baseline (percentage)
56    pub distraction_deviation_pct: Option<f64>,
57    /// Number of sessions in the baseline
58    pub sessions_in_baseline: u32,
59}
60
61/// Sensor-aware Flux processor for baseline tracking and HSI enrichment.
62pub struct SensorFluxProcessor {
63    /// Internal behavior processor
64    processor: BehaviorProcessor,
65    /// Adapter for converting sensor events
66    adapter: SensorBehaviorAdapter,
67    /// Session counter
68    session_count: usize,
69}
70
71impl SensorFluxProcessor {
72    /// Create a new processor with the specified baseline window size.
73    ///
74    /// # Arguments
75    ///
76    /// * `baseline_window_sessions` - Number of sessions to include in rolling baseline (default: 20)
77    pub fn new(baseline_window_sessions: usize) -> Self {
78        Self {
79            processor: BehaviorProcessor::with_baseline_window(baseline_window_sessions),
80            adapter: SensorBehaviorAdapter::with_defaults(),
81            session_count: 0,
82        }
83    }
84
85    /// Create with a custom device ID.
86    pub fn with_device_id(baseline_window_sessions: usize, device_id: &str) -> Self {
87        Self {
88            processor: BehaviorProcessor::with_baseline_window(baseline_window_sessions),
89            adapter: SensorBehaviorAdapter::new(device_id.to_string(), "UTC".to_string()),
90            session_count: 0,
91        }
92    }
93
94    /// Process a window and return an enriched snapshot with flux metrics.
95    ///
96    /// This converts the sensor window to a behavior session, processes it
97    /// through the flux pipeline, and returns an enriched snapshot.
98    pub fn process_window(
99        &mut self,
100        window: &EventWindow,
101        _features: &WindowFeatures,
102        base_snapshot: HsiSnapshot,
103    ) -> Result<EnrichedSnapshot, ComputeError> {
104        self.session_count += 1;
105        let session_id = format!("sensor-{}", self.session_count);
106
107        // Convert window to behavior session
108        let session = self.adapter.convert(&session_id, window);
109
110        // Serialize session to JSON for flux processing
111        let session_json = serde_json::to_string(&session)
112            .map_err(|e| ComputeError::EncodingError(e.to_string()))?;
113
114        // Process through flux
115        let hsi_json = self.processor.process(&session_json)?;
116
117        // Extract metrics from HSI JSON
118        let (flux_behavior, baseline) = extract_flux_metrics_from_json(&hsi_json)?;
119
120        Ok(EnrichedSnapshot {
121            base: base_snapshot,
122            flux_behavior,
123            baseline,
124        })
125    }
126
127    /// Process a window without enrichment (just baseline update).
128    ///
129    /// This updates the baseline without returning enriched output.
130    pub fn update_baseline(&mut self, window: &EventWindow) -> Result<(), ComputeError> {
131        self.session_count += 1;
132        let session_id = format!("sensor-{}", self.session_count);
133        let session = self.adapter.convert(&session_id, window);
134
135        let session_json = serde_json::to_string(&session)
136            .map_err(|e| ComputeError::EncodingError(e.to_string()))?;
137
138        let _ = self.processor.process(&session_json)?;
139        Ok(())
140    }
141
142    /// Save baselines to JSON for persistence.
143    pub fn save_baselines(&self) -> Result<String, ComputeError> {
144        self.processor.save_baselines()
145    }
146
147    /// Load baselines from JSON.
148    pub fn load_baselines(&mut self, json: &str) -> Result<(), ComputeError> {
149        self.processor.load_baselines(json)
150    }
151
152    /// Get the number of sessions processed.
153    pub fn session_count(&self) -> usize {
154        self.session_count
155    }
156}
157
158/// Extract flux metrics from HSI JSON string.
159fn extract_flux_metrics_from_json(
160    hsi_json: &str,
161) -> Result<(Option<FluxBehaviorMetrics>, Option<BaselineInfo>), ComputeError> {
162    let payload: serde_json::Value =
163        serde_json::from_str(hsi_json).map_err(|e| ComputeError::ParseError(e.to_string()))?;
164
165    let windows = payload.get("behavior_windows").and_then(|w| w.as_array());
166
167    let window = match windows {
168        Some(w) if !w.is_empty() => &w[0],
169        _ => return Ok((None, None)),
170    };
171
172    // Extract behavior metrics
173    let behavior = window.get("behavior");
174    let flux_behavior = behavior.map(|b| FluxBehaviorMetrics {
175        distraction_score: b
176            .get("distraction_score")
177            .and_then(|v| v.as_f64())
178            .unwrap_or(0.0),
179        focus_hint: b.get("focus_hint").and_then(|v| v.as_f64()).unwrap_or(0.0),
180        task_switch_rate: b
181            .get("task_switch_rate")
182            .and_then(|v| v.as_f64())
183            .unwrap_or(0.0),
184        notification_load: b
185            .get("notification_load")
186            .and_then(|v| v.as_f64())
187            .unwrap_or(0.0),
188        burstiness: b.get("burstiness").and_then(|v| v.as_f64()).unwrap_or(0.0),
189        scroll_jitter_rate: b
190            .get("scroll_jitter_rate")
191            .and_then(|v| v.as_f64())
192            .unwrap_or(0.0),
193        interaction_intensity: b
194            .get("interaction_intensity")
195            .and_then(|v| v.as_f64())
196            .unwrap_or(0.0),
197        deep_focus_blocks: b
198            .get("deep_focus_blocks")
199            .and_then(|v| v.as_u64())
200            .unwrap_or(0) as u32,
201    });
202
203    // Extract baseline
204    let baseline_json = window.get("baseline");
205    let baseline = baseline_json.map(|b| BaselineInfo {
206        distraction: b.get("distraction").and_then(|v| v.as_f64()),
207        focus: b.get("focus").and_then(|v| v.as_f64()),
208        distraction_deviation_pct: b.get("distraction_deviation_pct").and_then(|v| v.as_f64()),
209        sessions_in_baseline: b
210            .get("sessions_in_baseline")
211            .and_then(|v| v.as_u64())
212            .unwrap_or(0) as u32,
213    });
214
215    Ok((flux_behavior, baseline))
216}
217
218impl Default for SensorFluxProcessor {
219    fn default() -> Self {
220        Self::new(20)
221    }
222}
223
224#[cfg(test)]
225mod tests {
226    use super::*;
227
228    #[test]
229    fn test_processor_creation() {
230        let processor = SensorFluxProcessor::new(20);
231        assert_eq!(processor.session_count(), 0);
232    }
233
234    #[test]
235    fn test_processor_with_device_id() {
236        let processor = SensorFluxProcessor::with_device_id(20, "test-device");
237        assert_eq!(processor.session_count(), 0);
238    }
239}