serf_rpc/
stream.rs

1use std::{
2    collections::VecDeque,
3    sync::{Arc, Mutex},
4    task::{Poll, Waker},
5};
6
7use futures::Stream;
8
9use crate::{Client, RPCResponse, RPCResult, SeqHandler, SeqRead, SerializedCommand};
10
11impl Client {
12    /// Sends a command and registers a streaming sequence handler.
13    ///
14    /// Note that the request is sent immediately (asyncronously, but not lazily).
15    pub(crate) fn start_stream<R: RPCResponse>(
16        &self,
17        name: &'static str,
18        body: Vec<u8>,
19    ) -> RPCStream<R> {
20        let handler = Arc::new(Mutex::new(RPCStreamHandler {
21            waker: None,
22            queue: VecDeque::new(),
23        }));
24
25        let seq = self.send_command(SerializedCommand { name, body }, Some(handler.clone()));
26
27        RPCStream {
28            seq,
29            client: self.clone(),
30            handler,
31        }
32    }
33}
34
35pub struct RPCStream<R: RPCResponse> {
36    client: Client,
37    seq: u64,
38    handler: Arc<Mutex<RPCStreamHandler<R>>>,
39}
40
41pub(crate) struct RPCStreamHandler<R: RPCResponse> {
42    waker: Option<Waker>,
43    queue: VecDeque<RPCResult<R>>,
44}
45
46impl<T: RPCResponse> SeqHandler for Mutex<RPCStreamHandler<T>> {
47    fn handle(&self, res: RPCResult<SeqRead>) {
48        let RPCStreamHandler { waker, queue } = &mut *self.lock().unwrap();
49
50        let res = res.and_then(T::read_from);
51        queue.push_back(res);
52
53        if let Some(waker) = waker.take() {
54            waker.wake()
55        }
56    }
57    fn streaming(&self) -> bool {
58        true
59    }
60}
61
62impl<T: RPCResponse> Drop for RPCStream<T> {
63    fn drop(&mut self) {
64        self.client.deregister_seq_handler(self.seq);
65        self.client.stop_stream(self.seq).send_ignored();
66    }
67}
68
69impl<C: RPCResponse> Stream for RPCStream<C> {
70    type Item = RPCResult<C>;
71
72    fn poll_next(
73        self: std::pin::Pin<&mut Self>,
74        cx: &mut std::task::Context<'_>,
75    ) -> Poll<Option<Self::Item>> {
76        let RPCStreamHandler { waker, queue } = &mut *self.handler.lock().unwrap();
77
78        if let Some(res) = queue.pop_front() {
79            return Poll::Ready(Some(res));
80        };
81
82        waker.replace(cx.waker().clone());
83
84        Poll::Pending
85    }
86}