use super::CommandResult;
use crate::cli::{Checkpoint, CheckpointManager};
use oxirs_core::format::{RdfFormat, RdfSerializer};
use oxirs_core::rdf_store::RdfStore;
use std::fs;
use std::path::PathBuf;
use std::time::Instant;
pub async fn run(
dataset: String,
file: PathBuf,
format: String,
graph: Option<String>,
resume: bool,
) -> CommandResult {
println!(
"Exporting data from dataset '{}' to {}",
dataset,
file.display()
);
println!("Output format: {format}");
if let Some(g) = &graph {
println!("Source graph: {g}");
}
if !is_supported_export_format(&format) {
return Err(format!(
"Unsupported export format '{format}'. Supported formats: turtle, ntriples, rdfxml, jsonld, trig, nquads"
).into());
}
let checkpoint_manager = CheckpointManager::new()
.map_err(|e| format!("Failed to initialize checkpoint manager: {}", e))?;
let mut start_count = 0usize;
if resume {
if let Some(checkpoint) = checkpoint_manager
.load("export", &dataset, file.to_str().unwrap_or(""))
.map_err(|e| format!("Failed to load checkpoint: {}", e))?
{
println!(
"Found checkpoint from {}: {} triples exported",
checkpoint.timestamp, checkpoint.processed_count
);
start_count = checkpoint.processed_count;
println!("Resuming from triple {}", start_count);
} else {
println!("No checkpoint found, starting fresh export");
}
}
if file.exists() && !resume {
return Err(format!("Output file '{}' already exists", file.display()).into());
}
if let Some(parent) = file.parent() {
fs::create_dir_all(parent)?;
}
let dataset_path = if PathBuf::from(&dataset).join("oxirs.toml").exists() {
load_dataset_from_config(&dataset)?
} else {
PathBuf::from(&dataset)
};
let store = if dataset_path.is_dir() {
RdfStore::open(&dataset_path).map_err(|e| format!("Failed to open dataset: {e}"))?
} else {
return Err(format!(
"Dataset '{dataset}' not found. Use 'oxirs init' to create a dataset."
)
.into());
};
let start_time = Instant::now();
println!("Starting export...");
let triple_count = export_data(
&store,
&file,
&format,
graph.as_deref(),
resume,
&checkpoint_manager,
&dataset,
start_count,
)?;
let duration = start_time.elapsed();
if resume {
checkpoint_manager
.delete("export", &dataset, file.to_str().unwrap_or(""))
.map_err(|e| format!("Failed to delete checkpoint: {}", e))?;
println!("Checkpoint cleared after successful export");
}
use crate::cli::{file_size, format_bytes, format_duration, format_number};
let file_size_bytes = file_size(&file).unwrap_or(0);
println!("\n✓ Export completed successfully!");
println!(" Duration: {}", format_duration(duration));
println!(" Triples exported: {}", format_number(triple_count as u64));
println!(" Output file: {}", file.display());
println!(" File size: {}", format_bytes(file_size_bytes));
if duration.as_secs_f64() > 0.0 {
let rate = triple_count as f64 / duration.as_secs_f64();
println!(
" Export rate: {} triples/second",
format_number(rate as u64)
);
}
Ok(())
}
fn is_supported_export_format(format: &str) -> bool {
matches!(
format,
"turtle" | "ntriples" | "rdfxml" | "jsonld" | "trig" | "nquads"
)
}
fn load_dataset_from_config(dataset: &str) -> Result<PathBuf, Box<dyn std::error::Error>> {
crate::config::load_dataset_from_config(dataset)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
}
#[allow(clippy::too_many_arguments)]
fn export_data(
store: &RdfStore,
file: &PathBuf,
format: &str,
graph: Option<&str>,
enable_checkpointing: bool,
checkpoint_manager: &CheckpointManager,
dataset: &str,
start_count: usize,
) -> Result<usize, Box<dyn std::error::Error>> {
let rdf_format = match format {
"turtle" | "ttl" => RdfFormat::Turtle,
"ntriples" | "nt" => RdfFormat::NTriples,
"nquads" | "nq" => RdfFormat::NQuads,
"trig" => RdfFormat::TriG,
"rdfxml" | "rdf" | "xml" => RdfFormat::RdfXml,
"jsonld" | "json" => RdfFormat::JsonLd {
profile: oxirs_core::format::JsonLdProfileSet::empty(),
},
"n3" => RdfFormat::N3,
_ => {
return Err(format!("Unsupported export format: {format}").into());
}
};
println!(" [1/3] Querying triples from store...");
let all_quads = store
.quads()
.map_err(|e| format!("Failed to query quads: {e}"))?;
let quads: Vec<_> = if let Some(graph_name) = graph {
use oxirs_core::model::{GraphName, NamedNode};
let graph_filter = if graph_name == "default" {
GraphName::DefaultGraph
} else {
GraphName::NamedNode(
NamedNode::new(graph_name).map_err(|e| format!("Invalid graph IRI: {e}"))?,
)
};
all_quads
.into_iter()
.filter(|quad| quad.graph_name() == &graph_filter)
.collect()
} else {
all_quads
};
let quad_count = quads.len();
println!(" ✓ Retrieved {quad_count} quads from store");
println!(" [2/3] Serializing to {format} format...");
let output_file = if start_count > 0 {
fs::OpenOptions::new().append(true).open(file)?
} else {
fs::File::create(file)?
};
let mut serializer = RdfSerializer::new(rdf_format)
.with_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
.with_prefix("rdfs", "http://www.w3.org/2000/01/rdf-schema#")
.with_prefix("xsd", "http://www.w3.org/2001/XMLSchema#")
.with_prefix("owl", "http://www.w3.org/2002/07/owl#")
.pretty()
.for_writer(output_file);
const CHECKPOINT_INTERVAL: usize = 10_000;
for (idx, quad) in quads.iter().enumerate() {
if idx < start_count {
continue;
}
serializer
.serialize_quad(quad.as_ref())
.map_err(|e| format!("Serialization error: {e}"))?;
let exported_count = idx + 1;
if enable_checkpointing && exported_count % CHECKPOINT_INTERVAL == 0 {
let checkpoint = Checkpoint {
operation: "export".to_string(),
dataset: dataset.to_string(),
file_path: file.to_string_lossy().to_string(),
processed_count: exported_count,
last_offset: 0, timestamp: chrono::Local::now().to_rfc3339(),
format: format.to_string(),
graph: graph.map(|s| s.to_string()),
total_size: quad_count as u64,
};
if let Err(e) = checkpoint_manager.save(&checkpoint) {
eprintln!("Warning: Failed to save checkpoint: {e}");
}
}
}
serializer
.finish()
.map_err(|e| format!("Failed to finalize serialization: {e}"))?;
println!(" ✓ Serialization completed");
println!(" [3/3] Writing to file...");
println!(" ✓ Data written to {}", file.display());
Ok(quad_count)
}