use anyhow::Result;
use std::time::Duration;
use cmpb::{
api::rpc_node_client,
common::{
self, EndorsementEntry, KeyValuePair, Payload, SubscribeResult, TxRequest, TxResponse,
TxType,
},
syscontract,
};
use tokio::sync::mpsc::{self, Receiver};
use tonic::transport::Channel;
use crate::{
crypto::Signer,
utils::{self, DEFAULT_SEQUENCE},
};
/// Client handle for one chain, reached through a single gRPC [`Channel`].
///
/// Instances are produced by `ChainClientBuilder::build`. Requests created by
/// this client are signed with `signer`.
pub struct ChainClient<S: Signer> {
/// Organization identifier taken from the builder; not read by the methods shown here.
org_id: String,
/// Chain identifier copied into the `chain_id` of payloads built by this client.
chain_id: String,
/// Connected transport; cloned to create an RPC client for each request.
conn: Channel,
/// Flag carried over from the builder; not read by the methods shown here.
enable_normal_key: bool,
/// Signs payloads and supplies the sender member attached to each request.
signer: S,
}
/// Step-by-step configuration for a [`ChainClient`].
///
/// Only the signer is mandatory; the remaining settings fall back to defaults
/// when `build` is called.
pub struct ChainClientBuilder<S: Signer> {
/// Organization identifier; `build` substitutes an empty string when unset.
org_id: Option<String>,
/// Chain identifier; `build` substitutes an empty string when unset.
chain_id: Option<String>,
/// Node URI; when unset `build` tries the `NODE_ADDR` environment variable,
/// then `http://localhost:12301`.
node_addr: Option<String>,
/// Starts as `false`; switched on by `enable_normal_key`.
enable_normal_key: bool,
/// Signer handed over to the built client.
signer: S,
}
impl<S: Signer> ChainClientBuilder<S> {
    /// Creates a builder that will sign with `signer`.
    ///
    /// Every other setting starts unset (`enable_normal_key` starts `false`).
    pub fn new(signer: S) -> Self {
        Self {
            org_id: None,
            chain_id: None,
            node_addr: None,
            enable_normal_key: false,
            signer,
        }
    }

    /// Sets the URI of the node to connect to, e.g. `http://localhost:12301`.
    pub fn with_node_addr(mut self, addr: &str) -> Self {
        self.node_addr = Some(addr.to_string());
        self
    }

    /// Sets the organization identifier stored on the built client.
    ///
    /// Without this setter the `org_id` field could never be populated and the
    /// client always ended up with an empty organization id.
    pub fn with_org_id(mut self, org_id: &str) -> Self {
        self.org_id = Some(org_id.to_string());
        self
    }

    /// Sets the chain identifier placed into payloads built by the client.
    pub fn with_chain_id(mut self, chain_id: &str) -> Self {
        self.chain_id = Some(chain_id.to_string());
        self
    }

    /// Turns the `enable_normal_key` flag on for the built client.
    pub fn enable_normal_key(mut self) -> Self {
        self.enable_normal_key = true;
        self
    }

    /// Connects to the node and returns the configured [`ChainClient`].
    ///
    /// The node URI is resolved in this order: the value given to
    /// [`with_node_addr`](Self::with_node_addr), the `NODE_ADDR` environment
    /// variable, then `http://localhost:12301`. Unset organization and chain
    /// identifiers become empty strings.
    ///
    /// # Errors
    ///
    /// Returns an error when the URI is not valid or the connection cannot be
    /// established (connection attempts time out after five seconds).
    pub async fn build(self) -> Result<ChainClient<S>> {
        // Upper bound on how long establishing the connection may take.
        const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);

        let uri = self.node_addr.unwrap_or_else(|| {
            std::env::var("NODE_ADDR").unwrap_or_else(|_| "http://localhost:12301".to_string())
        });
        let channel = Channel::from_shared(uri)?
            .connect_timeout(CONNECT_TIMEOUT)
            .connect()
            .await?;

        Ok(ChainClient {
            org_id: self.org_id.unwrap_or_default(),
            chain_id: self.chain_id.unwrap_or_default(),
            conn: channel,
            enable_normal_key: self.enable_normal_key,
            signer: self.signer,
        })
    }
}
// Empty inherent impl: it declares no items.
impl<S: Signer> ChainClient<S> {}
impl<S: Signer> ChainClient<S> {
/// Placeholder for transaction subscription: takes no arguments and its body is empty.
pub async fn subscribe_tx() {}
pub async fn subscribe_block(
&self,
start_block: i64,
end_block: i64,
with_rwset: bool,
only_header: bool,
) -> Result<Receiver<SubscribeResult>, Box<dyn std::error::Error>> {
let kvs = [
KeyValuePair {
key: syscontract::subscribe_block::Parameter::StartBlock
.as_str_name()
.to_string(),
value: start_block.to_le_bytes().to_vec(),
},
KeyValuePair {
key: syscontract::subscribe_block::Parameter::EndBlock
.as_str_name()
.to_string(),
value: end_block.to_le_bytes().to_vec(),
},
KeyValuePair {
key: syscontract::subscribe_block::Parameter::WithRwset
.as_str_name()
.to_string(),
value: with_rwset.to_string().into_bytes(),
},
KeyValuePair {
key: syscontract::subscribe_block::Parameter::OnlyHeader
.as_str_name()
.to_string(),
value: only_header.to_string().into_bytes(),
},
];
let payload = Payload {
chain_id: self.chain_id.clone(),
tx_type: common::TxType::Subscribe.into(),
tx_id: utils::get_tx_id(),
timestamp: utils::get_timestamp().try_into()?,
expiration_time: 0,
contract_name: syscontract::SystemContract::SubscribeManage
.as_str_name()
.to_string(),
method: syscontract::SubscribeFunction::SubscribeBlock
.as_str_name()
.to_string(),
parameters: kvs.to_vec(),
sequence: DEFAULT_SEQUENCE,
limit: None,
};
self.subscribe(payload).await
}
/// Signs `payload`, opens a server-streaming subscription and forwards every
/// received message into a channel whose receiving end is returned.
///
/// A background task pumps the stream into the channel. It stops, and thereby
/// closes the channel, when the server ends the stream, when the stream
/// reports an error, or when the returned receiver has been dropped.
///
/// # Errors
///
/// Returns an error when signing the payload fails or the subscribe RPC is
/// rejected.
pub async fn subscribe(
    &self,
    payload: Payload,
) -> Result<Receiver<SubscribeResult>, Box<dyn std::error::Error>> {
    let mut client = rpc_node_client::RpcNodeClient::new(self.conn.clone());
    // A signing failure is reported to the caller instead of panicking.
    let request = self.generate_tx_request(payload, Vec::new())?;
    let mut stream = client.subscribe(request).await?.into_inner();

    // Capacity 1: the task waits for the consumer before reading further.
    let (tx, rx) = mpsc::channel(1);
    tokio::spawn(async move {
        loop {
            match stream.message().await {
                Ok(Some(msg)) => {
                    // A failed send means the receiver is gone; stop reading
                    // rather than draining the stream for nobody.
                    if tx.send(msg).await.is_err() {
                        break;
                    }
                }
                // Server closed the stream normally.
                Ok(None) => break,
                Err(status) => {
                    // The channel carries no error type, so report the failure
                    // and close the channel by ending the task.
                    eprintln!("subscribe stream terminated with error: {}", status);
                    break;
                }
            }
        }
    });

    Ok(rx)
}
/// Sends a `QueryContract` request for `method` of `contract_name` and returns
/// the node's response.
///
/// * `contract_name` - contract to call.
/// * `method` - contract method to call.
/// * `kvs` - parameters passed to the method.
///
/// # Errors
///
/// Returns an error when the current timestamp does not fit the payload's
/// timestamp field, when signing fails, or when the RPC fails.
pub async fn query_contract(
    &self,
    contract_name: String,
    method: String,
    kvs: Vec<KeyValuePair>,
) -> Result<TxResponse, Box<dyn std::error::Error>> {
    let timestamp = utils::get_timestamp();
    let payload = Payload {
        chain_id: self.chain_id.clone(),
        // A query must be tagged as a query; this previously sent `InvokeContract`.
        tx_type: TxType::QueryContract.into(),
        tx_id: utils::get_tx_id(),
        timestamp: timestamp.try_into()?,
        expiration_time: 0,
        contract_name,
        method,
        parameters: kvs,
        sequence: DEFAULT_SEQUENCE,
        limit: None,
    };
    // A signing failure is reported to the caller instead of panicking.
    let tx_req = self.generate_tx_request(payload, Vec::new())?;
    self.send_tx_request(tx_req).await
}
/// Sends an `InvokeContract` transaction calling `method` of `contract_name`
/// and returns the node's response.
///
/// * `contract_name` - contract to call.
/// * `method` - contract method to call.
/// * `kvs` - parameters passed to the method.
///
/// # Errors
///
/// Returns an error when the system clock reads earlier than the Unix epoch,
/// when the timestamp does not fit the payload's timestamp field, when signing
/// fails, or when the RPC fails.
pub async fn invoke_contract(
    &self,
    contract_name: String,
    method: String,
    kvs: Vec<KeyValuePair>,
) -> Result<TxResponse, Box<dyn std::error::Error>> {
    // Time since the Unix epoch; a misconfigured clock becomes an error, not a panic.
    let since_epoch = std::time::SystemTime::UNIX_EPOCH.elapsed()?;
    let payload = Payload {
        chain_id: self.chain_id.clone(),
        // An invocation must be tagged as one; this previously sent `QueryContract`.
        tx_type: TxType::InvokeContract.into(),
        tx_id: utils::get_tx_id(),
        timestamp: since_epoch.as_secs().try_into()?,
        expiration_time: 0,
        contract_name,
        method,
        parameters: kvs,
        sequence: DEFAULT_SEQUENCE,
        limit: None,
    };
    // A signing failure is reported to the caller instead of panicking.
    let tx_req = self.generate_tx_request(payload, Vec::new())?;
    self.send_tx_request(tx_req).await
}
/// Wraps `payload` into a signed [`TxRequest`].
///
/// The payload is signed with this client's signer; the signature together
/// with the signer's member forms the request's `sender`. `endorsements` are
/// attached unchanged as `endorsers`, and no payer is set.
///
/// # Errors
///
/// Returns the signer's error message when signing fails.
pub fn generate_tx_request(
    &self,
    payload: Payload,
    endorsements: Vec<EndorsementEntry>,
) -> Result<TxRequest, String> {
    let signature = self.signer.sign(&payload)?;
    let sender = Some(EndorsementEntry {
        signer: Some(self.signer.new_member()),
        signature,
    });

    Ok(TxRequest {
        payload: Some(payload),
        sender,
        endorsers: endorsements,
        payer: None,
    })
}
/// Sends `tx_request` through the node's `send_request` RPC and returns the
/// response message.
///
/// # Errors
///
/// Returns the RPC status as an error when the call fails.
async fn send_tx_request(
    &self,
    tx_request: TxRequest,
) -> Result<TxResponse, Box<dyn std::error::Error>> {
    let mut client = rpc_node_client::RpcNodeClient::new(self.conn.clone());
    let response = client.send_request(tonic::Request::new(tx_request)).await?;
    // Move the message out of the response wrapper instead of cloning it.
    Ok(response.into_inner())
}
}