use crate::error::RPCError;
use async_trait::async_trait;
use ckb_dao_utils::extract_dao_data;
use ckb_db_schema::COLUMN_CELL;
use ckb_jsonrpc_types::{
CellsInfo, Disk, DiskUsage, Global, MiningInfo, Network, NetworkInfo, Overview, PeerInfo,
SysInfo, TerminalPoolInfo,
};
use ckb_logger::error;
use ckb_network::NetworkController;
use ckb_shared::shared::Shared;
use ckb_store::ChainStore;
use ckb_types::utilities::compact_to_difficulty;
use ckb_util::Mutex;
use jsonrpc_core::Result;
use jsonrpc_utils::rpc;
use lru::LruCache;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sysinfo::{Disks as SysDisks, Networks as SysNetworks, System};
/// Time-to-live values for each category of cached overview data.
///
/// A cached entry older than its category's TTL is treated as a miss by the
/// corresponding `TerminalCache::get_*` accessor.
pub mod ttl {
    use std::time::Duration;

    /// Maximum age of cached transaction-pool statistics.
    pub const TX_POOL_INFO: Duration = Duration::from_secs(2);

    /// Maximum age of cached host/process system statistics.
    pub const SYSTEM_INFO: Duration = Duration::from_secs(5);

    /// Maximum age of cached mining statistics.
    pub const MINING_INFO: Duration = Duration::from_secs(10);

    /// Maximum age of cached peer/network statistics.
    pub const NETWORK_INFO: Duration = Duration::from_secs(10);

    /// Maximum age of cached cell statistics.
    pub const CELLS_INFO: Duration = Duration::from_secs(30);
}
bitflags::bitflags! {
    /// Bit mask naming the overview categories that must be recomputed instead of being
    /// served from the cache.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct RefreshKind: u32 {
        /// No category is force-refreshed; cached values are used whenever they are fresh.
        const NOTHING = 0x00;
        /// Force-refresh host/process system statistics.
        const SYSTEM_INFO = 0x01;
        /// Force-refresh mining statistics.
        const MINING_INFO = 0x02;
        /// Force-refresh transaction-pool statistics.
        const TX_POOL_INFO = 0x04;
        /// Force-refresh cell statistics.
        const CELLS_INFO = 0x08;
        /// Force-refresh peer/network statistics.
        const NETWORK_INFO = 0x10;
        /// Every category above combined (`0b0001_1111`).
        const EVERYTHING = Self::SYSTEM_INFO.bits()
            | Self::MINING_INFO.bits()
            | Self::TX_POOL_INFO.bits()
            | Self::CELLS_INFO.bits()
            | Self::NETWORK_INFO.bits();
    }
}
/// Occupancy snapshot of a `TerminalCache`, one counter per cached category.
///
/// Each counter is the number of entries currently stored in the matching slot. The
/// counters come from the slot length alone, so an entry that has outlived its TTL but
/// has not been overwritten or cleared is still counted.
///
/// `Default` yields an all-zero snapshot, and `PartialEq`/`Eq` allow whole snapshots to
/// be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CacheStats {
    /// Entries held for system statistics.
    pub sys_info_cached: usize,
    /// Entries held for mining statistics.
    pub mining_info_cached: usize,
    /// Entries held for transaction-pool statistics.
    pub tx_pool_info_cached: usize,
    /// Entries held for cell statistics.
    pub cells_info_cached: usize,
    /// Entries held for peer/network statistics.
    pub network_info_cached: usize,
}
/// A cached value paired with the instant at which it was stored.
#[derive(Clone, Debug)]
struct CacheEntry<T> {
    /// The cached payload.
    data: T,
    /// Moment the entry was created; used to compute its age.
    timestamp: Instant,
}

impl<T> CacheEntry<T> {
    /// Wraps `data`, stamping it with the current monotonic time.
    fn new(data: T) -> Self {
        let timestamp = Instant::now();
        CacheEntry { data, timestamp }
    }

    /// Returns `true` once the entry's age is strictly greater than `ttl`.
    fn is_expired(&self, ttl: Duration) -> bool {
        let age = self.timestamp.elapsed();
        ttl < age
    }
}
/// TTL-based cache for the individual sections of the node overview.
///
/// Every section lives in its own mutex-protected `LruCache` keyed by `()`; the
/// constructor in this file creates each one with capacity 1, so a slot holds at most a
/// single timestamped entry. Because the slots are wrapped in `Arc`, cloning a
/// `TerminalCache` produces a handle onto the same underlying storage.
#[derive(Clone)]
pub struct TerminalCache {
// Host/process system statistics slot.
sys_info: Arc<Mutex<LruCache<(), CacheEntry<SysInfo>>>>,
// Mining statistics slot.
mining_info: Arc<Mutex<LruCache<(), CacheEntry<MiningInfo>>>>,
// Transaction-pool statistics slot.
tx_pool_info: Arc<Mutex<LruCache<(), CacheEntry<TerminalPoolInfo>>>>,
// Cell statistics slot.
cells_info: Arc<Mutex<LruCache<(), CacheEntry<CellsInfo>>>>,
// Peer/network statistics slot.
network_info: Arc<Mutex<LruCache<(), CacheEntry<NetworkInfo>>>>,
}
impl TerminalCache {
pub fn new() -> Self {
Self {
sys_info: Arc::new(Mutex::new(LruCache::new(1))),
mining_info: Arc::new(Mutex::new(LruCache::new(1))),
tx_pool_info: Arc::new(Mutex::new(LruCache::new(1))),
cells_info: Arc::new(Mutex::new(LruCache::new(1))),
network_info: Arc::new(Mutex::new(LruCache::new(1))),
}
}
pub fn get_sys_info(&self) -> Option<SysInfo> {
let mut cache = self.sys_info.lock();
if let Some(entry) = cache.get(&())
&& !entry.is_expired(ttl::SYSTEM_INFO)
{
return Some(entry.data.clone());
}
None
}
pub fn set_sys_info(&self, info: SysInfo) {
let mut cache = self.sys_info.lock();
cache.put((), CacheEntry::new(info));
}
pub fn get_mining_info(&self) -> Option<MiningInfo> {
let mut cache = self.mining_info.lock();
if let Some(entry) = cache.get(&())
&& !entry.is_expired(ttl::MINING_INFO)
{
return Some(entry.data.clone());
}
None
}
pub fn set_mining_info(&self, info: MiningInfo) {
let mut cache = self.mining_info.lock();
cache.put((), CacheEntry::new(info));
}
pub fn get_tx_pool_info(&self) -> Option<TerminalPoolInfo> {
let mut cache = self.tx_pool_info.lock();
if let Some(entry) = cache.get(&())
&& !entry.is_expired(ttl::TX_POOL_INFO)
{
return Some(entry.data.clone());
}
None
}
pub fn set_tx_pool_info(&self, info: TerminalPoolInfo) {
let mut cache = self.tx_pool_info.lock();
cache.put((), CacheEntry::new(info));
}
pub fn get_cells_info(&self) -> Option<CellsInfo> {
let mut cache = self.cells_info.lock();
if let Some(entry) = cache.get(&())
&& !entry.is_expired(ttl::CELLS_INFO)
{
return Some(entry.data.clone());
}
None
}
pub fn set_cells_info(&self, info: CellsInfo) {
let mut cache = self.cells_info.lock();
cache.put((), CacheEntry::new(info));
}
pub fn get_network_info(&self) -> Option<NetworkInfo> {
let mut cache = self.network_info.lock();
if let Some(entry) = cache.get(&())
&& !entry.is_expired(ttl::NETWORK_INFO)
{
return Some(entry.data.clone());
}
None
}
pub fn set_network_info(&self, info: NetworkInfo) {
let mut cache = self.network_info.lock();
cache.put((), CacheEntry::new(info));
}
pub fn clear_specific(&self, refresh: RefreshKind) {
if refresh.contains(RefreshKind::SYSTEM_INFO) {
self.sys_info.lock().clear();
}
if refresh.contains(RefreshKind::MINING_INFO) {
self.mining_info.lock().clear();
}
if refresh.contains(RefreshKind::TX_POOL_INFO) {
self.tx_pool_info.lock().clear();
}
if refresh.contains(RefreshKind::CELLS_INFO) {
self.cells_info.lock().clear();
}
if refresh.contains(RefreshKind::NETWORK_INFO) {
self.network_info.lock().clear();
}
}
pub fn get_stats(&self) -> CacheStats {
CacheStats {
sys_info_cached: self.sys_info.lock().len(),
mining_info_cached: self.mining_info.lock().len(),
tx_pool_info_cached: self.tx_pool_info.lock().len(),
cells_info_cached: self.cells_info.lock().len(),
network_info_cached: self.network_info.lock().len(),
}
}
pub fn clear_all(&self) {
self.sys_info.lock().clear();
self.mining_info.lock().clear();
self.tx_pool_info.lock().clear();
self.cells_info.lock().clear();
self.network_info.lock().clear();
}
}
impl Default for TerminalCache {
fn default() -> Self {
Self::new()
}
}
/// RPC interface exposing a single aggregated overview of the node.
#[rpc(openrpc)]
#[async_trait]
pub trait TerminalRpc {
/// Returns an `Overview` of the node, registered under the RPC name `get_overview`.
///
/// `refresh` is an optional `u32` bit mask; the implementation in this file interprets
/// it through `RefreshKind` to decide which sections bypass the cache. Omitting it is
/// treated the same as requesting no forced refresh.
#[rpc(name = "get_overview")]
fn get_overview(&self, refresh: Option<u32>) -> Result<Overview>;
}
/// Implementation state behind the `TerminalRpc` interface.
#[derive(Clone)]
pub(crate) struct TerminalRpcImpl {
/// Shared node state; queried for the chain snapshot, the store, the tx-pool controller
/// and block templates.
pub shared: Shared,
/// Handle used to read the node version and the list of connected peers.
pub network_controller: NetworkController,
/// TTL cache holding the most recently computed overview sections.
pub cache: TerminalCache,
}
#[async_trait]
impl TerminalRpc for TerminalRpcImpl {
    /// Assembles the node overview from its five sections plus the node version.
    ///
    /// `refresh` is a bit mask of [`RefreshKind`] flags naming the sections that must be
    /// recomputed even if a fresh cached value exists. `None` means "refresh nothing".
    /// Bits that do not correspond to a known flag are ignored, so a mask such as
    /// `0xFFFF_FFFF` still refreshes every known section instead of being discarded as a
    /// whole.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned while computing any section.
    fn get_overview(&self, refresh: Option<u32>) -> Result<Overview> {
        // `from_bits` would return `None` as soon as one unknown bit is set and thereby
        // silently turn the whole request into "refresh nothing"; truncating keeps the
        // known bits the caller asked for.
        let refresh = refresh.map_or(RefreshKind::NOTHING, RefreshKind::from_bits_truncate);

        if refresh.contains(RefreshKind::EVERYTHING) {
            self.cache.clear_all();
        }

        let sys = self.get_sys_info(refresh)?;
        let mining = self.get_mining_info(refresh)?;
        let pool = self.get_tx_pool_info(refresh)?;
        let cells = self.get_cells_info(refresh)?;
        let network = self.get_network_info(refresh)?;

        Ok(Overview {
            sys,
            cells,
            mining,
            pool,
            network,
            version: self.network_controller.version().to_owned(),
        })
    }
}
impl TerminalRpcImpl {
    /// Returns the current difficulty and the previous epoch's hash rate.
    ///
    /// A fresh cached value is returned unless `refresh` contains
    /// [`RefreshKind::MINING_INFO`]; a newly computed value is written back to the cache.
    ///
    /// # Errors
    ///
    /// Returns a `CKBInternalError` when the snapshot has no current epoch extension.
    fn get_mining_info(&self, refresh: RefreshKind) -> Result<MiningInfo> {
        if !refresh.contains(RefreshKind::MINING_INFO)
            && let Some(cached) = self.cache.get_mining_info()
        {
            return Ok(cached);
        }

        let current_epoch_ext = self
            .shared
            .snapshot()
            .get_current_epoch_ext()
            .ok_or_else(|| {
                RPCError::custom(
                    RPCError::CKBInternalError,
                    "failed to get current epoch_ext",
                )
            })?;
        let difficulty = compact_to_difficulty(current_epoch_ext.compact_target());
        let mining_info = MiningInfo {
            difficulty,
            hash_rate: current_epoch_ext.previous_epoch_hash_rate().to_owned(),
        };

        self.cache.set_mining_info(mining_info.clone());
        Ok(mining_info)
    }

    /// Collects host-wide and per-process system statistics via `sysinfo`.
    ///
    /// A fresh cached value is returned unless `refresh` contains
    /// [`RefreshKind::SYSTEM_INFO`]; a newly computed value is written back to the cache.
    ///
    /// # Errors
    ///
    /// Returns a `CKBInternalError` when the current PID cannot be determined or the
    /// current process is not present in the collected process list.
    fn get_sys_info(&self, refresh: RefreshKind) -> Result<SysInfo> {
        if !refresh.contains(RefreshKind::SYSTEM_INFO)
            && let Some(cached) = self.cache.get_sys_info()
        {
            return Ok(cached);
        }

        // NOTE(review): per the sysinfo documentation `System::new_all()` already loads
        // everything, and CPU usage needs two samples separated by a minimum interval.
        // The immediate `refresh_all()` presumably tries to provide that second sample,
        // but with no delay the CPU figures may be inaccurate -- confirm intent.
        let mut sys = System::new_all();
        sys.refresh_all();

        let total_memory = sys.total_memory();
        let used_memory = sys.used_memory();
        let global_cpu_usage = sys.global_cpu_usage();

        let sys_disks = SysDisks::new_with_refreshed_list();
        let disks = sys_disks
            .iter()
            .map(|disk| Disk {
                total_space: disk.total_space(),
                available_space: disk.available_space(),
                is_removable: disk.is_removable(),
            })
            .collect();

        let sys_networks = SysNetworks::new_with_refreshed_list();
        let networks = sys_networks
            .iter()
            .map(|(name, data)| Network {
                interface_name: name.clone(),
                received: data.received(),
                total_received: data.total_received(),
                transmitted: data.transmitted(),
                total_transmitted: data.total_transmitted(),
            })
            .collect();

        let global = Global {
            total_memory,
            used_memory,
            global_cpu_usage,
            disks,
            networks,
        };

        let process = sys
            .process(
                sysinfo::get_current_pid()
                    .map_err(|e| RPCError::custom(RPCError::CKBInternalError, e))?,
            )
            .ok_or_else(|| {
                RPCError::custom(RPCError::CKBInternalError, "failed to get current process")
            })?;
        let sys_disk_usage = process.disk_usage();

        let sys_info = SysInfo {
            global,
            cpu_usage: process.cpu_usage(),
            memory: process.memory(),
            disk_usage: DiskUsage {
                total_written_bytes: sys_disk_usage.total_written_bytes,
                written_bytes: sys_disk_usage.written_bytes,
                total_read_bytes: sys_disk_usage.total_read_bytes,
                read_bytes: sys_disk_usage.read_bytes,
            },
            virtual_memory: process.virtual_memory(),
        };

        self.cache.set_sys_info(sys_info.clone());
        Ok(sys_info)
    }

    /// Gathers transaction-pool statistics, including the number of transactions in the
    /// current block template and the recent reject count.
    ///
    /// A fresh cached value is returned unless `refresh` contains
    /// [`RefreshKind::TX_POOL_INFO`]; a newly computed value is written back to the cache.
    ///
    /// # Errors
    ///
    /// Returns an RPC error (after logging it) when the pool info request, the block
    /// template request, or the reject-count query fails.
    fn get_tx_pool_info(&self, refresh: RefreshKind) -> Result<TerminalPoolInfo> {
        if !refresh.contains(RefreshKind::TX_POOL_INFO)
            && let Some(cached) = self.cache.get_tx_pool_info()
        {
            return Ok(cached);
        }

        let tx_pool = self.shared.tx_pool_controller();

        // Propagate with `?` rather than checking `Err` and then unwrapping the same
        // value; this matches the handling of the other requests below.
        let info = tx_pool.get_tx_pool_info().map_err(|e| {
            error!("Send get_tx_pool_info request error {}", e);
            RPCError::ckb_internal_error(e)
        })?;

        let block_template = self
            .shared
            .get_block_template(None, None, None)
            .map_err(|err| {
                error!("Send get_block_template request error {}", err);
                RPCError::ckb_internal_error(err)
            })?
            .map_err(|err| {
                error!("Get_block_template result error {}", err);
                RPCError::from_any_error(err)
            })?;

        let total_recent_reject_num = tx_pool.get_total_recent_reject_num().map_err(|err| {
            error!("Get_total_recent_reject_num result error {}", err);
            RPCError::from_any_error(err)
        })?;

        let tx_pool_info = TerminalPoolInfo {
            pending: (info.pending_size as u64).into(),
            proposed: (info.proposed_size as u64).into(),
            orphan: (info.orphan_size as u64).into(),
            committing: (block_template.transactions.len() as u64).into(),
            // A missing reject count is reported as zero.
            total_recent_reject_num: total_recent_reject_num.unwrap_or(0).into(),
            total_tx_size: (info.total_tx_size as u64).into(),
            total_tx_cycles: info.total_tx_cycles.into(),
            last_txs_updated_at: info.last_txs_updated_at.into(),
            max_tx_pool_size: info.max_tx_pool_size.into(),
        };

        self.cache.set_tx_pool_info(tx_pool_info.clone());
        Ok(tx_pool_info)
    }

    /// Reports the occupied-capacity figure taken from the tip header's DAO field and an
    /// estimate of the number of keys in the cell column.
    ///
    /// A fresh cached value is returned unless `refresh` contains
    /// [`RefreshKind::CELLS_INFO`]; a newly computed value is written back to the cache.
    ///
    /// # Errors
    ///
    /// Returns an internal RPC error (after logging it) when the store cannot provide
    /// the key-count estimate.
    fn get_cells_info(&self, refresh: RefreshKind) -> Result<CellsInfo> {
        if !refresh.contains(RefreshKind::CELLS_INFO)
            && let Some(cached) = self.cache.get_cells_info()
        {
            return Ok(cached);
        }

        let snapshot = self.shared.cloned_snapshot();
        let tip_header = snapshot.tip_header();
        // Only the last component of the DAO tuple is needed here.
        let (_ar, _c, _s, u) = extract_dao_data(tip_header.dao());

        let estimate_live_cells_num = self
            .shared
            .store()
            .estimate_num_keys_cf(COLUMN_CELL)
            .map_err(|err| {
                error!("estimate_num_keys_cf error {}", err);
                RPCError::ckb_internal_error(err)
            })?;

        let cells_info = CellsInfo {
            total_occupied_capacities: u.into(),
            // An unavailable estimate is reported as zero.
            estimate_live_cells_num: estimate_live_cells_num.unwrap_or(0).into(),
        };

        self.cache.set_cells_info(cells_info.clone());
        Ok(cells_info)
    }

    /// Summarises the currently connected peers: totals, direction split, and per-peer
    /// id, direction, latency and address.
    ///
    /// A fresh cached value is returned unless `refresh` contains
    /// [`RefreshKind::NETWORK_INFO`]; a newly computed value is written back to the cache.
    /// A peer without a measured ping round-trip time is reported with latency `0`.
    fn get_network_info(&self, refresh: RefreshKind) -> Result<NetworkInfo> {
        if !refresh.contains(RefreshKind::NETWORK_INFO)
            && let Some(cached) = self.cache.get_network_info()
        {
            return Ok(cached);
        }

        let peers = self.network_controller.connected_peers();
        let total_peers = peers.len();

        let mut outbound_peers: u64 = 0;
        let mut inbound_peers: u64 = 0;
        let mut peer_infos = Vec::with_capacity(total_peers);

        for (peer_index, peer) in peers {
            let is_outbound = peer.is_outbound();
            if is_outbound {
                outbound_peers += 1;
            } else {
                inbound_peers += 1;
            }

            // `as_millis` yields a `u128`; saturate instead of silently wrapping on the
            // (practically unreachable) overflow.
            let latency_ms = peer.ping_rtt.map_or(0, |rtt| {
                u64::try_from(rtt.as_millis()).unwrap_or(u64::MAX)
            });

            peer_infos.push(PeerInfo {
                peer_id: peer_index.value(),
                is_outbound,
                latency_ms: latency_ms.into(),
                address: peer.connected_addr.to_string(),
            });
        }

        let network_info = NetworkInfo {
            connected_peers: (total_peers as u64).into(),
            outbound_peers: outbound_peers.into(),
            inbound_peers: inbound_peers.into(),
            peers: peer_infos,
        };

        self.cache.set_network_info(network_info.clone());
        Ok(network_info)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A freshly created entry must not be expired under a one-second TTL.
    #[test]
    fn test_cache_entry_expiration() {
        let fresh = CacheEntry::new("test_data");
        let expired_within_a_second = fresh.is_expired(Duration::from_secs(1));
        assert!(!expired_within_a_second);

        // With a zero TTL the answer depends on how much time has elapsed since creation,
        // so the call is only exercised and its result deliberately discarded.
        let _ = fresh.is_expired(Duration::from_millis(0));
    }

    /// `NOTHING`, a single flag and `EVERYTHING` contain exactly the expected flags.
    #[test]
    fn test_refresh_kind_flags() {
        let empty = RefreshKind::NOTHING;
        for flag in [RefreshKind::SYSTEM_INFO, RefreshKind::MINING_INFO] {
            assert!(!empty.contains(flag));
        }

        let only_system = RefreshKind::SYSTEM_INFO;
        assert!(only_system.contains(RefreshKind::SYSTEM_INFO));
        assert!(!only_system.contains(RefreshKind::MINING_INFO));

        let full = RefreshKind::EVERYTHING;
        for flag in [
            RefreshKind::SYSTEM_INFO,
            RefreshKind::MINING_INFO,
            RefreshKind::TX_POOL_INFO,
            RefreshKind::CELLS_INFO,
        ] {
            assert!(full.contains(flag));
        }
    }

    /// Values can be stored, read back, and removed again with `clear_all`.
    #[test]
    fn test_terminal_cache_basic_operations() {
        let terminal_cache = TerminalCache::new();
        assert!(terminal_cache.get_sys_info().is_none());
        assert!(terminal_cache.get_mining_info().is_none());

        let global = Global {
            total_memory: 1000,
            used_memory: 500,
            global_cpu_usage: 50.0,
            disks: vec![],
            networks: vec![],
        };
        let disk_usage = DiskUsage {
            total_written_bytes: 0,
            written_bytes: 0,
            total_read_bytes: 0,
            read_bytes: 0,
        };
        terminal_cache.set_sys_info(SysInfo {
            global,
            cpu_usage: 0.0,
            memory: 0,
            disk_usage,
            virtual_memory: 0,
        });
        assert!(terminal_cache.get_sys_info().is_some());

        terminal_cache.clear_all();
        assert!(terminal_cache.get_sys_info().is_none());
    }

    /// Network information survives a round trip through the cache unchanged.
    #[test]
    fn test_network_info_basic_operations() {
        let terminal_cache = TerminalCache::new();
        assert!(terminal_cache.get_network_info().is_none());

        // (peer id, outbound?, latency in ms, address)
        let peers = [
            (0, true, 150u64, "/ip4/192.168.1.100/tcp/8114"),
            (1, true, 50u64, "/ip4/192.168.1.101/tcp/8114"),
            (2, false, 300u64, "/ip4/192.168.1.102/tcp/8114"),
            (3, false, 100u64, "/ip4/192.168.1.103/tcp/8114"),
            (4, true, 0u64, "/ip4/192.168.1.104/tcp/8114"),
        ]
        .into_iter()
        .map(
            |(peer_id, is_outbound, latency, address): (usize, bool, u64, &str)| PeerInfo {
                peer_id,
                is_outbound,
                latency_ms: latency.into(),
                address: address.to_string(),
            },
        )
        .collect();

        terminal_cache.set_network_info(NetworkInfo {
            connected_peers: 5u64.into(),
            outbound_peers: 3u64.into(),
            inbound_peers: 2u64.into(),
            peers,
        });

        let cached = terminal_cache
            .get_network_info()
            .expect("Should have cached network info");
        assert_eq!(cached.connected_peers, 5u64.into());
        assert_eq!(cached.outbound_peers, 3u64.into());
        assert_eq!(cached.inbound_peers, 2u64.into());
        assert_eq!(cached.peers.len(), 5);

        let first_peer = &cached.peers[0];
        assert_eq!(first_peer.peer_id, 0);
        assert!(first_peer.is_outbound);
        assert_eq!(first_peer.latency_ms, 150u64.into());
    }

    /// The network flag is independent of the others and part of `EVERYTHING`.
    #[test]
    fn test_refresh_kind_network_flag() {
        assert!(!RefreshKind::NOTHING.contains(RefreshKind::NETWORK_INFO));

        let only_network = RefreshKind::NETWORK_INFO;
        assert!(only_network.contains(RefreshKind::NETWORK_INFO));
        assert!(!only_network.contains(RefreshKind::SYSTEM_INFO));

        let full = RefreshKind::EVERYTHING;
        for flag in [
            RefreshKind::NETWORK_INFO,
            RefreshKind::SYSTEM_INFO,
            RefreshKind::MINING_INFO,
            RefreshKind::TX_POOL_INFO,
            RefreshKind::CELLS_INFO,
        ] {
            assert!(full.contains(flag));
        }
    }

    /// Statistics start at zero everywhere and reflect a stored network entry.
    #[test]
    fn test_cache_stats_includes_network() {
        let terminal_cache = TerminalCache::new();

        let CacheStats {
            sys_info_cached,
            mining_info_cached,
            tx_pool_info_cached,
            cells_info_cached,
            network_info_cached,
        } = terminal_cache.get_stats();
        assert_eq!(sys_info_cached, 0);
        assert_eq!(mining_info_cached, 0);
        assert_eq!(tx_pool_info_cached, 0);
        assert_eq!(cells_info_cached, 0);
        assert_eq!(network_info_cached, 0);

        terminal_cache.set_network_info(NetworkInfo::default());

        let after_insert = terminal_cache.get_stats();
        assert_eq!(after_insert.network_info_cached, 1);
    }
}