Skip to main content

axon/runtime/
reconcile_loop.rs

1//! AXON Runtime — ReconcileLoop (§λ-L-E Fase 3.1)
2//!
3//! Direct port of `axon/runtime/reconcile_loop.py`.
4//!
5//! Free-energy-minimizing control loop per tick:
6//!   1. OBSERVE — `handler.observe(obs, manifest)` → HandlerOutcome.
7//!   2. MEASURE — symmetric Jaccard distance on resource-name sets (drift).
8//!   3. GATE    — skip unless certainty > threshold AND drift > tolerance AND shield approves.
9//!   4. ACT     — apply `on_drift ∈ {provision, alert, refine}`.
10//!   5. BOUND   — cap at `max_retries`.
11
12#![allow(dead_code)]
13
14use std::collections::{HashMap, HashSet};
15
16use crate::handlers::base::{
17    Continuation, Handler, HandlerError, HandlerOutcome, identity_continuation, make_envelope,
18};
19use crate::ir_nodes::{
20    IRFabric, IRManifest, IRObserve, IRProgram, IRReconcile, IRResource,
21};
22
23// ═══════════════════════════════════════════════════════════════════
24//  REPORTS — one per tick
25// ═══════════════════════════════════════════════════════════════════
26
27/// One iteration of the control loop — fully serialisable and immutable.
28#[derive(Debug, Clone)]
29pub struct ReconcileTickReport {
30    pub reconcile_name: String,
31    pub observation: Option<HandlerOutcome>,
32    pub action: TickAction,
33    pub drift: f64,
34    pub certainty: f64,
35    pub shield_approved: bool,
36    pub retries_remaining: i64,
37    pub outcome: Option<HandlerOutcome>,
38    pub note: String,
39}
40
41/// Per-tick action outcome.
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum TickAction {
44    Provision,
45    Alert,
46    Refine,
47    Noop,
48}
49
50impl TickAction {
51    pub fn as_str(&self) -> &'static str {
52        match self {
53            TickAction::Provision => "provision",
54            TickAction::Alert => "alert",
55            TickAction::Refine => "refine",
56            TickAction::Noop => "noop",
57        }
58    }
59}
60
61// ═══════════════════════════════════════════════════════════════════
62//  SHIELD ADAPTER — governance gate
63// ═══════════════════════════════════════════════════════════════════
64
65/// Shield adapter: `(reconcile_name, observation, drift) → approved`.
66///
67/// Real shields plug into this contract once runtime wiring for
68/// `ShieldApplyNode` lands (Fase 4+). Fake adapters simulate deny/approve
69/// in tests.
70pub type ShieldApprove = Box<dyn Fn(&str, &HandlerOutcome, f64) -> bool>;
71
72pub fn allow_all_shield() -> ShieldApprove {
73    Box::new(|_name, _obs, _drift| true)
74}
75
76pub fn deny_all_shield() -> ShieldApprove {
77    Box::new(|_name, _obs, _drift| false)
78}
79
80// ═══════════════════════════════════════════════════════════════════
81//  DRIFT METRIC — free-energy proxy
82// ═══════════════════════════════════════════════════════════════════
83
84/// Symmetric Jaccard distance on resource-name sets.
85///
86/// `|A △ B| / |A ∪ B|` — zero when belief and evidence agree, approaches
87/// 1.0 as they diverge. A principled D_KL proxy for unstructured resource
88/// inventories without priors.
89pub fn jaccard_drift(expected: &[String], observed: &[String]) -> f64 {
90    let a: HashSet<&String> = expected.iter().collect();
91    let b: HashSet<&String> = observed.iter().collect();
92    let union_len = a.union(&b).count();
93    if union_len == 0 {
94        return 0.0;
95    }
96    let sym_diff_len = a.symmetric_difference(&b).count();
97    (sym_diff_len as f64) / (union_len as f64)
98}
99
100// ═══════════════════════════════════════════════════════════════════
101//  RECONCILE LOOP
102// ═══════════════════════════════════════════════════════════════════
103
104/// Executes `tick()` / `run()` against a Handler to close the belief-evidence gap.
105pub struct ReconcileLoop<'p, H: Handler> {
106    ir: IRReconcile,
107    handler: H,
108    shield: ShieldApprove,
109    threshold: f64,
110    tolerance: f64,
111    retries_left: i64,
112    ticks: Vec<ReconcileTickReport>,
113    observe: IRObserve,
114    manifest: IRManifest,
115    resources: HashMap<String, IRResource>,
116    fabrics: HashMap<String, IRFabric>,
117    _phantom: std::marker::PhantomData<&'p ()>,
118}
119
120impl<'p, H: Handler> ReconcileLoop<'p, H> {
121    pub fn new(
122        ir_reconcile: IRReconcile,
123        program: &IRProgram,
124        handler: H,
125    ) -> Result<Self, HandlerError> {
126        Self::with_shield(ir_reconcile, program, handler, allow_all_shield())
127    }
128
129    pub fn with_shield(
130        ir_reconcile: IRReconcile,
131        program: &IRProgram,
132        handler: H,
133        shield: ShieldApprove,
134    ) -> Result<Self, HandlerError> {
135        let observe = program
136            .observations
137            .iter()
138            .find(|o| o.name == ir_reconcile.observe_ref)
139            .cloned()
140            .ok_or_else(|| {
141                HandlerError::caller(format!(
142                    "reconcile '{}' references unknown observe '{}'",
143                    ir_reconcile.name, ir_reconcile.observe_ref
144                ))
145            })?;
146        let manifest_name = observe.target.clone();
147        let manifest = program
148            .manifests
149            .iter()
150            .find(|m| m.name == manifest_name)
151            .cloned()
152            .ok_or_else(|| {
153                HandlerError::caller(format!(
154                    "reconcile '{}': observe '{}' targets unknown manifest '{}'",
155                    ir_reconcile.name, ir_reconcile.observe_ref, manifest_name
156                ))
157            })?;
158        let resources: HashMap<String, IRResource> = program
159            .resources
160            .iter()
161            .map(|r| (r.name.clone(), r.clone()))
162            .collect();
163        let fabrics: HashMap<String, IRFabric> = program
164            .fabrics
165            .iter()
166            .map(|f| (f.name.clone(), f.clone()))
167            .collect();
168        let threshold = ir_reconcile.threshold.unwrap_or(0.85);
169        let tolerance = ir_reconcile.tolerance.unwrap_or(0.10);
170        let retries_left = ir_reconcile.max_retries;
171        Ok(ReconcileLoop {
172            ir: ir_reconcile,
173            handler,
174            shield,
175            threshold,
176            tolerance,
177            retries_left,
178            ticks: Vec::new(),
179            observe,
180            manifest,
181            resources,
182            fabrics,
183            _phantom: std::marker::PhantomData,
184        })
185    }
186
187    /// One control-loop tick. Returns the report capturing all gating
188    /// decisions and any emitted HandlerOutcome.
189    pub fn tick(&mut self) -> Result<ReconcileTickReport, HandlerError> {
190        if self.retries_left <= 0 {
191            let report = ReconcileTickReport {
192                reconcile_name: self.ir.name.clone(),
193                observation: None,
194                action: TickAction::Noop,
195                drift: 0.0,
196                certainty: 0.0,
197                shield_approved: false,
198                retries_remaining: self.retries_left,
199                outcome: None,
200                note: "max_retries exhausted".into(),
201            };
202            self.ticks.push(report.clone());
203            return Ok(report);
204        }
205
206        let mut pass: Continuation<'_> = identity_continuation();
207        let observation = self.handler.observe(&self.observe, &self.manifest, &mut pass)?;
208        // If the handler's observation payload includes a `resources_observed`
209        // string array, use it as evidence; otherwise default to belief.
210        let observed: Vec<String> = match observation.data.get("resources_observed") {
211            Some(serde_json::Value::Array(arr)) => arr
212                .iter()
213                .filter_map(|v| v.as_str().map(|s| s.to_string()))
214                .collect(),
215            _ => self.manifest.resources.clone(),
216        };
217        let drift = jaccard_drift(&self.manifest.resources, &observed);
218        let certainty = observation.envelope.c;
219
220        if certainty < self.threshold {
221            return Ok(self.record(
222                Some(observation),
223                TickAction::Noop,
224                drift,
225                certainty,
226                false,
227                format!(
228                    "certainty {certainty:.2} below threshold {:.2}",
229                    self.threshold
230                ),
231            ));
232        }
233        if drift <= self.tolerance {
234            return Ok(self.record(
235                Some(observation),
236                TickAction::Noop,
237                drift,
238                certainty,
239                true,
240                format!("drift {drift:.3} within tolerance {:.3}", self.tolerance),
241            ));
242        }
243        let approved = (self.shield)(&self.ir.name, &observation, drift);
244        if !approved {
245            return Ok(self.record(
246                Some(observation),
247                TickAction::Noop,
248                drift,
249                certainty,
250                false,
251                "shield denied corrective action".into(),
252            ));
253        }
254
255        let outcome = self.apply_action(&observation, drift, certainty)?;
256        self.retries_left -= 1;
257        let action = match self.ir.on_drift.as_str() {
258            "provision" => TickAction::Provision,
259            "alert" => TickAction::Alert,
260            _ => TickAction::Refine,
261        };
262        let report = ReconcileTickReport {
263            reconcile_name: self.ir.name.clone(),
264            observation: Some(observation),
265            action,
266            drift,
267            certainty,
268            shield_approved: true,
269            retries_remaining: self.retries_left,
270            outcome: Some(outcome),
271            note: format!(
272                "drift {drift:.3} > tolerance {:.3}; applied {}",
273                self.tolerance,
274                action.as_str()
275            ),
276        };
277        self.ticks.push(report.clone());
278        Ok(report)
279    }
280
281    /// Tick until quiescence (two consecutive noops), budget exhaustion,
282    /// or the supplied `max_ticks` (default = `max_retries + 2`).
283    pub fn run(&mut self, max_ticks: Option<u32>) -> Result<Vec<ReconcileTickReport>, HandlerError> {
284        let limit = max_ticks.unwrap_or((self.ir.max_retries + 2) as u32);
285        let mut results = Vec::new();
286        let mut consecutive_noops = 0;
287        for _ in 0..limit {
288            let report = self.tick()?;
289            let is_noop = report.action == TickAction::Noop;
290            let exhausted = report.note.contains("exhausted");
291            results.push(report);
292            if is_noop {
293                consecutive_noops += 1;
294                if consecutive_noops >= 2 || exhausted {
295                    break;
296                }
297            } else {
298                consecutive_noops = 0;
299            }
300        }
301        Ok(results)
302    }
303
304    pub fn history(&self) -> &[ReconcileTickReport] {
305        &self.ticks
306    }
307
308    fn apply_action(
309        &mut self,
310        observation: &HandlerOutcome,
311        drift: f64,
312        certainty: f64,
313    ) -> Result<HandlerOutcome, HandlerError> {
314        match self.ir.on_drift.as_str() {
315            "provision" => {
316                let mut pass = identity_continuation();
317                self.handler
318                    .provision(&self.manifest, &self.resources, &self.fabrics, &mut pass)
319            }
320            "alert" => {
321                let mut data = serde_json::Map::new();
322                data.insert("reconcile".into(), self.ir.name.clone().into());
323                data.insert("drift".into(), serde_json::Value::from(drift));
324                data.insert(
325                    "source_observation".into(),
326                    observation.target.clone().into(),
327                );
328                Ok(HandlerOutcome::new(
329                    "alert",
330                    self.ir.name.clone(),
331                    "ok",
332                    make_envelope(certainty, "reconcile", "inferred", None),
333                    format!("reconcile:{}", self.ir.name),
334                )
335                .with_data(data))
336            }
337            _ => {
338                // refine — belief-revision placeholder (Fase 4+).
339                let mut data = serde_json::Map::new();
340                data.insert("reconcile".into(), self.ir.name.clone().into());
341                data.insert("drift".into(), serde_json::Value::from(drift));
342                data.insert(
343                    "note".into(),
344                    "belief revision reserved for Fase 4 (psyche integration)".into(),
345                );
346                Ok(HandlerOutcome::new(
347                    "refine",
348                    self.ir.name.clone(),
349                    "partial",
350                    make_envelope(certainty, "reconcile", "inferred", None),
351                    format!("reconcile:{}", self.ir.name),
352                )
353                .with_data(data))
354            }
355        }
356    }
357
358    fn record(
359        &mut self,
360        observation: Option<HandlerOutcome>,
361        action: TickAction,
362        drift: f64,
363        certainty: f64,
364        shield_approved: bool,
365        note: String,
366    ) -> ReconcileTickReport {
367        let report = ReconcileTickReport {
368            reconcile_name: self.ir.name.clone(),
369            observation,
370            action,
371            drift,
372            certainty,
373            shield_approved,
374            retries_remaining: self.retries_left,
375            outcome: None,
376            note,
377        };
378        self.ticks.push(report.clone());
379        report
380    }
381}
382
383#[cfg(test)]
384mod tests {
385    use super::*;
386    use crate::handlers::base::{LambdaEnvelope, make_envelope};
387
388    // ── Scripted handler: returns fixed observations and records provisions ──
389
390    struct ScriptedHandler {
391        observation_certainty: f64,
392        observed_resources: Vec<String>,
393        provisions: u32,
394    }
395
396    impl Handler for ScriptedHandler {
397        fn name(&self) -> &str { "scripted" }
398
399        fn provision(
400            &mut self,
401            manifest: &IRManifest,
402            _resources: &HashMap<String, IRResource>,
403            _fabrics: &HashMap<String, IRFabric>,
404            _cont: &mut Continuation<'_>,
405        ) -> Result<HandlerOutcome, HandlerError> {
406            self.provisions += 1;
407            Ok(HandlerOutcome::new(
408                "provision",
409                manifest.name.clone(),
410                "ok",
411                make_envelope(1.0, "scripted", "observed", Some("T".into())),
412                "scripted",
413            ))
414        }
415
416        fn observe(
417            &mut self,
418            obs: &IRObserve,
419            _manifest: &IRManifest,
420            _cont: &mut Continuation<'_>,
421        ) -> Result<HandlerOutcome, HandlerError> {
422            let env = LambdaEnvelope::new(
423                self.observation_certainty,
424                "T".into(),
425                "scripted".into(),
426                "observed".into(),
427            );
428            let mut data = serde_json::Map::new();
429            data.insert(
430                "resources_observed".into(),
431                serde_json::Value::Array(
432                    self.observed_resources
433                        .iter()
434                        .map(|s| serde_json::Value::String(s.clone()))
435                        .collect(),
436                ),
437            );
438            Ok(HandlerOutcome::new("observe", obs.name.clone(), "ok", env, "scripted")
439                .with_data(data))
440        }
441    }
442
443    fn program_with_resources(resources: &[&str]) -> IRProgram {
444        use crate::ir_generator::IRGenerator;
445        use crate::lexer::Lexer;
446        use crate::parser::Parser;
447        // Use the compile pipeline to build a realistic IRProgram, then
448        // manipulate the compiled manifest / observe in place for the tests.
449        let base = format!(
450            r#"
451            resource Db {{ kind: postgres lifetime: linear }}
452            fabric Vpc {{ provider: aws region: "us-east-1" zones: 1 }}
453            manifest Prod {{ resources: [{}] fabric: Vpc }}
454            observe Health from Prod {{ sources: [prom] quorum: 1 }}
455            reconcile R {{
456                observe: Health
457                threshold: 0.5
458                tolerance: 0.1
459                on_drift: provision
460                max_retries: 3
461            }}"#,
462            resources.join(", ")
463        );
464        // The .axon fixture doesn't actually declare every resource in the
465        // `resources:` list as a top-level `resource X {...}` — the type
466        // checker runs in its own pass and would reject undefined refs.
467        // We bypass that here by directly mutating the parsed IR manifest.
468        let real = r#"
469            resource Db { kind: postgres lifetime: linear }
470            fabric Vpc { provider: aws region: "us-east-1" zones: 1 }
471            manifest Prod { resources: [Db] fabric: Vpc }
472            observe Health from Prod { sources: [prom] quorum: 1 }
473            reconcile R {
474                observe: Health
475                threshold: 0.5
476                tolerance: 0.1
477                on_drift: provision
478                max_retries: 3
479            }
480        "#;
481        let _ = base;
482        let tokens = Lexer::new(real, "r").tokenize().unwrap();
483        let program = Parser::new(tokens).parse().unwrap();
484        let mut ir = IRGenerator::new().generate(&program);
485        // Replace the manifest's resources list with the test's intended one.
486        ir.manifests[0].resources = resources.iter().map(|s| s.to_string()).collect();
487        ir
488    }
489
490    #[test]
491    fn jaccard_drift_edges() {
492        assert_eq!(jaccard_drift(&[], &[]), 0.0);
493        assert_eq!(
494            jaccard_drift(&["a".into(), "b".into()], &["a".into(), "b".into()]),
495            0.0
496        );
497        // Disjoint: |sym_diff| = 4, |union| = 4 → 1.0.
498        assert_eq!(
499            jaccard_drift(&["a".into(), "b".into()], &["c".into(), "d".into()]),
500            1.0
501        );
502        // {a,b} vs {b,c}: sym=2, union=3 → 2/3
503        let d = jaccard_drift(&["a".into(), "b".into()], &["b".into(), "c".into()]);
504        assert!((d - 2.0 / 3.0).abs() < 1e-9);
505    }
506
507    #[test]
508    fn tick_noops_when_certainty_below_threshold() {
509        let ir = program_with_resources(&["Db"]);
510        let reconcile = ir.reconciles[0].clone();
511        let handler = ScriptedHandler {
512            observation_certainty: 0.3, // below threshold 0.5
513            observed_resources: vec!["Db".into()],
514            provisions: 0,
515        };
516        let mut loop_ = ReconcileLoop::new(reconcile, &ir, handler).unwrap();
517        let r = loop_.tick().unwrap();
518        assert_eq!(r.action, TickAction::Noop);
519        assert!(r.note.contains("below threshold"));
520    }
521
522    #[test]
523    fn tick_noops_when_drift_within_tolerance() {
524        let ir = program_with_resources(&["Db"]);
525        let reconcile = ir.reconciles[0].clone();
526        let handler = ScriptedHandler {
527            observation_certainty: 1.0,
528            observed_resources: vec!["Db".into()], // zero drift
529            provisions: 0,
530        };
531        let mut loop_ = ReconcileLoop::new(reconcile, &ir, handler).unwrap();
532        let r = loop_.tick().unwrap();
533        assert_eq!(r.action, TickAction::Noop);
534        assert!(r.note.contains("within tolerance"));
535    }
536
537    #[test]
538    fn tick_triggers_provision_on_drift_above_tolerance() {
539        let ir = program_with_resources(&["Db"]);
540        let reconcile = ir.reconciles[0].clone();
541        let handler = ScriptedHandler {
542            observation_certainty: 1.0,
543            observed_resources: vec![], // drift = 1.0
544            provisions: 0,
545        };
546        let mut loop_ = ReconcileLoop::new(reconcile, &ir, handler).unwrap();
547        let r = loop_.tick().unwrap();
548        assert_eq!(r.action, TickAction::Provision);
549        assert_eq!(r.shield_approved, true);
550        assert!(r.outcome.is_some());
551    }
552
553    #[test]
554    fn tick_noops_when_shield_denies() {
555        let ir = program_with_resources(&["Db"]);
556        let reconcile = ir.reconciles[0].clone();
557        let handler = ScriptedHandler {
558            observation_certainty: 1.0,
559            observed_resources: vec![],
560            provisions: 0,
561        };
562        let mut loop_ =
563            ReconcileLoop::with_shield(reconcile, &ir, handler, deny_all_shield()).unwrap();
564        let r = loop_.tick().unwrap();
565        assert_eq!(r.action, TickAction::Noop);
566        assert!(r.note.contains("shield denied"));
567    }
568
569    #[test]
570    fn run_respects_max_retries_budget() {
571        let ir = program_with_resources(&["Db"]);
572        let reconcile = ir.reconciles[0].clone();
573        let max = reconcile.max_retries;
574        let handler = ScriptedHandler {
575            observation_certainty: 1.0,
576            observed_resources: vec![], // drift = 1.0 every tick
577            provisions: 0,
578        };
579        let mut loop_ = ReconcileLoop::new(reconcile, &ir, handler).unwrap();
580        let reports = loop_.run(Some(20)).unwrap();
581        let provisions = reports
582            .iter()
583            .filter(|r| r.action == TickAction::Provision)
584            .count();
585        // Budget caps provisions at max_retries.
586        assert_eq!(provisions as i64, max);
587    }
588
589    #[test]
590    fn reconcile_with_undefined_observe_is_caller_error() {
591        let ir = program_with_resources(&["Db"]);
592        let mut bad = ir.reconciles[0].clone();
593        bad.observe_ref = "Ghost".into();
594        let handler = ScriptedHandler {
595            observation_certainty: 1.0,
596            observed_resources: vec![],
597            provisions: 0,
598        };
599        match ReconcileLoop::new(bad, &ir, handler) {
600            Err(e) => assert_eq!(e.blame, "CT-2"),
601            Ok(_) => panic!("undefined observe must fail to construct the loop"),
602        }
603    }
604}