Skip to main content

git_lfs_filter/
filter_process.rs

1//! The git filter-process protocol.
2//!
//! Implements the filter side of git's long-running filter process protocol
//! (see "Long Running Filter Process" in gitattributes(5)): a one-time
//! handshake and capability negotiation, then a loop of request/response
//! pairs using pkt-line framing (gitprotocol-common(5)) over stdin/stdout.
//! The business logic is the same as per-invocation `clean`/`smudge`; it is
//! just batched into one subprocess for the duration of a checkout or commit.
8
9use std::collections::HashMap;
10use std::io::{self, Read, Write};
11
12use git_lfs_git::pktline;
13use git_lfs_pointer::Pointer;
14use git_lfs_store::Store;
15
16use crate::{CleanExtension, FetchError, SmudgeExtension, SmudgeOutcome, clean, smudge_with_fetch};
17
/// Errors that abort a whole filter-process session.
///
/// Failures while cleaning or smudging an individual file are not
/// represented here: they are reported to git per request as
/// `status=error` and the session continues.
#[derive(Debug, thiserror::Error)]
pub enum FilterProcessError {
    /// An I/O error on the protocol streams (reading requests or writing
    /// responses).
    #[error(transparent)]
    Io(#[from] io::Error),
    /// git's side of the version/capability handshake was not acceptable;
    /// the string describes what was expected and what arrived.
    #[error("filter-process handshake: {0}")]
    Handshake(String),
    /// A request lacked a header this implementation requires (currently
    /// only `command`).
    #[error("filter-process: missing required header {0:?}")]
    MissingHeader(&'static str),
    /// A request named a command other than `clean` or `smudge`.
    #[error("filter-process: unknown command {0:?}")]
    UnknownCommand(String),
}
29
30/// Run the filter-process protocol against `input`/`output` (typically
31/// stdin/stdout). Returns when git closes its end of the pipe.
32///
33/// `fetch` is called when a smudge request hits an object that isn't in
34/// the local store; see [`smudge_with_fetch`] for semantics. If fetching
35/// is not supported in the caller's context, pass a closure that always
36/// errors — the protocol will then surface those smudges as `status=error`
37/// to git, same as if [`smudge`](crate::smudge) hit `ObjectMissing`.
38///
39/// `skip_smudge` reflects the upstream `GIT_LFS_SKIP_SMUDGE` env var.
40/// When true, smudge requests pass the pointer text through unchanged
41/// — the working-tree file ends up holding pointer text and `git lfs
42/// pull` (or another smudge run) is the recovery path. Clean requests
43/// are unaffected.
44#[allow(clippy::too_many_arguments)]
45pub fn filter_process<R, W, F>(
46    store: &Store,
47    input: R,
48    output: W,
49    mut fetch: F,
50    skip_smudge: bool,
51    clean_extensions: &[CleanExtension],
52    smudge_extensions: &[SmudgeExtension],
53    smudge_path_filter: &dyn Fn(&str) -> bool,
54) -> Result<(), FilterProcessError>
55where
56    R: Read,
57    W: Write,
58    F: FnMut(&Pointer) -> Result<(), FetchError>,
59{
60    let mut reader = pktline::Reader::new(input);
61    let mut writer = pktline::Writer::new(output);
62
63    handshake(&mut reader, &mut writer)?;
64
65    let mut malformed: Vec<String> = Vec::new();
66
67    loop {
68        // A read error here at packet-boundary normally means git closed the
69        // pipe — that's the protocol's "we're done" signal, not a real error.
70        let headers = match read_headers(&mut reader) {
71            Ok(Some(h)) => h,
72            Ok(None) => break,
73            Err(FilterProcessError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
74                break;
75            }
76            Err(e) => return Err(e),
77        };
78
79        let payload = read_payload(&mut reader)?;
80        let command = headers
81            .get("command")
82            .ok_or(FilterProcessError::MissingHeader("command"))?
83            .clone();
84        let pathname = headers.get("pathname").map(String::as_str).unwrap_or("");
85
86        match command.as_str() {
87            "clean" => process_clean(store, &mut writer, &payload, pathname, clean_extensions)?,
88            "smudge" if skip_smudge => process_smudge_passthrough(&mut writer, &payload)?,
89            // `lfs.fetchinclude` / `lfs.fetchexclude` excluded the
90            // path: pass the pointer text through unchanged. Mirrors
91            // upstream's clone-time include/exclude (test 2 of
92            // t-filter-process); the user can run `git lfs pull`
93            // later to materialize.
94            "smudge" if !smudge_path_filter(pathname) => {
95                process_smudge_passthrough(&mut writer, &payload)?;
96            }
97            "smudge" => {
98                let outcome = process_smudge(
99                    store,
100                    &mut writer,
101                    &payload,
102                    pathname,
103                    smudge_extensions,
104                    &mut fetch,
105                )?;
106                if matches!(outcome, Some(SmudgeOutcome::Passthrough)) {
107                    malformed.push(pathname.to_owned());
108                }
109            }
110            other => return Err(FilterProcessError::UnknownCommand(other.into())),
111        }
112        writer.flush()?;
113    }
114
115    if !malformed.is_empty() {
116        report_malformed(&malformed);
117    }
118    Ok(())
119}
120
/// Print to stderr the paths whose smudge payload should have been an LFS
/// pointer but wasn't: a pluralized header line, then one tab-indented
/// path per line.
///
/// Write failures are deliberately ignored — this is a best-effort
/// end-of-session diagnostic and must not turn a successful run into an
/// error.
fn report_malformed(malformed: &[String]) {
    let count = malformed.len();
    let header = if count == 1 {
        format!(
            "Encountered {} file that should have been a pointer, but wasn't:\n",
            count
        )
    } else {
        format!(
            "Encountered {} files that should have been pointers, but weren't:\n",
            count
        )
    };

    let mut err = io::stderr().lock();
    let _ = err.write_all(header.as_bytes());
    malformed.iter().for_each(|name| {
        let _ = writeln!(err, "\t{name}");
    });
}
140
141fn handshake<R: Read, W: Write>(
142    reader: &mut pktline::Reader<R>,
143    writer: &mut pktline::Writer<W>,
144) -> Result<(), FilterProcessError> {
145    // Welcome.
146    let welcome = reader
147        .read_text()?
148        .ok_or_else(|| FilterProcessError::Handshake("expected welcome, got flush".into()))?;
149    if welcome != "git-filter-client" {
150        return Err(FilterProcessError::Handshake(format!(
151            "expected git-filter-client, got {welcome:?}"
152        )));
153    }
154    let mut versions = Vec::new();
155    while let Some(line) = reader.read_text()? {
156        versions.push(line);
157    }
158    if !versions.iter().any(|v| v == "version=2") {
159        return Err(FilterProcessError::Handshake(format!(
160            "client doesn't advertise version=2 (got {versions:?})"
161        )));
162    }
163    writer.write_text("git-filter-server")?;
164    writer.write_text("version=2")?;
165    writer.write_flush()?;
166
167    // Send our capabilities *before* reading the client's. The protocol
168    // doc reads as if the client speaks first ("capability=…" then a
169    // flush, then server replies), but real git serializes the two
170    // exchanges and won't send its capabilities until it has seen the
171    // server's. Upstream Go's filter-process advertises preemptively for
172    // the same reason — diverging from that reordering deadlocks any
173    // shell-test that does `git add` of an LFS-tracked path.
174    writer.write_text("capability=clean")?;
175    writer.write_text("capability=smudge")?;
176    writer.write_flush()?;
177    writer.flush()?;
178
179    // Drain the client's advertised capabilities (informational — we
180    // don't gate on them). We *do* require clean + smudge to be in the
181    // set so that misconfigured callers get a clear error rather than
182    // silent "command not understood" later.
183    let mut caps = Vec::new();
184    while let Some(line) = reader.read_text()? {
185        caps.push(line);
186    }
187    for required in ["capability=clean", "capability=smudge"] {
188        if !caps.iter().any(|c| c == required) {
189            return Err(FilterProcessError::Handshake(format!(
190                "client missing required {required} (got {caps:?})"
191            )));
192        }
193    }
194
195    Ok(())
196}
197
198fn read_headers<R: Read>(
199    reader: &mut pktline::Reader<R>,
200) -> Result<Option<HashMap<String, String>>, FilterProcessError> {
201    let first = reader.read_text()?;
202    let Some(first) = first else {
203        // Bare flush at top of loop is unexpected from git; treat as shutdown.
204        return Ok(None);
205    };
206    let mut map = HashMap::new();
207    insert_kv(&mut map, &first);
208    while let Some(line) = reader.read_text()? {
209        insert_kv(&mut map, &line);
210    }
211    Ok(Some(map))
212}
213
/// Split `line` at its first `=` and record the pair in `map`, replacing
/// any earlier value for the same key. Lines with no `=` are ignored.
fn insert_kv(map: &mut HashMap<String, String>, line: &str) {
    let Some((key, value)) = line.split_once('=') else {
        return;
    };
    map.insert(key.to_string(), value.to_string());
}
219
220fn read_payload<R: Read>(reader: &mut pktline::Reader<R>) -> Result<Vec<u8>, FilterProcessError> {
221    let mut payload = Vec::new();
222    while let Some(packet) = reader.read_packet()? {
223        payload.extend_from_slice(&packet);
224    }
225    Ok(payload)
226}
227
228/// Run one clean request through the protocol envelope:
229/// `status=success` + flush, content packets + flush, final `status=...` + flush.
230fn process_clean<W: Write>(
231    store: &Store,
232    writer: &mut pktline::Writer<W>,
233    payload: &[u8],
234    pathname: &str,
235    extensions: &[CleanExtension],
236) -> Result<(), FilterProcessError> {
237    write_initial_status(writer)?;
238    let result = run_through_sink(writer, |sink| {
239        clean(store, &mut { payload }, sink, pathname, extensions)
240            .map(|_| ())
241            .map_err(|e| io::Error::other(e.to_string()))
242    });
243    write_final_status(writer, result.is_ok())?;
244    Ok(())
245}
246
247/// `GIT_LFS_SKIP_SMUDGE=1` mode: emit the pointer payload unchanged.
248fn process_smudge_passthrough<W: Write>(
249    writer: &mut pktline::Writer<W>,
250    payload: &[u8],
251) -> Result<(), FilterProcessError> {
252    write_initial_status(writer)?;
253    let result = run_through_sink(writer, |sink| sink.write_all(payload));
254    write_final_status(writer, result.is_ok())?;
255    Ok(())
256}
257
258fn process_smudge<W, F>(
259    store: &Store,
260    writer: &mut pktline::Writer<W>,
261    payload: &[u8],
262    pathname: &str,
263    smudge_extensions: &[SmudgeExtension],
264    fetch: &mut F,
265) -> Result<Option<SmudgeOutcome>, FilterProcessError>
266where
267    W: Write,
268    F: FnMut(&Pointer) -> Result<(), FetchError>,
269{
270    write_initial_status(writer)?;
271    let mut outcome: Option<SmudgeOutcome> = None;
272    let result = run_through_sink(writer, |sink| {
273        // The protocol only differentiates success vs. error at this layer;
274        // the specific reason (ObjectMissing, FetchFailed, ExtensionFailed,
275        // …) is logged by the caller's stderr if they care.
276        match smudge_with_fetch(
277            store,
278            &mut { payload },
279            sink,
280            pathname,
281            smudge_extensions,
282            |p| fetch(p),
283        ) {
284            Ok(o) => {
285                outcome = Some(o);
286                Ok(())
287            }
288            Err(e) => Err(io::Error::other(e.to_string())),
289        }
290    });
291    write_final_status(writer, result.is_ok())?;
292    Ok(outcome)
293}
294
295fn write_initial_status<W: Write>(writer: &mut pktline::Writer<W>) -> io::Result<()> {
296    writer.write_text("status=success")?;
297    writer.write_flush()
298}
299
300fn write_final_status<W: Write>(writer: &mut pktline::Writer<W>, ok: bool) -> io::Result<()> {
301    // End-of-content flush comes from the sink runner; this is the
302    // post-content "trailer" status that tells git "all done, no errors"
303    // (or "I lied, error happened").
304    writer.write_text(if ok { "status=success" } else { "status=error" })?;
305    writer.write_flush()
306}
307
308/// Runs `f` with a packet-line sink, then flushes the sink and emits the
309/// end-of-content flush regardless of `f`'s result. The result of `f` is
310/// returned for the caller's status decision.
311fn run_through_sink<W, F>(writer: &mut pktline::Writer<W>, f: F) -> io::Result<()>
312where
313    W: Write,
314    F: FnOnce(&mut pktline::Sink<'_, W>) -> io::Result<()>,
315{
316    let result = {
317        let mut sink = pktline::Sink::new(writer);
318        let r = f(&mut sink);
319        sink.flush()?;
320        r
321    };
322    writer.write_flush()?;
323    result
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use git_lfs_pointer::VERSION_LATEST;
    use std::io::Cursor;
    use tempfile::TempDir;

    /// Create an empty store rooted at `lfs/` inside a fresh temp directory.
    ///
    /// The `TempDir` is returned alongside the store so the caller keeps it
    /// alive; dropping it removes the directory.
    fn fixture() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let store = Store::new(tmp.path().join("lfs"));
        (tmp, store)
    }

    /// Build a stream of pktline packets the way git would send them.
    ///
    /// Each packet is a 4-hex-digit length prefix (which counts the 4 prefix
    /// bytes themselves) followed by the body; `0000` is a flush packet.
    struct PktBuilder(Vec<u8>);

    impl PktBuilder {
        fn new() -> Self {
            Self(Vec::new())
        }
        /// Append a text packet whose body is `s` plus a trailing `\n`.
        fn text(mut self, s: &str) -> Self {
            let body = format!("{s}\n");
            let total = body.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(body.as_bytes());
            self
        }
        /// Append a packet whose body is exactly `b` (no newline added).
        fn data(mut self, b: &[u8]) -> Self {
            let total = b.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(b);
            self
        }
        /// Append a flush packet (`0000`).
        fn flush(mut self) -> Self {
            self.0.extend_from_slice(b"0000");
            self
        }
        /// Return the accumulated byte stream.
        fn build(self) -> Vec<u8> {
            self.0
        }
    }

    /// One decoded packet from the server's output.
    ///
    /// `Text` is any packet that is valid UTF-8, with trailing `\n`s
    /// trimmed. This includes content packets such as pointer text or short
    /// file contents. `Bin` is any other packet; `Flush` is a flush packet.
    #[derive(Debug, PartialEq)]
    enum Tok {
        Text(String),
        Bin(Vec<u8>),
        Flush,
    }

    /// Decode the response stream into a flat Vec of "packet or flush" tokens
    /// for assertions. Decoding stops at EOF; any other read error panics.
    fn decode(bytes: &[u8]) -> Vec<Tok> {
        let mut r = pktline::Reader::new(Cursor::new(bytes));
        let mut out = Vec::new();
        loop {
            match r.read_packet() {
                Ok(Some(p)) => match String::from_utf8(p.clone()) {
                    Ok(s) => out.push(Tok::Text(s.trim_end_matches('\n').to_owned())),
                    Err(_) => out.push(Tok::Bin(p)),
                },
                Ok(None) => out.push(Tok::Flush),
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return out,
                Err(e) => panic!("decode error: {e}"),
            }
        }
    }

    /// The client half of a successful handshake: welcome + `version=2`,
    /// flush, then the clean/smudge capabilities, flush.
    fn handshake_input() -> PktBuilder {
        PktBuilder::new()
            .text("git-filter-client")
            .text("version=2")
            .flush()
            .text("capability=clean")
            .text("capability=smudge")
            .flush()
    }

    /// Fetcher that always errors — the right default for tests that should
    /// never miss in the store. If one does miss, the fetch error reaches
    /// git as `status=error`.
    fn no_fetch(_p: &Pointer) -> Result<(), FetchError> {
        Err("test: no fetcher configured".into())
    }

    /// Run `filter_process` with default knobs and return the raw output.
    ///
    /// The defaults are: no fetcher, smudge enabled, no extensions, every
    /// path allowed. Panics if the session returns an error.
    fn run(store: &Store, input: Vec<u8>) -> Vec<u8> {
        let mut output = Vec::new();
        filter_process(
            store,
            Cursor::new(input),
            &mut output,
            no_fetch,
            false,
            &[],
            &[],
            &|_| true,
        )
        .unwrap();
        output
    }

    /// With no requests after the handshake, input EOF ends the session
    /// cleanly and the output is only the server half of the handshake.
    #[test]
    fn handshake_only_then_clean_shutdown() {
        let (_t, store) = fixture();
        let output = run(&store, handshake_input().build());
        let toks = decode(&output);
        // Server welcome + version, flush, then our two capabilities, flush.
        assert_eq!(
            toks,
            vec![
                Tok::Text("git-filter-server".into()),
                Tok::Text("version=2".into()),
                Tok::Flush,
                Tok::Text("capability=clean".into()),
                Tok::Text("capability=smudge".into()),
                Tok::Flush,
            ],
        );
    }

    /// A clean request is answered with the LFS pointer text for its content.
    #[test]
    fn clean_request_emits_pointer() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=hello.bin")
            .flush()
            .data(b"hello world\n")
            .flush()
            .build();
        let output = run(&store, input);

        // Skip past handshake (6 tokens) and find the response.
        let toks = decode(&output);
        let rest = &toks[6..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Next packet(s) are the pointer text. Should fit in one packet.
        if let Tok::Text(t) = &rest[2] {
            assert!(t.starts_with("version https://git-lfs.github.com/spec/v1\n"));
            assert!(t.contains("oid sha256:"));
            assert!(t.contains("size 12"));
        } else {
            panic!("expected text pointer, got {:?}", rest[2]);
        }
        assert_eq!(rest[3], Tok::Flush); // end-of-content
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    /// A smudge request for an object already in the store returns its content.
    #[test]
    fn smudge_request_emits_content() {
        let (_t, store) = fixture();
        // Pre-populate the store via clean(), then ask filter-process to smudge.
        let mut pointer = Vec::new();
        clean(&store, &mut { &b"smudge a\n"[..] }, &mut pointer, "", &[]).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[6..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Content "smudge a\n" is short text, so it'll round-trip as a Text token.
        assert_eq!(rest[2], Tok::Text("smudge a".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
    }

    /// A smudge miss with a failing fetcher yields no content and an
    /// error trailer, while the session itself still succeeds.
    #[test]
    fn smudge_missing_object_emits_status_error() {
        let (_t, store) = fixture();
        // An oid that was never inserted into this fresh store.
        let unknown = "0000000000000000000000000000000000000000000000000000000000000001";
        let pointer = format!("version {VERSION_LATEST}\noid sha256:{unknown}\nsize 5\n");
        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=missing.dat")
            .flush()
            .data(pointer.as_bytes())
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[6..];
        assert_eq!(rest[0], Tok::Text("status=success".into())); // initial
        assert_eq!(rest[1], Tok::Flush);
        // No content was written; next is end-of-content flush, then error trailer.
        assert_eq!(rest[2], Tok::Flush);
        assert_eq!(rest[3], Tok::Text("status=error".into()));
        assert_eq!(rest[4], Tok::Flush);
    }

    /// On a store miss the fetcher is invoked with the request's pointer,
    /// and the smudge succeeds once it has repopulated the store.
    #[test]
    fn smudge_invokes_fetcher_when_object_missing() {
        let (_t, store) = fixture();
        let content = b"fetched on demand\n";
        // Build the pointer text by cleaning, then wipe the object so the
        // smudge will miss and exercise the fetch path.
        let mut pointer = Vec::new();
        clean(&store, &mut { &content[..] }, &mut pointer, "", &[]).unwrap();
        let parsed = git_lfs_pointer::Pointer::parse(&pointer).unwrap();
        std::fs::remove_file(store.object_path(parsed.oid)).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();

        let mut output = Vec::new();
        let store_ref = &store;
        filter_process(
            &store,
            Cursor::new(input),
            &mut output,
            |p: &Pointer| {
                // Stand in for a real download — re-insert the bytes.
                store_ref.insert(&mut { &content[..] }).unwrap();
                assert_eq!(p.oid, parsed.oid);
                Ok(())
            },
            false,
            &[],
            &[],
            &|_| true,
        )
        .unwrap();

        let toks = decode(&output);
        let rest = &toks[6..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Content "fetched on demand\n" comes back as a Text token.
        assert_eq!(rest[2], Tok::Text("fetched on demand".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
    }

    /// Several requests are served back-to-back within one session.
    #[test]
    fn multiple_requests_in_one_session() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=a.bin")
            .flush()
            .data(b"AAA")
            .flush()
            .text("command=clean")
            .text("pathname=b.bin")
            .flush()
            .data(b"BBB")
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        // Handshake is 6 tokens; each clean response is 6 tokens.
        // (status=success, flush, content, flush, status=success, flush)
        assert_eq!(toks.len(), 6 + 6 + 6, "got tokens: {toks:?}");
    }
}