import argparse
import os
from typing import List, Tuple
import numpy as np
from liftover import ChainFile
from pyfaidx import Fasta
from utils import check_tool, run
def read_first_record(fa_path: str) -> Tuple[str, str]:
fa = Fasta(fa_path)
if not fa.keys():
raise ValueError(f"No records in {fa_path}")
name = list(fa.keys())[0]
return name, str(fa[name][:].seq).upper()
def write_fasta(path: str, header: str, seq: str, width: int = 60):
with open(path, "w", encoding="utf-8") as fw:
fw.write(f">{header}\n")
for i in range(0, len(seq), width):
fw.write(seq[i : i + width] + "\n")
def run_minimap2_paf(faA: str, faB: str, threads: int, out_paf: str):
cmd = ["minimap2", "-cx", "asm5", "--cs", "-t", str(threads), faB, faA]
with open(out_paf, "w", encoding="utf-8") as out:
run(cmd, stdout=out)
def paf_to_chain_with_transanno(paf: str, out_chain: str):
cmd = ["transanno", "minimap2chain", paf, "--output", out_chain]
run(cmd)
def sample_breakpoints(L: int, n: int, dmin: int, seed: int) -> List[int]:
if L <= 2 or n <= 0:
return []
rng = np.random.default_rng(seed)
picks: List[int] = []
tries = 0
max_tries = 200000
low, high = 1, L - 1
while len(picks) < n and tries < max_tries:
x = int(rng.integers(low, high))
if all(abs(x - y) >= dmin for y in picks):
picks.append(x)
tries += 1
picks.sort()
return picks
def lift_pos(converter, chromA: str, posA_0based: int):
return converter[chromA][posA_0based]
def choose_mapped_breaks(
converter, chromA: str, chromB_target: str, L: int, n: int, dmin: int, seed: int
) -> Tuple[List[int], List[int]]:
rng = np.random.default_rng(seed)
attempts = 200
for _ in range(attempts):
a_breaks = sample_breakpoints(L, n, dmin, int(rng.integers(0, 2**31 - 1)))
if len(a_breaks) < n:
continue
b_hits: List[int] = []
ok = True
prev_b = -1
for a in a_breaks:
candidates = lift_pos(converter, chromA, a)
candidates = [
(c, p, s) for (c, p, s) in candidates if c == chromB_target and s == "+"
]
if not candidates:
ok = False
break
_, bpos, _ = candidates[0]
bpos = int(bpos)
if bpos <= prev_b:
ok = False
break
b_hits.append(bpos)
prev_b = bpos
if ok:
return a_breaks, b_hits
raise RuntimeError(
"Could not find enough mapped breakpoints; try lowering --min-distance or using a different seed."
)
def build_mosaic_alternate(
seqA: str, seqB: str, a_breaks: List[int], b_breaks: List[int]
) -> str:
assert len(a_breaks) == len(b_breaks) and len(a_breaks) > 0
L = len(seqA)
a_pos = [0] + a_breaks + [L]
pieces: List[str] = []
donor_A = True for i in range(len(a_pos) - 1):
aL, aR = a_pos[i], a_pos[i + 1]
if donor_A:
pieces.append(seqA[aL:aR])
else:
bi = i - 1
bL, bR = b_breaks[bi], b_breaks[bi + 1]
pieces.append(seqB[bL:bR] if bL <= bR else "")
donor_A = not donor_A
return "".join(pieces)
def main():
ap = argparse.ArgumentParser(
description="Breakpoint-first mosaic using a precomputed chain (or generate one if absent)."
)
ap.add_argument("faA", help="FASTA for genome A (source of breakpoints)")
ap.add_argument("faB", help="FASTA for genome B (target of liftover)")
ap.add_argument(
"out_prefix",
help="Output prefix (creates .tsv and .fa; may also create .paf/.chain if --chain not provided)",
)
ap.add_argument(
"--chain",
help="Precomputed UCSC chain file mapping A->B. If provided, minimap2/transanno are skipped.",
)
ap.add_argument("--threads", type=int, default=16)
ap.add_argument(
"--min-distance",
type=int,
default=1_000_000,
help="Minimum spacing between A breakpoints (default: %(default)s bp)",
)
ap.add_argument(
"--n", type=int, default=4, help="Number of breakpoints to simulate"
)
ap.add_argument("--seed", type=int, default=42)
args = ap.parse_args()
build_mosaic(
args.faA,
args.faB,
args.out_prefix,
args.chain,
args.threads,
args.min_distance,
args.n,
args.seed,
)
def build_mosaic(
faA: str,
faB: str,
out_prefix: str,
chain: str | None,
threads: int,
min_distance: int,
n: int,
seed: int,
):
chromA, seqA = read_first_record(faA)
chromB, seqB = read_first_record(faB)
out_tsv = out_prefix + ".breakpoints.tsv"
out_fa = out_prefix + ".mosaic.fa"
if chain:
chain_path = chain
if not os.path.exists(chain_path):
raise FileNotFoundError(f"--chain file not found: {chain_path}")
else:
check_tool("minimap2")
check_tool("transanno")
out_paf = out_prefix + ".paf"
chain_path = out_prefix + ".chain"
run_minimap2_paf(faA, faB, threads, out_paf)
paf_to_chain_with_transanno(out_paf, chain_path)
converter = ChainFile(chain_path, one_based=False)
a_breaks, b_breaks = choose_mapped_breaks(
converter=converter,
chromA=chromA,
chromB_target=chromB,
L=len(seqA),
n=int(n),
dmin=int(min_distance),
seed=int(seed),
)
os.makedirs(os.path.dirname(out_tsv) or ".", exist_ok=True)
with open(out_tsv, "w", encoding="utf-8") as w:
w.write("idx\tA_chrom\tA_bp_0based\tB_chrom\tB_bp_0based\tstrand\n")
for i, (a, b) in enumerate(zip(a_breaks, b_breaks)):
w.write(f"{i}\t{chromA}\t{a}\t{chromB}\t{b}\t+\n")
mosaic = build_mosaic_alternate(seqA, seqB, a_breaks, b_breaks)
write_fasta(out_fa, header=os.path.basename(out_prefix), seq=mosaic)
if __name__ == "__main__":
main()