Skip to main content

git_lfs_filter/
filter_process.rs

//! The git filter-process protocol.
//!
//! Implements the filter side of git's long-running filter process
//! protocol (`gitattributes`(5), "Long Running Filter Process"), which is
//! framed with the pkt-line format from `gitprotocol-common`(5): a one-time
//! handshake and capability negotiation, then a loop of request/response
//! pairs on stdin/stdout. The business logic is the same as the
//! per-invocation `clean`/`smudge` commands, just batched into one
//! subprocess for the duration of a checkout/commit.
8
9use std::collections::HashMap;
10use std::io::{self, Read, Write};
11
12use git_lfs_git::pktline;
13use git_lfs_pointer::Pointer;
14use git_lfs_store::Store;
15
16use crate::{CleanExtension, FetchError, SmudgeExtension, SmudgeOutcome, clean, smudge_with_fetch};
17
18/// Things that can go wrong while running [`filter_process`].
19#[derive(Debug, thiserror::Error)]
20pub enum FilterProcessError {
21    /// Underlying pipe I/O failed (typically because git closed its
22    /// end mid-session).
23    #[error(transparent)]
24    Io(#[from] io::Error),
25    /// The initial capability-negotiation handshake didn't match
26    /// what the filter-process protocol expects.
27    #[error("filter-process handshake: {0}")]
28    Handshake(String),
29    /// A request was missing a required header (`command`, etc.).
30    #[error("filter-process: missing required header {0:?}")]
31    MissingHeader(&'static str),
32    /// Git asked for a command we don't recognize.
33    #[error("filter-process: unknown command {0:?}")]
34    UnknownCommand(String),
35}
36
37/// Run the filter-process protocol against `input`/`output` (typically
38/// stdin/stdout). Returns when git closes its end of the pipe.
39///
40/// `fetch` is called when a smudge request hits an object that isn't in
41/// the local store; see [`smudge_with_fetch`] for semantics. If fetching
42/// is not supported in the caller's context, pass a closure that always
43/// errors — the protocol will then surface those smudges as `status=error`
44/// to git, same as if [`smudge`](crate::smudge) hit `ObjectMissing`.
45///
46/// `skip_smudge` reflects the upstream `GIT_LFS_SKIP_SMUDGE` env var.
47/// When true, smudge requests pass the pointer text through
48/// unchanged: the working-tree file ends up holding pointer text
49/// and `git lfs pull` (or another smudge run) is the recovery
50/// path. Clean requests are unaffected.
51#[allow(clippy::too_many_arguments)]
52pub fn filter_process<R, W, F>(
53    store: &Store,
54    input: R,
55    output: W,
56    mut fetch: F,
57    skip_smudge: bool,
58    clean_extensions: &[CleanExtension],
59    smudge_extensions: &[SmudgeExtension],
60    smudge_path_filter: &dyn Fn(&str) -> bool,
61) -> Result<(), FilterProcessError>
62where
63    R: Read,
64    W: Write,
65    F: FnMut(&Pointer) -> Result<(), FetchError>,
66{
67    let mut reader = pktline::Reader::new(input);
68    let mut writer = pktline::Writer::new(output);
69
70    handshake(&mut reader, &mut writer)?;
71
72    let mut malformed: Vec<String> = Vec::new();
73
74    loop {
75        // A read error here at packet-boundary normally means git closed the
76        // pipe — that's the protocol's "we're done" signal, not a real error.
77        let headers = match read_headers(&mut reader) {
78            Ok(Some(h)) => h,
79            Ok(None) => break,
80            Err(FilterProcessError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
81                break;
82            }
83            Err(e) => return Err(e),
84        };
85
86        let payload = read_payload(&mut reader)?;
87        let command = headers
88            .get("command")
89            .ok_or(FilterProcessError::MissingHeader("command"))?
90            .clone();
91        let pathname = headers.get("pathname").map(String::as_str).unwrap_or("");
92
93        match command.as_str() {
94            "clean" => process_clean(store, &mut writer, &payload, pathname, clean_extensions)?,
95            "smudge" if skip_smudge => process_smudge_passthrough(&mut writer, &payload)?,
96            // `lfs.fetchinclude` / `lfs.fetchexclude` excluded the
97            // path: pass the pointer text through unchanged. Mirrors
98            // upstream's clone-time include/exclude (test 2 of
99            // t-filter-process); the user can run `git lfs pull`
100            // later to materialize.
101            "smudge" if !smudge_path_filter(pathname) => {
102                process_smudge_passthrough(&mut writer, &payload)?;
103            }
104            "smudge" => {
105                let outcome = process_smudge(
106                    store,
107                    &mut writer,
108                    &payload,
109                    pathname,
110                    smudge_extensions,
111                    &mut fetch,
112                )?;
113                if matches!(outcome, Some(SmudgeOutcome::Passthrough)) {
114                    malformed.push(pathname.to_owned());
115                }
116            }
117            other => return Err(FilterProcessError::UnknownCommand(other.into())),
118        }
119        writer.flush()?;
120    }
121
122    if !malformed.is_empty() {
123        report_malformed(&malformed);
124    }
125    Ok(())
126}
127
/// Print the end-of-session summary of smudge inputs that should have been
/// LFS pointers but weren't: a count line, then one tab-indented path per
/// line. Best-effort: a failure to write to stderr is ignored.
fn report_malformed(malformed: &[String]) {
    let count = malformed.len();
    let mut report = if count == 1 {
        format!("Encountered {count} file that should have been a pointer, but wasn't:\n")
    } else {
        format!("Encountered {count} files that should have been pointers, but weren't:\n")
    };
    for path in malformed {
        report.push('\t');
        report.push_str(path);
        report.push('\n');
    }
    let _ = io::stderr().lock().write_all(report.as_bytes());
}
147
148fn handshake<R: Read, W: Write>(
149    reader: &mut pktline::Reader<R>,
150    writer: &mut pktline::Writer<W>,
151) -> Result<(), FilterProcessError> {
152    // Welcome.
153    let welcome = reader
154        .read_text()?
155        .ok_or_else(|| FilterProcessError::Handshake("expected welcome, got flush".into()))?;
156    if welcome != "git-filter-client" {
157        return Err(FilterProcessError::Handshake(format!(
158            "expected git-filter-client, got {welcome:?}"
159        )));
160    }
161    let mut versions = Vec::new();
162    while let Some(line) = reader.read_text()? {
163        versions.push(line);
164    }
165    if !versions.iter().any(|v| v == "version=2") {
166        return Err(FilterProcessError::Handshake(format!(
167            "client doesn't advertise version=2 (got {versions:?})"
168        )));
169    }
170    writer.write_text("git-filter-server")?;
171    writer.write_text("version=2")?;
172    writer.write_flush()?;
173
174    // Send our capabilities *before* reading the client's. The protocol
175    // doc reads as if the client speaks first ("capability=…" then a
176    // flush, then server replies), but real git serializes the two
177    // exchanges and won't send its capabilities until it has seen the
178    // server's. Upstream Go's filter-process advertises preemptively for
179    // the same reason — diverging from that reordering deadlocks any
180    // shell-test that does `git add` of an LFS-tracked path.
181    writer.write_text("capability=clean")?;
182    writer.write_text("capability=smudge")?;
183    writer.write_flush()?;
184    writer.flush()?;
185
186    // Drain the client's advertised capabilities (informational — we
187    // don't gate on them). We *do* require clean + smudge to be in the
188    // set so that misconfigured callers get a clear error rather than
189    // silent "command not understood" later.
190    let mut caps = Vec::new();
191    while let Some(line) = reader.read_text()? {
192        caps.push(line);
193    }
194    for required in ["capability=clean", "capability=smudge"] {
195        if !caps.iter().any(|c| c == required) {
196            return Err(FilterProcessError::Handshake(format!(
197                "client missing required {required} (got {caps:?})"
198            )));
199        }
200    }
201
202    Ok(())
203}
204
205fn read_headers<R: Read>(
206    reader: &mut pktline::Reader<R>,
207) -> Result<Option<HashMap<String, String>>, FilterProcessError> {
208    let first = reader.read_text()?;
209    let Some(first) = first else {
210        // Bare flush at top of loop is unexpected from git; treat as shutdown.
211        return Ok(None);
212    };
213    let mut map = HashMap::new();
214    insert_kv(&mut map, &first);
215    while let Some(line) = reader.read_text()? {
216        insert_kv(&mut map, &line);
217    }
218    Ok(Some(map))
219}
220
/// Split `line` at its first `=` and record the pair in `map`, overwriting
/// any earlier value for the same key. A line with no `=` is ignored.
fn insert_kv(map: &mut HashMap<String, String>, line: &str) {
    let Some((key, value)) = line.split_once('=') else {
        return;
    };
    map.insert(key.to_owned(), value.to_owned());
}
226
227fn read_payload<R: Read>(reader: &mut pktline::Reader<R>) -> Result<Vec<u8>, FilterProcessError> {
228    let mut payload = Vec::new();
229    while let Some(packet) = reader.read_packet()? {
230        payload.extend_from_slice(&packet);
231    }
232    Ok(payload)
233}
234
235/// Run one clean request through the protocol envelope:
236/// `status=success` + flush, content packets + flush, final `status=...` + flush.
237fn process_clean<W: Write>(
238    store: &Store,
239    writer: &mut pktline::Writer<W>,
240    payload: &[u8],
241    pathname: &str,
242    extensions: &[CleanExtension],
243) -> Result<(), FilterProcessError> {
244    write_initial_status(writer)?;
245    let result = run_through_sink(writer, |sink| {
246        clean(store, &mut { payload }, sink, pathname, extensions)
247            .map(|_| ())
248            .map_err(|e| io::Error::other(e.to_string()))
249    });
250    write_final_status(writer, result.is_ok())?;
251    Ok(())
252}
253
254/// `GIT_LFS_SKIP_SMUDGE=1` mode: emit the pointer payload unchanged.
255fn process_smudge_passthrough<W: Write>(
256    writer: &mut pktline::Writer<W>,
257    payload: &[u8],
258) -> Result<(), FilterProcessError> {
259    write_initial_status(writer)?;
260    let result = run_through_sink(writer, |sink| sink.write_all(payload));
261    write_final_status(writer, result.is_ok())?;
262    Ok(())
263}
264
265fn process_smudge<W, F>(
266    store: &Store,
267    writer: &mut pktline::Writer<W>,
268    payload: &[u8],
269    pathname: &str,
270    smudge_extensions: &[SmudgeExtension],
271    fetch: &mut F,
272) -> Result<Option<SmudgeOutcome>, FilterProcessError>
273where
274    W: Write,
275    F: FnMut(&Pointer) -> Result<(), FetchError>,
276{
277    write_initial_status(writer)?;
278    let mut outcome: Option<SmudgeOutcome> = None;
279    let result = run_through_sink(writer, |sink| {
280        // The protocol only differentiates success vs. error at this layer;
281        // the specific reason (ObjectMissing, FetchFailed, ExtensionFailed,
282        // …) is logged by the caller's stderr if they care.
283        match smudge_with_fetch(
284            store,
285            &mut { payload },
286            sink,
287            pathname,
288            smudge_extensions,
289            |p| fetch(p),
290        ) {
291            Ok(o) => {
292                outcome = Some(o);
293                Ok(())
294            }
295            Err(e) => Err(io::Error::other(e.to_string())),
296        }
297    });
298    write_final_status(writer, result.is_ok())?;
299    Ok(outcome)
300}
301
302fn write_initial_status<W: Write>(writer: &mut pktline::Writer<W>) -> io::Result<()> {
303    writer.write_text("status=success")?;
304    writer.write_flush()
305}
306
307fn write_final_status<W: Write>(writer: &mut pktline::Writer<W>, ok: bool) -> io::Result<()> {
308    // End-of-content flush comes from the sink runner; this is the
309    // post-content "trailer" status that tells git "all done, no errors"
310    // (or "I lied, error happened").
311    writer.write_text(if ok { "status=success" } else { "status=error" })?;
312    writer.write_flush()
313}
314
315/// Runs `f` with a packet-line sink, then flushes the sink and emits the
316/// end-of-content flush regardless of `f`'s result. The result of `f` is
317/// returned for the caller's status decision.
318fn run_through_sink<W, F>(writer: &mut pktline::Writer<W>, f: F) -> io::Result<()>
319where
320    W: Write,
321    F: FnOnce(&mut pktline::Sink<'_, W>) -> io::Result<()>,
322{
323    let result = {
324        let mut sink = pktline::Sink::new(writer);
325        let r = f(&mut sink);
326        sink.flush()?;
327        r
328    };
329    writer.write_flush()?;
330    result
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use git_lfs_pointer::VERSION_LATEST;
    use std::io::Cursor;
    use tempfile::TempDir;

    fn fixture() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let store = Store::new(tmp.path().join("lfs"));
        (tmp, store)
    }

    /// Build a stream of pktline packets the way git would send them.
    struct PktBuilder(Vec<u8>);

    impl PktBuilder {
        fn new() -> Self {
            Self(Vec::new())
        }
        fn text(mut self, s: &str) -> Self {
            let body = format!("{s}\n");
            let total = body.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(body.as_bytes());
            self
        }
        fn data(mut self, b: &[u8]) -> Self {
            let total = b.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(b);
            self
        }
        fn flush(mut self) -> Self {
            self.0.extend_from_slice(b"0000");
            self
        }
        fn build(self) -> Vec<u8> {
            self.0
        }
    }

    /// Decode the response stream into a flat Vec of "packet or flush" tokens
    /// for assertions.
    #[derive(Debug, PartialEq)]
    enum Tok {
        Text(String),
        Bin(Vec<u8>),
        Flush,
    }

    fn decode(bytes: &[u8]) -> Vec<Tok> {
        let mut r = pktline::Reader::new(Cursor::new(bytes));
        let mut out = Vec::new();
        loop {
            match r.read_packet() {
                Ok(Some(p)) => match String::from_utf8(p.clone()) {
                    Ok(s) => out.push(Tok::Text(s.trim_end_matches('\n').to_owned())),
                    Err(_) => out.push(Tok::Bin(p)),
                },
                Ok(None) => out.push(Tok::Flush),
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return out,
                Err(e) => panic!("decode error: {e}"),
            }
        }
    }

    fn handshake_input() -> PktBuilder {
        PktBuilder::new()
            .text("git-filter-client")
            .text("version=2")
            .flush()
            .text("capability=clean")
            .text("capability=smudge")
            .flush()
    }

    /// Fetcher that always errors — the right default for tests where
    /// we don't expect any miss. Any miss therefore surfaces as
    /// `status=error` to git.
    fn no_fetch(_p: &Pointer) -> Result<(), FetchError> {
        Err("test: no fetcher configured".into())
    }

    /// Run a session with `no_fetch` and no extensions, returning both the
    /// session result and everything written to git.
    fn try_run(
        store: &Store,
        input: Vec<u8>,
        skip_smudge: bool,
        smudge_path_filter: &dyn Fn(&str) -> bool,
    ) -> (Result<(), FilterProcessError>, Vec<u8>) {
        let mut output = Vec::new();
        let result = filter_process(
            store,
            Cursor::new(input),
            &mut output,
            no_fetch,
            skip_smudge,
            &[],
            &[],
            smudge_path_filter,
        );
        (result, output)
    }

    fn run(store: &Store, input: Vec<u8>) -> Vec<u8> {
        let (result, output) = try_run(store, input, false, &|_| true);
        result.unwrap();
        output
    }

    /// Pointer text for an object that is not in any store.
    fn missing_object_pointer() -> String {
        let unknown = "0000000000000000000000000000000000000000000000000000000000000001";
        format!("version {VERSION_LATEST}\noid sha256:{unknown}\nsize 5\n")
    }

    /// Assert the single response after the handshake echoes `pointer`.
    fn assert_passthrough(output: &[u8], pointer: &str) {
        let toks = decode(output);
        assert_eq!(
            &toks[6..],
            &[
                Tok::Text("status=success".into()),
                Tok::Flush,
                Tok::Text(pointer.trim_end_matches('\n').into()),
                Tok::Flush,
                Tok::Text("status=success".into()),
                Tok::Flush,
            ],
        );
    }

    #[test]
    fn handshake_only_then_clean_shutdown() {
        let (_t, store) = fixture();
        let output = run(&store, handshake_input().build());
        let toks = decode(&output);
        // Server welcome + 2 caps + their respective flushes.
        assert_eq!(
            toks,
            vec![
                Tok::Text("git-filter-server".into()),
                Tok::Text("version=2".into()),
                Tok::Flush,
                Tok::Text("capability=clean".into()),
                Tok::Text("capability=smudge".into()),
                Tok::Flush,
            ],
        );
    }

    #[test]
    fn handshake_rejects_client_without_smudge_capability() {
        let (_t, store) = fixture();
        let input = PktBuilder::new()
            .text("git-filter-client")
            .text("version=2")
            .flush()
            .text("capability=clean")
            .flush()
            .build();
        let (result, _) = try_run(&store, input, false, &|_| true);
        assert!(
            matches!(&result, Err(FilterProcessError::Handshake(_))),
            "got {result:?}"
        );
    }

    #[test]
    fn clean_request_emits_pointer() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=hello.bin")
            .flush()
            .data(b"hello world\n")
            .flush()
            .build();
        let output = run(&store, input);

        // Skip past handshake (6 tokens) and find the response.
        let toks = decode(&output);
        let rest = &toks[6..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Next packet(s) are the pointer text. Should fit in one packet.
        if let Tok::Text(t) = &rest[2] {
            assert!(t.starts_with("version https://git-lfs.github.com/spec/v1\n"));
            assert!(t.contains("oid sha256:"));
            assert!(t.contains("size 12"));
        } else {
            panic!("expected text pointer, got {:?}", rest[2]);
        }
        assert_eq!(rest[3], Tok::Flush); // end-of-content
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    #[test]
    fn smudge_request_emits_content() {
        let (_t, store) = fixture();
        // Pre-populate the store via clean(), then ask filter-process to smudge.
        let mut pointer = Vec::new();
        clean(&store, &mut { &b"smudge a\n"[..] }, &mut pointer, "", &[]).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[6..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Content "smudge a\n" is short text, so it'll round-trip as a Text token.
        assert_eq!(rest[2], Tok::Text("smudge a".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
    }

    #[test]
    fn smudge_missing_object_emits_status_error() {
        let (_t, store) = fixture();
        let pointer = missing_object_pointer();
        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=missing.dat")
            .flush()
            .data(pointer.as_bytes())
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[6..];
        assert_eq!(rest[0], Tok::Text("status=success".into())); // initial
        assert_eq!(rest[1], Tok::Flush);
        // No content was written; next is end-of-content flush, then error trailer.
        assert_eq!(rest[2], Tok::Flush);
        assert_eq!(rest[3], Tok::Text("status=error".into()));
        assert_eq!(rest[4], Tok::Flush);
    }

    #[test]
    fn skip_smudge_passes_pointer_through() {
        let (_t, store) = fixture();
        let pointer = missing_object_pointer();
        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=skipped.dat")
            .flush()
            .data(pointer.as_bytes())
            .flush()
            .build();
        let (result, output) = try_run(&store, input, true, &|_| true);
        result.unwrap();
        assert_passthrough(&output, &pointer);
    }

    #[test]
    fn path_filter_exclusion_passes_pointer_through() {
        let (_t, store) = fixture();
        let pointer = missing_object_pointer();
        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=excluded.dat")
            .flush()
            .data(pointer.as_bytes())
            .flush()
            .build();
        let (result, output) = try_run(&store, input, false, &|path| path != "excluded.dat");
        result.unwrap();
        assert_passthrough(&output, &pointer);
    }

    #[test]
    fn smudge_invokes_fetcher_when_object_missing() {
        let (_t, store) = fixture();
        let content = b"fetched on demand\n";
        // Build the pointer text by cleaning, then wipe the object so the
        // smudge will miss and exercise the fetch path.
        let mut pointer = Vec::new();
        clean(&store, &mut { &content[..] }, &mut pointer, "", &[]).unwrap();
        let parsed = git_lfs_pointer::Pointer::parse(&pointer).unwrap();
        std::fs::remove_file(store.object_path(parsed.oid)).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();

        let mut output = Vec::new();
        let store_ref = &store;
        filter_process(
            &store,
            Cursor::new(input),
            &mut output,
            |p: &Pointer| {
                // Stand in for a real download — re-insert the bytes.
                store_ref.insert(&mut { &content[..] }).unwrap();
                assert_eq!(p.oid, parsed.oid);
                Ok(())
            },
            false,
            &[],
            &[],
            &|_| true,
        )
        .unwrap();

        let toks = decode(&output);
        let rest = &toks[6..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Content "fetched on demand\n" comes back as a Text token.
        assert_eq!(rest[2], Tok::Text("fetched on demand".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
    }

    #[test]
    fn request_without_command_is_an_error() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("pathname=x.bin")
            .flush()
            .data(b"x")
            .flush()
            .build();
        let (result, _) = try_run(&store, input, false, &|_| true);
        assert!(
            matches!(&result, Err(FilterProcessError::MissingHeader(h)) if *h == "command"),
            "got {result:?}"
        );
    }

    #[test]
    fn unknown_command_is_an_error() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=frobnicate")
            .text("pathname=x.bin")
            .flush()
            .data(b"x")
            .flush()
            .build();
        let (result, _) = try_run(&store, input, false, &|_| true);
        assert!(
            matches!(&result, Err(FilterProcessError::UnknownCommand(c)) if c == "frobnicate"),
            "got {result:?}"
        );
    }

    #[test]
    fn multiple_requests_in_one_session() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=a.bin")
            .flush()
            .data(b"AAA")
            .flush()
            .text("command=clean")
            .text("pathname=b.bin")
            .flush()
            .data(b"BBB")
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        // Handshake is 6 tokens; each clean response is 6 tokens.
        // (status=success, flush, content, flush, status=success, flush)
        assert_eq!(toks.len(), 6 + 6 + 6, "got tokens: {toks:?}");
    }
}