1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use std::sync::Arc;

use bytes::Bytes;
use creep::Context;
use futures::channel::mpsc::UnboundedSender;
use muta_apm::derive::tracing_span;

use crate::types::{Address, AggregatedVote, OverlordMsg};
use crate::utils::auth_manage::AuthorityManage;
use crate::{Codec, ConsensusResult, Crypto};

#[tracing_span(kind = "overlord.vreify_sig_pool")]
pub async fn parallel_verify<T: Codec + 'static, C: Crypto + Sync + 'static>(
    ctx: Context,
    msg: OverlordMsg<T>,
    crypto: Arc<C>,
    authority: AuthorityManage,
    tx: UnboundedSender<(Context, OverlordMsg<T>)>,
) {
    let msg_clone = msg.clone();
    tokio::spawn(async move {
        match msg {
            OverlordMsg::SignedProposal(sp) => {
                let hash = crypto.hash(Bytes::from(rlp::encode(&sp.proposal)));
                if let Err(err) = crypto.verify_signature(
                    sp.signature.clone(),
                    hash,
                    sp.proposal.proposer.clone(),
                ) {
                    log::error!(
                        "Overlord: verify {:?} proposal signature failed {:?}",
                        sp,
                        err
                    );
                    return;
                }

                if let Some(polc) = sp.proposal.lock {
                    verify_qc(
                        ctx.clone(),
                        crypto,
                        polc.lock_votes,
                        authority,
                        tx.clone(),
                        msg_clone.clone(),
                    );
                } else {
                    let _ = tx.unbounded_send((ctx, msg_clone));
                }
            }

            OverlordMsg::SignedVote(sv) => {
                let hash = crypto.hash(Bytes::from(rlp::encode(&sv.vote)));
                crypto
                    .verify_signature(sv.signature.clone(), hash, sv.voter.clone())
                    .map_or_else(
                        |err| {
                            log::error!(
                                "Overlord: verify {:?} vote signature failed {:?}",
                                sv,
                                err
                            );
                        },
                        |_| {
                            let _ = tx.unbounded_send((ctx, msg_clone));
                        },
                    );
            }

            OverlordMsg::AggregatedVote(qc) => {
                verify_qc(ctx, crypto, qc, authority, tx, msg_clone);
            }

            OverlordMsg::SignedChoke(sc) => {
                let hash = crypto.hash(Bytes::from(rlp::encode(&sc.choke.to_hash())));
                crypto
                    .verify_signature(sc.signature.clone(), hash, sc.address.clone())
                    .map_or_else(
                        |err| {
                            log::error!(
                                "Overlord: verify {:?} choke signature failed {:?}",
                                sc,
                                err
                            );
                        },
                        |_| {
                            let _ = tx.unbounded_send((ctx, msg_clone));
                        },
                    )
            }

            _ => (),
        }
    });
}

fn get_voters(
    addr_bitmap: &Bytes,
    authority_manage: AuthorityManage,
) -> ConsensusResult<Vec<Address>> {
    authority_manage.is_above_threshold(&addr_bitmap)?;
    authority_manage.get_voters(&addr_bitmap)
}

fn verify_qc<T: Codec, C: Crypto>(
    ctx: Context,
    crypto: Arc<C>,
    qc: AggregatedVote,
    authority: AuthorityManage,
    tx: UnboundedSender<(Context, OverlordMsg<T>)>,
    msg_clone: OverlordMsg<T>,
) {
    let hash = crypto.hash(Bytes::from(rlp::encode(&qc.to_vote())));
    if let Ok(voters) = get_voters(&qc.signature.address_bitmap, authority) {
        crypto
            .verify_aggregated_signature(qc.signature.signature.clone(), hash, voters)
            .map_or_else(
                |err| {
                    log::error!(
                        "Overlord: verify {:?} aggregated signature error {:?}",
                        qc,
                        err
                    );
                },
                |_| {
                    let _ = tx.unbounded_send((ctx, msg_clone));
                },
            );
    }
}