gix_filter/driver/
apply.rs

1use std::collections::HashMap;
2
3use bstr::{BStr, BString};
4
5use crate::{
6    driver,
7    driver::{process, process::client::invoke, Operation, Process, State},
8    Driver,
9};
10
/// What to do if delay is supported by a process filter.
#[derive(Default, Debug, Copy, Clone)]
pub enum Delay {
    /// Use delayed processing for this entry.
    ///
    /// Note that it's up to the filter to determine whether or not the processing should be delayed.
    ///
    /// This is the default, as selected by the `#[default]` attribute.
    #[default]
    Allow,
    /// Do not delay the processing, and force it to happen immediately. In this case, no delayed processing will occur
    /// even if the filter supports it.
    ///
    /// This variant requires no special precautions to be taken by the caller as
    /// outputs will be produced immediately.
    Forbid,
}
26
/// The error returned by [State::apply()][super::State::apply()].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
    /// Obtaining the filter process failed. Converted from `driver::init::Error` via `?`
    /// (presumably raised while launching the process - confirm against `maybe_launch_process()`).
    #[error(transparent)]
    Init(#[from] driver::init::Error),
    /// An I/O error occurred while preparing the input of a single-file filter, i.e. while reading the source
    /// into memory or while spawning the thread that writes it to the filter's stdin.
    #[error("Could not write entire object to driver")]
    WriteSource(#[from] std::io::Error),
    /// A long-running process answered with a 'delayed' status even though `Delay::Forbid` was in effect.
    #[error("Filter process delayed an entry even though that was not requested")]
    DelayNotAllowed,
    /// Sending `command` to a long-running process failed with `source`.
    #[error("Failed to invoke '{command}' command")]
    ProcessInvoke {
        source: process::client::invoke::Error,
        command: String,
    },
    /// The long-running process answered `command` with a `status` that is neither 'success' nor 'delayed'.
    #[error("The invoked command '{command}' in process indicated an error: {status:?}")]
    ProcessStatus {
        status: driver::process::Status,
        command: String,
    },
}
48
/// Additional information for use in the [`State::apply()`] method.
#[derive(Debug, Copy, Clone)]
pub struct Context<'a, 'b> {
    /// The repo-relative path of the entry currently being processed, using slashes as separator.
    pub rela_path: &'a BStr,
    /// The name of the reference that `HEAD` is pointing to. It's passed to `process` filters if present.
    pub ref_name: Option<&'b BStr>,
    /// The root-level tree that contains the current entry directly or indirectly, or the commit owning the tree (if available).
    ///
    /// This is passed to `process` filters if present.
    pub treeish: Option<gix_hash::ObjectId>,
    /// The actual blob-hash of the data we are processing. It's passed to `process` filters if present.
    ///
    /// Note that this hash might be different from the `$Id$` of the respective `ident` filter, as the latter generates the hash itself.
    pub blob: Option<gix_hash::ObjectId>,
}
65
66/// Apply operations to filter programs.
67impl State {
68    /// Apply `operation` of `driver` to the bytes read from `src` and return a reader to immediately consume the output
69    /// produced by the filter. `rela_path` is the repo-relative path of the entry to handle.
70    /// It's possible that the filter stays inactive, in which case the `src` isn't consumed and has to be used by the caller.
71    ///
72    /// Each call to this method will cause the corresponding filter to be invoked unless `driver` indicates a `process` filter,
73    /// which is only launched once and maintained using this state.
74    ///
75    /// Note that it's not an error if there is no filter process for `operation` or if a long-running process doesn't support
76    /// the desired capability.
77    ///
78    /// ### Deviation
79    ///
80    /// If a long-running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
81    /// However, if it returns an unsuccessful error status later, it will not be removed, but reports the error only.
82    /// If any other non-'error' status is received, the process will be stopped. But that doesn't happen if such a status is received
83    /// after reading the filtered result.
84    pub fn apply<'a>(
85        &'a mut self,
86        driver: &Driver,
87        src: &mut impl std::io::Read,
88        operation: Operation,
89        ctx: Context<'_, '_>,
90    ) -> Result<Option<Box<dyn std::io::Read + 'a>>, Error> {
91        match self.apply_delayed(driver, src, operation, Delay::Forbid, ctx)? {
92            Some(MaybeDelayed::Delayed(_)) => {
93                unreachable!("we forbid delaying the entry")
94            }
95            Some(MaybeDelayed::Immediate(read)) => Ok(Some(read)),
96            None => Ok(None),
97        }
98    }
99
    /// Like [`apply()`](Self::apply()), but use `delay` to determine if the filter result may be delayed or not.
    ///
    /// Poll [`list_delayed_paths()`](Self::list_delayed_paths()) until it is empty and query the available paths again.
    /// Note that even though it's possible, the API assumes that commands aren't mixed when delays are allowed.
    ///
    /// ### Returns
    ///
    /// * `Ok(None)` if no filter process is available for `operation`, or if a long-running process doesn't list
    ///   the command named by `operation` among its capabilities. `src` is not read in either case.
    /// * `Ok(Some(MaybeDelayed::Immediate(_)))` with a reader over the filtered output, either from a single-file filter
    ///   or from a long-running process that reported success.
    /// * `Ok(Some(MaybeDelayed::Delayed(_)))` if a long-running process delayed the entry and `delay` permits it.
    ///
    /// ### Errors
    ///
    /// * [`Error::DelayNotAllowed`] if the process delayed the entry despite `Delay::Forbid`.
    /// * [`Error::ProcessInvoke`] if the command could not be sent to the long-running process.
    /// * [`Error::ProcessStatus`] if the process answered with a status that is neither 'success' nor 'delayed'.
    /// * [`Error::WriteSource`] if `src` could not be read in full or the stdin writer thread could not be spawned.
    pub fn apply_delayed<'a>(
        &'a mut self,
        driver: &Driver,
        src: &mut impl std::io::Read,
        operation: Operation,
        delay: Delay,
        ctx: Context<'_, '_>,
    ) -> Result<Option<MaybeDelayed<'a>>, Error> {
        match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
            Some(Process::SingleFile { mut child, command }) => {
                // To avoid deadlock when the filter immediately echoes input to output (like `cat`),
                // we need to write to stdin and read from stdout concurrently. If we write all data
                // to stdin before reading from stdout, and the pipe buffer fills up, both processes
                // will block: the filter blocks writing to stdout (buffer full), and we block writing
                // to stdin (waiting for the filter to consume data).
                //
                // Solution: Read all data into a buffer, then spawn a thread to write it to stdin
                // while we can immediately read from stdout.
                // Note that this holds the entire input in memory at once.
                let mut input_data = Vec::new();
                std::io::copy(src, &mut input_data)?;

                let stdin = child.stdin.take().expect("configured");
                let write_thread = WriterThread::write_all_in_background(input_data, stdin)?;

                Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
                    inner: child.stdout.take(),
                    // The child is only kept if the driver is required, as only then its exit status
                    // is checked once the output was read in full.
                    child: driver.required.then_some((child, command)),
                    write_thread: Some(write_thread),
                }))))
            }
            Some(Process::MultiFile { client, key }) => {
                let command = operation.as_str();
                // A process that doesn't advertise the command is treated like an inactive filter.
                if !client.capabilities().contains(command) {
                    return Ok(None);
                }

                let invoke_result = client.invoke(
                    command,
                    &mut [
                        ("pathname", Some(ctx.rela_path.to_owned())),
                        ("ref", ctx.ref_name.map(ToOwned::to_owned)),
                        ("treeish", ctx.treeish.map(|id| id.to_hex().to_string().into())),
                        ("blob", ctx.blob.map(|id| id.to_hex().to_string().into())),
                        (
                            "can-delay",
                            // Only offer delayed processing if the caller allows it and the process supports it.
                            match delay {
                                Delay::Allow if client.capabilities().contains("delay") => Some("1".into()),
                                Delay::Forbid | Delay::Allow => None,
                            },
                        ),
                    ]
                    .into_iter()
                    // Keys without a value are not sent at all.
                    .filter_map(|(key, value)| value.map(|v| (key, v))),
                    src,
                );
                let status = match invoke_result {
                    Ok(status) => status,
                    Err(err) => {
                        // The pattern is irrefutable here, and the I/O error decides whether the process
                        // is removed from the set of running processes (broken pipe or unexpected EOF).
                        let invoke::Error::Io(io_err) = &err;
                        handle_io_err(io_err, &mut self.running, key.0.as_ref());
                        return Err(Error::ProcessInvoke {
                            command: command.into(),
                            source: err,
                        });
                    }
                };

                if status.is_delayed() {
                    if matches!(delay, Delay::Forbid) {
                        return Err(Error::DelayNotAllowed);
                    }
                    Ok(Some(MaybeDelayed::Delayed(key)))
                } else if status.is_success() {
                    // TODO: find a way to not have to do the 'borrow-dance'.
                    // The client is removed and re-inserted so that the returned reader borrows from a fresh
                    // lookup in `self.running` rather than from the `client` obtained above.
                    let client = self.running.remove(&key.0).expect("present for borrowcheck dance");
                    self.running.insert(key.0.clone(), client);
                    let client = self.running.get_mut(&key.0).expect("just inserted");

                    Ok(Some(MaybeDelayed::Immediate(Box::new(client.as_read()))))
                } else {
                    let message = status.message().unwrap_or_default();
                    match message {
                        // Stop using this command with the process: without the capability, later calls
                        // return `Ok(None)` at the capability check above.
                        "abort" => {
                            client.capabilities_mut().remove(command);
                        }
                        // Keep the process as is and only report the failure.
                        "error" => {}
                        // Anything else is unexpected: forget the process and kill it, ignoring kill failures.
                        _strange => {
                            let client = self.running.remove(&key.0).expect("we definitely have it");
                            client.into_child().kill().ok();
                        }
                    }
                    Err(Error::ProcessStatus {
                        command: command.into(),
                        status,
                    })
                }
            }
            None => Ok(None),
        }
    }
204}
205
/// A type to represent delayed or immediate apply-filter results.
pub enum MaybeDelayed<'a> {
    /// Using the delayed protocol, this entry has been sent to a long-running process and needs to be
    /// checked for again, later, using the [`driver::Key`] to refer to the filter that owes a response.
    ///
    /// Note that the path to the entry is also needed to obtain the filtered result later.
    Delayed(driver::Key),
    /// The filtered result can be read from the contained reader right away.
    ///
    /// Note that it must be consumed in full or until a read error occurs.
    Immediate(Box<dyn std::io::Read + 'a>),
}
218
/// Owns the background thread that feeds a filter's stdin, so that stdout can be read concurrently
/// without risking a deadlock.
struct WriterThread {
    /// The handle of the writer thread, or `None` once it was joined.
    handle: Option<std::thread::JoinHandle<std::io::Result<()>>>,
}

impl WriterThread {
    /// Start a named thread which writes all of `data` to `stdin` and closes `stdin` afterwards.
    ///
    /// Fails if the thread could not be spawned.
    fn write_all_in_background(data: Vec<u8>, mut stdin: std::process::ChildStdin) -> std::io::Result<Self> {
        let builder = std::thread::Builder::new()
            .name("gix-filter-stdin-writer".into())
            .stack_size(128 * 1024);
        let handle = builder.spawn(move || {
            let outcome = std::io::Write::write_all(&mut stdin, &data);
            // Close the pipe no matter the outcome to signal EOF to the child.
            drop(stdin);
            outcome
        })?;

        Ok(Self { handle: Some(handle) })
    }

    /// Block until the writer thread is done and return the result of its write.
    ///
    /// A panic of the thread is turned into an I/O error. Once joined, further calls return `Ok(())`.
    fn join(&mut self) -> std::io::Result<()> {
        match self.handle.take() {
            None => Ok(()),
            Some(handle) => match handle.join() {
                Ok(write_result) => write_result,
                Err(payload) => {
                    // Panic payloads are typically either a `String` or a `&str`.
                    let detail: Option<&str> = match payload.downcast_ref::<String>() {
                        Some(owned) => Some(owned.as_str()),
                        None => payload.downcast_ref::<&str>().copied(),
                    };
                    let msg = match detail {
                        Some(s) => format!("Writer thread panicked: {s}"),
                        None => "Writer thread panicked while writing to filter stdin".to_string(),
                    };
                    Err(std::io::Error::other(msg))
                }
            },
        }
    }
}
258
259impl Drop for WriterThread {
260    fn drop(&mut self) {
261        // Best effort join on drop.
262        if let Err(_err) = self.join() {
263            gix_trace::debug!(err = %_err, "Failed to join writer thread during drop");
264        }
265    }
266}
267
/// A utility type to facilitate streaming the output of a filter process.
///
/// Note that fields are dropped in declaration order, so when the reader is dropped early, the stdout pipe
/// is closed before the writer thread is joined.
struct ReadFilterOutput {
    /// The stdout of the filter process, or `None` once the end of the stream was observed.
    inner: Option<std::process::ChildStdout>,
    /// The child and the command it was spawned with. It is present if we need its exit status to indicate success,
    /// which is checked once its output was read in full.
    child: Option<(std::process::Child, std::process::Command)>,
    /// The thread writing to stdin, if any. Must be joined when reading is done.
    write_thread: Option<WriterThread>,
}
276
277pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
278    if matches!(
279        err.kind(),
280        std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::UnexpectedEof
281    ) {
282        running.remove(process).expect("present or we wouldn't be here");
283    }
284}
285
286impl std::io::Read for ReadFilterOutput {
287    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
288        match self.inner.as_mut() {
289            Some(inner) => {
290                let num_read = match inner.read(buf) {
291                    Ok(n) => n,
292                    Err(e) => {
293                        // On read error, ensure we join the writer thread before propagating the error.
294                        // This is expected to finish with failure as well as it should fail to write
295                        // to the process which now fails to produce output (that we try to read).
296                        if let Some(mut write_thread) = self.write_thread.take() {
297                            // Try to join but prioritize the original read error
298                            if let Err(_thread_err) = write_thread.join() {
299                                gix_trace::debug!(thread_err = %_thread_err, read_err = %e, "write to stdin error during failed read");
300                            }
301                        }
302                        return Err(e);
303                    }
304                };
305
306                if num_read == 0 {
307                    self.inner.take();
308
309                    // Join the writer thread first to ensure all data has been written
310                    // and that resources are freed now.
311                    if let Some(mut write_thread) = self.write_thread.take() {
312                        write_thread.join()?;
313                    }
314
315                    if let Some((mut child, cmd)) = self.child.take() {
316                        let status = child.wait()?;
317                        if !status.success() {
318                            return Err(std::io::Error::other(format!("Driver process {cmd:?} failed")));
319                        }
320                    }
321                }
322                Ok(num_read)
323            }
324            None => Ok(0),
325        }
326    }
327}