moosicbox_stream_utils/
lib.rs1#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
2#![warn(clippy::all, clippy::pedantic, clippy::nursery, clippy::cargo)]
3#![allow(clippy::multiple_crate_versions)]
4
5use std::{
6 sync::{Arc, RwLock, atomic::AtomicUsize},
7 task::Poll,
8};
9
10use bytes::Bytes;
11use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
12
13#[cfg(feature = "remote-bytestream")]
14pub mod remote_bytestream;
15#[cfg(feature = "stalled-monitor")]
16pub mod stalled_monitor;
17
/// Process-wide counter backing [`new_byte_writer_id`]. The first id handed out is `1`.
static CUR_ID: AtomicUsize = AtomicUsize::new(1);

/// Returns the next writer id and advances the global counter by one.
///
/// The value returned is the counter's value *before* the increment, so
/// consecutive calls yield consecutive ids starting at `1`.
pub fn new_byte_writer_id() -> usize {
    use std::sync::atomic::Ordering;

    CUR_ID.fetch_add(1, Ordering::SeqCst)
}
23
24#[derive(Clone)]
25pub struct ByteWriter {
26 pub id: usize,
27 written: Arc<RwLock<u64>>,
28 senders: Arc<RwLock<Vec<UnboundedSender<Bytes>>>>,
29}
30
31impl ByteWriter {
32 #[must_use]
33 pub fn stream(&self) -> ByteStream {
34 ByteStream::from(self)
35 }
36
37 #[must_use]
41 pub fn bytes_written(&self) -> u64 {
42 *self.written.read().unwrap()
43 }
44
45 pub fn close(&self) {
49 self.senders.write().unwrap().retain(|sender| {
50 if sender.send(Bytes::new()).is_err() {
51 log::debug!(
52 "Receiver has disconnected from writer id={}. Removing sender.",
53 self.id
54 );
55 false
56 } else {
57 true
58 }
59 });
60 }
61}
62
63impl Default for ByteWriter {
64 fn default() -> Self {
65 Self {
66 id: new_byte_writer_id(),
67 written: Arc::new(RwLock::new(0)),
68 senders: Arc::new(RwLock::new(vec![])),
69 }
70 }
71}
72
73impl std::io::Write for ByteWriter {
74 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
75 if buf.is_empty() {
76 return Ok(0);
77 }
78
79 let len = buf.len();
80
81 {
82 let written = {
83 let mut written = self.written.write().unwrap();
84 *written += len as u64;
85 *written
86 };
87 log::trace!("ByteWriter written={written}");
88
89 if self.senders.read().unwrap().is_empty() {
90 log::trace!(
91 "No senders associated with ByteWriter writer id={}. Eating {len} bytes",
92 self.id
93 );
94 }
95 }
96
97 log::trace!("Sending bytes buf of size {len} writer id={}", self.id);
98 let bytes: Bytes = buf.to_vec().into();
99 self.senders.write().unwrap().retain(|sender| {
100 if sender.send(bytes.clone()).is_err() {
101 log::debug!(
102 "Receiver has disconnected from writer id={}. Removing sender.",
103 self.id
104 );
105 false
106 } else {
107 true
108 }
109 });
110 Ok(buf.len())
111 }
112
113 fn flush(&mut self) -> std::io::Result<()> {
114 Ok(())
115 }
116}
117
118pub struct ByteStream {
119 id: usize,
120 receiver: UnboundedReceiver<Bytes>,
121}
122
#[cfg(feature = "stalled-monitor")]
impl ByteStream {
    /// Consumes the stream and wraps it in a
    /// [`stalled_monitor::StalledReadMonitor`].
    ///
    /// NOTE(review): the monitor's behaviour is defined in the
    /// `stalled_monitor` module, which is not visible here.
    #[must_use]
    pub fn stalled_monitor(
        self,
    ) -> stalled_monitor::StalledReadMonitor<Result<Bytes, std::io::Error>, Self> {
        stalled_monitor::StalledReadMonitor::from(self)
    }
}
132
#[cfg(feature = "stalled-monitor")]
impl From<ByteStream>
    for stalled_monitor::StalledReadMonitor<Result<Bytes, std::io::Error>, ByteStream>
{
    /// Wraps `val` by handing it to `StalledReadMonitor::new`.
    fn from(val: ByteStream) -> Self {
        let stream = val;
        Self::new(stream)
    }
}
141
142impl futures::Stream for ByteStream {
143 type Item = Result<Bytes, std::io::Error>;
144
145 fn poll_next(
146 self: std::pin::Pin<&mut Self>,
147 cx: &mut std::task::Context<'_>,
148 ) -> Poll<Option<Self::Item>> {
149 let stream = self.get_mut();
150 match stream.receiver.poll_recv(cx) {
151 Poll::Ready(Some(response)) => {
152 log::trace!(
153 "Received bytes buf of size {} from writer id={}",
154 response.len(),
155 stream.id
156 );
157 Poll::Ready(Some(Ok(response)))
158 }
159 Poll::Pending => Poll::Pending,
160 Poll::Ready(None) => Poll::Ready(None),
161 }
162 }
163}
164
165#[allow(clippy::fallible_impl_from)]
166impl From<&ByteWriter> for ByteStream {
167 fn from(value: &ByteWriter) -> Self {
168 let (sender, receiver) = unbounded_channel();
169 value.senders.write().unwrap().push(sender);
170 Self {
171 id: value.id,
172 receiver,
173 }
174 }
175}
176
177#[derive(Clone)]
178pub struct TypedWriter<T> {
179 id: usize,
180 senders: Arc<RwLock<Vec<UnboundedSender<T>>>>,
181}
182
183impl<T> TypedWriter<T> {
184 #[must_use]
185 pub fn stream(&self) -> TypedStream<T> {
186 TypedStream::from(self)
187 }
188}
189
190impl<T: Clone> TypedWriter<T> {
191 pub fn write(&self, buf: T) {
195 let mut senders = self.senders.write().unwrap();
196 let mut remove = vec![];
197 let len = senders.len();
198 for (i, sender) in senders.iter().enumerate() {
199 if i == len - 1 {
200 if sender.send(buf).is_err() {
201 log::debug!(
202 "Receiver has disconnected from writer id={}. Removing sender.",
203 self.id
204 );
205 remove.insert(0, i);
206 }
207 break;
208 } else if sender.send(buf.clone()).is_err() {
209 log::debug!(
210 "Receiver has disconnected from writer id={}. Removing sender.",
211 self.id
212 );
213 remove.insert(0, i);
214 }
215 }
216 for i in remove {
217 senders.remove(i);
218 }
219 }
220}
221
222impl<T> Default for TypedWriter<T> {
223 fn default() -> Self {
224 Self {
225 id: new_byte_writer_id(),
226 senders: Arc::new(RwLock::new(vec![])),
227 }
228 }
229}
230
231pub struct TypedStream<T> {
232 receiver: UnboundedReceiver<T>,
233}
234
#[cfg(feature = "stalled-monitor")]
impl<T> TypedStream<T> {
    /// Consumes the stream and wraps it in a
    /// [`stalled_monitor::StalledReadMonitor`].
    ///
    /// NOTE(review): the monitor's behaviour is defined in the
    /// `stalled_monitor` module, which is not visible here.
    #[must_use]
    pub fn stalled_monitor(self) -> stalled_monitor::StalledReadMonitor<T, Self> {
        stalled_monitor::StalledReadMonitor::from(self)
    }
}
242
#[cfg(feature = "stalled-monitor")]
impl<T> From<TypedStream<T>> for stalled_monitor::StalledReadMonitor<T, TypedStream<T>> {
    /// Wraps `val` by handing it to `StalledReadMonitor::new`.
    fn from(val: TypedStream<T>) -> Self {
        let stream = val;
        Self::new(stream)
    }
}
249
250impl<T> futures::Stream for TypedStream<T> {
251 type Item = T;
252
253 fn poll_next(
254 self: std::pin::Pin<&mut Self>,
255 cx: &mut std::task::Context<'_>,
256 ) -> Poll<Option<Self::Item>> {
257 let stream = self.get_mut();
258 match stream.receiver.poll_recv(cx) {
259 Poll::Ready(Some(response)) => {
260 log::trace!("Received item");
261 Poll::Ready(Some(response))
262 }
263 Poll::Pending => Poll::Pending,
264 Poll::Ready(None) => Poll::Ready(None),
265 }
266 }
267}
268
269#[allow(clippy::fallible_impl_from)]
270impl<T> From<&TypedWriter<T>> for TypedStream<T> {
271 fn from(value: &TypedWriter<T>) -> Self {
272 let (sender, receiver) = unbounded_channel();
273 value.senders.write().unwrap().push(sender);
274 Self { receiver }
275 }
276}