protosocket_rpc/client/reactor/
completion_streaming.rs

1use std::{
2    pin::{pin, Pin},
3    task::{Context, Poll},
4};
5
6use tokio::sync::mpsc;
7
8use super::completion_registry::CompletionGuard;
9use crate::Message;
10
11/// A completion for a streaming RPC.
12///
13/// Make sure you process this stream quickly, and drop data yourself if you have to. The
14/// server will send data as quickly as it can.
15#[derive(Debug)]
16pub struct StreamingCompletion<Response, Request>
17where
18    Response: Message,
19    Request: Message,
20{
21    completion: mpsc::UnboundedReceiver<Response>,
22    completion_guard: CompletionGuard<Response, Request>,
23    closed: bool,
24    nexts: Vec<Response>,
25}
26
27/// SAFETY: There is no unsafe code in this implementation
28impl<Response, Request> Unpin for StreamingCompletion<Response, Request>
29where
30    Response: Message,
31    Request: Message,
32{
33}
34
35const LIMIT: usize = 16;
36
37impl<Response, Request> StreamingCompletion<Response, Request>
38where
39    Response: Message,
40    Request: Message,
41{
42    pub(crate) fn new(
43        completion: mpsc::UnboundedReceiver<Response>,
44        completion_guard: CompletionGuard<Response, Request>,
45    ) -> Self {
46        Self {
47            completion,
48            completion_guard,
49            closed: false,
50            nexts: Vec::with_capacity(LIMIT),
51        }
52    }
53}
54
55impl<Response, Request> futures::Stream for StreamingCompletion<Response, Request>
56where
57    Response: Message,
58    Request: Message,
59{
60    type Item = crate::Result<Response>;
61
62    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63        if self.closed {
64            return Poll::Ready(None);
65        }
66        if self.nexts.is_empty() {
67            let Self {
68                completion, nexts, ..
69            } = &mut *self;
70            let received = pin!(completion).poll_recv_many(context, nexts, LIMIT);
71            match received {
72                Poll::Ready(count) => {
73                    if count == 0 {
74                        self.closed = true;
75                        self.completion_guard.set_closed();
76                        return Poll::Ready(Some(Err(crate::Error::Finished)));
77                    }
78                    // because it is a vector, we have to consume in reverse order. This is because
79                    // of the poll_recv_many argument type.
80                    nexts.reverse();
81                }
82                Poll::Pending => return Poll::Pending,
83            }
84        }
85        match self.nexts.pop() {
86            Some(next) => Poll::Ready(Some(Ok(next))),
87            None => {
88                log::error!("unexpected empty nexts");
89                Poll::Ready(None)
90            }
91        }
92    }
93}