Skip to main content

git_lfs_filter/
filter_process.rs

1//! The git filter-process protocol.
2//!
3//! Implements the long-running-filter side of git's `gitprotocol-common`(5)
4//! framing: a one-time handshake + capability negotiation, then a loop of
5//! request/response pairs over packet-line framing on stdin/stdout. Same
6//! business logic as per-invocation `clean`/`smudge`, just batched in one
7//! subprocess for the duration of a checkout/commit.
8
9use std::collections::HashMap;
10use std::io::{self, Read, Write};
11
12use git_lfs_git::pktline;
13use git_lfs_pointer::Pointer;
14use git_lfs_store::Store;
15
16use crate::{FetchError, clean, smudge_with_fetch};
17
18#[derive(Debug, thiserror::Error)]
19pub enum FilterProcessError {
20    #[error(transparent)]
21    Io(#[from] io::Error),
22    #[error("filter-process handshake: {0}")]
23    Handshake(String),
24    #[error("filter-process: missing required header {0:?}")]
25    MissingHeader(&'static str),
26    #[error("filter-process: unknown command {0:?}")]
27    UnknownCommand(String),
28}
29
30/// Run the filter-process protocol against `input`/`output` (typically
31/// stdin/stdout). Returns when git closes its end of the pipe.
32///
33/// `fetch` is called when a smudge request hits an object that isn't in
34/// the local store; see [`smudge_with_fetch`] for semantics. If fetching
35/// is not supported in the caller's context, pass a closure that always
36/// errors — the protocol will then surface those smudges as `status=error`
37/// to git, same as if [`smudge`](crate::smudge) hit `ObjectMissing`.
38pub fn filter_process<R, W, F>(
39    store: &Store,
40    input: R,
41    output: W,
42    mut fetch: F,
43) -> Result<(), FilterProcessError>
44where
45    R: Read,
46    W: Write,
47    F: FnMut(&Pointer) -> Result<(), FetchError>,
48{
49    let mut reader = pktline::Reader::new(input);
50    let mut writer = pktline::Writer::new(output);
51
52    handshake(&mut reader, &mut writer)?;
53
54    loop {
55        // A read error here at packet-boundary normally means git closed the
56        // pipe — that's the protocol's "we're done" signal, not a real error.
57        let headers = match read_headers(&mut reader) {
58            Ok(Some(h)) => h,
59            Ok(None) => return Ok(()),
60            Err(FilterProcessError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
61                return Ok(());
62            }
63            Err(e) => return Err(e),
64        };
65
66        let payload = read_payload(&mut reader)?;
67        let command = headers
68            .get("command")
69            .ok_or(FilterProcessError::MissingHeader("command"))?
70            .clone();
71
72        match command.as_str() {
73            "clean" => process_clean(store, &mut writer, &payload)?,
74            "smudge" => process_smudge(store, &mut writer, &payload, &mut fetch)?,
75            other => return Err(FilterProcessError::UnknownCommand(other.into())),
76        }
77        writer.flush()?;
78    }
79}
80
81fn handshake<R: Read, W: Write>(
82    reader: &mut pktline::Reader<R>,
83    writer: &mut pktline::Writer<W>,
84) -> Result<(), FilterProcessError> {
85    // Welcome.
86    let welcome = reader
87        .read_text()?
88        .ok_or_else(|| FilterProcessError::Handshake("expected welcome, got flush".into()))?;
89    if welcome != "git-filter-client" {
90        return Err(FilterProcessError::Handshake(format!(
91            "expected git-filter-client, got {welcome:?}"
92        )));
93    }
94    let mut versions = Vec::new();
95    while let Some(line) = reader.read_text()? {
96        versions.push(line);
97    }
98    if !versions.iter().any(|v| v == "version=2") {
99        return Err(FilterProcessError::Handshake(format!(
100            "client doesn't advertise version=2 (got {versions:?})"
101        )));
102    }
103    writer.write_text("git-filter-server")?;
104    writer.write_text("version=2")?;
105    writer.write_flush()?;
106
107    // Send our capabilities *before* reading the client's. The protocol
108    // doc reads as if the client speaks first ("capability=…" then a
109    // flush, then server replies), but real git serializes the two
110    // exchanges and won't send its capabilities until it has seen the
111    // server's. Upstream Go's filter-process advertises preemptively for
112    // the same reason — diverging from that reordering deadlocks any
113    // shell-test that does `git add` of an LFS-tracked path.
114    writer.write_text("capability=clean")?;
115    writer.write_text("capability=smudge")?;
116    writer.write_flush()?;
117    writer.flush()?;
118
119    // Drain the client's advertised capabilities (informational — we
120    // don't gate on them). We *do* require clean + smudge to be in the
121    // set so that misconfigured callers get a clear error rather than
122    // silent "command not understood" later.
123    let mut caps = Vec::new();
124    while let Some(line) = reader.read_text()? {
125        caps.push(line);
126    }
127    for required in ["capability=clean", "capability=smudge"] {
128        if !caps.iter().any(|c| c == required) {
129            return Err(FilterProcessError::Handshake(format!(
130                "client missing required {required} (got {caps:?})"
131            )));
132        }
133    }
134
135    Ok(())
136}
137
138fn read_headers<R: Read>(
139    reader: &mut pktline::Reader<R>,
140) -> Result<Option<HashMap<String, String>>, FilterProcessError> {
141    let first = reader.read_text()?;
142    let Some(first) = first else {
143        // Bare flush at top of loop is unexpected from git; treat as shutdown.
144        return Ok(None);
145    };
146    let mut map = HashMap::new();
147    insert_kv(&mut map, &first);
148    while let Some(line) = reader.read_text()? {
149        insert_kv(&mut map, &line);
150    }
151    Ok(Some(map))
152}
153
/// Record a `key=value` header line in `map`.
///
/// The line is split at its first `=`, so the value may itself contain
/// `=`. A line without any `=` is ignored, and a repeated key overwrites
/// the earlier value.
fn insert_kv(map: &mut HashMap<String, String>, line: &str) {
    let Some((key, value)) = line.split_once('=') else {
        return;
    };
    map.insert(key.to_owned(), value.to_owned());
}
159
160fn read_payload<R: Read>(
161    reader: &mut pktline::Reader<R>,
162) -> Result<Vec<u8>, FilterProcessError> {
163    let mut payload = Vec::new();
164    while let Some(packet) = reader.read_packet()? {
165        payload.extend_from_slice(&packet);
166    }
167    Ok(payload)
168}
169
170/// Run one clean request through the protocol envelope:
171/// `status=success` + flush, content packets + flush, final `status=...` + flush.
172fn process_clean<W: Write>(
173    store: &Store,
174    writer: &mut pktline::Writer<W>,
175    payload: &[u8],
176) -> Result<(), FilterProcessError> {
177    write_initial_status(writer)?;
178    let result = run_through_sink(writer, |sink| {
179        clean(store, &mut { payload }, sink)
180            .map(|_| ())
181            .map_err(|e| io::Error::other(e.to_string()))
182    });
183    write_final_status(writer, result.is_ok())?;
184    Ok(())
185}
186
187fn process_smudge<W, F>(
188    store: &Store,
189    writer: &mut pktline::Writer<W>,
190    payload: &[u8],
191    fetch: &mut F,
192) -> Result<(), FilterProcessError>
193where
194    W: Write,
195    F: FnMut(&Pointer) -> Result<(), FetchError>,
196{
197    write_initial_status(writer)?;
198    let result = run_through_sink(writer, |sink| {
199        // The protocol only differentiates success vs. error at this layer;
200        // the specific reason (ObjectMissing, Extensions, FetchFailed, …) is
201        // logged by the caller's stderr if they care.
202        smudge_with_fetch(store, &mut { payload }, sink, |p| fetch(p))
203            .map(|_| ())
204            .map_err(|e| io::Error::other(e.to_string()))
205    });
206    write_final_status(writer, result.is_ok())?;
207    Ok(())
208}
209
210fn write_initial_status<W: Write>(writer: &mut pktline::Writer<W>) -> io::Result<()> {
211    writer.write_text("status=success")?;
212    writer.write_flush()
213}
214
215fn write_final_status<W: Write>(writer: &mut pktline::Writer<W>, ok: bool) -> io::Result<()> {
216    // End-of-content flush comes from the sink runner; this is the
217    // post-content "trailer" status that tells git "all done, no errors"
218    // (or "I lied, error happened").
219    writer.write_text(if ok { "status=success" } else { "status=error" })?;
220    writer.write_flush()
221}
222
223/// Runs `f` with a packet-line sink, then flushes the sink and emits the
224/// end-of-content flush regardless of `f`'s result. The result of `f` is
225/// returned for the caller's status decision.
226fn run_through_sink<W, F>(writer: &mut pktline::Writer<W>, f: F) -> io::Result<()>
227where
228    W: Write,
229    F: FnOnce(&mut pktline::Sink<'_, W>) -> io::Result<()>,
230{
231    let result = {
232        let mut sink = pktline::Sink::new(writer);
233        let r = f(&mut sink);
234        sink.flush()?;
235        r
236    };
237    writer.write_flush()?;
238    result
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use git_lfs_pointer::VERSION_LATEST;
    use std::io::Cursor;
    use tempfile::TempDir;

    /// Tokens the server emits during the handshake: welcome, version,
    /// flush, two capabilities, flush.
    const HANDSHAKE_TOKENS: usize = 6;

    /// Tokens in one complete response: status, flush, one content
    /// packet, flush, trailer status, flush.
    const RESPONSE_TOKENS: usize = 6;

    /// A fresh object store rooted in a temp dir. The `TempDir` is
    /// returned too so the directory outlives the store.
    fn fixture() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let store = Store::new(tmp.path().join("lfs"));
        (tmp, store)
    }

    /// Build a stream of pktline packets the way git would send them.
    struct PktBuilder(Vec<u8>);

    impl PktBuilder {
        fn new() -> Self {
            Self(Vec::new())
        }

        /// Append a text packet; the trailing newline is added here.
        fn text(mut self, s: &str) -> Self {
            let body = format!("{s}\n");
            // The 4-digit hex length prefix counts itself.
            let total = body.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(body.as_bytes());
            self
        }

        /// Append a packet carrying `b` verbatim.
        fn data(mut self, b: &[u8]) -> Self {
            let total = b.len() + 4;
            self.0.extend_from_slice(format!("{total:04x}").as_bytes());
            self.0.extend_from_slice(b);
            self
        }

        /// Append a flush packet.
        fn flush(mut self) -> Self {
            self.0.extend_from_slice(b"0000");
            self
        }

        fn build(self) -> Vec<u8> {
            self.0
        }
    }

    /// One decoded element of the response stream, for assertions.
    #[derive(Debug, PartialEq)]
    enum Tok {
        Text(String),
        Bin(Vec<u8>),
        Flush,
    }

    /// Decode the response stream into a flat Vec of "packet or flush"
    /// tokens. Packets that are valid UTF-8 become `Text` with trailing
    /// newlines trimmed; anything else becomes `Bin`.
    fn decode(bytes: &[u8]) -> Vec<Tok> {
        let mut r = pktline::Reader::new(Cursor::new(bytes));
        let mut out = Vec::new();
        loop {
            match r.read_packet() {
                // `from_utf8` hands the buffer back on failure, so no
                // clone is needed to keep the raw bytes around.
                Ok(Some(p)) => out.push(match String::from_utf8(p) {
                    Ok(s) => Tok::Text(s.trim_end_matches('\n').to_owned()),
                    Err(e) => Tok::Bin(e.into_bytes()),
                }),
                Ok(None) => out.push(Tok::Flush),
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return out,
                Err(e) => panic!("decode error: {e}"),
            }
        }
    }

    /// The client's half of the handshake, ready to have requests appended.
    fn handshake_input() -> PktBuilder {
        PktBuilder::new()
            .text("git-filter-client")
            .text("version=2")
            .flush()
            .text("capability=clean")
            .text("capability=smudge")
            .flush()
    }

    /// Fetcher that always errors — the right default for tests where
    /// we don't expect any miss. Lies surface as `status=error` to git.
    fn no_fetch(_p: &Pointer) -> Result<(), FetchError> {
        Err("test: no fetcher configured".into())
    }

    /// Run a whole session over `input` with `no_fetch` and return the
    /// raw bytes the server wrote.
    fn run(store: &Store, input: Vec<u8>) -> Vec<u8> {
        let mut output = Vec::new();
        filter_process(store, Cursor::new(input), &mut output, no_fetch).unwrap();
        output
    }

    #[test]
    fn handshake_only_then_clean_shutdown() {
        let (_t, store) = fixture();
        let output = run(&store, handshake_input().build());
        let toks = decode(&output);
        // Server welcome + 2 caps + their respective flushes.
        assert_eq!(
            toks,
            vec![
                Tok::Text("git-filter-server".into()),
                Tok::Text("version=2".into()),
                Tok::Flush,
                Tok::Text("capability=clean".into()),
                Tok::Text("capability=smudge".into()),
                Tok::Flush,
            ],
        );
        assert_eq!(toks.len(), HANDSHAKE_TOKENS);
    }

    #[test]
    fn clean_request_emits_pointer() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=hello.bin")
            .flush()
            .data(b"hello world\n")
            .flush()
            .build();
        let output = run(&store, input);

        // Skip past the handshake and look at the response.
        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Next packet(s) are the pointer text. Should fit in one packet.
        if let Tok::Text(t) = &rest[2] {
            assert!(t.starts_with("version https://git-lfs.github.com/spec/v1\n"));
            assert!(t.contains("oid sha256:"));
            assert!(t.contains("size 12"));
        } else {
            panic!("expected text pointer, got {:?}", rest[2]);
        }
        assert_eq!(rest[3], Tok::Flush); // end-of-content
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    #[test]
    fn smudge_request_emits_content() {
        let (_t, store) = fixture();
        // Pre-populate the store via clean(), then ask filter-process to smudge.
        let mut pointer = Vec::new();
        clean(&store, &mut { &b"smudge a\n"[..] }, &mut pointer).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Content "smudge a\n" is short text, so it'll round-trip as a Text token.
        assert_eq!(rest[2], Tok::Text("smudge a".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    #[test]
    fn smudge_missing_object_emits_status_error() {
        let (_t, store) = fixture();
        let unknown = "0000000000000000000000000000000000000000000000000000000000000001";
        let pointer = format!("version {VERSION_LATEST}\noid sha256:{unknown}\nsize 5\n");
        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=missing.dat")
            .flush()
            .data(pointer.as_bytes())
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into())); // initial
        assert_eq!(rest[1], Tok::Flush);
        // No content was written; next is end-of-content flush, then error trailer.
        assert_eq!(rest[2], Tok::Flush);
        assert_eq!(rest[3], Tok::Text("status=error".into()));
        assert_eq!(rest[4], Tok::Flush);
    }

    #[test]
    fn smudge_invokes_fetcher_when_object_missing() {
        let (_t, store) = fixture();
        let content = b"fetched on demand\n";
        // Build the pointer text by cleaning, then wipe the object so the
        // smudge will miss and exercise the fetch path.
        let mut pointer = Vec::new();
        clean(&store, &mut { &content[..] }, &mut pointer).unwrap();
        let parsed = git_lfs_pointer::Pointer::parse(&pointer).unwrap();
        std::fs::remove_file(store.object_path(parsed.oid)).unwrap();

        let input = handshake_input()
            .text("command=smudge")
            .text("pathname=a.dat")
            .flush()
            .data(&pointer)
            .flush()
            .build();

        let mut output = Vec::new();
        let store_ref = &store;
        filter_process(&store, Cursor::new(input), &mut output, |p: &Pointer| {
            // Stand in for a real download — re-insert the bytes.
            store_ref.insert(&mut { &content[..] }).unwrap();
            assert_eq!(p.oid, parsed.oid);
            Ok(())
        })
        .unwrap();

        let toks = decode(&output);
        let rest = &toks[HANDSHAKE_TOKENS..];
        assert_eq!(rest[0], Tok::Text("status=success".into()));
        assert_eq!(rest[1], Tok::Flush);
        // Content "fetched on demand\n" comes back as a Text token.
        assert_eq!(rest[2], Tok::Text("fetched on demand".into()));
        assert_eq!(rest[3], Tok::Flush);
        assert_eq!(rest[4], Tok::Text("status=success".into()));
        assert_eq!(rest[5], Tok::Flush);
    }

    #[test]
    fn multiple_requests_in_one_session() {
        let (_t, store) = fixture();
        let input = handshake_input()
            .text("command=clean")
            .text("pathname=a.bin")
            .flush()
            .data(b"AAA")
            .flush()
            .text("command=clean")
            .text("pathname=b.bin")
            .flush()
            .data(b"BBB")
            .flush()
            .build();
        let output = run(&store, input);
        let toks = decode(&output);
        // Handshake, then one full response per clean request.
        assert_eq!(
            toks.len(),
            HANDSHAKE_TOKENS + 2 * RESPONSE_TOKENS,
            "got tokens: {toks:?}"
        );
        // Each response must be a complete, successful envelope.
        for response in toks[HANDSHAKE_TOKENS..].chunks(RESPONSE_TOKENS) {
            assert_eq!(response[0], Tok::Text("status=success".into()));
            assert_eq!(response[1], Tok::Flush);
            assert_eq!(response[3], Tok::Flush);
            assert_eq!(response[4], Tok::Text("status=success".into()));
            assert_eq!(response[5], Tok::Flush);
        }
    }
}