1use std::collections::{HashMap, HashSet};
6use std::io::{Read, Write};
7use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
8use std::sync::{Arc, Mutex, OnceLock};
9
10use crate::objects::ObjectId;
11use crate::refs;
12use crate::repo::Repository;
13
14const LARGE_PACKET_DATA_MAX: usize = 65520 - 4;
16
17const CAP_CLEAN: u32 = 1 << 0;
18const CAP_SMUDGE: u32 = 1 << 1;
19const CAP_DELAY: u32 = 1 << 2;
20
21#[derive(Debug, Clone, Default)]
23pub struct FilterSmudgeMeta {
24 pub ref_name: Option<String>,
25 pub treeish_hex: Option<String>,
26 pub blob_hex: Option<String>,
27}
28
29#[must_use]
31pub fn smudge_meta_blob_only(blob_hex: &str) -> FilterSmudgeMeta {
32 FilterSmudgeMeta {
33 blob_hex: Some(blob_hex.to_string()),
34 ..Default::default()
35 }
36}
37
38#[must_use]
40pub fn smudge_meta_treeish_only(treeish_hex: &str, blob_hex: &str) -> FilterSmudgeMeta {
41 FilterSmudgeMeta {
42 treeish_hex: Some(treeish_hex.to_string()),
43 blob_hex: Some(blob_hex.to_string()),
44 ..Default::default()
45 }
46}
47
48#[must_use]
50pub fn smudge_meta_for_reset(
51 repo: &Repository,
52 commit_spec: &str,
53 resolved_commit: &ObjectId,
54 blob_hex: &str,
55) -> FilterSmudgeMeta {
56 let tip_hex = resolved_commit.to_string();
57 let mut meta = FilterSmudgeMeta {
58 treeish_hex: Some(tip_hex.clone()),
59 blob_hex: Some(blob_hex.to_string()),
60 ..Default::default()
61 };
62 let arg_lower = commit_spec.to_ascii_lowercase();
63 let is_full_hex = arg_lower.len() == 40 && arg_lower.chars().all(|c| c.is_ascii_hexdigit());
64 if is_full_hex && arg_lower == tip_hex.to_ascii_lowercase() {
65 meta.ref_name = None;
66 return meta;
67 }
68 let mut candidates: Vec<String> = Vec::new();
69 if commit_spec == "HEAD" || commit_spec.starts_with("refs/") {
70 candidates.push(commit_spec.to_string());
71 } else {
72 candidates.push(format!("refs/heads/{commit_spec}"));
73 candidates.push(format!("refs/tags/{commit_spec}"));
74 candidates.push(commit_spec.to_string());
75 }
76 for name in candidates {
77 if let Ok(oid) = refs::resolve_ref(&repo.git_dir, &name) {
78 if oid == *resolved_commit {
79 meta.ref_name = Some(name);
80 break;
81 }
82 }
83 }
84 meta
85}
86
87#[must_use]
92pub fn smudge_meta_for_archive(
93 repo: &Repository,
94 tree_ish_arg: &str,
95 resolved_tip: &ObjectId,
96 tip_is_commit: bool,
97 blob_hex: &str,
98) -> FilterSmudgeMeta {
99 let mut meta = FilterSmudgeMeta {
100 blob_hex: Some(blob_hex.to_string()),
101 ..Default::default()
102 };
103 if !tip_is_commit {
104 meta.treeish_hex = Some(resolved_tip.to_string());
105 return meta;
106 }
107 let tip_hex = resolved_tip.to_string();
108 meta.treeish_hex = Some(tip_hex.clone());
109 let arg_lower = tree_ish_arg.to_ascii_lowercase();
110 let is_full_hex = arg_lower.len() == 40 && arg_lower.chars().all(|c| c.is_ascii_hexdigit());
111 if is_full_hex && arg_lower == tip_hex.to_ascii_lowercase() {
112 meta.ref_name = None;
113 return meta;
114 }
115 if let Ok(oid) = refs::resolve_ref(&repo.git_dir, tree_ish_arg) {
116 if oid == *resolved_tip {
117 meta.ref_name = Some(tree_ish_arg.to_string());
118 return meta;
119 }
120 }
121 let heads = format!("refs/heads/{tree_ish_arg}");
122 if let Ok(oid) = refs::resolve_ref(&repo.git_dir, &heads) {
123 if oid == *resolved_tip {
124 meta.ref_name = Some(heads);
125 }
126 }
127 meta
128}
129
130pub fn smudge_meta_for_checkout(repo: &Repository, blob_hex: &str) -> FilterSmudgeMeta {
131 let mut meta = FilterSmudgeMeta {
132 blob_hex: Some(blob_hex.to_string()),
133 ..Default::default()
134 };
135 let Ok(content) = std::fs::read_to_string(repo.git_dir.join("HEAD")) else {
136 return meta;
137 };
138 let content = content.trim();
139 if let Some(sym) = content.strip_prefix("ref: ") {
140 let sym = sym.trim();
141 meta.ref_name = Some(sym.to_string());
142 if let Ok(oid) = refs::resolve_ref(&repo.git_dir, sym) {
143 meta.treeish_hex = Some(oid.to_string());
144 }
145 } else if content.len() == 40 {
146 if let Ok(oid) = ObjectId::from_hex(content) {
147 meta.treeish_hex = Some(oid.to_string());
148 }
149 }
150 meta
151}
152
153struct RunningFilter {
154 #[allow(dead_code)]
155 child: Child,
156 stdin: Option<ChildStdin>,
157 stdout: Option<ChildStdout>,
158 caps: u32,
159}
160
161fn process_registry() -> &'static Mutex<HashMap<String, Arc<Mutex<RunningFilter>>>> {
162 static REG: OnceLock<Mutex<HashMap<String, Arc<Mutex<RunningFilter>>>>> = OnceLock::new();
163 REG.get_or_init(|| Mutex::new(HashMap::new()))
164}
165
166fn disabled_process_filters() -> &'static Mutex<HashSet<String>> {
167 static DISABLED: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
168 DISABLED.get_or_init(|| Mutex::new(HashSet::new()))
169}
170
171pub fn disable_process_filter(cmd: &str) {
176 if let Ok(mut disabled) = disabled_process_filters().lock() {
177 disabled.insert(cmd.to_string());
178 }
179 remove_process_filter(cmd);
180}
181
182fn process_filter_is_disabled(cmd: &str) -> bool {
183 disabled_process_filters()
184 .lock()
185 .ok()
186 .is_some_and(|disabled| disabled.contains(cmd))
187}
188
189fn remove_process_filter(cmd: &str) {
190 if let Ok(mut reg) = process_registry().lock() {
191 reg.remove(cmd);
192 }
193}
194
195fn process_transport_error(err: &str) -> bool {
196 !err.starts_with("filter status:") && !err.starts_with("filter tail status:")
197}
198
199fn set_packet_header(len: usize, out: &mut [u8; 4]) {
200 const HEX: &[u8; 16] = b"0123456789abcdef";
201 out[0] = HEX[(len >> 12) & 0xf];
202 out[1] = HEX[(len >> 8) & 0xf];
203 out[2] = HEX[(len >> 4) & 0xf];
204 out[3] = HEX[len & 0xf];
205}
206
207fn write_packet(stdin: &mut ChildStdin, payload: &[u8]) -> std::io::Result<()> {
208 if payload.len() > LARGE_PACKET_DATA_MAX {
209 return Err(std::io::Error::other("filter packet payload too large"));
210 }
211 let total = payload.len() + 4;
212 let mut hdr = [0u8; 4];
213 set_packet_header(total, &mut hdr);
214 stdin.write_all(&hdr)?;
215 stdin.write_all(payload)?;
216 stdin.flush()?;
217 Ok(())
218}
219
220fn write_packet_line(stdin: &mut ChildStdin, line: &str) -> std::io::Result<()> {
221 let mut s = line.to_string();
222 if !s.ends_with('\n') {
223 s.push('\n');
224 }
225 write_packet(stdin, s.as_bytes())
226}
227
228fn write_flush(stdin: &mut ChildStdin) -> std::io::Result<()> {
229 stdin.write_all(b"0000")?;
230 stdin.flush()
231}
232
233fn read_exact<R: Read>(r: &mut R, buf: &mut [u8]) -> std::io::Result<()> {
234 let mut off = 0;
235 while off < buf.len() {
236 let n = r.read(&mut buf[off..])?;
237 if n == 0 {
238 return Err(std::io::Error::new(
239 std::io::ErrorKind::UnexpectedEof,
240 "unexpected EOF reading pkt-line",
241 ));
242 }
243 off += n;
244 }
245 Ok(())
246}
247
248fn read_packet_header(stdout: &mut ChildStdout) -> std::io::Result<Option<[u8; 4]>> {
249 let mut hdr = [0u8; 4];
250 let mut off = 0usize;
251 while off < 4 {
252 let n = stdout.read(&mut hdr[off..])?;
253 if n == 0 {
254 return Err(std::io::Error::new(
255 std::io::ErrorKind::UnexpectedEof,
256 "unexpected EOF reading pkt-line",
257 ));
258 }
259 off += n;
260 }
261 Ok(Some(hdr))
262}
263
264fn read_packet_payload(stdout: &mut ChildStdout) -> std::io::Result<Option<Vec<u8>>> {
265 let Some(hdr) = read_packet_header(stdout)? else {
266 return Ok(None);
267 };
268 let hex = std::str::from_utf8(&hdr)
269 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
270 let total = usize::from_str_radix(hex, 16).map_err(|_| {
271 std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid pkt-line header")
272 })?;
273 if total == 0 {
274 return Ok(None);
275 }
276 if total < 4 {
277 return Err(std::io::Error::new(
278 std::io::ErrorKind::InvalidData,
279 "invalid pkt-line length",
280 ));
281 }
282 let len = total - 4;
283 let mut payload = vec![0u8; len];
284 read_exact(stdout, &mut payload)?;
285 Ok(Some(payload))
286}
287
288fn read_packet_line(stdout: &mut ChildStdout) -> std::io::Result<Option<String>> {
289 let Some(payload) = read_packet_payload(stdout)? else {
290 return Ok(None);
291 };
292 let s = String::from_utf8_lossy(&payload).into_owned();
293 Ok(Some(s.trim_end_matches('\n').to_string()))
294}
295
296fn read_status(stdout: &mut ChildStdout, acc: &mut String) -> std::io::Result<()> {
299 loop {
300 let Some(line) = read_packet_line(stdout)? else {
301 break;
302 };
303 if let Some(rest) = line.strip_prefix("status=") {
304 *acc = rest.to_string();
305 }
306 }
307 Ok(())
308}
309
310fn read_packetized(stdout: &mut ChildStdout) -> std::io::Result<Vec<u8>> {
311 let mut out = Vec::new();
312 loop {
313 let Some(chunk) = read_packet_payload(stdout)? else {
314 break;
315 };
316 out.extend_from_slice(&chunk);
317 }
318 Ok(out)
319}
320
321fn handshake(stdout: &mut ChildStdout, stdin: &mut ChildStdin) -> std::io::Result<u32> {
322 write_packet_line(stdin, "git-filter-client")?;
324 write_packet_line(stdin, "version=2")?;
325 write_flush(stdin)?;
326
327 let server = read_packet_line(stdout)?;
331 let server_line = server.as_deref().unwrap_or("<flush packet>");
332 if server_line != "git-filter-server" {
333 return Err(std::io::Error::new(
334 std::io::ErrorKind::InvalidData,
335 format!("Unexpected line '{server_line}', expected git-filter-server"),
336 ));
337 }
338 let Some(ver_line) = read_packet_line(stdout)? else {
339 return Err(std::io::Error::new(
340 std::io::ErrorKind::InvalidData,
341 "Unexpected line '<flush packet>', expected version",
342 ));
343 };
344 let ver = ver_line
345 .strip_prefix("version=")
346 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "expected version="))?;
347 if ver != "2" {
348 return Err(std::io::Error::new(
349 std::io::ErrorKind::InvalidData,
350 format!("unsupported filter protocol version {ver}"),
351 ));
352 }
353 if read_packet_line(stdout)?.is_some() {
354 return Err(std::io::Error::new(
355 std::io::ErrorKind::InvalidData,
356 "expected flush after version",
357 ));
358 }
359
360 write_packet_line(stdin, "capability=clean")?;
361 write_packet_line(stdin, "capability=smudge")?;
362 write_packet_line(stdin, "capability=delay")?;
363 write_flush(stdin)?;
364
365 let mut caps = 0u32;
366 loop {
367 let Some(line) = read_packet_line(stdout)? else {
368 break;
369 };
370 if let Some(name) = line.strip_prefix("capability=") {
371 match name {
372 "clean" => caps |= CAP_CLEAN,
373 "smudge" => caps |= CAP_SMUDGE,
374 "delay" => caps |= CAP_DELAY,
375 _ => {}
376 }
377 }
378 }
379
380 Ok(caps)
381}
382
383fn spawn_running(cmd: &str) -> std::io::Result<RunningFilter> {
384 let mut child = Command::new("sh")
385 .arg("-c")
386 .arg(cmd)
387 .env_remove("GIT_CONFIG_GLOBAL")
392 .stdin(Stdio::piped())
393 .stdout(Stdio::piped())
394 .stderr(Stdio::inherit())
395 .spawn()?;
396
397 let mut stdin = child
398 .stdin
399 .take()
400 .ok_or_else(|| std::io::Error::other("filter process missing stdin"))?;
401 let mut stdout = child
402 .stdout
403 .take()
404 .ok_or_else(|| std::io::Error::other("filter process missing stdout"))?;
405
406 let caps = handshake(&mut stdout, &mut stdin)?;
407
408 Ok(RunningFilter {
409 child,
410 stdin: Some(stdin),
411 stdout: Some(stdout),
412 caps,
413 })
414}
415
416pub fn ensure_process_filter_started(cmd: &str) -> Result<(), String> {
418 ensure_started(cmd)
419}
420
421fn ensure_started(cmd: &str) -> Result<(), String> {
422 let mut reg = process_registry()
423 .lock()
424 .map_err(|_| "filter registry poisoned".to_string())?;
425 use std::collections::hash_map::Entry;
426 match reg.entry(cmd.to_string()) {
427 Entry::Occupied(_) => Ok(()),
428 Entry::Vacant(v) => {
429 let rf = spawn_running(cmd).map_err(|e| e.to_string())?;
430 v.insert(Arc::new(Mutex::new(rf)));
431 Ok(())
432 }
433 }
434}
435
436fn write_packetized(stdin: &mut ChildStdin, data: &[u8]) -> std::io::Result<()> {
437 let mut off = 0usize;
438 while off < data.len() {
439 let end = (off + LARGE_PACKET_DATA_MAX).min(data.len());
440 write_packet(stdin, &data[off..end])?;
441 off = end;
442 }
443 Ok(())
444}
445
446pub fn apply_process_clean(cmd: &str, path: &str, input: &[u8]) -> Result<Vec<u8>, String> {
448 if process_filter_is_disabled(cmd) {
449 return Ok(input.to_vec());
450 }
451 ensure_started(cmd)?;
452 let arc = {
453 let reg = process_registry()
454 .lock()
455 .map_err(|_| "filter registry poisoned".to_string())?;
456 reg.get(cmd)
457 .cloned()
458 .ok_or_else(|| "filter process not registered".to_string())?
459 };
460 let mut rf = arc
461 .lock()
462 .map_err(|_| "filter process mutex poisoned".to_string())?;
463 if rf.caps & CAP_CLEAN == 0 {
464 return Err("filter process does not support clean".to_string());
465 }
466 let mut stdin = rf
467 .stdin
468 .take()
469 .ok_or_else(|| "filter stdin missing".to_string())?;
470 let mut stdout = rf
471 .stdout
472 .take()
473 .ok_or_else(|| "filter stdout missing".to_string())?;
474
475 let result = (|| {
476 write_packet_line(&mut stdin, "command=clean").map_err(|e| e.to_string())?;
477 write_packet_line(&mut stdin, &format!("pathname={path}")).map_err(|e| e.to_string())?;
478 write_flush(&mut stdin).map_err(|e| e.to_string())?;
479 write_packetized(&mut stdin, input).map_err(|e| e.to_string())?;
480 write_flush(&mut stdin).map_err(|e| e.to_string())?;
481
482 let mut st = String::new();
483 read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
484 if st != "success" {
485 return Err(format!("filter status: {st}"));
486 }
487 let out = read_packetized(&mut stdout).map_err(|e| e.to_string())?;
488 read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
489 if st != "success" {
490 return Err(format!("filter tail status: {st}"));
491 }
492 Ok(out)
493 })();
494
495 rf.stdin = Some(stdin);
496 rf.stdout = Some(stdout);
497 result
498}
499
500#[derive(Debug, Clone)]
502pub struct DelayedProcessCheckoutEntry {
503 pub filter_cmd: String,
505 pub path: String,
506 pub smudge_meta: FilterSmudgeMeta,
507}
508
509#[derive(Debug, Default)]
511pub struct DelayedProcessCheckout {
512 pub entries: Vec<DelayedProcessCheckoutEntry>,
513}
514
515impl DelayedProcessCheckout {
516 pub fn push_delayed(
518 &mut self,
519 filter_cmd: String,
520 path: String,
521 smudge_meta: FilterSmudgeMeta,
522 ) {
523 self.entries.push(DelayedProcessCheckoutEntry {
524 filter_cmd,
525 path,
526 smudge_meta,
527 });
528 }
529
530 pub fn finish(
548 &mut self,
549 mut convert_retry: impl FnMut(&str, &FilterSmudgeMeta) -> Result<Vec<u8>, String>,
550 mut write_out: impl FnMut(&str, &[u8]) -> Result<(), String>,
551 ) -> Result<(), DelayedCheckoutError> {
552 let mut filters: Vec<String> = Vec::new();
555 for e in &self.entries {
556 if !filters.contains(&e.filter_cmd) {
557 filters.push(e.filter_cmd.clone());
558 }
559 }
560
561 let mut had_error = false;
562
563 while !filters.is_empty() {
564 let mut still_active: Vec<String> = Vec::new();
565 for cmd in std::mem::take(&mut filters) {
566 let available = match list_available_blobs(&cmd) {
567 Ok(paths) => paths,
568 Err(_) => {
569 had_error = true;
571 continue;
572 }
573 };
574 if available.is_empty() {
575 continue;
577 }
578 let mut drop_filter = false;
579 for path in available {
580 let Some(pos) = self
581 .entries
582 .iter()
583 .position(|e| e.filter_cmd == cmd && e.path == path)
584 else {
585 eprintln!(
588 "error: external filter '{cmd}' signaled that '{path}' is now \
589available although it has not been delayed earlier"
590 );
591 had_error = true;
592 drop_filter = true;
593 continue;
594 };
595 let entry = self.entries.swap_remove(pos);
596 let data = convert_retry(&entry.path, &entry.smudge_meta)
597 .map_err(DelayedCheckoutError::Transport)?;
598 write_out(&entry.path, &data).map_err(DelayedCheckoutError::Transport)?;
599 }
600 if !drop_filter {
603 still_active.push(cmd);
604 }
605 }
606 filters = still_active;
607 }
608
609 for entry in &self.entries {
611 eprintln!("error: '{}' was not filtered properly", entry.path);
612 had_error = true;
613 }
614 self.entries.clear();
615
616 if had_error {
617 return Err(DelayedCheckoutError::Reported);
618 }
619 Ok(())
620 }
621}
622
623#[derive(Debug)]
625pub enum DelayedCheckoutError {
626 Reported,
629 Transport(String),
631}
632
633impl std::fmt::Display for DelayedCheckoutError {
634 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
635 match self {
636 DelayedCheckoutError::Reported => f.write_str("delayed checkout failed"),
637 DelayedCheckoutError::Transport(msg) => f.write_str(msg),
638 }
639 }
640}
641
642impl std::error::Error for DelayedCheckoutError {}
643
644pub fn process_filter_supports_delay(cmd: &str) -> bool {
646 if cmd.is_empty() {
647 return false;
648 }
649 if process_filter_is_disabled(cmd) {
650 return false;
651 }
652 if ensure_process_filter_started(cmd).is_err() {
653 return false;
654 }
655 let Ok(reg) = process_registry().lock() else {
656 return false;
657 };
658 let Some(arc) = reg.get(cmd) else {
659 return false;
660 };
661 let Ok(rf) = arc.lock() else {
662 return false;
663 };
664 (rf.caps & CAP_DELAY) != 0
665}
666
667fn list_available_blobs(cmd: &str) -> Result<Vec<String>, String> {
668 ensure_started(cmd)?;
669 let arc = {
670 let reg = process_registry()
671 .lock()
672 .map_err(|_| "filter registry poisoned".to_string())?;
673 reg.get(cmd)
674 .cloned()
675 .ok_or_else(|| "filter process not registered".to_string())?
676 };
677 let mut rf = arc
678 .lock()
679 .map_err(|_| "filter process mutex poisoned".to_string())?;
680 if rf.caps & CAP_DELAY == 0 {
681 return Err("filter does not support delay".to_string());
682 }
683 let mut stdin = rf
684 .stdin
685 .take()
686 .ok_or_else(|| "filter stdin missing".to_string())?;
687 let mut stdout = rf
688 .stdout
689 .take()
690 .ok_or_else(|| "filter stdout missing".to_string())?;
691
692 let result = (|| {
693 write_packet_line(&mut stdin, "command=list_available_blobs").map_err(|e| e.to_string())?;
694 write_flush(&mut stdin).map_err(|e| e.to_string())?;
695 let mut paths = Vec::new();
696 loop {
697 let line = read_packet_line(&mut stdout).map_err(|e| e.to_string())?;
698 let Some(line) = line else {
699 break;
700 };
701 if let Some(p) = line.strip_prefix("pathname=") {
702 paths.push(p.to_string());
703 }
704 }
705 let mut st = String::new();
706 read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
707 if st != "success" {
708 return Err(format!("list_available_blobs status: {st}"));
709 }
710 Ok(paths)
711 })();
712
713 rf.stdin = Some(stdin);
714 rf.stdout = Some(stdout);
715 result
716}
717
718pub fn apply_process_smudge(
723 cmd: &str,
724 path: &str,
725 input: &[u8],
726 meta: Option<&FilterSmudgeMeta>,
727 can_delay: bool,
728) -> Result<Option<Vec<u8>>, String> {
729 if process_filter_is_disabled(cmd) {
730 return Ok(Some(input.to_vec()));
731 }
732 ensure_started(cmd)?;
733 let arc = {
734 let reg = process_registry()
735 .lock()
736 .map_err(|_| "filter registry poisoned".to_string())?;
737 reg.get(cmd)
738 .cloned()
739 .ok_or_else(|| "filter process not registered".to_string())?
740 };
741 let mut rf = arc
742 .lock()
743 .map_err(|_| "filter process mutex poisoned".to_string())?;
744 let caps = rf.caps;
745 let mut stdin = rf
746 .stdin
747 .take()
748 .ok_or_else(|| "filter stdin missing".to_string())?;
749 let mut stdout = rf
750 .stdout
751 .take()
752 .ok_or_else(|| "filter stdout missing".to_string())?;
753
754 let result = (|| {
755 if caps & CAP_SMUDGE == 0 {
756 return Ok(Some(input.to_vec()));
757 }
758 write_packet_line(&mut stdin, "command=smudge").map_err(|e| e.to_string())?;
759 write_packet_line(&mut stdin, &format!("pathname={path}")).map_err(|e| e.to_string())?;
760 if let Some(m) = meta {
761 if let Some(r) = &m.ref_name {
762 write_packet_line(&mut stdin, &format!("ref={r}")).map_err(|e| e.to_string())?;
763 }
764 if let Some(t) = &m.treeish_hex {
765 write_packet_line(&mut stdin, &format!("treeish={t}"))
766 .map_err(|e| e.to_string())?;
767 }
768 if let Some(b) = &m.blob_hex {
769 write_packet_line(&mut stdin, &format!("blob={b}")).map_err(|e| e.to_string())?;
770 }
771 }
772 if can_delay && (caps & CAP_DELAY) != 0 {
773 write_packet_line(&mut stdin, "can-delay=1").map_err(|e| e.to_string())?;
774 }
775 write_flush(&mut stdin).map_err(|e| e.to_string())?;
776 write_packetized(&mut stdin, input).map_err(|e| e.to_string())?;
777 write_flush(&mut stdin).map_err(|e| e.to_string())?;
778
779 let mut st = String::new();
780 read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
781 if st == "delayed" {
782 if !can_delay {
783 return Err("unexpected delayed status from filter".to_string());
784 }
785 return Ok(None);
786 }
787 if st != "success" {
788 return Err(format!("filter status: {st}"));
789 }
790 let out = read_packetized(&mut stdout).map_err(|e| e.to_string())?;
791 read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
792 if st != "success" {
793 return Err(format!("filter tail status: {st}"));
794 }
795 Ok(Some(out))
796 })();
797
798 if result
799 .as_ref()
800 .err()
801 .is_some_and(|e| process_transport_error(e))
802 {
803 drop(stdin);
804 drop(stdout);
805 drop(rf);
806 remove_process_filter(cmd);
807 return result;
808 }
809
810 rf.stdin = Some(stdin);
811 rf.stdout = Some(stdout);
812 result
813}