import contextlib
import logging
import os
from pathlib import Path
from typing import Any, List, Mapping, Optional, Union
import numpy as np
from bed_reader import get_num_threads, open_bed
from .bed_reader import ( encode1_f32,
encode1_f64,
encode1_i8,
write_f32,
write_f64,
write_i8,
)
class create_bed:
def __init__(
self,
location: Union[str, Path],
iid_count: int,
sid_count: int,
properties: Mapping[str, List[Any]] = {},
count_A1: bool = True,
fam_location: Optional[Union[str, Path]] = None,
bim_location: Optional[Union[str, Path]] = None,
major: str = "SNP",
force_python_only: bool = False,
num_threads=None,
) -> None:
self.location = Path(location)
self.iid_count = iid_count
self.sid_count = sid_count
self.count_A1 = count_A1
self.force_python_only = force_python_only
self.num_threads = num_threads or os.cpu_count()
if major == "SNP":
self.major_count = sid_count
self.minor_count = iid_count
elif major == "individual":
self.major_count = iid_count
self.minor_count = sid_count
else:
msg = f"major must be 'SNP' or 'individual', not '{major}'"
raise ValueError(msg)
self.major = major
self.minor_count_div4 = (self.minor_count - 1) // 4 + 1
self.buffer = np.zeros(self.minor_count_div4, dtype=np.uint8)
if not count_A1:
self.zero_code = 0b00
self.two_code = 0b11
else:
self.zero_code = 0b11
self.two_code = 0b00
fam_location = (
Path(fam_location)
if fam_location is not None
else self._replace_extension(self.location, "fam")
)
bim_location = (
Path(bim_location)
if bim_location is not None
else self._replace_extension(self.location, "bim")
)
properties, _ = open_bed._fix_up_properties(
properties,
iid_count,
sid_count,
use_fill_sequence=True,
)
open_bed._write_fam_or_bim(self.location, properties, "fam", fam_location)
open_bed._write_fam_or_bim(self.location, properties, "bim", bim_location)
self.temp_filepath = self.location.with_suffix(".bed_temp")
self.file_pointer = open(self.temp_filepath, "wb")
self.file_pointer.write(bytes(bytearray([0b01101100]))) self.file_pointer.write(bytes(bytearray([0b00011011]))) if self.major == "SNP":
self.file_pointer.write(bytes(bytearray([0b00000001]))) else:
assert self.major == "individual" self.file_pointer.write(bytes(bytearray([0b00000000])))
self.write_count = 0
if os.path.exists(self.location):
os.unlink(self.location)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def close(self) -> None:
if self.file_pointer is not None:
try:
self.file_pointer.close()
self.file_pointer = None
if self.write_count != self.major_count:
msg = (
f"Attempt to write fewer vectors ({self.write_count}) "
f"than expected ({self.major_count})"
)
raise ValueError(
msg,
)
os.rename(self.temp_filepath, self.location)
except Exception:
self._clean_up_error()
raise
else:
self._clean_up_error()
def _clean_up_error(self) -> None:
with contextlib.suppress(Exception):
os.unlink(self.temp_filepath)
@staticmethod
def _replace_extension(location, extension):
if open_bed._is_url(location):
return location.parent / (location.stem + "." + extension)
return None
def write(self, vector) -> None:
if self.file_pointer is None:
msg = "Attempt to write after file was closed for writing."
raise RuntimeError(msg)
try:
self.write_count += 1
if self.write_count > self.major_count:
msg = (
f"Attempt to write more vectors ({self.write_count}) "
f"than expected ({self.major_count})"
)
raise ValueError(
msg,
)
vector = _fix_up_vector(vector)
if len(vector) != self.minor_count:
msg = f"Expected vector to {self.minor_count} values, got {len(vector)} instead"
raise ValueError(
msg,
)
self._internal_write(vector)
except Exception:
self.file_pointer.close()
self.file_pointer = None raise
def _internal_write(self, vector) -> None:
if not self.force_python_only:
if vector.dtype == np.int8:
encode1_i8(self.count_A1, vector, self.buffer, self.num_threads)
elif vector.dtype == np.float32:
encode1_f32(self.count_A1, vector, self.buffer, self.num_threads)
elif vector.dtype == np.float64:
encode1_f64(self.count_A1, vector, self.buffer, self.num_threads)
else:
msg = (
f"dtype '{vector.dtype}' not known, only 'int8', 'float32', "
f"and 'float64' are allowed."
)
raise ValueError(
msg,
)
self.file_pointer.write(self.buffer)
else:
for minor_by_four in range(0, self.minor_count, 4):
vals_for_this_byte = vector[minor_by_four : minor_by_four + 4]
byte = 0b00000000
for val_index in range(len(vals_for_this_byte)):
val_for_byte = vals_for_this_byte[val_index]
if val_for_byte == 0:
code = self.zero_code
elif val_for_byte == 1:
code = 0b10 elif val_for_byte == 2:
code = self.two_code
elif (vector.dtype == np.int8 and val_for_byte == -127) or np.isnan(
val_for_byte,
):
code = 0b01 else:
raise ValueError(
"Attempt to write illegal value to .bed file. "
+ "Only 0,1,2,missing allowed.",
)
byte |= code << (val_index * 2)
self.file_pointer.write(bytes(bytearray([byte])))
def _fix_up_vector(input):
if not isinstance(input, np.ndarray):
return _fix_up_vector(np.array(input))
if np.issubdtype(input.dtype, np.integer) and input.dtype != np.int8:
return _fix_up_vector(np.array(input, dtype=np.int8))
if np.issubdtype(input.dtype, np.floating) and input.dtype not in (
np.float32,
np.float64,
):
return _fix_up_vector(np.array(input, dtype=np.float32))
if len(input.shape) != 1:
msg = "vector should be one dimensional"
raise ValueError(msg)
return input
def to_bed(
filepath: Union[str, Path],
val: np.ndarray,
properties: Mapping[str, List[Any]] = {},
count_A1: bool = True,
fam_filepath: Optional[Union[str, Path]] = None,
bim_filepath: Optional[Union[str, Path]] = None,
major: str = "SNP",
force_python_only: bool = False,
num_threads=None,
) -> None:
filepath = Path(filepath)
val = _fix_up_val(val)
iid_count = val.shape[0]
sid_count = val.shape[1]
if major == "SNP":
major_count = sid_count
minor_count = iid_count
elif major == "individual":
major_count = iid_count
minor_count = sid_count
else:
msg = f"major must be 'SNP' or 'individual', not '{major}'"
raise ValueError(msg)
properties, _ = open_bed._fix_up_properties(
properties, iid_count=iid_count, sid_count=sid_count, use_fill_sequence=True,
)
open_bed._write_fam_or_bim(filepath, properties, "fam", fam_filepath)
open_bed._write_fam_or_bim(filepath, properties, "bim", bim_filepath)
if not force_python_only:
if not val.flags["C_CONTIGUOUS"] and not val.flags["F_CONTIGUOUS"]:
msg = "val must be contiguous."
raise ValueError(msg)
num_threads = get_num_threads(num_threads)
if major == "individual":
val = val.T
try:
if val.dtype == np.float64:
write_f64(
str(filepath),
is_a1_counted=count_A1,
val=val,
num_threads=num_threads,
)
elif val.dtype == np.float32:
write_f32(
str(filepath),
is_a1_counted=count_A1,
val=val,
num_threads=num_threads,
)
elif val.dtype == np.int8:
write_i8(
str(filepath),
is_a1_counted=count_A1,
val=val,
num_threads=num_threads,
)
else:
raise ValueError(
f"dtype '{val.dtype}' not known, only "
+ "'int8', 'float32', and 'float64' are allowed.",
)
if major == "individual":
with open(filepath, "r+b") as bed_filepointer:
bed_filepointer.seek(2)
bed_filepointer.write(bytes(bytearray([0b00000000])))
except SystemError as system_error:
with contextlib.suppress(Exception):
os.unlink(filepath)
raise system_error.__cause__
else:
if not count_A1:
zero_code = 0b00
two_code = 0b11
else:
zero_code = 0b11
two_code = 0b00
with open(filepath, "wb") as bed_filepointer:
bed_filepointer.write(bytes(bytearray([0b01101100]))) bed_filepointer.write(bytes(bytearray([0b00011011])))
if major == "SNP":
bed_filepointer.write(bytes(bytearray([0b00000001]))) else:
assert major == "individual"
bed_filepointer.write(
bytes(bytearray([0b00000000])),
)
for major_index in range(major_count):
if major_index % 1 == 0:
logging.info(
f"Writing {major} # {major_index} to file '{filepath}'",
)
if major == "SNP":
vector = val[:, major_index]
else:
assert major == "individual"
vector = val[major_index, :]
for minor_by_four in range(0, minor_count, 4):
vals_for_this_byte = vector[minor_by_four : minor_by_four + 4]
byte = 0b00000000
for val_index in range(len(vals_for_this_byte)):
val_for_byte = vals_for_this_byte[val_index]
if val_for_byte == 0:
code = zero_code
elif val_for_byte == 1:
code = 0b10 elif val_for_byte == 2:
code = two_code
elif (
val.dtype == np.int8 and val_for_byte == -127
) or np.isnan(val_for_byte):
code = 0b01 else:
raise ValueError(
"Attempt to write illegal value to .bed file. "
+ "Only 0,1,2,missing allowed.",
)
byte |= code << (val_index * 2)
bed_filepointer.write(bytes(bytearray([byte])))
logging.info(f"Done writing {filepath}")
def _fix_up_val(input):
if not isinstance(input, np.ndarray):
return _fix_up_val(np.array(input))
if np.issubdtype(input.dtype, np.integer) and input.dtype != np.int8:
return _fix_up_val(np.array(input, dtype=np.int8))
if np.issubdtype(input.dtype, np.floating) and input.dtype not in (
np.float32,
np.float64,
):
return _fix_up_val(np.array(input, dtype=np.float32))
if len(input.shape) != 2:
msg = "val should be two dimensional"
raise ValueError(msg)
return input