Skip to main content

provenant/parsers/
pixi.rs

1use std::collections::HashMap;
2use std::path::Path;
3
4use log::warn;
5use packageurl::PackageUrl;
6use serde_json::{Map as JsonMap, Value as JsonValue};
7use toml::Value as TomlValue;
8use toml::map::Map as TomlMap;
9
10use crate::models::{DatasourceId, Dependency, PackageData, PackageType, Party};
11use crate::parsers::conda::build_purl as build_conda_purl;
12use crate::parsers::python::read_toml_file;
13use crate::parsers::utils::split_name_email;
14
15use super::PackageParser;
16
// TOML keys read by the Pixi parsers in this file.
//
// `workspace` and `project` name the manifest's identity table: the manifest
// parser looks for `workspace` first and falls back to `project`.
const FIELD_WORKSPACE: &str = "workspace";
const FIELD_PROJECT: &str = "project";
// Identity-table keys. `version` is also read at the root of the lockfile and
// on individual dependency/package tables.
const FIELD_NAME: &str = "name";
const FIELD_VERSION: &str = "version";
const FIELD_AUTHORS: &str = "authors";
const FIELD_DESCRIPTION: &str = "description";
const FIELD_LICENSE: &str = "license";
const FIELD_LICENSE_FILE: &str = "license-file";
const FIELD_README: &str = "readme";
const FIELD_HOMEPAGE: &str = "homepage";
const FIELD_REPOSITORY: &str = "repository";
const FIELD_DOCUMENTATION: &str = "documentation";
// `channels` is read from the identity table and from lockfile environments.
const FIELD_CHANNELS: &str = "channels";
const FIELD_PLATFORMS: &str = "platforms";
const FIELD_REQUIRES_PIXI: &str = "requires-pixi";
const FIELD_EXCLUDE_NEWER: &str = "exclude-newer";
// Dependency tables, read at the manifest root and inside each feature table.
const FIELD_DEPENDENCIES: &str = "dependencies";
const FIELD_PYPI_DEPENDENCIES: &str = "pypi-dependencies";
// Root-level tables.
const FIELD_FEATURE: &str = "feature";
const FIELD_ENVIRONMENTS: &str = "environments";
const FIELD_TASKS: &str = "tasks";
const FIELD_PYPI_OPTIONS: &str = "pypi-options";
39
40pub struct PixiTomlParser;
41
42impl PackageParser for PixiTomlParser {
43    const PACKAGE_TYPE: PackageType = PackageType::Pixi;
44
45    fn is_match(path: &Path) -> bool {
46        path.file_name().is_some_and(|name| name == "pixi.toml")
47    }
48
49    fn extract_packages(path: &Path) -> Vec<PackageData> {
50        let toml_content = match read_toml_file(path) {
51            Ok(content) => content,
52            Err(error) => {
53                warn!("Failed to read pixi.toml at {:?}: {}", path, error);
54                return vec![default_package_data(Some(DatasourceId::PixiToml))];
55            }
56        };
57
58        vec![parse_pixi_toml(&toml_content)]
59    }
60}
61
62pub struct PixiLockParser;
63
64impl PackageParser for PixiLockParser {
65    const PACKAGE_TYPE: PackageType = PackageType::Pixi;
66
67    fn is_match(path: &Path) -> bool {
68        path.file_name().is_some_and(|name| name == "pixi.lock")
69    }
70
71    fn extract_packages(path: &Path) -> Vec<PackageData> {
72        let toml_content = match read_toml_file(path) {
73            Ok(content) => content,
74            Err(error) => {
75                warn!("Failed to read pixi.lock at {:?}: {}", path, error);
76                return vec![default_package_data(Some(DatasourceId::PixiLock))];
77            }
78        };
79
80        vec![parse_pixi_lock(&toml_content)]
81    }
82}
83
84fn parse_pixi_toml(toml_content: &TomlValue) -> PackageData {
85    let identity = toml_content
86        .get(FIELD_WORKSPACE)
87        .and_then(TomlValue::as_table)
88        .or_else(|| {
89            toml_content
90                .get(FIELD_PROJECT)
91                .and_then(TomlValue::as_table)
92        });
93
94    let name = identity
95        .and_then(|table| table.get(FIELD_NAME))
96        .and_then(TomlValue::as_str)
97        .map(ToOwned::to_owned);
98    let version = identity
99        .and_then(|table| table.get(FIELD_VERSION))
100        .and_then(toml_value_to_string);
101
102    let mut package = default_package_data(Some(DatasourceId::PixiToml));
103    package.name = name.clone();
104    package.version = version.clone();
105    package.primary_language = Some("TOML".to_string());
106    package.description = identity
107        .and_then(|table| table.get(FIELD_DESCRIPTION))
108        .and_then(TomlValue::as_str)
109        .map(|value| value.trim().to_string());
110    package.homepage_url = identity
111        .and_then(|table| table.get(FIELD_HOMEPAGE))
112        .and_then(TomlValue::as_str)
113        .map(ToOwned::to_owned);
114    package.vcs_url = identity
115        .and_then(|table| table.get(FIELD_REPOSITORY))
116        .and_then(TomlValue::as_str)
117        .map(ToOwned::to_owned);
118    package.parties = extract_authors(identity);
119    package.extracted_license_statement = identity
120        .and_then(|table| table.get(FIELD_LICENSE))
121        .and_then(TomlValue::as_str)
122        .map(ToOwned::to_owned);
123    package.purl = name
124        .as_deref()
125        .and_then(|value| build_pixi_purl(value, version.as_deref()));
126    package.dependencies = extract_manifest_dependencies(toml_content);
127    package.extra_data = build_manifest_extra_data(toml_content, identity);
128    package
129}
130
131fn parse_pixi_lock(toml_content: &TomlValue) -> PackageData {
132    let mut package = default_package_data(Some(DatasourceId::PixiLock));
133    package.primary_language = Some("TOML".to_string());
134
135    let lock_version = toml_content
136        .get(FIELD_VERSION)
137        .and_then(TomlValue::as_integer);
138    let mut extra_data = HashMap::new();
139    if let Some(lock_version) = lock_version {
140        extra_data.insert("lock_version".to_string(), JsonValue::from(lock_version));
141    }
142    if let Some(env_json) = toml_content.get(FIELD_ENVIRONMENTS).and_then(toml_to_json) {
143        extra_data.insert("lock_environments".to_string(), env_json);
144    }
145    package.extra_data = (!extra_data.is_empty()).then_some(extra_data);
146
147    match lock_version {
148        Some(6) => package.dependencies = extract_v6_lock_dependencies(toml_content),
149        Some(4) => package.dependencies = extract_v4_lock_dependencies(toml_content),
150        Some(_) | None => {}
151    }
152
153    package
154}
155
156fn extract_authors(identity: Option<&TomlMap<String, TomlValue>>) -> Vec<Party> {
157    identity
158        .and_then(|table| table.get(FIELD_AUTHORS))
159        .and_then(TomlValue::as_array)
160        .into_iter()
161        .flatten()
162        .filter_map(TomlValue::as_str)
163        .map(|author| {
164            let (name, email) = split_name_email(author);
165            Party {
166                r#type: None,
167                role: Some("author".to_string()),
168                name,
169                email,
170                url: None,
171                organization: None,
172                organization_url: None,
173                timezone: None,
174            }
175        })
176        .collect()
177}
178
179fn extract_manifest_dependencies(toml_content: &TomlValue) -> Vec<Dependency> {
180    let mut dependencies = Vec::new();
181
182    if let Some(table) = toml_content
183        .get(FIELD_DEPENDENCIES)
184        .and_then(TomlValue::as_table)
185    {
186        dependencies.extend(extract_conda_dependencies(table, None, false));
187    }
188    if let Some(table) = toml_content
189        .get(FIELD_PYPI_DEPENDENCIES)
190        .and_then(TomlValue::as_table)
191    {
192        dependencies.extend(extract_pypi_dependencies(table, None, false));
193    }
194
195    if let Some(feature_table) = toml_content
196        .get(FIELD_FEATURE)
197        .and_then(TomlValue::as_table)
198    {
199        for (feature_name, value) in feature_table {
200            let Some(feature) = value.as_table() else {
201                continue;
202            };
203            if let Some(table) = feature
204                .get(FIELD_DEPENDENCIES)
205                .and_then(TomlValue::as_table)
206            {
207                dependencies.extend(extract_conda_dependencies(table, Some(feature_name), true));
208            }
209            if let Some(table) = feature
210                .get(FIELD_PYPI_DEPENDENCIES)
211                .and_then(TomlValue::as_table)
212            {
213                dependencies.extend(extract_pypi_dependencies(table, Some(feature_name), true));
214            }
215        }
216    }
217
218    dependencies
219}
220
221fn extract_conda_dependencies(
222    table: &TomlMap<String, TomlValue>,
223    scope: Option<&str>,
224    optional: bool,
225) -> Vec<Dependency> {
226    table
227        .iter()
228        .filter_map(|(name, value)| build_conda_dependency(name, value, scope, optional))
229        .collect()
230}
231
232fn build_conda_dependency(
233    name: &str,
234    value: &TomlValue,
235    scope: Option<&str>,
236    optional: bool,
237) -> Option<Dependency> {
238    let requirement = extract_conda_requirement(value);
239    let exact_requirement = match value {
240        TomlValue::String(value) => Some(value.to_string()),
241        TomlValue::Table(table) => table.get(FIELD_VERSION).and_then(toml_value_to_string),
242        _ => None,
243    };
244    let pinned = exact_requirement
245        .as_deref()
246        .is_some_and(is_exact_constraint);
247    let exact_version = exact_requirement
248        .as_deref()
249        .filter(|_| pinned)
250        .map(|value| value.trim_start_matches('='));
251    let purl = build_conda_purl("conda", None, name, exact_version, None, None, None);
252
253    let mut extra_data = HashMap::new();
254    if let TomlValue::Table(dep_table) = value {
255        for key in ["channel", "build", "path", "url", "git"] {
256            if let Some(val) = dep_table.get(key).and_then(toml_value_to_string) {
257                extra_data.insert(key.to_string(), JsonValue::String(val));
258            }
259        }
260    }
261
262    Some(Dependency {
263        purl,
264        extracted_requirement: requirement.clone(),
265        scope: scope.map(ToOwned::to_owned),
266        is_runtime: Some(true),
267        is_optional: Some(optional),
268        is_pinned: Some(pinned),
269        is_direct: Some(true),
270        resolved_package: None,
271        extra_data: (!extra_data.is_empty()).then_some(extra_data),
272    })
273}
274
275fn extract_pypi_dependencies(
276    table: &TomlMap<String, TomlValue>,
277    scope: Option<&str>,
278    optional: bool,
279) -> Vec<Dependency> {
280    table
281        .iter()
282        .filter_map(|(name, value)| build_pypi_dependency(name, value, scope, optional))
283        .collect()
284}
285
286fn build_pypi_dependency(
287    name: &str,
288    value: &TomlValue,
289    scope: Option<&str>,
290    optional: bool,
291) -> Option<Dependency> {
292    let normalized_name = normalize_pypi_name(name);
293    let requirement = extract_pypi_requirement(value);
294    let exact_requirement = match value {
295        TomlValue::String(value) => Some(value.to_string()),
296        TomlValue::Table(table) => table.get(FIELD_VERSION).and_then(toml_value_to_string),
297        _ => None,
298    };
299    let pinned = exact_requirement
300        .as_deref()
301        .is_some_and(is_exact_constraint);
302    let exact_version = exact_requirement
303        .as_deref()
304        .filter(|_| pinned)
305        .map(|value| value.trim_start_matches('='));
306    let purl = build_pypi_purl(&normalized_name, exact_version);
307
308    let mut extra_data = HashMap::new();
309    if let TomlValue::Table(dep_table) = value {
310        for key in [
311            "index",
312            "path",
313            "git",
314            "url",
315            "branch",
316            "tag",
317            "rev",
318            "subdirectory",
319        ] {
320            if let Some(val) = dep_table.get(key).and_then(toml_value_to_string) {
321                extra_data.insert(key.replace('-', "_"), JsonValue::String(val));
322            }
323        }
324        if let Some(editable) = dep_table.get("editable").and_then(TomlValue::as_bool) {
325            extra_data.insert("editable".to_string(), JsonValue::Bool(editable));
326        }
327        if let Some(extras) = dep_table.get("extras").and_then(toml_to_json) {
328            extra_data.insert("extras".to_string(), extras);
329        }
330    }
331
332    Some(Dependency {
333        purl,
334        extracted_requirement: requirement.clone(),
335        scope: scope.map(ToOwned::to_owned),
336        is_runtime: Some(true),
337        is_optional: Some(optional),
338        is_pinned: Some(pinned),
339        is_direct: Some(true),
340        resolved_package: None,
341        extra_data: (!extra_data.is_empty()).then_some(extra_data),
342    })
343}
344
345fn build_manifest_extra_data(
346    toml_content: &TomlValue,
347    identity: Option<&TomlMap<String, TomlValue>>,
348) -> Option<HashMap<String, JsonValue>> {
349    let mut extra_data = HashMap::new();
350
351    for (field, key) in [
352        (FIELD_CHANNELS, "channels"),
353        (FIELD_PLATFORMS, "platforms"),
354        (FIELD_REQUIRES_PIXI, "requires_pixi"),
355        (FIELD_EXCLUDE_NEWER, "exclude_newer"),
356        (FIELD_LICENSE_FILE, "license_file"),
357        (FIELD_README, "readme"),
358        (FIELD_DOCUMENTATION, "documentation"),
359    ] {
360        if let Some(value) = identity
361            .and_then(|table| table.get(field))
362            .and_then(toml_to_json)
363        {
364            extra_data.insert(key.to_string(), value);
365        }
366    }
367    if let Some(value) = toml_content.get(FIELD_ENVIRONMENTS).and_then(toml_to_json) {
368        extra_data.insert("environments".to_string(), value);
369    }
370    if let Some(value) = toml_content.get(FIELD_TASKS).and_then(toml_to_json) {
371        extra_data.insert("tasks".to_string(), value);
372    }
373    if let Some(value) = toml_content.get(FIELD_PYPI_OPTIONS).and_then(toml_to_json) {
374        extra_data.insert("pypi_options".to_string(), value);
375    }
376    if let Some(feature_names) = toml_content
377        .get(FIELD_FEATURE)
378        .and_then(TomlValue::as_table)
379        .map(|table| table.keys().cloned().collect::<Vec<_>>())
380        .filter(|names| !names.is_empty())
381    {
382        extra_data.insert(
383            "features".to_string(),
384            JsonValue::Array(feature_names.into_iter().map(JsonValue::String).collect()),
385        );
386    }
387
388    (!extra_data.is_empty()).then_some(extra_data)
389}
390
391fn extract_v6_lock_dependencies(toml_content: &TomlValue) -> Vec<Dependency> {
392    let environment_refs = collect_v6_package_refs(toml_content);
393    let Some(packages) = toml_content.get("packages").and_then(TomlValue::as_array) else {
394        return Vec::new();
395    };
396
397    packages
398        .iter()
399        .filter_map(TomlValue::as_table)
400        .filter_map(|table| build_v6_lock_dependency(table, &environment_refs))
401        .collect()
402}
403
404fn collect_v6_package_refs(toml_content: &TomlValue) -> HashMap<String, Vec<JsonValue>> {
405    let mut refs = HashMap::new();
406    let Some(environments) = toml_content
407        .get(FIELD_ENVIRONMENTS)
408        .and_then(TomlValue::as_table)
409    else {
410        return refs;
411    };
412
413    for (env_name, env_value) in environments {
414        let Some(env_table) = env_value.as_table() else {
415            continue;
416        };
417        let channels = env_table.get(FIELD_CHANNELS).and_then(toml_to_json);
418        let indexes = env_table.get("indexes").and_then(toml_to_json);
419        let Some(package_platforms) = env_table.get("packages").and_then(TomlValue::as_table)
420        else {
421            continue;
422        };
423        for (platform, values) in package_platforms {
424            let Some(entries) = values.as_array() else {
425                continue;
426            };
427            for entry in entries {
428                let Some(table) = entry.as_table() else {
429                    continue;
430                };
431                for (kind, locator_value) in table {
432                    if let Some(locator) = toml_value_to_string(locator_value) {
433                        let mut data = JsonMap::new();
434                        data.insert(
435                            "environment".to_string(),
436                            JsonValue::String(env_name.clone()),
437                        );
438                        data.insert("platform".to_string(), JsonValue::String(platform.clone()));
439                        data.insert("kind".to_string(), JsonValue::String(kind.clone()));
440                        if let Some(channels) = channels.clone() {
441                            data.insert("channels".to_string(), channels);
442                        }
443                        if let Some(indexes) = indexes.clone() {
444                            data.insert("indexes".to_string(), indexes);
445                        }
446                        refs.entry(locator)
447                            .or_default()
448                            .push(JsonValue::Object(data));
449                    }
450                }
451            }
452        }
453    }
454
455    refs
456}
457
458fn build_v6_lock_dependency(
459    table: &TomlMap<String, TomlValue>,
460    refs: &HashMap<String, Vec<JsonValue>>,
461) -> Option<Dependency> {
462    if let Some(locator) = table.get("pypi").and_then(toml_value_to_string) {
463        let name = table
464            .get(FIELD_NAME)
465            .and_then(TomlValue::as_str)
466            .map(normalize_pypi_name)?;
467        let version = table.get(FIELD_VERSION).and_then(toml_value_to_string)?;
468        let mut extra = HashMap::new();
469        extra.insert("source".to_string(), JsonValue::String(locator.clone()));
470        if let Some(val) = table.get("requires_dist").and_then(toml_to_json) {
471            extra.insert("requires_dist".to_string(), val);
472        }
473        if let Some(val) = table.get("requires_python").and_then(toml_to_json) {
474            extra.insert("requires_python".to_string(), val);
475        }
476        for key in ["sha256", "md5"] {
477            if let Some(val) = table.get(key).and_then(toml_to_json) {
478                extra.insert(key.to_string(), val);
479            }
480        }
481        if let Some(values) = refs.get(&locator)
482            && !values.is_empty()
483        {
484            extra.insert(
485                "lock_references".to_string(),
486                JsonValue::Array(values.clone()),
487            );
488        }
489        return Some(Dependency {
490            purl: build_pypi_purl(&name, Some(&version)),
491            extracted_requirement: Some(version),
492            scope: None,
493            is_runtime: Some(true),
494            is_optional: Some(false),
495            is_pinned: Some(true),
496            is_direct: None,
497            resolved_package: None,
498            extra_data: Some(extra),
499        });
500    }
501
502    if let Some(locator) = table.get("conda").and_then(toml_value_to_string) {
503        let name = conda_name_from_locator(&locator)?;
504        let version = table.get(FIELD_VERSION).and_then(toml_value_to_string);
505        let mut extra = HashMap::new();
506        extra.insert("source".to_string(), JsonValue::String(locator.clone()));
507        for key in [
508            "sha256",
509            "md5",
510            "license",
511            "license_family",
512            "depends",
513            "constrains",
514            "purls",
515        ] {
516            if let Some(val) = table.get(key).and_then(toml_to_json) {
517                extra.insert(key.to_string(), val);
518            }
519        }
520        if let Some(values) = refs.get(&locator)
521            && !values.is_empty()
522        {
523            extra.insert(
524                "lock_references".to_string(),
525                JsonValue::Array(values.clone()),
526            );
527        }
528        return Some(Dependency {
529            purl: build_conda_purl("conda", None, &name, version.as_deref(), None, None, None),
530            extracted_requirement: version,
531            scope: None,
532            is_runtime: Some(true),
533            is_optional: Some(false),
534            is_pinned: Some(true),
535            is_direct: None,
536            resolved_package: None,
537            extra_data: Some(extra),
538        });
539    }
540
541    None
542}
543
544fn extract_v4_lock_dependencies(toml_content: &TomlValue) -> Vec<Dependency> {
545    let Some(packages) = toml_content.get("packages").and_then(TomlValue::as_array) else {
546        return Vec::new();
547    };
548
549    packages
550        .iter()
551        .filter_map(TomlValue::as_table)
552        .filter_map(build_v4_lock_dependency)
553        .collect()
554}
555
556fn build_v4_lock_dependency(table: &TomlMap<String, TomlValue>) -> Option<Dependency> {
557    let kind = table.get("kind").and_then(TomlValue::as_str)?;
558    let name = table.get(FIELD_NAME).and_then(toml_value_to_string)?;
559    let version = table.get(FIELD_VERSION).and_then(toml_value_to_string);
560    let mut extra = HashMap::new();
561    for key in [
562        "url",
563        "path",
564        "sha256",
565        "md5",
566        "editable",
567        "build",
568        "subdir",
569        "license",
570        "license_family",
571        "depends",
572        "requires_dist",
573    ] {
574        if let Some(val) = table.get(key).and_then(toml_to_json) {
575            extra.insert(key.replace('-', "_"), val);
576        }
577    }
578
579    Some(Dependency {
580        purl: match kind {
581            "pypi" => build_pypi_purl(&normalize_pypi_name(&name), version.as_deref()),
582            "conda" => build_conda_purl("conda", None, &name, version.as_deref(), None, None, None),
583            _ => None,
584        },
585        extracted_requirement: version,
586        scope: None,
587        is_runtime: Some(true),
588        is_optional: Some(false),
589        is_pinned: Some(true),
590        is_direct: None,
591        resolved_package: None,
592        extra_data: Some(extra),
593    })
594}
595
596fn extract_conda_requirement(value: &TomlValue) -> Option<String> {
597    match value {
598        TomlValue::String(value) => Some(value.to_string()),
599        TomlValue::Table(table) => table
600            .get(FIELD_VERSION)
601            .and_then(toml_value_to_string)
602            .or_else(|| table.get("build").and_then(toml_value_to_string)),
603        _ => None,
604    }
605}
606
607fn extract_pypi_requirement(value: &TomlValue) -> Option<String> {
608    match value {
609        TomlValue::String(value) => Some(value.to_string()),
610        TomlValue::Table(table) => table
611            .get(FIELD_VERSION)
612            .and_then(toml_value_to_string)
613            .or_else(|| table.get("path").and_then(toml_value_to_string))
614            .or_else(|| table.get("git").and_then(toml_value_to_string))
615            .or_else(|| table.get("url").and_then(toml_value_to_string)),
616        _ => None,
617    }
618}
619
620fn toml_value_to_string(value: &TomlValue) -> Option<String> {
621    match value {
622        TomlValue::String(value) => Some(value.clone()),
623        TomlValue::Integer(value) => Some(value.to_string()),
624        TomlValue::Float(value) => Some(value.to_string()),
625        TomlValue::Boolean(value) => Some(value.to_string()),
626        _ => None,
627    }
628}
629
630fn toml_to_json(value: &TomlValue) -> Option<JsonValue> {
631    serde_json::to_value(value).ok()
632}
633
/// Normalizes a PyPI package name: surrounding whitespace is removed,
/// underscores become dashes and ASCII letters are lower-cased.
fn normalize_pypi_name(name: &str) -> String {
    name.trim()
        .chars()
        .map(|ch| match ch {
            '_' => '-',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
637
638fn build_pypi_purl(name: &str, version: Option<&str>) -> Option<String> {
639    let mut purl = PackageUrl::new("pypi", name).ok()?;
640    if let Some(version) = version {
641        purl.with_version(version).ok()?;
642    }
643    Some(purl.to_string())
644}
645
646fn build_pixi_purl(name: &str, version: Option<&str>) -> Option<String> {
647    let mut purl = PackageUrl::new(PackageType::Pixi.as_str(), name).ok()?;
648    if let Some(version) = version {
649        purl.with_version(version).ok()?;
650    }
651    Some(purl.to_string())
652}
653
/// Reports whether a version spec names exactly one version.
///
/// Surrounding whitespace and any run of leading `=` characters are ignored.
/// What remains must be non-empty and free of wildcard, range, alternative
/// and separator characters.
fn is_exact_constraint(value: &str) -> bool {
    // Any of these characters means the spec is more than a bare version.
    const SPEC_MARKERS: [char; 9] = ['*', '^', '~', '>', '<', '=', '|', ',', ' '];

    let version = value.trim().trim_start_matches('=');
    !version.is_empty() && version.chars().all(|ch| !SPEC_MARKERS.contains(&ch))
}
668
/// Extracts the package name from a conda package locator.
///
/// Takes the text after the last `/`, removes a trailing `.tar.bz2` or
/// `.conda`, and drops the last two dash-separated segments. Returns `None`
/// when fewer than three dash-separated segments remain.
fn conda_name_from_locator(locator: &str) -> Option<String> {
    let file_name = locator
        .rsplit_once('/')
        .map_or(locator, |(_, last_segment)| last_segment);
    let stem = match file_name.strip_suffix(".tar.bz2") {
        Some(stripped) => stripped,
        None => file_name.strip_suffix(".conda").unwrap_or(file_name),
    };

    // Peel the two trailing segments; whatever precedes them is the name,
    // which may itself contain dashes.
    let (without_build, _build) = stem.rsplit_once('-')?;
    let (name, _version) = without_build.rsplit_once('-')?;
    Some(name.to_string())
}
680
681fn default_package_data(datasource_id: Option<DatasourceId>) -> PackageData {
682    PackageData {
683        package_type: Some(PackageType::Pixi),
684        datasource_id,
685        ..Default::default()
686    }
687}
688
// Registers the Pixi parsers through the crate-level `register_parser!` macro.
// NOTE(review): judging by the argument values these are, in order, a
// human-readable description, the matched path globs, the package type, the
// primary language and a documentation URL — confirm against the macro
// definition.
crate::register_parser!(
    "Pixi workspace manifest and lockfile",
    &["**/pixi.toml", "**/pixi.lock"],
    "pixi",
    "TOML",
    Some("https://pixi.sh/latest/reference/pixi_manifest/"),
);