Skip to main content

dsfb_gray/
observer.rs

1//! The DSFB observer: complete pipeline from residual sample to episode.
2//!
3//! [`DsfbObserver`] is the top-level entry point. It wires together the
4//! residual estimator, admissibility envelope, grammar state machine,
5//! heuristics bank, episode builder, and audit trace into a single
6//! `observe()` call that accepts an immutable sample and produces
7//! a fully classified structural interpretation.
8//!
9//! ## Non-Interference Contract
10//!
11//! The observer accepts all inputs as immutable references. No mutable
12//! reference to any upstream system component is created. If the observer
13//! is removed, upstream behavior is unchanged.
14
15use crate::adapter::TelemetryAdapter;
16use crate::audit::{AuditEvent, AuditTrace};
17use crate::envelope::{AdmissibilityEnvelope, EnvelopePosition};
18use crate::episode::{Episode, EpisodeBuilder};
19use crate::grammar::{GrammarMachine, GrammarState, GrammarTransition};
20use crate::heuristics::{
21    AppliedStaticPrior, HeuristicId, HeuristicsBank, MatchResult, StaticPriorSet,
22};
23use crate::residual::{ResidualEstimator, ResidualSample, ResidualSign, ResidualSource};
24use crate::ReasonCode;
25
26/// Configuration for the DSFB observer.
27#[derive(Debug, Clone)]
28pub struct ObserverConfig {
29    /// Persistence window for drift/slew estimation.
30    /// Recommended: 20–100 for 1-second sampling intervals.
31    pub persistence_window: usize,
32    /// Hysteresis count for grammar state transitions.
33    /// Recommended: 3–10.
34    pub hysteresis_count: u32,
35    /// Default admissibility envelope (used when no regime-specific
36    /// envelope is configured).
37    pub default_envelope: AdmissibilityEnvelope,
38    /// Optional static priors from the crate scanner or caller.
39    pub static_priors: StaticPriorSet,
40}
41
42impl Default for ObserverConfig {
43    fn default() -> Self {
44        Self::balanced()
45    }
46}
47
48impl ObserverConfig {
49    /// Balanced preset for general-purpose monitoring.
50    pub fn balanced() -> Self {
51        Self {
52            persistence_window: 40,
53            hysteresis_count: 5,
54            default_envelope: AdmissibilityEnvelope::symmetric(
55                10.0,
56                1.0,
57                0.5,
58                crate::regime::WorkloadPhase::SteadyState,
59            ),
60            static_priors: StaticPriorSet::default(),
61        }
62    }
63
64    /// Lower-latency preset that favors earlier transitions.
65    pub fn fast_response() -> Self {
66        Self {
67            persistence_window: 20,
68            hysteresis_count: 3,
69            default_envelope: AdmissibilityEnvelope::symmetric(
70                5.0,
71                0.5,
72                0.25,
73                crate::regime::WorkloadPhase::SteadyState,
74            ),
75            static_priors: StaticPriorSet::default(),
76        }
77    }
78
79    /// Conservative preset that favors stability over early transitions.
80    pub fn low_noise() -> Self {
81        Self {
82            persistence_window: 60,
83            hysteresis_count: 6,
84            default_envelope: AdmissibilityEnvelope::symmetric(
85                12.0,
86                1.2,
87                0.6,
88                crate::regime::WorkloadPhase::SteadyState,
89            ),
90            static_priors: StaticPriorSet::default(),
91        }
92    }
93
94    /// Return a copy of this configuration with static priors attached.
95    pub fn with_static_priors(mut self, static_priors: StaticPriorSet) -> Self {
96        self.static_priors = static_priors;
97        self
98    }
99}
100
101/// Observation result from a single `observe()` call.
102#[derive(Debug, Clone)]
103pub struct ObservationResult {
104    /// The computed residual sign (r, ω, α).
105    pub sign: ResidualSign,
106    /// Current grammar state after this observation.
107    pub grammar_state: GrammarState,
108    /// Envelope position classification.
109    pub envelope_position: EnvelopePosition,
110    /// Heuristic match result (reason code + confidence).
111    pub heuristic_match: MatchResult,
112    /// Human-readable evidence for the emitted reason code.
113    pub reason_evidence: ReasonEvidence,
114    /// Grammar transition, if one occurred at this step.
115    pub transition: Option<GrammarTransition>,
116    /// Completed episode, if a grammar transition closed the previous one.
117    pub completed_episode: Option<Episode>,
118}
119
120/// Human-readable explanation of the structural reason selected by DSFB.
121#[derive(Debug, Clone, Copy)]
122pub struct ReasonEvidence {
123    /// Selected reason code.
124    pub reason_code: ReasonCode,
125    /// Heuristic that produced the reason code, if any.
126    pub matched_heuristic: Option<HeuristicId>,
127    /// Match confidence after bounded threshold scaling.
128    pub confidence: f64,
129    /// Human-readable description of the pattern.
130    pub description: &'static str,
131    /// Rust-specific provenance of the pattern.
132    pub provenance: &'static str,
133    /// Static prior applied to the winning heuristic, if any.
134    pub applied_prior: Option<AppliedStaticPrior>,
135}
136
137/// The DSFB observer for a single residual source channel.
138///
139/// For multi-channel observation (e.g., monitoring latency + throughput +
140/// heartbeat RTT simultaneously), create one `DsfbObserver` per channel
141/// and correlate their outputs externally.
142pub struct DsfbObserver {
143    estimator: ResidualEstimator,
144    grammar: GrammarMachine,
145    heuristics: HeuristicsBank,
146    episode_builder: EpisodeBuilder,
147    audit: AuditTrace,
148    envelope: AdmissibilityEnvelope,
149    source: ResidualSource,
150    static_priors: StaticPriorSet,
151    observation_count: u64,
152}
153
154impl DsfbObserver {
155    /// Create a new observer for the given source channel.
156    pub fn new(source: ResidualSource, config: &ObserverConfig) -> Self {
157        Self {
158            estimator: ResidualEstimator::new(source, config.persistence_window),
159            grammar: GrammarMachine::new(config.hysteresis_count),
160            heuristics: HeuristicsBank::default_bank(),
161            episode_builder: EpisodeBuilder::new(),
162            audit: AuditTrace::new(),
163            envelope: config.default_envelope,
164            source,
165            static_priors: config.static_priors,
166            observation_count: 0,
167        }
168    }
169
170    /// Create an observer with a custom heuristics bank.
171    pub fn with_heuristics(
172        source: ResidualSource,
173        config: &ObserverConfig,
174        heuristics: HeuristicsBank,
175    ) -> Self {
176        let mut obs = Self::new(source, config);
177        obs.heuristics = heuristics;
178        obs
179    }
180
181    /// Set the admissibility envelope (e.g., after regime classification).
182    pub fn set_envelope(&mut self, envelope: AdmissibilityEnvelope) {
183        self.envelope = envelope;
184    }
185
186    /// Replace the static priors used during heuristic matching.
187    pub fn set_static_priors(&mut self, static_priors: StaticPriorSet) {
188        self.static_priors = static_priors;
189    }
190
191    /// Adapt and observe one application-specific telemetry record.
192    pub fn observe_adapted<T, A>(&mut self, adapter: &A, input: &T) -> ObservationResult
193    where
194        A: TelemetryAdapter<T>,
195    {
196        let sample = adapter.adapt(input);
197        self.observe(&sample)
198    }
199
200    /// Process a single residual sample and return the full observation result.
201    ///
202    /// This is the primary API. It accepts an immutable reference to a sample
203    /// and returns a complete structural interpretation.
204    ///
205    /// ## Non-Interference
206    ///
207    /// The sample is accepted as `&ResidualSample`. No mutation of the sample
208    /// or its originating system is possible through this API.
209    pub fn observe(&mut self, sample: &ResidualSample) -> ObservationResult {
210        self.observation_count += 1;
211
212        // Step 1: Compute residual sign (r, ω, α)
213        let sign = self.estimator.observe(sample);
214
215        // Step 2: Classify against admissibility envelope
216        let envelope_position = self.envelope.classify(&sign);
217
218        // Step 3: Update grammar state machine
219        let (grammar_state, transition) = self.grammar.step(envelope_position, sample.timestamp_ns);
220
221        // Step 4: Match against heuristics bank
222        let heuristic_match =
223            self.heuristics
224                .match_sign_with_priors(&sign, grammar_state, &self.static_priors);
225        let reason_evidence = ReasonEvidence {
226            reason_code: heuristic_match.reason_code,
227            matched_heuristic: heuristic_match.matched_heuristic,
228            confidence: heuristic_match.confidence,
229            description: heuristic_match.description,
230            provenance: heuristic_match.provenance,
231            applied_prior: heuristic_match.applied_prior,
232        };
233
234        // Step 5: Manage episodes
235        let completed_episode =
236            self.manage_episodes(&sign, grammar_state, &heuristic_match, transition.as_ref());
237
238        // Step 6: Record audit event
239        self.audit.record(AuditEvent {
240            timestamp_ns: sample.timestamp_ns,
241            residual: sign.residual,
242            drift: sign.drift,
243            slew: sign.slew,
244            envelope_position: match envelope_position {
245                EnvelopePosition::Interior => 0,
246                EnvelopePosition::BoundaryZone => 1,
247                EnvelopePosition::Exterior => 2,
248            },
249            grammar_state: grammar_state.severity(),
250            transition_occurred: transition.is_some(),
251        });
252
253        ObservationResult {
254            sign,
255            grammar_state,
256            envelope_position,
257            heuristic_match,
258            reason_evidence,
259            transition,
260            completed_episode,
261        }
262    }
263
264    /// Manage episode lifecycle based on grammar transitions.
265    fn manage_episodes(
266        &mut self,
267        sign: &ResidualSign,
268        grammar_state: GrammarState,
269        heuristic_match: &MatchResult,
270        transition: Option<&GrammarTransition>,
271    ) -> Option<Episode> {
272        let mut completed = None;
273
274        if let Some(trans) = transition {
275            // Close the previous episode if one was open
276            if self.episode_builder.is_open() {
277                completed = self.episode_builder.close(trans.timestamp_ns);
278            }
279
280            // Open a new episode for the new state
281            if grammar_state != GrammarState::Admissible || self.episode_builder.is_open() {
282                self.episode_builder.open(
283                    trans.timestamp_ns,
284                    grammar_state,
285                    heuristic_match.reason_code,
286                    self.source,
287                );
288            }
289        }
290
291        // Update the current episode with this observation
292        if self.episode_builder.is_open() {
293            self.episode_builder
294                .update(sign.residual, sign.drift, sign.slew);
295        }
296
297        completed
298    }
299
300    /// Current grammar state.
301    pub fn grammar_state(&self) -> GrammarState {
302        self.grammar.state()
303    }
304
305    /// Total observations processed.
306    pub fn observation_count(&self) -> u64 {
307        self.observation_count
308    }
309
310    /// Reference to the audit trace.
311    pub fn audit_trace(&self) -> &AuditTrace {
312        &self.audit
313    }
314
315    /// Current open episode, if any.
316    pub fn current_episode(&self) -> Option<&Episode> {
317        self.episode_builder.current()
318    }
319
320    /// Reset the observer state. Used on system restart or phase transition.
321    pub fn reset(&mut self) {
322        self.estimator.reset();
323        self.grammar.reset();
324        self.audit.reset();
325        self.episode_builder = EpisodeBuilder::new();
326        self.observation_count = 0;
327    }
328
329    /// Source channel this observer tracks.
330    pub fn source(&self) -> ResidualSource {
331        self.source
332    }
333}
334
335/// Multi-channel observer that tracks multiple residual sources simultaneously.
336///
337/// Provides a unified interface for observing all telemetry channels of a
338/// distributed system and correlating their structural interpretations.
339pub struct MultiChannelObserver {
340    observers: [Option<DsfbObserver>; 16],
341    active_count: usize,
342}
343
344impl MultiChannelObserver {
345    /// Create a new multi-channel observer with no channels configured.
346    pub fn new() -> Self {
347        Self {
348            observers: Default::default(),
349            active_count: 0,
350        }
351    }
352
353    /// Add an observer for a new source channel. Returns the channel index.
354    ///
355    /// # Panics
356    ///
357    /// Panics if 16 channels are already configured.
358    pub fn add_channel(&mut self, source: ResidualSource, config: &ObserverConfig) -> usize {
359        assert!(self.active_count < 16, "Maximum 16 channels supported");
360        let idx = self.active_count;
361        self.observers[idx] = Some(DsfbObserver::new(source, config));
362        self.active_count += 1;
363        idx
364    }
365
366    /// Observe a sample on a specific channel.
367    pub fn observe(
368        &mut self,
369        channel: usize,
370        sample: &ResidualSample,
371    ) -> Option<ObservationResult> {
372        self.observers
373            .get_mut(channel)
374            .and_then(|opt| opt.as_mut())
375            .map(|obs| obs.observe(sample))
376    }
377
378    /// Get the grammar state of a specific channel.
379    pub fn channel_state(&self, channel: usize) -> Option<GrammarState> {
380        self.observers
381            .get(channel)
382            .and_then(|opt| opt.as_ref())
383            .map(|obs| obs.grammar_state())
384    }
385
386    /// Number of active channels.
387    pub fn active_channels(&self) -> usize {
388        self.active_count
389    }
390
391    /// Check if ANY channel is in Boundary or Violation state.
392    pub fn any_anomalous(&self) -> bool {
393        self.observers
394            .iter()
395            .filter_map(|opt| opt.as_ref())
396            .any(|obs| obs.grammar_state() != GrammarState::Admissible)
397    }
398
399    /// Collect the worst grammar state across all channels.
400    pub fn worst_state(&self) -> GrammarState {
401        self.observers
402            .iter()
403            .filter_map(|opt| opt.as_ref())
404            .map(|obs| obs.grammar_state())
405            .max_by_key(|s| s.severity())
406            .unwrap_or(GrammarState::Admissible)
407    }
408}
409
410impl Default for MultiChannelObserver {
411    fn default() -> Self {
412        Self::new()
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419
420    fn sample(value: f64, baseline: f64, ts: u64) -> ResidualSample {
421        ResidualSample {
422            value,
423            baseline,
424            timestamp_ns: ts,
425            source: ResidualSource::Latency,
426        }
427    }
428
429    #[test]
430    fn test_observer_starts_admissible() {
431        let config = ObserverConfig::default();
432        let obs = DsfbObserver::new(ResidualSource::Latency, &config);
433        assert_eq!(obs.grammar_state(), GrammarState::Admissible);
434        assert_eq!(obs.observation_count(), 0);
435    }
436
437    #[test]
438    fn test_stable_system_stays_admissible() {
439        let config = ObserverConfig {
440            persistence_window: 10,
441            hysteresis_count: 3,
442            default_envelope: AdmissibilityEnvelope::symmetric(
443                10.0,
444                1.0,
445                0.5,
446                crate::regime::WorkloadPhase::SteadyState,
447            ),
448            ..ObserverConfig::fast_response()
449        };
450        let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);
451
452        for i in 0..50u64 {
453            let s = sample(100.0, 100.0, i * 1_000_000_000);
454            let result = obs.observe(&s);
455            assert_eq!(result.grammar_state, GrammarState::Admissible);
456        }
457    }
458
459    #[test]
460    fn test_sustained_drift_triggers_boundary() {
461        let config = ObserverConfig {
462            persistence_window: 10,
463            hysteresis_count: 3,
464            default_envelope: AdmissibilityEnvelope::symmetric(
465                5.0,
466                0.5,
467                0.3,
468                crate::regime::WorkloadPhase::SteadyState,
469            ),
470            ..ObserverConfig::fast_response()
471        };
472        let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);
473
474        let mut found_transition = false;
475        // Feed a linearly increasing residual that will eventually
476        // breach the envelope boundary and trigger grammar transition
477        for i in 0..100u64 {
478            // value grows from 100 to 150 over 100 samples
479            let value = 100.0 + 0.5 * i as f64;
480            let s = sample(value, 100.0, i * 1_000_000_000);
481            let result = obs.observe(&s);
482            if result.grammar_state != GrammarState::Admissible {
483                found_transition = true;
484                break;
485            }
486        }
487        assert!(
488            found_transition,
489            "Expected grammar transition from sustained drift"
490        );
491    }
492
493    #[test]
494    fn test_audit_trace_records_observations() {
495        let config = ObserverConfig::default();
496        let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);
497
498        for i in 0..10u64 {
499            let s = sample(100.0, 100.0, i * 1_000_000_000);
500            obs.observe(&s);
501        }
502
503        assert_eq!(obs.audit_trace().total_count(), 10);
504    }
505
506    #[test]
507    fn test_multi_channel_worst_state() {
508        let config = ObserverConfig {
509            persistence_window: 5,
510            hysteresis_count: 2,
511            default_envelope: AdmissibilityEnvelope::symmetric(
512                2.0,
513                0.3,
514                0.2,
515                crate::regime::WorkloadPhase::SteadyState,
516            ),
517            ..ObserverConfig::fast_response()
518        };
519        let mut multi = MultiChannelObserver::new();
520        let ch0 = multi.add_channel(ResidualSource::Latency, &config);
521        let ch1 = multi.add_channel(ResidualSource::HeartbeatRtt, &config);
522
523        // Feed stable data to both channels
524        for i in 0..20u64 {
525            let s0 = ResidualSample {
526                value: 50.0,
527                baseline: 50.0,
528                timestamp_ns: i * 1_000_000_000,
529                source: ResidualSource::Latency,
530            };
531            multi.observe(ch0, &s0);
532
533            let s1 = ResidualSample {
534                value: 10.0,
535                baseline: 10.0,
536                timestamp_ns: i * 1_000_000_000,
537                source: ResidualSource::HeartbeatRtt,
538            };
539            multi.observe(ch1, &s1);
540        }
541
542        assert!(!multi.any_anomalous());
543        assert_eq!(multi.worst_state(), GrammarState::Admissible);
544    }
545
546    #[test]
547    fn test_nonintrusive_contract() {
548        // This test verifies the non-interference contract:
549        // The observer only accepts immutable references.
550        let config = ObserverConfig::default();
551        let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);
552
553        let original_value = 100.0f64;
554        let s = ResidualSample {
555            value: original_value,
556            baseline: 95.0,
557            timestamp_ns: 0,
558            source: ResidualSource::Latency,
559        };
560
561        // The observer takes &ResidualSample — immutable reference.
562        // After observation, the original sample is unchanged.
563        let _result = obs.observe(&s);
564
565        // Original sample is still accessible and unchanged
566        assert_eq!(s.value, original_value);
567        assert_eq!(s.baseline, 95.0);
568    }
569
570    #[test]
571    fn test_observe_adapted_uses_adapter_output() {
572        struct QueueDepthAdapter;
573
574        impl TelemetryAdapter<u64> for QueueDepthAdapter {
575            fn adapt(&self, input: &u64) -> ResidualSample {
576                ResidualSample {
577                    value: *input as f64,
578                    baseline: 8.0,
579                    timestamp_ns: 1_000,
580                    source: ResidualSource::QueueDepth,
581                }
582            }
583        }
584
585        let mut observer =
586            DsfbObserver::new(ResidualSource::QueueDepth, &ObserverConfig::fast_response());
587        let result = observer.observe_adapted(&QueueDepthAdapter, &11);
588        assert_eq!(result.sign.source, ResidualSource::QueueDepth);
589        assert_eq!(result.sign.timestamp_ns, 1_000);
590        assert_eq!(result.sign.residual, 3.0);
591    }
592}