Skip to main content

provenant/parsers/
pipfile_lock.rs

1// SPDX-FileCopyrightText: Provenant contributors
2// SPDX-License-Identifier: Apache-2.0
3
//! Parser for Pipenv `Pipfile.lock` lockfiles and `Pipfile` manifests.
//!
//! Extracts resolved dependency information from `Pipfile.lock` files, which
//! store locked dependency versions for Python projects using pipenv, and
//! declared dependencies from the accompanying `Pipfile` manifest.
//!
//! # Supported Formats
//! - Pipfile.lock (JSON-based lockfile with per-environment dependency sections)
//! - Pipfile (TOML manifest with `packages` and `dev-packages` tables)
//!
//! # Key Features
//! - Dependency extraction from the `default` and `develop` sections
//! - Direct dependency tracking (top-level locks are direct)
//! - Exact version resolution from `==` pins; the lockfile's `_meta` SHA-256 is recorded
//! - Package URL (purl) generation for PyPI packages
//! - Per-package hashes are parsed but not yet attached to the output;
//!   markers and extras are not extracted by this module
//!
//! # Implementation Notes
//! - Uses `serde_json` to parse `Pipfile.lock` and `toml` to parse `Pipfile`
//! - All lockfile versions are pinned (`is_pinned: Some(true)`)
//! - Graceful error handling with `warn!()` logs
//! - Integrates with Python parser utilities for PyPI URL building
24
25use std::collections::HashMap;
26use std::path::Path;
27
28use crate::parser_warn as warn;
29use packageurl::PackageUrl;
30use serde_json::Value as JsonValue;
31use toml::Value as TomlValue;
32use toml::map::Map as TomlMap;
33
34use crate::models::{DatasourceId, Dependency, PackageData, PackageType, Sha256Digest};
35use crate::parsers::python::read_toml_file;
36use crate::parsers::utils::{MAX_ITERATION_COUNT, read_file_to_string, truncate_field};
37
38use super::PackageParser;
39
// JSON keys read from `Pipfile.lock`.
const FIELD_META: &str = "_meta";
const FIELD_HASH: &str = "hash";
const FIELD_SHA256: &str = "sha256";
const FIELD_DEFAULT: &str = "default";
const FIELD_DEVELOP: &str = "develop";
// `version` is looked up in both lockfile (JSON) and Pipfile (TOML) entries.
const FIELD_VERSION: &str = "version";
const FIELD_HASHES: &str = "hashes";

// TOML keys read from `Pipfile`.
const FIELD_PACKAGES: &str = "packages";
const FIELD_DEV_PACKAGES: &str = "dev-packages";
const FIELD_REQUIRES: &str = "requires";
const FIELD_SOURCE: &str = "source";
const FIELD_PYTHON_VERSION: &str = "python_version";
53
54/// Pipenv lockfile and manifest parser for Pipfile.lock and Pipfile files.
55///
56/// Extracts Python package dependencies from Pipenv-managed projects, supporting
57/// both locked versions (Pipfile.lock) and declared dependencies (Pipfile).
58pub struct PipfileLockParser;
59
60impl PackageParser for PipfileLockParser {
61    const PACKAGE_TYPE: PackageType = PackageType::Pypi;
62
63    fn is_match(path: &Path) -> bool {
64        path.file_name()
65            .and_then(|name| name.to_str())
66            .map(|name| name == "Pipfile.lock" || name == "Pipfile")
67            .unwrap_or(false)
68    }
69
70    fn extract_packages(path: &Path) -> Vec<PackageData> {
71        vec![match path.file_name().and_then(|name| name.to_str()) {
72            Some("Pipfile.lock") => extract_from_pipfile_lock(path),
73            Some("Pipfile") => extract_from_pipfile(path),
74            _ => default_package_data(None),
75        }]
76    }
77}
78
79fn extract_from_pipfile_lock(path: &Path) -> PackageData {
80    let content = match read_file_to_string(path, None) {
81        Ok(content) => content,
82        Err(e) => {
83            warn!("Failed to read Pipfile.lock at {:?}: {}", path, e);
84            return default_package_data(Some(DatasourceId::PipfileLock));
85        }
86    };
87
88    let json_content: JsonValue = match serde_json::from_str(&content) {
89        Ok(content) => content,
90        Err(e) => {
91            warn!("Failed to parse Pipfile.lock at {:?}: {}", path, e);
92            return default_package_data(Some(DatasourceId::PipfileLock));
93        }
94    };
95
96    parse_pipfile_lock(&json_content)
97}
98
99fn parse_pipfile_lock(json_content: &JsonValue) -> PackageData {
100    let mut package_data = default_package_data(Some(DatasourceId::PipfileLock));
101    package_data.sha256 = extract_lockfile_sha256(json_content);
102
103    let meta = json_content
104        .get(FIELD_META)
105        .and_then(|value| value.as_object());
106    let pipfile_spec = meta.and_then(|value| value.get("pipfile-spec"));
107    let sources = meta.and_then(|value| value.get("sources"));
108    let requires = meta.and_then(|value| value.get("requires"));
109    let _ = (pipfile_spec, sources, requires);
110
111    let default_deps = extract_lockfile_dependencies(json_content, FIELD_DEFAULT, "install", true);
112    let develop_deps = extract_lockfile_dependencies(json_content, FIELD_DEVELOP, "develop", false);
113    package_data.dependencies = [default_deps, develop_deps].concat();
114
115    package_data
116}
117
118fn extract_lockfile_sha256(json_content: &JsonValue) -> Option<Sha256Digest> {
119    json_content
120        .get(FIELD_META)
121        .and_then(|meta| meta.get(FIELD_HASH))
122        .and_then(|hash| hash.get(FIELD_SHA256))
123        .and_then(|value| value.as_str())
124        .and_then(|s| Sha256Digest::from_hex(s).ok())
125}
126
127fn extract_lockfile_dependencies(
128    json_content: &JsonValue,
129    section: &str,
130    scope: &str,
131    is_runtime: bool,
132) -> Vec<Dependency> {
133    let mut dependencies = Vec::new();
134
135    if let Some(section_map) = json_content
136        .get(section)
137        .and_then(|value| value.as_object())
138    {
139        for (name, value) in section_map.iter().take(MAX_ITERATION_COUNT) {
140            if let Some(dependency) = build_lockfile_dependency(name, value, scope, is_runtime) {
141                dependencies.push(dependency);
142            }
143        }
144    }
145
146    dependencies
147}
148
149fn build_lockfile_dependency(
150    name: &str,
151    value: &JsonValue,
152    scope: &str,
153    is_runtime: bool,
154) -> Option<Dependency> {
155    let normalized_name = normalize_pypi_name(name);
156    let requirement = extract_lockfile_requirement(value)?;
157    let version = strip_pipfile_lock_version(&requirement);
158    let purl = create_pypi_purl(&normalized_name, version.as_deref());
159
160    let _hashes = extract_lockfile_hashes(value);
161
162    Some(Dependency {
163        purl,
164        extracted_requirement: Some(truncate_field(requirement)),
165        scope: Some(scope.to_string()),
166        is_runtime: Some(is_runtime),
167        is_optional: Some(false),
168        is_pinned: Some(true),
169        is_direct: Some(true),
170        resolved_package: None,
171        extra_data: None,
172    })
173}
174
175fn extract_lockfile_requirement(value: &JsonValue) -> Option<String> {
176    match value {
177        JsonValue::String(spec) => Some(truncate_field(spec.to_string())),
178        JsonValue::Object(map) => map
179            .get(FIELD_VERSION)
180            .and_then(|version| version.as_str())
181            .map(|version| truncate_field(version.to_string())),
182        _ => None,
183    }
184}
185
186fn extract_lockfile_hashes(value: &JsonValue) -> Vec<String> {
187    let mut hashes = Vec::new();
188    let hash_values = value
189        .get(FIELD_HASHES)
190        .and_then(|hashes_value| hashes_value.as_array());
191
192    if let Some(hash_values) = hash_values {
193        for hash_value in hash_values {
194            if let Some(hash) = hash_value.as_str()
195                && let Some(stripped) = hash.strip_prefix("sha256:")
196            {
197                hashes.push(truncate_field(stripped.to_string()));
198            }
199        }
200    }
201
202    hashes
203}
204
205fn strip_pipfile_lock_version(requirement: &str) -> Option<String> {
206    let trimmed = requirement.trim();
207    if let Some(stripped) = trimmed.strip_prefix("==") {
208        let version = stripped.trim();
209        if version.is_empty() {
210            None
211        } else {
212            Some(truncate_field(version.to_string()))
213        }
214    } else {
215        None
216    }
217}
218
219fn extract_from_pipfile(path: &Path) -> PackageData {
220    let toml_content = match read_toml_file(path) {
221        Ok(content) => content,
222        Err(e) => {
223            warn!("Failed to read Pipfile at {:?}: {}", path, e);
224            return default_package_data(Some(DatasourceId::Pipfile));
225        }
226    };
227
228    parse_pipfile(&toml_content)
229}
230
231fn parse_pipfile(toml_content: &TomlValue) -> PackageData {
232    let mut package_data = default_package_data(Some(DatasourceId::Pipfile));
233
234    let packages = toml_content
235        .get(FIELD_PACKAGES)
236        .and_then(|value| value.as_table());
237    let dev_packages = toml_content
238        .get(FIELD_DEV_PACKAGES)
239        .and_then(|value| value.as_table());
240
241    let mut dependencies = Vec::new();
242    if let Some(packages) = packages {
243        dependencies.extend(extract_pipfile_dependencies(packages, "install", true));
244    }
245    if let Some(dev_packages) = dev_packages {
246        dependencies.extend(extract_pipfile_dependencies(dev_packages, "develop", false));
247    }
248
249    package_data.dependencies = dependencies;
250    package_data.extra_data = build_pipfile_extra_data(toml_content);
251
252    package_data
253}
254
255fn extract_pipfile_dependencies(
256    packages: &TomlMap<String, TomlValue>,
257    scope: &str,
258    is_runtime: bool,
259) -> Vec<Dependency> {
260    let mut dependencies = Vec::new();
261
262    for (name, value) in packages.iter().take(MAX_ITERATION_COUNT) {
263        if let Some(dependency) = build_pipfile_dependency(name, value, scope, is_runtime) {
264            dependencies.push(dependency);
265        }
266    }
267
268    dependencies
269}
270
271fn build_pipfile_dependency(
272    name: &str,
273    value: &TomlValue,
274    scope: &str,
275    is_runtime: bool,
276) -> Option<Dependency> {
277    let normalized_name = normalize_pypi_name(name);
278    let requirement = extract_pipfile_requirement(value);
279    if requirement.is_none() && is_non_registry_dependency(value) {
280        return None;
281    }
282    let requirement = requirement?;
283    let purl = create_pypi_purl(&normalized_name, None);
284
285    Some(Dependency {
286        purl,
287        extracted_requirement: Some(truncate_field(requirement)),
288        scope: Some(scope.to_string()),
289        is_runtime: Some(is_runtime),
290        is_optional: Some(false),
291        is_pinned: Some(false),
292        is_direct: Some(true),
293        resolved_package: None,
294        extra_data: None,
295    })
296}
297
298fn extract_pipfile_requirement(value: &TomlValue) -> Option<String> {
299    match value {
300        TomlValue::String(spec) => Some(truncate_field(spec.to_string())),
301        TomlValue::Boolean(true) => Some("*".to_string()),
302        TomlValue::Table(table) => table
303            .get(FIELD_VERSION)
304            .and_then(|version| version.as_str())
305            .map(|version| truncate_field(version.to_string())),
306        _ => None,
307    }
308}
309
310fn is_non_registry_dependency(value: &TomlValue) -> bool {
311    let table = match value {
312        TomlValue::Table(table) => table,
313        _ => return false,
314    };
315
316    ["git", "path", "file", "url", "hg", "svn"]
317        .iter()
318        .any(|key| table.contains_key(*key))
319}
320
321fn build_pipfile_extra_data(
322    toml_content: &TomlValue,
323) -> Option<HashMap<String, serde_json::Value>> {
324    let mut extra_data = HashMap::new();
325
326    if let Some(requires_table) = toml_content
327        .get(FIELD_REQUIRES)
328        .and_then(|value| value.as_table())
329        && let Some(python_version) = requires_table
330            .get(FIELD_PYTHON_VERSION)
331            .and_then(|value| value.as_str())
332    {
333        extra_data.insert(
334            FIELD_PYTHON_VERSION.to_string(),
335            serde_json::Value::String(truncate_field(python_version.to_string())),
336        );
337    }
338
339    if let Some(source_value) = toml_content.get(FIELD_SOURCE)
340        && let Some(sources) = parse_pipfile_sources(source_value)
341    {
342        extra_data.insert("sources".to_string(), sources);
343    }
344
345    if extra_data.is_empty() {
346        None
347    } else {
348        Some(extra_data)
349    }
350}
351
352fn parse_pipfile_sources(source_value: &TomlValue) -> Option<serde_json::Value> {
353    match source_value {
354        TomlValue::Array(sources) => {
355            let mut json_sources = Vec::new();
356            for source in sources {
357                if let Some(table) = source.as_table() {
358                    let mut json_map = serde_json::Map::new();
359                    if let Some(name) = table.get("name").and_then(|value| value.as_str()) {
360                        json_map.insert(
361                            "name".to_string(),
362                            serde_json::Value::String(truncate_field(name.to_string())),
363                        );
364                    }
365                    if let Some(url) = table.get("url").and_then(|value| value.as_str()) {
366                        json_map.insert(
367                            "url".to_string(),
368                            serde_json::Value::String(truncate_field(url.to_string())),
369                        );
370                    }
371                    if let Some(verify_ssl) =
372                        table.get("verify_ssl").and_then(|value| value.as_bool())
373                    {
374                        json_map.insert(
375                            "verify_ssl".to_string(),
376                            serde_json::Value::Bool(verify_ssl),
377                        );
378                    }
379                    json_sources.push(serde_json::Value::Object(json_map));
380                }
381            }
382
383            Some(serde_json::Value::Array(json_sources))
384        }
385        TomlValue::Table(table) => {
386            let mut json_map = serde_json::Map::new();
387            for (key, value) in table {
388                match value {
389                    TomlValue::String(value) => {
390                        json_map.insert(
391                            key.to_string(),
392                            serde_json::Value::String(truncate_field(value.to_string())),
393                        );
394                    }
395                    TomlValue::Boolean(value) => {
396                        json_map.insert(key.to_string(), serde_json::Value::Bool(*value));
397                    }
398                    _ => {}
399                }
400            }
401            Some(serde_json::Value::Object(json_map))
402        }
403        _ => None,
404    }
405}
406
407fn normalize_pypi_name(name: &str) -> String {
408    truncate_field(name.trim().to_ascii_lowercase())
409}
410
411fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
412    let mut purl = PackageUrl::new(PipfileLockParser::PACKAGE_TYPE.as_str(), name).ok()?;
413    if let Some(version) = version
414        && purl.with_version(version).is_err()
415    {
416        return None;
417    }
418
419    Some(purl.to_string())
420}
421
422fn default_package_data(datasource_id: Option<DatasourceId>) -> PackageData {
423    PackageData {
424        package_type: Some(PipfileLockParser::PACKAGE_TYPE),
425        primary_language: Some("Python".to_string()),
426        datasource_id,
427        ..Default::default()
428    }
429}
430
// Registers this parser through the crate's `register_parser!` macro.
// NOTE(review): the arguments appear to be, in order, a description, the
// matched path globs, the package type, the primary language and a
// documentation URL — confirm against the macro definition.
crate::register_parser!(
    "Pipenv lockfile and manifest",
    &["**/Pipfile.lock", "**/Pipfile"],
    "pypi",
    "Python",
    Some("https://github.com/pypa/pipfile"),
);