Skip to main content

st/
hot_watcher.rs

1//
2// -----------------------------------------------------------------------------
3//  HOT WATCHER: Wave-Powered Directory Intelligence
4//
5//  This module uses MEM8 wave memory to track directory activity in real-time.
6//  Each watched path becomes a Wave with properties that evolve:
7//
8//  - Frequency: Change rate (changes per hour)
9//  - Arousal: Current activity level (0.0 = cold, 1.0 = HOT)
10//  - Valence: Security concern (-1.0 = danger, +1.0 = safe)
11//  - Decay: Old activity fades, recent activity persists
12//
13//  When waves resonate, we detect patterns - like coordinated attacks or
14//  related changes across the codebase.
15//
16//  "Memory is wave interference patterns in cognitive space." - MEM8
17// -----------------------------------------------------------------------------
18//
19
20use crate::scanner_interest::InterestLevel;
21use crate::security_scan::SecurityFinding;
22use anyhow::Result;
23use crate::mem8_lite::Wave;
24use notify::{
25    event::{CreateKind, ModifyKind, RemoveKind},
26    Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
27};
28use std::collections::HashMap;
29use std::path::{Path, PathBuf};
30use std::sync::{Arc, RwLock};
31use std::time::{Duration, Instant};
32use tokio::sync::mpsc;
33
34/// A directory being watched with its wave state
35#[derive(Debug, Clone)]
36pub struct WatchedDirectory {
37    /// The path being watched
38    pub path: PathBuf,
39    /// Wave representing this directory's activity pattern
40    pub wave: Wave,
41    /// Recent events (for burst detection)
42    pub recent_events: Vec<WatchEvent>,
43    /// Security findings in this directory
44    pub security_findings: Vec<SecurityFinding>,
45    /// Computed interest level
46    pub interest_level: InterestLevel,
47    /// Last time we computed the interest level
48    pub interest_computed_at: Instant,
49}
50
51impl WatchedDirectory {
52    /// Create a new watched directory with initial wave state
53    pub fn new(path: PathBuf) -> Self {
54        // Start with a neutral wave at low arousal
55        let wave = Wave::new(
56            1.0,  // 1 Hz base frequency (1 change per second baseline)
57            0.0,  // Neutral valence (no security concern yet)
58            0.1,  // Low initial arousal
59        );
60
61        Self {
62            path,
63            wave,
64            recent_events: Vec::new(),
65            security_findings: Vec::new(),
66            interest_level: InterestLevel::Background,
67            interest_computed_at: Instant::now(),
68        }
69    }
70
71    /// Record a file system event, updating the wave
72    pub fn record_event(&mut self, event: WatchEvent) {
73        // Update wave properties based on event type
74        match event.kind {
75            WatchEventKind::Created => {
76                // New files increase arousal
77                self.wave.arousal = (self.wave.arousal + 0.2).min(1.0);
78                self.wave.frequency += 0.5;
79            }
80            WatchEventKind::Modified => {
81                // Modifications increase arousal moderately
82                self.wave.arousal = (self.wave.arousal + 0.1).min(1.0);
83                self.wave.frequency += 0.2;
84            }
85            WatchEventKind::Deleted => {
86                // Deletions are notable
87                self.wave.arousal = (self.wave.arousal + 0.15).min(1.0);
88                self.wave.frequency += 0.3;
89            }
90            WatchEventKind::SecurityConcern => {
91                // Security concerns lower valence (toward danger)
92                self.wave.emotional_valence = (self.wave.emotional_valence - 0.3).max(-1.0);
93                self.wave.arousal = 1.0; // Immediate high alert
94            }
95        }
96
97        // Track recent events for burst detection
98        self.recent_events.push(event);
99
100        // Prune old events (keep last 5 minutes)
101        let cutoff = Instant::now() - Duration::from_secs(300);
102        self.recent_events.retain(|e| e.timestamp > cutoff);
103
104        // Check for burst activity
105        if self.recent_events.len() > 10 {
106            // Lots of events = very hot
107            self.wave.arousal = 1.0;
108            self.wave.frequency = self.recent_events.len() as f64 / 300.0 * 3600.0; // events per hour
109        }
110
111        // Update interest level
112        self.recompute_interest();
113    }
114
115    /// Record a security finding
116    pub fn record_security_finding(&mut self, finding: SecurityFinding) {
117        self.security_findings.push(finding);
118
119        // Lower valence based on risk
120        let valence_penalty = match self.security_findings.last().map(|f| &f.risk_level) {
121            Some(crate::security_scan::RiskLevel::Critical) => 0.5,
122            Some(crate::security_scan::RiskLevel::High) => 0.3,
123            Some(crate::security_scan::RiskLevel::Medium) => 0.2,
124            Some(crate::security_scan::RiskLevel::Low) => 0.1,
125            None => 0.0,
126        };
127        self.wave.emotional_valence = (self.wave.emotional_valence - valence_penalty).max(-1.0);
128
129        // Security findings always trigger high arousal
130        self.wave.arousal = (self.wave.arousal + 0.5).min(1.0);
131
132        self.recompute_interest();
133    }
134
135    /// Apply decay to the wave (call periodically)
136    pub fn apply_decay(&mut self, elapsed_secs: f64) {
137        // Arousal decays toward baseline
138        let decay_factor = (-0.001 * elapsed_secs).exp();
139        self.wave.arousal *= decay_factor;
140
141        // Frequency decays toward 1.0 (baseline)
142        self.wave.frequency = 1.0 + (self.wave.frequency - 1.0) * decay_factor;
143
144        // Valence slowly recovers toward neutral (if no new threats)
145        if self.wave.emotional_valence < 0.0 {
146            self.wave.emotional_valence = (self.wave.emotional_valence + 0.0001 * elapsed_secs).min(0.0);
147        }
148    }
149
150    /// Compute interest level from wave properties
151    fn recompute_interest(&mut self) {
152        self.interest_level = if self.wave.emotional_valence < -0.5 {
153            // Security concern
154            InterestLevel::Critical
155        } else if self.wave.arousal > 0.8 {
156            // Very hot
157            InterestLevel::Important
158        } else if self.wave.arousal > 0.4 || self.wave.frequency > 10.0 {
159            // Notable activity
160            InterestLevel::Notable
161        } else if self.wave.arousal > 0.1 {
162            // Some activity
163            InterestLevel::Background
164        } else {
165            // Cold
166            InterestLevel::Boring
167        };
168
169        self.interest_computed_at = Instant::now();
170    }
171
172    /// Check if this directory is "hot" (worth watching closely)
173    pub fn is_hot(&self) -> bool {
174        self.wave.arousal > 0.5 || self.wave.emotional_valence < -0.3
175    }
176
177    /// Compute resonance with another watched directory
178    pub fn resonance_with(&self, other: &WatchedDirectory) -> f64 {
179        self.wave.resonance_with(&other.wave)
180    }
181}
182
/// Category of a [`WatchEvent`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WatchEventKind {
    /// Something was created under a watched directory.
    Created,
    /// Something under a watched directory was modified.
    Modified,
    /// Something under a watched directory was deleted.
    Deleted,
    /// A security concern was raised for a path.
    SecurityConcern,
}
191
192/// A file system event
193#[derive(Debug, Clone)]
194pub struct WatchEvent {
195    pub path: PathBuf,
196    pub kind: WatchEventKind,
197    pub timestamp: Instant,
198}
199
200/// The Hot Watcher - real-time directory intelligence
201pub struct HotWatcher {
202    /// Watched directories indexed by path
203    directories: Arc<RwLock<HashMap<PathBuf, WatchedDirectory>>>,
204    /// The file system watcher
205    watcher: Option<RecommendedWatcher>,
206    /// Event receiver
207    event_rx: Option<mpsc::Receiver<WatchEvent>>,
208    /// Event sender (for internal use)
209    event_tx: mpsc::Sender<WatchEvent>,
210    /// Last decay application
211    last_decay: Instant,
212}
213
214impl HotWatcher {
215    /// Create a new hot watcher
216    pub fn new() -> Self {
217        let (event_tx, event_rx) = mpsc::channel(1000);
218
219        Self {
220            directories: Arc::new(RwLock::new(HashMap::new())),
221            watcher: None,
222            event_rx: Some(event_rx),
223            event_tx,
224            last_decay: Instant::now(),
225        }
226    }
227
228    /// Start watching a directory
229    pub fn watch(&mut self, path: &Path) -> Result<()> {
230        // Add to our tracked directories
231        {
232            let mut dirs = self.directories.write().unwrap();
233            if !dirs.contains_key(path) {
234                dirs.insert(path.to_path_buf(), WatchedDirectory::new(path.to_path_buf()));
235            }
236        }
237
238        // Set up file system watcher if not already done
239        if self.watcher.is_none() {
240            let tx = self.event_tx.clone();
241            let dirs = Arc::clone(&self.directories);
242
243            let watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
244                if let Ok(event) = res {
245                    let kind = match event.kind {
246                        EventKind::Create(CreateKind::File | CreateKind::Folder) => {
247                            Some(WatchEventKind::Created)
248                        }
249                        EventKind::Modify(ModifyKind::Data(_) | ModifyKind::Name(_)) => {
250                            Some(WatchEventKind::Modified)
251                        }
252                        EventKind::Remove(RemoveKind::File | RemoveKind::Folder) => {
253                            Some(WatchEventKind::Deleted)
254                        }
255                        _ => None,
256                    };
257
258                    if let Some(kind) = kind {
259                        for path in event.paths {
260                            // Find the watched directory this belongs to
261                            let dirs_read = dirs.read().unwrap();
262                            for watched_path in dirs_read.keys() {
263                                if path.starts_with(watched_path) {
264                                    let watch_event = WatchEvent {
265                                        path: path.clone(),
266                                        kind,
267                                        timestamp: Instant::now(),
268                                    };
269                                    let _ = tx.blocking_send(watch_event);
270                                    break;
271                                }
272                            }
273                        }
274                    }
275                }
276            })?;
277
278            self.watcher = Some(watcher);
279        }
280
281        // Add path to the watcher
282        if let Some(ref mut watcher) = self.watcher {
283            watcher.watch(path, RecursiveMode::Recursive)?;
284        }
285
286        Ok(())
287    }
288
289    /// Stop watching a directory
290    pub fn unwatch(&mut self, path: &Path) -> Result<()> {
291        if let Some(ref mut watcher) = self.watcher {
292            watcher.unwatch(path)?;
293        }
294
295        let mut dirs = self.directories.write().unwrap();
296        dirs.remove(path);
297
298        Ok(())
299    }
300
301    /// Process pending events (call periodically)
302    pub async fn process_events(&mut self) {
303        // Apply decay
304        let elapsed = self.last_decay.elapsed().as_secs_f64();
305        if elapsed > 1.0 {
306            let mut dirs = self.directories.write().unwrap();
307            for dir in dirs.values_mut() {
308                dir.apply_decay(elapsed);
309            }
310            self.last_decay = Instant::now();
311        }
312
313        // Process new events
314        if let Some(ref mut rx) = self.event_rx {
315            while let Ok(event) = rx.try_recv() {
316                let mut dirs = self.directories.write().unwrap();
317
318                // Find the parent watched directory
319                for (watched_path, dir) in dirs.iter_mut() {
320                    if event.path.starts_with(watched_path) {
321                        dir.record_event(event.clone());
322                        break;
323                    }
324                }
325            }
326        }
327    }
328
329    /// Get all hot directories (sorted by arousal)
330    pub fn get_hot_directories(&self) -> Vec<WatchedDirectory> {
331        let dirs = self.directories.read().unwrap();
332        let mut hot: Vec<_> = dirs.values().filter(|d| d.is_hot()).cloned().collect();
333        hot.sort_by(|a, b| b.wave.arousal.partial_cmp(&a.wave.arousal).unwrap());
334        hot
335    }
336
337    /// Get directories by interest level
338    pub fn get_by_interest(&self, level: InterestLevel) -> Vec<WatchedDirectory> {
339        let dirs = self.directories.read().unwrap();
340        dirs.values()
341            .filter(|d| d.interest_level == level)
342            .cloned()
343            .collect()
344    }
345
346    /// Find directories that resonate with a given pattern
347    pub fn find_resonating(&self, wave: &Wave, min_resonance: f64) -> Vec<(WatchedDirectory, f64)> {
348        let dirs = self.directories.read().unwrap();
349        let mut resonating: Vec<_> = dirs
350            .values()
351            .map(|d| {
352                let resonance = d.wave.resonance_with(wave);
353                (d.clone(), resonance)
354            })
355            .filter(|(_, r)| *r >= min_resonance)
356            .collect();
357
358        resonating.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
359        resonating
360    }
361
362    /// Get a summary of watched directories
363    pub fn summary(&self) -> HotWatcherSummary {
364        let dirs = self.directories.read().unwrap();
365
366        let mut critical = 0;
367        let mut hot = 0;
368        let mut warm = 0;
369        let mut cold = 0;
370        let mut total_arousal = 0.0;
371
372        for dir in dirs.values() {
373            total_arousal += dir.wave.arousal;
374            match dir.interest_level {
375                InterestLevel::Critical => critical += 1,
376                InterestLevel::Important => hot += 1,
377                InterestLevel::Notable => warm += 1,
378                _ => cold += 1,
379            }
380        }
381
382        let avg_arousal = if dirs.is_empty() {
383            0.0
384        } else {
385            total_arousal / dirs.len() as f64
386        };
387
388        HotWatcherSummary {
389            total_watched: dirs.len(),
390            critical,
391            hot,
392            warm,
393            cold,
394            average_arousal: avg_arousal,
395        }
396    }
397}
398
399impl Default for HotWatcher {
400    fn default() -> Self {
401        Self::new()
402    }
403}
404
/// Aggregate counts describing the state of a hot watcher.
#[derive(Clone, Debug)]
pub struct HotWatcherSummary {
    /// Number of directories being watched.
    pub total_watched: usize,
    /// Directories counted as critical.
    pub critical: usize,
    /// Directories counted as hot.
    pub hot: usize,
    /// Directories counted as warm.
    pub warm: usize,
    /// Directories counted as cold.
    pub cold: usize,
    /// Mean arousal across the watched directories.
    pub average_arousal: f64,
}

/// Renders the summary as a single human-readable line, with the average
/// arousal shown to two decimal places.
impl std::fmt::Display for HotWatcherSummary {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_watched,
            critical,
            hot,
            warm,
            cold,
            average_arousal,
        } = self;

        f.write_fmt(format_args!(
            "Watching {} dirs: {} critical, {} hot, {} warm, {} cold (avg arousal: {:.2})",
            total_watched, critical, hot, warm, cold, average_arousal
        ))
    }
}
430
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh tracking state for the given path.
    fn watched(path: &str) -> WatchedDirectory {
        WatchedDirectory::new(PathBuf::from(path))
    }

    /// An event of `kind` on `path`, stamped with the current instant.
    fn event_now(path: &str, kind: WatchEventKind) -> WatchEvent {
        WatchEvent {
            path: PathBuf::from(path),
            kind,
            timestamp: Instant::now(),
        }
    }

    /// A new directory starts with low arousal at Background interest.
    #[test]
    fn test_watched_directory_creation() {
        let dir = watched("/test");

        assert_eq!(dir.wave.arousal, 0.1);
        assert_eq!(dir.interest_level, InterestLevel::Background);
    }

    /// Recording a creation event raises arousal above its starting value.
    #[test]
    fn test_event_increases_arousal() {
        let mut dir = watched("/test");
        let before = dir.wave.arousal;

        dir.record_event(event_now("/test/file.rs", WatchEventKind::Created));

        assert!(dir.wave.arousal > before);
    }

    /// A security concern drives valence negative and arousal to its maximum.
    #[test]
    fn test_security_concern_lowers_valence() {
        let mut dir = watched("/test");

        dir.record_event(event_now("/test/evil.js", WatchEventKind::SecurityConcern));

        assert!(dir.wave.emotional_valence < 0.0);
        assert_eq!(dir.wave.arousal, 1.0);
    }

    /// Decay over 1000 seconds lowers a fully aroused wave.
    #[test]
    fn test_decay_reduces_arousal() {
        let mut dir = watched("/test");
        dir.wave.arousal = 1.0;

        dir.apply_decay(1000.0);

        assert!(dir.wave.arousal < 1.0);
    }

    /// Two directories with identical wave parameters resonate strongly.
    #[test]
    fn test_resonance() {
        let reference = watched("/test1");
        let mut twin = watched("/test2");

        // Copy the wave parameters across so the two waves match.
        twin.wave.frequency = reference.wave.frequency;
        twin.wave.emotional_valence = reference.wave.emotional_valence;
        twin.wave.arousal = reference.wave.arousal;

        let resonance = reference.resonance_with(&twin);
        assert!(resonance > 0.9);
    }
}