Skip to main content

jsdet_core/
streaming.rs

//! Streaming observation API for real-time reaction during execution.
//!
//! Instead of waiting for all scripts to finish, the consumer receives
//! each observation as it happens. This enables:
//!
//! - **Early termination** — stop execution when a confirmed finding is detected
//! - **Real-time MCTS reward** — Soleno feeds observations directly to search
//! - **Rate limiting** — drop observations when the buffer is too large
//! - **Live monitoring** — security researchers watch execution in real time

11use crate::observation::Observation;
12
/// Verdict an [`ObservationSink`] hands back after inspecting one observation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlFlow {
    /// Keep going; execution proceeds as usual.
    Continue,
    /// Stop right away — scripts that have not run yet are skipped.
    ///
    /// Per this type's contract, the resulting `ExecutionResult` reports
    /// `timed_out = true` and still carries every observation gathered
    /// before the stop.
    Halt,
}
23
24/// A callback that receives observations during execution.
25///
26/// Implement this trait to process observations in real-time.
27/// The default implementation collects into a Vec (batch mode).
28pub trait ObservationSink: Send {
29    /// Called for each observation during execution.
30    ///
31    /// Return `ControlFlow::Continue` to keep executing.
32    /// Return `ControlFlow::Halt` to stop immediately.
33    fn on_observation(&mut self, observation: &Observation) -> ControlFlow;
34
35    /// Called when execution completes (normally or via halt).
36    /// Override to perform cleanup or final analysis.
37    fn on_complete(&mut self) {}
38}
39
40/// Batch collector — the default sink. Collects all observations into a Vec.
41pub struct BatchCollector {
42    observations: Vec<Observation>,
43    max_observations: usize,
44}
45
46impl BatchCollector {
47    #[must_use]
48    pub fn new(max_observations: usize) -> Self {
49        Self {
50            observations: Vec::new(),
51            max_observations,
52        }
53    }
54
55    #[must_use]
56    pub fn into_observations(self) -> Vec<Observation> {
57        self.observations
58    }
59}
60
61impl ObservationSink for BatchCollector {
62    fn on_observation(&mut self, observation: &Observation) -> ControlFlow {
63        if self.observations.len() < self.max_observations {
64            self.observations.push(observation.clone());
65        }
66        ControlFlow::Continue
67    }
68}
69
70/// Early-stop sink — halts execution when a predicate matches.
71pub struct EarlyStopSink<F: FnMut(&Observation) -> bool> {
72    predicate: F,
73    observations: Vec<Observation>,
74}
75
76impl<F: FnMut(&Observation) -> bool> EarlyStopSink<F> {
77    pub fn new(predicate: F) -> Self {
78        Self {
79            predicate,
80            observations: Vec::new(),
81        }
82    }
83
84    pub fn into_observations(self) -> Vec<Observation> {
85        self.observations
86    }
87}
88
89impl<F: FnMut(&Observation) -> bool + Send> ObservationSink for EarlyStopSink<F> {
90    fn on_observation(&mut self, observation: &Observation) -> ControlFlow {
91        self.observations.push(observation.clone());
92        if (self.predicate)(observation) {
93            ControlFlow::Halt
94        } else {
95            ControlFlow::Continue
96        }
97    }
98}
99
100/// Counting sink — counts observations by category without storing them.
101/// Useful for high-throughput scanning where individual observations
102/// are less important than aggregate behavior.
103#[derive(Debug, Default)]
104pub struct CountingSink {
105    pub api_calls: u64,
106    pub dom_mutations: u64,
107    pub network_requests: u64,
108    pub dynamic_code: u64,
109    pub cookie_access: u64,
110    pub errors: u64,
111    pub wasm_instantiations: u64,
112    pub fingerprint_access: u64,
113    pub context_messages: u64,
114    pub resource_limits: u64,
115    pub total: u64,
116}
117
118impl ObservationSink for CountingSink {
119    fn on_observation(&mut self, observation: &Observation) -> ControlFlow {
120        // CRITICAL FIX: Use saturating arithmetic to prevent overflow
121        self.total = self.total.saturating_add(1);
122        match observation {
123            Observation::ApiCall { .. } => self.api_calls = self.api_calls.saturating_add(1),
124            Observation::DomMutation { .. } => {
125                self.dom_mutations = self.dom_mutations.saturating_add(1)
126            }
127            Observation::NetworkRequest { .. } => {
128                self.network_requests = self.network_requests.saturating_add(1)
129            }
130            Observation::DynamicCodeExec { .. } => {
131                self.dynamic_code = self.dynamic_code.saturating_add(1)
132            }
133            Observation::CookieAccess { .. } => {
134                self.cookie_access = self.cookie_access.saturating_add(1)
135            }
136            Observation::Error { .. } => self.errors = self.errors.saturating_add(1),
137            Observation::WasmInstantiation { .. } => {
138                self.wasm_instantiations = self.wasm_instantiations.saturating_add(1)
139            }
140            Observation::FingerprintAccess { .. } => {
141                self.fingerprint_access = self.fingerprint_access.saturating_add(1)
142            }
143            Observation::ContextMessage { .. } => {
144                self.context_messages = self.context_messages.saturating_add(1)
145            }
146            Observation::ResourceLimit { .. } => {
147                self.resource_limits = self.resource_limits.saturating_add(1)
148            }
149            _ => {}
150        }
151        ControlFlow::Continue
152    }
153}