1use std::io::{BufRead, BufReader};
25use std::path::Path;
26
27use grafeo_common::types::NodeId;
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_common::utils::hash::FxHashMap;
30
31impl super::GrafeoDB {
32 pub fn import_tsv(
53 &self,
54 path: impl AsRef<Path>,
55 edge_type: &str,
56 directed: bool,
57 ) -> Result<(usize, usize)> {
58 let path = path.as_ref();
59 let file = std::fs::File::open(path)
60 .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
61
62 let reader = BufReader::new(file);
63 let edges = parse_edge_list(reader)?;
64
65 self.import_edge_list(&edges, edge_type, directed)
66 }
67
68 pub fn import_tsv_str(
73 &self,
74 data: &str,
75 edge_type: &str,
76 directed: bool,
77 ) -> Result<(usize, usize)> {
78 let reader = BufReader::new(data.as_bytes());
79 let edges = parse_edge_list(reader)?;
80 self.import_edge_list(&edges, edge_type, directed)
81 }
82
83 pub fn import_mmio(&self, path: impl AsRef<Path>, edge_type: &str) -> Result<(usize, usize)> {
104 let path = path.as_ref();
105 let file = std::fs::File::open(path)
106 .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
107
108 let reader = BufReader::new(file);
109 let (edges, symmetric) = parse_mmio(reader)?;
110 self.import_edge_list(&edges, edge_type, !symmetric)
111 }
112
113 fn import_edge_list(
115 &self,
116 edges: &[(u64, u64)],
117 edge_type: &str,
118 directed: bool,
119 ) -> Result<(usize, usize)> {
120 let store = self.lpg_store();
121
122 let mut ext_to_int: FxHashMap<u64, NodeId> = FxHashMap::default();
124
125 for &(src, dst) in edges {
126 if !ext_to_int.contains_key(&src) {
127 let id = store.create_node(&["_Imported"]);
128 ext_to_int.insert(src, id);
129 }
130 if !ext_to_int.contains_key(&dst) {
131 let id = store.create_node(&["_Imported"]);
132 ext_to_int.insert(dst, id);
133 }
134 }
135
136 let mut batch: Vec<(NodeId, NodeId, &str)> = Vec::with_capacity(if directed {
138 edges.len()
139 } else {
140 edges.len() * 2
141 });
142
143 for &(src, dst) in edges {
144 let src_id = ext_to_int[&src];
145 let dst_id = ext_to_int[&dst];
146 batch.push((src_id, dst_id, edge_type));
147 if !directed {
148 batch.push((dst_id, src_id, edge_type));
149 }
150 }
151
152 store.batch_create_edges(&batch);
153
154 let node_count = ext_to_int.len();
155 let edge_count = batch.len();
156
157 store.ensure_statistics_fresh();
159
160 Ok((node_count, edge_count))
161 }
162
163 #[cfg(feature = "rdf")]
178 pub fn import_tsv_rdf(
179 &self,
180 path: impl AsRef<Path>,
181 predicate_uri: &str,
182 base_uri: &str,
183 ) -> Result<(usize, usize)> {
184 use grafeo_core::graph::rdf::{Term, Triple};
185
186 let path = path.as_ref();
187 let file = std::fs::File::open(path)
188 .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
189
190 let reader = BufReader::new(file);
191 let edges = parse_edge_list(reader)?;
192
193 let predicate = Term::iri(predicate_uri);
194 let mut unique_nodes = grafeo_common::utils::hash::FxHashSet::default();
195
196 let triples: Vec<Triple> = edges
197 .iter()
198 .map(|&(src, dst)| {
199 unique_nodes.insert(src);
200 unique_nodes.insert(dst);
201 Triple::new(
202 Term::iri(format!("{base_uri}{src}")),
203 predicate.clone(),
204 Term::iri(format!("{base_uri}{dst}")),
205 )
206 })
207 .collect();
208
209 let edge_count = self.rdf_store.batch_insert(triples);
210
211 Ok((unique_nodes.len(), edge_count))
212 }
213}
214
215fn parse_edge_list(reader: impl BufRead) -> Result<Vec<(u64, u64)>> {
220 let mut edges = Vec::new();
221
222 for (line_num, line) in reader.lines().enumerate() {
223 let line = line
224 .map_err(|e| Error::Internal(format!("read error at line {}: {}", line_num + 1, e)))?;
225 let trimmed = line.trim();
226
227 if trimmed.is_empty() || trimmed.starts_with('#') || trimmed.starts_with('%') {
229 continue;
230 }
231
232 let mut parts = trimmed.split_whitespace();
233 let src_str = parts
234 .next()
235 .ok_or_else(|| Error::Internal(format!("line {}: missing source ID", line_num + 1)))?;
236 let dst_str = parts
237 .next()
238 .ok_or_else(|| Error::Internal(format!("line {}: missing target ID", line_num + 1)))?;
239
240 let src: u64 = src_str.parse().map_err(|_| {
241 Error::Internal(format!(
242 "line {}: invalid source ID '{}'",
243 line_num + 1,
244 src_str
245 ))
246 })?;
247 let dst: u64 = dst_str.parse().map_err(|_| {
248 Error::Internal(format!(
249 "line {}: invalid target ID '{}'",
250 line_num + 1,
251 dst_str
252 ))
253 })?;
254
255 edges.push((src, dst));
256 }
257
258 Ok(edges)
259}
260
261fn parse_mmio(reader: impl BufRead) -> Result<(Vec<(u64, u64)>, bool)> {
265 let mut lines = reader.lines();
266 let mut symmetric = false;
267
268 let header = lines
270 .next()
271 .ok_or_else(|| Error::Internal("empty MMIO file".into()))?
272 .map_err(|e| Error::Internal(format!("MMIO header read error: {e}")))?;
273
274 if !header.starts_with("%%MatrixMarket") {
275 return Err(Error::Internal(
276 "invalid MMIO file: missing %%MatrixMarket header".into(),
277 ));
278 }
279
280 let header_lower = header.to_lowercase();
281 if header_lower.contains("symmetric") {
282 symmetric = true;
283 }
284
285 let mut size_line = String::new();
287 for line in &mut lines {
288 let line = line.map_err(|e| Error::Internal(format!("MMIO read error: {e}")))?;
289 let trimmed = line.trim();
290 if trimmed.starts_with('%') || trimmed.is_empty() {
291 continue;
292 }
293 size_line = trimmed.to_string();
294 break;
295 }
296
297 let size_parts: Vec<&str> = size_line.split_whitespace().collect();
299 if size_parts.len() < 3 {
300 return Err(Error::Internal("invalid MMIO size line".into()));
301 }
302 let nnz: usize = size_parts[2]
303 .parse()
304 .map_err(|_| Error::Internal(format!("invalid nnz count: '{}'", size_parts[2])))?;
305
306 let mut edges = Vec::with_capacity(nnz);
308 for line in lines {
309 let line = line.map_err(|e| Error::Internal(format!("MMIO read error: {e}")))?;
310 let trimmed = line.trim();
311 if trimmed.is_empty() {
312 continue;
313 }
314
315 let mut parts = trimmed.split_whitespace();
316 let row_str = parts.next().unwrap_or("");
317 let col_str = parts.next().unwrap_or("");
318
319 let row: u64 = row_str
320 .parse()
321 .map_err(|_| Error::Internal(format!("invalid MMIO row: '{row_str}'")))?;
322 let col: u64 = col_str
323 .parse()
324 .map_err(|_| Error::Internal(format!("invalid MMIO col: '{col_str}'")))?;
325
326 edges.push((row, col));
327 }
328
329 Ok((edges, symmetric))
330}
331
#[cfg(test)]
mod tests {
    use super::super::GrafeoDB;

    /// Parses `text` as an MMIO document, panicking if it is malformed.
    fn parse_mmio_text(text: &str) -> (Vec<(u64, u64)>, bool) {
        super::parse_mmio(std::io::BufReader::new(text.as_bytes())).unwrap()
    }

    /// A directed import creates one edge per line and one node per distinct ID.
    #[test]
    fn test_import_tsv_str_directed() {
        let db = GrafeoDB::new_in_memory();
        let input = "# comment\n1\t2\n2\t3\n3\t1\n";

        let counts = db.import_tsv_str(input, "CONNECTS", true).unwrap();

        assert_eq!(counts, (3, 3));
        assert_eq!(db.node_count(), 3);
        assert_eq!(db.edge_count(), 3);
    }

    /// An undirected import mirrors every edge, doubling the edge count.
    #[test]
    fn test_import_tsv_str_undirected() {
        let db = GrafeoDB::new_in_memory();
        let input = "1 2\n2 3\n";

        let counts = db.import_tsv_str(input, "CONNECTS", false).unwrap();

        // 3 distinct nodes; 2 input edges x 2 directions.
        assert_eq!(counts, (3, 4));
    }

    /// A third (weight) column is accepted and ignored.
    #[test]
    fn test_import_tsv_str_with_weights() {
        let db = GrafeoDB::new_in_memory();
        let input = "1\t2\t0.5\n2\t3\t1.0\n";

        let counts = db.import_tsv_str(input, "E", true).unwrap();

        assert_eq!(counts, (3, 2));
    }

    /// `#` comments, `%` comments and blank lines are all skipped.
    #[test]
    fn test_import_tsv_str_comments_and_blanks() {
        let db = GrafeoDB::new_in_memory();
        let input = "# header\n% also a comment\n\n1 2\n\n3 4\n";

        let counts = db.import_tsv_str(input, "E", true).unwrap();

        assert_eq!(counts, (4, 2));
    }

    /// Input made only of comments imports nothing and still succeeds.
    #[test]
    fn test_import_tsv_str_empty() {
        let db = GrafeoDB::new_in_memory();
        let input = "# only comments\n% nothing here\n";

        let counts = db.import_tsv_str(input, "E", true).unwrap();

        assert_eq!(counts, (0, 0));
    }

    /// A node ID that appears on several lines is created only once.
    #[test]
    fn test_import_tsv_str_duplicate_nodes() {
        let db = GrafeoDB::new_in_memory();
        let input = "1 2\n1 3\n1 4\n";

        let counts = db.import_tsv_str(input, "E", true).unwrap();

        // Nodes 1, 2, 3, 4; three edges.
        assert_eq!(counts, (4, 3));
    }

    /// A `general` MMIO matrix parses as non-symmetric and imports as directed.
    #[test]
    fn test_import_mmio_str() {
        let db = GrafeoDB::new_in_memory();
        let text = "%%MatrixMarket matrix coordinate real general\n% comment\n3 3 3\n1 2 1.0\n2 3 1.0\n3 1 1.0\n";

        let (pairs, is_symmetric) = parse_mmio_text(text);

        assert!(!is_symmetric);
        assert_eq!(pairs.len(), 3);

        let outcome = db.import_edge_list(&pairs, "E", true);
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), (3, 3));
    }

    /// A `symmetric` MMIO matrix is flagged so the import can mirror its edges.
    #[test]
    fn test_import_mmio_symmetric() {
        let text = "%%MatrixMarket matrix coordinate real symmetric\n3 3 2\n1 2 1.0\n2 3 1.0\n";

        let (pairs, is_symmetric) = parse_mmio_text(text);

        assert!(is_symmetric);
        assert_eq!(pairs.len(), 2);

        let db = GrafeoDB::new_in_memory();
        let counts = db.import_edge_list(&pairs, "E", false).unwrap();

        // 3 distinct nodes; 2 stored entries x 2 directions.
        assert_eq!(counts, (3, 4));
    }

    /// An RDF import turns each edge into one triple in the RDF store.
    #[cfg(feature = "rdf")]
    #[test]
    fn test_import_tsv_rdf() {
        use grafeo_core::graph::GraphStore;
        use grafeo_core::graph::rdf::RdfGraphStoreAdapter;

        let db = GrafeoDB::new_in_memory();

        // The RDF importer only accepts a path, so stage the data on disk.
        let scratch = tempfile::tempdir().unwrap();
        let tsv_path = scratch.path().join("test.tsv");
        std::fs::write(&tsv_path, "1\t2\n2\t3\n3\t1\n").unwrap();

        let counts = db
            .import_tsv_rdf(
                &tsv_path,
                "http://example.org/connects",
                "http://example.org/node/",
            )
            .unwrap();

        assert_eq!(counts, (3, 3));

        let adapter = RdfGraphStoreAdapter::new(&db.rdf_store);
        assert_eq!(adapter.node_count(), 3);
        assert_eq!(adapter.edge_count(), 3);
    }
}