gix_filter/driver/process/
client.rs

1use std::{collections::HashSet, io::Write, str::FromStr};
2
3use bstr::{BStr, BString, ByteVec};
4
5use crate::driver::{
6    process,
7    process::{Capabilities, Client, PacketlineReader},
8};
9
10///
11pub mod handshake {
12    /// The error returned by [Client::handshake()][super::Client::handshake()].
13    #[derive(Debug, thiserror::Error)]
14    #[allow(missing_docs)]
15    pub enum Error {
16        #[error("Failed to read or write to the process")]
17        Io(#[from] std::io::Error),
18        #[error("{msg} '{actual}'")]
19        Protocol { msg: String, actual: String },
20        #[error("The server sent the '{name}' capability which isn't among the ones we desire can support")]
21        UnsupportedCapability { name: String },
22    }
23}
24
25///
26pub mod invoke {
27    /// The error returned by [Client::invoke()][super::Client::invoke()].
28    #[derive(Debug, thiserror::Error)]
29    #[allow(missing_docs)]
30    pub enum Error {
31        #[error("Failed to read or write to the process")]
32        Io(#[from] std::io::Error),
33    }
34
35    ///
36    pub mod without_content {
37        /// The error returned by [Client::invoke_without_content()][super::super::Client::invoke_without_content()].
38        #[derive(Debug, thiserror::Error)]
39        #[allow(missing_docs)]
40        pub enum Error {
41            #[error("Failed to read or write to the process")]
42            Io(#[from] std::io::Error),
43            #[error(transparent)]
44            PacketlineDecode(#[from] gix_packetline::decode::Error),
45        }
46
47        impl From<super::Error> for Error {
48            fn from(value: super::Error) -> Self {
49                match value {
50                    super::Error::Io(err) => Error::Io(err),
51                }
52            }
53        }
54    }
55}
56
57/// Protocol implementation
58impl Client {
59    /// Given a spawned `process` as created from `cmd`, use the 'long-running-process' protocol to send `welcome-prefix` and supported
60    /// `versions`, along with the `desired_capabilities`, and perform the handshake to negotiate a version to use along with
61    /// obtaining supported capabilities, which may be a sub-set of the desired capabilities.
62    pub fn handshake(
63        mut process: std::process::Child,
64        welcome_prefix: &str,
65        versions: &[usize],
66        desired_capabilities: &[&str],
67    ) -> Result<Self, handshake::Error> {
68        let mut out = gix_packetline::Writer::new(process.stdin.take().expect("configured stdin when spawning"));
69        out.write_all(format!("{welcome_prefix}-client").as_bytes())?;
70        for version in versions {
71            out.write_all(format!("version={version}").as_bytes())?;
72        }
73        gix_packetline::encode::flush_to_write(out.inner_mut())?;
74        out.flush()?;
75
76        let mut input = gix_packetline::StreamingPeekableIter::new(
77            process.stdout.take().expect("configured stdout when spawning"),
78            &[gix_packetline::PacketLineRef::Flush],
79            false, /* packet tracing */
80        );
81        let mut read = input.as_read();
82        let mut buf = String::new();
83        read.read_line_to_string(&mut buf)?;
84        if buf
85            .strip_prefix(welcome_prefix)
86            .map_or(true, |rest| rest.trim_end() != "-server")
87        {
88            return Err(handshake::Error::Protocol {
89                msg: format!("Wanted '{welcome_prefix}-server, got "),
90                actual: buf,
91            });
92        }
93
94        let chosen_version;
95        buf.clear();
96        read.read_line_to_string(&mut buf)?;
97        match buf
98            .strip_prefix("version=")
99            .and_then(|version| usize::from_str(version.trim_end()).ok())
100        {
101            Some(version) => {
102                chosen_version = version;
103            }
104            None => {
105                return Err(handshake::Error::Protocol {
106                    msg: "Needed 'version=<integer>', got ".into(),
107                    actual: buf,
108                })
109            }
110        }
111
112        if !versions.contains(&chosen_version) {
113            return Err(handshake::Error::Protocol {
114                msg: format!("Server offered {chosen_version}, we only support "),
115                actual: versions.iter().map(ToString::to_string).collect::<Vec<_>>().join(", "),
116            });
117        }
118
119        if read.read_line_to_string(&mut buf)? != 0 {
120            return Err(handshake::Error::Protocol {
121                msg: "expected flush packet, got".into(),
122                actual: buf,
123            });
124        }
125        for capability in desired_capabilities {
126            out.write_all(format!("capability={capability}").as_bytes())?;
127        }
128        gix_packetline::encode::flush_to_write(out.inner_mut())?;
129        out.flush()?;
130
131        read.reset_with(&[gix_packetline::PacketLineRef::Flush]);
132        let mut capabilities = HashSet::new();
133        loop {
134            buf.clear();
135            let num_read = read.read_line_to_string(&mut buf)?;
136            if num_read == 0 {
137                break;
138            }
139            match buf.strip_prefix("capability=") {
140                Some(cap) => {
141                    let cap = cap.trim_end();
142                    if !desired_capabilities.contains(&cap) {
143                        return Err(handshake::Error::UnsupportedCapability { name: cap.into() });
144                    }
145                    capabilities.insert(cap.to_owned());
146                }
147                None => continue,
148            }
149        }
150
151        drop(read);
152        Ok(Client {
153            child: process,
154            out: input,
155            input: out,
156            capabilities,
157            version: chosen_version,
158        })
159    }
160
161    /// Invoke `command` and send all `meta` data before sending all `content` in full.
162    pub fn invoke(
163        &mut self,
164        command: &str,
165        meta: &mut dyn Iterator<Item = (&str, BString)>,
166        content: &mut dyn std::io::Read,
167    ) -> Result<process::Status, invoke::Error> {
168        self.send_command_and_meta(command, meta)?;
169        std::io::copy(content, &mut self.input)?;
170        gix_packetline::encode::flush_to_write(self.input.inner_mut())?;
171        self.input.flush()?;
172        Ok(self.read_status()?)
173    }
174
175    /// Invoke `command` while passing `meta` data, but don't send any content, and return their status.
176    /// Call `inspect_line` for each line that we see as command response.
177    ///
178    /// This is for commands that don't expect a content stream.
179    pub fn invoke_without_content<'a>(
180        &mut self,
181        command: &str,
182        meta: &mut dyn Iterator<Item = (&'a str, BString)>,
183        inspect_line: &mut dyn FnMut(&BStr),
184    ) -> Result<process::Status, invoke::without_content::Error> {
185        self.send_command_and_meta(command, meta)?;
186        while let Some(data) = self.out.read_line() {
187            let line = data??;
188            if let Some(line) = line.as_text() {
189                inspect_line(line.as_bstr());
190            }
191        }
192        self.out.reset_with(&[gix_packetline::PacketLineRef::Flush]);
193        let status = self.read_status()?;
194        Ok(status)
195    }
196
197    /// Return a `Read` implementation that reads the server process output until the next flush package, and validates
198    /// the status. If the status indicates failure, the last read will also fail.
199    pub fn as_read(&mut self) -> impl std::io::Read + '_ {
200        self.out.reset_with(&[gix_packetline::PacketLineRef::Flush]);
201        ReadProcessOutputAndStatus {
202            inner: self.out.as_read(),
203        }
204    }
205
206    /// Read a `status=` line from the process output until it is exhausted.
207    /// Note that the last sent status line wins and no status line means that the `Previous` still counts.
208    pub fn read_status(&mut self) -> std::io::Result<process::Status> {
209        read_status(&mut self.out.as_read())
210    }
211}
212
213impl Client {
214    fn send_command_and_meta(
215        &mut self,
216        command: &str,
217        meta: &mut dyn Iterator<Item = (&str, BString)>,
218    ) -> Result<(), invoke::Error> {
219        self.input.write_all(format!("command={command}").as_bytes())?;
220        let mut buf = BString::default();
221        for (key, value) in meta {
222            buf.clear();
223            buf.push_str(key);
224            buf.push(b'=');
225            buf.push_str(&value);
226            self.input.write_all(&buf)?;
227        }
228        gix_packetline::encode::flush_to_write(self.input.inner_mut())?;
229        Ok(())
230    }
231}
232
233fn read_status(read: &mut PacketlineReader<'_>) -> std::io::Result<process::Status> {
234    let mut status = process::Status::Previous;
235    let mut buf = String::new();
236    let mut count = 0;
237    loop {
238        buf.clear();
239        let num_read = read.read_line_to_string(&mut buf)?;
240        if num_read == 0 {
241            break;
242        }
243        if let Some(name) = buf.strip_prefix("status=") {
244            status = process::Status::Named(name.trim_end().into());
245        }
246        count += 1;
247    }
248    if count > 0 && matches!(status, process::Status::Previous) {
249        status = process::Status::Unset;
250    }
251    read.reset_with(&[gix_packetline::PacketLineRef::Flush]);
252    Ok(status)
253}
254
/// A reader over the output of the process which, once that output is exhausted, reads the status
/// that follows and fails the read if it doesn't indicate success.
struct ReadProcessOutputAndStatus<'a> {
    /// The packet line reader over the process output, used for both the content and the status.
    inner: PacketlineReader<'a>,
}
258
259impl std::io::Read for ReadProcessOutputAndStatus<'_> {
260    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
261        let num_read = self.inner.read(buf)?;
262        if num_read == 0 {
263            self.inner.reset_with(&[gix_packetline::PacketLineRef::Flush]);
264            let status = read_status(&mut self.inner)?;
265            if status.is_success() {
266                Ok(0)
267            } else {
268                Err(std::io::Error::new(
269                    std::io::ErrorKind::Other,
270                    format!(
271                        "Process indicated error after reading: {}",
272                        status.message().unwrap_or_default()
273                    ),
274                ))
275            }
276        } else {
277            Ok(num_read)
278        }
279    }
280}
281
/// Access
impl Client {
    /// Return the capabilities reported by the serving process during the handshake.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }

    /// Return the capabilities reported by the serving process during the handshake,
    /// with the ability to change them.
    pub fn capabilities_mut(&mut self) -> &mut Capabilities {
        &mut self.capabilities
    }

    /// Return the negotiated version of the protocol.
    ///
    /// It is the version the process answered with during the handshake, which is one of those we offered.
    /// NOTE(review): the process is presumably expected to pick the highest version that both sides
    /// support, but the handshake itself only checks that we offered it - confirm against the protocol.
    pub fn version(&self) -> usize {
        self.version
    }
}
301
/// Lifecycle
impl Client {
    /// Consume the client and return the child handle of the running process.
    ///
    /// Note that this will naturally close input and output handles as all other fields are dropped,
    /// which is a signal for the child process to shutdown.
    pub fn into_child(self) -> std::process::Child {
        self.child
    }
}