mikrotik_api/api/call/
streaming.rs

1use std::fmt::Debug;
2use std::{
3    sync::{Arc, Mutex},
4    task::Poll,
5};
6
7use futures::Stream;
8use serde::de::DeserializeOwned;
9use tokio::sync::{
10    mpsc::{self, UnboundedReceiver, UnboundedSender},
11    OnceCell,
12};
13
14use crate::api::{de::deserialize_sentence, Response};
15
16use super::{AsyncCall, CallError};
17pub struct StreamingCall<T> {
18    inner: Arc<Mutex<InnerStreamingCall<Response<T>>>>,
19    // sender: Arc<Mutex<Sender<Response<T>>>>
20}
21
22struct InnerStreamingCall<T> {
23    receiver: UnboundedReceiver<T>,
24    sender: UnboundedSender<T>,
25    cell: OnceCell<()>,
26}
27
28impl<T> InnerStreamingCall<T> {
29    pub fn done(&mut self) -> Result<(), CallError> {
30        self.cell
31            .set(())
32            .map_err(|_| CallError::DoneAlreadyHappened)
33    }
34}
35
36impl<T> StreamingCall<T> {
37    pub fn new() -> Self {
38        let (sender, receiver) = mpsc::unbounded_channel();
39
40        let inner = Arc::new(Mutex::new(InnerStreamingCall {
41            sender,
42            receiver,
43            cell: OnceCell::new(),
44        }));
45
46        Self { inner }
47    }
48}
49
50impl<T: DeserializeOwned + Debug> AsyncCall for StreamingCall<T> {
51    fn push_reply(&mut self, sentence: Vec<String>) -> Result<(), CallError> {
52        let value = deserialize_sentence(sentence.as_slice())?;
53
54        if let Ok(inner) = self.inner.lock() {
55            let _ = inner.sender.send(value);
56
57            return Ok(());
58        }
59
60        Err(CallError::BadLock)
61    }
62
63    fn done(&mut self) -> Result<(), CallError> {
64        if let Ok(mut call) = self.inner.lock() {
65            call.done()?;
66
67            return Ok(());
68        }
69
70        Err(CallError::BadLock)
71    }
72}
73
74impl<T> Clone for StreamingCall<T> {
75    fn clone(&self) -> Self {
76        Self {
77            inner: self.inner.clone(),
78        }
79    }
80}
81
82impl<T> Stream for StreamingCall<T> {
83    type Item = Response<T>;
84
85    fn poll_next(
86        self: std::pin::Pin<&mut Self>,
87        cx: &mut std::task::Context<'_>,
88    ) -> std::task::Poll<Option<Self::Item>> {
89        if let Ok(mut inner) = self.inner.lock() {
90            let next_value = inner.receiver.poll_recv(cx);
91
92            if let Poll::Ready(Some(Response::Done)) = next_value {
93                // A !done reply is our End Of Stream.
94                return Poll::Ready(None);
95            }
96
97            return next_value;
98        }
99
100        Poll::Pending
101    }
102}