use breezyshim::branch::{Branch, PyBranch};
use breezyshim::commit::NullCommitReporter;
use breezyshim::debian::error::Error as BrzDebianError;
use breezyshim::debian::merge_upstream::{
do_import, get_existing_imported_upstream_revids, get_tarballs,
};
use breezyshim::debian::upstream::{
get_pristine_tar_source, upstream_version_add_revision,
PristineTarSource,
UpstreamBranchSource,
UpstreamSource,
};
use breezyshim::debian::{TarballKind, VersionKind, DEFAULT_ORIG_DIR};
use breezyshim::error::Error as BrzError;
use breezyshim::tree::MutableTree;
use breezyshim::tree::{PyTree, Tree};
use breezyshim::workingtree::{GenericWorkingTree, PyWorkingTree, WorkingTree};
use breezyshim::RevisionId;
use debian_analyzer::versions::debianize_upstream_version;
use debian_analyzer::wnpp::{BugId, BugKind};
use debian_analyzer::Certainty;
use debversion::Version;
use ognibuild::dependencies::debian::valid_debian_package_name;
use ognibuild::dependencies::debian::DebianDependency;
use std::ffi::OsString;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Command;
use upstream_ontologist::{get_upstream_info, ProviderError, UpstreamMetadata};
pub mod fixer;
pub mod names;
pub mod processors;
pub mod simple_apt_repo;
pub mod vcs;
pub fn default_debianize_cache_dir() -> std::io::Result<std::path::PathBuf> {
xdg::BaseDirectories::with_prefix("debianize")?.create_cache_directory("")
}
pub fn default_create_dist(
tree: &GenericWorkingTree,
package: &str,
version: &str,
session: &mut dyn ognibuild::session::Session,
target_dir: &Path,
subpath: &Path,
) -> Result<OsString, Error> {
log::info!(
"Creating distribution tarball for {} version {} using ognibuild",
package,
version
);
struct SimpleLogManager;
impl ognibuild::logs::LogManager for SimpleLogManager {
fn start(&mut self) -> std::io::Result<()> {
Ok(())
}
}
let mut log_manager = SimpleLogManager;
match ognibuild::dist::create_dist(
session,
tree,
target_dir,
Some(false), &mut log_manager,
Some(version),
subpath,
Some(package), ) {
Ok(filename) => {
log::info!("Successfully created distribution tarball: {:?}", filename);
Ok(filename)
}
Err(e) => {
log::warn!(
"ognibuild dist creation failed: {}, falling back to simple export",
e
);
create_simple_tarball(tree, package, version, target_dir, subpath)
}
}
}
fn create_simple_tarball(
tree: &GenericWorkingTree,
package: &str,
version: &str,
target_dir: &Path,
subpath: &Path,
) -> Result<OsString, Error> {
use std::process::Command;
log::info!(
"Creating simple export tarball for {} version {}",
package,
version
);
let temp_dir = tempfile::tempdir()
.map_err(|e| Error::Other(format!("Failed to create temp directory: {}", e)))?;
let export_dir = temp_dir.path().join(format!("{}-{}", package, version));
std::fs::create_dir_all(&export_dir)
.map_err(|e| Error::Other(format!("Failed to create export directory: {}", e)))?;
let tree_path = tree.basedir().join(subpath);
let tarball_name = format!("{}_{}.orig.tar.gz", package, version);
let tarball_path = target_dir.join(&tarball_name);
if let Some(parent) = tarball_path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| Error::Other(format!("Failed to create target directory: {}", e)))?;
}
let tar_output = Command::new("tar")
.args([
"-czf",
tarball_path.to_str().unwrap(),
"--exclude=.git",
"--exclude=.bzr",
"--exclude=.hg",
"--exclude=.svn",
"--exclude=_darcs",
"--transform",
&format!("s,^\\./,{}-{}/,", package, version),
"-C",
tree_path.to_str().unwrap(),
".",
])
.output()
.map_err(|e| Error::Other(format!("Failed to run tar command: {}", e)))?;
if !tar_output.status.success() {
return Err(Error::Other(format!(
"tar command failed: {}",
String::from_utf8_lossy(&tar_output.stderr)
)));
}
log::info!("Created simple export tarball: {}", tarball_path.display());
Ok(OsString::from(tarball_path))
}
pub fn write_changelog_template(
path: &std::path::Path,
source_name: &str,
version: &Version,
author: Option<(String, String)>,
wnpp_bugs: Vec<(BugId, BugKind)>,
) -> Result<(), std::io::Error> {
let author = author.unwrap_or_else(|| debian_changelog::get_maintainer().unwrap());
let closes = if !wnpp_bugs.is_empty() {
format!(
" Closes: {}",
wnpp_bugs
.iter()
.map(|(n, _k)| format!("#{}", n))
.collect::<Vec<_>>()
.join(", ")
)
} else {
"".to_string()
};
let mut cl = debian_changelog::ChangeLog::new();
cl.new_entry()
.package(source_name.to_string())
.version(version.clone())
.distribution("UNRELEASED".to_string())
.urgency(debian_changelog::Urgency::Low)
.change_line(format!(" * Initial release.{}", closes))
.maintainer(author)
.finish();
let buf = cl.to_string();
std::fs::write(path, buf)?;
Ok(())
}
pub fn use_packaging_branch(wt: &GenericWorkingTree, branch_name: &str) -> Result<(), BrzError> {
let last_revision = wt.last_revision()?;
let target_branch = match wt.controldir().open_branch(Some(branch_name)) {
Ok(b) => b,
Err(BrzError::NotBranchError { .. }) => wt.controldir().create_branch(Some(branch_name))?,
Err(e) => return Err(e),
};
target_branch.generate_revision_history(&last_revision)?;
log::info!("Switching to packaging branch {}.", branch_name);
wt.controldir()
.set_branch_reference(target_branch.as_ref(), Some(""))?;
pyo3::Python::attach(|py| -> pyo3::PyResult<()> {
use pyo3::IntoPyObject;
let wt_py = wt.to_object(py);
let branch_py = target_branch.into_pyobject(py)?;
wt_py.setattr(py, "_branch", branch_py)?;
Ok(())
})
.unwrap();
Ok(())
}
pub fn import_upstream_version_from_dist(
wt: &dyn PyWorkingTree,
subpath: &std::path::Path,
upstream_source: &UpstreamBranchSource,
source_name: &str,
upstream_version: &str,
files_excluded: Option<&[&std::path::Path]>,
) -> Result<
(
HashMap<TarballKind, (RevisionId, PathBuf)>,
HashMap<TarballKind, String>,
String,
),
BrzDebianError,
> {
let orig_dir = Path::new(DEFAULT_ORIG_DIR).canonicalize().unwrap();
let mut tag_names = HashMap::new();
let td = tempfile::tempdir().unwrap();
let locations = upstream_source.fetch_tarballs(
Some(source_name),
upstream_version,
td.path(),
Some(&[TarballKind::Orig]),
)?;
let tarball_filenames = match get_tarballs(
&orig_dir,
wt,
source_name,
upstream_version,
locations
.iter()
.map(|x| x.as_ref())
.collect::<Vec<_>>()
.as_slice(),
) {
Ok(filenames) => filenames,
Err(BrzDebianError::BrzError(BrzError::FileExists(path, _))) => {
log::warn!("Tarball {} exists, reusing existing file.", path.display());
vec![orig_dir.join(path)]
}
Err(e) => return Err(e),
};
let upstream_revisions =
upstream_source.version_as_revisions(Some(source_name), upstream_version, None)?;
log::info!(
"Importing upstream version {} from distribution tarball",
upstream_version
);
let imported_revids = match do_import(
wt,
subpath,
tarball_filenames
.iter()
.map(|x| x.as_path())
.collect::<Vec<_>>()
.as_slice(),
source_name,
upstream_version,
None,
upstream_source.upstream_branch().as_ref(),
upstream_revisions,
None,
false,
false,
None,
files_excluded,
) {
Ok(revids) => {
log::info!(
"Successfully imported upstream version {}",
upstream_version
);
revids
}
Err(BrzDebianError::UpstreamAlreadyImported(version)) => {
log::warn!(
"Upstream release {} already imported, reusing existing import",
version
);
get_existing_imported_upstream_revids(upstream_source, source_name, upstream_version)?
}
Err(e) => {
log::error!(
"Failed to import upstream version {}: {}",
upstream_version,
e
);
return Err(e);
}
};
let mut pristine_revids = HashMap::new();
for (component, tag_name, revid, _pristine_tar_imported, subpath) in imported_revids {
pristine_revids.insert(component.clone(), (revid, subpath));
tag_names.insert(component, tag_name);
}
std::mem::drop(td);
let upstream_branch_name = "upstream";
match wt.controldir().create_branch(Some(upstream_branch_name)) {
Ok(branch) => {
branch
.generate_revision_history(&pristine_revids.get(&TarballKind::Orig).unwrap().0)?;
log::info!("Created upstream branch.");
}
Err(BrzError::AlreadyBranch(..)) => {
log::info!("Upstream branch already exists; not creating.");
wt.controldir().open_branch(Some(upstream_branch_name))?;
}
Err(e) => return Err(e.into()),
}
Ok((pristine_revids, tag_names, upstream_branch_name.to_string()))
}
pub fn import_upstream_dist(
pristine_tar_source: &PristineTarSource,
wt: &dyn PyWorkingTree,
upstream_source: &UpstreamBranchSource,
subpath: &Path,
source_name: &str,
upstream_version: &UpstreamVersion,
files_excluded: Option<&[&std::path::Path]>,
) -> Result<(RevisionId, Option<String>, HashMap<TarballKind, String>), BrzDebianError> {
let (mut pristine_revids, tag_names, upstream_branch_name) = if pristine_tar_source
.has_version(Some(source_name), &upstream_version.version, None, false)?
{
log::warn!(
"Upstream version {}/{} already imported.",
source_name,
upstream_version.version,
);
let pristine_revids = pristine_tar_source.version_as_revisions(
Some(source_name),
&upstream_version.version,
None,
)?;
let upstream_branch_name = None;
let tag_names = HashMap::new();
(pristine_revids, tag_names, upstream_branch_name)
} else {
let (pristine_revids, tag_names, upstream_branch_name) = import_upstream_version_from_dist(
wt,
subpath,
upstream_source,
source_name,
&upstream_version.version,
files_excluded,
)?;
(pristine_revids, tag_names, Some(upstream_branch_name))
};
let orig_revid = pristine_revids.remove(&TarballKind::Orig).unwrap().0;
Ok((orig_revid, upstream_branch_name, tag_names))
}
pub fn import_upstream_with_pristine_tar(
wt: &dyn PyWorkingTree,
subpath: &Path,
upstream_source: &UpstreamBranchSource,
source_name: &str,
upstream_version: &UpstreamVersion,
files_excluded: Option<&[&std::path::Path]>,
) -> Result<(RevisionId, Option<String>, HashMap<TarballKind, String>), BrzDebianError> {
log::info!(
"Importing upstream version {} with pristine-tar support",
upstream_version.version
);
if let Some(excluded) = files_excluded {
log::info!(
"Excluding {} files/directories during import",
excluded.len()
);
}
let pristine_tar_source = match get_pristine_tar_source(wt, &wt.branch()) {
Ok(pts) => pts,
Err(e) => {
log::warn!(
"Failed to get pristine-tar source: {:?}. Falling back to basic import.",
e
);
return Err(BrzDebianError::BrzError(BrzError::UnknownFormat(
"Failed to get pristine-tar source".to_string(),
)));
}
};
import_upstream_dist(
&pristine_tar_source,
wt,
upstream_source,
subpath,
source_name,
upstream_version,
files_excluded,
)
}
pub fn create_kickstart_from_dist<'a>(
upstream_source: &'a UpstreamBranchSource,
source_name: String,
upstream_version: UpstreamVersion,
files_excluded: Option<Vec<std::path::PathBuf>>,
) -> impl Fn(&dyn PyWorkingTree, &Path) -> Result<(), Error> + 'a {
move |wt: &dyn PyWorkingTree, subpath: &Path| {
log::info!(
"Kickstarting from dist tarball. Using upstream version {}",
upstream_version.version
);
let files_excluded_refs: Option<Vec<&std::path::Path>> = files_excluded
.as_ref()
.map(|paths| paths.iter().map(|p| p.as_path()).collect());
let files_excluded_slice: Option<&[&std::path::Path]> = files_excluded_refs.as_deref();
let (upstream_dist_revid, _upstream_branch_name, _tag_names) =
import_upstream_with_pristine_tar(
wt,
subpath,
upstream_source,
&source_name,
&upstream_version,
files_excluded_slice,
)
.map_err(|e| Error::Other(format!("Failed to import upstream: {}", e)))?;
if wt.branch().last_revision() != upstream_dist_revid {
wt.pull(
upstream_source.upstream_branch().as_ref(),
Some(true), Some(&upstream_dist_revid),
Some(false), )
.map_err(Error::BrzError)?;
log::info!(
"Updated working tree to upstream revision {}",
upstream_dist_revid
);
}
let debian_path = subpath.join("debian");
let source_path = debian_path.join("source");
if !wt.has_filename(&source_path) {
wt.mkdir(&source_path).map_err(Error::BrzError)?;
wt.add(&[&source_path]).map_err(Error::BrzError)?;
}
let format_file = source_path.join("format");
wt.put_file_bytes_non_atomic(&format_file, b"3.0 (quilt)\n")
.map_err(Error::BrzError)?;
log::info!("Created debian/source/format file");
Ok(())
}
}
pub fn last_resort_upstream_version(
upstream_source: &UpstreamBranchSource,
upstream_revision: &RevisionId,
) -> Result<String, BrzDebianError> {
let upstream_version = upstream_version_add_revision(
upstream_source.upstream_branch().as_ref(),
"0",
upstream_revision,
Some("+"),
)?;
log::warn!(
"Unable to determine upstream version, using {}.",
upstream_version
);
Ok(upstream_version)
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::fs;
use tempfile::tempdir;
#[test]
fn test_write_changelog_template() {
let td = tempdir().unwrap();
let path = td.path().join("changelog");
let source_name = "test-package";
let version = Version {
epoch: None,
upstream_version: "1.0".to_string(),
debian_revision: Some("1".to_string()),
};
let author = Some(("Test Author".to_string(), "test@example.com".to_string()));
let wnpp_bugs = vec![(123456, BugKind::ITP)];
write_changelog_template(&path, source_name, &version, author, wnpp_bugs).unwrap();
let content = fs::read_to_string(&path).unwrap();
assert!(content.contains("test-package (1.0-1) UNRELEASED"));
assert!(content.contains("* Initial release. Closes: #123456"));
assert!(content.contains("Test Author <test@example.com>"));
}
#[test]
fn test_write_changelog_template_no_bugs() {
let td = tempdir().unwrap();
let path = td.path().join("changelog");
let source_name = "test-package";
let version = Version {
epoch: None,
upstream_version: "1.0".to_string(),
debian_revision: Some("1".to_string()),
};
let author = Some(("Test Author".to_string(), "test@example.com".to_string()));
let wnpp_bugs = vec![];
write_changelog_template(&path, source_name, &version, author, wnpp_bugs).unwrap();
let content = fs::read_to_string(&path).unwrap();
assert!(content.contains("test-package (1.0-1) UNRELEASED"));
assert!(content.contains("* Initial release."));
assert!(!content.contains("Closes:"));
assert!(content.contains("Test Author <test@example.com>"));
}
#[test]
fn test_default_debianize_cache_dir() {
let _result = default_debianize_cache_dir();
}
#[test]
fn test_upstream_version_from() {
let version_str = "1.2.3".to_string();
let upstream_version = UpstreamVersion::from(version_str.clone());
assert_eq!(upstream_version.version, "1.2.3");
assert_eq!(
upstream_version.mangled_version,
debianize_upstream_version(&version_str)
);
}
#[test]
fn test_debianize_preferences_default() {
let prefs = DebianizePreferences::default();
assert_eq!(prefs.use_inotify, None);
assert_eq!(prefs.diligence, 0);
assert!(!prefs.trust);
assert!(!prefs.check);
assert!(prefs.net_access);
assert!(!prefs.force_subprocess);
assert!(!prefs.force_new_directory);
assert_eq!(prefs.compat_release, None);
assert_eq!(prefs.minimum_certainty, Certainty::Confident);
assert!(prefs.consult_external_directory);
assert!(!prefs.verbose);
match prefs.session {
#[cfg(target_os = "linux")]
SessionPreferences::Unshare(ref path) if path.as_os_str().is_empty() => {}
#[cfg(not(target_os = "linux"))]
SessionPreferences::Plain => {}
_ => panic!("Expected default isolated session (Unshare on Linux, Plain otherwise)"),
}
assert!(prefs.create_dist.is_none());
assert!(prefs.committer.is_none());
assert_eq!(prefs.upstream_version_kind, VersionKind::Auto);
assert_eq!(prefs.debian_revision, "1".to_string());
assert!(prefs.team.is_none());
}
#[test]
fn test_debianize_preferences_into_fixer_preferences() {
let debianize_prefs = DebianizePreferences {
use_inotify: Some(true),
diligence: 2,
trust: true,
check: true,
net_access: false,
force_subprocess: true,
force_new_directory: true,
compat_release: Some("stable".to_string()),
minimum_certainty: Certainty::Certain,
consult_external_directory: false,
verbose: true,
session: SessionPreferences::Plain,
create_dist: None,
committer: None,
upstream_version_kind: VersionKind::Release,
debian_revision: "2".to_string(),
team: None,
author: None,
compat_level: Some(13),
check_wnpp: true,
run_fixers: true,
};
let fixer_prefs: lintian_brush::FixerPreferences = debianize_prefs.into();
assert_eq!(fixer_prefs.diligence, Some(2));
assert_eq!(fixer_prefs.net_access, Some(false));
assert_eq!(fixer_prefs.compat_release, Some("stable".to_string()));
assert_eq!(fixer_prefs.minimum_certainty, Some(Certainty::Certain));
assert_eq!(fixer_prefs.trust_package, Some(true));
assert_eq!(fixer_prefs.opinionated, Some(true));
assert_eq!(fixer_prefs.allow_reformatting, Some(true));
}
#[test]
fn test_error_display() {
let error = Error::NoVcsLocation;
assert_eq!(format!("{}", error), "No VCS location found.");
let error = Error::SourcePackageNameInvalid("invalid:name".to_string());
assert_eq!(
format!("{}", error),
"Invalid source package name: invalid:name."
);
let error = Error::IoError(std::io::Error::new(
std::io::ErrorKind::NotFound,
"File not found",
));
assert_eq!(format!("{}", error), "I/O error: File not found");
let error = Error::NoUpstreamReleases(Some("test-package".to_string()));
assert_eq!(
format!("{}", error),
"No upstream releases found for test-package."
);
let error = Error::NoUpstreamReleases(None);
assert_eq!(
format!("{}", error),
"No upstream releases found for unknown."
);
}
#[test]
fn test_session_preferences_create_session() {
let plain_pref = SessionPreferences::Plain;
let session = plain_pref.create_session().unwrap();
let _pwd = session.pwd();
if std::process::Command::new("which")
.arg("schroot")
.output()
.map(|o| o.status.success())
.unwrap_or(false)
{
let schroot_pref = SessionPreferences::Schroot("test-chroot".to_string());
let result = schroot_pref.create_session();
assert!(result.is_err());
}
let temp_dir = tempdir().unwrap();
let tarball_path = temp_dir.path().join("test.tar.gz");
std::fs::write(&tarball_path, b"dummy tarball content").unwrap();
let unshare_pref = SessionPreferences::Unshare(tarball_path.clone());
let result = unshare_pref.create_session();
assert!(result.is_err());
}
#[test]
fn test_detect_buildsystem_name() {
use std::fs;
let temp_dir = tempdir().unwrap();
let temp_path = temp_dir.path();
fs::write(temp_path.join("setup.py"), "#!/usr/bin/env python").unwrap();
let expected_buildfiles = [
("setup.py", "setup.py"),
("pyproject.toml", "setup.py"),
("package.json", "npm"),
("pom.xml", "maven"),
("dist.ini", "dist-zilla"),
("Makefile.PL", "makefile.pl"),
("Build.PL", "perl-build-tiny"),
("Cargo.toml", "cargo"),
("go.mod", "golang"),
("DESCRIPTION", "R"),
("DESCRIPTION.in", "octave"),
("Makefile", "make"),
("CMakeLists.txt", "cmake"),
("configure.ac", "autotools"),
("configure.in", "autotools"),
];
assert_eq!(expected_buildfiles.len(), 15);
assert_eq!(expected_buildfiles[0], ("setup.py", "setup.py"));
assert_eq!(expected_buildfiles[2], ("package.json", "npm"));
assert_eq!(expected_buildfiles[7], ("Cargo.toml", "cargo"));
}
#[test]
fn test_determine_browser_url() {
let github_https = url::Url::parse("https://github.com/user/repo.git").unwrap();
let browser_url = determine_browser_url("git", &github_https);
assert_eq!(
browser_url,
Some("https://github.com/user/repo".to_string())
);
let github_git = url::Url::parse("git://github.com/user/repo.git").unwrap();
let browser_url = determine_browser_url("git", &github_git);
assert_eq!(
browser_url,
Some("https://github.com/user/repo".to_string())
);
let github_no_git = url::Url::parse("https://github.com/user/repo").unwrap();
let browser_url = determine_browser_url("git", &github_no_git);
assert_eq!(
browser_url,
Some("https://github.com/user/repo".to_string())
);
let gitlab_url = url::Url::parse("https://gitlab.com/user/repo.git").unwrap();
let browser_url = determine_browser_url("git", &gitlab_url);
assert_eq!(
browser_url,
Some("https://gitlab.com/user/repo".to_string())
);
let salsa_url = url::Url::parse("https://salsa.debian.org/user/repo.git").unwrap();
let browser_url = determine_browser_url("git", &salsa_url);
assert_eq!(
browser_url,
Some("https://salsa.debian.org/user/repo".to_string())
);
let unknown_url = url::Url::parse("https://example.com/user/repo.git").unwrap();
let browser_url = determine_browser_url("git", &unknown_url);
assert_eq!(browser_url, None);
let svn_url = url::Url::parse("svn://example.com/repo").unwrap();
let browser_url = determine_browser_url("svn", &svn_url);
assert_eq!(browser_url, None);
}
#[test]
fn test_unsplit_vcs_url() {
let url = url::Url::parse("https://github.com/user/repo").unwrap();
let result = unsplit_vcs_url("git", &url);
assert_eq!(result, "https://github.com/user/repo");
let url = url::Url::parse("https://github.com/user/repo/tree/main").unwrap();
let result = unsplit_vcs_url("git", &url);
assert_eq!(result, "https://github.com/user/repo/tree/main");
}
#[test]
fn test_get_maintainer() {
let (name, email) = get_maintainer();
assert!(!name.is_empty());
assert!(!email.is_empty());
assert!(email.contains("@"));
}
#[test]
fn test_find_wnpp_bugs_for_package() {
let result = find_wnpp_bugs_for_package("test-package", Some("upstream-name"));
assert!(result.is_ok() || result.is_err());
let result2 = find_wnpp_bugs_for_package("test-package", None);
assert!(result2.is_ok() || result2.is_err()); }
#[test]
fn test_import_metadata_from_path() {
let temp_dir = tempdir().unwrap();
let temp_path = temp_dir.path();
let pyproject_content = r#"
[project]
name = "test-package"
version = "1.0.0"
description = "A test package"
"#;
std::fs::write(temp_path.join("pyproject.toml"), pyproject_content).unwrap();
let _metadata = UpstreamMetadata::new();
let prefs = DebianizePreferences::default();
assert!(!prefs.trust);
assert!(prefs.net_access);
assert!(!prefs.check);
}
#[test]
fn test_write_initial_changelog() {
let temp_dir = tempdir().unwrap();
let debian_path = temp_dir.path().join("debian");
std::fs::create_dir_all(&debian_path).unwrap();
let changelog_path = debian_path.join("changelog");
let source_name = "test-package";
let version = UpstreamVersion::from("1.0.0".to_string());
let author = ("Test Author".to_string(), "test@example.com".to_string());
let wnpp_bugs = vec![(123456, BugKind::ITP)];
write_changelog_template(
&changelog_path,
source_name,
&version.as_debian_version(),
Some(author),
wnpp_bugs,
)
.unwrap();
let content = std::fs::read_to_string(&changelog_path).unwrap();
assert!(content.contains("test-package"));
assert!(content.contains("1.0.0-1"));
assert!(content.contains("Test Author"));
assert!(content.contains("Closes: #123456"));
}
#[test]
fn test_upstream_version_as_debian_version() {
let upstream_version = UpstreamVersion::from("1.0.0".to_string());
let debian_version = upstream_version.as_debian_version();
assert_eq!(debian_version.upstream_version, "1.0.0");
assert_eq!(debian_version.debian_revision, Some("1".to_string()));
assert_eq!(debian_version.epoch, None);
}
#[test]
fn test_upstream_version_mangled() {
let upstream_version = UpstreamVersion::from("1.0.0-beta1".to_string());
assert_eq!(upstream_version.version, "1.0.0-beta1");
assert!(!upstream_version.mangled_version.is_empty());
let upstream_version2 = UpstreamVersion::from("1.0.0~beta1".to_string());
assert_eq!(upstream_version2.version, "1.0.0~beta1");
assert!(!upstream_version2.mangled_version.is_empty());
}
#[test]
fn test_debianize_result_structure() {
let result = DebianizeResult {
vcs_url: Some(url::Url::parse("https://github.com/user/repo").unwrap()),
wnpp_bugs: vec![(123456, BugKind::ITP)],
upstream_version: Some("1.0.0".to_string()),
tag_names: HashMap::new(),
upstream_branch_name: Some("upstream".to_string()),
};
assert!(result.vcs_url.is_some());
assert_eq!(result.wnpp_bugs.len(), 1);
assert_eq!(result.wnpp_bugs[0].0, 123456);
assert_eq!(result.wnpp_bugs[0].1, BugKind::ITP);
assert_eq!(result.upstream_version, Some("1.0.0".to_string()));
assert_eq!(result.tag_names.len(), 0);
assert_eq!(result.upstream_branch_name, Some("upstream".to_string()));
}
#[test]
fn test_version_kind_default() {
let prefs = DebianizePreferences::default();
assert_eq!(prefs.upstream_version_kind, VersionKind::Auto);
let prefs = DebianizePreferences {
upstream_version_kind: VersionKind::Release,
..Default::default()
};
assert_eq!(prefs.upstream_version_kind, VersionKind::Release);
}
#[test]
fn test_error_variants() {
let errors = vec![
Error::NoVcsLocation,
Error::SourceNameUnknown(Some("test".to_string())),
Error::SourceNameUnknown(None),
Error::SourcePackageNameInvalid("invalid".to_string()),
Error::MissingUpstreamInfo("test".to_string()),
Error::NoUpstreamReleases(Some("test".to_string())),
Error::NoUpstreamReleases(None),
Error::Other("test".to_string()),
];
for error in errors {
let _display = format!("{}", error);
let _debug = format!("{:?}", error);
}
}
#[test]
fn test_session_preferences_all_variants() {
let plain = SessionPreferences::Plain;
let schroot = SessionPreferences::Schroot("test".to_string());
let temp_dir = tempdir().unwrap();
let tarball_path = temp_dir.path().join("test.tar.gz");
std::fs::write(&tarball_path, b"test").unwrap();
let unshare = SessionPreferences::Unshare(tarball_path);
assert_ne!(
std::mem::discriminant(&plain),
std::mem::discriminant(&schroot)
);
assert_ne!(
std::mem::discriminant(&plain),
std::mem::discriminant(&unshare)
);
assert_ne!(
std::mem::discriminant(&schroot),
std::mem::discriminant(&unshare)
);
}
#[test]
fn test_bug_kind_usage() {
let itp_bug = BugKind::ITP;
let rfp_bug = BugKind::RFP;
let bugs = [(123456, itp_bug), (789012, rfp_bug)];
assert_eq!(bugs.len(), 2);
assert_ne!(
std::mem::discriminant(&BugKind::ITP),
std::mem::discriminant(&BugKind::RFP)
);
}
#[test]
fn test_upstream_metadata_basic_usage() {
let metadata = UpstreamMetadata::new();
assert_eq!(metadata.name(), None);
assert_eq!(metadata.summary(), None);
assert_eq!(metadata.description(), None);
assert_eq!(metadata.homepage(), None);
assert_eq!(metadata.repository(), None);
assert_eq!(metadata.archive(), None);
assert_eq!(metadata.license(), None);
assert_eq!(metadata.maintainer(), None);
assert_eq!(metadata.author(), None);
assert_eq!(metadata.version(), None);
}
#[test]
fn test_compat_level_handling() {
let mut prefs = DebianizePreferences::default();
assert_eq!(prefs.compat_level, None);
prefs.compat_level = Some(13);
assert_eq!(prefs.compat_level, Some(13));
prefs.compat_level = Some(14);
assert_eq!(prefs.compat_level, Some(14));
}
#[test]
fn test_run_fixers_flag() {
let mut prefs = DebianizePreferences::default();
assert!(prefs.run_fixers);
prefs.run_fixers = false;
assert!(!prefs.run_fixers);
prefs.run_fixers = true;
assert!(prefs.run_fixers);
}
#[test]
fn test_check_wnpp_flag() {
let mut prefs = DebianizePreferences::default();
assert!(prefs.check_wnpp);
prefs.check_wnpp = false;
assert!(!prefs.check_wnpp);
prefs.check_wnpp = true;
assert!(prefs.check_wnpp);
}
}
#[derive(Debug, Clone)]
pub enum SessionPreferences {
Plain,
Schroot(String),
Unshare(PathBuf),
}
impl SessionPreferences {
pub fn default_isolated() -> Self {
#[cfg(target_os = "linux")]
{
SessionPreferences::Unshare(PathBuf::new()) }
#[cfg(not(target_os = "linux"))]
{
SessionPreferences::Plain
}
}
pub fn acquire(&self) -> Result<Box<dyn ognibuild::session::Session>, Error> {
self.create_session()
}
pub fn create_session(&self) -> Result<Box<dyn ognibuild::session::Session>, Error> {
match self {
SessionPreferences::Plain => {
Ok(Box::new(ognibuild::session::plain::PlainSession::new()))
}
SessionPreferences::Schroot(name) => {
#[cfg(target_os = "linux")]
{
ognibuild::session::schroot::SchrootSession::new(name, None)
.map(Box::new)
.map(|b| b as Box<dyn ognibuild::session::Session>)
.map_err(|e| {
Error::Other(format!("Failed to create schroot session: {}", e))
})
}
#[cfg(not(target_os = "linux"))]
{
Err(Error::Other(
"Schroot is only available on Linux".to_string(),
))
}
}
SessionPreferences::Unshare(path) => {
#[cfg(target_os = "linux")]
{
if path.as_os_str().is_empty() {
log::info!("Creating unshare session from cached Debian sid image");
ognibuild::session::unshare::create_debian_session_for_testing(
"sid",
false )
.map(Box::new)
.map(|b| b as Box<dyn ognibuild::session::Session>)
.map_err(|e| {
Error::Other(format!(
"Failed to create unshare session from cached image: {}. \
Ensure the image is cached at ~/.cache/ognibuild/images/debian-sid-*.tar.gz",
e
))
})
} else {
ognibuild::session::unshare::UnshareSession::from_tarball(path)
.map(Box::new)
.map(|b| b as Box<dyn ognibuild::session::Session>)
.map_err(|e| {
Error::Other(format!("Failed to create unshare session: {}", e))
})
}
}
#[cfg(not(target_os = "linux"))]
{
Err(Error::Other(
"Unshare is only available on Linux".to_string(),
))
}
}
}
}
}
pub struct DebianizePreferences {
pub use_inotify: Option<bool>,
pub diligence: u8,
pub trust: bool,
pub check: bool,
pub net_access: bool,
pub force_subprocess: bool,
pub force_new_directory: bool,
pub compat_release: Option<String>,
pub minimum_certainty: Certainty,
pub consult_external_directory: bool,
pub verbose: bool,
pub session: SessionPreferences,
pub create_dist: Option<
Box<
dyn for<'a, 'b, 'c, 'd, 'e> Fn(
&'a dyn PyTree,
&'b str,
&'c Version,
&'d Path,
&'e Path,
)
-> Result<bool, breezyshim::debian::error::Error>,
>,
>,
pub committer: Option<String>,
pub upstream_version_kind: VersionKind,
pub debian_revision: String,
pub team: Option<String>,
pub author: Option<String>,
pub compat_level: Option<u32>,
pub check_wnpp: bool,
pub run_fixers: bool,
}
impl Default for DebianizePreferences {
fn default() -> Self {
let author = debian_changelog::get_maintainer();
Self {
use_inotify: None,
diligence: 0,
trust: false,
check: false,
net_access: true,
force_subprocess: false,
force_new_directory: false,
compat_release: None,
minimum_certainty: Certainty::Confident,
consult_external_directory: true,
verbose: false,
session: SessionPreferences::default_isolated(),
create_dist: None,
committer: None,
upstream_version_kind: VersionKind::Auto,
debian_revision: "1".to_string(),
team: None,
author: author.map(|(name, email)| format!("{} <{}>", name, email)),
compat_level: None,
check_wnpp: true,
run_fixers: true,
}
}
}
impl From<DebianizePreferences> for lintian_brush::FixerPreferences {
fn from(p: DebianizePreferences) -> Self {
Self {
diligence: Some(p.diligence.into()),
net_access: Some(p.net_access),
compat_release: p.compat_release,
minimum_certainty: Some(p.minimum_certainty),
trust_package: Some(p.trust),
opinionated: Some(true),
allow_reformatting: Some(true),
..Default::default()
}
}
}
#[derive(Debug)]
pub enum Error {
DebianDirectoryExists(PathBuf),
DebianizedPackageRequirementMismatch {
dep: DebianDependency,
binary_names: Vec<String>,
version: Version,
branch: Option<url::Url>,
},
EditorError(debian_analyzer::editor::EditorError),
MissingUpstreamInfo(String),
NoVcsLocation,
NoUpstreamReleases(Option<String>),
SourcePackageNameInvalid(String),
SubdirectoryNotFound {
subpath: PathBuf,
version: Option<String>,
},
IoError(std::io::Error),
BrzError(BrzError),
SqlxError(sqlx::Error),
SourceNameUnknown(Option<String>),
Other(String),
ProviderError(ProviderError),
UncommittedChanges,
}
impl From<sqlx::Error> for Error {
fn from(e: sqlx::Error) -> Self {
Error::SqlxError(e)
}
}
impl From<BrzError> for Error {
fn from(e: BrzError) -> Self {
Error::BrzError(e)
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::IoError(e)
}
}
impl From<debian_analyzer::editor::EditorError> for Error {
fn from(e: debian_analyzer::editor::EditorError) -> Self {
match e {
debian_analyzer::editor::EditorError::IoError(e) => Error::IoError(e),
debian_analyzer::editor::EditorError::BrzError(e) => Error::BrzError(e),
e => Error::EditorError(e),
}
}
}
impl From<ProviderError> for Error {
fn from(e: ProviderError) -> Self {
Error::ProviderError(e)
}
}
impl From<BrzDebianError> for Error {
fn from(e: BrzDebianError) -> Self {
match e {
BrzDebianError::BrzError(brz_err) => Error::BrzError(brz_err),
_ => Error::Other(format!("Debian error: {:?}", e)),
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
use Error::*;
match self {
DebianDirectoryExists(path) => {
write!(f, "Debian directory already exists at {}.", path.display())
}
DebianizedPackageRequirementMismatch {
dep,
binary_names,
version,
branch,
} => {
write!(
f,
"Debianized package {} (version: {}) from {} does not match requirements for {}.",
binary_names.join(", "),
version,
branch.as_ref().map_or_else(|| "unknown branch".to_string(), |b| b.to_string()),
dep.relation_string(),
)
}
NoVcsLocation => {
write!(f, "No VCS location found.")
}
NoUpstreamReleases(source_name) => {
write!(
f,
"No upstream releases found for {}.",
source_name.as_deref().unwrap_or("unknown")
)
}
SourcePackageNameInvalid(name) => write!(f, "Invalid source package name: {}.", name),
SubdirectoryNotFound { subpath, version } => {
write!(
f,
"Subdirectory {} not found in upstream source{}.",
subpath.display(),
version
.as_ref()
.map(|v| format!(" for version {}", v))
.unwrap_or_default()
)
}
IoError(e) => write!(f, "I/O error: {}", e),
BrzError(e) => write!(f, "Breezy error: {}", e),
MissingUpstreamInfo(name) => write!(f, "Missing upstream information for {}.", name),
EditorError(e) => write!(f, "Editor error: {}", e),
SqlxError(e) => write!(f, "SQLx error: {}", e),
SourceNameUnknown(name) => write!(
f,
"Unable to determine source name{}",
name.as_ref()
.map(|n| format!(" from {}", n))
.unwrap_or_default()
),
Other(msg) => write!(f, "{}", msg),
ProviderError(e) => write!(f, "Provider error: {:?}", e),
UncommittedChanges => write!(
f,
"Working tree has uncommitted changes. Use --force to override."
),
}
}
}
impl std::error::Error for Error {}
pub fn debianize(
wt: &GenericWorkingTree,
subpath: &Path,
upstream_branch: Option<&dyn PyBranch>,
_upstream_subpath: Option<&Path>,
preferences: &DebianizePreferences,
version: Option<&str>,
upstream_metadata: &UpstreamMetadata,
) -> Result<DebianizeResult, Error> {
let _lock = wt.lock_write()?;
let debian_path = subpath.join("debian");
if wt.has_filename(&debian_path) {
if !preferences.force_new_directory {
return Err(Error::DebianDirectoryExists(debian_path));
}
wt.remove(&[&debian_path])?;
}
let _reset_guard = ResetOnFailure::new(wt, subpath)?;
let mut metadata = upstream_metadata.clone();
import_metadata_from_path(wt, subpath, &mut metadata, preferences)?;
if upstream_branch.is_none() {
return Err(Error::Other("No upstream branch provided".to_string()));
}
let upstream_source = if let Some(upstream_branch) = upstream_branch {
log::info!("Creating UpstreamBranchSource for enhanced upstream metadata handling");
let controldir = wt.controldir();
match controldir
.as_any()
.downcast_ref::<breezyshim::controldir::GenericControlDir>()
{
Some(generic_controldir) => {
let py_controldir: &dyn breezyshim::controldir::PyControlDir = generic_controldir;
let create_dist_fn: Option<
Box<
dyn Fn(
&dyn PyTree,
&str,
&str,
&Path,
&Path,
) -> Result<OsString, BrzDebianError>
+ Send
+ Sync,
>,
> = preferences.create_dist.as_ref().map(|_f| {
Box::new(
|_tree: &dyn PyTree,
package: &str,
version: &str,
_target_dir: &Path,
_subpath: &Path| {
Ok(OsString::from(format!("{}-{}.tar.gz", package, version)))
},
)
as Box<
dyn Fn(
&dyn PyTree,
&str,
&str,
&Path,
&Path,
)
-> Result<OsString, BrzDebianError>
+ Send
+ Sync,
>
});
match UpstreamBranchSource::from_branch(
upstream_branch,
Some(preferences.upstream_version_kind.clone()),
py_controldir,
create_dist_fn,
) {
Ok(source) => {
log::info!("Successfully created UpstreamBranchSource");
Some(source)
}
Err(e) => {
log::warn!("Failed to create UpstreamBranchSource: {:?}, falling back to basic version detection", e);
None
}
}
}
None => {
log::warn!("Failed to downcast ControlDir to GenericControlDir, falling back to basic version detection");
None
}
}
} else {
None
};
let upstream_version = if let Some(v) = version {
UpstreamVersion::from(v.to_string())
} else if let Some(ref upstream_source) = upstream_source {
determine_upstream_version(
upstream_source,
&metadata,
preferences.upstream_version_kind.clone(),
)?
} else {
return Err(Error::Other(
"Cannot determine upstream version without upstream source".to_string(),
));
};
log::info!("Using upstream version: {}", upstream_version.version);
let session = preferences.session.create_session()?;
let upstream_branch_ref = upstream_branch.unwrap();
let _orig_revid = upstream_branch_ref.last_revision();
let source_name = generic_get_source_name(wt, subpath, &metadata)
.ok_or_else(|| Error::SourceNameUnknown(metadata.name().map(|s| s.to_string())))?;
let _upstream_branch_name = basic_import_upstream_version(
wt,
subpath,
upstream_branch_ref,
&source_name,
&upstream_version.version,
)?;
let _import_tag_names: HashMap<TarballKind, String> = HashMap::new();
let buildsystem_subpath = if subpath == Path::new("") {
PathBuf::new()
} else {
subpath.to_path_buf()
};
if !wt.has_filename(&debian_path) {
wt.mkdir(&debian_path)?;
} else if !wt.is_versioned(&debian_path) {
wt.add(&[&debian_path])?;
}
let (maintainer_name, maintainer_email) = if let Some(ref author) = preferences.author {
let parts: Vec<&str> = author.splitn(2, '<').collect();
if parts.len() == 2 {
let name = parts[0].trim().to_string();
let email = parts[1].trim_end_matches('>').to_string();
(name, email)
} else {
get_maintainer()
}
} else {
get_maintainer()
};
let maintainer = format!("{} <{}>", maintainer_name, maintainer_email);
let buildsystem_path = if subpath.as_os_str().is_empty() {
wt.basedir()
} else {
wt.basedir().join(subpath)
};
log::debug!("Detecting buildsystems in path: {:?}", buildsystem_path);
let buildsystems = ognibuild::buildsystem::detect_buildsystems(&buildsystem_path);
let buildsystem = buildsystems
.into_iter()
.next()
.ok_or_else(|| Error::Other("No buildsystem detected".to_string()))?;
let compat_release = preferences
.compat_release
.clone()
.unwrap_or_else(|| "unstable".to_string());
let kickstart_from_dist: Option<
Box<dyn FnOnce(&dyn PyWorkingTree, &Path) -> Result<(), Error>>,
> = None;
processors::process(
session.as_ref(),
wt,
subpath.to_path_buf(),
debian_path.clone(),
upstream_version.version.clone(),
&metadata,
compat_release,
buildsystem,
buildsystem_subpath,
Some(maintainer.clone()),
kickstart_from_dist,
)?;
let control_path = wt.abspath(&debian_path.join("control"))?;
let control = debian_analyzer::control::TemplatedControlEditor::open(&control_path)
.map_err(|e| Error::Other(format!("Failed to open debian/control: {}", e)))?;
let source_name = control
.source()
.and_then(|s| s.name())
.unwrap_or(source_name);
if !valid_debian_package_name(&source_name) {
return Err(Error::SourcePackageNameInvalid(source_name));
}
log::info!("Using source package name: {}", source_name);
let source_path = debian_path.join("source");
if !wt.has_filename(&source_path) {
wt.mkdir(&source_path)?;
wt.add(&[&source_path])?;
}
let format_file = source_path.join("format");
wt.put_file_bytes_non_atomic(&format_file, b"3.0 (quilt)\n")?;
let mut vcs_url = None;
match vcs::update_official_vcs(
wt,
subpath,
None, preferences.committer.as_deref(),
false, false, ) {
Ok(url) => {
vcs_url = url.parse().ok();
log::info!("Successfully set VCS URL: {}", url);
}
Err(e) => {
log::debug!("Could not update VCS information: {}", e);
match determine_vcs_url(&wt.branch(), subpath) {
Ok(url) => {
let vcs_type = match wt.branch().vcs_type() {
breezyshim::foreign::VcsType::Git => "git",
breezyshim::foreign::VcsType::Bazaar => "bzr",
breezyshim::foreign::VcsType::Svn => "svn",
breezyshim::foreign::VcsType::Hg => "hg",
breezyshim::foreign::VcsType::Cvs => "cvs",
breezyshim::foreign::VcsType::Darcs => "darcs",
breezyshim::foreign::VcsType::Fossil => "fossil",
breezyshim::foreign::VcsType::Arch => "arch",
breezyshim::foreign::VcsType::Svk => "svk",
};
vcs_url = Some(url.clone());
update_vcs_fields(wt, &debian_path, &url, vcs_type)?;
}
Err(e) => {
log::debug!("Could not determine VCS URL: {}", e);
}
}
}
}
let wnpp_bugs = if preferences.check_wnpp {
find_wnpp_bugs_for_package(&source_name, metadata.name())?
} else {
vec![]
};
log::info!(
"Writing initial changelog with source_name: {}",
source_name
);
write_initial_changelog(
wt,
&debian_path,
&source_name,
&upstream_version.mangled_version,
&maintainer_name,
&maintainer_email,
&wnpp_bugs,
)?;
let upstream_revid = upstream_branch.unwrap().last_revision();
let tag_names = commit_debianization(
wt,
subpath,
&upstream_revid,
&upstream_version.version,
&upstream_version.mangled_version,
)?;
if preferences.run_fixers {
run_debianize_fixers(wt, subpath, preferences)?;
}
Ok(DebianizeResult {
vcs_url,
wnpp_bugs,
upstream_version: Some(upstream_version.version),
tag_names,
upstream_branch_name: upstream_branch.and_then(|b| b.name()),
})
}
#[derive(Default, Debug, serde::Serialize)]
pub struct DebianizeResult {
pub vcs_url: Option<url::Url>,
pub wnpp_bugs: Vec<(BugId, BugKind)>,
pub upstream_version: Option<String>,
pub tag_names: HashMap<String, RevisionId>,
pub upstream_branch_name: Option<String>,
}
pub(crate) struct ResetOnFailure<'a>(&'a dyn PyWorkingTree, PathBuf);
impl<'a> ResetOnFailure<'a> {
pub fn new(wt: &'a dyn PyWorkingTree, subpath: &Path) -> Result<Self, BrzError> {
match wt.basis_tree() {
Ok(basis_tree) => {
match breezyshim::workspace::check_clean_tree(wt, &basis_tree, subpath) {
Ok(_) => {}
Err(BrzError::Other(ref py_err))
if py_err.to_string().contains("IndexError") =>
{
log::warn!("Ignoring dirstate IndexError during clean tree check");
}
Err(e) => return Err(e),
}
}
Err(e) => {
log::warn!("Could not get basis tree: {:?}", e);
}
}
Ok(Self(wt, subpath.to_path_buf()))
}
}
impl<'a> Drop for ResetOnFailure<'a> {
fn drop(&mut self) {
if std::thread::panicking() {
match breezyshim::workspace::reset_tree(self.0, None, Some(&self.1)) {
Ok(_) => log::info!("Reset tree after failure"),
Err(e) => log::error!("Failed to reset tree: {:?}", e),
}
}
}
}
fn run_debianize_fixers(
wt: &GenericWorkingTree,
subpath: &Path,
preferences: &DebianizePreferences,
) -> Result<(), Error> {
log::info!("Running lintian fixers on debianized package");
let fixers: Vec<Box<dyn lintian_brush::Fixer>> = lintian_brush::all_lintian_fixers().collect();
log::info!("Found {} lintian fixers", fixers.len());
let fixer_preferences: lintian_brush::FixerPreferences = lintian_brush::FixerPreferences {
compat_release: preferences.compat_release.clone(),
minimum_certainty: Some(preferences.minimum_certainty),
trust_package: Some(preferences.trust),
allow_reformatting: Some(true),
net_access: Some(preferences.net_access),
opinionated: Some(true),
diligence: Some(preferences.diligence as i32),
..Default::default()
};
match lintian_brush::run_lintian_fixers(
wt,
&fixers,
None::<fn() -> bool>, preferences.verbose,
preferences.committer.as_deref(),
&fixer_preferences,
Some(true), Some(subpath),
None, None, None, ) {
Ok(result) => {
log::info!("Lintian fixers completed successfully");
let success_count = result.success.len();
let failed_count = result.failed_fixers.len();
log::info!(
"Lintian fixers completed: {} applied, {} failed",
success_count,
failed_count
);
if failed_count > 0 {
log::warn!("Some lintian fixers failed to apply");
for (fixer_name, error) in &result.failed_fixers {
log::debug!("Fixer {} failed: {:?}", fixer_name, error);
}
}
}
Err(e) => {
log::warn!("Failed to run lintian fixers: {:?}", e);
}
}
Ok(())
}
fn generic_get_source_name(
wt: &dyn PyWorkingTree,
subpath: &Path,
metadata: &UpstreamMetadata,
) -> Option<String> {
let mut source_name = if let Some(name) = metadata.name() {
let mut source_name = names::upstream_name_to_debian_source_name(name);
if !valid_debian_package_name(source_name.as_ref().unwrap()) {
source_name = None;
}
source_name
} else {
None
};
if source_name.is_none() {
source_name = names::upstream_name_to_debian_source_name(
wt.abspath(subpath).unwrap().to_str().unwrap(),
);
if !valid_debian_package_name(source_name.as_ref().unwrap()) {
source_name = None;
}
}
source_name
}
fn import_metadata_from_path(
tree: &dyn PyWorkingTree,
subpath: &Path,
metadata: &mut UpstreamMetadata,
preferences: &DebianizePreferences,
) -> Result<(), ProviderError> {
let p = tree.abspath(subpath).unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
metadata.update(rt.block_on(get_upstream_info(
&p,
Some(preferences.trust),
Some(preferences.net_access),
Some(preferences.consult_external_directory),
Some(preferences.check),
))?);
Ok(())
}
#[derive(Clone)]
pub struct UpstreamVersion {
pub version: String,
pub mangled_version: String,
}
impl From<String> for UpstreamVersion {
fn from(v: String) -> Self {
Self {
version: v.clone(),
mangled_version: debianize_upstream_version(&v),
}
}
}
impl UpstreamVersion {
pub fn as_debian_version(&self) -> Version {
Version {
epoch: None,
upstream_version: self.mangled_version.clone(),
debian_revision: Some("1".to_string()),
}
}
}
pub fn determine_upstream_version(
upstream_source: &UpstreamBranchSource,
metadata: &UpstreamMetadata,
version_kind: VersionKind,
) -> Result<UpstreamVersion, Error> {
let name = metadata.name();
if let Some((upstream_version, mangled_version)) =
upstream_source.get_latest_version(name, None).unwrap()
{
return Ok(UpstreamVersion {
version: upstream_version,
mangled_version,
});
}
if version_kind == VersionKind::Release {
return Err(Error::NoUpstreamReleases(
metadata.name().map(|x| x.to_string()),
));
}
let upstream_revision = upstream_source.upstream_branch().last_revision();
if let Some(next_upstream_version) = metadata.version() {
let next_upstream_version = debianize_upstream_version(next_upstream_version);
let upstream_version = upstream_version_add_revision(
upstream_source.upstream_branch().as_ref(),
&next_upstream_version,
&upstream_revision,
Some("~"),
)
.map_err(|e| match e {
BrzDebianError::BrzError(brz_err) => Error::BrzError(brz_err),
_ => Error::BrzError(BrzError::Other(pyo3::PyErr::new::<
pyo3::exceptions::PyRuntimeError,
_,
>(format!(
"Debian error: {:?}",
e
)))),
})?;
return Ok(UpstreamVersion::from(upstream_version));
}
let upstream_version = upstream_version_add_revision(
upstream_source.upstream_branch().as_ref(),
"0",
&upstream_revision,
Some("+"),
)
.map_err(|e| match e {
BrzDebianError::BrzError(brz_err) => Error::BrzError(brz_err),
_ => Error::BrzError(BrzError::Other(pyo3::PyErr::new::<
pyo3::exceptions::PyRuntimeError,
_,
>(format!("Debian error: {:?}", e)))),
})?;
log::warn!(
"Unable to determine upstream version, using {}.",
upstream_version
);
Ok(UpstreamVersion::from(upstream_version))
}
fn get_maintainer() -> (String, String) {
if let (Ok(name), Ok(email)) = (std::env::var("DEBFULLNAME"), std::env::var("DEBEMAIL")) {
return (name, email);
}
let name = Command::new("git")
.args(["config", "user.name"])
.output()
.ok()
.and_then(|o| String::from_utf8(o.stdout).ok())
.map(|s| s.trim().to_string())
.unwrap_or_else(|| "Debian Maintainer".to_string());
let email = Command::new("git")
.args(["config", "user.email"])
.output()
.ok()
.and_then(|o| String::from_utf8(o.stdout).ok())
.map(|s| s.trim().to_string())
.unwrap_or_else(|| "maintainer@debian.org".to_string());
(name, email)
}
fn determine_vcs_url(branch: &dyn Branch, subpath: &Path) -> Result<url::Url, Error> {
let branch_url = match branch.get_public_branch() {
Some(url) => url,
None => {
match branch.get_push_location() {
Some(url) => url,
None => {
branch.get_user_url().to_string()
}
}
}
};
let url = branch_url
.parse::<url::Url>()
.map_err(|e| Error::Other(format!("Failed to parse branch URL: {}", e)))?;
if !subpath.as_os_str().is_empty() && subpath != Path::new(".") {
let mut url = url;
let current_path = url.path();
let new_path = if current_path.ends_with('/') {
format!("{}{}", current_path, subpath.display())
} else if current_path.is_empty() || current_path == "/" {
format!("/{}", subpath.display())
} else {
format!("{}/{}", current_path, subpath.display())
};
url.set_path(&new_path);
log::debug!("Branch URL with subpath: {}", url);
return Ok(url);
}
Ok(url)
}
fn update_vcs_fields(
wt: &dyn PyWorkingTree,
debian_path: &Path,
vcs_url: &url::Url,
vcs_type: &str,
) -> Result<(), Error> {
use debian_analyzer::editor::{Editor, TreeEditor};
use debian_control::lossless::Control;
use std::ops::Deref;
let editor = TreeEditor::<Control>::new(wt, &debian_path.join("control"), false, false)?;
let control = editor.deref();
let mut source = control
.source()
.ok_or_else(|| Error::Other("No source package found".to_string()))?;
match vcs_type {
"git" => source.set_vcs_git(vcs_url.as_str()),
"bzr" => source.set_vcs_bzr(vcs_url.as_str()),
"svn" => source.set_vcs_svn(vcs_url.as_str()),
"hg" => source.set_vcs_hg(vcs_url.as_str()),
"cvs" => source.set_vcs_cvs(vcs_url.as_str()),
"darcs" => source.set_vcs_darcs(vcs_url.as_str()),
_ => {
log::warn!("Unknown VCS type: {}, defaulting to Git", vcs_type);
source.set_vcs_git(vcs_url.as_str())
}
}
if let Some(browser_url) = determine_browser_url(vcs_type, vcs_url) {
source.set_vcs_browser(Some(&browser_url));
}
editor.commit()?;
Ok(())
}
fn determine_browser_url(vcs_type: &str, vcs_url: &url::Url) -> Option<String> {
match vcs_type {
"git" => {
let url_str = vcs_url.as_str();
if url_str.contains("github.com") {
Some(
url_str
.replace("git@github.com:", "https://github.com/")
.replace("git://github.com/", "https://github.com/")
.replace("git+ssh://git@github.com/", "https://github.com/")
.replace(".git", ""),
)
} else if url_str.contains("gitlab") {
if url_str.starts_with("https://") {
Some(url_str.replace(".git", ""))
} else {
Some(
url_str
.replace("git@", "https://")
.replace(":", "/")
.replace(".git", ""),
)
}
} else if url_str.contains("salsa.debian.org") {
if url_str.starts_with("https://") {
Some(url_str.replace(".git", ""))
} else {
Some(
url_str
.replace("git@salsa.debian.org:", "https://salsa.debian.org/")
.replace("git://salsa.debian.org/", "https://salsa.debian.org/")
.replace(".git", ""),
)
}
} else {
None
}
}
_ => None,
}
}
async fn find_wnpp_bugs_for_package_async(
source_name: &str,
upstream_name: Option<&str>,
) -> Result<Vec<(BugId, BugKind)>, Error> {
let mut names = vec![source_name];
if let Some(upstream) = upstream_name {
if upstream != source_name {
names.push(upstream);
}
}
let name_refs: Vec<&str> = names.to_vec();
match debian_analyzer::wnpp::find_wnpp_bugs_harder(&name_refs).await {
Ok(bugs) => Ok(bugs),
Err(e) => {
log::warn!("Failed to query WNPP bugs: {}", e);
Ok(vec![])
}
}
}
fn find_wnpp_bugs_for_package(
source_name: &str,
upstream_name: Option<&str>,
) -> Result<Vec<(BugId, BugKind)>, Error> {
let rt = tokio::runtime::Runtime::new()
.map_err(|e| Error::Other(format!("Failed to create async runtime: {}", e)))?;
rt.block_on(find_wnpp_bugs_for_package_async(source_name, upstream_name))
}
pub fn get_project_wide_deps_with_session(
session: &dyn ognibuild::session::Session,
_wt: &dyn PyWorkingTree,
_subpath: &Path,
buildsystem: &dyn ognibuild::buildsystem::BuildSystem,
) -> Result<
(
Vec<ognibuild::dependencies::debian::DebianDependency>,
Vec<ognibuild::dependencies::debian::DebianDependency>,
),
Error,
> {
log::debug!("Setting up dependency resolution with ognibuild session");
let (build_deps, test_deps) =
ognibuild::debian::upstream_deps::get_project_wide_deps(session, buildsystem);
log::debug!(
"Found {} build dependencies and {} test dependencies",
build_deps.len(),
test_deps.len()
);
Ok((build_deps, test_deps))
}
fn commit_debianization(
wt: &dyn PyWorkingTree,
subpath: &Path,
upstream_revid: &RevisionId,
upstream_version: &str,
mangled_version: &str,
) -> Result<HashMap<String, RevisionId>, Error> {
let mut tag_names = HashMap::new();
let debian_path = subpath.join("debian");
let debian_path_ref = debian_path.as_path();
let debian_path_abs = wt.basedir().join(&debian_path);
wt.smart_add(&[debian_path_abs.as_path()])?;
let message = format!("Import upstream version {}", upstream_version);
let _reporter = NullCommitReporter::new();
match wt.commit(&message, None, None, Some(false), Some(&[debian_path_ref])) {
Ok(revid) => {
let tag_name = format!("upstream/{}", mangled_version);
tag_names.insert(tag_name, upstream_revid.clone());
let debian_tag = format!("debian/{}-1", mangled_version);
tag_names.insert(debian_tag, revid);
}
Err(e) if e.to_string().contains("PointlessCommit") => {
}
Err(e) => return Err(Error::BrzError(e)),
}
Ok(tag_names)
}
fn write_initial_changelog(
wt: &dyn PyWorkingTree,
debian_path: &Path,
source_name: &str,
version: &str,
maintainer_name: &str,
maintainer_email: &str,
wnpp_bugs: &[(BugId, BugKind)],
) -> Result<(), Error> {
let changelog_path = debian_path.join("changelog");
let mut content = format!(
"{} ({}-1) UNRELEASED; urgency=low\n\n",
source_name, version
);
content.push_str(" * Initial release.");
for (bug_id, bug_kind) in wnpp_bugs {
match bug_kind {
BugKind::ITP => content.push_str(&format!(" (Closes: #{})", bug_id)),
BugKind::RFP => content.push_str(&format!(" (Closes: #{})", bug_id)),
}
}
content.push_str("\n\n");
let timestamp = chrono::Local::now().format("%a, %d %b %Y %H:%M:%S %z");
content.push_str(&format!(
" -- {} <{}> {}\n",
maintainer_name, maintainer_email, timestamp
));
wt.put_file_bytes_non_atomic(&changelog_path, content.as_bytes())?;
Ok(())
}
fn basic_import_upstream_version(
wt: &dyn PyWorkingTree,
_subpath: &Path,
upstream_branch: &dyn PyBranch,
_source_name: &str,
upstream_version: &str,
) -> Result<String, Error> {
let upstream_branch_name = "upstream";
match wt.controldir().create_branch(Some(upstream_branch_name)) {
Ok(branch) => {
let upstream_revid = upstream_branch.last_revision();
branch.generate_revision_history(&upstream_revid)?;
log::info!(
"Created upstream branch pointing to revision {}",
upstream_revid
);
}
Err(BrzError::AlreadyBranch(..)) => {
log::info!("Upstream branch already exists");
}
Err(e) => return Err(Error::BrzError(e)),
}
let upstream_tag = format!("upstream/{}", upstream_version);
let upstream_revid = upstream_branch.last_revision();
match wt.branch().tags() {
Ok(tags) => match tags.set_tag(&upstream_tag, &upstream_revid) {
Ok(_) => {
log::info!("Created upstream tag: {}", upstream_tag);
}
Err(BrzError::TagAlreadyExists(..)) => {
log::info!("Upstream tag {} already exists", upstream_tag);
}
Err(e) => {
log::warn!("Failed to create upstream tag {}: {}", upstream_tag, e);
}
},
Err(e) => {
log::warn!("Failed to get tags: {}", e);
}
}
Ok(upstream_branch_name.to_string())
}
#[cfg(test)]
fn unsplit_vcs_url(_vcs_type: &str, url: &url::Url) -> String {
url.to_string()
}