Skip to main content

torsh_package/
dependency_installer.rs

1//! Parallel dependency installation with async support
2//!
3//! This module provides functionality for downloading and installing
4//! package dependencies in parallel with progress tracking and retry logic.
5
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use scirs2_core::parallel_ops::*;
12use serde::{Deserialize, Serialize};
13use torsh_core::error::{Result, TorshError};
14
15use crate::dependency::{PackageRegistry, ResolvedDependency};
16use crate::dependency_lockfile::LockedDependency;
17
18/// Download options for parallel installation
19#[derive(Debug, Clone)]
20pub struct DownloadOptions {
21    /// Maximum number of parallel downloads
22    pub max_parallel: usize,
23    /// Connection timeout in seconds
24    pub timeout_secs: u64,
25    /// Number of retry attempts for failed downloads
26    pub max_retries: usize,
27    /// Delay between retries in milliseconds
28    pub retry_delay_ms: u64,
29    /// Download buffer size in bytes
30    pub buffer_size: usize,
31    /// Whether to verify package integrity after download
32    pub verify_integrity: bool,
33    /// Whether to resume partial downloads
34    pub resume_partial: bool,
35}
36
37/// Installation progress tracking
38#[derive(Debug, Clone)]
39pub struct InstallationProgress {
40    /// Total number of packages to install
41    pub total_packages: usize,
42    /// Number of packages downloaded
43    pub downloaded: Arc<AtomicUsize>,
44    /// Number of packages installed
45    pub installed: Arc<AtomicUsize>,
46    /// Number of packages failed
47    pub failed: Arc<AtomicUsize>,
48    /// Total bytes to download
49    pub total_bytes: u64,
50    /// Bytes downloaded so far
51    pub downloaded_bytes: Arc<AtomicU64>,
52    /// Start time of installation
53    pub start_time: Instant,
54}
55
56/// Installation plan with dependency ordering
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct InstallationPlan {
59    /// Packages to install in dependency order
60    pub packages: Vec<PlannedPackage>,
61    /// Total estimated download size
62    pub total_size: u64,
63    /// Estimated installation time in seconds
64    pub estimated_time: u64,
65}
66
67/// Planned package installation
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct PlannedPackage {
70    /// Package name
71    pub name: String,
72    /// Package version
73    pub version: String,
74    /// Installation priority (lower = higher priority)
75    pub priority: usize,
76    /// Dependencies that must be installed first
77    pub depends_on: Vec<String>,
78    /// Estimated download size in bytes
79    pub size: u64,
80}
81
82/// Parallel dependency installer
83pub struct ParallelDependencyInstaller {
84    /// Download options
85    options: DownloadOptions,
86    /// Package registry for downloading
87    registry: Arc<dyn PackageRegistry>,
88    /// Installation directory
89    install_dir: PathBuf,
90    /// Progress tracker
91    progress: Arc<InstallationProgress>,
92}
93
94/// Installation result for a single package
95#[derive(Debug)]
96pub struct InstallationResult {
97    /// Package name
98    pub name: String,
99    /// Package version
100    pub version: String,
101    /// Whether installation succeeded
102    pub success: bool,
103    /// Error message if failed
104    pub error: Option<String>,
105    /// Time taken to install in milliseconds
106    pub duration_ms: u64,
107    /// Bytes downloaded
108    pub bytes_downloaded: u64,
109}
110
111/// Installation statistics
112#[derive(Debug, Clone)]
113pub struct InstallationStatistics {
114    /// Total packages installed
115    pub total_installed: usize,
116    /// Total packages failed
117    pub total_failed: usize,
118    /// Total time taken in seconds
119    pub total_time_secs: f64,
120    /// Total bytes downloaded
121    pub total_bytes: u64,
122    /// Average download speed in bytes/sec
123    pub avg_download_speed: f64,
124    /// Packages installed per second
125    pub packages_per_sec: f64,
126}
127
128impl Default for DownloadOptions {
129    fn default() -> Self {
130        Self {
131            max_parallel: 8,
132            timeout_secs: 300,
133            max_retries: 3,
134            retry_delay_ms: 1000,
135            buffer_size: 8192,
136            verify_integrity: true,
137            resume_partial: true,
138        }
139    }
140}
141
142impl DownloadOptions {
143    /// Create new download options with defaults
144    pub fn new() -> Self {
145        Self::default()
146    }
147
148    /// Set maximum parallel downloads
149    pub fn with_max_parallel(mut self, max: usize) -> Self {
150        self.max_parallel = max;
151        self
152    }
153
154    /// Set connection timeout
155    pub fn with_timeout(mut self, secs: u64) -> Self {
156        self.timeout_secs = secs;
157        self
158    }
159
160    /// Set maximum retries
161    pub fn with_max_retries(mut self, retries: usize) -> Self {
162        self.max_retries = retries;
163        self
164    }
165
166    /// Enable/disable integrity verification
167    pub fn with_verify_integrity(mut self, verify: bool) -> Self {
168        self.verify_integrity = verify;
169        self
170    }
171}
172
173impl InstallationProgress {
174    /// Create new progress tracker
175    pub fn new(total_packages: usize, total_bytes: u64) -> Self {
176        Self {
177            total_packages,
178            downloaded: Arc::new(AtomicUsize::new(0)),
179            installed: Arc::new(AtomicUsize::new(0)),
180            failed: Arc::new(AtomicUsize::new(0)),
181            total_bytes,
182            downloaded_bytes: Arc::new(AtomicU64::new(0)),
183            start_time: Instant::now(),
184        }
185    }
186
187    /// Get current progress as percentage (0-100)
188    pub fn percentage(&self) -> f64 {
189        if self.total_packages == 0 {
190            return 100.0;
191        }
192        (self.installed.load(Ordering::Relaxed) as f64 / self.total_packages as f64) * 100.0
193    }
194
195    /// Get download progress as percentage (0-100)
196    pub fn download_percentage(&self) -> f64 {
197        if self.total_bytes == 0 {
198            return 100.0;
199        }
200        (self.downloaded_bytes.load(Ordering::Relaxed) as f64 / self.total_bytes as f64) * 100.0
201    }
202
203    /// Get elapsed time in seconds
204    pub fn elapsed_secs(&self) -> f64 {
205        self.start_time.elapsed().as_secs_f64()
206    }
207
208    /// Get estimated remaining time in seconds
209    pub fn estimated_remaining_secs(&self) -> f64 {
210        let installed = self.installed.load(Ordering::Relaxed);
211        if installed == 0 {
212            return 0.0;
213        }
214
215        let elapsed = self.elapsed_secs();
216        let rate = installed as f64 / elapsed;
217        let remaining = self.total_packages - installed;
218
219        remaining as f64 / rate
220    }
221
222    /// Get download speed in bytes/sec
223    pub fn download_speed(&self) -> f64 {
224        let elapsed = self.elapsed_secs();
225        if elapsed == 0.0 {
226            return 0.0;
227        }
228
229        self.downloaded_bytes.load(Ordering::Relaxed) as f64 / elapsed
230    }
231
232    /// Mark a package as downloaded
233    pub fn mark_downloaded(&self, bytes: u64) {
234        self.downloaded.fetch_add(1, Ordering::Relaxed);
235        self.downloaded_bytes.fetch_add(bytes, Ordering::Relaxed);
236    }
237
238    /// Mark a package as installed
239    pub fn mark_installed(&self) {
240        self.installed.fetch_add(1, Ordering::Relaxed);
241    }
242
243    /// Mark a package as failed
244    pub fn mark_failed(&self) {
245        self.failed.fetch_add(1, Ordering::Relaxed);
246    }
247}
248
249impl InstallationPlan {
250    /// Create a new installation plan
251    pub fn new() -> Self {
252        Self {
253            packages: Vec::new(),
254            total_size: 0,
255            estimated_time: 0,
256        }
257    }
258
259    /// Add a package to the plan
260    pub fn add_package(&mut self, package: PlannedPackage) {
261        self.total_size += package.size;
262        self.packages.push(package);
263    }
264
265    /// Sort packages by dependency order
266    pub fn sort_by_dependencies(&mut self) -> Result<()> {
267        // Topological sort
268        let mut sorted = Vec::new();
269        let mut visited = std::collections::HashSet::new();
270        let mut visiting = std::collections::HashSet::new();
271
272        for pkg in &self.packages {
273            if !visited.contains(&pkg.name) {
274                self.visit_package(&pkg.name, &mut visited, &mut visiting, &mut sorted)?;
275            }
276        }
277
278        self.packages = sorted;
279        Ok(())
280    }
281
282    /// Visit package for topological sort (DFS)
283    fn visit_package(
284        &self,
285        name: &str,
286        visited: &mut std::collections::HashSet<String>,
287        visiting: &mut std::collections::HashSet<String>,
288        sorted: &mut Vec<PlannedPackage>,
289    ) -> Result<()> {
290        if visiting.contains(name) {
291            return Err(TorshError::InvalidArgument(format!(
292                "Circular dependency detected: {}",
293                name
294            )));
295        }
296
297        if visited.contains(name) {
298            return Ok(());
299        }
300
301        visiting.insert(name.to_string());
302
303        // Find the package
304        let package = self
305            .packages
306            .iter()
307            .find(|p| p.name == name)
308            .ok_or_else(|| {
309                TorshError::InvalidArgument(format!("Package not found in plan: {}", name))
310            })?;
311
312        // Visit dependencies first
313        for dep in &package.depends_on {
314            self.visit_package(dep, visited, visiting, sorted)?;
315        }
316
317        visiting.remove(name);
318        visited.insert(name.to_string());
319        sorted.push(package.clone());
320
321        Ok(())
322    }
323
324    /// Get packages by priority level
325    pub fn get_by_priority(&self, priority: usize) -> Vec<&PlannedPackage> {
326        self.packages
327            .iter()
328            .filter(|p| p.priority == priority)
329            .collect()
330    }
331
332    /// Estimate total installation time based on size and bandwidth
333    pub fn estimate_time(&mut self, bandwidth_bytes_per_sec: u64) {
334        if bandwidth_bytes_per_sec > 0 {
335            self.estimated_time = self.total_size / bandwidth_bytes_per_sec;
336        }
337    }
338}
339
340impl Default for InstallationPlan {
341    fn default() -> Self {
342        Self::new()
343    }
344}
345
346impl ParallelDependencyInstaller {
347    /// Create a new parallel installer
348    pub fn new(
349        registry: Arc<dyn PackageRegistry>,
350        install_dir: PathBuf,
351        options: DownloadOptions,
352    ) -> Self {
353        Self {
354            options,
355            registry,
356            install_dir,
357            progress: Arc::new(InstallationProgress::new(0, 0)),
358        }
359    }
360
361    /// Create installation plan from resolved dependencies
362    pub fn create_plan(&self, dependencies: &[ResolvedDependency]) -> Result<InstallationPlan> {
363        let mut plan = InstallationPlan::new();
364
365        for (priority, dep) in dependencies.iter().enumerate() {
366            let package_info = self
367                .registry
368                .get_package_info(&dep.spec.name, &dep.resolved_version)?;
369
370            let planned = PlannedPackage {
371                name: dep.spec.name.clone(),
372                version: dep.resolved_version.clone(),
373                priority,
374                depends_on: dep
375                    .dependencies
376                    .iter()
377                    .map(|d| d.spec.name.clone())
378                    .collect(),
379                size: package_info.size,
380            };
381
382            plan.add_package(planned);
383        }
384
385        // Sort by dependencies
386        plan.sort_by_dependencies()?;
387
388        // Estimate installation time (assuming 10 MB/s bandwidth)
389        plan.estimate_time(10 * 1024 * 1024);
390
391        Ok(plan)
392    }
393
394    /// Create installation plan from locked dependencies
395    pub fn create_plan_from_lockfile(
396        &self,
397        dependencies: &[LockedDependency],
398    ) -> Result<InstallationPlan> {
399        let mut plan = InstallationPlan::new();
400
401        for (priority, dep) in dependencies.iter().enumerate() {
402            let planned = PlannedPackage {
403                name: dep.name.clone(),
404                version: dep.version.clone(),
405                priority,
406                depends_on: dep.dependencies.clone(),
407                size: 1024 * 1024, // Default 1MB, would be fetched from registry
408            };
409
410            plan.add_package(planned);
411        }
412
413        plan.sort_by_dependencies()?;
414        plan.estimate_time(10 * 1024 * 1024);
415
416        Ok(plan)
417    }
418
419    /// Install dependencies according to plan
420    pub fn install(&mut self, plan: &InstallationPlan) -> Result<InstallationStatistics> {
421        let total_packages = plan.packages.len();
422        let total_bytes = plan.total_size;
423
424        self.progress = Arc::new(InstallationProgress::new(total_packages, total_bytes));
425
426        // Group packages by priority level for parallel installation
427        let max_priority = plan.packages.iter().map(|p| p.priority).max().unwrap_or(0);
428
429        let mut results = Vec::new();
430
431        // Install packages level by level
432        for level in 0..=max_priority {
433            let level_packages: Vec<_> = plan.get_by_priority(level);
434
435            if level_packages.is_empty() {
436                continue;
437            }
438
439            // Install this level in parallel
440            let level_results = self.install_parallel(&level_packages)?;
441            results.extend(level_results);
442        }
443
444        // Compute statistics
445        let stats = self.compute_statistics(&results);
446
447        Ok(stats)
448    }
449
450    /// Install packages in parallel
451    fn install_parallel(&self, packages: &[&PlannedPackage]) -> Result<Vec<InstallationResult>> {
452        let _chunk_size = (packages.len() / self.options.max_parallel).max(1);
453
454        // Use scirs2-core parallel operations
455        let results: Vec<_> = packages
456            .into_par_iter()
457            .map(|pkg| self.install_package(pkg))
458            .collect();
459
460        // Check for errors
461        for result in &results {
462            if !result.success {
463                if let Some(error) = &result.error {
464                    // Log error but continue with other packages
465                    eprintln!("Failed to install {}: {}", result.name, error);
466                }
467            }
468        }
469
470        Ok(results)
471    }
472
473    /// Install a single package
474    fn install_package(&self, package: &PlannedPackage) -> InstallationResult {
475        let start_time = Instant::now();
476        let mut bytes_downloaded = 0u64;
477
478        let result = self.install_package_with_retry(package, &mut bytes_downloaded);
479
480        let duration_ms = start_time.elapsed().as_millis() as u64;
481
482        match result {
483            Ok(()) => {
484                self.progress.mark_installed();
485                InstallationResult {
486                    name: package.name.clone(),
487                    version: package.version.clone(),
488                    success: true,
489                    error: None,
490                    duration_ms,
491                    bytes_downloaded,
492                }
493            }
494            Err(e) => {
495                self.progress.mark_failed();
496                InstallationResult {
497                    name: package.name.clone(),
498                    version: package.version.clone(),
499                    success: false,
500                    error: Some(e.to_string()),
501                    duration_ms,
502                    bytes_downloaded,
503                }
504            }
505        }
506    }
507
508    /// Install package with retry logic
509    fn install_package_with_retry(
510        &self,
511        package: &PlannedPackage,
512        bytes_downloaded: &mut u64,
513    ) -> Result<()> {
514        let mut last_error = None;
515
516        for attempt in 0..=self.options.max_retries {
517            if attempt > 0 {
518                // Wait before retry
519                std::thread::sleep(Duration::from_millis(self.options.retry_delay_ms));
520            }
521
522            match self.download_and_install(package, bytes_downloaded) {
523                Ok(()) => return Ok(()),
524                Err(e) => {
525                    last_error = Some(e);
526                    if attempt < self.options.max_retries {
527                        eprintln!(
528                            "Download failed for {} (attempt {}/{}), retrying...",
529                            package.name,
530                            attempt + 1,
531                            self.options.max_retries
532                        );
533                    }
534                }
535            }
536        }
537
538        Err(last_error.unwrap_or_else(|| {
539            TorshError::InvalidArgument("Installation failed with unknown error".to_string())
540        }))
541    }
542
543    /// Download and install a package
544    fn download_and_install(
545        &self,
546        package: &PlannedPackage,
547        bytes_downloaded: &mut u64,
548    ) -> Result<()> {
549        let package_path = self
550            .install_dir
551            .join(format!("{}-{}.torshpkg", package.name, package.version));
552
553        // Download the package
554        self.registry
555            .download_package(&package.name, &package.version, &package_path)?;
556
557        // Update progress
558        *bytes_downloaded = package.size;
559        self.progress.mark_downloaded(package.size);
560
561        // Verify integrity if enabled
562        if self.options.verify_integrity {
563            self.verify_package_integrity(&package_path)?;
564        }
565
566        Ok(())
567    }
568
569    /// Verify package integrity
570    fn verify_package_integrity(&self, _package_path: &Path) -> Result<()> {
571        // Simplified verification - in production, check against lockfile hash
572        Ok(())
573    }
574
575    /// Compute installation statistics
576    fn compute_statistics(&self, results: &[InstallationResult]) -> InstallationStatistics {
577        let total_installed = results.iter().filter(|r| r.success).count();
578        let total_failed = results.iter().filter(|r| !r.success).count();
579        let total_bytes: u64 = results.iter().map(|r| r.bytes_downloaded).sum();
580        let total_time_secs = self.progress.elapsed_secs();
581
582        let avg_download_speed = if total_time_secs > 0.0 {
583            total_bytes as f64 / total_time_secs
584        } else {
585            0.0
586        };
587
588        let packages_per_sec = if total_time_secs > 0.0 {
589            total_installed as f64 / total_time_secs
590        } else {
591            0.0
592        };
593
594        InstallationStatistics {
595            total_installed,
596            total_failed,
597            total_time_secs,
598            total_bytes,
599            avg_download_speed,
600            packages_per_sec,
601        }
602    }
603
604    /// Get current progress
605    pub fn get_progress(&self) -> &InstallationProgress {
606        &self.progress
607    }
608}
609
610#[cfg(test)]
611mod tests {
612    use super::*;
613
614    #[test]
615    fn test_download_options() {
616        let options = DownloadOptions::new()
617            .with_max_parallel(16)
618            .with_timeout(600)
619            .with_max_retries(5);
620
621        assert_eq!(options.max_parallel, 16);
622        assert_eq!(options.timeout_secs, 600);
623        assert_eq!(options.max_retries, 5);
624    }
625
626    #[test]
627    fn test_installation_progress() {
628        let progress = InstallationProgress::new(10, 1000);
629
630        progress.mark_downloaded(100);
631        assert_eq!(progress.downloaded.load(Ordering::Relaxed), 1);
632        assert_eq!(progress.downloaded_bytes.load(Ordering::Relaxed), 100);
633
634        progress.mark_installed();
635        assert_eq!(progress.installed.load(Ordering::Relaxed), 1);
636
637        assert_eq!(progress.percentage(), 10.0);
638        assert_eq!(progress.download_percentage(), 10.0);
639    }
640
641    #[test]
642    fn test_installation_plan() {
643        let mut plan = InstallationPlan::new();
644
645        let pkg1 = PlannedPackage {
646            name: "pkg1".to_string(),
647            version: "1.0.0".to_string(),
648            priority: 0,
649            depends_on: vec![],
650            size: 1000,
651        };
652
653        let pkg2 = PlannedPackage {
654            name: "pkg2".to_string(),
655            version: "1.0.0".to_string(),
656            priority: 1,
657            depends_on: vec!["pkg1".to_string()],
658            size: 2000,
659        };
660
661        plan.add_package(pkg1);
662        plan.add_package(pkg2);
663
664        assert_eq!(plan.total_size, 3000);
665        assert_eq!(plan.packages.len(), 2);
666    }
667
668    #[test]
669    fn test_topological_sort() {
670        let mut plan = InstallationPlan::new();
671
672        // Create dependency chain: pkg3 -> pkg2 -> pkg1
673        plan.add_package(PlannedPackage {
674            name: "pkg3".to_string(),
675            version: "1.0.0".to_string(),
676            priority: 0,
677            depends_on: vec!["pkg2".to_string()],
678            size: 1000,
679        });
680
681        plan.add_package(PlannedPackage {
682            name: "pkg2".to_string(),
683            version: "1.0.0".to_string(),
684            priority: 1,
685            depends_on: vec!["pkg1".to_string()],
686            size: 1000,
687        });
688
689        plan.add_package(PlannedPackage {
690            name: "pkg1".to_string(),
691            version: "1.0.0".to_string(),
692            priority: 2,
693            depends_on: vec![],
694            size: 1000,
695        });
696
697        plan.sort_by_dependencies().unwrap();
698
699        // pkg1 should be first, then pkg2, then pkg3
700        assert_eq!(plan.packages[0].name, "pkg1");
701        assert_eq!(plan.packages[1].name, "pkg2");
702        assert_eq!(plan.packages[2].name, "pkg3");
703    }
704}