1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//! User space messages.
//!
//! To setup a [`MsgListener`] use [`msg_listener`]. It returns the listener as
//! well as a [`MsgToken`], which can be used in [`try_send_msg`] and
//! [`send_msg`] to send a message to the created `MsgListener`.

use std::future::Future;
use std::io;
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::task::{self, Poll};

use crate::{OpIndex, SubmissionQueue};

/// Token used to the messages.
///
/// See [`msg_listener`].
#[derive(Copy, Clone, Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct MsgToken(pub(crate) OpIndex);

/// Setup a listener for user space messages.
///
/// The returned [`MsgListener`] will return all messages send using
/// [`try_send_msg`] and [`send_msg`] using the returned `MsgToken`.
///
/// # Notes
///
/// This will return an error if too many operations are already queued,
/// this is usually resolved by calling [`Ring::poll`].
///
/// The returned `MsgToken` has an implicitly lifetime linked to
/// `MsgListener`. If `MsgListener` is dropped the `MsgToken` will
/// become invalid.
///
/// Due to the limitations mentioned above it's advised to consider the
/// usefulness of the type severly limited. The returned `MsgListener`
/// iterator should live for the entire lifetime of the `Ring`, to ensure we
/// don't use `MsgToken` after it became invalid. Furthermore to ensure
/// the creation of it succeeds it should be done early in the lifetime of
/// `Ring`.
///
/// [`Ring::poll`]: crate::Ring::poll
#[allow(clippy::module_name_repetitions)]
pub fn msg_listener(sq: SubmissionQueue) -> io::Result<(MsgListener, MsgToken)> {
    let op_index = sq.queue_multishot()?;
    Ok((MsgListener { sq, op_index }, MsgToken(op_index)))
}

/// [`AsyncIterator`] behind [`msg_listener`].
///
/// [`AsyncIterator`]: std::async_iter::AsyncIterator
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
#[allow(clippy::module_name_repetitions)]
pub struct MsgListener {
    sq: SubmissionQueue,
    op_index: OpIndex,
}

impl MsgListener {
    /// This is the same as the `AsyncIterator::poll_next` function, but then
    /// available on stable Rust.
    pub fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Option<u32>> {
        log::trace!(op_index = self.op_index.0; "polling multishot messages");
        if let Some(operation) = self.sq.shared.queued_ops.get(self.op_index.0) {
            let mut operation = operation.lock().unwrap();
            if let Some(op) = &mut *operation {
                return match op.poll_msg(ctx) {
                    Poll::Ready(data) => Poll::Ready(Some(data.1)),
                    Poll::Pending => Poll::Pending,
                };
            }
        }
        panic!("a10::MsgListener called incorrectly");
    }
}

#[cfg(feature = "nightly")]
impl std::async_iter::AsyncIterator for MsgListener {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_next(ctx)
    }
}

/// Try to send a message to iterator listening for message using [`MsgToken`].
///
/// This will use the io_uring submission queue to share `data` with the
/// receiving end. This means that it will wake up the thread if it's
/// currently [polling].
///
/// This will fail if the submission queue is currently full. See [`send_msg`]
/// for a version that tries again when the submission queue is full.
///
/// See [`msg_listener`] for examples.
///
/// [polling]: crate::Ring::poll
#[allow(clippy::module_name_repetitions)]
pub fn try_send_msg(sq: &SubmissionQueue, token: MsgToken, data: u32) -> io::Result<()> {
    sq.add_no_result(|submission| unsafe {
        submission.msg(sq.shared.ring_fd.as_raw_fd(), (token.0).0 as u64, data, 0);
        submission.no_completion_event();
    })?;
    Ok(())
}

/// Send a message to iterator listening for message using [`MsgToken`].
#[allow(clippy::module_name_repetitions)]
pub const fn send_msg<'sq>(sq: &'sq SubmissionQueue, token: MsgToken, data: u32) -> SendMsg<'sq> {
    SendMsg { sq, token, data }
}

/// [`Future`] behind [`send_msg`].
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
#[allow(clippy::module_name_repetitions)]
pub struct SendMsg<'sq> {
    sq: &'sq SubmissionQueue,
    token: MsgToken,
    data: u32,
}

impl<'sq> Future for SendMsg<'sq> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        match try_send_msg(self.sq, self.token, self.data) {
            Ok(()) => Poll::Ready(Ok(())),
            Err(_) => {
                self.sq.wait_for_submission(ctx.waker().clone());
                Poll::Pending
            }
        }
    }
}