use std::{
cmp,
convert::{TryFrom, TryInto},
str::FromStr,
};
use borsh::{BorshDeserialize, BorshSerialize};
use chrono::Utc;
use either::Either;
use futures::{SinkExt, channel::mpsc};
use log::*;
use minotari_app_grpc::{
conversions::transaction_output::grpc_output_with_payref,
tari_rpc::{
self,
CalcType,
ReadinessStatus,
Sorting,
readiness_status::{State as ReadinessState, Status as ReadinessStatusEnum},
},
};
use tari_common_types::{
epoch::VnEpoch,
payment_reference::generate_payment_reference,
tari_address::TariAddress,
types::{
CompressedCommitment,
CompressedPublicKey,
CompressedSignature,
FixedHash,
UncompressedCommitment,
UncompressedPublicKey,
UncompressedSignature,
},
};
use tari_comms::{Bytes, CommsNode};
use tari_core::{
base_node::{
LocalNodeCommsInterface,
StateMachineHandle,
comms_interface::CommsInterfaceError,
state_machine_service::states::StateInfo,
tari_pulse_service::TariPulseHandle,
},
chain_storage::{ChainStorageError, ValidatorNodeRegistrationInfo},
consensus::BaseNodeConsensusManager,
iterators::NonOverlappingIntegerPairIter,
mempool::{TxStorageResponse, service::LocalMempoolService},
validation::tari_rx_vm_key_height,
};
use tari_node_components::blocks::{Block, BlockHeader, NewBlockTemplate};
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tari_transaction_components::{
consensus::NetworkConsensus,
generate_coinbase_with_wallet_output,
key_manager::{KeyManager, TariKeyId, TransactionKeyManagerInterface, TxoStage},
tari_proof_of_work::{Difficulty, PowAlgorithm},
transaction_components::{
CoinBaseExtra,
KernelBuilder,
RangeProofType,
Transaction,
TransactionKernel,
TransactionKernelVersion,
memo_field::{MemoField, TxType},
},
};
use tari_utilities::{ByteArray, hex::Hex, message_format::MessageFormat};
use tokio::task;
use tonic::{Request, Response, Status};
use crate::{
BaseNodeConfig,
builder::BaseNodeContext,
grpc::{
blocks::{GET_BLOCKS_MAX_HEIGHTS, GET_BLOCKS_PAGE_SIZE, block_fees, block_heights, block_size},
data_cache::DataCache,
hash_rate::{HashRateMovingAverage, NANOS_PER_UNIT, display_u_decimal_value},
helpers::{mean, median},
},
grpc_method::GrpcMethod,
};
/// Log target passed to the logging macros in this file.
const LOG_TARGET: &str = "minotari::base_node::grpc";
// NOTE(review): the two GET_TOKENS_IN_CIRCULATION_* limits are not referenced in the part of the file reviewed here;
// presumably they bound the GetTokensInCirculation stream - confirm against that handler.
const GET_TOKENS_IN_CIRCULATION_MAX_HEIGHTS: usize = 1_000_000;
const GET_TOKENS_IN_CIRCULATION_PAGE_SIZE: usize = 1_000;
/// Largest `end_height - start_height` span accepted by `get_network_difficulty`.
const GET_DIFFICULTY_MAX_HEIGHTS: u64 = 10_000;
/// Page size used when `get_network_difficulty` fetches headers; also caps its response channel capacity.
const GET_DIFFICULTY_PAGE_SIZE: usize = 1_000;
/// Upper bound applied by `list_headers` to the requested number of headers.
const LIST_HEADERS_MAX_NUM_HEADERS: u64 = 10_000;
/// Page size used when `list_headers` fetches blocks; also its response channel capacity.
const LIST_HEADERS_PAGE_SIZE: usize = 10;
/// Number of headers `list_headers` returns when the request asks for zero.
const LIST_HEADERS_DEFAULT_NUM_HEADERS: u64 = 10;
// NOTE(review): not referenced in the part of the file reviewed here; presumably limits the block timing query -
// confirm against that handler.
const BLOCK_TIMING_MAX_BLOCKS: u64 = 10_000;
/// gRPC front end of the base node. Holds cloned handles to the node's services; the `BaseNode` trait
/// implementation below uses them to answer requests.
pub struct BaseNodeGrpcServer {
/// Handle used to query chain data (metadata, headers, blocks) and to build block templates and blocks.
node_service: LocalNodeCommsInterface,
/// Handle used to read the mempool state.
mempool_service: LocalMempoolService,
// NOTE(review): not read in the part of the file reviewed here.
network: NetworkConsensus,
/// Source of the status watch that provides the node state and the `bootstrapped` flag.
state_machine_handle: StateMachineHandle,
/// Consensus rules used for block rewards, consensus constants and coinbase/fee calculations.
consensus_rules: BaseNodeConsensusManager,
// NOTE(review): not read in the part of the file reviewed here.
software_updater: SoftwareUpdaterHandle,
/// Comms node; its connectivity handle is used to list the active connections.
comms: CommsNode,
// NOTE(review): not read in the part of the file reviewed here.
liveness: LivenessHandle,
/// When `true`, error details are returned to clients; when `false`, they are replaced by a generic message
/// (see `obscure_error_if_true`).
report_grpc_error: bool,
/// Source of the failed-checkpoint flag and the peer liveness check results reported by `get_network_state`.
tari_pulse: TariPulseHandle,
/// Node configuration; consulted by `is_method_enabled` to decide which gRPC methods may be called.
config: BaseNodeConfig,
/// Cache of estimated hash rates and new block templates, looked up by the best block hash.
data_cache: DataCache,
}
impl BaseNodeGrpcServer {
    /// Creates a server from the handles exposed by `ctx`, paired with the supplied `config` and a fresh
    /// `DataCache`.
    pub fn from_base_node_context(ctx: &BaseNodeContext, config: BaseNodeConfig) -> Self {
        let node_service = ctx.local_node();
        let mempool_service = ctx.local_mempool();
        let network = ctx.network().into();
        let state_machine_handle = ctx.state_machine();
        let consensus_rules = ctx.consensus_rules().clone();
        let software_updater = ctx.software_updater();
        let comms = ctx.base_node_comms().clone();
        let liveness = ctx.liveness();
        let report_grpc_error = ctx.get_report_grpc_error();
        let tari_pulse = ctx.tari_pulse();
        Self {
            node_service,
            mempool_service,
            network,
            state_machine_handle,
            consensus_rules,
            software_updater,
            comms,
            liveness,
            report_grpc_error,
            tari_pulse,
            config,
            data_cache: DataCache::new(),
        }
    }

    /// Returns whether detailed error messages may be reported to gRPC clients.
    pub fn report_error_flag(&self) -> bool {
        self.report_grpc_error
    }

    /// Decides whether `grpc_method` may be served.
    ///
    /// A method is enabled when it belongs to the mining set and mining is enabled, when it belongs to the
    /// second-layer set and second-layer gRPC is enabled, or when it is listed explicitly in
    /// `grpc_server_allow_methods`.
    fn is_method_enabled(&self, grpc_method: GrpcMethod) -> bool {
        // Methods unlocked as a group by `mining_enabled`.
        const MINING_METHODS: &[GrpcMethod] = &[
            GrpcMethod::GetVersion,
            GrpcMethod::GetNewBlockTemplate,
            GrpcMethod::GetNewBlockWithCoinbases,
            GrpcMethod::GetNewBlockTemplateWithCoinbases,
            GrpcMethod::GetNewBlock,
            GrpcMethod::GetNewBlockBlob,
            GrpcMethod::GetNetworkDifficulty,
            GrpcMethod::SubmitBlock,
            GrpcMethod::SubmitBlockBlob,
            GrpcMethod::GetTipInfo,
            GrpcMethod::Identify,
            GrpcMethod::GetSyncProgress,
        ];
        // Methods unlocked as a group by `second_layer_grpc_enabled`.
        const SECOND_LAYER_METHODS: &[GrpcMethod] = &[
            GrpcMethod::GetVersion,
            GrpcMethod::GetConstants,
            GrpcMethod::GetMempoolTransactions,
            GrpcMethod::GetMempoolStats,
            GrpcMethod::ListHeaders,
            GrpcMethod::GetTipInfo,
            GrpcMethod::GetActiveValidatorNodes,
            GrpcMethod::GetValidatorNodeChanges,
            GrpcMethod::GetShardKey,
            GrpcMethod::GetTemplateRegistrations,
            GrpcMethod::GetHeaderByHash,
            GrpcMethod::GetSideChainUtxos,
        ];
        let cfg = &self.config;
        (cfg.mining_enabled && MINING_METHODS.contains(&grpc_method)) ||
            (cfg.second_layer_grpc_enabled && SECOND_LAYER_METHODS.contains(&grpc_method)) ||
            cfg.grpc_server_allow_methods.contains(&grpc_method)
    }

    /// Guard used at the top of the gRPC handlers.
    ///
    /// # Errors
    /// Logs a warning and returns a `permission_denied` status when `method` is not enabled.
    #[allow(clippy::result_large_err)]
    fn check_method_enabled(&self, method: GrpcMethod) -> Result<(), Status> {
        if self.is_method_enabled(method) {
            return Ok(());
        }
        warn!(
            target: LOG_TARGET,
            "`{method}` method called but it is not allowed. Allow it in the config file or start the node with a \
             different set of CLI options"
        );
        Err(Status::permission_denied(format!(
            "`{method}` method not made available"
        )))
    }
}
pub fn obscure_error_if_true(report: bool, status: Status) -> Status {
if report {
status
} else {
warn!(target: LOG_TARGET, "Obscured status error: {status}");
Status::new(status.code(), "Error has occurred. Details are obscured.")
}
}
/// Resolves the `(start, end)` height pair for a `HeightRequest` by forwarding its `start_height`, `end_height`
/// and `from_tip` fields to `block_heights`.
///
/// # Errors
/// Propagates whatever `Status` `block_heights` returns.
pub async fn get_heights(
    request: &tari_rpc::HeightRequest,
    handler: LocalNodeCommsInterface,
) -> Result<(u64, u64), Status> {
    let (start_height, end_height, from_tip) = (request.start_height, request.end_height, request.from_tip);
    block_heights(handler, start_height, end_height, from_tip).await
}
// NOTE(review): this inherent impl block is empty and adds no items; it looks like a leftover that could be removed.
impl BaseNodeGrpcServer {}
#[tonic::async_trait]
impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
// Associated stream types of the `BaseNode` service. Every streaming method hands back the receiving half of a
// `futures` mpsc channel whose items are `Result<response, Status>`.
type FetchMatchingUtxosStream = mpsc::Receiver<Result<tari_rpc::FetchMatchingUtxosResponse, Status>>;
type GetActiveValidatorNodesStream = mpsc::Receiver<Result<tari_rpc::GetActiveValidatorNodesResponse, Status>>;
type GetBlocksStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
type GetSideChainUtxosStream = mpsc::Receiver<Result<tari_rpc::GetSideChainUtxosResponse, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::GetTemplateRegistrationResponse, Status>>;
type GetTokensInCirculationStream = mpsc::Receiver<Result<tari_rpc::ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeaderResponse, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
type SearchPaymentReferencesStream = mpsc::Receiver<Result<tari_rpc::PaymentReferenceResponse, Status>>;
type SearchPaymentReferencesViaOutputHashStream =
mpsc::Receiver<Result<tari_rpc::PaymentReferenceResponse, Status>>;
type SearchUtxosStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
#[allow(clippy::too_many_lines)]
async fn get_network_difficulty(
&self,
request: Request<tari_rpc::HeightRequest>,
) -> Result<Response<Self::GetNetworkDifficultyStream>, Status> {
self.check_method_enabled(GrpcMethod::GetNetworkDifficulty)?;
let report_error_flag = self.report_error_flag();
let request = request.into_inner();
trace!(
target: LOG_TARGET,
"Incoming GRPC request for GetNetworkDifficulty: from_tip: {:?} start_height: {:?} end_height: {:?}",
request.from_tip,
request.start_height,
request.end_height
);
let mut handler = self.node_service.clone();
let (start_height, end_height) = get_heights(&request, handler.clone())
.await
.map_err(|e| obscure_error_if_true(report_error_flag, e))?;
let num_requested = end_height.checked_sub(start_height).ok_or_else(|| {
obscure_error_if_true(
report_error_flag,
Status::invalid_argument("Start height is more than end height"),
)
})?;
if num_requested > GET_DIFFICULTY_MAX_HEIGHTS {
return Err(obscure_error_if_true(
report_error_flag,
Status::invalid_argument(format!(
"Number of headers requested exceeds maximum. Expected less than {GET_DIFFICULTY_MAX_HEIGHTS} but \
got {num_requested}"
)),
));
}
let (mut tx, rx) = mpsc::channel(cmp::min(
usize::try_from(num_requested).map_err(|e| {
obscure_error_if_true(
report_error_flag,
Status::internal(format!("Error converting u64 to usize '{e}'")),
)
})?,
GET_DIFFICULTY_PAGE_SIZE,
));
let mut sha3x_hash_rate_moving_average =
HashRateMovingAverage::new(PowAlgorithm::Sha3x, self.consensus_rules.clone(), false);
let mut monero_randomx_hash_rate_moving_average =
HashRateMovingAverage::new(PowAlgorithm::RandomXM, self.consensus_rules.clone(), false);
let mut tari_randomx_hash_rate_moving_average =
HashRateMovingAverage::new(PowAlgorithm::RandomXT, self.consensus_rules.clone(), false);
let mut cuckaroo_hash_rate_moving_average =
HashRateMovingAverage::new(PowAlgorithm::Cuckaroo, self.consensus_rules.clone(), true);
let page_iter =
NonOverlappingIntegerPairIter::new(start_height, end_height.saturating_add(1), GET_DIFFICULTY_PAGE_SIZE)
.map_err(|e| obscure_error_if_true(report_error_flag, Status::invalid_argument(e)))?;
debug!(target: LOG_TARGET, "Starting GetNetworkDifficulty request from {start_height} to {end_height}");
task::spawn(async move {
for (start, end) in page_iter {
let headers = match handler.get_headers(start..=end).await {
Ok(headers) => headers,
Err(err) => {
warn!(target: LOG_TARGET, "Base node service error: {err:?}");
let _ = tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal("Internal error when fetching blocks"),
)))
.await;
return;
},
};
if headers.is_empty() {
let _network_difficulty_response = tx.send(Err(obscure_error_if_true(
report_error_flag,
Status::invalid_argument(format!("No blocks found within range {start} - {end}")),
)));
return;
}
for chain_header in &headers {
let current_difficulty = chain_header.accumulated_data().target_difficulty;
let current_timestamp = chain_header.header().timestamp;
let current_height = chain_header.header().height;
let pow_algo = chain_header.header().pow.pow_algo;
let current_hash_rate_moving_average = match pow_algo {
PowAlgorithm::RandomXM => &mut monero_randomx_hash_rate_moving_average,
PowAlgorithm::RandomXT => &mut tari_randomx_hash_rate_moving_average,
PowAlgorithm::Sha3x => &mut sha3x_hash_rate_moving_average,
PowAlgorithm::Cuckaroo => &mut cuckaroo_hash_rate_moving_average,
};
current_hash_rate_moving_average.add(current_height, current_difficulty);
let sha3x_estimated_hash_rate = sha3x_hash_rate_moving_average.average();
let monero_randomx_estimated_hash_rate = monero_randomx_hash_rate_moving_average.average();
let tari_randomx_estimated_hash_rate = tari_randomx_hash_rate_moving_average.average();
let cuckaroo_estimated_hash_rate = cuckaroo_hash_rate_moving_average.average();
let estimated_hash_rate = sha3x_estimated_hash_rate
.saturating_add(monero_randomx_estimated_hash_rate)
.saturating_add(tari_randomx_estimated_hash_rate)
.saturating_add(cuckaroo_estimated_hash_rate);
let block = match handler.get_block(current_height, true).await {
Ok(block) => block,
Err(err) => {
warn!(target: LOG_TARGET, "Base node service error: {err:?}");
let _network_difficulty_response = tx.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(format!("Error fetching block at height {current_height}")),
)));
return;
},
};
if block.is_none() {
let _network_difficulty_response = tx.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(format!("Block not found at height {current_height}")),
)));
return;
}
let block = block.unwrap();
let coinbases = block.block().body.get_coinbase_outputs();
let cuckaroo_estimated_hash_rate_decimal = cuckaroo_hash_rate_moving_average.u_decimal_average();
trace!(
target: LOG_TARGET,
"Difficulties: #{}, sha3: {}, RmXM: {}, RmXT: {}, C29: {}",
current_height,
sha3x_estimated_hash_rate,
monero_randomx_estimated_hash_rate,
tari_randomx_estimated_hash_rate,
display_u_decimal_value(&cuckaroo_estimated_hash_rate_decimal),
);
let difficulty = tari_rpc::NetworkDifficultyResponse {
difficulty: current_difficulty.as_u64(),
estimated_hash_rate,
sha3x_estimated_hash_rate,
tari_randomx_estimated_hash_rate,
monero_randomx_estimated_hash_rate,
cuckaroo_estimated_hash_rate: Some(cuckaroo_estimated_hash_rate_decimal),
height: current_height,
timestamp: current_timestamp.as_u64(),
pow_algo: pow_algo.as_u64(),
num_coinbases: coinbases.len() as u64,
coinbase_extras: coinbases.iter().map(|c| c.features.coinbase_extra.to_vec()).collect(),
};
if let Err(err) = tx.send(Ok(difficulty)).await {
warn!(target: LOG_TARGET, "Error sending difficulties via GRPC: {err}");
return;
}
}
}
});
trace!(
target: LOG_TARGET,
"Sending GetNetworkDifficulty response stream to client"
);
Ok(Response::new(rx))
}
/// Builds a snapshot of the node's view of the network: chain metadata, block reward, an estimated hash rate per
/// permitted PoW algorithm, the state machine status, the number of active connections and the liveness check
/// results.
///
/// Hash rates are looked up in `data_cache` by the best block hash. On a miss the rate is derived as
/// `target difficulty / target block interval` and stored in the cache. If the target difficulty cannot be
/// obtained, a warning is logged and `Difficulty::min()` is used (the derived value is still cached).
///
/// NOTE(review): unlike the other handlers visible in this file, this one does not call `check_method_enabled` -
/// confirm that this is intentional.
///
/// # Errors
/// Returns an `internal` status when the chain metadata or the active connections cannot be retrieved.
#[allow(clippy::too_many_lines)]
async fn get_network_state(
    &self,
    _request: Request<tari_rpc::GetNetworkStateRequest>,
) -> Result<Response<tari_rpc::GetNetworkStateResponse>, Status> {
    trace!(target: LOG_TARGET, "Incoming GRPC request for get network hash rate");
    let report_error_flag = self.report_error_flag();
    let mut handler = self.node_service.clone();
    let metadata = match handler.get_metadata().await {
        Ok(metadata) => metadata,
        Err(e) => {
            warn!(
                target: LOG_TARGET,
                "Could not get node tip: {e}"
            );
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };
    let tip_height = metadata.best_block_height();
    let reward = self.consensus_rules.get_block_reward_at(tip_height).as_u64();
    let constants = self.consensus_rules.consensus_constants(tip_height);
    let enabled_algos = constants.current_permitted_pow_algos();

    // SHA3x
    let sha3x_estimated_hash_rate = if !enabled_algos.contains(&PowAlgorithm::Sha3x) {
        0
    } else if let Some(cached) = self
        .data_cache
        .get_sha3x_estimated_hash_rate(metadata.best_block_hash())
        .await
    {
        cached
    } else {
        let target_difficulty = match handler.get_target_difficulty_for_next_block(PowAlgorithm::Sha3x).await {
            Ok(difficulty) => difficulty,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Could not get target difficulty for Sha3x: {e}"
                );
                Difficulty::min()
            },
        };
        let target_time = constants.pow_target_block_interval(PowAlgorithm::Sha3x);
        let fresh = target_difficulty.as_u64().checked_div(target_time).unwrap_or(0);
        self.data_cache
            .set_sha3x_estimated_hash_rate(fresh, *metadata.best_block_hash())
            .await;
        fresh
    };

    // Monero RandomX
    let monero_randomx_estimated_hash_rate = if !enabled_algos.contains(&PowAlgorithm::RandomXM) {
        0
    } else if let Some(cached) = self
        .data_cache
        .get_monero_randomx_estimated_hash_rate(metadata.best_block_hash())
        .await
    {
        cached
    } else {
        let target_difficulty = match handler
            .get_target_difficulty_for_next_block(PowAlgorithm::RandomXM)
            .await
        {
            Ok(difficulty) => difficulty,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Could not get target difficulty for Monero RandomX: {e}"
                );
                Difficulty::min()
            },
        };
        let target_time = constants.pow_target_block_interval(PowAlgorithm::RandomXM);
        let fresh = target_difficulty.as_u64().checked_div(target_time).unwrap_or(0);
        self.data_cache
            .set_monero_randomx_estimated_hash_rate(fresh, *metadata.best_block_hash())
            .await;
        fresh
    };

    // Tari RandomX
    let tari_randomx_estimated_hash_rate = if !enabled_algos.contains(&PowAlgorithm::RandomXT) {
        0
    } else if let Some(cached) = self
        .data_cache
        .get_tari_randomx_estimated_hash_rate(metadata.best_block_hash())
        .await
    {
        cached
    } else {
        let target_difficulty = match handler
            .get_target_difficulty_for_next_block(PowAlgorithm::RandomXT)
            .await
        {
            Ok(difficulty) => difficulty,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Could not get target difficulty for Tari RandomX: {e}"
                );
                Difficulty::min()
            },
        };
        let target_time = constants.pow_target_block_interval(PowAlgorithm::RandomXT);
        let fresh = target_difficulty.as_u64().checked_div(target_time).unwrap_or(0);
        self.data_cache
            .set_tari_randomx_estimated_hash_rate(fresh, *metadata.best_block_hash())
            .await;
        fresh
    };

    // Cuckaroo: the rate is scaled by NANOS_PER_UNIT before the division and reported as a decimal value.
    let cuckaroo_estimated_hash_rate = if !enabled_algos.contains(&PowAlgorithm::Cuckaroo) {
        tari_rpc::UDecimalValue { units: 0, nanos: 0 }
    } else if let Some(cached) = self
        .data_cache
        .get_cuckaroo_estimated_hash_rate(metadata.best_block_hash())
        .await
    {
        cached
    } else {
        let target_difficulty = match handler
            .get_target_difficulty_for_next_block(PowAlgorithm::Cuckaroo)
            .await
        {
            Ok(difficulty) => difficulty,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Could not get target difficulty for Cuckaroo: {e}"
                );
                Difficulty::min()
            },
        };
        let target_time = constants.pow_target_block_interval(PowAlgorithm::Cuckaroo);
        let scaled_rate = target_difficulty
            .as_u64()
            .saturating_mul(NANOS_PER_UNIT)
            .checked_div(target_time)
            .unwrap_or(0);
        let fresh = HashRateMovingAverage::average_as_u_decimal(scaled_rate);
        self.data_cache
            .set_cuckaroo_estimated_hash_rate(fresh, *metadata.best_block_hash())
            .await;
        fresh
    };

    let failed_checkpoints = *self.tari_pulse.get_failed_checkpoints_notifier();
    let status_watch = self.state_machine_handle.get_status_info_watch();
    // The borrowed status is confined to this block so it is released before the next `.await`.
    let (state, network_silence, initial_sync_achieved) = {
        let status = status_watch.borrow();
        let state: tari_rpc::BaseNodeState = (&status.state_info).into();
        let network_silence = matches!(&status.state_info, StateInfo::Listening(info) if info.is_network_silence());
        (state, network_silence, status.bootstrapped)
    };
    let mut connectivity = self.comms.connectivity();
    let connected_peers = connectivity
        .get_active_connections()
        .await
        .map_err(|err| obscure_error_if_true(report_error_flag, Status::internal(err.to_string())))?;
    let liveness_results = (*self.tari_pulse.get_liveness_checks()).clone();
    // A missing latency, or one that does not fit into a u64 of milliseconds, is reported as u64::MAX.
    let liveness: Vec<_> = liveness_results
        .into_iter()
        .map(|data| tari_rpc::LivenessResult {
            peer_node_id: data.peer.to_string().into_bytes(),
            discover_latency: data
                .discovery_latency
                .map_or(u64::MAX, |v| u64::try_from(v.as_millis()).unwrap_or(u64::MAX)),
            ping_latency: data
                .ping_latency
                .map_or(u64::MAX, |v| u64::try_from(v.as_millis()).unwrap_or(u64::MAX)),
        })
        .collect();
    trace!(
        target: LOG_TARGET,
        "Difficulties: #{}, sha3: {}, RmXM: {}, RmXT: {}, C29: {}",
        tip_height,
        sha3x_estimated_hash_rate,
        monero_randomx_estimated_hash_rate,
        tari_randomx_estimated_hash_rate,
        display_u_decimal_value(&cuckaroo_estimated_hash_rate),
    );
    let readiness_status = ReadinessStatus {
        status: Some(ReadinessStatusEnum::State(ReadinessState::Ready.into())),
        timestamp: Utc::now().timestamp_millis() as u64,
    };
    let response = tari_rpc::GetNetworkStateResponse {
        metadata: Some(metadata.into()),
        initial_sync_achieved,
        base_node_state: state.into(),
        network_silence,
        failed_checkpoints,
        reward,
        sha3x_estimated_hash_rate,
        monero_randomx_estimated_hash_rate,
        tari_randomx_estimated_hash_rate,
        cuckaroo_estimated_hash_rate: Some(cuckaroo_estimated_hash_rate),
        num_connections: connected_peers.len() as u64,
        liveness_results: liveness,
        readiness_status: Some(readiness_status),
    };
    trace!(target: LOG_TARGET, "Sending GetNetworkState response to client");
    Ok(Response::new(response))
}
async fn get_mempool_transactions(
&self,
request: Request<tari_rpc::GetMempoolTransactionsRequest>,
) -> Result<Response<Self::GetMempoolTransactionsStream>, Status> {
self.check_method_enabled(GrpcMethod::GetMempoolTransactions)?;
let report_error_flag = self.report_error_flag();
let _request = request.into_inner();
trace!(target: LOG_TARGET, "Incoming GRPC request for GetMempoolTransactions",);
let mut mempool = self.mempool_service.clone();
let (mut tx, rx) = mpsc::channel(1000);
task::spawn(async move {
let transactions = match mempool.get_mempool_state().await {
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {err}");
return;
},
Ok(data) => data,
};
for transaction in transactions.unconfirmed_pool {
let transaction = match tari_rpc::Transaction::try_from(transaction) {
Ok(t) => t,
Err(e) => {
if tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(format!("Error converting transaction: {e}")),
)))
.await
.is_err()
{
warn!(
target: LOG_TARGET,
"[get_mempool_transactions] GRPC request cancelled while sending response"
);
}
return;
},
};
if tx
.send(Ok(tari_rpc::GetMempoolTransactionsResponse {
transaction: Some(transaction),
}))
.await
.is_err()
{
warn!(target: LOG_TARGET, "GRPC request cancelled while sending response");
}
}
});
trace!(target: LOG_TARGET, "Sending GetMempool response stream to client");
Ok(Response::new(rx))
}
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::too_many_lines)]
async fn list_headers(
&self,
request: Request<tari_rpc::ListHeadersRequest>,
) -> Result<Response<Self::ListHeadersStream>, Status> {
self.check_method_enabled(GrpcMethod::ListHeaders)?;
let report_error_flag = self.report_error_flag();
let request = request.into_inner();
trace!(
target: LOG_TARGET,
"Incoming GRPC request for ListHeaders: from_height: {}, num_headers:{}, sorting:{}",
request.from_height,
request.num_headers,
request.sorting
);
let mut handler = self.node_service.clone();
let tip = match handler.get_metadata().await {
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {err}");
return Err(obscure_error_if_true(
report_error_flag,
Status::internal(err.to_string()),
));
},
Ok(data) => data.best_block_height(),
};
let sorting: Sorting = request.sorting();
let num_headers = match request.num_headers {
0 => LIST_HEADERS_DEFAULT_NUM_HEADERS,
_ => request.num_headers,
};
let num_headers = cmp::min(num_headers, LIST_HEADERS_MAX_NUM_HEADERS);
let (mut tx, rx) = mpsc::channel(LIST_HEADERS_PAGE_SIZE);
let from_height = cmp::min(request.from_height, tip);
let (header_range, is_reversed) = if from_height == 0 {
match sorting {
Sorting::Desc => {
let from = match tip.overflowing_sub(num_headers) {
(_, true) => 0,
(res, false) => res + 1,
};
(from..=tip, true)
},
Sorting::Asc => (0..=num_headers.saturating_sub(1), false),
}
} else {
match sorting {
Sorting::Desc => {
let from = match from_height.overflowing_sub(num_headers) {
(_, true) => 0,
(res, false) => res + 1,
};
(from..=from_height, true)
},
Sorting::Asc => {
let to = from_height.saturating_add(num_headers).saturating_sub(1);
(from_height..=to, false)
},
}
};
let consensus_rules = self.consensus_rules.clone();
let page_iter = NonOverlappingIntegerPairIter::new(
*header_range.start(),
header_range.end().saturating_add(1),
LIST_HEADERS_PAGE_SIZE,
)
.map_err(|e| obscure_error_if_true(report_error_flag, Status::invalid_argument(e)))?;
task::spawn(async move {
trace!(
target: LOG_TARGET,
"Starting base node request {}-{}",
header_range.start(),
header_range.end()
);
let page_iter = if is_reversed {
Either::Left(page_iter.rev())
} else {
Either::Right(page_iter)
};
for (start, end) in page_iter {
trace!(target: LOG_TARGET, "Page: {start}-{end}");
let result_data = match handler.get_blocks(start..=end, true).await {
Err(err) => {
warn!(target: LOG_TARGET, "Internal base node service error: {err}");
return;
},
Ok(data) => {
if is_reversed {
data.into_iter()
.map(|chain_block| {
let (block, acc_data, confirmations) = chain_block.dissolve();
match consensus_rules
.calculate_coinbase_and_fees(block.header.height, block.body.kernels())
{
Ok(total_block_reward) => Ok(tari_rpc::BlockHeaderResponse {
difficulty: acc_data.achieved_difficulty.into(),
num_transactions: block.body.kernels().len() as u32,
confirmations,
header: Some(block.header.into()),
reward: total_block_reward.into(),
}),
Err(e) => {
Err(obscure_error_if_true(report_error_flag, Status::internal(e))
.to_string())
},
}
})
.rev()
.collect::<Result<Vec<_>, String>>()
} else {
data.into_iter()
.map(|chain_block| {
let (block, acc_data, confirmations) = chain_block.dissolve();
match consensus_rules
.calculate_coinbase_and_fees(block.header.height, block.body.kernels())
{
Ok(total_block_reward) => Ok(tari_rpc::BlockHeaderResponse {
difficulty: acc_data.achieved_difficulty.into(),
num_transactions: block.body.kernels().len() as u32,
confirmations,
header: Some(block.header.into()),
reward: total_block_reward.into(),
}),
Err(e) => {
Err(obscure_error_if_true(report_error_flag, Status::internal(e))
.to_string())
},
}
})
.collect::<Result<Vec<_>, String>>()
}
},
};
match result_data {
Err(e) => {
error!(target: LOG_TARGET, "No result headers transmitted due to error: {e}")
},
Ok(result_data) => {
let result_size = result_data.len();
trace!(target: LOG_TARGET, "Result headers: {result_size}");
for response in result_data {
trace!(
target: LOG_TARGET,
"Sending block header: {}",
response.header.as_ref().map( | h| h.height).unwrap_or(0)
);
if tx.send(Ok(response)).await.is_err() {
warn!(
target: LOG_TARGET,
"[list_headers] GRPC request cancelled while sending response"
);
return;
}
}
},
}
}
});
trace!(target: LOG_TARGET, "Sending ListHeaders response stream to client");
Ok(Response::new(rx))
}
/// Returns a new block template, plus the matching miner data, for the PoW algorithm named in the request.
///
/// Templates are looked up in `data_cache` by the current best block hash, one slot per algorithm. On a miss a
/// template is requested from the node service with the request's `max_weight` and stored in the cache.
///
/// # Errors
/// Returns `permission_denied` when the method is disabled, `invalid_argument` when the PoW algorithm is missing
/// or invalid, and `internal` when the chain metadata or the template cannot be obtained or converted.
#[allow(clippy::too_many_lines)]
async fn get_new_block_template(
    &self,
    request: Request<tari_rpc::NewBlockTemplateRequest>,
) -> Result<Response<tari_rpc::NewBlockTemplateResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetNewBlockTemplate)?;
    let report_error_flag = self.report_error_flag();
    let request = request.into_inner();
    trace!(target: LOG_TARGET, "Incoming GRPC request for get new block template");
    trace!(target: LOG_TARGET, "Request {:?}", request);
    let Some(requested_algo) = request.algo else {
        return Err(obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument("PoW algo not provided"),
        ));
    };
    let raw_algo = u64::try_from(requested_algo.pow_algo).map_err(|e| {
        obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument(format!("Invalid PoW algo '{e}'")),
        )
    })?;
    let algo = PowAlgorithm::try_from(raw_algo).map_err(|e| {
        obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument(format!("Invalid PoW algo '{e}'")),
        )
    })?;
    let mut handler = self.node_service.clone();
    let metadata = match handler.get_metadata().await {
        Ok(metadata) => metadata,
        Err(e) => {
            warn!(
                target: LOG_TARGET,
                "Could not get node tip: {e}"
            );
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };
    // Each algorithm has its own cache slot, valid for the current best block hash.
    let cached_template = match algo {
        PowAlgorithm::Sha3x => {
            self.data_cache
                .get_sha3x_new_block_template(metadata.best_block_hash())
                .await
        },
        PowAlgorithm::RandomXM => {
            self.data_cache
                .get_monero_randomx_new_block_template(metadata.best_block_hash())
                .await
        },
        PowAlgorithm::RandomXT => {
            self.data_cache
                .get_tari_randomx_new_block_template(metadata.best_block_hash())
                .await
        },
        PowAlgorithm::Cuckaroo => {
            self.data_cache
                .get_cuckaroo_new_block_template(metadata.best_block_hash())
                .await
        },
    };
    let new_template = match cached_template {
        Some(template) => template,
        None => {
            let fresh_template = match handler.get_new_block_template(algo, request.max_weight).await {
                Ok(template) => template,
                Err(e) => {
                    warn!(
                        target: LOG_TARGET,
                        "Could not get new block template: {e}"
                    );
                    return Err(obscure_error_if_true(
                        report_error_flag,
                        Status::internal(e.to_string()),
                    ));
                },
            };
            match algo {
                PowAlgorithm::Sha3x => {
                    self.data_cache
                        .set_sha3x_new_block_template(fresh_template.clone(), *metadata.best_block_hash())
                        .await;
                },
                PowAlgorithm::RandomXM => {
                    self.data_cache
                        .set_monero_randomx_new_block_template(fresh_template.clone(), *metadata.best_block_hash())
                        .await;
                },
                PowAlgorithm::RandomXT => {
                    self.data_cache
                        .set_tari_randomx_new_block_template(fresh_template.clone(), *metadata.best_block_hash())
                        .await;
                },
                PowAlgorithm::Cuckaroo => {
                    self.data_cache
                        .set_cuckaroo_new_block_template(fresh_template.clone(), *metadata.best_block_hash())
                        .await;
                },
            }
            fresh_template
        },
    };
    let status_watch = self.state_machine_handle.get_status_info_watch();
    let pow = algo as i32;
    // Miner data is read first because the conversion below consumes the template.
    let miner_data = tari_rpc::MinerData {
        reward: new_template.reward.into(),
        target_difficulty: new_template.target_difficulty.as_u64(),
        total_fees: new_template.total_fees.into(),
        algo: Some(tari_rpc::PowAlgo { pow_algo: pow }),
    };
    let grpc_template = new_template
        .try_into()
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e)))?;
    let response = tari_rpc::NewBlockTemplateResponse {
        miner_data: Some(miner_data),
        new_block_template: Some(grpc_template),
        initial_sync_achieved: status_watch.borrow().bootstrapped,
    };
    trace!(target: LOG_TARGET, "Sending GetNewBlockTemplate response to client");
    Ok(Response::new(response))
}
/// Turns a client-supplied block template into a block and returns it with the data a miner needs: the block
/// hash, the hash to mine on, the genesis block hash, miner data and the RandomX VM key.
///
/// # Errors
/// * `permission_denied` when the method is disabled.
/// * `invalid_argument` when the template is malformed, when the node service reports invalid arguments, when the
///   block fees are invalid, or when fetching the genesis header fails.
/// * `failed_precondition` (with the details `CannotCalculateNonTipMmr`) when the node service reports
///   `CannotCalculateNonTipMmr`.
/// * `not_found` when the genesis header or the VM key header is missing.
/// * `internal` for any other failure.
#[allow(clippy::too_many_lines)]
async fn get_new_block(
    &self,
    request: Request<tari_rpc::NewBlockTemplate>,
) -> Result<Response<tari_rpc::GetNewBlockResult>, Status> {
    self.check_method_enabled(GrpcMethod::GetNewBlock)?;
    let report_error_flag = self.report_error_flag();
    let request = request.into_inner();
    trace!(target: LOG_TARGET, "Incoming GRPC request for get new block");
    let block_template: NewBlockTemplate = match request.try_into() {
        Ok(template) => template,
        Err(s) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::invalid_argument(format!("Malformed block template provided: {s}")),
            ));
        },
    };
    let algo = block_template.header.pow.pow_algo;
    let mut handler = self.node_service.clone();
    let new_block = handler.get_new_block(block_template).await.map_err(|err| {
        let status = match err {
            CommsInterfaceError::ChainStorageError(ChainStorageError::InvalidArguments { message, .. }) => {
                Status::invalid_argument(message)
            },
            CommsInterfaceError::ChainStorageError(ChainStorageError::CannotCalculateNonTipMmr(msg)) => {
                Status::with_details(
                    tonic::Code::FailedPrecondition,
                    msg,
                    Bytes::from_static(b"CannotCalculateNonTipMmr"),
                )
            },
            e => Status::internal(e.to_string()),
        };
        obscure_error_if_true(report_error_flag, status)
    })?;
    let fees = match new_block.body.get_total_fee() {
        Ok(fees) => fees,
        Err(_) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::invalid_argument("Invalid fees in block".to_string()),
            ));
        },
    };
    let genesis_header = match handler.get_header(0).await {
        Ok(Some(header)) => header,
        Ok(None) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::not_found("Tari genesis block not found".to_string()),
            ));
        },
        Err(_) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::invalid_argument("Tari genesis block not found".to_string()),
            ));
        },
    };
    let gen_hash = genesis_header.hash().to_vec();
    let block_hash = new_block.hash().to_vec();
    // Only Monero RandomX blocks are mined on the merge mining hash; all other algorithms use the mining hash.
    let mining_hash = match new_block.header.pow.pow_algo {
        PowAlgorithm::RandomXM => new_block.header.merge_mining_hash().to_vec(),
        PowAlgorithm::Sha3x | PowAlgorithm::RandomXT | PowAlgorithm::Cuckaroo => {
            new_block.header.mining_hash().to_vec()
        },
    };
    let grpc_block: tari_rpc::Block = new_block
        .try_into()
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e)))?;
    let block = Some(grpc_block);
    // A fresh template supplies the reward, the target difficulty and the height used for the VM key below.
    let new_template = match handler.get_new_block_template(algo, 0).await {
        Ok(template) => template,
        Err(e) => {
            warn!(
                target: LOG_TARGET,
                "Could not get new block template: {e}"
            );
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };
    let pow = algo as i32;
    let miner_data = tari_rpc::MinerData {
        reward: new_template.reward.into(),
        target_difficulty: new_template.target_difficulty.as_u64(),
        total_fees: fees.as_u64(),
        algo: Some(tari_rpc::PowAlgo { pow_algo: pow }),
    };
    let vm_key_header = match handler
        .get_header(tari_rx_vm_key_height(new_template.header.height))
        .await
    {
        Ok(Some(header)) => header,
        Ok(None) | Err(_) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::not_found("Tari block not found ".to_string()),
            ));
        },
    };
    let vm_key = *vm_key_header.hash();
    let response = tari_rpc::GetNewBlockResult {
        block_hash,
        block,
        merge_mining_hash: mining_hash,
        tari_unique_id: gen_hash,
        miner_data: Some(miner_data),
        vm_key: vm_key.to_vec(),
    };
    trace!(target: LOG_TARGET, "Sending GetNewBlock response to client");
    Ok(Response::new(response))
}
/// Builds a new block for the requested PoW algorithm, splitting the block reward between the requested
/// coinbases.
///
/// The `value` of every requested coinbase is interpreted as a *share*. The reward calculated by the consensus
/// rules for the fresh template is divided proportionally to those shares using cumulative integer division, so
/// the individual amounts always add up to the full reward. A single coinbase with a share of `0` is treated as
/// a share of `1`. One coinbase output is added per recipient, and a single kernel carrying the summed excess
/// and the summed partial signatures is added for all of them.
///
/// # Errors
/// - `permission_denied` if the method is not enabled.
/// - `invalid_argument` if the PoW algorithm is missing or invalid, or if the node rejects the template with
///   `ChainStorageError::InvalidArguments`.
/// - `failed_precondition` (details `CannotCalculateNonTipMmr`) if the node cannot calculate a non-tip MMR.
/// - `not_found` if the header at height 0 or the RandomX VM key header is missing.
/// - `internal` for all remaining failures (too many coinbases, zero total shares, key manager, coinbase
///   generation, kernel construction and conversion errors).
///
/// All statuses except the permission check are passed through `obscure_error_if_true`.
#[allow(clippy::too_many_lines)]
async fn get_new_block_template_with_coinbases(
    &self,
    request: Request<tari_rpc::GetNewBlockTemplateWithCoinbasesRequest>,
) -> Result<Response<tari_rpc::GetNewBlockResult>, Status> {
    if !self.is_method_enabled(GrpcMethod::GetNewBlockTemplateWithCoinbases) {
        return Err(Status::permission_denied(
            "`GetNewBlockTemplateWithCoinbases` method not made available",
        ));
    }
    let report_error_flag = self.report_error_flag();
    let request = request.into_inner();
    let shares = request
        .coinbases
        .iter()
        .map(|c| c.value.to_string())
        .collect::<Vec<_>>()
        .join(", ");
    debug!(target: LOG_TARGET, "Incoming GRPC request for get new block template with coinbases: {shares}");

    // The algorithm arrives as a raw protobuf integer; validate it in two steps (sign, then known variant).
    let algo = request
        .algo
        .map(|algo| u64::try_from(algo.pow_algo))
        .ok_or_else(|| obscure_error_if_true(report_error_flag, Status::invalid_argument("PoW algo not provided")))?
        .map_err(|e| {
            obscure_error_if_true(
                report_error_flag,
                Status::invalid_argument(format!("Invalid PoW algo '{e}'")),
            )
        })?;
    let algo = PowAlgorithm::try_from(algo).map_err(|e| {
        obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument(format!("Invalid PoW algo '{e}'")),
        )
    })?;

    let mut handler = self.node_service.clone();
    let meta = handler
        .get_metadata()
        .await
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;

    // A requested weight of 0, or anything above the consensus maximum, falls back to the consensus maximum.
    let constants_weight = self
        .consensus_rules
        .consensus_constants(meta.best_block_height().saturating_add(1))
        .max_block_transaction_weight();
    let asking_weight = if request.max_weight > constants_weight || request.max_weight == 0 {
        constants_weight
    } else {
        request.max_weight
    };

    let mut new_template = handler.get_new_block_template(algo, asking_weight).await.map_err(|e| {
        warn!(
            target: LOG_TARGET,
            "Could not get new block template: {e}"
        );
        obscure_error_if_true(report_error_flag, Status::internal(e.to_string()))
    })?;

    let pow = algo as i32;
    let miner_data = tari_rpc::MinerData {
        reward: new_template.reward.into(),
        target_difficulty: new_template.target_difficulty.as_u64(),
        total_fees: new_template.total_fees.into(),
        algo: Some(tari_rpc::PowAlgo { pow_algo: pow }),
    };

    let mut coinbases: Vec<tari_rpc::NewBlockCoinbase> = request.coinbases;
    if coinbases.len() as u64 >
        self.consensus_rules
            .consensus_constants(meta.best_block_height().saturating_add(1))
            .max_block_coinbase_count()
    {
        return Err(obscure_error_if_true(
            report_error_flag,
            Status::internal("Too many coinbases, breaking consensus".to_string()),
        ));
    }

    let reward = u128::from(
        self.consensus_rules
            .calculate_coinbase_and_fees(new_template.header.height, new_template.body.kernels())
            .map_err(|_| {
                obscure_error_if_true(
                    report_error_flag,
                    Status::internal("Could not calculate the amount of fees in the block".to_string()),
                )
            })?
            .as_u64(),
    );

    // A lone coinbase with a zero share still has to receive the whole reward.
    if coinbases.len() == 1 && coinbases.first().expect("Already checked").value == 0 {
        coinbases.get_mut(0).expect("Already checked").value = 1;
    }

    // Shares are accumulated in u128 so that summing u64 values cannot overflow.
    let mut total_shares = 0u128;
    for coinbase in &coinbases {
        total_shares += u128::from(coinbase.value);
    }

    // Convert shares to amounts. Each amount is the difference between consecutive cumulative quotas, which
    // distributes the rounding remainder so that the amounts sum to the quota of the final cumulative share.
    let mut cur_share_sum = 0u128;
    let mut prev_coinbase_value = 0u128;
    for coinbase in &mut coinbases {
        cur_share_sum += u128::from(coinbase.value);
        coinbase.value = u64::try_from(
            (cur_share_sum.saturating_mul(reward))
                .checked_div(total_shares)
                .ok_or_else(|| {
                    obscure_error_if_true(report_error_flag, Status::internal("total shares are zero".to_string()))
                })? -
                prev_coinbase_value,
        )
        .map_err(|_| {
            obscure_error_if_true(
                report_error_flag,
                Status::internal("Single coinbase fees exceeded u64".to_string()),
            )
        })?;
        prev_coinbase_value += u128::from(coinbase.value);
    }

    let key_manager = KeyManager::new_random().map_err(|e| {
        obscure_error_if_true(report_error_flag, Status::internal(format!("Key manager error: '{e}'")))
    })?;
    let height = new_template.header.height;
    let script_key_id = TariKeyId::default();

    // Running totals used to build one kernel covering every coinbase output.
    let mut total_excess = UncompressedCommitment::default();
    let mut total_nonce = UncompressedPublicKey::default();
    let mut private_keys = Vec::new();
    let mut kernel_message = [0; 32];
    let mut last_kernel = Default::default();
    for coinbase in coinbases {
        let address = TariAddress::from_str(&coinbase.address)
            .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
        let range_proof_type = if coinbase.revealed_value_proof {
            RangeProofType::RevealedValue
        } else {
            RangeProofType::BulletProofPlus
        };
        let (_, coinbase_output, coinbase_kernel, wallet_output) = generate_coinbase_with_wallet_output(
            0.into(),
            coinbase.value.into(),
            height,
            &CoinBaseExtra::try_from(coinbase.coinbase_extra)
                .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?,
            &key_manager,
            &script_key_id,
            &address,
            coinbase.stealth_payment,
            self.consensus_rules.consensus_constants(height),
            range_proof_type,
            MemoField::new_open(vec![], TxType::Coinbase).expect("empty user-data should always be valid"),
        )
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
        new_template.body.add_output(coinbase_output);

        let new_nonce = key_manager
            .get_random_key(None, None)
            .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
        total_nonce = &total_nonce +
            &new_nonce
                .pub_key
                .to_public_key()
                .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
        total_excess = &total_excess +
            &coinbase_kernel
                .excess
                .to_commitment()
                .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
        private_keys.push((wallet_output.commitment_mask_key_id().clone(), new_nonce.key_id));
        // The message and kernel of the last processed coinbase are the ones used for signing below.
        kernel_message = TransactionKernel::build_kernel_signature_message(
            TransactionKernelVersion::get_current_version(),
            coinbase_kernel.fee,
            coinbase_kernel.lock_height,
            &coinbase_kernel.features,
            &None,
        );
        last_kernel = coinbase_kernel;
    }

    // Sum the partial signatures, each produced against the total nonce and total excess.
    let mut kernel_signature = UncompressedSignature::default();
    for (spending_key_id, nonce) in private_keys {
        kernel_signature = &kernel_signature +
            &key_manager
                .get_partial_txo_kernel_signature(
                    &spending_key_id,
                    &nonce,
                    &CompressedPublicKey::new_from_pk(total_nonce.clone()),
                    &CompressedPublicKey::new_from_pk(total_excess.as_public_key().clone()),
                    TransactionKernelVersion::get_current_version(),
                    &kernel_message,
                    &last_kernel.features,
                    TxoStage::Output,
                )
                .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?
                .to_schnorr_signature()
                .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
    }

    // Report a builder failure to the client instead of panicking inside the request handler.
    let kernel_new = KernelBuilder::new()
        .with_fee(0.into())
        .with_features(last_kernel.features)
        .with_lock_height(last_kernel.lock_height)
        .with_excess(&CompressedCommitment::from_commitment(total_excess))
        .with_signature(CompressedSignature::new_from_schnorr(kernel_signature))
        .build()
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
    new_template.body.add_kernel(kernel_new);
    new_template.body.sort();

    let new_block = match handler.get_new_block(new_template).await {
        Ok(b) => b,
        Err(CommsInterfaceError::ChainStorageError(ChainStorageError::InvalidArguments { message, .. })) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::invalid_argument(message),
            ));
        },
        Err(CommsInterfaceError::ChainStorageError(ChainStorageError::CannotCalculateNonTipMmr(msg))) => {
            let status = Status::with_details(
                tonic::Code::FailedPrecondition,
                msg,
                Bytes::from_static(b"CannotCalculateNonTipMmr"),
            );
            return Err(obscure_error_if_true(report_error_flag, status));
        },
        Err(e) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };

    // The hash of the header at height 0 is returned to the client as `tari_unique_id`.
    let gen_hash = handler
        .get_header(0)
        .await
        .map_err(|_| {
            obscure_error_if_true(
                report_error_flag,
                Status::invalid_argument("Tari genesis block not found".to_string()),
            )
        })?
        .ok_or_else(|| {
            obscure_error_if_true(
                report_error_flag,
                Status::not_found("Tari genesis block not found".to_string()),
            )
        })?
        .hash()
        .to_vec();
    let block_hash = new_block.hash().to_vec();
    // Only `RandomXM` uses the merge-mining hash; every other algorithm uses the regular mining hash.
    let mining_hash = match new_block.header.pow.pow_algo {
        PowAlgorithm::Sha3x => new_block.header.mining_hash().to_vec(),
        PowAlgorithm::RandomXT => new_block.header.mining_hash().to_vec(),
        PowAlgorithm::RandomXM => new_block.header.merge_mining_hash().to_vec(),
        PowAlgorithm::Cuckaroo => new_block.header.mining_hash().to_vec(),
    };
    let vm_key = *handler
        .get_header(tari_rx_vm_key_height(new_block.header.height))
        .await
        .map_err(|_| {
            obscure_error_if_true(
                report_error_flag,
                Status::not_found("Tari block not found ".to_string()),
            )
        })?
        .ok_or_else(|| {
            obscure_error_if_true(
                report_error_flag,
                Status::not_found("Tari block not found ".to_string()),
            )
        })?
        .hash();
    let block: Option<tari_rpc::Block> = Some(
        new_block
            .try_into()
            .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e)))?,
    );
    let response = tari_rpc::GetNewBlockResult {
        block_hash,
        block,
        merge_mining_hash: mining_hash,
        tari_unique_id: gen_hash,
        miner_data: Some(miner_data),
        vm_key: vm_key.to_vec(),
    };
    trace!(target: LOG_TARGET, "Sending GetNewBlock response to client");
    Ok(Response::new(response))
}
#[allow(clippy::too_many_lines)]
async fn get_new_block_with_coinbases(
&self,
request: Request<tari_rpc::GetNewBlockWithCoinbasesRequest>,
) -> Result<Response<tari_rpc::GetNewBlockResult>, Status> {
if !self.is_method_enabled(GrpcMethod::GetNewBlockWithCoinbases) {
return Err(Status::permission_denied(
"`GetNewBlockWithCoinbasesRequest` method not made available",
));
}
let report_error_flag = self.report_error_flag();
let request = request.into_inner();
trace!(target: LOG_TARGET, "Incoming GRPC request for get new block with coinbases");
let mut block_template: NewBlockTemplate = request
.new_template
.ok_or_else(|| {
obscure_error_if_true(
report_error_flag,
Status::invalid_argument("Malformed block template provided".to_string()),
)
})?
.try_into()
.map_err(|s| {
obscure_error_if_true(
report_error_flag,
Status::invalid_argument(format!("Malformed block template provided: {s}")),
)
})?;
let mut coinbases: Vec<tari_rpc::NewBlockCoinbase> = request.coinbases;
if coinbases.len() as u64 >
self.consensus_rules
.consensus_constants(block_template.header.height)
.max_block_coinbase_count()
{
return Err(obscure_error_if_true(
report_error_flag,
Status::internal("Too many coinbases, breaking consensus".to_string()),
));
}
let mut handler = self.node_service.clone();
let reward = self
.consensus_rules
.calculate_coinbase_and_fees(block_template.header.height, block_template.body.kernels())
.map_err(|_| {
obscure_error_if_true(
report_error_flag,
Status::internal("Could not calculate the amount of fees in the block".to_string()),
)
})?;
if coinbases.len() == 1 && coinbases.first().expect("Already checked").value == 0 {
coinbases.get_mut(0).expect("Already checked").value = reward.as_u64();
}
let mut amount = 0u64;
for coinbase in &coinbases {
amount += coinbase.value;
}
if amount != reward.as_u64() {
return Err(obscure_error_if_true(
report_error_flag,
Status::invalid_argument("Malformed coinbase amounts".to_string()),
));
}
let key_manager = KeyManager::new_random().map_err(|s| {
obscure_error_if_true(report_error_flag, Status::internal(format!("Key manager error: {s}")))
})?;
let height = block_template.header.height;
let script_key_id = TariKeyId::default();
let mut total_excess = UncompressedCommitment::default();
let mut total_nonce = UncompressedPublicKey::default();
let mut private_keys = Vec::new();
let mut kernel_message = [0; 32];
let mut last_kernel = Default::default();
for coinbase in coinbases {
let address = TariAddress::from_str(&coinbase.address)
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
let range_proof_type = if coinbase.revealed_value_proof {
RangeProofType::RevealedValue
} else {
RangeProofType::BulletProofPlus
};
let (_, coinbase_output, coinbase_kernel, wallet_output) = generate_coinbase_with_wallet_output(
0.into(),
coinbase.value.into(),
height,
&CoinBaseExtra::try_from(coinbase.coinbase_extra)
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?,
&key_manager,
&script_key_id,
&address,
coinbase.stealth_payment,
self.consensus_rules.consensus_constants(height),
range_proof_type,
MemoField::new_open(vec![], TxType::Coinbase).expect("empty user-data should always be valid"),
)
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
block_template.body.add_output(coinbase_output);
let new_nonce = key_manager
.get_random_key(None, None)
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
total_nonce = &total_nonce +
&new_nonce
.pub_key
.to_public_key()
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
total_excess = &total_excess +
&coinbase_kernel
.excess
.to_commitment()
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
private_keys.push((wallet_output.commitment_mask_key_id().clone(), new_nonce.key_id));
kernel_message = TransactionKernel::build_kernel_signature_message(
TransactionKernelVersion::get_current_version(),
coinbase_kernel.fee,
coinbase_kernel.lock_height,
&coinbase_kernel.features,
&None,
);
last_kernel = coinbase_kernel;
}
let mut kernel_signature = UncompressedSignature::default();
for (spending_key_id, nonce) in private_keys {
kernel_signature = &kernel_signature +
&key_manager
.get_partial_txo_kernel_signature(
&spending_key_id,
&nonce,
&CompressedPublicKey::new_from_pk(total_nonce.clone()),
&CompressedPublicKey::new_from_pk(total_excess.as_public_key().clone()),
TransactionKernelVersion::get_current_version(),
&kernel_message,
&last_kernel.features,
TxoStage::Output,
)
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?
.to_schnorr_signature()
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
}
let kernel_new = KernelBuilder::new()
.with_fee(0.into())
.with_features(last_kernel.features)
.with_lock_height(last_kernel.lock_height)
.with_excess(&CompressedCommitment::from_commitment(total_excess))
.with_signature(CompressedSignature::new_from_schnorr(kernel_signature))
.build()
.unwrap();
block_template.body.add_kernel(kernel_new);
block_template.body.sort();
let new_block = match handler.get_new_block(block_template).await {
Ok(b) => b,
Err(CommsInterfaceError::ChainStorageError(ChainStorageError::InvalidArguments { message, .. })) => {
return Err(obscure_error_if_true(
report_error_flag,
Status::invalid_argument(message),
));
},
Err(CommsInterfaceError::ChainStorageError(ChainStorageError::CannotCalculateNonTipMmr(msg))) => {
let status = Status::with_details(
tonic::Code::FailedPrecondition,
msg,
Bytes::from_static(b"CannotCalculateNonTipMmr"),
);
return Err(obscure_error_if_true(report_error_flag, status));
},
Err(e) => {
return Err(obscure_error_if_true(
report_error_flag,
Status::internal(e.to_string()),
));
},
};
let fees = new_block.body.get_total_fee().map_err(|_| {
obscure_error_if_true(
report_error_flag,
Status::invalid_argument("Invalid fees in block".to_string()),
)
})?;
let algo = new_block.header.pow.pow_algo;
let gen_hash = handler
.get_header(0)
.await
.map_err(|_| {
obscure_error_if_true(
report_error_flag,
Status::invalid_argument("Tari genesis block not found".to_string()),
)
})?
.ok_or_else(|| {
obscure_error_if_true(
report_error_flag,
Status::not_found("Tari genesis block not found".to_string()),
)
})?
.hash()
.to_vec();
let block_hash = new_block.hash().to_vec();
let mining_hash = match new_block.header.pow.pow_algo {
PowAlgorithm::Sha3x => new_block.header.mining_hash().to_vec(),
PowAlgorithm::RandomXT => new_block.header.mining_hash().to_vec(),
PowAlgorithm::RandomXM => new_block.header.merge_mining_hash().to_vec(),
PowAlgorithm::Cuckaroo => new_block.header.mining_hash().to_vec(),
};
let block: Option<tari_rpc::Block> = Some(
new_block
.try_into()
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e)))?,
);
let new_template = handler.get_new_block_template(algo, 0).await.map_err(|e| {
warn!(
target: LOG_TARGET,
"Could not get new block template: {e}"
);
obscure_error_if_true(report_error_flag, Status::internal(e.to_string()))
})?;
let pow = algo as i32;
let miner_data = tari_rpc::MinerData {
reward: new_template.reward.into(),
target_difficulty: new_template.target_difficulty.as_u64(),
total_fees: fees.as_u64(),
algo: Some(tari_rpc::PowAlgo { pow_algo: pow }),
};
let vm_key = *handler
.get_header(tari_rx_vm_key_height(new_template.header.height))
.await
.map_err(|_| {
obscure_error_if_true(
report_error_flag,
Status::not_found("Tari block not found ".to_string()),
)
})?
.ok_or_else(|| {
obscure_error_if_true(
report_error_flag,
Status::not_found("Tari block not found ".to_string()),
)
})?
.hash();
let response = tari_rpc::GetNewBlockResult {
block_hash,
block,
merge_mining_hash: mining_hash,
tari_unique_id: gen_hash,
miner_data: Some(miner_data),
vm_key: vm_key.to_vec(),
};
trace!(target: LOG_TARGET, "Sending GetNewBlock response to client");
Ok(Response::new(response))
}
/// Builds a block from the supplied template and returns its header and body as Borsh-encoded blobs.
///
/// Besides the two blobs, the response contains the block hash, the hash to mine (the merge-mining hash for
/// `RandomXM`, the regular mining hash for every other algorithm), the header's `output_mr` and the hash of the
/// header at height 0.
///
/// # Errors
/// - `invalid_argument` if the template cannot be converted, or if the node rejects it with
///   `ChainStorageError::InvalidArguments`.
/// - `failed_precondition` (details `CannotCalculateNonTipMmr`) if the node cannot calculate a non-tip MMR.
/// - `not_found` if no header exists at height 0.
/// - `internal` for any other node error and for serialisation failures.
///
/// All of these statuses are passed through `obscure_error_if_true`.
async fn get_new_block_blob(
    &self,
    request: Request<tari_rpc::NewBlockTemplate>,
) -> Result<Response<tari_rpc::GetNewBlockBlobResult>, Status> {
    self.check_method_enabled(GrpcMethod::GetNewBlockBlob)?;
    let report_error_flag = self.report_error_flag();
    let grpc_template = request.into_inner();
    trace!(target: LOG_TARGET, "Incoming GRPC request for get new block blob");

    let template: NewBlockTemplate = match grpc_template.try_into() {
        Ok(template) => template,
        Err(s) => {
            let status = Status::invalid_argument(format!("Invalid block template: {s}"));
            return Err(obscure_error_if_true(report_error_flag, status));
        },
    };

    let mut handler = self.node_service.clone();
    // Translate the node's error into the matching gRPC status before (optionally) obscuring it.
    let new_block = handler.get_new_block(template).await.map_err(|node_err| {
        let status = match node_err {
            CommsInterfaceError::ChainStorageError(ChainStorageError::InvalidArguments { message, .. }) => {
                Status::invalid_argument(message)
            },
            CommsInterfaceError::ChainStorageError(ChainStorageError::CannotCalculateNonTipMmr(msg)) => {
                Status::with_details(
                    tonic::Code::FailedPrecondition,
                    msg,
                    Bytes::from_static(b"CannotCalculateNonTipMmr"),
                )
            },
            other => Status::internal(other.to_string()),
        };
        obscure_error_if_true(report_error_flag, status)
    })?;

    let block_hash = new_block.hash().to_vec();
    let merge_mining_hash = match new_block.header.pow.pow_algo {
        PowAlgorithm::RandomXM => new_block.header.merge_mining_hash().to_vec(),
        PowAlgorithm::Sha3x | PowAlgorithm::RandomXT | PowAlgorithm::Cuckaroo => {
            new_block.header.mining_hash().to_vec()
        },
    };

    // A failed lookup and a missing header are reported with different status codes.
    let genesis_header = handler.get_header(0).await.map_err(|_| {
        obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument("Tari genesis block not found".to_string()),
        )
    })?;
    let tari_unique_id = match genesis_header {
        Some(genesis) => genesis.hash().to_vec(),
        None => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::not_found("Tari genesis block not found".to_string()),
            ));
        },
    };

    let (header, block_body) = new_block.into_header_body();
    let mut header_blob = Vec::new();
    BorshSerialize::serialize(&header, &mut header_blob)
        .map_err(|err| obscure_error_if_true(report_error_flag, Status::internal(err.to_string())))?;
    let mut body_blob = Vec::new();
    BorshSerialize::serialize(&block_body, &mut body_blob)
        .map_err(|err| obscure_error_if_true(report_error_flag, Status::internal(err.to_string())))?;

    trace!(target: LOG_TARGET, "Sending GetNewBlockBlob response to client");
    Ok(Response::new(tari_rpc::GetNewBlockBlobResult {
        block_hash,
        header: header_blob,
        block_body: body_blob,
        merge_mining_hash,
        utxo_mr: header.output_mr.to_vec(),
        tari_unique_id,
    }))
}
/// Converts the submitted gRPC block and hands it to the local node service.
///
/// Returns the hash reported by the node service for the submitted block.
///
/// # Errors
/// - `invalid_argument` if the gRPC block cannot be converted into a `Block`.
/// - `internal` if the node service fails to accept the block.
///
/// Both statuses are passed through `obscure_error_if_true`.
async fn submit_block(
    &self,
    request: Request<tari_rpc::Block>,
) -> Result<Response<tari_rpc::SubmitBlockResponse>, Status> {
    self.check_method_enabled(GrpcMethod::SubmitBlock)?;
    let report_error_flag = self.report_error_flag();

    let block = match Block::try_from(request.into_inner()) {
        Ok(block) => block,
        Err(e) => {
            let status = Status::invalid_argument(format!("Invalid block provided: {e}"));
            return Err(obscure_error_if_true(report_error_flag, status));
        },
    };
    // Remember the height now; the block itself is moved into the node service below.
    let block_height = block.header.height;
    trace!(target: LOG_TARGET, "Miner submitted block: {block}");
    info!(
        target: LOG_TARGET,
        "Received SubmitBlock #{block_height} request from client"
    );

    let mut node = self.node_service.clone();
    let accepted_hash = match node.submit_block(block).await {
        Ok(hash) => hash,
        Err(e) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };
    let block_hash = accepted_hash.to_vec();

    trace!(
        target: LOG_TARGET,
        "Sending SubmitBlock #{block_height} response to client"
    );
    Ok(Response::new(tari_rpc::SubmitBlockResponse { block_hash }))
}
/// Reassembles a block from Borsh-encoded header and body blobs and hands it to the local node service.
///
/// Returns the hash reported by the node service for the submitted block.
///
/// # Errors
/// `internal` if either blob fails to deserialise or if the node service fails to accept the block. The status
/// is passed through `obscure_error_if_true`.
async fn submit_block_blob(
    &self,
    request: Request<tari_rpc::BlockBlobRequest>,
) -> Result<Response<tari_rpc::SubmitBlockResponse>, Status> {
    self.check_method_enabled(GrpcMethod::SubmitBlockBlob)?;
    let report_error_flag = self.report_error_flag();
    trace!(target: LOG_TARGET, "Received block blob from miner: {request:?}");
    let request = request.into_inner();
    trace!(target: LOG_TARGET, "request: {request:?}");

    // Borsh advances the slice it reads from, hence the mutable slice bindings.
    let mut header_cursor: &[u8] = &request.header_blob;
    let mut body_cursor: &[u8] = &request.body_blob;

    trace!(target: LOG_TARGET, "doing header");
    let header = match BorshDeserialize::deserialize(&mut header_cursor) {
        Ok(header) => header,
        Err(e) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };
    trace!(target: LOG_TARGET, "doing body");
    let body = match BorshDeserialize::deserialize(&mut body_cursor) {
        Ok(body) => body,
        Err(e) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };

    let block = Block::new(header, body);
    // Remember the height now; the block itself is moved into the node service below.
    let block_height = block.header.height;
    trace!(target: LOG_TARGET, "Miner submitted block: {block}");
    info!(
        target: LOG_TARGET,
        "Received SubmitBlock #{block_height} request from client"
    );

    let mut node = self.node_service.clone();
    let accepted_hash = node
        .submit_block(block)
        .await
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;

    trace!(
        target: LOG_TARGET,
        "Sending SubmitBlock #{block_height} response to client"
    );
    Ok(Response::new(tari_rpc::SubmitBlockResponse {
        block_hash: accepted_hash.to_vec(),
    }))
}
async fn submit_transaction(
&self,
request: Request<tari_rpc::SubmitTransactionRequest>,
) -> Result<Response<tari_rpc::SubmitTransactionResponse>, Status> {
self.check_method_enabled(GrpcMethod::SubmitTransaction)?;
let report_error_flag = self.report_error_flag();
let request = request.into_inner();
let txn: Transaction = request
.transaction
.ok_or_else(|| obscure_error_if_true(report_error_flag, Status::invalid_argument("Transaction is empty")))?
.try_into()
.map_err(|e| {
obscure_error_if_true(
report_error_flag,
Status::invalid_argument(format!("Invalid transaction provided: {e}")),
)
})?;
trace!(
target: LOG_TARGET,
"Received SubmitTransaction request from client ({} kernels, {} outputs, {} inputs)",
txn.body.kernels().len(),
txn.body.outputs().len(),
txn.body.inputs().len()
);
let mut handler = self.mempool_service.clone();
let res = handler.submit_transaction(txn).await.map_err(|e| {
error!(target: LOG_TARGET, "Error submitting:{e}");
obscure_error_if_true(report_error_flag, Status::internal(e.to_string()))
})?;
let response = match res {
TxStorageResponse::UnconfirmedPool => tari_rpc::SubmitTransactionResponse {
result: tari_rpc::SubmitTransactionResult::Accepted.into(),
},
TxStorageResponse::ReorgPool |
TxStorageResponse::NotStoredAlreadySpent |
TxStorageResponse::NotStoredAlreadyMined => tari_rpc::SubmitTransactionResponse {
result: tari_rpc::SubmitTransactionResult::AlreadyMined.into(),
},
TxStorageResponse::NotStored(_) |
TxStorageResponse::NotStoredOrphan |
TxStorageResponse::NotStoredConsensus(_) |
TxStorageResponse::NotStoredFeeTooLow |
TxStorageResponse::NotStoredTimeLocked => tari_rpc::SubmitTransactionResponse {
result: tari_rpc::SubmitTransactionResult::Rejected.into(),
},
};
trace!(target: LOG_TARGET, "Sending SubmitTransaction response to client");
Ok(Response::new(response))
}
/// Reports where a transaction, identified by its excess signature, currently lives.
///
/// The blockchain is queried first: if any kernel with the signature is found the result is `Mined` and the
/// mempool is not consulted. Otherwise the mempool's storage response is mapped to `Mempool` (unconfirmed
/// pool), `Unknown` (reorg pool or already spent) or `NotStored` (all remaining responses).
///
/// # Errors
/// - `invalid_argument` if the excess signature is missing or cannot be converted.
/// - `internal` if the node or mempool query fails.
///
/// Both statuses are passed through `obscure_error_if_true`.
async fn transaction_state(
    &self,
    request: Request<tari_rpc::TransactionStateRequest>,
) -> Result<Response<tari_rpc::TransactionStateResponse>, Status> {
    self.check_method_enabled(GrpcMethod::TransactionState)?;
    let report_error_flag = self.report_error_flag();

    let Some(grpc_sig) = request.into_inner().excess_sig else {
        return Err(obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument("excess_sig not provided".to_string()),
        ));
    };
    let excess_sig: CompressedSignature = grpc_sig.try_into().map_err(|e| {
        obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument(format!("excess_sig could not be converted '{e}'")),
        )
    })?;
    trace!(
        target: LOG_TARGET,
        "Received TransactionState request from client ({} excess_sig)",
        excess_sig
            .to_json()
            .unwrap_or_else(|e| format!("Failed to serialize signature '{e}'")),
    );

    let mut node_handler = self.node_service.clone();
    let mut mem_handler = self.mempool_service.clone();

    let mined_kernels = match node_handler.get_kernel_by_excess_sig(excess_sig.clone()).await {
        Ok(kernels) => kernels,
        Err(e) => {
            error!(target: LOG_TARGET, "Error submitting query:{e}");
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };

    // Only fall back to the mempool when the chain does not know the signature.
    let location = if mined_kernels.is_empty() {
        let mempool_state = match mem_handler.get_transaction_state_by_excess_sig(excess_sig).await {
            Ok(state) => state,
            Err(e) => {
                error!(target: LOG_TARGET, "Error submitting query:{e}");
                return Err(obscure_error_if_true(
                    report_error_flag,
                    Status::internal(e.to_string()),
                ));
            },
        };
        match mempool_state {
            TxStorageResponse::UnconfirmedPool => tari_rpc::TransactionLocation::Mempool,
            TxStorageResponse::ReorgPool | TxStorageResponse::NotStoredAlreadySpent => {
                tari_rpc::TransactionLocation::Unknown
            },
            TxStorageResponse::NotStored(_) |
            TxStorageResponse::NotStoredConsensus(_) |
            TxStorageResponse::NotStoredOrphan |
            TxStorageResponse::NotStoredFeeTooLow |
            TxStorageResponse::NotStoredTimeLocked |
            TxStorageResponse::NotStoredAlreadyMined => tari_rpc::TransactionLocation::NotStored,
        }
    } else {
        tari_rpc::TransactionLocation::Mined
    };

    let response = tari_rpc::TransactionStateResponse {
        result: location.into(),
    };
    trace!(
        target: LOG_TARGET,
        "Sending Transaction state response to client {response:?}"
    );
    Ok(Response::new(response))
}
/// Streams every peer known to the peer manager to the client.
///
/// All peers are fetched up front and converted to their gRPC form; a spawned task then feeds them into a
/// channel sized to the number of peers and stops as soon as the receiving side goes away.
///
/// # Errors
/// `internal` (passed through `obscure_error_if_true`) if the peer manager query fails.
async fn get_peers(
    &self,
    _request: Request<tari_rpc::GetPeersRequest>,
) -> Result<Response<Self::GetPeersStream>, Status> {
    self.check_method_enabled(GrpcMethod::GetPeers)?;
    let report_error_flag = self.report_error_flag();
    trace!(target: LOG_TARGET, "Incoming GRPC request for get all peers");

    let known_peers = match self.comms.peer_manager().all(None).await {
        Ok(known_peers) => known_peers,
        Err(e) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };

    // Build the complete responses before spawning so the task only has to send.
    let responses: Vec<tari_rpc::GetPeersResponse> = known_peers
        .into_iter()
        .map(|p| tari_rpc::GetPeersResponse { peer: Some(p.into()) })
        .collect();

    let (mut tx, rx) = mpsc::channel(responses.len());
    task::spawn(async move {
        for response in responses {
            if tx.send(Ok(response)).await.is_ok() {
                continue;
            }
            warn!(
                target: LOG_TARGET,
                "[get_peers] Request was cancelled while sending a response"
            );
            return;
        }
    });

    trace!(target: LOG_TARGET, "Sending peers response to client");
    Ok(Response::new(rx))
}
/// Streams the blocks at the requested heights to the client.
///
/// When no heights are given, the current tip height is used. The list of heights is truncated to
/// `GET_BLOCKS_MAX_HEIGHTS` entries and sorted; the node is then queried page by page
/// (`GET_BLOCKS_PAGE_SIZE`) over the range spanned by the smallest and largest height, and only blocks whose
/// height was actually requested are forwarded. Streaming stops as soon as the client goes away.
///
/// # Errors
/// - `internal` if no heights were given and the tip metadata cannot be fetched.
/// - `invalid_argument` if there are no heights to fetch or the paging iterator cannot be constructed.
///
/// Both statuses are passed through `obscure_error_if_true`. A node failure while a page is being fetched
/// ends the stream early and is only logged; a block that cannot be converted is sent to the client as an
/// `internal` error item.
async fn get_blocks(
    &self,
    request: Request<tari_rpc::GetBlocksRequest>,
) -> Result<Response<Self::GetBlocksStream>, Status> {
    self.check_method_enabled(GrpcMethod::GetBlocks)?;
    let report_error_flag = self.report_error_flag();
    let request = request.into_inner();
    trace!(
        target: LOG_TARGET,
        "Incoming GRPC request for GetBlocks: {:?}", request.heights
    );

    let mut handler = self.node_service.clone();
    let mut heights = request.heights;
    if heights.is_empty() {
        // Default to the tip. A metadata failure is returned to the client; silently continuing would leave
        // `heights` empty with nothing to fetch.
        let tip = handler
            .get_metadata()
            .await
            .map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
        heights.push(tip.best_block_height());
    }
    heights.truncate(GET_BLOCKS_MAX_HEIGHTS);
    heights.sort_unstable();

    // `heights` can only be empty here if the truncation limit is zero; guard it rather than panic.
    let (Some(&start), Some(&end)) = (heights.first(), heights.last()) else {
        return Err(obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument("No block heights requested"),
        ));
    };

    let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE);
    let page_iter = NonOverlappingIntegerPairIter::new(start, end.saturating_add(1), GET_BLOCKS_PAGE_SIZE)
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::invalid_argument(e)))?;

    task::spawn(async move {
        for (start, end) in page_iter {
            let blocks = match handler.get_blocks(start..=end, false).await {
                Err(err) => {
                    warn!(
                        target: LOG_TARGET,
                        "Error communicating with local base node: {err:?}"
                    );
                    return;
                },
                // `heights` is sorted, so membership can be checked with a binary search.
                Ok(data) => data
                    .into_iter()
                    .filter(|b| heights.binary_search(&b.header().height).is_ok()),
            };
            for block in blocks {
                trace!(
                    target: LOG_TARGET,
                    "GetBlock GRPC sending block #{}",
                    block.header().height
                );
                let result = block.try_into().map_err(|err| {
                    obscure_error_if_true(
                        report_error_flag,
                        Status::internal(format!("Could not provide block: {err}")),
                    )
                });
                if tx.send(result).await.is_err() {
                    warn!(
                        target: LOG_TARGET,
                        "[get_blocks] Request was cancelled while sending a response"
                    );
                    // The receiver is gone: stop fetching and converting further pages.
                    return;
                }
            }
        }
    });

    trace!(target: LOG_TARGET, "Sending GetBlocks response stream to client");
    Ok(Response::new(rx))
}
/// Returns a snapshot of the node's tip: chain metadata, the state-machine state, whether the status watch
/// reports the node as bootstrapped, and the value read from the failed-checkpoints notifier.
///
/// # Errors
/// `internal` (passed through `obscure_error_if_true`) if the chain metadata cannot be fetched.
async fn get_tip_info(
    &self,
    _request: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::TipInfoResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetTipInfo)?;
    let report_error_flag = self.report_error_flag();
    trace!(target: LOG_TARGET, "Incoming GRPC request for BN tip data");

    let mut node = self.node_service.clone();
    let failed_checkpoints = *self.tari_pulse.get_failed_checkpoints_notifier();
    let chain_meta = match node.get_metadata().await {
        Ok(chain_meta) => chain_meta,
        Err(e) => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };

    // Each `borrow()` is released at the end of its own statement.
    let status_watch = self.state_machine_handle.get_status_info_watch();
    let node_state: tari_rpc::BaseNodeState = (&status_watch.borrow().state_info).into();
    let metadata = Some(chain_meta.into());
    let initial_sync_achieved = status_watch.borrow().bootstrapped;

    trace!(target: LOG_TARGET, "Sending MetaData response to client");
    Ok(Response::new(tari_rpc::TipInfoResponse {
        metadata,
        initial_sync_achieved,
        base_node_state: node_state.into(),
        failed_checkpoints,
    }))
}
/// Streams the blocks that contain kernels with the given excess signatures.
///
/// The signatures are converted up front; the node lookup and the streaming of results happen in a spawned
/// task, which stops as soon as the client goes away.
///
/// # Errors
/// `invalid_argument` (passed through `obscure_error_if_true`) if any signature cannot be converted. A node
/// failure inside the task is only logged and ends the stream; a block that cannot be converted is sent to the
/// client as an `internal` error item.
async fn search_kernels(
    &self,
    request: Request<tari_rpc::SearchKernelsRequest>,
) -> Result<Response<Self::SearchKernelsStream>, Status> {
    self.check_method_enabled(GrpcMethod::SearchKernels)?;
    let report_error_flag = self.report_error_flag();
    trace!(target: LOG_TARGET, "Incoming GRPC request for SearchKernels");

    let converted = request
        .into_inner()
        .signatures
        .into_iter()
        .map(|s| s.try_into())
        .collect::<Result<Vec<_>, _>>();
    let kernels = converted.map_err(|e| {
        obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument(format!("Invalid signatures provided: {e}")),
        )
    })?;

    let mut node = self.node_service.clone();
    let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE);
    task::spawn(async move {
        let lookup = node.get_blocks_with_kernels(kernels).await;
        let blocks = match lookup {
            Ok(found) => found,
            Err(err) => {
                warn!(
                    target: LOG_TARGET,
                    "Error communicating with local base node: {err:?}"
                );
                return;
            },
        };
        for block in blocks {
            let item = block.try_into().map_err(|err| {
                obscure_error_if_true(
                    report_error_flag,
                    Status::internal(format!("Could not provide block:{err}")),
                )
            });
            if tx.send(item).await.is_ok() {
                continue;
            }
            warn!(
                target: LOG_TARGET,
                "[search_kernels] Request was cancelled while sending a response"
            );
            return;
        }
    });

    trace!(target: LOG_TARGET, "Sending SearchKernels response stream to client");
    Ok(Response::new(rx))
}
/// Streams the blocks returned by the node service for the requested UTXO commitments.
///
/// Every commitment in the request is parsed with
/// `CompressedCommitment::from_canonical_bytes` before any work is spawned; a commitment that
/// fails to parse rejects the whole call with `invalid_argument`. The lookup runs in a spawned
/// task that pushes one converted block at a time into the response stream.
///
/// If the node service lookup fails, the failure is only logged and the stream ends without
/// yielding any item.
async fn search_utxos(
    &self,
    request: Request<tari_rpc::SearchUtxosRequest>,
) -> Result<Response<Self::SearchUtxosStream>, Status> {
    self.check_method_enabled(GrpcMethod::SearchUtxos)?;
    let report_error_flag = self.report_error_flag();
    trace!(target: LOG_TARGET, "Incoming GRPC request for SearchUtxos");
    let request = request.into_inner();
    let outputs = request
        .commitments
        .into_iter()
        .map(|s| CompressedCommitment::from_canonical_bytes(&s))
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| {
            obscure_error_if_true(
                report_error_flag,
                Status::invalid_argument(format!("Invalid commitments provided '{e}'")),
            )
        })?;
    let mut handler = self.node_service.clone();
    let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE);
    task::spawn(async move {
        let blocks = match handler.fetch_blocks_with_utxos(outputs).await {
            Err(err) => {
                warn!(
                    target: LOG_TARGET,
                    "Error communicating with local base node: {err:?}"
                );
                return;
            },
            Ok(data) => data,
        };
        for block in blocks {
            let result = block.try_into().map_err(|err| {
                obscure_error_if_true(
                    report_error_flag,
                    Status::internal(format!("Could not provide block:{err}")),
                )
            });
            if tx.send(result).await.is_err() {
                warn!(
                    target: LOG_TARGET,
                    "[search_utxos] Request was cancelled while sending a response"
                );
                // The receiving side is gone: stop instead of converting the remaining blocks
                // and logging the same warning once per block.
                return;
            }
        }
    });
    trace!(target: LOG_TARGET, "Sending SearchUtxos response stream to client");
    Ok(Response::new(rx))
}
#[allow(clippy::useless_conversion)]
async fn fetch_matching_utxos(
&self,
request: Request<tari_rpc::FetchMatchingUtxosRequest>,
) -> Result<Response<Self::FetchMatchingUtxosStream>, Status> {
self.check_method_enabled(GrpcMethod::FetchMatchingUtxos)?;
let report_error_flag = self.report_error_flag();
trace!(target: LOG_TARGET, "Incoming GRPC request for FetchMatchingUtxos");
let request = request.into_inner();
let hashes = request
.hashes
.into_iter()
.map(|s| s.try_into())
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
obscure_error_if_true(
report_error_flag,
Status::invalid_argument(format!("Invalid hashes provided '{e}'")),
)
})?;
let mut handler = self.node_service.clone();
let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE);
task::spawn(async move {
let outputs = match handler.fetch_matching_utxos(hashes).await {
Err(err) => {
warn!(
target: LOG_TARGET,
"Error communicating with local base node: {err:?}"
);
let _ignore = tx.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(format!("Error communicating with local base node: {err}")),
)));
return;
},
Ok(data) => data,
};
for output in outputs {
match grpc_output_with_payref(output, None) {
Ok(output) => {
let resp = tari_rpc::FetchMatchingUtxosResponse { output: Some(output) };
if tx.send(Ok(resp)).await.is_err() {
warn!(
target: LOG_TARGET,
"[fetch_matching_utxos] Request was cancelled while sending a response"
);
return;
}
},
Err(err) => {
let _ignore = tx.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(format!("Error communicating with local base node: {err}")),
)));
return;
},
}
}
});
trace!(
target: LOG_TARGET,
"Sending FindMatchingUtxos response stream to client"
);
Ok(Response::new(rx))
}
/// Returns the maximum, minimum and average values computed by `BlockHeader::timing_stats`
/// over the headers in the requested height range.
///
/// The range is resolved by `get_heights` from the request's `from_tip`, `start_height` and
/// `end_height` fields. Requests where `end - start` exceeds `BLOCK_TIMING_MAX_BLOCKS` are
/// rejected with `invalid_argument`. The headers are handed to `timing_stats` in the reverse
/// of the order in which the node service returned them.
async fn get_block_timing(
    &self,
    request: Request<tari_rpc::HeightRequest>,
) -> Result<Response<tari_rpc::BlockTimingResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetBlockTiming)?;
    let report_error_flag = self.report_error_flag();
    let request = request.into_inner();
    trace!(
        target: LOG_TARGET,
        "Incoming GRPC request for GetBlockTiming: from_tip: {:?} start_height: {:?} end_height: {:?}",
        request.from_tip,
        request.start_height,
        request.end_height
    );
    let mut handler = self.node_service.clone();
    // Route the height-resolution error through the same obscuring policy as every other error
    // returned by this method (and as `get_block_group` does for the same helper).
    let (start, end) = get_heights(&request, handler.clone())
        .await
        .map_err(|e| obscure_error_if_true(report_error_flag, e))?;
    let num_requested = end.saturating_sub(start);
    if num_requested > BLOCK_TIMING_MAX_BLOCKS {
        warn!(
            target: LOG_TARGET,
            "GetBlockTiming request for too many blocks. Requested: {num_requested}. Max: {BLOCK_TIMING_MAX_BLOCKS}."
        );
        return Err(obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument(format!(
                "Exceeded max blocks request limit of {BLOCK_TIMING_MAX_BLOCKS}"
            )),
        ));
    }
    let headers = handler.get_headers(start..=end).await.map_err(|err| {
        obscure_error_if_true(
            report_error_flag,
            Status::internal(format!("Could not provide headers:{err}")),
        )
    })?;
    let headers = headers.into_iter().map(|h| h.into_header()).rev().collect::<Vec<_>>();
    let (max, min, avg) = BlockHeader::timing_stats(&headers);
    let response = tari_rpc::BlockTimingResponse { max, min, avg };
    trace!(target: LOG_TARGET, "Sending GetBlockTiming response to client");
    Ok(Response::new(response))
}
/// Returns the consensus constants that apply at the requested block height.
///
/// A consensus manager is built for the configured network on every call; a build failure is
/// reported with status code `unknown`.
async fn get_constants(
    &self,
    request: Request<tari_rpc::BlockHeight>,
) -> Result<Response<tari_rpc::ConsensusConstants>, Status> {
    self.check_method_enabled(GrpcMethod::GetConstants)?;
    let report_error_flag = self.report_error_flag();
    trace!(target: LOG_TARGET, "Incoming GRPC request for GetConstants");
    trace!(target: LOG_TARGET, "Sending GetConstants response to client");

    let tari_rpc::BlockHeight { block_height, .. } = request.into_inner();
    let consensus_manager = match BaseNodeConsensusManager::builder(self.network.as_network()).build() {
        Ok(manager) => manager,
        Err(e) => {
            let status = Status::unknown(format!("Could not retrieve consensus manager '{e}'"));
            return Err(obscure_error_if_true(report_error_flag, status));
        },
    };

    let constants = consensus_manager.consensus_constants(block_height).clone();
    Ok(Response::new(tari_rpc::ConsensusConstants::from(constants)))
}
/// Delegates to `get_block_group` with `BlockGroupType::BlockSize`, forwarding the request and
/// this service's error-reporting flag.
async fn get_block_size(
    &self,
    request: Request<tari_rpc::BlockGroupRequest>,
) -> Result<Response<tari_rpc::BlockGroupResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetBlockSize)?;
    let report_error_flag = self.report_error_flag();
    let handler = self.node_service.clone();
    get_block_group(handler, request, BlockGroupType::BlockSize, report_error_flag).await
}
/// Delegates to `get_block_group` with `BlockGroupType::BlockFees`, forwarding the request and
/// this service's error-reporting flag.
async fn get_block_fees(
    &self,
    request: Request<tari_rpc::BlockGroupRequest>,
) -> Result<Response<tari_rpc::BlockGroupResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetBlockFees)?;
    let report_error_flag = self.report_error_flag();
    let handler = self.node_service.clone();
    get_block_group(handler, request, BlockGroupType::BlockFees, report_error_flag).await
}
/// Reports the crate version this binary was compiled with (`CARGO_PKG_VERSION`) together with
/// the byte identifier of the configured network.
async fn get_version(
    &self,
    _request: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::BaseNodeGetVersionResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetVersion)?;
    let version = String::from(env!("CARGO_PKG_VERSION"));
    let network_byte = self.network.as_network().as_byte();
    Ok(Response::new(tari_rpc::BaseNodeGetVersionResponse {
        version,
        network: u32::from(network_byte),
    }))
}
async fn check_for_updates(
&self,
_request: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::SoftwareUpdate>, Status> {
self.check_method_enabled(GrpcMethod::CheckForUpdates)?;
let mut resp = tari_rpc::SoftwareUpdate::default();
if let Some(ref update) = *self.software_updater.update_notifier().borrow() {
resp.has_update = true;
resp.version = update.version().to_string();
resp.sha = update.to_hash_hex();
resp.download_url = update.download_url().to_string();
}
Ok(Response::new(resp))
}
async fn get_tokens_in_circulation(
&self,
request: Request<tari_rpc::GetBlocksRequest>,
) -> Result<Response<Self::GetTokensInCirculationStream>, Status> {
self.check_method_enabled(GrpcMethod::GetTokensInCirculation)?;
let report_error_flag = self.report_error_flag();
trace!(target: LOG_TARGET, "Incoming GRPC request for GetTokensInCirculation",);
let request = request.into_inner();
let mut heights = request.heights;
if heights.is_empty() {
let mut handler = self.node_service.clone();
if let Ok(tip) = handler.get_metadata().await {
heights.push(tip.best_block_height());
}
}
heights = heights
.drain(..cmp::min(heights.len(), GET_TOKENS_IN_CIRCULATION_MAX_HEIGHTS))
.collect();
let consensus_manager = BaseNodeConsensusManager::builder(self.network.as_network())
.build()
.map_err(|e| {
obscure_error_if_true(
report_error_flag,
Status::unknown(format!("Could not retrieve consensus manager '{e}'")),
)
})?;
let (mut tx, rx) = mpsc::channel(GET_TOKENS_IN_CIRCULATION_PAGE_SIZE);
task::spawn(async move {
let mut page: Vec<u64> = heights
.drain(..cmp::min(heights.len(), GET_TOKENS_IN_CIRCULATION_PAGE_SIZE))
.collect();
while !page.is_empty() {
let values = page
.clone()
.into_iter()
.map(|height| {
let circulating_supply = consensus_manager.total_tokens_circulating_at_height(height)?.into();
let mined_rewards = consensus_manager.block_rewards_mined_at_height(height)?.into();
let spendable_rewards = consensus_manager.block_rewards_spendable_at_height(height)?.into();
let spendable_pre_mine = consensus_manager.pre_mine_spendable_at_height(height)?.into();
let total_spendable = consensus_manager.total_tokens_spendable_at_height(height)?.into();
let total_pre_mine = consensus_manager.total_pre_mine_in_genesis_block().into();
let time_locked_pre_mine = consensus_manager.time_locked_pre_mine(height)?.into();
Ok(tari_rpc::ValueAtHeightResponse {
circulating_supply,
height,
mined_rewards,
spendable_rewards,
spendable_pre_mine,
total_spendable,
total_pre_mine,
time_locked_pre_mine,
})
})
.collect::<Result<Vec<tari_rpc::ValueAtHeightResponse>, String>>();
let result_size = match values {
Ok(values) => {
let values_len = values.len();
for value in values {
if tx.send(Ok(value)).await.is_err() {
warn!(
target: LOG_TARGET,
"[get_tokens_in_circulation] Request was cancelled while sending a response"
);
return;
}
}
values_len
},
Err(e) => {
warn!(
target: LOG_TARGET,
"Error communicating with local base node: {e:?}"
);
let _ignore = tx.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(format!("Error communicating with local base node: {e}")),
)));
return;
},
};
if result_size < GET_TOKENS_IN_CIRCULATION_PAGE_SIZE {
break;
}
page = heights
.drain(..cmp::min(heights.len(), GET_TOKENS_IN_CIRCULATION_PAGE_SIZE))
.collect();
}
});
trace!(target: LOG_TARGET, "Sending GetTokensInCirculation response to client");
Ok(Response::new(rx))
}
/// Reports the node's synchronisation progress, derived from the state machine's current
/// `StateInfo`.
///
/// The chain metadata is always fetched first (a failure yields an `internal` status), but it
/// is only used by the fallback arm, i.e. for states other than header sync, connecting and
/// block sync.
async fn get_sync_progress(
&self,
_request: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::SyncProgressResponse>, Status> {
self.check_method_enabled(GrpcMethod::GetSyncProgress)?;
// Take an owned snapshot of the state so the watch borrow is released immediately.
let state = self
.state_machine_handle
.get_status_info_watch()
.borrow()
.state_info
.clone();
let short_desc = state.short_desc();
let mut handler = self.node_service.clone();
let report_error_flag = self.report_error_flag();
let meta = handler
.get_metadata()
.await
.map_err(|e| obscure_error_if_true(report_error_flag, Status::internal(e.to_string())))?;
let response = match state {
// Header sync has been entered but no progress information is available yet.
StateInfo::HeaderSync(None) => tari_rpc::SyncProgressResponse {
tip_height: 0,
local_height: 0,
state: tari_rpc::SyncState::HeaderStarting.into(),
short_desc,
initial_connected_peers: 0,
},
// Header sync with progress information: report the heights it carries.
StateInfo::HeaderSync(Some(info)) => tari_rpc::SyncProgressResponse {
tip_height: info.tip_height,
local_height: info.local_height,
state: tari_rpc::SyncState::Header.into(),
short_desc,
initial_connected_peers: 0,
},
// The connecting state is reported as `BlockStarting` with zeroed heights.
StateInfo::Connecting(_) => tari_rpc::SyncProgressResponse {
tip_height: 0,
local_height: 0,
state: tari_rpc::SyncState::BlockStarting.into(),
short_desc,
initial_connected_peers: 0,
},
// Block sync: report the heights carried by the sync info.
StateInfo::BlockSync(info) => tari_rpc::SyncProgressResponse {
tip_height: info.tip_height,
local_height: info.local_height,
state: tari_rpc::SyncState::Block.into(),
short_desc,
initial_connected_peers: 0,
},
// Every other state: when synced, both heights are the best block height and the state is
// `Done`; otherwise the heights are zero and the state is `Startup`. This is the only arm
// that reports the initial connected peer count.
_ => tari_rpc::SyncProgressResponse {
tip_height: if state.is_synced() { meta.best_block_height() } else { 0 },
local_height: if state.is_synced() { meta.best_block_height() } else { 0 },
state: if state.is_synced() {
tari_rpc::SyncState::Done.into()
} else {
tari_rpc::SyncState::Startup.into()
},
short_desc,
initial_connected_peers: state.get_initial_connected_peers(),
},
};
Ok(Response::new(response))
}
/// Returns the tip height, local height and sync peer of the block sync in progress.
///
/// When the state machine has no block sync info, the response is
/// `SyncInfoResponse::default()`. The peer's node id is sent as the bytes of its string form.
async fn get_sync_info(
    &self,
    _request: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::SyncInfoResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetSyncInfo)?;
    trace!(target: LOG_TARGET, "Incoming GRPC request for BN sync data");

    let status_watch = self.state_machine_handle.get_status_info_watch();
    let response = match status_watch.borrow().state_info.get_block_sync_info() {
        Some(sync_info) => {
            let peer_id_bytes = sync_info.sync_peer.node_id().to_string().into_bytes();
            tari_rpc::SyncInfoResponse {
                tip_height: sync_info.tip_height,
                local_height: sync_info.local_height,
                peer_node_id: vec![peer_id_bytes],
            }
        },
        None => tari_rpc::SyncInfoResponse::default(),
    };

    trace!(target: LOG_TARGET, "Sending SyncData response to client");
    Ok(Response::new(response))
}
/// Looks up a block by hash and returns its header together with the achieved difficulty,
/// kernel count, confirmations and total block reward.
///
/// # Errors
/// - `invalid_argument` when the supplied hash cannot be converted.
/// - `internal` when the node service lookup fails.
/// - `not_found` when no block exists for the hash.
/// - `out_of_range` when `calculate_coinbase_and_fees` fails.
#[allow(clippy::cast_possible_truncation)]
async fn get_header_by_hash(
    &self,
    request: Request<tari_rpc::GetHeaderByHashRequest>,
) -> Result<Response<tari_rpc::BlockHeaderResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetHeaderByHash)?;
    let report_error_flag = self.report_error_flag();
    let hash = request.into_inner().hash;
    let mut node_service = self.node_service.clone();

    // Keep a printable copy: the raw bytes are consumed by the conversion below.
    let hash_hex = hash.to_hex();
    let block_hash = hash.try_into().map_err(|e| {
        let status = Status::invalid_argument(format!("Malformed block hash '{e}'"));
        obscure_error_if_true(report_error_flag, status)
    })?;

    let lookup = node_service
        .get_block_by_hash(block_hash)
        .await
        .map_err(|err| obscure_error_if_true(report_error_flag, Status::internal(err.to_string())))?;
    let Some(historical_block) = lookup else {
        let status = Status::not_found(format!("Header not found with hash `{hash_hex}`"));
        return Err(obscure_error_if_true(report_error_flag, status));
    };

    let (block, acc_data, confirmations) = historical_block.dissolve();
    let total_block_reward = self
        .consensus_rules
        .calculate_coinbase_and_fees(block.header.height, block.body.kernels())
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::out_of_range(e)))?;
    let num_transactions = block.body.kernels().len() as u32;

    Ok(Response::new(tari_rpc::BlockHeaderResponse {
        difficulty: acc_data.achieved_difficulty.into(),
        num_transactions,
        confirmations,
        header: Some(block.header.into()),
        reward: total_block_reward.into(),
    }))
}
/// Returns this node's public key, node id and public addresses (addresses as strings).
async fn identify(&self, _: Request<tari_rpc::Empty>) -> Result<Response<tari_rpc::NodeIdentity>, Status> {
    self.check_method_enabled(GrpcMethod::Identify)?;
    let identity = self.comms.node_identity_ref();
    let public_key = identity.public_key().to_vec();
    let public_addresses = identity
        .public_addresses()
        .iter()
        .map(|address| address.to_string())
        .collect();
    let node_id = identity.node_id().to_vec();
    Ok(Response::new(tari_rpc::NodeIdentity {
        public_key,
        public_addresses,
        node_id,
    }))
}
async fn get_network_status(
&self,
_: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::NetworkStatusResponse>, Status> {
self.check_method_enabled(GrpcMethod::GetNetworkStatus)?;
let report_error_flag = self.report_error_flag();
let status = self
.comms
.connectivity()
.get_connectivity_status()
.await
.map_err(|err| obscure_error_if_true(report_error_flag, Status::internal(err.to_string())))?;
let latency = self
.liveness
.clone()
.get_network_avg_latency()
.await
.map_err(|err| obscure_error_if_true(report_error_flag, Status::internal(err.to_string())))?;
let resp = tari_rpc::NetworkStatusResponse {
status: tari_rpc::ConnectivityStatus::from(status) as i32,
avg_latency_ms: latency
.map(|l| u32::try_from(l.as_millis()).unwrap_or(u32::MAX))
.unwrap_or(0),
num_node_connections: u32::try_from(status.num_connected_nodes()).map_err(|e| {
obscure_error_if_true(
report_error_flag,
Status::internal(format!("Error converting usize to u32 '{e}'")),
)
})?,
};
Ok(Response::new(resp))
}
/// Lists the peers behind all currently active connections.
///
/// The node ids of the active connections are resolved through the peer manager. If the peer
/// manager returns a different number of peers and at least one connected node id has no
/// matching peer, the call fails with `not_found` naming the missing node ids.
///
/// All errors returned by this method pass through `obscure_error_if_true`, so the service's
/// error-reporting setting is applied uniformly.
async fn list_connected_peers(
    &self,
    _: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::ListConnectedPeersResponse>, Status> {
    self.check_method_enabled(GrpcMethod::ListConnectedPeers)?;
    let report_error_flag = self.report_error_flag();
    let mut connectivity = self.comms.connectivity();
    let peer_manager = self.comms.peer_manager();
    let connected_peers = connectivity
        .get_active_connections()
        .await
        .map_err(|err| obscure_error_if_true(report_error_flag, Status::internal(err.to_string())))?;
    let node_ids = connected_peers
        .iter()
        .map(|c| c.peer_node_id())
        .cloned()
        .collect::<Vec<_>>();
    // Previously returned un-obscured, leaking peer manager error details regardless of the
    // error-reporting setting.
    let peers = peer_manager
        .get_peers_by_node_ids(&node_ids)
        .await
        .map_err(|err| obscure_error_if_true(report_error_flag, Status::internal(err.to_string())))?;
    if peers.len() != node_ids.len() {
        let mut missing = Vec::new();
        for node_id in &node_ids {
            if !peers.iter().any(|p| p.node_id == *node_id) {
                warn!(target: LOG_TARGET, "Peer '{node_id}' not found");
                missing.push(format!("'{node_id}'"));
            }
        }
        // The lengths can differ without any id missing; only fail when one truly is.
        if !missing.is_empty() {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::not_found(format!("Peer(s) not found: {}", missing.join(", "))),
            ));
        }
    }
    let resp = tari_rpc::ListConnectedPeersResponse {
        connected_peers: peers.into_iter().map(Into::into).collect(),
    };
    Ok(Response::new(resp))
}
/// Returns the mempool's unconfirmed transaction count, reorg transaction count and
/// unconfirmed weight. A failed mempool query is logged and reported as `internal`.
async fn get_mempool_stats(
    &self,
    _: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::MempoolStatsResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetMempoolStats)?;
    let report_error_flag = self.report_error_flag();
    let mut mempool = self.mempool_service.clone();

    let stats = match mempool.get_mempool_stats().await {
        Ok(stats) => stats,
        Err(e) => {
            error!(target: LOG_TARGET, "Error submitting query:{e}");
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::internal(e.to_string()),
            ));
        },
    };

    Ok(Response::new(tari_rpc::MempoolStatsResponse {
        unconfirmed_txs: stats.unconfirmed_txs,
        reorg_txs: stats.reorg_txs,
        unconfirmed_weight: stats.unconfirmed_weight,
    }))
}
/// Returns the shard key of the validator node registered under the given public key.
///
/// # Errors
/// - `invalid_argument` when the public key bytes cannot be parsed.
/// - `internal` when the node service lookup fails.
/// - `not_found` when no validator node is registered for the key, or when its activation
///   epoch is later than the requested epoch.
///
/// All errors pass through `obscure_error_if_true`, so the service's error-reporting setting
/// is applied uniformly.
async fn get_shard_key(
    &self,
    request: Request<tari_rpc::GetShardKeyRequest>,
) -> Result<Response<tari_rpc::GetShardKeyResponse>, Status> {
    self.check_method_enabled(GrpcMethod::GetShardKey)?;
    let request = request.into_inner();
    let report_error_flag = self.report_error_flag();
    let mut handler = self.node_service.clone();
    let public_key = CompressedPublicKey::from_canonical_bytes(&request.public_key)
        .map_err(|e| obscure_error_if_true(report_error_flag, Status::invalid_argument(e.to_string())))?;
    let epoch = request.epoch;
    let validator_node = handler
        .get_validator_node(None, public_key)
        .await
        .map_err(|e| {
            error!(target: LOG_TARGET, "Error {e}");
            obscure_error_if_true(report_error_flag, Status::internal(e.to_string()))
        })?
        .ok_or_else(|| {
            // Previously returned un-obscured, unlike the other errors of this method.
            obscure_error_if_true(report_error_flag, Status::not_found("Validator node not found"))
        })?;
    if validator_node.activation_epoch.as_u64() > epoch {
        return Err(obscure_error_if_true(
            report_error_flag,
            Status::not_found(format!("Validator node found but not active for epoch {epoch}")),
        ));
    }
    Ok(Response::new(tari_rpc::GetShardKeyResponse {
        shard_key: validator_node.shard_key.to_vec(),
    }))
}
/// Streams the validator nodes returned by the node service for the requested height,
/// optionally restricted to a sidechain.
///
/// An empty `sidechain_id` means no sidechain filter; a non-empty one must parse as a
/// compressed public key or the call fails with `invalid_argument`. If the node service lookup
/// fails, the failure is only logged and the stream ends without yielding any item.
async fn get_active_validator_nodes(
    &self,
    request: Request<tari_rpc::GetActiveValidatorNodesRequest>,
) -> Result<Response<Self::GetActiveValidatorNodesStream>, Status> {
    self.check_method_enabled(GrpcMethod::GetActiveValidatorNodes)?;
    let request = request.into_inner();
    trace!(target: LOG_TARGET, "Incoming GRPC request for GetActiveValidatorNodes");
    let mut node_service = self.node_service.clone();
    let (mut sender, receiver) = mpsc::channel(1000);

    let sidechain_filter = (!request.sidechain_id.is_empty())
        .then(|| CompressedPublicKey::from_canonical_bytes(&request.sidechain_id))
        .transpose()
        .map_err(|e| Status::invalid_argument(format!("Invalid sidechain_id '{e}'")))?;
    let height = request.height;

    task::spawn(async move {
        let lookup = node_service.get_active_validator_nodes(height, sidechain_filter).await;
        let registrations = match lookup {
            Ok(registrations) => registrations,
            Err(err) => {
                warn!(target: LOG_TARGET, "Base node service error: {err}");
                return;
            },
        };

        for registration in registrations {
            // Destructure exhaustively so a new field on the registration info is a compile error.
            let ValidatorNodeRegistrationInfo {
                public_key,
                sidechain_id,
                shard_key,
                activation_epoch,
                original_registration,
                minimum_value_promise,
            } = registration;

            let item = tari_rpc::GetActiveValidatorNodesResponse {
                public_key: public_key.to_vec(),
                shard_key: shard_key.to_vec(),
                sidechain_id: sidechain_id.as_ref().map(|n| n.to_vec()).unwrap_or_default(),
                claim_public_key: original_registration.claim_public_key().to_vec(),
                minimum_value_promise: minimum_value_promise.as_u64(),
                activation_epoch: activation_epoch.as_u64(),
            };

            let delivery = sender.send(Ok(item)).await;
            if delivery.is_err() {
                trace!(
                    target: LOG_TARGET,
                    "[get_active_validator_nodes] Client has disconnected before stream completed"
                );
                return;
            }
        }
    });

    trace!(
        target: LOG_TARGET,
        "Sending GetActiveValidatorNodes response stream to client"
    );
    Ok(Response::new(receiver))
}
/// Streams the template registrations returned by the node service for a height window.
///
/// The window starts at the height of the header identified by `start_hash` (or at height 0
/// when `start_hash` is empty) and ends at `start + count`. A `count` of zero returns an empty
/// stream, but only after the start hash has been validated and resolved.
///
/// # Errors
/// - `invalid_argument` for a malformed `start_hash` or when `start + count` overflows `u64`.
/// - `internal` when the header lookup fails.
/// - `not_found` when no header exists for `start_hash`.
///
/// A failure of the registration query itself is only logged; the stream then ends without
/// yielding any item.
async fn get_template_registrations(
    &self,
    request: Request<tari_rpc::GetTemplateRegistrationsRequest>,
) -> Result<Response<Self::GetTemplateRegistrationsStream>, Status> {
    self.check_method_enabled(GrpcMethod::GetTemplateRegistrations)?;
    let request = request.into_inner();
    let report_error_flag = self.report_error_flag();
    trace!(target: LOG_TARGET, "Incoming GRPC request for GetTemplateRegistrations");
    let (mut sender, receiver) = mpsc::channel(10);
    let count = request.count;

    let start_hash = if request.start_hash.is_empty() {
        None
    } else {
        let parsed = FixedHash::try_from(request.start_hash).map_err(|e| {
            let status = Status::invalid_argument(format!("Invalid start_hash '{e}'"));
            obscure_error_if_true(report_error_flag, status)
        })?;
        Some(parsed)
    };

    let mut node_service = self.node_service.clone();
    let start_height = match start_hash {
        None => 0,
        Some(hash) => node_service
            .get_header_by_hash(hash)
            .await
            .map_err(|err| obscure_error_if_true(self.report_grpc_error, Status::internal(err.to_string())))?
            .map(|h| h.height())
            .ok_or_else(|| obscure_error_if_true(report_error_flag, Status::not_found("Start hash not found")))?,
    };

    if count == 0 {
        // The sender is dropped on return, so the client receives an empty stream.
        return Ok(Response::new(receiver));
    }

    let Some(end_height) = start_height.checked_add(count) else {
        return Err(obscure_error_if_true(
            report_error_flag,
            Status::invalid_argument("Request start height + count overflows u64"),
        ));
    };

    task::spawn(async move {
        let query = node_service.get_template_registrations(start_height, end_height).await;
        let registrations = match query {
            Ok(registrations) => registrations,
            Err(err) => {
                warn!(target: LOG_TARGET, "Base node service error: {err}");
                return;
            },
        };

        for entry in registrations {
            let registration = entry.registration_data().into();
            let item = tari_rpc::GetTemplateRegistrationResponse {
                utxo_hash: entry.output_hash.to_vec(),
                registration: Some(registration),
            };
            let delivery = sender.send(Ok(item)).await;
            if delivery.is_err() {
                trace!(
                    target: LOG_TARGET,
                    "[get_template_registrations] Client has disconnected before stream completed"
                );
                return;
            }
        }
    });

    trace!(
        target: LOG_TARGET,
        "Sending GetTemplateRegistrations response stream to client"
    );
    Ok(Response::new(receiver))
}
#[allow(clippy::too_many_lines)]
async fn get_side_chain_utxos(
&self,
request: Request<tari_rpc::GetSideChainUtxosRequest>,
) -> Result<Response<Self::GetSideChainUtxosStream>, Status> {
self.check_method_enabled(GrpcMethod::GetSideChainUtxos)?;
let request = request.into_inner();
let report_error_flag = self.report_error_flag();
trace!(target: LOG_TARGET, "Incoming GRPC request for GetTemplateRegistrations");
let (mut tx, rx) = mpsc::channel(10);
let start_hash = Some(request.start_hash)
.filter(|x| !x.is_empty())
.map(FixedHash::try_from)
.transpose()
.map_err(|e| Status::invalid_argument(format!("Invalid start_hash '{e}'")))?;
let mut node_service = self.node_service.clone();
let start_header = match start_hash {
Some(hash) => node_service
.get_header_by_hash(hash)
.await
.map_err(|err| obscure_error_if_true(self.report_grpc_error, Status::internal(err.to_string())))?
.ok_or_else(|| Status::not_found("Start hash not found"))?,
None => node_service
.get_header(0)
.await
.map_err(|err| obscure_error_if_true(self.report_grpc_error, Status::internal(err.to_string())))?
.ok_or_else(|| Status::unavailable("Genesis block not available"))?,
};
if request.count == 0 {
return Ok(Response::new(rx));
}
let start_height = start_header.height();
let end_height = start_height
.checked_add(request.count - 1)
.ok_or_else(|| Status::invalid_argument("Request start height + count overflows u64"))?;
task::spawn(async move {
let mut current_header = start_header;
for height in start_height..=end_height {
let header_hash = *current_header.hash();
let utxos = match node_service.fetch_unspent_utxos_in_block(header_hash).await {
Ok(utxos) => utxos,
Err(e) => {
warn!(target: LOG_TARGET, "Base node service error: {e}");
return;
},
};
let next_header = match node_service.get_header(height.saturating_add(1)).await {
Ok(h) => h,
Err(e) => {
let _ignore = tx.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(e.to_string()),
)));
return;
},
};
let sidechain_outputs = utxos
.into_iter()
.filter(|u| u.features.output_type.is_sidechain_type() || u.is_burned_to_sidechain())
.map(|o| grpc_output_with_payref(o, Some(header_hash)))
.collect::<Result<Vec<_>, _>>();
match sidechain_outputs {
Ok(outputs) => {
let resp = tari_rpc::GetSideChainUtxosResponse {
block_info: Some(tari_rpc::BlockInfo {
height: current_header.height(),
hash: header_hash.to_vec(),
next_block_hash: next_header.as_ref().map(|h| h.hash().to_vec()).unwrap_or_default(),
}),
outputs,
};
if tx.send(Ok(resp)).await.is_err() {
trace!(
target: LOG_TARGET,
"[get_template_registrations] Client has disconnected before stream completed"
);
return;
}
},
Err(e) => {
warn!(
target: LOG_TARGET,
"Error sending converting sidechain output for GRPC: {e}"
);
let _ignore = tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(format!("Error converting sidechain output: {e}")),
)))
.await;
return;
},
};
match next_header {
Some(header) => {
current_header = header;
},
None => break,
}
}
});
trace!(
target: LOG_TARGET,
"Sending GetTemplateRegistrations response stream to client"
);
Ok(Response::new(rx))
}
/// Streams a `PaymentReferenceResponse` for each requested output hash that the node service
/// reports as mined and/or spent.
///
/// Every hash in the request is converted before any work is spawned; a hash that fails
/// conversion rejects the whole call with `invalid_argument`. Hashes for which the lookup
/// returns neither output nor input information produce no stream item. A failed lookup is
/// sent to the client as an `internal` error and processing continues with the next hash.
#[allow(clippy::too_many_lines)]
async fn search_payment_references_via_output_hash(
&self,
request: Request<tari_rpc::FetchMatchingUtxosRequest>,
) -> Result<Response<Self::SearchPaymentReferencesViaOutputHashStream>, Status> {
self.check_method_enabled(GrpcMethod::SearchPaymentReferencesViaOutputHash)?;
let report_error_flag = self.report_error_flag();
let request = request.into_inner();
trace!(
target: LOG_TARGET,
"Incoming GRPC request for SearchPaymentReferencesViaOutputHash: {} hashes",
request.hashes.len()
);
let hashes = request
.hashes
.into_iter()
.map(|s| s.try_into())
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
obscure_error_if_true(
report_error_flag,
Status::invalid_argument(format!("Invalid hashes provided '{e}'")),
)
})?;
let mut node_service = self.node_service.clone();
let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE);
task::spawn(async move {
for output_hash in &hashes {
let mut response = tari_rpc::PaymentReferenceResponse::default();
match node_service.fetch_mined_info_by_output_hash(output_hash).await {
Ok(mined_info) => {
// Record presence before the options are moved out below.
let has_output = mined_info.output.is_some();
let has_input = mined_info.input.is_some();
if let Some(output_info) = mined_info.output {
// The payment reference is generated from the mining block's hash and the output hash.
response.payment_reference_hex =
generate_payment_reference(&output_info.header_hash, output_hash).to_hex();
response.block_height = output_info.mined_height;
response.block_hash = output_info.header_hash.to_vec();
response.mined_timestamp = output_info.mined_timestamp;
response.commitment = output_info.output.commitment.to_vec();
response.min_value_promise = output_info.output.minimum_value_promise.as_u64();
response.output_hash = output_info.output.hash().to_vec();
}
if let Some(input_info) = mined_info.input {
response.is_spent = true;
response.spent_height = input_info.spent_height;
response.spent_block_hash = input_info.header_hash.to_vec();
response.spent_timestamp = input_info.spent_timestamp;
// Only input information was returned: take the output hash from the input instead.
if response.output_hash.is_empty() {
response.output_hash = input_info.input.output_hash().to_vec();
}
}
// Nothing is sent for a hash with neither output nor input information.
if has_output || has_input {
trace!(
target: LOG_TARGET,
"GRPC request SearchPaymentReferencesViaOutputHash for {output_hash} found"
);
if tx.send(Ok(response)).await.is_err() {
return;
}
}
},
Err(e) => {
warn!(target: LOG_TARGET, "Error looking up mined info via output hash {output_hash}: {e}");
let error = obscure_error_if_true(
report_error_flag,
Status::internal(format!("Mined info via output hash lookup error: {e}")),
);
// A lookup error is reported in-stream; the loop only stops if the send itself fails.
if tx.send(Err(error)).await.is_err() {
break;
}
},
}
}
});
trace!(
target: LOG_TARGET,
"Sending SearchPaymentReferencesViaOutputHash response stream to client"
);
Ok(Response::new(rx))
}
/// Streams a `PaymentReferenceResponse` for each requested payment reference that the node
/// service reports as mined and/or spent.
///
/// Payment references may be supplied as 64-character hex strings
/// (`payment_reference_hex`) and/or as raw bytes (`payment_reference_bytes`). All validation
/// happens inside the spawned task: each invalid reference is reported in-stream as an
/// `invalid_argument` error and skipped, and the valid ones are then looked up in order (hex
/// entries first). References for which the lookup returns neither output nor input
/// information produce no stream item. A failed lookup is sent as an `internal` error and
/// processing continues with the next reference.
///
/// As soon as any send fails the task stops, so no further node lookups are made on behalf of
/// a receiver that is no longer there.
#[allow(clippy::too_many_lines)]
async fn search_payment_references(
    &self,
    request: Request<tari_rpc::SearchPaymentReferencesRequest>,
) -> Result<Response<Self::SearchPaymentReferencesStream>, Status> {
    self.check_method_enabled(GrpcMethod::SearchPaymentReferences)?;
    let request = request.into_inner();
    let report_error_flag = self.report_error_flag();
    trace!(
        target: LOG_TARGET,
        "Incoming GRPC request for SearchPaymentReferences: {} PayRefs",
        request.payment_reference_hex.len()
    );
    let (mut tx, rx) = mpsc::channel(100);
    let mut node_service = self.node_service.clone();
    task::spawn(async move {
        let mut payrefs = Vec::new();
        for payref_hex in request.payment_reference_hex {
            if payref_hex.len() != 64 || !payref_hex.chars().all(|c| c.is_ascii_hexdigit()) {
                let error = obscure_error_if_true(
                    report_error_flag,
                    Status::invalid_argument(format!("Invalid PayRef format: {payref_hex}")),
                );
                // A failed send means nobody is listening: abandon the whole task rather than
                // just this loop, which would still run the lookups below.
                if tx.send(Err(error)).await.is_err() {
                    return;
                }
                continue;
            }
            let payref_bytes = match FixedHash::from_hex(&payref_hex) {
                Ok(v) => v,
                Err(e) => {
                    let error = obscure_error_if_true(
                        report_error_flag,
                        Status::invalid_argument(format!("Invalid PayRef hex: {payref_hex}, {e}")),
                    );
                    if tx.send(Err(error)).await.is_err() {
                        return;
                    }
                    continue;
                },
            };
            payrefs.push(payref_bytes);
        }
        for payref_bytes in request.payment_reference_bytes {
            // The clone keeps the raw bytes available for the error message.
            let payref_fixed_hash = match FixedHash::try_from(payref_bytes.clone()) {
                Ok(v) => v,
                Err(e) => {
                    let error = obscure_error_if_true(
                        report_error_flag,
                        Status::invalid_argument(format!("Invalid PayRef bytes {}, {}", payref_bytes.to_hex(), e)),
                    );
                    if tx.send(Err(error)).await.is_err() {
                        return;
                    }
                    continue;
                },
            };
            payrefs.push(payref_fixed_hash);
        }
        for payref in payrefs {
            let mut response = tari_rpc::PaymentReferenceResponse::default();
            match node_service.fetch_mined_info_by_payref(&payref).await {
                Ok(mined_info) => {
                    // Record presence before the options are moved out below.
                    let has_output = mined_info.output.is_some();
                    let has_input = mined_info.input.is_some();
                    if let Some(output_info) = mined_info.output {
                        response.payment_reference_hex = payref.to_hex();
                        response.block_height = output_info.mined_height;
                        response.block_hash = output_info.header_hash.to_vec();
                        response.mined_timestamp = output_info.mined_timestamp;
                        response.commitment = output_info.output.commitment.to_vec();
                        response.min_value_promise = output_info.output.minimum_value_promise.as_u64();
                        response.output_hash = output_info.output.hash().to_vec();
                    }
                    if let Some(input_info) = mined_info.input {
                        response.is_spent = true;
                        response.spent_height = input_info.spent_height;
                        response.spent_block_hash = input_info.header_hash.to_vec();
                        response.spent_timestamp = input_info.spent_timestamp;
                        // Only input information was returned: take the output hash from the input.
                        if response.output_hash.is_empty() {
                            response.output_hash = input_info.input.output_hash().to_vec();
                        }
                    }
                    if has_output || has_input {
                        trace!(target: LOG_TARGET, "GRPC request SearchPaymentReferences for {payref} found");
                        if tx.send(Ok(response)).await.is_err() {
                            return;
                        }
                    }
                },
                Err(e) => {
                    warn!(target: LOG_TARGET, "Error looking up PayRef {payref}: {e}");
                    let error = obscure_error_if_true(
                        report_error_flag,
                        Status::internal(format!("PayRef lookup error: {e}")),
                    );
                    if tx.send(Err(error)).await.is_err() {
                        return;
                    }
                },
            }
        }
    });
    Ok(Response::new(rx))
}
async fn get_validator_node_changes(
&self,
request: Request<tari_rpc::GetValidatorNodeChangesRequest>,
) -> Result<Response<tari_rpc::GetValidatorNodeChangesResponse>, Status> {
self.check_method_enabled(GrpcMethod::GetValidatorNodeChanges)?;
let request = request.into_inner();
trace!(target: LOG_TARGET, "Incoming GRPC request for GetValidatorNodeChanges");
let mut handler = self.node_service.clone();
let sidechain_id = Some(request.sidechain_id)
.filter(|id| !id.is_empty())
.map(|id| {
CompressedPublicKey::from_canonical_bytes(&id)
.map_err(|e| Status::invalid_argument(format!("Invalid sidechain_id '{e}'")))
})
.transpose()?;
let changes = handler
.get_validator_node_changes(sidechain_id, VnEpoch(request.epoch))
.await
.map_err(|error| {
warn!(target: LOG_TARGET, "Base node service error: {error}");
Status::internal("Internal error!")
})?
.iter()
.map(|node_change| node_change.into())
.collect();
Ok(Response::new(tari_rpc::GetValidatorNodeChangesResponse { changes }))
}
}
/// Selects which per-block value `get_block_group` aggregates.
enum BlockGroupType {
/// Aggregate the value extracted by `block_fees`.
BlockFees,
/// Aggregate the value extracted by `block_size`.
BlockSize,
}
/// Shared implementation of the block size and block fees RPCs: computes a single aggregate
/// over one per-block value across a range of blocks.
///
/// * `handler` - node service handle used to resolve the height range and fetch the blocks.
/// * `request` - the group request; its height fields select the range and `calc_type` the
///   aggregate.
/// * `block_group_type` - which per-block value to extract (`block_fees` or `block_size`).
/// * `report_error_flag` - forwarded to `obscure_error_if_true` for every returned error.
///
/// Only `Median` and `Mean` are implemented; `Quantile` and `Quartile` yield `unimplemented`.
/// If the blocks cannot be fetched, the failure is logged and the aggregate is computed over
/// an empty set. When the aggregate function yields nothing, the response `value` is empty.
/// The response echoes the raw `calc_type` value from the request.
async fn get_block_group(
    mut handler: LocalNodeCommsInterface,
    request: Request<tari_rpc::BlockGroupRequest>,
    block_group_type: BlockGroupType,
    report_error_flag: bool,
) -> Result<Response<tari_rpc::BlockGroupResponse>, Status> {
    // Name the RPC actually being served, so the trace output no longer claims GetBlockSize
    // for GetBlockFees requests.
    let method_name = match block_group_type {
        BlockGroupType::BlockFees => "GetBlockFees",
        BlockGroupType::BlockSize => "GetBlockSize",
    };
    let request = request.into_inner();
    let calc_type_response = request.calc_type;
    let calc_type: CalcType = request.calc_type();
    let height_request: tari_rpc::HeightRequest = request.into();
    trace!(
        target: LOG_TARGET,
        "Incoming GRPC request for {method_name}: from_tip: {:?} start_height: {:?} end_height: {:?}",
        height_request.from_tip,
        height_request.start_height,
        height_request.end_height
    );
    let (start, end) = get_heights(&height_request, handler.clone())
        .await
        .map_err(|e| obscure_error_if_true(report_error_flag, e))?;
    let blocks = match handler.get_blocks(start..=end, false).await {
        Err(err) => {
            warn!(
                target: LOG_TARGET,
                "Error communicating with local base node: {err:?}"
            );
            // Best effort: carry on with no blocks, which produces an empty result.
            vec![]
        },
        Ok(data) => data,
    };
    let extractor = match block_group_type {
        BlockGroupType::BlockFees => block_fees,
        BlockGroupType::BlockSize => block_size,
    };
    let values = blocks.iter().map(extractor).collect::<Vec<u64>>();
    let value = match calc_type {
        CalcType::Median => median(values).map(|v| vec![v]),
        CalcType::Mean => mean(values).map(|v| vec![v]),
        CalcType::Quantile => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::unimplemented("Quantile has not been implemented"),
            ));
        },
        CalcType::Quartile => {
            return Err(obscure_error_if_true(
                report_error_flag,
                Status::unimplemented("Quartile has not been implemented"),
            ));
        },
    }
    .unwrap_or_default();
    trace!(
        target: LOG_TARGET,
        "Sending {method_name} response to client: {value:?}"
    );
    Ok(Response::new(tari_rpc::BlockGroupResponse {
        value,
        calc_type: calc_type_response,
    }))
}