use crate::{Alignment, AlignmentBlock, Chains};
use crate::formats::JSONValue;
use crate::{formats, utils};
use std::io::BufRead;
use std::path::Path;
use std::sync::{mpsc, Arc};
use std::{fs, thread};
use rusqlite::{Connection, OpenFlags, OptionalExtension, Row, Statement};
use gbz::{FullPathName, GBWT, GBZ, Orientation, Pos};
use gbz::bwt::{BWT, Record};
use gbz::support::{self, Tags};
use pggname::GraphName;
use simple_sds::serialize;
#[cfg(test)]
mod tests;
#[derive(Debug)]
pub struct GBZBase {
connection: Connection,
version: String,
nodes: usize,
chains: usize,
chain_links: usize,
paths: usize,
samples: usize,
haplotypes: usize,
contigs: usize,
}
impl GBZBase {
pub const INDEX_INTERVAL: usize = 1000;
const KEY_VERSION: &'static str = "version";
pub const VERSION: &'static str = "GBZ-base v0.4.0";
const KEY_NODES: &'static str = "nodes";
const KEY_CHAINS: &'static str = "chains";
const KEY_CHAIN_LINKS: &'static str = "chain_links";
const KEY_PATHS: &'static str = "paths";
const KEY_SAMPLES: &'static str = "samples";
const KEY_HAPLOTYPES: &'static str = "haplotypes";
const KEY_CONTIGS: &'static str = "contigs";
const KEY_GBWT: &'static str = "gbwt_";
const KEY_GBZ: &'static str = "gbz_";
pub fn open<P: AsRef<Path>>(filename: P) -> Result<Self, String> {
let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
let connection = Connection::open_with_flags(filename, flags).map_err(|x| x.to_string())?;
let mut get_tag = connection.prepare(
"SELECT value FROM Tags WHERE key = ?1"
).map_err(|x| x.to_string())?;
let version = get_string_value(&mut get_tag, Self::KEY_VERSION)?;
if version != Self::VERSION {
return Err(format!("Unsupported database version: {} (expected {})", version, Self::VERSION));
}
let nodes = get_numeric_value(&mut get_tag, Self::KEY_NODES)?;
let chains = get_numeric_value(&mut get_tag, Self::KEY_CHAINS)?;
let chain_links = get_numeric_value(&mut get_tag, Self::KEY_CHAIN_LINKS)?;
let paths = get_numeric_value(&mut get_tag, Self::KEY_PATHS)?;
let samples = get_numeric_value(&mut get_tag, Self::KEY_SAMPLES)?;
let haplotypes = get_numeric_value(&mut get_tag, Self::KEY_HAPLOTYPES)?;
let contigs = get_numeric_value(&mut get_tag, Self::KEY_CONTIGS)?;
drop(get_tag);
Ok(GBZBase {
connection,
version,
nodes, chains, chain_links,
paths, samples, haplotypes, contigs,
})
}
pub fn filename(&self) -> Option<&str> {
self.connection.path()
}
pub fn file_size(&self) -> Option<String> {
let filename = self.filename()?;
utils::file_size(filename)
}
pub fn version(&self) -> &str {
&self.version
}
pub fn nodes(&self) -> usize {
self.nodes
}
pub fn chains(&self) -> usize {
self.chains
}
pub fn chain_links(&self) -> usize {
self.chain_links
}
pub fn paths(&self) -> usize {
self.paths
}
pub fn samples(&self) -> usize {
self.samples
}
pub fn haplotypes(&self) -> usize {
self.haplotypes
}
pub fn contigs(&self) -> usize {
self.contigs
}
}
impl GBZBase {
pub fn create_from_files(gbz_file: &Path, chains_file: Option<&Path>, db_file: &Path) -> Result<(), String> {
eprintln!("Loading GBZ graph {}", gbz_file.display());
let graph: GBZ = serialize::load_from(gbz_file).map_err(|x| x.to_string())?;
let chains = if let Some(filename) = chains_file {
eprintln!("Loading top-level chain file {}", filename.display());
Chains::load_from(filename)?
} else {
Chains::new()
};
Self::create(&graph, &chains, db_file)
}
fn sanity_checks(graph: &GBZ) -> Result<(), String> {
let metadata = graph.metadata().ok_or(
String::from("The graph does not contain metadata")
)?;
if !metadata.has_path_names() {
return Err("The metadata does not contain path names".to_string());
}
if !metadata.has_sample_names() {
return Err("The metadata does not contain sample names".to_string());
}
if !metadata.has_contig_names() {
return Err("The metadata does not contain contig names".to_string());
}
Ok(())
}
pub fn create<P: AsRef<Path>>(graph: &GBZ, chains: &Chains, filename: P) -> Result<(), String> {
eprintln!("Creating database {}", filename.as_ref().display());
if utils::file_exists(&filename) {
return Err(format!("Database {} already exists", filename.as_ref().display()));
}
Self::sanity_checks(graph)?;
let mut connection = Connection::open(filename).map_err(|x| x.to_string())?;
Self::insert_tags(graph, chains, &mut connection).map_err(|x| x.to_string())?;
Self::insert_nodes(graph, chains, &mut connection).map_err(|x| x.to_string())?;
Self::insert_paths(graph, &mut connection).map_err(|x| x.to_string())?;
Self::index_reference_paths(graph, &mut connection).map_err(|x| x.to_string())?;
Ok(())
}
fn insert_tags(graph: &GBZ, chains: &Chains, connection: &mut Connection) -> rusqlite::Result<()> {
eprintln!("Inserting header and tags");
connection.execute(
"CREATE TABLE Tags (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
) STRICT",
(),
)?;
let mut inserted = 0;
let transaction = connection.transaction()?;
{
let mut insert = transaction.prepare(
"INSERT INTO Tags(key, value) VALUES (?1, ?2)"
)?;
let metadata = graph.metadata().unwrap();
insert.execute((Self::KEY_VERSION, Self::VERSION))?;
insert.execute((Self::KEY_NODES, graph.nodes()))?;
insert.execute((Self::KEY_CHAINS, chains.len()))?;
insert.execute((Self::KEY_CHAIN_LINKS, chains.links()))?;
insert.execute((Self::KEY_PATHS, metadata.paths()))?;
insert.execute((Self::KEY_SAMPLES, metadata.samples()))?;
insert.execute((Self::KEY_HAPLOTYPES, metadata.haplotypes()))?;
insert.execute((Self::KEY_CONTIGS, metadata.contigs()))?;
inserted += 6;
let index: &GBWT = graph.as_ref();
for (key, value) in index.tags().iter() {
let key = format!("{}{}", Self::KEY_GBWT, key);
insert.execute((key, value))?;
inserted += 1;
}
for (key, value) in graph.tags().iter() {
let key = format!("{}{}", Self::KEY_GBZ, key);
insert.execute((key, value))?;
inserted += 1;
}
}
transaction.commit()?;
eprintln!("Inserted {} key-value pairs", inserted);
Ok(())
}
fn insert_nodes(graph: &GBZ, chains: &Chains, connection: &mut Connection) -> rusqlite::Result<()> {
eprintln!("Inserting nodes");
connection.execute(
"CREATE TABLE Nodes (
handle INTEGER PRIMARY KEY,
edges BLOB NOT NULL,
bwt BLOB NOT NULL,
sequence BLOB NOT NULL,
next INTEGER
) STRICT",
(),
)?;
let mut inserted = 0;
let transaction = connection.transaction()?;
{
let mut insert = transaction.prepare(
"INSERT INTO Nodes(handle, edges, bwt, sequence, next) VALUES (?1, ?2, ?3, ?4, ?5)"
)?;
let index: &GBWT = graph.as_ref();
let bwt: &BWT = index.as_ref();
for node_id in graph.node_iter() {
let forward_id = support::encode_node(node_id, Orientation::Forward);
let record_id = index.node_to_record(forward_id);
let (edge_bytes, bwt_bytes) = bwt.compressed_record(record_id).unwrap();
let sequence = graph.sequence(node_id).unwrap();
let encoded_sequence = utils::encode_sequence(sequence);
let next: Option<usize> = chains.next(forward_id);
insert.execute((forward_id, edge_bytes, bwt_bytes, encoded_sequence, next))?;
inserted += 1;
let reverse_id = support::encode_node(node_id, Orientation::Reverse);
let record_id = index.node_to_record(reverse_id);
let (edge_bytes, bwt_bytes) = bwt.compressed_record(record_id).unwrap();
let sequence = support::reverse_complement(sequence);
let encoded_sequence = utils::encode_sequence(&sequence);
let next: Option<usize> = chains.next(reverse_id);
insert.execute((reverse_id, edge_bytes, bwt_bytes, encoded_sequence, next))?;
inserted += 1;
}
}
transaction.commit()?;
eprintln!("Inserted {} node records", inserted);
Ok(())
}
fn insert_paths(graph: &GBZ, connection: &mut Connection) -> rusqlite::Result<()> {
eprintln!("Inserting paths");
connection.execute(
"CREATE TABLE Paths (
handle INTEGER PRIMARY KEY,
fw_node INTEGER NOT NULL,
fw_offset INTEGER NOT NULL,
rev_node INTEGER NOT NULL,
rev_offset INTEGER NOT NULL,
sample TEXT NOT NULL,
contig TEXT NOT NULL,
haplotype INTEGER NOT NULL,
fragment INTEGER NOT NULL,
is_indexed INTEGER NOT NULL
) STRICT",
(),
)?;
let mut inserted = 0;
let transaction = connection.transaction()?;
{
let mut insert = transaction.prepare(
"INSERT INTO
Paths(handle, fw_node, fw_offset, rev_node, rev_offset, sample, contig, haplotype, fragment, is_indexed)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, FALSE)"
)?;
let index: &GBWT = graph.as_ref();
let metadata = graph.metadata().unwrap();
for handle in 0..metadata.paths() {
let fw_start = path_start(index, handle, Orientation::Forward);
let rev_start = path_start(index, handle, Orientation::Reverse);
let name = FullPathName::from_metadata(metadata, handle).unwrap();
insert.execute((
handle,
fw_start.node, fw_start.offset,
rev_start.node, rev_start.offset,
name.sample, name.contig, name.haplotype, name.fragment
))?;
inserted += 1;
}
}
transaction.commit()?;
eprintln!("Inserted information on {} paths", inserted);
Ok(())
}
fn index_reference_paths(graph: &GBZ, connection: &mut Connection) -> rusqlite::Result<()> {
eprintln!("Indexing reference paths");
connection.execute(
"CREATE TABLE ReferenceIndex (
path_handle INTEGER NOT NULL,
path_offset INTEGER NOT NULL,
node_handle INTEGER NOT NULL,
node_offset INTEGER NOT NULL,
PRIMARY KEY (path_handle, path_offset)
) STRICT",
(),
)?;
let reference_paths = graph.reference_positions(Self::INDEX_INTERVAL, true);
if reference_paths.is_empty() {
eprintln!("No reference paths to index");
return Ok(());
}
eprintln!("Inserting indexed positions");
let transaction = connection.transaction()?;
{
let mut set_as_indexed = transaction.prepare(
"UPDATE Paths SET is_indexed = TRUE WHERE handle = ?1"
)?;
let mut insert_position = transaction.prepare(
"INSERT INTO ReferenceIndex(path_handle, path_offset, node_handle, node_offset)
VALUES (?1, ?2, ?3, ?4)"
)?;
for ref_path in reference_paths.iter() {
set_as_indexed.execute((ref_path.id,))?;
for (path_offset, gbwt_position) in ref_path.positions.iter() {
insert_position.execute((ref_path.id, path_offset, gbwt_position.node, gbwt_position.offset))?;
}
}
}
transaction.commit()?;
Ok(())
}
}
#[derive(Debug)]
pub struct GAFBase {
pub(crate) connection: Connection,
version: String,
nodes: usize,
alignments: usize,
blocks: usize,
bidirectional_gbwt: bool,
tags: Tags,
}
impl GAFBase {
const KEY_VERSION: &'static str = "version";
pub const VERSION: &'static str = "GAF-base version 3";
const KEY_NODES: &'static str = "nodes";
const KEY_ALIGNMENTS: &'static str = "alignments";
const KEY_BIDIRECTIONAL_GBWT: &'static str = "bidirectional_gbwt";
const KEY_PARAMS: &'static str = "params";
fn get_string_value(tags: &Tags, key: &str) -> String {
tags.get(key).cloned().unwrap_or_default()
}
fn get_numeric_value(tags: &Tags, key: &str) -> Result<usize, String> {
let value = Self::get_string_value(tags, key);
value.parse::<usize>().map_err(|x| format!("Invalid numeric value for key {}: {}", key, x))
}
fn get_boolean_value(tags: &Tags, key: &str) -> Result<bool, String> {
let value = Self::get_numeric_value(tags, key)?;
match value {
0 => Ok(false),
1 => Ok(true),
_ => Err(format!("Invalid boolean value for key {}: {}", key, value)),
}
}
pub fn open<P: AsRef<Path>>(filename: P) -> Result<Self, String> {
let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
let connection = Connection::open_with_flags(filename, flags).map_err(|x| x.to_string())?;
let mut get_tags = connection.prepare(
"SELECT key, value FROM Tags"
).map_err(|x| x.to_string())?;
let mut tags = Tags::new();
let mut rows = get_tags.query(()).map_err(|x| x.to_string())?;
while let Some(row) = rows.next().map_err(|x| x.to_string())? {
let key: String = row.get(0).map_err(|x| x.to_string())?;
let value: String = row.get(1).map_err(|x| x.to_string())?;
tags.insert(&key, &value);
}
drop(rows);
drop(get_tags);
let version = Self::get_string_value(&tags, Self::KEY_VERSION);
if version != Self::VERSION {
return Err(format!("Unsupported database version: {} (expected {})", version, Self::VERSION));
}
let nodes = Self::get_numeric_value(&tags, Self::KEY_NODES)?;
let alignments = Self::get_numeric_value(&tags, Self::KEY_ALIGNMENTS)?;
let bidirectional_gbwt = Self::get_boolean_value(&tags, Self::KEY_BIDIRECTIONAL_GBWT)?;
let mut count_rows = connection.prepare(
"SELECT COUNT(*) FROM Alignments"
).map_err(|x| x.to_string())?;
let blocks = count_rows.query_row((), |row|
row.get::<_, usize>(0)
).map_err(|x| x.to_string())?;
drop(count_rows);
Ok(GAFBase {
connection,
version,
nodes, alignments, blocks, bidirectional_gbwt,
tags,
})
}
pub fn filename(&self) -> Option<&str> {
self.connection.path()
}
pub fn file_size(&self) -> Option<String> {
let filename = self.filename()?;
utils::file_size(filename)
}
pub fn version(&self) -> &str {
&self.version
}
pub fn nodes(&self) -> usize {
self.nodes
}
pub fn alignments(&self) -> usize {
self.alignments
}
pub fn blocks(&self) -> usize {
self.blocks
}
pub fn bidirectional_gbwt(&self) -> bool {
self.bidirectional_gbwt
}
pub fn tags(&self) -> &Tags {
&self.tags
}
pub fn graph_name(&self) -> Result<GraphName, String> {
GraphName::from_tags(self.tags())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct AlignmentStats {
pub alignments: usize,
pub start_bytes: usize,
pub name_bytes: usize,
pub quality_bytes: usize,
pub difference_bytes: usize,
pub flag_bytes: usize,
pub number_bytes: usize,
pub optional_bytes: usize,
}
impl AlignmentStats {
pub fn new() -> Self {
Self {
alignments: 0,
start_bytes: 0,
name_bytes: 0,
quality_bytes: 0,
difference_bytes: 0,
flag_bytes: 0,
number_bytes: 0,
optional_bytes: 0,
}
}
pub fn update(&mut self, block: &AlignmentBlock) {
self.alignments += block.len();
self.start_bytes += block.gbwt_starts.len();
self.name_bytes += block.names.len();
self.quality_bytes += block.quality_strings.len();
self.difference_bytes += block.difference_strings.len();
self.flag_bytes += block.flags.bytes();
self.number_bytes += block.numbers.len();
self.optional_bytes += block.optional.len();
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GAFBaseParams {
pub block_size: usize,
pub reference_free: bool,
pub store_quality_strings: bool,
pub store_optional_fields: bool,
}
impl GAFBaseParams {
pub const BLOCK_SIZE: usize = 1000;
pub fn to_json(&self) -> String {
let mut fields = Vec::new();
if self.reference_free {
fields.push(JSONValue::String(String::from("reference_free")));
}
if self.store_quality_strings {
fields.push(JSONValue::String(String::from("quality_strings")));
}
if self.store_optional_fields {
fields.push(JSONValue::String(String::from("optional")));
}
let fields = JSONValue::Array(fields);
let json = JSONValue::Object(vec![
(String::from("block_size"), JSONValue::Number(self.block_size)),
(String::from("fields"), fields),
]);
json.to_string()
}
}
impl Default for GAFBaseParams {
fn default() -> Self {
Self {
block_size: Self::BLOCK_SIZE,
reference_free: false,
store_quality_strings: true,
store_optional_fields: true,
}
}
}
impl GAFBase {
pub fn create_from_files(
gaf_file: &Path, gbwt_file: &Path, db_file: &Path,
graph: GraphReference<'_, '_>,
params: &GAFBaseParams
) -> Result<(), String> {
eprintln!("Loading GBWT index {}", gbwt_file.display());
let index: Arc<GBWT> = Arc::new(serialize::load_from(gbwt_file).map_err(|x| x.to_string())?);
Self::create(gaf_file, index, db_file, graph, params)
}
pub fn create<P: AsRef<Path>, Q: AsRef<Path>>(
gaf_file: P, index: Arc<GBWT>, db_file: Q,
graph: GraphReference<'_, '_>,
params: &GAFBaseParams
) -> Result<(), String> {
eprintln!("Creating database {}", db_file.as_ref().display());
if utils::file_exists(&db_file) {
return Err(format!("Database {} already exists", db_file.as_ref().display()));
}
let mut graph = graph;
if params.reference_free {
if graph.is_none() {
return Err(String::from("The construction of a reference-free GAF-base requires a graph"));
}
} else {
graph = GraphReference::None;
}
let mut gaf_file = utils::open_file(gaf_file)?;
let aln_name = Self::parse_gaf_headers(&mut gaf_file)?;
let graph_name = graph.graph_name()?;
utils::require_valid_reference(&aln_name, &graph_name)?;
let mut connection = Connection::open(&db_file).map_err(|x| x.to_string())?;
let nodes = Self::insert_nodes(&index, graph, &mut connection)?;
eprintln!("Database size: {}", utils::file_size(&db_file).unwrap_or(String::from("unknown")));
Self::insert_tags(&index, nodes, &aln_name, &mut connection, params)?;
eprintln!("Database size: {}", utils::file_size(&db_file).unwrap_or(String::from("unknown")));
Self::insert_alignments(index, gaf_file, connection, params)?;
eprintln!("Database size: {}", utils::file_size(&db_file).unwrap_or(String::from("unknown")));
Ok(())
}
fn parse_gaf_headers(gaf_file: &mut impl BufRead) -> Result<GraphName, String> {
let header_lines = formats::read_gaf_header_lines(gaf_file).map_err(|x| x.to_string())?;
Ok(GraphName::from_header_lines(&header_lines).unwrap_or_default())
}
fn gbwt_paths(index: &GBWT) -> usize {
if index.is_bidirectional() { index.sequences() / 2 } else { index.sequences() }
}
fn insert_tags(index: &GBWT, nodes: usize, aln_name: &GraphName, connection: &mut Connection, params: &GAFBaseParams) -> Result<(), String> {
eprintln!("Inserting header and tags");
connection.execute(
"CREATE TABLE Tags (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
) STRICT",
(),
).map_err(|x| x.to_string())?;
let mut additional_tags = Tags::new();
aln_name.set_tags(&mut additional_tags);
let mut inserted = 0;
let transaction = connection.transaction().map_err(|x| x.to_string())?;
{
let mut insert = transaction.prepare(
"INSERT INTO Tags(key, value) VALUES (?1, ?2)"
).map_err(|x| x.to_string())?;
let alignments = Self::gbwt_paths(index);
insert.execute((Self::KEY_VERSION, Self::VERSION)).map_err(|x| x.to_string())?;
insert.execute((Self::KEY_NODES, nodes)).map_err(|x| x.to_string())?;
insert.execute((Self::KEY_ALIGNMENTS, alignments)).map_err(|x| x.to_string())?;
let bidirectional: usize = if index.is_bidirectional() { 1 } else { 0 };
insert.execute((Self::KEY_BIDIRECTIONAL_GBWT, bidirectional)).map_err(|x| x.to_string())?;
insert.execute((Self::KEY_PARAMS, params.to_json())).map_err(|x| x.to_string())?;
inserted += 5;
for (key, value) in additional_tags.iter() {
insert.execute((key, value)).map_err(|x| x.to_string())?;
inserted += 1;
}
}
transaction.commit().map_err(|x| x.to_string())?;
eprintln!("Inserted {} key-value pairs", inserted);
Ok(())
}
fn insert_nodes(index: &GBWT, graph: GraphReference<'_, '_>, connection: &mut Connection) -> Result<usize, String> {
eprintln!("Inserting nodes");
connection.execute(
"CREATE TABLE Nodes (
handle INTEGER PRIMARY KEY,
edges BLOB NOT NULL,
bwt BLOB NOT NULL,
sequence BLOB NOT NULL
) STRICT",
(),
).map_err(|x| x.to_string())?;
let mut inserted = 0;
let mut graph = graph;
let transaction = connection.transaction().map_err(|x| x.to_string())?;
{
let mut insert = transaction.prepare(
"INSERT INTO Nodes(handle, edges, bwt, sequence) VALUES (?1, ?2, ?3, ?4)"
).map_err(|x| x.to_string())?;
let bwt: &BWT = index.as_ref();
for record_id in bwt.id_iter() {
if record_id == gbz::ENDMARKER {
continue;
}
let handle = index.record_to_node(record_id);
let (edge_bytes, bwt_bytes) = bwt.compressed_record(record_id).unwrap();
let sequence = if graph.is_none() {
Vec::new()
} else {
let record = graph.gbz_record(handle)?;
utils::encode_sequence(record.sequence())
};
insert.execute((handle, edge_bytes, bwt_bytes, sequence)).map_err(|x| x.to_string())?;
inserted += 1;
}
}
transaction.commit().map_err(|x| x.to_string())?;
eprintln!("Inserted {} node records", inserted);
Ok(inserted / 2)
}
fn insert_alignments(index: Arc<GBWT>, gaf_file: Box<dyn BufRead>, connection: Connection, params: &GAFBaseParams) -> Result<(), String> {
eprintln!("Inserting alignments");
let mut gaf_file = gaf_file;
let mut connection = connection;
connection.execute(
"CREATE TABLE Alignments (
id INTEGER PRIMARY KEY,
min_handle INTEGER,
max_handle INTEGER CHECK (min_handle <= max_handle),
alignments INTEGER NOT NULL,
read_length INTEGER,
gbwt_starts BLOB NOT NULL,
names BLOB NOT NULL,
quality_strings BLOB NOT NULL,
difference_strings BLOB NOT NULL,
flags BLOB NOT NULL,
numbers BLOB NOT NULL,
optional BLOB NOT NULL
) STRICT",
(),
).map_err(|x| x.to_string())?;
connection.execute(
"CREATE INDEX AlignmentNodeInterval
ON Alignments(min_handle, max_handle)",
(),
).map_err(|x| x.to_string())?;
let (to_encoder, from_parser) = mpsc::sync_channel(4);
let (to_insert, from_encoder) = mpsc::sync_channel(4);
let (to_report, from_insert) = mpsc::sync_channel(1);
let gbwt_index = index.clone();
let encoder_thread = thread::spawn(move || {
let mut alignment_id = 0;
loop {
let block: Result<Vec<Alignment>, String> = from_parser.recv().unwrap();
match block {
Ok(block) => {
let encoded = AlignmentBlock::new(&block, &gbwt_index, alignment_id);
match encoded {
Ok(data) => {
let _ = to_insert.send(Ok(data));
},
Err(message) => {
let _ = to_insert.send(Err(message));
return;
}
}
alignment_id += block.len();
if block.is_empty() {
return;
}
},
Err(message) => {
let _ = to_insert.send(Err(message));
return;
},
}
}
});
let insert_thread = thread::spawn(move || {
let transaction = connection.transaction().map_err(|x| x.to_string());
if let Err(message) = transaction {
let _ = to_report.send(Err(message));
return;
}
let transaction = transaction.unwrap();
let insert = transaction.prepare(
"INSERT INTO
Alignments(min_handle, max_handle, alignments, read_length, gbwt_starts, names, quality_strings, difference_strings, flags, numbers, optional)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"
).map_err(|x| x.to_string());
if let Err(message) = insert {
let _ = to_report.send(Err(message));
return;
}
let mut insert = insert.unwrap();
let mut statistics = AlignmentStats::new();
loop {
let block: Result<AlignmentBlock, String> = from_encoder.recv().unwrap();
match block {
Ok(block) => {
if block.is_empty() {
break;
}
statistics.update(&block);
let result = insert.execute((
block.min_handle, block.max_handle, block.alignments, block.read_length,
block.gbwt_starts, block.names,
block.quality_strings, block.difference_strings,
block.flags.as_ref(), block.numbers, block.optional
)).map_err(|x| x.to_string());
if let Err(message) = result {
let _ = to_report.send(Err(message));
return;
}
},
Err(message) => {
let _ = to_report.send(Err(message));
return;
},
}
}
drop(insert);
let result = transaction.commit().map_err(|x| x.to_string());
if let Err(message) = result {
let _ = to_report.send(Err(message));
return;
}
let _ = to_report.send(Ok(statistics));
});
let mut line_num: usize = 0;
let mut unaligned_block = false;
let mut block: Vec<Alignment> = Vec::new();
let mut failed = false;
loop {
let mut buf: Vec<u8> = Vec::new();
let len = gaf_file.read_until(b'\n', &mut buf).map_err(|x| x.to_string());
match len {
Ok(len) => {
if len == 0 {
break;
}
},
Err(message) => {
let _ = to_encoder.send(Err(message));
failed = true;
break;
},
};
if formats::is_gaf_header_line(&buf) {
line_num += 1;
continue;
}
let aln = Alignment::from_gaf(&buf).map_err(
|x| format!("Failed to parse the alignment on line {}: {}", line_num, x)
);
match aln {
Ok(aln) => {
let mut aln = aln;
if !params.store_quality_strings {
aln.base_quality.clear();
}
if !params.store_optional_fields {
aln.optional.clear();
}
if aln.is_unaligned() != unaligned_block {
if !block.is_empty() {
let _ = to_encoder.send(Ok(block));
block = Vec::new();
}
unaligned_block = aln.is_unaligned();
}
block.push(aln);
if block.len() >= params.block_size {
let _ = to_encoder.send(Ok(block));
block = Vec::new();
}
},
Err(message) => {
let _ = to_encoder.send(Err(message));
failed = true;
break;
},
}
line_num += 1;
}
if !failed {
if !block.is_empty() {
let _ = to_encoder.send(Ok(block));
}
let _ = to_encoder.send(Ok(Vec::new()));
}
let _ = encoder_thread.join();
let _ = insert_thread.join();
let statistics = from_insert.recv().unwrap()?;
eprintln!("Inserted information on {} alignments", statistics.alignments);
let expected_alignments = Self::gbwt_paths(&index);
if statistics.alignments != expected_alignments {
eprintln!("Warning: Expected {} alignments", expected_alignments);
}
eprintln!(
"Field sizes: gbwt_starts {}, names {}, quality_strings {}, difference_strings {}, flags {}, numbers {}, optional {}",
utils::human_readable_size(statistics.start_bytes),
utils::human_readable_size(statistics.name_bytes),
utils::human_readable_size(statistics.quality_bytes),
utils::human_readable_size(statistics.difference_bytes),
utils::human_readable_size(statistics.flag_bytes),
utils::human_readable_size(statistics.number_bytes),
utils::human_readable_size(statistics.optional_bytes),
);
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GBZRecord {
handle: usize,
edges: Vec<Pos>,
bwt: Vec<u8>,
sequence: Vec<u8>,
next: Option<usize>,
}
impl GBZRecord {
pub fn from_gbz(graph: &GBZ, handle: usize) -> Option<Self> {
let node_id = support::node_id(handle);
if !graph.has_node(node_id) {
return None;
}
let index: &GBWT = graph.as_ref();
let bwt_records: &BWT = index.as_ref();
let record_id = index.node_to_record(handle);
let (edge_bytes, bwt_bytes) = bwt_records.compressed_record(record_id)?;
let (edges, _) = Record::decompress_edges(edge_bytes)?;
let bwt = bwt_bytes.to_vec();
let sequence = graph.sequence(node_id)?;
let sequence = if support::node_orientation(handle) == Orientation::Forward {
sequence.to_vec()
} else {
support::reverse_complement(sequence)
};
let next = None;
Some(GBZRecord { handle, edges, bwt, sequence, next })
}
#[doc(hidden)]
pub unsafe fn from_raw_parts(handle: usize, edges: Vec<Pos>, bwt: Vec<u8>, sequence: Vec<u8>, next: Option<usize>) -> Self {
GBZRecord { handle, edges, bwt, sequence, next }
}
pub fn to_gbwt_record(&self) -> Record<'_> {
if self.edges.is_empty() {
panic!("GBZRecord::to_gbwt_record: Empty record");
}
unsafe { Record::from_raw_parts(self.handle, self.edges.clone(), &self.bwt) }
}
#[inline]
pub fn handle(&self) -> usize {
self.handle
}
#[inline]
pub fn id(&self) -> usize {
support::node_id(self.handle)
}
#[inline]
pub fn orientation(&self) -> Orientation {
support::node_orientation(self.handle)
}
#[inline]
pub fn successors(&self) -> impl Iterator<Item = usize> + '_ {
self.edges.iter().filter(|x| x.node != gbz::ENDMARKER).map(|x| x.node)
}
#[inline]
pub fn edges(&self) -> impl Iterator<Item = Pos> + '_ {
self.edges.iter().filter(|x| x.node != gbz::ENDMARKER).copied()
}
pub(crate) fn edges_slice(&self) -> &[Pos] {
if !self.edges.is_empty() && self.edges[0].node == gbz::ENDMARKER {
return &self.edges[1..];
}
&self.edges
}
#[inline]
pub fn sequence(&self) -> &[u8] {
&self.sequence
}
#[inline]
pub fn sequence_len(&self) -> usize {
self.sequence.len()
}
#[inline]
pub fn next(&self) -> Option<usize> {
self.next
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GBZPath {
pub handle: usize,
pub fw_start: Pos,
pub rev_start: Pos,
pub name: FullPathName,
pub is_indexed: bool,
}
impl GBZPath {
pub fn name(&self) -> String {
self.name.to_string()
}
pub fn with_id(graph: &GBZ, path_id: usize) -> Option<Self> {
let metadata = graph.metadata()?;
let index: &GBWT = graph.as_ref();
let fw_start = path_start(index, path_id, Orientation::Forward);
let rev_start = path_start(index, path_id, Orientation::Reverse);
let name = FullPathName::from_metadata(metadata, path_id)?;
Some(GBZPath {
handle: path_id,
fw_start, rev_start,
name,
is_indexed: false,
})
}
pub fn with_name(graph: &GBZ, name: &FullPathName) -> Option<Self> {
let metadata = graph.metadata()?;
let path_id = metadata.find_fragment(name)?;
let index: &GBWT = graph.as_ref();
let fw_start = path_start(index, path_id, Orientation::Forward);
let rev_start = path_start(index, path_id, Orientation::Reverse);
Some(GBZPath {
handle: path_id,
fw_start, rev_start,
name: FullPathName::from_metadata(metadata, path_id)?,
is_indexed: false,
})
}
}
impl AsRef<FullPathName> for GBZPath {
fn as_ref(&self) -> &FullPathName {
&self.name
}
}
#[derive(Debug)]
pub struct GraphInterface<'a> {
get_tag: Statement<'a>,
get_tags: Statement<'a>,
get_record: Statement<'a>,
get_path: Statement<'a>,
find_path: Statement<'a>,
paths_for_sample: Statement<'a>,
indexed_position: Statement<'a>,
}
impl<'a> GraphInterface<'a> {
pub fn new(database: &'a GBZBase) -> Result<Self, String> {
let get_tag = database.connection.prepare(
"SELECT value FROM Tags WHERE key = ?1"
).map_err(|x| x.to_string())?;
let get_tags = database.connection.prepare(
"SELECT key, value FROM Tags WHERE key LIKE ?1"
).map_err(|x| x.to_string())?;
let get_record = database.connection.prepare(
"SELECT edges, bwt, sequence, next FROM Nodes WHERE handle = ?1"
).map_err(|x| x.to_string())?;
let get_path = database.connection.prepare(
"SELECT * FROM Paths WHERE handle = ?1"
).map_err(|x| x.to_string())?;
let find_path = database.connection.prepare(
"SELECT * FROM Paths
WHERE sample = ?1 AND contig = ?2 AND haplotype = ?3 AND fragment <= ?4
ORDER BY fragment DESC
LIMIT 1"
).map_err(|x| x.to_string())?;
let paths_for_sample = database.connection.prepare(
"SELECT * FROM Paths WHERE sample = ?1"
).map_err(|x| x.to_string())?;
let indexed_position = database.connection.prepare(
"SELECT path_offset, node_handle, node_offset FROM ReferenceIndex
WHERE path_handle = ?1 AND path_offset <= ?2
ORDER BY path_offset DESC
LIMIT 1"
).map_err(|x| x.to_string())?;
Ok(GraphInterface {
get_tag, get_tags,
get_record,
get_path, find_path, paths_for_sample,
indexed_position,
})
}
pub fn get_gbwt_tag(&mut self, key: &str) -> Result<Option<String>, String> {
let key = format!("{}{}", GBZBase::KEY_GBWT, key);
self.get_tag.query_row(
(key,),
|row| row.get(0)
).optional().map_err(|x| x.to_string())
}
pub fn get_gbz_tag(&mut self, key: &str) -> Result<Option<String>, String> {
let key = format!("{}{}", GBZBase::KEY_GBZ, key);
self.get_tag.query_row(
(key,),
|row| row.get(0)
).optional().map_err(|x| x.to_string())
}
fn get_tags_with_prefix(&mut self, prefix: &str) -> Result<Tags, String> {
let mut tags = Tags::new();
let pattern = format!("{}%", prefix);
let mut rows = self.get_tags.query((pattern,)).map_err(|x| x.to_string())?;
while let Some(row) = rows.next().map_err(|x| x.to_string())? {
let key: String = row.get(0).map_err(|x| x.to_string())?;
let value: String = row.get(1).map_err(|x| x.to_string())?;
let key = key.trim_start_matches(prefix).to_string();
tags.insert(&key, &value);
}
Ok(tags)
}
pub fn get_gbwt_tags(&mut self) -> Result<Tags, String> {
self.get_tags_with_prefix(GBZBase::KEY_GBWT)
}
pub fn get_gbz_tags(&mut self) -> Result<Tags, String> {
self.get_tags_with_prefix(GBZBase::KEY_GBZ)
}
pub fn graph_name(&mut self) -> Result<GraphName, String> {
let tags = self.get_gbz_tags()?;
Ok(GraphName::from_tags(&tags).unwrap_or_default())
}
pub fn get_record(&mut self, handle: usize) -> Result<Option<GBZRecord>, String> {
self.get_record.query_row(
(handle,),
|row| {
let edge_bytes: Vec<u8> = row.get(0)?;
let (edges, _) = Record::decompress_edges(&edge_bytes).unwrap();
let bwt: Vec<u8> = row.get(1)?;
let encoded_sequence: Vec<u8> = row.get(2)?;
let sequence: Vec<u8> = utils::decode_sequence(&encoded_sequence);
let next: Option<usize> = row.get(3)?;
Ok(GBZRecord { handle, edges, bwt, sequence, next })
}
).optional().map_err(|x| x.to_string())
}
fn row_to_gbz_path(row: &Row) -> rusqlite::Result<GBZPath> {
let handle = row.get(0)?;
let fw_start = Pos::new(row.get(1)?, row.get(2)?);
let rev_start = Pos::new(row.get(3)?, row.get(4)?);
let name = FullPathName {
sample: row.get(5)?,
contig: row.get(6)?,
haplotype: row.get(7)?,
fragment: row.get(8)?,
};
let is_indexed = row.get(9)?;
Ok(GBZPath {
handle,
fw_start, rev_start,
name,
is_indexed,
})
}
pub fn get_path(&mut self, handle: usize) -> Result<Option<GBZPath>, String> {
self.get_path.query_row((handle,), Self::row_to_gbz_path).optional().map_err(|x| x.to_string())
}
pub fn find_path(&mut self, name: &FullPathName) -> Result<Option<GBZPath>, String> {
self.find_path.query_row(
(&name.sample, &name.contig, name.haplotype, name.fragment),
Self::row_to_gbz_path
).optional().map_err(|x| x.to_string())
}
pub fn paths_for_sample(&mut self, sample_name: &str) -> Result<Vec<GBZPath>, String> {
let mut result: Vec<GBZPath> = Vec::new();
let mut rows = self.paths_for_sample.query((sample_name,)).map_err(|x| x.to_string())?;
while let Some(row) = rows.next().map_err(|x| x.to_string())? {
let path = Self::row_to_gbz_path(row).map_err(|x| x.to_string())?;
result.push(path);
}
Ok(result)
}
pub fn indexed_position(&mut self, path_handle: usize, path_offset: usize) -> Result<Option<(usize, Pos)>, String> {
self.indexed_position.query_row(
(path_handle, path_offset),
|row| {
let path_offset = row.get(0)?;
let node_handle = row.get(1)?;
let node_offset = row.get(2)?;
Ok((path_offset, Pos::new(node_handle, node_offset)))
}
).optional().map_err(|x| x.to_string())
}
}
pub enum GraphReference<'reference, 'graph> {
Gbz(&'reference GBZ),
Db(&'reference mut GraphInterface<'graph>),
None,
}
impl<'reference, 'graph> GraphReference<'reference, 'graph> {
pub fn gbz_record(&mut self, handle: usize) -> Result<GBZRecord, String> {
match self {
GraphReference::Gbz(gbz) => {
GBZRecord::from_gbz(gbz, handle).ok_or_else(|| format!("The graph does not contain handle {}", handle))
},
GraphReference::Db(db) => {
db.get_record(handle)?.ok_or_else(|| format!("The graph does not contain handle {}", handle))
},
GraphReference::None => Err(String::from("No reference graph provided")),
}
}
pub fn graph_name(&mut self) -> Result<GraphName, String> {
match self {
GraphReference::Gbz(gbz) => {
Ok(GraphName::from_gbz(gbz))
},
GraphReference::Db(db) => {
db.graph_name()
},
GraphReference::None => Ok(GraphName::default()),
}
}
#[inline]
pub fn is_none(&self) -> bool {
matches!(self, GraphReference::None)
}
}
pub fn path_start(index: &GBWT, path_id: usize, orientation: Orientation) -> Pos {
index.start(support::encode_path(path_id, orientation)).unwrap_or(Pos::new(gbz::ENDMARKER, 0))
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FileType {
Missing,
NotFile,
UnknownFile,
Gbz,
UnknownDatabase,
Version(String),
}
pub fn identify_file<P: AsRef<Path>>(filename: P) -> FileType {
let metadata = fs::metadata(&filename);
if metadata.is_err() {
return FileType::Missing;
}
let metadata = metadata.unwrap();
if !metadata.is_file() {
return FileType::NotFile;
}
if GBZ::is_gbz(&filename) {
return FileType::Gbz;
}
let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
let connection = Connection::open_with_flags(filename, flags);
if connection.is_err() {
return FileType::UnknownFile;
}
let connection = connection.unwrap();
let statement = connection.prepare("SELECT value FROM Tags WHERE key = 'version'");
if statement.is_err() {
return FileType::UnknownDatabase;
}
let mut statement = statement.unwrap();
let version: rusqlite::Result<String> = statement.query_row([], |row| row.get(0));
if version.is_err() {
return FileType::UnknownDatabase;
}
FileType::Version(version.unwrap())
}
fn get_string_value(statement: &mut Statement, key: &str) -> Result<String, String> {
let result: rusqlite::Result<String> = statement.query_row(
(key,),
|row| row.get(0)
);
match result {
Ok(value) => Ok(value),
Err(x) => Err(format!("Key not found: {} ({})", key, x)),
}
}
fn get_numeric_value(statement: &mut Statement, key: &str) -> Result<usize, String> {
let value = get_string_value(statement, key)?;
value.parse::<usize>().map_err(|x| x.to_string())
}