Skip to main content

graphrag_core/pipeline/
data_import.rs

1//! Data Import Pipeline
2//!
3//! This module provides data import capabilities from multiple formats:
4//! - CSV/TSV files
5//! - JSON/JSONL (newline-delimited JSON)
6//! - RDF/Turtle (semantic web formats)
7//! - GraphML (graph exchange format)
8//! - Streaming ingestion from various sources
9//!
10//! ## Architecture
11//!
12//! ```text
13//! Data Source → Parser → Validator → Transformer → Graph Builder
14//!      │           │          │           │              │
15//!      ▼           ▼          ▼           ▼              ▼
16//!   CSV/JSON   Schema    Required    Normalize      KnowledgeGraph
17//!   Files      Check     Fields      Format
18//! ```
19
20use csv::ReaderBuilder;
21use serde::{Deserialize, Serialize};
22use std::fs::File;
23use std::io::{BufRead, BufReader};
24use std::path::Path;
25
/// Supported data formats
///
/// Selects which importer `DataImporter::import_file` dispatches to. The
/// RDF and GraphML importers return `ImportError::UnsupportedFormat` unless
/// the crate is built with the `rdf-import` / `graphml-import` feature.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
// The variants are spelled as the format acronyms themselves, which clippy
// would otherwise flag.
#[allow(clippy::upper_case_acronyms)]
pub enum DataFormat {
    /// CSV (comma-separated values); requires `ImportConfig::column_mappings`
    CSV,
    /// TSV (tab-separated values); handled by the CSV importer with a tab delimiter
    TSV,
    /// JSON (JavaScript Object Notation): one document with optional
    /// `entities` and `relationships` arrays
    JSON,
    /// JSONL (newline-delimited JSON): one entity or relationship object per line
    JSONL,
    /// RDF/Turtle (Resource Description Framework)
    RDF,
    /// GraphML (graph markup language)
    GraphML,
}
43
/// Import configuration
///
/// Passed to `DataImporter::new` and applied to every import that importer runs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportConfig {
    /// Data format; decides which importer handles the file
    pub format: DataFormat,
    /// Skip validation: when `true`, parsed entities and relationships are
    /// accepted without being passed to the validation hooks
    pub skip_validation: bool,
    /// Batch size for processing
    // NOTE(review): none of the importers visible in this part of the file
    // read this value — confirm where (or whether) it is consumed.
    pub batch_size: usize,
    /// Maximum errors before aborting: an importer stops reading as soon as
    /// the number of collected errors reaches this value
    pub max_errors: usize,
    /// Column mappings (for CSV/TSV); CSV/TSV import fails with a
    /// `ValidationError` when this is `None`
    pub column_mappings: Option<ColumnMappings>,
}
58
59impl Default for ImportConfig {
60    fn default() -> Self {
61        Self {
62            format: DataFormat::JSON,
63            skip_validation: false,
64            batch_size: 1000,
65            max_errors: 10,
66            column_mappings: None,
67        }
68    }
69}
70
/// Column mappings for CSV/TSV
///
/// Each field holds a header name. The three entity columns are required:
/// the import fails with `ImportError::MissingField` if any of them is not
/// present in the header row. Relationships are only extracted when all
/// three relationship columns are configured *and* found in the header row;
/// otherwise relationship extraction is skipped without an error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMappings {
    /// Entity ID column
    pub entity_id: String,
    /// Entity name column
    pub entity_name: String,
    /// Entity type column
    pub entity_type: String,
    /// Optional source column for relationships
    pub relationship_source: Option<String>,
    /// Optional target column for relationships
    pub relationship_target: Option<String>,
    /// Optional relationship type column
    pub relationship_type: Option<String>,
}
87
88/// Imported entity
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct ImportedEntity {
91    /// Unique identifier for the entity
92    pub id: String,
93    /// Display name of the entity
94    pub name: String,
95    /// Type/category of the entity
96    pub entity_type: String,
97    /// Additional attributes as key-value pairs
98    pub attributes: std::collections::HashMap<String, String>,
99}
100
101/// Imported relationship
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct ImportedRelationship {
104    /// Source entity identifier
105    pub source: String,
106    /// Target entity identifier
107    pub target: String,
108    /// Type of the relationship
109    pub relation_type: String,
110    /// Additional attributes as key-value pairs
111    pub attributes: std::collections::HashMap<String, String>,
112}
113
/// Import result
///
/// Summary of one import run. Only counts are reported; the imported
/// entities and relationships themselves are not carried in this struct.
#[derive(Debug, Clone)]
pub struct ImportResult {
    /// Number of entities imported
    pub entities_imported: usize,
    /// Number of relationships imported
    pub relationships_imported: usize,
    /// Non-fatal errors collected while importing (parse and validation
    /// failures for individual records); use `errors.len()` for the count
    pub errors: Vec<ImportError>,
    /// Processing time (milliseconds); `DataImporter::import_file` overwrites
    /// this with the wall-clock time of the whole call
    pub processing_time_ms: u64,
}
126
/// Everything that can go wrong during an import.
///
/// Fatal problems are returned as the `Err` of an import call; per-record
/// problems are collected in `ImportResult::errors`.
#[derive(Debug, Clone)]
pub enum ImportError {
    /// The input path does not exist (payload: the path or a description)
    FileNotFound(String),
    /// The input could not be read or parsed (payload: message, line number;
    /// importers pass line `0` when no line applies)
    ParseError(String, usize),
    /// A record was rejected by validation
    ValidationError(String),
    /// A required field or column is absent (payload: its name)
    MissingField(String),
    /// The input does not have the expected shape
    InvalidFormat(String),
    /// The format needs a crate feature that is not enabled
    UnsupportedFormat(String),
}

impl std::fmt::Display for ImportError {
    /// Renders `"<category>: <detail>"`; parse errors additionally carry
    /// their line number.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let (prefix, detail) = match self {
            // The only variant with a second payload, so it is written out directly.
            Self::ParseError(message, line_no) => {
                return write!(f, "Parse error at line {}: {}", line_no, message);
            },
            Self::FileNotFound(path) => ("File not found", path),
            Self::ValidationError(message) => ("Validation error", message),
            Self::MissingField(field) => ("Missing required field", field),
            Self::InvalidFormat(message) => ("Invalid format", message),
            Self::UnsupportedFormat(message) => ("Unsupported format", message),
        };
        write!(f, "{}: {}", prefix, detail)
    }
}

impl std::error::Error for ImportError {}
160
/// Data importer
///
/// Reads a file in the format named by its configuration and reports how
/// many entities and relationships were accepted, plus any per-record errors.
pub struct DataImporter {
    // Fixed at construction; consulted for the format, validation switch,
    // error limit and CSV/TSV column mappings.
    config: ImportConfig,
}
165
166impl DataImporter {
167    /// Create new importer
168    pub fn new(config: ImportConfig) -> Self {
169        Self { config }
170    }
171
172    /// Import from file
173    pub fn import_file(&self, path: impl AsRef<Path>) -> Result<ImportResult, ImportError> {
174        let path = path.as_ref();
175
176        if !path.exists() {
177            return Err(ImportError::FileNotFound(path.display().to_string()));
178        }
179
180        let start_time = std::time::Instant::now();
181
182        let result = match self.config.format {
183            DataFormat::CSV => self.import_csv(path)?,
184            DataFormat::TSV => self.import_tsv(path)?,
185            DataFormat::JSON => self.import_json(path)?,
186            DataFormat::JSONL => self.import_jsonl(path)?,
187            DataFormat::RDF => self.import_rdf(path)?,
188            DataFormat::GraphML => self.import_graphml(path)?,
189        };
190
191        let processing_time_ms = start_time.elapsed().as_millis() as u64;
192
193        Ok(ImportResult {
194            entities_imported: result.entities_imported,
195            relationships_imported: result.relationships_imported,
196            errors: result.errors,
197            processing_time_ms,
198        })
199    }
200
201    /// Import CSV file
202    fn import_csv(&self, path: &Path) -> Result<ImportResult, ImportError> {
203        self.import_csv_with_delimiter(path, b',')
204    }
205
    /// Import CSV/TSV with custom delimiter
    ///
    /// Reads a delimited text file whose first row is a header, locates the
    /// columns named in `config.column_mappings`, and counts the entities and
    /// relationships found in the remaining rows.
    ///
    /// # Errors
    ///
    /// Returns `Err` (nothing is imported) when:
    /// - the file cannot be opened or the header row cannot be read
    ///   (`ParseError` with line `0`);
    /// - `config.column_mappings` is `None` (`ValidationError`);
    /// - one of the three entity columns is missing from the header
    ///   (`MissingField`).
    ///
    /// Per-row problems (CSV parse failures, validation failures) are pushed
    /// onto `ImportResult::errors` instead; reading stops once that list
    /// reaches `config.max_errors` entries.
    ///
    /// # Behavior worth knowing
    ///
    /// - A row with an empty id, name or type yields no entity and no error.
    /// - If an entity fails validation, the rest of that row — including its
    ///   relationship columns — is skipped.
    /// - Relationship columns that are configured but absent from the header
    ///   silently disable relationship extraction.
    fn import_csv_with_delimiter(
        &self,
        path: &Path,
        delimiter: u8,
    ) -> Result<ImportResult, ImportError> {
        // The collected records are only used for the counts returned below.
        let mut entities = Vec::new();
        let mut relationships = Vec::new();
        let mut errors = Vec::new();

        let file = File::open(path)
            .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;

        let mut reader = ReaderBuilder::new()
            .delimiter(delimiter)
            .has_headers(true)
            .from_reader(file);

        // Get headers (cloned so the reader can be borrowed again for the records)
        let headers = reader
            .headers()
            .map_err(|e| ImportError::ParseError(format!("Failed to read headers: {}", e), 0))?
            .clone();

        let mappings = self.config.column_mappings.as_ref().ok_or_else(|| {
            ImportError::ValidationError("Column mappings required for CSV import".to_string())
        })?;

        // Find column indices; header names are compared exactly (case-sensitive)
        let entity_id_idx = headers
            .iter()
            .position(|h| h == mappings.entity_id)
            .ok_or_else(|| ImportError::MissingField(mappings.entity_id.clone()))?;
        let entity_name_idx = headers
            .iter()
            .position(|h| h == mappings.entity_name)
            .ok_or_else(|| ImportError::MissingField(mappings.entity_name.clone()))?;
        let entity_type_idx = headers
            .iter()
            .position(|h| h == mappings.entity_type)
            .ok_or_else(|| ImportError::MissingField(mappings.entity_type.clone()))?;

        // Optional relationship columns: `None` both when the mapping is not
        // configured and when the configured header is not found.
        let rel_source_idx = mappings
            .relationship_source
            .as_ref()
            .and_then(|col| headers.iter().position(|h| h == col));
        let rel_target_idx = mappings
            .relationship_target
            .as_ref()
            .and_then(|col| headers.iter().position(|h| h == col));
        let rel_type_idx = mappings
            .relationship_type
            .as_ref()
            .and_then(|col| headers.iter().position(|h| h == col));

        // Process records
        for (line_num, result) in reader.records().enumerate() {
            let record = match result {
                Ok(r) => r,
                Err(e) => {
                    errors.push(ImportError::ParseError(
                        format!("CSV parse error: {}", e),
                        line_num + 2, // +2 for header and 0-indexing
                    ));
                    if errors.len() >= self.config.max_errors {
                        break;
                    }
                    continue;
                },
            };

            // Extract entity; a missing cell is treated the same as an empty one
            let entity_id = record.get(entity_id_idx).unwrap_or("").to_string();
            let entity_name = record.get(entity_name_idx).unwrap_or("").to_string();
            let entity_type = record.get(entity_type_idx).unwrap_or("").to_string();

            // Rows lacking any of the three core fields produce no entity and no error.
            if !entity_id.is_empty() && !entity_name.is_empty() && !entity_type.is_empty() {
                // Collect additional attributes: every other non-empty column,
                // which includes the relationship columns when present.
                let mut attributes = std::collections::HashMap::new();
                for (idx, header) in headers.iter().enumerate() {
                    if idx != entity_id_idx && idx != entity_name_idx && idx != entity_type_idx {
                        if let Some(value) = record.get(idx) {
                            if !value.is_empty() {
                                attributes.insert(header.to_string(), value.to_string());
                            }
                        }
                    }
                }

                let entity = ImportedEntity {
                    id: entity_id,
                    name: entity_name,
                    entity_type,
                    attributes,
                };

                // Validate if not skipped
                if !self.config.skip_validation {
                    if let Err(e) = self.validate_entity(&entity) {
                        errors.push(e);
                        if errors.len() >= self.config.max_errors {
                            break;
                        }
                        // Moves to the next row, so this row's relationship
                        // columns are not examined either.
                        continue;
                    }
                }

                entities.push(entity);
            }

            // Extract relationship if columns present (all three must have been found)
            if let (Some(src_idx), Some(tgt_idx), Some(type_idx)) =
                (rel_source_idx, rel_target_idx, rel_type_idx)
            {
                if let (Some(source), Some(target), Some(rel_type)) = (
                    record.get(src_idx),
                    record.get(tgt_idx),
                    record.get(type_idx),
                ) {
                    // A partially filled relationship is ignored without an error.
                    if !source.is_empty() && !target.is_empty() && !rel_type.is_empty() {
                        let relationship = ImportedRelationship {
                            source: source.to_string(),
                            target: target.to_string(),
                            relation_type: rel_type.to_string(),
                            // No columns are mapped to relationship attributes.
                            attributes: std::collections::HashMap::new(),
                        };

                        if !self.config.skip_validation {
                            if let Err(e) = self.validate_relationship(&relationship) {
                                errors.push(e);
                                if errors.len() >= self.config.max_errors {
                                    break;
                                }
                                continue;
                            }
                        }

                        relationships.push(relationship);
                    }
                }
            }
        }

        Ok(ImportResult {
            entities_imported: entities.len(),
            relationships_imported: relationships.len(),
            errors,
            processing_time_ms: 0, // Will be filled by import_file
        })
    }
357
358    /// Import TSV file
359    fn import_tsv(&self, path: &Path) -> Result<ImportResult, ImportError> {
360        // TSV is just CSV with tab delimiter
361        self.import_csv_with_delimiter(path, b'\t')
362    }
363
364    /// Import JSON file
365    fn import_json(&self, path: &Path) -> Result<ImportResult, ImportError> {
366        let file = File::open(path)
367            .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;
368
369        let reader = BufReader::new(file);
370
371        // Expected JSON structure:
372        // {
373        //   "entities": [...],
374        //   "relationships": [...]
375        // }
376        #[derive(Deserialize)]
377        struct JsonData {
378            entities: Option<Vec<ImportedEntity>>,
379            relationships: Option<Vec<ImportedRelationship>>,
380        }
381
382        let json_data: JsonData = serde_json::from_reader(reader)
383            .map_err(|e| ImportError::ParseError(format!("JSON parse error: {}", e), 0))?;
384
385        let mut errors = Vec::new();
386        let mut valid_entities = Vec::new();
387        let mut valid_relationships = Vec::new();
388
389        // Validate entities
390        if let Some(entities) = json_data.entities {
391            for entity in entities {
392                if !self.config.skip_validation {
393                    if let Err(e) = self.validate_entity(&entity) {
394                        errors.push(e);
395                        if errors.len() >= self.config.max_errors {
396                            break;
397                        }
398                        continue;
399                    }
400                }
401                valid_entities.push(entity);
402            }
403        }
404
405        // Validate relationships
406        if let Some(relationships) = json_data.relationships {
407            for rel in relationships {
408                if !self.config.skip_validation {
409                    if let Err(e) = self.validate_relationship(&rel) {
410                        errors.push(e);
411                        if errors.len() >= self.config.max_errors {
412                            break;
413                        }
414                        continue;
415                    }
416                }
417                valid_relationships.push(rel);
418            }
419        }
420
421        Ok(ImportResult {
422            entities_imported: valid_entities.len(),
423            relationships_imported: valid_relationships.len(),
424            errors,
425            processing_time_ms: 0,
426        })
427    }
428
429    /// Import JSONL file
430    fn import_jsonl(&self, path: &Path) -> Result<ImportResult, ImportError> {
431        let file = File::open(path)
432            .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;
433
434        let reader = BufReader::new(file);
435        let mut errors = Vec::new();
436        let mut entities = Vec::new();
437        let mut relationships = Vec::new();
438
439        // Each line is either an entity or relationship JSON object
440        // Expected format:
441        // {"type": "entity", "id": "...", "name": "...", "entity_type": "...", "attributes": {...}}
442        // {"type": "relationship", "source": "...", "target": "...", "relation_type": "...", "attributes": {...}}
443
444        #[derive(Deserialize)]
445        #[serde(tag = "type")]
446        enum JsonLine {
447            #[serde(rename = "entity")]
448            Entity {
449                id: String,
450                name: String,
451                entity_type: String,
452                #[serde(default)]
453                attributes: std::collections::HashMap<String, String>,
454            },
455            #[serde(rename = "relationship")]
456            Relationship {
457                source: String,
458                target: String,
459                relation_type: String,
460                #[serde(default)]
461                attributes: std::collections::HashMap<String, String>,
462            },
463        }
464
465        for (line_num, line) in reader.lines().enumerate() {
466            let line = match line {
467                Ok(l) => l,
468                Err(e) => {
469                    errors.push(ImportError::ParseError(
470                        format!("Failed to read line: {}", e),
471                        line_num + 1,
472                    ));
473                    if errors.len() >= self.config.max_errors {
474                        break;
475                    }
476                    continue;
477                },
478            };
479
480            // Skip empty lines
481            if line.trim().is_empty() {
482                continue;
483            }
484
485            let parsed: JsonLine = match serde_json::from_str(&line) {
486                Ok(p) => p,
487                Err(e) => {
488                    errors.push(ImportError::ParseError(
489                        format!("JSON parse error: {}", e),
490                        line_num + 1,
491                    ));
492                    if errors.len() >= self.config.max_errors {
493                        break;
494                    }
495                    continue;
496                },
497            };
498
499            match parsed {
500                JsonLine::Entity {
501                    id,
502                    name,
503                    entity_type,
504                    attributes,
505                } => {
506                    let entity = ImportedEntity {
507                        id,
508                        name,
509                        entity_type,
510                        attributes,
511                    };
512
513                    if !self.config.skip_validation {
514                        if let Err(e) = self.validate_entity(&entity) {
515                            errors.push(e);
516                            if errors.len() >= self.config.max_errors {
517                                break;
518                            }
519                            continue;
520                        }
521                    }
522
523                    entities.push(entity);
524                },
525                JsonLine::Relationship {
526                    source,
527                    target,
528                    relation_type,
529                    attributes,
530                } => {
531                    let rel = ImportedRelationship {
532                        source,
533                        target,
534                        relation_type,
535                        attributes,
536                    };
537
538                    if !self.config.skip_validation {
539                        if let Err(e) = self.validate_relationship(&rel) {
540                            errors.push(e);
541                            if errors.len() >= self.config.max_errors {
542                                break;
543                            }
544                            continue;
545                        }
546                    }
547
548                    relationships.push(rel);
549                },
550            }
551        }
552
553        Ok(ImportResult {
554            entities_imported: entities.len(),
555            relationships_imported: relationships.len(),
556            errors,
557            processing_time_ms: 0,
558        })
559    }
560
561    /// Import RDF/Turtle file
562    fn import_rdf(&self, _path: &Path) -> Result<ImportResult, ImportError> {
563        #[cfg(not(feature = "rdf-import"))]
564        {
565            return Err(ImportError::UnsupportedFormat(
566                "RDF import requires 'rdf-import' feature".to_string(),
567            ));
568        }
569
570        #[cfg(feature = "rdf-import")]
571        {
572            use oxrdf::NamedNode;
573            use oxttl::TurtleParser;
574            use std::collections::HashMap;
575
576            let start_time = std::time::Instant::now();
577            let mut entities = Vec::new();
578            let mut relationships = Vec::new();
579            let mut errors = Vec::new();
580
581            // Read file content
582            let content = std::fs::read(path)
583                .map_err(|e| ImportError::FileNotFound(format!("Failed to read file: {}", e)))?;
584
585            // Parse Turtle/RDF
586            let parser = TurtleParser::new().parse_read(&content[..]);
587
588            // Track unique entities (subjects and objects)
589            let mut entity_map: HashMap<String, HashMap<String, String>> = HashMap::new();
590
591            for result in parser {
592                match result {
593                    Ok(triple) => {
594                        let subject = triple.subject.to_string();
595                        let predicate = triple.predicate.to_string();
596                        let object = triple.object.to_string();
597
598                        // Extract entity from subject
599                        entity_map
600                            .entry(subject.clone())
601                            .or_insert_with(HashMap::new);
602
603                        // Check if object is a URI (entity) or literal (property)
604                        if object.starts_with('<') && object.ends_with('>') {
605                            // Object is an entity - create relationship
606                            let object_id = object
607                                .trim_start_matches('<')
608                                .trim_end_matches('>')
609                                .to_string();
610                            entity_map
611                                .entry(object_id.clone())
612                                .or_insert_with(HashMap::new);
613
614                            // Extract relation type from predicate URI
615                            let relation_type = Self::extract_local_name(&predicate);
616
617                            relationships.push(ImportedRelationship {
618                                source: Self::extract_local_name(&subject),
619                                target: Self::extract_local_name(&object_id),
620                                relation_type,
621                                attributes: HashMap::new(),
622                            });
623                        } else {
624                            // Object is a literal - add as entity attribute
625                            let prop_name = Self::extract_local_name(&predicate);
626                            let value = object
627                                .trim_start_matches('"')
628                                .trim_end_matches('"')
629                                .to_string();
630
631                            if let Some(attrs) = entity_map.get_mut(&subject) {
632                                attrs.insert(prop_name, value);
633                            }
634                        }
635                    },
636                    Err(e) => {
637                        errors.push(format!("RDF parse error: {}", e));
638                    },
639                }
640            }
641
642            // Convert entity_map to ImportedEntity list
643            for (uri, attributes) in entity_map {
644                let id = Self::extract_local_name(&uri);
645                let name = attributes
646                    .get("label")
647                    .or_else(|| attributes.get("name"))
648                    .cloned()
649                    .unwrap_or_else(|| id.clone());
650
651                let entity_type = attributes
652                    .get("type")
653                    .or_else(|| attributes.get("rdf:type"))
654                    .cloned()
655                    .unwrap_or_else(|| "resource".to_string());
656
657                entities.push(ImportedEntity {
658                    id,
659                    name,
660                    entity_type,
661                    attributes,
662                });
663            }
664
665            let processing_time_ms = start_time.elapsed().as_millis() as u64;
666
667            Ok(ImportResult {
668                entities_imported: entities.len(),
669                relationships_imported: relationships.len(),
670                errors,
671                processing_time_ms,
672            })
673        }
674    }
675
676    /// Extract local name from RDF URI (after last # or /)
677    #[cfg(feature = "rdf-import")]
678    fn extract_local_name(uri: &str) -> String {
679        let cleaned = uri.trim_start_matches('<').trim_end_matches('>');
680        cleaned
681            .split(&['#', '/'][..])
682            .last()
683            .unwrap_or(cleaned)
684            .to_string()
685    }
686
687    /// Import GraphML file
688    fn import_graphml(&self, _path: &Path) -> Result<ImportResult, ImportError> {
689        #[cfg(not(feature = "graphml-import"))]
690        {
691            return Err(ImportError::UnsupportedFormat(
692                "GraphML import requires 'graphml-import' feature".to_string(),
693            ));
694        }
695
696        #[cfg(feature = "graphml-import")]
697        {
698            use crate::core::{ChunkId, Entity, EntityId, EntityMention, Relationship};
699            use quick_xml::events::Event;
700            use quick_xml::Reader;
701            use std::collections::HashMap;
702
703            let start_time = std::time::Instant::now();
704            let mut entities = Vec::new();
705            let mut relationships = Vec::new();
706            let mut errors = Vec::new();
707
708            // Read file content
709            let content = std::fs::read_to_string(path)
710                .map_err(|e| ImportError::FileNotFound(format!("Failed to read file: {}", e)))?;
711
712            let mut reader = Reader::from_str(&content);
713            reader.config_mut().trim_text(true);
714
715            let mut current_node_id = String::new();
716            let mut current_node_attributes: HashMap<String, String> = HashMap::new();
717            let mut in_node = false;
718            let mut in_edge = false;
719            let mut current_edge_source = String::new();
720            let mut current_edge_target = String::new();
721            let mut current_edge_attributes: HashMap<String, String> = HashMap::new();
722            let mut current_data_key = String::new();
723
724            let mut buf = Vec::new();
725            loop {
726                match reader.read_event_into(&mut buf) {
727                    Ok(Event::Start(e)) => {
728                        match e.name().as_ref() {
729                            b"node" => {
730                                in_node = true;
731                                current_node_id.clear();
732                                current_node_attributes.clear();
733
734                                // Read node attributes
735                                for attr in e.attributes() {
736                                    if let Ok(attr) = attr {
737                                        if attr.key.as_ref() == b"id" {
738                                            current_node_id =
739                                                String::from_utf8_lossy(&attr.value).to_string();
740                                        }
741                                    }
742                                }
743                            },
744                            b"edge" => {
745                                in_edge = true;
746                                current_edge_source.clear();
747                                current_edge_target.clear();
748                                current_edge_attributes.clear();
749
750                                // Read edge attributes
751                                for attr in e.attributes() {
752                                    if let Ok(attr) = attr {
753                                        let key = attr.key.as_ref();
754                                        let value =
755                                            String::from_utf8_lossy(&attr.value).to_string();
756                                        match key {
757                                            b"source" => current_edge_source = value,
758                                            b"target" => current_edge_target = value,
759                                            _ => {},
760                                        }
761                                    }
762                                }
763                            },
764                            b"data" => {
765                                // Read data key attribute
766                                for attr in e.attributes() {
767                                    if let Ok(attr) = attr {
768                                        if attr.key.as_ref() == b"key" {
769                                            current_data_key =
770                                                String::from_utf8_lossy(&attr.value).to_string();
771                                        }
772                                    }
773                                }
774                            },
775                            _ => {},
776                        }
777                    },
778                    Ok(Event::Text(e)) => {
779                        if !current_data_key.is_empty() {
780                            let value = e.unescape().unwrap_or_default().to_string();
781                            if in_node {
782                                current_node_attributes.insert(current_data_key.clone(), value);
783                            } else if in_edge {
784                                current_edge_attributes.insert(current_data_key.clone(), value);
785                            }
786                        }
787                    },
788                    Ok(Event::End(e)) => {
789                        match e.name().as_ref() {
790                            b"node" => {
791                                if in_node && !current_node_id.is_empty() {
792                                    // Extract name and type from attributes
793                                    let name = current_node_attributes
794                                        .get("name")
795                                        .or_else(|| current_node_attributes.get("label"))
796                                        .cloned()
797                                        .unwrap_or_else(|| current_node_id.clone());
798
799                                    let entity_type = current_node_attributes
800                                        .get("type")
801                                        .or_else(|| current_node_attributes.get("category"))
802                                        .cloned()
803                                        .unwrap_or_else(|| "node".to_string());
804
805                                    entities.push(ImportedEntity {
806                                        id: current_node_id.clone(),
807                                        name,
808                                        entity_type,
809                                        attributes: current_node_attributes.clone(),
810                                    });
811                                }
812                                in_node = false;
813                            },
814                            b"edge" => {
815                                if in_edge
816                                    && !current_edge_source.is_empty()
817                                    && !current_edge_target.is_empty()
818                                {
819                                    let relation_type = current_edge_attributes
820                                        .get("type")
821                                        .or_else(|| current_edge_attributes.get("label"))
822                                        .cloned()
823                                        .unwrap_or_else(|| "related".to_string());
824
825                                    relationships.push(ImportedRelationship {
826                                        source: current_edge_source.clone(),
827                                        target: current_edge_target.clone(),
828                                        relation_type,
829                                        attributes: current_edge_attributes.clone(),
830                                    });
831                                }
832                                in_edge = false;
833                            },
834                            b"data" => {
835                                current_data_key.clear();
836                            },
837                            _ => {},
838                        }
839                    },
840                    Ok(Event::Eof) => break,
841                    Err(e) => {
842                        errors.push(format!("XML parse error: {}", e));
843                    },
844                    _ => {},
845                }
846                buf.clear();
847            }
848
849            let processing_time_ms = start_time.elapsed().as_millis() as u64;
850
851            Ok(ImportResult {
852                entities_imported: entities.len(),
853                relationships_imported: relationships.len(),
854                errors,
855                processing_time_ms,
856            })
857        }
858    }
859
860    /// Validate imported data
861    fn validate_entity(&self, entity: &ImportedEntity) -> Result<(), ImportError> {
862        if entity.id.is_empty() {
863            return Err(ImportError::MissingField("entity_id".to_string()));
864        }
865
866        if entity.name.is_empty() {
867            return Err(ImportError::MissingField("entity_name".to_string()));
868        }
869
870        if entity.entity_type.is_empty() {
871            return Err(ImportError::MissingField("entity_type".to_string()));
872        }
873
874        Ok(())
875    }
876
877    /// Validate relationship
878    fn validate_relationship(&self, rel: &ImportedRelationship) -> Result<(), ImportError> {
879        if rel.source.is_empty() {
880            return Err(ImportError::MissingField("source".to_string()));
881        }
882
883        if rel.target.is_empty() {
884            return Err(ImportError::MissingField("target".to_string()));
885        }
886
887        if rel.relation_type.is_empty() {
888            return Err(ImportError::MissingField("relation_type".to_string()));
889        }
890
891        Ok(())
892    }
893}
894
895/// Streaming data source
896#[async_trait::async_trait]
897pub trait StreamingSource: Send + Sync {
898    /// Get next batch of entities
899    async fn next_batch(&mut self) -> Result<Vec<ImportedEntity>, ImportError>;
900
901    /// Check if more data available
902    async fn has_more(&self) -> bool;
903}
904
905/// Streaming importer for continuous data ingestion
906pub struct StreamingImporter {
907    config: ImportConfig,
908}
909
910impl StreamingImporter {
911    /// Create new streaming importer
912    pub fn new(config: ImportConfig) -> Self {
913        Self { config }
914    }
915
916    /// Import from streaming source
917    pub async fn import_stream<S: StreamingSource>(
918        &self,
919        mut source: S,
920    ) -> Result<ImportResult, ImportError> {
921        let mut total_entities = 0;
922        let mut errors = Vec::new();
923
924        while source.has_more().await {
925            match source.next_batch().await {
926                Ok(entities) => {
927                    total_entities += entities.len();
928
929                    // Validate if not skipped
930                    if !self.config.skip_validation {
931                        for entity in &entities {
932                            if let Err(e) = self.validate_entity(entity) {
933                                errors.push(e);
934                            }
935                        }
936                    }
937                },
938                Err(e) => {
939                    errors.push(e);
940                    if errors.len() >= self.config.max_errors {
941                        break;
942                    }
943                },
944            }
945        }
946
947        Ok(ImportResult {
948            entities_imported: total_entities,
949            relationships_imported: 0,
950            errors,
951            processing_time_ms: 0,
952        })
953    }
954
955    /// Validate entity
956    fn validate_entity(&self, entity: &ImportedEntity) -> Result<(), ImportError> {
957        if entity.id.is_empty() {
958            return Err(ImportError::MissingField("entity_id".to_string()));
959        }
960        Ok(())
961    }
962}
963
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build a `Person` entity named `Test` with the given id and no
    /// attributes.
    fn person_with_id(id: &str) -> ImportedEntity {
        ImportedEntity {
            id: id.to_string(),
            name: "Test".to_string(),
            entity_type: "Person".to_string(),
            attributes: HashMap::new(),
        }
    }

    /// The default configuration selects JSON input with batches of 1000.
    #[test]
    fn test_import_config_default() {
        let defaults = ImportConfig::default();

        assert_eq!(defaults.format, DataFormat::JSON);
        assert_eq!(defaults.batch_size, 1000);
    }

    /// Entity validation accepts a complete entity and rejects an empty id.
    #[test]
    fn test_validation() {
        let importer = DataImporter::new(ImportConfig::default());

        // Every required field is populated.
        let valid_entity = person_with_id("1");
        assert!(importer.validate_entity(&valid_entity).is_ok());

        // Missing ID
        let invalid_entity = person_with_id("");
        assert!(importer.validate_entity(&invalid_entity).is_err());
    }
}