import contextlib
import logging
import os
import platform
from pathlib import Path
import numpy as np
import pytest
from bed_reader import create_bed, open_bed, subset_f64_f64, to_bed
def test_read1(shared_datadir) -> None:
    """Open a small fixture .bed file and spot-check metadata and genotype values."""
    import math

    bed_path = shared_datadir / "plink_sim_10s_100v_10pmiss.bed"
    with open_bed(bed_path) as bed:
        assert bed.iid_count == 10
        assert bed.fid[-1] == "0"
        assert bed.iid[-1] == "9"
        assert bed.shape == (10, 100)
        dense = bed.read(dtype="int8")
        assert dense.mean() == -13.142
        sparse_val = bed.read_sparse(dtype="int8")
        assert math.isclose(sparse_val.mean(), -13.142, rel_tol=1e-9)
        assert bed.chromosome[-1] == "1"
        assert bed.bp_position[-1] == 100
def test_write(tmp_path, shared_datadir) -> None:
    """Round-trip a .bed file through to_bed/open_bed, then check that illegal
    genotype values (fractional floats; int8 values other than 0/1/2/-127)
    are rejected by both the native and pure-Python writers.
    """
    in_file = shared_datadir / "plink_sim_10s_100v_10pmiss.bed"
    out_file = tmp_path / "out.bed"
    with open_bed(in_file) as bed:
        val0 = bed.read()
        properties0 = {
            "fid": bed.fid,
            "iid": bed.iid,
            "sid": bed.sid,
            "chromosome": bed.chromosome,
            "cm_position": bed.cm_position,
            "bp_position": bed.bp_position,
        }
        to_bed(out_file, val0, properties=properties0)
        with open_bed(out_file) as bed1:
            assert np.allclose(val0, bed1.read(), equal_nan=True)
            val_sparse = bed1.read_sparse()
            assert np.allclose(val0, val_sparse.toarray(), equal_nan=True)
            assert np.array_equal(bed1.fid, properties0["fid"])
            assert np.array_equal(bed1.iid, properties0["iid"])
            assert np.array_equal(bed1.sid, properties0["sid"])
            assert np.issubdtype(bed1.sid.dtype, np.str_)
            assert np.array_equal(bed1.chromosome, properties0["chromosome"])
            assert np.allclose(bed1.cm_position, properties0["cm_position"])
            assert np.allclose(bed1.bp_position, properties0["bp_position"])
        val_float = val0.astype("float")
        val_float[0, 0] = 0.5  # not a legal genotype value
        for force_python_only in [False, True]:
            with pytest.raises(ValueError):
                to_bed(
                    out_file,
                    val_float,
                    properties=properties0,
                    force_python_only=force_python_only,
                )
        # BUG FIX: these two statements were fused onto a single (invalid) line.
        val0[np.isnan(val0)] = 0
        val_int8 = val0.astype("int8")
        val_int8[0, 0] = -1  # not a legal int8 genotype value
        for force_python_only in [False, True]:
            with pytest.raises(ValueError):
                to_bed(
                    out_file,
                    val_int8,
                    properties=properties0,
                    force_python_only=force_python_only,
                )
def test_write_stream(tmp_path, shared_datadir) -> None:
    """Like test_write, but writes one SNP column at a time via create_bed,
    then checks that illegal genotype values are rejected by to_bed.
    """
    in_file = shared_datadir / "plink_sim_10s_100v_10pmiss.bed"
    out_file = tmp_path / "out.bed"
    with open_bed(in_file) as bed:
        val0 = bed.read()
        properties0 = {
            "fid": bed.fid,
            "iid": bed.iid,
            "sid": bed.sid,
            "chromosome": bed.chromosome,
            "cm_position": bed.cm_position,
            "bp_position": bed.bp_position,
        }
        with create_bed(
            out_file,
            iid_count=bed.iid_count,
            sid_count=bed.sid_count,
            properties=properties0,
        ) as bed_writer:
            for sid_index in range(bed.sid_count):
                bed_writer.write(val0[:, sid_index])
        with open_bed(out_file) as bed1:
            assert np.allclose(val0, bed1.read(), equal_nan=True)
            val_sparse = bed1.read_sparse()
            assert np.allclose(val0, val_sparse.toarray(), equal_nan=True)
            assert np.array_equal(bed1.fid, properties0["fid"])
            assert np.array_equal(bed1.iid, properties0["iid"])
            assert np.array_equal(bed1.sid, properties0["sid"])
            assert np.issubdtype(bed1.sid.dtype, np.str_)
            assert np.array_equal(bed1.chromosome, properties0["chromosome"])
            assert np.allclose(bed1.cm_position, properties0["cm_position"])
            assert np.allclose(bed1.bp_position, properties0["bp_position"])
        val_float = val0.astype("float")
        val_float[0, 0] = 0.5  # not a legal genotype value
        for force_python_only in [False, True]:
            with pytest.raises(ValueError):
                to_bed(
                    out_file,
                    val_float,
                    properties=properties0,
                    force_python_only=force_python_only,
                )
        # BUG FIX: these two statements were fused onto a single (invalid) line.
        val0[np.isnan(val0)] = 0
        val_int8 = val0.astype("int8")
        val_int8[0, 0] = -1  # not a legal int8 genotype value
        for force_python_only in [False, True]:
            with pytest.raises(ValueError):
                to_bed(
                    out_file,
                    val_int8,
                    properties=properties0,
                    force_python_only=force_python_only,
                )
def test_overrides(shared_datadir) -> None:
    """Check default metadata values against the saved .npz reference, and
    exercise the `properties=` override paths, including the error cases.
    """
    with open_bed(shared_datadir / "some_missing.bed") as bed:
        fid = bed.fid
        iid = bed.iid
        father = bed.father
        mother = bed.mother
        sex = bed.sex
        pheno = bed.pheno
        chromosome = bed.chromosome
        sid = bed.sid
        cm_position = bed.cm_position
        bp_position = bed.bp_position
        allele_1 = bed.allele_1
        allele_2 = bed.allele_2
    property_dict = np.load(shared_datadir / "some_missing.properties.npz")
    assert np.array_equal(property_dict["fid"], fid)
    assert np.array_equal(property_dict["iid"], iid)
    assert np.array_equal(property_dict["father"], father)
    assert np.array_equal(property_dict["mother"], mother)
    assert np.array_equal(property_dict["sex"], sex)
    assert np.array_equal(property_dict["pheno"], pheno)
    assert np.array_equal(property_dict["chromosome"], chromosome)
    assert np.array_equal(property_dict["sid"], sid)
    assert np.array_equal(property_dict["cm_position"], cm_position)
    assert np.array_equal(property_dict["bp_position"], bp_position)
    assert np.array_equal(property_dict["allele_1"], allele_1)
    assert np.array_equal(property_dict["allele_2"], allele_2)
    with pytest.raises(KeyError):
        open_bed(shared_datadir / "some_missing.bed", properties={"unknown": [3, 4, 4]})
    with open_bed(
        shared_datadir / "some_missing.bed", properties={"iid": None},
    ) as bed1:
        assert bed1.iid is None
    with open_bed(shared_datadir / "some_missing.bed", properties={"iid": []}) as bed1:
        assert np.issubdtype(bed1.iid.dtype, np.str_)
        assert len(bed1.iid) == 0
        with pytest.raises(ValueError):
            bed1.father
    with open_bed(
        shared_datadir / "some_missing.bed",
        properties={"sid": list(range(len(sid)))},
    ) as bed1:
        assert np.issubdtype(bed1.sid.dtype, np.str_)
        assert bed1.sid[0] == "0"
    with pytest.raises(ValueError):
        open_bed(
            shared_datadir / "some_missing.bed",
            properties={"sex": ["F" for i in range(len(sex))]},
        )
    # BUG FIX: this `with` statement was fused onto the closing paren above,
    # producing an invalid line.
    with open_bed(
        shared_datadir / "some_missing.bed",
        properties={"sid": np.array(list(range(len(sid))))},
    ) as bed1:
        assert np.issubdtype(bed1.sid.dtype, np.str_)
        assert bed1.sid[0] == "0"
    with pytest.raises(ValueError):
        open_bed(
            shared_datadir / "some_missing.bed",
            properties={"sid": np.array([(i, i) for i in range(len(sid))])},
        )
    with open_bed(
        shared_datadir / "some_missing.bed", properties={"sid": [1, 2, 3]},
    ) as bed1, pytest.raises(ValueError):
        bed1.chromosome
def test_str(shared_datadir) -> None:
    """The string form of an open_bed object names the class."""
    with open_bed(shared_datadir / "some_missing.bed") as bed:
        assert "open_bed(" in str(bed)
def test_bad_bed(shared_datadir) -> None:
    """A malformed .bed raises ValueError unless the format check is skipped."""
    bad_path = shared_datadir / "badfile.bed"
    with pytest.raises(ValueError):
        open_bed(bad_path)
    open_bed(bad_path, skip_format_check=True)  # no check, so no error
def test_bad_dtype_or_order(shared_datadir) -> None:
    """Unsupported dtypes and memory orders raise ValueError."""
    bed_path = shared_datadir / "some_missing.bed"
    with pytest.raises(ValueError):
        open_bed(bed_path).read(dtype=np.int32)
    with pytest.raises(ValueError):
        open_bed(bed_path).read(order="X")
    with pytest.raises(ValueError):
        open_bed(bed_path).read_sparse(dtype=np.int32)
def setting_generator(seq_dict, seed=9392):
    """Yield test-setting dicts built from `seq_dict` (key -> list of options).

    First phase: one setting per "round", rotating through each key's options
    with a per-key offset. Second phase: the full Cartesian product of all
    options, in a deterministic shuffled order. In both phases, an option equal
    to the string "leave_out" omits its key from the yielded dict.
    """
    import itertools

    from numpy.random import RandomState

    longest = max(len(options) for options in seq_dict.values())
    for round_index in range(longest):
        picked = {}
        for offset, (name, options) in enumerate(seq_dict.items()):
            choice = options[(round_index + offset) % len(options)]
            if not (isinstance(choice, str) and choice == "leave_out"):
                picked[name] = choice
        yield picked
    combos = list(itertools.product(*seq_dict.values()))
    RandomState(seed).shuffle(combos)
    for combo in combos:
        yield {
            name: choice
            for name, choice in itertools.zip_longest(seq_dict, combo)
            if not (isinstance(choice, str) and choice == "leave_out")
        }
def test_properties(shared_datadir) -> None:
    """Drive open_bed through many combinations of property overrides and
    iid/sid count hints, checking properties both before and after read().
    """
    file = shared_datadir / "plink_sim_10s_100v_10pmiss.bed"
    with open_bed(file) as bed:
        iid_list = bed.iid.tolist()
        sid_list = bed.sid.tolist()
        chromosome_list = bed.chromosome.tolist()
    test_count = 75
    seq_dict = {
        "iid": ["leave_out", None, iid_list, np.array(iid_list)],
        "iid_count": ["leave_out", len(iid_list)],
        "iid_before_read": [False, True],
        "iid_after_read": [False, True],
        "sid": ["leave_out", None, sid_list, np.array(sid_list)],
        "sid_count": [None, len(sid_list)],
        "sid_before_read": [False, True],
        "sid_after_read": [False, True],
        "chromosome": ["leave_out", None, chromosome_list, np.array(chromosome_list)],
        "chromosome_before_read": [False, True],
        "chromosome_after_read": [False, True],
    }

    def _not_set_to_none(settings, key):
        # True when the property was left at its default (not overridden to None).
        return key not in settings or settings[key] is not None

    for test_index, settings in enumerate(setting_generator(seq_dict)):
        if test_index >= test_count:
            break
        with open_bed(
            file,
            iid_count=settings.get("iid_count"),
            sid_count=settings.get("sid_count"),
            properties={
                k: v for k, v in settings.items() if k in {"iid", "sid", "chromosome"}
            },
        ) as bed:
            # BUG FIX: previously logged the constant `test_count` (always 75)
            # instead of the running test index.
            logging.info(f"Test {test_index}")
            if settings["iid_before_read"]:
                if _not_set_to_none(settings, "iid"):
                    assert np.array_equal(bed.iid, iid_list)
                else:
                    assert bed.iid is None
            if settings["sid_before_read"]:
                if _not_set_to_none(settings, "sid"):
                    assert np.array_equal(bed.sid, sid_list)
                else:
                    assert bed.sid is None
            if settings["chromosome_before_read"]:
                if _not_set_to_none(settings, "chromosome"):
                    assert np.array_equal(bed.chromosome, chromosome_list)
                else:
                    assert bed.chromosome is None
            val = bed.read()
            assert val.shape == (
                len(iid_list),
                len(sid_list),
            )
            val_sparse = bed.read_sparse()
            assert np.allclose(val, val_sparse.toarray(), equal_nan=True)
            if settings["iid_after_read"]:
                if _not_set_to_none(settings, "iid"):
                    assert np.array_equal(bed.iid, iid_list)
                else:
                    assert bed.iid is None
            if settings["sid_after_read"]:
                if _not_set_to_none(settings, "sid"):
                    assert np.array_equal(bed.sid, sid_list)
                else:
                    assert bed.sid is None
            if settings["chromosome_after_read"]:
                if _not_set_to_none(settings, "chromosome"):
                    assert np.array_equal(bed.chromosome, chromosome_list)
                else:
                    assert bed.chromosome is None
def test_c_reader_bed(shared_datadir) -> None:
    """Compare native and Python readers against the reference matrix with count_A1=False."""
    for force_python_only, format in [(False, "csc"), (True, "csr")]:
        bed = open_bed(shared_datadir / "some_missing.bed", count_A1=False)
        val = bed.read(order="F", force_python_only=force_python_only)
        assert val.dtype == np.float32
        ref_val = reference_val(shared_datadir)
        ref_val = ref_val * -1 + 2  # count_A1=False flips the allele coding
        assert np.allclose(ref_val, val, rtol=1e-05, atol=1e-05, equal_nan=True)
        val_sparse = bed.read_sparse(format=format)
        assert val_sparse.dtype == np.float32
        assert np.allclose(
            ref_val, val_sparse.toarray(), rtol=1e-05, atol=1e-05, equal_nan=True,
        )
        val = bed.read(order="F", dtype="int8", force_python_only=False)
        assert val.dtype == np.int8
        ref_val[ref_val != ref_val] = -127  # NaN -> int8 missing marker
        ref_val = ref_val.astype("int8")
        assert np.all(ref_val == val)
        del bed
        with open_bed(shared_datadir / "some_missing.bed") as bed:
            val = bed.read(
                order="F", dtype="float64", force_python_only=force_python_only,
            )
            ref_val = reference_val(shared_datadir)
            assert np.allclose(ref_val, val, rtol=1e-05, atol=1e-05, equal_nan=True)
            val_sparse = bed.read_sparse(dtype="float64")
            assert np.allclose(
                ref_val, val_sparse.toarray(), rtol=1e-05, atol=1e-05, equal_nan=True,
            )
def reference_val(shared_datadir):
    """Load the saved reference genotype matrix for 'some_missing.bed'."""
    npy_path = shared_datadir / "some_missing.val.npy"
    return np.load(npy_path)
def test_bed_int8(tmp_path, shared_datadir) -> None:
    """Read int8 in both orders, then round-trip through to_bed with every
    combination of count_A1, major axis, and writer implementation."""
    with open_bed(shared_datadir / "some_missing.bed") as bed:
        for force_python_only in [True, False]:
            for order, format in [("F", "csc"), ("C", "csr")]:
                val = bed.read(
                    dtype="int8", force_python_only=force_python_only, order=order,
                )
                assert val.dtype == np.int8
                if order == "C":
                    assert val.flags["C_CONTIGUOUS"]
                else:
                    assert val.flags["F_CONTIGUOUS"]
                ref_val = reference_val(shared_datadir)
                ref_val[ref_val != ref_val] = -127  # NaN -> int8 missing marker
                ref_val = ref_val.astype("int8")
                assert np.array_equal(ref_val, val)
                output = str(tmp_path / "int8.bed")
                for count_A1 in [False, True]:
                    for major in ["individual", "SNP"]:
                        to_bed(
                            output,
                            ref_val,
                            count_A1=count_A1,
                            major=major,
                            force_python_only=force_python_only,
                        )
                        with open_bed(output, count_A1=count_A1) as bed2:
                            val2 = bed2.read(
                                dtype="int8", force_python_only=force_python_only,
                            )
                            assert np.array_equal(val2, ref_val)
                            val_sparse = bed2.read_sparse(dtype="int8", format=format)
                            assert np.allclose(val_sparse.toarray(), ref_val)
def test_write1_bed_f64cpp(tmp_path, shared_datadir) -> None:
    """Write float64 row-subsets of varying sizes (including zero rows) and read them back."""
    with open_bed(shared_datadir / "some_missing.bed") as bed:
        for iid_index in [0, 1, 5]:
            for force_python_only, format in [(False, "csc"), (True, "csr")]:
                val_sparse = bed.read_sparse(
                    np.s_[0:iid_index, :], dtype=np.float64, format=format,
                )
                assert val_sparse.shape == (iid_index, 100)
                val = bed.read(
                    np.s_[0:iid_index, :],
                    order="F",
                    dtype=np.float64,
                    force_python_only=force_python_only,
                )
                assert val.shape == (iid_index, 100)
                output = str(tmp_path / f"toydata.F64cpp.{iid_index}.bed")
                to_bed(output, val, count_A1=False)
                val2 = open_bed(output, count_A1=False).read(dtype="float64")
                assert np.allclose(val, val2, equal_nan=True)
                assert np.allclose(val_sparse.toarray(), val2, equal_nan=True)
def test_write1_bed_f64cpp_stream(tmp_path, shared_datadir) -> None:
    """Streaming variant of test_write1_bed_f64cpp, writing one SNP at a time."""
    with open_bed(shared_datadir / "some_missing.bed") as bed:
        for iid_index in [0, 1, 5]:
            for force_python_only, format in [(False, "csc"), (True, "csr")]:
                val_sparse = bed.read_sparse(
                    np.s_[0:iid_index, :], dtype=np.float64, format=format,
                )
                assert val_sparse.shape == (iid_index, 100)
                val = bed.read(
                    np.s_[0:iid_index, :],
                    order="F",
                    dtype=np.float64,
                    force_python_only=force_python_only,
                )
                assert val.shape == (iid_index, 100)
                output = str(tmp_path / f"toydata.F64cpp.{iid_index}.bed")
                with create_bed(
                    output, iid_count=iid_index, sid_count=100, count_A1=False,
                ) as bed_writer:
                    for sid_index in range(100):
                        bed_writer.write(val[:, sid_index])
                val2 = open_bed(output, count_A1=False).read(dtype="float64")
                assert np.allclose(val, val2, equal_nan=True)
                assert np.allclose(val_sparse.toarray(), val2, equal_nan=True)
def test_write1_x_x_cpp(tmp_path, shared_datadir) -> None:
    """Round-trip with every combination of count_A1, memory order, and float dtype."""
    for count_A1 in [False, True]:
        with open_bed(shared_datadir / "some_missing.bed", count_A1=count_A1) as bed:
            for order, format in [("F", "csc"), ("C", "csr")]:
                for dtype in [np.float32, np.float64]:
                    val = bed.read(order=order, dtype=dtype)
                    properties = bed.properties
                    val[-1, 0] = float("NAN")
                    bits = "32" if dtype == np.float32 else "64"
                    output = str(tmp_path / f"toydata.{order}{bits}.cpp")
                    to_bed(output, val, properties=properties, count_A1=count_A1)
                    val2 = open_bed(output, count_A1=count_A1).read(dtype=dtype)
                    assert np.allclose(val, val2, equal_nan=True)
                    val_sparse = open_bed(output, count_A1=count_A1).read_sparse(
                        dtype=dtype, format=format,
                    )
                    assert np.allclose(val, val_sparse.toarray(), equal_nan=True)
def test_write1_x_x_cpp_stream(tmp_path, shared_datadir) -> None:
    """Streaming variant of test_write1_x_x_cpp, writing one SNP column at a time."""
    for count_A1 in [False, True]:
        with open_bed(shared_datadir / "some_missing.bed", count_A1=count_A1) as bed:
            for order, format in [("F", "csc"), ("C", "csr")]:
                for dtype in [np.float32, np.float64]:
                    val = bed.read(order=order, dtype=dtype)
                    properties = bed.properties
                    val[-1, 0] = float("NAN")
                    bits = "32" if dtype == np.float32 else "64"
                    output = str(tmp_path / f"toydata.{order}{bits}.cpp")
                    with create_bed(
                        output,
                        iid_count=val.shape[0],
                        sid_count=val.shape[1],
                        properties=properties,
                        count_A1=count_A1,
                    ) as bed_writer:
                        for sid_index in range(val.shape[1]):
                            bed_writer.write(val[:, sid_index])
                    val2 = open_bed(output, count_A1=count_A1).read(dtype=dtype)
                    assert np.allclose(val, val2, equal_nan=True)
                    val_sparse = open_bed(output, count_A1=count_A1).read_sparse(
                        dtype=dtype, format=format,
                    )
                    assert np.allclose(val, val_sparse.toarray(), equal_nan=True)
def test_respect_read_inputs(shared_datadir) -> None:
    """read()/read_sparse() must honor the requested order, dtype, and sparse format."""
    from scipy import sparse

    ref_val_float = reference_val(shared_datadir)
    missing_filled = ref_val_float.copy()
    missing_filled[ref_val_float != ref_val_float] = -127
    ref_val_int8 = missing_filled.astype("int8")
    with open_bed(shared_datadir / "some_missing.bed") as bed:
        for order, format in [("F", "csc"), ("C", "csr")]:
            for dtype in [np.int8, np.float32, np.float64]:
                for force_python_only in [True, False]:
                    val = bed.read(
                        order=order, dtype=dtype, force_python_only=force_python_only,
                    )
                    assert val.dtype == dtype
                    if order == "C":
                        assert val.flags["C_CONTIGUOUS"]
                    else:
                        assert val.flags["F_CONTIGUOUS"]
                    ref_val = ref_val_int8 if dtype == np.int8 else ref_val_float
                    assert np.allclose(ref_val, val, equal_nan=True)
                    val_sparse = bed.read_sparse(dtype=dtype, format=format)
                    assert val_sparse.dtype == dtype
                    expected_type = (
                        sparse.csc_matrix if format == "csc" else sparse.csr_matrix
                    )
                    assert isinstance(val_sparse, expected_type)
                    assert np.allclose(ref_val, val_sparse.toarray(), equal_nan=True)
def test_threads(shared_datadir) -> None:
    """Reading with 1 thread or 4 threads produces identical results."""
    ref_val_float = reference_val(shared_datadir)
    missing_filled = ref_val_float.copy()
    missing_filled[ref_val_float != ref_val_float] = -127
    ref_val_int8 = missing_filled.astype("int8")
    for num_threads in [1, 4]:
        with open_bed(
            shared_datadir / "some_missing.bed", num_threads=num_threads,
        ) as bed:
            val = bed.read(dtype="int8")
            assert np.allclose(ref_val_int8, val, equal_nan=True)
            val_sparse = bed.read_sparse(dtype="int8")
            assert np.allclose(ref_val_int8, val_sparse.toarray(), equal_nan=True)
def test_write12(tmp_path) -> None:
    """Write random matrices of many shapes (including empty) and read them
    back, with and without numeric column properties and a strided subset."""
    logging.info("starting 'test_writes'")
    np.random.seed(0)
    output_template = str(tmp_path / "writes.{0}.bed")
    file_number = 0
    for row_count in [0, 5, 2, 1]:
        for col_count in [0, 4, 2, 1]:
            val = np.random.randint(0, 4, size=(row_count, col_count)) * 1.0
            val[val == 3] = np.nan  # 3 encodes "missing"
            row0 = ["0", "1", "2", "3", "4"][:row_count]
            row1 = ["0", "1", "2", "3", "4"][:row_count]
            col = ["s0", "s1", "s2", "s3", "s4"][:col_count]
            for is_none in [True, False]:
                properties = {"fid": row0, "iid": row1, "sid": col}
                if is_none:
                    col_prop012 = list(range(5))[:col_count]
                    properties["chromosome"] = col_prop012
                    properties["bp_position"] = col_prop012
                    properties["cm_position"] = col_prop012
                else:
                    col_prop012 = None
                filename = output_template.format(file_number)
                logging.info(filename)
                file_number += 1
                to_bed(filename, val, properties=properties)
                for subsetter in [None, np.s_[::2, ::3]]:
                    with open_bed(filename) as bed:
                        val2 = bed.read(index=subsetter, order="C", dtype="float32")
                        if subsetter is None:
                            expected = val
                        else:
                            expected = val[subsetter[0], :][:, subsetter[1]]
                        assert np.allclose(val2, expected, equal_nan=True)
                        assert np.array_equal(bed.fid, np.array(row0, dtype="str"))
                        assert np.array_equal(bed.iid, np.array(row1, dtype="str"))
                        assert np.array_equal(bed.sid, np.array(col, dtype="str"))
                        if col_prop012 is not None:
                            assert np.array_equal(
                                bed.chromosome, np.array(col_prop012, dtype="str"),
                            )
                            assert np.array_equal(
                                bed.bp_position, np.array(col_prop012),
                            )
                            assert np.array_equal(
                                bed.cm_position, np.array(col_prop012),
                            )
                with contextlib.suppress(Exception):
                    os.remove(filename)
    logging.info("done with 'test_writes'")
def test_writes_small(tmp_path) -> None:
    """Write a tiny matrix with every metadata property set and read it all back."""
    output_file = tmp_path / "small.bed"
    val = [[1.0, 0, np.nan, 0], [2, 0, np.nan, 2], [0, 1, 2, 0]]
    properties = {
        "fid": ["fid1", "fid1", "fid2"],
        "iid": ["iid1", "iid2", "iid3"],
        "father": ["iid23", "iid23", "iid22"],
        "mother": ["iid34", "iid34", "iid33"],
        "sex": [1, 2, 0],
        "pheno": ["red", "red", "blue"],
        "chromosome": ["1", "1", "5", "Y"],
        "sid": ["sid1", "sid2", "sid3", "sid4"],
        "cm_position": [100.4, 2000.5, 4000.7, 7000.9],
        "bp_position": [1, 100, 1000, 1004],
        "allele_1": ["A", "T", "A", "T"],
        "allele_2": ["A", "C", "C", "G"],
    }
    to_bed(output_file, val, properties=properties)
    with open_bed(output_file) as bed:
        assert np.allclose(bed.read(), val, equal_nan=True)
        for key, value in bed.properties.items():
            expected = properties[key]
            assert np.array_equal(value, expected) or np.allclose(value, expected)
def test_writes_small_stream(tmp_path) -> None:
    """Stream a tiny matrix column-by-column with create_bed and read it all back."""
    output_file = tmp_path / "small.bed"
    val = np.array([[1.0, 0, np.nan, 0], [2, 0, np.nan, 2], [0, 1, 2, 0]])
    properties = {
        "fid": ["fid1", "fid1", "fid2"],
        "iid": ["iid1", "iid2", "iid3"],
        "father": ["iid23", "iid23", "iid22"],
        "mother": ["iid34", "iid34", "iid33"],
        "sex": [1, 2, 0],
        "pheno": ["red", "red", "blue"],
        "chromosome": ["1", "1", "5", "Y"],
        "sid": ["sid1", "sid2", "sid3", "sid4"],
        "cm_position": [100.4, 2000.5, 4000.7, 7000.9],
        "bp_position": [1, 100, 1000, 1004],
        "allele_1": ["A", "T", "A", "T"],
        "allele_2": ["A", "C", "C", "G"],
    }
    with create_bed(
        output_file,
        iid_count=val.shape[0],
        sid_count=val.shape[1],
        properties=properties,
    ) as bed_writer:
        for column_data in val.T:
            bed_writer.write(column_data)
    with open_bed(output_file) as bed:
        assert np.allclose(bed.read(), val, equal_nan=True)
        for key, value in bed.properties.items():
            expected = properties[key]
            assert np.array_equal(value, expected) or np.allclose(value, expected)
def test_index(shared_datadir) -> None:
    """Exercise every index form accepted by read()/read_sparse(): whole matrix,
    scalar, tuple, negative indices, boolean masks, and slices.
    """
    ref_val_float = reference_val(shared_datadir)
    with open_bed(shared_datadir / "some_missing.bed") as bed:
        val = bed.read()
        assert np.allclose(ref_val_float, val, equal_nan=True)
        val_sparse = bed.read_sparse()
        assert np.allclose(ref_val_float, val_sparse.toarray(), equal_nan=True)
        # BUG FIX: this scalar-index read was duplicated verbatim; one copy removed.
        val = bed.read(2)
        assert np.allclose(ref_val_float[:, [2]], val, equal_nan=True)
        val_sparse = bed.read_sparse(2)
        assert np.allclose(ref_val_float[:, [2]], val_sparse.toarray(), equal_nan=True)
        val = bed.read((None, 2))
        assert np.allclose(ref_val_float[:, [2]], val, equal_nan=True)
        val_sparse = bed.read_sparse((None, 2))
        assert np.allclose(ref_val_float[:, [2]], val_sparse.toarray(), equal_nan=True)
        val = bed.read((1, 2))
        assert np.allclose(ref_val_float[[1], [2]], val, equal_nan=True)
        val_sparse = bed.read_sparse((1, 2))
        assert np.allclose(
            ref_val_float[[1], [2]], val_sparse.toarray(), equal_nan=True,
        )
        val = bed.read([2, -2])
        assert np.allclose(ref_val_float[:, [2, -2]], val, equal_nan=True)
        val_sparse = bed.read_sparse([2, -2])
        assert np.allclose(
            ref_val_float[:, [2, -2]], val_sparse.toarray(), equal_nan=True,
        )
        val = bed.read(([1, -1], [2, -2]))
        assert np.allclose(ref_val_float[[1, -1], :][:, [2, -2]], val, equal_nan=True)
        val_sparse = bed.read_sparse(([1, -1], [2, -2]))
        assert np.allclose(
            ref_val_float[[1, -1], :][:, [2, -2]], val_sparse.toarray(), equal_nan=True,
        )
        iid_bool = ([False, False, True] * bed.iid_count)[: bed.iid_count]
        sid_bool = ([True, False, True] * bed.sid_count)[: bed.sid_count]
        val = bed.read(sid_bool)
        assert np.allclose(ref_val_float[:, sid_bool], val, equal_nan=True)
        val_sparse = bed.read_sparse(sid_bool)
        assert np.allclose(
            ref_val_float[:, sid_bool], val_sparse.toarray(), equal_nan=True,
        )
        val = bed.read((iid_bool, sid_bool))
        assert np.allclose(ref_val_float[iid_bool, :][:, sid_bool], val, equal_nan=True)
        val_sparse = bed.read_sparse((iid_bool, sid_bool))
        # BUG FIX: this sparse result was read but never checked.
        assert np.allclose(
            ref_val_float[iid_bool, :][:, sid_bool],
            val_sparse.toarray(),
            equal_nan=True,
        )
        val = bed.read((1, sid_bool))
        assert np.allclose(ref_val_float[[1], :][:, sid_bool], val, equal_nan=True)
        val_sparse = bed.read_sparse((1, sid_bool))
        assert np.allclose(
            ref_val_float[[1], :][:, sid_bool], val_sparse.toarray(), equal_nan=True,
        )
        slicer = np.s_[::2, ::3]
        val = bed.read(slicer[1])
        assert np.allclose(ref_val_float[:, slicer[1]], val, equal_nan=True)
        val_sparse = bed.read_sparse(slicer[1])
        assert np.allclose(
            ref_val_float[:, slicer[1]], val_sparse.toarray(), equal_nan=True,
        )
        val = bed.read(slicer)
        assert np.allclose(ref_val_float[slicer], val, equal_nan=True)
        val_sparse = bed.read_sparse(slicer)
        assert np.allclose(ref_val_float[slicer], val_sparse.toarray(), equal_nan=True)
        val = bed.read((1, slicer[1]))
        assert np.allclose(ref_val_float[[1], slicer[1]], val, equal_nan=True)
        val_sparse = bed.read_sparse((1, slicer[1]))
        assert np.allclose(
            ref_val_float[[1], slicer[1]], val_sparse.toarray(), equal_nan=True,
        )
def test_shape(shared_datadir) -> None:
    """shape reports (iid_count, sid_count)."""
    with open_bed(shared_datadir / "plink_sim_10s_100v_10pmiss.bed") as bed:
        assert bed.shape == (10, 100)
def test_zero_files(tmp_path) -> None:
    """Matrices with zero rows and/or zero columns round-trip correctly,
    including a rewrite with edited properties."""
    for force_python_only, format in [(False, "csc"), (True, "csr")]:
        for iid_count in [3, 0]:
            for sid_count in [0, 5]:
                for dtype in [np.int8, np.float32, np.float64]:
                    val = np.zeros((iid_count, sid_count), dtype=dtype)
                    if iid_count * sid_count > 0:
                        val[0, 0] = 2
                        # use the dtype's own missing-value encoding
                        val[0, 1] = -127 if np.dtype(dtype) == np.int8 else np.nan
                    filename = str(tmp_path / "zero_files.bed")
                    to_bed(filename, val, force_python_only=force_python_only)
                    with open_bed(filename) as bed2:
                        val2 = bed2.read(dtype=dtype)
                        assert np.allclose(val, val2, equal_nan=True)
                        val_sparse = bed2.read_sparse(dtype=dtype, format=format)
                        assert np.allclose(val, val_sparse.toarray(), equal_nan=True)
                        properties2 = bed2.properties
                        for prop in properties2.values():
                            assert len(prop) in {iid_count, sid_count}
                    if iid_count > 0:
                        properties2["iid"][0] = "iidx"
                    if sid_count > 0:
                        properties2["sid"][0] = "sidx"
                    to_bed(
                        filename,
                        val2,
                        properties=properties2,
                        force_python_only=force_python_only,
                    )
                    with open_bed(filename) as bed3:
                        val3 = bed3.read(dtype=dtype)
                        assert np.allclose(val, val3, equal_nan=True)
                        val_sparse = bed3.read_sparse(dtype=dtype, format=format)
                        assert np.allclose(val, val_sparse.toarray(), equal_nan=True)
                        properties3 = bed3.properties
                        for key2, value_list2 in properties2.items():
                            assert np.array_equal(value_list2, properties3[key2])
def test_iid_sid_count(shared_datadir) -> None:
    """Supplying iid_count/sid_count hints must not change the reported shape."""
    bed_path = shared_datadir / "plink_sim_10s_100v_10pmiss.bed"
    iid_count_ref, sid_count_ref = open_bed(bed_path).shape
    expected = (iid_count_ref, sid_count_ref)
    assert open_bed(bed_path, iid_count=iid_count_ref).shape == expected
    assert open_bed(bed_path, sid_count=sid_count_ref).shape == expected
    assert (
        open_bed(bed_path, iid_count=iid_count_ref, sid_count=sid_count_ref).shape
        == expected
    )
def test_sample_file() -> None:
    """sample_file() fetches a sample .bed that open_bed can open."""
    from bed_reader import open_bed, sample_file

    file_name = sample_file("small.bed")
    with open_bed(file_name):
        pass
def test_coverage2(shared_datadir, tmp_path) -> None:
    """Miscellaneous error paths: mismatched property lengths, non-contiguous
    arrays, and a non-numeric dtype passed to to_bed."""
    with open_bed(
        shared_datadir / "plink_sim_10s_100v_10pmiss.bed", properties={"iid": None},
    ) as bed:
        assert bed.iid is None
    with pytest.raises(ValueError):
        open_bed(
            shared_datadir / "plink_sim_10s_100v_10pmiss.bed",
            properties={"iid": [1, 2, 3], "mother": [1, 2]},
        )
    val = np.zeros((3, 5))[::2]  # strided view: neither C- nor F-contiguous
    assert not val.flags["C_CONTIGUOUS"]
    assert not val.flags["F_CONTIGUOUS"]
    with pytest.raises(ValueError):
        to_bed(tmp_path / "ignore", val)
    val = np.zeros((3, 5), dtype=np.str_)
    with pytest.raises(ValueError):
        to_bed(tmp_path / "ignore", val)
@pytest.mark.filterwarnings("ignore:invalid value encountered in cast:RuntimeWarning")
def test_coverage3(shared_datadir, tmp_path) -> None:
    """NaN in numeric properties is read back as 0; 1-D values are rejected by
    to_bed while a 2-D float16 row round-trips.
    """
    with open_bed(
        shared_datadir / "small.bed", properties={"sex": [1.0, np.nan, 1.0, 2.0]},
    ) as bed:
        assert np.array_equal(bed.sex, np.array([1, 0, 1, 2]))
    with open_bed(
        shared_datadir / "small.bed",
        properties={"cm_position": [1000.0, np.nan, 2000.0, 3000.0]},
    ) as bed:
        assert np.array_equal(bed.cm_position, np.array([1000, 0, 2000, 3000]))
    # BUG FIX: the local was named `list`, shadowing the builtin.
    row = [1.0, 0, np.nan, 0]
    output_file = tmp_path / "1d.bed"
    with pytest.raises(ValueError):
        to_bed(output_file, row)  # 1-D input is not allowed
    to_bed(output_file, np.array([row], dtype=np.float16))
    with open_bed(output_file) as bed:
        assert np.allclose(bed.read(), [row], equal_nan=True)
        assert np.allclose(bed.read_sparse().toarray(), [row], equal_nan=True)
def test_nones(shared_datadir, tmp_path) -> None:
    """Properties set to None are suppressed for both reading and writing."""
    properties = {
        "father": None,
        "mother": None,
        "sex": None,
        "pheno": None,
        "allele_1": None,
        "allele_2": None,
    }
    with open_bed(shared_datadir / "small.bed", properties=properties) as bed:
        assert np.array_equal(bed.iid, ["iid1", "iid2", "iid3"])
        assert bed.father is None
    val = [[1.0, 0, np.nan, 0], [2, 0, np.nan, 2], [0, 1, 2, 0]]
    out_file = tmp_path / "testnones.bed"
    to_bed(out_file, val, properties=properties)
def test_fam_bim_filepath(shared_datadir, tmp_path) -> None:
    """Custom fam/bim file paths are honored by both to_bed and open_bed."""
    with open_bed(shared_datadir / "small.bed") as bed:
        val = bed.read()
        properties = bed.properties
    output_file = tmp_path / "small.deb"
    fam_file = tmp_path / "small.maf"
    bim_file = tmp_path / "small.mib"
    to_bed(
        output_file,
        val,
        properties=properties,
        fam_filepath=fam_file,
        bim_filepath=bim_file,
    )
    assert output_file.exists()
    assert fam_file.exists()
    assert bim_file.exists()
    with open_bed(output_file, fam_filepath=fam_file, bim_filepath=bim_file) as deb:
        val2 = deb.read()
        assert np.allclose(val, val2, equal_nan=True)
        val_sparse = deb.read_sparse()
        assert np.allclose(val, val_sparse.toarray(), equal_nan=True)
        properties2 = deb.properties
        # BUG FIX: the comparison result was computed but never asserted.
        for key in properties:
            assert np.array_equal(properties[key], properties2[key]) or np.allclose(
                properties[key], properties2[key],
            )
def test_write_nan_properties(shared_datadir, tmp_path) -> None:
    """NaN in chromosome/cm_position is written and read back as 0."""
    with open_bed(shared_datadir / "small.bed") as bed:
        val = bed.read()
        properties = bed.properties
        chrom = bed.chromosome.copy()
        chrom[bed.chromosome == "Y"] = 0
        chrom = np.array(chrom, dtype="float")
        chrom2 = chrom.copy()
        chrom2[chrom2 == 0] = np.nan  # NaN marks the former "Y" entry
        cm_p = bed.cm_position.copy()
        cm_p[cm_p < 3000] = 0
        cm_p2 = cm_p.copy()
        cm_p2[cm_p == 0] = np.nan
    properties["chromosome"] = chrom2
    properties["cm_position"] = cm_p2
    output_file = tmp_path / "nan.bed"
    to_bed(output_file, val, properties=properties)
    with open_bed(output_file) as bed2:
        assert np.array_equal(bed2.chromosome, ["1.0", "1.0", "5.0", "0"])
        assert np.array_equal(bed2.cm_position, cm_p)
    with open_bed(
        shared_datadir / "small.bed",
        properties={"chromosome": chrom2, "cm_position": cm_p2},
    ) as bed3:
        assert np.array_equal(bed3.chromosome, ["1.0", "1.0", "5.0", "0"])
        assert np.array_equal(bed3.cm_position, cm_p)
def test_write_nan_properties_stream(shared_datadir, tmp_path) -> None:
    """Streaming variant of test_write_nan_properties, using create_bed."""
    with open_bed(shared_datadir / "small.bed") as bed:
        val = bed.read()
        properties = bed.properties
        chrom = bed.chromosome.copy()
        chrom[bed.chromosome == "Y"] = 0
        chrom = np.array(chrom, dtype="float")
        chrom2 = chrom.copy()
        chrom2[chrom2 == 0] = np.nan  # NaN marks the former "Y" entry
        cm_p = bed.cm_position.copy()
        cm_p[cm_p < 3000] = 0
        cm_p2 = cm_p.copy()
        cm_p2[cm_p == 0] = np.nan
    properties["chromosome"] = chrom2
    properties["cm_position"] = cm_p2
    output_file = tmp_path / "nan.bed"
    with create_bed(
        output_file,
        iid_count=val.shape[0],
        sid_count=val.shape[1],
        properties=properties,
    ) as bed_writer:
        for column_data in val.T:
            bed_writer.write(column_data)
    with open_bed(output_file) as bed2:
        assert np.array_equal(bed2.chromosome, ["1.0", "1.0", "5.0", "0"])
        assert np.array_equal(bed2.cm_position, cm_p)
    with open_bed(
        shared_datadir / "small.bed",
        properties={"chromosome": chrom2, "cm_position": cm_p2},
    ) as bed3:
        assert np.array_equal(bed3.chromosome, ["1.0", "1.0", "5.0", "0"])
        assert np.array_equal(bed3.cm_position, cm_p)
def test_env(shared_datadir) -> None:
    """MKL_NUM_THREADS must be numeric; a bad value raises ValueError.
    Skipped on macOS; the original value is always restored."""
    if platform.system() == "Darwin":
        return
    key = "MKL_NUM_THREADS"
    original_val = os.environ.get(key)
    try:
        for thread_setting in ["1", "10"]:
            os.environ[key] = thread_setting
            with open_bed(shared_datadir / "some_missing.bed") as bed:
                _ = bed.read(np.s_[:100, :100])
                _ = bed.read_sparse(np.s_[:100, :100])
        os.environ[key] = "BADVALUE"
        with pytest.raises(ValueError), open_bed(
            shared_datadir / "some_missing.bed",
        ) as bed:
            _ = bed.read(np.s_[:100, :100])
        with pytest.raises(ValueError), open_bed(
            shared_datadir / "some_missing.bed",
        ) as bed:
            _ = bed.read_sparse(np.s_[:100, :100])
    finally:
        if original_val is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = original_val
def test_noncontig_indexes(shared_datadir) -> None:
    """read() accepts non-contiguous index arrays; the low-level
    subset_f64_f64 helper rejects a mismatched output buffer."""
    with open_bed(shared_datadir / "some_missing.bed") as bed:
        full_index = np.arange(bed.iid_count)
        assert full_index.flags["C_CONTIGUOUS"]
        strided = full_index[::2]
        assert not strided.flags["C_CONTIGUOUS"]
        val = bed.read((strided, -2))
        full_index = np.arange(val.shape[0])
        assert full_index.flags["C_CONTIGUOUS"]
        strided = full_index[::2]
        assert not strided.flags["C_CONTIGUOUS"]
        val_out = np.zeros((len(strided), 0))
        with pytest.raises(ValueError):
            subset_f64_f64(
                val.reshape(-1, bed.sid_count, 1), strided, [], val_out, 1,
            )
def test_bed_reading_example() -> None:
    """The documented read() example runs without error."""
    import numpy as np

    from bed_reader import open_bed, sample_file

    file_name = sample_file("small.bed")
    with open_bed(file_name, count_A1=False) as bed:
        bed.read(index=np.s_[:, :3], dtype="int8", order="C", num_threads=1)
def test_sparse() -> None:
    """Smoke-test the documented sparse-read example against the bundled sample file."""
    import numpy as np

    from bed_reader import open_bed, sample_file

    bed_path = sample_file("small.bed")
    with open_bed(bed_path, count_A1=False) as bed:
        bed.read_sparse(index=np.s_[:, :3], dtype="int8")
def test_convert_to_dtype() -> None:
    """Exercise _convert_to_dtype: str conversion always works; int/float only for parseable values.

    Each case lists the original strings and the expected results for str,
    int32, and float32; ``None`` marks a combination that must raise ValueError.
    """
    from bed_reader._open_bed import _convert_to_dtype

    # Renamed from 'input' to avoid shadowing the builtin.
    cases = [
        # original, expected str, expected int, expected float
        [["a", "b", "c"], ["a", "b", "c"], None, None],
        [["1.0", "2.0", "3.0"], ["1.0", "2.0", "3.0"], [1, 2, 3], [1.0, 2.0, 3.0]],
        [["1.0", "2.0", "3.5"], ["1.0", "2.0", "3.5"], None, [1.0, 2.0, 3.5]],
        [["1", "2", "3"], ["1", "2", "3"], [1, 2, 3], [1.0, 2.0, 3.0]],
        [["1", "A", "3"], ["1", "A", "3"], None, None],
    ]
    cases = [
        [np.array(inner) if inner is not None else None for inner in outer]
        for outer in cases
    ]
    for ori, exp_str, exp_int, exp_float in cases:
        for dtype, exp in (
            (np.str_, exp_str),
            (np.int32, exp_int),
            (np.float32, exp_float),
        ):
            # Keep the try body minimal: only the conversion may raise;
            # the comparison belongs in the success path.
            try:
                actual = _convert_to_dtype(ori, dtype)
            except ValueError:
                assert exp is None
            else:
                assert np.array_equal(actual, exp)
def test_stream_i8(shared_datadir, tmp_path) -> None:
    """Round-trip int8 data through the streaming writer under all setting combinations."""
    with open_bed(shared_datadir / "some_missing.bed") as source:
        expected = source.read(dtype=np.int8)
        expected_properties = source.properties
        out_bed = tmp_path / "some_missing.bed"
        location_pairs = (
            (None, None),
            (tmp_path / "some_missing.fam2", tmp_path / "some_missing.bim2"),
        )
        for count_A1 in (True, False):
            for fam_location, bim_location in location_pairs:
                for force_python_only in (True, False):
                    # Write one SNP column per call.
                    with create_bed(
                        out_bed,
                        iid_count=expected.shape[0],
                        sid_count=expected.shape[1],
                        properties=expected_properties,
                        fam_location=fam_location,
                        bim_location=bim_location,
                        count_A1=count_A1,
                        force_python_only=force_python_only,
                    ) as writer:
                        for column in expected.T:
                            writer.write(column)
                    # Read it back and compare values and metadata.
                    with open_bed(
                        out_bed,
                        count_A1=count_A1,
                        fam_filepath=fam_location,
                        bim_filepath=bim_location,
                    ) as round_trip:
                        actual = round_trip.read(dtype=np.int8)
                        assert np.allclose(expected, actual, equal_nan=True)
                        actual_properties = round_trip.properties
                        for name, value in expected_properties.items():
                            assert np.array_equal(
                                value, actual_properties[name],
                            ) or np.allclose(value, actual_properties[name])
def test_stream(shared_datadir, tmp_path) -> None:
    """Round-trip float data through the streaming writer for both allele counts."""
    with open_bed(shared_datadir / "some_missing.bed") as source:
        expected = source.read()
        expected_properties = source.properties
        out_bed = tmp_path / "some_missing.bed"
        for count_A1 in (False, True):
            for fam_location, bim_location in (
                (None, None),
                (tmp_path / "some_missing.fam2", tmp_path / "some_missing.bim2"),
            ):
                # Write one SNP column per call.
                with create_bed(
                    out_bed,
                    iid_count=expected.shape[0],
                    sid_count=expected.shape[1],
                    properties=expected_properties,
                    fam_location=fam_location,
                    bim_location=bim_location,
                    count_A1=count_A1,
                ) as writer:
                    for column in expected.T:
                        writer.write(column)
                # Read back and compare values and metadata.
                with open_bed(
                    out_bed,
                    count_A1=count_A1,
                    fam_filepath=fam_location,
                    bim_filepath=bim_location,
                ) as round_trip:
                    actual = round_trip.read()
                    assert np.allclose(expected, actual, equal_nan=True)
                    actual_properties = round_trip.properties
                    for name, value in expected_properties.items():
                        assert np.array_equal(
                            value, actual_properties[name],
                        ) or np.allclose(value, actual_properties[name])
def test_stream_individual_major(shared_datadir, tmp_path) -> None:
    """Write one individual (row) per call in individual-major mode and read it back."""
    with open_bed(shared_datadir / "small.bed") as source:
        expected = source.read()
        expected_properties = source.properties
        with create_bed(
            tmp_path / "small.bed",
            iid_count=expected.shape[0],
            sid_count=expected.shape[1],
            properties=expected_properties,
            major="individual",
        ) as writer:
            # Individual-major: each write supplies one row of genotypes.
            for row in expected:
                writer.write(row)
        with open_bed(
            tmp_path / "small.bed",
        ) as round_trip:
            actual = round_trip.read()
            assert np.allclose(expected, actual, equal_nan=True)
            actual_properties = round_trip.properties
            for name, value in expected_properties.items():
                assert np.array_equal(value, actual_properties[name]) or np.allclose(
                    value, actual_properties[name],
                )
def test_stream_without_with(shared_datadir, tmp_path) -> None:
    """Use create_bed without a context manager; an explicit close() must finalize the file."""
    with open_bed(shared_datadir / "some_missing.bed") as source:
        expected = source.read()
        expected_properties = source.properties
        writer = create_bed(
            tmp_path / "some_missing.bed",
            iid_count=expected.shape[0],
            sid_count=expected.shape[1],
            properties=expected_properties,
        )
        for column in expected.T:
            writer.write(column)
        writer.close()
        with open_bed(
            tmp_path / "some_missing.bed",
        ) as round_trip:
            actual = round_trip.read()
            assert np.allclose(expected, actual, equal_nan=True)
            actual_properties = round_trip.properties
            for name, value in expected_properties.items():
                assert np.array_equal(value, actual_properties[name]) or np.allclose(
                    value, actual_properties[name],
                )
def test_stream_expected_errors(shared_datadir, tmp_path) -> None:
    """Each misuse of create_bed / write must raise ValueError or RuntimeError."""
    with open_bed(shared_datadir / "some_missing.bed") as source:
        data = source.read()
        props = source.properties
        out = tmp_path / "some_missing.bed"
        # Correct shape/metadata kwargs shared by most of the failure cases.
        shape_kwargs = {
            "iid_count": data.shape[0],
            "sid_count": data.shape[1],
            "properties": props,
        }
        # 'major' must be "SNP" or "individual".
        with pytest.raises(ValueError), create_bed(
            out, major="illegal value", **shape_kwargs,
        ) as writer:
            for column in data.T:
                writer.write(column)
        # Declared iid_count disagrees with the length of each written column.
        with pytest.raises(ValueError), create_bed(
            out,
            iid_count=data.shape[0] + 1,
            sid_count=data.shape[1],
            properties=props,
        ) as writer:
            for column in data.T:
                writer.write(column)
        # Declared sid_count is one smaller than the number of columns written.
        with pytest.raises(ValueError), create_bed(
            out,
            iid_count=data.shape[0],
            sid_count=data.shape[1] - 1,
            properties=props,
        ) as writer:
            for column in data.T:
                writer.write(column)
        # Too few columns: stop after the first write; finalizing at
        # with-exit is expected to fail.
        with pytest.raises(ValueError), create_bed(out, **shape_kwargs) as writer:
            for column in data.T:
                writer.write(column)
                break
        # One write too many after the full matrix.
        with pytest.raises(ValueError), create_bed(out, **shape_kwargs) as writer:
            for column in data.T:
                writer.write(column)
            writer.write(column)
        # Writing after an explicit close() raises RuntimeError.
        with pytest.raises(RuntimeError):
            writer = create_bed(out, **shape_kwargs)
            for column in data.T:
                writer.write(column)
            writer.close()
            writer.write(data[:, 0])
        # A column that is one entry short.
        with pytest.raises(ValueError), create_bed(out, **shape_kwargs) as writer:
            for column in data.T:
                writer.write(column[:-1])
        # The whole 2-D matrix where a single column is expected.
        with pytest.raises(ValueError), create_bed(out, **shape_kwargs) as writer:
            for column in data.T:
                writer.write(data)
        # A bare scalar is rejected as well.
        with pytest.raises(ValueError), create_bed(out, **shape_kwargs) as writer:
            for column in data.T:
                writer.write(3)
def test_mode(shared_datadir) -> None:
    """A SNP-major file and its individual-major twin hold transposed data."""
    for force_python_only in (False, True):
        with open_bed(shared_datadir / "small.bed") as snp_major:
            snp_val = snp_major.read(force_python_only=force_python_only)
            with open_bed(shared_datadir / "smallmode0.bed") as ind_major:
                ind_val = ind_major.read(force_python_only=force_python_only)
                # The mode-0 file stores the transpose of the mode-1 data.
                assert np.allclose(snp_val, ind_val.T, equal_nan=True)
def test_stream_modes(shared_datadir, tmp_path) -> None:
    """Stream-write both majors (SNP and individual); both must read back equal."""
    for force_python_only in (False, True):
        with open_bed(shared_datadir / "small.bed") as source:
            expected = source.read(force_python_only=force_python_only)
            # SNP-major: one column of genotypes per write call.
            with create_bed(
                tmp_path / "mode1.bed",
                major="SNP",
                iid_count=source.iid_count,
                sid_count=source.sid_count,
                properties=source.properties,
                force_python_only=force_python_only,
            ) as writer:
                for column in expected.T:
                    writer.write(column)
            with open_bed(tmp_path / "mode1.bed") as snp_file:
                actual = snp_file.read(force_python_only=force_python_only)
                assert np.allclose(expected, actual, equal_nan=True)
                actual_properties = snp_file.properties
                for name, value in source.properties.items():
                    assert np.array_equal(
                        value, actual_properties[name],
                    ) or np.allclose(value, actual_properties[name])
            # Individual-major: one row of genotypes per write call.
            with create_bed(
                tmp_path / "mode0.bed",
                major="individual",
                iid_count=source.iid_count,
                sid_count=source.sid_count,
                properties=source.properties,
                force_python_only=force_python_only,
            ) as writer:
                for row in expected:
                    writer.write(row)
            with open_bed(tmp_path / "mode0.bed") as ind_file:
                actual = ind_file.read(force_python_only=force_python_only)
                assert np.allclose(expected, actual, equal_nan=True)
                actual_properties = ind_file.properties
                for name, value in source.properties.items():
                    assert np.array_equal(
                        value, actual_properties[name],
                    ) or np.allclose(value, actual_properties[name])
def test_stream_modes2(shared_datadir, tmp_path) -> None:
    """Round-trip random int8 data (including empty shapes) through both majors."""
    for iid_count in range(8, -1, -1):
        for sid_count in range(8, -1, -1):
            for force_python_only in (False, True):
                for order in ("f", "c"):
                    rng = np.random.default_rng(0)
                    val = rng.integers(3, size=(iid_count, sid_count), dtype=np.int8)
                    # NOTE(review): ascontiguousarray forces C layout, so the
                    # 'order' loop may not actually vary memory layout here —
                    # confirm whether that was the intent.
                    val = np.ascontiguousarray(np.array(val, order=order))
                    if iid_count > 0 and sid_count > 0:
                        # Plant one missing-value marker (-127) at a random cell.
                        val[rng.integers(iid_count), rng.integers(sid_count)] = -127
                    # SNP-major: one column per write.
                    with create_bed(
                        tmp_path / "mode1.bed",
                        major="SNP",
                        iid_count=iid_count,
                        sid_count=sid_count,
                        force_python_only=force_python_only,
                    ) as writer:
                        for column in val.T:
                            writer.write(column)
                    with open_bed(tmp_path / "mode1.bed") as snp_file:
                        actual = snp_file.read(
                            force_python_only=force_python_only, dtype=np.int8,
                        )
                        assert np.allclose(val, actual, equal_nan=True)
                    # Individual-major: one row per write.
                    with create_bed(
                        tmp_path / "mode0.bed",
                        major="individual",
                        iid_count=iid_count,
                        sid_count=sid_count,
                        force_python_only=force_python_only,
                    ) as writer:
                        for row in val:
                            writer.write(row)
                    with open_bed(tmp_path / "mode0.bed") as ind_file:
                        actual = ind_file.read(
                            force_python_only=force_python_only, dtype=np.int8,
                        )
                        assert np.allclose(val, actual, equal_nan=True)
if __name__ == "__main__":
    # Ad-hoc entry point for running this module directly on a developer machine.
    logging.basicConfig(level=logging.INFO)
    # NOTE(review): hard-coded Windows paths; only valid on the original
    # author's machine. Under pytest these come from fixtures instead.
    # 'shared_datadir' is not used by test_bed_reading_example below.
    shared_datadir = Path(r"D:\OneDrive\programs\bed-reader\bed_reader\tests\data")
    tmp_path = Path(r"m:/deldir/tests")
    test_bed_reading_example()
    # Hand off to pytest to run the full module.
    pytest.main([__file__])