Skip to main content

aster/background/
shell_manager.rs

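//! Background shell manager.
//!
//! Manages shell processes that run in the background, including status
//! tracking, output collection, and resource management.
//!
//! # Features
//! - Shell process lifecycle management
//! - Streaming output collection
//! - Process pause/resume support
//! - Graceful termination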
10
11use std::collections::HashMap;
12use std::process::Stdio;
13use std::sync::Arc;
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::{broadcast, RwLock};
17
18use super::types::{ShellOutputEvent, ShellOutputType, ShellStats, ShellStatus};
19
/// A shell process running in the background, together with its bookkeeping.
pub struct BackgroundShell {
    /// Unique identifier assigned by the manager
    /// (`bash_<unix-millis>_<first 8 chars of a v4 UUID>`).
    pub id: String,
    /// Command line handed to `bash -c`.
    pub command: String,
    /// Working directory the process was started in (`"."` when the caller gave none).
    pub cwd: String,
    /// Handle to the spawned child process, used by the manager to kill it.
    pub process: Option<Child>,
    /// Current lifecycle state of the shell.
    pub status: ShellStatus,
    /// Creation time as UTC milliseconds since the Unix epoch.
    pub start_time: i64,
    /// Time the shell stopped, as UTC milliseconds since the Unix epoch; `None` until set.
    pub end_time: Option<i64>,
    /// Exit code of the process, when known.
    pub exit_code: Option<i32>,
    /// Captured output, one entry per line (line terminators are not stored).
    pub output: Vec<String>,
    /// Byte count of captured output; compared against the manager's
    /// `max_output_size` to decide whether further lines are stored.
    pub output_size: usize,
    /// Runtime limit for this shell.
    /// NOTE(review): presumably milliseconds -- the manager default of 3600000 is
    /// annotated as "1 hour"; confirm against callers.
    pub max_runtime: Option<u64>,
    /// Arbitrary caller-supplied metadata attached at creation time.
    pub metadata: Option<HashMap<String, serde_json::Value>>,
}
35
/// Configuration for a shell manager.
#[derive(Debug, Clone)]
pub struct ShellManagerOptions {
    /// Maximum number of shells tracked at once.
    pub max_shells: usize,
    /// Per-shell cap on buffered output, in bytes.
    pub max_output_size: usize,
    /// Runtime limit applied when a shell is created without one, in milliseconds.
    pub default_max_runtime: u64,
}

impl Default for ShellManagerOptions {
    /// Returns the stock configuration: 10 shells, 10 MiB of output per shell,
    /// and a one-hour runtime limit.
    fn default() -> Self {
        const DEFAULT_MAX_SHELLS: usize = 10;
        const BYTES_PER_MIB: usize = 1024 * 1024;
        const MILLIS_PER_HOUR: u64 = 60 * 60 * 1000;

        ShellManagerOptions {
            max_shells: DEFAULT_MAX_SHELLS,
            max_output_size: 10 * BYTES_PER_MIB,
            default_max_runtime: MILLIS_PER_HOUR,
        }
    }
}
53
/// Outcome of a request to create a background shell.
#[derive(Debug)]
pub struct CreateShellResult {
    /// `true` when the shell was spawned and registered.
    pub success: bool,
    /// Identifier of the new shell; `Some` on success, `None` on failure.
    pub id: Option<String>,
    /// Human-readable failure reason; `Some` on failure, `None` on success.
    pub error: Option<String>,
}
61
/// Manager that owns and tracks background shells.
pub struct ShellManager {
    /// All tracked shells keyed by shell id; shared with the spawned reader tasks.
    shells: Arc<RwLock<HashMap<String, BackgroundShell>>>,
    /// Maximum number of shells tracked at once.
    max_shells: usize,
    /// Per-shell cap on buffered output, in bytes.
    max_output_size: usize,
    /// Runtime limit used when `create_shell` is called without one.
    default_max_runtime: u64,
    /// Broadcast sender on which every captured output line is published.
    event_tx: broadcast::Sender<ShellOutputEvent>,
}
70
71impl ShellManager {
72    /// 创建新的 Shell 管理器
73    pub fn new(options: ShellManagerOptions) -> Self {
74        let (event_tx, _) = broadcast::channel(1000);
75        Self {
76            shells: Arc::new(RwLock::new(HashMap::new())),
77            max_shells: options.max_shells,
78            max_output_size: options.max_output_size,
79            default_max_runtime: options.default_max_runtime,
80            event_tx,
81        }
82    }
83
84    /// 订阅输出事件
85    pub fn subscribe(&self) -> broadcast::Receiver<ShellOutputEvent> {
86        self.event_tx.subscribe()
87    }
88
89    /// 生成唯一的 Shell ID
90    fn generate_shell_id(&self) -> String {
91        let uuid_str = uuid::Uuid::new_v4().to_string();
92        format!(
93            "bash_{}_{}",
94            chrono::Utc::now().timestamp_millis(),
95            uuid_str.get(..8).unwrap_or(&uuid_str)
96        )
97    }
98
99    /// 创建并启动后台 Shell
100    pub async fn create_shell(
101        &self,
102        command: &str,
103        cwd: Option<&str>,
104        max_runtime: Option<u64>,
105        metadata: Option<HashMap<String, serde_json::Value>>,
106    ) -> CreateShellResult {
107        // 检查 shell 数量限制
108        let shell_count = self.shells.read().await.len();
109        if shell_count >= self.max_shells {
110            let cleaned = self.cleanup_completed().await;
111            if cleaned == 0 && shell_count >= self.max_shells {
112                return CreateShellResult {
113                    success: false,
114                    id: None,
115                    error: Some(format!(
116                        "Maximum number of background shells ({}) reached",
117                        self.max_shells
118                    )),
119                };
120            }
121        }
122
123        let id = self.generate_shell_id();
124        let working_dir = cwd.unwrap_or(".").to_string();
125        let runtime = max_runtime.unwrap_or(self.default_max_runtime);
126
127        // 创建进程
128        let child = match Command::new("bash")
129            .arg("-c")
130            .arg(command)
131            .current_dir(&working_dir)
132            .stdout(Stdio::piped())
133            .stderr(Stdio::piped())
134            .spawn()
135        {
136            Ok(c) => c,
137            Err(e) => {
138                return CreateShellResult {
139                    success: false,
140                    id: None,
141                    error: Some(format!("Failed to spawn process: {}", e)),
142                };
143            }
144        };
145
146        let shell = BackgroundShell {
147            id: id.clone(),
148            command: command.to_string(),
149            cwd: working_dir,
150            process: Some(child),
151            status: ShellStatus::Running,
152            start_time: chrono::Utc::now().timestamp_millis(),
153            end_time: None,
154            exit_code: None,
155            output: Vec::new(),
156            output_size: 0,
157            max_runtime: Some(runtime),
158            metadata,
159        };
160
161        self.shells.write().await.insert(id.clone(), shell);
162        self.spawn_output_reader(id.clone()).await;
163
164        CreateShellResult {
165            success: true,
166            id: Some(id),
167            error: None,
168        }
169    }
170
171    /// 启动输出读取器
172    async fn spawn_output_reader(&self, shell_id: String) {
173        let shells = Arc::clone(&self.shells);
174        let event_tx = self.event_tx.clone();
175        let max_output_size = self.max_output_size;
176
177        tokio::spawn(async move {
178            let mut shells_guard = shells.write().await;
179            if let Some(shell) = shells_guard.get_mut(&shell_id) {
180                if let Some(ref mut process) = shell.process {
181                    if let Some(stdout) = process.stdout.take() {
182                        let shells_clone = Arc::clone(&shells);
183                        let id_clone = shell_id.clone();
184                        let tx_clone = event_tx.clone();
185
186                        tokio::spawn(async move {
187                            let reader = BufReader::new(stdout);
188                            let mut lines = reader.lines();
189                            while let Ok(Some(line)) = lines.next_line().await {
190                                let mut guard = shells_clone.write().await;
191                                if let Some(s) = guard.get_mut(&id_clone) {
192                                    if s.output_size < max_output_size {
193                                        s.output.push(line.clone());
194                                        s.output_size += line.len();
195                                    }
196                                }
197                                let _ = tx_clone.send(ShellOutputEvent {
198                                    id: id_clone.clone(),
199                                    data: line,
200                                    output_type: ShellOutputType::Stdout,
201                                });
202                            }
203                        });
204                    }
205                }
206            }
207        });
208    }
209
210    /// 获取 Shell 状态
211    pub async fn get_shell(&self, id: &str) -> Option<ShellStatus> {
212        self.shells.read().await.get(id).map(|s| s.status)
213    }
214
215    /// 获取 Shell 输出
216    pub async fn get_output(&self, id: &str, clear: bool) -> Option<String> {
217        let mut shells = self.shells.write().await;
218        if let Some(shell) = shells.get_mut(id) {
219            let output = shell.output.join("\n");
220            if clear {
221                shell.output.clear();
222            }
223            Some(output)
224        } else {
225            None
226        }
227    }
228
229    /// 终止 Shell
230    pub async fn terminate_shell(&self, id: &str) -> bool {
231        let mut shells = self.shells.write().await;
232        if let Some(shell) = shells.get_mut(id) {
233            if let Some(ref mut process) = shell.process {
234                let _ = process.kill().await;
235            }
236            shell.status = ShellStatus::Terminated;
237            shell.end_time = Some(chrono::Utc::now().timestamp_millis());
238            true
239        } else {
240            false
241        }
242    }
243
244    /// 列出所有 Shell
245    pub async fn list_shells(&self) -> Vec<(String, String, ShellStatus, i64, usize)> {
246        self.shells
247            .read()
248            .await
249            .values()
250            .map(|s| {
251                let duration = s
252                    .end_time
253                    .unwrap_or_else(|| chrono::Utc::now().timestamp_millis())
254                    - s.start_time;
255                (
256                    s.id.clone(),
257                    s.command.chars().take(100).collect(),
258                    s.status,
259                    duration,
260                    s.output_size,
261                )
262            })
263            .collect()
264    }
265
266    /// 清理已完成的 Shell
267    pub async fn cleanup_completed(&self) -> usize {
268        let mut shells = self.shells.write().await;
269        let to_remove: Vec<String> = shells
270            .iter()
271            .filter(|(_, s)| {
272                matches!(
273                    s.status,
274                    ShellStatus::Completed | ShellStatus::Failed | ShellStatus::Terminated
275                )
276            })
277            .map(|(id, _)| id.clone())
278            .collect();
279
280        let count = to_remove.len();
281        for id in to_remove {
282            shells.remove(&id);
283        }
284        count
285    }
286
287    /// 终止所有 Shell
288    pub async fn terminate_all(&self) -> usize {
289        let mut shells = self.shells.write().await;
290        let mut terminated = 0;
291        for shell in shells.values_mut() {
292            if let Some(ref mut process) = shell.process {
293                if process.kill().await.is_ok() {
294                    terminated += 1;
295                }
296            }
297            shell.status = ShellStatus::Terminated;
298        }
299        shells.clear();
300        terminated
301    }
302
303    /// 获取统计信息
304    pub async fn get_stats(&self) -> ShellStats {
305        let shells = self.shells.read().await;
306        let mut stats = ShellStats {
307            total: shells.len(),
308            running: 0,
309            completed: 0,
310            failed: 0,
311            paused: 0,
312            terminated: 0,
313            max_shells: self.max_shells,
314            available: 0,
315        };
316
317        for shell in shells.values() {
318            match shell.status {
319                ShellStatus::Running => stats.running += 1,
320                ShellStatus::Completed => stats.completed += 1,
321                ShellStatus::Failed => stats.failed += 1,
322                ShellStatus::Paused => stats.paused += 1,
323                ShellStatus::Terminated => stats.terminated += 1,
324            }
325        }
326
327        stats.available = self.max_shells.saturating_sub(stats.running + stats.paused);
328        stats
329    }
330}