gix_filter/driver/apply.rs
1use std::collections::HashMap;
2
3use bstr::{BStr, BString};
4
5use crate::{
6 driver,
7 driver::{process, process::client::invoke, Operation, Process, State},
8 Driver,
9};
10
/// What to do if delay is supported by a process filter.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum Delay {
    /// Use delayed processing for this entry.
    ///
    /// This is the default. Note that it's up to the filter to determine whether or not the processing
    /// should be delayed, and that the caller has to be prepared to obtain delayed results later.
    #[default]
    Allow,
    /// Do not delay the processing, and force it to happen immediately. In this case, no delayed processing will occur
    /// even if the filter supports it.
    ///
    /// This variant requires no special precautions to be taken by the caller as
    /// outputs will be produced immediately.
    Forbid,
}
26
/// The error returned by [State::apply()][super::State::apply()].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
    /// A [`driver::init::Error`], forwarded without adding a message of its own.
    #[error(transparent)]
    Init(#[from] driver::init::Error),
    /// An IO error, for instance while copying the source data into the standard input of a single-file filter.
    #[error("Could not write entire object to driver")]
    WriteSource(#[from] std::io::Error),
    /// The filter process answered with a 'delayed' status even though [`Delay::Forbid`] was in effect.
    #[error("Filter process delayed an entry even though that was not requested")]
    DelayNotAllowed,
    /// Invoking `command` on the long-running filter process failed.
    #[error("Failed to invoke '{command}' command")]
    ProcessInvoke {
        source: process::client::invoke::Error,
        command: String,
    },
    /// The long-running filter process answered `command` with a `status` that is neither a success nor a delay.
    #[error("The invoked command '{command}' in process indicated an error: {status:?}")]
    ProcessStatus {
        status: driver::process::Status,
        command: String,
    },
}
48
/// Additional information for use in the [`State::apply()`] method.
#[derive(Debug, Copy, Clone)]
pub struct Context<'a, 'b> {
    /// The repo-relative path, using slashes as separator, of the entry currently being processed.
    ///
    /// It's passed to `process` filters as `pathname`.
    pub rela_path: &'a BStr,
    /// The name of the reference that `HEAD` is pointing to. It's passed to `process` filters as `ref` if present.
    pub ref_name: Option<&'b BStr>,
    /// The root-level tree that contains the current entry directly or indirectly, or the commit owning the tree (if available).
    ///
    /// This is passed to `process` filters as `treeish`, in hexadecimal form, if present.
    pub treeish: Option<gix_hash::ObjectId>,
    /// The actual blob-hash of the data we are processing. It's passed to `process` filters as `blob`,
    /// in hexadecimal form, if present.
    ///
    /// Note that this hash might be different from the `$Id$` of the respective `ident` filter, as the latter generates the hash itself.
    pub blob: Option<gix_hash::ObjectId>,
}
65
66/// Apply operations to filter programs.
67impl State {
68 /// Apply `operation` of `driver` to the bytes read from `src` and return a reader to immediately consume the output
69 /// produced by the filter. `rela_path` is the repo-relative path of the entry to handle.
70 /// It's possible that the filter stays inactive, in which case the `src` isn't consumed and has to be used by the caller.
71 ///
72 /// Each call to this method will cause the corresponding filter to be invoked unless `driver` indicates a `process` filter,
73 /// which is only launched once and maintained using this state.
74 ///
75 /// Note that it's not an error if there is no filter process for `operation` or if a long-running process doesn't support
76 /// the desired capability.
77 ///
78 /// ### Deviation
79 ///
80 /// If a long running process returns the 'abort' status after receiving the data, it will be removed similarly to how `git` does it.
81 /// However, if it returns an unsuccessful error status later, it will not be removed, but reports the error only.
82 /// If any other non-'error' status is received, the process will be stopped. But that doesn't happen if such a status is received
83 /// after reading the filtered result.
84 pub fn apply<'a>(
85 &'a mut self,
86 driver: &Driver,
87 src: &mut impl std::io::Read,
88 operation: Operation,
89 ctx: Context<'_, '_>,
90 ) -> Result<Option<Box<dyn std::io::Read + 'a>>, Error> {
91 match self.apply_delayed(driver, src, operation, Delay::Forbid, ctx)? {
92 Some(MaybeDelayed::Delayed(_)) => {
93 unreachable!("we forbid delaying the entry")
94 }
95 Some(MaybeDelayed::Immediate(read)) => Ok(Some(read)),
96 None => Ok(None),
97 }
98 }
99
100 /// Like [`apply()]`[Self::apply()], but use `delay` to determine if the filter result may be delayed or not.
101 ///
102 /// Poll [`list_delayed_paths()`][Self::list_delayed_paths()] until it is empty and query the available paths again.
103 /// Note that even though it's possible, the API assumes that commands aren't mixed when delays are allowed.
104 pub fn apply_delayed<'a>(
105 &'a mut self,
106 driver: &Driver,
107 src: &mut impl std::io::Read,
108 operation: Operation,
109 delay: Delay,
110 ctx: Context<'_, '_>,
111 ) -> Result<Option<MaybeDelayed<'a>>, Error> {
112 match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
113 Some(Process::SingleFile { mut child, command }) => {
114 std::io::copy(src, &mut child.stdin.take().expect("configured"))?;
115 Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
116 inner: child.stdout.take(),
117 child: driver.required.then_some((child, command)),
118 }))))
119 }
120 Some(Process::MultiFile { client, key }) => {
121 let command = operation.as_str();
122 if !client.capabilities().contains(command) {
123 return Ok(None);
124 }
125
126 let invoke_result = client.invoke(
127 command,
128 &mut [
129 ("pathname", Some(ctx.rela_path.to_owned())),
130 ("ref", ctx.ref_name.map(ToOwned::to_owned)),
131 ("treeish", ctx.treeish.map(|id| id.to_hex().to_string().into())),
132 ("blob", ctx.blob.map(|id| id.to_hex().to_string().into())),
133 (
134 "can-delay",
135 match delay {
136 Delay::Allow if client.capabilities().contains("delay") => Some("1".into()),
137 Delay::Forbid | Delay::Allow => None,
138 },
139 ),
140 ]
141 .into_iter()
142 .filter_map(|(key, value)| value.map(|v| (key, v))),
143 src,
144 );
145 let status = match invoke_result {
146 Ok(status) => status,
147 Err(err) => {
148 let invoke::Error::Io(io_err) = &err;
149 handle_io_err(io_err, &mut self.running, key.0.as_ref());
150 return Err(Error::ProcessInvoke {
151 command: command.into(),
152 source: err,
153 });
154 }
155 };
156
157 if status.is_delayed() {
158 if matches!(delay, Delay::Forbid) {
159 return Err(Error::DelayNotAllowed);
160 }
161 Ok(Some(MaybeDelayed::Delayed(key)))
162 } else if status.is_success() {
163 // TODO: find a way to not have to do the 'borrow-dance'.
164 let client = self.running.remove(&key.0).expect("present for borrowcheck dance");
165 self.running.insert(key.0.clone(), client);
166 let client = self.running.get_mut(&key.0).expect("just inserted");
167
168 Ok(Some(MaybeDelayed::Immediate(Box::new(client.as_read()))))
169 } else {
170 let message = status.message().unwrap_or_default();
171 match message {
172 "abort" => {
173 client.capabilities_mut().remove(command);
174 }
175 "error" => {}
176 _strange => {
177 let client = self.running.remove(&key.0).expect("we definitely have it");
178 client.into_child().kill().ok();
179 }
180 }
181 Err(Error::ProcessStatus {
182 command: command.into(),
183 status,
184 })
185 }
186 }
187 None => Ok(None),
188 }
189 }
190}
191
/// A type to represent delayed or immediate apply-filter results.
pub enum MaybeDelayed<'a> {
    /// Using the delayed protocol, this entry has been sent to a long-running process and needs to be
    /// checked for again, later, using the [`driver::Key`] to refer to the filter who owes a response.
    ///
    /// Note that the path to the entry is also needed to obtain the filtered result later.
    Delayed(driver::Key),
    /// The filtered result can be read from the contained reader right away.
    ///
    /// Note that it must be consumed in full or till a read error occurs.
    Immediate(Box<dyn std::io::Read + 'a>),
}
204
/// A utility type to facilitate streaming the output of a filter process.
struct ReadFilterOutput {
    /// The standard output of the filter process, or `None` once it was read to its end.
    inner: Option<std::process::ChildStdout>,
    /// The child along with the command that launched it. It is present only if the driver is required,
    /// in which case the child's exit status must indicate success once its output was read in full.
    child: Option<(std::process::Child, std::process::Command)>,
}
211
212pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
213 if matches!(
214 err.kind(),
215 std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::UnexpectedEof
216 ) {
217 running.remove(process).expect("present or we wouldn't be here");
218 }
219}
220
221impl std::io::Read for ReadFilterOutput {
222 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
223 match self.inner.as_mut() {
224 Some(inner) => {
225 let num_read = inner.read(buf)?;
226 if num_read == 0 {
227 self.inner.take();
228 if let Some((mut child, cmd)) = self.child.take() {
229 let status = child.wait()?;
230 if !status.success() {
231 return Err(std::io::Error::other(format!("Driver process {cmd:?} failed")));
232 }
233 }
234 }
235 Ok(num_read)
236 }
237 None => Ok(0),
238 }
239 }
240}