1use std::collections::{HashMap, HashSet, VecDeque};
2use std::path::Path;
3
4use log::warn;
5use packageurl::PackageUrl;
6use serde_json::Value as JsonValue;
7use toml::Value as TomlValue;
8use toml::map::Map as TomlMap;
9
10use crate::models::{DatasourceId, Dependency, PackageData, PackageType, ResolvedPackage};
11use crate::parsers::python::read_toml_file;
12
13use super::PackageParser;
14
15const FIELD_PACKAGE: &str = "package";
16const FIELD_NAME: &str = "name";
17const FIELD_VERSION: &str = "version";
18const FIELD_SOURCE: &str = "source";
19const FIELD_DEPENDENCIES: &str = "dependencies";
20const FIELD_OPTIONAL_DEPENDENCIES: &str = "optional-dependencies";
21const FIELD_DEV_DEPENDENCIES: &str = "dev-dependencies";
22const FIELD_METADATA: &str = "metadata";
23const FIELD_REQUIRES_DIST: &str = "requires-dist";
24const FIELD_REQUIRES_DEV: &str = "requires-dev";
25const FIELD_METADATA_OPTIONAL_DEPENDENCIES: &str = "optional-dependencies";
26const FIELD_MARKER: &str = "marker";
27const FIELD_EXTRA: &str = "extra";
28const FIELD_SPECIFIER: &str = "specifier";
29const FIELD_REVISION: &str = "revision";
30const FIELD_REQUIRES_PYTHON: &str = "requires-python";
31const FIELD_RESOLUTION_MARKERS: &str = "resolution-markers";
32const FIELD_MANIFEST: &str = "manifest";
33
34pub struct UvLockParser;
35
36#[derive(Clone, Debug, Default)]
37struct DirectDependencyInfo {
38 extracted_requirement: Option<String>,
39 scope: Option<String>,
40 is_runtime: bool,
41 is_optional: bool,
42 extra_data: Option<HashMap<String, JsonValue>>,
43 source_key: Option<String>,
44}
45
46#[derive(Clone, Debug)]
47struct DependencyEdge {
48 name: String,
49 extracted_requirement: Option<String>,
50 scope: Option<String>,
51 is_runtime: bool,
52 is_optional: bool,
53 source_key: Option<String>,
54 extra_data: Option<HashMap<String, JsonValue>>,
55}
56
57impl PackageParser for UvLockParser {
58 const PACKAGE_TYPE: PackageType = PackageType::Pypi;
59
60 fn is_match(path: &Path) -> bool {
61 path.file_name()
62 .and_then(|name| name.to_str())
63 .is_some_and(|name| name == "uv.lock")
64 }
65
66 fn extract_packages(path: &Path) -> Vec<PackageData> {
67 let toml_content = match read_toml_file(path) {
68 Ok(content) => content,
69 Err(e) => {
70 warn!("Failed to read uv.lock at {:?}: {}", path, e);
71 return vec![default_package_data()];
72 }
73 };
74
75 vec![parse_uv_lock(&toml_content)]
76 }
77}
78
79fn parse_uv_lock(toml_content: &TomlValue) -> PackageData {
80 let packages = toml_content
81 .get(FIELD_PACKAGE)
82 .and_then(TomlValue::as_array)
83 .cloned()
84 .unwrap_or_default();
85
86 if packages.is_empty() {
87 return default_package_data();
88 }
89
90 let package_tables: Vec<&TomlMap<String, TomlValue>> =
91 packages.iter().filter_map(TomlValue::as_table).collect();
92
93 if package_tables.is_empty() {
94 return default_package_data();
95 }
96
97 let root_index = find_root_package_index(&package_tables);
98 let package_lookup = build_package_lookup(&package_tables);
99
100 let direct_infos = root_index
101 .and_then(|index| package_tables.get(index).copied())
102 .map(collect_root_direct_dependencies)
103 .unwrap_or_default();
104
105 let runtime_roots: Vec<(String, Option<String>)> = direct_infos
106 .iter()
107 .filter(|(_, info)| info.is_runtime)
108 .map(|(name, info)| (name.clone(), info.source_key.clone()))
109 .collect();
110 let dev_roots: Vec<(String, Option<String>)> = direct_infos
111 .iter()
112 .filter(|(_, info)| !info.is_runtime && !info.is_optional)
113 .map(|(name, info)| (name.clone(), info.source_key.clone()))
114 .collect();
115 let optional_roots: Vec<(String, Option<String>)> = direct_infos
116 .iter()
117 .filter(|(_, info)| info.is_optional)
118 .map(|(name, info)| (name.clone(), info.source_key.clone()))
119 .collect();
120
121 let runtime_reachable =
122 collect_reachable_packages(&package_tables, &package_lookup, &runtime_roots, false);
123 let dev_reachable =
124 collect_reachable_packages(&package_tables, &package_lookup, &dev_roots, true);
125 let optional_reachable =
126 collect_reachable_packages(&package_tables, &package_lookup, &optional_roots, true);
127
128 let mut package_data = default_package_data();
129 package_data.extra_data = build_lock_extra_data(toml_content);
130
131 if let Some(index) = root_index
132 && let Some(root_table) = package_tables.get(index)
133 {
134 package_data.name = root_table
135 .get(FIELD_NAME)
136 .and_then(TomlValue::as_str)
137 .map(normalize_pypi_name);
138 package_data.version = root_table
139 .get(FIELD_VERSION)
140 .and_then(TomlValue::as_str)
141 .map(|value| value.to_string());
142 package_data.is_virtual =
143 package_source_table(root_table).is_some_and(|source| source.contains_key("virtual"));
144 package_data.purl = package_data
145 .name
146 .as_deref()
147 .and_then(|name| create_pypi_purl(name, package_data.version.as_deref()));
148 }
149
150 package_data.dependencies = package_tables
151 .iter()
152 .enumerate()
153 .filter(|(index, _)| Some(*index) != root_index)
154 .filter_map(|(_, package_table)| {
155 build_top_level_dependency(
156 package_table,
157 root_index.is_none(),
158 &direct_infos,
159 &runtime_reachable,
160 &dev_reachable,
161 &optional_reachable,
162 &package_lookup,
163 )
164 })
165 .collect();
166
167 package_data
168}
169
170fn build_top_level_dependency(
171 package_table: &TomlMap<String, TomlValue>,
172 no_root_package: bool,
173 direct_infos: &HashMap<String, DirectDependencyInfo>,
174 runtime_reachable: &HashSet<String>,
175 dev_reachable: &HashSet<String>,
176 optional_reachable: &HashSet<String>,
177 package_lookup: &HashMap<String, Vec<usize>>,
178) -> Option<Dependency> {
179 let name = package_table
180 .get(FIELD_NAME)
181 .and_then(TomlValue::as_str)
182 .map(normalize_pypi_name)?;
183 let version = package_table
184 .get(FIELD_VERSION)
185 .and_then(TomlValue::as_str)
186 .map(|value| value.to_string())?;
187
188 let direct_info = direct_infos.get(&name);
189 let is_direct = direct_info.is_some();
190 let is_runtime = if no_root_package {
191 true
192 } else if let Some(info) = direct_info {
193 info.is_runtime
194 } else if runtime_reachable.contains(&name) {
195 true
196 } else {
197 !dev_reachable.contains(&name) && !optional_reachable.contains(&name)
198 };
199 let is_optional = direct_info.is_some_and(|info| info.is_optional)
200 || (!is_direct && optional_reachable.contains(&name) && !runtime_reachable.contains(&name));
201
202 Some(Dependency {
203 purl: create_pypi_purl(&name, Some(&version)),
204 extracted_requirement: direct_info.and_then(|info| info.extracted_requirement.clone()),
205 scope: direct_info.and_then(|info| info.scope.clone()),
206 is_runtime: Some(is_runtime),
207 is_optional: Some(is_optional),
208 is_pinned: Some(true),
209 is_direct: Some(is_direct),
210 resolved_package: Some(Box::new(build_resolved_package(
211 package_table,
212 package_lookup,
213 ))),
214 extra_data: direct_info.and_then(|info| info.extra_data.clone()),
215 })
216}
217
218fn build_resolved_package(
219 package_table: &TomlMap<String, TomlValue>,
220 package_lookup: &HashMap<String, Vec<usize>>,
221) -> ResolvedPackage {
222 let name = package_table
223 .get(FIELD_NAME)
224 .and_then(TomlValue::as_str)
225 .map(normalize_pypi_name)
226 .unwrap_or_default();
227 let version = package_table
228 .get(FIELD_VERSION)
229 .and_then(TomlValue::as_str)
230 .map(|value| value.to_string())
231 .unwrap_or_default();
232
233 let (_, repository_download_url, api_data_url, purl) =
234 build_pypi_urls(Some(&name), Some(&version));
235 let repository_homepage_url = Some(format!("https://pypi.org/project/{}", name));
236 let (download_url, sha256) = extract_artifact_metadata(package_table);
237
238 ResolvedPackage {
239 package_type: UvLockParser::PACKAGE_TYPE,
240 namespace: String::new(),
241 name,
242 version,
243 primary_language: Some("Python".to_string()),
244 download_url,
245 sha1: None,
246 sha256,
247 sha512: None,
248 md5: None,
249 is_virtual: true,
250 extra_data: build_package_extra_data(package_table),
251 dependencies: collect_package_dependency_edges(package_table)
252 .into_iter()
253 .map(|edge| edge_to_dependency(edge, package_lookup))
254 .collect(),
255 repository_homepage_url,
256 repository_download_url,
257 api_data_url,
258 datasource_id: Some(DatasourceId::PypiUvLock),
259 purl,
260 }
261}
262
263fn edge_to_dependency(
264 edge: DependencyEdge,
265 package_lookup: &HashMap<String, Vec<usize>>,
266) -> Dependency {
267 let is_pinned = edge
268 .source_key
269 .as_ref()
270 .map(|_| !package_lookup.contains_key(&edge.name))
271 .unwrap_or(false);
272
273 Dependency {
274 purl: create_pypi_purl(&edge.name, None),
275 extracted_requirement: edge.extracted_requirement,
276 scope: edge.scope,
277 is_runtime: Some(edge.is_runtime),
278 is_optional: Some(edge.is_optional),
279 is_pinned: Some(is_pinned),
280 is_direct: Some(true),
281 resolved_package: None,
282 extra_data: edge.extra_data,
283 }
284}
285
286fn collect_root_direct_dependencies(
287 root_table: &TomlMap<String, TomlValue>,
288) -> HashMap<String, DirectDependencyInfo> {
289 let mut infos = HashMap::new();
290 let metadata = root_table.get(FIELD_METADATA).and_then(TomlValue::as_table);
291 let runtime_requirements = metadata
292 .and_then(|metadata| metadata.get(FIELD_REQUIRES_DIST))
293 .map(parse_requirement_metadata_array)
294 .unwrap_or_default();
295 let dev_requirements = metadata
296 .and_then(|metadata| metadata.get(FIELD_REQUIRES_DEV))
297 .and_then(TomlValue::as_table)
298 .map(parse_requirement_metadata_table)
299 .unwrap_or_default();
300 let optional_requirements = metadata
301 .and_then(|metadata| metadata.get(FIELD_METADATA_OPTIONAL_DEPENDENCIES))
302 .and_then(TomlValue::as_table)
303 .map(parse_requirement_metadata_table)
304 .unwrap_or_default();
305
306 for edge in collect_dependency_edges_from_array(
307 root_table
308 .get(FIELD_DEPENDENCIES)
309 .and_then(TomlValue::as_array),
310 None,
311 true,
312 false,
313 runtime_requirements.get("__runtime__"),
314 ) {
315 merge_direct_dependency_info(&mut infos, edge);
316 }
317
318 if let Some(optional_table) = root_table
319 .get(FIELD_OPTIONAL_DEPENDENCIES)
320 .and_then(TomlValue::as_table)
321 {
322 for (group, value) in optional_table {
323 let requirement_map = optional_requirements.get(group);
324 for edge in collect_dependency_edges_from_array(
325 value.as_array(),
326 Some(group.to_string()),
327 false,
328 true,
329 requirement_map,
330 ) {
331 merge_direct_dependency_info(&mut infos, edge);
332 }
333 }
334 }
335
336 if let Some(dev_table) = root_table
337 .get(FIELD_DEV_DEPENDENCIES)
338 .and_then(TomlValue::as_table)
339 {
340 for (group, value) in dev_table {
341 let requirement_map = dev_requirements.get(group);
342 for edge in collect_dependency_edges_from_array(
343 value.as_array(),
344 Some(group.to_string()),
345 false,
346 false,
347 requirement_map,
348 ) {
349 merge_direct_dependency_info(&mut infos, edge);
350 }
351 }
352 }
353
354 infos
355}
356
357fn merge_direct_dependency_info(
358 infos: &mut HashMap<String, DirectDependencyInfo>,
359 edge: DependencyEdge,
360) {
361 let name = edge.name.clone();
362 let new_info = direct_info_from_edge(edge);
363
364 if let Some(existing) = infos.get_mut(&name) {
365 existing.is_runtime |= new_info.is_runtime;
366 existing.is_optional &= new_info.is_optional;
367
368 if existing.extracted_requirement.is_none() {
369 existing.extracted_requirement = new_info.extracted_requirement.clone();
370 }
371
372 existing.scope = merge_scope(existing.scope.as_ref(), new_info.scope.as_ref());
373 existing.extra_data =
374 merge_optional_json_maps(existing.extra_data.take(), new_info.extra_data);
375
376 if existing.source_key != new_info.source_key {
377 existing.source_key = None;
378 }
379 } else {
380 infos.insert(name, new_info);
381 }
382}
383
384fn merge_scope(current: Option<&String>, new: Option<&String>) -> Option<String> {
385 match (current, new) {
386 (None, None) => None,
387 (None, Some(_)) | (Some(_), None) => None,
388 (Some(left), Some(right)) if left == right => Some(left.clone()),
389 _ => None,
390 }
391}
392
393fn merge_optional_json_maps(
394 current: Option<HashMap<String, JsonValue>>,
395 new: Option<HashMap<String, JsonValue>>,
396) -> Option<HashMap<String, JsonValue>> {
397 match (current, new) {
398 (None, None) => None,
399 (Some(map), None) | (None, Some(map)) => Some(map),
400 (Some(mut current), Some(new)) => {
401 for (key, value) in new {
402 current.entry(key).or_insert(value);
403 }
404 Some(current)
405 }
406 }
407}
408
409fn direct_info_from_edge(edge: DependencyEdge) -> DirectDependencyInfo {
410 DirectDependencyInfo {
411 extracted_requirement: edge.extracted_requirement,
412 scope: edge.scope,
413 is_runtime: edge.is_runtime,
414 is_optional: edge.is_optional,
415 extra_data: edge.extra_data,
416 source_key: edge.source_key,
417 }
418}
419
420fn collect_package_dependency_edges(
421 package_table: &TomlMap<String, TomlValue>,
422) -> Vec<DependencyEdge> {
423 let mut edges = Vec::new();
424
425 edges.extend(collect_dependency_edges_from_array(
426 package_table
427 .get(FIELD_DEPENDENCIES)
428 .and_then(TomlValue::as_array),
429 None,
430 true,
431 false,
432 None,
433 ));
434
435 if let Some(optional_table) = package_table
436 .get(FIELD_OPTIONAL_DEPENDENCIES)
437 .and_then(TomlValue::as_table)
438 {
439 for (group, value) in optional_table {
440 edges.extend(collect_dependency_edges_from_array(
441 value.as_array(),
442 Some(group.to_string()),
443 false,
444 true,
445 None,
446 ));
447 }
448 }
449
450 if let Some(dev_table) = package_table
451 .get(FIELD_DEV_DEPENDENCIES)
452 .and_then(TomlValue::as_table)
453 {
454 for (group, value) in dev_table {
455 edges.extend(collect_dependency_edges_from_array(
456 value.as_array(),
457 Some(group.to_string()),
458 false,
459 false,
460 None,
461 ));
462 }
463 }
464
465 edges
466}
467
468fn collect_dependency_edges_from_array(
469 values: Option<&Vec<TomlValue>>,
470 scope: Option<String>,
471 is_runtime: bool,
472 is_optional: bool,
473 requirement_map: Option<&HashMap<String, String>>,
474) -> Vec<DependencyEdge> {
475 values
476 .into_iter()
477 .flatten()
478 .filter_map(|value| {
479 build_dependency_edge(
480 value,
481 scope.clone(),
482 is_runtime,
483 is_optional,
484 requirement_map,
485 )
486 })
487 .collect()
488}
489
490fn build_dependency_edge(
491 value: &TomlValue,
492 scope: Option<String>,
493 is_runtime: bool,
494 is_optional: bool,
495 requirement_map: Option<&HashMap<String, String>>,
496) -> Option<DependencyEdge> {
497 let table = value.as_table()?;
498 let name = table
499 .get(FIELD_NAME)
500 .and_then(TomlValue::as_str)
501 .map(normalize_pypi_name)?;
502
503 let mut extra_data = HashMap::new();
504 if let Some(marker) = table.get(FIELD_MARKER).and_then(TomlValue::as_str) {
505 extra_data.insert(
506 FIELD_MARKER.to_string(),
507 JsonValue::String(marker.to_string()),
508 );
509 }
510 if let Some(extra_value) = table.get(FIELD_EXTRA) {
511 let json_value = toml_value_to_json(extra_value);
512 extra_data.insert(FIELD_EXTRA.to_string(), json_value);
513 }
514
515 let source_key = table
516 .get(FIELD_SOURCE)
517 .and_then(TomlValue::as_table)
518 .and_then(source_table_key);
519 if let Some(source) = table.get(FIELD_SOURCE) {
520 extra_data.insert(FIELD_SOURCE.to_string(), toml_value_to_json(source));
521 }
522
523 let extracted_requirement = requirement_map
524 .and_then(|map| map.get(&name).cloned())
525 .or_else(|| {
526 table
527 .get(FIELD_SPECIFIER)
528 .and_then(TomlValue::as_str)
529 .map(|value| value.to_string())
530 });
531
532 Some(DependencyEdge {
533 name,
534 extracted_requirement,
535 scope,
536 is_runtime,
537 is_optional,
538 source_key,
539 extra_data: (!extra_data.is_empty()).then_some(extra_data),
540 })
541}
542
543fn parse_requirement_metadata_array(value: &TomlValue) -> HashMap<String, HashMap<String, String>> {
544 let mut grouped = HashMap::new();
545 let runtime = value
546 .as_array()
547 .map(|values| parse_requirement_entries(values))
548 .unwrap_or_default();
549 grouped.insert("__runtime__".to_string(), runtime);
550 grouped
551}
552
553fn parse_requirement_metadata_table(
554 table: &TomlMap<String, TomlValue>,
555) -> HashMap<String, HashMap<String, String>> {
556 table
557 .iter()
558 .map(|(group, value)| {
559 (
560 group.to_string(),
561 value
562 .as_array()
563 .map(|values| parse_requirement_entries(values))
564 .unwrap_or_default(),
565 )
566 })
567 .collect()
568}
569
570fn parse_requirement_entries(values: &[TomlValue]) -> HashMap<String, String> {
571 values
572 .iter()
573 .filter_map(|value| {
574 let table = value.as_table()?;
575 let name = table
576 .get(FIELD_NAME)
577 .and_then(TomlValue::as_str)
578 .map(normalize_pypi_name)?;
579 let specifier = table
580 .get(FIELD_SPECIFIER)
581 .and_then(TomlValue::as_str)
582 .map(|value| value.to_string())?;
583 Some((name, specifier))
584 })
585 .collect()
586}
587
588fn collect_reachable_packages(
589 package_tables: &[&TomlMap<String, TomlValue>],
590 package_lookup: &HashMap<String, Vec<usize>>,
591 roots: &[(String, Option<String>)],
592 include_non_runtime_edges: bool,
593) -> HashSet<String> {
594 let mut visited = HashSet::new();
595 let mut queue: VecDeque<(String, Option<String>)> = roots.iter().cloned().collect();
596
597 while let Some((name, source_key)) = queue.pop_front() {
598 let Some(index) =
599 match_package_index(package_tables, package_lookup, &name, source_key.as_deref())
600 else {
601 continue;
602 };
603
604 let Some(package_table) = package_tables.get(index) else {
605 continue;
606 };
607
608 let package_name = package_table
609 .get(FIELD_NAME)
610 .and_then(TomlValue::as_str)
611 .map(normalize_pypi_name)
612 .unwrap_or(name);
613
614 if !visited.insert(package_name.clone()) {
615 continue;
616 }
617
618 let edges = if include_non_runtime_edges {
619 collect_package_dependency_edges(package_table)
620 } else {
621 collect_dependency_edges_from_array(
622 package_table
623 .get(FIELD_DEPENDENCIES)
624 .and_then(TomlValue::as_array),
625 None,
626 true,
627 false,
628 None,
629 )
630 };
631
632 for edge in edges {
633 queue.push_back((edge.name, edge.source_key));
634 }
635 }
636
637 visited
638}
639
640fn build_package_lookup(
641 package_tables: &[&TomlMap<String, TomlValue>],
642) -> HashMap<String, Vec<usize>> {
643 let mut lookup: HashMap<String, Vec<usize>> = HashMap::new();
644 for (index, package_table) in package_tables.iter().enumerate() {
645 if let Some(name) = package_table
646 .get(FIELD_NAME)
647 .and_then(TomlValue::as_str)
648 .map(normalize_pypi_name)
649 {
650 lookup.entry(name).or_default().push(index);
651 }
652 }
653 lookup
654}
655
656fn match_package_index(
657 package_tables: &[&TomlMap<String, TomlValue>],
658 package_lookup: &HashMap<String, Vec<usize>>,
659 name: &str,
660 source_key: Option<&str>,
661) -> Option<usize> {
662 let candidates = package_lookup.get(name)?;
663 if candidates.len() == 1 {
664 return candidates.first().copied();
665 }
666
667 let source_key = source_key?;
668 candidates.iter().copied().find(|index| {
669 package_tables
670 .get(*index)
671 .and_then(|table| package_source_table(table))
672 .and_then(source_table_key)
673 .as_deref()
674 == Some(source_key)
675 })
676}
677
678fn find_root_package_index(package_tables: &[&TomlMap<String, TomlValue>]) -> Option<usize> {
679 if let Some(index) = package_tables.iter().position(|table| {
680 package_source_table(table)
681 .and_then(local_source_path)
682 .is_some_and(|path| path == ".")
683 }) {
684 return Some(index);
685 }
686
687 package_tables.iter().position(|table| {
688 package_source_table(table)
689 .is_some_and(|source| source.contains_key("editable") || source.contains_key("virtual"))
690 })
691}
692
693fn local_source_path(source_table: &TomlMap<String, TomlValue>) -> Option<&str> {
694 source_table
695 .get("virtual")
696 .and_then(TomlValue::as_str)
697 .or_else(|| source_table.get("editable").and_then(TomlValue::as_str))
698}
699
700fn build_lock_extra_data(toml_content: &TomlValue) -> Option<HashMap<String, JsonValue>> {
701 let mut extra_data = HashMap::new();
702
703 if let Some(version) = toml_content
704 .get(FIELD_VERSION)
705 .and_then(TomlValue::as_integer)
706 {
707 extra_data.insert(
708 "lockfile_version".to_string(),
709 JsonValue::String(version.to_string()),
710 );
711 }
712
713 if let Some(revision) = toml_content
714 .get(FIELD_REVISION)
715 .and_then(TomlValue::as_integer)
716 {
717 extra_data.insert(
718 FIELD_REVISION.to_string(),
719 JsonValue::String(revision.to_string()),
720 );
721 }
722
723 if let Some(requires_python) = toml_content
724 .get(FIELD_REQUIRES_PYTHON)
725 .and_then(TomlValue::as_str)
726 {
727 extra_data.insert(
728 "requires_python".to_string(),
729 JsonValue::String(requires_python.to_string()),
730 );
731 }
732
733 if let Some(markers) = toml_content.get(FIELD_RESOLUTION_MARKERS) {
734 extra_data.insert(
735 FIELD_RESOLUTION_MARKERS.to_string(),
736 toml_value_to_json(markers),
737 );
738 }
739
740 if let Some(manifest) = toml_content.get(FIELD_MANIFEST) {
741 extra_data.insert(FIELD_MANIFEST.to_string(), toml_value_to_json(manifest));
742 }
743
744 (!extra_data.is_empty()).then_some(extra_data)
745}
746
747fn build_package_extra_data(
748 package_table: &TomlMap<String, TomlValue>,
749) -> Option<HashMap<String, JsonValue>> {
750 let mut extra_data = HashMap::new();
751
752 if let Some(source) = package_table.get(FIELD_SOURCE) {
753 extra_data.insert(FIELD_SOURCE.to_string(), toml_value_to_json(source));
754 }
755
756 if let Some(metadata) = package_table.get(FIELD_METADATA) {
757 extra_data.insert(FIELD_METADATA.to_string(), toml_value_to_json(metadata));
758 }
759
760 (!extra_data.is_empty()).then_some(extra_data)
761}
762
763fn extract_artifact_metadata(
764 package_table: &TomlMap<String, TomlValue>,
765) -> (Option<String>, Option<String>) {
766 if let Some(sdist_table) = package_table.get("sdist").and_then(TomlValue::as_table) {
767 let download_url = sdist_table
768 .get("url")
769 .and_then(TomlValue::as_str)
770 .map(|value| value.to_string());
771 let sha256 = sdist_table
772 .get("hash")
773 .and_then(TomlValue::as_str)
774 .and_then(strip_sha256_prefix);
775 if download_url.is_some() || sha256.is_some() {
776 return (download_url, sha256);
777 }
778 }
779
780 let wheel_table = package_table
781 .get("wheels")
782 .and_then(TomlValue::as_array)
783 .and_then(|wheels| wheels.first())
784 .and_then(TomlValue::as_table);
785
786 let download_url = wheel_table
787 .and_then(|table| table.get("url"))
788 .and_then(TomlValue::as_str)
789 .map(|value| value.to_string());
790 let sha256 = wheel_table
791 .and_then(|table| table.get("hash"))
792 .and_then(TomlValue::as_str)
793 .and_then(strip_sha256_prefix);
794
795 (download_url, sha256)
796}
797
798fn strip_sha256_prefix(value: &str) -> Option<String> {
799 value.strip_prefix("sha256:").map(|hash| hash.to_string())
800}
801
802fn package_source_table(
803 package_table: &TomlMap<String, TomlValue>,
804) -> Option<&TomlMap<String, TomlValue>> {
805 package_table
806 .get(FIELD_SOURCE)
807 .and_then(TomlValue::as_table)
808}
809
810fn source_table_key(source_table: &TomlMap<String, TomlValue>) -> Option<String> {
811 ["registry", "editable", "virtual", "git"]
812 .into_iter()
813 .find_map(|key| {
814 source_table
815 .get(key)
816 .and_then(TomlValue::as_str)
817 .map(|value| format!("{}:{}", key, value))
818 })
819}
820
821fn build_pypi_urls(
822 name: Option<&str>,
823 version: Option<&str>,
824) -> (
825 Option<String>,
826 Option<String>,
827 Option<String>,
828 Option<String>,
829) {
830 let repository_homepage_url = name.map(|value| format!("https://pypi.org/project/{}", value));
831
832 let repository_download_url = name.and_then(|value| {
833 version.map(|ver| {
834 format!(
835 "https://pypi.org/packages/source/{}/{}/{}-{}.tar.gz",
836 &value[..1.min(value.len())],
837 value,
838 value,
839 ver
840 )
841 })
842 });
843
844 let api_data_url = name.map(|value| {
845 if let Some(ver) = version {
846 format!("https://pypi.org/pypi/{}/{}/json", value, ver)
847 } else {
848 format!("https://pypi.org/pypi/{}/json", value)
849 }
850 });
851
852 let purl = name.and_then(|value| create_pypi_purl(value, version));
853
854 (
855 repository_homepage_url,
856 repository_download_url,
857 api_data_url,
858 purl,
859 )
860}
861
862fn normalize_pypi_name(name: &str) -> String {
863 name.trim().to_ascii_lowercase()
864}
865
866fn create_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
867 if name.contains('[') || name.contains(']') {
868 return Some(build_manual_pypi_purl(name, version));
869 }
870
871 if let Ok(mut purl) = PackageUrl::new(UvLockParser::PACKAGE_TYPE.as_str(), name) {
872 if let Some(version) = version
873 && purl.with_version(version).is_err()
874 {
875 return None;
876 }
877 return Some(purl.to_string());
878 }
879
880 Some(build_manual_pypi_purl(name, version))
881}
882
883fn build_manual_pypi_purl(name: &str, version: Option<&str>) -> String {
884 let encoded_name = name.replace('[', "%5b").replace(']', "%5d");
885 let mut purl = format!("pkg:pypi/{}", encoded_name);
886 if let Some(version) = version
887 && !version.is_empty()
888 {
889 purl.push('@');
890 purl.push_str(version);
891 }
892 purl
893}
894
895fn toml_value_to_json(value: &TomlValue) -> JsonValue {
896 match value {
897 TomlValue::String(value) => JsonValue::String(value.clone()),
898 TomlValue::Integer(value) => JsonValue::String(value.to_string()),
899 TomlValue::Float(value) => JsonValue::String(value.to_string()),
900 TomlValue::Boolean(value) => JsonValue::Bool(*value),
901 TomlValue::Datetime(value) => JsonValue::String(value.to_string()),
902 TomlValue::Array(values) => {
903 JsonValue::Array(values.iter().map(toml_value_to_json).collect())
904 }
905 TomlValue::Table(values) => JsonValue::Object(
906 values
907 .iter()
908 .map(|(key, value)| (key.clone(), toml_value_to_json(value)))
909 .collect(),
910 ),
911 }
912}
913
914fn default_package_data() -> PackageData {
915 PackageData {
916 package_type: Some(UvLockParser::PACKAGE_TYPE),
917 primary_language: Some("Python".to_string()),
918 datasource_id: Some(DatasourceId::PypiUvLock),
919 ..Default::default()
920 }
921}
922
923crate::register_parser!(
924 "uv lockfile",
925 &["**/uv.lock"],
926 "pypi",
927 "Python",
928 Some("https://docs.astral.sh/uv/concepts/projects/layout/"),
929);