use crate::{
output_manager_service::{handle::OutputManagerHandle, TxId},
transaction_service::{
config::TransactionServiceConfig,
error::{TransactionServiceError, TransactionServiceProtocolError},
handle::{TransactionEvent, TransactionEventSender, TransactionServiceRequest, TransactionServiceResponse},
protocols::{
transaction_broadcast_protocol::TransactionBroadcastProtocol,
transaction_coinbase_monitoring_protocol::TransactionCoinbaseMonitoringProtocol,
transaction_receive_protocol::{TransactionReceiveProtocol, TransactionReceiveProtocolStage},
transaction_send_protocol::{TransactionSendProtocol, TransactionSendProtocolStage},
},
storage::{
database::{TransactionBackend, TransactionDatabase},
models::{CompletedTransaction, TransactionDirection, TransactionStatus},
},
tasks::{
send_finalized_transaction::send_finalized_transaction_message,
send_transaction_cancelled::send_transaction_cancelled_message,
send_transaction_reply::send_transaction_reply,
},
},
};
use chrono::{NaiveDateTime, Utc};
use futures::{
channel::{mpsc, mpsc::Sender, oneshot},
pin_mut,
stream::FuturesUnordered,
SinkExt,
Stream,
StreamExt,
};
use log::*;
use rand::{rngs::OsRng, RngCore};
use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration};
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeIdentity, types::CommsPublicKey};
use tari_comms_dht::outbound::OutboundMessageRequester;
#[cfg(feature = "test_harness")]
use tari_core::transactions::{tari_amount::uT, types::BlindingFactor};
use tari_core::{
proto::base_node as base_node_proto,
transactions::{
tari_amount::MicroTari,
transaction::Transaction,
transaction_protocol::{proto, recipient::RecipientSignedMessage, sender::TransactionSenderMessage},
types::{CryptoFactories, PrivateKey},
},
};
use tari_p2p::domain_message::DomainMessage;
use tari_service_framework::{reply_channel, reply_channel::Receiver};
use tari_shutdown::ShutdownSignal;
use tokio::{sync::broadcast, task::JoinHandle};
/// Log target passed to the `log` macros used in this file.
const LOG_TARGET: &str = "wallet::transaction_service::service";
/// State of the wallet transaction service.
///
/// Holds the incoming message streams consumed by `start`, handles to the collaborating services, and the
/// per-transaction channels used to route incoming messages and cancellation signals to spawned protocol tasks.
/// Each stream is stored in an `Option` so that `start` can `take()` ownership of it.
pub struct TransactionService<
TTxStream,
TTxReplyStream,
TTxFinalizedStream,
BNResponseStream,
TBackend,
TTxCancelledStream,
> where TBackend: TransactionBackend + 'static
{
/// Service configuration; a clone is also stored inside `resources`.
config: TransactionServiceConfig,
/// Handle to the transaction database.
db: TransactionDatabase<TBackend>,
/// Handle to the output manager service.
output_manager_service: OutputManagerHandle,
/// Incoming `TransactionSenderMessage`s.
transaction_stream: Option<TTxStream>,
/// Incoming `RecipientSignedMessage`s (transaction replies).
transaction_reply_stream: Option<TTxReplyStream>,
/// Incoming `TransactionFinalizedMessage`s.
transaction_finalized_stream: Option<TTxFinalizedStream>,
/// Incoming `BaseNodeServiceResponse`s.
base_node_response_stream: Option<BNResponseStream>,
/// Incoming `TransactionCancelledMessage`s.
transaction_cancelled_stream: Option<TTxCancelledStream>,
/// Incoming API requests, each paired with a reply channel.
request_stream: Option<
reply_channel::Receiver<TransactionServiceRequest, Result<TransactionServiceResponse, TransactionServiceError>>,
>,
/// Publisher for `TransactionEvent`s emitted by the service.
event_publisher: TransactionEventSender,
/// This node's identity; its node id is included in log messages.
node_identity: Arc<NodeIdentity>,
/// Initialised to `None` by `new`.
/// NOTE(review): presumably populated by `set_base_node_public_key` — confirm, the setter is not in view.
base_node_public_key: Option<CommsPublicKey>,
/// Bundle of shared handles that is cloned into every spawned protocol task.
resources: TransactionServiceResources<TBackend>,
/// Per-TxId channel on which `accept_recipient_reply` forwards replies to the running send protocol.
pending_transaction_reply_senders: HashMap<TxId, Sender<(CommsPublicKey, RecipientSignedMessage)>>,
/// NOTE(review): usage is not visible in this part of the file; presumably routes base node responses to the
/// protocol task awaiting them — confirm what the `u64` key represents.
base_node_response_senders: HashMap<u64, (TxId, Sender<base_node_proto::BaseNodeServiceResponse>)>,
/// Per-TxId one-shot channel that `cancel_transaction` fires to signal a running send protocol.
send_transaction_cancellation_senders: HashMap<u64, oneshot::Sender<()>>,
/// Per-TxId channel on which `accept_finalized_transaction` forwards the finalized transaction to the running
/// receive protocol.
finalized_transaction_senders: HashMap<u64, Sender<(CommsPublicKey, TxId, Transaction)>>,
/// Per-TxId one-shot channel that `cancel_transaction` fires to signal a running receive protocol.
receiver_transaction_cancellation_senders: HashMap<u64, oneshot::Sender<()>>,
/// Broadcast sender of `Duration` values, created with capacity 20 in `new`.
/// NOTE(review): presumably announces timeout changes to running protocols — confirm.
timeout_update_publisher: broadcast::Sender<Duration>,
/// Broadcast sender of base node public keys, created with capacity 20 in `new`.
/// NOTE(review): presumably announces base node changes to running protocols — confirm.
base_node_update_publisher: broadcast::Sender<CommsPublicKey>,
/// Current power mode; starts as `PowerMode::Normal`.
power_mode: PowerMode,
}
#[allow(clippy::too_many_arguments)]
impl<TTxStream, TTxReplyStream, TTxFinalizedStream, BNResponseStream, TBackend, TTxCancelledStream>
TransactionService<TTxStream, TTxReplyStream, TTxFinalizedStream, BNResponseStream, TBackend, TTxCancelledStream>
where
TTxStream: Stream<Item = DomainMessage<proto::TransactionSenderMessage>>,
TTxReplyStream: Stream<Item = DomainMessage<proto::RecipientSignedMessage>>,
TTxFinalizedStream: Stream<Item = DomainMessage<proto::TransactionFinalizedMessage>>,
BNResponseStream: Stream<Item = DomainMessage<base_node_proto::BaseNodeServiceResponse>>,
TTxCancelledStream: Stream<Item = DomainMessage<proto::TransactionCancelledMessage>>,
TBackend: TransactionBackend + 'static,
{
pub fn new(
config: TransactionServiceConfig,
db: TransactionDatabase<TBackend>,
request_stream: Receiver<
TransactionServiceRequest,
Result<TransactionServiceResponse, TransactionServiceError>,
>,
transaction_stream: TTxStream,
transaction_reply_stream: TTxReplyStream,
transaction_finalized_stream: TTxFinalizedStream,
base_node_response_stream: BNResponseStream,
transaction_cancelled_stream: TTxCancelledStream,
output_manager_service: OutputManagerHandle,
outbound_message_service: OutboundMessageRequester,
connectivity_manager: ConnectivityRequester,
event_publisher: TransactionEventSender,
node_identity: Arc<NodeIdentity>,
factories: CryptoFactories,
shutdown_signal: ShutdownSignal,
) -> Self
{
let resources = TransactionServiceResources {
db: db.clone(),
output_manager_service: output_manager_service.clone(),
outbound_message_service,
connectivity_manager,
event_publisher: event_publisher.clone(),
node_identity: node_identity.clone(),
factories,
config: config.clone(),
shutdown_signal,
};
let (timeout_update_publisher, _) = broadcast::channel(20);
let (base_node_update_publisher, _) = broadcast::channel(20);
TransactionService {
config,
db,
output_manager_service,
transaction_stream: Some(transaction_stream),
transaction_reply_stream: Some(transaction_reply_stream),
transaction_finalized_stream: Some(transaction_finalized_stream),
base_node_response_stream: Some(base_node_response_stream),
transaction_cancelled_stream: Some(transaction_cancelled_stream),
request_stream: Some(request_stream),
event_publisher,
node_identity,
base_node_public_key: None,
resources,
pending_transaction_reply_senders: HashMap::new(),
base_node_response_senders: HashMap::new(),
send_transaction_cancellation_senders: HashMap::new(),
finalized_transaction_senders: HashMap::new(),
receiver_transaction_cancellation_senders: HashMap::new(),
timeout_update_publisher,
base_node_update_publisher,
power_mode: PowerMode::Normal,
}
}
/// Runs the service event loop, consuming the service.
///
/// Each iteration waits on whichever of the following is ready first:
/// - an API request on the request stream (answered through its reply channel),
/// - a message on one of the five incoming message streams,
/// - the completion of a spawned send / receive / broadcast / coinbase-monitoring protocol task,
/// - the shutdown signal.
///
/// Errors raised while handling a single message are logged (and for some message types published as a
/// `TransactionEvent::Error`) but never terminate the loop.
///
/// # Returns
/// `Ok(())` once the shutdown signal fires or every selected source has completed.
///
/// # Panics
/// Panics if any of the streams has already been taken out of the service.
#[warn(unreachable_code)]
pub async fn start(mut self) -> Result<(), TransactionServiceError> {
    let request_stream = self
        .request_stream
        .take()
        .expect("Transaction Service initialized without request_stream")
        .fuse();
    pin_mut!(request_stream);
    let transaction_stream = self
        .transaction_stream
        .take()
        .expect("Transaction Service initialized without transaction_stream")
        .fuse();
    pin_mut!(transaction_stream);
    let transaction_reply_stream = self
        .transaction_reply_stream
        .take()
        .expect("Transaction Service initialized without transaction_reply_stream")
        .fuse();
    pin_mut!(transaction_reply_stream);
    let transaction_finalized_stream = self
        .transaction_finalized_stream
        .take()
        .expect("Transaction Service initialized without transaction_finalized_stream")
        .fuse();
    pin_mut!(transaction_finalized_stream);
    let base_node_response_stream = self
        .base_node_response_stream
        .take()
        .expect("Transaction Service initialized without base_node_response_stream")
        .fuse();
    pin_mut!(base_node_response_stream);
    let transaction_cancelled_stream = self
        .transaction_cancelled_stream
        .take()
        .expect("Transaction Service initialized without transaction_cancelled_stream")
        .fuse();
    pin_mut!(transaction_cancelled_stream);

    let mut shutdown = self.resources.shutdown_signal.clone();

    // Join handles of the spawned protocol tasks, one collection per protocol kind.
    let mut send_transaction_protocol_handles: FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    > = FuturesUnordered::new();
    let mut receive_transaction_protocol_handles: FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    > = FuturesUnordered::new();
    let mut transaction_broadcast_protocol_handles: FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    > = FuturesUnordered::new();
    let mut coinbase_transaction_monitoring_protocol_handles: FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    > = FuturesUnordered::new();

    info!(target: LOG_TARGET, "Transaction Service started");
    loop {
        futures::select! {
            // Incoming requests from the service API
            request_context = request_stream.select_next_some() => {
                trace!(target: LOG_TARGET, "Handling Service API Request");
                let (request, reply_tx) = request_context.split();
                let response = self.handle_request(request,
                    &mut send_transaction_protocol_handles,
                    &mut receive_transaction_protocol_handles,
                    &mut transaction_broadcast_protocol_handles,
                    &mut coinbase_transaction_monitoring_protocol_handles).await.map_err(|e| {
                    warn!(target: LOG_TARGET, "Error handling request: {:?}", e);
                    e
                });
                let _ = reply_tx.send(response).map_err(|e| {
                    warn!(target: LOG_TARGET, "Failed to send reply");
                    e
                });
            },
            // Incoming Transaction messages from the comms layer
            msg = transaction_stream.select_next_some() => {
                let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
                trace!(target: LOG_TARGET, "Handling Transaction Message, Trace: {}", msg.dht_header.message_tag);
                let result = self.accept_transaction(origin_public_key, inner_msg,
                    msg.dht_header.message_tag.as_value(), &mut receive_transaction_protocol_handles).await;
                match result {
                    Err(TransactionServiceError::RepeatedMessageError) => {
                        trace!(target: LOG_TARGET, "A repeated Transaction message was received, Trace: {}",
                            msg.dht_header.message_tag);
                    }
                    Err(e) => {
                        warn!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}, Trace: {}",
                            e, self.node_identity.node_id().short_str(), msg.dht_header.message_tag);
                        let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error(format!("Error handling \
                            Transaction Sender message: {:?}", e))));
                    }
                    _ => (),
                }
            },
            // Incoming Transaction Reply messages from the comms layer
            msg = transaction_reply_stream.select_next_some() => {
                let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
                trace!(target: LOG_TARGET, "Handling Transaction Reply Message, Trace: {}", msg.dht_header.message_tag);
                let result = self.accept_recipient_reply(origin_public_key, inner_msg).await;
                match result {
                    Err(TransactionServiceError::TransactionDoesNotExistError) => {
                        trace!(target: LOG_TARGET, "Unable to handle incoming Transaction Reply message from NodeId: \
                            {} due to Transaction not existing. This usually means the message was a repeated message \
                            from Store and Forward, Trace: {}", self.node_identity.node_id().short_str(),
                            msg.dht_header.message_tag);
                    },
                    Err(e) => {
                        warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} \
                            for NodeId: {}, Trace: {}", e, self.node_identity.node_id().short_str(),
                            msg.dht_header.message_tag);
                        let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling \
                            Transaction Recipient Reply message".to_string())));
                    },
                    Ok(_) => (),
                }
            },
            // Incoming Finalized Transaction messages from the comms layer
            msg = transaction_finalized_stream.select_next_some() => {
                let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
                trace!(target: LOG_TARGET, "Handling Transaction Finalized Message, Trace: {}",
                    msg.dht_header.message_tag.as_value());
                let result = self.accept_finalized_transaction(origin_public_key, inner_msg, ).await;
                match result {
                    Err(TransactionServiceError::TransactionDoesNotExistError) => {
                        trace!(target: LOG_TARGET, "Unable to handle incoming Finalized Transaction message from NodeId: \
                            {} due to Transaction not existing. This usually means the message was a repeated message \
                            from Store and Forward, Trace: {}", self.node_identity.node_id().short_str(),
                            msg.dht_header.message_tag);
                    },
                    Err(e) => {
                        warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} \
                            for NodeID: {}, Trace: {}", e , self.node_identity.node_id().short_str(),
                            msg.dht_header.message_tag.as_value());
                        let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling Transaction \
                            Finalized message".to_string(),)));
                    },
                    Ok(_) => ()
                }
            },
            // Incoming base node responses from the comms layer
            msg = base_node_response_stream.select_next_some() => {
                let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
                trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: {}", msg.dht_header.message_tag);
                if let Err(e) = self.handle_base_node_response(inner_msg).await {
                    warn!(target: LOG_TARGET, "Error handling base node service response from {}: {:?} for \
                        NodeID: {}, Trace: {}", origin_public_key, e, self.node_identity.node_id().short_str(),
                        msg.dht_header.message_tag.as_value());
                }
            }
            // Incoming Transaction Cancelled messages from the comms layer
            msg = transaction_cancelled_stream.select_next_some() => {
                let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
                trace!(target: LOG_TARGET, "Handling Transaction Cancelled message, Trace: {}", msg.dht_header.message_tag);
                if let Err(e) = self.handle_transaction_cancelled_message(origin_public_key, inner_msg, ).await {
                    warn!(target: LOG_TARGET, "Error handling Transaction Cancelled Message: {:?}", e);
                }
            }
            // A spawned send protocol task has finished
            join_result = send_transaction_protocol_handles.select_next_some() => {
                trace!(target: LOG_TARGET, "Send Protocol for Transaction has ended with result {:?}", join_result);
                match join_result {
                    Ok(join_result_inner) => self.complete_send_transaction_protocol(join_result_inner,
                        &mut transaction_broadcast_protocol_handles).await,
                    Err(e) => error!(target: LOG_TARGET, "Error resolving Send Transaction Protocol: {:?}", e),
                };
            }
            // A spawned receive protocol task has finished
            join_result = receive_transaction_protocol_handles.select_next_some() => {
                trace!(target: LOG_TARGET, "Receive Transaction Protocol has ended with result {:?}", join_result);
                match join_result {
                    Ok(join_result_inner) => self.complete_receive_transaction_protocol(join_result_inner,
                        &mut transaction_broadcast_protocol_handles).await,
                    // This used to be logged as a *Send* protocol failure (copy-paste from the arm above).
                    Err(e) => error!(target: LOG_TARGET, "Error resolving Receive Transaction Protocol: {:?}", e),
                };
            }
            // A spawned broadcast protocol task has finished
            join_result = transaction_broadcast_protocol_handles.select_next_some() => {
                trace!(target: LOG_TARGET, "Transaction Broadcast protocol has ended with result {:?}", join_result);
                match join_result {
                    Ok(join_result_inner) => self.complete_transaction_broadcast_protocol(join_result_inner).await,
                    Err(e) => error!(target: LOG_TARGET, "Error resolving Broadcast Protocol: {:?}", e),
                };
            }
            // A spawned coinbase monitoring protocol task has finished
            join_result = coinbase_transaction_monitoring_protocol_handles.select_next_some() => {
                trace!(target: LOG_TARGET, "Coinbase transaction monitoring protocol has ended with result {:?}",
                    join_result);
                match join_result {
                    Ok(join_result_inner) => self.complete_coinbase_transaction_monitoring_protocol(join_result_inner),
                    Err(e) => error!(target: LOG_TARGET, "Error resolving Coinbase Monitoring protocol: {:?}", e),
                };
            }
            _ = shutdown => {
                info!(target: LOG_TARGET, "Transaction service shutting down because it received the shutdown signal");
                break;
            }
            // Every stream and future above has terminated
            complete => {
                info!(target: LOG_TARGET, "Transaction service shutting down");
                break;
            }
        }
    }
    info!(target: LOG_TARGET, "Transaction service shut down");
    Ok(())
}
/// Dispatches a single API request to the method that implements it and wraps the outcome in the matching
/// `TransactionServiceResponse` variant.
///
/// The four join-handle collections are passed through to the handlers that spawn protocol tasks so that
/// the event loop in `start` can await their completion.
///
/// # Errors
/// Returns whatever `TransactionServiceError` the invoked handler produces.
async fn handle_request(
    &mut self,
    request: TransactionServiceRequest,
    send_transaction_join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
    receive_transaction_join_handles: &mut FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    >,
    transaction_broadcast_join_handles: &mut FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    >,
    coinbase_monitoring_join_handles: &mut FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    >,
) -> Result<TransactionServiceResponse, TransactionServiceError>
{
    trace!(target: LOG_TARGET, "Handling Service Request: {}", request);
    match request {
        TransactionServiceRequest::SendTransaction((dest_pubkey, amount, fee_per_gram, message)) => {
            let tx_id = self
                .send_transaction(
                    dest_pubkey,
                    amount,
                    fee_per_gram,
                    message,
                    send_transaction_join_handles,
                )
                .await?;
            Ok(TransactionServiceResponse::TransactionSent(tx_id))
        },
        TransactionServiceRequest::CancelTransaction(tx_id) => {
            self.cancel_transaction(tx_id).await?;
            Ok(TransactionServiceResponse::TransactionCancelled)
        },
        TransactionServiceRequest::GetPendingInboundTransactions => {
            let txs = self.db.get_pending_inbound_transactions().await?;
            Ok(TransactionServiceResponse::PendingInboundTransactions(txs))
        },
        TransactionServiceRequest::GetPendingOutboundTransactions => {
            let txs = self.db.get_pending_outbound_transactions().await?;
            Ok(TransactionServiceResponse::PendingOutboundTransactions(txs))
        },
        TransactionServiceRequest::GetCompletedTransactions => {
            let txs = self.db.get_completed_transactions().await?;
            Ok(TransactionServiceResponse::CompletedTransactions(txs))
        },
        TransactionServiceRequest::GetCancelledPendingInboundTransactions => {
            let txs = self.db.get_cancelled_pending_inbound_transactions().await?;
            Ok(TransactionServiceResponse::PendingInboundTransactions(txs))
        },
        TransactionServiceRequest::GetCancelledPendingOutboundTransactions => {
            let txs = self.db.get_cancelled_pending_outbound_transactions().await?;
            Ok(TransactionServiceResponse::PendingOutboundTransactions(txs))
        },
        TransactionServiceRequest::GetCancelledCompletedTransactions => {
            let txs = self.db.get_cancelled_completed_transactions().await?;
            Ok(TransactionServiceResponse::CompletedTransactions(txs))
        },
        TransactionServiceRequest::GetCompletedTransaction(tx_id) => {
            let tx = self.db.get_completed_transaction(tx_id).await?;
            Ok(TransactionServiceResponse::CompletedTransaction(Box::new(tx)))
        },
        TransactionServiceRequest::GetAnyTransaction(tx_id) => {
            let tx = self.db.get_any_transaction(tx_id).await?;
            Ok(TransactionServiceResponse::AnyTransaction(Box::new(tx)))
        },
        TransactionServiceRequest::SetBaseNodePublicKey(public_key) => {
            self.set_base_node_public_key(public_key);
            Ok(TransactionServiceResponse::BaseNodePublicKeySet)
        },
        TransactionServiceRequest::ImportUtxo(value, source_public_key, message) => {
            let tx_id = self
                .add_utxo_import_transaction(value, source_public_key, message)
                .await?;
            Ok(TransactionServiceResponse::UtxoImported(tx_id))
        },
        TransactionServiceRequest::SubmitTransaction((tx_id, tx, fee, amount, message)) => {
            self.submit_transaction(transaction_broadcast_join_handles, tx_id, tx, fee, amount, message)
                .await?;
            Ok(TransactionServiceResponse::TransactionSubmitted)
        },
        TransactionServiceRequest::GenerateCoinbaseTransaction(reward, fees, block_height) => {
            let coinbase_tx = self
                .generate_coinbase_transaction(reward, fees, block_height, coinbase_monitoring_join_handles)
                .await?;
            Ok(TransactionServiceResponse::CoinbaseTransactionGenerated(Box::new(
                coinbase_tx,
            )))
        },
        // The following requests only exist when the test harness is compiled in.
        #[cfg(feature = "test_harness")]
        TransactionServiceRequest::CompletePendingOutboundTransaction(completed_transaction) => {
            self.complete_pending_outbound_transaction(completed_transaction)
                .await?;
            Ok(TransactionServiceResponse::CompletedPendingTransaction)
        },
        #[cfg(feature = "test_harness")]
        TransactionServiceRequest::FinalizePendingInboundTransaction(tx_id) => {
            self.finalize_received_test_transaction(tx_id).await?;
            Ok(TransactionServiceResponse::FinalizedPendingInboundTransaction)
        },
        #[cfg(feature = "test_harness")]
        TransactionServiceRequest::AcceptTestTransaction((tx_id, amount, source_pubkey)) => {
            self.receive_test_transaction(tx_id, amount, source_pubkey).await?;
            Ok(TransactionServiceResponse::AcceptedTestTransaction)
        },
        #[cfg(feature = "test_harness")]
        TransactionServiceRequest::BroadcastTransaction(tx_id) => {
            self.broadcast_transaction(tx_id).await?;
            Ok(TransactionServiceResponse::TransactionBroadcast)
        },
        #[cfg(feature = "test_harness")]
        TransactionServiceRequest::MineTransaction(tx_id) => {
            self.mine_transaction(tx_id).await?;
            Ok(TransactionServiceResponse::TransactionMined)
        },
        TransactionServiceRequest::SetLowPowerMode => {
            self.set_power_mode(PowerMode::Low).await?;
            Ok(TransactionServiceResponse::LowPowerModeSet)
        },
        TransactionServiceRequest::SetNormalPowerMode => {
            self.set_power_mode(PowerMode::Normal).await?;
            Ok(TransactionServiceResponse::NormalPowerModeSet)
        },
        TransactionServiceRequest::ApplyEncryption(cipher) => {
            self.db
                .apply_encryption(*cipher)
                .await
                .map_err(TransactionServiceError::TransactionStorageError)?;
            Ok(TransactionServiceResponse::EncryptionApplied)
        },
        TransactionServiceRequest::RemoveEncryption => {
            self.db
                .remove_encryption()
                .await
                .map_err(TransactionServiceError::TransactionStorageError)?;
            Ok(TransactionServiceResponse::EncryptionRemoved)
        },
        TransactionServiceRequest::RestartTransactionProtocols => {
            self.restart_transaction_negotiation_protocols(
                send_transaction_join_handles,
                receive_transaction_join_handles,
            )
            .await?;
            Ok(TransactionServiceResponse::ProtocolsRestarted)
        },
        TransactionServiceRequest::RestartBroadcastProtocols => {
            self.restart_broadcast_protocols(transaction_broadcast_join_handles, coinbase_monitoring_join_handles)
                .await?;
            Ok(TransactionServiceResponse::ProtocolsRestarted)
        },
    }
}
/// Starts sending `amount` to `dest_pubkey`.
///
/// Asks the output manager to prepare the sender side of the transaction, registers a reply channel and a
/// cancellation channel under the new transaction id, then spawns a `TransactionSendProtocol` task in its
/// `Initial` stage and pushes its join handle onto `join_handles`.
///
/// # Returns
/// The id of the newly created transaction.
///
/// # Errors
/// Fails if the output manager cannot prepare the transaction or the transaction id cannot be read from the
/// prepared sender protocol.
pub async fn send_transaction(
    &mut self,
    dest_pubkey: CommsPublicKey,
    amount: MicroTari,
    fee_per_gram: MicroTari,
    message: String,
    join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<TxId, TransactionServiceError>
{
    let sender_protocol = self
        .output_manager_service
        .prepare_transaction_to_send(amount, fee_per_gram, None, message.clone())
        .await?;
    let tx_id = sender_protocol.get_tx_id()?;

    // Channel over which recipient replies for this transaction reach the protocol task.
    let (reply_tx, reply_rx) = mpsc::channel(100);
    // One-shot used to tell the protocol task that the transaction was cancelled.
    let (cancel_tx, cancel_rx) = oneshot::channel();
    self.pending_transaction_reply_senders.insert(tx_id, reply_tx);
    self.send_transaction_cancellation_senders.insert(tx_id, cancel_tx);

    let send_protocol = TransactionSendProtocol::new(
        tx_id,
        self.resources.clone(),
        reply_rx,
        cancel_rx,
        dest_pubkey,
        amount,
        message,
        sender_protocol,
        TransactionSendProtocolStage::Initial,
    );
    join_handles.push(tokio::spawn(send_protocol.execute()));

    Ok(tx_id)
}
/// Handles a Transaction Reply received from `source_pubkey`.
///
/// Three cases are distinguished by looking up the reply's transaction id:
/// 1. A completed transaction (cancelled or not) exists: the reply is a repeat. Unless the resend cooldown
///    is still running, a Transaction Cancelled message (if cancelled) or the finalized transaction (if not)
///    is sent again and the send count is incremented.
/// 2. A cancelled pending outbound transaction exists: again a repeat; a Transaction Cancelled message is
///    sent unless the cooldown is still running, and the send count is incremented.
/// 3. Otherwise the reply is forwarded to the send protocol task registered for the transaction id.
///
/// # Errors
/// - `InvalidMessageError` if the protobuf message cannot be converted.
/// - `InvalidSourcePublicKey` if a stored transaction exists but its destination differs from `source_pubkey`.
/// - `TransactionDoesNotExistError` if no stored transaction and no running send protocol matches the id.
/// - `ProtocolChannelError` if the reply cannot be forwarded to the protocol task.
pub async fn accept_recipient_reply(
    &mut self,
    source_pubkey: CommsPublicKey,
    recipient_reply: proto::RecipientSignedMessage,
) -> Result<(), TransactionServiceError>
{
    let recipient_reply: RecipientSignedMessage = recipient_reply
        .try_into()
        .map_err(TransactionServiceError::InvalidMessageError)?;
    let tx_id = recipient_reply.tx_id;

    let cancelled_outbound_tx = self.db.get_cancelled_pending_outbound_transaction(tx_id).await;
    let completed_tx = self.db.get_completed_transaction_cancelled_or_not(tx_id).await;

    // Returns `false` while the resend cooldown is still running for the given last-send timestamp.
    // A missing timestamp, or one for which the elapsed time cannot be computed, counts as "cooldown over".
    let check_cooldown = |timestamp: Option<NaiveDateTime>| {
        if let Some(t) = timestamp {
            if let Ok(elapsed_time) = Utc::now().naive_utc().signed_duration_since(t).to_std() {
                if elapsed_time < self.resources.config.resend_response_cooldown {
                    trace!(
                        target: LOG_TARGET,
                        "A repeated Transaction Reply (TxId: {}) has been received before the resend cooldown has \
                         expired. Ignoring.",
                        tx_id
                    );
                    return false;
                }
            }
        }
        true
    };

    if let Ok(ctx) = completed_tx {
        // Only the original recipient may trigger a resend.
        if ctx.destination_public_key != source_pubkey {
            return Err(TransactionServiceError::InvalidSourcePublicKey);
        }
        if !check_cooldown(ctx.last_send_timestamp) {
            return Ok(());
        }
        // This path always returns, so `source_pubkey` can be moved into the spawned task.
        if ctx.cancelled {
            debug!(
                target: LOG_TARGET,
                "A repeated Transaction Reply (TxId: {}) has been received for cancelled completed transaction. \
                 Transaction Cancelled response is being sent.",
                tx_id
            );
            tokio::spawn(send_transaction_cancelled_message(
                tx_id,
                source_pubkey,
                self.resources.outbound_message_service.clone(),
            ));
        } else {
            debug!(
                target: LOG_TARGET,
                "A repeated Transaction Reply (TxId: {}) has been received. Reply is being resent.", tx_id
            );
            tokio::spawn(send_finalized_transaction_message(
                tx_id,
                ctx.transaction,
                source_pubkey,
                self.resources.outbound_message_service.clone(),
                self.resources.config.direct_send_timeout,
            ));
        }
        if let Err(e) = self.resources.db.increment_send_count(tx_id).await {
            warn!(
                target: LOG_TARGET,
                "Could not increment send count for completed transaction TxId {}: {:?}", tx_id, e
            );
        }
        return Ok(());
    }

    if let Ok(otx) = cancelled_outbound_tx {
        if otx.destination_public_key != source_pubkey {
            return Err(TransactionServiceError::InvalidSourcePublicKey);
        }
        if !check_cooldown(otx.last_send_timestamp) {
            return Ok(());
        }
        debug!(
            target: LOG_TARGET,
            "A repeated Transaction Reply (TxId: {}) has been received for cancelled pending outbound \
             transaction. Transaction Cancelled response is being sent.",
            tx_id
        );
        tokio::spawn(send_transaction_cancelled_message(
            tx_id,
            source_pubkey,
            self.resources.outbound_message_service.clone(),
        ));
        if let Err(e) = self.resources.db.increment_send_count(tx_id).await {
            // Name the right kind of transaction; this used to claim it was a completed one.
            warn!(
                target: LOG_TARGET,
                "Could not increment send count for cancelled pending outbound transaction TxId {}: {:?}",
                tx_id,
                e
            );
        }
        return Ok(());
    }

    // Not a repeat: hand the reply to the send protocol waiting for it.
    let sender = match self.pending_transaction_reply_senders.get_mut(&tx_id) {
        None => return Err(TransactionServiceError::TransactionDoesNotExistError),
        Some(s) => s,
    };
    sender
        .send((source_pubkey, recipient_reply))
        .await
        .map_err(|_| TransactionServiceError::ProtocolChannelError)?;
    Ok(())
}
async fn complete_send_transaction_protocol(
&mut self,
join_result: Result<u64, TransactionServiceProtocolError>,
transaction_broadcast_join_handles: &mut FuturesUnordered<
JoinHandle<Result<u64, TransactionServiceProtocolError>>,
>,
)
{
match join_result {
Ok(id) => {
let _ = self.pending_transaction_reply_senders.remove(&id);
let _ = self.send_transaction_cancellation_senders.remove(&id);
let _ = self
.broadcast_completed_transaction(id, transaction_broadcast_join_handles)
.await
.map_err(|resp| {
error!(
target: LOG_TARGET,
"Error starting Broadcast Protocol after completed Send Transaction Protocol : {:?}", resp
);
resp
});
trace!(
target: LOG_TARGET,
"Send Transaction Protocol for TxId: {} completed successfully",
id
);
},
Err(TransactionServiceProtocolError { id, error }) => {
let _ = self.pending_transaction_reply_senders.remove(&id);
let _ = self.send_transaction_cancellation_senders.remove(&id);
if let TransactionServiceError::Shutdown = error {
return;
}
warn!(
target: LOG_TARGET,
"Error completing Send Transaction Protocol (Id: {}): {:?}", id, error
);
let _ = self
.event_publisher
.send(Arc::new(TransactionEvent::Error(format!("{:?}", error))));
},
}
}
/// Cancels the pending transaction `tx_id`.
///
/// The transaction is marked cancelled in the database and in the output manager, any running send or
/// receive protocol for it is signalled through its cancellation channel, the associated message senders are
/// dropped, and a `TransactionEvent::TransactionCancelled` event is published.
///
/// # Errors
/// Fails if the database or the output manager cannot cancel the transaction; in that case no protocol is
/// signalled and no event is published.
async fn cancel_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionServiceError> {
    if let Err(e) = self.db.cancel_pending_transaction(tx_id).await {
        warn!(
            target: LOG_TARGET,
            "Pending Transaction does not exist and could not be cancelled: {:?}", e
        );
        return Err(e.into());
    }
    self.output_manager_service.cancel_transaction(tx_id).await?;

    // Stop a send protocol, if one is running for this transaction.
    if let Some(send_cancel) = self.send_transaction_cancellation_senders.remove(&tx_id) {
        let _ = send_cancel.send(());
    }
    self.pending_transaction_reply_senders.remove(&tx_id);

    // Stop a receive protocol, if one is running for this transaction.
    if let Some(receive_cancel) = self.receiver_transaction_cancellation_senders.remove(&tx_id) {
        let _ = receive_cancel.send(());
    }
    self.finalized_transaction_senders.remove(&tx_id);

    let cancelled_event = Arc::new(TransactionEvent::TransactionCancelled(tx_id));
    if let Err(e) = self.event_publisher.send(cancelled_event) {
        trace!(
            target: LOG_TARGET,
            "Error sending event because there are no subscribers: {:?}",
            e
        );
    }

    info!(target: LOG_TARGET, "Pending Transaction (TxId: {}) cancelled", tx_id);
    Ok(())
}
/// Handles a Transaction Cancelled message received from `source_pubkey`.
///
/// The message is acted upon only when a pending inbound transaction with the given id exists and was
/// originally sent by `source_pubkey`; in every other case it is ignored and `Ok(())` is returned.
///
/// # Errors
/// Propagates any error from `cancel_transaction`.
pub async fn handle_transaction_cancelled_message(
    &mut self,
    source_pubkey: CommsPublicKey,
    transaction_cancelled: proto::TransactionCancelledMessage,
) -> Result<(), TransactionServiceError>
{
    let tx_id = transaction_cancelled.tx_id;

    // Nothing to cancel if no such pending inbound transaction is known.
    let inbound_tx = match self.db.get_pending_inbound_transaction(tx_id).await {
        Ok(tx) => tx,
        Err(_) => return Ok(()),
    };

    // Only the original sender is allowed to cancel the transaction.
    if inbound_tx.source_public_key != source_pubkey {
        trace!(
            target: LOG_TARGET,
            "Received a Transaction Cancelled (TxId: {}) message from an unknown source, ignoring",
            tx_id
        );
        return Ok(());
    }

    self.cancel_transaction(tx_id).await
}
/// Spawns a send protocol, in its `WaitForReply` stage, for every pending outbound transaction in the
/// database that does not already have a reply sender registered.
///
/// For each such transaction a fresh reply channel and cancellation channel are registered under its id
/// and the join handle of the spawned task is pushed onto `join_handles`.
///
/// # Errors
/// Fails if the pending outbound transactions cannot be read from the database.
#[allow(clippy::map_entry)]
async fn restart_all_send_transaction_protocols(
    &mut self,
    join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError>
{
    let pending_outbound = self.db.get_pending_outbound_transactions().await?;
    for (tx_id, outbound_tx) in pending_outbound {
        // A registered reply sender means a protocol is already running for this transaction.
        if self.pending_transaction_reply_senders.contains_key(&tx_id) {
            continue;
        }
        debug!(
            target: LOG_TARGET,
            "Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx_id
        );

        let (reply_tx, reply_rx) = mpsc::channel(100);
        let (cancel_tx, cancel_rx) = oneshot::channel();
        self.pending_transaction_reply_senders.insert(tx_id, reply_tx);
        self.send_transaction_cancellation_senders.insert(tx_id, cancel_tx);

        let send_protocol = TransactionSendProtocol::new(
            tx_id,
            self.resources.clone(),
            reply_rx,
            cancel_rx,
            outbound_tx.destination_public_key,
            outbound_tx.amount,
            outbound_tx.message,
            outbound_tx.sender_protocol,
            TransactionSendProtocolStage::WaitForReply,
        );
        join_handles.push(tokio::spawn(send_protocol.execute()));
    }
    Ok(())
}
/// Handles a Transaction message received from `source_pubkey`.
///
/// Only `TransactionSenderMessage::Single` messages are accepted. Three cases are distinguished:
/// 1. A pending inbound transaction with the same id already exists: the message is a repeat. Unless the
///    resend cooldown is still running, the stored reply is sent again and the send count is incremented.
/// 2. A receive protocol is already running for the id: `RepeatedMessageError` is returned.
/// 3. Otherwise a finalized-transaction channel and a cancellation channel are registered under the id and
///    a `TransactionReceiveProtocol` task is spawned in its `Initial` stage, with its join handle pushed
///    onto `join_handles`.
///
/// `traced_message_tag` is only used in log output.
///
/// # Errors
/// - `InvalidMessageError` if the protobuf message cannot be converted.
/// - `InvalidStateError` if the message is not a `Single` sender message.
/// - `InvalidSourcePublicKey` if a stored inbound transaction exists but came from a different public key.
/// - `ConversionError` if the time elapsed since the last send cannot be converted to a `std` duration.
/// - `RepeatedMessageError` if a receive protocol is already running for the transaction id.
pub async fn accept_transaction(
    &mut self,
    source_pubkey: CommsPublicKey,
    sender_message: proto::TransactionSenderMessage,
    traced_message_tag: u64,
    join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError>
{
    let sender_message: TransactionSenderMessage = sender_message
        .try_into()
        .map_err(TransactionServiceError::InvalidMessageError)?;

    // Only the id is needed here, so borrow the message instead of cloning all of it.
    let tx_id = match &sender_message {
        TransactionSenderMessage::Single(data) => data.tx_id,
        _ => return Err(TransactionServiceError::InvalidStateError),
    };
    trace!(
        target: LOG_TARGET,
        "Transaction (TxId: {}) received from {}, Trace: {}",
        tx_id,
        source_pubkey,
        traced_message_tag
    );

    // Case 1: the transaction is already stored, so this is a repeated message.
    if let Ok(inbound_tx) = self.db.get_pending_inbound_transaction(tx_id).await {
        if inbound_tx.source_public_key != source_pubkey {
            return Err(TransactionServiceError::InvalidSourcePublicKey);
        }
        if let Some(timestamp) = inbound_tx.last_send_timestamp {
            let elapsed_time = Utc::now()
                .naive_utc()
                .signed_duration_since(timestamp)
                .to_std()
                .map_err(|_| TransactionServiceError::ConversionError("duration::OutOfRangeError".to_string()))?;
            if elapsed_time < self.resources.config.resend_response_cooldown {
                trace!(
                    target: LOG_TARGET,
                    "A repeated Transaction (TxId: {}) has been received before the resend cooldown has \
                     expired. Ignoring.",
                    inbound_tx.tx_id
                );
                return Ok(());
            }
        }
        debug!(
            target: LOG_TARGET,
            "A repeated Transaction (TxId: {}) has been received. Reply is being resent.", inbound_tx.tx_id
        );
        // Copy the id out before `inbound_tx` is moved into the spawned task.
        let inbound_tx_id = inbound_tx.tx_id;
        tokio::spawn(send_transaction_reply(
            inbound_tx,
            self.resources.outbound_message_service.clone(),
            self.resources.config.direct_send_timeout,
        ));
        if let Err(e) = self.resources.db.increment_send_count(inbound_tx_id).await {
            warn!(
                target: LOG_TARGET,
                "Could not increment send count for inbound transaction TxId {}: {:?}", inbound_tx_id, e
            );
        }
        return Ok(());
    }

    // Case 2: a receive protocol is already running for this transaction.
    if self.finalized_transaction_senders.contains_key(&tx_id) ||
        self.receiver_transaction_cancellation_senders.contains_key(&tx_id)
    {
        // Kept on one line: the original literal was split without a `\` continuation, which put a raw
        // newline into the middle of the log message.
        trace!(
            target: LOG_TARGET,
            "Transaction (TxId: {}) has already been received, this is probably a repeated message, Trace: {}.",
            tx_id,
            traced_message_tag
        );
        return Err(TransactionServiceError::RepeatedMessageError);
    }

    // Case 3: a new transaction; start a receive protocol for it.
    let (tx_finalized_sender, tx_finalized_receiver) = mpsc::channel(100);
    let (cancellation_sender, cancellation_receiver) = oneshot::channel();
    self.finalized_transaction_senders.insert(tx_id, tx_finalized_sender);
    self.receiver_transaction_cancellation_senders
        .insert(tx_id, cancellation_sender);

    let protocol = TransactionReceiveProtocol::new(
        tx_id,
        source_pubkey,
        sender_message,
        TransactionReceiveProtocolStage::Initial,
        self.resources.clone(),
        tx_finalized_receiver,
        cancellation_receiver,
    );
    let join_handle = tokio::spawn(protocol.execute());
    join_handles.push(join_handle);
    Ok(())
}
/// Handles a Transaction Finalized message received from `source_pubkey` by forwarding the contained
/// transaction to the receive protocol task registered for its transaction id.
///
/// # Errors
/// - `InvalidMessageError` if the message has no transaction or the transaction cannot be converted.
/// - `TransactionDoesNotExistError` if no receive protocol is registered for the transaction id.
/// - `ProtocolChannelError` if the transaction cannot be sent to the protocol task.
pub async fn accept_finalized_transaction(
    &mut self,
    source_pubkey: CommsPublicKey,
    finalized_transaction: proto::TransactionFinalizedMessage,
) -> Result<(), TransactionServiceError>
{
    let tx_id = finalized_transaction.tx_id;

    let proto_transaction = finalized_transaction.transaction.ok_or_else(|| {
        TransactionServiceError::InvalidMessageError("Finalized Transaction missing Transaction field".to_string())
    })?;
    let transaction: Transaction = proto_transaction.try_into().map_err(|_| {
        TransactionServiceError::InvalidMessageError(
            "Cannot convert Transaction field from TransactionFinalized message".to_string(),
        )
    })?;

    let protocol_sender = self
        .finalized_transaction_senders
        .get_mut(&tx_id)
        .ok_or(TransactionServiceError::TransactionDoesNotExistError)?;

    protocol_sender
        .send((source_pubkey, tx_id, transaction))
        .await
        .map_err(|_| TransactionServiceError::ProtocolChannelError)
}
async fn complete_receive_transaction_protocol(
&mut self,
join_result: Result<u64, TransactionServiceProtocolError>,
transaction_broadcast_join_handles: &mut FuturesUnordered<
JoinHandle<Result<u64, TransactionServiceProtocolError>>,
>,
)
{
match join_result {
Ok(id) => {
let _ = self.finalized_transaction_senders.remove(&id);
let _ = self.receiver_transaction_cancellation_senders.remove(&id);
let _ = self
.broadcast_completed_transaction(id, transaction_broadcast_join_handles)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Error broadcasting completed transaction TxId: {} to mempool: {:?}", id, e
);
e
});
trace!(
target: LOG_TARGET,
"Receive Transaction Protocol for TxId: {} completed successfully",
id
);
},
Err(TransactionServiceProtocolError { id, error }) => {
let _ = self.finalized_transaction_senders.remove(&id);
let _ = self.receiver_transaction_cancellation_senders.remove(&id);
match error {
TransactionServiceError::RepeatedMessageError => debug!(
target: LOG_TARGET,
"Receive Transaction Protocol (Id: {}) aborted as it is a repeated transaction that has \
already been processed",
id
),
TransactionServiceError::Shutdown => {
return;
},
_ => warn!(
target: LOG_TARGET,
"Error completing Receive Transaction Protocol (Id: {}): {}", id, error
),
}
let _ = self
.event_publisher
.send(Arc::new(TransactionEvent::Error(format!("{:?}", error))));
},
}
}
/// Restarts a Transaction Receive Protocol, in the `WaitForFinalize` stage, for every pending inbound
/// transaction in the database that does not already have a receive protocol registered.
///
/// For each restarted transaction a fresh finalize channel and cancellation channel are created and
/// registered under the TxId, and the spawned task's handle is pushed onto `join_handles`.
///
/// # Errors
/// Returns an error if the pending inbound transactions cannot be read from the database.
async fn restart_all_receive_transaction_protocols(
    &mut self,
    join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError>
{
    let inbound_txs = self.db.get_pending_inbound_transactions().await?;
    for (tx_id, tx) in inbound_txs {
        // A live receive protocol is tracked by the finalize/cancellation sender maps populated
        // below. Checking those (rather than the reply senders used by send protocols) prevents a
        // second restart from overwriting the live channels and spawning a duplicate protocol.
        if self.finalized_transaction_senders.contains_key(&tx_id) ||
            self.receiver_transaction_cancellation_senders.contains_key(&tx_id)
        {
            continue;
        }

        debug!(
            target: LOG_TARGET,
            "Restarting listening for Transaction Finalize for Pending Inbound Transaction TxId: {}", tx_id
        );
        let (tx_finalized_sender, tx_finalized_receiver) = mpsc::channel(100);
        let (cancellation_sender, cancellation_receiver) = oneshot::channel();
        self.finalized_transaction_senders.insert(tx_id, tx_finalized_sender);
        self.receiver_transaction_cancellation_senders
            .insert(tx_id, cancellation_sender);

        let protocol = TransactionReceiveProtocol::new(
            tx_id,
            tx.source_public_key,
            TransactionSenderMessage::None,
            TransactionReceiveProtocolStage::WaitForFinalize,
            self.resources.clone(),
            tx_finalized_receiver,
            cancellation_receiver,
        );
        join_handles.push(tokio::spawn(protocol.execute()));
    }
    Ok(())
}
/// Stores the base node public key on the service and publishes it on
/// `base_node_update_publisher`. A failed publish is only traced.
fn set_base_node_public_key(&mut self, base_node_public_key: CommsPublicKey) {
    info!(
        target: LOG_TARGET,
        "Setting base node public key {} for service", base_node_public_key
    );
    self.base_node_public_key = Some(base_node_public_key.clone());

    match self.base_node_update_publisher.send(base_node_public_key) {
        Ok(_) => {},
        Err(e) => trace!(
            target: LOG_TARGET,
            "No subscribers to receive base node public key update: {:?}",
            e
        ),
    }
}
/// Restarts the negotiation protocols for all pending transactions: first the send protocols for
/// pending outbound transactions, then the receive protocols for pending inbound transactions.
///
/// # Errors
/// The first failing restart is logged at error level and its error returned; the second restart
/// is not attempted if the first fails.
async fn restart_transaction_negotiation_protocols(
    &mut self,
    send_transaction_join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
    receive_transaction_join_handles: &mut FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    >,
) -> Result<(), TransactionServiceError>
{
    trace!(target: LOG_TARGET, "Restarting transaction negotiation protocols");

    self.restart_all_send_transaction_protocols(send_transaction_join_handles)
        .await
        .map_err(|resp| {
            error!(
                target: LOG_TARGET,
                "Error restarting protocols for all pending outbound transactions: {:?}", resp
            );
            resp
        })?;

    self.restart_all_receive_transaction_protocols(receive_transaction_join_handles)
        .await
        .map_err(|resp| {
            // This restart concerns pending inbound transactions, not coinbase transactions.
            error!(
                target: LOG_TARGET,
                "Error restarting protocols for all pending inbound transactions: {:?}", resp
            );
            resp
        })?;

    Ok(())
}
/// Restarts the protocols that need a base node: broadcasting of all completed transactions,
/// followed by chain monitoring for all coinbase transactions.
///
/// # Errors
/// * `NoBaseNodeKeysProvided` if no base node public key has been set.
/// * Otherwise the first error from either restart step, after logging it.
async fn restart_broadcast_protocols(
    &mut self,
    broadcast_join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
    coinbase_transaction_join_handles: &mut FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    >,
) -> Result<(), TransactionServiceError>
{
    if self.base_node_public_key.is_none() {
        return Err(TransactionServiceError::NoBaseNodeKeysProvided);
    }
    trace!(target: LOG_TARGET, "Restarting transaction broadcast protocols");

    if let Err(e) = self.broadcast_all_completed_transactions(broadcast_join_handles).await {
        error!(
            target: LOG_TARGET,
            "Error broadcasting all completed transactions: {:?}", e
        );
        return Err(e);
    }

    if let Err(e) = self
        .restart_chain_monitoring_for_all_coinbase_transactions(coinbase_transaction_join_handles)
        .await
    {
        error!(
            target: LOG_TARGET,
            "Error restarting protocols for all coinbase transactions: {:?}", e
        );
        return Err(e);
    }

    Ok(())
}
/// Spawns a Transaction Broadcast Protocol for the completed transaction with the given TxId and
/// pushes the task handle onto `join_handles`.
///
/// # Errors
/// * A database error if the completed transaction cannot be fetched.
/// * `InvalidCompletedTransaction` if its status is neither `Completed` nor `Broadcast`, or it has
///   no kernels.
/// * `NoBaseNodeKeysProvided` if no base node public key has been set.
pub async fn broadcast_completed_transaction(
    &mut self,
    tx_id: TxId,
    join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError>
{
    let completed_tx = self.db.get_completed_transaction(tx_id).await?;

    let has_broadcastable_status = completed_tx.status == TransactionStatus::Completed ||
        completed_tx.status == TransactionStatus::Broadcast;
    if !has_broadcastable_status || completed_tx.transaction.body.kernels().is_empty() {
        return Err(TransactionServiceError::InvalidCompletedTransaction);
    }

    let timeout = match self.power_mode {
        PowerMode::Low => self.config.low_power_polling_timeout,
        PowerMode::Normal => self.config.broadcast_monitoring_timeout,
    };

    let base_node_public_key = self
        .base_node_public_key
        .clone()
        .ok_or(TransactionServiceError::NoBaseNodeKeysProvided)?;

    let protocol = TransactionBroadcastProtocol::new(
        tx_id,
        self.resources.clone(),
        timeout,
        base_node_public_key,
        self.timeout_update_publisher.subscribe(),
        self.base_node_update_publisher.subscribe(),
    );
    join_handles.push(tokio::spawn(protocol.execute()));

    Ok(())
}
/// Starts a broadcast protocol for every completed transaction in the database whose status is
/// `Completed` or `Broadcast`.
///
/// # Errors
/// Returns on the first database error or the first transaction whose broadcast protocol cannot
/// be started; remaining transactions are then not processed.
async fn broadcast_all_completed_transactions(
    &mut self,
    join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError>
{
    trace!(target: LOG_TARGET, "Attempting to Broadcast all Completed Transactions");

    let completed_txs = self.db.get_completed_transactions().await?;
    let awaiting_broadcast = completed_txs
        .values()
        .filter(|tx| tx.status == TransactionStatus::Completed || tx.status == TransactionStatus::Broadcast);

    for tx in awaiting_broadcast {
        self.broadcast_completed_transaction(tx.tx_id, join_handles).await?;
    }
    Ok(())
}
async fn complete_transaction_broadcast_protocol(
&mut self,
join_result: Result<u64, TransactionServiceProtocolError>,
)
{
match join_result {
Ok(id) => {
debug!(
target: LOG_TARGET,
"Transaction Broadcast Protocol for TxId: {} completed successfully", id
);
},
Err(TransactionServiceProtocolError { id, error }) => {
if let TransactionServiceError::Shutdown = error {
return;
}
warn!(
target: LOG_TARGET,
"Error completing Transaction Broadcast Protocol (Id: {}): {:?}", id, error
);
let _ = self
.event_publisher
.send(Arc::new(TransactionEvent::Error(format!("{:?}", error))));
},
}
}
/// Routes a base node response to the protocol registered under `response.request_key`.
///
/// A response whose key is not present in `base_node_response_senders` is traced and otherwise
/// ignored; this is not treated as an error.
///
/// # Errors
/// `ProtocolChannelError` if the response cannot be sent on the registered channel.
pub async fn handle_base_node_response(
    &mut self,
    response: base_node_proto::BaseNodeServiceResponse,
) -> Result<(), TransactionServiceError>
{
    let sender = match self.base_node_response_senders.get_mut(&response.request_key) {
        None => {
            trace!(
                target: LOG_TARGET,
                "Received Base Node response with unexpected key: {}. Not for this service",
                response.request_key
            );
            return Ok(());
        },
        Some((_, s)) => s,
    };

    // `response` is owned and not used again, so it is moved into the channel instead of cloned.
    sender
        .send(response)
        .await
        .map_err(|_| TransactionServiceError::ProtocolChannelError)?;
    Ok(())
}
/// Records the new power mode and publishes the matching timeout on `timeout_update_publisher`.
///
/// `Low` selects `config.low_power_polling_timeout`; `Normal` selects
/// `config.broadcast_monitoring_timeout`. A failed publish is only traced, so this function
/// always returns `Ok(())`.
async fn set_power_mode(&mut self, mode: PowerMode) -> Result<(), TransactionServiceError> {
    let timeout = match mode {
        PowerMode::Normal => self.config.broadcast_monitoring_timeout,
        PowerMode::Low => self.config.low_power_polling_timeout,
    };
    self.power_mode = mode;

    match self.timeout_update_publisher.send(timeout) {
        Ok(_) => {},
        Err(e) => trace!(
            target: LOG_TARGET,
            "Could not send Timeout update, no subscribers to receive. (Err {:?})",
            e
        ),
    }
    Ok(())
}
/// Records a UTXO import in the transaction database under a freshly generated random TxId,
/// using this node's public key as the destination, and returns that TxId.
///
/// # Errors
/// Propagates any error from the database call.
pub async fn add_utxo_import_transaction(
    &mut self,
    value: MicroTari,
    source_public_key: CommsPublicKey,
    message: String,
) -> Result<TxId, TransactionServiceError>
{
    let tx_id = OsRng.next_u64();
    let destination_public_key = self.node_identity.public_key().clone();

    self.db
        .add_utxo_import_transaction(tx_id, value, source_public_key, destination_public_key, message)
        .await?;

    Ok(tx_id)
}
/// Stores an already-built transaction as a completed transaction and then runs the
/// send-protocol completion step for it.
///
/// The record is written with status `Completed`, direction `Inbound`, no coinbase block height,
/// and this node's public key as both source and destination.
///
/// # Errors
/// Propagates any error from inserting the completed transaction into the database.
pub async fn submit_transaction(
    &mut self,
    transaction_broadcast_join_handles: &mut FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    >,
    tx_id: TxId,
    tx: Transaction,
    fee: MicroTari,
    amount: MicroTari,
    message: String,
) -> Result<(), TransactionServiceError>
{
    trace!(target: LOG_TARGET, "Submit transaction ({}) to db.", tx_id);

    let own_public_key = self.node_identity.public_key().clone();
    let completed_transaction = CompletedTransaction::new(
        tx_id,
        own_public_key.clone(),
        own_public_key,
        amount,
        fee,
        tx,
        TransactionStatus::Completed,
        message,
        Utc::now().naive_utc(),
        TransactionDirection::Inbound,
        None,
    );
    self.db
        .insert_completed_transaction(tx_id, completed_transaction)
        .await?;

    trace!(
        target: LOG_TARGET,
        "Launch the transaction broadcast protocol for submitted transaction ({}).",
        tx_id
    );
    self.complete_send_transaction_protocol(Ok(tx_id), transaction_broadcast_join_handles)
        .await;

    Ok(())
}
/// Returns the coinbase transaction for `block_height` worth `reward + fees`.
///
/// If the database already holds a matching coinbase transaction for that height and amount, it
/// is reused. Otherwise a new one is requested from the output manager, any existing coinbase
/// transaction at that height is cancelled, and the new one is stored with status `Coinbase`.
/// In both cases chain monitoring is then started for the transaction; a failure to start
/// monitoring is logged but does not fail the call.
///
/// # Errors
/// Propagates database and output manager errors.
async fn generate_coinbase_transaction(
    &mut self,
    reward: MicroTari,
    fees: MicroTari,
    block_height: u64,
    coinbase_monitoring_protocol_join_handles: &mut FuturesUnordered<
        JoinHandle<Result<u64, TransactionServiceProtocolError>>,
    >,
) -> Result<Transaction, TransactionServiceError>
{
    let amount = reward + fees;
    let existing = self
        .db
        .find_coinbase_transaction_at_block_height(block_height, amount)
        .await?;

    let (tx_id, completed_transaction) = if let Some(completed_tx) = existing {
        debug!(
            target: LOG_TARGET,
            "Coinbase transaction (TxId: {}) for Block Height: {} found, with Amount {}.",
            completed_tx.tx_id,
            block_height,
            amount
        );
        (completed_tx.tx_id, completed_tx.transaction)
    } else {
        let tx_id = OsRng.next_u64();
        let tx = self
            .output_manager_service
            .get_coinbase_transaction(tx_id, reward, fees, block_height)
            .await?;

        // Only cancel what is stored at this height once the replacement has been obtained.
        self.db
            .cancel_coinbase_transaction_at_block_height(block_height)
            .await?;

        let own_public_key = self.node_identity.public_key().clone();
        let coinbase_record = CompletedTransaction::new(
            tx_id,
            own_public_key.clone(),
            own_public_key,
            amount,
            MicroTari::from(0),
            tx.clone(),
            TransactionStatus::Coinbase,
            format!("Coinbase Transaction for Block {}", block_height),
            Utc::now().naive_utc(),
            TransactionDirection::Inbound,
            Some(block_height),
        );
        self.db.insert_completed_transaction(tx_id, coinbase_record).await?;

        debug!(
            target: LOG_TARGET,
            "Coinbase transaction (TxId: {}) for Block Height: {} added", tx_id, block_height
        );
        (tx_id, tx)
    };

    match self
        .start_coinbase_transaction_monitoring_protocol(tx_id, coinbase_monitoring_protocol_join_handles)
        .await
    {
        Ok(()) => {},
        Err(e) => warn!(
            target: LOG_TARGET,
            "Could not start chain monitoring for Coinbase transaction (TxId: {}): {:?}", tx_id, e
        ),
    }

    Ok(completed_transaction)
}
/// Spawns a Coinbase Monitoring Protocol for the given coinbase transaction, unless one is
/// already registered for that TxId in `base_node_response_senders`.
///
/// The protocol is registered under a freshly generated random protocol id together with the
/// TxId and the sender half of its base node response channel.
///
/// # Errors
/// * A database error if the completed transaction cannot be fetched.
/// * `InvalidCompletedTransaction` if its status is not `Coinbase` or it has no coinbase block
///   height.
/// * `NoBaseNodeKeysProvided` if no base node public key has been set.
async fn start_coinbase_transaction_monitoring_protocol(
    &mut self,
    tx_id: TxId,
    join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError>
{
    let completed_tx = self.db.get_completed_transaction(tx_id).await?;
    if completed_tx.status != TransactionStatus::Coinbase {
        return Err(TransactionServiceError::InvalidCompletedTransaction);
    }
    let block_height = completed_tx
        .coinbase_block_height
        .ok_or(TransactionServiceError::InvalidCompletedTransaction)?;

    // Nothing to do if a monitoring protocol already exists for this transaction.
    let already_monitored = self
        .base_node_response_senders
        .values()
        .any(|(monitored_tx_id, _)| monitored_tx_id == &tx_id);
    if already_monitored {
        return Ok(());
    }

    let timeout = match self.power_mode {
        PowerMode::Low => self.config.low_power_polling_timeout,
        PowerMode::Normal => self.config.broadcast_monitoring_timeout,
    };

    let base_node_public_key = self
        .base_node_public_key
        .clone()
        .ok_or(TransactionServiceError::NoBaseNodeKeysProvided)?;

    let protocol_id = OsRng.next_u64();
    let (base_node_response_sender, base_node_response_receiver) = mpsc::channel(100);
    self.base_node_response_senders
        .insert(protocol_id, (tx_id, base_node_response_sender));

    let protocol = TransactionCoinbaseMonitoringProtocol::new(
        protocol_id,
        completed_tx.tx_id,
        block_height,
        self.resources.clone(),
        timeout,
        base_node_public_key,
        base_node_response_receiver,
        self.timeout_update_publisher.subscribe(),
    );
    join_handles.push(tokio::spawn(protocol.execute()));

    Ok(())
}
fn complete_coinbase_transaction_monitoring_protocol(
&mut self,
join_result: Result<u64, TransactionServiceProtocolError>,
)
{
match join_result {
Ok(id) => {
let _ = self.base_node_response_senders.remove(&id);
trace!(
target: LOG_TARGET,
"Coinbase Transaction monitoring Protocol for TxId: {} completed successfully",
id
);
},
Err(TransactionServiceProtocolError { id, error }) => {
let _ = self.base_node_response_senders.remove(&id);
if let TransactionServiceError::Shutdown = error {
return;
}
warn!(
target: LOG_TARGET,
"Error completing Coinbase Transaction monitoring Protocol (Id: {}): {:?}", id, error
);
let _ = self
.event_publisher
.send(Arc::new(TransactionEvent::Error(format!("{:?}", error))));
},
}
}
/// Starts a coinbase monitoring protocol for every completed transaction in the database whose
/// status is `Coinbase`.
///
/// # Errors
/// Returns on the first database error or the first transaction for which monitoring cannot be
/// started; remaining transactions are then not processed.
async fn restart_chain_monitoring_for_all_coinbase_transactions(
    &mut self,
    join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError>
{
    trace!(
        target: LOG_TARGET,
        "Starting Coinbase monitoring for all Broadcast Transactions"
    );

    let completed_txs = self.db.get_completed_transactions().await?;
    let coinbase_txs = completed_txs
        .values()
        .filter(|tx| tx.status == TransactionStatus::Coinbase);

    for tx in coinbase_txs {
        self.start_coinbase_transaction_monitoring_protocol(tx.tx_id, join_handles)
            .await?;
    }
    Ok(())
}
/// Test-harness helper: marks the outbound transaction with `completed_tx.tx_id` as completed in
/// the database, storing `completed_tx` as its completed record.
///
/// # Errors
/// Propagates any error from the database call.
#[cfg(feature = "test_harness")]
pub async fn complete_pending_outbound_transaction(
    &mut self,
    completed_tx: CompletedTransaction,
) -> Result<(), TransactionServiceError>
{
    // `completed_tx` is owned and not used afterwards, so it is moved rather than cloned. The
    // TxId argument is evaluated (copied) before the move.
    self.db
        .complete_outbound_transaction(completed_tx.tx_id, completed_tx)
        .await?;
    Ok(())
}
/// Test-harness helper: marks a completed transaction as broadcast in the database and publishes
/// a `TransactionBroadcast` event. A failure to publish the event is only traced.
///
/// # Errors
/// * `TestHarnessError` if no completed transaction with `tx_id` exists.
/// * Any database error.
#[cfg(feature = "test_harness")]
pub async fn broadcast_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionServiceError> {
    let completed_txs = self.db.get_completed_transactions().await?;
    if !completed_txs.contains_key(&tx_id) {
        return Err(TransactionServiceError::TestHarnessError(
            "Could not find Completed TX to broadcast.".to_string(),
        ));
    }

    self.db.broadcast_completed_transaction(tx_id).await?;

    let broadcast_event = Arc::new(TransactionEvent::TransactionBroadcast(tx_id));
    if let Err(e) = self.event_publisher.send(broadcast_event) {
        trace!(
            target: LOG_TARGET,
            "Error sending event, usually because there are no subscribers: {:?}",
            e
        );
    }
    Ok(())
}
/// Test-harness helper: confirms the pending outputs of a completed transaction with the output
/// manager, marks the transaction as mined in the database and publishes a `TransactionMined`
/// event. A failure to publish the event is only traced.
///
/// # Errors
/// * `TestHarnessError` if no completed transaction, or no pending output-manager transaction,
///   exists for `tx_id`.
/// * Any database or output manager error.
///
/// # Panics
/// Panics if a pending output to be received cannot be converted to a transaction output.
#[cfg(feature = "test_harness")]
pub async fn mine_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionServiceError> {
    use tari_core::transactions::transaction::OutputFeatures;

    let completed_txs = self.db.get_completed_transactions().await?;
    if !completed_txs.contains_key(&tx_id) {
        return Err(TransactionServiceError::TestHarnessError(
            "Could not find Completed TX to mine.".to_string(),
        ));
    }

    let pending_tx_outputs = self.output_manager_service.get_pending_transactions().await?;
    let pending_tx = match pending_tx_outputs.get(&tx_id) {
        Some(pending) => pending,
        None => {
            return Err(TransactionServiceError::TestHarnessError(
                "Could not find Pending TX to complete.".to_string(),
            ))
        },
    };

    self.output_manager_service
        .confirm_transaction(
            tx_id,
            pending_tx
                .outputs_to_be_spent
                .iter()
                .map(|spent| {
                    spent
                        .unblinded_output
                        .as_transaction_input(&self.resources.factories.commitment, OutputFeatures::default())
                })
                .collect(),
            pending_tx
                .outputs_to_be_received
                .iter()
                .map(|received| {
                    received
                        .unblinded_output
                        .as_transaction_output(&self.resources.factories)
                        .expect("Failed to convert to Transaction Output")
                })
                .collect(),
        )
        .await?;

    self.db.mine_completed_transaction(tx_id).await?;

    let mined_event = Arc::new(TransactionEvent::TransactionMined(tx_id));
    if let Err(e) = self.event_publisher.send(mined_event) {
        trace!(
            target: LOG_TARGET,
            "Error sending event, usually because there are no subscribers: {:?}",
            e
        );
    }
    Ok(())
}
/// Test-harness helper: fabricates a pending inbound transaction of `amount` from
/// `source_public_key`, stores it in the database and publishes a `ReceivedTransaction` event.
///
/// The sender side is simulated with a throw-away `OutputManagerService` backed by an in-memory
/// database. The `_tx_id` argument is not used; the TxId of the stored transaction is the one
/// carried by the generated sender message.
///
/// # Errors
/// Propagates errors from the fake output manager, message conversion, the real output manager
/// and the database. Returns `OutputManagerError::InvalidSenderMessage` if the generated sender
/// message is not a `Single` message.
#[cfg(feature = "test_harness")]
pub async fn receive_test_transaction(
&mut self,
_tx_id: TxId,
amount: MicroTari,
source_public_key: CommsPublicKey,
) -> Result<(), TransactionServiceError>
{
use crate::{
output_manager_service::{
config::OutputManagerServiceConfig,
error::OutputManagerError,
service::OutputManagerService,
storage::{database::OutputManagerDatabase, memory_db::OutputManagerMemoryDatabase},
},
transaction_service::{handle::TransactionServiceHandle, storage::models::InboundTransaction},
};
use futures::stream;
use tari_core::consensus::{ConsensusConstantsBuilder, Network};
use tari_shutdown::Shutdown;
// Dummy plumbing for the fake output manager. The unused channel halves are bound to
// underscore-prefixed names (rather than `_`) so they stay alive until the end of the function.
let (_sender, receiver) = reply_channel::unbounded();
let (tx, _rx) = mpsc::channel(20);
let (oms_event_publisher, _oms_event_subscriber) = broadcast::channel(100);
let (ts_request_sender, _ts_request_receiver) = reply_channel::unbounded();
let (event_publisher, _) = broadcast::channel(100);
let ts_handle = TransactionServiceHandle::new(ts_request_sender, event_publisher.clone());
let constants = ConsensusConstantsBuilder::new(Network::Stibbons).build();
let shutdown = Shutdown::new();
let mut fake_oms = OutputManagerService::new(
OutputManagerServiceConfig::default(),
OutboundMessageRequester::new(tx),
ts_handle,
receiver,
stream::empty(),
OutputManagerDatabase::new(OutputManagerMemoryDatabase::new()),
oms_event_publisher,
self.resources.factories.clone(),
constants,
shutdown.to_signal(),
)
.await?;
use crate::testnet_utils::make_input;
// Fund the fake sender with a single input slightly larger than the amount to be sent.
let (_ti, uo) = make_input(&mut OsRng, amount + 1000 * uT, &self.resources.factories);
fake_oms.add_output(uo).await?;
// NOTE(review): the second argument (25) is presumably the fee per gram — confirm against
// `prepare_transaction_to_send`.
let mut stp = fake_oms
.prepare_transaction_to_send(amount, MicroTari::from(25), None, "".to_string())
.await?;
let msg = stp.build_single_round_message()?;
// Round-trip the message through its proto form to obtain a `TransactionSenderMessage`.
let proto_msg = proto::TransactionSenderMessage::single(msg.into());
let sender_message: TransactionSenderMessage = proto_msg
.try_into()
.map_err(TransactionServiceError::InvalidMessageError)?;
let (tx_id, _amount) = match sender_message.clone() {
TransactionSenderMessage::Single(data) => (data.tx_id, data.amount),
_ => {
return Err(TransactionServiceError::OutputManagerError(
OutputManagerError::InvalidSenderMessage,
))
},
};
// The recipient side is produced by the service's real output manager.
let rtp = self
.output_manager_service
.get_recipient_transaction(sender_message)
.await?;
let inbound_transaction = InboundTransaction::new(
tx_id,
source_public_key,
amount,
rtp,
TransactionStatus::Pending,
"".to_string(),
Utc::now().naive_utc(),
);
self.db
.add_pending_inbound_transaction(tx_id, inbound_transaction.clone())
.await?;
// Publishing is best-effort: a send error is traced and then discarded.
let _ = self
.event_publisher
.send(Arc::new(TransactionEvent::ReceivedTransaction(tx_id)))
.map_err(|e| {
trace!(
target: LOG_TARGET,
"Error sending event, usually because there are no subscribers: {:?}",
e
);
e
});
Ok(())
}
/// Test-harness helper: turns a pending inbound transaction into a completed one and publishes a
/// `ReceivedFinalizedTransaction` event. A failure to publish the event is only traced.
///
/// The completed record carries a placeholder transaction with no inputs or outputs and a single
/// kernel built from a zero commitment and a default signature, with a fee of 2000 and status
/// `Completed`.
///
/// # Errors
/// * `TestHarnessError` if no pending inbound transaction exists for `tx_id`, or if the
///   placeholder kernel cannot be built.
/// * Any database error.
#[cfg(feature = "test_harness")]
pub async fn finalize_received_test_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionServiceError> {
    use tari_core::transactions::{transaction::KernelBuilder, types::Signature};
    use tari_crypto::commitment::HomomorphicCommitmentFactory;

    let factories = CryptoFactories::default();
    let inbound_txs = self.db.get_pending_inbound_transactions().await?;
    let found_tx = inbound_txs.get(&tx_id).ok_or_else(|| {
        TransactionServiceError::TestHarnessError("Could not find Pending Inbound TX to finalize.".to_string())
    })?;

    // Report a kernel build failure through the function's error type instead of panicking.
    let kernel = KernelBuilder::new()
        .with_excess(&factories.commitment.zero())
        .with_signature(&Signature::default())
        .build()
        .map_err(|e| {
            TransactionServiceError::TestHarnessError(format!("Could not build placeholder kernel: {:?}", e))
        })?;

    let placeholder_transaction = Transaction::new(Vec::new(), Vec::new(), vec![kernel], BlindingFactor::default());
    let completed_transaction = CompletedTransaction::new(
        tx_id,
        found_tx.source_public_key.clone(),
        self.node_identity.public_key().clone(),
        found_tx.amount,
        MicroTari::from(2000),
        placeholder_transaction,
        TransactionStatus::Completed,
        found_tx.message.clone(),
        found_tx.timestamp,
        TransactionDirection::Inbound,
        None,
    );

    // The record is not used afterwards, so it is moved into the database call.
    self.db
        .complete_inbound_transaction(tx_id, completed_transaction)
        .await?;

    let finalized_event = Arc::new(TransactionEvent::ReceivedFinalizedTransaction(tx_id));
    if let Err(e) = self.event_publisher.send(finalized_event) {
        trace!(
            target: LOG_TARGET,
            "Error sending event, usually because there are no subscribers: {:?}",
            e
        );
    }
    Ok(())
}
}
/// Bundle of handles and configuration needed by the transaction protocols.
///
/// NOTE(review): presumably the type of `self.resources`, which is cloned into every spawned
/// protocol — confirm against the service struct definition.
#[derive(Clone)]
pub struct TransactionServiceResources<TBackend>
where TBackend: TransactionBackend + 'static
{
/// Transaction database over the chosen backend.
pub db: TransactionDatabase<TBackend>,
/// Handle to the output manager service.
pub output_manager_service: OutputManagerHandle,
/// Requester used to send outbound messages.
pub outbound_message_service: OutboundMessageRequester,
/// Requester for the comms connectivity manager.
pub connectivity_manager: ConnectivityRequester,
/// Sender on which `TransactionEvent`s are published.
pub event_publisher: TransactionEventSender,
/// Identity of this node.
pub node_identity: Arc<NodeIdentity>,
/// Cryptographic factories (e.g. the commitment factory).
pub factories: CryptoFactories,
/// Transaction service configuration.
pub config: TransactionServiceConfig,
/// Signal used to observe service shutdown.
pub shutdown_signal: ShutdownSignal,
}
/// Power mode of the service; it selects which configured timeout is handed to the broadcast and
/// coinbase monitoring protocols.
#[derive(Clone, Copy)]
enum PowerMode {
/// Selects `config.low_power_polling_timeout`.
Low,
/// Selects `config.broadcast_monitoring_timeout`.
Normal,
}
/// Pairs a transaction id with a private spending key.
///
/// NOTE(review): by its name this holds the spending key of a not-yet-confirmed coinbase output —
/// confirm against its users; none are visible here.
#[derive(Debug)]
pub struct PendingCoinbaseSpendingKey {
/// Id of the transaction the key belongs to.
pub tx_id: TxId,
/// The private spending key.
pub spending_key: PrivateKey,
}