tokio_process_tools/process_handle/
replay.rs1use super::ProcessHandle;
2use crate::output_stream::OutputStream;
3use crate::output_stream::backend::broadcast::BroadcastOutputStream;
4use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::policy::{Delivery, ReplayEnabled};
6
// Generates `seal_stdout_replay` for every `ProcessHandle` whose stdout stream is the given
// backend type instantiated with `ReplayEnabled`.
//
// `$backend` must name a stream type taking `<Delivery, Replay>` type parameters. The generated
// impl is generic over the stdout delivery policy and over the stderr stream type.
macro_rules! impl_seal_stdout_replay {
    ($backend:ident) => {
        impl<StdoutDelivery: Delivery, StderrStream: OutputStream>
            ProcessHandle<$backend<StdoutDelivery, ReplayEnabled>, StderrStream>
        {
            /// Seals replay on the stdout stream.
            ///
            /// This forwards to the stdout stream's `seal_replay` method; see that method for
            /// the exact effect on replay behavior.
            pub fn seal_stdout_replay(&self) {
                self.std_out_stream.seal_replay();
            }
        }
    };
}
24
// Generates `seal_stderr_replay` for every `ProcessHandle` whose stderr stream is the given
// backend type instantiated with `ReplayEnabled`.
//
// `$backend` must name a stream type taking `<Delivery, Replay>` type parameters. The generated
// impl is generic over the stdout stream type and over the stderr delivery policy.
macro_rules! impl_seal_stderr_replay {
    ($backend:ident) => {
        impl<StdoutStream: OutputStream, StderrDelivery: Delivery>
            ProcessHandle<StdoutStream, $backend<StderrDelivery, ReplayEnabled>>
        {
            /// Seals replay on the stderr stream.
            ///
            /// This forwards to the stderr stream's `seal_replay` method; see that method for
            /// the exact effect on replay behavior.
            pub fn seal_stderr_replay(&self) {
                self.std_err_stream.seal_replay();
            }
        }
    };
}
40
// Generates `seal_output_replay` for a `ProcessHandle` whose stdout and stderr streams are the
// given backend types, both instantiated with `ReplayEnabled`.
//
// The generated method relies on `seal_stdout_replay` and `seal_stderr_replay` being available
// for the same backend pairing.
macro_rules! impl_seal_output_replay {
    ($stdout_backend:ident, $stderr_backend:ident) => {
        impl<StdoutDelivery: Delivery, StderrDelivery: Delivery>
            ProcessHandle<
                $stdout_backend<StdoutDelivery, ReplayEnabled>,
                $stderr_backend<StderrDelivery, ReplayEnabled>,
            >
        {
            /// Seals replay on both output streams: stdout first, then stderr.
            ///
            /// Equivalent to calling `seal_stdout_replay` followed by `seal_stderr_replay`.
            pub fn seal_output_replay(&self) {
                self.seal_stdout_replay();
                self.seal_stderr_replay();
            }
        }
    };
}
60
61impl_seal_stdout_replay!(BroadcastOutputStream);
62impl_seal_stdout_replay!(SingleSubscriberOutputStream);
63impl_seal_stderr_replay!(BroadcastOutputStream);
64impl_seal_stderr_replay!(SingleSubscriberOutputStream);
65impl_seal_output_replay!(BroadcastOutputStream, BroadcastOutputStream);
66impl_seal_output_replay!(BroadcastOutputStream, SingleSubscriberOutputStream);
67impl_seal_output_replay!(SingleSubscriberOutputStream, BroadcastOutputStream);
68impl_seal_output_replay!(SingleSubscriberOutputStream, SingleSubscriberOutputStream);
69
#[cfg(test)]
mod tests {
    use crate::test_support::long_running_command;
    use crate::{
        AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, NumBytesExt, Process,
    };
    use assertr::prelude::*;
    use std::time::Duration;

    /// Both streams share one broadcast configuration; sealing the output must flip the
    /// sealed flag on stdout and stderr alike.
    #[tokio::test]
    async fn process_handle_seal_output_replay_seals_stdout_and_stderr() {
        let command = long_running_command(Duration::from_millis(100));
        let mut handle = Process::new(command)
            .name(AutoName::program_only())
            .stdout_and_stderr(|builder| {
                builder
                    .broadcast()
                    .reliable_for_active_subscribers()
                    .replay_last_bytes(1.megabytes())
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .expect("Failed to spawn");

        // Nothing is sealed right after spawning.
        assert_that!(handle.stdout().is_replay_sealed()).is_false();
        assert_that!(handle.stderr().is_replay_sealed()).is_false();

        handle.seal_output_replay();

        // A single call seals both streams.
        assert_that!(handle.stdout().is_replay_sealed()).is_true();
        assert_that!(handle.stderr().is_replay_sealed()).is_true();

        // Let the child finish so the test does not leave a running process behind.
        let completion_timeout = Duration::from_secs(2);
        let _ = handle
            .wait_for_completion(completion_timeout)
            .await
            .unwrap();
    }

    /// Mixed backends: broadcast stdout combined with single-subscriber stderr.
    #[tokio::test]
    async fn seal_output_replay_seals_broadcast_stdout_and_single_subscriber_stderr() {
        let command = long_running_command(Duration::from_millis(100));
        let mut handle = Process::new(command)
            .name(AutoName::program_only())
            .stdout(|out_builder| {
                out_builder
                    .broadcast()
                    .reliable_for_active_subscribers()
                    .replay_last_bytes(1.megabytes())
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .stderr(|err_builder| {
                err_builder
                    .single_subscriber()
                    .reliable_for_active_subscribers()
                    .replay_last_bytes(1.megabytes())
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .expect("Failed to spawn");

        // Nothing is sealed right after spawning.
        assert_that!(handle.stdout().is_replay_sealed()).is_false();
        assert_that!(handle.stderr().is_replay_sealed()).is_false();

        handle.seal_output_replay();

        // A single call seals both streams.
        assert_that!(handle.stdout().is_replay_sealed()).is_true();
        assert_that!(handle.stderr().is_replay_sealed()).is_true();

        // Let the child finish so the test does not leave a running process behind.
        let completion_timeout = Duration::from_secs(2);
        let _ = handle
            .wait_for_completion(completion_timeout)
            .await
            .unwrap();
    }

    /// Mixed backends: single-subscriber stdout combined with broadcast stderr.
    #[tokio::test]
    async fn seal_output_replay_seals_single_subscriber_stdout_and_broadcast_stderr() {
        let command = long_running_command(Duration::from_millis(100));
        let mut handle = Process::new(command)
            .name(AutoName::program_only())
            .stdout(|out_builder| {
                out_builder
                    .single_subscriber()
                    .reliable_for_active_subscribers()
                    .replay_last_bytes(1.megabytes())
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .stderr(|err_builder| {
                err_builder
                    .broadcast()
                    .reliable_for_active_subscribers()
                    .replay_last_bytes(1.megabytes())
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .expect("Failed to spawn");

        // Nothing is sealed right after spawning.
        assert_that!(handle.stdout().is_replay_sealed()).is_false();
        assert_that!(handle.stderr().is_replay_sealed()).is_false();

        handle.seal_output_replay();

        // A single call seals both streams.
        assert_that!(handle.stdout().is_replay_sealed()).is_true();
        assert_that!(handle.stderr().is_replay_sealed()).is_true();

        // Let the child finish so the test does not leave a running process behind.
        let completion_timeout = Duration::from_secs(2);
        let _ = handle
            .wait_for_completion(completion_timeout)
            .await
            .unwrap();
    }
}