use crate::{
signal::{handle_signal_data, read_int16_from_file},
POD5_VERSION, SOFTWARE,
};
use arrow::{
array::{
Array, BooleanArray, DictionaryArray, FixedSizeBinaryBuilder, Float32Array, Int16Array,
ListArray, ListBuilder, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt64Builder,
UInt8Array,
},
datatypes::{DataType, Field, Int16Type, Schema},
record_batch::RecordBatch,
};
use std::{
collections::{HashMap, HashSet},
error::Error,
fmt,
sync::Arc,
};
use uuid::Uuid;
/// Pore chemistry associated with a read.
///
/// The [`fmt::Display`] impl supplies the label that is used as the dictionary value of the
/// `pore_type` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PoreType {
    /// Displayed as `R9.4.1`.
    R9,
    /// Displayed as `R10.4.1`.
    R10,
    /// Displayed as `not-set`.
    NotSet,
}
impl fmt::Display for PoreType {
    /// Writes the textual pore label for this variant.
    ///
    /// Width, fill and alignment flags on the formatter are not applied; the label is emitted
    /// exactly as-is.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            PoreType::R9 => "R9.4.1",
            PoreType::R10 => "R10.4.1",
            PoreType::NotSet => "not-set",
        };
        f.write_str(label)
    }
}
/// Reason a read ended.
///
/// The [`fmt::Display`] impl supplies the lower-case label that is used as the dictionary value
/// of the `end_reason` column.
// The variants are spelled in SCREAMING_SNAKE_CASE instead of Rust's usual CamelCase, so the
// naming lint is silenced for this type.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EndReason {
    /// Displayed as `unknown`.
    UNKNOWN,
    /// Displayed as `mux_change`.
    MUX_CHANGE,
    /// Displayed as `unblock_mux_change`.
    UNBLOCK_MUX_CHANGE,
    /// Displayed as `data_service_unblock_mux_change`.
    DATA_SERVICE_UNBLOCK_MUX_CHANGE,
    /// Displayed as `signal_positive`.
    SIGNAL_POSITIVE,
    /// Displayed as `signal_negative`.
    SIGNAL_NEGATIVE,
}
impl fmt::Display for EndReason {
    /// Writes the lower-case, snake_case label for this variant.
    ///
    /// Width, fill and alignment flags on the formatter are not applied; the label is emitted
    /// exactly as-is.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            EndReason::UNKNOWN => "unknown",
            EndReason::MUX_CHANGE => "mux_change",
            EndReason::UNBLOCK_MUX_CHANGE => "unblock_mux_change",
            EndReason::DATA_SERVICE_UNBLOCK_MUX_CHANGE => "data_service_unblock_mux_change",
            EndReason::SIGNAL_POSITIVE => "signal_positive",
            EndReason::SIGNAL_NEGATIVE => "signal_negative",
        };
        f.write_str(label)
    }
}
/// Wraps a single read UUID in a one-element fixed-size binary array (16 bytes per value).
///
/// # Errors
///
/// Propagates any error the builder reports while appending the UUID bytes.
pub fn _build_read_id(read_id: Uuid) -> arrow::error::Result<arrow::array::FixedSizeBinaryArray> {
    let mut column = FixedSizeBinaryBuilder::new(16);
    column.append_value(read_id.as_bytes())?;
    Ok(column.finish())
}
/// Builds a list array holding exactly one list entry whose elements are `values`, each
/// widened to `u64`.
///
/// An empty iterator yields a single, empty (non-null) list entry.
pub fn _build_signal_index<I>(
    values: I,
) -> arrow::error::Result<arrow::array::GenericListArray<i32>>
where
    I: Iterator<Item = usize>,
{
    let mut rows = ListBuilder::new(UInt64Builder::new());

    // Fill the child builder first; the list entry is only closed by `append` below.
    let child = rows.values();
    values.for_each(|row_index| {
        child.append_value(row_index as u64);
    });

    rows.append(true);
    Ok(rows.finish())
}
pub fn create_reads_arrow_schema(file_identifier: &Uuid) -> Result<Schema, Box<dyn Error>> {
let signal_field = Arc::new(Field::new("item", DataType::UInt64, true));
let mut metadata = HashMap::new();
metadata.insert(
"ARROW:extension:name".to_string(),
"minknow.uuid".to_string(),
);
metadata.insert("ARROW:extension:metadata".to_string(), "".to_string());
let fields = vec![
Field::new("read_id", DataType::FixedSizeBinary(16), false).with_metadata(metadata), Field::new("signal", DataType::List(signal_field), false),
Field::new("channel", DataType::UInt16, false),
Field::new("well", DataType::UInt8, false),
Field::new(
"pore_type",
DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
false,
),
Field::new("calibration_offset", DataType::Float32, false),
Field::new("calibration_scale", DataType::Float32, false),
Field::new("read_number", DataType::UInt32, false),
Field::new("start", DataType::UInt64, false),
Field::new("median_before", DataType::Float32, false),
Field::new("tracked_scaling_scale", DataType::Float32, false),
Field::new("tracked_scaling_shift", DataType::Float32, false),
Field::new("predicted_scaling_scale", DataType::Float32, false),
Field::new("predicted_scaling_shift", DataType::Float32, false),
Field::new("num_reads_since_mux_change", DataType::UInt32, false),
Field::new("time_since_mux_change", DataType::Float32, false),
Field::new("num_minknow_events", DataType::UInt64, false),
Field::new(
"end_reason",
DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
false,
),
Field::new("end_reason_forced", DataType::Boolean, false),
Field::new(
"run_info",
DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
false,
),
Field::new("num_samples", DataType::UInt64, false),
];
let mut metadata = HashMap::new();
metadata.insert("MINKNOW:pod5_version".to_string(), POD5_VERSION.to_string());
metadata.insert("MINKNOW:software".to_string(), SOFTWARE.to_string());
metadata.insert(
"MINKNOW:file_identifier".to_string(),
file_identifier.to_string(),
);
Ok(Schema::new_with_metadata(fields, metadata))
}
/// In-memory description of one read, holding the value for every column of the reads table
/// plus the signal samples that belong to it.
///
/// Each field other than `signal_` is written to the reads-table column of the same name when a
/// row is built; the domain meaning of the individual values is not defined in this file.
pub struct ReadInfo {
/// Identifier of the read; stored as a 16-byte fixed-size binary value.
pub read_id: Uuid,
/// Pore type; stored through its `Display` label as a dictionary value.
pub pore_type: PoreType,
/// Signal samples for this read; handed to `handle_signal_data` when the row is built.
pub signal_: Vec<i16>,
/// Value of the `channel` column.
pub channel: u16,
/// Value of the `well` column.
pub well: u8,
/// Value of the `calibration_offset` column.
pub calibration_offset: f32,
/// Value of the `calibration_scale` column.
pub calibration_scale: f32,
/// Value of the `read_number` column.
pub read_number: u32,
/// Value of the `start` column.
pub start: u64,
/// Value of the `median_before` column.
pub median_before: f32,
/// Value of the `tracked_scaling_shift` column.
pub tracked_scaling_shift: f32,
/// Value of the `tracked_scaling_scale` column.
pub tracked_scaling_scale: f32,
/// Value of the `predicted_scaling_shift` column.
pub predicted_scaling_shift: f32,
/// Value of the `predicted_scaling_scale` column.
pub predicted_scaling_scale: f32,
/// Value of the `num_reads_since_mux_change` column.
pub num_reads_since_mux_change: u32,
/// Value of the `time_since_mux_change` column.
pub time_since_mux_change: f32,
/// Value of the `num_minknow_events` column.
pub num_minknow_events: u64,
/// End reason; stored through its `Display` label as a dictionary value.
pub end_reason: EndReason,
/// Value of the `end_reason_forced` column.
pub end_reason_forced: bool,
/// Run-info string; stored as a dictionary value of the `run_info` column.
pub run_info: String,
/// Value of the `num_samples` column. It is written as given and is not checked against
/// `signal_.len()` when the row is built.
pub num_samples: u64,
}
pub fn create_read_batches(
schema: Arc<Schema>,
reads: &Vec<ReadInfo>,
_signal: &mut Vec<RecordBatch>,
signal_schema: Arc<Schema>,
) -> Result<Vec<RecordBatch>, Box<dyn Error>> {
let mut pore_values: Vec<String> = reads.iter().map(|x| x.pore_type.to_string()).collect();
let mut end_reasons: Vec<String> = reads.iter().map(|x| x.end_reason.to_string()).collect();
let mut run_index: Vec<String> = reads
.iter()
.map(|x: &ReadInfo| x.run_info.to_string())
.collect();
pore_values.append(&mut end_reasons);
pore_values.append(&mut run_index);
let unique_values: HashSet<String> = HashSet::from_iter(pore_values.drain(0..));
let pore_values: Vec<String> = Vec::from_iter(unique_values);
let dict_map: HashMap<&String, i16> = pore_values
.iter()
.enumerate()
.map(|(index, value)| (value, index as i16))
.collect();
let dict_values = Arc::new(StringArray::from(pore_values.clone()));
let mut batches = vec![];
for read in reads {
let pt = Arc::new(
DictionaryArray::<Int16Type>::try_new(
Int16Array::from(vec![*dict_map.get(&read.pore_type.to_string()).unwrap()]),
dict_values.clone(),
)
.unwrap(),
);
let er = Arc::new(
DictionaryArray::<Int16Type>::try_new(
Int16Array::from(vec![*dict_map.get(&read.end_reason.to_string()).unwrap()]),
dict_values.clone(),
)
.unwrap(),
);
let rid = Arc::new(
DictionaryArray::<Int16Type>::try_new(
Int16Array::from(vec![*dict_map.get(&read.run_info).unwrap()]),
dict_values.clone(),
)
.unwrap(),
);
batches.push(
create_read_row(
schema.clone(),
read,
_signal,
signal_schema.clone(),
pt,
er,
rid,
)
.unwrap(),
)
}
Ok(batches)
}
pub fn create_read_row(
schema: Arc<Schema>,
read: &ReadInfo,
_signal: &mut Vec<RecordBatch>,
signal_schema: Arc<Schema>,
pore_type: Arc<DictionaryArray<arrow::datatypes::Int16Type>>,
end_reason: Arc<DictionaryArray<arrow::datatypes::Int16Type>>,
run_info: Arc<DictionaryArray<arrow::datatypes::Int16Type>>,
) -> Result<RecordBatch, Box<dyn Error>> {
let read_id = _build_read_id(read.read_id)?;
let signal_batches = handle_signal_data(signal_schema, read_id.clone(), &read.signal_)?;
let num_signal_rows = signal_batches.len();
let offset = _signal.len();
_signal.extend(signal_batches);
let signal_: ListArray = _build_signal_index(offset..(offset + num_signal_rows))?;
let channel = UInt16Array::from(vec![read.channel]);
let well = UInt8Array::from(vec![read.well]);
let calibration_offset = Float32Array::from(vec![read.calibration_offset]);
let calibration_scale = Float32Array::from(vec![read.calibration_scale]);
let read_number = UInt32Array::from(vec![read.read_number]);
let start = UInt64Array::from(vec![read.start]);
let median_before = Float32Array::from(vec![read.median_before]);
let tracked_scaling_scale = Float32Array::from(vec![read.tracked_scaling_scale]);
let tracked_scaling_shift = Float32Array::from(vec![read.tracked_scaling_shift]);
let predicted_scaling_scale = Float32Array::from(vec![read.predicted_scaling_scale]);
let predicted_scaling_shift = Float32Array::from(vec![read.predicted_scaling_shift]);
let num_reads_since_mux_change = UInt32Array::from(vec![read.num_reads_since_mux_change]);
let time_since_mux_change = Float32Array::from(vec![read.time_since_mux_change]);
let num_minknow_events = UInt64Array::from(vec![read.num_minknow_events]);
let end_reason_forced = BooleanArray::from(vec![read.end_reason_forced]);
let num_samples = UInt64Array::from(vec![read.num_samples]);
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(read_id.clone()),
Arc::new(signal_) as Arc<dyn Array>,
Arc::new(channel),
Arc::new(well),
pore_type,
Arc::new(calibration_offset) as Arc<dyn Array>,
Arc::new(calibration_scale) as Arc<dyn Array>,
Arc::new(read_number) as Arc<dyn Array>,
Arc::new(start) as Arc<dyn Array>,
Arc::new(median_before) as Arc<dyn Array>,
Arc::new(tracked_scaling_scale) as Arc<dyn Array>,
Arc::new(tracked_scaling_shift) as Arc<dyn Array>,
Arc::new(predicted_scaling_scale) as Arc<dyn Array>,
Arc::new(predicted_scaling_shift) as Arc<dyn Array>,
Arc::new(num_reads_since_mux_change) as Arc<dyn Array>,
Arc::new(time_since_mux_change) as Arc<dyn Array>,
Arc::new(num_minknow_events) as Arc<dyn Array>,
end_reason,
Arc::new(end_reason_forced) as Arc<dyn Array>,
run_info,
Arc::new(num_samples) as Arc<dyn Array>,
],
)
.unwrap();
Ok(batch2)
}
/// Creates a [`ReadInfo`] filled with fixed placeholder values.
///
/// The signal is loaded from `static/test_signal.bin` (resolved relative to the current working
/// directory) and repeated ten times back to back; `num_samples` is the length of that repeated
/// signal.
///
/// # Parameters
///
/// * `read_id` - textual UUID to use for the read; when `None`, the fixed UUID
///   `56202382-7cda-49e4-9403-2a4f6acc22ab` is used.
///
/// # Errors
///
/// Fails when the signal file cannot be read or when `read_id` is not a valid UUID string.
pub fn dummy_read_row(read_id: Option<&str>) -> Result<ReadInfo, Box<dyn Error>> {
    let base_signal = read_int16_from_file("static/test_signal.bin")?;

    // Ten consecutive copies of the file contents.
    let signal_: Vec<i16> = (0..10)
        .flat_map(|_| base_signal.iter().copied())
        .collect();
    let num_samples = signal_.len() as u64;

    let read_id = Uuid::parse_str(read_id.unwrap_or("56202382-7cda-49e4-9403-2a4f6acc22ab"))?;

    Ok(ReadInfo {
        read_id,
        pore_type: PoreType::R10,
        signal_,
        channel: 1,
        well: 1,
        calibration_offset: -264.0,
        calibration_scale: 0.187_069_85,
        read_number: 1,
        start: 1,
        median_before: 100.0,
        tracked_scaling_scale: 1.0,
        tracked_scaling_shift: 0.1,
        predicted_scaling_scale: 1.5,
        predicted_scaling_shift: 0.15,
        num_reads_since_mux_change: 10,
        time_since_mux_change: 5.0,
        num_minknow_events: 100,
        end_reason: EndReason::SIGNAL_POSITIVE,
        end_reason_forced: true,
        run_info: "value1".to_string(),
        num_samples,
    })
}