1use std::collections::HashMap;
10use std::io::{self, Read, Write};
11
12use git_lfs_git::pktline;
13use git_lfs_pointer::Pointer;
14use git_lfs_store::Store;
15
16use crate::{FetchError, clean, smudge_with_fetch};
17
18#[derive(Debug, thiserror::Error)]
19pub enum FilterProcessError {
20 #[error(transparent)]
21 Io(#[from] io::Error),
22 #[error("filter-process handshake: {0}")]
23 Handshake(String),
24 #[error("filter-process: missing required header {0:?}")]
25 MissingHeader(&'static str),
26 #[error("filter-process: unknown command {0:?}")]
27 UnknownCommand(String),
28}
29
30pub fn filter_process<R, W, F>(
39 store: &Store,
40 input: R,
41 output: W,
42 mut fetch: F,
43) -> Result<(), FilterProcessError>
44where
45 R: Read,
46 W: Write,
47 F: FnMut(&Pointer) -> Result<(), FetchError>,
48{
49 let mut reader = pktline::Reader::new(input);
50 let mut writer = pktline::Writer::new(output);
51
52 handshake(&mut reader, &mut writer)?;
53
54 loop {
55 let headers = match read_headers(&mut reader) {
58 Ok(Some(h)) => h,
59 Ok(None) => return Ok(()),
60 Err(FilterProcessError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
61 return Ok(());
62 }
63 Err(e) => return Err(e),
64 };
65
66 let payload = read_payload(&mut reader)?;
67 let command = headers
68 .get("command")
69 .ok_or(FilterProcessError::MissingHeader("command"))?
70 .clone();
71
72 match command.as_str() {
73 "clean" => process_clean(store, &mut writer, &payload)?,
74 "smudge" => process_smudge(store, &mut writer, &payload, &mut fetch)?,
75 other => return Err(FilterProcessError::UnknownCommand(other.into())),
76 }
77 writer.flush()?;
78 }
79}
80
81fn handshake<R: Read, W: Write>(
82 reader: &mut pktline::Reader<R>,
83 writer: &mut pktline::Writer<W>,
84) -> Result<(), FilterProcessError> {
85 let welcome = reader
87 .read_text()?
88 .ok_or_else(|| FilterProcessError::Handshake("expected welcome, got flush".into()))?;
89 if welcome != "git-filter-client" {
90 return Err(FilterProcessError::Handshake(format!(
91 "expected git-filter-client, got {welcome:?}"
92 )));
93 }
94 let mut versions = Vec::new();
95 while let Some(line) = reader.read_text()? {
96 versions.push(line);
97 }
98 if !versions.iter().any(|v| v == "version=2") {
99 return Err(FilterProcessError::Handshake(format!(
100 "client doesn't advertise version=2 (got {versions:?})"
101 )));
102 }
103 writer.write_text("git-filter-server")?;
104 writer.write_text("version=2")?;
105 writer.write_flush()?;
106
107 writer.write_text("capability=clean")?;
115 writer.write_text("capability=smudge")?;
116 writer.write_flush()?;
117 writer.flush()?;
118
119 let mut caps = Vec::new();
124 while let Some(line) = reader.read_text()? {
125 caps.push(line);
126 }
127 for required in ["capability=clean", "capability=smudge"] {
128 if !caps.iter().any(|c| c == required) {
129 return Err(FilterProcessError::Handshake(format!(
130 "client missing required {required} (got {caps:?})"
131 )));
132 }
133 }
134
135 Ok(())
136}
137
138fn read_headers<R: Read>(
139 reader: &mut pktline::Reader<R>,
140) -> Result<Option<HashMap<String, String>>, FilterProcessError> {
141 let first = reader.read_text()?;
142 let Some(first) = first else {
143 return Ok(None);
145 };
146 let mut map = HashMap::new();
147 insert_kv(&mut map, &first);
148 while let Some(line) = reader.read_text()? {
149 insert_kv(&mut map, &line);
150 }
151 Ok(Some(map))
152}
153
/// Splits `line` at its first `=` and stores the two halves in `map`.
///
/// Lines without any `=` are ignored. A key that is already present has its
/// value replaced.
fn insert_kv(map: &mut HashMap<String, String>, line: &str) {
    let Some(eq) = line.find('=') else {
        return;
    };
    // `=` is a single byte, so `eq + 1` is the start of the value.
    let (key, value) = (&line[..eq], &line[eq + 1..]);
    map.insert(key.to_string(), value.to_string());
}
159
160fn read_payload<R: Read>(
161 reader: &mut pktline::Reader<R>,
162) -> Result<Vec<u8>, FilterProcessError> {
163 let mut payload = Vec::new();
164 while let Some(packet) = reader.read_packet()? {
165 payload.extend_from_slice(&packet);
166 }
167 Ok(payload)
168}
169
170fn process_clean<W: Write>(
173 store: &Store,
174 writer: &mut pktline::Writer<W>,
175 payload: &[u8],
176) -> Result<(), FilterProcessError> {
177 write_initial_status(writer)?;
178 let result = run_through_sink(writer, |sink| {
179 clean(store, &mut { payload }, sink)
180 .map(|_| ())
181 .map_err(|e| io::Error::other(e.to_string()))
182 });
183 write_final_status(writer, result.is_ok())?;
184 Ok(())
185}
186
187fn process_smudge<W, F>(
188 store: &Store,
189 writer: &mut pktline::Writer<W>,
190 payload: &[u8],
191 fetch: &mut F,
192) -> Result<(), FilterProcessError>
193where
194 W: Write,
195 F: FnMut(&Pointer) -> Result<(), FetchError>,
196{
197 write_initial_status(writer)?;
198 let result = run_through_sink(writer, |sink| {
199 smudge_with_fetch(store, &mut { payload }, sink, |p| fetch(p))
203 .map(|_| ())
204 .map_err(|e| io::Error::other(e.to_string()))
205 });
206 write_final_status(writer, result.is_ok())?;
207 Ok(())
208}
209
210fn write_initial_status<W: Write>(writer: &mut pktline::Writer<W>) -> io::Result<()> {
211 writer.write_text("status=success")?;
212 writer.write_flush()
213}
214
215fn write_final_status<W: Write>(writer: &mut pktline::Writer<W>, ok: bool) -> io::Result<()> {
216 writer.write_text(if ok { "status=success" } else { "status=error" })?;
220 writer.write_flush()
221}
222
223fn run_through_sink<W, F>(writer: &mut pktline::Writer<W>, f: F) -> io::Result<()>
227where
228 W: Write,
229 F: FnOnce(&mut pktline::Sink<'_, W>) -> io::Result<()>,
230{
231 let result = {
232 let mut sink = pktline::Sink::new(writer);
233 let r = f(&mut sink);
234 sink.flush()?;
235 r
236 };
237 writer.write_flush()?;
238 result
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use git_lfs_pointer::VERSION_LATEST;
    use std::io::Cursor;
    use tempfile::TempDir;

    /// Tokens the server emits during the handshake (two text lines + flush,
    /// twice); per-request responses start right after them.
    const HANDSHAKE_TOKENS: usize = 6;

    /// Creates a store under a fresh temporary directory. The `TempDir` is
    /// returned alongside so the caller can keep it alive for the test.
    fn fixture() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let store = Store::new(tmp.path().join("lfs"));
        (tmp, store)
    }

    /// Builds raw pkt-line input by hand: a 4-digit hex length (payload + 4)
    /// followed by the payload, or `0000` for a flush packet.
    struct PktBuilder(Vec<u8>);

    impl PktBuilder {
        fn new() -> Self {
            Self(Vec::new())
        }

        /// Appends a text packet; a trailing newline is added to `s`.
        fn text(mut self, s: &str) -> Self {
            let body = format!("{s}\n");
            let total = body.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(body.as_bytes());
            self
        }

        /// Appends a data packet holding `b` verbatim.
        fn data(mut self, b: &[u8]) -> Self {
            let total = b.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(b);
            self
        }

        /// Appends a flush packet.
        fn flush(mut self) -> Self {
            self.0.extend_from_slice(b"0000");
            self
        }

        fn build(self) -> Vec<u8> {
            self.0
        }
    }

    /// One decoded packet of server output.
    #[derive(Debug, PartialEq)]
    enum Tok {
        /// A UTF-8 packet with trailing newlines stripped.
        Text(String),
        /// A packet that is not valid UTF-8.
        Bin(Vec<u8>),
        /// A flush packet.
        Flush,
    }

    /// Decodes server output into tokens, stopping at the first
    /// `UnexpectedEof` and panicking on any other read error.
    fn decode(bytes: &[u8]) -> Vec<Tok> {
        let mut r = pktline::Reader::new(Cursor::new(bytes));
        let mut out = Vec::new();
        loop {
            match r.read_packet() {
                Ok(Some(p)) => out.push(match String::from_utf8(p) {
                    Ok(s) => Tok::Text(s.trim_end_matches('\n').to_owned()),
                    // The error hands the original bytes back, so the packet
                    // does not have to be cloned up front.
                    Err(e) => Tok::Bin(e.into_bytes()),
                }),
                Ok(None) => out.push(Tok::Flush),
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return out,
                Err(e) => panic!("decode error: {e}"),
            }
        }
    }

    /// Client side of a successful handshake; request packets are appended
    /// to the returned builder.
    fn handshake_input() -> PktBuilder {
        PktBuilder::new()
            .text("git-filter-client")
            .text("version=2")
            .flush()
            .text("capability=clean")
            .text("capability=smudge")
            .flush()
    }

    /// Fetch callback that always fails.
    fn no_fetch(_p: &Pointer) -> Result<(), FetchError> {
        Err("test: no fetcher configured".into())
    }

    /// Runs a whole session with the failing fetcher and returns the raw
    /// server output.
    fn run(store: &Store, input: Vec<u8>) -> Vec<u8> {
        let mut output = Vec::new();
        filter_process(store, Cursor::new(input), &mut output, no_fetch).unwrap();
        output
    }

    #[test]
    fn handshake_only_then_clean_shutdown() {
        let (_t, store) = fixture();
        let output = run(&store, handshake_input().build());
        let toks = decode(&output);
        assert_eq!(
            toks,
            vec![
                Tok::Text("git-filter-server".into()),
                Tok::Text("version=2".into()),
                Tok::Flush,
                Tok::Text("capability=clean".into()),
                Tok::Text("capability=smudge".into()),
                Tok::Flush,
            ],
        );
    }

    #[test]
    fn clean_request_emits_pointer() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=hello.bin")
            .flush()
            .data(b"hello world\n")
            .flush()
            .build();
        let output = run(&store, input);

        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        if let Tok::Text(t) = &rest[2] {
            assert!(t.starts_with("version https://git-lfs.github.com/spec/v1\n"));
            assert!(t.contains("oid sha256:"));
            assert!(t.contains("size 12"));
        } else {
            panic!("expected text pointer, got {:?}", rest[2]);
        }
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    #[test]
    fn smudge_request_emits_content() {
        let (_t, store) = fixture();
        // Cleaning first both stores the object and yields its pointer text.
        let mut pointer = Vec::new();
        clean(&store, &mut { &b"smudge a\n"[..] }, &mut pointer).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        assert_eq!(rest[2], Tok::Text("smudge a".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    #[test]
    fn smudge_missing_object_emits_status_error() {
        let (_t, store) = fixture();
        let unknown = "0000000000000000000000000000000000000000000000000000000000000001";
        let pointer = format!("version {VERSION_LATEST}\noid sha256:{unknown}\nsize 5\n");
        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=missing.dat")
            .flush()
            .data(pointer.as_bytes())
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // No content packets: the content section is just its terminating flush.
        assert_eq!(rest[2], Tok::Flush);
        assert_eq!(rest[3], Tok::Text("status=error".into()));
        assert_eq!(rest[4], Tok::Flush);
    }

    #[test]
    fn smudge_invokes_fetcher_when_object_missing() {
        let (_t, store) = fixture();
        let content = b"fetched on demand\n";
        // Clean to obtain a valid pointer, then delete the stored object so
        // the smudge has to go through the fetch callback.
        let mut pointer = Vec::new();
        clean(&store, &mut { &content[..] }, &mut pointer).unwrap();
        let parsed = Pointer::parse(&pointer).unwrap();
        std::fs::remove_file(store.object_path(parsed.oid)).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();

        let mut output = Vec::new();
        let store_ref = &store;
        filter_process(&store, Cursor::new(input), &mut output, |p: &Pointer| {
            store_ref.insert(&mut { &content[..] }).unwrap();
            assert_eq!(p.oid, parsed.oid);
            Ok(())
        })
        .unwrap();

        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        assert_eq!(rest[2], Tok::Text("fetched on demand".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    #[test]
    fn multiple_requests_in_one_session() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=a.bin")
            .flush()
            .data(b"AAA")
            .flush()
            .text("command=clean")
            .text("pathname=b.bin")
            .flush()
            .data(b"BBB")
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        assert_eq!(toks.len(), HANDSHAKE_TOKENS + 6 + 6, "got tokens: {toks:?}");

        // Each response is status, flush, pointer, flush, status, flush.
        for response in toks[HANDSHAKE_TOKENS..].chunks(6) {
            assert_eq!(response[0], Tok::Text("status=success".into()));
            assert_eq!(response[1], Tok::Flush);
            assert_eq!(response[3], Tok::Flush);
            assert_eq!(response[4], Tok::Text("status=success".into()));
            assert_eq!(response[5], Tok::Flush);
        }
    }
}