tokio_process_tools/output_stream/backend/broadcast/
mod.rs1use crate::WaitForLineResult;
18use crate::output_stream::config::StreamConfig;
19use crate::output_stream::consumer::driver::consume_sync;
20use crate::output_stream::event::StreamEvent;
21use crate::output_stream::line::adapter::ParseLines;
22use crate::output_stream::policy::{
23 Delivery, DeliveryGuarantee, LossyWithoutBackpressure, NoReplay, Replay, ReplayEnabled,
24};
25use crate::output_stream::visitors::wait::WaitForLine;
26use crate::output_stream::{Consumable, OutputStream, Subscribable};
27use crate::{LineParsingOptions, NumBytes};
28use std::borrow::Cow;
29use std::convert::Infallible;
30use std::fmt::{Debug, Formatter};
31use std::future::Future;
32use std::sync::Arc;
33use std::time::Duration;
34use tokio::io::AsyncRead;
35#[cfg(test)]
36use tokio::sync::watch;
37use unwrap_infallible::UnwrapInfallible;
38
39mod fanout;
40mod fast;
41mod state;
42mod subscription;
43
44use fanout::{FanoutReplayBackend, new_fanout_backend};
45use fast::{FastBackend, new_fast_backend};
46use state::{BestEffortLiveQueue, SubscriberSender};
47use subscription::{FastSubscription, LiveReceiver, SharedSubscription};
48
49pub use subscription::BroadcastSubscription;
50
51enum Backend<D, R>
52where
53 D: Delivery,
54 R: Replay,
55{
56 Fast(FastBackend),
57 FanoutReplay(FanoutReplayBackend<D, R>),
58}
59
60pub struct BroadcastOutputStream<D = LossyWithoutBackpressure, R = NoReplay>
68where
69 D: Delivery,
70 R: Replay,
71{
72 backend: Backend<D, R>,
73}
74
75impl<D, R> Drop for BroadcastOutputStream<D, R>
76where
77 D: Delivery,
78 R: Replay,
79{
80 fn drop(&mut self) {
81 match &self.backend {
82 Backend::Fast(backend) => {
83 backend.stream_reader.abort();
84 }
85 Backend::FanoutReplay(backend) => {
86 backend.stream_reader.abort();
87 {
88 let mut state = backend
89 .shared
90 .state
91 .lock()
92 .expect("broadcast state poisoned");
93 state.close_for_drop();
94 }
95 }
96 }
97 }
98}
99
100impl<D, R> Debug for BroadcastOutputStream<D, R>
101where
102 D: Delivery + Debug,
103 R: Replay + Debug,
104{
105 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
106 let mut debug = f.debug_struct("BroadcastOutputStream");
107 debug.field("output_collector", &"non-debug < JoinHandle<()> >");
108 match &self.backend {
109 Backend::Fast(backend) => {
110 debug.field("backend", &"tokio::sync::broadcast");
111 debug.field("options", &backend.options);
112 debug.field("name", &backend.name);
113 }
114 Backend::FanoutReplay(backend) => {
115 debug.field("backend", &"fanout replay");
116 debug.field("options", &backend.options);
117 debug.field("name", &backend.name);
118 }
119 }
120 debug.finish_non_exhaustive()
121 }
122}
123
124impl<D, R> OutputStream for BroadcastOutputStream<D, R>
125where
126 D: Delivery,
127 R: Replay,
128{
129 fn read_chunk_size(&self) -> NumBytes {
130 match &self.backend {
131 Backend::Fast(backend) => backend.options.read_chunk_size,
132 Backend::FanoutReplay(backend) => backend.options.read_chunk_size,
133 }
134 }
135
136 fn max_buffered_chunks(&self) -> usize {
137 match &self.backend {
138 Backend::Fast(backend) => backend.options.max_buffered_chunks,
139 Backend::FanoutReplay(backend) => backend.options.max_buffered_chunks,
140 }
141 }
142
143 fn name(&self) -> &'static str {
144 match &self.backend {
145 Backend::Fast(backend) => backend.name,
146 Backend::FanoutReplay(backend) => backend.name,
147 }
148 }
149}
150
151impl<D, R> BroadcastOutputStream<D, R>
152where
153 D: Delivery,
154 R: Replay,
155{
156 #[doc(hidden)]
158 #[must_use]
159 pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
160 stream: S,
161 stream_name: &'static str,
162 options: StreamConfig<D, R>,
163 ) -> Self {
164 options.assert_valid("options");
165
166 if options.delivery_guarantee() == DeliveryGuarantee::LossyWithoutBackpressure
167 && !options.replay_enabled()
168 {
169 return Self {
170 backend: Backend::Fast(new_fast_backend(
171 stream,
172 stream_name,
173 options.read_chunk_size,
174 options.max_buffered_chunks,
175 )),
176 };
177 }
178
179 Self {
180 backend: Backend::FanoutReplay(new_fanout_backend(stream, stream_name, options)),
181 }
182 }
183}
184
185impl<D> BroadcastOutputStream<D, ReplayEnabled>
186where
187 D: Delivery,
188{
189 pub fn seal_replay(&self) {
198 let Backend::FanoutReplay(backend) = &self.backend else {
199 return;
200 };
201 {
202 let mut state = backend
203 .shared
204 .state
205 .lock()
206 .expect("broadcast state poisoned");
207 state.seal_replay();
208 }
209 }
210
211 #[must_use]
217 pub fn is_replay_sealed(&self) -> bool {
218 let Backend::FanoutReplay(backend) = &self.backend else {
219 return false;
220 };
221 backend
222 .shared
223 .state
224 .lock()
225 .expect("broadcast state poisoned")
226 .replay_sealed
227 }
228}
229
#[cfg(test)]
impl<D: Delivery, R: Replay> BroadcastOutputStream<D, R> {
    /// Test-only helper: returns a watch receiver for the backend's ingested-byte counter.
    pub(super) fn subscribe_bytes_ingested(&self) -> watch::Receiver<u64> {
        match self.backend {
            Backend::Fast(ref fast) => fast.bytes_ingested_tx.subscribe(),
            Backend::FanoutReplay(ref fanout) => fanout.shared.subscribe_bytes_ingested(),
        }
    }
}
243
244impl<D, R> Subscribable for BroadcastOutputStream<D, R>
245where
246 D: Delivery,
247 R: Replay,
248{
249 type Subscription = BroadcastSubscription<D, R>;
250 type SubscribeError = Infallible;
251
252 fn try_subscribe(&self) -> Result<Self::Subscription, Self::SubscribeError> {
253 Ok(match &self.backend {
254 Backend::Fast(backend) => {
255 let (receiver, emit_terminal_event) = {
256 let state = backend
257 .closure_state
258 .lock()
259 .expect("closure_state poisoned");
260 let receiver = backend.sender.subscribe();
261 let terminal_event = state
262 .read_error
263 .clone()
264 .map(StreamEvent::ReadError)
265 .or_else(|| state.closed.then_some(StreamEvent::Eof));
266 (receiver, terminal_event)
267 };
268
269 BroadcastSubscription::fast(FastSubscription {
270 receiver,
271 emit_terminal_event,
272 })
273 }
274 Backend::FanoutReplay(backend) => {
275 let mut state = backend
276 .shared
277 .state
278 .lock()
279 .expect("broadcast state poisoned");
280
281 let (subscriber_sender, live_receiver) = match backend.options.delivery_guarantee()
282 {
283 DeliveryGuarantee::ReliableWithBackpressure => {
284 let (sender, receiver) =
285 tokio::sync::mpsc::channel(backend.options.max_buffered_chunks);
286 (
287 SubscriberSender::Reliable(sender),
288 LiveReceiver::Reliable(receiver),
289 )
290 }
291 DeliveryGuarantee::LossyWithoutBackpressure => {
292 let queue = Arc::new(BestEffortLiveQueue::new(
293 backend.options.max_buffered_chunks,
294 ));
295 (
296 SubscriberSender::BestEffort(Arc::clone(&queue)),
297 LiveReceiver::BestEffort(queue),
298 )
299 }
300 };
301 let (replay, live_start_seq) = state.replay_snapshot(backend.options);
302 let id = if state.closed || state.terminal.is_some() {
303 None
304 } else {
305 Some(state.add_subscriber(subscriber_sender))
306 };
307
308 BroadcastSubscription::shared(SharedSubscription {
309 shared: Arc::clone(&backend.shared),
310 id,
311 replay,
312 live_start_seq,
313 live_receiver: if id.is_some() {
314 live_receiver
315 } else {
316 LiveReceiver::Closed
317 },
318 _marker: std::marker::PhantomData,
319 done: false,
320 })
321 }
322 })
323 }
324}
325
326impl<D, R> Consumable for BroadcastOutputStream<D, R>
327where
328 D: Delivery,
329 R: Replay,
330{
331 type Error = Infallible;
332}
333
334impl<D, R> BroadcastOutputStream<D, R>
335where
336 D: Delivery,
337 R: Replay,
338{
339 pub fn wait_for_line(
366 &self,
367 timeout: Duration,
368 predicate: impl Fn(Cow<'_, str>) -> bool + Send + 'static,
369 options: LineParsingOptions,
370 ) -> impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static
371 {
372 let subscription = self.try_subscribe().unwrap_infallible();
373 let visitor = ParseLines::new(options, WaitForLine::new(predicate));
374 async move {
375 let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
378 match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
379 .await
380 {
381 Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
382 Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
383 Ok(Err(err)) => Err(err),
384 Err(_) => Ok(WaitForLineResult::Timeout),
385 }
386 }
387 }
388}
389
390#[cfg(test)]
391mod tests;