#![allow(unused)]
use crate::client::Error::*;
use crate::flow::access::access_api_client::AccessApiClient;
use crate::flow::access::{
ExecuteScriptAtLatestBlockRequest, GetAccountAtLatestBlockRequest,
GetEventsForHeightRangeRequest, GetLatestBlockRequest,
SendAndSubscribeTransactionStatusesRequest,
};
use crate::flow::entities::{transaction, Account, Event, Transaction, TransactionStatus};
use crate::keys::hex_to_bytes;
use crate::network::Network;
use crate::transactions;
use crate::transactions::hash_transaction;
use derive_more::From;
use futures::StreamExt;
use rlp::RlpStream;
use secp256k1::{Message, Secp256k1, SecretKey};
use serde_cadence::{to_cadence_value, CadenceValue, ToCadenceValue};
use sha3::{Digest, Sha3_256};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use tokio::select;
use tonic::transport::{Channel, Uri};
use tonic::Request;
/// Errors produced by the Flow client in this module.
///
/// Variants annotated with `#[from]` receive a `From` conversion through
/// `derive_more::From`, which is what allows the client methods to apply `?`
/// directly to the underlying error types.
#[derive(Debug, From)]
pub enum Error {
/// Failure reported by tonic's transport layer; `try_new` propagates it when
/// connecting to the endpoint fails.
#[from]
TonicError(tonic::transport::Error),
/// The endpoint string could not be parsed as a URI.
#[from]
InvalidEndpoint(tonic::codegen::http::uri::InvalidUri),
/// The account response did not contain an account. `address` is the
/// hex-encoded address that was requested.
CantGetAccount {
address: String,
},
/// A gRPC call, or an item of a response stream, failed with a `tonic::Status`.
#[from]
TonicStatusError(tonic::Status),
/// The status stream reported the transaction as `Expired`.
TransactionExpired,
/// The caller-supplied timeout elapsed before the target status was observed.
TransactionTimeoutExceeded,
/// The status stream ended before the target status was observed.
TransactionStreamClosedUnexpectedly,
/// Not constructed in this part of the file. NOTE(review): presumably carries
/// a digest whose length was not the expected one — confirm at the use site.
DigestLenError(Vec<u8>),
/// Error originating from `crate::transactions`.
#[from]
TransactionsError(transactions::Error),
/// The latest-block response did not contain a block.
NoBlockReturned,
/// Error originating from `crate::keys`.
#[from]
KeysError(crate::keys::Error),
/// Error reported by `serde_cadence` while converting to or parsing a Cadence value.
#[from]
CadenceJsonError(serde_cadence::Error),
/// JSON serialization error reported by `serde_json`.
#[from]
SerdeError(serde_json::Error),
/// The account has no key at position `idx` of its key list.
NoKeyAtIndex {
idx: u32,
},
/// The bytes returned by a script execution were not valid UTF-8.
#[from]
ResultUTF8Error(std::string::FromUtf8Error),
}
/// Shorthand for a `std::result::Result` whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Converts a client [`Error`] into an `anyhow::Error` whose message is the
/// error's `Debug` rendering.
impl From<Error> for anyhow::Error {
    fn from(value: Error) -> Self {
        // The format-args form of `anyhow!` builds the same message-only error
        // as passing an already formatted `String`.
        anyhow!("{:?}", value)
    }
}
/// Client for the Flow Access API.
///
/// Wraps an `AccessApiClient` over a tonic `Channel`; cloning the client
/// clones that wrapped value.
#[derive(Clone)]
pub struct FlowRcpClient {
/// Access API client used for every request issued by this type.
access_client: AccessApiClient<Channel>,
}
// SAFETY: NOTE(review) — no justification is recorded for this manual impl.
// The only field is `AccessApiClient<Channel>`. If that type is already `Send`,
// this impl is redundant and can be deleted; if it is not, asserting `Send`
// here is unsound. TODO confirm and remove or document the invariant.
unsafe impl Send for FlowRcpClient {
}
// SAFETY: NOTE(review) — no justification is recorded for this manual impl.
// The only field is `AccessApiClient<Channel>`. If that type is already `Sync`,
// this impl is redundant and can be deleted; if it is not, asserting `Sync`
// here is unsound. TODO confirm and remove or document the invariant.
unsafe impl Sync for FlowRcpClient {
}
impl FlowRcpClient {
pub async fn try_new(network: Network) -> Result<FlowRcpClient> {
let endpoint: String = network.into();
let channel = Channel::builder(Uri::from_str(&endpoint)?)
.connect()
.await?;
let client = FlowRcpClient {
access_client: AccessApiClient::new(channel),
};
Ok(client)
}
/// Builds an unsigned transaction for `script` and returns it together with the
/// SHA3-256 digest computed over its encoded form.
///
/// The sender account is used as proposer, payer and sole authorizer. The
/// account key at index 0 is used as the proposal key, with the sequence number
/// taken from the account as currently reported by the network. Each entry of
/// `params` is converted to a Cadence value and stored as its JSON encoding.
///
/// The digest covers the transaction domain tag followed by the RLP encoding of
/// a two-element list: the nine-field payload and the (empty) list of payload
/// signatures.
///
/// # Errors
///
/// Fails when the reference block or the account cannot be fetched, when
/// `sender_address_hex` or the domain tag cannot be decoded, with
/// [`Error::NoKeyAtIndex`] when the account has no key at index 0, and when a
/// parameter cannot be converted to a Cadence value or serialized to JSON.
pub async fn create_transaction_with_params(
    &mut self,
    script: &str,
    params: &[&dyn ToCadenceValue],
    sender_address_hex: &str,
    gas_limit: u64,
) -> Result<(Transaction, Vec<u8>)> {
    let reference_block_id = self.get_reference_block_id().await?;
    let account_address = hex_to_bytes(sender_address_hex)?;
    let account = self.get_account(account_address.clone()).await?;

    // The first key of the account is always used as the proposal key.
    let key_index = 0u32;
    let sequence_number = account
        .keys
        .get(key_index as usize)
        .ok_or(NoKeyAtIndex { idx: key_index })?
        .sequence_number;

    // Every argument is stored as the JSON encoding of its Cadence value.
    let mut arguments = Vec::with_capacity(params.len());
    for &param in params {
        let cadence_message = to_cadence_value(param)?;
        arguments.push(serde_json::to_string(&cadence_message)?.into_bytes());
    }

    // Kept as a local so the encoder below can read the fields directly instead
    // of unwrapping the `Option` stored in the transaction.
    let proposal_key = transaction::ProposalKey {
        address: account_address.clone(),
        key_id: key_index,
        sequence_number: sequence_number.into(),
    };
    let tx = Transaction {
        script: script.as_bytes().to_vec(),
        arguments,
        reference_block_id,
        gas_limit,
        proposal_key: Some(proposal_key.clone()),
        payer: account_address.clone(),
        authorizers: vec![account_address],
        payload_signatures: vec![],
        envelope_signatures: vec![],
    };

    let mut hasher = Sha3_256::new();
    hasher.update(hex_to_bytes(transactions::TRANSACTION_DOMAIN_TAG)?);

    // Outer list: [payload, payload_signatures].
    let mut rlp: RlpStream = RlpStream::new_list(2);

    // Payload: exactly nine items, in this order.
    rlp.begin_list(9);
    rlp.append(&tx.script);
    rlp.begin_list(tx.arguments.len());
    for arg in &tx.arguments {
        rlp.append(&arg.as_slice());
    }
    rlp.append(&tx.reference_block_id);
    rlp.append(&tx.gas_limit);
    rlp.append(&proposal_key.address);
    rlp.append(&proposal_key.key_id);
    rlp.append(&proposal_key.sequence_number);
    rlp.append(&tx.payer);
    rlp.begin_list(tx.authorizers.len());
    for auth in &tx.authorizers {
        rlp.append(&auth.as_slice());
    }

    // The transaction was created above without payload signatures, so the
    // second element of the outer list is an empty list.
    rlp.begin_list(0);

    let encoded = rlp.out();
    hasher.update(&encoded);
    Ok((tx, hasher.finalize().to_vec()))
}
pub async fn send_transaction_and_subscribe(
&mut self,
transaction: Transaction,
target_status: TransactionStatus,
timeout: Duration,
) -> Result<Vec<u8>> {
let request = Request::new(SendAndSubscribeTransactionStatusesRequest {
event_encoding_version: 1,
transaction: Some(transaction),
});
let mut stream = self
.access_client
.send_and_subscribe_transaction_statuses(request)
.await?
.into_inner();
let timeout = tokio::time::sleep(timeout);
tokio::pin!(timeout);
loop {
select! {
result = stream.next() => {
match result {
Some(Ok(status_response)) => {
if let Some(response) = status_response.transaction_results {
let status = TransactionStatus::try_from(response.status)
.unwrap_or(TransactionStatus::Unknown);
println!("Transaction status update: {:?}", status);
if status == target_status {
return Ok(response.transaction_id);
}
match status {
TransactionStatus::Expired => {
return Err(TransactionExpired);
}
_ => continue,
}
} else {
continue;
}
}
Some(Err(e)) => {
return Err(e.into());
}
None => {
return Err(TransactionStreamClosedUnexpectedly);
}
}
}
_ = &mut timeout => {
return Err(TransactionTimeoutExceeded);
}
}
}
}
/// Runs `script` against the latest block and returns its result as a Cadence value.
///
/// Each entry of `arguments` is converted to a Cadence value and sent as its
/// JSON encoding. The response bytes are decoded as UTF-8 and then parsed with
/// `serde_cadence`.
///
/// # Errors
///
/// Fails when an argument cannot be converted or serialized, when the gRPC call
/// fails, when the returned bytes are not valid UTF-8, or when they cannot be
/// parsed into a Cadence value.
pub async fn execute_script(
    &mut self,
    script: &str,
    arguments: &[&dyn ToCadenceValue],
) -> Result<CadenceValue> {
    let cadence_arguments = arguments
        .iter()
        .map(|arg| -> Result<Vec<u8>> { Ok(serde_json::to_vec(&arg.to_cadence_value()?)?) })
        .collect::<Result<Vec<Vec<u8>>>>()?;

    let response = self
        .access_client
        .execute_script_at_latest_block(Request::new(ExecuteScriptAtLatestBlockRequest {
            script: script.as_bytes().to_vec(),
            arguments: cadence_arguments,
        }))
        .await?;

    let json = String::from_utf8(response.into_inner().value)?;
    let value: CadenceValue = serde_cadence::from_str(&json)?;
    Ok(value)
}
/// Fetches the events of type `event_type` for the block heights from
/// `start_height` to `end_height` and returns them as one flat list.
///
/// The response groups events per block; the groups are concatenated in the
/// order they were returned.
///
/// # Errors
///
/// Returns the boxed gRPC status when the request fails.
async fn get_events_for_height_range(
    &mut self,
    event_type: String,
    start_height: u64,
    end_height: u64,
) -> std::result::Result<Vec<Event>, Box<dyn std::error::Error>> {
    let response = self
        .access_client
        .get_events_for_height_range(Request::new(GetEventsForHeightRangeRequest {
            r#type: event_type,
            start_height,
            end_height,
            event_encoding_version: 0,
        }))
        .await?;

    Ok(response
        .into_inner()
        .results
        .into_iter()
        .flat_map(|block_events| block_events.events)
        .collect())
}
/// Fetches the account stored at `address` as of the latest block.
///
/// # Errors
///
/// Returns [`Error::TonicStatusError`] when the gRPC call fails and
/// [`Error::CantGetAccount`], carrying the hex-encoded address, when the
/// response contains no account.
pub async fn get_account(&mut self, address: Vec<u8>) -> Result<Account> {
    let request = Request::new(GetAccountAtLatestBlockRequest {
        address: address.clone(),
    });
    let response = self
        .access_client
        .get_account_at_latest_block(request)
        .await?;
    // Built lazily so the hex string is only allocated on the failure path.
    response.into_inner().account.ok_or_else(|| CantGetAccount {
        address: hex::encode(&address),
    })
}
pub async fn get_reference_block_id(&mut self) -> Result<Vec<u8>> {
let request = Request::new(GetLatestBlockRequest {
full_block_response: false,
is_sealed: true,
});
let response = self.access_client.get_latest_block(request).await?;
let block = response.into_inner().block.ok_or(NoBlockReturned {})?;
Ok(block.id)
}
}