iflow_cli_sdk_rust/
process_manager.rs

1//! Process manager for iFlow CLI
2//!
3//! This module handles the lifecycle of the iFlow CLI process,
4//! including starting, stopping, and managing stdio communication.
5
6use crate::error::{IFlowError, Result};
7use std::process::Stdio;
8use std::time::Duration;
9use tokio::process::Child;
10use tokio::time::sleep;
11
12/// Manages iFlow CLI process lifecycle
13///
14/// Handles starting and stopping the iFlow CLI process, as well as
15/// providing access to its stdio streams for communication.
16pub struct IFlowProcessManager {
17    pub process: Option<Child>, // Made public for access in Drop
18    start_port: u16,
19    port: Option<u16>,
20    debug: bool,
21}
22
23impl IFlowProcessManager {
24    /// Create a new process manager
25    ///
26    /// # Arguments
27    /// * `start_port` - The port to start the process on
28    /// * `debug` - Whether to enable debug mode
29    ///
30    /// # Returns
31    /// A new IFlowProcessManager instance
32    pub fn new(start_port: u16, debug: bool) -> Self {
33        Self {
34            process: None,
35            start_port,
36            port: None,
37            debug,
38        }
39    }
40
41    /// Check if a port is available for use
42    ///
43    /// # Arguments
44    /// * `port` - Port number to check
45    ///
46    /// # Returns
47    /// True if the port is available, False otherwise
48    fn is_port_available(port: u16) -> bool {
49        use std::net::TcpListener;
50        TcpListener::bind(("localhost", port)).is_ok()
51    }
52
53    /// Check if a port is listening (has a server running)
54    ///
55    /// # Arguments
56    /// * `port` - Port number to check
57    ///
58    /// # Returns
59    /// True if the port is listening, False otherwise
60    pub fn is_port_listening(port: u16) -> bool {
61        use std::net::TcpStream;
62        use std::time::Duration;
63        TcpStream::connect_timeout(
64            &format!("127.0.0.1:{}", port).parse().unwrap(),
65            Duration::from_millis(100),
66        )
67        .is_ok()
68    }
69
70    /// Find an available port starting from the given port
71    ///
72    /// # Arguments
73    /// * `start_port` - Port to start searching from
74    /// * `max_attempts` - Maximum number of ports to try
75    ///
76    /// # Returns
77    /// An available port number
78    ///
79    /// # Errors
80    /// Returns an error if no available port is found
81    fn find_available_port(start_port: u16, max_attempts: u16) -> Result<u16> {
82        for i in 0..max_attempts {
83            let port = start_port + i;
84            if Self::is_port_available(port) {
85                tracing::debug!("Found available port: {}", port);
86                return Ok(port);
87            }
88        }
89
90        Err(IFlowError::ProcessManager(format!(
91            "No available port found in range {}-{}",
92            start_port,
93            start_port + max_attempts
94        )))
95    }
96
97    /// Start the iFlow process
98    ///
99    /// Starts the iFlow CLI process with ACP support and WebSocket communication.
100    ///
101    /// # Returns
102    /// * `Ok(String)` containing the WebSocket URL if the process was started successfully
103    /// * `Err(IFlowError)` if there was an error starting the process
104    pub async fn start(&mut self, use_websocket: bool) -> Result<Option<String>> {
105        if use_websocket {
106            tracing::debug!("Starting iFlow process with experimental ACP and WebSocket support");
107
108            // Find an available port
109            let port = Self::find_available_port(self.start_port, 100)?;
110            self.port = Some(port);
111
112            // Start iFlow process with WebSocket support
113            let mut cmd = tokio::process::Command::new("iflow");
114            cmd.arg("--experimental-acp");
115            cmd.arg("--port");
116            cmd.arg(port.to_string());
117
118            // Add debug flag if enabled
119            if self.debug {
120                cmd.arg("--debug");
121            }
122
123            // In WebSocket mode, set stdout/stderr to inherit to avoid blocking/exit when pipes are not consumed
124            cmd.stdout(Stdio::inherit());
125            cmd.stderr(Stdio::inherit());
126            cmd.stdin(Stdio::null()); // No stdin needed for WebSocket
127
128            let child = cmd
129                .spawn()
130                .map_err(|e| IFlowError::ProcessManager(format!("Failed to start iflow: {}", e)))?;
131
132            self.process = Some(child);
133
134            // Wait longer for process to start and WebSocket server to be ready
135            tracing::debug!("Waiting for iFlow process to start...");
136            sleep(Duration::from_secs(8)).await;
137
138            // Verify the port is actually listening with more retries and longer timeout
139            let mut attempts = 0;
140            let max_attempts = 30; // 30 attempts * 1 second = 30 seconds total
141
142            while attempts < max_attempts {
143                if Self::is_port_listening(port) {
144                    tracing::debug!("iFlow WebSocket server is ready on port {}", port);
145                    break;
146                }
147
148                attempts += 1;
149                if attempts % 5 == 0 {
150                    tracing::debug!(
151                        "Still waiting for iFlow to be ready... (attempt {}/{})",
152                        attempts,
153                        max_attempts
154                    );
155                }
156
157                sleep(Duration::from_secs(1)).await;
158            }
159
160            if attempts >= max_attempts {
161                return Err(IFlowError::ProcessManager(format!(
162                    "iFlow process failed to start WebSocket server on port {} after {} seconds",
163                    port, max_attempts
164                )));
165            }
166
167            tracing::debug!(
168                "iFlow process started with WebSocket support on port {}",
169                port
170            );
171
172            // Return the WebSocket URL with peer parameter
173            Ok(Some(format!("ws://localhost:{}/acp?peer=iflow", port)))
174        } else {
175            tracing::debug!("Starting iFlow process with experimental ACP and stdio support");
176
177            // Start iFlow process with stdio support
178            let mut cmd = tokio::process::Command::new("iflow");
179            cmd.arg("--experimental-acp");
180
181            // Add debug flag if enabled
182            if self.debug {
183                cmd.arg("--debug");
184            }
185
186            cmd.stdout(Stdio::piped());
187            cmd.stderr(Stdio::piped());
188            cmd.stdin(Stdio::piped()); // stdin needed for stdio
189
190            tracing::debug!("Starting iFlow process with command: {:?}", cmd);
191
192            let child = cmd
193                .spawn()
194                .map_err(|e| IFlowError::ProcessManager(format!("Failed to start iflow: {}", e)))?;
195
196            self.process = Some(child);
197
198            // Wait for process to start
199            tracing::debug!("Waiting for iFlow process to start...");
200            sleep(Duration::from_secs(5)).await;
201            tracing::debug!("iFlow process should be started by now");
202
203            tracing::debug!("iFlow process started with stdio support");
204
205            // No WebSocket URL for stdio
206            Ok(None)
207        }
208    }
209
210    /// Stop the iFlow process
211    ///
212    /// Attempts to gracefully stop the iFlow process if it's running.
213    ///
214    /// # Returns
215    /// * `Ok(())` if the process was stopped successfully or wasn't running
216    /// * `Err(IFlowError)` if there was an error stopping the process
217    pub async fn stop(&mut self) -> Result<()> {
218        if let Some(mut process) = self.process.take() {
219            tracing::debug!("Stopping iFlow process");
220
221            // Try graceful shutdown first
222            match tokio::time::timeout(Duration::from_secs(5), process.kill()).await {
223                Ok(Ok(_)) => {
224                    // Wait for the process to actually exit with a timeout
225                    match tokio::time::timeout(Duration::from_secs(5), process.wait()).await {
226                        Ok(Ok(_)) => tracing::debug!("iFlow process stopped gracefully"),
227                        Ok(Err(e)) => tracing::warn!("Error waiting for iFlow process: {}", e),
228                        Err(_) => {
229                            tracing::warn!(
230                                "Timeout waiting for iFlow process to exit, forcing termination"
231                            );
232                            // Force kill if it didn't exit in time
233                            let _ = process.start_kill();
234                        }
235                    }
236                }
237                Ok(Err(e)) => {
238                    tracing::warn!("Failed to kill iFlow process: {}, forcing termination", e);
239                    let _ = process.start_kill();
240                }
241                Err(_) => {
242                    tracing::warn!("Timeout killing iFlow process, forcing termination");
243                    let _ = process.start_kill();
244                }
245            }
246
247            // Add a small delay to ensure all resources are released
248            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
249
250            tracing::debug!("iFlow process stopped");
251        }
252
253        // Clear the port when stopping
254        self.port = None;
255
256        Ok(())
257    }
258
259    /// Get the port the iFlow process is running on
260    ///
261    /// # Returns
262    /// The port number, or None if not running
263    pub fn port(&self) -> Option<u16> {
264        self.port
265    }
266
267    /// Check if the iFlow process is running
268    ///
269    /// # Returns
270    /// `true` if the process is running, `false` otherwise
271    pub fn is_running(&self) -> bool {
272        self.process.is_some()
273    }
274
275    /// Take ownership of the process's stdin
276    ///
277    /// Takes ownership of the process's stdin stream for communication.
278    /// This method can only be called once, as it consumes the stream.
279    ///
280    /// # Returns
281    /// `Some(ChildStdin)` if the process is running and has a stdin stream, `None` otherwise
282    pub fn take_stdin(&mut self) -> Option<tokio::process::ChildStdin> {
283        self.process.as_mut().and_then(|p| p.stdin.take())
284    }
285
286    /// Take ownership of the process's stdout
287    ///
288    /// Takes ownership of the process's stdout stream for communication.
289    /// This method can only be called once, as it consumes the stream.
290    ///
291    /// # Returns
292    /// `Some(ChildStdout)` if the process is running and has a stdout stream, `None` otherwise
293    pub fn take_stdout(&mut self) -> Option<tokio::process::ChildStdout> {
294        self.process.as_mut().and_then(|p| p.stdout.take())
295    }
296}