Skip to main content

gbz_base/
db.rs

1//! GBZ-base and GAF-base: SQLite databases storing a GBZ graph and sequence alignments to the graph.
2
3use crate::{Alignment, AlignmentBlock};
4use crate::formats::{self, JSONValue};
5use crate::utils::{self, PathStartSource};
6
7use std::io::BufRead;
8use std::path::Path;
9use std::sync::{mpsc, Arc};
10use std::{fs, thread};
11
12use rusqlite::{Connection, OpenFlags, OptionalExtension, Row, Statement};
13
14use gbz::{FullPathName, GBWT, GBWTBuilder, GBZ, Orientation, Pos};
15use gbz::algorithms;
16use gbz::bwt::{BWT, Record};
17use gbz::support::{self, Tags, Chains};
18
19use pggname::GraphName;
20
21use simple_sds::{binaries, serialize};
22
23#[cfg(test)]
24mod tests;
25
26//-----------------------------------------------------------------------------
27
28/// A database connection to a GBZ-base database.
29///
30/// This structure stores a database connection and some header information.
31/// In multi-threaded applications, each thread should have its own connection.
32/// Graph operations are supported through the [`GraphInterface`] structure.
33///
34/// # Examples
35///
36/// ```
37/// use gbz_base::GBZBase;
38/// use gbz::support;
39/// use simple_sds::{binaries, serialize};
40/// use std::fs;
41///
42/// // Create the database.
43/// let gbz_file = support::get_test_data("example.gbz");
44/// let db_file = serialize::temp_file_name("gbz-base");
45/// assert!(!binaries::file_exists(&db_file));
46/// // We find top-level chains from the graph instead of providing them in a separate file.
47/// let result = GBZBase::create_from_files(&gbz_file, None, &db_file);
48/// assert!(result.is_ok());
49///
50/// // Open the database and check some header information.
51/// let database = GBZBase::open(&db_file).unwrap();
52/// assert_eq!(database.nodes(), 12);
53/// assert_eq!(database.samples(), 2);
54/// assert_eq!(database.haplotypes(), 3);
55/// assert_eq!(database.contigs(), 2);
56/// assert_eq!(database.paths(), 6);
57///
58/// // Clean up.
59/// drop(database);
60/// fs::remove_file(&db_file).unwrap();
61/// ```
62#[derive(Debug)]
63pub struct GBZBase {
64    connection: Connection,
65    version: String,
66    nodes: usize,
67    chains: usize,
68    chain_links: usize,
69    paths: usize,
70    samples: usize,
71    haplotypes: usize,
72    contigs: usize,
73}
74
75/// Using the database.
76impl GBZBase {
77    /// Index positions at the start of a node on reference paths approximately every this many base pairs.
78    pub const INDEX_INTERVAL: usize = 1000;
79
80    // Key for database version.
81    const KEY_VERSION: &'static str = "version";
82
83    /// Current database version.
84    pub const VERSION: &'static str = "GBZ-base v0.4.0";
85
86    // Key for node count.
87    const KEY_NODES: &'static str = "nodes";
88
89    // Key for chain count.
90    const KEY_CHAINS: &'static str = "chains";
91
92    // Key for the total number of links in the chains.
93    const KEY_CHAIN_LINKS: &'static str = "chain_links";
94
95    // Key for path count.
96    const KEY_PATHS: &'static str = "paths";
97
98    // Key for sample count.
99    const KEY_SAMPLES: &'static str = "samples";
100
101    // Key for sample count.
102    const KEY_HAPLOTYPES: &'static str = "haplotypes";
103
104    // Key for sample count.
105    const KEY_CONTIGS: &'static str = "contigs";
106
107    // Prefix for GBWT tag keys.
108    const KEY_GBWT: &'static str = "gbwt_";
109
110    // Prefix for GBZ tag keys.
111    const KEY_GBZ: &'static str = "gbz_";
112
113    /// Opens a connection to the database in the given file.
114    ///
115    /// Reads the header information and passes through any database errors.
116    pub fn open<P: AsRef<Path>>(filename: P) -> Result<Self, String> {
117        let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
118        let connection = Connection::open_with_flags(filename, flags).map_err(|x| x.to_string())?;
119
120        // Get some header information.
121        let mut get_tag = connection.prepare(
122            "SELECT value FROM Tags WHERE key = ?1"
123        ).map_err(|x| x.to_string())?;
124        let version = get_string_value(&mut get_tag, Self::KEY_VERSION)?;
125        if version != Self::VERSION {
126            return Err(format!("Unsupported database version: {} (expected {})", version, Self::VERSION));
127        }
128        let nodes = get_numeric_value(&mut get_tag, Self::KEY_NODES)?;
129        let chains = get_numeric_value(&mut get_tag, Self::KEY_CHAINS)?;
130        let chain_links = get_numeric_value(&mut get_tag, Self::KEY_CHAIN_LINKS)?;
131        let paths = get_numeric_value(&mut get_tag, Self::KEY_PATHS)?;
132        let samples = get_numeric_value(&mut get_tag, Self::KEY_SAMPLES)?;
133        let haplotypes = get_numeric_value(&mut get_tag, Self::KEY_HAPLOTYPES)?;
134        let contigs = get_numeric_value(&mut get_tag, Self::KEY_CONTIGS)?;
135        drop(get_tag);
136
137        Ok(GBZBase {
138            connection,
139            version,
140            nodes, chains, chain_links,
141            paths, samples, haplotypes, contigs,
142        })
143    }
144
145    /// Returns the filename of the database or an error if there is no filename.
146    pub fn filename(&self) -> Option<&str> {
147        self.connection.path()
148    }
149
150    /// Returns the size of the database file in a human-readable format.
151    pub fn file_size(&self) -> Option<String> {
152        let filename = self.filename()?;
153        utils::file_size(filename)
154    }
155
156    /// Returns the version of the database.
157    pub fn version(&self) -> &str {
158        &self.version
159    }
160
161    /// Returns the number of nodes in the graph.
162    pub fn nodes(&self) -> usize {
163        self.nodes
164    }
165
166    /// Returns the number of top-level chains with stored links between boundary nodes.
167    pub fn chains(&self) -> usize {
168        self.chains
169    }
170
171    /// Returns the total number of links in the top-level chains.
172    pub fn chain_links(&self) -> usize {
173        self.chain_links
174    }
175
176    /// Returns the number of paths in the graph.
177    pub fn paths(&self) -> usize {
178        self.paths
179    }
180
181    /// Returns the number of samples in path metadata.
182    pub fn samples(&self) -> usize {
183        self.samples
184    }
185
186    /// Returns the number of haplotypes in path metadata.
187    pub fn haplotypes(&self) -> usize {
188        self.haplotypes
189    }
190
191    /// Returns the number of contigs in path metadata.
192    pub fn contigs(&self) -> usize {
193        self.contigs
194    }
195}
196
197//-----------------------------------------------------------------------------
198
199/// Creating the database.
200impl GBZBase {
201    /// Creates a new database from the input files.
202    ///
203    /// # Arguments
204    ///
205    /// * `gbz_file`: Name of the file containing the GBZ graph.
206    /// * `chains_file`: Name of the file containing top-level chains.
207    ///   If not provided, this tries to find the chains using [`algorithms::find_chains`] that works with Minigraph–Cactus graphs.
208    ///   Use [`Self::create`] with empty chains to build a database without chains.
209    /// * `db_file`: Name of the database file to be created.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the database already exists or if the GBZ graph does not contain sufficient metadata.
214    /// Passes through any database errors.
215    pub fn create_from_files(gbz_file: &Path, chains_file: Option<&Path>, db_file: &Path) -> Result<(), String> {
216        eprintln!("Loading GBZ graph {}", gbz_file.display());
217        let graph: GBZ = serialize::load_from(gbz_file).map_err(|x| x.to_string())?;
218        let chains = if let Some(filename) = chains_file {
219            eprintln!("Loading top-level chain file {}", filename.display());
220            serialize::load_from(filename).map_err(|x| x.to_string())?
221        } else {
222            eprintln!("Finding top-level chains in the graph");
223            let chains = algorithms::find_chains(&graph);
224            let components = if let Some(components) = chains.components() {
225                components.to_string()
226            } else {
227                String::from("an unknown number of")
228            };
229            eprintln!("Found {} chains with {} links for {} components", chains.len(), chains.links(), components);
230            chains
231        };
232        Self::create(&graph, &chains, db_file)
233    }
234
235    // Sanity checks for the GBZ graph. We do not want to handle graphs without sufficient metadata.
236    fn sanity_checks(graph: &GBZ) -> Result<(), String> {
237        let metadata = graph.metadata().ok_or_else(||
238            String::from("The graph does not contain metadata")
239        )?;
240
241        if !metadata.has_path_names() {
242            return Err("The metadata does not contain path names".to_string());
243        }
244        if !metadata.has_sample_names() {
245            return Err("The metadata does not contain sample names".to_string());
246        }
247        if !metadata.has_contig_names() {
248            return Err("The metadata does not contain contig names".to_string());
249        }
250
251        Ok(())
252    }
253
254    /// Creates a new database from the given inputs.
255    ///
256    /// # Arguments
257    ///
258    /// * `graph`: GBZ graph.
259    /// * `chains`: Top-level chains.
260    /// * `filename`: Name of the database file to be created.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if the database already exists or if the GBZ graph does not contain sufficient metadata.
265    /// Passes through any database errors.
266    pub fn create<P: AsRef<Path>>(graph: &GBZ, chains: &Chains, filename: P) -> Result<(), String> {
267        eprintln!("Creating database {}", filename.as_ref().display());
268        if binaries::file_exists(&filename) {
269            return Err(format!("Database {} already exists", filename.as_ref().display()));
270        }
271        Self::sanity_checks(graph)?;
272
273        let mut connection = Connection::open(filename).map_err(|x| x.to_string())?;
274        Self::insert_tags(graph, chains, &mut connection).map_err(|x| x.to_string())?;
275        Self::insert_nodes(graph, chains, &mut connection).map_err(|x| x.to_string())?;
276        Self::insert_paths(graph, &mut connection).map_err(|x| x.to_string())?;
277        Self::index_reference_paths(graph, &mut connection).map_err(|x| x.to_string())?;
278        Ok(())
279    }
280
281    fn insert_tags(graph: &GBZ, chains: &Chains, connection: &mut Connection) -> rusqlite::Result<()> {
282        eprintln!("Inserting header and tags");
283
284        // Create the tags table.
285        connection.execute(
286            "CREATE TABLE Tags (
287                key TEXT PRIMARY KEY,
288                value TEXT NOT NULL
289            ) STRICT",
290            (),
291        )?;
292
293        // Insert header and tags.
294        let mut inserted = 0;
295        let transaction = connection.transaction()?;
296        {
297            let mut insert = transaction.prepare(
298                "INSERT INTO Tags(key, value) VALUES (?1, ?2)"
299            )?;
300
301            // Header.
302            let metadata = graph.metadata().unwrap();
303            insert.execute((Self::KEY_VERSION, Self::VERSION))?;
304            insert.execute((Self::KEY_NODES, graph.nodes()))?;
305            insert.execute((Self::KEY_CHAINS, chains.len()))?;
306            insert.execute((Self::KEY_CHAIN_LINKS, chains.links()))?;
307            insert.execute((Self::KEY_PATHS, metadata.paths()))?;
308            insert.execute((Self::KEY_SAMPLES, metadata.samples()))?;
309            insert.execute((Self::KEY_HAPLOTYPES, metadata.haplotypes()))?;
310            insert.execute((Self::KEY_CONTIGS, metadata.contigs()))?;
311            inserted += 6;
312
313            // GBWT tags.
314            let index: &GBWT = graph.as_ref();
315            for (key, value) in index.tags().iter() {
316                let key = format!("{}{}", Self::KEY_GBWT, key);
317                insert.execute((key, value))?;
318                inserted += 1;
319            }
320
321            // GBZ tags.
322            for (key, value) in graph.tags().iter() {
323                let key = format!("{}{}", Self::KEY_GBZ, key);
324                insert.execute((key, value))?;
325                inserted += 1;
326            }
327        }
328        transaction.commit()?;
329
330        eprintln!("Inserted {} key-value pairs", inserted);
331        Ok(())
332    }
333
334    fn insert_nodes(graph: &GBZ, chains: &Chains, connection: &mut Connection) -> rusqlite::Result<()> {
335        eprintln!("Inserting nodes");
336
337        // Create the nodes table.
338        connection.execute(
339            "CREATE TABLE Nodes (
340                handle INTEGER PRIMARY KEY,
341                edges BLOB NOT NULL,
342                bwt BLOB NOT NULL,
343                sequence BLOB NOT NULL,
344                next INTEGER
345            ) STRICT",
346            (),
347        )?;
348
349        // Insert the nodes.
350        let mut inserted = 0;
351        let transaction = connection.transaction()?;
352        {
353            let mut insert = transaction.prepare(
354                "INSERT INTO Nodes(handle, edges, bwt, sequence, next) VALUES (?1, ?2, ?3, ?4, ?5)"
355            )?;
356            let index: &GBWT = graph.as_ref();
357            let bwt: &BWT = index.as_ref();
358            for node_id in graph.node_iter() {
359                // Forward orientation.
360                let forward_id = support::encode_node(node_id, Orientation::Forward);
361                let record_id = index.node_to_record(forward_id);
362                let (edge_bytes, bwt_bytes) = bwt.compressed_record(record_id).unwrap();
363                let sequence = graph.sequence(node_id).unwrap();
364                let encoded_sequence = utils::encode_sequence(sequence);
365                let next: Option<usize> = chains.next(forward_id);
366                insert.execute((forward_id, edge_bytes, bwt_bytes, encoded_sequence, next))?;
367                inserted += 1;
368        
369                // Reverse orientation.
370                let reverse_id = support::encode_node(node_id, Orientation::Reverse);
371                let record_id = index.node_to_record(reverse_id);
372                let (edge_bytes, bwt_bytes) = bwt.compressed_record(record_id).unwrap();
373                let sequence = support::reverse_complement(sequence);
374                let encoded_sequence = utils::encode_sequence(&sequence);
375                let next: Option<usize> = chains.next(reverse_id);
376                insert.execute((reverse_id, edge_bytes, bwt_bytes, encoded_sequence, next))?;
377                inserted += 1;
378            }
379        }
380        transaction.commit()?;
381
382        eprintln!("Inserted {} node records", inserted);
383        Ok(())
384    }
385
386    fn insert_paths(graph: &GBZ, connection: &mut Connection) -> rusqlite::Result<()> {
387        eprintln!("Inserting paths");
388
389        // Create the paths table.
390        connection.execute(
391            "CREATE TABLE Paths (
392                handle INTEGER PRIMARY KEY,
393                fw_node INTEGER NOT NULL,
394                fw_offset INTEGER NOT NULL,
395                rev_node INTEGER NOT NULL,
396                rev_offset INTEGER NOT NULL,
397                sample TEXT NOT NULL,
398                contig TEXT NOT NULL,
399                haplotype INTEGER NOT NULL,
400                fragment INTEGER NOT NULL,
401                is_indexed INTEGER NOT NULL
402            ) STRICT",
403            (),
404        )?;
405
406        // Insert path starts and metadata.
407        let mut inserted = 0;
408        let transaction = connection.transaction()?;
409        {
410            let mut insert = transaction.prepare(
411                "INSERT INTO
412                    Paths(handle, fw_node, fw_offset, rev_node, rev_offset, sample, contig, haplotype, fragment, is_indexed)
413                    VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, FALSE)"
414            )?;
415            let index: &GBWT = graph.as_ref();
416            let metadata = graph.metadata().unwrap();
417            for handle in 0..metadata.paths() {
418                let fw_start = path_start(index, handle, Orientation::Forward);
419                let rev_start = path_start(index, handle, Orientation::Reverse);
420                let name = FullPathName::from_metadata(metadata, handle).unwrap();
421                insert.execute((
422                    handle,
423                    fw_start.node, fw_start.offset,
424                    rev_start.node, rev_start.offset,
425                    name.sample, name.contig, name.haplotype, name.fragment
426                ))?;
427                inserted += 1;
428            }
429        }
430        transaction.commit()?;
431
432        eprintln!("Inserted information on {} paths", inserted);
433        Ok(())
434    }
435
436    fn index_reference_paths(graph: &GBZ, connection: &mut Connection) -> rusqlite::Result<()> {
437        eprintln!("Indexing reference paths");
438
439        // Create the reference path table.
440        connection.execute(
441            "CREATE TABLE ReferenceIndex (
442                path_handle INTEGER NOT NULL,
443                path_offset INTEGER NOT NULL,
444                node_handle INTEGER NOT NULL,
445                node_offset INTEGER NOT NULL,
446                PRIMARY KEY (path_handle, path_offset)
447            ) STRICT",
448            (),
449        )?;
450
451        // NOTE: If a reference haplotype is fragmented, the indexed positions will be relative
452        // to the start of each fragment.
453        let reference_paths = graph.reference_positions(Self::INDEX_INTERVAL, true);
454        if reference_paths.is_empty() {
455            eprintln!("No reference paths to index");
456            return Ok(());
457        }
458
459        // Insert the reference positions into the database.
460        eprintln!("Inserting indexed positions");
461        let transaction = connection.transaction()?;
462        {
463            let mut set_as_indexed = transaction.prepare(
464                "UPDATE Paths SET is_indexed = TRUE WHERE handle = ?1"
465            )?;
466            let mut insert_position = transaction.prepare(
467                "INSERT INTO ReferenceIndex(path_handle, path_offset, node_handle, node_offset)
468                VALUES (?1, ?2, ?3, ?4)"
469            )?;
470            for ref_path in reference_paths.iter() {
471                set_as_indexed.execute((ref_path.id,))?;
472                for (path_offset, gbwt_position) in ref_path.positions.iter() {
473                    insert_position.execute((ref_path.id, path_offset, gbwt_position.node, gbwt_position.offset))?;
474                }
475            }
476        }
477        transaction.commit()?;
478
479        Ok(())
480    }
481}
482
483//-----------------------------------------------------------------------------
484
485/// A database connection to a GAF-base database.
486///
487/// This structure stores a database connection and some header information.
488/// In multi-threaded applications, each thread should have its own connection.
489/// A set of alignments overlapping with a subgraph can be extracted using the [`crate::ReadSet`] structure.
490///
491/// # Examples
492///
493/// ```
494/// use gbz_base::{GAFBase, GAFBaseParams, GraphReference};
495/// use gbz_base::utils;
496/// use simple_sds::serialize;
497///
498/// let gaf_file = utils::get_test_data("micb-kir3dl1_HG003.gaf.gz");
499/// let gbwt_file = None; // Build a new GBWT index.
500/// let db_file = serialize::temp_file_name("gaf-base");
501///
502/// // Create a database that requires a reference graph to use.
503/// let graph = GraphReference::None;
504/// let params = GAFBaseParams::default();
505/// let db = GAFBase::create_from_files(&gaf_file, gbwt_file, &db_file, graph, &params);
506/// assert!(db.is_ok());
507///
508/// // Now open it and check some statistics.
509/// let db = GAFBase::open(&db_file);
510/// assert!(db.is_ok());
511/// let db = db.unwrap();
512/// assert_eq!(db.nodes(), 2291);
513/// assert_eq!(db.alignments(), 12439);
514/// assert!(!db.bidirectional_gbwt());
515///
516/// drop(db);
517/// let _ = std::fs::remove_file(&db_file);
518/// ```
519#[derive(Debug)]
520pub struct GAFBase {
521    pub(crate) connection: Connection,
522    version: String,
523    nodes: usize,
524    alignments: usize,
525    blocks: usize,
526    bidirectional_gbwt: bool,
527    tags: Tags,
528}
529
530/// Using the database.
531impl GAFBase {
532    // Key for database version.
533    const KEY_VERSION: &'static str = "version";
534
535    /// Current database version.
536    pub const VERSION: &'static str = "GAF-base version 3";
537
538    // Key for node count.
539    const KEY_NODES: &'static str = "nodes";
540
541    // Key for alignment count.
542    const KEY_ALIGNMENTS: &'static str = "alignments";
543
544    // Key for bidirectional GBWT flag.
545    const KEY_BIDIRECTIONAL_GBWT: &'static str = "bidirectional_gbwt";
546
547    // Key for database parameters.
548    const KEY_PARAMS: &'static str = "params";
549
550    fn get_string_value(tags: &Tags, key: &str) -> String {
551        tags.get(key).cloned().unwrap_or_default()
552    }
553
554    fn get_numeric_value(tags: &Tags, key: &str) -> Result<usize, String> {
555        let value = Self::get_string_value(tags, key);
556        value.parse::<usize>().map_err(|x| format!("Invalid numeric value for key {}: {}", key, x))
557    }
558
559    fn get_boolean_value(tags: &Tags, key: &str) -> Result<bool, String> {
560        let value = Self::get_numeric_value(tags, key)?;
561        match value {
562            0 => Ok(false),
563            1 => Ok(true),
564            _ => Err(format!("Invalid boolean value for key {}: {}", key, value)),
565        }
566    }
567
568    /// Opens a connection to the database in the given file.
569    ///
570    /// Reads the header information and passes through any database errors.
571    pub fn open<P: AsRef<Path>>(filename: P) -> Result<Self, String> {
572        let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
573        let connection = Connection::open_with_flags(filename, flags).map_err(|x| x.to_string())?;
574
575        // Read all tags from the database.
576        let mut get_tags = connection.prepare(
577            "SELECT key, value FROM Tags"
578        ).map_err(|x| x.to_string())?;
579        let mut tags = Tags::new();
580        let mut rows = get_tags.query(()).map_err(|x| x.to_string())?;
581        while let Some(row) = rows.next().map_err(|x| x.to_string())? {
582            let key: String = row.get(0).map_err(|x| x.to_string())?;
583            let value: String = row.get(1).map_err(|x| x.to_string())?;
584            tags.insert(&key, &value);
585        }
586        drop(rows);
587        drop(get_tags);
588
589        let version = Self::get_string_value(&tags, Self::KEY_VERSION);
590        if version != Self::VERSION {
591            return Err(format!("Unsupported database version: {} (expected {})", version, Self::VERSION));
592        }
593        let nodes = Self::get_numeric_value(&tags, Self::KEY_NODES)?;
594        let alignments = Self::get_numeric_value(&tags, Self::KEY_ALIGNMENTS)?;
595        let bidirectional_gbwt = Self::get_boolean_value(&tags, Self::KEY_BIDIRECTIONAL_GBWT)?;
596
597        // Also determine the number of rows in the Alignments table.
598        let mut count_rows = connection.prepare(
599            "SELECT COUNT(*) FROM Alignments"
600        ).map_err(|x| x.to_string())?;
601        let blocks = count_rows.query_row((), |row|
602            row.get::<_, usize>(0)
603        ).map_err(|x| x.to_string())?;
604        drop(count_rows);
605
606        Ok(GAFBase {
607            connection,
608            version,
609            nodes, alignments, blocks, bidirectional_gbwt,
610            tags,
611        })
612    }
613
614    /// Returns the filename of the database or an error if there is no filename.
615    pub fn filename(&self) -> Option<&str> {
616        self.connection.path()
617    }
618
619    /// Returns the size of the database file in a human-readable format.
620    pub fn file_size(&self) -> Option<String> {
621        let filename = self.filename()?;
622        utils::file_size(filename)
623    }
624
625    /// Returns the version of the database.
626    pub fn version(&self) -> &str {
627        &self.version
628    }
629
630    /// Returns the number of nodes in the graph.
631    pub fn nodes(&self) -> usize {
632        self.nodes
633    }
634
635    /// Returns the number of alignments in the database.
636    pub fn alignments(&self) -> usize {
637        self.alignments
638    }
639
640    /// Returns the number of database rows storing the alignments.
641    ///
642    /// Each row corresponds to an [`AlignmentBlock`].
643    pub fn blocks(&self) -> usize {
644        self.blocks
645    }
646
647    /// Returns `true` if the paths are stored in a bidirectional GBWT.
648    pub fn bidirectional_gbwt(&self) -> bool {
649        self.bidirectional_gbwt
650    }
651
652    // TODO: Parse tag `KEY_PARAMS` and have an interface to report the database construction parameters.
653
654    /// Returns all tags stored in the database.
655    pub fn tags(&self) -> &Tags {
656        &self.tags
657    }
658
659    /// Returns the stable graph name (pggname) for the graph used as the reference for the alignments.
660    ///
661    /// Returns an error if the tags cannot be parsed.
662    pub fn graph_name(&self) -> Result<GraphName, String> {
663        GraphName::from_tags(self.tags())
664    }
665}
666
667//-----------------------------------------------------------------------------
668
669// Statistics for the encoded alignments in the database.
670#[derive(Debug, Clone, Copy, PartialEq, Eq)]
671 struct AlignmentStats {
672    pub alignments: usize,
673    pub start_bytes: usize,
674    pub name_bytes: usize,
675    pub quality_bytes: usize,
676    pub difference_bytes: usize,
677    pub flag_bytes: usize,
678    pub number_bytes: usize,
679    pub optional_bytes: usize,
680}
681
682impl AlignmentStats {
683    pub fn new() -> Self {
684        Self {
685            alignments: 0,
686            start_bytes: 0,
687            name_bytes: 0,
688            quality_bytes: 0,
689            difference_bytes: 0,
690            flag_bytes: 0,
691            number_bytes: 0,
692            optional_bytes: 0,
693        }
694    }
695
696    pub fn update(&mut self, block: &AlignmentBlock) {
697        self.alignments += block.len();
698        self.start_bytes += block.gbwt_starts.len();
699        self.name_bytes += block.names.len();
700        self.quality_bytes += block.quality_strings.len();
701        self.difference_bytes += block.difference_strings.len();
702        self.flag_bytes += block.flags.bytes();
703        self.number_bytes += block.numbers.len();
704        self.optional_bytes += block.optional.len();
705    }
706}
707
708/// GAF-base construction parameters.
709#[derive(Debug, Clone, PartialEq, Eq)]
710pub struct GAFBaseParams {
711    /// Number of alignments in a block (database row).
712    ///
713    /// Default: [`Self::BLOCK_SIZE`].
714    /// Note that values `0` and `1` are functionally equivalent.
715    pub block_size: usize,
716
717    /// GBWT construction buffer size in nodes.
718    ///
719    /// Default: [`Self::GBWT_BUFFER_SIZE`].
720    /// Note that the buffer grows automatically if a path is longer than the initial buffer size.
721    pub gbwt_buffer_size: usize,
722
723    /// Build a reference-free GAF-base that stores sequences in table `Nodes`.
724    ///
725    /// Default: `false`.
726    pub reference_free: bool,
727
728    /// Store base quality strings for the alignments.
729    ///
730    /// Default: `true`.
731    pub store_quality_strings: bool,
732
733    /// Store unknown optional fields with the alignments.
734    ///
735    /// Default: `true`.
736    pub store_optional_fields: bool,
737}
738
739impl GAFBaseParams {
740    /// Default block size in alignments.
741    pub const BLOCK_SIZE: usize = 1000;
742
743    /// Default GBWT construction buffer size in nodes.
744    pub const GBWT_BUFFER_SIZE: usize = 100_000_000;
745
746    // TODO: from_json
747
748    /// Returns a JSON description of the parameters that can be stored as a tag.
749    pub fn to_json(&self) -> String {
750        let mut fields = Vec::new();
751        if self.reference_free {
752            fields.push(JSONValue::String(String::from("reference_free")));
753        }
754        if self.store_quality_strings {
755            fields.push(JSONValue::String(String::from("quality_strings")));
756        }
757        if self.store_optional_fields {
758            fields.push(JSONValue::String(String::from("optional")));
759        }
760        let fields = JSONValue::Array(fields);
761        let json = JSONValue::Object(vec![
762            (String::from("block_size"), JSONValue::Number(self.block_size)),
763            (String::from("fields"), fields),
764        ]);
765        json.to_string()
766    }
767}
768
769impl Default for GAFBaseParams {
770    fn default() -> Self {
771        Self {
772            block_size: Self::BLOCK_SIZE,
773            gbwt_buffer_size: Self::GBWT_BUFFER_SIZE,
774            reference_free: false,
775            store_quality_strings: true,
776            store_optional_fields: true,
777        }
778    }
779}
780
781//-----------------------------------------------------------------------------
782
783// TODO: If we are willing to use a reference, we could reuse `edges` in the `Nodes` table from it.
784
785/// Creating the database.
786impl GAFBase {
787    /// Creates a new database from the alignments in file `gaf_file` and stores the database in file `db_file`.
788    ///
789    /// A unidirectional or bidirectional GBWT index can be provided in file `gbwt_file`.
790    /// Path `i` in the GBWT index corresponds to line `i` in the GAF file.
791    /// If a GBWT file is not provided, a new unidirectional index will be built in the background.
792    /// This will use a significant amount of memory, but it should not be the bottleneck of the construction.
793    ///
794    /// If the parameters indicate that node sequences should be stored in the database, a GBZ-compatible graph must be provided.
795    ///
796    /// # Arguments
797    ///
798    /// * `gaf_file`: GAF file storing the alignments. Can be gzip-compressed.
799    /// * `gbwt_file`: An optional GBWT file storing the target paths.
800    /// * `db_file`: Output database file.
801    /// * `graph`: A GBZ-compatible graph for building a reference-free database, or [`GraphReference::None`] for a reference-based one.
802    /// * `params`: Construction parameters.
803    ///
804    /// # Errors
805    ///
806    /// Returns an error, if:
807    ///
808    /// * The GAF file does not exist.
809    /// * The database already exists.
810    /// * Trying to build a reference-free GAF-base without a graph.
811    /// * The graph is not a valid reference for the alignments.
812    ///
813    /// Passes through any I/O, database, and construction errors.
814    pub fn create_from_files(
815        gaf_file: &Path, gbwt_file: Option<&Path>, db_file: &Path,
816        graph: GraphReference<'_, '_>,
817        params: &GAFBaseParams
818    ) -> Result<(), String> {
819        if let Some(gbwt_file) = gbwt_file {
820            eprintln!("Loading GBWT index {}", gbwt_file.display());
821            let index: Arc<GBWT> = Arc::new(serialize::load_from(gbwt_file).map_err(|x| x.to_string())?);
822            Self::create(gaf_file, Some(index), db_file, graph, params)
823        } else {
824            Self::create(gaf_file, None, db_file, graph, params)
825        }
826    }
827
828    /// Creates a new database from the alignments in file `gaf_file` and stores the database in file `db_file`.
829    ///
830    /// A unidirectional or bidirectional GBWT index can be provided.
831    /// Path `i` in the GBWT index corresponds to line `i` in the GAF file.
832    /// If a GBWT index is not provided, a new unidirectional index will be built in the background.
833    /// This will use a significant amount of memory, but it should not be the bottleneck of the construction.
834    ///
835    /// If the parameters indicate that node sequences should be stored in the database, a GBZ-compatible graph must be provided.
836    ///
837    /// # Arguments
838    ///
839    /// * `gaf_file`: GAF file storing the alignments. Can be gzip-compressed.
840    /// * `index`: An optional GBWT index storing the target paths.
841    /// * `db_file`: Output database file.
842    /// * `graph`: A GBZ-compatible graph for building a reference-free database, or [`GraphReference::None`] for a reference-based one.
843    /// * `params`: Construction parameters.
844    ///
845    /// # Errors
846    ///
847    /// Returns an error, if:
848    ///
849    /// * The GAF file does not exist.
850    /// * The database already exists.
851    /// * Trying to build a reference-free GAF-base without a graph.
852    /// * The graph is not a valid reference for the alignments.
853    ///
854    /// Passes through any I/O, database, and construction errors.
855    pub fn create<P: AsRef<Path>, Q: AsRef<Path>>(
856        gaf_file: P, index: Option<Arc<GBWT>>, db_file: Q,
857        graph: GraphReference<'_, '_>,
858        params: &GAFBaseParams
859    ) -> Result<(), String> {
860        eprintln!("Creating database {}", db_file.as_ref().display());
861        if binaries::file_exists(&db_file) {
862            return Err(format!("Database {} already exists", db_file.as_ref().display()));
863        }
864
865        // We will only use the graph if we actually need it.
866        let mut graph = graph;
867        if params.reference_free {
868            if graph.is_none() {
869                return Err(String::from("The construction of a reference-free GAF-base requires a graph"));
870            }
871        } else {
872            graph = GraphReference::None;
873        }
874
875        // Parse GAF headers and check that the graph is a valid reference.
876        let mut gaf_file = utils::open_file(gaf_file)?;
877        let aln_name = Self::parse_gaf_headers(&mut gaf_file)?;
878        let graph_name = graph.graph_name()?;
879        utils::require_valid_reference(&aln_name, &graph_name)?;
880
881        // `insert_alignments` consumes the connection, as it is moved to another thread.
882        let connection = Connection::open(&db_file).map_err(|x| x.to_string())?;
883        let built_index = Self::insert_alignments(index.clone(), gaf_file, connection, params)?;
884        eprintln!("Database size: {}", utils::file_size(&db_file).unwrap_or_else(|| String::from("unknown")));
885        let actual_index = match (&index, &built_index) {
886            (Some(index), None) => index.as_ref(),
887            (None, Some(index)) => index,
888            _ => return Err(String::from("Logic error in GBWT handling")),
889        };
890
891        let mut connection = Connection::open(&db_file).map_err(|x| x.to_string())?;
892        let nodes = Self::insert_nodes(actual_index, graph, &mut connection)?;
893        eprintln!("Database size: {}", utils::file_size(&db_file).unwrap_or_else(|| String::from("unknown")));
894
895        Self::insert_tags(actual_index, nodes, &aln_name, &mut connection, params)?;
896        eprintln!("Database size: {}", utils::file_size(&db_file).unwrap_or_else(|| String::from("unknown")));
897
898        Ok(())
899    }
900
901    fn parse_gaf_headers(gaf_file: &mut impl BufRead) -> Result<GraphName, String> {
902        let header_lines = formats::read_gaf_header_lines(gaf_file).map_err(|x| x.to_string())?;
903        Ok(GraphName::from_header_lines(&header_lines).unwrap_or_default())
904    }
905
906    // Returns the number of paths / alignments in the GBWT index.
907    fn gbwt_paths(index: &GBWT) -> usize {
908        if index.is_bidirectional() { index.sequences() / 2 } else { index.sequences() }
909    }
910
911    // We also include tags parsed from GAF headers.
912    fn insert_tags(index: &GBWT, nodes: usize, aln_name: &GraphName, connection: &mut Connection, params: &GAFBaseParams) -> Result<(), String> {
913        eprintln!("Inserting header and tags");
914
915        // Create the tags table.
916        connection.execute(
917            "CREATE TABLE Tags (
918                key TEXT PRIMARY KEY,
919                value TEXT NOT NULL
920            ) STRICT",
921            (),
922        ).map_err(|x| x.to_string())?;
923
924        // We currently only care about tags related to stable graph names.
925        let mut additional_tags = Tags::new();
926        aln_name.set_tags(&mut additional_tags);
927
928        // Insert header and selected tags.
929        let mut inserted = 0;
930        let transaction = connection.transaction().map_err(|x| x.to_string())?;
931        {
932            let mut insert = transaction.prepare(
933                "INSERT INTO Tags(key, value) VALUES (?1, ?2)"
934            ).map_err(|x| x.to_string())?;
935
936            let alignments = Self::gbwt_paths(index);
937            insert.execute((Self::KEY_VERSION, Self::VERSION)).map_err(|x| x.to_string())?;
938            insert.execute((Self::KEY_NODES, nodes)).map_err(|x| x.to_string())?;
939            insert.execute((Self::KEY_ALIGNMENTS, alignments)).map_err(|x| x.to_string())?;
940            let bidirectional: usize = if index.is_bidirectional() { 1 } else { 0 };
941            insert.execute((Self::KEY_BIDIRECTIONAL_GBWT, bidirectional)).map_err(|x| x.to_string())?;
942            insert.execute((Self::KEY_PARAMS, params.to_json())).map_err(|x| x.to_string())?;
943            inserted += 5;
944
945            for (key, value) in additional_tags.iter() {
946                insert.execute((key, value)).map_err(|x| x.to_string())?;
947                inserted += 1;
948            }
949        }
950        transaction.commit().map_err(|x| x.to_string())?;
951
952        eprintln!("Inserted {} key-value pairs", inserted);
953
954        Ok(())
955    }
956
957    // Returns the number of nodes in the graph.
958    fn insert_nodes(index: &GBWT, graph: GraphReference<'_, '_>, connection: &mut Connection) -> Result<usize, String> {
959        eprintln!("Inserting nodes");
960
961        // Create the nodes table.
962        // In a reference-based GAF-base, `sequence` will be empty.
963        // In a reference-free GAF-base, it will store the sequence label.
964        connection.execute(
965            "CREATE TABLE Nodes (
966                handle INTEGER PRIMARY KEY,
967                edges BLOB NOT NULL,
968                bwt BLOB NOT NULL,
969                sequence BLOB NOT NULL
970            ) STRICT",
971            (),
972        ).map_err(|x| x.to_string())?;
973
974        // Insert the nodes.
975        let mut inserted = 0;
976        let mut graph = graph;
977        let transaction = connection.transaction().map_err(|x| x.to_string())?;
978        {
979            let mut insert = transaction.prepare(
980                "INSERT INTO Nodes(handle, edges, bwt, sequence) VALUES (?1, ?2, ?3, ?4)"
981            ).map_err(|x| x.to_string())?;
982            let bwt: &BWT = index.as_ref();
983            for record_id in bwt.id_iter() {
984                if record_id == gbz::ENDMARKER {
985                    continue;
986                }
987                let handle = index.record_to_node(record_id);
988                let (edge_bytes, bwt_bytes) = bwt.compressed_record(record_id).unwrap();
989                let sequence = if graph.is_none() {
990                    Vec::new()
991                } else {
992                    let record = graph.gbz_record(handle)?;
993                    utils::encode_sequence(record.sequence())
994                };
995                insert.execute((handle, edge_bytes, bwt_bytes, sequence)).map_err(|x| x.to_string())?;
996                inserted += 1;
997            }
998        }
999        transaction.commit().map_err(|x| x.to_string())?;
1000
1001        eprintln!("Inserted {} node records", inserted);
1002        Ok(inserted / 2)
1003    }
1004
1005    // This returns a GBWT index on success if and only if one was not provided.
1006    fn insert_alignments(
1007        index: Option<Arc<GBWT>>,
1008        gaf_file: Box<dyn BufRead>,
1009        connection: Connection,
1010        params: &GAFBaseParams
1011    ) -> Result<Option<GBWT>, String> {
1012        eprintln!("Inserting alignments");
1013        let mut gaf_file = gaf_file;
1014        let mut connection = connection;
1015
1016        // min_handle and max_handle will be NULL for a block of unaligned reads.
1017        // read_length will be NULL if the lengths vary within the block.
1018        connection.execute(
1019            "CREATE TABLE Alignments (
1020                id INTEGER PRIMARY KEY,
1021                min_handle INTEGER,
1022                max_handle INTEGER CHECK (min_handle <= max_handle),
1023                alignments INTEGER NOT NULL,
1024                read_length INTEGER,
1025                gbwt_starts BLOB NOT NULL,
1026                names BLOB NOT NULL,
1027                quality_strings BLOB NOT NULL,
1028                difference_strings BLOB NOT NULL,
1029                flags BLOB NOT NULL,
1030                numbers BLOB NOT NULL,
1031                optional BLOB NOT NULL
1032            ) STRICT",
1033            (),
1034        ).map_err(|x| x.to_string())?;
1035
1036        // TODO: Use rtree?
1037        // Create indexes for min/max nodes.
1038        connection.execute(
1039            "CREATE INDEX AlignmentNodeInterval
1040                ON Alignments(min_handle, max_handle)",
1041            (),
1042        ).map_err(|x| x.to_string())?;
1043
1044        // The main thread parses the GAF file and sends blocks of alignments to an encoder thread.
1045        // That thread sends the encoded blocks to a third thread that inserts them into the database.
1046        // If something fails within a thread, it sends an error message to the next thread and stops.
1047        // If a thread receives an error message, it passes it through and stops. If a thread fails to
1048        // send an `Ok` message, it indicates that the receiver has already reported an error and stopped.
1049        let (to_encoder, from_parser) = mpsc::sync_channel(4);
1050        let (to_insert, from_encoder) = mpsc::sync_channel(4);
1051        let (to_report, from_insert) = mpsc::sync_channel(1);
1052
1053        // Encoder thread.
1054        let expected_alignments = index.as_ref().map(|index| Self::gbwt_paths(index.as_ref()));
1055        let build_gbwt = index.is_none();
1056        let encoder_thread = thread::spawn(move || {
1057            let mut alignment_id = 0;
1058            let mut source = if let Some(index) = index.as_ref() {
1059                PathStartSource::from(index.as_ref())
1060            } else {
1061                PathStartSource::new()
1062            };
1063            loop {
1064                // This can only fail if the sender is disconnected.
1065                let block: Result<Vec<Alignment>, String> = from_parser.recv().unwrap();
1066                match block {
1067                    Ok(block) => {
1068                        let encoded = AlignmentBlock::new(&block, &mut source, alignment_id);
1069                        match encoded {
1070                            Ok(data) => {
1071                                // Failure to send indicates that the receiver has already failed.
1072                                if to_insert.send(Ok(data)).is_err() {
1073                                    return;
1074                                }
1075                            },
1076                            Err(message) => {
1077                                let _ = to_insert.send(Err(message));
1078                                return;
1079                            }
1080                        }
1081                        alignment_id += block.len();
1082                        if block.is_empty() {
1083                            // An empty block indicates that we are done.
1084                            return;
1085                        }
1086                    },
1087                    Err(message) => {
1088                        let _ = to_insert.send(Err(message));
1089                        return;
1090                    },
1091                }
1092            }
1093        });
1094
1095        // Insertion thread.
1096        let insert_thread = thread::spawn(move || {
1097            let transaction = connection.transaction().map_err(|x| x.to_string());
1098            if let Err(message) = transaction {
1099                let _ = to_report.send(Err(message));
1100                return;
1101            }
1102            let transaction = transaction.unwrap();
1103
1104            let insert = transaction.prepare(
1105                "INSERT INTO
1106                    Alignments(min_handle, max_handle, alignments, read_length, gbwt_starts, names, quality_strings, difference_strings, flags, numbers, optional)
1107                    VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"
1108            ).map_err(|x| x.to_string());
1109            if let Err(message) = insert {
1110                let _ = to_report.send(Err(message));
1111                return;
1112            }
1113            let mut insert = insert.unwrap();
1114
1115            let mut statistics = AlignmentStats::new();
1116            loop {
1117                // This can only fail if the sender is disconnected.
1118                let block: Result<AlignmentBlock, String> = from_encoder.recv().unwrap();
1119                match block {
1120                    Ok(block) => {
1121                        if block.is_empty() {
1122                            // An empty block indicates that we are done.
1123                            break;
1124                        }
1125                        statistics.update(&block);
1126                        let result = insert.execute((
1127                            block.min_handle, block.max_handle, block.alignments, block.read_length,
1128                            block.gbwt_starts, block.names,
1129                            block.quality_strings, block.difference_strings,
1130                            block.flags.as_ref(), block.numbers, block.optional
1131                        )).map_err(|x| x.to_string());
1132                        if let Err(message) = result {
1133                            let _ = to_report.send(Err(message));
1134                            return;
1135                        }
1136                    },
1137                    Err(message) => {
1138                        let _ = to_report.send(Err(message));
1139                        return;
1140                    },
1141                }
1142            }
1143
1144            drop(insert);
1145            let result = transaction.commit().map_err(|x| x.to_string());
1146            if let Err(message) = result {
1147                let _ = to_report.send(Err(message));
1148                return;
1149            }
1150            let _ = to_report.send(Ok(statistics));
1151        });
1152
1153        // TODO: Maybe we should start a new block when the min node changes and the block is large enough.
1154        // Main thread that parses the alignments and builds a GBWT index in
1155        // the background if needed. If we get an error in this thread, we pass
1156        // it through the other threads and stop.
1157        let mut line_num: usize = 0;
1158        let mut unaligned_block = false;
1159        let mut block: Vec<Alignment> = Vec::new();
1160        let mut failed = false;
1161        let mut builder = if build_gbwt {
1162            Some(GBWTBuilder::new(false, false, params.gbwt_buffer_size))
1163        } else {
1164            None
1165        };
1166        loop {
1167            let mut buf: Vec<u8> = Vec::new();
1168            let len = gaf_file.read_until(b'\n', &mut buf).map_err(|x| x.to_string());
1169            match len {
1170                Ok(len) => {
1171                    if len == 0 {
1172                        // End of file.
1173                        break;
1174                    }
1175                },
1176                Err(message) => {
1177                    let _ = to_encoder.send(Err(message));
1178                    failed = true;
1179                    break;
1180                },
1181            };
1182            if formats::is_gaf_header_line(&buf) {
1183                // We should not encounter header lines between alignment lines.
1184                line_num += 1;
1185                continue;
1186            }
1187            let aln = Alignment::from_gaf(&buf).map_err(
1188                |x| format!("Failed to parse the alignment on line {}: {}", line_num, x)
1189            );
1190            match aln {
1191                Ok(aln) => {
1192                    if let Some(builder) = &mut builder
1193                        && let Err(msg) = builder.insert(aln.target_path().unwrap(), None) {
1194                        let _ = to_encoder.send(Err(msg));
1195                        failed = true;
1196                        break;
1197                    }
1198                    let mut aln = aln;
1199                    if !params.store_quality_strings {
1200                        aln.base_quality.clear();
1201                    }
1202                    if !params.store_optional_fields {
1203                        aln.optional.clear();
1204                    }
1205                    if aln.is_unaligned() != unaligned_block {
1206                        // We have a new block.
1207                        if !block.is_empty() {
1208                            // Failure to send indicates that the receiver has already failed.
1209                            if to_encoder.send(Ok(block)).is_err() {
1210                                block = Vec::new();
1211                                failed = true;
1212                                break;
1213                            }
1214                            block = Vec::new();
1215                        }
1216                        unaligned_block = aln.is_unaligned();
1217                    }
1218                    block.push(aln);
1219                    if block.len() >= params.block_size {
1220                        // Failure to send indicates that the receiver has already failed.
1221                        if to_encoder.send(Ok(block)).is_err() {
1222                            block = Vec::new();
1223                            failed = true;
1224                            break;
1225                        }
1226                        block = Vec::new();
1227                    }
1228                },
1229                Err(message) => {
1230                    let _ = to_encoder.send(Err(message));
1231                    failed = true;
1232                    break;
1233                },
1234            }
1235            line_num += 1;
1236        }
1237
1238        // Send the last block and a termination signal.
1239        // Then wait for the threads to finish and get the number of inserted alignments.
1240        if !failed {
1241            // If we fail here, the other threads have already failed, and we will get
1242            // an error message from `from_insert`.
1243            if !block.is_empty() {
1244                let _ = to_encoder.send(Ok(block));
1245            }
1246            let _ = to_encoder.send(Ok(Vec::new()));
1247        }
1248        let _ = encoder_thread.join();
1249        let _ = insert_thread.join();
1250        let statistics = from_insert.recv().unwrap()?;
1251
1252        eprintln!("Inserted information on {} alignments", statistics.alignments);
1253        if let Some(expected_alignments) = expected_alignments && statistics.alignments != expected_alignments {
1254            eprintln!("Warning: Expected {} alignments", expected_alignments);
1255        }
1256        eprintln!(
1257            "Field sizes: gbwt_starts {}, names {}, quality_strings {}, difference_strings {}, flags {}, numbers {}, optional {}",
1258            utils::human_readable_size(statistics.start_bytes),
1259            utils::human_readable_size(statistics.name_bytes),
1260            utils::human_readable_size(statistics.quality_bytes),
1261            utils::human_readable_size(statistics.difference_bytes),
1262            utils::human_readable_size(statistics.flag_bytes),
1263            utils::human_readable_size(statistics.number_bytes),
1264            utils::human_readable_size(statistics.optional_bytes),
1265        );
1266
1267        // Finally build the GBWT index.
1268        if let Some(builder) = builder {
1269            eprintln!("Finishing GBWT construction");
1270            let index = builder.build()?;
1271            eprintln!("GBWT index: {} sequences of total length {}", index.sequences(), index.len());
1272            Ok(Some(index))
1273        } else {
1274            Ok(None)
1275        }
1276    }
1277}
1278
1279//-----------------------------------------------------------------------------
1280
1281/// A record for an oriented node.
1282///
1283/// The record corresponds to one row in table `Nodes`.
1284/// It stores the information in a GBWT node record ([`Record`]) and the sequence of the node in the correct orientation.
1285/// The edges and the sequence are decompressed, while the BWT fragment remains in compressed form.
1286///
1287/// If the node is a boundary node in a top-level chain, there may also be a link to the next node in the chain.
1288/// That requires that the link was also present in the source of the record.
1289#[derive(Clone, Debug, PartialEq, Eq)]
1290pub struct GBZRecord {
1291    handle: usize,
1292    edges: Vec<Pos>,
1293    bwt: Vec<u8>,
1294    sequence: Vec<u8>,
1295    next: Option<usize>,
1296}
1297
1298impl GBZRecord {
1299    // TODO: add next from chains?
1300    /// Creates a new GBZ record from the given GBZ graph and node handle.
1301    ///
1302    /// Returns [`None`] if the node does not exist in the graph.
1303    pub fn from_gbz(graph: &GBZ, handle: usize) -> Option<Self> {
1304        let node_id = support::node_id(handle);
1305        if !graph.has_node(node_id) {
1306            return None;
1307        }
1308
1309        let index: &GBWT = graph.as_ref();
1310        let bwt_records: &BWT = index.as_ref();
1311        let record_id = index.node_to_record(handle);
1312        let (edge_bytes, bwt_bytes) = bwt_records.compressed_record(record_id)?;
1313        let (edges, _) = Record::decompress_edges(edge_bytes)?;
1314        let bwt = bwt_bytes.to_vec();
1315        let sequence = graph.sequence(node_id)?;
1316        let sequence = if support::node_orientation(handle) == Orientation::Forward {
1317            sequence.to_vec()
1318        } else {
1319            support::reverse_complement(sequence)
1320        };
1321        let next = None;
1322
1323        Some(GBZRecord { handle, edges, bwt, sequence, next })
1324    }
1325
1326    /// Creates a new GBZ record from the raw parts.
1327    ///
1328    /// This is primarily for testing.
1329    ///
1330    /// # Safety
1331    ///
1332    /// This is probably safe, even if the record breaks some invariants.
1333    /// However, I do not have the time and the energy to determine the consequences.
1334    #[doc(hidden)]
1335    pub unsafe fn from_raw_parts(handle: usize, edges: Vec<Pos>, bwt: Vec<u8>, sequence: Vec<u8>, next: Option<usize>) -> Self {
1336        GBZRecord { handle, edges, bwt, sequence, next }
1337    }
1338
1339    /// Returns a GBWT record based on this record.
1340    ///
1341    /// The lifetime of the returned record is tied to this record.
1342    ///
1343    /// # Panics
1344    ///
1345    /// Will panic if the record would be empty.
1346    /// This should never happen with a valid database.
1347    pub fn to_gbwt_record(&self) -> Record<'_> {
1348        if self.edges.is_empty() {
1349            panic!("GBZRecord::to_gbwt_record: Empty record");
1350        }
1351        unsafe { Record::from_raw_parts(self.handle, self.edges.clone(), &self.bwt) }
1352    }
1353
1354    /// Returns the handle of the record.
1355    ///
1356    /// The handle is a [`GBWT`] node identifier.
1357    #[inline]
1358    pub fn handle(&self) -> usize {
1359        self.handle
1360    }
1361
1362    /// Returns the node identifier of the record.
1363    #[inline]
1364    pub fn id(&self) -> usize {
1365        support::node_id(self.handle)
1366    }
1367
1368    /// Returns the orientation of the record.
1369    #[inline]
1370    pub fn orientation(&self) -> Orientation {
1371        support::node_orientation(self.handle)
1372    }
1373
1374    /// Returns an iterator over the handles of successor nodes.
1375    #[inline]
1376    pub fn successors(&self) -> impl Iterator<Item = usize> + '_ {
1377        self.edges.iter().filter(|x| x.node != gbz::ENDMARKER).map(|x| x.node)
1378    }
1379
1380    /// Returns an iterator over the outgoing edges from the record.
1381    ///
1382    /// Each edge is a pair consisting of a destination node handle and a offset in the corresponding GBWT record.
1383    /// The edges are sorted by destination node.
1384    /// This iterator does not list the possible edge to [`gbz::ENDMARKER`], as it only exists for technical purposes.
1385    #[inline]
1386    pub fn edges(&self) -> impl Iterator<Item = Pos> + '_ {
1387        self.edges.iter().filter(|x| x.node != gbz::ENDMARKER).copied()
1388    }
1389
1390    /// Returns the slice of edges stored in the record.
1391    ///
1392    /// Each edge is a pair consisting of a destination node handle and a offset in the corresponding GBWT record.
1393    /// The edges are sorted by destination node.
1394    /// This slice does not list the possible edge to [`gbz::ENDMARKER`], as it only exists for technical purposes.
1395    pub(crate) fn edges_slice(&self) -> &[Pos] {
1396        if !self.edges.is_empty() && self.edges[0].node == gbz::ENDMARKER {
1397            return &self.edges[1..];
1398        }
1399        &self.edges
1400    }
1401
1402    /// Returns the sequence of the record.
1403    ///
1404    /// This is the sequence of the node in the correct orientation.
1405    /// The sequence is a valid [`String`] over the alphabet `ACGTN`.
1406    #[inline]
1407    pub fn sequence(&self) -> &[u8] {
1408        &self.sequence
1409    }
1410
1411    /// Returns the length of the sequence stored in the record.
1412    #[inline]
1413    pub fn sequence_len(&self) -> usize {
1414        self.sequence.len()
1415    }
1416
1417    /// Returns the next handle for the next boundary node in the chain, or [`None`] if there is none.
1418    #[inline]
1419    pub fn next(&self) -> Option<usize> {
1420        self.next
1421    }
1422}
1423
1424//-----------------------------------------------------------------------------
1425
1426/// A record for a path in the graph.
1427///
1428/// The record corresponds to one row in table `Paths`.
1429#[derive(Clone, Debug, PartialEq, Eq)]
1430pub struct GBZPath {
1431    /// Path handle / identifier of the path in the original graph.
1432    pub handle: usize,
1433
1434    /// Starting position of the path in the forward orientation.
1435    pub fw_start: Pos,
1436
1437    /// Starting position of the path in the reverse orientation.
1438    pub rev_start: Pos,
1439
1440    /// Path name.
1441    pub name: FullPathName,
1442
1443    /// Has this path been indexed for random access with [`GraphInterface::indexed_position`]?
1444    pub is_indexed: bool,
1445}
1446
1447impl GBZPath {
1448    /// Returns a string representation of the path name.
1449    pub fn name(&self) -> String {
1450        self.name.to_string()
1451    }
1452
1453    /// Creates a new path record from the given GBZ graph and a path identifier.
1454    pub fn with_id(graph: &GBZ, path_id: usize) -> Option<Self> {
1455        let metadata = graph.metadata()?;
1456        let index: &GBWT = graph.as_ref();
1457        let fw_start = path_start(index, path_id, Orientation::Forward);
1458        let rev_start = path_start(index, path_id, Orientation::Reverse);
1459        let name = FullPathName::from_metadata(metadata, path_id)?;
1460        Some(GBZPath {
1461            handle: path_id,
1462            fw_start, rev_start,
1463            name,
1464            is_indexed: false,
1465        })
1466    }
1467
1468    /// Creates a new path record from the given GBZ graph and path name.
1469    ///
1470    /// The fragment field is assumed to be an offset in the haplotype.
1471    /// If the haplotype is fragmented, this returns the last fragment starting at or before the given offset.
1472    pub fn with_name(graph: &GBZ, name: &FullPathName) -> Option<Self> {
1473        let metadata = graph.metadata()?;
1474        let path_id = metadata.find_fragment(name)?;
1475        let index: &GBWT = graph.as_ref();
1476        let fw_start = path_start(index, path_id, Orientation::Forward);
1477        let rev_start = path_start(index, path_id, Orientation::Reverse);
1478        Some(GBZPath {
1479            handle: path_id,
1480            fw_start, rev_start,
1481            name: FullPathName::from_metadata(metadata, path_id)?,
1482            is_indexed: false,
1483        })
1484    }
1485}
1486
1487impl AsRef<FullPathName> for GBZPath {
1488    fn as_ref(&self) -> &FullPathName {
1489        &self.name
1490    }
1491}
1492
1493//-----------------------------------------------------------------------------
1494
1495/// Database query interface.
1496///
1497/// This structure stores prepared statements for accessing the graph.
1498///
1499/// # Examples
1500///
1501/// ```
1502/// use gbz_base::{GBZBase, GraphInterface};
1503/// use gbz::{Orientation, FullPathName};
1504/// use gbz::support;
1505/// use simple_sds::serialize;
1506/// use std::fs;
1507///
1508/// // Create the database.
1509/// let gbz_file = support::get_test_data("example.gbz");
1510/// let db_file = serialize::temp_file_name("graph-interface");
1511/// let result = GBZBase::create_from_files(&gbz_file, None, &db_file);
1512/// assert!(result.is_ok());
1513///
1514/// // Open the database and create a graph interface.
1515/// let database = GBZBase::open(&db_file).unwrap();
1516/// let mut interface = GraphInterface::new(&database).unwrap();
1517///
1518/// // The example graph does not have a reference samples tag.
1519/// assert!(interface.get_gbwt_tag("reference_samples").unwrap().is_none());
1520///
1521/// // Node 21 with edges to 22 and 23, all in forward orientation.
1522/// let id = 21;
1523/// let orientation = Orientation::Forward;
1524/// let handle = support::encode_node(id, orientation);
1525/// let record = interface.get_record(handle).unwrap().unwrap();
1526/// assert_eq!(record.id(), id);
1527/// assert_eq!(record.orientation(), orientation);
1528/// let successors: Vec<usize> = record.successors().collect();
1529/// assert_eq!(
1530///     successors,
1531///     vec![
1532///         support::encode_node(22, Orientation::Forward),
1533///         support::encode_node(23, Orientation::Forward)
1534///     ]
1535/// );
1536///
1537/// // Reference path for contig B goes from 21 to 22.
1538/// let path_name = FullPathName::generic("B");
1539/// let path = interface.find_path(&path_name).unwrap().unwrap();
1540/// assert_eq!(path.fw_start.node, handle);
1541/// let next = record.to_gbwt_record().lf(path.fw_start.offset).unwrap();
1542/// assert_eq!(next.node, support::encode_node(22, Orientation::Forward));
1543///
1544/// // The first indexed position is at the start of the path.
1545/// assert!(path.is_indexed);
1546/// let indexed_pos = interface.indexed_position(path.handle, 3).unwrap().unwrap();
1547/// assert_eq!(indexed_pos, (0, path.fw_start));
1548///
1549/// // Clean up.
1550/// drop(interface);
1551/// drop(database);
1552/// fs::remove_file(&db_file).unwrap();
1553/// ```
1554#[derive(Debug)]
1555pub struct GraphInterface<'a> {
1556    get_tag: Statement<'a>,
1557    get_tags: Statement<'a>,
1558    get_record: Statement<'a>,
1559    get_path: Statement<'a>,
1560    find_path: Statement<'a>,
1561    paths_for_sample: Statement<'a>,
1562    indexed_position: Statement<'a>,
1563}
1564
1565impl<'a> GraphInterface<'a> {
1566    /// Returns a new interface to the given database.
1567    ///
1568    /// Passes through any database errors.
1569    pub fn new(database: &'a GBZBase) -> Result<Self, String> {
1570        let get_tag = database.connection.prepare(
1571            "SELECT value FROM Tags WHERE key = ?1"
1572        ).map_err(|x| x.to_string())?;
1573
1574        let get_tags = database.connection.prepare(
1575            "SELECT key, value FROM Tags WHERE key LIKE ?1"
1576        ).map_err(|x| x.to_string())?;
1577
1578        let get_record = database.connection.prepare(
1579            "SELECT edges, bwt, sequence, next FROM Nodes WHERE handle = ?1"
1580        ).map_err(|x| x.to_string())?;
1581
1582        let get_path = database.connection.prepare(
1583            "SELECT * FROM Paths WHERE handle = ?1"
1584        ).map_err(|x| x.to_string())?;
1585
1586        let find_path = database.connection.prepare(
1587            "SELECT * FROM Paths
1588            WHERE sample = ?1 AND contig = ?2 AND haplotype = ?3 AND fragment <= ?4
1589            ORDER BY fragment DESC
1590            LIMIT 1"
1591        ).map_err(|x| x.to_string())?;
1592
1593        let paths_for_sample = database.connection.prepare(
1594            "SELECT * FROM Paths WHERE sample = ?1"
1595        ).map_err(|x| x.to_string())?;
1596
1597        let indexed_position = database.connection.prepare(
1598            "SELECT path_offset, node_handle, node_offset FROM ReferenceIndex
1599            WHERE path_handle = ?1 AND path_offset <= ?2
1600            ORDER BY path_offset DESC
1601            LIMIT 1"
1602        ).map_err(|x| x.to_string())?;
1603
1604        Ok(GraphInterface {
1605            get_tag, get_tags,
1606            get_record,
1607            get_path, find_path, paths_for_sample,
1608            indexed_position,
1609        })
1610    }
1611
1612    /// Returns the value of the [`GBWT`] tag with the given key, or [`None`] if the tag does not exist.
1613    pub fn get_gbwt_tag(&mut self, key: &str) -> Result<Option<String>, String> {
1614        let key = format!("{}{}", GBZBase::KEY_GBWT, key);
1615        self.get_tag.query_row(
1616            (key,),
1617            |row| row.get(0)
1618        ).optional().map_err(|x| x.to_string())
1619    }
1620
1621    /// Returns the value of the [`GBZ`] tag with the given key, or [`None`] if the tag does not exist.
1622    pub fn get_gbz_tag(&mut self, key: &str) -> Result<Option<String>, String> {
1623        let key = format!("{}{}", GBZBase::KEY_GBZ, key);
1624        self.get_tag.query_row(
1625            (key,),
1626            |row| row.get(0)
1627        ).optional().map_err(|x| x.to_string())
1628    }
1629
1630    // Returns all tags with the given prefix.
1631    fn get_tags_with_prefix(&mut self, prefix: &str) -> Result<Tags, String> {
1632        let mut tags = Tags::new();
1633        let pattern = format!("{}%", prefix);
1634        let mut rows = self.get_tags.query((pattern,)).map_err(|x| x.to_string())?;
1635        while let Some(row) = rows.next().map_err(|x| x.to_string())? {
1636            let key: String = row.get(0).map_err(|x| x.to_string())?;
1637            let value: String = row.get(1).map_err(|x| x.to_string())?;
1638            let key = key.trim_start_matches(prefix).to_string();
1639            tags.insert(&key, &value);
1640        }
1641        Ok(tags)
1642    }
1643
1644    /// Returns all [`GBWT`] tags.
1645    pub fn get_gbwt_tags(&mut self) -> Result<Tags, String> {
1646        self.get_tags_with_prefix(GBZBase::KEY_GBWT)
1647    }
1648
1649    /// Returns all [`GBZ`] tags.
1650    pub fn get_gbz_tags(&mut self) -> Result<Tags, String> {
1651        self.get_tags_with_prefix(GBZBase::KEY_GBZ)
1652    }
1653
1654    /// Returns the stable graph name (pggname) for the graph.
1655    ///
1656    /// Passes through any database errors.
1657    /// Returns an empty name if the corresponding GBZ tags cannot be parsed.
1658    pub fn graph_name(&mut self) -> Result<GraphName, String> {
1659        let tags = self.get_gbz_tags()?;
1660        Ok(GraphName::from_tags(&tags).unwrap_or_default())
1661    }
1662
1663    /// Returns the node record for the given handle, or [`None`] if the node does not exist.
1664    pub fn get_record(&mut self, handle: usize) -> Result<Option<GBZRecord>, String> {
1665        self.get_record.query_row(
1666            (handle,),
1667            |row| {
1668                let edge_bytes: Vec<u8> = row.get(0)?;
1669                let (edges, _) = Record::decompress_edges(&edge_bytes).unwrap();
1670                let bwt: Vec<u8> = row.get(1)?;
1671                let encoded_sequence: Vec<u8> = row.get(2)?;
1672                let sequence: Vec<u8> = utils::decode_sequence(&encoded_sequence);
1673                let next: Option<usize> = row.get(3)?;
1674                Ok(GBZRecord { handle, edges, bwt, sequence, next })
1675            }
1676        ).optional().map_err(|x| x.to_string())
1677    }
1678
1679    fn row_to_gbz_path(row: &Row) -> rusqlite::Result<GBZPath> {
1680        let handle = row.get(0)?;
1681        let fw_start = Pos::new(row.get(1)?, row.get(2)?);
1682        let rev_start = Pos::new(row.get(3)?, row.get(4)?);
1683        let name = FullPathName {
1684            sample: row.get(5)?,
1685            contig: row.get(6)?,
1686            haplotype: row.get(7)?,
1687            fragment: row.get(8)?,
1688        };
1689        let is_indexed = row.get(9)?;
1690        Ok(GBZPath {
1691            handle,
1692            fw_start, rev_start,
1693            name,
1694            is_indexed,
1695        })
1696    }
1697
1698    /// Returns the path with the given handle, or [`None`] if the path does not exist.
1699    pub fn get_path(&mut self, handle: usize) -> Result<Option<GBZPath>, String> {
1700        self.get_path.query_row((handle,), Self::row_to_gbz_path).optional().map_err(|x| x.to_string())
1701    }
1702
1703    /// Returns the path with the given name, or [`None`] if the path does not exist.
1704    ///
1705    /// The fragment field is assumed to be an offset in the haplotype.
1706    /// If the haplotype is fragmented, this returns the last fragment starting at or before the given offset.
1707    pub fn find_path(&mut self, name: &FullPathName) -> Result<Option<GBZPath>, String> {
1708        self.find_path.query_row(
1709            (&name.sample, &name.contig, name.haplotype, name.fragment),
1710            Self::row_to_gbz_path
1711        ).optional().map_err(|x| x.to_string())
1712    }
1713
1714    /// Returns all paths with the given sample name.
1715    pub fn paths_for_sample(&mut self, sample_name: &str) -> Result<Vec<GBZPath>, String> {
1716        let mut result: Vec<GBZPath> = Vec::new();
1717        let mut rows = self.paths_for_sample.query((sample_name,)).map_err(|x| x.to_string())?;
1718        while let Some(row) = rows.next().map_err(|x| x.to_string())? {
1719            let path = Self::row_to_gbz_path(row).map_err(|x| x.to_string())?;
1720            result.push(path);
1721        }
1722        Ok(result)
1723    }
1724
1725    // TODO: List of all paths that are indexed. Handle fragmented haplotypes properly.
1726
1727    /// Returns the last indexed position at or before offset `path_offset` on path `path_handle`.
1728    ///
1729    /// Returns [`None`] if the path does not exist or it has not been indexed.
1730    pub fn indexed_position(&mut self, path_handle: usize, path_offset: usize) -> Result<Option<(usize, Pos)>, String> {
1731        self.indexed_position.query_row(
1732            (path_handle, path_offset),
1733            |row| {
1734                let path_offset = row.get(0)?;
1735                let node_handle = row.get(1)?;
1736                let node_offset = row.get(2)?;
1737                Ok((path_offset, Pos::new(node_handle, node_offset)))
1738            }
1739        ).optional().map_err(|x| x.to_string())
1740    }
1741}
1742
1743//-----------------------------------------------------------------------------
1744
1745// Helper types and functions for using the databases.
1746
1747/// A reference to a GBZ-compatible graph.
1748///
1749/// Graph operations with a [`GBZ`] graph take an immutable reference to the graph.
1750/// The corresponding operations with GBZ-base using [`GraphInterface`] take a mutable reference instead.
1751/// This wrapper can be created on demand to encapsulate a reference to either type of graph.
1752pub enum GraphReference<'reference, 'graph> {
1753    /// A [`GBZ`] graph.
1754    Gbz(&'reference GBZ),
1755    /// A [`GraphInterface`].
1756    Db(&'reference mut GraphInterface<'graph>),
1757    /// No graph provided.
1758    None,
1759}
1760
1761impl<'reference, 'graph> GraphReference<'reference, 'graph> {
1762    /// Returns the record for the oriented node corresponding to the given handle.
1763    ///
1764    /// # Errors
1765    ///
1766    /// Returns an error if the handle does not exist in the graph.
1767    /// Returns an error if the graph reference is [`Self::None`].
1768    /// Passes through any errors from the graph implementation.
1769    pub fn gbz_record(&mut self, handle: usize) -> Result<GBZRecord, String> {
1770        match self {
1771            GraphReference::Gbz(gbz) => {
1772                GBZRecord::from_gbz(gbz, handle).ok_or_else(|| format!("The graph does not contain handle {}", handle))
1773            },
1774            GraphReference::Db(db) => {
1775                db.get_record(handle)?.ok_or_else(|| format!("The graph does not contain handle {}", handle))
1776            },
1777            GraphReference::None => Err(String::from("No reference graph provided")),
1778        }
1779    }
1780
1781    /// Returns the stable graph name (pggname) for the graph.
1782    ///
1783    /// Returns an empty name if the graph reference is [`Self::None`].
1784    ///
1785    /// # Errors
1786    ///
1787    /// Returns an empty object if the corresponding GBZ tags cannot be parsed.
1788    /// Passes through any errors from the graph implementation.
1789    pub fn graph_name(&mut self) -> Result<GraphName, String> {
1790        match self {
1791            GraphReference::Gbz(gbz) => {
1792                Ok(GraphName::from_gbz(gbz))
1793            },
1794            GraphReference::Db(db) => {
1795                db.graph_name()
1796            },
1797            GraphReference::None => Ok(GraphName::default()),
1798        }
1799    }
1800
1801    /// Returns `true` if the graph reference is [`Self::None`].
1802    #[inline]
1803    pub fn is_none(&self) -> bool {
1804        matches!(self, GraphReference::None)
1805    }
1806}
1807
1808/// Returns the starting position of the given path in the given orientation.
1809///
1810/// Returns `(gbz::ENDMARKER, 0)` if the path is empty or does not exist.
1811pub fn path_start(index: &GBWT, path_id: usize, orientation: Orientation) -> Pos {
1812    index.start(support::encode_path(path_id, orientation)).unwrap_or(Pos::new(gbz::ENDMARKER, 0))
1813}
1814
1815/// Type of a potential GBZ / database file.
1816#[derive(Clone, Debug, PartialEq, Eq)]
1817pub enum FileType {
1818    /// The name does not refer to anything.
1819    Missing,
1820    /// The name refers to a directory, a symbolic link, or some other non-file.
1821    NotFile,
1822    /// The type of the file is unknown.
1823    UnknownFile,
1824    /// The file is a GBZ graph.
1825    Gbz,
1826    /// The file is an unknown SQLite database.
1827    UnknownDatabase,
1828    /// The file is a known SQLite database with the given version string.
1829    Version(String),
1830}
1831
1832/// Determines the type of the given file, which may be a GBZ graph or a SQLite database.
1833pub fn identify_file<P: AsRef<Path>>(filename: P) -> FileType {
1834    let metadata = fs::metadata(&filename);
1835    if metadata.is_err() {
1836        return FileType::Missing;
1837    }
1838    let metadata = metadata.unwrap();
1839    if !metadata.is_file() {
1840        return FileType::NotFile;
1841    }
1842
1843    if GBZ::is_gbz(&filename) {
1844        return FileType::Gbz;
1845    }
1846    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
1847    let connection = Connection::open_with_flags(filename, flags);
1848    if connection.is_err() {
1849        return FileType::UnknownFile;
1850    }
1851    let connection = connection.unwrap();
1852
1853    let statement = connection.prepare("SELECT value FROM Tags WHERE key = 'version'");
1854    if statement.is_err() {
1855        return FileType::UnknownDatabase;
1856    }
1857    let mut statement = statement.unwrap();
1858
1859    let version: rusqlite::Result<String> = statement.query_row([], |row| row.get(0));
1860    if version.is_err() {
1861        return FileType::UnknownDatabase;
1862    }
1863    FileType::Version(version.unwrap())
1864}
1865
1866// Executes the statement, which is expected to return a single string value.
1867// Then returns the value.
1868fn get_string_value(statement: &mut Statement, key: &str) -> Result<String, String> {
1869    let result: rusqlite::Result<String> = statement.query_row(
1870        (key,),
1871        |row| row.get(0)
1872    );
1873    match result {
1874        Ok(value) => Ok(value),
1875        Err(x) => Err(format!("Key not found: {} ({})", key, x)),
1876    }
1877}
1878
1879// Executes the statement, which is expected to return a single string value.
1880// Then returns the value as an integer.
1881fn get_numeric_value(statement: &mut Statement, key: &str) -> Result<usize, String> {
1882    let value = get_string_value(statement, key)?;
1883    value.parse::<usize>().map_err(|x| x.to_string())
1884}
1885
1886//-----------------------------------------------------------------------------
1887