tokio_process_tools/output_stream/backend/single_subscriber/
mod.rs1use crate::WaitForLineResult;
4use crate::output_stream::config::StreamConfig;
5use crate::output_stream::consumer::driver::consume_sync;
6use crate::output_stream::event::StreamEvent;
7use crate::output_stream::line::adapter::ParseLines;
8use crate::output_stream::policy::{
9 Delivery, DeliveryGuarantee, LossyWithoutBackpressure, NoReplay, Replay, ReplayEnabled,
10 ReplayRetention,
11};
12use crate::output_stream::visitors::wait::WaitForLine;
13use crate::output_stream::{Consumable, OutputStream, Subscribable, Subscription};
14use crate::{LineParsingOptions, NumBytes, StreamConsumerError};
15use std::borrow::Cow;
16use std::collections::VecDeque;
17use std::fmt::{Debug, Formatter};
18use std::future::Future;
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::io::AsyncRead;
22use tokio::sync::mpsc;
23use tokio::task::JoinHandle;
24
25mod reader;
26mod state;
27mod subscription;
28
29use reader::{read_chunked_best_effort, read_chunked_reliable};
30use state::{ActiveSubscriber, ConfiguredShared};
31
32pub use subscription::SingleSubscriberSubscription;
33
34impl Subscription for mpsc::Receiver<StreamEvent> {
35 fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_ {
36 self.recv()
37 }
38}
39
40pub struct SingleSubscriberOutputStream<D = LossyWithoutBackpressure, R = NoReplay>
49where
50 D: Delivery,
51 R: Replay,
52{
53 stream_reader: JoinHandle<()>,
56
57 options: StreamConfig<D, R>,
59
60 configured_shared: Arc<ConfiguredShared>,
62
63 name: &'static str,
65}
66
67impl<D, R> Drop for SingleSubscriberOutputStream<D, R>
68where
69 D: Delivery,
70 R: Replay,
71{
72 fn drop(&mut self) {
73 self.stream_reader.abort();
74 self.configured_shared.clear_active();
75 }
76}
77
78impl<D, R> Debug for SingleSubscriberOutputStream<D, R>
79where
80 D: Delivery + Debug,
81 R: Replay + Debug,
82{
83 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84 f.debug_struct("SingleSubscriberOutputStream")
85 .field("output_collector", &"non-debug < JoinHandle<()> >")
86 .field("options", &self.options)
87 .field("name", &self.name)
88 .finish_non_exhaustive()
89 }
90}
91
92impl<D, R> OutputStream for SingleSubscriberOutputStream<D, R>
93where
94 D: Delivery,
95 R: Replay,
96{
97 fn read_chunk_size(&self) -> NumBytes {
98 self.options.read_chunk_size
99 }
100
101 fn max_buffered_chunks(&self) -> usize {
102 self.options.max_buffered_chunks
103 }
104
105 fn name(&self) -> &'static str {
106 self.name
107 }
108}
109
110impl<D, R> SingleSubscriberOutputStream<D, R>
111where
112 D: Delivery,
113 R: Replay,
114{
115 #[doc(hidden)]
117 #[must_use]
118 pub fn from_stream<S>(stream: S, stream_name: &'static str, options: StreamConfig<D, R>) -> Self
119 where
120 S: AsyncRead + Unpin + Send + 'static,
121 {
122 options.assert_valid("options");
123
124 let shared = Arc::new(ConfiguredShared::new());
125 let active_rx = shared.subscribe_active();
126 let delivery_guarantee = options.delivery_guarantee();
127 let replay_retention = options.replay_retention();
128
129 let stream_reader = match delivery_guarantee {
130 DeliveryGuarantee::LossyWithoutBackpressure => tokio::spawn(read_chunked_best_effort(
131 stream,
132 Arc::clone(&shared),
133 active_rx,
134 options.read_chunk_size,
135 replay_retention,
136 stream_name,
137 )),
138 DeliveryGuarantee::ReliableWithBackpressure => tokio::spawn(read_chunked_reliable(
139 stream,
140 Arc::clone(&shared),
141 active_rx,
142 options.read_chunk_size,
143 replay_retention,
144 stream_name,
145 )),
146 };
147
148 Self {
149 stream_reader,
150 options,
151 configured_shared: shared,
152 name: stream_name,
153 }
154 }
155
156 #[must_use]
158 pub fn replay_enabled(&self) -> bool {
159 self.options.replay_enabled()
160 }
161
162 #[must_use]
164 pub fn replay_retention(&self) -> Option<ReplayRetention> {
165 self.options.replay_retention()
166 }
167}
168
169impl<D> SingleSubscriberOutputStream<D, ReplayEnabled>
170where
171 D: Delivery,
172{
173 pub fn seal_replay(&self) {
181 let mut state = self
182 .configured_shared
183 .state
184 .lock()
185 .expect("single-subscriber state poisoned");
186 state.replay_sealed = true;
187 state.trim_replay_window(self.options.replay_retention());
188 }
189
190 #[must_use]
196 pub fn is_replay_sealed(&self) -> bool {
197 self.configured_shared
198 .state
199 .lock()
200 .expect("single-subscriber state poisoned")
201 .replay_sealed
202 }
203}
204
205impl<D, R> Subscribable for SingleSubscriberOutputStream<D, R>
206where
207 D: Delivery,
208 R: Replay,
209{
210 type Subscription = SingleSubscriberSubscription;
211 type SubscribeError = StreamConsumerError;
212
213 fn try_subscribe(&self) -> Result<Self::Subscription, Self::SubscribeError> {
214 let shared = &self.configured_shared;
215
216 let (sender, receiver) = mpsc::channel(self.options.max_buffered_chunks);
217 let (id, replay, terminal_event) = {
218 let mut state = shared
219 .state
220 .lock()
221 .expect("single-subscriber state poisoned");
222
223 if state.active_id.is_some() {
224 return Err(StreamConsumerError::ActiveConsumer {
225 stream_name: self.name,
226 });
227 }
228
229 let replay = if state.replay_sealed || self.options.replay_retention().is_none() {
230 VecDeque::default()
231 } else {
232 state.snapshot_events()
233 };
234 let id = state.attach_subscriber();
235 shared
236 .active_tx
237 .send_replace(Some(Arc::new(ActiveSubscriber { id, sender })));
238 (id, replay, state.terminal_event.clone())
239 };
240
241 Ok(SingleSubscriberSubscription {
242 id,
243 shared: Arc::clone(shared),
244 replay,
245 terminal_event,
246 live_receiver: Some(receiver),
247 })
248 }
249}
250
251impl<D, R> Consumable for SingleSubscriberOutputStream<D, R>
252where
253 D: Delivery,
254 R: Replay,
255{
256 type Error = StreamConsumerError;
257}
258
259impl<D, R> SingleSubscriberOutputStream<D, R>
260where
261 D: Delivery,
262 R: Replay,
263{
264 pub fn wait_for_line(
274 &self,
275 timeout: Duration,
276 predicate: impl Fn(Cow<'_, str>) -> bool + Send + 'static,
277 options: LineParsingOptions,
278 ) -> Result<
279 impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static,
280 StreamConsumerError,
281 > {
282 let subscription = self.try_subscribe()?;
283 let visitor = ParseLines::new(options, WaitForLine::new(predicate));
284 Ok(async move {
285 let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
286 match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
287 .await
288 {
289 Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
290 Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
291 Ok(Err(err)) => Err(err),
292 Err(_) => Ok(WaitForLineResult::Timeout),
293 }
294 })
295 }
296}
297
298#[cfg(test)]
299mod tests;