use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use serde::{Deserialize, Serialize};
use std::io::{Read, Result as IoResult, Write};
/// Magic bytes that open every serialized header; `DatabaseHeader::read_from`
/// rejects input that starts with anything else.
pub const DATABASE_MAGIC: &[u8; 4] = b"RKDB";
/// Format version stamped into new headers; `DatabaseHeader::read_from`
/// rejects any other version.
pub const DATABASE_VERSION: u16 = 2;
/// Metadata block stored at the start of a database file.
///
/// `write_to` serializes `magic`, `version`, `kmer_size`, `total_kmers`, the
/// `sorted`/`canonical` flag byte, `data_offset` and `index_offset` as 42
/// little-endian bytes (zero padding included). `unique_kmers` and `file_size`
/// are never written by `write_to`; they exist in memory only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseHeader {
/// File signature; `read_from` requires it to equal `DATABASE_MAGIC`.
pub magic: [u8; 4],
/// Format version; `read_from` accepts only `DATABASE_VERSION`.
pub version: u16,
/// K-mer length; `validate` accepts 1..=127.
pub kmer_size: u8,
/// Number of entries; `RKDatabase::from_file_path` reads exactly this many.
pub total_kmers: u64,
/// Stored as bit 0 of the flag byte; `RKDatabase::query_kmer` uses binary
/// search when it is set.
pub sorted: bool,
/// Byte offset at which entry data begins; `validate` accepts 40..=1000.
pub data_offset: u64,
/// Offset of an index section; `validate` requires it to be 0 or greater than
/// `data_offset`. Presumably 0 means "no index" -- TODO confirm.
pub index_offset: u64,
/// Stored as bit 1 of the flag byte.
pub canonical: bool,
/// Not serialized by `write_to`; `read_from` sets it equal to `total_kmers`.
pub unique_kmers: u64,
/// Not serialized by `write_to`; `read_from` sets it to 0.
pub file_size: u64,
}
impl Default for DatabaseHeader {
fn default() -> Self {
Self {
magic: *DATABASE_MAGIC,
version: DATABASE_VERSION,
kmer_size: 0,
total_kmers: 0,
sorted: false,
data_offset: 0,
index_offset: 0,
canonical: false,
unique_kmers: 0,
file_size: 0,
}
}
}
impl DatabaseHeader {
pub fn new(kmer_size: u8, total_kmers: u64, canonical: bool) -> Self {
Self {
magic: *DATABASE_MAGIC,
version: DATABASE_VERSION,
kmer_size,
total_kmers,
sorted: false,
data_offset: 42,
index_offset: 0,
canonical,
unique_kmers: total_kmers,
file_size: 0,
}
}
pub fn write_to<W: Write>(&self, writer: &mut W) -> IoResult<()> {
writer.write_all(&self.magic)?;
writer.write_u16::<LittleEndian>(self.version)?;
writer.write_u8(self.kmer_size)?;
writer.write_u8(0)?;
writer.write_u16::<LittleEndian>(0)?;
writer.write_u64::<LittleEndian>(self.total_kmers)?;
let flags: u8 = if self.sorted { 1 } else { 0 } | if self.canonical { 2 } else { 0 };
writer.write_u8(flags)?;
writer.write_u8(0)?; writer.write_u8(0)?; writer.write_u8(0)?; writer.write_u8(0)?; writer.write_u8(0)?; writer.write_u8(0)?; writer.write_u8(0)?; writer.write_u64::<LittleEndian>(self.data_offset)?;
writer.write_u64::<LittleEndian>(self.index_offset)?;
Ok(())
}
pub fn read_from<R: Read>(reader: &mut R) -> IoResult<Self> {
let mut magic = [0u8; 4];
reader.read_exact(&mut magic)?;
if magic != *DATABASE_MAGIC {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid database magic number",
));
}
let version = reader.read_u16::<LittleEndian>()?;
if version != DATABASE_VERSION {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Unsupported database version: {}", version),
));
}
let kmer_size = reader.read_u8()?;
let _padding1 = reader.read_u8()?;
let _padding2 = reader.read_u16::<LittleEndian>()?;
let total_kmers = reader.read_u64::<LittleEndian>()?;
let flags = reader.read_u8()?;
let sorted = (flags & 1) != 0;
let canonical = (flags & 2) != 0;
let _padding1 = reader.read_u8()?;
let _padding2 = reader.read_u8()?;
let _padding3 = reader.read_u8()?;
let _padding4 = reader.read_u8()?;
let _padding5 = reader.read_u8()?;
let _padding6 = reader.read_u8()?;
let _padding7 = reader.read_u8()?;
let data_offset = reader.read_u64::<LittleEndian>()?;
let index_offset = reader.read_u64::<LittleEndian>()?;
Ok(Self {
magic,
version,
kmer_size,
total_kmers,
sorted,
data_offset,
index_offset,
canonical,
unique_kmers: total_kmers, file_size: 0, })
}
/// Checks that the header fields are within the accepted ranges.
///
/// # Errors
/// Returns a message when `kmer_size` is outside 1..=127, when `data_offset`
/// is outside 40..=1000, or when a non-zero `index_offset` does not lie
/// strictly after `data_offset`.
pub fn validate(&self) -> Result<(), String> {
    let kmer_size_ok = (1..=127).contains(&self.kmer_size);
    if !kmer_size_ok {
        return Err(format!("Invalid k-mer size: {}", self.kmer_size));
    }

    let data_offset_ok = (40..=1000).contains(&self.data_offset);
    if !data_offset_ok {
        return Err(format!("Invalid data offset: {}", self.data_offset));
    }

    let index_precedes_data = self.index_offset != 0 && self.index_offset <= self.data_offset;
    if index_precedes_data {
        return Err(String::from("Invalid index offset"));
    }

    Ok(())
}
}
/// One database record: an encoded k-mer and the count stored with it.
///
/// The serialized form is 20 bytes: the k-mer as a little-endian `u128`
/// followed by the count as a little-endian `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct KmerEntry {
    /// K-mer encoded as a 128-bit integer.
    pub kmer: u128,
    /// Count stored alongside the k-mer.
    pub count: u32,
}

impl KmerEntry {
    /// Creates an entry from an encoded k-mer and its count.
    pub fn new(kmer: u128, count: u32) -> Self {
        Self { kmer, count }
    }

    /// Writes the entry to `writer` (16 bytes of k-mer, then 4 bytes of count,
    /// both little-endian).
    ///
    /// # Errors
    /// Returns the first I/O error reported by `writer`.
    pub fn write_to<W: Write>(&self, writer: &mut W) -> IoResult<()> {
        writer.write_all(&self.kmer.to_le_bytes())?;
        writer.write_all(&self.count.to_le_bytes())
    }

    /// Reads one entry in the layout produced by [`KmerEntry::write_to`].
    ///
    /// The count is always decoded as little-endian. Guessing the byte order
    /// from the magnitude of the value would corrupt every legitimately large
    /// count, since `write_to` only ever emits little-endian data.
    ///
    /// # Errors
    /// Propagates any I/O error from `reader`, including a short read.
    pub fn read_from<R: Read>(reader: &mut R) -> IoResult<Self> {
        let mut kmer_bytes = [0u8; 16];
        reader.read_exact(&mut kmer_bytes)?;

        let mut count_bytes = [0u8; 4];
        reader.read_exact(&mut count_bytes)?;

        Ok(Self {
            kmer: u128::from_le_bytes(kmer_bytes),
            count: u32::from_le_bytes(count_bytes),
        })
    }
}
/// Storage variants a database file can take.
#[derive(Debug, Clone)]
pub enum DatabaseFormat {
    /// Uses the `rkdb` extension.
    Standard,
    /// Uses the `rkdb` extension, same as `Standard`.
    Indexed,
    /// Uses the `rkdbz` extension.
    Compressed,
}

impl DatabaseFormat {
    /// Returns the file extension (without a leading dot) for this format.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::Compressed => "rkdbz",
            Self::Standard | Self::Indexed => "rkdb",
        }
    }
}
/// A database held fully in memory: a header plus its list of k-mer entries.
#[derive(Debug)]
pub struct RKDatabase {
/// Header describing the entries.
pub header: DatabaseHeader,
/// The k-mer entries themselves.
pub entries: Vec<KmerEntry>,
/// Path the database was loaded from; `None` for databases built in memory.
pub file_path: Option<std::path::PathBuf>,
}
impl RKDatabase {
/// Creates a database with the given header, no entries and no backing file.
pub fn new(header: DatabaseHeader) -> Self {
    let entries = Vec::new();
    let file_path = None;
    Self {
        header,
        entries,
        file_path,
    }
}
pub fn header(&self) -> &DatabaseHeader {
&self.header
}
/// Loads a database from `path`: parses the header, seeks to the data
/// section, and reads `header.total_kmers` entries into memory.
///
/// A `data_offset` outside 40..=1000 is treated as implausible and replaced
/// by 42, the offset `DatabaseHeader::new` records.
///
/// # Errors
/// Fails when the file cannot be opened, the header is invalid, the seek
/// fails, or fewer entries are present than the header announces.
pub fn from_file_path(path: &std::path::Path) -> crate::error::ProcessingResult<Self> {
    use std::fs::File;
    use std::io::{BufReader, Seek, SeekFrom};

    // `total_kmers` comes straight from the file, so never reserve more than
    // this many entries up front. A corrupt header must not trigger a huge
    // allocation; the vector still grows on demand past this bound.
    const MAX_PREALLOCATED_ENTRIES: u64 = 1 << 20;

    let file =
        File::open(path).map_err(|e| crate::error::ProcessingError::io_error(e.to_string()))?;
    let mut reader = BufReader::new(file);
    let header = DatabaseHeader::read_from(&mut reader)?;

    let data_start = match header.data_offset {
        offset @ 40..=1000 => offset,
        _ => 42,
    };
    reader.seek(SeekFrom::Start(data_start)).map_err(|e| {
        crate::error::ProcessingError::io_error(format!(
            "Failed to seek to data section: {}",
            e
        ))
    })?;

    let reserve = usize::try_from(header.total_kmers.min(MAX_PREALLOCATED_ENTRIES)).unwrap_or(0);
    let mut entries = Vec::with_capacity(reserve);
    for _ in 0..header.total_kmers {
        let entry = KmerEntry::read_from(&mut reader).map_err(|e| {
            crate::error::ProcessingError::io_error(format!(
                "Failed to read k-mer entry: {}",
                e
            ))
        })?;
        entries.push(entry);
    }

    Ok(Self {
        header,
        entries,
        file_path: Some(path.to_path_buf()),
    })
}
/// Loads a database from `path`.
///
/// This currently forwards to [`RKDatabase::from_file_path`]; no memory
/// mapping is performed here.
pub fn from_file_path_mapped(path: &std::path::Path) -> crate::error::ProcessingResult<Self> {
    let database = Self::from_file_path(path)?;
    Ok(database)
}
pub fn read_from<R: std::io::Read>(reader: &mut R) -> crate::error::ProcessingResult<Self> {
let header = DatabaseHeader::read_from(reader)?;
Ok(Self {
header,
entries: Vec::new(),
file_path: None,
})
}
pub fn write_to_file(&self, path: &std::path::Path) -> crate::error::ProcessingResult<()> {
use std::fs::File;
use std::io::BufWriter;
let file = File::create(path)
.map_err(|e| crate::error::ProcessingError::io_error(e.to_string()))?;
let mut writer = BufWriter::new(file);
self.write_to(&mut writer)
}
/// Writes the header and then every entry, in order, to `writer`.
///
/// # Errors
/// Stops at and returns the first write failure.
pub fn write_to<W: std::io::Write>(
    &self,
    writer: &mut W,
) -> crate::error::ProcessingResult<()> {
    self.header.write_to(writer)?;
    self.entries
        .iter()
        .try_for_each(|record| record.write_to(&mut *writer))?;
    Ok(())
}
/// Returns the header's k-mer size widened to `usize`.
pub fn kmer_size(&self) -> usize {
    usize::from(self.header.kmer_size)
}
/// Returns the header's `total_kmers`; this implementation always yields
/// `Some`.
pub fn size(&self) -> Option<u64> {
    let entry_count = self.header.total_kmers;
    Some(entry_count)
}
/// Looks up the count stored for the k-mer string `kmer`.
///
/// Returns `None` when the string cannot be encoded or no entry matches.
/// Binary search is used when the header is flagged as sorted; otherwise the
/// entries are scanned linearly.
pub fn query_kmer(&self, kmer: &str) -> Option<u64> {
    let encoded = crate::kmer::encoding::encode_kmer_u128(kmer).ok()?;
    if !self.header.sorted {
        return self.linear_search_kmer(encoded);
    }
    self.binary_search_kmer(encoded)
}
/// Binary-searches `entries` (expected to be ordered by `kmer`) for
/// `query_encoded` and returns the matching count widened to `u64`.
fn binary_search_kmer(&self, query_encoded: u128) -> Option<u64> {
    // Half-open search window [lo, hi).
    let (mut lo, mut hi) = (0usize, self.entries.len());
    while lo < hi {
        let probe = lo + (hi - lo) / 2;
        let candidate = &self.entries[probe];
        if candidate.kmer == query_encoded {
            return Some(u64::from(candidate.count));
        }
        if query_encoded < candidate.kmer {
            hi = probe;
        } else {
            lo = probe + 1;
        }
    }
    None
}
/// Scans `entries` front to back and returns the count of the first entry
/// whose k-mer equals `query_encoded`.
fn linear_search_kmer(&self, query_encoded: u128) -> Option<u64> {
    self.entries
        .iter()
        .find(|candidate| candidate.kmer == query_encoded)
        .map(|hit| u64::from(hit.count))
}
/// Returns a copy of every entry as a `(kmer, count)` pair, in stored order.
///
/// This implementation never returns an error.
pub fn all_kmers(&self) -> crate::error::ProcessingResult<Vec<(u128, u32)>> {
    let pairs = self
        .entries
        .iter()
        .map(|record| (record.kmer, record.count))
        .collect();
    Ok(pairs)
}
pub fn from_kmer_pairs(
kmer_pairs: Vec<(u128, u32)>,
kmer_size: u8,
canonical: bool,
sorted: bool,
) -> crate::error::ProcessingResult<Self> {
let mut entries: Vec<KmerEntry> = kmer_pairs
.into_iter()
.map(|(kmer, count)| KmerEntry::new(kmer, count))
.collect();
if sorted && !entries.is_empty() {
entries.sort_by_key(|entry| entry.kmer);
}
let header = DatabaseHeader {
magic: *crate::database::format::DATABASE_MAGIC,
version: crate::database::format::DATABASE_VERSION,
kmer_size,
total_kmers: entries.len() as u64,
canonical,
sorted,
data_offset: 42, index_offset: 0,
unique_kmers: entries.len() as u64,
file_size: 0, };
Ok(Self {
header,
entries,
file_path: None,
})
}
/// Writes the database to `path`; a thin wrapper over `write_to_file`.
pub fn to_file_path(&self, path: &std::path::Path) -> crate::error::ProcessingResult<()> {
    self.write_to_file(path)
}
pub fn kmer_size_u8(&self) -> u8 {
self.header.kmer_size
}
pub fn total_kmers(&self) -> u64 {
self.header.total_kmers
}
pub fn is_canonical(&self) -> bool {
self.header.canonical
}
/// Non-verbose form of `validate_compatibility_verbose`: checks that all
/// `databases` share a k-mer size and canonical mode, and returns that pair.
pub fn validate_compatibility(
    databases: &[&RKDatabase],
) -> crate::error::ProcessingResult<(usize, bool)> {
    let verbose = false;
    Self::validate_compatibility_verbose(databases, verbose)
}
/// Checks that every database matches the first one in k-mer size and
/// canonical mode, returning that shared `(kmer_size, canonical)` pair.
///
/// With `verbose` set, progress is reported on stderr and error messages
/// carry per-database details plus a hint.
///
/// # Errors
/// Fails when `databases` is empty or at the first database whose k-mer size
/// or canonical mode differs from the first one (k-mer size is checked first).
pub fn validate_compatibility_verbose(
    databases: &[&RKDatabase],
    verbose: bool,
) -> crate::error::ProcessingResult<(usize, bool)> {
    let (reference, others) = match databases.split_first() {
        Some(parts) => parts,
        None => {
            return Err(crate::error::ProcessingError::new(
                "At least one database is required for validation",
            ))
        }
    };
    let kmer_size = reference.kmer_size();
    let canonical = reference.is_canonical();

    if verbose {
        eprintln!("Validating compatibility for {} databases", databases.len());
        eprintln!(
            " Reference database: k-mer size={}, canonical={}",
            kmer_size, canonical
        );
    }

    // One detail line per database, appended to verbose error messages.
    let describe = |position: usize, db: &RKDatabase| -> String {
        format!(
            "\n Database {}: k-mer size={}, canonical={}, k-mers={}",
            position,
            db.kmer_size(),
            db.is_canonical(),
            db.header().total_kmers
        )
    };

    for (offset, db) in others.iter().enumerate() {
        // `others` starts at the second database, and positions are 1-based.
        let position = offset + 2;

        if db.kmer_size() != kmer_size {
            let mut msg = format!(
                "Database {} has k-mer size {}, expected {}",
                position,
                db.kmer_size(),
                kmer_size
            );
            if verbose {
                msg.push_str(&describe(1, *reference));
                msg.push_str(&describe(position, *db));
                msg.push_str("\n Hint: All databases must have the same k-mer size to merge");
            }
            return Err(crate::error::ProcessingError::new(msg));
        }

        if db.is_canonical() != canonical {
            let mut msg = format!(
                "Database {} has canonical mode {}, expected {}",
                position,
                db.is_canonical(),
                canonical
            );
            if verbose {
                msg.push_str(&describe(1, *reference));
                msg.push_str(&describe(position, *db));
                msg.push_str("\n Hint: All databases must have the same canonical mode to merge");
                msg.push_str("\n Canonical mode merges reverse complements together");
                msg.push_str("\n Note: Use --use-prefix-cache for flexible canonical mode merging");
            }
            return Err(crate::error::ProcessingError::new(msg));
        }

        if verbose {
            eprintln!(
                " Database {}: compatible (k-mer size={}, canonical={})",
                position,
                db.kmer_size(),
                db.is_canonical()
            );
        }
    }

    if verbose {
        eprintln!("All databases are compatible");
    }
    Ok((kmer_size, canonical))
}
pub fn merge_databases(
input_paths: &[std::path::PathBuf],
config: &crate::database::MergeConfig,
) -> crate::error::ProcessingResult<Self> {
let total_kmers: u64 = input_paths
.iter()
.map(|path| {
let db = Self::from_file_path(path)?;
Ok(db.total_kmers())
})
.collect::<crate::error::ProcessingResult<Vec<_>>>()?
.iter()
.sum();
let estimated_memory = total_kmers as usize * 24;
if config.use_prefix_cache {
if config.verbose {
eprintln!(
"DEBUG: Using prefix cache merge for {} k-mers (estimated {} bytes)",
total_kmers, estimated_memory
);
}
return Self::merge_databases_prefix_cache(input_paths, config);
}
let use_streaming = Self::should_use_streaming(input_paths, config)?;
if use_streaming {
if config.verbose {
eprintln!(
"DEBUG: Using streaming merge for {} k-mers (estimated {} bytes)",
total_kmers, estimated_memory
);
}
Self::merge_databases_streaming(input_paths, config)
} else {
if config.verbose {
eprintln!(
"DEBUG: Using in-memory merge for {} k-mers (estimated {} bytes)",
total_kmers, estimated_memory
);
}
Self::merge_databases_inmemory(input_paths, config)
}
}
fn should_use_streaming(
input_paths: &[std::path::PathBuf],
config: &crate::database::MergeConfig,
) -> crate::error::ProcessingResult<bool> {
use crate::database::format::RKDatabase;
let mut total_kmers = 0u64;
for path in input_paths {
let db = RKDatabase::from_file_path(path)?;
total_kmers += db.total_kmers();
}
let estimated_memory = total_kmers as usize * 24;
Ok(estimated_memory > config.max_memory_usage)
}
fn merge_databases_streaming(
input_paths: &[std::path::PathBuf],
config: &crate::database::MergeConfig,
) -> crate::error::ProcessingResult<Self> {
use crate::database::format::RKDatabase;
use crate::database::streaming_merge::ExternalMerger;
use std::time::Instant;
if config.verbose {
eprintln!("Using streaming merge for large datasets");
}
let start_time = Instant::now();
let first_db = RKDatabase::from_file_path(&input_paths[0])?;
let kmer_size = first_db.kmer_size();
let canonical = first_db.is_canonical();
let mut merger = ExternalMerger::new(config.chunk_size, config.temp_dir.clone());
for path in input_paths {
if config.verbose {
eprintln!("Sorting database: {}", path.display());
}
merger.sort_database(path)?;
}
if config.verbose {
eprintln!("Merging sorted chunks...");
}
let merge_iter = merger.merge_sorted_chunks()?;
let mut sorted_kmers: Vec<(u128, u32)> = Vec::new();
for result in merge_iter {
match result {
Ok((kmer, count)) => sorted_kmers.push((kmer, count)),
Err(e) => return Err(e),
}
}
if config.verbose {
let stats = merger.stats();
eprintln!("Streaming merge stats:");
eprintln!(" Total k-mers read: {}", stats.total_kmers_read);
eprintln!(" Chunks created: {}", stats.chunks_created);
eprintln!(" Read time: {:?}", stats.read_time);
eprintln!(" Sort time: {:?}", stats.sort_time);
eprintln!(" Merge time: {:?}", stats.merge_time);
eprintln!(" Write time: {:?}", stats.write_time);
eprintln!(" Total time: {:?}", start_time.elapsed());
}
Self::from_kmer_pairs(sorted_kmers, kmer_size as u8, canonical, true)
}
fn merge_databases_inmemory(
input_paths: &[std::path::PathBuf],
config: &crate::database::MergeConfig,
) -> crate::error::ProcessingResult<Self> {
use hashbrown::hash_map::DefaultHashBuilder;
use hashbrown::HashMap as HashMapBrown;
use indicatif::{ProgressBar, ProgressStyle};
use std::time::Instant;
let _start_time = Instant::now();
if input_paths.is_empty() {
return Err(crate::error::ProcessingError::new(
"At least one input database is required",
));
}
type KmerMap = HashMapBrown<u128, u32, DefaultHashBuilder>;
let mut all_kmers: KmerMap = KmerMap::default();
#[allow(unused_assignments)]
let mut kmer_size = None;
#[allow(unused_assignments)]
let mut canonical = None;
let mut sorted = true;
let mut _total_input_kmers = 0u64;
let progress = if config.verbose && input_paths.len() > 1 {
Some(ProgressBar::new(input_paths.len() as u64))
} else {
None
};
if let Some(ref pb) = progress {
pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}")
.unwrap()
.progress_chars("#>-")
);
pb.set_message("Loading databases...");
}
let mut databases = Vec::with_capacity(input_paths.len());
for path in input_paths {
let db = Self::from_file_path(path)?;
databases.push(db);
}
let (kmer_size_val, canonical_val) = Self::validate_compatibility_verbose(
&databases.iter().collect::<Vec<_>>(),
config.verbose,
)?;
kmer_size = Some(kmer_size_val);
canonical = Some(canonical_val);
for (i, db) in databases.iter().enumerate() {
let db_kmers = db.all_kmers()?;
for (kmer, count) in db_kmers {
let entry = all_kmers.entry(kmer).or_insert(0);
*entry = match (*entry).checked_add(count) {
Some(sum) => sum,
None => {
u32::MAX
}
};
_total_input_kmers += count as u64;
}
sorted = sorted && db.header().sorted;
if let Some(ref pb) = progress {
let msg = format!("Loaded database {} ({})", i + 1, input_paths[i].display());
pb.set_message(msg);
pb.inc(1);
}
}
if let Some(ref pb) = progress {
pb.finish_with_message("All databases loaded");
}
let kmer_size = kmer_size.unwrap();
let canonical = canonical.unwrap();
let sort_progress = if config.verbose && all_kmers.len() > 10000 {
let pb = ProgressBar::new(all_kmers.len() as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{bar:40.red/yellow}] {pos}/{len} ({eta}) {msg}")
.unwrap()
.progress_chars("#>-")
);
pb.set_message("Sorting k-mers...");
Some(pb)
} else {
None
};
let mut sorted_kmers: Vec<(u128, u32)> = Vec::with_capacity(all_kmers.len());
if let Some(ref pb) = sort_progress {
for (i, (kmer, count)) in all_kmers.into_iter().enumerate() {
sorted_kmers.push((kmer, count));
if i % 10000 == 0 {
pb.set_position(i as u64);
}
}
pb.finish_with_message("Sorting...");
} else {
sorted_kmers = all_kmers.into_iter().collect();
}
if let Some(ref pb) = sort_progress {
pb.set_message("Sorting k-mers...");
}
sorted_kmers.sort_by_key(|(kmer, _)| *kmer);
if let Some(ref pb) = sort_progress {
pb.finish_with_message("Sorting complete");
}
Self::from_kmer_pairs(sorted_kmers, kmer_size as u8, canonical, sorted)
}
fn merge_databases_prefix_cache(
input_paths: &[std::path::PathBuf],
config: &crate::database::MergeConfig,
) -> crate::error::ProcessingResult<Self> {
use crate::database::prefix_cache_merge::ExternalSortMerger;
use std::time::Instant;
let start_time = Instant::now();
if input_paths.is_empty() {
return Err(crate::error::ProcessingError::new(
"At least one input database is required",
));
}
let mut db_refs = Vec::new();
for path in input_paths {
let db = Self::from_file_path(path)?;
db_refs.push(db);
}
let db_refs_slice: Vec<&Self> = db_refs.iter().collect();
let merge_buffer_mb = (config.max_memory_usage / 1024 / 1024) as usize;
let (_kmer_size, _final_canonical) =
Self::validate_compatibility_external_sort(&db_refs_slice, config.verbose)?;
let mut merger = ExternalSortMerger::new(
input_paths.to_vec(),
config.temp_dir.clone(),
merge_buffer_mb.max(1024), config.num_threads,
config.merge_mode.clone(),
config.keep_intermediate,
)?;
let temp_output = config.temp_dir.join("external_sort_merge_output.tmp");
merger.external_sort_merge(&temp_output)?;
let _elapsed = start_time.elapsed();
let result_db = Self::from_file_path(&temp_output)?;
Ok(result_db)
}
/// Validation for the external-sort merge: every database must share the
/// first one's k-mer size, while canonical modes are allowed to differ.
///
/// Returns the shared k-mer size and `true` when at least one database is
/// canonical. With `verbose` set, progress is reported on stderr.
///
/// # Panics
/// Panics when `db_refs` is empty; the caller checks for that beforehand.
///
/// # Errors
/// Fails at the first database whose k-mer size differs from the first one.
fn validate_compatibility_external_sort(
    db_refs: &[&Self],
    verbose: bool,
) -> crate::error::ProcessingResult<(u8, bool)> {
    let reference = db_refs[0];
    let expected_size = reference.kmer_size();
    let reference_canonical = reference.is_canonical();

    if verbose {
        eprintln!("Validating databases for external sort merge...");
    }

    // One detail line per database, appended to verbose error messages.
    let describe = |position: usize, db: &Self| -> String {
        format!(
            "\n Database {}: k-mer size={}, canonical={}, k-mers={}",
            position,
            db.kmer_size(),
            db.is_canonical(),
            db.header().total_kmers
        )
    };

    let (mut has_canonical, mut has_non_canonical) = (false, false);
    for (index, db) in db_refs.iter().enumerate() {
        let position = index + 1;

        if db.kmer_size() != expected_size {
            let mut msg = format!(
                "Database {} has k-mer size {}, expected {}",
                position,
                db.kmer_size(),
                expected_size
            );
            if verbose {
                msg.push_str(&format!(
                    "\n Database 1: k-mer size={}, canonical={}, k-mers={}",
                    expected_size,
                    reference_canonical,
                    reference.header().total_kmers
                ));
                msg.push_str(&describe(position, *db));
                msg.push_str("\n Hint: All databases must have the same k-mer size to merge");
            }
            return Err(crate::error::ProcessingError::new(msg));
        }

        match db.is_canonical() {
            true => has_canonical = true,
            false => has_non_canonical = true,
        }

        if verbose {
            eprintln!(
                " Database {}: compatible (k-mer size={}, canonical={})",
                position,
                db.kmer_size(),
                db.is_canonical()
            );
        }
    }

    let final_canonical = has_canonical;
    if verbose {
        if has_canonical && has_non_canonical {
            eprintln!(" Mixed canonical modes detected - converting all to canonical mode");
        }
        eprintln!(" Final merge mode: canonical={}", final_canonical);
    }

    // Read the size straight from the `u8` header field; no narrowing needed.
    Ok((reference.kmer_size_u8(), final_canonical))
}
}
#[cfg(test)]
mod tests {
use super::super::memory::{constraints, MemoryMonitor};
use super::*;
use tempfile::tempdir;
/// Writes two three-entry databases to a temp dir, merges them through
/// `RKDatabase::merge_databases`, and checks that counts of shared k-mers are
/// summed while unshared k-mers are kept. Also checks the monitor's peak usage
/// against `constraints::SMALL`.
#[test]
fn test_merge_memory_basic() {
let db1 = RKDatabase::from_kmer_pairs(
vec![(0x1234, 10), (0x5678, 20), (0x9ABC, 30)],
31,
false,
true,
)
.unwrap();
let db2 = RKDatabase::from_kmer_pairs(
vec![(0x1234, 5), (0xDEF0, 15), (0x9ABC, 25)],
31,
false,
true,
)
.unwrap();
let temp_dir = tempdir().unwrap();
let db1_path = temp_dir.path().join("db1.rkdb");
let db2_path = temp_dir.path().join("db2.rkdb");
db1.to_file_path(&db1_path).unwrap();
db2.to_file_path(&db2_path).unwrap();
let mut monitor = MemoryMonitor::new();
let config = crate::database::MergeConfig {
max_memory_usage: constraints::SMALL.max_usage,
chunk_size: 100,
temp_dir: temp_dir.path().to_path_buf(),
use_streaming: false,
use_prefix_cache: false,
num_threads: 0,
merge_mode: "auto".to_string(),
keep_intermediate: false,
verbose: false,
};
let merged_db = RKDatabase::merge_databases(&[db1_path, db2_path], &config)
.expect("Merge should succeed");
monitor.record_reading();
let all_kmers = merged_db.all_kmers().expect("Failed to get merged k-mers");
assert!(!all_kmers.is_empty(), "Merged database should have k-mers");
let kmer_map: std::collections::HashMap<_, _> = all_kmers.into_iter().collect();
// 0x1234 and 0x9ABC occur in both inputs, so their counts add up.
assert_eq!(kmer_map.get(&0x1234), Some(&15)); assert_eq!(kmer_map.get(&0x5678), Some(&20));
assert_eq!(kmer_map.get(&0x9ABC), Some(&55)); assert_eq!(kmer_map.get(&0xDEF0), Some(&15));
assert!(
monitor.peak_usage() < constraints::SMALL.max_usage,
"Memory usage should be within small constraint"
);
}
/// Merges two three-entry databases with `use_streaming: true` and a chunk
/// size of 2, then checks the summed counts.
///
/// NOTE(review): `should_use_streaming` estimates 6 k-mers * 24 bytes = 144,
/// which is below the 1024-byte limit set here, and `use_streaming` is not
/// read by the merge code in this file -- so this presumably exercises the
/// in-memory path. Confirm which path is intended.
#[test]
fn test_merge_streaming_basic() {
let temp_dir = tempdir().unwrap();
let db1_path = temp_dir.path().join("db1.rkdb");
let db2_path = temp_dir.path().join("db2.rkdb");
let db1 = RKDatabase::from_kmer_pairs(
vec![(0x0010, 10), (0x0020, 20), (0x0030, 30)],
31,
false,
true,
)
.unwrap();
let db2 = RKDatabase::from_kmer_pairs(
vec![(0x0010, 5), (0x0040, 15), (0x0030, 25)],
31,
false,
true,
)
.unwrap();
db1.to_file_path(&db1_path).unwrap();
db2.to_file_path(&db2_path).unwrap();
let config = crate::database::MergeConfig {
max_memory_usage: 1024,
chunk_size: 2,
temp_dir: temp_dir.path().to_path_buf(),
use_streaming: true,
use_prefix_cache: false,
num_threads: 0,
merge_mode: "auto".to_string(),
keep_intermediate: false,
verbose: false,
};
let merged_db = RKDatabase::merge_databases(&[db1_path, db2_path], &config)
.expect("Streaming merge should succeed");
let all_kmers = merged_db.all_kmers().unwrap();
let kmer_map: std::collections::HashMap<_, _> = all_kmers.into_iter().collect();
assert_eq!(kmer_map.get(&0x0010), Some(&15));
assert_eq!(kmer_map.get(&0x0020), Some(&20));
assert_eq!(kmer_map.get(&0x0030), Some(&55));
assert_eq!(kmer_map.get(&0x0040), Some(&15));
}
/// Round-trips a header through `write_to`/`read_from` and compares the
/// k-mer size, total count and canonical flag.
#[test]
fn test_database_header_serialization() {
let header = DatabaseHeader::new(21, 1000000, true);
let mut buffer = Vec::new();
header.write_to(&mut buffer).unwrap();
let mut reader = std::io::Cursor::new(buffer);
let loaded_header = DatabaseHeader::read_from(&mut reader).unwrap();
assert_eq!(header.kmer_size, loaded_header.kmer_size);
assert_eq!(header.total_kmers, loaded_header.total_kmers);
assert_eq!(header.canonical, loaded_header.canonical);
}
/// Round-trips a single `KmerEntry` through `write_to`/`read_from` and checks
/// that both fields survive.
#[test]
fn test_kmer_entry_serialization() {
let entry = KmerEntry::new(0x0123456789ABCDEF0123456789ABCDEF0, 42);
let mut buffer = Vec::new();
entry.write_to(&mut buffer).unwrap();
let mut reader = std::io::Cursor::new(buffer);
let loaded_entry = KmerEntry::read_from(&mut reader).unwrap();
assert_eq!(entry.kmer, loaded_entry.kmer);
assert_eq!(entry.count, loaded_entry.count);
}
/// Checks that `validate` accepts a freshly built header and rejects k-mer
/// sizes of 0 and 128.
#[test]
fn test_database_header_validation() {
let mut header = DatabaseHeader::new(21, 1000, true);
assert!(header.validate().is_ok());
header.kmer_size = 0;
assert!(header.validate().is_err());
// 128 is one past the largest accepted size (127).
header.kmer_size = 128;
assert!(header.validate().is_err());
}
mod validate_compatibility_tests {
use super::*;
/// An empty database list must be rejected with the "at least one database"
/// message.
#[test]
fn test_validate_compatibility_empty() {
let result = RKDatabase::validate_compatibility(&[]);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("At least one database is required"));
}
/// A single database is compatible with itself; its own parameters are
/// returned.
#[test]
fn test_validate_compatibility_single() {
let db = RKDatabase::from_kmer_pairs(vec![(0x1234, 10)], 31, false, true).unwrap();
let result = RKDatabase::validate_compatibility(&[&db]);
assert!(result.is_ok());
let (kmer_size, canonical) = result.unwrap();
assert_eq!(kmer_size, 31);
assert_eq!(canonical, false);
}
/// Two databases with the same k-mer size and canonical mode validate
/// successfully.
#[test]
fn test_validate_compatibility_matching() {
let db1 = RKDatabase::from_kmer_pairs(vec![(0x1234, 10)], 31, false, true).unwrap();
let db2 = RKDatabase::from_kmer_pairs(vec![(0x5678, 20)], 31, false, true).unwrap();
let result = RKDatabase::validate_compatibility(&[&db1, &db2]);
assert!(result.is_ok());
let (kmer_size, canonical) = result.unwrap();
assert_eq!(kmer_size, 31);
assert_eq!(canonical, false);
}
/// Differing k-mer sizes (31 vs 51) must fail, and the message must name both
/// sizes.
#[test]
fn test_validate_compatibility_kmer_size_mismatch() {
let db1 = RKDatabase::from_kmer_pairs(vec![(0x1234, 10)], 31, false, true).unwrap();
let db2 = RKDatabase::from_kmer_pairs(
vec![(0x5678, 20)],
51, false,
true,
)
.unwrap();
let result = RKDatabase::validate_compatibility(&[&db1, &db2]);
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("k-mer size"));
assert!(error_msg.contains("31"));
assert!(error_msg.contains("51"));
}
/// Differing canonical modes must fail, and the message must mention both
/// modes.
#[test]
fn test_validate_compatibility_canonical_mismatch() {
let db1 = RKDatabase::from_kmer_pairs(
vec![(0x1234, 10)],
31,
true, true,
)
.unwrap();
let db2 = RKDatabase::from_kmer_pairs(
vec![(0x5678, 20)],
31,
false, true,
)
.unwrap();
let result = RKDatabase::validate_compatibility(&[&db1, &db2]);
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("canonical mode"));
assert!(error_msg.contains("true"));
assert!(error_msg.contains("false"));
}
/// With several mismatches present, validation stops at the first one found:
/// the second database's k-mer size.
#[test]
fn test_validate_compatibility_multiple_mismatch() {
let db1 = RKDatabase::from_kmer_pairs(vec![(0x1234, 10)], 31, true, true).unwrap();
let db2 = RKDatabase::from_kmer_pairs(
vec![(0x5678, 20)],
51, true,
true,
)
.unwrap();
let db3 = RKDatabase::from_kmer_pairs(
vec![(0x9ABC, 30)],
31,
false, true,
)
.unwrap();
let result = RKDatabase::validate_compatibility(&[&db1, &db2, &db3]);
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("k-mer size"));
}
/// Four databases with identical parameters validate and yield those
/// parameters.
#[test]
fn test_validate_compatibility_all_compatible() {
let dbs: Vec<RKDatabase> = vec![
RKDatabase::from_kmer_pairs(vec![(0x1234, 10)], 31, true, true).unwrap(),
RKDatabase::from_kmer_pairs(vec![(0x5678, 20)], 31, true, true).unwrap(),
RKDatabase::from_kmer_pairs(vec![(0x9ABC, 30)], 31, true, true).unwrap(),
RKDatabase::from_kmer_pairs(vec![(0xDEF0, 40)], 31, true, true).unwrap(),
];
let db_refs: Vec<&RKDatabase> = dbs.iter().collect();
let result = RKDatabase::validate_compatibility(&db_refs);
assert!(result.is_ok());
let (kmer_size, canonical) = result.unwrap();
assert_eq!(kmer_size, 31);
assert_eq!(canonical, true);
}
}
}