1use crate::consensus::doms::attestation::Attestation;
2use crate::consensus::doms::entry::Entry;
3use crate::consensus::doms::tx::TxU;
4use crate::consensus::fabric;
5use crate::consensus::fabric::Fabric;
6use crate::node::protocol::Protocol;
7use crate::utils::bls12_381 as bls;
8use crate::utils::misc::{bin_to_bitvec, get_unix_millis_now};
9use crate::utils::rocksdb::RocksDb;
10use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
11use crate::utils::{Hash, PublicKey, Signature};
12use amadeus_runtime::consensus::consensus_apply::ApplyEnv;
13use amadeus_runtime::consensus::consensus_kv;
14use amadeus_runtime::consensus::consensus_muts::Mutation;
15use amadeus_runtime::consensus::unmask_trainers;
16use amadeus_utils::constants::{DST_ENTRY, DST_VRF};
17use amadeus_utils::vecpak::{Term, VecpakExt, decode};
18use bitvec::prelude::*;
19use std::collections::HashMap;
20use tracing::{debug, info, warn};
21
22#[derive(Debug, thiserror::Error)]
23pub enum Error {
24 #[error("wrong type: {0}")]
25 WrongType(&'static str),
26 #[error("missing: {0}")]
27 Missing(&'static str),
28 #[error("invalid signature")]
29 InvalidSignature,
30 #[error("not implemented: {0}")]
31 NotImplemented(&'static str),
32 #[error("runtime error: {0}")]
33 Runtime(&'static str),
34 #[error(transparent)]
35 EtfDecode(#[from] eetf::DecodeError),
36 #[error(transparent)]
37 EtfEncode(#[from] eetf::EncodeError),
38 #[error(transparent)]
39 Bls(#[from] bls::Error),
40 #[error(transparent)]
41 RocksDb(#[from] crate::utils::rocksdb::Error),
42 #[error(transparent)]
43 Fabric(#[from] fabric::Error),
44 #[error(transparent)]
45 Attestation(#[from] crate::consensus::doms::attestation::Error),
46 #[error(transparent)]
47 Entry(#[from] crate::consensus::doms::entry::Error),
48 #[error("bad format: {0}")]
49 BadFormat(&'static str),
50}
51
52#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
54pub struct Aggsig {
55 #[serde(with = "serde_bytes")]
56 pub mask: Vec<u8>,
57 #[serde(with = "serde_bytes")]
58 pub aggsig: Vec<u8>,
59 #[serde(default)]
60 pub mask_size: u64,
61 #[serde(default)]
62 pub mask_set_size: u64,
63}
64
65#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
66pub struct Consensus {
67 pub entry_hash: Hash,
68 pub mutations_hash: Hash,
69 pub aggsig: Aggsig,
70}
71
72impl Consensus {
73 pub fn from_vecpak_bin(bin: &[u8]) -> Result<Self, Error> {
75 let map = decode(bin)
76 .map_err(|_| Error::BadFormat("consensus_packed"))?
77 .get_proplist_map()
78 .ok_or(Error::BadFormat("consensus_packed"))?;
79 Self::from_vecpak_map(&map)
80 }
81
82 pub fn from_vecpak_map(map: &amadeus_utils::vecpak::PropListMap) -> Result<Self, Error> {
84 let entry_hash: Hash = map.get_binary(b"entry_hash").ok_or(Error::BadFormat("consensus.entry_hash"))?;
85 let mutations_hash: Hash =
86 map.get_binary(b"mutations_hash").ok_or(Error::BadFormat("consensus.mutations_hash"))?;
87
88 let aggsig_map = map.get_proplist_map(b"aggsig").ok_or(Error::BadFormat("consensus.aggsig"))?;
90 let aggsig = Aggsig {
91 mask: aggsig_map.get_binary(b"mask").unwrap_or_default(),
92 aggsig: aggsig_map.get_binary(b"aggsig").ok_or(Error::BadFormat("consensus.aggsig.aggsig"))?,
93 mask_size: aggsig_map.get_integer(b"mask_size").unwrap_or(0),
94 mask_set_size: aggsig_map.get_integer(b"mask_set_size").unwrap_or(0),
95 };
96
97 Ok(Self { entry_hash, mutations_hash, aggsig })
98 }
99
100 pub fn to_vecpak_term(&self) -> Term {
101 let mut aggsig_pairs = vec![
103 (Term::Binary(b"mask".to_vec()), Term::Binary(self.aggsig.mask.clone())),
104 (Term::Binary(b"aggsig".to_vec()), Term::Binary(self.aggsig.aggsig.clone())),
105 ];
106 if self.aggsig.mask_size > 0 {
107 aggsig_pairs.push((Term::Binary(b"mask_size".to_vec()), Term::VarInt(self.aggsig.mask_size as i128)));
108 }
109 if self.aggsig.mask_set_size > 0 {
110 aggsig_pairs
111 .push((Term::Binary(b"mask_set_size".to_vec()), Term::VarInt(self.aggsig.mask_set_size as i128)));
112 }
113
114 Term::PropList(vec![
115 (Term::Binary(b"aggsig".to_vec()), Term::PropList(aggsig_pairs)),
116 (Term::Binary(b"entry_hash".to_vec()), Term::Binary(self.entry_hash.to_vec())),
117 (Term::Binary(b"mutations_hash".to_vec()), Term::Binary(self.mutations_hash.to_vec())),
118 ])
119 }
120
121 pub fn mask(&self) -> BitVec<u8, Msb0> {
123 bin_to_bitvec(self.aggsig.mask.clone())
124 }
125
126 pub fn signature(&self) -> Option<Signature> {
128 Signature::try_from(self.aggsig.aggsig.as_slice()).ok()
129 }
130}
131
132pub fn chain_muts_rev(fabric: &Fabric, hash: &Hash) -> Option<Vec<Mutation>> {
133 let bin = fabric.get_muts_rev(hash).ok()??;
134 mutations_from_etf(&bin).ok()
135}
136
137pub fn chain_muts(fabric: &Fabric, hash: &Hash) -> Option<Vec<Mutation>> {
138 let bin = fabric.get_muts(hash).ok()??;
139 mutations_from_etf(&bin).ok()
140}
141
142#[derive(Debug, Clone)]
143pub struct TxResult {
144 pub error: String,
145 pub logs: Vec<String>,
146}
147
148impl TxResult {
149 pub fn to_eetf_term(&self) -> eetf::Term {
151 let mut map = HashMap::new();
152
153 map.insert(
155 eetf::Term::Atom(eetf::Atom::from("error")),
156 eetf::Term::Atom(eetf::Atom::from(self.error.as_str())),
157 );
158
159 let logs_terms: Vec<eetf::Term> =
161 self.logs.iter().map(|log| eetf::Term::from(eetf::Binary { bytes: log.as_bytes().to_vec() })).collect();
162 map.insert(eetf::Term::Atom(eetf::Atom::from("logs")), eetf::Term::from(eetf::List { elements: logs_terms }));
163
164 eetf::Term::from(eetf::Map { map })
165 }
166}
167
168fn execute_transaction(
169 env: &mut ApplyEnv,
170 db: &RocksDb,
171 txu: &TxU,
172) -> (String, Vec<String>, Vec<Mutation>, Vec<Mutation>) {
173 let action = &txu.tx.action;
174
175 env.muts.clear();
176 env.muts_rev.clear();
177
178 env.caller_env.tx_hash = txu.hash.to_vec();
179 env.caller_env.tx_signer = txu.tx.signer;
180 env.caller_env.account_caller = txu.tx.signer.to_vec();
181 env.caller_env.attached_symbol = action.attached_symbol.clone().unwrap_or_default();
182 env.caller_env.attached_amount = action.attached_amount.clone().unwrap_or_default();
183
184 let contract_bytes = txu.contract_bytes();
185 let function_str = std::str::from_utf8(&action.function).unwrap_or("");
186 let (error, logs) = match contract_bytes.as_slice() {
187 b"Epoch" => execute_epoch_call(env, function_str, &action.args),
188 b"Coin" => execute_coin_call(env, function_str, &action.args),
189 b"Contract" => execute_contract_call(env, function_str, &action.args),
190 contract if contract.len() == 48 => execute_wasm_call(env, db, contract, function_str, &action.args),
191 _ => ("invalid_contract".to_string(), vec![]),
192 };
193
194 (error, logs, env.muts.clone(), env.muts_rev.clone())
195}
196
197fn execute_epoch_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
198 parse_epoch_call(function, args)
199 .and_then(|call| amadeus_runtime::consensus::bic::epoch::Epoch.call(env, call).map_err(|e| e.to_string()))
200 .map(|_| ("ok".to_string(), vec![]))
201 .unwrap_or_else(|e| (e, vec![]))
202}
203
204fn execute_coin_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
205 amadeus_runtime::consensus::bic::coin::call(env, function, args)
206 .map(|_| ("ok".to_string(), vec![]))
207 .unwrap_or_else(|e| (e.to_string(), vec![]))
208}
209
210fn execute_contract_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
211 amadeus_runtime::consensus::bic::contract::call(env, function, args)
212 .map(|_| ("ok".to_string(), vec![]))
213 .unwrap_or_else(|e| (e.to_string(), vec![]))
214}
215
216fn execute_wasm_call(
217 apply_env: &mut ApplyEnv,
218 _db: &RocksDb,
219 contract: &[u8],
220 function: &str,
221 args: &[Vec<u8>],
222) -> (String, Vec<String>) {
223 let bytecode = match amadeus_runtime::consensus::bic::contract::bytecode(apply_env, contract) {
225 Ok(Some(code)) => code,
226 Ok(None) => return ("account_has_no_bytecode".to_string(), vec![]),
227 Err(e) => return (format!("bytecode_error:{}", e), vec![]),
228 };
229
230 match amadeus_runtime::consensus::wasm::execute(apply_env, &bytecode, function, args) {
232 Ok(result) => {
233 ("ok".to_string(), result.logs)
236 }
237 Err(e) => {
238 (format!("wasm_error:{}", e), vec![])
240 }
241 }
242}
243
244fn parse_epoch_call(
245 function: &str,
246 args: &[Vec<u8>],
247) -> Result<amadeus_runtime::consensus::bic::epoch::EpochCall, String> {
248 use amadeus_runtime::consensus::bic::epoch::EpochCall;
249
250 match function {
251 "submit_sol" => Ok(EpochCall::SubmitSol { sol: args.first().ok_or("missing sol arg")?.clone() }),
252 "set_emission_address" => {
253 let addr_bytes = args.first().ok_or("missing address arg")?;
254 let address = addr_bytes.as_slice().try_into().map_err(|_| "invalid address length")?;
255 Ok(EpochCall::SetEmissionAddress { address })
256 }
257 "slash_trainer" => {
258 let epoch_bytes = args.first().ok_or("missing epoch")?;
259 let epoch = u32::from_le_bytes(epoch_bytes.get(..4).ok_or("invalid epoch")?.try_into().unwrap()) as u64;
260 let malicious_pk = args.get(1).ok_or("missing pk")?.as_slice().try_into().map_err(|_| "invalid pk")?;
261 let signature = args.get(2).ok_or("missing signature")?.clone();
262 let mask = crate::utils::misc::bin_to_bitvec(args.get(3).ok_or("missing mask")?.clone());
263 Ok(EpochCall::SlashTrainer { epoch, malicious_pk, signature, mask, trainers: None })
264 }
265 _ => Err(format!("unknown function: {}", function)),
266 }
267}
268
269fn call_txs_pre(env: &mut ApplyEnv, next_entry: &Entry, txs: &[TxU]) -> Result<(), &'static str> {
270 let epoch = next_entry.header.height / 100_000;
271
272 let entry_signer_key =
273 crate::utils::misc::bcat(&[b"account:", next_entry.header.signer.as_ref(), b":balance:AMA"]);
274 let burn_address_key = crate::utils::misc::bcat(&[
275 b"account:",
276 &amadeus_runtime::consensus::bic::coin::BURN_ADDRESS,
277 b":balance:AMA",
278 ]);
279
280 for tx in txs {
281 let nonce_key = crate::utils::misc::bcat(&[b"account:", tx.tx.signer.as_ref(), b":attribute:nonce"]);
282 let nonce_i64 = i64::try_from(tx.tx.nonce).unwrap_or(i64::MAX);
283 consensus_kv::kv_put(env, &nonce_key, &nonce_i64.to_string().into_bytes())?;
284
285 let bytes = tx.tx_encoded().len() + 32 + 96;
286 let exec_cost = if epoch >= 295 {
287 amadeus_runtime::consensus::bic::coin::to_cents((1 + bytes / 1024) as i128)
288 } else {
289 amadeus_runtime::consensus::bic::coin::to_cents((3 + bytes / 256 * 3) as i128)
290 };
291
292 let signer_balance_key = crate::utils::misc::bcat(&[b"account:", tx.tx.signer.as_ref(), b":balance:AMA"]);
293 consensus_kv::kv_increment(env, &signer_balance_key, -exec_cost)?;
294
295 consensus_kv::kv_increment(env, &entry_signer_key, exec_cost / 2)?;
296 consensus_kv::kv_increment(env, &burn_address_key, exec_cost / 2)?;
297 }
298 Ok(())
299}
300
301fn mutations_to_etf(muts: &[Mutation]) -> Vec<u8> {
303 use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
304 use eetf::{Atom, Binary, List, Map, Term};
305 use std::collections::HashMap;
306
307 let mut etf_list = Vec::new();
308
309 for m in muts {
310 let mut map = HashMap::new();
311
312 match m {
313 Mutation::Put { op: _, key, value } => {
314 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("put")));
315 map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
316 map.insert(Term::Atom(Atom::from("value")), Term::Binary(Binary { bytes: value.clone() }));
317 }
318 Mutation::Delete { op: _, key } => {
319 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("delete")));
320 map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
321 }
322 Mutation::SetBit { op: _, key, value, bloomsize } => {
323 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("set_bit")));
324 map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
325 map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
326 map.insert(Term::Atom(Atom::from("bloomsize")), u64_to_term(*bloomsize));
327 }
328 Mutation::ClearBit { op: _, key, value } => {
329 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("clear_bit")));
330 map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
331 map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
332 }
333 }
334
335 etf_list.push(Term::Map(Map { map }));
336 }
337
338 let list = Term::List(List { elements: etf_list });
339 encode_safe_deterministic(&list)
340}
341
342fn mutations_from_etf(bin: &[u8]) -> Result<Vec<Mutation>, Error> {
343 use crate::utils::misc::TermExt;
344 use eetf::{Atom, Term};
345
346 let term = Term::decode(bin).map_err(|_| Error::WrongType("invalid_etf"))?;
347 let list = match &term {
348 Term::List(l) => &l.elements,
349 _ => return Err(Error::WrongType("not_list")),
350 };
351
352 let mut muts = Vec::new();
353 for elem in list {
354 let map = match elem {
355 Term::Map(m) => &m.map,
356 _ => return Err(Error::WrongType("not_map")),
357 };
358
359 let op = map
360 .get(&Term::Atom(Atom::from("op")))
361 .and_then(|t| match t {
362 Term::Atom(a) => Some(a),
363 _ => None,
364 })
365 .ok_or(Error::Missing("op"))?;
366
367 match op.name.as_str() {
368 "put" => {
369 let key = map
370 .get(&Term::Atom(Atom::from("key")))
371 .and_then(|t| t.get_binary())
372 .ok_or(Error::Missing("key"))?
373 .to_vec();
374 let value = map
375 .get(&Term::Atom(Atom::from("value")))
376 .and_then(|t| t.get_binary())
377 .ok_or(Error::Missing("value"))?
378 .to_vec();
379 muts.push(Mutation::Put { op: vec![], key, value });
380 }
381 "delete" => {
382 let key = map
383 .get(&Term::Atom(Atom::from("key")))
384 .and_then(|t| t.get_binary())
385 .ok_or(Error::Missing("key"))?
386 .to_vec();
387 muts.push(Mutation::Delete { op: vec![], key });
388 }
389 "set_bit" => {
390 let key = map
391 .get(&Term::Atom(Atom::from("key")))
392 .and_then(|t| t.get_binary())
393 .ok_or(Error::Missing("key"))?
394 .to_vec();
395 let value = map
396 .get(&Term::Atom(Atom::from("value")))
397 .and_then(|t| t.get_integer())
398 .map(|i| i as u64)
399 .ok_or(Error::Missing("value"))?;
400 let bloomsize = map
401 .get(&Term::Atom(Atom::from("bloomsize")))
402 .and_then(|t| t.get_integer())
403 .map(|i| i as u64)
404 .ok_or(Error::Missing("bloomsize"))?;
405 muts.push(Mutation::SetBit { op: vec![], key, value, bloomsize });
406 }
407 "clear_bit" => {
408 let key = map
409 .get(&Term::Atom(Atom::from("key")))
410 .and_then(|t| t.get_binary())
411 .ok_or(Error::Missing("key"))?
412 .to_vec();
413 let value = map
414 .get(&Term::Atom(Atom::from("value")))
415 .and_then(|t| t.get_integer())
416 .map(|i| i as u64)
417 .ok_or(Error::Missing("value"))?;
418 muts.push(Mutation::ClearBit { op: vec![], key, value });
419 }
420 _ => return Err(Error::WrongType("unknown_op")),
421 }
422 }
423
424 Ok(muts)
425}
426
427fn call_exit(env: &mut ApplyEnv, next_entry: &Entry) -> Result<(), &'static str> {
429 let vr = next_entry.header.vr.to_vec();
431 let seed_hash = crate::utils::blake3::hash(&vr);
432 env.caller_env.seed = seed_hash.to_vec();
433 let seedf64 = f64::from_le_bytes(seed_hash[0..8].try_into().unwrap_or([0u8; 8]));
435 env.caller_env.seedf64 = seedf64;
436
437 if next_entry.header.height % 1000 == 0 {
439 consensus_kv::kv_put(
440 env,
441 b"bic:epoch:segment_vr_hash",
442 crate::utils::blake3::hash(next_entry.header.vr.as_ref()).as_ref(),
443 )?;
444 }
445
446 if next_entry.header.height % 100_000 == 99_999 {
448 env.caller_env.readonly = true;
450 env.caller_env.tx_hash = vec![];
451 env.caller_env.tx_signer = PublicKey::from([0u8; 48]);
452 env.caller_env.account_caller = vec![];
453 env.caller_env.call_exec_points = 0;
454 env.caller_env.call_exec_points_remaining = 0;
455 env.caller_env.attached_symbol = vec![];
456 env.caller_env.attached_amount = vec![];
457 let _ = amadeus_runtime::consensus::bic::epoch::Epoch.next(env);
458 }
459 Ok(())
460}
461
462pub fn apply_entry(
463 fabric: &Fabric,
464 config: &crate::config::Config,
465 next_entry: &Entry,
466) -> Result<Option<Vec<u8>>, Error> {
467 let Some(curr_h) = fabric.get_temporal_height().ok().flatten() else {
468 return Err(Error::Missing("temporal_height"));
469 };
470
471 if next_entry.header.height != curr_h + 1 {
472 return Err(Error::WrongType("invalid_height"));
473 }
474
475 let txs = &next_entry.txs;
476
477 let entry_vr_b3 = crate::utils::blake3::hash(next_entry.header.vr.as_ref());
479 let mut env = amadeus_runtime::consensus::consensus_apply::make_apply_env(
480 fabric.db(),
481 "contractstate",
482 &next_entry.header.signer,
483 &next_entry.header.prev_hash,
484 next_entry.header.slot,
485 next_entry.header.prev_slot as u64,
486 next_entry.header.height,
487 next_entry.header.height / 100_000,
488 &next_entry.header.vr,
489 &Hash::from(entry_vr_b3),
490 &next_entry.header.dr,
491 )
492 .map_err(Error::Runtime)?;
493
494 call_txs_pre(&mut env, next_entry, txs).map_err(Error::Runtime)?;
496 let mut muts = env.muts.clone();
498 let mut muts_rev = env.muts_rev.clone();
499
500 let mut tx_results = Vec::new();
502 let db = fabric.db();
503 for txu in txs {
504 let (error, logs, m3, m_rev3) = execute_transaction(&mut env, db, txu);
505
506 if error == "ok" {
507 muts.extend(m3);
509 muts_rev.extend(m_rev3);
510 } else {
511 consensus_kv::revert(&mut env).map_err(Error::Runtime)?;
513 }
514
515 tx_results.push(TxResult { error, logs });
516 }
517
518 env.muts.clear();
520 env.muts_rev.clear();
521
522 call_exit(&mut env, next_entry).map_err(Error::Runtime)?;
524
525 let muts_exit = env.muts.clone();
527 let muts_exit_rev = env.muts_rev.clone();
528 muts.extend(muts_exit);
529 muts_rev.extend(muts_exit_rev);
530
531 let mutations_hash = hash_mutations_with_results(&muts, &tx_results);
533
534 env.txn.commit().map_err(crate::utils::rocksdb::Error::from)?;
536
537 let pk = config.get_pk();
539 let sk = config.get_sk();
540 let attestation = Attestation::sign_with(pk.as_ref(), &sk, &next_entry.hash, &Hash::from(mutations_hash))?;
541 let attestation_packed = attestation.to_vecpak_bin();
542
543 fabric.put_attestation(&next_entry.hash, &attestation_packed)?;
545
546 let trainers = fabric.trainers_for_height(next_entry.header.height).ok_or(Error::Missing("trainers_for_height"))?;
548 let is_trainer = trainers.iter().any(|t| t == &pk);
549
550 let seen_time_ms = get_unix_millis_now();
551 fabric.put_entry_seen_time(&next_entry.hash, seen_time_ms)?;
552
553 fabric.set_temporal_hash_height(next_entry)?;
555
556 let muts_bin = mutations_to_etf(&muts);
558 fabric.put_muts(&next_entry.hash, &muts_bin)?;
559 let muts_rev_bin = mutations_to_etf(&muts_rev);
560 fabric.put_muts_rev(&next_entry.hash, &muts_rev_bin)?;
561
562 let entry_bin = next_entry.to_vecpak_bin();
564 fabric.insert_entry(
565 &next_entry.hash,
566 next_entry.header.height,
567 next_entry.header.slot,
568 &entry_bin,
569 seen_time_ms,
570 )?;
571
572 for (tx, result) in next_entry.txs.iter().zip(tx_results.iter()) {
573 let nonce_padded = format!("{:020}", tx.tx.nonce);
574 let key = format!("{}:{}", bs58::encode(&tx.tx.signer).into_string(), nonce_padded);
575 fabric.put_tx_account_nonce(key.as_bytes(), &tx.hash)?;
576
577 let tx_bin = amadeus_utils::vecpak::to_vec(tx).unwrap_or_default();
578 let entry_bin = next_entry.to_vecpak_bin();
579 if let Some(pos) = entry_bin.windows(tx_bin.len()).position(|w| w == tx_bin) {
580 let mut tx_meta = HashMap::new();
582 tx_meta.insert(
583 eetf::Term::Atom(eetf::Atom::from("entry_hash")),
584 eetf::Term::from(eetf::Binary { bytes: next_entry.hash.to_vec() }),
585 );
586 tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("result")), result.to_eetf_term());
587 tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("index_start")), u64_to_term(pos as u64));
588 tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("index_size")), u64_to_term(tx_bin.len() as u64));
589
590 let term = eetf::Term::Map(eetf::Map { map: tx_meta });
591 let tx_meta_bin = encode_safe_deterministic(&term);
592 fabric.put_tx_metadata(tx.hash.as_ref(), &tx_meta_bin)?;
593 }
594 }
595
596 Ok(if is_trainer { Some(attestation_packed) } else { None })
597}
598
599pub fn produce_entry(fabric: &Fabric, config: &crate::config::Config, slot: u64) -> Result<Entry, Error> {
600 let cur_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
601
602 let pk = config.get_pk();
604 let sk = config.get_sk();
605 let next_header = cur_entry.build_next_header(slot, &pk, &sk)?;
606
607 let txs: Vec<crate::consensus::doms::tx::EntryTx> = Vec::new();
609
610 let txs_bin: Vec<u8> = txs.iter().flat_map(|tx| amadeus_utils::vecpak::to_vec(tx).unwrap_or_default()).collect();
612 let root_tx = crate::utils::blake3::hash(&txs_bin);
613
614 let mut header = next_header;
615 header.root_tx = Hash::from(root_tx);
616
617 let header_bin = header.to_vecpak_bin();
619 let header_hash = crate::utils::blake3::hash(&header_bin);
620 let signature = bls::sign(&sk, header_hash.as_ref(), DST_ENTRY)?;
621
622 let entry = Entry {
624 hash: Hash::from([0u8; 32]), header,
626 signature,
627 mask: None,
628 txs,
629 };
630
631 let entry_bin = entry.to_vecpak_bin();
633 let hash = crate::utils::blake3::hash(&entry_bin);
634
635 Ok(Entry {
636 hash: Hash::from(hash),
637 header: entry.header,
638 signature: entry.signature,
639 mask: entry.mask,
640 txs: entry.txs,
641 })
642}
643
644pub fn chain_rewind(fabric: &Fabric, target_hash: &Hash) -> Result<bool, Error> {
645 if !fabric.is_in_chain(target_hash) {
646 return Ok(false);
647 }
648
649 let tip_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
650 let entry = chain_rewind_internal(fabric, &tip_entry, target_hash)?;
651
652 fabric.set_temporal_hash_height(&entry)?;
653
654 let rooted_hash = fabric.get_rooted_hash()?.ok_or(Error::Missing("rooted_tip"))?;
655 if fabric.get_entry_raw(&Hash::from(rooted_hash))?.is_none() {
656 let rooted_height = fabric.get_rooted_height()?.ok_or(Error::Missing("rooted_height"))?;
657 warn!("Rewind rolled back rooted entries from {rooted_height} until {}", entry.header.height);
658 fabric.set_rooted_hash_height(&entry)?;
659 }
660
661 Ok(true)
662}
663
664fn chain_rewind_internal(fabric: &Fabric, current_entry: &Entry, target_hash: &Hash) -> Result<Entry, Error> {
665 let mut current = current_entry.clone();
666
667 loop {
668 let prev_entry = fabric.get_entry_by_hash(¤t.header.prev_hash);
670
671 let db = fabric.db();
673 if let Some(m_rev_new) = chain_muts_rev(fabric, ¤t.hash) {
674 consensus_kv::apply_mutations(db, "contractstate", &m_rev_new).map_err(Error::Runtime)?;
675 }
676
677 fabric.delete_entry(¤t.hash)?;
679 fabric.delete_entry_seen_time(¤t.hash)?;
680
681 let mut height_key = current.header.height.to_string().into_bytes();
682 height_key.push(b':');
683 height_key.extend_from_slice(current.hash.as_ref());
684 fabric.delete_entry_by_height(&height_key)?;
685
686 let mut slot_key = current.header.slot.to_string().into_bytes();
687 slot_key.push(b':');
688 slot_key.extend_from_slice(current.hash.as_ref());
689 fabric.delete_entry_by_slot(&slot_key)?;
690
691 fabric.delete_consensus(¤t.hash)?;
692 fabric.delete_attestation(¤t.hash)?;
693
694 for tx in ¤t.txs {
695 fabric.delete_tx_metadata(&tx.hash)?;
696 let nonce_padded = format!("{:020}", tx.tx.nonce);
697 let key = format!("{}:{}", bs58::encode(&tx.tx.signer).into_string(), nonce_padded);
698 fabric.delete_tx_account_nonce(key.as_bytes())?;
699 }
700
701 if current.hash == *target_hash {
703 return prev_entry.ok_or(Error::Missing("prev_entry_in_rewind"));
704 }
705
706 current = prev_entry.ok_or(Error::Missing("prev_entry_in_rewind"))?;
708 }
709}
710
711pub fn best_by_weight(
712 trainers: &[PublicKey],
713 consensuses: &HashMap<[u8; 32], Consensus>,
714) -> (Option<[u8; 32]>, Option<f64>, Option<Consensus>) {
715 let max_score = trainers.len() as f64;
716 let mut best: Option<([u8; 32], f64, Consensus)> = None;
717
718 for (k, v) in consensuses.iter() {
719 let mask = v.mask();
721 let trainers_signed = if mask.is_empty() { trainers.to_vec() } else { unmask_trainers(&mask, trainers) };
722 let mut score = 0.0;
723 for _pk in trainers_signed {
724 score += 1.0;
726 }
727 score /= max_score;
728
729 match &mut best {
730 None => best = Some((*k, score, v.clone())),
731 Some((_, best_score, _)) if score > *best_score => best = Some((*k, score, v.clone())),
732 _ => {}
733 }
734 }
735
736 match best {
737 Some((k, score, v)) => (Some(k), Some(score), Some(v)),
738 None => (None, None, None),
739 }
740}
741
742#[derive(Debug, Clone)]
743pub struct ScoredEntry {
744 pub entry: Entry,
745 pub mutations_hash: Option<[u8; 32]>,
746 pub score: Option<f64>,
747}
748
749pub fn best_entry_for_height(fabric: &Fabric, height: u64) -> Result<Vec<ScoredEntry>, Error> {
750 let rooted_tip = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
751
752 let entry_bins = fabric.entries_by_height(height as u64)?;
754 let mut entries = Vec::new();
755
756 for entry_bin in entry_bins {
757 let entry = Entry::from_vecpak_bin(&entry_bin)?;
758
759 if entry.header.prev_hash != rooted_tip {
761 continue;
762 }
763
764 let trainers = fabric.trainers_for_height(entry.header.height).ok_or(Error::Missing("trainers_for_height"))?;
766
767 let (mutations_hash, score, _consensus) = fabric.best_consensus_by_entryhash(&trainers, entry.hash.as_ref())?;
769
770 if mutations_hash.is_some() {
771 entries.push(ScoredEntry { entry, mutations_hash, score });
772 }
773 }
774
775 entries.sort_by(|a, b| {
777 let score_cmp = b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal);
778 if score_cmp != std::cmp::Ordering::Equal {
779 return score_cmp;
780 }
781
782 let slot_cmp = a.entry.header.slot.cmp(&b.entry.header.slot);
783 if slot_cmp != std::cmp::Ordering::Equal {
784 return slot_cmp;
785 }
786
787 let mask_cmp = a.entry.mask.is_none().cmp(&b.entry.mask.is_none());
788 if mask_cmp != std::cmp::Ordering::Equal {
789 return mask_cmp;
790 }
791
792 a.entry.hash.cmp(&b.entry.hash)
793 });
794
795 Ok(entries)
796}
797
798pub fn proc_consensus(fabric: &Fabric) -> Result<(), Error> {
799 if fabric.get_temporal_entry()?.is_none() {
801 return Ok(());
802 }
803
804 let initial_rooted_hash = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
805
806 loop {
807 let entry_root = fabric.get_rooted_entry()?.ok_or(Error::Missing("rooted_tip"))?;
808 let entry_temp = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
809 let height_root = entry_root.header.height;
810 let height_temp = entry_temp.header.height;
811
812 if height_root >= height_temp {
814 debug!(
815 "proc_consensus: rooted_height {} >= temporal_height {}, nothing to process",
816 height_root, height_temp
817 );
818 break;
819 }
820
821 let next_height = height_root + 1;
822
823 let next_entries = best_entry_for_height(fabric, next_height)?;
824
825 let Some(best_entry_info) = next_entries.first() else {
826 break;
827 };
828 let score = best_entry_info.score.unwrap_or(0.0);
838 let best_entry = &best_entry_info.entry;
839
840 if score < 0.67 {
842 warn!(
843 "proc_consensus: insufficient consensus score {:.2} (need 0.67) for entry {} at height {}",
844 score,
845 bs58::encode(&best_entry.hash).into_string(),
846 best_entry.header.height
847 );
848 break;
849 }
850
851 let mutations_hash = best_entry_info.mutations_hash.unwrap();
852
853 let my_attestation = fabric.my_attestation_by_entryhash(best_entry.hash.as_ref()).ok().flatten();
855
856 match my_attestation {
857 None => {
858 warn!(
860 "proc_consensus softfork: rewind to entry {} height {}",
861 bs58::encode(&best_entry.hash).into_string(),
862 best_entry.header.height
863 );
864 let rewind_hash = match best_entry_for_height(fabric, next_height - 1)?.first() {
866 Some(prev_best) => prev_best.entry.hash,
867 None => Hash::from(fabric.get_temporal_hash()?.unwrap_or([0u8; 32])),
868 };
869 chain_rewind(fabric, &rewind_hash)?;
870 continue; }
872 Some(my_att) => {
873 if mutations_hash != my_att.mutations_hash {
874 warn!(
875 "EMERGENCY: state divergence at height {}: our mutations {} != consensus {}, halting",
876 best_entry.header.height,
877 bs58::encode(&my_att.mutations_hash).into_string(),
878 bs58::encode(&mutations_hash).into_string()
879 );
880 if let Some(prev_best) = best_entry_for_height(fabric, next_height - 1)?.first() {
882 let _ = chain_rewind(fabric, &prev_best.entry.hash);
883 }
884 break;
885 } else {
886 info!(
888 "proc_consensus: rooting entry {} at height {} with score {:.2}",
889 bs58::encode(&best_entry.hash).into_string(),
890 best_entry.header.height,
891 score
892 );
893 fabric.set_rooted_hash_height(best_entry)?;
894 }
896 }
897 }
898 }
899
900 let final_rooted_hash = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
902 if final_rooted_hash != initial_rooted_hash {
903 info!(
904 "proc_consensus: rooted tip changed from {} to {}",
905 bs58::encode(&initial_rooted_hash).into_string(),
906 bs58::encode(&final_rooted_hash).into_string()
907 );
908 }
910
911 Ok(())
912}
913
914pub fn validate_next_entry(current_entry: &Entry, next_entry: &Entry) -> Result<(), Error> {
915 let ceh = ¤t_entry.header;
916 let neh = &next_entry.header;
917
918 if ceh.slot as i64 != neh.prev_slot {
920 return Err(Error::WrongType("invalid_slot"));
921 }
922
923 if ceh.height != (neh.height - 1) {
925 return Err(Error::WrongType("invalid_height"));
926 }
927
928 if current_entry.hash != neh.prev_hash {
930 return Err(Error::WrongType("invalid_hash"));
931 }
932
933 let expected_dr = crate::utils::blake3::hash(ceh.dr.as_ref());
935 if expected_dr != neh.dr {
936 return Err(Error::WrongType("invalid_dr"));
937 }
938
939 if bls::verify(&neh.signer, &neh.vr, &*ceh.vr, DST_VRF).is_err() {
941 return Err(Error::InvalidSignature);
942 }
943
944 Ok(())
948}
949
950fn is_quorum_synced_off_by_x(fabric: &Fabric, x: u64) -> bool {
954 let temporal_height = fabric.get_temporal_height().ok().flatten().unwrap_or(0);
956 let rooted_height = fabric.get_rooted_height().ok().flatten().unwrap_or(0);
957
958 temporal_height.saturating_sub(rooted_height) <= x
960}
961
962pub fn delete_transactions_from_pool(_txs: &[crate::consensus::doms::tx::EntryTx]) {
963 }
971
972#[derive(Debug, Clone)]
973pub struct SoftforkSettings {
974 pub softfork_hash: Vec<Hash>,
975 pub softfork_deny_hash: Vec<Hash>,
976}
977
978pub fn get_softfork_settings() -> SoftforkSettings {
979 SoftforkSettings { softfork_hash: Vec::new(), softfork_deny_hash: Vec::new() }
982}
983
984pub async fn proc_entries(fabric: &Fabric, config: &crate::config::Config, ctx: &crate::Context) -> Result<(), Error> {
985 if fabric.get_temporal_entry()?.is_none() {
987 return Ok(());
988 }
989
990 let softfork_settings = get_softfork_settings();
991
992 loop {
994 let cur_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
995 let cur_slot = cur_entry.header.slot;
996 let next_height = cur_entry.header.height + 1;
997
998 let mut next_entries: Vec<Entry> = fabric
1000 .entries_by_height(next_height as u64)?
1001 .into_iter()
1002 .filter_map(|entry_bin| Entry::from_vecpak_bin(&entry_bin).ok())
1003 .filter(|next_entry| {
1004 fabric.validate_entry_slot_trainer(next_entry, cur_slot)
1006 && !softfork_settings.softfork_deny_hash.contains(&next_entry.hash)
1007 && validate_next_entry(&cur_entry, next_entry).is_ok()
1008 })
1009 .collect();
1010
1011 next_entries.sort_by_key(|entry| {
1013 (
1014 softfork_settings.softfork_hash.contains(&entry.hash), entry.header.slot,
1016 entry.mask.is_some(), entry.hash,
1018 )
1019 });
1020
1021 let Some(entry) = next_entries.first() else {
1023 return Ok(());
1024 };
1025
1026 let attestation_packed = apply_entry(fabric, config, entry)?;
1028
1029 debug!("Applied entry {} at height {}", bs58::encode(&entry.hash).into_string(), entry.header.height);
1031
1032 if let Some(attestation_packed) = attestation_packed {
1034 if is_quorum_synced_off_by_x(fabric, 6) {
1035 broadcast_attestation(ctx, &attestation_packed, &entry.hash).await;
1036 }
1037 }
1038
1039 delete_transactions_from_pool(&entry.txs);
1041
1042 }
1044}
1045
1046async fn broadcast_attestation(ctx: &crate::Context, attestation_packed: &[u8], entry_hash: &Hash) {
1048 use crate::consensus::doms::attestation::{Attestation, EventAttestation};
1049
1050 let Some(attestation) = Attestation::from_vecpak_bin(attestation_packed) else {
1051 warn!("failed to decode attestation for broadcast");
1052 return;
1053 };
1054
1055 let event_att = Protocol::EventAttestation(EventAttestation::new(vec![attestation]));
1056
1057 if let Ok(peers) = ctx.peers.get_all().await {
1058 for peer in peers {
1059 let _ = event_att.send_to_with_metrics(ctx, peer.ip).await;
1060 }
1061 }
1062
1063 for seed_ip in &ctx.config.seed_ips {
1064 let _ = event_att.send_to_with_metrics(ctx, *seed_ip).await;
1065 }
1066
1067 debug!("Broadcasted attestation for entry {}", bs58::encode(entry_hash).into_string());
1068}
1069
1070pub fn hash_mutations_with_results(muts: &[Mutation], results: &[TxResult]) -> [u8; 32] {
1072 use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
1073 use eetf::{Atom, Binary, List, Map, Term};
1074 use std::collections::HashMap;
1075
1076 let mut etf_list = Vec::new();
1077
1078 for result in results {
1079 let mut map = HashMap::new();
1080 map.insert(Term::Atom(Atom::from("error")), Term::Atom(Atom::from(result.error.as_str())));
1081 etf_list.push(Term::Map(Map { map }));
1082 }
1083
1084 for m in muts {
1085 let mut map = HashMap::new();
1086
1087 match m {
1088 Mutation::Put { op: _, key, value } => {
1089 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("put")));
1090 map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
1091 map.insert(Term::Atom(Atom::from("value")), Term::Binary(Binary { bytes: value.clone() }));
1092 }
1093 Mutation::Delete { op: _, key } => {
1094 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("delete")));
1095 map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
1096 }
1097 Mutation::SetBit { op: _, key, value, bloomsize } => {
1098 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("set_bit")));
1099 map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
1100 map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
1101 map.insert(Term::Atom(Atom::from("bloomsize")), u64_to_term(*bloomsize));
1102 }
1103 Mutation::ClearBit { op: _, key, value } => {
1104 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("clear_bit")));
1105 map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
1106 map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
1107 }
1108 }
1109
1110 etf_list.push(Term::Map(Map { map }));
1111 }
1112
1113 let list_term = Term::List(List { elements: etf_list });
1115
1116 let encoded = encode_safe_deterministic(&list_term);
1118
1119 amadeus_utils::blake3::hash(&encoded)
1121}