use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::async_runtime::OneshotSender;
use crate::raft::responder::Responder;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
/// A responder that reports progress through two separate oneshot channels.
///
/// One channel carries the log id passed to `on_commit`, the other carries the
/// final value of type `T` passed to `on_complete`. The matching receivers are
/// handed out by [`ProgressResponder::new`].
pub struct ProgressResponder<C: RaftTypeConfig, T: OptionalSend> {
    /// Sender for the commit notification.
    ///
    /// Wrapped in `Option` so it can be taken out and consumed through
    /// `&mut self`; once taken, later commit notifications are ignored.
    commit_tx: Option<OneshotSenderOf<C, LogIdOf<C>>>,

    /// Sender for the final result; consumed when the responder itself is consumed.
    complete_tx: OneshotSenderOf<C, T>,
}
impl<C, T> ProgressResponder<C, T>
where
    C: RaftTypeConfig,
    T: OptionalSend,
{
    /// Builds a responder along with the receiving ends of both of its channels.
    ///
    /// The returned tuple is `(responder, commit_rx, complete_rx)`:
    /// - `commit_rx` receives the log id given to `on_commit`;
    /// - `complete_rx` receives the value given to `on_complete`.
    pub fn new() -> (Self, OneshotReceiverOf<C, LogIdOf<C>>, OneshotReceiverOf<C, T>) {
        // Channel item types are inferred from the struct fields and the return type.
        let (commit_sender, commit_rx) = C::oneshot();
        let (complete_tx, complete_rx) = C::oneshot();

        (
            Self {
                commit_tx: Some(commit_sender),
                complete_tx,
            },
            commit_rx,
            complete_rx,
        )
    }
}
impl<C, T> Responder<C, T> for ProgressResponder<C, T>
where
    C: RaftTypeConfig,
    T: OptionalSend + 'static,
{
    /// Forwards `log_id` on the commit channel.
    ///
    /// Only the first call sends anything: the sender is taken out of the
    /// responder, so subsequent calls are no-ops. A failed send is logged at
    /// `warn` level and otherwise ignored.
    fn on_commit(&mut self, log_id: LogIdOf<C>) {
        // Nothing to do if the commit sender has already been consumed.
        let Some(commit_tx) = self.commit_tx.take() else {
            return;
        };

        let outcome = commit_tx.send(log_id);
        let delivered = outcome.is_ok();

        if delivered {
            tracing::debug!("ProgressResponder.commit_tx.send: is_ok: {}", delivered);
        } else {
            tracing::warn!("ProgressResponder.commit_tx.send: is_ok: {}", delivered);
        }
    }

    /// Consumes the responder and forwards `res` on the completion channel.
    ///
    /// A failed send is logged at `warn` level and otherwise ignored.
    fn on_complete(self, res: T) {
        let outcome = self.complete_tx.send(res);
        let delivered = outcome.is_ok();

        if delivered {
            tracing::debug!("ProgressResponder.complete_tx.send: is_ok: {}", delivered);
        } else {
            tracing::warn!("ProgressResponder.complete_tx.send: is_ok: {}", delivered);
        }
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::engine::testing::UTConfig;
    use crate::engine::testing::log_id;
    use crate::raft::responder::ProgressResponder;
    use crate::raft::responder::Responder;
    use crate::type_config::TypeConfigExt;

    /// A freshly built responder has not sent anything on either channel.
    #[test]
    fn test_twoshot_responder_new() {
        UTConfig::<()>::run(async {
            // Keep the responder alive so both senders still exist while polling.
            let (_responder, mut commit_rx, mut complete_rx) = ProgressResponder::<UTConfig, String>::new();

            assert!(commit_rx.try_recv().is_err());
            assert!(complete_rx.try_recv().is_err());
        });
    }

    /// `on_commit` delivers the log id to the commit receiver.
    #[test]
    fn test_twoshot_responder_on_commit() {
        UTConfig::<()>::run(async {
            let (mut responder, commit_rx, _complete_rx) = ProgressResponder::<UTConfig, String>::new();
            let expected_log_id = log_id(1, 2, 3);

            responder.on_commit(expected_log_id);

            let got_log_id = commit_rx.await.unwrap();
            assert_eq!(expected_log_id, got_log_id);
        });
    }

    /// Only the first `on_commit` call is delivered; later calls are ignored.
    #[test]
    fn test_twoshot_responder_on_commit_multiple_calls() {
        UTConfig::<()>::run(async {
            let (mut responder, commit_rx, _complete_rx) = ProgressResponder::<UTConfig, String>::new();
            let first_log_id = log_id(1, 2, 3);
            let second_log_id = log_id(2, 3, 4);

            responder.on_commit(first_log_id);
            responder.on_commit(second_log_id);

            let got_log_id = commit_rx.await.unwrap();
            assert_eq!(first_log_id, got_log_id);
        });
    }

    /// `on_complete` delivers the result to the completion receiver.
    #[test]
    fn test_twoshot_responder_send() {
        UTConfig::<()>::run(async {
            let (responder, _commit_rx, complete_rx) = ProgressResponder::<UTConfig, String>::new();
            let expected_result = "test_result".to_string();

            responder.on_complete(expected_result.clone());

            let got_result = complete_rx.await.unwrap();
            assert_eq!(expected_result, got_result);
        });
    }

    /// Commit followed by completion delivers on both channels.
    #[test]
    fn test_twoshot_responder_both_channels() {
        UTConfig::<()>::run(async {
            let (mut responder, commit_rx, complete_rx) = ProgressResponder::<UTConfig, String>::new();
            let expected_log_id = log_id(1, 2, 3);
            let expected_result = "test_result".to_string();

            responder.on_commit(expected_log_id);
            let got_log_id = commit_rx.await.unwrap();
            assert_eq!(expected_log_id, got_log_id);

            responder.on_complete(expected_result.clone());
            let got_result = complete_rx.await.unwrap();
            assert_eq!(expected_result, got_result);
        });
    }

    /// Completing without a prior commit delivers the result and leaves the
    /// commit receiver without a value.
    #[test]
    fn test_twoshot_responder_send_without_commit() {
        UTConfig::<()>::run(async {
            let (responder, mut commit_rx, complete_rx) = ProgressResponder::<UTConfig, String>::new();
            let expected_result = "test_result".to_string();

            responder.on_complete(expected_result.clone());

            let got_result = complete_rx.await.unwrap();
            assert_eq!(expected_result, got_result);
            assert!(commit_rx.try_recv().is_err());
        });
    }

    /// Receivers that are already being awaited in spawned tasks get their
    /// values once the responder fires.
    #[test]
    fn test_twoshot_responder_ordering() {
        UTConfig::<()>::run(async {
            let (mut responder, commit_rx, complete_rx) = ProgressResponder::<UTConfig, i32>::new();
            let expected_log_id = log_id(5, 10, 15);
            let expected_result = 42;

            let commit_waiter = UTConfig::<()>::spawn(async move { commit_rx.await.unwrap() });
            let complete_waiter = UTConfig::<()>::spawn(async move { complete_rx.await.unwrap() });

            // Give the spawned tasks a moment to start waiting before sending.
            UTConfig::<()>::sleep(Duration::from_millis(10)).await;

            responder.on_commit(expected_log_id);
            responder.on_complete(expected_result);

            let got_log_id = commit_waiter.await.unwrap();
            let got_result = complete_waiter.await.unwrap();
            assert_eq!(expected_log_id, got_log_id);
            assert_eq!(expected_result, got_result);
        });
    }
}