Skip to main content

protosocket_rpc/client/reactor/
completion_streaming.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use super::completion_registry::CompletionGuard;
7use crate::Message;
8
9/// A completion for a streaming RPC.
10///
11/// Make sure you process this stream quickly, and drop data yourself if you have to. The
12/// server will send data as quickly as it can.
13#[derive(Debug)]
14pub struct StreamingCompletion<Response, Request>
15where
16    Response: Message,
17    Request: Message,
18{
19    completion: spillway::Receiver<Response>,
20    completion_guard: CompletionGuard<Response, Request>,
21    closed: bool,
22}
23
24/// SAFETY: There is no unsafe code in this implementation
25impl<Response, Request> Unpin for StreamingCompletion<Response, Request>
26where
27    Response: Message,
28    Request: Message,
29{
30}
31
32impl<Response, Request> StreamingCompletion<Response, Request>
33where
34    Response: Message,
35    Request: Message,
36{
37    pub(crate) fn new(
38        completion: spillway::Receiver<Response>,
39        completion_guard: CompletionGuard<Response, Request>,
40    ) -> Self {
41        Self {
42            completion,
43            completion_guard,
44            closed: false,
45        }
46    }
47}
48
49impl<Response, Request> futures::Stream for StreamingCompletion<Response, Request>
50where
51    Response: Message,
52    Request: Message,
53{
54    type Item = crate::Result<Response>;
55
56    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57        if self.closed {
58            return Poll::Ready(None);
59        }
60        match self.completion.poll_next(context) {
61            Poll::Ready(Some(next)) => Poll::Ready(Some(Ok(next))),
62            Poll::Ready(None) => {
63                self.closed = true;
64                self.completion_guard.set_closed();
65                Poll::Ready(Some(Err(crate::Error::Finished)))
66            }
67            Poll::Pending => Poll::Pending,
68        }
69    }
70}