polykit_core/
streaming.rs1use tokio::io::{AsyncBufReadExt, BufReader};
4use tokio::process::{Child, Command};
5
6use crate::command_validator::CommandValidator;
7use crate::error::{Error, Result};
8use crate::package::Package;
9
10pub struct StreamingTask {
11 child: Child,
12 package_name: String,
13 task_name: String,
14}
15
16impl StreamingTask {
17 pub async fn spawn(
18 package: &Package,
19 task_name: &str,
20 package_path: &std::path::Path,
21 ) -> Result<Self> {
22 let task = package
23 .get_task(task_name)
24 .ok_or_else(|| Error::TaskExecution {
25 package: package.name.clone(),
26 task: task_name.to_string(),
27 message: format!("Task '{}' not found", task_name),
28 })?;
29
30 let validator = CommandValidator::new();
31 validator.validate(&task.command)?;
32
33 let child = Command::new("sh")
34 .arg("-c")
35 .arg(&task.command)
36 .current_dir(package_path)
37 .stdout(std::process::Stdio::piped())
38 .stderr(std::process::Stdio::piped())
39 .spawn()
40 .map_err(|e| Error::TaskExecution {
41 package: package.name.clone(),
42 task: task_name.to_string(),
43 message: format!("Failed to spawn task: {}", e),
44 })?;
45
46 Ok(Self {
47 child,
48 package_name: package.name.clone(),
49 task_name: task_name.to_string(),
50 })
51 }
52
53 pub async fn stream_output<F>(mut self, mut on_line: F) -> Result<bool>
54 where
55 F: FnMut(&str, bool) + Send,
56 {
57 let package_name = self.package_name.clone();
58 let task_name = self.task_name.clone();
59 let mut stdout = self
60 .child
61 .stdout
62 .take()
63 .ok_or_else(|| Error::TaskExecution {
64 package: package_name.clone(),
65 task: task_name.clone(),
66 message: "Failed to capture stdout".to_string(),
67 })?;
68 let mut stderr = self
69 .child
70 .stderr
71 .take()
72 .ok_or_else(|| Error::TaskExecution {
73 package: package_name.clone(),
74 task: task_name.clone(),
75 message: "Failed to capture stderr".to_string(),
76 })?;
77
78 let mut stdout_reader = BufReader::new(&mut stdout);
79 let mut stderr_reader = BufReader::new(&mut stderr);
80 let mut stdout_done = false;
81 let mut stderr_done = false;
82 let mut exit_status = None;
83
84 loop {
85 let mut stdout_line = String::new();
86 let mut stderr_line = String::new();
87
88 tokio::select! {
89 result = stdout_reader.read_line(&mut stdout_line), if !stdout_done => {
90 match result {
91 Ok(0) => {
92 stdout_done = true;
93 }
94 Ok(_) => {
95 let trimmed = stdout_line.trim_end();
96 if !trimmed.is_empty() {
97 on_line(trimmed, false);
98 }
99 }
100 Err(e) => {
101 return Err(Error::TaskExecution {
102 package: self.package_name.clone(),
103 task: self.task_name.clone(),
104 message: format!("Failed to read stdout: {}", e),
105 });
106 }
107 }
108 }
109 result = stderr_reader.read_line(&mut stderr_line), if !stderr_done => {
110 match result {
111 Ok(0) => {
112 stderr_done = true;
113 }
114 Ok(_) => {
115 let trimmed = stderr_line.trim_end();
116 if !trimmed.is_empty() {
117 on_line(trimmed, true);
118 }
119 }
120 Err(e) => {
121 return Err(Error::TaskExecution {
122 package: self.package_name.clone(),
123 task: self.task_name.clone(),
124 message: format!("Failed to read stderr: {}", e),
125 });
126 }
127 }
128 }
129 status = self.child.wait(), if exit_status.is_none() => {
130 exit_status = Some(status.map_err(|e| Error::TaskExecution {
131 package: self.package_name.clone(),
132 task: self.task_name.clone(),
133 message: format!("Failed to wait for process: {}", e),
134 })?);
135 }
136 }
137
138 if let Some(status) = exit_status {
140 if stdout_done && stderr_done {
141 return Ok(status.success());
142 }
143 }
144 }
145 }
146
147 pub fn package_name(&self) -> &str {
148 &self.package_name
149 }
150
151 pub fn task_name(&self) -> &str {
152 &self.task_name
153 }
154}