tokio_process_tools/output_stream/backend/broadcast/
mod.rs1use crate::WaitForLineResult;
18use crate::output_stream::config::StreamConfig;
19use crate::output_stream::consumer::driver::consume_sync;
20use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
21use crate::output_stream::event::StreamEvent;
22use crate::output_stream::line::adapter::LineAdapter;
23use crate::output_stream::policy::{
24 BestEffortDelivery, Delivery, DeliveryGuarantee, NoReplay, Replay, ReplayEnabled,
25};
26use crate::output_stream::visitors::factories::impl_consumer_factories;
27use crate::output_stream::visitors::wait::WaitForLineSink;
28use crate::output_stream::{OutputStream, TrySubscribable};
29use crate::{
30 AsyncStreamVisitor, Consumer, LineParsingOptions, NumBytes, StreamConsumerError, StreamVisitor,
31};
32use std::borrow::Cow;
33use std::fmt::{Debug, Formatter};
34use std::future::Future;
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::io::AsyncRead;
38#[cfg(test)]
39use tokio::sync::watch;
40
/// Identity alias: `FactoryReturn<T>` is exactly `T`.
///
/// NOTE(review): presumably referenced by the `impl_consumer_factories!` expansion to name
/// the return type of the generated factory methods — confirm against the macro definition.
type FactoryReturn<T> = T;
44
45mod fanout;
46mod fast;
47mod state;
48mod subscription;
49
50use fanout::{FanoutReplayBackend, new_fanout_backend};
51use fast::{FastBackend, new_fast_backend};
52use state::{BestEffortLiveQueue, SubscriberSender};
53use subscription::{BroadcastSubscription, FastSubscription, LiveReceiver, SharedSubscription};
54
55enum Backend<D, R>
56where
57 D: Delivery,
58 R: Replay,
59{
60 Fast(FastBackend),
61 FanoutReplay(FanoutReplayBackend<D, R>),
62}
63
64pub struct BroadcastOutputStream<D = BestEffortDelivery, R = NoReplay>
72where
73 D: Delivery,
74 R: Replay,
75{
76 backend: Backend<D, R>,
77}
78
79impl<D, R> Drop for BroadcastOutputStream<D, R>
80where
81 D: Delivery,
82 R: Replay,
83{
84 fn drop(&mut self) {
85 match &self.backend {
86 Backend::Fast(backend) => {
87 backend.stream_reader.abort();
88 }
89 Backend::FanoutReplay(backend) => {
90 backend.stream_reader.abort();
91 {
92 let mut state = backend
93 .shared
94 .state
95 .lock()
96 .expect("broadcast state poisoned");
97 state.close_for_drop();
98 }
99 }
100 }
101 }
102}
103
104impl<D, R> Debug for BroadcastOutputStream<D, R>
105where
106 D: Delivery + Debug,
107 R: Replay + Debug,
108{
109 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
110 let mut debug = f.debug_struct("BroadcastOutputStream");
111 debug.field("output_collector", &"non-debug < JoinHandle<()> >");
112 match &self.backend {
113 Backend::Fast(backend) => {
114 debug.field("backend", &"tokio::sync::broadcast");
115 debug.field("options", &backend.options);
116 debug.field("name", &backend.name);
117 }
118 Backend::FanoutReplay(backend) => {
119 debug.field("backend", &"fanout replay");
120 debug.field("options", &backend.options);
121 debug.field("name", &backend.name);
122 }
123 }
124 debug.finish_non_exhaustive()
125 }
126}
127
128impl<D, R> OutputStream for BroadcastOutputStream<D, R>
129where
130 D: Delivery,
131 R: Replay,
132{
133 fn read_chunk_size(&self) -> NumBytes {
134 match &self.backend {
135 Backend::Fast(backend) => backend.options.read_chunk_size,
136 Backend::FanoutReplay(backend) => backend.options.read_chunk_size,
137 }
138 }
139
140 fn max_buffered_chunks(&self) -> usize {
141 match &self.backend {
142 Backend::Fast(backend) => backend.options.max_buffered_chunks,
143 Backend::FanoutReplay(backend) => backend.options.max_buffered_chunks,
144 }
145 }
146
147 fn name(&self) -> &'static str {
148 match &self.backend {
149 Backend::Fast(backend) => backend.name,
150 Backend::FanoutReplay(backend) => backend.name,
151 }
152 }
153}
154
155impl<D, R> BroadcastOutputStream<D, R>
156where
157 D: Delivery,
158 R: Replay,
159{
160 pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
162 stream: S,
163 stream_name: &'static str,
164 options: StreamConfig<D, R>,
165 ) -> Self {
166 options.assert_valid("options");
167
168 if options.delivery_guarantee() == DeliveryGuarantee::BestEffort
169 && !options.replay_enabled()
170 {
171 return Self {
172 backend: Backend::Fast(new_fast_backend(
173 stream,
174 stream_name,
175 options.read_chunk_size,
176 options.max_buffered_chunks,
177 )),
178 };
179 }
180
181 Self {
182 backend: Backend::FanoutReplay(new_fanout_backend(stream, stream_name, options)),
183 }
184 }
185}
186
187impl<D> BroadcastOutputStream<D, ReplayEnabled>
188where
189 D: Delivery,
190{
191 pub fn seal_replay(&self) {
200 let Backend::FanoutReplay(backend) = &self.backend else {
201 return;
202 };
203 {
204 let mut state = backend
205 .shared
206 .state
207 .lock()
208 .expect("broadcast state poisoned");
209 state.seal_replay();
210 }
211 }
212
213 #[must_use]
219 pub fn is_replay_sealed(&self) -> bool {
220 let Backend::FanoutReplay(backend) = &self.backend else {
221 return false;
222 };
223 backend
224 .shared
225 .state
226 .lock()
227 .expect("broadcast state poisoned")
228 .replay_sealed
229 }
230}
231
#[cfg(test)]
impl<D, R> BroadcastOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Test-only hook returning a watch receiver for the backend's ingested-byte counter.
    pub(super) fn subscribe_bytes_ingested(&self) -> watch::Receiver<u64> {
        match &self.backend {
            Backend::Fast(fast) => fast.bytes_ingested_tx.subscribe(),
            Backend::FanoutReplay(fanout) => fanout.shared.subscribe_bytes_ingested(),
        }
    }
}
245
246impl<D, R> BroadcastOutputStream<D, R>
247where
248 D: Delivery,
249 R: Replay,
250{
251 fn subscribe(&self) -> BroadcastSubscription<D, R> {
252 let Backend::FanoutReplay(backend) = &self.backend else {
253 panic!("fanout broadcast subscription requested for fast backend");
254 };
255 let mut state = backend
256 .shared
257 .state
258 .lock()
259 .expect("broadcast state poisoned");
260
261 let (subscriber_sender, live_receiver) = match backend.options.delivery_guarantee() {
262 DeliveryGuarantee::ReliableForActiveSubscribers => {
263 let (sender, receiver) =
264 tokio::sync::mpsc::channel(backend.options.max_buffered_chunks);
265 (
266 SubscriberSender::Reliable(sender),
267 LiveReceiver::Reliable(receiver),
268 )
269 }
270 DeliveryGuarantee::BestEffort => {
271 let queue = Arc::new(BestEffortLiveQueue::new(
272 backend.options.max_buffered_chunks,
273 ));
274 (
275 SubscriberSender::BestEffort(Arc::clone(&queue)),
276 LiveReceiver::BestEffort(queue),
277 )
278 }
279 };
280 let (replay, live_start_seq) = state.replay_snapshot(backend.options);
281 let id = if state.closed || state.terminal.is_some() {
282 None
283 } else {
284 Some(state.add_subscriber(subscriber_sender))
285 };
286
287 BroadcastSubscription::Shared(SharedSubscription {
288 shared: Arc::clone(&backend.shared),
289 id,
290 replay,
291 live_start_seq,
292 live_receiver: if id.is_some() {
293 live_receiver
294 } else {
295 LiveReceiver::Closed
296 },
297 _marker: std::marker::PhantomData,
298 done: false,
299 })
300 }
301
302 fn subscribe_normal(&self) -> BroadcastSubscription<D, R> {
303 match &self.backend {
304 Backend::Fast(backend) => {
305 let (receiver, emit_terminal_event) = {
306 let state = backend
307 .closure_state
308 .lock()
309 .expect("closure_state poisoned");
310 let receiver = backend.sender.subscribe();
311 let terminal_event = state
312 .read_error
313 .clone()
314 .map(StreamEvent::ReadError)
315 .or_else(|| state.closed.then_some(StreamEvent::Eof));
316 (receiver, terminal_event)
317 };
318
319 BroadcastSubscription::Fast(FastSubscription {
320 receiver,
321 emit_terminal_event,
322 })
323 }
324 Backend::FanoutReplay(_) => self.subscribe(),
325 }
326 }
327}
328
329impl<D, R> TrySubscribable for BroadcastOutputStream<D, R>
330where
331 D: Delivery,
332 R: Replay,
333{
334 fn try_subscribe(
335 &self,
336 ) -> Result<impl crate::output_stream::Subscription, StreamConsumerError> {
337 Ok(self.subscribe_normal())
338 }
339}
340
341impl<D, R> BroadcastOutputStream<D, R>
342where
343 D: Delivery,
344 R: Replay,
345{
346 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
355 pub fn consume_with<V>(&self, visitor: V) -> Consumer<V::Output>
356 where
357 V: StreamVisitor,
358 {
359 spawn_consumer_sync(self.name(), self.subscribe_normal(), visitor)
360 }
361
362 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
369 pub fn consume_with_async<V>(&self, visitor: V) -> Consumer<V::Output>
370 where
371 V: AsyncStreamVisitor,
372 {
373 spawn_consumer_async(self.name(), self.subscribe_normal(), visitor)
374 }
375
376 impl_consumer_factories!();
377
378 pub fn wait_for_line(
405 &self,
406 timeout: Duration,
407 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
408 options: LineParsingOptions,
409 ) -> impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static
410 {
411 let subscription = self.subscribe_normal();
412 let visitor = LineAdapter::new(options, WaitForLineSink::new(predicate));
413 async move {
414 let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
417 match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
418 .await
419 {
420 Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
421 Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
422 Ok(Err(err)) => Err(err),
423 Err(_) => Ok(WaitForLineResult::Timeout),
424 }
425 }
426 }
427}
428
429#[cfg(test)]
430mod tests;