use std::{thread::sleep, time::Duration};
use anyhow::anyhow;
use codec::Decode;
use log::info;
use serde::{Deserialize, Serialize};
use subxt::{
blocks::ExtrinsicEvents,
ext::sp_core::Bytes,
metadata::DecodeWithMetadata,
rpc::RpcParams,
storage::{address::Yes, StaticStorageAddress, StorageAddress},
tx::{PolkadotExtrinsicParamsBuilder, TxPayload},
OnlineClient, PolkadotConfig, SubstrateConfig,
};
use crate::{key_pair::KeyPair, AccountId, BlockHash, TxHash, TxStatus};
/// Unsigned connection handle wrapping a subxt [`OnlineClient`] configured with
/// [`PolkadotConfig`].
#[derive(Clone)]
pub struct Connection {
// Underlying subxt client; handed out by reference via `Connection::as_client`.
client: OnlineClient<PolkadotConfig>,
}
/// A [`Connection`] bundled with the [`KeyPair`] that signs transactions submitted through it.
#[derive(Clone)]
pub struct SignedConnection {
// Unsigned connection used for all storage reads, RPC calls and submissions.
connection: Connection,
// Key pair whose signer is passed to subxt when a transaction is signed and submitted.
signer: KeyPair,
}
/// Newtype around a [`SignedConnection`].
///
/// NOTE(review): the name suggests the signer is expected to hold root (sudo) rights, but
/// nothing in the visible code verifies that — confirm where the check is performed.
#[derive(Clone)]
pub struct RootConnection {
// The wrapped signed connection; exposed through the `AsSigned` implementation.
connection: SignedConnection,
}
/// Gives borrowed access to an underlying unsigned [`Connection`].
///
/// Every `AsConnection + Sync` type receives [`ConnectionApi`] through a blanket impl.
pub trait AsConnection {
/// Returns a reference to the underlying [`Connection`].
fn as_connection(&self) -> &Connection;
}
/// Gives borrowed access to an underlying [`SignedConnection`].
///
/// Every `AsSigned` type also implements [`AsConnection`], and every `AsSigned + Sync` type
/// receives [`SignedConnectionApi`], both through blanket impls.
pub trait AsSigned {
/// Returns a reference to the underlying [`SignedConnection`].
fn as_signed(&self) -> &SignedConnection;
}
/// A plain [`Connection`] is its own underlying connection.
impl AsConnection for Connection {
/// Returns `self` unchanged.
fn as_connection(&self) -> &Connection {
self
}
}
/// Anything that can be viewed as a [`SignedConnection`] can also be viewed as the unsigned
/// [`Connection`] stored inside it.
impl<S> AsConnection for S
where
    S: AsSigned,
{
    /// Borrows the `connection` field of the signed connection returned by `as_signed`.
    fn as_connection(&self) -> &Connection {
        let signed = self.as_signed();
        &signed.connection
    }
}
/// A [`SignedConnection`] is its own underlying signed connection.
impl AsSigned for SignedConnection {
/// Returns `self` unchanged.
fn as_signed(&self) -> &SignedConnection {
self
}
}
impl AsSigned for RootConnection {
fn as_signed(&self) -> &SignedConnection {
&self.connection
}
}
/// Read-only operations available on any connection: storage reads and raw RPC calls.
///
/// A blanket impl in this file provides the trait for every `AsConnection + Sync` type.
#[async_trait::async_trait]
pub trait ConnectionApi: Sync {
    /// Reads the value stored under `addrs`, optionally at block `at`.
    ///
    /// Unlike [`ConnectionApi::get_storage_entry_maybe`], this returns the value itself
    /// rather than an `Option`; the blanket impl in this file panics when no value is found.
    async fn get_storage_entry<T, Defaultable, Iterable>(
        &self,
        addrs: &StaticStorageAddress<T, Yes, Defaultable, Iterable>,
        at: Option<BlockHash>,
    ) -> T::Target
    where
        T: DecodeWithMetadata + Sync,
        Defaultable: Sync,
        Iterable: Sync;

    /// Reads the value stored under `addrs`, optionally at block `at`, returning `None`
    /// when the entry holds no value.
    async fn get_storage_entry_maybe<T, Defaultable, Iterable>(
        &self,
        addrs: &StaticStorageAddress<T, Yes, Defaultable, Iterable>,
        at: Option<BlockHash>,
    ) -> Option<T::Target>
    where
        T: DecodeWithMetadata + Sync,
        Defaultable: Sync,
        Iterable: Sync;

    /// Calls the RPC method `func_name` with `params` and decodes the response as `R`.
    async fn rpc_call<R>(&self, func_name: String, params: RpcParams) -> anyhow::Result<R>
    where
        R: Decode;

    /// Calls the RPC method `func_name` with `params`, expecting no meaningful return value.
    async fn rpc_call_no_return(&self, func_name: String, params: RpcParams) -> anyhow::Result<()>;
}
/// Identifies a submitted transaction by a block hash and the hash of the extrinsic itself.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
pub struct TxInfo {
/// Hash of the block reported for the transaction. Left at its `Default` value when
/// `send_tx_with_params` returns early for `TxStatus::Submitted`.
pub block_hash: BlockHash,
/// Hash of the extrinsic.
pub tx_hash: TxHash,
}
impl From<ExtrinsicEvents<PolkadotConfig>> for TxInfo {
fn from(ee: ExtrinsicEvents<PolkadotConfig>) -> Self {
Self {
block_hash: ee.block_hash(),
tx_hash: ee.extrinsic_hash(),
}
}
}
/// Operations that need a signer: submitting transactions and inspecting the signing account.
///
/// A blanket impl in this file provides the trait for every `AsSigned + Sync` type.
#[async_trait::async_trait]
pub trait SignedConnectionApi: ConnectionApi {
    /// Signs and submits `tx`, then waits until it reaches `status`.
    ///
    /// The blanket impl forwards to [`SignedConnectionApi::send_tx_with_params`] with
    /// default extrinsic parameters.
    async fn send_tx<Call>(&self, tx: Call, status: TxStatus) -> anyhow::Result<TxInfo>
    where
        Call: TxPayload + Send + Sync;

    /// Signs and submits `tx` using the explicit extrinsic `params`, then waits until it
    /// reaches `status`.
    async fn send_tx_with_params<Call>(
        &self,
        tx: Call,
        params: PolkadotExtrinsicParamsBuilder<SubstrateConfig>,
        status: TxStatus,
    ) -> anyhow::Result<TxInfo>
    where
        Call: TxPayload + Send + Sync;

    /// Account id of the signer.
    fn account_id(&self) -> &AccountId;

    /// Key pair used to sign transactions.
    fn signer(&self) -> &KeyPair;

    /// Attempts to obtain a [`RootConnection`] from this connection.
    async fn try_as_root(&self) -> anyhow::Result<RootConnection>;
}
/// Blanket implementation of [`ConnectionApi`] for everything that can hand out a
/// [`Connection`].
#[async_trait::async_trait]
impl<C> ConnectionApi for C
where
    C: AsConnection + Sync,
{
    /// Fetches the storage value at `addrs` and unwraps it.
    ///
    /// # Panics
    ///
    /// Panics when the storage fetch fails or when the entry holds no value.
    async fn get_storage_entry<T, Defaultable, Iterable>(
        &self,
        addrs: &StaticStorageAddress<T, Yes, Defaultable, Iterable>,
        at: Option<BlockHash>,
    ) -> T::Target
    where
        T: DecodeWithMetadata + Sync,
        Defaultable: Sync,
        Iterable: Sync,
    {
        let maybe_value = self.get_storage_entry_maybe(addrs, at).await;
        maybe_value.expect("There should be a value")
    }

    /// Logs the access and fetches the storage value at `addrs` through the subxt client,
    /// forwarding `at` unchanged.
    ///
    /// # Panics
    ///
    /// Panics when the underlying storage fetch returns an error.
    async fn get_storage_entry_maybe<T, Defaultable, Iterable>(
        &self,
        addrs: &StaticStorageAddress<T, Yes, Defaultable, Iterable>,
        at: Option<BlockHash>,
    ) -> Option<T::Target>
    where
        T: DecodeWithMetadata + Sync,
        Defaultable: Sync,
        Iterable: Sync,
    {
        info!(
            target: "subxtxt",
            "accessing storage at {}::{} at block {:?}",
            addrs.pallet_name(),
            addrs.entry_name(),
            at
        );
        // The result is only bound after the await, so nothing extra is held across it.
        let fetched = self
            .as_connection()
            .as_client()
            .storage()
            .fetch(addrs, at)
            .await;
        fetched.expect("Should access storage")
    }

    /// Logs and performs the RPC request `func_name`, then decodes the returned bytes as `R`.
    ///
    /// # Errors
    ///
    /// Returns an error when the request fails or when the bytes do not decode as `R`.
    async fn rpc_call<R>(&self, func_name: String, params: RpcParams) -> anyhow::Result<R>
    where
        R: Decode,
    {
        info!(
            target: "subxtxt",
            "submitting rpc call `{}`, with params {:?}",
            func_name,
            params.clone().build()
        );
        let client = self.as_connection().as_client();
        let raw: Bytes = client.rpc().request(&func_name, params).await?;
        let decoded = R::decode(&mut raw.as_ref())?;
        Ok(decoded)
    }

    /// Logs and performs the RPC request `func_name`, discarding the unit response.
    ///
    /// # Errors
    ///
    /// Returns an error when the request fails.
    async fn rpc_call_no_return(&self, func_name: String, params: RpcParams) -> anyhow::Result<()> {
        info!(
            target: "subxtxt",
            "submitting rpc call `{}`, with params {:?}",
            func_name,
            params.clone().build()
        );
        let client = self.as_connection().as_client();
        let _: () = client.rpc().request(&func_name, params).await?;
        Ok(())
    }
}
/// Blanket implementation of [`SignedConnectionApi`] for everything that can hand out a
/// [`SignedConnection`].
#[async_trait::async_trait]
impl<S: AsSigned + Sync> SignedConnectionApi for S {
    /// Signs and submits `tx` with default extrinsic parameters, waiting for `status`.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`SignedConnectionApi::send_tx_with_params`].
    async fn send_tx<Call: TxPayload + Send + Sync>(
        &self,
        tx: Call,
        status: TxStatus,
    ) -> anyhow::Result<TxInfo> {
        self.send_tx_with_params(tx, Default::default(), status)
            .await
    }

    /// Signs `tx` with this connection's signer, submits it with `params`, and waits
    /// according to `status`:
    ///
    /// * `TxStatus::InBlock` — waits for inclusion in a block, then for success.
    /// * `TxStatus::Finalized` — waits for finalized success.
    /// * `TxStatus::Submitted` — returns right after submission; `block_hash` in the
    ///   returned [`TxInfo`] is the `Default` value because no block is known yet.
    ///
    /// # Errors
    ///
    /// Returns an error when submission fails or when waiting for the requested status
    /// fails.
    async fn send_tx_with_params<Call: TxPayload + Send + Sync>(
        &self,
        tx: Call,
        params: PolkadotExtrinsicParamsBuilder<SubstrateConfig>,
        status: TxStatus,
    ) -> anyhow::Result<TxInfo> {
        // Only payloads that carry validation details have a pallet/call name to log.
        if let Some(details) = tx.validation_details() {
            info!(target:"subxtxt", "Sending extrinsic {}.{} with params: {:?}", details.pallet_name, details.call_name, params);
        }

        let progress = self
            .as_connection()
            .as_client()
            .tx()
            .sign_and_submit_then_watch(&tx, self.as_signed().signer().pair_signer(), params)
            .await
            .map_err(|e| anyhow!("Failed to submit transaction: {:?}", e))?;

        let info: TxInfo = match status {
            TxStatus::InBlock => progress
                .wait_for_in_block()
                .await?
                .wait_for_success()
                .await?
                .into(),
            TxStatus::Finalized => progress.wait_for_finalized_success().await?.into(),
            TxStatus::Submitted => {
                // Nothing to wait for: report the extrinsic hash with a placeholder block hash.
                return Ok(TxInfo {
                    block_hash: Default::default(),
                    tx_hash: progress.extrinsic_hash(),
                });
            }
        };
        info!(target: "subxtxt", "tx with hash {:?} included in block {:?}", info.tx_hash, info.block_hash);
        Ok(info)
    }

    /// Account id of the signing key pair.
    fn account_id(&self) -> &AccountId {
        self.as_signed().signer().account_id()
    }

    /// Key pair stored in the underlying [`SignedConnection`].
    fn signer(&self) -> &KeyPair {
        &self.as_signed().signer
    }

    /// Not implemented yet: always returns an error.
    ///
    /// This used to be a `todo!()`, which panicked on every call even though the method
    /// returns a `Result`. Reporting the missing implementation as an error lets callers
    /// handle it through the normal error path.
    ///
    /// # Errors
    ///
    /// Always returns an error until the conversion is implemented.
    async fn try_as_root(&self) -> anyhow::Result<RootConnection> {
        Err(anyhow!(
            "`try_as_root` is not implemented yet: cannot convert this connection into a `RootConnection`"
        ))
    }
}
impl Connection {
    /// Number of retries after the first failed connection attempt.
    const DEFAULT_RETRIES: u32 = 10;
    /// Pause between two consecutive connection attempts, in seconds.
    const RETRY_WAIT_SECS: u64 = 1;

    /// Connects to the node at `address`, retrying up to `DEFAULT_RETRIES` times.
    ///
    /// # Panics
    ///
    /// Panics with the last connection error when every attempt fails.
    pub async fn new(address: &str) -> Connection {
        Self::new_with_retries(address, Self::DEFAULT_RETRIES).await
    }

    /// Connects to the node at `address`, making one initial attempt plus at most
    /// `retries` further attempts, pausing `RETRY_WAIT_SECS` seconds between them.
    ///
    /// # Panics
    ///
    /// Panics with the last connection error once no retries remain.
    async fn new_with_retries(address: &str, mut retries: u32) -> Connection {
        loop {
            // `address` is already a `&str`, so it is passed as-is without re-borrowing.
            match OnlineClient::<PolkadotConfig>::from_url(address).await {
                Ok(client) => return Connection { client },
                Err(e) if retries == 0 => panic!("{e:?}"),
                Err(_) => {
                    // NOTE(review): `std::thread::sleep` blocks the executor thread for the
                    // whole wait. An async timer would be preferable, but none is imported
                    // in this file, so the blocking call is kept as-is.
                    sleep(Duration::from_secs(Self::RETRY_WAIT_SECS));
                    retries -= 1;
                }
            }
        }
    }

    /// Returns the underlying subxt client.
    pub fn as_client(&self) -> &OnlineClient<PolkadotConfig> {
        &self.client
    }
}
impl SignedConnection {
pub async fn new(address: &str, signer: KeyPair) -> Self {
Self::from_connection(Connection::new(address).await, signer)
}
pub fn from_connection(connection: Connection, signer: KeyPair) -> Self {
Self { connection, signer }
}
}