Skip to main content

graphrag_core/pipeline/
data_import.rs

1//! Data Import Pipeline
2//!
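//! This module imports entities and relationships from several formats:
//! - CSV/TSV files (columns located by header name via `ColumnMappings`)
//! - JSON (one document with optional `entities` and `relationships` arrays)
//!   and JSONL (one tagged entity or relationship object per line)
//! - RDF/Turtle (semantic web formats) — not implemented yet
//! - GraphML (graph exchange format) — not implemented yet
//! - Streaming ingestion of entity batches from any `StreamingSource`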
9//!
10//! ## Architecture
11//!
12//! ```text
13//! Data Source → Parser → Validator → Transformer → Graph Builder
14//!      │           │          │           │              │
15//!      ▼           ▼          ▼           ▼              ▼
16//!   CSV/JSON   Schema    Required    Normalize      KnowledgeGraph
17//!   Files      Check     Fields      Format
18//! ```
19
20use serde::{Deserialize, Serialize};
21use std::path::Path;
22use std::fs::File;
23use std::io::{BufRead, BufReader};
24use csv::ReaderBuilder;
25
26/// Supported data formats
27#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
28#[allow(clippy::upper_case_acronyms)]
29pub enum DataFormat {
30    /// CSV (comma-separated values)
31    CSV,
32    /// TSV (tab-separated values)
33    TSV,
34    /// JSON (JavaScript Object Notation)
35    JSON,
36    /// JSONL (newline-delimited JSON)
37    JSONL,
38    /// RDF/Turtle (Resource Description Framework)
39    RDF,
40    /// GraphML (graph markup language)
41    GraphML,
42}
43
44/// Import configuration
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ImportConfig {
47    /// Data format
48    pub format: DataFormat,
49    /// Skip validation
50    pub skip_validation: bool,
51    /// Batch size for processing
52    pub batch_size: usize,
53    /// Maximum errors before aborting
54    pub max_errors: usize,
55    /// Column mappings (for CSV/TSV)
56    pub column_mappings: Option<ColumnMappings>,
57}
58
59impl Default for ImportConfig {
60    fn default() -> Self {
61        Self {
62            format: DataFormat::JSON,
63            skip_validation: false,
64            batch_size: 1000,
65            max_errors: 10,
66            column_mappings: None,
67        }
68    }
69}
70
71/// Column mappings for CSV/TSV
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct ColumnMappings {
74    /// Entity ID column
75    pub entity_id: String,
76    /// Entity name column
77    pub entity_name: String,
78    /// Entity type column
79    pub entity_type: String,
80    /// Optional source column for relationships
81    pub relationship_source: Option<String>,
82    /// Optional target column for relationships
83    pub relationship_target: Option<String>,
84    /// Optional relationship type column
85    pub relationship_type: Option<String>,
86}
87
88/// Imported entity
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct ImportedEntity {
91    /// Unique identifier for the entity
92    pub id: String,
93    /// Display name of the entity
94    pub name: String,
95    /// Type/category of the entity
96    pub entity_type: String,
97    /// Additional attributes as key-value pairs
98    pub attributes: std::collections::HashMap<String, String>,
99}
100
101/// Imported relationship
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct ImportedRelationship {
104    /// Source entity identifier
105    pub source: String,
106    /// Target entity identifier
107    pub target: String,
108    /// Type of the relationship
109    pub relation_type: String,
110    /// Additional attributes as key-value pairs
111    pub attributes: std::collections::HashMap<String, String>,
112}
113
114/// Import result
115#[derive(Debug, Clone)]
116pub struct ImportResult {
117    /// Number of entities imported
118    pub entities_imported: usize,
119    /// Number of relationships imported
120    pub relationships_imported: usize,
121    /// Number of errors
122    pub errors: Vec<ImportError>,
123    /// Processing time (milliseconds)
124    pub processing_time_ms: u64,
125}
126
/// Errors produced while importing data.
///
/// Fatal problems are returned as `Err` from the import functions; per-record
/// problems are collected in [`ImportResult::errors`] instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ImportError {
    /// The input path does not exist (holds the path as displayed).
    FileNotFound(String),
    /// The input could not be read or parsed: `(message, line)`.
    ///
    /// `line` is 1-based; `0` means the error is not tied to a particular line
    /// (e.g. the file could not be opened or the whole document is malformed).
    ParseError(String, usize),
    /// Invalid configuration or data (e.g. a CSV import without column mappings).
    ValidationError(String),
    /// A required field or column is missing or empty (holds its name).
    MissingField(String),
    /// The input is not in the expected format.
    InvalidFormat(String),
}

impl std::fmt::Display for ImportError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ImportError::FileNotFound(path) => write!(f, "File not found: {}", path),
            // Line 0 is the "no specific line" marker; printing "at line 0"
            // would point the reader at a line that does not exist.
            ImportError::ParseError(msg, 0) => write!(f, "Parse error: {}", msg),
            ImportError::ParseError(msg, line) => {
                write!(f, "Parse error at line {}: {}", line, msg)
            }
            ImportError::ValidationError(msg) => write!(f, "Validation error: {}", msg),
            ImportError::MissingField(field) => write!(f, "Missing required field: {}", field),
            ImportError::InvalidFormat(msg) => write!(f, "Invalid format: {}", msg),
        }
    }
}

impl std::error::Error for ImportError {}
155
156/// Data importer
157pub struct DataImporter {
158    config: ImportConfig,
159}
160
161impl DataImporter {
162    /// Create new importer
163    pub fn new(config: ImportConfig) -> Self {
164        Self { config }
165    }
166
167    /// Import from file
168    pub fn import_file(&self, path: impl AsRef<Path>) -> Result<ImportResult, ImportError> {
169        let path = path.as_ref();
170
171        if !path.exists() {
172            return Err(ImportError::FileNotFound(path.display().to_string()));
173        }
174
175        let start_time = std::time::Instant::now();
176
177        let result = match self.config.format {
178            DataFormat::CSV => self.import_csv(path)?,
179            DataFormat::TSV => self.import_tsv(path)?,
180            DataFormat::JSON => self.import_json(path)?,
181            DataFormat::JSONL => self.import_jsonl(path)?,
182            DataFormat::RDF => self.import_rdf(path)?,
183            DataFormat::GraphML => self.import_graphml(path)?,
184        };
185
186        let processing_time_ms = start_time.elapsed().as_millis() as u64;
187
188        Ok(ImportResult {
189            entities_imported: result.entities_imported,
190            relationships_imported: result.relationships_imported,
191            errors: result.errors,
192            processing_time_ms,
193        })
194    }
195
196    /// Import CSV file
197    fn import_csv(&self, path: &Path) -> Result<ImportResult, ImportError> {
198        self.import_csv_with_delimiter(path, b',')
199    }
200
201    /// Import CSV/TSV with custom delimiter
202    fn import_csv_with_delimiter(&self, path: &Path, delimiter: u8) -> Result<ImportResult, ImportError> {
203        let mut entities = Vec::new();
204        let mut relationships = Vec::new();
205        let mut errors = Vec::new();
206
207        let file = File::open(path)
208            .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;
209
210        let mut reader = ReaderBuilder::new()
211            .delimiter(delimiter)
212            .has_headers(true)
213            .from_reader(file);
214
215        // Get headers
216        let headers = reader.headers()
217            .map_err(|e| ImportError::ParseError(format!("Failed to read headers: {}", e), 0))?
218            .clone();
219
220        let mappings = self.config.column_mappings.as_ref()
221            .ok_or_else(|| ImportError::ValidationError("Column mappings required for CSV import".to_string()))?;
222
223        // Find column indices
224        let entity_id_idx = headers.iter().position(|h| h == mappings.entity_id)
225            .ok_or_else(|| ImportError::MissingField(mappings.entity_id.clone()))?;
226        let entity_name_idx = headers.iter().position(|h| h == mappings.entity_name)
227            .ok_or_else(|| ImportError::MissingField(mappings.entity_name.clone()))?;
228        let entity_type_idx = headers.iter().position(|h| h == mappings.entity_type)
229            .ok_or_else(|| ImportError::MissingField(mappings.entity_type.clone()))?;
230
231        // Optional relationship columns
232        let rel_source_idx = mappings.relationship_source.as_ref()
233            .and_then(|col| headers.iter().position(|h| h == col));
234        let rel_target_idx = mappings.relationship_target.as_ref()
235            .and_then(|col| headers.iter().position(|h| h == col));
236        let rel_type_idx = mappings.relationship_type.as_ref()
237            .and_then(|col| headers.iter().position(|h| h == col));
238
239        // Process records
240        for (line_num, result) in reader.records().enumerate() {
241            let record = match result {
242                Ok(r) => r,
243                Err(e) => {
244                    errors.push(ImportError::ParseError(
245                        format!("CSV parse error: {}", e),
246                        line_num + 2, // +2 for header and 0-indexing
247                    ));
248                    if errors.len() >= self.config.max_errors {
249                        break;
250                    }
251                    continue;
252                }
253            };
254
255            // Extract entity
256            let entity_id = record.get(entity_id_idx)
257                .unwrap_or("")
258                .to_string();
259            let entity_name = record.get(entity_name_idx)
260                .unwrap_or("")
261                .to_string();
262            let entity_type = record.get(entity_type_idx)
263                .unwrap_or("")
264                .to_string();
265
266            if !entity_id.is_empty() && !entity_name.is_empty() && !entity_type.is_empty() {
267                // Collect additional attributes
268                let mut attributes = std::collections::HashMap::new();
269                for (idx, header) in headers.iter().enumerate() {
270                    if idx != entity_id_idx && idx != entity_name_idx && idx != entity_type_idx {
271                        if let Some(value) = record.get(idx) {
272                            if !value.is_empty() {
273                                attributes.insert(header.to_string(), value.to_string());
274                            }
275                        }
276                    }
277                }
278
279                let entity = ImportedEntity {
280                    id: entity_id,
281                    name: entity_name,
282                    entity_type,
283                    attributes,
284                };
285
286                // Validate if not skipped
287                if !self.config.skip_validation {
288                    if let Err(e) = self.validate_entity(&entity) {
289                        errors.push(e);
290                        if errors.len() >= self.config.max_errors {
291                            break;
292                        }
293                        continue;
294                    }
295                }
296
297                entities.push(entity);
298            }
299
300            // Extract relationship if columns present
301            if let (Some(src_idx), Some(tgt_idx), Some(type_idx)) =
302                (rel_source_idx, rel_target_idx, rel_type_idx) {
303
304                if let (Some(source), Some(target), Some(rel_type)) =
305                    (record.get(src_idx), record.get(tgt_idx), record.get(type_idx)) {
306
307                    if !source.is_empty() && !target.is_empty() && !rel_type.is_empty() {
308                        let relationship = ImportedRelationship {
309                            source: source.to_string(),
310                            target: target.to_string(),
311                            relation_type: rel_type.to_string(),
312                            attributes: std::collections::HashMap::new(),
313                        };
314
315                        if !self.config.skip_validation {
316                            if let Err(e) = self.validate_relationship(&relationship) {
317                                errors.push(e);
318                                if errors.len() >= self.config.max_errors {
319                                    break;
320                                }
321                                continue;
322                            }
323                        }
324
325                        relationships.push(relationship);
326                    }
327                }
328            }
329        }
330
331        Ok(ImportResult {
332            entities_imported: entities.len(),
333            relationships_imported: relationships.len(),
334            errors,
335            processing_time_ms: 0, // Will be filled by import_file
336        })
337    }
338
339    /// Import TSV file
340    fn import_tsv(&self, path: &Path) -> Result<ImportResult, ImportError> {
341        // TSV is just CSV with tab delimiter
342        self.import_csv_with_delimiter(path, b'\t')
343    }
344
345    /// Import JSON file
346    fn import_json(&self, path: &Path) -> Result<ImportResult, ImportError> {
347        let file = File::open(path)
348            .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;
349
350        let reader = BufReader::new(file);
351
352        // Expected JSON structure:
353        // {
354        //   "entities": [...],
355        //   "relationships": [...]
356        // }
357        #[derive(Deserialize)]
358        struct JsonData {
359            entities: Option<Vec<ImportedEntity>>,
360            relationships: Option<Vec<ImportedRelationship>>,
361        }
362
363        let json_data: JsonData = serde_json::from_reader(reader)
364            .map_err(|e| ImportError::ParseError(format!("JSON parse error: {}", e), 0))?;
365
366        let mut errors = Vec::new();
367        let mut valid_entities = Vec::new();
368        let mut valid_relationships = Vec::new();
369
370        // Validate entities
371        if let Some(entities) = json_data.entities {
372            for entity in entities {
373                if !self.config.skip_validation {
374                    if let Err(e) = self.validate_entity(&entity) {
375                        errors.push(e);
376                        if errors.len() >= self.config.max_errors {
377                            break;
378                        }
379                        continue;
380                    }
381                }
382                valid_entities.push(entity);
383            }
384        }
385
386        // Validate relationships
387        if let Some(relationships) = json_data.relationships {
388            for rel in relationships {
389                if !self.config.skip_validation {
390                    if let Err(e) = self.validate_relationship(&rel) {
391                        errors.push(e);
392                        if errors.len() >= self.config.max_errors {
393                            break;
394                        }
395                        continue;
396                    }
397                }
398                valid_relationships.push(rel);
399            }
400        }
401
402        Ok(ImportResult {
403            entities_imported: valid_entities.len(),
404            relationships_imported: valid_relationships.len(),
405            errors,
406            processing_time_ms: 0,
407        })
408    }
409
410    /// Import JSONL file
411    fn import_jsonl(&self, path: &Path) -> Result<ImportResult, ImportError> {
412        let file = File::open(path)
413            .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;
414
415        let reader = BufReader::new(file);
416        let mut errors = Vec::new();
417        let mut entities = Vec::new();
418        let mut relationships = Vec::new();
419
420        // Each line is either an entity or relationship JSON object
421        // Expected format:
422        // {"type": "entity", "id": "...", "name": "...", "entity_type": "...", "attributes": {...}}
423        // {"type": "relationship", "source": "...", "target": "...", "relation_type": "...", "attributes": {...}}
424
425        #[derive(Deserialize)]
426        #[serde(tag = "type")]
427        enum JsonLine {
428            #[serde(rename = "entity")]
429            Entity {
430                id: String,
431                name: String,
432                entity_type: String,
433                #[serde(default)]
434                attributes: std::collections::HashMap<String, String>,
435            },
436            #[serde(rename = "relationship")]
437            Relationship {
438                source: String,
439                target: String,
440                relation_type: String,
441                #[serde(default)]
442                attributes: std::collections::HashMap<String, String>,
443            },
444        }
445
446        for (line_num, line) in reader.lines().enumerate() {
447            let line = match line {
448                Ok(l) => l,
449                Err(e) => {
450                    errors.push(ImportError::ParseError(
451                        format!("Failed to read line: {}", e),
452                        line_num + 1,
453                    ));
454                    if errors.len() >= self.config.max_errors {
455                        break;
456                    }
457                    continue;
458                }
459            };
460
461            // Skip empty lines
462            if line.trim().is_empty() {
463                continue;
464            }
465
466            let parsed: JsonLine = match serde_json::from_str(&line) {
467                Ok(p) => p,
468                Err(e) => {
469                    errors.push(ImportError::ParseError(
470                        format!("JSON parse error: {}", e),
471                        line_num + 1,
472                    ));
473                    if errors.len() >= self.config.max_errors {
474                        break;
475                    }
476                    continue;
477                }
478            };
479
480            match parsed {
481                JsonLine::Entity { id, name, entity_type, attributes } => {
482                    let entity = ImportedEntity {
483                        id,
484                        name,
485                        entity_type,
486                        attributes,
487                    };
488
489                    if !self.config.skip_validation {
490                        if let Err(e) = self.validate_entity(&entity) {
491                            errors.push(e);
492                            if errors.len() >= self.config.max_errors {
493                                break;
494                            }
495                            continue;
496                        }
497                    }
498
499                    entities.push(entity);
500                }
501                JsonLine::Relationship { source, target, relation_type, attributes } => {
502                    let rel = ImportedRelationship {
503                        source,
504                        target,
505                        relation_type,
506                        attributes,
507                    };
508
509                    if !self.config.skip_validation {
510                        if let Err(e) = self.validate_relationship(&rel) {
511                            errors.push(e);
512                            if errors.len() >= self.config.max_errors {
513                                break;
514                            }
515                            continue;
516                        }
517                    }
518
519                    relationships.push(rel);
520                }
521            }
522        }
523
524        Ok(ImportResult {
525            entities_imported: entities.len(),
526            relationships_imported: relationships.len(),
527            errors,
528            processing_time_ms: 0,
529        })
530    }
531
532    /// Import RDF/Turtle file
533    fn import_rdf(&self, _path: &Path) -> Result<ImportResult, ImportError> {
534        // TODO: Implement RDF parsing
535        // Use sophia or oxigraph crates
536
537        Ok(ImportResult {
538            entities_imported: 0,
539            relationships_imported: 0,
540            errors: Vec::new(),
541            processing_time_ms: 0,
542        })
543    }
544
545    /// Import GraphML file
546    fn import_graphml(&self, _path: &Path) -> Result<ImportResult, ImportError> {
547        // TODO: Implement GraphML parsing
548        // Use xml-rs crate
549
550        Ok(ImportResult {
551            entities_imported: 0,
552            relationships_imported: 0,
553            errors: Vec::new(),
554            processing_time_ms: 0,
555        })
556    }
557
558    /// Validate imported data
559    fn validate_entity(&self, entity: &ImportedEntity) -> Result<(), ImportError> {
560        if entity.id.is_empty() {
561            return Err(ImportError::MissingField("entity_id".to_string()));
562        }
563
564        if entity.name.is_empty() {
565            return Err(ImportError::MissingField("entity_name".to_string()));
566        }
567
568        if entity.entity_type.is_empty() {
569            return Err(ImportError::MissingField("entity_type".to_string()));
570        }
571
572        Ok(())
573    }
574
575    /// Validate relationship
576    fn validate_relationship(&self, rel: &ImportedRelationship) -> Result<(), ImportError> {
577        if rel.source.is_empty() {
578            return Err(ImportError::MissingField("source".to_string()));
579        }
580
581        if rel.target.is_empty() {
582            return Err(ImportError::MissingField("target".to_string()));
583        }
584
585        if rel.relation_type.is_empty() {
586            return Err(ImportError::MissingField("relation_type".to_string()));
587        }
588
589        Ok(())
590    }
591}
592
593/// Streaming data source
594#[async_trait::async_trait]
595pub trait StreamingSource: Send + Sync {
596    /// Get next batch of entities
597    async fn next_batch(&mut self) -> Result<Vec<ImportedEntity>, ImportError>;
598
599    /// Check if more data available
600    async fn has_more(&self) -> bool;
601}
602
603/// Streaming importer for continuous data ingestion
604pub struct StreamingImporter {
605    config: ImportConfig,
606}
607
608impl StreamingImporter {
609    /// Create new streaming importer
610    pub fn new(config: ImportConfig) -> Self {
611        Self { config }
612    }
613
614    /// Import from streaming source
615    pub async fn import_stream<S: StreamingSource>(
616        &self,
617        mut source: S,
618    ) -> Result<ImportResult, ImportError> {
619        let mut total_entities = 0;
620        let mut errors = Vec::new();
621
622        while source.has_more().await {
623            match source.next_batch().await {
624                Ok(entities) => {
625                    total_entities += entities.len();
626
627                    // Validate if not skipped
628                    if !self.config.skip_validation {
629                        for entity in &entities {
630                            if let Err(e) = self.validate_entity(entity) {
631                                errors.push(e);
632                            }
633                        }
634                    }
635                }
636                Err(e) => {
637                    errors.push(e);
638                    if errors.len() >= self.config.max_errors {
639                        break;
640                    }
641                }
642            }
643        }
644
645        Ok(ImportResult {
646            entities_imported: total_entities,
647            relationships_imported: 0,
648            errors,
649            processing_time_ms: 0,
650        })
651    }
652
653    /// Validate entity
654    fn validate_entity(&self, entity: &ImportedEntity) -> Result<(), ImportError> {
655        if entity.id.is_empty() {
656            return Err(ImportError::MissingField("entity_id".to_string()));
657        }
658        Ok(())
659    }
660}
661
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build an entity with no attributes.
    fn entity(id: &str, name: &str, entity_type: &str) -> ImportedEntity {
        ImportedEntity {
            id: id.to_string(),
            name: name.to_string(),
            entity_type: entity_type.to_string(),
            attributes: HashMap::new(),
        }
    }

    /// Build a relationship with no attributes.
    fn relationship(source: &str, target: &str, relation_type: &str) -> ImportedRelationship {
        ImportedRelationship {
            source: source.to_string(),
            target: target.to_string(),
            relation_type: relation_type.to_string(),
            attributes: HashMap::new(),
        }
    }

    /// Assert that `result` is a `MissingField` error naming `field`.
    fn assert_missing(result: Result<(), ImportError>, field: &str) {
        match result {
            Err(ImportError::MissingField(name)) => assert_eq!(name, field),
            other => panic!("expected MissingField({}), got {:?}", field, other),
        }
    }

    #[test]
    fn test_import_config_default() {
        let config = ImportConfig::default();
        assert_eq!(config.format, DataFormat::JSON);
        assert_eq!(config.batch_size, 1000);
        assert_eq!(config.max_errors, 10);
        assert!(!config.skip_validation);
        assert!(config.column_mappings.is_none());
    }

    #[test]
    fn test_validation() {
        let importer = DataImporter::new(ImportConfig::default());

        assert!(importer.validate_entity(&entity("1", "Test", "Person")).is_ok());
        assert!(importer.validate_entity(&entity("", "Test", "Person")).is_err());
    }

    #[test]
    fn test_entity_validation_names_missing_field() {
        let importer = DataImporter::new(ImportConfig::default());

        assert_missing(importer.validate_entity(&entity("", "Test", "Person")), "entity_id");
        assert_missing(importer.validate_entity(&entity("1", "", "Person")), "entity_name");
        assert_missing(importer.validate_entity(&entity("1", "Test", "")), "entity_type");
    }

    #[test]
    fn test_relationship_validation() {
        let importer = DataImporter::new(ImportConfig::default());

        assert!(importer
            .validate_relationship(&relationship("a", "b", "KNOWS"))
            .is_ok());
        assert_missing(importer.validate_relationship(&relationship("", "b", "KNOWS")), "source");
        assert_missing(importer.validate_relationship(&relationship("a", "", "KNOWS")), "target");
        assert_missing(
            importer.validate_relationship(&relationship("a", "b", "")),
            "relation_type",
        );
    }

    #[test]
    fn test_import_missing_file() {
        let importer = DataImporter::new(ImportConfig::default());
        let path = std::env::temp_dir().join("graphrag_data_import_definitely_missing.json");

        match importer.import_file(&path) {
            Err(ImportError::FileNotFound(_)) => {}
            other => panic!("expected FileNotFound, got {:?}", other),
        }
    }
}