tokio_process_tools/output_stream/backend/single_subscriber/
subscription.rs1use super::state::{ConfiguredShared, SubscriberId};
2use crate::output_stream::{StreamEvent, Subscription};
3use std::collections::VecDeque;
4use std::future::Future;
5use std::sync::Arc;
6use tokio::sync::mpsc;
7
8#[derive(Debug)]
13pub struct SingleSubscriberSubscription {
14 pub(super) id: SubscriberId,
15 pub(super) shared: Arc<ConfiguredShared>,
16 pub(super) replay: VecDeque<StreamEvent>,
17 pub(super) terminal_event: Option<StreamEvent>,
18 pub(super) live_receiver: Option<mpsc::Receiver<StreamEvent>>,
19}
20
21impl Subscription for SingleSubscriberSubscription {
22 #[allow(
23 clippy::manual_async_fn,
24 reason = "the trait method must expose a Send future for tokio::spawn"
25 )]
26 fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_ {
27 async move {
28 if let Some(event) = self.replay.pop_front() {
29 return Some(event);
30 }
31 if let Some(event) = self.terminal_event.take() {
32 self.live_receiver = None;
33 return Some(event);
34 }
35 match &mut self.live_receiver {
36 Some(receiver) => receiver.recv().await,
37 None => None,
38 }
39 }
40 }
41}
42
43impl Drop for SingleSubscriberSubscription {
44 fn drop(&mut self) {
45 self.live_receiver = None;
46 self.shared.clear_active_if_current(self.id);
47 }
48}
49
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StreamReadError;
    use assertr::prelude::*;
    use std::io;

    /// Attaches a subscriber on the shared state and returns the id it was given.
    fn attach_active(shared: &Arc<ConfiguredShared>) -> SubscriberId {
        shared
            .state
            .lock()
            .expect("single-subscriber state poisoned")
            .attach_subscriber()
    }

    /// Assembles a subscription directly from its parts, bypassing the backend.
    fn subscription_with(
        shared: Arc<ConfiguredShared>,
        id: SubscriberId,
        replay: impl IntoIterator<Item = StreamEvent>,
        terminal_event: Option<StreamEvent>,
        live_receiver: Option<mpsc::Receiver<StreamEvent>>,
    ) -> SingleSubscriberSubscription {
        let replay: VecDeque<StreamEvent> = replay.into_iter().collect();
        SingleSubscriberSubscription {
            id,
            shared,
            replay,
            terminal_event,
            live_receiver,
        }
    }

    /// Replayed chunks are delivered ahead of anything queued on the live channel.
    #[tokio::test]
    async fn emits_replay_before_live_events() {
        let shared = Arc::new(ConfiguredShared::new());
        let id = attach_active(&shared);
        let (tx, rx) = mpsc::channel(4);
        let mut subscription = subscription_with(
            Arc::clone(&shared),
            id,
            [StreamEvent::chunk(b"old")],
            None,
            Some(rx),
        );

        // Queue one live chunk, then close the channel so the stream can end.
        tx.send(StreamEvent::chunk(b"live")).await.unwrap();
        drop(tx);

        assert_that!(subscription.next_event().await)
            .is_some()
            .is_equal_to(StreamEvent::chunk(b"old"));
        assert_that!(subscription.next_event().await)
            .is_some()
            .is_equal_to(StreamEvent::chunk(b"live"));
        assert_that!(subscription.next_event().await).is_none();
    }

    /// A terminal event follows the replay and suppresses already-queued live events.
    #[tokio::test]
    async fn emits_terminal_event_after_replay_and_closes_live_receiver() {
        let shared = Arc::new(ConfiguredShared::new());
        let id = attach_active(&shared);
        let (tx, rx) = mpsc::channel(4);
        let mut subscription = subscription_with(
            Arc::clone(&shared),
            id,
            [StreamEvent::chunk(b"old")],
            Some(StreamEvent::Eof),
            Some(rx),
        );

        // This chunk sits in the channel but must never reach the subscriber.
        tx.send(StreamEvent::chunk(b"ignored-live"))
            .await
            .unwrap();

        assert_that!(subscription.next_event().await)
            .is_some()
            .is_equal_to(StreamEvent::chunk(b"old"));
        assert_that!(subscription.next_event().await)
            .is_some()
            .is_equal_to(StreamEvent::Eof);
        assert_that!(subscription.next_event().await).is_none();
    }

    /// Non-chunk events held in the replay queue are passed through in order.
    #[tokio::test]
    async fn emits_gap_read_error_and_eof_from_replay() {
        let shared = Arc::new(ConfiguredShared::new());
        let id = attach_active(&shared);
        let read_error = StreamReadError::new("custom", io::Error::from(io::ErrorKind::BrokenPipe));
        let mut subscription = subscription_with(
            Arc::clone(&shared),
            id,
            [
                StreamEvent::Gap,
                StreamEvent::ReadError(read_error),
                StreamEvent::Eof,
            ],
            None,
            None,
        );

        assert_that!(subscription.next_event().await)
            .is_some()
            .is_equal_to(StreamEvent::Gap);

        let second = subscription.next_event().await;
        match second {
            Some(StreamEvent::ReadError(err)) => {
                assert_that!(err.stream_name()).is_equal_to("custom");
                assert_that!(err.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
            }
            other => {
                assert_that!(&other).fail(format_args!("expected read error, got {other:?}"));
            }
        }

        assert_that!(subscription.next_event().await)
            .is_some()
            .is_equal_to(StreamEvent::Eof);
        assert_that!(subscription.next_event().await).is_none();
    }

    /// Dropping the subscription removes its id from the shared state's active slot.
    #[tokio::test]
    async fn drop_clears_active_backend_registration() {
        let shared = Arc::new(ConfiguredShared::new());
        let id = attach_active(&shared);
        let (_tx, rx) = mpsc::channel(4);
        let subscription = subscription_with(Arc::clone(&shared), id, [], None, Some(rx));

        let state_before = shared
            .state
            .lock()
            .expect("single-subscriber state poisoned");
        assert_that!(state_before.active_id).is_some().is_equal_to(id);
        // Release the lock before the subscription's destructor runs.
        drop(state_before);

        drop(subscription);

        let state_after = shared
            .state
            .lock()
            .expect("single-subscriber state poisoned");
        assert_that!(state_after.active_id).is_none();
    }
}