mikrotik_api/api/call/
streaming.rs1use std::fmt::Debug;
2use std::{
3 sync::{Arc, Mutex},
4 task::Poll,
5};
6
7use futures::Stream;
8use serde::de::DeserializeOwned;
9use tokio::sync::{
10 mpsc::{self, UnboundedReceiver, UnboundedSender},
11 OnceCell,
12};
13
14use crate::api::{de::deserialize_sentence, Response};
15
16use super::{AsyncCall, CallError};
17pub struct StreamingCall<T> {
18 inner: Arc<Mutex<InnerStreamingCall<Response<T>>>>,
19 }
21
22struct InnerStreamingCall<T> {
23 receiver: UnboundedReceiver<T>,
24 sender: UnboundedSender<T>,
25 cell: OnceCell<()>,
26}
27
28impl<T> InnerStreamingCall<T> {
29 pub fn done(&mut self) -> Result<(), CallError> {
30 self.cell
31 .set(())
32 .map_err(|_| CallError::DoneAlreadyHappened)
33 }
34}
35
36impl<T> StreamingCall<T> {
37 pub fn new() -> Self {
38 let (sender, receiver) = mpsc::unbounded_channel();
39
40 let inner = Arc::new(Mutex::new(InnerStreamingCall {
41 sender,
42 receiver,
43 cell: OnceCell::new(),
44 }));
45
46 Self { inner }
47 }
48}
49
50impl<T: DeserializeOwned + Debug> AsyncCall for StreamingCall<T> {
51 fn push_reply(&mut self, sentence: Vec<String>) -> Result<(), CallError> {
52 let value = deserialize_sentence(sentence.as_slice())?;
53
54 if let Ok(inner) = self.inner.lock() {
55 let _ = inner.sender.send(value);
56
57 return Ok(());
58 }
59
60 Err(CallError::BadLock)
61 }
62
63 fn done(&mut self) -> Result<(), CallError> {
64 if let Ok(mut call) = self.inner.lock() {
65 call.done()?;
66
67 return Ok(());
68 }
69
70 Err(CallError::BadLock)
71 }
72}
73
74impl<T> Clone for StreamingCall<T> {
75 fn clone(&self) -> Self {
76 Self {
77 inner: self.inner.clone(),
78 }
79 }
80}
81
82impl<T> Stream for StreamingCall<T> {
83 type Item = Response<T>;
84
85 fn poll_next(
86 self: std::pin::Pin<&mut Self>,
87 cx: &mut std::task::Context<'_>,
88 ) -> std::task::Poll<Option<Self::Item>> {
89 if let Ok(mut inner) = self.inner.lock() {
90 let next_value = inner.receiver.poll_recv(cx);
91
92 if let Poll::Ready(Some(Response::Done)) = next_value {
93 return Poll::Ready(None);
95 }
96
97 return next_value;
98 }
99
100 Poll::Pending
101 }
102}