mod column;
use anyhow::anyhow;
use column::Column;
use crossterm::{
cursor::{Hide, MoveTo, Show},
event::{Event, EventStream, KeyCode, KeyEvent, KeyModifiers as KeyMod},
style::{Color, Print, PrintStyledContent, Stylize},
terminal::{
disable_raw_mode, enable_raw_mode, size, Clear, ClearType, DisableLineWrap, EnableLineWrap,
EnterAlternateScreen, LeaveAlternateScreen, SetSize, SetTitle,
},
ExecutableCommand as _, QueueableCommand,
};
use futures::{future::try_join_all, FutureExt, StreamExt};
use jam_program_blob_common::CrateInfo;
use jsonrpsee::ws_client::WsClient;
use clap::Parser;
use jam_std_common::{
BlockDesc, ChainSubUpdate, CoresStats, Node, NodeExt, Service, ServiceActivityRecord,
Statistics, VersionedParameters,
};
use jam_types::{
core_count, epoch_period, hex, max_accumulate_gas, max_refine_gas, HeaderHash, ServiceId, Slot,
UnsignedGas, VecMap,
};
use std::{
io::{self, Write as _},
sync::Arc,
};
use tokio::{select, signal::ctrl_c};
use corevm_tooling::{ArcNodeExt as _, CoreVmCodeInfo};
use jam_tooling::{
format::{amount, bytes, gas, percent, slot as format_slot, slot_time},
CodeInfo, CommonArgs,
};
type ServiceInfoMap = VecMap<ServiceId, (Service, CodeInfo<CrateInfo>, Option<CoreVmCodeInfo>)>;
type ServiceInfoMapExt = VecMap<
ServiceId,
(Service, CodeInfo<CrateInfo>, Option<CoreVmCodeInfo>, ServiceActivityRecord),
>;
const REF_COL: Color = Color::Rgb { r: 0xFF, g: 0xCC, b: 0x00 };
const ACC_COL: Color = Color::Rgb { r: 0x00, g: 0xFF, b: 0x00 };
const PROV_COL: Color = Color::Rgb { r: 0x00, g: 0x00, b: 0xFF };
#[derive(Parser, Debug)]
#[command(name = "jamtop", version, long_version = format!("{}\nGP {}", env!("CARGO_PKG_VERSION"), jam_types::GP_VERSION))]
pub struct Args {
#[command(flatten)]
common: CommonArgs,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
use Column::*;
let args = Args::parse();
let rpc = Arc::new(args.common.connect_rpc(0).await?);
let VersionedParameters::V1(params) = rpc.parameters().await?;
params.apply().map_err(|s| anyhow!("{}", s))?;
let mut stats_sub = rpc.subscribe_statistics(false).await?;
let mut stdout = io::stdout();
stdout
.queue(EnterAlternateScreen)?
.queue(Hide)?
.queue(SetTitle("jamtop"))?
.queue(DisableLineWrap)?
.flush()?;
enable_raw_mode()?;
let BlockDesc { header_hash: head, slot } = rpc.best_block().await?;
let services = rpc.list_services(head).await?;
let mut service_data =
try_join_all(services.iter().map(|&id| {
rpc.query_service_and_corevm_guest(head, id).map(move |r| r.map(|s| (id, s)))
}))
.await?
.into_iter()
.collect::<ServiceInfoMap>();
let mut reader = EventStream::new();
let mut sort_col = Items;
let all_columns = [
Id,
Name,
Flags,
Items,
RefItems,
RefGas,
RefPercent,
RefPercentCore,
AccItems,
AccGas,
AccPercent,
AccPercentCore,
PreimageCount,
PreimageSize,
ImportsCount,
ExportsCount,
ExtrinsicsCount,
ExtrinsicsSize,
StorageItems,
StorageSize,
Balance,
FreeBalance,
MinBalance,
Version,
];
let mut viz = [
true, true, true, true, true, false, false, true, true, false, false, true, true, true,
true, true, true, true, true, true, true, true, true, false,
];
assert_eq!(viz.len(), all_columns.len());
let mut combined = combine_stats(&mut service_data, head, rpc.clone(), None).await?;
let refresh = |v: &[bool]| {
all_columns.into_iter().zip(v).filter(|x| *x.1).map(|x| x.0).collect::<Vec<_>>()
};
let mut columns = refresh(&viz);
let mut history_from = 0;
let mut history = vec![];
loop {
select! {
_ = ctrl_c() => {
eprintln!("Ctrl-C received");
break
}
maybe_event = reader.next().fuse() => {
match maybe_event {
Some(Ok(Event::Key(KeyEvent { code, modifiers: mods, .. }))) => {
let shift = if (mods & KeyMod::SHIFT).is_empty() { 0 } else { 1 } as u8;
match code {
KeyCode::Char('q') | KeyCode::Esc => break,
KeyCode::F(n) => {
if let Some(c) = columns.get((n + shift * 12) as usize) {
sort_col = *c
};
}
KeyCode::Char(x) if x.is_alphabetic() || x.is_numeric() => {
if let Some(i) = "1234567890QWERTYUIOPASDFGHJK".find(x) {
if i < viz.len() {
viz[i] = !viz[i];
}
columns = refresh(&viz);
}
},
_ => {}
}
update_view(head, slot, &combined, &history, &columns, sort_col)?;
}
Some(Err(e)) => {
eprintln!("Error reading event: {e}");
break;
},
Some(Ok(Event::Resize(w, h))) => {
stdout.execute(SetSize(w, h))?;
update_view(head, slot, &combined, &history, &columns, sort_col)?;
},
Some(Ok(_)) | None => {
}
}
}
n = stats_sub.next() => if let Some(Ok(ChainSubUpdate { header_hash, slot, value })) = n {
combined = combine_stats(&mut service_data, header_hash, rpc.clone(), Some(value)).await?;
if history.is_empty() {
history_from = slot;
}
while history_from + (history.len() as Slot) < slot {
history.push(None);
}
history.push(Some((combined.cores.clone(), combined.acc_gas_total)));
update_view(header_hash, slot, &combined, &history, &columns, sort_col)?;
} else {
break
}
};
}
disable_raw_mode()?;
stdout
.queue(LeaveAlternateScreen)?
.queue(Show)?
.queue(EnableLineWrap)?
.flush()?;
Ok(())
}
struct CombinedStats {
all: ServiceInfoMapExt,
active: VecMap<ServiceId, ServiceActivityRecord>,
cores: CoresStats,
ref_total: u32,
acc_total: u32,
refing: usize,
accing: usize,
exports_total: u32,
imports_total: u32,
xts_total: u32,
xt_data_total: u32,
prov_total: u32,
prov_data_total: u32,
ref_gas_total: UnsignedGas,
acc_gas_total: UnsignedGas,
}
async fn combine_stats(
service_info: &mut ServiceInfoMap,
head: HeaderHash,
rpc: Arc<WsClient>,
maybe_activity_data: Option<Statistics>,
) -> anyhow::Result<CombinedStats> {
let (active, cores) = if let Some(Statistics { services, cores, .. }) = maybe_activity_data {
(services, cores)
} else {
Default::default()
};
let ref_total: u32 = active.iter().map(|s| s.1.refinement_count).sum();
let acc_total: u32 = active.iter().map(|s| s.1.accumulate_count).sum();
let refing: usize = active.iter().filter(|s| s.1.refinement_count > 0).count();
let accing: usize = active.iter().filter(|s| s.1.accumulate_count > 0).count();
let exports_total: u32 = active.iter().map(|s| s.1.exports).sum();
let imports_total: u32 = active.iter().map(|s| s.1.imports).sum();
let xts_total: u32 = active.iter().map(|s| s.1.extrinsic_count).sum();
let xt_data_total: u32 = active.iter().map(|s| s.1.extrinsic_size).sum();
let prov_total: u32 = active.iter().map(|s| s.1.provided_count as u32).sum();
let prov_data_total: u32 = active.iter().map(|s| s.1.provided_size).sum();
let ref_gas_total: UnsignedGas = active.iter().map(|s| s.1.refinement_gas_used).sum();
let acc_gas_total: UnsignedGas = active.iter().map(|s| s.1.accumulate_gas_used).sum();
let updated = try_join_all(active.iter().map(|&(id, _)| {
rpc.query_service_and_corevm_guest(head, id).map(move |r| r.map(|s| (id, s)))
}))
.await?;
for (id, (info, meta, inner_meta)) in updated.into_iter() {
service_info.insert(id, (info, meta, inner_meta));
}
let mut all = service_info
.iter()
.cloned()
.map(|(id, (info, meta, inner_meta))| {
(id, (info, meta, inner_meta, ServiceActivityRecord::default()))
})
.collect::<VecMap<_, _>>();
for (id, sta) in active.iter() {
if let Some(m) = all.get_mut(id) {
m.3 = sta.clone();
}
}
Ok(CombinedStats {
all,
active,
cores,
ref_total,
acc_total,
refing,
accing,
exports_total,
imports_total,
xts_total,
xt_data_total,
ref_gas_total,
acc_gas_total,
prov_total,
prov_data_total,
})
}
fn graphic<T: Into<u128>>(x: T, d: T, inv: bool, color: Color) -> String {
let x = x.into();
let d = d.into();
let chars = if inv { "▇▆▅▄▃▂▁ " } else { "▁▂▃▄▅▆▇█" };
let count = 8;
let index = if d == 0 { 0 } else { ((x * (count as u128) + d / 2) / d) as usize };
let c = chars
.chars()
.nth(index.min(count - 1))
.expect("non-empty const; qed")
.to_string();
match (inv, x == 0) {
(false, false) => c.with(color).on(Color::Black),
(false, true) => c.with(Color::DarkGrey).dim().on(Color::Black),
(true, false) => c.with(Color::Black).on(color),
(true, true) => c.with(Color::Black).on(Color::DarkGrey),
}
.to_string()
}
fn update_view(
header_hash: HeaderHash,
slot: Slot,
stats: &CombinedStats,
history: &[Option<(CoresStats, UnsignedGas)>],
columns: &[Column],
sort_col: Column,
) -> anyhow::Result<()> {
let (width, height) = size()?;
let mut stdout = io::stdout();
let mut q = stdout.queue(Clear(ClearType::All))?;
let m_time = slot_time(slot);
let m_slot = format_slot(slot);
let w = if epoch_period() > 99 { 3 } else { 2 };
let m_epoch = format!("E+{}.{:>02$}", slot / epoch_period(), slot % epoch_period(), w);
let m_hash = format!("{}…", hex::to_hex(&header_hash.0[0..4]));
let right_width = m_time.len().max(m_slot.len()).max(m_epoch.len()).max(m_hash.len()) as u16;
q = q
.queue(MoveTo(width - m_time.len() as u16, 0))?
.queue(PrintStyledContent(m_time.white()))?
.queue(MoveTo(width - m_slot.len() as u16, 1))?
.queue(PrintStyledContent(m_slot.reset()))?
.queue(MoveTo(width - m_epoch.len() as u16, 2))?
.queue(PrintStyledContent(m_epoch.reset()))?
.queue(MoveTo(width - 9u16, 3))?
.queue(PrintStyledContent(m_hash.reset()))?;
let preamble_lines = 4;
let max_ref_gas = max_refine_gas() * core_count() as UnsignedGas;
let max_acc_gas = max_accumulate_gas() * core_count() as UnsignedGas;
let services_msg = format!(
"Services: {} total, {} active, {} refining, {} acc'ing",
amount(stats.all.len() as u128),
amount(stats.active.len() as u128),
amount(stats.refing as u128),
amount(stats.accing as u128),
);
let cores_msg = format!(
"APU: Acc: {}% ; Cores: {}/{}% idle",
percent(stats.acc_gas_total, max_acc_gas),
stats
.cores
.iter()
.map(|s| format!("{}%", percent(s.gas_used, max_refine_gas())))
.collect::<Vec<_>>()
.join("/"),
percent(max_ref_gas - stats.ref_gas_total, max_ref_gas),
);
let proc_msg = format!(
"Processed: {} refines/{} gas; {} accs/{} gas",
amount(stats.ref_total),
gas(stats.ref_gas_total),
amount(stats.acc_total),
gas(stats.acc_gas_total),
);
let d3l_msg = format!(
"D3L: {} imports; {} exports; {} extrinsics/{}; {} prov/{}",
amount(stats.imports_total),
amount(stats.exports_total),
amount(stats.xts_total),
bytes(stats.xt_data_total),
amount(stats.prov_total),
bytes(stats.prov_data_total),
);
let preamble_width = 73
.max(services_msg.len())
.max(cores_msg.len())
.max(proc_msg.len())
.max(d3l_msg.len()) as u16;
if preamble_width + right_width + 6 < width {
let graph_width = (width - 6 - preamble_width - right_width) as usize;
let label_x = preamble_width + 2;
let graph_x = preamble_width + 2 + 2;
if core_count() == 2 {
let num_cores = 2;
let mut graph = vec![String::with_capacity(graph_width); num_cores + 1];
for data in history[history.len().saturating_sub(graph_width)..].iter() {
if let Some((cores, acc_gas)) = data {
for (i, (c, l)) in
cores.iter().take(num_cores).zip(graph.iter_mut()).enumerate()
{
l.push_str(&graphic(c.gas_used, max_refine_gas(), i % 2 == 1, REF_COL));
}
graph[num_cores].push_str(&graphic(
*acc_gas,
max_accumulate_gas() * num_cores as UnsignedGas,
false,
ACC_COL,
));
} else {
for l in graph.iter_mut() {
l.push_str(&'X'.dark_grey().to_string());
}
}
}
for (i, l) in graph.into_iter().enumerate() {
q = q
.queue(MoveTo(label_x, i as u16))?
.queue(PrintStyledContent(match i {
0 | 1 => i.to_string().with(REF_COL),
2 => "A".to_string().with(ACC_COL),
_ => unreachable!(),
}))?
.queue(MoveTo(graph_x, i as u16))?
.queue(Print(l))?;
}
}
}
q = q
.queue(MoveTo(0, 0))?
.queue(PrintStyledContent(services_msg.white()))?
.queue(MoveTo(0, 1))?
.queue(PrintStyledContent(cores_msg.white()))?
.queue(MoveTo(0, 2))?
.queue(PrintStyledContent(proc_msg.white()))?
.queue(MoveTo(0, 3))?
.queue(PrintStyledContent(d3l_msg.white()))?;
let first_row = preamble_lines + 1;
let row_count = height - first_row;
let widths = columns.iter().map(|c| c.width()).collect::<Vec<_>>();
let rows = [columns.iter().map(|c| c.to_string()).collect::<Vec<_>>()]
.into_iter()
.chain(
sort_col
.sort_services(stats.all.clone().to_vec())
.iter()
.map(|&(id, (ref info, ref meta, ref inner_meta, ref activity))| {
columns
.iter()
.map(|c| c.extract(id, info, meta, inner_meta, activity))
.collect::<Vec<_>>()
})
.take(row_count as usize - 1),
)
.collect::<Vec<_>>();
let mut available = width + 1;
let mut num_cols = 0;
while num_cols < widths.len() && available as usize > widths[num_cols] {
available -= widths[num_cols] as u16 + 1;
num_cols += 1;
}
for (i, cells) in rows.into_iter().enumerate() {
for ((x, cell), col) in widths
.iter()
.take(num_cols)
.scan(0, |state, &w| {
*state += w + 1;
Some(*state - w - 1)
})
.zip(cells.iter().cloned())
.zip(columns.iter())
{
q = q.queue(MoveTo(x as u16, i as u16 + first_row))?.queue(PrintStyledContent(
if i > 0 {
cell.reset()
} else if *col == sort_col {
cell.white().reverse()
} else {
cell.white()
},
))?;
}
}
q.flush()?;
Ok(())
}