1use std::collections::HashMap;
26use std::path::Path;
27
28use crate::parser_warn as warn;
29use packageurl::PackageUrl;
30use toml::Value as TomlValue;
31use toml::map::Map as TomlMap;
32
33use crate::models::{
34 DatasourceId, Dependency, PackageData, PackageType, ResolvedPackage, Sha256Digest,
35};
36use crate::parsers::python::{build_pypi_urls, read_toml_file};
37use crate::parsers::utils::{MAX_ITERATION_COUNT, truncate_field};
38
39use super::PackageParser;
40
// TOML keys looked up while walking a `poetry.lock` document.

// Top-level array of locked packages.
const FIELD_PACKAGE: &str = "package";
// Top-level table holding lock-file metadata.
const FIELD_METADATA: &str = "metadata";
// Per-package keys.
const FIELD_NAME: &str = "name";
// Also read inside a table-valued dependency entry to obtain its constraint.
const FIELD_VERSION: &str = "version";
// Key inside the metadata table; surfaced as `python_version` in `extra_data`.
const FIELD_PYTHON_VERSIONS: &str = "python-versions";
// Per-package table of declared dependencies; also reused as the dependency scope label.
const FIELD_DEPENDENCIES: &str = "dependencies";
// Per-package table mapping an extra name to a list of requirement strings.
const FIELD_EXTRAS: &str = "extras";
// Key inside the metadata table; surfaced as `lock_version` in `extra_data`.
const FIELD_LOCK_VERSION: &str = "lock-version";
49
/// Parser for Poetry lock files (`poetry.lock`).
///
/// It produces a single `PackageData` record whose dependencies are the
/// packages pinned in the lock file.
pub struct PoetryLockParser;
54
55impl PackageParser for PoetryLockParser {
56 const PACKAGE_TYPE: PackageType = PackageType::Pypi;
57
58 fn is_match(path: &Path) -> bool {
59 path.file_name()
60 .and_then(|name| name.to_str())
61 .map(|name| name == "poetry.lock")
62 .unwrap_or(false)
63 }
64
65 fn extract_packages(path: &Path) -> Vec<PackageData> {
66 let toml_content = match read_toml_file(path) {
67 Ok(content) => content,
68 Err(e) => {
69 warn!("Failed to read poetry.lock at {:?}: {}", path, e);
70 return vec![default_package_data()];
71 }
72 };
73
74 vec![parse_poetry_lock(&toml_content)]
75 }
76}
77
78fn parse_poetry_lock(toml_content: &TomlValue) -> PackageData {
79 let packages = toml_content
80 .get(FIELD_PACKAGE)
81 .and_then(|value| value.as_array())
82 .cloned()
83 .unwrap_or_default();
84
85 let metadata = toml_content
86 .get(FIELD_METADATA)
87 .and_then(|value| value.as_table());
88
89 let mut dependencies = Vec::new();
90 for package in packages.iter().take(MAX_ITERATION_COUNT) {
91 if let Some(package_table) = package.as_table()
92 && let Some(dependency) = build_dependency_from_package(package_table)
93 {
94 dependencies.push(dependency);
95 }
96 }
97
98 PackageData {
99 package_type: Some(PoetryLockParser::PACKAGE_TYPE),
100 namespace: None,
101 name: None,
102 version: None,
103 qualifiers: None,
104 subpath: None,
105 primary_language: Some("Python".to_string()),
106 description: None,
107 release_date: None,
108 parties: Vec::new(),
109 keywords: Vec::new(),
110 homepage_url: None,
111 download_url: None,
112 size: None,
113 sha1: None,
114 md5: None,
115 sha256: None,
116 sha512: None,
117 bug_tracking_url: None,
118 code_view_url: None,
119 vcs_url: None,
120 copyright: None,
121 holder: None,
122 declared_license_expression: None,
123 declared_license_expression_spdx: None,
124 license_detections: Vec::new(),
125 other_license_expression: None,
126 other_license_expression_spdx: None,
127 other_license_detections: Vec::new(),
128 extracted_license_statement: None,
129 notice_text: None,
130 source_packages: Vec::new(),
131 file_references: Vec::new(),
132 is_private: false,
133 is_virtual: false,
134 extra_data: build_metadata_extra_data(metadata),
135 dependencies,
136 repository_homepage_url: None,
137 repository_download_url: None,
138 api_data_url: None,
139 datasource_id: Some(DatasourceId::PypiPoetryLock),
140 purl: None,
141 }
142}
143
144fn build_metadata_extra_data(
145 metadata: Option<&TomlMap<String, TomlValue>>,
146) -> Option<HashMap<String, serde_json::Value>> {
147 let mut extra_data = HashMap::new();
148
149 if let Some(metadata) = metadata {
150 if let Some(python_versions) = metadata
151 .get(FIELD_PYTHON_VERSIONS)
152 .and_then(|value| value.as_str())
153 && !python_versions.is_empty()
154 {
155 extra_data.insert(
156 "python_version".to_string(),
157 serde_json::Value::String(truncate_field(python_versions.to_string())),
158 );
159 }
160
161 if let Some(lock_version) = metadata.get(FIELD_LOCK_VERSION) {
162 let lock_version = lock_version
163 .as_str()
164 .map(|value| value.to_string())
165 .or_else(|| lock_version.as_integer().map(|value| value.to_string()));
166
167 if let Some(lock_version) = lock_version
168 && !lock_version.is_empty()
169 {
170 extra_data.insert(
171 "lock_version".to_string(),
172 serde_json::Value::String(truncate_field(lock_version)),
173 );
174 }
175 }
176 }
177
178 if extra_data.is_empty() {
179 None
180 } else {
181 Some(extra_data)
182 }
183}
184
185fn build_dependency_from_package(package_table: &TomlMap<String, TomlValue>) -> Option<Dependency> {
186 let name = package_table
187 .get(FIELD_NAME)
188 .and_then(|value| value.as_str())
189 .map(normalize_pypi_name)
190 .map(truncate_field)?;
191
192 let version = package_table
193 .get(FIELD_VERSION)
194 .and_then(|value| value.as_str())
195 .map(|value| truncate_field(value.to_string()))?;
196
197 let purl = create_pypi_purl(&name, Some(&version));
198
199 let resolved_package = build_resolved_package(package_table, &name, &version);
200
201 let poetry_optional = package_table
202 .get("optional")
203 .and_then(|value| value.as_bool())
204 .unwrap_or(false);
205
206 let extra_data = Some(HashMap::from([(
207 "poetry_optional".to_string(),
208 serde_json::Value::Bool(poetry_optional),
209 )]));
210
211 Some(Dependency {
212 purl,
213 extracted_requirement: None,
214 scope: None,
215 is_runtime: None,
216 is_optional: None,
217 is_pinned: Some(true),
218 is_direct: None,
219 resolved_package: Some(Box::new(resolved_package)),
220 extra_data,
221 })
222}
223
224fn build_resolved_package(
225 package_table: &TomlMap<String, TomlValue>,
226 name: &str,
227 version: &str,
228) -> ResolvedPackage {
229 let dependencies = extract_package_dependencies(package_table);
230
231 let urls = build_pypi_urls(Some(name), Some(version));
232
233 let repository_homepage_url = urls.repository_homepage_url.map(truncate_field);
234 let repository_download_url = urls.repository_download_url.map(truncate_field);
235 let api_data_url = urls.api_data_url.map(truncate_field);
236 let purl = urls.purl.map(truncate_field);
237
238 let sha256 = extract_sha256_from_files(package_table);
240
241 ResolvedPackage {
242 primary_language: Some("Python".to_string()),
243 download_url: None,
244 sha1: None,
245 sha256: sha256.and_then(|h| Sha256Digest::from_hex(&h).ok()),
246 sha512: None,
247 md5: None,
248 is_virtual: true,
249 extra_data: None,
250 dependencies,
251 repository_homepage_url,
252 repository_download_url,
253 api_data_url,
254 datasource_id: Some(DatasourceId::PypiPoetryLock),
255 purl,
256 ..ResolvedPackage::new(
257 PoetryLockParser::PACKAGE_TYPE,
258 String::new(),
259 truncate_field(name.to_string()),
260 truncate_field(version.to_string()),
261 )
262 }
263}
264
265fn extract_package_dependencies(package_table: &TomlMap<String, TomlValue>) -> Vec<Dependency> {
266 let mut dependencies = Vec::new();
267
268 if let Some(dep_table) = package_table
269 .get(FIELD_DEPENDENCIES)
270 .and_then(|value| value.as_table())
271 {
272 for (dep_name, dep_value) in dep_table.iter().take(MAX_ITERATION_COUNT) {
273 if let Some(dependency) = build_dependency_from_table(dep_name, dep_value) {
274 dependencies.push(dependency);
275 }
276 }
277 }
278
279 if let Some(extras_table) = package_table
280 .get(FIELD_EXTRAS)
281 .and_then(|value| value.as_table())
282 {
283 for (extra_name, extra_values) in extras_table.iter().take(MAX_ITERATION_COUNT) {
284 if let Some(extra_list) = extra_values.as_array() {
285 for extra in extra_list.iter().take(MAX_ITERATION_COUNT) {
286 if let Some(spec) = extra.as_str()
287 && let Some(dependency) = build_dependency_from_extra(extra_name, spec)
288 {
289 dependencies.push(dependency);
290 }
291 }
292 }
293 }
294 }
295
296 dependencies
297}
298
299fn build_dependency_from_table(dep_name: &str, dep_value: &TomlValue) -> Option<Dependency> {
300 let (requirement, is_optional) = match dep_value {
301 TomlValue::String(value) => (Some(truncate_field(value.to_string())), false),
302 TomlValue::Table(table) => (
303 table
304 .get(FIELD_VERSION)
305 .and_then(|value| value.as_str())
306 .map(|value| truncate_field(value.to_string())),
307 table
308 .get("optional")
309 .and_then(|value| value.as_bool())
310 .unwrap_or(false),
311 ),
312 _ => (None, false),
313 };
314
315 let normalized_name = normalize_pypi_name(dep_name);
316 let purl = create_pypi_purl(&normalized_name, None);
317
318 Some(Dependency {
319 purl,
320 extracted_requirement: requirement,
321 scope: Some(truncate_field(FIELD_DEPENDENCIES.to_string())),
322 is_runtime: Some(true),
323 is_optional: Some(is_optional),
324 is_pinned: Some(false),
325 is_direct: Some(true),
326 resolved_package: None,
327 extra_data: None,
328 })
329}
330
331fn build_dependency_from_extra(extra_name: &str, spec: &str) -> Option<Dependency> {
332 let (name, requirement) = parse_poetry_dependency_spec(spec)?;
333 let purl = create_pypi_purl(&name, None);
334
335 Some(Dependency {
336 purl,
337 extracted_requirement: requirement,
338 scope: Some(truncate_field(extra_name.to_string())),
339 is_runtime: None,
340 is_optional: Some(true),
341 is_pinned: Some(false),
342 is_direct: Some(true),
343 resolved_package: None,
344 extra_data: None,
345 })
346}
347
348fn parse_poetry_dependency_spec(spec: &str) -> Option<(String, Option<String>)> {
349 let trimmed = spec.trim();
350 if trimmed.is_empty() {
351 return None;
352 }
353
354 if let Some(paren_pos) = trimmed.find(" (") {
355 let name_part = trimmed[..paren_pos].trim();
356 let requirement_part = trimmed[paren_pos + 2..].trim();
357 let requirement = requirement_part.trim_end_matches(')').trim();
358 if name_part.is_empty() {
359 return None;
360 }
361 let normalized_name = truncate_field(normalize_pypi_name(name_part));
362 let requirement = if requirement.is_empty() {
363 None
364 } else {
365 Some(truncate_field(requirement.to_string()))
366 };
367 return Some((normalized_name, requirement));
368 }
369
370 Some((truncate_field(normalize_pypi_name(trimmed)), None))
371}
372
/// Trims surrounding whitespace and lowercases ASCII letters of a package name.
///
/// Non-ASCII characters and separators (`-`, `_`, `.`) are left untouched.
fn normalize_pypi_name(name: &str) -> String {
    name.trim()
        .chars()
        .map(|ch| ch.to_ascii_lowercase())
        .collect()
}
376
377fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
378 if name.contains('[') || name.contains(']') {
379 return Some(truncate_field(build_manual_pypi_purl(name, version)));
380 }
381
382 if let Ok(mut purl) = PackageUrl::new(PoetryLockParser::PACKAGE_TYPE.as_str(), name) {
383 if let Some(version) = version
384 && purl.with_version(version).is_err()
385 {
386 return None;
387 }
388 return Some(truncate_field(purl.to_string()));
389 }
390
391 Some(truncate_field(build_manual_pypi_purl(name, version)))
392}
393
394fn build_manual_pypi_purl(name: &str, version: Option<&str>) -> String {
395 let encoded_name = encode_pypi_name(name);
396 let mut purl = format!("pkg:pypi/{}", encoded_name);
397 if let Some(version) = version
398 && !version.is_empty()
399 {
400 purl.push('@');
401 purl.push_str(version);
402 }
403 purl
404}
405
/// Percent-encodes square brackets in a package name (`[` → `%5b`, `]` → `%5d`).
///
/// All other characters are copied through unchanged.
fn encode_pypi_name(name: &str) -> String {
    let mut encoded = String::with_capacity(name.len());
    for ch in name.chars() {
        match ch {
            '[' => encoded.push_str("%5b"),
            ']' => encoded.push_str("%5d"),
            other => encoded.push(other),
        }
    }
    encoded
}
409
410fn extract_sha256_from_files(package_table: &TomlMap<String, TomlValue>) -> Option<String> {
411 package_table
412 .get("files")
413 .and_then(|files| files.as_array())
414 .and_then(|files_array| files_array.first())
415 .and_then(|first_file| first_file.as_table())
416 .and_then(|file_table| file_table.get("hash"))
417 .and_then(|hash_value| hash_value.as_str())
418 .and_then(|hash_str| {
419 hash_str
420 .strip_prefix("sha256:")
421 .map(|s| truncate_field(s.to_string()))
422 })
423}
424
425fn default_package_data() -> PackageData {
426 PackageData {
427 package_type: Some(PoetryLockParser::PACKAGE_TYPE),
428 primary_language: Some("Python".to_string()),
429 datasource_id: Some(DatasourceId::PypiPoetryLock),
430 ..Default::default()
431 }
432}
433
// Registers descriptive metadata for this parser through the crate-level
// `register_parser!` macro.
// NOTE(review): the arguments look like (description, path globs, package
// type, primary language, documentation URL) — confirm against the macro
// definition, which is not visible from this file.
crate::register_parser!(
    "Poetry lockfile",
    &["**/poetry.lock"],
    "pypi",
    "Python",
    Some("https://python-poetry.org/docs/basic-usage/#installing-with-poetrylock"),
);