#[cfg(feature = "unstable-stream")]
use crate::PollingResult;
use crate::{Poller, Result, sealed};
use google_cloud_gax::polling_state::PollingState;
/// A value that holds one of two alternative poller types.
///
/// This lets a function choose between two different concrete pollers at
/// runtime while still returning a single type: when both `A` and `B`
/// implement `Poller`, so does `Either<A, B>`, and every call is forwarded to
/// whichever variant is present.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Either<A, B> {
    /// Wraps a value of the first type, `A`.
    Left(A),
    /// Wraps a value of the second type, `B`.
    Right(B),
}
impl<A, B> sealed::Poller for Either<A, B>
where
A: sealed::Poller + Send,
B: sealed::Poller + Send,
{
async fn backoff(&mut self, state: &PollingState) {
match self {
Self::Left(s) => s.backoff(state).await,
Self::Right(s) => s.backoff(state).await,
}
}
}
impl<A, B, ResponseType, MetadataType> Poller<ResponseType, MetadataType> for Either<A, B>
where
A: Poller<ResponseType, MetadataType>,
B: Poller<ResponseType, MetadataType>,
ResponseType: Send,
MetadataType: Send,
{
async fn poll(&mut self) -> Option<crate::PollingResult<ResponseType, MetadataType>> {
match self {
Self::Left(s) => s.poll().await,
Self::Right(s) => s.poll().await,
}
}
async fn until_done(self) -> Result<ResponseType> {
match self {
Self::Left(s) => s.until_done().await,
Self::Right(s) => s.until_done().await,
}
}
#[cfg(feature = "unstable-stream")]
fn into_stream(
self,
) -> impl futures::Stream<Item = PollingResult<ResponseType, MetadataType>> + Unpin {
match self {
Self::Left(s) => futures::future::Either::Left(s.into_stream()),
Self::Right(s) => futures::future::Either::Right(s.into_stream()),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PollingResult;
    use crate::sealed::Poller as _;
    use google_cloud_wkt::{Duration, Timestamp};
    use mockall::mock;

    // Concrete response and metadata types used to instantiate the generic
    // poller traits in these tests.
    type ResponseType = Duration;
    type MetadataType = Timestamp;

    // Two separate mock types, so the tests can tell which side of the
    // `Either` received a call.
    mock! {
        PollerA {}
        impl sealed::Poller for PollerA {
            async fn backoff(&mut self, state: &PollingState);
        }
        impl Poller<ResponseType, MetadataType> for PollerA {
            async fn poll(&mut self) -> Option<PollingResult<ResponseType, MetadataType>>;
            async fn until_done(self) -> google_cloud_gax::Result<ResponseType>;
            #[cfg(feature = "unstable-stream")]
            fn into_stream(
                self,
            ) -> impl futures::Stream<Item = PollingResult<ResponseType, MetadataType>> + Unpin;
        }
    }

    mock! {
        PollerB {}
        impl sealed::Poller for PollerB {
            async fn backoff(&mut self, state: &PollingState);
        }
        impl Poller<ResponseType, MetadataType> for PollerB {
            async fn poll(&mut self) -> Option<PollingResult<ResponseType, MetadataType>>;
            async fn until_done(self) -> google_cloud_gax::Result<ResponseType>;
            #[cfg(feature = "unstable-stream")]
            fn into_stream(
                self,
            ) -> impl futures::Stream<Item = PollingResult<ResponseType, MetadataType>> + Unpin;
        }
    }

    /// `poll()` and `backoff()` on a `Left` value each reach the wrapped
    /// `MockPollerA` exactly once.
    #[tokio::test]
    async fn test_either_poller_left() {
        let mut inner = MockPollerA::new();
        inner.expect_poll().times(1).returning(|| None);
        inner.expect_backoff().times(1).returning(|_| ());

        let mut either = Either::<MockPollerA, MockPollerB>::Left(inner);
        either.poll().await;
        either.backoff(&PollingState::default()).await;
    }

    /// `poll()` and `backoff()` on a `Right` value each reach the wrapped
    /// `MockPollerB` exactly once.
    #[tokio::test]
    async fn test_either_poller_right() {
        let mut inner = MockPollerB::new();
        inner.expect_poll().times(1).returning(|| None);
        inner.expect_backoff().times(1).returning(|_| ());

        let mut either = Either::<MockPollerA, MockPollerB>::Right(inner);
        either.poll().await;
        either.backoff(&PollingState::default()).await;
    }

    /// Compile-time check: a function can return `impl Poller` while choosing
    /// between two different concrete pollers.
    #[tokio::test]
    async fn test_return_impl_base_poller() {
        fn factory(use_a: bool) -> impl Poller<ResponseType, MetadataType> {
            match use_a {
                true => Either::Left(MockPollerA::new()),
                false => Either::Right(MockPollerB::new()),
            }
        }

        let _poller_a = factory(true);
        let _poller_b = factory(false);
    }

    /// `until_done()` on a `Left` value returns the wrapped poller's result.
    #[tokio::test]
    async fn test_either_until_done_left() {
        let mut inner = MockPollerA::new();
        inner
            .expect_until_done()
            .times(1)
            .returning(|| Ok(Duration::clamp(123, 456)));

        let either = Either::<MockPollerA, MockPollerB>::Left(inner);
        let got = either.until_done().await.unwrap();
        assert_eq!(got, Duration::clamp(123, 456));
    }

    /// `until_done()` on a `Right` value returns the wrapped poller's result.
    #[tokio::test]
    async fn test_either_until_done_right() {
        let mut inner = MockPollerB::new();
        inner
            .expect_until_done()
            .times(1)
            .returning(|| Ok(Duration::clamp(123, 456)));

        let either = Either::<MockPollerA, MockPollerB>::Right(inner);
        let got = either.until_done().await.unwrap();
        assert_eq!(got, Duration::clamp(123, 456));
    }

    /// `into_stream()` on a `Left` value yields the wrapped poller's stream.
    #[cfg(feature = "unstable-stream")]
    #[tokio::test]
    async fn test_either_into_stream_left() {
        use futures::StreamExt;

        let mut inner = MockPollerA::new();
        inner
            .expect_into_stream()
            .times(1)
            .returning(|| Box::pin(futures::stream::empty()));

        let either = Either::<MockPollerA, MockPollerB>::Left(inner);
        let mut stream = either.into_stream();
        let first = stream.next().await;
        assert!(first.is_none());
    }

    /// `into_stream()` on a `Right` value yields the wrapped poller's stream.
    #[cfg(feature = "unstable-stream")]
    #[tokio::test]
    async fn test_either_into_stream_right() {
        use futures::StreamExt;

        let mut inner = MockPollerB::new();
        inner
            .expect_into_stream()
            .times(1)
            .returning(|| Box::pin(futures::stream::empty()));

        let either = Either::<MockPollerA, MockPollerB>::Right(inner);
        let mut stream = either.into_stream();
        let first = stream.next().await;
        assert!(first.is_none());
    }
}