1use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use scirs2_core::parallel_ops::*;
12use serde::{Deserialize, Serialize};
13use torsh_core::error::{Result, TorshError};
14
15use crate::dependency::{PackageRegistry, ResolvedDependency};
16use crate::dependency_lockfile::LockedDependency;
17
18#[derive(Debug, Clone)]
20pub struct DownloadOptions {
21 pub max_parallel: usize,
23 pub timeout_secs: u64,
25 pub max_retries: usize,
27 pub retry_delay_ms: u64,
29 pub buffer_size: usize,
31 pub verify_integrity: bool,
33 pub resume_partial: bool,
35}
36
37#[derive(Debug, Clone)]
39pub struct InstallationProgress {
40 pub total_packages: usize,
42 pub downloaded: Arc<AtomicUsize>,
44 pub installed: Arc<AtomicUsize>,
46 pub failed: Arc<AtomicUsize>,
48 pub total_bytes: u64,
50 pub downloaded_bytes: Arc<AtomicU64>,
52 pub start_time: Instant,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct InstallationPlan {
59 pub packages: Vec<PlannedPackage>,
61 pub total_size: u64,
63 pub estimated_time: u64,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct PlannedPackage {
70 pub name: String,
72 pub version: String,
74 pub priority: usize,
76 pub depends_on: Vec<String>,
78 pub size: u64,
80}
81
82pub struct ParallelDependencyInstaller {
84 options: DownloadOptions,
86 registry: Arc<dyn PackageRegistry>,
88 install_dir: PathBuf,
90 progress: Arc<InstallationProgress>,
92}
93
94#[derive(Debug)]
96pub struct InstallationResult {
97 pub name: String,
99 pub version: String,
101 pub success: bool,
103 pub error: Option<String>,
105 pub duration_ms: u64,
107 pub bytes_downloaded: u64,
109}
110
111#[derive(Debug, Clone)]
113pub struct InstallationStatistics {
114 pub total_installed: usize,
116 pub total_failed: usize,
118 pub total_time_secs: f64,
120 pub total_bytes: u64,
122 pub avg_download_speed: f64,
124 pub packages_per_sec: f64,
126}
127
128impl Default for DownloadOptions {
129 fn default() -> Self {
130 Self {
131 max_parallel: 8,
132 timeout_secs: 300,
133 max_retries: 3,
134 retry_delay_ms: 1000,
135 buffer_size: 8192,
136 verify_integrity: true,
137 resume_partial: true,
138 }
139 }
140}
141
142impl DownloadOptions {
143 pub fn new() -> Self {
145 Self::default()
146 }
147
148 pub fn with_max_parallel(mut self, max: usize) -> Self {
150 self.max_parallel = max;
151 self
152 }
153
154 pub fn with_timeout(mut self, secs: u64) -> Self {
156 self.timeout_secs = secs;
157 self
158 }
159
160 pub fn with_max_retries(mut self, retries: usize) -> Self {
162 self.max_retries = retries;
163 self
164 }
165
166 pub fn with_verify_integrity(mut self, verify: bool) -> Self {
168 self.verify_integrity = verify;
169 self
170 }
171}
172
173impl InstallationProgress {
174 pub fn new(total_packages: usize, total_bytes: u64) -> Self {
176 Self {
177 total_packages,
178 downloaded: Arc::new(AtomicUsize::new(0)),
179 installed: Arc::new(AtomicUsize::new(0)),
180 failed: Arc::new(AtomicUsize::new(0)),
181 total_bytes,
182 downloaded_bytes: Arc::new(AtomicU64::new(0)),
183 start_time: Instant::now(),
184 }
185 }
186
187 pub fn percentage(&self) -> f64 {
189 if self.total_packages == 0 {
190 return 100.0;
191 }
192 (self.installed.load(Ordering::Relaxed) as f64 / self.total_packages as f64) * 100.0
193 }
194
195 pub fn download_percentage(&self) -> f64 {
197 if self.total_bytes == 0 {
198 return 100.0;
199 }
200 (self.downloaded_bytes.load(Ordering::Relaxed) as f64 / self.total_bytes as f64) * 100.0
201 }
202
203 pub fn elapsed_secs(&self) -> f64 {
205 self.start_time.elapsed().as_secs_f64()
206 }
207
208 pub fn estimated_remaining_secs(&self) -> f64 {
210 let installed = self.installed.load(Ordering::Relaxed);
211 if installed == 0 {
212 return 0.0;
213 }
214
215 let elapsed = self.elapsed_secs();
216 let rate = installed as f64 / elapsed;
217 let remaining = self.total_packages - installed;
218
219 remaining as f64 / rate
220 }
221
222 pub fn download_speed(&self) -> f64 {
224 let elapsed = self.elapsed_secs();
225 if elapsed == 0.0 {
226 return 0.0;
227 }
228
229 self.downloaded_bytes.load(Ordering::Relaxed) as f64 / elapsed
230 }
231
232 pub fn mark_downloaded(&self, bytes: u64) {
234 self.downloaded.fetch_add(1, Ordering::Relaxed);
235 self.downloaded_bytes.fetch_add(bytes, Ordering::Relaxed);
236 }
237
238 pub fn mark_installed(&self) {
240 self.installed.fetch_add(1, Ordering::Relaxed);
241 }
242
243 pub fn mark_failed(&self) {
245 self.failed.fetch_add(1, Ordering::Relaxed);
246 }
247}
248
249impl InstallationPlan {
250 pub fn new() -> Self {
252 Self {
253 packages: Vec::new(),
254 total_size: 0,
255 estimated_time: 0,
256 }
257 }
258
259 pub fn add_package(&mut self, package: PlannedPackage) {
261 self.total_size += package.size;
262 self.packages.push(package);
263 }
264
265 pub fn sort_by_dependencies(&mut self) -> Result<()> {
267 let mut sorted = Vec::new();
269 let mut visited = std::collections::HashSet::new();
270 let mut visiting = std::collections::HashSet::new();
271
272 for pkg in &self.packages {
273 if !visited.contains(&pkg.name) {
274 self.visit_package(&pkg.name, &mut visited, &mut visiting, &mut sorted)?;
275 }
276 }
277
278 self.packages = sorted;
279 Ok(())
280 }
281
282 fn visit_package(
284 &self,
285 name: &str,
286 visited: &mut std::collections::HashSet<String>,
287 visiting: &mut std::collections::HashSet<String>,
288 sorted: &mut Vec<PlannedPackage>,
289 ) -> Result<()> {
290 if visiting.contains(name) {
291 return Err(TorshError::InvalidArgument(format!(
292 "Circular dependency detected: {}",
293 name
294 )));
295 }
296
297 if visited.contains(name) {
298 return Ok(());
299 }
300
301 visiting.insert(name.to_string());
302
303 let package = self
305 .packages
306 .iter()
307 .find(|p| p.name == name)
308 .ok_or_else(|| {
309 TorshError::InvalidArgument(format!("Package not found in plan: {}", name))
310 })?;
311
312 for dep in &package.depends_on {
314 self.visit_package(dep, visited, visiting, sorted)?;
315 }
316
317 visiting.remove(name);
318 visited.insert(name.to_string());
319 sorted.push(package.clone());
320
321 Ok(())
322 }
323
324 pub fn get_by_priority(&self, priority: usize) -> Vec<&PlannedPackage> {
326 self.packages
327 .iter()
328 .filter(|p| p.priority == priority)
329 .collect()
330 }
331
332 pub fn estimate_time(&mut self, bandwidth_bytes_per_sec: u64) {
334 if bandwidth_bytes_per_sec > 0 {
335 self.estimated_time = self.total_size / bandwidth_bytes_per_sec;
336 }
337 }
338}
339
340impl Default for InstallationPlan {
341 fn default() -> Self {
342 Self::new()
343 }
344}
345
346impl ParallelDependencyInstaller {
347 pub fn new(
349 registry: Arc<dyn PackageRegistry>,
350 install_dir: PathBuf,
351 options: DownloadOptions,
352 ) -> Self {
353 Self {
354 options,
355 registry,
356 install_dir,
357 progress: Arc::new(InstallationProgress::new(0, 0)),
358 }
359 }
360
361 pub fn create_plan(&self, dependencies: &[ResolvedDependency]) -> Result<InstallationPlan> {
363 let mut plan = InstallationPlan::new();
364
365 for (priority, dep) in dependencies.iter().enumerate() {
366 let package_info = self
367 .registry
368 .get_package_info(&dep.spec.name, &dep.resolved_version)?;
369
370 let planned = PlannedPackage {
371 name: dep.spec.name.clone(),
372 version: dep.resolved_version.clone(),
373 priority,
374 depends_on: dep
375 .dependencies
376 .iter()
377 .map(|d| d.spec.name.clone())
378 .collect(),
379 size: package_info.size,
380 };
381
382 plan.add_package(planned);
383 }
384
385 plan.sort_by_dependencies()?;
387
388 plan.estimate_time(10 * 1024 * 1024);
390
391 Ok(plan)
392 }
393
394 pub fn create_plan_from_lockfile(
396 &self,
397 dependencies: &[LockedDependency],
398 ) -> Result<InstallationPlan> {
399 let mut plan = InstallationPlan::new();
400
401 for (priority, dep) in dependencies.iter().enumerate() {
402 let planned = PlannedPackage {
403 name: dep.name.clone(),
404 version: dep.version.clone(),
405 priority,
406 depends_on: dep.dependencies.clone(),
407 size: 1024 * 1024, };
409
410 plan.add_package(planned);
411 }
412
413 plan.sort_by_dependencies()?;
414 plan.estimate_time(10 * 1024 * 1024);
415
416 Ok(plan)
417 }
418
419 pub fn install(&mut self, plan: &InstallationPlan) -> Result<InstallationStatistics> {
421 let total_packages = plan.packages.len();
422 let total_bytes = plan.total_size;
423
424 self.progress = Arc::new(InstallationProgress::new(total_packages, total_bytes));
425
426 let max_priority = plan.packages.iter().map(|p| p.priority).max().unwrap_or(0);
428
429 let mut results = Vec::new();
430
431 for level in 0..=max_priority {
433 let level_packages: Vec<_> = plan.get_by_priority(level);
434
435 if level_packages.is_empty() {
436 continue;
437 }
438
439 let level_results = self.install_parallel(&level_packages)?;
441 results.extend(level_results);
442 }
443
444 let stats = self.compute_statistics(&results);
446
447 Ok(stats)
448 }
449
450 fn install_parallel(&self, packages: &[&PlannedPackage]) -> Result<Vec<InstallationResult>> {
452 let _chunk_size = (packages.len() / self.options.max_parallel).max(1);
453
454 let results: Vec<_> = packages
456 .into_par_iter()
457 .map(|pkg| self.install_package(pkg))
458 .collect();
459
460 for result in &results {
462 if !result.success {
463 if let Some(error) = &result.error {
464 eprintln!("Failed to install {}: {}", result.name, error);
466 }
467 }
468 }
469
470 Ok(results)
471 }
472
473 fn install_package(&self, package: &PlannedPackage) -> InstallationResult {
475 let start_time = Instant::now();
476 let mut bytes_downloaded = 0u64;
477
478 let result = self.install_package_with_retry(package, &mut bytes_downloaded);
479
480 let duration_ms = start_time.elapsed().as_millis() as u64;
481
482 match result {
483 Ok(()) => {
484 self.progress.mark_installed();
485 InstallationResult {
486 name: package.name.clone(),
487 version: package.version.clone(),
488 success: true,
489 error: None,
490 duration_ms,
491 bytes_downloaded,
492 }
493 }
494 Err(e) => {
495 self.progress.mark_failed();
496 InstallationResult {
497 name: package.name.clone(),
498 version: package.version.clone(),
499 success: false,
500 error: Some(e.to_string()),
501 duration_ms,
502 bytes_downloaded,
503 }
504 }
505 }
506 }
507
508 fn install_package_with_retry(
510 &self,
511 package: &PlannedPackage,
512 bytes_downloaded: &mut u64,
513 ) -> Result<()> {
514 let mut last_error = None;
515
516 for attempt in 0..=self.options.max_retries {
517 if attempt > 0 {
518 std::thread::sleep(Duration::from_millis(self.options.retry_delay_ms));
520 }
521
522 match self.download_and_install(package, bytes_downloaded) {
523 Ok(()) => return Ok(()),
524 Err(e) => {
525 last_error = Some(e);
526 if attempt < self.options.max_retries {
527 eprintln!(
528 "Download failed for {} (attempt {}/{}), retrying...",
529 package.name,
530 attempt + 1,
531 self.options.max_retries
532 );
533 }
534 }
535 }
536 }
537
538 Err(last_error.unwrap_or_else(|| {
539 TorshError::InvalidArgument("Installation failed with unknown error".to_string())
540 }))
541 }
542
543 fn download_and_install(
545 &self,
546 package: &PlannedPackage,
547 bytes_downloaded: &mut u64,
548 ) -> Result<()> {
549 let package_path = self
550 .install_dir
551 .join(format!("{}-{}.torshpkg", package.name, package.version));
552
553 self.registry
555 .download_package(&package.name, &package.version, &package_path)?;
556
557 *bytes_downloaded = package.size;
559 self.progress.mark_downloaded(package.size);
560
561 if self.options.verify_integrity {
563 self.verify_package_integrity(&package_path)?;
564 }
565
566 Ok(())
567 }
568
569 fn verify_package_integrity(&self, _package_path: &Path) -> Result<()> {
571 Ok(())
573 }
574
575 fn compute_statistics(&self, results: &[InstallationResult]) -> InstallationStatistics {
577 let total_installed = results.iter().filter(|r| r.success).count();
578 let total_failed = results.iter().filter(|r| !r.success).count();
579 let total_bytes: u64 = results.iter().map(|r| r.bytes_downloaded).sum();
580 let total_time_secs = self.progress.elapsed_secs();
581
582 let avg_download_speed = if total_time_secs > 0.0 {
583 total_bytes as f64 / total_time_secs
584 } else {
585 0.0
586 };
587
588 let packages_per_sec = if total_time_secs > 0.0 {
589 total_installed as f64 / total_time_secs
590 } else {
591 0.0
592 };
593
594 InstallationStatistics {
595 total_installed,
596 total_failed,
597 total_time_secs,
598 total_bytes,
599 avg_download_speed,
600 packages_per_sec,
601 }
602 }
603
604 pub fn get_progress(&self) -> &InstallationProgress {
606 &self.progress
607 }
608}
609
610#[cfg(test)]
611mod tests {
612 use super::*;
613
614 #[test]
615 fn test_download_options() {
616 let options = DownloadOptions::new()
617 .with_max_parallel(16)
618 .with_timeout(600)
619 .with_max_retries(5);
620
621 assert_eq!(options.max_parallel, 16);
622 assert_eq!(options.timeout_secs, 600);
623 assert_eq!(options.max_retries, 5);
624 }
625
626 #[test]
627 fn test_installation_progress() {
628 let progress = InstallationProgress::new(10, 1000);
629
630 progress.mark_downloaded(100);
631 assert_eq!(progress.downloaded.load(Ordering::Relaxed), 1);
632 assert_eq!(progress.downloaded_bytes.load(Ordering::Relaxed), 100);
633
634 progress.mark_installed();
635 assert_eq!(progress.installed.load(Ordering::Relaxed), 1);
636
637 assert_eq!(progress.percentage(), 10.0);
638 assert_eq!(progress.download_percentage(), 10.0);
639 }
640
641 #[test]
642 fn test_installation_plan() {
643 let mut plan = InstallationPlan::new();
644
645 let pkg1 = PlannedPackage {
646 name: "pkg1".to_string(),
647 version: "1.0.0".to_string(),
648 priority: 0,
649 depends_on: vec![],
650 size: 1000,
651 };
652
653 let pkg2 = PlannedPackage {
654 name: "pkg2".to_string(),
655 version: "1.0.0".to_string(),
656 priority: 1,
657 depends_on: vec!["pkg1".to_string()],
658 size: 2000,
659 };
660
661 plan.add_package(pkg1);
662 plan.add_package(pkg2);
663
664 assert_eq!(plan.total_size, 3000);
665 assert_eq!(plan.packages.len(), 2);
666 }
667
668 #[test]
669 fn test_topological_sort() {
670 let mut plan = InstallationPlan::new();
671
672 plan.add_package(PlannedPackage {
674 name: "pkg3".to_string(),
675 version: "1.0.0".to_string(),
676 priority: 0,
677 depends_on: vec!["pkg2".to_string()],
678 size: 1000,
679 });
680
681 plan.add_package(PlannedPackage {
682 name: "pkg2".to_string(),
683 version: "1.0.0".to_string(),
684 priority: 1,
685 depends_on: vec!["pkg1".to_string()],
686 size: 1000,
687 });
688
689 plan.add_package(PlannedPackage {
690 name: "pkg1".to_string(),
691 version: "1.0.0".to_string(),
692 priority: 2,
693 depends_on: vec![],
694 size: 1000,
695 });
696
697 plan.sort_by_dependencies().unwrap();
698
699 assert_eq!(plan.packages[0].name, "pkg1");
701 assert_eq!(plan.packages[1].name, "pkg2");
702 assert_eq!(plan.packages[2].name, "pkg3");
703 }
704}