Skip to main content

aster/blueprint/
worker_sandbox.rs

1//! Worker 沙箱隔离机制
2//!
3//! 实现多 Worker 并发执行的隔离和同步:
4//! - 文件系统隔离:每个 Worker 有独立的沙箱目录
5//! - 文件锁机制:防止并发修改冲突
6//! - 资源限制:控制 Worker 的资源使用
7//!
8
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12use std::collections::HashMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, RwLock};
16
17// ============================================================================
18// 类型定义
19// ============================================================================
20
/// Configuration describing one worker's sandbox.
#[derive(Debug, Clone)]
pub struct SandboxConfig {
    /// Identifier of the worker that owns the sandbox.
    pub worker_id: String,
    /// Identifier of the task the worker is running.
    pub task_id: String,
    /// Project root directory; sandboxed files are addressed relative to it.
    pub base_dir: PathBuf,
    /// Sandbox directory. When `None`, `~/.aster/sandbox/{worker_id}` is used.
    pub sandbox_dir: Option<PathBuf>,
}
33
/// Outcome of synchronising sandbox files back to the base directory.
#[derive(Debug, Clone, Default)]
pub struct SyncResult {
    /// Relative paths of files that were copied back successfully.
    pub success: Vec<String>,
    /// Files that could not be synchronised, with the reason for each.
    pub failed: Vec<SyncFailure>,
    /// Files that were skipped because a conflict was detected.
    pub conflicts: Vec<SyncConflict>,
    /// Number of files found in the sandbox (unchanged files included).
    pub total: usize,
}
46
/// Describes a single file that failed to synchronise.
#[derive(Debug, Clone)]
pub struct SyncFailure {
    /// Path of the file that failed.
    pub file: String,
    /// Human-readable description of the failure.
    pub error: String,
}
53
/// Describes a single file whose synchronisation was skipped due to a conflict.
#[derive(Debug, Clone)]
pub struct SyncConflict {
    /// Path of the conflicting file.
    pub file: String,
    /// Human-readable description of why the file conflicts.
    pub reason: String,
}
60
/// Contents of a lock file; serialised to and from JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockInfo {
    /// Identifier of the worker holding the lock.
    pub worker_id: String,
    /// Operating-system process ID of the process that created the lock.
    pub pid: u32,
    /// Path of the file being locked, as passed to the lock manager.
    pub file_path: String,
    /// Time at which the lock was taken (UTC).
    pub timestamp: DateTime<Utc>,
    /// Lock lifetime in milliseconds, measured from `timestamp`.
    pub timeout: u64,
}
75
/// Snapshot of a file taken at the moment it was copied into the sandbox.
#[derive(Debug, Clone)]
// Only `hash` is read in this file; the remaining fields are recorded but
// currently unused, hence the lint suppression.
#[allow(dead_code)]
struct FileMetadata {
    /// File path relative to `base_dir`.
    relative_path: String,
    /// Hex-encoded SHA-256 hash of the file content.
    hash: String,
    /// Modification time in seconds since the Unix epoch.
    mtime: i64,
    /// File size in bytes.
    size: u64,
}
89
90// ============================================================================
91// 工具函数
92// ============================================================================
93
94/// 计算文件内容的 hash
95fn compute_file_hash(file_path: &Path) -> Result<String, std::io::Error> {
96    let content = fs::read(file_path)?;
97    let mut hasher = Sha256::new();
98    hasher.update(&content);
99    Ok(format!("{:x}", hasher.finalize()))
100}
101
102/// 计算字符串的 hash(用于文件路径)
103fn compute_string_hash(s: &str) -> String {
104    let mut hasher = Sha256::new();
105    hasher.update(s.as_bytes());
106    let hash = format!("{:x}", hasher.finalize());
107    hash.get(..16).unwrap_or(&hash).to_string()
108}
109
/// Recursively copies the contents of `src` into `dest`.
///
/// `dest` is created (including missing parents) when it does not exist.
/// Sub-directories are copied depth-first; every other entry is copied with
/// [`fs::copy`].
///
/// # Errors
/// Stops at, and returns, the first I/O error encountered.
fn copy_directory_recursive(src: &Path, dest: &Path) -> Result<(), std::io::Error> {
    if !dest.exists() {
        fs::create_dir_all(dest)?;
    }

    fs::read_dir(src)?.try_for_each(|item| {
        let item = item?;
        let from = item.path();
        let to = dest.join(item.file_name());

        if from.is_dir() {
            copy_directory_recursive(&from, &to)
        } else {
            fs::copy(&from, &to).map(|_| ())
        }
    })
}
130
131/// 获取默认沙箱根目录
132fn get_default_sandbox_root() -> PathBuf {
133    dirs::home_dir()
134        .unwrap_or_else(|| PathBuf::from("."))
135        .join(".aster")
136        .join("sandbox")
137}
138
139// ============================================================================
140// 文件锁管理器
141// ============================================================================
142
/// File lock manager.
///
/// Implements locks on top of the file system:
/// - one JSON lock file per locked path, stored in `lock_dir`
///   (`~/.aster/sandbox/locks/` by default);
/// - every lock carries a timeout, after which it is treated as stale.
///
/// NOTE(review): the original comment also advertised deadlock detection; no
/// such logic is visible in this file beyond lock expiry — confirm.
pub struct FileLockManager {
    /// Directory that holds the lock files.
    lock_dir: PathBuf,
    /// In-memory record of locks taken through this manager, keyed by the
    /// locked file path.
    locks: Arc<RwLock<HashMap<String, LockInfo>>>,
    /// Timeout in milliseconds applied when a caller does not supply one.
    default_timeout: u64,
}
153
154impl FileLockManager {
155    /// 创建新的文件锁管理器
156    pub fn new(lock_dir: Option<PathBuf>) -> Self {
157        let lock_dir = lock_dir.unwrap_or_else(|| get_default_sandbox_root().join("locks"));
158
159        // 确保锁目录存在
160        let _ = fs::create_dir_all(&lock_dir);
161
162        Self {
163            lock_dir,
164            locks: Arc::new(RwLock::new(HashMap::new())),
165            default_timeout: 300000, // 5 分钟
166        }
167    }
168
169    /// 获取锁文件路径
170    fn get_lock_file_path(&self, file_path: &str) -> PathBuf {
171        let hash = compute_string_hash(file_path);
172        self.lock_dir.join(format!("{}.lock", hash))
173    }
174
175    /// 读取锁信息
176    fn read_lock_info(&self, lock_file_path: &Path) -> Option<LockInfo> {
177        let content = fs::read_to_string(lock_file_path).ok()?;
178        serde_json::from_str(&content).ok()
179    }
180
181    /// 写入锁信息
182    fn write_lock_info(&self, lock_file_path: &Path, lock_info: &LockInfo) -> Result<(), String> {
183        let content = serde_json::to_string_pretty(lock_info)
184            .map_err(|e| format!("序列化锁信息失败: {}", e))?;
185        fs::write(lock_file_path, content).map_err(|e| format!("写入锁文件失败: {}", e))
186    }
187
188    /// 检查锁是否过期
189    fn is_lock_expired(&self, lock_info: &LockInfo) -> bool {
190        let now = Utc::now();
191        let elapsed = (now - lock_info.timestamp).num_milliseconds() as u64;
192        elapsed > lock_info.timeout
193    }
194
195    /// 获取文件锁
196    pub fn acquire_lock(
197        &self,
198        file_path: &str,
199        worker_id: &str,
200        timeout: Option<u64>,
201    ) -> Result<bool, String> {
202        let lock_file_path = self.get_lock_file_path(file_path);
203        let timeout = timeout.unwrap_or(self.default_timeout);
204
205        // 检查是否已经存在锁
206        if lock_file_path.exists() {
207            if let Some(existing_lock) = self.read_lock_info(&lock_file_path) {
208                // 如果是同一个 Worker,允许重入
209                if existing_lock.worker_id == worker_id {
210                    return Ok(true);
211                }
212
213                // 检查锁是否过期
214                if self.is_lock_expired(&existing_lock) {
215                    // 锁已过期,删除它
216                    let _ = fs::remove_file(&lock_file_path);
217                } else {
218                    // 锁仍然有效,无法获取
219                    return Ok(false);
220                }
221            }
222        }
223
224        // 创建锁信息
225        let lock_info = LockInfo {
226            worker_id: worker_id.to_string(),
227            pid: std::process::id(),
228            file_path: file_path.to_string(),
229            timestamp: Utc::now(),
230            timeout,
231        };
232
233        // 写入锁文件
234        self.write_lock_info(&lock_file_path, &lock_info)?;
235
236        // 记录锁
237        if let Ok(mut locks) = self.locks.write() {
238            locks.insert(file_path.to_string(), lock_info);
239        }
240
241        Ok(true)
242    }
243
244    /// 释放文件锁
245    pub fn release_lock(&self, file_path: &str, worker_id: &str) -> Result<(), String> {
246        let lock_file_path = self.get_lock_file_path(file_path);
247
248        if !lock_file_path.exists() {
249            return Ok(());
250        }
251
252        if let Some(lock_info) = self.read_lock_info(&lock_file_path) {
253            // 只有持有锁的 Worker 才能释放
254            if lock_info.worker_id != worker_id {
255                return Err(format!(
256                    "无法释放锁:文件被 worker {} 锁定,而非 {}",
257                    lock_info.worker_id, worker_id
258                ));
259            }
260        }
261
262        fs::remove_file(&lock_file_path).map_err(|e| format!("删除锁文件失败: {}", e))?;
263
264        if let Ok(mut locks) = self.locks.write() {
265            locks.remove(file_path);
266        }
267
268        Ok(())
269    }
270
271    /// 检查文件是否被锁定
272    pub fn is_locked(&self, file_path: &str) -> bool {
273        let lock_file_path = self.get_lock_file_path(file_path);
274
275        if !lock_file_path.exists() {
276            return false;
277        }
278
279        if let Some(lock_info) = self.read_lock_info(&lock_file_path) {
280            if self.is_lock_expired(&lock_info) {
281                let _ = fs::remove_file(&lock_file_path);
282                return false;
283            }
284            return true;
285        }
286
287        false
288    }
289
290    /// 获取锁定该文件的 Worker
291    pub fn get_locker(&self, file_path: &str) -> Option<String> {
292        let lock_file_path = self.get_lock_file_path(file_path);
293
294        if !lock_file_path.exists() {
295            return None;
296        }
297
298        let lock_info = self.read_lock_info(&lock_file_path)?;
299
300        if self.is_lock_expired(&lock_info) {
301            let _ = fs::remove_file(&lock_file_path);
302            return None;
303        }
304
305        Some(lock_info.worker_id)
306    }
307
308    /// 获取所有活跃的锁
309    pub fn get_active_locks(&self) -> Vec<LockInfo> {
310        let mut locks = Vec::new();
311
312        if !self.lock_dir.exists() {
313            return locks;
314        }
315
316        if let Ok(entries) = fs::read_dir(&self.lock_dir) {
317            for entry in entries.flatten() {
318                let path = entry.path();
319                if path.extension().is_some_and(|ext| ext == "lock") {
320                    if let Some(lock_info) = self.read_lock_info(&path) {
321                        if !self.is_lock_expired(&lock_info) {
322                            locks.push(lock_info);
323                        }
324                    }
325                }
326            }
327        }
328
329        locks
330    }
331
332    /// 清理所有过期锁
333    pub fn cleanup_stale_locks(&self) -> usize {
334        let mut cleaned = 0;
335
336        if !self.lock_dir.exists() {
337            return cleaned;
338        }
339
340        if let Ok(entries) = fs::read_dir(&self.lock_dir) {
341            for entry in entries.flatten() {
342                let path = entry.path();
343                if path.extension().is_some_and(|ext| ext == "lock") {
344                    if let Some(lock_info) = self.read_lock_info(&path) {
345                        if self.is_lock_expired(&lock_info) && fs::remove_file(&path).is_ok() {
346                            cleaned += 1;
347                        }
348                    }
349                }
350            }
351        }
352
353        cleaned
354    }
355
356    /// 释放指定 Worker 的所有锁
357    pub fn release_all_locks(&self, worker_id: &str) -> usize {
358        let mut released = 0;
359
360        if !self.lock_dir.exists() {
361            return released;
362        }
363
364        if let Ok(entries) = fs::read_dir(&self.lock_dir) {
365            for entry in entries.flatten() {
366                let path = entry.path();
367                if path.extension().is_some_and(|ext| ext == "lock") {
368                    if let Some(lock_info) = self.read_lock_info(&path) {
369                        if lock_info.worker_id == worker_id && fs::remove_file(&path).is_ok() {
370                            released += 1;
371                        }
372                    }
373                }
374            }
375        }
376
377        released
378    }
379}
380
381impl Default for FileLockManager {
382    fn default() -> Self {
383        Self::new(None)
384    }
385}
386
387// ============================================================================
388// Worker 沙箱
389// ============================================================================
390
/// Worker sandbox.
///
/// Gives each worker an isolated working area:
/// - a private directory that files are copied into;
/// - a content hash recorded for each copied file, used to detect changes;
/// - lock-protected synchronisation back to the base directory.
pub struct WorkerSandbox {
    /// Configuration the sandbox was created with.
    config: SandboxConfig,
    /// Resolved sandbox directory.
    sandbox_dir: PathBuf,
    /// Lock manager used while synchronising files back.
    lock_manager: Arc<FileLockManager>,
    /// Metadata of files copied into the sandbox, keyed by relative path.
    copied_files: HashMap<String, FileMetadata>,
}
403
404impl WorkerSandbox {
405    /// 创建新的 Worker 沙箱
406    pub fn new(config: SandboxConfig, lock_manager: Option<Arc<FileLockManager>>) -> Self {
407        let sandbox_dir = config
408            .sandbox_dir
409            .clone()
410            .unwrap_or_else(|| get_default_sandbox_root().join(&config.worker_id));
411
412        Self {
413            config,
414            sandbox_dir,
415            lock_manager: lock_manager.unwrap_or_else(|| Arc::new(FileLockManager::default())),
416            copied_files: HashMap::new(),
417        }
418    }
419
420    /// 创建沙箱环境
421    pub fn setup(&self) -> Result<(), String> {
422        // 创建沙箱目录
423        fs::create_dir_all(&self.sandbox_dir).map_err(|e| format!("创建沙箱目录失败: {}", e))?;
424
425        // 创建元数据文件
426        let metadata_path = self.sandbox_dir.join(".sandbox-metadata.json");
427        let metadata = serde_json::json!({
428            "worker_id": self.config.worker_id,
429            "task_id": self.config.task_id,
430            "base_dir": self.config.base_dir.to_string_lossy(),
431            "created_at": Utc::now().to_rfc3339(),
432            "pid": std::process::id(),
433        });
434
435        fs::write(
436            &metadata_path,
437            serde_json::to_string_pretty(&metadata).unwrap(),
438        )
439        .map_err(|e| format!("写入元数据失败: {}", e))?;
440
441        Ok(())
442    }
443
444    /// 将文件复制到沙箱
445    pub fn copy_to_sandbox(&mut self, files: &[String]) -> Result<(), String> {
446        for file in files {
447            let absolute_path = if Path::new(file).is_absolute() {
448                PathBuf::from(file)
449            } else {
450                self.config.base_dir.join(file)
451            };
452
453            if !absolute_path.exists() {
454                continue;
455            }
456
457            // 计算相对路径
458            let relative_path = absolute_path
459                .strip_prefix(&self.config.base_dir)
460                .map_err(|_| format!("文件不在基础目录内: {}", file))?
461                .to_string_lossy()
462                .to_string();
463
464            let sandbox_path = self.sandbox_dir.join(&relative_path);
465
466            // 确保目标目录存在
467            if let Some(parent) = sandbox_path.parent() {
468                fs::create_dir_all(parent).map_err(|e| format!("创建目录失败: {}", e))?;
469            }
470
471            // 复制文件或目录
472            let metadata =
473                fs::metadata(&absolute_path).map_err(|e| format!("获取文件元数据失败: {}", e))?;
474
475            if metadata.is_dir() {
476                copy_directory_recursive(&absolute_path, &sandbox_path)
477                    .map_err(|e| format!("复制目录失败: {}", e))?;
478            } else {
479                fs::copy(&absolute_path, &sandbox_path)
480                    .map_err(|e| format!("复制文件失败: {}", e))?;
481
482                // 记录文件元数据
483                if let Ok(hash) = compute_file_hash(&absolute_path) {
484                    self.copied_files.insert(
485                        relative_path.clone(),
486                        FileMetadata {
487                            relative_path,
488                            hash,
489                            mtime: metadata
490                                .modified()
491                                .map(|t| {
492                                    t.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
493                                        as i64
494                                })
495                                .unwrap_or(0),
496                            size: metadata.len(),
497                        },
498                    );
499                }
500            }
501        }
502
503        Ok(())
504    }
505
506    /// 将修改同步回主目录(需要锁)
507    pub fn sync_back(&self) -> SyncResult {
508        let mut result = SyncResult::default();
509
510        // 扫描沙箱中的文件
511        let sandbox_files = self.scan_sandbox_files();
512        result.total = sandbox_files.len();
513
514        for sandbox_file in sandbox_files {
515            let relative_path = sandbox_file
516                .strip_prefix(&self.sandbox_dir)
517                .map(|p| p.to_string_lossy().to_string())
518                .unwrap_or_default();
519
520            let original_path = self.config.base_dir.join(&relative_path);
521
522            // 获取沙箱文件的 hash
523            let sandbox_hash = match compute_file_hash(&sandbox_file) {
524                Ok(h) => h,
525                Err(e) => {
526                    result.failed.push(SyncFailure {
527                        file: relative_path,
528                        error: format!("计算 hash 失败: {}", e),
529                    });
530                    continue;
531                }
532            };
533
534            // 检查文件是否被修改
535            if let Some(original_metadata) = self.copied_files.get(&relative_path) {
536                if original_metadata.hash == sandbox_hash {
537                    // 文件未修改,跳过
538                    continue;
539                }
540            }
541
542            // 获取文件锁
543            let lock_acquired = self.lock_manager.acquire_lock(
544                original_path.to_str().unwrap_or(""),
545                &self.config.worker_id,
546                Some(60000),
547            );
548
549            match lock_acquired {
550                Ok(true) => {
551                    // 冲突检测:检查主目录文件是否也被修改
552                    if original_path.exists() {
553                        if let Some(original_metadata) = self.copied_files.get(&relative_path) {
554                            if let Ok(current_hash) = compute_file_hash(&original_path) {
555                                if original_metadata.hash != current_hash {
556                                    result.conflicts.push(SyncConflict {
557                                        file: relative_path.clone(),
558                                        reason: "文件在沙箱和主目录中都被修改".to_string(),
559                                    });
560                                    let _ = self.lock_manager.release_lock(
561                                        original_path.to_str().unwrap_or(""),
562                                        &self.config.worker_id,
563                                    );
564                                    continue;
565                                }
566                            }
567                        }
568                    }
569
570                    // 同步文件
571                    if let Some(parent) = original_path.parent() {
572                        let _ = fs::create_dir_all(parent);
573                    }
574
575                    match fs::copy(&sandbox_file, &original_path) {
576                        Ok(_) => result.success.push(relative_path.clone()),
577                        Err(e) => result.failed.push(SyncFailure {
578                            file: relative_path.clone(),
579                            error: format!("复制文件失败: {}", e),
580                        }),
581                    }
582
583                    // 释放锁
584                    let _ = self
585                        .lock_manager
586                        .release_lock(original_path.to_str().unwrap_or(""), &self.config.worker_id);
587                }
588                Ok(false) => {
589                    let locker = self
590                        .lock_manager
591                        .get_locker(original_path.to_str().unwrap_or(""));
592                    result.failed.push(SyncFailure {
593                        file: relative_path,
594                        error: format!("无法获取锁,被 {:?} 锁定", locker),
595                    });
596                }
597                Err(e) => {
598                    result.failed.push(SyncFailure {
599                        file: relative_path,
600                        error: e,
601                    });
602                }
603            }
604        }
605
606        result
607    }
608
609    /// 扫描沙箱中的所有文件
610    fn scan_sandbox_files(&self) -> Vec<PathBuf> {
611        let mut files = Vec::new();
612        self.scan_directory(&self.sandbox_dir, &mut files);
613        files
614    }
615
616    fn scan_directory(&self, dir: &Path, files: &mut Vec<PathBuf>) {
617        if !dir.exists() {
618            return;
619        }
620
621        if let Ok(entries) = fs::read_dir(dir) {
622            for entry in entries.flatten() {
623                let path = entry.path();
624
625                // 跳过元数据文件
626                if path
627                    .file_name()
628                    .is_some_and(|n| n == ".sandbox-metadata.json")
629                {
630                    continue;
631                }
632
633                if path.is_dir() {
634                    self.scan_directory(&path, files);
635                } else if path.is_file() {
636                    files.push(path);
637                }
638            }
639        }
640    }
641
642    /// 清理沙箱
643    pub fn cleanup(&self) -> Result<usize, String> {
644        // 释放所有锁
645        let released = self.lock_manager.release_all_locks(&self.config.worker_id);
646
647        // 删除沙箱目录
648        if self.sandbox_dir.exists() {
649            fs::remove_dir_all(&self.sandbox_dir)
650                .map_err(|e| format!("删除沙箱目录失败: {}", e))?;
651        }
652
653        Ok(released)
654    }
655
656    /// 获取沙箱目录
657    pub fn sandbox_dir(&self) -> &Path {
658        &self.sandbox_dir
659    }
660
661    /// 获取沙箱中的文件路径
662    pub fn get_sandbox_path(&self, relative_path: &str) -> PathBuf {
663        self.sandbox_dir.join(relative_path)
664    }
665
666    /// 检查文件是否在沙箱中
667    pub fn has_file(&self, relative_path: &str) -> bool {
668        self.get_sandbox_path(relative_path).exists()
669    }
670
671    /// 获取沙箱统计信息
672    pub fn get_stats(&self) -> SandboxStats {
673        let files = self.scan_sandbox_files();
674        let total_size: u64 = files
675            .iter()
676            .filter_map(|f| fs::metadata(f).ok())
677            .map(|m| m.len())
678            .sum();
679
680        SandboxStats {
681            file_count: files.len(),
682            total_size,
683            copied_files: self.copied_files.len(),
684        }
685    }
686}
687
/// Sandbox statistics.
#[derive(Debug, Clone)]
pub struct SandboxStats {
    /// Number of files currently in the sandbox.
    pub file_count: usize,
    /// Combined size of those files, in bytes.
    pub total_size: u64,
    /// Number of files with metadata recorded when they were copied in.
    pub copied_files: usize,
}
695
696// ============================================================================
697// 工厂函数
698// ============================================================================
699
700/// 创建全局文件锁管理器
701pub fn create_lock_manager(lock_dir: Option<PathBuf>) -> Arc<FileLockManager> {
702    Arc::new(FileLockManager::new(lock_dir))
703}
704
705/// 创建 Worker 沙箱
706pub fn create_worker_sandbox(
707    config: SandboxConfig,
708    lock_manager: Option<Arc<FileLockManager>>,
709) -> WorkerSandbox {
710    WorkerSandbox::new(config, lock_manager)
711}
712
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::temp_dir;

    /// The path hash is deterministic, input-dependent and 16 characters long.
    #[test]
    fn test_compute_string_hash() {
        let hash1 = compute_string_hash("test");
        let hash2 = compute_string_hash("test");
        let hash3 = compute_string_hash("different");

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
        assert_eq!(hash1.len(), 16);
    }

    /// Exercises acquire / query / re-entry / contention / release on one lock.
    #[test]
    fn test_file_lock_manager() {
        // A per-process directory, wiped up front, keeps concurrent or
        // previously aborted runs from interfering with this test.
        let lock_dir = temp_dir().join(format!("aster_test_locks_{}", std::process::id()));
        let _ = fs::remove_dir_all(&lock_dir);
        let manager = FileLockManager::new(Some(lock_dir.clone()));

        // Acquire the lock.
        assert_eq!(
            manager.acquire_lock("/test/file.rs", "worker1", None),
            Ok(true)
        );

        // Check the lock state.
        assert!(manager.is_locked("/test/file.rs"));
        assert_eq!(
            manager.get_locker("/test/file.rs"),
            Some("worker1".to_string())
        );

        // The holder may re-enter its own lock.
        assert_eq!(
            manager.acquire_lock("/test/file.rs", "worker1", None),
            Ok(true)
        );

        // Another worker can neither take nor release a held lock.
        assert_eq!(
            manager.acquire_lock("/test/file.rs", "worker2", None),
            Ok(false)
        );
        assert!(manager.release_lock("/test/file.rs", "worker2").is_err());
        assert!(manager.is_locked("/test/file.rs"));

        // Release the lock.
        let result = manager.release_lock("/test/file.rs", "worker1");
        assert!(result.is_ok());
        assert!(!manager.is_locked("/test/file.rs"));

        // Clean up.
        let _ = fs::remove_dir_all(lock_dir);
    }

    /// The configuration struct stores the values it is given.
    #[test]
    fn test_sandbox_config() {
        let config = SandboxConfig {
            worker_id: "test_worker".to_string(),
            task_id: "test_task".to_string(),
            base_dir: PathBuf::from("/tmp/test"),
            sandbox_dir: None,
        };

        assert_eq!(config.worker_id, "test_worker");
        assert_eq!(config.task_id, "test_task");
    }
}