Skip to main content

tokio_process_tools/process_handle/
replay.rs

1use super::ProcessHandle;
2use crate::output_stream::OutputStream;
3use crate::output_stream::backend::broadcast::BroadcastOutputStream;
4use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::policy::{Delivery, ReplayEnabled};
6
/// Implements `seal_stdout_replay` on every `ProcessHandle` whose stdout slot holds the given
/// backend instantiated with `ReplayEnabled`. The stderr slot is left as a free type parameter
/// (bounded only by `OutputStream`), so a single invocation per backend covers every stderr
/// pairing.
macro_rules! impl_seal_stdout_replay {
    ($backend:ident) => {
        impl<StdoutDelivery: Delivery, StderrStream: OutputStream>
            ProcessHandle<$backend<StdoutDelivery, ReplayEnabled>, StderrStream>
        {
            /// Seals the stdout replay history for future subscribers.
            pub fn seal_stdout_replay(&self) {
                self.std_out_stream.seal_replay();
            }
        }
    };
}
24
/// Implements `seal_stderr_replay` on every `ProcessHandle` whose stderr slot holds the given
/// backend instantiated with `ReplayEnabled`. The stdout slot is left as a free type parameter
/// (bounded only by `OutputStream`), so a single invocation per backend covers every stdout
/// pairing.
macro_rules! impl_seal_stderr_replay {
    ($backend:ident) => {
        impl<StdoutStream: OutputStream, StderrDelivery: Delivery>
            ProcessHandle<StdoutStream, $backend<StderrDelivery, ReplayEnabled>>
        {
            /// Seals the stderr replay history for future subscribers.
            pub fn seal_stderr_replay(&self) {
                self.std_err_stream.seal_replay();
            }
        }
    };
}
40
/// Implements `seal_output_replay` for one concrete (stdout backend, stderr backend) pairing in
/// which both slots are instantiated with `ReplayEnabled`. The generated method only forwards to
/// `seal_stdout_replay` and `seal_stderr_replay`, so the sealing logic itself is not duplicated
/// here.
macro_rules! impl_seal_output_replay {
    ($stdout_backend:ident, $stderr_backend:ident) => {
        impl<StdoutDelivery: Delivery, StderrDelivery: Delivery>
            ProcessHandle<
                $stdout_backend<StdoutDelivery, ReplayEnabled>,
                $stderr_backend<StderrDelivery, ReplayEnabled>,
            >
        {
            /// Seals both the stdout and the stderr replay history of this handle.
            pub fn seal_output_replay(&self) {
                // Stdout first, then stderr.
                self.seal_stdout_replay();
                self.seal_stderr_replay();
            }
        }
    };
}
60
61impl_seal_stdout_replay!(BroadcastOutputStream);
62impl_seal_stdout_replay!(SingleSubscriberOutputStream);
63impl_seal_stderr_replay!(BroadcastOutputStream);
64impl_seal_stderr_replay!(SingleSubscriberOutputStream);
65impl_seal_output_replay!(BroadcastOutputStream, BroadcastOutputStream);
66impl_seal_output_replay!(BroadcastOutputStream, SingleSubscriberOutputStream);
67impl_seal_output_replay!(SingleSubscriberOutputStream, BroadcastOutputStream);
68impl_seal_output_replay!(SingleSubscriberOutputStream, SingleSubscriberOutputStream);
69
#[cfg(test)]
mod tests {
    use crate::test_support::long_running_command;
    use crate::{
        AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, NumBytesExt, Process,
    };
    use assertr::prelude::*;
    use std::time::Duration;

    /// Appends the stream settings shared by every test in this module to a builder on which the
    /// backend (`broadcast()` / `single_subscriber()`) has already been selected.
    macro_rules! replay_enabled {
        ($backend:expr) => {
            $backend
                .reliable_for_active_subscribers()
                .replay_last_bytes(1.megabytes())
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        };
    }

    /// Shared test body: checks that neither stream is sealed, calls `seal_output_replay`, checks
    /// that both streams are sealed, then waits for the process to complete. Written as a macro
    /// because the handle type differs for each backend pairing. Must be expanded inside an
    /// `async` context since it awaits the completion.
    macro_rules! assert_seal_output_replay_seals_both_streams {
        ($process:ident) => {{
            assert_that!($process.stdout().is_replay_sealed()).is_false();
            assert_that!($process.stderr().is_replay_sealed()).is_false();

            $process.seal_output_replay();

            assert_that!($process.stdout().is_replay_sealed()).is_true();
            assert_that!($process.stderr().is_replay_sealed()).is_true();

            // Rebind mutably only now; sealing above needed just a shared reference.
            let mut finished = $process;
            let _ = finished
                .wait_for_completion(Duration::from_secs(2))
                .await
                .unwrap();
        }};
    }

    #[tokio::test]
    async fn process_handle_seal_output_replay_seals_stdout_and_stderr() {
        let process = Process::new(long_running_command(Duration::from_millis(100)))
            .name(AutoName::program_only())
            .stdout_and_stderr(|stream| replay_enabled!(stream.broadcast()))
            .spawn()
            .expect("Failed to spawn");

        assert_seal_output_replay_seals_both_streams!(process);
    }

    #[tokio::test]
    async fn seal_output_replay_seals_broadcast_stdout_and_single_subscriber_stderr() {
        let process = Process::new(long_running_command(Duration::from_millis(100)))
            .name(AutoName::program_only())
            .stdout(|out| replay_enabled!(out.broadcast()))
            .stderr(|err| replay_enabled!(err.single_subscriber()))
            .spawn()
            .expect("Failed to spawn");

        assert_seal_output_replay_seals_both_streams!(process);
    }

    #[tokio::test]
    async fn seal_output_replay_seals_single_subscriber_stdout_and_broadcast_stderr() {
        let process = Process::new(long_running_command(Duration::from_millis(100)))
            .name(AutoName::program_only())
            .stdout(|out| replay_enabled!(out.single_subscriber()))
            .stderr(|err| replay_enabled!(err.broadcast()))
            .spawn()
            .expect("Failed to spawn");

        assert_seal_output_replay_seals_both_streams!(process);
    }
}