use alloy::{
primitives::{keccak256, Bytes, FixedBytes},
sol_types::SolValue,
};
use eigensdk::types::operator::OperatorId;
use newton_prover_core::{
common::compute_consensus_digest,
newton_prover_task_manager::{INewtonProverTaskManager::TaskResponse as BindingTaskResponse, NewtonMessage},
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use tracing::{debug, info, warn};
use crate::rpc_server::{
ConsensusData, ConsensusPrepareResponse, FieldAdjustment as RpcFieldAdjustment, SignedTaskResponse, TaskResponse,
};
pub const DEFAULT_TOLERANCE_PCT: f64 = 10.0;
#[derive(Debug)]
pub enum ConsensusResult {
AlreadyConsensus {
digest: FixedBytes<32>,
response_count: usize,
},
Normalized {
digest: FixedBytes<32>,
normalized_responses: Vec<(OperatorId, SignedTaskResponse)>,
field_adjustments: Vec<FieldAdjustment>,
},
CannotReachConsensus {
reason: String,
divergent_fields: Vec<DivergentField>,
},
}
#[derive(Debug, Clone)]
pub struct FieldAdjustment {
pub field_path: String,
pub original_values: Vec<f64>,
pub median_value: f64,
pub max_deviation_pct: f64,
}
#[derive(Debug, Clone)]
pub struct DivergentField {
pub field_path: String,
pub values: Vec<f64>,
pub median: f64,
pub max_deviation_pct: f64,
pub tolerance_pct: f64,
}
#[derive(Debug, Clone)]
struct NumericField {
path: String,
value: f64,
}
pub fn build_consensus(responses: &[(OperatorId, SignedTaskResponse)], tolerance_pct: f64) -> ConsensusResult {
if responses.is_empty() {
return ConsensusResult::CannotReachConsensus {
reason: "No responses provided".to_string(),
divergent_fields: vec![],
};
}
let digests: Vec<FixedBytes<32>> = responses
.iter()
.map(|(_, r)| {
let binding: BindingTaskResponse = r.task_response.clone().into();
compute_consensus_digest(&binding)
})
.collect();
let first_digest = digests[0];
if digests.iter().all(|d| *d == first_digest) {
info!(
"All {} responses already have consensus digest {}",
responses.len(),
first_digest
);
return ConsensusResult::AlreadyConsensus {
digest: first_digest,
response_count: responses.len(),
};
}
info!(
"Found {} distinct digests among {} responses, attempting median-based normalization",
digests.iter().collect::<std::collections::HashSet<_>>().len(),
responses.len()
);
let mut all_fields: HashMap<String, Vec<(usize, f64)>> = HashMap::new();
for (idx, (operator_id, signed_response)) in responses.iter().enumerate() {
let numeric_fields = extract_numeric_fields(&signed_response.task_response.policy_task_data);
debug!(
"Operator {:?} has {} numeric fields in policyTaskData",
operator_id,
numeric_fields.len()
);
for field in numeric_fields {
all_fields.entry(field.path).or_default().push((idx, field.value));
}
}
if all_fields.is_empty() {
warn!("No numeric fields in policyTaskData; differences are in non-numeric fields");
return ConsensusResult::CannotReachConsensus {
reason: "Differences in non-numeric fields cannot be normalized".to_string(),
divergent_fields: vec![],
};
}
let mut field_adjustments = Vec::new();
let mut divergent_fields = Vec::new();
for (field_path, values) in &all_fields {
let just_values: Vec<f64> = values.iter().map(|(_, v)| *v).collect();
let median = compute_median(&just_values);
let max_deviation_pct = just_values
.iter()
.map(|v| {
if median.abs() < f64::EPSILON {
if v.abs() < f64::EPSILON {
0.0
} else {
f64::INFINITY
}
} else {
((v - median).abs() / median.abs()) * 100.0
}
})
.fold(0.0_f64, f64::max);
if max_deviation_pct > tolerance_pct {
divergent_fields.push(DivergentField {
field_path: field_path.clone(),
values: just_values.clone(),
median,
max_deviation_pct,
tolerance_pct,
});
} else {
field_adjustments.push(FieldAdjustment {
field_path: field_path.clone(),
original_values: just_values,
median_value: median,
max_deviation_pct,
});
}
}
if !divergent_fields.is_empty() {
let reason = format!(
"Numeric fields exceed {}% tolerance: {}",
tolerance_pct,
divergent_fields
.iter()
.map(|f| format!(
"{}(max_dev={:.1}%, values={:?})",
f.field_path, f.max_deviation_pct, f.values
))
.collect::<Vec<_>>()
.join(", ")
);
warn!("{}", reason);
return ConsensusResult::CannotReachConsensus {
reason,
divergent_fields,
};
}
info!(
"All {} numeric fields within {}% tolerance, normalizing to median values",
field_adjustments.len(),
tolerance_pct
);
let median_values: HashMap<String, f64> = field_adjustments
.iter()
.map(|adj| (adj.field_path.clone(), adj.median_value))
.collect();
let normalized_responses: Vec<(OperatorId, SignedTaskResponse)> = responses
.iter()
.map(|(op_id, signed_response)| {
let normalized = normalize_response(&signed_response.task_response, &median_values);
(
*op_id,
SignedTaskResponse::new(
signed_response.task_id,
normalized,
signed_response.signature(),
signed_response.operator_id(),
),
)
})
.collect();
let first_normalized = &normalized_responses[0].1.task_response;
let binding: BindingTaskResponse = first_normalized.clone().into();
let consensus_digest = compute_consensus_digest(&binding);
let all_same = normalized_responses.iter().all(|(_, r)| {
let b: BindingTaskResponse = r.task_response.clone().into();
compute_consensus_digest(&b) == consensus_digest
});
if !all_same {
return ConsensusResult::CannotReachConsensus {
reason: "Normalization failed - responses still have different digests after median adjustment".to_string(),
divergent_fields: vec![],
};
}
info!(
"Successfully normalized {} responses to consensus digest {}",
normalized_responses.len(),
consensus_digest
);
ConsensusResult::Normalized {
digest: consensus_digest,
normalized_responses,
field_adjustments,
}
}
fn extract_numeric_fields(policy_task_data: &NewtonMessage::PolicyTaskData) -> Vec<NumericField> {
let mut fields = Vec::new();
for (data_idx, policy_data) in policy_task_data.policyData.iter().enumerate() {
let data_str = match std::str::from_utf8(&policy_data.data) {
Ok(s) => s,
Err(_) => continue,
};
let json: JsonValue = match serde_json::from_str(data_str) {
Ok(v) => v,
Err(_) => continue,
};
if let JsonValue::Object(map) = json {
for (key, value) in map {
if let Some(num) = parse_numeric_value(&value) {
fields.push(NumericField {
path: format!("policyData[{}].{}", data_idx, key),
value: num,
});
}
}
}
}
fields
}
fn parse_numeric_value(value: &JsonValue) -> Option<f64> {
match value {
JsonValue::Number(n) => n.as_f64(),
JsonValue::String(s) => {
s.trim().parse::<f64>().ok()
}
_ => None,
}
}
fn compute_median(values: &[f64]) -> f64 {
if values.is_empty() {
return 0.0;
}
let mut sorted: Vec<f64> = values.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let len = sorted.len();
if len.is_multiple_of(2) {
(sorted[len / 2 - 1] + sorted[len / 2]) / 2.0
} else {
sorted[len / 2]
}
}
pub fn compute_consensus_from_unsigned(
responses: &[ConsensusPrepareResponse],
tolerance_pct: f64,
) -> Result<ConsensusData, String> {
if responses.is_empty() {
return Err("No responses provided".to_string());
}
let first_hash = responses[0].data_hash;
let all_same = responses.iter().all(|r| r.data_hash == first_hash);
if all_same {
info!(
"[Prepare] All {} responses have identical policyTaskData (hash: {})",
responses.len(),
first_hash
);
return Ok(ConsensusData {
policy_task_data: responses[0].policy_task_data.clone(),
data_hash: first_hash,
field_adjustments: vec![],
});
}
info!(
"[Prepare] Found {} distinct hashes among {} responses, attempting median-based normalization",
responses
.iter()
.map(|r| r.data_hash)
.collect::<std::collections::HashSet<_>>()
.len(),
responses.len()
);
let mut all_fields: HashMap<String, Vec<(usize, f64)>> = HashMap::new();
for (idx, unsigned_response) in responses.iter().enumerate() {
let numeric_fields = extract_numeric_fields(&unsigned_response.policy_task_data);
debug!(
"Operator {:?} has {} numeric fields in policyTaskData",
unsigned_response.operator_id,
numeric_fields.len()
);
for field in numeric_fields {
all_fields.entry(field.path).or_default().push((idx, field.value));
}
}
if all_fields.is_empty() {
warn!("[Prepare] No numeric fields in policyTaskData; differences are in non-numeric fields");
return Err("Differences in non-numeric fields cannot be normalized".to_string());
}
let mut field_adjustments = Vec::new();
let mut divergent_fields = Vec::new();
for (field_path, values) in &all_fields {
let just_values: Vec<f64> = values.iter().map(|(_, v)| *v).collect();
let median = compute_median(&just_values);
let max_deviation_pct = just_values
.iter()
.map(|v| {
if median.abs() < f64::EPSILON {
if v.abs() < f64::EPSILON {
0.0
} else {
f64::INFINITY
}
} else {
((v - median).abs() / median.abs()) * 100.0
}
})
.fold(0.0_f64, f64::max);
if max_deviation_pct > tolerance_pct {
divergent_fields.push(DivergentField {
field_path: field_path.clone(),
values: just_values.clone(),
median,
max_deviation_pct,
tolerance_pct,
});
} else {
field_adjustments.push(FieldAdjustment {
field_path: field_path.clone(),
original_values: just_values,
median_value: median,
max_deviation_pct,
});
}
}
if !divergent_fields.is_empty() {
let reason = format!(
"Numeric fields exceed {}% tolerance: {}",
tolerance_pct,
divergent_fields
.iter()
.map(|f| format!(
"{}(max_dev={:.1}%, values={:?})",
f.field_path, f.max_deviation_pct, f.values
))
.collect::<Vec<_>>()
.join(", ")
);
warn!("[Prepare] {}", reason);
return Err(reason);
}
info!(
"[Prepare] All {} numeric fields within {}% tolerance, normalizing to median values",
field_adjustments.len(),
tolerance_pct
);
let median_values: HashMap<String, f64> = field_adjustments
.iter()
.map(|adj| (adj.field_path.clone(), adj.median_value))
.collect();
let normalized_policy_data = normalize_policy_task_data(&responses[0].policy_task_data, &median_values);
let data_hash = keccak256(normalized_policy_data.abi_encode());
info!(
"[Prepare] Successfully computed consensus policyTaskData (hash: {}) from {} responses",
data_hash,
responses.len()
);
let rpc_adjustments: Vec<RpcFieldAdjustment> = field_adjustments
.into_iter()
.map(|adj| RpcFieldAdjustment {
field_path: adj.field_path,
original_values: adj.original_values,
median_value: adj.median_value,
})
.collect();
Ok(ConsensusData {
policy_task_data: normalized_policy_data,
data_hash,
field_adjustments: rpc_adjustments,
})
}
pub fn check_early_consensus(responses: &[ConsensusPrepareResponse]) -> Option<FixedBytes<32>> {
if responses.is_empty() {
return None;
}
let first_hash = responses[0].data_hash;
if responses.iter().all(|r| r.data_hash == first_hash) {
Some(first_hash)
} else {
None
}
}
fn normalize_policy_task_data(
policy_task_data: &NewtonMessage::PolicyTaskData,
median_values: &HashMap<String, f64>,
) -> NewtonMessage::PolicyTaskData {
let mut normalized = policy_task_data.clone();
for (data_idx, policy_data) in normalized.policyData.iter_mut().enumerate() {
let data_str = match std::str::from_utf8(&policy_data.data) {
Ok(s) => s,
Err(_) => continue,
};
let mut json_value: JsonValue = match serde_json::from_str(data_str) {
Ok(v) => v,
Err(_) => continue,
};
for (field_path, median) in median_values {
let prefix = format!("policyData[{}].data.", data_idx);
if let Some(relative_path) = field_path.strip_prefix(&prefix) {
apply_median_value(&mut json_value, relative_path, *median);
}
}
if let Ok(normalized_str) = serde_json::to_string(&json_value) {
policy_data.data = Bytes::from(normalized_str.into_bytes());
}
}
normalized
}
fn apply_median_value(json: &mut JsonValue, path: &str, median: f64) {
let parts: Vec<&str> = path.split('.').collect();
let mut current = json;
for (i, part) in parts.iter().enumerate() {
if i == parts.len() - 1 {
if let JsonValue::Object(ref mut map) = current {
if let Some(value) = map.get_mut(*part) {
match value {
JsonValue::Number(_) => {
if let Some(n) = serde_json::Number::from_f64(median) {
*value = JsonValue::Number(n);
}
}
JsonValue::String(_) => {
if median.fract().abs() < f64::EPSILON {
*value = JsonValue::String(format!("{:.0}", median));
} else {
*value = JsonValue::String(median.to_string());
}
}
_ => {}
}
}
}
} else {
if let JsonValue::Object(ref mut map) = current {
if let Some(next) = map.get_mut(*part) {
current = next;
} else {
return;
}
} else {
return;
}
}
}
}
fn normalize_response(response: &TaskResponse, median_values: &HashMap<String, f64>) -> TaskResponse {
let mut normalized = response.clone();
for (data_idx, policy_data) in normalized.policy_task_data.policyData.iter_mut().enumerate() {
let data_str = match std::str::from_utf8(&policy_data.data) {
Ok(s) => s,
Err(_) => continue,
};
let mut json: JsonValue = match serde_json::from_str(data_str) {
Ok(v) => v,
Err(_) => continue,
};
if let JsonValue::Object(ref mut map) = json {
for (key, value) in map.iter_mut() {
let path = format!("policyData[{}].{}", data_idx, key);
if let Some(&median) = median_values.get(&path) {
match value {
JsonValue::Number(_) => {
if let Some(n) = serde_json::Number::from_f64(median) {
*value = JsonValue::Number(n);
}
}
JsonValue::String(_) => {
if median.fract().abs() < f64::EPSILON {
*value = JsonValue::String(format!("{:.0}", median));
} else {
*value = JsonValue::String(median.to_string());
}
}
_ => {}
}
}
}
}
if let Ok(normalized_str) = serde_json::to_string(&json) {
policy_data.data = Bytes::from(normalized_str.into_bytes());
}
}
normalized
}
#[cfg(test)]
mod tests {
use super::*;
use alloy::primitives::{Address, B256, U256};
use ark_bn254::G1Affine;
use ark_ec::AffineRepr;
use chrono::Utc;
use eigensdk::crypto_bls::Signature;
use newton_prover_core::newton_prover_task_manager::INewtonPolicy;
fn create_policy_task_data(json_data: &str) -> NewtonMessage::PolicyTaskData {
NewtonMessage::PolicyTaskData {
policyId: B256::ZERO,
policyAddress: Address::ZERO,
policy: Bytes::new(),
policyData: vec![NewtonMessage::PolicyData {
wasmArgs: Bytes::new(),
data: Bytes::from(json_data.as_bytes().to_vec()),
policyDataAddress: Address::ZERO,
expireBlock: 1000,
}],
}
}
fn create_test_intent() -> NewtonMessage::Intent {
NewtonMessage::Intent {
from: Address::ZERO,
to: Address::ZERO,
value: U256::ZERO,
data: Bytes::new(),
chainId: U256::from(1),
functionSignature: Bytes::new(),
}
}
fn create_test_policy_config() -> INewtonPolicy::PolicyConfig {
INewtonPolicy::PolicyConfig {
policyParams: Bytes::new(),
expireAfter: 1000,
}
}
fn create_test_signature() -> Signature {
Signature::new(G1Affine::generator())
}
fn create_unsigned_response(operator_idx: u8, price: f64, confidence: f64) -> ConsensusPrepareResponse {
let json_data = format!(
r#"{{"price": {}, "confidence": {}, "timestamp": 1234567890}}"#,
price, confidence
);
let policy_task_data = create_policy_task_data(&json_data);
let data_hash = keccak256(policy_task_data.abi_encode());
ConsensusPrepareResponse {
task_id: B256::ZERO,
operator_id: OperatorId::from([operator_idx; 32]),
policy_task_data,
data_hash,
timestamp: Utc::now(),
partial_decryption: None,
ephemeral_partial_decryptions: None,
identity_partial_decryptions: None,
confidential_partial_decryptions: None,
identity_data_ref_ids: None,
confidential_data_ref_ids: None,
encrypted_partials: None,
}
}
fn create_unsigned_response_string_numbers(
operator_idx: u8,
price: &str,
confidence: &str,
) -> ConsensusPrepareResponse {
let json_data = format!(
r#"{{"price": "{}", "confidence": "{}", "symbol": "BTC/USD"}}"#,
price, confidence
);
let policy_task_data = create_policy_task_data(&json_data);
let data_hash = keccak256(policy_task_data.abi_encode());
ConsensusPrepareResponse {
task_id: B256::ZERO,
operator_id: OperatorId::from([operator_idx; 32]),
policy_task_data,
data_hash,
timestamp: Utc::now(),
partial_decryption: None,
ephemeral_partial_decryptions: None,
identity_partial_decryptions: None,
confidential_partial_decryptions: None,
identity_data_ref_ids: None,
confidential_data_ref_ids: None,
encrypted_partials: None,
}
}
#[allow(dead_code)]
fn create_unsigned_response_nested(operator_idx: u8, price: f64) -> ConsensusPrepareResponse {
let json_data = format!(
r#"{{"market": {{"price": {}, "volume": 1000}}, "meta": "info"}}"#,
price
);
let policy_task_data = create_policy_task_data(&json_data);
let data_hash = keccak256(policy_task_data.abi_encode());
ConsensusPrepareResponse {
task_id: B256::ZERO,
operator_id: OperatorId::from([operator_idx; 32]),
policy_task_data,
data_hash,
timestamp: Utc::now(),
partial_decryption: None,
ephemeral_partial_decryptions: None,
identity_partial_decryptions: None,
confidential_partial_decryptions: None,
identity_data_ref_ids: None,
confidential_data_ref_ids: None,
encrypted_partials: None,
}
}
fn create_task_response(json_data: &str) -> TaskResponse {
TaskResponse {
task_id: B256::ZERO,
policy_client: Address::ZERO,
policy_id: B256::ZERO,
policy_address: Address::ZERO,
intent: create_test_intent(),
intent_signature: Bytes::new(),
evaluation_result: vec![1],
policy_task_data: create_policy_task_data(json_data),
policy_config: create_test_policy_config(),
initialization_timestamp: U256::ZERO,
}
}
fn create_signed_response(operator_idx: u8, json_data: &str) -> (OperatorId, SignedTaskResponse) {
let task_response = create_task_response(json_data);
let operator_id = OperatorId::from([operator_idx; 32]);
let signature = create_test_signature();
(
operator_id,
SignedTaskResponse::new(B256::ZERO, task_response, signature, operator_id),
)
}
#[test]
fn test_compute_median_odd() {
assert_eq!(compute_median(&[1.0, 2.0, 3.0]), 2.0);
assert_eq!(compute_median(&[5.0, 1.0, 3.0]), 3.0);
}
#[test]
fn test_compute_median_even() {
assert_eq!(compute_median(&[1.0, 2.0, 3.0, 4.0]), 2.5);
assert_eq!(compute_median(&[1.0, 2.0]), 1.5);
}
#[test]
fn test_compute_median_empty() {
assert_eq!(compute_median(&[]), 0.0);
}
#[test]
fn test_compute_median_single() {
assert_eq!(compute_median(&[42.0]), 42.0);
}
#[test]
fn test_parse_numeric_value() {
let n = serde_json::json!(123.45);
assert_eq!(parse_numeric_value(&n), Some(123.45));
let s = serde_json::json!("100.5");
assert_eq!(parse_numeric_value(&s), Some(100.5));
let non_num = serde_json::json!("hello");
assert_eq!(parse_numeric_value(&non_num), None);
let b = serde_json::json!(true);
assert_eq!(parse_numeric_value(&b), None);
}
#[test]
fn test_tolerance_calculation() {
let values = vec![95.0, 100.0, 105.0];
let median = compute_median(&values);
assert_eq!(median, 100.0);
let max_deviation = values
.iter()
.map(|v| ((v - median).abs() / median.abs()) * 100.0)
.fold(0.0_f64, f64::max);
assert_eq!(max_deviation, 5.0); }
#[test]
fn test_tolerance_exceeded() {
let values = vec![85.0, 100.0, 120.0]; let median = compute_median(&values);
assert_eq!(median, 100.0);
let max_deviation = values
.iter()
.map(|v| ((v - median).abs() / median.abs()) * 100.0)
.fold(0.0_f64, f64::max);
assert_eq!(max_deviation, 20.0); }
#[test]
fn test_extract_numeric_fields_json_numbers() {
let policy_data = create_policy_task_data(r#"{"price": 100.5, "volume": 1000}"#);
let fields = extract_numeric_fields(&policy_data);
assert_eq!(fields.len(), 2);
let price_field = fields.iter().find(|f| f.path.contains("price")).unwrap();
let volume_field = fields.iter().find(|f| f.path.contains("volume")).unwrap();
assert_eq!(price_field.value, 100.5);
assert_eq!(volume_field.value, 1000.0);
}
#[test]
fn test_extract_numeric_fields_string_numbers() {
let policy_data = create_policy_task_data(r#"{"price": "8384971100447", "confidence": "6650250235"}"#);
let fields = extract_numeric_fields(&policy_data);
assert_eq!(fields.len(), 2);
let price_field = fields.iter().find(|f| f.path.contains("price")).unwrap();
assert_eq!(price_field.value, 8384971100447.0);
}
#[test]
fn test_extract_numeric_fields_mixed_types() {
let policy_data =
create_policy_task_data(r#"{"price": 100, "symbol": "BTC/USD", "active": true, "nested": {"value": 50}}"#);
let fields = extract_numeric_fields(&policy_data);
assert_eq!(fields.len(), 1);
assert!(fields[0].path.contains("price"));
}
#[test]
fn test_extract_numeric_fields_empty_json() {
let policy_data = create_policy_task_data(r#"{}"#);
let fields = extract_numeric_fields(&policy_data);
assert!(fields.is_empty());
}
#[test]
fn test_extract_numeric_fields_invalid_json() {
let policy_data = NewtonMessage::PolicyTaskData {
policyId: B256::ZERO,
policyAddress: Address::ZERO,
policy: Bytes::new(),
policyData: vec![NewtonMessage::PolicyData {
wasmArgs: Bytes::new(),
data: Bytes::from(b"not valid json".to_vec()),
policyDataAddress: Address::ZERO,
expireBlock: 1000,
}],
};
let fields = extract_numeric_fields(&policy_data);
assert!(fields.is_empty());
}
#[test]
fn test_check_early_consensus_empty() {
let responses: Vec<ConsensusPrepareResponse> = vec![];
assert!(check_early_consensus(&responses).is_none());
}
#[test]
fn test_check_early_consensus_all_identical() {
let responses = [
create_unsigned_response(1, 100.0, 10.0),
create_unsigned_response(2, 100.0, 10.0),
create_unsigned_response(3, 100.0, 10.0),
];
let json_data = r#"{"price": 100, "confidence": 10}"#;
let policy_data = create_policy_task_data(json_data);
let data_hash = keccak256(policy_data.abi_encode());
let identical_responses: Vec<ConsensusPrepareResponse> = (1..=3)
.map(|i| ConsensusPrepareResponse {
task_id: B256::ZERO,
operator_id: OperatorId::from([i; 32]),
policy_task_data: policy_data.clone(),
data_hash,
timestamp: Utc::now(),
partial_decryption: None,
ephemeral_partial_decryptions: None,
identity_partial_decryptions: None,
confidential_partial_decryptions: None,
identity_data_ref_ids: None,
confidential_data_ref_ids: None,
encrypted_partials: None,
})
.collect();
let result = check_early_consensus(&identical_responses);
assert!(result.is_some());
assert_eq!(result.unwrap(), data_hash);
}
#[test]
fn test_check_early_consensus_different_values() {
let responses = vec![
create_unsigned_response(1, 100.0, 10.0),
create_unsigned_response(2, 102.0, 10.0), create_unsigned_response(3, 100.0, 10.0),
];
assert!(check_early_consensus(&responses).is_none());
}
#[test]
fn test_compute_consensus_from_unsigned_empty() {
let responses: Vec<ConsensusPrepareResponse> = vec![];
let result = compute_consensus_from_unsigned(&responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "No responses provided");
}
#[test]
fn test_compute_consensus_from_unsigned_fast_path() {
let json_data = r#"{"price": 100, "confidence": 10}"#;
let policy_data = create_policy_task_data(json_data);
let data_hash = keccak256(policy_data.abi_encode());
let identical_responses: Vec<ConsensusPrepareResponse> = (1..=3)
.map(|i| ConsensusPrepareResponse {
task_id: B256::ZERO,
operator_id: OperatorId::from([i; 32]),
policy_task_data: policy_data.clone(),
data_hash,
timestamp: Utc::now(),
partial_decryption: None,
ephemeral_partial_decryptions: None,
identity_partial_decryptions: None,
confidential_partial_decryptions: None,
identity_data_ref_ids: None,
confidential_data_ref_ids: None,
encrypted_partials: None,
})
.collect();
let result = compute_consensus_from_unsigned(&identical_responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_ok());
let consensus = result.unwrap();
assert!(consensus.field_adjustments.is_empty());
}
#[test]
fn test_compute_consensus_from_unsigned_within_tolerance() {
let responses = vec![
create_unsigned_response(1, 95.0, 10.0),
create_unsigned_response(2, 100.0, 10.0),
create_unsigned_response(3, 105.0, 10.0),
];
let result = compute_consensus_from_unsigned(&responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_ok());
let consensus = result.unwrap();
assert!(!consensus.field_adjustments.is_empty());
let price_adj = consensus
.field_adjustments
.iter()
.find(|a| a.field_path.contains("price"))
.unwrap();
assert_eq!(price_adj.median_value, 100.0);
assert_eq!(price_adj.original_values.len(), 3);
}
#[test]
fn test_compute_consensus_from_unsigned_exceeds_tolerance() {
let responses = vec![
create_unsigned_response(1, 80.0, 10.0),
create_unsigned_response(2, 100.0, 10.0),
create_unsigned_response(3, 125.0, 10.0),
];
let result = compute_consensus_from_unsigned(&responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.contains("exceed"));
assert!(error.contains("tolerance"));
}
#[test]
fn test_compute_consensus_from_unsigned_with_string_numbers() {
let responses = vec![
create_unsigned_response_string_numbers(1, "8384971100447", "6650250235"),
create_unsigned_response_string_numbers(2, "8384983677929", "6662827716"), create_unsigned_response_string_numbers(3, "8384977389188", "6656538976"), ];
let result = compute_consensus_from_unsigned(&responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_ok());
let consensus = result.unwrap();
assert!(!consensus.field_adjustments.is_empty());
}
#[test]
fn test_compute_consensus_from_unsigned_non_numeric_difference() {
let policy_data_1 = create_policy_task_data(r#"{"symbol": "BTC/USD", "active": true}"#);
let policy_data_2 = create_policy_task_data(r#"{"symbol": "ETH/USD", "active": true}"#);
let responses = vec![
ConsensusPrepareResponse {
task_id: B256::ZERO,
operator_id: OperatorId::from([1; 32]),
policy_task_data: policy_data_1.clone(),
data_hash: keccak256(policy_data_1.abi_encode()),
timestamp: Utc::now(),
partial_decryption: None,
ephemeral_partial_decryptions: None,
identity_partial_decryptions: None,
confidential_partial_decryptions: None,
identity_data_ref_ids: None,
confidential_data_ref_ids: None,
encrypted_partials: None,
},
ConsensusPrepareResponse {
task_id: B256::ZERO,
operator_id: OperatorId::from([2; 32]),
policy_task_data: policy_data_2.clone(),
data_hash: keccak256(policy_data_2.abi_encode()),
timestamp: Utc::now(),
partial_decryption: None,
ephemeral_partial_decryptions: None,
identity_partial_decryptions: None,
confidential_partial_decryptions: None,
identity_data_ref_ids: None,
confidential_data_ref_ids: None,
encrypted_partials: None,
},
];
let result = compute_consensus_from_unsigned(&responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_err());
assert!(result.unwrap_err().contains("non-numeric"));
}
#[test]
fn test_compute_consensus_from_unsigned_zero_median() {
let responses = [
create_unsigned_response(1, 0.0, 10.0),
create_unsigned_response(2, 0.0, 10.0),
create_unsigned_response(3, 0.0, 10.0),
];
let json_data = r#"{"price": 0, "confidence": 10}"#;
let policy_data = create_policy_task_data(json_data);
let data_hash = keccak256(policy_data.abi_encode());
let identical_responses: Vec<ConsensusPrepareResponse> = (1..=3)
.map(|i| ConsensusPrepareResponse {
task_id: B256::ZERO,
operator_id: OperatorId::from([i; 32]),
policy_task_data: policy_data.clone(),
data_hash,
timestamp: Utc::now(),
partial_decryption: None,
ephemeral_partial_decryptions: None,
identity_partial_decryptions: None,
confidential_partial_decryptions: None,
identity_data_ref_ids: None,
confidential_data_ref_ids: None,
encrypted_partials: None,
})
.collect();
let result = compute_consensus_from_unsigned(&identical_responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_ok());
}
#[test]
fn test_apply_median_value_simple_number() {
let mut json = serde_json::json!({"price": 100.0, "volume": 500});
apply_median_value(&mut json, "price", 105.0);
assert_eq!(json["price"], 105.0);
assert_eq!(json["volume"], 500); }
#[test]
fn test_apply_median_value_string_to_integer() {
let mut json = serde_json::json!({"price": "100"});
apply_median_value(&mut json, "price", 105.0);
assert_eq!(json["price"], "105");
}
#[test]
fn test_apply_median_value_string_to_float() {
let mut json = serde_json::json!({"price": "100.5"});
apply_median_value(&mut json, "price", 105.75);
assert_eq!(json["price"], "105.75");
}
#[test]
fn test_apply_median_value_nested_path() {
let mut json = serde_json::json!({
"market": {
"price": 100.0,
"volume": 500
}
});
apply_median_value(&mut json, "market.price", 105.0);
assert_eq!(json["market"]["price"], 105.0);
assert_eq!(json["market"]["volume"], 500); }
#[test]
fn test_apply_median_value_nonexistent_path() {
let mut json = serde_json::json!({"price": 100.0});
let original = json.clone();
apply_median_value(&mut json, "nonexistent", 105.0);
assert_eq!(json, original);
}
#[test]
fn test_apply_median_value_deeply_nested() {
let mut json = serde_json::json!({
"level1": {
"level2": {
"level3": {
"value": 100.0
}
}
}
});
apply_median_value(&mut json, "level1.level2.level3.value", 200.0);
assert_eq!(json["level1"]["level2"]["level3"]["value"], 200.0);
}
#[test]
fn test_normalize_policy_task_data_applies_medians() {
let policy_data = create_policy_task_data(r#"{"price": 95.0, "confidence": 9.0}"#);
let mut median_values = HashMap::new();
median_values.insert("policyData[0].data.price".to_string(), 100.0);
median_values.insert("policyData[0].data.confidence".to_string(), 10.0);
let normalized = normalize_policy_task_data(&policy_data, &median_values);
let data_str = String::from_utf8(normalized.policyData[0].data.to_vec()).unwrap();
let json: JsonValue = serde_json::from_str(&data_str).unwrap();
assert_eq!(json["price"], 100.0);
assert_eq!(json["confidence"], 10.0);
}
#[test]
fn test_normalize_policy_task_data_multiple_policy_data() {
let policy_data = NewtonMessage::PolicyTaskData {
policyId: B256::ZERO,
policyAddress: Address::ZERO,
policy: Bytes::new(),
policyData: vec![
NewtonMessage::PolicyData {
wasmArgs: Bytes::new(),
data: Bytes::from(r#"{"price": 95.0}"#.as_bytes().to_vec()),
policyDataAddress: Address::ZERO,
expireBlock: 1000,
},
NewtonMessage::PolicyData {
wasmArgs: Bytes::new(),
data: Bytes::from(r#"{"volume": 900}"#.as_bytes().to_vec()),
policyDataAddress: Address::ZERO,
expireBlock: 1000,
},
],
};
let mut median_values = HashMap::new();
median_values.insert("policyData[0].data.price".to_string(), 100.0);
median_values.insert("policyData[1].data.volume".to_string(), 1000.0);
let normalized = normalize_policy_task_data(&policy_data, &median_values);
let data0 = String::from_utf8(normalized.policyData[0].data.to_vec()).unwrap();
let data1 = String::from_utf8(normalized.policyData[1].data.to_vec()).unwrap();
let json0: JsonValue = serde_json::from_str(&data0).unwrap();
let json1: JsonValue = serde_json::from_str(&data1).unwrap();
assert_eq!(json0["price"], 100.0);
assert_eq!(json1["volume"], 1000.0);
}
#[test]
fn test_build_consensus_empty_responses() {
let responses: Vec<(OperatorId, SignedTaskResponse)> = vec![];
let result = build_consensus(&responses, DEFAULT_TOLERANCE_PCT);
match result {
ConsensusResult::CannotReachConsensus { reason, .. } => {
assert!(reason.contains("No responses"));
}
_ => panic!("Expected CannotReachConsensus"),
}
}
#[test]
fn test_build_consensus_already_consensus() {
let json_data = r#"{"price": 100, "confidence": 10}"#;
let task_response = create_task_response(json_data);
let responses: Vec<(OperatorId, SignedTaskResponse)> = (1..=3)
.map(|i| {
let operator_id = OperatorId::from([i; 32]);
(
operator_id,
SignedTaskResponse::new(B256::ZERO, task_response.clone(), create_test_signature(), operator_id),
)
})
.collect();
let result = build_consensus(&responses, DEFAULT_TOLERANCE_PCT);
match result {
ConsensusResult::AlreadyConsensus { response_count, .. } => {
assert_eq!(response_count, 3);
}
_ => panic!("Expected AlreadyConsensus"),
}
}
#[test]
fn test_build_consensus_normalized() {
let responses = vec![
create_signed_response(1, r#"{"price": 95, "confidence": 10}"#),
create_signed_response(2, r#"{"price": 100, "confidence": 10}"#),
create_signed_response(3, r#"{"price": 105, "confidence": 10}"#),
];
let result = build_consensus(&responses, DEFAULT_TOLERANCE_PCT);
match result {
ConsensusResult::Normalized {
normalized_responses,
field_adjustments,
..
} => {
assert_eq!(normalized_responses.len(), 3);
assert!(!field_adjustments.is_empty());
let price_adj = field_adjustments
.iter()
.find(|a| a.field_path.contains("price"))
.unwrap();
assert_eq!(price_adj.median_value, 100.0);
}
_ => panic!("Expected Normalized, got {:?}", result),
}
}
#[test]
fn test_build_consensus_exceeds_tolerance() {
let responses = vec![
create_signed_response(1, r#"{"price": 80, "confidence": 10}"#),
create_signed_response(2, r#"{"price": 100, "confidence": 10}"#),
create_signed_response(3, r#"{"price": 125, "confidence": 10}"#),
];
let result = build_consensus(&responses, DEFAULT_TOLERANCE_PCT);
match result {
ConsensusResult::CannotReachConsensus {
divergent_fields,
reason,
..
} => {
assert!(!divergent_fields.is_empty());
assert!(reason.contains("tolerance"));
}
_ => panic!("Expected CannotReachConsensus"),
}
}
#[test]
fn test_build_consensus_no_numeric_fields() {
let responses = vec![
create_signed_response(1, r#"{"symbol": "BTC/USD"}"#),
create_signed_response(2, r#"{"symbol": "ETH/USD"}"#),
];
let result = build_consensus(&responses, DEFAULT_TOLERANCE_PCT);
match result {
ConsensusResult::CannotReachConsensus { reason, .. } => {
assert!(reason.contains("non-numeric"));
}
_ => panic!("Expected CannotReachConsensus"),
}
}
#[test]
fn test_consensus_with_large_numbers() {
let responses = vec![
create_unsigned_response(1, 8384971100447.0, 6650250235.0),
create_unsigned_response(2, 8384983677929.0, 6662827716.0),
create_unsigned_response(3, 8384977389188.0, 6656538976.0),
];
let result = compute_consensus_from_unsigned(&responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_ok());
}
#[test]
fn test_consensus_preserves_non_numeric_fields() {
let responses = vec![
create_unsigned_response(1, 95.0, 10.0),
create_unsigned_response(2, 100.0, 10.0),
create_unsigned_response(3, 105.0, 10.0),
];
let result = compute_consensus_from_unsigned(&responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_ok());
let consensus = result.unwrap();
let data_str = String::from_utf8(consensus.policy_task_data.policyData[0].data.to_vec()).unwrap();
let json: JsonValue = serde_json::from_str(&data_str).unwrap();
assert!(json.get("timestamp").is_some());
}
#[test]
fn test_single_response_always_consensus() {
let responses = vec![create_unsigned_response(1, 100.0, 10.0)];
let result = compute_consensus_from_unsigned(&responses, DEFAULT_TOLERANCE_PCT);
assert!(result.is_ok());
}
#[test]
fn test_custom_tolerance() {
let responses = vec![
create_unsigned_response(1, 99.0, 10.0),
create_unsigned_response(2, 100.0, 10.0),
create_unsigned_response(3, 101.0, 10.0),
];
let result = compute_consensus_from_unsigned(&responses, 0.5);
assert!(result.is_err());
let result = compute_consensus_from_unsigned(&responses, 2.0);
assert!(result.is_ok());
}
}