// oris_evokernel/confidence_daemon.rs

//! Background confidence control daemon — Issue #283 (Stream D).
//!
//! `ConfidenceDaemon` runs as a `tokio::spawn` background task that
//! periodically revalidates tracked assets and applies the evaluator's
//! demotion/quarantine decision whenever revalidation fails.  Assets whose
//! confidence is below the configured demotion threshold
//! (`MIN_REPLAY_CONFIDENCE` by default) are revalidated with one extra
//! failure counted against them.
//!
//! # Design
//!
//! ```text
//! ConfidenceDaemon::spawn()
//!     └─ tokio background task
//!           ↓ every poll_interval
//!       for each TrackedAsset:
//!           evaluate_confidence_revalidation()
//!             → Failed? → evaluate_asset_demotion()
//!                           → new_state = Quarantined|Demoted
//!           update shared state
//! ```
//!
//! Quarantined assets have `replay_eligible = false` on their `TrackedAsset`
//! entry; callers must check that flag before selecting an asset for replay.
22
23use std::sync::{Arc, Mutex};
24use std::time::Duration;
25
26use oris_agent_contract::{
27    ConfidenceDemotionReasonCode, ConfidenceRevalidationResult, ConfidenceState, DemotionDecision,
28    ReplayEligibility, RevalidationOutcome,
29};
30use oris_evolution::MIN_REPLAY_CONFIDENCE;
31use serde::{Deserialize, Serialize};
32use tokio::task::JoinHandle;
33use tokio::time;
34
35// ── ConfidenceDaemonConfig ─────────────────────────────────────────────────
36
37/// Configuration for `ConfidenceDaemon`.
38#[derive(Clone, Debug, Serialize, Deserialize)]
39pub struct ConfidenceDaemonConfig {
40    /// How often the daemon wakes up and revalidates all tracked assets.
41    pub poll_interval: Duration,
42    /// Confidence score below which an asset is eligible for automatic demotion.
43    /// Must be `<= MIN_REPLAY_CONFIDENCE`.
44    pub demotion_confidence_threshold: f32,
45}
46
47impl Default for ConfidenceDaemonConfig {
48    fn default() -> Self {
49        Self {
50            poll_interval: Duration::from_secs(60),
51            demotion_confidence_threshold: MIN_REPLAY_CONFIDENCE,
52        }
53    }
54}
55
56// ── TrackedAsset ───────────────────────────────────────────────────────────
57
58/// A single asset tracked by the confidence daemon.
59#[derive(Clone, Debug, Serialize, Deserialize)]
60pub struct TrackedAsset {
61    /// Unique asset identifier (gene id or capsule id).
62    pub asset_id: String,
63    /// Current confidence lifecycle state.
64    pub state: ConfidenceState,
65    /// Number of consecutive failures recorded against this asset.
66    pub failure_count: u32,
67    /// Current decayed confidence score in `[0.0, 1.0]`.
68    pub decayed_confidence: f32,
69    /// Whether this asset is eligible for replay.  `false` for quarantined
70    /// assets — callers must not select quarantined assets for replay.
71    pub replay_eligible: bool,
72}
73
74impl TrackedAsset {
75    /// Create a new healthy asset entry.
76    pub fn new(asset_id: impl Into<String>, decayed_confidence: f32) -> Self {
77        let replay_eligible = decayed_confidence >= MIN_REPLAY_CONFIDENCE;
78        Self {
79            asset_id: asset_id.into(),
80            state: ConfidenceState::Active,
81            failure_count: 0,
82            decayed_confidence,
83            replay_eligible,
84        }
85    }
86}
87
88// ── ConfidenceEvaluator ────────────────────────────────────────────────────
89
90/// Trait that provides the two EvoKernel evaluation methods needed by the
91/// daemon.  Implemented by `EvoKernel` and by test doubles.
92pub trait ConfidenceEvaluator: Send + Sync {
93    /// Determine whether an asset passes or fails revalidation.
94    fn evaluate_confidence_revalidation(
95        &self,
96        asset_id: &str,
97        current_state: ConfidenceState,
98        failure_count: u32,
99    ) -> ConfidenceRevalidationResult;
100
101    /// Determine the new state (Demoted / Quarantined) after a failed
102    /// revalidation.
103    fn evaluate_asset_demotion(
104        &self,
105        asset_id: &str,
106        prior_state: ConfidenceState,
107        failure_count: u32,
108        reason_code: ConfidenceDemotionReasonCode,
109    ) -> DemotionDecision;
110}
111
112// ── ConfidenceDaemon ───────────────────────────────────────────────────────
113
114/// Background daemon that periodically revalidates tracked assets and
115/// quarantines those whose confidence falls below `MIN_REPLAY_CONFIDENCE`.
116pub struct ConfidenceDaemon {
117    assets: Arc<Mutex<Vec<TrackedAsset>>>,
118    evaluator: Arc<dyn ConfidenceEvaluator>,
119    config: ConfidenceDaemonConfig,
120}
121
122impl ConfidenceDaemon {
123    /// Create a new daemon with the given evaluator and configuration.
124    pub fn new(evaluator: Arc<dyn ConfidenceEvaluator>, config: ConfidenceDaemonConfig) -> Self {
125        Self {
126            assets: Arc::new(Mutex::new(Vec::new())),
127            evaluator,
128            config,
129        }
130    }
131
132    /// Convenience constructor with default configuration.
133    pub fn with_defaults(evaluator: Arc<dyn ConfidenceEvaluator>) -> Self {
134        Self::new(evaluator, ConfidenceDaemonConfig::default())
135    }
136
137    /// Register an asset for confidence tracking.
138    ///
139    /// If the asset_id is already registered, its entry is updated.
140    pub fn track(&self, asset: TrackedAsset) {
141        let mut guard = self.assets.lock().unwrap_or_else(|p| p.into_inner());
142        if let Some(existing) = guard.iter_mut().find(|a| a.asset_id == asset.asset_id) {
143            *existing = asset;
144        } else {
145            guard.push(asset);
146        }
147    }
148
149    /// Return a snapshot of all currently tracked assets.
150    pub fn snapshot(&self) -> Vec<TrackedAsset> {
151        self.assets
152            .lock()
153            .unwrap_or_else(|p| p.into_inner())
154            .clone()
155    }
156
157    /// Run one revalidation cycle synchronously.
158    ///
159    /// For each tracked asset:
160    /// 1. Call `evaluate_confidence_revalidation()`.
161    /// 2. If failed, call `evaluate_asset_demotion()`.
162    /// 3. Transition to `Quarantined` (or `Demoted`) and set
163    ///    `replay_eligible = false` for quarantined assets.
164    ///
165    /// Assets below `demotion_confidence_threshold` are treated as having an
166    /// implicit failure to trigger revalidation.
167    pub fn run_cycle(&self) {
168        let mut guard = self.assets.lock().unwrap_or_else(|p| p.into_inner());
169        let evaluator = Arc::clone(&self.evaluator);
170        let threshold = self.config.demotion_confidence_threshold;
171
172        for asset in guard.iter_mut() {
173            // Skip already-quarantined assets — they are excluded from replay
174            // and don't need further demotion cycles.
175            if asset.state == ConfidenceState::Quarantined {
176                asset.replay_eligible = false;
177                continue;
178            }
179
180            // Determine if below confidence threshold — treat as a failure.
181            let effective_failure_count = if asset.decayed_confidence < threshold {
182                asset.failure_count.saturating_add(1)
183            } else {
184                asset.failure_count
185            };
186
187            let revalidation = evaluator.evaluate_confidence_revalidation(
188                &asset.asset_id,
189                asset.state,
190                effective_failure_count,
191            );
192
193            let revalidation_failed = matches!(
194                revalidation.revalidation_result,
195                RevalidationOutcome::Failed | RevalidationOutcome::ErrorFailClosed
196            ) || revalidation.fail_closed;
197
198            if revalidation_failed {
199                // Escalate failure count and call demotion.
200                asset.failure_count = effective_failure_count;
201                let demotion = evaluator.evaluate_asset_demotion(
202                    &asset.asset_id,
203                    asset.state,
204                    asset.failure_count,
205                    ConfidenceDemotionReasonCode::ConfidenceDecayThreshold,
206                );
207                // Apply the new state.
208                asset.state = demotion.new_state;
209                asset.replay_eligible = demotion.replay_eligibility == ReplayEligibility::Eligible;
210            } else {
211                // Passed — restore eligibility if previously suspended.
212                if asset.decayed_confidence >= threshold {
213                    asset.replay_eligible = true;
214                }
215            }
216        }
217    }
218
219    /// Spawn the daemon as a background task.
220    ///
221    /// The returned `JoinHandle` runs indefinitely until the process
222    /// terminates or the handle is aborted.  Callers may call
223    /// `handle.abort()` to stop the daemon cleanly.
224    pub fn spawn(self) -> JoinHandle<()> {
225        let poll_interval = self.config.poll_interval;
226        let daemon = Arc::new(self);
227        tokio::spawn(async move {
228            let mut interval = time::interval(poll_interval);
229            loop {
230                interval.tick().await;
231                daemon.run_cycle();
232            }
233        })
234    }
235}
236
237// ── Tests ──────────────────────────────────────────────────────────────────
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Deterministic evaluator for tests: revalidation fails once the failure
    /// count reaches 3, and demotion escalates to quarantine once it reaches 5.
    struct StubEvaluator;

    impl ConfidenceEvaluator for StubEvaluator {
        fn evaluate_confidence_revalidation(
            &self,
            asset_id: &str,
            current_state: ConfidenceState,
            failure_count: u32,
        ) -> ConfidenceRevalidationResult {
            let (outcome, eligibility, fail_closed) = if failure_count >= 3 {
                (
                    RevalidationOutcome::Failed,
                    ReplayEligibility::Ineligible,
                    true,
                )
            } else {
                (
                    RevalidationOutcome::Passed,
                    ReplayEligibility::Eligible,
                    false,
                )
            };
            ConfidenceRevalidationResult {
                revalidation_id: format!("crv-{asset_id}"),
                asset_id: asset_id.to_owned(),
                confidence_state: current_state,
                revalidation_result: outcome,
                replay_eligibility: eligibility,
                summary: format!("stub: failure_count={failure_count}"),
                fail_closed,
            }
        }

        fn evaluate_asset_demotion(
            &self,
            asset_id: &str,
            prior_state: ConfidenceState,
            failure_count: u32,
            reason_code: ConfidenceDemotionReasonCode,
        ) -> DemotionDecision {
            let quarantined = failure_count >= 5;
            let new_state = if quarantined {
                ConfidenceState::Quarantined
            } else {
                ConfidenceState::Demoted
            };
            DemotionDecision {
                demotion_id: format!("dem-{asset_id}"),
                asset_id: asset_id.to_owned(),
                prior_state,
                new_state,
                reason_code,
                replay_eligibility: ReplayEligibility::Ineligible,
                summary: format!("stub demotion: new_state={new_state:?}"),
                quarantine_transition: quarantined,
                fail_closed: true,
            }
        }
    }

    /// Daemon wired to the stub evaluator with a one-second poll interval.
    fn daemon_with_stub() -> ConfidenceDaemon {
        let config = ConfidenceDaemonConfig {
            poll_interval: Duration::from_secs(1),
            demotion_confidence_threshold: MIN_REPLAY_CONFIDENCE,
        };
        ConfidenceDaemon::new(Arc::new(StubEvaluator), config)
    }

    /// Fetch the tracked entry for `asset_id`, panicking if it is missing.
    fn tracked(daemon: &ConfidenceDaemon, asset_id: &str) -> TrackedAsset {
        daemon
            .snapshot()
            .into_iter()
            .find(|entry| entry.asset_id == asset_id)
            .unwrap()
    }

    /// A healthy asset keeps its eligibility after a cycle.
    #[test]
    fn confidence_daemon_healthy_asset_stays_eligible() {
        let daemon = daemon_with_stub();
        daemon.track(TrackedAsset::new("gene-ok", 1.0));

        daemon.run_cycle();

        let entry = tracked(&daemon, "gene-ok");
        assert!(
            entry.replay_eligible,
            "healthy asset should remain eligible"
        );
        assert_ne!(entry.state, ConfidenceState::Quarantined);
    }

    /// Two prior failures plus the implicit below-threshold failure reach the
    /// stub's failure limit of 3, so the asset is demoted.
    #[test]
    fn confidence_daemon_below_threshold_triggers_demotion() {
        let daemon = daemon_with_stub();
        daemon.track(TrackedAsset {
            failure_count: 2,
            ..TrackedAsset::new("gene-low", 0.0)
        });

        daemon.run_cycle();

        let entry = tracked(&daemon, "gene-low");
        assert!(
            matches!(
                entry.state,
                ConfidenceState::Demoted | ConfidenceState::Quarantined
            ),
            "asset below threshold should be demoted, got {:?}",
            entry.state
        );
        assert!(
            !entry.replay_eligible,
            "demoted asset must not be replay eligible"
        );
    }

    /// Four prior failures plus the implicit one reach the stub's quarantine
    /// limit of 5.
    #[test]
    fn confidence_daemon_quarantine_auto_transition() {
        let daemon = daemon_with_stub();
        daemon.track(TrackedAsset {
            failure_count: 4,
            ..TrackedAsset::new("gene-quarantine", 0.0)
        });

        daemon.run_cycle();

        let entry = tracked(&daemon, "gene-quarantine");
        assert_eq!(
            entry.state,
            ConfidenceState::Quarantined,
            "asset with 5 failures should be Quarantined"
        );
        assert!(!entry.replay_eligible);
    }

    /// An asset that is already quarantined stays quarantined and ineligible.
    #[test]
    fn confidence_daemon_quarantined_excluded_from_replay() {
        let daemon = daemon_with_stub();
        daemon.track(TrackedAsset {
            state: ConfidenceState::Quarantined,
            replay_eligible: false,
            ..TrackedAsset::new("gene-q", 0.0)
        });

        daemon.run_cycle();

        let entry = tracked(&daemon, "gene-q");
        assert_eq!(entry.state, ConfidenceState::Quarantined);
        assert!(
            !entry.replay_eligible,
            "quarantined asset must never become eligible"
        );
    }

    /// The spawned task keeps running until aborted; awaiting an aborted
    /// handle yields an error.
    #[tokio::test]
    async fn confidence_daemon_spawn_returns_join_handle() {
        let daemon = ConfidenceDaemon::new(
            Arc::new(StubEvaluator),
            ConfidenceDaemonConfig {
                poll_interval: Duration::from_millis(50),
                demotion_confidence_threshold: MIN_REPLAY_CONFIDENCE,
            },
        );
        let handle = daemon.spawn();

        // Give the task time for at least one tick before stopping it.
        tokio::time::sleep(Duration::from_millis(120)).await;
        handle.abort();

        let outcome = handle.await;
        assert!(outcome.is_err(), "aborted handle should return an error");
    }

    /// Assets are evaluated independently: one stays healthy while the other
    /// is quarantined in the same cycle.
    #[test]
    fn confidence_daemon_multiple_assets_independent() {
        let daemon = daemon_with_stub();
        daemon.track(TrackedAsset::new("gene-a", 1.0));
        daemon.track(TrackedAsset {
            failure_count: 4,
            ..TrackedAsset::new("gene-b", 0.0)
        });

        daemon.run_cycle();

        let healthy = tracked(&daemon, "gene-a");
        let failing = tracked(&daemon, "gene-b");
        assert!(healthy.replay_eligible, "healthy asset must stay eligible");
        assert!(
            !failing.replay_eligible,
            "quarantined asset must not be eligible"
        );
        assert_eq!(failing.state, ConfidenceState::Quarantined);
    }
}