Skip to main content

grafeo_engine/database/
import.rs

1//! Bulk graph import from TSV edge lists and Matrix Market files.
2//!
3//! These importers bypass per-edge transaction overhead by batching all
4//! operations into a single transaction. This is 10-100x faster than calling
5//! `create_node`/`create_edge` in a loop for large graphs.
6//!
7//! # Supported Formats
8//!
9//! | Format | Extension | Description |
10//! | ------ | --------- | ----------- |
11//! | TSV | `.tsv`, `.txt`, `.edges` | Tab or space-separated edge list |
12//! | MMIO | `.mtx` | Matrix Market coordinate format |
13//!
14//! # Example
15//!
16//! ```no_run
17//! use grafeo_engine::GrafeoDB;
18//!
19//! let db = GrafeoDB::new_in_memory();
20//! let (nodes, edges) = db.import_tsv("graph.tsv", "EDGE", true).unwrap();
21//! println!("Loaded {} nodes, {} edges", nodes, edges);
22//! ```
23
24use std::io::{BufRead, BufReader};
25use std::path::Path;
26
27use grafeo_common::types::NodeId;
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_common::utils::hash::FxHashMap;
30
31impl super::GrafeoDB {
32    /// Bulk-imports a graph from a TSV/space-separated edge list into the LPG store.
33    ///
34    /// Each line should contain two integer IDs separated by whitespace (tab or space):
35    /// `src_id dst_id` with an optional third column for edge weight.
36    /// Lines starting with `#` or `%` are treated as comments and skipped.
37    /// Empty lines are also skipped.
38    ///
39    /// Nodes are created on-demand as new external IDs are encountered.
40    /// All nodes get the label `"_Imported"` and all edges get the given `edge_type`.
41    ///
42    /// # Arguments
43    ///
44    /// * `path` - Path to the TSV file.
45    /// * `edge_type` - Edge type label for all imported edges.
46    /// * `directed` - If `true`, create one directed edge per line.
47    ///   If `false`, create edges in both directions.
48    ///
49    /// # Returns
50    ///
51    /// `(node_count, edge_count)` on success.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if the file cannot be opened or contains malformed lines.
56    pub fn import_tsv(
57        &self,
58        path: impl AsRef<Path>,
59        edge_type: &str,
60        directed: bool,
61    ) -> Result<(usize, usize)> {
62        let path = path.as_ref();
63        let file = std::fs::File::open(path)
64            .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
65
66        let reader = BufReader::new(file);
67        let edges = parse_edge_list(reader)?;
68
69        self.import_edge_list(&edges, edge_type, directed)
70    }
71
72    /// Bulk-imports from a string containing TSV edge list data.
73    ///
74    /// Same format as [`import_tsv`](Self::import_tsv) but reads from a string
75    /// instead of a file. Useful for tests and embedded data.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the data contains malformed lines.
80    pub fn import_tsv_str(
81        &self,
82        data: &str,
83        edge_type: &str,
84        directed: bool,
85    ) -> Result<(usize, usize)> {
86        let reader = BufReader::new(data.as_bytes());
87        let edges = parse_edge_list(reader)?;
88        self.import_edge_list(&edges, edge_type, directed)
89    }
90
91    /// Bulk-imports from a Matrix Market (MMIO) coordinate format file.
92    ///
93    /// Handles the standard MMIO header:
94    /// ```text
95    /// %%MatrixMarket matrix coordinate real general
96    /// % comment
97    /// rows cols nnz
98    /// row col [value]
99    /// ```
100    ///
101    /// Symmetric matrices automatically create edges in both directions.
102    ///
103    /// # Arguments
104    ///
105    /// * `path` - Path to the `.mtx` file.
106    /// * `edge_type` - Edge type label for all imported edges.
107    ///
108    /// # Returns
109    ///
110    /// `(node_count, edge_count)` on success.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the file cannot be opened or has an invalid MMIO header or data.
115    pub fn import_mmio(&self, path: impl AsRef<Path>, edge_type: &str) -> Result<(usize, usize)> {
116        let path = path.as_ref();
117        let file = std::fs::File::open(path)
118            .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
119
120        let reader = BufReader::new(file);
121        let (edges, symmetric) = parse_mmio(reader)?;
122        self.import_edge_list(&edges, edge_type, !symmetric)
123    }
124
125    /// Bulk-imports a pre-parsed edge list into the LPG store.
126    fn import_edge_list(
127        &self,
128        edges: &[(u64, u64)],
129        edge_type: &str,
130        directed: bool,
131    ) -> Result<(usize, usize)> {
132        let store = self.lpg_store();
133
134        // Phase 1: Collect unique external IDs and create nodes.
135        let mut ext_to_int: FxHashMap<u64, NodeId> = FxHashMap::default();
136
137        for &(src, dst) in edges {
138            if !ext_to_int.contains_key(&src) {
139                let id = store.create_node(&["_Imported"]);
140                ext_to_int.insert(src, id);
141            }
142            if !ext_to_int.contains_key(&dst) {
143                let id = store.create_node(&["_Imported"]);
144                ext_to_int.insert(dst, id);
145            }
146        }
147
148        // Phase 2: Create edges in batch.
149        let mut batch: Vec<(NodeId, NodeId, &str)> = Vec::with_capacity(if directed {
150            edges.len()
151        } else {
152            edges.len() * 2
153        });
154
155        for &(src, dst) in edges {
156            let src_id = ext_to_int[&src];
157            let dst_id = ext_to_int[&dst];
158            batch.push((src_id, dst_id, edge_type));
159            if !directed {
160                batch.push((dst_id, src_id, edge_type));
161            }
162        }
163
164        store.batch_create_edges(&batch);
165
166        let node_count = ext_to_int.len();
167        let edge_count = batch.len();
168
169        // Refresh statistics so the optimizer has fresh data.
170        store.ensure_statistics_fresh();
171
172        Ok((node_count, edge_count))
173    }
174
175    /// Bulk-imports a TSV edge list into the RDF store.
176    ///
177    /// Each edge `(src, dst)` becomes a triple:
178    /// `<{base_uri}{src}> <{predicate_uri}> <{base_uri}{dst}>`
179    ///
180    /// # Arguments
181    ///
182    /// * `path` - Path to the TSV file.
183    /// * `predicate_uri` - Full IRI for the edge predicate (e.g., `"http://example.org/connects"`).
184    /// * `base_uri` - Base IRI prefix for node identifiers (e.g., `"http://example.org/node/"`).
185    ///
186    /// # Returns
187    ///
188    /// `(node_count, edge_count)` on success.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if the file cannot be opened or contains malformed lines.
193    #[cfg(feature = "triple-store")]
194    pub fn import_tsv_rdf(
195        &self,
196        path: impl AsRef<Path>,
197        predicate_uri: &str,
198        base_uri: &str,
199    ) -> Result<(usize, usize)> {
200        use grafeo_core::graph::rdf::{Term, Triple};
201
202        let path = path.as_ref();
203        let file = std::fs::File::open(path)
204            .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
205
206        let reader = BufReader::new(file);
207        let edges = parse_edge_list(reader)?;
208
209        let predicate = Term::iri(predicate_uri);
210        let mut unique_nodes = grafeo_common::utils::hash::FxHashSet::default();
211
212        let triples: Vec<Triple> = edges
213            .iter()
214            .map(|&(src, dst)| {
215                unique_nodes.insert(src);
216                unique_nodes.insert(dst);
217                Triple::new(
218                    Term::iri(format!("{base_uri}{src}")),
219                    predicate.clone(),
220                    Term::iri(format!("{base_uri}{dst}")),
221                )
222            })
223            .collect();
224
225        let edge_count = self.rdf_store.batch_insert(triples);
226
227        Ok((unique_nodes.len(), edge_count))
228    }
229}
230
231/// Parses a TSV/space-separated edge list from a reader.
232///
233/// Each non-comment, non-empty line should contain at least two whitespace-separated
234/// integer IDs. Additional columns (e.g., weights) are ignored.
235fn parse_edge_list(reader: impl BufRead) -> Result<Vec<(u64, u64)>> {
236    let mut edges = Vec::new();
237
238    for (line_num, line) in reader.lines().enumerate() {
239        let line = line
240            .map_err(|e| Error::Internal(format!("read error at line {}: {}", line_num + 1, e)))?;
241        let trimmed = line.trim();
242
243        // Skip comments and empty lines.
244        if trimmed.is_empty() || trimmed.starts_with('#') || trimmed.starts_with('%') {
245            continue;
246        }
247
248        let mut parts = trimmed.split_whitespace();
249        let src_str = parts
250            .next()
251            .ok_or_else(|| Error::Internal(format!("line {}: missing source ID", line_num + 1)))?;
252        let dst_str = parts
253            .next()
254            .ok_or_else(|| Error::Internal(format!("line {}: missing target ID", line_num + 1)))?;
255
256        let src: u64 = src_str.parse().map_err(|_| {
257            Error::Internal(format!(
258                "line {}: invalid source ID '{}'",
259                line_num + 1,
260                src_str
261            ))
262        })?;
263        let dst: u64 = dst_str.parse().map_err(|_| {
264            Error::Internal(format!(
265                "line {}: invalid target ID '{}'",
266                line_num + 1,
267                dst_str
268            ))
269        })?;
270
271        edges.push((src, dst));
272    }
273
274    Ok(edges)
275}
276
277/// Parses a Matrix Market coordinate format file.
278///
279/// Returns the edge list and whether the matrix is symmetric.
280fn parse_mmio(reader: impl BufRead) -> Result<(Vec<(u64, u64)>, bool)> {
281    let mut lines = reader.lines();
282    let mut symmetric = false;
283
284    // Parse header line.
285    let header = lines
286        .next()
287        .ok_or_else(|| Error::Internal("empty MMIO file".into()))?
288        .map_err(|e| Error::Internal(format!("MMIO header read error: {e}")))?;
289
290    if !header.starts_with("%%MatrixMarket") {
291        return Err(Error::Internal(
292            "invalid MMIO file: missing %%MatrixMarket header".into(),
293        ));
294    }
295
296    let header_lower = header.to_lowercase();
297    if header_lower.contains("symmetric") {
298        symmetric = true;
299    }
300
301    // Skip comment lines, find the size line.
302    let mut size_line = String::new();
303    for line in &mut lines {
304        let line = line.map_err(|e| Error::Internal(format!("MMIO read error: {e}")))?;
305        let trimmed = line.trim();
306        if trimmed.starts_with('%') || trimmed.is_empty() {
307            continue;
308        }
309        size_line = trimmed.to_string();
310        break;
311    }
312
313    // Parse size line: rows cols nnz
314    let size_parts: Vec<&str> = size_line.split_whitespace().collect();
315    if size_parts.len() < 3 {
316        return Err(Error::Internal("invalid MMIO size line".into()));
317    }
318    let nnz: usize = size_parts[2]
319        .parse()
320        .map_err(|_| Error::Internal(format!("invalid nnz count: '{}'", size_parts[2])))?;
321
322    // Parse data lines.
323    let mut edges = Vec::with_capacity(nnz);
324    for line in lines {
325        let line = line.map_err(|e| Error::Internal(format!("MMIO read error: {e}")))?;
326        let trimmed = line.trim();
327        if trimmed.is_empty() {
328            continue;
329        }
330
331        let mut parts = trimmed.split_whitespace();
332        let row_str = parts.next().unwrap_or("");
333        let col_str = parts.next().unwrap_or("");
334
335        let row: u64 = row_str
336            .parse()
337            .map_err(|_| Error::Internal(format!("invalid MMIO row: '{row_str}'")))?;
338        let col: u64 = col_str
339            .parse()
340            .map_err(|_| Error::Internal(format!("invalid MMIO col: '{col_str}'")))?;
341
342        edges.push((row, col));
343    }
344
345    Ok((edges, symmetric))
346}
347
#[cfg(test)]
mod tests {
    use super::super::GrafeoDB;

    /// A directed import creates one edge per data line and updates the database counts.
    #[test]
    fn test_import_tsv_str_directed() {
        let db = GrafeoDB::new_in_memory();
        let counts = db
            .import_tsv_str("# comment\n1\t2\n2\t3\n3\t1\n", "CONNECTS", true)
            .unwrap();

        assert_eq!(counts, (3, 3));
        assert_eq!(db.node_count(), 3);
        assert_eq!(db.edge_count(), 3);
    }

    /// An undirected import creates each edge in both directions.
    #[test]
    fn test_import_tsv_str_undirected() {
        let db = GrafeoDB::new_in_memory();
        let counts = db.import_tsv_str("1 2\n2 3\n", "CONNECTS", false).unwrap();

        // 3 nodes; 2 edges * 2 directions.
        assert_eq!(counts, (3, 4));
    }

    /// A third column (weight) is accepted and ignored.
    #[test]
    fn test_import_tsv_str_with_weights() {
        let db = GrafeoDB::new_in_memory();
        let counts = db
            .import_tsv_str("1\t2\t0.5\n2\t3\t1.0\n", "E", true)
            .unwrap();

        assert_eq!(counts, (3, 2));
    }

    /// `#` comments, `%` comments and blank lines are all skipped.
    #[test]
    fn test_import_tsv_str_comments_and_blanks() {
        let db = GrafeoDB::new_in_memory();
        let input = "# header\n% also a comment\n\n1 2\n\n3 4\n";
        let counts = db.import_tsv_str(input, "E", true).unwrap();

        assert_eq!(counts, (4, 2));
    }

    /// Input that contains only comments imports nothing.
    #[test]
    fn test_import_tsv_str_empty() {
        let db = GrafeoDB::new_in_memory();
        let counts = db
            .import_tsv_str("# only comments\n% nothing here\n", "E", true)
            .unwrap();

        assert_eq!(counts, (0, 0));
    }

    /// An ID that appears in several edges still produces a single node.
    #[test]
    fn test_import_tsv_str_duplicate_nodes() {
        let db = GrafeoDB::new_in_memory();
        let counts = db.import_tsv_str("1 2\n1 3\n1 4\n", "E", true).unwrap();

        // Nodes 1, 2, 3, 4; three edges out of node 1.
        assert_eq!(counts, (4, 3));
    }

    /// A general (non-symmetric) MMIO file parses and imports as directed edges.
    #[test]
    fn test_import_mmio_str() {
        let input = "%%MatrixMarket matrix coordinate real general\n% comment\n3 3 3\n1 2 1.0\n2 3 1.0\n3 1 1.0\n";

        let (pairs, is_symmetric) = super::parse_mmio(input.as_bytes()).unwrap();
        assert!(!is_symmetric);
        assert_eq!(pairs.len(), 3);

        let db = GrafeoDB::new_in_memory();
        let counts = db.import_edge_list(&pairs, "E", true).unwrap();
        assert_eq!(counts, (3, 3));
    }

    /// A symmetric MMIO file is flagged as such and imports in both directions.
    #[test]
    fn test_import_mmio_symmetric() {
        let input = "%%MatrixMarket matrix coordinate real symmetric\n3 3 2\n1 2 1.0\n2 3 1.0\n";

        let (pairs, is_symmetric) = super::parse_mmio(input.as_bytes()).unwrap();
        assert!(is_symmetric);
        assert_eq!(pairs.len(), 2);

        // Symmetric = undirected = both directions: 2 edges * 2.
        let db = GrafeoDB::new_in_memory();
        let counts = db.import_edge_list(&pairs, "E", false).unwrap();
        assert_eq!(counts, (3, 4));
    }

    /// A TSV file imported as RDF is visible through the graph store adapter.
    #[cfg(feature = "triple-store")]
    #[test]
    fn test_import_tsv_rdf() {
        use grafeo_core::graph::GraphStore;
        use grafeo_core::graph::rdf::RdfGraphStoreAdapter;

        // Write the TSV to a temp file, since the RDF importer reads from a path.
        let tmp = tempfile::tempdir().unwrap();
        let tsv_path = tmp.path().join("test.tsv");
        std::fs::write(&tsv_path, "1\t2\n2\t3\n3\t1\n").unwrap();

        let db = GrafeoDB::new_in_memory();
        let counts = db
            .import_tsv_rdf(
                &tsv_path,
                "http://example.org/connects",
                "http://example.org/node/",
            )
            .unwrap();
        assert_eq!(counts, (3, 3));

        // Verify the adapter works on the imported RDF data.
        let adapter = RdfGraphStoreAdapter::new(&db.rdf_store);
        assert_eq!(adapter.node_count(), 3);
        assert_eq!(adapter.edge_count(), 3);
    }
}