iflow_cli_sdk_rust/process_manager.rs
1//! Process manager for iFlow CLI
2//!
3//! This module handles the lifecycle of the iFlow CLI process,
4//! including starting, stopping, and managing stdio communication.
5
6use crate::error::{IFlowError, Result};
7use std::process::Stdio;
8use std::time::Duration;
9use tokio::process::Child;
10use tokio::time::sleep;
11
12/// Manages iFlow CLI process lifecycle
13///
14/// Handles starting and stopping the iFlow CLI process, as well as
15/// providing access to its stdio streams for communication.
16pub struct IFlowProcessManager {
17 pub process: Option<Child>, // Made public for access in Drop
18 start_port: u16,
19 port: Option<u16>,
20 debug: bool,
21}
22
23impl IFlowProcessManager {
24 /// Create a new process manager
25 ///
26 /// # Arguments
27 /// * `start_port` - The port to start the process on
28 /// * `debug` - Whether to enable debug mode
29 ///
30 /// # Returns
31 /// A new IFlowProcessManager instance
32 pub fn new(start_port: u16, debug: bool) -> Self {
33 Self {
34 process: None,
35 start_port,
36 port: None,
37 debug,
38 }
39 }
40
/// Check if a port is available for use
///
/// Availability is tested by binding a listener to the IPv4 loopback address
/// (`127.0.0.1`), the same address `is_port_listening` probes. The address is
/// given explicitly rather than as the host name `localhost`: a host name may
/// resolve to several addresses and `bind` succeeds as soon as *any* of them
/// can be bound, which would report a port as free while `127.0.0.1:port` is
/// already taken.
///
/// # Arguments
/// * `port` - Port number to check
///
/// # Returns
/// `true` if a listener could be bound to `127.0.0.1:port`, `false` otherwise.
/// The probe listener is dropped before returning, so the result is only a
/// snapshot: another process may still claim the port afterwards.
fn is_port_available(port: u16) -> bool {
    use std::net::{Ipv4Addr, TcpListener};

    TcpListener::bind((Ipv4Addr::LOCALHOST, port)).is_ok()
}
52
/// Probe whether something accepts TCP connections on a local port
///
/// Attempts a TCP connection to `127.0.0.1:port`, giving up after 100 ms.
/// The probe connection is closed again immediately.
///
/// # Arguments
/// * `port` - Port number to probe
///
/// # Returns
/// `true` if the connection attempt succeeded, `false` otherwise
pub fn is_port_listening(port: u16) -> bool {
    use std::net::{Ipv4Addr, SocketAddr, TcpStream};

    let probe_timeout = Duration::from_millis(100);
    let target = SocketAddr::from((Ipv4Addr::LOCALHOST, port));

    TcpStream::connect_timeout(&target, probe_timeout).is_ok()
}
69
70 /// Find an available port starting from the given port
71 ///
72 /// # Arguments
73 /// * `start_port` - Port to start searching from
74 /// * `max_attempts` - Maximum number of ports to try
75 ///
76 /// # Returns
77 /// An available port number
78 ///
79 /// # Errors
80 /// Returns an error if no available port is found
81 fn find_available_port(start_port: u16, max_attempts: u16) -> Result<u16> {
82 for i in 0..max_attempts {
83 let port = start_port + i;
84 if Self::is_port_available(port) {
85 tracing::debug!("Found available port: {}", port);
86 return Ok(port);
87 }
88 }
89
90 Err(IFlowError::ProcessManager(format!(
91 "No available port found in range {}-{}",
92 start_port,
93 start_port + max_attempts
94 )))
95 }
96
97 /// Start the iFlow process
98 ///
99 /// Starts the iFlow CLI process with ACP support and WebSocket communication.
100 ///
101 /// # Returns
102 /// * `Ok(String)` containing the WebSocket URL if the process was started successfully
103 /// * `Err(IFlowError)` if there was an error starting the process
104 pub async fn start(&mut self, use_websocket: bool) -> Result<Option<String>> {
105 if use_websocket {
106 tracing::debug!("Starting iFlow process with experimental ACP and WebSocket support");
107
108 // Find an available port
109 let port = Self::find_available_port(self.start_port, 100)?;
110 self.port = Some(port);
111
112 // Start iFlow process with WebSocket support
113 let mut cmd = tokio::process::Command::new("iflow");
114 cmd.arg("--experimental-acp");
115 cmd.arg("--port");
116 cmd.arg(port.to_string());
117
118 // Add debug flag if enabled
119 if self.debug {
120 cmd.arg("--debug");
121 }
122
123 // In WebSocket mode, set stdout/stderr to inherit to avoid blocking/exit when pipes are not consumed
124 cmd.stdout(Stdio::inherit());
125 cmd.stderr(Stdio::inherit());
126 cmd.stdin(Stdio::null()); // No stdin needed for WebSocket
127
128 let child = cmd
129 .spawn()
130 .map_err(|e| IFlowError::ProcessManager(format!("Failed to start iflow: {}", e)))?;
131
132 self.process = Some(child);
133
134 // Wait longer for process to start and WebSocket server to be ready
135 tracing::debug!("Waiting for iFlow process to start...");
136 sleep(Duration::from_secs(8)).await;
137
138 // Verify the port is actually listening with more retries and longer timeout
139 let mut attempts = 0;
140 let max_attempts = 30; // 30 attempts * 1 second = 30 seconds total
141
142 while attempts < max_attempts {
143 if Self::is_port_listening(port) {
144 tracing::debug!("iFlow WebSocket server is ready on port {}", port);
145 break;
146 }
147
148 attempts += 1;
149 if attempts % 5 == 0 {
150 tracing::debug!(
151 "Still waiting for iFlow to be ready... (attempt {}/{})",
152 attempts,
153 max_attempts
154 );
155 }
156
157 sleep(Duration::from_secs(1)).await;
158 }
159
160 if attempts >= max_attempts {
161 return Err(IFlowError::ProcessManager(format!(
162 "iFlow process failed to start WebSocket server on port {} after {} seconds",
163 port, max_attempts
164 )));
165 }
166
167 tracing::debug!(
168 "iFlow process started with WebSocket support on port {}",
169 port
170 );
171
172 // Return the WebSocket URL with peer parameter
173 Ok(Some(format!("ws://localhost:{}/acp?peer=iflow", port)))
174 } else {
175 tracing::debug!("Starting iFlow process with experimental ACP and stdio support");
176
177 // Start iFlow process with stdio support
178 let mut cmd = tokio::process::Command::new("iflow");
179 cmd.arg("--experimental-acp");
180
181 // Add debug flag if enabled
182 if self.debug {
183 cmd.arg("--debug");
184 }
185
186 cmd.stdout(Stdio::piped());
187 cmd.stderr(Stdio::piped());
188 cmd.stdin(Stdio::piped()); // stdin needed for stdio
189
190 tracing::debug!("Starting iFlow process with command: {:?}", cmd);
191
192 let child = cmd
193 .spawn()
194 .map_err(|e| IFlowError::ProcessManager(format!("Failed to start iflow: {}", e)))?;
195
196 self.process = Some(child);
197
198 // Wait for process to start
199 tracing::debug!("Waiting for iFlow process to start...");
200 sleep(Duration::from_secs(5)).await;
201 tracing::debug!("iFlow process should be started by now");
202
203 tracing::debug!("iFlow process started with stdio support");
204
205 // No WebSocket URL for stdio
206 Ok(None)
207 }
208 }
209
210 /// Stop the iFlow process
211 ///
212 /// Attempts to gracefully stop the iFlow process if it's running.
213 ///
214 /// # Returns
215 /// * `Ok(())` if the process was stopped successfully or wasn't running
216 /// * `Err(IFlowError)` if there was an error stopping the process
217 pub async fn stop(&mut self) -> Result<()> {
218 if let Some(mut process) = self.process.take() {
219 tracing::debug!("Stopping iFlow process");
220
221 // Try graceful shutdown first
222 match tokio::time::timeout(Duration::from_secs(5), process.kill()).await {
223 Ok(Ok(_)) => {
224 // Wait for the process to actually exit with a timeout
225 match tokio::time::timeout(Duration::from_secs(5), process.wait()).await {
226 Ok(Ok(_)) => tracing::debug!("iFlow process stopped gracefully"),
227 Ok(Err(e)) => tracing::warn!("Error waiting for iFlow process: {}", e),
228 Err(_) => {
229 tracing::warn!(
230 "Timeout waiting for iFlow process to exit, forcing termination"
231 );
232 // Force kill if it didn't exit in time
233 let _ = process.start_kill();
234 }
235 }
236 }
237 Ok(Err(e)) => {
238 tracing::warn!("Failed to kill iFlow process: {}, forcing termination", e);
239 let _ = process.start_kill();
240 }
241 Err(_) => {
242 tracing::warn!("Timeout killing iFlow process, forcing termination");
243 let _ = process.start_kill();
244 }
245 }
246
247 // Add a small delay to ensure all resources are released
248 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
249
250 tracing::debug!("iFlow process stopped");
251 }
252
253 // Clear the port when stopping
254 self.port = None;
255
256 Ok(())
257 }
258
259 /// Get the port the iFlow process is running on
260 ///
261 /// # Returns
262 /// The port number, or None if not running
263 pub fn port(&self) -> Option<u16> {
264 self.port
265 }
266
267 /// Check if the iFlow process is running
268 ///
269 /// # Returns
270 /// `true` if the process is running, `false` otherwise
271 pub fn is_running(&self) -> bool {
272 self.process.is_some()
273 }
274
275 /// Take ownership of the process's stdin
276 ///
277 /// Takes ownership of the process's stdin stream for communication.
278 /// This method can only be called once, as it consumes the stream.
279 ///
280 /// # Returns
281 /// `Some(ChildStdin)` if the process is running and has a stdin stream, `None` otherwise
282 pub fn take_stdin(&mut self) -> Option<tokio::process::ChildStdin> {
283 self.process.as_mut().and_then(|p| p.stdin.take())
284 }
285
286 /// Take ownership of the process's stdout
287 ///
288 /// Takes ownership of the process's stdout stream for communication.
289 /// This method can only be called once, as it consumes the stream.
290 ///
291 /// # Returns
292 /// `Some(ChildStdout)` if the process is running and has a stdout stream, `None` otherwise
293 pub fn take_stdout(&mut self) -> Option<tokio::process::ChildStdout> {
294 self.process.as_mut().and_then(|p| p.stdout.take())
295 }
296}