1use std::collections::HashMap;
23use std::path::Path;
24
25use crate::parser_warn as warn;
26use packageurl::PackageUrl;
27use toml::Value as TomlValue;
28use toml::map::Map as TomlMap;
29
30use crate::models::{
31 DatasourceId, Dependency, PackageData, PackageType, ResolvedPackage, Sha256Digest,
32};
33use crate::parsers::python::{build_pypi_urls, read_toml_file};
34use crate::parsers::utils::{MAX_ITERATION_COUNT, truncate_field};
35
36use super::PackageParser;
37
// Keys looked up in the parsed `poetry.lock` TOML document.

/// Top-level key whose array value holds one table per locked package.
const FIELD_PACKAGE: &str = "package";
/// Top-level key whose table value holds lockfile-wide metadata.
const FIELD_METADATA: &str = "metadata";
/// Key of the package name inside a package table.
const FIELD_NAME: &str = "name";
/// Key of the version string inside a package table; also read from
/// table-form dependency entries.
const FIELD_VERSION: &str = "version";
/// Metadata key whose string value is emitted as `python_version` extra data.
const FIELD_PYTHON_VERSIONS: &str = "python-versions";
/// Key of the per-package dependency table; also reused as the scope label
/// of the dependencies built from that table.
const FIELD_DEPENDENCIES: &str = "dependencies";
/// Key of the per-package table mapping an extra name to an array of
/// requirement strings.
const FIELD_EXTRAS: &str = "extras";
/// Metadata key, read as a string or an integer and emitted as `lock_version`
/// extra data.
const FIELD_LOCK_VERSION: &str = "lock-version";
46
47pub struct PoetryLockParser;
51
52impl PackageParser for PoetryLockParser {
53 const PACKAGE_TYPE: PackageType = PackageType::Pypi;
54
55 fn is_match(path: &Path) -> bool {
56 path.file_name()
57 .and_then(|name| name.to_str())
58 .map(|name| name == "poetry.lock")
59 .unwrap_or(false)
60 }
61
62 fn extract_packages(path: &Path) -> Vec<PackageData> {
63 let toml_content = match read_toml_file(path) {
64 Ok(content) => content,
65 Err(e) => {
66 warn!("Failed to read poetry.lock at {:?}: {}", path, e);
67 return vec![default_package_data()];
68 }
69 };
70
71 vec![parse_poetry_lock(&toml_content)]
72 }
73}
74
75fn parse_poetry_lock(toml_content: &TomlValue) -> PackageData {
76 let packages = toml_content
77 .get(FIELD_PACKAGE)
78 .and_then(|value| value.as_array())
79 .cloned()
80 .unwrap_or_default();
81
82 let metadata = toml_content
83 .get(FIELD_METADATA)
84 .and_then(|value| value.as_table());
85
86 let mut dependencies = Vec::new();
87 for package in packages.iter().take(MAX_ITERATION_COUNT) {
88 if let Some(package_table) = package.as_table()
89 && let Some(dependency) = build_dependency_from_package(package_table)
90 {
91 dependencies.push(dependency);
92 }
93 }
94
95 PackageData {
96 package_type: Some(PoetryLockParser::PACKAGE_TYPE),
97 namespace: None,
98 name: None,
99 version: None,
100 qualifiers: None,
101 subpath: None,
102 primary_language: Some("Python".to_string()),
103 description: None,
104 release_date: None,
105 parties: Vec::new(),
106 keywords: Vec::new(),
107 homepage_url: None,
108 download_url: None,
109 size: None,
110 sha1: None,
111 md5: None,
112 sha256: None,
113 sha512: None,
114 bug_tracking_url: None,
115 code_view_url: None,
116 vcs_url: None,
117 copyright: None,
118 holder: None,
119 declared_license_expression: None,
120 declared_license_expression_spdx: None,
121 license_detections: Vec::new(),
122 other_license_expression: None,
123 other_license_expression_spdx: None,
124 other_license_detections: Vec::new(),
125 extracted_license_statement: None,
126 notice_text: None,
127 source_packages: Vec::new(),
128 file_references: Vec::new(),
129 is_private: false,
130 is_virtual: false,
131 extra_data: build_metadata_extra_data(metadata),
132 dependencies,
133 repository_homepage_url: None,
134 repository_download_url: None,
135 api_data_url: None,
136 datasource_id: Some(DatasourceId::PypiPoetryLock),
137 purl: None,
138 }
139}
140
141fn build_metadata_extra_data(
142 metadata: Option<&TomlMap<String, TomlValue>>,
143) -> Option<HashMap<String, serde_json::Value>> {
144 let mut extra_data = HashMap::new();
145
146 if let Some(metadata) = metadata {
147 if let Some(python_versions) = metadata
148 .get(FIELD_PYTHON_VERSIONS)
149 .and_then(|value| value.as_str())
150 && !python_versions.is_empty()
151 {
152 extra_data.insert(
153 "python_version".to_string(),
154 serde_json::Value::String(truncate_field(python_versions.to_string())),
155 );
156 }
157
158 if let Some(lock_version) = metadata.get(FIELD_LOCK_VERSION) {
159 let lock_version = lock_version
160 .as_str()
161 .map(|value| value.to_string())
162 .or_else(|| lock_version.as_integer().map(|value| value.to_string()));
163
164 if let Some(lock_version) = lock_version
165 && !lock_version.is_empty()
166 {
167 extra_data.insert(
168 "lock_version".to_string(),
169 serde_json::Value::String(truncate_field(lock_version)),
170 );
171 }
172 }
173 }
174
175 if extra_data.is_empty() {
176 None
177 } else {
178 Some(extra_data)
179 }
180}
181
182fn build_dependency_from_package(package_table: &TomlMap<String, TomlValue>) -> Option<Dependency> {
183 let name = package_table
184 .get(FIELD_NAME)
185 .and_then(|value| value.as_str())
186 .map(normalize_pypi_name)
187 .map(truncate_field)?;
188
189 let version = package_table
190 .get(FIELD_VERSION)
191 .and_then(|value| value.as_str())
192 .map(|value| truncate_field(value.to_string()))?;
193
194 let purl = create_pypi_purl(&name, Some(&version));
195
196 let resolved_package = build_resolved_package(package_table, &name, &version);
197
198 let poetry_optional = package_table
199 .get("optional")
200 .and_then(|value| value.as_bool())
201 .unwrap_or(false);
202
203 let extra_data = Some(HashMap::from([(
204 "poetry_optional".to_string(),
205 serde_json::Value::Bool(poetry_optional),
206 )]));
207
208 Some(Dependency {
209 purl,
210 extracted_requirement: None,
211 scope: None,
212 is_runtime: None,
213 is_optional: None,
214 is_pinned: Some(true),
215 is_direct: None,
216 resolved_package: Some(Box::new(resolved_package)),
217 extra_data,
218 })
219}
220
221fn build_resolved_package(
222 package_table: &TomlMap<String, TomlValue>,
223 name: &str,
224 version: &str,
225) -> ResolvedPackage {
226 let dependencies = extract_package_dependencies(package_table);
227
228 let (repository_homepage_url, repository_download_url, api_data_url, purl) =
229 build_pypi_urls(Some(name), Some(version));
230
231 let repository_homepage_url = repository_homepage_url.map(truncate_field);
232 let repository_download_url = repository_download_url.map(truncate_field);
233 let api_data_url = api_data_url.map(truncate_field);
234 let purl = purl.map(truncate_field);
235
236 let sha256 = extract_sha256_from_files(package_table);
238
239 ResolvedPackage {
240 primary_language: Some("Python".to_string()),
241 download_url: None,
242 sha1: None,
243 sha256: sha256.and_then(|h| Sha256Digest::from_hex(&h).ok()),
244 sha512: None,
245 md5: None,
246 is_virtual: true,
247 extra_data: None,
248 dependencies,
249 repository_homepage_url,
250 repository_download_url,
251 api_data_url,
252 datasource_id: Some(DatasourceId::PypiPoetryLock),
253 purl,
254 ..ResolvedPackage::new(
255 PoetryLockParser::PACKAGE_TYPE,
256 String::new(),
257 truncate_field(name.to_string()),
258 truncate_field(version.to_string()),
259 )
260 }
261}
262
263fn extract_package_dependencies(package_table: &TomlMap<String, TomlValue>) -> Vec<Dependency> {
264 let mut dependencies = Vec::new();
265
266 if let Some(dep_table) = package_table
267 .get(FIELD_DEPENDENCIES)
268 .and_then(|value| value.as_table())
269 {
270 for (dep_name, dep_value) in dep_table.iter().take(MAX_ITERATION_COUNT) {
271 if let Some(dependency) = build_dependency_from_table(dep_name, dep_value) {
272 dependencies.push(dependency);
273 }
274 }
275 }
276
277 if let Some(extras_table) = package_table
278 .get(FIELD_EXTRAS)
279 .and_then(|value| value.as_table())
280 {
281 for (extra_name, extra_values) in extras_table.iter().take(MAX_ITERATION_COUNT) {
282 if let Some(extra_list) = extra_values.as_array() {
283 for extra in extra_list.iter().take(MAX_ITERATION_COUNT) {
284 if let Some(spec) = extra.as_str()
285 && let Some(dependency) = build_dependency_from_extra(extra_name, spec)
286 {
287 dependencies.push(dependency);
288 }
289 }
290 }
291 }
292 }
293
294 dependencies
295}
296
297fn build_dependency_from_table(dep_name: &str, dep_value: &TomlValue) -> Option<Dependency> {
298 let (requirement, is_optional) = match dep_value {
299 TomlValue::String(value) => (Some(truncate_field(value.to_string())), false),
300 TomlValue::Table(table) => (
301 table
302 .get(FIELD_VERSION)
303 .and_then(|value| value.as_str())
304 .map(|value| truncate_field(value.to_string())),
305 table
306 .get("optional")
307 .and_then(|value| value.as_bool())
308 .unwrap_or(false),
309 ),
310 _ => (None, false),
311 };
312
313 let normalized_name = normalize_pypi_name(dep_name);
314 let purl = create_pypi_purl(&normalized_name, None);
315
316 Some(Dependency {
317 purl,
318 extracted_requirement: requirement,
319 scope: Some(truncate_field(FIELD_DEPENDENCIES.to_string())),
320 is_runtime: Some(true),
321 is_optional: Some(is_optional),
322 is_pinned: Some(false),
323 is_direct: Some(true),
324 resolved_package: None,
325 extra_data: None,
326 })
327}
328
329fn build_dependency_from_extra(extra_name: &str, spec: &str) -> Option<Dependency> {
330 let (name, requirement) = parse_poetry_dependency_spec(spec)?;
331 let purl = create_pypi_purl(&name, None);
332
333 Some(Dependency {
334 purl,
335 extracted_requirement: requirement,
336 scope: Some(truncate_field(extra_name.to_string())),
337 is_runtime: None,
338 is_optional: Some(true),
339 is_pinned: Some(false),
340 is_direct: Some(true),
341 resolved_package: None,
342 extra_data: None,
343 })
344}
345
346fn parse_poetry_dependency_spec(spec: &str) -> Option<(String, Option<String>)> {
347 let trimmed = spec.trim();
348 if trimmed.is_empty() {
349 return None;
350 }
351
352 if let Some(paren_pos) = trimmed.find(" (") {
353 let name_part = trimmed[..paren_pos].trim();
354 let requirement_part = trimmed[paren_pos + 2..].trim();
355 let requirement = requirement_part.trim_end_matches(')').trim();
356 if name_part.is_empty() {
357 return None;
358 }
359 let normalized_name = truncate_field(normalize_pypi_name(name_part));
360 let requirement = if requirement.is_empty() {
361 None
362 } else {
363 Some(truncate_field(requirement.to_string()))
364 };
365 return Some((normalized_name, requirement));
366 }
367
368 Some((truncate_field(normalize_pypi_name(trimmed)), None))
369}
370
/// Returns `name` with surrounding whitespace removed and ASCII letters
/// lowercased. Separators (`-`, `_`, `.`) and non-ASCII characters are left
/// untouched.
fn normalize_pypi_name(name: &str) -> String {
    let mut normalized = name.trim().to_owned();
    normalized.make_ascii_lowercase();
    normalized
}
374
375fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
376 if name.contains('[') || name.contains(']') {
377 return Some(truncate_field(build_manual_pypi_purl(name, version)));
378 }
379
380 if let Ok(mut purl) = PackageUrl::new(PoetryLockParser::PACKAGE_TYPE.as_str(), name) {
381 if let Some(version) = version
382 && purl.with_version(version).is_err()
383 {
384 return None;
385 }
386 return Some(truncate_field(purl.to_string()));
387 }
388
389 Some(truncate_field(build_manual_pypi_purl(name, version)))
390}
391
392fn build_manual_pypi_purl(name: &str, version: Option<&str>) -> String {
393 let encoded_name = encode_pypi_name(name);
394 let mut purl = format!("pkg:pypi/{}", encoded_name);
395 if let Some(version) = version
396 && !version.is_empty()
397 {
398 purl.push('@');
399 purl.push_str(version);
400 }
401 purl
402}
403
/// Percent-encodes square brackets in `name` (`[` becomes `%5b`, `]` becomes
/// `%5d`); every other character is copied unchanged.
fn encode_pypi_name(name: &str) -> String {
    let mut encoded = String::with_capacity(name.len());
    for ch in name.chars() {
        match ch {
            '[' => encoded.push_str("%5b"),
            ']' => encoded.push_str("%5d"),
            other => encoded.push(other),
        }
    }
    encoded
}
407
408fn extract_sha256_from_files(package_table: &TomlMap<String, TomlValue>) -> Option<String> {
409 package_table
410 .get("files")
411 .and_then(|files| files.as_array())
412 .and_then(|files_array| files_array.first())
413 .and_then(|first_file| first_file.as_table())
414 .and_then(|file_table| file_table.get("hash"))
415 .and_then(|hash_value| hash_value.as_str())
416 .and_then(|hash_str| {
417 hash_str
418 .strip_prefix("sha256:")
419 .map(|s| truncate_field(s.to_string()))
420 })
421}
422
423fn default_package_data() -> PackageData {
424 PackageData {
425 package_type: Some(PoetryLockParser::PACKAGE_TYPE),
426 primary_language: Some("Python".to_string()),
427 datasource_id: Some(DatasourceId::PypiPoetryLock),
428 ..Default::default()
429 }
430}
431
// Registers this parser through the crate-level `register_parser!` macro.
// NOTE(review): the arguments appear to be, in order, a display name, the path
// glob(s) the parser applies to, the package type, the primary language and a
// documentation URL — confirm against the macro definition.
crate::register_parser!(
    "Poetry lockfile",
    &["**/poetry.lock"],
    "pypi",
    "Python",
    Some("https://python-poetry.org/docs/basic-usage/#installing-with-poetrylock"),
);