Skip to main content

provenant/parsers/
pipfile_lock.rs

1// SPDX-FileCopyrightText: Provenant contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Parser for Pipfile.lock lockfiles.
5//!
6//! Extracts resolved dependency information from Pipfile.lock files which store
7//! locked dependency versions for Python projects using pipenv.
8//!
9//! # Supported Formats
10//! - Pipfile.lock (JSON-based lockfile with per-environment dependency sections)
11//!
12//! # Key Features
13//! - Dependency extraction from default and develop sections
14//! - Direct dependency tracking (top-level locks are direct)
//! - Exact pinned-version extraction, plus capture of the lockfile's `_meta` SHA-256 digest
16//! - Package URL (purl) generation for PyPI packages
17//! - Markers and extras dependency handling
18//!
19//! # Implementation Notes
//! - Parses `Pipfile.lock` as JSON via `serde_json` and the `Pipfile` manifest as TOML via `toml`
21//! - All lockfile versions are pinned (`is_pinned: Some(true)`)
22//! - Graceful error handling with `warn!()` logs
23//! - Integrates with Python parser utilities for PyPI URL building
24
25use std::collections::HashMap;
26use std::path::Path;
27
28use crate::parser_warn as warn;
29use packageurl::PackageUrl;
30use serde_json::Value as JsonValue;
31use toml::Value as TomlValue;
32use toml::map::Map as TomlMap;
33
34use crate::models::{DatasourceId, Dependency, PackageData, PackageType, Sha256Digest};
35use crate::parsers::python::read_toml_file;
36use crate::parsers::utils::{MAX_ITERATION_COUNT, read_file_to_string, truncate_field};
37
38use super::PackageParser;
39use super::metadata::ParserMetadata;
40
41const FIELD_META: &str = "_meta";
42const FIELD_HASH: &str = "hash";
43const FIELD_SHA256: &str = "sha256";
44const FIELD_DEFAULT: &str = "default";
45const FIELD_DEVELOP: &str = "develop";
46const FIELD_VERSION: &str = "version";
47const FIELD_HASHES: &str = "hashes";
48
49const FIELD_PACKAGES: &str = "packages";
50const FIELD_DEV_PACKAGES: &str = "dev-packages";
51const FIELD_REQUIRES: &str = "requires";
52const FIELD_SOURCE: &str = "source";
53const FIELD_PYTHON_VERSION: &str = "python_version";
54
55/// Pipenv lockfile and manifest parser for Pipfile.lock and Pipfile files.
56///
57/// Extracts Python package dependencies from Pipenv-managed projects, supporting
58/// both locked versions (Pipfile.lock) and declared dependencies (Pipfile).
59pub struct PipfileLockParser;
60
61impl PackageParser for PipfileLockParser {
62    const PACKAGE_TYPE: PackageType = PackageType::Pypi;
63
64    fn metadata() -> Vec<ParserMetadata> {
65        vec![ParserMetadata {
66            description: "Pipenv lockfile and manifest",
67            file_patterns: &["**/Pipfile.lock", "**/Pipfile"],
68            package_type: "pypi",
69            primary_language: "Python",
70            documentation_url: Some("https://github.com/pypa/pipfile"),
71        }]
72    }
73
74    fn is_match(path: &Path) -> bool {
75        path.file_name()
76            .and_then(|name| name.to_str())
77            .map(|name| name == "Pipfile.lock" || name == "Pipfile")
78            .unwrap_or(false)
79    }
80
81    fn extract_packages(path: &Path) -> Vec<PackageData> {
82        vec![match path.file_name().and_then(|name| name.to_str()) {
83            Some("Pipfile.lock") => extract_from_pipfile_lock(path),
84            Some("Pipfile") => extract_from_pipfile(path),
85            _ => default_package_data(None),
86        }]
87    }
88}
89
90fn extract_from_pipfile_lock(path: &Path) -> PackageData {
91    let content = match read_file_to_string(path, None) {
92        Ok(content) => content,
93        Err(e) => {
94            warn!("Failed to read Pipfile.lock at {:?}: {}", path, e);
95            return default_package_data(Some(DatasourceId::PipfileLock));
96        }
97    };
98
99    let json_content: JsonValue = match serde_json::from_str(&content) {
100        Ok(content) => content,
101        Err(e) => {
102            warn!("Failed to parse Pipfile.lock at {:?}: {}", path, e);
103            return default_package_data(Some(DatasourceId::PipfileLock));
104        }
105    };
106
107    parse_pipfile_lock(&json_content)
108}
109
110fn parse_pipfile_lock(json_content: &JsonValue) -> PackageData {
111    let mut package_data = default_package_data(Some(DatasourceId::PipfileLock));
112    package_data.sha256 = extract_lockfile_sha256(json_content);
113
114    let meta = json_content
115        .get(FIELD_META)
116        .and_then(|value| value.as_object());
117    let pipfile_spec = meta.and_then(|value| value.get("pipfile-spec"));
118    let sources = meta.and_then(|value| value.get("sources"));
119    let requires = meta.and_then(|value| value.get("requires"));
120    let _ = (pipfile_spec, sources, requires);
121
122    let default_deps = extract_lockfile_dependencies(json_content, FIELD_DEFAULT, "install", true);
123    let develop_deps = extract_lockfile_dependencies(json_content, FIELD_DEVELOP, "develop", false);
124    package_data.dependencies = [default_deps, develop_deps].concat();
125
126    package_data
127}
128
129fn extract_lockfile_sha256(json_content: &JsonValue) -> Option<Sha256Digest> {
130    json_content
131        .get(FIELD_META)
132        .and_then(|meta| meta.get(FIELD_HASH))
133        .and_then(|hash| hash.get(FIELD_SHA256))
134        .and_then(|value| value.as_str())
135        .and_then(|s| Sha256Digest::from_hex(s).ok())
136}
137
138fn extract_lockfile_dependencies(
139    json_content: &JsonValue,
140    section: &str,
141    scope: &str,
142    is_runtime: bool,
143) -> Vec<Dependency> {
144    let mut dependencies = Vec::new();
145
146    if let Some(section_map) = json_content
147        .get(section)
148        .and_then(|value| value.as_object())
149    {
150        for (name, value) in section_map.iter().take(MAX_ITERATION_COUNT) {
151            if let Some(dependency) = build_lockfile_dependency(name, value, scope, is_runtime) {
152                dependencies.push(dependency);
153            }
154        }
155    }
156
157    dependencies
158}
159
160fn build_lockfile_dependency(
161    name: &str,
162    value: &JsonValue,
163    scope: &str,
164    is_runtime: bool,
165) -> Option<Dependency> {
166    let normalized_name = normalize_pypi_name(name);
167    let requirement = extract_lockfile_requirement(value)?;
168    let version = strip_pipfile_lock_version(&requirement);
169    let purl = create_pypi_purl(&normalized_name, version.as_deref());
170
171    let _hashes = extract_lockfile_hashes(value);
172
173    Some(Dependency {
174        purl,
175        extracted_requirement: Some(truncate_field(requirement)),
176        scope: Some(scope.to_string()),
177        is_runtime: Some(is_runtime),
178        is_optional: Some(false),
179        is_pinned: Some(true),
180        is_direct: Some(true),
181        resolved_package: None,
182        extra_data: None,
183    })
184}
185
186fn extract_lockfile_requirement(value: &JsonValue) -> Option<String> {
187    match value {
188        JsonValue::String(spec) => Some(truncate_field(spec.to_string())),
189        JsonValue::Object(map) => map
190            .get(FIELD_VERSION)
191            .and_then(|version| version.as_str())
192            .map(|version| truncate_field(version.to_string())),
193        _ => None,
194    }
195}
196
197fn extract_lockfile_hashes(value: &JsonValue) -> Vec<String> {
198    let mut hashes = Vec::new();
199    let hash_values = value
200        .get(FIELD_HASHES)
201        .and_then(|hashes_value| hashes_value.as_array());
202
203    if let Some(hash_values) = hash_values {
204        for hash_value in hash_values {
205            if let Some(hash) = hash_value.as_str()
206                && let Some(stripped) = hash.strip_prefix("sha256:")
207            {
208                hashes.push(truncate_field(stripped.to_string()));
209            }
210        }
211    }
212
213    hashes
214}
215
216fn strip_pipfile_lock_version(requirement: &str) -> Option<String> {
217    let trimmed = requirement.trim();
218    if let Some(stripped) = trimmed.strip_prefix("==") {
219        let version = stripped.trim();
220        if version.is_empty() {
221            None
222        } else {
223            Some(truncate_field(version.to_string()))
224        }
225    } else {
226        None
227    }
228}
229
230fn extract_from_pipfile(path: &Path) -> PackageData {
231    let toml_content = match read_toml_file(path) {
232        Ok(content) => content,
233        Err(e) => {
234            warn!("Failed to read Pipfile at {:?}: {}", path, e);
235            return default_package_data(Some(DatasourceId::Pipfile));
236        }
237    };
238
239    parse_pipfile(&toml_content)
240}
241
242fn parse_pipfile(toml_content: &TomlValue) -> PackageData {
243    let mut package_data = default_package_data(Some(DatasourceId::Pipfile));
244
245    let packages = toml_content
246        .get(FIELD_PACKAGES)
247        .and_then(|value| value.as_table());
248    let dev_packages = toml_content
249        .get(FIELD_DEV_PACKAGES)
250        .and_then(|value| value.as_table());
251
252    let mut dependencies = Vec::new();
253    if let Some(packages) = packages {
254        dependencies.extend(extract_pipfile_dependencies(packages, "install", true));
255    }
256    if let Some(dev_packages) = dev_packages {
257        dependencies.extend(extract_pipfile_dependencies(dev_packages, "develop", false));
258    }
259
260    package_data.dependencies = dependencies;
261    package_data.extra_data = build_pipfile_extra_data(toml_content);
262
263    package_data
264}
265
266fn extract_pipfile_dependencies(
267    packages: &TomlMap<String, TomlValue>,
268    scope: &str,
269    is_runtime: bool,
270) -> Vec<Dependency> {
271    let mut dependencies = Vec::new();
272
273    for (name, value) in packages.iter().take(MAX_ITERATION_COUNT) {
274        if let Some(dependency) = build_pipfile_dependency(name, value, scope, is_runtime) {
275            dependencies.push(dependency);
276        }
277    }
278
279    dependencies
280}
281
282fn build_pipfile_dependency(
283    name: &str,
284    value: &TomlValue,
285    scope: &str,
286    is_runtime: bool,
287) -> Option<Dependency> {
288    let normalized_name = normalize_pypi_name(name);
289    let requirement = extract_pipfile_requirement(value);
290    if requirement.is_none() && is_non_registry_dependency(value) {
291        return None;
292    }
293    let requirement = requirement?;
294    let purl = create_pypi_purl(&normalized_name, None);
295
296    Some(Dependency {
297        purl,
298        extracted_requirement: Some(truncate_field(requirement)),
299        scope: Some(scope.to_string()),
300        is_runtime: Some(is_runtime),
301        is_optional: Some(false),
302        is_pinned: Some(false),
303        is_direct: Some(true),
304        resolved_package: None,
305        extra_data: None,
306    })
307}
308
309fn extract_pipfile_requirement(value: &TomlValue) -> Option<String> {
310    match value {
311        TomlValue::String(spec) => Some(truncate_field(spec.to_string())),
312        TomlValue::Boolean(true) => Some("*".to_string()),
313        TomlValue::Table(table) => table
314            .get(FIELD_VERSION)
315            .and_then(|version| version.as_str())
316            .map(|version| truncate_field(version.to_string())),
317        _ => None,
318    }
319}
320
321fn is_non_registry_dependency(value: &TomlValue) -> bool {
322    let table = match value {
323        TomlValue::Table(table) => table,
324        _ => return false,
325    };
326
327    ["git", "path", "file", "url", "hg", "svn"]
328        .iter()
329        .any(|key| table.contains_key(*key))
330}
331
332fn build_pipfile_extra_data(
333    toml_content: &TomlValue,
334) -> Option<HashMap<String, serde_json::Value>> {
335    let mut extra_data = HashMap::new();
336
337    if let Some(requires_table) = toml_content
338        .get(FIELD_REQUIRES)
339        .and_then(|value| value.as_table())
340        && let Some(python_version) = requires_table
341            .get(FIELD_PYTHON_VERSION)
342            .and_then(|value| value.as_str())
343    {
344        extra_data.insert(
345            FIELD_PYTHON_VERSION.to_string(),
346            serde_json::Value::String(truncate_field(python_version.to_string())),
347        );
348    }
349
350    if let Some(source_value) = toml_content.get(FIELD_SOURCE)
351        && let Some(sources) = parse_pipfile_sources(source_value)
352    {
353        extra_data.insert("sources".to_string(), sources);
354    }
355
356    if extra_data.is_empty() {
357        None
358    } else {
359        Some(extra_data)
360    }
361}
362
363fn parse_pipfile_sources(source_value: &TomlValue) -> Option<serde_json::Value> {
364    match source_value {
365        TomlValue::Array(sources) => {
366            let mut json_sources = Vec::new();
367            for source in sources {
368                if let Some(table) = source.as_table() {
369                    let mut json_map = serde_json::Map::new();
370                    if let Some(name) = table.get("name").and_then(|value| value.as_str()) {
371                        json_map.insert(
372                            "name".to_string(),
373                            serde_json::Value::String(truncate_field(name.to_string())),
374                        );
375                    }
376                    if let Some(url) = table.get("url").and_then(|value| value.as_str()) {
377                        json_map.insert(
378                            "url".to_string(),
379                            serde_json::Value::String(truncate_field(url.to_string())),
380                        );
381                    }
382                    if let Some(verify_ssl) =
383                        table.get("verify_ssl").and_then(|value| value.as_bool())
384                    {
385                        json_map.insert(
386                            "verify_ssl".to_string(),
387                            serde_json::Value::Bool(verify_ssl),
388                        );
389                    }
390                    json_sources.push(serde_json::Value::Object(json_map));
391                }
392            }
393
394            Some(serde_json::Value::Array(json_sources))
395        }
396        TomlValue::Table(table) => {
397            let mut json_map = serde_json::Map::new();
398            for (key, value) in table {
399                match value {
400                    TomlValue::String(value) => {
401                        json_map.insert(
402                            key.to_string(),
403                            serde_json::Value::String(truncate_field(value.to_string())),
404                        );
405                    }
406                    TomlValue::Boolean(value) => {
407                        json_map.insert(key.to_string(), serde_json::Value::Bool(*value));
408                    }
409                    _ => {}
410                }
411            }
412            Some(serde_json::Value::Object(json_map))
413        }
414        _ => None,
415    }
416}
417
418fn normalize_pypi_name(name: &str) -> String {
419    truncate_field(name.trim().to_ascii_lowercase())
420}
421
422fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
423    let mut purl = PackageUrl::new(PipfileLockParser::PACKAGE_TYPE.as_str(), name).ok()?;
424    if let Some(version) = version
425        && purl.with_version(version).is_err()
426    {
427        return None;
428    }
429
430    Some(purl.to_string())
431}
432
433fn default_package_data(datasource_id: Option<DatasourceId>) -> PackageData {
434    PackageData {
435        package_type: Some(PipfileLockParser::PACKAGE_TYPE),
436        primary_language: Some("Python".to_string()),
437        datasource_id,
438        ..Default::default()
439    }
440}