use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use tokio::sync::oneshot;
/// A future that yields the outcome of a publish operation.
///
/// The value is delivered through a one-shot channel: awaiting this future
/// waits for the sending half to provide either a `String` on success
/// (presumably the ID assigned to the published message; confirm against the
/// sender) or a [`crate::error::PublishError`] on failure.
pub struct PublishFuture {
    /// Receiving half of the one-shot channel carrying the publish outcome.
    pub(crate) rx: oneshot::Receiver<std::result::Result<String, crate::error::PublishError>>,
}
impl Future for PublishFuture {
type Output = std::result::Result<String, crate::error::PublishError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = ready!(Pin::new(&mut self.rx).poll(cx));
match result {
Ok(result) => Poll::Ready(result),
Err(_) => Poll::Ready(Err(crate::error::PublishError::Shutdown)),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::PublishError;

    /// Payload type carried by the channel behind a `PublishFuture`.
    type Outcome = std::result::Result<String, PublishError>;

    /// Builds a `PublishFuture` together with the sender that resolves it.
    fn future_with_sender() -> (oneshot::Sender<Outcome>, PublishFuture) {
        let (tx, rx) = oneshot::channel();
        (tx, PublishFuture { rx })
    }

    /// A successful value sent on the channel is what the future yields.
    #[tokio::test]
    async fn resolve_publish_future_success() -> anyhow::Result<()> {
        let (sender, future) = future_with_sender();
        let _ = sender.send(Ok("message_id".to_string()));
        let message_id = future.await?;
        assert_eq!(message_id, "message_id");
        Ok(())
    }

    /// An error sent on the channel is forwarded unchanged.
    #[tokio::test]
    async fn resolve_publish_future_error() -> anyhow::Result<()> {
        let (sender, future) = future_with_sender();
        let _ = sender.send(Err(PublishError::OrderingKeyPaused));
        let res = future.await;
        let forwarded = matches!(res, Err(PublishError::OrderingKeyPaused));
        assert!(forwarded, "{res:?}");
        Ok(())
    }

    /// Dropping the sender without sending resolves the future to `Shutdown`.
    #[tokio::test]
    async fn resolve_publish_future_error_send_error() -> anyhow::Result<()> {
        let (sender, future) = future_with_sender();
        drop(sender);
        let res = future.await;
        let shut_down = matches!(res, Err(PublishError::Shutdown));
        assert!(shut_down, "{res:?}");
        Ok(())
    }
}