use core::str::FromStr;
use cometbft::Hash;
use cometbft_rpc::{
client::CompatMode,
event::{self, Event, EventData},
query::Query,
Client, Error, HttpClient, Order, Paging, Scheme, Subscription, SubscriptionClient, Url,
WebSocketClient,
};
use futures::StreamExt;
use structopt::StructOpt;
use tokio::{task::JoinHandle, time::Duration};
use tracing::{debug, error, info, level_filters::LevelFilter, warn};
/// Command-line client for submitting requests to a CometBFT node's RPC endpoint.
///
/// The kind of client used (HTTP/HTTPS or WebSocket/secure WebSocket) is
/// selected by the scheme of the endpoint URL.
#[derive(Debug, StructOpt)]
struct Opt {
    /// URL of the node's RPC endpoint.
    #[structopt(
        short,
        long,
        default_value = "http://127.0.0.1:26657",
        env = "COMETBFT_RPC_URL"
    )]
    url: Url,

    /// HTTP/HTTPS proxy through which to submit requests. Only supported for
    /// HTTP and HTTPS endpoints. When omitted, the HTTP_PROXY (for HTTP) or
    /// HTTPS_PROXY then HTTP_PROXY (for HTTPS) environment variables are
    /// consulted instead.
    #[structopt(long)]
    proxy_url: Option<Url>,

    /// Log at DEBUG level instead of INFO.
    #[structopt(short, long)]
    verbose: bool,

    // The request to perform; parsed as a subcommand.
    #[structopt(subcommand)]
    req: Request,
}
// All requests the CLI can perform: the one-shot client requests plus event
// subscription, which the HTTP/HTTPS code path rejects.
#[derive(Debug, StructOpt)]
enum Request {
    // Flattened so that every `ClientRequest` variant appears as a top-level
    // subcommand next to `subscribe`.
    #[structopt(flatten)]
    ClientRequest(ClientRequest),
    /// Subscribe to events matching a query and print each one as JSON.
    /// Not available when connecting over HTTP/HTTPS.
    Subscribe {
        /// Query against which events are matched.
        query: Query,
        /// Stop after receiving this many events.
        #[structopt(long)]
        max_events: Option<u32>,
        /// Stop listening after this many seconds.
        #[structopt(long)]
        max_time: Option<u32>,
    },
}
// One-shot requests. Each variant maps onto the identically named method of
// the RPC `Client` trait, and its response is printed as pretty JSON.
#[derive(Debug, StructOpt)]
enum ClientRequest {
    /// Query the node's `abci_info` endpoint.
    AbciInfo,
    /// Query the node's `abci_query` endpoint.
    AbciQuery {
        /// Path to query, if any.
        #[structopt(long)]
        path: Option<String>,
        /// Data to send with the query.
        data: String,
        /// Block height at which to query.
        #[structopt(long)]
        height: Option<u32>,
        /// Passed through as the request's `prove` flag.
        #[structopt(long)]
        prove: bool,
    },
    /// Get the block at the given height.
    Block {
        /// Height of the block.
        height: u32,
    },
    /// Get the block with the given hash.
    BlockByHash {
        /// Hash of the block.
        hash: String,
    },
    /// Query the node's `blockchain` endpoint for a range of heights.
    Blockchain {
        /// Lower bound of the height range.
        min: u32,
        /// Upper bound of the height range.
        max: u32,
    },
    /// Get the block results at the given height.
    BlockResults {
        /// Height of the block.
        height: u32,
    },
    /// Search for blocks matching a query.
    BlockSearch {
        /// Query that blocks must match.
        query: Query,
        /// Page of results to fetch.
        #[structopt(long, default_value = "1")]
        page: u32,
        /// Number of results per page.
        #[structopt(long, default_value = "10")]
        per_page: u8,
        /// Order in which to return results.
        #[structopt(long, default_value = "asc")]
        order: Order,
    },
    /// Submit a transaction through the `broadcast_tx_async` endpoint.
    BroadcastTxAsync {
        /// Transaction to broadcast.
        tx: String,
    },
    /// Submit a transaction through the `broadcast_tx_commit` endpoint.
    BroadcastTxCommit {
        /// Transaction to broadcast.
        tx: String,
    },
    /// Submit a transaction through the `broadcast_tx_sync` endpoint.
    BroadcastTxSync {
        /// Transaction to broadcast.
        tx: String,
    },
    /// Get the commit at the given height.
    Commit {
        /// Height of the commit.
        height: u32,
    },
    /// Get the consensus parameters at the given height.
    ConsensusParams {
        /// Height at which to query.
        height: u32,
    },
    /// Query the node's `consensus_state` endpoint.
    ConsensusState,
    /// Get the genesis document.
    Genesis,
    /// Query the node's `health` endpoint.
    Health,
    /// Get the latest block.
    LatestBlock,
    /// Get the results of the latest block.
    LatestBlockResults,
    /// Get the latest consensus parameters.
    LatestConsensusParams,
    /// Get the latest commit.
    LatestCommit,
    /// Query the node's `net_info` endpoint.
    NetInfo,
    /// Query the node's `status` endpoint.
    Status,
    /// Get the transaction with the given hash.
    Tx {
        /// Hash of the transaction.
        hash: String,
        /// Passed through as the request's `prove` flag.
        #[structopt(long)]
        prove: bool,
    },
    /// Search for transactions matching a query.
    TxSearch {
        /// Query that transactions must match.
        query: Query,
        /// Page of results to fetch.
        #[structopt(long, default_value = "1")]
        page: u32,
        /// Number of results per page.
        #[structopt(long, default_value = "10")]
        per_page: u8,
        /// Order in which to return results.
        #[structopt(long, default_value = "asc")]
        order: Order,
        /// Passed through as the request's `prove` flag.
        #[structopt(long)]
        prove: bool,
    },
    /// Get the validators at the given height.
    Validators {
        /// Height at which to query.
        height: u32,
        /// Fetch all validators; takes precedence over --page/--per-page.
        #[structopt(long)]
        all: bool,
        /// Page of validators to fetch. Only honoured together with
        /// --per-page; otherwise the default paging is used.
        #[structopt(long)]
        page: Option<usize>,
        /// Number of validators per page. Only honoured together with
        /// --page; otherwise the default paging is used.
        #[structopt(long)]
        per_page: Option<u8>,
    },
}
#[tokio::main]
async fn main() {
let opt: Opt = Opt::from_args();
let log_level = if opt.verbose {
LevelFilter::DEBUG
} else {
LevelFilter::INFO
};
tracing_subscriber::fmt()
.with_max_level(log_level)
.with_writer(std::io::stderr)
.init();
let proxy_url = match get_http_proxy_url(opt.url.scheme(), opt.proxy_url.clone()) {
Ok(u) => u,
Err(e) => {
error!("Failed to obtain proxy URL: {}", e);
std::process::exit(-1);
},
};
let result = match opt.url.scheme() {
Scheme::Http | Scheme::Https => http_request(opt.url, proxy_url, opt.req).await,
Scheme::WebSocket | Scheme::SecureWebSocket => match opt.proxy_url {
Some(_) => Err(Error::invalid_params(
"proxies are only supported for use with HTTP clients at present".to_string(),
)),
None => websocket_request(opt.url, opt.req).await,
},
};
if let Err(e) = result {
error!("Failed: {}", e);
std::process::exit(-1);
}
}
/// Determines which HTTP proxy, if any, should be used.
///
/// An explicitly supplied `proxy_url` always wins. Otherwise the environment
/// is consulted: `HTTP_PROXY` for HTTP endpoints, and `HTTPS_PROXY` falling
/// back to `HTTP_PROXY` for HTTPS endpoints. For any other scheme the
/// environment variables are ignored (with a warning if either is set).
///
/// # Errors
///
/// Returns an error if the value taken from the environment cannot be parsed
/// as a URL.
fn get_http_proxy_url(url_scheme: Scheme, proxy_url: Option<Url>) -> Result<Option<Url>, Error> {
    if let Some(explicit) = proxy_url {
        return Ok(Some(explicit));
    }

    let env_value = |name: &str| std::env::var(name).ok();

    let from_env = match url_scheme {
        Scheme::Http => env_value("HTTP_PROXY"),
        Scheme::Https => env_value("HTTPS_PROXY").or_else(|| env_value("HTTP_PROXY")),
        _ => {
            if env_value("HTTP_PROXY").is_some() || env_value("HTTPS_PROXY").is_some() {
                warn!(
                    "Ignoring HTTP proxy environment variables for non-HTTP client connection"
                );
            }
            None
        }
    };

    match from_env {
        Some(raw) => raw.parse::<Url>().map(Some),
        None => Ok(None),
    }
}
async fn http_request(url: Url, proxy_url: Option<Url>, req: Request) -> Result<(), Error> {
let mut client = match proxy_url {
Some(proxy_url) => {
info!(
"Using HTTP client with proxy {} to submit request to {}",
proxy_url, url
);
HttpClient::new_with_proxy(url, proxy_url)
},
None => {
info!("Using HTTP client to submit request to: {}", url);
HttpClient::new(url)
},
}?;
let status = client.status().await?;
let compat_mode = CompatMode::from_version(status.node_info.version)?;
debug!("Using compatibility mode {}", compat_mode);
client.set_compat_mode(compat_mode);
match req {
Request::ClientRequest(r) => client_request(&client, r).await,
_ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)".to_owned()))
}
}
async fn websocket_request(url: Url, req: Request) -> Result<(), Error> {
info!("Using WebSocket client to submit request to: {}", url);
let (client, driver_hdl) = start_websocket_client(url).await?;
let result = match req {
Request::ClientRequest(r) => client_request(&client, r).await,
Request::Subscribe {
query,
max_events,
max_time,
} => subscription_client_request(&client, query, max_events, max_time).await,
};
stop_websocket_client(client, driver_hdl).await?;
result
}
/// Connects a WebSocket client and spawns its driver on a background task.
///
/// A first connection is used to fetch the node's status and derive the
/// compatibility mode from the reported version. If that mode is not the
/// latest one, the first connection is stopped and a second one is built with
/// the detected mode.
///
/// Returns the client together with the join handle of its driver task.
async fn start_websocket_client(
    url: Url,
) -> Result<(WebSocketClient, JoinHandle<Result<(), Error>>), Error> {
    let (probe_client, probe_driver) = WebSocketClient::new(url.clone()).await?;
    let probe_hdl = tokio::spawn(async move { probe_driver.run().await });

    let node_version = probe_client.status().await?.node_info.version;
    let compat_mode = CompatMode::from_version(node_version)?;

    if compat_mode != CompatMode::latest() {
        debug!("Reconnecting with compatibility mode {}", compat_mode);
        stop_websocket_client(probe_client, probe_hdl).await?;

        let (client, driver) = WebSocketClient::builder(url.try_into()?)
            .compat_mode(compat_mode)
            .build()
            .await?;
        let driver_hdl = tokio::spawn(async move { driver.run().await });
        return Ok((client, driver_hdl));
    }

    debug!("Using compatibility mode {}", compat_mode);
    Ok((probe_client, probe_hdl))
}
async fn stop_websocket_client(
client: WebSocketClient,
driver_hdl: JoinHandle<Result<(), Error>>,
) -> Result<(), Error> {
client.close()?;
driver_hdl.await.map_err(Error::join)?
}
async fn client_request<C>(client: &C, req: ClientRequest) -> Result<(), Error>
where
C: Client + Sync,
{
let result = match req {
ClientRequest::AbciInfo => {
serde_json::to_string_pretty(&client.abci_info().await?).map_err(Error::serde)?
},
ClientRequest::AbciQuery {
path,
data,
height,
prove,
} => serde_json::to_string_pretty(
&client
.abci_query(path, data, height.map(Into::into), prove)
.await?,
)
.map_err(Error::serde)?,
ClientRequest::Block { height } => {
serde_json::to_string_pretty(&client.block(height).await?).map_err(Error::serde)?
},
ClientRequest::BlockByHash { hash } => serde_json::to_string_pretty(
&client
.block_by_hash(
cometbft::Hash::from_str(&hash).map_err(|e| Error::parse(e.to_string()))?,
)
.await?,
)
.map_err(Error::serde)?,
ClientRequest::Blockchain { min, max } => {
serde_json::to_string_pretty(&client.blockchain(min, max).await?)
.map_err(Error::serde)?
},
ClientRequest::BlockResults { height } => {
serde_json::to_string_pretty(&client.block_results(height).await?)
.map_err(Error::serde)?
},
ClientRequest::BlockSearch {
query,
page,
per_page,
order,
} => {
serde_json::to_string_pretty(&client.block_search(query, page, per_page, order).await?)
.map_err(Error::serde)?
},
ClientRequest::BroadcastTxAsync { tx } => {
serde_json::to_string_pretty(&client.broadcast_tx_async(tx).await?)
.map_err(Error::serde)?
},
ClientRequest::BroadcastTxCommit { tx } => {
serde_json::to_string_pretty(&client.broadcast_tx_commit(tx).await?)
.map_err(Error::serde)?
},
ClientRequest::BroadcastTxSync { tx } => {
serde_json::to_string_pretty(&client.broadcast_tx_sync(tx).await?)
.map_err(Error::serde)?
},
ClientRequest::ConsensusParams { height } => {
serde_json::to_string_pretty(&client.consensus_params(height).await?)
.map_err(Error::serde)?
},
ClientRequest::Commit { height } => {
serde_json::to_string_pretty(&client.commit(height).await?).map_err(Error::serde)?
},
ClientRequest::LatestBlock => {
serde_json::to_string_pretty(&client.latest_block().await?).map_err(Error::serde)?
},
ClientRequest::LatestBlockResults => {
serde_json::to_string_pretty(&client.latest_block_results().await?)
.map_err(Error::serde)?
},
ClientRequest::LatestCommit => {
serde_json::to_string_pretty(&client.latest_commit().await?).map_err(Error::serde)?
},
ClientRequest::LatestConsensusParams => {
serde_json::to_string_pretty(&client.latest_consensus_params().await?)
.map_err(Error::serde)?
},
ClientRequest::ConsensusState => {
serde_json::to_string_pretty(&client.consensus_state().await?).map_err(Error::serde)?
},
ClientRequest::Genesis => {
serde_json::to_string_pretty(&client.genesis::<serde_json::Value>().await?)
.map_err(Error::serde)?
},
ClientRequest::Health => {
serde_json::to_string_pretty(&client.health().await?).map_err(Error::serde)?
},
ClientRequest::NetInfo => {
serde_json::to_string_pretty(&client.net_info().await?).map_err(Error::serde)?
},
ClientRequest::Status => {
serde_json::to_string_pretty(&client.status().await?).map_err(Error::serde)?
},
ClientRequest::Tx { hash, prove } => serde_json::to_string_pretty(
&client
.tx(
Hash::from_str(&hash).map_err(|e| Error::parse(e.to_string()))?,
prove,
)
.await?,
)
.map_err(Error::serde)?,
ClientRequest::TxSearch {
query,
page,
per_page,
order,
prove,
} => serde_json::to_string_pretty(
&client
.tx_search(query, prove, page, per_page, order)
.await?,
)
.map_err(Error::serde)?,
ClientRequest::Validators {
height,
all,
page,
per_page,
} => {
let paging = if all {
Paging::All
} else {
match page.zip(per_page) {
Some((page, per_page)) => Paging::Specific {
page_number: page.into(),
per_page: per_page.into(),
},
None => Paging::Default,
}
};
serde_json::to_string_pretty(&client.validators(height, paging).await?)
.map_err(Error::serde)?
},
};
println!("{result}");
Ok(())
}
/// Subscribes to `query` and prints incoming events until a stop condition
/// is met.
///
/// When `max_time` (in seconds) is given, receiving is bounded by that
/// timeout; `max_events`, when given, bounds the number of events in either
/// case.
async fn subscription_client_request<C>(
    client: &C,
    query: Query,
    max_events: Option<u32>,
    max_time: Option<u32>,
) -> Result<(), Error>
where
    C: SubscriptionClient,
{
    info!("Creating subscription for query: {}", query);
    let subscription = client.subscribe(query).await?;

    if let Some(timeout_secs) = max_time {
        recv_events_with_timeout(subscription, max_events, timeout_secs).await
    } else {
        recv_events(subscription, max_events).await
    }
}
async fn recv_events_with_timeout(
mut subs: Subscription,
max_events: Option<u32>,
timeout_secs: u32,
) -> Result<(), Error> {
let timeout = tokio::time::sleep(Duration::from_secs(timeout_secs as u64));
let mut event_count = 0u64;
tokio::pin!(timeout);
loop {
tokio::select! {
result_opt = subs.next() => {
let result = match result_opt {
Some(r) => r,
None => {
info!("The server terminated the subscription");
return Ok(());
}
};
let event = result?;
print_event(event)?;
event_count += 1;
if let Some(me) = max_events {
if event_count >= (me as u64) {
info!("Reached maximum number of events: {}", me);
return Ok(());
}
}
}
_ = &mut timeout => {
info!("Reached event receive timeout of {} seconds", timeout_secs);
return Ok(())
}
}
}
}
/// Prints events from `subs` until the subscription ends or `max_events`
/// events have been printed.
///
/// # Errors
///
/// Returns the first error yielded by the subscription or raised while
/// printing an event.
async fn recv_events(mut subs: Subscription, max_events: Option<u32>) -> Result<(), Error> {
    let mut received: u64 = 0;

    loop {
        let event = match subs.next().await {
            Some(result) => result?,
            None => break,
        };
        print_event(event)?;
        received += 1;

        match max_events {
            Some(limit) if received >= u64::from(limit) => {
                info!("Reached maximum number of events: {}", limit);
                return Ok(());
            }
            _ => {}
        }
    }

    info!("The server terminated the subscription");
    Ok(())
}
fn print_event(event: Event) -> Result<(), Error> {
let json = match &event.data {
EventData::LegacyNewBlock { .. } => {
let ser_event: event::v0_37::SerEvent = event.into();
serde_json::to_string_pretty(&ser_event).map_err(Error::serde)?
},
_ => {
let ser_event: event::v0_38::SerEvent = event.into();
serde_json::to_string_pretty(&ser_event).map_err(Error::serde)?
},
};
println!("{}", json);
Ok(())
}