1use std::io::{BufRead, BufReader};
25use std::path::Path;
26
27use grafeo_common::types::NodeId;
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_common::utils::hash::FxHashMap;
30
31impl super::GrafeoDB {
32 pub fn import_tsv(
57 &self,
58 path: impl AsRef<Path>,
59 edge_type: &str,
60 directed: bool,
61 ) -> Result<(usize, usize)> {
62 let path = path.as_ref();
63 let file = std::fs::File::open(path)
64 .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
65
66 let reader = BufReader::new(file);
67 let edges = parse_edge_list(reader)?;
68
69 self.import_edge_list(&edges, edge_type, directed)
70 }
71
72 pub fn import_tsv_str(
81 &self,
82 data: &str,
83 edge_type: &str,
84 directed: bool,
85 ) -> Result<(usize, usize)> {
86 let reader = BufReader::new(data.as_bytes());
87 let edges = parse_edge_list(reader)?;
88 self.import_edge_list(&edges, edge_type, directed)
89 }
90
91 pub fn import_mmio(&self, path: impl AsRef<Path>, edge_type: &str) -> Result<(usize, usize)> {
116 let path = path.as_ref();
117 let file = std::fs::File::open(path)
118 .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
119
120 let reader = BufReader::new(file);
121 let (edges, symmetric) = parse_mmio(reader)?;
122 self.import_edge_list(&edges, edge_type, !symmetric)
123 }
124
125 fn import_edge_list(
127 &self,
128 edges: &[(u64, u64)],
129 edge_type: &str,
130 directed: bool,
131 ) -> Result<(usize, usize)> {
132 let store = self.lpg_store();
133
134 let mut ext_to_int: FxHashMap<u64, NodeId> = FxHashMap::default();
136
137 for &(src, dst) in edges {
138 if !ext_to_int.contains_key(&src) {
139 let id = store.create_node(&["_Imported"]);
140 ext_to_int.insert(src, id);
141 }
142 if !ext_to_int.contains_key(&dst) {
143 let id = store.create_node(&["_Imported"]);
144 ext_to_int.insert(dst, id);
145 }
146 }
147
148 let mut batch: Vec<(NodeId, NodeId, &str)> = Vec::with_capacity(if directed {
150 edges.len()
151 } else {
152 edges.len() * 2
153 });
154
155 for &(src, dst) in edges {
156 let src_id = ext_to_int[&src];
157 let dst_id = ext_to_int[&dst];
158 batch.push((src_id, dst_id, edge_type));
159 if !directed {
160 batch.push((dst_id, src_id, edge_type));
161 }
162 }
163
164 store.batch_create_edges(&batch);
165
166 let node_count = ext_to_int.len();
167 let edge_count = batch.len();
168
169 store.ensure_statistics_fresh();
171
172 Ok((node_count, edge_count))
173 }
174
175 #[cfg(feature = "triple-store")]
194 pub fn import_tsv_rdf(
195 &self,
196 path: impl AsRef<Path>,
197 predicate_uri: &str,
198 base_uri: &str,
199 ) -> Result<(usize, usize)> {
200 use grafeo_core::graph::rdf::{Term, Triple};
201
202 let path = path.as_ref();
203 let file = std::fs::File::open(path)
204 .map_err(|e| Error::Internal(format!("failed to open {}: {}", path.display(), e)))?;
205
206 let reader = BufReader::new(file);
207 let edges = parse_edge_list(reader)?;
208
209 let predicate = Term::iri(predicate_uri);
210 let mut unique_nodes = grafeo_common::utils::hash::FxHashSet::default();
211
212 let triples: Vec<Triple> = edges
213 .iter()
214 .map(|&(src, dst)| {
215 unique_nodes.insert(src);
216 unique_nodes.insert(dst);
217 Triple::new(
218 Term::iri(format!("{base_uri}{src}")),
219 predicate.clone(),
220 Term::iri(format!("{base_uri}{dst}")),
221 )
222 })
223 .collect();
224
225 let edge_count = self.rdf_store.batch_insert(triples);
226
227 Ok((unique_nodes.len(), edge_count))
228 }
229}
230
231fn parse_edge_list(reader: impl BufRead) -> Result<Vec<(u64, u64)>> {
236 let mut edges = Vec::new();
237
238 for (line_num, line) in reader.lines().enumerate() {
239 let line = line
240 .map_err(|e| Error::Internal(format!("read error at line {}: {}", line_num + 1, e)))?;
241 let trimmed = line.trim();
242
243 if trimmed.is_empty() || trimmed.starts_with('#') || trimmed.starts_with('%') {
245 continue;
246 }
247
248 let mut parts = trimmed.split_whitespace();
249 let src_str = parts
250 .next()
251 .ok_or_else(|| Error::Internal(format!("line {}: missing source ID", line_num + 1)))?;
252 let dst_str = parts
253 .next()
254 .ok_or_else(|| Error::Internal(format!("line {}: missing target ID", line_num + 1)))?;
255
256 let src: u64 = src_str.parse().map_err(|_| {
257 Error::Internal(format!(
258 "line {}: invalid source ID '{}'",
259 line_num + 1,
260 src_str
261 ))
262 })?;
263 let dst: u64 = dst_str.parse().map_err(|_| {
264 Error::Internal(format!(
265 "line {}: invalid target ID '{}'",
266 line_num + 1,
267 dst_str
268 ))
269 })?;
270
271 edges.push((src, dst));
272 }
273
274 Ok(edges)
275}
276
277fn parse_mmio(reader: impl BufRead) -> Result<(Vec<(u64, u64)>, bool)> {
281 let mut lines = reader.lines();
282 let mut symmetric = false;
283
284 let header = lines
286 .next()
287 .ok_or_else(|| Error::Internal("empty MMIO file".into()))?
288 .map_err(|e| Error::Internal(format!("MMIO header read error: {e}")))?;
289
290 if !header.starts_with("%%MatrixMarket") {
291 return Err(Error::Internal(
292 "invalid MMIO file: missing %%MatrixMarket header".into(),
293 ));
294 }
295
296 let header_lower = header.to_lowercase();
297 if header_lower.contains("symmetric") {
298 symmetric = true;
299 }
300
301 let mut size_line = String::new();
303 for line in &mut lines {
304 let line = line.map_err(|e| Error::Internal(format!("MMIO read error: {e}")))?;
305 let trimmed = line.trim();
306 if trimmed.starts_with('%') || trimmed.is_empty() {
307 continue;
308 }
309 size_line = trimmed.to_string();
310 break;
311 }
312
313 let size_parts: Vec<&str> = size_line.split_whitespace().collect();
315 if size_parts.len() < 3 {
316 return Err(Error::Internal("invalid MMIO size line".into()));
317 }
318 let nnz: usize = size_parts[2]
319 .parse()
320 .map_err(|_| Error::Internal(format!("invalid nnz count: '{}'", size_parts[2])))?;
321
322 let mut edges = Vec::with_capacity(nnz);
324 for line in lines {
325 let line = line.map_err(|e| Error::Internal(format!("MMIO read error: {e}")))?;
326 let trimmed = line.trim();
327 if trimmed.is_empty() {
328 continue;
329 }
330
331 let mut parts = trimmed.split_whitespace();
332 let row_str = parts.next().unwrap_or("");
333 let col_str = parts.next().unwrap_or("");
334
335 let row: u64 = row_str
336 .parse()
337 .map_err(|_| Error::Internal(format!("invalid MMIO row: '{row_str}'")))?;
338 let col: u64 = col_str
339 .parse()
340 .map_err(|_| Error::Internal(format!("invalid MMIO col: '{col_str}'")))?;
341
342 edges.push((row, col));
343 }
344
345 Ok((edges, symmetric))
346}
347
#[cfg(test)]
mod tests {
    // Unit tests for the edge-list (TSV) and Matrix Market importers. Each test
    // uses a fresh in-memory database so counts start from zero.
    use super::super::GrafeoDB;

    /// Directed import: the `#` line is skipped, three distinct IDs become three
    /// nodes, and each data line becomes exactly one edge.
    #[test]
    fn test_import_tsv_str_directed() {
        let db = GrafeoDB::new_in_memory();
        let data = "# comment\n1\t2\n2\t3\n3\t1\n";
        let (nodes, edges) = db.import_tsv_str(data, "CONNECTS", true).unwrap();

        assert_eq!(nodes, 3);
        assert_eq!(edges, 3);
        // The returned counts must agree with what the database itself reports.
        assert_eq!(db.node_count(), 3);
        assert_eq!(db.edge_count(), 3);
    }

    /// Undirected import: each line yields a forward and a reverse edge.
    #[test]
    fn test_import_tsv_str_undirected() {
        let db = GrafeoDB::new_in_memory();
        let data = "1 2\n2 3\n";
        let (nodes, edges) = db.import_tsv_str(data, "CONNECTS", false).unwrap();

        assert_eq!(nodes, 3);
        // 2 lines x (forward + reverse).
        assert_eq!(edges, 4);
    }

    /// A third (weight) column is accepted and ignored.
    #[test]
    fn test_import_tsv_str_with_weights() {
        let db = GrafeoDB::new_in_memory();
        let data = "1\t2\t0.5\n2\t3\t1.0\n";
        let (nodes, edges) = db.import_tsv_str(data, "E", true).unwrap();

        assert_eq!(nodes, 3);
        assert_eq!(edges, 2);
    }

    /// Both `#` and `%` comment lines, and blank lines, are skipped.
    #[test]
    fn test_import_tsv_str_comments_and_blanks() {
        let db = GrafeoDB::new_in_memory();
        let data = "# header\n% also a comment\n\n1 2\n\n3 4\n";
        let (nodes, edges) = db.import_tsv_str(data, "E", true).unwrap();

        assert_eq!(nodes, 4);
        assert_eq!(edges, 2);
    }

    /// Input with no data lines imports nothing and is not an error.
    #[test]
    fn test_import_tsv_str_empty() {
        let db = GrafeoDB::new_in_memory();
        let data = "# only comments\n% nothing here\n";
        let (nodes, edges) = db.import_tsv_str(data, "E", true).unwrap();

        assert_eq!(nodes, 0);
        assert_eq!(edges, 0);
    }

    /// An ID that appears on several lines (here `1`) maps to a single node.
    #[test]
    fn test_import_tsv_str_duplicate_nodes() {
        let db = GrafeoDB::new_in_memory();
        let data = "1 2\n1 3\n1 4\n";
        let (nodes, edges) = db.import_tsv_str(data, "E", true).unwrap();

        assert_eq!(nodes, 4);
        assert_eq!(edges, 3);
    }

    /// General (non-symmetric) MMIO: parses from an in-memory buffer, then feeds
    /// the pairs through the shared `import_edge_list` path as a directed graph.
    #[test]
    fn test_import_mmio_str() {
        let db = GrafeoDB::new_in_memory();
        let data = "%%MatrixMarket matrix coordinate real general\n% comment\n3 3 3\n1 2 1.0\n2 3 1.0\n3 1 1.0\n";

        let reader = std::io::BufReader::new(data.as_bytes());
        let (edges, symmetric) = super::parse_mmio(reader).unwrap();

        assert!(!symmetric);
        assert_eq!(edges.len(), 3);

        let result = db.import_edge_list(&edges, "E", true);
        assert!(result.is_ok());
        let (nodes, edge_count) = result.unwrap();
        assert_eq!(nodes, 3);
        assert_eq!(edge_count, 3);
    }

    /// Symmetric MMIO: the parser flags symmetry, and an undirected import
    /// doubles the two stored entries into four edges.
    #[test]
    fn test_import_mmio_symmetric() {
        let data = "%%MatrixMarket matrix coordinate real symmetric\n3 3 2\n1 2 1.0\n2 3 1.0\n";

        let reader = std::io::BufReader::new(data.as_bytes());
        let (edges, symmetric) = super::parse_mmio(reader).unwrap();

        assert!(symmetric);
        assert_eq!(edges.len(), 2);

        let db = GrafeoDB::new_in_memory();
        let (nodes, edge_count) = db.import_edge_list(&edges, "E", false).unwrap();
        assert_eq!(nodes, 3);
        assert_eq!(edge_count, 4);
    }

    /// RDF import from a real file: three edges over three IDs, checked both via
    /// the returned counts and via an adapter over the RDF store.
    #[cfg(feature = "triple-store")]
    #[test]
    fn test_import_tsv_rdf() {
        // NOTE(review): `GraphStore` is presumably imported for the adapter's
        // `node_count`/`edge_count` trait methods — confirm.
        use grafeo_core::graph::GraphStore;
        use grafeo_core::graph::rdf::RdfGraphStoreAdapter;

        let db = GrafeoDB::new_in_memory();

        // `import_tsv_rdf` only accepts a path, so write the fixture to a temp dir.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.tsv");
        std::fs::write(&path, "1\t2\n2\t3\n3\t1\n").unwrap();

        let (nodes, edges) = db
            .import_tsv_rdf(
                &path,
                "http://example.org/connects",
                "http://example.org/node/",
            )
            .unwrap();

        assert_eq!(nodes, 3);
        assert_eq!(edges, 3);

        // Cross-check the stored triples through the graph-store view of the RDF data.
        let adapter = RdfGraphStoreAdapter::new(&db.rdf_store);
        assert_eq!(adapter.node_count(), 3);
        assert_eq!(adapter.edge_count(), 3);
    }
}