1use crate::{GAFBase, GBZRecord, GraphReference, Subgraph, Alignment, AlignmentBlock};
4use crate::alignment::{Flags, TargetPath};
5use crate::utils;
6
7use gbz::{Orientation, Pos, GBZ};
8use gbz::bwt::Record;
9use gbz::support;
10
11use rusqlite::{Row, OptionalExtension};
12
13use std::collections::{BTreeMap, HashSet};
14use std::fmt::Display;
15use std::io::Write;
16use std::ops::Range;
17use std::sync::Arc;
18
19#[cfg(test)]
20mod tests;
21
22#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
26pub enum AlignmentOutput {
27 Overlapping,
29 Clipped,
31 Contained,
33}
34
35impl Display for AlignmentOutput {
36 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
37 match self {
38 AlignmentOutput::Overlapping => write!(f, "overlapping"),
39 AlignmentOutput::Clipped => write!(f, "clipped"),
40 AlignmentOutput::Contained => write!(f, "contained"),
41 }
42 }
43}
44
45#[derive(Debug, Clone, PartialEq, Default)]
102pub struct ReadSet {
103 nodes: BTreeMap<usize, GBZRecord>,
105 reads: Vec<Alignment>,
106 unclipped: usize,
108 blocks: usize,
109 clusters: usize,
111}
112
113impl ReadSet {
114 pub const CLUSTER_GAP_THRESHOLD: usize = 1000;
117
118 fn get_row_id(row: &Row) -> Result<usize, String> {
120 let row_id: usize = row.get(0).map_err(|x| x.to_string())?;
121 Ok(row_id)
122 }
123
124 fn decompress_block(row: &Row, from_idx: usize) -> Result<Vec<Alignment>, String> {
126 let min_handle: Option<usize> = row.get(from_idx + 0).map_err(|x| x.to_string())?;
127 let max_handle: Option<usize> = row.get(from_idx + 1).map_err(|x| x.to_string())?;
128 let alignments: usize = row.get(from_idx + 2).map_err(|x| x.to_string())?;
129 let read_length: Option<usize> = row.get(from_idx + 3).map_err(|x| x.to_string())?;
130 let gbwt_starts: Vec<u8> = row.get(from_idx + 4).map_err(|x| x.to_string())?;
131 let names: Vec<u8> = row.get(from_idx + 5).map_err(|x| x.to_string())?;
132 let quality_strings: Vec<u8> = row.get(from_idx + 6).map_err(|x| x.to_string())?;
133 let difference_strings: Vec<u8> = row.get(from_idx + 7).map_err(|x| x.to_string())?;
134 let flags: Vec<u8> = row.get(from_idx + 8).map_err(|x| x.to_string())?;
135 let numbers: Vec<u8> = row.get(from_idx + 9).map_err(|x| x.to_string())?;
136 let optional: Vec<u8> = row.get(from_idx + 10).map_err(|x| x.to_string())?;
137 let block = AlignmentBlock {
138 min_handle, max_handle, alignments, read_length,
139 gbwt_starts, names,
140 quality_strings, difference_strings,
141 flags: Flags::from(flags), numbers,
142 optional,
143 };
144 block.decode()
145 }
146
147 fn set_target_path(
151 &mut self, alignment: &mut Alignment, subgraph: &Subgraph,
152 get_record: &mut dyn FnMut(usize) -> Result<GBZRecord, String>,
153 contained: bool
154 ) -> Result<(), String> {
155 let mut pos = match alignment.path {
156 TargetPath::Path(_) => return Ok(()),
157 TargetPath::StartPosition(pos) => Some(pos),
158 };
159
160 let mut path = Vec::new();
161 let mut overlap = false;
162 let mut target_path_len = 0;
163 while let Some(p) = pos {
164 if subgraph.has_handle(p.node) {
165 overlap = true;
166 } else if contained {
167 return Ok(()); }
169
170 let mut record = self.nodes.get(&p.node);
172 if record.is_none() {
173 let result = get_record(p.node)?;
174 self.nodes.insert(p.node, result);
175 record = self.nodes.get(&p.node);
176 }
177
178 path.push(p.node);
180 let record = record.unwrap();
181 target_path_len += record.sequence_len();
182 pos = record.to_gbwt_record().lf(p.offset);
183 }
184
185 if overlap {
187 alignment.set_target_path(path);
188 alignment.set_target_path_len(target_path_len);
189 }
190 Ok(())
191 }
192
193 fn set_target_path_simple(
196 &mut self, alignment: &mut Alignment,
197 get_record: &mut dyn FnMut(usize) -> Result<GBZRecord, String>,
198 ) -> Result<(), String> {
199 let mut pos = match alignment.path {
200 TargetPath::Path(_) => return Ok(()),
201 TargetPath::StartPosition(pos) => Some(pos),
202 };
203
204 let mut path = Vec::new();
205 let mut target_path_len = 0;
206 while let Some(p) = pos {
207 let mut record = self.nodes.get(&p.node);
209 if record.is_none() {
210 let result = get_record(p.node)?;
211 self.nodes.insert(p.node, result);
212 record = self.nodes.get(&p.node);
213 }
214
215 path.push(p.node);
217 let record = record.unwrap();
218 target_path_len += record.sequence_len();
219 pos = record.to_gbwt_record().lf(p.offset);
220 }
221
222 alignment.set_target_path(path);
224 alignment.set_target_path_len(target_path_len);
225 Ok(())
226 }
227
228 pub fn new(graph: GraphReference<'_, '_>, subgraph: &Subgraph, database: &GAFBase, output: AlignmentOutput) -> Result<Self, String> {
245 let mut read_set = ReadSet::default();
246
247 let mut get_node = database.connection.prepare(
249 "SELECT edges, bwt, sequence FROM Nodes WHERE handle = ?1"
250 ).map_err(|x| x.to_string())?;
251 let mut graph = graph;
252 let mut get_record = |handle: usize| -> Result<GBZRecord, String> {
253 let gaf_result = get_node.query_row(
255 (handle,),
256 |row: &Row<'_>| -> rusqlite::Result<(Vec<Pos>, Vec<u8>, Vec<u8>)> {
257 let edge_bytes: Vec<u8> = row.get(0)?;
258 let (edges, _) = Record::decompress_edges(&edge_bytes).unwrap();
259 let bwt: Vec<u8> = row.get(1)?;
260 let encoded_sequence: Vec<u8> = row.get(2)?;
261 let sequence = utils::decode_sequence(&encoded_sequence);
262 Ok((edges, bwt, sequence))
263 }
264 ).optional().map_err(|x| x.to_string())?;
265 if gaf_result.is_none() {
266 return Err(format!("Could not find the record for handle {} in GAF-base", handle));
267 }
268 let (edges, bwt, mut sequence) = gaf_result.unwrap();
269
270 if sequence.is_empty() {
273 let seq = subgraph.sequence_for_handle(handle);
274 sequence = match seq {
275 Some(seq) => seq.to_vec(),
276 None => {
277 let gbz_record = graph.gbz_record(handle)?;
278 gbz_record.sequence().to_vec()
279 }
280 };
281 }
282
283 unsafe {
284 Ok(GBZRecord::from_raw_parts(handle, edges, bwt, sequence, None))
285 }
286 };
287
288 let node_ids: Vec<usize> = subgraph.node_iter().collect();
291 let clusters = utils::cluster_node_ids(node_ids, Self::CLUSTER_GAP_THRESHOLD);
292 let clusters: Vec<(usize, usize)> = clusters.into_iter()
293 .map(|r| (support::encode_node(*r.start(), Orientation::Forward), support::encode_node(*r.end(), Orientation::Reverse)))
294 .collect();
295 read_set.clusters = clusters.len();
296
297 let mut row_ids: HashSet<usize> = HashSet::new();
300 let mut get_reads = database.connection.prepare(
301 "SELECT id, min_handle, max_handle, alignments, read_length, gbwt_starts, names, quality_strings, difference_strings, flags, numbers, optional
302 FROM Alignments
303 WHERE min_handle <= ?1 AND max_handle >= ?2"
304 ).map_err(|x| x.to_string())?;
305 for (min_handle, max_handle) in clusters.into_iter() {
306 let mut rows = get_reads.query((max_handle, min_handle)).map_err(|x| x.to_string())?;
307 while let Some(row) = rows.next().map_err(|x| x.to_string())? {
308 let row_id = Self::get_row_id(row)?;
309 if row_ids.contains(&row_id) {
310 continue;
311 }
312 row_ids.insert(row_id);
313 let block = Self::decompress_block(row, 1)?;
314 for mut alignment in block {
315 read_set.set_target_path(&mut alignment, subgraph, &mut get_record, output == AlignmentOutput::Contained)?;
316 if alignment.has_target_path() {
317 if output == AlignmentOutput::Clipped {
318 let sequence_len = Arc::new(|handle| {
319 let record = read_set.nodes.get(&handle)?;
320 Some(record.sequence().len())
321 });
322 let clipped = alignment.clip(subgraph, sequence_len)?;
323 for aln in clipped.into_iter() {
324 read_set.reads.push(aln);
325 }
326 } else {
327 read_set.reads.push(alignment);
328 }
329 read_set.unclipped += 1;
330 }
331 }
332 read_set.blocks += 1;
333 }
334 }
335
336 Ok(read_set)
337 }
338
339 pub fn from_rows(database: &GAFBase, row_range: Range<usize>, graph: Option<&GBZ>) -> Result<Self, String> {
355 let mut read_set = ReadSet { clusters: 1, ..Default::default() };
356
357 let mut get_node = database.connection.prepare(
359 "SELECT edges, bwt, sequence FROM Nodes WHERE handle = ?1"
360 ).map_err(|x| x.to_string())?;
361 let mut get_record = |handle: usize| -> Result<GBZRecord, String> {
362 let gaf_result = get_node.query_row(
364 (handle,),
365 |row: &Row<'_>| -> rusqlite::Result<(Vec<Pos>, Vec<u8>, Vec<u8>)> {
366 let edge_bytes: Vec<u8> = row.get(0)?;
367 let (edges, _) = Record::decompress_edges(&edge_bytes).unwrap();
368 let bwt: Vec<u8> = row.get(1)?;
369 let encoded_sequence: Vec<u8> = row.get(2)?;
370 let sequence = utils::decode_sequence(&encoded_sequence);
371 Ok((edges, bwt, sequence))
372 }
373 ).optional().map_err(|x| x.to_string())?;
374 if gaf_result.is_none() {
375 return Err(format!("Could not find the record for handle {} in GAF-base", handle));
376 }
377 let (edges, bwt, mut sequence) = gaf_result.unwrap();
378 if sequence.is_empty() {
379 if let Some(graph) = graph {
380 let seq = graph.sequence(support::node_id(handle)).ok_or_else(||
381 format!("Could not find the sequence for handle {} in GBZ", handle)
382 )?;
383 sequence = seq.to_vec();
384 } else {
385 return Err(String::from("No reference provided for a reference-based GAF-base"));
386 }
387 }
388 if support::node_orientation(handle) == Orientation::Reverse {
389 sequence = support::reverse_complement(&sequence);
390 }
391
392 unsafe {
393 Ok(GBZRecord::from_raw_parts(handle, edges, bwt, sequence, None))
394 }
395 };
396
397 let mut get_reads = database.connection.prepare(
399 "SELECT min_handle, max_handle, alignments, read_length, gbwt_starts, names, quality_strings, difference_strings, flags, numbers, optional
400 FROM Alignments
401 WHERE id >= ?1 AND id < ?2"
402 ).map_err(|x| x.to_string())?;
403 let mut rows = get_reads.query((row_range.start, row_range.end)).map_err(|x| x.to_string())?;
404 while let Some(row) = rows.next().map_err(|x| x.to_string())? {
405 let block = Self::decompress_block(row, 0)?;
406 for mut alignment in block {
407 read_set.set_target_path_simple(&mut alignment, &mut get_record)?;
408 if alignment.has_target_path() {
409 read_set.reads.push(alignment);
410 read_set.unclipped += 1;
411 }
412 }
413 read_set.blocks += 1;
414 }
415
416 Ok(read_set)
417 }
418
419 #[inline]
421 pub fn len(&self) -> usize {
422 self.reads.len()
423 }
424
425 #[inline]
427 pub fn is_empty(&self) -> bool {
428 self.reads.is_empty()
429 }
430
431 #[inline]
433 pub fn unclipped(&self) -> usize {
434 self.unclipped
435 }
436
437 #[inline]
439 pub fn blocks(&self) -> usize {
440 self.blocks
441 }
442
443 #[inline]
448 pub fn node_records(&self) -> usize {
449 self.nodes.len()
450 }
451
452 #[inline]
454 pub fn clusters(&self) -> usize {
455 self.clusters
456 }
457
458 #[inline]
460 pub fn iter(&self) -> impl Iterator<Item = &Alignment> {
461 self.reads.iter()
462 }
463
464 fn target_sequence(&self, alignment: &Alignment) -> Result<Vec<u8>, String> {
466 let target_path = alignment.target_path();
467 if target_path.is_none() {
468 return Ok(Vec::new());
469 }
470 let target_path = target_path.unwrap();
471
472 let mut sequence = Vec::new();
473 for handle in target_path{
474 let record = self.nodes.get(handle);
475 if record.is_none() {
476 return Err(format!("Read {}: Missing record for node handle {}", alignment.name, handle));
477 }
478 let record = record.unwrap();
479 sequence.extend_from_slice(record.sequence());
480 }
481
482 if sequence.len() != alignment.path_len {
483 return Err(format!(
484 "Read {}: Target path length {} does not match the expected length {}",
485 alignment.name, sequence.len(), alignment.path_len
486 ));
487 }
488 Ok(sequence)
489 }
490
491 pub fn to_gaf<W: Write>(&self, writer: &mut W) -> Result<(), String> {
497 for alignment in self.reads.iter() {
498 let target_sequence = self.target_sequence(alignment)?;
499 let mut line = alignment.to_gaf(&target_sequence);
500 line.push(b'\n');
501 writer.write_all(&line).map_err(|x| x.to_string())?;
502 }
503 Ok(())
504 }
505}
506
507