use std::future::Future;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use super::ConnectionActor;
use crate::{push::FrameLike, response::FrameStream};
impl<F, E> ConnectionActor<F, E>
where
F: FrameLike,
{
#[inline]
pub(super) async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; }
#[inline]
pub(super) async fn recv_push(rx: &mut mpsc::Receiver<F>) -> Option<F> { rx.recv().await }
#[expect(
clippy::manual_async_fn,
reason = "Generic lifetime requires explicit async move"
)]
pub(super) fn poll_optional<'a, T, Fut, R>(
opt: Option<&'a mut T>,
f: impl FnOnce(&'a mut T) -> Fut + Send + 'a,
) -> impl Future<Output = Option<R>> + Send + 'a
where
T: Send + 'a,
Fut: Future<Output = Option<R>> + Send + 'a,
{
async move {
if let Some(value) = opt {
f(value).await
} else {
None
}
}
}
pub(super) async fn poll_queue(rx: Option<&mut mpsc::Receiver<F>>) -> Option<F> {
Self::poll_optional(rx, Self::recv_push).await
}
}
impl<F, E> ConnectionActor<F, E>
where
F: FrameLike,
E: std::fmt::Debug,
{
pub(super) async fn poll_response(
resp: Option<&mut FrameStream<F, E>>,
) -> Option<Result<F, crate::response::WireframeError<E>>> {
use futures::StreamExt;
Self::poll_optional(resp, |s| s.next()).await
}
}