gix_filter/driver/process/
client.rs

1use std::{collections::HashSet, io::Write, str::FromStr};
2
3use bstr::{BStr, BString, ByteVec};
4
5use crate::driver::{
6    process,
7    process::{Capabilities, Client, PacketlineReader},
8};
9
10///
11pub mod handshake {
12    /// The error returned by [Client::handshake()][super::Client::handshake()].
13    #[derive(Debug, thiserror::Error)]
14    #[allow(missing_docs)]
15    pub enum Error {
16        #[error("Failed to read or write to the process")]
17        Io(#[from] std::io::Error),
18        #[error("{msg} '{actual}'")]
19        Protocol { msg: String, actual: String },
20        #[error("The server sent the '{name}' capability which isn't among the ones we desire can support")]
21        UnsupportedCapability { name: String },
22    }
23}
24
25///
26pub mod invoke {
27    /// The error returned by [Client::invoke()][super::Client::invoke()].
28    #[derive(Debug, thiserror::Error)]
29    #[allow(missing_docs)]
30    pub enum Error {
31        #[error("Failed to read or write to the process")]
32        Io(#[from] std::io::Error),
33    }
34
35    ///
36    pub mod without_content {
37        /// The error returned by [Client::invoke_without_content()][super::super::Client::invoke_without_content()].
38        #[derive(Debug, thiserror::Error)]
39        #[allow(missing_docs)]
40        pub enum Error {
41            #[error("Failed to read or write to the process")]
42            Io(#[from] std::io::Error),
43            #[error(transparent)]
44            PacketlineDecode(#[from] gix_packetline_blocking::decode::Error),
45        }
46
47        impl From<super::Error> for Error {
48            fn from(value: super::Error) -> Self {
49                match value {
50                    super::Error::Io(err) => Error::Io(err),
51                }
52            }
53        }
54    }
55}
56
57/// Protocol implementation
58impl Client {
59    /// Given a spawned `process` as created from `cmd`, use the 'long-running-process' protocol to send `welcome-prefix` and supported
60    /// `versions`, along with the `desired_capabilities`, and perform the handshake to negotiate a version to use along with
61    /// obtaining supported capabilities, which may be a sub-set of the desired capabilities.
62    pub fn handshake(
63        mut process: std::process::Child,
64        welcome_prefix: &str,
65        versions: &[usize],
66        desired_capabilities: &[&str],
67    ) -> Result<Self, handshake::Error> {
68        let mut out =
69            gix_packetline_blocking::Writer::new(process.stdin.take().expect("configured stdin when spawning"));
70        out.write_all(format!("{welcome_prefix}-client").as_bytes())?;
71        for version in versions {
72            out.write_all(format!("version={version}").as_bytes())?;
73        }
74        gix_packetline_blocking::encode::flush_to_write(out.inner_mut())?;
75        out.flush()?;
76
77        let mut input = gix_packetline_blocking::StreamingPeekableIter::new(
78            process.stdout.take().expect("configured stdout when spawning"),
79            &[gix_packetline_blocking::PacketLineRef::Flush],
80            false, /* packet tracing */
81        );
82        let mut read = input.as_read();
83        let mut buf = String::new();
84        read.read_line_to_string(&mut buf)?;
85        if buf
86            .strip_prefix(welcome_prefix)
87            .is_none_or(|rest| rest.trim_end() != "-server")
88        {
89            return Err(handshake::Error::Protocol {
90                msg: format!("Wanted '{welcome_prefix}-server, got "),
91                actual: buf,
92            });
93        }
94
95        let chosen_version;
96        buf.clear();
97        read.read_line_to_string(&mut buf)?;
98        match buf
99            .strip_prefix("version=")
100            .and_then(|version| usize::from_str(version.trim_end()).ok())
101        {
102            Some(version) => {
103                chosen_version = version;
104            }
105            None => {
106                return Err(handshake::Error::Protocol {
107                    msg: "Needed 'version=<integer>', got ".into(),
108                    actual: buf,
109                })
110            }
111        }
112
113        if !versions.contains(&chosen_version) {
114            return Err(handshake::Error::Protocol {
115                msg: format!("Server offered {chosen_version}, we only support "),
116                actual: versions.iter().map(ToString::to_string).collect::<Vec<_>>().join(", "),
117            });
118        }
119
120        if read.read_line_to_string(&mut buf)? != 0 {
121            return Err(handshake::Error::Protocol {
122                msg: "expected flush packet, got".into(),
123                actual: buf,
124            });
125        }
126        for capability in desired_capabilities {
127            out.write_all(format!("capability={capability}").as_bytes())?;
128        }
129        gix_packetline_blocking::encode::flush_to_write(out.inner_mut())?;
130        out.flush()?;
131
132        read.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
133        let mut capabilities = HashSet::new();
134        loop {
135            buf.clear();
136            let num_read = read.read_line_to_string(&mut buf)?;
137            if num_read == 0 {
138                break;
139            }
140            match buf.strip_prefix("capability=") {
141                Some(cap) => {
142                    let cap = cap.trim_end();
143                    if !desired_capabilities.contains(&cap) {
144                        return Err(handshake::Error::UnsupportedCapability { name: cap.into() });
145                    }
146                    capabilities.insert(cap.to_owned());
147                }
148                None => continue,
149            }
150        }
151
152        drop(read);
153        Ok(Client {
154            child: process,
155            out: input,
156            input: out,
157            capabilities,
158            version: chosen_version,
159        })
160    }
161
162    /// Invoke `command` and send all `meta` data before sending all `content` in full.
163    pub fn invoke(
164        &mut self,
165        command: &str,
166        meta: &mut dyn Iterator<Item = (&str, BString)>,
167        content: &mut dyn std::io::Read,
168    ) -> Result<process::Status, invoke::Error> {
169        self.send_command_and_meta(command, meta)?;
170        std::io::copy(content, &mut self.input)?;
171        gix_packetline_blocking::encode::flush_to_write(self.input.inner_mut())?;
172        self.input.flush()?;
173        Ok(self.read_status()?)
174    }
175
176    /// Invoke `command` while passing `meta` data, but don't send any content, and return their status.
177    /// Call `inspect_line` for each line that we see as command response.
178    ///
179    /// This is for commands that don't expect a content stream.
180    pub fn invoke_without_content<'a>(
181        &mut self,
182        command: &str,
183        meta: &mut dyn Iterator<Item = (&'a str, BString)>,
184        inspect_line: &mut dyn FnMut(&BStr),
185    ) -> Result<process::Status, invoke::without_content::Error> {
186        self.send_command_and_meta(command, meta)?;
187        while let Some(data) = self.out.read_line() {
188            let line = data??;
189            if let Some(line) = line.as_text() {
190                inspect_line(line.as_bstr());
191            }
192        }
193        self.out.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
194        let status = self.read_status()?;
195        Ok(status)
196    }
197
198    /// Return a `Read` implementation that reads the server process output until the next flush package, and validates
199    /// the status. If the status indicates failure, the last read will also fail.
200    pub fn as_read(&mut self) -> impl std::io::Read + '_ {
201        self.out.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
202        ReadProcessOutputAndStatus {
203            inner: self.out.as_read(),
204        }
205    }
206
207    /// Read a `status=` line from the process output until it is exhausted.
208    /// Note that the last sent status line wins and no status line means that the `Previous` still counts.
209    pub fn read_status(&mut self) -> std::io::Result<process::Status> {
210        read_status(&mut self.out.as_read())
211    }
212}
213
214impl Client {
215    fn send_command_and_meta(
216        &mut self,
217        command: &str,
218        meta: &mut dyn Iterator<Item = (&str, BString)>,
219    ) -> Result<(), invoke::Error> {
220        self.input.write_all(format!("command={command}").as_bytes())?;
221        let mut buf = BString::default();
222        for (key, value) in meta {
223            buf.clear();
224            buf.push_str(key);
225            buf.push(b'=');
226            buf.push_str(&value);
227            self.input.write_all(&buf)?;
228        }
229        gix_packetline_blocking::encode::flush_to_write(self.input.inner_mut())?;
230        Ok(())
231    }
232}
233
234fn read_status(read: &mut PacketlineReader<'_>) -> std::io::Result<process::Status> {
235    let mut status = process::Status::Previous;
236    let mut buf = String::new();
237    let mut count = 0;
238    loop {
239        buf.clear();
240        let num_read = read.read_line_to_string(&mut buf)?;
241        if num_read == 0 {
242            break;
243        }
244        if let Some(name) = buf.strip_prefix("status=") {
245            status = process::Status::Named(name.trim_end().into());
246        }
247        count += 1;
248    }
249    if count > 0 && matches!(status, process::Status::Previous) {
250        status = process::Status::Unset;
251    }
252    read.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
253    Ok(status)
254}
255
256struct ReadProcessOutputAndStatus<'a> {
257    inner: PacketlineReader<'a>,
258}
259
260impl std::io::Read for ReadProcessOutputAndStatus<'_> {
261    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
262        let num_read = self.inner.read(buf)?;
263        if num_read == 0 {
264            self.inner.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
265            let status = read_status(&mut self.inner)?;
266            if status.is_success() {
267                Ok(0)
268            } else {
269                Err(std::io::Error::other(format!(
270                    "Process indicated error after reading: {}",
271                    status.message().unwrap_or_default()
272                )))
273            }
274        } else {
275            Ok(num_read)
276        }
277    }
278}
279
280/// Access
281impl Client {
282    /// Return the list of capabilities reported by the serving process.
283    pub fn capabilities(&self) -> &Capabilities {
284        &self.capabilities
285    }
286
287    /// Return the mutable list of capabilities reported by the serving process.
288    pub fn capabilities_mut(&mut self) -> &mut Capabilities {
289        &mut self.capabilities
290    }
291
292    /// Return the negotiated version of the protocol.
293    ///
294    /// Note that it is the highest one that both the client and the server support.
295    pub fn version(&self) -> usize {
296        self.version
297    }
298}
299
300/// Lifecycle
301impl Client {
302    /// Return the child handle of the running process.
303    ///
304    /// Note that this will naturally close input and output handles, which is a signal for the child process to shutdown.
305    pub fn into_child(self) -> std::process::Child {
306        self.child
307    }
308}