protosocket_rpc/client/reactor/
completion_streaming.rs1use std::{
2 pin::{pin, Pin},
3 task::{Context, Poll},
4};
5
6use tokio::sync::mpsc;
7
8use super::completion_registry::CompletionGuard;
9use crate::Message;
10
11#[derive(Debug)]
16pub struct StreamingCompletion<Response, Request>
17where
18 Response: Message,
19 Request: Message,
20{
21 completion: mpsc::UnboundedReceiver<Response>,
22 completion_guard: CompletionGuard<Response, Request>,
23 closed: bool,
24 nexts: Vec<Response>,
25}
26
27impl<Response, Request> Unpin for StreamingCompletion<Response, Request>
29where
30 Response: Message,
31 Request: Message,
32{
33}
34
35const LIMIT: usize = 16;
36
37impl<Response, Request> StreamingCompletion<Response, Request>
38where
39 Response: Message,
40 Request: Message,
41{
42 pub(crate) fn new(
43 completion: mpsc::UnboundedReceiver<Response>,
44 completion_guard: CompletionGuard<Response, Request>,
45 ) -> Self {
46 Self {
47 completion,
48 completion_guard,
49 closed: false,
50 nexts: Vec::with_capacity(LIMIT),
51 }
52 }
53}
54
55impl<Response, Request> futures::Stream for StreamingCompletion<Response, Request>
56where
57 Response: Message,
58 Request: Message,
59{
60 type Item = crate::Result<Response>;
61
62 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 if self.closed {
64 return Poll::Ready(None);
65 }
66 if self.nexts.is_empty() {
67 let Self {
68 completion, nexts, ..
69 } = &mut *self;
70 let received = pin!(completion).poll_recv_many(context, nexts, LIMIT);
71 match received {
72 Poll::Ready(count) => {
73 if count == 0 {
74 self.closed = true;
75 self.completion_guard.set_closed();
76 return Poll::Ready(Some(Err(crate::Error::Finished)));
77 }
78 nexts.reverse();
81 }
82 Poll::Pending => return Poll::Pending,
83 }
84 }
85 match self.nexts.pop() {
86 Some(next) => Poll::Ready(Some(Ok(next))),
87 None => {
88 log::error!("unexpected empty nexts");
89 Poll::Ready(None)
90 }
91 }
92 }
93}