Skip to main content

amadeus_node/consensus/
consensus.rs

1use crate::consensus::doms::attestation::Attestation;
2use crate::consensus::doms::entry::Entry;
3use crate::consensus::doms::tx::TxU;
4use crate::consensus::fabric;
5use crate::consensus::fabric::Fabric;
6use crate::node::protocol::Protocol;
7use crate::utils::bls12_381 as bls;
8use crate::utils::misc::{bin_to_bitvec, get_unix_millis_now};
9use crate::utils::rocksdb::RocksDb;
10use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
11use crate::utils::{Hash, PublicKey, Signature};
12use amadeus_runtime::consensus::consensus_apply::ApplyEnv;
13use amadeus_runtime::consensus::consensus_kv;
14use amadeus_runtime::consensus::consensus_muts::Mutation;
15use amadeus_runtime::consensus::unmask_trainers;
16use amadeus_utils::constants::{DST_ENTRY, DST_VRF};
17use amadeus_utils::vecpak::{Term, VecpakExt, decode};
18use bitvec::prelude::*;
19use std::collections::HashMap;
20use tracing::{debug, info, warn};
21
22#[derive(Debug, thiserror::Error)]
23pub enum Error {
24    #[error("wrong type: {0}")]
25    WrongType(&'static str),
26    #[error("missing: {0}")]
27    Missing(&'static str),
28    #[error("invalid signature")]
29    InvalidSignature,
30    #[error("not implemented: {0}")]
31    NotImplemented(&'static str),
32    #[error("runtime error: {0}")]
33    Runtime(&'static str),
34    #[error(transparent)]
35    EtfDecode(#[from] eetf::DecodeError),
36    #[error(transparent)]
37    EtfEncode(#[from] eetf::EncodeError),
38    #[error(transparent)]
39    Bls(#[from] bls::Error),
40    #[error(transparent)]
41    RocksDb(#[from] crate::utils::rocksdb::Error),
42    #[error(transparent)]
43    Fabric(#[from] fabric::Error),
44    #[error(transparent)]
45    Attestation(#[from] crate::consensus::doms::attestation::Error),
46    #[error(transparent)]
47    Entry(#[from] crate::consensus::doms::entry::Error),
48    #[error("bad format: {0}")]
49    BadFormat(&'static str),
50}
51
52/// Nested aggsig structure matching Elixir's format
53#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
54pub struct Aggsig {
55    #[serde(with = "serde_bytes")]
56    pub mask: Vec<u8>,
57    #[serde(with = "serde_bytes")]
58    pub aggsig: Vec<u8>,
59    #[serde(default)]
60    pub mask_size: u64,
61    #[serde(default)]
62    pub mask_set_size: u64,
63}
64
65#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
66pub struct Consensus {
67    pub entry_hash: Hash,
68    pub mutations_hash: Hash,
69    pub aggsig: Aggsig,
70}
71
72impl Consensus {
73    /// Decode from vecpak format (data from Elixir now uses vecpak)
74    pub fn from_vecpak_bin(bin: &[u8]) -> Result<Self, Error> {
75        let map = decode(bin)
76            .map_err(|_| Error::BadFormat("consensus_packed"))?
77            .get_proplist_map()
78            .ok_or(Error::BadFormat("consensus_packed"))?;
79        Self::from_vecpak_map(&map)
80    }
81
82    /// Parse Consensus from vecpak term
83    pub fn from_vecpak_map(map: &amadeus_utils::vecpak::PropListMap) -> Result<Self, Error> {
84        let entry_hash: Hash = map.get_binary(b"entry_hash").ok_or(Error::BadFormat("consensus.entry_hash"))?;
85        let mutations_hash: Hash =
86            map.get_binary(b"mutations_hash").ok_or(Error::BadFormat("consensus.mutations_hash"))?;
87
88        // Parse nested aggsig structure
89        let aggsig_map = map.get_proplist_map(b"aggsig").ok_or(Error::BadFormat("consensus.aggsig"))?;
90        let aggsig = Aggsig {
91            mask: aggsig_map.get_binary(b"mask").unwrap_or_default(),
92            aggsig: aggsig_map.get_binary(b"aggsig").ok_or(Error::BadFormat("consensus.aggsig.aggsig"))?,
93            mask_size: aggsig_map.get_integer(b"mask_size").unwrap_or(0),
94            mask_set_size: aggsig_map.get_integer(b"mask_set_size").unwrap_or(0),
95        };
96
97        Ok(Self { entry_hash, mutations_hash, aggsig })
98    }
99
100    pub fn to_vecpak_term(&self) -> Term {
101        // Build nested aggsig structure
102        let mut aggsig_pairs = vec![
103            (Term::Binary(b"mask".to_vec()), Term::Binary(self.aggsig.mask.clone())),
104            (Term::Binary(b"aggsig".to_vec()), Term::Binary(self.aggsig.aggsig.clone())),
105        ];
106        if self.aggsig.mask_size > 0 {
107            aggsig_pairs.push((Term::Binary(b"mask_size".to_vec()), Term::VarInt(self.aggsig.mask_size as i128)));
108        }
109        if self.aggsig.mask_set_size > 0 {
110            aggsig_pairs
111                .push((Term::Binary(b"mask_set_size".to_vec()), Term::VarInt(self.aggsig.mask_set_size as i128)));
112        }
113
114        Term::PropList(vec![
115            (Term::Binary(b"aggsig".to_vec()), Term::PropList(aggsig_pairs)),
116            (Term::Binary(b"entry_hash".to_vec()), Term::Binary(self.entry_hash.to_vec())),
117            (Term::Binary(b"mutations_hash".to_vec()), Term::Binary(self.mutations_hash.to_vec())),
118        ])
119    }
120
121    /// Get mask as BitVec for compatibility
122    pub fn mask(&self) -> BitVec<u8, Msb0> {
123        bin_to_bitvec(self.aggsig.mask.clone())
124    }
125
126    /// Get signature for compatibility
127    pub fn signature(&self) -> Option<Signature> {
128        Signature::try_from(self.aggsig.aggsig.as_slice()).ok()
129    }
130}
131
132pub fn chain_muts_rev(fabric: &Fabric, hash: &Hash) -> Option<Vec<Mutation>> {
133    let bin = fabric.get_muts_rev(hash).ok()??;
134    mutations_from_etf(&bin).ok()
135}
136
137pub fn chain_muts(fabric: &Fabric, hash: &Hash) -> Option<Vec<Mutation>> {
138    let bin = fabric.get_muts(hash).ok()??;
139    mutations_from_etf(&bin).ok()
140}
141
142#[derive(Debug, Clone)]
143pub struct TxResult {
144    pub error: String,
145    pub logs: Vec<String>,
146}
147
148impl TxResult {
149    /// Convert TxResult to ETF term matching Elixir format: %{error: :ok, logs: []}
150    pub fn to_eetf_term(&self) -> eetf::Term {
151        let mut map = HashMap::new();
152
153        // error field as atom
154        map.insert(
155            eetf::Term::Atom(eetf::Atom::from("error")),
156            eetf::Term::Atom(eetf::Atom::from(self.error.as_str())),
157        );
158
159        // logs field as list of binaries
160        let logs_terms: Vec<eetf::Term> =
161            self.logs.iter().map(|log| eetf::Term::from(eetf::Binary { bytes: log.as_bytes().to_vec() })).collect();
162        map.insert(eetf::Term::Atom(eetf::Atom::from("logs")), eetf::Term::from(eetf::List { elements: logs_terms }));
163
164        eetf::Term::from(eetf::Map { map })
165    }
166}
167
168fn execute_transaction(
169    env: &mut ApplyEnv,
170    db: &RocksDb,
171    txu: &TxU,
172) -> (String, Vec<String>, Vec<Mutation>, Vec<Mutation>) {
173    let action = &txu.tx.action;
174
175    env.muts.clear();
176    env.muts_rev.clear();
177
178    env.caller_env.tx_hash = txu.hash.to_vec();
179    env.caller_env.tx_signer = txu.tx.signer;
180    env.caller_env.account_caller = txu.tx.signer.to_vec();
181    env.caller_env.attached_symbol = action.attached_symbol.clone().unwrap_or_default();
182    env.caller_env.attached_amount = action.attached_amount.clone().unwrap_or_default();
183
184    let contract_bytes = txu.contract_bytes();
185    let function_str = std::str::from_utf8(&action.function).unwrap_or("");
186    let (error, logs) = match contract_bytes.as_slice() {
187        b"Epoch" => execute_epoch_call(env, function_str, &action.args),
188        b"Coin" => execute_coin_call(env, function_str, &action.args),
189        b"Contract" => execute_contract_call(env, function_str, &action.args),
190        contract if contract.len() == 48 => execute_wasm_call(env, db, contract, function_str, &action.args),
191        _ => ("invalid_contract".to_string(), vec![]),
192    };
193
194    (error, logs, env.muts.clone(), env.muts_rev.clone())
195}
196
197fn execute_epoch_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
198    parse_epoch_call(function, args)
199        .and_then(|call| amadeus_runtime::consensus::bic::epoch::Epoch.call(env, call).map_err(|e| e.to_string()))
200        .map(|_| ("ok".to_string(), vec![]))
201        .unwrap_or_else(|e| (e, vec![]))
202}
203
204fn execute_coin_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
205    amadeus_runtime::consensus::bic::coin::call(env, function, args)
206        .map(|_| ("ok".to_string(), vec![]))
207        .unwrap_or_else(|e| (e.to_string(), vec![]))
208}
209
210fn execute_contract_call(env: &mut ApplyEnv, function: &str, args: &[Vec<u8>]) -> (String, Vec<String>) {
211    amadeus_runtime::consensus::bic::contract::call(env, function, args)
212        .map(|_| ("ok".to_string(), vec![]))
213        .unwrap_or_else(|e| (e.to_string(), vec![]))
214}
215
216fn execute_wasm_call(
217    apply_env: &mut ApplyEnv,
218    _db: &RocksDb,
219    contract: &[u8],
220    function: &str,
221    args: &[Vec<u8>],
222) -> (String, Vec<String>) {
223    // check if contract has bytecode
224    let bytecode = match amadeus_runtime::consensus::bic::contract::bytecode(apply_env, contract) {
225        Ok(Some(code)) => code,
226        Ok(None) => return ("account_has_no_bytecode".to_string(), vec![]),
227        Err(e) => return (format!("bytecode_error:{}", e), vec![]),
228    };
229
230    // execute wasm using the runtime from amadeus-runtime
231    match amadeus_runtime::consensus::wasm::execute(apply_env, &bytecode, function, args) {
232        Ok(result) => {
233            // mutations are already updated in apply_env by the runtime
234            // return success with logs
235            ("ok".to_string(), result.logs)
236        }
237        Err(e) => {
238            // wasm execution failed
239            (format!("wasm_error:{}", e), vec![])
240        }
241    }
242}
243
244fn parse_epoch_call(
245    function: &str,
246    args: &[Vec<u8>],
247) -> Result<amadeus_runtime::consensus::bic::epoch::EpochCall, String> {
248    use amadeus_runtime::consensus::bic::epoch::EpochCall;
249
250    match function {
251        "submit_sol" => Ok(EpochCall::SubmitSol { sol: args.first().ok_or("missing sol arg")?.clone() }),
252        "set_emission_address" => {
253            let addr_bytes = args.first().ok_or("missing address arg")?;
254            let address = addr_bytes.as_slice().try_into().map_err(|_| "invalid address length")?;
255            Ok(EpochCall::SetEmissionAddress { address })
256        }
257        "slash_trainer" => {
258            let epoch_bytes = args.first().ok_or("missing epoch")?;
259            let epoch = u32::from_le_bytes(epoch_bytes.get(..4).ok_or("invalid epoch")?.try_into().unwrap()) as u64;
260            let malicious_pk = args.get(1).ok_or("missing pk")?.as_slice().try_into().map_err(|_| "invalid pk")?;
261            let signature = args.get(2).ok_or("missing signature")?.clone();
262            let mask = crate::utils::misc::bin_to_bitvec(args.get(3).ok_or("missing mask")?.clone());
263            Ok(EpochCall::SlashTrainer { epoch, malicious_pk, signature, mask, trainers: None })
264        }
265        _ => Err(format!("unknown function: {}", function)),
266    }
267}
268
269fn call_txs_pre(env: &mut ApplyEnv, next_entry: &Entry, txs: &[TxU]) -> Result<(), &'static str> {
270    let epoch = next_entry.header.height / 100_000;
271
272    let entry_signer_key =
273        crate::utils::misc::bcat(&[b"account:", next_entry.header.signer.as_ref(), b":balance:AMA"]);
274    let burn_address_key = crate::utils::misc::bcat(&[
275        b"account:",
276        &amadeus_runtime::consensus::bic::coin::BURN_ADDRESS,
277        b":balance:AMA",
278    ]);
279
280    for tx in txs {
281        let nonce_key = crate::utils::misc::bcat(&[b"account:", tx.tx.signer.as_ref(), b":attribute:nonce"]);
282        let nonce_i64 = i64::try_from(tx.tx.nonce).unwrap_or(i64::MAX);
283        consensus_kv::kv_put(env, &nonce_key, &nonce_i64.to_string().into_bytes())?;
284
285        let bytes = tx.tx_encoded().len() + 32 + 96;
286        let exec_cost = if epoch >= 295 {
287            amadeus_runtime::consensus::bic::coin::to_cents((1 + bytes / 1024) as i128)
288        } else {
289            amadeus_runtime::consensus::bic::coin::to_cents((3 + bytes / 256 * 3) as i128)
290        };
291
292        let signer_balance_key = crate::utils::misc::bcat(&[b"account:", tx.tx.signer.as_ref(), b":balance:AMA"]);
293        consensus_kv::kv_increment(env, &signer_balance_key, -exec_cost)?;
294
295        consensus_kv::kv_increment(env, &entry_signer_key, exec_cost / 2)?;
296        consensus_kv::kv_increment(env, &burn_address_key, exec_cost / 2)?;
297    }
298    Ok(())
299}
300
301/// Convert Mutation type to ETF
302fn mutations_to_etf(muts: &[Mutation]) -> Vec<u8> {
303    use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
304    use eetf::{Atom, Binary, List, Map, Term};
305    use std::collections::HashMap;
306
307    let mut etf_list = Vec::new();
308
309    for m in muts {
310        let mut map = HashMap::new();
311
312        match m {
313            Mutation::Put { op: _, key, value } => {
314                map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("put")));
315                map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
316                map.insert(Term::Atom(Atom::from("value")), Term::Binary(Binary { bytes: value.clone() }));
317            }
318            Mutation::Delete { op: _, key } => {
319                map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("delete")));
320                map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
321            }
322            Mutation::SetBit { op: _, key, value, bloomsize } => {
323                map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("set_bit")));
324                map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
325                map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
326                map.insert(Term::Atom(Atom::from("bloomsize")), u64_to_term(*bloomsize));
327            }
328            Mutation::ClearBit { op: _, key, value } => {
329                map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("clear_bit")));
330                map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
331                map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
332            }
333        }
334
335        etf_list.push(Term::Map(Map { map }));
336    }
337
338    let list = Term::List(List { elements: etf_list });
339    encode_safe_deterministic(&list)
340}
341
342fn mutations_from_etf(bin: &[u8]) -> Result<Vec<Mutation>, Error> {
343    use crate::utils::misc::TermExt;
344    use eetf::{Atom, Term};
345
346    let term = Term::decode(bin).map_err(|_| Error::WrongType("invalid_etf"))?;
347    let list = match &term {
348        Term::List(l) => &l.elements,
349        _ => return Err(Error::WrongType("not_list")),
350    };
351
352    let mut muts = Vec::new();
353    for elem in list {
354        let map = match elem {
355            Term::Map(m) => &m.map,
356            _ => return Err(Error::WrongType("not_map")),
357        };
358
359        let op = map
360            .get(&Term::Atom(Atom::from("op")))
361            .and_then(|t| match t {
362                Term::Atom(a) => Some(a),
363                _ => None,
364            })
365            .ok_or(Error::Missing("op"))?;
366
367        match op.name.as_str() {
368            "put" => {
369                let key = map
370                    .get(&Term::Atom(Atom::from("key")))
371                    .and_then(|t| t.get_binary())
372                    .ok_or(Error::Missing("key"))?
373                    .to_vec();
374                let value = map
375                    .get(&Term::Atom(Atom::from("value")))
376                    .and_then(|t| t.get_binary())
377                    .ok_or(Error::Missing("value"))?
378                    .to_vec();
379                muts.push(Mutation::Put { op: vec![], key, value });
380            }
381            "delete" => {
382                let key = map
383                    .get(&Term::Atom(Atom::from("key")))
384                    .and_then(|t| t.get_binary())
385                    .ok_or(Error::Missing("key"))?
386                    .to_vec();
387                muts.push(Mutation::Delete { op: vec![], key });
388            }
389            "set_bit" => {
390                let key = map
391                    .get(&Term::Atom(Atom::from("key")))
392                    .and_then(|t| t.get_binary())
393                    .ok_or(Error::Missing("key"))?
394                    .to_vec();
395                let value = map
396                    .get(&Term::Atom(Atom::from("value")))
397                    .and_then(|t| t.get_integer())
398                    .map(|i| i as u64)
399                    .ok_or(Error::Missing("value"))?;
400                let bloomsize = map
401                    .get(&Term::Atom(Atom::from("bloomsize")))
402                    .and_then(|t| t.get_integer())
403                    .map(|i| i as u64)
404                    .ok_or(Error::Missing("bloomsize"))?;
405                muts.push(Mutation::SetBit { op: vec![], key, value, bloomsize });
406            }
407            "clear_bit" => {
408                let key = map
409                    .get(&Term::Atom(Atom::from("key")))
410                    .and_then(|t| t.get_binary())
411                    .ok_or(Error::Missing("key"))?
412                    .to_vec();
413                let value = map
414                    .get(&Term::Atom(Atom::from("value")))
415                    .and_then(|t| t.get_integer())
416                    .map(|i| i as u64)
417                    .ok_or(Error::Missing("value"))?;
418                muts.push(Mutation::ClearBit { op: vec![], key, value });
419            }
420            _ => return Err(Error::WrongType("unknown_op")),
421        }
422    }
423
424    Ok(muts)
425}
426
427/// Exit logic: segment VR updates and epoch transitions
428fn call_exit(env: &mut ApplyEnv, next_entry: &Entry) -> Result<(), &'static str> {
429    // seed random (matches Elixir: seed_random(env.entry_vr, "", "", ""))
430    let vr = next_entry.header.vr.to_vec();
431    let seed_hash = crate::utils::blake3::hash(&vr);
432    env.caller_env.seed = seed_hash.to_vec();
433    // extract f64 from first 8 bytes of seed_hash in little-endian
434    let seedf64 = f64::from_le_bytes(seed_hash[0..8].try_into().unwrap_or([0u8; 8]));
435    env.caller_env.seedf64 = seedf64;
436
437    // Update segment VR hash every 1000 blocks
438    if next_entry.header.height % 1000 == 0 {
439        consensus_kv::kv_put(
440            env,
441            b"bic:epoch:segment_vr_hash",
442            crate::utils::blake3::hash(next_entry.header.vr.as_ref()).as_ref(),
443        )?;
444    }
445
446    // Epoch transition every 100k blocks
447    if next_entry.header.height % 100_000 == 99_999 {
448        // Update caller_env for epoch transition (readonly mode)
449        env.caller_env.readonly = true;
450        env.caller_env.tx_hash = vec![];
451        env.caller_env.tx_signer = PublicKey::from([0u8; 48]);
452        env.caller_env.account_caller = vec![];
453        env.caller_env.call_exec_points = 0;
454        env.caller_env.call_exec_points_remaining = 0;
455        env.caller_env.attached_symbol = vec![];
456        env.caller_env.attached_amount = vec![];
457        let _ = amadeus_runtime::consensus::bic::epoch::Epoch.next(env);
458    }
459    Ok(())
460}
461
462pub fn apply_entry(
463    fabric: &Fabric,
464    config: &crate::config::Config,
465    next_entry: &Entry,
466) -> Result<Option<Vec<u8>>, Error> {
467    let Some(curr_h) = fabric.get_temporal_height().ok().flatten() else {
468        return Err(Error::Missing("temporal_height"));
469    };
470
471    if next_entry.header.height != curr_h + 1 {
472        return Err(Error::WrongType("invalid_height"));
473    }
474
475    let txs = &next_entry.txs;
476
477    // Create transaction and ApplyEnv
478    let entry_vr_b3 = crate::utils::blake3::hash(next_entry.header.vr.as_ref());
479    let mut env = amadeus_runtime::consensus::consensus_apply::make_apply_env(
480        fabric.db(),
481        "contractstate",
482        &next_entry.header.signer,
483        &next_entry.header.prev_hash,
484        next_entry.header.slot,
485        next_entry.header.prev_slot as u64,
486        next_entry.header.height,
487        next_entry.header.height / 100_000,
488        &next_entry.header.vr,
489        &Hash::from(entry_vr_b3),
490        &next_entry.header.dr,
491    )
492    .map_err(Error::Runtime)?;
493
494    // pre-process transactions (nonce updates, gas deduction)
495    call_txs_pre(&mut env, next_entry, txs).map_err(Error::Runtime)?;
496    // Collect mutations from pre-processing AFTER call_txs_pre
497    let mut muts = env.muts.clone();
498    let mut muts_rev = env.muts_rev.clone();
499
500    // execute transactions (mutations include gas)
501    let mut tx_results = Vec::new();
502    let db = fabric.db();
503    for txu in txs {
504        let (error, logs, m3, m_rev3) = execute_transaction(&mut env, db, txu);
505
506        if error == "ok" {
507            // success: add all mutations
508            muts.extend(m3);
509            muts_rev.extend(m_rev3);
510        } else {
511            // failure: apply reverse mutations
512            consensus_kv::revert(&mut env).map_err(Error::Runtime)?;
513        }
514
515        tx_results.push(TxResult { error, logs });
516    }
517
518    // Clear mutations before call_exit to avoid collecting the last transaction's mutations twice
519    env.muts.clear();
520    env.muts_rev.clear();
521
522    // call exit logic (segment VR updates, epoch transitions)
523    call_exit(&mut env, next_entry).map_err(Error::Runtime)?;
524
525    // get exit mutations and combine
526    let muts_exit = env.muts.clone();
527    let muts_exit_rev = env.muts_rev.clone();
528    muts.extend(muts_exit);
529    muts_rev.extend(muts_exit_rev);
530
531    // Hash results + mutations
532    let mutations_hash = hash_mutations_with_results(&muts, &tx_results);
533
534    // Commit the transaction
535    env.txn.commit().map_err(crate::utils::rocksdb::Error::from)?;
536
537    // sign attestation
538    let pk = config.get_pk();
539    let sk = config.get_sk();
540    let attestation = Attestation::sign_with(pk.as_ref(), &sk, &next_entry.hash, &Hash::from(mutations_hash))?;
541    let attestation_packed = attestation.to_vecpak_bin();
542
543    // store my attestation
544    fabric.put_attestation(&next_entry.hash, &attestation_packed)?;
545
546    // check if we're a trainer for this height
547    let trainers = fabric.trainers_for_height(next_entry.header.height).ok_or(Error::Missing("trainers_for_height"))?;
548    let is_trainer = trainers.iter().any(|t| t == &pk);
549
550    let seen_time_ms = get_unix_millis_now();
551    fabric.put_entry_seen_time(&next_entry.hash, seen_time_ms)?;
552
553    // update chain tip
554    fabric.set_temporal_hash_height(next_entry)?;
555
556    // store mutations and reverse mutations for potential rewind
557    let muts_bin = mutations_to_etf(&muts);
558    fabric.put_muts(&next_entry.hash, &muts_bin)?;
559    let muts_rev_bin = mutations_to_etf(&muts_rev);
560    fabric.put_muts_rev(&next_entry.hash, &muts_rev_bin)?;
561
562    // store entry itself and index it (fabric.insert_entry handles both)
563    let entry_bin = next_entry.to_vecpak_bin();
564    fabric.insert_entry(
565        &next_entry.hash,
566        next_entry.header.height,
567        next_entry.header.slot,
568        &entry_bin,
569        seen_time_ms,
570    )?;
571
572    for (tx, result) in next_entry.txs.iter().zip(tx_results.iter()) {
573        let nonce_padded = format!("{:020}", tx.tx.nonce);
574        let key = format!("{}:{}", bs58::encode(&tx.tx.signer).into_string(), nonce_padded);
575        fabric.put_tx_account_nonce(key.as_bytes(), &tx.hash)?;
576
577        let tx_bin = amadeus_utils::vecpak::to_vec(tx).unwrap_or_default();
578        let entry_bin = next_entry.to_vecpak_bin();
579        if let Some(pos) = entry_bin.windows(tx_bin.len()).position(|w| w == tx_bin) {
580            // Build tx metadata map matching Elixir structure
581            let mut tx_meta = HashMap::new();
582            tx_meta.insert(
583                eetf::Term::Atom(eetf::Atom::from("entry_hash")),
584                eetf::Term::from(eetf::Binary { bytes: next_entry.hash.to_vec() }),
585            );
586            tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("result")), result.to_eetf_term());
587            tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("index_start")), u64_to_term(pos as u64));
588            tx_meta.insert(eetf::Term::Atom(eetf::Atom::from("index_size")), u64_to_term(tx_bin.len() as u64));
589
590            let term = eetf::Term::Map(eetf::Map { map: tx_meta });
591            let tx_meta_bin = encode_safe_deterministic(&term);
592            fabric.put_tx_metadata(tx.hash.as_ref(), &tx_meta_bin)?;
593        }
594    }
595
596    Ok(if is_trainer { Some(attestation_packed) } else { None })
597}
598
599pub fn produce_entry(fabric: &Fabric, config: &crate::config::Config, slot: u64) -> Result<Entry, Error> {
600    let cur_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
601
602    // build next header
603    let pk = config.get_pk();
604    let sk = config.get_sk();
605    let next_header = cur_entry.build_next_header(slot, &pk, &sk)?;
606
607    // TODO: grab transactions from TXPool
608    let txs: Vec<crate::consensus::doms::tx::EntryTx> = Vec::new();
609
610    // Serialize all transactions to compute root_tx hash
611    let txs_bin: Vec<u8> = txs.iter().flat_map(|tx| amadeus_utils::vecpak::to_vec(tx).unwrap_or_default()).collect();
612    let root_tx = crate::utils::blake3::hash(&txs_bin);
613
614    let mut header = next_header;
615    header.root_tx = Hash::from(root_tx);
616
617    // sign the header
618    let header_bin = header.to_vecpak_bin();
619    let header_hash = crate::utils::blake3::hash(&header_bin);
620    let signature = bls::sign(&sk, header_hash.as_ref(), DST_ENTRY)?;
621
622    // compute entry hash
623    let entry = Entry {
624        hash: Hash::from([0u8; 32]), // will be computed below
625        header,
626        signature,
627        mask: None,
628        txs,
629    };
630
631    // compute proper entry hash
632    let entry_bin = entry.to_vecpak_bin();
633    let hash = crate::utils::blake3::hash(&entry_bin);
634
635    Ok(Entry {
636        hash: Hash::from(hash),
637        header: entry.header,
638        signature: entry.signature,
639        mask: entry.mask,
640        txs: entry.txs,
641    })
642}
643
644pub fn chain_rewind(fabric: &Fabric, target_hash: &Hash) -> Result<bool, Error> {
645    if !fabric.is_in_chain(target_hash) {
646        return Ok(false);
647    }
648
649    let tip_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
650    let entry = chain_rewind_internal(fabric, &tip_entry, target_hash)?;
651
652    fabric.set_temporal_hash_height(&entry)?;
653
654    let rooted_hash = fabric.get_rooted_hash()?.ok_or(Error::Missing("rooted_tip"))?;
655    if fabric.get_entry_raw(&Hash::from(rooted_hash))?.is_none() {
656        let rooted_height = fabric.get_rooted_height()?.ok_or(Error::Missing("rooted_height"))?;
657        warn!("Rewind rolled back rooted entries from {rooted_height} until {}", entry.header.height);
658        fabric.set_rooted_hash_height(&entry)?;
659    }
660
661    Ok(true)
662}
663
664fn chain_rewind_internal(fabric: &Fabric, current_entry: &Entry, target_hash: &Hash) -> Result<Entry, Error> {
665    let mut current = current_entry.clone();
666
667    loop {
668        // get previous entry BEFORE unapplying (since unapply deletes the entry from DB)
669        let prev_entry = fabric.get_entry_by_hash(&current.header.prev_hash);
670
671        // revert mutations for current entry
672        let db = fabric.db();
673        if let Some(m_rev_new) = chain_muts_rev(fabric, &current.hash) {
674            consensus_kv::apply_mutations(db, "contractstate", &m_rev_new).map_err(Error::Runtime)?;
675        }
676
677        // remove current entry from indices
678        fabric.delete_entry(&current.hash)?;
679        fabric.delete_entry_seen_time(&current.hash)?;
680
681        let mut height_key = current.header.height.to_string().into_bytes();
682        height_key.push(b':');
683        height_key.extend_from_slice(current.hash.as_ref());
684        fabric.delete_entry_by_height(&height_key)?;
685
686        let mut slot_key = current.header.slot.to_string().into_bytes();
687        slot_key.push(b':');
688        slot_key.extend_from_slice(current.hash.as_ref());
689        fabric.delete_entry_by_slot(&slot_key)?;
690
691        fabric.delete_consensus(&current.hash)?;
692        fabric.delete_attestation(&current.hash)?;
693
694        for tx in &current.txs {
695            fabric.delete_tx_metadata(&tx.hash)?;
696            let nonce_padded = format!("{:020}", tx.tx.nonce);
697            let key = format!("{}:{}", bs58::encode(&tx.tx.signer).into_string(), nonce_padded);
698            fabric.delete_tx_account_nonce(key.as_bytes())?;
699        }
700
701        // if we just unapplied the target, return its parent
702        if current.hash == *target_hash {
703            return prev_entry.ok_or(Error::Missing("prev_entry_in_rewind"));
704        }
705
706        // continue rewinding
707        current = prev_entry.ok_or(Error::Missing("prev_entry_in_rewind"))?;
708    }
709}
710
711pub fn best_by_weight(
712    trainers: &[PublicKey],
713    consensuses: &HashMap<[u8; 32], Consensus>,
714) -> (Option<[u8; 32]>, Option<f64>, Option<Consensus>) {
715    let max_score = trainers.len() as f64;
716    let mut best: Option<([u8; 32], f64, Consensus)> = None;
717
718    for (k, v) in consensuses.iter() {
719        // calculate weighted score
720        let mask = v.mask();
721        let trainers_signed = if mask.is_empty() { trainers.to_vec() } else { unmask_trainers(&mask, trainers) };
722        let mut score = 0.0;
723        for _pk in trainers_signed {
724            // TODO: implement ConsensusWeight.count(pk) - for now use unit weight
725            score += 1.0;
726        }
727        score /= max_score;
728
729        match &mut best {
730            None => best = Some((*k, score, v.clone())),
731            Some((_, best_score, _)) if score > *best_score => best = Some((*k, score, v.clone())),
732            _ => {}
733        }
734    }
735
736    match best {
737        Some((k, score, v)) => (Some(k), Some(score), Some(v)),
738        None => (None, None, None),
739    }
740}
741
742#[derive(Debug, Clone)]
743pub struct ScoredEntry {
744    pub entry: Entry,
745    pub mutations_hash: Option<[u8; 32]>,
746    pub score: Option<f64>,
747}
748
749pub fn best_entry_for_height(fabric: &Fabric, height: u64) -> Result<Vec<ScoredEntry>, Error> {
750    let rooted_tip = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
751
752    // get entries by height
753    let entry_bins = fabric.entries_by_height(height as u64)?;
754    let mut entries = Vec::new();
755
756    for entry_bin in entry_bins {
757        let entry = Entry::from_vecpak_bin(&entry_bin)?;
758
759        // filter by prev_hash == rooted_tip
760        if entry.header.prev_hash != rooted_tip {
761            continue;
762        }
763
764        // get trainers for this height
765        let trainers = fabric.trainers_for_height(entry.header.height).ok_or(Error::Missing("trainers_for_height"))?;
766
767        // get best consensus for this entry
768        let (mutations_hash, score, _consensus) = fabric.best_consensus_by_entryhash(&trainers, entry.hash.as_ref())?;
769
770        if mutations_hash.is_some() {
771            entries.push(ScoredEntry { entry, mutations_hash, score });
772        }
773    }
774
775    // sort by score (descending), slot, mask presence, hash
776    entries.sort_by(|a, b| {
777        let score_cmp = b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal);
778        if score_cmp != std::cmp::Ordering::Equal {
779            return score_cmp;
780        }
781
782        let slot_cmp = a.entry.header.slot.cmp(&b.entry.header.slot);
783        if slot_cmp != std::cmp::Ordering::Equal {
784            return slot_cmp;
785        }
786
787        let mask_cmp = a.entry.mask.is_none().cmp(&b.entry.mask.is_none());
788        if mask_cmp != std::cmp::Ordering::Equal {
789            return mask_cmp;
790        }
791
792        a.entry.hash.cmp(&b.entry.hash)
793    });
794
795    Ok(entries)
796}
797
798pub fn proc_consensus(fabric: &Fabric) -> Result<(), Error> {
799    // Skip processing if no temporal_tip or if entry data not available yet
800    if fabric.get_temporal_entry()?.is_none() {
801        return Ok(());
802    }
803
804    let initial_rooted_hash = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
805
806    loop {
807        let entry_root = fabric.get_rooted_entry()?.ok_or(Error::Missing("rooted_tip"))?;
808        let entry_temp = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
809        let height_root = entry_root.header.height;
810        let height_temp = entry_temp.header.height;
811
812        // nothing more to process
813        if height_root >= height_temp {
814            debug!(
815                "proc_consensus: rooted_height {} >= temporal_height {}, nothing to process",
816                height_root, height_temp
817            );
818            break;
819        }
820
821        let next_height = height_root + 1;
822
823        let next_entries = best_entry_for_height(fabric, next_height)?;
824
825        let Some(best_entry_info) = next_entries.first() else {
826            break;
827        };
828        //
829        // let entry = fabric.get_entry_by_hash(&best_entry_info.entry.hash).unwrap();
830        // println!("height {next_height}");
831        // // print entry transactions
832        // entry.txs.iter().for_each(|tx_packed| {
833        //     let txu = TxU::from_vanilla(tx_packed).unwrap();
834        //     println!("Transaction: {:?}", txu.tx.actions);
835        // });
836
837        let score = best_entry_info.score.unwrap_or(0.0);
838        let best_entry = &best_entry_info.entry;
839
840        // not enough consensus
841        if score < 0.67 {
842            warn!(
843                "proc_consensus: insufficient consensus score {:.2} (need 0.67) for entry {} at height {}",
844                score,
845                bs58::encode(&best_entry.hash).into_string(),
846                best_entry.header.height
847            );
848            break;
849        }
850
851        let mutations_hash = best_entry_info.mutations_hash.unwrap();
852
853        // get our local attestation for this entry to verify we applied it with same mutations
854        let my_attestation = fabric.my_attestation_by_entryhash(best_entry.hash.as_ref()).ok().flatten();
855
856        match my_attestation {
857            None => {
858                // softfork: consensus chose entry we don't have applied, need to rewind
859                warn!(
860                    "proc_consensus softfork: rewind to entry {} height {}",
861                    bs58::encode(&best_entry.hash).into_string(),
862                    best_entry.header.height
863                );
864                // get best entry for previous height to rewind to
865                let rewind_hash = match best_entry_for_height(fabric, next_height - 1)?.first() {
866                    Some(prev_best) => prev_best.entry.hash,
867                    None => Hash::from(fabric.get_temporal_hash()?.unwrap_or([0u8; 32])),
868                };
869                chain_rewind(fabric, &rewind_hash)?;
870                continue; // retry proc_consensus
871            }
872            Some(my_att) => {
873                if mutations_hash != my_att.mutations_hash {
874                    warn!(
875                        "EMERGENCY: state divergence at height {}: our mutations {} != consensus {}, halting",
876                        best_entry.header.height,
877                        bs58::encode(&my_att.mutations_hash).into_string(),
878                        bs58::encode(&mutations_hash).into_string()
879                    );
880                    // rewind to previous height before halting
881                    if let Some(prev_best) = best_entry_for_height(fabric, next_height - 1)?.first() {
882                        let _ = chain_rewind(fabric, &prev_best.entry.hash);
883                    }
884                    break;
885                } else {
886                    // mutations match, safe to root the entry
887                    info!(
888                        "proc_consensus: rooting entry {} at height {} with score {:.2}",
889                        bs58::encode(&best_entry.hash).into_string(),
890                        best_entry.header.height,
891                        score
892                    );
893                    fabric.set_rooted_hash_height(best_entry)?;
894                    // continue loop to process next height
895                }
896            }
897        }
898    }
899
900    // check if rooted tip changed
901    let final_rooted_hash = fabric.get_rooted_hash()?.unwrap_or([0u8; 32]);
902    if final_rooted_hash != initial_rooted_hash {
903        info!(
904            "proc_consensus: rooted tip changed from {} to {}",
905            bs58::encode(&initial_rooted_hash).into_string(),
906            bs58::encode(&final_rooted_hash).into_string()
907        );
908        // TODO: trigger event_consensus and NodeGen.broadcast_tip()
909    }
910
911    Ok(())
912}
913
914pub fn validate_next_entry(current_entry: &Entry, next_entry: &Entry) -> Result<(), Error> {
915    let ceh = &current_entry.header;
916    let neh = &next_entry.header;
917
918    // validate slot consistency
919    if ceh.slot as i64 != neh.prev_slot {
920        return Err(Error::WrongType("invalid_slot"));
921    }
922
923    // validate height consistency
924    if ceh.height != (neh.height - 1) {
925        return Err(Error::WrongType("invalid_height"));
926    }
927
928    // validate hash consistency
929    if current_entry.hash != neh.prev_hash {
930        return Err(Error::WrongType("invalid_hash"));
931    }
932
933    // validate dr (deterministic random)
934    let expected_dr = crate::utils::blake3::hash(ceh.dr.as_ref());
935    if expected_dr != neh.dr {
936        return Err(Error::WrongType("invalid_dr"));
937    }
938
939    // validate vr (verifiable random)
940    if bls::verify(&neh.signer, &neh.vr, &*ceh.vr, DST_VRF).is_err() {
941        return Err(Error::InvalidSignature);
942    }
943
944    // TODO: validate transactions with TXPool.validate_tx
945    // For now, we'll skip detailed transaction validation
946
947    Ok(())
948}
949
950/// Stub function for checking if the node is synced with quorum
951/// Returns true if the node is within X entries of the quorum (BFT threshold)
952/// TODO: implement proper sync checking via FabricSyncAttestGen.isQuorumSyncedOffByX
953fn is_quorum_synced_off_by_x(fabric: &Fabric, x: u64) -> bool {
954    // stub implementation - check if rooted tip is close to temporal tip
955    let temporal_height = fabric.get_temporal_height().ok().flatten().unwrap_or(0);
956    let rooted_height = fabric.get_rooted_height().ok().flatten().unwrap_or(0);
957
958    // consider synced if within X entries of temporal tip
959    temporal_height.saturating_sub(rooted_height) <= x
960}
961
962pub fn delete_transactions_from_pool(_txs: &[crate::consensus::doms::tx::EntryTx]) {
963    // TODO: integrate TXPool with Context to enable transaction removal
964    // Implementation exists: TXPool::delete_packed has been implemented in node/txpool.rs
965    // What's needed:
966    // 1. Add TXPool field to Context struct
967    // 2. Initialize TXPool in Context::with_config_and_socket
968    // 3. Call ctx.txpool.delete_packed(txs).await here
969    // This is critical to prevent memory leaks and double-spending
970}
971
972#[derive(Debug, Clone)]
973pub struct SoftforkSettings {
974    pub softfork_hash: Vec<Hash>,
975    pub softfork_deny_hash: Vec<Hash>,
976}
977
978pub fn get_softfork_settings() -> SoftforkSettings {
979    // TODO: read from persistent_term or config
980    // For now, return empty settings
981    SoftforkSettings { softfork_hash: Vec::new(), softfork_deny_hash: Vec::new() }
982}
983
984pub async fn proc_entries(fabric: &Fabric, config: &crate::config::Config, ctx: &crate::Context) -> Result<(), Error> {
985    // skip processing if no temporal_tip or if entry data not available yet
986    if fabric.get_temporal_entry()?.is_none() {
987        return Ok(());
988    }
989
990    let softfork_settings = get_softfork_settings();
991
992    // use a loop instead of tail recursion (Rust doesn't optimize tail calls)
993    loop {
994        let cur_entry = fabric.get_temporal_entry()?.ok_or(Error::Missing("temporal_tip"))?;
995        let cur_slot = cur_entry.header.slot;
996        let next_height = cur_entry.header.height + 1;
997
998        // filter and sort entries using functional pipeline matching Elixir logic
999        let mut next_entries: Vec<Entry> = fabric
1000            .entries_by_height(next_height as u64)?
1001            .into_iter()
1002            .filter_map(|entry_bin| Entry::from_vecpak_bin(&entry_bin).ok())
1003            .filter(|next_entry| {
1004                // all conditions must be true (matches Elixir cond logic)
1005                fabric.validate_entry_slot_trainer(next_entry, cur_slot)
1006                    && !softfork_settings.softfork_deny_hash.contains(&next_entry.hash)
1007                    && validate_next_entry(&cur_entry, next_entry).is_ok()
1008            })
1009            .collect();
1010
1011        // sort by tuple (matches Elixir sort_by with tuple comparison)
1012        next_entries.sort_by_key(|entry| {
1013            (
1014                softfork_settings.softfork_hash.contains(&entry.hash), // false (not in list) comes first
1015                entry.header.slot,
1016                entry.mask.is_some(), // false (no mask) comes first (Elixir !mask)
1017                entry.hash,
1018            )
1019        });
1020
1021        // process first entry if available (matches Elixir pattern matching)
1022        let Some(entry) = next_entries.first() else {
1023            return Ok(());
1024        };
1025
1026        // apply entry (matches Elixir Task.async pattern but synchronously)
1027        let attestation_packed = apply_entry(fabric, config, entry)?;
1028
1029        // TODO: FabricEventGen.event_applied(entry, mutations_hash, muts, logs)
1030        debug!("Applied entry {} at height {}", bs58::encode(&entry.hash).into_string(), entry.header.height);
1031
1032        // broadcast attestation if synced and we're a trainer
1033        if let Some(attestation_packed) = attestation_packed {
1034            if is_quorum_synced_off_by_x(fabric, 6) {
1035                broadcast_attestation(ctx, &attestation_packed, &entry.hash).await;
1036            }
1037        }
1038
1039        // remove transactions from pool (matches Elixir TXPool.delete_packed)
1040        delete_transactions_from_pool(&entry.txs);
1041
1042        // continue loop to process more entries
1043    }
1044}
1045
1046/// Helper to broadcast attestation to peers and seed nodes
1047async fn broadcast_attestation(ctx: &crate::Context, attestation_packed: &[u8], entry_hash: &Hash) {
1048    use crate::consensus::doms::attestation::{Attestation, EventAttestation};
1049
1050    let Some(attestation) = Attestation::from_vecpak_bin(attestation_packed) else {
1051        warn!("failed to decode attestation for broadcast");
1052        return;
1053    };
1054
1055    let event_att = Protocol::EventAttestation(EventAttestation::new(vec![attestation]));
1056
1057    if let Ok(peers) = ctx.peers.get_all().await {
1058        for peer in peers {
1059            let _ = event_att.send_to_with_metrics(ctx, peer.ip).await;
1060        }
1061    }
1062
1063    for seed_ip in &ctx.config.seed_ips {
1064        let _ = event_att.send_to_with_metrics(ctx, *seed_ip).await;
1065    }
1066
1067    debug!("Broadcasted attestation for entry {}", bs58::encode(entry_hash).into_string());
1068}
1069
1070/// Hash mutations with transaction results prepended, hash_mutations(l ++ m)
1071pub fn hash_mutations_with_results(muts: &[Mutation], results: &[TxResult]) -> [u8; 32] {
1072    use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
1073    use eetf::{Atom, Binary, List, Map, Term};
1074    use std::collections::HashMap;
1075
1076    let mut etf_list = Vec::new();
1077
1078    for result in results {
1079        let mut map = HashMap::new();
1080        map.insert(Term::Atom(Atom::from("error")), Term::Atom(Atom::from(result.error.as_str())));
1081        etf_list.push(Term::Map(Map { map }));
1082    }
1083
1084    for m in muts {
1085        let mut map = HashMap::new();
1086
1087        match m {
1088            Mutation::Put { op: _, key, value } => {
1089                map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("put")));
1090                map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
1091                map.insert(Term::Atom(Atom::from("value")), Term::Binary(Binary { bytes: value.clone() }));
1092            }
1093            Mutation::Delete { op: _, key } => {
1094                map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("delete")));
1095                map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
1096            }
1097            Mutation::SetBit { op: _, key, value, bloomsize } => {
1098                map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("set_bit")));
1099                map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
1100                map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
1101                map.insert(Term::Atom(Atom::from("bloomsize")), u64_to_term(*bloomsize));
1102            }
1103            Mutation::ClearBit { op: _, key, value } => {
1104                map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from("clear_bit")));
1105                map.insert(Term::Atom(Atom::from("key")), Term::Binary(Binary { bytes: key.clone() }));
1106                map.insert(Term::Atom(Atom::from("value")), u64_to_term(*value));
1107            }
1108        }
1109
1110        etf_list.push(Term::Map(Map { map }));
1111    }
1112
1113    // Create list term
1114    let list_term = Term::List(List { elements: etf_list });
1115
1116    // Encode with deterministic ETF encoding
1117    let encoded = encode_safe_deterministic(&list_term);
1118
1119    // Hash the encoded bytes
1120    amadeus_utils::blake3::hash(&encoded)
1121}