protosocket_rpc/client/reactor/
completion_streaming.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use super::completion_registry::CompletionGuard;
7use crate::Message;
8
9#[derive(Debug)]
14pub struct StreamingCompletion<Response, Request>
15where
16 Response: Message,
17 Request: Message,
18{
19 completion: spillway::Receiver<Response>,
20 completion_guard: CompletionGuard<Response, Request>,
21 closed: bool,
22}
23
24impl<Response, Request> Unpin for StreamingCompletion<Response, Request>
26where
27 Response: Message,
28 Request: Message,
29{
30}
31
32impl<Response, Request> StreamingCompletion<Response, Request>
33where
34 Response: Message,
35 Request: Message,
36{
37 pub(crate) fn new(
38 completion: spillway::Receiver<Response>,
39 completion_guard: CompletionGuard<Response, Request>,
40 ) -> Self {
41 Self {
42 completion,
43 completion_guard,
44 closed: false,
45 }
46 }
47}
48
49impl<Response, Request> futures::Stream for StreamingCompletion<Response, Request>
50where
51 Response: Message,
52 Request: Message,
53{
54 type Item = crate::Result<Response>;
55
56 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 if self.closed {
58 return Poll::Ready(None);
59 }
60 match self.completion.poll_next(context) {
61 Poll::Ready(Some(next)) => Poll::Ready(Some(Ok(next))),
62 Poll::Ready(None) => {
63 self.closed = true;
64 self.completion_guard.set_closed();
65 Poll::Ready(Some(Err(crate::Error::Finished)))
66 }
67 Poll::Pending => Poll::Pending,
68 }
69 }
70}