use crate::{GAFBase, GBZRecord, GraphReference, Subgraph, Alignment, AlignmentBlock};
use crate::alignment::{Flags, TargetPath};
use crate::utils;
use gbz::{Orientation, Pos, GBZ};
use gbz::bwt::Record;
use gbz::support;
use rusqlite::{Row, OptionalExtension};
use std::collections::{BTreeMap, HashSet};
use std::fmt::Display;
use std::io::Write;
use std::ops::Range;
use std::sync::Arc;
#[cfg(test)]
mod tests;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AlignmentOutput {
Overlapping,
Clipped,
Contained,
}
impl Display for AlignmentOutput {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
AlignmentOutput::Overlapping => write!(f, "overlapping"),
AlignmentOutput::Clipped => write!(f, "clipped"),
AlignmentOutput::Contained => write!(f, "contained"),
}
}
}
#[derive(Debug, Clone, PartialEq, Default)]
pub struct ReadSet {
nodes: BTreeMap<usize, GBZRecord>,
reads: Vec<Alignment>,
unclipped: usize,
blocks: usize,
clusters: usize,
}
impl ReadSet {
pub const CLUSTER_GAP_THRESHOLD: usize = 1000;
fn get_row_id(row: &Row) -> Result<usize, String> {
let row_id: usize = row.get(0).map_err(|x| x.to_string())?;
Ok(row_id)
}
fn decompress_block(row: &Row, from_idx: usize) -> Result<Vec<Alignment>, String> {
let min_handle: Option<usize> = row.get(from_idx + 0).map_err(|x| x.to_string())?;
let max_handle: Option<usize> = row.get(from_idx + 1).map_err(|x| x.to_string())?;
let alignments: usize = row.get(from_idx + 2).map_err(|x| x.to_string())?;
let read_length: Option<usize> = row.get(from_idx + 3).map_err(|x| x.to_string())?;
let gbwt_starts: Vec<u8> = row.get(from_idx + 4).map_err(|x| x.to_string())?;
let names: Vec<u8> = row.get(from_idx + 5).map_err(|x| x.to_string())?;
let quality_strings: Vec<u8> = row.get(from_idx + 6).map_err(|x| x.to_string())?;
let difference_strings: Vec<u8> = row.get(from_idx + 7).map_err(|x| x.to_string())?;
let flags: Vec<u8> = row.get(from_idx + 8).map_err(|x| x.to_string())?;
let numbers: Vec<u8> = row.get(from_idx + 9).map_err(|x| x.to_string())?;
let optional: Vec<u8> = row.get(from_idx + 10).map_err(|x| x.to_string())?;
let block = AlignmentBlock {
min_handle, max_handle, alignments, read_length,
gbwt_starts, names,
quality_strings, difference_strings,
flags: Flags::from(flags), numbers,
optional,
};
block.decode()
}
fn set_target_path(
&mut self, alignment: &mut Alignment, subgraph: &Subgraph,
get_record: &mut dyn FnMut(usize) -> Result<GBZRecord, String>,
contained: bool
) -> Result<(), String> {
let mut pos = match alignment.path {
TargetPath::Path(_) => return Ok(()),
TargetPath::StartPosition(pos) => Some(pos),
};
let mut path = Vec::new();
let mut overlap = false;
let mut target_path_len = 0;
while let Some(p) = pos {
if subgraph.has_handle(p.node) {
overlap = true;
} else if contained {
return Ok(()); }
let mut record = self.nodes.get(&p.node);
if record.is_none() {
let result = get_record(p.node)?;
self.nodes.insert(p.node, result);
record = self.nodes.get(&p.node);
}
path.push(p.node);
let record = record.unwrap();
target_path_len += record.sequence_len();
pos = record.to_gbwt_record().lf(p.offset);
}
if overlap {
alignment.set_target_path(path);
alignment.set_target_path_len(target_path_len);
}
Ok(())
}
fn set_target_path_simple(
&mut self, alignment: &mut Alignment,
get_record: &mut dyn FnMut(usize) -> Result<GBZRecord, String>,
) -> Result<(), String> {
let mut pos = match alignment.path {
TargetPath::Path(_) => return Ok(()),
TargetPath::StartPosition(pos) => Some(pos),
};
let mut path = Vec::new();
let mut target_path_len = 0;
while let Some(p) = pos {
let mut record = self.nodes.get(&p.node);
if record.is_none() {
let result = get_record(p.node)?;
self.nodes.insert(p.node, result);
record = self.nodes.get(&p.node);
}
path.push(p.node);
let record = record.unwrap();
target_path_len += record.sequence_len();
pos = record.to_gbwt_record().lf(p.offset);
}
alignment.set_target_path(path);
alignment.set_target_path_len(target_path_len);
Ok(())
}
pub fn new(graph: GraphReference<'_, '_>, subgraph: &Subgraph, database: &GAFBase, output: AlignmentOutput) -> Result<Self, String> {
let mut read_set = ReadSet::default();
let mut get_node = database.connection.prepare(
"SELECT edges, bwt, sequence FROM Nodes WHERE handle = ?1"
).map_err(|x| x.to_string())?;
let mut graph = graph;
let mut get_record = |handle: usize| -> Result<GBZRecord, String> {
let gaf_result = get_node.query_row(
(handle,),
|row: &Row<'_>| -> rusqlite::Result<(Vec<Pos>, Vec<u8>, Vec<u8>)> {
let edge_bytes: Vec<u8> = row.get(0)?;
let (edges, _) = Record::decompress_edges(&edge_bytes).unwrap();
let bwt: Vec<u8> = row.get(1)?;
let encoded_sequence: Vec<u8> = row.get(2)?;
let sequence = utils::decode_sequence(&encoded_sequence);
Ok((edges, bwt, sequence))
}
).optional().map_err(|x| x.to_string())?;
if gaf_result.is_none() {
return Err(format!("Could not find the record for handle {} in GAF-base", handle));
}
let (edges, bwt, mut sequence) = gaf_result.unwrap();
if sequence.is_empty() {
let seq = subgraph.sequence_for_handle(handle);
sequence = match seq {
Some(seq) => seq.to_vec(),
None => {
let gbz_record = graph.gbz_record(handle)?;
gbz_record.sequence().to_vec()
}
};
}
unsafe {
Ok(GBZRecord::from_raw_parts(handle, edges, bwt, sequence, None))
}
};
let node_ids: Vec<usize> = subgraph.node_iter().collect();
let clusters = utils::cluster_node_ids(node_ids, Self::CLUSTER_GAP_THRESHOLD);
let clusters: Vec<(usize, usize)> = clusters.into_iter()
.map(|r| (support::encode_node(*r.start(), Orientation::Forward), support::encode_node(*r.end(), Orientation::Reverse)))
.collect();
read_set.clusters = clusters.len();
let mut row_ids: HashSet<usize> = HashSet::new();
let mut get_reads = database.connection.prepare(
"SELECT id, min_handle, max_handle, alignments, read_length, gbwt_starts, names, quality_strings, difference_strings, flags, numbers, optional
FROM Alignments
WHERE min_handle <= ?1 AND max_handle >= ?2"
).map_err(|x| x.to_string())?;
for (min_handle, max_handle) in clusters.into_iter() {
let mut rows = get_reads.query((max_handle, min_handle)).map_err(|x| x.to_string())?;
while let Some(row) = rows.next().map_err(|x| x.to_string())? {
let row_id = Self::get_row_id(row)?;
if row_ids.contains(&row_id) {
continue;
}
row_ids.insert(row_id);
let block = Self::decompress_block(row, 1)?;
for mut alignment in block {
read_set.set_target_path(&mut alignment, subgraph, &mut get_record, output == AlignmentOutput::Contained)?;
if alignment.has_target_path() {
if output == AlignmentOutput::Clipped {
let sequence_len = Arc::new(|handle| {
let record = read_set.nodes.get(&handle)?;
Some(record.sequence().len())
});
let clipped = alignment.clip(subgraph, sequence_len)?;
for aln in clipped.into_iter() {
read_set.reads.push(aln);
}
} else {
read_set.reads.push(alignment);
}
read_set.unclipped += 1;
}
}
read_set.blocks += 1;
}
}
Ok(read_set)
}
pub fn from_rows(database: &GAFBase, row_range: Range<usize>, graph: Option<&GBZ>) -> Result<Self, String> {
let mut read_set = ReadSet::default();
read_set.clusters = 1;
let mut get_node = database.connection.prepare(
"SELECT edges, bwt, sequence FROM Nodes WHERE handle = ?1"
).map_err(|x| x.to_string())?;
let mut get_record = |handle: usize| -> Result<GBZRecord, String> {
let gaf_result = get_node.query_row(
(handle,),
|row: &Row<'_>| -> rusqlite::Result<(Vec<Pos>, Vec<u8>, Vec<u8>)> {
let edge_bytes: Vec<u8> = row.get(0)?;
let (edges, _) = Record::decompress_edges(&edge_bytes).unwrap();
let bwt: Vec<u8> = row.get(1)?;
let encoded_sequence: Vec<u8> = row.get(2)?;
let sequence = utils::decode_sequence(&encoded_sequence);
Ok((edges, bwt, sequence))
}
).optional().map_err(|x| x.to_string())?;
if gaf_result.is_none() {
return Err(format!("Could not find the record for handle {} in GAF-base", handle));
}
let (edges, bwt, mut sequence) = gaf_result.unwrap();
if sequence.is_empty() {
if let Some(graph) = graph {
let seq = graph.sequence(support::node_id(handle)).ok_or(
format!("Could not find the sequence for handle {} in GBZ", handle)
)?;
sequence = seq.to_vec();
} else {
return Err(String::from("No reference provided for a reference-based GAF-base"));
}
}
if support::node_orientation(handle) == Orientation::Reverse {
sequence = support::reverse_complement(&sequence);
}
unsafe {
Ok(GBZRecord::from_raw_parts(handle, edges, bwt, sequence, None))
}
};
let mut get_reads = database.connection.prepare(
"SELECT min_handle, max_handle, alignments, read_length, gbwt_starts, names, quality_strings, difference_strings, flags, numbers, optional
FROM Alignments
WHERE id >= ?1 AND id < ?2"
).map_err(|x| x.to_string())?;
let mut rows = get_reads.query((row_range.start, row_range.end)).map_err(|x| x.to_string())?;
while let Some(row) = rows.next().map_err(|x| x.to_string())? {
let block = Self::decompress_block(row, 0)?;
for mut alignment in block {
read_set.set_target_path_simple(&mut alignment, &mut get_record)?;
if alignment.has_target_path() {
read_set.reads.push(alignment);
read_set.unclipped += 1;
}
}
read_set.blocks += 1;
}
Ok(read_set)
}
#[inline]
pub fn len(&self) -> usize {
self.reads.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.reads.is_empty()
}
#[inline]
pub fn unclipped(&self) -> usize {
self.unclipped
}
#[inline]
pub fn blocks(&self) -> usize {
self.blocks
}
#[inline]
pub fn node_records(&self) -> usize {
self.nodes.len()
}
#[inline]
pub fn clusters(&self) -> usize {
self.clusters
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &Alignment> {
self.reads.iter()
}
fn target_sequence(&self, alignment: &Alignment) -> Result<Vec<u8>, String> {
let target_path = alignment.target_path();
if target_path.is_none() {
return Ok(Vec::new());
}
let target_path = target_path.unwrap();
let mut sequence = Vec::new();
for handle in target_path{
let record = self.nodes.get(handle);
if record.is_none() {
return Err(format!("Read {}: Missing record for node handle {}", alignment.name, handle));
}
let record = record.unwrap();
sequence.extend_from_slice(record.sequence());
}
if sequence.len() != alignment.path_len {
return Err(format!(
"Read {}: Target path length {} does not match the expected length {}",
alignment.name, sequence.len(), alignment.path_len
));
}
Ok(sequence)
}
pub fn to_gaf<W: Write>(&self, writer: &mut W) -> Result<(), String> {
for alignment in self.reads.iter() {
let target_sequence = self.target_sequence(alignment)?;
let mut line = alignment.to_gaf(&target_sequence);
line.push(b'\n');
writer.write_all(&line).map_err(|x| x.to_string())?;
}
Ok(())
}
}