gix_filter/driver/
apply.rs

1use std::collections::HashMap;
2
3use bstr::{BStr, BString};
4
5use crate::{
6    driver,
7    driver::{process, process::client::invoke, Operation, Process, State},
8    Driver,
9};
10
/// What to do if delay is supported by a process filter.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Delay {
    /// Use delayed processing for this entry.
    ///
    /// Note that it's up to the filter to determine whether or not the processing should be delayed.
    ///
    /// This is the default.
    #[default]
    Allow,
    /// Do not delay the processing, and force it to happen immediately. In this case, no delayed processing will occur
    /// even if the filter supports it.
    ///
    /// This requires no special precautions to be taken by the caller as outputs will be produced immediately,
    /// which is why [`State::apply()`][super::State::apply()] always uses it.
    Forbid,
}
26
27/// The error returned by [State::apply()][super::State::apply()].
28#[derive(Debug, thiserror::Error)]
29#[allow(missing_docs)]
30pub enum Error {
31    #[error(transparent)]
32    Init(#[from] driver::init::Error),
33    #[error("Could not write entire object to driver")]
34    WriteSource(#[from] std::io::Error),
35    #[error("Filter process delayed an entry even though that was not requested")]
36    DelayNotAllowed,
37    #[error("Failed to invoke '{command}' command")]
38    ProcessInvoke {
39        source: process::client::invoke::Error,
40        command: String,
41    },
42    #[error("The invoked command '{command}' in process indicated an error: {status:?}")]
43    ProcessStatus {
44        status: driver::process::Status,
45        command: String,
46    },
47}
48
49/// Additional information for use in the [`State::apply()`] method.
50#[derive(Debug, Copy, Clone)]
51pub struct Context<'a, 'b> {
52    /// The repo-relative using slashes as separator of the entry currently being processed.
53    pub rela_path: &'a BStr,
54    /// The name of the reference that `HEAD` is pointing to. It's passed to `process` filters if present.
55    pub ref_name: Option<&'b BStr>,
56    /// The root-level tree that contains the current entry directly or indirectly, or the commit owning the tree (if available).
57    ///
58    /// This is passed to `process` filters if present.
59    pub treeish: Option<gix_hash::ObjectId>,
60    /// The actual blob-hash of the data we are processing. It's passed to `process` filters if present.
61    ///
62    /// Note that this hash might be different from the `$Id$` of the respective `ident` filter, as the latter generates the hash itself.
63    pub blob: Option<gix_hash::ObjectId>,
64}
65
66/// Apply operations to filter programs.
67impl State {
68    /// Apply `operation` of `driver` to the bytes read from `src` and return a reader to immediately consume the output
69    /// produced by the filter. `rela_path` is the repo-relative path of the entry to handle.
70    /// It's possible that the filter stays inactive, in which case the `src` isn't consumed and has to be used by the caller.
71    ///
72    /// Each call to this method will cause the corresponding filter to be invoked unless `driver` indicates a `process` filter,
73    /// which is only launched once and maintained using this state.
74    ///
75    /// Note that it's not an error if there is no filter process for `operation` or if a long-running process doesn't support
76    /// the desired capability.
77    ///
78    /// ### Deviation
79    ///
80    /// If a long running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
81    /// However, if it returns an unsuccessful error status later, it will not be removed, but reports the error only.
82    /// If any other non-'error' status is received, the process will be stopped. But that doesn't happen if such a status is received
83    /// after reading the filtered result.
84    pub fn apply<'a>(
85        &'a mut self,
86        driver: &Driver,
87        src: &mut impl std::io::Read,
88        operation: Operation,
89        ctx: Context<'_, '_>,
90    ) -> Result<Option<Box<dyn std::io::Read + 'a>>, Error> {
91        match self.apply_delayed(driver, src, operation, Delay::Forbid, ctx)? {
92            Some(MaybeDelayed::Delayed(_)) => {
93                unreachable!("we forbid delaying the entry")
94            }
95            Some(MaybeDelayed::Immediate(read)) => Ok(Some(read)),
96            None => Ok(None),
97        }
98    }
99
100    /// Like [`apply()]`[Self::apply()], but use `delay` to determine if the filter result may be delayed or not.
101    ///
102    /// Poll [`list_delayed_paths()`][Self::list_delayed_paths()] until it is empty and query the available paths again.
103    /// Note that even though it's possible, the API assumes that commands aren't mixed when delays are allowed.
104    pub fn apply_delayed<'a>(
105        &'a mut self,
106        driver: &Driver,
107        src: &mut impl std::io::Read,
108        operation: Operation,
109        delay: Delay,
110        ctx: Context<'_, '_>,
111    ) -> Result<Option<MaybeDelayed<'a>>, Error> {
112        match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
113            Some(Process::SingleFile { mut child, command }) => {
114                std::io::copy(src, &mut child.stdin.take().expect("configured"))?;
115                Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
116                    inner: child.stdout.take(),
117                    child: driver.required.then_some((child, command)),
118                }))))
119            }
120            Some(Process::MultiFile { client, key }) => {
121                let command = operation.as_str();
122                if !client.capabilities().contains(command) {
123                    return Ok(None);
124                }
125
126                let invoke_result = client.invoke(
127                    command,
128                    &mut [
129                        ("pathname", Some(ctx.rela_path.to_owned())),
130                        ("ref", ctx.ref_name.map(ToOwned::to_owned)),
131                        ("treeish", ctx.treeish.map(|id| id.to_hex().to_string().into())),
132                        ("blob", ctx.blob.map(|id| id.to_hex().to_string().into())),
133                        (
134                            "can-delay",
135                            match delay {
136                                Delay::Allow if client.capabilities().contains("delay") => Some("1".into()),
137                                Delay::Forbid | Delay::Allow => None,
138                            },
139                        ),
140                    ]
141                    .into_iter()
142                    .filter_map(|(key, value)| value.map(|v| (key, v))),
143                    src,
144                );
145                let status = match invoke_result {
146                    Ok(status) => status,
147                    Err(err) => {
148                        let invoke::Error::Io(io_err) = &err;
149                        handle_io_err(io_err, &mut self.running, key.0.as_ref());
150                        return Err(Error::ProcessInvoke {
151                            command: command.into(),
152                            source: err,
153                        });
154                    }
155                };
156
157                if status.is_delayed() {
158                    if matches!(delay, Delay::Forbid) {
159                        return Err(Error::DelayNotAllowed);
160                    }
161                    Ok(Some(MaybeDelayed::Delayed(key)))
162                } else if status.is_success() {
163                    // TODO: find a way to not have to do the 'borrow-dance'.
164                    let client = self.running.remove(&key.0).expect("present for borrowcheck dance");
165                    self.running.insert(key.0.clone(), client);
166                    let client = self.running.get_mut(&key.0).expect("just inserted");
167
168                    Ok(Some(MaybeDelayed::Immediate(Box::new(client.as_read()))))
169                } else {
170                    let message = status.message().unwrap_or_default();
171                    match message {
172                        "abort" => {
173                            client.capabilities_mut().remove(command);
174                        }
175                        "error" => {}
176                        _strange => {
177                            let client = self.running.remove(&key.0).expect("we definitely have it");
178                            client.into_child().kill().ok();
179                        }
180                    }
181                    Err(Error::ProcessStatus {
182                        command: command.into(),
183                        status,
184                    })
185                }
186            }
187            None => Ok(None),
188        }
189    }
190}
191
192/// A type to represent delayed or immediate apply-filter results.
193pub enum MaybeDelayed<'a> {
194    /// Using the delayed protocol, this entry has been sent to a long-running process and needs to be
195    /// checked for again, later, using the [`driver::Key`] to refer to the filter who owes a response.
196    ///
197    /// Note that the path to the entry is also needed to obtain the filtered result later.
198    Delayed(driver::Key),
199    /// The filtered result can be read from the contained reader right away.
200    ///
201    /// Note that it must be consumed in full or till a read error occurs.
202    Immediate(Box<dyn std::io::Read + 'a>),
203}
204
205/// A utility type to facilitate streaming the output of a filter process.
206struct ReadFilterOutput {
207    inner: Option<std::process::ChildStdout>,
208    /// The child is present if we need its exit code to be positive.
209    child: Option<(std::process::Child, std::process::Command)>,
210}
211
212pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
213    if matches!(
214        err.kind(),
215        std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::UnexpectedEof
216    ) {
217        running.remove(process).expect("present or we wouldn't be here");
218    }
219}
220
221impl std::io::Read for ReadFilterOutput {
222    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
223        match self.inner.as_mut() {
224            Some(inner) => {
225                let num_read = inner.read(buf)?;
226                if num_read == 0 {
227                    self.inner.take();
228                    if let Some((mut child, cmd)) = self.child.take() {
229                        let status = child.wait()?;
230                        if !status.success() {
231                            return Err(std::io::Error::other(format!("Driver process {cmd:?} failed")));
232                        }
233                    }
234                }
235                Ok(num_read)
236            }
237            None => Ok(0),
238        }
239    }
240}