#![deny(clippy::all)]
#![forbid(clippy::indexing_slicing)]
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use glob::glob;
use monochange_core::AdapterDiscovery;
use monochange_core::DependencyKind;
use monochange_core::Ecosystem;
use monochange_core::EcosystemAdapter;
use monochange_core::LockfileCommandExecution;
use monochange_core::MonochangeError;
use monochange_core::MonochangeResult;
use monochange_core::PackageDependency;
use monochange_core::PackageRecord;
use monochange_core::PublishState;
use monochange_core::ShellConfig;
use monochange_core::SourceConfiguration;
use monochange_core::normalize_path;
use monochange_publish::PublishRequest;
use semver::Version;
use toml::Value;
use toml_edit::DocumentMut;
use toml_edit::Item;
use walkdir::DirEntry;
use walkdir::WalkDir;
pub const PYPROJECT_FILE: &str = "pyproject.toml";
pub const UV_LOCK_FILE: &str = "uv.lock";
pub const POETRY_LOCK_FILE: &str = "poetry.lock";
pub fn write_python_placeholder_manifest(
dir: &Path,
request: &PublishRequest,
source: Option<&SourceConfiguration>,
) -> MonochangeResult<()> {
let module_name = python_placeholder_module_name(&request.package_name);
let project_urls = source.map(|source| {
format!(
"\n[project.urls]\nRepository = \"https://github.com/{}/{}\"\n",
source.owner, source.repo
)
});
let pyproject = format!(
"[build-system]\nrequires = [\"hatchling\"]\nbuild-backend = \"hatchling.build\"\n\n[project]\nname = \"{}\"\nversion = \"{}\"\ndescription = \"Placeholder package for {}\"\nreadme = \"README.md\"\nrequires-python = \">=3.8\"\n{}\n[tool.hatch.build.targets.wheel]\npackages = [\"src/{}\"]\n",
request.package_name,
request.version,
request.package_name,
project_urls.unwrap_or_default(),
module_name,
);
fs::write(dir.join("pyproject.toml"), pyproject).map_err(|error| {
MonochangeError::Io(format!(
"failed to write placeholder pyproject.toml: {error}"
))
})?;
let package_dir = dir.join("src").join(&module_name);
fs::create_dir_all(&package_dir).map_err(|error| {
MonochangeError::Io(format!(
"failed to create placeholder Python package: {error}"
))
})?;
fs::write(
package_dir.join("__init__.py"),
"\"\"\"Placeholder package published by monochange.\"\"\"\n",
)
.map_err(|error| {
MonochangeError::Io(format!(
"failed to write placeholder Python package module: {error}"
))
})
}
/// Derives a valid, importable Python module name from a package name:
/// ASCII alphanumerics are lowercased, everything else becomes `_`, and a
/// `placeholder_` prefix is added when the result is empty or starts with
/// a digit (identifiers cannot begin with a digit).
fn python_placeholder_module_name(package_name: &str) -> String {
    let mut module: String = package_name
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || ch == '_' {
                ch.to_ascii_lowercase()
            } else {
                '_'
            }
        })
        .collect();
    let needs_prefix = module.is_empty()
        || module
            .chars()
            .next()
            .is_some_and(|ch| ch.is_ascii_digit());
    if needs_prefix {
        module.insert_str(0, "placeholder_");
    }
    module
}
/// Zero-sized [`EcosystemAdapter`] implementation for Python projects.
pub struct PythonAdapter;
/// Constructs the Python ecosystem adapter.
#[must_use]
pub const fn adapter() -> PythonAdapter {
    PythonAdapter
}
impl EcosystemAdapter for PythonAdapter {
    /// Identifies this adapter as the Python ecosystem.
    fn ecosystem(&self) -> Ecosystem {
        Ecosystem::Python
    }
    /// Scans `root` for Python packages (uv workspace members and
    /// standalone `pyproject.toml` manifests).
    fn discover(&self, root: &Path) -> MonochangeResult<AdapterDiscovery> {
        discover_python_packages(root)
    }
    /// Explicitly configured package loading is not implemented for
    /// Python; always reports no package.
    fn load_configured(
        &self,
        _root: &Path,
        _package_path: &Path,
    ) -> MonochangeResult<Option<PackageRecord>> {
        Ok(None)
    }
    /// Reports whether `path` names a pyproject manifest or a supported
    /// lock file (`uv.lock` / `poetry.lock`).
    fn supported_versioned_file_kind(&self, path: &Path) -> bool {
        supported_versioned_file_kind(path).is_some()
    }
    /// Delegates to the free-function validator (currently always `Ok`).
    fn validate_versioned_file(
        &self,
        full_path: &Path,
        display_path: &str,
        custom_fields: Option<&[String]>,
    ) -> MonochangeResult<()> {
        validate_versioned_file(full_path, display_path, custom_fields)
    }
}
/// Classification of Python files that carry version information.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum PythonVersionedFileKind {
    /// A `pyproject.toml` manifest.
    Manifest,
    /// A `uv.lock` or `poetry.lock` lock file.
    Lock,
}
#[must_use]
pub fn supported_versioned_file_kind(path: &Path) -> Option<PythonVersionedFileKind> {
let file_name = path
.file_name()
.and_then(|name| name.to_str())
.unwrap_or_default();
match file_name {
PYPROJECT_FILE => Some(PythonVersionedFileKind::Manifest),
UV_LOCK_FILE | POETRY_LOCK_FILE => Some(PythonVersionedFileKind::Lock),
_ => None,
}
}
/// Locates the lock files relevant to `package`.
///
/// The workspace root is checked first; only when it holds no lock file
/// (and differs from the manifest's own directory) does the search fall
/// back to the manifest directory. Returns only paths that exist.
pub fn discover_lockfiles(package: &PackageRecord) -> Vec<PathBuf> {
    let manifest_dir = package
        .manifest_path
        .parent()
        .map_or_else(|| package.workspace_root.clone(), Path::to_path_buf);
    // Note: the original `scope` selection always resolved to the
    // workspace root; this keeps that behavior explicitly.
    let root = package.workspace_root.clone();
    let candidates =
        |dir: &Path| [dir.join(UV_LOCK_FILE), dir.join(POETRY_LOCK_FILE)];
    let mut found: Vec<PathBuf> = candidates(&root)
        .into_iter()
        .filter(|path| path.exists())
        .collect();
    if found.is_empty() && root != manifest_dir {
        found.extend(
            candidates(&manifest_dir)
                .into_iter()
                .filter(|path| path.exists()),
        );
    }
    found
}
/// Builds the regeneration commands for every lock file discovered for
/// `package`, each run (without a shell) in the lock file's directory.
pub fn default_lockfile_commands(package: &PackageRecord) -> Vec<LockfileCommandExecution> {
    let mut executions = Vec::new();
    for lockfile in discover_lockfiles(package) {
        let Some(file_name) = lockfile.file_name().and_then(|name| name.to_str()) else {
            continue;
        };
        let Some(command) = lockfile_command(file_name) else {
            continue;
        };
        let cwd = lockfile
            .parent()
            .unwrap_or(&package.workspace_root)
            .to_path_buf();
        executions.push(LockfileCommandExecution {
            command: command.to_string(),
            cwd,
            shell: ShellConfig::None,
        });
    }
    executions
}
fn lockfile_command(file_name: &str) -> Option<&'static str> {
match file_name {
UV_LOCK_FILE => Some("uv lock"),
POETRY_LOCK_FILE => Some("poetry lock --no-update"),
_ => None,
}
}
/// Parses `contents` as format-preserving TOML, applies the version and
/// dependency updates appropriate for `kind`, and returns the
/// re-serialized document text.
///
/// # Errors
///
/// Returns a [`toml_edit::TomlError`] when `contents` is not valid TOML.
pub fn update_versioned_file_text(
    contents: &str,
    kind: PythonVersionedFileKind,
    owner_version: Option<&str>,
    versioned_deps: &BTreeMap<String, String>,
) -> Result<String, toml_edit::TomlError> {
    let mut document = contents.parse::<DocumentMut>()?;
    update_versioned_file(&mut document, kind, owner_version, versioned_deps);
    Ok(document.to_string())
}
/// Applies in-place updates to an already-parsed versioned file.
///
/// Manifests get their `[project]` version and dependency entries
/// rewritten. Lock files are deliberately left untouched here: they are
/// regenerated by ecosystem tooling instead (see
/// [`default_lockfile_commands`]).
pub fn update_versioned_file(
    document: &mut DocumentMut,
    kind: PythonVersionedFileKind,
    owner_version: Option<&str>,
    versioned_deps: &BTreeMap<String, String>,
) {
    match kind {
        PythonVersionedFileKind::Manifest => {
            update_project_version(document, owner_version);
            update_project_dependencies(document, versioned_deps);
        }
        // Intentionally empty — lock files are never edited textually.
        PythonVersionedFileKind::Lock => {}
    }
}
/// Overwrites `[project].version` with `owner_version`, preserving the
/// existing value's decor (surrounding whitespace/comments). Does nothing
/// when no version is supplied or the manifest has no such entry.
fn update_project_version(document: &mut DocumentMut, owner_version: Option<&str>) {
    let Some(version) = owner_version else {
        return;
    };
    let Some(project) = document
        .get_mut("project")
        .and_then(Item::as_table_like_mut)
    else {
        return;
    };
    let Some(existing) = project.get_mut("version") else {
        return;
    };
    let Some(existing_value) = existing.as_value() else {
        return;
    };
    let mut replacement = toml_edit::Value::from(version);
    // Carry the original decor over so the textual diff is limited to the
    // version string itself.
    *replacement.decor_mut() = existing_value.decor().clone();
    *existing = Item::Value(replacement);
}
/// Rewrites entries of `[project].dependencies` whose (normalized) name
/// appears in `versioned_deps`, preserving each entry's decor. Non-string
/// entries and unknown packages are left untouched.
fn update_project_dependencies(
    document: &mut DocumentMut,
    versioned_deps: &BTreeMap<String, String>,
) {
    if versioned_deps.is_empty() {
        return;
    }
    let deps = match document
        .get_mut("project")
        .and_then(Item::as_table_like_mut)
        .and_then(|project| project.get_mut("dependencies"))
        .and_then(Item::as_array_mut)
    {
        Some(array) => array,
        None => return,
    };
    for entry in deps.iter_mut() {
        let Some(spec) = entry.as_str() else {
            continue;
        };
        let Some(rewritten) = update_dependency_specifier(spec, versioned_deps) else {
            continue;
        };
        let mut replacement = toml_edit::Value::from(rewritten);
        *replacement.decor_mut() = entry.decor().clone();
        *entry = replacement;
    }
}
/// Rewrites a PEP 508 specifier's version requirement when its package is
/// listed in `versioned_deps`, keeping the name and any `[extras]` part.
///
/// Returns `None` when no name can be parsed from `spec` or the package
/// is not being re-versioned.
fn update_dependency_specifier(
    spec: &str,
    versioned_deps: &BTreeMap<String, String>,
) -> Option<String> {
    let name = parse_dependency_name(spec)?;
    let normalized = normalize_python_package_name(&name);
    let version = versioned_deps.get(&normalized)?;
    // `.get()` instead of slicing to satisfy #![forbid(clippy::indexing_slicing)].
    let after_name = spec.get(name.len()..).unwrap_or_default();
    // End of the optional `[extras]` segment: the first character that can
    // never appear inside one. When no such character exists, the whole
    // tail is the extras segment (the old `unwrap_or(0)` wrongly dropped
    // the extras in that case, e.g. for `foo[extra]`).
    let extras_end = after_name
        .find(|ch: char| ch != '[' && ch != ']' && !ch.is_alphanumeric() && ch != ',')
        .unwrap_or(after_name.len());
    let extras = after_name.get(..extras_end).unwrap_or_default();
    Some(format!("{name}{extras}{version}"))
}
/// Extracts the leading package name from a PEP 508 specifier: the prefix
/// of alphanumeric characters plus `-`, `_`, and `.`. Returns `None` when
/// the spec does not start with a name character.
fn parse_dependency_name(spec: &str) -> Option<String> {
    let end = spec
        .find(|ch: char| !(ch.is_alphanumeric() || ch == '-' || ch == '_' || ch == '.'))
        .unwrap_or(spec.len());
    let name = spec.get(..end).unwrap_or_default();
    if name.is_empty() {
        None
    } else {
        Some(name.to_string())
    }
}
/// Normalizes a Python package name in the spirit of PEP 503: lowercase,
/// with runs of `-`, `_`, and `.` collapsed into a single `-`. A leading
/// separator run is dropped entirely (a trailing one is kept).
fn normalize_python_package_name(name: &str) -> String {
    let mut normalized = String::with_capacity(name.len());
    let mut in_separator_run = false;
    for ch in name.chars() {
        match ch {
            '-' | '_' | '.' => {
                // Emit at most one dash per run, and never at the start.
                if !in_separator_run && !normalized.is_empty() {
                    normalized.push('-');
                }
                in_separator_run = true;
            }
            other => {
                normalized.push(other.to_ascii_lowercase());
                in_separator_run = false;
            }
        }
    }
    normalized
}
/// Discovers Python packages under `root`.
///
/// When the root `pyproject.toml` declares a `[tool.uv.workspace]`, its
/// member globs are expanded first and the matched manifests recorded so
/// the subsequent recursive scan does not revisit them. Manifests that
/// fail to read or parse are reported as warnings rather than aborting
/// discovery.
///
/// # Errors
///
/// Kept as a `Result` for the adapter interface; all per-manifest
/// failures are downgraded to warnings.
#[tracing::instrument(skip_all)]
pub fn discover_python_packages(root: &Path) -> MonochangeResult<AdapterDiscovery> {
    let mut packages = Vec::new();
    let mut warnings = Vec::new();
    let mut included_manifests = BTreeSet::new();
    let root_manifest = root.join(PYPROJECT_FILE);
    if root_manifest.exists() {
        let workspace_members = match parse_uv_workspace_members(&root_manifest) {
            Ok(members) => members,
            Err(error) => {
                warnings.push(format!("skipped {}: {error}", root_manifest.display()));
                None
            }
        };
        if let Some(workspace_members) = workspace_members {
            included_manifests.insert(normalize_path(&root_manifest));
            let member_manifests =
                expand_workspace_members(root, &workspace_members, &mut warnings);
            for manifest_path in member_manifests {
                // Degrade member parse failures to warnings, matching the
                // standalone-manifest handling below (previously a single
                // broken member aborted the whole discovery via `?`).
                match parse_python_package(&manifest_path, root) {
                    Ok(Some(package)) => {
                        included_manifests.insert(normalize_path(&manifest_path));
                        packages.push(package);
                    }
                    Ok(None) => {}
                    Err(error) => {
                        warnings
                            .push(format!("skipped {}: {error}", manifest_path.display()));
                    }
                }
            }
        }
    }
    for manifest_path in find_all_pyproject_files(root) {
        let normalized = normalize_path(&manifest_path);
        if included_manifests.contains(&normalized) {
            continue;
        }
        // Standalone manifests use their own directory as workspace root.
        let manifest_dir = manifest_path
            .parent()
            .unwrap_or_else(|| Path::new("."))
            .to_path_buf();
        match parse_python_package(&manifest_path, &manifest_dir) {
            Ok(Some(package)) => packages.push(package),
            Ok(None) => {}
            Err(error) => {
                warnings.push(format!("skipped {}: {error}", manifest_path.display()));
            }
        }
    }
    packages.sort_by(|left, right| left.id.cmp(&right.id));
    packages.dedup_by(|left, right| left.id == right.id);
    tracing::debug!(packages = packages.len(), "discovered python packages");
    Ok(AdapterDiscovery { packages, warnings })
}
/// Reads `[tool.uv.workspace].members` from the given manifest.
///
/// Returns `Ok(None)` when the manifest has no uv workspace member list;
/// non-string entries in the array are silently skipped.
///
/// # Errors
///
/// Fails when the manifest cannot be read or is not valid TOML.
fn parse_uv_workspace_members(manifest_path: &Path) -> MonochangeResult<Option<Vec<String>>> {
    let contents = fs::read_to_string(manifest_path).map_err(|error| {
        MonochangeError::Io(format!(
            "failed to read {}: {error}",
            manifest_path.display()
        ))
    })?;
    let parsed = toml::from_str::<Value>(&contents).map_err(|error| {
        MonochangeError::Discovery(format!(
            "failed to parse {}: {error}",
            manifest_path.display()
        ))
    })?;
    let member_values = parsed
        .get("tool")
        .and_then(|tool| tool.get("uv"))
        .and_then(|uv| uv.get("workspace"))
        .and_then(|workspace| workspace.get("members"))
        .and_then(Value::as_array);
    let Some(member_values) = member_values else {
        return Ok(None);
    };
    let members: Vec<String> = member_values
        .iter()
        .filter_map(Value::as_str)
        .map(ToString::to_string)
        .collect();
    Ok(Some(members))
}
/// Expands `[tool.uv.workspace].members` glob patterns into the set of
/// member manifest paths under `root`.
///
/// A match may be a package directory (its `pyproject.toml` is used) or a
/// manifest file directly; anything else is ignored. Invalid patterns and
/// patterns matching nothing are reported through `warnings`.
fn expand_workspace_members(
    root: &Path,
    patterns: &[String],
    warnings: &mut Vec<String>,
) -> BTreeSet<PathBuf> {
    let mut manifests = BTreeSet::new();
    for pattern in patterns {
        let joined = root.join(pattern).to_string_lossy().to_string();
        let matches: Vec<PathBuf> = match glob(&joined) {
            Ok(paths) => paths
                .filter_map(Result::ok)
                .map(|path| normalize_path(&path))
                .collect(),
            Err(error) => {
                // Previously a malformed pattern was silently treated as an
                // empty match; surface the actual parse error instead.
                warnings.push(format!(
                    "uv workspace pattern `{pattern}` under {} is invalid: {error}",
                    root.display()
                ));
                continue;
            }
        };
        if matches.is_empty() {
            warnings.push(format!(
                "uv workspace pattern `{pattern}` under {} matched no packages",
                root.display()
            ));
        }
        for matched_path in matches {
            let manifest_path = if matched_path.is_dir() {
                matched_path.join(PYPROJECT_FILE)
            } else if matched_path.file_name().and_then(|name| name.to_str())
                == Some(PYPROJECT_FILE)
            {
                matched_path
            } else {
                continue;
            };
            if manifest_path.exists() {
                manifests.insert(manifest_path);
            }
        }
    }
    manifests
}
/// Parses a single `pyproject.toml` into a [`PackageRecord`].
///
/// PEP 621 `[project]` metadata takes precedence; `[tool.poetry]` is used
/// only when no `[project]` table exists. Returns `Ok(None)` when neither
/// table is present or no package name is declared.
///
/// # Errors
///
/// Fails when the manifest cannot be read or is not valid TOML.
fn parse_python_package(
    manifest_path: &Path,
    workspace_root: &Path,
) -> MonochangeResult<Option<PackageRecord>> {
    let contents = fs::read_to_string(manifest_path).map_err(|error| {
        MonochangeError::Io(format!(
            "failed to read {}: {error}",
            manifest_path.display()
        ))
    })?;
    let parsed = toml::from_str::<Value>(&contents).map_err(|error| {
        MonochangeError::Discovery(format!(
            "failed to parse {}: {error}",
            manifest_path.display()
        ))
    })?;
    let (name, version, dependencies) = if let Some(project) = parsed.get("project") {
        let name = project.get("name").and_then(Value::as_str);
        let version = project
            .get("version")
            .and_then(Value::as_str)
            .and_then(parse_pep440_as_semver);
        // `dynamic = ["version"]` means the version is computed at build
        // time, so any literal `version` value is discarded below.
        let dynamic = project
            .get("dynamic")
            .and_then(Value::as_array)
            .is_some_and(|arr| arr.iter().any(|v| v.as_str() == Some("version")));
        let version = if dynamic { None } else { version };
        let deps = parse_pep621_dependencies(project);
        (name, version, deps)
    } else if let Some(poetry) = parsed.get("tool").and_then(|tool| tool.get("poetry")) {
        let name = poetry.get("name").and_then(Value::as_str);
        let version = poetry
            .get("version")
            .and_then(Value::as_str)
            .and_then(parse_pep440_as_semver);
        let deps = parse_poetry_dependencies(poetry);
        (name, version, deps)
    } else {
        // Neither PEP 621 nor Poetry metadata: not a package manifest.
        return Ok(None);
    };
    let Some(name) = name else {
        return Ok(None);
    };
    let mut record = PackageRecord::new(
        Ecosystem::Python,
        name,
        normalize_path(manifest_path),
        normalize_path(workspace_root),
        version,
        PublishState::Public,
    );
    record.declared_dependencies = dependencies;
    Ok(Some(record))
}
/// Extracts PEP 621 dependencies from a `[project]` table.
///
/// Entries of `dependencies` become required runtime dependencies;
/// entries of every `optional-dependencies` group become optional
/// development dependencies.
fn parse_pep621_dependencies(project: &Value) -> Vec<PackageDependency> {
    let mut deps = Vec::new();
    if let Some(dep_array) = project.get("dependencies").and_then(Value::as_array) {
        collect_pep621_specs(dep_array, false, "dependencies", &mut deps);
    }
    if let Some(optional_deps) = project
        .get("optional-dependencies")
        .and_then(Value::as_table)
    {
        for (_group, group_deps) in optional_deps {
            if let Some(dep_array) = group_deps.as_array() {
                collect_pep621_specs(dep_array, true, "optional-dependencies", &mut deps);
            }
        }
    }
    deps
}
/// Parses each string entry of `dep_array` as a PEP 508 specifier and
/// appends it to `deps`; non-string or unparseable entries are skipped.
/// Optional entries are recorded as development dependencies.
fn collect_pep621_specs(
    dep_array: &[Value],
    optional: bool,
    source_field: &str,
    deps: &mut Vec<PackageDependency>,
) {
    for dep in dep_array {
        if let Some(spec) = dep.as_str()
            && let Some(name) = parse_dependency_name(spec)
        {
            deps.push(PackageDependency {
                name: normalize_python_package_name(&name),
                kind: if optional {
                    DependencyKind::Development
                } else {
                    DependencyKind::Runtime
                },
                version_constraint: extract_version_constraint(spec, &name),
                optional,
                source_field: Some(source_field.to_string()),
            });
        }
    }
}
/// Extracts Poetry dependencies from a `[tool.poetry]` table.
///
/// The implicit `python` requirement is skipped. Dependencies declared in
/// `[tool.poetry.group.*]` tables are recorded as development
/// dependencies.
fn parse_poetry_dependencies(poetry: &Value) -> Vec<PackageDependency> {
    let mut deps = Vec::new();
    if let Some(dep_table) = poetry.get("dependencies").and_then(Value::as_table) {
        for (name, value) in dep_table {
            if name == "python" {
                continue;
            }
            deps.push(PackageDependency {
                name: normalize_python_package_name(name),
                kind: DependencyKind::Runtime,
                version_constraint: poetry_version_constraint(value),
                optional: false,
                source_field: Some("dependencies".to_string()),
            });
        }
    }
    if let Some(groups) = poetry.get("group").and_then(Value::as_table) {
        for (_group_name, group) in groups {
            if let Some(group_deps) = group
                .as_table()
                .and_then(|table| table.get("dependencies"))
                .and_then(Value::as_table)
            {
                for (name, value) in group_deps {
                    deps.push(PackageDependency {
                        name: normalize_python_package_name(name),
                        kind: DependencyKind::Development,
                        version_constraint: poetry_version_constraint(value),
                        optional: false,
                        source_field: Some("group.dependencies".to_string()),
                    });
                }
            }
        }
    }
    deps
}
/// Reads the version constraint from a Poetry dependency value, which may
/// be a bare string (`"^1.0"`) or a table with a `version` key.
fn poetry_version_constraint(value: &Value) -> Option<String> {
    match value {
        Value::String(version) => Some(version.clone()),
        Value::Table(table) => table
            .get("version")
            .and_then(Value::as_str)
            .map(ToString::to_string),
        _ => None,
    }
}
/// Returns the version constraint portion of a PEP 508 specifier, given
/// the already-parsed `name` prefix.
///
/// A bracketed extras segment and any `;`-separated environment marker
/// are stripped; `None` means the spec carries no constraint.
fn extract_version_constraint(spec: &str, name: &str) -> Option<String> {
    let rest = spec.get(name.len()..)?;
    // `split_once` replaces the previous `&rest[end + 1..]` slice, which
    // violated the crate-level #![forbid(clippy::indexing_slicing)].
    let after_extras = if rest.starts_with('[') {
        rest.split_once(']').map_or(rest, |(_, after)| after)
    } else {
        rest
    };
    let constraint = after_extras.split(';').next().unwrap_or("").trim();
    if constraint.is_empty() {
        None
    } else {
        Some(constraint.to_string())
    }
}
/// Best-effort conversion of a PEP 440 version string to semver.
///
/// Strings that already parse as semver are returned directly; shorter
/// release forms (`"1"`, `"1.2"`, both valid under PEP 440) are padded
/// with zero segments and retried. Anything semver cannot express
/// (epochs, local/dev suffixes, four-part versions) yields `None`.
fn parse_pep440_as_semver(version: &str) -> Option<Version> {
    if let Ok(parsed) = Version::parse(version) {
        return Some(parsed);
    }
    match version.split('.').count() {
        // Previously only the two-segment case was padded; a bare major
        // version ("1") was rejected even though PEP 440 allows it.
        1 => Version::parse(&format!("{version}.0.0")).ok(),
        2 => Version::parse(&format!("{version}.0")).ok(),
        _ => None,
    }
}
/// Recursively collects every `pyproject.toml` under `root`, pruning
/// directories rejected by [`should_descend`]; unreadable entries are
/// skipped. Paths are returned normalized.
fn find_all_pyproject_files(root: &Path) -> Vec<PathBuf> {
    let mut manifests = Vec::new();
    let walker = WalkDir::new(root)
        .into_iter()
        .filter_entry(should_descend)
        .filter_map(Result::ok);
    for entry in walker {
        if entry.file_name() == PYPROJECT_FILE {
            manifests.push(normalize_path(&entry.into_path()));
        }
    }
    manifests
}
/// Directory filter for the pyproject scan: rejects VCS metadata, virtual
/// environments, caches, and build output so the walk skips them.
fn should_descend(entry: &DirEntry) -> bool {
    let file_name = entry.file_name().to_string_lossy();
    // `*.egg-info` has a variable prefix, so it needs a suffix check — the
    // old literal `"*.egg-info"` arm in `matches!` could never match.
    if file_name.ends_with(".egg-info") {
        return false;
    }
    !matches!(
        file_name.as_ref(),
        ".git"
            | ".venv"
            | "venv"
            | "__pycache__"
            | ".mypy_cache"
            | ".ruff_cache"
            | ".pytest_cache"
            | "node_modules"
            | "target"
            | ".devenv"
            | "book"
            | ".tox"
            | "dist"
            | "build"
            | ".eggs"
    )
}
/// Validates a configured versioned file for the Python ecosystem.
///
/// Currently a no-op: every path is accepted and custom fields are not
/// checked; all parameters are unused.
///
/// # Errors
///
/// Never fails at present — the `Result` keeps the signature aligned
/// with the [`EcosystemAdapter`] trait.
pub fn validate_versioned_file(
    _full_path: &Path,
    _display_path: &str,
    _custom_fields: Option<&[String]>,
) -> MonochangeResult<()> {
    Ok(())
}
/// Default operator prepended to dependency versions when rewriting
/// specifiers (PEP 440 `>=`).
#[must_use]
// `const fn` for consistency with `adapter()`; backward compatible.
pub const fn default_dependency_version_prefix() -> &'static str {
    ">="
}
/// Manifest fields scanned for dependency updates by default.
#[must_use]
// `const fn` for consistency with `adapter()`; backward compatible.
pub const fn default_dependency_fields() -> &'static [&'static str] {
    &["dependencies"]
}
#[cfg(test)]
#[path = "__tests__/lib_tests.rs"]
mod tests;