Skip to main content

gbz_base/
read_set.rs

1//! A set of reads extracted from a GAF-base.
2
3use crate::{GAFBase, GBZRecord, GraphReference, Subgraph, Alignment, AlignmentBlock};
4use crate::alignment::{Flags, TargetPath};
5use crate::utils;
6
7use gbz::{Orientation, Pos, GBZ};
8use gbz::bwt::Record;
9use gbz::support;
10
11use rusqlite::{Row, OptionalExtension};
12
13use std::collections::{BTreeMap, HashSet};
14use std::fmt::Display;
15use std::io::Write;
16use std::ops::Range;
17use std::sync::Arc;
18
19#[cfg(test)]
20mod tests;
21
22//-----------------------------------------------------------------------------
23
24/// Output options for alignments in a subgraph.
25#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
26pub enum AlignmentOutput {
27    /// Reads overlapping with the subgraph.
28    Overlapping,
29    /// Overlapping reads clipped to the subgraph.
30    Clipped,
31    /// Reads fully contained within the subgraph.
32    Contained,
33}
34
35impl Display for AlignmentOutput {
36    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
37        match self {
38            AlignmentOutput::Overlapping => write!(f, "overlapping"),
39            AlignmentOutput::Clipped => write!(f, "clipped"),
40            AlignmentOutput::Contained => write!(f, "contained"),
41        }
42    }
43}
44
45//-----------------------------------------------------------------------------
46
47/// A set of reads extracted from [`GAFBase`].
48///
49/// This is a counterpart to [`Subgraph`].
50/// Sets of reads fully contained in a subgraph or overlapping with it can be created using [`ReadSet::new`].
51/// The reads can be iterated over with [`ReadSet::iter`] and converted to GAF lines with [`ReadSet::to_gaf`].
52/// The reads will appear in the same order as in the database.
53///
54/// # Examples
55///
56/// ```
57/// use gbz_base::{Subgraph, SubgraphQuery, HaplotypeOutput};
58/// use gbz_base::{GAFBase, GAFBaseParams, ReadSet, AlignmentOutput, GraphReference};
59/// use gbz_base::utils;
60/// use gbz::GBZ;
61/// use simple_sds::serialize;
62///
63/// // Get an in-memory graph.
64/// let gbz_file = utils::get_test_data("micb-kir3dl1.gbz");
65/// let graph = serialize::load_from(&gbz_file).unwrap();
66///
67/// // Extract a 100 bp subgraph around node 150.
68/// let nodes = vec![150];
69/// let query = SubgraphQuery::nodes(nodes).with_output(HaplotypeOutput::Distinct);
70/// let mut subgraph = Subgraph::new();
71/// let _ = subgraph.from_gbz(&graph, None, None, &query).unwrap();
72///
73/// // Create a database of reads aligned to the graph.
74/// let gaf_file = utils::get_test_data("micb-kir3dl1_HG003.gaf");
75/// let gbwt_file = None; // Build a new GBWT index.
76/// let db_file = serialize::temp_file_name("gaf-base");
77/// let graph_ref = GraphReference::None; // Do not store sequences in the database.
78/// let params = GAFBaseParams::default();
79/// let db = GAFBase::create_from_files(&gaf_file, gbwt_file, &db_file, graph_ref, &params);
80/// assert!(db.is_ok());
81///
82/// // Extract all reads fully within the subgraph.
83/// let db = GAFBase::open(&db_file);
84/// assert!(db.is_ok());
85/// let db = db.unwrap();
86/// let read_set = ReadSet::new(GraphReference::Gbz(&graph), &subgraph, &db, AlignmentOutput::Contained);
87/// assert!(read_set.is_ok());
88/// let read_set = read_set.unwrap();
89/// assert_eq!(read_set.len(), 148);
90///
91/// // The extracted reads are aligned and fully within the subgraph.
92/// for aln in read_set.iter() {
93///     for handle in aln.target_path().unwrap() {
94///         assert!(subgraph.has_handle(*handle));
95///     }
96/// }
97///
98/// drop(db);
99/// let _ = std::fs::remove_file(&db_file);
100/// ```
101#[derive(Debug, Clone, PartialEq, Default)]
102pub struct ReadSet {
103    // GBZ records from the GAF GBWT, with sequence from the graph/subgraph.
104    nodes: BTreeMap<usize, GBZRecord>,
105    reads: Vec<Alignment>,
106    // Number of alignments before clipping.
107    unclipped: usize,
108    blocks: usize,
109    // Number of node id clusters in the subgraph.
110    clusters: usize,
111}
112
113impl ReadSet {
114    // TODO: Should this be configurable?
115    /// Gap length threshold for clustering node ids.
116    pub const CLUSTER_GAP_THRESHOLD: usize = 1000;
117
118    // Returns the row id from the first column.
119    fn get_row_id(row: &Row) -> Result<usize, String> {
120        let row_id: usize = row.get(0).map_err(|x| x.to_string())?;
121        Ok(row_id)
122    }
123
124    // Decompresses an alignment block from a row, starting from index `from_idx`.
125    fn decompress_block(row: &Row, from_idx: usize) -> Result<Vec<Alignment>, String> {
126        let min_handle: Option<usize> = row.get(from_idx + 0).map_err(|x| x.to_string())?;
127        let max_handle: Option<usize> = row.get(from_idx + 1).map_err(|x| x.to_string())?;
128        let alignments: usize = row.get(from_idx + 2).map_err(|x| x.to_string())?;
129        let read_length: Option<usize> = row.get(from_idx + 3).map_err(|x| x.to_string())?;
130        let gbwt_starts: Vec<u8> = row.get(from_idx + 4).map_err(|x| x.to_string())?;
131        let names: Vec<u8> = row.get(from_idx + 5).map_err(|x| x.to_string())?;
132        let quality_strings: Vec<u8> = row.get(from_idx + 6).map_err(|x| x.to_string())?;
133        let difference_strings: Vec<u8> = row.get(from_idx + 7).map_err(|x| x.to_string())?;
134        let flags: Vec<u8> = row.get(from_idx + 8).map_err(|x| x.to_string())?;
135        let numbers: Vec<u8> = row.get(from_idx + 9).map_err(|x| x.to_string())?;
136        let optional: Vec<u8> = row.get(from_idx + 10).map_err(|x| x.to_string())?;
137        let block = AlignmentBlock {
138            min_handle, max_handle, alignments, read_length,
139            gbwt_starts, names,
140            quality_strings, difference_strings,
141            flags: Flags::from(flags), numbers,
142            optional,
143        };
144        block.decode()
145    }
146
147    // Replaces the GBWT starting position of the alignment with the path and sets the true target path length.
148    // Requires that the path overlaps with / is fully contained in the subgraph.
149    // If the path is valid, inserts all missing node records into the read set.
150    fn set_target_path(
151        &mut self, alignment: &mut Alignment, subgraph: &Subgraph,
152        get_record: &mut dyn FnMut(usize) -> Result<GBZRecord, String>,
153        contained: bool
154    ) -> Result<(), String> {
155        let mut pos = match alignment.path {
156            TargetPath::Path(_) => return Ok(()),
157            TargetPath::StartPosition(pos) => Some(pos),
158        };
159
160        let mut path = Vec::new();
161        let mut overlap = false;
162        let mut target_path_len = 0;
163        while let Some(p) = pos {
164            if subgraph.has_handle(p.node) {
165                overlap = true;
166            } else if contained {
167                return Ok(()); // Not fully contained in the subgraph.
168            }
169
170            // Now get the record for the node.
171            let mut record = self.nodes.get(&p.node);
172            if record.is_none() {
173                let result = get_record(p.node)?;
174                self.nodes.insert(p.node, result);
175                record = self.nodes.get(&p.node);
176            }
177
178            // Navigate to the next position.
179            path.push(p.node);
180            let record = record.unwrap();
181            target_path_len += record.sequence_len();
182            pos = record.to_gbwt_record().lf(p.offset);
183        }
184
185        // Set the target path in the alignment.
186        if overlap {
187            alignment.set_target_path(path);
188            alignment.set_target_path_len(target_path_len);
189        }
190        Ok(())
191    }
192
193    // Replaces the GBWT starting position of the alignment with the path and sets the true target path length.
194    // Inserts all missing node records into the read set.
195    fn set_target_path_simple(
196        &mut self, alignment: &mut Alignment,
197        get_record: &mut dyn FnMut(usize) -> Result<GBZRecord, String>,
198    ) -> Result<(), String> {
199        let mut pos = match alignment.path {
200            TargetPath::Path(_) => return Ok(()),
201            TargetPath::StartPosition(pos) => Some(pos),
202        };
203
204        let mut path = Vec::new();
205        let mut target_path_len = 0;
206        while let Some(p) = pos {
207            // Now get the record for the node.
208            let mut record = self.nodes.get(&p.node);
209            if record.is_none() {
210                let result = get_record(p.node)?;
211                self.nodes.insert(p.node, result);
212                record = self.nodes.get(&p.node);
213            }
214
215            // Navigate to the next position.
216            path.push(p.node);
217            let record = record.unwrap();
218            target_path_len += record.sequence_len();
219            pos = record.to_gbwt_record().lf(p.offset);
220        }
221
222        // Set the target path in the alignment.
223        alignment.set_target_path(path);
224        alignment.set_target_path_len(target_path_len);
225        Ok(())
226    }
227
228    /// Extracts a set of reads overlapping with the subgraph.
229    ///
230    /// The extracted reads will be in the same order as in the database.
231    /// That corresponds to the order in the original GAF file.
232    ///
233    /// # Arguments
234    ///
235    /// * `graph`: A GBZ-compatible graph for querying a reference-based GAF-base, or no graph for a reference-free one.
236    /// * `subgraph`: The subgraph used as the query region.
237    /// * `database`: A database storing reads aligned to the graph.
238    /// * `output`: Which reads to include in the read set.
239    ///
240    /// # Errors
241    ///
242    /// Passes through any database errors.
243    /// Returns an error if an alignment cannot be decompressed.
244    pub fn new(graph: GraphReference<'_, '_>, subgraph: &Subgraph, database: &GAFBase, output: AlignmentOutput) -> Result<Self, String> {
245        let mut read_set = ReadSet::default();
246
247        // Build a record from the databases.
248        let mut get_node = database.connection.prepare(
249            "SELECT edges, bwt, sequence FROM Nodes WHERE handle = ?1"
250        ).map_err(|x| x.to_string())?;
251        let mut graph = graph;
252        let mut get_record = |handle: usize| -> Result<GBZRecord, String> {
253            // Get the node record from the GAF-base.
254            let gaf_result = get_node.query_row(
255                (handle,),
256                |row: &Row<'_>| -> rusqlite::Result<(Vec<Pos>, Vec<u8>, Vec<u8>)> {
257                    let edge_bytes: Vec<u8> = row.get(0)?;
258                    let (edges, _) = Record::decompress_edges(&edge_bytes).unwrap();
259                    let bwt: Vec<u8> = row.get(1)?;
260                    let encoded_sequence: Vec<u8> = row.get(2)?;
261                    let sequence = utils::decode_sequence(&encoded_sequence);
262                    Ok((edges, bwt, sequence))
263                }
264            ).optional().map_err(|x| x.to_string())?;
265            if gaf_result.is_none() {
266                return Err(format!("Could not find the record for handle {} in GAF-base", handle));
267            }
268            let (edges, bwt, mut sequence) = gaf_result.unwrap();
269
270            // If we have a reference-based database, try to get the sequence from the
271            // subgraph or the original graph.
272            if sequence.is_empty() {
273                let seq = subgraph.sequence_for_handle(handle);
274                sequence = match seq {
275                    Some(seq) => seq.to_vec(),
276                    None => {
277                        let gbz_record = graph.gbz_record(handle)?;
278                        gbz_record.sequence().to_vec()
279                    }
280                };
281            }
282
283            unsafe {
284                Ok(GBZRecord::from_raw_parts(handle, edges, bwt, sequence, None))
285            }
286        };
287
288        // Cluster the handles in the subgraph into reasonable intervals. Because node
289        // i corresponds to handles 2i and 2i+1, it is easier to work with node ids.
290        let node_ids: Vec<usize> = subgraph.node_iter().collect();
291        let clusters = utils::cluster_node_ids(node_ids, Self::CLUSTER_GAP_THRESHOLD);
292        let clusters: Vec<(usize, usize)> = clusters.into_iter()
293            .map(|r| (support::encode_node(*r.start(), Orientation::Forward), support::encode_node(*r.end(), Orientation::Reverse)))
294            .collect();
295        read_set.clusters = clusters.len();
296
297        // Get the reads that may overlap with the subgraph. We keep track of row ids
298        // we have encountered to avoid duplicating reads that overlap multiple clusters.
299        let mut row_ids: HashSet<usize> = HashSet::new();
300        let mut get_reads = database.connection.prepare(
301            "SELECT id, min_handle, max_handle, alignments, read_length, gbwt_starts, names, quality_strings, difference_strings, flags, numbers, optional
302            FROM Alignments
303            WHERE min_handle <= ?1 AND max_handle >= ?2"
304        ).map_err(|x| x.to_string())?;
305        for (min_handle, max_handle) in clusters.into_iter() {
306            let mut rows = get_reads.query((max_handle, min_handle)).map_err(|x| x.to_string())?;
307            while let Some(row) = rows.next().map_err(|x| x.to_string())? {
308                let row_id = Self::get_row_id(row)?;
309                if row_ids.contains(&row_id) {
310                    continue;
311                }
312                row_ids.insert(row_id);
313                let block = Self::decompress_block(row, 1)?;
314                for mut alignment in block {
315                    read_set.set_target_path(&mut alignment, subgraph, &mut get_record, output == AlignmentOutput::Contained)?;
316                    if alignment.has_target_path() {
317                        if output == AlignmentOutput::Clipped {
318                            let sequence_len = Arc::new(|handle| {
319                                let record = read_set.nodes.get(&handle)?;
320                                Some(record.sequence().len())
321                            });
322                            let clipped = alignment.clip(subgraph, sequence_len)?;
323                            for aln in clipped.into_iter() {
324                                read_set.reads.push(aln);
325                            }
326                        } else {
327                            read_set.reads.push(alignment);
328                        }
329                        read_set.unclipped += 1;
330                    }
331                }
332                read_set.blocks += 1;
333            }
334        }
335
336        Ok(read_set)
337    }
338
339    /// Extracts all reads from the given range of row ids.
340    ///
341    /// The extracted reads will be in the same order as in the database.
342    /// That corresponds to the order in the original GAF file.
343    ///
344    /// # Arguments
345    ///
346    /// * `database`: A database storing reads aligned to the graph.
347    /// * `row_range`: The range of row ids to extract.
348    /// * `graph`: A GBZ graph if the database is reference-based, or [`None`] for a reference-free one.
349    ///
350    /// # Errors
351    ///
352    /// Passes through any database errors.
353    /// Returns an error if an alignment cannot be decompressed.
354    pub fn from_rows(database: &GAFBase, row_range: Range<usize>, graph: Option<&GBZ>) -> Result<Self, String> {
355        let mut read_set = ReadSet { clusters: 1, ..Default::default() };
356
357        // Build a record from the GAF-base, with the sequence possibly from the GBZ graph.
358        let mut get_node = database.connection.prepare(
359            "SELECT edges, bwt, sequence FROM Nodes WHERE handle = ?1"
360        ).map_err(|x| x.to_string())?;
361        let mut get_record = |handle: usize| -> Result<GBZRecord, String> {
362            // Get the edges and the BWT fragment from the GAF-base.
363            let gaf_result = get_node.query_row(
364                (handle,),
365                |row: &Row<'_>| -> rusqlite::Result<(Vec<Pos>, Vec<u8>, Vec<u8>)> {
366                    let edge_bytes: Vec<u8> = row.get(0)?;
367                    let (edges, _) = Record::decompress_edges(&edge_bytes).unwrap();
368                    let bwt: Vec<u8> = row.get(1)?;
369                    let encoded_sequence: Vec<u8> = row.get(2)?;
370                    let sequence = utils::decode_sequence(&encoded_sequence);
371                    Ok((edges, bwt, sequence))
372                }
373            ).optional().map_err(|x| x.to_string())?;
374            if gaf_result.is_none() {
375                return Err(format!("Could not find the record for handle {} in GAF-base", handle));
376            }
377            let (edges, bwt, mut sequence) = gaf_result.unwrap();
378            if sequence.is_empty() {
379                if let Some(graph) = graph {
380                    let seq = graph.sequence(support::node_id(handle)).ok_or_else(||
381                        format!("Could not find the sequence for handle {} in GBZ", handle)
382                    )?;
383                    sequence = seq.to_vec();
384                } else {
385                    return Err(String::from("No reference provided for a reference-based GAF-base"));
386                }
387            }
388            if support::node_orientation(handle) == Orientation::Reverse {
389                sequence = support::reverse_complement(&sequence);
390            }
391
392            unsafe {
393                Ok(GBZRecord::from_raw_parts(handle, edges, bwt, sequence, None))
394            }
395        };
396
397        // Get the reads in the given range of row ids.
398        let mut get_reads = database.connection.prepare(
399            "SELECT min_handle, max_handle, alignments, read_length, gbwt_starts, names, quality_strings, difference_strings, flags, numbers, optional
400            FROM Alignments
401            WHERE id >= ?1 AND id < ?2"
402        ).map_err(|x| x.to_string())?;
403        let mut rows = get_reads.query((row_range.start, row_range.end)).map_err(|x| x.to_string())?;
404        while let Some(row) = rows.next().map_err(|x| x.to_string())? {
405            let block = Self::decompress_block(row, 0)?;
406            for mut alignment in block {
407                read_set.set_target_path_simple(&mut alignment, &mut get_record)?;
408                if alignment.has_target_path() {
409                    read_set.reads.push(alignment);
410                    read_set.unclipped += 1;
411                }
412            }
413            read_set.blocks += 1;
414        }
415
416        Ok(read_set)
417    }
418
419    /// Returns the number of alignment fragments in the set.
420    #[inline]
421    pub fn len(&self) -> usize {
422        self.reads.len()
423    }
424
425    /// Returns `true` if the set is empty.
426    #[inline]
427    pub fn is_empty(&self) -> bool {
428        self.reads.is_empty()
429    }
430
431    /// Returns the original number of alignments (before clipping) in the set.
432    #[inline]
433    pub fn unclipped(&self) -> usize {
434        self.unclipped
435    }
436
437    /// Returns the number of alignment blocks decompressed when creating the read set.
438    #[inline]
439    pub fn blocks(&self) -> usize {
440        self.blocks
441    }
442
443    /// Returns the number of node records in the read set.
444    ///
445    /// Each record corresponds to an oriented node, and the opposite orientation may not be present.
446    /// This includes all node records encountered while tracing the alignments, even when the alignment was not included in the read set.
447    #[inline]
448    pub fn node_records(&self) -> usize {
449        self.nodes.len()
450    }
451
452    /// Returns the number of node id clusters in the subgraph.
453    #[inline]
454    pub fn clusters(&self) -> usize {
455        self.clusters
456    }
457
458    /// Returns an iterator over the reads in the set.
459    #[inline]
460    pub fn iter(&self) -> impl Iterator<Item = &Alignment> {
461        self.reads.iter()
462    }
463
464    // Extracts the target sequence for the given alignment.
465    fn target_sequence(&self, alignment: &Alignment) -> Result<Vec<u8>, String> {
466        let target_path = alignment.target_path();
467        if target_path.is_none() {
468            return Ok(Vec::new());
469        }
470        let target_path = target_path.unwrap();
471
472        let mut sequence = Vec::new();
473        for handle in target_path{
474            let record = self.nodes.get(handle);
475            if record.is_none() {
476                return Err(format!("Read {}: Missing record for node handle {}", alignment.name, handle));
477            }
478            let record = record.unwrap();
479            sequence.extend_from_slice(record.sequence());
480        }
481
482        if sequence.len() != alignment.path_len {
483            return Err(format!(
484                "Read {}: Target path length {} does not match the expected length {}",
485                alignment.name, sequence.len(), alignment.path_len
486            ));
487        }
488        Ok(sequence)
489    }
490
491    /// Serializes the read set in the GAF format.
492    ///
493    /// The output does not include any header lines, as the GAF file may consist of multiple read sets.
494    /// Returns an error if the target sequence for a read is invalid or cannot be determined.
495    /// Passes through any I/O errors.
496    pub fn to_gaf<W: Write>(&self, writer: &mut W) -> Result<(), String> {
497        for alignment in self.reads.iter() {
498            let target_sequence = self.target_sequence(alignment)?;
499            let mut line = alignment.to_gaf(&target_sequence);
500            line.push(b'\n');
501            writer.write_all(&line).map_err(|x| x.to_string())?;
502        }
503        Ok(())
504    }
505}
506
507//-----------------------------------------------------------------------------