use crate::adapter::TelemetryAdapter;
use crate::audit::{AuditEvent, AuditTrace};
use crate::envelope::{AdmissibilityEnvelope, EnvelopePosition};
use crate::episode::{Episode, EpisodeBuilder};
use crate::grammar::{GrammarMachine, GrammarState, GrammarTransition};
use crate::heuristics::{
AppliedStaticPrior, HeuristicId, HeuristicsBank, MatchResult, StaticPriorSet,
};
use crate::residual::{ResidualEstimator, ResidualSample, ResidualSign, ResidualSource};
use crate::ReasonCode;
#[derive(Debug, Clone)]
pub struct ObserverConfig {
pub persistence_window: usize,
pub hysteresis_count: u32,
pub default_envelope: AdmissibilityEnvelope,
pub static_priors: StaticPriorSet,
}
impl Default for ObserverConfig {
fn default() -> Self {
Self::balanced()
}
}
impl ObserverConfig {
pub fn balanced() -> Self {
Self {
persistence_window: 40,
hysteresis_count: 5,
default_envelope: AdmissibilityEnvelope::symmetric(
10.0,
1.0,
0.5,
crate::regime::WorkloadPhase::SteadyState,
),
static_priors: StaticPriorSet::default(),
}
}
pub fn fast_response() -> Self {
Self {
persistence_window: 20,
hysteresis_count: 3,
default_envelope: AdmissibilityEnvelope::symmetric(
5.0,
0.5,
0.25,
crate::regime::WorkloadPhase::SteadyState,
),
static_priors: StaticPriorSet::default(),
}
}
pub fn low_noise() -> Self {
Self {
persistence_window: 60,
hysteresis_count: 6,
default_envelope: AdmissibilityEnvelope::symmetric(
12.0,
1.2,
0.6,
crate::regime::WorkloadPhase::SteadyState,
),
static_priors: StaticPriorSet::default(),
}
}
pub fn with_static_priors(mut self, static_priors: StaticPriorSet) -> Self {
self.static_priors = static_priors;
self
}
}
#[derive(Debug, Clone)]
pub struct ObservationResult {
pub sign: ResidualSign,
pub grammar_state: GrammarState,
pub envelope_position: EnvelopePosition,
pub heuristic_match: MatchResult,
pub reason_evidence: ReasonEvidence,
pub transition: Option<GrammarTransition>,
pub completed_episode: Option<Episode>,
}
#[derive(Debug, Clone, Copy)]
pub struct ReasonEvidence {
pub reason_code: ReasonCode,
pub matched_heuristic: Option<HeuristicId>,
pub confidence: f64,
pub description: &'static str,
pub provenance: &'static str,
pub applied_prior: Option<AppliedStaticPrior>,
}
pub struct DsfbObserver {
estimator: ResidualEstimator,
grammar: GrammarMachine,
heuristics: HeuristicsBank,
episode_builder: EpisodeBuilder,
audit: AuditTrace,
envelope: AdmissibilityEnvelope,
source: ResidualSource,
static_priors: StaticPriorSet,
observation_count: u64,
}
impl DsfbObserver {
pub fn new(source: ResidualSource, config: &ObserverConfig) -> Self {
Self {
estimator: ResidualEstimator::new(source, config.persistence_window),
grammar: GrammarMachine::new(config.hysteresis_count),
heuristics: HeuristicsBank::default_bank(),
episode_builder: EpisodeBuilder::new(),
audit: AuditTrace::new(),
envelope: config.default_envelope,
source,
static_priors: config.static_priors,
observation_count: 0,
}
}
pub fn with_heuristics(
source: ResidualSource,
config: &ObserverConfig,
heuristics: HeuristicsBank,
) -> Self {
let mut obs = Self::new(source, config);
obs.heuristics = heuristics;
obs
}
pub fn set_envelope(&mut self, envelope: AdmissibilityEnvelope) {
self.envelope = envelope;
}
pub fn set_static_priors(&mut self, static_priors: StaticPriorSet) {
self.static_priors = static_priors;
}
pub fn observe_adapted<T, A>(&mut self, adapter: &A, input: &T) -> ObservationResult
where
A: TelemetryAdapter<T>,
{
let sample = adapter.adapt(input);
self.observe(&sample)
}
pub fn observe(&mut self, sample: &ResidualSample) -> ObservationResult {
self.observation_count += 1;
let sign = self.estimator.observe(sample);
let envelope_position = self.envelope.classify(&sign);
let (grammar_state, transition) = self.grammar.step(envelope_position, sample.timestamp_ns);
let heuristic_match =
self.heuristics
.match_sign_with_priors(&sign, grammar_state, &self.static_priors);
let reason_evidence = ReasonEvidence {
reason_code: heuristic_match.reason_code,
matched_heuristic: heuristic_match.matched_heuristic,
confidence: heuristic_match.confidence,
description: heuristic_match.description,
provenance: heuristic_match.provenance,
applied_prior: heuristic_match.applied_prior,
};
let completed_episode =
self.manage_episodes(&sign, grammar_state, &heuristic_match, transition.as_ref());
self.audit.record(AuditEvent {
timestamp_ns: sample.timestamp_ns,
residual: sign.residual,
drift: sign.drift,
slew: sign.slew,
envelope_position: match envelope_position {
EnvelopePosition::Interior => 0,
EnvelopePosition::BoundaryZone => 1,
EnvelopePosition::Exterior => 2,
},
grammar_state: grammar_state.severity(),
transition_occurred: transition.is_some(),
});
ObservationResult {
sign,
grammar_state,
envelope_position,
heuristic_match,
reason_evidence,
transition,
completed_episode,
}
}
fn manage_episodes(
&mut self,
sign: &ResidualSign,
grammar_state: GrammarState,
heuristic_match: &MatchResult,
transition: Option<&GrammarTransition>,
) -> Option<Episode> {
let mut completed = None;
if let Some(trans) = transition {
if self.episode_builder.is_open() {
completed = self.episode_builder.close(trans.timestamp_ns);
}
if grammar_state != GrammarState::Admissible || self.episode_builder.is_open() {
self.episode_builder.open(
trans.timestamp_ns,
grammar_state,
heuristic_match.reason_code,
self.source,
);
}
}
if self.episode_builder.is_open() {
self.episode_builder
.update(sign.residual, sign.drift, sign.slew);
}
completed
}
pub fn grammar_state(&self) -> GrammarState {
self.grammar.state()
}
pub fn observation_count(&self) -> u64 {
self.observation_count
}
pub fn audit_trace(&self) -> &AuditTrace {
&self.audit
}
pub fn current_episode(&self) -> Option<&Episode> {
self.episode_builder.current()
}
pub fn reset(&mut self) {
self.estimator.reset();
self.grammar.reset();
self.audit.reset();
self.episode_builder = EpisodeBuilder::new();
self.observation_count = 0;
}
pub fn source(&self) -> ResidualSource {
self.source
}
}
pub struct MultiChannelObserver {
observers: [Option<DsfbObserver>; 16],
active_count: usize,
}
impl MultiChannelObserver {
pub fn new() -> Self {
Self {
observers: Default::default(),
active_count: 0,
}
}
pub fn add_channel(&mut self, source: ResidualSource, config: &ObserverConfig) -> usize {
assert!(self.active_count < 16, "Maximum 16 channels supported");
let idx = self.active_count;
self.observers[idx] = Some(DsfbObserver::new(source, config));
self.active_count += 1;
idx
}
pub fn observe(
&mut self,
channel: usize,
sample: &ResidualSample,
) -> Option<ObservationResult> {
self.observers
.get_mut(channel)
.and_then(|opt| opt.as_mut())
.map(|obs| obs.observe(sample))
}
pub fn channel_state(&self, channel: usize) -> Option<GrammarState> {
self.observers
.get(channel)
.and_then(|opt| opt.as_ref())
.map(|obs| obs.grammar_state())
}
pub fn active_channels(&self) -> usize {
self.active_count
}
pub fn any_anomalous(&self) -> bool {
self.observers
.iter()
.filter_map(|opt| opt.as_ref())
.any(|obs| obs.grammar_state() != GrammarState::Admissible)
}
pub fn worst_state(&self) -> GrammarState {
self.observers
.iter()
.filter_map(|opt| opt.as_ref())
.map(|obs| obs.grammar_state())
.max_by_key(|s| s.severity())
.unwrap_or(GrammarState::Admissible)
}
}
impl Default for MultiChannelObserver {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn sample(value: f64, baseline: f64, ts: u64) -> ResidualSample {
ResidualSample {
value,
baseline,
timestamp_ns: ts,
source: ResidualSource::Latency,
}
}
#[test]
fn test_observer_starts_admissible() {
let config = ObserverConfig::default();
let obs = DsfbObserver::new(ResidualSource::Latency, &config);
assert_eq!(obs.grammar_state(), GrammarState::Admissible);
assert_eq!(obs.observation_count(), 0);
}
#[test]
fn test_stable_system_stays_admissible() {
let config = ObserverConfig {
persistence_window: 10,
hysteresis_count: 3,
default_envelope: AdmissibilityEnvelope::symmetric(
10.0,
1.0,
0.5,
crate::regime::WorkloadPhase::SteadyState,
),
..ObserverConfig::fast_response()
};
let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);
for i in 0..50u64 {
let s = sample(100.0, 100.0, i * 1_000_000_000);
let result = obs.observe(&s);
assert_eq!(result.grammar_state, GrammarState::Admissible);
}
}
#[test]
fn test_sustained_drift_triggers_boundary() {
let config = ObserverConfig {
persistence_window: 10,
hysteresis_count: 3,
default_envelope: AdmissibilityEnvelope::symmetric(
5.0,
0.5,
0.3,
crate::regime::WorkloadPhase::SteadyState,
),
..ObserverConfig::fast_response()
};
let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);
let mut found_transition = false;
for i in 0..100u64 {
let value = 100.0 + 0.5 * i as f64;
let s = sample(value, 100.0, i * 1_000_000_000);
let result = obs.observe(&s);
if result.grammar_state != GrammarState::Admissible {
found_transition = true;
break;
}
}
assert!(
found_transition,
"Expected grammar transition from sustained drift"
);
}
#[test]
fn test_audit_trace_records_observations() {
let config = ObserverConfig::default();
let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);
for i in 0..10u64 {
let s = sample(100.0, 100.0, i * 1_000_000_000);
obs.observe(&s);
}
assert_eq!(obs.audit_trace().total_count(), 10);
}
#[test]
fn test_multi_channel_worst_state() {
let config = ObserverConfig {
persistence_window: 5,
hysteresis_count: 2,
default_envelope: AdmissibilityEnvelope::symmetric(
2.0,
0.3,
0.2,
crate::regime::WorkloadPhase::SteadyState,
),
..ObserverConfig::fast_response()
};
let mut multi = MultiChannelObserver::new();
let ch0 = multi.add_channel(ResidualSource::Latency, &config);
let ch1 = multi.add_channel(ResidualSource::HeartbeatRtt, &config);
for i in 0..20u64 {
let s0 = ResidualSample {
value: 50.0,
baseline: 50.0,
timestamp_ns: i * 1_000_000_000,
source: ResidualSource::Latency,
};
multi.observe(ch0, &s0);
let s1 = ResidualSample {
value: 10.0,
baseline: 10.0,
timestamp_ns: i * 1_000_000_000,
source: ResidualSource::HeartbeatRtt,
};
multi.observe(ch1, &s1);
}
assert!(!multi.any_anomalous());
assert_eq!(multi.worst_state(), GrammarState::Admissible);
}
#[test]
fn test_nonintrusive_contract() {
let config = ObserverConfig::default();
let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);
let original_value = 100.0f64;
let s = ResidualSample {
value: original_value,
baseline: 95.0,
timestamp_ns: 0,
source: ResidualSource::Latency,
};
let _result = obs.observe(&s);
assert_eq!(s.value, original_value);
assert_eq!(s.baseline, 95.0);
}
#[test]
fn test_observe_adapted_uses_adapter_output() {
struct QueueDepthAdapter;
impl TelemetryAdapter<u64> for QueueDepthAdapter {
fn adapt(&self, input: &u64) -> ResidualSample {
ResidualSample {
value: *input as f64,
baseline: 8.0,
timestamp_ns: 1_000,
source: ResidualSource::QueueDepth,
}
}
}
let mut observer =
DsfbObserver::new(ResidualSource::QueueDepth, &ObserverConfig::fast_response());
let result = observer.observe_adapted(&QueueDepthAdapter, &11);
assert_eq!(result.sign.source, ResidualSource::QueueDepth);
assert_eq!(result.sign.timestamp_ns, 1_000);
assert_eq!(result.sign.residual, 3.0);
}
}