polykit_core/
streaming.rs

1//! Streaming output utilities for task execution.
2
3use tokio::io::{AsyncBufReadExt, BufReader};
4use tokio::process::{Child, Command};
5
6use crate::command_validator::CommandValidator;
7use crate::error::{Error, Result};
8use crate::package::Package;
9
10pub struct StreamingTask {
11    child: Child,
12    package_name: String,
13    task_name: String,
14}
15
16impl StreamingTask {
17    pub async fn spawn(
18        package: &Package,
19        task_name: &str,
20        package_path: &std::path::Path,
21    ) -> Result<Self> {
22        let task = package
23            .get_task(task_name)
24            .ok_or_else(|| Error::TaskExecution {
25                package: package.name.clone(),
26                task: task_name.to_string(),
27                message: format!("Task '{}' not found", task_name),
28            })?;
29
30        let validator = CommandValidator::new();
31        validator.validate(&task.command)?;
32
33        let child = Command::new("sh")
34            .arg("-c")
35            .arg(&task.command)
36            .current_dir(package_path)
37            .stdout(std::process::Stdio::piped())
38            .stderr(std::process::Stdio::piped())
39            .spawn()
40            .map_err(|e| Error::TaskExecution {
41                package: package.name.clone(),
42                task: task_name.to_string(),
43                message: format!("Failed to spawn task: {}", e),
44            })?;
45
46        Ok(Self {
47            child,
48            package_name: package.name.clone(),
49            task_name: task_name.to_string(),
50        })
51    }
52
53    pub async fn stream_output<F>(mut self, mut on_line: F) -> Result<bool>
54    where
55        F: FnMut(&str, bool) + Send,
56    {
57        let package_name = self.package_name.clone();
58        let task_name = self.task_name.clone();
59        let mut stdout = self
60            .child
61            .stdout
62            .take()
63            .ok_or_else(|| Error::TaskExecution {
64                package: package_name.clone(),
65                task: task_name.clone(),
66                message: "Failed to capture stdout".to_string(),
67            })?;
68        let mut stderr = self
69            .child
70            .stderr
71            .take()
72            .ok_or_else(|| Error::TaskExecution {
73                package: package_name.clone(),
74                task: task_name.clone(),
75                message: "Failed to capture stderr".to_string(),
76            })?;
77
78        let mut stdout_reader = BufReader::new(&mut stdout);
79        let mut stderr_reader = BufReader::new(&mut stderr);
80        let mut stdout_done = false;
81        let mut stderr_done = false;
82        let mut exit_status = None;
83
84        loop {
85            let mut stdout_line = String::new();
86            let mut stderr_line = String::new();
87
88            tokio::select! {
89                result = stdout_reader.read_line(&mut stdout_line), if !stdout_done => {
90                    match result {
91                        Ok(0) => {
92                            stdout_done = true;
93                        }
94                        Ok(_) => {
95                            let trimmed = stdout_line.trim_end();
96                            if !trimmed.is_empty() {
97                                on_line(trimmed, false);
98                            }
99                        }
100                        Err(e) => {
101                            return Err(Error::TaskExecution {
102                                package: self.package_name.clone(),
103                                task: self.task_name.clone(),
104                                message: format!("Failed to read stdout: {}", e),
105                            });
106                        }
107                    }
108                }
109                result = stderr_reader.read_line(&mut stderr_line), if !stderr_done => {
110                    match result {
111                        Ok(0) => {
112                            stderr_done = true;
113                        }
114                        Ok(_) => {
115                            let trimmed = stderr_line.trim_end();
116                            if !trimmed.is_empty() {
117                                on_line(trimmed, true);
118                            }
119                        }
120                        Err(e) => {
121                            return Err(Error::TaskExecution {
122                                package: self.package_name.clone(),
123                                task: self.task_name.clone(),
124                                message: format!("Failed to read stderr: {}", e),
125                            });
126                        }
127                    }
128                }
129                status = self.child.wait(), if exit_status.is_none() => {
130                    exit_status = Some(status.map_err(|e| Error::TaskExecution {
131                        package: self.package_name.clone(),
132                        task: self.task_name.clone(),
133                        message: format!("Failed to wait for process: {}", e),
134                    })?);
135                }
136            }
137
138            // If process finished and both streams are done, exit
139            if let Some(status) = exit_status {
140                if stdout_done && stderr_done {
141                    return Ok(status.success());
142                }
143            }
144        }
145    }
146
147    pub fn package_name(&self) -> &str {
148        &self.package_name
149    }
150
151    pub fn task_name(&self) -> &str {
152        &self.task_name
153    }
154}