import ctypes
import os
import sys
import pyarrow as pa
def load_librype():
script_dir = os.path.dirname(os.path.abspath(__file__))
lib_path = os.path.join(script_dir, "..", "target", "release", "librype.so")
lib_path = os.path.normpath(lib_path)
if not os.path.exists(lib_path):
print(f"ERROR: {lib_path} not found.", file=sys.stderr)
print(
"Build with: cargo build --release --lib --features arrow-ffi",
file=sys.stderr,
)
sys.exit(1)
lib = ctypes.CDLL(lib_path)
lib.rype_extract_minimizer_set_arrow.argtypes = [
ctypes.c_void_p, ctypes.c_size_t, ctypes.c_size_t, ctypes.c_uint64, ctypes.c_void_p, ]
lib.rype_extract_minimizer_set_arrow.restype = ctypes.c_int
lib.rype_extract_strand_minimizers_arrow.argtypes = [
ctypes.c_void_p, ctypes.c_size_t, ctypes.c_size_t, ctypes.c_uint64, ctypes.c_void_p, ]
lib.rype_extract_strand_minimizers_arrow.restype = ctypes.c_int
lib.rype_get_last_error.argtypes = []
lib.rype_get_last_error.restype = ctypes.c_char_p
return lib
ARROW_ARRAY_STREAM_SIZE = 5 * ctypes.sizeof(ctypes.c_void_p)
def alloc_arrow_stream():
buf = ctypes.create_string_buffer(ARROW_ARRAY_STREAM_SIZE)
return buf, ctypes.cast(buf, ctypes.c_void_p).value
def call_extraction(lib, func, reader, k, w, salt):
in_buf, in_ptr = alloc_arrow_stream()
out_buf, out_ptr = alloc_arrow_stream()
reader._export_to_c(in_ptr)
rc = func(in_ptr, k, w, salt, out_ptr)
if rc != 0:
err = lib.rype_get_last_error()
msg = err.decode("utf-8") if err else "unknown error"
raise RuntimeError(f"rype extraction failed: {msg}")
return pa.RecordBatchReader._import_from_c(out_ptr)
def make_input_reader(ids, sequences):
schema = pa.schema([("id", pa.int64()), ("sequence", pa.binary())])
batch = pa.record_batch(
[pa.array(ids, type=pa.int64()), pa.array(sequences, type=pa.binary())],
schema=schema,
)
return pa.RecordBatchReader.from_batches(schema, [batch])
def main():
lib = load_librype()
sequences = [
b"AAAAACCCCCAAAAACCCCCAAAAACCCCCAAAAACCCCCAAAAACCCCCAAAAACCCCCAAAAACCCCC",
b"AAAATTTTGGGGCCCCAAAATTTTGGGGCCCCAAAATTTTGGGGCCCC",
b"ACGT", ]
ids = [1, 2, 3]
k, w, salt = 16, 5, 0
print(f"PyArrow version: {pa.__version__}")
print(f"Parameters: k={k}, w={w}, salt=0x{salt:016x}")
print(f"Sequences: {len(sequences)}")
for i, seq in enumerate(sequences):
display = seq[:40].decode() + ("..." if len(seq) > 40 else "")
print(f" [{ids[i]}] len={len(seq)}: {display}")
print("\n=== Minimizer Set Extraction ===")
reader = make_input_reader(ids, sequences)
result = call_extraction(lib, lib.rype_extract_minimizer_set_arrow, reader, k, w, salt)
table = result.read_all()
print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")
for i in range(table.num_rows):
row_id = table.column("id")[i].as_py()
fwd = table.column("fwd_set")[i].as_py()
rc = table.column("rc_set")[i].as_py()
print(f" id={row_id}: fwd_set({len(fwd)} hashes), rc_set({len(rc)} hashes)")
if fwd:
print(f" fwd[0:3] = {['0x%016x' % h for h in fwd[:3]]}")
if rc:
print(f" rc[0:3] = {['0x%016x' % h for h in rc[:3]]}")
print("\n=== Strand Minimizers Extraction ===")
reader = make_input_reader(ids, sequences)
result = call_extraction(lib, lib.rype_extract_strand_minimizers_arrow, reader, k, w, salt)
table = result.read_all()
print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")
for i in range(table.num_rows):
row_id = table.column("id")[i].as_py()
fwd_h = table.column("fwd_hashes")[i].as_py()
fwd_p = table.column("fwd_positions")[i].as_py()
rc_h = table.column("rc_hashes")[i].as_py()
rc_p = table.column("rc_positions")[i].as_py()
print(f" id={row_id}: fwd({len(fwd_h)} minimizers), rc({len(rc_h)} minimizers)")
if fwd_h:
print(f" fwd[0]: hash=0x{fwd_h[0]:016x}, pos={fwd_p[0]}")
if rc_h:
print(f" rc[0]: hash=0x{rc_h[0]:016x}, pos={rc_p[0]}")
print("\nDone.")
if __name__ == "__main__":
main()