Skip to main content

grit_lib/
filter_process.rs

1//! Long-running Git filter protocol (`filter.<name>.process`), matching `git-filter` v2.
2//!
3//! See Git's `convert.c` (`apply_multi_file_filter`) and `sub-process.c` (handshake).
4
5use std::collections::{HashMap, HashSet};
6use std::io::{Read, Write};
7use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
8use std::sync::{Arc, Mutex, OnceLock};
9
10use crate::objects::ObjectId;
11use crate::refs;
12use crate::repo::Repository;
13
/// Largest payload one pkt-line may carry: the 65520-byte packet limit minus the four-byte
/// hex length header (Git `LARGE_PACKET_DATA_MAX`).
const LARGE_PACKET_DATA_MAX: usize = 65516;

/// Capability bit recorded when the filter advertises `capability=clean`.
const CAP_CLEAN: u32 = 0b001;
/// Capability bit recorded when the filter advertises `capability=smudge`.
const CAP_SMUDGE: u32 = 0b010;
/// Capability bit recorded when the filter advertises `capability=delay`.
const CAP_DELAY: u32 = 0b100;
20
/// Optional key/value context that can accompany a `smudge` request.
///
/// Each populated field is sent to the filter as its own pkt-line (`ref=`, `treeish=`,
/// `blob=`); fields left as `None` are not sent.
#[derive(Clone, Debug, Default)]
pub struct FilterSmudgeMeta {
    /// Value for the `ref=` line, when a ref name applies.
    pub ref_name: Option<String>,
    /// Hex object id for the `treeish=` line.
    pub treeish_hex: Option<String>,
    /// Hex object id for the `blob=` line.
    pub blob_hex: Option<String>,
}
28
29/// Smudge metadata for path-only checkouts (`git checkout -- <paths>`): `blob=` only.
30#[must_use]
31pub fn smudge_meta_blob_only(blob_hex: &str) -> FilterSmudgeMeta {
32    FilterSmudgeMeta {
33        blob_hex: Some(blob_hex.to_string()),
34        ..Default::default()
35    }
36}
37
38/// Smudge metadata with `treeish=` only (e.g. `git reset --hard <commit>` / `git merge` checkout).
39#[must_use]
40pub fn smudge_meta_treeish_only(treeish_hex: &str, blob_hex: &str) -> FilterSmudgeMeta {
41    FilterSmudgeMeta {
42        treeish_hex: Some(treeish_hex.to_string()),
43        blob_hex: Some(blob_hex.to_string()),
44        ..Default::default()
45    }
46}
47
48/// Process-smudge metadata for `git reset --hard <ref>` (t0021): `ref=` when the spec names a ref.
49#[must_use]
50pub fn smudge_meta_for_reset(
51    repo: &Repository,
52    commit_spec: &str,
53    resolved_commit: &ObjectId,
54    blob_hex: &str,
55) -> FilterSmudgeMeta {
56    let tip_hex = resolved_commit.to_string();
57    let mut meta = FilterSmudgeMeta {
58        treeish_hex: Some(tip_hex.clone()),
59        blob_hex: Some(blob_hex.to_string()),
60        ..Default::default()
61    };
62    let arg_lower = commit_spec.to_ascii_lowercase();
63    let is_full_hex = arg_lower.len() == 40 && arg_lower.chars().all(|c| c.is_ascii_hexdigit());
64    if is_full_hex && arg_lower == tip_hex.to_ascii_lowercase() {
65        meta.ref_name = None;
66        return meta;
67    }
68    let mut candidates: Vec<String> = Vec::new();
69    if commit_spec == "HEAD" || commit_spec.starts_with("refs/") {
70        candidates.push(commit_spec.to_string());
71    } else {
72        candidates.push(format!("refs/heads/{commit_spec}"));
73        candidates.push(format!("refs/tags/{commit_spec}"));
74        candidates.push(commit_spec.to_string());
75    }
76    for name in candidates {
77        if let Ok(oid) = refs::resolve_ref(&repo.git_dir, &name) {
78            if oid == *resolved_commit {
79                meta.ref_name = Some(name);
80                break;
81            }
82        }
83    }
84    meta
85}
86
87/// Process-smudge metadata for `git archive` (matches Git / t0021).
88///
89/// `tree_ish_arg` is the user's argument (`main`, full commit hex, or tree hex).
90/// `resolved_tip` is the OID `archive` resolved; `tip_is_commit` is true when that object is a commit.
91#[must_use]
92pub fn smudge_meta_for_archive(
93    repo: &Repository,
94    tree_ish_arg: &str,
95    resolved_tip: &ObjectId,
96    tip_is_commit: bool,
97    blob_hex: &str,
98) -> FilterSmudgeMeta {
99    let mut meta = FilterSmudgeMeta {
100        blob_hex: Some(blob_hex.to_string()),
101        ..Default::default()
102    };
103    if !tip_is_commit {
104        meta.treeish_hex = Some(resolved_tip.to_string());
105        return meta;
106    }
107    let tip_hex = resolved_tip.to_string();
108    meta.treeish_hex = Some(tip_hex.clone());
109    let arg_lower = tree_ish_arg.to_ascii_lowercase();
110    let is_full_hex = arg_lower.len() == 40 && arg_lower.chars().all(|c| c.is_ascii_hexdigit());
111    if is_full_hex && arg_lower == tip_hex.to_ascii_lowercase() {
112        meta.ref_name = None;
113        return meta;
114    }
115    if let Ok(oid) = refs::resolve_ref(&repo.git_dir, tree_ish_arg) {
116        if oid == *resolved_tip {
117            meta.ref_name = Some(tree_ish_arg.to_string());
118            return meta;
119        }
120    }
121    let heads = format!("refs/heads/{tree_ish_arg}");
122    if let Ok(oid) = refs::resolve_ref(&repo.git_dir, &heads) {
123        if oid == *resolved_tip {
124            meta.ref_name = Some(heads);
125        }
126    }
127    meta
128}
129
130pub fn smudge_meta_for_checkout(repo: &Repository, blob_hex: &str) -> FilterSmudgeMeta {
131    let mut meta = FilterSmudgeMeta {
132        blob_hex: Some(blob_hex.to_string()),
133        ..Default::default()
134    };
135    let Ok(content) = std::fs::read_to_string(repo.git_dir.join("HEAD")) else {
136        return meta;
137    };
138    let content = content.trim();
139    if let Some(sym) = content.strip_prefix("ref: ") {
140        let sym = sym.trim();
141        meta.ref_name = Some(sym.to_string());
142        if let Ok(oid) = refs::resolve_ref(&repo.git_dir, sym) {
143            meta.treeish_hex = Some(oid.to_string());
144        }
145    } else if content.len() == 40 {
146        if let Ok(oid) = ObjectId::from_hex(content) {
147            meta.treeish_hex = Some(oid.to_string());
148        }
149    }
150    meta
151}
152
/// A spawned long-running filter process together with its pipes and negotiated capabilities.
struct RunningFilter {
    // Owned so the process handle lives as long as the entry; not read in the visible code.
    #[allow(dead_code)]
    child: Child,
    /// Pipe to the filter's stdin. Request functions `take()` it for one exchange and put it
    /// back afterwards.
    stdin: Option<ChildStdin>,
    /// Pipe from the filter's stdout, handled the same way as `stdin`.
    stdout: Option<ChildStdout>,
    /// Bitmask of `CAP_*` flags returned by the handshake.
    caps: u32,
}
160
161fn process_registry() -> &'static Mutex<HashMap<String, Arc<Mutex<RunningFilter>>>> {
162    static REG: OnceLock<Mutex<HashMap<String, Arc<Mutex<RunningFilter>>>>> = OnceLock::new();
163    REG.get_or_init(|| Mutex::new(HashMap::new()))
164}
165
/// Encode the low 16 bits of `len` as four lowercase hex digits into `out`
/// (the pkt-line length header).
fn set_packet_header(len: usize, out: &mut [u8; 4]) {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    // Most significant nibble first.
    for (slot, shift) in out.iter_mut().zip([12u32, 8, 4, 0]) {
        *slot = DIGITS[(len >> shift) & 0xf];
    }
}
173
174fn write_packet(stdin: &mut ChildStdin, payload: &[u8]) -> std::io::Result<()> {
175    if payload.len() > LARGE_PACKET_DATA_MAX {
176        return Err(std::io::Error::other("filter packet payload too large"));
177    }
178    let total = payload.len() + 4;
179    let mut hdr = [0u8; 4];
180    set_packet_header(total, &mut hdr);
181    stdin.write_all(&hdr)?;
182    stdin.write_all(payload)?;
183    stdin.flush()?;
184    Ok(())
185}
186
187fn write_packet_line(stdin: &mut ChildStdin, line: &str) -> std::io::Result<()> {
188    let mut s = line.to_string();
189    if !s.ends_with('\n') {
190        s.push('\n');
191    }
192    write_packet(stdin, s.as_bytes())
193}
194
/// Send a flush packet (`0000`) to the filter and flush the pipe.
fn write_flush(stdin: &mut ChildStdin) -> std::io::Result<()> {
    const FLUSH_PKT: &[u8] = b"0000";
    stdin.write_all(FLUSH_PKT)?;
    stdin.flush()
}
199
/// Fill `buf` completely from `r`, issuing as many reads as needed.
///
/// # Errors
///
/// Returns `UnexpectedEof` if the reader reports end of input before `buf` is full, and
/// propagates any error returned by `r.read` unchanged.
fn read_exact<R: Read>(r: &mut R, buf: &mut [u8]) -> std::io::Result<()> {
    let mut filled = 0;
    // Keep reading into the still-empty tail until nothing is left to fill.
    while let Some(window) = buf.get_mut(filled..).filter(|w| !w.is_empty()) {
        match r.read(window)? {
            0 => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "unexpected EOF reading pkt-line",
                ))
            }
            got => filled += got,
        }
    }
    Ok(())
}
214
/// Read the four-byte pkt-line length header.
///
/// Returns `Ok(None)` when the stream is already at end of input before any header byte
/// arrives, and `Ok(Some(header))` once all four bytes have been read.
///
/// # Errors
///
/// Returns `UnexpectedEof` when the stream ends part-way through the header, and propagates
/// read errors.
fn read_packet_header(stdout: &mut ChildStdout) -> std::io::Result<Option<[u8; 4]>> {
    let mut header = [0u8; 4];
    let mut filled = 0usize;
    while filled < header.len() {
        match stdout.read(&mut header[filled..])? {
            // Clean EOF on a packet boundary.
            0 if filled == 0 => return Ok(None),
            0 => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "unexpected EOF reading pkt-line",
                ))
            }
            got => filled += got,
        }
    }
    Ok(Some(header))
}
233
234fn read_packet_payload(stdout: &mut ChildStdout) -> std::io::Result<Option<Vec<u8>>> {
235    let Some(hdr) = read_packet_header(stdout)? else {
236        return Ok(None);
237    };
238    let hex = std::str::from_utf8(&hdr)
239        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
240    let total = usize::from_str_radix(hex, 16).map_err(|_| {
241        std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid pkt-line header")
242    })?;
243    if total == 0 {
244        return Ok(None);
245    }
246    if total < 4 {
247        return Err(std::io::Error::new(
248            std::io::ErrorKind::InvalidData,
249            "invalid pkt-line length",
250        ));
251    }
252    let len = total - 4;
253    let mut payload = vec![0u8; len];
254    read_exact(stdout, &mut payload)?;
255    Ok(Some(payload))
256}
257
258fn read_packet_line(stdout: &mut ChildStdout) -> std::io::Result<Option<String>> {
259    let Some(payload) = read_packet_payload(stdout)? else {
260        return Ok(None);
261    };
262    let s = String::from_utf8_lossy(&payload).into_owned();
263    Ok(Some(s.trim_end_matches('\n').to_string()))
264}
265
266/// Read pkt-lines until flush; updates `acc` only when a `status=` line appears (matches Git
267/// `subprocess_read_status` — if the segment is empty, `acc` is left unchanged).
268fn read_status(stdout: &mut ChildStdout, acc: &mut String) -> std::io::Result<()> {
269    loop {
270        let Some(line) = read_packet_line(stdout)? else {
271            break;
272        };
273        if let Some(rest) = line.strip_prefix("status=") {
274            *acc = rest.to_string();
275        }
276    }
277    Ok(())
278}
279
280fn read_packetized(stdout: &mut ChildStdout) -> std::io::Result<Vec<u8>> {
281    let mut out = Vec::new();
282    loop {
283        let Some(chunk) = read_packet_payload(stdout)? else {
284            break;
285        };
286        out.extend_from_slice(&chunk);
287    }
288    Ok(out)
289}
290
291fn handshake(stdout: &mut ChildStdout, stdin: &mut ChildStdin) -> std::io::Result<u32> {
292    // Match Git's test-tool rot13-filter: client sends only `version=2` before the first flush.
293    write_packet_line(stdin, "git-filter-client")?;
294    write_packet_line(stdin, "version=2")?;
295    write_flush(stdin)?;
296
297    let Some(server) = read_packet_line(stdout)? else {
298        return Err(std::io::Error::new(
299            std::io::ErrorKind::UnexpectedEof,
300            "expected git-filter-server",
301        ));
302    };
303    if server != "git-filter-server" {
304        return Err(std::io::Error::new(
305            std::io::ErrorKind::InvalidData,
306            format!("unexpected filter server line: {server}"),
307        ));
308    }
309    let Some(ver_line) = read_packet_line(stdout)? else {
310        return Err(std::io::Error::new(
311            std::io::ErrorKind::UnexpectedEof,
312            "expected version line",
313        ));
314    };
315    let ver = ver_line
316        .strip_prefix("version=")
317        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "expected version="))?;
318    if ver != "2" {
319        return Err(std::io::Error::new(
320            std::io::ErrorKind::InvalidData,
321            format!("unsupported filter protocol version {ver}"),
322        ));
323    }
324    if read_packet_line(stdout)?.is_some() {
325        return Err(std::io::Error::new(
326            std::io::ErrorKind::InvalidData,
327            "expected flush after version",
328        ));
329    }
330
331    write_packet_line(stdin, "capability=clean")?;
332    write_packet_line(stdin, "capability=smudge")?;
333    write_packet_line(stdin, "capability=delay")?;
334    write_flush(stdin)?;
335
336    let mut caps = 0u32;
337    loop {
338        let Some(line) = read_packet_line(stdout)? else {
339            break;
340        };
341        if let Some(name) = line.strip_prefix("capability=") {
342            match name {
343                "clean" => caps |= CAP_CLEAN,
344                "smudge" => caps |= CAP_SMUDGE,
345                "delay" => caps |= CAP_DELAY,
346                _ => {}
347            }
348        }
349    }
350
351    Ok(caps)
352}
353
354fn spawn_running(cmd: &str) -> std::io::Result<RunningFilter> {
355    let mut child = Command::new("sh")
356        .arg("-c")
357        .arg(cmd)
358        // Upstream tests isolate `HOME` to the trash dir; if the parent shell exports
359        // `GIT_CONFIG_GLOBAL` to a host file, nested `git`/`grit` inside long-running
360        // filters would ignore `$HOME/.gitconfig` and miss `test_config_global` entries
361        // (t2082 delayed checkout).
362        .env_remove("GIT_CONFIG_GLOBAL")
363        .stdin(Stdio::piped())
364        .stdout(Stdio::piped())
365        .stderr(Stdio::inherit())
366        .spawn()?;
367
368    let mut stdin = child
369        .stdin
370        .take()
371        .ok_or_else(|| std::io::Error::other("filter process missing stdin"))?;
372    let mut stdout = child
373        .stdout
374        .take()
375        .ok_or_else(|| std::io::Error::other("filter process missing stdout"))?;
376
377    let caps = handshake(&mut stdout, &mut stdin)?;
378
379    Ok(RunningFilter {
380        child,
381        stdin: Some(stdin),
382        stdout: Some(stdout),
383        caps,
384    })
385}
386
/// Ensure the long-running filter for `cmd` is running (handshake complete).
///
/// Public entry point that forwards to `ensure_started`.
///
/// # Errors
///
/// Returns the error text from `ensure_started` unchanged.
pub fn ensure_process_filter_started(cmd: &str) -> Result<(), String> {
    ensure_started(cmd)
}
391
392fn ensure_started(cmd: &str) -> Result<(), String> {
393    let mut reg = process_registry()
394        .lock()
395        .map_err(|_| "filter registry poisoned".to_string())?;
396    use std::collections::hash_map::Entry;
397    match reg.entry(cmd.to_string()) {
398        Entry::Occupied(_) => Ok(()),
399        Entry::Vacant(v) => {
400            let rf = spawn_running(cmd).map_err(|e| e.to_string())?;
401            v.insert(Arc::new(Mutex::new(rf)));
402            Ok(())
403        }
404    }
405}
406
407fn write_packetized(stdin: &mut ChildStdin, data: &[u8]) -> std::io::Result<()> {
408    let mut off = 0usize;
409    while off < data.len() {
410        let end = (off + LARGE_PACKET_DATA_MAX).min(data.len());
411        write_packet(stdin, &data[off..end])?;
412        off = end;
413    }
414    Ok(())
415}
416
417/// Run clean via long-running filter `cmd` for `path` and `input`.
418pub fn apply_process_clean(cmd: &str, path: &str, input: &[u8]) -> Result<Vec<u8>, String> {
419    ensure_started(cmd)?;
420    let arc = {
421        let reg = process_registry()
422            .lock()
423            .map_err(|_| "filter registry poisoned".to_string())?;
424        reg.get(cmd)
425            .cloned()
426            .ok_or_else(|| "filter process not registered".to_string())?
427    };
428    let mut rf = arc
429        .lock()
430        .map_err(|_| "filter process mutex poisoned".to_string())?;
431    if rf.caps & CAP_CLEAN == 0 {
432        return Err("filter process does not support clean".to_string());
433    }
434    let mut stdin = rf
435        .stdin
436        .take()
437        .ok_or_else(|| "filter stdin missing".to_string())?;
438    let mut stdout = rf
439        .stdout
440        .take()
441        .ok_or_else(|| "filter stdout missing".to_string())?;
442
443    let result = (|| {
444        write_packet_line(&mut stdin, "command=clean").map_err(|e| e.to_string())?;
445        write_packet_line(&mut stdin, &format!("pathname={path}")).map_err(|e| e.to_string())?;
446        write_flush(&mut stdin).map_err(|e| e.to_string())?;
447        write_packetized(&mut stdin, input).map_err(|e| e.to_string())?;
448        write_flush(&mut stdin).map_err(|e| e.to_string())?;
449
450        let mut st = String::new();
451        read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
452        if st != "success" {
453            return Err(format!("filter status: {st}"));
454        }
455        let out = read_packetized(&mut stdout).map_err(|e| e.to_string())?;
456        read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
457        if st != "success" {
458            return Err(format!("filter tail status: {st}"));
459        }
460        Ok(out)
461    })();
462
463    rf.stdin = Some(stdin);
464    rf.stdout = Some(stdout);
465    result
466}
467
468/// One path deferred by a process filter that returned `status=delayed` (Git `delayed_checkout`).
469#[derive(Debug, Clone)]
470pub struct DelayedProcessCheckoutEntry {
471    /// `filter.<name>.process` command line.
472    pub filter_cmd: String,
473    pub path: String,
474    pub smudge_meta: FilterSmudgeMeta,
475}
476
477/// Paths waiting for `list_available_blobs` / retry smudge (Git `finish_delayed_checkout`).
478#[derive(Debug, Default)]
479pub struct DelayedProcessCheckout {
480    pub entries: Vec<DelayedProcessCheckoutEntry>,
481}
482
483impl DelayedProcessCheckout {
484    /// Record a delayed smudge; the file must be written after [`Self::finish`].
485    pub fn push_delayed(
486        &mut self,
487        filter_cmd: String,
488        path: String,
489        smudge_meta: FilterSmudgeMeta,
490    ) {
491        self.entries.push(DelayedProcessCheckoutEntry {
492            filter_cmd,
493            path,
494            smudge_meta,
495        });
496    }
497
498    /// Complete delayed checkouts: query filters for available paths and materialize each file.
499    ///
500    /// `convert_retry` matches Git `CE_RETRY`: empty blob through ident/encoding/eol then a
501    /// second smudge without `can-delay` (filter returns cached output).
502    pub fn finish(
503        &mut self,
504        mut convert_retry: impl FnMut(&str, &FilterSmudgeMeta) -> Result<Vec<u8>, String>,
505        mut write_out: impl FnMut(&str, &[u8]) -> Result<(), String>,
506    ) -> Result<(), String> {
507        while !self.entries.is_empty() {
508            let mut made_progress = false;
509            let cmds: HashSet<String> = self.entries.iter().map(|e| e.filter_cmd.clone()).collect();
510            for cmd in cmds {
511                let available = list_available_blobs(&cmd)?;
512                for path in available {
513                    let Some(pos) = self
514                        .entries
515                        .iter()
516                        .position(|e| e.filter_cmd == cmd && e.path == path)
517                    else {
518                        continue;
519                    };
520                    let entry = self.entries.swap_remove(pos);
521                    let data = convert_retry(&entry.path, &entry.smudge_meta)?;
522                    write_out(&entry.path, &data)?;
523                    made_progress = true;
524                }
525            }
526            if !made_progress {
527                return Err(format!(
528                    "delayed process filter checkout stalled with {} pending path(s)",
529                    self.entries.len()
530                ));
531            }
532        }
533        Ok(())
534    }
535}
536
537/// True when `cmd` is running (or can be started) and advertises the `delay` capability.
538pub fn process_filter_supports_delay(cmd: &str) -> bool {
539    if cmd.is_empty() {
540        return false;
541    }
542    if ensure_process_filter_started(cmd).is_err() {
543        return false;
544    }
545    let Ok(reg) = process_registry().lock() else {
546        return false;
547    };
548    let Some(arc) = reg.get(cmd) else {
549        return false;
550    };
551    let Ok(rf) = arc.lock() else {
552        return false;
553    };
554    (rf.caps & CAP_DELAY) != 0
555}
556
557fn list_available_blobs(cmd: &str) -> Result<Vec<String>, String> {
558    ensure_started(cmd)?;
559    let arc = {
560        let reg = process_registry()
561            .lock()
562            .map_err(|_| "filter registry poisoned".to_string())?;
563        reg.get(cmd)
564            .cloned()
565            .ok_or_else(|| "filter process not registered".to_string())?
566    };
567    let mut rf = arc
568        .lock()
569        .map_err(|_| "filter process mutex poisoned".to_string())?;
570    if rf.caps & CAP_DELAY == 0 {
571        return Err("filter does not support delay".to_string());
572    }
573    let mut stdin = rf
574        .stdin
575        .take()
576        .ok_or_else(|| "filter stdin missing".to_string())?;
577    let mut stdout = rf
578        .stdout
579        .take()
580        .ok_or_else(|| "filter stdout missing".to_string())?;
581
582    let result = (|| {
583        write_packet_line(&mut stdin, "command=list_available_blobs").map_err(|e| e.to_string())?;
584        write_flush(&mut stdin).map_err(|e| e.to_string())?;
585        let mut paths = Vec::new();
586        loop {
587            let line = read_packet_line(&mut stdout).map_err(|e| e.to_string())?;
588            let Some(line) = line else {
589                break;
590            };
591            if let Some(p) = line.strip_prefix("pathname=") {
592                paths.push(p.to_string());
593            }
594        }
595        let mut st = String::new();
596        read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
597        if st != "success" {
598            return Err(format!("list_available_blobs status: {st}"));
599        }
600        Ok(paths)
601    })();
602
603    rf.stdin = Some(stdin);
604    rf.stdout = Some(stdout);
605    result
606}
607
608/// Run smudge via long-running filter.
609///
610/// When `can_delay` is true and the filter returns `status=delayed`, returns `Ok(None)` after
611/// recording is left to the caller ([`DelayedProcessCheckout`]).
612pub fn apply_process_smudge(
613    cmd: &str,
614    path: &str,
615    input: &[u8],
616    meta: Option<&FilterSmudgeMeta>,
617    can_delay: bool,
618) -> Result<Option<Vec<u8>>, String> {
619    ensure_started(cmd)?;
620    let arc = {
621        let reg = process_registry()
622            .lock()
623            .map_err(|_| "filter registry poisoned".to_string())?;
624        reg.get(cmd)
625            .cloned()
626            .ok_or_else(|| "filter process not registered".to_string())?
627    };
628    let mut rf = arc
629        .lock()
630        .map_err(|_| "filter process mutex poisoned".to_string())?;
631    let caps = rf.caps;
632    let mut stdin = rf
633        .stdin
634        .take()
635        .ok_or_else(|| "filter stdin missing".to_string())?;
636    let mut stdout = rf
637        .stdout
638        .take()
639        .ok_or_else(|| "filter stdout missing".to_string())?;
640
641    let result = (|| {
642        if caps & CAP_SMUDGE == 0 {
643            return Ok(Some(input.to_vec()));
644        }
645        write_packet_line(&mut stdin, "command=smudge").map_err(|e| e.to_string())?;
646        write_packet_line(&mut stdin, &format!("pathname={path}")).map_err(|e| e.to_string())?;
647        if let Some(m) = meta {
648            if let Some(r) = &m.ref_name {
649                write_packet_line(&mut stdin, &format!("ref={r}")).map_err(|e| e.to_string())?;
650            }
651            if let Some(t) = &m.treeish_hex {
652                write_packet_line(&mut stdin, &format!("treeish={t}"))
653                    .map_err(|e| e.to_string())?;
654            }
655            if let Some(b) = &m.blob_hex {
656                write_packet_line(&mut stdin, &format!("blob={b}")).map_err(|e| e.to_string())?;
657            }
658        }
659        if can_delay && (caps & CAP_DELAY) != 0 {
660            write_packet_line(&mut stdin, "can-delay=1").map_err(|e| e.to_string())?;
661        }
662        write_flush(&mut stdin).map_err(|e| e.to_string())?;
663        write_packetized(&mut stdin, input).map_err(|e| e.to_string())?;
664        write_flush(&mut stdin).map_err(|e| e.to_string())?;
665
666        let mut st = String::new();
667        read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
668        if st == "delayed" {
669            if !can_delay {
670                return Err("unexpected delayed status from filter".to_string());
671            }
672            return Ok(None);
673        }
674        if st != "success" {
675            return Err(format!("filter status: {st}"));
676        }
677        let out = read_packetized(&mut stdout).map_err(|e| e.to_string())?;
678        read_status(&mut stdout, &mut st).map_err(|e| e.to_string())?;
679        if st != "success" {
680            return Err(format!("filter tail status: {st}"));
681        }
682        Ok(Some(out))
683    })();
684
685    rf.stdin = Some(stdin);
686    rf.stdout = Some(stdout);
687    result
688}