protosocket_rpc/client/reactor/
completion_streaming.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use std::{
    pin::{pin, Pin},
    task::{Context, Poll},
};

use tokio::sync::mpsc;

use super::completion_registry::CompletionGuard;
use crate::Message;

/// A completion for a streaming RPC.
///
/// As a stream it yields each received response as `Ok`, then a single
/// `Err(crate::Error::Finished)` once the underlying channel reports no more messages, and
/// `None` on every poll after that.
///
/// Make sure you process this stream quickly, and drop data yourself if you have to. The
/// server will send data as quickly as it can, and the receiving channel is unbounded.
#[derive(Debug)]
pub struct StreamingCompletion<Response>
where
    Response: Message,
{
    /// Receiving half of the unbounded channel that responses arrive on. `poll_next` drains it
    /// in batches.
    completion: mpsc::UnboundedReceiver<Response>,
    /// Never read in this file; held only so that it is dropped together with the stream.
    /// NOTE(review): presumably unregisters this completion when dropped - confirm in
    /// `completion_registry`.
    _completion_guard: CompletionGuard<Response>,
    /// Set once the channel has reported zero messages; afterwards `poll_next` returns `None`.
    closed: bool,
    /// Batch buffer filled from the channel. It is kept reversed so that `pop` hands responses
    /// out in arrival order.
    nexts: Vec<Response>,
}

/// `Unpin` is a safe marker trait, so this impl involves no `unsafe` code. `poll_next` only
/// reaches the fields through a plain `&mut Self` and never relies on them staying pinned.
impl<Response> Unpin for StreamingCompletion<Response> where Response: Message {}

/// Maximum number of responses pulled from the channel by one `poll_recv_many` call; also the
/// initial capacity of the `nexts` batch buffer.
const LIMIT: usize = 16;

impl<Response> StreamingCompletion<Response>
where
    Response: Message,
{
    /// Wraps the receiving half of a response channel together with its `CompletionGuard`.
    ///
    /// The stream starts open, with an empty batch buffer preallocated for `LIMIT` responses.
    pub(crate) fn new(
        completion: mpsc::UnboundedReceiver<Response>,
        completion_guard: CompletionGuard<Response>,
    ) -> Self {
        let nexts = Vec::with_capacity(LIMIT);
        let closed = false;
        Self {
            completion,
            _completion_guard: completion_guard,
            closed,
            nexts,
        }
    }
}

impl<Response> futures::Stream for StreamingCompletion<Response>
where
    Response: Message,
{
    type Item = crate::Result<Response>;

    /// Yields buffered responses in arrival order, refilling the buffer from the channel in
    /// batches of at most `LIMIT`.
    ///
    /// When a refill reports zero messages, a single `Err(crate::Error::Finished)` is yielded
    /// and the stream is marked closed; every later poll returns `None`.
    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // One plain reborrow up front lets the fields be borrowed independently below.
        let this = &mut *self;
        if this.closed {
            return Poll::Ready(None);
        }

        if this.nexts.is_empty() {
            let refill =
                pin!(&mut this.completion).poll_recv_many(context, &mut this.nexts, LIMIT);
            let Poll::Ready(received) = refill else {
                return Poll::Pending;
            };
            if received == 0 {
                this.closed = true;
                return Poll::Ready(Some(Err(crate::Error::Finished)));
            }
            // `poll_recv_many` appends to a `Vec` in arrival order, while `pop` takes from the
            // back, so flip the batch once to hand the oldest response out first.
            this.nexts.reverse();
        }

        if let Some(response) = this.nexts.pop() {
            return Poll::Ready(Some(Ok(response)));
        }
        // Not expected: the buffer was either non-empty already or has just been refilled.
        log::error!("unexpected empty nexts");
        Poll::Ready(None)
    }
}