Skip to main content

thirdpass_py_lib/
lib.rs

1use anyhow::{format_err, Context, Result};
2use std::io::Read;
3use strum::IntoEnumIterator;
4
5mod pipfile;
6
/// Thirdpass extension state for Python packages.
///
/// Bundles the extension name, the registry host names it reports, and the
/// template used to render human-facing registry URLs.
#[derive(Clone, Debug)]
pub struct PyExtension {
    /// Extension name, returned by `name()`.
    name_: String,
    /// Registry host names, returned by `registries()`.
    registry_host_names_: Vec<String>,
    /// Handlebars template rendered with `package_name` and `package_version`
    /// to produce the human-facing registry URL.
    registry_human_url_template_: String,
}
13
14impl thirdpass_core::extension::FromLib for PyExtension {
15    fn new() -> Self {
16        Self {
17            name_: "py".to_string(),
18            registry_host_names_: vec!["pypi.org".to_owned()],
19            registry_human_url_template_:
20                "https://pypi.org/pypi/{{package_name}}/{{package_version}}/".to_string(),
21        }
22    }
23}
24
25impl thirdpass_core::extension::Extension for PyExtension {
26    fn name(&self) -> String {
27        self.name_.clone()
28    }
29
30    fn registries(&self) -> Vec<String> {
31        self.registry_host_names_.clone()
32    }
33
34    fn review_target_policy(&self) -> thirdpass_core::extension::ReviewTargetPolicy {
35        thirdpass_core::extension::ReviewTargetPolicy {
36            excluded_exact_paths: vec![
37                "Pipfile.lock".to_string(),
38                "poetry.lock".to_string(),
39                "uv.lock".to_string(),
40                "pdm.lock".to_string(),
41            ],
42        }
43    }
44
45    /// Returns a list of dependencies for the given package.
46    ///
47    /// Returns one package dependencies structure per registry.
48    fn identify_package_dependencies(
49        &self,
50        _package_name: &str,
51        _package_version: &Option<&str>,
52        _extension_args: &Vec<String>,
53    ) -> Result<Vec<thirdpass_core::extension::PackageDependencies>> {
54        Err(format_err!("Function unimplemented."))
55    }
56
57    fn identify_file_defined_dependencies(
58        &self,
59        working_directory: &std::path::PathBuf,
60        _extension_args: &Vec<String>,
61    ) -> Result<Vec<thirdpass_core::extension::FileDefinedDependencies>> {
62        // Identify all dependency definition files.
63        let dependency_files = match identify_dependency_files(&working_directory) {
64            Some(v) => v,
65            None => return Ok(Vec::new()),
66        };
67
68        // Read all dependencies definitions files.
69        let mut all_dependency_specs = Vec::new();
70        for dependency_file in dependency_files {
71            // TODO: Add support for parsing all definition file types.
72            let (dependencies, registry_host_name) = match dependency_file.r#type {
73                DependencyFileType::PipfileLock => (
74                    pipfile::get_dependencies(&dependency_file.path)?,
75                    pipfile::get_registry_host_name(),
76                ),
77            };
78            all_dependency_specs.push(thirdpass_core::extension::FileDefinedDependencies {
79                path: dependency_file.path,
80                registry_host_name: registry_host_name,
81                dependencies: dependencies.into_iter().collect(),
82            });
83        }
84
85        Ok(all_dependency_specs)
86    }
87
88    fn registries_package_metadata(
89        &self,
90        package_name: &str,
91        package_version: &Option<&str>,
92    ) -> Result<Vec<thirdpass_core::extension::RegistryPackageMetadata>> {
93        let package_version = match package_version {
94            Some(v) => Some(v.to_string()),
95            None => get_latest_version(&package_name)?,
96        }
97        .ok_or(format_err!("Failed to find package version."))?;
98
99        // Currently, only one registry is supported. Therefore simply select first.
100        let registry_host_name = self
101            .registries()
102            .first()
103            .ok_or(format_err!(
104                "Code error: vector of registry host names is empty."
105            ))?
106            .clone();
107
108        let entry_json = get_registry_entry_json(&package_name)?;
109        let artifact_url = get_archive_url(&entry_json, &package_version)?;
110        let human_url = get_registry_human_url(&self, &package_name, &package_version)?;
111
112        Ok(vec![thirdpass_core::extension::RegistryPackageMetadata {
113            registry_host_name: registry_host_name,
114            human_url: human_url.to_string(),
115            artifact_url: artifact_url.to_string(),
116            is_primary: true,
117            package_version: package_version.to_string(),
118        }])
119    }
120}
121
122/// Given package name, return latest version.
123fn get_latest_version(package_name: &str) -> Result<Option<String>> {
124    let json = get_registry_entry_json(&package_name)?;
125    let releases = json["releases"]
126        .as_object()
127        .ok_or(format_err!("Failed to find releases JSON section."))?;
128    let mut versions: Vec<semver::Version> = releases
129        .keys()
130        .filter(|v| v.chars().all(|c| c.is_numeric() || c == '.'))
131        .map(|v| semver::Version::parse(v))
132        .filter(|v| v.is_ok())
133        .map(|v| v.unwrap())
134        .collect();
135    versions.sort();
136
137    let latest_version = versions.last().map(|v| v.to_string());
138    Ok(latest_version)
139}
140
141fn get_registry_human_url(
142    extension: &PyExtension,
143    package_name: &str,
144    package_version: &str,
145) -> Result<url::Url> {
146    // Example return value: https://pypi.org/pypi/numpy/1.18.5/
147    let handlebars_registry = handlebars::Handlebars::new();
148    let human_url = handlebars_registry.render_template(
149        &extension.registry_human_url_template_,
150        &maplit::btreemap! {
151            "package_name" => package_name,
152            "package_version" => package_version,
153        },
154    )?;
155    Ok(url::Url::parse(human_url.as_str())?)
156}
157
158fn get_registry_entry_json(package_name: &str) -> Result<serde_json::Value> {
159    let handlebars_registry = handlebars::Handlebars::new();
160    let url = handlebars_registry.render_template(
161        "https://pypi.org/pypi/{{package_name}}/json",
162        &maplit::btreemap! {
163            "package_name" => package_name,
164        },
165    )?;
166    let mut result = reqwest::blocking::get(&url.to_string())?;
167    let mut body = String::new();
168    result.read_to_string(&mut body)?;
169
170    Ok(serde_json::from_str(&body).context(format!("JSON was not well-formatted:\n{}", body))?)
171}
172
173fn get_archive_url(
174    registry_entry_json: &serde_json::Value,
175    package_version: &str,
176) -> Result<url::Url> {
177    let releases_section = registry_entry_json
178        .get("releases")
179        .ok_or(format_err!("Failed to find releases JSON section."))?;
180    let release_entry = releases_section.get(package_version).ok_or(format_err!(
181        "Package version not found in registry releases: {}",
182        package_version
183    ))?;
184    let releases = release_entry.as_array().ok_or(format_err!(
185        "Registry releases entry for version {} is not an array.",
186        package_version
187    ))?;
188    if releases.is_empty() {
189        return Err(format_err!(
190            "No release artifacts found for version {}.",
191            package_version
192        ));
193    }
194    for release in releases {
195        let python_version = release["python_version"]
196            .as_str()
197            .ok_or(format_err!("Failed to parse package version."))?;
198        if python_version == "source" {
199            return Ok(url::Url::parse(
200                release["url"]
201                    .as_str()
202                    .ok_or(format_err!("Failed to parse package archive URL."))?,
203            )?);
204        }
205    }
206    Err(format_err!("Failed to identify package archive URL."))
207}
208
/// Package dependency file types.
///
/// Derives `EnumIter` so that every variant can be enumerated when probing a
/// directory for dependency definition files.
#[derive(Debug, Copy, Clone, strum_macros::EnumIter)]
enum DependencyFileType {
    /// A `Pipfile.lock` file.
    PipfileLock,
}
214
215impl DependencyFileType {
216    /// Return file name associated with dependency type.
217    pub fn file_name(&self) -> std::path::PathBuf {
218        match self {
219            Self::PipfileLock => std::path::PathBuf::from("Pipfile.lock"),
220        }
221    }
222}
223
/// Package dependency file type and file path.
#[derive(Debug, Clone)]
struct DependencyFile {
    /// Kind of dependency definition file found.
    r#type: DependencyFileType,
    /// Path at which the file was found.
    path: std::path::PathBuf,
}
230
231/// Returns a vector of identified package dependency definition files.
232///
233/// Walks up the directory tree directory tree until the first positive result is found.
234fn identify_dependency_files(
235    working_directory: &std::path::PathBuf,
236) -> Option<Vec<DependencyFile>> {
237    assert!(working_directory.is_absolute());
238    let mut working_directory = working_directory.clone();
239
240    loop {
241        // If at least one target is found, assume package is present.
242        let mut found_dependency_file = false;
243
244        let mut dependency_files: Vec<DependencyFile> = Vec::new();
245        for dependency_file_type in DependencyFileType::iter() {
246            let target_absolute_path = working_directory.join(dependency_file_type.file_name());
247            if target_absolute_path.is_file() {
248                found_dependency_file = true;
249                dependency_files.push(DependencyFile {
250                    r#type: dependency_file_type,
251                    path: target_absolute_path,
252                })
253            }
254        }
255        if found_dependency_file {
256            return Some(dependency_files);
257        }
258
259        // No need to move further up the directory tree after this loop.
260        if working_directory == std::path::PathBuf::from("/") {
261            break;
262        }
263
264        // Move further up the directory tree.
265        working_directory.pop();
266    }
267    None
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use thirdpass_core::extension::{Extension, FromLib};

    /// Scratch project directory under the system temp dir; removed on drop.
    struct TempProject {
        root: std::path::PathBuf,
    }

    impl TempProject {
        /// Creates a fresh directory whose name combines `label`, the process
        /// id, and the current time in nanoseconds.
        fn new(label: &str) -> Result<Self> {
            let nanos_since_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)?
                .as_nanos();
            let directory_name = format!(
                "thirdpass-py-{}-{}-{}",
                label,
                std::process::id(),
                nanos_since_epoch
            );
            let root = std::env::temp_dir().join(directory_name);
            std::fs::create_dir_all(&root)?;
            Ok(Self { root })
        }

        /// Root directory of the scratch project.
        fn path(&self) -> &std::path::Path {
            self.root.as_path()
        }
    }

    impl Drop for TempProject {
        fn drop(&mut self) {
            // Best-effort cleanup; a failure to remove must not fail the test.
            let _ = std::fs::remove_dir_all(&self.root);
        }
    }

    #[test]
    fn review_target_policy_skips_python_lockfiles() {
        let extension = PyExtension::new();
        let policy = extension.review_target_policy();

        // Lockfiles are excluded.
        assert!(policy.excludes_exact_path("Pipfile.lock"));
        assert!(policy.excludes_exact_path("poetry.lock"));
        assert!(policy.excludes_exact_path("uv.lock"));
        assert!(policy.excludes_exact_path("pdm.lock"));

        // Other project files are not.
        assert!(!policy.excludes_exact_path("pyproject.toml"));
        assert!(!policy.excludes_exact_path("setup.py"));
        assert!(!policy.excludes_exact_path("requirements.txt"));
        assert!(!policy.excludes_exact_path("PKG-INFO"));
    }

    #[test]
    fn file_defined_dependencies_parse_pipfile_lock_from_child_directory() -> Result<()> {
        let project = TempProject::new("file-defined-dependencies")?;

        // The lookup starts two levels below the directory holding the lockfile.
        let nested = project.path().join("src").join("package");
        std::fs::create_dir_all(&nested)?;

        let lockfile_json = serde_json::json!({
            "_meta": {},
            "default": {
                "requests": {
                    "version": "==2.32.3"
                }
            },
            "develop": {
                "pytest": {
                    "version": "==8.3.4"
                }
            }
        });
        let lockfile_contents = serde_json::to_string_pretty(&lockfile_json)?;
        let pipfile_lock_path = project.path().join("Pipfile.lock");
        std::fs::write(&pipfile_lock_path, lockfile_contents)?;

        let extension = PyExtension::new();
        let extension_args = Vec::new();
        let groups = extension.identify_file_defined_dependencies(&nested, &extension_args)?;

        assert_eq!(groups.len(), 1);
        let group = &groups[0];
        assert_eq!(group.path, pipfile_lock_path);
        assert_eq!(group.registry_host_name, "pypi.org");
        assert_dependency(&group.dependencies, "requests", "2.32.3");
        assert_dependency(&group.dependencies, "pytest", "8.3.4");
        Ok(())
    }

    /// Asserts that `dependencies` contains an entry with the given name and version.
    fn assert_dependency(
        dependencies: &[thirdpass_core::extension::Dependency],
        name: &str,
        version: &str,
    ) {
        let found = dependencies
            .iter()
            .any(|dependency| dependency.name == name && dependency.version == Ok(version.into()));
        assert!(
            found,
            "expected dependency {}@{} in {:?}",
            name,
            version,
            dependencies
        );
    }
}
371}