1use serde::{Deserialize, Serialize};
21use std::path::Path;
22use std::fs::File;
23use std::io::{BufRead, BufReader};
24use csv::ReaderBuilder;
25
26#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
28#[allow(clippy::upper_case_acronyms)]
29pub enum DataFormat {
30 CSV,
32 TSV,
34 JSON,
36 JSONL,
38 RDF,
40 GraphML,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ImportConfig {
47 pub format: DataFormat,
49 pub skip_validation: bool,
51 pub batch_size: usize,
53 pub max_errors: usize,
55 pub column_mappings: Option<ColumnMappings>,
57}
58
59impl Default for ImportConfig {
60 fn default() -> Self {
61 Self {
62 format: DataFormat::JSON,
63 skip_validation: false,
64 batch_size: 1000,
65 max_errors: 10,
66 column_mappings: None,
67 }
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct ColumnMappings {
74 pub entity_id: String,
76 pub entity_name: String,
78 pub entity_type: String,
80 pub relationship_source: Option<String>,
82 pub relationship_target: Option<String>,
84 pub relationship_type: Option<String>,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct ImportedEntity {
91 pub id: String,
93 pub name: String,
95 pub entity_type: String,
97 pub attributes: std::collections::HashMap<String, String>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct ImportedRelationship {
104 pub source: String,
106 pub target: String,
108 pub relation_type: String,
110 pub attributes: std::collections::HashMap<String, String>,
112}
113
114#[derive(Debug, Clone)]
116pub struct ImportResult {
117 pub entities_imported: usize,
119 pub relationships_imported: usize,
121 pub errors: Vec<ImportError>,
123 pub processing_time_ms: u64,
125}
126
/// Errors produced while importing data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ImportError {
    /// The input path does not exist.
    FileNotFound(String),
    /// Input could not be read or parsed. The `usize` is the 1-based line number,
    /// or `0` when the failure is not tied to one line (open, header, whole document).
    ParseError(String, usize),
    /// The configuration or a record failed a validation rule.
    ValidationError(String),
    /// A required field or column is absent or empty.
    MissingField(String),
    /// The input or the requested format cannot be handled.
    InvalidFormat(String),
}

impl std::fmt::Display for ImportError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ImportError::FileNotFound(path) => write!(f, "File not found: {}", path),
            // Line 0 is the "no specific line" sentinel; "at line 0" would point at
            // a line that does not exist.
            ImportError::ParseError(msg, 0) => write!(f, "Parse error: {}", msg),
            ImportError::ParseError(msg, line) => {
                write!(f, "Parse error at line {}: {}", line, msg)
            }
            ImportError::ValidationError(msg) => write!(f, "Validation error: {}", msg),
            ImportError::MissingField(field) => write!(f, "Missing required field: {}", field),
            ImportError::InvalidFormat(msg) => write!(f, "Invalid format: {}", msg),
        }
    }
}

impl std::error::Error for ImportError {}
155
156pub struct DataImporter {
158 config: ImportConfig,
159}
160
161impl DataImporter {
162 pub fn new(config: ImportConfig) -> Self {
164 Self { config }
165 }
166
167 pub fn import_file(&self, path: impl AsRef<Path>) -> Result<ImportResult, ImportError> {
169 let path = path.as_ref();
170
171 if !path.exists() {
172 return Err(ImportError::FileNotFound(path.display().to_string()));
173 }
174
175 let start_time = std::time::Instant::now();
176
177 let result = match self.config.format {
178 DataFormat::CSV => self.import_csv(path)?,
179 DataFormat::TSV => self.import_tsv(path)?,
180 DataFormat::JSON => self.import_json(path)?,
181 DataFormat::JSONL => self.import_jsonl(path)?,
182 DataFormat::RDF => self.import_rdf(path)?,
183 DataFormat::GraphML => self.import_graphml(path)?,
184 };
185
186 let processing_time_ms = start_time.elapsed().as_millis() as u64;
187
188 Ok(ImportResult {
189 entities_imported: result.entities_imported,
190 relationships_imported: result.relationships_imported,
191 errors: result.errors,
192 processing_time_ms,
193 })
194 }
195
196 fn import_csv(&self, path: &Path) -> Result<ImportResult, ImportError> {
198 self.import_csv_with_delimiter(path, b',')
199 }
200
201 fn import_csv_with_delimiter(&self, path: &Path, delimiter: u8) -> Result<ImportResult, ImportError> {
203 let mut entities = Vec::new();
204 let mut relationships = Vec::new();
205 let mut errors = Vec::new();
206
207 let file = File::open(path)
208 .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;
209
210 let mut reader = ReaderBuilder::new()
211 .delimiter(delimiter)
212 .has_headers(true)
213 .from_reader(file);
214
215 let headers = reader.headers()
217 .map_err(|e| ImportError::ParseError(format!("Failed to read headers: {}", e), 0))?
218 .clone();
219
220 let mappings = self.config.column_mappings.as_ref()
221 .ok_or_else(|| ImportError::ValidationError("Column mappings required for CSV import".to_string()))?;
222
223 let entity_id_idx = headers.iter().position(|h| h == mappings.entity_id)
225 .ok_or_else(|| ImportError::MissingField(mappings.entity_id.clone()))?;
226 let entity_name_idx = headers.iter().position(|h| h == mappings.entity_name)
227 .ok_or_else(|| ImportError::MissingField(mappings.entity_name.clone()))?;
228 let entity_type_idx = headers.iter().position(|h| h == mappings.entity_type)
229 .ok_or_else(|| ImportError::MissingField(mappings.entity_type.clone()))?;
230
231 let rel_source_idx = mappings.relationship_source.as_ref()
233 .and_then(|col| headers.iter().position(|h| h == col));
234 let rel_target_idx = mappings.relationship_target.as_ref()
235 .and_then(|col| headers.iter().position(|h| h == col));
236 let rel_type_idx = mappings.relationship_type.as_ref()
237 .and_then(|col| headers.iter().position(|h| h == col));
238
239 for (line_num, result) in reader.records().enumerate() {
241 let record = match result {
242 Ok(r) => r,
243 Err(e) => {
244 errors.push(ImportError::ParseError(
245 format!("CSV parse error: {}", e),
246 line_num + 2, ));
248 if errors.len() >= self.config.max_errors {
249 break;
250 }
251 continue;
252 }
253 };
254
255 let entity_id = record.get(entity_id_idx)
257 .unwrap_or("")
258 .to_string();
259 let entity_name = record.get(entity_name_idx)
260 .unwrap_or("")
261 .to_string();
262 let entity_type = record.get(entity_type_idx)
263 .unwrap_or("")
264 .to_string();
265
266 if !entity_id.is_empty() && !entity_name.is_empty() && !entity_type.is_empty() {
267 let mut attributes = std::collections::HashMap::new();
269 for (idx, header) in headers.iter().enumerate() {
270 if idx != entity_id_idx && idx != entity_name_idx && idx != entity_type_idx {
271 if let Some(value) = record.get(idx) {
272 if !value.is_empty() {
273 attributes.insert(header.to_string(), value.to_string());
274 }
275 }
276 }
277 }
278
279 let entity = ImportedEntity {
280 id: entity_id,
281 name: entity_name,
282 entity_type,
283 attributes,
284 };
285
286 if !self.config.skip_validation {
288 if let Err(e) = self.validate_entity(&entity) {
289 errors.push(e);
290 if errors.len() >= self.config.max_errors {
291 break;
292 }
293 continue;
294 }
295 }
296
297 entities.push(entity);
298 }
299
300 if let (Some(src_idx), Some(tgt_idx), Some(type_idx)) =
302 (rel_source_idx, rel_target_idx, rel_type_idx) {
303
304 if let (Some(source), Some(target), Some(rel_type)) =
305 (record.get(src_idx), record.get(tgt_idx), record.get(type_idx)) {
306
307 if !source.is_empty() && !target.is_empty() && !rel_type.is_empty() {
308 let relationship = ImportedRelationship {
309 source: source.to_string(),
310 target: target.to_string(),
311 relation_type: rel_type.to_string(),
312 attributes: std::collections::HashMap::new(),
313 };
314
315 if !self.config.skip_validation {
316 if let Err(e) = self.validate_relationship(&relationship) {
317 errors.push(e);
318 if errors.len() >= self.config.max_errors {
319 break;
320 }
321 continue;
322 }
323 }
324
325 relationships.push(relationship);
326 }
327 }
328 }
329 }
330
331 Ok(ImportResult {
332 entities_imported: entities.len(),
333 relationships_imported: relationships.len(),
334 errors,
335 processing_time_ms: 0, })
337 }
338
339 fn import_tsv(&self, path: &Path) -> Result<ImportResult, ImportError> {
341 self.import_csv_with_delimiter(path, b'\t')
343 }
344
345 fn import_json(&self, path: &Path) -> Result<ImportResult, ImportError> {
347 let file = File::open(path)
348 .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;
349
350 let reader = BufReader::new(file);
351
352 #[derive(Deserialize)]
358 struct JsonData {
359 entities: Option<Vec<ImportedEntity>>,
360 relationships: Option<Vec<ImportedRelationship>>,
361 }
362
363 let json_data: JsonData = serde_json::from_reader(reader)
364 .map_err(|e| ImportError::ParseError(format!("JSON parse error: {}", e), 0))?;
365
366 let mut errors = Vec::new();
367 let mut valid_entities = Vec::new();
368 let mut valid_relationships = Vec::new();
369
370 if let Some(entities) = json_data.entities {
372 for entity in entities {
373 if !self.config.skip_validation {
374 if let Err(e) = self.validate_entity(&entity) {
375 errors.push(e);
376 if errors.len() >= self.config.max_errors {
377 break;
378 }
379 continue;
380 }
381 }
382 valid_entities.push(entity);
383 }
384 }
385
386 if let Some(relationships) = json_data.relationships {
388 for rel in relationships {
389 if !self.config.skip_validation {
390 if let Err(e) = self.validate_relationship(&rel) {
391 errors.push(e);
392 if errors.len() >= self.config.max_errors {
393 break;
394 }
395 continue;
396 }
397 }
398 valid_relationships.push(rel);
399 }
400 }
401
402 Ok(ImportResult {
403 entities_imported: valid_entities.len(),
404 relationships_imported: valid_relationships.len(),
405 errors,
406 processing_time_ms: 0,
407 })
408 }
409
410 fn import_jsonl(&self, path: &Path) -> Result<ImportResult, ImportError> {
412 let file = File::open(path)
413 .map_err(|e| ImportError::ParseError(format!("Failed to open file: {}", e), 0))?;
414
415 let reader = BufReader::new(file);
416 let mut errors = Vec::new();
417 let mut entities = Vec::new();
418 let mut relationships = Vec::new();
419
420 #[derive(Deserialize)]
426 #[serde(tag = "type")]
427 enum JsonLine {
428 #[serde(rename = "entity")]
429 Entity {
430 id: String,
431 name: String,
432 entity_type: String,
433 #[serde(default)]
434 attributes: std::collections::HashMap<String, String>,
435 },
436 #[serde(rename = "relationship")]
437 Relationship {
438 source: String,
439 target: String,
440 relation_type: String,
441 #[serde(default)]
442 attributes: std::collections::HashMap<String, String>,
443 },
444 }
445
446 for (line_num, line) in reader.lines().enumerate() {
447 let line = match line {
448 Ok(l) => l,
449 Err(e) => {
450 errors.push(ImportError::ParseError(
451 format!("Failed to read line: {}", e),
452 line_num + 1,
453 ));
454 if errors.len() >= self.config.max_errors {
455 break;
456 }
457 continue;
458 }
459 };
460
461 if line.trim().is_empty() {
463 continue;
464 }
465
466 let parsed: JsonLine = match serde_json::from_str(&line) {
467 Ok(p) => p,
468 Err(e) => {
469 errors.push(ImportError::ParseError(
470 format!("JSON parse error: {}", e),
471 line_num + 1,
472 ));
473 if errors.len() >= self.config.max_errors {
474 break;
475 }
476 continue;
477 }
478 };
479
480 match parsed {
481 JsonLine::Entity { id, name, entity_type, attributes } => {
482 let entity = ImportedEntity {
483 id,
484 name,
485 entity_type,
486 attributes,
487 };
488
489 if !self.config.skip_validation {
490 if let Err(e) = self.validate_entity(&entity) {
491 errors.push(e);
492 if errors.len() >= self.config.max_errors {
493 break;
494 }
495 continue;
496 }
497 }
498
499 entities.push(entity);
500 }
501 JsonLine::Relationship { source, target, relation_type, attributes } => {
502 let rel = ImportedRelationship {
503 source,
504 target,
505 relation_type,
506 attributes,
507 };
508
509 if !self.config.skip_validation {
510 if let Err(e) = self.validate_relationship(&rel) {
511 errors.push(e);
512 if errors.len() >= self.config.max_errors {
513 break;
514 }
515 continue;
516 }
517 }
518
519 relationships.push(rel);
520 }
521 }
522 }
523
524 Ok(ImportResult {
525 entities_imported: entities.len(),
526 relationships_imported: relationships.len(),
527 errors,
528 processing_time_ms: 0,
529 })
530 }
531
532 fn import_rdf(&self, _path: &Path) -> Result<ImportResult, ImportError> {
534 Ok(ImportResult {
538 entities_imported: 0,
539 relationships_imported: 0,
540 errors: Vec::new(),
541 processing_time_ms: 0,
542 })
543 }
544
545 fn import_graphml(&self, _path: &Path) -> Result<ImportResult, ImportError> {
547 Ok(ImportResult {
551 entities_imported: 0,
552 relationships_imported: 0,
553 errors: Vec::new(),
554 processing_time_ms: 0,
555 })
556 }
557
558 fn validate_entity(&self, entity: &ImportedEntity) -> Result<(), ImportError> {
560 if entity.id.is_empty() {
561 return Err(ImportError::MissingField("entity_id".to_string()));
562 }
563
564 if entity.name.is_empty() {
565 return Err(ImportError::MissingField("entity_name".to_string()));
566 }
567
568 if entity.entity_type.is_empty() {
569 return Err(ImportError::MissingField("entity_type".to_string()));
570 }
571
572 Ok(())
573 }
574
575 fn validate_relationship(&self, rel: &ImportedRelationship) -> Result<(), ImportError> {
577 if rel.source.is_empty() {
578 return Err(ImportError::MissingField("source".to_string()));
579 }
580
581 if rel.target.is_empty() {
582 return Err(ImportError::MissingField("target".to_string()));
583 }
584
585 if rel.relation_type.is_empty() {
586 return Err(ImportError::MissingField("relation_type".to_string()));
587 }
588
589 Ok(())
590 }
591}
592
593#[async_trait::async_trait]
595pub trait StreamingSource: Send + Sync {
596 async fn next_batch(&mut self) -> Result<Vec<ImportedEntity>, ImportError>;
598
599 async fn has_more(&self) -> bool;
601}
602
603pub struct StreamingImporter {
605 config: ImportConfig,
606}
607
608impl StreamingImporter {
609 pub fn new(config: ImportConfig) -> Self {
611 Self { config }
612 }
613
614 pub async fn import_stream<S: StreamingSource>(
616 &self,
617 mut source: S,
618 ) -> Result<ImportResult, ImportError> {
619 let mut total_entities = 0;
620 let mut errors = Vec::new();
621
622 while source.has_more().await {
623 match source.next_batch().await {
624 Ok(entities) => {
625 total_entities += entities.len();
626
627 if !self.config.skip_validation {
629 for entity in &entities {
630 if let Err(e) = self.validate_entity(entity) {
631 errors.push(e);
632 }
633 }
634 }
635 }
636 Err(e) => {
637 errors.push(e);
638 if errors.len() >= self.config.max_errors {
639 break;
640 }
641 }
642 }
643 }
644
645 Ok(ImportResult {
646 entities_imported: total_entities,
647 relationships_imported: 0,
648 errors,
649 processing_time_ms: 0,
650 })
651 }
652
653 fn validate_entity(&self, entity: &ImportedEntity) -> Result<(), ImportError> {
655 if entity.id.is_empty() {
656 return Err(ImportError::MissingField("entity_id".to_string()));
657 }
658 Ok(())
659 }
660}
661
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds an attribute-less entity from its three required fields.
    fn entity(id: &str, name: &str, entity_type: &str) -> ImportedEntity {
        ImportedEntity {
            id: id.to_owned(),
            name: name.to_owned(),
            entity_type: entity_type.to_owned(),
            attributes: HashMap::new(),
        }
    }

    #[test]
    fn test_import_config_default() {
        let ImportConfig { format, batch_size, .. } = ImportConfig::default();
        assert_eq!(format, DataFormat::JSON);
        assert_eq!(batch_size, 1000);
    }

    #[test]
    fn test_validation() {
        let importer = DataImporter::new(ImportConfig::default());

        assert!(importer.validate_entity(&entity("1", "Test", "Person")).is_ok());
        // An empty id must be rejected.
        assert!(importer.validate_entity(&entity("", "Test", "Person")).is_err());
    }
}