use cosmrs::rpc::endpoint::broadcast::tx_commit::TxResult;
use log::{debug, info};
use serde::Serialize;
use std::ffi::OsStr;
use std::fmt::{self, Debug};
use std::fs;
use std::panic::Location;
use std::path::Path;
use super::error::{ProcessError, ReportError, StoreError};
use crate::client::cosm_client::{tokio_block, CosmClient};
use crate::client::error::ClientError;
use crate::config::cfg::Config;
use crate::config::key::SigningKey;
use crate::orchestrator::deploy::ContractMap;
use crate::profilers::profiler::{CommandType, Profiler, Report};
/// Orchestrator that stores, instantiates, executes and queries contracts
/// through a [`CosmClient`] and reports every operation to its profilers.
pub struct CosmOrc {
/// Registry of known contracts. Code ids are registered when contracts are
/// stored and addresses are added when contracts are instantiated.
pub contract_map: ContractMap,
/// Client used for every store / instantiate / execute / query call.
client: CosmClient,
/// Profilers that are instrumented after each operation, in insertion order.
profilers: Vec<Box<dyn Profiler + Send>>,
}
/// A message for a contract, tagged with the kind of operation to perform.
///
/// Each variant carries its own serializable payload type, so a single
/// `WasmMsg<X, Y, Z>` type can describe the instantiate, execute and query
/// messages of one contract.
pub enum WasmMsg<X: Serialize, Y: Serialize, Z: Serialize> {
    /// Payload used to instantiate a contract.
    InstantiateMsg(X),
    /// Payload used to execute a contract.
    ExecuteMsg(Y),
    /// Payload used to query a contract.
    QueryMsg(Z),
}
/// The debug representation of a [`CosmOrc`] is the debug representation of
/// its contract map; the client and the profilers are not printed.
impl Debug for CosmOrc {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A fresh `{:?}` spec is used on purpose: flags set on `f` by the
        // caller are not forwarded to the contract map.
        f.write_fmt(format_args!("{:?}", self.contract_map))
    }
}
impl CosmOrc {
/// Builds an orchestrator from `cfg`.
///
/// The contract map is seeded from `cfg.code_ids` and the client is created
/// from `cfg.chain_cfg`. The new orchestrator has no profilers attached.
///
/// # Errors
///
/// Returns the [`ClientError`] produced while creating the client.
pub fn new(cfg: Config) -> Result<Self, ClientError> {
    let contract_map = ContractMap::new(&cfg.code_ids);
    let client = CosmClient::new(cfg.chain_cfg)?;
    Ok(Self {
        contract_map,
        client,
        profilers: Vec::new(),
    })
}
/// Appends `p` to the list of profilers and returns the orchestrator, so
/// calls can be chained builder-style.
pub fn add_profiler(self, p: Box<dyn Profiler + Send>) -> Self {
    let mut orc = self;
    orc.profilers.push(p);
    orc
}
/// Uploads every `*.wasm` file found directly inside `wasm_dir` and registers
/// the resulting code ids in the contract map.
///
/// A contract is registered under its file stem (`foo.wasm` becomes `foo`).
/// After each upload every profiler is instrumented with a
/// `CommandType::Store` record carrying the location of this method's caller.
///
/// Returns the transaction results of the uploads in directory iteration
/// order. Entries whose extension is not `wasm` are ignored.
///
/// # Errors
///
/// Stops at the first failure: the directory or a file cannot be read, a
/// file name is not valid UTF-8, the upload fails, or a profiler reports an
/// error. Uploads that completed before the failure are not rolled back.
#[track_caller]
pub fn store_contracts(
    &mut self,
    wasm_dir: &str,
    key: &SigningKey,
) -> Result<Vec<TxResult>, StoreError> {
    let caller_loc = Location::caller();
    let mut responses = vec![];
    let wasm_path = Path::new(wasm_dir);
    for wasm in fs::read_dir(wasm_path).map_err(StoreError::wasmdir)? {
        let wasm_path = wasm?.path();
        if wasm_path.extension() == Some(OsStr::new("wasm")) {
            // Validate the contract name BEFORE uploading: a store is an
            // on-chain transaction, so failing afterwards would leave code
            // uploaded but never registered in the contract map.
            let contract = wasm_path
                .file_stem()
                .ok_or(StoreError::InvalidWasmFileName)?
                .to_str()
                .ok_or(StoreError::InvalidWasmFileName)?;
            info!("Storing {:?}", wasm_path);
            let wasm = fs::read(&wasm_path).map_err(StoreError::wasmfile)?;
            let res =
                tokio_block(async { self.client.store(wasm, &key.clone().try_into()?).await })?;
            self.contract_map
                .register_contract(contract.to_string(), res.code_id);
            for prof in &mut self.profilers {
                prof.instrument(
                    contract.to_string(),
                    "Store".to_string(),
                    CommandType::Store,
                    &res.data,
                    caller_loc,
                    // A store is a single operation, so its index is always 0.
                    0,
                )
                .map_err(StoreError::instrument)?;
            }
            responses.push(res.data);
        }
    }
    Ok(responses)
}
/// Processes `msgs` in order against the contract named `contract_name`.
///
/// Every message is handled exactly like a call to [`CosmOrc::process_msg`],
/// except that profilers receive the message's position within `msgs` as its
/// index. `op_name` labels the operation for the profilers.
///
/// Returns one transaction result per message, in the same order.
///
/// # Errors
///
/// Stops at, and returns, the first [`ProcessError`]; later messages are not
/// sent.
#[track_caller]
pub fn process_msgs<X, Y, Z, S>(
    &mut self,
    contract_name: S,
    op_name: S,
    msgs: &[WasmMsg<X, Y, Z>],
    key: &SigningKey,
) -> Result<Vec<TxResult>, ProcessError>
where
    X: Serialize,
    Y: Serialize,
    Z: Serialize,
    S: Into<String>,
{
    let origin = Location::caller();
    let contract: String = contract_name.into();
    let operation: String = op_name.into();
    // Collecting into `Result` short-circuits on the first error, so no
    // further message is processed once one of them fails.
    msgs.iter()
        .enumerate()
        .map(|(position, msg)| {
            self.process_msg_internal(
                contract.clone(),
                operation.clone(),
                msg,
                key,
                position,
                origin,
            )
        })
        .collect()
}
/// Processes a single message against the contract named `contract_name`.
///
/// `op_name` labels the operation for the profilers, which also receive the
/// location of this method's caller and the index `0`.
///
/// # Errors
///
/// Returns the [`ProcessError`] raised while looking up the contract,
/// serializing or sending the message, or instrumenting a profiler.
#[track_caller]
pub fn process_msg<X, Y, Z, S>(
    &mut self,
    contract_name: S,
    op_name: S,
    msg: &WasmMsg<X, Y, Z>,
    key: &SigningKey,
) -> Result<TxResult, ProcessError>
where
    X: Serialize,
    Y: Serialize,
    Z: Serialize,
    S: Into<String>,
{
    let origin = Location::caller();
    let contract: String = contract_name.into();
    let operation: String = op_name.into();
    self.process_msg_internal(contract, operation, msg, key, 0, origin)
}
/// Shared implementation behind `process_msg` and `process_msgs`.
///
/// Serializes the payload of `msg` to JSON, dispatches it to the client
/// according to its variant, then instruments every profiler with the result,
/// `caller_loc` and `idx`.
///
/// * `InstantiateMsg` instantiates from the contract's code id, signed with
///   `key`, and records the returned address in the contract map.
/// * `ExecuteMsg` executes against the contract's recorded address, signed
///   with `key`.
/// * `QueryMsg` queries the contract's recorded address; `key` is not used.
fn process_msg_internal<X, Y, Z>(
    &mut self,
    contract_name: String,
    op_name: String,
    msg: &WasmMsg<X, Y, Z>,
    key: &SigningKey,
    idx: usize,
    caller_loc: &Location,
) -> Result<TxResult, ProcessError>
where
    X: Serialize,
    Y: Serialize,
    Z: Serialize,
{
    // NOTE(review): the code id is resolved for every message kind although
    // only instantiation uses it, so a failing lookup surfaces here first for
    // execute and query messages as well.
    let code_id = self.contract_map.code_id(&contract_name)?;

    // All three variants are sent as JSON bytes; serialize before any
    // address lookup or network call.
    let payload = match msg {
        WasmMsg::InstantiateMsg(body) => serde_json::to_vec(body),
        WasmMsg::ExecuteMsg(body) => serde_json::to_vec(body),
        WasmMsg::QueryMsg(body) => serde_json::to_vec(body),
    }
    .map_err(ProcessError::json)?;

    let tx_result = match msg {
        WasmMsg::InstantiateMsg(_) => {
            let outcome = tokio_block(async {
                self.client
                    .instantiate(code_id, payload, &key.clone().try_into()?)
                    .await
            })?;
            // Remember where the new instance lives for later execute/query calls.
            self.contract_map
                .add_address(&contract_name, outcome.address)?;
            outcome.data
        }
        WasmMsg::ExecuteMsg(_) => {
            let addr = self.contract_map.address(&contract_name)?;
            let outcome = tokio_block(async {
                self.client
                    .execute(addr, payload, &key.clone().try_into()?)
                    .await
            })?;
            outcome.data
        }
        WasmMsg::QueryMsg(_) => {
            let addr = self.contract_map.address(&contract_name)?;
            let outcome = tokio_block(async { self.client.query(addr, payload).await })?;
            outcome.data
        }
    };

    for profiler in self.profilers.iter_mut() {
        profiler
            .instrument(
                contract_name.clone(),
                op_name.clone(),
                msg.into(),
                &tx_result,
                caller_loc,
                idx,
            )
            .map_err(ProcessError::instrument)?;
    }

    debug!("{:?}", tx_result);
    Ok(tx_result)
}
/// Collects one report from every registered profiler, in insertion order.
///
/// # Errors
///
/// Returns `ReportError::ReportError` wrapping the first failure reported by
/// a profiler; profilers after the failing one are not asked for a report.
pub fn profiler_reports(&self) -> Result<Vec<Report>, ReportError> {
    self.profilers
        .iter()
        .map(|profiler| {
            profiler
                .report()
                .map_err(|source| ReportError::ReportError { source })
        })
        .collect()
}
}