use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
mod support;
use ergo_adapter::{
AdapterProvides, EventId, EventTime, ExternalEvent, ExternalEventKind, FaultRuntimeHandle,
GraphId, RunTermination, RuntimeHandle,
};
use ergo_runtime::action::ActionOutcome;
use ergo_runtime::catalog::{build_core_catalog, core_registries};
use ergo_runtime::cluster::{
ExpandedEdge, ExpandedEndpoint, ExpandedGraph, ExpandedNode, ImplementationInstance,
OutputPortSpec, OutputRef, ParameterValue,
};
use ergo_runtime::provenance::{compute_runtime_provenance, RuntimeProvenanceScheme};
use ergo_supervisor::replay::compare_decisions;
use ergo_supervisor::replay::replay;
use ergo_supervisor::{
CapturingSession, Constraints, Decision, DecisionLog, DecisionLogEntry, Supervisor,
};
use support::demo_1;
#[derive(Clone)]
struct CapturingLog {
entries: Arc<Mutex<Vec<DecisionLogEntry>>>,
}
impl CapturingLog {
fn new() -> Self {
Self {
entries: Arc::new(Mutex::new(Vec::new())),
}
}
fn entries(&self) -> Vec<DecisionLogEntry> {
self.entries.lock().unwrap().clone()
}
}
impl DecisionLog for CapturingLog {
fn log(&self, entry: DecisionLogEntry) {
self.entries.lock().unwrap().push(entry);
}
}
fn build_hello_world_graph() -> ExpandedGraph {
let mut nodes = HashMap::new();
nodes.insert(
"src_a".to_string(),
ExpandedNode {
runtime_id: "src_a".to_string(),
authoring_path: vec![],
implementation: ImplementationInstance {
impl_id: "number_source".to_string(),
requested_version: "0.1.0".to_string(),
version: "0.1.0".to_string(),
},
parameters: HashMap::from([("value".to_string(), ParameterValue::Number(3.0))]),
},
);
nodes.insert(
"src_b".to_string(),
ExpandedNode {
runtime_id: "src_b".to_string(),
authoring_path: vec![],
implementation: ImplementationInstance {
impl_id: "number_source".to_string(),
requested_version: "0.1.0".to_string(),
version: "0.1.0".to_string(),
},
parameters: HashMap::from([("value".to_string(), ParameterValue::Number(1.0))]),
},
);
nodes.insert(
"gt1".to_string(),
ExpandedNode {
runtime_id: "gt1".to_string(),
authoring_path: vec![],
implementation: ImplementationInstance {
impl_id: "gt".to_string(),
requested_version: "0.1.0".to_string(),
version: "0.1.0".to_string(),
},
parameters: HashMap::new(),
},
);
nodes.insert(
"emit".to_string(),
ExpandedNode {
runtime_id: "emit".to_string(),
authoring_path: vec![],
implementation: ImplementationInstance {
impl_id: "emit_if_true".to_string(),
requested_version: "0.1.0".to_string(),
version: "0.1.0".to_string(),
},
parameters: HashMap::new(),
},
);
nodes.insert(
"act".to_string(),
ExpandedNode {
runtime_id: "act".to_string(),
authoring_path: vec![],
implementation: ImplementationInstance {
impl_id: "ack_action".to_string(),
requested_version: "0.1.0".to_string(),
version: "0.1.0".to_string(),
},
parameters: HashMap::from([("accept".to_string(), ParameterValue::Bool(true))]),
},
);
let edges = vec![
ExpandedEdge {
from: ExpandedEndpoint::NodePort {
node_id: "src_a".to_string(),
port_name: "value".to_string(),
},
to: ExpandedEndpoint::NodePort {
node_id: "gt1".to_string(),
port_name: "a".to_string(),
},
},
ExpandedEdge {
from: ExpandedEndpoint::NodePort {
node_id: "src_b".to_string(),
port_name: "value".to_string(),
},
to: ExpandedEndpoint::NodePort {
node_id: "gt1".to_string(),
port_name: "b".to_string(),
},
},
ExpandedEdge {
from: ExpandedEndpoint::NodePort {
node_id: "gt1".to_string(),
port_name: "result".to_string(),
},
to: ExpandedEndpoint::NodePort {
node_id: "emit".to_string(),
port_name: "input".to_string(),
},
},
ExpandedEdge {
from: ExpandedEndpoint::NodePort {
node_id: "emit".to_string(),
port_name: "event".to_string(),
},
to: ExpandedEndpoint::NodePort {
node_id: "act".to_string(),
port_name: "event".to_string(),
},
},
];
ExpandedGraph {
nodes,
edges,
boundary_inputs: Vec::new(),
boundary_outputs: vec![OutputPortSpec {
name: "action_outcome".to_string(),
maps_to: OutputRef {
node_id: "act".to_string(),
port_name: "outcome".to_string(),
},
}],
}
}
#[allow(clippy::arc_with_non_send_sync)]
#[test]
fn supervisor_with_real_runtime_executes_hello_world() {
let graph = Arc::new(build_hello_world_graph());
let catalog = Arc::new(build_core_catalog());
let registries = Arc::new(core_registries().expect("core registries should build"));
let log = CapturingLog::new();
let mut supervisor = Supervisor::new(
GraphId::new("hello_world"),
Constraints::default(),
log.clone(),
graph,
catalog,
registries,
);
let event = ExternalEvent::mechanical(EventId::new("test_event"), ExternalEventKind::Command);
supervisor.on_event(event);
let entries = log.entries();
assert_eq!(entries.len(), 1, "Expected exactly one decision log entry");
let entry = &entries[0];
assert_eq!(
entry.decision,
Decision::Invoke,
"Expected Decision::Invoke, got {:?}",
entry.decision
);
assert_eq!(
entry.termination,
Some(RunTermination::Completed),
"Expected RunTermination::Completed, got {:?}",
entry.termination
);
assert_eq!(entry.retry_count, 0, "Expected no retries");
}
#[allow(clippy::arc_with_non_send_sync)]
#[test]
fn capturing_session_enables_round_trip_replay() {
let graph = Arc::new(build_hello_world_graph());
let catalog = Arc::new(build_core_catalog());
let core_registries = Arc::new(core_registries().expect("core registries should build"));
let runtime = RuntimeHandle::new(
graph.clone(),
catalog.clone(),
core_registries.clone(),
AdapterProvides::default(),
);
let runtime_provenance = compute_runtime_provenance(
RuntimeProvenanceScheme::Rpv1,
"hello_world_capture",
graph.as_ref(),
catalog.as_ref(),
)
.expect("runtime provenance should compute");
let mut session = CapturingSession::new(
GraphId::new("hello_world_capture"),
Constraints::default(),
CapturingLog::new(),
runtime,
runtime_provenance,
);
let event = ExternalEvent::mechanical(EventId::new("capture_event"), ExternalEventKind::Pump);
session.on_event(event);
let bundle = session.into_bundle();
assert_eq!(
bundle.events.len(),
1,
"expected exactly one captured event"
);
assert_eq!(
bundle.decisions.len(),
1,
"expected exactly one captured decision"
);
let replay_decisions = replay(&bundle, FaultRuntimeHandle::new(RunTermination::Completed));
assert_eq!(
replay_decisions.len(),
1,
"expected exactly one replay decision"
);
assert_eq!(
replay_decisions[0].decision, bundle.decisions[0].decision,
"decision should round trip through replay"
);
assert_eq!(
replay_decisions[0].termination, bundle.decisions[0].termination,
"termination should round trip through replay"
);
assert_eq!(
replay_decisions[0].retry_count, bundle.decisions[0].retry_count,
"retry_count should round trip through replay"
);
}
#[allow(clippy::arc_with_non_send_sync)]
#[test]
fn demo_1_complex_graph_executes_and_replays() {
let graph = Arc::new(demo_1::build_demo_1_graph());
let catalog = Arc::new(build_core_catalog());
let core_registries = Arc::new(core_registries().expect("core registries should build"));
let runtime = RuntimeHandle::new(
graph.clone(),
catalog.clone(),
core_registries.clone(),
AdapterProvides::default(),
);
let runtime_provenance = compute_runtime_provenance(
RuntimeProvenanceScheme::Rpv1,
"demo_1",
graph.as_ref(),
catalog.as_ref(),
)
.expect("runtime provenance should compute");
let mut session = CapturingSession::new(
GraphId::new("demo_1"),
Constraints::default(),
CapturingLog::new(),
runtime,
runtime_provenance,
);
let event_1 = ExternalEvent::mechanical(EventId::new("demo_evt_1"), ExternalEventKind::Command);
let event_2 = ExternalEvent::mechanical(EventId::new("demo_evt_2"), ExternalEventKind::Command);
session.on_event(event_1);
session.on_event(event_2);
let bundle = session.into_bundle();
assert_eq!(bundle.events.len(), 2, "expected two captured events");
assert_eq!(bundle.decisions.len(), 2, "expected two captured decisions");
assert!(bundle
.decisions
.iter()
.all(|record| record.decision == Decision::Invoke));
let summary = demo_1::compute_summary(&graph, &catalog, &core_registries);
assert_eq!(summary.sum_left, 6.0);
assert_eq!(summary.sum_total, 8.0);
assert_eq!(
summary.action_a_outcome,
ActionOutcome::Completed,
"TriggerA should emit; ActionA should execute"
);
assert_eq!(
summary.action_b_outcome,
ActionOutcome::Skipped,
"TriggerB should not emit; ActionB should be skipped"
);
for record in &bundle.decisions {
println!(
"{}",
demo_1::format_episode_summary(record.episode_id, &record.event_id, &summary)
);
}
let replay_decisions = replay(&bundle, FaultRuntimeHandle::new(RunTermination::Completed));
let replay_matches = compare_decisions(&bundle.decisions, &replay_decisions).unwrap();
println!("{}", demo_1::format_replay_identity(replay_matches));
assert!(replay_matches, "replay decisions must match capture");
}
#[test]
fn deferred_episode_retried_on_tick() {
let log = CapturingLog::new();
let runtime = FaultRuntimeHandle::new(RunTermination::Completed);
let constraints = Constraints {
max_in_flight: Some(0),
..Default::default()
};
let mut supervisor =
Supervisor::with_runtime(GraphId::new("test"), constraints, log.clone(), runtime);
let event = ExternalEvent::mechanical_at(
EventId::new("e1"),
ExternalEventKind::Command,
EventTime::from_duration(Duration::from_secs(0)),
);
supervisor.on_event(event);
let entries = log.entries();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].decision, Decision::Defer);
assert!(entries[0].termination.is_none());
let log2 = CapturingLog::new();
let runtime2 = FaultRuntimeHandle::new(RunTermination::Completed);
let constraints2 = Constraints {
max_per_window: Some(1),
rate_window: Some(Duration::from_secs(10)),
..Default::default()
};
let mut supervisor2 =
Supervisor::with_runtime(GraphId::new("test2"), constraints2, log2.clone(), runtime2);
let event1 = ExternalEvent::mechanical_at(
EventId::new("e1"),
ExternalEventKind::Command,
EventTime::from_duration(Duration::from_secs(0)),
);
supervisor2.on_event(event1);
let event2 = ExternalEvent::mechanical_at(
EventId::new("e2"),
ExternalEventKind::Command,
EventTime::from_duration(Duration::from_secs(0)),
);
supervisor2.on_event(event2);
let entries2 = log2.entries();
assert_eq!(entries2.len(), 2);
assert_eq!(entries2[0].decision, Decision::Invoke);
assert_eq!(entries2[1].decision, Decision::Defer);
let tick = ExternalEvent::mechanical_at(
EventId::new("tick1"),
ExternalEventKind::Pump,
EventTime::from_duration(Duration::from_secs(10)),
);
supervisor2.on_event(tick);
let entries3 = log2.entries();
assert_eq!(entries3.len(), 3);
assert_eq!(
entries3[2].decision,
Decision::Invoke,
"Tick should invoke deferred episode"
);
assert_eq!(entries3[2].termination, Some(RunTermination::Completed));
}
#[test]
fn tick_with_empty_queue_logs_noop() {
let log = CapturingLog::new();
let runtime = FaultRuntimeHandle::new(RunTermination::Completed);
let mut supervisor = Supervisor::with_runtime(
GraphId::new("test"),
Constraints::default(),
log.clone(),
runtime,
);
let tick = ExternalEvent::mechanical_at(
EventId::new("tick1"),
ExternalEventKind::Pump,
EventTime::from_duration(Duration::from_secs(0)),
);
supervisor.on_event(tick);
let entries = log.entries();
assert_eq!(
entries.len(),
1,
"Tick should produce exactly one log entry"
);
assert_eq!(
entries[0].decision,
Decision::Defer,
"Empty queue Tick should log Defer"
);
assert_eq!(
entries[0].schedule_at, None,
"Empty queue Tick should have no schedule_at"
);
assert!(
entries[0].termination.is_none(),
"Empty queue Tick should have no termination"
);
}
#[test]
fn tick_respects_episode_id_ordering() {
let log = CapturingLog::new();
let runtime = FaultRuntimeHandle::new(RunTermination::Completed);
let constraints = Constraints {
max_per_window: Some(1),
rate_window: Some(Duration::from_secs(10)),
..Default::default()
};
let mut supervisor =
Supervisor::with_runtime(GraphId::new("test"), constraints, log.clone(), runtime);
let event1 = ExternalEvent::mechanical_at(
EventId::new("e1"),
ExternalEventKind::Command,
EventTime::from_duration(Duration::from_secs(0)),
);
supervisor.on_event(event1);
let event2 = ExternalEvent::mechanical_at(
EventId::new("e2"),
ExternalEventKind::Command,
EventTime::from_duration(Duration::from_secs(0)),
);
supervisor.on_event(event2);
let event3 = ExternalEvent::mechanical_at(
EventId::new("e3"),
ExternalEventKind::Command,
EventTime::from_duration(Duration::from_secs(0)),
);
supervisor.on_event(event3);
let entries = log.entries();
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].decision, Decision::Invoke);
assert_eq!(entries[1].decision, Decision::Defer);
assert_eq!(entries[2].decision, Decision::Defer);
let tick1 = ExternalEvent::mechanical_at(
EventId::new("tick1"),
ExternalEventKind::Pump,
EventTime::from_duration(Duration::from_secs(10)),
);
supervisor.on_event(tick1);
let tick2 = ExternalEvent::mechanical_at(
EventId::new("tick2"),
ExternalEventKind::Pump,
EventTime::from_duration(Duration::from_secs(20)),
);
supervisor.on_event(tick2);
let final_entries = log.entries();
assert_eq!(final_entries.len(), 5);
assert_eq!(final_entries[3].decision, Decision::Invoke);
assert_eq!(final_entries[4].decision, Decision::Invoke);
}