use std::future::Future;
use std::io;
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::task::{self, Poll};
use crate::{OpIndex, SubmissionQueue};
/// Token identifying a [`MsgListener`], used as the destination when sending
/// a message with [`try_send_msg`] or [`send_msg`].
///
/// Wraps the [`OpIndex`] of the operation queued by [`msg_listener`]. The token
/// is `Copy`, so it can be handed out to any number of senders.
#[derive(Copy, Clone, Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct MsgToken(pub(crate) OpIndex);
/// Set up a listener for user-space messages.
///
/// Queues a multishot operation on `sq` and returns the [`MsgListener`] that
/// polls it together with the [`MsgToken`] that senders pass to
/// [`try_send_msg`] / [`send_msg`] to reach this listener.
///
/// # Errors
///
/// Returns whatever error `SubmissionQueue::queue_multishot` reports
/// (presumably that no more operations can be queued -- confirm against its
/// definition).
#[allow(clippy::module_name_repetitions)]
pub fn msg_listener(sq: SubmissionQueue) -> io::Result<(MsgListener, MsgToken)> {
    let op_index = sq.queue_multishot()?;
    // The listener and the token share the same operation index; that index is
    // what ties a sent message to this listener.
    let token = MsgToken(op_index);
    let listener = MsgListener { sq, op_index };
    Ok((listener, token))
}
/// Receiving side for messages sent with [`try_send_msg`] or [`send_msg`].
///
/// Created by [`msg_listener`]. Messages are retrieved by polling
/// [`MsgListener::poll_next`]; with the `nightly` feature enabled the type also
/// implements `std::async_iter::AsyncIterator`.
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
#[allow(clippy::module_name_repetitions)]
pub struct MsgListener {
/// Submission queue whose shared state holds the queued operation.
sq: SubmissionQueue,
/// Index of this listener's operation in the queue's `queued_ops`.
op_index: OpIndex,
}
impl MsgListener {
    /// Poll for the next message sent to this listener.
    ///
    /// Returns `Poll::Ready(Some(data))` with the `u32` payload once the
    /// queued operation reports a message, and `Poll::Pending` otherwise. The
    /// code below never produces `Poll::Ready(None)`.
    ///
    /// # Panics
    ///
    /// Panics if no operation is queued at this listener's index, or if the
    /// mutex guarding the operation is poisoned.
    pub fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Option<u32>> {
        let index = self.op_index.0;
        log::trace!(op_index = index; "polling multishot messages");
        if let Some(slot) = self.sq.shared.queued_ops.get(index) {
            let mut guard = slot.lock().unwrap();
            if let Some(op) = guard.as_mut() {
                // Only the second element of the polled value is the message
                // payload; the first element is discarded.
                return op.poll_msg(ctx).map(|data| Some(data.1));
            }
        }
        // Reached only after the lock guard (if any) has been released, so the
        // panic does not poison the mutex.
        panic!("a10::MsgListener called incorrectly");
    }
}
/// `AsyncIterator` support (requires the `nightly` feature); yields the same
/// `u32` payloads as the inherent [`MsgListener::poll_next`].
#[cfg(feature = "nightly")]
impl std::async_iter::AsyncIterator for MsgListener {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        // Path syntax resolves to the inherent method (inherent items take
        // precedence over trait items), so this delegates rather than recurses.
        MsgListener::poll_next(self, ctx)
    }
}
/// Try to send `data` to the [`MsgListener`] identified by `token`.
///
/// Adds a message submission targeting the ring's own file descriptor, with
/// the token's operation index attached so the message reaches the matching
/// listener. No completion event is requested for the send itself, so there is
/// no result to wait for.
///
/// # Errors
///
/// Returns an error if the submission could not be added (presumably because
/// the submission queue is full -- confirm against `add_no_result`). Use
/// [`send_msg`] to wait for room instead of failing.
#[allow(clippy::module_name_repetitions)]
pub fn try_send_msg(sq: &SubmissionQueue, token: MsgToken, data: u32) -> io::Result<()> {
    let ring_fd = sq.shared.ring_fd.as_raw_fd();
    let listener_index = (token.0).0 as u64;
    // SAFETY: NOTE(review) -- the contract of `msg`/`no_completion_event` is
    // defined outside this file; the submission only carries plain integers
    // (fd, index, payload), so nothing here borrows memory the kernel could
    // outlive. Confirm against the submission API.
    sq.add_no_result(|submission| unsafe {
        // Trailing `0`: presumably the message flags (none set) -- TODO confirm.
        submission.msg(ring_fd, listener_index, data, 0);
        submission.no_completion_event();
    })?;
    Ok(())
}
/// Send `data` to the [`MsgListener`] identified by `token`.
///
/// Returns a [`SendMsg`] future; nothing is submitted until it is polled. On
/// each poll the future attempts [`try_send_msg`] and, if that fails, waits
/// for the submission queue before trying again.
#[allow(clippy::module_name_repetitions)]
pub const fn send_msg<'sq>(sq: &'sq SubmissionQueue, token: MsgToken, data: u32) -> SendMsg<'sq> {
SendMsg { sq, token, data }
}
/// [`Future`] behind [`send_msg`].
///
/// Resolves to `Ok(())` once the message submission has been added to the
/// submission queue.
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
#[allow(clippy::module_name_repetitions)]
pub struct SendMsg<'sq> {
/// Submission queue the message is submitted to.
sq: &'sq SubmissionQueue,
/// Token of the listener that should receive the message.
token: MsgToken,
/// Payload delivered to the listener.
data: u32,
}
impl<'sq> Future for SendMsg<'sq> {
    type Output = io::Result<()>;

    /// Attempt to submit the message; if that fails, register the task's waker
    /// with the submission queue and report `Pending` so the send is retried
    /// on the next wake-up.
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        if try_send_msg(self.sq, self.token, self.data).is_ok() {
            return Poll::Ready(Ok(()));
        }
        // NOTE(review): every error from `try_send_msg` is treated as "no room
        // in the submission queue" and discarded -- confirm that it cannot fail
        // for any other reason, otherwise this future never resolves.
        self.sq.wait_for_submission(ctx.waker().clone());
        Poll::Pending
    }
}