1use std::collections::HashMap;
26use std::path::Path;
27
28use crate::parser_warn as warn;
29use packageurl::PackageUrl;
30use serde_json::Value as JsonValue;
31use toml::Value as TomlValue;
32use toml::map::Map as TomlMap;
33
34use crate::models::{DatasourceId, Dependency, PackageData, PackageType, Sha256Digest};
35use crate::parsers::python::read_toml_file;
36use crate::parsers::utils::{MAX_ITERATION_COUNT, read_file_to_string, truncate_field};
37
38use super::PackageParser;
39
// Keys read from the `Pipfile.lock` JSON document.

/// Top-level object that holds lockfile metadata (the content hash lives under it).
const FIELD_META: &str = "_meta";
/// Key under `_meta` holding the hash object.
const FIELD_HASH: &str = "hash";
/// Key under `_meta.hash` holding the hex-encoded SHA-256 digest.
const FIELD_SHA256: &str = "sha256";
/// Top-level section whose entries are reported with the `install` scope.
const FIELD_DEFAULT: &str = "default";
/// Top-level section whose entries are reported with the `develop` scope.
const FIELD_DEVELOP: &str = "develop";
/// Per-dependency version specifier; also read from table-style `Pipfile` entries.
const FIELD_VERSION: &str = "version";
/// Per-dependency array of `sha256:`-prefixed artifact hashes.
const FIELD_HASHES: &str = "hashes";
47
// Keys read from the `Pipfile` TOML document.

/// Table of dependencies reported with the `install` scope.
const FIELD_PACKAGES: &str = "packages";
/// Table of dependencies reported with the `develop` scope.
const FIELD_DEV_PACKAGES: &str = "dev-packages";
/// Table that may carry a `python_version` string.
const FIELD_REQUIRES: &str = "requires";
/// Package index definition(s); handled as either an array of tables or a single table.
const FIELD_SOURCE: &str = "source";
/// Key under `requires`; also reused as the key in the emitted extra data.
const FIELD_PYTHON_VERSION: &str = "python_version";
53
54pub struct PipfileLockParser;
59
60impl PackageParser for PipfileLockParser {
61 const PACKAGE_TYPE: PackageType = PackageType::Pypi;
62
63 fn is_match(path: &Path) -> bool {
64 path.file_name()
65 .and_then(|name| name.to_str())
66 .map(|name| name == "Pipfile.lock" || name == "Pipfile")
67 .unwrap_or(false)
68 }
69
70 fn extract_packages(path: &Path) -> Vec<PackageData> {
71 vec![match path.file_name().and_then(|name| name.to_str()) {
72 Some("Pipfile.lock") => extract_from_pipfile_lock(path),
73 Some("Pipfile") => extract_from_pipfile(path),
74 _ => default_package_data(None),
75 }]
76 }
77}
78
79fn extract_from_pipfile_lock(path: &Path) -> PackageData {
80 let content = match read_file_to_string(path, None) {
81 Ok(content) => content,
82 Err(e) => {
83 warn!("Failed to read Pipfile.lock at {:?}: {}", path, e);
84 return default_package_data(Some(DatasourceId::PipfileLock));
85 }
86 };
87
88 let json_content: JsonValue = match serde_json::from_str(&content) {
89 Ok(content) => content,
90 Err(e) => {
91 warn!("Failed to parse Pipfile.lock at {:?}: {}", path, e);
92 return default_package_data(Some(DatasourceId::PipfileLock));
93 }
94 };
95
96 parse_pipfile_lock(&json_content)
97}
98
99fn parse_pipfile_lock(json_content: &JsonValue) -> PackageData {
100 let mut package_data = default_package_data(Some(DatasourceId::PipfileLock));
101 package_data.sha256 = extract_lockfile_sha256(json_content);
102
103 let meta = json_content
104 .get(FIELD_META)
105 .and_then(|value| value.as_object());
106 let pipfile_spec = meta.and_then(|value| value.get("pipfile-spec"));
107 let sources = meta.and_then(|value| value.get("sources"));
108 let requires = meta.and_then(|value| value.get("requires"));
109 let _ = (pipfile_spec, sources, requires);
110
111 let default_deps = extract_lockfile_dependencies(json_content, FIELD_DEFAULT, "install", true);
112 let develop_deps = extract_lockfile_dependencies(json_content, FIELD_DEVELOP, "develop", false);
113 package_data.dependencies = [default_deps, develop_deps].concat();
114
115 package_data
116}
117
118fn extract_lockfile_sha256(json_content: &JsonValue) -> Option<Sha256Digest> {
119 json_content
120 .get(FIELD_META)
121 .and_then(|meta| meta.get(FIELD_HASH))
122 .and_then(|hash| hash.get(FIELD_SHA256))
123 .and_then(|value| value.as_str())
124 .and_then(|s| Sha256Digest::from_hex(s).ok())
125}
126
127fn extract_lockfile_dependencies(
128 json_content: &JsonValue,
129 section: &str,
130 scope: &str,
131 is_runtime: bool,
132) -> Vec<Dependency> {
133 let mut dependencies = Vec::new();
134
135 if let Some(section_map) = json_content
136 .get(section)
137 .and_then(|value| value.as_object())
138 {
139 for (name, value) in section_map.iter().take(MAX_ITERATION_COUNT) {
140 if let Some(dependency) = build_lockfile_dependency(name, value, scope, is_runtime) {
141 dependencies.push(dependency);
142 }
143 }
144 }
145
146 dependencies
147}
148
149fn build_lockfile_dependency(
150 name: &str,
151 value: &JsonValue,
152 scope: &str,
153 is_runtime: bool,
154) -> Option<Dependency> {
155 let normalized_name = normalize_pypi_name(name);
156 let requirement = extract_lockfile_requirement(value)?;
157 let version = strip_pipfile_lock_version(&requirement);
158 let purl = create_pypi_purl(&normalized_name, version.as_deref());
159
160 let _hashes = extract_lockfile_hashes(value);
161
162 Some(Dependency {
163 purl,
164 extracted_requirement: Some(truncate_field(requirement)),
165 scope: Some(scope.to_string()),
166 is_runtime: Some(is_runtime),
167 is_optional: Some(false),
168 is_pinned: Some(true),
169 is_direct: Some(true),
170 resolved_package: None,
171 extra_data: None,
172 })
173}
174
175fn extract_lockfile_requirement(value: &JsonValue) -> Option<String> {
176 match value {
177 JsonValue::String(spec) => Some(truncate_field(spec.to_string())),
178 JsonValue::Object(map) => map
179 .get(FIELD_VERSION)
180 .and_then(|version| version.as_str())
181 .map(|version| truncate_field(version.to_string())),
182 _ => None,
183 }
184}
185
186fn extract_lockfile_hashes(value: &JsonValue) -> Vec<String> {
187 let mut hashes = Vec::new();
188 let hash_values = value
189 .get(FIELD_HASHES)
190 .and_then(|hashes_value| hashes_value.as_array());
191
192 if let Some(hash_values) = hash_values {
193 for hash_value in hash_values {
194 if let Some(hash) = hash_value.as_str()
195 && let Some(stripped) = hash.strip_prefix("sha256:")
196 {
197 hashes.push(truncate_field(stripped.to_string()));
198 }
199 }
200 }
201
202 hashes
203}
204
205fn strip_pipfile_lock_version(requirement: &str) -> Option<String> {
206 let trimmed = requirement.trim();
207 if let Some(stripped) = trimmed.strip_prefix("==") {
208 let version = stripped.trim();
209 if version.is_empty() {
210 None
211 } else {
212 Some(truncate_field(version.to_string()))
213 }
214 } else {
215 None
216 }
217}
218
219fn extract_from_pipfile(path: &Path) -> PackageData {
220 let toml_content = match read_toml_file(path) {
221 Ok(content) => content,
222 Err(e) => {
223 warn!("Failed to read Pipfile at {:?}: {}", path, e);
224 return default_package_data(Some(DatasourceId::Pipfile));
225 }
226 };
227
228 parse_pipfile(&toml_content)
229}
230
231fn parse_pipfile(toml_content: &TomlValue) -> PackageData {
232 let mut package_data = default_package_data(Some(DatasourceId::Pipfile));
233
234 let packages = toml_content
235 .get(FIELD_PACKAGES)
236 .and_then(|value| value.as_table());
237 let dev_packages = toml_content
238 .get(FIELD_DEV_PACKAGES)
239 .and_then(|value| value.as_table());
240
241 let mut dependencies = Vec::new();
242 if let Some(packages) = packages {
243 dependencies.extend(extract_pipfile_dependencies(packages, "install", true));
244 }
245 if let Some(dev_packages) = dev_packages {
246 dependencies.extend(extract_pipfile_dependencies(dev_packages, "develop", false));
247 }
248
249 package_data.dependencies = dependencies;
250 package_data.extra_data = build_pipfile_extra_data(toml_content);
251
252 package_data
253}
254
255fn extract_pipfile_dependencies(
256 packages: &TomlMap<String, TomlValue>,
257 scope: &str,
258 is_runtime: bool,
259) -> Vec<Dependency> {
260 let mut dependencies = Vec::new();
261
262 for (name, value) in packages.iter().take(MAX_ITERATION_COUNT) {
263 if let Some(dependency) = build_pipfile_dependency(name, value, scope, is_runtime) {
264 dependencies.push(dependency);
265 }
266 }
267
268 dependencies
269}
270
271fn build_pipfile_dependency(
272 name: &str,
273 value: &TomlValue,
274 scope: &str,
275 is_runtime: bool,
276) -> Option<Dependency> {
277 let normalized_name = normalize_pypi_name(name);
278 let requirement = extract_pipfile_requirement(value);
279 if requirement.is_none() && is_non_registry_dependency(value) {
280 return None;
281 }
282 let requirement = requirement?;
283 let purl = create_pypi_purl(&normalized_name, None);
284
285 Some(Dependency {
286 purl,
287 extracted_requirement: Some(truncate_field(requirement)),
288 scope: Some(scope.to_string()),
289 is_runtime: Some(is_runtime),
290 is_optional: Some(false),
291 is_pinned: Some(false),
292 is_direct: Some(true),
293 resolved_package: None,
294 extra_data: None,
295 })
296}
297
298fn extract_pipfile_requirement(value: &TomlValue) -> Option<String> {
299 match value {
300 TomlValue::String(spec) => Some(truncate_field(spec.to_string())),
301 TomlValue::Boolean(true) => Some("*".to_string()),
302 TomlValue::Table(table) => table
303 .get(FIELD_VERSION)
304 .and_then(|version| version.as_str())
305 .map(|version| truncate_field(version.to_string())),
306 _ => None,
307 }
308}
309
310fn is_non_registry_dependency(value: &TomlValue) -> bool {
311 let table = match value {
312 TomlValue::Table(table) => table,
313 _ => return false,
314 };
315
316 ["git", "path", "file", "url", "hg", "svn"]
317 .iter()
318 .any(|key| table.contains_key(*key))
319}
320
321fn build_pipfile_extra_data(
322 toml_content: &TomlValue,
323) -> Option<HashMap<String, serde_json::Value>> {
324 let mut extra_data = HashMap::new();
325
326 if let Some(requires_table) = toml_content
327 .get(FIELD_REQUIRES)
328 .and_then(|value| value.as_table())
329 && let Some(python_version) = requires_table
330 .get(FIELD_PYTHON_VERSION)
331 .and_then(|value| value.as_str())
332 {
333 extra_data.insert(
334 FIELD_PYTHON_VERSION.to_string(),
335 serde_json::Value::String(truncate_field(python_version.to_string())),
336 );
337 }
338
339 if let Some(source_value) = toml_content.get(FIELD_SOURCE)
340 && let Some(sources) = parse_pipfile_sources(source_value)
341 {
342 extra_data.insert("sources".to_string(), sources);
343 }
344
345 if extra_data.is_empty() {
346 None
347 } else {
348 Some(extra_data)
349 }
350}
351
352fn parse_pipfile_sources(source_value: &TomlValue) -> Option<serde_json::Value> {
353 match source_value {
354 TomlValue::Array(sources) => {
355 let mut json_sources = Vec::new();
356 for source in sources {
357 if let Some(table) = source.as_table() {
358 let mut json_map = serde_json::Map::new();
359 if let Some(name) = table.get("name").and_then(|value| value.as_str()) {
360 json_map.insert(
361 "name".to_string(),
362 serde_json::Value::String(truncate_field(name.to_string())),
363 );
364 }
365 if let Some(url) = table.get("url").and_then(|value| value.as_str()) {
366 json_map.insert(
367 "url".to_string(),
368 serde_json::Value::String(truncate_field(url.to_string())),
369 );
370 }
371 if let Some(verify_ssl) =
372 table.get("verify_ssl").and_then(|value| value.as_bool())
373 {
374 json_map.insert(
375 "verify_ssl".to_string(),
376 serde_json::Value::Bool(verify_ssl),
377 );
378 }
379 json_sources.push(serde_json::Value::Object(json_map));
380 }
381 }
382
383 Some(serde_json::Value::Array(json_sources))
384 }
385 TomlValue::Table(table) => {
386 let mut json_map = serde_json::Map::new();
387 for (key, value) in table {
388 match value {
389 TomlValue::String(value) => {
390 json_map.insert(
391 key.to_string(),
392 serde_json::Value::String(truncate_field(value.to_string())),
393 );
394 }
395 TomlValue::Boolean(value) => {
396 json_map.insert(key.to_string(), serde_json::Value::Bool(*value));
397 }
398 _ => {}
399 }
400 }
401 Some(serde_json::Value::Object(json_map))
402 }
403 _ => None,
404 }
405}
406
407fn normalize_pypi_name(name: &str) -> String {
408 truncate_field(name.trim().to_ascii_lowercase())
409}
410
411fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
412 let mut purl = PackageUrl::new(PipfileLockParser::PACKAGE_TYPE.as_str(), name).ok()?;
413 if let Some(version) = version
414 && purl.with_version(version).is_err()
415 {
416 return None;
417 }
418
419 Some(purl.to_string())
420}
421
422fn default_package_data(datasource_id: Option<DatasourceId>) -> PackageData {
423 PackageData {
424 package_type: Some(PipfileLockParser::PACKAGE_TYPE),
425 primary_language: Some("Python".to_string()),
426 datasource_id,
427 ..Default::default()
428 }
429}
430
// Registers this parser's metadata through `register_parser!`: a description,
// the `Pipfile.lock` / `Pipfile` glob patterns, the "pypi" and "Python"
// labels, and a link to the Pipfile specification repository.
// NOTE(review): the meaning of each positional argument is inferred from its
// value; confirm against the macro definition.
crate::register_parser!(
    "Pipenv lockfile and manifest",
    &["**/Pipfile.lock", "**/Pipfile"],
    "pypi",
    "Python",
    Some("https://github.com/pypa/pipfile"),
);