use std::future::Future;
use std::io;
use std::time::Duration;
use futures_util::Stream;
use openraft_macros::since;
use crate::Instant;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::async_runtime::Mpsc;
use crate::async_runtime::MpscReceiver;
use crate::async_runtime::Oneshot;
use crate::async_runtime::mutex::Mutex;
use crate::async_runtime::watch::Watch;
use crate::errors::ErrorSource;
use crate::type_config::AsyncRuntime;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::ErrorSourceOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscOf;
use crate::type_config::alias::MpscReceiverOf;
use crate::type_config::alias::MpscSenderOf;
use crate::type_config::alias::MutexOf;
use crate::type_config::alias::OneshotOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SleepOf;
use crate::type_config::alias::TimeoutOf;
use crate::type_config::alias::WatchOf;
use crate::type_config::alias::WatchReceiverOf;
use crate::type_config::alias::WatchSenderOf;
#[since(version = "0.10.0")]
pub trait TypeConfigExt: RaftTypeConfig {
#[track_caller]
fn now() -> InstantOf<Self> {
InstantOf::<Self>::now()
}
#[track_caller]
fn sleep(duration: Duration) -> SleepOf<Self> {
AsyncRuntimeOf::<Self>::sleep(duration)
}
#[track_caller]
fn yield_now() -> SleepOf<Self> {
AsyncRuntimeOf::<Self>::sleep(Duration::from_nanos(1))
}
#[track_caller]
fn sleep_until(deadline: InstantOf<Self>) -> SleepOf<Self> {
AsyncRuntimeOf::<Self>::sleep_until(deadline)
}
#[track_caller]
fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> TimeoutOf<Self, R, F> {
AsyncRuntimeOf::<Self>::timeout(duration, future)
}
#[track_caller]
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(
deadline: InstantOf<Self>,
future: F,
) -> TimeoutOf<Self, R, F> {
AsyncRuntimeOf::<Self>::timeout_at(deadline, future)
}
#[track_caller]
fn oneshot<T>() -> (OneshotSenderOf<Self, T>, OneshotReceiverOf<Self, T>)
where T: OptionalSend {
OneshotOf::<Self>::channel()
}
#[track_caller]
fn mpsc<T>(buffer: usize) -> (MpscSenderOf<Self, T>, MpscReceiverOf<Self, T>)
where T: OptionalSend {
MpscOf::<Self>::channel(buffer)
}
fn mpsc_to_stream<T>(rx: MpscReceiverOf<Self, T>) -> impl Stream<Item = T>
where T: OptionalSend {
futures_util::stream::unfold(rx, |mut rx| async move {
let item = MpscReceiver::recv(&mut rx).await?;
Some((item, rx))
})
}
#[track_caller]
fn watch_channel<T>(init: T) -> (WatchSenderOf<Self, T>, WatchReceiverOf<Self, T>)
where T: OptionalSend + OptionalSync {
WatchOf::<Self>::channel(init)
}
#[track_caller]
fn mutex<T>(value: T) -> MutexOf<Self, T>
where T: OptionalSend {
MutexOf::<Self, T>::new(value)
}
#[track_caller]
fn spawn<T>(future: T) -> JoinHandleOf<Self, T::Output>
where
T: Future + OptionalSend + 'static,
T::Output: OptionalSend + 'static,
{
AsyncRuntimeOf::<Self>::spawn(future)
}
#[track_caller]
fn run<F, T>(future: F) -> T
where
F: Future<Output = T>,
T: OptionalSend,
{
<AsyncRuntimeOf<Self> as AsyncRuntime>::run(future)
}
#[track_caller]
fn spawn_blocking<F, T>(f: F) -> impl Future<Output = Result<T, io::Error>> + Send
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
AsyncRuntimeOf::<Self>::spawn_blocking(f)
}
fn err_from_string(msg: impl ToString) -> ErrorSourceOf<Self> {
ErrorSourceOf::<Self>::from_string(msg)
}
fn err_from_error<E: std::error::Error + 'static>(e: &E) -> ErrorSourceOf<Self> {
ErrorSourceOf::<Self>::from_error(e)
}
}
/// Blanket implementation: every [`RaftTypeConfig`] gets the `TypeConfigExt`
/// helpers with their default bodies.
impl<C: RaftTypeConfig> TypeConfigExt for C {}
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use futures_util::StreamExt;
    use openraft_rt_tokio::TokioRuntime;

    use crate::OptionalSend;
    use crate::RaftTypeConfig;
    use crate::async_runtime::MpscSender;
    use crate::type_config::TypeConfigExt;

    /// Type configuration used by the unit tests in this module, with
    /// `TokioRuntime` as the async runtime.
    #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
    pub(crate) struct UTConfig {}

    impl RaftTypeConfig for UTConfig {
        type D = u64;
        type R = ();
        type NodeId = u64;
        type Node = ();
        type Term = u64;
        type LeaderId = crate::impls::leader_id_adv::LeaderId<u64, u64>;
        type Vote = crate::impls::Vote<Self::LeaderId>;
        type Entry =
            crate::Entry<<Self::LeaderId as crate::vote::RaftLeaderId>::Committed, Self::D, Self::NodeId, Self::Node>;
        type SnapshotData = Cursor<Vec<u8>>;
        type AsyncRuntime = TokioRuntime;
        type Responder<T>
            = crate::impls::OneshotResponder<Self, T>
        where T: OptionalSend + 'static;
        type Batch<T>
            = crate::impls::InlineBatch<T>
        where T: OptionalSend + 'static;
        type ErrorSource = anyerror::AnyError;
    }

    /// Items sent before the sender is dropped come out of the stream in order,
    /// after which the stream ends.
    #[test]
    fn test_mpsc_to_stream() {
        UTConfig::run(async {
            let (tx, rx) = UTConfig::mpsc::<u64>(16);
            let stream = UTConfig::mpsc_to_stream(rx);
            futures_util::pin_mut!(stream);

            for value in 1..=3u64 {
                tx.send(value).await.unwrap();
            }
            // Dropping the only sender lets the stream terminate.
            drop(tx);

            let received: Vec<u64> = stream.collect().await;
            assert_eq!(received, vec![1, 2, 3]);
        });
    }

    /// A channel whose sender is dropped without sending yields an empty stream.
    #[test]
    fn test_mpsc_to_stream_empty() {
        UTConfig::run(async {
            let (tx, rx) = UTConfig::mpsc::<u64>(16);
            let stream = UTConfig::mpsc_to_stream(rx);
            futures_util::pin_mut!(stream);

            drop(tx);

            assert!(stream.next().await.is_none());
        });
    }

    /// Compile-time probe: only accepts values whose type is `'static`.
    fn _assert_static<T>(_: T)
    where T: 'static {
    }

    /// The stream returned by `mpsc_to_stream` must satisfy a `'static` bound.
    #[test]
    fn test_mpsc_to_stream_is_static() {
        let (_, rx) = UTConfig::mpsc::<u64>(16);
        _assert_static(UTConfig::mpsc_to_stream(rx));
    }
}