git_filter_server/
lib.rs

1use std::io::{ErrorKind, Read, Result, Write};
2
3use ext::{ReadExt, WriteExt};
4
5use tracing::{error, info_span};
6use util::{ReadPktUntilFlush, WritePkt};
7pub(crate) mod ext;
8mod processor;
9mod util;
10pub use processor::*;
11
/// Builds a [`std::io::Error`] of kind [`std::io::ErrorKind::InvalidData`].
///
/// The single argument is forwarded unchanged as the error payload of
/// `std::io::Error::new`, so anything that function accepts (a `&str`, a
/// `String`, another error, ...) may be used. A trailing comma is allowed.
///
/// The expansion uses absolute `::std` paths so that it keeps working when
/// the macro is invoked from a scope that shadows the name `std`.
#[macro_export]
macro_rules! parse_error {
    ($e:expr $(,)?) => {
        ::std::io::Error::new(::std::io::ErrorKind::InvalidData, $e)
    };
}
18
/// Server side of a filter conversation, wrapping the processor `P` that
/// performs the actual work.
///
/// The wrapper itself places no bounds on `P`; the protocol-driving methods
/// are only available when `P` implements `Processor`.
#[derive(Debug)]
pub struct GitFilterServer<P>(P);

impl<P> GitFilterServer<P> {
    /// Creates a server that delegates all processing to `processor`.
    pub fn new(processor: P) -> Self {
        Self(processor)
    }
}
26
27impl<P: Processor> GitFilterServer<P> {
28    fn communicate_internal<R: Read, W: Write>(
29        &mut self,
30        mut input: &mut R,
31        mut output: &mut W,
32    ) -> Result<()> {
33        let mut buf = Vec::new();
34        {
35            if input.pkt_text_read(&mut buf)? != Some("git-filter-client") {
36                return Err(parse_error!("bad prelude"));
37            }
38            if input.pkt_text_read(&mut buf)? != Some("version=2") {
39                return Err(parse_error!("unknown version"));
40            }
41            if input.pkt_text_read(&mut buf)? != None {
42                return Err(parse_error!("unexpected text after client hello"));
43            }
44        }
45        {
46            output.pkt_text_write("git-filter-server")?;
47            output.pkt_text_write("version=2")?;
48            output.pkt_end()?;
49        }
50        {
51            let mut filter = false;
52            let mut smudge = false;
53            let mut delay = false;
54            while let Some(command) = input.pkt_text_read(&mut buf)? {
55                match command {
56                    "capability=clean" => filter = true,
57                    "capability=smudge" => smudge = true,
58                    "capability=delay" => delay = true,
59                    _ => {}
60                }
61            }
62            if filter && self.0.supports_processing(ProcessingType::Clean) {
63                output.pkt_text_write("capability=clean")?;
64            }
65            if smudge && self.0.supports_processing(ProcessingType::Smudge) {
66                output.pkt_text_write("capability=smudge")?;
67            }
68            if delay {
69                output.pkt_text_write("capability=delay")?;
70            }
71            output.pkt_end()?;
72        }
73
74        let mut waiting_for_blobs = false;
75        loop {
76            let mut command = None;
77            let mut pathname = None;
78            let mut can_delay = false;
79            while let Some(input) = input.pkt_text_read(&mut buf)? {
80                if let Some(command_val) = input.strip_prefix("command=") {
81                    command = Some(command_val.to_owned());
82                } else if let Some(pathname_val) = input.strip_prefix("pathname=") {
83                    pathname = Some(pathname_val.to_owned())
84                } else if input == "can-delay=1" {
85                    can_delay = true;
86                }
87            }
88            let command = command.ok_or_else(|| parse_error!("missing command"))?;
89            let _span = info_span!("command", command = format_args!("{:?}", command),).entered();
90
91            match command.as_str() {
92                t @ "clean" | t @ "smudge" => {
93                    let process_type = match t {
94                        "clean" => ProcessingType::Clean,
95                        "smudge" => ProcessingType::Smudge,
96                        _ => unreachable!(),
97                    };
98                    let pathname = pathname.ok_or_else(|| parse_error!("missing pathname"))?;
99                    let mut process_input = ReadPktUntilFlush::new(&mut input);
100                    if waiting_for_blobs {
101                        let _span = info_span!(
102                            "resolving delayed",
103                            pathname = format_args!("{}", pathname)
104                        )
105                        .entered();
106                        let mut sink = [0; 1];
107                        process_input
108                            .read_exact(&mut sink)
109                            .map_err(|_| parse_error!("delayed blob should have no data"))?;
110                        assert!(process_input.finished());
111
112                        output.pkt_text_write("status=success")?;
113                        output.pkt_end()?;
114                        let mut process_output = WritePkt::new(&mut output);
115                        if let Err(e) =
116                            self.0
117                                .get_scheduled(&pathname, process_type, &mut process_output)
118                        {
119                            process_output.flush()?;
120                            drop(process_output);
121                            error!("{:#}", e);
122                            output.pkt_end()?;
123                            output.pkt_text_write("status=error")?;
124                            output.pkt_end()?;
125                            return Ok(());
126                        } else {
127                            process_output.flush()?;
128                            drop(process_output);
129                            output.pkt_end()?;
130                            // Keep status
131                            output.pkt_end()?;
132                        }
133                    } else if can_delay && self.0.should_delay(&pathname, process_type) {
134                        let _span =
135                            info_span!("scheduling", pathname = format_args!("{}", pathname))
136                                .entered();
137                        if let Err(e) =
138                            self.0
139                                .schedule_process(&pathname, process_type, &mut process_input)
140                        {
141                            error!("{:#}", e);
142                            output.pkt_text_write("status=error")?;
143                            output.pkt_end()?;
144                            return Ok(());
145                        } else {
146                            output.pkt_text_write("status=delayed")?;
147                            output.pkt_end()?;
148                        }
149                    } else {
150                        let _span =
151                            info_span!("processing", pathname = format_args!("{}", pathname))
152                                .entered();
153                        output.pkt_text_write("status=success")?;
154                        output.pkt_end()?;
155                        let mut process_output = WritePkt::new(&mut output);
156                        if let Err(e) = self.0.process(
157                            &pathname,
158                            process_type,
159                            &mut process_input,
160                            &mut process_output,
161                        ) {
162                            process_output.flush()?;
163                            drop(process_output);
164                            error!("{:#}", e);
165                            output.pkt_end()?;
166                            output.pkt_text_write("status=error")?;
167                            output.pkt_end()?;
168                            return Ok(());
169                        } else {
170                            process_output.flush()?;
171                            drop(process_output);
172                            output.pkt_end()?;
173                            // Keep status
174                            output.pkt_end()?;
175                        }
176                    }
177                    // Input should be stopped at flush
178                    assert!(process_input.finished());
179                }
180                "list_available_blobs" => {
181                    self.0.switch_to_wait();
182                    waiting_for_blobs = true;
183                }
184                cmd => return Err(parse_error!(format!("unknown command: {}", cmd))),
185            }
186        }
187    }
188
189    pub fn communicate<R: Read, W: Write>(&mut self, input: &mut R, output: &mut W) -> Result<()> {
190        match self.communicate_internal(input, output) {
191            Ok(_) => Ok(()),
192            // Communication is done, not a error
193            Err(e) if e.kind() == ErrorKind::UnexpectedEof => Ok(()),
194            Err(e) => Err(e),
195        }
196    }
197
198    pub fn communicate_stdio(&mut self) -> Result<()> {
199        let stdin = std::io::stdin();
200        let stdout = std::io::stdout();
201
202        self.communicate(&mut stdin.lock(), &mut stdout.lock())?;
203        Ok(())
204    }
205}