use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::Vote;
use crate::async_runtime::watch::RecvError;
use crate::async_runtime::watch::WatchReceiver;
use crate::core::io_flush_tracking::FlushPoint;
use crate::type_config::alias::LeaderIdOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::WatchReceiverOf;
pub type LogProgress<C> = WatchProgress<C, Option<FlushPoint<C>>>;
pub type VoteProgress<C> = WatchProgress<C, Option<Vote<LeaderIdOf<C>>>>;
pub type CommitProgress<C> = WatchProgress<C, Option<LogIdOf<C>>>;
pub type SnapshotProgress<C> = WatchProgress<C, Option<LogIdOf<C>>>;
pub type AppliedProgress<C> = WatchProgress<C, Option<LogIdOf<C>>>;
#[derive(Clone)]
pub struct WatchProgress<C, T>
where
C: RaftTypeConfig,
T: OptionalSend + OptionalSync + PartialOrd + Clone,
{
inner: WatchReceiverOf<C, T>,
}
impl<C, T> WatchProgress<C, T>
where
C: RaftTypeConfig,
T: OptionalSend + OptionalSync + PartialOrd + Clone,
{
pub(crate) fn new(inner: WatchReceiverOf<C, T>) -> Self {
Self { inner }
}
pub async fn wait_until_ge(&mut self, target: &T) -> Result<T, RecvError> {
self.inner.wait_until_ge(target).await
}
pub async fn wait_until<F>(&mut self, condition: F) -> Result<T, RecvError>
where F: Fn(&T) -> bool + OptionalSend {
self.inner.wait_until(condition).await
}
pub fn get(&self) -> T {
self.inner.borrow_watched().clone()
}
pub async fn changed(&mut self) -> Result<(), RecvError> {
self.inner.changed().await
}
pub async fn next(&mut self) -> Result<T, RecvError> {
self.changed().await?;
Ok(self.get())
}
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use openraft_rt_tokio::TokioRuntime;
use super::*;
use crate::RaftTypeConfig;
use crate::async_runtime::WatchSender;
use crate::impls::Vote;
use crate::type_config::TypeConfigExt;
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
struct TestConfig;
impl RaftTypeConfig for TestConfig {
type D = u64;
type R = ();
type NodeId = u64;
type Node = ();
type Term = u64;
type LeaderId = crate::impls::leader_id_adv::LeaderId<u64, u64>;
type Vote = Vote<Self::LeaderId>;
type Entry =
crate::Entry<<Self::LeaderId as crate::vote::RaftLeaderId>::Committed, Self::D, Self::NodeId, Self::Node>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder<T>
= crate::impls::OneshotResponder<Self, T>
where T: OptionalSend + 'static;
type Batch<T>
= crate::impls::InlineBatch<T>
where T: OptionalSend + 'static;
type ErrorSource = anyerror::AnyError;
}
#[test]
fn test_wait_until_ge() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(0u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
assert_eq!(progress.get(), 0);
TestConfig::spawn(async move {
TestConfig::sleep(std::time::Duration::from_millis(10)).await;
tx.send(5).unwrap();
TestConfig::sleep(std::time::Duration::from_millis(10)).await;
tx.send(10).unwrap();
});
let result = progress.wait_until_ge(&8).await.unwrap();
assert!(result >= 8);
assert_eq!(result, 10);
});
}
#[test]
fn test_wait_until_ge_immediate() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(10u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
let result = progress.wait_until_ge(&5).await.unwrap();
assert_eq!(result, 10);
drop(tx);
});
}
#[test]
fn test_wait_until_custom_condition() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(1u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
TestConfig::spawn(async move {
for i in 2..=10 {
TestConfig::sleep(std::time::Duration::from_millis(5)).await;
tx.send(i).unwrap();
}
});
let result = progress.wait_until(|v| v % 2 == 0).await.unwrap();
assert_eq!(result % 2, 0);
assert_eq!(result, 2);
});
}
#[test]
fn test_wait_until_immediate() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(10u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
let result = progress.wait_until(|v| v >= &5).await.unwrap();
assert_eq!(result, 10);
drop(tx);
});
}
#[test]
fn test_changed_waits_for_notification() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(0u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
assert_eq!(progress.get(), 0);
TestConfig::spawn(async move {
TestConfig::sleep(std::time::Duration::from_millis(10)).await;
tx.send(5).unwrap();
});
progress.changed().await.unwrap();
assert_eq!(progress.get(), 5);
});
}
#[test]
fn test_changed_returns_immediately_if_unseen_value() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(0u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
tx.send(5).unwrap();
progress.changed().await.unwrap();
assert_eq!(progress.get(), 5);
});
}
#[test]
fn test_changed_returns_error_when_sender_dropped() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(0u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
drop(tx);
let result = progress.changed().await;
assert!(result.is_err());
});
}
#[test]
fn test_next_returns_changed_value() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(0u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
TestConfig::spawn(async move {
TestConfig::sleep(std::time::Duration::from_millis(10)).await;
tx.send(5).unwrap();
TestConfig::sleep(std::time::Duration::from_millis(10)).await;
tx.send(10).unwrap();
});
let value = progress.next().await.unwrap();
assert_eq!(value, 5);
let value = progress.next().await.unwrap();
assert_eq!(value, 10);
});
}
#[test]
fn test_next_returns_immediate_if_unseen_value() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(0u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
tx.send(42).unwrap();
let value = progress.next().await.unwrap();
assert_eq!(value, 42);
});
}
#[test]
fn test_next_returns_error_when_sender_dropped() {
TestConfig::run(async {
let (tx, rx) = TestConfig::watch_channel(0u64);
let mut progress = WatchProgress::<TestConfig, u64>::new(rx);
drop(tx);
let result = progress.next().await;
assert!(result.is_err());
});
}
}