gix_filter/driver/process/
client.rs1use std::{collections::HashSet, io::Write, str::FromStr};
2
3use bstr::{BStr, BString, ByteVec};
4
5use crate::driver::{
6 process,
7 process::{Capabilities, Client, PacketlineReader},
8};
9
10pub mod handshake {
12 #[derive(Debug, thiserror::Error)]
14 #[allow(missing_docs)]
15 pub enum Error {
16 #[error("Failed to read or write to the process")]
17 Io(#[from] std::io::Error),
18 #[error("{msg} '{actual}'")]
19 Protocol { msg: String, actual: String },
20 #[error("The server sent the '{name}' capability which isn't among the ones we desire can support")]
21 UnsupportedCapability { name: String },
22 }
23}
24
25pub mod invoke {
27 #[derive(Debug, thiserror::Error)]
29 #[allow(missing_docs)]
30 pub enum Error {
31 #[error("Failed to read or write to the process")]
32 Io(#[from] std::io::Error),
33 }
34
35 pub mod without_content {
37 #[derive(Debug, thiserror::Error)]
39 #[allow(missing_docs)]
40 pub enum Error {
41 #[error("Failed to read or write to the process")]
42 Io(#[from] std::io::Error),
43 #[error(transparent)]
44 PacketlineDecode(#[from] gix_packetline_blocking::decode::Error),
45 }
46
47 impl From<super::Error> for Error {
48 fn from(value: super::Error) -> Self {
49 match value {
50 super::Error::Io(err) => Error::Io(err),
51 }
52 }
53 }
54 }
55}
56
57impl Client {
59 pub fn handshake(
63 mut process: std::process::Child,
64 welcome_prefix: &str,
65 versions: &[usize],
66 desired_capabilities: &[&str],
67 ) -> Result<Self, handshake::Error> {
68 let mut out =
69 gix_packetline_blocking::Writer::new(process.stdin.take().expect("configured stdin when spawning"));
70 out.write_all(format!("{welcome_prefix}-client").as_bytes())?;
71 for version in versions {
72 out.write_all(format!("version={version}").as_bytes())?;
73 }
74 gix_packetline_blocking::encode::flush_to_write(out.inner_mut())?;
75 out.flush()?;
76
77 let mut input = gix_packetline_blocking::StreamingPeekableIter::new(
78 process.stdout.take().expect("configured stdout when spawning"),
79 &[gix_packetline_blocking::PacketLineRef::Flush],
80 false, );
82 let mut read = input.as_read();
83 let mut buf = String::new();
84 read.read_line_to_string(&mut buf)?;
85 if buf
86 .strip_prefix(welcome_prefix)
87 .is_none_or(|rest| rest.trim_end() != "-server")
88 {
89 return Err(handshake::Error::Protocol {
90 msg: format!("Wanted '{welcome_prefix}-server, got "),
91 actual: buf,
92 });
93 }
94
95 let chosen_version;
96 buf.clear();
97 read.read_line_to_string(&mut buf)?;
98 match buf
99 .strip_prefix("version=")
100 .and_then(|version| usize::from_str(version.trim_end()).ok())
101 {
102 Some(version) => {
103 chosen_version = version;
104 }
105 None => {
106 return Err(handshake::Error::Protocol {
107 msg: "Needed 'version=<integer>', got ".into(),
108 actual: buf,
109 })
110 }
111 }
112
113 if !versions.contains(&chosen_version) {
114 return Err(handshake::Error::Protocol {
115 msg: format!("Server offered {chosen_version}, we only support "),
116 actual: versions.iter().map(ToString::to_string).collect::<Vec<_>>().join(", "),
117 });
118 }
119
120 if read.read_line_to_string(&mut buf)? != 0 {
121 return Err(handshake::Error::Protocol {
122 msg: "expected flush packet, got".into(),
123 actual: buf,
124 });
125 }
126 for capability in desired_capabilities {
127 out.write_all(format!("capability={capability}").as_bytes())?;
128 }
129 gix_packetline_blocking::encode::flush_to_write(out.inner_mut())?;
130 out.flush()?;
131
132 read.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
133 let mut capabilities = HashSet::new();
134 loop {
135 buf.clear();
136 let num_read = read.read_line_to_string(&mut buf)?;
137 if num_read == 0 {
138 break;
139 }
140 match buf.strip_prefix("capability=") {
141 Some(cap) => {
142 let cap = cap.trim_end();
143 if !desired_capabilities.contains(&cap) {
144 return Err(handshake::Error::UnsupportedCapability { name: cap.into() });
145 }
146 capabilities.insert(cap.to_owned());
147 }
148 None => continue,
149 }
150 }
151
152 drop(read);
153 Ok(Client {
154 child: process,
155 out: input,
156 input: out,
157 capabilities,
158 version: chosen_version,
159 })
160 }
161
162 pub fn invoke(
164 &mut self,
165 command: &str,
166 meta: &mut dyn Iterator<Item = (&str, BString)>,
167 content: &mut dyn std::io::Read,
168 ) -> Result<process::Status, invoke::Error> {
169 self.send_command_and_meta(command, meta)?;
170 std::io::copy(content, &mut self.input)?;
171 gix_packetline_blocking::encode::flush_to_write(self.input.inner_mut())?;
172 self.input.flush()?;
173 Ok(self.read_status()?)
174 }
175
176 pub fn invoke_without_content<'a>(
181 &mut self,
182 command: &str,
183 meta: &mut dyn Iterator<Item = (&'a str, BString)>,
184 inspect_line: &mut dyn FnMut(&BStr),
185 ) -> Result<process::Status, invoke::without_content::Error> {
186 self.send_command_and_meta(command, meta)?;
187 while let Some(data) = self.out.read_line() {
188 let line = data??;
189 if let Some(line) = line.as_text() {
190 inspect_line(line.as_bstr());
191 }
192 }
193 self.out.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
194 let status = self.read_status()?;
195 Ok(status)
196 }
197
198 pub fn as_read(&mut self) -> impl std::io::Read + '_ {
201 self.out.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
202 ReadProcessOutputAndStatus {
203 inner: self.out.as_read(),
204 }
205 }
206
207 pub fn read_status(&mut self) -> std::io::Result<process::Status> {
210 read_status(&mut self.out.as_read())
211 }
212}
213
214impl Client {
215 fn send_command_and_meta(
216 &mut self,
217 command: &str,
218 meta: &mut dyn Iterator<Item = (&str, BString)>,
219 ) -> Result<(), invoke::Error> {
220 self.input.write_all(format!("command={command}").as_bytes())?;
221 let mut buf = BString::default();
222 for (key, value) in meta {
223 buf.clear();
224 buf.push_str(key);
225 buf.push(b'=');
226 buf.push_str(&value);
227 self.input.write_all(&buf)?;
228 }
229 gix_packetline_blocking::encode::flush_to_write(self.input.inner_mut())?;
230 Ok(())
231 }
232}
233
234fn read_status(read: &mut PacketlineReader<'_>) -> std::io::Result<process::Status> {
235 let mut status = process::Status::Previous;
236 let mut buf = String::new();
237 let mut count = 0;
238 loop {
239 buf.clear();
240 let num_read = read.read_line_to_string(&mut buf)?;
241 if num_read == 0 {
242 break;
243 }
244 if let Some(name) = buf.strip_prefix("status=") {
245 status = process::Status::Named(name.trim_end().into());
246 }
247 count += 1;
248 }
249 if count > 0 && matches!(status, process::Status::Previous) {
250 status = process::Status::Unset;
251 }
252 read.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
253 Ok(status)
254}
255
256struct ReadProcessOutputAndStatus<'a> {
257 inner: PacketlineReader<'a>,
258}
259
260impl std::io::Read for ReadProcessOutputAndStatus<'_> {
261 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
262 let num_read = self.inner.read(buf)?;
263 if num_read == 0 {
264 self.inner.reset_with(&[gix_packetline_blocking::PacketLineRef::Flush]);
265 let status = read_status(&mut self.inner)?;
266 if status.is_success() {
267 Ok(0)
268 } else {
269 Err(std::io::Error::other(format!(
270 "Process indicated error after reading: {}",
271 status.message().unwrap_or_default()
272 )))
273 }
274 } else {
275 Ok(num_read)
276 }
277 }
278}
279
280impl Client {
282 pub fn capabilities(&self) -> &Capabilities {
284 &self.capabilities
285 }
286
287 pub fn capabilities_mut(&mut self) -> &mut Capabilities {
289 &mut self.capabilities
290 }
291
292 pub fn version(&self) -> usize {
296 self.version
297 }
298}
299
300impl Client {
302 pub fn into_child(self) -> std::process::Child {
306 self.child
307 }
308}