Skip to main content

drft/parsers/
custom.rs

1use super::{ParseResult, Parser};
2use std::collections::HashMap;
3use std::io::Write;
4use std::process::{Command, Stdio};
5use std::time::Duration;
6
7/// Custom parser. Runs an external command that receives a JSON
8/// options envelope on line 1, then file paths (one per line) on stdin,
9/// and emits NDJSON links on stdout.
10pub struct CustomParser {
11    pub parser_name: String,
12    /// File routing filter. None = receives all File nodes.
13    pub file_filter: Option<globset::GlobSet>,
14    pub command: String,
15    pub timeout_ms: u64,
16    pub scope_dir: std::path::PathBuf,
17    /// Parser options from `[parsers.<name>.options]`. Sent as JSON on stdin line 1.
18    pub options: Option<toml::Value>,
19}
20
21impl Parser for CustomParser {
22    fn name(&self) -> &str {
23        &self.parser_name
24    }
25
26    fn matches(&self, path: &str) -> bool {
27        match &self.file_filter {
28            Some(set) => set.is_match(path),
29            None => true, // No filter = receives all File nodes
30        }
31    }
32
33    fn parse(&self, path: &str, _content: &str) -> ParseResult {
34        // Single-file fallback — used when parse_batch isn't called
35        match self.run_batch(&[path]) {
36            Ok(mut results) => results.remove(path).unwrap_or_default(),
37            Err(e) => {
38                eprintln!("warn: parser {}: {path}: {e}", self.parser_name);
39                ParseResult::default()
40            }
41        }
42    }
43
44    fn parse_batch(&self, files: &[(&str, &str)]) -> HashMap<String, ParseResult> {
45        let paths: Vec<&str> = files.iter().map(|(path, _)| *path).collect();
46        match self.run_batch(&paths) {
47            Ok(results) => results,
48            Err(e) => {
49                eprintln!("warn: parser {}: batch failed: {e}", self.parser_name);
50                HashMap::new()
51            }
52        }
53    }
54}
55
56impl CustomParser {
57    fn run_batch(&self, paths: &[&str]) -> anyhow::Result<HashMap<String, ParseResult>> {
58        if paths.is_empty() {
59            return Ok(HashMap::new());
60        }
61
62        let mut child = Command::new("sh")
63            .arg("-c")
64            .arg(&self.command)
65            .current_dir(&self.scope_dir)
66            .stdin(Stdio::piped())
67            .stdout(Stdio::piped())
68            .stderr(Stdio::piped())
69            .spawn()?;
70
71        // Send options envelope on line 1, then file paths
72        if let Some(mut stdin) = child.stdin.take() {
73            let options_json = match &self.options {
74                Some(val) => serde_json::to_string(val).unwrap_or_else(|_| "{}".into()),
75                None => "{}".into(),
76            };
77            let _ = writeln!(stdin, "{options_json}");
78            for path in paths {
79                let _ = writeln!(stdin, "{path}");
80            }
81        }
82
83        // Wait with timeout
84        let output = match wait_with_timeout(&mut child, Duration::from_millis(self.timeout_ms)) {
85            Ok(output) => output,
86            Err(_) => {
87                let _ = child.kill();
88                anyhow::bail!("timed out after {}ms", self.timeout_ms);
89            }
90        };
91
92        if !output.status.success() {
93            let code = output.status.code().unwrap_or(-1);
94            anyhow::bail!("exited with code {code}");
95        }
96
97        let stdout = String::from_utf8_lossy(&output.stdout);
98        let mut results: HashMap<String, ParseResult> = HashMap::new();
99
100        for line in stdout.lines() {
101            let line = line.trim();
102            if line.is_empty() {
103                continue;
104            }
105
106            // Try as edge first ({ file, target, type }), then as metadata ({ file, metadata })
107            let json: serde_json::Value = match serde_json::from_str(line) {
108                Ok(v) => v,
109                Err(e) => {
110                    eprintln!(
111                        "warn: parser {}: malformed JSON line: {e}",
112                        self.parser_name
113                    );
114                    continue;
115                }
116            };
117
118            let file = match json.get("file").and_then(|v| v.as_str()) {
119                Some(f) => f.to_string(),
120                None => {
121                    eprintln!(
122                        "warn: parser {}: JSON line missing 'file' field",
123                        self.parser_name
124                    );
125                    continue;
126                }
127            };
128
129            if let Some(metadata) = json.get("metadata") {
130                // Metadata line: { file, metadata }
131                results.entry(file).or_default().metadata = Some(metadata.clone());
132            } else if let Some(target) = json.get("target").and_then(|v| v.as_str()) {
133                // Edge line: { file, target }
134                results
135                    .entry(file)
136                    .or_default()
137                    .links
138                    .push(target.to_string());
139            } else {
140                eprintln!(
141                    "warn: parser {}: JSON line has neither 'target' nor 'metadata'",
142                    self.parser_name
143                );
144            }
145        }
146
147        Ok(results)
148    }
149}
150
/// How often `wait_with_timeout` polls the child for exit.
const POLL_INTERVAL: Duration = Duration::from_millis(50);

/// Waits for `child` to exit and collects its stdout and stderr, giving up
/// once more than `timeout` has elapsed.
///
/// Both pipes are drained on background threads while polling. This stops a
/// child that writes more than the OS pipe buffer (~64 KB) from blocking on
/// write while we block waiting for it to exit. On success the child has been
/// reaped and its full output is returned. If the command leaves a background
/// process holding stdout/stderr open, collecting the output also waits for
/// that process.
///
/// On timeout, or if querying the child's status fails, the child is killed
/// and reaped here before `Err(())` is returned. The reader threads are then
/// detached rather than joined: they only finish once every holder of the
/// pipes' write ends exits. That includes a grandchild left behind by `sh -c`,
/// so joining them could block well past the timeout.
fn wait_with_timeout(
    child: &mut std::process::Child,
    timeout: Duration,
) -> Result<std::process::Output, ()> {
    let stdout_reader = spawn_pipe_reader(child.stdout.take());
    let stderr_reader = spawn_pipe_reader(child.stderr.take());

    let start = std::time::Instant::now();
    loop {
        match child.try_wait() {
            Ok(Some(status)) => {
                return Ok(std::process::Output {
                    status,
                    stdout: stdout_reader.join().unwrap_or_default(),
                    stderr: stderr_reader.join().unwrap_or_default(),
                });
            }
            Ok(None) => {
                let elapsed = start.elapsed();
                if elapsed > timeout {
                    kill_and_reap(child);
                    return Err(());
                }
                // Don't sleep past the deadline.
                std::thread::sleep(POLL_INTERVAL.min(timeout - elapsed));
            }
            Err(_) => {
                kill_and_reap(child);
                return Err(());
            }
        }
    }
}

/// Reads `pipe` to EOF on a background thread. A missing pipe yields no bytes,
/// and a read error just truncates what was captured.
fn spawn_pipe_reader<R>(pipe: Option<R>) -> std::thread::JoinHandle<Vec<u8>>
where
    R: std::io::Read + Send + 'static,
{
    std::thread::spawn(move || {
        let mut buf = Vec::new();
        if let Some(mut pipe) = pipe {
            let _ = std::io::Read::read_to_end(&mut pipe, &mut buf);
        }
        buf
    })
}

/// Kills `child` and reaps it so it does not remain as a zombie. Errors are
/// ignored: the child may already have exited or been reaped.
fn kill_and_reap(child: &mut std::process::Child) {
    let _ = child.kill();
    let _ = child.wait();
}