1#![allow(dead_code)]
13
14use std::collections::{HashMap, HashSet};
15
16use crate::handlers::base::{
17 Continuation, Handler, HandlerError, HandlerOutcome, identity_continuation, make_envelope,
18};
19use crate::ir_nodes::{
20 IRFabric, IRManifest, IRObserve, IRProgram, IRReconcile, IRResource,
21};
22
/// Record of a single reconcile tick, as appended to the loop's history.
///
/// One report is produced per call to `ReconcileLoop::tick`, whether or not a
/// corrective action was applied.
#[derive(Debug, Clone)]
pub struct ReconcileTickReport {
    /// Name of the reconcile declaration that produced this report.
    pub reconcile_name: String,
    /// Outcome returned by the handler's `observe` call; `None` when the tick
    /// short-circuited because the retry budget was already exhausted.
    pub observation: Option<HandlerOutcome>,
    /// Action the tick ended up taking (`Noop` when nothing was applied).
    pub action: TickAction,
    /// Jaccard drift between the manifest's resources and the observed ones;
    /// `0.0` on the exhausted-budget path.
    pub drift: f64,
    /// Certainty read from the observation's envelope; `0.0` on the
    /// exhausted-budget path.
    pub certainty: f64,
    /// `true` when drift was within tolerance or the shield approved the
    /// corrective action; `false` when certainty was below threshold, the
    /// shield denied, or the budget was exhausted.
    pub shield_approved: bool,
    /// Retry budget left after this tick.
    pub retries_remaining: i64,
    /// Outcome of the corrective action; `None` for every no-op tick.
    pub outcome: Option<HandlerOutcome>,
    /// Human-readable explanation of why the tick took this path.
    pub note: String,
}
40
/// What a reconcile tick decided to do.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TickAction {
    /// The handler was asked to provision the manifest.
    Provision,
    /// An alert outcome was produced.
    Alert,
    /// A refine outcome was produced.
    Refine,
    /// Nothing was applied on this tick.
    Noop,
}

impl TickAction {
    /// Returns the lower-case label for this action.
    pub fn as_str(&self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a label choice.
        match *self {
            Self::Noop => "noop",
            Self::Refine => "refine",
            Self::Alert => "alert",
            Self::Provision => "provision",
        }
    }
}
60
/// Boxed approval gate consulted before a corrective action is applied.
///
/// `ReconcileLoop::tick` calls it with the reconcile's name, the observation
/// outcome, and the measured drift; returning `false` turns the tick into a
/// no-op.
pub type ShieldApprove = Box<dyn Fn(&str, &HandlerOutcome, f64) -> bool>;
71
72pub fn allow_all_shield() -> ShieldApprove {
73 Box::new(|_name, _obs, _drift| true)
74}
75
76pub fn deny_all_shield() -> ShieldApprove {
77 Box::new(|_name, _obs, _drift| false)
78}
79
/// Jaccard distance between two name lists, treated as sets.
///
/// Returns `|A Δ B| / |A ∪ B|`: `0.0` when both sets hold the same names and
/// `1.0` when they share none. Duplicate entries are ignored. Two empty inputs
/// yield `0.0`.
pub fn jaccard_drift(expected: &[String], observed: &[String]) -> f64 {
    let wanted: HashSet<&str> = expected.iter().map(String::as_str).collect();
    let seen: HashSet<&str> = observed.iter().map(String::as_str).collect();

    // |A ∪ B| = |A| + |B| - |A ∩ B|, and |A Δ B| = |A ∪ B| - |A ∩ B|.
    let shared = wanted.intersection(&seen).count();
    let total = wanted.len() + seen.len() - shared;

    match total {
        0 => 0.0,
        _ => ((total - shared) as f64) / (total as f64),
    }
}
99
/// Stateful driver for one reconcile declaration.
///
/// Holds the resolved observe/manifest pair, the handler that performs the
/// observation and provisioning, the shield gate, and the history of ticks.
pub struct ReconcileLoop<'p, H: Handler> {
    /// The reconcile declaration driving this loop.
    ir: IRReconcile,
    /// Handler used for `observe` and `provision` calls.
    handler: H,
    /// Approval gate consulted before any corrective action.
    shield: ShieldApprove,
    /// Minimum observation certainty required to act; defaults to `0.85`
    /// when the declaration sets none.
    threshold: f64,
    /// Largest drift still treated as in-spec; defaults to `0.10` when the
    /// declaration sets none.
    tolerance: f64,
    /// Remaining corrective-action budget, seeded from `max_retries` and
    /// decremented after each applied action.
    retries_left: i64,
    /// Every report produced so far, in tick order.
    ticks: Vec<ReconcileTickReport>,
    /// Clone of the observe declaration named by the reconcile's `observe_ref`.
    observe: IRObserve,
    /// Clone of the manifest targeted by `observe`.
    manifest: IRManifest,
    /// Program resources keyed by name, passed to the handler on provision.
    resources: HashMap<String, IRResource>,
    /// Program fabrics keyed by name, passed to the handler on provision.
    fabrics: HashMap<String, IRFabric>,
    /// Keeps the `'p` lifetime parameter in the type; no field borrows with it.
    _phantom: std::marker::PhantomData<&'p ()>,
}
119
120impl<'p, H: Handler> ReconcileLoop<'p, H> {
121 pub fn new(
122 ir_reconcile: IRReconcile,
123 program: &IRProgram,
124 handler: H,
125 ) -> Result<Self, HandlerError> {
126 Self::with_shield(ir_reconcile, program, handler, allow_all_shield())
127 }
128
129 pub fn with_shield(
130 ir_reconcile: IRReconcile,
131 program: &IRProgram,
132 handler: H,
133 shield: ShieldApprove,
134 ) -> Result<Self, HandlerError> {
135 let observe = program
136 .observations
137 .iter()
138 .find(|o| o.name == ir_reconcile.observe_ref)
139 .cloned()
140 .ok_or_else(|| {
141 HandlerError::caller(format!(
142 "reconcile '{}' references unknown observe '{}'",
143 ir_reconcile.name, ir_reconcile.observe_ref
144 ))
145 })?;
146 let manifest_name = observe.target.clone();
147 let manifest = program
148 .manifests
149 .iter()
150 .find(|m| m.name == manifest_name)
151 .cloned()
152 .ok_or_else(|| {
153 HandlerError::caller(format!(
154 "reconcile '{}': observe '{}' targets unknown manifest '{}'",
155 ir_reconcile.name, ir_reconcile.observe_ref, manifest_name
156 ))
157 })?;
158 let resources: HashMap<String, IRResource> = program
159 .resources
160 .iter()
161 .map(|r| (r.name.clone(), r.clone()))
162 .collect();
163 let fabrics: HashMap<String, IRFabric> = program
164 .fabrics
165 .iter()
166 .map(|f| (f.name.clone(), f.clone()))
167 .collect();
168 let threshold = ir_reconcile.threshold.unwrap_or(0.85);
169 let tolerance = ir_reconcile.tolerance.unwrap_or(0.10);
170 let retries_left = ir_reconcile.max_retries;
171 Ok(ReconcileLoop {
172 ir: ir_reconcile,
173 handler,
174 shield,
175 threshold,
176 tolerance,
177 retries_left,
178 ticks: Vec::new(),
179 observe,
180 manifest,
181 resources,
182 fabrics,
183 _phantom: std::marker::PhantomData,
184 })
185 }
186
187 pub fn tick(&mut self) -> Result<ReconcileTickReport, HandlerError> {
190 if self.retries_left <= 0 {
191 let report = ReconcileTickReport {
192 reconcile_name: self.ir.name.clone(),
193 observation: None,
194 action: TickAction::Noop,
195 drift: 0.0,
196 certainty: 0.0,
197 shield_approved: false,
198 retries_remaining: self.retries_left,
199 outcome: None,
200 note: "max_retries exhausted".into(),
201 };
202 self.ticks.push(report.clone());
203 return Ok(report);
204 }
205
206 let mut pass: Continuation<'_> = identity_continuation();
207 let observation = self.handler.observe(&self.observe, &self.manifest, &mut pass)?;
208 let observed: Vec<String> = match observation.data.get("resources_observed") {
211 Some(serde_json::Value::Array(arr)) => arr
212 .iter()
213 .filter_map(|v| v.as_str().map(|s| s.to_string()))
214 .collect(),
215 _ => self.manifest.resources.clone(),
216 };
217 let drift = jaccard_drift(&self.manifest.resources, &observed);
218 let certainty = observation.envelope.c;
219
220 if certainty < self.threshold {
221 return Ok(self.record(
222 Some(observation),
223 TickAction::Noop,
224 drift,
225 certainty,
226 false,
227 format!(
228 "certainty {certainty:.2} below threshold {:.2}",
229 self.threshold
230 ),
231 ));
232 }
233 if drift <= self.tolerance {
234 return Ok(self.record(
235 Some(observation),
236 TickAction::Noop,
237 drift,
238 certainty,
239 true,
240 format!("drift {drift:.3} within tolerance {:.3}", self.tolerance),
241 ));
242 }
243 let approved = (self.shield)(&self.ir.name, &observation, drift);
244 if !approved {
245 return Ok(self.record(
246 Some(observation),
247 TickAction::Noop,
248 drift,
249 certainty,
250 false,
251 "shield denied corrective action".into(),
252 ));
253 }
254
255 let outcome = self.apply_action(&observation, drift, certainty)?;
256 self.retries_left -= 1;
257 let action = match self.ir.on_drift.as_str() {
258 "provision" => TickAction::Provision,
259 "alert" => TickAction::Alert,
260 _ => TickAction::Refine,
261 };
262 let report = ReconcileTickReport {
263 reconcile_name: self.ir.name.clone(),
264 observation: Some(observation),
265 action,
266 drift,
267 certainty,
268 shield_approved: true,
269 retries_remaining: self.retries_left,
270 outcome: Some(outcome),
271 note: format!(
272 "drift {drift:.3} > tolerance {:.3}; applied {}",
273 self.tolerance,
274 action.as_str()
275 ),
276 };
277 self.ticks.push(report.clone());
278 Ok(report)
279 }
280
281 pub fn run(&mut self, max_ticks: Option<u32>) -> Result<Vec<ReconcileTickReport>, HandlerError> {
284 let limit = max_ticks.unwrap_or((self.ir.max_retries + 2) as u32);
285 let mut results = Vec::new();
286 let mut consecutive_noops = 0;
287 for _ in 0..limit {
288 let report = self.tick()?;
289 let is_noop = report.action == TickAction::Noop;
290 let exhausted = report.note.contains("exhausted");
291 results.push(report);
292 if is_noop {
293 consecutive_noops += 1;
294 if consecutive_noops >= 2 || exhausted {
295 break;
296 }
297 } else {
298 consecutive_noops = 0;
299 }
300 }
301 Ok(results)
302 }
303
304 pub fn history(&self) -> &[ReconcileTickReport] {
305 &self.ticks
306 }
307
308 fn apply_action(
309 &mut self,
310 observation: &HandlerOutcome,
311 drift: f64,
312 certainty: f64,
313 ) -> Result<HandlerOutcome, HandlerError> {
314 match self.ir.on_drift.as_str() {
315 "provision" => {
316 let mut pass = identity_continuation();
317 self.handler
318 .provision(&self.manifest, &self.resources, &self.fabrics, &mut pass)
319 }
320 "alert" => {
321 let mut data = serde_json::Map::new();
322 data.insert("reconcile".into(), self.ir.name.clone().into());
323 data.insert("drift".into(), serde_json::Value::from(drift));
324 data.insert(
325 "source_observation".into(),
326 observation.target.clone().into(),
327 );
328 Ok(HandlerOutcome::new(
329 "alert",
330 self.ir.name.clone(),
331 "ok",
332 make_envelope(certainty, "reconcile", "inferred", None),
333 format!("reconcile:{}", self.ir.name),
334 )
335 .with_data(data))
336 }
337 _ => {
338 let mut data = serde_json::Map::new();
340 data.insert("reconcile".into(), self.ir.name.clone().into());
341 data.insert("drift".into(), serde_json::Value::from(drift));
342 data.insert(
343 "note".into(),
344 "belief revision reserved for Fase 4 (psyche integration)".into(),
345 );
346 Ok(HandlerOutcome::new(
347 "refine",
348 self.ir.name.clone(),
349 "partial",
350 make_envelope(certainty, "reconcile", "inferred", None),
351 format!("reconcile:{}", self.ir.name),
352 )
353 .with_data(data))
354 }
355 }
356 }
357
358 fn record(
359 &mut self,
360 observation: Option<HandlerOutcome>,
361 action: TickAction,
362 drift: f64,
363 certainty: f64,
364 shield_approved: bool,
365 note: String,
366 ) -> ReconcileTickReport {
367 let report = ReconcileTickReport {
368 reconcile_name: self.ir.name.clone(),
369 observation,
370 action,
371 drift,
372 certainty,
373 shield_approved,
374 retries_remaining: self.retries_left,
375 outcome: None,
376 note,
377 };
378 self.ticks.push(report.clone());
379 report
380 }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handlers::base::{LambdaEnvelope, make_envelope};

    /// Handler double whose observation is fixed at construction time.
    struct ScriptedHandler {
        /// Certainty placed in every observation envelope.
        observation_certainty: f64,
        /// Names reported under `resources_observed`.
        observed_resources: Vec<String>,
        /// Number of `provision` calls received.
        provisions: u32,
    }

    impl ScriptedHandler {
        /// Builds a handler that reports `observed` with the given certainty.
        fn reporting(certainty: f64, observed: &[&str]) -> Self {
            ScriptedHandler {
                observation_certainty: certainty,
                observed_resources: observed.iter().map(|name| name.to_string()).collect(),
                provisions: 0,
            }
        }
    }

    impl Handler for ScriptedHandler {
        fn name(&self) -> &str {
            "scripted"
        }

        fn provision(
            &mut self,
            manifest: &IRManifest,
            _resources: &HashMap<String, IRResource>,
            _fabrics: &HashMap<String, IRFabric>,
            _cont: &mut Continuation<'_>,
        ) -> Result<HandlerOutcome, HandlerError> {
            self.provisions += 1;
            Ok(HandlerOutcome::new(
                "provision",
                manifest.name.clone(),
                "ok",
                make_envelope(1.0, "scripted", "observed", Some("T".into())),
                "scripted",
            ))
        }

        fn observe(
            &mut self,
            obs: &IRObserve,
            _manifest: &IRManifest,
            _cont: &mut Continuation<'_>,
        ) -> Result<HandlerOutcome, HandlerError> {
            let env = LambdaEnvelope::new(
                self.observation_certainty,
                "T".into(),
                "scripted".into(),
                "observed".into(),
            );
            let mut data = serde_json::Map::new();
            data.insert(
                "resources_observed".into(),
                serde_json::Value::Array(
                    self.observed_resources
                        .iter()
                        .map(|s| serde_json::Value::String(s.clone()))
                        .collect(),
                ),
            );
            Ok(HandlerOutcome::new("observe", obs.name.clone(), "ok", env, "scripted")
                .with_data(data))
        }
    }

    /// Compiles a fixed program and then overrides the first manifest's
    /// resource list with `resources`.
    ///
    /// The source text only declares `Db`; patching the generated IR lets a
    /// test choose any expected resource set.
    fn program_with_resources(resources: &[&str]) -> IRProgram {
        use crate::ir_generator::IRGenerator;
        use crate::lexer::Lexer;
        use crate::parser::Parser;
        let source = r#"
            resource Db { kind: postgres lifetime: linear }
            fabric Vpc { provider: aws region: "us-east-1" zones: 1 }
            manifest Prod { resources: [Db] fabric: Vpc }
            observe Health from Prod { sources: [prom] quorum: 1 }
            reconcile R {
              observe: Health
              threshold: 0.5
              tolerance: 0.1
              on_drift: provision
              max_retries: 3
            }
        "#;
        let tokens = Lexer::new(source, "r")
            .tokenize()
            .expect("fixture source must tokenize");
        let program = Parser::new(tokens)
            .parse()
            .expect("fixture source must parse");
        let mut ir = IRGenerator::new().generate(&program);
        ir.manifests[0].resources = resources.iter().map(|s| s.to_string()).collect();
        ir
    }

    #[test]
    fn jaccard_drift_edges() {
        assert_eq!(jaccard_drift(&[], &[]), 0.0);
        assert_eq!(
            jaccard_drift(&["a".into(), "b".into()], &["a".into(), "b".into()]),
            0.0
        );
        assert_eq!(
            jaccard_drift(&["a".into(), "b".into()], &["c".into(), "d".into()]),
            1.0
        );
        let d = jaccard_drift(&["a".into(), "b".into()], &["b".into(), "c".into()]);
        assert!((d - 2.0 / 3.0).abs() < 1e-9);
    }

    #[test]
    fn tick_noops_when_certainty_below_threshold() {
        let ir = program_with_resources(&["Db"]);
        let reconcile = ir.reconciles[0].clone();
        let handler = ScriptedHandler::reporting(0.3, &["Db"]);
        let mut loop_ = ReconcileLoop::new(reconcile, &ir, handler).unwrap();
        let r = loop_.tick().unwrap();
        assert_eq!(r.action, TickAction::Noop);
        assert!(r.note.contains("below threshold"));
    }

    #[test]
    fn tick_noops_when_drift_within_tolerance() {
        let ir = program_with_resources(&["Db"]);
        let reconcile = ir.reconciles[0].clone();
        let handler = ScriptedHandler::reporting(1.0, &["Db"]);
        let mut loop_ = ReconcileLoop::new(reconcile, &ir, handler).unwrap();
        let r = loop_.tick().unwrap();
        assert_eq!(r.action, TickAction::Noop);
        assert!(r.note.contains("within tolerance"));
    }

    #[test]
    fn tick_triggers_provision_on_drift_above_tolerance() {
        let ir = program_with_resources(&["Db"]);
        let reconcile = ir.reconciles[0].clone();
        let handler = ScriptedHandler::reporting(1.0, &[]);
        let mut loop_ = ReconcileLoop::new(reconcile, &ir, handler).unwrap();
        let r = loop_.tick().unwrap();
        assert_eq!(r.action, TickAction::Provision);
        assert!(r.shield_approved);
        assert!(r.outcome.is_some());
    }

    #[test]
    fn tick_noops_when_shield_denies() {
        let ir = program_with_resources(&["Db"]);
        let reconcile = ir.reconciles[0].clone();
        let handler = ScriptedHandler::reporting(1.0, &[]);
        let mut loop_ =
            ReconcileLoop::with_shield(reconcile, &ir, handler, deny_all_shield()).unwrap();
        let r = loop_.tick().unwrap();
        assert_eq!(r.action, TickAction::Noop);
        assert!(r.note.contains("shield denied"));
    }

    #[test]
    fn run_respects_max_retries_budget() {
        let ir = program_with_resources(&["Db"]);
        let reconcile = ir.reconciles[0].clone();
        let max = reconcile.max_retries;
        let handler = ScriptedHandler::reporting(1.0, &[]);
        let mut loop_ = ReconcileLoop::new(reconcile, &ir, handler).unwrap();
        let reports = loop_.run(Some(20)).unwrap();
        let provisions = reports
            .iter()
            .filter(|r| r.action == TickAction::Provision)
            .count();
        assert_eq!(provisions as i64, max);
    }

    #[test]
    fn reconcile_with_undefined_observe_is_caller_error() {
        let ir = program_with_resources(&["Db"]);
        let mut bad = ir.reconciles[0].clone();
        bad.observe_ref = "Ghost".into();
        let handler = ScriptedHandler::reporting(1.0, &[]);
        match ReconcileLoop::new(bad, &ir, handler) {
            Err(e) => assert_eq!(e.blame, "CT-2"),
            Ok(_) => panic!("undefined observe must fail to construct the loop"),
        }
    }
}