1use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12use std::collections::HashMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, RwLock};
16
/// Configuration for a per-worker sandbox rooted under a base project directory.
#[derive(Debug, Clone)]
pub struct SandboxConfig {
    /// Unique identifier of the worker that owns the sandbox.
    pub worker_id: String,
    /// Identifier of the task this sandbox was created for.
    pub task_id: String,
    /// Project root the sandbox mirrors; files are copied from and synced back here.
    pub base_dir: PathBuf,
    /// Explicit sandbox location; `None` defaults to `~/.aster/sandbox/<worker_id>`.
    pub sandbox_dir: Option<PathBuf>,
}
33
/// Outcome of syncing sandbox files back to the base directory.
#[derive(Debug, Clone, Default)]
pub struct SyncResult {
    /// Relative paths that were copied back successfully.
    pub success: Vec<String>,
    /// Files that could not be synced, with the failure reason.
    pub failed: Vec<SyncFailure>,
    /// Files modified both in the sandbox and in the base directory.
    pub conflicts: Vec<SyncConflict>,
    /// Total number of sandbox files considered for sync.
    pub total: usize,
}
46
/// A single file that failed to sync back, with a human-readable error.
#[derive(Debug, Clone)]
pub struct SyncFailure {
    /// Path relative to the base directory.
    pub file: String,
    /// Description of what went wrong.
    pub error: String,
}
53
/// A file that changed in both the sandbox and the base directory and was
/// therefore not written back.
#[derive(Debug, Clone)]
pub struct SyncConflict {
    /// Path relative to the base directory.
    pub file: String,
    /// Why the file is considered conflicted.
    pub reason: String,
}
60
/// On-disk record of a held file lock, serialized as JSON into the lock file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockInfo {
    /// Worker that holds the lock.
    pub worker_id: String,
    /// OS process id of the locking process.
    pub pid: u32,
    /// Original path of the locked file (not the lock file itself).
    pub file_path: String,
    /// When the lock was taken.
    pub timestamp: DateTime<Utc>,
    /// Lock lifetime in milliseconds; older locks are treated as stale.
    pub timeout: u64,
}
75
/// Snapshot of a file at the moment it was copied into the sandbox; used by
/// `sync_back` to detect whether the sandbox and/or base copy changed since.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct FileMetadata {
    /// Path relative to the base/sandbox directory.
    relative_path: String,
    /// SHA-256 hex digest of the contents at copy time.
    hash: String,
    /// Modification time as seconds since the Unix epoch (0 when unreadable).
    mtime: i64,
    /// File size in bytes at copy time.
    size: u64,
}
89
90fn compute_file_hash(file_path: &Path) -> Result<String, std::io::Error> {
96 let content = fs::read(file_path)?;
97 let mut hasher = Sha256::new();
98 hasher.update(&content);
99 Ok(format!("{:x}", hasher.finalize()))
100}
101
102fn compute_string_hash(s: &str) -> String {
104 let mut hasher = Sha256::new();
105 hasher.update(s.as_bytes());
106 let hash = format!("{:x}", hasher.finalize());
107 hash.get(..16).unwrap_or(&hash).to_string()
108}
109
/// Recursively mirrors `src` into `dest`, creating `dest` when missing.
///
/// Regular files are copied with `fs::copy`; subdirectories recurse.
/// NOTE(review): `is_dir()` follows symlinks, so a symlinked directory is
/// materialized as a real directory tree in `dest` — confirm that is intended.
fn copy_directory_recursive(src: &Path, dest: &Path) -> Result<(), std::io::Error> {
    if !dest.exists() {
        fs::create_dir_all(dest)?;
    }

    for entry_result in fs::read_dir(src)? {
        let child = entry_result?;
        let from = child.path();
        let to = dest.join(child.file_name());

        if from.is_dir() {
            copy_directory_recursive(&from, &to)?;
        } else {
            fs::copy(&from, &to)?;
        }
    }

    Ok(())
}
130
131fn get_default_sandbox_root() -> PathBuf {
133 dirs::home_dir()
134 .unwrap_or_else(|| PathBuf::from("."))
135 .join(".aster")
136 .join("sandbox")
137}
138
/// Cross-process advisory file locking backed by JSON lock files on disk.
///
/// Each locked path maps to a hashed `<hash>.lock` file under `lock_dir`.
/// The on-disk files are the source of truth for lock decisions; `locks` is
/// an in-memory mirror that the code only writes, never consults to decide.
pub struct FileLockManager {
    /// Directory holding the `*.lock` files.
    lock_dir: PathBuf,
    /// In-memory mirror of held locks, keyed by the original file path.
    locks: Arc<RwLock<HashMap<String, LockInfo>>>,
    /// Default lock lifetime in milliseconds.
    default_timeout: u64,
}
153
154impl FileLockManager {
155 pub fn new(lock_dir: Option<PathBuf>) -> Self {
157 let lock_dir = lock_dir.unwrap_or_else(|| get_default_sandbox_root().join("locks"));
158
159 let _ = fs::create_dir_all(&lock_dir);
161
162 Self {
163 lock_dir,
164 locks: Arc::new(RwLock::new(HashMap::new())),
165 default_timeout: 300000, }
167 }
168
169 fn get_lock_file_path(&self, file_path: &str) -> PathBuf {
171 let hash = compute_string_hash(file_path);
172 self.lock_dir.join(format!("{}.lock", hash))
173 }
174
175 fn read_lock_info(&self, lock_file_path: &Path) -> Option<LockInfo> {
177 let content = fs::read_to_string(lock_file_path).ok()?;
178 serde_json::from_str(&content).ok()
179 }
180
181 fn write_lock_info(&self, lock_file_path: &Path, lock_info: &LockInfo) -> Result<(), String> {
183 let content = serde_json::to_string_pretty(lock_info)
184 .map_err(|e| format!("序列化锁信息失败: {}", e))?;
185 fs::write(lock_file_path, content).map_err(|e| format!("写入锁文件失败: {}", e))
186 }
187
188 fn is_lock_expired(&self, lock_info: &LockInfo) -> bool {
190 let now = Utc::now();
191 let elapsed = (now - lock_info.timestamp).num_milliseconds() as u64;
192 elapsed > lock_info.timeout
193 }
194
195 pub fn acquire_lock(
197 &self,
198 file_path: &str,
199 worker_id: &str,
200 timeout: Option<u64>,
201 ) -> Result<bool, String> {
202 let lock_file_path = self.get_lock_file_path(file_path);
203 let timeout = timeout.unwrap_or(self.default_timeout);
204
205 if lock_file_path.exists() {
207 if let Some(existing_lock) = self.read_lock_info(&lock_file_path) {
208 if existing_lock.worker_id == worker_id {
210 return Ok(true);
211 }
212
213 if self.is_lock_expired(&existing_lock) {
215 let _ = fs::remove_file(&lock_file_path);
217 } else {
218 return Ok(false);
220 }
221 }
222 }
223
224 let lock_info = LockInfo {
226 worker_id: worker_id.to_string(),
227 pid: std::process::id(),
228 file_path: file_path.to_string(),
229 timestamp: Utc::now(),
230 timeout,
231 };
232
233 self.write_lock_info(&lock_file_path, &lock_info)?;
235
236 if let Ok(mut locks) = self.locks.write() {
238 locks.insert(file_path.to_string(), lock_info);
239 }
240
241 Ok(true)
242 }
243
244 pub fn release_lock(&self, file_path: &str, worker_id: &str) -> Result<(), String> {
246 let lock_file_path = self.get_lock_file_path(file_path);
247
248 if !lock_file_path.exists() {
249 return Ok(());
250 }
251
252 if let Some(lock_info) = self.read_lock_info(&lock_file_path) {
253 if lock_info.worker_id != worker_id {
255 return Err(format!(
256 "无法释放锁:文件被 worker {} 锁定,而非 {}",
257 lock_info.worker_id, worker_id
258 ));
259 }
260 }
261
262 fs::remove_file(&lock_file_path).map_err(|e| format!("删除锁文件失败: {}", e))?;
263
264 if let Ok(mut locks) = self.locks.write() {
265 locks.remove(file_path);
266 }
267
268 Ok(())
269 }
270
271 pub fn is_locked(&self, file_path: &str) -> bool {
273 let lock_file_path = self.get_lock_file_path(file_path);
274
275 if !lock_file_path.exists() {
276 return false;
277 }
278
279 if let Some(lock_info) = self.read_lock_info(&lock_file_path) {
280 if self.is_lock_expired(&lock_info) {
281 let _ = fs::remove_file(&lock_file_path);
282 return false;
283 }
284 return true;
285 }
286
287 false
288 }
289
290 pub fn get_locker(&self, file_path: &str) -> Option<String> {
292 let lock_file_path = self.get_lock_file_path(file_path);
293
294 if !lock_file_path.exists() {
295 return None;
296 }
297
298 let lock_info = self.read_lock_info(&lock_file_path)?;
299
300 if self.is_lock_expired(&lock_info) {
301 let _ = fs::remove_file(&lock_file_path);
302 return None;
303 }
304
305 Some(lock_info.worker_id)
306 }
307
308 pub fn get_active_locks(&self) -> Vec<LockInfo> {
310 let mut locks = Vec::new();
311
312 if !self.lock_dir.exists() {
313 return locks;
314 }
315
316 if let Ok(entries) = fs::read_dir(&self.lock_dir) {
317 for entry in entries.flatten() {
318 let path = entry.path();
319 if path.extension().is_some_and(|ext| ext == "lock") {
320 if let Some(lock_info) = self.read_lock_info(&path) {
321 if !self.is_lock_expired(&lock_info) {
322 locks.push(lock_info);
323 }
324 }
325 }
326 }
327 }
328
329 locks
330 }
331
332 pub fn cleanup_stale_locks(&self) -> usize {
334 let mut cleaned = 0;
335
336 if !self.lock_dir.exists() {
337 return cleaned;
338 }
339
340 if let Ok(entries) = fs::read_dir(&self.lock_dir) {
341 for entry in entries.flatten() {
342 let path = entry.path();
343 if path.extension().is_some_and(|ext| ext == "lock") {
344 if let Some(lock_info) = self.read_lock_info(&path) {
345 if self.is_lock_expired(&lock_info) && fs::remove_file(&path).is_ok() {
346 cleaned += 1;
347 }
348 }
349 }
350 }
351 }
352
353 cleaned
354 }
355
356 pub fn release_all_locks(&self, worker_id: &str) -> usize {
358 let mut released = 0;
359
360 if !self.lock_dir.exists() {
361 return released;
362 }
363
364 if let Ok(entries) = fs::read_dir(&self.lock_dir) {
365 for entry in entries.flatten() {
366 let path = entry.path();
367 if path.extension().is_some_and(|ext| ext == "lock") {
368 if let Some(lock_info) = self.read_lock_info(&path) {
369 if lock_info.worker_id == worker_id && fs::remove_file(&path).is_ok() {
370 released += 1;
371 }
372 }
373 }
374 }
375 }
376
377 released
378 }
379}
380
381impl Default for FileLockManager {
382 fn default() -> Self {
383 Self::new(None)
384 }
385}
386
/// Per-worker working copy of selected files from a base directory, with
/// lock-guarded sync-back and hash-based conflict detection.
pub struct WorkerSandbox {
    /// Sandbox configuration (worker/task ids, base dir, optional dir override).
    config: SandboxConfig,
    /// Resolved sandbox directory on disk.
    sandbox_dir: PathBuf,
    /// Lock manager guarding base-directory files during sync-back.
    lock_manager: Arc<FileLockManager>,
    /// Copy-time metadata keyed by relative path; consulted by `sync_back`
    /// to skip unchanged files and to detect conflicts.
    copied_files: HashMap<String, FileMetadata>,
}
403
404impl WorkerSandbox {
405 pub fn new(config: SandboxConfig, lock_manager: Option<Arc<FileLockManager>>) -> Self {
407 let sandbox_dir = config
408 .sandbox_dir
409 .clone()
410 .unwrap_or_else(|| get_default_sandbox_root().join(&config.worker_id));
411
412 Self {
413 config,
414 sandbox_dir,
415 lock_manager: lock_manager.unwrap_or_else(|| Arc::new(FileLockManager::default())),
416 copied_files: HashMap::new(),
417 }
418 }
419
420 pub fn setup(&self) -> Result<(), String> {
422 fs::create_dir_all(&self.sandbox_dir).map_err(|e| format!("创建沙箱目录失败: {}", e))?;
424
425 let metadata_path = self.sandbox_dir.join(".sandbox-metadata.json");
427 let metadata = serde_json::json!({
428 "worker_id": self.config.worker_id,
429 "task_id": self.config.task_id,
430 "base_dir": self.config.base_dir.to_string_lossy(),
431 "created_at": Utc::now().to_rfc3339(),
432 "pid": std::process::id(),
433 });
434
435 fs::write(
436 &metadata_path,
437 serde_json::to_string_pretty(&metadata).unwrap(),
438 )
439 .map_err(|e| format!("写入元数据失败: {}", e))?;
440
441 Ok(())
442 }
443
444 pub fn copy_to_sandbox(&mut self, files: &[String]) -> Result<(), String> {
446 for file in files {
447 let absolute_path = if Path::new(file).is_absolute() {
448 PathBuf::from(file)
449 } else {
450 self.config.base_dir.join(file)
451 };
452
453 if !absolute_path.exists() {
454 continue;
455 }
456
457 let relative_path = absolute_path
459 .strip_prefix(&self.config.base_dir)
460 .map_err(|_| format!("文件不在基础目录内: {}", file))?
461 .to_string_lossy()
462 .to_string();
463
464 let sandbox_path = self.sandbox_dir.join(&relative_path);
465
466 if let Some(parent) = sandbox_path.parent() {
468 fs::create_dir_all(parent).map_err(|e| format!("创建目录失败: {}", e))?;
469 }
470
471 let metadata =
473 fs::metadata(&absolute_path).map_err(|e| format!("获取文件元数据失败: {}", e))?;
474
475 if metadata.is_dir() {
476 copy_directory_recursive(&absolute_path, &sandbox_path)
477 .map_err(|e| format!("复制目录失败: {}", e))?;
478 } else {
479 fs::copy(&absolute_path, &sandbox_path)
480 .map_err(|e| format!("复制文件失败: {}", e))?;
481
482 if let Ok(hash) = compute_file_hash(&absolute_path) {
484 self.copied_files.insert(
485 relative_path.clone(),
486 FileMetadata {
487 relative_path,
488 hash,
489 mtime: metadata
490 .modified()
491 .map(|t| {
492 t.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
493 as i64
494 })
495 .unwrap_or(0),
496 size: metadata.len(),
497 },
498 );
499 }
500 }
501 }
502
503 Ok(())
504 }
505
506 pub fn sync_back(&self) -> SyncResult {
508 let mut result = SyncResult::default();
509
510 let sandbox_files = self.scan_sandbox_files();
512 result.total = sandbox_files.len();
513
514 for sandbox_file in sandbox_files {
515 let relative_path = sandbox_file
516 .strip_prefix(&self.sandbox_dir)
517 .map(|p| p.to_string_lossy().to_string())
518 .unwrap_or_default();
519
520 let original_path = self.config.base_dir.join(&relative_path);
521
522 let sandbox_hash = match compute_file_hash(&sandbox_file) {
524 Ok(h) => h,
525 Err(e) => {
526 result.failed.push(SyncFailure {
527 file: relative_path,
528 error: format!("计算 hash 失败: {}", e),
529 });
530 continue;
531 }
532 };
533
534 if let Some(original_metadata) = self.copied_files.get(&relative_path) {
536 if original_metadata.hash == sandbox_hash {
537 continue;
539 }
540 }
541
542 let lock_acquired = self.lock_manager.acquire_lock(
544 original_path.to_str().unwrap_or(""),
545 &self.config.worker_id,
546 Some(60000),
547 );
548
549 match lock_acquired {
550 Ok(true) => {
551 if original_path.exists() {
553 if let Some(original_metadata) = self.copied_files.get(&relative_path) {
554 if let Ok(current_hash) = compute_file_hash(&original_path) {
555 if original_metadata.hash != current_hash {
556 result.conflicts.push(SyncConflict {
557 file: relative_path.clone(),
558 reason: "文件在沙箱和主目录中都被修改".to_string(),
559 });
560 let _ = self.lock_manager.release_lock(
561 original_path.to_str().unwrap_or(""),
562 &self.config.worker_id,
563 );
564 continue;
565 }
566 }
567 }
568 }
569
570 if let Some(parent) = original_path.parent() {
572 let _ = fs::create_dir_all(parent);
573 }
574
575 match fs::copy(&sandbox_file, &original_path) {
576 Ok(_) => result.success.push(relative_path.clone()),
577 Err(e) => result.failed.push(SyncFailure {
578 file: relative_path.clone(),
579 error: format!("复制文件失败: {}", e),
580 }),
581 }
582
583 let _ = self
585 .lock_manager
586 .release_lock(original_path.to_str().unwrap_or(""), &self.config.worker_id);
587 }
588 Ok(false) => {
589 let locker = self
590 .lock_manager
591 .get_locker(original_path.to_str().unwrap_or(""));
592 result.failed.push(SyncFailure {
593 file: relative_path,
594 error: format!("无法获取锁,被 {:?} 锁定", locker),
595 });
596 }
597 Err(e) => {
598 result.failed.push(SyncFailure {
599 file: relative_path,
600 error: e,
601 });
602 }
603 }
604 }
605
606 result
607 }
608
609 fn scan_sandbox_files(&self) -> Vec<PathBuf> {
611 let mut files = Vec::new();
612 self.scan_directory(&self.sandbox_dir, &mut files);
613 files
614 }
615
616 fn scan_directory(&self, dir: &Path, files: &mut Vec<PathBuf>) {
617 if !dir.exists() {
618 return;
619 }
620
621 if let Ok(entries) = fs::read_dir(dir) {
622 for entry in entries.flatten() {
623 let path = entry.path();
624
625 if path
627 .file_name()
628 .is_some_and(|n| n == ".sandbox-metadata.json")
629 {
630 continue;
631 }
632
633 if path.is_dir() {
634 self.scan_directory(&path, files);
635 } else if path.is_file() {
636 files.push(path);
637 }
638 }
639 }
640 }
641
642 pub fn cleanup(&self) -> Result<usize, String> {
644 let released = self.lock_manager.release_all_locks(&self.config.worker_id);
646
647 if self.sandbox_dir.exists() {
649 fs::remove_dir_all(&self.sandbox_dir)
650 .map_err(|e| format!("删除沙箱目录失败: {}", e))?;
651 }
652
653 Ok(released)
654 }
655
656 pub fn sandbox_dir(&self) -> &Path {
658 &self.sandbox_dir
659 }
660
661 pub fn get_sandbox_path(&self, relative_path: &str) -> PathBuf {
663 self.sandbox_dir.join(relative_path)
664 }
665
666 pub fn has_file(&self, relative_path: &str) -> bool {
668 self.get_sandbox_path(relative_path).exists()
669 }
670
671 pub fn get_stats(&self) -> SandboxStats {
673 let files = self.scan_sandbox_files();
674 let total_size: u64 = files
675 .iter()
676 .filter_map(|f| fs::metadata(f).ok())
677 .map(|m| m.len())
678 .sum();
679
680 SandboxStats {
681 file_count: files.len(),
682 total_size,
683 copied_files: self.copied_files.len(),
684 }
685 }
686}
687
/// Summary statistics for a sandbox, as reported by `WorkerSandbox::get_stats`.
#[derive(Debug, Clone)]
pub struct SandboxStats {
    /// Number of files currently in the sandbox (metadata file excluded).
    pub file_count: usize,
    /// Combined size of those files in bytes.
    pub total_size: u64,
    /// Number of files with recorded copy-time metadata.
    pub copied_files: usize,
}
695
696pub fn create_lock_manager(lock_dir: Option<PathBuf>) -> Arc<FileLockManager> {
702 Arc::new(FileLockManager::new(lock_dir))
703}
704
705pub fn create_worker_sandbox(
707 config: SandboxConfig,
708 lock_manager: Option<Arc<FileLockManager>>,
709) -> WorkerSandbox {
710 WorkerSandbox::new(config, lock_manager)
711}
712
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::temp_dir;

    /// Hashing is deterministic, input-sensitive, and truncated to 16 hex chars.
    #[test]
    fn test_compute_string_hash() {
        let hash1 = compute_string_hash("test");
        let hash2 = compute_string_hash("test");
        let hash3 = compute_string_hash("different");

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
        assert_eq!(hash1.len(), 16);
    }

    /// Full acquire → inspect → release cycle against a temp lock directory.
    #[test]
    fn test_file_lock_manager() {
        let lock_dir = temp_dir().join("aster_test_locks");
        let manager = FileLockManager::new(Some(lock_dir.clone()));

        // First acquisition by worker1 must succeed.
        let result = manager.acquire_lock("/test/file.rs", "worker1", None);
        assert!(result.is_ok());
        assert!(result.unwrap());

        // While held, the path reads as locked and names the holder.
        assert!(manager.is_locked("/test/file.rs"));
        assert_eq!(
            manager.get_locker("/test/file.rs"),
            Some("worker1".to_string())
        );

        // The holder can release; afterwards the path reads as unlocked.
        let result = manager.release_lock("/test/file.rs", "worker1");
        assert!(result.is_ok());
        assert!(!manager.is_locked("/test/file.rs"));

        let _ = fs::remove_dir_all(lock_dir);
    }

    /// `SandboxConfig` construction keeps the provided identifiers.
    #[test]
    fn test_sandbox_config() {
        let config = SandboxConfig {
            worker_id: "test_worker".to_string(),
            task_id: "test_task".to_string(),
            base_dir: PathBuf::from("/tmp/test"),
            sandbox_dir: None,
        };

        assert_eq!(config.worker_id, "test_worker");
        assert_eq!(config.task_id, "test_task");
    }
}
767}