Skip to main content

provenant/parsers/
poetry_lock.rs

1// SPDX-FileCopyrightText: Provenant contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Parser for Poetry poetry.lock lockfiles.
5//!
6//! Extracts resolved dependency information from Poetry lockfiles which use TOML format
7//! to store resolved versions and metadata for Python dependencies.
8//!
9//! # Supported Formats
10//! - poetry.lock (TOML-based lockfile with package metadata)
11//!
12//! # Key Features
13//! - Direct vs transitive dependency tracking via `is_direct` flag
14//! - Dependency groups support (main, dev, etc.) via scope field
15//! - Dependency resolution with exact versions
16//! - Package URL (purl) generation for PyPI packages
17//! - Extra dependencies and optional package handling
18//!
19//! # Implementation Notes
20//! - Uses TOML parsing via `toml` crate
21//! - All lockfile versions are pinned (`is_pinned: Some(true)`)
22//! - Graceful error handling with `warn!()` logs
23//! - Integrates with Python parser utilities for PyPI URL building
24
25use std::collections::HashMap;
26use std::path::Path;
27
28use crate::parser_warn as warn;
29use packageurl::PackageUrl;
30use toml::Value as TomlValue;
31use toml::map::Map as TomlMap;
32
33use crate::models::{
34    DatasourceId, Dependency, PackageData, PackageType, ResolvedPackage, Sha256Digest,
35};
36use crate::parsers::python::{build_pypi_urls, read_toml_file};
37use crate::parsers::utils::{MAX_ITERATION_COUNT, truncate_field};
38
39use super::PackageParser;
40
41const FIELD_PACKAGE: &str = "package";
42const FIELD_METADATA: &str = "metadata";
43const FIELD_NAME: &str = "name";
44const FIELD_VERSION: &str = "version";
45const FIELD_PYTHON_VERSIONS: &str = "python-versions";
46const FIELD_DEPENDENCIES: &str = "dependencies";
47const FIELD_EXTRAS: &str = "extras";
48const FIELD_LOCK_VERSION: &str = "lock-version";
49
50/// Poetry lockfile parser for poetry.lock files.
51///
52/// Extracts pinned Python package dependencies from Poetry-managed projects.
53pub struct PoetryLockParser;
54
55impl PackageParser for PoetryLockParser {
56    const PACKAGE_TYPE: PackageType = PackageType::Pypi;
57
58    fn is_match(path: &Path) -> bool {
59        path.file_name()
60            .and_then(|name| name.to_str())
61            .map(|name| name == "poetry.lock")
62            .unwrap_or(false)
63    }
64
65    fn extract_packages(path: &Path) -> Vec<PackageData> {
66        let toml_content = match read_toml_file(path) {
67            Ok(content) => content,
68            Err(e) => {
69                warn!("Failed to read poetry.lock at {:?}: {}", path, e);
70                return vec![default_package_data()];
71            }
72        };
73
74        vec![parse_poetry_lock(&toml_content)]
75    }
76}
77
78fn parse_poetry_lock(toml_content: &TomlValue) -> PackageData {
79    let packages = toml_content
80        .get(FIELD_PACKAGE)
81        .and_then(|value| value.as_array())
82        .cloned()
83        .unwrap_or_default();
84
85    let metadata = toml_content
86        .get(FIELD_METADATA)
87        .and_then(|value| value.as_table());
88
89    let mut dependencies = Vec::new();
90    for package in packages.iter().take(MAX_ITERATION_COUNT) {
91        if let Some(package_table) = package.as_table()
92            && let Some(dependency) = build_dependency_from_package(package_table)
93        {
94            dependencies.push(dependency);
95        }
96    }
97
98    PackageData {
99        package_type: Some(PoetryLockParser::PACKAGE_TYPE),
100        namespace: None,
101        name: None,
102        version: None,
103        qualifiers: None,
104        subpath: None,
105        primary_language: Some("Python".to_string()),
106        description: None,
107        release_date: None,
108        parties: Vec::new(),
109        keywords: Vec::new(),
110        homepage_url: None,
111        download_url: None,
112        size: None,
113        sha1: None,
114        md5: None,
115        sha256: None,
116        sha512: None,
117        bug_tracking_url: None,
118        code_view_url: None,
119        vcs_url: None,
120        copyright: None,
121        holder: None,
122        declared_license_expression: None,
123        declared_license_expression_spdx: None,
124        license_detections: Vec::new(),
125        other_license_expression: None,
126        other_license_expression_spdx: None,
127        other_license_detections: Vec::new(),
128        extracted_license_statement: None,
129        notice_text: None,
130        source_packages: Vec::new(),
131        file_references: Vec::new(),
132        is_private: false,
133        is_virtual: false,
134        extra_data: build_metadata_extra_data(metadata),
135        dependencies,
136        repository_homepage_url: None,
137        repository_download_url: None,
138        api_data_url: None,
139        datasource_id: Some(DatasourceId::PypiPoetryLock),
140        purl: None,
141    }
142}
143
144fn build_metadata_extra_data(
145    metadata: Option<&TomlMap<String, TomlValue>>,
146) -> Option<HashMap<String, serde_json::Value>> {
147    let mut extra_data = HashMap::new();
148
149    if let Some(metadata) = metadata {
150        if let Some(python_versions) = metadata
151            .get(FIELD_PYTHON_VERSIONS)
152            .and_then(|value| value.as_str())
153            && !python_versions.is_empty()
154        {
155            extra_data.insert(
156                "python_version".to_string(),
157                serde_json::Value::String(truncate_field(python_versions.to_string())),
158            );
159        }
160
161        if let Some(lock_version) = metadata.get(FIELD_LOCK_VERSION) {
162            let lock_version = lock_version
163                .as_str()
164                .map(|value| value.to_string())
165                .or_else(|| lock_version.as_integer().map(|value| value.to_string()));
166
167            if let Some(lock_version) = lock_version
168                && !lock_version.is_empty()
169            {
170                extra_data.insert(
171                    "lock_version".to_string(),
172                    serde_json::Value::String(truncate_field(lock_version)),
173                );
174            }
175        }
176    }
177
178    if extra_data.is_empty() {
179        None
180    } else {
181        Some(extra_data)
182    }
183}
184
185fn build_dependency_from_package(package_table: &TomlMap<String, TomlValue>) -> Option<Dependency> {
186    let name = package_table
187        .get(FIELD_NAME)
188        .and_then(|value| value.as_str())
189        .map(normalize_pypi_name)
190        .map(truncate_field)?;
191
192    let version = package_table
193        .get(FIELD_VERSION)
194        .and_then(|value| value.as_str())
195        .map(|value| truncate_field(value.to_string()))?;
196
197    let purl = create_pypi_purl(&name, Some(&version));
198
199    let resolved_package = build_resolved_package(package_table, &name, &version);
200
201    let poetry_optional = package_table
202        .get("optional")
203        .and_then(|value| value.as_bool())
204        .unwrap_or(false);
205
206    let extra_data = Some(HashMap::from([(
207        "poetry_optional".to_string(),
208        serde_json::Value::Bool(poetry_optional),
209    )]));
210
211    Some(Dependency {
212        purl,
213        extracted_requirement: None,
214        scope: None,
215        is_runtime: None,
216        is_optional: None,
217        is_pinned: Some(true),
218        is_direct: None,
219        resolved_package: Some(Box::new(resolved_package)),
220        extra_data,
221    })
222}
223
224fn build_resolved_package(
225    package_table: &TomlMap<String, TomlValue>,
226    name: &str,
227    version: &str,
228) -> ResolvedPackage {
229    let dependencies = extract_package_dependencies(package_table);
230
231    let urls = build_pypi_urls(Some(name), Some(version));
232
233    let repository_homepage_url = urls.repository_homepage_url.map(truncate_field);
234    let repository_download_url = urls.repository_download_url.map(truncate_field);
235    let api_data_url = urls.api_data_url.map(truncate_field);
236    let purl = urls.purl.map(truncate_field);
237
238    // Extract sha256 hash from files array (first file's hash)
239    let sha256 = extract_sha256_from_files(package_table);
240
241    ResolvedPackage {
242        primary_language: Some("Python".to_string()),
243        download_url: None,
244        sha1: None,
245        sha256: sha256.and_then(|h| Sha256Digest::from_hex(&h).ok()),
246        sha512: None,
247        md5: None,
248        is_virtual: true,
249        extra_data: None,
250        dependencies,
251        repository_homepage_url,
252        repository_download_url,
253        api_data_url,
254        datasource_id: Some(DatasourceId::PypiPoetryLock),
255        purl,
256        ..ResolvedPackage::new(
257            PoetryLockParser::PACKAGE_TYPE,
258            String::new(),
259            truncate_field(name.to_string()),
260            truncate_field(version.to_string()),
261        )
262    }
263}
264
265fn extract_package_dependencies(package_table: &TomlMap<String, TomlValue>) -> Vec<Dependency> {
266    let mut dependencies = Vec::new();
267
268    if let Some(dep_table) = package_table
269        .get(FIELD_DEPENDENCIES)
270        .and_then(|value| value.as_table())
271    {
272        for (dep_name, dep_value) in dep_table.iter().take(MAX_ITERATION_COUNT) {
273            if let Some(dependency) = build_dependency_from_table(dep_name, dep_value) {
274                dependencies.push(dependency);
275            }
276        }
277    }
278
279    if let Some(extras_table) = package_table
280        .get(FIELD_EXTRAS)
281        .and_then(|value| value.as_table())
282    {
283        for (extra_name, extra_values) in extras_table.iter().take(MAX_ITERATION_COUNT) {
284            if let Some(extra_list) = extra_values.as_array() {
285                for extra in extra_list.iter().take(MAX_ITERATION_COUNT) {
286                    if let Some(spec) = extra.as_str()
287                        && let Some(dependency) = build_dependency_from_extra(extra_name, spec)
288                    {
289                        dependencies.push(dependency);
290                    }
291                }
292            }
293        }
294    }
295
296    dependencies
297}
298
299fn build_dependency_from_table(dep_name: &str, dep_value: &TomlValue) -> Option<Dependency> {
300    let (requirement, is_optional) = match dep_value {
301        TomlValue::String(value) => (Some(truncate_field(value.to_string())), false),
302        TomlValue::Table(table) => (
303            table
304                .get(FIELD_VERSION)
305                .and_then(|value| value.as_str())
306                .map(|value| truncate_field(value.to_string())),
307            table
308                .get("optional")
309                .and_then(|value| value.as_bool())
310                .unwrap_or(false),
311        ),
312        _ => (None, false),
313    };
314
315    let normalized_name = normalize_pypi_name(dep_name);
316    let purl = create_pypi_purl(&normalized_name, None);
317
318    Some(Dependency {
319        purl,
320        extracted_requirement: requirement,
321        scope: Some(truncate_field(FIELD_DEPENDENCIES.to_string())),
322        is_runtime: Some(true),
323        is_optional: Some(is_optional),
324        is_pinned: Some(false),
325        is_direct: Some(true),
326        resolved_package: None,
327        extra_data: None,
328    })
329}
330
331fn build_dependency_from_extra(extra_name: &str, spec: &str) -> Option<Dependency> {
332    let (name, requirement) = parse_poetry_dependency_spec(spec)?;
333    let purl = create_pypi_purl(&name, None);
334
335    Some(Dependency {
336        purl,
337        extracted_requirement: requirement,
338        scope: Some(truncate_field(extra_name.to_string())),
339        is_runtime: None,
340        is_optional: Some(true),
341        is_pinned: Some(false),
342        is_direct: Some(true),
343        resolved_package: None,
344        extra_data: None,
345    })
346}
347
348fn parse_poetry_dependency_spec(spec: &str) -> Option<(String, Option<String>)> {
349    let trimmed = spec.trim();
350    if trimmed.is_empty() {
351        return None;
352    }
353
354    if let Some(paren_pos) = trimmed.find(" (") {
355        let name_part = trimmed[..paren_pos].trim();
356        let requirement_part = trimmed[paren_pos + 2..].trim();
357        let requirement = requirement_part.trim_end_matches(')').trim();
358        if name_part.is_empty() {
359            return None;
360        }
361        let normalized_name = truncate_field(normalize_pypi_name(name_part));
362        let requirement = if requirement.is_empty() {
363            None
364        } else {
365            Some(truncate_field(requirement.to_string()))
366        };
367        return Some((normalized_name, requirement));
368    }
369
370    Some((truncate_field(normalize_pypi_name(trimmed)), None))
371}
372
373fn normalize_pypi_name(name: &str) -> String {
374    name.trim().to_ascii_lowercase()
375}
376
377fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
378    if name.contains('[') || name.contains(']') {
379        return Some(truncate_field(build_manual_pypi_purl(name, version)));
380    }
381
382    if let Ok(mut purl) = PackageUrl::new(PoetryLockParser::PACKAGE_TYPE.as_str(), name) {
383        if let Some(version) = version
384            && purl.with_version(version).is_err()
385        {
386            return None;
387        }
388        return Some(truncate_field(purl.to_string()));
389    }
390
391    Some(truncate_field(build_manual_pypi_purl(name, version)))
392}
393
394fn build_manual_pypi_purl(name: &str, version: Option<&str>) -> String {
395    let encoded_name = encode_pypi_name(name);
396    let mut purl = format!("pkg:pypi/{}", encoded_name);
397    if let Some(version) = version
398        && !version.is_empty()
399    {
400        purl.push('@');
401        purl.push_str(version);
402    }
403    purl
404}
405
406fn encode_pypi_name(name: &str) -> String {
407    name.replace('[', "%5b").replace(']', "%5d")
408}
409
410fn extract_sha256_from_files(package_table: &TomlMap<String, TomlValue>) -> Option<String> {
411    package_table
412        .get("files")
413        .and_then(|files| files.as_array())
414        .and_then(|files_array| files_array.first())
415        .and_then(|first_file| first_file.as_table())
416        .and_then(|file_table| file_table.get("hash"))
417        .and_then(|hash_value| hash_value.as_str())
418        .and_then(|hash_str| {
419            hash_str
420                .strip_prefix("sha256:")
421                .map(|s| truncate_field(s.to_string()))
422        })
423}
424
425fn default_package_data() -> PackageData {
426    PackageData {
427        package_type: Some(PoetryLockParser::PACKAGE_TYPE),
428        primary_language: Some("Python".to_string()),
429        datasource_id: Some(DatasourceId::PypiPoetryLock),
430        ..Default::default()
431    }
432}
433
434crate::register_parser!(
435    "Poetry lockfile",
436    &["**/poetry.lock"],
437    "pypi",
438    "Python",
439    Some("https://python-poetry.org/docs/basic-usage/#installing-with-poetrylock"),
440);