pub mod parallel;
use std::{str::Utf8Error, thread};
use crate::{
cache::RepositoryCache, git::coord_locks::CoordinateLocks,
model::protofetch::resolved::ResolvedDependency, sync::Semaphore,
};
use log::info;
use thiserror::Error;
pub use parallel::ParallelConfig;
/// Errors surfaced while fetching or resolving dependency sources.
///
/// Variants marked `#[from]` receive a generated `From` conversion from their
/// payload type, so `?` can be applied directly to those underlying errors.
#[derive(Error, Debug)]
pub enum FetchError {
/// A repository cache operation failed. `fetch_sources_parallel` wraps the
/// error returned by `cache.fetch(..)` in this variant.
#[error("Error while fetching repo from cache: {0}")]
Cache(anyhow::Error),
/// An error reported by `git2`.
#[error("Git error: {0}")]
GitError(#[from] git2::Error),
/// Bytes could not be decoded as UTF-8.
#[error("Error while decoding utf8 bytes from blob: {0}")]
BlobRead(#[from] Utf8Error),
/// A `crate::model::ParseError` occurred.
///
/// NOTE(review): unlike the other non-transparent variants, this message does
/// not interpolate `{0}`, so the `ParseError` text is not part of this
/// variant's own `Display` output — confirm that is intentional.
#[error("Error while parsing descriptor")]
Parsing(#[from] crate::model::ParseError),
/// An error reported by `crate::git::repository`.
#[error("Error while processing protobuf repository: {0}")]
ProtoRepoError(#[from] crate::git::repository::ProtoRepoError),
/// An I/O error from the standard library.
#[error("IO error: {0}")]
IO(#[from] std::io::Error),
/// A resolver failure. `#[error(transparent)]` forwards `Display` to the
/// wrapped `anyhow::Error` without adding a prefix. There is no `#[from]`, so
/// the variant is constructed explicitly (in the visible code, only by the
/// test-only `resolve` helper).
#[error(transparent)]
Resolver(anyhow::Error),
}
/// Fetches the sources of every resolved dependency through `cache`, using one
/// scoped thread per dependency.
///
/// Two mechanisms bound the concurrency:
///
/// * the lock returned by `coord_locks.lock_for(..)` is held for the whole
///   fetch (presumably shared by dependencies with the same coordinate, so
///   those fetches run one at a time — confirm against `CoordinateLocks`);
/// * a semaphore created with `network_jobs.max(1)` permits is acquired around
///   each `cache.fetch(..)` call.
///
/// Each worker takes its coordinate lock *before* a network permit. A worker
/// that is merely queued behind another fetch of the same coordinate therefore
/// never occupies a network slot that an unrelated dependency could use.
///
/// # Errors
///
/// Returns [`FetchError::Cache`] for the first failed fetch, in the order the
/// dependencies were given. Workers that were already spawned are not
/// cancelled; `thread::scope` joins them before this function returns.
///
/// # Panics
///
/// Re-raises the panic of a joined worker thread, and panics inside a worker if
/// its coordinate lock is poisoned.
pub fn fetch_sources_parallel<C>(
    cache: C,
    dependencies: Vec<ResolvedDependency>,
    coord_locks: CoordinateLocks,
    network_jobs: usize,
) -> Result<(), FetchError>
where
    C: RepositoryCache + Clone + 'static,
{
    info!("Fetching dependencies source files...");
    // Clamp to one permit so that a `network_jobs` of 0 cannot leave the
    // semaphore without any permits to hand out.
    let net_sem = Semaphore::new(network_jobs.max(1));
    thread::scope(|s| -> Result<(), FetchError> {
        let mut handles = Vec::with_capacity(dependencies.len());
        for dependency in dependencies {
            let cache = cache.clone();
            let coord_lock = coord_locks.lock_for(&dependency.coordinate);
            let net_sem = &net_sem;
            handles.push(s.spawn(move || {
                // Lock order matters: coordinate lock first, network permit
                // second. A thread holding a permit is then never blocked on a
                // coordinate lock, so permits are only held while fetching.
                let _g = coord_lock.lock().expect("coord lock poisoned");
                // Assumed to be an RAII permit released on drop. Locals drop
                // in reverse order, so it is released before the coordinate
                // lock.
                let _permit = net_sem.acquire();
                cache
                    .fetch(
                        &dependency.coordinate,
                        &dependency.specification,
                        &dependency.commit_hash,
                    )
                    .map_err(FetchError::Cache)
            }));
        }
        // Join in spawn order so the reported error is deterministic.
        for h in handles {
            match h.join() {
                Ok(result) => result?,
                // Propagate a worker panic with its original payload.
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }
        Ok(())
    })
}
#[cfg(test)]
pub(crate) mod tests {
use std::collections::BTreeMap;
use anyhow::anyhow;
use crate::{
model::protofetch::{
lock::{LockFile, LockedCoordinate, LockedDependency},
resolved::ResolvedModule,
Coordinate, Dependency, Descriptor, ModuleName, Revision, RevisionSpecification, Rules,
},
resolver::{CommitAndDescriptor, ModuleResolver},
};
use super::*;
use pretty_assertions::assert_eq;
/// Test helper that resolves the dependency graph of `descriptor` through
/// `resolver` and returns the resolved module together with its lock file.
///
/// The graph is walked level by level: all dependencies of one level are
/// processed before any of their own dependencies. The first request seen for
/// a module name wins. A later request for the same name with a different
/// coordinate or specification is only logged as discarded.
///
/// Results are collected in a `BTreeMap` keyed by module name, so both output
/// lists follow that map's key order.
///
/// Resolver failures are returned as [`FetchError::Resolver`].
pub(crate) fn resolve(
    descriptor: &Descriptor,
    resolver: &impl ModuleResolver,
) -> Result<(ResolvedModule, LockFile), FetchError> {
    /// Resolves one level of `dependencies` into `results`, then recurses into
    /// the dependencies declared by the modules that were newly resolved.
    fn resolve_level(
        resolver: &impl ModuleResolver,
        results: &mut BTreeMap<ModuleName, (LockedDependency, ResolvedDependency)>,
        dependencies: &[Dependency],
    ) -> Result<(), FetchError> {
        let mut next_level = Vec::new();

        for dep in dependencies {
            let wanted_coordinate = LockedCoordinate::from(&dep.coordinate);

            // A module of this name was already resolved: keep the earlier
            // choice and only report a conflicting request.
            if let Some((winner, _)) = results.get(&dep.name) {
                if winner.coordinate != wanted_coordinate {
                    log::warn!(
                        "discarded {} in favor of {} for {}",
                        dep.coordinate,
                        winner.coordinate,
                        &dep.name
                    );
                } else if winner.specification != dep.specification {
                    log::warn!(
                        "discarded {} in favor of {} for {}",
                        dep.specification,
                        winner.specification,
                        &dep.name
                    );
                }
                continue;
            }

            log::info!("Resolving {}", dep.coordinate);
            let CommitAndDescriptor {
                commit_hash,
                descriptor: mut child,
            } = resolver
                .resolve(&dep.coordinate, &dep.specification, None, &dep.name)
                .map_err(FetchError::Resolver)?;

            let locked = LockedDependency {
                name: dep.name.clone(),
                commit_hash: commit_hash.clone(),
                coordinate: wanted_coordinate,
                specification: dep.specification.clone(),
            };
            let resolved = ResolvedDependency {
                name: dep.name.clone(),
                commit_hash,
                coordinate: dep.coordinate.clone(),
                specification: dep.specification.clone(),
                rules: dep.rules.clone(),
                dependencies: child
                    .dependencies
                    .iter()
                    .map(|d| d.name.clone())
                    .collect(),
            };
            results.insert(dep.name.clone(), (locked, resolved));

            // Queue this module's own dependencies for the next level.
            next_level.append(&mut child.dependencies);
        }

        if next_level.is_empty() {
            Ok(())
        } else {
            resolve_level(resolver, results, &next_level)
        }
    }

    let mut results = BTreeMap::new();
    resolve_level(resolver, &mut results, &descriptor.dependencies)?;

    let (locked, resolved) = results.into_values().unzip();
    Ok((
        ResolvedModule {
            module_name: descriptor.name.clone(),
            dependencies: resolved,
        },
        LockFile {
            dependencies: locked,
        },
    ))
}
/// In-memory stand-in for a module resolver: canned answers stored per
/// coordinate and, within a coordinate, per revision specification.
#[derive(Default)]
struct FakeModuleResolver {
    entries: BTreeMap<Coordinate, BTreeMap<RevisionSpecification, CommitAndDescriptor>>,
}

impl FakeModuleResolver {
    /// Registers the answer for module `name` pinned at `revision` (no
    /// branch): it will resolve to `commit_hash` and `descriptor`.
    ///
    /// Pushing the same name and revision again replaces the earlier answer.
    fn push(&mut self, name: &str, revision: &str, commit_hash: &str, descriptor: Descriptor) {
        let by_specification = self.entries.entry(coordinate(name)).or_default();
        let pinned = RevisionSpecification {
            revision: Revision::pinned(revision),
            branch: None,
        };
        let answer = CommitAndDescriptor {
            commit_hash: commit_hash.to_string(),
            descriptor,
        };
        by_specification.insert(pinned, answer);
    }
}
impl ModuleResolver for FakeModuleResolver {
    /// Returns a clone of the stored answer for `coordinate` and
    /// `specification`; the two remaining arguments are ignored.
    ///
    /// Fails with a "not found" error when either the coordinate or the
    /// specification has no stored answer.
    fn resolve(
        &self,
        coordinate: &Coordinate,
        specification: &RevisionSpecification,
        _: Option<&str>,
        _: &ModuleName,
    ) -> anyhow::Result<CommitAndDescriptor> {
        let by_specification = match self.entries.get(coordinate) {
            Some(known) => known,
            None => return Err(anyhow!("Coordinate not found: {}", coordinate)),
        };
        match by_specification.get(specification) {
            Some(answer) => Ok(answer.clone()),
            None => Err(anyhow!("Specification not found: {}", specification)),
        }
    }
}
/// Builds the test coordinate for module `name` from the URL
/// `example.com/org/<name>`; panics if that URL is rejected.
fn coordinate(name: &str) -> Coordinate {
    let url = format!("example.com/org/{}", name);
    Coordinate::from_url(&url).unwrap()
}
/// Builds a dependency on module `name` pinned at `revision`, with no branch
/// and default rules.
fn dependency(name: &str, revision: &str) -> Dependency {
    let specification = RevisionSpecification {
        revision: Revision::pinned(revision),
        branch: None,
    };
    Dependency {
        name: ModuleName::from(name),
        coordinate: coordinate(name),
        specification,
        rules: Rules::default(),
    }
}
/// Builds the lock-file entry expected for module `name` pinned at `revision`
/// and resolved to `commit_hash`, using the URL `example.com/org/<name>` with
/// no protocol.
fn locked_dependency(name: &str, revision: &str, commit_hash: &str) -> LockedDependency {
    let location = LockedCoordinate {
        url: format!("example.com/org/{}", name),
        protocol: None,
    };
    let pinned = RevisionSpecification {
        revision: Revision::pinned(revision),
        branch: None,
    };
    LockedDependency {
        name: ModuleName::from(name),
        coordinate: location,
        specification: pinned,
        commit_hash: commit_hash.to_owned(),
    }
}
/// `root -> foo -> bar`: the transitive dependency `bar` is locked alongside
/// the direct dependency `foo`.
#[test]
fn resolve_transitive() {
    // Descriptor with the given name and dependencies and nothing else set.
    let module = |name: &str, dependencies: Vec<Dependency>| Descriptor {
        name: ModuleName::from(name),
        description: None,
        proto_out_dir: None,
        dependencies,
    };

    let mut resolver = FakeModuleResolver::default();
    resolver.push(
        "foo",
        "1.0.0",
        "c1",
        module("foo", vec![dependency("bar", "2.0.0")]),
    );
    resolver.push("bar", "2.0.0", "c2", module("bar", Vec::new()));

    let root = module("root", vec![dependency("foo", "1.0.0")]);
    let (_, lockfile) = resolve(&root, &resolver).unwrap();

    let expected = LockFile {
        dependencies: vec![
            locked_dependency("bar", "2.0.0", "c2"),
            locked_dependency("foo", "1.0.0", "c1"),
        ],
    };
    assert_eq!(lockfile, expected);
}
/// The root asks for `bar` 1.0.0 while `foo` asks for `bar` 2.0.0: the root's
/// own request wins and `bar` is locked at 1.0.0.
#[test]
fn resolve_transitive_root_priority() {
    // Descriptor with the given name and dependencies and nothing else set.
    let module = |name: &str, dependencies: Vec<Dependency>| Descriptor {
        name: ModuleName::from(name),
        description: None,
        proto_out_dir: None,
        dependencies,
    };

    let mut resolver = FakeModuleResolver::default();
    resolver.push(
        "foo",
        "1.0.0",
        "c1",
        module("foo", vec![dependency("bar", "2.0.0")]),
    );
    resolver.push("bar", "1.0.0", "c3", module("bar", Vec::new()));
    resolver.push("bar", "2.0.0", "c2", module("bar", Vec::new()));

    let root = module(
        "root",
        vec![dependency("foo", "1.0.0"), dependency("bar", "1.0.0")],
    );
    let (_, lockfile) = resolve(&root, &resolver).unwrap();

    let expected = LockFile {
        dependencies: vec![
            locked_dependency("bar", "1.0.0", "c3"),
            locked_dependency("foo", "1.0.0", "c1"),
        ],
    };
    assert_eq!(lockfile, expected);
}
}