1use std::io::{ErrorKind, Read, Result, Write};
2
3use ext::{ReadExt, WriteExt};
4
5use tracing::{error, info_span};
6use util::{ReadPktUntilFlush, WritePkt};
7pub(crate) mod ext;
8mod processor;
9mod util;
10pub use processor::*;
11
/// Builds a [`std::io::Error`] of kind [`std::io::ErrorKind::InvalidData`]
/// carrying `$e` as its payload.
///
/// `$e` may be anything `std::io::Error::new` accepts as an error source,
/// for example a `&str` or a `String`. A trailing comma is accepted.
#[macro_export]
macro_rules! parse_error {
    ($e:expr $(,)?) => {
        // Absolute paths: the expansion is resolved at the call site, so a
        // relative `std::...` could be shadowed by a local item named `std`.
        ::std::io::Error::new(::std::io::ErrorKind::InvalidData, $e)
    };
}
18
/// Filter server that owns the processor `P` it dispatches requests to.
pub struct GitFilterServer<P>(P);

impl<P> GitFilterServer<P> {
    /// Creates a server around `processor`.
    pub fn new(processor: P) -> Self {
        GitFilterServer(processor)
    }
}
26
27impl<P: Processor> GitFilterServer<P> {
28 fn communicate_internal<R: Read, W: Write>(
29 &mut self,
30 mut input: &mut R,
31 mut output: &mut W,
32 ) -> Result<()> {
33 let mut buf = Vec::new();
34 {
35 if input.pkt_text_read(&mut buf)? != Some("git-filter-client") {
36 return Err(parse_error!("bad prelude"));
37 }
38 if input.pkt_text_read(&mut buf)? != Some("version=2") {
39 return Err(parse_error!("unknown version"));
40 }
41 if input.pkt_text_read(&mut buf)? != None {
42 return Err(parse_error!("unexpected text after client hello"));
43 }
44 }
45 {
46 output.pkt_text_write("git-filter-server")?;
47 output.pkt_text_write("version=2")?;
48 output.pkt_end()?;
49 }
50 {
51 let mut filter = false;
52 let mut smudge = false;
53 let mut delay = false;
54 while let Some(command) = input.pkt_text_read(&mut buf)? {
55 match command {
56 "capability=clean" => filter = true,
57 "capability=smudge" => smudge = true,
58 "capability=delay" => delay = true,
59 _ => {}
60 }
61 }
62 if filter && self.0.supports_processing(ProcessingType::Clean) {
63 output.pkt_text_write("capability=clean")?;
64 }
65 if smudge && self.0.supports_processing(ProcessingType::Smudge) {
66 output.pkt_text_write("capability=smudge")?;
67 }
68 if delay {
69 output.pkt_text_write("capability=delay")?;
70 }
71 output.pkt_end()?;
72 }
73
74 let mut waiting_for_blobs = false;
75 loop {
76 let mut command = None;
77 let mut pathname = None;
78 let mut can_delay = false;
79 while let Some(input) = input.pkt_text_read(&mut buf)? {
80 if let Some(command_val) = input.strip_prefix("command=") {
81 command = Some(command_val.to_owned());
82 } else if let Some(pathname_val) = input.strip_prefix("pathname=") {
83 pathname = Some(pathname_val.to_owned())
84 } else if input == "can-delay=1" {
85 can_delay = true;
86 }
87 }
88 let command = command.ok_or_else(|| parse_error!("missing command"))?;
89 let _span = info_span!("command", command = format_args!("{:?}", command),).entered();
90
91 match command.as_str() {
92 t @ "clean" | t @ "smudge" => {
93 let process_type = match t {
94 "clean" => ProcessingType::Clean,
95 "smudge" => ProcessingType::Smudge,
96 _ => unreachable!(),
97 };
98 let pathname = pathname.ok_or_else(|| parse_error!("missing pathname"))?;
99 let mut process_input = ReadPktUntilFlush::new(&mut input);
100 if waiting_for_blobs {
101 let _span = info_span!(
102 "resolving delayed",
103 pathname = format_args!("{}", pathname)
104 )
105 .entered();
106 let mut sink = [0; 1];
107 process_input
108 .read_exact(&mut sink)
109 .map_err(|_| parse_error!("delayed blob should have no data"))?;
110 assert!(process_input.finished());
111
112 output.pkt_text_write("status=success")?;
113 output.pkt_end()?;
114 let mut process_output = WritePkt::new(&mut output);
115 if let Err(e) =
116 self.0
117 .get_scheduled(&pathname, process_type, &mut process_output)
118 {
119 process_output.flush()?;
120 drop(process_output);
121 error!("{:#}", e);
122 output.pkt_end()?;
123 output.pkt_text_write("status=error")?;
124 output.pkt_end()?;
125 return Ok(());
126 } else {
127 process_output.flush()?;
128 drop(process_output);
129 output.pkt_end()?;
130 output.pkt_end()?;
132 }
133 } else if can_delay && self.0.should_delay(&pathname, process_type) {
134 let _span =
135 info_span!("scheduling", pathname = format_args!("{}", pathname))
136 .entered();
137 if let Err(e) =
138 self.0
139 .schedule_process(&pathname, process_type, &mut process_input)
140 {
141 error!("{:#}", e);
142 output.pkt_text_write("status=error")?;
143 output.pkt_end()?;
144 return Ok(());
145 } else {
146 output.pkt_text_write("status=delayed")?;
147 output.pkt_end()?;
148 }
149 } else {
150 let _span =
151 info_span!("processing", pathname = format_args!("{}", pathname))
152 .entered();
153 output.pkt_text_write("status=success")?;
154 output.pkt_end()?;
155 let mut process_output = WritePkt::new(&mut output);
156 if let Err(e) = self.0.process(
157 &pathname,
158 process_type,
159 &mut process_input,
160 &mut process_output,
161 ) {
162 process_output.flush()?;
163 drop(process_output);
164 error!("{:#}", e);
165 output.pkt_end()?;
166 output.pkt_text_write("status=error")?;
167 output.pkt_end()?;
168 return Ok(());
169 } else {
170 process_output.flush()?;
171 drop(process_output);
172 output.pkt_end()?;
173 output.pkt_end()?;
175 }
176 }
177 assert!(process_input.finished());
179 }
180 "list_available_blobs" => {
181 self.0.switch_to_wait();
182 waiting_for_blobs = true;
183 }
184 cmd => return Err(parse_error!(format!("unknown command: {}", cmd))),
185 }
186 }
187 }
188
189 pub fn communicate<R: Read, W: Write>(&mut self, input: &mut R, output: &mut W) -> Result<()> {
190 match self.communicate_internal(input, output) {
191 Ok(_) => Ok(()),
192 Err(e) if e.kind() == ErrorKind::UnexpectedEof => Ok(()),
194 Err(e) => Err(e),
195 }
196 }
197
198 pub fn communicate_stdio(&mut self) -> Result<()> {
199 let stdin = std::io::stdin();
200 let stdout = std::io::stdout();
201
202 self.communicate(&mut stdin.lock(), &mut stdout.lock())?;
203 Ok(())
204 }
205}