use crate::{
error::Error,
logging::{LogEntry, LogSchema},
};
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::{
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
transaction::{Transaction, Version},
};
use consensus_notifications::{
ConsensusCommitNotification, ConsensusNotification, ConsensusNotificationListener,
ConsensusSyncNotification,
};
use data_streaming_service::data_notification::NotificationId;
use event_notifications::{EventNotificationSender, EventSubscriptionService};
use futures::{channel::mpsc, stream::FusedStream, Stream};
use mempool_notifications::MempoolNotificationSender;
use serde::Serialize;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
const MEMPOOL_COMMIT_ACK_TIMEOUT_MS: u64 = 5000;
#[derive(Clone, Debug)]
pub enum CommitNotification {
CommittedStateSnapshot(CommittedStateSnapshot),
}
#[derive(Clone, Debug)]
pub struct CommittedStateSnapshot {
pub committed_transaction: CommittedTransactions,
pub last_committed_state_index: u64,
pub version: Version,
}
#[derive(Clone, Debug, PartialEq)]
pub struct CommittedTransactions {
pub events: Vec<ContractEvent>,
pub transactions: Vec<Transaction>,
}
impl CommitNotification {
pub fn new_committed_state_snapshot(
events: Vec<ContractEvent>,
transactions: Vec<Transaction>,
last_committed_state_index: u64,
version: Version,
) -> Self {
let committed_transaction = CommittedTransactions {
events,
transactions,
};
let committed_states = CommittedStateSnapshot {
committed_transaction,
last_committed_state_index,
version,
};
CommitNotification::CommittedStateSnapshot(committed_states)
}
pub async fn handle_transaction_notification<M: MempoolNotificationSender>(
events: Vec<ContractEvent>,
transactions: Vec<Transaction>,
latest_synced_version: Version,
latest_synced_ledger_info: LedgerInfoWithSignatures,
mut mempool_notification_handler: MempoolNotificationHandler<M>,
event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
) -> Result<(), Error> {
debug!(
LogSchema::new(LogEntry::NotificationHandler).message(&format!(
"Notifying mempool of transactions at version: {:?}",
latest_synced_version
))
);
let blockchain_timestamp_usecs = latest_synced_ledger_info.ledger_info().timestamp_usecs();
mempool_notification_handler
.notify_mempool_of_committed_transactions(
transactions.clone(),
blockchain_timestamp_usecs,
)
.await?;
debug!(
LogSchema::new(LogEntry::NotificationHandler).message(&format!(
"Notifying the event subscription service of events at version: {:?}",
latest_synced_version
))
);
event_subscription_service
.lock()
.notify_events(latest_synced_version, events.clone())
.map_err(|error| error.into())
}
}
pub struct CommitNotificationListener {
commit_notification_listener: mpsc::UnboundedReceiver<CommitNotification>,
}
impl CommitNotificationListener {
pub fn new() -> (mpsc::UnboundedSender<CommitNotification>, Self) {
let (commit_notification_sender, commit_notification_listener) = mpsc::unbounded();
let commit_notification_listener = Self {
commit_notification_listener,
};
(commit_notification_sender, commit_notification_listener)
}
}
impl Stream for CommitNotificationListener {
type Item = CommitNotification;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().commit_notification_listener).poll_next(cx)
}
}
impl FusedStream for CommitNotificationListener {
fn is_terminated(&self) -> bool {
self.commit_notification_listener.is_terminated()
}
}
pub struct ConsensusSyncRequest {
consensus_sync_notification: ConsensusSyncNotification,
}
impl ConsensusSyncRequest {
pub fn new(consensus_sync_notification: ConsensusSyncNotification) -> Self {
Self {
consensus_sync_notification,
}
}
pub fn get_sync_target(&self) -> LedgerInfoWithSignatures {
self.consensus_sync_notification.target.clone()
}
}
pub struct ConsensusNotificationHandler {
consensus_listener: ConsensusNotificationListener,
consensus_sync_request: Arc<Mutex<Option<ConsensusSyncRequest>>>,
}
impl ConsensusNotificationHandler {
pub fn new(consensus_listener: ConsensusNotificationListener) -> Self {
Self {
consensus_listener,
consensus_sync_request: Arc::new(Mutex::new(None)),
}
}
pub fn active_sync_request(&self) -> bool {
self.consensus_sync_request.lock().is_some()
}
pub fn get_consensus_sync_request(&self) -> Arc<Mutex<Option<ConsensusSyncRequest>>> {
self.consensus_sync_request.clone()
}
pub async fn initialize_sync_request(
&mut self,
sync_notification: ConsensusSyncNotification,
latest_synced_ledger_info: LedgerInfoWithSignatures,
) -> Result<(), Error> {
let sync_target_version = sync_notification.target.ledger_info().version();
let latest_committed_version = latest_synced_ledger_info.ledger_info().version();
if sync_target_version < latest_committed_version {
let error = Err(Error::OldSyncRequest(
sync_target_version,
latest_committed_version,
));
self.respond_to_sync_notification(sync_notification, error.clone())
.await?;
return error;
}
if sync_target_version == latest_committed_version {
info!(LogSchema::new(LogEntry::NotificationHandler)
.message("We're already at the requested sync target version! Returning early"));
let result = Ok(());
self.respond_to_sync_notification(sync_notification, result.clone())
.await?;
return result;
}
let consensus_sync_request = ConsensusSyncRequest::new(sync_notification);
self.consensus_sync_request = Arc::new(Mutex::new(Some(consensus_sync_request)));
Ok(())
}
pub async fn check_sync_request_progress(
&mut self,
latest_synced_ledger_info: LedgerInfoWithSignatures,
) -> Result<(), Error> {
let consensus_sync_request = self.get_consensus_sync_request();
let sync_target_version = consensus_sync_request.lock().as_ref().map(|sync_request| {
sync_request
.consensus_sync_notification
.target
.ledger_info()
.version()
});
if let Some(sync_target_version) = sync_target_version {
let latest_committed_version = latest_synced_ledger_info.ledger_info().version();
if latest_committed_version > sync_target_version {
return Err(Error::SyncedBeyondTarget(
latest_committed_version,
sync_target_version,
));
}
if latest_committed_version == sync_target_version {
let consensus_sync_request = self.get_consensus_sync_request().lock().take();
if let Some(consensus_sync_request) = consensus_sync_request {
self.respond_to_sync_notification(
consensus_sync_request.consensus_sync_notification,
Ok(()),
)
.await?;
}
return Ok(());
}
}
Ok(())
}
pub async fn respond_to_sync_notification(
&mut self,
sync_notification: ConsensusSyncNotification,
result: Result<(), Error>,
) -> Result<(), Error> {
let message = result.map_err(|error| {
consensus_notifications::Error::UnexpectedErrorEncountered(format!("{:?}", error))
});
info!(
LogSchema::new(LogEntry::NotificationHandler).message(&format!(
"Responding to consensus sync notification with message: {:?}",
message
))
);
self.consensus_listener
.respond_to_sync_notification(sync_notification, message)
.await
.map_err(|error| {
Error::CallbackSendFailed(format!(
"Consensus sync request response error: {:?}",
error
))
})
}
pub async fn respond_to_commit_notification(
&mut self,
commit_notification: ConsensusCommitNotification,
result: Result<(), Error>,
) -> Result<(), Error> {
let message = result.map_err(|error| {
consensus_notifications::Error::UnexpectedErrorEncountered(format!("{:?}", error))
});
debug!(
LogSchema::new(LogEntry::NotificationHandler).message(&format!(
"Responding to consensus commit notification with message: {:?}",
message
))
);
self.consensus_listener
.respond_to_commit_notification(commit_notification, message)
.await
.map_err(|error| {
Error::CallbackSendFailed(format!("Consensus commit response error: {:?}", error))
})
}
}
impl Stream for ConsensusNotificationHandler {
type Item = ConsensusNotification;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().consensus_listener).poll_next(cx)
}
}
impl FusedStream for ConsensusNotificationHandler {
fn is_terminated(&self) -> bool {
self.consensus_listener.is_terminated()
}
}
#[derive(Clone, Debug, Serialize)]
pub struct ErrorNotification {
pub error: Error,
pub notification_id: NotificationId,
}
pub struct ErrorNotificationListener {
error_notification_listener: mpsc::UnboundedReceiver<ErrorNotification>,
}
impl ErrorNotificationListener {
pub fn new() -> (mpsc::UnboundedSender<ErrorNotification>, Self) {
let (error_notification_sender, error_notification_listener) = mpsc::unbounded();
let error_notification_listener = Self {
error_notification_listener,
};
(error_notification_sender, error_notification_listener)
}
}
impl Stream for ErrorNotificationListener {
type Item = ErrorNotification;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().error_notification_listener).poll_next(cx)
}
}
impl FusedStream for ErrorNotificationListener {
fn is_terminated(&self) -> bool {
self.error_notification_listener.is_terminated()
}
}
#[derive(Clone)]
pub struct MempoolNotificationHandler<M> {
mempool_notification_sender: M,
}
impl<M: MempoolNotificationSender> MempoolNotificationHandler<M> {
pub fn new(mempool_notification_sender: M) -> Self {
Self {
mempool_notification_sender,
}
}
pub async fn notify_mempool_of_committed_transactions(
&mut self,
committed_transactions: Vec<Transaction>,
block_timestamp_usecs: u64,
) -> Result<(), Error> {
let result = self
.mempool_notification_sender
.notify_new_commit(
committed_transactions,
block_timestamp_usecs,
MEMPOOL_COMMIT_ACK_TIMEOUT_MS,
)
.await;
if let Err(error) = result {
let error = Error::NotifyMempoolError(format!("{:?}", error));
error!(LogSchema::new(LogEntry::NotificationHandler)
.error(&error)
.message("Failed to notify mempool of committed transactions!"));
Err(error)
} else {
Ok(())
}
}
}