1use crate::error::{Error, Result};
4use std::io::{BufRead, BufReader, BufWriter, Write};
5
6use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
7
8use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
9use std::time::Duration;
10use tracing::{debug, info, warn};
11
/// Response timeout applied when a constructor is not given an explicit one.
///
/// The value ends up in `ExifToolInner::response_timeout`, which bounds the
/// wait for each individual line of ExifTool output.
const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_secs(30);
13
/// Numeric tag identifying a single ExifTool command; a thin wrapper around `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CommandId(pub u32);

impl CommandId {
    /// Wraps the raw number `id`.
    pub fn new(id: u32) -> Self {
        CommandId(id)
    }

    /// Returns the wrapped raw number.
    pub fn value(&self) -> u32 {
        let CommandId(raw) = *self;
        raw
    }
}
31
32#[derive(Debug, Clone)]
34pub struct CommandRequest {
35 pub id: Option<CommandId>,
37 pub args: Vec<String>,
39}
40
41impl CommandRequest {
42 pub fn new(args: Vec<String>) -> Self {
44 Self { id: None, args }
45 }
46
47 pub fn with_id(id: CommandId, args: Vec<String>) -> Self {
49 Self { id: Some(id), args }
50 }
51}
52
/// Handle to a spawned ExifTool child process together with the pipes used to
/// exchange commands and output with it.
pub struct ExifToolInner {
    /// The spawned child process.
    process: Child,
    /// Buffered writer over the child's stdin; commands are written here.
    stdin: BufWriter<ChildStdin>,
    /// Receiving end of the channel that delivers the child's stdout lines.
    stdout_rx: Receiver<String>,
    /// Longest wait for each line while a response is being read.
    response_timeout: Duration,
}

// Hand-written `Debug`: only the child's process id is printed, the pipe and
// channel fields are left out.
impl std::fmt::Debug for ExifToolInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pid = self.process.id();
        let mut out = f.debug_struct("ExifToolInner");
        out.field("process", &pid);
        out.finish()
    }
}
68
69impl ExifToolInner {
70 pub fn new() -> Result<Self> {
72 Self::with_executable_and_timeout("exiftool", DEFAULT_RESPONSE_TIMEOUT)
73 }
74
75 pub fn with_executable<P: AsRef<std::ffi::OsStr>>(exe: P) -> Result<Self> {
77 Self::with_executable_and_timeout(exe, DEFAULT_RESPONSE_TIMEOUT)
78 }
79
80 pub fn with_executable_and_timeout<P: AsRef<std::ffi::OsStr>>(
82 exe: P,
83 response_timeout: Duration,
84 ) -> Result<Self> {
85 info!("Starting ExifTool process with -stay_open mode");
86
87 let mut process = Command::new(exe)
88 .arg("-stay_open")
89 .arg("True")
90 .arg("-@")
91 .arg("-")
92 .stdin(Stdio::piped())
93 .stdout(Stdio::piped())
94 .stderr(Stdio::null())
95 .spawn()
96 .map_err(|e| {
97 if e.kind() == std::io::ErrorKind::NotFound {
98 Error::ExifToolNotFound
99 } else {
100 e.into()
101 }
102 })?;
103
104 let stdin = process
105 .stdin
106 .take()
107 .ok_or_else(|| Error::process("Failed to capture stdin"))?;
108
109 let stdout = process
110 .stdout
111 .take()
112 .ok_or_else(|| Error::process("Failed to capture stdout"))?;
113
114 let stdout_rx = Self::spawn_stdout_reader(stdout);
115
116 let mut inner = Self {
117 process,
118 stdin: BufWriter::new(stdin),
119 stdout_rx,
120 response_timeout,
121 };
122
123 inner.verify_process()?;
125
126 info!("ExifTool process started successfully");
127 Ok(inner)
128 }
129
130 fn verify_process(&mut self) -> Result<()> {
132 debug!("Verifying ExifTool process");
133
134 self.send_line("-ver")?;
136 self.send_line("-execute")?;
137 self.stdin.flush()?;
138
139 let response = self.read_response()?;
141 debug!("ExifTool version: {}", response.text().trim());
142
143 Ok(())
144 }
145
146 pub fn send_line(&mut self, line: &str) -> Result<()> {
148 debug!("Sending command: {}", line);
149 writeln!(self.stdin, "{}", line)?;
150 Ok(())
151 }
152
153 pub fn execute(&mut self, args: &[String]) -> Result<Response> {
155 self.execute_with_id(None, args)
156 }
157
158 pub fn execute_with_id(
160 &mut self,
161 command_num: Option<usize>,
162 args: &[String],
163 ) -> Result<Response> {
164 debug!("Executing command with {} args", args.len());
165
166 for arg in args {
168 self.send_line(arg)?;
169 }
170
171 if let Some(num) = command_num {
173 self.send_line(&format!("-execute{}", num))?;
174 } else {
175 self.send_line("-execute")?;
176 }
177 self.stdin.flush()?;
178
179 self.read_response_for_num(command_num)
181 }
182
183 pub fn read_response(&mut self) -> Result<Response> {
185 self.read_response_for_id(None)
186 }
187
188 fn read_response_for_id(&mut self, expected_id: Option<CommandId>) -> Result<Response> {
190 let mut lines = Vec::new();
191
192 loop {
193 let buffer = match self.stdout_rx.recv_timeout(self.response_timeout) {
194 Ok(line) => line,
195 Err(RecvTimeoutError::Timeout) => return Err(Error::Timeout),
196 Err(RecvTimeoutError::Disconnected) => {
197 return Err(Error::process("Unexpected EOF from ExifTool process"));
198 }
199 };
200
201 let trimmed = buffer.trim();
202 debug!("Received line: {}", trimmed);
203
204 if trimmed.starts_with("{ready") && trimmed.ends_with('}') {
205 if trimmed == "{ready}" {
206 if let Some(expected) = expected_id {
208 return Err(Error::process(format!(
209 "Expected {{ready{}}}, but received {{ready}}",
210 expected.value()
211 )));
212 }
213 break;
214 }
215
216 let content = &trimmed[6..trimmed.len() - 1];
218
219 if let Ok(code) = content.parse::<i32>() {
221 if code != 0 {
222 let message = format!("ExifTool 返回错误码: {}", code);
223 return Err(Error::process(message));
224 }
225 } else {
227 match content.parse::<u32>() {
229 Ok(id) => {
230 let received_id = CommandId::new(id);
231 if let Some(expected) = expected_id
232 && received_id != expected
233 {
234 return Err(Error::process(format!(
235 "命令编号不匹配: 期望 {}, 收到 {}",
236 expected.value(),
237 received_id.value()
238 )));
239 }
240 break;
241 }
242 Err(_) => {
243 return Err(Error::process(format!(
244 "无法解析 {{ready}} 标记中的编号: {}",
245 trimmed
246 )));
247 }
248 }
249 }
250 }
251
252 lines.push(buffer.clone());
253 }
254
255 Ok(Response::new(lines))
256 }
257
258 pub fn execute_batch(&mut self, commands: &[Vec<String>]) -> Result<Vec<Response>> {
260 debug!("Executing batch of {} commands", commands.len());
261
262 let mut responses = Vec::with_capacity(commands.len());
263
264 for args in commands {
265 let response = self.execute(args)?;
266 responses.push(response);
267 }
268
269 Ok(responses)
270 }
271
272 pub fn execute_multiple(&mut self, commands: &[Vec<String>]) -> Result<Vec<Response>> {
277 if commands.is_empty() {
278 return Ok(Vec::new());
279 }
280
281 debug!("Executing {} commands atomically", commands.len());
282
283 for (idx, args) in commands.iter().enumerate() {
285 let command_num = idx + 1; for arg in args {
289 self.send_line(arg)?;
290 }
291
292 self.send_line(&format!("-execute{}", command_num))?;
294 }
295 self.stdin.flush()?;
296
297 let mut responses = Vec::with_capacity(commands.len());
299 for idx in 0..commands.len() {
300 let expected_num = idx + 1;
301 let response = self.read_response_for_num(Some(expected_num))?;
302 responses.push(response);
303 }
304
305 Ok(responses)
306 }
307
308 fn read_response_for_num(&mut self, expected_num: Option<usize>) -> Result<Response> {
310 let mut lines = Vec::new();
311
312 loop {
313 let buffer = match self.stdout_rx.recv_timeout(self.response_timeout) {
314 Ok(line) => line,
315 Err(RecvTimeoutError::Timeout) => return Err(Error::Timeout),
316 Err(RecvTimeoutError::Disconnected) => {
317 return Err(Error::process("Unexpected EOF from ExifTool process"));
318 }
319 };
320
321 let trimmed = buffer.trim();
322 debug!("Received line: {}", trimmed);
323
324 if trimmed.starts_with("{ready") && trimmed.ends_with('}') {
325 if trimmed == "{ready}" {
326 if let Some(expected) = expected_num {
328 return Err(Error::process(format!(
329 "Expected {{ready{}}}, but received {{ready}}",
330 expected
331 )));
332 }
333 break;
334 }
335
336 let content = &trimmed[6..trimmed.len() - 1];
338
339 match content.parse::<usize>() {
341 Ok(num) => {
342 if let Some(expected) = expected_num
343 && num != expected
344 {
345 return Err(Error::process(format!(
346 "命令编号不匹配: 期望 {}, 收到 {}",
347 expected, num
348 )));
349 }
350 break;
351 }
352 Err(_) => {
353 return Err(Error::process(format!(
354 "无法解析 {{ready}} 标记中的编号: {}",
355 trimmed
356 )));
357 }
358 }
359 }
360
361 lines.push(buffer.clone());
362 }
363
364 Ok(Response::new(lines))
365 }
366
367 pub fn flush(&mut self) -> Result<()> {
369 self.stdin.flush().map_err(|e| e.into())
370 }
371
372 pub fn close(&mut self) -> Result<()> {
374 info!("Closing ExifTool process");
375
376 let _ = self.send_line("-stay_open");
378 let _ = self.send_line("False");
379 let _ = self.send_line("-execute");
380 let _ = self.stdin.flush();
381
382 match self.wait_with_timeout(Duration::from_secs(5)) {
384 Ok(Some(status)) => {
385 if let Some(code) = status.code() {
386 if code != 0 {
387 warn!("ExifTool exited with code: {}", code);
388 } else {
389 info!("ExifTool process exited cleanly");
390 }
391 }
392 }
393 Ok(None) => {
394 warn!("ExifTool did not exit gracefully, forcing kill");
395 let _ = self.process.kill();
396 }
397 Err(e) => {
398 warn!("Error waiting for ExifTool: {}", e);
399 let _ = self.process.kill();
400 }
401 }
402
403 Ok(())
404 }
405
406 fn wait_with_timeout(&mut self, timeout: Duration) -> Result<Option<std::process::ExitStatus>> {
408 use std::thread;
409
410 let start = std::time::Instant::now();
411
412 loop {
413 match self.process.try_wait()? {
414 Some(status) => return Ok(Some(status)),
415 None => {
416 if start.elapsed() >= timeout {
417 return Ok(None);
418 }
419 thread::sleep(Duration::from_millis(10));
420 }
421 }
422 }
423 }
424}
425
426impl ExifToolInner {
427 fn spawn_stdout_reader(stdout: ChildStdout) -> Receiver<String> {
429 let (tx, rx) = mpsc::channel();
430
431 std::thread::spawn(move || {
432 let mut reader = BufReader::new(stdout);
433 let mut buffer = String::new();
434
435 loop {
436 buffer.clear();
437 match reader.read_line(&mut buffer) {
438 Ok(0) => break,
439 Ok(_) => {
440 if tx.send(buffer.clone()).is_err() {
441 break;
442 }
443 }
444 Err(_) => break,
445 }
446 }
447 });
448
449 rx
450 }
451}
452
453impl Drop for ExifToolInner {
454 fn drop(&mut self) {
455 if let Err(e) = self.close() {
456 warn!("Error closing ExifTool process: {}", e);
457 }
458 }
459}
460
461#[derive(Debug, Clone)]
463pub struct Response {
464 lines: Vec<String>,
465}
466
467impl Response {
468 pub fn new(lines: Vec<String>) -> Self {
470 Self { lines }
471 }
472
473 pub fn lines(&self) -> &[String] {
475 &self.lines
476 }
477
478 pub fn text(&self) -> String {
480 self.lines.join("")
481 }
482
483 pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
485 let text = self.text();
486 serde_json::from_str(&text).map_err(|e| e.into())
487 }
488
489 pub fn is_error(&self) -> bool {
491 self.lines.iter().any(|line| line.contains("Error:"))
492 }
493
494 pub fn error_message(&self) -> Option<String> {
496 self.lines
497 .iter()
498 .find(|line| line.contains("Error:"))
499 .cloned()
500 }
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain lines are exposed unchanged, joined without a separator, and are
    /// not treated as an error.
    #[test]
    fn test_response() {
        let response = Response::new(vec!["Line 1".to_string(), "Line 2".to_string()]);

        assert_eq!(response.lines().len(), 2);
        assert_eq!(response.text(), "Line 1Line 2");
        assert!(!response.is_error());
    }

    /// A line containing `Error:` marks the response as an error.
    #[test]
    fn test_response_error() {
        let response = Response::new(vec!["Error: Something went wrong".to_string()]);

        assert!(response.is_error());
        assert!(response.error_message().is_some());
    }

    /// Warnings alone do not count as errors.
    #[test]
    fn test_response_warning_not_error() {
        let response = Response::new(vec!["Warning: minor issue".to_string()]);

        assert!(!response.is_error());
        assert!(response.error_message().is_none());
    }

    /// The concatenated text can be deserialized as JSON.
    #[test]
    fn test_response_json() {
        #[derive(Debug, serde::Deserialize, PartialEq)]
        struct TestData {
            key: String,
        }

        let response = Response::new(vec![r#"{"key": "value"}"#.to_string()]);

        let data: TestData = response.json().unwrap();
        assert_eq!(data.key, "value");
    }

    /// `CommandId` compares by wrapped value and returns it via `value()`.
    #[test]
    fn test_command_id() {
        let first = CommandId::new(1);
        let same_as_first = CommandId::new(1);
        let second = CommandId::new(2);

        assert_eq!(first, same_as_first);
        assert_ne!(first, second);
        assert_eq!(first.value(), 1);
        assert_eq!(second.value(), 2);
    }

    /// Both constructors keep the arguments; only `with_id` sets an id.
    #[test]
    fn test_command_request() {
        let args = vec!["-ver".to_string()];

        let untagged = CommandRequest::new(args.clone());
        assert!(untagged.id.is_none());
        assert_eq!(untagged.args, args);

        let tagged = CommandRequest::with_id(CommandId::new(42), args.clone());
        assert_eq!(tagged.id.unwrap().value(), 42);
        assert_eq!(tagged.args, args);
    }

    /// A counter starting at 1 hands out consecutive numbers.
    // NOTE(review): this only exercises `AtomicU32`; it does not touch any
    // type defined in this module.
    #[test]
    fn test_command_response_with_num() {
        use std::sync::atomic::{AtomicU32, Ordering};

        let next_id = AtomicU32::new(1);
        let first = next_id.fetch_add(1, Ordering::SeqCst);
        let second = next_id.fetch_add(1, Ordering::SeqCst);

        assert_eq!(first, 1);
        assert_eq!(second, 2);
    }
}