1use anyhow::{format_err, Context, Result};
2use std::io::Read;
3use strum::IntoEnumIterator;
4
5mod pipfile;
6
7#[derive(Clone, Debug)]
8pub struct PyExtension {
9 name_: String,
10 registry_host_names_: Vec<String>,
11 registry_human_url_template_: String,
12}
13
14impl thirdpass_core::extension::FromLib for PyExtension {
15 fn new() -> Self {
16 Self {
17 name_: "py".to_string(),
18 registry_host_names_: vec!["pypi.org".to_owned()],
19 registry_human_url_template_:
20 "https://pypi.org/pypi/{{package_name}}/{{package_version}}/".to_string(),
21 }
22 }
23}
24
25impl thirdpass_core::extension::Extension for PyExtension {
26 fn name(&self) -> String {
27 self.name_.clone()
28 }
29
30 fn registries(&self) -> Vec<String> {
31 self.registry_host_names_.clone()
32 }
33
34 fn review_target_policy(&self) -> thirdpass_core::extension::ReviewTargetPolicy {
35 thirdpass_core::extension::ReviewTargetPolicy {
36 excluded_exact_paths: vec![
37 "Pipfile.lock".to_string(),
38 "poetry.lock".to_string(),
39 "uv.lock".to_string(),
40 "pdm.lock".to_string(),
41 ],
42 }
43 }
44
45 fn identify_package_dependencies(
49 &self,
50 _package_name: &str,
51 _package_version: &Option<&str>,
52 _extension_args: &Vec<String>,
53 ) -> Result<Vec<thirdpass_core::extension::PackageDependencies>> {
54 Err(format_err!("Function unimplemented."))
55 }
56
57 fn identify_file_defined_dependencies(
58 &self,
59 working_directory: &std::path::PathBuf,
60 _extension_args: &Vec<String>,
61 ) -> Result<Vec<thirdpass_core::extension::FileDefinedDependencies>> {
62 let dependency_files = match identify_dependency_files(&working_directory) {
64 Some(v) => v,
65 None => return Ok(Vec::new()),
66 };
67
68 let mut all_dependency_specs = Vec::new();
70 for dependency_file in dependency_files {
71 let (dependencies, registry_host_name) = match dependency_file.r#type {
73 DependencyFileType::PipfileLock => (
74 pipfile::get_dependencies(&dependency_file.path)?,
75 pipfile::get_registry_host_name(),
76 ),
77 };
78 all_dependency_specs.push(thirdpass_core::extension::FileDefinedDependencies {
79 path: dependency_file.path,
80 registry_host_name: registry_host_name,
81 dependencies: dependencies.into_iter().collect(),
82 });
83 }
84
85 Ok(all_dependency_specs)
86 }
87
88 fn registries_package_metadata(
89 &self,
90 package_name: &str,
91 package_version: &Option<&str>,
92 ) -> Result<Vec<thirdpass_core::extension::RegistryPackageMetadata>> {
93 let package_version = match package_version {
94 Some(v) => Some(v.to_string()),
95 None => get_latest_version(&package_name)?,
96 }
97 .ok_or(format_err!("Failed to find package version."))?;
98
99 let registry_host_name = self
101 .registries()
102 .first()
103 .ok_or(format_err!(
104 "Code error: vector of registry host names is empty."
105 ))?
106 .clone();
107
108 let entry_json = get_registry_entry_json(&package_name)?;
109 let artifact_url = get_archive_url(&entry_json, &package_version)?;
110 let human_url = get_registry_human_url(&self, &package_name, &package_version)?;
111
112 Ok(vec![thirdpass_core::extension::RegistryPackageMetadata {
113 registry_host_name: registry_host_name,
114 human_url: human_url.to_string(),
115 artifact_url: artifact_url.to_string(),
116 is_primary: true,
117 package_version: package_version.to_string(),
118 }])
119 }
120}
121
122fn get_latest_version(package_name: &str) -> Result<Option<String>> {
124 let json = get_registry_entry_json(&package_name)?;
125 let releases = json["releases"]
126 .as_object()
127 .ok_or(format_err!("Failed to find releases JSON section."))?;
128 let mut versions: Vec<semver::Version> = releases
129 .keys()
130 .filter(|v| v.chars().all(|c| c.is_numeric() || c == '.'))
131 .map(|v| semver::Version::parse(v))
132 .filter(|v| v.is_ok())
133 .map(|v| v.unwrap())
134 .collect();
135 versions.sort();
136
137 let latest_version = versions.last().map(|v| v.to_string());
138 Ok(latest_version)
139}
140
141fn get_registry_human_url(
142 extension: &PyExtension,
143 package_name: &str,
144 package_version: &str,
145) -> Result<url::Url> {
146 let handlebars_registry = handlebars::Handlebars::new();
148 let human_url = handlebars_registry.render_template(
149 &extension.registry_human_url_template_,
150 &maplit::btreemap! {
151 "package_name" => package_name,
152 "package_version" => package_version,
153 },
154 )?;
155 Ok(url::Url::parse(human_url.as_str())?)
156}
157
158fn get_registry_entry_json(package_name: &str) -> Result<serde_json::Value> {
159 let handlebars_registry = handlebars::Handlebars::new();
160 let url = handlebars_registry.render_template(
161 "https://pypi.org/pypi/{{package_name}}/json",
162 &maplit::btreemap! {
163 "package_name" => package_name,
164 },
165 )?;
166 let mut result = reqwest::blocking::get(&url.to_string())?;
167 let mut body = String::new();
168 result.read_to_string(&mut body)?;
169
170 Ok(serde_json::from_str(&body).context(format!("JSON was not well-formatted:\n{}", body))?)
171}
172
173fn get_archive_url(
174 registry_entry_json: &serde_json::Value,
175 package_version: &str,
176) -> Result<url::Url> {
177 let releases_section = registry_entry_json
178 .get("releases")
179 .ok_or(format_err!("Failed to find releases JSON section."))?;
180 let release_entry = releases_section.get(package_version).ok_or(format_err!(
181 "Package version not found in registry releases: {}",
182 package_version
183 ))?;
184 let releases = release_entry.as_array().ok_or(format_err!(
185 "Registry releases entry for version {} is not an array.",
186 package_version
187 ))?;
188 if releases.is_empty() {
189 return Err(format_err!(
190 "No release artifacts found for version {}.",
191 package_version
192 ));
193 }
194 for release in releases {
195 let python_version = release["python_version"]
196 .as_str()
197 .ok_or(format_err!("Failed to parse package version."))?;
198 if python_version == "source" {
199 return Ok(url::Url::parse(
200 release["url"]
201 .as_str()
202 .ok_or(format_err!("Failed to parse package archive URL."))?,
203 )?);
204 }
205 }
206 Err(format_err!("Failed to identify package archive URL."))
207}
208
209#[derive(Debug, Copy, Clone, strum_macros::EnumIter)]
211enum DependencyFileType {
212 PipfileLock,
213}
214
215impl DependencyFileType {
216 pub fn file_name(&self) -> std::path::PathBuf {
218 match self {
219 Self::PipfileLock => std::path::PathBuf::from("Pipfile.lock"),
220 }
221 }
222}
223
224#[derive(Debug, Clone)]
226struct DependencyFile {
227 r#type: DependencyFileType,
228 path: std::path::PathBuf,
229}
230
231fn identify_dependency_files(
235 working_directory: &std::path::PathBuf,
236) -> Option<Vec<DependencyFile>> {
237 assert!(working_directory.is_absolute());
238 let mut working_directory = working_directory.clone();
239
240 loop {
241 let mut found_dependency_file = false;
243
244 let mut dependency_files: Vec<DependencyFile> = Vec::new();
245 for dependency_file_type in DependencyFileType::iter() {
246 let target_absolute_path = working_directory.join(dependency_file_type.file_name());
247 if target_absolute_path.is_file() {
248 found_dependency_file = true;
249 dependency_files.push(DependencyFile {
250 r#type: dependency_file_type,
251 path: target_absolute_path,
252 })
253 }
254 }
255 if found_dependency_file {
256 return Some(dependency_files);
257 }
258
259 if working_directory == std::path::PathBuf::from("/") {
261 break;
262 }
263
264 working_directory.pop();
266 }
267 None
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273 use thirdpass_core::extension::{Extension, FromLib};
274
275 struct TempProject {
276 root: std::path::PathBuf,
277 }
278
279 impl TempProject {
280 fn new(label: &str) -> Result<Self> {
281 let timestamp = std::time::SystemTime::now()
282 .duration_since(std::time::UNIX_EPOCH)?
283 .as_nanos();
284 let root = std::env::temp_dir().join(format!(
285 "thirdpass-py-{}-{}-{}",
286 label,
287 std::process::id(),
288 timestamp
289 ));
290 std::fs::create_dir_all(&root)?;
291 Ok(Self { root })
292 }
293
294 fn path(&self) -> &std::path::Path {
295 &self.root
296 }
297 }
298
299 impl Drop for TempProject {
300 fn drop(&mut self) {
301 let _ = std::fs::remove_dir_all(&self.root);
302 }
303 }
304
305 #[test]
306 fn review_target_policy_skips_python_lockfiles() {
307 let policy = PyExtension::new().review_target_policy();
308
309 assert!(policy.excludes_exact_path("Pipfile.lock"));
310 assert!(policy.excludes_exact_path("poetry.lock"));
311 assert!(policy.excludes_exact_path("uv.lock"));
312 assert!(policy.excludes_exact_path("pdm.lock"));
313 assert!(!policy.excludes_exact_path("pyproject.toml"));
314 assert!(!policy.excludes_exact_path("setup.py"));
315 assert!(!policy.excludes_exact_path("requirements.txt"));
316 assert!(!policy.excludes_exact_path("PKG-INFO"));
317 }
318
319 #[test]
320 fn file_defined_dependencies_parse_pipfile_lock_from_child_directory() -> Result<()> {
321 let project = TempProject::new("file-defined-dependencies")?;
322 let nested = project.path().join("src").join("package");
323 std::fs::create_dir_all(&nested)?;
324
325 let pipfile_lock_path = project.path().join("Pipfile.lock");
326 std::fs::write(
327 &pipfile_lock_path,
328 serde_json::to_string_pretty(&serde_json::json!({
329 "_meta": {},
330 "default": {
331 "requests": {
332 "version": "==2.32.3"
333 }
334 },
335 "develop": {
336 "pytest": {
337 "version": "==8.3.4"
338 }
339 }
340 }))?,
341 )?;
342
343 let extension = PyExtension::new();
344 let extension_args = Vec::new();
345 let groups = extension.identify_file_defined_dependencies(&nested, &extension_args)?;
346
347 assert_eq!(groups.len(), 1);
348 assert_eq!(groups[0].path, pipfile_lock_path);
349 assert_eq!(groups[0].registry_host_name, "pypi.org");
350 assert_dependency(&groups[0].dependencies, "requests", "2.32.3");
351 assert_dependency(&groups[0].dependencies, "pytest", "8.3.4");
352 Ok(())
353 }
354
355 fn assert_dependency(
356 dependencies: &[thirdpass_core::extension::Dependency],
357 name: &str,
358 version: &str,
359 ) {
360 assert!(
361 dependencies
362 .iter()
363 .any(|dependency| dependency.name == name
364 && dependency.version == Ok(version.into())),
365 "expected dependency {}@{} in {:?}",
366 name,
367 version,
368 dependencies
369 );
370 }
371}