use super::ExportMode;
use super::manifest::ExportManifest;
use crate::backend::native::graph_file::GraphFile;
use crate::backend::native::types::{NativeBackendError, NativeResult};
use crate::backend::native::v2::wal::recovery::states::{
Authority, RecoveryContext, RecoveryState as ExplicitRecoveryState,
};
use crate::backend::native::v2::wal::{V2WALConfig, V2WALReader};
use std::path::{Path, PathBuf};
use std::time::Duration;
/// Caller-supplied settings used when constructing a [`V2Exporter`].
#[derive(Debug, Clone)]
pub struct V2ExportConfig {
/// Directory that receives the exported files. `V2Exporter::from_graph_file`
/// creates it (including parents) when the path does not exist yet.
pub export_path: PathBuf,
/// NOTE(review): not read by the exporter code in this section — presumably
/// requests that WAL records past the checkpoint be included; confirm against callers.
pub include_wal_tail: bool,
/// Copied into the WAL configuration's `enable_compression` flag by
/// `V2Exporter::from_graph_file`.
pub compression_enabled: bool,
/// NOTE(review): not read by the exporter code in this section — presumably
/// toggles checksum verification elsewhere; confirm against callers.
pub checksum_validation: bool,
}
/// Snapshot of the database's recovery situation, produced by
/// `V2Exporter::analyze_consistency` before an export is attempted.
#[derive(Debug, Clone)]
pub struct ExportConsistencyReport {
/// Recovery state reported by `RecoveryContext::analyze_files`.
pub recovery_state: ExplicitRecoveryState,
/// Authority reported by `RecoveryContext::analyze_files`.
pub authority: Authority,
/// `checkpointed_lsn` from the WAL header, or 0 when no WAL reader is available.
pub checkpoint_lsn: u64,
/// `committed_lsn` from the WAL header, or 0 when no WAL reader is available.
pub committed_lsn: u64,
/// `current_lsn` from the WAL header, or 0 when no WAL reader is available.
pub current_lsn: u64,
/// `active_transactions` from the WAL header, or 0 when no WAL reader is available.
pub active_transactions: u32,
/// Export mode suggested for `recovery_state` by `V2Exporter::determine_export_mode`.
pub recommended_export_mode: ExportMode,
}
/// Outcome of a completed export: where the files were written and how much was copied.
#[derive(Debug, Clone)]
pub struct ExportResult {
/// Path of the text manifest written for this export.
pub manifest_path: PathBuf,
/// Path of the exported copy of the graph file.
pub graph_file_path: PathBuf,
/// Path of the exported copy of the WAL file, or `None` when no WAL was exported.
pub wal_file_path: Option<PathBuf>,
/// Number of records exported. The export methods in this section always report 0.
pub records_exported: u64,
/// Total bytes copied (graph file plus WAL file when one was exported).
pub bytes_exported: u64,
/// Wall-clock time the export method took, measured with `Instant`.
pub export_duration: Duration,
/// Checksum value recorded for the export. NOTE(review): the export methods in
/// this section fill this from LSNs or byte counts rather than hashing file
/// contents — treat it as a placeholder until confirmed.
pub checksum: u64,
}
/// Exports a V2 graph file (and optionally its WAL) into an export directory,
/// writing a text manifest alongside the copied files.
pub struct V2Exporter {
// Caller-supplied export settings; `export_path` is the destination directory.
config: V2ExportConfig,
// Handle to the opened source graph file; its path is the copy source.
graph_file: GraphFile,
// WAL configuration derived from the graph file path (WAL and checkpoint paths).
wal_config: V2WALConfig,
// Reader over the WAL file; `None` when the WAL is missing or could not be opened.
wal_reader: Option<V2WALReader>,
}
impl V2Exporter {
/// Builds an exporter for the graph file at `graph_path`.
///
/// The export directory named by `export_config.export_path` is created
/// (with parents) when it does not exist. The WAL configuration is derived
/// from `graph_path`, with compression taken from the export config.
///
/// Opening the WAL is best-effort: a missing WAL file, or one that fails to
/// open, leaves the exporter without a WAL reader instead of failing.
///
/// # Errors
///
/// Returns an error when the export directory cannot be created, the graph
/// file cannot be opened, or the WAL configuration fails validation.
pub fn from_graph_file(graph_path: &Path, export_config: V2ExportConfig) -> NativeResult<Self> {
    let export_dir = &export_config.export_path;
    if !export_dir.exists() {
        std::fs::create_dir_all(export_dir).map_err(|io_err| NativeBackendError::IoError {
            context: format!("Failed to create export directory: {:?}", export_dir),
            source: io_err,
        })?;
    }

    let graph_file = GraphFile::open(graph_path)?;

    let mut wal_config = V2WALConfig::for_graph_file(graph_path);
    wal_config.enable_compression = export_config.compression_enabled;
    wal_config.validate()?;

    // Deliberately tolerant: an unreadable WAL is treated the same as an absent one.
    let wal_reader = if wal_config.wal_path.exists() {
        V2WALReader::open(&wal_config.wal_path).ok()
    } else {
        None
    };

    Ok(V2Exporter {
        config: export_config,
        graph_file,
        wal_config,
        wal_reader,
    })
}
/// Inspects the WAL, graph and checkpoint files and reports the recovery
/// state together with the LSN counters relevant to an export.
///
/// The LSN counters and active-transaction count come from the WAL header
/// when a WAL reader is available; otherwise they are all reported as 0.
/// The recommended export mode is derived from the recovery state.
///
/// # Errors
///
/// Propagates any error returned by `RecoveryContext::analyze_files`.
pub fn analyze_consistency(&self) -> NativeResult<ExportConsistencyReport> {
    let recovery_context = RecoveryContext::analyze_files(
        &self.wal_config.wal_path,
        self.graph_file.file_path(),
        &self.wal_config.checkpoint_path,
    )?;

    // Without a reader there is no header to consult, whether or not a WAL
    // file exists on disk, so every counter falls back to zero.
    let (checkpoint_lsn, committed_lsn, current_lsn, active_transactions) =
        match &self.wal_reader {
            Some(wal_reader) => {
                let wal_header = wal_reader.header();
                (
                    wal_header.checkpointed_lsn,
                    wal_header.committed_lsn,
                    wal_header.current_lsn,
                    wal_header.active_transactions,
                )
            }
            None => (0, 0, 0, 0),
        };

    let recommended_export_mode = self.determine_export_mode(recovery_context.state);

    Ok(ExportConsistencyReport {
        recovery_state: recovery_context.state,
        authority: recovery_context.authority,
        checkpoint_lsn,
        committed_lsn,
        current_lsn,
        active_transactions,
        recommended_export_mode,
    })
}
/// Exports the graph file alone, for a database whose on-disk state is
/// aligned with a checkpoint.
///
/// The graph file is copied to `v2_export_checkpoint_<unix-secs>.graph` in
/// the export directory and a text manifest is written next to it. No WAL
/// file is exported.
///
/// The checksum recorded in the manifest and in the result is the checkpoint
/// LSN, not a hash of the file contents.
///
/// # Errors
///
/// Returns `InvalidState` unless the recovery state is `CleanShutdown` or
/// `PartialCheckpoint`. Also fails when consistency analysis fails, the
/// system clock is before the Unix epoch, or the copy or manifest write fails.
pub fn export_checkpoint_aligned(&self) -> NativeResult<ExportResult> {
    let started = std::time::Instant::now();
    let report = self.analyze_consistency()?;

    let state_is_exportable = matches!(
        report.recovery_state,
        ExplicitRecoveryState::CleanShutdown | ExplicitRecoveryState::PartialCheckpoint
    );
    if !state_is_exportable {
        return Err(NativeBackendError::InvalidState {
            context: format!(
                "Checkpoint-aligned export requires CleanShutdown or PartialCheckpoint state, got {:?}",
                report.recovery_state
            ),
            source: None,
        });
    }

    let export_timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(NativeBackendError::from)?
        .as_secs();

    // File names share one stem so the manifest and graph copy pair up.
    let stem = format!("v2_export_checkpoint_{}", export_timestamp);
    let export_dir = &self.config.export_path;
    let manifest_path = export_dir.join(format!("{}.manifest", stem));
    let graph_file_path = export_dir.join(format!("{}.graph", stem));

    let graph_bytes_copied = std::fs::copy(self.graph_file.file_path(), &graph_file_path)
        .map_err(|io_err| NativeBackendError::IoError {
            context: format!(
                "Failed to copy graph file from {:?} to {:?}",
                self.graph_file.file_path(),
                &graph_file_path
            ),
            source: io_err,
        })?;

    let manifest = ExportManifest {
        magic: ExportManifest::MAGIC,
        version: ExportManifest::VERSION,
        recovery_state: report.recovery_state,
        authority: report.authority,
        export_mode: ExportMode::CheckpointAligned,
        graph_checkpoint_lsn: report.checkpoint_lsn,
        wal_start_lsn: None,
        wal_end_lsn: None,
        graph_format_version: 2,
        wal_format_version: 2,
        v2_clustered_edges: true,
        export_timestamp,
        export_duration_ms: 0,
        // Placeholder: the checkpoint LSN stands in for a content checksum.
        graph_checksum: report.checkpoint_lsn,
        wal_checksum: None,
        total_records: 0,
        total_bytes: graph_bytes_copied,
        reserved: [0; 8],
    };

    let manifest_text = format!(
        "V2 Checkpoint-Aligned Export Manifest\n\
         Magic: {:?}\n\
         Version: {}\n\
         Recovery State: {:?}\n\
         Authority: {:?}\n\
         Export Mode: {:?}\n\
         Graph Checkpoint LSN: {}\n\
         Graph Format Version: {}\n\
         WAL Format Version: {}\n\
         V2 Clustered Edges: {}\n\
         Export Timestamp: {}\n\
         Graph Checksum: {}\n\
         Total Records: {}\n\
         Total Bytes: {}\n\
         Note: Checkpoint-aligned export - database is in clean state",
        manifest.magic,
        manifest.version,
        manifest.recovery_state,
        manifest.authority,
        manifest.export_mode,
        manifest.graph_checkpoint_lsn,
        manifest.graph_format_version,
        manifest.wal_format_version,
        manifest.v2_clustered_edges,
        manifest.export_timestamp,
        manifest.graph_checksum,
        manifest.total_records,
        manifest.total_bytes
    );

    std::fs::write(&manifest_path, manifest_text).map_err(|io_err| {
        NativeBackendError::IoError {
            context: format!(
                "Failed to write checkpoint-aligned manifest file: {:?}",
                &manifest_path
            ),
            source: io_err,
        }
    })?;

    let export_duration = started.elapsed();
    Ok(ExportResult {
        manifest_path,
        graph_file_path,
        wal_file_path: None,
        records_exported: 0,
        bytes_exported: manifest.total_bytes,
        export_duration,
        checksum: manifest.graph_checksum,
    })
}
pub fn export_lsn_bounded(&self, from_lsn: u64, to_lsn: u64) -> NativeResult<ExportResult> {
let start_time = std::time::Instant::now();
if from_lsn > to_lsn {
return Err(NativeBackendError::InvalidParameter {
context: format!(
"LSN range invalid: from_lsn ({}) > to_lsn ({})",
from_lsn, to_lsn
),
source: None,
});
}
let consistency_report = self.analyze_consistency()?;
if !self.wal_config.wal_path.exists() {
return Err(NativeBackendError::InvalidState {
context: "LSN-bounded export requires WAL file to be present".to_string(),
source: None,
});
}
if let Some(ref wal_reader) = self.wal_reader {
let wal_header = wal_reader.header();
if from_lsn > wal_header.committed_lsn || to_lsn > wal_header.committed_lsn {
return Err(NativeBackendError::InvalidState {
context: format!(
"LSN range [{}, {}] exceeds committed LSN ({})",
from_lsn, to_lsn, wal_header.committed_lsn
),
source: None,
});
}
}
let export_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(NativeBackendError::from)?
.as_secs();
let base_filename = format!("v2_export_lsn_{}_to_{}", from_lsn, to_lsn);
let manifest_filename = format!("{}.manifest", base_filename);
let graph_filename = format!("{}.graph", base_filename);
let wal_filename = format!("{}.wal", base_filename);
let manifest_path = self.config.export_path.join(manifest_filename);
let graph_file_path = self.config.export_path.join(graph_filename);
let wal_file_path = self.config.export_path.join(wal_filename);
let graph_bytes_copied = std::fs::copy(self.graph_file.file_path(), &graph_file_path)
.map_err(|e| NativeBackendError::IoError {
context: format!(
"Failed to copy graph file from {:?} to {:?}",
self.graph_file.file_path(),
&graph_file_path
),
source: e,
})?;
let wal_bytes_copied =
std::fs::copy(&self.wal_config.wal_path, &wal_file_path).map_err(|e| {
NativeBackendError::IoError {
context: format!(
"Failed to copy WAL file from {:?} to {:?}",
&self.wal_config.wal_path, &wal_file_path
),
source: e,
}
})?;
let manifest = ExportManifest {
magic: ExportManifest::MAGIC,
version: ExportManifest::VERSION,
recovery_state: consistency_report.recovery_state,
authority: consistency_report.authority,
export_mode: ExportMode::LsnBounded,
graph_checkpoint_lsn: consistency_report.checkpoint_lsn,
wal_start_lsn: Some(from_lsn),
wal_end_lsn: Some(to_lsn),
graph_format_version: 2,
wal_format_version: 2,
v2_clustered_edges: true,
export_timestamp,
export_duration_ms: 0, graph_checksum: from_lsn.wrapping_add(to_lsn), wal_checksum: Some(from_lsn.wrapping_add(to_lsn)),
total_records: 0, total_bytes: graph_bytes_copied + wal_bytes_copied,
reserved: [0; 8],
};
let manifest_content = format!(
"V2 LSN-Bounded Export Manifest\n\
Magic: {:?}\n\
Version: {}\n\
Recovery State: {:?}\n\
Authority: {:?}\n\
Export Mode: {:?}\n\
Graph Checkpoint LSN: {}\n\
WAL Start LSN: {:?}\n\
WAL End LSN: {:?}\n\
Graph Format Version: {}\n\
WAL Format Version: {}\n\
V2 Clustered Edges: {}\n\
Export Timestamp: {}\n\
Graph Checksum: {}\n\
WAL Checksum: {:?}\n\
Total Records: {}\n\
Total Bytes: {}\n\
Note: LSN-bounded export from {} to {}",
manifest.magic,
manifest.version,
manifest.recovery_state,
manifest.authority,
manifest.export_mode,
manifest.graph_checkpoint_lsn,
manifest.wal_start_lsn,
manifest.wal_end_lsn,
manifest.graph_format_version,
manifest.wal_format_version,
manifest.v2_clustered_edges,
manifest.export_timestamp,
manifest.graph_checksum,
manifest.wal_checksum,
manifest.total_records,
manifest.total_bytes,
from_lsn,
to_lsn
);
std::fs::write(&manifest_path, manifest_content).map_err(|e| {
NativeBackendError::IoError {
context: format!(
"Failed to write LSN-bounded manifest file: {:?}",
&manifest_path
),
source: e,
}
})?;
let export_duration = start_time.elapsed();
Ok(ExportResult {
manifest_path,
graph_file_path,
wal_file_path: Some(wal_file_path),
records_exported: 0,
bytes_exported: manifest.total_bytes,
export_duration,
checksum: manifest.graph_checksum,
})
}
/// Exports the graph file and, when one exists, the whole WAL file.
///
/// Files are written to the export directory as
/// `v2_export_<unix-secs>.{manifest,graph,wal}`. When no WAL file exists the
/// export contains only the graph file, and the manifest's WAL LSN range and
/// the result's `wal_file_path` are `None`.
///
/// The checksum recorded in the manifest and in the result is the total
/// number of bytes copied, not a hash of the file contents.
///
/// # Errors
///
/// Fails when the system clock is before the Unix epoch, consistency
/// analysis fails, or a file copy or the manifest write fails.
pub fn export_full(&self) -> NativeResult<ExportResult> {
    let start_time = std::time::Instant::now();

    let export_timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(NativeBackendError::from)?
        .as_secs();

    let base_filename = format!("v2_export_{}", export_timestamp);
    let export_dir = &self.config.export_path;
    let manifest_path = export_dir.join(format!("{}.manifest", base_filename));
    let graph_file_path = export_dir.join(format!("{}.graph", base_filename));
    let wal_file_path = export_dir.join(format!("{}.wal", base_filename));

    let consistency_report = self.analyze_consistency()?;

    let graph_bytes_copied = std::fs::copy(self.graph_file.file_path(), &graph_file_path)
        .map_err(|e| NativeBackendError::IoError {
            context: format!(
                "Failed to copy graph file from {:?} to {:?}",
                self.graph_file.file_path(),
                &graph_file_path
            ),
            source: e,
        })?;

    // The WAL is optional for a full export: copy it only when it exists.
    let (wal_bytes_copied, final_wal_path) = if self.wal_config.wal_path.exists() {
        let bytes_copied = std::fs::copy(&self.wal_config.wal_path, &wal_file_path)
            .map_err(|e| NativeBackendError::IoError {
                context: format!(
                    "Failed to copy WAL file from {:?} to {:?}",
                    &self.wal_config.wal_path, &wal_file_path
                ),
                source: e,
            })?;
        (Some(bytes_copied), Some(wal_file_path))
    } else {
        (None, None)
    };

    let total_bytes = graph_bytes_copied + wal_bytes_copied.unwrap_or(0);

    let manifest = ExportManifest {
        magic: ExportManifest::MAGIC,
        version: ExportManifest::VERSION,
        recovery_state: consistency_report.recovery_state,
        authority: consistency_report.authority,
        export_mode: ExportMode::Full,
        graph_checkpoint_lsn: consistency_report.checkpoint_lsn,
        // The LSN range is present exactly when a WAL file was copied.
        wal_start_lsn: wal_bytes_copied.map(|_| 0),
        wal_end_lsn: wal_bytes_copied.map(|_| consistency_report.committed_lsn),
        graph_format_version: 2,
        wal_format_version: 2,
        v2_clustered_edges: true,
        export_timestamp,
        export_duration_ms: 0,
        // Placeholder checksum (total bytes copied). Stored in the manifest so
        // the struct, the written text and the returned result all agree.
        graph_checksum: total_bytes,
        wal_checksum: None,
        total_records: 0,
        total_bytes,
        reserved: [0; 8],
    };

    let manifest_content = format!(
        "V2 Export Manifest\n\
         Magic: {:?}\n\
         Version: {}\n\
         Recovery State: {:?}\n\
         Authority: {:?}\n\
         Export Mode: {:?}\n\
         Graph Checkpoint LSN: {}\n\
         WAL Start LSN: {:?}\n\
         WAL End LSN: {:?}\n\
         Graph Format Version: {}\n\
         WAL Format Version: {}\n\
         V2 Clustered Edges: {}\n\
         Export Timestamp: {}\n\
         Graph Checksum: {}\n\
         Total Records: {}\n\
         Total Bytes: {}\n",
        manifest.magic,
        manifest.version,
        manifest.recovery_state,
        manifest.authority,
        manifest.export_mode,
        manifest.graph_checkpoint_lsn,
        manifest.wal_start_lsn,
        manifest.wal_end_lsn,
        manifest.graph_format_version,
        manifest.wal_format_version,
        manifest.v2_clustered_edges,
        manifest.export_timestamp,
        manifest.graph_checksum,
        manifest.total_records,
        manifest.total_bytes
    );

    std::fs::write(&manifest_path, manifest_content).map_err(|e| {
        NativeBackendError::IoError {
            context: format!("Failed to write manifest file: {:?}", &manifest_path),
            source: e,
        }
    })?;

    let export_duration = start_time.elapsed();
    Ok(ExportResult {
        manifest_path,
        graph_file_path,
        wal_file_path: final_wal_path,
        records_exported: 0,
        bytes_exported: manifest.total_bytes,
        export_duration,
        checksum: manifest.graph_checksum,
    })
}
/// Maps a recovery state to the export mode recommended for it.
///
/// `CleanShutdown` recommends a checkpoint-aligned export, `DirtyShutdown`
/// and `PartialCheckpoint` recommend an LSN-bounded export, and every other
/// state falls back to a full export.
fn determine_export_mode(&self, recovery_state: ExplicitRecoveryState) -> ExportMode {
    match recovery_state {
        ExplicitRecoveryState::CleanShutdown => ExportMode::CheckpointAligned,
        ExplicitRecoveryState::DirtyShutdown | ExplicitRecoveryState::PartialCheckpoint => {
            ExportMode::LsnBounded
        }
        _ => ExportMode::Full,
    }
}
}