use crate::error::QuorumStoreError;
use crate::monitor;
use anyhow::{format_err, Result};
use aptos_infallible::Mutex;
use consensus_types::{common::Round, request_response::ConsensusRequest};
use futures::channel::{mpsc, mpsc::Sender, oneshot};
use std::time::Duration;
use tokio::time::timeout;
/// Interface for telling a downstream component about committed rounds.
///
/// Implementors must be `Send + Sync`, so a single notifier can be shared
/// between threads/tasks.
#[async_trait::async_trait]
pub trait CommitNotifier: Send + Sync {
    /// Reports the commit identified by (`epoch`, `round`).
    ///
    /// # Errors
    /// Returns a [`QuorumStoreError`] when the notification fails; the exact
    /// failure conditions are defined by the implementor.
    async fn notify_commit(&self, epoch: u64, round: Round) -> Result<(), QuorumStoreError>;

    /// Hands the notifier the channel sender it should use for
    /// [`ConsensusRequest`]s from now on.
    ///
    /// Takes `&self`, so implementors need interior mutability to store it.
    fn new_epoch(&self, quorum_store_commit_sender: mpsc::Sender<ConsensusRequest>);
}
/// [`CommitNotifier`] implementation that forwards commit notifications as
/// [`ConsensusRequest`]s over an mpsc channel.
pub struct QuorumStoreCommitNotifier {
    /// Sender used to submit requests. Wrapped in a `Mutex` so that it can be
    /// replaced through a shared reference (see `new_epoch`).
    quorum_store_commit_sender: Mutex<mpsc::Sender<ConsensusRequest>>,
    /// How long `notify_commit` waits for an acknowledgement, in milliseconds.
    quorum_store_commit_timeout_ms: u64,
}
impl QuorumStoreCommitNotifier {
    /// Creates a notifier that waits up to `quorum_store_commit_timeout_ms`
    /// milliseconds for each acknowledgement.
    ///
    /// The notifier starts out with a placeholder sender whose receiving half
    /// is discarded right away; a usable sender is installed later through
    /// `new_epoch`.
    pub fn new(quorum_store_commit_timeout_ms: u64) -> Self {
        // Only the sending half is kept: the `_` pattern drops the receiver
        // immediately, so this channel never delivers anything.
        let (placeholder_sender, _) = mpsc::channel(1);
        let quorum_store_commit_sender = Mutex::new(placeholder_sender);

        Self {
            quorum_store_commit_sender,
            quorum_store_commit_timeout_ms,
        }
    }
}
#[async_trait::async_trait]
impl CommitNotifier for QuorumStoreCommitNotifier {
    /// Sends a `ConsensusRequest::CleanRequest` for (`epoch`, `round`) over the
    /// currently installed sender and waits for the acknowledgement callback.
    ///
    /// # Errors
    /// Returns a [`QuorumStoreError`] when:
    /// - the request cannot be enqueued (`try_send` fails),
    /// - the acknowledgement sender is dropped without sending an ACK, or
    /// - no ACK arrives within `quorum_store_commit_timeout_ms` milliseconds.
    async fn notify_commit(&self, epoch: u64, round: Round) -> Result<(), QuorumStoreError> {
        let (callback, callback_rcv) = oneshot::channel();
        let req = ConsensusRequest::CleanRequest(epoch, round, callback);

        // The mutex guard is a temporary of this statement, so it is released
        // before the `.await` below. The send goes through a clone of the
        // stored sender.
        // NOTE(review): futures' mpsc gives every sender its own guaranteed
        // slot, so sending via a fresh clone on each call may sidestep the
        // channel's buffer limit -- confirm this is intended.
        self.quorum_store_commit_sender
            .lock()
            .clone()
            .try_send(req)
            .map_err(anyhow::Error::from)?;

        let ack = monitor!(
            "notify_commit",
            timeout(
                Duration::from_millis(self.quorum_store_commit_timeout_ms),
                callback_rcv
            )
            .await
        );

        // `timeout` wraps the receiver's own result: the outer `Err` means the
        // deadline passed, the inner `Err` means the ACK sender was dropped.
        // Both mean that no ACK was received and must be surfaced as errors.
        match ack {
            Ok(Ok(_)) => Ok(()),
            Ok(Err(canceled)) => Err(format_err!(
                "[consensus] quorum store commit notifier ACK channel closed before an ACK was sent: {:?}",
                canceled
            )
            .into()),
            Err(elapsed) => Err(format_err!(
                "[consensus] quorum store commit notifier did not receive ACK on time: {:?}",
                elapsed
            )
            .into()),
        }
    }

    /// Replaces the stored sender with `quorum_store_commit_sender`; later
    /// `notify_commit` calls send their requests through the new one.
    fn new_epoch(&self, quorum_store_commit_sender: Sender<ConsensusRequest>) {
        *self.quorum_store_commit_sender.lock() = quorum_store_commit_sender;
    }
}