import os
import shutil
import stat
import subprocess
def force_remove(path):
    """Delete the file at *path*, retrying once after a chmod on PermissionError.

    Best-effort: any failure is printed and swallowed, so the function
    always returns ``None`` and never raises.
    """
    try:
        os.remove(path)
        return
    except PermissionError:
        # Fall through to the chmod-and-retry attempt below.
        pass
    except Exception as err:
        print(f"Failed to remove {path}: {err}")
        return

    try:
        os.chmod(path, stat.S_IWRITE)
        os.remove(path)
    except Exception as err:
        print(f"Failed to remove {path}: {err}")
def force_rmtree(path):
    """Recursively delete the directory tree at *path*, best-effort.

    When an individual removal fails, the offending entry is chmod'ed to
    ``stat.S_IWRITE`` and the failed operation is retried once. Every
    failure is printed and swallowed; the function never raises and
    always returns ``None``.
    """
    import sys

    def _retry_writable(func, failed_path, _exc):
        # Same (func, path, exc) shape works for both ``onexc`` and ``onerror``;
        # the third argument is not needed here.
        try:
            os.chmod(failed_path, stat.S_IWRITE)
            func(failed_path)
        except Exception as e:
            print(f"Failed to force remove {failed_path}: {e}")

    # ``onerror`` is deprecated since Python 3.12 in favour of ``onexc``.
    # Passing the deprecated name emits a DeprecationWarning, which under
    # ``-W error`` would be caught below and abort the whole removal.
    handler_kw = "onexc" if sys.version_info >= (3, 12) else "onerror"
    try:
        shutil.rmtree(path, **{handler_kw: _retry_writable})
    except Exception as e:
        print(f"Failed to rmtree {path}: {e}")
def vendor_r(skip_deps=False):
    """Copy the core Rust crate into ``package-r`` and optionally vendor its deps.

    The current working directory is used as the project root. The steps are:

    1. Remove the old ``vendor`` directory and any previous ``core`` copy
       under ``package-r/src/rust``.
    2. Copy ``src`` into the new ``core`` directory.
    3. Write a filtered ``Cargo.toml``: the ``[workspace]``,
       ``[dev-dependencies]`` and ``[[bench...`` sections are dropped, and
       three specific ``license`` / ``license-file`` / ``readme`` entries
       are commented out.
    4. Copy ``Cargo.lock`` when it exists.
    5. Vendor dependencies unless *skip_deps* is true.
    6. Strip execute bits below ``package-r/src``.
    """
    project_root = os.getcwd()
    rust_root = os.path.join(project_root, "package-r", "src", "rust")
    target_dir = os.path.join(rust_root, "core")
    old_vendor_dir = os.path.join(rust_root, "vendor")

    print(f"Vendoring core Rust logic into {target_dir}...")
    if os.path.exists(old_vendor_dir):
        print(f"Cleaning up old vendor directory {old_vendor_dir}...")
        force_rmtree(old_vendor_dir)
    if os.path.exists(target_dir):
        force_rmtree(target_dir)
    os.makedirs(target_dir)
    shutil.copytree(os.path.join(project_root, "src"), os.path.join(target_dir, "src"))

    with open(os.path.join(project_root, "Cargo.toml"), "r", encoding="utf-8") as manifest:
        source_lines = manifest.readlines()

    dropped_headers = ("[workspace]", "[dev-dependencies]")
    commented_entries = (
        'license = "Apache-2.0"',
        'license-file = "LICENSE"',
        'readme = "README.md"',
    )
    kept_lines = []
    in_dropped_section = False
    for raw in source_lines:
        header = raw.strip()
        if header.startswith("[") and header.endswith("]"):
            # Each table header decides afresh whether its section is dropped.
            in_dropped_section = header in dropped_headers or header.startswith(
                "[[bench"
            )
        if in_dropped_section:
            continue
        for entry in commented_entries:
            # str.replace is a no-op when the entry is absent.
            raw = raw.replace(entry, "# " + entry)
        kept_lines.append(raw)

    with open(os.path.join(target_dir, "Cargo.toml"), "w", encoding="utf-8") as out:
        out.writelines(kept_lines)

    lock_path = os.path.join(project_root, "Cargo.lock")
    if os.path.exists(lock_path):
        shutil.copy2(lock_path, os.path.join(target_dir, "Cargo.lock"))
    print("Vendoring complete.")

    if skip_deps:
        print("Skipping dependency vendoring as requested.")
    else:
        vendor_dependencies(project_root)

    strip_exec_permissions(os.path.join(project_root, "package-r", "src"))
def vendor_dependencies(project_root):
    """Run ``cargo vendor`` for the R package and prune the result.

    Dependencies are vendored into ``package-r/src/v`` by running
    ``cargo vendor ../v`` from ``package-r/src/rust``. Legacy output and
    config directories are removed first.

    Args:
        project_root: Path of the repository root containing ``package-r``.

    Raises:
        subprocess.CalledProcessError: if ``cargo vendor`` exits non-zero
            (its stderr is printed before re-raising).
    """
    package_dir = os.path.join(project_root, "package-r")
    rust_dir = os.path.join(package_dir, "src", "rust")
    vendor_dir = os.path.join(package_dir, "src", "v")
    config_dir = os.path.join(package_dir, "inst", "c")

    old_config_dir = os.path.join(rust_dir, ".cargo")
    if os.path.exists(old_config_dir):
        force_rmtree(old_config_dir)
    for old in ["inst/vendor", "inst/cargo_home", "inst/v", "v"]:
        old_path = os.path.join(package_dir, old)
        if os.path.exists(old_path):
            force_rmtree(old_path)

    print(f"Vendoring dependencies into {vendor_dir}...")
    try:
        subprocess.run(
            ["cargo", "vendor", "../v"],
            cwd=rust_dir,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error running cargo vendor: {e.stderr}")
        raise

    if os.path.exists(config_dir):
        force_rmtree(config_dir)
    print("Vendoring complete. Config will be passed via CLI in Makevars.")

    # Prune the 'windows' crate BEFORE prune_vendor. prune_vendor finishes by
    # fixing checksums, which renames/rewrites every ``.cargo-checksum.json``;
    # files deleted after that point would be left behind as stale entries
    # in the crate's checksum manifest.
    prune_windows_crate(vendor_dir)
    prune_vendor(vendor_dir)
    fix_checksums(vendor_dir)
def prune_windows_crate(vendor_dir):
    """Delete unlisted namespace directories from the vendored ``windows`` crate.

    Works down four levels below ``<vendor_dir>/windows/src/Windows``
    (top level, ``Win32``, ``Win32/System``, ``Win32/System/Diagnostics``).
    At each level, every sub-directory whose name is not in that level's
    keep-set is removed; plain files are never touched. Does nothing when
    the crate or its ``src/Windows`` directory is missing.
    """
    crate_root = os.path.join(vendor_dir, "windows")
    if not os.path.exists(crate_root):
        return
    print("Pruning 'windows' crate (keeping sysinfo-required namespaces)...")
    namespace_root = os.path.join(crate_root, "src", "Windows")
    if not os.path.exists(namespace_root):
        return

    # (path below namespace_root, names to keep, label used in the log line)
    tiers = (
        ((), {"Win32", "Wdk", "mod.rs"}, "top-level: "),
        (
            ("Win32",),
            {
                "Foundation",
                "Security",
                "System",
                "Storage",
                "NetworkManagement",
                "Networking",
                "UI",
            },
            "Win32/",
        ),
        (
            ("Win32", "System"),
            {
                "Com",
                "Diagnostics",
                "IO",
                "Ioctl",
                "Kernel",
                "Memory",
                "Ole",
                "Performance",
                "Power",
                "ProcessStatus",
                "Registry",
                "RemoteDesktop",
                "Rpc",
                "SystemInformation",
                "SystemServices",
                "Threading",
                "Variant",
                "WindowsProgramming",
                "Wmi",
            },
            "Win32/System/",
        ),
        (
            ("Win32", "System", "Diagnostics"),
            {"ToolHelp", "Debug"},
            "Win32/System/Diagnostics/",
        ),
    )
    for parts, keep, label in tiers:
        tier_dir = os.path.join(namespace_root, *parts)
        if not os.path.exists(tier_dir):
            # Every deeper tier lives below this one, so nothing is left to do.
            break
        for child in os.listdir(tier_dir):
            child_path = os.path.join(tier_dir, child)
            if child not in keep and os.path.isdir(child_path):
                print(f" Pruning {label}{child}")
                force_rmtree(child_path)
def strip_exec_permissions(vendor_dir):
    """Clear the user/group/other execute bits on every file below *vendor_dir*.

    Directories keep their modes; only entries that ``os.walk`` reports as
    files are touched. The operation is best-effort: a file whose mode
    cannot be read or changed is reported on stdout and skipped instead of
    being ignored silently. The number of files actually changed is
    printed at the end.
    """
    print("Stripping executable permissions from vendored files...")
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    count = 0
    for root, _dirs, files in os.walk(vendor_dir):
        for name in files:
            fpath = os.path.join(root, name)
            try:
                current = stat.S_IMODE(os.stat(fpath).st_mode)
                new_mode = current & ~exec_bits
                if new_mode != current:
                    os.chmod(fpath, new_mode)
                    count += 1
            except OSError as e:
                # Keep going, but tell the user which file was left as-is.
                print(f" Failed to update permissions on {fpath}: {e}")
    print(f" Stripped execute bit from {count} files.")
def prune_vendor(vendor_dir):
    """Delete files and directories matching fixed name rules below *vendor_dir*.

    Directories that are hidden or named ``tests``, ``examples``,
    ``benches`` or ``doc`` are removed and not descended into. Files are
    removed when they are hidden (except ``.cargo-checksum.json``), match
    the artifact suffix list, or match the second suffix/name list
    (docs, scripts, CI config, licence/changelog-style names). Afterwards
    the vendored ``Cargo.toml`` files are patched via
    ``patch_vendored_cargo_tomls``.
    """
    print(f"Pruning unnecessary files in {vendor_dir}...")

    unwanted_dir_names = ("tests", "examples", "benches", "doc", ".github", ".vim")
    artifact_suffixes = (
        ".o",
        ".a",
        ".so",
        ".dylib",
        ".dll",
        ".lib",
        ".pdb",
        ".exp",
        ".exe",
    )
    unnecessary_suffixes = (
        (".md", ".txt", ".html", ".pdf", ".sh", ".bat", ".ps1", ".yml", ".yaml")
        + artifact_suffixes
        + (".git", ".gitignore", ".gitattributes", ".github")
    )
    unnecessary_names = (
        "LICENSE",
        "COPYING",
        "CONTRIBUTING",
        "AUTHORS",
        "CHANGELOG",
        "Cargo.toml.orig",
        "Makefile",
        "GNUmakefile",
        "Kbuild",
        "Doxyfile",
    )

    for root, dirs, files in os.walk(vendor_dir, topdown=True):
        doomed = [d for d in dirs if d.startswith(".") or d in unwanted_dir_names]
        for dirname in doomed:
            dir_path = os.path.join(root, dirname)
            print(f"Removing directory: {dir_path}")
            force_rmtree(dir_path)
            # Mutate in place so os.walk does not descend into the removed dir.
            dirs.remove(dirname)

        for filename in files:
            file_path = os.path.join(root, filename)
            lowered = filename.lower()
            if filename.startswith("."):
                # The checksum manifest is the one hidden file that must survive.
                if filename != ".cargo-checksum.json":
                    print(f"Removing hidden file: {file_path}")
                    force_remove(file_path)
            elif lowered.endswith(artifact_suffixes) or filename == "AppVeyor.yml":
                print(f"Removing artifact: {file_path}")
                force_remove(file_path)
            elif lowered.endswith(unnecessary_suffixes) or filename in unnecessary_names:
                print(f"Deleting unnecessary file: {file_path}")
                force_remove(file_path)

    patch_vendored_cargo_tomls(vendor_dir)
def patch_vendored_cargo_tomls(vendor_dir):
    """Comment out ``readme`` / ``license-file`` entries in vendored manifests.

    Every ``Cargo.toml`` below *vendor_dir* is scanned. Lines containing
    ``readme = "`` or ``license-file = "`` get a ``# `` prefix; lines that
    are already comments are left alone so a second run does not stack
    prefixes. If the write is refused with ``PermissionError``, the owner
    write bit is added to the file's existing mode and the write is
    retried once. Other failures are printed and the file is skipped.
    Checksums are refreshed via ``fix_checksums`` at the end.
    """
    print(f"Patching Cargo.toml files in {vendor_dir}...")
    markers = ('readme = "', 'license-file = "')

    for root, _dirs, files in os.walk(vendor_dir):
        if "Cargo.toml" not in files:
            continue
        cargo_path = os.path.join(root, "Cargo.toml")
        try:
            with open(cargo_path, "r", encoding="utf-8") as f:
                lines = f.readlines()

            new_lines = []
            for line in lines:
                is_comment = line.lstrip().startswith("#")
                if not is_comment and any(marker in line for marker in markers):
                    new_lines.append("# " + line)
                else:
                    new_lines.append(line)
            if new_lines == lines:
                continue

            try:
                with open(cargo_path, "w", encoding="utf-8") as f:
                    f.writelines(new_lines)
            except PermissionError:
                # Read-only file: add the write bit (keeping the other mode
                # bits, so the file stays readable) and try once more.
                os.chmod(cargo_path, os.stat(cargo_path).st_mode | stat.S_IWRITE)
                with open(cargo_path, "w", encoding="utf-8") as f:
                    f.writelines(new_lines)
        except Exception as e:
            print(f"Failed to patch {cargo_path}: {e}")

    fix_checksums(vendor_dir)
def fix_checksums(vendor_dir):
    """Recompute per-file SHA-256 checksums for every crate below *vendor_dir*.

    For each directory holding a checksum manifest, the ``files`` mapping
    is rebuilt: entries whose file no longer exists are dropped and
    entries whose hash differs are updated. The result is always stored
    as ``cargo-checksum.json`` (no leading dot); an unchanged
    ``.cargo-checksum.json`` is simply renamed to that name.

    Both ``.cargo-checksum.json`` and an already renamed
    ``cargo-checksum.json`` are accepted as input, so calling this again
    after further pruning still refreshes the manifest. When both exist,
    the hidden one is used as the source.

    Failures to read, hash, write or rename are printed and skipped. The
    function returns ``None`` and prints how many crates were patched.
    """
    import hashlib
    import json

    print(f"Scanning {vendor_dir} for checksum issues...")
    if not os.path.exists(vendor_dir):
        print(f"Error: {vendor_dir} does not exist.")
        return

    hidden_name = ".cargo-checksum.json"
    visible_name = "cargo-checksum.json"
    patched_count = 0

    for root, _dirs, files in os.walk(vendor_dir):
        if hidden_name in files:
            checksum_path = os.path.join(root, hidden_name)
        elif visible_name in files:
            # Manifest was already renamed by an earlier pass.
            checksum_path = os.path.join(root, visible_name)
        else:
            continue
        new_checksum_path = os.path.join(root, visible_name)
        source_is_hidden = checksum_path != new_checksum_path

        try:
            with open(checksum_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            print(f"Failed to load {checksum_path}: {e}")
            continue

        files_dict = data.get("files", {})
        new_files_dict = {}
        changed = False
        for file_rel_path, old_checksum in files_dict.items():
            full_path = os.path.join(root, file_rel_path)
            if not os.path.exists(full_path):
                # File was pruned: drop its entry.
                changed = True
                continue
            try:
                with open(full_path, "rb") as f:
                    file_hash = hashlib.sha256(f.read()).hexdigest()
            except Exception as e:
                print(f"Failed to calculate checksum for {full_path}: {e}")
                new_files_dict[file_rel_path] = old_checksum
                continue
            if file_hash != old_checksum:
                print(
                    f"Checksum mismatch for {full_path}: expected {old_checksum[:8]}..., got {file_hash[:8]}..."
                )
                changed = True
            new_files_dict[file_rel_path] = file_hash

        if changed:
            data["files"] = new_files_dict
            try:
                with open(new_checksum_path, "w", encoding="utf-8") as f:
                    json.dump(data, f, separators=(",", ":"))
            except Exception as e:
                print(f"Failed to write {new_checksum_path}: {e}")
                continue
            patched_count += 1
            # Remove the hidden original only after the replacement is on
            # disk, so a failed write cannot lose the manifest.
            if source_is_hidden and os.path.exists(checksum_path):
                force_remove(checksum_path)
        elif source_is_hidden and not os.path.exists(new_checksum_path):
            try:
                os.rename(checksum_path, new_checksum_path)
                patched_count += 1
            except Exception as e:
                print(f"Failed to rename {checksum_path}: {e}")

    print(f"Done. Patched checksums in {patched_count} crates.")
if __name__ == "__main__":
    # Command-line entry point: `--no-deps` skips the `cargo vendor` step.
    from argparse import ArgumentParser

    cli = ArgumentParser(description="Vendor Rust code for R package.")
    cli.add_argument(
        "--no-deps",
        action="store_true",
        help="Skip vendoring dependencies (cargo vendor).",
    )
    options = cli.parse_args()
    vendor_r(skip_deps=options.no_deps)