Skip to main content

thirdpass_py_lib/
lib.rs

1use anyhow::{format_err, Context, Result};
2use std::io::Read;
3use strum::IntoEnumIterator;
4
5mod pipfile;
6
/// Python (PyPI) extension state.
///
/// Holds the extension's short name, the host names of the registries it
/// serves, and the handlebars template used to render a package's
/// human-facing registry URL.
#[derive(Debug, Clone)]
pub struct PyExtension {
    /// Short name of the extension.
    name_: String,
    /// Host names of the supported registries.
    registry_host_names_: Vec<String>,
    /// Handlebars template for the human-facing package URL.
    registry_human_url_template_: String,
}
13
14impl thirdpass_core::extension::FromLib for PyExtension {
15    fn new() -> Self {
16        Self {
17            name_: "py".to_string(),
18            registry_host_names_: vec!["pypi.org".to_owned()],
19            registry_human_url_template_:
20                "https://pypi.org/pypi/{{package_name}}/{{package_version}}/".to_string(),
21        }
22    }
23}
24
25impl thirdpass_core::extension::Extension for PyExtension {
26    fn name(&self) -> String {
27        self.name_.clone()
28    }
29
30    fn registries(&self) -> Vec<String> {
31        self.registry_host_names_.clone()
32    }
33
34    fn review_target_policy(&self) -> thirdpass_core::extension::ReviewTargetPolicy {
35        thirdpass_core::extension::ReviewTargetPolicy {
36            excluded_exact_paths: vec![
37                "Pipfile.lock".to_string(),
38                "poetry.lock".to_string(),
39                "uv.lock".to_string(),
40                "pdm.lock".to_string(),
41            ],
42        }
43    }
44
45    /// Returns a list of dependencies for the given package.
46    ///
47    /// Returns one package dependencies structure per registry.
48    fn identify_package_dependencies(
49        &self,
50        _package_name: &str,
51        _package_version: &Option<&str>,
52        _extension_args: &[String],
53    ) -> Result<Vec<thirdpass_core::extension::PackageDependencies>> {
54        Err(format_err!("Function unimplemented."))
55    }
56
57    fn identify_file_defined_dependencies(
58        &self,
59        working_directory: &std::path::Path,
60        _extension_args: &[String],
61    ) -> Result<Vec<thirdpass_core::extension::FileDefinedDependencies>> {
62        // Identify all dependency definition files.
63        let dependency_files = match identify_dependency_files(working_directory) {
64            Some(v) => v,
65            None => return Ok(Vec::new()),
66        };
67
68        // Read all dependencies definitions files.
69        let mut all_dependency_specs = Vec::new();
70        for dependency_file in dependency_files {
71            // TODO: Add support for parsing all definition file types.
72            let (dependencies, registry_host_name) = match dependency_file.r#type {
73                DependencyFileType::PipfileLock => (
74                    pipfile::get_dependencies(&dependency_file.path)?,
75                    pipfile::get_registry_host_name(),
76                ),
77            };
78            all_dependency_specs.push(thirdpass_core::extension::FileDefinedDependencies {
79                path: dependency_file.path,
80                registry_host_name,
81                dependencies: dependencies.into_iter().collect(),
82            });
83        }
84
85        Ok(all_dependency_specs)
86    }
87
88    fn registries_package_metadata(
89        &self,
90        package_name: &str,
91        package_version: &Option<&str>,
92    ) -> Result<Vec<thirdpass_core::extension::RegistryPackageMetadata>> {
93        let package_version = match package_version {
94            Some(v) => Some(v.to_string()),
95            None => get_latest_version(package_name)?,
96        }
97        .ok_or(format_err!("Failed to find package version."))?;
98
99        // Currently, only one registry is supported. Therefore simply select first.
100        let registry_host_name = self
101            .registries()
102            .first()
103            .ok_or(format_err!(
104                "Code error: vector of registry host names is empty."
105            ))?
106            .clone();
107
108        let entry_json = get_registry_entry_json(package_name)?;
109        let artifact_url = get_archive_url(&entry_json, &package_version)?;
110        let human_url = get_registry_human_url(self, package_name, &package_version)?;
111
112        Ok(vec![thirdpass_core::extension::RegistryPackageMetadata {
113            registry_host_name,
114            human_url: human_url.to_string(),
115            artifact_url: artifact_url.to_string(),
116            is_primary: true,
117            package_version: package_version.to_string(),
118        }])
119    }
120}
121
122/// Given package name, return latest version.
123fn get_latest_version(package_name: &str) -> Result<Option<String>> {
124    let json = get_registry_entry_json(package_name)?;
125    let releases = json["releases"]
126        .as_object()
127        .ok_or(format_err!("Failed to find releases JSON section."))?;
128    let mut versions: Vec<semver::Version> = releases
129        .keys()
130        .filter(|v| v.chars().all(|c| c.is_numeric() || c == '.'))
131        .filter_map(|v| semver::Version::parse(v).ok())
132        .collect();
133    versions.sort();
134
135    let latest_version = versions.last().map(|v| v.to_string());
136    Ok(latest_version)
137}
138
139fn get_registry_human_url(
140    extension: &PyExtension,
141    package_name: &str,
142    package_version: &str,
143) -> Result<url::Url> {
144    // Example return value: https://pypi.org/pypi/numpy/1.18.5/
145    let handlebars_registry = handlebars::Handlebars::new();
146    let human_url = handlebars_registry.render_template(
147        &extension.registry_human_url_template_,
148        &maplit::btreemap! {
149            "package_name" => package_name,
150            "package_version" => package_version,
151        },
152    )?;
153    Ok(url::Url::parse(human_url.as_str())?)
154}
155
156fn get_registry_entry_json(package_name: &str) -> Result<serde_json::Value> {
157    let handlebars_registry = handlebars::Handlebars::new();
158    let url = handlebars_registry.render_template(
159        "https://pypi.org/pypi/{{package_name}}/json",
160        &maplit::btreemap! {
161            "package_name" => package_name,
162        },
163    )?;
164    let mut result = reqwest::blocking::get(&url.to_string())?;
165    let mut body = String::new();
166    result.read_to_string(&mut body)?;
167
168    serde_json::from_str(&body).context(format!("JSON was not well-formatted:\n{}", body))
169}
170
171fn get_archive_url(
172    registry_entry_json: &serde_json::Value,
173    package_version: &str,
174) -> Result<url::Url> {
175    let releases_section = registry_entry_json
176        .get("releases")
177        .ok_or(format_err!("Failed to find releases JSON section."))?;
178    let release_entry = releases_section.get(package_version).ok_or(format_err!(
179        "Package version not found in registry releases: {}",
180        package_version
181    ))?;
182    let releases = release_entry.as_array().ok_or(format_err!(
183        "Registry releases entry for version {} is not an array.",
184        package_version
185    ))?;
186    if releases.is_empty() {
187        return Err(format_err!(
188            "No release artifacts found for version {}.",
189            package_version
190        ));
191    }
192    for release in releases {
193        let python_version = release["python_version"]
194            .as_str()
195            .ok_or(format_err!("Failed to parse package version."))?;
196        if python_version == "source" {
197            return Ok(url::Url::parse(
198                release["url"]
199                    .as_str()
200                    .ok_or(format_err!("Failed to parse package archive URL."))?,
201            )?);
202        }
203    }
204    Err(format_err!("Failed to identify package archive URL."))
205}
206
207/// Package dependency file types.
208#[derive(Debug, Copy, Clone, strum_macros::EnumIter)]
209enum DependencyFileType {
210    PipfileLock,
211}
212
213impl DependencyFileType {
214    /// Return file name associated with dependency type.
215    pub fn file_name(&self) -> std::path::PathBuf {
216        match self {
217            Self::PipfileLock => std::path::PathBuf::from("Pipfile.lock"),
218        }
219    }
220}
221
222/// Package dependency file type and file path.
223#[derive(Debug, Clone)]
224struct DependencyFile {
225    r#type: DependencyFileType,
226    path: std::path::PathBuf,
227}
228
229/// Returns a vector of identified package dependency definition files.
230///
231/// Walks up the directory tree directory tree until the first positive result is found.
232fn identify_dependency_files(working_directory: &std::path::Path) -> Option<Vec<DependencyFile>> {
233    assert!(working_directory.is_absolute());
234    let mut working_directory = working_directory.to_path_buf();
235
236    loop {
237        // If at least one target is found, assume package is present.
238        let mut found_dependency_file = false;
239
240        let mut dependency_files: Vec<DependencyFile> = Vec::new();
241        for dependency_file_type in DependencyFileType::iter() {
242            let target_absolute_path = working_directory.join(dependency_file_type.file_name());
243            if target_absolute_path.is_file() {
244                found_dependency_file = true;
245                dependency_files.push(DependencyFile {
246                    r#type: dependency_file_type,
247                    path: target_absolute_path,
248                })
249            }
250        }
251        if found_dependency_file {
252            return Some(dependency_files);
253        }
254
255        // No need to move further up the directory tree after this loop.
256        if working_directory == std::path::Path::new("/") {
257            break;
258        }
259
260        // Move further up the directory tree.
261        working_directory.pop();
262    }
263    None
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use thirdpass_core::extension::{Extension, FromLib};

    /// Scratch directory under the system temp dir, removed again on drop.
    struct TempProject {
        root: std::path::PathBuf,
    }

    impl TempProject {
        /// Creates a fresh directory whose name combines `label`, the process
        /// id, and the current time in nanoseconds since the Unix epoch.
        fn new(label: &str) -> Result<Self> {
            let nanos_since_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)?
                .as_nanos();
            let directory_name = format!(
                "thirdpass-py-{}-{}-{}",
                label,
                std::process::id(),
                nanos_since_epoch
            );
            let root = std::env::temp_dir().join(directory_name);
            std::fs::create_dir_all(&root)?;
            Ok(Self { root })
        }

        /// Root directory of the scratch project.
        fn path(&self) -> &std::path::Path {
            self.root.as_path()
        }
    }

    impl Drop for TempProject {
        /// Best-effort cleanup; a failure to remove the directory is ignored.
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.root);
        }
    }

    /// Lockfiles are excluded from review; other Python project files are not.
    #[test]
    fn review_target_policy_skips_python_lockfiles() {
        let extension = PyExtension::new();
        let policy = extension.review_target_policy();

        // Lockfiles: excluded.
        assert!(policy.excludes_exact_path("Pipfile.lock"));
        assert!(policy.excludes_exact_path("poetry.lock"));
        assert!(policy.excludes_exact_path("uv.lock"));
        assert!(policy.excludes_exact_path("pdm.lock"));

        // Everything else: still reviewed.
        assert!(!policy.excludes_exact_path("pyproject.toml"));
        assert!(!policy.excludes_exact_path("setup.py"));
        assert!(!policy.excludes_exact_path("requirements.txt"));
        assert!(!policy.excludes_exact_path("PKG-INFO"));
    }

    /// A `Pipfile.lock` in the project root is found and parsed when the
    /// search starts from a nested child directory.
    #[test]
    fn file_defined_dependencies_parse_pipfile_lock_from_child_directory() -> Result<()> {
        let project = TempProject::new("file-defined-dependencies")?;
        let nested = project.path().join("src").join("package");
        std::fs::create_dir_all(&nested)?;

        let lock_contents = serde_json::to_string_pretty(&serde_json::json!({
            "_meta": {},
            "default": {
                "requests": {
                    "version": "==2.32.3"
                }
            },
            "develop": {
                "pytest": {
                    "version": "==8.3.4"
                }
            }
        }))?;
        let pipfile_lock_path = project.path().join("Pipfile.lock");
        std::fs::write(&pipfile_lock_path, lock_contents)?;

        let no_extension_args: Vec<String> = Vec::new();
        let groups =
            PyExtension::new().identify_file_defined_dependencies(&nested, &no_extension_args)?;

        assert_eq!(groups.len(), 1);
        let group = &groups[0];
        assert_eq!(group.path, pipfile_lock_path);
        assert_eq!(group.registry_host_name, "pypi.org");
        assert_dependency(&group.dependencies, "requests", "2.32.3");
        assert_dependency(&group.dependencies, "pytest", "8.3.4");
        Ok(())
    }

    /// Asserts that `dependencies` contains an entry with the given name whose
    /// version equals `version`.
    fn assert_dependency(
        dependencies: &[thirdpass_core::extension::Dependency],
        name: &str,
        version: &str,
    ) {
        let is_present = dependencies
            .iter()
            .any(|dependency| dependency.name == name && dependency.version == Ok(version.into()));
        assert!(
            is_present,
            "expected dependency {}@{} in {:?}",
            name, version, dependencies
        );
    }
}