gix_filter/driver/apply.rs
1use std::collections::HashMap;
2
3use bstr::{BStr, BString};
4
5use crate::{
6 driver,
7 driver::{process, process::client::invoke, Operation, Process, State},
8 Driver,
9};
10
/// What to do if delay is supported by a process filter.
#[derive(Default, Debug, Copy, Clone)]
pub enum Delay {
    /// Use delayed processing for this entry.
    ///
    /// Note that it's up to the filter to determine whether or not the processing should be delayed.
    ///
    /// This is the [`Default`] value.
    #[default]
    Allow,
    /// Do not delay the processing, and force it to happen immediately. In this case, no delayed processing will occur
    /// even if the filter supports it.
    ///
    /// This is what [`State::apply()`] uses, as it requires no special precautions to be taken by the caller as
    /// outputs will be produced immediately.
    Forbid,
}
26
/// The error returned by [State::apply()][super::State::apply()] and [State::apply_delayed()][super::State::apply_delayed()].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
    /// Converted from a [`driver::init::Error`] produced while obtaining the filter process to run
    /// (presumably when it couldn't be prepared or launched).
    #[error(transparent)]
    Init(#[from] driver::init::Error),
    /// An IO error that occurred while feeding the filter.
    ///
    /// NOTE(review): in `apply_delayed()` every `std::io::Error` propagated with `?` becomes this variant, including
    /// failures to read `src` into memory and to spawn the stdin writer thread, so the message can be misleading there.
    #[error("Could not write entire object to driver")]
    WriteSource(#[from] std::io::Error),
    /// A long-running filter process answered with a delayed status even though [`Delay::Forbid`] was used.
    #[error("Filter process delayed an entry even though that was not requested")]
    DelayNotAllowed,
    /// Sending `command` to a long-running filter process failed.
    #[error("Failed to invoke '{command}' command")]
    ProcessInvoke {
        /// The error produced while talking to the process.
        source: process::client::invoke::Error,
        /// The name of the command that was invoked.
        command: String,
    },
    /// A long-running filter process answered `command` with a status that is neither success nor delayed.
    #[error("The invoked command '{command}' in process indicated an error: {status:?}")]
    ProcessStatus {
        /// The status returned by the process.
        status: driver::process::Status,
        /// The name of the command that was invoked.
        command: String,
    },
}
48
49/// Additional information for use in the [`State::apply()`] method.
50#[derive(Debug, Copy, Clone)]
51pub struct Context<'a, 'b> {
52 /// The repo-relative using slashes as separator of the entry currently being processed.
53 pub rela_path: &'a BStr,
54 /// The name of the reference that `HEAD` is pointing to. It's passed to `process` filters if present.
55 pub ref_name: Option<&'b BStr>,
56 /// The root-level tree that contains the current entry directly or indirectly, or the commit owning the tree (if available).
57 ///
58 /// This is passed to `process` filters if present.
59 pub treeish: Option<gix_hash::ObjectId>,
60 /// The actual blob-hash of the data we are processing. It's passed to `process` filters if present.
61 ///
62 /// Note that this hash might be different from the `$Id$` of the respective `ident` filter, as the latter generates the hash itself.
63 pub blob: Option<gix_hash::ObjectId>,
64}
65
/// Apply operations to filter programs.
impl State {
    /// Apply `operation` of `driver` to the bytes read from `src` and return a reader to immediately consume the output
    /// produced by the filter. `ctx.rela_path` is the repo-relative path of the entry to handle.
    /// It's possible that the filter stays inactive, in which case `Ok(None)` is returned, and `src` isn't consumed
    /// and has to be used by the caller.
    ///
    /// Each call to this method will cause the corresponding filter to be invoked unless `driver` indicates a `process` filter,
    /// which is only launched once and maintained using this state.
    ///
    /// Note that it's not an error if there is no filter process for `operation` or if a long-running process doesn't support
    /// the desired capability.
    ///
    /// Delayed processing is never requested here (see [`Delay::Forbid`]); use
    /// [`apply_delayed()`](Self::apply_delayed()) to allow it.
    ///
    /// ### Deviation
    ///
    /// If a long-running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
    /// However, if it returns an unsuccessful error status later, it will not be removed, but reports the error only.
    /// If any other non-'error' status is received, the process will be stopped. But that doesn't happen if such a status is received
    /// after reading the filtered result.
    pub fn apply<'a>(
        &'a mut self,
        driver: &Driver,
        src: &mut impl std::io::Read,
        operation: Operation,
        ctx: Context<'_, '_>,
    ) -> Result<Option<Box<dyn std::io::Read + 'a>>, Error> {
        match self.apply_delayed(driver, src, operation, Delay::Forbid, ctx)? {
            Some(MaybeDelayed::Delayed(_)) => {
                // With `Delay::Forbid`, a delayed status is turned into `Error::DelayNotAllowed` by `apply_delayed()`.
                unreachable!("we forbid delaying the entry")
            }
            Some(MaybeDelayed::Immediate(read)) => Ok(Some(read)),
            None => Ok(None),
        }
    }

    /// Like [`apply()`](Self::apply()), but use `delay` to determine if the filter result may be delayed or not.
    ///
    /// Poll [`list_delayed_paths()`](Self::list_delayed_paths()) until it is empty and query the available paths again.
    /// Note that even though it's possible, the API assumes that commands aren't mixed when delays are allowed.
    ///
    /// Returns `Ok(None)` if there is no filter process for `operation`, or if a long-running process doesn't
    /// advertise the capability for `operation`. In both cases `src` is left unread.
    ///
    /// ### Errors
    ///
    /// - [`Error::DelayNotAllowed`] if the process delayed the entry even though `delay` is [`Delay::Forbid`].
    /// - [`Error::ProcessInvoke`] if talking to a long-running process failed.
    /// - [`Error::ProcessStatus`] if a long-running process answered with an unsuccessful status.
    pub fn apply_delayed<'a>(
        &'a mut self,
        driver: &Driver,
        src: &mut impl std::io::Read,
        operation: Operation,
        delay: Delay,
        ctx: Context<'_, '_>,
    ) -> Result<Option<MaybeDelayed<'a>>, Error> {
        match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
            Some(Process::SingleFile { mut child, command }) => {
                // To avoid deadlock when the filter immediately echoes input to output (like `cat`),
                // we need to write to stdin and read from stdout concurrently. If we write all data
                // to stdin before reading from stdout, and the pipe buffer fills up, both processes
                // will block: the filter blocks writing to stdout (buffer full), and we block writing
                // to stdin (waiting for the filter to consume data).
                //
                // Solution: Read all data into a buffer, then spawn a thread to write it to stdin
                // while we can immediately read from stdout.
                // Errors reading `src` are converted into `Error::WriteSource` by `?`.
                let mut input_data = Vec::new();
                std::io::copy(src, &mut input_data)?;

                let stdin = child.stdin.take().expect("configured");
                let write_thread = WriterThread::write_all_in_background(input_data, stdin)?;

                // The child is only kept if the driver is `required`, as then its exit status is checked
                // once its output was read to the end.
                // NOTE(review): otherwise `child` is dropped here without `wait()` - confirm not reaping it is acceptable.
                Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
                    inner: child.stdout.take(),
                    child: driver.required.then_some((child, command)),
                    write_thread: Some(write_thread),
                }))))
            }
            Some(Process::MultiFile { client, key }) => {
                let command = operation.as_str();
                // A long-running process that doesn't advertise this command leaves the entry untouched.
                if !client.capabilities().contains(command) {
                    return Ok(None);
                }

                // Optional metadata is only sent if present, and `can-delay` only if the caller allows delays
                // and the process advertised the `delay` capability.
                let invoke_result = client.invoke(
                    command,
                    &mut [
                        ("pathname", Some(ctx.rela_path.to_owned())),
                        ("ref", ctx.ref_name.map(ToOwned::to_owned)),
                        ("treeish", ctx.treeish.map(|id| id.to_hex().to_string().into())),
                        ("blob", ctx.blob.map(|id| id.to_hex().to_string().into())),
                        (
                            "can-delay",
                            match delay {
                                Delay::Allow if client.capabilities().contains("delay") => Some("1".into()),
                                Delay::Forbid | Delay::Allow => None,
                            },
                        ),
                    ]
                    .into_iter()
                    .filter_map(|(key, value)| value.map(|v| (key, v))),
                    src,
                );
                let status = match invoke_result {
                    Ok(status) => status,
                    Err(err) => {
                        // `invoke::Error` only has the `Io` variant, which makes this `let` irrefutable.
                        // A closed pipe means the process can't be used anymore, so `handle_io_err` forgets it.
                        let invoke::Error::Io(io_err) = &err;
                        handle_io_err(io_err, &mut self.running, key.0.as_ref());
                        return Err(Error::ProcessInvoke {
                            command: command.into(),
                            source: err,
                        });
                    }
                };

                if status.is_delayed() {
                    if matches!(delay, Delay::Forbid) {
                        return Err(Error::DelayNotAllowed);
                    }
                    // The result is fetched later using `key` to find the process that owes it.
                    Ok(Some(MaybeDelayed::Delayed(key)))
                } else if status.is_success() {
                    // TODO: find a way to not have to do the 'borrow-dance'.
                    // Re-borrowing the client from `self.running` lets the returned reader borrow from `self`
                    // for `'a`, which the borrow of `client` obtained above doesn't allow here.
                    let client = self.running.remove(&key.0).expect("present for borrowcheck dance");
                    self.running.insert(key.0.clone(), client);
                    let client = self.running.get_mut(&key.0).expect("just inserted");

                    Ok(Some(MaybeDelayed::Immediate(Box::new(client.as_read()))))
                } else {
                    let message = status.message().unwrap_or_default();
                    match message {
                        // Stop sending this command to the process, but keep the process itself.
                        "abort" => {
                            client.capabilities_mut().remove(command);
                        }
                        // Keep the process and its capabilities, just report the error.
                        "error" => {}
                        // An unknown status: forget the process and kill it, ignoring failures to do so.
                        _strange => {
                            let client = self.running.remove(&key.0).expect("we definitely have it");
                            client.into_child().kill().ok();
                        }
                    }
                    Err(Error::ProcessStatus {
                        command: command.into(),
                        status,
                    })
                }
            }
            None => Ok(None),
        }
    }
}
205
/// A type to represent delayed or immediate apply-filter results, as returned by [`State::apply_delayed()`].
///
/// The lifetime `'a` ties an immediate result to the mutable borrow of the [`State`] that produced it.
pub enum MaybeDelayed<'a> {
    /// Using the delayed protocol, this entry has been sent to a long-running process and needs to be
    /// checked for again, later, using the [`driver::Key`] to refer to the filter that owes a response.
    ///
    /// Note that the path to the entry is also needed to obtain the filtered result later.
    Delayed(driver::Key),
    /// The filtered result can be read from the contained reader right away.
    ///
    /// Note that it must be consumed in full or till a read error occurs.
    Immediate(Box<dyn std::io::Read + 'a>),
}
218
/// Owns the background thread that feeds a filter's stdin, so its stdout can be read concurrently
/// without both sides blocking on full pipe buffers.
struct WriterThread {
    /// `Some` until the thread has been joined.
    handle: Option<std::thread::JoinHandle<std::io::Result<()>>>,
}

impl WriterThread {
    /// Move `data` onto a newly spawned thread that writes it to `stdin` in full and then closes the pipe.
    ///
    /// Only fails if the thread couldn't be spawned; write errors are reported by [`join()`](Self::join()).
    fn write_all_in_background(data: Vec<u8>, mut stdin: std::process::ChildStdin) -> std::io::Result<Self> {
        let feed_stdin = move || -> std::io::Result<()> {
            use std::io::Write;
            stdin.write_all(&data)?;
            // Closing our end of the pipe is what signals EOF to the child.
            drop(stdin);
            Ok(())
        };
        std::thread::Builder::new()
            .name("gix-filter-stdin-writer".into())
            .stack_size(128 * 1024)
            .spawn(feed_stdin)
            .map(|handle| Self { handle: Some(handle) })
    }

    /// Block until the writer thread is done and return the outcome of its write; a panic becomes an error.
    ///
    /// Once joined, further calls are no-ops returning `Ok(())`.
    fn join(&mut self) -> std::io::Result<()> {
        match self.handle.take() {
            None => Ok(()),
            Some(handle) => handle.join().unwrap_or_else(|payload| {
                let detail = payload
                    .downcast_ref::<String>()
                    .map(String::as_str)
                    .or_else(|| payload.downcast_ref::<&str>().copied());
                let msg = match detail {
                    Some(s) => format!("Writer thread panicked: {s}"),
                    None => "Writer thread panicked while writing to filter stdin".to_string(),
                };
                Err(std::io::Error::other(msg))
            }),
        }
    }
}
258
259impl Drop for WriterThread {
260 fn drop(&mut self) {
261 // Best effort join on drop.
262 if let Err(_err) = self.join() {
263 gix_trace::debug!(err = %_err, "Failed to join writer thread during drop");
264 }
265 }
266}
267
/// A utility type to facilitate streaming the output of a filter process.
///
/// NOTE: struct fields are dropped in declaration order, so if this reader is dropped early, `inner` (the child's
/// stdout) is closed before `write_thread` is joined by its `Drop` impl. Presumably this lets a filter that is blocked
/// writing its output fail, which in turn unblocks the writer thread - keep `inner` declared before `write_thread`.
struct ReadFilterOutput {
    /// The filter's stdout. It is `None` after EOF was reached, or if stdout wasn't captured.
    inner: Option<std::process::ChildStdout>,
    /// The child and its command, present only if the driver is `required`, so that its exit status
    /// must indicate success once all output was read.
    child: Option<(std::process::Child, std::process::Command)>,
    /// The thread writing to stdin, if any. Must be joined when reading is done.
    write_thread: Option<WriterThread>,
}
276
277pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
278 if matches!(
279 err.kind(),
280 std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::UnexpectedEof
281 ) {
282 running.remove(process).expect("present or we wouldn't be here");
283 }
284}
285
286impl std::io::Read for ReadFilterOutput {
287 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
288 match self.inner.as_mut() {
289 Some(inner) => {
290 let num_read = match inner.read(buf) {
291 Ok(n) => n,
292 Err(e) => {
293 // On read error, ensure we join the writer thread before propagating the error.
294 // This is expected to finish with failure as well as it should fail to write
295 // to the process which now fails to produce output (that we try to read).
296 if let Some(mut write_thread) = self.write_thread.take() {
297 // Try to join but prioritize the original read error
298 if let Err(_thread_err) = write_thread.join() {
299 gix_trace::debug!(thread_err = %_thread_err, read_err = %e, "write to stdin error during failed read");
300 }
301 }
302 return Err(e);
303 }
304 };
305
306 if num_read == 0 {
307 self.inner.take();
308
309 // Join the writer thread first to ensure all data has been written
310 // and that resources are freed now.
311 if let Some(mut write_thread) = self.write_thread.take() {
312 write_thread.join()?;
313 }
314
315 if let Some((mut child, cmd)) = self.child.take() {
316 let status = child.wait()?;
317 if !status.success() {
318 return Err(std::io::Error::other(format!("Driver process {cmd:?} failed")));
319 }
320 }
321 }
322 Ok(num_read)
323 }
324 None => Ok(0),
325 }
326 }
327}