import argparse
import gzip
import re
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Iterable, Tuple
from jcvi.apps.base import logger
FA_EXTS = (".fa", ".fasta", ".fna")
DEFAULT_PATTERN = (
r"(?ix)" r"^(" r"chr[_\-\s]*0*1" r"|chromosome[_\-\s]*0*1" r"|0*1" r")$"
)
def parse_args():
ap = argparse.ArgumentParser(
description="Extract chr1 → ref/<prefix>.fasta with renamed header."
)
ap.add_argument(
"--glob",
default="version2.5.2019-10-09/*.fasta.gz",
help="Input glob for FASTA files (default: %(default)s)",
)
ap.add_argument(
"--outdir",
type=Path,
default=Path("ref"),
help="Output directory (default: %(default)s)",
)
ap.add_argument(
"--pattern",
default=DEFAULT_PATTERN,
help="Regex applied to the first header token to detect chr1 (default matches common chr1 spellings)",
)
ap.add_argument(
"--id-sep",
default="_",
help="Separator between prefix and chr1 in new header (default: '%(default)s')",
)
ap.add_argument(
"--jobs", type=int, default=8, help="Parallel workers (default: %(default)d)"
)
ap.add_argument("--force", action="store_true", help="Overwrite existing outputs")
ap.add_argument(
"--dry-run", action="store_true", help="Print planned actions without writing"
)
return ap.parse_args()
def prefix_until_dot_or_underscore(s: str) -> str:
return s.split(".")[0].split("_")[0]
def guess_prefix(p: Path) -> str:
name = p.name
if name.endswith(".gz"):
name = name[:-3]
for ext in FA_EXTS:
if name.endswith(ext):
return prefix_until_dot_or_underscore(name[: -len(ext)])
s = Path(name).stem
return prefix_until_dot_or_underscore(s)
def nopen(path: Path):
return gzip.open(path, "rt") if path.suffix == ".gz" else open(path, "rt")
def iter_fasta_records(path: Path) -> Iterable[Tuple[str, Iterable[str]]]:
handle = nopen(path)
header = None
buf = []
try:
for line in handle:
line = line.rstrip("\n")
if not line:
continue
if line.startswith(">"):
if header is not None:
seq_iter = (s for s in buf)
yield header, seq_iter
header = line[1:].strip()
buf = []
else:
buf.append(line)
if header is not None:
seq_iter = (s for s in buf)
yield header, seq_iter
finally:
handle.close()
def first_token(header: str) -> str:
return header.split()[0]
def find_chr1_token(headers: Iterable[str], pattern: re.Pattern) -> str | None:
for h in headers:
tok = first_token(h)
if pattern.search(tok):
return h
return None
def extract_chr1(in_path: Path, out_path: Path, new_id: str, pat: re.Pattern) -> int:
bases = 0
target_header = None
headers = []
for h, _ in iter_fasta_records(in_path):
headers.append(h)
if pat.search(first_token(h)):
target_header = h
break
if target_header is None:
tokens = [first_token(h) for h in headers[:10]]
raise RuntimeError(
f"No chr1-like header found in {in_path}.\n"
f"Seen tokens (first 10): {tokens}\n"
"Consider adjusting --pattern."
)
with open(out_path, "wt") as out:
out.write(f">{new_id}\n")
for h, seq_iter in iter_fasta_records(in_path):
if first_token(h) == first_token(target_header):
for s in seq_iter:
out.write(s + "\n")
bases += len(s)
break
return bases
def process_one(
in_path: Path,
outdir: Path,
pat: re.Pattern,
id_sep: str,
force: bool,
dry_run: bool,
) -> str:
prefix = guess_prefix(in_path)
out_path = outdir / f"{prefix}.fa"
new_id = f"{prefix}{id_sep}chr1"
if out_path.exists() and not force:
return f"[SKIP] {out_path} exists"
if dry_run:
return f"[DRY] {in_path} → {out_path} header: >{new_id}"
outdir.mkdir(parents=True, exist_ok=True)
n = extract_chr1(in_path, out_path, new_id, pat)
return f"[OK] {in_path.name} → {out_path.name} ({n:,} bp)"
def main():
args = parse_args()
files = sorted(Path().glob(args.glob))
if not files:
sys.exit(f"No files matched: {args.glob}")
pat = re.compile(args.pattern)
logger.info("Found %d file(s). Writing to `%s/`", len(files), args.outdir)
if args.dry_run:
logger.info("DRY RUN (no files will be written)\n")
results = []
rc = 0
with ThreadPoolExecutor(max_workers=max(1, args.jobs)) as ex:
futs = {
ex.submit(
process_one, f, args.outdir, pat, args.id_sep, args.force, args.dry_run
): f
for f in files
}
for fut in as_completed(futs):
try:
msg = fut.result()
logger.info(msg)
results.append(msg)
except Exception as e:
rc = 1
logger.error(f"[FAIL] {futs[fut].name}: %s", e)
sys.exit(rc)
if __name__ == "__main__":
main()