gix_filter/driver/process/
client.rs1use std::{collections::HashSet, io::Write, str::FromStr};
2
3use bstr::{BStr, BString, ByteVec};
4
5use crate::driver::{
6 process,
7 process::{Capabilities, Client, PacketlineReader},
8};
9
10pub mod handshake {
12 #[derive(Debug, thiserror::Error)]
14 #[allow(missing_docs)]
15 pub enum Error {
16 #[error("Failed to read or write to the process")]
17 Io(#[from] std::io::Error),
18 #[error("{msg} '{actual}'")]
19 Protocol { msg: String, actual: String },
20 #[error("The server sent the '{name}' capability which isn't among the ones we desire can support")]
21 UnsupportedCapability { name: String },
22 }
23}
24
25pub mod invoke {
27 #[derive(Debug, thiserror::Error)]
29 #[allow(missing_docs)]
30 pub enum Error {
31 #[error("Failed to read or write to the process")]
32 Io(#[from] std::io::Error),
33 }
34
35 pub mod without_content {
37 #[derive(Debug, thiserror::Error)]
39 #[allow(missing_docs)]
40 pub enum Error {
41 #[error("Failed to read or write to the process")]
42 Io(#[from] std::io::Error),
43 #[error(transparent)]
44 PacketlineDecode(#[from] gix_packetline::decode::Error),
45 }
46
47 impl From<super::Error> for Error {
48 fn from(value: super::Error) -> Self {
49 match value {
50 super::Error::Io(err) => Error::Io(err),
51 }
52 }
53 }
54 }
55}
56
57impl Client {
59 pub fn handshake(
63 mut process: std::process::Child,
64 welcome_prefix: &str,
65 versions: &[usize],
66 desired_capabilities: &[&str],
67 ) -> Result<Self, handshake::Error> {
68 let mut out = gix_packetline::Writer::new(process.stdin.take().expect("configured stdin when spawning"));
69 out.write_all(format!("{welcome_prefix}-client").as_bytes())?;
70 for version in versions {
71 out.write_all(format!("version={version}").as_bytes())?;
72 }
73 gix_packetline::encode::flush_to_write(out.inner_mut())?;
74 out.flush()?;
75
76 let mut input = gix_packetline::StreamingPeekableIter::new(
77 process.stdout.take().expect("configured stdout when spawning"),
78 &[gix_packetline::PacketLineRef::Flush],
79 false, );
81 let mut read = input.as_read();
82 let mut buf = String::new();
83 read.read_line_to_string(&mut buf)?;
84 if buf
85 .strip_prefix(welcome_prefix)
86 .map_or(true, |rest| rest.trim_end() != "-server")
87 {
88 return Err(handshake::Error::Protocol {
89 msg: format!("Wanted '{welcome_prefix}-server, got "),
90 actual: buf,
91 });
92 }
93
94 let chosen_version;
95 buf.clear();
96 read.read_line_to_string(&mut buf)?;
97 match buf
98 .strip_prefix("version=")
99 .and_then(|version| usize::from_str(version.trim_end()).ok())
100 {
101 Some(version) => {
102 chosen_version = version;
103 }
104 None => {
105 return Err(handshake::Error::Protocol {
106 msg: "Needed 'version=<integer>', got ".into(),
107 actual: buf,
108 })
109 }
110 }
111
112 if !versions.contains(&chosen_version) {
113 return Err(handshake::Error::Protocol {
114 msg: format!("Server offered {chosen_version}, we only support "),
115 actual: versions.iter().map(ToString::to_string).collect::<Vec<_>>().join(", "),
116 });
117 }
118
119 if read.read_line_to_string(&mut buf)? != 0 {
120 return Err(handshake::Error::Protocol {
121 msg: "expected flush packet, got".into(),
122 actual: buf,
123 });
124 }
125 for capability in desired_capabilities {
126 out.write_all(format!("capability={capability}").as_bytes())?;
127 }
128 gix_packetline::encode::flush_to_write(out.inner_mut())?;
129 out.flush()?;
130
131 read.reset_with(&[gix_packetline::PacketLineRef::Flush]);
132 let mut capabilities = HashSet::new();
133 loop {
134 buf.clear();
135 let num_read = read.read_line_to_string(&mut buf)?;
136 if num_read == 0 {
137 break;
138 }
139 match buf.strip_prefix("capability=") {
140 Some(cap) => {
141 let cap = cap.trim_end();
142 if !desired_capabilities.contains(&cap) {
143 return Err(handshake::Error::UnsupportedCapability { name: cap.into() });
144 }
145 capabilities.insert(cap.to_owned());
146 }
147 None => continue,
148 }
149 }
150
151 drop(read);
152 Ok(Client {
153 child: process,
154 out: input,
155 input: out,
156 capabilities,
157 version: chosen_version,
158 })
159 }
160
161 pub fn invoke(
163 &mut self,
164 command: &str,
165 meta: &mut dyn Iterator<Item = (&str, BString)>,
166 content: &mut dyn std::io::Read,
167 ) -> Result<process::Status, invoke::Error> {
168 self.send_command_and_meta(command, meta)?;
169 std::io::copy(content, &mut self.input)?;
170 gix_packetline::encode::flush_to_write(self.input.inner_mut())?;
171 self.input.flush()?;
172 Ok(self.read_status()?)
173 }
174
175 pub fn invoke_without_content<'a>(
180 &mut self,
181 command: &str,
182 meta: &mut dyn Iterator<Item = (&'a str, BString)>,
183 inspect_line: &mut dyn FnMut(&BStr),
184 ) -> Result<process::Status, invoke::without_content::Error> {
185 self.send_command_and_meta(command, meta)?;
186 while let Some(data) = self.out.read_line() {
187 let line = data??;
188 if let Some(line) = line.as_text() {
189 inspect_line(line.as_bstr());
190 }
191 }
192 self.out.reset_with(&[gix_packetline::PacketLineRef::Flush]);
193 let status = self.read_status()?;
194 Ok(status)
195 }
196
197 pub fn as_read(&mut self) -> impl std::io::Read + '_ {
200 self.out.reset_with(&[gix_packetline::PacketLineRef::Flush]);
201 ReadProcessOutputAndStatus {
202 inner: self.out.as_read(),
203 }
204 }
205
206 pub fn read_status(&mut self) -> std::io::Result<process::Status> {
209 read_status(&mut self.out.as_read())
210 }
211}
212
213impl Client {
214 fn send_command_and_meta(
215 &mut self,
216 command: &str,
217 meta: &mut dyn Iterator<Item = (&str, BString)>,
218 ) -> Result<(), invoke::Error> {
219 self.input.write_all(format!("command={command}").as_bytes())?;
220 let mut buf = BString::default();
221 for (key, value) in meta {
222 buf.clear();
223 buf.push_str(key);
224 buf.push(b'=');
225 buf.push_str(&value);
226 self.input.write_all(&buf)?;
227 }
228 gix_packetline::encode::flush_to_write(self.input.inner_mut())?;
229 Ok(())
230 }
231}
232
233fn read_status(read: &mut PacketlineReader<'_>) -> std::io::Result<process::Status> {
234 let mut status = process::Status::Previous;
235 let mut buf = String::new();
236 let mut count = 0;
237 loop {
238 buf.clear();
239 let num_read = read.read_line_to_string(&mut buf)?;
240 if num_read == 0 {
241 break;
242 }
243 if let Some(name) = buf.strip_prefix("status=") {
244 status = process::Status::Named(name.trim_end().into());
245 }
246 count += 1;
247 }
248 if count > 0 && matches!(status, process::Status::Previous) {
249 status = process::Status::Unset;
250 }
251 read.reset_with(&[gix_packetline::PacketLineRef::Flush]);
252 Ok(status)
253}
254
255struct ReadProcessOutputAndStatus<'a> {
256 inner: PacketlineReader<'a>,
257}
258
259impl std::io::Read for ReadProcessOutputAndStatus<'_> {
260 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
261 let num_read = self.inner.read(buf)?;
262 if num_read == 0 {
263 self.inner.reset_with(&[gix_packetline::PacketLineRef::Flush]);
264 let status = read_status(&mut self.inner)?;
265 if status.is_success() {
266 Ok(0)
267 } else {
268 Err(std::io::Error::new(
269 std::io::ErrorKind::Other,
270 format!(
271 "Process indicated error after reading: {}",
272 status.message().unwrap_or_default()
273 ),
274 ))
275 }
276 } else {
277 Ok(num_read)
278 }
279 }
280}
281
282impl Client {
284 pub fn capabilities(&self) -> &Capabilities {
286 &self.capabilities
287 }
288
289 pub fn capabilities_mut(&mut self) -> &mut Capabilities {
291 &mut self.capabilities
292 }
293
294 pub fn version(&self) -> usize {
298 self.version
299 }
300}
301
302impl Client {
304 pub fn into_child(self) -> std::process::Child {
308 self.child
309 }
310}