Skip to main content

grafeo_engine/database/
import.rs

1//! Bulk graph import from TSV edge lists and Matrix Market files.
2//!
3//! These importers bypass per-edge transaction overhead by batching all
4//! operations into a single transaction. This is 10-100x faster than calling
5//! `create_node`/`create_edge` in a loop for large graphs.
6//!
7//! # Supported Formats
8//!
9//! | Format | Extension | Description |
10//! | ------ | --------- | ----------- |
11//! | TSV | `.tsv`, `.txt`, `.edges` | Tab or space-separated edge list |
12//! | MMIO | `.mtx` | Matrix Market coordinate format |
13//!
14//! # Example
15//!
16//! ```no_run
17//! use grafeo_engine::GrafeoDB;
18//!
19//! let db = GrafeoDB::new_in_memory();
20//! let (nodes, edges) = db.import_tsv("graph.tsv", "EDGE", true).unwrap();
21//! println!("Loaded {} nodes, {} edges", nodes, edges);
22//! ```
23
24use std::io::{BufRead, BufReader};
25use std::path::Path;
26
27use grafeo_common::types::NodeId;
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_common::utils::hash::FxHashMap;
30
31impl super::GrafeoDB {
32    /// Bulk-imports a graph from a TSV/space-separated edge list into the LPG store.
33    ///
34    /// Each line should contain two integer IDs separated by whitespace (tab or space):
35    /// `src_id dst_id` with an optional third column for edge weight.
36    /// Lines starting with `#` or `%` are treated as comments and skipped.
37    /// Empty lines are also skipped.
38    ///
39    /// Nodes are created on-demand as new external IDs are encountered.
40    /// All nodes get the label `"_Imported"` and all edges get the given `edge_type`.
41    ///
42    /// # Arguments
43    ///
44    /// * `path` - Path to the TSV file.
45    /// * `edge_type` - Edge type label for all imported edges.
46    /// * `directed` - If `true`, create one directed edge per line.
47    ///   If `false`, create edges in both directions.
48    ///
49    /// # Returns
50    ///
51    /// `(node_count, edge_count)` on success.
52    pub fn import_tsv(
53        &self,
54        path: impl AsRef<Path>,
55        edge_type: &str,
56        directed: bool,
57    ) -> Result<(usize, usize)> {
58        let path = path.as_ref();
59        let file = std::fs::File::open(path)
60            .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
61
62        let reader = BufReader::new(file);
63        let edges = parse_edge_list(reader)?;
64
65        self.import_edge_list(&edges, edge_type, directed)
66    }
67
68    /// Bulk-imports from a string containing TSV edge list data.
69    ///
70    /// Same format as [`import_tsv`](Self::import_tsv) but reads from a string
71    /// instead of a file. Useful for tests and embedded data.
72    pub fn import_tsv_str(
73        &self,
74        data: &str,
75        edge_type: &str,
76        directed: bool,
77    ) -> Result<(usize, usize)> {
78        let reader = BufReader::new(data.as_bytes());
79        let edges = parse_edge_list(reader)?;
80        self.import_edge_list(&edges, edge_type, directed)
81    }
82
83    /// Bulk-imports from a Matrix Market (MMIO) coordinate format file.
84    ///
85    /// Handles the standard MMIO header:
86    /// ```text
87    /// %%MatrixMarket matrix coordinate real general
88    /// % comment
89    /// rows cols nnz
90    /// row col [value]
91    /// ```
92    ///
93    /// Symmetric matrices automatically create edges in both directions.
94    ///
95    /// # Arguments
96    ///
97    /// * `path` - Path to the `.mtx` file.
98    /// * `edge_type` - Edge type label for all imported edges.
99    ///
100    /// # Returns
101    ///
102    /// `(node_count, edge_count)` on success.
103    pub fn import_mmio(&self, path: impl AsRef<Path>, edge_type: &str) -> Result<(usize, usize)> {
104        let path = path.as_ref();
105        let file = std::fs::File::open(path)
106            .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
107
108        let reader = BufReader::new(file);
109        let (edges, symmetric) = parse_mmio(reader)?;
110        self.import_edge_list(&edges, edge_type, !symmetric)
111    }
112
113    /// Bulk-imports a pre-parsed edge list into the LPG store.
114    fn import_edge_list(
115        &self,
116        edges: &[(u64, u64)],
117        edge_type: &str,
118        directed: bool,
119    ) -> Result<(usize, usize)> {
120        let store = self.lpg_store();
121
122        // Phase 1: Collect unique external IDs and create nodes.
123        let mut ext_to_int: FxHashMap<u64, NodeId> = FxHashMap::default();
124
125        for &(src, dst) in edges {
126            if !ext_to_int.contains_key(&src) {
127                let id = store.create_node(&["_Imported"]);
128                ext_to_int.insert(src, id);
129            }
130            if !ext_to_int.contains_key(&dst) {
131                let id = store.create_node(&["_Imported"]);
132                ext_to_int.insert(dst, id);
133            }
134        }
135
136        // Phase 2: Create edges in batch.
137        let mut batch: Vec<(NodeId, NodeId, &str)> = Vec::with_capacity(if directed {
138            edges.len()
139        } else {
140            edges.len() * 2
141        });
142
143        for &(src, dst) in edges {
144            let src_id = ext_to_int[&src];
145            let dst_id = ext_to_int[&dst];
146            batch.push((src_id, dst_id, edge_type));
147            if !directed {
148                batch.push((dst_id, src_id, edge_type));
149            }
150        }
151
152        store.batch_create_edges(&batch);
153
154        let node_count = ext_to_int.len();
155        let edge_count = batch.len();
156
157        // Refresh statistics so the optimizer has fresh data.
158        store.ensure_statistics_fresh();
159
160        Ok((node_count, edge_count))
161    }
162
163    /// Bulk-imports a TSV edge list into the RDF store.
164    ///
165    /// Each edge `(src, dst)` becomes a triple:
166    /// `<{base_uri}{src}> <{predicate_uri}> <{base_uri}{dst}>`
167    ///
168    /// # Arguments
169    ///
170    /// * `path` - Path to the TSV file.
171    /// * `predicate_uri` - Full IRI for the edge predicate (e.g., `"http://example.org/connects"`).
172    /// * `base_uri` - Base IRI prefix for node identifiers (e.g., `"http://example.org/node/"`).
173    ///
174    /// # Returns
175    ///
176    /// `(node_count, edge_count)` on success.
177    #[cfg(feature = "rdf")]
178    pub fn import_tsv_rdf(
179        &self,
180        path: impl AsRef<Path>,
181        predicate_uri: &str,
182        base_uri: &str,
183    ) -> Result<(usize, usize)> {
184        use grafeo_core::graph::rdf::{Term, Triple};
185
186        let path = path.as_ref();
187        let file = std::fs::File::open(path)
188            .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
189
190        let reader = BufReader::new(file);
191        let edges = parse_edge_list(reader)?;
192
193        let predicate = Term::iri(predicate_uri);
194        let mut unique_nodes = grafeo_common::utils::hash::FxHashSet::default();
195
196        let triples: Vec<Triple> = edges
197            .iter()
198            .map(|&(src, dst)| {
199                unique_nodes.insert(src);
200                unique_nodes.insert(dst);
201                Triple::new(
202                    Term::iri(format!("{base_uri}{src}")),
203                    predicate.clone(),
204                    Term::iri(format!("{base_uri}{dst}")),
205                )
206            })
207            .collect();
208
209        let edge_count = self.rdf_store.batch_insert(triples);
210
211        Ok((unique_nodes.len(), edge_count))
212    }
213}
214
215/// Parses a TSV/space-separated edge list from a reader.
216///
217/// Each non-comment, non-empty line should contain at least two whitespace-separated
218/// integer IDs. Additional columns (e.g., weights) are ignored.
219fn parse_edge_list(reader: impl BufRead) -> Result<Vec<(u64, u64)>> {
220    let mut edges = Vec::new();
221
222    for (line_num, line) in reader.lines().enumerate() {
223        let line = line
224            .map_err(|e| Error::Internal(format!("read error at line {}: {}", line_num + 1, e)))?;
225        let trimmed = line.trim();
226
227        // Skip comments and empty lines.
228        if trimmed.is_empty() || trimmed.starts_with('#') || trimmed.starts_with('%') {
229            continue;
230        }
231
232        let mut parts = trimmed.split_whitespace();
233        let src_str = parts
234            .next()
235            .ok_or_else(|| Error::Internal(format!("line {}: missing source ID", line_num + 1)))?;
236        let dst_str = parts
237            .next()
238            .ok_or_else(|| Error::Internal(format!("line {}: missing target ID", line_num + 1)))?;
239
240        let src: u64 = src_str.parse().map_err(|_| {
241            Error::Internal(format!(
242                "line {}: invalid source ID '{}'",
243                line_num + 1,
244                src_str
245            ))
246        })?;
247        let dst: u64 = dst_str.parse().map_err(|_| {
248            Error::Internal(format!(
249                "line {}: invalid target ID '{}'",
250                line_num + 1,
251                dst_str
252            ))
253        })?;
254
255        edges.push((src, dst));
256    }
257
258    Ok(edges)
259}
260
261/// Parses a Matrix Market coordinate format file.
262///
263/// Returns the edge list and whether the matrix is symmetric.
264fn parse_mmio(reader: impl BufRead) -> Result<(Vec<(u64, u64)>, bool)> {
265    let mut lines = reader.lines();
266    let mut symmetric = false;
267
268    // Parse header line.
269    let header = lines
270        .next()
271        .ok_or_else(|| Error::Internal("empty MMIO file".into()))?
272        .map_err(|e| Error::Internal(format!("MMIO header read error: {e}")))?;
273
274    if !header.starts_with("%%MatrixMarket") {
275        return Err(Error::Internal(
276            "invalid MMIO file: missing %%MatrixMarket header".into(),
277        ));
278    }
279
280    let header_lower = header.to_lowercase();
281    if header_lower.contains("symmetric") {
282        symmetric = true;
283    }
284
285    // Skip comment lines, find the size line.
286    let mut size_line = String::new();
287    for line in &mut lines {
288        let line = line.map_err(|e| Error::Internal(format!("MMIO read error: {e}")))?;
289        let trimmed = line.trim();
290        if trimmed.starts_with('%') || trimmed.is_empty() {
291            continue;
292        }
293        size_line = trimmed.to_string();
294        break;
295    }
296
297    // Parse size line: rows cols nnz
298    let size_parts: Vec<&str> = size_line.split_whitespace().collect();
299    if size_parts.len() < 3 {
300        return Err(Error::Internal("invalid MMIO size line".into()));
301    }
302    let nnz: usize = size_parts[2]
303        .parse()
304        .map_err(|_| Error::Internal(format!("invalid nnz count: '{}'", size_parts[2])))?;
305
306    // Parse data lines.
307    let mut edges = Vec::with_capacity(nnz);
308    for line in lines {
309        let line = line.map_err(|e| Error::Internal(format!("MMIO read error: {e}")))?;
310        let trimmed = line.trim();
311        if trimmed.is_empty() {
312            continue;
313        }
314
315        let mut parts = trimmed.split_whitespace();
316        let row_str = parts.next().unwrap_or("");
317        let col_str = parts.next().unwrap_or("");
318
319        let row: u64 = row_str
320            .parse()
321            .map_err(|_| Error::Internal(format!("invalid MMIO row: '{row_str}'")))?;
322        let col: u64 = col_str
323            .parse()
324            .map_err(|_| Error::Internal(format!("invalid MMIO col: '{col_str}'")))?;
325
326        edges.push((row, col));
327    }
328
329    Ok((edges, symmetric))
330}
331
#[cfg(test)]
mod tests {
    use super::super::GrafeoDB;

    /// Feeds an in-memory Matrix Market document through `parse_mmio`.
    fn parse_mmio_text(text: &str) -> (Vec<(u64, u64)>, bool) {
        super::parse_mmio(std::io::BufReader::new(text.as_bytes())).unwrap()
    }

    #[test]
    fn test_import_tsv_str_directed() {
        let db = GrafeoDB::new_in_memory();
        let input = "# comment\n1\t2\n2\t3\n3\t1\n";

        let (nodes, edges) = db.import_tsv_str(input, "CONNECTS", true).unwrap();

        // A directed triangle: three nodes, one edge per data line.
        assert_eq!(nodes, 3);
        assert_eq!(edges, 3);
        assert_eq!(db.node_count(), 3);
        assert_eq!(db.edge_count(), 3);
    }

    #[test]
    fn test_import_tsv_str_undirected() {
        let db = GrafeoDB::new_in_memory();

        let (nodes, edges) = db.import_tsv_str("1 2\n2 3\n", "CONNECTS", false).unwrap();

        assert_eq!(nodes, 3);
        // Each of the two input lines is materialized in both directions.
        assert_eq!(edges, 2 * 2);
    }

    #[test]
    fn test_import_tsv_str_with_weights() {
        let db = GrafeoDB::new_in_memory();

        // The trailing weight column must be ignored by the parser.
        let (nodes, edges) = db
            .import_tsv_str("1\t2\t0.5\n2\t3\t1.0\n", "E", true)
            .unwrap();

        assert_eq!(nodes, 3);
        assert_eq!(edges, 2);
    }

    #[test]
    fn test_import_tsv_str_comments_and_blanks() {
        let db = GrafeoDB::new_in_memory();
        let input = concat!("# header\n", "% also a comment\n", "\n", "1 2\n", "\n", "3 4\n");

        let (nodes, edges) = db.import_tsv_str(input, "E", true).unwrap();

        assert_eq!(nodes, 4);
        assert_eq!(edges, 2);
    }

    #[test]
    fn test_import_tsv_str_empty() {
        let db = GrafeoDB::new_in_memory();

        let (nodes, edges) = db
            .import_tsv_str("# only comments\n% nothing here\n", "E", true)
            .unwrap();

        assert_eq!(nodes, 0);
        assert_eq!(edges, 0);
    }

    #[test]
    fn test_import_tsv_str_duplicate_nodes() {
        let db = GrafeoDB::new_in_memory();

        // Node 1 is the source of every edge but must only be created once.
        let (nodes, edges) = db.import_tsv_str("1 2\n1 3\n1 4\n", "E", true).unwrap();

        assert_eq!(nodes, 4); // 1, 2, 3, 4
        assert_eq!(edges, 3);
    }

    #[test]
    fn test_import_mmio_str() {
        let input = concat!(
            "%%MatrixMarket matrix coordinate real general\n",
            "% comment\n",
            "3 3 3\n",
            "1 2 1.0\n",
            "2 3 1.0\n",
            "3 1 1.0\n",
        );
        let (entries, symmetric) = parse_mmio_text(input);

        assert!(!symmetric);
        assert_eq!(entries.len(), 3);

        let db = GrafeoDB::new_in_memory();
        let (nodes, edge_count) = db.import_edge_list(&entries, "E", true).unwrap();
        assert_eq!(nodes, 3);
        assert_eq!(edge_count, 3);
    }

    #[test]
    fn test_import_mmio_symmetric() {
        let input = concat!(
            "%%MatrixMarket matrix coordinate real symmetric\n",
            "3 3 2\n",
            "1 2 1.0\n",
            "2 3 1.0\n",
        );
        let (entries, symmetric) = parse_mmio_text(input);

        assert!(symmetric);
        assert_eq!(entries.len(), 2);

        // A symmetric matrix is imported as undirected, i.e. both directions.
        let db = GrafeoDB::new_in_memory();
        let (nodes, edge_count) = db.import_edge_list(&entries, "E", false).unwrap();
        assert_eq!(nodes, 3);
        assert_eq!(edge_count, 2 * 2);
    }

    #[cfg(feature = "rdf")]
    #[test]
    fn test_import_tsv_rdf() {
        use grafeo_core::graph::GraphStore;
        use grafeo_core::graph::rdf::RdfGraphStoreAdapter;

        let db = GrafeoDB::new_in_memory();

        // The RDF importer only reads from disk, so stage the TSV in a temp dir.
        let dir = tempfile::tempdir().unwrap();
        let tsv_path = dir.path().join("test.tsv");
        std::fs::write(&tsv_path, "1\t2\n2\t3\n3\t1\n").unwrap();

        let (nodes, edges) = db
            .import_tsv_rdf(
                &tsv_path,
                "http://example.org/connects",
                "http://example.org/node/",
            )
            .unwrap();

        assert_eq!(nodes, 3);
        assert_eq!(edges, 3);

        // The graph adapter must see the same shape over the imported triples.
        let adapter = RdfGraphStoreAdapter::new(&db.rdf_store);
        assert_eq!(adapter.node_count(), 3);
        assert_eq!(adapter.edge_count(), 3);
    }
}