#![warn(missing_docs)]
use std::sync::Arc;
use futures::{FutureExt, TryFutureExt, TryStreamExt, StreamExt};
use log::warn;
use tetsy_jsonrpc_derive::rpc;
use tetsy_jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use tetsy_jsonrpc_core::futures::{
sink::Sink as Sink01,
stream::Stream as Stream01,
future::Future as Future01,
future::Executor as Executor01,
};
mod error;
mod finality;
mod notification;
mod report;
use tc_finality_grandpa::GrandpaJustificationStream;
use tp_runtime::traits::{Block as BlockT, NumberFor};
use finality::{EncodedFinalityProof, RpcFinalityProofProvider};
use report::{ReportAuthoritySet, ReportVoterState, ReportedRoundStates};
use notification::JustificationNotification;
/// Boxed, `Send` future used as the return type of the asynchronous RPC methods in this file.
///
/// It resolves to a `T` on success and to a `tetsy_jsonrpc_core::Error` on failure, using the
/// `Item`/`Error`-style `Future` trait exposed under `tetsy_jsonrpc_core::futures`.
type FutureResult<T> =
Box<dyn tetsy_jsonrpc_core::futures::Future<Item = T, Error = tetsy_jsonrpc_core::Error> + Send>;
/// JSON-RPC interface for querying GRANDPA.
///
/// The trait is processed by the `#[rpc]` attribute macro; the `name = "..."` values below are
/// the method names as they appear on the wire.
///
/// Type parameters:
/// * `Notification` - item type delivered to justification subscribers.
/// * `Hash` - not referenced by any method signature in this trait.
/// * `Number` - block number type accepted by `prove_finality`.
#[rpc]
pub trait GrandpaApi<Notification, Hash, Number> {
/// Metadata type handed to the pub/sub methods of this API.
type Metadata;
/// Returns the reported GRANDPA round states.
///
/// Exposed as the `grandpa_roundState` RPC method.
#[rpc(name = "grandpa_roundState")]
fn round_state(&self) -> FutureResult<ReportedRoundStates>;
/// Opens a subscription on the `grandpa_justifications` topic.
///
/// Exposed as `grandpa_subscribeJustifications`; items of type `Notification` are delivered
/// through `subscriber`.
#[pubsub(
subscription = "grandpa_justifications",
subscribe,
name = "grandpa_subscribeJustifications"
)]
fn subscribe_justifications(
&self,
metadata: Self::Metadata,
subscriber: Subscriber<Notification>
);
/// Closes the `grandpa_justifications` subscription identified by `id`.
///
/// Exposed as `grandpa_unsubscribeJustifications`; the returned `bool` reports the outcome
/// of the cancellation.
#[pubsub(
subscription = "grandpa_justifications",
unsubscribe,
name = "grandpa_unsubscribeJustifications"
)]
fn unsubscribe_justifications(
&self,
metadata: Option<Self::Metadata>,
id: SubscriptionId
) -> tetsy_jsonrpc_core::Result<bool>;
/// Requests a finality proof for the block with number `block`.
///
/// Exposed as the `grandpa_proveFinality` RPC method. The future resolves to an
/// `Option<EncodedFinalityProof>`, so a successful call may still carry no proof.
#[rpc(name = "grandpa_proveFinality")]
fn prove_finality(
&self,
block: Number,
) -> FutureResult<Option<EncodedFinalityProof>>;
}
/// Handler that backs the `GrandpaApi` RPC trait.
///
/// It bundles the data sources the RPC methods read from (authority set, voter state,
/// justification stream, finality proof provider) with the manager that tracks active
/// pub/sub subscriptions.
pub struct GrandpaRpcHandler<AuthoritySet, VoterState, Block: BlockT, ProofProvider> {
/// Authority set source, read when building the round state report.
authority_set: AuthoritySet,
/// Voter state source, read when building the round state report.
voter_state: VoterState,
/// Stream handle from which a fresh justification subscription is taken per subscriber.
justification_stream: GrandpaJustificationStream<Block>,
/// Registry of active subscriptions; used to add and cancel them.
manager: SubscriptionManager,
/// Shared provider queried by `prove_finality`.
finality_proof_provider: Arc<ProofProvider>,
}
impl<AuthoritySet, VoterState, Block: BlockT, ProofProvider>
    GrandpaRpcHandler<AuthoritySet, VoterState, Block, ProofProvider>
{
    /// Creates a new `GrandpaRpcHandler`.
    ///
    /// All arguments except `executor` are stored as-is. `executor` is wrapped in an `Arc`
    /// and used to create the `SubscriptionManager` that tracks justification
    /// subscriptions.
    pub fn new<E>(
        authority_set: AuthoritySet,
        voter_state: VoterState,
        justification_stream: GrandpaJustificationStream<Block>,
        executor: E,
        finality_proof_provider: Arc<ProofProvider>,
    ) -> Self
    where
        E: Executor01<Box<dyn Future01<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
    {
        Self {
            authority_set,
            voter_state,
            justification_stream,
            manager: SubscriptionManager::new(Arc::new(executor)),
            finality_proof_provider,
        }
    }
}
impl<AuthoritySet, VoterState, Block, ProofProvider>
GrandpaApi<JustificationNotification, Block::Hash, NumberFor<Block>>
for GrandpaRpcHandler<AuthoritySet, VoterState, Block, ProofProvider>
where
VoterState: ReportVoterState + Send + Sync + 'static,
AuthoritySet: ReportAuthoritySet + Send + Sync + 'static,
Block: BlockT,
ProofProvider: RpcFinalityProofProvider<Block> + Send + Sync + 'static,
{
type Metadata = tc_rpc::Metadata;
fn round_state(&self) -> FutureResult<ReportedRoundStates> {
let round_states = ReportedRoundStates::from(&self.authority_set, &self.voter_state);
let future = async move { round_states }.boxed();
Box::new(future.map_err(tetsy_jsonrpc_core::Error::from).compat())
}
fn subscribe_justifications(
&self,
_metadata: Self::Metadata,
subscriber: Subscriber<JustificationNotification>
) {
let stream = self.justification_stream.subscribe()
.map(|x| Ok::<_,()>(JustificationNotification::from(x)))
.map_err(|e| warn!("Notification stream error: {:?}", e))
.compat();
self.manager.add(subscriber, |sink| {
let stream = stream.map(|res| Ok(res));
sink.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(stream)
.map(|_| ())
});
}
fn unsubscribe_justifications(
&self,
_metadata: Option<Self::Metadata>,
id: SubscriptionId
) -> tetsy_jsonrpc_core::Result<bool> {
Ok(self.manager.cancel(id))
}
fn prove_finality(
&self,
block: NumberFor<Block>,
) -> FutureResult<Option<EncodedFinalityProof>> {
let result = self.finality_proof_provider.rpc_prove_finality(block);
let future = async move { result }.boxed();
Box::new(
future
.map_err(|e| {
warn!("Error proving finality: {}", e);
error::Error::ProveFinalityFailed(e)
})
.map_err(tetsy_jsonrpc_core::Error::from)
.compat()
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{collections::HashSet, convert::TryInto, sync::Arc};
use tetsy_jsonrpc_core::{Notification, Output, types::Params};
use tetsy_scale_codec::{Encode, Decode};
use tc_block_builder::BlockBuilder;
use tc_finality_grandpa::{
report, AuthorityId, GrandpaJustificationSender, GrandpaJustification,
FinalityProof,
};
use tp_blockchain::HeaderBackend;
use tp_consensus::RecordProof;
use tet_core::crypto::Public;
use tp_keyring::Ed25519Keyring;
use tp_runtime::traits::{Block as BlockT, Header as HeaderT};
use tetcore_test_runtime_client::{
runtime::{Block, Header, H256},
DefaultTestClientBuilderExt,
TestClientBuilderExt,
TestClientBuilder,
};
/// Authority-set stub; its `ReportAuthoritySet` impl reports set id `1` and the ids from `voters()`.
struct TestAuthoritySet;
/// Voter-state stub whose `ReportVoterState` impl reports best round `2` and background round `1`.
struct TestVoterState;
/// Voter-state stub whose `ReportVoterState` impl returns `None`.
struct EmptyVoterState;
/// Finality-proof provider stub that serves a pre-configured proof.
struct TestFinalityProofProvider {
/// Proof that `rpc_prove_finality` encodes and returns; that call panics when this is `None`.
finality_proof: Option<FinalityProof<Header>>,
}
/// Returns the two test authority ids, built from 32 bytes of `1` and 32 bytes of `2`.
fn voters() -> HashSet<AuthorityId> {
    [1u8, 2u8]
        .iter()
        .map(|&seed| AuthorityId::from_slice(&[seed; 32]))
        .collect()
}
impl ReportAuthoritySet for TestAuthoritySet {
    /// Reports a fixed set id of `1` together with the ids from `voters()`.
    fn get(&self) -> (u64, HashSet<AuthorityId>) {
        let set_id = 1;
        (set_id, voters())
    }
}
impl ReportVoterState for EmptyVoterState {
    /// Never has a voter state to report.
    fn get(&self) -> Option<report::VoterState<AuthorityId>> {
        Option::None
    }
}
fn header(number: u64) -> Header {
let parent_hash = match number {
0 => Default::default(),
_ => header(number - 1).hash(),
};
Header::new(
number,
H256::from_low_u64_be(0),
H256::from_low_u64_be(0),
parent_hash,
Default::default(),
)
}
impl<Block: BlockT> RpcFinalityProofProvider<Block> for TestFinalityProofProvider {
    /// Returns the configured finality proof in encoded form, ignoring `_block`.
    ///
    /// Panics when no finality proof was configured.
    fn rpc_prove_finality(
        &self,
        _block: NumberFor<Block>
    ) -> Result<Option<EncodedFinalityProof>, tc_finality_grandpa::FinalityProofError> {
        let proof = self
            .finality_proof
            .as_ref()
            .expect("Don't call rpc_prove_finality without setting the FinalityProof");
        let encoded = EncodedFinalityProof(proof.encode().into());
        Ok(Some(encoded))
    }
}
impl ReportVoterState for TestVoterState {
    /// Reports a fixed voter state with one background round and one best round.
    ///
    /// * Round 1 (background): both voters from `voters()` prevoted and precommitted,
    ///   with a current weight of 100 each.
    /// * Round 2 (best): only the voter built from bytes of `1` prevoted (weight 50) and
    ///   nobody precommitted.
    ///
    /// Both rounds use a total weight of 100 and a threshold weight of 67.
    fn get(&self) -> Option<report::VoterState<AuthorityId>> {
        let completed_round = tc_finality_grandpa::report::RoundState {
            total_weight: 100_u64.try_into().unwrap(),
            threshold_weight: 67_u64.try_into().unwrap(),
            prevote_current_weight: 100.into(),
            prevote_ids: voters(),
            precommit_current_weight: 100.into(),
            precommit_ids: voters(),
        };

        let ongoing_round = tc_finality_grandpa::report::RoundState {
            total_weight: 100_u64.try_into().unwrap(),
            threshold_weight: 67_u64.try_into().unwrap(),
            prevote_current_weight: 50.into(),
            prevote_ids: std::iter::once(AuthorityId::from_slice(&[1; 32])).collect(),
            precommit_current_weight: 0.into(),
            precommit_ids: HashSet::new(),
        };

        Some(report::VoterState {
            background_rounds: std::iter::once((1, completed_round)).collect(),
            best_round: (2, ongoing_round),
        })
    }
}
/// Builds the RPC IO handler and justification sender for `voter_state`, without
/// configuring any finality proof.
fn setup_io_handler<VoterState>(voter_state: VoterState) -> (
    tetsy_jsonrpc_core::MetaIoHandler<tc_rpc::Metadata>,
    GrandpaJustificationSender<Block>,
) where
    VoterState: ReportVoterState + Send + Sync + 'static,
{
    let no_finality_proof = None;
    setup_io_handler_with_finality_proofs(voter_state, no_finality_proof)
}
/// Builds an IO handler with a `GrandpaRpcHandler` registered on it.
///
/// The handler uses `TestAuthoritySet`, the given `voter_state`, the test task executor
/// and a `TestFinalityProofProvider` holding `finality_proof`. The returned sender is the
/// sending half of the justification channel the handler listens on.
fn setup_io_handler_with_finality_proofs<VoterState>(
    voter_state: VoterState,
    finality_proof: Option<FinalityProof<Header>>,
) -> (
    tetsy_jsonrpc_core::MetaIoHandler<tc_rpc::Metadata>,
    GrandpaJustificationSender<Block>,
) where
    VoterState: ReportVoterState + Send + Sync + 'static,
{
    let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();

    let handler = GrandpaRpcHandler::new(
        TestAuthoritySet,
        voter_state,
        justification_stream,
        tc_rpc::testing::TaskExecutor,
        Arc::new(TestFinalityProofProvider { finality_proof }),
    );

    let mut io = tetsy_jsonrpc_core::MetaIoHandler::default();
    io.extend_with(GrandpaApi::to_delegate(handler));
    (io, justification_sender)
}
/// With no voter state available, `grandpa_roundState` answers with the
/// "endpoint not ready" error.
#[test]
fn uninitialized_rpc_handler() {
    let (io, _) = setup_io_handler(EmptyVoterState);

    let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":1}"#;
    let expected = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"GRANDPA RPC endpoint not ready"},"id":1}"#;

    let actual = io.handle_request_sync(request, tc_rpc::Metadata::default());
    assert_eq!(Some(expected.into()), actual);
}
/// With `TestVoterState`, `grandpa_roundState` reports the background round, the best
/// round (including the voters still missing) and the set id.
#[test]
fn working_rpc_handler() {
    let (io, _) = setup_io_handler(TestVoterState);

    let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":1}"#;
    // Each line ends in `\`, so the pieces are joined without any line breaks.
    let expected = "{\"jsonrpc\":\"2.0\",\"result\":{\
\"background\":[{\
\"precommits\":{\"currentWeight\":100,\"missing\":[]},\
\"prevotes\":{\"currentWeight\":100,\"missing\":[]},\
\"round\":1,\"thresholdWeight\":67,\"totalWeight\":100\
}],\
\"best\":{\
\"precommits\":{\"currentWeight\":0,\"missing\":[\"5C62Ck4UrFPiBtoCmeSrgF7x9yv9mn38446dhCpsi2mLHiFT\",\"5C7LYpP2ZH3tpKbvVvwiVe54AapxErdPBbvkYhe6y9ZBkqWt\"]},\
\"prevotes\":{\"currentWeight\":50,\"missing\":[\"5C7LYpP2ZH3tpKbvVvwiVe54AapxErdPBbvkYhe6y9ZBkqWt\"]},\
\"round\":2,\"thresholdWeight\":67,\"totalWeight\":100\
},\
\"setId\":1\
},\"id\":1}";

    let actual = io.handle_request_sync(request, tc_rpc::Metadata::default());
    assert_eq!(actual, Some(expected.into()));
}
/// Creates RPC metadata backed by a channel of capacity 1 and returns it together with
/// the receiving end of that channel.
fn setup_session() -> (tc_rpc::Metadata, tetsy_jsonrpc_core::futures::sync::mpsc::Receiver<String>) {
    let (sender, receiver) = tetsy_jsonrpc_core::futures::sync::mpsc::channel(1);
    (tc_rpc::Metadata::new(sender), receiver)
}
/// Subscribing yields an id that can be unsubscribed exactly once: the first
/// unsubscribe returns `true`, a repeated one returns `false`.
#[test]
fn subscribe_and_unsubscribe_to_justifications() {
    let (io, _) = setup_io_handler(TestVoterState);
    let (meta, _) = setup_session();

    // Subscribe and pull the subscription id out of the success response.
    let sub_request = r#"{"jsonrpc":"2.0","method":"grandpa_subscribeJustifications","params":[],"id":1}"#;
    let raw_response = io.handle_request_sync(sub_request, meta.clone()).unwrap();
    let sub_id = match serde_json::from_str::<Output>(&raw_response).unwrap() {
        Output::Success(success) => success.result,
        _ => panic!(),
    };

    let unsub_req = format!(
        "{{\"jsonrpc\":\"2.0\",\"method\":\"grandpa_unsubscribeJustifications\",\"params\":[{}],\"id\":1}}",
        sub_id
    );

    // First unsubscribe succeeds.
    let first_attempt = io.handle_request_sync(&unsub_req, meta.clone());
    assert_eq!(
        first_attempt,
        Some(r#"{"jsonrpc":"2.0","result":true,"id":1}"#.into()),
    );

    // The subscription is gone, so a second unsubscribe reports `false`.
    let second_attempt = io.handle_request_sync(&unsub_req, meta);
    assert_eq!(
        second_attempt,
        Some(r#"{"jsonrpc":"2.0","result":false,"id":1}"#.into()),
    );
}
/// Unsubscribing with an id that was never handed out reports `false`.
#[test]
fn subscribe_and_unsubscribe_with_wrong_id() {
    let (io, _) = setup_io_handler(TestVoterState);
    let (meta, _) = setup_session();

    // A regular subscription must succeed first.
    let sub_request = r#"{"jsonrpc":"2.0","method":"grandpa_subscribeJustifications","params":[],"id":1}"#;
    let raw_response = io.handle_request_sync(sub_request, meta.clone()).unwrap();
    let resp: Output = serde_json::from_str(&raw_response).unwrap();
    assert!(matches!(resp, Output::Success(_)));

    // "FOO" is not the id returned above.
    let unsub_request =
        r#"{"jsonrpc":"2.0","method":"grandpa_unsubscribeJustifications","params":["FOO"],"id":1}"#;
    let actual = io.handle_request_sync(unsub_request, meta.clone());
    assert_eq!(
        actual,
        Some(r#"{"jsonrpc":"2.0","result":false,"id":1}"#.into())
    );
}
/// Builds a justification for a freshly built block on top of a new test client.
///
/// The justification is created from a commit for round `1` in set `0` that contains a
/// single precommit for that block, signed by the `Alice` Ed25519 test key.
fn create_justification() -> GrandpaJustification<Block> {
    let signer = Ed25519Keyring::Alice;

    let builder = TestClientBuilder::new();
    let backend = builder.backend();
    let client = Arc::new(builder.build());

    // Build one block on top of the client's current best block.
    let block = BlockBuilder::new(
        &*client,
        client.info().best_hash,
        client.info().best_number,
        RecordProof::Yes,
        Default::default(),
        &*backend,
    )
    .unwrap()
    .build()
    .unwrap()
    .block;
    let block_hash = block.hash();
    let block_number = *block.header.number();

    let round = 1;
    let set_id = 0;

    let precommit = tetsy_finality_grandpa::Precommit {
        target_hash: block_hash,
        target_number: block_number,
    };

    // The signature covers the localized payload of the precommit message.
    let message = tetsy_finality_grandpa::Message::Precommit(precommit.clone());
    let payload = tp_finality_grandpa::localized_payload(round, set_id, &message);
    let signed_precommit = tetsy_finality_grandpa::SignedPrecommit {
        precommit,
        signature: signer.sign(&payload[..]).into(),
        id: signer.public().into(),
    };

    let commit = tetsy_finality_grandpa::Commit {
        target_hash: block_hash,
        target_number: block_number,
        precommits: vec![signed_precommit],
    };

    GrandpaJustification::from_commit(&client, round, commit).unwrap()
}
/// A justification pushed through the sender arrives at a subscriber as a
/// `grandpa_justifications` notification carrying the subscription id and the encoded
/// justification.
#[test]
fn subscribe_and_listen_to_one_justification() {
    let (io, justification_sender) = setup_io_handler(TestVoterState);
    let (meta, receiver) = setup_session();

    // Subscribe and remember the subscription id.
    let sub_request =
        r#"{"jsonrpc":"2.0","method":"grandpa_subscribeJustifications","params":[],"id":1}"#;
    let raw_response = io.handle_request_sync(sub_request, meta.clone()).unwrap();
    let mut response: serde_json::Value = serde_json::from_str(&raw_response).unwrap();
    let sub_id: String = serde_json::from_value(response["result"].take()).unwrap();

    // Publish one justification.
    let justification = create_justification();
    justification_sender.notify(|| Ok(justification.clone())).unwrap();

    // Wait for exactly one message on the session channel and parse it.
    let received: Vec<_> = receiver.take(1).wait().flatten().collect();
    let notification: Notification = serde_json::from_str(&received[0]).unwrap();
    let mut params = match notification.params {
        Params::Map(params) => params,
        _ => panic!(),
    };

    let recv_sub_id: String =
        serde_json::from_value(params["subscription"].take()).unwrap();
    let encoded: tet_core::Bytes =
        serde_json::from_value(params["result"].take()).unwrap();
    let recv_justification: GrandpaJustification<Block> =
        Decode::decode(&mut &encoded[..]).unwrap();

    assert_eq!(notification.method, "grandpa_justifications");
    assert_eq!(recv_sub_id, sub_id);
    assert_eq!(recv_justification, justification);
}
/// `grandpa_proveFinality` returns the provider's finality proof, which decodes back to
/// the proof the provider was configured with.
#[test]
fn prove_finality_with_test_finality_proof_provider() {
    let expected_proof = FinalityProof {
        block: header(42).hash(),
        justification: create_justification().encode(),
        unknown_headers: vec![header(2)],
    };
    let (io, _) = setup_io_handler_with_finality_proofs(
        TestVoterState,
        Some(expected_proof.clone()),
    );

    let request =
        "{\"jsonrpc\":\"2.0\",\"method\":\"grandpa_proveFinality\",\"params\":[42],\"id\":1}";
    let raw_response = io
        .handle_request_sync(request, tc_rpc::Metadata::default())
        .unwrap();

    // The result is the encoded proof as bytes; decode it for comparison.
    let mut response: serde_json::Value = serde_json::from_str(&raw_response).unwrap();
    let encoded: tet_core::Bytes = serde_json::from_value(response["result"].take()).unwrap();
    let decoded_proof: FinalityProof<Header> = Decode::decode(&mut &encoded[..]).unwrap();

    assert_eq!(decoded_proof, expected_proof);
}
}