tokio_process_tools/output_stream/backend/single_subscriber/
mod.rs1use crate::WaitForLineResult;
4use crate::output_stream::config::StreamConfig;
5use crate::output_stream::consumer::driver::consume_sync;
6use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
7use crate::output_stream::event::StreamEvent;
8use crate::output_stream::line::adapter::LineAdapter;
9use crate::output_stream::policy::{
10 BestEffortDelivery, Delivery, DeliveryGuarantee, NoReplay, Replay, ReplayEnabled,
11 ReplayRetention,
12};
13use crate::output_stream::visitors::factories::impl_consumer_factories;
14use crate::output_stream::visitors::wait::WaitForLineSink;
15use crate::output_stream::{OutputStream, Subscription, TrySubscribable};
16use crate::{
17 AsyncStreamVisitor, Consumer, LineParsingOptions, NumBytes, StreamConsumerError, StreamVisitor,
18};
19use std::borrow::Cow;
20use std::collections::VecDeque;
21use std::fmt::{Debug, Formatter};
22use std::future::Future;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::io::AsyncRead;
26use tokio::sync::mpsc;
27use tokio::task::JoinHandle;
28
29type FactoryReturn<T> = Result<T, StreamConsumerError>;
33
34mod reader;
35mod state;
36mod subscription;
37
38use reader::{read_chunked_best_effort, read_chunked_reliable};
39use state::{ActiveSubscriber, ConfiguredShared};
40use subscription::SingleSubscriberSubscription;
41
42impl Subscription for mpsc::Receiver<StreamEvent> {
43 fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_ {
44 self.recv()
45 }
46}
47
48pub struct SingleSubscriberOutputStream<D = BestEffortDelivery, R = NoReplay>
57where
58 D: Delivery,
59 R: Replay,
60{
61 stream_reader: JoinHandle<()>,
64
65 options: StreamConfig<D, R>,
67
68 configured_shared: Arc<ConfiguredShared>,
70
71 name: &'static str,
73}
74
75impl<D, R> Drop for SingleSubscriberOutputStream<D, R>
76where
77 D: Delivery,
78 R: Replay,
79{
80 fn drop(&mut self) {
81 self.stream_reader.abort();
82 self.configured_shared.clear_active();
83 }
84}
85
86impl<D, R> Debug for SingleSubscriberOutputStream<D, R>
87where
88 D: Delivery + Debug,
89 R: Replay + Debug,
90{
91 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("SingleSubscriberOutputStream")
93 .field("output_collector", &"non-debug < JoinHandle<()> >")
94 .field("options", &self.options)
95 .field("name", &self.name)
96 .finish_non_exhaustive()
97 }
98}
99
100impl<D, R> OutputStream for SingleSubscriberOutputStream<D, R>
101where
102 D: Delivery,
103 R: Replay,
104{
105 fn read_chunk_size(&self) -> NumBytes {
106 self.options.read_chunk_size
107 }
108
109 fn max_buffered_chunks(&self) -> usize {
110 self.options.max_buffered_chunks
111 }
112
113 fn name(&self) -> &'static str {
114 self.name
115 }
116}
117
118impl<D, R> SingleSubscriberOutputStream<D, R>
119where
120 D: Delivery,
121 R: Replay,
122{
123 pub fn from_stream<S>(stream: S, stream_name: &'static str, options: StreamConfig<D, R>) -> Self
125 where
126 S: AsyncRead + Unpin + Send + 'static,
127 {
128 options.assert_valid("options");
129
130 let shared = Arc::new(ConfiguredShared::new());
131 let active_rx = shared.subscribe_active();
132 let delivery_guarantee = options.delivery_guarantee();
133 let replay_retention = options.replay_retention();
134
135 let stream_reader = match delivery_guarantee {
136 DeliveryGuarantee::BestEffort => tokio::spawn(read_chunked_best_effort(
137 stream,
138 Arc::clone(&shared),
139 active_rx,
140 options.read_chunk_size,
141 replay_retention,
142 stream_name,
143 )),
144 DeliveryGuarantee::ReliableForActiveSubscribers => tokio::spawn(read_chunked_reliable(
145 stream,
146 Arc::clone(&shared),
147 active_rx,
148 options.read_chunk_size,
149 replay_retention,
150 stream_name,
151 )),
152 };
153
154 Self {
155 stream_reader,
156 options,
157 configured_shared: shared,
158 name: stream_name,
159 }
160 }
161
162 #[must_use]
164 pub fn replay_enabled(&self) -> bool {
165 self.options.replay_enabled()
166 }
167
168 #[must_use]
170 pub fn replay_retention(&self) -> Option<ReplayRetention> {
171 self.options.replay_retention()
172 }
173
174 fn take_subscription(&self) -> Result<SingleSubscriberSubscription, StreamConsumerError> {
175 let shared = &self.configured_shared;
176
177 let (sender, receiver) = mpsc::channel(self.options.max_buffered_chunks);
178 let (id, replay, terminal_event) = {
179 let mut state = shared
180 .state
181 .lock()
182 .expect("single-subscriber state poisoned");
183
184 if state.active_id.is_some() {
185 return Err(StreamConsumerError::ActiveConsumer {
186 stream_name: self.name,
187 });
188 }
189
190 let replay = if state.replay_sealed || self.options.replay_retention().is_none() {
191 VecDeque::default()
192 } else {
193 state.snapshot_events()
194 };
195 let id = state.attach_subscriber();
196 shared
197 .active_tx
198 .send_replace(Some(Arc::new(ActiveSubscriber { id, sender })));
199 (id, replay, state.terminal_event.clone())
200 };
201
202 Ok(SingleSubscriberSubscription {
203 id,
204 shared: Arc::clone(shared),
205 replay,
206 terminal_event,
207 live_receiver: Some(receiver),
208 })
209 }
210}
211
212impl<D> SingleSubscriberOutputStream<D, ReplayEnabled>
213where
214 D: Delivery,
215{
216 pub fn seal_replay(&self) {
224 let mut state = self
225 .configured_shared
226 .state
227 .lock()
228 .expect("single-subscriber state poisoned");
229 state.replay_sealed = true;
230 state.trim_replay_window(self.options.replay_retention());
231 }
232
233 #[must_use]
239 pub fn is_replay_sealed(&self) -> bool {
240 self.configured_shared
241 .state
242 .lock()
243 .expect("single-subscriber state poisoned")
244 .replay_sealed
245 }
246}
247
248impl<D, R> TrySubscribable for SingleSubscriberOutputStream<D, R>
249where
250 D: Delivery,
251 R: Replay,
252{
253 fn try_subscribe(&self) -> Result<impl Subscription, StreamConsumerError> {
254 self.take_subscription()
255 }
256}
257
258impl<D, R> SingleSubscriberOutputStream<D, R>
259where
260 D: Delivery,
261 R: Replay,
262{
263 pub fn consume_with<V>(&self, visitor: V) -> Result<Consumer<V::Output>, StreamConsumerError>
274 where
275 V: StreamVisitor,
276 {
277 Ok(spawn_consumer_sync(
278 self.name(),
279 self.take_subscription()?,
280 visitor,
281 ))
282 }
283
284 pub fn consume_with_async<V>(
290 &self,
291 visitor: V,
292 ) -> Result<Consumer<V::Output>, StreamConsumerError>
293 where
294 V: AsyncStreamVisitor,
295 {
296 Ok(spawn_consumer_async(
297 self.name(),
298 self.take_subscription()?,
299 visitor,
300 ))
301 }
302
303 impl_consumer_factories!();
304
305 pub fn wait_for_line(
315 &self,
316 timeout: Duration,
317 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
318 options: LineParsingOptions,
319 ) -> Result<
320 impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static,
321 StreamConsumerError,
322 > {
323 let subscription = self.take_subscription()?;
324 let visitor = LineAdapter::new(options, WaitForLineSink::new(predicate));
325 Ok(async move {
326 let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
327 match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
328 .await
329 {
330 Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
331 Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
332 Ok(Err(err)) => Err(err),
333 Err(_) => Ok(WaitForLineResult::Timeout),
334 }
335 })
336 }
337}
338
339#[cfg(test)]
340mod tests;