1use std::{
2 collections::VecDeque,
3 sync::{Arc, Mutex},
4 task::{Poll, Waker},
5};
6
7use futures::Stream;
8
9use crate::{Client, RPCResponse, RPCResult, SeqHandler, SeqRead, SerializedCommand};
10
11impl Client {
12 pub(crate) fn start_stream<R: RPCResponse>(
16 &self,
17 name: &'static str,
18 body: Vec<u8>,
19 ) -> RPCStream<R> {
20 let handler = Arc::new(Mutex::new(RPCStreamHandler {
21 waker: None,
22 queue: VecDeque::new(),
23 }));
24
25 let seq = self.send_command(SerializedCommand { name, body }, Some(handler.clone()));
26
27 RPCStream {
28 seq,
29 client: self.clone(),
30 handler,
31 }
32 }
33}
34
35pub struct RPCStream<R: RPCResponse> {
36 client: Client,
37 seq: u64,
38 handler: Arc<Mutex<RPCStreamHandler<R>>>,
39}
40
41pub(crate) struct RPCStreamHandler<R: RPCResponse> {
42 waker: Option<Waker>,
43 queue: VecDeque<RPCResult<R>>,
44}
45
46impl<T: RPCResponse> SeqHandler for Mutex<RPCStreamHandler<T>> {
47 fn handle(&self, res: RPCResult<SeqRead>) {
48 let RPCStreamHandler { waker, queue } = &mut *self.lock().unwrap();
49
50 let res = res.and_then(T::read_from);
51 queue.push_back(res);
52
53 if let Some(waker) = waker.take() {
54 waker.wake()
55 }
56 }
57 fn streaming(&self) -> bool {
58 true
59 }
60}
61
62impl<T: RPCResponse> Drop for RPCStream<T> {
63 fn drop(&mut self) {
64 self.client.deregister_seq_handler(self.seq);
65 self.client.stop_stream(self.seq).send_ignored();
66 }
67}
68
69impl<C: RPCResponse> Stream for RPCStream<C> {
70 type Item = RPCResult<C>;
71
72 fn poll_next(
73 self: std::pin::Pin<&mut Self>,
74 cx: &mut std::task::Context<'_>,
75 ) -> Poll<Option<Self::Item>> {
76 let RPCStreamHandler { waker, queue } = &mut *self.handler.lock().unwrap();
77
78 if let Some(res) = queue.pop_front() {
79 return Poll::Ready(Some(res));
80 };
81
82 waker.replace(cx.waker().clone());
83
84 Poll::Pending
85 }
86}