Skip to main content

mongodb/runtime/
acknowledged_message.rs

1use std::{pin::Pin, task::Poll};
2
3use futures_core::Future;
4
5/// A message type that includes an acknowledgement mechanism.
6/// When this is dropped or `acknowledge` is called, the sender will be notified.
7#[derive(Debug)]
8pub(crate) struct AcknowledgedMessage<M, R = ()> {
9    acknowledger: AcknowledgmentSender<R>,
10    message: M,
11}
12
13impl<M, R> AcknowledgedMessage<M, R> {
14    /// Create a new message and return it along with the AcknowledgmentReceiver that will
15    /// be notified when the message is received or when it is dropped.
16    pub(crate) fn package(message: M) -> (Self, AcknowledgmentReceiver<R>) {
17        let (sender, receiver) = tokio::sync::oneshot::channel();
18        (
19            Self {
20                message,
21                acknowledger: AcknowledgmentSender { sender },
22            },
23            AcknowledgmentReceiver { receiver },
24        )
25    }
26
27    /// Send acknowledgement to the receiver.
28    pub(crate) fn acknowledge(self, result: impl Into<R>) {
29        self.acknowledger.acknowledge(result)
30    }
31
32    pub(crate) fn into_parts(self) -> (M, AcknowledgmentSender<R>) {
33        (self.message, self.acknowledger)
34    }
35}
36
37impl<M, R> std::ops::Deref for AcknowledgedMessage<M, R> {
38    type Target = M;
39
40    fn deref(&self) -> &Self::Target {
41        &self.message
42    }
43}
44
45#[derive(Debug)]
46pub(crate) struct AcknowledgmentSender<R> {
47    sender: tokio::sync::oneshot::Sender<R>,
48}
49
50impl<R> AcknowledgmentSender<R> {
51    /// Send acknowledgement to the receiver.
52    pub(crate) fn acknowledge(self, result: impl Into<R>) {
53        // returns an error when the other end hangs up e.g. due to a timeout.
54        let _: std::result::Result<_, _> = self.sender.send(result.into());
55    }
56}
57
58/// Receiver for the acknowledgement that the message was received or dropped.
59pub(crate) struct AcknowledgmentReceiver<R> {
60    receiver: tokio::sync::oneshot::Receiver<R>,
61}
62
63impl<R> AcknowledgmentReceiver<R> {
64    /// Wait for the message to be acknowledged. If this returns None, that means the message
65    /// was dropped without the receiving end explicitly sending anything back.
66    pub(crate) async fn wait_for_acknowledgment(self) -> Option<R> {
67        self.receiver.await.ok()
68    }
69}
70
71impl<R> Future for AcknowledgmentReceiver<R> {
72    type Output = Option<R>;
73
74    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
75        match Pin::new(&mut self.get_mut().receiver).poll(cx) {
76            Poll::Ready(r) => Poll::Ready(r.ok()),
77            Poll::Pending => Poll::Pending,
78        }
79    }
80}