Skip to main content

git_lfs_filter/
filter_process.rs

//! The git filter-process protocol.
//!
//! Implements the long-running-filter side of git's filter protocol
//! (gitattributes(5), "Long Running Filter Process"): a one-time handshake
//! plus capability negotiation, then a loop of request/response pairs over
//! pkt-line framing (gitprotocol-common(5)) on stdin/stdout. Same business
//! logic as per-invocation `clean`/`smudge`, just batched in one subprocess
//! for the duration of a checkout/commit.
8
9use std::collections::HashMap;
10use std::io::{self, Read, Write};
11
12use git_lfs_git::pktline;
13use git_lfs_pointer::Pointer;
14use git_lfs_store::Store;
15
16use crate::{CleanExtension, FetchError, clean, smudge_with_fetch};
17
18#[derive(Debug, thiserror::Error)]
19pub enum FilterProcessError {
20    #[error(transparent)]
21    Io(#[from] io::Error),
22    #[error("filter-process handshake: {0}")]
23    Handshake(String),
24    #[error("filter-process: missing required header {0:?}")]
25    MissingHeader(&'static str),
26    #[error("filter-process: unknown command {0:?}")]
27    UnknownCommand(String),
28}
29
30/// Run the filter-process protocol against `input`/`output` (typically
31/// stdin/stdout). Returns when git closes its end of the pipe.
32///
33/// `fetch` is called when a smudge request hits an object that isn't in
34/// the local store; see [`smudge_with_fetch`] for semantics. If fetching
35/// is not supported in the caller's context, pass a closure that always
36/// errors — the protocol will then surface those smudges as `status=error`
37/// to git, same as if [`smudge`](crate::smudge) hit `ObjectMissing`.
38///
39/// `skip_smudge` reflects the upstream `GIT_LFS_SKIP_SMUDGE` env var.
40/// When true, smudge requests pass the pointer text through unchanged
41/// — the working-tree file ends up holding pointer text and `git lfs
42/// pull` (or another smudge run) is the recovery path. Clean requests
43/// are unaffected.
44pub fn filter_process<R, W, F>(
45    store: &Store,
46    input: R,
47    output: W,
48    mut fetch: F,
49    skip_smudge: bool,
50    extensions: &[CleanExtension],
51) -> Result<(), FilterProcessError>
52where
53    R: Read,
54    W: Write,
55    F: FnMut(&Pointer) -> Result<(), FetchError>,
56{
57    let mut reader = pktline::Reader::new(input);
58    let mut writer = pktline::Writer::new(output);
59
60    handshake(&mut reader, &mut writer)?;
61
62    loop {
63        // A read error here at packet-boundary normally means git closed the
64        // pipe — that's the protocol's "we're done" signal, not a real error.
65        let headers = match read_headers(&mut reader) {
66            Ok(Some(h)) => h,
67            Ok(None) => return Ok(()),
68            Err(FilterProcessError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
69                return Ok(());
70            }
71            Err(e) => return Err(e),
72        };
73
74        let payload = read_payload(&mut reader)?;
75        let command = headers
76            .get("command")
77            .ok_or(FilterProcessError::MissingHeader("command"))?
78            .clone();
79        let pathname = headers.get("pathname").map(String::as_str).unwrap_or("");
80
81        match command.as_str() {
82            "clean" => process_clean(store, &mut writer, &payload, pathname, extensions)?,
83            "smudge" if skip_smudge => process_smudge_passthrough(&mut writer, &payload)?,
84            "smudge" => process_smudge(store, &mut writer, &payload, &mut fetch)?,
85            other => return Err(FilterProcessError::UnknownCommand(other.into())),
86        }
87        writer.flush()?;
88    }
89}
90
91fn handshake<R: Read, W: Write>(
92    reader: &mut pktline::Reader<R>,
93    writer: &mut pktline::Writer<W>,
94) -> Result<(), FilterProcessError> {
95    // Welcome.
96    let welcome = reader
97        .read_text()?
98        .ok_or_else(|| FilterProcessError::Handshake("expected welcome, got flush".into()))?;
99    if welcome != "git-filter-client" {
100        return Err(FilterProcessError::Handshake(format!(
101            "expected git-filter-client, got {welcome:?}"
102        )));
103    }
104    let mut versions = Vec::new();
105    while let Some(line) = reader.read_text()? {
106        versions.push(line);
107    }
108    if !versions.iter().any(|v| v == "version=2") {
109        return Err(FilterProcessError::Handshake(format!(
110            "client doesn't advertise version=2 (got {versions:?})"
111        )));
112    }
113    writer.write_text("git-filter-server")?;
114    writer.write_text("version=2")?;
115    writer.write_flush()?;
116
117    // Send our capabilities *before* reading the client's. The protocol
118    // doc reads as if the client speaks first ("capability=…" then a
119    // flush, then server replies), but real git serializes the two
120    // exchanges and won't send its capabilities until it has seen the
121    // server's. Upstream Go's filter-process advertises preemptively for
122    // the same reason — diverging from that reordering deadlocks any
123    // shell-test that does `git add` of an LFS-tracked path.
124    writer.write_text("capability=clean")?;
125    writer.write_text("capability=smudge")?;
126    writer.write_flush()?;
127    writer.flush()?;
128
129    // Drain the client's advertised capabilities (informational — we
130    // don't gate on them). We *do* require clean + smudge to be in the
131    // set so that misconfigured callers get a clear error rather than
132    // silent "command not understood" later.
133    let mut caps = Vec::new();
134    while let Some(line) = reader.read_text()? {
135        caps.push(line);
136    }
137    for required in ["capability=clean", "capability=smudge"] {
138        if !caps.iter().any(|c| c == required) {
139            return Err(FilterProcessError::Handshake(format!(
140                "client missing required {required} (got {caps:?})"
141            )));
142        }
143    }
144
145    Ok(())
146}
147
148fn read_headers<R: Read>(
149    reader: &mut pktline::Reader<R>,
150) -> Result<Option<HashMap<String, String>>, FilterProcessError> {
151    let first = reader.read_text()?;
152    let Some(first) = first else {
153        // Bare flush at top of loop is unexpected from git; treat as shutdown.
154        return Ok(None);
155    };
156    let mut map = HashMap::new();
157    insert_kv(&mut map, &first);
158    while let Some(line) = reader.read_text()? {
159        insert_kv(&mut map, &line);
160    }
161    Ok(Some(map))
162}
163
/// Record a `key=value` header line in `map`, splitting on the first `=`.
///
/// Lines without any `=` are ignored; a repeated key overwrites the
/// earlier value.
fn insert_kv(map: &mut HashMap<String, String>, line: &str) {
    let Some(eq) = line.find('=') else {
        return;
    };
    // `=` is a single ASCII byte, so `eq + 1` is always a char boundary.
    let (key, value) = (&line[..eq], &line[eq + 1..]);
    map.insert(key.to_string(), value.to_string());
}
169
170fn read_payload<R: Read>(reader: &mut pktline::Reader<R>) -> Result<Vec<u8>, FilterProcessError> {
171    let mut payload = Vec::new();
172    while let Some(packet) = reader.read_packet()? {
173        payload.extend_from_slice(&packet);
174    }
175    Ok(payload)
176}
177
178/// Run one clean request through the protocol envelope:
179/// `status=success` + flush, content packets + flush, final `status=...` + flush.
180fn process_clean<W: Write>(
181    store: &Store,
182    writer: &mut pktline::Writer<W>,
183    payload: &[u8],
184    pathname: &str,
185    extensions: &[CleanExtension],
186) -> Result<(), FilterProcessError> {
187    write_initial_status(writer)?;
188    let result = run_through_sink(writer, |sink| {
189        clean(store, &mut { payload }, sink, pathname, extensions)
190            .map(|_| ())
191            .map_err(|e| io::Error::other(e.to_string()))
192    });
193    write_final_status(writer, result.is_ok())?;
194    Ok(())
195}
196
197/// `GIT_LFS_SKIP_SMUDGE=1` mode: emit the pointer payload unchanged.
198fn process_smudge_passthrough<W: Write>(
199    writer: &mut pktline::Writer<W>,
200    payload: &[u8],
201) -> Result<(), FilterProcessError> {
202    write_initial_status(writer)?;
203    let result = run_through_sink(writer, |sink| sink.write_all(payload));
204    write_final_status(writer, result.is_ok())?;
205    Ok(())
206}
207
208fn process_smudge<W, F>(
209    store: &Store,
210    writer: &mut pktline::Writer<W>,
211    payload: &[u8],
212    fetch: &mut F,
213) -> Result<(), FilterProcessError>
214where
215    W: Write,
216    F: FnMut(&Pointer) -> Result<(), FetchError>,
217{
218    write_initial_status(writer)?;
219    let result = run_through_sink(writer, |sink| {
220        // The protocol only differentiates success vs. error at this layer;
221        // the specific reason (ObjectMissing, Extensions, FetchFailed, …) is
222        // logged by the caller's stderr if they care.
223        smudge_with_fetch(store, &mut { payload }, sink, |p| fetch(p))
224            .map(|_| ())
225            .map_err(|e| io::Error::other(e.to_string()))
226    });
227    write_final_status(writer, result.is_ok())?;
228    Ok(())
229}
230
231fn write_initial_status<W: Write>(writer: &mut pktline::Writer<W>) -> io::Result<()> {
232    writer.write_text("status=success")?;
233    writer.write_flush()
234}
235
236fn write_final_status<W: Write>(writer: &mut pktline::Writer<W>, ok: bool) -> io::Result<()> {
237    // End-of-content flush comes from the sink runner; this is the
238    // post-content "trailer" status that tells git "all done, no errors"
239    // (or "I lied, error happened").
240    writer.write_text(if ok { "status=success" } else { "status=error" })?;
241    writer.write_flush()
242}
243
244/// Runs `f` with a packet-line sink, then flushes the sink and emits the
245/// end-of-content flush regardless of `f`'s result. The result of `f` is
246/// returned for the caller's status decision.
247fn run_through_sink<W, F>(writer: &mut pktline::Writer<W>, f: F) -> io::Result<()>
248where
249    W: Write,
250    F: FnOnce(&mut pktline::Sink<'_, W>) -> io::Result<()>,
251{
252    let result = {
253        let mut sink = pktline::Sink::new(writer);
254        let r = f(&mut sink);
255        sink.flush()?;
256        r
257    };
258    writer.write_flush()?;
259    result
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use git_lfs_pointer::VERSION_LATEST;
    use std::io::Cursor;
    use tempfile::TempDir;

    /// Tokens the server emits during the handshake:
    /// welcome, version, flush, two capabilities, flush.
    const HANDSHAKE_TOKENS: usize = 6;

    fn fixture() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let store = Store::new(tmp.path().join("lfs"));
        (tmp, store)
    }

    /// Build a stream of pktline packets the way git would send them.
    struct PktBuilder(Vec<u8>);

    impl PktBuilder {
        fn new() -> Self {
            Self(Vec::new())
        }
        fn text(mut self, s: &str) -> Self {
            let body = format!("{s}\n");
            let total = body.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(body.as_bytes());
            self
        }
        fn data(mut self, b: &[u8]) -> Self {
            let total = b.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(b);
            self
        }
        fn flush(mut self) -> Self {
            self.0.extend_from_slice(b"0000");
            self
        }
        fn build(self) -> Vec<u8> {
            self.0
        }
    }

    /// Decode the response stream into a flat Vec of "packet or flush" tokens
    /// for assertions.
    #[derive(Debug, PartialEq)]
    enum Tok {
        Text(String),
        Bin(Vec<u8>),
        Flush,
    }

    fn decode(bytes: &[u8]) -> Vec<Tok> {
        let mut r = pktline::Reader::new(Cursor::new(bytes));
        let mut out = Vec::new();
        loop {
            match r.read_packet() {
                Ok(Some(p)) => match String::from_utf8(p.clone()) {
                    Ok(s) => out.push(Tok::Text(s.trim_end_matches('\n').to_owned())),
                    Err(_) => out.push(Tok::Bin(p)),
                },
                Ok(None) => out.push(Tok::Flush),
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return out,
                Err(e) => panic!("decode error: {e}"),
            }
        }
    }

    fn handshake_input() -> PktBuilder {
        PktBuilder::new()
            .text("git-filter-client")
            .text("version=2")
            .flush()
            .text("capability=clean")
            .text("capability=smudge")
            .flush()
    }

    /// Fetcher that always errors — the right default for tests where
    /// we don't expect any miss. A miss surfaces as `status=error` to git.
    fn no_fetch(_p: &Pointer) -> Result<(), FetchError> {
        Err("test: no fetcher configured".into())
    }

    fn run(store: &Store, input: Vec<u8>) -> Vec<u8> {
        let mut output = Vec::new();
        filter_process(store, Cursor::new(input), &mut output, no_fetch, false, &[]).unwrap();
        output
    }

    /// Like [`run`], but expects the session to fail and returns the error.
    fn run_err(store: &Store, input: Vec<u8>) -> FilterProcessError {
        let mut output = Vec::new();
        filter_process(store, Cursor::new(input), &mut output, no_fetch, false, &[])
            .expect_err("filter_process should have failed")
    }

    #[test]
    fn handshake_only_then_clean_shutdown() {
        let (_t, store) = fixture();
        let output = run(&store, handshake_input().build());
        let toks = decode(&output);
        // Server welcome + 2 caps + their respective flushes.
        assert_eq!(
            toks,
            vec![
                Tok::Text("git-filter-server".into()),
                Tok::Text("version=2".into()),
                Tok::Flush,
                Tok::Text("capability=clean".into()),
                Tok::Text("capability=smudge".into()),
                Tok::Flush,
            ],
        );
    }

    #[test]
    fn bare_flush_between_requests_ends_session() {
        let (_t, store) = fixture();
        let output = run(&store, handshake_input().flush().build());
        assert_eq!(decode(&output).len(), HANDSHAKE_TOKENS);
    }

    #[test]
    fn handshake_rejects_unsupported_version() {
        let (_t, store) = fixture();
        let input = PktBuilder::new()
            .text("git-filter-client")
            .text("version=3")
            .flush()
            .text("capability=clean")
            .text("capability=smudge")
            .flush()
            .build();
        let err = run_err(&store, input);
        assert!(matches!(err, FilterProcessError::Handshake(_)), "got {err:?}");
    }

    #[test]
    fn handshake_rejects_client_without_smudge_capability() {
        let (_t, store) = fixture();
        let input = PktBuilder::new()
            .text("git-filter-client")
            .text("version=2")
            .flush()
            .text("capability=clean")
            .flush()
            .build();
        let err = run_err(&store, input);
        assert!(matches!(err, FilterProcessError::Handshake(_)), "got {err:?}");
    }

    #[test]
    fn clean_request_emits_pointer() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=hello.bin")
            .flush()
            .data(b"hello world\n")
            .flush()
            .build();
        let output = run(&store, input);

        // Skip past the handshake and find the response.
        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Next packet(s) are the pointer text. Should fit in one packet.
        if let Tok::Text(t) = &rest[2] {
            assert!(t.starts_with("version https://git-lfs.github.com/spec/v1\n"));
            assert!(t.contains("oid sha256:"));
            assert!(t.contains("size 12"));
        } else {
            panic!("expected text pointer, got {:?}", rest[2]);
        }
        assert_eq!(rest[3], Tok::Flush); // end-of-content
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    #[test]
    fn smudge_request_emits_content() {
        let (_t, store) = fixture();
        // Pre-populate the store via clean(), then ask filter-process to smudge.
        let mut pointer = Vec::new();
        clean(&store, &mut { &b"smudge a\n"[..] }, &mut pointer, "", &[]).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Content "smudge a\n" is short text, so it'll round-trip as a Text token.
        assert_eq!(rest[2], Tok::Text("smudge a".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
    }

    #[test]
    fn smudge_missing_object_emits_status_error() {
        let (_t, store) = fixture();
        let unknown = "0000000000000000000000000000000000000000000000000000000000000001";
        let pointer = format!("version {VERSION_LATEST}\noid sha256:{unknown}\nsize 5\n");
        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=missing.dat")
            .flush()
            .data(pointer.as_bytes())
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into())); // initial
        assert_eq!(rest[1], Tok::Flush);
        // No content was written; next is end-of-content flush, then error trailer.
        assert_eq!(rest[2], Tok::Flush);
        assert_eq!(rest[3], Tok::Text("status=error".into()));
        assert_eq!(rest[4], Tok::Flush);
    }

    #[test]
    fn skip_smudge_passes_pointer_through() {
        let (_t, store) = fixture();
        let unknown = "0000000000000000000000000000000000000000000000000000000000000002";
        let pointer = format!("version {VERSION_LATEST}\noid sha256:{unknown}\nsize 5\n");
        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=skipped.dat")
            .flush()
            .data(pointer.as_bytes())
            .flush()
            .build();
        let mut output = Vec::new();
        // The object isn't in the store and the fetcher would fail; with
        // skip-smudge enabled neither must matter.
        filter_process(&store, Cursor::new(input), &mut output, no_fetch, true, &[]).unwrap();

        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(
            rest,
            &[
                Tok::Text("status=success".into()),
                Tok::Flush,
                Tok::Text(pointer.trim_end_matches('\n').to_owned()),
                Tok::Flush,
                Tok::Text("status=success".into()),
                Tok::Flush,
            ][..],
        );
    }

    #[test]
    fn smudge_invokes_fetcher_when_object_missing() {
        let (_t, store) = fixture();
        let content = b"fetched on demand\n";
        // Build the pointer text by cleaning, then wipe the object so the
        // smudge will miss and exercise the fetch path.
        let mut pointer = Vec::new();
        clean(&store, &mut { &content[..] }, &mut pointer, "", &[]).unwrap();
        let parsed = git_lfs_pointer::Pointer::parse(&pointer).unwrap();
        std::fs::remove_file(store.object_path(parsed.oid)).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();

        let mut output = Vec::new();
        let store_ref = &store;
        filter_process(
            &store,
            Cursor::new(input),
            &mut output,
            |p: &Pointer| {
                // Stand in for a real download — re-insert the bytes.
                store_ref.insert(&mut { &content[..] }).unwrap();
                assert_eq!(p.oid, parsed.oid);
                Ok(())
            },
            false,
            &[],
        )
        .unwrap();

        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Content "fetched on demand\n" comes back as a Text token.
        assert_eq!(rest[2], Tok::Text("fetched on demand".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
    }

    #[test]
    fn unknown_command_is_an_error() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=list_available_blobs")
            .flush()
            .flush() // empty content section
            .build();
        match run_err(&store, input) {
            FilterProcessError::UnknownCommand(cmd) => assert_eq!(cmd, "list_available_blobs"),
            other => panic!("expected UnknownCommand, got {other:?}"),
        }
    }

    #[test]
    fn missing_command_header_is_an_error() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("pathname=a.bin")
            .flush()
            .data(b"x")
            .flush()
            .build();
        let err = run_err(&store, input);
        assert!(
            matches!(err, FilterProcessError::MissingHeader("command")),
            "got {err:?}"
        );
    }

    #[test]
    fn multiple_requests_in_one_session() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=a.bin")
            .flush()
            .data(b"AAA")
            .flush()
            .text("command=clean")
            .text("pathname=b.bin")
            .flush()
            .data(b"BBB")
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        // Each clean response is 6 tokens:
        // (status=success, flush, content, flush, status=success, flush)
        assert_eq!(toks.len(), HANDSHAKE_TOKENS + 6 + 6, "got tokens: {toks:?}");
        let (first, second) = toks[HANDSHAKE_TOKENS..].split_at(6);
        for resp in [first, second] {
            assert_eq!(resp[0], Tok::Text("status=success".into()));
            assert_eq!(resp[4], Tok::Text("status=success".into()));
        }
        // Different content must produce different pointers.
        assert_ne!(first[2], second[2]);
    }
}