1use std::collections::HashMap;
26use std::path::Path;
27
28use crate::parser_warn as warn;
29use packageurl::PackageUrl;
30use serde_json::Value as JsonValue;
31use toml::Value as TomlValue;
32use toml::map::Map as TomlMap;
33
34use crate::models::{DatasourceId, Dependency, PackageData, PackageType, Sha256Digest};
35use crate::parsers::python::read_toml_file;
36use crate::parsers::utils::{MAX_ITERATION_COUNT, read_file_to_string, truncate_field};
37
38use super::PackageParser;
39use super::metadata::ParserMetadata;
40
41const FIELD_META: &str = "_meta";
42const FIELD_HASH: &str = "hash";
43const FIELD_SHA256: &str = "sha256";
44const FIELD_DEFAULT: &str = "default";
45const FIELD_DEVELOP: &str = "develop";
46const FIELD_VERSION: &str = "version";
47const FIELD_HASHES: &str = "hashes";
48
49const FIELD_PACKAGES: &str = "packages";
50const FIELD_DEV_PACKAGES: &str = "dev-packages";
51const FIELD_REQUIRES: &str = "requires";
52const FIELD_SOURCE: &str = "source";
53const FIELD_PYTHON_VERSION: &str = "python_version";
54
55pub struct PipfileLockParser;
60
61impl PackageParser for PipfileLockParser {
62 const PACKAGE_TYPE: PackageType = PackageType::Pypi;
63
64 fn metadata() -> Vec<ParserMetadata> {
65 vec![ParserMetadata {
66 description: "Pipenv lockfile and manifest",
67 file_patterns: &["**/Pipfile.lock", "**/Pipfile"],
68 package_type: "pypi",
69 primary_language: "Python",
70 documentation_url: Some("https://github.com/pypa/pipfile"),
71 }]
72 }
73
74 fn is_match(path: &Path) -> bool {
75 path.file_name()
76 .and_then(|name| name.to_str())
77 .map(|name| name == "Pipfile.lock" || name == "Pipfile")
78 .unwrap_or(false)
79 }
80
81 fn extract_packages(path: &Path) -> Vec<PackageData> {
82 vec![match path.file_name().and_then(|name| name.to_str()) {
83 Some("Pipfile.lock") => extract_from_pipfile_lock(path),
84 Some("Pipfile") => extract_from_pipfile(path),
85 _ => default_package_data(None),
86 }]
87 }
88}
89
90fn extract_from_pipfile_lock(path: &Path) -> PackageData {
91 let content = match read_file_to_string(path, None) {
92 Ok(content) => content,
93 Err(e) => {
94 warn!("Failed to read Pipfile.lock at {:?}: {}", path, e);
95 return default_package_data(Some(DatasourceId::PipfileLock));
96 }
97 };
98
99 let json_content: JsonValue = match serde_json::from_str(&content) {
100 Ok(content) => content,
101 Err(e) => {
102 warn!("Failed to parse Pipfile.lock at {:?}: {}", path, e);
103 return default_package_data(Some(DatasourceId::PipfileLock));
104 }
105 };
106
107 parse_pipfile_lock(&json_content)
108}
109
110fn parse_pipfile_lock(json_content: &JsonValue) -> PackageData {
111 let mut package_data = default_package_data(Some(DatasourceId::PipfileLock));
112 package_data.sha256 = extract_lockfile_sha256(json_content);
113
114 let meta = json_content
115 .get(FIELD_META)
116 .and_then(|value| value.as_object());
117 let pipfile_spec = meta.and_then(|value| value.get("pipfile-spec"));
118 let sources = meta.and_then(|value| value.get("sources"));
119 let requires = meta.and_then(|value| value.get("requires"));
120 let _ = (pipfile_spec, sources, requires);
121
122 let default_deps = extract_lockfile_dependencies(json_content, FIELD_DEFAULT, "install", true);
123 let develop_deps = extract_lockfile_dependencies(json_content, FIELD_DEVELOP, "develop", false);
124 package_data.dependencies = [default_deps, develop_deps].concat();
125
126 package_data
127}
128
129fn extract_lockfile_sha256(json_content: &JsonValue) -> Option<Sha256Digest> {
130 json_content
131 .get(FIELD_META)
132 .and_then(|meta| meta.get(FIELD_HASH))
133 .and_then(|hash| hash.get(FIELD_SHA256))
134 .and_then(|value| value.as_str())
135 .and_then(|s| Sha256Digest::from_hex(s).ok())
136}
137
138fn extract_lockfile_dependencies(
139 json_content: &JsonValue,
140 section: &str,
141 scope: &str,
142 is_runtime: bool,
143) -> Vec<Dependency> {
144 let mut dependencies = Vec::new();
145
146 if let Some(section_map) = json_content
147 .get(section)
148 .and_then(|value| value.as_object())
149 {
150 for (name, value) in section_map.iter().take(MAX_ITERATION_COUNT) {
151 if let Some(dependency) = build_lockfile_dependency(name, value, scope, is_runtime) {
152 dependencies.push(dependency);
153 }
154 }
155 }
156
157 dependencies
158}
159
160fn build_lockfile_dependency(
161 name: &str,
162 value: &JsonValue,
163 scope: &str,
164 is_runtime: bool,
165) -> Option<Dependency> {
166 let normalized_name = normalize_pypi_name(name);
167 let requirement = extract_lockfile_requirement(value)?;
168 let version = strip_pipfile_lock_version(&requirement);
169 let purl = create_pypi_purl(&normalized_name, version.as_deref());
170
171 let _hashes = extract_lockfile_hashes(value);
172
173 Some(Dependency {
174 purl,
175 extracted_requirement: Some(truncate_field(requirement)),
176 scope: Some(scope.to_string()),
177 is_runtime: Some(is_runtime),
178 is_optional: Some(false),
179 is_pinned: Some(true),
180 is_direct: Some(true),
181 resolved_package: None,
182 extra_data: None,
183 })
184}
185
186fn extract_lockfile_requirement(value: &JsonValue) -> Option<String> {
187 match value {
188 JsonValue::String(spec) => Some(truncate_field(spec.to_string())),
189 JsonValue::Object(map) => map
190 .get(FIELD_VERSION)
191 .and_then(|version| version.as_str())
192 .map(|version| truncate_field(version.to_string())),
193 _ => None,
194 }
195}
196
197fn extract_lockfile_hashes(value: &JsonValue) -> Vec<String> {
198 let mut hashes = Vec::new();
199 let hash_values = value
200 .get(FIELD_HASHES)
201 .and_then(|hashes_value| hashes_value.as_array());
202
203 if let Some(hash_values) = hash_values {
204 for hash_value in hash_values {
205 if let Some(hash) = hash_value.as_str()
206 && let Some(stripped) = hash.strip_prefix("sha256:")
207 {
208 hashes.push(truncate_field(stripped.to_string()));
209 }
210 }
211 }
212
213 hashes
214}
215
216fn strip_pipfile_lock_version(requirement: &str) -> Option<String> {
217 let trimmed = requirement.trim();
218 if let Some(stripped) = trimmed.strip_prefix("==") {
219 let version = stripped.trim();
220 if version.is_empty() {
221 None
222 } else {
223 Some(truncate_field(version.to_string()))
224 }
225 } else {
226 None
227 }
228}
229
230fn extract_from_pipfile(path: &Path) -> PackageData {
231 let toml_content = match read_toml_file(path) {
232 Ok(content) => content,
233 Err(e) => {
234 warn!("Failed to read Pipfile at {:?}: {}", path, e);
235 return default_package_data(Some(DatasourceId::Pipfile));
236 }
237 };
238
239 parse_pipfile(&toml_content)
240}
241
242fn parse_pipfile(toml_content: &TomlValue) -> PackageData {
243 let mut package_data = default_package_data(Some(DatasourceId::Pipfile));
244
245 let packages = toml_content
246 .get(FIELD_PACKAGES)
247 .and_then(|value| value.as_table());
248 let dev_packages = toml_content
249 .get(FIELD_DEV_PACKAGES)
250 .and_then(|value| value.as_table());
251
252 let mut dependencies = Vec::new();
253 if let Some(packages) = packages {
254 dependencies.extend(extract_pipfile_dependencies(packages, "install", true));
255 }
256 if let Some(dev_packages) = dev_packages {
257 dependencies.extend(extract_pipfile_dependencies(dev_packages, "develop", false));
258 }
259
260 package_data.dependencies = dependencies;
261 package_data.extra_data = build_pipfile_extra_data(toml_content);
262
263 package_data
264}
265
266fn extract_pipfile_dependencies(
267 packages: &TomlMap<String, TomlValue>,
268 scope: &str,
269 is_runtime: bool,
270) -> Vec<Dependency> {
271 let mut dependencies = Vec::new();
272
273 for (name, value) in packages.iter().take(MAX_ITERATION_COUNT) {
274 if let Some(dependency) = build_pipfile_dependency(name, value, scope, is_runtime) {
275 dependencies.push(dependency);
276 }
277 }
278
279 dependencies
280}
281
282fn build_pipfile_dependency(
283 name: &str,
284 value: &TomlValue,
285 scope: &str,
286 is_runtime: bool,
287) -> Option<Dependency> {
288 let normalized_name = normalize_pypi_name(name);
289 let requirement = extract_pipfile_requirement(value);
290 if requirement.is_none() && is_non_registry_dependency(value) {
291 return None;
292 }
293 let requirement = requirement?;
294 let purl = create_pypi_purl(&normalized_name, None);
295
296 Some(Dependency {
297 purl,
298 extracted_requirement: Some(truncate_field(requirement)),
299 scope: Some(scope.to_string()),
300 is_runtime: Some(is_runtime),
301 is_optional: Some(false),
302 is_pinned: Some(false),
303 is_direct: Some(true),
304 resolved_package: None,
305 extra_data: None,
306 })
307}
308
309fn extract_pipfile_requirement(value: &TomlValue) -> Option<String> {
310 match value {
311 TomlValue::String(spec) => Some(truncate_field(spec.to_string())),
312 TomlValue::Boolean(true) => Some("*".to_string()),
313 TomlValue::Table(table) => table
314 .get(FIELD_VERSION)
315 .and_then(|version| version.as_str())
316 .map(|version| truncate_field(version.to_string())),
317 _ => None,
318 }
319}
320
321fn is_non_registry_dependency(value: &TomlValue) -> bool {
322 let table = match value {
323 TomlValue::Table(table) => table,
324 _ => return false,
325 };
326
327 ["git", "path", "file", "url", "hg", "svn"]
328 .iter()
329 .any(|key| table.contains_key(*key))
330}
331
332fn build_pipfile_extra_data(
333 toml_content: &TomlValue,
334) -> Option<HashMap<String, serde_json::Value>> {
335 let mut extra_data = HashMap::new();
336
337 if let Some(requires_table) = toml_content
338 .get(FIELD_REQUIRES)
339 .and_then(|value| value.as_table())
340 && let Some(python_version) = requires_table
341 .get(FIELD_PYTHON_VERSION)
342 .and_then(|value| value.as_str())
343 {
344 extra_data.insert(
345 FIELD_PYTHON_VERSION.to_string(),
346 serde_json::Value::String(truncate_field(python_version.to_string())),
347 );
348 }
349
350 if let Some(source_value) = toml_content.get(FIELD_SOURCE)
351 && let Some(sources) = parse_pipfile_sources(source_value)
352 {
353 extra_data.insert("sources".to_string(), sources);
354 }
355
356 if extra_data.is_empty() {
357 None
358 } else {
359 Some(extra_data)
360 }
361}
362
363fn parse_pipfile_sources(source_value: &TomlValue) -> Option<serde_json::Value> {
364 match source_value {
365 TomlValue::Array(sources) => {
366 let mut json_sources = Vec::new();
367 for source in sources {
368 if let Some(table) = source.as_table() {
369 let mut json_map = serde_json::Map::new();
370 if let Some(name) = table.get("name").and_then(|value| value.as_str()) {
371 json_map.insert(
372 "name".to_string(),
373 serde_json::Value::String(truncate_field(name.to_string())),
374 );
375 }
376 if let Some(url) = table.get("url").and_then(|value| value.as_str()) {
377 json_map.insert(
378 "url".to_string(),
379 serde_json::Value::String(truncate_field(url.to_string())),
380 );
381 }
382 if let Some(verify_ssl) =
383 table.get("verify_ssl").and_then(|value| value.as_bool())
384 {
385 json_map.insert(
386 "verify_ssl".to_string(),
387 serde_json::Value::Bool(verify_ssl),
388 );
389 }
390 json_sources.push(serde_json::Value::Object(json_map));
391 }
392 }
393
394 Some(serde_json::Value::Array(json_sources))
395 }
396 TomlValue::Table(table) => {
397 let mut json_map = serde_json::Map::new();
398 for (key, value) in table {
399 match value {
400 TomlValue::String(value) => {
401 json_map.insert(
402 key.to_string(),
403 serde_json::Value::String(truncate_field(value.to_string())),
404 );
405 }
406 TomlValue::Boolean(value) => {
407 json_map.insert(key.to_string(), serde_json::Value::Bool(*value));
408 }
409 _ => {}
410 }
411 }
412 Some(serde_json::Value::Object(json_map))
413 }
414 _ => None,
415 }
416}
417
418fn normalize_pypi_name(name: &str) -> String {
419 truncate_field(name.trim().to_ascii_lowercase())
420}
421
422fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
423 let mut purl = PackageUrl::new(PipfileLockParser::PACKAGE_TYPE.as_str(), name).ok()?;
424 if let Some(version) = version
425 && purl.with_version(version).is_err()
426 {
427 return None;
428 }
429
430 Some(purl.to_string())
431}
432
433fn default_package_data(datasource_id: Option<DatasourceId>) -> PackageData {
434 PackageData {
435 package_type: Some(PipfileLockParser::PACKAGE_TYPE),
436 primary_language: Some("Python".to_string()),
437 datasource_id,
438 ..Default::default()
439 }
440}