Skip to main content

tokio_process_tools/output_stream/backend/single_subscriber/
subscription.rs

1use super::state::{ConfiguredShared, SubscriberId};
2use crate::output_stream::{StreamEvent, Subscription};
3use std::collections::VecDeque;
4use std::future::Future;
5use std::sync::Arc;
6use tokio::sync::mpsc;
7
8/// Subscription handle returned by
9/// [`SingleSubscriberOutputStream::try_subscribe`](crate::SingleSubscriberOutputStream).
10/// Treat this as an opaque value: pass it to a built-in consumer or your own
11/// [`Subscription`]-driven loop. Internal fields are not part of the public API.
12#[derive(Debug)]
13pub struct SingleSubscriberSubscription {
14    pub(super) id: SubscriberId,
15    pub(super) shared: Arc<ConfiguredShared>,
16    pub(super) replay: VecDeque<StreamEvent>,
17    pub(super) terminal_event: Option<StreamEvent>,
18    pub(super) live_receiver: Option<mpsc::Receiver<StreamEvent>>,
19}
20
21impl Subscription for SingleSubscriberSubscription {
22    #[allow(
23        clippy::manual_async_fn,
24        reason = "the trait method must expose a Send future for tokio::spawn"
25    )]
26    fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_ {
27        async move {
28            if let Some(event) = self.replay.pop_front() {
29                return Some(event);
30            }
31            if let Some(event) = self.terminal_event.take() {
32                self.live_receiver = None;
33                return Some(event);
34            }
35            match &mut self.live_receiver {
36                Some(receiver) => receiver.recv().await,
37                None => None,
38            }
39        }
40    }
41}
42
43impl Drop for SingleSubscriberSubscription {
44    fn drop(&mut self) {
45        self.live_receiver = None;
46        self.shared.clear_active_if_current(self.id);
47    }
48}
49
50#[cfg(test)]
51mod tests {
52    use super::*;
53    use crate::StreamReadError;
54    use assertr::prelude::*;
55    use std::io;
56
57    fn attach_active(shared: &Arc<ConfiguredShared>) -> SubscriberId {
58        let mut state = shared
59            .state
60            .lock()
61            .expect("single-subscriber state poisoned");
62        state.attach_subscriber()
63    }
64
65    fn subscription_with(
66        shared: Arc<ConfiguredShared>,
67        id: SubscriberId,
68        replay: impl IntoIterator<Item = StreamEvent>,
69        terminal_event: Option<StreamEvent>,
70        live_receiver: Option<mpsc::Receiver<StreamEvent>>,
71    ) -> SingleSubscriberSubscription {
72        SingleSubscriberSubscription {
73            id,
74            shared,
75            replay: replay.into_iter().collect(),
76            terminal_event,
77            live_receiver,
78        }
79    }
80
81    #[tokio::test]
82    async fn emits_replay_before_live_events() {
83        let shared = Arc::new(ConfiguredShared::new());
84        let id = attach_active(&shared);
85        let (sender, receiver) = mpsc::channel(4);
86        let mut subscription = subscription_with(
87            Arc::clone(&shared),
88            id,
89            [StreamEvent::chunk(b"old")],
90            None,
91            Some(receiver),
92        );
93
94        sender.send(StreamEvent::chunk(b"live")).await.unwrap();
95        drop(sender);
96
97        assert_that!(subscription.next_event().await)
98            .is_some()
99            .is_equal_to(StreamEvent::chunk(b"old"));
100        assert_that!(subscription.next_event().await)
101            .is_some()
102            .is_equal_to(StreamEvent::chunk(b"live"));
103        assert_that!(subscription.next_event().await).is_none();
104    }
105
106    #[tokio::test]
107    async fn emits_terminal_event_after_replay_and_closes_live_receiver() {
108        let shared = Arc::new(ConfiguredShared::new());
109        let id = attach_active(&shared);
110        let (sender, receiver) = mpsc::channel(4);
111        let mut subscription = subscription_with(
112            Arc::clone(&shared),
113            id,
114            [StreamEvent::chunk(b"old")],
115            Some(StreamEvent::Eof),
116            Some(receiver),
117        );
118
119        sender
120            .send(StreamEvent::chunk(b"ignored-live"))
121            .await
122            .unwrap();
123
124        assert_that!(subscription.next_event().await)
125            .is_some()
126            .is_equal_to(StreamEvent::chunk(b"old"));
127        assert_that!(subscription.next_event().await)
128            .is_some()
129            .is_equal_to(StreamEvent::Eof);
130        assert_that!(subscription.next_event().await).is_none();
131    }
132
133    #[tokio::test]
134    async fn emits_gap_read_error_and_eof_from_replay() {
135        let shared = Arc::new(ConfiguredShared::new());
136        let id = attach_active(&shared);
137        let mut subscription = subscription_with(
138            Arc::clone(&shared),
139            id,
140            [
141                StreamEvent::Gap,
142                StreamEvent::ReadError(StreamReadError::new(
143                    "custom",
144                    io::Error::from(io::ErrorKind::BrokenPipe),
145                )),
146                StreamEvent::Eof,
147            ],
148            None,
149            None,
150        );
151
152        assert_that!(subscription.next_event().await)
153            .is_some()
154            .is_equal_to(StreamEvent::Gap);
155        match subscription.next_event().await {
156            Some(StreamEvent::ReadError(err)) => {
157                assert_that!(err.stream_name()).is_equal_to("custom");
158                assert_that!(err.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
159            }
160            other => {
161                assert_that!(&other).fail(format_args!("expected read error, got {other:?}"));
162            }
163        }
164        assert_that!(subscription.next_event().await)
165            .is_some()
166            .is_equal_to(StreamEvent::Eof);
167        assert_that!(subscription.next_event().await).is_none();
168    }
169
170    #[tokio::test]
171    async fn drop_clears_active_backend_registration() {
172        let shared = Arc::new(ConfiguredShared::new());
173        let id = attach_active(&shared);
174        let (_sender, receiver) = mpsc::channel(4);
175        let subscription = subscription_with(Arc::clone(&shared), id, [], None, Some(receiver));
176
177        {
178            let state = shared
179                .state
180                .lock()
181                .expect("single-subscriber state poisoned");
182            assert_that!(state.active_id).is_some().is_equal_to(id);
183        }
184
185        drop(subscription);
186
187        let state = shared
188            .state
189            .lock()
190            .expect("single-subscriber state poisoned");
191        assert_that!(state.active_id).is_none();
192    }
193}