#![cfg_attr(not(feature = "unaudited-infrastructure-holons"), allow(dead_code))]
use std::{sync::Arc, time::Duration};
use exo_core::{
PublicKey, Signature,
hash::hash_structured,
types::{Did, Timestamp},
};
use exo_gatekeeper::{
authority_link_signature_message,
combinator::{Combinator, CombinatorInput, Predicate, TransformFn},
holon::{self, Holon, HolonState},
invariants::InvariantSet,
kernel::{AdjudicationContext, Kernel},
provenance_signature_message,
types::{
AuthorityChain, AuthorityLink, BailmentState, ConsentRecord, GovernmentBranch, Permission,
PermissionSet, Provenance, Role, TrustedAuthorityKeys, TrustedProvenanceKeys,
},
};
use serde::Serialize;
use tokio::sync::mpsc;
use crate::{
network::NetworkHandle,
reactor::{self, SharedReactorState},
store::SqliteDagStore,
wire::{GovernanceEventType, ValidatorChange},
};
pub const INFRASTRUCTURE_HOLONS_FEATURE: &str = "unaudited-infrastructure-holons";
pub const INFRASTRUCTURE_HOLONS_INITIATIVE: &str =
"Initiatives/fix-onyx-4-r5-holons-stub-context.md";
#[must_use]
pub const fn infrastructure_holons_enabled() -> bool {
cfg!(feature = "unaudited-infrastructure-holons")
}
#[derive(Debug, Clone)]
#[cfg_attr(not(feature = "unaudited-infrastructure-holons"), allow(dead_code))]
pub enum HolonEvent {
TopologyAnalysis {
peer_count: usize,
diversity_score_bp: u32,
recommendation: String,
},
ScalingRecommendation {
validator_count: usize,
node_count: usize,
recommendation: String,
},
HealthCheck {
consensus_round: u64,
committed_height: u64,
status: HealthStatus,
},
HolonTerminated { holon_id: Did, reason: String },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
Healthy,
Degraded { reason: String },
Critical { reason: String },
}
#[derive(Clone)]
pub struct HolonManagerConfig {
pub node_did: Did,
pub root_did: Did,
pub root_public_key: PublicKey,
pub root_signer: Arc<dyn Fn(&[u8]) -> Signature + Send + Sync>,
pub provenance_timestamp_source: Arc<dyn Fn() -> Result<Timestamp, String> + Send + Sync>,
pub topology_interval_secs: u64,
pub scaling_interval_secs: u64,
pub health_interval_secs: u64,
}
impl std::fmt::Debug for HolonManagerConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HolonManagerConfig")
.field("node_did", &self.node_did)
.field("root_did", &self.root_did)
.field("root_public_key", &self.root_public_key)
.field("provenance_timestamp_source", &"<deterministic-hlc-source>")
.field("topology_interval_secs", &self.topology_interval_secs)
.field("scaling_interval_secs", &self.scaling_interval_secs)
.field("health_interval_secs", &self.health_interval_secs)
.finish_non_exhaustive()
}
}
#[must_use]
pub fn hlc_provenance_timestamp_source() -> Arc<dyn Fn() -> Result<Timestamp, String> + Send + Sync>
{
let clock = Arc::new(std::sync::Mutex::new(exo_core::hlc::HybridClock::new()));
Arc::new(move || {
let mut clock = clock
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
clock
.now()
.map_err(|err| format!("Holon provenance HLC exhausted: {err}"))
})
}
fn static_did(value: &'static str) -> Did {
match Did::new(value) {
Ok(did) => did,
Err(error) => unreachable!("hardcoded DID {value} must be valid: {error}"),
}
}
fn did_with_static_fallback(candidate: String, fallback: &'static str) -> Did {
match Did::new(&candidate) {
Ok(did) => did,
Err(error) => {
tracing::warn!(
candidate,
fallback,
err = %error,
"Generated Holon DID was invalid; using static fallback"
);
static_did(fallback)
}
}
}
fn ratio_basis_points(numerator: usize, denominator: usize) -> u32 {
if denominator == 0 {
return 0;
}
let numerator = u128::try_from(numerator).unwrap_or(u128::MAX);
let denominator = u128::try_from(denominator).unwrap_or(u128::MAX);
let ratio = numerator.saturating_mul(10_000) / denominator;
u32::try_from(ratio).unwrap_or(u32::MAX)
}
fn usize_to_u64_saturating(value: usize) -> u64 {
u64::try_from(value).unwrap_or(u64::MAX)
}
pub fn create_topology_holon(node_did: &Did) -> Holon {
let holon_did = did_with_static_fallback(
format!("did:exo:archon-topology-{node_did}"),
"did:exo:archon-topology",
);
holon::spawn(
holon_did,
PermissionSet::new(vec![
Permission::new("network.peers.read"),
Permission::new("network.topology.recommend"),
]),
Combinator::Sequence(vec![
Combinator::Guard(
Box::new(Combinator::Identity),
Predicate {
name: "has_peers".into(),
required_key: "peer_count".into(),
expected_value: None, },
),
Combinator::Transform(
Box::new(Combinator::Identity),
TransformFn {
name: "topology_analysis".into(),
output_key: "topology_recommendation".into(),
output_value: "analyzed".into(),
},
),
]),
)
}
pub fn create_scaling_holon(node_did: &Did) -> Holon {
let holon_did = did_with_static_fallback(
format!("did:exo:archon-scaling-{node_did}"),
"did:exo:archon-scaling",
);
holon::spawn(
holon_did,
PermissionSet::new(vec![
Permission::new("consensus.validators.read"),
Permission::new("governance.propose"),
]),
Combinator::Sequence(vec![
Combinator::Guard(
Box::new(Combinator::Identity),
Predicate {
name: "has_validator_info".into(),
required_key: "validator_count".into(),
expected_value: None,
},
),
Combinator::Transform(
Box::new(Combinator::Identity),
TransformFn {
name: "scaling_analysis".into(),
output_key: "scaling_recommendation".into(),
output_value: "analyzed".into(),
},
),
]),
)
}
pub fn create_health_holon(node_did: &Did) -> Holon {
let holon_did = did_with_static_fallback(
format!("did:exo:archon-health-{node_did}"),
"did:exo:archon-health",
);
holon::spawn(
holon_did,
PermissionSet::new(vec![
Permission::new("node.status.read"),
Permission::new("consensus.status.read"),
]),
Combinator::Transform(
Box::new(Combinator::Identity),
TransformFn {
name: "health_check".into(),
output_key: "health_status".into(),
output_value: "checked".into(),
},
),
)
}
pub fn create_infrastructure_kernel() -> Kernel {
let invariants = InvariantSet::all();
Kernel::new(b"exochain-constitutional-governance-v1", invariants)
}
#[derive(Serialize)]
struct InfrastructureHolonStepActionPayload<'a> {
domain: &'static str,
holon_id: &'a Did,
holon_state: HolonState,
capabilities: &'a PermissionSet,
}
fn signed_authority_link(
holon: &Holon,
config: &HolonManagerConfig,
) -> Result<AuthorityLink, String> {
let mut link = AuthorityLink {
grantor: config.root_did.clone(),
grantee: holon.id.clone(),
permissions: holon.capabilities.clone(),
signature: Vec::new(),
grantor_public_key: Some(config.root_public_key.as_bytes().to_vec()),
};
let message = authority_link_signature_message(&link)
.map_err(|err| format!("failed to encode authority-link signature payload: {err}"))?;
let signature = (config.root_signer)(message.as_bytes());
link.signature = signature.to_bytes().to_vec();
Ok(link)
}
fn signed_provenance(
holon: &Holon,
config: &HolonManagerConfig,
provenance_timestamp: Timestamp,
) -> Result<Provenance, String> {
let timestamp = provenance_timestamp.to_string();
let action_hash = hash_structured(&InfrastructureHolonStepActionPayload {
domain: "exo.node.infrastructure_holon.step_action.v1",
holon_id: &holon.id,
holon_state: holon.state,
capabilities: &holon.capabilities,
})
.map_err(|err| format!("failed to encode Holon provenance action hash payload: {err}"))?
.as_bytes()
.to_vec();
let mut provenance = Provenance {
actor: holon.id.clone(),
timestamp,
action_hash,
signature: Vec::new(),
public_key: Some(config.root_public_key.as_bytes().to_vec()),
voice_kind: None,
independence: None,
review_order: None,
};
let message = provenance_signature_message(&provenance)
.map_err(|err| format!("failed to encode provenance signature payload: {err}"))?;
let signature = (config.root_signer)(message.as_bytes());
provenance.signature = signature.to_bytes().to_vec();
Ok(provenance)
}
fn next_provenance_timestamp(config: &HolonManagerConfig) -> Result<Timestamp, String> {
let timestamp = (config.provenance_timestamp_source)()?;
if timestamp == Timestamp::ZERO {
return Err("Holon provenance timestamp source returned zero HLC timestamp".into());
}
Ok(timestamp)
}
fn capability_scope(capabilities: &PermissionSet) -> String {
let mut permissions: Vec<&str> = capabilities
.permissions
.iter()
.map(|permission| permission.0.as_str())
.collect();
permissions.sort_unstable();
permissions.dedup();
permissions.join(";")
}
pub fn build_holon_adjudication_context(
holon: &Holon,
config: &HolonManagerConfig,
) -> Result<AdjudicationContext, String> {
let provenance_timestamp = next_provenance_timestamp(config)?;
let authority_chain = AuthorityChain {
links: vec![signed_authority_link(holon, config)?],
};
let mut trusted_authority_keys = TrustedAuthorityKeys::default();
for link in &authority_chain.links {
if let Some(public_key) = &link.grantor_public_key {
trusted_authority_keys.insert(link.grantor.clone(), vec![public_key.clone()]);
}
}
let mut trusted_provenance_keys = TrustedProvenanceKeys::default();
trusted_provenance_keys.insert(
holon.id.clone(),
vec![config.root_public_key.as_bytes().to_vec()],
);
let consent_scope = capability_scope(&holon.capabilities);
Ok(AdjudicationContext {
actor_roles: vec![Role {
name: "worker".into(),
branch: GovernmentBranch::Executive,
}],
authority_chain,
consent_records: vec![ConsentRecord {
subject: config.root_did.clone(),
granted_to: holon.id.clone(),
scope: consent_scope.clone(),
active: true,
}],
bailment_state: BailmentState::Active {
bailor: config.root_did.clone(),
bailee: holon.id.clone(),
scope: consent_scope,
},
human_override_preserved: true,
actor_permissions: holon.capabilities.clone(),
trusted_authority_keys,
trusted_provenance_keys,
provenance: Some(signed_provenance(holon, config, provenance_timestamp)?),
quorum_evidence: None,
active_challenge_reason: None,
})
}
fn analyze_topology(peer_count: usize, _net_handle: &NetworkHandle) -> (u32, String) {
let diversity_score_bp = if peer_count == 0 {
0
} else if peer_count < 3 {
3000
} else if peer_count < 7 {
6000
} else if peer_count < 15 {
8000
} else {
10_000
};
let recommendation = if diversity_score_bp < 3000 {
"CRITICAL: No peers connected. Node is isolated.".into()
} else if diversity_score_bp < 5000 {
format!("WARNING: Only {peer_count} peers. Recommend connecting to more diverse nodes.")
} else if diversity_score_bp < 8000 {
format!(
"FAIR: {peer_count} peers, diversity score {diversity_score_bp} bp. Consider adding peers from different ASNs."
)
} else {
format!(
"GOOD: {peer_count} peers, diversity score {diversity_score_bp} bp. Topology is healthy."
)
};
(diversity_score_bp, recommendation)
}
fn analyze_scaling(validator_count: usize, node_count: usize) -> String {
if node_count == 0 {
return "No nodes in network.".into();
}
let ratio_bp = ratio_basis_points(validator_count, node_count);
let ratio_percent = ratio_bp / 100;
if validator_count < 3 {
format!(
"CRITICAL: Only {validator_count} validators. BFT requires at least 3f+1 = 4. \
Recommend promoting {} more nodes.",
4usize.saturating_sub(validator_count)
)
} else if ratio_bp < 2000 {
format!(
"LOW: {validator_count}/{node_count} nodes are validators ({ratio_percent}%). \
Consider promoting more nodes for resilience.",
)
} else if ratio_bp >= 8000 {
format!(
"HIGH: {validator_count}/{node_count} nodes are validators ({ratio_percent}%). \
Consider whether all need validator status.",
)
} else {
format!(
"GOOD: {validator_count}/{node_count} nodes are validators ({ratio_percent}%). \
Ratio is healthy.",
)
}
}
fn analyze_health(consensus_round: u64, committed_height: u64) -> HealthStatus {
if committed_height == 0 && consensus_round > 10 {
HealthStatus::Critical {
reason: format!(
"No committed nodes after {consensus_round} rounds — consensus may be stalled"
),
}
} else if consensus_round > 0 && committed_height == 0 {
HealthStatus::Degraded {
reason: "Consensus is running but no nodes committed yet".into(),
}
} else {
HealthStatus::Healthy
}
}
async fn execute_governance_action(
state: &SharedReactorState,
store: &std::sync::Arc<std::sync::Mutex<SqliteDagStore>>,
net_handle: &NetworkHandle,
action_type: GovernanceEventType,
payload: &[u8],
) -> anyhow::Result<()> {
reactor::submit_proposal(state, store, net_handle, payload).await?;
reactor::broadcast_governance_event(state, net_handle, action_type, payload.to_vec()).await
}
fn encode_validator_change(change: &ValidatorChange) -> Result<Vec<u8>, String> {
let mut buf = Vec::new();
ciborium::into_writer(change, &mut buf)
.map_err(|err| format!("failed to encode validator-set change payload: {err}"))?;
Ok(buf)
}
async fn read_holon_peer_count(
net_handle: &NetworkHandle,
holon_name: &'static str,
) -> Result<usize, String> {
net_handle
.peer_count()
.await
.map_err(|err| format!("{holon_name} Holon peer-count read failed: {err}"))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HolonScalingSnapshot {
validator_count: usize,
is_validator: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HolonHealthSnapshot {
consensus_round: u64,
committed_height: u64,
}
async fn read_holon_scaling_snapshot(
reactor_state: SharedReactorState,
) -> Result<HolonScalingSnapshot, String> {
tokio::task::spawn_blocking(move || {
let state = reactor_state
.lock()
.map_err(|_| "Reactor state mutex poisoned in scaling holon".to_owned())?;
Ok(HolonScalingSnapshot {
validator_count: state.consensus.config.validators.len(),
is_validator: state.is_validator,
})
})
.await
.map_err(|error| format!("Scaling Holon reactor snapshot task failed: {error}"))?
}
async fn read_holon_health_snapshot(
reactor_state: SharedReactorState,
) -> Result<HolonHealthSnapshot, String> {
tokio::task::spawn_blocking(move || {
let state = reactor_state
.lock()
.map_err(|_| "Reactor state mutex poisoned in health holon".to_owned())?;
Ok(HolonHealthSnapshot {
consensus_round: state.consensus.current_round,
committed_height: usize_to_u64_saturating(state.consensus.committed.len()),
})
})
.await
.map_err(|error| format!("Health Holon reactor snapshot task failed: {error}"))?
}
pub async fn run_holon_manager(
config: HolonManagerConfig,
reactor_state: SharedReactorState,
shared_store: std::sync::Arc<std::sync::Mutex<SqliteDagStore>>,
net_handle: NetworkHandle,
event_tx: mpsc::Sender<HolonEvent>,
) {
let kernel = create_infrastructure_kernel();
let mut topology_holon = create_topology_holon(&config.node_did);
let mut scaling_holon = create_scaling_holon(&config.node_did);
let mut health_holon = create_health_holon(&config.node_did);
let mut topology_timer =
tokio::time::interval(Duration::from_secs(config.topology_interval_secs));
let mut scaling_timer =
tokio::time::interval(Duration::from_secs(config.scaling_interval_secs));
let mut health_timer = tokio::time::interval(Duration::from_secs(config.health_interval_secs));
topology_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
scaling_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
health_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = topology_timer.tick() => {
if topology_holon.state == HolonState::Terminated {
continue;
}
let peer_count = match read_holon_peer_count(&net_handle, "Topology").await {
Ok(peer_count) => peer_count,
Err(e) => {
tracing::error!(err = %e, "Topology Holon peer-count read failed");
continue;
}
};
let (diversity_score_bp, recommendation) = analyze_topology(peer_count, &net_handle);
let input = CombinatorInput::new()
.with("peer_count", peer_count.to_string())
.with("diversity_score_bp", diversity_score_bp.to_string());
let ctx = match build_holon_adjudication_context(&topology_holon, &config) {
Ok(ctx) => ctx,
Err(e) => {
tracing::warn!(
err = %e,
holon = %topology_holon.id,
"Topology Holon context construction failed"
);
continue;
}
};
match holon::step(&mut topology_holon, &input, &kernel, &ctx) {
Ok(_output) => {
if event_tx.send(HolonEvent::TopologyAnalysis {
peer_count,
diversity_score_bp,
recommendation,
}).await.is_err() {
tracing::warn!("Holon event channel closed — TopologyAnalysis dropped");
}
tracing::debug!(
peer_count,
diversity_score_bp,
"Topology Holon: analysis complete"
);
}
Err(e) => {
tracing::warn!(
err = %e,
holon = %topology_holon.id,
"Topology Holon step failed"
);
if topology_holon.state == HolonState::Terminated
&& event_tx
.send(HolonEvent::HolonTerminated {
holon_id: topology_holon.id.clone(),
reason: e.to_string(),
})
.await
.is_err()
{
tracing::warn!("Holon event channel closed — HolonTerminated dropped");
}
}
}
}
_ = scaling_timer.tick() => {
if scaling_holon.state == HolonState::Terminated {
continue;
}
let scaling_snapshot =
match read_holon_scaling_snapshot(Arc::clone(&reactor_state)).await {
Ok(snapshot) => snapshot,
Err(e) => {
tracing::error!(err = %e, "Scaling Holon reactor snapshot failed");
continue;
}
};
let validator_count = scaling_snapshot.validator_count;
let peer_count = match read_holon_peer_count(&net_handle, "Scaling").await {
Ok(peer_count) => peer_count,
Err(e) => {
tracing::error!(err = %e, "Scaling Holon peer-count read failed");
continue;
}
};
let node_count = match peer_count.checked_add(1) {
Some(node_count) => node_count,
None => {
tracing::error!(
peer_count,
"Scaling Holon node-count computation overflowed"
);
continue;
}
};
let recommendation = analyze_scaling(validator_count, node_count);
let input = CombinatorInput::new()
.with("validator_count", validator_count.to_string())
.with("node_count", node_count.to_string());
let ctx = match build_holon_adjudication_context(&scaling_holon, &config) {
Ok(ctx) => ctx,
Err(e) => {
tracing::warn!(
err = %e,
holon = %scaling_holon.id,
"Scaling Holon context construction failed"
);
continue;
}
};
match holon::step(&mut scaling_holon, &input, &kernel, &ctx) {
Ok(_output) => {
if event_tx.send(HolonEvent::ScalingRecommendation {
validator_count,
node_count,
recommendation: recommendation.clone(),
}).await.is_err() {
tracing::warn!("Holon event channel closed — ScalingRecommendation dropped");
}
if validator_count < 3
&& node_count > validator_count
&& scaling_snapshot.is_validator
{
let candidate = Did::new(&format!(
"did:exo:auto-promoted-{node_count}"
))
.unwrap_or_else(|_| static_did("did:exo:candidate"));
let change = ValidatorChange::AddValidator {
did: candidate.clone(),
};
match encode_validator_change(&change) {
Ok(buf) => {
if let Err(e) = execute_governance_action(
&reactor_state,
&shared_store,
&net_handle,
GovernanceEventType::ValidatorSetChange,
&buf,
)
.await
{
tracing::warn!(
err = %e,
candidate = %candidate,
"Scaling Holon: auto-promotion failed"
);
} else {
tracing::info!(
candidate = %candidate,
"Scaling Holon: auto-promoted candidate"
);
}
}
Err(e) => {
tracing::warn!(
err = %e,
candidate = %candidate,
"Scaling Holon: validator-set change encoding failed"
);
}
}
}
tracing::debug!(
validator_count,
node_count,
"Scaling Holon: analysis complete"
);
}
Err(e) => {
tracing::warn!(
err = %e,
holon = %scaling_holon.id,
"Scaling Holon step failed"
);
if scaling_holon.state == HolonState::Terminated
&& event_tx
.send(HolonEvent::HolonTerminated {
holon_id: scaling_holon.id.clone(),
reason: e.to_string(),
})
.await
.is_err()
{
tracing::warn!("Holon event channel closed — HolonTerminated dropped");
}
}
}
}
_ = health_timer.tick() => {
if health_holon.state == HolonState::Terminated {
continue;
}
let health_snapshot =
match read_holon_health_snapshot(Arc::clone(&reactor_state)).await {
Ok(snapshot) => snapshot,
Err(e) => {
tracing::error!(err = %e, "Health Holon reactor snapshot failed");
continue;
}
};
let consensus_round = health_snapshot.consensus_round;
let committed_height = health_snapshot.committed_height;
let status = analyze_health(consensus_round, committed_height);
let input = CombinatorInput::new()
.with("consensus_round", consensus_round.to_string())
.with("committed_height", committed_height.to_string());
let ctx = match build_holon_adjudication_context(&health_holon, &config) {
Ok(ctx) => ctx,
Err(e) => {
tracing::warn!(
err = %e,
holon = %health_holon.id,
"Health Holon context construction failed"
);
continue;
}
};
match holon::step(&mut health_holon, &input, &kernel, &ctx) {
Ok(_output) => {
if event_tx.send(HolonEvent::HealthCheck {
consensus_round,
committed_height,
status,
}).await.is_err() {
tracing::warn!("Holon event channel closed — HealthCheck dropped");
}
tracing::debug!(
consensus_round,
committed_height,
"Health Holon: check complete"
);
}
Err(e) => {
tracing::warn!(
err = %e,
holon = %health_holon.id,
"Health Holon step failed"
);
if health_holon.state == HolonState::Terminated
&& event_tx
.send(HolonEvent::HolonTerminated {
holon_id: health_holon.id.clone(),
reason: e.to_string(),
})
.await
.is_err()
{
tracing::warn!("Holon event channel closed — HolonTerminated dropped");
}
}
}
}
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use std::sync::atomic::{AtomicU64, Ordering};
use super::*;
fn test_did() -> Did {
Did::new("did:exo:test-node").unwrap()
}
fn deterministic_provenance_timestamp_source(
start_physical_ms: u64,
) -> Arc<dyn Fn() -> Result<Timestamp, String> + Send + Sync> {
let next_physical_ms = Arc::new(AtomicU64::new(start_physical_ms.max(1)));
Arc::new(move || {
let physical_ms = next_physical_ms.fetch_add(1, Ordering::Relaxed);
Ok(Timestamp::new(physical_ms, 0))
})
}
fn test_config_with_intervals(
topology_interval_secs: u64,
scaling_interval_secs: u64,
health_interval_secs: u64,
) -> HolonManagerConfig {
let keypair = exo_core::crypto::KeyPair::from_secret_bytes([0x48; 32]).unwrap();
let root_public_key = *keypair.public_key();
let root_secret_key = keypair.secret_key().clone();
HolonManagerConfig {
node_did: test_did(),
root_did: Did::new("did:exo:test-root").unwrap(),
root_public_key,
root_signer: Arc::new(move |message: &[u8]| {
exo_core::crypto::sign(message, &root_secret_key)
}),
provenance_timestamp_source: deterministic_provenance_timestamp_source(1),
topology_interval_secs,
scaling_interval_secs,
health_interval_secs,
}
}
fn test_config() -> HolonManagerConfig {
test_config_with_intervals(60, 300, 30)
}
#[test]
fn module_doc_retains_infrastructure_holon_audit_status() {
let src = include_str!("holons.rs");
assert!(
src.contains("# Audit status"),
"module doc must retain the R5 audit-status section"
);
assert!(
src.contains(INFRASTRUCTURE_HOLONS_FEATURE),
"module doc must name the default-off feature flag"
);
assert!(
src.contains(INFRASTRUCTURE_HOLONS_INITIATIVE),
"module doc must point at the R5 initiative"
);
assert!(
src.contains("Ed25519 authority key"),
"module doc must call out the signed adjudication authority"
);
}
#[test]
fn production_holon_config_does_not_compile_default_authority_secret() {
let source = include_str!("holons.rs");
let production = source
.split("#[cfg(test)]")
.next()
.expect("test module marker present");
assert!(
!production.contains("impl Default for HolonManagerConfig"),
"production Holon config must not provide a default authority identity"
);
assert!(
!production.contains("default_holon_keypair"),
"production Holon config must not carry a default authority keypair helper"
);
assert!(
!production.contains("from_secret_bytes([0x48; 32])"),
"production Holon config must not compile a hardcoded authority secret"
);
}
#[test]
fn holon_provenance_source_does_not_hardcode_timestamp() {
let src = include_str!("holons.rs");
let forbidden_timestamp = ["2026-04-27", "T00:00:00Z"].concat();
assert!(
!src.contains(&forbidden_timestamp),
"Holon provenance must use caller-supplied deterministic HLC metadata, not a hardcoded timestamp"
);
}
#[test]
fn holon_provenance_action_hash_uses_structured_payload() {
let source = include_str!("holons.rs");
let production = source
.split("#[cfg(test)]")
.next()
.expect("test module marker present");
assert!(
!production.contains("Hash256::digest(format!"),
"Holon provenance action hashes must not use raw formatted string hashing"
);
assert!(
production.contains("hash_structured"),
"Holon provenance action hashes must use canonical structured hashing"
);
}
#[test]
fn scaling_auto_promotion_serialization_errors_are_not_silently_dropped() {
let source = include_str!("holons.rs");
let production = source
.split("#[cfg(test)]")
.next()
.expect("production section");
let auto_promotion = production
.split("let change = ValidatorChange::AddValidator")
.nth(1)
.expect("scaling auto-promotion block")
.split("tracing::debug!(")
.next()
.expect("auto-promotion serialization block");
assert!(
!auto_promotion.contains("ciborium::into_writer(&change, &mut buf).is_ok()"),
"validator-change CBOR serialization must not be silently dropped on error"
);
assert!(
auto_promotion.contains("encode_validator_change(&change)"),
"validator-change CBOR serialization must use the fail-closed helper"
);
}
#[test]
fn production_holon_manager_does_not_default_failed_peer_count_reads_to_zero() {
let source = include_str!("holons.rs");
let production = source
.split("#[cfg(test)]")
.next()
.expect("production section");
assert!(
!production.contains("peer_count().await.unwrap_or(0)"),
"network peer-count read failures must not be fabricated as zero peers"
);
assert!(
production.contains("read_holon_peer_count"),
"Holon manager must use a fail-closed peer-count read helper"
);
}
#[tokio::test]
async fn read_holon_peer_count_propagates_network_errors() {
let (cmd_tx, cmd_rx) = mpsc::channel(1);
drop(cmd_rx);
let net_handle = NetworkHandle::new(cmd_tx);
let err = read_holon_peer_count(&net_handle, "Topology")
.await
.expect_err("closed network command channel must be an error");
assert!(err.contains("Topology Holon peer-count read failed"));
assert!(err.contains("Network task has stopped"));
}
#[test]
fn production_holon_source_does_not_suppress_security_relevant_clippy_lints() {
let source = include_str!("holons.rs");
let production = source
.split("#[cfg(test)]")
.next()
.expect("test module marker present");
for lint in [
"clippy::expect_used",
"clippy::unwrap_used",
"clippy::as_conversions",
"clippy::single_match",
] {
assert!(
!production.contains(lint),
"production Holon source must not suppress {lint}"
);
}
}
#[test]
fn holon_manager_async_path_does_not_lock_reactor_state_directly() {
let source = include_str!("holons.rs");
let manager_source = source
.split("pub async fn run_holon_manager")
.nth(1)
.expect("Holon manager source must be present")
.split("// ---------------------------------------------------------------------------\n// Tests")
.next()
.expect("Holon manager source ends before tests");
assert!(
!manager_source.contains("reactor_state.lock()"),
"Holon async manager must isolate reactor std::sync::Mutex access from Tokio workers"
);
}
#[test]
fn holon_provenance_uses_configured_hlc_timestamp_source() {
let mut config = test_config();
config.provenance_timestamp_source = Arc::new(|| Ok(Timestamp::new(42_424, 7)));
let h = create_health_holon(&test_did());
let ctx = build_holon_adjudication_context(&h, &config).expect("holon context");
let provenance = ctx.provenance.expect("provenance");
assert_eq!(provenance.timestamp, "42424:7");
}
#[test]
fn holon_provenance_rejects_zero_hlc_timestamp_source() {
let mut config = test_config();
config.provenance_timestamp_source = Arc::new(|| Ok(Timestamp::ZERO));
let h = create_health_holon(&test_did());
let result = build_holon_adjudication_context(&h, &config);
assert!(
result.is_err(),
"Holon context construction must fail closed on zero provenance HLC metadata"
);
}
#[cfg(not(feature = "unaudited-infrastructure-holons"))]
#[test]
fn infrastructure_holons_disabled_without_feature_flag() {
assert!(
!infrastructure_holons_enabled(),
"infrastructure Holons must be disabled by default while R5 is open"
);
}
#[cfg(feature = "unaudited-infrastructure-holons")]
#[test]
fn infrastructure_holons_feature_enables_runtime() {
assert!(
infrastructure_holons_enabled(),
"feature flag must explicitly opt into the unaudited Holon runtime"
);
}
#[test]
fn topology_holon_spawns_idle() {
let h = create_topology_holon(&test_did());
assert_eq!(h.state, HolonState::Idle);
assert!(h.id.to_string().contains("archon-topology"));
}
#[test]
fn scaling_holon_spawns_idle() {
let h = create_scaling_holon(&test_did());
assert_eq!(h.state, HolonState::Idle);
assert!(h.id.to_string().contains("archon-scaling"));
}
#[test]
fn health_holon_spawns_idle() {
let h = create_health_holon(&test_did());
assert_eq!(h.state, HolonState::Idle);
assert!(h.id.to_string().contains("archon-health"));
}
#[test]
fn topology_holon_step_succeeds_with_peers() {
let kernel = create_infrastructure_kernel();
let config = test_config();
let mut h = create_topology_holon(&test_did());
let ctx = build_holon_adjudication_context(&h, &config).expect("holon context");
let input = CombinatorInput::new().with("peer_count", "5");
let output = holon::step(&mut h, &input, &kernel, &ctx).unwrap();
assert_eq!(h.state, HolonState::Idle);
assert!(output.fields.contains_key("topology_recommendation"));
assert_eq!(
output.fields.get("topology_recommendation"),
Some(&"analyzed".to_string())
);
}
#[test]
fn topology_holon_guard_rejects_no_peer_count() {
let kernel = create_infrastructure_kernel();
let config = test_config();
let mut h = create_topology_holon(&test_did());
let ctx = build_holon_adjudication_context(&h, &config).expect("holon context");
let input = CombinatorInput::new();
let result = holon::step(&mut h, &input, &kernel, &ctx);
assert!(result.is_err(), "Guard should reject missing peer_count");
}
#[test]
fn scaling_holon_step_succeeds() {
let kernel = create_infrastructure_kernel();
let config = test_config();
let mut h = create_scaling_holon(&test_did());
let ctx = build_holon_adjudication_context(&h, &config).expect("holon context");
let input = CombinatorInput::new()
.with("validator_count", "4")
.with("node_count", "12");
let output = holon::step(&mut h, &input, &kernel, &ctx).unwrap();
assert_eq!(h.state, HolonState::Idle);
assert!(output.fields.contains_key("scaling_recommendation"));
}
#[test]
fn health_holon_step_succeeds() {
let kernel = create_infrastructure_kernel();
let config = test_config();
let mut h = create_health_holon(&test_did());
let ctx = build_holon_adjudication_context(&h, &config).expect("holon context");
let input = CombinatorInput::new()
.with("consensus_round", "10")
.with("committed_height", "5");
let output = holon::step(&mut h, &input, &kernel, &ctx).unwrap();
assert_eq!(h.state, HolonState::Idle);
assert!(output.fields.contains_key("health_status"));
}
#[test]
fn topology_analysis_scoring() {
let (score_bp, rec) = analyze_topology(0, &{
let (tx, _rx) = mpsc::channel(1);
NetworkHandle::new(tx)
});
let _: u32 = score_bp;
assert_eq!(score_bp, 0);
assert!(rec.contains("CRITICAL"));
let handle = {
let (tx, _rx) = mpsc::channel(1);
NetworkHandle::new(tx)
};
let (score_bp, rec) = analyze_topology(2, &handle);
assert_eq!(score_bp, 3000);
assert!(rec.contains("WARNING"));
let (score_bp, _) = analyze_topology(10, &handle);
assert_eq!(score_bp, 8000);
let (score_bp, rec) = analyze_topology(20, &handle);
assert_eq!(score_bp, 10_000);
assert!(rec.contains("GOOD"));
}
#[test]
fn scaling_analysis_recommendations() {
let rec = analyze_scaling(2, 10);
assert!(rec.contains("CRITICAL"));
let rec = analyze_scaling(4, 50);
assert!(rec.contains("LOW"));
let rec = analyze_scaling(8, 10);
assert!(rec.contains("HIGH"));
let rec = analyze_scaling(5, 12);
assert!(rec.contains("GOOD"));
}
#[test]
fn health_analysis_statuses() {
assert_eq!(analyze_health(5, 3), HealthStatus::Healthy);
match analyze_health(5, 0) {
HealthStatus::Degraded { .. } => {}
other => panic!("Expected Degraded, got {other:?}"),
}
match analyze_health(20, 0) {
HealthStatus::Critical { .. } => {}
other => panic!("Expected Critical, got {other:?}"),
}
}
#[test]
fn holon_no_self_grant_enforced() {
let kernel = create_infrastructure_kernel();
let config = test_config();
let h = create_health_holon(&test_did());
let ctx = build_holon_adjudication_context(&h, &config).expect("holon context");
let mut h2 = create_health_holon(&test_did());
let input = CombinatorInput::new()
.with("consensus_round", "1")
.with("committed_height", "1");
assert!(holon::step(&mut h2, &input, &kernel, &ctx).is_ok());
}
#[test]
fn kernel_immutability_enforced() {
let _kernel = create_infrastructure_kernel();
let config = test_config();
let h = create_health_holon(&test_did());
let ctx = build_holon_adjudication_context(&h, &config).expect("holon context");
assert!(ctx.human_override_preserved);
assert!(!ctx.authority_chain.links.is_empty());
assert!(ctx.consent_records.iter().all(|c| c.active));
}
#[tokio::test]
async fn holon_manager_emits_health_event() {
use std::{collections::BTreeSet, sync::Arc};
use exo_core::types::Signature;
let validators: BTreeSet<Did> = (0..4)
.map(|i| Did::new(&format!("did:exo:v{i}")).unwrap())
.collect();
let reactor_config = crate::reactor::ReactorConfig {
node_did: test_did(),
is_validator: true,
validators,
validator_public_keys: std::collections::BTreeMap::new(),
round_timeout_ms: 5000,
};
let sign_fn: Arc<dyn Fn(&[u8]) -> Signature + Send + Sync> =
Arc::new(|_| Signature::from_bytes([0u8; 64]));
let reactor_state = crate::reactor::create_reactor_state(&reactor_config, sign_fn, None);
let (cmd_tx, mut cmd_rx) = mpsc::channel(256);
let net_handle = NetworkHandle::new(cmd_tx);
let (event_tx, mut event_rx) = mpsc::channel(32);
let config = test_config_with_intervals(3600, 3600, 1);
tokio::spawn(async move {
while let Some(cmd) = cmd_rx.recv().await {
match cmd {
crate::network::NetworkCommand::Publish { reply, .. } => {
let _ = reply.send(Ok(()));
}
crate::network::NetworkCommand::PeerCount { reply } => {
let _ = reply.send(0);
}
_ => {}
}
}
});
let dir = tempfile::tempdir().unwrap();
let shared_store = std::sync::Arc::new(std::sync::Mutex::new(
crate::store::SqliteDagStore::open(dir.path()).unwrap(),
));
let manager = tokio::spawn(run_holon_manager(
config,
reactor_state,
shared_store,
net_handle,
event_tx,
));
let event = tokio::time::timeout(Duration::from_secs(5), event_rx.recv())
.await
.expect("Should receive an event within 5s")
.expect("Channel should not be closed");
match event {
HolonEvent::HealthCheck { .. }
| HolonEvent::TopologyAnalysis { .. }
| HolonEvent::ScalingRecommendation { .. } => {}
other => panic!("Expected a Holon analysis event, got {other:?}"),
}
manager.abort();
}
}