Skip to main content

provenant/parsers/
poetry_lock.rs

1//! Parser for Poetry poetry.lock lockfiles.
2//!
3//! Extracts resolved dependency information from Poetry lockfiles which use TOML format
4//! to store resolved versions and metadata for Python dependencies.
5//!
6//! # Supported Formats
7//! - poetry.lock (TOML-based lockfile with package metadata)
8//!
9//! # Key Features
10//! - Direct vs transitive dependency tracking via `is_direct` flag
11//! - Dependency groups support (main, dev, etc.) via scope field
12//! - Dependency resolution with exact versions
13//! - Package URL (purl) generation for PyPI packages
14//! - Extra dependencies and optional package handling
15//!
16//! # Implementation Notes
17//! - Uses TOML parsing via `toml` crate
18//! - All lockfile versions are pinned (`is_pinned: Some(true)`)
19//! - Graceful error handling with `warn!()` logs
20//! - Integrates with Python parser utilities for PyPI URL building
21
22use std::collections::HashMap;
23use std::path::Path;
24
25use crate::parser_warn as warn;
26use packageurl::PackageUrl;
27use toml::Value as TomlValue;
28use toml::map::Map as TomlMap;
29
30use crate::models::{
31    DatasourceId, Dependency, PackageData, PackageType, ResolvedPackage, Sha256Digest,
32};
33use crate::parsers::python::{build_pypi_urls, read_toml_file};
34use crate::parsers::utils::{MAX_ITERATION_COUNT, truncate_field};
35
36use super::PackageParser;
37
38const FIELD_PACKAGE: &str = "package";
39const FIELD_METADATA: &str = "metadata";
40const FIELD_NAME: &str = "name";
41const FIELD_VERSION: &str = "version";
42const FIELD_PYTHON_VERSIONS: &str = "python-versions";
43const FIELD_DEPENDENCIES: &str = "dependencies";
44const FIELD_EXTRAS: &str = "extras";
45const FIELD_LOCK_VERSION: &str = "lock-version";
46
47/// Poetry lockfile parser for poetry.lock files.
48///
49/// Extracts pinned Python package dependencies from Poetry-managed projects.
50pub struct PoetryLockParser;
51
52impl PackageParser for PoetryLockParser {
53    const PACKAGE_TYPE: PackageType = PackageType::Pypi;
54
55    fn is_match(path: &Path) -> bool {
56        path.file_name()
57            .and_then(|name| name.to_str())
58            .map(|name| name == "poetry.lock")
59            .unwrap_or(false)
60    }
61
62    fn extract_packages(path: &Path) -> Vec<PackageData> {
63        let toml_content = match read_toml_file(path) {
64            Ok(content) => content,
65            Err(e) => {
66                warn!("Failed to read poetry.lock at {:?}: {}", path, e);
67                return vec![default_package_data()];
68            }
69        };
70
71        vec![parse_poetry_lock(&toml_content)]
72    }
73}
74
75fn parse_poetry_lock(toml_content: &TomlValue) -> PackageData {
76    let packages = toml_content
77        .get(FIELD_PACKAGE)
78        .and_then(|value| value.as_array())
79        .cloned()
80        .unwrap_or_default();
81
82    let metadata = toml_content
83        .get(FIELD_METADATA)
84        .and_then(|value| value.as_table());
85
86    let mut dependencies = Vec::new();
87    for package in packages.iter().take(MAX_ITERATION_COUNT) {
88        if let Some(package_table) = package.as_table()
89            && let Some(dependency) = build_dependency_from_package(package_table)
90        {
91            dependencies.push(dependency);
92        }
93    }
94
95    PackageData {
96        package_type: Some(PoetryLockParser::PACKAGE_TYPE),
97        namespace: None,
98        name: None,
99        version: None,
100        qualifiers: None,
101        subpath: None,
102        primary_language: Some("Python".to_string()),
103        description: None,
104        release_date: None,
105        parties: Vec::new(),
106        keywords: Vec::new(),
107        homepage_url: None,
108        download_url: None,
109        size: None,
110        sha1: None,
111        md5: None,
112        sha256: None,
113        sha512: None,
114        bug_tracking_url: None,
115        code_view_url: None,
116        vcs_url: None,
117        copyright: None,
118        holder: None,
119        declared_license_expression: None,
120        declared_license_expression_spdx: None,
121        license_detections: Vec::new(),
122        other_license_expression: None,
123        other_license_expression_spdx: None,
124        other_license_detections: Vec::new(),
125        extracted_license_statement: None,
126        notice_text: None,
127        source_packages: Vec::new(),
128        file_references: Vec::new(),
129        is_private: false,
130        is_virtual: false,
131        extra_data: build_metadata_extra_data(metadata),
132        dependencies,
133        repository_homepage_url: None,
134        repository_download_url: None,
135        api_data_url: None,
136        datasource_id: Some(DatasourceId::PypiPoetryLock),
137        purl: None,
138    }
139}
140
141fn build_metadata_extra_data(
142    metadata: Option<&TomlMap<String, TomlValue>>,
143) -> Option<HashMap<String, serde_json::Value>> {
144    let mut extra_data = HashMap::new();
145
146    if let Some(metadata) = metadata {
147        if let Some(python_versions) = metadata
148            .get(FIELD_PYTHON_VERSIONS)
149            .and_then(|value| value.as_str())
150            && !python_versions.is_empty()
151        {
152            extra_data.insert(
153                "python_version".to_string(),
154                serde_json::Value::String(truncate_field(python_versions.to_string())),
155            );
156        }
157
158        if let Some(lock_version) = metadata.get(FIELD_LOCK_VERSION) {
159            let lock_version = lock_version
160                .as_str()
161                .map(|value| value.to_string())
162                .or_else(|| lock_version.as_integer().map(|value| value.to_string()));
163
164            if let Some(lock_version) = lock_version
165                && !lock_version.is_empty()
166            {
167                extra_data.insert(
168                    "lock_version".to_string(),
169                    serde_json::Value::String(truncate_field(lock_version)),
170                );
171            }
172        }
173    }
174
175    if extra_data.is_empty() {
176        None
177    } else {
178        Some(extra_data)
179    }
180}
181
182fn build_dependency_from_package(package_table: &TomlMap<String, TomlValue>) -> Option<Dependency> {
183    let name = package_table
184        .get(FIELD_NAME)
185        .and_then(|value| value.as_str())
186        .map(normalize_pypi_name)
187        .map(truncate_field)?;
188
189    let version = package_table
190        .get(FIELD_VERSION)
191        .and_then(|value| value.as_str())
192        .map(|value| truncate_field(value.to_string()))?;
193
194    let purl = create_pypi_purl(&name, Some(&version));
195
196    let resolved_package = build_resolved_package(package_table, &name, &version);
197
198    let poetry_optional = package_table
199        .get("optional")
200        .and_then(|value| value.as_bool())
201        .unwrap_or(false);
202
203    let extra_data = Some(HashMap::from([(
204        "poetry_optional".to_string(),
205        serde_json::Value::Bool(poetry_optional),
206    )]));
207
208    Some(Dependency {
209        purl,
210        extracted_requirement: None,
211        scope: None,
212        is_runtime: None,
213        is_optional: None,
214        is_pinned: Some(true),
215        is_direct: None,
216        resolved_package: Some(Box::new(resolved_package)),
217        extra_data,
218    })
219}
220
221fn build_resolved_package(
222    package_table: &TomlMap<String, TomlValue>,
223    name: &str,
224    version: &str,
225) -> ResolvedPackage {
226    let dependencies = extract_package_dependencies(package_table);
227
228    let (repository_homepage_url, repository_download_url, api_data_url, purl) =
229        build_pypi_urls(Some(name), Some(version));
230
231    let repository_homepage_url = repository_homepage_url.map(truncate_field);
232    let repository_download_url = repository_download_url.map(truncate_field);
233    let api_data_url = api_data_url.map(truncate_field);
234    let purl = purl.map(truncate_field);
235
236    // Extract sha256 hash from files array (first file's hash)
237    let sha256 = extract_sha256_from_files(package_table);
238
239    ResolvedPackage {
240        primary_language: Some("Python".to_string()),
241        download_url: None,
242        sha1: None,
243        sha256: sha256.and_then(|h| Sha256Digest::from_hex(&h).ok()),
244        sha512: None,
245        md5: None,
246        is_virtual: true,
247        extra_data: None,
248        dependencies,
249        repository_homepage_url,
250        repository_download_url,
251        api_data_url,
252        datasource_id: Some(DatasourceId::PypiPoetryLock),
253        purl,
254        ..ResolvedPackage::new(
255            PoetryLockParser::PACKAGE_TYPE,
256            String::new(),
257            truncate_field(name.to_string()),
258            truncate_field(version.to_string()),
259        )
260    }
261}
262
263fn extract_package_dependencies(package_table: &TomlMap<String, TomlValue>) -> Vec<Dependency> {
264    let mut dependencies = Vec::new();
265
266    if let Some(dep_table) = package_table
267        .get(FIELD_DEPENDENCIES)
268        .and_then(|value| value.as_table())
269    {
270        for (dep_name, dep_value) in dep_table.iter().take(MAX_ITERATION_COUNT) {
271            if let Some(dependency) = build_dependency_from_table(dep_name, dep_value) {
272                dependencies.push(dependency);
273            }
274        }
275    }
276
277    if let Some(extras_table) = package_table
278        .get(FIELD_EXTRAS)
279        .and_then(|value| value.as_table())
280    {
281        for (extra_name, extra_values) in extras_table.iter().take(MAX_ITERATION_COUNT) {
282            if let Some(extra_list) = extra_values.as_array() {
283                for extra in extra_list.iter().take(MAX_ITERATION_COUNT) {
284                    if let Some(spec) = extra.as_str()
285                        && let Some(dependency) = build_dependency_from_extra(extra_name, spec)
286                    {
287                        dependencies.push(dependency);
288                    }
289                }
290            }
291        }
292    }
293
294    dependencies
295}
296
297fn build_dependency_from_table(dep_name: &str, dep_value: &TomlValue) -> Option<Dependency> {
298    let (requirement, is_optional) = match dep_value {
299        TomlValue::String(value) => (Some(truncate_field(value.to_string())), false),
300        TomlValue::Table(table) => (
301            table
302                .get(FIELD_VERSION)
303                .and_then(|value| value.as_str())
304                .map(|value| truncate_field(value.to_string())),
305            table
306                .get("optional")
307                .and_then(|value| value.as_bool())
308                .unwrap_or(false),
309        ),
310        _ => (None, false),
311    };
312
313    let normalized_name = normalize_pypi_name(dep_name);
314    let purl = create_pypi_purl(&normalized_name, None);
315
316    Some(Dependency {
317        purl,
318        extracted_requirement: requirement,
319        scope: Some(truncate_field(FIELD_DEPENDENCIES.to_string())),
320        is_runtime: Some(true),
321        is_optional: Some(is_optional),
322        is_pinned: Some(false),
323        is_direct: Some(true),
324        resolved_package: None,
325        extra_data: None,
326    })
327}
328
329fn build_dependency_from_extra(extra_name: &str, spec: &str) -> Option<Dependency> {
330    let (name, requirement) = parse_poetry_dependency_spec(spec)?;
331    let purl = create_pypi_purl(&name, None);
332
333    Some(Dependency {
334        purl,
335        extracted_requirement: requirement,
336        scope: Some(truncate_field(extra_name.to_string())),
337        is_runtime: None,
338        is_optional: Some(true),
339        is_pinned: Some(false),
340        is_direct: Some(true),
341        resolved_package: None,
342        extra_data: None,
343    })
344}
345
346fn parse_poetry_dependency_spec(spec: &str) -> Option<(String, Option<String>)> {
347    let trimmed = spec.trim();
348    if trimmed.is_empty() {
349        return None;
350    }
351
352    if let Some(paren_pos) = trimmed.find(" (") {
353        let name_part = trimmed[..paren_pos].trim();
354        let requirement_part = trimmed[paren_pos + 2..].trim();
355        let requirement = requirement_part.trim_end_matches(')').trim();
356        if name_part.is_empty() {
357            return None;
358        }
359        let normalized_name = truncate_field(normalize_pypi_name(name_part));
360        let requirement = if requirement.is_empty() {
361            None
362        } else {
363            Some(truncate_field(requirement.to_string()))
364        };
365        return Some((normalized_name, requirement));
366    }
367
368    Some((truncate_field(normalize_pypi_name(trimmed)), None))
369}
370
371fn normalize_pypi_name(name: &str) -> String {
372    name.trim().to_ascii_lowercase()
373}
374
375fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
376    if name.contains('[') || name.contains(']') {
377        return Some(truncate_field(build_manual_pypi_purl(name, version)));
378    }
379
380    if let Ok(mut purl) = PackageUrl::new(PoetryLockParser::PACKAGE_TYPE.as_str(), name) {
381        if let Some(version) = version
382            && purl.with_version(version).is_err()
383        {
384            return None;
385        }
386        return Some(truncate_field(purl.to_string()));
387    }
388
389    Some(truncate_field(build_manual_pypi_purl(name, version)))
390}
391
392fn build_manual_pypi_purl(name: &str, version: Option<&str>) -> String {
393    let encoded_name = encode_pypi_name(name);
394    let mut purl = format!("pkg:pypi/{}", encoded_name);
395    if let Some(version) = version
396        && !version.is_empty()
397    {
398        purl.push('@');
399        purl.push_str(version);
400    }
401    purl
402}
403
404fn encode_pypi_name(name: &str) -> String {
405    name.replace('[', "%5b").replace(']', "%5d")
406}
407
408fn extract_sha256_from_files(package_table: &TomlMap<String, TomlValue>) -> Option<String> {
409    package_table
410        .get("files")
411        .and_then(|files| files.as_array())
412        .and_then(|files_array| files_array.first())
413        .and_then(|first_file| first_file.as_table())
414        .and_then(|file_table| file_table.get("hash"))
415        .and_then(|hash_value| hash_value.as_str())
416        .and_then(|hash_str| {
417            hash_str
418                .strip_prefix("sha256:")
419                .map(|s| truncate_field(s.to_string()))
420        })
421}
422
423fn default_package_data() -> PackageData {
424    PackageData {
425        package_type: Some(PoetryLockParser::PACKAGE_TYPE),
426        primary_language: Some("Python".to_string()),
427        datasource_id: Some(DatasourceId::PypiPoetryLock),
428        ..Default::default()
429    }
430}
431
432crate::register_parser!(
433    "Poetry lockfile",
434    &["**/poetry.lock"],
435    "pypi",
436    "Python",
437    Some("https://python-poetry.org/docs/basic-usage/#installing-with-poetrylock"),
438);