1use crate::adapter::TelemetryAdapter;
16use crate::audit::{AuditEvent, AuditTrace};
17use crate::envelope::{AdmissibilityEnvelope, EnvelopePosition};
18use crate::episode::{Episode, EpisodeBuilder};
19use crate::grammar::{GrammarMachine, GrammarState, GrammarTransition};
20use crate::heuristics::{
21 AppliedStaticPrior, HeuristicId, HeuristicsBank, MatchResult, StaticPriorSet,
22};
23use crate::residual::{ResidualEstimator, ResidualSample, ResidualSign, ResidualSource};
24use crate::ReasonCode;
25
26#[derive(Debug, Clone)]
28pub struct ObserverConfig {
29 pub persistence_window: usize,
32 pub hysteresis_count: u32,
35 pub default_envelope: AdmissibilityEnvelope,
38 pub static_priors: StaticPriorSet,
40}
41
42impl Default for ObserverConfig {
43 fn default() -> Self {
44 Self::balanced()
45 }
46}
47
48impl ObserverConfig {
49 pub fn balanced() -> Self {
51 Self {
52 persistence_window: 40,
53 hysteresis_count: 5,
54 default_envelope: AdmissibilityEnvelope::symmetric(
55 10.0,
56 1.0,
57 0.5,
58 crate::regime::WorkloadPhase::SteadyState,
59 ),
60 static_priors: StaticPriorSet::default(),
61 }
62 }
63
64 pub fn fast_response() -> Self {
66 Self {
67 persistence_window: 20,
68 hysteresis_count: 3,
69 default_envelope: AdmissibilityEnvelope::symmetric(
70 5.0,
71 0.5,
72 0.25,
73 crate::regime::WorkloadPhase::SteadyState,
74 ),
75 static_priors: StaticPriorSet::default(),
76 }
77 }
78
79 pub fn low_noise() -> Self {
81 Self {
82 persistence_window: 60,
83 hysteresis_count: 6,
84 default_envelope: AdmissibilityEnvelope::symmetric(
85 12.0,
86 1.2,
87 0.6,
88 crate::regime::WorkloadPhase::SteadyState,
89 ),
90 static_priors: StaticPriorSet::default(),
91 }
92 }
93
94 pub fn with_static_priors(mut self, static_priors: StaticPriorSet) -> Self {
96 self.static_priors = static_priors;
97 self
98 }
99}
100
101#[derive(Debug, Clone)]
103pub struct ObservationResult {
104 pub sign: ResidualSign,
106 pub grammar_state: GrammarState,
108 pub envelope_position: EnvelopePosition,
110 pub heuristic_match: MatchResult,
112 pub reason_evidence: ReasonEvidence,
114 pub transition: Option<GrammarTransition>,
116 pub completed_episode: Option<Episode>,
118}
119
120#[derive(Debug, Clone, Copy)]
122pub struct ReasonEvidence {
123 pub reason_code: ReasonCode,
125 pub matched_heuristic: Option<HeuristicId>,
127 pub confidence: f64,
129 pub description: &'static str,
131 pub provenance: &'static str,
133 pub applied_prior: Option<AppliedStaticPrior>,
135}
136
137pub struct DsfbObserver {
143 estimator: ResidualEstimator,
144 grammar: GrammarMachine,
145 heuristics: HeuristicsBank,
146 episode_builder: EpisodeBuilder,
147 audit: AuditTrace,
148 envelope: AdmissibilityEnvelope,
149 source: ResidualSource,
150 static_priors: StaticPriorSet,
151 observation_count: u64,
152}
153
154impl DsfbObserver {
155 pub fn new(source: ResidualSource, config: &ObserverConfig) -> Self {
157 Self {
158 estimator: ResidualEstimator::new(source, config.persistence_window),
159 grammar: GrammarMachine::new(config.hysteresis_count),
160 heuristics: HeuristicsBank::default_bank(),
161 episode_builder: EpisodeBuilder::new(),
162 audit: AuditTrace::new(),
163 envelope: config.default_envelope,
164 source,
165 static_priors: config.static_priors,
166 observation_count: 0,
167 }
168 }
169
170 pub fn with_heuristics(
172 source: ResidualSource,
173 config: &ObserverConfig,
174 heuristics: HeuristicsBank,
175 ) -> Self {
176 let mut obs = Self::new(source, config);
177 obs.heuristics = heuristics;
178 obs
179 }
180
181 pub fn set_envelope(&mut self, envelope: AdmissibilityEnvelope) {
183 self.envelope = envelope;
184 }
185
186 pub fn set_static_priors(&mut self, static_priors: StaticPriorSet) {
188 self.static_priors = static_priors;
189 }
190
191 pub fn observe_adapted<T, A>(&mut self, adapter: &A, input: &T) -> ObservationResult
193 where
194 A: TelemetryAdapter<T>,
195 {
196 let sample = adapter.adapt(input);
197 self.observe(&sample)
198 }
199
200 pub fn observe(&mut self, sample: &ResidualSample) -> ObservationResult {
210 self.observation_count += 1;
211
212 let sign = self.estimator.observe(sample);
214
215 let envelope_position = self.envelope.classify(&sign);
217
218 let (grammar_state, transition) = self.grammar.step(envelope_position, sample.timestamp_ns);
220
221 let heuristic_match =
223 self.heuristics
224 .match_sign_with_priors(&sign, grammar_state, &self.static_priors);
225 let reason_evidence = ReasonEvidence {
226 reason_code: heuristic_match.reason_code,
227 matched_heuristic: heuristic_match.matched_heuristic,
228 confidence: heuristic_match.confidence,
229 description: heuristic_match.description,
230 provenance: heuristic_match.provenance,
231 applied_prior: heuristic_match.applied_prior,
232 };
233
234 let completed_episode =
236 self.manage_episodes(&sign, grammar_state, &heuristic_match, transition.as_ref());
237
238 self.audit.record(AuditEvent {
240 timestamp_ns: sample.timestamp_ns,
241 residual: sign.residual,
242 drift: sign.drift,
243 slew: sign.slew,
244 envelope_position: match envelope_position {
245 EnvelopePosition::Interior => 0,
246 EnvelopePosition::BoundaryZone => 1,
247 EnvelopePosition::Exterior => 2,
248 },
249 grammar_state: grammar_state.severity(),
250 transition_occurred: transition.is_some(),
251 });
252
253 ObservationResult {
254 sign,
255 grammar_state,
256 envelope_position,
257 heuristic_match,
258 reason_evidence,
259 transition,
260 completed_episode,
261 }
262 }
263
264 fn manage_episodes(
266 &mut self,
267 sign: &ResidualSign,
268 grammar_state: GrammarState,
269 heuristic_match: &MatchResult,
270 transition: Option<&GrammarTransition>,
271 ) -> Option<Episode> {
272 let mut completed = None;
273
274 if let Some(trans) = transition {
275 if self.episode_builder.is_open() {
277 completed = self.episode_builder.close(trans.timestamp_ns);
278 }
279
280 if grammar_state != GrammarState::Admissible || self.episode_builder.is_open() {
282 self.episode_builder.open(
283 trans.timestamp_ns,
284 grammar_state,
285 heuristic_match.reason_code,
286 self.source,
287 );
288 }
289 }
290
291 if self.episode_builder.is_open() {
293 self.episode_builder
294 .update(sign.residual, sign.drift, sign.slew);
295 }
296
297 completed
298 }
299
300 pub fn grammar_state(&self) -> GrammarState {
302 self.grammar.state()
303 }
304
305 pub fn observation_count(&self) -> u64 {
307 self.observation_count
308 }
309
310 pub fn audit_trace(&self) -> &AuditTrace {
312 &self.audit
313 }
314
315 pub fn current_episode(&self) -> Option<&Episode> {
317 self.episode_builder.current()
318 }
319
320 pub fn reset(&mut self) {
322 self.estimator.reset();
323 self.grammar.reset();
324 self.audit.reset();
325 self.episode_builder = EpisodeBuilder::new();
326 self.observation_count = 0;
327 }
328
329 pub fn source(&self) -> ResidualSource {
331 self.source
332 }
333}
334
335pub struct MultiChannelObserver {
340 observers: [Option<DsfbObserver>; 16],
341 active_count: usize,
342}
343
344impl MultiChannelObserver {
345 pub fn new() -> Self {
347 Self {
348 observers: Default::default(),
349 active_count: 0,
350 }
351 }
352
353 pub fn add_channel(&mut self, source: ResidualSource, config: &ObserverConfig) -> usize {
359 assert!(self.active_count < 16, "Maximum 16 channels supported");
360 let idx = self.active_count;
361 self.observers[idx] = Some(DsfbObserver::new(source, config));
362 self.active_count += 1;
363 idx
364 }
365
366 pub fn observe(
368 &mut self,
369 channel: usize,
370 sample: &ResidualSample,
371 ) -> Option<ObservationResult> {
372 self.observers
373 .get_mut(channel)
374 .and_then(|opt| opt.as_mut())
375 .map(|obs| obs.observe(sample))
376 }
377
378 pub fn channel_state(&self, channel: usize) -> Option<GrammarState> {
380 self.observers
381 .get(channel)
382 .and_then(|opt| opt.as_ref())
383 .map(|obs| obs.grammar_state())
384 }
385
386 pub fn active_channels(&self) -> usize {
388 self.active_count
389 }
390
391 pub fn any_anomalous(&self) -> bool {
393 self.observers
394 .iter()
395 .filter_map(|opt| opt.as_ref())
396 .any(|obs| obs.grammar_state() != GrammarState::Admissible)
397 }
398
399 pub fn worst_state(&self) -> GrammarState {
401 self.observers
402 .iter()
403 .filter_map(|opt| opt.as_ref())
404 .map(|obs| obs.grammar_state())
405 .max_by_key(|s| s.severity())
406 .unwrap_or(GrammarState::Admissible)
407 }
408}
409
410impl Default for MultiChannelObserver {
411 fn default() -> Self {
412 Self::new()
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;

    /// Spacing between consecutive test samples, in nanoseconds.
    const STEP_NS: u64 = 1_000_000_000;

    /// Builds a latency sample from a reading, its baseline and a timestamp.
    fn sample(value: f64, baseline: f64, ts: u64) -> ResidualSample {
        ResidualSample {
            value,
            baseline,
            timestamp_ns: ts,
            source: ResidualSource::Latency,
        }
    }

    // A fresh observer has seen nothing and sits in the Admissible state.
    #[test]
    fn test_observer_starts_admissible() {
        let obs = DsfbObserver::new(ResidualSource::Latency, &ObserverConfig::default());
        assert_eq!(obs.grammar_state(), GrammarState::Admissible);
        assert_eq!(obs.observation_count(), 0);
    }

    // Readings equal to the baseline must never leave the Admissible state.
    #[test]
    fn test_stable_system_stays_admissible() {
        let mut config = ObserverConfig::fast_response();
        config.persistence_window = 10;
        config.hysteresis_count = 3;
        config.default_envelope = AdmissibilityEnvelope::symmetric(
            10.0,
            1.0,
            0.5,
            crate::regime::WorkloadPhase::SteadyState,
        );
        let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);

        for ts in (0..50u64).map(|i| i * STEP_NS) {
            let result = obs.observe(&sample(100.0, 100.0, ts));
            assert_eq!(result.grammar_state, GrammarState::Admissible);
        }
    }

    // A reading that climbs 0.5 per step must eventually leave Admissible.
    #[test]
    fn test_sustained_drift_triggers_boundary() {
        let mut config = ObserverConfig::fast_response();
        config.persistence_window = 10;
        config.hysteresis_count = 3;
        config.default_envelope = AdmissibilityEnvelope::symmetric(
            5.0,
            0.5,
            0.3,
            crate::regime::WorkloadPhase::SteadyState,
        );
        let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);

        // `any` stops at the first non-admissible observation.
        let found_transition = (0..100u64).any(|i| {
            let value = 100.0 + 0.5 * i as f64;
            let result = obs.observe(&sample(value, 100.0, i * STEP_NS));
            result.grammar_state != GrammarState::Admissible
        });
        assert!(
            found_transition,
            "Expected grammar transition from sustained drift"
        );
    }

    // Every observation must leave exactly one entry in the audit trace.
    #[test]
    fn test_audit_trace_records_observations() {
        let mut obs = DsfbObserver::new(ResidualSource::Latency, &ObserverConfig::default());

        (0..10u64).for_each(|i| {
            obs.observe(&sample(100.0, 100.0, i * STEP_NS));
        });

        assert_eq!(obs.audit_trace().total_count(), 10);
    }

    // Two channels fed baseline-equal readings: nothing anomalous, and the
    // worst state stays Admissible.
    #[test]
    fn test_multi_channel_worst_state() {
        let mut config = ObserverConfig::fast_response();
        config.persistence_window = 5;
        config.hysteresis_count = 2;
        config.default_envelope = AdmissibilityEnvelope::symmetric(
            2.0,
            0.3,
            0.2,
            crate::regime::WorkloadPhase::SteadyState,
        );
        let mut multi = MultiChannelObserver::new();
        let ch0 = multi.add_channel(ResidualSource::Latency, &config);
        let ch1 = multi.add_channel(ResidualSource::HeartbeatRtt, &config);

        for i in 0..20u64 {
            let timestamp_ns = i * STEP_NS;

            let latency = ResidualSample {
                value: 50.0,
                baseline: 50.0,
                timestamp_ns,
                source: ResidualSource::Latency,
            };
            multi.observe(ch0, &latency);

            let heartbeat = ResidualSample {
                value: 10.0,
                baseline: 10.0,
                timestamp_ns,
                source: ResidualSource::HeartbeatRtt,
            };
            multi.observe(ch1, &heartbeat);
        }

        assert!(!multi.any_anomalous());
        assert_eq!(multi.worst_state(), GrammarState::Admissible);
    }

    // Observing a sample must leave the sample itself unchanged.
    #[test]
    fn test_nonintrusive_contract() {
        let config = ObserverConfig::default();
        let mut obs = DsfbObserver::new(ResidualSource::Latency, &config);

        let original_value = 100.0f64;
        let s = ResidualSample {
            value: original_value,
            baseline: 95.0,
            timestamp_ns: 0,
            source: ResidualSource::Latency,
        };

        let _result = obs.observe(&s);

        assert_eq!(s.value, original_value);
        assert_eq!(s.baseline, 95.0);
    }

    // `observe_adapted` must observe exactly what the adapter produced.
    #[test]
    fn test_observe_adapted_uses_adapter_output() {
        struct QueueDepthAdapter;

        impl TelemetryAdapter<u64> for QueueDepthAdapter {
            fn adapt(&self, input: &u64) -> ResidualSample {
                ResidualSample {
                    value: *input as f64,
                    baseline: 8.0,
                    timestamp_ns: 1_000,
                    source: ResidualSource::QueueDepth,
                }
            }
        }

        let config = ObserverConfig::fast_response();
        let mut observer = DsfbObserver::new(ResidualSource::QueueDepth, &config);
        let result = observer.observe_adapted(&QueueDepthAdapter, &11);
        assert_eq!(result.sign.source, ResidualSource::QueueDepth);
        assert_eq!(result.sign.timestamp_ns, 1_000);
        assert_eq!(result.sign.residual, 3.0);
    }
}