use super::CommandResult;
use crate::cli::error::helpers as error_helpers;
use crate::cli::logging::{DataLogger, PerfLogger};
use crate::cli::validation::MultiValidator;
use crate::cli::validation::{dataset_validation, fs_validation, validate_rdf_format};
use crate::cli::{progress::helpers, ArgumentValidator, Checkpoint, CheckpointManager, CliContext};
use oxirs_core::format::{RdfFormat, RdfParser};
use oxirs_core::model::{GraphName, NamedNode};
use oxirs_core::rdf_store::RdfStore;
use std::fs;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::time::Instant;
/// Import RDF data from `file` into the dataset named/located by `dataset`.
///
/// Arguments:
/// - `dataset`: dataset name or on-disk path; if the path contains an
///   `oxirs.toml`, the real storage location is resolved from that config.
/// - `file`: path to the RDF file to import (must exist; max 1 GiB).
/// - `format`: explicit RDF format name; when `None` the format is inferred
///   from the file extension (defaulting to Turtle).
/// - `graph`: optional target graph IRI (`"default"` selects the default graph).
/// - `resume`: when true, resume from a saved checkpoint and write new
///   checkpoints during the import.
///
/// Returns `Ok(())` on success; validation, I/O, and store errors are
/// propagated as `CommandResult` errors.
pub async fn run(
    dataset: String,
    file: PathBuf,
    format: Option<String>,
    graph: Option<String>,
    resume: bool,
) -> CommandResult {
    let ctx = CliContext::new();

    // --- Argument validation -------------------------------------------
    let mut validator = MultiValidator::new();
    validator.add(
        ArgumentValidator::new("dataset", Some(&dataset))
            .required()
            .custom(|d| !d.trim().is_empty(), "Dataset name cannot be empty"),
    );
    // Only enforce dataset-name rules when `dataset` is not an existing
    // path: an on-disk path may legally contain characters a bare name may not.
    if !PathBuf::from(&dataset).exists() {
        dataset_validation::validate_dataset_name(&dataset)?;
    }
    validator.add(
        // NOTE(review): a non-UTF-8 path degrades to "" here and will fail
        // the `is_file()` check with a less precise message — confirm acceptable.
        ArgumentValidator::new("file", Some(file.to_str().unwrap_or("")))
            .required()
            .is_file(),
    );
    validator.finish()?;
    // Reject files larger than 1 GiB (1_073_741_824 bytes).
    fs_validation::validate_file_size(&file, Some(1_073_741_824))?;
    if let Some(ref fmt) = format {
        validate_rdf_format(fmt)?;
    }
    if let Some(ref g) = graph {
        dataset_validation::validate_graph_uri(g)?;
    }

    ctx.info(&format!("Importing data into dataset '{dataset}'"));
    ctx.info(&format!("Source file: {}", file.display()));
    // Fall back to extension-based detection when no format was given.
    let detected_format = format.unwrap_or_else(|| detect_format(&file));
    ctx.info(&format!("Format: {detected_format}"));
    if let Some(g) = &graph {
        ctx.info(&format!("Target graph: {g}"));
    }

    // --- Checkpoint handling (resume support) --------------------------
    let checkpoint_manager = CheckpointManager::new()
        .map_err(|e| format!("Failed to initialize checkpoint manager: {}", e))?;
    let mut start_offset = 0u64;
    let mut processed_count = 0usize;
    if resume {
        if let Some(checkpoint) = checkpoint_manager
            .load("import", &dataset, file.to_str().unwrap_or(""))
            .map_err(|e| format!("Failed to load checkpoint: {}", e))?
        {
            ctx.info(&format!(
                "Found checkpoint from {}: {} triples processed ({:.1}% complete)",
                checkpoint.timestamp,
                checkpoint.processed_count,
                checkpoint_manager.progress_percentage(&checkpoint)
            ));
            // NOTE(review): `parse_and_import` ignores the byte offset and
            // resumes by skipping `processed_count` quads; checkpoints always
            // store `last_offset: 0`, so `start_offset` stays 0 in practice.
            start_offset = checkpoint.last_offset;
            processed_count = checkpoint.processed_count;
            ctx.info(&format!("Resuming from offset {}", start_offset));
        } else {
            ctx.info("No checkpoint found, starting fresh import");
        }
    }

    // --- Resolve the dataset's storage directory -----------------------
    let dataset_dir = PathBuf::from(&dataset);
    let dataset_path = if dataset_dir.join("oxirs.toml").exists() {
        // `dataset` points at a configured dataset directory; resolve the
        // actual storage path from its config file.
        let dataset_name = dataset_dir
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or(&dataset);
        let (storage_path, _config) =
            crate::config::load_named_dataset(&dataset_dir, dataset_name)?;
        storage_path
    } else {
        dataset_dir
    };
    let store = if dataset_path.is_dir() {
        RdfStore::open(&dataset_path).map_err(|e| format!("Failed to open dataset: {e}"))?
    } else {
        return Err(error_helpers::dataset_not_found_error(&dataset));
    };

    // --- Import with progress reporting --------------------------------
    let start_time = Instant::now();
    ctx.info("Import Progress");
    let mut data_logger = DataLogger::new("import", &dataset);
    let mut perf_logger = PerfLogger::new(format!("import_{detected_format}"));
    perf_logger.add_metadata("file", file.display().to_string());
    perf_logger.add_metadata("format", &detected_format);
    if let Some(ref g) = graph {
        perf_logger.add_metadata("graph", g);
    }
    let file_metadata = fs::metadata(&file)?;
    let file_size = file_metadata.len();
    let read_progress = helpers::download_progress(file_size, &file.display().to_string());
    read_progress.set_message("Reading file");
    let file_handle = fs::File::open(&file)?;
    read_progress.finish_with_message("File opened");
    data_logger.update_progress(file_size, 0);
    let parse_progress = helpers::query_progress();
    parse_progress.set_message("Parsing and importing RDF data");
    // NOTE(review): checkpoints are only written when `resume` is true (it is
    // passed as `enable_checkpointing` below), so a first run without
    // --resume leaves nothing to resume from — confirm this is intended.
    let (triple_count, error_count) = parse_and_import(
        &store,
        file_handle,
        &detected_format,
        graph.as_deref(),
        resume,
        &checkpoint_manager,
        &dataset,
        &file,
        file_size,
        start_offset,
        processed_count,
    )?;
    parse_progress.finish_with_message("Import complete");
    // Clear the checkpoint once the import finished successfully.
    if resume {
        checkpoint_manager
            .delete("import", &dataset, file.to_str().unwrap_or(""))
            .map_err(|e| format!("Failed to delete checkpoint: {}", e))?;
        ctx.info("Checkpoint cleared after successful import");
    }

    // --- Final statistics ----------------------------------------------
    let duration = start_time.elapsed();
    data_logger.update_progress(file_size, triple_count as u64);
    data_logger.complete();
    perf_logger.add_metadata("triple_count", triple_count);
    perf_logger.add_metadata("error_count", error_count);
    perf_logger.complete(Some(5000));
    use crate::cli::{format_bytes, format_duration, format_number};
    ctx.info("Import Statistics");
    ctx.success(&format!(
        "✓ Import completed in {}",
        format_duration(duration)
    ));
    ctx.info(&format!(
        " Triples imported: {}",
        format_number(triple_count as u64)
    ));
    ctx.info(&format!(" File size: {}", format_bytes(file_size)));
    if error_count > 0 {
        ctx.warn(&format!(
            " Errors encountered: {}",
            format_number(error_count as u64)
        ));
    }
    if duration.as_secs_f64() > 0.0 {
        // Rates are computed over wall-clock time for the whole run
        // (including file open and validation, which are typically negligible).
        let rate = triple_count as f64 / duration.as_secs_f64();
        ctx.info(&format!(
            " Import rate: {} triples/second",
            format_number(rate as u64)
        ));
        let throughput = file_size as f64 / duration.as_secs_f64();
        ctx.info(&format!(
            " Throughput: {}/second",
            format_bytes(throughput as u64)
        ));
    }
    Ok(())
}
/// Infer the RDF serialization format name from `file`'s extension.
///
/// Recognized extensions (case-insensitive): `ttl`/`turtle`, `nt`/`ntriples`,
/// `rdf`/`xml`, `jsonld`/`json-ld`, `trig`, and `nq`/`nquads`. Any other or
/// missing extension falls back to `"turtle"`.
fn detect_format(file: &Path) -> String {
    // Single match over the lowercased extension yields a `&'static str`,
    // so only one allocation happens (the final `to_string`).
    let name = match file
        .extension()
        .and_then(|s| s.to_str())
        .map(|s| s.to_lowercase())
        .as_deref()
    {
        Some("ttl" | "turtle") => "turtle",
        Some("nt" | "ntriples") => "ntriples",
        Some("rdf" | "xml") => "rdfxml",
        Some("jsonld" | "json-ld") => "jsonld",
        Some("trig") => "trig",
        Some("nq" | "nquads") => "nquads",
        // Unknown or absent extension: default to Turtle.
        _ => "turtle",
    };
    name.to_string()
}
/// Returns `true` if `format` names a serialization this importer can parse.
///
/// Kept in sync with the `match` in `parse_and_import`: accepts the same
/// canonical names and aliases (e.g. both "turtle" and "ttl"), including
/// "n3", which the previous version omitted.
#[allow(dead_code)]
fn is_supported_format(format: &str) -> bool {
    matches!(
        format,
        "turtle"
            | "ttl"
            | "ntriples"
            | "nt"
            | "nquads"
            | "nq"
            | "trig"
            | "rdfxml"
            | "rdf"
            | "xml"
            | "jsonld"
            | "json-ld"
            | "json"
            | "n3"
    )
}
/// Stream-parse `file` as `format` and insert every quad into `store`.
///
/// Returns `(total_parsed, error_count)` where `total_parsed` counts every
/// successfully parsed quad (including quads skipped when resuming) and
/// `error_count` counts parse failures plus failed inserts. Individual
/// errors are printed to stderr as warnings and do not abort the import.
///
/// Resume semantics: the first `start_count` parsed quads are skipped
/// (count-based resume). `_start_offset` is accepted for checkpoint
/// compatibility but is currently unused — the parser cannot seek, so
/// byte-offset resume is not implemented.
#[allow(clippy::too_many_arguments)]
fn parse_and_import<S: oxirs_core::Store>(
    store: &S,
    file: fs::File,
    format: &str,
    graph: Option<&str>,
    enable_checkpointing: bool,
    checkpoint_manager: &CheckpointManager,
    dataset: &str,
    file_path: &Path,
    total_size: u64,
    _start_offset: u64,
    start_count: usize,
) -> Result<(usize, usize), Box<dyn std::error::Error>> {
    // Map the CLI format name (and its aliases) to the parser's format enum.
    let rdf_format = match format {
        "turtle" | "ttl" => RdfFormat::Turtle,
        "ntriples" | "nt" => RdfFormat::NTriples,
        "nquads" | "nq" => RdfFormat::NQuads,
        "trig" => RdfFormat::TriG,
        "rdfxml" | "rdf" | "xml" => RdfFormat::RdfXml,
        "jsonld" | "json-ld" | "json" => RdfFormat::JsonLd {
            profile: oxirs_core::format::JsonLdProfileSet::empty(),
        },
        "n3" => RdfFormat::N3,
        _ => {
            return Err(format!("Unsupported import format: {format}").into());
        }
    };
    // Resolve the target graph: "default" (or no graph at all) means the
    // default graph; anything else must be a valid IRI.
    let target_graph = if let Some(graph_iri) = graph {
        if graph_iri == "default" {
            GraphName::DefaultGraph
        } else {
            GraphName::NamedNode(
                NamedNode::new(graph_iri).map_err(|e| format!("Invalid graph IRI: {e}"))?,
            )
        }
    } else {
        GraphName::DefaultGraph
    };
    let reader = BufReader::new(file);
    let parser = RdfParser::new(rdf_format);
    let mut total_parsed = 0;
    let mut error_count = 0;
    // Save a checkpoint after every N successfully inserted quads.
    const CHECKPOINT_INTERVAL: usize = 10_000;
    for quad_result in parser.for_reader(reader) {
        match quad_result {
            Ok(mut quad) => {
                total_parsed += 1;
                // Resuming: skip quads that were already imported by a
                // previous (checkpointed) run.
                if total_parsed <= start_count {
                    continue;
                }
                // Re-home default-graph quads into the requested named graph.
                // Quads that already carry a named graph (e.g. from TriG or
                // N-Quads input) keep it — presumably intentional; TODO confirm.
                if matches!(target_graph, GraphName::NamedNode(_))
                    && matches!(quad.graph_name(), GraphName::DefaultGraph)
                {
                    quad = oxirs_core::model::Quad::new(
                        quad.subject().clone(),
                        quad.predicate().clone(),
                        quad.object().clone(),
                        target_graph.clone(),
                    );
                }
                match store.insert_quad(quad) {
                    Ok(_) => {
                        if enable_checkpointing && total_parsed % CHECKPOINT_INTERVAL == 0 {
                            let checkpoint = Checkpoint {
                                operation: "import".to_string(),
                                dataset: dataset.to_string(),
                                file_path: file_path.to_string_lossy().to_string(),
                                processed_count: total_parsed,
                                // Byte offsets are not tracked by the
                                // streaming parser; resume is count-based.
                                last_offset: 0,
                                timestamp: chrono::Local::now().to_rfc3339(),
                                format: format.to_string(),
                                graph: graph.map(|s| s.to_string()),
                                total_size,
                            };
                            // Checkpoint failures are non-fatal: warn and
                            // keep importing.
                            if let Err(e) = checkpoint_manager.save(&checkpoint) {
                                eprintln!("Warning: Failed to save checkpoint: {e}");
                            }
                        }
                    }
                    Err(e) => {
                        eprintln!("Warning: Failed to insert quad: {e}");
                        error_count += 1;
                    }
                }
            }
            Err(e) => {
                eprintln!("Warning: Parse error: {e}");
                error_count += 1;
            }
        }
    }
    Ok((total_parsed, error_count))
}