Skip to main content

exiftool_rs_wrapper/
process.rs

//! Process management for ExifTool's `-stay_open` mode.
2
3use crate::error::{Error, Result};
4use std::io::{BufRead, BufReader, BufWriter, Write};
5
6use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
7
8use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
9use std::time::Duration;
10use tracing::{debug, info, warn};
11
/// Default response timeout (30 seconds) used by the constructors that do not
/// take an explicit timeout. While a response is being read, the timeout is
/// applied to each received line separately.
const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_secs(30);
13
/// Numeric identifier attached to a command.
///
/// Lets callers tell responses apart when several commands are issued together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CommandId(pub u32);

impl CommandId {
    /// Wraps a raw number as a command identifier.
    pub fn new(id: u32) -> Self {
        CommandId(id)
    }

    /// Returns the raw number behind this identifier.
    pub fn value(&self) -> u32 {
        let CommandId(raw) = *self;
        raw
    }
}
31
32/// 命令执行请求
33#[derive(Debug, Clone)]
34pub struct CommandRequest {
35    /// 命令编号(用于多命令场景)
36    pub id: Option<CommandId>,
37    /// 命令参数
38    pub args: Vec<String>,
39}
40
41impl CommandRequest {
42    /// 创建新的命令请求(无编号)
43    pub fn new(args: Vec<String>) -> Self {
44        Self { id: None, args }
45    }
46
47    /// 创建带编号的命令请求
48    pub fn with_id(id: CommandId, args: Vec<String>) -> Self {
49        Self { id: Some(id), args }
50    }
51}
52
/// Internal state of a running ExifTool process.
pub struct ExifToolInner {
    /// Handle to the spawned child process.
    process: Child,
    /// Buffered writer over the child's stdin.
    stdin: BufWriter<ChildStdin>,
    /// Receiving end of the channel that carries the child's stdout, one line per message.
    stdout_rx: Receiver<String>,
    /// Maximum time to wait for each line of output while reading a response.
    response_timeout: Duration,
}

impl std::fmt::Debug for ExifToolInner {
    /// Formats the value as `ExifToolInner { process: <pid> }`; the other fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pid = self.process.id();
        let mut out = f.debug_struct("ExifToolInner");
        out.field("process", &pid);
        out.finish()
    }
}
68
69impl ExifToolInner {
70    /// 启动新的 ExifTool 进程(-stay_open 模式,从 PATH 查找)
71    pub fn new() -> Result<Self> {
72        Self::with_executable_and_timeout("exiftool", DEFAULT_RESPONSE_TIMEOUT)
73    }
74
75    /// 使用指定的可执行文件路径启动 ExifTool 进程
76    pub fn with_executable<P: AsRef<std::ffi::OsStr>>(exe: P) -> Result<Self> {
77        Self::with_executable_and_timeout(exe, DEFAULT_RESPONSE_TIMEOUT)
78    }
79
80    /// 使用指定可执行文件和响应超时启动 ExifTool 进程
81    pub fn with_executable_and_timeout<P: AsRef<std::ffi::OsStr>>(
82        exe: P,
83        response_timeout: Duration,
84    ) -> Result<Self> {
85        info!("Starting ExifTool process with -stay_open mode");
86
87        let mut process = Command::new(exe)
88            .arg("-stay_open")
89            .arg("True")
90            .arg("-@")
91            .arg("-")
92            .stdin(Stdio::piped())
93            .stdout(Stdio::piped())
94            .stderr(Stdio::null())
95            .spawn()
96            .map_err(|e| {
97                if e.kind() == std::io::ErrorKind::NotFound {
98                    Error::ExifToolNotFound
99                } else {
100                    e.into()
101                }
102            })?;
103
104        let stdin = process
105            .stdin
106            .take()
107            .ok_or_else(|| Error::process("Failed to capture stdin"))?;
108
109        let stdout = process
110            .stdout
111            .take()
112            .ok_or_else(|| Error::process("Failed to capture stdout"))?;
113
114        let stdout_rx = Self::spawn_stdout_reader(stdout);
115
116        let mut inner = Self {
117            process,
118            stdin: BufWriter::new(stdin),
119            stdout_rx,
120            response_timeout,
121        };
122
123        // 验证进程是否正常工作
124        inner.verify_process()?;
125
126        info!("ExifTool process started successfully");
127        Ok(inner)
128    }
129
130    /// 验证进程是否正常工作
131    fn verify_process(&mut self) -> Result<()> {
132        debug!("Verifying ExifTool process");
133
134        // 发送版本查询命令
135        self.send_line("-ver")?;
136        self.send_line("-execute")?;
137        self.stdin.flush()?;
138
139        // 读取响应
140        let response = self.read_response()?;
141        debug!("ExifTool version: {}", response.text().trim());
142
143        Ok(())
144    }
145
146    /// 发送单行命令
147    pub fn send_line(&mut self, line: &str) -> Result<()> {
148        debug!("Sending command: {}", line);
149        writeln!(self.stdin, "{}", line)?;
150        Ok(())
151    }
152
153    /// 执行命令并获取响应(无编号)
154    pub fn execute(&mut self, args: &[String]) -> Result<Response> {
155        self.execute_with_id(None, args)
156    }
157
158    /// 执行命令并获取响应(支持编号)
159    pub fn execute_with_id(
160        &mut self,
161        command_num: Option<usize>,
162        args: &[String],
163    ) -> Result<Response> {
164        debug!("Executing command with {} args", args.len());
165
166        // 发送所有参数
167        for arg in args {
168            self.send_line(arg)?;
169        }
170
171        // 发送执行命令(带编号或不带)
172        if let Some(num) = command_num {
173            self.send_line(&format!("-execute{}", num))?;
174        } else {
175            self.send_line("-execute")?;
176        }
177        self.stdin.flush()?;
178
179        // 读取响应
180        self.read_response_for_num(command_num)
181    }
182
183    /// 读取响应(直到遇到 {ready} 或 {readyNUM})
184    pub fn read_response(&mut self) -> Result<Response> {
185        self.read_response_for_id(None)
186    }
187
188    /// 读取特定命令编号的响应
189    fn read_response_for_id(&mut self, expected_id: Option<CommandId>) -> Result<Response> {
190        let mut lines = Vec::new();
191
192        loop {
193            let buffer = match self.stdout_rx.recv_timeout(self.response_timeout) {
194                Ok(line) => line,
195                Err(RecvTimeoutError::Timeout) => return Err(Error::Timeout),
196                Err(RecvTimeoutError::Disconnected) => {
197                    return Err(Error::process("Unexpected EOF from ExifTool process"));
198                }
199            };
200
201            let trimmed = buffer.trim();
202            debug!("Received line: {}", trimmed);
203
204            if trimmed.starts_with("{ready") && trimmed.ends_with('}') {
205                if trimmed == "{ready}" {
206                    // 无编号响应
207                    if let Some(expected) = expected_id {
208                        return Err(Error::process(format!(
209                            "Expected {{ready{}}}, but received {{ready}}",
210                            expected.value()
211                        )));
212                    }
213                    break;
214                }
215
216                // 解析 {readyNUM} 格式
217                let content = &trimmed[6..trimmed.len() - 1];
218
219                // 检查是否为错误码
220                if let Ok(code) = content.parse::<i32>() {
221                    if code != 0 {
222                        let message = format!("ExifTool 返回错误码: {}", code);
223                        return Err(Error::process(message));
224                    }
225                    // 编号为 0 表示成功但不匹配(正常情况下不应该发生)
226                } else {
227                    // 解析命令编号
228                    match content.parse::<u32>() {
229                        Ok(id) => {
230                            let received_id = CommandId::new(id);
231                            if let Some(expected) = expected_id
232                                && received_id != expected
233                            {
234                                return Err(Error::process(format!(
235                                    "命令编号不匹配: 期望 {}, 收到 {}",
236                                    expected.value(),
237                                    received_id.value()
238                                )));
239                            }
240                            break;
241                        }
242                        Err(_) => {
243                            return Err(Error::process(format!(
244                                "无法解析 {{ready}} 标记中的编号: {}",
245                                trimmed
246                            )));
247                        }
248                    }
249                }
250            }
251
252            lines.push(buffer.clone());
253        }
254
255        Ok(Response::new(lines))
256    }
257
258    /// 批量执行命令(串行执行)
259    pub fn execute_batch(&mut self, commands: &[Vec<String>]) -> Result<Vec<Response>> {
260        debug!("Executing batch of {} commands", commands.len());
261
262        let mut responses = Vec::with_capacity(commands.len());
263
264        for args in commands {
265            let response = self.execute(args)?;
266            responses.push(response);
267        }
268
269        Ok(responses)
270    }
271
272    /// 批量执行命令(原子多命令,使用 -execute[NUM])
273    ///
274    /// 所有命令在一个事务中发送,通过顺序区分响应。
275    /// 使用 -execute1, -execute2 等格式让 ExifTool 在 {ready} 标记中包含编号。
276    pub fn execute_multiple(&mut self, commands: &[Vec<String>]) -> Result<Vec<Response>> {
277        if commands.is_empty() {
278            return Ok(Vec::new());
279        }
280
281        debug!("Executing {} commands atomically", commands.len());
282
283        // 发送所有命令(不立即读取响应)
284        for (idx, args) in commands.iter().enumerate() {
285            let command_num = idx + 1; // 从 1 开始计数
286
287            // 发送参数
288            for arg in args {
289                self.send_line(arg)?;
290            }
291
292            // 发送带编号的执行命令
293            self.send_line(&format!("-execute{}", command_num))?;
294        }
295        self.stdin.flush()?;
296
297        // 按顺序读取所有响应
298        let mut responses = Vec::with_capacity(commands.len());
299        for idx in 0..commands.len() {
300            let expected_num = idx + 1;
301            let response = self.read_response_for_num(Some(expected_num))?;
302            responses.push(response);
303        }
304
305        Ok(responses)
306    }
307
308    /// 读取特定命令编号的响应
309    fn read_response_for_num(&mut self, expected_num: Option<usize>) -> Result<Response> {
310        let mut lines = Vec::new();
311
312        loop {
313            let buffer = match self.stdout_rx.recv_timeout(self.response_timeout) {
314                Ok(line) => line,
315                Err(RecvTimeoutError::Timeout) => return Err(Error::Timeout),
316                Err(RecvTimeoutError::Disconnected) => {
317                    return Err(Error::process("Unexpected EOF from ExifTool process"));
318                }
319            };
320
321            let trimmed = buffer.trim();
322            debug!("Received line: {}", trimmed);
323
324            if trimmed.starts_with("{ready") && trimmed.ends_with('}') {
325                if trimmed == "{ready}" {
326                    // 无编号响应
327                    if let Some(expected) = expected_num {
328                        return Err(Error::process(format!(
329                            "Expected {{ready{}}}, but received {{ready}}",
330                            expected
331                        )));
332                    }
333                    break;
334                }
335
336                // 解析 {readyNUM} 格式
337                let content = &trimmed[6..trimmed.len() - 1];
338
339                // 尝试解析为命令编号
340                match content.parse::<usize>() {
341                    Ok(num) => {
342                        if let Some(expected) = expected_num
343                            && num != expected
344                        {
345                            return Err(Error::process(format!(
346                                "命令编号不匹配: 期望 {}, 收到 {}",
347                                expected, num
348                            )));
349                        }
350                        break;
351                    }
352                    Err(_) => {
353                        return Err(Error::process(format!(
354                            "无法解析 {{ready}} 标记中的编号: {}",
355                            trimmed
356                        )));
357                    }
358                }
359            }
360
361            lines.push(buffer.clone());
362        }
363
364        Ok(Response::new(lines))
365    }
366
367    /// 刷新 stdin
368    pub fn flush(&mut self) -> Result<()> {
369        self.stdin.flush().map_err(|e| e.into())
370    }
371
372    /// 关闭进程
373    pub fn close(&mut self) -> Result<()> {
374        info!("Closing ExifTool process");
375
376        // 发送关闭命令
377        let _ = self.send_line("-stay_open");
378        let _ = self.send_line("False");
379        let _ = self.send_line("-execute");
380        let _ = self.stdin.flush();
381
382        // 等待进程退出
383        match self.wait_with_timeout(Duration::from_secs(5)) {
384            Ok(Some(status)) => {
385                if let Some(code) = status.code() {
386                    if code != 0 {
387                        warn!("ExifTool exited with code: {}", code);
388                    } else {
389                        info!("ExifTool process exited cleanly");
390                    }
391                }
392            }
393            Ok(None) => {
394                warn!("ExifTool did not exit gracefully, forcing kill");
395                let _ = self.process.kill();
396            }
397            Err(e) => {
398                warn!("Error waiting for ExifTool: {}", e);
399                let _ = self.process.kill();
400            }
401        }
402
403        Ok(())
404    }
405
406    /// 带超时的等待
407    fn wait_with_timeout(&mut self, timeout: Duration) -> Result<Option<std::process::ExitStatus>> {
408        use std::thread;
409
410        let start = std::time::Instant::now();
411
412        loop {
413            match self.process.try_wait()? {
414                Some(status) => return Ok(Some(status)),
415                None => {
416                    if start.elapsed() >= timeout {
417                        return Ok(None);
418                    }
419                    thread::sleep(Duration::from_millis(10));
420                }
421            }
422        }
423    }
424}
425
426impl ExifToolInner {
427    /// 启动 stdout 读取线程,逐行转发到通道
428    fn spawn_stdout_reader(stdout: ChildStdout) -> Receiver<String> {
429        let (tx, rx) = mpsc::channel();
430
431        std::thread::spawn(move || {
432            let mut reader = BufReader::new(stdout);
433            let mut buffer = String::new();
434
435            loop {
436                buffer.clear();
437                match reader.read_line(&mut buffer) {
438                    Ok(0) => break,
439                    Ok(_) => {
440                        if tx.send(buffer.clone()).is_err() {
441                            break;
442                        }
443                    }
444                    Err(_) => break,
445                }
446            }
447        });
448
449        rx
450    }
451}
452
453impl Drop for ExifToolInner {
454    fn drop(&mut self) {
455        if let Err(e) = self.close() {
456            warn!("Error closing ExifTool process: {}", e);
457        }
458    }
459}
460
461/// 命令响应
462#[derive(Debug, Clone)]
463pub struct Response {
464    lines: Vec<String>,
465}
466
467impl Response {
468    /// 创建新响应
469    pub fn new(lines: Vec<String>) -> Self {
470        Self { lines }
471    }
472
473    /// 获取所有行
474    pub fn lines(&self) -> &[String] {
475        &self.lines
476    }
477
478    /// 获取合并的文本内容
479    pub fn text(&self) -> String {
480        self.lines.join("")
481    }
482
483    /// 获取 JSON 解析结果
484    pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
485        let text = self.text();
486        serde_json::from_str(&text).map_err(|e| e.into())
487    }
488
489    /// 检查是否有错误
490    pub fn is_error(&self) -> bool {
491        self.lines.iter().any(|line| line.contains("Error:"))
492    }
493
494    /// 获取错误信息
495    pub fn error_message(&self) -> Option<String> {
496        self.lines
497            .iter()
498            .find(|line| line.contains("Error:"))
499            .cloned()
500    }
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Line access and concatenation on a plain two-line response.
    #[test]
    fn test_response() {
        let response = Response::new(vec![String::from("Line 1"), String::from("Line 2")]);

        assert_eq!(response.lines().len(), 2);
        assert_eq!(response.text(), "Line 1Line 2");
        assert!(!response.is_error());
    }

    /// A line containing `Error:` marks the response as an error.
    #[test]
    fn test_response_error() {
        let response = Response::new(vec![String::from("Error: Something went wrong")]);

        assert!(response.is_error());
        assert!(response.error_message().is_some());
    }

    /// A warning line alone is not treated as an error.
    #[test]
    fn test_response_warning_not_error() {
        let response = Response::new(vec![String::from("Warning: minor issue")]);

        assert!(!response.is_error());
        assert!(response.error_message().is_none());
    }

    /// The response text can be deserialized as JSON.
    #[test]
    fn test_response_json() {
        #[derive(Debug, serde::Deserialize, PartialEq)]
        struct TestData {
            key: String,
        }

        let response = Response::new(vec![String::from(r#"{"key": "value"}"#)]);

        let data: TestData = response.json().unwrap();
        assert_eq!(data.key, "value");
    }

    /// Equality and raw-value access on command IDs.
    #[test]
    fn test_command_id() {
        let first = CommandId::new(1);
        let same_as_first = CommandId::new(1);
        let other = CommandId::new(2);

        assert_eq!(first, same_as_first);
        assert_ne!(first, other);
        assert_eq!(first.value(), 1);
        assert_eq!(other.value(), 2);
    }

    /// Both request constructors store the ID and arguments as given.
    #[test]
    fn test_command_request() {
        let args = vec![String::from("-ver")];

        let plain = CommandRequest::new(args.clone());
        assert!(plain.id.is_none());
        assert_eq!(plain.args, args);

        let tagged = CommandRequest::with_id(CommandId::new(42), args.clone());
        assert_eq!(tagged.id.unwrap().value(), 42);
        assert_eq!(tagged.args, args);
    }

    #[test]
    fn test_command_response_with_num() {
        // NOTE(review): despite its name, this only checks that an atomic
        // counter starting at 1 hands out 1 and then 2; no response parsing
        // is exercised here.
        use std::sync::atomic::{AtomicU32, Ordering};

        let counter = AtomicU32::new(1);
        let first = counter.fetch_add(1, Ordering::SeqCst);
        let second = counter.fetch_add(1, Ordering::SeqCst);

        assert_eq!(first, 1);
        assert_eq!(second, 2);
    }
}