#![cfg_attr(not(feature = "std"), no_std)]
mod mock;
mod tests;
mod benchmarking;
pub mod weights;
use tet_application_crypto::RuntimeAppPublic;
use codec::{Encode, Decode};
use tet_core::offchain::OpaqueNetworkState;
use tetcore_std::prelude::*;
use tetcore_std::convert::TryInto;
use noble_session::historical::IdentificationTuple;
use tp_runtime::{
offchain::storage::StorageValueRef,
RuntimeDebug,
traits::{Convert, Member, Saturating, AtLeast32BitUnsigned}, Perbill,
transaction_validity::{
TransactionValidity, ValidTransaction, InvalidTransaction, TransactionSource,
TransactionPriority,
},
};
use tp_staking::{
SessionIndex,
offence::{ReportOffence, Offence, Kind},
};
use fabric_support::{
decl_module, decl_event, decl_storage, Parameter, debug, decl_error,
traits::Get,
};
use fabric_system::ensure_none;
use fabric_system::offchain::{
SendTransactionTypes,
SubmitTransaction,
};
pub use weights::WeightInfo;
pub mod sr25519 {
mod app_sr25519 {
use tet_application_crypto::{app_crypto, key_types::IM_ONLINE, sr25519};
app_crypto!(sr25519, IM_ONLINE);
}
tet_application_crypto::with_pair! {
pub type AuthorityPair = app_sr25519::Pair;
}
pub type AuthoritySignature = app_sr25519::Signature;
pub type AuthorityId = app_sr25519::Public;
}
pub mod ed25519 {
mod app_ed25519 {
use tet_application_crypto::{app_crypto, key_types::IM_ONLINE, ed25519};
app_crypto!(ed25519, IM_ONLINE);
}
tet_application_crypto::with_pair! {
pub type AuthorityPair = app_ed25519::Pair;
}
pub type AuthoritySignature = app_ed25519::Signature;
pub type AuthorityId = app_ed25519::Public;
}
const DB_PREFIX: &[u8] = b"tetsy/im-online-heartbeat/";
const INCLUDE_THRESHOLD: u32 = 3;
#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
struct HeartbeatStatus<BlockNumber> {
pub session_index: SessionIndex,
pub sent_at: BlockNumber,
}
impl<BlockNumber: PartialEq + AtLeast32BitUnsigned + Copy> HeartbeatStatus<BlockNumber> {
fn is_recent(&self, session_index: SessionIndex, now: BlockNumber) -> bool {
self.session_index == session_index && self.sent_at + INCLUDE_THRESHOLD.into() > now
}
}
#[cfg_attr(test, derive(PartialEq))]
enum OffchainErr<BlockNumber> {
TooEarly(BlockNumber),
WaitingForInclusion(BlockNumber),
AlreadyOnline(u32),
FailedSigning,
FailedToAcquireLock,
NetworkState,
SubmitTransaction,
}
impl<BlockNumber: tetcore_std::fmt::Debug> tetcore_std::fmt::Debug for OffchainErr<BlockNumber> {
fn fmt(&self, fmt: &mut tetcore_std::fmt::Formatter) -> tetcore_std::fmt::Result {
match *self {
OffchainErr::TooEarly(ref block) =>
write!(fmt, "Too early to send heartbeat, next expected at {:?}", block),
OffchainErr::WaitingForInclusion(ref block) =>
write!(fmt, "Heartbeat already sent at {:?}. Waiting for inclusion.", block),
OffchainErr::AlreadyOnline(auth_idx) =>
write!(fmt, "Authority {} is already online", auth_idx),
OffchainErr::FailedSigning => write!(fmt, "Failed to sign heartbeat"),
OffchainErr::FailedToAcquireLock => write!(fmt, "Failed to acquire lock"),
OffchainErr::NetworkState => write!(fmt, "Failed to fetch network state"),
OffchainErr::SubmitTransaction => write!(fmt, "Failed to submit transaction"),
}
}
}
pub type AuthIndex = u32;
#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
pub struct Heartbeat<BlockNumber>
where BlockNumber: PartialEq + Eq + Decode + Encode,
{
pub block_number: BlockNumber,
pub network_state: OpaqueNetworkState,
pub session_index: SessionIndex,
pub authority_index: AuthIndex,
pub validators_len: u32,
}
pub trait Config: SendTransactionTypes<Call<Self>> + noble_session::historical::Config {
type AuthorityId: Member + Parameter + RuntimeAppPublic + Default + Ord;
type Event: From<Event<Self>> + Into<<Self as fabric_system::Config>::Event>;
type SessionDuration: Get<Self::BlockNumber>;
type ReportUnresponsiveness:
ReportOffence<
Self::AccountId,
IdentificationTuple<Self>,
UnresponsivenessOffence<IdentificationTuple<Self>>,
>;
type UnsignedPriority: Get<TransactionPriority>;
type WeightInfo: WeightInfo;
}
decl_event!(
pub enum Event<T> where
<T as Config>::AuthorityId,
IdentificationTuple = IdentificationTuple<T>,
{
HeartbeatReceived(AuthorityId),
AllGood,
SomeOffline(Vec<IdentificationTuple>),
}
);
decl_storage! {
trait Store for Module<T: Config> as ImOnline {
HeartbeatAfter get(fn heartbeat_after): T::BlockNumber;
Keys get(fn keys): Vec<T::AuthorityId>;
ReceivedHeartbeats get(fn received_heartbeats):
double_map hasher(twox_64_concat) SessionIndex, hasher(twox_64_concat) AuthIndex
=> Option<Vec<u8>>;
AuthoredBlocks get(fn authored_blocks):
double_map hasher(twox_64_concat) SessionIndex, hasher(twox_64_concat) T::ValidatorId
=> u32;
}
add_extra_genesis {
config(keys): Vec<T::AuthorityId>;
build(|config| Module::<T>::initialize_keys(&config.keys))
}
}
decl_error! {
pub enum Error for Module<T: Config> {
InvalidKey,
DuplicatedHeartbeat,
}
}
decl_module! {
pub struct Module<T: Config> for enum Call where origin: T::Origin {
type Error = Error<T>;
fn deposit_event() = default;
#[weight = <T as Config>::WeightInfo::validate_unsigned_and_then_heartbeat(
heartbeat.validators_len as u32,
heartbeat.network_state.external_addresses.len() as u32,
)]
fn heartbeat(
origin,
heartbeat: Heartbeat<T::BlockNumber>,
_signature: <T::AuthorityId as RuntimeAppPublic>::Signature,
) {
ensure_none(origin)?;
let current_session = <noble_session::Module<T>>::current_index();
let exists = <ReceivedHeartbeats>::contains_key(
¤t_session,
&heartbeat.authority_index
);
let keys = Keys::<T>::get();
let public = keys.get(heartbeat.authority_index as usize);
if let (false, Some(public)) = (exists, public) {
Self::deposit_event(Event::<T>::HeartbeatReceived(public.clone()));
let network_state = heartbeat.network_state.encode();
<ReceivedHeartbeats>::insert(
¤t_session,
&heartbeat.authority_index,
&network_state
);
} else if exists {
Err(Error::<T>::DuplicatedHeartbeat)?
} else {
Err(Error::<T>::InvalidKey)?
}
}
fn offchain_worker(now: T::BlockNumber) {
if tet_io::offchain::is_validator() {
for res in Self::send_heartbeats(now).into_iter().flatten() {
if let Err(e) = res {
debug::debug!(
target: "imonline",
"Skipping heartbeat at {:?}: {:?}",
now,
e,
)
}
}
} else {
debug::trace!(
target: "imonline",
"Skipping heartbeat at {:?}. Not a validator.",
now,
)
}
}
}
}
type OffchainResult<T, A> = Result<A, OffchainErr<<T as fabric_system::Config>::BlockNumber>>;
impl<T: Config + noble_authorship::Config> noble_authorship::EventHandler<T::ValidatorId, T::BlockNumber> for Module<T> {
fn note_author(author: T::ValidatorId) {
Self::note_authorship(author);
}
fn note_uncle(author: T::ValidatorId, _age: T::BlockNumber) {
Self::note_authorship(author);
}
}
impl<T: Config> Module<T> {
pub fn is_online(authority_index: AuthIndex) -> bool {
let current_validators = <noble_session::Module<T>>::validators();
if authority_index >= current_validators.len() as u32 {
return false;
}
let authority = ¤t_validators[authority_index as usize];
Self::is_online_aux(authority_index, authority)
}
fn is_online_aux(authority_index: AuthIndex, authority: &T::ValidatorId) -> bool {
let current_session = <noble_session::Module<T>>::current_index();
<ReceivedHeartbeats>::contains_key(¤t_session, &authority_index) ||
<AuthoredBlocks<T>>::get(
¤t_session,
authority,
) != 0
}
pub fn received_heartbeat_in_current_session(authority_index: AuthIndex) -> bool {
let current_session = <noble_session::Module<T>>::current_index();
<ReceivedHeartbeats>::contains_key(¤t_session, &authority_index)
}
fn note_authorship(author: T::ValidatorId) {
let current_session = <noble_session::Module<T>>::current_index();
<AuthoredBlocks<T>>::mutate(
¤t_session,
author,
|authored| *authored += 1,
);
}
pub(crate) fn send_heartbeats(block_number: T::BlockNumber)
-> OffchainResult<T, impl Iterator<Item=OffchainResult<T, ()>>>
{
let heartbeat_after = <HeartbeatAfter<T>>::get();
if block_number < heartbeat_after {
return Err(OffchainErr::TooEarly(heartbeat_after))
}
let session_index = <noble_session::Module<T>>::current_index();
let validators_len = <noble_session::Module<T>>::validators().len() as u32;
Ok(Self::local_authority_keys()
.map(move |(authority_index, key)|
Self::send_single_heartbeat(
authority_index,
key,
session_index,
block_number,
validators_len,
)
))
}
fn send_single_heartbeat(
authority_index: u32,
key: T::AuthorityId,
session_index: SessionIndex,
block_number: T::BlockNumber,
validators_len: u32,
) -> OffchainResult<T, ()> {
let prepare_heartbeat = || -> OffchainResult<T, Call<T>> {
let network_state = tet_io::offchain::network_state()
.map_err(|_| OffchainErr::NetworkState)?;
let heartbeat_data = Heartbeat {
block_number,
network_state,
session_index,
authority_index,
validators_len,
};
let signature = key.sign(&heartbeat_data.encode()).ok_or(OffchainErr::FailedSigning)?;
Ok(Call::heartbeat(heartbeat_data, signature))
};
if Self::is_online(authority_index) {
return Err(OffchainErr::AlreadyOnline(authority_index));
}
Self::with_heartbeat_lock(
authority_index,
session_index,
block_number,
|| {
let call = prepare_heartbeat()?;
debug::info!(
target: "imonline",
"[index: {:?}] Reporting im-online at block: {:?} (session: {:?}): {:?}",
authority_index,
block_number,
session_index,
call,
);
SubmitTransaction::<T, Call<T>>::submit_unsigned_transaction(call.into())
.map_err(|_| OffchainErr::SubmitTransaction)?;
Ok(())
},
)
}
fn local_authority_keys() -> impl Iterator<Item=(u32, T::AuthorityId)> {
let authorities = Keys::<T>::get();
let mut local_keys = T::AuthorityId::all();
local_keys.sort();
authorities.into_iter()
.enumerate()
.filter_map(move |(index, authority)| {
local_keys.binary_search(&authority)
.ok()
.map(|location| (index as u32, local_keys[location].clone()))
})
}
fn with_heartbeat_lock<R>(
authority_index: u32,
session_index: SessionIndex,
now: T::BlockNumber,
f: impl FnOnce() -> OffchainResult<T, R>,
) -> OffchainResult<T, R> {
let key = {
let mut key = DB_PREFIX.to_vec();
key.extend(authority_index.encode());
key
};
let storage = StorageValueRef::persistent(&key);
let res = storage.mutate(|status: Option<Option<HeartbeatStatus<T::BlockNumber>>>| {
match status {
Some(Some(status)) if status.is_recent(session_index, now) => {
Err(OffchainErr::WaitingForInclusion(status.sent_at))
},
_ => Ok(HeartbeatStatus {
session_index,
sent_at: now,
}),
}
})?;
let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?;
let res = f();
if res.is_err() {
new_status.sent_at = 0u32.into();
storage.set(&new_status);
}
res
}
fn initialize_keys(keys: &[T::AuthorityId]) {
if !keys.is_empty() {
assert!(Keys::<T>::get().is_empty(), "Keys are already initialized!");
Keys::<T>::put(keys);
}
}
#[cfg(test)]
fn set_keys(keys: Vec<T::AuthorityId>) {
Keys::<T>::put(&keys)
}
}
impl<T: Config> tp_runtime::BoundToRuntimeAppPublic for Module<T> {
type Public = T::AuthorityId;
}
impl<T: Config> noble_session::OneSessionHandler<T::AccountId> for Module<T> {
type Key = T::AuthorityId;
fn on_genesis_session<'a, I: 'a>(validators: I)
where I: Iterator<Item=(&'a T::AccountId, T::AuthorityId)>
{
let keys = validators.map(|x| x.1).collect::<Vec<_>>();
Self::initialize_keys(&keys);
}
fn on_new_session<'a, I: 'a>(_changed: bool, validators: I, _queued_validators: I)
where I: Iterator<Item=(&'a T::AccountId, T::AuthorityId)>
{
let block_number = <fabric_system::Module<T>>::block_number();
let half_session = T::SessionDuration::get() / 2u32.into();
<HeartbeatAfter<T>>::put(block_number + half_session);
Keys::<T>::put(validators.map(|x| x.1).collect::<Vec<_>>());
}
fn on_before_session_ending() {
let session_index = <noble_session::Module<T>>::current_index();
let keys = Keys::<T>::get();
let current_validators = <noble_session::Module<T>>::validators();
let offenders = current_validators.into_iter().enumerate()
.filter(|(index, id)|
!Self::is_online_aux(*index as u32, id)
).filter_map(|(_, id)|
T::FullIdentificationOf::convert(id.clone()).map(|full_id| (id, full_id))
).collect::<Vec<IdentificationTuple<T>>>();
<ReceivedHeartbeats>::remove_prefix(&<noble_session::Module<T>>::current_index());
<AuthoredBlocks<T>>::remove_prefix(&<noble_session::Module<T>>::current_index());
if offenders.is_empty() {
Self::deposit_event(RawEvent::AllGood);
} else {
Self::deposit_event(RawEvent::SomeOffline(offenders.clone()));
let validator_set_count = keys.len() as u32;
let offence = UnresponsivenessOffence { session_index, validator_set_count, offenders };
if let Err(e) = T::ReportUnresponsiveness::report_offence(vec![], offence) {
tp_runtime::print(e);
}
}
}
fn on_disabled(_i: usize) {
}
}
const INVALID_VALIDATORS_LEN: u8 = 10;
impl<T: Config> fabric_support::unsigned::ValidateUnsigned for Module<T> {
type Call = Call<T>;
fn validate_unsigned(
_source: TransactionSource,
call: &Self::Call,
) -> TransactionValidity {
if let Call::heartbeat(heartbeat, signature) = call {
if <Module<T>>::is_online(heartbeat.authority_index) {
return InvalidTransaction::Stale.into();
}
let current_session = <noble_session::Module<T>>::current_index();
if heartbeat.session_index != current_session {
return InvalidTransaction::Stale.into();
}
let keys = Keys::<T>::get();
if keys.len() as u32 != heartbeat.validators_len {
return InvalidTransaction::Custom(INVALID_VALIDATORS_LEN).into();
}
let authority_id = match keys.get(heartbeat.authority_index as usize) {
Some(id) => id,
None => return InvalidTransaction::BadProof.into(),
};
let signature_valid = heartbeat.using_encoded(|encoded_heartbeat| {
authority_id.verify(&encoded_heartbeat, &signature)
});
if !signature_valid {
return InvalidTransaction::BadProof.into();
}
ValidTransaction::with_tag_prefix("ImOnline")
.priority(T::UnsignedPriority::get())
.and_provides((current_session, authority_id))
.longevity(TryInto::<u64>::try_into(
T::SessionDuration::get() / 2u32.into()
).unwrap_or(64_u64))
.propagate(true)
.build()
} else {
InvalidTransaction::Call.into()
}
}
}
#[derive(RuntimeDebug)]
#[cfg_attr(feature = "std", derive(Clone, PartialEq, Eq))]
pub struct UnresponsivenessOffence<Offender> {
pub session_index: SessionIndex,
pub validator_set_count: u32,
pub offenders: Vec<Offender>,
}
impl<Offender: Clone> Offence<Offender> for UnresponsivenessOffence<Offender> {
const ID: Kind = *b"im-online:offlin";
type TimeSlot = SessionIndex;
fn offenders(&self) -> Vec<Offender> {
self.offenders.clone()
}
fn session_index(&self) -> SessionIndex {
self.session_index
}
fn validator_set_count(&self) -> u32 {
self.validator_set_count
}
fn time_slot(&self) -> Self::TimeSlot {
self.session_index
}
fn slash_fraction(offenders: u32, validator_set_count: u32) -> Perbill {
if let Some(threshold) = offenders.checked_sub(validator_set_count / 10 + 1) {
let x = Perbill::from_rational_approximation(3 * threshold, validator_set_count);
x.saturating_mul(Perbill::from_percent(7))
} else {
Perbill::default()
}
}
}