moosicbox_stream_utils/
lib.rs

1#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
2#![warn(clippy::all, clippy::pedantic, clippy::nursery, clippy::cargo)]
3#![allow(clippy::multiple_crate_versions)]
4
5use std::{
6    sync::{Arc, RwLock, atomic::AtomicUsize},
7    task::Poll,
8};
9
10use bytes::Bytes;
11use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
12
13#[cfg(feature = "remote-bytestream")]
14pub mod remote_bytestream;
15#[cfg(feature = "stalled-monitor")]
16pub mod stalled_monitor;
17
/// Process-wide counter backing [`new_byte_writer_id`]; the first id handed out is `1`.
static CUR_ID: AtomicUsize = AtomicUsize::new(1);

/// Returns the next writer identifier from the process-wide counter.
///
/// Each call atomically increments the counter and returns the value it held
/// before the increment, so consecutive calls yield consecutive ids.
#[must_use]
pub fn new_byte_writer_id() -> usize {
    CUR_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
23
24#[derive(Clone)]
25pub struct ByteWriter {
26    pub id: usize,
27    written: Arc<RwLock<u64>>,
28    senders: Arc<RwLock<Vec<UnboundedSender<Bytes>>>>,
29}
30
31impl ByteWriter {
32    #[must_use]
33    pub fn stream(&self) -> ByteStream {
34        ByteStream::from(self)
35    }
36
37    /// # Panics
38    ///
39    /// * If the internal `RwLock` is poisoned
40    #[must_use]
41    pub fn bytes_written(&self) -> u64 {
42        *self.written.read().unwrap()
43    }
44
45    /// # Panics
46    ///
47    /// * If the internal `RwLock` is poisoned
48    pub fn close(&self) {
49        self.senders.write().unwrap().retain(|sender| {
50            if sender.send(Bytes::new()).is_err() {
51                log::debug!(
52                    "Receiver has disconnected from writer id={}. Removing sender.",
53                    self.id
54                );
55                false
56            } else {
57                true
58            }
59        });
60    }
61}
62
63impl Default for ByteWriter {
64    fn default() -> Self {
65        Self {
66            id: new_byte_writer_id(),
67            written: Arc::new(RwLock::new(0)),
68            senders: Arc::new(RwLock::new(vec![])),
69        }
70    }
71}
72
73impl std::io::Write for ByteWriter {
74    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
75        if buf.is_empty() {
76            return Ok(0);
77        }
78
79        let len = buf.len();
80
81        {
82            let written = {
83                let mut written = self.written.write().unwrap();
84                *written += len as u64;
85                *written
86            };
87            log::trace!("ByteWriter written={written}");
88
89            if self.senders.read().unwrap().is_empty() {
90                log::trace!(
91                    "No senders associated with ByteWriter writer id={}. Eating {len} bytes",
92                    self.id
93                );
94            }
95        }
96
97        log::trace!("Sending bytes buf of size {len} writer id={}", self.id);
98        let bytes: Bytes = buf.to_vec().into();
99        self.senders.write().unwrap().retain(|sender| {
100            if sender.send(bytes.clone()).is_err() {
101                log::debug!(
102                    "Receiver has disconnected from writer id={}. Removing sender.",
103                    self.id
104                );
105                false
106            } else {
107                true
108            }
109        });
110        Ok(buf.len())
111    }
112
113    fn flush(&mut self) -> std::io::Result<()> {
114        Ok(())
115    }
116}
117
118pub struct ByteStream {
119    id: usize,
120    receiver: UnboundedReceiver<Bytes>,
121}
122
#[cfg(feature = "stalled-monitor")]
impl ByteStream {
    /// Consumes this stream and wraps it in a [`stalled_monitor::StalledReadMonitor`].
    #[must_use]
    pub fn stalled_monitor(
        self,
    ) -> stalled_monitor::StalledReadMonitor<Result<Bytes, std::io::Error>, Self> {
        stalled_monitor::StalledReadMonitor::new(self)
    }
}
132
#[cfg(feature = "stalled-monitor")]
impl From<ByteStream>
    for stalled_monitor::StalledReadMonitor<Result<Bytes, std::io::Error>, ByteStream>
{
    /// Wraps `stream` in a monitor by delegating to `StalledReadMonitor::new`.
    fn from(stream: ByteStream) -> Self {
        stalled_monitor::StalledReadMonitor::new(stream)
    }
}
141
142impl futures::Stream for ByteStream {
143    type Item = Result<Bytes, std::io::Error>;
144
145    fn poll_next(
146        self: std::pin::Pin<&mut Self>,
147        cx: &mut std::task::Context<'_>,
148    ) -> Poll<Option<Self::Item>> {
149        let stream = self.get_mut();
150        match stream.receiver.poll_recv(cx) {
151            Poll::Ready(Some(response)) => {
152                log::trace!(
153                    "Received bytes buf of size {} from writer id={}",
154                    response.len(),
155                    stream.id
156                );
157                Poll::Ready(Some(Ok(response)))
158            }
159            Poll::Pending => Poll::Pending,
160            Poll::Ready(None) => Poll::Ready(None),
161        }
162    }
163}
164
165#[allow(clippy::fallible_impl_from)]
166impl From<&ByteWriter> for ByteStream {
167    fn from(value: &ByteWriter) -> Self {
168        let (sender, receiver) = unbounded_channel();
169        value.senders.write().unwrap().push(sender);
170        Self {
171            id: value.id,
172            receiver,
173        }
174    }
175}
176
177#[derive(Clone)]
178pub struct TypedWriter<T> {
179    id: usize,
180    senders: Arc<RwLock<Vec<UnboundedSender<T>>>>,
181}
182
183impl<T> TypedWriter<T> {
184    #[must_use]
185    pub fn stream(&self) -> TypedStream<T> {
186        TypedStream::from(self)
187    }
188}
189
190impl<T: Clone> TypedWriter<T> {
191    /// # Panics
192    ///
193    /// * If the internal `RwLock` is poisoned
194    pub fn write(&self, buf: T) {
195        let mut senders = self.senders.write().unwrap();
196        let mut remove = vec![];
197        let len = senders.len();
198        for (i, sender) in senders.iter().enumerate() {
199            if i == len - 1 {
200                if sender.send(buf).is_err() {
201                    log::debug!(
202                        "Receiver has disconnected from writer id={}. Removing sender.",
203                        self.id
204                    );
205                    remove.insert(0, i);
206                }
207                break;
208            } else if sender.send(buf.clone()).is_err() {
209                log::debug!(
210                    "Receiver has disconnected from writer id={}. Removing sender.",
211                    self.id
212                );
213                remove.insert(0, i);
214            }
215        }
216        for i in remove {
217            senders.remove(i);
218        }
219    }
220}
221
222impl<T> Default for TypedWriter<T> {
223    fn default() -> Self {
224        Self {
225            id: new_byte_writer_id(),
226            senders: Arc::new(RwLock::new(vec![])),
227        }
228    }
229}
230
231pub struct TypedStream<T> {
232    receiver: UnboundedReceiver<T>,
233}
234
#[cfg(feature = "stalled-monitor")]
impl<T> TypedStream<T> {
    /// Consumes this stream and wraps it in a [`stalled_monitor::StalledReadMonitor`].
    #[must_use]
    pub fn stalled_monitor(self) -> stalled_monitor::StalledReadMonitor<T, Self> {
        stalled_monitor::StalledReadMonitor::new(self)
    }
}
242
#[cfg(feature = "stalled-monitor")]
impl<T> From<TypedStream<T>> for stalled_monitor::StalledReadMonitor<T, TypedStream<T>> {
    /// Wraps `stream` in a monitor by delegating to `StalledReadMonitor::new`.
    fn from(stream: TypedStream<T>) -> Self {
        stalled_monitor::StalledReadMonitor::new(stream)
    }
}
249
250impl<T> futures::Stream for TypedStream<T> {
251    type Item = T;
252
253    fn poll_next(
254        self: std::pin::Pin<&mut Self>,
255        cx: &mut std::task::Context<'_>,
256    ) -> Poll<Option<Self::Item>> {
257        let stream = self.get_mut();
258        match stream.receiver.poll_recv(cx) {
259            Poll::Ready(Some(response)) => {
260                log::trace!("Received item");
261                Poll::Ready(Some(response))
262            }
263            Poll::Pending => Poll::Pending,
264            Poll::Ready(None) => Poll::Ready(None),
265        }
266    }
267}
268
269#[allow(clippy::fallible_impl_from)]
270impl<T> From<&TypedWriter<T>> for TypedStream<T> {
271    fn from(value: &TypedWriter<T>) -> Self {
272        let (sender, receiver) = unbounded_channel();
273        value.senders.write().unwrap().push(sender);
274        Self { receiver }
275    }
276}