1use anyhow::{format_err, Context, Result};
2use std::io::Read;
3use strum::IntoEnumIterator;
4
5mod pipfile;
6
/// Extension state for Python packages.
///
/// Holds the values served by the `Extension` trait implementation below and
/// the template used to build human-facing registry URLs.
#[derive(Debug, Clone)]
pub struct PyExtension {
    /// Extension name, returned verbatim by `Extension::name`.
    name_: String,
    /// Registry host names, returned verbatim by `Extension::registries`.
    registry_host_names_: Vec<String>,
    /// Handlebars template rendered with `package_name` and `package_version`
    /// to produce the human-facing registry URL of a package release.
    registry_human_url_template_: String,
}
13
14impl thirdpass_core::extension::FromLib for PyExtension {
15 fn new() -> Self {
16 Self {
17 name_: "py".to_string(),
18 registry_host_names_: vec!["pypi.org".to_owned()],
19 registry_human_url_template_:
20 "https://pypi.org/pypi/{{package_name}}/{{package_version}}/".to_string(),
21 }
22 }
23}
24
25impl thirdpass_core::extension::Extension for PyExtension {
26 fn name(&self) -> String {
27 self.name_.clone()
28 }
29
30 fn registries(&self) -> Vec<String> {
31 self.registry_host_names_.clone()
32 }
33
34 fn review_target_policy(&self) -> thirdpass_core::extension::ReviewTargetPolicy {
35 thirdpass_core::extension::ReviewTargetPolicy {
36 excluded_exact_paths: vec![
37 "Pipfile.lock".to_string(),
38 "poetry.lock".to_string(),
39 "uv.lock".to_string(),
40 "pdm.lock".to_string(),
41 ],
42 }
43 }
44
45 fn identify_package_dependencies(
49 &self,
50 _package_name: &str,
51 _package_version: &Option<&str>,
52 _extension_args: &[String],
53 ) -> Result<Vec<thirdpass_core::extension::PackageDependencies>> {
54 Err(format_err!("Function unimplemented."))
55 }
56
57 fn identify_file_defined_dependencies(
58 &self,
59 working_directory: &std::path::Path,
60 _extension_args: &[String],
61 ) -> Result<Vec<thirdpass_core::extension::FileDefinedDependencies>> {
62 let dependency_files = match identify_dependency_files(working_directory) {
64 Some(v) => v,
65 None => return Ok(Vec::new()),
66 };
67
68 let mut all_dependency_specs = Vec::new();
70 for dependency_file in dependency_files {
71 let (dependencies, registry_host_name) = match dependency_file.r#type {
73 DependencyFileType::PipfileLock => (
74 pipfile::get_dependencies(&dependency_file.path)?,
75 pipfile::get_registry_host_name(),
76 ),
77 };
78 all_dependency_specs.push(thirdpass_core::extension::FileDefinedDependencies {
79 path: dependency_file.path,
80 registry_host_name,
81 dependencies: dependencies.into_iter().collect(),
82 });
83 }
84
85 Ok(all_dependency_specs)
86 }
87
88 fn registries_package_metadata(
89 &self,
90 package_name: &str,
91 package_version: &Option<&str>,
92 ) -> Result<Vec<thirdpass_core::extension::RegistryPackageMetadata>> {
93 let package_version = match package_version {
94 Some(v) => Some(v.to_string()),
95 None => get_latest_version(package_name)?,
96 }
97 .ok_or(format_err!("Failed to find package version."))?;
98
99 let registry_host_name = self
101 .registries()
102 .first()
103 .ok_or(format_err!(
104 "Code error: vector of registry host names is empty."
105 ))?
106 .clone();
107
108 let entry_json = get_registry_entry_json(package_name)?;
109 let artifact_url = get_archive_url(&entry_json, &package_version)?;
110 let human_url = get_registry_human_url(self, package_name, &package_version)?;
111
112 Ok(vec![thirdpass_core::extension::RegistryPackageMetadata {
113 registry_host_name,
114 human_url: human_url.to_string(),
115 artifact_url: artifact_url.to_string(),
116 is_primary: true,
117 package_version: package_version.to_string(),
118 }])
119 }
120}
121
122fn get_latest_version(package_name: &str) -> Result<Option<String>> {
124 let json = get_registry_entry_json(package_name)?;
125 let releases = json["releases"]
126 .as_object()
127 .ok_or(format_err!("Failed to find releases JSON section."))?;
128 let mut versions: Vec<semver::Version> = releases
129 .keys()
130 .filter(|v| v.chars().all(|c| c.is_numeric() || c == '.'))
131 .filter_map(|v| semver::Version::parse(v).ok())
132 .collect();
133 versions.sort();
134
135 let latest_version = versions.last().map(|v| v.to_string());
136 Ok(latest_version)
137}
138
139fn get_registry_human_url(
140 extension: &PyExtension,
141 package_name: &str,
142 package_version: &str,
143) -> Result<url::Url> {
144 let handlebars_registry = handlebars::Handlebars::new();
146 let human_url = handlebars_registry.render_template(
147 &extension.registry_human_url_template_,
148 &maplit::btreemap! {
149 "package_name" => package_name,
150 "package_version" => package_version,
151 },
152 )?;
153 Ok(url::Url::parse(human_url.as_str())?)
154}
155
156fn get_registry_entry_json(package_name: &str) -> Result<serde_json::Value> {
157 let handlebars_registry = handlebars::Handlebars::new();
158 let url = handlebars_registry.render_template(
159 "https://pypi.org/pypi/{{package_name}}/json",
160 &maplit::btreemap! {
161 "package_name" => package_name,
162 },
163 )?;
164 let mut result = reqwest::blocking::get(&url.to_string())?;
165 let mut body = String::new();
166 result.read_to_string(&mut body)?;
167
168 serde_json::from_str(&body).context(format!("JSON was not well-formatted:\n{}", body))
169}
170
171fn get_archive_url(
172 registry_entry_json: &serde_json::Value,
173 package_version: &str,
174) -> Result<url::Url> {
175 let releases_section = registry_entry_json
176 .get("releases")
177 .ok_or(format_err!("Failed to find releases JSON section."))?;
178 let release_entry = releases_section.get(package_version).ok_or(format_err!(
179 "Package version not found in registry releases: {}",
180 package_version
181 ))?;
182 let releases = release_entry.as_array().ok_or(format_err!(
183 "Registry releases entry for version {} is not an array.",
184 package_version
185 ))?;
186 if releases.is_empty() {
187 return Err(format_err!(
188 "No release artifacts found for version {}.",
189 package_version
190 ));
191 }
192 for release in releases {
193 let python_version = release["python_version"]
194 .as_str()
195 .ok_or(format_err!("Failed to parse package version."))?;
196 if python_version == "source" {
197 return Ok(url::Url::parse(
198 release["url"]
199 .as_str()
200 .ok_or(format_err!("Failed to parse package archive URL."))?,
201 )?);
202 }
203 }
204 Err(format_err!("Failed to identify package archive URL."))
205}
206
207#[derive(Debug, Copy, Clone, strum_macros::EnumIter)]
209enum DependencyFileType {
210 PipfileLock,
211}
212
213impl DependencyFileType {
214 pub fn file_name(&self) -> std::path::PathBuf {
216 match self {
217 Self::PipfileLock => std::path::PathBuf::from("Pipfile.lock"),
218 }
219 }
220}
221
222#[derive(Debug, Clone)]
224struct DependencyFile {
225 r#type: DependencyFileType,
226 path: std::path::PathBuf,
227}
228
229fn identify_dependency_files(working_directory: &std::path::Path) -> Option<Vec<DependencyFile>> {
233 assert!(working_directory.is_absolute());
234 let mut working_directory = working_directory.to_path_buf();
235
236 loop {
237 let mut found_dependency_file = false;
239
240 let mut dependency_files: Vec<DependencyFile> = Vec::new();
241 for dependency_file_type in DependencyFileType::iter() {
242 let target_absolute_path = working_directory.join(dependency_file_type.file_name());
243 if target_absolute_path.is_file() {
244 found_dependency_file = true;
245 dependency_files.push(DependencyFile {
246 r#type: dependency_file_type,
247 path: target_absolute_path,
248 })
249 }
250 }
251 if found_dependency_file {
252 return Some(dependency_files);
253 }
254
255 if working_directory == std::path::Path::new("/") {
257 break;
258 }
259
260 working_directory.pop();
262 }
263 None
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use thirdpass_core::extension::{Extension, FromLib};

    /// Scratch directory under the system temp dir; removed again on drop.
    struct TempProject {
        root: std::path::PathBuf,
    }

    impl TempProject {
        /// Creates a directory named from `label`, the process id and the
        /// current time in nanoseconds.
        fn new(label: &str) -> Result<Self> {
            let nanos_since_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)?
                .as_nanos();
            let directory_name = format!(
                "thirdpass-py-{}-{}-{}",
                label,
                std::process::id(),
                nanos_since_epoch
            );
            let root = std::env::temp_dir().join(directory_name);
            std::fs::create_dir_all(&root)?;
            Ok(Self { root })
        }

        /// Root of the scratch directory.
        fn path(&self) -> &std::path::Path {
            self.root.as_path()
        }
    }

    impl Drop for TempProject {
        fn drop(&mut self) {
            // Best-effort cleanup; a failure here must not fail the test.
            let _ = std::fs::remove_dir_all(&self.root);
        }
    }

    #[test]
    fn review_target_policy_skips_python_lockfiles() {
        let extension = PyExtension::new();
        let policy = extension.review_target_policy();

        // Lock files are excluded.
        assert!(policy.excludes_exact_path("Pipfile.lock"));
        assert!(policy.excludes_exact_path("poetry.lock"));
        assert!(policy.excludes_exact_path("uv.lock"));
        assert!(policy.excludes_exact_path("pdm.lock"));

        // Hand-written project files are not.
        assert!(!policy.excludes_exact_path("pyproject.toml"));
        assert!(!policy.excludes_exact_path("setup.py"));
        assert!(!policy.excludes_exact_path("requirements.txt"));
        assert!(!policy.excludes_exact_path("PKG-INFO"));
    }

    #[test]
    fn file_defined_dependencies_parse_pipfile_lock_from_child_directory() -> Result<()> {
        let project = TempProject::new("file-defined-dependencies")?;

        // The lock file sits at the project root...
        let lock_contents = serde_json::json!({
            "_meta": {},
            "default": {
                "requests": {
                    "version": "==2.32.3"
                }
            },
            "develop": {
                "pytest": {
                    "version": "==8.3.4"
                }
            }
        });
        let pipfile_lock_path = project.path().join("Pipfile.lock");

        // ...while the search starts two levels below it.
        let nested = project.path().join("src").join("package");
        std::fs::create_dir_all(&nested)?;
        std::fs::write(
            &pipfile_lock_path,
            serde_json::to_string_pretty(&lock_contents)?,
        )?;

        let no_args: Vec<String> = Vec::new();
        let groups = PyExtension::new().identify_file_defined_dependencies(&nested, &no_args)?;

        assert_eq!(groups.len(), 1);
        let group = &groups[0];
        assert_eq!(group.path, pipfile_lock_path);
        assert_eq!(group.registry_host_name, "pypi.org");
        assert_dependency(&group.dependencies, "requests", "2.32.3");
        assert_dependency(&group.dependencies, "pytest", "8.3.4");
        Ok(())
    }

    /// Asserts that `dependencies` contains `name` at exactly `version`.
    fn assert_dependency(
        dependencies: &[thirdpass_core::extension::Dependency],
        name: &str,
        version: &str,
    ) {
        let found = dependencies
            .iter()
            .any(|dependency| dependency.name == name && dependency.version == Ok(version.into()));
        assert!(
            found,
            "expected dependency {}@{} in {:?}",
            name, version, dependencies
        );
    }
}