protofetch 0.1.17

A source dependency management tool for Protobuf.
Documentation
pub mod parallel;

use std::{str::Utf8Error, thread};

use crate::{
    cache::RepositoryCache, git::coord_locks::CoordinateLocks,
    model::protofetch::resolved::ResolvedDependency, sync::Semaphore,
};
use log::info;
use thiserror::Error;

pub use parallel::ParallelConfig;

#[derive(Error, Debug)]
pub enum FetchError {
    #[error("Error while fetching repo from cache: {0}")]
    Cache(anyhow::Error),
    #[error("Git error: {0}")]
    GitError(#[from] git2::Error),
    #[error("Error while decoding utf8 bytes from blob: {0}")]
    BlobRead(#[from] Utf8Error),
    #[error("Error while parsing descriptor")]
    Parsing(#[from] crate::model::ParseError),
    #[error("Error while processing protobuf repository: {0}")]
    ProtoRepoError(#[from] crate::git::repository::ProtoRepoError),
    #[error("IO error: {0}")]
    IO(#[from] std::io::Error),
    #[error(transparent)]
    Resolver(anyhow::Error),
}

/// Fetches the sources of every resolved dependency into `cache` concurrently.
///
/// One scoped thread is spawned per dependency. At most `network_jobs` of them
/// (treated as at least 1) perform a fetch at the same time, gated by a
/// counting semaphore. Fetches of the same coordinate are serialized through
/// `coord_locks`, so two fetches into the same on-disk bare repo don't race.
///
/// # Errors
///
/// Returns the first [`FetchError::Cache`] found when joining the threads in
/// dependency order. Because this runs inside [`std::thread::scope`], every
/// spawned thread is still joined before the function returns.
///
/// # Panics
///
/// Re-raises the panic of a fetch thread. Also panics if a coordinate lock was
/// poisoned by an earlier panicking fetch.
pub fn fetch_sources_parallel<C>(
    cache: C,
    dependencies: Vec<ResolvedDependency>,
    coord_locks: CoordinateLocks,
    network_jobs: usize,
) -> Result<(), FetchError>
where
    C: RepositoryCache + Clone + 'static,
{
    info!("Fetching dependencies source files...");
    let net_sem = Semaphore::new(network_jobs.max(1));

    thread::scope(|s| -> Result<(), FetchError> {
        let mut handles = Vec::with_capacity(dependencies.len());
        for dependency in dependencies {
            let cache = cache.clone();
            let coord_lock = coord_locks.lock_for(&dependency.coordinate);
            let net_sem = &net_sem;
            handles.push(s.spawn(move || {
                // Take the per-coordinate lock *before* a network permit. A thread
                // waiting behind another fetch of the same repo must not hold one of
                // the `network_jobs` permits that an unrelated coordinate could use.
                // This cannot deadlock: `net_sem` is local to this function, and every
                // permit holder already owns its coordinate lock.
                let _coord_guard = coord_lock.lock().expect("coord lock poisoned");
                let _permit = net_sem.acquire();
                cache
                    .fetch(
                        &dependency.coordinate,
                        &dependency.specification,
                        &dependency.commit_hash,
                    )
                    .map_err(FetchError::Cache)
            }));
        }
        for h in handles {
            match h.join() {
                Ok(result) => result?,
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }
        Ok(())
    })
}

#[cfg(test)]
pub(crate) mod tests {
    use std::collections::BTreeMap;

    use anyhow::anyhow;

    use crate::{
        model::protofetch::{
            lock::{LockFile, LockedCoordinate, LockedDependency},
            resolved::ResolvedModule,
            Coordinate, Dependency, Descriptor, ModuleName, Revision, RevisionSpecification, Rules,
        },
        resolver::{CommitAndDescriptor, ModuleResolver},
    };

    use super::*;

    use pretty_assertions::assert_eq;

    /// Sequential resolver kept for unit-test parity with the parallel implementation.
    ///
    /// Dependencies are resolved breadth-first, one level at a time, starting with
    /// the direct dependencies of `descriptor`. The first occurrence of a module
    /// name wins. Later occurrences that disagree on coordinate or revision are
    /// logged and ignored. Both outputs are ordered by module name.
    pub(crate) fn resolve(
        descriptor: &Descriptor,
        resolver: &impl ModuleResolver,
    ) -> Result<(ResolvedModule, LockFile), FetchError> {
        /// Resolves each dependency of `level` whose name is not yet in
        /// `resolutions`, records it, and returns the dependencies those newly
        /// resolved modules declare (the next level to process).
        fn resolve_level(
            resolver: &impl ModuleResolver,
            resolutions: &mut BTreeMap<ModuleName, (LockedDependency, ResolvedDependency)>,
            level: &[Dependency],
        ) -> Result<Vec<Dependency>, FetchError> {
            let mut next_level = Vec::new();

            for dep in level {
                let locked_coordinate = LockedCoordinate::from(&dep.coordinate);

                // Already resolved at this or an earlier level: keep the first
                // choice and only report a conflicting request.
                if let Some((existing, _)) = resolutions.get(&dep.name) {
                    if existing.coordinate != locked_coordinate {
                        log::warn!(
                            "discarded {} in favor of {} for {}",
                            dep.coordinate,
                            existing.coordinate,
                            &dep.name
                        );
                    } else if existing.specification != dep.specification {
                        log::warn!(
                            "discarded {} in favor of {} for {}",
                            dep.specification,
                            existing.specification,
                            &dep.name
                        );
                    }
                    continue;
                }

                log::info!("Resolving {}", dep.coordinate);
                let CommitAndDescriptor {
                    commit_hash,
                    mut descriptor,
                } = resolver
                    .resolve(&dep.coordinate, &dep.specification, None, &dep.name)
                    .map_err(FetchError::Resolver)?;

                let locked_entry = LockedDependency {
                    name: dep.name.clone(),
                    commit_hash: commit_hash.clone(),
                    coordinate: locked_coordinate,
                    specification: dep.specification.clone(),
                };

                // The transitive names must be captured before the descriptor's
                // dependency list is moved into `next_level` below.
                let resolved_entry = ResolvedDependency {
                    name: dep.name.clone(),
                    commit_hash,
                    coordinate: dep.coordinate.clone(),
                    specification: dep.specification.clone(),
                    rules: dep.rules.clone(),
                    dependencies: descriptor
                        .dependencies
                        .iter()
                        .map(|child| child.name.clone())
                        .collect(),
                };

                resolutions.insert(dep.name.clone(), (locked_entry, resolved_entry));
                next_level.append(&mut descriptor.dependencies);
            }

            Ok(next_level)
        }

        let mut resolutions = BTreeMap::new();

        let mut frontier = resolve_level(resolver, &mut resolutions, &descriptor.dependencies)?;
        while !frontier.is_empty() {
            frontier = resolve_level(resolver, &mut resolutions, &frontier)?;
        }

        let (locked, resolved) = resolutions.into_values().unzip();

        Ok((
            ResolvedModule {
                module_name: descriptor.name.clone(),
                dependencies: resolved,
            },
            LockFile {
                dependencies: locked,
            },
        ))
    }

    /// In-memory resolver: coordinate -> revision specification -> canned answer.
    #[derive(Default)]
    struct FakeModuleResolver {
        entries: BTreeMap<Coordinate, BTreeMap<RevisionSpecification, CommitAndDescriptor>>,
    }

    impl FakeModuleResolver {
        /// Registers the answer returned for module `name` pinned at `revision`.
        fn push(&mut self, name: &str, revision: &str, commit_hash: &str, descriptor: Descriptor) {
            let answer = CommitAndDescriptor {
                commit_hash: commit_hash.to_string(),
                descriptor,
            };
            self.entries
                .entry(coordinate(name))
                .or_default()
                .insert(pinned(revision), answer);
        }
    }

    impl ModuleResolver for FakeModuleResolver {
        fn resolve(
            &self,
            coordinate: &Coordinate,
            specification: &RevisionSpecification,
            _: Option<&str>,
            _: &ModuleName,
        ) -> anyhow::Result<CommitAndDescriptor> {
            let by_revision = self
                .entries
                .get(coordinate)
                .ok_or_else(|| anyhow!("Coordinate not found: {}", coordinate))?;
            let answer = by_revision
                .get(specification)
                .ok_or_else(|| anyhow!("Specification not found: {}", specification))?;
            Ok(answer.clone())
        }
    }

    /// URL every test module is hosted at.
    fn url(name: &str) -> String {
        format!("example.com/org/{}", name)
    }

    /// A revision specification pinned to `revision`, with no branch.
    fn pinned(revision: &str) -> RevisionSpecification {
        RevisionSpecification {
            revision: Revision::pinned(revision),
            branch: None,
        }
    }

    fn coordinate(name: &str) -> Coordinate {
        Coordinate::from_url(&url(name)).unwrap()
    }

    fn dependency(name: &str, revision: &str) -> Dependency {
        Dependency {
            name: ModuleName::from(name),
            coordinate: coordinate(name),
            specification: pinned(revision),
            rules: Rules::default(),
        }
    }

    fn locked_dependency(name: &str, revision: &str, commit_hash: &str) -> LockedDependency {
        LockedDependency {
            name: ModuleName::from(name),
            coordinate: LockedCoordinate {
                url: url(name),
                protocol: None,
            },
            specification: pinned(revision),
            commit_hash: commit_hash.to_owned(),
        }
    }

    /// A descriptor with no description or output dir.
    fn module_descriptor(name: &str, dependencies: Vec<Dependency>) -> Descriptor {
        Descriptor {
            name: ModuleName::from(name),
            description: None,
            proto_out_dir: None,
            dependencies,
        }
    }

    #[test]
    fn resolve_transitive() {
        let mut resolver = FakeModuleResolver::default();
        resolver.push(
            "foo",
            "1.0.0",
            "c1",
            module_descriptor("foo", vec![dependency("bar", "2.0.0")]),
        );
        resolver.push("bar", "2.0.0", "c2", module_descriptor("bar", Vec::new()));

        let root = module_descriptor("root", vec![dependency("foo", "1.0.0")]);
        let (_, lockfile) = resolve(&root, &resolver).unwrap();

        let expected = LockFile {
            dependencies: vec![
                locked_dependency("bar", "2.0.0", "c2"),
                locked_dependency("foo", "1.0.0", "c1"),
            ],
        };
        assert_eq!(lockfile, expected)
    }

    #[test]
    fn resolve_transitive_root_priority() {
        let mut resolver = FakeModuleResolver::default();
        resolver.push(
            "foo",
            "1.0.0",
            "c1",
            module_descriptor("foo", vec![dependency("bar", "2.0.0")]),
        );
        resolver.push("bar", "1.0.0", "c3", module_descriptor("bar", Vec::new()));
        resolver.push("bar", "2.0.0", "c2", module_descriptor("bar", Vec::new()));

        // The root asks for bar 1.0.0 directly; foo's transitive bar 2.0.0 must lose.
        let root = module_descriptor(
            "root",
            vec![dependency("foo", "1.0.0"), dependency("bar", "1.0.0")],
        );
        let (_, lockfile) = resolve(&root, &resolver).unwrap();

        let expected = LockFile {
            dependencies: vec![
                locked_dependency("bar", "1.0.0", "c3"),
                locked_dependency("foo", "1.0.0", "c1"),
            ],
        };
        assert_eq!(lockfile, expected)
    }
}