use crate::consensus::doms::attestation::Attestation;
use crate::consensus::doms::entry::Entry;
use crate::consensus::doms::tx::TxU;
use crate::consensus::fabric;
use crate::consensus::fabric::Fabric;
use crate::node::protocol::Protocol;
use crate::utils::bls12_381 as bls;
use crate::utils::misc::{bin_to_bitvec, get_unix_millis_now};
use crate::utils::rocksdb::RocksDb;
use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
use crate::utils::{Hash, PublicKey, Signature};
use amadeus_runtime::consensus::consensus_apply::ApplyEnv;
use amadeus_runtime::consensus::consensus_kv;
use amadeus_runtime::consensus::consensus_muts::Mutation;
use amadeus_runtime::consensus::unmask_trainers;
use amadeus_utils::constants::{DST_ENTRY, DST_VRF};
use amadeus_utils::vecpak::{Term, VecpakExt, decode};
use bitvec::prelude::*;
use std::collections::HashMap;
use tracing::{debug, info, warn};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("wrong type: {0}")]
WrongType(&'static str),
#[error("missing: {0}")]
Missing(&'static str),
#[error("invalid signature")]
InvalidSignature,
#[error("not implemented: {0}")]
NotImplemented(&'static str),
#[error("runtime error: {0}")]
Runtime(&'static str),
#[error(transparent)]
EtfDecode(#[from] eetf::DecodeError),
#[error(transparent)]
EtfEncode(#[from] eetf::EncodeError),
#[error(transparent)]
Bls(#[from] bls::Error),
#[error(transparent)]
RocksDb(#[from] crate::utils::rocksdb::Error),
#[error(transparent)]
Fabric(#[from] fabric::Error),
#[error(transparent)]
Attestation(#[from] crate::consensus::doms::attestation::Error),
#[error(transparent)]
Entry(#[from] crate::consensus::doms::entry::Error),
#[error("bad format: {0}")]
BadFormat(&'static str),
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct Aggsig {
#[serde(with = "serde_bytes")]
pub mask: Vec<u8>,
#[serde(with = "serde_bytes")]
pub aggsig: Vec<u8>,
#[serde(default)]
pub mask_size: u64,
#[serde(default)]
pub mask_set_size: u64,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct Consensus {
pub entry_hash: Hash,
pub mutations_hash: Hash,
pub aggsig: Aggsig,
}
impl Consensus {
pub fn from_vecpak_bin(bin: &[u8]) -> Result<Self, Error> {
let map = decode(bin)
.map_err(|_| Error::BadFormat("consensus_packed"))?
.get_proplist_map()
.ok_or(Error::BadFormat("consensus_packed"))?;
Self::from_vecpak_map(&map)
}
pub fn from_vecpak_map(map: &amadeus_utils::vecpak::PropListMap) -> Result<Self, Error> {
let entry_hash: Hash = map.get_binary(b"entry_hash").ok_or(Error::BadFormat("consensus.entry_hash"))?;
let mutations_hash: Hash =
map.get_binary(b"mutations_hash").ok_or(Error::BadFormat("consensus.mutations_hash"))?;
let aggsig_map = map.get_proplist_map(b"aggsig").ok_or(Error::BadFormat("consensus.aggsig"))?;
let aggsig = Aggsig {
mask: aggsig_map.get_binary(b"mask").unwrap_or_default(),
aggsig: aggsig_map.get_binary(b"aggsig").ok_or(Error::BadFormat("consensus.aggsig.aggsig"))?,
mask_size: aggsig_map.get_integer(b"mask_size").unwrap_or(0),
mask_set_size: aggsig_map.get_integer(b"mask_set_size").unwrap_or(0),
};
Ok(Self { entry_hash, mutations_hash, aggsig })
}
pub fn to_vecpak_term(&self) -> Term {
let mut aggsig_pairs = vec![
(Term::Binary(b"mask".to_vec()), Term::Binary(self.aggsig.mask.clone())),
(Term::Binary(b"aggsig".to_vec()), Term::Binary(self.aggsig.aggsig.clone())),
];
if self.aggsig.mask_size > 0 {
aggsig_pairs.push((Term::Binary(b"mask_size".to_vec()), Term::VarInt(self.aggsig.mask_size as i128)));
}
if self.aggsig.mask_set_size > 0 {
aggsig_pairs
.push((Term::Binary(b"mask_set_size".to_vec()), Term::VarInt(self.aggsig.mask_set_size as i128)));
}
Term::PropList(vec![
(Term::Binary(b"aggsig".to_vec()), Term::PropList(aggsig_pairs)),
(Term::Binary(b"entry_hash".to_vec()), Term::Binary(self.entry_hash.to_vec())),
(Term::Binary(b"mutations_hash".to_vec()), Term::Binary(self.mutations_hash.to_vec())),
])
}
pub fn mask(&self) -> BitVec<u8, Msb0> {
bin_to_bitvec(self.aggsig.mask.clone())
}
pub fn signature(&self) -> Option<Signature> {
Signature::try_from(self.aggsig.aggsig.as_slice()).ok()
}
}
pub fn chain_muts_rev(fabric: &Fabric, hash: &Hash) -> Option<Vec<Mutation>> {
let bin = fabric.get_muts_rev(hash).ok()??;
mutations_from_etf(&bin).ok()
}
pub fn chain_muts(fabric: &Fabric, hash: &Hash) -> Option<Vec<Mutation>> {
let bin = fabric.get_muts(hash).ok()??;
mutations_from_etf(&bin).ok()
}
#[derive(Debug, Clone)]
pub struct TxResult {
pub error: String,
pub logs: Vec<String>,
}
impl TxResult {
pub fn to_eetf_term(&self) -> eetf::Term {
let mut map = HashMap::new();
map.insert(
eetf::Term::Atom(eetf::Atom::from("error")),
eetf::Term::Atom(eetf::Atom::from(self.error.as_str())),
);
let logs_terms: Vec<eetf::Term> =
self.logs.iter().map(|log| eetf::Term::from(eetf::Binary { bytes: log.as_bytes().to_vec() })).collect();
map.insert(eetf::Term::Atom(eetf::Atom::from("logs")), eetf::Term::from(eetf::List { elements: logs_terms }));
eetf::Term::from(eetf::Map { map })
}
}
fn execute_transaction(
env: &mut ApplyEnv,
db: &RocksDb,
txu: &TxU,
) -> (String, Vec<String>, Vec<Mutation>, Vec<Mutation>) {
let action = &txu.tx.action;
env.muts.clear();
env.muts_rev.clear();
env.caller_env.tx_hash = txu.hash.to_vec();
env.caller_env.tx_signer = txu.tx.signer;
env.caller_env.account_caller = txu.tx.signer.to_vec();
env.caller_env.attached_symbol = action.attached_symbol.clone().unwrap_or_default();
env.caller_env.attached_amount = action.attached_amount.clone().unwrap_or_default();
let contract_bytes = txu.contract_bytes();
let function_str = std::str::from_utf8(&action.function).unwrap_or("");
let (error, logs) = match contract_bytes.as_slice() {
b"Epoch" => execute_epoch_call(env, function_str, &action.args),
b"Coin" => execute_coin_call(env, function_str, &action.args),
b"Contract" => execute_contract_call(env, function_str, &action.args),
contract if contract.len() == 48 => execute_wasm_call(env, db, contract, function_str, &action.args),
_ => ("invalid_contract".to_string(), vec![]),
};
(error, logs, env.muts.clone(), env.muts_rev.clone())
}
fn execute_epoch_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
parse_epoch_call(function, args)
.and_then(|call| amadeus_runtime::consensus::bic::epoch::Epoch.call(env, call).map_err(|e| e.to_string()))
.map(|_| ("ok".to_string(), vec![]))
.unwrap_or_else(|e| (e, vec![]))
}
fn execute_coin_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
amadeus_runtime::consensus::bic::coin::call(env, function, args)
.map(|_| ("ok".to_string(), vec![]))
.unwrap_or_else(|e| (e.to_string(), vec![]))
}
fn execute_contract_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
amadeus_runtime::consensus::bic::contract::call(env, function, args)
.map(|_| ("ok".to_string(), vec![]))
.unwrap_or_else(|e| (e.to_string(), vec![]))
}
fn execute_wasm_call(
apply_env: &mut ApplyEnv,
_db: &RocksDb,
contract: &[u8],
function: &str,
args: &[Vec<u8>],
) -> (String, Vec<String>) {
let bytecode = match amadeus_runtime::consensus::bic::contract::bytecode(apply_env, contract) {
Ok(Some(code)) => code,
Ok(None) => return ("account_has_no_bytecode".to_string(), vec![]),
Err(e) => return (format!("bytecode_error:{}", e), vec![]),
};
match amadeus_runtime::consensus::wasm::execute(apply_env, &bytecode, function, args) {
Ok(result) => {
("ok".to_string(), result.logs)
}
Err(e) => {
(format!("wasm_error:{}", e), vec![])
}
}
}
fn parse_epoch_call(
function: &str,
args: &[Vec<u8>],
) -> Result<amadeus_runtime::consensus::bic::epoch::EpochCall, String> {
use amadeus_runtime::consensus::bic::epoch::EpochCall;
match function {
"submit_sol" => Ok(EpochCall::SubmitSol { sol: args.first().ok_or("missing sol arg")?.clone() }),
"set_emission_address" => {
let addr_bytes = args.first().ok_or("missing address arg")?;
let address = addr_bytes.as_slice().try_into().map_err(|_| "invalid address length")?;
Ok(EpochCall::SetEmissionAddress { address })
}
"slash_trainer" => {
let epoch_bytes = args.first().ok_or("missing epoch")?;
let epoch = u32::from_le_bytes(epoch_bytes.get(..4).ok_or("invalid epoch")?.try_into().unwrap()) as u64;
let malicious_pk = args.get(1).ok_or("missing pk")?.as_slice().try_into().map_err(|_| "invalid pk")?;
let signature = args.get(2).ok_or("missing signature")?.clone();
let mask = crate::utils::misc::bin_to_bitvec(args.get(3).ok_or("missing mask")?.clone());
Ok(EpochCall::SlashTrainer { epoch, malicious_pk, signature, mask, trainers: None })
}
_ => Err(format!("unknown function: {}", function)),
}
}
fn call_txs_pre(env: &mut ApplyEnv, next_entry: &Entry, txs: &[TxU]) -> Result<(), &'static str> {
let epoch = next_entry.header.height / 100_000;
let entry_signer_key =
crate::utils::misc::bcat(&[b"account:", next_entry.header.signer.as_ref(), b":balance:AMA"]);
let burn_address_key = crate::utils::misc::bcat(&[
b"account:",
&amadeus_runtime::consensus::bic::coin::BURN_ADDRESS,
b":balance:AMA",
]);
for tx in txs {
let nonce_key = crate::utils::misc::bcat(&[b"account:", tx.tx.signer.as_ref(), b":attribute:nonce"]);
let nonce_i64 = i64::try_from(tx.tx.nonce).unwrap_or(i64::MAX);
consensus_kv::kv_put(env, &nonce_key, &nonce_i64.to_string().into_bytes())?;
let bytes = tx.tx_encoded().len() + 32 + 96;
let exec_cost = if epoch >= 295 {
amadeus_runtime::consensus::bic::coin::to_cents((1 + bytes / 1024) as i128)
} else {
amadeus_runtime::consensus::bic::coin::to_cents((3 + bytes / 256 * 3) as i128)
};
let signer_balance_key = crate::utils::misc::bcat(&[b"account:", tx.tx.signer.as_ref(), b":balance:AMA"]);
consensus_kv::kv_increment(env, &signer_balance_key, -exec_cost)?;
consensus_kv::kv_increment(env, &entry_signer_key, exec_cost / 2)?;
consensus_kv::kv_increment(env, &burn_address_key, exec_cost / 2)?;
}
Ok(())
}
fn mutations_to_etf(muts: &[Mutation]) -> Vec<u8> {
use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
use eetf::{Atom, Binary, List, Map, Term};
use std::collections::HashMap;
let mut etf_list = Vec::new();
for m in muts {
let mut map = HashMap::new();
match m {
Mutation::Put { op: _, key, value } => {
map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("put")));
map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
map.insert(Term::Atom(Atom::from("value")), Term::Binary(Binary { bytes: value.clone() }));
}
Mutation::Delete { op: _, key } => {
map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("delete")));
map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
}
Mutation::SetBit { op: _, key, value, bloomsize } => {
map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("set_bit")));
map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
map.insert(Term::Atom(Atom::from("bloomsize")), u64_to_term(*bloomsize));
}
Mutation::ClearBit { op: _, key, value } => {
map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("clear_bit")));
map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
}
}
etf_list.push(Term::Map(Map { map }));
}
let list = Term::List(List { elements: etf_list });
encode_safe_deterministic(&list)
}
fn mutations_from_etf(bin: &[u8]) -> Result<Vec<Mutation>, Error> {
use crate::utils::misc::TermExt;
use eetf::{Atom, Term};
let term = Term::decode(bin).map_err(|_| Error::WrongType("invalid_etf"))?;
let list = match &term {
Term::List(l) => &l.elements,
_ => return Err(Error::WrongType("not_list")),
};
let mut muts = Vec::new();
for elem in list {
let map = match elem {
Term::Map(m) => &m.map,
_ => return Err(Error::WrongType("not_map")),
};
let op = map
.get(&Term::Atom(Atom::from("op")))
.and_then(|t| match t {
Term::Atom(a) => Some(a),
_ => None,
})
.ok_or(Error::Missing("op"))?;
match op.name.as_str() {
"put" => {
let key = map
.get(&Term::Atom(Atom::from("key")))
.and_then(|t| t.get_binary())
.ok_or(Error::Missing("key"))?
.to_vec();
let value = map
.get(&Term::Atom(Atom::from("value")))
.and_then(|t| t.get_binary())
.ok_or(Error::Missing("value"))?
.to_vec();
muts.push(Mutation::Put { op: vec![], key, value });
}
"delete" => {
let key = map
.get(&Term::Atom(Atom::from("key")))
.and_then(|t| t.get_binary())
.ok_or(Error::Missing("key"))?
.to_vec();
muts.push(Mutation::Delete { op: vec![], key });
}
"set_bit" => {
let key = map
.get(&Term::Atom(Atom::from("key")))
.and_then(|t| t.get_binary())
.ok_or(Error::Missing("key"))?
.to_vec();
let value = map
.get(&Term::Atom(Atom::from("value")))
.and_then(|t| t.get_integer())
.map(|i| i as u64)
.ok_or(Error::Missing("value"))?;
let bloomsize = map
.get(&Term::Atom(Atom::from("bloomsize")))
.and_then(|t| t.get_integer())
.map(|i| i as u64)
.ok_or(Error::Missing("bloomsize"))?;
muts.push(Mutation::SetBit { op: vec![], key, value, bloomsize });
}
"clear_bit" => {
let key = map
.get(&Term::Atom(Atom::from("key")))
.and_then(|t| t.get_binary())
.ok_or(Error::Missing("key"))?
.to_vec();
let value = map
.get(&Term::Atom(Atom::from("value")))
.and_then(|t| t.get_integer())
.map(|i| i as u64)
.ok_or(Error::Missing("value"))?;
muts.push(Mutation::ClearBit { op: vec![], key, value });
}
_ => return Err(Error::WrongType("unknown_op")),
}
}
Ok(muts)
}
fn call_exit(env: &mut ApplyEnv, next_entry: &Entry) -> Result<(), &'static str> {
let vr = next_entry.header.vr.to_vec();
let seed_hash = crate::utils::blake3::hash(&vr);
env.caller_env.seed = seed_hash.to_vec();
let seedf64 = f64::from_le_bytes(seed_hash[0..8].try_into().unwrap_or([0u8; 8]));
env.caller_env.seedf64 = seedf64;
if next_entry.header.height % 1000 == 0 {
consensus_kv::kv_put(
env,
b"bic:epoch:segment_vr_hash",
crate::utils::blake3::hash(next_entry.header.vr.as_ref()).as_ref(),
)?;
}
if next_entry.header.height % 100_000 == 99_999 {
env.caller_env.readonly = true;
env.caller_env.tx_hash = vec![];
env.caller_env.tx_signer = PublicKey::from([0u8; 48]);
env.caller_env.account_caller = vec![];
env.caller_env.call_exec_points = 0;
env.caller_env.call_exec_points_remaining = 0;
env.caller_env.attached_symbol = vec![];
env.caller_env.attached_amount = vec![];
let _ = amadeus_runtime::consensus::bic::epoch::Epoch.next(env);
}
Ok(())
}
pub fn apply_entry(
fabric: &Fabric,
config: &crate::config::Config,
next_entry: &Entry,
) -> Result<Option<Vec<u8>>, Error> {
let Some(curr_h) = fabric.get_temporal_height().ok().flatten() else {
return Err(Error::Missing("temporal_height"));
};
if next_entry.header.height != curr_h + 1 {
return Err(Error::WrongType("invalid_height"));
}
let txs = &next_entry.txs;
let entry_vr_b3 = crate::utils::blake3::hash(next_entry.header.vr.as_ref());
let mut env = amadeus_runtime::consensus::consensus_apply::make_apply_env(
fabric.db(),
"contractstate",
&next_entry.header.signer,
&next_entry.header.prev_hash,
next_entry.header.slot,
next_entry.header.prev_slot as u64,
next_entry.header.height,
next_entry.header.height / 100_000,
&next_entry.header.vr,
&Hash::from(entry_vr_b3),
&next_entry.header.dr,
)
.map_err(Error::Runtime)?;
call_txs_pre(&mut env, next_entry, txs).map_err(Error::Runtime)?;
let mut muts = env.muts.clone();
let mut muts_rev = env.muts_rev.clone();
let mut tx_results = Vec::new();
let db = fabric.db();
for txu in txs {
let (error, logs, m3, m_rev3) = execute_transaction(&mut env, db, txu);
if error == "ok" {
muts.extend(m3);
muts_rev.extend(m_rev3);
} else {
consensus_kv::revert(&mut env).map_err(Error::Runtime)?;
}
tx_results.push(TxResult { error, logs });
}
env.muts.clear();
env.muts_rev.clear();
call_exit(&mut env, next_entry).map_err(Error::Runtime)?;
let muts_exit = env.muts.clone();
let muts_exit_rev = env.muts_rev.clone();
muts.extend(muts_exit);
muts_rev.extend(muts_exit_rev);
let mutations_hash = hash_mutations_with_results(&muts, &tx_results);
env.txn.commit().map_err(crate::utils::rocksdb::Error::from)?;
let pk = config.get_pk();
let sk = config.get_sk();
let attestation = Attestation::sign_with(pk.as_ref(), &sk, &next_entry.hash, &Hash::from(mutations_hash))?;
let attestation_packed = attestation.to_vecpak_bin();
fabric.put_attestation(&next_entry.hash, &attestation_packed)?;
let trainers = fabric.trainers_for_height(next_entry.header.height).ok_or(Error::Missing("trainers_for_height"))?;
let is_trainer = trainers.iter().any(|t| t == &pk);
let seen_time_ms = get_unix_millis_now();
fabric.put_entry_seen_time(&next_entry.hash, seen_time_ms)?;
fabric.set_temporal_hash_height(next_entry)?;
let muts_bin = mutations_to_etf(&muts);
fabric.put_muts(&next_entry.hash, &muts_bin)?;
let muts_rev_bin = mutations_to_etf(&muts_rev);
fabric.put_muts_rev(&next_entry.hash, &muts_rev_bin)?;
let entry_bin = next_entry.to_vecpak_bin();
fabric.insert_entry(
&next_entry.hash,
next_entry.header.height,
next_entry.header.slot,
&entry_bin,
seen_time_ms,
)?;
for (tx, result) in next_entry.txs.iter().zip(tx_results.iter()) {
let nonce_padded = format!("{:020}", tx.tx.nonce);
let key = format!("{}:{}", bs58::encode(&tx.tx.signer).into_string(), nonce_padded);
fabric.put_tx_account_nonce(key.as_bytes(), &tx.hash)?;
let tx_bin = amadeus_utils::vecpak::to_vec(tx).unwrap_or_default();
let entry_bin = next_entry.to_vecpak_bin();
if let Some(pos) = entry_bin.windows(tx_bin.len()).position(|w| w == tx_bin) {
let mut tx_meta = HashMap::new();
tx_meta.insert(
eetf::Term::Atom(eetf::Atom::from("entry_hash")),
eetf::Term::from(eetf::Binary { bytes: next_entry.hash.to_vec() }),
);
tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("result")), result.to_eetf_term());
tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("index_start")), u64_to_term(pos as u64));
tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("index_size")), u64_to_term(tx_bin.len() as u64));
let term = eetf::Term::Map(eetf::Map { map: tx_meta });
let tx_meta_bin = encode_safe_deterministic(&term);
fabric.put_tx_metadata(tx.hash.as_ref(), &tx_meta_bin)?;
}
}
Ok(if is_trainer { Some(attestation_packed) } else { None })
}
pub fn produce_entry(fabric: &Fabric, config: &crate::config::Config, slot: u64) -> Result<Entry, Error> {
let cur_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
let pk = config.get_pk();
let sk = config.get_sk();
let next_header = cur_entry.build_next_header(slot, &pk, &sk)?;
let txs: Vec<crate::consensus::doms::tx::EntryTx> = Vec::new();
let txs_bin: Vec<u8> = txs.iter().flat_map(|tx| amadeus_utils::vecpak::to_vec(tx).unwrap_or_default()).collect();
let root_tx = crate::utils::blake3::hash(&txs_bin);
let mut header = next_header;
header.root_tx = Hash::from(root_tx);
let header_bin = header.to_vecpak_bin();
let header_hash = crate::utils::blake3::hash(&header_bin);
let signature = bls::sign(&sk, header_hash.as_ref(), DST_ENTRY)?;
let entry = Entry {
hash: Hash::from([0u8; 32]), header,
signature,
mask: None,
txs,
};
let entry_bin = entry.to_vecpak_bin();
let hash = crate::utils::blake3::hash(&entry_bin);
Ok(Entry {
hash: Hash::from(hash),
header: entry.header,
signature: entry.signature,
mask: entry.mask,
txs: entry.txs,
})
}
pub fn chain_rewind(fabric: &Fabric, target_hash: &Hash) -> Result<bool, Error> {
if !fabric.is_in_chain(target_hash) {
return Ok(false);
}
let tip_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
let entry = chain_rewind_internal(fabric, &tip_entry, target_hash)?;
fabric.set_temporal_hash_height(&entry)?;
let rooted_hash = fabric.get_rooted_hash()?.ok_or(Error::Missing("rooted_tip"))?;
if fabric.get_entry_raw(&Hash::from(rooted_hash))?.is_none() {
let rooted_height = fabric.get_rooted_height()?.ok_or(Error::Missing("rooted_height"))?;
warn!("Rewind rolled back rooted entries from {rooted_height} until {}", entry.header.height);
fabric.set_rooted_hash_height(&entry)?;
}
Ok(true)
}
fn chain_rewind_internal(fabric: &Fabric, current_entry: &Entry, target_hash: &Hash) -> Result<Entry, Error> {
let mut current = current_entry.clone();
loop {
let prev_entry = fabric.get_entry_by_hash(¤t.header.prev_hash);
let db = fabric.db();
if let Some(m_rev_new) = chain_muts_rev(fabric, ¤t.hash) {
consensus_kv::apply_mutations(db, "contractstate", &m_rev_new).map_err(Error::Runtime)?;
}
fabric.delete_entry(¤t.hash)?;
fabric.delete_entry_seen_time(¤t.hash)?;
let mut height_key = current.header.height.to_string().into_bytes();
height_key.push(b':');
height_key.extend_from_slice(current.hash.as_ref());
fabric.delete_entry_by_height(&height_key)?;
let mut slot_key = current.header.slot.to_string().into_bytes();
slot_key.push(b':');
slot_key.extend_from_slice(current.hash.as_ref());
fabric.delete_entry_by_slot(&slot_key)?;
fabric.delete_consensus(¤t.hash)?;
fabric.delete_attestation(¤t.hash)?;
for tx in ¤t.txs {
fabric.delete_tx_metadata(&tx.hash)?;
let nonce_padded = format!("{:020}", tx.tx.nonce);
let key = format!("{}:{}", bs58::encode(&tx.tx.signer).into_string(), nonce_padded);
fabric.delete_tx_account_nonce(key.as_bytes())?;
}
if current.hash == *target_hash {
return prev_entry.ok_or(Error::Missing("prev_entry_in_rewind"));
}
current = prev_entry.ok_or(Error::Missing("prev_entry_in_rewind"))?;
}
}
pub fn best_by_weight(
trainers: &[PublicKey],
consensuses: &HashMap<[u8; 32], Consensus>,
) -> (Option<[u8; 32]>, Option<f64>, Option<Consensus>) {
let max_score = trainers.len() as f64;
let mut best: Option<([u8; 32], f64, Consensus)> = None;
for (k, v) in consensuses.iter() {
let mask = v.mask();
let trainers_signed = if mask.is_empty() { trainers.to_vec() } else { unmask_trainers(&mask, trainers) };
let mut score = 0.0;
for _pk in trainers_signed {
score += 1.0;
}
score /= max_score;
match &mut best {
None => best = Some((*k, score, v.clone())),
Some((_, best_score, _)) if score > *best_score => best = Some((*k, score, v.clone())),
_ => {}
}
}
match best {
Some((k, score, v)) => (Some(k), Some(score), Some(v)),
None => (None, None, None),
}
}
#[derive(Debug, Clone)]
pub struct ScoredEntry {
pub entry: Entry,
pub mutations_hash: Option<[u8; 32]>,
pub score: Option<f64>,
}
pub fn best_entry_for_height(fabric: &Fabric, height: u64) -> Result<Vec<ScoredEntry>, Error> {
let rooted_tip = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
let entry_bins = fabric.entries_by_height(height as u64)?;
let mut entries = Vec::new();
for entry_bin in entry_bins {
let entry = Entry::from_vecpak_bin(&entry_bin)?;
if entry.header.prev_hash != rooted_tip {
continue;
}
let trainers = fabric.trainers_for_height(entry.header.height).ok_or(Error::Missing("trainers_for_height"))?;
let (mutations_hash, score, _consensus) = fabric.best_consensus_by_entryhash(&trainers, entry.hash.as_ref())?;
if mutations_hash.is_some() {
entries.push(ScoredEntry { entry, mutations_hash, score });
}
}
entries.sort_by(|a, b| {
let score_cmp = b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal);
if score_cmp != std::cmp::Ordering::Equal {
return score_cmp;
}
let slot_cmp = a.entry.header.slot.cmp(&b.entry.header.slot);
if slot_cmp != std::cmp::Ordering::Equal {
return slot_cmp;
}
let mask_cmp = a.entry.mask.is_none().cmp(&b.entry.mask.is_none());
if mask_cmp != std::cmp::Ordering::Equal {
return mask_cmp;
}
a.entry.hash.cmp(&b.entry.hash)
});
Ok(entries)
}
pub fn proc_consensus(fabric: &Fabric) -> Result<(), Error> {
if fabric.get_temporal_entry()?.is_none() {
return Ok(());
}
let initial_rooted_hash = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
loop {
let entry_root = fabric.get_rooted_entry()?.ok_or(Error::Missing("rooted_tip"))?;
let entry_temp = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
let height_root = entry_root.header.height;
let height_temp = entry_temp.header.height;
if height_root >= height_temp {
debug!(
"proc_consensus: rooted_height {} >= temporal_height {}, nothing to process",
height_root, height_temp
);
break;
}
let next_height = height_root + 1;
let next_entries = best_entry_for_height(fabric, next_height)?;
let Some(best_entry_info) = next_entries.first() else {
break;
};
let score = best_entry_info.score.unwrap_or(0.0);
let best_entry = &best_entry_info.entry;
if score < 0.67 {
warn!(
"proc_consensus: insufficient consensus score {:.2} (need 0.67) for entry {} at height {}",
score,
bs58::encode(&best_entry.hash).into_string(),
best_entry.header.height
);
break;
}
let mutations_hash = best_entry_info.mutations_hash.unwrap();
let my_attestation = fabric.my_attestation_by_entryhash(best_entry.hash.as_ref()).ok().flatten();
match my_attestation {
None => {
warn!(
"proc_consensus softfork: rewind to entry {} height {}",
bs58::encode(&best_entry.hash).into_string(),
best_entry.header.height
);
let rewind_hash = match best_entry_for_height(fabric, next_height - 1)?.first() {
Some(prev_best) => prev_best.entry.hash,
None => Hash::from(fabric.get_temporal_hash()?.unwrap_or([0u8; 32])),
};
chain_rewind(fabric, &rewind_hash)?;
continue; }
Some(my_att) => {
if mutations_hash != my_att.mutations_hash {
warn!(
"EMERGENCY: state divergence at height {}: our mutations {} != consensus {}, halting",
best_entry.header.height,
bs58::encode(&my_att.mutations_hash).into_string(),
bs58::encode(&mutations_hash).into_string()
);
if let Some(prev_best) = best_entry_for_height(fabric, next_height - 1)?.first() {
let _ = chain_rewind(fabric, &prev_best.entry.hash);
}
break;
} else {
info!(
"proc_consensus: rooting entry {} at height {} with score {:.2}",
bs58::encode(&best_entry.hash).into_string(),
best_entry.header.height,
score
);
fabric.set_rooted_hash_height(best_entry)?;
}
}
}
}
let final_rooted_hash = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
if final_rooted_hash != initial_rooted_hash {
info!(
"proc_consensus: rooted tip changed from {} to {}",
bs58::encode(&initial_rooted_hash).into_string(),
bs58::encode(&final_rooted_hash).into_string()
);
}
Ok(())
}
pub fn validate_next_entry(current_entry: &Entry, next_entry: &Entry) -> Result<(), Error> {
let ceh = ¤t_entry.header;
let neh = &next_entry.header;
if ceh.slot as i64 != neh.prev_slot {
return Err(Error::WrongType("invalid_slot"));
}
if ceh.height != (neh.height - 1) {
return Err(Error::WrongType("invalid_height"));
}
if current_entry.hash != neh.prev_hash {
return Err(Error::WrongType("invalid_hash"));
}
let expected_dr = crate::utils::blake3::hash(ceh.dr.as_ref());
if expected_dr != neh.dr {
return Err(Error::WrongType("invalid_dr"));
}
if bls::verify(&neh.signer, &neh.vr, &*ceh.vr, DST_VRF).is_err() {
return Err(Error::InvalidSignature);
}
Ok(())
}
fn is_quorum_synced_off_by_x(fabric: &Fabric, x: u64) -> bool {
let temporal_height = fabric.get_temporal_height().ok().flatten().unwrap_or(0);
let rooted_height = fabric.get_rooted_height().ok().flatten().unwrap_or(0);
temporal_height.saturating_sub(rooted_height) <= x
}
pub fn delete_transactions_from_pool(_txs: &[crate::consensus::doms::tx::EntryTx]) {
}
#[derive(Debug, Clone)]
pub struct SoftforkSettings {
pub softfork_hash: Vec<Hash>,
pub softfork_deny_hash: Vec<Hash>,
}
pub fn get_softfork_settings() -> SoftforkSettings {
SoftforkSettings { softfork_hash: Vec::new(), softfork_deny_hash: Vec::new() }
}
pub async fn proc_entries(fabric: &Fabric, config: &crate::config::Config, ctx: &crate::Context) -> Result<(), Error> {
if fabric.get_temporal_entry()?.is_none() {
return Ok(());
}
let softfork_settings = get_softfork_settings();
loop {
let cur_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
let cur_slot = cur_entry.header.slot;
let next_height = cur_entry.header.height + 1;
let mut next_entries: Vec<Entry> = fabric
.entries_by_height(next_height as u64)?
.into_iter()
.filter_map(|entry_bin| Entry::from_vecpak_bin(&entry_bin).ok())
.filter(|next_entry| {
fabric.validate_entry_slot_trainer(next_entry, cur_slot)
&& !softfork_settings.softfork_deny_hash.contains(&next_entry.hash)
&& validate_next_entry(&cur_entry, next_entry).is_ok()
})
.collect();
next_entries.sort_by_key(|entry| {
(
softfork_settings.softfork_hash.contains(&entry.hash), entry.header.slot,
entry.mask.is_some(), entry.hash,
)
});
let Some(entry) = next_entries.first() else {
return Ok(());
};
let attestation_packed = apply_entry(fabric, config, entry)?;
debug!("Applied entry {} at height {}", bs58::encode(&entry.hash).into_string(), entry.header.height);
if let Some(attestation_packed) = attestation_packed {
if is_quorum_synced_off_by_x(fabric, 6) {
broadcast_attestation(ctx, &attestation_packed, &entry.hash).await;
}
}
delete_transactions_from_pool(&entry.txs);
}
}
async fn broadcast_attestation(ctx: &crate::Context, attestation_packed: &[u8], entry_hash: &Hash) {
use crate::consensus::doms::attestation::{Attestation, EventAttestation};
let Some(attestation) = Attestation::from_vecpak_bin(attestation_packed) else {
warn!("failed to decode attestation for broadcast");
return;
};
let event_att = Protocol::EventAttestation(EventAttestation::new(vec![attestation]));
if let Ok(peers) = ctx.peers.get_all().await {
for peer in peers {
let _ = event_att.send_to_with_metrics(ctx, peer.ip).await;
}
}
for seed_ip in &ctx.config.seed_ips {
let _ = event_att.send_to_with_metrics(ctx, *seed_ip).await;
}
debug!("Broadcasted attestation for entry {}", bs58::encode(entry_hash).into_string());
}
pub fn hash_mutations_with_results(muts: &[Mutation], results: &[TxResult]) -> [u8; 32] {
use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
use eetf::{Atom, Binary, List, Map, Term};
use std::collections::HashMap;
let mut etf_list = Vec::new();
for result in results {
let mut map = HashMap::new();
map.insert(Term::Atom(Atom::from("error")), Term::Atom(Atom::from(result.error.as_str())));
etf_list.push(Term::Map(Map { map }));
}
for m in muts {
let mut map = HashMap::new();
match m {
Mutation::Put { op: _, key, value } => {
map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("put")));
map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
map.insert(Term::Atom(Atom::from("value")), Term::Binary(Binary { bytes: value.clone() }));
}
Mutation::Delete { op: _, key } => {
map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("delete")));
map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
}
Mutation::SetBit { op: _, key, value, bloomsize } => {
map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("set_bit")));
map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
map.insert(Term::Atom(Atom::from("bloomsize")), u64_to_term(*bloomsize));
}
Mutation::ClearBit { op: _, key, value } => {
map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("clear_bit")));
map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
}
}
etf_list.push(Term::Map(Map { map }));
}
let list_term = Term::List(List { elements: etf_list });
let encoded = encode_safe_deterministic(&list_term);
amadeus_utils::blake3::hash(&encoded)
}