1use std::collections::HashMap;
12use std::process::Stdio;
13use std::sync::Arc;
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::{broadcast, RwLock};
17
18use super::types::{ShellOutputEvent, ShellOutputType, ShellStats, ShellStatus};
19
20pub struct BackgroundShell {
22 pub id: String,
23 pub command: String,
24 pub cwd: String,
25 pub process: Option<Child>,
26 pub status: ShellStatus,
27 pub start_time: i64,
28 pub end_time: Option<i64>,
29 pub exit_code: Option<i32>,
30 pub output: Vec<String>,
31 pub output_size: usize,
32 pub max_runtime: Option<u64>,
33 pub metadata: Option<HashMap<String, serde_json::Value>>,
34}
35
/// Tunable limits applied by a shell manager.
#[derive(Debug, Clone)]
pub struct ShellManagerOptions {
    /// Upper bound on the number of shells tracked at the same time.
    pub max_shells: usize,
    /// Per-shell cap, in bytes, on buffered output.
    pub max_output_size: usize,
    /// Runtime limit used when a shell is created without an explicit one.
    /// NOTE(review): unit is not stated in this file (presumably milliseconds) — confirm.
    pub default_max_runtime: u64,
}
43
44impl Default for ShellManagerOptions {
45 fn default() -> Self {
46 Self {
47 max_shells: 10,
48 max_output_size: 10 * 1024 * 1024, default_max_runtime: 3600000, }
51 }
52}
53
/// Outcome of an attempt to start a background shell.
#[derive(Debug)]
pub struct CreateShellResult {
    /// `true` when the shell was started and registered.
    pub success: bool,
    /// Identifier of the new shell; present on success.
    pub id: Option<String>,
    /// Human-readable failure reason; present on failure.
    pub error: Option<String>,
}
61
62pub struct ShellManager {
64 shells: Arc<RwLock<HashMap<String, BackgroundShell>>>,
65 max_shells: usize,
66 max_output_size: usize,
67 default_max_runtime: u64,
68 event_tx: broadcast::Sender<ShellOutputEvent>,
69}
70
71impl ShellManager {
72 pub fn new(options: ShellManagerOptions) -> Self {
74 let (event_tx, _) = broadcast::channel(1000);
75 Self {
76 shells: Arc::new(RwLock::new(HashMap::new())),
77 max_shells: options.max_shells,
78 max_output_size: options.max_output_size,
79 default_max_runtime: options.default_max_runtime,
80 event_tx,
81 }
82 }
83
84 pub fn subscribe(&self) -> broadcast::Receiver<ShellOutputEvent> {
86 self.event_tx.subscribe()
87 }
88
89 fn generate_shell_id(&self) -> String {
91 let uuid_str = uuid::Uuid::new_v4().to_string();
92 format!(
93 "bash_{}_{}",
94 chrono::Utc::now().timestamp_millis(),
95 uuid_str.get(..8).unwrap_or(&uuid_str)
96 )
97 }
98
99 pub async fn create_shell(
101 &self,
102 command: &str,
103 cwd: Option<&str>,
104 max_runtime: Option<u64>,
105 metadata: Option<HashMap<String, serde_json::Value>>,
106 ) -> CreateShellResult {
107 let shell_count = self.shells.read().await.len();
109 if shell_count >= self.max_shells {
110 let cleaned = self.cleanup_completed().await;
111 if cleaned == 0 && shell_count >= self.max_shells {
112 return CreateShellResult {
113 success: false,
114 id: None,
115 error: Some(format!(
116 "Maximum number of background shells ({}) reached",
117 self.max_shells
118 )),
119 };
120 }
121 }
122
123 let id = self.generate_shell_id();
124 let working_dir = cwd.unwrap_or(".").to_string();
125 let runtime = max_runtime.unwrap_or(self.default_max_runtime);
126
127 let child = match Command::new("bash")
129 .arg("-c")
130 .arg(command)
131 .current_dir(&working_dir)
132 .stdout(Stdio::piped())
133 .stderr(Stdio::piped())
134 .spawn()
135 {
136 Ok(c) => c,
137 Err(e) => {
138 return CreateShellResult {
139 success: false,
140 id: None,
141 error: Some(format!("Failed to spawn process: {}", e)),
142 };
143 }
144 };
145
146 let shell = BackgroundShell {
147 id: id.clone(),
148 command: command.to_string(),
149 cwd: working_dir,
150 process: Some(child),
151 status: ShellStatus::Running,
152 start_time: chrono::Utc::now().timestamp_millis(),
153 end_time: None,
154 exit_code: None,
155 output: Vec::new(),
156 output_size: 0,
157 max_runtime: Some(runtime),
158 metadata,
159 };
160
161 self.shells.write().await.insert(id.clone(), shell);
162 self.spawn_output_reader(id.clone()).await;
163
164 CreateShellResult {
165 success: true,
166 id: Some(id),
167 error: None,
168 }
169 }
170
171 async fn spawn_output_reader(&self, shell_id: String) {
173 let shells = Arc::clone(&self.shells);
174 let event_tx = self.event_tx.clone();
175 let max_output_size = self.max_output_size;
176
177 tokio::spawn(async move {
178 let mut shells_guard = shells.write().await;
179 if let Some(shell) = shells_guard.get_mut(&shell_id) {
180 if let Some(ref mut process) = shell.process {
181 if let Some(stdout) = process.stdout.take() {
182 let shells_clone = Arc::clone(&shells);
183 let id_clone = shell_id.clone();
184 let tx_clone = event_tx.clone();
185
186 tokio::spawn(async move {
187 let reader = BufReader::new(stdout);
188 let mut lines = reader.lines();
189 while let Ok(Some(line)) = lines.next_line().await {
190 let mut guard = shells_clone.write().await;
191 if let Some(s) = guard.get_mut(&id_clone) {
192 if s.output_size < max_output_size {
193 s.output.push(line.clone());
194 s.output_size += line.len();
195 }
196 }
197 let _ = tx_clone.send(ShellOutputEvent {
198 id: id_clone.clone(),
199 data: line,
200 output_type: ShellOutputType::Stdout,
201 });
202 }
203 });
204 }
205 }
206 }
207 });
208 }
209
210 pub async fn get_shell(&self, id: &str) -> Option<ShellStatus> {
212 self.shells.read().await.get(id).map(|s| s.status)
213 }
214
215 pub async fn get_output(&self, id: &str, clear: bool) -> Option<String> {
217 let mut shells = self.shells.write().await;
218 if let Some(shell) = shells.get_mut(id) {
219 let output = shell.output.join("\n");
220 if clear {
221 shell.output.clear();
222 }
223 Some(output)
224 } else {
225 None
226 }
227 }
228
229 pub async fn terminate_shell(&self, id: &str) -> bool {
231 let mut shells = self.shells.write().await;
232 if let Some(shell) = shells.get_mut(id) {
233 if let Some(ref mut process) = shell.process {
234 let _ = process.kill().await;
235 }
236 shell.status = ShellStatus::Terminated;
237 shell.end_time = Some(chrono::Utc::now().timestamp_millis());
238 true
239 } else {
240 false
241 }
242 }
243
244 pub async fn list_shells(&self) -> Vec<(String, String, ShellStatus, i64, usize)> {
246 self.shells
247 .read()
248 .await
249 .values()
250 .map(|s| {
251 let duration = s
252 .end_time
253 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis())
254 - s.start_time;
255 (
256 s.id.clone(),
257 s.command.chars().take(100).collect(),
258 s.status,
259 duration,
260 s.output_size,
261 )
262 })
263 .collect()
264 }
265
266 pub async fn cleanup_completed(&self) -> usize {
268 let mut shells = self.shells.write().await;
269 let to_remove: Vec<String> = shells
270 .iter()
271 .filter(|(_, s)| {
272 matches!(
273 s.status,
274 ShellStatus::Completed | ShellStatus::Failed | ShellStatus::Terminated
275 )
276 })
277 .map(|(id, _)| id.clone())
278 .collect();
279
280 let count = to_remove.len();
281 for id in to_remove {
282 shells.remove(&id);
283 }
284 count
285 }
286
287 pub async fn terminate_all(&self) -> usize {
289 let mut shells = self.shells.write().await;
290 let mut terminated = 0;
291 for shell in shells.values_mut() {
292 if let Some(ref mut process) = shell.process {
293 if process.kill().await.is_ok() {
294 terminated += 1;
295 }
296 }
297 shell.status = ShellStatus::Terminated;
298 }
299 shells.clear();
300 terminated
301 }
302
303 pub async fn get_stats(&self) -> ShellStats {
305 let shells = self.shells.read().await;
306 let mut stats = ShellStats {
307 total: shells.len(),
308 running: 0,
309 completed: 0,
310 failed: 0,
311 paused: 0,
312 terminated: 0,
313 max_shells: self.max_shells,
314 available: 0,
315 };
316
317 for shell in shells.values() {
318 match shell.status {
319 ShellStatus::Running => stats.running += 1,
320 ShellStatus::Completed => stats.completed += 1,
321 ShellStatus::Failed => stats.failed += 1,
322 ShellStatus::Paused => stats.paused += 1,
323 ShellStatus::Terminated => stats.terminated += 1,
324 }
325 }
326
327 stats.available = self.max_shells.saturating_sub(stats.running + stats.paused);
328 stats
329 }
330}