protosocket_rpc/client/reactor/
completion_unary.rs

1use std::{
2    future::Future,
3    pin::{pin, Pin},
4    task::{Context, Poll},
5};
6
7use tokio::sync::oneshot;
8
9use super::completion_registry::CompletionGuard;
10use crate::Message;
11
12/// A completion for a unary RPC.
13#[derive(Debug)]
14pub struct UnaryCompletion<Response, Request>
15where
16    Response: Message,
17    Request: Message,
18{
19    completion: oneshot::Receiver<crate::Result<Response>>,
20    completion_guard: CompletionGuard<Response, Request>,
21}
22
23impl<Response, Request> UnaryCompletion<Response, Request>
24where
25    Response: Message,
26    Request: Message,
27{
28    pub(crate) fn new(
29        completion: oneshot::Receiver<crate::Result<Response>>,
30        completion_guard: CompletionGuard<Response, Request>,
31    ) -> Self {
32        Self {
33            completion,
34            completion_guard,
35        }
36    }
37}
38
39impl<Response, Request> Future for UnaryCompletion<Response, Request>
40where
41    Response: Message,
42    Request: Message,
43{
44    type Output = crate::Result<Response>;
45
46    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
47        match pin!(&mut self.completion).poll(context) {
48            Poll::Ready(result) => {
49                self.completion_guard.set_closed();
50                match result {
51                    Ok(done) => Poll::Ready(done),
52                    Err(_cancelled) => Poll::Ready(Err(crate::Error::CancelledRemotely)),
53                }
54            }
55            Poll::Pending => Poll::Pending,
56        }
57    }
58}