1use std::collections::HashMap;
2use std::path::Path;
3
4use log::warn;
5use packageurl::PackageUrl;
6use serde_json::{Map as JsonMap, Value as JsonValue};
7use toml::Value as TomlValue;
8use toml::map::Map as TomlMap;
9
10use crate::models::{DatasourceId, Dependency, PackageData, PackageType, Party};
11use crate::parsers::conda::build_purl as build_conda_purl;
12use crate::parsers::python::read_toml_file;
13use crate::parsers::utils::split_name_email;
14
15use super::PackageParser;
16
17const FIELD_WORKSPACE: &str = "workspace";
18const FIELD_PROJECT: &str = "project";
19const FIELD_NAME: &str = "name";
20const FIELD_VERSION: &str = "version";
21const FIELD_AUTHORS: &str = "authors";
22const FIELD_DESCRIPTION: &str = "description";
23const FIELD_LICENSE: &str = "license";
24const FIELD_LICENSE_FILE: &str = "license-file";
25const FIELD_README: &str = "readme";
26const FIELD_HOMEPAGE: &str = "homepage";
27const FIELD_REPOSITORY: &str = "repository";
28const FIELD_DOCUMENTATION: &str = "documentation";
29const FIELD_CHANNELS: &str = "channels";
30const FIELD_PLATFORMS: &str = "platforms";
31const FIELD_REQUIRES_PIXI: &str = "requires-pixi";
32const FIELD_EXCLUDE_NEWER: &str = "exclude-newer";
33const FIELD_DEPENDENCIES: &str = "dependencies";
34const FIELD_PYPI_DEPENDENCIES: &str = "pypi-dependencies";
35const FIELD_FEATURE: &str = "feature";
36const FIELD_ENVIRONMENTS: &str = "environments";
37const FIELD_TASKS: &str = "tasks";
38const FIELD_PYPI_OPTIONS: &str = "pypi-options";
39
40pub struct PixiTomlParser;
41
42impl PackageParser for PixiTomlParser {
43 const PACKAGE_TYPE: PackageType = PackageType::Pixi;
44
45 fn is_match(path: &Path) -> bool {
46 path.file_name().is_some_and(|name| name == "pixi.toml")
47 }
48
49 fn extract_packages(path: &Path) -> Vec<PackageData> {
50 let toml_content = match read_toml_file(path) {
51 Ok(content) => content,
52 Err(error) => {
53 warn!("Failed to read pixi.toml at {:?}: {}", path, error);
54 return vec![default_package_data(Some(DatasourceId::PixiToml))];
55 }
56 };
57
58 vec![parse_pixi_toml(&toml_content)]
59 }
60}
61
62pub struct PixiLockParser;
63
64impl PackageParser for PixiLockParser {
65 const PACKAGE_TYPE: PackageType = PackageType::Pixi;
66
67 fn is_match(path: &Path) -> bool {
68 path.file_name().is_some_and(|name| name == "pixi.lock")
69 }
70
71 fn extract_packages(path: &Path) -> Vec<PackageData> {
72 let toml_content = match read_toml_file(path) {
73 Ok(content) => content,
74 Err(error) => {
75 warn!("Failed to read pixi.lock at {:?}: {}", path, error);
76 return vec![default_package_data(Some(DatasourceId::PixiLock))];
77 }
78 };
79
80 vec![parse_pixi_lock(&toml_content)]
81 }
82}
83
84fn parse_pixi_toml(toml_content: &TomlValue) -> PackageData {
85 let identity = toml_content
86 .get(FIELD_WORKSPACE)
87 .and_then(TomlValue::as_table)
88 .or_else(|| {
89 toml_content
90 .get(FIELD_PROJECT)
91 .and_then(TomlValue::as_table)
92 });
93
94 let name = identity
95 .and_then(|table| table.get(FIELD_NAME))
96 .and_then(TomlValue::as_str)
97 .map(ToOwned::to_owned);
98 let version = identity
99 .and_then(|table| table.get(FIELD_VERSION))
100 .and_then(toml_value_to_string);
101
102 let mut package = default_package_data(Some(DatasourceId::PixiToml));
103 package.name = name.clone();
104 package.version = version.clone();
105 package.primary_language = Some("TOML".to_string());
106 package.description = identity
107 .and_then(|table| table.get(FIELD_DESCRIPTION))
108 .and_then(TomlValue::as_str)
109 .map(|value| value.trim().to_string());
110 package.homepage_url = identity
111 .and_then(|table| table.get(FIELD_HOMEPAGE))
112 .and_then(TomlValue::as_str)
113 .map(ToOwned::to_owned);
114 package.vcs_url = identity
115 .and_then(|table| table.get(FIELD_REPOSITORY))
116 .and_then(TomlValue::as_str)
117 .map(ToOwned::to_owned);
118 package.parties = extract_authors(identity);
119 package.extracted_license_statement = identity
120 .and_then(|table| table.get(FIELD_LICENSE))
121 .and_then(TomlValue::as_str)
122 .map(ToOwned::to_owned);
123 package.purl = name
124 .as_deref()
125 .and_then(|value| build_pixi_purl(value, version.as_deref()));
126 package.dependencies = extract_manifest_dependencies(toml_content);
127 package.extra_data = build_manifest_extra_data(toml_content, identity);
128 package
129}
130
131fn parse_pixi_lock(toml_content: &TomlValue) -> PackageData {
132 let mut package = default_package_data(Some(DatasourceId::PixiLock));
133 package.primary_language = Some("TOML".to_string());
134
135 let lock_version = toml_content
136 .get(FIELD_VERSION)
137 .and_then(TomlValue::as_integer);
138 let mut extra_data = HashMap::new();
139 if let Some(lock_version) = lock_version {
140 extra_data.insert("lock_version".to_string(), JsonValue::from(lock_version));
141 }
142 if let Some(env_json) = toml_content.get(FIELD_ENVIRONMENTS).and_then(toml_to_json) {
143 extra_data.insert("lock_environments".to_string(), env_json);
144 }
145 package.extra_data = (!extra_data.is_empty()).then_some(extra_data);
146
147 match lock_version {
148 Some(6) => package.dependencies = extract_v6_lock_dependencies(toml_content),
149 Some(4) => package.dependencies = extract_v4_lock_dependencies(toml_content),
150 Some(_) | None => {}
151 }
152
153 package
154}
155
156fn extract_authors(identity: Option<&TomlMap<String, TomlValue>>) -> Vec<Party> {
157 identity
158 .and_then(|table| table.get(FIELD_AUTHORS))
159 .and_then(TomlValue::as_array)
160 .into_iter()
161 .flatten()
162 .filter_map(TomlValue::as_str)
163 .map(|author| {
164 let (name, email) = split_name_email(author);
165 Party {
166 r#type: None,
167 role: Some("author".to_string()),
168 name,
169 email,
170 url: None,
171 organization: None,
172 organization_url: None,
173 timezone: None,
174 }
175 })
176 .collect()
177}
178
179fn extract_manifest_dependencies(toml_content: &TomlValue) -> Vec<Dependency> {
180 let mut dependencies = Vec::new();
181
182 if let Some(table) = toml_content
183 .get(FIELD_DEPENDENCIES)
184 .and_then(TomlValue::as_table)
185 {
186 dependencies.extend(extract_conda_dependencies(table, None, false));
187 }
188 if let Some(table) = toml_content
189 .get(FIELD_PYPI_DEPENDENCIES)
190 .and_then(TomlValue::as_table)
191 {
192 dependencies.extend(extract_pypi_dependencies(table, None, false));
193 }
194
195 if let Some(feature_table) = toml_content
196 .get(FIELD_FEATURE)
197 .and_then(TomlValue::as_table)
198 {
199 for (feature_name, value) in feature_table {
200 let Some(feature) = value.as_table() else {
201 continue;
202 };
203 if let Some(table) = feature
204 .get(FIELD_DEPENDENCIES)
205 .and_then(TomlValue::as_table)
206 {
207 dependencies.extend(extract_conda_dependencies(table, Some(feature_name), true));
208 }
209 if let Some(table) = feature
210 .get(FIELD_PYPI_DEPENDENCIES)
211 .and_then(TomlValue::as_table)
212 {
213 dependencies.extend(extract_pypi_dependencies(table, Some(feature_name), true));
214 }
215 }
216 }
217
218 dependencies
219}
220
221fn extract_conda_dependencies(
222 table: &TomlMap<String, TomlValue>,
223 scope: Option<&str>,
224 optional: bool,
225) -> Vec<Dependency> {
226 table
227 .iter()
228 .filter_map(|(name, value)| build_conda_dependency(name, value, scope, optional))
229 .collect()
230}
231
232fn build_conda_dependency(
233 name: &str,
234 value: &TomlValue,
235 scope: Option<&str>,
236 optional: bool,
237) -> Option<Dependency> {
238 let requirement = extract_conda_requirement(value);
239 let exact_requirement = match value {
240 TomlValue::String(value) => Some(value.to_string()),
241 TomlValue::Table(table) => table.get(FIELD_VERSION).and_then(toml_value_to_string),
242 _ => None,
243 };
244 let pinned = exact_requirement
245 .as_deref()
246 .is_some_and(is_exact_constraint);
247 let exact_version = exact_requirement
248 .as_deref()
249 .filter(|_| pinned)
250 .map(|value| value.trim_start_matches('='));
251 let purl = build_conda_purl("conda", None, name, exact_version, None, None, None);
252
253 let mut extra_data = HashMap::new();
254 if let TomlValue::Table(dep_table) = value {
255 for key in ["channel", "build", "path", "url", "git"] {
256 if let Some(val) = dep_table.get(key).and_then(toml_value_to_string) {
257 extra_data.insert(key.to_string(), JsonValue::String(val));
258 }
259 }
260 }
261
262 Some(Dependency {
263 purl,
264 extracted_requirement: requirement.clone(),
265 scope: scope.map(ToOwned::to_owned),
266 is_runtime: Some(true),
267 is_optional: Some(optional),
268 is_pinned: Some(pinned),
269 is_direct: Some(true),
270 resolved_package: None,
271 extra_data: (!extra_data.is_empty()).then_some(extra_data),
272 })
273}
274
275fn extract_pypi_dependencies(
276 table: &TomlMap<String, TomlValue>,
277 scope: Option<&str>,
278 optional: bool,
279) -> Vec<Dependency> {
280 table
281 .iter()
282 .filter_map(|(name, value)| build_pypi_dependency(name, value, scope, optional))
283 .collect()
284}
285
286fn build_pypi_dependency(
287 name: &str,
288 value: &TomlValue,
289 scope: Option<&str>,
290 optional: bool,
291) -> Option<Dependency> {
292 let normalized_name = normalize_pypi_name(name);
293 let requirement = extract_pypi_requirement(value);
294 let exact_requirement = match value {
295 TomlValue::String(value) => Some(value.to_string()),
296 TomlValue::Table(table) => table.get(FIELD_VERSION).and_then(toml_value_to_string),
297 _ => None,
298 };
299 let pinned = exact_requirement
300 .as_deref()
301 .is_some_and(is_exact_constraint);
302 let exact_version = exact_requirement
303 .as_deref()
304 .filter(|_| pinned)
305 .map(|value| value.trim_start_matches('='));
306 let purl = build_pypi_purl(&normalized_name, exact_version);
307
308 let mut extra_data = HashMap::new();
309 if let TomlValue::Table(dep_table) = value {
310 for key in [
311 "index",
312 "path",
313 "git",
314 "url",
315 "branch",
316 "tag",
317 "rev",
318 "subdirectory",
319 ] {
320 if let Some(val) = dep_table.get(key).and_then(toml_value_to_string) {
321 extra_data.insert(key.replace('-', "_"), JsonValue::String(val));
322 }
323 }
324 if let Some(editable) = dep_table.get("editable").and_then(TomlValue::as_bool) {
325 extra_data.insert("editable".to_string(), JsonValue::Bool(editable));
326 }
327 if let Some(extras) = dep_table.get("extras").and_then(toml_to_json) {
328 extra_data.insert("extras".to_string(), extras);
329 }
330 }
331
332 Some(Dependency {
333 purl,
334 extracted_requirement: requirement.clone(),
335 scope: scope.map(ToOwned::to_owned),
336 is_runtime: Some(true),
337 is_optional: Some(optional),
338 is_pinned: Some(pinned),
339 is_direct: Some(true),
340 resolved_package: None,
341 extra_data: (!extra_data.is_empty()).then_some(extra_data),
342 })
343}
344
345fn build_manifest_extra_data(
346 toml_content: &TomlValue,
347 identity: Option<&TomlMap<String, TomlValue>>,
348) -> Option<HashMap<String, JsonValue>> {
349 let mut extra_data = HashMap::new();
350
351 for (field, key) in [
352 (FIELD_CHANNELS, "channels"),
353 (FIELD_PLATFORMS, "platforms"),
354 (FIELD_REQUIRES_PIXI, "requires_pixi"),
355 (FIELD_EXCLUDE_NEWER, "exclude_newer"),
356 (FIELD_LICENSE_FILE, "license_file"),
357 (FIELD_README, "readme"),
358 (FIELD_DOCUMENTATION, "documentation"),
359 ] {
360 if let Some(value) = identity
361 .and_then(|table| table.get(field))
362 .and_then(toml_to_json)
363 {
364 extra_data.insert(key.to_string(), value);
365 }
366 }
367 if let Some(value) = toml_content.get(FIELD_ENVIRONMENTS).and_then(toml_to_json) {
368 extra_data.insert("environments".to_string(), value);
369 }
370 if let Some(value) = toml_content.get(FIELD_TASKS).and_then(toml_to_json) {
371 extra_data.insert("tasks".to_string(), value);
372 }
373 if let Some(value) = toml_content.get(FIELD_PYPI_OPTIONS).and_then(toml_to_json) {
374 extra_data.insert("pypi_options".to_string(), value);
375 }
376 if let Some(feature_names) = toml_content
377 .get(FIELD_FEATURE)
378 .and_then(TomlValue::as_table)
379 .map(|table| table.keys().cloned().collect::<Vec<_>>())
380 .filter(|names| !names.is_empty())
381 {
382 extra_data.insert(
383 "features".to_string(),
384 JsonValue::Array(feature_names.into_iter().map(JsonValue::String).collect()),
385 );
386 }
387
388 (!extra_data.is_empty()).then_some(extra_data)
389}
390
391fn extract_v6_lock_dependencies(toml_content: &TomlValue) -> Vec<Dependency> {
392 let environment_refs = collect_v6_package_refs(toml_content);
393 let Some(packages) = toml_content.get("packages").and_then(TomlValue::as_array) else {
394 return Vec::new();
395 };
396
397 packages
398 .iter()
399 .filter_map(TomlValue::as_table)
400 .filter_map(|table| build_v6_lock_dependency(table, &environment_refs))
401 .collect()
402}
403
404fn collect_v6_package_refs(toml_content: &TomlValue) -> HashMap<String, Vec<JsonValue>> {
405 let mut refs = HashMap::new();
406 let Some(environments) = toml_content
407 .get(FIELD_ENVIRONMENTS)
408 .and_then(TomlValue::as_table)
409 else {
410 return refs;
411 };
412
413 for (env_name, env_value) in environments {
414 let Some(env_table) = env_value.as_table() else {
415 continue;
416 };
417 let channels = env_table.get(FIELD_CHANNELS).and_then(toml_to_json);
418 let indexes = env_table.get("indexes").and_then(toml_to_json);
419 let Some(package_platforms) = env_table.get("packages").and_then(TomlValue::as_table)
420 else {
421 continue;
422 };
423 for (platform, values) in package_platforms {
424 let Some(entries) = values.as_array() else {
425 continue;
426 };
427 for entry in entries {
428 let Some(table) = entry.as_table() else {
429 continue;
430 };
431 for (kind, locator_value) in table {
432 if let Some(locator) = toml_value_to_string(locator_value) {
433 let mut data = JsonMap::new();
434 data.insert(
435 "environment".to_string(),
436 JsonValue::String(env_name.clone()),
437 );
438 data.insert("platform".to_string(), JsonValue::String(platform.clone()));
439 data.insert("kind".to_string(), JsonValue::String(kind.clone()));
440 if let Some(channels) = channels.clone() {
441 data.insert("channels".to_string(), channels);
442 }
443 if let Some(indexes) = indexes.clone() {
444 data.insert("indexes".to_string(), indexes);
445 }
446 refs.entry(locator)
447 .or_default()
448 .push(JsonValue::Object(data));
449 }
450 }
451 }
452 }
453 }
454
455 refs
456}
457
458fn build_v6_lock_dependency(
459 table: &TomlMap<String, TomlValue>,
460 refs: &HashMap<String, Vec<JsonValue>>,
461) -> Option<Dependency> {
462 if let Some(locator) = table.get("pypi").and_then(toml_value_to_string) {
463 let name = table
464 .get(FIELD_NAME)
465 .and_then(TomlValue::as_str)
466 .map(normalize_pypi_name)?;
467 let version = table.get(FIELD_VERSION).and_then(toml_value_to_string)?;
468 let mut extra = HashMap::new();
469 extra.insert("source".to_string(), JsonValue::String(locator.clone()));
470 if let Some(val) = table.get("requires_dist").and_then(toml_to_json) {
471 extra.insert("requires_dist".to_string(), val);
472 }
473 if let Some(val) = table.get("requires_python").and_then(toml_to_json) {
474 extra.insert("requires_python".to_string(), val);
475 }
476 for key in ["sha256", "md5"] {
477 if let Some(val) = table.get(key).and_then(toml_to_json) {
478 extra.insert(key.to_string(), val);
479 }
480 }
481 if let Some(values) = refs.get(&locator)
482 && !values.is_empty()
483 {
484 extra.insert(
485 "lock_references".to_string(),
486 JsonValue::Array(values.clone()),
487 );
488 }
489 return Some(Dependency {
490 purl: build_pypi_purl(&name, Some(&version)),
491 extracted_requirement: Some(version),
492 scope: None,
493 is_runtime: Some(true),
494 is_optional: Some(false),
495 is_pinned: Some(true),
496 is_direct: None,
497 resolved_package: None,
498 extra_data: Some(extra),
499 });
500 }
501
502 if let Some(locator) = table.get("conda").and_then(toml_value_to_string) {
503 let name = conda_name_from_locator(&locator)?;
504 let version = table.get(FIELD_VERSION).and_then(toml_value_to_string);
505 let mut extra = HashMap::new();
506 extra.insert("source".to_string(), JsonValue::String(locator.clone()));
507 for key in [
508 "sha256",
509 "md5",
510 "license",
511 "license_family",
512 "depends",
513 "constrains",
514 "purls",
515 ] {
516 if let Some(val) = table.get(key).and_then(toml_to_json) {
517 extra.insert(key.to_string(), val);
518 }
519 }
520 if let Some(values) = refs.get(&locator)
521 && !values.is_empty()
522 {
523 extra.insert(
524 "lock_references".to_string(),
525 JsonValue::Array(values.clone()),
526 );
527 }
528 return Some(Dependency {
529 purl: build_conda_purl("conda", None, &name, version.as_deref(), None, None, None),
530 extracted_requirement: version,
531 scope: None,
532 is_runtime: Some(true),
533 is_optional: Some(false),
534 is_pinned: Some(true),
535 is_direct: None,
536 resolved_package: None,
537 extra_data: Some(extra),
538 });
539 }
540
541 None
542}
543
544fn extract_v4_lock_dependencies(toml_content: &TomlValue) -> Vec<Dependency> {
545 let Some(packages) = toml_content.get("packages").and_then(TomlValue::as_array) else {
546 return Vec::new();
547 };
548
549 packages
550 .iter()
551 .filter_map(TomlValue::as_table)
552 .filter_map(build_v4_lock_dependency)
553 .collect()
554}
555
556fn build_v4_lock_dependency(table: &TomlMap<String, TomlValue>) -> Option<Dependency> {
557 let kind = table.get("kind").and_then(TomlValue::as_str)?;
558 let name = table.get(FIELD_NAME).and_then(toml_value_to_string)?;
559 let version = table.get(FIELD_VERSION).and_then(toml_value_to_string);
560 let mut extra = HashMap::new();
561 for key in [
562 "url",
563 "path",
564 "sha256",
565 "md5",
566 "editable",
567 "build",
568 "subdir",
569 "license",
570 "license_family",
571 "depends",
572 "requires_dist",
573 ] {
574 if let Some(val) = table.get(key).and_then(toml_to_json) {
575 extra.insert(key.replace('-', "_"), val);
576 }
577 }
578
579 Some(Dependency {
580 purl: match kind {
581 "pypi" => build_pypi_purl(&normalize_pypi_name(&name), version.as_deref()),
582 "conda" => build_conda_purl("conda", None, &name, version.as_deref(), None, None, None),
583 _ => None,
584 },
585 extracted_requirement: version,
586 scope: None,
587 is_runtime: Some(true),
588 is_optional: Some(false),
589 is_pinned: Some(true),
590 is_direct: None,
591 resolved_package: None,
592 extra_data: Some(extra),
593 })
594}
595
596fn extract_conda_requirement(value: &TomlValue) -> Option<String> {
597 match value {
598 TomlValue::String(value) => Some(value.to_string()),
599 TomlValue::Table(table) => table
600 .get(FIELD_VERSION)
601 .and_then(toml_value_to_string)
602 .or_else(|| table.get("build").and_then(toml_value_to_string)),
603 _ => None,
604 }
605}
606
607fn extract_pypi_requirement(value: &TomlValue) -> Option<String> {
608 match value {
609 TomlValue::String(value) => Some(value.to_string()),
610 TomlValue::Table(table) => table
611 .get(FIELD_VERSION)
612 .and_then(toml_value_to_string)
613 .or_else(|| table.get("path").and_then(toml_value_to_string))
614 .or_else(|| table.get("git").and_then(toml_value_to_string))
615 .or_else(|| table.get("url").and_then(toml_value_to_string)),
616 _ => None,
617 }
618}
619
620fn toml_value_to_string(value: &TomlValue) -> Option<String> {
621 match value {
622 TomlValue::String(value) => Some(value.clone()),
623 TomlValue::Integer(value) => Some(value.to_string()),
624 TomlValue::Float(value) => Some(value.to_string()),
625 TomlValue::Boolean(value) => Some(value.to_string()),
626 _ => None,
627 }
628}
629
630fn toml_to_json(value: &TomlValue) -> Option<JsonValue> {
631 serde_json::to_value(value).ok()
632}
633
634fn normalize_pypi_name(name: &str) -> String {
635 name.trim().replace('_', "-").to_ascii_lowercase()
636}
637
638fn build_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
639 let mut purl = PackageUrl::new("pypi", name).ok()?;
640 if let Some(version) = version {
641 purl.with_version(version).ok()?;
642 }
643 Some(purl.to_string())
644}
645
646fn build_pixi_purl(name: &str, version: Option<&str>) -> Option<String> {
647 let mut purl = PackageUrl::new(PackageType::Pixi.as_str(), name).ok()?;
648 if let Some(version) = version {
649 purl.with_version(version).ok()?;
650 }
651 Some(purl.to_string())
652}
653
654fn is_exact_constraint(value: &str) -> bool {
655 let trimmed = value.trim();
656 let normalized = trimmed.trim_start_matches('=');
657 !normalized.is_empty()
658 && !normalized.contains('*')
659 && !normalized.contains('^')
660 && !normalized.contains('~')
661 && !normalized.contains('>')
662 && !normalized.contains('<')
663 && !normalized.contains('=')
664 && !normalized.contains('|')
665 && !normalized.contains(',')
666 && !normalized.contains(' ')
667}
668
669fn conda_name_from_locator(locator: &str) -> Option<String> {
670 let file_name = locator.rsplit('/').next()?;
671 let stem = file_name
672 .strip_suffix(".tar.bz2")
673 .or_else(|| file_name.strip_suffix(".conda"))
674 .unwrap_or(file_name);
675 let mut parts = stem.rsplitn(3, '-');
676 let _ = parts.next()?;
677 let _ = parts.next()?;
678 Some(parts.next()?.to_string())
679}
680
681fn default_package_data(datasource_id: Option<DatasourceId>) -> PackageData {
682 PackageData {
683 package_type: Some(PackageType::Pixi),
684 datasource_id,
685 ..Default::default()
686 }
687}
688
689crate::register_parser!(
690 "Pixi workspace manifest and lockfile",
691 &["**/pixi.toml", "**/pixi.lock"],
692 "pixi",
693 "TOML",
694 Some("https://pixi.sh/latest/reference/pixi_manifest/"),
695);