Skip to main content

synheart_sensor_agent/core/
windowing.rs

1//! Window management for collecting events into time-based windows.
2//!
3//! Events are collected into fixed-duration windows (default 10 seconds)
4//! for feature extraction. Session boundaries are detected based on gaps.
5
6use crate::collector::types::{KeyboardEvent, MouseEvent, SensorEvent};
7use chrono::{DateTime, Duration, Utc};
8use serde::{Deserialize, Serialize};
9
10/// A time window containing collected events.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct EventWindow {
13    /// Start time of the window
14    pub start: DateTime<Utc>,
15    /// End time of the window
16    pub end: DateTime<Utc>,
17    /// Keyboard events in this window
18    pub keyboard_events: Vec<KeyboardEvent>,
19    /// Mouse events in this window
20    pub mouse_events: Vec<MouseEvent>,
21    /// Whether this window marks the start of a new session
22    pub is_session_start: bool,
23}
24
25impl EventWindow {
26    /// Create a new empty window starting at the given time.
27    pub fn new(start: DateTime<Utc>, duration: Duration) -> Self {
28        Self {
29            start,
30            end: start + duration,
31            keyboard_events: Vec::new(),
32            mouse_events: Vec::new(),
33            is_session_start: false,
34        }
35    }
36
37    /// Check if a timestamp falls within this window.
38    pub fn contains(&self, timestamp: DateTime<Utc>) -> bool {
39        timestamp >= self.start && timestamp < self.end
40    }
41
42    /// Add an event to this window.
43    pub fn add_event(&mut self, event: SensorEvent) {
44        match event {
45            SensorEvent::Keyboard(e) => self.keyboard_events.push(e),
46            SensorEvent::Mouse(e) => self.mouse_events.push(e),
47        }
48    }
49
50    /// Check if the window has any events.
51    pub fn is_empty(&self) -> bool {
52        self.keyboard_events.is_empty() && self.mouse_events.is_empty()
53    }
54
55    /// Get the total number of events in this window.
56    pub fn event_count(&self) -> usize {
57        self.keyboard_events.len() + self.mouse_events.len()
58    }
59
60    /// Get the duration of this window in seconds.
61    pub fn duration_secs(&self) -> f64 {
62        (self.end - self.start).num_milliseconds() as f64 / 1000.0
63    }
64}
65
66/// Manages the collection of events into time windows.
67pub struct WindowManager {
68    /// Duration of each window
69    window_duration: Duration,
70    /// Gap threshold for session boundaries
71    session_gap_threshold: Duration,
72    /// Current window being filled
73    current_window: Option<EventWindow>,
74    /// Completed windows ready for processing
75    completed_windows: Vec<EventWindow>,
76    /// Timestamp of the last event received
77    last_event_time: Option<DateTime<Utc>>,
78}
79
80impl WindowManager {
81    /// Create a new window manager with the given window duration.
82    pub fn new(window_duration_secs: u64, session_gap_threshold_secs: u64) -> Self {
83        Self {
84            window_duration: Duration::seconds(window_duration_secs as i64),
85            session_gap_threshold: Duration::seconds(session_gap_threshold_secs as i64),
86            current_window: None,
87            completed_windows: Vec::new(),
88            last_event_time: None,
89        }
90    }
91
92    /// Process an incoming event.
93    ///
94    /// This will:
95    /// 1. Detect session boundaries based on gaps
96    /// 2. Create new windows as needed
97    /// 3. Complete windows when their time expires
98    pub fn process_event(&mut self, event: SensorEvent) {
99        let event_time = event.timestamp();
100
101        // Check for session boundary (gap in events)
102        let is_new_session = if let Some(last_time) = self.last_event_time {
103            event_time - last_time > self.session_gap_threshold
104        } else {
105            true // First event starts a session
106        };
107
108        // If this is a new session, complete the current window
109        if is_new_session && self.current_window.is_some() {
110            self.complete_current_window();
111        }
112
113        // Ensure we have a current window
114        if self.current_window.is_none() {
115            let mut window = EventWindow::new(event_time, self.window_duration);
116            window.is_session_start = is_new_session;
117            self.current_window = Some(window);
118        }
119
120        // Check if the event falls outside the current window
121        let window = self.current_window.as_ref().unwrap();
122        if event_time >= window.end {
123            // Complete the current window and create a new one
124            self.complete_current_window();
125
126            // Align the new window to the event time
127            let mut window = EventWindow::new(event_time, self.window_duration);
128            window.is_session_start = is_new_session;
129            self.current_window = Some(window);
130        }
131
132        // Add the event to the current window
133        if let Some(ref mut window) = self.current_window {
134            window.add_event(event);
135        }
136
137        self.last_event_time = Some(event_time);
138    }
139
140    /// Force completion of the current window (e.g., on pause or stop).
141    pub fn flush(&mut self) {
142        self.complete_current_window();
143    }
144
145    /// Get and remove completed windows.
146    pub fn take_completed_windows(&mut self) -> Vec<EventWindow> {
147        std::mem::take(&mut self.completed_windows)
148    }
149
150    /// Check if there are completed windows available.
151    pub fn has_completed_windows(&self) -> bool {
152        !self.completed_windows.is_empty()
153    }
154
155    /// Get the number of completed windows.
156    pub fn completed_window_count(&self) -> usize {
157        self.completed_windows.len()
158    }
159
160    /// Complete the current window and move it to completed.
161    fn complete_current_window(&mut self) {
162        if let Some(window) = self.current_window.take() {
163            // Only keep non-empty windows
164            if !window.is_empty() {
165                self.completed_windows.push(window);
166            }
167        }
168    }
169
170    /// Check and complete the current window if it has expired.
171    pub fn check_window_expiry(&mut self) {
172        let now = Utc::now();
173        if let Some(ref window) = self.current_window {
174            if now >= window.end {
175                self.complete_current_window();
176            }
177        }
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184
185    #[test]
186    fn test_window_creation() {
187        let start = Utc::now();
188        let window = EventWindow::new(start, Duration::seconds(10));
189
190        assert_eq!(window.start, start);
191        assert_eq!(window.end, start + Duration::seconds(10));
192        assert!(window.is_empty());
193    }
194
195    #[test]
196    fn test_window_contains() {
197        let start = Utc::now();
198        let window = EventWindow::new(start, Duration::seconds(10));
199
200        assert!(window.contains(start));
201        assert!(window.contains(start + Duration::seconds(5)));
202        assert!(!window.contains(start + Duration::seconds(10)));
203        assert!(!window.contains(start - Duration::seconds(1)));
204    }
205
206    #[test]
207    fn test_window_manager_basic() {
208        let mut manager = WindowManager::new(10, 300);
209
210        // Process some keyboard events
211        for _ in 0..5 {
212            let event = SensorEvent::Keyboard(crate::collector::types::KeyboardEvent::new(true));
213            manager.process_event(event);
214        }
215
216        // Window shouldn't be complete yet
217        assert!(!manager.has_completed_windows());
218
219        // Flush to complete the current window
220        manager.flush();
221        assert!(manager.has_completed_windows());
222
223        let windows = manager.take_completed_windows();
224        assert_eq!(windows.len(), 1);
225        assert_eq!(windows[0].keyboard_events.len(), 5);
226    }
227}